diff options
| author | Janito Vaqueiro Ferreira Filho <janito@mullvad.net> | 2018-08-27 13:24:49 -0300 |
|---|---|---|
| committer | Janito Vaqueiro Ferreira Filho <janito@mullvad.net> | 2018-08-29 10:35:37 -0300 |
| commit | 0b6a3e38032248d7d33c3ec84b643ba609d7f96f (patch) | |
| tree | 92619604cdd321ffcc5730b278f524e3eb5ce6f1 /talpid-core/src | |
| parent | 4de28f0913f11f1b4c29e4e0d4cc85a664c1c97b (diff) | |
| download | mullvadvpn-0b6a3e38032248d7d33c3ec84b643ba609d7f96f.tar.xz mullvadvpn-0b6a3e38032248d7d33c3ec84b643ba609d7f96f.zip | |
Move tunnel state machine to Talpid
Diffstat (limited to 'talpid-core/src')
| -rw-r--r-- | talpid-core/src/lib.rs | 5 | ||||
| -rw-r--r-- | talpid-core/src/tunnel_state_machine/connected_state.rs | 183 | ||||
| -rw-r--r-- | talpid-core/src/tunnel_state_machine/connecting_state.rs | 294 | ||||
| -rw-r--r-- | talpid-core/src/tunnel_state_machine/disconnected_state.rs | 48 | ||||
| -rw-r--r-- | talpid-core/src/tunnel_state_machine/disconnecting_state.rs | 103 | ||||
| -rw-r--r-- | talpid-core/src/tunnel_state_machine/macros.rs | 21 | ||||
| -rw-r--r-- | talpid-core/src/tunnel_state_machine/mod.rs | 381 |
7 files changed, 1035 insertions, 0 deletions
diff --git a/talpid-core/src/lib.rs b/talpid-core/src/lib.rs index b96764848f..55df5e135f 100644 --- a/talpid-core/src/lib.rs +++ b/talpid-core/src/lib.rs @@ -19,6 +19,7 @@ extern crate log; extern crate error_chain; #[cfg(target_os = "linux")] extern crate failure; +extern crate futures; #[cfg(unix)] extern crate ipnetwork; extern crate jsonrpc_core; @@ -29,6 +30,7 @@ extern crate jsonrpc_macros; extern crate lazy_static; extern crate libc; extern crate shell_escape; +extern crate tokio_core; extern crate uuid; #[cfg(target_os = "linux")] extern crate which; @@ -56,4 +58,7 @@ pub mod mpsc; /// Abstractions over operating system network security settings. pub mod security; +/// State machine to handle tunnel configuration. +pub mod tunnel_state_machine; + mod mktemp; diff --git a/talpid-core/src/tunnel_state_machine/connected_state.rs b/talpid-core/src/tunnel_state_machine/connected_state.rs new file mode 100644 index 0000000000..1b119eb387 --- /dev/null +++ b/talpid-core/src/tunnel_state_machine/connected_state.rs @@ -0,0 +1,183 @@ +use futures::sync::{mpsc, oneshot}; +use futures::{Async, Future, Stream}; + +use talpid_types::net::TunnelEndpoint; + +use super::{ + AfterDisconnect, ConnectingState, DisconnectingState, EventConsequence, Result, ResultExt, + SharedTunnelStateValues, StateEntryResult, TunnelCommand, TunnelParameters, TunnelState, + TunnelStateWrapper, +}; +use security::{NetworkSecurity, SecurityPolicy}; +use tunnel::{CloseHandle, TunnelEvent, TunnelMetadata}; + +pub struct ConnectedStateBootstrap { + pub metadata: TunnelMetadata, + pub tunnel_events: mpsc::UnboundedReceiver<TunnelEvent>, + pub tunnel_endpoint: TunnelEndpoint, + pub tunnel_parameters: TunnelParameters, + pub tunnel_close_event: oneshot::Receiver<()>, + pub close_handle: CloseHandle, +} + +/// The tunnel is up and working. +pub struct ConnectedState { + metadata: TunnelMetadata, + tunnel_events: mpsc::UnboundedReceiver<TunnelEvent>, + tunnel_endpoint: TunnelEndpoint, + tunnel_parameters: TunnelParameters, + tunnel_close_event: oneshot::Receiver<()>, + close_handle: CloseHandle, +} + +impl ConnectedState { + fn from(bootstrap: ConnectedStateBootstrap) -> Self { + ConnectedState { + metadata: bootstrap.metadata, + tunnel_events: bootstrap.tunnel_events, + tunnel_endpoint: bootstrap.tunnel_endpoint, + tunnel_parameters: bootstrap.tunnel_parameters, + tunnel_close_event: bootstrap.tunnel_close_event, + close_handle: bootstrap.close_handle, + } + } + + fn set_security_policy(&self, shared_values: &mut SharedTunnelStateValues) -> Result<()> { + let policy = SecurityPolicy::Connected { + relay_endpoint: self.tunnel_endpoint.to_endpoint(), + tunnel: self.metadata.clone(), + allow_lan: self.tunnel_parameters.allow_lan, + }; + + debug!("Setting security policy: {:?}", policy); + shared_values + .security + .apply_policy(policy) + .chain_err(|| "Failed to apply security policy for connected state") + } + + fn handle_commands( + mut self, + commands: &mut mpsc::UnboundedReceiver<TunnelCommand>, + shared_values: &mut SharedTunnelStateValues, + ) -> EventConsequence<Self> { + use self::EventConsequence::*; + + match try_handle_event!(self, commands.poll()) { + Ok(TunnelCommand::Connect(parameters)) => { + if parameters != self.tunnel_parameters { + NewState(DisconnectingState::enter( + shared_values, + ( + self.close_handle, + self.tunnel_close_event, + AfterDisconnect::Reconnect(parameters), + ), + )) + } else { + SameState(self) + } + } + Ok(TunnelCommand::Disconnect) | Err(_) => NewState(DisconnectingState::enter( + shared_values, + ( + self.close_handle, + self.tunnel_close_event, + AfterDisconnect::Nothing, + ), + )), + Ok(TunnelCommand::AllowLan(allow_lan)) => { + self.tunnel_parameters.allow_lan = allow_lan; + + match self.set_security_policy(shared_values) { + Ok(()) => SameState(self), + Err(error) => { + error!("{}", error.chain_err(|| "Failed to update security policy")); + NewState(DisconnectingState::enter( + shared_values, + ( + self.close_handle, + self.tunnel_close_event, + AfterDisconnect::Nothing, + ), + )) + } + } + } + } + } + + fn handle_tunnel_events( + mut self, + shared_values: &mut SharedTunnelStateValues, + ) -> EventConsequence<Self> { + use self::EventConsequence::*; + + match try_handle_event!(self, self.tunnel_events.poll()) { + Ok(TunnelEvent::Down) | Err(_) => NewState(DisconnectingState::enter( + shared_values, + ( + self.close_handle, + self.tunnel_close_event, + AfterDisconnect::Reconnect(self.tunnel_parameters), + ), + )), + Ok(_) => SameState(self), + } + } + + fn handle_tunnel_close_event( + mut self, + shared_values: &mut SharedTunnelStateValues, + ) -> EventConsequence<Self> { + use self::EventConsequence::*; + + match self.tunnel_close_event.poll() { + Ok(Async::Ready(_)) => {} + Ok(Async::NotReady) => return NoEvents(self), + Err(_cancelled) => warn!("Tunnel monitor thread has stopped unexpectedly"), + } + + info!("Tunnel closed. Reconnecting."); + NewState(ConnectingState::enter( + shared_values, + self.tunnel_parameters, + )) + } +} + +impl TunnelState for ConnectedState { + type Bootstrap = ConnectedStateBootstrap; + + fn enter( + shared_values: &mut SharedTunnelStateValues, + bootstrap: Self::Bootstrap, + ) -> StateEntryResult { + let connected_state = ConnectedState::from(bootstrap); + + match connected_state.set_security_policy(shared_values) { + Ok(()) => Ok(TunnelStateWrapper::from(connected_state)), + Err(error) => Err(( + error, + DisconnectingState::enter( + shared_values, + ( + connected_state.close_handle, + connected_state.tunnel_close_event, + AfterDisconnect::Nothing, + ), + ).expect("Failed to disconnect after failed transition to connected state"), + )), + } + } + + fn handle_event( + self, + commands: &mut mpsc::UnboundedReceiver<TunnelCommand>, + shared_values: &mut SharedTunnelStateValues, + ) -> EventConsequence<Self> { + self.handle_commands(commands, shared_values) + .or_else(Self::handle_tunnel_events, shared_values) + .or_else(Self::handle_tunnel_close_event, shared_values) + } +} diff --git a/talpid-core/src/tunnel_state_machine/connecting_state.rs b/talpid-core/src/tunnel_state_machine/connecting_state.rs new file mode 100644 index 0000000000..2588da0e77 --- /dev/null +++ b/talpid-core/src/tunnel_state_machine/connecting_state.rs @@ -0,0 +1,294 @@ +use std::ffi::OsString; +use std::path::PathBuf; +use std::sync::Mutex; +use std::thread; +use std::time::{Duration, Instant}; + +use futures::sink::Wait; +use futures::sync::{mpsc, oneshot}; +use futures::{Async, Future, Sink, Stream}; + +use talpid_types::net::{TunnelEndpoint, TunnelEndpointData}; + +use super::{ + AfterDisconnect, ConnectedState, ConnectedStateBootstrap, DisconnectedState, + DisconnectingState, EventConsequence, Result, ResultExt, SharedTunnelStateValues, + StateEntryResult, TunnelCommand, TunnelParameters, TunnelState, TunnelStateWrapper, +}; +use logging; +use security::{NetworkSecurity, SecurityPolicy}; +use tunnel::{CloseHandle, TunnelEvent, TunnelMetadata, TunnelMonitor}; + +const MIN_TUNNEL_ALIVE_TIME: Duration = Duration::from_millis(1000); + +const OPENVPN_LOG_FILENAME: &str = "openvpn.log"; +const WIREGUARD_LOG_FILENAME: &str = "wireguard.log"; + +#[cfg(windows)] +const TUNNEL_INTERFACE_ALIAS: Option<&str> = Some("Mullvad"); +#[cfg(not(windows))] +const TUNNEL_INTERFACE_ALIAS: Option<&str> = None; + +/// The tunnel has been started, but it is not established/functional. +pub struct ConnectingState { + tunnel_events: mpsc::UnboundedReceiver<TunnelEvent>, + tunnel_endpoint: TunnelEndpoint, + tunnel_parameters: TunnelParameters, + tunnel_close_event: oneshot::Receiver<()>, + close_handle: CloseHandle, +} + +impl ConnectingState { + fn new( + shared_values: &mut SharedTunnelStateValues, + parameters: TunnelParameters, + ) -> Result<Self> { + Self::set_security_policy(shared_values, parameters.endpoint, parameters.allow_lan)?; + + let tunnel_endpoint = parameters.endpoint; + let (tunnel_events, tunnel_close_event, close_handle) = Self::start_tunnel(¶meters)?; + + Ok(ConnectingState { + tunnel_events, + tunnel_endpoint, + tunnel_parameters: parameters, + tunnel_close_event, + close_handle, + }) + } + + fn set_security_policy( + shared_values: &mut SharedTunnelStateValues, + endpoint: TunnelEndpoint, + allow_lan: bool, + ) -> Result<()> { + let policy = SecurityPolicy::Connecting { + relay_endpoint: endpoint.to_endpoint(), + allow_lan, + }; + + debug!("Setting security policy: {:?}", policy); + shared_values + .security + .apply_policy(policy) + .chain_err(|| "Failed to apply security policy for connecting state") + } + + fn start_tunnel( + parameters: &TunnelParameters, + ) -> Result<( + mpsc::UnboundedReceiver<TunnelEvent>, + oneshot::Receiver<()>, + CloseHandle, + )> { + let (event_tx, event_rx) = mpsc::unbounded(); + let monitor = Self::spawn_tunnel_monitor(¶meters, event_tx.wait())?; + let close_handle = monitor.close_handle(); + let tunnel_close_event = Self::spawn_tunnel_monitor_wait_thread(monitor); + + Ok((event_rx, tunnel_close_event, close_handle)) + } + + fn spawn_tunnel_monitor( + parameters: &TunnelParameters, + events: Wait<mpsc::UnboundedSender<TunnelEvent>>, + ) -> Result<TunnelMonitor> { + let event_tx = Mutex::new(events); + let on_tunnel_event = move |event| { + let send_result = event_tx + .lock() + .expect("A thread panicked while sending a tunnel event") + .send(event); + + if send_result.is_err() { + warn!("Tunnel state machine stopped before tunnel event was received"); + } + }; + let log_file = Self::prepare_tunnel_log_file(¶meters)?; + + TunnelMonitor::new( + parameters.endpoint, + ¶meters.options, + TUNNEL_INTERFACE_ALIAS.to_owned().map(OsString::from), + ¶meters.username, + log_file.as_ref().map(PathBuf::as_path), + ¶meters.resource_dir, + on_tunnel_event, + ).chain_err(|| "Unable to start tunnel monitor") + } + + fn prepare_tunnel_log_file(parameters: &TunnelParameters) -> Result<Option<PathBuf>> { + if let Some(ref log_dir) = parameters.log_dir { + let filename = match parameters.endpoint.tunnel { + TunnelEndpointData::OpenVpn(_) => OPENVPN_LOG_FILENAME, + TunnelEndpointData::Wireguard(_) => WIREGUARD_LOG_FILENAME, + }; + let tunnel_log = log_dir.join(filename); + logging::rotate_log(&tunnel_log).chain_err(|| "Unable to rotate tunnel log")?; + Ok(Some(tunnel_log)) + } else { + Ok(None) + } + } + + fn spawn_tunnel_monitor_wait_thread(tunnel_monitor: TunnelMonitor) -> oneshot::Receiver<()> { + let (tunnel_close_event_tx, tunnel_close_event_rx) = oneshot::channel(); + + thread::spawn(move || { + let start = Instant::now(); + + match tunnel_monitor.wait() { + Ok(_) => debug!("Tunnel has finished without errors"), + Err(error) => { + let chained_error = error.chain_err(|| "Tunnel has stopped unexpectedly"); + warn!("{}", chained_error); + } + } + + if let Some(remaining_time) = MIN_TUNNEL_ALIVE_TIME.checked_sub(start.elapsed()) { + thread::sleep(remaining_time); + } + + if tunnel_close_event_tx.send(()).is_err() { + warn!("Tunnel state machine stopped before receiving tunnel closed event"); + } + + trace!("Tunnel monitor thread exit"); + }); + + tunnel_close_event_rx + } + + fn into_connected_state_bootstrap(self, metadata: TunnelMetadata) -> ConnectedStateBootstrap { + ConnectedStateBootstrap { + metadata, + tunnel_events: self.tunnel_events, + tunnel_endpoint: self.tunnel_endpoint, + tunnel_parameters: self.tunnel_parameters, + tunnel_close_event: self.tunnel_close_event, + close_handle: self.close_handle, + } + } + + fn handle_commands( + mut self, + commands: &mut mpsc::UnboundedReceiver<TunnelCommand>, + shared_values: &mut SharedTunnelStateValues, + ) -> EventConsequence<Self> { + use self::EventConsequence::*; + + match try_handle_event!(self, commands.poll()) { + Ok(TunnelCommand::Connect(parameters)) => { + if parameters != self.tunnel_parameters { + NewState(DisconnectingState::enter( + shared_values, + ( + self.close_handle, + self.tunnel_close_event, + AfterDisconnect::Reconnect(parameters), + ), + )) + } else { + SameState(self) + } + } + Ok(TunnelCommand::Disconnect) | Err(_) => NewState(DisconnectingState::enter( + shared_values, + ( + self.close_handle, + self.tunnel_close_event, + AfterDisconnect::Nothing, + ), + )), + Ok(TunnelCommand::AllowLan(allow_lan)) => { + self.tunnel_parameters.allow_lan = allow_lan; + match Self::set_security_policy(shared_values, self.tunnel_endpoint, allow_lan) { + Ok(()) => SameState(self), + Err(error) => { + error!("{}", error.chain_err(|| "Failed to update security policy")); + NewState(DisconnectingState::enter( + shared_values, + ( + self.close_handle, + self.tunnel_close_event, + AfterDisconnect::Nothing, + ), + )) + } + } + } + } + } + + fn handle_tunnel_events( + mut self, + shared_values: &mut SharedTunnelStateValues, + ) -> EventConsequence<Self> { + use self::EventConsequence::*; + + match try_handle_event!(self, self.tunnel_events.poll()) { + Ok(TunnelEvent::Up(metadata)) => NewState(ConnectedState::enter( + shared_values, + self.into_connected_state_bootstrap(metadata), + )), + Ok(_) => SameState(self), + Err(_) => NewState(DisconnectingState::enter( + shared_values, + ( + self.close_handle, + self.tunnel_close_event, + AfterDisconnect::Reconnect(self.tunnel_parameters), + ), + )), + } + } + + fn handle_tunnel_close_event( + mut self, + shared_values: &mut SharedTunnelStateValues, + ) -> EventConsequence<Self> { + use self::EventConsequence::*; + + match self.tunnel_close_event.poll() { + Ok(Async::Ready(_)) => {} + Ok(Async::NotReady) => return NoEvents(self), + Err(_cancelled) => warn!("Tunnel monitor thread has stopped unexpectedly"), + } + + info!("Tunnel closed. Reconnecting."); + NewState(ConnectingState::enter( + shared_values, + self.tunnel_parameters, + )) + } +} + +impl TunnelState for ConnectingState { + type Bootstrap = TunnelParameters; + + fn enter( + shared_values: &mut SharedTunnelStateValues, + parameters: Self::Bootstrap, + ) -> StateEntryResult { + Self::new(shared_values, parameters) + .map(TunnelStateWrapper::from) + .chain_err(|| "Failed to start tunnel") + .map_err(|error| { + ( + error, + DisconnectedState::enter(shared_values, ()) + .expect("Failed to transition to fallback disconnected state"), + ) + }) + } + + fn handle_event( + self, + commands: &mut mpsc::UnboundedReceiver<TunnelCommand>, + shared_values: &mut SharedTunnelStateValues, + ) -> EventConsequence<Self> { + self.handle_commands(commands, shared_values) + .or_else(Self::handle_tunnel_events, shared_values) + .or_else(Self::handle_tunnel_close_event, shared_values) + } +} diff --git a/talpid-core/src/tunnel_state_machine/disconnected_state.rs b/talpid-core/src/tunnel_state_machine/disconnected_state.rs new file mode 100644 index 0000000000..694e280ec6 --- /dev/null +++ b/talpid-core/src/tunnel_state_machine/disconnected_state.rs @@ -0,0 +1,48 @@ +use error_chain::ChainedError; +use futures::sync::mpsc; +use futures::Stream; + +use super::{ + ConnectingState, Error, EventConsequence, SharedTunnelStateValues, StateEntryResult, + TunnelCommand, TunnelState, TunnelStateWrapper, +}; +use security::NetworkSecurity; + +/// No tunnel is running. +pub struct DisconnectedState; + +impl DisconnectedState { + fn reset_security_policy(shared_values: &mut SharedTunnelStateValues) { + debug!("Resetting security policy"); + if let Err(error) = shared_values.security.reset_policy() { + let chained_error = Error::with_chain(error, "Failed to reset security policy"); + error!("{}", chained_error.display_chain()); + } + } +} + +impl TunnelState for DisconnectedState { + type Bootstrap = (); + + fn enter(shared_values: &mut SharedTunnelStateValues, _: Self::Bootstrap) -> StateEntryResult { + Self::reset_security_policy(shared_values); + + Ok(TunnelStateWrapper::from(DisconnectedState)) + } + + fn handle_event( + self, + commands: &mut mpsc::UnboundedReceiver<TunnelCommand>, + shared_values: &mut SharedTunnelStateValues, + ) -> EventConsequence<Self> { + use self::EventConsequence::*; + + match try_handle_event!(self, commands.poll()) { + Ok(TunnelCommand::Connect(parameters)) => { + NewState(ConnectingState::enter(shared_values, parameters)) + } + Ok(_) => SameState(self), + Err(_) => Finished, + } + } +} diff --git a/talpid-core/src/tunnel_state_machine/disconnecting_state.rs b/talpid-core/src/tunnel_state_machine/disconnecting_state.rs new file mode 100644 index 0000000000..5b80ad0598 --- /dev/null +++ b/talpid-core/src/tunnel_state_machine/disconnecting_state.rs @@ -0,0 +1,103 @@ +use error_chain::ChainedError; +use futures::sync::{mpsc, oneshot}; +use futures::{Async, Future, Stream}; + +use super::{ + ConnectingState, DisconnectedState, EventConsequence, ResultExt, SharedTunnelStateValues, + StateEntryResult, TunnelCommand, TunnelParameters, TunnelState, TunnelStateWrapper, +}; +use tunnel::CloseHandle; + +/// This state is active from when we manually trigger a tunnel kill until the tunnel wait +/// operation (TunnelExit) returned. +pub struct DisconnectingState { + exited: oneshot::Receiver<()>, + after_disconnect: AfterDisconnect, +} + +impl DisconnectingState { + fn handle_commands( + mut self, + commands: &mut mpsc::UnboundedReceiver<TunnelCommand>, + ) -> EventConsequence<Self> { + use self::AfterDisconnect::*; + + let event = try_handle_event!(self, commands.poll()); + let after_disconnect = self.after_disconnect; + + self.after_disconnect = match after_disconnect { + AfterDisconnect::Nothing => match event { + Ok(TunnelCommand::Connect(parameters)) => Reconnect(parameters), + _ => Nothing, + }, + AfterDisconnect::Reconnect(mut tunnel_parameters) => match event { + Ok(TunnelCommand::Connect(parameters)) => Reconnect(parameters), + Ok(TunnelCommand::AllowLan(allow_lan)) => { + tunnel_parameters.allow_lan = allow_lan; + Reconnect(tunnel_parameters) + } + Ok(TunnelCommand::Disconnect) | Err(_) => Nothing, + }, + }; + + EventConsequence::SameState(self) + } + + fn handle_exit_event( + mut self, + shared_values: &mut SharedTunnelStateValues, + ) -> EventConsequence<Self> { + use self::EventConsequence::*; + + match self.exited.poll() { + Ok(Async::NotReady) => NoEvents(self), + Ok(Async::Ready(_)) | Err(_) => NewState(self.after_disconnect(shared_values)), + } + } + + fn after_disconnect(self, shared_values: &mut SharedTunnelStateValues) -> StateEntryResult { + match self.after_disconnect { + AfterDisconnect::Nothing => DisconnectedState::enter(shared_values, ()), + AfterDisconnect::Reconnect(tunnel_parameters) => { + ConnectingState::enter(shared_values, tunnel_parameters) + } + } + } +} + +impl TunnelState for DisconnectingState { + type Bootstrap = (CloseHandle, oneshot::Receiver<()>, AfterDisconnect); + + fn enter( + _: &mut SharedTunnelStateValues, + (close_handle, exited, after_disconnect): Self::Bootstrap, + ) -> StateEntryResult { + let close_result = close_handle + .close() + .chain_err(|| "Failed to request tunnel monitor to close the tunnel"); + + if let Err(error) = close_result { + error!("{}", error.display_chain()); + } + + Ok(TunnelStateWrapper::from(DisconnectingState { + exited, + after_disconnect, + })) + } + + fn handle_event( + self, + commands: &mut mpsc::UnboundedReceiver<TunnelCommand>, + shared_values: &mut SharedTunnelStateValues, + ) -> EventConsequence<Self> { + self.handle_commands(commands) + .or_else(Self::handle_exit_event, shared_values) + } +} + +/// Which state should be transitioned to after disconnection is complete. +pub enum AfterDisconnect { + Nothing, + Reconnect(TunnelParameters), +} diff --git a/talpid-core/src/tunnel_state_machine/macros.rs b/talpid-core/src/tunnel_state_machine/macros.rs new file mode 100644 index 0000000000..2241c8cb06 --- /dev/null +++ b/talpid-core/src/tunnel_state_machine/macros.rs @@ -0,0 +1,21 @@ +/// Try to receive an event from a `Stream`'s asynchronous poll expression. +/// +/// This macro is similar to the `try_ready!` macro provided in `futures`. If there is an event +/// ready, it will be returned wrapped in a `Result`. If there are no events ready to be received, +/// the outer function will return with a transition that indicates that no events were received, +/// which is analogous to `Async::NotReady`. +/// +/// When the asynchronous event indicates that the stream has finished or that it has failed, an +/// error type is returned so that either close scenario can be handled in a similar way. +macro_rules! try_handle_event { + ($same_state:expr, $event:expr) => { + match $event { + Ok(::futures::Async::Ready(Some(event))) => Ok(event), + Ok(::futures::Async::Ready(None)) => Err(None), + Ok(::futures::Async::NotReady) => { + return ::tunnel_state_machine::EventConsequence::NoEvents($same_state); + } + Err(error) => Err(Some(error)), + } + }; +} diff --git a/talpid-core/src/tunnel_state_machine/mod.rs b/talpid-core/src/tunnel_state_machine/mod.rs new file mode 100644 index 0000000000..c3d592c952 --- /dev/null +++ b/talpid-core/src/tunnel_state_machine/mod.rs @@ -0,0 +1,381 @@ +#[macro_use] +mod macros; + +mod connected_state; +mod connecting_state; +mod disconnected_state; +mod disconnecting_state; + +use std::fmt::{Debug, Formatter, Result as FmtResult}; +use std::path::{Path, PathBuf}; +use std::sync::mpsc as sync_mpsc; +use std::thread; + +use error_chain::ChainedError; +use futures::sync::mpsc; +use futures::{Async, Future, Poll, Stream}; +use tokio_core::reactor::Core; + +use talpid_types::net::{TunnelEndpoint, TunnelOptions}; + +use self::connected_state::{ConnectedState, ConnectedStateBootstrap}; +use self::connecting_state::ConnectingState; +use self::disconnected_state::DisconnectedState; +use self::disconnecting_state::{AfterDisconnect, DisconnectingState}; +use super::mpsc::IntoSender; +use super::security::{NetworkSecurity, NetworkSecurityImpl}; + +error_chain! { + errors { + /// An error occurred while setting up the network security. + NetworkSecurityError { + description("Network security error") + } + /// An error occurred while attempting to set up the event loop for the tunnel state + /// machine. + ReactorError { + description("Failed to initialize tunnel state machine event loop executor") + } + } +} + +/// Spawn the tunnel state machine thread, returning a channel for sending tunnel commands. +pub fn spawn<P, T>( + cache_dir: P, + state_change_listener: IntoSender<TunnelStateTransition, T>, +) -> Result<mpsc::UnboundedSender<TunnelCommand>> +where + P: AsRef<Path> + Send + 'static, + T: From<TunnelStateTransition> + Send + 'static, +{ + let (command_tx, command_rx) = mpsc::unbounded(); + let (startup_result_tx, startup_result_rx) = sync_mpsc::channel(); + + thread::spawn( + move || match create_event_loop(cache_dir, command_rx, state_change_listener) { + Ok((mut reactor, event_loop)) => { + startup_result_tx.send(Ok(())).expect( + "Tunnel state machine won't be started because the owner thread crashed", + ); + + if let Err(error) = reactor.run(event_loop) { + let chained_error = + Error::with_chain(error, "Tunnel state machine exited with an error"); + error!("{}", chained_error.display_chain()); + } + } + Err(startup_error) => { + startup_result_tx + .send(Err(startup_error)) + .expect("Failed to send startup error"); + } + }, + ); + + startup_result_rx + .recv() + .expect("Failed to start tunnel state machine thread") + .map(|_| command_tx) +} + +fn create_event_loop<P, T>( + cache_dir: P, + commands: mpsc::UnboundedReceiver<TunnelCommand>, + state_change_listener: IntoSender<TunnelStateTransition, T>, +) -> Result<(Core, impl Future<Item = (), Error = Error>)> +where + P: AsRef<Path>, + T: From<TunnelStateTransition> + Send + 'static, +{ + let reactor = Core::new().chain_err(|| ErrorKind::ReactorError)?; + let state_machine = TunnelStateMachine::new(&cache_dir, commands)?; + + let future = state_machine.for_each(move |state_change_event| { + state_change_listener + .send(state_change_event) + .chain_err(|| "Failed to send state change event to listener") + }); + + Ok((reactor, future)) +} + +/// Representation of external commands for the tunnel state machine. +pub enum TunnelCommand { + /// Enable or disable LAN access in the firewall. + AllowLan(bool), + /// Open tunnel connection. + Connect(TunnelParameters), + /// Close tunnel connection. + Disconnect, +} + +/// Information necessary to open a tunnel. +#[derive(Debug, PartialEq)] +pub struct TunnelParameters { + /// Tunnel enpoint to connect to. + pub endpoint: TunnelEndpoint, + /// Tunnel connection options. + pub options: TunnelOptions, + /// Directory to store tunnel log file. + pub log_dir: Option<PathBuf>, + /// Resource directory path. + pub resource_dir: PathBuf, + /// Username to use for setting up the tunnel. + pub username: String, + /// Should LAN access be allowed outside the tunnel. + pub allow_lan: bool, +} + +/// Event resulting from a transition to a new tunnel state. +#[derive(Clone, Copy, Debug, PartialEq)] +pub enum TunnelStateTransition { + /// No connection is established and network is unsecured. + Disconnected, + /// Network is secured but tunnel is still connecting. + Connecting, + /// Tunnel is connected. + Connected, + /// Disconnecting tunnel. + Disconnecting, +} + +/// Asynchronous handling of the tunnel state machine. +/// +/// This type implements `Stream`, and attempts to advance the state machine based on the events +/// received on the commands stream and possibly on events that specific states are also listening +/// to. Every time it successfully advances the state machine a `TunnelStateTransition` is emitted +/// by the stream. +struct TunnelStateMachine { + current_state: Option<TunnelStateWrapper>, + commands: mpsc::UnboundedReceiver<TunnelCommand>, + shared_values: SharedTunnelStateValues, +} + +impl TunnelStateMachine { + fn new<P: AsRef<Path>>( + cache_dir: P, + commands: mpsc::UnboundedReceiver<TunnelCommand>, + ) -> Result<Self> { + let security = + NetworkSecurityImpl::new(cache_dir).chain_err(|| ErrorKind::NetworkSecurityError)?; + let mut shared_values = SharedTunnelStateValues { security }; + + let initial_state = TunnelStateWrapper::new(&mut shared_values, ()) + .expect("Failed to create initial tunnel state"); + + Ok(TunnelStateMachine { + current_state: Some(initial_state), + commands, + shared_values, + }) + } +} + +impl Stream for TunnelStateMachine { + type Item = TunnelStateTransition; + type Error = Error; + + fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { + while let Some(state_wrapper) = self.current_state.take() { + match state_wrapper.handle_event(&mut self.commands, &mut self.shared_values) { + TunnelStateMachineAction::Repeat(repeat_state_wrapper) => { + self.current_state = Some(repeat_state_wrapper); + } + TunnelStateMachineAction::Notify(state_wrapper, result) => { + self.current_state = state_wrapper; + return result; + } + } + } + Ok(Async::Ready(None)) + } +} + +/// Action the state machine should take, which is discovered base on an event consequence. +/// +/// The action can be to execute another iteration or to notify that something happened. Executing +/// another iteration happens when an event is received and ignored, which causes the tunnel state +/// machine to stay in the same state. The state machine can notify its caller that a state +/// transition has occurred, that it has finished, or that it has paused to wait for new events. +enum TunnelStateMachineAction { + Repeat(TunnelStateWrapper), + Notify( + Option<TunnelStateWrapper>, + Poll<Option<TunnelStateTransition>, Error>, + ), +} + +impl<T: TunnelState> From<EventConsequence<T>> for TunnelStateMachineAction { + fn from(event_consequence: EventConsequence<T>) -> Self { + use self::EventConsequence::*; + use self::TunnelStateMachineAction::*; + + match event_consequence { + NewState(Ok(state_wrapper)) | NewState(Err((_, state_wrapper))) => { + let transition = state_wrapper.info(); + Notify(Some(state_wrapper), Ok(Async::Ready(Some(transition)))) + } + SameState(state) => Repeat(state.into()), + NoEvents(state) => Notify(Some(state.into()), Ok(Async::NotReady)), + Finished => Notify(None, Ok(Async::Ready(None))), + } + } +} + +/// Values that are common to all tunnel states. +struct SharedTunnelStateValues { + security: NetworkSecurityImpl, +} + +/// Asynchronous result of an attempt to progress a state. +enum EventConsequence<T: TunnelState> { + /// Transition to a new state. + NewState(StateEntryResult), + /// An event was received, but it was ignored by the state so no transition is performed. + SameState(T), + /// No events were received, the event loop should block until one becomes available. + NoEvents(T), + /// The state machine has finished its execution. + Finished, +} + +impl<T> EventConsequence<T> +where + T: TunnelState, +{ + /// Helper method to chain handling multiple different event types. + /// + /// The `handle_event` is only called if no events were handled so far. + pub fn or_else<F>(self, handle_event: F, shared_values: &mut SharedTunnelStateValues) -> Self + where + F: FnOnce(T, &mut SharedTunnelStateValues) -> Self, + { + use self::EventConsequence::*; + + match self { + NoEvents(state) => handle_event(state, shared_values), + consequence => consequence, + } + } +} + +/// Result of entering a `T: TunnelState`. +/// +/// It is either the state itself when successful, or an error paired with a fallback state. +type StateEntryResult = ::std::result::Result<TunnelStateWrapper, (Error, TunnelStateWrapper)>; + +/// Trait that contains the method all states should implement to handle an event and advance the +/// state machine. +trait TunnelState: Into<TunnelStateWrapper> + Sized { + /// Type representing extra information required for entering the state. + type Bootstrap; + + /// Constructor function. + /// + /// This is the state entry point. It attempts to enter the state, and may fail by entering an + /// error or fallback state instead. + fn enter( + shared_values: &mut SharedTunnelStateValues, + bootstrap: Self::Bootstrap, + ) -> StateEntryResult; + + /// Main state function. + /// + /// This is state exit point. It consumes itself and returns the next state to advance to when + /// it has completed, or itself if it wants to ignore a received event or if no events were + /// ready to be received. See [`EventConsequence`] for more details. + /// + /// An implementation can handle events from many sources, but it should also handle command + /// events received through the provided `commands` stream. + /// + /// [`EventConsequence`]: enum.EventConsequence.html + fn handle_event( + self, + commands: &mut mpsc::UnboundedReceiver<TunnelCommand>, + shared_values: &mut SharedTunnelStateValues, + ) -> EventConsequence<Self>; +} + +/// Valid states of the tunnel. +/// +/// All implementations must implement `TunnelState` so that they can handle events and +/// commands in order to advance the state machine. +enum TunnelStateWrapper { + Disconnected(DisconnectedState), + Connecting(ConnectingState), + Connected(ConnectedState), + Disconnecting(DisconnectingState), +} + +impl TunnelStateWrapper { + fn new( + shared_values: &mut SharedTunnelStateValues, + bootstrap: <DisconnectedState as TunnelState>::Bootstrap, + ) -> StateEntryResult { + DisconnectedState::enter(shared_values, bootstrap) + } + + fn handle_event( + self, + commands: &mut mpsc::UnboundedReceiver<TunnelCommand>, + shared_values: &mut SharedTunnelStateValues, + ) -> TunnelStateMachineAction { + macro_rules! handle_event { + ( $($state:ident),* $(,)* ) => { + match self { + $( + TunnelStateWrapper::$state(state) => { + let event_consequence = state.handle_event(commands, shared_values); + TunnelStateMachineAction::from(event_consequence) + } + )* + } + } + } + + handle_event! { + Disconnected, + Connecting, + Connected, + Disconnecting, + } + } + + /// Returns information describing the state. + fn info(&self) -> TunnelStateTransition { + match *self { + TunnelStateWrapper::Disconnected(_) => TunnelStateTransition::Disconnected, + TunnelStateWrapper::Connecting(_) => TunnelStateTransition::Connecting, + TunnelStateWrapper::Connected(_) => TunnelStateTransition::Connected, + TunnelStateWrapper::Disconnecting(_) => TunnelStateTransition::Disconnecting, + } + } +} + +macro_rules! impl_from_for_tunnel_state { + ($state_variant:ident($state_type:ident)) => { + impl From<$state_type> for TunnelStateWrapper { + fn from(state: $state_type) -> Self { + TunnelStateWrapper::$state_variant(state) + } + } + }; +} + +impl_from_for_tunnel_state!(Disconnected(DisconnectedState)); +impl_from_for_tunnel_state!(Connecting(ConnectingState)); +impl_from_for_tunnel_state!(Connected(ConnectedState)); +impl_from_for_tunnel_state!(Disconnecting(DisconnectingState)); + +impl Debug for TunnelStateWrapper { + fn fmt(&self, formatter: &mut Formatter) -> FmtResult { + use self::TunnelStateWrapper::*; + + match *self { + Disconnected(_) => write!(formatter, "TunnelStateWrapper::Disconnected(_)"), + Connecting(_) => write!(formatter, "TunnelStateWrapper::Connecting(_)"), + Connected(_) => write!(formatter, "TunnelStateWrapper::Connected(_)"), + Disconnecting(_) => write!(formatter, "TunnelStateWrapper::Disconnecting(_)"), + } + } +} |
