diff options
| author | David Lönnhager <david.l@mullvad.net> | 2021-06-21 16:00:42 +0200 |
|---|---|---|
| committer | David Lönnhager <david.l@mullvad.net> | 2021-06-22 12:44:50 +0200 |
| commit | 10db393d7e588dbf2f12be6c8f4384db0fe3b562 (patch) | |
| tree | 20f60079ef2f5ee0354173c75e72e89e81e6fa69 /talpid-core/src | |
| parent | ec98f94f75451aae52792b531f80f31aed328333 (diff) | |
| download | mullvadvpn-10db393d7e588dbf2f12be6c8f4384db0fe3b562.tar.xz mullvadvpn-10db393d7e588dbf2f12be6c8f4384db0fe3b562.zip | |
Add route change callbacks to the RouteManager on Linux
Diffstat (limited to 'talpid-core/src')
| -rw-r--r-- | talpid-core/src/routing/linux.rs | 41 | ||||
| -rw-r--r-- | talpid-core/src/routing/unix.rs | 29 |
2 files changed, 60 insertions, 10 deletions
diff --git a/talpid-core/src/routing/linux.rs b/talpid-core/src/routing/linux.rs index 5ea19f8dba..d0878cf3b8 100644 --- a/talpid-core/src/routing/linux.rs +++ b/talpid-core/src/routing/linux.rs @@ -1,4 +1,7 @@ -use crate::routing::{imp::RouteManagerCommand, NetNode, Node, RequiredRoute, Route}; +use crate::routing::{ + imp::{CallbackMessage, RouteManagerCommand}, + NetNode, Node, RequiredRoute, Route, +}; use std::{ collections::{BTreeMap, HashSet}, io, @@ -6,7 +9,11 @@ use std::{ }; use talpid_types::ErrorExt; -use futures::{channel::mpsc::UnboundedReceiver, future::FutureExt, StreamExt, TryStreamExt}; +use futures::{ + channel::mpsc::{UnboundedReceiver, UnboundedSender}, + future::FutureExt, + StreamExt, TryStream, TryStreamExt, +}; use ipnetwork::IpNetwork; use lazy_static::lazy_static; use netlink_packet_route::{ @@ -115,6 +122,7 @@ pub struct RouteManagerImpl { handle: Handle, messages: UnboundedReceiver<(NetlinkMessage<RtnlMessage>, SocketAddr)>, iface_map: BTreeMap<u32, NetworkInterface>, + listeners: Vec<UnboundedSender<CallbackMessage>>, // currently added routes added_routes: HashSet<Route>, @@ -137,9 +145,10 @@ impl RouteManagerImpl { let iface_map = Self::initialize_link_map(&handle).await?; let mut monitor = Self { - iface_map, handle, messages, + iface_map, + listeners: vec![], added_routes: HashSet::new(), }; @@ -295,8 +304,8 @@ impl RouteManagerImpl { .map(|(idx, _name)| *idx) } - async fn process_deleted_route(&mut self, route: Route) -> Result<()> { - self.added_routes.remove(&route); + fn process_deleted_route(&mut self, route: &Route) -> Result<()> { + self.added_routes.remove(route); Ok(()) } @@ -347,6 +356,9 @@ impl RouteManagerImpl { RouteManagerCommand::ClearRoutingRules(result_tx) => { let _ = result_tx.send(self.clear_routing_rules().await); } + RouteManagerCommand::NewChangeListener(result_tx) => { + let _ = result_tx.send(self.listen()); + } RouteManagerCommand::ClearRoutes => { log::debug!("Clearing routes"); self.cleanup_routes().await; @@ -367,9 +379,15 @@ impl RouteManagerImpl { self.iface_map.remove(&idx); } } + NetlinkPayload::InnerMessage(RtnlMessage::NewRoute(new_route)) => { + if let Some(addition) = self.parse_route_message(new_route)? { + self.notify_change_listeners(CallbackMessage::NewRoute(addition)); + } + } NetlinkPayload::InnerMessage(RtnlMessage::DelRoute(old_route)) => { if let Some(deletion) = self.parse_route_message(old_route)? { - self.process_deleted_route(deletion).await?; + self.process_deleted_route(&deletion)?; + self.notify_change_listeners(CallbackMessage::DelRoute(deletion)); } } _ => (), @@ -377,6 +395,11 @@ impl RouteManagerImpl { Ok(()) } + fn notify_change_listeners(&mut self, message: CallbackMessage) { + self.listeners + .retain(|listener| listener.unbounded_send(message.clone()).is_ok()); + } + // Tries to coax a Route out of a RouteMessage fn parse_route_message(&self, msg: RouteMessage) -> Result<Option<Route>> { let af_spec = msg.header.address_family; @@ -676,6 +699,12 @@ impl RouteManagerImpl { Ok(()) } + fn listen(&mut self) -> UnboundedReceiver<CallbackMessage> { + let (tx, rx) = futures::channel::mpsc::unbounded(); + self.listeners.push(tx); + rx + } + async fn destructor(&mut self) { self.cleanup_routes().await; diff --git a/talpid-core/src/routing/unix.rs b/talpid-core/src/routing/unix.rs index 4b7cb8eed4..8806ee3ab4 100644 --- a/talpid-core/src/routing/unix.rs +++ b/talpid-core/src/routing/unix.rs @@ -1,11 +1,14 @@ #![cfg_attr(target_os = "android", allow(dead_code))] #![cfg_attr(target_os = "windows", allow(dead_code))] // TODO: remove the allow(dead_code) for android once it's up to scratch. -use super::RequiredRoute; +use super::{RequiredRoute, Route}; -use futures::channel::{ - mpsc::{self, UnboundedSender}, - oneshot, +use futures::{ + channel::{ + mpsc::{self, UnboundedSender}, + oneshot, + }, + stream::Stream, }; use std::{collections::HashSet, io}; @@ -90,6 +93,16 @@ impl RouteManagerHandle { .map_err(|_| Error::ManagerChannelDown)? .map_err(Error::PlatformError) } + + /// Listen for route changes. + #[cfg(target_os = "linux")] + pub async fn change_listener(&self) -> Result<impl Stream<Item = CallbackMessage>, Error> { + let (response_tx, response_rx) = oneshot::channel(); + self.tx + .unbounded_send(RouteManagerCommand::NewChangeListener(response_tx)) + .map_err(|_| Error::RouteManagerDown)?; + response_rx.await.map_err(|_| Error::ManagerChannelDown) + } } /// Commands for the underlying route manager object. @@ -105,6 +118,14 @@ pub(crate) enum RouteManagerCommand { CreateRoutingRules(bool, oneshot::Sender<Result<(), PlatformError>>), #[cfg(target_os = "linux")] ClearRoutingRules(oneshot::Sender<Result<(), PlatformError>>), + #[cfg(target_os = "linux")] + NewChangeListener(oneshot::Sender<mpsc::UnboundedReceiver<CallbackMessage>>), +} + +#[derive(Debug, Clone)] +pub enum CallbackMessage { + NewRoute(Route), + DelRoute(Route), } /// RouteManager applies a set of routes to the route table. |
