summaryrefslogtreecommitdiffhomepage
path: root/talpid-core/src
diff options
context:
space:
mode:
authorDavid Lönnhager <david.l@mullvad.net>2021-06-21 16:00:42 +0200
committerDavid Lönnhager <david.l@mullvad.net>2021-06-22 12:44:50 +0200
commit10db393d7e588dbf2f12be6c8f4384db0fe3b562 (patch)
tree20f60079ef2f5ee0354173c75e72e89e81e6fa69 /talpid-core/src
parentec98f94f75451aae52792b531f80f31aed328333 (diff)
downloadmullvadvpn-10db393d7e588dbf2f12be6c8f4384db0fe3b562.tar.xz
mullvadvpn-10db393d7e588dbf2f12be6c8f4384db0fe3b562.zip
Add route change callbacks to the RouteManager on Linux
Diffstat (limited to 'talpid-core/src')
-rw-r--r--talpid-core/src/routing/linux.rs41
-rw-r--r--talpid-core/src/routing/unix.rs29
2 files changed, 60 insertions, 10 deletions
diff --git a/talpid-core/src/routing/linux.rs b/talpid-core/src/routing/linux.rs
index 5ea19f8dba..d0878cf3b8 100644
--- a/talpid-core/src/routing/linux.rs
+++ b/talpid-core/src/routing/linux.rs
@@ -1,4 +1,7 @@
-use crate::routing::{imp::RouteManagerCommand, NetNode, Node, RequiredRoute, Route};
+use crate::routing::{
+ imp::{CallbackMessage, RouteManagerCommand},
+ NetNode, Node, RequiredRoute, Route,
+};
use std::{
collections::{BTreeMap, HashSet},
io,
@@ -6,7 +9,11 @@ use std::{
};
use talpid_types::ErrorExt;
-use futures::{channel::mpsc::UnboundedReceiver, future::FutureExt, StreamExt, TryStreamExt};
+use futures::{
+ channel::mpsc::{UnboundedReceiver, UnboundedSender},
+ future::FutureExt,
+ StreamExt, TryStream, TryStreamExt,
+};
use ipnetwork::IpNetwork;
use lazy_static::lazy_static;
use netlink_packet_route::{
@@ -115,6 +122,7 @@ pub struct RouteManagerImpl {
handle: Handle,
messages: UnboundedReceiver<(NetlinkMessage<RtnlMessage>, SocketAddr)>,
iface_map: BTreeMap<u32, NetworkInterface>,
+ listeners: Vec<UnboundedSender<CallbackMessage>>,
// currently added routes
added_routes: HashSet<Route>,
@@ -137,9 +145,10 @@ impl RouteManagerImpl {
let iface_map = Self::initialize_link_map(&handle).await?;
let mut monitor = Self {
- iface_map,
handle,
messages,
+ iface_map,
+ listeners: vec![],
added_routes: HashSet::new(),
};
@@ -295,8 +304,8 @@ impl RouteManagerImpl {
.map(|(idx, _name)| *idx)
}
- async fn process_deleted_route(&mut self, route: Route) -> Result<()> {
- self.added_routes.remove(&route);
+ fn process_deleted_route(&mut self, route: &Route) -> Result<()> {
+ self.added_routes.remove(route);
Ok(())
}
@@ -347,6 +356,9 @@ impl RouteManagerImpl {
RouteManagerCommand::ClearRoutingRules(result_tx) => {
let _ = result_tx.send(self.clear_routing_rules().await);
}
+ RouteManagerCommand::NewChangeListener(result_tx) => {
+ let _ = result_tx.send(self.listen());
+ }
RouteManagerCommand::ClearRoutes => {
log::debug!("Clearing routes");
self.cleanup_routes().await;
@@ -367,9 +379,15 @@ impl RouteManagerImpl {
self.iface_map.remove(&idx);
}
}
+ NetlinkPayload::InnerMessage(RtnlMessage::NewRoute(new_route)) => {
+ if let Some(addition) = self.parse_route_message(new_route)? {
+ self.notify_change_listeners(CallbackMessage::NewRoute(addition));
+ }
+ }
NetlinkPayload::InnerMessage(RtnlMessage::DelRoute(old_route)) => {
if let Some(deletion) = self.parse_route_message(old_route)? {
- self.process_deleted_route(deletion).await?;
+ self.process_deleted_route(&deletion)?;
+ self.notify_change_listeners(CallbackMessage::DelRoute(deletion));
}
}
_ => (),
@@ -377,6 +395,11 @@ impl RouteManagerImpl {
Ok(())
}
+ fn notify_change_listeners(&mut self, message: CallbackMessage) {
+ self.listeners
+ .retain(|listener| listener.unbounded_send(message.clone()).is_ok());
+ }
+
// Tries to coax a Route out of a RouteMessage
fn parse_route_message(&self, msg: RouteMessage) -> Result<Option<Route>> {
let af_spec = msg.header.address_family;
@@ -676,6 +699,12 @@ impl RouteManagerImpl {
Ok(())
}
+ fn listen(&mut self) -> UnboundedReceiver<CallbackMessage> {
+ let (tx, rx) = futures::channel::mpsc::unbounded();
+ self.listeners.push(tx);
+ rx
+ }
+
async fn destructor(&mut self) {
self.cleanup_routes().await;
diff --git a/talpid-core/src/routing/unix.rs b/talpid-core/src/routing/unix.rs
index 4b7cb8eed4..8806ee3ab4 100644
--- a/talpid-core/src/routing/unix.rs
+++ b/talpid-core/src/routing/unix.rs
@@ -1,11 +1,14 @@
#![cfg_attr(target_os = "android", allow(dead_code))]
#![cfg_attr(target_os = "windows", allow(dead_code))]
// TODO: remove the allow(dead_code) for android once it's up to scratch.
-use super::RequiredRoute;
+use super::{RequiredRoute, Route};
-use futures::channel::{
- mpsc::{self, UnboundedSender},
- oneshot,
+use futures::{
+ channel::{
+ mpsc::{self, UnboundedSender},
+ oneshot,
+ },
+ stream::Stream,
};
use std::{collections::HashSet, io};
@@ -90,6 +93,16 @@ impl RouteManagerHandle {
.map_err(|_| Error::ManagerChannelDown)?
.map_err(Error::PlatformError)
}
+
+ /// Listen for route changes.
+ #[cfg(target_os = "linux")]
+ pub async fn change_listener(&self) -> Result<impl Stream<Item = CallbackMessage>, Error> {
+ let (response_tx, response_rx) = oneshot::channel();
+ self.tx
+ .unbounded_send(RouteManagerCommand::NewChangeListener(response_tx))
+ .map_err(|_| Error::RouteManagerDown)?;
+ response_rx.await.map_err(|_| Error::ManagerChannelDown)
+ }
}
/// Commands for the underlying route manager object.
@@ -105,6 +118,14 @@ pub(crate) enum RouteManagerCommand {
CreateRoutingRules(bool, oneshot::Sender<Result<(), PlatformError>>),
#[cfg(target_os = "linux")]
ClearRoutingRules(oneshot::Sender<Result<(), PlatformError>>),
+ #[cfg(target_os = "linux")]
+ NewChangeListener(oneshot::Sender<mpsc::UnboundedReceiver<CallbackMessage>>),
+}
+
+#[derive(Debug, Clone)]
+pub enum CallbackMessage {
+ NewRoute(Route),
+ DelRoute(Route),
}
/// RouteManager applies a set of routes to the route table.