summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--talpid-core/src/offline/android.rs4
-rw-r--r--talpid-core/src/offline/linux.rs58
-rw-r--r--talpid-core/src/offline/macos.rs6
-rw-r--r--talpid-core/src/offline/mod.rs19
-rw-r--r--talpid-core/src/offline/windows.rs6
-rw-r--r--talpid-core/src/tunnel_state_machine/mod.rs3
6 files changed, 50 insertions, 46 deletions
diff --git a/talpid-core/src/offline/android.rs b/talpid-core/src/offline/android.rs
index 7135ac339d..fefe2556cf 100644
--- a/talpid-core/src/offline/android.rs
+++ b/talpid-core/src/offline/android.rs
@@ -96,7 +96,7 @@ impl MonitorHandle {
})
}
- pub fn is_offline(&self) -> bool {
+ pub async fn is_offline(&self) -> bool {
match self.get_is_connected() {
Ok(is_connected) => !is_connected,
Err(error) => {
@@ -205,7 +205,7 @@ unsafe fn get_sender_from_address(address: jlong) -> Box<Weak<UnboundedSender<Tu
Box::from_raw(address as *mut Weak<UnboundedSender<TunnelCommand>>)
}
-pub fn spawn_monitor(
+pub async fn spawn_monitor(
sender: Weak<UnboundedSender<TunnelCommand>>,
android_context: AndroidContext,
) -> Result<MonitorHandle, Error> {
diff --git a/talpid-core/src/offline/linux.rs b/talpid-core/src/offline/linux.rs
index 63c058de76..5e4a4fa7e2 100644
--- a/talpid-core/src/offline/linux.rs
+++ b/talpid-core/src/offline/linux.rs
@@ -1,5 +1,8 @@
use crate::tunnel_state_machine::TunnelCommand;
-use futures::{channel::mpsc::UnboundedSender, StreamExt, TryStreamExt};
+use futures::{
+ channel::{mpsc::UnboundedSender, oneshot},
+ FutureExt, StreamExt, TryStreamExt,
+};
use netlink_packet_route::{
constants::{ARPHRD_LOOPBACK, ARPHRD_NONE, IFF_LOWER_UP, IFF_UP},
rtnl::link::nlas::{Info as LinkInfo, InfoKind, Nla as LinkNla},
@@ -14,8 +17,6 @@ use std::{collections::BTreeSet, io, sync::Weak};
pub type Result<T> = std::result::Result<T, Error>;
-const EVENT_LOOP_THREAD_NAME: &str = "mullvad-offline-detection-event-loop";
-
#[derive(err_derive::Error, Debug)]
#[error(no_from)]
pub enum Error {
@@ -46,12 +47,12 @@ pub enum Error {
pub struct MonitorHandle {
handle: rtnetlink::Handle,
- runtime: tokio::runtime::Runtime,
+ _stop_connection_tx: oneshot::Sender<()>,
}
impl MonitorHandle {
- pub fn is_offline(&mut self) -> bool {
- match self.runtime.block_on(check_offline_state(&self.handle)) {
+ pub async fn is_offline(&mut self) -> bool {
+ match check_offline_state(&self.handle).await {
Ok(is_offline) => is_offline,
Err(err) => {
log::error!(
@@ -64,41 +65,36 @@ impl MonitorHandle {
}
}
-pub fn spawn_monitor(sender: Weak<UnboundedSender<TunnelCommand>>) -> Result<MonitorHandle> {
- let mut runtime = tokio::runtime::Builder::new()
- .threaded_scheduler()
- .core_threads(1)
- .enable_all()
- .thread_name(EVENT_LOOP_THREAD_NAME)
- .build()
- .map_err(Error::EventLoopError)?;
-
- let (connection, handle, mut messages) = runtime.block_on(async move {
- let (mut connection, handle, messages) =
- rtnetlink::new_connection().map_err(Error::NetlinkConnectionError)?;
+pub async fn spawn_monitor(sender: Weak<UnboundedSender<TunnelCommand>>) -> Result<MonitorHandle> {
+ let (mut connection, handle, mut messages) =
+ rtnetlink::new_connection().map_err(Error::NetlinkConnectionError)?;
- let mgroup_flags = RTMGRP_IPV4_IFADDR | RTMGRP_IPV6_IFADDR | RTMGRP_LINK | RTMGRP_NOTIFY;
- let addr = SocketAddr::new(0, mgroup_flags);
+ let mgroup_flags = RTMGRP_IPV4_IFADDR | RTMGRP_IPV6_IFADDR | RTMGRP_LINK | RTMGRP_NOTIFY;
+ let addr = SocketAddr::new(0, mgroup_flags);
- connection
- .socket_mut()
- .bind(&addr)
- .map_err(Error::BindError)?;
+ connection
+ .socket_mut()
+ .bind(&addr)
+ .map_err(Error::BindError)?;
- Ok((connection, handle, messages))
- })?;
+ let (stop_connection_tx, stop_rx) = oneshot::channel();
- // Connection will be closed once the runtime is dropped
- let _ = runtime.spawn(connection);
- let mut is_offline = runtime.block_on(check_offline_state(&handle))?;
+ // Connection will be closed once the channel is dropped
+ tokio::spawn(async {
+ futures::select! {
+ _ = connection.fuse() => (),
+ _ = stop_rx.fuse() => (),
+ }
+ });
+ let mut is_offline = check_offline_state(&handle).await?;
let monitor_handle = MonitorHandle {
handle: handle.clone(),
- runtime,
+ _stop_connection_tx: stop_connection_tx,
};
- let _ = monitor_handle.runtime.spawn(async move {
+ tokio::spawn(async move {
while let Some(_new_message) = messages.next().await {
match sender.upgrade() {
Some(sender) => {
diff --git a/talpid-core/src/offline/macos.rs b/talpid-core/src/offline/macos.rs
index 82310d5f70..2569fa06c6 100644
--- a/talpid-core/src/offline/macos.rs
+++ b/talpid-core/src/offline/macos.rs
@@ -44,7 +44,7 @@ pub struct MonitorHandle;
impl MonitorHandle {
/// Host is considered to be offline if the IPv4 internet is considered to be unreachable by the
/// given reachability flags *or* there are no active physical interfaces.
- pub fn is_offline(&self) -> bool {
+ pub async fn is_offline(&self) -> bool {
let reachability = SCNetworkReachability::from(ipv4_internet());
let store = SCDynamicStoreBuilder::new("talpid-offline-check").build();
reachability
@@ -54,7 +54,9 @@ impl MonitorHandle {
}
}
-pub fn spawn_monitor(sender: Weak<UnboundedSender<TunnelCommand>>) -> Result<MonitorHandle, Error> {
+pub async fn spawn_monitor(
+ sender: Weak<UnboundedSender<TunnelCommand>>,
+) -> Result<MonitorHandle, Error> {
let (result_tx, result_rx) = mpsc::channel();
thread::spawn(move || {
let mut reachability_ref = SCNetworkReachability::from(ipv4_internet());
diff --git a/talpid-core/src/offline/mod.rs b/talpid-core/src/offline/mod.rs
index baaa839780..9a6ce1dae5 100644
--- a/talpid-core/src/offline/mod.rs
+++ b/talpid-core/src/offline/mod.rs
@@ -25,18 +25,21 @@ pub use self::imp::Error;
pub struct MonitorHandle(imp::MonitorHandle);
impl MonitorHandle {
- pub fn is_offline(&mut self) -> bool {
- self.0.is_offline()
+ pub async fn is_offline(&mut self) -> bool {
+ self.0.is_offline().await
}
}
-pub fn spawn_monitor(
+pub async fn spawn_monitor(
sender: Weak<UnboundedSender<TunnelCommand>>,
#[cfg(target_os = "android")] android_context: AndroidContext,
) -> Result<MonitorHandle, Error> {
- Ok(MonitorHandle(imp::spawn_monitor(
- sender,
- #[cfg(target_os = "android")]
- android_context,
- )?))
+ Ok(MonitorHandle(
+ imp::spawn_monitor(
+ sender,
+ #[cfg(target_os = "android")]
+ android_context,
+ )
+ .await?,
+ ))
}
diff --git a/talpid-core/src/offline/windows.rs b/talpid-core/src/offline/windows.rs
index 1563638bf6..d9e5c7782d 100644
--- a/talpid-core/src/offline/windows.rs
+++ b/talpid-core/src/offline/windows.rs
@@ -203,7 +203,7 @@ impl BroadcastListener {
state.apply_change(StateChange::NetworkConnectivity(connectivity));
}
- pub fn is_offline(&self) -> bool {
+ pub async fn is_offline(&self) -> bool {
let state = self._system_state.lock();
state.is_offline_currently().unwrap_or(false)
}
@@ -264,7 +264,9 @@ impl SystemState {
pub type MonitorHandle = BroadcastListener;
-pub fn spawn_monitor(sender: Weak<UnboundedSender<TunnelCommand>>) -> Result<MonitorHandle, Error> {
+pub async fn spawn_monitor(
+ sender: Weak<UnboundedSender<TunnelCommand>>,
+) -> Result<MonitorHandle, Error> {
BroadcastListener::start(sender)
}
diff --git a/talpid-core/src/tunnel_state_machine/mod.rs b/talpid-core/src/tunnel_state_machine/mod.rs
index 22c0d823a5..4d96a2fe3c 100644
--- a/talpid-core/src/tunnel_state_machine/mod.rs
+++ b/talpid-core/src/tunnel_state_machine/mod.rs
@@ -93,8 +93,9 @@ pub async fn spawn(
#[cfg(target_os = "android")]
android_context.clone(),
)
+ .await
.map_err(Error::OfflineMonitorError)?;
- let is_offline = offline_monitor.is_offline();
+ let is_offline = offline_monitor.is_offline().await;
let tun_provider = TunProvider::new(
#[cfg(target_os = "android")]