summaryrefslogtreecommitdiffhomepage
path: root/src/process
diff options
context:
space:
mode:
authorLinus Färnstrand <linus@mullvad.net>2017-01-23 16:17:02 +0100
committerLinus Färnstrand <linus@mullvad.net>2017-01-25 10:22:06 +0100
commitaab55711d06b202dbe48d0d638209c09636cf81a (patch)
tree7a90036a51c9c8b2a9427f151e7b6703cebd55a0 /src/process
parent12fb13374a022cee7076d985c3f2fe575a0af5b6 (diff)
downloadmullvadvpn-aab55711d06b202dbe48d0d638209c09636cf81a.tar.xz
mullvadvpn-aab55711d06b202dbe48d0d638209c09636cf81a.zip
Make ChildMonitor synchronous
Diffstat (limited to 'src/process')
-rw-r--r--src/process/monitor.rs325
1 files changed, 71 insertions, 254 deletions
diff --git a/src/process/monitor.rs b/src/process/monitor.rs
index b2080b93de..067f3d6641 100644
--- a/src/process/monitor.rs
+++ b/src/process/monitor.rs
@@ -2,38 +2,9 @@ use std::error::Error;
use std::fmt;
use std::io;
use std::process::{ChildStdout, ChildStderr};
-use std::sync::mpsc::{self, Receiver, Sender};
+use std::sync::{Arc, Mutex};
use std::thread;
-/// Trait for listeners of events coming from `ChildMonitor`. All methods come with default
-/// implementations that does nothing. So it is possible to implement this trait and only implement
-/// the events one care about.
-pub trait MonitorEventListener: Send + 'static {
- /// Event called when an `ChildMonitor::start` has been processed. Given `result` indicate if
- /// the start succeeded or failed.
- fn started(&mut self, result: TransitionResult<(Option<ChildStdout>, Option<ChildStderr>)>) {
- drop(result);
- }
-
- /// Event called when an `ChildMonitor::stop` has been processed. Given `result` indicate if
- /// the monitor successfully went from running to stopping. Note that this does not mean the
- /// subprocess is now dead, only that the monitor has initiated stopping it. See `child_exited`
- /// for actual process exit.
- fn stopping(&mut self, result: TransitionResult<()>) {
- drop(result);
- }
-
- /// Event called when the process monitored by the `ChildMonitor` has exited. `clean`
- /// indicate if the process exited with a zero exit code or not.
- fn child_exited(&mut self, clean: bool) {
- drop(clean);
- }
-}
-
-// The default listener, not doing anything on any event.
-struct NullListener;
-impl MonitorEventListener for NullListener {}
-
/// Trait for objects that represent child processes that `ChildMonitor` can monitor
pub trait MonitoredChild: Clone + Send + 'static {
@@ -99,194 +70,95 @@ impl Error for TransitionError {
}
}
-enum Event {
- Start(TransitionResult<(Option<ChildStdout>, Option<ChildStderr>)>),
- Stop(TransitionResult<()>),
- ChildExited(bool),
-}
enum State<C: MonitoredChild> {
Stopped,
Running(RunningState<C>),
- Stopping(StoppingState<C>),
}
struct RunningState<C: MonitoredChild> {
child: C,
+ thread_handle: Option<thread::JoinHandle<()>>,
}
-struct StoppingState<C: MonitoredChild> {
- _child: C,
-}
-
-// Messages sent internally between the `ChildMonitor`, `StateMachine` and the thread that monitors
-// the child process.
-enum MonitorMsg {
- AddListener(Box<MonitorEventListener>),
- Start,
- Stop,
- ChildExited(bool),
- Shutdown,
-}
-
-fn spawn_state_machine<C, B>(builder: B) -> Sender<MonitorMsg>
- where C: MonitoredChild,
- B: ChildSpawner<C>
-{
- let state_machine = StateMachine::new(builder);
- let tx = state_machine.get_handle();
- thread::spawn(move || { state_machine.run(); });
- tx
-}
-
-struct StateMachine<C: MonitoredChild, B: ChildSpawner<C>> {
+/// A child process monitor. Takes care of starting and monitoring a child process and runs the
+/// listener on child exit.
+pub struct ChildMonitor<C: MonitoredChild, B: ChildSpawner<C>> {
process_builder: B,
- tx: Sender<MonitorMsg>,
- rx: Receiver<MonitorMsg>,
- listener: Box<MonitorEventListener>,
- state: State<C>,
+ state: Arc<Mutex<State<C>>>,
}
-impl<C: MonitoredChild, B: ChildSpawner<C>> StateMachine<C, B> {
- pub fn new(process_builder: B) -> Self {
- let (tx, rx) = mpsc::channel();
- let state_machine = StateMachine {
- process_builder: process_builder,
- tx: tx,
- rx: rx,
- listener: Box::new(NullListener),
- state: State::Stopped,
- };
- state_machine
- }
-
- pub fn get_handle(&self) -> Sender<MonitorMsg> {
- self.tx.clone()
- }
-
- pub fn run(mut self) {
- while let Ok(msg) = self.rx.recv() {
- let event = match msg {
- MonitorMsg::AddListener(listener) => {
- self.set_listener(listener);
- None
- }
- MonitorMsg::Start => Some(Event::Start(self.start())),
- MonitorMsg::Stop => Some(Event::Stop(self.stop())),
- MonitorMsg::ChildExited(success) => {
- self.state = State::Stopped;
- Some(Event::ChildExited(success))
- }
- MonitorMsg::Shutdown => break,
- };
- if let Some(event) = event {
- self.notify_listener(event);
- }
+impl<C: MonitoredChild, B: ChildSpawner<C>> ChildMonitor<C, B> {
+ /// Creates a new `ChildMonitor` that spawn processes with the given `builder`. The new
+ /// `ChildMonitor` will be in the stopped state and not start any process until you call
+ /// `start()`.
+ pub fn new(builder: B) -> Self {
+ ChildMonitor {
+ process_builder: builder,
+ state: Arc::new(Mutex::new(State::Stopped)),
}
}
- fn set_listener(&mut self, listener: Box<MonitorEventListener>) {
- self.listener = listener;
- }
-
- fn start(&mut self) -> TransitionResult<(Option<ChildStdout>, Option<ChildStderr>)> {
- if let State::Stopped = self.state {
+ /// Starts the child process and begins to monitor it. `listener` will be called as soon as the
+ /// child process exits.
+ pub fn start<L>(&mut self,
+ listener: L)
+ -> TransitionResult<(Option<ChildStdout>, Option<ChildStderr>)>
+ where L: FnMut(bool) + Send + 'static
+ {
+ let mut state_lock = self.state.lock().unwrap();
+ if let State::Stopped = *state_lock {
let mut child = self.process_builder.spawn()?;
let io = (child.stdout(), child.stderr());
- self.state = State::Running(RunningState { child: child.clone() });
- self.start_monitor_thread(child);
+ let thread_handle = self.spawn_monitor(child.clone(), listener);
+ *state_lock = State::Running(RunningState {
+ child: child,
+ thread_handle: Some(thread_handle),
+ });
Ok(io)
} else {
Err(TransitionError::InvalidState)
}
}
- fn start_monitor_thread(&self, child: C) {
- let tx = self.tx.clone();
+ fn spawn_monitor<L>(&self, child: C, mut listener: L) -> thread::JoinHandle<()>
+ where L: FnMut(bool) + Send + 'static
+ {
+ let state_mutex = self.state.clone();
thread::spawn(move || {
let success = child.wait().unwrap_or(false);
- drop(tx.send(MonitorMsg::ChildExited(success)));
- });
+ let mut state_lock = state_mutex.lock().unwrap();
+ *state_lock = State::Stopped;
+ listener(success);
+ })
}
- fn stop(&mut self) -> TransitionResult<()> {
- let result = if let State::Running(ref mut running_state) = self.state {
- running_state.child
- .kill()
- .map(|_| running_state.child.clone())
- .map_err(|e| TransitionError::IoError(e))
+ /// Sends a kill signal to the child process.
+ pub fn stop(&self) -> TransitionResult<()> {
+ let state_lock = self.state.lock().unwrap();
+ if let State::Running(ref running_state) = *state_lock {
+ running_state.child.kill()?;
+ Ok(())
} else {
Err(TransitionError::InvalidState)
- };
- match result {
- Ok(child) => {
- self.state = State::Stopping(StoppingState { _child: child });
- Ok(())
- }
- Err(e) => Err(e),
- }
- }
-
- fn notify_listener(&mut self, event: Event) {
- match event {
- Event::Start(result) => {
- self.listener.started(result);
- }
- Event::Stop(result) => {
- self.listener.stopping(result);
- }
- Event::ChildExited(clean) => {
- self.listener.child_exited(clean);
- }
}
}
}
-impl<C: MonitoredChild, B: ChildSpawner<C>> Drop for StateMachine<C, B> {
+impl<C: MonitoredChild, B: ChildSpawner<C>> Drop for ChildMonitor<C, B> {
fn drop(&mut self) {
- drop(self.stop())
- }
-}
-
-/// A child process monitor. Takes care of starting and monitoring a child process and sends
-/// out events about it to a registered listener.
-pub struct ChildMonitor {
- state_machine: Sender<MonitorMsg>,
-}
-
-impl ChildMonitor {
- /// Creates a new `ChildMonitor` that spawn processes with the given `builder`. The new
- /// `ChildMonitor` will be in the stopped state and not start any process until you call
- /// `start()`
- pub fn new<C: MonitoredChild, B: ChildSpawner<C>>(builder: B) -> Self {
- ChildMonitor { state_machine: spawn_state_machine(builder) }
- }
-
- /// Set the event listener to `listener`. Note that events might not show up on the new
- /// listener immediately after this call since the listener change message must be processed by
- /// the backend first.
- pub fn set_listener<L>(&self, listener: L)
- where L: MonitorEventListener
- {
- self.state_machine.send(MonitorMsg::AddListener(Box::new(listener))).unwrap();
- }
-
- /// Start the process to monitor. This will trigger an `MonitorEventListener::started` event.
- pub fn start(&self) {
- self.state_machine.send(MonitorMsg::Start).unwrap();
- }
-
- /// Stop the monitored process. This will trigger an `MonitorEventListener::stopped` event.
- pub fn stop(&self) {
- self.state_machine.send(MonitorMsg::Stop).unwrap();
- }
-}
-
-impl Drop for ChildMonitor {
- fn drop(&mut self) {
- self.state_machine
- .send(MonitorMsg::Shutdown)
- .expect("Internal error, not able to send Shutdown");
+ let thread_handle = {
+ let mut state_lock = self.state.lock().unwrap();
+ if let State::Running(ref mut state) = *state_lock {
+ let _ = state.child.kill();
+ state.thread_handle.take()
+ } else {
+ None
+ }
+ };
+ if let Some(thread_handle) = thread_handle {
+ let _ = thread_handle.join();
+ }
}
}
@@ -297,7 +169,7 @@ mod child_monitor {
use std::io;
use std::process::{ChildStdout, ChildStderr};
use std::sync::{Arc, Mutex};
- use std::sync::mpsc::{self, Sender, Receiver};
+ use std::sync::mpsc;
use std::thread;
use std::time::Duration;
@@ -363,41 +235,6 @@ mod child_monitor {
}
}
- #[derive(Debug)]
- enum MockEvent {
- Start(TransitionResult<(bool, bool)>),
- Stop(TransitionResult<()>),
- ChildExited(bool),
- }
-
- struct MockListener {
- tx: Sender<MockEvent>,
- }
-
- impl MockListener {
- pub fn new() -> (Self, Receiver<MockEvent>) {
- let (tx, rx) = mpsc::channel();
- let mock_listener = MockListener { tx: tx };
- (mock_listener, rx)
- }
- }
-
- impl MonitorEventListener for MockListener {
- fn started(&mut self,
- result: TransitionResult<(Option<ChildStdout>, Option<ChildStderr>)>) {
- let result = result.map(|(a, b)| (a.is_some(), b.is_some()));
- drop(self.tx.send(MockEvent::Start(result)));
- }
-
- fn stopping(&mut self, result: TransitionResult<()>) {
- drop(self.tx.send(MockEvent::Stop(result)));
- }
-
- fn child_exited(&mut self, clean: bool) {
- drop(self.tx.send(MockEvent::ChildExited(clean)));
- }
- }
-
/// Tries to recv a message from the given `$rx` for one second and tries to match it with the
/// given expected value, `$expected`
macro_rules! assert_event {
@@ -413,55 +250,35 @@ mod child_monitor {
#[test]
fn normal_start() {
let builder = MockChildSpawner::new(Some(MockChild::instant_exit()));
- let (listener, rx) = MockListener::new();
- let testee = ChildMonitor::new(builder);
-
- testee.set_listener(listener);
- testee.start();
+ let mut testee = ChildMonitor::new(builder);
- assert_event!(rx, Ok(MockEvent::Start(Ok(_))));
- assert_event!(rx, Ok(MockEvent::ChildExited(true)));
+ let (tx, rx) = mpsc::channel();
+ assert!(testee.start(move |success| tx.send(success).unwrap()).is_ok());
+ assert_event!(rx, Ok(true));
}
#[test]
fn start_failed() {
let builder = MockChildSpawner::new(None);
- let (listener, rx) = MockListener::new();
- let testee = ChildMonitor::new(builder);
-
- testee.set_listener(listener);
- testee.start();
-
- assert_event!(rx, Ok(MockEvent::Start(Err(TransitionError::IoError(_)))));
- }
-
- #[test]
- fn notifies_latest_multiple_listeners() {
- let builder = MockChildSpawner::new(Some(MockChild::instant_exit()));
- let (listener, rx) = MockListener::new();
- let (listener2, rx2) = MockListener::new();
- let testee = ChildMonitor::new(builder);
-
- testee.set_listener(listener);
- testee.set_listener(listener2);
- testee.start();
+ let mut testee = ChildMonitor::new(builder);
- assert_event!(rx2, Ok(MockEvent::Start(Ok(_))));
+ let (tx, rx) = mpsc::channel();
+ assert!(testee.start(move |success| tx.send(success).unwrap()).is_err());
+ // Make sure that the listener is not kept anywhere. Failing to start should drop the
+ // listener
assert_event!(rx, Err(mpsc::RecvTimeoutError::Disconnected));
}
#[test]
fn normal_stop() {
let builder = MockChildSpawner::new(Some(MockChild::alive_until_kill()));
- let (listener, rx) = MockListener::new();
- let testee = ChildMonitor::new(builder);
+ let mut testee = ChildMonitor::new(builder);
- testee.set_listener(listener);
- testee.start();
- testee.stop();
+ let (tx, rx) = mpsc::channel();
+ assert!(testee.start(move |success| tx.send(success).unwrap()).is_ok());
+ assert_event!(rx, Err(mpsc::RecvTimeoutError::Timeout));
- drop(rx.recv_timeout(Duration::new(1, 0)));
- assert_event!(rx, Ok(MockEvent::Stop(Ok(()))));
- assert_event!(rx, Ok(MockEvent::ChildExited(true)));
+ assert!(testee.stop().is_ok());
+ assert_event!(rx, Ok(true));
}
}