diff options
| author | Linus Färnstrand <linus@mullvad.net> | 2017-05-04 17:29:44 +0200 |
|---|---|---|
| committer | Linus Färnstrand <linus@mullvad.net> | 2017-05-04 17:30:23 +0200 |
| commit | 3ee8af5fbe38815f53b8354c6f2ea6fd54e37ccf (patch) | |
| tree | eecd5a029998c86bf9063a717477d76917131ee5 | |
| parent | cb04c13934b186e940a4a364c20420b6579bbaf2 (diff) | |
| download | mullvadvpn-3ee8af5fbe38815f53b8354c6f2ea6fd54e37ccf.tar.xz mullvadvpn-3ee8af5fbe38815f53b8354c6f2ea6fd54e37ccf.zip | |
Simplify ChildMonitor to use duct and no generics
| -rw-r--r-- | talpid_core/src/process/mod.rs | 26 | ||||
| -rw-r--r-- | talpid_core/src/process/monitor.rs | 290 |
2 files changed, 69 insertions, 247 deletions
diff --git a/talpid_core/src/process/mod.rs b/talpid_core/src/process/mod.rs index 3259f90524..a7d76ea9f4 100644 --- a/talpid_core/src/process/mod.rs +++ b/talpid_core/src/process/mod.rs @@ -1,31 +1,5 @@ -use std::io; - -use std::process::{ChildStderr, ChildStdout}; - /// A module for monitoring child processes and get notified of events on them. pub mod monitor; -use self::monitor::MonitoredChild; /// A module for all OpenVPN related process management. pub mod openvpn; - -use clonablechild::ClonableChild; - - -impl MonitoredChild for ClonableChild { - fn wait(&self) -> io::Result<bool> { - ClonableChild::wait(self).map(|exit_status| exit_status.success()) - } - - fn kill(&self) -> io::Result<()> { - ClonableChild::kill(self) - } - - fn stdout(&mut self) -> Option<ChildStdout> { - self.stdout() - } - - fn stderr(&mut self) -> Option<ChildStderr> { - self.stderr() - } -} diff --git a/talpid_core/src/process/monitor.rs b/talpid_core/src/process/monitor.rs index c4e8b402d4..09cf3bef82 100644 --- a/talpid_core/src/process/monitor.rs +++ b/talpid_core/src/process/monitor.rs @@ -1,144 +1,50 @@ +use duct; + use std::io; -use std::process::{ChildStderr, ChildStdout}; -use std::sync::{Arc, Mutex}; +use std::process; +use std::sync::Arc; use std::thread; - -error_chain! { - errors { - /// The transition could not be made because the state machine was not in a state that - /// could transition to the desired state. - InvalidState { - description("Invalid state for desired transition") - } - /// Error representing a failure in spawning the child process - Spawn { - description("Unable to spawn child process") - } - /// Error representing a failure in sending a kill signal to the child process - Kill { - description("Unable to send kill signal to process") - } - } -} - -/// Trait for objects that represent child processes that `ChildMonitor` can monitor -pub trait MonitoredChild: Clone + Send + 'static { - /// Waits for the child to exit completely, returning if the child exited cleanly or not. - fn wait(&self) -> io::Result<bool>; - - /// Forces the child to exit. - fn kill(&self) -> io::Result<()>; - - /// Retreives the stdout stream for the child. - fn stdout(&mut self) -> Option<ChildStdout>; - - /// Retreives the stderr stream for the child. - fn stderr(&mut self) -> Option<ChildStderr>; -} - -/// Trait for objects that can spawn any type of child process object implementing `MonitoredChild`. -pub trait ChildSpawner: Send + 'static { - /// The type of child being spawned. - type Child: MonitoredChild; - - /// Spawns the child process, returning a handle to it on success. - fn spawn(&mut self) -> io::Result<Self::Child>; +/// A child process monitor. Takes care of starting and monitoring a child process and calls the +/// listener on child exit. If the child is still running when a `ChildMonitor` instance goes out +/// of scope it will kill the child and wait for it to exit properly. +pub struct ChildMonitor { + child: Arc<duct::Handle>, + thread: Option<thread::JoinHandle<()>>, } - -enum State<C: MonitoredChild> { - Stopped, - Running(RunningState<C>), -} - -struct RunningState<C: MonitoredChild> { - child: C, - thread_handle: Option<thread::JoinHandle<()>>, -} - -/// A child process monitor. Takes care of starting and monitoring a child process and runs the -/// listener on child exit. -pub struct ChildMonitor<S: ChildSpawner> { - spawner: S, - state: Arc<Mutex<State<S::Child>>>, -} - -impl<S: ChildSpawner> ChildMonitor<S> { - /// Creates a new `ChildMonitor` that spawns processes with the given `spawner`. The new - /// `ChildMonitor` will be in the stopped state and not start any process until you call - /// `start()`. - pub fn new(spawner: S) -> Self { - ChildMonitor { - spawner: spawner, - state: Arc::new(Mutex::new(State::Stopped)), - } - } - - /// Starts the child process and begins to monitor it. `listener` will be called as soon as the +impl ChildMonitor { + /// Starts the child process and begins to monitor it. `on_exit` will be called as soon as the /// child process exits. - pub fn start<L>(&mut self, listener: L) -> Result<(Option<ChildStdout>, Option<ChildStderr>)> - where L: FnMut(bool) + Send + 'static + pub fn start<L>(expression: &duct::Expression, mut on_exit: L) -> io::Result<Self> + where L: FnMut(io::Result<&process::Output>) + Send + 'static { - let mut state_lock = self.state.lock().unwrap(); - if let State::Stopped = *state_lock { - let mut child = self.spawner.spawn().chain_err(|| ErrorKind::Spawn)?; - let io = (child.stdout(), child.stderr()); - let thread_handle = self.spawn_monitor(child.clone(), listener); - *state_lock = State::Running( - RunningState { - child: child, - thread_handle: Some(thread_handle), - }, - ); - Ok(io) - } else { - bail!(ErrorKind::InvalidState); - } + let child = Arc::new(expression.start()?); + let child_clone = child.clone(); + let thread = Some(thread::spawn(move || on_exit(child_clone.wait()))); + Ok(ChildMonitor { child, thread }) } - fn spawn_monitor<L>(&self, child: S::Child, mut listener: L) -> thread::JoinHandle<()> - where L: FnMut(bool) + Send + 'static - { - let state_mutex = self.state.clone(); - thread::spawn( - move || { - let success = child.wait().unwrap_or(false); - { - let mut state_lock = state_mutex.lock().unwrap(); - *state_lock = State::Stopped; - } - listener(success); - }, - ) + /// Wait for the child to exit. Blocking the current thread. The `on_exit` callback is + /// guaranteed to fire before this method returns. + pub fn wait(&mut self) -> io::Result<&process::Output> { + if let Some(thread) = self.thread.take() { + let _ = thread.join(); + } + self.child.wait() } - /// Sends a kill signal to the child process. - pub fn stop(&self) -> Result<()> { - let state_lock = self.state.lock().unwrap(); - if let State::Running(ref running_state) = *state_lock { - running_state.child.kill().chain_err(|| ErrorKind::Kill)?; - Ok(()) - } else { - bail!(ErrorKind::InvalidState); - } + /// Send a kill signal to the child. No need to call `wait` after to free the PID. The monitor + /// will wait for the process for you. + pub fn kill(&self) -> io::Result<()> { + self.child.kill() } } -impl<S: ChildSpawner> Drop for ChildMonitor<S> { +impl Drop for ChildMonitor { fn drop(&mut self) { - let thread_handle = { - let mut state_lock = self.state.lock().unwrap(); - if let State::Running(ref mut state) = *state_lock { - let _ = state.child.kill(); - state.thread_handle.take() - } else { - None - } - }; - if let Some(thread_handle) = thread_handle { - let _ = thread_handle.join(); - } + let _ = self.kill(); + let _ = self.wait(); } } @@ -146,77 +52,11 @@ impl<S: ChildSpawner> Drop for ChildMonitor<S> { #[cfg(test)] mod child_monitor_tests { use super::*; - use std::io; - use std::process::{ChildStderr, ChildStdout}; - use std::sync::{Arc, Mutex}; + use duct::{Expression, cmd}; + use std::sync::mpsc; - use std::thread; use std::time::Duration; - #[derive(Clone)] - struct MockChild { - died: Arc<Mutex<bool>>, - } - - impl MockChild { - pub fn instant_exit() -> Self { - Self::new(true) - } - - pub fn alive_until_kill() -> Self { - Self::new(false) - } - - fn new(died: bool) -> Self { - MockChild { died: Arc::new(Mutex::new(died)) } - } - } - - impl MonitoredChild for MockChild { - fn wait(&self) -> io::Result<bool> { - loop { - if *self.died.lock().unwrap() { - break; - } - thread::sleep(Duration::new(0, 1_000_000)); - } - Ok(true) - } - - fn kill(&self) -> io::Result<()> { - *self.died.lock().unwrap() = true; - Ok(()) - } - - fn stdout(&mut self) -> Option<ChildStdout> { - None - } - - fn stderr(&mut self) -> Option<ChildStderr> { - None - } - } - - struct MockChildSpawner { - spawn_result: Option<MockChild>, - } - - impl MockChildSpawner { - pub fn new(spawn_result: Option<MockChild>) -> Self { - MockChildSpawner { spawn_result: spawn_result } - } - } - - impl ChildSpawner for MockChildSpawner { - type Child = MockChild; - - fn spawn(&mut self) -> io::Result<MockChild> { - self.spawn_result - .clone() - .ok_or(io::Error::new(io::ErrorKind::Other, "Mocking a failed process spawn"),) - } - } - /// Tries to recv a message from the given `$rx` for one second and tries to match it with the /// given expected value, `$expected` macro_rules! assert_event { @@ -226,46 +66,54 @@ mod child_monitor_tests { }} } - #[test] - fn normal_start() { - let spawner = MockChildSpawner::new(Some(MockChild::instant_exit())); - let mut testee = ChildMonitor::new(spawner); + fn echo_cmd(s: &str) -> Expression { + cmd("echo", &[s]).stdout_capture().unchecked() + } - let (tx, rx) = mpsc::channel(); - assert!(testee.start(move |success| tx.send(success).unwrap()).is_ok()); - assert_event!(rx, Ok(true)); + fn invalid_cmd() -> Expression { + cmd("this command does not exist", &[""]).unchecked() } - #[test] - fn start_failed() { - let spawner = MockChildSpawner::new(None); - let mut testee = ChildMonitor::new(spawner); + fn sleep_cmd(secs: u32) -> Expression { + if cfg!(windows) { + cmd("ping", &["127.0.0.1", "-n", &(secs + 1).to_string()]).unchecked() + } else { + cmd("sleep", &[secs.to_string()]).unchecked() + } + } + fn spawn(cmd: &Expression) -> (ChildMonitor, mpsc::Receiver<io::Result<process::Output>>) { let (tx, rx) = mpsc::channel(); - assert!(testee.start(move |success| tx.send(success).unwrap()).is_err()); - // Make sure that the listener is not kept anywhere. Failing to start should drop the - // listener - assert_event!(rx, Err(mpsc::RecvTimeoutError::Disconnected)); + let child = + ChildMonitor::start(cmd, move |res| tx.send(res.map(|out| out.clone())).unwrap()) + .expect("Unable to start process"); + (child, rx) } #[test] - fn normal_stop() { - let spawner = MockChildSpawner::new(Some(MockChild::alive_until_kill())); - let mut testee = ChildMonitor::new(spawner); + fn echo() { + let (mut child, rx) = spawn(&echo_cmd("foobar")); + let wait_output = child.wait().unwrap(); + let callback_output = rx.try_recv().unwrap().unwrap(); - let (tx, rx) = mpsc::channel(); - assert!(testee.start(move |success| tx.send(success).unwrap()).is_ok()); - assert_event!(rx, Err(mpsc::RecvTimeoutError::Timeout)); + assert!(callback_output.status.success()); + assert_eq!("foobar\n".as_bytes(), &callback_output.stdout[..]); + assert_eq!(wait_output.status, callback_output.status); + assert_eq!(wait_output.stdout, callback_output.stdout); + } - assert!(testee.stop().is_ok()); - assert_event!(rx, Ok(true)); + #[test] + fn invalid_command() { + assert!(ChildMonitor::start(&invalid_cmd(), |_| {}).is_err()); } #[test] - fn stop_without_start() { - let spawner = MockChildSpawner::new(Some(MockChild::alive_until_kill())); - let testee = ChildMonitor::new(spawner); + fn callback_after_kill() { + let (child, rx) = spawn(&sleep_cmd(100000)); + // Make sure on_exit is not triggered now. + assert!(rx.recv_timeout(Duration::from_secs(1)).is_err()); - assert_matches!(testee.stop(), Err(Error(ErrorKind::InvalidState, _))); + child.kill().unwrap(); + assert!(!rx.recv().unwrap().unwrap().status.success()); } } |
