diff options
| author | Linus Färnstrand <linus@mullvad.net> | 2017-01-23 16:17:02 +0100 |
|---|---|---|
| committer | Linus Färnstrand <linus@mullvad.net> | 2017-01-25 10:22:06 +0100 |
| commit | aab55711d06b202dbe48d0d638209c09636cf81a (patch) | |
| tree | 7a90036a51c9c8b2a9427f151e7b6703cebd55a0 /src/process | |
| parent | 12fb13374a022cee7076d985c3f2fe575a0af5b6 (diff) | |
| download | mullvadvpn-aab55711d06b202dbe48d0d638209c09636cf81a.tar.xz mullvadvpn-aab55711d06b202dbe48d0d638209c09636cf81a.zip | |
Make ChildMonitor synchronous
Diffstat (limited to 'src/process')
| -rw-r--r-- | src/process/monitor.rs | 325 |
1 files changed, 71 insertions, 254 deletions
diff --git a/src/process/monitor.rs b/src/process/monitor.rs index b2080b93de..067f3d6641 100644 --- a/src/process/monitor.rs +++ b/src/process/monitor.rs @@ -2,38 +2,9 @@ use std::error::Error; use std::fmt; use std::io; use std::process::{ChildStdout, ChildStderr}; -use std::sync::mpsc::{self, Receiver, Sender}; +use std::sync::{Arc, Mutex}; use std::thread; -/// Trait for listeners of events coming from `ChildMonitor`. All methods come with default -/// implementations that does nothing. So it is possible to implement this trait and only implement -/// the events one care about. -pub trait MonitorEventListener: Send + 'static { - /// Event called when an `ChildMonitor::start` has been processed. Given `result` indicate if - /// the start succeeded or failed. - fn started(&mut self, result: TransitionResult<(Option<ChildStdout>, Option<ChildStderr>)>) { - drop(result); - } - - /// Event called when an `ChildMonitor::stop` has been processed. Given `result` indicate if - /// the monitor successfully went from running to stopping. Note that this does not mean the - /// subprocess is now dead, only that the monitor has initiated stopping it. See `child_exited` - /// for actual process exit. - fn stopping(&mut self, result: TransitionResult<()>) { - drop(result); - } - - /// Event called when the process monitored by the `ChildMonitor` has exited. `clean` - /// indicate if the process exited with a zero exit code or not. - fn child_exited(&mut self, clean: bool) { - drop(clean); - } -} - -// The default listener, not doing anything on any event. -struct NullListener; -impl MonitorEventListener for NullListener {} - /// Trait for objects that represent child processes that `ChildMonitor` can monitor pub trait MonitoredChild: Clone + Send + 'static { @@ -99,194 +70,95 @@ impl Error for TransitionError { } } -enum Event { - Start(TransitionResult<(Option<ChildStdout>, Option<ChildStderr>)>), - Stop(TransitionResult<()>), - ChildExited(bool), -} enum State<C: MonitoredChild> { Stopped, Running(RunningState<C>), - Stopping(StoppingState<C>), } struct RunningState<C: MonitoredChild> { child: C, + thread_handle: Option<thread::JoinHandle<()>>, } -struct StoppingState<C: MonitoredChild> { - _child: C, -} - -// Messages sent internally between the `ChildMonitor`, `StateMachine` and the thread that monitors -// the child process. -enum MonitorMsg { - AddListener(Box<MonitorEventListener>), - Start, - Stop, - ChildExited(bool), - Shutdown, -} - -fn spawn_state_machine<C, B>(builder: B) -> Sender<MonitorMsg> - where C: MonitoredChild, - B: ChildSpawner<C> -{ - let state_machine = StateMachine::new(builder); - let tx = state_machine.get_handle(); - thread::spawn(move || { state_machine.run(); }); - tx -} - -struct StateMachine<C: MonitoredChild, B: ChildSpawner<C>> { +/// A child process monitor. Takes care of starting and monitoring a child process and runs the +/// listener on child exit. +pub struct ChildMonitor<C: MonitoredChild, B: ChildSpawner<C>> { process_builder: B, - tx: Sender<MonitorMsg>, - rx: Receiver<MonitorMsg>, - listener: Box<MonitorEventListener>, - state: State<C>, + state: Arc<Mutex<State<C>>>, } -impl<C: MonitoredChild, B: ChildSpawner<C>> StateMachine<C, B> { - pub fn new(process_builder: B) -> Self { - let (tx, rx) = mpsc::channel(); - let state_machine = StateMachine { - process_builder: process_builder, - tx: tx, - rx: rx, - listener: Box::new(NullListener), - state: State::Stopped, - }; - state_machine - } - - pub fn get_handle(&self) -> Sender<MonitorMsg> { - self.tx.clone() - } - - pub fn run(mut self) { - while let Ok(msg) = self.rx.recv() { - let event = match msg { - MonitorMsg::AddListener(listener) => { - self.set_listener(listener); - None - } - MonitorMsg::Start => Some(Event::Start(self.start())), - MonitorMsg::Stop => Some(Event::Stop(self.stop())), - MonitorMsg::ChildExited(success) => { - self.state = State::Stopped; - Some(Event::ChildExited(success)) - } - MonitorMsg::Shutdown => break, - }; - if let Some(event) = event { - self.notify_listener(event); - } +impl<C: MonitoredChild, B: ChildSpawner<C>> ChildMonitor<C, B> { + /// Creates a new `ChildMonitor` that spawn processes with the given `builder`. The new + /// `ChildMonitor` will be in the stopped state and not start any process until you call + /// `start()`. + pub fn new(builder: B) -> Self { + ChildMonitor { + process_builder: builder, + state: Arc::new(Mutex::new(State::Stopped)), } } - fn set_listener(&mut self, listener: Box<MonitorEventListener>) { - self.listener = listener; - } - - fn start(&mut self) -> TransitionResult<(Option<ChildStdout>, Option<ChildStderr>)> { - if let State::Stopped = self.state { + /// Starts the child process and begins to monitor it. `listener` will be called as soon as the + /// child process exits. + pub fn start<L>(&mut self, + listener: L) + -> TransitionResult<(Option<ChildStdout>, Option<ChildStderr>)> + where L: FnMut(bool) + Send + 'static + { + let mut state_lock = self.state.lock().unwrap(); + if let State::Stopped = *state_lock { let mut child = self.process_builder.spawn()?; let io = (child.stdout(), child.stderr()); - self.state = State::Running(RunningState { child: child.clone() }); - self.start_monitor_thread(child); + let thread_handle = self.spawn_monitor(child.clone(), listener); + *state_lock = State::Running(RunningState { + child: child, + thread_handle: Some(thread_handle), + }); Ok(io) } else { Err(TransitionError::InvalidState) } } - fn start_monitor_thread(&self, child: C) { - let tx = self.tx.clone(); + fn spawn_monitor<L>(&self, child: C, mut listener: L) -> thread::JoinHandle<()> + where L: FnMut(bool) + Send + 'static + { + let state_mutex = self.state.clone(); thread::spawn(move || { let success = child.wait().unwrap_or(false); - drop(tx.send(MonitorMsg::ChildExited(success))); - }); + let mut state_lock = state_mutex.lock().unwrap(); + *state_lock = State::Stopped; + listener(success); + }) } - fn stop(&mut self) -> TransitionResult<()> { - let result = if let State::Running(ref mut running_state) = self.state { - running_state.child - .kill() - .map(|_| running_state.child.clone()) - .map_err(|e| TransitionError::IoError(e)) + /// Sends a kill signal to the child process. + pub fn stop(&self) -> TransitionResult<()> { + let state_lock = self.state.lock().unwrap(); + if let State::Running(ref running_state) = *state_lock { + running_state.child.kill()?; + Ok(()) } else { Err(TransitionError::InvalidState) - }; - match result { - Ok(child) => { - self.state = State::Stopping(StoppingState { _child: child }); - Ok(()) - } - Err(e) => Err(e), - } - } - - fn notify_listener(&mut self, event: Event) { - match event { - Event::Start(result) => { - self.listener.started(result); - } - Event::Stop(result) => { - self.listener.stopping(result); - } - Event::ChildExited(clean) => { - self.listener.child_exited(clean); - } } } } -impl<C: MonitoredChild, B: ChildSpawner<C>> Drop for StateMachine<C, B> { +impl<C: MonitoredChild, B: ChildSpawner<C>> Drop for ChildMonitor<C, B> { fn drop(&mut self) { - drop(self.stop()) - } -} - -/// A child process monitor. Takes care of starting and monitoring a child process and sends -/// out events about it to a registered listener. -pub struct ChildMonitor { - state_machine: Sender<MonitorMsg>, -} - -impl ChildMonitor { - /// Creates a new `ChildMonitor` that spawn processes with the given `builder`. The new - /// `ChildMonitor` will be in the stopped state and not start any process until you call - /// `start()` - pub fn new<C: MonitoredChild, B: ChildSpawner<C>>(builder: B) -> Self { - ChildMonitor { state_machine: spawn_state_machine(builder) } - } - - /// Set the event listener to `listener`. Note that events might not show up on the new - /// listener immediately after this call since the listener change message must be processed by - /// the backend first. - pub fn set_listener<L>(&self, listener: L) - where L: MonitorEventListener - { - self.state_machine.send(MonitorMsg::AddListener(Box::new(listener))).unwrap(); - } - - /// Start the process to monitor. This will trigger an `MonitorEventListener::started` event. - pub fn start(&self) { - self.state_machine.send(MonitorMsg::Start).unwrap(); - } - - /// Stop the monitored process. This will trigger an `MonitorEventListener::stopped` event. - pub fn stop(&self) { - self.state_machine.send(MonitorMsg::Stop).unwrap(); - } -} - -impl Drop for ChildMonitor { - fn drop(&mut self) { - self.state_machine - .send(MonitorMsg::Shutdown) - .expect("Internal error, not able to send Shutdown"); + let thread_handle = { + let mut state_lock = self.state.lock().unwrap(); + if let State::Running(ref mut state) = *state_lock { + let _ = state.child.kill(); + state.thread_handle.take() + } else { + None + } + }; + if let Some(thread_handle) = thread_handle { + let _ = thread_handle.join(); + } } } @@ -297,7 +169,7 @@ mod child_monitor { use std::io; use std::process::{ChildStdout, ChildStderr}; use std::sync::{Arc, Mutex}; - use std::sync::mpsc::{self, Sender, Receiver}; + use std::sync::mpsc; use std::thread; use std::time::Duration; @@ -363,41 +235,6 @@ mod child_monitor { } } - #[derive(Debug)] - enum MockEvent { - Start(TransitionResult<(bool, bool)>), - Stop(TransitionResult<()>), - ChildExited(bool), - } - - struct MockListener { - tx: Sender<MockEvent>, - } - - impl MockListener { - pub fn new() -> (Self, Receiver<MockEvent>) { - let (tx, rx) = mpsc::channel(); - let mock_listener = MockListener { tx: tx }; - (mock_listener, rx) - } - } - - impl MonitorEventListener for MockListener { - fn started(&mut self, - result: TransitionResult<(Option<ChildStdout>, Option<ChildStderr>)>) { - let result = result.map(|(a, b)| (a.is_some(), b.is_some())); - drop(self.tx.send(MockEvent::Start(result))); - } - - fn stopping(&mut self, result: TransitionResult<()>) { - drop(self.tx.send(MockEvent::Stop(result))); - } - - fn child_exited(&mut self, clean: bool) { - drop(self.tx.send(MockEvent::ChildExited(clean))); - } - } - /// Tries to recv a message from the given `$rx` for one second and tries to match it with the /// given expected value, `$expected` macro_rules! assert_event { @@ -413,55 +250,35 @@ mod child_monitor { #[test] fn normal_start() { let builder = MockChildSpawner::new(Some(MockChild::instant_exit())); - let (listener, rx) = MockListener::new(); - let testee = ChildMonitor::new(builder); - - testee.set_listener(listener); - testee.start(); + let mut testee = ChildMonitor::new(builder); - assert_event!(rx, Ok(MockEvent::Start(Ok(_)))); - assert_event!(rx, Ok(MockEvent::ChildExited(true))); + let (tx, rx) = mpsc::channel(); + assert!(testee.start(move |success| tx.send(success).unwrap()).is_ok()); + assert_event!(rx, Ok(true)); } #[test] fn start_failed() { let builder = MockChildSpawner::new(None); - let (listener, rx) = MockListener::new(); - let testee = ChildMonitor::new(builder); - - testee.set_listener(listener); - testee.start(); - - assert_event!(rx, Ok(MockEvent::Start(Err(TransitionError::IoError(_))))); - } - - #[test] - fn notifies_latest_multiple_listeners() { - let builder = MockChildSpawner::new(Some(MockChild::instant_exit())); - let (listener, rx) = MockListener::new(); - let (listener2, rx2) = MockListener::new(); - let testee = ChildMonitor::new(builder); - - testee.set_listener(listener); - testee.set_listener(listener2); - testee.start(); + let mut testee = ChildMonitor::new(builder); - assert_event!(rx2, Ok(MockEvent::Start(Ok(_)))); + let (tx, rx) = mpsc::channel(); + assert!(testee.start(move |success| tx.send(success).unwrap()).is_err()); + // Make sure that the listener is not kept anywhere. Failing to start should drop the + // listener assert_event!(rx, Err(mpsc::RecvTimeoutError::Disconnected)); } #[test] fn normal_stop() { let builder = MockChildSpawner::new(Some(MockChild::alive_until_kill())); - let (listener, rx) = MockListener::new(); - let testee = ChildMonitor::new(builder); + let mut testee = ChildMonitor::new(builder); - testee.set_listener(listener); - testee.start(); - testee.stop(); + let (tx, rx) = mpsc::channel(); + assert!(testee.start(move |success| tx.send(success).unwrap()).is_ok()); + assert_event!(rx, Err(mpsc::RecvTimeoutError::Timeout)); - drop(rx.recv_timeout(Duration::new(1, 0))); - assert_event!(rx, Ok(MockEvent::Stop(Ok(())))); - assert_event!(rx, Ok(MockEvent::ChildExited(true))); + assert!(testee.stop().is_ok()); + assert_event!(rx, Ok(true)); } } |
