diff options
| -rw-r--r-- | mullvad-daemon/src/main.rs | 1 | ||||
| -rw-r--r-- | mullvad-daemon/src/tunnel_state_machine/connected_state.rs | 72 | ||||
| -rw-r--r-- | mullvad-daemon/src/tunnel_state_machine/connecting_state.rs | 172 | ||||
| -rw-r--r-- | mullvad-daemon/src/tunnel_state_machine/disconnected_state.rs | 30 | ||||
| -rw-r--r-- | mullvad-daemon/src/tunnel_state_machine/disconnecting_state.rs | 75 | ||||
| -rw-r--r-- | mullvad-daemon/src/tunnel_state_machine/macros.rs | 21 | ||||
| -rw-r--r-- | mullvad-daemon/src/tunnel_state_machine/mod.rs | 304 |
7 files changed, 675 insertions, 0 deletions
diff --git a/mullvad-daemon/src/main.rs b/mullvad-daemon/src/main.rs index c32d4eef18..628f286509 100644 --- a/mullvad-daemon/src/main.rs +++ b/mullvad-daemon/src/main.rs @@ -55,6 +55,7 @@ mod rpc_uniqueness_check; mod settings; mod shutdown; mod system_service; +mod tunnel_state_machine; mod version; use error_chain::ChainedError; diff --git a/mullvad-daemon/src/tunnel_state_machine/connected_state.rs b/mullvad-daemon/src/tunnel_state_machine/connected_state.rs new file mode 100644 index 0000000000..5163029ec1 --- /dev/null +++ b/mullvad-daemon/src/tunnel_state_machine/connected_state.rs @@ -0,0 +1,72 @@ +use futures::sync::mpsc; +use futures::Stream; + +use talpid_core::tunnel::TunnelEvent; + +use super::{ + AfterDisconnect, CloseHandle, DisconnectingState, EventConsequence, StateEntryResult, + TunnelCommand, TunnelState, TunnelStateWrapper, +}; + +pub struct ConnectedStateBootstrap { + pub tunnel_events: mpsc::UnboundedReceiver<TunnelEvent>, + pub close_handle: CloseHandle, +} + +/// The tunnel is up and working. +pub struct ConnectedState { + close_handle: CloseHandle, + tunnel_events: mpsc::UnboundedReceiver<TunnelEvent>, +} + +impl ConnectedState { + fn from(bootstrap: ConnectedStateBootstrap) -> Self { + ConnectedState { + close_handle: bootstrap.close_handle, + tunnel_events: bootstrap.tunnel_events, + } + } + + fn handle_commands( + self, + commands: &mut mpsc::UnboundedReceiver<TunnelCommand>, + ) -> EventConsequence<Self> { + use self::EventConsequence::*; + + match try_handle_event!(self, commands.poll()) { + Ok(TunnelCommand::Connect(_)) => SameState(self), + Ok(TunnelCommand::Disconnect) | Err(_) => NewState(DisconnectingState::enter(( + self.close_handle.close(), + AfterDisconnect::Nothing, + ))), + } + } + + fn handle_tunnel_events(mut self) -> EventConsequence<Self> { + use self::EventConsequence::*; + + match try_handle_event!(self, self.tunnel_events.poll()) { + Ok(TunnelEvent::Down) | Err(_) => NewState(DisconnectingState::enter(( + self.close_handle.close(), + AfterDisconnect::Nothing, + ))), + Ok(_) => SameState(self), + } + } +} + +impl TunnelState for ConnectedState { + type Bootstrap = ConnectedStateBootstrap; + + fn enter(bootstrap: Self::Bootstrap) -> StateEntryResult { + Ok(TunnelStateWrapper::from(ConnectedState::from(bootstrap))) + } + + fn handle_event( + self, + commands: &mut mpsc::UnboundedReceiver<TunnelCommand>, + ) -> EventConsequence<Self> { + self.handle_commands(commands) + .or_else(Self::handle_tunnel_events) + } +} diff --git a/mullvad-daemon/src/tunnel_state_machine/connecting_state.rs b/mullvad-daemon/src/tunnel_state_machine/connecting_state.rs new file mode 100644 index 0000000000..e9677ecf6b --- /dev/null +++ b/mullvad-daemon/src/tunnel_state_machine/connecting_state.rs @@ -0,0 +1,172 @@ +use std::ffi::OsString; +use std::path::PathBuf; +use std::sync::Mutex; +use std::thread; +use std::time::{Duration, Instant}; + +use futures::sink::Wait; +use futures::sync::mpsc; +use futures::{Sink, Stream}; + +use talpid_core::tunnel::{TunnelEvent, TunnelMonitor}; +use talpid_types::net::TunnelEndpointData; + +use super::{ + AfterDisconnect, CloseHandle, ConnectedState, ConnectedStateBootstrap, DisconnectedState, + DisconnectingState, EventConsequence, Result, ResultExt, StateEntryResult, TunnelCommand, + TunnelParameters, TunnelState, TunnelStateWrapper, OPENVPN_LOG_FILENAME, + WIREGUARD_LOG_FILENAME, +}; +use logging; + +const MIN_TUNNEL_ALIVE_TIME: Duration = Duration::from_millis(1000); + +#[cfg(windows)] +const TUNNEL_INTERFACE_ALIAS: Option<&str> = Some("Mullvad"); +#[cfg(not(windows))] +const TUNNEL_INTERFACE_ALIAS: Option<&str> = None; + +/// The tunnel has been started, but it is not established/functional. +pub struct ConnectingState { + close_handle: CloseHandle, + tunnel_events: mpsc::UnboundedReceiver<TunnelEvent>, +} + +impl ConnectingState { + fn new(parameters: TunnelParameters) -> Result<Self> { + let (event_tx, event_rx) = mpsc::unbounded(); + let monitor = Self::spawn_tunnel_monitor(parameters, event_tx.wait())?; + let close_handle = CloseHandle::new(&monitor); + + Self::spawn_tunnel_monitor_wait_thread(monitor); + + Ok(ConnectingState { + close_handle, + tunnel_events: event_rx, + }) + } + + fn spawn_tunnel_monitor( + parameters: TunnelParameters, + events: Wait<mpsc::UnboundedSender<TunnelEvent>>, + ) -> Result<TunnelMonitor> { + let event_tx = Mutex::new(events); + let on_tunnel_event = move |event| { + let send_result = event_tx + .lock() + .expect("A thread panicked while sending a tunnel event") + .send(event); + + if send_result.is_err() { + warn!("Tunnel state machine stopped before tunnel event was received"); + } + }; + let log_file = Self::prepare_tunnel_log_file(¶meters)?; + + TunnelMonitor::new( + parameters.endpoint, + ¶meters.options, + TUNNEL_INTERFACE_ALIAS.to_owned().map(OsString::from), + ¶meters.account_token, + log_file.as_ref().map(PathBuf::as_path), + ¶meters.resource_dir, + on_tunnel_event, + ).chain_err(|| "Unable to start tunnel monitor") + } + + fn prepare_tunnel_log_file(parameters: &TunnelParameters) -> Result<Option<PathBuf>> { + if let Some(ref log_dir) = parameters.log_dir { + let filename = match parameters.endpoint.tunnel { + TunnelEndpointData::OpenVpn(_) => OPENVPN_LOG_FILENAME, + TunnelEndpointData::Wireguard(_) => WIREGUARD_LOG_FILENAME, + }; + let tunnel_log = log_dir.join(filename); + logging::rotate_log(&tunnel_log).chain_err(|| "Unable to rotate tunnel log")?; + Ok(Some(tunnel_log)) + } else { + Ok(None) + } + } + + fn spawn_tunnel_monitor_wait_thread(tunnel_monitor: TunnelMonitor) { + thread::spawn(move || { + let start = Instant::now(); + + match tunnel_monitor.wait() { + Ok(_) => debug!("Tunnel has finished without errors"), + Err(error) => { + let chained_error = error.chain_err(|| "Tunnel has stopped unexpectedly"); + warn!("{}", chained_error); + } + } + + if let Some(remaining_time) = MIN_TUNNEL_ALIVE_TIME.checked_sub(start.elapsed()) { + thread::sleep(remaining_time); + } + + trace!("Tunnel monitor thread exit"); + }); + } + + fn into_connected_state_bootstrap(self) -> ConnectedStateBootstrap { + ConnectedStateBootstrap { + tunnel_events: self.tunnel_events, + close_handle: self.close_handle, + } + } + + fn handle_commands( + self, + commands: &mut mpsc::UnboundedReceiver<TunnelCommand>, + ) -> EventConsequence<Self> { + use self::EventConsequence::*; + + match try_handle_event!(self, commands.poll()) { + Ok(TunnelCommand::Connect(_)) => SameState(self), + Ok(TunnelCommand::Disconnect) | Err(_) => NewState(DisconnectingState::enter(( + self.close_handle.close(), + AfterDisconnect::Nothing, + ))), + } + } + + fn handle_tunnel_events(mut self) -> EventConsequence<Self> { + use self::EventConsequence::*; + + match try_handle_event!(self, self.tunnel_events.poll()) { + Ok(TunnelEvent::Up(_)) => { + NewState(ConnectedState::enter(self.into_connected_state_bootstrap())) + } + Ok(_) => SameState(self), + Err(_) => NewState(DisconnectingState::enter(( + self.close_handle.close(), + AfterDisconnect::Nothing, + ))), + } + } +} + +impl TunnelState for ConnectingState { + type Bootstrap = TunnelParameters; + + fn enter(parameters: Self::Bootstrap) -> StateEntryResult { + Self::new(parameters) + .map(TunnelStateWrapper::from) + .chain_err(|| "Failed to start tunnel") + .map_err(|error| { + ( + error, + DisconnectedState::enter(()) + .expect("Failed to transition to fallback disconnected state"), + ) + }) + } + + fn handle_event( + self, + commands: &mut mpsc::UnboundedReceiver<TunnelCommand>, + ) -> EventConsequence<Self> { + self.handle_commands(commands) + .or_else(Self::handle_tunnel_events) + } +} diff --git a/mullvad-daemon/src/tunnel_state_machine/disconnected_state.rs b/mullvad-daemon/src/tunnel_state_machine/disconnected_state.rs new file mode 100644 index 0000000000..15bdc90e9b --- /dev/null +++ b/mullvad-daemon/src/tunnel_state_machine/disconnected_state.rs @@ -0,0 +1,30 @@ +use futures::sync::mpsc; +use futures::Stream; + +use super::{ + ConnectingState, EventConsequence, StateEntryResult, TunnelCommand, TunnelState, + TunnelStateWrapper, +}; + +/// No tunnel is running. +pub struct DisconnectedState; + +impl TunnelState for DisconnectedState { + type Bootstrap = (); + + fn enter(_: Self::Bootstrap) -> StateEntryResult { + Ok(TunnelStateWrapper::from(DisconnectedState)) + } + + fn handle_event( + self, + commands: &mut mpsc::UnboundedReceiver<TunnelCommand>, + ) -> EventConsequence<Self> { + use self::EventConsequence::*; + + match try_handle_event!(self, commands.poll()) { + Ok(TunnelCommand::Connect(parameters)) => NewState(ConnectingState::enter(parameters)), + Ok(TunnelCommand::Disconnect) | Err(_) => SameState(self), + } + } +} diff --git a/mullvad-daemon/src/tunnel_state_machine/disconnecting_state.rs b/mullvad-daemon/src/tunnel_state_machine/disconnecting_state.rs new file mode 100644 index 0000000000..fd1899a44f --- /dev/null +++ b/mullvad-daemon/src/tunnel_state_machine/disconnecting_state.rs @@ -0,0 +1,75 @@ +use std::io; + +use futures::sync::{mpsc, oneshot}; +use futures::{Async, Future, Stream}; + +use super::{ + ConnectingState, DisconnectedState, EventConsequence, StateEntryResult, TunnelCommand, + TunnelParameters, TunnelState, TunnelStateWrapper, +}; + +/// This state is active from when we manually trigger a tunnel kill until the tunnel wait +/// operation (TunnelExit) returned. +pub struct DisconnectingState { + exited: oneshot::Receiver<io::Result<()>>, + after_disconnect: AfterDisconnect, +} + +impl DisconnectingState { + fn handle_commands( + mut self, + commands: &mut mpsc::UnboundedReceiver<TunnelCommand>, + ) -> EventConsequence<Self> { + use self::AfterDisconnect::*; + + self.after_disconnect = match try_handle_event!(self, commands.poll()) { + Ok(TunnelCommand::Connect(parameters)) => Reconnect(parameters), + Ok(TunnelCommand::Disconnect) | Err(_) => Nothing, + }; + + EventConsequence::SameState(self) + } + + fn handle_exit_event(mut self) -> EventConsequence<Self> { + use self::EventConsequence::*; + + match self.exited.poll() { + Ok(Async::NotReady) => NoEvents(self), + Ok(Async::Ready(_)) | Err(_) => NewState(self.after_disconnect()), + } + } + + fn after_disconnect(self) -> StateEntryResult { + match self.after_disconnect { + AfterDisconnect::Nothing => DisconnectedState::enter(()), + AfterDisconnect::Reconnect(tunnel_parameters) => { + ConnectingState::enter(tunnel_parameters) + } + } + } +} + +impl TunnelState for DisconnectingState { + type Bootstrap = (oneshot::Receiver<io::Result<()>>, AfterDisconnect); + + fn enter((exited, after_disconnect): Self::Bootstrap) -> StateEntryResult { + Ok(TunnelStateWrapper::from(DisconnectingState { + exited, + after_disconnect, + })) + } + + fn handle_event( + self, + commands: &mut mpsc::UnboundedReceiver<TunnelCommand>, + ) -> EventConsequence<Self> { + self.handle_commands(commands) + .or_else(Self::handle_exit_event) + } +} + +/// Which state should be transitioned to after disconnection is complete. +pub enum AfterDisconnect { + Nothing, + Reconnect(TunnelParameters), +} diff --git a/mullvad-daemon/src/tunnel_state_machine/macros.rs b/mullvad-daemon/src/tunnel_state_machine/macros.rs new file mode 100644 index 0000000000..2241c8cb06 --- /dev/null +++ b/mullvad-daemon/src/tunnel_state_machine/macros.rs @@ -0,0 +1,21 @@ +/// Try to receive an event from a `Stream`'s asynchronous poll expression. +/// +/// This macro is similar to the `try_ready!` macro provided in `futures`. If there is an event +/// ready, it will be returned wrapped in a `Result`. If there are no events ready to be received, +/// the outer function will return with a transition that indicates that no events were received, +/// which is analogous to `Async::NotReady`. +/// +/// When the asynchronous event indicates that the stream has finished or that it has failed, an +/// error type is returned so that either close scenario can be handled in a similar way. +macro_rules! try_handle_event { + ($same_state:expr, $event:expr) => { + match $event { + Ok(::futures::Async::Ready(Some(event))) => Ok(event), + Ok(::futures::Async::Ready(None)) => Err(None), + Ok(::futures::Async::NotReady) => { + return ::tunnel_state_machine::EventConsequence::NoEvents($same_state); + } + Err(error) => Err(Some(error)), + } + }; +} diff --git a/mullvad-daemon/src/tunnel_state_machine/mod.rs b/mullvad-daemon/src/tunnel_state_machine/mod.rs new file mode 100644 index 0000000000..57f611f3ef --- /dev/null +++ b/mullvad-daemon/src/tunnel_state_machine/mod.rs @@ -0,0 +1,304 @@ +#[macro_use] +mod macros; + +mod connected_state; +mod connecting_state; +mod disconnected_state; +mod disconnecting_state; + +use std::fmt::{Debug, Formatter, Result as FmtResult}; +use std::io; +use std::path::PathBuf; +use std::thread; + +use error_chain::ChainedError; +use futures::sync::{mpsc, oneshot}; +use futures::{Async, Future, Poll}; +use tokio_core::reactor::Core; + +use mullvad_types::account::AccountToken; +use talpid_core::tunnel::{self, TunnelMonitor}; +use talpid_types::net::{TunnelEndpoint, TunnelOptions}; + +use self::connected_state::{ConnectedState, ConnectedStateBootstrap}; +use self::connecting_state::ConnectingState; +use self::disconnected_state::DisconnectedState; +use self::disconnecting_state::{AfterDisconnect, DisconnectingState}; +use super::{OPENVPN_LOG_FILENAME, WIREGUARD_LOG_FILENAME}; + +error_chain!{} + +/// Spawn the tunnel state machine thread, returning a channel for sending tunnel commands. +pub fn spawn() -> mpsc::UnboundedSender<TunnelCommand> { + let (command_tx, command_rx) = mpsc::unbounded(); + + thread::spawn(move || { + if let Err(error) = event_loop(command_rx) { + error!("{}", error.display_chain()); + } + }); + + command_tx +} + +fn event_loop(commands: mpsc::UnboundedReceiver<TunnelCommand>) -> Result<()> { + let mut reactor = + Core::new().chain_err(|| "Failed to initialize tunnel state machine event loop")?; + + let state_machine = TunnelStateMachine::new(commands); + + reactor + .run(state_machine) + .chain_err(|| "Tunnel state machine finished with an error") +} + +/// Representation of external commands for the tunnel state machine. +pub enum TunnelCommand { + /// Open tunnel connection. + Connect(TunnelParameters), + /// Close tunnel connection. + Disconnect, +} + +/// Information necessary to open a tunnel. +pub struct TunnelParameters { + pub endpoint: TunnelEndpoint, + pub options: TunnelOptions, + pub log_dir: Option<PathBuf>, + pub resource_dir: PathBuf, + pub account_token: AccountToken, +} + +/// Asynchronous handling of the tunnel state machine. +/// +/// This type implements `Future`, and attempts to advance the state machine based on the events +/// received on the commands stream and possibly on events that specific states are also listening +/// to. +struct TunnelStateMachine { + current_state: Option<TunnelStateWrapper>, + commands: mpsc::UnboundedReceiver<TunnelCommand>, +} + +impl TunnelStateMachine { + fn new(commands: mpsc::UnboundedReceiver<TunnelCommand>) -> Self { + let initial_state = + TunnelStateWrapper::enter(()).expect("Failed to create initial tunnel state"); + + TunnelStateMachine { + current_state: Some(initial_state), + commands, + } + } +} + +impl Future for TunnelStateMachine { + type Item = (); + type Error = Error; + + fn poll(&mut self) -> Poll<Self::Item, Self::Error> { + let mut state = self + .current_state + .take() + .expect("State machine lost track of its state!"); + let mut event_was_received = true; + + while event_was_received { + let transition = state.handle_event(&mut self.commands); + + event_was_received = transition.is_because_of_an_event(); + state = transition.into_wrapped_tunnel_state(); + } + + self.current_state = Some(state); + + Ok(Async::NotReady) + } +} + +/// Asynchronous result of an attempt to progress a state. +enum EventConsequence<T: TunnelState> { + /// Transition to a new state. + NewState(StateEntryResult), + /// An event was received, but it was ignored by the state so no transition is performed. + SameState(T), + /// No events were received, the event loop should block until one becomes available. + NoEvents(T), +} + +impl<T> EventConsequence<T> +where + T: TunnelState, +{ + /// Checks if this transition happened after an event was received. + pub fn is_because_of_an_event(&self) -> bool { + use self::EventConsequence::*; + + match self { + NewState(_) | SameState(_) => true, + NoEvents(_) => false, + } + } + + /// Helper method to chain handling multiple different event types. + /// + /// The `handle_event` is only called if no events were handled so far. + pub fn or_else<F>(self, handle_event: F) -> Self + where + F: FnOnce(T) -> Self, + { + use self::EventConsequence::*; + + match self { + NoEvents(state) => handle_event(state), + consequence => consequence, + } + } + + /// Extracts the destination state as a `TunnelStateWrapper`. + /// + /// If the destination state isn't the original target state, an error is logged. + pub fn into_wrapped_tunnel_state(self) -> TunnelStateWrapper { + use self::EventConsequence::*; + + match self { + NewState(Ok(wrapped_tunnel_state)) => wrapped_tunnel_state, + NewState(Err((error, wrapped_tunnel_state))) => { + error!("{}", error.chain_err(|| "Tunnel state transition failed")); + wrapped_tunnel_state + } + SameState(tunnel_state) | NoEvents(tunnel_state) => tunnel_state.into(), + } + } +} + +/// Result of entering a `T: TunnelState`. +/// +/// It is either the state itself when successful, or an error paired with a fallback state. +type StateEntryResult = ::std::result::Result<TunnelStateWrapper, (Error, TunnelStateWrapper)>; + +/// Trait that contains the method all states should implement to handle an event and advance the +/// state machine. +trait TunnelState: Into<TunnelStateWrapper> + Sized { + /// Type representing extra information required for entering the state. + type Bootstrap; + + /// Constructor function. + /// + /// This is the state entry point. It attempts to enter the state, and may fail by entering an + /// error or fallback state instead. + fn enter(bootstrap: Self::Bootstrap) -> StateEntryResult; + + /// Main state function. + /// + /// This is state exit point. It consumes itself and returns the next state to advance to when + /// it has completed, or itself if it wants to ignore a received event or if no events were + /// ready to be received. See [`EventConsequence`] for more details. + /// + /// An implementation can handle events from many sources, but it should also handle command + /// events received through the provided `commands` stream. + /// + /// [`EventConsequence`]: enum.EventConsequence.html + fn handle_event( + self, + commands: &mut mpsc::UnboundedReceiver<TunnelCommand>, + ) -> EventConsequence<Self>; +} + +/// Valid states of the tunnel. +/// +/// All implementations must implement `TunnelState` so that they can handle events and +/// commands in order to advance the state machine. +enum TunnelStateWrapper { + Disconnected(DisconnectedState), + Connecting(ConnectingState), + Connected(ConnectedState), + Disconnecting(DisconnectingState), +} + +macro_rules! impl_from_for_tunnel_state { + ($state_variant:ident($state_type:ident)) => { + impl From<$state_type> for TunnelStateWrapper { + fn from(state: $state_type) -> Self { + TunnelStateWrapper::$state_variant(state) + } + } + }; +} + +impl_from_for_tunnel_state!(Disconnected(DisconnectedState)); +impl_from_for_tunnel_state!(Connecting(ConnectingState)); +impl_from_for_tunnel_state!(Connected(ConnectedState)); +impl_from_for_tunnel_state!(Disconnecting(DisconnectingState)); + +impl TunnelState for TunnelStateWrapper { + type Bootstrap = <DisconnectedState as TunnelState>::Bootstrap; + + fn enter(bootstrap: Self::Bootstrap) -> StateEntryResult { + DisconnectedState::enter(bootstrap) + } + + fn handle_event( + self, + commands: &mut mpsc::UnboundedReceiver<TunnelCommand>, + ) -> EventConsequence<TunnelStateWrapper> { + use self::EventConsequence::*; + + macro_rules! handle_event { + ( $($state:ident),* $(,)* ) => { + match self { + $( + TunnelStateWrapper::$state(state) => match state.handle_event(commands) { + NewState(tunnel_state) => NewState(tunnel_state), + SameState(state) => SameState(TunnelStateWrapper::$state(state)), + NoEvents(state) => NoEvents(TunnelStateWrapper::$state(state)), + }, + )* + } + } + } + + handle_event! { + Disconnected, + Connecting, + Connected, + Disconnecting, + } + } +} + +impl Debug for TunnelStateWrapper { + fn fmt(&self, formatter: &mut Formatter) -> FmtResult { + use self::TunnelStateWrapper::*; + + match *self { + Disconnected(_) => write!(formatter, "TunnelStateWrapper::Disconnected(_)"), + Connecting(_) => write!(formatter, "TunnelStateWrapper::Connecting(_)"), + Connected(_) => write!(formatter, "TunnelStateWrapper::Connected(_)"), + Disconnecting(_) => write!(formatter, "TunnelStateWrapper::Disconnecting(_)"), + } + } +} + +/// Internal handle to request tunnel to be closed. +pub struct CloseHandle { + tunnel_close_handle: tunnel::CloseHandle, +} + +impl CloseHandle { + fn new(tunnel_monitor: &TunnelMonitor) -> Self { + CloseHandle { + tunnel_close_handle: tunnel_monitor.close_handle(), + } + } + + fn close(self) -> oneshot::Receiver<io::Result<()>> { + let (close_tx, close_rx) = oneshot::channel(); + + thread::spawn(move || { + let _ = close_tx.send(self.tunnel_close_handle.close()); + trace!("Tunnel kill thread exit"); + }); + + close_rx + } +} |
