summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorEmīls <emils@mullvad.net>2020-04-15 10:41:45 +0100
committerEmīls Piņķis <emils@mullvad.net>2020-04-27 11:17:00 +0100
commit80fb0b2bd59708f4b0c9f01b410eef739f1af6a8 (patch)
tree96913ebfe90beb6b134d43e747143503ed6fa7a6
parent6d7141d24d17f2642eb3bc19675dd05b57fb6265 (diff)
downloadmullvadvpn-80fb0b2bd59708f4b0c9f01b410eef739f1af6a8.tar.xz
mullvadvpn-80fb0b2bd59708f4b0c9f01b410eef739f1af6a8.zip
Rework offline detection on Linux
-rw-r--r--talpid-core/src/offline/linux.rs283
-rw-r--r--talpid-core/src/offline/mod.rs2
-rw-r--r--talpid-core/src/tunnel_state_machine/mod.rs2
3 files changed, 113 insertions, 174 deletions
diff --git a/talpid-core/src/offline/linux.rs b/talpid-core/src/offline/linux.rs
index b75f187505..0b6526f0e5 100644
--- a/talpid-core/src/offline/linux.rs
+++ b/talpid-core/src/offline/linux.rs
@@ -1,178 +1,173 @@
use crate::tunnel_state_machine::TunnelCommand;
-use futures::{future::Either, sync::mpsc::UnboundedSender, Future, Stream};
-use log::{error, warn};
-use netlink_packet::{
- AddressMessage, LinkInfo, LinkInfoKind, LinkLayerType, LinkMessage, LinkNla, NetlinkMessage,
+use futures::{StreamExt, TryStreamExt};
+use futures01::sync::mpsc::UnboundedSender;
+use netlink_packet_route::{
+ constants::{ARPHRD_LOOPBACK, ARPHRD_NONE, IFF_LOWER_UP, IFF_UP},
+ rtnl::link::nlas::{Info as LinkInfo, InfoKind, Nla as LinkNla},
+ LinkMessage,
};
use netlink_sys::SocketAddr;
use rtnetlink::{
constants::{RTMGRP_IPV4_IFADDR, RTMGRP_IPV6_IFADDR, RTMGRP_LINK, RTMGRP_NOTIFY},
- Connection, Handle,
+ Handle,
};
-use std::{collections::BTreeSet, io, sync::Weak, thread};
-use talpid_types::ErrorExt;
+use std::{collections::BTreeSet, io, sync::Weak};
pub type Result<T> = std::result::Result<T, Error>;
+const EVENT_LOOP_THREAD_NAME: &str = "mullvad-offline-detection-event-loop";
+
#[derive(err_derive::Error, Debug)]
#[error(no_from)]
pub enum Error {
#[error(display = "Failed to get list of IP links")]
GetLinksError(#[error(source)] failure::Compat<rtnetlink::Error>),
+ #[error(display = "Failed to get list of IP addresses")]
+ GetAddressesError(#[error(source)] failure::Compat<rtnetlink::Error>),
+
#[error(display = "Failed to connect to netlink socket")]
NetlinkConnectionError(#[error(source)] io::Error),
+ #[error(display = "Failed to connect to bind to netlink socket")]
+ BindError(#[error(source)] io::Error),
+
#[error(display = "Failed to start listening on netlink socket")]
NetlinkBindError(#[error(source)] io::Error),
- #[error(display = "Error while communicating on the netlink socket")]
- NetlinkError(#[error(source)] netlink_proto::Error),
-
#[error(display = "Error while processing netlink messages")]
MonitorNetlinkError,
#[error(display = "Netlink connection has unexpectedly disconnected")]
NetlinkDisconnected,
-}
-
-pub struct MonitorHandle;
-
-pub fn spawn_monitor(sender: Weak<UnboundedSender<TunnelCommand>>) -> Result<MonitorHandle> {
- let socket = SocketAddr::new(
- 0,
- RTMGRP_NOTIFY | RTMGRP_LINK | RTMGRP_IPV4_IFADDR | RTMGRP_IPV6_IFADDR,
- );
- let (mut connection, _, messages) = rtnetlink::new_connection_with_messages().unwrap();
- connection
- .socket_mut()
- .bind(&socket)
- .map_err(Error::NetlinkBindError)?;
-
- let link_monitor = LinkMonitor::new(sender);
-
- thread::spawn(|| {
- if let Err(error) = monitor_event_loop(connection, messages, link_monitor) {
- error!(
- "{}",
- error.display_chain_with_msg("Error running link monitor event loop")
- );
- }
- });
-
- Ok(MonitorHandle)
+ #[error(display = "Failed to initialize event loop")]
+ EventLoopError(#[error(source)] io::Error),
}
-fn is_offline() -> bool {
- check_if_offline().unwrap_or_else(|error| {
- warn!(
- "{}",
- error.display_chain_with_msg("Failed to check for internet connection")
- );
- false
- })
+pub struct MonitorHandle {
+ handle: rtnetlink::Handle,
+ runtime: tokio02::runtime::Runtime,
}
impl MonitorHandle {
- pub fn is_offline(&self) -> bool {
- is_offline()
+ pub fn is_offline(&mut self) -> bool {
+ match self.runtime.block_on(check_offline_state(&self.handle)) {
+ Ok(is_offline) => is_offline,
+ Err(err) => {
+ log::error!(
+ "Failed to verify offline state: {}. Presuming connectivity",
+ err
+ );
+ false
+ }
+ }
}
}
-/// Checks if there are no running links or that none of the running links have IP addresses
-/// assigned to them.
-fn check_if_offline() -> Result<bool> {
- let mut connection = NetlinkConnection::new()?;
- let interfaces = connection.running_interfaces()?;
+pub fn spawn_monitor(sender: Weak<UnboundedSender<TunnelCommand>>) -> Result<MonitorHandle> {
+ let mut runtime = tokio02::runtime::Builder::new()
+ .threaded_scheduler()
+ .core_threads(1)
+ .enable_all()
+ .thread_name(EVENT_LOOP_THREAD_NAME)
+ .build()
+ .map_err(Error::EventLoopError)?;
- if interfaces.is_empty() {
- Ok(true)
- } else {
- // Check if the current IP addresses are not assigned to any one of the running interfaces
- Ok(connection
- .addresses()?
- .into_iter()
- .all(|address| !interfaces.contains(&address.header.index)))
- }
-}
+ let (connection, handle, mut messages) = runtime.block_on(async move {
+ let (mut connection, handle, messages) =
+ rtnetlink::new_connection().map_err(Error::NetlinkConnectionError)?;
-struct NetlinkConnection {
- connection: Option<Connection>,
- handle: Handle,
-}
+ let mgroup_flags = RTMGRP_IPV4_IFADDR | RTMGRP_IPV6_IFADDR | RTMGRP_LINK | RTMGRP_NOTIFY;
+ let addr = SocketAddr::new(0, mgroup_flags);
-impl NetlinkConnection {
- /// Open a connection on the netlink socket.
- pub fn new() -> Result<Self> {
- let (connection, handle) =
- rtnetlink::new_connection().map_err(Error::NetlinkConnectionError)?;
+ connection
+ .socket_mut()
+ .bind(&addr)
+ .map_err(Error::BindError)?;
- Ok(NetlinkConnection {
- connection: Some(connection),
- handle,
- })
- }
+ Ok((connection, handle, messages))
+ })?;
- /// List all IP addresses assigned to all interfaces.
- pub fn addresses(&mut self) -> Result<Vec<AddressMessage>> {
- self.execute_request(self.handle.address().get().execute().collect())
- }
+ // Connection will be closed once the runtime is dropped
+ let _ = runtime.spawn(connection);
+ let mut is_offline = runtime.block_on(check_offline_state(&handle))?;
- /// List all links registered on the system.
- fn links(&mut self) -> Result<Vec<LinkMessage>> {
- self.execute_request(self.handle.link().get().execute().collect())
- }
+ let monitor_handle = MonitorHandle {
+ handle: handle.clone(),
+ runtime,
+ };
- /// List all unique interface indices that have a running link.
- pub fn running_interfaces(&mut self) -> Result<BTreeSet<u32>> {
- let links = self.links()?;
- Ok(links
- .into_iter()
- .filter(link_provides_connectivity)
- .map(|link| link.header.index)
- .collect())
- }
+ let _ = monitor_handle.runtime.spawn(async move {
+ while let Some(_new_message) = messages.next().await {
+ match sender.upgrade() {
+ Some(sender) => {
+ let new_offline_state = check_offline_state(&handle).await.unwrap_or(false);
+ if new_offline_state != is_offline {
+ is_offline = new_offline_state;
+ let _ = sender.unbounded_send(TunnelCommand::IsOffline(is_offline));
+ }
+ }
+ None => return,
+ }
+ }
+ });
- /// Helper function to execute an asynchronous request synchronously.
- fn execute_request<R>(&mut self, request: R) -> Result<R::Item>
- where
- R: Future<Error = rtnetlink::Error>,
+
+ Ok(monitor_handle)
+}
+
+async fn check_offline_state(handle: &Handle) -> Result<bool> {
+ let mut link_request = handle.link().get().execute();
+ let mut links = BTreeSet::new();
+ while let Some(link) = link_request
+ .try_next()
+ .await
+ .map_err(failure::Fail::compat)
+ .map_err(Error::GetLinksError)?
{
- let connection = self.connection.take().ok_or(Error::NetlinkDisconnected)?;
+ if link_provides_connectivity(&link) {
+ links.insert(link.header.index);
+ }
+ }
+
+ if links.is_empty() {
+ return Ok(true);
+ }
- let (result, connection) = match connection.select2(request).wait() {
- Ok(Either::A(_)) => return Err(Error::NetlinkDisconnected),
- Err(Either::A((error, _))) => return Err(Error::NetlinkError(error)),
- Ok(Either::B((links, connection))) => (Ok(links), connection),
- Err(Either::B((error, connection))) => (
- Err(Error::GetLinksError(failure::Fail::compat(error))),
- connection,
- ),
- };
+ let mut address_request = handle.address().get().execute();
- self.connection = Some(connection);
- result
+ while let Some(address) = address_request
+ .try_next()
+ .await
+ .map_err(failure::Fail::compat)
+ .map_err(Error::GetAddressesError)?
+ {
+ if links.contains(&address.header.index) {
+ return Ok(false);
+ }
}
+ Ok(true)
}
+
+// TODO: Improve by allowing bridge links to provide connectivity, will require route checking.
fn link_provides_connectivity(link: &LinkMessage) -> bool {
// Some tunnels have the link layer type set to None
- link.header.link_layer_type != LinkLayerType::Loopback
- && link.header.link_layer_type != LinkLayerType::None
- && link.header.link_layer_type != LinkLayerType::Irda
- && link.header.flags.is_running()
+ link.header.link_layer_type != ARPHRD_NONE
+ && link.header.link_layer_type != ARPHRD_LOOPBACK
+ && (link.header.flags & IFF_UP > 0 || link.header.flags & IFF_LOWER_UP > 0)
&& !is_virtual_interface(link)
}
fn is_virtual_interface(link: &LinkMessage) -> bool {
for nla in link.nlas.iter() {
- if let LinkNla::LinkInfo(link_info) = nla {
- for info in link_info.iter() {
+ if let LinkNla::Info(info_nlas) = nla {
+ for info in info_nlas.iter() {
// LinkInfo::Kind seems to only be set when the link is actually virtual
if let LinkInfo::Kind(ref kind) = info {
- use LinkInfoKind::*;
+ use InfoKind::*;
return match kind {
Dummy | Bridge | Tun | Nlmon | IpTun => true,
_ => false,
@@ -183,59 +178,3 @@ fn is_virtual_interface(link: &LinkMessage) -> bool {
}
false
}
-
-fn monitor_event_loop(
- connection: Connection,
- channel: impl Stream<Item = NetlinkMessage, Error = ()>,
- mut link_monitor: LinkMonitor,
-) -> Result<()> {
- let monitor = channel
- .for_each(|_message| {
- link_monitor.update();
- Ok(())
- })
- .map_err(|_| Error::MonitorNetlinkError);
-
- // Under normal circumstances, this runs forever.
- let result = connection
- .map_err(Error::NetlinkError)
- .join(monitor)
- .wait()
- .map(|_| ());
- // But if it fails, it should fail open.
- link_monitor.reset();
- result
-}
-
-struct LinkMonitor {
- is_offline: bool,
- sender: Weak<UnboundedSender<TunnelCommand>>,
-}
-
-impl LinkMonitor {
- pub fn new(sender: Weak<UnboundedSender<TunnelCommand>>) -> Self {
- let is_offline = is_offline();
-
- LinkMonitor { is_offline, sender }
- }
-
- pub fn update(&mut self) {
- self.set_is_offline(is_offline());
- }
-
- fn set_is_offline(&mut self, is_offline: bool) {
- if self.is_offline != is_offline {
- self.is_offline = is_offline;
- if let Some(sender) = self.sender.upgrade() {
- let _ = sender.unbounded_send(TunnelCommand::IsOffline(is_offline));
- }
- }
- }
-
- /// Allow the offline check to fail open.
- fn reset(&mut self) {
- if let Some(sender) = self.sender.upgrade() {
- let _ = sender.unbounded_send(TunnelCommand::IsOffline(false));
- }
- }
-}
diff --git a/talpid-core/src/offline/mod.rs b/talpid-core/src/offline/mod.rs
index cff84ce163..5cda6290a3 100644
--- a/talpid-core/src/offline/mod.rs
+++ b/talpid-core/src/offline/mod.rs
@@ -25,7 +25,7 @@ pub use self::imp::Error;
pub struct MonitorHandle(imp::MonitorHandle);
impl MonitorHandle {
- pub fn is_offline(&self) -> bool {
+ pub fn is_offline(&mut self) -> bool {
self.0.is_offline()
}
}
diff --git a/talpid-core/src/tunnel_state_machine/mod.rs b/talpid-core/src/tunnel_state_machine/mod.rs
index e5c6ea397f..baf52c4b2b 100644
--- a/talpid-core/src/tunnel_state_machine/mod.rs
+++ b/talpid-core/src/tunnel_state_machine/mod.rs
@@ -79,7 +79,7 @@ pub fn spawn(
) -> Result<Arc<mpsc::UnboundedSender<TunnelCommand>>, Error> {
let (command_tx, command_rx) = mpsc::unbounded();
let command_tx = Arc::new(command_tx);
- let offline_monitor = offline::spawn_monitor(
+ let mut offline_monitor = offline::spawn_monitor(
Arc::downgrade(&command_tx),
#[cfg(target_os = "android")]
android_context.clone(),