diff options
3 files changed, 99 insertions, 37 deletions
diff --git a/mullvad-daemon/src/tunnel_state_machine/connected_state.rs b/mullvad-daemon/src/tunnel_state_machine/connected_state.rs index 5163029ec1..4a006153ba 100644 --- a/mullvad-daemon/src/tunnel_state_machine/connected_state.rs +++ b/mullvad-daemon/src/tunnel_state_machine/connected_state.rs @@ -1,32 +1,43 @@ use futures::sync::mpsc; use futures::Stream; -use talpid_core::tunnel::TunnelEvent; +use talpid_core::tunnel::{TunnelEvent, TunnelMetadata}; +use talpid_types::net::TunnelEndpoint; use super::{ AfterDisconnect, CloseHandle, DisconnectingState, EventConsequence, StateEntryResult, - TunnelCommand, TunnelState, TunnelStateWrapper, + TunnelCommand, TunnelState, TunnelStateTransition, TunnelStateWrapper, }; pub struct ConnectedStateBootstrap { + pub metadata: TunnelMetadata, pub tunnel_events: mpsc::UnboundedReceiver<TunnelEvent>, + pub tunnel_endpoint: TunnelEndpoint, pub close_handle: CloseHandle, } /// The tunnel is up and working. pub struct ConnectedState { - close_handle: CloseHandle, + metadata: TunnelMetadata, tunnel_events: mpsc::UnboundedReceiver<TunnelEvent>, + tunnel_endpoint: TunnelEndpoint, + close_handle: CloseHandle, } impl ConnectedState { fn from(bootstrap: ConnectedStateBootstrap) -> Self { ConnectedState { - close_handle: bootstrap.close_handle, + metadata: bootstrap.metadata, tunnel_events: bootstrap.tunnel_events, + tunnel_endpoint: bootstrap.tunnel_endpoint, + close_handle: bootstrap.close_handle, } } + pub fn info(&self) -> TunnelStateTransition { + TunnelStateTransition::Connected(self.tunnel_endpoint, self.metadata.clone()) + } + fn handle_commands( self, commands: &mut mpsc::UnboundedReceiver<TunnelCommand>, diff --git a/mullvad-daemon/src/tunnel_state_machine/connecting_state.rs b/mullvad-daemon/src/tunnel_state_machine/connecting_state.rs index e9677ecf6b..c9860a2105 100644 --- a/mullvad-daemon/src/tunnel_state_machine/connecting_state.rs +++ b/mullvad-daemon/src/tunnel_state_machine/connecting_state.rs @@ -8,13 +8,13 @@ use futures::sink::Wait; use futures::sync::mpsc; use futures::{Sink, Stream}; -use talpid_core::tunnel::{TunnelEvent, TunnelMonitor}; -use talpid_types::net::TunnelEndpointData; +use talpid_core::tunnel::{TunnelEvent, TunnelMetadata, TunnelMonitor}; +use talpid_types::net::{TunnelEndpoint, TunnelEndpointData}; use super::{ AfterDisconnect, CloseHandle, ConnectedState, ConnectedStateBootstrap, DisconnectedState, DisconnectingState, EventConsequence, Result, ResultExt, StateEntryResult, TunnelCommand, - TunnelParameters, TunnelState, TunnelStateWrapper, OPENVPN_LOG_FILENAME, + TunnelParameters, TunnelState, TunnelStateTransition, TunnelStateWrapper, OPENVPN_LOG_FILENAME, WIREGUARD_LOG_FILENAME, }; use logging; @@ -30,11 +30,13 @@ const TUNNEL_INTERFACE_ALIAS: Option<&str> = None; pub struct ConnectingState { close_handle: CloseHandle, tunnel_events: mpsc::UnboundedReceiver<TunnelEvent>, + tunnel_endpoint: TunnelEndpoint, } impl ConnectingState { fn new(parameters: TunnelParameters) -> Result<Self> { let (event_tx, event_rx) = mpsc::unbounded(); + let tunnel_endpoint = parameters.endpoint; let monitor = Self::spawn_tunnel_monitor(parameters, event_tx.wait())?; let close_handle = CloseHandle::new(&monitor); @@ -43,6 +45,7 @@ impl ConnectingState { Ok(ConnectingState { close_handle, tunnel_events: event_rx, + tunnel_endpoint, }) } @@ -108,13 +111,19 @@ impl ConnectingState { }); } - fn into_connected_state_bootstrap(self) -> ConnectedStateBootstrap { + fn into_connected_state_bootstrap(self, metadata: TunnelMetadata) -> ConnectedStateBootstrap { ConnectedStateBootstrap { + metadata, tunnel_events: self.tunnel_events, + tunnel_endpoint: self.tunnel_endpoint, close_handle: self.close_handle, } } + pub fn info(&self) -> TunnelStateTransition { + TunnelStateTransition::Connecting(self.tunnel_endpoint) + } + fn handle_commands( self, commands: &mut mpsc::UnboundedReceiver<TunnelCommand>, @@ -134,9 +143,9 @@ impl ConnectingState { use self::EventConsequence::*; match try_handle_event!(self, self.tunnel_events.poll()) { - Ok(TunnelEvent::Up(_)) => { - NewState(ConnectedState::enter(self.into_connected_state_bootstrap())) - } + Ok(TunnelEvent::Up(metadata)) => NewState(ConnectedState::enter( + self.into_connected_state_bootstrap(metadata), + )), Ok(_) => SameState(self), Err(_) => NewState(DisconnectingState::enter(( self.close_handle.close(), diff --git a/mullvad-daemon/src/tunnel_state_machine/mod.rs b/mullvad-daemon/src/tunnel_state_machine/mod.rs index 57f611f3ef..2c8742b812 100644 --- a/mullvad-daemon/src/tunnel_state_machine/mod.rs +++ b/mullvad-daemon/src/tunnel_state_machine/mod.rs @@ -13,11 +13,12 @@ use std::thread; use error_chain::ChainedError; use futures::sync::{mpsc, oneshot}; -use futures::{Async, Future, Poll}; +use futures::{Async, Poll, Stream}; use tokio_core::reactor::Core; use mullvad_types::account::AccountToken; -use talpid_core::tunnel::{self, TunnelMonitor}; +use talpid_core::mpsc::IntoSender; +use talpid_core::tunnel::{self, TunnelMetadata, TunnelMonitor}; use talpid_types::net::{TunnelEndpoint, TunnelOptions}; use self::connected_state::{ConnectedState, ConnectedStateBootstrap}; @@ -29,11 +30,16 @@ use super::{OPENVPN_LOG_FILENAME, WIREGUARD_LOG_FILENAME}; error_chain!{} /// Spawn the tunnel state machine thread, returning a channel for sending tunnel commands. -pub fn spawn() -> mpsc::UnboundedSender<TunnelCommand> { +pub fn spawn<T>( + state_change_listener: IntoSender<TunnelStateTransition, T>, +) -> mpsc::UnboundedSender<TunnelCommand> +where + T: From<TunnelStateTransition> + Send + 'static, +{ let (command_tx, command_rx) = mpsc::unbounded(); thread::spawn(move || { - if let Err(error) = event_loop(command_rx) { + if let Err(error) = event_loop(command_rx, state_change_listener) { error!("{}", error.display_chain()); } }); @@ -41,15 +47,24 @@ pub fn spawn() -> mpsc::UnboundedSender<TunnelCommand> { command_tx } -fn event_loop(commands: mpsc::UnboundedReceiver<TunnelCommand>) -> Result<()> { +fn event_loop<T>( + commands: mpsc::UnboundedReceiver<TunnelCommand>, + state_change_listener: IntoSender<TunnelStateTransition, T>, +) -> Result<()> +where + T: From<TunnelStateTransition> + Send + 'static, +{ let mut reactor = Core::new().chain_err(|| "Failed to initialize tunnel state machine event loop")?; let state_machine = TunnelStateMachine::new(commands); reactor - .run(state_machine) - .chain_err(|| "Tunnel state machine finished with an error") + .run(state_machine.for_each(|state_change_event| { + state_change_listener + .send(state_change_event) + .chain_err(|| "Failed to send state change event to listener") + })).chain_err(|| "Tunnel state machine finished with an error") } /// Representation of external commands for the tunnel state machine. @@ -69,11 +84,21 @@ pub struct TunnelParameters { pub account_token: AccountToken, } +/// Event resulting from a transition to a new tunnel state. +#[derive(Clone, Debug, PartialEq)] +pub enum TunnelStateTransition { + Disconnected, + Connecting(TunnelEndpoint), + Connected(TunnelEndpoint, TunnelMetadata), + Disconnecting, +} + /// Asynchronous handling of the tunnel state machine. /// -/// This type implements `Future`, and attempts to advance the state machine based on the events +/// This type implements `Stream`, and attempts to advance the state machine based on the events /// received on the commands stream and possibly on events that specific states are also listening -/// to. +/// to. Every time it successfully advances the state machine a `TunnelStateTransition` is emitted +/// by the stream. struct TunnelStateMachine { current_state: Option<TunnelStateWrapper>, commands: mpsc::UnboundedReceiver<TunnelCommand>, @@ -91,27 +116,42 @@ impl TunnelStateMachine { } } -impl Future for TunnelStateMachine { - type Item = (); +impl Stream for TunnelStateMachine { + type Item = TunnelStateTransition; type Error = Error; - fn poll(&mut self) -> Poll<Self::Item, Self::Error> { + fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { + use self::EventConsequence::*; + let mut state = self .current_state .take() .expect("State machine lost track of its state!"); - let mut event_was_received = true; + let mut result = Ok(Async::Ready(None)); + let mut event_was_ignored = true; - while event_was_received { + while event_was_ignored { let transition = state.handle_event(&mut self.commands); - event_was_received = transition.is_because_of_an_event(); + event_was_ignored = match transition { + SameState(_) => true, + NewState(_) | NoEvents(_) => false, + }; + + result = match transition { + NewState(Ok(ref state)) | NewState(Err((_, ref state))) => { + Ok(Async::Ready(Some(state.info()))) + } + SameState(_) => result, + NoEvents(_) => Ok(Async::NotReady), + }; + state = transition.into_wrapped_tunnel_state(); } self.current_state = Some(state); - Ok(Async::NotReady) + result } } @@ -129,16 +169,6 @@ impl<T> EventConsequence<T> where T: TunnelState, { - /// Checks if this transition happened after an event was received. - pub fn is_because_of_an_event(&self) -> bool { - use self::EventConsequence::*; - - match self { - NewState(_) | SameState(_) => true, - NoEvents(_) => false, - } - } - /// Helper method to chain handling multiple different event types. /// /// The `handle_event` is only called if no events were handled so far. @@ -215,6 +245,18 @@ enum TunnelStateWrapper { Disconnecting(DisconnectingState), } +impl TunnelStateWrapper { + /// Returns information describing the state. + fn info(&self) -> TunnelStateTransition { + match *self { + TunnelStateWrapper::Disconnected(_) => TunnelStateTransition::Disconnected, + TunnelStateWrapper::Connecting(ref state) => state.info(), + TunnelStateWrapper::Connected(ref state) => state.info(), + TunnelStateWrapper::Disconnecting(_) => TunnelStateTransition::Disconnecting, + } + } +} + macro_rules! impl_from_for_tunnel_state { ($state_variant:ident($state_type:ident)) => { impl From<$state_type> for TunnelStateWrapper { |
