diff options
| author | David Lönnhager <david.l@mullvad.net> | 2022-01-13 15:54:56 +0100 |
|---|---|---|
| committer | David Lönnhager <david.l@mullvad.net> | 2022-01-18 09:28:37 +0100 |
| commit | 69d30398025513c9ff719bb35e4dd0419d772d62 (patch) | |
| tree | eaf257c7b497ae74136e064b75ab7a4d063d5c00 | |
| parent | 3c290d73bfa221e7e46b2a0587df32b6ed712591 (diff) | |
| download | mullvadvpn-69d30398025513c9ff719bb35e4dd0419d772d62.tar.xz mullvadvpn-69d30398025513c9ff719bb35e4dd0419d772d62.zip | |
Invert async/sync logic in wireguard monitor
| -rw-r--r-- | talpid-core/src/tunnel/wireguard/mod.rs | 109 |
1 files changed, 55 insertions, 54 deletions
diff --git a/talpid-core/src/tunnel/wireguard/mod.rs b/talpid-core/src/tunnel/wireguard/mod.rs index 6cc2d85fdb..576d37c1ed 100644 --- a/talpid-core/src/tunnel/wireguard/mod.rs +++ b/talpid-core/src/tunnel/wireguard/mod.rs @@ -3,11 +3,13 @@ use self::config::Config; use super::tun_provider; use super::{tun_provider::TunProvider, TunnelEvent, TunnelMetadata}; use crate::routing::{self, RequiredRoute}; -#[cfg(windows)] -use futures::channel::{mpsc, oneshot}; use futures::future::abortable; #[cfg(windows)] -use futures::{FutureExt, StreamExt}; +use futures::{ + channel::{mpsc, oneshot}, + future::{self, Either}, + StreamExt, +}; #[cfg(target_os = "linux")] use lazy_static::lazy_static; #[cfg(target_os = "linux")] @@ -191,7 +193,7 @@ impl WireguardMonitor { } #[cfg(target_os = "windows")] - let (setup_done_tx, setup_done_rx) = mpsc::channel(0); + let (setup_done_tx, mut setup_done_rx) = mpsc::channel(0); let tunnel = Self::open_tunnel( runtime.clone(), &config, @@ -236,84 +238,83 @@ impl WireguardMonitor { let metadata = Self::tunnel_metadata(&iface_name, &config); - std::thread::spawn(move || { + tokio::spawn(async move { #[cfg(windows)] { - let mut done_rx = setup_done_rx.fuse(); let iface_close_sender = close_sender.clone(); - let result = runtime.block_on(async move { - futures::select! { - result = done_rx.next() => { - match result { - Some(result) => { - result.map_err(|error| { - log::error!("{}", error.display_chain_with_msg("Failed to configure tunnel interface")); - iface_close_sender.send(CloseMsg::SetupError( - Error::IpInterfacesError - )) - .unwrap_or(()) - }) - } - None => Err(()), - } - } - _ = stop_setup_rx.fuse() => Err(()), - } - }); + let result = match future::select(setup_done_rx.next(), stop_setup_rx).await { + Either::Left((result, _)) => match result { + Some(result) => result.map_err(|error| { + log::error!( + "{}", + error.display_chain_with_msg( + "Failed to configure tunnel interface" + ) + ); + iface_close_sender + .send(CloseMsg::SetupError(Error::IpInterfacesError)) + .unwrap_or(()) + }), + None => Err(()), + }, + Either::Right(_) => Err(()), + }; if result.is_err() { return; } } - runtime.block_on((on_event)(TunnelEvent::InterfaceUp(metadata.clone()))); + (on_event)(TunnelEvent::InterfaceUp(metadata.clone())).await; - let setup_iface_routes = || -> Result<()> { + let setup_iface_routes = async move { #[cfg(target_os = "windows")] if !crate::winnet::add_device_ip_addresses(&iface_name, &config.tunnel.addresses) { return Err(Error::SetIpAddressesError); } - runtime.block_on(async move { - #[cfg(target_os = "linux")] - route_handle - .create_routing_rules(config.enable_ipv6) - .await - .map_err(Error::SetupRoutingError)?; + #[cfg(target_os = "linux")] + route_handle + .create_routing_rules(config.enable_ipv6) + .await + .map_err(Error::SetupRoutingError)?; - let routes = Self::get_in_tunnel_routes(&iface_name, &config) - .chain(Self::get_tunnel_traffic_routes(&endpoint_addrs)); + let routes = Self::get_in_tunnel_routes(&iface_name, &config) + .chain(Self::get_tunnel_traffic_routes(&endpoint_addrs)); - route_handle - .add_routes(routes.collect()) - .await - .map_err(Error::SetupRoutingError) - }) + route_handle + .add_routes(routes.collect()) + .await + .map_err(Error::SetupRoutingError) }; - if let Err(error) = setup_iface_routes() { + if let Err(error) = setup_iface_routes.await { let _ = close_sender.send(CloseMsg::SetupError(error)); return; } - match connectivity_monitor.establish_connectivity(retry_attempt) { - Ok(true) => { - runtime.block_on((on_event)(TunnelEvent::Up(metadata))); + tokio::task::spawn_blocking(move || { + match connectivity_monitor.establish_connectivity(retry_attempt) { + Ok(true) => { + tokio::spawn((on_event)(TunnelEvent::Up(metadata))); - if let Err(error) = connectivity_monitor.run() { + if let Err(error) = connectivity_monitor.run() { + log::error!( + "{}", + error.display_chain_with_msg("Connectivity monitor failed") + ); + } + } + Ok(false) => log::warn!("Timeout while checking tunnel connection"), + Err(error) => { log::error!( "{}", - error.display_chain_with_msg("Connectivity monitor failed") + error.display_chain_with_msg("Failed to check tunnel connection") ); } } - Ok(false) => log::warn!("Timeout while checking tunnel connection"), - Err(error) => { - log::error!( - "{}", - error.display_chain_with_msg("Failed to check tunnel connection") - ); - } - } + }) + .await + .expect("connectivity monitor thread panicked"); let _ = close_sender.send(CloseMsg::PingErr); }); |
