summaryrefslogtreecommitdiffhomepage
path: root/talpid-core/src
diff options
context:
space:
mode:
authorJanito Vaqueiro Ferreira Filho <janito@mullvad.net>2018-08-27 13:24:49 -0300
committerJanito Vaqueiro Ferreira Filho <janito@mullvad.net>2018-08-29 10:35:37 -0300
commit0b6a3e38032248d7d33c3ec84b643ba609d7f96f (patch)
tree92619604cdd321ffcc5730b278f524e3eb5ce6f1 /talpid-core/src
parent4de28f0913f11f1b4c29e4e0d4cc85a664c1c97b (diff)
downloadmullvadvpn-0b6a3e38032248d7d33c3ec84b643ba609d7f96f.tar.xz
mullvadvpn-0b6a3e38032248d7d33c3ec84b643ba609d7f96f.zip
Move tunnel state machine to Talpid
Diffstat (limited to 'talpid-core/src')
-rw-r--r--talpid-core/src/lib.rs5
-rw-r--r--talpid-core/src/tunnel_state_machine/connected_state.rs183
-rw-r--r--talpid-core/src/tunnel_state_machine/connecting_state.rs294
-rw-r--r--talpid-core/src/tunnel_state_machine/disconnected_state.rs48
-rw-r--r--talpid-core/src/tunnel_state_machine/disconnecting_state.rs103
-rw-r--r--talpid-core/src/tunnel_state_machine/macros.rs21
-rw-r--r--talpid-core/src/tunnel_state_machine/mod.rs381
7 files changed, 1035 insertions, 0 deletions
diff --git a/talpid-core/src/lib.rs b/talpid-core/src/lib.rs
index b96764848f..55df5e135f 100644
--- a/talpid-core/src/lib.rs
+++ b/talpid-core/src/lib.rs
@@ -19,6 +19,7 @@ extern crate log;
extern crate error_chain;
#[cfg(target_os = "linux")]
extern crate failure;
+extern crate futures;
#[cfg(unix)]
extern crate ipnetwork;
extern crate jsonrpc_core;
@@ -29,6 +30,7 @@ extern crate jsonrpc_macros;
extern crate lazy_static;
extern crate libc;
extern crate shell_escape;
+extern crate tokio_core;
extern crate uuid;
#[cfg(target_os = "linux")]
extern crate which;
@@ -56,4 +58,7 @@ pub mod mpsc;
/// Abstractions over operating system network security settings.
pub mod security;
+/// State machine to handle tunnel configuration.
+pub mod tunnel_state_machine;
+
mod mktemp;
diff --git a/talpid-core/src/tunnel_state_machine/connected_state.rs b/talpid-core/src/tunnel_state_machine/connected_state.rs
new file mode 100644
index 0000000000..1b119eb387
--- /dev/null
+++ b/talpid-core/src/tunnel_state_machine/connected_state.rs
@@ -0,0 +1,183 @@
+use futures::sync::{mpsc, oneshot};
+use futures::{Async, Future, Stream};
+
+use talpid_types::net::TunnelEndpoint;
+
+use super::{
+ AfterDisconnect, ConnectingState, DisconnectingState, EventConsequence, Result, ResultExt,
+ SharedTunnelStateValues, StateEntryResult, TunnelCommand, TunnelParameters, TunnelState,
+ TunnelStateWrapper,
+};
+use security::{NetworkSecurity, SecurityPolicy};
+use tunnel::{CloseHandle, TunnelEvent, TunnelMetadata};
+
+pub struct ConnectedStateBootstrap {
+ pub metadata: TunnelMetadata,
+ pub tunnel_events: mpsc::UnboundedReceiver<TunnelEvent>,
+ pub tunnel_endpoint: TunnelEndpoint,
+ pub tunnel_parameters: TunnelParameters,
+ pub tunnel_close_event: oneshot::Receiver<()>,
+ pub close_handle: CloseHandle,
+}
+
+/// The tunnel is up and working.
+pub struct ConnectedState {
+ metadata: TunnelMetadata,
+ tunnel_events: mpsc::UnboundedReceiver<TunnelEvent>,
+ tunnel_endpoint: TunnelEndpoint,
+ tunnel_parameters: TunnelParameters,
+ tunnel_close_event: oneshot::Receiver<()>,
+ close_handle: CloseHandle,
+}
+
+impl ConnectedState {
+ fn from(bootstrap: ConnectedStateBootstrap) -> Self {
+ ConnectedState {
+ metadata: bootstrap.metadata,
+ tunnel_events: bootstrap.tunnel_events,
+ tunnel_endpoint: bootstrap.tunnel_endpoint,
+ tunnel_parameters: bootstrap.tunnel_parameters,
+ tunnel_close_event: bootstrap.tunnel_close_event,
+ close_handle: bootstrap.close_handle,
+ }
+ }
+
+ fn set_security_policy(&self, shared_values: &mut SharedTunnelStateValues) -> Result<()> {
+ let policy = SecurityPolicy::Connected {
+ relay_endpoint: self.tunnel_endpoint.to_endpoint(),
+ tunnel: self.metadata.clone(),
+ allow_lan: self.tunnel_parameters.allow_lan,
+ };
+
+ debug!("Setting security policy: {:?}", policy);
+ shared_values
+ .security
+ .apply_policy(policy)
+ .chain_err(|| "Failed to apply security policy for connected state")
+ }
+
+ fn handle_commands(
+ mut self,
+ commands: &mut mpsc::UnboundedReceiver<TunnelCommand>,
+ shared_values: &mut SharedTunnelStateValues,
+ ) -> EventConsequence<Self> {
+ use self::EventConsequence::*;
+
+ match try_handle_event!(self, commands.poll()) {
+ Ok(TunnelCommand::Connect(parameters)) => {
+ if parameters != self.tunnel_parameters {
+ NewState(DisconnectingState::enter(
+ shared_values,
+ (
+ self.close_handle,
+ self.tunnel_close_event,
+ AfterDisconnect::Reconnect(parameters),
+ ),
+ ))
+ } else {
+ SameState(self)
+ }
+ }
+ Ok(TunnelCommand::Disconnect) | Err(_) => NewState(DisconnectingState::enter(
+ shared_values,
+ (
+ self.close_handle,
+ self.tunnel_close_event,
+ AfterDisconnect::Nothing,
+ ),
+ )),
+ Ok(TunnelCommand::AllowLan(allow_lan)) => {
+ self.tunnel_parameters.allow_lan = allow_lan;
+
+ match self.set_security_policy(shared_values) {
+ Ok(()) => SameState(self),
+ Err(error) => {
+ error!("{}", error.chain_err(|| "Failed to update security policy"));
+ NewState(DisconnectingState::enter(
+ shared_values,
+ (
+ self.close_handle,
+ self.tunnel_close_event,
+ AfterDisconnect::Nothing,
+ ),
+ ))
+ }
+ }
+ }
+ }
+ }
+
+ fn handle_tunnel_events(
+ mut self,
+ shared_values: &mut SharedTunnelStateValues,
+ ) -> EventConsequence<Self> {
+ use self::EventConsequence::*;
+
+ match try_handle_event!(self, self.tunnel_events.poll()) {
+ Ok(TunnelEvent::Down) | Err(_) => NewState(DisconnectingState::enter(
+ shared_values,
+ (
+ self.close_handle,
+ self.tunnel_close_event,
+ AfterDisconnect::Reconnect(self.tunnel_parameters),
+ ),
+ )),
+ Ok(_) => SameState(self),
+ }
+ }
+
+ fn handle_tunnel_close_event(
+ mut self,
+ shared_values: &mut SharedTunnelStateValues,
+ ) -> EventConsequence<Self> {
+ use self::EventConsequence::*;
+
+ match self.tunnel_close_event.poll() {
+ Ok(Async::Ready(_)) => {}
+ Ok(Async::NotReady) => return NoEvents(self),
+ Err(_cancelled) => warn!("Tunnel monitor thread has stopped unexpectedly"),
+ }
+
+ info!("Tunnel closed. Reconnecting.");
+ NewState(ConnectingState::enter(
+ shared_values,
+ self.tunnel_parameters,
+ ))
+ }
+}
+
+impl TunnelState for ConnectedState {
+ type Bootstrap = ConnectedStateBootstrap;
+
+ fn enter(
+ shared_values: &mut SharedTunnelStateValues,
+ bootstrap: Self::Bootstrap,
+ ) -> StateEntryResult {
+ let connected_state = ConnectedState::from(bootstrap);
+
+ match connected_state.set_security_policy(shared_values) {
+ Ok(()) => Ok(TunnelStateWrapper::from(connected_state)),
+ Err(error) => Err((
+ error,
+ DisconnectingState::enter(
+ shared_values,
+ (
+ connected_state.close_handle,
+ connected_state.tunnel_close_event,
+ AfterDisconnect::Nothing,
+ ),
+ ).expect("Failed to disconnect after failed transition to connected state"),
+ )),
+ }
+ }
+
+ fn handle_event(
+ self,
+ commands: &mut mpsc::UnboundedReceiver<TunnelCommand>,
+ shared_values: &mut SharedTunnelStateValues,
+ ) -> EventConsequence<Self> {
+ self.handle_commands(commands, shared_values)
+ .or_else(Self::handle_tunnel_events, shared_values)
+ .or_else(Self::handle_tunnel_close_event, shared_values)
+ }
+}
diff --git a/talpid-core/src/tunnel_state_machine/connecting_state.rs b/talpid-core/src/tunnel_state_machine/connecting_state.rs
new file mode 100644
index 0000000000..2588da0e77
--- /dev/null
+++ b/talpid-core/src/tunnel_state_machine/connecting_state.rs
@@ -0,0 +1,294 @@
+use std::ffi::OsString;
+use std::path::PathBuf;
+use std::sync::Mutex;
+use std::thread;
+use std::time::{Duration, Instant};
+
+use futures::sink::Wait;
+use futures::sync::{mpsc, oneshot};
+use futures::{Async, Future, Sink, Stream};
+
+use talpid_types::net::{TunnelEndpoint, TunnelEndpointData};
+
+use super::{
+ AfterDisconnect, ConnectedState, ConnectedStateBootstrap, DisconnectedState,
+ DisconnectingState, EventConsequence, Result, ResultExt, SharedTunnelStateValues,
+ StateEntryResult, TunnelCommand, TunnelParameters, TunnelState, TunnelStateWrapper,
+};
+use logging;
+use security::{NetworkSecurity, SecurityPolicy};
+use tunnel::{CloseHandle, TunnelEvent, TunnelMetadata, TunnelMonitor};
+
+const MIN_TUNNEL_ALIVE_TIME: Duration = Duration::from_millis(1000);
+
+const OPENVPN_LOG_FILENAME: &str = "openvpn.log";
+const WIREGUARD_LOG_FILENAME: &str = "wireguard.log";
+
+#[cfg(windows)]
+const TUNNEL_INTERFACE_ALIAS: Option<&str> = Some("Mullvad");
+#[cfg(not(windows))]
+const TUNNEL_INTERFACE_ALIAS: Option<&str> = None;
+
+/// The tunnel has been started, but it is not established/functional.
+pub struct ConnectingState {
+ tunnel_events: mpsc::UnboundedReceiver<TunnelEvent>,
+ tunnel_endpoint: TunnelEndpoint,
+ tunnel_parameters: TunnelParameters,
+ tunnel_close_event: oneshot::Receiver<()>,
+ close_handle: CloseHandle,
+}
+
+impl ConnectingState {
+ fn new(
+ shared_values: &mut SharedTunnelStateValues,
+ parameters: TunnelParameters,
+ ) -> Result<Self> {
+ Self::set_security_policy(shared_values, parameters.endpoint, parameters.allow_lan)?;
+
+ let tunnel_endpoint = parameters.endpoint;
+ let (tunnel_events, tunnel_close_event, close_handle) = Self::start_tunnel(&parameters)?;
+
+ Ok(ConnectingState {
+ tunnel_events,
+ tunnel_endpoint,
+ tunnel_parameters: parameters,
+ tunnel_close_event,
+ close_handle,
+ })
+ }
+
+ fn set_security_policy(
+ shared_values: &mut SharedTunnelStateValues,
+ endpoint: TunnelEndpoint,
+ allow_lan: bool,
+ ) -> Result<()> {
+ let policy = SecurityPolicy::Connecting {
+ relay_endpoint: endpoint.to_endpoint(),
+ allow_lan,
+ };
+
+ debug!("Setting security policy: {:?}", policy);
+ shared_values
+ .security
+ .apply_policy(policy)
+ .chain_err(|| "Failed to apply security policy for connecting state")
+ }
+
+ fn start_tunnel(
+ parameters: &TunnelParameters,
+ ) -> Result<(
+ mpsc::UnboundedReceiver<TunnelEvent>,
+ oneshot::Receiver<()>,
+ CloseHandle,
+ )> {
+ let (event_tx, event_rx) = mpsc::unbounded();
+ let monitor = Self::spawn_tunnel_monitor(&parameters, event_tx.wait())?;
+ let close_handle = monitor.close_handle();
+ let tunnel_close_event = Self::spawn_tunnel_monitor_wait_thread(monitor);
+
+ Ok((event_rx, tunnel_close_event, close_handle))
+ }
+
+ fn spawn_tunnel_monitor(
+ parameters: &TunnelParameters,
+ events: Wait<mpsc::UnboundedSender<TunnelEvent>>,
+ ) -> Result<TunnelMonitor> {
+ let event_tx = Mutex::new(events);
+ let on_tunnel_event = move |event| {
+ let send_result = event_tx
+ .lock()
+ .expect("A thread panicked while sending a tunnel event")
+ .send(event);
+
+ if send_result.is_err() {
+ warn!("Tunnel state machine stopped before tunnel event was received");
+ }
+ };
+ let log_file = Self::prepare_tunnel_log_file(&parameters)?;
+
+ TunnelMonitor::new(
+ parameters.endpoint,
+ &parameters.options,
+ TUNNEL_INTERFACE_ALIAS.to_owned().map(OsString::from),
+ &parameters.username,
+ log_file.as_ref().map(PathBuf::as_path),
+ &parameters.resource_dir,
+ on_tunnel_event,
+ ).chain_err(|| "Unable to start tunnel monitor")
+ }
+
+ fn prepare_tunnel_log_file(parameters: &TunnelParameters) -> Result<Option<PathBuf>> {
+ if let Some(ref log_dir) = parameters.log_dir {
+ let filename = match parameters.endpoint.tunnel {
+ TunnelEndpointData::OpenVpn(_) => OPENVPN_LOG_FILENAME,
+ TunnelEndpointData::Wireguard(_) => WIREGUARD_LOG_FILENAME,
+ };
+ let tunnel_log = log_dir.join(filename);
+ logging::rotate_log(&tunnel_log).chain_err(|| "Unable to rotate tunnel log")?;
+ Ok(Some(tunnel_log))
+ } else {
+ Ok(None)
+ }
+ }
+
+ fn spawn_tunnel_monitor_wait_thread(tunnel_monitor: TunnelMonitor) -> oneshot::Receiver<()> {
+ let (tunnel_close_event_tx, tunnel_close_event_rx) = oneshot::channel();
+
+ thread::spawn(move || {
+ let start = Instant::now();
+
+ match tunnel_monitor.wait() {
+ Ok(_) => debug!("Tunnel has finished without errors"),
+ Err(error) => {
+ let chained_error = error.chain_err(|| "Tunnel has stopped unexpectedly");
+ warn!("{}", chained_error);
+ }
+ }
+
+ if let Some(remaining_time) = MIN_TUNNEL_ALIVE_TIME.checked_sub(start.elapsed()) {
+ thread::sleep(remaining_time);
+ }
+
+ if tunnel_close_event_tx.send(()).is_err() {
+ warn!("Tunnel state machine stopped before receiving tunnel closed event");
+ }
+
+ trace!("Tunnel monitor thread exit");
+ });
+
+ tunnel_close_event_rx
+ }
+
+ fn into_connected_state_bootstrap(self, metadata: TunnelMetadata) -> ConnectedStateBootstrap {
+ ConnectedStateBootstrap {
+ metadata,
+ tunnel_events: self.tunnel_events,
+ tunnel_endpoint: self.tunnel_endpoint,
+ tunnel_parameters: self.tunnel_parameters,
+ tunnel_close_event: self.tunnel_close_event,
+ close_handle: self.close_handle,
+ }
+ }
+
+ fn handle_commands(
+ mut self,
+ commands: &mut mpsc::UnboundedReceiver<TunnelCommand>,
+ shared_values: &mut SharedTunnelStateValues,
+ ) -> EventConsequence<Self> {
+ use self::EventConsequence::*;
+
+ match try_handle_event!(self, commands.poll()) {
+ Ok(TunnelCommand::Connect(parameters)) => {
+ if parameters != self.tunnel_parameters {
+ NewState(DisconnectingState::enter(
+ shared_values,
+ (
+ self.close_handle,
+ self.tunnel_close_event,
+ AfterDisconnect::Reconnect(parameters),
+ ),
+ ))
+ } else {
+ SameState(self)
+ }
+ }
+ Ok(TunnelCommand::Disconnect) | Err(_) => NewState(DisconnectingState::enter(
+ shared_values,
+ (
+ self.close_handle,
+ self.tunnel_close_event,
+ AfterDisconnect::Nothing,
+ ),
+ )),
+ Ok(TunnelCommand::AllowLan(allow_lan)) => {
+ self.tunnel_parameters.allow_lan = allow_lan;
+ match Self::set_security_policy(shared_values, self.tunnel_endpoint, allow_lan) {
+ Ok(()) => SameState(self),
+ Err(error) => {
+ error!("{}", error.chain_err(|| "Failed to update security policy"));
+ NewState(DisconnectingState::enter(
+ shared_values,
+ (
+ self.close_handle,
+ self.tunnel_close_event,
+ AfterDisconnect::Nothing,
+ ),
+ ))
+ }
+ }
+ }
+ }
+ }
+
+ fn handle_tunnel_events(
+ mut self,
+ shared_values: &mut SharedTunnelStateValues,
+ ) -> EventConsequence<Self> {
+ use self::EventConsequence::*;
+
+ match try_handle_event!(self, self.tunnel_events.poll()) {
+ Ok(TunnelEvent::Up(metadata)) => NewState(ConnectedState::enter(
+ shared_values,
+ self.into_connected_state_bootstrap(metadata),
+ )),
+ Ok(_) => SameState(self),
+ Err(_) => NewState(DisconnectingState::enter(
+ shared_values,
+ (
+ self.close_handle,
+ self.tunnel_close_event,
+ AfterDisconnect::Reconnect(self.tunnel_parameters),
+ ),
+ )),
+ }
+ }
+
+ fn handle_tunnel_close_event(
+ mut self,
+ shared_values: &mut SharedTunnelStateValues,
+ ) -> EventConsequence<Self> {
+ use self::EventConsequence::*;
+
+ match self.tunnel_close_event.poll() {
+ Ok(Async::Ready(_)) => {}
+ Ok(Async::NotReady) => return NoEvents(self),
+ Err(_cancelled) => warn!("Tunnel monitor thread has stopped unexpectedly"),
+ }
+
+ info!("Tunnel closed. Reconnecting.");
+ NewState(ConnectingState::enter(
+ shared_values,
+ self.tunnel_parameters,
+ ))
+ }
+}
+
+impl TunnelState for ConnectingState {
+ type Bootstrap = TunnelParameters;
+
+ fn enter(
+ shared_values: &mut SharedTunnelStateValues,
+ parameters: Self::Bootstrap,
+ ) -> StateEntryResult {
+ Self::new(shared_values, parameters)
+ .map(TunnelStateWrapper::from)
+ .chain_err(|| "Failed to start tunnel")
+ .map_err(|error| {
+ (
+ error,
+ DisconnectedState::enter(shared_values, ())
+ .expect("Failed to transition to fallback disconnected state"),
+ )
+ })
+ }
+
+ fn handle_event(
+ self,
+ commands: &mut mpsc::UnboundedReceiver<TunnelCommand>,
+ shared_values: &mut SharedTunnelStateValues,
+ ) -> EventConsequence<Self> {
+ self.handle_commands(commands, shared_values)
+ .or_else(Self::handle_tunnel_events, shared_values)
+ .or_else(Self::handle_tunnel_close_event, shared_values)
+ }
+}
diff --git a/talpid-core/src/tunnel_state_machine/disconnected_state.rs b/talpid-core/src/tunnel_state_machine/disconnected_state.rs
new file mode 100644
index 0000000000..694e280ec6
--- /dev/null
+++ b/talpid-core/src/tunnel_state_machine/disconnected_state.rs
@@ -0,0 +1,48 @@
+use error_chain::ChainedError;
+use futures::sync::mpsc;
+use futures::Stream;
+
+use super::{
+ ConnectingState, Error, EventConsequence, SharedTunnelStateValues, StateEntryResult,
+ TunnelCommand, TunnelState, TunnelStateWrapper,
+};
+use security::NetworkSecurity;
+
+/// No tunnel is running.
+pub struct DisconnectedState;
+
+impl DisconnectedState {
+ fn reset_security_policy(shared_values: &mut SharedTunnelStateValues) {
+ debug!("Resetting security policy");
+ if let Err(error) = shared_values.security.reset_policy() {
+ let chained_error = Error::with_chain(error, "Failed to reset security policy");
+ error!("{}", chained_error.display_chain());
+ }
+ }
+}
+
+impl TunnelState for DisconnectedState {
+ type Bootstrap = ();
+
+ fn enter(shared_values: &mut SharedTunnelStateValues, _: Self::Bootstrap) -> StateEntryResult {
+ Self::reset_security_policy(shared_values);
+
+ Ok(TunnelStateWrapper::from(DisconnectedState))
+ }
+
+ fn handle_event(
+ self,
+ commands: &mut mpsc::UnboundedReceiver<TunnelCommand>,
+ shared_values: &mut SharedTunnelStateValues,
+ ) -> EventConsequence<Self> {
+ use self::EventConsequence::*;
+
+ match try_handle_event!(self, commands.poll()) {
+ Ok(TunnelCommand::Connect(parameters)) => {
+ NewState(ConnectingState::enter(shared_values, parameters))
+ }
+ Ok(_) => SameState(self),
+ Err(_) => Finished,
+ }
+ }
+}
diff --git a/talpid-core/src/tunnel_state_machine/disconnecting_state.rs b/talpid-core/src/tunnel_state_machine/disconnecting_state.rs
new file mode 100644
index 0000000000..5b80ad0598
--- /dev/null
+++ b/talpid-core/src/tunnel_state_machine/disconnecting_state.rs
@@ -0,0 +1,103 @@
+use error_chain::ChainedError;
+use futures::sync::{mpsc, oneshot};
+use futures::{Async, Future, Stream};
+
+use super::{
+ ConnectingState, DisconnectedState, EventConsequence, ResultExt, SharedTunnelStateValues,
+ StateEntryResult, TunnelCommand, TunnelParameters, TunnelState, TunnelStateWrapper,
+};
+use tunnel::CloseHandle;
+
+/// This state is active from when we manually trigger a tunnel kill until the tunnel wait
+/// operation (TunnelExit) returned.
+pub struct DisconnectingState {
+ exited: oneshot::Receiver<()>,
+ after_disconnect: AfterDisconnect,
+}
+
+impl DisconnectingState {
+ fn handle_commands(
+ mut self,
+ commands: &mut mpsc::UnboundedReceiver<TunnelCommand>,
+ ) -> EventConsequence<Self> {
+ use self::AfterDisconnect::*;
+
+ let event = try_handle_event!(self, commands.poll());
+ let after_disconnect = self.after_disconnect;
+
+ self.after_disconnect = match after_disconnect {
+ AfterDisconnect::Nothing => match event {
+ Ok(TunnelCommand::Connect(parameters)) => Reconnect(parameters),
+ _ => Nothing,
+ },
+ AfterDisconnect::Reconnect(mut tunnel_parameters) => match event {
+ Ok(TunnelCommand::Connect(parameters)) => Reconnect(parameters),
+ Ok(TunnelCommand::AllowLan(allow_lan)) => {
+ tunnel_parameters.allow_lan = allow_lan;
+ Reconnect(tunnel_parameters)
+ }
+ Ok(TunnelCommand::Disconnect) | Err(_) => Nothing,
+ },
+ };
+
+ EventConsequence::SameState(self)
+ }
+
+ fn handle_exit_event(
+ mut self,
+ shared_values: &mut SharedTunnelStateValues,
+ ) -> EventConsequence<Self> {
+ use self::EventConsequence::*;
+
+ match self.exited.poll() {
+ Ok(Async::NotReady) => NoEvents(self),
+ Ok(Async::Ready(_)) | Err(_) => NewState(self.after_disconnect(shared_values)),
+ }
+ }
+
+ fn after_disconnect(self, shared_values: &mut SharedTunnelStateValues) -> StateEntryResult {
+ match self.after_disconnect {
+ AfterDisconnect::Nothing => DisconnectedState::enter(shared_values, ()),
+ AfterDisconnect::Reconnect(tunnel_parameters) => {
+ ConnectingState::enter(shared_values, tunnel_parameters)
+ }
+ }
+ }
+}
+
+impl TunnelState for DisconnectingState {
+ type Bootstrap = (CloseHandle, oneshot::Receiver<()>, AfterDisconnect);
+
+ fn enter(
+ _: &mut SharedTunnelStateValues,
+ (close_handle, exited, after_disconnect): Self::Bootstrap,
+ ) -> StateEntryResult {
+ let close_result = close_handle
+ .close()
+ .chain_err(|| "Failed to request tunnel monitor to close the tunnel");
+
+ if let Err(error) = close_result {
+ error!("{}", error.display_chain());
+ }
+
+ Ok(TunnelStateWrapper::from(DisconnectingState {
+ exited,
+ after_disconnect,
+ }))
+ }
+
+ fn handle_event(
+ self,
+ commands: &mut mpsc::UnboundedReceiver<TunnelCommand>,
+ shared_values: &mut SharedTunnelStateValues,
+ ) -> EventConsequence<Self> {
+ self.handle_commands(commands)
+ .or_else(Self::handle_exit_event, shared_values)
+ }
+}
+
+/// Which state should be transitioned to after disconnection is complete.
+pub enum AfterDisconnect {
+ Nothing,
+ Reconnect(TunnelParameters),
+}
diff --git a/talpid-core/src/tunnel_state_machine/macros.rs b/talpid-core/src/tunnel_state_machine/macros.rs
new file mode 100644
index 0000000000..2241c8cb06
--- /dev/null
+++ b/talpid-core/src/tunnel_state_machine/macros.rs
@@ -0,0 +1,21 @@
+/// Try to receive an event from a `Stream`'s asynchronous poll expression.
+///
+/// This macro is similar to the `try_ready!` macro provided in `futures`. If there is an event
+/// ready, it will be returned wrapped in a `Result`. If there are no events ready to be received,
+/// the outer function will return with a transition that indicates that no events were received,
+/// which is analogous to `Async::NotReady`.
+///
+/// When the asynchronous event indicates that the stream has finished or that it has failed, an
+/// error type is returned so that either close scenario can be handled in a similar way.
+macro_rules! try_handle_event {
+ ($same_state:expr, $event:expr) => {
+ match $event {
+ Ok(::futures::Async::Ready(Some(event))) => Ok(event),
+ Ok(::futures::Async::Ready(None)) => Err(None),
+ Ok(::futures::Async::NotReady) => {
+ return ::tunnel_state_machine::EventConsequence::NoEvents($same_state);
+ }
+ Err(error) => Err(Some(error)),
+ }
+ };
+}
diff --git a/talpid-core/src/tunnel_state_machine/mod.rs b/talpid-core/src/tunnel_state_machine/mod.rs
new file mode 100644
index 0000000000..c3d592c952
--- /dev/null
+++ b/talpid-core/src/tunnel_state_machine/mod.rs
@@ -0,0 +1,381 @@
+#[macro_use]
+mod macros;
+
+mod connected_state;
+mod connecting_state;
+mod disconnected_state;
+mod disconnecting_state;
+
+use std::fmt::{Debug, Formatter, Result as FmtResult};
+use std::path::{Path, PathBuf};
+use std::sync::mpsc as sync_mpsc;
+use std::thread;
+
+use error_chain::ChainedError;
+use futures::sync::mpsc;
+use futures::{Async, Future, Poll, Stream};
+use tokio_core::reactor::Core;
+
+use talpid_types::net::{TunnelEndpoint, TunnelOptions};
+
+use self::connected_state::{ConnectedState, ConnectedStateBootstrap};
+use self::connecting_state::ConnectingState;
+use self::disconnected_state::DisconnectedState;
+use self::disconnecting_state::{AfterDisconnect, DisconnectingState};
+use super::mpsc::IntoSender;
+use super::security::{NetworkSecurity, NetworkSecurityImpl};
+
+error_chain! {
+ errors {
+ /// An error occurred while setting up the network security.
+ NetworkSecurityError {
+ description("Network security error")
+ }
+ /// An error occurred while attempting to set up the event loop for the tunnel state
+ /// machine.
+ ReactorError {
+ description("Failed to initialize tunnel state machine event loop executor")
+ }
+ }
+}
+
+/// Spawn the tunnel state machine thread, returning a channel for sending tunnel commands.
+pub fn spawn<P, T>(
+ cache_dir: P,
+ state_change_listener: IntoSender<TunnelStateTransition, T>,
+) -> Result<mpsc::UnboundedSender<TunnelCommand>>
+where
+ P: AsRef<Path> + Send + 'static,
+ T: From<TunnelStateTransition> + Send + 'static,
+{
+ let (command_tx, command_rx) = mpsc::unbounded();
+ let (startup_result_tx, startup_result_rx) = sync_mpsc::channel();
+
+ thread::spawn(
+ move || match create_event_loop(cache_dir, command_rx, state_change_listener) {
+ Ok((mut reactor, event_loop)) => {
+ startup_result_tx.send(Ok(())).expect(
+ "Tunnel state machine won't be started because the owner thread crashed",
+ );
+
+ if let Err(error) = reactor.run(event_loop) {
+ let chained_error =
+ Error::with_chain(error, "Tunnel state machine exited with an error");
+ error!("{}", chained_error.display_chain());
+ }
+ }
+ Err(startup_error) => {
+ startup_result_tx
+ .send(Err(startup_error))
+ .expect("Failed to send startup error");
+ }
+ },
+ );
+
+ startup_result_rx
+ .recv()
+ .expect("Failed to start tunnel state machine thread")
+ .map(|_| command_tx)
+}
+
+fn create_event_loop<P, T>(
+ cache_dir: P,
+ commands: mpsc::UnboundedReceiver<TunnelCommand>,
+ state_change_listener: IntoSender<TunnelStateTransition, T>,
+) -> Result<(Core, impl Future<Item = (), Error = Error>)>
+where
+ P: AsRef<Path>,
+ T: From<TunnelStateTransition> + Send + 'static,
+{
+ let reactor = Core::new().chain_err(|| ErrorKind::ReactorError)?;
+ let state_machine = TunnelStateMachine::new(&cache_dir, commands)?;
+
+ let future = state_machine.for_each(move |state_change_event| {
+ state_change_listener
+ .send(state_change_event)
+ .chain_err(|| "Failed to send state change event to listener")
+ });
+
+ Ok((reactor, future))
+}
+
+/// Representation of external commands for the tunnel state machine.
+pub enum TunnelCommand {
+ /// Enable or disable LAN access in the firewall.
+ AllowLan(bool),
+ /// Open tunnel connection.
+ Connect(TunnelParameters),
+ /// Close tunnel connection.
+ Disconnect,
+}
+
+/// Information necessary to open a tunnel.
+#[derive(Debug, PartialEq)]
+pub struct TunnelParameters {
+ /// Tunnel enpoint to connect to.
+ pub endpoint: TunnelEndpoint,
+ /// Tunnel connection options.
+ pub options: TunnelOptions,
+ /// Directory to store tunnel log file.
+ pub log_dir: Option<PathBuf>,
+ /// Resource directory path.
+ pub resource_dir: PathBuf,
+ /// Username to use for setting up the tunnel.
+ pub username: String,
+ /// Should LAN access be allowed outside the tunnel.
+ pub allow_lan: bool,
+}
+
+/// Event resulting from a transition to a new tunnel state.
+#[derive(Clone, Copy, Debug, PartialEq)]
+pub enum TunnelStateTransition {
+ /// No connection is established and network is unsecured.
+ Disconnected,
+ /// Network is secured but tunnel is still connecting.
+ Connecting,
+ /// Tunnel is connected.
+ Connected,
+ /// Disconnecting tunnel.
+ Disconnecting,
+}
+
+/// Asynchronous handling of the tunnel state machine.
+///
+/// This type implements `Stream`, and attempts to advance the state machine based on the events
+/// received on the commands stream and possibly on events that specific states are also listening
+/// to. Every time it successfully advances the state machine a `TunnelStateTransition` is emitted
+/// by the stream.
+struct TunnelStateMachine {
+ current_state: Option<TunnelStateWrapper>,
+ commands: mpsc::UnboundedReceiver<TunnelCommand>,
+ shared_values: SharedTunnelStateValues,
+}
+
+impl TunnelStateMachine {
+ fn new<P: AsRef<Path>>(
+ cache_dir: P,
+ commands: mpsc::UnboundedReceiver<TunnelCommand>,
+ ) -> Result<Self> {
+ let security =
+ NetworkSecurityImpl::new(cache_dir).chain_err(|| ErrorKind::NetworkSecurityError)?;
+ let mut shared_values = SharedTunnelStateValues { security };
+
+ let initial_state = TunnelStateWrapper::new(&mut shared_values, ())
+ .expect("Failed to create initial tunnel state");
+
+ Ok(TunnelStateMachine {
+ current_state: Some(initial_state),
+ commands,
+ shared_values,
+ })
+ }
+}
+
+impl Stream for TunnelStateMachine {
+ type Item = TunnelStateTransition;
+ type Error = Error;
+
+ fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
+ while let Some(state_wrapper) = self.current_state.take() {
+ match state_wrapper.handle_event(&mut self.commands, &mut self.shared_values) {
+ TunnelStateMachineAction::Repeat(repeat_state_wrapper) => {
+ self.current_state = Some(repeat_state_wrapper);
+ }
+ TunnelStateMachineAction::Notify(state_wrapper, result) => {
+ self.current_state = state_wrapper;
+ return result;
+ }
+ }
+ }
+ Ok(Async::Ready(None))
+ }
+}
+
+/// Action the state machine should take, which is discovered base on an event consequence.
+///
+/// The action can be to execute another iteration or to notify that something happened. Executing
+/// another iteration happens when an event is received and ignored, which causes the tunnel state
+/// machine to stay in the same state. The state machine can notify its caller that a state
+/// transition has occurred, that it has finished, or that it has paused to wait for new events.
+enum TunnelStateMachineAction {
+ Repeat(TunnelStateWrapper),
+ Notify(
+ Option<TunnelStateWrapper>,
+ Poll<Option<TunnelStateTransition>, Error>,
+ ),
+}
+
+impl<T: TunnelState> From<EventConsequence<T>> for TunnelStateMachineAction {
+ fn from(event_consequence: EventConsequence<T>) -> Self {
+ use self::EventConsequence::*;
+ use self::TunnelStateMachineAction::*;
+
+ match event_consequence {
+ NewState(Ok(state_wrapper)) | NewState(Err((_, state_wrapper))) => {
+ let transition = state_wrapper.info();
+ Notify(Some(state_wrapper), Ok(Async::Ready(Some(transition))))
+ }
+ SameState(state) => Repeat(state.into()),
+ NoEvents(state) => Notify(Some(state.into()), Ok(Async::NotReady)),
+ Finished => Notify(None, Ok(Async::Ready(None))),
+ }
+ }
+}
+
+/// Values that are common to all tunnel states.
+struct SharedTunnelStateValues {
+ security: NetworkSecurityImpl,
+}
+
+/// Asynchronous result of an attempt to progress a state.
+enum EventConsequence<T: TunnelState> {
+ /// Transition to a new state.
+ NewState(StateEntryResult),
+ /// An event was received, but it was ignored by the state so no transition is performed.
+ SameState(T),
+ /// No events were received, the event loop should block until one becomes available.
+ NoEvents(T),
+ /// The state machine has finished its execution.
+ Finished,
+}
+
+impl<T> EventConsequence<T>
+where
+ T: TunnelState,
+{
+ /// Helper method to chain handling multiple different event types.
+ ///
+ /// The `handle_event` is only called if no events were handled so far.
+ pub fn or_else<F>(self, handle_event: F, shared_values: &mut SharedTunnelStateValues) -> Self
+ where
+ F: FnOnce(T, &mut SharedTunnelStateValues) -> Self,
+ {
+ use self::EventConsequence::*;
+
+ match self {
+ NoEvents(state) => handle_event(state, shared_values),
+ consequence => consequence,
+ }
+ }
+}
+
+/// Result of entering a `T: TunnelState`.
+///
+/// It is either the state itself when successful, or an error paired with a fallback state.
+type StateEntryResult = ::std::result::Result<TunnelStateWrapper, (Error, TunnelStateWrapper)>;
+
+/// Trait that contains the method all states should implement to handle an event and advance the
+/// state machine.
+trait TunnelState: Into<TunnelStateWrapper> + Sized {
+ /// Type representing extra information required for entering the state.
+ type Bootstrap;
+
+ /// Constructor function.
+ ///
+ /// This is the state entry point. It attempts to enter the state, and may fail by entering an
+ /// error or fallback state instead.
+ fn enter(
+ shared_values: &mut SharedTunnelStateValues,
+ bootstrap: Self::Bootstrap,
+ ) -> StateEntryResult;
+
+ /// Main state function.
+ ///
+ /// This is state exit point. It consumes itself and returns the next state to advance to when
+ /// it has completed, or itself if it wants to ignore a received event or if no events were
+ /// ready to be received. See [`EventConsequence`] for more details.
+ ///
+ /// An implementation can handle events from many sources, but it should also handle command
+ /// events received through the provided `commands` stream.
+ ///
+ /// [`EventConsequence`]: enum.EventConsequence.html
+ fn handle_event(
+ self,
+ commands: &mut mpsc::UnboundedReceiver<TunnelCommand>,
+ shared_values: &mut SharedTunnelStateValues,
+ ) -> EventConsequence<Self>;
+}
+
+/// Valid states of the tunnel.
+///
+/// All implementations must implement `TunnelState` so that they can handle events and
+/// commands in order to advance the state machine.
+enum TunnelStateWrapper {
+ Disconnected(DisconnectedState),
+ Connecting(ConnectingState),
+ Connected(ConnectedState),
+ Disconnecting(DisconnectingState),
+}
+
+impl TunnelStateWrapper {
+ fn new(
+ shared_values: &mut SharedTunnelStateValues,
+ bootstrap: <DisconnectedState as TunnelState>::Bootstrap,
+ ) -> StateEntryResult {
+ DisconnectedState::enter(shared_values, bootstrap)
+ }
+
+ fn handle_event(
+ self,
+ commands: &mut mpsc::UnboundedReceiver<TunnelCommand>,
+ shared_values: &mut SharedTunnelStateValues,
+ ) -> TunnelStateMachineAction {
+ macro_rules! handle_event {
+ ( $($state:ident),* $(,)* ) => {
+ match self {
+ $(
+ TunnelStateWrapper::$state(state) => {
+ let event_consequence = state.handle_event(commands, shared_values);
+ TunnelStateMachineAction::from(event_consequence)
+ }
+ )*
+ }
+ }
+ }
+
+ handle_event! {
+ Disconnected,
+ Connecting,
+ Connected,
+ Disconnecting,
+ }
+ }
+
+ /// Returns information describing the state.
+ fn info(&self) -> TunnelStateTransition {
+ match *self {
+ TunnelStateWrapper::Disconnected(_) => TunnelStateTransition::Disconnected,
+ TunnelStateWrapper::Connecting(_) => TunnelStateTransition::Connecting,
+ TunnelStateWrapper::Connected(_) => TunnelStateTransition::Connected,
+ TunnelStateWrapper::Disconnecting(_) => TunnelStateTransition::Disconnecting,
+ }
+ }
+}
+
+macro_rules! impl_from_for_tunnel_state {
+ ($state_variant:ident($state_type:ident)) => {
+ impl From<$state_type> for TunnelStateWrapper {
+ fn from(state: $state_type) -> Self {
+ TunnelStateWrapper::$state_variant(state)
+ }
+ }
+ };
+}
+
+impl_from_for_tunnel_state!(Disconnected(DisconnectedState));
+impl_from_for_tunnel_state!(Connecting(ConnectingState));
+impl_from_for_tunnel_state!(Connected(ConnectedState));
+impl_from_for_tunnel_state!(Disconnecting(DisconnectingState));
+
+impl Debug for TunnelStateWrapper {
+ fn fmt(&self, formatter: &mut Formatter) -> FmtResult {
+ use self::TunnelStateWrapper::*;
+
+ match *self {
+ Disconnected(_) => write!(formatter, "TunnelStateWrapper::Disconnected(_)"),
+ Connecting(_) => write!(formatter, "TunnelStateWrapper::Connecting(_)"),
+ Connected(_) => write!(formatter, "TunnelStateWrapper::Connected(_)"),
+ Disconnecting(_) => write!(formatter, "TunnelStateWrapper::Disconnecting(_)"),
+ }
+ }
+}