summaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorLinus Färnstrand <linus@mullvad.net>2017-01-06 15:28:27 +0100
committerLinus Färnstrand <linus@mullvad.net>2017-01-06 15:28:27 +0100
commitc0f537e06778191fa5af2a115a3edf84c25aa6b9 (patch)
tree7a4acb2056385d0c807f9413848391e044fd339f /src
parentd9361edb81ee322e75851f1e4066d74ae655bf6e (diff)
downloadmullvadvpn-c0f537e06778191fa5af2a115a3edf84c25aa6b9.tar.xz
mullvadvpn-c0f537e06778191fa5af2a115a3edf84c25aa6b9.zip
Add monitor module state machine
Diffstat (limited to 'src')
-rw-r--r--src/process/monitor.rs151
1 files changed, 151 insertions, 0 deletions
diff --git a/src/process/monitor.rs b/src/process/monitor.rs
index 4a19642224..96a747964f 100644
--- a/src/process/monitor.rs
+++ b/src/process/monitor.rs
@@ -127,3 +127,154 @@ impl Error for TransitionError {
}
}
}
+
+enum Event {
+ Start(TransitionResult<(Option<ChildStdout>, Option<ChildStderr>)>),
+ Stop(TransitionResult<()>),
+ ChildExited(bool),
+}
+
+enum State<C: MonitorChild> {
+ Stopped,
+ Running(RunningState<C>),
+ Stopping(StoppingState<C>),
+}
+
+struct RunningState<C: MonitorChild> {
+ child: C,
+}
+
+struct StoppingState<C: MonitorChild> {
+ child: C,
+}
+
+// Messages sent internally between the `ChildMonitor`, `StateMachine` and the thread that monitors
+// the child process.
+enum MonitorMsg {
+ AddListener(Box<MonitorEventListener>),
+ Start,
+ Stop,
+ ChildExited(bool),
+ Shutdown,
+}
+
+fn spawn_state_machine<C, B>(builder: B) -> Sender<MonitorMsg>
+ where C: MonitorChild,
+ B: ChildSpawner<C>
+{
+ let state_machine = StateMachine::new(builder);
+ let tx = state_machine.get_handle();
+ thread::spawn(move || {
+ state_machine.run();
+ });
+ tx
+}
+
+struct StateMachine<C: MonitorChild, B: ChildSpawner<C>> {
+ process_builder: B,
+ tx: Sender<MonitorMsg>,
+ rx: Receiver<MonitorMsg>,
+ listener: Box<MonitorEventListener>,
+ state: State<C>,
+}
+
+impl<C: MonitorChild, B: ChildSpawner<C>> StateMachine<C, B> {
+ pub fn new(process_builder: B) -> Self {
+ let (tx, rx) = mpsc::channel();
+ let state_machine = StateMachine {
+ process_builder: process_builder,
+ tx: tx,
+ rx: rx,
+ listener: Box::new(NullListener),
+ state: State::Stopped,
+ };
+ state_machine
+ }
+
+ pub fn get_handle(&self) -> Sender<MonitorMsg> {
+ self.tx.clone()
+ }
+
+ pub fn run(mut self) {
+ while let Ok(msg) = self.rx.recv() {
+ let event = match msg {
+ MonitorMsg::AddListener(listener) => {
+ self.set_listener(listener);
+ None
+ }
+ MonitorMsg::Start => Some(Event::Start(self.start())),
+ MonitorMsg::Stop => Some(Event::Stop(self.stop())),
+ MonitorMsg::ChildExited(success) => {
+ self.state = State::Stopped;
+ Some(Event::ChildExited(success))
+ }
+ MonitorMsg::Shutdown => break,
+ };
+ if let Some(event) = event {
+ self.notify_listener(event);
+ }
+ }
+ }
+
+ fn set_listener(&mut self, listener: Box<MonitorEventListener>) {
+ self.listener = listener;
+ }
+
+ fn start(&mut self) -> TransitionResult<(Option<ChildStdout>, Option<ChildStderr>)> {
+ if let State::Stopped = self.state {
+ let mut child = self.process_builder.spawn()?;
+ let io = (child.stdout(), child.stderr());
+ self.state = State::Running(RunningState { child: child.clone() });
+ self.start_monitor_thread(child);
+ Ok(io)
+ } else {
+ Err(TransitionError::InvalidState)
+ }
+ }
+
+ fn start_monitor_thread(&self, child: C) {
+ let tx = self.tx.clone();
+ thread::spawn(move || {
+ let success = child.wait().unwrap_or(false);
+ drop(tx.send(MonitorMsg::ChildExited(success)));
+ });
+ }
+
+ fn stop(&mut self) -> TransitionResult<()> {
+ let result = if let State::Running(ref mut running_state) = self.state {
+ running_state.child
+ .kill()
+ .map(|_| running_state.child.clone())
+ .map_err(|e| TransitionError::IoError(e))
+ } else {
+ Err(TransitionError::InvalidState)
+ };
+ match result {
+ Ok(child) => {
+ self.state = State::Stopping(StoppingState { child: child });
+ Ok(())
+ }
+ Err(e) => Err(e),
+ }
+ }
+
+ fn notify_listener(&mut self, event: Event) {
+ match event {
+ Event::Start(result) => {
+ self.listener.started(result);
+ }
+ Event::Stop(result) => {
+ self.listener.stopping(result);
+ }
+ Event::ChildExited(clean) => {
+ self.listener.child_exited(clean);
+ }
+ }
+ }
+}
+
+impl<C: MonitorChild, B: ChildSpawner<C>> Drop for StateMachine<C, B> {
+ fn drop(&mut self) {
+ drop(self.stop())
+ }
+}