diff options
| -rw-r--r-- | talpid-core/src/offline/android.rs | 4 | ||||
| -rw-r--r-- | talpid-core/src/offline/linux.rs | 58 | ||||
| -rw-r--r-- | talpid-core/src/offline/macos.rs | 6 | ||||
| -rw-r--r-- | talpid-core/src/offline/mod.rs | 19 | ||||
| -rw-r--r-- | talpid-core/src/offline/windows.rs | 6 | ||||
| -rw-r--r-- | talpid-core/src/tunnel_state_machine/mod.rs | 3 |
6 files changed, 50 insertions, 46 deletions
diff --git a/talpid-core/src/offline/android.rs b/talpid-core/src/offline/android.rs index 7135ac339d..fefe2556cf 100644 --- a/talpid-core/src/offline/android.rs +++ b/talpid-core/src/offline/android.rs @@ -96,7 +96,7 @@ impl MonitorHandle { }) } - pub fn is_offline(&self) -> bool { + pub async fn is_offline(&self) -> bool { match self.get_is_connected() { Ok(is_connected) => !is_connected, Err(error) => { @@ -205,7 +205,7 @@ unsafe fn get_sender_from_address(address: jlong) -> Box<Weak<UnboundedSender<Tu Box::from_raw(address as *mut Weak<UnboundedSender<TunnelCommand>>) } -pub fn spawn_monitor( +pub async fn spawn_monitor( sender: Weak<UnboundedSender<TunnelCommand>>, android_context: AndroidContext, ) -> Result<MonitorHandle, Error> { diff --git a/talpid-core/src/offline/linux.rs b/talpid-core/src/offline/linux.rs index 63c058de76..5e4a4fa7e2 100644 --- a/talpid-core/src/offline/linux.rs +++ b/talpid-core/src/offline/linux.rs @@ -1,5 +1,8 @@ use crate::tunnel_state_machine::TunnelCommand; -use futures::{channel::mpsc::UnboundedSender, StreamExt, TryStreamExt}; +use futures::{ + channel::{mpsc::UnboundedSender, oneshot}, + FutureExt, StreamExt, TryStreamExt, +}; use netlink_packet_route::{ constants::{ARPHRD_LOOPBACK, ARPHRD_NONE, IFF_LOWER_UP, IFF_UP}, rtnl::link::nlas::{Info as LinkInfo, InfoKind, Nla as LinkNla}, @@ -14,8 +17,6 @@ use std::{collections::BTreeSet, io, sync::Weak}; pub type Result<T> = std::result::Result<T, Error>; -const EVENT_LOOP_THREAD_NAME: &str = "mullvad-offline-detection-event-loop"; - #[derive(err_derive::Error, Debug)] #[error(no_from)] pub enum Error { @@ -46,12 +47,12 @@ pub enum Error { pub struct MonitorHandle { handle: rtnetlink::Handle, - runtime: tokio::runtime::Runtime, + _stop_connection_tx: oneshot::Sender<()>, } impl MonitorHandle { - pub fn is_offline(&mut self) -> bool { - match self.runtime.block_on(check_offline_state(&self.handle)) { + pub async fn is_offline(&mut self) -> bool { + match check_offline_state(&self.handle).await { Ok(is_offline) => is_offline, Err(err) => { log::error!( @@ -64,41 +65,36 @@ impl MonitorHandle { } } -pub fn spawn_monitor(sender: Weak<UnboundedSender<TunnelCommand>>) -> Result<MonitorHandle> { - let mut runtime = tokio::runtime::Builder::new() - .threaded_scheduler() - .core_threads(1) - .enable_all() - .thread_name(EVENT_LOOP_THREAD_NAME) - .build() - .map_err(Error::EventLoopError)?; - - let (connection, handle, mut messages) = runtime.block_on(async move { - let (mut connection, handle, messages) = - rtnetlink::new_connection().map_err(Error::NetlinkConnectionError)?; +pub async fn spawn_monitor(sender: Weak<UnboundedSender<TunnelCommand>>) -> Result<MonitorHandle> { + let (mut connection, handle, mut messages) = + rtnetlink::new_connection().map_err(Error::NetlinkConnectionError)?; - let mgroup_flags = RTMGRP_IPV4_IFADDR | RTMGRP_IPV6_IFADDR | RTMGRP_LINK | RTMGRP_NOTIFY; - let addr = SocketAddr::new(0, mgroup_flags); + let mgroup_flags = RTMGRP_IPV4_IFADDR | RTMGRP_IPV6_IFADDR | RTMGRP_LINK | RTMGRP_NOTIFY; + let addr = SocketAddr::new(0, mgroup_flags); - connection - .socket_mut() - .bind(&addr) - .map_err(Error::BindError)?; + connection + .socket_mut() + .bind(&addr) + .map_err(Error::BindError)?; - Ok((connection, handle, messages)) - })?; + let (stop_connection_tx, stop_rx) = oneshot::channel(); - // Connection will be closed once the runtime is dropped - let _ = runtime.spawn(connection); - let mut is_offline = runtime.block_on(check_offline_state(&handle))?; + // Connection will be closed once the channel is dropped + tokio::spawn(async { + futures::select! { + _ = connection.fuse() => (), + _ = stop_rx.fuse() => (), + } + }); + let mut is_offline = check_offline_state(&handle).await?; let monitor_handle = MonitorHandle { handle: handle.clone(), - runtime, + _stop_connection_tx: stop_connection_tx, }; - let _ = monitor_handle.runtime.spawn(async move { + tokio::spawn(async move { while let Some(_new_message) = messages.next().await { match sender.upgrade() { Some(sender) => { diff --git a/talpid-core/src/offline/macos.rs b/talpid-core/src/offline/macos.rs index 82310d5f70..2569fa06c6 100644 --- a/talpid-core/src/offline/macos.rs +++ b/talpid-core/src/offline/macos.rs @@ -44,7 +44,7 @@ pub struct MonitorHandle; impl MonitorHandle { /// Host is considered to be offline if the IPv4 internet is considered to be unreachable by the /// given reachability flags *or* there are no active physical interfaces. - pub fn is_offline(&self) -> bool { + pub async fn is_offline(&self) -> bool { let reachability = SCNetworkReachability::from(ipv4_internet()); let store = SCDynamicStoreBuilder::new("talpid-offline-check").build(); reachability @@ -54,7 +54,9 @@ impl MonitorHandle { } } -pub fn spawn_monitor(sender: Weak<UnboundedSender<TunnelCommand>>) -> Result<MonitorHandle, Error> { +pub async fn spawn_monitor( + sender: Weak<UnboundedSender<TunnelCommand>>, +) -> Result<MonitorHandle, Error> { let (result_tx, result_rx) = mpsc::channel(); thread::spawn(move || { let mut reachability_ref = SCNetworkReachability::from(ipv4_internet()); diff --git a/talpid-core/src/offline/mod.rs b/talpid-core/src/offline/mod.rs index baaa839780..9a6ce1dae5 100644 --- a/talpid-core/src/offline/mod.rs +++ b/talpid-core/src/offline/mod.rs @@ -25,18 +25,21 @@ pub use self::imp::Error; pub struct MonitorHandle(imp::MonitorHandle); impl MonitorHandle { - pub fn is_offline(&mut self) -> bool { - self.0.is_offline() + pub async fn is_offline(&mut self) -> bool { + self.0.is_offline().await } } -pub fn spawn_monitor( +pub async fn spawn_monitor( sender: Weak<UnboundedSender<TunnelCommand>>, #[cfg(target_os = "android")] android_context: AndroidContext, ) -> Result<MonitorHandle, Error> { - Ok(MonitorHandle(imp::spawn_monitor( - sender, - #[cfg(target_os = "android")] - android_context, - )?)) + Ok(MonitorHandle( + imp::spawn_monitor( + sender, + #[cfg(target_os = "android")] + android_context, + ) + .await?, + )) } diff --git a/talpid-core/src/offline/windows.rs b/talpid-core/src/offline/windows.rs index 1563638bf6..d9e5c7782d 100644 --- a/talpid-core/src/offline/windows.rs +++ b/talpid-core/src/offline/windows.rs @@ -203,7 +203,7 @@ impl BroadcastListener { state.apply_change(StateChange::NetworkConnectivity(connectivity)); } - pub fn is_offline(&self) -> bool { + pub async fn is_offline(&self) -> bool { let state = self._system_state.lock(); state.is_offline_currently().unwrap_or(false) } @@ -264,7 +264,9 @@ impl SystemState { pub type MonitorHandle = BroadcastListener; -pub fn spawn_monitor(sender: Weak<UnboundedSender<TunnelCommand>>) -> Result<MonitorHandle, Error> { +pub async fn spawn_monitor( + sender: Weak<UnboundedSender<TunnelCommand>>, +) -> Result<MonitorHandle, Error> { BroadcastListener::start(sender) } diff --git a/talpid-core/src/tunnel_state_machine/mod.rs b/talpid-core/src/tunnel_state_machine/mod.rs index 22c0d823a5..4d96a2fe3c 100644 --- a/talpid-core/src/tunnel_state_machine/mod.rs +++ b/talpid-core/src/tunnel_state_machine/mod.rs @@ -93,8 +93,9 @@ pub async fn spawn( #[cfg(target_os = "android")] android_context.clone(), ) + .await .map_err(Error::OfflineMonitorError)?; - let is_offline = offline_monitor.is_offline(); + let is_offline = offline_monitor.is_offline().await; let tun_provider = TunProvider::new( #[cfg(target_os = "android")] |
