diff options
| author | David Lönnhager <david.l@mullvad.net> | 2019-11-26 14:27:35 +0100 |
|---|---|---|
| committer | David Lönnhager <david.l@mullvad.net> | 2019-11-26 14:27:35 +0100 |
| commit | 86fb4a15f385d500100b5978e7f5eb16ee750b43 (patch) | |
| tree | 4a5f6a8b1999a30726e91d59e89f8eb5123cc3ee | |
| parent | 7f075d9c74682e5efa36d3aa07f6e792596c6120 (diff) | |
| parent | 77e65562aa5b6868b002b64b06afafc64c49d987 (diff) | |
| download | mullvadvpn-86fb4a15f385d500100b5978e7f5eb16ee750b43.tar.xz mullvadvpn-86fb4a15f385d500100b5978e7f5eb16ee750b43.zip | |
Merge branch 'close-tunnel-thread-gracefully'
| -rw-r--r-- | mullvad-daemon/src/lib.rs | 21 | ||||
| -rw-r--r-- | talpid-core/src/offline/dummy.rs | 5 | ||||
| -rw-r--r-- | talpid-core/src/offline/linux.rs | 18 | ||||
| -rw-r--r-- | talpid-core/src/offline/macos.rs | 17 | ||||
| -rw-r--r-- | talpid-core/src/offline/mod.rs | 3 | ||||
| -rw-r--r-- | talpid-core/src/offline/windows.rs | 21 | ||||
| -rw-r--r-- | talpid-core/src/routing/windows.rs | 4 | ||||
| -rw-r--r-- | talpid-core/src/tunnel_state_machine/mod.rs | 11 | ||||
| -rw-r--r-- | talpid-core/src/winnet.rs | 8 |
9 files changed, 61 insertions, 47 deletions
diff --git a/mullvad-daemon/src/lib.rs b/mullvad-daemon/src/lib.rs index 0a2c65cb3b..35839d47f2 100644 --- a/mullvad-daemon/src/lib.rs +++ b/mullvad-daemon/src/lib.rs @@ -27,9 +27,10 @@ use crate::management_interface::{ BoxFuture, ManagementInterfaceEventBroadcaster, ManagementInterfaceServer, }; use futures::{ + executor, future::{self, Executor}, sync::{mpsc::UnboundedSender, oneshot}, - Future, Sink, + Future, }; use log::{debug, error, info, warn}; use mullvad_rpc::{AccountsProxy, HttpHandle, WireguardKeyProxy}; @@ -49,7 +50,13 @@ use mullvad_types::{ use settings::Settings; #[cfg(not(target_os = "android"))] use std::path::Path; -use std::{io, mem, path::PathBuf, sync::mpsc, thread, time::Duration}; +use std::{ + io, mem, + path::PathBuf, + sync::{mpsc, Arc}, + thread, + time::Duration, +}; use talpid_core::{ mpsc::IntoSender, tunnel::tun_provider::{PlatformTunProvider, TunProvider}, @@ -128,8 +135,6 @@ pub enum Error { ReadDirError(#[error(source)] io::Error), } -type SyncUnboundedSender<T> = ::futures::sink::Wait<UnboundedSender<T>>; - /// All events that can happen in the daemon. Sent from various threads and exposed interfaces. pub(crate) enum InternalDaemonEvent { /// Tunnel has changed state. @@ -248,7 +253,7 @@ pub trait EventListener { } pub struct Daemon<L: EventListener = ManagementInterfaceEventBroadcaster> { - tunnel_command_tx: SyncUnboundedSender<TunnelCommand>, + tunnel_command_tx: Arc<UnboundedSender<TunnelCommand>>, tunnel_state: TunnelState, target_state: TargetState, state: DaemonExecutionState, @@ -449,7 +454,7 @@ where relay_selector.update(); let mut daemon = Daemon { - tunnel_command_tx: Sink::wait(tunnel_command_tx), + tunnel_command_tx, tunnel_state: TunnelState::Disconnected, target_state: TargetState::Unsecured, state: DaemonExecutionState::Running, @@ -1491,8 +1496,8 @@ where } fn send_tunnel_command(&mut self, command: TunnelCommand) { - self.tunnel_command_tx - .send(command) + let mut sink = executor::spawn(Arc::make_mut(&mut self.tunnel_command_tx)); + sink.wait_send(command) .expect("Tunnel state machine has stopped"); } diff --git a/talpid-core/src/offline/dummy.rs b/talpid-core/src/offline/dummy.rs index 37eefb3c5e..7eda41d433 100644 --- a/talpid-core/src/offline/dummy.rs +++ b/talpid-core/src/offline/dummy.rs @@ -1,5 +1,6 @@ use crate::tunnel_state_machine::TunnelCommand; use futures::sync::mpsc::UnboundedSender; +use std::sync::Weak; #[derive(err_derive::Error, Debug)] #[error(display = "Dummy offline check error")] @@ -13,6 +14,8 @@ impl MonitorHandle { } } -pub fn spawn_monitor(_sender: UnboundedSender<TunnelCommand>) -> Result<MonitorHandle, Error> { +pub fn spawn_monitor( + _sender: Weak<UnboundedSender<TunnelCommand>>, +) -> Result<MonitorHandle, Error> { Ok(MonitorHandle) } diff --git a/talpid-core/src/offline/linux.rs b/talpid-core/src/offline/linux.rs index c38233d5e5..b75f187505 100644 --- a/talpid-core/src/offline/linux.rs +++ b/talpid-core/src/offline/linux.rs @@ -9,7 +9,7 @@ use rtnetlink::{ constants::{RTMGRP_IPV4_IFADDR, RTMGRP_IPV6_IFADDR, RTMGRP_LINK, RTMGRP_NOTIFY}, Connection, Handle, }; -use std::{collections::BTreeSet, io, thread}; +use std::{collections::BTreeSet, io, sync::Weak, thread}; use talpid_types::ErrorExt; pub type Result<T> = std::result::Result<T, Error>; @@ -38,7 +38,7 @@ pub enum Error { pub struct MonitorHandle; -pub fn spawn_monitor(sender: UnboundedSender<TunnelCommand>) -> Result<MonitorHandle> { +pub fn spawn_monitor(sender: Weak<UnboundedSender<TunnelCommand>>) -> Result<MonitorHandle> { let socket = SocketAddr::new( 0, RTMGRP_NOTIFY | RTMGRP_LINK | RTMGRP_IPV4_IFADDR | RTMGRP_IPV6_IFADDR, @@ -209,11 +209,11 @@ fn monitor_event_loop( struct LinkMonitor { is_offline: bool, - sender: UnboundedSender<TunnelCommand>, + sender: Weak<UnboundedSender<TunnelCommand>>, } impl LinkMonitor { - pub fn new(sender: UnboundedSender<TunnelCommand>) -> Self { + pub fn new(sender: Weak<UnboundedSender<TunnelCommand>>) -> Self { let is_offline = is_offline(); LinkMonitor { is_offline, sender } @@ -226,14 +226,16 @@ impl LinkMonitor { fn set_is_offline(&mut self, is_offline: bool) { if self.is_offline != is_offline { self.is_offline = is_offline; - let _ = self - .sender - .unbounded_send(TunnelCommand::IsOffline(is_offline)); + if let Some(sender) = self.sender.upgrade() { + let _ = sender.unbounded_send(TunnelCommand::IsOffline(is_offline)); + } } } /// Allow the offline check to fail open. fn reset(&mut self) { - let _ = self.sender.unbounded_send(TunnelCommand::IsOffline(false)); + if let Some(sender) = self.sender.upgrade() { + let _ = sender.unbounded_send(TunnelCommand::IsOffline(false)); + } } } diff --git a/talpid-core/src/offline/macos.rs b/talpid-core/src/offline/macos.rs index 51f13a21a1..ab9fee3b38 100644 --- a/talpid-core/src/offline/macos.rs +++ b/talpid-core/src/offline/macos.rs @@ -1,7 +1,10 @@ use crate::tunnel_state_machine::TunnelCommand; use futures::sync::mpsc::UnboundedSender; use log::{debug, trace}; -use std::{sync::mpsc, thread}; +use std::{ + sync::{mpsc, Weak}, + thread, +}; use system_configuration::{ core_foundation::{ array::CFArray, @@ -23,7 +26,7 @@ pub enum Error { pub struct MonitorHandle; -pub fn spawn_monitor(sender: UnboundedSender<TunnelCommand>) -> Result<MonitorHandle, Error> { +pub fn spawn_monitor(sender: Weak<UnboundedSender<TunnelCommand>>) -> Result<MonitorHandle, Error> { let (result_tx, result_rx) = mpsc::channel(); thread::spawn(move || match create_dynamic_store(sender) { Ok(store) => { @@ -44,7 +47,9 @@ impl MonitorHandle { } } -fn create_dynamic_store(sender: UnboundedSender<TunnelCommand>) -> Result<SCDynamicStore, Error> { +fn create_dynamic_store( + sender: Weak<UnboundedSender<TunnelCommand>>, +) -> Result<SCDynamicStore, Error> { let callback_context = SCDynamicStoreCallBackContext { callout: primary_interface_change_callback, info: sender, @@ -76,12 +81,14 @@ fn run_dynamic_store_runloop(store: SCDynamicStore) { fn primary_interface_change_callback( store: SCDynamicStore, _changed_keys: CFArray<CFString>, - state: &mut UnboundedSender<TunnelCommand>, + state: &mut Weak<UnboundedSender<TunnelCommand>>, ) { let is_offline = store.get(CFString::new(PRIMARY_INTERFACE_KEY)).is_none(); debug!( "Computer went {}", if is_offline { "offline" } else { "online" } ); - let _ = state.unbounded_send(TunnelCommand::IsOffline(is_offline)); + if let Some(state) = state.upgrade() { + let _ = state.unbounded_send(TunnelCommand::IsOffline(is_offline)); + } } diff --git a/talpid-core/src/offline/mod.rs b/talpid-core/src/offline/mod.rs index 34db9453e7..2092da2f0a 100644 --- a/talpid-core/src/offline/mod.rs +++ b/talpid-core/src/offline/mod.rs @@ -1,5 +1,6 @@ use crate::tunnel_state_machine::TunnelCommand; use futures::sync::mpsc::UnboundedSender; +use std::sync::Weak; #[cfg(target_os = "macos")] #[path = "macos.rs"] @@ -27,6 +28,6 @@ impl MonitorHandle { } } -pub fn spawn_monitor(sender: UnboundedSender<TunnelCommand>) -> Result<MonitorHandle, Error> { +pub fn spawn_monitor(sender: Weak<UnboundedSender<TunnelCommand>>) -> Result<MonitorHandle, Error> { Ok(MonitorHandle(imp::spawn_monitor(sender)?)) } diff --git a/talpid-core/src/offline/windows.rs b/talpid-core/src/offline/windows.rs index 816fe89b4d..8bb13c98e3 100644 --- a/talpid-core/src/offline/windows.rs +++ b/talpid-core/src/offline/windows.rs @@ -15,7 +15,7 @@ use std::{ mem::zeroed, os::windows::io::{IntoRawHandle, RawHandle}, ptr, - sync::Arc, + sync::{Arc, Weak}, thread, time::Duration, }; @@ -62,7 +62,7 @@ pub struct BroadcastListener { unsafe impl Send for BroadcastListener {} impl BroadcastListener { - pub fn start(sender: UnboundedSender<TunnelCommand>) -> Result<Self, Error> { + pub fn start(sender: Weak<UnboundedSender<TunnelCommand>>) -> Result<Self, Error> { let mut system_state = Arc::new(Mutex::new(SystemState { network_connectivity: false, suspended: false, @@ -223,9 +223,7 @@ impl Drop for BroadcastListener { PostThreadMessageW(self.thread_id, REQUEST_THREAD_SHUTDOWN, 0, 0); WaitForSingleObject(self.thread_handle, INFINITE); CloseHandle(self.thread_handle); - if !winnet::WinNet_DeactivateConnectivityMonitor() { - log::error!("Failed to deactivate connectivity monitor"); - } + winnet::WinNet_DeactivateConnectivityMonitor(); } } } @@ -239,7 +237,7 @@ enum StateChange { struct SystemState { network_connectivity: bool, suspended: bool, - daemon_channel: UnboundedSender<TunnelCommand>, + daemon_channel: Weak<UnboundedSender<TunnelCommand>>, } impl SystemState { @@ -257,11 +255,10 @@ impl SystemState { let new_state = self.is_offline_currently(); if old_state != new_state { - if let Err(e) = self - .daemon_channel - .unbounded_send(TunnelCommand::IsOffline(new_state)) - { - log::error!("Failed to send new offline state to daemon: {}", e); + if let Some(daemon_channel) = self.daemon_channel.upgrade() { + if let Err(e) = daemon_channel.unbounded_send(TunnelCommand::IsOffline(new_state)) { + log::error!("Failed to send new offline state to daemon: {}", e); + } } } } @@ -273,7 +270,7 @@ impl SystemState { pub type MonitorHandle = BroadcastListener; -pub fn spawn_monitor(sender: UnboundedSender<TunnelCommand>) -> Result<MonitorHandle, Error> { +pub fn spawn_monitor(sender: Weak<UnboundedSender<TunnelCommand>>) -> Result<MonitorHandle, Error> { BroadcastListener::start(sender) } diff --git a/talpid-core/src/routing/windows.rs b/talpid-core/src/routing/windows.rs index 684d1a3184..9e70c800fa 100644 --- a/talpid-core/src/routing/windows.rs +++ b/talpid-core/src/routing/windows.rs @@ -47,9 +47,7 @@ impl RouteManagerImpl { impl Drop for RouteManagerImpl { fn drop(&mut self) { - if !winnet::deactivate_routing_manager() { - log::error!("Failed to deactivate routing manager"); - } + winnet::deactivate_routing_manager() } } diff --git a/talpid-core/src/tunnel_state_machine/mod.rs b/talpid-core/src/tunnel_state_machine/mod.rs index 66e564456f..42c959c81e 100644 --- a/talpid-core/src/tunnel_state_machine/mod.rs +++ b/talpid-core/src/tunnel_state_machine/mod.rs @@ -25,7 +25,7 @@ use futures::{sync::mpsc, Async, Future, Poll, Stream}; use std::{ io, path::{Path, PathBuf}, - sync::mpsc as sync_mpsc, + sync::{mpsc as sync_mpsc, Arc}, thread, }; use talpid_types::{ @@ -69,14 +69,15 @@ pub fn spawn<P, T>( resource_dir: PathBuf, cache_dir: P, state_change_listener: IntoSender<TunnelStateTransition, T>, -) -> Result<mpsc::UnboundedSender<TunnelCommand>, Error> +) -> Result<Arc<mpsc::UnboundedSender<TunnelCommand>>, Error> where P: AsRef<Path> + Send + 'static, T: From<TunnelStateTransition> + Send + 'static, { let (command_tx, command_rx) = mpsc::unbounded(); + let command_tx = Arc::new(command_tx); let offline_monitor = - offline::spawn_monitor(command_tx.clone()).map_err(Error::OfflineMonitorError)?; + offline::spawn_monitor(Arc::downgrade(&command_tx)).map_err(Error::OfflineMonitorError)?; let is_offline = offline_monitor.is_offline(); let (startup_result_tx, startup_result_rx) = sync_mpsc::channel(); @@ -116,8 +117,8 @@ where startup_result_rx .recv() - .expect("Failed to start tunnel state machine thread") - .map(|_| command_tx) + .expect("Failed to start tunnel state machine thread")?; + Ok(command_tx) } fn create_event_loop<T>( diff --git a/talpid-core/src/winnet.rs b/talpid-core/src/winnet.rs index d4130e213b..7568200d43 100644 --- a/talpid-core/src/winnet.rs +++ b/talpid-core/src/winnet.rs @@ -364,7 +364,7 @@ pub fn routing_manager_add_routes(routes: &[WinNetRoute]) -> bool { unsafe { WinNet_AddRoutes(ptr, length) } } -pub fn deactivate_routing_manager() -> bool { +pub fn deactivate_routing_manager() { unsafe { WinNet_DeactivateRouteManager() } } @@ -406,7 +406,7 @@ mod api { // pub fn WinNet_DeleteRoute(route: *const super::WinNetRoute) -> bool; #[link_name = "WinNet_DeactivateRouteManager"] - pub fn WinNet_DeactivateRouteManager() -> bool; + pub fn WinNet_DeactivateRouteManager(); #[link_name = "WinNet_EnsureTopMetric"] pub fn WinNet_EnsureTopMetric( @@ -429,7 +429,7 @@ mod api { ) -> bool; #[link_name = "WinNet_ReleaseString"] - pub fn WinNet_ReleaseString(string: *mut wchar_t) -> u32; + pub fn WinNet_ReleaseString(string: *mut wchar_t); #[link_name = "WinNet_ActivateConnectivityMonitor"] pub fn WinNet_ActivateConnectivityMonitor( @@ -450,7 +450,7 @@ mod api { pub fn WinNet_UnregisterDefaultRouteChangedCallback(registrationHandle: *mut libc::c_void); #[link_name = "WinNet_DeactivateConnectivityMonitor"] - pub fn WinNet_DeactivateConnectivityMonitor() -> bool; + pub fn WinNet_DeactivateConnectivityMonitor(); #[link_name = "WinNet_AddDeviceIpAddresses"] pub fn WinNet_AddDeviceIpAddresses( |
