diff options
| author | Emīls <emils@mullvad.net> | 2020-04-15 10:41:45 +0100 |
|---|---|---|
| committer | Emīls Piņķis <emils@mullvad.net> | 2020-04-27 11:17:00 +0100 |
| commit | 80fb0b2bd59708f4b0c9f01b410eef739f1af6a8 (patch) | |
| tree | 96913ebfe90beb6b134d43e747143503ed6fa7a6 | |
| parent | 6d7141d24d17f2642eb3bc19675dd05b57fb6265 (diff) | |
| download | mullvadvpn-80fb0b2bd59708f4b0c9f01b410eef739f1af6a8.tar.xz mullvadvpn-80fb0b2bd59708f4b0c9f01b410eef739f1af6a8.zip | |
Rework offline detection on Linux
| -rw-r--r-- | talpid-core/src/offline/linux.rs | 283 | ||||
| -rw-r--r-- | talpid-core/src/offline/mod.rs | 2 | ||||
| -rw-r--r-- | talpid-core/src/tunnel_state_machine/mod.rs | 2 |
3 files changed, 113 insertions, 174 deletions
diff --git a/talpid-core/src/offline/linux.rs b/talpid-core/src/offline/linux.rs index b75f187505..0b6526f0e5 100644 --- a/talpid-core/src/offline/linux.rs +++ b/talpid-core/src/offline/linux.rs @@ -1,178 +1,173 @@ use crate::tunnel_state_machine::TunnelCommand; -use futures::{future::Either, sync::mpsc::UnboundedSender, Future, Stream}; -use log::{error, warn}; -use netlink_packet::{ - AddressMessage, LinkInfo, LinkInfoKind, LinkLayerType, LinkMessage, LinkNla, NetlinkMessage, +use futures::{StreamExt, TryStreamExt}; +use futures01::sync::mpsc::UnboundedSender; +use netlink_packet_route::{ + constants::{ARPHRD_LOOPBACK, ARPHRD_NONE, IFF_LOWER_UP, IFF_UP}, + rtnl::link::nlas::{Info as LinkInfo, InfoKind, Nla as LinkNla}, + LinkMessage, }; use netlink_sys::SocketAddr; use rtnetlink::{ constants::{RTMGRP_IPV4_IFADDR, RTMGRP_IPV6_IFADDR, RTMGRP_LINK, RTMGRP_NOTIFY}, - Connection, Handle, + Handle, }; -use std::{collections::BTreeSet, io, sync::Weak, thread}; -use talpid_types::ErrorExt; +use std::{collections::BTreeSet, io, sync::Weak}; pub type Result<T> = std::result::Result<T, Error>; +const EVENT_LOOP_THREAD_NAME: &str = "mullvad-offline-detection-event-loop"; + #[derive(err_derive::Error, Debug)] #[error(no_from)] pub enum Error { #[error(display = "Failed to get list of IP links")] GetLinksError(#[error(source)] failure::Compat<rtnetlink::Error>), + #[error(display = "Failed to get list of IP addresses")] + GetAddressesError(#[error(source)] failure::Compat<rtnetlink::Error>), + #[error(display = "Failed to connect to netlink socket")] NetlinkConnectionError(#[error(source)] io::Error), + #[error(display = "Failed to connect to bind to netlink socket")] + BindError(#[error(source)] io::Error), + #[error(display = "Failed to start listening on netlink socket")] NetlinkBindError(#[error(source)] io::Error), - #[error(display = "Error while communicating on the netlink socket")] - NetlinkError(#[error(source)] netlink_proto::Error), - #[error(display = "Error while processing netlink messages")] MonitorNetlinkError, #[error(display = "Netlink connection has unexpectedly disconnected")] NetlinkDisconnected, -} - -pub struct MonitorHandle; - -pub fn spawn_monitor(sender: Weak<UnboundedSender<TunnelCommand>>) -> Result<MonitorHandle> { - let socket = SocketAddr::new( - 0, - RTMGRP_NOTIFY | RTMGRP_LINK | RTMGRP_IPV4_IFADDR | RTMGRP_IPV6_IFADDR, - ); - let (mut connection, _, messages) = rtnetlink::new_connection_with_messages().unwrap(); - connection - .socket_mut() - .bind(&socket) - .map_err(Error::NetlinkBindError)?; - - let link_monitor = LinkMonitor::new(sender); - - thread::spawn(|| { - if let Err(error) = monitor_event_loop(connection, messages, link_monitor) { - error!( - "{}", - error.display_chain_with_msg("Error running link monitor event loop") - ); - } - }); - - Ok(MonitorHandle) + #[error(display = "Failed to initialize event loop")] + EventLoopError(#[error(source)] io::Error), } -fn is_offline() -> bool { - check_if_offline().unwrap_or_else(|error| { - warn!( - "{}", - error.display_chain_with_msg("Failed to check for internet connection") - ); - false - }) +pub struct MonitorHandle { + handle: rtnetlink::Handle, + runtime: tokio02::runtime::Runtime, } impl MonitorHandle { - pub fn is_offline(&self) -> bool { - is_offline() + pub fn is_offline(&mut self) -> bool { + match self.runtime.block_on(check_offline_state(&self.handle)) { + Ok(is_offline) => is_offline, + Err(err) => { + log::error!( + "Failed to verify offline state: {}. Presuming connectivity", + err + ); + false + } + } } } -/// Checks if there are no running links or that none of the running links have IP addresses -/// assigned to them. -fn check_if_offline() -> Result<bool> { - let mut connection = NetlinkConnection::new()?; - let interfaces = connection.running_interfaces()?; +pub fn spawn_monitor(sender: Weak<UnboundedSender<TunnelCommand>>) -> Result<MonitorHandle> { + let mut runtime = tokio02::runtime::Builder::new() + .threaded_scheduler() + .core_threads(1) + .enable_all() + .thread_name(EVENT_LOOP_THREAD_NAME) + .build() + .map_err(Error::EventLoopError)?; - if interfaces.is_empty() { - Ok(true) - } else { - // Check if the current IP addresses are not assigned to any one of the running interfaces - Ok(connection - .addresses()? - .into_iter() - .all(|address| !interfaces.contains(&address.header.index))) - } -} + let (connection, handle, mut messages) = runtime.block_on(async move { + let (mut connection, handle, messages) = + rtnetlink::new_connection().map_err(Error::NetlinkConnectionError)?; -struct NetlinkConnection { - connection: Option<Connection>, - handle: Handle, -} + let mgroup_flags = RTMGRP_IPV4_IFADDR | RTMGRP_IPV6_IFADDR | RTMGRP_LINK | RTMGRP_NOTIFY; + let addr = SocketAddr::new(0, mgroup_flags); -impl NetlinkConnection { - /// Open a connection on the netlink socket. - pub fn new() -> Result<Self> { - let (connection, handle) = - rtnetlink::new_connection().map_err(Error::NetlinkConnectionError)?; + connection + .socket_mut() + .bind(&addr) + .map_err(Error::BindError)?; - Ok(NetlinkConnection { - connection: Some(connection), - handle, - }) - } + Ok((connection, handle, messages)) + })?; - /// List all IP addresses assigned to all interfaces. - pub fn addresses(&mut self) -> Result<Vec<AddressMessage>> { - self.execute_request(self.handle.address().get().execute().collect()) - } + // Connection will be closed once the runtime is dropped + let _ = runtime.spawn(connection); + let mut is_offline = runtime.block_on(check_offline_state(&handle))?; - /// List all links registered on the system. - fn links(&mut self) -> Result<Vec<LinkMessage>> { - self.execute_request(self.handle.link().get().execute().collect()) - } + let monitor_handle = MonitorHandle { + handle: handle.clone(), + runtime, + }; - /// List all unique interface indices that have a running link. - pub fn running_interfaces(&mut self) -> Result<BTreeSet<u32>> { - let links = self.links()?; - Ok(links - .into_iter() - .filter(link_provides_connectivity) - .map(|link| link.header.index) - .collect()) - } + let _ = monitor_handle.runtime.spawn(async move { + while let Some(_new_message) = messages.next().await { + match sender.upgrade() { + Some(sender) => { + let new_offline_state = check_offline_state(&handle).await.unwrap_or(false); + if new_offline_state != is_offline { + is_offline = new_offline_state; + let _ = sender.unbounded_send(TunnelCommand::IsOffline(is_offline)); + } + } + None => return, + } + } + }); - /// Helper function to execute an asynchronous request synchronously. - fn execute_request<R>(&mut self, request: R) -> Result<R::Item> - where - R: Future<Error = rtnetlink::Error>, + + Ok(monitor_handle) +} + +async fn check_offline_state(handle: &Handle) -> Result<bool> { + let mut link_request = handle.link().get().execute(); + let mut links = BTreeSet::new(); + while let Some(link) = link_request + .try_next() + .await + .map_err(failure::Fail::compat) + .map_err(Error::GetLinksError)? { - let connection = self.connection.take().ok_or(Error::NetlinkDisconnected)?; + if link_provides_connectivity(&link) { + links.insert(link.header.index); + } + } + + if links.is_empty() { + return Ok(true); + } - let (result, connection) = match connection.select2(request).wait() { - Ok(Either::A(_)) => return Err(Error::NetlinkDisconnected), - Err(Either::A((error, _))) => return Err(Error::NetlinkError(error)), - Ok(Either::B((links, connection))) => (Ok(links), connection), - Err(Either::B((error, connection))) => ( - Err(Error::GetLinksError(failure::Fail::compat(error))), - connection, - ), - }; + let mut address_request = handle.address().get().execute(); - self.connection = Some(connection); - result + while let Some(address) = address_request + .try_next() + .await + .map_err(failure::Fail::compat) + .map_err(Error::GetAddressesError)? + { + if links.contains(&address.header.index) { + return Ok(false); + } } + Ok(true) } + +// TODO: Improve by allowing bridge links to provide connectivity, will require route checking. fn link_provides_connectivity(link: &LinkMessage) -> bool { // Some tunnels have the link layer type set to None - link.header.link_layer_type != LinkLayerType::Loopback - && link.header.link_layer_type != LinkLayerType::None - && link.header.link_layer_type != LinkLayerType::Irda - && link.header.flags.is_running() + link.header.link_layer_type != ARPHRD_NONE + && link.header.link_layer_type != ARPHRD_LOOPBACK + && (link.header.flags & IFF_UP > 0 || link.header.flags & IFF_LOWER_UP > 0) && !is_virtual_interface(link) } fn is_virtual_interface(link: &LinkMessage) -> bool { for nla in link.nlas.iter() { - if let LinkNla::LinkInfo(link_info) = nla { - for info in link_info.iter() { + if let LinkNla::Info(info_nlas) = nla { + for info in info_nlas.iter() { // LinkInfo::Kind seems to only be set when the link is actually virtual if let LinkInfo::Kind(ref kind) = info { - use LinkInfoKind::*; + use InfoKind::*; return match kind { Dummy | Bridge | Tun | Nlmon | IpTun => true, _ => false, @@ -183,59 +178,3 @@ fn is_virtual_interface(link: &LinkMessage) -> bool { } false } - -fn monitor_event_loop( - connection: Connection, - channel: impl Stream<Item = NetlinkMessage, Error = ()>, - mut link_monitor: LinkMonitor, -) -> Result<()> { - let monitor = channel - .for_each(|_message| { - link_monitor.update(); - Ok(()) - }) - .map_err(|_| Error::MonitorNetlinkError); - - // Under normal circumstances, this runs forever. - let result = connection - .map_err(Error::NetlinkError) - .join(monitor) - .wait() - .map(|_| ()); - // But if it fails, it should fail open. - link_monitor.reset(); - result -} - -struct LinkMonitor { - is_offline: bool, - sender: Weak<UnboundedSender<TunnelCommand>>, -} - -impl LinkMonitor { - pub fn new(sender: Weak<UnboundedSender<TunnelCommand>>) -> Self { - let is_offline = is_offline(); - - LinkMonitor { is_offline, sender } - } - - pub fn update(&mut self) { - self.set_is_offline(is_offline()); - } - - fn set_is_offline(&mut self, is_offline: bool) { - if self.is_offline != is_offline { - self.is_offline = is_offline; - if let Some(sender) = self.sender.upgrade() { - let _ = sender.unbounded_send(TunnelCommand::IsOffline(is_offline)); - } - } - } - - /// Allow the offline check to fail open. - fn reset(&mut self) { - if let Some(sender) = self.sender.upgrade() { - let _ = sender.unbounded_send(TunnelCommand::IsOffline(false)); - } - } -} diff --git a/talpid-core/src/offline/mod.rs b/talpid-core/src/offline/mod.rs index cff84ce163..5cda6290a3 100644 --- a/talpid-core/src/offline/mod.rs +++ b/talpid-core/src/offline/mod.rs @@ -25,7 +25,7 @@ pub use self::imp::Error; pub struct MonitorHandle(imp::MonitorHandle); impl MonitorHandle { - pub fn is_offline(&self) -> bool { + pub fn is_offline(&mut self) -> bool { self.0.is_offline() } } diff --git a/talpid-core/src/tunnel_state_machine/mod.rs b/talpid-core/src/tunnel_state_machine/mod.rs index e5c6ea397f..baf52c4b2b 100644 --- a/talpid-core/src/tunnel_state_machine/mod.rs +++ b/talpid-core/src/tunnel_state_machine/mod.rs @@ -79,7 +79,7 @@ pub fn spawn( ) -> Result<Arc<mpsc::UnboundedSender<TunnelCommand>>, Error> { let (command_tx, command_rx) = mpsc::unbounded(); let command_tx = Arc::new(command_tx); - let offline_monitor = offline::spawn_monitor( + let mut offline_monitor = offline::spawn_monitor( Arc::downgrade(&command_tx), #[cfg(target_os = "android")] android_context.clone(), |
