diff options
| -rw-r--r-- | mullvad-daemon/src/tunnel_state_machine/mod.rs | 65 |
1 files changed, 45 insertions, 20 deletions
diff --git a/mullvad-daemon/src/tunnel_state_machine/mod.rs b/mullvad-daemon/src/tunnel_state_machine/mod.rs index 316a000d77..3ab7b87475 100644 --- a/mullvad-daemon/src/tunnel_state_machine/mod.rs +++ b/mullvad-daemon/src/tunnel_state_machine/mod.rs @@ -8,11 +8,12 @@ mod disconnecting_state; use std::fmt::{Debug, Formatter, Result as FmtResult}; use std::path::PathBuf; +use std::sync::mpsc as sync_mpsc; use std::thread; use error_chain::ChainedError; use futures::sync::mpsc; -use futures::{Async, Poll, Stream}; +use futures::{Async, Future, Poll, Stream}; use tokio_core::reactor::Core; use mullvad_types::account::AccountToken; @@ -26,44 +27,68 @@ use self::disconnected_state::DisconnectedState; use self::disconnecting_state::{AfterDisconnect, DisconnectingState}; use super::{OPENVPN_LOG_FILENAME, WIREGUARD_LOG_FILENAME}; -error_chain!{} +error_chain! { + errors { + ReactorError { + description("Failed to initialize tunnel state machine event loop executor") + } + } +} /// Spawn the tunnel state machine thread, returning a channel for sending tunnel commands. pub fn spawn<T>( state_change_listener: IntoSender<TunnelStateTransition, T>, -) -> mpsc::UnboundedSender<TunnelCommand> +) -> Result<mpsc::UnboundedSender<TunnelCommand>> where T: From<TunnelStateTransition> + Send + 'static, { let (command_tx, command_rx) = mpsc::unbounded(); + let (startup_result_tx, startup_result_rx) = sync_mpsc::channel(); - thread::spawn(move || { - if let Err(error) = event_loop(command_rx, state_change_listener) { - error!("{}", error.display_chain()); - } - }); + thread::spawn( + move || match create_event_loop(command_rx, state_change_listener) { + Ok((mut reactor, event_loop)) => { + startup_result_tx.send(Ok(())).expect( + "Tunnel state machine won't be started because the owner thread crashed", + ); + + if let Err(error) = reactor.run(event_loop) { + let chained_error = + Error::with_chain(error, "Tunnel state machine exited with an error"); + error!("{}", chained_error.display_chain()); + } + } + Err(startup_error) => { + startup_result_tx + .send(Err(startup_error)) + .expect("Failed to send startup error"); + } + }, + ); - command_tx + startup_result_rx + .recv() + .expect("Failed to start tunnel state machine thread") + .map(|_| command_tx) } -fn event_loop<T>( +fn create_event_loop<T>( commands: mpsc::UnboundedReceiver<TunnelCommand>, state_change_listener: IntoSender<TunnelStateTransition, T>, -) -> Result<()> +) -> Result<(Core, impl Future<Item = (), Error = Error>)> where T: From<TunnelStateTransition> + Send + 'static, { - let mut reactor = - Core::new().chain_err(|| "Failed to initialize tunnel state machine event loop")?; - + let reactor = Core::new().chain_err(|| ErrorKind::ReactorError)?; let state_machine = TunnelStateMachine::new(commands); - reactor - .run(state_machine.for_each(|state_change_event| { - state_change_listener - .send(state_change_event) - .chain_err(|| "Failed to send state change event to listener") - })).chain_err(|| "Tunnel state machine finished with an error") + let future = state_machine.for_each(move |state_change_event| { + state_change_listener + .send(state_change_event) + .chain_err(|| "Failed to send state change event to listener") + }); + + Ok((reactor, future)) } /// Representation of external commands for the tunnel state machine. |
