summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorDavid Lönnhager <david.l@mullvad.net>2020-05-11 17:39:39 +0200
committerDavid Lönnhager <david.l@mullvad.net>2020-05-13 17:56:36 +0200
commita77366895dc9e3bfbb2946b5787a1b256c1f9ca8 (patch)
tree963ad05365c67cb845e9ed5724365fd3a741ff50
parent3f2f1ff05935c6f23c518abb02b9a97cdcf037f9 (diff)
downloadmullvadvpn-a77366895dc9e3bfbb2946b5787a1b256c1f9ca8.tar.xz
mullvadvpn-a77366895dc9e3bfbb2946b5787a1b256c1f9ca8.zip
Update route manager on macOS
-rw-r--r--talpid-core/src/routing/macos.rs149
1 files changed, 82 insertions, 67 deletions
diff --git a/talpid-core/src/routing/macos.rs b/talpid-core/src/routing/macos.rs
index 4f05927824..2059254850 100644
--- a/talpid-core/src/routing/macos.rs
+++ b/talpid-core/src/routing/macos.rs
@@ -1,4 +1,4 @@
-use crate::routing::{NetNode, Node, RequiredRoute, Route};
+use crate::routing::{imp::RouteManagerCommand, NetNode, Node, RequiredRoute, Route};
use ipnetwork::IpNetwork;
use std::{
@@ -8,7 +8,11 @@ use std::{
process::{Command, ExitStatus, Stdio},
};
-use futures01::{stream, sync::oneshot, Async, Future, IntoFuture, Stream};
+use futures01::{
+ stream,
+ sync::{mpsc, oneshot},
+ Async, Future, IntoFuture, Stream,
+};
use tokio_process::{Child, CommandExt};
@@ -69,18 +73,16 @@ pub struct RouteManagerImpl {
current_state: RouteManagerState,
v4_gateway: Option<Node>,
v6_gateway: Option<Node>,
- shutdown_rx: Option<oneshot::Receiver<oneshot::Sender<()>>>,
+ manage_rx: Option<mpsc::UnboundedReceiver<RouteManagerCommand>>,
}
impl RouteManagerImpl {
pub fn new(
required_routes: HashSet<RequiredRoute>,
- shutdown_rx: oneshot::Receiver<oneshot::Sender<()>>,
+ manage_rx: mpsc::UnboundedReceiver<RouteManagerCommand>,
) -> Result<Self> {
- let mut applied_routes = HashSet::new();
- let mut routes_to_apply = vec![];
- let mut default_destinations = HashSet::new();
+ let change_listener = ChangeListener::new().map_err(Error::FailedToMonitorRoutes)?;
let v4_gateway = Self::get_default_node_cmd("-inet").wait()?;
let v6_gateway = Self::get_default_node_cmd("-inet6").wait()?;
@@ -89,6 +91,27 @@ impl RouteManagerImpl {
return Err(Error::NoDefaultRoute);
}
+ let mut manager = Self {
+ default_destinations: HashSet::new(),
+ applied_routes: HashSet::new(),
+ current_state: RouteManagerState::Listening(change_listener),
+ manage_rx: Some(manage_rx),
+ v4_gateway,
+ v6_gateway,
+ };
+
+ manager.add_required_routes(required_routes)?;
+
+ Ok(manager)
+ }
+
+ fn add_required_routes(
+ &mut self,
+ required_routes: HashSet<RequiredRoute>,
+ ) -> Result<()> {
+ let mut routes_to_apply = vec![];
+ let mut default_destinations = HashSet::new();
+
for route in required_routes {
match route.node {
NetNode::DefaultNode => {
@@ -99,47 +122,22 @@ impl RouteManagerImpl {
}
}
- let apply_routes_fn = || -> Result<()> {
- for route in routes_to_apply {
- Self::add_route(&route).wait()?;
- applied_routes.insert(route);
- }
- for destination in default_destinations.iter() {
- match (&v4_gateway, &v6_gateway, destination.is_ipv4()) {
- (Some(gateway), _, true) | (_, Some(gateway), false) => {
- let route = Route::new(gateway.clone(), *destination);
- Self::add_route(&route).wait()?;
- applied_routes.insert(route);
- }
- _ => (),
- };
- }
-
- Ok(())
- };
-
- if let Err(e) = apply_routes_fn() {
- log::error!("Failed to apply routes - {}", e);
- for applied_route in applied_routes.iter() {
- if let Err(removal_err) = Self::delete_route(applied_route.prefix).wait() {
- log::error!(
- "Failed to clean up routes after failing to set them up - {}",
- removal_err
- );
+ for route in routes_to_apply {
+ Self::add_route(&route).wait()?;
+ self.applied_routes.insert(route);
+ }
+ for destination in default_destinations.iter() {
+ match (&self.v4_gateway, &self.v6_gateway, destination.is_ipv4()) {
+ (Some(gateway), _, true) | (_, Some(gateway), false) => {
+ let route = Route::new(gateway.clone(), *destination);
+ Self::add_route(&route).wait()?;
+ self.applied_routes.insert(route);
}
- }
- return Err(e);
+ _ => (),
+ };
}
- let change_listener = ChangeListener::new().map_err(Error::FailedToMonitorRoutes)?;
- Ok(Self {
- default_destinations,
- applied_routes,
- current_state: RouteManagerState::Listening(change_listener),
- shutdown_rx: Some(shutdown_rx),
- v4_gateway,
- v6_gateway,
- })
+ Ok(())
}
// Retrieves the node that's currently used to reach 0.0.0.0/0
@@ -230,10 +228,7 @@ impl RouteManagerImpl {
.map_err(Error::FailedToAddRoute)
}
- fn shutdown_future(
- &self,
- shutdown_done_tx: Option<oneshot::Sender<()>>,
- ) -> impl Future<Item = (), Error = ()> + Send {
+ fn cleanup_routes(&self) -> impl Future<Item = (), Error = ()> + Send {
let remove_route_future = |route: &Route| {
Self::delete_route(route.prefix).then(|removal| {
match removal {
@@ -261,16 +256,21 @@ impl RouteManagerImpl {
_ => None,
}
}));
- stream::futures_ordered(routes_to_remove)
- .for_each(|_| Ok(()))
- .and_then(|_| {
- if let Some(tx) = shutdown_done_tx {
- if tx.send(()).is_err() {
- log::debug!("RouteManager already dropped")
- }
+ stream::futures_ordered(routes_to_remove).for_each(|_| Ok(()))
+ }
+
+ fn shutdown_future(
+ &self,
+ shutdown_done_tx: Option<oneshot::Sender<()>>,
+ ) -> impl Future<Item = (), Error = ()> + Send {
+ self.cleanup_routes().and_then(|_| {
+ if let Some(tx) = shutdown_done_tx {
+ if tx.send(()).is_err() {
+ log::debug!("RouteManager already dropped")
}
- Ok(())
- })
+ }
+ Ok(())
+ })
}
fn apply_new_default_routes(
@@ -323,20 +323,35 @@ impl Future for RouteManagerImpl {
type Item = ();
type Error = Error;
fn poll(&mut self) -> Result<Async<()>> {
- if let Some(mut shutdown_rx) = self.shutdown_rx.take() {
- match shutdown_rx.poll() {
- Ok(Async::Ready(shutdown_tx)) => {
- self.current_state = RouteManagerState::Shutdown(Box::new(
- self.shutdown_future(Some(shutdown_tx)),
- ));
- }
+ if let Some(mut manage_rx) = self.manage_rx.take() {
+ match manage_rx.poll() {
+ Ok(Async::Ready(Some(command))) => match command {
+ RouteManagerCommand::Shutdown(tx) => {
+ self.current_state =
+ RouteManagerState::Shutdown(Box::new(self.shutdown_future(Some(tx))));
+ }
+ RouteManagerCommand::AddRoutes(routes, result_tx) => {
+ self.manage_rx = Some(manage_rx);
+ log::debug!("Adding routes: {:?}", routes);
+ if let Err(error) = self.add_required_routes(routes) {
+ let _ = result_tx.send(Err(error));
+ } else {
+ let _ = result_tx.send(Ok(()));
+ }
+ }
+ RouteManagerCommand::ClearRoutes => {
+ self.manage_rx = Some(manage_rx);
+ log::debug!("Clearing routes");
+ let _ = self.cleanup_routes().wait();
+ }
+ },
// handle is already dropped
- Err(_) => {
+ Ok(Async::Ready(None)) | Err(_) => {
self.current_state =
RouteManagerState::Shutdown(Box::new(self.shutdown_future(None)));
}
Ok(Async::NotReady) => {
- self.shutdown_rx = Some(shutdown_rx);
+ self.manage_rx = Some(manage_rx);
}
};
}