diff options
| author | David Lönnhager <david.l@mullvad.net> | 2020-08-15 02:48:03 +0200 |
|---|---|---|
| committer | David Lönnhager <david.l@mullvad.net> | 2020-09-01 14:15:49 +0200 |
| commit | aa435ecc967581ca021153456a0ba96a2483470b (patch) | |
| tree | 365f20ffa678da2d48b99430fed8576b0348390c | |
| parent | c2fd9b42b65821891fbdde2244d6685079808282 (diff) | |
| download | mullvadvpn-aa435ecc967581ca021153456a0ba96a2483470b.tar.xz mullvadvpn-aa435ecc967581ca021153456a0ba96a2483470b.zip | |
Use new-type futures for daemon-TSM boundary and the offline monitor
| -rw-r--r-- | mullvad-daemon/src/lib.rs | 113 | ||||
| -rw-r--r-- | talpid-core/src/offline/android.rs | 2 | ||||
| -rw-r--r-- | talpid-core/src/offline/linux.rs | 3 | ||||
| -rw-r--r-- | talpid-core/src/offline/macos.rs | 2 | ||||
| -rw-r--r-- | talpid-core/src/offline/mod.rs | 2 | ||||
| -rw-r--r-- | talpid-core/src/offline/windows.rs | 2 | ||||
| -rw-r--r-- | talpid-core/src/tunnel_state_machine/mod.rs | 33 |
7 files changed, 88 insertions, 69 deletions
diff --git a/mullvad-daemon/src/lib.rs b/mullvad-daemon/src/lib.rs index e6c09753a8..cf210cde1c 100644 --- a/mullvad-daemon/src/lib.rs +++ b/mullvad-daemon/src/lib.rs @@ -19,18 +19,11 @@ pub mod version; mod version_check; use futures::{ - channel::{ - mpsc::{UnboundedReceiver, UnboundedSender}, - oneshot, - }, + channel::{mpsc, oneshot}, executor::BlockingStream, future::{abortable, AbortHandle}, }; -use futures01::{ - future, - sync::{mpsc as old_mpsc, oneshot as old_oneshot}, - Future, -}; +use futures01::{future, Future}; use log::{debug, error, info, warn}; use mullvad_rpc::AccountsProxy; use mullvad_types::{ @@ -56,7 +49,7 @@ use std::{ marker::PhantomData, mem, path::PathBuf, - sync::{mpsc, Arc, Weak}, + sync::{mpsc as sync_mpsc, Arc, Weak}, time::Duration, }; #[cfg(target_os = "linux")] @@ -254,7 +247,7 @@ pub(crate) enum InternalDaemonEvent { TunnelStateTransition(TunnelStateTransition), /// Request from the `MullvadTunnelParametersGenerator` to obtain a new relay. GenerateTunnelParameters( - mpsc::Sender<Result<TunnelParameters, ParameterGenerationError>>, + sync_mpsc::Sender<Result<TunnelParameters, ParameterGenerationError>>, u32, ), /// A command sent to the daemon. @@ -340,12 +333,12 @@ impl DaemonExecutionState { pub struct DaemonCommandChannel { sender: DaemonCommandSender, - receiver: UnboundedReceiver<InternalDaemonEvent>, + receiver: mpsc::UnboundedReceiver<InternalDaemonEvent>, } impl DaemonCommandChannel { pub fn new() -> Self { - let (untracked_sender, receiver) = futures::channel::mpsc::unbounded(); + let (untracked_sender, receiver) = mpsc::unbounded(); let sender = DaemonCommandSender(Arc::new(untracked_sender)); Self { sender, receiver } @@ -355,7 +348,12 @@ impl DaemonCommandChannel { self.sender.clone() } - fn destructure(self) -> (DaemonEventSender, UnboundedReceiver<InternalDaemonEvent>) { + fn destructure( + self, + ) -> ( + DaemonEventSender, + mpsc::UnboundedReceiver<InternalDaemonEvent>, + ) { let event_sender = DaemonEventSender::new(Arc::downgrade(&self.sender.0)); (event_sender, self.receiver) @@ -363,7 +361,7 @@ impl DaemonCommandChannel { } #[derive(Clone)] -pub struct DaemonCommandSender(Arc<UnboundedSender<InternalDaemonEvent>>); +pub struct DaemonCommandSender(Arc<mpsc::UnboundedSender<InternalDaemonEvent>>); impl DaemonCommandSender { pub fn send(&self, command: DaemonCommand) -> Result<(), Error> { @@ -374,7 +372,7 @@ impl DaemonCommandSender { } pub(crate) struct DaemonEventSender<E = InternalDaemonEvent> { - sender: Weak<UnboundedSender<InternalDaemonEvent>>, + sender: Weak<mpsc::UnboundedSender<InternalDaemonEvent>>, _event: PhantomData<E>, } @@ -391,7 +389,7 @@ where } impl DaemonEventSender { - pub fn new(sender: Weak<UnboundedSender<InternalDaemonEvent>>) -> Self { + pub fn new(sender: Weak<mpsc::UnboundedSender<InternalDaemonEvent>>) -> Self { DaemonEventSender { sender, _event: PhantomData, @@ -456,13 +454,13 @@ pub trait EventListener { } pub struct Daemon<L: EventListener> { - tunnel_command_tx: Arc<old_mpsc::UnboundedSender<TunnelCommand>>, + tunnel_command_tx: Arc<mpsc::UnboundedSender<TunnelCommand>>, tunnel_state: TunnelState, target_state: TargetState, state: DaemonExecutionState, #[cfg(target_os = "linux")] exclude_pids: split_tunnel::PidManager, - rx: BlockingStream<UnboundedReceiver<InternalDaemonEvent>>, + rx: BlockingStream<mpsc::UnboundedReceiver<InternalDaemonEvent>>, tx: DaemonEventSender, reconnection_job: Option<AbortHandle>, event_listener: L, @@ -479,7 +477,7 @@ pub struct Daemon<L: EventListener> { app_version_info: AppVersionInfo, shutdown_callbacks: Vec<Box<dyn FnOnce()>>, /// oneshot channel that completes once the tunnel state machine has been shut down - tunnel_state_machine_shutdown_signal: old_oneshot::Receiver<()>, + tunnel_state_machine_shutdown_signal: oneshot::Receiver<()>, cache_dir: PathBuf, } @@ -497,7 +495,7 @@ where #[cfg(target_os = "android")] android_context: AndroidContext, ) -> Result<Self, Error> { let (tunnel_state_machine_shutdown_tx, tunnel_state_machine_shutdown_signal) = - old_oneshot::channel(); + oneshot::channel(); let mut rpc_runtime = mullvad_rpc::MullvadRpcRuntime::with_cache_dir(&cache_dir) .map_err(Error::InitRpcFactory)?; @@ -565,19 +563,21 @@ where let tunnel_parameters_generator = MullvadTunnelParametersGenerator { tx: internal_event_tx.clone(), }; - let tunnel_command_tx = tunnel_state_machine::spawn( - settings.allow_lan, - settings.block_when_disconnected, - tunnel_parameters_generator, - log_dir, - resource_dir, - cache_dir.clone(), - internal_event_tx.to_specialized_sender(), - tunnel_state_machine_shutdown_tx, - #[cfg(target_os = "android")] - android_context, - ) - .map_err(Error::TunnelError)?; + let tunnel_command_tx = rpc_runtime + .runtime() + .block_on(tunnel_state_machine::spawn( + settings.allow_lan, + settings.block_when_disconnected, + tunnel_parameters_generator, + log_dir, + resource_dir, + cache_dir.clone(), + internal_event_tx.to_specialized_sender(), + tunnel_state_machine_shutdown_tx, + #[cfg(target_os = "android")] + android_context, + )) + .map_err(Error::TunnelError)?; let wireguard_key_manager = wireguard::KeyManager::new(internal_event_tx.clone(), rpc_handle.clone()); @@ -661,44 +661,51 @@ where } fn finalize(self) { - let (event_listener, shutdown_callbacks, tunnel_state_machine_shutdown_signal) = - self.shutdown(); + let ( + event_listener, + shutdown_callbacks, + mut rpc_runtime, + tunnel_state_machine_shutdown_signal, + ) = self.shutdown(); for cb in shutdown_callbacks { cb(); } - let state_machine_shutdown = tokio_timer::Timer::default().timeout( - // the oneshot::Canceled error type does not play well with the timer error, as such, - // it has to be cast away. - tunnel_state_machine_shutdown_signal.map_err(|_| { - log::error!("Tunnel state machine already shut down"); - }), + let shutdown_signal = tokio02::time::timeout( TUNNEL_STATE_MACHINE_SHUTDOWN_TIMEOUT, + tunnel_state_machine_shutdown_signal, ); - match state_machine_shutdown.wait() { - Ok(_) => { - log::info!("Tunnel state machine shut down"); - } - Err(_) => { - log::error!("Tunnel state machine did not shut down in time, shutting down anyway"); - } + match rpc_runtime.runtime().block_on(shutdown_signal) { + Ok(_) => log::info!("Tunnel state machine shut down"), + Err(_) => log::error!("Tunnel state machine did not shut down gracefully"), } + mem::drop(rpc_runtime); + mem::drop(event_listener); } /// Shuts down the daemon without shutting down the underlying event listener and the shutdown /// callbacks - fn shutdown(self) -> (L, Vec<Box<dyn FnOnce()>>, old_oneshot::Receiver<()>) { + fn shutdown( + self, + ) -> ( + L, + Vec<Box<dyn FnOnce()>>, + mullvad_rpc::MullvadRpcRuntime, + oneshot::Receiver<()>, + ) { let Daemon { event_listener, shutdown_callbacks, + rpc_runtime, tunnel_state_machine_shutdown_signal, .. } = self; ( event_listener, shutdown_callbacks, + rpc_runtime, tunnel_state_machine_shutdown_signal, ) } @@ -781,7 +788,9 @@ where fn handle_generate_tunnel_parameters( &mut self, - tunnel_parameters_tx: &mpsc::Sender<Result<TunnelParameters, ParameterGenerationError>>, + tunnel_parameters_tx: &sync_mpsc::Sender< + Result<TunnelParameters, ParameterGenerationError>, + >, retry_attempt: u32, ) { if let Some(account_token) = self.settings.get_account_token() { @@ -1985,7 +1994,7 @@ impl TunnelParametersGenerator for MullvadTunnelParametersGenerator { &mut self, retry_attempt: u32, ) -> Result<TunnelParameters, ParameterGenerationError> { - let (response_tx, response_rx) = mpsc::channel(); + let (response_tx, response_rx) = sync_mpsc::channel(); if self .tx .send(InternalDaemonEvent::GenerateTunnelParameters( diff --git a/talpid-core/src/offline/android.rs b/talpid-core/src/offline/android.rs index 3863415cfb..7135ac339d 100644 --- a/talpid-core/src/offline/android.rs +++ b/talpid-core/src/offline/android.rs @@ -1,5 +1,5 @@ use crate::tunnel_state_machine::TunnelCommand; -use futures01::sync::mpsc::UnboundedSender; +use futures::channel::mpsc::UnboundedSender; use jnix::{ jni::{ self, diff --git a/talpid-core/src/offline/linux.rs b/talpid-core/src/offline/linux.rs index 0b6526f0e5..435bc2193d 100644 --- a/talpid-core/src/offline/linux.rs +++ b/talpid-core/src/offline/linux.rs @@ -1,6 +1,5 @@ use crate::tunnel_state_machine::TunnelCommand; -use futures::{StreamExt, TryStreamExt}; -use futures01::sync::mpsc::UnboundedSender; +use futures::{channel::mpsc::UnboundedSender, StreamExt, TryStreamExt}; use netlink_packet_route::{ constants::{ARPHRD_LOOPBACK, ARPHRD_NONE, IFF_LOWER_UP, IFF_UP}, rtnl::link::nlas::{Info as LinkInfo, InfoKind, Nla as LinkNla}, diff --git a/talpid-core/src/offline/macos.rs b/talpid-core/src/offline/macos.rs index 602da6cda9..82310d5f70 100644 --- a/talpid-core/src/offline/macos.rs +++ b/talpid-core/src/offline/macos.rs @@ -1,5 +1,5 @@ use crate::tunnel_state_machine::TunnelCommand; -use futures01::sync::mpsc::UnboundedSender; +use futures::channel::mpsc::UnboundedSender; use std::{ net::{Ipv4Addr, SocketAddr}, sync::{ diff --git a/talpid-core/src/offline/mod.rs b/talpid-core/src/offline/mod.rs index 5cda6290a3..baaa839780 100644 --- a/talpid-core/src/offline/mod.rs +++ b/talpid-core/src/offline/mod.rs @@ -1,5 +1,5 @@ use crate::tunnel_state_machine::TunnelCommand; -use futures01::sync::mpsc::UnboundedSender; +use futures::channel::mpsc::UnboundedSender; use std::sync::Weak; #[cfg(target_os = "android")] use talpid_types::android::AndroidContext; diff --git a/talpid-core/src/offline/windows.rs b/talpid-core/src/offline/windows.rs index c7a86e4073..1563638bf6 100644 --- a/talpid-core/src/offline/windows.rs +++ b/talpid-core/src/offline/windows.rs @@ -1,5 +1,5 @@ use crate::{logging::windows::log_sink, tunnel_state_machine::TunnelCommand, winnet}; -use futures01::sync::mpsc::UnboundedSender; +use futures::channel::mpsc::UnboundedSender; use parking_lot::Mutex; use std::{ ffi::c_void, diff --git a/talpid-core/src/tunnel_state_machine/mod.rs b/talpid-core/src/tunnel_state_machine/mod.rs index b0222ade67..6f5debdd52 100644 --- a/talpid-core/src/tunnel_state_machine/mod.rs +++ b/talpid-core/src/tunnel_state_machine/mod.rs @@ -23,10 +23,11 @@ use crate::{ tunnel::tun_provider::TunProvider, }; -use futures01::{ - sync::{mpsc, oneshot}, - Async, Future, Poll, Stream, +use futures::{ + channel::{mpsc, oneshot}, + StreamExt, }; +use futures01::{sync::mpsc as old_mpsc, Async, Future, Poll, Stream}; use std::{ collections::HashSet, io, @@ -77,7 +78,7 @@ pub enum Error { } /// Spawn the tunnel state machine thread, returning a channel for sending tunnel commands. -pub fn spawn( +pub async fn spawn( allow_lan: bool, block_when_disconnected: bool, tunnel_parameters_generator: impl TunnelParametersGenerator, @@ -88,7 +89,7 @@ pub fn spawn( shutdown_tx: oneshot::Sender<()>, #[cfg(target_os = "android")] android_context: AndroidContext, ) -> Result<Arc<mpsc::UnboundedSender<TunnelCommand>>, Error> { - let (command_tx, command_rx) = mpsc::unbounded(); + let (command_tx, mut command_rx) = mpsc::unbounded(); let command_tx = Arc::new(command_tx); let mut offline_monitor = offline::spawn_monitor( Arc::downgrade(&command_tx), @@ -105,6 +106,16 @@ pub fn spawn( allow_lan, ); + // Hide internal 0.1 futures from the client + let (command_adapter_tx, command_adapter_rx) = old_mpsc::unbounded(); + tokio02::spawn(async move { + while let Some(command) = command_rx.next().await { + if command_adapter_tx.unbounded_send(command).is_err() { + log::error!("Failed to forward daemon command"); + } + } + }); + let (startup_result_tx, startup_result_rx) = sync_mpsc::channel(); thread::spawn(move || { match create_event_loop( @@ -116,7 +127,7 @@ pub fn spawn( log_dir, resource_dir, cache_dir, - command_rx, + command_adapter_rx, state_change_listener, shutdown_tx, ) { @@ -156,7 +167,7 @@ fn create_event_loop( log_dir: Option<PathBuf>, resource_dir: PathBuf, cache_dir: impl AsRef<Path>, - commands: mpsc::UnboundedReceiver<TunnelCommand>, + commands: old_mpsc::UnboundedReceiver<TunnelCommand>, state_change_listener: impl Sender<TunnelStateTransition>, shutdown_tx: oneshot::Sender<()>, ) -> Result<(Core, impl Future<Item = (), Error = Error>), Error> { @@ -213,7 +224,7 @@ pub enum TunnelCommand { /// by the stream. struct TunnelStateMachine { current_state: Option<TunnelStateWrapper>, - commands: mpsc::UnboundedReceiver<TunnelCommand>, + commands: old_mpsc::UnboundedReceiver<TunnelCommand>, shared_values: SharedTunnelStateValues, } @@ -227,7 +238,7 @@ impl TunnelStateMachine { log_dir: Option<PathBuf>, resource_dir: PathBuf, cache_dir: impl AsRef<Path>, - commands: mpsc::UnboundedReceiver<TunnelCommand>, + commands: old_mpsc::UnboundedReceiver<TunnelCommand>, ) -> Result<Self, Error> { let args = if block_when_disconnected { FirewallArguments { @@ -432,7 +443,7 @@ trait TunnelState: Into<TunnelStateWrapper> + Sized { /// [`EventConsequence`]: enum.EventConsequence.html fn handle_event( self, - commands: &mut mpsc::UnboundedReceiver<TunnelCommand>, + commands: &mut old_mpsc::UnboundedReceiver<TunnelCommand>, shared_values: &mut SharedTunnelStateValues, ) -> EventConsequence<Self>; } @@ -456,7 +467,7 @@ macro_rules! state_wrapper { impl $wrapper_name { fn handle_event( self, - commands: &mut mpsc::UnboundedReceiver<TunnelCommand>, + commands: &mut old_mpsc::UnboundedReceiver<TunnelCommand>, shared_values: &mut SharedTunnelStateValues, ) -> TunnelStateMachineAction { match self { |
