summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorDavid Lönnhager <david.l@mullvad.net>2020-08-15 02:48:03 +0200
committerDavid Lönnhager <david.l@mullvad.net>2020-09-01 14:15:49 +0200
commitaa435ecc967581ca021153456a0ba96a2483470b (patch)
tree365f20ffa678da2d48b99430fed8576b0348390c
parentc2fd9b42b65821891fbdde2244d6685079808282 (diff)
downloadmullvadvpn-aa435ecc967581ca021153456a0ba96a2483470b.tar.xz
mullvadvpn-aa435ecc967581ca021153456a0ba96a2483470b.zip
Use new-type futures for daemon-TSM boundary and the offline monitor
-rw-r--r--mullvad-daemon/src/lib.rs113
-rw-r--r--talpid-core/src/offline/android.rs2
-rw-r--r--talpid-core/src/offline/linux.rs3
-rw-r--r--talpid-core/src/offline/macos.rs2
-rw-r--r--talpid-core/src/offline/mod.rs2
-rw-r--r--talpid-core/src/offline/windows.rs2
-rw-r--r--talpid-core/src/tunnel_state_machine/mod.rs33
7 files changed, 88 insertions, 69 deletions
diff --git a/mullvad-daemon/src/lib.rs b/mullvad-daemon/src/lib.rs
index e6c09753a8..cf210cde1c 100644
--- a/mullvad-daemon/src/lib.rs
+++ b/mullvad-daemon/src/lib.rs
@@ -19,18 +19,11 @@ pub mod version;
mod version_check;
use futures::{
- channel::{
- mpsc::{UnboundedReceiver, UnboundedSender},
- oneshot,
- },
+ channel::{mpsc, oneshot},
executor::BlockingStream,
future::{abortable, AbortHandle},
};
-use futures01::{
- future,
- sync::{mpsc as old_mpsc, oneshot as old_oneshot},
- Future,
-};
+use futures01::{future, Future};
use log::{debug, error, info, warn};
use mullvad_rpc::AccountsProxy;
use mullvad_types::{
@@ -56,7 +49,7 @@ use std::{
marker::PhantomData,
mem,
path::PathBuf,
- sync::{mpsc, Arc, Weak},
+ sync::{mpsc as sync_mpsc, Arc, Weak},
time::Duration,
};
#[cfg(target_os = "linux")]
@@ -254,7 +247,7 @@ pub(crate) enum InternalDaemonEvent {
TunnelStateTransition(TunnelStateTransition),
/// Request from the `MullvadTunnelParametersGenerator` to obtain a new relay.
GenerateTunnelParameters(
- mpsc::Sender<Result<TunnelParameters, ParameterGenerationError>>,
+ sync_mpsc::Sender<Result<TunnelParameters, ParameterGenerationError>>,
u32,
),
/// A command sent to the daemon.
@@ -340,12 +333,12 @@ impl DaemonExecutionState {
pub struct DaemonCommandChannel {
sender: DaemonCommandSender,
- receiver: UnboundedReceiver<InternalDaemonEvent>,
+ receiver: mpsc::UnboundedReceiver<InternalDaemonEvent>,
}
impl DaemonCommandChannel {
pub fn new() -> Self {
- let (untracked_sender, receiver) = futures::channel::mpsc::unbounded();
+ let (untracked_sender, receiver) = mpsc::unbounded();
let sender = DaemonCommandSender(Arc::new(untracked_sender));
Self { sender, receiver }
@@ -355,7 +348,12 @@ impl DaemonCommandChannel {
self.sender.clone()
}
- fn destructure(self) -> (DaemonEventSender, UnboundedReceiver<InternalDaemonEvent>) {
+ fn destructure(
+ self,
+ ) -> (
+ DaemonEventSender,
+ mpsc::UnboundedReceiver<InternalDaemonEvent>,
+ ) {
let event_sender = DaemonEventSender::new(Arc::downgrade(&self.sender.0));
(event_sender, self.receiver)
@@ -363,7 +361,7 @@ impl DaemonCommandChannel {
}
#[derive(Clone)]
-pub struct DaemonCommandSender(Arc<UnboundedSender<InternalDaemonEvent>>);
+pub struct DaemonCommandSender(Arc<mpsc::UnboundedSender<InternalDaemonEvent>>);
impl DaemonCommandSender {
pub fn send(&self, command: DaemonCommand) -> Result<(), Error> {
@@ -374,7 +372,7 @@ impl DaemonCommandSender {
}
pub(crate) struct DaemonEventSender<E = InternalDaemonEvent> {
- sender: Weak<UnboundedSender<InternalDaemonEvent>>,
+ sender: Weak<mpsc::UnboundedSender<InternalDaemonEvent>>,
_event: PhantomData<E>,
}
@@ -391,7 +389,7 @@ where
}
impl DaemonEventSender {
- pub fn new(sender: Weak<UnboundedSender<InternalDaemonEvent>>) -> Self {
+ pub fn new(sender: Weak<mpsc::UnboundedSender<InternalDaemonEvent>>) -> Self {
DaemonEventSender {
sender,
_event: PhantomData,
@@ -456,13 +454,13 @@ pub trait EventListener {
}
pub struct Daemon<L: EventListener> {
- tunnel_command_tx: Arc<old_mpsc::UnboundedSender<TunnelCommand>>,
+ tunnel_command_tx: Arc<mpsc::UnboundedSender<TunnelCommand>>,
tunnel_state: TunnelState,
target_state: TargetState,
state: DaemonExecutionState,
#[cfg(target_os = "linux")]
exclude_pids: split_tunnel::PidManager,
- rx: BlockingStream<UnboundedReceiver<InternalDaemonEvent>>,
+ rx: BlockingStream<mpsc::UnboundedReceiver<InternalDaemonEvent>>,
tx: DaemonEventSender,
reconnection_job: Option<AbortHandle>,
event_listener: L,
@@ -479,7 +477,7 @@ pub struct Daemon<L: EventListener> {
app_version_info: AppVersionInfo,
shutdown_callbacks: Vec<Box<dyn FnOnce()>>,
/// oneshot channel that completes once the tunnel state machine has been shut down
- tunnel_state_machine_shutdown_signal: old_oneshot::Receiver<()>,
+ tunnel_state_machine_shutdown_signal: oneshot::Receiver<()>,
cache_dir: PathBuf,
}
@@ -497,7 +495,7 @@ where
#[cfg(target_os = "android")] android_context: AndroidContext,
) -> Result<Self, Error> {
let (tunnel_state_machine_shutdown_tx, tunnel_state_machine_shutdown_signal) =
- old_oneshot::channel();
+ oneshot::channel();
let mut rpc_runtime = mullvad_rpc::MullvadRpcRuntime::with_cache_dir(&cache_dir)
.map_err(Error::InitRpcFactory)?;
@@ -565,19 +563,21 @@ where
let tunnel_parameters_generator = MullvadTunnelParametersGenerator {
tx: internal_event_tx.clone(),
};
- let tunnel_command_tx = tunnel_state_machine::spawn(
- settings.allow_lan,
- settings.block_when_disconnected,
- tunnel_parameters_generator,
- log_dir,
- resource_dir,
- cache_dir.clone(),
- internal_event_tx.to_specialized_sender(),
- tunnel_state_machine_shutdown_tx,
- #[cfg(target_os = "android")]
- android_context,
- )
- .map_err(Error::TunnelError)?;
+ let tunnel_command_tx = rpc_runtime
+ .runtime()
+ .block_on(tunnel_state_machine::spawn(
+ settings.allow_lan,
+ settings.block_when_disconnected,
+ tunnel_parameters_generator,
+ log_dir,
+ resource_dir,
+ cache_dir.clone(),
+ internal_event_tx.to_specialized_sender(),
+ tunnel_state_machine_shutdown_tx,
+ #[cfg(target_os = "android")]
+ android_context,
+ ))
+ .map_err(Error::TunnelError)?;
let wireguard_key_manager =
wireguard::KeyManager::new(internal_event_tx.clone(), rpc_handle.clone());
@@ -661,44 +661,51 @@ where
}
fn finalize(self) {
- let (event_listener, shutdown_callbacks, tunnel_state_machine_shutdown_signal) =
- self.shutdown();
+ let (
+ event_listener,
+ shutdown_callbacks,
+ mut rpc_runtime,
+ tunnel_state_machine_shutdown_signal,
+ ) = self.shutdown();
for cb in shutdown_callbacks {
cb();
}
- let state_machine_shutdown = tokio_timer::Timer::default().timeout(
- // the oneshot::Canceled error type does not play well with the timer error, as such,
- // it has to be cast away.
- tunnel_state_machine_shutdown_signal.map_err(|_| {
- log::error!("Tunnel state machine already shut down");
- }),
+ let shutdown_signal = tokio02::time::timeout(
TUNNEL_STATE_MACHINE_SHUTDOWN_TIMEOUT,
+ tunnel_state_machine_shutdown_signal,
);
- match state_machine_shutdown.wait() {
- Ok(_) => {
- log::info!("Tunnel state machine shut down");
- }
- Err(_) => {
- log::error!("Tunnel state machine did not shut down in time, shutting down anyway");
- }
+ match rpc_runtime.runtime().block_on(shutdown_signal) {
+ Ok(_) => log::info!("Tunnel state machine shut down"),
+ Err(_) => log::error!("Tunnel state machine did not shut down gracefully"),
}
+ mem::drop(rpc_runtime);
+
mem::drop(event_listener);
}
/// Shuts down the daemon without shutting down the underlying event listener and the shutdown
/// callbacks
- fn shutdown(self) -> (L, Vec<Box<dyn FnOnce()>>, old_oneshot::Receiver<()>) {
+ fn shutdown(
+ self,
+ ) -> (
+ L,
+ Vec<Box<dyn FnOnce()>>,
+ mullvad_rpc::MullvadRpcRuntime,
+ oneshot::Receiver<()>,
+ ) {
let Daemon {
event_listener,
shutdown_callbacks,
+ rpc_runtime,
tunnel_state_machine_shutdown_signal,
..
} = self;
(
event_listener,
shutdown_callbacks,
+ rpc_runtime,
tunnel_state_machine_shutdown_signal,
)
}
@@ -781,7 +788,9 @@ where
fn handle_generate_tunnel_parameters(
&mut self,
- tunnel_parameters_tx: &mpsc::Sender<Result<TunnelParameters, ParameterGenerationError>>,
+ tunnel_parameters_tx: &sync_mpsc::Sender<
+ Result<TunnelParameters, ParameterGenerationError>,
+ >,
retry_attempt: u32,
) {
if let Some(account_token) = self.settings.get_account_token() {
@@ -1985,7 +1994,7 @@ impl TunnelParametersGenerator for MullvadTunnelParametersGenerator {
&mut self,
retry_attempt: u32,
) -> Result<TunnelParameters, ParameterGenerationError> {
- let (response_tx, response_rx) = mpsc::channel();
+ let (response_tx, response_rx) = sync_mpsc::channel();
if self
.tx
.send(InternalDaemonEvent::GenerateTunnelParameters(
diff --git a/talpid-core/src/offline/android.rs b/talpid-core/src/offline/android.rs
index 3863415cfb..7135ac339d 100644
--- a/talpid-core/src/offline/android.rs
+++ b/talpid-core/src/offline/android.rs
@@ -1,5 +1,5 @@
use crate::tunnel_state_machine::TunnelCommand;
-use futures01::sync::mpsc::UnboundedSender;
+use futures::channel::mpsc::UnboundedSender;
use jnix::{
jni::{
self,
diff --git a/talpid-core/src/offline/linux.rs b/talpid-core/src/offline/linux.rs
index 0b6526f0e5..435bc2193d 100644
--- a/talpid-core/src/offline/linux.rs
+++ b/talpid-core/src/offline/linux.rs
@@ -1,6 +1,5 @@
use crate::tunnel_state_machine::TunnelCommand;
-use futures::{StreamExt, TryStreamExt};
-use futures01::sync::mpsc::UnboundedSender;
+use futures::{channel::mpsc::UnboundedSender, StreamExt, TryStreamExt};
use netlink_packet_route::{
constants::{ARPHRD_LOOPBACK, ARPHRD_NONE, IFF_LOWER_UP, IFF_UP},
rtnl::link::nlas::{Info as LinkInfo, InfoKind, Nla as LinkNla},
diff --git a/talpid-core/src/offline/macos.rs b/talpid-core/src/offline/macos.rs
index 602da6cda9..82310d5f70 100644
--- a/talpid-core/src/offline/macos.rs
+++ b/talpid-core/src/offline/macos.rs
@@ -1,5 +1,5 @@
use crate::tunnel_state_machine::TunnelCommand;
-use futures01::sync::mpsc::UnboundedSender;
+use futures::channel::mpsc::UnboundedSender;
use std::{
net::{Ipv4Addr, SocketAddr},
sync::{
diff --git a/talpid-core/src/offline/mod.rs b/talpid-core/src/offline/mod.rs
index 5cda6290a3..baaa839780 100644
--- a/talpid-core/src/offline/mod.rs
+++ b/talpid-core/src/offline/mod.rs
@@ -1,5 +1,5 @@
use crate::tunnel_state_machine::TunnelCommand;
-use futures01::sync::mpsc::UnboundedSender;
+use futures::channel::mpsc::UnboundedSender;
use std::sync::Weak;
#[cfg(target_os = "android")]
use talpid_types::android::AndroidContext;
diff --git a/talpid-core/src/offline/windows.rs b/talpid-core/src/offline/windows.rs
index c7a86e4073..1563638bf6 100644
--- a/talpid-core/src/offline/windows.rs
+++ b/talpid-core/src/offline/windows.rs
@@ -1,5 +1,5 @@
use crate::{logging::windows::log_sink, tunnel_state_machine::TunnelCommand, winnet};
-use futures01::sync::mpsc::UnboundedSender;
+use futures::channel::mpsc::UnboundedSender;
use parking_lot::Mutex;
use std::{
ffi::c_void,
diff --git a/talpid-core/src/tunnel_state_machine/mod.rs b/talpid-core/src/tunnel_state_machine/mod.rs
index b0222ade67..6f5debdd52 100644
--- a/talpid-core/src/tunnel_state_machine/mod.rs
+++ b/talpid-core/src/tunnel_state_machine/mod.rs
@@ -23,10 +23,11 @@ use crate::{
tunnel::tun_provider::TunProvider,
};
-use futures01::{
- sync::{mpsc, oneshot},
- Async, Future, Poll, Stream,
+use futures::{
+ channel::{mpsc, oneshot},
+ StreamExt,
};
+use futures01::{sync::mpsc as old_mpsc, Async, Future, Poll, Stream};
use std::{
collections::HashSet,
io,
@@ -77,7 +78,7 @@ pub enum Error {
}
/// Spawn the tunnel state machine thread, returning a channel for sending tunnel commands.
-pub fn spawn(
+pub async fn spawn(
allow_lan: bool,
block_when_disconnected: bool,
tunnel_parameters_generator: impl TunnelParametersGenerator,
@@ -88,7 +89,7 @@ pub fn spawn(
shutdown_tx: oneshot::Sender<()>,
#[cfg(target_os = "android")] android_context: AndroidContext,
) -> Result<Arc<mpsc::UnboundedSender<TunnelCommand>>, Error> {
- let (command_tx, command_rx) = mpsc::unbounded();
+ let (command_tx, mut command_rx) = mpsc::unbounded();
let command_tx = Arc::new(command_tx);
let mut offline_monitor = offline::spawn_monitor(
Arc::downgrade(&command_tx),
@@ -105,6 +106,16 @@ pub fn spawn(
allow_lan,
);
+ // Hide internal 0.1 futures from the client
+ let (command_adapter_tx, command_adapter_rx) = old_mpsc::unbounded();
+ tokio02::spawn(async move {
+ while let Some(command) = command_rx.next().await {
+ if command_adapter_tx.unbounded_send(command).is_err() {
+ log::error!("Failed to forward daemon command");
+ }
+ }
+ });
+
let (startup_result_tx, startup_result_rx) = sync_mpsc::channel();
thread::spawn(move || {
match create_event_loop(
@@ -116,7 +127,7 @@ pub fn spawn(
log_dir,
resource_dir,
cache_dir,
- command_rx,
+ command_adapter_rx,
state_change_listener,
shutdown_tx,
) {
@@ -156,7 +167,7 @@ fn create_event_loop(
log_dir: Option<PathBuf>,
resource_dir: PathBuf,
cache_dir: impl AsRef<Path>,
- commands: mpsc::UnboundedReceiver<TunnelCommand>,
+ commands: old_mpsc::UnboundedReceiver<TunnelCommand>,
state_change_listener: impl Sender<TunnelStateTransition>,
shutdown_tx: oneshot::Sender<()>,
) -> Result<(Core, impl Future<Item = (), Error = Error>), Error> {
@@ -213,7 +224,7 @@ pub enum TunnelCommand {
/// by the stream.
struct TunnelStateMachine {
current_state: Option<TunnelStateWrapper>,
- commands: mpsc::UnboundedReceiver<TunnelCommand>,
+ commands: old_mpsc::UnboundedReceiver<TunnelCommand>,
shared_values: SharedTunnelStateValues,
}
@@ -227,7 +238,7 @@ impl TunnelStateMachine {
log_dir: Option<PathBuf>,
resource_dir: PathBuf,
cache_dir: impl AsRef<Path>,
- commands: mpsc::UnboundedReceiver<TunnelCommand>,
+ commands: old_mpsc::UnboundedReceiver<TunnelCommand>,
) -> Result<Self, Error> {
let args = if block_when_disconnected {
FirewallArguments {
@@ -432,7 +443,7 @@ trait TunnelState: Into<TunnelStateWrapper> + Sized {
/// [`EventConsequence`]: enum.EventConsequence.html
fn handle_event(
self,
- commands: &mut mpsc::UnboundedReceiver<TunnelCommand>,
+ commands: &mut old_mpsc::UnboundedReceiver<TunnelCommand>,
shared_values: &mut SharedTunnelStateValues,
) -> EventConsequence<Self>;
}
@@ -456,7 +467,7 @@ macro_rules! state_wrapper {
impl $wrapper_name {
fn handle_event(
self,
- commands: &mut mpsc::UnboundedReceiver<TunnelCommand>,
+ commands: &mut old_mpsc::UnboundedReceiver<TunnelCommand>,
shared_values: &mut SharedTunnelStateValues,
) -> TunnelStateMachineAction {
match self {