summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorDavid Lönnhager <david.l@mullvad.net>2022-01-13 15:54:56 +0100
committerDavid Lönnhager <david.l@mullvad.net>2022-01-18 09:28:37 +0100
commit69d30398025513c9ff719bb35e4dd0419d772d62 (patch)
treeeaf257c7b497ae74136e064b75ab7a4d063d5c00
parent3c290d73bfa221e7e46b2a0587df32b6ed712591 (diff)
downloadmullvadvpn-69d30398025513c9ff719bb35e4dd0419d772d62.tar.xz
mullvadvpn-69d30398025513c9ff719bb35e4dd0419d772d62.zip
Invert async/sync logic in wireguard monitor
-rw-r--r--talpid-core/src/tunnel/wireguard/mod.rs109
1 files changed, 55 insertions, 54 deletions
diff --git a/talpid-core/src/tunnel/wireguard/mod.rs b/talpid-core/src/tunnel/wireguard/mod.rs
index 6cc2d85fdb..576d37c1ed 100644
--- a/talpid-core/src/tunnel/wireguard/mod.rs
+++ b/talpid-core/src/tunnel/wireguard/mod.rs
@@ -3,11 +3,13 @@ use self::config::Config;
use super::tun_provider;
use super::{tun_provider::TunProvider, TunnelEvent, TunnelMetadata};
use crate::routing::{self, RequiredRoute};
-#[cfg(windows)]
-use futures::channel::{mpsc, oneshot};
use futures::future::abortable;
#[cfg(windows)]
-use futures::{FutureExt, StreamExt};
+use futures::{
+ channel::{mpsc, oneshot},
+ future::{self, Either},
+ StreamExt,
+};
#[cfg(target_os = "linux")]
use lazy_static::lazy_static;
#[cfg(target_os = "linux")]
@@ -191,7 +193,7 @@ impl WireguardMonitor {
}
#[cfg(target_os = "windows")]
- let (setup_done_tx, setup_done_rx) = mpsc::channel(0);
+ let (setup_done_tx, mut setup_done_rx) = mpsc::channel(0);
let tunnel = Self::open_tunnel(
runtime.clone(),
&config,
@@ -236,84 +238,83 @@ impl WireguardMonitor {
let metadata = Self::tunnel_metadata(&iface_name, &config);
- std::thread::spawn(move || {
+ tokio::spawn(async move {
#[cfg(windows)]
{
- let mut done_rx = setup_done_rx.fuse();
let iface_close_sender = close_sender.clone();
- let result = runtime.block_on(async move {
- futures::select! {
- result = done_rx.next() => {
- match result {
- Some(result) => {
- result.map_err(|error| {
- log::error!("{}", error.display_chain_with_msg("Failed to configure tunnel interface"));
- iface_close_sender.send(CloseMsg::SetupError(
- Error::IpInterfacesError
- ))
- .unwrap_or(())
- })
- }
- None => Err(()),
- }
- }
- _ = stop_setup_rx.fuse() => Err(()),
- }
- });
+ let result = match future::select(setup_done_rx.next(), stop_setup_rx).await {
+ Either::Left((result, _)) => match result {
+ Some(result) => result.map_err(|error| {
+ log::error!(
+ "{}",
+ error.display_chain_with_msg(
+ "Failed to configure tunnel interface"
+ )
+ );
+ iface_close_sender
+ .send(CloseMsg::SetupError(Error::IpInterfacesError))
+ .unwrap_or(())
+ }),
+ None => Err(()),
+ },
+ Either::Right(_) => Err(()),
+ };
if result.is_err() {
return;
}
}
- runtime.block_on((on_event)(TunnelEvent::InterfaceUp(metadata.clone())));
+ (on_event)(TunnelEvent::InterfaceUp(metadata.clone())).await;
- let setup_iface_routes = || -> Result<()> {
+ let setup_iface_routes = async move {
#[cfg(target_os = "windows")]
if !crate::winnet::add_device_ip_addresses(&iface_name, &config.tunnel.addresses) {
return Err(Error::SetIpAddressesError);
}
- runtime.block_on(async move {
- #[cfg(target_os = "linux")]
- route_handle
- .create_routing_rules(config.enable_ipv6)
- .await
- .map_err(Error::SetupRoutingError)?;
+ #[cfg(target_os = "linux")]
+ route_handle
+ .create_routing_rules(config.enable_ipv6)
+ .await
+ .map_err(Error::SetupRoutingError)?;
- let routes = Self::get_in_tunnel_routes(&iface_name, &config)
- .chain(Self::get_tunnel_traffic_routes(&endpoint_addrs));
+ let routes = Self::get_in_tunnel_routes(&iface_name, &config)
+ .chain(Self::get_tunnel_traffic_routes(&endpoint_addrs));
- route_handle
- .add_routes(routes.collect())
- .await
- .map_err(Error::SetupRoutingError)
- })
+ route_handle
+ .add_routes(routes.collect())
+ .await
+ .map_err(Error::SetupRoutingError)
};
- if let Err(error) = setup_iface_routes() {
+ if let Err(error) = setup_iface_routes.await {
let _ = close_sender.send(CloseMsg::SetupError(error));
return;
}
- match connectivity_monitor.establish_connectivity(retry_attempt) {
- Ok(true) => {
- runtime.block_on((on_event)(TunnelEvent::Up(metadata)));
+ tokio::task::spawn_blocking(move || {
+ match connectivity_monitor.establish_connectivity(retry_attempt) {
+ Ok(true) => {
+ tokio::spawn((on_event)(TunnelEvent::Up(metadata)));
- if let Err(error) = connectivity_monitor.run() {
+ if let Err(error) = connectivity_monitor.run() {
+ log::error!(
+ "{}",
+ error.display_chain_with_msg("Connectivity monitor failed")
+ );
+ }
+ }
+ Ok(false) => log::warn!("Timeout while checking tunnel connection"),
+ Err(error) => {
log::error!(
"{}",
- error.display_chain_with_msg("Connectivity monitor failed")
+ error.display_chain_with_msg("Failed to check tunnel connection")
);
}
}
- Ok(false) => log::warn!("Timeout while checking tunnel connection"),
- Err(error) => {
- log::error!(
- "{}",
- error.display_chain_with_msg("Failed to check tunnel connection")
- );
- }
- }
+ })
+ .await
+ .expect("connectivity monitor thread panicked");
let _ = close_sender.send(CloseMsg::PingErr);
});