diff options
| author | David Lönnhager <david.l@mullvad.net> | 2022-06-08 12:05:58 +0200 |
|---|---|---|
| committer | David Lönnhager <david.l@mullvad.net> | 2022-06-14 13:59:50 +0200 |
| commit | 15346dbc719a8b0c70720235ea23e117342c7708 (patch) | |
| tree | afdd5fbd07ff96ebcff22923fe50bf3a8d972fa1 | |
| parent | b1537ae79f9d0bc2e038c744e6a55b2e565bfc75 (diff) | |
| download | mullvadvpn-15346dbc719a8b0c70720235ea23e117342c7708.tar.xz mullvadvpn-15346dbc719a8b0c70720235ea23e117342c7708.zip | |
Add tunnel state machine handle
| -rw-r--r-- | mullvad-daemon/src/lib.rs | 20 | ||||
| -rw-r--r-- | talpid-core/src/tunnel_state_machine/mod.rs | 21 |
2 files changed, 26 insertions, 15 deletions
diff --git a/mullvad-daemon/src/lib.rs b/mullvad-daemon/src/lib.rs index 8dfcf67ace..b6a7d55955 100644 --- a/mullvad-daemon/src/lib.rs +++ b/mullvad-daemon/src/lib.rs @@ -66,7 +66,7 @@ use std::{ use talpid_core::split_tunnel; use talpid_core::{ mpsc::Sender, - tunnel_state_machine::{self, TunnelCommand}, + tunnel_state_machine::{self, TunnelCommand, TunnelStateMachineHandle}, }; #[cfg(target_os = "android")] use talpid_types::android::AndroidContext; @@ -506,7 +506,6 @@ pub trait EventListener { } pub struct Daemon<L: EventListener> { - tunnel_command_tx: Arc<mpsc::UnboundedSender<TunnelCommand>>, tunnel_state: TunnelState, target_state: PersistentTargetState, state: DaemonExecutionState, @@ -529,7 +528,7 @@ pub struct Daemon<L: EventListener> { parameters_generator: tunnel::ParametersGenerator, app_version_info: Option<AppVersionInfo>, shutdown_tasks: Vec<Pin<Box<dyn Future<Output = ()>>>>, - tunnel_state_machine_handle: tunnel_state_machine::JoinHandle, + tunnel_state_machine_handle: TunnelStateMachineHandle, #[cfg(target_os = "windows")] volume_update_tx: mpsc::UnboundedSender<()>, } @@ -650,7 +649,7 @@ where let (offline_state_tx, offline_state_rx) = mpsc::unbounded(); #[cfg(target_os = "windows")] let (volume_update_tx, volume_update_rx) = mpsc::unbounded(); - let (tunnel_command_tx, tunnel_state_machine_handle) = tunnel_state_machine::spawn( + let tunnel_state_machine_handle = tunnel_state_machine::spawn( tunnel_state_machine::InitialTunnelState { allow_lan: settings.allow_lan, block_when_disconnected: settings.block_when_disconnected, @@ -675,7 +674,8 @@ where .await .map_err(Error::TunnelError)?; - endpoint_updater.set_tunnel_command_tx(Arc::downgrade(&tunnel_command_tx)); + endpoint_updater + .set_tunnel_command_tx(Arc::downgrade(tunnel_state_machine_handle.command_tx())); api::forward_offline_state(api_availability.clone(), offline_state_rx); @@ -706,7 +706,6 @@ where relay_list_updater.update().await; let daemon = Daemon { - tunnel_command_tx, tunnel_state: TunnelState::Disconnected, target_state, state: DaemonExecutionState::Running, @@ -792,7 +791,7 @@ where L, Vec<Pin<Box<dyn Future<Output = ()>>>>, mullvad_api::Runtime, - tunnel_state_machine::JoinHandle, + TunnelStateMachineHandle, ) { let Daemon { event_listener, @@ -918,12 +917,12 @@ where fn schedule_reconnect(&mut self, delay: Duration) { self.unschedule_reconnect(); - let tunnel_command_tx = self.tx.to_specialized_sender(); + let daemon_command_tx = self.tx.to_specialized_sender(); let (future, abort_handle) = abortable(Box::pin(async move { tokio::time::sleep(delay).await; log::debug!("Attempting to reconnect"); let (tx, rx) = oneshot::channel(); - let _ = tunnel_command_tx.send(DaemonCommand::Reconnect(tx)); + let _ = daemon_command_tx.send(DaemonCommand::Reconnect(tx)); // suppress "unable to send" warning: let _ = rx.await; })); @@ -2212,7 +2211,8 @@ where } fn send_tunnel_command(&self, command: TunnelCommand) { - self.tunnel_command_tx + self.tunnel_state_machine_handle + .command_tx() .unbounded_send(command) .expect("Tunnel state machine has stopped"); } diff --git a/talpid-core/src/tunnel_state_machine/mod.rs b/talpid-core/src/tunnel_state_machine/mod.rs index 42d20181a4..16d6e6d55b 100644 --- a/talpid-core/src/tunnel_state_machine/mod.rs +++ b/talpid-core/src/tunnel_state_machine/mod.rs @@ -116,7 +116,7 @@ pub async fn spawn( #[cfg(target_os = "windows")] volume_update_rx: mpsc::UnboundedReceiver<()>, #[cfg(target_os = "macos")] exclusion_gid: u32, #[cfg(target_os = "android")] android_context: AndroidContext, -) -> Result<(Arc<mpsc::UnboundedSender<TunnelCommand>>, JoinHandle), Error> { +) -> Result<TunnelStateMachineHandle, Error> { let (command_tx, command_rx) = mpsc::unbounded(); let command_tx = Arc::new(command_tx); @@ -157,7 +157,10 @@ pub async fn spawn( } }); - Ok((command_tx, JoinHandle { shutdown_rx })) + Ok(TunnelStateMachineHandle { + command_tx, + shutdown_rx, + }) } /// Representation of external commands for the tunnel state machine. @@ -596,18 +599,26 @@ state_wrapper! { } } -/// Handle used to wait for the tunnel state machine to shut down. -pub struct JoinHandle { +/// Handle used to control the tunnel state machine. +pub struct TunnelStateMachineHandle { + command_tx: Arc<mpsc::UnboundedSender<TunnelCommand>>, shutdown_rx: oneshot::Receiver<()>, } -impl JoinHandle { +impl TunnelStateMachineHandle { /// Waits for the tunnel state machine to shut down. /// This may fail after a timeout of `TUNNEL_STATE_MACHINE_SHUTDOWN_TIMEOUT`. pub async fn try_join(self) { + drop(self.command_tx); + match tokio::time::timeout(TUNNEL_STATE_MACHINE_SHUTDOWN_TIMEOUT, self.shutdown_rx).await { Ok(_) => log::info!("Tunnel state machine shut down"), Err(_) => log::error!("Tunnel state machine did not shut down gracefully"), } } + + /// Returns tunnel command sender. + pub fn command_tx(&self) -> &Arc<mpsc::UnboundedSender<TunnelCommand>> { + &self.command_tx + } } |
