summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorDavid Lönnhager <david.l@mullvad.net>2019-11-26 14:27:35 +0100
committerDavid Lönnhager <david.l@mullvad.net>2019-11-26 14:27:35 +0100
commit86fb4a15f385d500100b5978e7f5eb16ee750b43 (patch)
tree4a5f6a8b1999a30726e91d59e89f8eb5123cc3ee
parent7f075d9c74682e5efa36d3aa07f6e792596c6120 (diff)
parent77e65562aa5b6868b002b64b06afafc64c49d987 (diff)
downloadmullvadvpn-86fb4a15f385d500100b5978e7f5eb16ee750b43.tar.xz
mullvadvpn-86fb4a15f385d500100b5978e7f5eb16ee750b43.zip
Merge branch 'close-tunnel-thread-gracefully'
-rw-r--r--mullvad-daemon/src/lib.rs21
-rw-r--r--talpid-core/src/offline/dummy.rs5
-rw-r--r--talpid-core/src/offline/linux.rs18
-rw-r--r--talpid-core/src/offline/macos.rs17
-rw-r--r--talpid-core/src/offline/mod.rs3
-rw-r--r--talpid-core/src/offline/windows.rs21
-rw-r--r--talpid-core/src/routing/windows.rs4
-rw-r--r--talpid-core/src/tunnel_state_machine/mod.rs11
-rw-r--r--talpid-core/src/winnet.rs8
9 files changed, 61 insertions, 47 deletions
diff --git a/mullvad-daemon/src/lib.rs b/mullvad-daemon/src/lib.rs
index 0a2c65cb3b..35839d47f2 100644
--- a/mullvad-daemon/src/lib.rs
+++ b/mullvad-daemon/src/lib.rs
@@ -27,9 +27,10 @@ use crate::management_interface::{
BoxFuture, ManagementInterfaceEventBroadcaster, ManagementInterfaceServer,
};
use futures::{
+ executor,
future::{self, Executor},
sync::{mpsc::UnboundedSender, oneshot},
- Future, Sink,
+ Future,
};
use log::{debug, error, info, warn};
use mullvad_rpc::{AccountsProxy, HttpHandle, WireguardKeyProxy};
@@ -49,7 +50,13 @@ use mullvad_types::{
use settings::Settings;
#[cfg(not(target_os = "android"))]
use std::path::Path;
-use std::{io, mem, path::PathBuf, sync::mpsc, thread, time::Duration};
+use std::{
+ io, mem,
+ path::PathBuf,
+ sync::{mpsc, Arc},
+ thread,
+ time::Duration,
+};
use talpid_core::{
mpsc::IntoSender,
tunnel::tun_provider::{PlatformTunProvider, TunProvider},
@@ -128,8 +135,6 @@ pub enum Error {
ReadDirError(#[error(source)] io::Error),
}
-type SyncUnboundedSender<T> = ::futures::sink::Wait<UnboundedSender<T>>;
-
/// All events that can happen in the daemon. Sent from various threads and exposed interfaces.
pub(crate) enum InternalDaemonEvent {
/// Tunnel has changed state.
@@ -248,7 +253,7 @@ pub trait EventListener {
}
pub struct Daemon<L: EventListener = ManagementInterfaceEventBroadcaster> {
- tunnel_command_tx: SyncUnboundedSender<TunnelCommand>,
+ tunnel_command_tx: Arc<UnboundedSender<TunnelCommand>>,
tunnel_state: TunnelState,
target_state: TargetState,
state: DaemonExecutionState,
@@ -449,7 +454,7 @@ where
relay_selector.update();
let mut daemon = Daemon {
- tunnel_command_tx: Sink::wait(tunnel_command_tx),
+ tunnel_command_tx,
tunnel_state: TunnelState::Disconnected,
target_state: TargetState::Unsecured,
state: DaemonExecutionState::Running,
@@ -1491,8 +1496,8 @@ where
}
fn send_tunnel_command(&mut self, command: TunnelCommand) {
- self.tunnel_command_tx
- .send(command)
+ let mut sink = executor::spawn(Arc::make_mut(&mut self.tunnel_command_tx));
+ sink.wait_send(command)
.expect("Tunnel state machine has stopped");
}
diff --git a/talpid-core/src/offline/dummy.rs b/talpid-core/src/offline/dummy.rs
index 37eefb3c5e..7eda41d433 100644
--- a/talpid-core/src/offline/dummy.rs
+++ b/talpid-core/src/offline/dummy.rs
@@ -1,5 +1,6 @@
use crate::tunnel_state_machine::TunnelCommand;
use futures::sync::mpsc::UnboundedSender;
+use std::sync::Weak;
#[derive(err_derive::Error, Debug)]
#[error(display = "Dummy offline check error")]
@@ -13,6 +14,8 @@ impl MonitorHandle {
}
}
-pub fn spawn_monitor(_sender: UnboundedSender<TunnelCommand>) -> Result<MonitorHandle, Error> {
+pub fn spawn_monitor(
+ _sender: Weak<UnboundedSender<TunnelCommand>>,
+) -> Result<MonitorHandle, Error> {
Ok(MonitorHandle)
}
diff --git a/talpid-core/src/offline/linux.rs b/talpid-core/src/offline/linux.rs
index c38233d5e5..b75f187505 100644
--- a/talpid-core/src/offline/linux.rs
+++ b/talpid-core/src/offline/linux.rs
@@ -9,7 +9,7 @@ use rtnetlink::{
constants::{RTMGRP_IPV4_IFADDR, RTMGRP_IPV6_IFADDR, RTMGRP_LINK, RTMGRP_NOTIFY},
Connection, Handle,
};
-use std::{collections::BTreeSet, io, thread};
+use std::{collections::BTreeSet, io, sync::Weak, thread};
use talpid_types::ErrorExt;
pub type Result<T> = std::result::Result<T, Error>;
@@ -38,7 +38,7 @@ pub enum Error {
pub struct MonitorHandle;
-pub fn spawn_monitor(sender: UnboundedSender<TunnelCommand>) -> Result<MonitorHandle> {
+pub fn spawn_monitor(sender: Weak<UnboundedSender<TunnelCommand>>) -> Result<MonitorHandle> {
let socket = SocketAddr::new(
0,
RTMGRP_NOTIFY | RTMGRP_LINK | RTMGRP_IPV4_IFADDR | RTMGRP_IPV6_IFADDR,
@@ -209,11 +209,11 @@ fn monitor_event_loop(
struct LinkMonitor {
is_offline: bool,
- sender: UnboundedSender<TunnelCommand>,
+ sender: Weak<UnboundedSender<TunnelCommand>>,
}
impl LinkMonitor {
- pub fn new(sender: UnboundedSender<TunnelCommand>) -> Self {
+ pub fn new(sender: Weak<UnboundedSender<TunnelCommand>>) -> Self {
let is_offline = is_offline();
LinkMonitor { is_offline, sender }
@@ -226,14 +226,16 @@ impl LinkMonitor {
fn set_is_offline(&mut self, is_offline: bool) {
if self.is_offline != is_offline {
self.is_offline = is_offline;
- let _ = self
- .sender
- .unbounded_send(TunnelCommand::IsOffline(is_offline));
+ if let Some(sender) = self.sender.upgrade() {
+ let _ = sender.unbounded_send(TunnelCommand::IsOffline(is_offline));
+ }
}
}
/// Allow the offline check to fail open.
fn reset(&mut self) {
- let _ = self.sender.unbounded_send(TunnelCommand::IsOffline(false));
+ if let Some(sender) = self.sender.upgrade() {
+ let _ = sender.unbounded_send(TunnelCommand::IsOffline(false));
+ }
}
}
diff --git a/talpid-core/src/offline/macos.rs b/talpid-core/src/offline/macos.rs
index 51f13a21a1..ab9fee3b38 100644
--- a/talpid-core/src/offline/macos.rs
+++ b/talpid-core/src/offline/macos.rs
@@ -1,7 +1,10 @@
use crate::tunnel_state_machine::TunnelCommand;
use futures::sync::mpsc::UnboundedSender;
use log::{debug, trace};
-use std::{sync::mpsc, thread};
+use std::{
+ sync::{mpsc, Weak},
+ thread,
+};
use system_configuration::{
core_foundation::{
array::CFArray,
@@ -23,7 +26,7 @@ pub enum Error {
pub struct MonitorHandle;
-pub fn spawn_monitor(sender: UnboundedSender<TunnelCommand>) -> Result<MonitorHandle, Error> {
+pub fn spawn_monitor(sender: Weak<UnboundedSender<TunnelCommand>>) -> Result<MonitorHandle, Error> {
let (result_tx, result_rx) = mpsc::channel();
thread::spawn(move || match create_dynamic_store(sender) {
Ok(store) => {
@@ -44,7 +47,9 @@ impl MonitorHandle {
}
}
-fn create_dynamic_store(sender: UnboundedSender<TunnelCommand>) -> Result<SCDynamicStore, Error> {
+fn create_dynamic_store(
+ sender: Weak<UnboundedSender<TunnelCommand>>,
+) -> Result<SCDynamicStore, Error> {
let callback_context = SCDynamicStoreCallBackContext {
callout: primary_interface_change_callback,
info: sender,
@@ -76,12 +81,14 @@ fn run_dynamic_store_runloop(store: SCDynamicStore) {
fn primary_interface_change_callback(
store: SCDynamicStore,
_changed_keys: CFArray<CFString>,
- state: &mut UnboundedSender<TunnelCommand>,
+ state: &mut Weak<UnboundedSender<TunnelCommand>>,
) {
let is_offline = store.get(CFString::new(PRIMARY_INTERFACE_KEY)).is_none();
debug!(
"Computer went {}",
if is_offline { "offline" } else { "online" }
);
- let _ = state.unbounded_send(TunnelCommand::IsOffline(is_offline));
+ if let Some(state) = state.upgrade() {
+ let _ = state.unbounded_send(TunnelCommand::IsOffline(is_offline));
+ }
}
diff --git a/talpid-core/src/offline/mod.rs b/talpid-core/src/offline/mod.rs
index 34db9453e7..2092da2f0a 100644
--- a/talpid-core/src/offline/mod.rs
+++ b/talpid-core/src/offline/mod.rs
@@ -1,5 +1,6 @@
use crate::tunnel_state_machine::TunnelCommand;
use futures::sync::mpsc::UnboundedSender;
+use std::sync::Weak;
#[cfg(target_os = "macos")]
#[path = "macos.rs"]
@@ -27,6 +28,6 @@ impl MonitorHandle {
}
}
-pub fn spawn_monitor(sender: UnboundedSender<TunnelCommand>) -> Result<MonitorHandle, Error> {
+pub fn spawn_monitor(sender: Weak<UnboundedSender<TunnelCommand>>) -> Result<MonitorHandle, Error> {
Ok(MonitorHandle(imp::spawn_monitor(sender)?))
}
diff --git a/talpid-core/src/offline/windows.rs b/talpid-core/src/offline/windows.rs
index 816fe89b4d..8bb13c98e3 100644
--- a/talpid-core/src/offline/windows.rs
+++ b/talpid-core/src/offline/windows.rs
@@ -15,7 +15,7 @@ use std::{
mem::zeroed,
os::windows::io::{IntoRawHandle, RawHandle},
ptr,
- sync::Arc,
+ sync::{Arc, Weak},
thread,
time::Duration,
};
@@ -62,7 +62,7 @@ pub struct BroadcastListener {
unsafe impl Send for BroadcastListener {}
impl BroadcastListener {
- pub fn start(sender: UnboundedSender<TunnelCommand>) -> Result<Self, Error> {
+ pub fn start(sender: Weak<UnboundedSender<TunnelCommand>>) -> Result<Self, Error> {
let mut system_state = Arc::new(Mutex::new(SystemState {
network_connectivity: false,
suspended: false,
@@ -223,9 +223,7 @@ impl Drop for BroadcastListener {
PostThreadMessageW(self.thread_id, REQUEST_THREAD_SHUTDOWN, 0, 0);
WaitForSingleObject(self.thread_handle, INFINITE);
CloseHandle(self.thread_handle);
- if !winnet::WinNet_DeactivateConnectivityMonitor() {
- log::error!("Failed to deactivate connectivity monitor");
- }
+ winnet::WinNet_DeactivateConnectivityMonitor();
}
}
}
@@ -239,7 +237,7 @@ enum StateChange {
struct SystemState {
network_connectivity: bool,
suspended: bool,
- daemon_channel: UnboundedSender<TunnelCommand>,
+ daemon_channel: Weak<UnboundedSender<TunnelCommand>>,
}
impl SystemState {
@@ -257,11 +255,10 @@ impl SystemState {
let new_state = self.is_offline_currently();
if old_state != new_state {
- if let Err(e) = self
- .daemon_channel
- .unbounded_send(TunnelCommand::IsOffline(new_state))
- {
- log::error!("Failed to send new offline state to daemon: {}", e);
+ if let Some(daemon_channel) = self.daemon_channel.upgrade() {
+ if let Err(e) = daemon_channel.unbounded_send(TunnelCommand::IsOffline(new_state)) {
+ log::error!("Failed to send new offline state to daemon: {}", e);
+ }
}
}
}
@@ -273,7 +270,7 @@ impl SystemState {
pub type MonitorHandle = BroadcastListener;
-pub fn spawn_monitor(sender: UnboundedSender<TunnelCommand>) -> Result<MonitorHandle, Error> {
+pub fn spawn_monitor(sender: Weak<UnboundedSender<TunnelCommand>>) -> Result<MonitorHandle, Error> {
BroadcastListener::start(sender)
}
diff --git a/talpid-core/src/routing/windows.rs b/talpid-core/src/routing/windows.rs
index 684d1a3184..9e70c800fa 100644
--- a/talpid-core/src/routing/windows.rs
+++ b/talpid-core/src/routing/windows.rs
@@ -47,9 +47,7 @@ impl RouteManagerImpl {
impl Drop for RouteManagerImpl {
fn drop(&mut self) {
- if !winnet::deactivate_routing_manager() {
- log::error!("Failed to deactivate routing manager");
- }
+ winnet::deactivate_routing_manager()
}
}
diff --git a/talpid-core/src/tunnel_state_machine/mod.rs b/talpid-core/src/tunnel_state_machine/mod.rs
index 66e564456f..42c959c81e 100644
--- a/talpid-core/src/tunnel_state_machine/mod.rs
+++ b/talpid-core/src/tunnel_state_machine/mod.rs
@@ -25,7 +25,7 @@ use futures::{sync::mpsc, Async, Future, Poll, Stream};
use std::{
io,
path::{Path, PathBuf},
- sync::mpsc as sync_mpsc,
+ sync::{mpsc as sync_mpsc, Arc},
thread,
};
use talpid_types::{
@@ -69,14 +69,15 @@ pub fn spawn<P, T>(
resource_dir: PathBuf,
cache_dir: P,
state_change_listener: IntoSender<TunnelStateTransition, T>,
-) -> Result<mpsc::UnboundedSender<TunnelCommand>, Error>
+) -> Result<Arc<mpsc::UnboundedSender<TunnelCommand>>, Error>
where
P: AsRef<Path> + Send + 'static,
T: From<TunnelStateTransition> + Send + 'static,
{
let (command_tx, command_rx) = mpsc::unbounded();
+ let command_tx = Arc::new(command_tx);
let offline_monitor =
- offline::spawn_monitor(command_tx.clone()).map_err(Error::OfflineMonitorError)?;
+ offline::spawn_monitor(Arc::downgrade(&command_tx)).map_err(Error::OfflineMonitorError)?;
let is_offline = offline_monitor.is_offline();
let (startup_result_tx, startup_result_rx) = sync_mpsc::channel();
@@ -116,8 +117,8 @@ where
startup_result_rx
.recv()
- .expect("Failed to start tunnel state machine thread")
- .map(|_| command_tx)
+ .expect("Failed to start tunnel state machine thread")?;
+ Ok(command_tx)
}
fn create_event_loop<T>(
diff --git a/talpid-core/src/winnet.rs b/talpid-core/src/winnet.rs
index d4130e213b..7568200d43 100644
--- a/talpid-core/src/winnet.rs
+++ b/talpid-core/src/winnet.rs
@@ -364,7 +364,7 @@ pub fn routing_manager_add_routes(routes: &[WinNetRoute]) -> bool {
unsafe { WinNet_AddRoutes(ptr, length) }
}
-pub fn deactivate_routing_manager() -> bool {
+pub fn deactivate_routing_manager() {
unsafe { WinNet_DeactivateRouteManager() }
}
@@ -406,7 +406,7 @@ mod api {
// pub fn WinNet_DeleteRoute(route: *const super::WinNetRoute) -> bool;
#[link_name = "WinNet_DeactivateRouteManager"]
- pub fn WinNet_DeactivateRouteManager() -> bool;
+ pub fn WinNet_DeactivateRouteManager();
#[link_name = "WinNet_EnsureTopMetric"]
pub fn WinNet_EnsureTopMetric(
@@ -429,7 +429,7 @@ mod api {
) -> bool;
#[link_name = "WinNet_ReleaseString"]
- pub fn WinNet_ReleaseString(string: *mut wchar_t) -> u32;
+ pub fn WinNet_ReleaseString(string: *mut wchar_t);
#[link_name = "WinNet_ActivateConnectivityMonitor"]
pub fn WinNet_ActivateConnectivityMonitor(
@@ -450,7 +450,7 @@ mod api {
pub fn WinNet_UnregisterDefaultRouteChangedCallback(registrationHandle: *mut libc::c_void);
#[link_name = "WinNet_DeactivateConnectivityMonitor"]
- pub fn WinNet_DeactivateConnectivityMonitor() -> bool;
+ pub fn WinNet_DeactivateConnectivityMonitor();
#[link_name = "WinNet_AddDeviceIpAddresses"]
pub fn WinNet_AddDeviceIpAddresses(