summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorDavid Lönnhager <david.l@mullvad.net>2022-06-08 12:05:58 +0200
committerDavid Lönnhager <david.l@mullvad.net>2022-06-14 13:59:50 +0200
commit15346dbc719a8b0c70720235ea23e117342c7708 (patch)
treeafdd5fbd07ff96ebcff22923fe50bf3a8d972fa1
parentb1537ae79f9d0bc2e038c744e6a55b2e565bfc75 (diff)
downloadmullvadvpn-15346dbc719a8b0c70720235ea23e117342c7708.tar.xz
mullvadvpn-15346dbc719a8b0c70720235ea23e117342c7708.zip
Add tunnel state machine handle
-rw-r--r--mullvad-daemon/src/lib.rs20
-rw-r--r--talpid-core/src/tunnel_state_machine/mod.rs21
2 files changed, 26 insertions, 15 deletions
diff --git a/mullvad-daemon/src/lib.rs b/mullvad-daemon/src/lib.rs
index 8dfcf67ace..b6a7d55955 100644
--- a/mullvad-daemon/src/lib.rs
+++ b/mullvad-daemon/src/lib.rs
@@ -66,7 +66,7 @@ use std::{
use talpid_core::split_tunnel;
use talpid_core::{
mpsc::Sender,
- tunnel_state_machine::{self, TunnelCommand},
+ tunnel_state_machine::{self, TunnelCommand, TunnelStateMachineHandle},
};
#[cfg(target_os = "android")]
use talpid_types::android::AndroidContext;
@@ -506,7 +506,6 @@ pub trait EventListener {
}
pub struct Daemon<L: EventListener> {
- tunnel_command_tx: Arc<mpsc::UnboundedSender<TunnelCommand>>,
tunnel_state: TunnelState,
target_state: PersistentTargetState,
state: DaemonExecutionState,
@@ -529,7 +528,7 @@ pub struct Daemon<L: EventListener> {
parameters_generator: tunnel::ParametersGenerator,
app_version_info: Option<AppVersionInfo>,
shutdown_tasks: Vec<Pin<Box<dyn Future<Output = ()>>>>,
- tunnel_state_machine_handle: tunnel_state_machine::JoinHandle,
+ tunnel_state_machine_handle: TunnelStateMachineHandle,
#[cfg(target_os = "windows")]
volume_update_tx: mpsc::UnboundedSender<()>,
}
@@ -650,7 +649,7 @@ where
let (offline_state_tx, offline_state_rx) = mpsc::unbounded();
#[cfg(target_os = "windows")]
let (volume_update_tx, volume_update_rx) = mpsc::unbounded();
- let (tunnel_command_tx, tunnel_state_machine_handle) = tunnel_state_machine::spawn(
+ let tunnel_state_machine_handle = tunnel_state_machine::spawn(
tunnel_state_machine::InitialTunnelState {
allow_lan: settings.allow_lan,
block_when_disconnected: settings.block_when_disconnected,
@@ -675,7 +674,8 @@ where
.await
.map_err(Error::TunnelError)?;
- endpoint_updater.set_tunnel_command_tx(Arc::downgrade(&tunnel_command_tx));
+ endpoint_updater
+ .set_tunnel_command_tx(Arc::downgrade(tunnel_state_machine_handle.command_tx()));
api::forward_offline_state(api_availability.clone(), offline_state_rx);
@@ -706,7 +706,6 @@ where
relay_list_updater.update().await;
let daemon = Daemon {
- tunnel_command_tx,
tunnel_state: TunnelState::Disconnected,
target_state,
state: DaemonExecutionState::Running,
@@ -792,7 +791,7 @@ where
L,
Vec<Pin<Box<dyn Future<Output = ()>>>>,
mullvad_api::Runtime,
- tunnel_state_machine::JoinHandle,
+ TunnelStateMachineHandle,
) {
let Daemon {
event_listener,
@@ -918,12 +917,12 @@ where
fn schedule_reconnect(&mut self, delay: Duration) {
self.unschedule_reconnect();
- let tunnel_command_tx = self.tx.to_specialized_sender();
+ let daemon_command_tx = self.tx.to_specialized_sender();
let (future, abort_handle) = abortable(Box::pin(async move {
tokio::time::sleep(delay).await;
log::debug!("Attempting to reconnect");
let (tx, rx) = oneshot::channel();
- let _ = tunnel_command_tx.send(DaemonCommand::Reconnect(tx));
+ let _ = daemon_command_tx.send(DaemonCommand::Reconnect(tx));
// suppress "unable to send" warning:
let _ = rx.await;
}));
@@ -2212,7 +2211,8 @@ where
}
fn send_tunnel_command(&self, command: TunnelCommand) {
- self.tunnel_command_tx
+ self.tunnel_state_machine_handle
+ .command_tx()
.unbounded_send(command)
.expect("Tunnel state machine has stopped");
}
diff --git a/talpid-core/src/tunnel_state_machine/mod.rs b/talpid-core/src/tunnel_state_machine/mod.rs
index 42d20181a4..16d6e6d55b 100644
--- a/talpid-core/src/tunnel_state_machine/mod.rs
+++ b/talpid-core/src/tunnel_state_machine/mod.rs
@@ -116,7 +116,7 @@ pub async fn spawn(
#[cfg(target_os = "windows")] volume_update_rx: mpsc::UnboundedReceiver<()>,
#[cfg(target_os = "macos")] exclusion_gid: u32,
#[cfg(target_os = "android")] android_context: AndroidContext,
-) -> Result<(Arc<mpsc::UnboundedSender<TunnelCommand>>, JoinHandle), Error> {
+) -> Result<TunnelStateMachineHandle, Error> {
let (command_tx, command_rx) = mpsc::unbounded();
let command_tx = Arc::new(command_tx);
@@ -157,7 +157,10 @@ pub async fn spawn(
}
});
- Ok((command_tx, JoinHandle { shutdown_rx }))
+ Ok(TunnelStateMachineHandle {
+ command_tx,
+ shutdown_rx,
+ })
}
/// Representation of external commands for the tunnel state machine.
@@ -596,18 +599,26 @@ state_wrapper! {
}
}
-/// Handle used to wait for the tunnel state machine to shut down.
-pub struct JoinHandle {
+/// Handle used to control the tunnel state machine.
+pub struct TunnelStateMachineHandle {
+ command_tx: Arc<mpsc::UnboundedSender<TunnelCommand>>,
shutdown_rx: oneshot::Receiver<()>,
}
-impl JoinHandle {
+impl TunnelStateMachineHandle {
/// Waits for the tunnel state machine to shut down.
/// This may fail after a timeout of `TUNNEL_STATE_MACHINE_SHUTDOWN_TIMEOUT`.
pub async fn try_join(self) {
+ drop(self.command_tx);
+
match tokio::time::timeout(TUNNEL_STATE_MACHINE_SHUTDOWN_TIMEOUT, self.shutdown_rx).await {
Ok(_) => log::info!("Tunnel state machine shut down"),
Err(_) => log::error!("Tunnel state machine did not shut down gracefully"),
}
}
+
+ /// Returns tunnel command sender.
+ pub fn command_tx(&self) -> &Arc<mpsc::UnboundedSender<TunnelCommand>> {
+ &self.command_tx
+ }
}