diff options
| author | David Lönnhager <david.l@mullvad.net> | 2020-05-11 17:39:39 +0200 |
|---|---|---|
| committer | David Lönnhager <david.l@mullvad.net> | 2020-05-13 17:56:36 +0200 |
| commit | a77366895dc9e3bfbb2946b5787a1b256c1f9ca8 (patch) | |
| tree | 963ad05365c67cb845e9ed5724365fd3a741ff50 | |
| parent | 3f2f1ff05935c6f23c518abb02b9a97cdcf037f9 (diff) | |
| download | mullvadvpn-a77366895dc9e3bfbb2946b5787a1b256c1f9ca8.tar.xz mullvadvpn-a77366895dc9e3bfbb2946b5787a1b256c1f9ca8.zip | |
Update route manager on macOS
| -rw-r--r-- | talpid-core/src/routing/macos.rs | 149 |
1 files changed, 82 insertions, 67 deletions
diff --git a/talpid-core/src/routing/macos.rs b/talpid-core/src/routing/macos.rs index 4f05927824..2059254850 100644 --- a/talpid-core/src/routing/macos.rs +++ b/talpid-core/src/routing/macos.rs @@ -1,4 +1,4 @@ -use crate::routing::{NetNode, Node, RequiredRoute, Route}; +use crate::routing::{imp::RouteManagerCommand, NetNode, Node, RequiredRoute, Route}; use ipnetwork::IpNetwork; use std::{ @@ -8,7 +8,11 @@ use std::{ process::{Command, ExitStatus, Stdio}, }; -use futures01::{stream, sync::oneshot, Async, Future, IntoFuture, Stream}; +use futures01::{ + stream, + sync::{mpsc, oneshot}, + Async, Future, IntoFuture, Stream, +}; use tokio_process::{Child, CommandExt}; @@ -69,18 +73,16 @@ pub struct RouteManagerImpl { current_state: RouteManagerState, v4_gateway: Option<Node>, v6_gateway: Option<Node>, - shutdown_rx: Option<oneshot::Receiver<oneshot::Sender<()>>>, + manage_rx: Option<mpsc::UnboundedReceiver<RouteManagerCommand>>, } impl RouteManagerImpl { pub fn new( required_routes: HashSet<RequiredRoute>, - shutdown_rx: oneshot::Receiver<oneshot::Sender<()>>, + manage_rx: mpsc::UnboundedReceiver<RouteManagerCommand>, ) -> Result<Self> { - let mut applied_routes = HashSet::new(); - let mut routes_to_apply = vec![]; - let mut default_destinations = HashSet::new(); + let change_listener = ChangeListener::new().map_err(Error::FailedToMonitorRoutes)?; let v4_gateway = Self::get_default_node_cmd("-inet").wait()?; let v6_gateway = Self::get_default_node_cmd("-inet6").wait()?; @@ -89,6 +91,27 @@ impl RouteManagerImpl { return Err(Error::NoDefaultRoute); } + let mut manager = Self { + default_destinations: HashSet::new(), + applied_routes: HashSet::new(), + current_state: RouteManagerState::Listening(change_listener), + manage_rx: Some(manage_rx), + v4_gateway, + v6_gateway, + }; + + manager.add_required_routes(required_routes)?; + + Ok(manager) + } + + fn add_required_routes( + &mut self, + required_routes: HashSet<RequiredRoute>, + ) -> Result<()> { + let mut routes_to_apply = vec![]; + let mut default_destinations = HashSet::new(); + for route in required_routes { match route.node { NetNode::DefaultNode => { @@ -99,47 +122,22 @@ impl RouteManagerImpl { } } - let apply_routes_fn = || -> Result<()> { - for route in routes_to_apply { - Self::add_route(&route).wait()?; - applied_routes.insert(route); - } - for destination in default_destinations.iter() { - match (&v4_gateway, &v6_gateway, destination.is_ipv4()) { - (Some(gateway), _, true) | (_, Some(gateway), false) => { - let route = Route::new(gateway.clone(), *destination); - Self::add_route(&route).wait()?; - applied_routes.insert(route); - } - _ => (), - }; - } - - Ok(()) - }; - - if let Err(e) = apply_routes_fn() { - log::error!("Failed to apply routes - {}", e); - for applied_route in applied_routes.iter() { - if let Err(removal_err) = Self::delete_route(applied_route.prefix).wait() { - log::error!( - "Failed to clean up routes after failing to set them up - {}", - removal_err - ); + for route in routes_to_apply { + Self::add_route(&route).wait()?; + self.applied_routes.insert(route); + } + for destination in default_destinations.iter() { + match (&self.v4_gateway, &self.v6_gateway, destination.is_ipv4()) { + (Some(gateway), _, true) | (_, Some(gateway), false) => { + let route = Route::new(gateway.clone(), *destination); + Self::add_route(&route).wait()?; + self.applied_routes.insert(route); } - } - return Err(e); + _ => (), + }; } - let change_listener = ChangeListener::new().map_err(Error::FailedToMonitorRoutes)?; - Ok(Self { - default_destinations, - applied_routes, - current_state: RouteManagerState::Listening(change_listener), - shutdown_rx: Some(shutdown_rx), - v4_gateway, - v6_gateway, - }) + Ok(()) } // Retrieves the node that's currently used to reach 0.0.0.0/0 @@ -230,10 +228,7 @@ impl RouteManagerImpl { .map_err(Error::FailedToAddRoute) } - fn shutdown_future( - &self, - shutdown_done_tx: Option<oneshot::Sender<()>>, - ) -> impl Future<Item = (), Error = ()> + Send { + fn cleanup_routes(&self) -> impl Future<Item = (), Error = ()> + Send { let remove_route_future = |route: &Route| { Self::delete_route(route.prefix).then(|removal| { match removal { @@ -261,16 +256,21 @@ impl RouteManagerImpl { _ => None, } })); - stream::futures_ordered(routes_to_remove) - .for_each(|_| Ok(())) - .and_then(|_| { - if let Some(tx) = shutdown_done_tx { - if tx.send(()).is_err() { - log::debug!("RouteManager already dropped") - } + stream::futures_ordered(routes_to_remove).for_each(|_| Ok(())) + } + + fn shutdown_future( + &self, + shutdown_done_tx: Option<oneshot::Sender<()>>, + ) -> impl Future<Item = (), Error = ()> + Send { + self.cleanup_routes().and_then(|_| { + if let Some(tx) = shutdown_done_tx { + if tx.send(()).is_err() { + log::debug!("RouteManager already dropped") } - Ok(()) - }) + } + Ok(()) + }) } fn apply_new_default_routes( @@ -323,20 +323,35 @@ impl Future for RouteManagerImpl { type Item = (); type Error = Error; fn poll(&mut self) -> Result<Async<()>> { - if let Some(mut shutdown_rx) = self.shutdown_rx.take() { - match shutdown_rx.poll() { - Ok(Async::Ready(shutdown_tx)) => { - self.current_state = RouteManagerState::Shutdown(Box::new( - self.shutdown_future(Some(shutdown_tx)), - )); - } + if let Some(mut manage_rx) = self.manage_rx.take() { + match manage_rx.poll() { + Ok(Async::Ready(Some(command))) => match command { + RouteManagerCommand::Shutdown(tx) => { + self.current_state = + RouteManagerState::Shutdown(Box::new(self.shutdown_future(Some(tx)))); + } + RouteManagerCommand::AddRoutes(routes, result_tx) => { + self.manage_rx = Some(manage_rx); + log::debug!("Adding routes: {:?}", routes); + if let Err(error) = self.add_required_routes(routes) { + let _ = result_tx.send(Err(error)); + } else { + let _ = result_tx.send(Ok(())); + } + } + RouteManagerCommand::ClearRoutes => { + self.manage_rx = Some(manage_rx); + log::debug!("Clearing routes"); + let _ = self.cleanup_routes().wait(); + } + }, // handle is already dropped - Err(_) => { + Ok(Async::Ready(None)) | Err(_) => { self.current_state = RouteManagerState::Shutdown(Box::new(self.shutdown_future(None))); } Ok(Async::NotReady) => { - self.shutdown_rx = Some(shutdown_rx); + self.manage_rx = Some(manage_rx); } }; } |
