diff options
| author | Janito Vaqueiro Ferreira Filho <janito@mullvad.net> | 2018-08-24 08:01:50 -0300 |
|---|---|---|
| committer | Janito Vaqueiro Ferreira Filho <janito@mullvad.net> | 2018-08-24 08:01:50 -0300 |
| commit | dbf0d70aa8d69d3300bc56d89b24e839654199e9 (patch) | |
| tree | e1219fe458d2d175eb1782aca2a790f52cbba6c0 | |
| parent | 69a5d4b11c5324f744664aa4c5c47d9c637d9d2c (diff) | |
| parent | 698079af57a5e1f71793d54c8f0a05894ba29565 (diff) | |
| download | mullvadvpn-dbf0d70aa8d69d3300bc56d89b24e839654199e9.tar.xz mullvadvpn-dbf0d70aa8d69d3300bc56d89b24e839654199e9.zip | |
Merge branch 'daemon-state-refactor'
| -rw-r--r-- | mullvad-daemon/src/main.rs | 440 | ||||
| -rw-r--r-- | mullvad-daemon/src/management_interface.rs | 60 | ||||
| -rw-r--r-- | mullvad-daemon/src/tunnel_state_machine/connected_state.rs | 183 | ||||
| -rw-r--r-- | mullvad-daemon/src/tunnel_state_machine/connecting_state.rs | 294 | ||||
| -rw-r--r-- | mullvad-daemon/src/tunnel_state_machine/disconnected_state.rs | 49 | ||||
| -rw-r--r-- | mullvad-daemon/src/tunnel_state_machine/disconnecting_state.rs | 104 | ||||
| -rw-r--r-- | mullvad-daemon/src/tunnel_state_machine/macros.rs | 21 | ||||
| -rw-r--r-- | mullvad-daemon/src/tunnel_state_machine/mod.rs | 389 | ||||
| -rw-r--r-- | talpid-core/src/tunnel/mod.rs | 2 |
9 files changed, 1218 insertions, 324 deletions
diff --git a/mullvad-daemon/src/main.rs b/mullvad-daemon/src/main.rs index d1e620f3df..08f8c34146 100644 --- a/mullvad-daemon/src/main.rs +++ b/mullvad-daemon/src/main.rs @@ -55,14 +55,17 @@ mod rpc_uniqueness_check; mod settings; mod shutdown; mod system_service; +mod tunnel_state_machine; mod version; use error_chain::ChainedError; -use futures::Future; +use futures::sync::mpsc::UnboundedSender; +use futures::{Future, Sink}; use jsonrpc_core::futures::sync::oneshot::Sender as OneshotSender; -use management_interface::{BoxFuture, ManagementInterfaceServer, TunnelCommand}; +use management_interface::{BoxFuture, ManagementCommand, ManagementInterfaceServer}; use mullvad_rpc::{AccountsProxy, AppVersionProxy, HttpHandle}; +use tunnel_state_machine::{TunnelCommand, TunnelParameters, TunnelStateTransition}; use mullvad_types::account::{AccountData, AccountToken}; use mullvad_types::location::GeoIpLocation; @@ -71,17 +74,14 @@ use mullvad_types::relay_list::{Relay, RelayList}; use mullvad_types::states::{DaemonState, SecurityState, TargetState}; use mullvad_types::version::{AppVersion, AppVersionInfo}; -use std::io; use std::net::IpAddr; use std::path::PathBuf; -use std::sync::{mpsc, Arc, Mutex}; -use std::thread; -use std::time::{Duration, Instant}; +use std::sync::mpsc; +use std::time::Duration; +use std::{mem, thread}; -use talpid_core::firewall::{Firewall, FirewallProxy, SecurityPolicy}; use talpid_core::mpsc::IntoSender; -use talpid_core::tunnel::{self, TunnelEvent, TunnelMetadata, TunnelMonitor}; -use talpid_types::net::{TunnelEndpoint, TunnelEndpointData, TunnelOptions}; +use talpid_types::net::TunnelOptions; error_chain!{ @@ -96,22 +96,10 @@ error_chain!{ DaemonIsAlreadyRunning { description("Another instance of the daemon is already running") } - /// The client is in the wrong state for the requested operation. Optimally the code should - /// be written in such a way so such states can't exist. - InvalidState { - description("Client is in an invalid state for the requested operation") - } - TunnelError(msg: &'static str) { - description("Error in the tunnel monitor") - display("Tunnel monitor error: {}", msg) - } ManagementInterfaceError(msg: &'static str) { description("Error in the management interface") display("Management interface error: {}", msg) } - FirewallError { - description("Firewall error") - } InvalidSettings(msg: &'static str) { description("Invalid settings") display("Invalid Settings: {}", msg) @@ -120,80 +108,91 @@ error_chain!{ description("Found no valid relays to connect to") } } -} -static MIN_TUNNEL_ALIVE_TIME: Duration = Duration::from_millis(1000); + links { + TunnelError(tunnel_state_machine::Error, tunnel_state_machine::ErrorKind); + } +} const DAEMON_LOG_FILENAME: &str = "daemon.log"; -const OPENVPN_LOG_FILENAME: &str = "openvpn.log"; -const WIREGUARD_LOG_FILENAME: &str = "wireguard.log"; -#[cfg(windows)] -const TUNNEL_INTERFACE_ALIAS: &str = "Mullvad"; +type SyncUnboundedSender<T> = ::futures::sink::Wait<UnboundedSender<T>>; /// All events that can happen in the daemon. Sent from various threads and exposed interfaces. pub enum DaemonEvent { - /// An event coming from the tunnel software to indicate a change in state. - TunnelEvent(TunnelEvent), - /// Triggered by the thread waiting for the tunnel process. Means the tunnel process - /// exited. - TunnelExited(tunnel::Result<()>), - /// Triggered by the thread waiting for a tunnel close operation to complete. Contains the - /// result of trying to kill the tunnel. - TunnelKillResult(io::Result<()>), + /// Tunnel has changed state. + TunnelStateTransition(TunnelStateTransition), /// An event coming from the JSONRPC-2.0 management interface. - ManagementInterfaceEvent(TunnelCommand), + ManagementInterfaceEvent(ManagementCommand), /// Triggered if the server hosting the JSONRPC-2.0 management interface dies unexpectedly. ManagementInterfaceExited(talpid_ipc::Result<()>), /// Daemon shutdown triggered by a signal, ctrl-c or similar. TriggerShutdown, } -impl From<TunnelEvent> for DaemonEvent { - fn from(tunnel_event: TunnelEvent) -> Self { - DaemonEvent::TunnelEvent(tunnel_event) +impl From<TunnelStateTransition> for DaemonEvent { + fn from(tunnel_state_transition: TunnelStateTransition) -> Self { + DaemonEvent::TunnelStateTransition(tunnel_state_transition) } } -impl From<TunnelCommand> for DaemonEvent { - fn from(tunnel_command: TunnelCommand) -> Self { - DaemonEvent::ManagementInterfaceEvent(tunnel_command) +impl From<ManagementCommand> for DaemonEvent { + fn from(command: ManagementCommand) -> Self { + DaemonEvent::ManagementInterfaceEvent(command) } } -/// Represents the internal state of the actual tunnel. -// TODO(linus): Put the tunnel::CloseHandle into this state, so it can't exist when not running. -#[derive(Debug, Copy, Clone, Eq, PartialEq)] -pub enum TunnelState { - /// No tunnel is running. - NotRunning, - /// The tunnel has been started, but it is not established/functional. - Connecting, - /// The tunnel is up and working. - Connected, - /// This state is active from when we manually trigger a tunnel kill until the tunnel wait - /// operation (TunnelExit) returned. +#[derive(Clone, Debug, Eq, PartialEq)] +enum DaemonExecutionState { + Running, Exiting, + Finished, } -impl TunnelState { - pub fn as_security_state(&self) -> SecurityState { - use TunnelState::*; - match *self { - NotRunning | Connecting => SecurityState::Unsecured, - Connected | Exiting => SecurityState::Secured, +impl DaemonExecutionState { + pub fn shutdown(&mut self, tunnel_state: TunnelStateTransition) { + use self::DaemonExecutionState::*; + + match self { + Running => { + match tunnel_state { + TunnelStateTransition::Disconnected => mem::replace(self, Finished), + _ => mem::replace(self, Exiting), + }; + } + Exiting | Finished => {} + }; + } + + pub fn disconnected(&mut self) { + use self::DaemonExecutionState::*; + + match self { + Exiting => { + mem::replace(self, Finished); + } + Running | Finished => {} + }; + } + + pub fn is_running(&mut self) -> bool { + use self::DaemonExecutionState::*; + + match self { + Running => true, + Exiting | Finished => false, } } } struct Daemon { - state: TunnelState, - // The tunnel_close_handle must only exist in the Connecting and Connected states! - tunnel_close_handle: Option<tunnel::CloseHandle>, + tunnel_command_tx: SyncUnboundedSender<TunnelCommand>, + tunnel_state: TunnelStateTransition, + security_state: SecurityState, last_broadcasted_state: DaemonState, target_state: TargetState, - shutdown: bool, + state: DaemonExecutionState, rx: mpsc::Receiver<DaemonEvent>, tx: mpsc::Sender<DaemonEvent>, management_interface_broadcaster: management_interface::EventBroadcaster, @@ -203,10 +202,7 @@ struct Daemon { https_handle: mullvad_rpc::rest::RequestSender, tokio_remote: tokio_core::reactor::Remote, relay_selector: relays::RelaySelector, - firewall: FirewallProxy, current_relay: Option<Relay>, - tunnel_endpoint: Option<TunnelEndpoint>, - tunnel_metadata: Option<TunnelMetadata>, log_dir: Option<PathBuf>, resource_dir: PathBuf, } @@ -240,19 +236,23 @@ impl Daemon { relays::RelaySelector::new(rpc_handle.clone(), &resource_dir, &cache_dir); let (tx, rx) = mpsc::channel(); + let tunnel_command_tx = + tunnel_state_machine::spawn(cache_dir.clone(), IntoSender::from(tx.clone()))?; + + let target_state = TargetState::Unsecured; let management_interface_broadcaster = Self::start_management_interface(tx.clone(), cache_dir.clone())?; - let state = TunnelState::NotRunning; - let target_state = TargetState::Unsecured; + Ok(Daemon { - state, - tunnel_close_handle: None, + tunnel_command_tx: Sink::wait(tunnel_command_tx), + tunnel_state: TunnelStateTransition::Disconnected, + security_state: SecurityState::Unsecured, target_state, last_broadcasted_state: DaemonState { - state: state.as_security_state(), + state: SecurityState::Unsecured, target_state, }, - shutdown: false, + state: DaemonExecutionState::Running, rx, tx, management_interface_broadcaster, @@ -262,10 +262,7 @@ impl Daemon { https_handle, tokio_remote, relay_selector, - firewall: FirewallProxy::new(&cache_dir).chain_err(|| ErrorKind::FirewallError)?, current_relay: None, - tunnel_endpoint: None, - tunnel_metadata: None, log_dir, resource_dir, }) @@ -285,7 +282,7 @@ impl Daemon { } fn start_management_interface_server( - event_tx: IntoSender<TunnelCommand, DaemonEvent>, + event_tx: IntoSender<ManagementCommand, DaemonEvent>, cache_dir: PathBuf, ) -> Result<ManagementInterfaceServer> { let shared_secret = uuid::Uuid::new_v4().to_string(); @@ -323,7 +320,7 @@ impl Daemon { } while let Ok(event) = self.rx.recv() { self.handle_event(event)?; - if self.shutdown && self.state == TunnelState::NotRunning { + if self.state == DaemonExecutionState::Finished { break; } } @@ -333,49 +330,38 @@ impl Daemon { fn handle_event(&mut self, event: DaemonEvent) -> Result<()> { use DaemonEvent::*; match event { - TunnelEvent(event) => self.handle_tunnel_event(event), - TunnelExited(result) => self.handle_tunnel_exited(result), - TunnelKillResult(result) => self.handle_tunnel_kill_result(result), + TunnelStateTransition(transition) => self.handle_tunnel_state_transition(transition), ManagementInterfaceEvent(event) => self.handle_management_interface_event(event), ManagementInterfaceExited(result) => self.handle_management_interface_exited(result), TriggerShutdown => self.handle_trigger_shutdown_event(), } } - fn handle_tunnel_event(&mut self, tunnel_event: TunnelEvent) -> Result<()> { - debug!("Tunnel event: {:?}", tunnel_event); - if self.state == TunnelState::Connecting { - if let TunnelEvent::Up(metadata) = tunnel_event { - self.tunnel_metadata = Some(metadata); - self.set_security_policy()?; - self.set_state(TunnelState::Connected) - } else { - Ok(()) - } - } else if self.state == TunnelState::Connected && tunnel_event == TunnelEvent::Down { - self.kill_tunnel() - } else { - Ok(()) - } - } + fn handle_tunnel_state_transition( + &mut self, + tunnel_state: TunnelStateTransition, + ) -> Result<()> { + use self::TunnelStateTransition::*; - fn handle_tunnel_exited(&mut self, result: tunnel::Result<()>) -> Result<()> { - if let Err(e) = result.chain_err(|| "Tunnel exited in an unexpected way") { - error!("{}", e.display_chain()); + debug!("New tunnel state: {:?}", tunnel_state); + + if tunnel_state == Disconnected { + self.state.disconnected(); } - self.current_relay = None; - self.tunnel_endpoint = None; - self.tunnel_metadata = None; - self.tunnel_close_handle = None; - self.set_state(TunnelState::NotRunning) - } - fn handle_tunnel_kill_result(&mut self, result: io::Result<()>) -> Result<()> { - result.chain_err(|| "Error while trying to close tunnel") + self.tunnel_state = tunnel_state; + self.security_state = match tunnel_state { + Disconnected | Connecting => SecurityState::Unsecured, + Connected | Disconnecting => SecurityState::Secured, + }; + + self.broadcast_state(); + + Ok(()) } - fn handle_management_interface_event(&mut self, event: TunnelCommand) -> Result<()> { - use TunnelCommand::*; + fn handle_management_interface_event(&mut self, event: ManagementCommand) -> Result<()> { + use ManagementCommand::*; match event { SetTargetState(state) => self.on_set_target_state(state), GetState(tx) => Ok(self.on_get_state(tx)), @@ -402,7 +388,7 @@ impl Daemon { } fn on_set_target_state(&mut self, new_target_state: TargetState) -> Result<()> { - if !self.shutdown { + if self.state.is_running() { self.set_target_state(new_target_state) } else { warn!("Ignoring target state change request due to shutdown"); @@ -465,11 +451,9 @@ impl Daemon { match save_result.chain_err(|| "Unable to save settings") { Ok(account_changed) => { Self::oneshot_send(tx, (), "set_account response"); - let tunnel_needs_restart = - self.state == TunnelState::Connecting || self.state == TunnelState::Connected; - if account_changed && tunnel_needs_restart { + if account_changed { info!("Initiating tunnel restart because the account token changed"); - self.kill_tunnel()?; + self.connect_tunnel()?; } } Err(e) => error!("{}", e.display_chain()), @@ -515,12 +499,9 @@ impl Daemon { Ok(changed) => { Self::oneshot_send(tx, (), "update_relay_settings response"); - let tunnel_needs_restart = - self.state == TunnelState::Connecting || self.state == TunnelState::Connected; - - if changed && tunnel_needs_restart { + if changed { info!("Initiating tunnel restart because the relay settings changed"); - self.kill_tunnel()?; + self.connect_tunnel()?; } } Err(e) => error!("{}", e.display_chain()), @@ -537,8 +518,10 @@ impl Daemon { let save_result = self.settings.set_allow_lan(allow_lan); match save_result.chain_err(|| "Unable to save settings") { Ok(settings_changed) => { - if settings_changed && self.target_state == TargetState::Secured { - self.set_security_policy()?; + if settings_changed { + self.tunnel_command_tx + .send(TunnelCommand::AllowLan(allow_lan)) + .expect("Tunnel state machine has stopped"); } Self::oneshot_send(tx, (), "set_allow_lan response"); } @@ -592,12 +575,9 @@ impl Daemon { Ok(settings_changed) => { Self::oneshot_send(tx, (), "set_openvpn_enable_ipv6 response"); - let tunnel_needs_restart = - self.state == TunnelState::Connecting || self.state == TunnelState::Connected; - - if settings_changed && tunnel_needs_restart { + if settings_changed { info!("Initiating tunnel restart because the enable IPv6 setting changed"); - self.kill_tunnel()?; + self.connect_tunnel()?; } } Err(e) => error!("{}", e.display_chain()), @@ -627,29 +607,15 @@ impl Daemon { } fn handle_trigger_shutdown_event(&mut self) -> Result<()> { - self.shutdown = true; - self.set_target_state(TargetState::Unsecured) - } + self.state.shutdown(self.tunnel_state); + self.disconnect_tunnel(); - /// Update the state of the client. If it changed, notify the subscribers and trigger - /// appropriate actions. - fn set_state(&mut self, new_state: TunnelState) -> Result<()> { - if new_state != self.state { - debug!("State {:?} => {:?}", self.state, new_state); - self.state = new_state; - self.broadcast_state(); - self.verify_state_consistency()?; - self.apply_target_state() - } else { - // Calling set_state with the same state we already have is an error. Should try to - // mitigate this possibility completely with a better state machine later. - Err(ErrorKind::InvalidState.into()) - } + Ok(()) } fn broadcast_state(&mut self) { let new_daemon_state = DaemonState { - state: self.state.as_security_state(), + state: self.security_state, target_state: self.target_state, }; if self.last_broadcasted_state != new_daemon_state { @@ -659,21 +625,6 @@ impl Daemon { } } - // Check that the current state is valid and consistent. - fn verify_state_consistency(&self) -> Result<()> { - use TunnelState::*; - ensure!( - match self.state { - NotRunning => self.tunnel_close_handle.is_none(), - Connecting => self.tunnel_close_handle.is_some(), - Connected => self.tunnel_close_handle.is_some(), - Exiting => self.tunnel_close_handle.is_none(), - }, - ErrorKind::InvalidState - ); - Ok(()) - } - /// Set the target state of the client. If it changed trigger the operations needed to /// progress towards that state. fn set_target_state(&mut self, new_state: TargetState) -> Result<()> { @@ -688,139 +639,66 @@ impl Daemon { } fn apply_target_state(&mut self) -> Result<()> { - match (self.target_state, self.state) { - (TargetState::Secured, TunnelState::NotRunning) => { + match self.target_state { + TargetState::Secured => { debug!("Triggering tunnel start"); - if let Err(e) = self.start_tunnel().chain_err(|| "Failed to start tunnel") { + if let Err(e) = self.connect_tunnel().chain_err(|| "Failed to start tunnel") { error!("{}", e.display_chain()); self.current_relay = None; - self.tunnel_endpoint = None; self.management_interface_broadcaster.notify_error(&e); self.set_target_state(TargetState::Unsecured)?; } - Ok(()) } - (TargetState::Unsecured, TunnelState::NotRunning) => self.reset_security_policy(), - (TargetState::Unsecured, TunnelState::Connecting) - | (TargetState::Unsecured, TunnelState::Connected) => self.kill_tunnel(), - (..) => Ok(()), + TargetState::Unsecured => self.disconnect_tunnel(), } + + Ok(()) } - fn start_tunnel(&mut self) -> Result<()> { - ensure!( - self.target_state == TargetState::Secured && self.state == TunnelState::NotRunning, - ErrorKind::InvalidState - ); + fn connect_tunnel(&mut self) -> Result<()> { + let parameters = self.build_tunnel_parameters()?; - match self.settings.get_relay_settings() { - RelaySettings::CustomTunnelEndpoint(custom_relay) => { - let tunnel_endpoint = custom_relay - .to_tunnel_endpoint() - .chain_err(|| ErrorKind::NoRelay)?; - self.tunnel_endpoint = Some(tunnel_endpoint); - } + self.tunnel_command_tx + .send(TunnelCommand::Connect(parameters)) + .expect("Tunnel state machine has stopped"); + + Ok(()) + } + + fn disconnect_tunnel(&mut self) { + self.tunnel_command_tx + .send(TunnelCommand::Disconnect) + .expect("Tunnel state machine has stopped"); + } + + fn build_tunnel_parameters(&mut self) -> Result<TunnelParameters> { + let endpoint = match self.settings.get_relay_settings() { + RelaySettings::CustomTunnelEndpoint(custom_relay) => custom_relay + .to_tunnel_endpoint() + .chain_err(|| ErrorKind::NoRelay)?, RelaySettings::Normal(constraints) => { let (relay, tunnel_endpoint) = self .relay_selector .get_tunnel_endpoint(&constraints) .chain_err(|| ErrorKind::NoRelay)?; - self.tunnel_endpoint = Some(tunnel_endpoint); self.current_relay = Some(relay); + tunnel_endpoint } - } + }; let account_token = self .settings .get_account_token() .ok_or(ErrorKind::InvalidSettings("No account token"))?; - self.set_security_policy()?; - - let tunnel_monitor = - self.spawn_tunnel_monitor(self.tunnel_endpoint.unwrap(), &account_token)?; - self.tunnel_close_handle = Some(tunnel_monitor.close_handle()); - self.spawn_tunnel_monitor_wait_thread(tunnel_monitor); - - self.set_state(TunnelState::Connecting)?; - Ok(()) - } - - fn spawn_tunnel_monitor( - &self, - tunnel_endpoint: TunnelEndpoint, - account_token: &str, - ) -> Result<TunnelMonitor> { - // Must wrap the channel in a Mutex because TunnelMonitor forces the closure to be Sync - let event_tx = Arc::new(Mutex::new(self.tx.clone())); - let on_tunnel_event = move |event| { - let _ = event_tx - .lock() - .unwrap() - .send(DaemonEvent::TunnelEvent(event)); - }; - - let tunnel_log = if let Some(ref log_dir) = self.log_dir { - let filename = match tunnel_endpoint.tunnel { - TunnelEndpointData::OpenVpn(_) => OPENVPN_LOG_FILENAME, - TunnelEndpointData::Wireguard(_) => WIREGUARD_LOG_FILENAME, - }; - let tunnel_log = log_dir.join(filename); - logging::rotate_log(&tunnel_log).chain_err(|| "Unable to rotate tunnel log")?; - Some(tunnel_log) - } else { - None - }; - - let tunnel_options = self.settings.get_tunnel_options(); - let tunnel_alias = { - #[cfg(windows)] - { - Some(TUNNEL_INTERFACE_ALIAS.into()) - } - #[cfg(not(windows))] - { - None - } - }; - TunnelMonitor::new( - tunnel_endpoint, - &tunnel_options, - tunnel_alias, + Ok(TunnelParameters { + endpoint, + options: self.settings.get_tunnel_options().clone(), + log_dir: self.log_dir.clone(), + resource_dir: self.resource_dir.clone(), account_token, - tunnel_log.as_ref().map(PathBuf::as_path), - &self.resource_dir, - on_tunnel_event, - ).chain_err(|| ErrorKind::TunnelError("Unable to start tunnel monitor")) - } - - fn spawn_tunnel_monitor_wait_thread(&self, tunnel_monitor: TunnelMonitor) { - let error_tx = self.tx.clone(); - thread::spawn(move || { - let start = Instant::now(); - let result = tunnel_monitor.wait(); - if let Some(sleep_dur) = MIN_TUNNEL_ALIVE_TIME.checked_sub(start.elapsed()) { - thread::sleep(sleep_dur); - } - let _ = error_tx.send(DaemonEvent::TunnelExited(result)); - trace!("Tunnel monitor thread exit"); - }); - } - - fn kill_tunnel(&mut self) -> Result<()> { - ensure!( - self.state == TunnelState::Connecting || self.state == TunnelState::Connected, - ErrorKind::InvalidState - ); - let close_handle = self.tunnel_close_handle.take().unwrap(); - self.set_state(TunnelState::Exiting)?; - let result_tx = self.tx.clone(); - thread::spawn(move || { - let result = close_handle.close(); - let _ = result_tx.send(DaemonEvent::TunnelKillResult(result)); - trace!("Tunnel kill thread exit"); - }); - Ok(()) + allow_lan: self.settings.get_allow_lan(), + }) } pub fn shutdown_handle(&self) -> DaemonShutdownHandle { @@ -828,32 +706,6 @@ impl Daemon { tx: self.tx.clone(), } } - - fn set_security_policy(&mut self) -> Result<()> { - let policy = match (self.tunnel_endpoint, self.tunnel_metadata.as_ref()) { - (Some(relay), None) => SecurityPolicy::Connecting { - relay_endpoint: relay.to_endpoint(), - allow_lan: self.settings.get_allow_lan(), - }, - (Some(relay), Some(tunnel_metadata)) => SecurityPolicy::Connected { - relay_endpoint: relay.to_endpoint(), - tunnel: tunnel_metadata.clone(), - allow_lan: self.settings.get_allow_lan(), - }, - _ => bail!(ErrorKind::InvalidState), - }; - debug!("Set security policy: {:?}", policy); - self.firewall - .apply_policy(policy) - .chain_err(|| ErrorKind::FirewallError) - } - - fn reset_security_policy(&mut self) -> Result<()> { - debug!("Reset security policy"); - self.firewall - .reset_policy() - .chain_err(|| ErrorKind::FirewallError) - } } struct DaemonShutdownHandle { diff --git a/mullvad-daemon/src/management_interface.rs b/mullvad-daemon/src/management_interface.rs index 2a77650a52..3f2d920180 100644 --- a/mullvad-daemon/src/management_interface.rs +++ b/mullvad-daemon/src/management_interface.rs @@ -167,7 +167,7 @@ build_rpc_trait! { /// Enum representing commands coming in on the management interface. -pub enum TunnelCommand { +pub enum ManagementCommand { /// Change target state. SetTargetState(TargetState), /// Request the current state. @@ -224,12 +224,12 @@ pub struct ManagementInterfaceServer { impl ManagementInterfaceServer { pub fn start<T>( - tunnel_tx: IntoSender<TunnelCommand, T>, + tunnel_tx: IntoSender<ManagementCommand, T>, shared_secret: String, cache_dir: PathBuf, ) -> talpid_ipc::Result<Self> where - T: From<TunnelCommand> + 'static + Send, + T: From<ManagementCommand> + 'static + Send, { let rpc = ManagementInterface::new(tunnel_tx, shared_secret, cache_dir); let subscriptions = rpc.subscriptions.clone(); @@ -297,16 +297,16 @@ impl EventBroadcaster { } } -struct ManagementInterface<T: From<TunnelCommand> + 'static + Send> { +struct ManagementInterface<T: From<ManagementCommand> + 'static + Send> { subscriptions: Arc<ActiveSubscriptions>, - tx: Mutex<IntoSender<TunnelCommand, T>>, + tx: Mutex<IntoSender<ManagementCommand, T>>, shared_secret: String, cache_dir: PathBuf, } -impl<T: From<TunnelCommand> + 'static + Send> ManagementInterface<T> { +impl<T: From<ManagementCommand> + 'static + Send> ManagementInterface<T> { pub fn new( - tx: IntoSender<TunnelCommand, T>, + tx: IntoSender<ManagementCommand, T>, shared_secret: String, cache_dir: PathBuf, ) -> Self { @@ -354,7 +354,7 @@ impl<T: From<TunnelCommand> + 'static + Send> ManagementInterface<T> { } /// Sends a command to the daemon and maps the error to an RPC error. - fn send_command_to_daemon(&self, command: TunnelCommand) -> BoxFuture<(), Error> { + fn send_command_to_daemon(&self, command: ManagementCommand) -> BoxFuture<(), Error> { Box::new( future::result(self.tx.lock().unwrap().send(command)) .map_err(|_| Error::internal_error()), @@ -408,7 +408,9 @@ macro_rules! try_future { }; } -impl<T: From<TunnelCommand> + 'static + Send> ManagementInterfaceApi for ManagementInterface<T> { +impl<T: From<ManagementCommand> + 'static + Send> ManagementInterfaceApi + for ManagementInterface<T> +{ type Metadata = Meta; fn auth(&self, meta: Self::Metadata, shared_secret: String) -> BoxFuture<(), Error> { @@ -431,7 +433,7 @@ impl<T: From<TunnelCommand> + 'static + Send> ManagementInterfaceApi for Managem try_future!(self.check_auth(&meta)); let (tx, rx) = sync::oneshot::channel(); let future = self - .send_command_to_daemon(TunnelCommand::GetAccountData(tx, account_token)) + .send_command_to_daemon(ManagementCommand::GetAccountData(tx, account_token)) .and_then(|_| rx.map_err(|_| Error::internal_error())) .and_then(|rpc_future| { rpc_future.map_err(|error: mullvad_rpc::Error| { @@ -450,7 +452,7 @@ impl<T: From<TunnelCommand> + 'static + Send> ManagementInterfaceApi for Managem try_future!(self.check_auth(&meta)); let (tx, rx) = sync::oneshot::channel(); let future = self - .send_command_to_daemon(TunnelCommand::GetRelayLocations(tx)) + .send_command_to_daemon(ManagementCommand::GetRelayLocations(tx)) .and_then(|_| rx.map_err(|_| Error::internal_error())); Box::new(future) } @@ -464,7 +466,7 @@ impl<T: From<TunnelCommand> + 'static + Send> ManagementInterfaceApi for Managem try_future!(self.check_auth(&meta)); let (tx, rx) = sync::oneshot::channel(); let future = self - .send_command_to_daemon(TunnelCommand::SetAccount(tx, account_token.clone())) + .send_command_to_daemon(ManagementCommand::SetAccount(tx, account_token.clone())) .and_then(|_| rx.map_err(|_| Error::internal_error())); if let Some(new_account_token) = account_token { @@ -486,7 +488,7 @@ impl<T: From<TunnelCommand> + 'static + Send> ManagementInterfaceApi for Managem try_future!(self.check_auth(&meta)); let (tx, rx) = sync::oneshot::channel(); let future = self - .send_command_to_daemon(TunnelCommand::GetAccount(tx)) + .send_command_to_daemon(ManagementCommand::GetAccount(tx)) .and_then(|_| rx.map_err(|_| Error::internal_error())); Box::new(future) } @@ -500,7 +502,7 @@ impl<T: From<TunnelCommand> + 'static + Send> ManagementInterfaceApi for Managem try_future!(self.check_auth(&meta)); let (tx, rx) = sync::oneshot::channel(); - let message = TunnelCommand::UpdateRelaySettings(tx, constraints_update); + let message = ManagementCommand::UpdateRelaySettings(tx, constraints_update); let future = self .send_command_to_daemon(message) .and_then(|_| rx.map_err(|_| Error::internal_error())); @@ -512,7 +514,7 @@ impl<T: From<TunnelCommand> + 'static + Send> ManagementInterfaceApi for Managem try_future!(self.check_auth(&meta)); let (tx, rx) = sync::oneshot::channel(); let future = self - .send_command_to_daemon(TunnelCommand::GetRelaySettings(tx)) + .send_command_to_daemon(ManagementCommand::GetRelaySettings(tx)) .and_then(|_| rx.map_err(|_| Error::internal_error())); Box::new(future) } @@ -522,7 +524,7 @@ impl<T: From<TunnelCommand> + 'static + Send> ManagementInterfaceApi for Managem try_future!(self.check_auth(&meta)); let (tx, rx) = sync::oneshot::channel(); let future = self - .send_command_to_daemon(TunnelCommand::SetAllowLan(tx, allow_lan)) + .send_command_to_daemon(ManagementCommand::SetAllowLan(tx, allow_lan)) .and_then(|_| rx.map_err(|_| Error::internal_error())); Box::new(future) } @@ -532,7 +534,7 @@ impl<T: From<TunnelCommand> + 'static + Send> ManagementInterfaceApi for Managem try_future!(self.check_auth(&meta)); let (tx, rx) = sync::oneshot::channel(); let future = self - .send_command_to_daemon(TunnelCommand::GetAllowLan(tx)) + .send_command_to_daemon(ManagementCommand::GetAllowLan(tx)) .and_then(|_| rx.map_err(|_| Error::internal_error())); Box::new(future) } @@ -542,7 +544,7 @@ impl<T: From<TunnelCommand> + 'static + Send> ManagementInterfaceApi for Managem try_future!(self.check_auth(&meta)); let (tx, rx) = sync::oneshot::channel(); let future = self - .send_command_to_daemon(TunnelCommand::SetAutoConnect(tx, auto_connect)) + .send_command_to_daemon(ManagementCommand::SetAutoConnect(tx, auto_connect)) .and_then(|_| rx.map_err(|_| Error::internal_error())); Box::new(future) } @@ -552,7 +554,7 @@ impl<T: From<TunnelCommand> + 'static + Send> ManagementInterfaceApi for Managem try_future!(self.check_auth(&meta)); let (tx, rx) = sync::oneshot::channel(); let future = self - .send_command_to_daemon(TunnelCommand::GetAutoConnect(tx)) + .send_command_to_daemon(ManagementCommand::GetAutoConnect(tx)) .and_then(|_| rx.map_err(|_| Error::internal_error())); Box::new(future) } @@ -560,13 +562,13 @@ impl<T: From<TunnelCommand> + 'static + Send> ManagementInterfaceApi for Managem fn connect(&self, meta: Self::Metadata) -> BoxFuture<(), Error> { trace!("connect"); try_future!(self.check_auth(&meta)); - self.send_command_to_daemon(TunnelCommand::SetTargetState(TargetState::Secured)) + self.send_command_to_daemon(ManagementCommand::SetTargetState(TargetState::Secured)) } fn disconnect(&self, meta: Self::Metadata) -> BoxFuture<(), Error> { trace!("disconnect"); try_future!(self.check_auth(&meta)); - self.send_command_to_daemon(TunnelCommand::SetTargetState(TargetState::Unsecured)) + self.send_command_to_daemon(ManagementCommand::SetTargetState(TargetState::Unsecured)) } fn get_state(&self, meta: Self::Metadata) -> BoxFuture<DaemonState, Error> { @@ -574,7 +576,7 @@ impl<T: From<TunnelCommand> + 'static + Send> ManagementInterfaceApi for Managem try_future!(self.check_auth(&meta)); let (state_tx, state_rx) = sync::oneshot::channel(); let future = self - .send_command_to_daemon(TunnelCommand::GetState(state_tx)) + .send_command_to_daemon(ManagementCommand::GetState(state_tx)) .and_then(|_| state_rx.map_err(|_| Error::internal_error())); Box::new(future) } @@ -584,7 +586,7 @@ impl<T: From<TunnelCommand> + 'static + Send> ManagementInterfaceApi for Managem try_future!(self.check_auth(&meta)); let (tx, rx) = sync::oneshot::channel(); let future = self - .send_command_to_daemon(TunnelCommand::GetCurrentLocation(tx)) + .send_command_to_daemon(ManagementCommand::GetCurrentLocation(tx)) .and_then(|_| rx.map_err(|_| Error::internal_error())); Box::new(future) } @@ -592,7 +594,7 @@ impl<T: From<TunnelCommand> + 'static + Send> ManagementInterfaceApi for Managem fn shutdown(&self, meta: Self::Metadata) -> BoxFuture<(), Error> { trace!("shutdown"); try_future!(self.check_auth(&meta)); - self.send_command_to_daemon(TunnelCommand::Shutdown) + self.send_command_to_daemon(ManagementCommand::Shutdown) } fn get_account_history(&self, meta: Self::Metadata) -> BoxFuture<Vec<AccountToken>, Error> { @@ -637,7 +639,7 @@ impl<T: From<TunnelCommand> + 'static + Send> ManagementInterfaceApi for Managem try_future!(self.check_auth(&meta)); let (tx, rx) = sync::oneshot::channel(); let future = self - .send_command_to_daemon(TunnelCommand::SetOpenVpnMssfix(tx, mssfix)) + .send_command_to_daemon(ManagementCommand::SetOpenVpnMssfix(tx, mssfix)) .and_then(|_| rx.map_err(|_| Error::internal_error())); Box::new(future) @@ -652,7 +654,7 @@ impl<T: From<TunnelCommand> + 'static + Send> ManagementInterfaceApi for Managem try_future!(self.check_auth(&meta)); let (tx, rx) = sync::oneshot::channel(); let future = self - .send_command_to_daemon(TunnelCommand::SetOpenVpnEnableIpv6(tx, enable_ipv6)) + .send_command_to_daemon(ManagementCommand::SetOpenVpnEnableIpv6(tx, enable_ipv6)) .and_then(|_| rx.map_err(|_| Error::internal_error())); Box::new(future) @@ -663,7 +665,7 @@ impl<T: From<TunnelCommand> + 'static + Send> ManagementInterfaceApi for Managem try_future!(self.check_auth(&meta)); let (tx, rx) = sync::oneshot::channel(); let future = self - .send_command_to_daemon(TunnelCommand::GetTunnelOptions(tx)) + .send_command_to_daemon(ManagementCommand::GetTunnelOptions(tx)) .and_then(|_| rx.map_err(|_| Error::internal_error())); Box::new(future) } @@ -672,7 +674,7 @@ impl<T: From<TunnelCommand> + 'static + Send> ManagementInterfaceApi for Managem try_future!(self.check_auth(&meta)); let (tx, rx) = sync::oneshot::channel(); let future = self - .send_command_to_daemon(TunnelCommand::GetCurrentVersion(tx)) + .send_command_to_daemon(ManagementCommand::GetCurrentVersion(tx)) .and_then(|_| rx.map_err(|_| Error::internal_error())); Box::new(future) @@ -682,7 +684,7 @@ impl<T: From<TunnelCommand> + 'static + Send> ManagementInterfaceApi for Managem try_future!(self.check_auth(&meta)); let (tx, rx) = sync::oneshot::channel(); let future = self - .send_command_to_daemon(TunnelCommand::GetVersionInfo(tx)) + .send_command_to_daemon(ManagementCommand::GetVersionInfo(tx)) .and_then(|_| rx.map_err(|_| Error::internal_error())) .and_then(|version_future| { version_future.map_err(|error| { diff --git a/mullvad-daemon/src/tunnel_state_machine/connected_state.rs b/mullvad-daemon/src/tunnel_state_machine/connected_state.rs new file mode 100644 index 0000000000..332608f055 --- /dev/null +++ b/mullvad-daemon/src/tunnel_state_machine/connected_state.rs @@ -0,0 +1,183 @@ +use futures::sync::{mpsc, oneshot}; +use futures::{Async, Future, Stream}; + +use talpid_core::firewall::{Firewall, SecurityPolicy}; +use talpid_core::tunnel::{CloseHandle, TunnelEvent, TunnelMetadata}; +use talpid_types::net::TunnelEndpoint; + +use super::{ + AfterDisconnect, ConnectingState, DisconnectingState, EventConsequence, Result, ResultExt, + SharedTunnelStateValues, StateEntryResult, TunnelCommand, TunnelParameters, TunnelState, + TunnelStateWrapper, +}; + +pub struct ConnectedStateBootstrap { + pub metadata: TunnelMetadata, + pub tunnel_events: mpsc::UnboundedReceiver<TunnelEvent>, + pub tunnel_endpoint: TunnelEndpoint, + pub tunnel_parameters: TunnelParameters, + pub tunnel_close_event: oneshot::Receiver<()>, + pub close_handle: CloseHandle, +} + +/// The tunnel is up and working. +pub struct ConnectedState { + metadata: TunnelMetadata, + tunnel_events: mpsc::UnboundedReceiver<TunnelEvent>, + tunnel_endpoint: TunnelEndpoint, + tunnel_parameters: TunnelParameters, + tunnel_close_event: oneshot::Receiver<()>, + close_handle: CloseHandle, +} + +impl ConnectedState { + fn from(bootstrap: ConnectedStateBootstrap) -> Self { + ConnectedState { + metadata: bootstrap.metadata, + tunnel_events: bootstrap.tunnel_events, + tunnel_endpoint: bootstrap.tunnel_endpoint, + tunnel_parameters: bootstrap.tunnel_parameters, + tunnel_close_event: bootstrap.tunnel_close_event, + close_handle: bootstrap.close_handle, + } + } + + fn set_security_policy(&self, shared_values: &mut SharedTunnelStateValues) -> Result<()> { + let policy = SecurityPolicy::Connected { + relay_endpoint: self.tunnel_endpoint.to_endpoint(), + tunnel: self.metadata.clone(), + allow_lan: self.tunnel_parameters.allow_lan, + }; + + debug!("Set security policy: {:?}", policy); + shared_values + .firewall + .apply_policy(policy) + .chain_err(|| "Failed to apply security policy for connected state") + } + + fn handle_commands( + mut self, + commands: &mut mpsc::UnboundedReceiver<TunnelCommand>, + shared_values: &mut SharedTunnelStateValues, + ) -> EventConsequence<Self> { + use self::EventConsequence::*; + + match try_handle_event!(self, commands.poll()) { + Ok(TunnelCommand::Connect(parameters)) => { + if parameters != self.tunnel_parameters { + NewState(DisconnectingState::enter( + shared_values, + ( + self.close_handle, + self.tunnel_close_event, + AfterDisconnect::Reconnect(parameters), + ), + )) + } else { + SameState(self) + } + } + Ok(TunnelCommand::Disconnect) | Err(_) => NewState(DisconnectingState::enter( + shared_values, + ( + self.close_handle, + self.tunnel_close_event, + AfterDisconnect::Nothing, + ), + )), + Ok(TunnelCommand::AllowLan(allow_lan)) => { + self.tunnel_parameters.allow_lan = allow_lan; + + match self.set_security_policy(shared_values) { + Ok(()) => SameState(self), + Err(error) => { + error!("{}", error.chain_err(|| "Failed to update security policy")); + NewState(DisconnectingState::enter( + shared_values, + ( + self.close_handle, + self.tunnel_close_event, + AfterDisconnect::Nothing, + ), + )) + } + } + } + } + } + + fn handle_tunnel_events( + mut self, + shared_values: &mut SharedTunnelStateValues, + ) -> EventConsequence<Self> { + use self::EventConsequence::*; + + match try_handle_event!(self, self.tunnel_events.poll()) { + Ok(TunnelEvent::Down) | Err(_) => NewState(DisconnectingState::enter( + shared_values, + ( + self.close_handle, + self.tunnel_close_event, + AfterDisconnect::Reconnect(self.tunnel_parameters), + ), + )), + Ok(_) => SameState(self), + } + } + + fn handle_tunnel_close_event( + mut self, + shared_values: &mut SharedTunnelStateValues, + ) -> EventConsequence<Self> { + use self::EventConsequence::*; + + match self.tunnel_close_event.poll() { + Ok(Async::Ready(_)) => {} + Ok(Async::NotReady) => return NoEvents(self), + Err(_cancelled) => warn!("Tunnel monitor thread has stopped unexpectedly"), + } + + info!("Tunnel closed. Reconnecting."); + NewState(ConnectingState::enter( + shared_values, + self.tunnel_parameters, + )) + } +} + +impl TunnelState for ConnectedState { + type Bootstrap = ConnectedStateBootstrap; + + fn enter( + shared_values: &mut SharedTunnelStateValues, + bootstrap: Self::Bootstrap, + ) -> StateEntryResult { + let connected_state = ConnectedState::from(bootstrap); + + match connected_state.set_security_policy(shared_values) { + Ok(()) => Ok(TunnelStateWrapper::from(connected_state)), + Err(error) => Err(( + error, + DisconnectingState::enter( + shared_values, + ( + connected_state.close_handle, + connected_state.tunnel_close_event, + AfterDisconnect::Nothing, + ), + ).expect("Failed to disconnect after failed transition to connected state"), + )), + } + } + + fn handle_event( + self, + commands: &mut mpsc::UnboundedReceiver<TunnelCommand>, + shared_values: &mut SharedTunnelStateValues, + ) -> EventConsequence<Self> { + self.handle_commands(commands, shared_values) + .or_else(Self::handle_tunnel_events, shared_values) + .or_else(Self::handle_tunnel_close_event, shared_values) + } +} diff --git a/mullvad-daemon/src/tunnel_state_machine/connecting_state.rs b/mullvad-daemon/src/tunnel_state_machine/connecting_state.rs new file mode 100644 index 0000000000..a564982527 --- /dev/null +++ b/mullvad-daemon/src/tunnel_state_machine/connecting_state.rs @@ -0,0 +1,294 @@ +use std::ffi::OsString; +use std::path::PathBuf; +use std::sync::Mutex; +use std::thread; +use std::time::{Duration, Instant}; + +use futures::sink::Wait; +use futures::sync::{mpsc, oneshot}; +use futures::{Async, Future, Sink, Stream}; + +use talpid_core::firewall::{Firewall, SecurityPolicy}; +use talpid_core::tunnel::{CloseHandle, TunnelEvent, TunnelMetadata, TunnelMonitor}; +use talpid_types::net::{TunnelEndpoint, TunnelEndpointData}; + +use super::{ + AfterDisconnect, ConnectedState, ConnectedStateBootstrap, DisconnectedState, + DisconnectingState, EventConsequence, Result, ResultExt, SharedTunnelStateValues, + StateEntryResult, TunnelCommand, TunnelParameters, TunnelState, TunnelStateWrapper, +}; +use logging; + +const MIN_TUNNEL_ALIVE_TIME: Duration = Duration::from_millis(1000); + +const OPENVPN_LOG_FILENAME: &str = "openvpn.log"; +const WIREGUARD_LOG_FILENAME: &str = "wireguard.log"; + +#[cfg(windows)] +const TUNNEL_INTERFACE_ALIAS: Option<&str> = Some("Mullvad"); +#[cfg(not(windows))] +const TUNNEL_INTERFACE_ALIAS: Option<&str> = None; + +/// The tunnel has been started, but it is not established/functional. +pub struct ConnectingState { + tunnel_events: mpsc::UnboundedReceiver<TunnelEvent>, + tunnel_endpoint: TunnelEndpoint, + tunnel_parameters: TunnelParameters, + tunnel_close_event: oneshot::Receiver<()>, + close_handle: CloseHandle, +} + +impl ConnectingState { + fn new( + shared_values: &mut SharedTunnelStateValues, + parameters: TunnelParameters, + ) -> Result<Self> { + Self::set_security_policy(shared_values, parameters.endpoint, parameters.allow_lan)?; + + let tunnel_endpoint = parameters.endpoint; + let (tunnel_events, tunnel_close_event, close_handle) = Self::start_tunnel(¶meters)?; + + Ok(ConnectingState { + tunnel_events, + tunnel_endpoint, + tunnel_parameters: parameters, + tunnel_close_event, + close_handle, + }) + } + + fn set_security_policy( + shared_values: &mut SharedTunnelStateValues, + endpoint: TunnelEndpoint, + allow_lan: bool, + ) -> Result<()> { + let policy = SecurityPolicy::Connecting { + relay_endpoint: endpoint.to_endpoint(), + allow_lan, + }; + + debug!("Set security policy: {:?}", policy); + shared_values + .firewall + .apply_policy(policy) + .chain_err(|| "Failed to apply security policy for connecting state") + } + + fn start_tunnel( + parameters: &TunnelParameters, + ) -> Result<( + mpsc::UnboundedReceiver<TunnelEvent>, + oneshot::Receiver<()>, + CloseHandle, + )> { + let (event_tx, event_rx) = mpsc::unbounded(); + let monitor = Self::spawn_tunnel_monitor(¶meters, event_tx.wait())?; + let close_handle = monitor.close_handle(); + let tunnel_close_event = Self::spawn_tunnel_monitor_wait_thread(monitor); + + Ok((event_rx, tunnel_close_event, close_handle)) + } + + fn spawn_tunnel_monitor( + parameters: &TunnelParameters, + events: Wait<mpsc::UnboundedSender<TunnelEvent>>, + ) -> Result<TunnelMonitor> { + let event_tx = Mutex::new(events); + let on_tunnel_event = move |event| { + let send_result = event_tx + .lock() + .expect("A thread panicked while sending a tunnel event") + .send(event); + + if send_result.is_err() { + warn!("Tunnel state machine stopped before tunnel event was received"); + } + }; + let log_file = Self::prepare_tunnel_log_file(¶meters)?; + + TunnelMonitor::new( + parameters.endpoint, + ¶meters.options, + TUNNEL_INTERFACE_ALIAS.to_owned().map(OsString::from), + ¶meters.account_token, + log_file.as_ref().map(PathBuf::as_path), + ¶meters.resource_dir, + on_tunnel_event, + ).chain_err(|| "Unable to start tunnel monitor") + } + + fn prepare_tunnel_log_file(parameters: &TunnelParameters) -> Result<Option<PathBuf>> { + if let Some(ref log_dir) = parameters.log_dir { + let filename = match parameters.endpoint.tunnel { + TunnelEndpointData::OpenVpn(_) => OPENVPN_LOG_FILENAME, + TunnelEndpointData::Wireguard(_) => WIREGUARD_LOG_FILENAME, + }; + let tunnel_log = log_dir.join(filename); + logging::rotate_log(&tunnel_log).chain_err(|| "Unable to rotate tunnel log")?; + Ok(Some(tunnel_log)) + } else { + Ok(None) + } + } + + fn spawn_tunnel_monitor_wait_thread(tunnel_monitor: TunnelMonitor) -> oneshot::Receiver<()> { + let (tunnel_close_event_tx, tunnel_close_event_rx) = oneshot::channel(); + + thread::spawn(move || { + let start = Instant::now(); + + match tunnel_monitor.wait() { + Ok(_) => debug!("Tunnel has finished without errors"), + Err(error) => { + let chained_error = error.chain_err(|| "Tunnel has stopped unexpectedly"); + warn!("{}", chained_error); + } + } + + if let Some(remaining_time) = MIN_TUNNEL_ALIVE_TIME.checked_sub(start.elapsed()) { + thread::sleep(remaining_time); + } + + if tunnel_close_event_tx.send(()).is_err() { + warn!("Tunnel state machine stopped before receiving tunnel closed event"); + } + + trace!("Tunnel monitor thread exit"); + }); + + tunnel_close_event_rx + } + + fn into_connected_state_bootstrap(self, metadata: TunnelMetadata) -> ConnectedStateBootstrap { + ConnectedStateBootstrap { + metadata, + tunnel_events: self.tunnel_events, + tunnel_endpoint: self.tunnel_endpoint, + tunnel_parameters: self.tunnel_parameters, + tunnel_close_event: self.tunnel_close_event, + close_handle: self.close_handle, + } + } + + fn handle_commands( + mut self, + commands: &mut mpsc::UnboundedReceiver<TunnelCommand>, + shared_values: &mut SharedTunnelStateValues, + ) -> EventConsequence<Self> { + use self::EventConsequence::*; + + match try_handle_event!(self, commands.poll()) { + Ok(TunnelCommand::Connect(parameters)) => { + if parameters != self.tunnel_parameters { + NewState(DisconnectingState::enter( + shared_values, + ( + self.close_handle, + self.tunnel_close_event, + AfterDisconnect::Reconnect(parameters), + ), + )) + } else { + SameState(self) + } + } + Ok(TunnelCommand::Disconnect) | Err(_) => NewState(DisconnectingState::enter( + shared_values, + ( + self.close_handle, + self.tunnel_close_event, + AfterDisconnect::Nothing, + ), + )), + Ok(TunnelCommand::AllowLan(allow_lan)) => { + self.tunnel_parameters.allow_lan = allow_lan; + match Self::set_security_policy(shared_values, self.tunnel_endpoint, allow_lan) { + Ok(()) => SameState(self), + Err(error) => { + error!("{}", error.chain_err(|| "Failed to update security policy")); + NewState(DisconnectingState::enter( + shared_values, + ( + self.close_handle, + self.tunnel_close_event, + AfterDisconnect::Nothing, + ), + )) + } + } + } + } + } + + fn handle_tunnel_events( + mut self, + shared_values: &mut SharedTunnelStateValues, + ) -> EventConsequence<Self> { + use self::EventConsequence::*; + + match try_handle_event!(self, self.tunnel_events.poll()) { + Ok(TunnelEvent::Up(metadata)) => NewState(ConnectedState::enter( + shared_values, + self.into_connected_state_bootstrap(metadata), + )), + Ok(_) => SameState(self), + Err(_) => NewState(DisconnectingState::enter( + shared_values, + ( + self.close_handle, + self.tunnel_close_event, + AfterDisconnect::Reconnect(self.tunnel_parameters), + ), + )), + } + } + + fn handle_tunnel_close_event( + mut self, + shared_values: &mut SharedTunnelStateValues, + ) -> EventConsequence<Self> { + use self::EventConsequence::*; + + match self.tunnel_close_event.poll() { + Ok(Async::Ready(_)) => {} + Ok(Async::NotReady) => return NoEvents(self), + Err(_cancelled) => warn!("Tunnel monitor thread has stopped unexpectedly"), + } + + info!("Tunnel closed. Reconnecting."); + NewState(ConnectingState::enter( + shared_values, + self.tunnel_parameters, + )) + } +} + +impl TunnelState for ConnectingState { + type Bootstrap = TunnelParameters; + + fn enter( + shared_values: &mut SharedTunnelStateValues, + parameters: Self::Bootstrap, + ) -> StateEntryResult { + Self::new(shared_values, parameters) + .map(TunnelStateWrapper::from) + .chain_err(|| "Failed to start tunnel") + .map_err(|error| { + ( + error, + DisconnectedState::enter(shared_values, ()) + .expect("Failed to transition to fallback disconnected state"), + ) + }) + } + + fn handle_event( + self, + commands: &mut mpsc::UnboundedReceiver<TunnelCommand>, + shared_values: &mut SharedTunnelStateValues, + ) -> EventConsequence<Self> { + self.handle_commands(commands, shared_values) + .or_else(Self::handle_tunnel_events, shared_values) + .or_else(Self::handle_tunnel_close_event, shared_values) + } +} diff --git a/mullvad-daemon/src/tunnel_state_machine/disconnected_state.rs b/mullvad-daemon/src/tunnel_state_machine/disconnected_state.rs new file mode 100644 index 0000000000..20abcc7ec2 --- /dev/null +++ b/mullvad-daemon/src/tunnel_state_machine/disconnected_state.rs @@ -0,0 +1,49 @@ +use error_chain::ChainedError; +use futures::sync::mpsc; +use futures::Stream; + +use talpid_core::firewall::Firewall; + +use super::{ + ConnectingState, Error, EventConsequence, SharedTunnelStateValues, StateEntryResult, + TunnelCommand, TunnelState, TunnelStateWrapper, +}; + +/// No tunnel is running. +pub struct DisconnectedState; + +impl DisconnectedState { + fn reset_security_policy(shared_values: &mut SharedTunnelStateValues) { + debug!("Reset security policy"); + if let Err(error) = shared_values.firewall.reset_policy() { + let chained_error = Error::with_chain(error, "Failed to reset security policy"); + error!("{}", chained_error.display_chain()); + } + } +} + +impl TunnelState for DisconnectedState { + type Bootstrap = (); + + fn enter(shared_values: &mut SharedTunnelStateValues, _: Self::Bootstrap) -> StateEntryResult { + Self::reset_security_policy(shared_values); + + Ok(TunnelStateWrapper::from(DisconnectedState)) + } + + fn handle_event( + self, + commands: &mut mpsc::UnboundedReceiver<TunnelCommand>, + shared_values: &mut SharedTunnelStateValues, + ) -> EventConsequence<Self> { + use self::EventConsequence::*; + + match try_handle_event!(self, commands.poll()) { + Ok(TunnelCommand::Connect(parameters)) => { + NewState(ConnectingState::enter(shared_values, parameters)) + } + Ok(_) => SameState(self), + Err(_) => Finished, + } + } +} diff --git a/mullvad-daemon/src/tunnel_state_machine/disconnecting_state.rs b/mullvad-daemon/src/tunnel_state_machine/disconnecting_state.rs new file mode 100644 index 0000000000..9785264da1 --- /dev/null +++ b/mullvad-daemon/src/tunnel_state_machine/disconnecting_state.rs @@ -0,0 +1,104 @@ +use error_chain::ChainedError; +use futures::sync::{mpsc, oneshot}; +use futures::{Async, Future, Stream}; + +use talpid_core::tunnel::CloseHandle; + +use super::{ + ConnectingState, DisconnectedState, EventConsequence, ResultExt, SharedTunnelStateValues, + StateEntryResult, TunnelCommand, TunnelParameters, TunnelState, TunnelStateWrapper, +}; + +/// This state is active from when we manually trigger a tunnel kill until the tunnel wait +/// operation (TunnelExit) returned. +pub struct DisconnectingState { + exited: oneshot::Receiver<()>, + after_disconnect: AfterDisconnect, +} + +impl DisconnectingState { + fn handle_commands( + mut self, + commands: &mut mpsc::UnboundedReceiver<TunnelCommand>, + ) -> EventConsequence<Self> { + use self::AfterDisconnect::*; + + let event = try_handle_event!(self, commands.poll()); + let after_disconnect = self.after_disconnect; + + self.after_disconnect = match after_disconnect { + AfterDisconnect::Nothing => match event { + Ok(TunnelCommand::Connect(parameters)) => Reconnect(parameters), + _ => Nothing, + }, + AfterDisconnect::Reconnect(mut tunnel_parameters) => match event { + Ok(TunnelCommand::Connect(parameters)) => Reconnect(parameters), + Ok(TunnelCommand::AllowLan(allow_lan)) => { + tunnel_parameters.allow_lan = allow_lan; + Reconnect(tunnel_parameters) + } + Ok(TunnelCommand::Disconnect) | Err(_) => Nothing, + }, + }; + + EventConsequence::SameState(self) + } + + fn handle_exit_event( + mut self, + shared_values: &mut SharedTunnelStateValues, + ) -> EventConsequence<Self> { + use self::EventConsequence::*; + + match self.exited.poll() { + Ok(Async::NotReady) => NoEvents(self), + Ok(Async::Ready(_)) | Err(_) => NewState(self.after_disconnect(shared_values)), + } + } + + fn after_disconnect(self, shared_values: &mut SharedTunnelStateValues) -> StateEntryResult { + match self.after_disconnect { + AfterDisconnect::Nothing => DisconnectedState::enter(shared_values, ()), + AfterDisconnect::Reconnect(tunnel_parameters) => { + ConnectingState::enter(shared_values, tunnel_parameters) + } + } + } +} + +impl TunnelState for DisconnectingState { + type Bootstrap = (CloseHandle, oneshot::Receiver<()>, AfterDisconnect); + + fn enter( + _: &mut SharedTunnelStateValues, + (close_handle, exited, after_disconnect): Self::Bootstrap, + ) -> StateEntryResult { + let close_result = close_handle + .close() + .chain_err(|| "Failed to request tunnel monitor to close the tunnel"); + + if let Err(error) = close_result { + error!("{}", error.display_chain()); + } + + Ok(TunnelStateWrapper::from(DisconnectingState { + exited, + after_disconnect, + })) + } + + fn handle_event( + self, + commands: &mut mpsc::UnboundedReceiver<TunnelCommand>, + shared_values: &mut SharedTunnelStateValues, + ) -> EventConsequence<Self> { + self.handle_commands(commands) + .or_else(Self::handle_exit_event, shared_values) + } +} + +/// Which state should be transitioned to after disconnection is complete. +pub enum AfterDisconnect { + Nothing, + Reconnect(TunnelParameters), +} diff --git a/mullvad-daemon/src/tunnel_state_machine/macros.rs b/mullvad-daemon/src/tunnel_state_machine/macros.rs new file mode 100644 index 0000000000..2241c8cb06 --- /dev/null +++ b/mullvad-daemon/src/tunnel_state_machine/macros.rs @@ -0,0 +1,21 @@ +/// Try to receive an event from a `Stream`'s asynchronous poll expression. +/// +/// This macro is similar to the `try_ready!` macro provided in `futures`. If there is an event +/// ready, it will be returned wrapped in a `Result`. If there are no events ready to be received, +/// the outer function will return with a transition that indicates that no events were received, +/// which is analogous to `Async::NotReady`. +/// +/// When the asynchronous event indicates that the stream has finished or that it has failed, an +/// error type is returned so that either close scenario can be handled in a similar way. +macro_rules! try_handle_event { + ($same_state:expr, $event:expr) => { + match $event { + Ok(::futures::Async::Ready(Some(event))) => Ok(event), + Ok(::futures::Async::Ready(None)) => Err(None), + Ok(::futures::Async::NotReady) => { + return ::tunnel_state_machine::EventConsequence::NoEvents($same_state); + } + Err(error) => Err(Some(error)), + } + }; +} diff --git a/mullvad-daemon/src/tunnel_state_machine/mod.rs b/mullvad-daemon/src/tunnel_state_machine/mod.rs new file mode 100644 index 0000000000..1e84e41d67 --- /dev/null +++ b/mullvad-daemon/src/tunnel_state_machine/mod.rs @@ -0,0 +1,389 @@ +#[macro_use] +mod macros; + +mod connected_state; +mod connecting_state; +mod disconnected_state; +mod disconnecting_state; + +use std::fmt::{Debug, Formatter, Result as FmtResult}; +use std::path::{Path, PathBuf}; +use std::sync::mpsc as sync_mpsc; +use std::thread; + +use error_chain::ChainedError; +use futures::sync::mpsc; +use futures::{Async, Future, Poll, Stream}; +use tokio_core::reactor::Core; + +use mullvad_types::account::AccountToken; +use talpid_core::firewall::{Firewall, FirewallProxy}; +use talpid_core::mpsc::IntoSender; +use talpid_types::net::{TunnelEndpoint, TunnelOptions}; + +use self::connected_state::{ConnectedState, ConnectedStateBootstrap}; +use self::connecting_state::ConnectingState; +use self::disconnected_state::DisconnectedState; +use self::disconnecting_state::{AfterDisconnect, DisconnectingState}; + +error_chain! { + errors { + FirewallError { + description("Firewall error") + } + ReactorError { + description("Failed to initialize tunnel state machine event loop executor") + } + } +} + +/// Spawn the tunnel state machine thread, returning a channel for sending tunnel commands. +pub fn spawn<P, T>( + cache_dir: P, + state_change_listener: IntoSender<TunnelStateTransition, T>, +) -> Result<mpsc::UnboundedSender<TunnelCommand>> +where + P: AsRef<Path> + Send + 'static, + T: From<TunnelStateTransition> + Send + 'static, +{ + let (command_tx, command_rx) = mpsc::unbounded(); + let (startup_result_tx, startup_result_rx) = sync_mpsc::channel(); + + thread::spawn( + move || match create_event_loop(cache_dir, command_rx, state_change_listener) { + Ok((mut reactor, event_loop)) => { + startup_result_tx.send(Ok(())).expect( + "Tunnel state machine won't be started because the owner thread crashed", + ); + + if let Err(error) = reactor.run(event_loop) { + let chained_error = + Error::with_chain(error, "Tunnel state machine exited with an error"); + error!("{}", chained_error.display_chain()); + } + } + Err(startup_error) => { + startup_result_tx + .send(Err(startup_error)) + .expect("Failed to send startup error"); + } + }, + ); + + startup_result_rx + .recv() + .expect("Failed to start tunnel state machine thread") + .map(|_| command_tx) +} + +fn create_event_loop<P, T>( + cache_dir: P, + commands: mpsc::UnboundedReceiver<TunnelCommand>, + state_change_listener: IntoSender<TunnelStateTransition, T>, +) -> Result<(Core, impl Future<Item = (), Error = Error>)> +where + P: AsRef<Path>, + T: From<TunnelStateTransition> + Send + 'static, +{ + let reactor = Core::new().chain_err(|| ErrorKind::ReactorError)?; + let state_machine = TunnelStateMachine::new(&cache_dir, commands)?; + + let future = state_machine.for_each(move |state_change_event| { + state_change_listener + .send(state_change_event) + .chain_err(|| "Failed to send state change event to listener") + }); + + Ok((reactor, future)) +} + +/// Representation of external commands for the tunnel state machine. +pub enum TunnelCommand { + /// Enable or disable LAN access in the firewall. + AllowLan(bool), + /// Open tunnel connection. + Connect(TunnelParameters), + /// Close tunnel connection. + Disconnect, +} + +/// Information necessary to open a tunnel. +#[derive(Debug, PartialEq)] +pub struct TunnelParameters { + pub endpoint: TunnelEndpoint, + pub options: TunnelOptions, + pub log_dir: Option<PathBuf>, + pub resource_dir: PathBuf, + pub account_token: AccountToken, + pub allow_lan: bool, +} + +/// Event resulting from a transition to a new tunnel state. +#[derive(Clone, Copy, Debug, PartialEq)] +pub enum TunnelStateTransition { + Disconnected, + Connecting, + Connected, + Disconnecting, +} + +/// Asynchronous handling of the tunnel state machine. +/// +/// This type implements `Stream`, and attempts to advance the state machine based on the events +/// received on the commands stream and possibly on events that specific states are also listening +/// to. Every time it successfully advances the state machine a `TunnelStateTransition` is emitted +/// by the stream. +struct TunnelStateMachine { + current_state: Option<TunnelStateWrapper>, + commands: mpsc::UnboundedReceiver<TunnelCommand>, + shared_values: SharedTunnelStateValues, +} + +impl TunnelStateMachine { + fn new<P: AsRef<Path>>( + cache_dir: P, + commands: mpsc::UnboundedReceiver<TunnelCommand>, + ) -> Result<Self> { + let firewall = FirewallProxy::new(cache_dir).chain_err(|| ErrorKind::FirewallError)?; + let mut shared_values = SharedTunnelStateValues { firewall }; + + let initial_state = TunnelStateWrapper::enter(&mut shared_values, ()) + .expect("Failed to create initial tunnel state"); + + Ok(TunnelStateMachine { + current_state: Some(initial_state), + commands, + shared_values, + }) + } +} + +impl Stream for TunnelStateMachine { + type Item = TunnelStateTransition; + type Error = Error; + + fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { + let mut state = match self.current_state.take() { + Some(state) => state, + None => { + // State machine has halted + return Ok(Async::Ready(None)); + } + }; + + loop { + let event_consequence = state.handle_event(&mut self.commands, &mut self.shared_values); + let action = TunnelStateMachineAction::from(event_consequence); + + match action { + TunnelStateMachineAction::Repeat(returned_state) => { + state = returned_state; + } + TunnelStateMachineAction::Notify(state, result) => { + self.current_state = state; + return result; + } + } + } + } +} + +/// Action the state machine should take, which is discovered base on an event consequence. +/// +/// The action can be to execute another iteration or to notify that something happened. Executing +/// another iteration happens when an event is received and ignored, which causes the tunnel state +/// machine to stay in the same state. The state machine can notify its caller that a state +/// transition has occurred, that it has finished, or that it has paused to wait for new events. +enum TunnelStateMachineAction { + Repeat(TunnelStateWrapper), + Notify( + Option<TunnelStateWrapper>, + Poll<Option<TunnelStateTransition>, Error>, + ), +} + +impl From<EventConsequence<TunnelStateWrapper>> for TunnelStateMachineAction { + fn from(event_consequence: EventConsequence<TunnelStateWrapper>) -> Self { + use self::EventConsequence::*; + use self::TunnelStateMachineAction::*; + + match event_consequence { + NewState(Ok(state)) | NewState(Err((_, state))) => { + let transition = state.info(); + + Notify(Some(state), Ok(Async::Ready(Some(transition)))) + } + SameState(state) => Repeat(state), + NoEvents(state) => Notify(Some(state), Ok(Async::NotReady)), + Finished => Notify(None, Ok(Async::Ready(None))), + } + } +} + +/// Values that are common to all tunnel states. +struct SharedTunnelStateValues { + firewall: FirewallProxy, +} + +/// Asynchronous result of an attempt to progress a state. +enum EventConsequence<T: TunnelState> { + /// Transition to a new state. + NewState(StateEntryResult), + /// An event was received, but it was ignored by the state so no transition is performed. + SameState(T), + /// No events were received, the event loop should block until one becomes available. + NoEvents(T), + /// The state machine has finished its execution. + Finished, +} + +impl<T> EventConsequence<T> +where + T: TunnelState, +{ + /// Helper method to chain handling multiple different event types. + /// + /// The `handle_event` is only called if no events were handled so far. + pub fn or_else<F>(self, handle_event: F, shared_values: &mut SharedTunnelStateValues) -> Self + where + F: FnOnce(T, &mut SharedTunnelStateValues) -> Self, + { + use self::EventConsequence::*; + + match self { + NoEvents(state) => handle_event(state, shared_values), + consequence => consequence, + } + } +} + +/// Result of entering a `T: TunnelState`. +/// +/// It is either the state itself when successful, or an error paired with a fallback state. +type StateEntryResult = ::std::result::Result<TunnelStateWrapper, (Error, TunnelStateWrapper)>; + +/// Trait that contains the method all states should implement to handle an event and advance the +/// state machine. +trait TunnelState: Into<TunnelStateWrapper> + Sized { + /// Type representing extra information required for entering the state. + type Bootstrap; + + /// Constructor function. + /// + /// This is the state entry point. It attempts to enter the state, and may fail by entering an + /// error or fallback state instead. + fn enter( + shared_values: &mut SharedTunnelStateValues, + bootstrap: Self::Bootstrap, + ) -> StateEntryResult; + + /// Main state function. + /// + /// This is state exit point. It consumes itself and returns the next state to advance to when + /// it has completed, or itself if it wants to ignore a received event or if no events were + /// ready to be received. See [`EventConsequence`] for more details. + /// + /// An implementation can handle events from many sources, but it should also handle command + /// events received through the provided `commands` stream. + /// + /// [`EventConsequence`]: enum.EventConsequence.html + fn handle_event( + self, + commands: &mut mpsc::UnboundedReceiver<TunnelCommand>, + shared_values: &mut SharedTunnelStateValues, + ) -> EventConsequence<Self>; +} + +/// Valid states of the tunnel. +/// +/// All implementations must implement `TunnelState` so that they can handle events and +/// commands in order to advance the state machine. +enum TunnelStateWrapper { + Disconnected(DisconnectedState), + Connecting(ConnectingState), + Connected(ConnectedState), + Disconnecting(DisconnectingState), +} + +impl TunnelStateWrapper { + /// Returns information describing the state. + fn info(&self) -> TunnelStateTransition { + match *self { + TunnelStateWrapper::Disconnected(_) => TunnelStateTransition::Disconnected, + TunnelStateWrapper::Connecting(_) => TunnelStateTransition::Connecting, + TunnelStateWrapper::Connected(_) => TunnelStateTransition::Connected, + TunnelStateWrapper::Disconnecting(_) => TunnelStateTransition::Disconnecting, + } + } +} + +macro_rules! impl_from_for_tunnel_state { + ($state_variant:ident($state_type:ident)) => { + impl From<$state_type> for TunnelStateWrapper { + fn from(state: $state_type) -> Self { + TunnelStateWrapper::$state_variant(state) + } + } + }; +} + +impl_from_for_tunnel_state!(Disconnected(DisconnectedState)); +impl_from_for_tunnel_state!(Connecting(ConnectingState)); +impl_from_for_tunnel_state!(Connected(ConnectedState)); +impl_from_for_tunnel_state!(Disconnecting(DisconnectingState)); + +impl TunnelState for TunnelStateWrapper { + type Bootstrap = <DisconnectedState as TunnelState>::Bootstrap; + + fn enter( + shared_values: &mut SharedTunnelStateValues, + bootstrap: Self::Bootstrap, + ) -> StateEntryResult { + DisconnectedState::enter(shared_values, bootstrap) + } + + fn handle_event( + self, + commands: &mut mpsc::UnboundedReceiver<TunnelCommand>, + shared_values: &mut SharedTunnelStateValues, + ) -> EventConsequence<TunnelStateWrapper> { + use self::EventConsequence::*; + + macro_rules! handle_event { + ( $($state:ident),* $(,)* ) => { + match self { + $( + TunnelStateWrapper::$state(state) => { + match state.handle_event(commands, shared_values) { + NewState(tunnel_state) => NewState(tunnel_state), + SameState(state) => SameState(TunnelStateWrapper::$state(state)), + NoEvents(state) => NoEvents(TunnelStateWrapper::$state(state)), + Finished => Finished, + } + } + )* + } + } + } + + handle_event! { + Disconnected, + Connecting, + Connected, + Disconnecting, + } + } +} + +impl Debug for TunnelStateWrapper { + fn fmt(&self, formatter: &mut Formatter) -> FmtResult { + use self::TunnelStateWrapper::*; + + match *self { + Disconnected(_) => write!(formatter, "TunnelStateWrapper::Disconnected(_)"), + Connecting(_) => write!(formatter, "TunnelStateWrapper::Connecting(_)"), + Connected(_) => write!(formatter, "TunnelStateWrapper::Connected(_)"), + Disconnecting(_) => write!(formatter, "TunnelStateWrapper::Disconnecting(_)"), + } + } +} diff --git a/talpid-core/src/tunnel/mod.rs b/talpid-core/src/tunnel/mod.rs index 932279c0a0..98d2d0e2d0 100644 --- a/talpid-core/src/tunnel/mod.rs +++ b/talpid-core/src/tunnel/mod.rs @@ -280,7 +280,7 @@ impl TunnelMonitor { CloseHandle(self.monitor.close_handle()) } - /// Consumes the monitor and block until the tunnel exits or there is an error. + /// Consumes the monitor and blocks until the tunnel exits or there is an error. pub fn wait(self) -> Result<()> { self.monitor .wait() |
