summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--mullvad-daemon/src/main.rs440
-rw-r--r--mullvad-daemon/src/management_interface.rs60
-rw-r--r--mullvad-daemon/src/tunnel_state_machine/connected_state.rs183
-rw-r--r--mullvad-daemon/src/tunnel_state_machine/connecting_state.rs294
-rw-r--r--mullvad-daemon/src/tunnel_state_machine/disconnected_state.rs49
-rw-r--r--mullvad-daemon/src/tunnel_state_machine/disconnecting_state.rs104
-rw-r--r--mullvad-daemon/src/tunnel_state_machine/macros.rs21
-rw-r--r--mullvad-daemon/src/tunnel_state_machine/mod.rs389
-rw-r--r--talpid-core/src/tunnel/mod.rs2
9 files changed, 1218 insertions, 324 deletions
diff --git a/mullvad-daemon/src/main.rs b/mullvad-daemon/src/main.rs
index d1e620f3df..08f8c34146 100644
--- a/mullvad-daemon/src/main.rs
+++ b/mullvad-daemon/src/main.rs
@@ -55,14 +55,17 @@ mod rpc_uniqueness_check;
mod settings;
mod shutdown;
mod system_service;
+mod tunnel_state_machine;
mod version;
use error_chain::ChainedError;
-use futures::Future;
+use futures::sync::mpsc::UnboundedSender;
+use futures::{Future, Sink};
use jsonrpc_core::futures::sync::oneshot::Sender as OneshotSender;
-use management_interface::{BoxFuture, ManagementInterfaceServer, TunnelCommand};
+use management_interface::{BoxFuture, ManagementCommand, ManagementInterfaceServer};
use mullvad_rpc::{AccountsProxy, AppVersionProxy, HttpHandle};
+use tunnel_state_machine::{TunnelCommand, TunnelParameters, TunnelStateTransition};
use mullvad_types::account::{AccountData, AccountToken};
use mullvad_types::location::GeoIpLocation;
@@ -71,17 +74,14 @@ use mullvad_types::relay_list::{Relay, RelayList};
use mullvad_types::states::{DaemonState, SecurityState, TargetState};
use mullvad_types::version::{AppVersion, AppVersionInfo};
-use std::io;
use std::net::IpAddr;
use std::path::PathBuf;
-use std::sync::{mpsc, Arc, Mutex};
-use std::thread;
-use std::time::{Duration, Instant};
+use std::sync::mpsc;
+use std::time::Duration;
+use std::{mem, thread};
-use talpid_core::firewall::{Firewall, FirewallProxy, SecurityPolicy};
use talpid_core::mpsc::IntoSender;
-use talpid_core::tunnel::{self, TunnelEvent, TunnelMetadata, TunnelMonitor};
-use talpid_types::net::{TunnelEndpoint, TunnelEndpointData, TunnelOptions};
+use talpid_types::net::TunnelOptions;
error_chain!{
@@ -96,22 +96,10 @@ error_chain!{
DaemonIsAlreadyRunning {
description("Another instance of the daemon is already running")
}
- /// The client is in the wrong state for the requested operation. Optimally the code should
- /// be written in such a way so such states can't exist.
- InvalidState {
- description("Client is in an invalid state for the requested operation")
- }
- TunnelError(msg: &'static str) {
- description("Error in the tunnel monitor")
- display("Tunnel monitor error: {}", msg)
- }
ManagementInterfaceError(msg: &'static str) {
description("Error in the management interface")
display("Management interface error: {}", msg)
}
- FirewallError {
- description("Firewall error")
- }
InvalidSettings(msg: &'static str) {
description("Invalid settings")
display("Invalid Settings: {}", msg)
@@ -120,80 +108,91 @@ error_chain!{
description("Found no valid relays to connect to")
}
}
-}
-static MIN_TUNNEL_ALIVE_TIME: Duration = Duration::from_millis(1000);
+ links {
+ TunnelError(tunnel_state_machine::Error, tunnel_state_machine::ErrorKind);
+ }
+}
const DAEMON_LOG_FILENAME: &str = "daemon.log";
-const OPENVPN_LOG_FILENAME: &str = "openvpn.log";
-const WIREGUARD_LOG_FILENAME: &str = "wireguard.log";
-#[cfg(windows)]
-const TUNNEL_INTERFACE_ALIAS: &str = "Mullvad";
+type SyncUnboundedSender<T> = ::futures::sink::Wait<UnboundedSender<T>>;
/// All events that can happen in the daemon. Sent from various threads and exposed interfaces.
pub enum DaemonEvent {
- /// An event coming from the tunnel software to indicate a change in state.
- TunnelEvent(TunnelEvent),
- /// Triggered by the thread waiting for the tunnel process. Means the tunnel process
- /// exited.
- TunnelExited(tunnel::Result<()>),
- /// Triggered by the thread waiting for a tunnel close operation to complete. Contains the
- /// result of trying to kill the tunnel.
- TunnelKillResult(io::Result<()>),
+ /// Tunnel has changed state.
+ TunnelStateTransition(TunnelStateTransition),
/// An event coming from the JSONRPC-2.0 management interface.
- ManagementInterfaceEvent(TunnelCommand),
+ ManagementInterfaceEvent(ManagementCommand),
/// Triggered if the server hosting the JSONRPC-2.0 management interface dies unexpectedly.
ManagementInterfaceExited(talpid_ipc::Result<()>),
/// Daemon shutdown triggered by a signal, ctrl-c or similar.
TriggerShutdown,
}
-impl From<TunnelEvent> for DaemonEvent {
- fn from(tunnel_event: TunnelEvent) -> Self {
- DaemonEvent::TunnelEvent(tunnel_event)
+impl From<TunnelStateTransition> for DaemonEvent {
+ fn from(tunnel_state_transition: TunnelStateTransition) -> Self {
+ DaemonEvent::TunnelStateTransition(tunnel_state_transition)
}
}
-impl From<TunnelCommand> for DaemonEvent {
- fn from(tunnel_command: TunnelCommand) -> Self {
- DaemonEvent::ManagementInterfaceEvent(tunnel_command)
+impl From<ManagementCommand> for DaemonEvent {
+ fn from(command: ManagementCommand) -> Self {
+ DaemonEvent::ManagementInterfaceEvent(command)
}
}
-/// Represents the internal state of the actual tunnel.
-// TODO(linus): Put the tunnel::CloseHandle into this state, so it can't exist when not running.
-#[derive(Debug, Copy, Clone, Eq, PartialEq)]
-pub enum TunnelState {
- /// No tunnel is running.
- NotRunning,
- /// The tunnel has been started, but it is not established/functional.
- Connecting,
- /// The tunnel is up and working.
- Connected,
- /// This state is active from when we manually trigger a tunnel kill until the tunnel wait
- /// operation (TunnelExit) returned.
+#[derive(Clone, Debug, Eq, PartialEq)]
+enum DaemonExecutionState {
+ Running,
Exiting,
+ Finished,
}
-impl TunnelState {
- pub fn as_security_state(&self) -> SecurityState {
- use TunnelState::*;
- match *self {
- NotRunning | Connecting => SecurityState::Unsecured,
- Connected | Exiting => SecurityState::Secured,
+impl DaemonExecutionState {
+ pub fn shutdown(&mut self, tunnel_state: TunnelStateTransition) {
+ use self::DaemonExecutionState::*;
+
+ match self {
+ Running => {
+ match tunnel_state {
+ TunnelStateTransition::Disconnected => mem::replace(self, Finished),
+ _ => mem::replace(self, Exiting),
+ };
+ }
+ Exiting | Finished => {}
+ };
+ }
+
+ pub fn disconnected(&mut self) {
+ use self::DaemonExecutionState::*;
+
+ match self {
+ Exiting => {
+ mem::replace(self, Finished);
+ }
+ Running | Finished => {}
+ };
+ }
+
+ pub fn is_running(&mut self) -> bool {
+ use self::DaemonExecutionState::*;
+
+ match self {
+ Running => true,
+ Exiting | Finished => false,
}
}
}
struct Daemon {
- state: TunnelState,
- // The tunnel_close_handle must only exist in the Connecting and Connected states!
- tunnel_close_handle: Option<tunnel::CloseHandle>,
+ tunnel_command_tx: SyncUnboundedSender<TunnelCommand>,
+ tunnel_state: TunnelStateTransition,
+ security_state: SecurityState,
last_broadcasted_state: DaemonState,
target_state: TargetState,
- shutdown: bool,
+ state: DaemonExecutionState,
rx: mpsc::Receiver<DaemonEvent>,
tx: mpsc::Sender<DaemonEvent>,
management_interface_broadcaster: management_interface::EventBroadcaster,
@@ -203,10 +202,7 @@ struct Daemon {
https_handle: mullvad_rpc::rest::RequestSender,
tokio_remote: tokio_core::reactor::Remote,
relay_selector: relays::RelaySelector,
- firewall: FirewallProxy,
current_relay: Option<Relay>,
- tunnel_endpoint: Option<TunnelEndpoint>,
- tunnel_metadata: Option<TunnelMetadata>,
log_dir: Option<PathBuf>,
resource_dir: PathBuf,
}
@@ -240,19 +236,23 @@ impl Daemon {
relays::RelaySelector::new(rpc_handle.clone(), &resource_dir, &cache_dir);
let (tx, rx) = mpsc::channel();
+ let tunnel_command_tx =
+ tunnel_state_machine::spawn(cache_dir.clone(), IntoSender::from(tx.clone()))?;
+
+ let target_state = TargetState::Unsecured;
let management_interface_broadcaster =
Self::start_management_interface(tx.clone(), cache_dir.clone())?;
- let state = TunnelState::NotRunning;
- let target_state = TargetState::Unsecured;
+
Ok(Daemon {
- state,
- tunnel_close_handle: None,
+ tunnel_command_tx: Sink::wait(tunnel_command_tx),
+ tunnel_state: TunnelStateTransition::Disconnected,
+ security_state: SecurityState::Unsecured,
target_state,
last_broadcasted_state: DaemonState {
- state: state.as_security_state(),
+ state: SecurityState::Unsecured,
target_state,
},
- shutdown: false,
+ state: DaemonExecutionState::Running,
rx,
tx,
management_interface_broadcaster,
@@ -262,10 +262,7 @@ impl Daemon {
https_handle,
tokio_remote,
relay_selector,
- firewall: FirewallProxy::new(&cache_dir).chain_err(|| ErrorKind::FirewallError)?,
current_relay: None,
- tunnel_endpoint: None,
- tunnel_metadata: None,
log_dir,
resource_dir,
})
@@ -285,7 +282,7 @@ impl Daemon {
}
fn start_management_interface_server(
- event_tx: IntoSender<TunnelCommand, DaemonEvent>,
+ event_tx: IntoSender<ManagementCommand, DaemonEvent>,
cache_dir: PathBuf,
) -> Result<ManagementInterfaceServer> {
let shared_secret = uuid::Uuid::new_v4().to_string();
@@ -323,7 +320,7 @@ impl Daemon {
}
while let Ok(event) = self.rx.recv() {
self.handle_event(event)?;
- if self.shutdown && self.state == TunnelState::NotRunning {
+ if self.state == DaemonExecutionState::Finished {
break;
}
}
@@ -333,49 +330,38 @@ impl Daemon {
fn handle_event(&mut self, event: DaemonEvent) -> Result<()> {
use DaemonEvent::*;
match event {
- TunnelEvent(event) => self.handle_tunnel_event(event),
- TunnelExited(result) => self.handle_tunnel_exited(result),
- TunnelKillResult(result) => self.handle_tunnel_kill_result(result),
+ TunnelStateTransition(transition) => self.handle_tunnel_state_transition(transition),
ManagementInterfaceEvent(event) => self.handle_management_interface_event(event),
ManagementInterfaceExited(result) => self.handle_management_interface_exited(result),
TriggerShutdown => self.handle_trigger_shutdown_event(),
}
}
- fn handle_tunnel_event(&mut self, tunnel_event: TunnelEvent) -> Result<()> {
- debug!("Tunnel event: {:?}", tunnel_event);
- if self.state == TunnelState::Connecting {
- if let TunnelEvent::Up(metadata) = tunnel_event {
- self.tunnel_metadata = Some(metadata);
- self.set_security_policy()?;
- self.set_state(TunnelState::Connected)
- } else {
- Ok(())
- }
- } else if self.state == TunnelState::Connected && tunnel_event == TunnelEvent::Down {
- self.kill_tunnel()
- } else {
- Ok(())
- }
- }
+ fn handle_tunnel_state_transition(
+ &mut self,
+ tunnel_state: TunnelStateTransition,
+ ) -> Result<()> {
+ use self::TunnelStateTransition::*;
- fn handle_tunnel_exited(&mut self, result: tunnel::Result<()>) -> Result<()> {
- if let Err(e) = result.chain_err(|| "Tunnel exited in an unexpected way") {
- error!("{}", e.display_chain());
+ debug!("New tunnel state: {:?}", tunnel_state);
+
+ if tunnel_state == Disconnected {
+ self.state.disconnected();
}
- self.current_relay = None;
- self.tunnel_endpoint = None;
- self.tunnel_metadata = None;
- self.tunnel_close_handle = None;
- self.set_state(TunnelState::NotRunning)
- }
- fn handle_tunnel_kill_result(&mut self, result: io::Result<()>) -> Result<()> {
- result.chain_err(|| "Error while trying to close tunnel")
+ self.tunnel_state = tunnel_state;
+ self.security_state = match tunnel_state {
+ Disconnected | Connecting => SecurityState::Unsecured,
+ Connected | Disconnecting => SecurityState::Secured,
+ };
+
+ self.broadcast_state();
+
+ Ok(())
}
- fn handle_management_interface_event(&mut self, event: TunnelCommand) -> Result<()> {
- use TunnelCommand::*;
+ fn handle_management_interface_event(&mut self, event: ManagementCommand) -> Result<()> {
+ use ManagementCommand::*;
match event {
SetTargetState(state) => self.on_set_target_state(state),
GetState(tx) => Ok(self.on_get_state(tx)),
@@ -402,7 +388,7 @@ impl Daemon {
}
fn on_set_target_state(&mut self, new_target_state: TargetState) -> Result<()> {
- if !self.shutdown {
+ if self.state.is_running() {
self.set_target_state(new_target_state)
} else {
warn!("Ignoring target state change request due to shutdown");
@@ -465,11 +451,9 @@ impl Daemon {
match save_result.chain_err(|| "Unable to save settings") {
Ok(account_changed) => {
Self::oneshot_send(tx, (), "set_account response");
- let tunnel_needs_restart =
- self.state == TunnelState::Connecting || self.state == TunnelState::Connected;
- if account_changed && tunnel_needs_restart {
+ if account_changed {
info!("Initiating tunnel restart because the account token changed");
- self.kill_tunnel()?;
+ self.connect_tunnel()?;
}
}
Err(e) => error!("{}", e.display_chain()),
@@ -515,12 +499,9 @@ impl Daemon {
Ok(changed) => {
Self::oneshot_send(tx, (), "update_relay_settings response");
- let tunnel_needs_restart =
- self.state == TunnelState::Connecting || self.state == TunnelState::Connected;
-
- if changed && tunnel_needs_restart {
+ if changed {
info!("Initiating tunnel restart because the relay settings changed");
- self.kill_tunnel()?;
+ self.connect_tunnel()?;
}
}
Err(e) => error!("{}", e.display_chain()),
@@ -537,8 +518,10 @@ impl Daemon {
let save_result = self.settings.set_allow_lan(allow_lan);
match save_result.chain_err(|| "Unable to save settings") {
Ok(settings_changed) => {
- if settings_changed && self.target_state == TargetState::Secured {
- self.set_security_policy()?;
+ if settings_changed {
+ self.tunnel_command_tx
+ .send(TunnelCommand::AllowLan(allow_lan))
+ .expect("Tunnel state machine has stopped");
}
Self::oneshot_send(tx, (), "set_allow_lan response");
}
@@ -592,12 +575,9 @@ impl Daemon {
Ok(settings_changed) => {
Self::oneshot_send(tx, (), "set_openvpn_enable_ipv6 response");
- let tunnel_needs_restart =
- self.state == TunnelState::Connecting || self.state == TunnelState::Connected;
-
- if settings_changed && tunnel_needs_restart {
+ if settings_changed {
info!("Initiating tunnel restart because the enable IPv6 setting changed");
- self.kill_tunnel()?;
+ self.connect_tunnel()?;
}
}
Err(e) => error!("{}", e.display_chain()),
@@ -627,29 +607,15 @@ impl Daemon {
}
fn handle_trigger_shutdown_event(&mut self) -> Result<()> {
- self.shutdown = true;
- self.set_target_state(TargetState::Unsecured)
- }
+ self.state.shutdown(self.tunnel_state);
+ self.disconnect_tunnel();
- /// Update the state of the client. If it changed, notify the subscribers and trigger
- /// appropriate actions.
- fn set_state(&mut self, new_state: TunnelState) -> Result<()> {
- if new_state != self.state {
- debug!("State {:?} => {:?}", self.state, new_state);
- self.state = new_state;
- self.broadcast_state();
- self.verify_state_consistency()?;
- self.apply_target_state()
- } else {
- // Calling set_state with the same state we already have is an error. Should try to
- // mitigate this possibility completely with a better state machine later.
- Err(ErrorKind::InvalidState.into())
- }
+ Ok(())
}
fn broadcast_state(&mut self) {
let new_daemon_state = DaemonState {
- state: self.state.as_security_state(),
+ state: self.security_state,
target_state: self.target_state,
};
if self.last_broadcasted_state != new_daemon_state {
@@ -659,21 +625,6 @@ impl Daemon {
}
}
- // Check that the current state is valid and consistent.
- fn verify_state_consistency(&self) -> Result<()> {
- use TunnelState::*;
- ensure!(
- match self.state {
- NotRunning => self.tunnel_close_handle.is_none(),
- Connecting => self.tunnel_close_handle.is_some(),
- Connected => self.tunnel_close_handle.is_some(),
- Exiting => self.tunnel_close_handle.is_none(),
- },
- ErrorKind::InvalidState
- );
- Ok(())
- }
-
/// Set the target state of the client. If it changed trigger the operations needed to
/// progress towards that state.
fn set_target_state(&mut self, new_state: TargetState) -> Result<()> {
@@ -688,139 +639,66 @@ impl Daemon {
}
fn apply_target_state(&mut self) -> Result<()> {
- match (self.target_state, self.state) {
- (TargetState::Secured, TunnelState::NotRunning) => {
+ match self.target_state {
+ TargetState::Secured => {
debug!("Triggering tunnel start");
- if let Err(e) = self.start_tunnel().chain_err(|| "Failed to start tunnel") {
+ if let Err(e) = self.connect_tunnel().chain_err(|| "Failed to start tunnel") {
error!("{}", e.display_chain());
self.current_relay = None;
- self.tunnel_endpoint = None;
self.management_interface_broadcaster.notify_error(&e);
self.set_target_state(TargetState::Unsecured)?;
}
- Ok(())
}
- (TargetState::Unsecured, TunnelState::NotRunning) => self.reset_security_policy(),
- (TargetState::Unsecured, TunnelState::Connecting)
- | (TargetState::Unsecured, TunnelState::Connected) => self.kill_tunnel(),
- (..) => Ok(()),
+ TargetState::Unsecured => self.disconnect_tunnel(),
}
+
+ Ok(())
}
- fn start_tunnel(&mut self) -> Result<()> {
- ensure!(
- self.target_state == TargetState::Secured && self.state == TunnelState::NotRunning,
- ErrorKind::InvalidState
- );
+ fn connect_tunnel(&mut self) -> Result<()> {
+ let parameters = self.build_tunnel_parameters()?;
- match self.settings.get_relay_settings() {
- RelaySettings::CustomTunnelEndpoint(custom_relay) => {
- let tunnel_endpoint = custom_relay
- .to_tunnel_endpoint()
- .chain_err(|| ErrorKind::NoRelay)?;
- self.tunnel_endpoint = Some(tunnel_endpoint);
- }
+ self.tunnel_command_tx
+ .send(TunnelCommand::Connect(parameters))
+ .expect("Tunnel state machine has stopped");
+
+ Ok(())
+ }
+
+ fn disconnect_tunnel(&mut self) {
+ self.tunnel_command_tx
+ .send(TunnelCommand::Disconnect)
+ .expect("Tunnel state machine has stopped");
+ }
+
+ fn build_tunnel_parameters(&mut self) -> Result<TunnelParameters> {
+ let endpoint = match self.settings.get_relay_settings() {
+ RelaySettings::CustomTunnelEndpoint(custom_relay) => custom_relay
+ .to_tunnel_endpoint()
+ .chain_err(|| ErrorKind::NoRelay)?,
RelaySettings::Normal(constraints) => {
let (relay, tunnel_endpoint) = self
.relay_selector
.get_tunnel_endpoint(&constraints)
.chain_err(|| ErrorKind::NoRelay)?;
- self.tunnel_endpoint = Some(tunnel_endpoint);
self.current_relay = Some(relay);
+ tunnel_endpoint
}
- }
+ };
let account_token = self
.settings
.get_account_token()
.ok_or(ErrorKind::InvalidSettings("No account token"))?;
- self.set_security_policy()?;
-
- let tunnel_monitor =
- self.spawn_tunnel_monitor(self.tunnel_endpoint.unwrap(), &account_token)?;
- self.tunnel_close_handle = Some(tunnel_monitor.close_handle());
- self.spawn_tunnel_monitor_wait_thread(tunnel_monitor);
-
- self.set_state(TunnelState::Connecting)?;
- Ok(())
- }
-
- fn spawn_tunnel_monitor(
- &self,
- tunnel_endpoint: TunnelEndpoint,
- account_token: &str,
- ) -> Result<TunnelMonitor> {
- // Must wrap the channel in a Mutex because TunnelMonitor forces the closure to be Sync
- let event_tx = Arc::new(Mutex::new(self.tx.clone()));
- let on_tunnel_event = move |event| {
- let _ = event_tx
- .lock()
- .unwrap()
- .send(DaemonEvent::TunnelEvent(event));
- };
-
- let tunnel_log = if let Some(ref log_dir) = self.log_dir {
- let filename = match tunnel_endpoint.tunnel {
- TunnelEndpointData::OpenVpn(_) => OPENVPN_LOG_FILENAME,
- TunnelEndpointData::Wireguard(_) => WIREGUARD_LOG_FILENAME,
- };
- let tunnel_log = log_dir.join(filename);
- logging::rotate_log(&tunnel_log).chain_err(|| "Unable to rotate tunnel log")?;
- Some(tunnel_log)
- } else {
- None
- };
-
- let tunnel_options = self.settings.get_tunnel_options();
- let tunnel_alias = {
- #[cfg(windows)]
- {
- Some(TUNNEL_INTERFACE_ALIAS.into())
- }
- #[cfg(not(windows))]
- {
- None
- }
- };
- TunnelMonitor::new(
- tunnel_endpoint,
- &tunnel_options,
- tunnel_alias,
+ Ok(TunnelParameters {
+ endpoint,
+ options: self.settings.get_tunnel_options().clone(),
+ log_dir: self.log_dir.clone(),
+ resource_dir: self.resource_dir.clone(),
account_token,
- tunnel_log.as_ref().map(PathBuf::as_path),
- &self.resource_dir,
- on_tunnel_event,
- ).chain_err(|| ErrorKind::TunnelError("Unable to start tunnel monitor"))
- }
-
- fn spawn_tunnel_monitor_wait_thread(&self, tunnel_monitor: TunnelMonitor) {
- let error_tx = self.tx.clone();
- thread::spawn(move || {
- let start = Instant::now();
- let result = tunnel_monitor.wait();
- if let Some(sleep_dur) = MIN_TUNNEL_ALIVE_TIME.checked_sub(start.elapsed()) {
- thread::sleep(sleep_dur);
- }
- let _ = error_tx.send(DaemonEvent::TunnelExited(result));
- trace!("Tunnel monitor thread exit");
- });
- }
-
- fn kill_tunnel(&mut self) -> Result<()> {
- ensure!(
- self.state == TunnelState::Connecting || self.state == TunnelState::Connected,
- ErrorKind::InvalidState
- );
- let close_handle = self.tunnel_close_handle.take().unwrap();
- self.set_state(TunnelState::Exiting)?;
- let result_tx = self.tx.clone();
- thread::spawn(move || {
- let result = close_handle.close();
- let _ = result_tx.send(DaemonEvent::TunnelKillResult(result));
- trace!("Tunnel kill thread exit");
- });
- Ok(())
+ allow_lan: self.settings.get_allow_lan(),
+ })
}
pub fn shutdown_handle(&self) -> DaemonShutdownHandle {
@@ -828,32 +706,6 @@ impl Daemon {
tx: self.tx.clone(),
}
}
-
- fn set_security_policy(&mut self) -> Result<()> {
- let policy = match (self.tunnel_endpoint, self.tunnel_metadata.as_ref()) {
- (Some(relay), None) => SecurityPolicy::Connecting {
- relay_endpoint: relay.to_endpoint(),
- allow_lan: self.settings.get_allow_lan(),
- },
- (Some(relay), Some(tunnel_metadata)) => SecurityPolicy::Connected {
- relay_endpoint: relay.to_endpoint(),
- tunnel: tunnel_metadata.clone(),
- allow_lan: self.settings.get_allow_lan(),
- },
- _ => bail!(ErrorKind::InvalidState),
- };
- debug!("Set security policy: {:?}", policy);
- self.firewall
- .apply_policy(policy)
- .chain_err(|| ErrorKind::FirewallError)
- }
-
- fn reset_security_policy(&mut self) -> Result<()> {
- debug!("Reset security policy");
- self.firewall
- .reset_policy()
- .chain_err(|| ErrorKind::FirewallError)
- }
}
struct DaemonShutdownHandle {
diff --git a/mullvad-daemon/src/management_interface.rs b/mullvad-daemon/src/management_interface.rs
index 2a77650a52..3f2d920180 100644
--- a/mullvad-daemon/src/management_interface.rs
+++ b/mullvad-daemon/src/management_interface.rs
@@ -167,7 +167,7 @@ build_rpc_trait! {
/// Enum representing commands coming in on the management interface.
-pub enum TunnelCommand {
+pub enum ManagementCommand {
/// Change target state.
SetTargetState(TargetState),
/// Request the current state.
@@ -224,12 +224,12 @@ pub struct ManagementInterfaceServer {
impl ManagementInterfaceServer {
pub fn start<T>(
- tunnel_tx: IntoSender<TunnelCommand, T>,
+ tunnel_tx: IntoSender<ManagementCommand, T>,
shared_secret: String,
cache_dir: PathBuf,
) -> talpid_ipc::Result<Self>
where
- T: From<TunnelCommand> + 'static + Send,
+ T: From<ManagementCommand> + 'static + Send,
{
let rpc = ManagementInterface::new(tunnel_tx, shared_secret, cache_dir);
let subscriptions = rpc.subscriptions.clone();
@@ -297,16 +297,16 @@ impl EventBroadcaster {
}
}
-struct ManagementInterface<T: From<TunnelCommand> + 'static + Send> {
+struct ManagementInterface<T: From<ManagementCommand> + 'static + Send> {
subscriptions: Arc<ActiveSubscriptions>,
- tx: Mutex<IntoSender<TunnelCommand, T>>,
+ tx: Mutex<IntoSender<ManagementCommand, T>>,
shared_secret: String,
cache_dir: PathBuf,
}
-impl<T: From<TunnelCommand> + 'static + Send> ManagementInterface<T> {
+impl<T: From<ManagementCommand> + 'static + Send> ManagementInterface<T> {
pub fn new(
- tx: IntoSender<TunnelCommand, T>,
+ tx: IntoSender<ManagementCommand, T>,
shared_secret: String,
cache_dir: PathBuf,
) -> Self {
@@ -354,7 +354,7 @@ impl<T: From<TunnelCommand> + 'static + Send> ManagementInterface<T> {
}
/// Sends a command to the daemon and maps the error to an RPC error.
- fn send_command_to_daemon(&self, command: TunnelCommand) -> BoxFuture<(), Error> {
+ fn send_command_to_daemon(&self, command: ManagementCommand) -> BoxFuture<(), Error> {
Box::new(
future::result(self.tx.lock().unwrap().send(command))
.map_err(|_| Error::internal_error()),
@@ -408,7 +408,9 @@ macro_rules! try_future {
};
}
-impl<T: From<TunnelCommand> + 'static + Send> ManagementInterfaceApi for ManagementInterface<T> {
+impl<T: From<ManagementCommand> + 'static + Send> ManagementInterfaceApi
+ for ManagementInterface<T>
+{
type Metadata = Meta;
fn auth(&self, meta: Self::Metadata, shared_secret: String) -> BoxFuture<(), Error> {
@@ -431,7 +433,7 @@ impl<T: From<TunnelCommand> + 'static + Send> ManagementInterfaceApi for Managem
try_future!(self.check_auth(&meta));
let (tx, rx) = sync::oneshot::channel();
let future = self
- .send_command_to_daemon(TunnelCommand::GetAccountData(tx, account_token))
+ .send_command_to_daemon(ManagementCommand::GetAccountData(tx, account_token))
.and_then(|_| rx.map_err(|_| Error::internal_error()))
.and_then(|rpc_future| {
rpc_future.map_err(|error: mullvad_rpc::Error| {
@@ -450,7 +452,7 @@ impl<T: From<TunnelCommand> + 'static + Send> ManagementInterfaceApi for Managem
try_future!(self.check_auth(&meta));
let (tx, rx) = sync::oneshot::channel();
let future = self
- .send_command_to_daemon(TunnelCommand::GetRelayLocations(tx))
+ .send_command_to_daemon(ManagementCommand::GetRelayLocations(tx))
.and_then(|_| rx.map_err(|_| Error::internal_error()));
Box::new(future)
}
@@ -464,7 +466,7 @@ impl<T: From<TunnelCommand> + 'static + Send> ManagementInterfaceApi for Managem
try_future!(self.check_auth(&meta));
let (tx, rx) = sync::oneshot::channel();
let future = self
- .send_command_to_daemon(TunnelCommand::SetAccount(tx, account_token.clone()))
+ .send_command_to_daemon(ManagementCommand::SetAccount(tx, account_token.clone()))
.and_then(|_| rx.map_err(|_| Error::internal_error()));
if let Some(new_account_token) = account_token {
@@ -486,7 +488,7 @@ impl<T: From<TunnelCommand> + 'static + Send> ManagementInterfaceApi for Managem
try_future!(self.check_auth(&meta));
let (tx, rx) = sync::oneshot::channel();
let future = self
- .send_command_to_daemon(TunnelCommand::GetAccount(tx))
+ .send_command_to_daemon(ManagementCommand::GetAccount(tx))
.and_then(|_| rx.map_err(|_| Error::internal_error()));
Box::new(future)
}
@@ -500,7 +502,7 @@ impl<T: From<TunnelCommand> + 'static + Send> ManagementInterfaceApi for Managem
try_future!(self.check_auth(&meta));
let (tx, rx) = sync::oneshot::channel();
- let message = TunnelCommand::UpdateRelaySettings(tx, constraints_update);
+ let message = ManagementCommand::UpdateRelaySettings(tx, constraints_update);
let future = self
.send_command_to_daemon(message)
.and_then(|_| rx.map_err(|_| Error::internal_error()));
@@ -512,7 +514,7 @@ impl<T: From<TunnelCommand> + 'static + Send> ManagementInterfaceApi for Managem
try_future!(self.check_auth(&meta));
let (tx, rx) = sync::oneshot::channel();
let future = self
- .send_command_to_daemon(TunnelCommand::GetRelaySettings(tx))
+ .send_command_to_daemon(ManagementCommand::GetRelaySettings(tx))
.and_then(|_| rx.map_err(|_| Error::internal_error()));
Box::new(future)
}
@@ -522,7 +524,7 @@ impl<T: From<TunnelCommand> + 'static + Send> ManagementInterfaceApi for Managem
try_future!(self.check_auth(&meta));
let (tx, rx) = sync::oneshot::channel();
let future = self
- .send_command_to_daemon(TunnelCommand::SetAllowLan(tx, allow_lan))
+ .send_command_to_daemon(ManagementCommand::SetAllowLan(tx, allow_lan))
.and_then(|_| rx.map_err(|_| Error::internal_error()));
Box::new(future)
}
@@ -532,7 +534,7 @@ impl<T: From<TunnelCommand> + 'static + Send> ManagementInterfaceApi for Managem
try_future!(self.check_auth(&meta));
let (tx, rx) = sync::oneshot::channel();
let future = self
- .send_command_to_daemon(TunnelCommand::GetAllowLan(tx))
+ .send_command_to_daemon(ManagementCommand::GetAllowLan(tx))
.and_then(|_| rx.map_err(|_| Error::internal_error()));
Box::new(future)
}
@@ -542,7 +544,7 @@ impl<T: From<TunnelCommand> + 'static + Send> ManagementInterfaceApi for Managem
try_future!(self.check_auth(&meta));
let (tx, rx) = sync::oneshot::channel();
let future = self
- .send_command_to_daemon(TunnelCommand::SetAutoConnect(tx, auto_connect))
+ .send_command_to_daemon(ManagementCommand::SetAutoConnect(tx, auto_connect))
.and_then(|_| rx.map_err(|_| Error::internal_error()));
Box::new(future)
}
@@ -552,7 +554,7 @@ impl<T: From<TunnelCommand> + 'static + Send> ManagementInterfaceApi for Managem
try_future!(self.check_auth(&meta));
let (tx, rx) = sync::oneshot::channel();
let future = self
- .send_command_to_daemon(TunnelCommand::GetAutoConnect(tx))
+ .send_command_to_daemon(ManagementCommand::GetAutoConnect(tx))
.and_then(|_| rx.map_err(|_| Error::internal_error()));
Box::new(future)
}
@@ -560,13 +562,13 @@ impl<T: From<TunnelCommand> + 'static + Send> ManagementInterfaceApi for Managem
fn connect(&self, meta: Self::Metadata) -> BoxFuture<(), Error> {
trace!("connect");
try_future!(self.check_auth(&meta));
- self.send_command_to_daemon(TunnelCommand::SetTargetState(TargetState::Secured))
+ self.send_command_to_daemon(ManagementCommand::SetTargetState(TargetState::Secured))
}
fn disconnect(&self, meta: Self::Metadata) -> BoxFuture<(), Error> {
trace!("disconnect");
try_future!(self.check_auth(&meta));
- self.send_command_to_daemon(TunnelCommand::SetTargetState(TargetState::Unsecured))
+ self.send_command_to_daemon(ManagementCommand::SetTargetState(TargetState::Unsecured))
}
fn get_state(&self, meta: Self::Metadata) -> BoxFuture<DaemonState, Error> {
@@ -574,7 +576,7 @@ impl<T: From<TunnelCommand> + 'static + Send> ManagementInterfaceApi for Managem
try_future!(self.check_auth(&meta));
let (state_tx, state_rx) = sync::oneshot::channel();
let future = self
- .send_command_to_daemon(TunnelCommand::GetState(state_tx))
+ .send_command_to_daemon(ManagementCommand::GetState(state_tx))
.and_then(|_| state_rx.map_err(|_| Error::internal_error()));
Box::new(future)
}
@@ -584,7 +586,7 @@ impl<T: From<TunnelCommand> + 'static + Send> ManagementInterfaceApi for Managem
try_future!(self.check_auth(&meta));
let (tx, rx) = sync::oneshot::channel();
let future = self
- .send_command_to_daemon(TunnelCommand::GetCurrentLocation(tx))
+ .send_command_to_daemon(ManagementCommand::GetCurrentLocation(tx))
.and_then(|_| rx.map_err(|_| Error::internal_error()));
Box::new(future)
}
@@ -592,7 +594,7 @@ impl<T: From<TunnelCommand> + 'static + Send> ManagementInterfaceApi for Managem
fn shutdown(&self, meta: Self::Metadata) -> BoxFuture<(), Error> {
trace!("shutdown");
try_future!(self.check_auth(&meta));
- self.send_command_to_daemon(TunnelCommand::Shutdown)
+ self.send_command_to_daemon(ManagementCommand::Shutdown)
}
fn get_account_history(&self, meta: Self::Metadata) -> BoxFuture<Vec<AccountToken>, Error> {
@@ -637,7 +639,7 @@ impl<T: From<TunnelCommand> + 'static + Send> ManagementInterfaceApi for Managem
try_future!(self.check_auth(&meta));
let (tx, rx) = sync::oneshot::channel();
let future = self
- .send_command_to_daemon(TunnelCommand::SetOpenVpnMssfix(tx, mssfix))
+ .send_command_to_daemon(ManagementCommand::SetOpenVpnMssfix(tx, mssfix))
.and_then(|_| rx.map_err(|_| Error::internal_error()));
Box::new(future)
@@ -652,7 +654,7 @@ impl<T: From<TunnelCommand> + 'static + Send> ManagementInterfaceApi for Managem
try_future!(self.check_auth(&meta));
let (tx, rx) = sync::oneshot::channel();
let future = self
- .send_command_to_daemon(TunnelCommand::SetOpenVpnEnableIpv6(tx, enable_ipv6))
+ .send_command_to_daemon(ManagementCommand::SetOpenVpnEnableIpv6(tx, enable_ipv6))
.and_then(|_| rx.map_err(|_| Error::internal_error()));
Box::new(future)
@@ -663,7 +665,7 @@ impl<T: From<TunnelCommand> + 'static + Send> ManagementInterfaceApi for Managem
try_future!(self.check_auth(&meta));
let (tx, rx) = sync::oneshot::channel();
let future = self
- .send_command_to_daemon(TunnelCommand::GetTunnelOptions(tx))
+ .send_command_to_daemon(ManagementCommand::GetTunnelOptions(tx))
.and_then(|_| rx.map_err(|_| Error::internal_error()));
Box::new(future)
}
@@ -672,7 +674,7 @@ impl<T: From<TunnelCommand> + 'static + Send> ManagementInterfaceApi for Managem
try_future!(self.check_auth(&meta));
let (tx, rx) = sync::oneshot::channel();
let future = self
- .send_command_to_daemon(TunnelCommand::GetCurrentVersion(tx))
+ .send_command_to_daemon(ManagementCommand::GetCurrentVersion(tx))
.and_then(|_| rx.map_err(|_| Error::internal_error()));
Box::new(future)
@@ -682,7 +684,7 @@ impl<T: From<TunnelCommand> + 'static + Send> ManagementInterfaceApi for Managem
try_future!(self.check_auth(&meta));
let (tx, rx) = sync::oneshot::channel();
let future = self
- .send_command_to_daemon(TunnelCommand::GetVersionInfo(tx))
+ .send_command_to_daemon(ManagementCommand::GetVersionInfo(tx))
.and_then(|_| rx.map_err(|_| Error::internal_error()))
.and_then(|version_future| {
version_future.map_err(|error| {
diff --git a/mullvad-daemon/src/tunnel_state_machine/connected_state.rs b/mullvad-daemon/src/tunnel_state_machine/connected_state.rs
new file mode 100644
index 0000000000..332608f055
--- /dev/null
+++ b/mullvad-daemon/src/tunnel_state_machine/connected_state.rs
@@ -0,0 +1,183 @@
+use futures::sync::{mpsc, oneshot};
+use futures::{Async, Future, Stream};
+
+use talpid_core::firewall::{Firewall, SecurityPolicy};
+use talpid_core::tunnel::{CloseHandle, TunnelEvent, TunnelMetadata};
+use talpid_types::net::TunnelEndpoint;
+
+use super::{
+ AfterDisconnect, ConnectingState, DisconnectingState, EventConsequence, Result, ResultExt,
+ SharedTunnelStateValues, StateEntryResult, TunnelCommand, TunnelParameters, TunnelState,
+ TunnelStateWrapper,
+};
+
+pub struct ConnectedStateBootstrap {
+ pub metadata: TunnelMetadata,
+ pub tunnel_events: mpsc::UnboundedReceiver<TunnelEvent>,
+ pub tunnel_endpoint: TunnelEndpoint,
+ pub tunnel_parameters: TunnelParameters,
+ pub tunnel_close_event: oneshot::Receiver<()>,
+ pub close_handle: CloseHandle,
+}
+
+/// The tunnel is up and working.
+pub struct ConnectedState {
+ metadata: TunnelMetadata,
+ tunnel_events: mpsc::UnboundedReceiver<TunnelEvent>,
+ tunnel_endpoint: TunnelEndpoint,
+ tunnel_parameters: TunnelParameters,
+ tunnel_close_event: oneshot::Receiver<()>,
+ close_handle: CloseHandle,
+}
+
+impl ConnectedState {
+ fn from(bootstrap: ConnectedStateBootstrap) -> Self {
+ ConnectedState {
+ metadata: bootstrap.metadata,
+ tunnel_events: bootstrap.tunnel_events,
+ tunnel_endpoint: bootstrap.tunnel_endpoint,
+ tunnel_parameters: bootstrap.tunnel_parameters,
+ tunnel_close_event: bootstrap.tunnel_close_event,
+ close_handle: bootstrap.close_handle,
+ }
+ }
+
+ fn set_security_policy(&self, shared_values: &mut SharedTunnelStateValues) -> Result<()> {
+ let policy = SecurityPolicy::Connected {
+ relay_endpoint: self.tunnel_endpoint.to_endpoint(),
+ tunnel: self.metadata.clone(),
+ allow_lan: self.tunnel_parameters.allow_lan,
+ };
+
+ debug!("Set security policy: {:?}", policy);
+ shared_values
+ .firewall
+ .apply_policy(policy)
+ .chain_err(|| "Failed to apply security policy for connected state")
+ }
+
+ fn handle_commands(
+ mut self,
+ commands: &mut mpsc::UnboundedReceiver<TunnelCommand>,
+ shared_values: &mut SharedTunnelStateValues,
+ ) -> EventConsequence<Self> {
+ use self::EventConsequence::*;
+
+ match try_handle_event!(self, commands.poll()) {
+ Ok(TunnelCommand::Connect(parameters)) => {
+ if parameters != self.tunnel_parameters {
+ NewState(DisconnectingState::enter(
+ shared_values,
+ (
+ self.close_handle,
+ self.tunnel_close_event,
+ AfterDisconnect::Reconnect(parameters),
+ ),
+ ))
+ } else {
+ SameState(self)
+ }
+ }
+ Ok(TunnelCommand::Disconnect) | Err(_) => NewState(DisconnectingState::enter(
+ shared_values,
+ (
+ self.close_handle,
+ self.tunnel_close_event,
+ AfterDisconnect::Nothing,
+ ),
+ )),
+ Ok(TunnelCommand::AllowLan(allow_lan)) => {
+ self.tunnel_parameters.allow_lan = allow_lan;
+
+ match self.set_security_policy(shared_values) {
+ Ok(()) => SameState(self),
+ Err(error) => {
+ error!("{}", error.chain_err(|| "Failed to update security policy"));
+ NewState(DisconnectingState::enter(
+ shared_values,
+ (
+ self.close_handle,
+ self.tunnel_close_event,
+ AfterDisconnect::Nothing,
+ ),
+ ))
+ }
+ }
+ }
+ }
+ }
+
+ fn handle_tunnel_events(
+ mut self,
+ shared_values: &mut SharedTunnelStateValues,
+ ) -> EventConsequence<Self> {
+ use self::EventConsequence::*;
+
+ match try_handle_event!(self, self.tunnel_events.poll()) {
+ Ok(TunnelEvent::Down) | Err(_) => NewState(DisconnectingState::enter(
+ shared_values,
+ (
+ self.close_handle,
+ self.tunnel_close_event,
+ AfterDisconnect::Reconnect(self.tunnel_parameters),
+ ),
+ )),
+ Ok(_) => SameState(self),
+ }
+ }
+
+ fn handle_tunnel_close_event(
+ mut self,
+ shared_values: &mut SharedTunnelStateValues,
+ ) -> EventConsequence<Self> {
+ use self::EventConsequence::*;
+
+ match self.tunnel_close_event.poll() {
+ Ok(Async::Ready(_)) => {}
+ Ok(Async::NotReady) => return NoEvents(self),
+ Err(_cancelled) => warn!("Tunnel monitor thread has stopped unexpectedly"),
+ }
+
+ info!("Tunnel closed. Reconnecting.");
+ NewState(ConnectingState::enter(
+ shared_values,
+ self.tunnel_parameters,
+ ))
+ }
+}
+
+impl TunnelState for ConnectedState {
+ type Bootstrap = ConnectedStateBootstrap;
+
+ fn enter(
+ shared_values: &mut SharedTunnelStateValues,
+ bootstrap: Self::Bootstrap,
+ ) -> StateEntryResult {
+ let connected_state = ConnectedState::from(bootstrap);
+
+ match connected_state.set_security_policy(shared_values) {
+ Ok(()) => Ok(TunnelStateWrapper::from(connected_state)),
+ Err(error) => Err((
+ error,
+ DisconnectingState::enter(
+ shared_values,
+ (
+ connected_state.close_handle,
+ connected_state.tunnel_close_event,
+ AfterDisconnect::Nothing,
+ ),
+ ).expect("Failed to disconnect after failed transition to connected state"),
+ )),
+ }
+ }
+
+ fn handle_event(
+ self,
+ commands: &mut mpsc::UnboundedReceiver<TunnelCommand>,
+ shared_values: &mut SharedTunnelStateValues,
+ ) -> EventConsequence<Self> {
+ self.handle_commands(commands, shared_values)
+ .or_else(Self::handle_tunnel_events, shared_values)
+ .or_else(Self::handle_tunnel_close_event, shared_values)
+ }
+}
diff --git a/mullvad-daemon/src/tunnel_state_machine/connecting_state.rs b/mullvad-daemon/src/tunnel_state_machine/connecting_state.rs
new file mode 100644
index 0000000000..a564982527
--- /dev/null
+++ b/mullvad-daemon/src/tunnel_state_machine/connecting_state.rs
@@ -0,0 +1,294 @@
+use std::ffi::OsString;
+use std::path::PathBuf;
+use std::sync::Mutex;
+use std::thread;
+use std::time::{Duration, Instant};
+
+use futures::sink::Wait;
+use futures::sync::{mpsc, oneshot};
+use futures::{Async, Future, Sink, Stream};
+
+use talpid_core::firewall::{Firewall, SecurityPolicy};
+use talpid_core::tunnel::{CloseHandle, TunnelEvent, TunnelMetadata, TunnelMonitor};
+use talpid_types::net::{TunnelEndpoint, TunnelEndpointData};
+
+use super::{
+ AfterDisconnect, ConnectedState, ConnectedStateBootstrap, DisconnectedState,
+ DisconnectingState, EventConsequence, Result, ResultExt, SharedTunnelStateValues,
+ StateEntryResult, TunnelCommand, TunnelParameters, TunnelState, TunnelStateWrapper,
+};
+use logging;
+
+const MIN_TUNNEL_ALIVE_TIME: Duration = Duration::from_millis(1000);
+
+const OPENVPN_LOG_FILENAME: &str = "openvpn.log";
+const WIREGUARD_LOG_FILENAME: &str = "wireguard.log";
+
+#[cfg(windows)]
+const TUNNEL_INTERFACE_ALIAS: Option<&str> = Some("Mullvad");
+#[cfg(not(windows))]
+const TUNNEL_INTERFACE_ALIAS: Option<&str> = None;
+
+/// The tunnel has been started, but it is not established/functional.
+pub struct ConnectingState {
+ tunnel_events: mpsc::UnboundedReceiver<TunnelEvent>,
+ tunnel_endpoint: TunnelEndpoint,
+ tunnel_parameters: TunnelParameters,
+ tunnel_close_event: oneshot::Receiver<()>,
+ close_handle: CloseHandle,
+}
+
+impl ConnectingState {
+ fn new(
+ shared_values: &mut SharedTunnelStateValues,
+ parameters: TunnelParameters,
+ ) -> Result<Self> {
+ Self::set_security_policy(shared_values, parameters.endpoint, parameters.allow_lan)?;
+
+ let tunnel_endpoint = parameters.endpoint;
+ let (tunnel_events, tunnel_close_event, close_handle) = Self::start_tunnel(&parameters)?;
+
+ Ok(ConnectingState {
+ tunnel_events,
+ tunnel_endpoint,
+ tunnel_parameters: parameters,
+ tunnel_close_event,
+ close_handle,
+ })
+ }
+
+ fn set_security_policy(
+ shared_values: &mut SharedTunnelStateValues,
+ endpoint: TunnelEndpoint,
+ allow_lan: bool,
+ ) -> Result<()> {
+ let policy = SecurityPolicy::Connecting {
+ relay_endpoint: endpoint.to_endpoint(),
+ allow_lan,
+ };
+
+ debug!("Set security policy: {:?}", policy);
+ shared_values
+ .firewall
+ .apply_policy(policy)
+ .chain_err(|| "Failed to apply security policy for connecting state")
+ }
+
+ fn start_tunnel(
+ parameters: &TunnelParameters,
+ ) -> Result<(
+ mpsc::UnboundedReceiver<TunnelEvent>,
+ oneshot::Receiver<()>,
+ CloseHandle,
+ )> {
+ let (event_tx, event_rx) = mpsc::unbounded();
+ let monitor = Self::spawn_tunnel_monitor(&parameters, event_tx.wait())?;
+ let close_handle = monitor.close_handle();
+ let tunnel_close_event = Self::spawn_tunnel_monitor_wait_thread(monitor);
+
+ Ok((event_rx, tunnel_close_event, close_handle))
+ }
+
+ fn spawn_tunnel_monitor(
+ parameters: &TunnelParameters,
+ events: Wait<mpsc::UnboundedSender<TunnelEvent>>,
+ ) -> Result<TunnelMonitor> {
+ let event_tx = Mutex::new(events);
+ let on_tunnel_event = move |event| {
+ let send_result = event_tx
+ .lock()
+ .expect("A thread panicked while sending a tunnel event")
+ .send(event);
+
+ if send_result.is_err() {
+ warn!("Tunnel state machine stopped before tunnel event was received");
+ }
+ };
+ let log_file = Self::prepare_tunnel_log_file(&parameters)?;
+
+ TunnelMonitor::new(
+ parameters.endpoint,
+ &parameters.options,
+ TUNNEL_INTERFACE_ALIAS.to_owned().map(OsString::from),
+ &parameters.account_token,
+ log_file.as_ref().map(PathBuf::as_path),
+ &parameters.resource_dir,
+ on_tunnel_event,
+ ).chain_err(|| "Unable to start tunnel monitor")
+ }
+
+ fn prepare_tunnel_log_file(parameters: &TunnelParameters) -> Result<Option<PathBuf>> {
+ if let Some(ref log_dir) = parameters.log_dir {
+ let filename = match parameters.endpoint.tunnel {
+ TunnelEndpointData::OpenVpn(_) => OPENVPN_LOG_FILENAME,
+ TunnelEndpointData::Wireguard(_) => WIREGUARD_LOG_FILENAME,
+ };
+ let tunnel_log = log_dir.join(filename);
+ logging::rotate_log(&tunnel_log).chain_err(|| "Unable to rotate tunnel log")?;
+ Ok(Some(tunnel_log))
+ } else {
+ Ok(None)
+ }
+ }
+
+ fn spawn_tunnel_monitor_wait_thread(tunnel_monitor: TunnelMonitor) -> oneshot::Receiver<()> {
+ let (tunnel_close_event_tx, tunnel_close_event_rx) = oneshot::channel();
+
+ thread::spawn(move || {
+ let start = Instant::now();
+
+ match tunnel_monitor.wait() {
+ Ok(_) => debug!("Tunnel has finished without errors"),
+ Err(error) => {
+ let chained_error = error.chain_err(|| "Tunnel has stopped unexpectedly");
+ warn!("{}", chained_error);
+ }
+ }
+
+ if let Some(remaining_time) = MIN_TUNNEL_ALIVE_TIME.checked_sub(start.elapsed()) {
+ thread::sleep(remaining_time);
+ }
+
+ if tunnel_close_event_tx.send(()).is_err() {
+ warn!("Tunnel state machine stopped before receiving tunnel closed event");
+ }
+
+ trace!("Tunnel monitor thread exit");
+ });
+
+ tunnel_close_event_rx
+ }
+
+ fn into_connected_state_bootstrap(self, metadata: TunnelMetadata) -> ConnectedStateBootstrap {
+ ConnectedStateBootstrap {
+ metadata,
+ tunnel_events: self.tunnel_events,
+ tunnel_endpoint: self.tunnel_endpoint,
+ tunnel_parameters: self.tunnel_parameters,
+ tunnel_close_event: self.tunnel_close_event,
+ close_handle: self.close_handle,
+ }
+ }
+
+ fn handle_commands(
+ mut self,
+ commands: &mut mpsc::UnboundedReceiver<TunnelCommand>,
+ shared_values: &mut SharedTunnelStateValues,
+ ) -> EventConsequence<Self> {
+ use self::EventConsequence::*;
+
+ match try_handle_event!(self, commands.poll()) {
+ Ok(TunnelCommand::Connect(parameters)) => {
+ if parameters != self.tunnel_parameters {
+ NewState(DisconnectingState::enter(
+ shared_values,
+ (
+ self.close_handle,
+ self.tunnel_close_event,
+ AfterDisconnect::Reconnect(parameters),
+ ),
+ ))
+ } else {
+ SameState(self)
+ }
+ }
+ Ok(TunnelCommand::Disconnect) | Err(_) => NewState(DisconnectingState::enter(
+ shared_values,
+ (
+ self.close_handle,
+ self.tunnel_close_event,
+ AfterDisconnect::Nothing,
+ ),
+ )),
+ Ok(TunnelCommand::AllowLan(allow_lan)) => {
+ self.tunnel_parameters.allow_lan = allow_lan;
+ match Self::set_security_policy(shared_values, self.tunnel_endpoint, allow_lan) {
+ Ok(()) => SameState(self),
+ Err(error) => {
+ error!("{}", error.chain_err(|| "Failed to update security policy"));
+ NewState(DisconnectingState::enter(
+ shared_values,
+ (
+ self.close_handle,
+ self.tunnel_close_event,
+ AfterDisconnect::Nothing,
+ ),
+ ))
+ }
+ }
+ }
+ }
+ }
+
+ fn handle_tunnel_events(
+ mut self,
+ shared_values: &mut SharedTunnelStateValues,
+ ) -> EventConsequence<Self> {
+ use self::EventConsequence::*;
+
+ match try_handle_event!(self, self.tunnel_events.poll()) {
+ Ok(TunnelEvent::Up(metadata)) => NewState(ConnectedState::enter(
+ shared_values,
+ self.into_connected_state_bootstrap(metadata),
+ )),
+ Ok(_) => SameState(self),
+ Err(_) => NewState(DisconnectingState::enter(
+ shared_values,
+ (
+ self.close_handle,
+ self.tunnel_close_event,
+ AfterDisconnect::Reconnect(self.tunnel_parameters),
+ ),
+ )),
+ }
+ }
+
+ fn handle_tunnel_close_event(
+ mut self,
+ shared_values: &mut SharedTunnelStateValues,
+ ) -> EventConsequence<Self> {
+ use self::EventConsequence::*;
+
+ match self.tunnel_close_event.poll() {
+ Ok(Async::Ready(_)) => {}
+ Ok(Async::NotReady) => return NoEvents(self),
+ Err(_cancelled) => warn!("Tunnel monitor thread has stopped unexpectedly"),
+ }
+
+ info!("Tunnel closed. Reconnecting.");
+ NewState(ConnectingState::enter(
+ shared_values,
+ self.tunnel_parameters,
+ ))
+ }
+}
+
+impl TunnelState for ConnectingState {
+ type Bootstrap = TunnelParameters;
+
+ fn enter(
+ shared_values: &mut SharedTunnelStateValues,
+ parameters: Self::Bootstrap,
+ ) -> StateEntryResult {
+ Self::new(shared_values, parameters)
+ .map(TunnelStateWrapper::from)
+ .chain_err(|| "Failed to start tunnel")
+ .map_err(|error| {
+ (
+ error,
+ DisconnectedState::enter(shared_values, ())
+ .expect("Failed to transition to fallback disconnected state"),
+ )
+ })
+ }
+
+ fn handle_event(
+ self,
+ commands: &mut mpsc::UnboundedReceiver<TunnelCommand>,
+ shared_values: &mut SharedTunnelStateValues,
+ ) -> EventConsequence<Self> {
+ self.handle_commands(commands, shared_values)
+ .or_else(Self::handle_tunnel_events, shared_values)
+ .or_else(Self::handle_tunnel_close_event, shared_values)
+ }
+}
diff --git a/mullvad-daemon/src/tunnel_state_machine/disconnected_state.rs b/mullvad-daemon/src/tunnel_state_machine/disconnected_state.rs
new file mode 100644
index 0000000000..20abcc7ec2
--- /dev/null
+++ b/mullvad-daemon/src/tunnel_state_machine/disconnected_state.rs
@@ -0,0 +1,49 @@
+use error_chain::ChainedError;
+use futures::sync::mpsc;
+use futures::Stream;
+
+use talpid_core::firewall::Firewall;
+
+use super::{
+ ConnectingState, Error, EventConsequence, SharedTunnelStateValues, StateEntryResult,
+ TunnelCommand, TunnelState, TunnelStateWrapper,
+};
+
+/// No tunnel is running.
+pub struct DisconnectedState;
+
+impl DisconnectedState {
+ fn reset_security_policy(shared_values: &mut SharedTunnelStateValues) {
+ debug!("Reset security policy");
+ if let Err(error) = shared_values.firewall.reset_policy() {
+ let chained_error = Error::with_chain(error, "Failed to reset security policy");
+ error!("{}", chained_error.display_chain());
+ }
+ }
+}
+
+impl TunnelState for DisconnectedState {
+ type Bootstrap = ();
+
+ fn enter(shared_values: &mut SharedTunnelStateValues, _: Self::Bootstrap) -> StateEntryResult {
+ Self::reset_security_policy(shared_values);
+
+ Ok(TunnelStateWrapper::from(DisconnectedState))
+ }
+
+ fn handle_event(
+ self,
+ commands: &mut mpsc::UnboundedReceiver<TunnelCommand>,
+ shared_values: &mut SharedTunnelStateValues,
+ ) -> EventConsequence<Self> {
+ use self::EventConsequence::*;
+
+ match try_handle_event!(self, commands.poll()) {
+ Ok(TunnelCommand::Connect(parameters)) => {
+ NewState(ConnectingState::enter(shared_values, parameters))
+ }
+ Ok(_) => SameState(self),
+ Err(_) => Finished,
+ }
+ }
+}
diff --git a/mullvad-daemon/src/tunnel_state_machine/disconnecting_state.rs b/mullvad-daemon/src/tunnel_state_machine/disconnecting_state.rs
new file mode 100644
index 0000000000..9785264da1
--- /dev/null
+++ b/mullvad-daemon/src/tunnel_state_machine/disconnecting_state.rs
@@ -0,0 +1,104 @@
+use error_chain::ChainedError;
+use futures::sync::{mpsc, oneshot};
+use futures::{Async, Future, Stream};
+
+use talpid_core::tunnel::CloseHandle;
+
+use super::{
+ ConnectingState, DisconnectedState, EventConsequence, ResultExt, SharedTunnelStateValues,
+ StateEntryResult, TunnelCommand, TunnelParameters, TunnelState, TunnelStateWrapper,
+};
+
+/// This state is active from when we manually trigger a tunnel kill until the tunnel wait
+/// operation (TunnelExit) returned.
+pub struct DisconnectingState {
+ exited: oneshot::Receiver<()>,
+ after_disconnect: AfterDisconnect,
+}
+
+impl DisconnectingState {
+ fn handle_commands(
+ mut self,
+ commands: &mut mpsc::UnboundedReceiver<TunnelCommand>,
+ ) -> EventConsequence<Self> {
+ use self::AfterDisconnect::*;
+
+ let event = try_handle_event!(self, commands.poll());
+ let after_disconnect = self.after_disconnect;
+
+ self.after_disconnect = match after_disconnect {
+ AfterDisconnect::Nothing => match event {
+ Ok(TunnelCommand::Connect(parameters)) => Reconnect(parameters),
+ _ => Nothing,
+ },
+ AfterDisconnect::Reconnect(mut tunnel_parameters) => match event {
+ Ok(TunnelCommand::Connect(parameters)) => Reconnect(parameters),
+ Ok(TunnelCommand::AllowLan(allow_lan)) => {
+ tunnel_parameters.allow_lan = allow_lan;
+ Reconnect(tunnel_parameters)
+ }
+ Ok(TunnelCommand::Disconnect) | Err(_) => Nothing,
+ },
+ };
+
+ EventConsequence::SameState(self)
+ }
+
+ fn handle_exit_event(
+ mut self,
+ shared_values: &mut SharedTunnelStateValues,
+ ) -> EventConsequence<Self> {
+ use self::EventConsequence::*;
+
+ match self.exited.poll() {
+ Ok(Async::NotReady) => NoEvents(self),
+ Ok(Async::Ready(_)) | Err(_) => NewState(self.after_disconnect(shared_values)),
+ }
+ }
+
+ fn after_disconnect(self, shared_values: &mut SharedTunnelStateValues) -> StateEntryResult {
+ match self.after_disconnect {
+ AfterDisconnect::Nothing => DisconnectedState::enter(shared_values, ()),
+ AfterDisconnect::Reconnect(tunnel_parameters) => {
+ ConnectingState::enter(shared_values, tunnel_parameters)
+ }
+ }
+ }
+}
+
+impl TunnelState for DisconnectingState {
+ type Bootstrap = (CloseHandle, oneshot::Receiver<()>, AfterDisconnect);
+
+ fn enter(
+ _: &mut SharedTunnelStateValues,
+ (close_handle, exited, after_disconnect): Self::Bootstrap,
+ ) -> StateEntryResult {
+ let close_result = close_handle
+ .close()
+ .chain_err(|| "Failed to request tunnel monitor to close the tunnel");
+
+ if let Err(error) = close_result {
+ error!("{}", error.display_chain());
+ }
+
+ Ok(TunnelStateWrapper::from(DisconnectingState {
+ exited,
+ after_disconnect,
+ }))
+ }
+
+ fn handle_event(
+ self,
+ commands: &mut mpsc::UnboundedReceiver<TunnelCommand>,
+ shared_values: &mut SharedTunnelStateValues,
+ ) -> EventConsequence<Self> {
+ self.handle_commands(commands)
+ .or_else(Self::handle_exit_event, shared_values)
+ }
+}
+
+/// Which state should be transitioned to after disconnection is complete.
+pub enum AfterDisconnect {
+ Nothing,
+ Reconnect(TunnelParameters),
+}
diff --git a/mullvad-daemon/src/tunnel_state_machine/macros.rs b/mullvad-daemon/src/tunnel_state_machine/macros.rs
new file mode 100644
index 0000000000..2241c8cb06
--- /dev/null
+++ b/mullvad-daemon/src/tunnel_state_machine/macros.rs
@@ -0,0 +1,21 @@
+/// Try to receive an event from a `Stream`'s asynchronous poll expression.
+///
+/// This macro is similar to the `try_ready!` macro provided in `futures`. If there is an event
+/// ready, it will be returned wrapped in a `Result`. If there are no events ready to be received,
+/// the outer function will return with a transition that indicates that no events were received,
+/// which is analogous to `Async::NotReady`.
+///
+/// When the asynchronous event indicates that the stream has finished or that it has failed, an
+/// error type is returned so that either close scenario can be handled in a similar way.
+macro_rules! try_handle_event {
+ ($same_state:expr, $event:expr) => {
+ match $event {
+ Ok(::futures::Async::Ready(Some(event))) => Ok(event),
+ Ok(::futures::Async::Ready(None)) => Err(None),
+ Ok(::futures::Async::NotReady) => {
+ return ::tunnel_state_machine::EventConsequence::NoEvents($same_state);
+ }
+ Err(error) => Err(Some(error)),
+ }
+ };
+}
diff --git a/mullvad-daemon/src/tunnel_state_machine/mod.rs b/mullvad-daemon/src/tunnel_state_machine/mod.rs
new file mode 100644
index 0000000000..1e84e41d67
--- /dev/null
+++ b/mullvad-daemon/src/tunnel_state_machine/mod.rs
@@ -0,0 +1,389 @@
+#[macro_use]
+mod macros;
+
+mod connected_state;
+mod connecting_state;
+mod disconnected_state;
+mod disconnecting_state;
+
+use std::fmt::{Debug, Formatter, Result as FmtResult};
+use std::path::{Path, PathBuf};
+use std::sync::mpsc as sync_mpsc;
+use std::thread;
+
+use error_chain::ChainedError;
+use futures::sync::mpsc;
+use futures::{Async, Future, Poll, Stream};
+use tokio_core::reactor::Core;
+
+use mullvad_types::account::AccountToken;
+use talpid_core::firewall::{Firewall, FirewallProxy};
+use talpid_core::mpsc::IntoSender;
+use talpid_types::net::{TunnelEndpoint, TunnelOptions};
+
+use self::connected_state::{ConnectedState, ConnectedStateBootstrap};
+use self::connecting_state::ConnectingState;
+use self::disconnected_state::DisconnectedState;
+use self::disconnecting_state::{AfterDisconnect, DisconnectingState};
+
+error_chain! {
+ errors {
+ FirewallError {
+ description("Firewall error")
+ }
+ ReactorError {
+ description("Failed to initialize tunnel state machine event loop executor")
+ }
+ }
+}
+
+/// Spawn the tunnel state machine thread, returning a channel for sending tunnel commands.
+pub fn spawn<P, T>(
+ cache_dir: P,
+ state_change_listener: IntoSender<TunnelStateTransition, T>,
+) -> Result<mpsc::UnboundedSender<TunnelCommand>>
+where
+ P: AsRef<Path> + Send + 'static,
+ T: From<TunnelStateTransition> + Send + 'static,
+{
+ let (command_tx, command_rx) = mpsc::unbounded();
+ let (startup_result_tx, startup_result_rx) = sync_mpsc::channel();
+
+ thread::spawn(
+ move || match create_event_loop(cache_dir, command_rx, state_change_listener) {
+ Ok((mut reactor, event_loop)) => {
+ startup_result_tx.send(Ok(())).expect(
+ "Tunnel state machine won't be started because the owner thread crashed",
+ );
+
+ if let Err(error) = reactor.run(event_loop) {
+ let chained_error =
+ Error::with_chain(error, "Tunnel state machine exited with an error");
+ error!("{}", chained_error.display_chain());
+ }
+ }
+ Err(startup_error) => {
+ startup_result_tx
+ .send(Err(startup_error))
+ .expect("Failed to send startup error");
+ }
+ },
+ );
+
+ startup_result_rx
+ .recv()
+ .expect("Failed to start tunnel state machine thread")
+ .map(|_| command_tx)
+}
+
+fn create_event_loop<P, T>(
+ cache_dir: P,
+ commands: mpsc::UnboundedReceiver<TunnelCommand>,
+ state_change_listener: IntoSender<TunnelStateTransition, T>,
+) -> Result<(Core, impl Future<Item = (), Error = Error>)>
+where
+ P: AsRef<Path>,
+ T: From<TunnelStateTransition> + Send + 'static,
+{
+ let reactor = Core::new().chain_err(|| ErrorKind::ReactorError)?;
+ let state_machine = TunnelStateMachine::new(&cache_dir, commands)?;
+
+ let future = state_machine.for_each(move |state_change_event| {
+ state_change_listener
+ .send(state_change_event)
+ .chain_err(|| "Failed to send state change event to listener")
+ });
+
+ Ok((reactor, future))
+}
+
+/// Representation of external commands for the tunnel state machine.
+pub enum TunnelCommand {
+ /// Enable or disable LAN access in the firewall.
+ AllowLan(bool),
+ /// Open tunnel connection.
+ Connect(TunnelParameters),
+ /// Close tunnel connection.
+ Disconnect,
+}
+
+/// Information necessary to open a tunnel.
+#[derive(Debug, PartialEq)]
+pub struct TunnelParameters {
+ pub endpoint: TunnelEndpoint,
+ pub options: TunnelOptions,
+ pub log_dir: Option<PathBuf>,
+ pub resource_dir: PathBuf,
+ pub account_token: AccountToken,
+ pub allow_lan: bool,
+}
+
+/// Event resulting from a transition to a new tunnel state.
+#[derive(Clone, Copy, Debug, PartialEq)]
+pub enum TunnelStateTransition {
+ Disconnected,
+ Connecting,
+ Connected,
+ Disconnecting,
+}
+
+/// Asynchronous handling of the tunnel state machine.
+///
+/// This type implements `Stream`, and attempts to advance the state machine based on the events
+/// received on the commands stream and possibly on events that specific states are also listening
+/// to. Every time it successfully advances the state machine a `TunnelStateTransition` is emitted
+/// by the stream.
+struct TunnelStateMachine {
+ current_state: Option<TunnelStateWrapper>,
+ commands: mpsc::UnboundedReceiver<TunnelCommand>,
+ shared_values: SharedTunnelStateValues,
+}
+
+impl TunnelStateMachine {
+ fn new<P: AsRef<Path>>(
+ cache_dir: P,
+ commands: mpsc::UnboundedReceiver<TunnelCommand>,
+ ) -> Result<Self> {
+ let firewall = FirewallProxy::new(cache_dir).chain_err(|| ErrorKind::FirewallError)?;
+ let mut shared_values = SharedTunnelStateValues { firewall };
+
+ let initial_state = TunnelStateWrapper::enter(&mut shared_values, ())
+ .expect("Failed to create initial tunnel state");
+
+ Ok(TunnelStateMachine {
+ current_state: Some(initial_state),
+ commands,
+ shared_values,
+ })
+ }
+}
+
+impl Stream for TunnelStateMachine {
+ type Item = TunnelStateTransition;
+ type Error = Error;
+
+ fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
+ let mut state = match self.current_state.take() {
+ Some(state) => state,
+ None => {
+ // State machine has halted
+ return Ok(Async::Ready(None));
+ }
+ };
+
+ loop {
+ let event_consequence = state.handle_event(&mut self.commands, &mut self.shared_values);
+ let action = TunnelStateMachineAction::from(event_consequence);
+
+ match action {
+ TunnelStateMachineAction::Repeat(returned_state) => {
+ state = returned_state;
+ }
+ TunnelStateMachineAction::Notify(state, result) => {
+ self.current_state = state;
+ return result;
+ }
+ }
+ }
+ }
+}
+
+/// Action the state machine should take, which is discovered base on an event consequence.
+///
+/// The action can be to execute another iteration or to notify that something happened. Executing
+/// another iteration happens when an event is received and ignored, which causes the tunnel state
+/// machine to stay in the same state. The state machine can notify its caller that a state
+/// transition has occurred, that it has finished, or that it has paused to wait for new events.
+enum TunnelStateMachineAction {
+ Repeat(TunnelStateWrapper),
+ Notify(
+ Option<TunnelStateWrapper>,
+ Poll<Option<TunnelStateTransition>, Error>,
+ ),
+}
+
+impl From<EventConsequence<TunnelStateWrapper>> for TunnelStateMachineAction {
+ fn from(event_consequence: EventConsequence<TunnelStateWrapper>) -> Self {
+ use self::EventConsequence::*;
+ use self::TunnelStateMachineAction::*;
+
+ match event_consequence {
+ NewState(Ok(state)) | NewState(Err((_, state))) => {
+ let transition = state.info();
+
+ Notify(Some(state), Ok(Async::Ready(Some(transition))))
+ }
+ SameState(state) => Repeat(state),
+ NoEvents(state) => Notify(Some(state), Ok(Async::NotReady)),
+ Finished => Notify(None, Ok(Async::Ready(None))),
+ }
+ }
+}
+
+/// Values that are common to all tunnel states.
+struct SharedTunnelStateValues {
+ firewall: FirewallProxy,
+}
+
+/// Asynchronous result of an attempt to progress a state.
+enum EventConsequence<T: TunnelState> {
+ /// Transition to a new state.
+ NewState(StateEntryResult),
+ /// An event was received, but it was ignored by the state so no transition is performed.
+ SameState(T),
+ /// No events were received, the event loop should block until one becomes available.
+ NoEvents(T),
+ /// The state machine has finished its execution.
+ Finished,
+}
+
+impl<T> EventConsequence<T>
+where
+ T: TunnelState,
+{
+ /// Helper method to chain handling multiple different event types.
+ ///
+ /// The `handle_event` is only called if no events were handled so far.
+ pub fn or_else<F>(self, handle_event: F, shared_values: &mut SharedTunnelStateValues) -> Self
+ where
+ F: FnOnce(T, &mut SharedTunnelStateValues) -> Self,
+ {
+ use self::EventConsequence::*;
+
+ match self {
+ NoEvents(state) => handle_event(state, shared_values),
+ consequence => consequence,
+ }
+ }
+}
+
+/// Result of entering a `T: TunnelState`.
+///
+/// It is either the state itself when successful, or an error paired with a fallback state.
+type StateEntryResult = ::std::result::Result<TunnelStateWrapper, (Error, TunnelStateWrapper)>;
+
+/// Trait that contains the method all states should implement to handle an event and advance the
+/// state machine.
+trait TunnelState: Into<TunnelStateWrapper> + Sized {
+ /// Type representing extra information required for entering the state.
+ type Bootstrap;
+
+ /// Constructor function.
+ ///
+ /// This is the state entry point. It attempts to enter the state, and may fail by entering an
+ /// error or fallback state instead.
+ fn enter(
+ shared_values: &mut SharedTunnelStateValues,
+ bootstrap: Self::Bootstrap,
+ ) -> StateEntryResult;
+
+ /// Main state function.
+ ///
+ /// This is state exit point. It consumes itself and returns the next state to advance to when
+ /// it has completed, or itself if it wants to ignore a received event or if no events were
+ /// ready to be received. See [`EventConsequence`] for more details.
+ ///
+ /// An implementation can handle events from many sources, but it should also handle command
+ /// events received through the provided `commands` stream.
+ ///
+ /// [`EventConsequence`]: enum.EventConsequence.html
+ fn handle_event(
+ self,
+ commands: &mut mpsc::UnboundedReceiver<TunnelCommand>,
+ shared_values: &mut SharedTunnelStateValues,
+ ) -> EventConsequence<Self>;
+}
+
+/// Valid states of the tunnel.
+///
+/// All implementations must implement `TunnelState` so that they can handle events and
+/// commands in order to advance the state machine.
+enum TunnelStateWrapper {
+ Disconnected(DisconnectedState),
+ Connecting(ConnectingState),
+ Connected(ConnectedState),
+ Disconnecting(DisconnectingState),
+}
+
+impl TunnelStateWrapper {
+ /// Returns information describing the state.
+ fn info(&self) -> TunnelStateTransition {
+ match *self {
+ TunnelStateWrapper::Disconnected(_) => TunnelStateTransition::Disconnected,
+ TunnelStateWrapper::Connecting(_) => TunnelStateTransition::Connecting,
+ TunnelStateWrapper::Connected(_) => TunnelStateTransition::Connected,
+ TunnelStateWrapper::Disconnecting(_) => TunnelStateTransition::Disconnecting,
+ }
+ }
+}
+
+macro_rules! impl_from_for_tunnel_state {
+ ($state_variant:ident($state_type:ident)) => {
+ impl From<$state_type> for TunnelStateWrapper {
+ fn from(state: $state_type) -> Self {
+ TunnelStateWrapper::$state_variant(state)
+ }
+ }
+ };
+}
+
+impl_from_for_tunnel_state!(Disconnected(DisconnectedState));
+impl_from_for_tunnel_state!(Connecting(ConnectingState));
+impl_from_for_tunnel_state!(Connected(ConnectedState));
+impl_from_for_tunnel_state!(Disconnecting(DisconnectingState));
+
+impl TunnelState for TunnelStateWrapper {
+ type Bootstrap = <DisconnectedState as TunnelState>::Bootstrap;
+
+ fn enter(
+ shared_values: &mut SharedTunnelStateValues,
+ bootstrap: Self::Bootstrap,
+ ) -> StateEntryResult {
+ DisconnectedState::enter(shared_values, bootstrap)
+ }
+
+ fn handle_event(
+ self,
+ commands: &mut mpsc::UnboundedReceiver<TunnelCommand>,
+ shared_values: &mut SharedTunnelStateValues,
+ ) -> EventConsequence<TunnelStateWrapper> {
+ use self::EventConsequence::*;
+
+ macro_rules! handle_event {
+ ( $($state:ident),* $(,)* ) => {
+ match self {
+ $(
+ TunnelStateWrapper::$state(state) => {
+ match state.handle_event(commands, shared_values) {
+ NewState(tunnel_state) => NewState(tunnel_state),
+ SameState(state) => SameState(TunnelStateWrapper::$state(state)),
+ NoEvents(state) => NoEvents(TunnelStateWrapper::$state(state)),
+ Finished => Finished,
+ }
+ }
+ )*
+ }
+ }
+ }
+
+ handle_event! {
+ Disconnected,
+ Connecting,
+ Connected,
+ Disconnecting,
+ }
+ }
+}
+
+impl Debug for TunnelStateWrapper {
+ fn fmt(&self, formatter: &mut Formatter) -> FmtResult {
+ use self::TunnelStateWrapper::*;
+
+ match *self {
+ Disconnected(_) => write!(formatter, "TunnelStateWrapper::Disconnected(_)"),
+ Connecting(_) => write!(formatter, "TunnelStateWrapper::Connecting(_)"),
+ Connected(_) => write!(formatter, "TunnelStateWrapper::Connected(_)"),
+ Disconnecting(_) => write!(formatter, "TunnelStateWrapper::Disconnecting(_)"),
+ }
+ }
+}
diff --git a/talpid-core/src/tunnel/mod.rs b/talpid-core/src/tunnel/mod.rs
index 932279c0a0..98d2d0e2d0 100644
--- a/talpid-core/src/tunnel/mod.rs
+++ b/talpid-core/src/tunnel/mod.rs
@@ -280,7 +280,7 @@ impl TunnelMonitor {
CloseHandle(self.monitor.close_handle())
}
- /// Consumes the monitor and block until the tunnel exits or there is an error.
+ /// Consumes the monitor and blocks until the tunnel exits or there is an error.
pub fn wait(self) -> Result<()> {
self.monitor
.wait()