summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--mullvad-daemon/src/tunnel_state_machine/connected_state.rs19
-rw-r--r--mullvad-daemon/src/tunnel_state_machine/connecting_state.rs23
-rw-r--r--mullvad-daemon/src/tunnel_state_machine/mod.rs94
3 files changed, 99 insertions, 37 deletions
diff --git a/mullvad-daemon/src/tunnel_state_machine/connected_state.rs b/mullvad-daemon/src/tunnel_state_machine/connected_state.rs
index 5163029ec1..4a006153ba 100644
--- a/mullvad-daemon/src/tunnel_state_machine/connected_state.rs
+++ b/mullvad-daemon/src/tunnel_state_machine/connected_state.rs
@@ -1,32 +1,43 @@
use futures::sync::mpsc;
use futures::Stream;
-use talpid_core::tunnel::TunnelEvent;
+use talpid_core::tunnel::{TunnelEvent, TunnelMetadata};
+use talpid_types::net::TunnelEndpoint;
use super::{
AfterDisconnect, CloseHandle, DisconnectingState, EventConsequence, StateEntryResult,
- TunnelCommand, TunnelState, TunnelStateWrapper,
+ TunnelCommand, TunnelState, TunnelStateTransition, TunnelStateWrapper,
};
pub struct ConnectedStateBootstrap {
+ pub metadata: TunnelMetadata,
pub tunnel_events: mpsc::UnboundedReceiver<TunnelEvent>,
+ pub tunnel_endpoint: TunnelEndpoint,
pub close_handle: CloseHandle,
}
/// The tunnel is up and working.
pub struct ConnectedState {
- close_handle: CloseHandle,
+ metadata: TunnelMetadata,
tunnel_events: mpsc::UnboundedReceiver<TunnelEvent>,
+ tunnel_endpoint: TunnelEndpoint,
+ close_handle: CloseHandle,
}
impl ConnectedState {
fn from(bootstrap: ConnectedStateBootstrap) -> Self {
ConnectedState {
- close_handle: bootstrap.close_handle,
+ metadata: bootstrap.metadata,
tunnel_events: bootstrap.tunnel_events,
+ tunnel_endpoint: bootstrap.tunnel_endpoint,
+ close_handle: bootstrap.close_handle,
}
}
+ pub fn info(&self) -> TunnelStateTransition {
+ TunnelStateTransition::Connected(self.tunnel_endpoint, self.metadata.clone())
+ }
+
fn handle_commands(
self,
commands: &mut mpsc::UnboundedReceiver<TunnelCommand>,
diff --git a/mullvad-daemon/src/tunnel_state_machine/connecting_state.rs b/mullvad-daemon/src/tunnel_state_machine/connecting_state.rs
index e9677ecf6b..c9860a2105 100644
--- a/mullvad-daemon/src/tunnel_state_machine/connecting_state.rs
+++ b/mullvad-daemon/src/tunnel_state_machine/connecting_state.rs
@@ -8,13 +8,13 @@ use futures::sink::Wait;
use futures::sync::mpsc;
use futures::{Sink, Stream};
-use talpid_core::tunnel::{TunnelEvent, TunnelMonitor};
-use talpid_types::net::TunnelEndpointData;
+use talpid_core::tunnel::{TunnelEvent, TunnelMetadata, TunnelMonitor};
+use talpid_types::net::{TunnelEndpoint, TunnelEndpointData};
use super::{
AfterDisconnect, CloseHandle, ConnectedState, ConnectedStateBootstrap, DisconnectedState,
DisconnectingState, EventConsequence, Result, ResultExt, StateEntryResult, TunnelCommand,
- TunnelParameters, TunnelState, TunnelStateWrapper, OPENVPN_LOG_FILENAME,
+ TunnelParameters, TunnelState, TunnelStateTransition, TunnelStateWrapper, OPENVPN_LOG_FILENAME,
WIREGUARD_LOG_FILENAME,
};
use logging;
@@ -30,11 +30,13 @@ const TUNNEL_INTERFACE_ALIAS: Option<&str> = None;
pub struct ConnectingState {
close_handle: CloseHandle,
tunnel_events: mpsc::UnboundedReceiver<TunnelEvent>,
+ tunnel_endpoint: TunnelEndpoint,
}
impl ConnectingState {
fn new(parameters: TunnelParameters) -> Result<Self> {
let (event_tx, event_rx) = mpsc::unbounded();
+ let tunnel_endpoint = parameters.endpoint;
let monitor = Self::spawn_tunnel_monitor(parameters, event_tx.wait())?;
let close_handle = CloseHandle::new(&monitor);
@@ -43,6 +45,7 @@ impl ConnectingState {
Ok(ConnectingState {
close_handle,
tunnel_events: event_rx,
+ tunnel_endpoint,
})
}
@@ -108,13 +111,19 @@ impl ConnectingState {
});
}
- fn into_connected_state_bootstrap(self) -> ConnectedStateBootstrap {
+ fn into_connected_state_bootstrap(self, metadata: TunnelMetadata) -> ConnectedStateBootstrap {
ConnectedStateBootstrap {
+ metadata,
tunnel_events: self.tunnel_events,
+ tunnel_endpoint: self.tunnel_endpoint,
close_handle: self.close_handle,
}
}
+ pub fn info(&self) -> TunnelStateTransition {
+ TunnelStateTransition::Connecting(self.tunnel_endpoint)
+ }
+
fn handle_commands(
self,
commands: &mut mpsc::UnboundedReceiver<TunnelCommand>,
@@ -134,9 +143,9 @@ impl ConnectingState {
use self::EventConsequence::*;
match try_handle_event!(self, self.tunnel_events.poll()) {
- Ok(TunnelEvent::Up(_)) => {
- NewState(ConnectedState::enter(self.into_connected_state_bootstrap()))
- }
+ Ok(TunnelEvent::Up(metadata)) => NewState(ConnectedState::enter(
+ self.into_connected_state_bootstrap(metadata),
+ )),
Ok(_) => SameState(self),
Err(_) => NewState(DisconnectingState::enter((
self.close_handle.close(),
diff --git a/mullvad-daemon/src/tunnel_state_machine/mod.rs b/mullvad-daemon/src/tunnel_state_machine/mod.rs
index 57f611f3ef..2c8742b812 100644
--- a/mullvad-daemon/src/tunnel_state_machine/mod.rs
+++ b/mullvad-daemon/src/tunnel_state_machine/mod.rs
@@ -13,11 +13,12 @@ use std::thread;
use error_chain::ChainedError;
use futures::sync::{mpsc, oneshot};
-use futures::{Async, Future, Poll};
+use futures::{Async, Poll, Stream};
use tokio_core::reactor::Core;
use mullvad_types::account::AccountToken;
-use talpid_core::tunnel::{self, TunnelMonitor};
+use talpid_core::mpsc::IntoSender;
+use talpid_core::tunnel::{self, TunnelMetadata, TunnelMonitor};
use talpid_types::net::{TunnelEndpoint, TunnelOptions};
use self::connected_state::{ConnectedState, ConnectedStateBootstrap};
@@ -29,11 +30,16 @@ use super::{OPENVPN_LOG_FILENAME, WIREGUARD_LOG_FILENAME};
error_chain!{}
/// Spawn the tunnel state machine thread, returning a channel for sending tunnel commands.
-pub fn spawn() -> mpsc::UnboundedSender<TunnelCommand> {
+pub fn spawn<T>(
+ state_change_listener: IntoSender<TunnelStateTransition, T>,
+) -> mpsc::UnboundedSender<TunnelCommand>
+where
+ T: From<TunnelStateTransition> + Send + 'static,
+{
let (command_tx, command_rx) = mpsc::unbounded();
thread::spawn(move || {
- if let Err(error) = event_loop(command_rx) {
+ if let Err(error) = event_loop(command_rx, state_change_listener) {
error!("{}", error.display_chain());
}
});
@@ -41,15 +47,24 @@ pub fn spawn() -> mpsc::UnboundedSender<TunnelCommand> {
command_tx
}
-fn event_loop(commands: mpsc::UnboundedReceiver<TunnelCommand>) -> Result<()> {
+fn event_loop<T>(
+ commands: mpsc::UnboundedReceiver<TunnelCommand>,
+ state_change_listener: IntoSender<TunnelStateTransition, T>,
+) -> Result<()>
+where
+ T: From<TunnelStateTransition> + Send + 'static,
+{
let mut reactor =
Core::new().chain_err(|| "Failed to initialize tunnel state machine event loop")?;
let state_machine = TunnelStateMachine::new(commands);
reactor
- .run(state_machine)
- .chain_err(|| "Tunnel state machine finished with an error")
+ .run(state_machine.for_each(|state_change_event| {
+ state_change_listener
+ .send(state_change_event)
+ .chain_err(|| "Failed to send state change event to listener")
+ })).chain_err(|| "Tunnel state machine finished with an error")
}
/// Representation of external commands for the tunnel state machine.
@@ -69,11 +84,21 @@ pub struct TunnelParameters {
pub account_token: AccountToken,
}
+/// Event resulting from a transition to a new tunnel state.
+#[derive(Clone, Debug, PartialEq)]
+pub enum TunnelStateTransition {
+ Disconnected,
+ Connecting(TunnelEndpoint),
+ Connected(TunnelEndpoint, TunnelMetadata),
+ Disconnecting,
+}
+
/// Asynchronous handling of the tunnel state machine.
///
-/// This type implements `Future`, and attempts to advance the state machine based on the events
+/// This type implements `Stream`, and attempts to advance the state machine based on the events
/// received on the commands stream and possibly on events that specific states are also listening
-/// to.
+/// to. Every time it successfully advances the state machine a `TunnelStateTransition` is emitted
+/// by the stream.
struct TunnelStateMachine {
current_state: Option<TunnelStateWrapper>,
commands: mpsc::UnboundedReceiver<TunnelCommand>,
@@ -91,27 +116,42 @@ impl TunnelStateMachine {
}
}
-impl Future for TunnelStateMachine {
- type Item = ();
+impl Stream for TunnelStateMachine {
+ type Item = TunnelStateTransition;
type Error = Error;
- fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
+ fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
+ use self::EventConsequence::*;
+
let mut state = self
.current_state
.take()
.expect("State machine lost track of its state!");
- let mut event_was_received = true;
+ let mut result = Ok(Async::Ready(None));
+ let mut event_was_ignored = true;
- while event_was_received {
+ while event_was_ignored {
let transition = state.handle_event(&mut self.commands);
- event_was_received = transition.is_because_of_an_event();
+ event_was_ignored = match transition {
+ SameState(_) => true,
+ NewState(_) | NoEvents(_) => false,
+ };
+
+ result = match transition {
+ NewState(Ok(ref state)) | NewState(Err((_, ref state))) => {
+ Ok(Async::Ready(Some(state.info())))
+ }
+ SameState(_) => result,
+ NoEvents(_) => Ok(Async::NotReady),
+ };
+
state = transition.into_wrapped_tunnel_state();
}
self.current_state = Some(state);
- Ok(Async::NotReady)
+ result
}
}
@@ -129,16 +169,6 @@ impl<T> EventConsequence<T>
where
T: TunnelState,
{
- /// Checks if this transition happened after an event was received.
- pub fn is_because_of_an_event(&self) -> bool {
- use self::EventConsequence::*;
-
- match self {
- NewState(_) | SameState(_) => true,
- NoEvents(_) => false,
- }
- }
-
/// Helper method to chain handling multiple different event types.
///
/// The `handle_event` is only called if no events were handled so far.
@@ -215,6 +245,18 @@ enum TunnelStateWrapper {
Disconnecting(DisconnectingState),
}
+impl TunnelStateWrapper {
+ /// Returns information describing the state.
+ fn info(&self) -> TunnelStateTransition {
+ match *self {
+ TunnelStateWrapper::Disconnected(_) => TunnelStateTransition::Disconnected,
+ TunnelStateWrapper::Connecting(ref state) => state.info(),
+ TunnelStateWrapper::Connected(ref state) => state.info(),
+ TunnelStateWrapper::Disconnecting(_) => TunnelStateTransition::Disconnecting,
+ }
+ }
+}
+
macro_rules! impl_from_for_tunnel_state {
($state_variant:ident($state_type:ident)) => {
impl From<$state_type> for TunnelStateWrapper {