diff options
| author | Janito Vaqueiro Ferreira Filho <janito@mullvad.net> | 2018-08-20 09:28:45 -0300 |
|---|---|---|
| committer | Janito Vaqueiro Ferreira Filho <janito@mullvad.net> | 2018-08-23 15:00:04 -0300 |
| commit | 5abf70718d7386559fcbf7310eab764124d0ed61 (patch) | |
| tree | 61d267554b6348ad208bd92f7363da9d8e7af716 | |
| parent | 03e080066ca0e9bdbadb5cf192ff43efab1749ce (diff) | |
| download | mullvadvpn-5abf70718d7386559fcbf7310eab764124d0ed61.tar.xz mullvadvpn-5abf70718d7386559fcbf7310eab764124d0ed61.zip | |
Refactor tunnel monitoring by state machine
Remove `CloseHandle` and notify the tunnel that it should be closed when
entering the `DisconnectedState`. The `TunnelMonitor` thread is used to
notify when the tunnel has stopped, so all states should listen to the
`oneshot::Receiver` of the tunnel close event and handle it accordingly.
4 files changed, 86 insertions, 55 deletions
diff --git a/mullvad-daemon/src/tunnel_state_machine/connected_state.rs b/mullvad-daemon/src/tunnel_state_machine/connected_state.rs index f17eaf6ab8..398b6d7819 100644 --- a/mullvad-daemon/src/tunnel_state_machine/connected_state.rs +++ b/mullvad-daemon/src/tunnel_state_machine/connected_state.rs @@ -1,11 +1,11 @@ -use futures::sync::mpsc; -use futures::Stream; +use futures::sync::{mpsc, oneshot}; +use futures::{Async, Future, Stream}; -use talpid_core::tunnel::{TunnelEvent, TunnelMetadata}; +use talpid_core::tunnel::{CloseHandle, TunnelEvent, TunnelMetadata}; use talpid_types::net::TunnelEndpoint; use super::{ - AfterDisconnect, CloseHandle, DisconnectingState, EventConsequence, StateEntryResult, + AfterDisconnect, ConnectingState, DisconnectingState, EventConsequence, StateEntryResult, TunnelCommand, TunnelParameters, TunnelState, TunnelStateTransition, TunnelStateWrapper, }; @@ -14,6 +14,7 @@ pub struct ConnectedStateBootstrap { pub tunnel_events: mpsc::UnboundedReceiver<TunnelEvent>, pub tunnel_endpoint: TunnelEndpoint, pub tunnel_parameters: TunnelParameters, + pub tunnel_close_event: oneshot::Receiver<()>, pub close_handle: CloseHandle, } @@ -23,6 +24,7 @@ pub struct ConnectedState { tunnel_events: mpsc::UnboundedReceiver<TunnelEvent>, tunnel_endpoint: TunnelEndpoint, tunnel_parameters: TunnelParameters, + tunnel_close_event: oneshot::Receiver<()>, close_handle: CloseHandle, } @@ -33,6 +35,7 @@ impl ConnectedState { tunnel_events: bootstrap.tunnel_events, tunnel_endpoint: bootstrap.tunnel_endpoint, tunnel_parameters: bootstrap.tunnel_parameters, + tunnel_close_event: bootstrap.tunnel_close_event, close_handle: bootstrap.close_handle, } } @@ -51,7 +54,8 @@ impl ConnectedState { Ok(TunnelCommand::Connect(parameters)) => { if parameters != self.tunnel_parameters { NewState(DisconnectingState::enter(( - self.close_handle.close(), + self.close_handle, + self.tunnel_close_event, AfterDisconnect::Reconnect(parameters), ))) } else { @@ -59,7 +63,8 @@ impl ConnectedState { } } Ok(TunnelCommand::Disconnect) | Err(_) => NewState(DisconnectingState::enter(( - self.close_handle.close(), + self.close_handle, + self.tunnel_close_event, AfterDisconnect::Nothing, ))), } @@ -70,12 +75,26 @@ impl ConnectedState { match try_handle_event!(self, self.tunnel_events.poll()) { Ok(TunnelEvent::Down) | Err(_) => NewState(DisconnectingState::enter(( - self.close_handle.close(), + self.close_handle, + self.tunnel_close_event, AfterDisconnect::Reconnect(self.tunnel_parameters), ))), Ok(_) => SameState(self), } } + + fn handle_tunnel_close_event(mut self) -> EventConsequence<Self> { + use self::EventConsequence::*; + + match self.tunnel_close_event.poll() { + Ok(Async::Ready(_)) => {} + Ok(Async::NotReady) => return NoEvents(self), + Err(_cancelled) => warn!("Tunnel monitor thread has stopped unexpectedly"), + } + + info!("Tunnel closed. Reconnecting."); + NewState(ConnectingState::enter(self.tunnel_parameters)) + } } impl TunnelState for ConnectedState { @@ -91,5 +110,6 @@ impl TunnelState for ConnectedState { ) -> EventConsequence<Self> { self.handle_commands(commands) .or_else(Self::handle_tunnel_events) + .or_else(Self::handle_tunnel_close_event) } } diff --git a/mullvad-daemon/src/tunnel_state_machine/connecting_state.rs b/mullvad-daemon/src/tunnel_state_machine/connecting_state.rs index 9723e13728..e909a786f3 100644 --- a/mullvad-daemon/src/tunnel_state_machine/connecting_state.rs +++ b/mullvad-daemon/src/tunnel_state_machine/connecting_state.rs @@ -5,14 +5,14 @@ use std::thread; use std::time::{Duration, Instant}; use futures::sink::Wait; -use futures::sync::mpsc; -use futures::{Sink, Stream}; +use futures::sync::{mpsc, oneshot}; +use futures::{Async, Future, Sink, Stream}; -use talpid_core::tunnel::{TunnelEvent, TunnelMetadata, TunnelMonitor}; +use talpid_core::tunnel::{CloseHandle, TunnelEvent, TunnelMetadata, TunnelMonitor}; use talpid_types::net::{TunnelEndpoint, TunnelEndpointData}; use super::{ - AfterDisconnect, CloseHandle, ConnectedState, ConnectedStateBootstrap, DisconnectedState, + AfterDisconnect, ConnectedState, ConnectedStateBootstrap, DisconnectedState, DisconnectingState, EventConsequence, Result, ResultExt, StateEntryResult, TunnelCommand, TunnelParameters, TunnelState, TunnelStateTransition, TunnelStateWrapper, OPENVPN_LOG_FILENAME, WIREGUARD_LOG_FILENAME, @@ -28,26 +28,27 @@ const TUNNEL_INTERFACE_ALIAS: Option<&str> = None; /// The tunnel has been started, but it is not established/functional. pub struct ConnectingState { - close_handle: CloseHandle, tunnel_events: mpsc::UnboundedReceiver<TunnelEvent>, tunnel_endpoint: TunnelEndpoint, tunnel_parameters: TunnelParameters, + tunnel_close_event: oneshot::Receiver<()>, + close_handle: CloseHandle, } impl ConnectingState { fn new(parameters: TunnelParameters) -> Result<Self> { - let (event_tx, event_rx) = mpsc::unbounded(); let tunnel_endpoint = parameters.endpoint; + let (event_tx, event_rx) = mpsc::unbounded(); let monitor = Self::spawn_tunnel_monitor(¶meters, event_tx.wait())?; - let close_handle = CloseHandle::new(&monitor); - - Self::spawn_tunnel_monitor_wait_thread(monitor); + let close_handle = monitor.close_handle(); + let tunnel_close_event = Self::spawn_tunnel_monitor_wait_thread(monitor); Ok(ConnectingState { - close_handle, tunnel_events: event_rx, tunnel_endpoint, tunnel_parameters: parameters, + tunnel_close_event, + close_handle, }) } @@ -93,7 +94,9 @@ impl ConnectingState { } } - fn spawn_tunnel_monitor_wait_thread(tunnel_monitor: TunnelMonitor) { + fn spawn_tunnel_monitor_wait_thread(tunnel_monitor: TunnelMonitor) -> oneshot::Receiver<()> { + let (tunnel_close_event_tx, tunnel_close_event_rx) = oneshot::channel(); + thread::spawn(move || { let start = Instant::now(); @@ -109,8 +112,14 @@ impl ConnectingState { thread::sleep(remaining_time); } + if tunnel_close_event_tx.send(()).is_err() { + warn!("Tunnel state machine stopped before receiving tunnel closed event"); + } + trace!("Tunnel monitor thread exit"); }); + + tunnel_close_event_rx } fn into_connected_state_bootstrap(self, metadata: TunnelMetadata) -> ConnectedStateBootstrap { @@ -119,6 +128,7 @@ impl ConnectingState { tunnel_events: self.tunnel_events, tunnel_endpoint: self.tunnel_endpoint, tunnel_parameters: self.tunnel_parameters, + tunnel_close_event: self.tunnel_close_event, close_handle: self.close_handle, } } @@ -137,7 +147,8 @@ impl ConnectingState { Ok(TunnelCommand::Connect(parameters)) => { if parameters != self.tunnel_parameters { NewState(DisconnectingState::enter(( - self.close_handle.close(), + self.close_handle, + self.tunnel_close_event, AfterDisconnect::Reconnect(parameters), ))) } else { @@ -145,7 +156,8 @@ impl ConnectingState { } } Ok(TunnelCommand::Disconnect) | Err(_) => NewState(DisconnectingState::enter(( - self.close_handle.close(), + self.close_handle, + self.tunnel_close_event, AfterDisconnect::Nothing, ))), } @@ -160,11 +172,25 @@ impl ConnectingState { )), Ok(_) => SameState(self), Err(_) => NewState(DisconnectingState::enter(( - self.close_handle.close(), + self.close_handle, + self.tunnel_close_event, AfterDisconnect::Nothing, ))), } } + + fn handle_tunnel_close_event(mut self) -> EventConsequence<Self> { + use self::EventConsequence::*; + + match self.tunnel_close_event.poll() { + Ok(Async::Ready(_)) => {} + Ok(Async::NotReady) => return NoEvents(self), + Err(_cancelled) => warn!("Tunnel monitor thread has stopped unexpectedly"), + } + + info!("Tunnel closed. Reconnecting."); + NewState(ConnectingState::enter(self.tunnel_parameters)) + } } impl TunnelState for ConnectingState { @@ -189,5 +215,6 @@ impl TunnelState for ConnectingState { ) -> EventConsequence<Self> { self.handle_commands(commands) .or_else(Self::handle_tunnel_events) + .or_else(Self::handle_tunnel_close_event) } } diff --git a/mullvad-daemon/src/tunnel_state_machine/disconnecting_state.rs b/mullvad-daemon/src/tunnel_state_machine/disconnecting_state.rs index fd1899a44f..d291c5411d 100644 --- a/mullvad-daemon/src/tunnel_state_machine/disconnecting_state.rs +++ b/mullvad-daemon/src/tunnel_state_machine/disconnecting_state.rs @@ -1,17 +1,18 @@ -use std::io; - +use error_chain::ChainedError; use futures::sync::{mpsc, oneshot}; use futures::{Async, Future, Stream}; +use talpid_core::tunnel::CloseHandle; + use super::{ - ConnectingState, DisconnectedState, EventConsequence, StateEntryResult, TunnelCommand, - TunnelParameters, TunnelState, TunnelStateWrapper, + ConnectingState, DisconnectedState, EventConsequence, ResultExt, StateEntryResult, + TunnelCommand, TunnelParameters, TunnelState, TunnelStateWrapper, }; /// This state is active from when we manually trigger a tunnel kill until the tunnel wait /// operation (TunnelExit) returned. pub struct DisconnectingState { - exited: oneshot::Receiver<io::Result<()>>, + exited: oneshot::Receiver<()>, after_disconnect: AfterDisconnect, } @@ -50,9 +51,17 @@ impl DisconnectingState { } impl TunnelState for DisconnectingState { - type Bootstrap = (oneshot::Receiver<io::Result<()>>, AfterDisconnect); + type Bootstrap = (CloseHandle, oneshot::Receiver<()>, AfterDisconnect); + + fn enter((close_handle, exited, after_disconnect): Self::Bootstrap) -> StateEntryResult { + let close_result = close_handle + .close() + .chain_err(|| "Failed to request tunnel monitor to close the tunnel"); + + if let Err(error) = close_result { + error!("{}", error.display_chain()); + } - fn enter((exited, after_disconnect): Self::Bootstrap) -> StateEntryResult { Ok(TunnelStateWrapper::from(DisconnectingState { exited, after_disconnect, diff --git a/mullvad-daemon/src/tunnel_state_machine/mod.rs b/mullvad-daemon/src/tunnel_state_machine/mod.rs index 92cc064077..3afc42ae8b 100644 --- a/mullvad-daemon/src/tunnel_state_machine/mod.rs +++ b/mullvad-daemon/src/tunnel_state_machine/mod.rs @@ -7,18 +7,17 @@ mod disconnected_state; mod disconnecting_state; use std::fmt::{Debug, Formatter, Result as FmtResult}; -use std::io; use std::path::PathBuf; use std::thread; use error_chain::ChainedError; -use futures::sync::{mpsc, oneshot}; +use futures::sync::mpsc; use futures::{Async, Poll, Stream}; use tokio_core::reactor::Core; use mullvad_types::account::AccountToken; use talpid_core::mpsc::IntoSender; -use talpid_core::tunnel::{self, TunnelMetadata, TunnelMonitor}; +use talpid_core::tunnel::TunnelMetadata; use talpid_types::net::{TunnelEndpoint, TunnelOptions}; use self::connected_state::{ConnectedState, ConnectedStateBootstrap}; @@ -327,27 +326,3 @@ impl Debug for TunnelStateWrapper { } } } - -/// Internal handle to request tunnel to be closed. -pub struct CloseHandle { - tunnel_close_handle: tunnel::CloseHandle, -} - -impl CloseHandle { - fn new(tunnel_monitor: &TunnelMonitor) -> Self { - CloseHandle { - tunnel_close_handle: tunnel_monitor.close_handle(), - } - } - - fn close(self) -> oneshot::Receiver<io::Result<()>> { - let (close_tx, close_rx) = oneshot::channel(); - - thread::spawn(move || { - let _ = close_tx.send(self.tunnel_close_handle.close()); - trace!("Tunnel kill thread exit"); - }); - - close_rx - } -} |
