summaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorLinus Färnstrand <linus@mullvad.net>2017-01-10 12:58:57 +0100
committerLinus Färnstrand <linus@mullvad.net>2017-01-10 12:58:57 +0100
commitdf4d821fb48fb547c3448246adab9b4b3b37f5f8 (patch)
tree2d68104b5b89c869611b2a9546fdd15418fc1683 /src
parent9c85c6055faa12eb88a005b68fc7fd1d82ac03d2 (diff)
parent399f1380fb825b077dd5c6ce1e1db7b94a55a92e (diff)
downloadmullvadvpn-df4d821fb48fb547c3448246adab9b4b3b37f5f8.tar.xz
mullvadvpn-df4d821fb48fb547c3448246adab9b4b3b37f5f8.zip
Merge branch 'openvpn-monitor'
Diffstat (limited to 'src')
-rw-r--r--src/lib.rs8
-rw-r--r--src/process/mod.rs (renamed from src/process.rs)35
-rw-r--r--src/process/monitor.rs469
3 files changed, 505 insertions, 7 deletions
diff --git a/src/lib.rs b/src/lib.rs
index c65c221aec..1b3cf59b66 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -2,14 +2,10 @@
//! The core components of the talpidaemon VPN client.
+extern crate clonablechild;
+
/// Working with processes.
pub mod process;
/// Network primitives.
pub mod net;
-
-#[cfg(test)]
-mod tests {
- #[test]
- fn it_works() {}
-}
diff --git a/src/process.rs b/src/process/mod.rs
index 08b2637faf..aa298ce465 100644
--- a/src/process.rs
+++ b/src/process/mod.rs
@@ -4,7 +4,14 @@ use std::ffi::{OsString, OsStr};
use std::fmt;
use std::io;
use std::path::{Path, PathBuf};
-use std::process::{Command, Child, Stdio};
+use std::process::{Command, Child, Stdio, ChildStdout, ChildStderr};
+
+/// A module for monitoring child processes and get notified of events on them.
+pub mod monitor;
+
+use clonablechild::{ClonableChild, ChildExt};
+
+use self::monitor::{MonitoredChild, ChildSpawner};
/// An OpenVPN process builder, providing control over the different arguments that the OpenVPN
/// binary accepts.
@@ -95,6 +102,32 @@ fn write_argument(fmt: &mut fmt::Formatter, arg: &str) -> fmt::Result {
Ok(())
}
+
+impl MonitoredChild for ClonableChild {
+ fn wait(&self) -> io::Result<bool> {
+ ClonableChild::wait(self).map(|exit_status| exit_status.success())
+ }
+
+ fn kill(&self) -> io::Result<()> {
+ ClonableChild::kill(self)
+ }
+
+ fn stdout(&mut self) -> Option<ChildStdout> {
+ self.stdout()
+ }
+
+ fn stderr(&mut self) -> Option<ChildStderr> {
+ self.stderr()
+ }
+}
+
+impl ChildSpawner<ClonableChild> for OpenVpnBuilder {
+ fn spawn(&mut self) -> io::Result<ClonableChild> {
+ OpenVpnBuilder::spawn(self).map(|child| child.into_clonable())
+ }
+}
+
+
#[cfg(test)]
mod tests {
use net::RemoteAddr;
diff --git a/src/process/monitor.rs b/src/process/monitor.rs
new file mode 100644
index 0000000000..2b88271ca9
--- /dev/null
+++ b/src/process/monitor.rs
@@ -0,0 +1,469 @@
+use std::error::Error;
+use std::fmt;
+use std::io;
+use std::process::{ChildStdout, ChildStderr};
+use std::sync::mpsc::{self, Receiver, Sender};
+use std::thread;
+
+/// Trait for listeners of events coming from `ChildMonitor`. All methods come with default
+/// implementations that does nothing. So it is possible to implement this trait and only implement
+/// the events one care about.
+pub trait MonitorEventListener: Send + 'static {
+ /// Event called when an `ChildMonitor::start` has been processed. Given `result` indicate if
+ /// the start succeeded or failed.
+ fn started(&mut self, result: TransitionResult<(Option<ChildStdout>, Option<ChildStderr>)>) {
+ drop(result);
+ }
+
+ /// Event called when an `ChildMonitor::stop` has been processed. Given `result` indicate if
+ /// the monitor successfully went from running to stopping. Note that this does not mean the
+ /// subprocess is now dead, only that the monitor has initiated stopping it. See `child_exited`
+ /// for actual process exit.
+ fn stopping(&mut self, result: TransitionResult<()>) {
+ drop(result);
+ }
+
+ /// Event called when the process monitored by the `ChildMonitor` has exited. `clean`
+ /// indicate if the process exited with a zero exit code or not.
+ fn child_exited(&mut self, clean: bool) {
+ drop(clean);
+ }
+}
+
+// The default listener, not doing anything on any event.
+struct NullListener;
+impl MonitorEventListener for NullListener {}
+
+
+/// Trait for objects that represent child processes that `ChildMonitor` can monitor
+pub trait MonitoredChild: Clone + Send + 'static {
+ /// Waits for the child to exit completely, returning if the child exited cleanly or not.
+ fn wait(&self) -> io::Result<bool>;
+
+ /// Forces the child to exit.
+ fn kill(&self) -> io::Result<()>;
+
+ /// Retreives the stdout stream for the child.
+ fn stdout(&mut self) -> Option<ChildStdout>;
+
+ /// Retreives the stderr stream for the child.
+ fn stderr(&mut self) -> Option<ChildStderr>;
+}
+
+/// Trait for objects that can spawn any type of child process object implementing `MonitoredChild`.
+pub trait ChildSpawner<C: MonitoredChild>: Send + 'static {
+ /// Spawns the child process, returning a handle to it on success.
+ fn spawn(&mut self) -> io::Result<C>;
+}
+
+
+/// Type alias for results of transitions in the `ChildMonitor` state machine.
+pub type TransitionResult<T> = Result<T, TransitionError>;
+
+/// Error type for transitions in the `ChildMonitor` state machine.
+#[derive(Debug)]
+pub enum TransitionError {
+ /// The transition could not be made because the state machine was not in a state that could
+ /// transition to the desired state.
+ InvalidState,
+
+ /// The transition failed because of an `io::Error`.
+ IoError(io::Error),
+}
+
+impl From<io::Error> for TransitionError {
+ fn from(error: io::Error) -> Self {
+ TransitionError::IoError(error)
+ }
+}
+
+impl fmt::Display for TransitionError {
+ fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+ fmt.write_str(self.description())
+ }
+}
+
+impl Error for TransitionError {
+ fn description(&self) -> &str {
+ match *self {
+ TransitionError::InvalidState => "Invalid state for desired transition",
+ TransitionError::IoError(..) => "Transition failed due to IO error",
+ }
+ }
+
+ fn cause(&self) -> Option<&Error> {
+ match *self {
+ TransitionError::IoError(ref e) => Some(e),
+ _ => None,
+ }
+ }
+}
+
+enum Event {
+ Start(TransitionResult<(Option<ChildStdout>, Option<ChildStderr>)>),
+ Stop(TransitionResult<()>),
+ ChildExited(bool),
+}
+
+enum State<C: MonitoredChild> {
+ Stopped,
+ Running(RunningState<C>),
+ Stopping(StoppingState<C>),
+}
+
+struct RunningState<C: MonitoredChild> {
+ child: C,
+}
+
+struct StoppingState<C: MonitoredChild> {
+ _child: C,
+}
+
+// Messages sent internally between the `ChildMonitor`, `StateMachine` and the thread that monitors
+// the child process.
+enum MonitorMsg {
+ AddListener(Box<MonitorEventListener>),
+ Start,
+ Stop,
+ ChildExited(bool),
+ Shutdown,
+}
+
+fn spawn_state_machine<C, B>(builder: B) -> Sender<MonitorMsg>
+ where C: MonitoredChild,
+ B: ChildSpawner<C>
+{
+ let state_machine = StateMachine::new(builder);
+ let tx = state_machine.get_handle();
+ thread::spawn(move || {
+ state_machine.run();
+ });
+ tx
+}
+
+struct StateMachine<C: MonitoredChild, B: ChildSpawner<C>> {
+ process_builder: B,
+ tx: Sender<MonitorMsg>,
+ rx: Receiver<MonitorMsg>,
+ listener: Box<MonitorEventListener>,
+ state: State<C>,
+}
+
+impl<C: MonitoredChild, B: ChildSpawner<C>> StateMachine<C, B> {
+ pub fn new(process_builder: B) -> Self {
+ let (tx, rx) = mpsc::channel();
+ let state_machine = StateMachine {
+ process_builder: process_builder,
+ tx: tx,
+ rx: rx,
+ listener: Box::new(NullListener),
+ state: State::Stopped,
+ };
+ state_machine
+ }
+
+ pub fn get_handle(&self) -> Sender<MonitorMsg> {
+ self.tx.clone()
+ }
+
+ pub fn run(mut self) {
+ while let Ok(msg) = self.rx.recv() {
+ let event = match msg {
+ MonitorMsg::AddListener(listener) => {
+ self.set_listener(listener);
+ None
+ }
+ MonitorMsg::Start => Some(Event::Start(self.start())),
+ MonitorMsg::Stop => Some(Event::Stop(self.stop())),
+ MonitorMsg::ChildExited(success) => {
+ self.state = State::Stopped;
+ Some(Event::ChildExited(success))
+ }
+ MonitorMsg::Shutdown => break,
+ };
+ if let Some(event) = event {
+ self.notify_listener(event);
+ }
+ }
+ }
+
+ fn set_listener(&mut self, listener: Box<MonitorEventListener>) {
+ self.listener = listener;
+ }
+
+ fn start(&mut self) -> TransitionResult<(Option<ChildStdout>, Option<ChildStderr>)> {
+ if let State::Stopped = self.state {
+ let mut child = self.process_builder.spawn()?;
+ let io = (child.stdout(), child.stderr());
+ self.state = State::Running(RunningState { child: child.clone() });
+ self.start_monitor_thread(child);
+ Ok(io)
+ } else {
+ Err(TransitionError::InvalidState)
+ }
+ }
+
+ fn start_monitor_thread(&self, child: C) {
+ let tx = self.tx.clone();
+ thread::spawn(move || {
+ let success = child.wait().unwrap_or(false);
+ drop(tx.send(MonitorMsg::ChildExited(success)));
+ });
+ }
+
+ fn stop(&mut self) -> TransitionResult<()> {
+ let result = if let State::Running(ref mut running_state) = self.state {
+ running_state.child
+ .kill()
+ .map(|_| running_state.child.clone())
+ .map_err(|e| TransitionError::IoError(e))
+ } else {
+ Err(TransitionError::InvalidState)
+ };
+ match result {
+ Ok(child) => {
+ self.state = State::Stopping(StoppingState { _child: child });
+ Ok(())
+ }
+ Err(e) => Err(e),
+ }
+ }
+
+ fn notify_listener(&mut self, event: Event) {
+ match event {
+ Event::Start(result) => {
+ self.listener.started(result);
+ }
+ Event::Stop(result) => {
+ self.listener.stopping(result);
+ }
+ Event::ChildExited(clean) => {
+ self.listener.child_exited(clean);
+ }
+ }
+ }
+}
+
+impl<C: MonitoredChild, B: ChildSpawner<C>> Drop for StateMachine<C, B> {
+ fn drop(&mut self) {
+ drop(self.stop())
+ }
+}
+
+/// A child process monitor. Takes care of starting and monitoring a child process and sends
+/// out events about it to a registered listener.
+pub struct ChildMonitor {
+ state_machine: Sender<MonitorMsg>,
+}
+
+impl ChildMonitor {
+ /// Creates a new `ChildMonitor` that spawn processes with the given `builder`. The new
+ /// `ChildMonitor` will be in the stopped state and not start any process until you call
+ /// `start()`
+ pub fn new<C: MonitoredChild, B: ChildSpawner<C>>(builder: B) -> Self {
+ ChildMonitor { state_machine: spawn_state_machine(builder) }
+ }
+
+ /// Set the event listener to `listener`. Note that events might not show up on the new
+ /// listener immediately after this call since the listener change message must be processed by
+ /// the backend first.
+ pub fn set_listener<L>(&self, listener: L)
+ where L: MonitorEventListener
+ {
+ self.state_machine.send(MonitorMsg::AddListener(Box::new(listener))).unwrap();
+ }
+
+ /// Start the process to monitor. This will trigger an `MonitorEventListener::started` event.
+ pub fn start(&self) {
+ self.state_machine.send(MonitorMsg::Start).unwrap();
+ }
+
+ /// Stop the monitored process. This will trigger an `MonitorEventListener::stopped` event.
+ pub fn stop(&self) {
+ self.state_machine.send(MonitorMsg::Stop).unwrap();
+ }
+}
+
+impl Drop for ChildMonitor {
+ fn drop(&mut self) {
+ self.state_machine
+ .send(MonitorMsg::Shutdown)
+ .expect("Internal error, not able to send Shutdown");
+ }
+}
+
+
+#[cfg(test)]
+mod child_monitor {
+ use std::io;
+ use std::process::{ChildStdout, ChildStderr};
+ use std::sync::{Arc, Mutex};
+ use std::sync::mpsc::{self, Sender, Receiver};
+ use std::thread;
+ use std::time::Duration;
+ use super::*;
+
+ #[derive(Clone)]
+ struct MockChild {
+ died: Arc<Mutex<bool>>,
+ }
+
+ impl MockChild {
+ pub fn instant_exit() -> Self {
+ Self::new(true)
+ }
+
+ pub fn alive_until_kill() -> Self {
+ Self::new(false)
+ }
+
+ fn new(died: bool) -> Self {
+ MockChild { died: Arc::new(Mutex::new(died)) }
+ }
+ }
+
+ impl MonitoredChild for MockChild {
+ fn wait(&self) -> io::Result<bool> {
+ loop {
+ if *self.died.lock().unwrap() {
+ break;
+ }
+ thread::sleep(Duration::new(0, 1_000_000));
+ }
+ Ok(true)
+ }
+
+ fn kill(&self) -> io::Result<()> {
+ *self.died.lock().unwrap() = true;
+ Ok(())
+ }
+
+ fn stdout(&mut self) -> Option<ChildStdout> {
+ None
+ }
+
+ fn stderr(&mut self) -> Option<ChildStderr> {
+ None
+ }
+ }
+
+ struct MockChildSpawner {
+ spawn_result: Option<MockChild>,
+ }
+
+ impl MockChildSpawner {
+ pub fn new(spawn_result: Option<MockChild>) -> Self {
+ MockChildSpawner { spawn_result: spawn_result }
+ }
+ }
+
+ impl ChildSpawner<MockChild> for MockChildSpawner {
+ fn spawn(&mut self) -> io::Result<MockChild> {
+ self.spawn_result
+ .clone()
+ .ok_or(io::Error::new(io::ErrorKind::Other, "Mocking a failed process spawn"))
+ }
+ }
+
+ #[derive(Debug)]
+ enum MockEvent {
+ Start(TransitionResult<(bool, bool)>),
+ Stop(TransitionResult<()>),
+ ChildExited(bool),
+ }
+
+ struct MockListener {
+ tx: Sender<MockEvent>,
+ }
+
+ impl MockListener {
+ pub fn new() -> (Self, Receiver<MockEvent>) {
+ let (tx, rx) = mpsc::channel();
+ let mock_listener = MockListener { tx: tx };
+ (mock_listener, rx)
+ }
+ }
+
+ impl MonitorEventListener for MockListener {
+ fn started(&mut self,
+ result: TransitionResult<(Option<ChildStdout>, Option<ChildStderr>)>) {
+ let result = result.map(|(a, b)| (a.is_some(), b.is_some()));
+ drop(self.tx.send(MockEvent::Start(result)));
+ }
+
+ fn stopping(&mut self, result: TransitionResult<()>) {
+ drop(self.tx.send(MockEvent::Stop(result)));
+ }
+
+ fn child_exited(&mut self, clean: bool) {
+ drop(self.tx.send(MockEvent::ChildExited(clean)));
+ }
+ }
+
+ /// Tries to recv a message from the given `$rx` for one second and tries to match it with the
+ /// given expected value, `$expected`
+ macro_rules! assert_event {
+ ($rx:ident, $expected:pat) => {{
+ let result = $rx.recv_timeout(Duration::new(1, 0));
+ if let $expected = result {} else {
+ let msg = stringify!($expected);
+ panic!("Expected {}. Got {:?}", msg, result);
+ }
+ }}
+ }
+
+ #[test]
+ fn normal_start() {
+ let builder = MockChildSpawner::new(Some(MockChild::instant_exit()));
+ let (listener, rx) = MockListener::new();
+ let testee = ChildMonitor::new(builder);
+
+ testee.set_listener(listener);
+ testee.start();
+
+ assert_event!(rx, Ok(MockEvent::Start(Ok(_))));
+ assert_event!(rx, Ok(MockEvent::ChildExited(true)));
+ }
+
+ #[test]
+ fn start_failed() {
+ let builder = MockChildSpawner::new(None);
+ let (listener, rx) = MockListener::new();
+ let testee = ChildMonitor::new(builder);
+
+ testee.set_listener(listener);
+ testee.start();
+
+ assert_event!(rx, Ok(MockEvent::Start(Err(TransitionError::IoError(_)))));
+ }
+
+ #[test]
+ fn notifies_latest_multiple_listeners() {
+ let builder = MockChildSpawner::new(Some(MockChild::instant_exit()));
+ let (listener, rx) = MockListener::new();
+ let (listener2, rx2) = MockListener::new();
+ let testee = ChildMonitor::new(builder);
+
+ testee.set_listener(listener);
+ testee.set_listener(listener2);
+ testee.start();
+
+ assert_event!(rx2, Ok(MockEvent::Start(Ok(_))));
+ assert_event!(rx, Err(mpsc::RecvTimeoutError::Disconnected));
+ }
+
+ #[test]
+ fn normal_stop() {
+ let builder = MockChildSpawner::new(Some(MockChild::alive_until_kill()));
+ let (listener, rx) = MockListener::new();
+ let testee = ChildMonitor::new(builder);
+
+ testee.set_listener(listener);
+ testee.start();
+ testee.stop();
+
+ drop(rx.recv_timeout(Duration::new(1, 0)));
+ assert_event!(rx, Ok(MockEvent::Stop(Ok(()))));
+ assert_event!(rx, Ok(MockEvent::ChildExited(true)));
+ }
+}