diff options
| author | Linus Färnstrand <linus@mullvad.net> | 2017-01-06 15:28:27 +0100 |
|---|---|---|
| committer | Linus Färnstrand <linus@mullvad.net> | 2017-01-06 15:28:27 +0100 |
| commit | c0f537e06778191fa5af2a115a3edf84c25aa6b9 (patch) | |
| tree | 7a4acb2056385d0c807f9413848391e044fd339f /src | |
| parent | d9361edb81ee322e75851f1e4066d74ae655bf6e (diff) | |
| download | mullvadvpn-c0f537e06778191fa5af2a115a3edf84c25aa6b9.tar.xz mullvadvpn-c0f537e06778191fa5af2a115a3edf84c25aa6b9.zip | |
Add monitor module state machine
Diffstat (limited to 'src')
| -rw-r--r-- | src/process/monitor.rs | 151 |
1 files changed, 151 insertions, 0 deletions
diff --git a/src/process/monitor.rs b/src/process/monitor.rs index 4a19642224..96a747964f 100644 --- a/src/process/monitor.rs +++ b/src/process/monitor.rs @@ -127,3 +127,154 @@ impl Error for TransitionError { } } } + +enum Event { + Start(TransitionResult<(Option<ChildStdout>, Option<ChildStderr>)>), + Stop(TransitionResult<()>), + ChildExited(bool), +} + +enum State<C: MonitorChild> { + Stopped, + Running(RunningState<C>), + Stopping(StoppingState<C>), +} + +struct RunningState<C: MonitorChild> { + child: C, +} + +struct StoppingState<C: MonitorChild> { + child: C, +} + +// Messages sent internally between the `ChildMonitor`, `StateMachine` and the thread that monitors +// the child process. +enum MonitorMsg { + AddListener(Box<MonitorEventListener>), + Start, + Stop, + ChildExited(bool), + Shutdown, +} + +fn spawn_state_machine<C, B>(builder: B) -> Sender<MonitorMsg> + where C: MonitorChild, + B: ChildSpawner<C> +{ + let state_machine = StateMachine::new(builder); + let tx = state_machine.get_handle(); + thread::spawn(move || { + state_machine.run(); + }); + tx +} + +struct StateMachine<C: MonitorChild, B: ChildSpawner<C>> { + process_builder: B, + tx: Sender<MonitorMsg>, + rx: Receiver<MonitorMsg>, + listener: Box<MonitorEventListener>, + state: State<C>, +} + +impl<C: MonitorChild, B: ChildSpawner<C>> StateMachine<C, B> { + pub fn new(process_builder: B) -> Self { + let (tx, rx) = mpsc::channel(); + let state_machine = StateMachine { + process_builder: process_builder, + tx: tx, + rx: rx, + listener: Box::new(NullListener), + state: State::Stopped, + }; + state_machine + } + + pub fn get_handle(&self) -> Sender<MonitorMsg> { + self.tx.clone() + } + + pub fn run(mut self) { + while let Ok(msg) = self.rx.recv() { + let event = match msg { + MonitorMsg::AddListener(listener) => { + self.set_listener(listener); + None + } + MonitorMsg::Start => Some(Event::Start(self.start())), + MonitorMsg::Stop => Some(Event::Stop(self.stop())), + MonitorMsg::ChildExited(success) => { + self.state = State::Stopped; + Some(Event::ChildExited(success)) + } + MonitorMsg::Shutdown => break, + }; + if let Some(event) = event { + self.notify_listener(event); + } + } + } + + fn set_listener(&mut self, listener: Box<MonitorEventListener>) { + self.listener = listener; + } + + fn start(&mut self) -> TransitionResult<(Option<ChildStdout>, Option<ChildStderr>)> { + if let State::Stopped = self.state { + let mut child = self.process_builder.spawn()?; + let io = (child.stdout(), child.stderr()); + self.state = State::Running(RunningState { child: child.clone() }); + self.start_monitor_thread(child); + Ok(io) + } else { + Err(TransitionError::InvalidState) + } + } + + fn start_monitor_thread(&self, child: C) { + let tx = self.tx.clone(); + thread::spawn(move || { + let success = child.wait().unwrap_or(false); + drop(tx.send(MonitorMsg::ChildExited(success))); + }); + } + + fn stop(&mut self) -> TransitionResult<()> { + let result = if let State::Running(ref mut running_state) = self.state { + running_state.child + .kill() + .map(|_| running_state.child.clone()) + .map_err(|e| TransitionError::IoError(e)) + } else { + Err(TransitionError::InvalidState) + }; + match result { + Ok(child) => { + self.state = State::Stopping(StoppingState { child: child }); + Ok(()) + } + Err(e) => Err(e), + } + } + + fn notify_listener(&mut self, event: Event) { + match event { + Event::Start(result) => { + self.listener.started(result); + } + Event::Stop(result) => { + self.listener.stopping(result); + } + Event::ChildExited(clean) => { + self.listener.child_exited(clean); + } + } + } +} + +impl<C: MonitorChild, B: ChildSpawner<C>> Drop for StateMachine<C, B> { + fn drop(&mut self) { + drop(self.stop()) + } +} |
