diff options
| author | Emīls <emils@mullvad.net> | 2019-11-26 17:00:25 +0000 |
|---|---|---|
| committer | Emīls <emils@mullvad.net> | 2019-11-27 12:11:46 +0000 |
| commit | 0a6549ac082a289a3de01b4c96b5062a3b99b1cc (patch) | |
| tree | 96b0b4e99e9296b91a1b34a5e3d084699d6aa5c7 | |
| parent | efc0a899ab27be62f30d8f59f487680c52f25cec (diff) | |
| download | mullvadvpn-0a6549ac082a289a3de01b4c96b5062a3b99b1cc.tar.xz mullvadvpn-0a6549ac082a289a3de01b4c96b5062a3b99b1cc.zip | |
Stop using tokio-process for routing
| -rw-r--r-- | talpid-core/src/routing/linux/mod.rs | 300 |
1 files changed, 94 insertions, 206 deletions
diff --git a/talpid-core/src/routing/linux/mod.rs b/talpid-core/src/routing/linux/mod.rs index 98fc929db7..00020eccbc 100644 --- a/talpid-core/src/routing/linux/mod.rs +++ b/talpid-core/src/routing/linux/mod.rs @@ -2,7 +2,7 @@ use super::{NetNode, Node, Route}; use ipnetwork::IpNetwork; use std::{ - collections::{HashMap, HashSet, VecDeque}, + collections::{HashMap, HashSet}, io, process::{Command, Stdio}, }; @@ -10,21 +10,21 @@ use std::{ mod change_listener; use change_listener::{Error as RouteChangeListenerError, RouteChangeListener}; -use futures::{sync::oneshot, Async, Future, IntoFuture, Stream}; -use tokio_process::CommandExt; +use futures::{sync::oneshot, Async, Future, Stream}; pub type Result<T> = std::result::Result<T, Error>; /// Errors that can happen in the Linux routing integration #[derive(err_derive::Error, Debug)] +#[error(no_from)] pub enum Error { /// Failed to add route. #[error(display = "Failed to add route")] - FailedToAddRoute, + FailedToAddRoute(#[error(source)] io::Error), /// Failed to remove route. #[error(display = "Failed to remove route")] - FailedToRemoveRoute, + FailedToRemoveRoute(#[error(source)] io::Error), /// Error while running "ip route". #[error(display = "Error while running \"ip route\"")] @@ -63,10 +63,6 @@ pub struct RouteManagerImpl { best_default_node_v4: Option<Node>, best_default_node_v6: Option<Node>, - // pending changes - needed_changes: VecDeque<RouteChange>, - pending_change: Option<PendingChange>, - // if the stop channel is set, the future should wind down - remove added routes and send a // signal. shutdown_finished_tx: Option<oneshot::Sender<()>>, @@ -84,7 +80,6 @@ impl RouteManagerImpl { let mut required_normal_routes = HashSet::new(); let mut required_default_routes = HashSet::new(); - let mut added_routes = HashSet::new(); for (destination, node) in required_routes { match node { @@ -97,69 +92,41 @@ impl RouteManagerImpl { } } - let default_routes = Self::get_default_routes().wait()?; + let default_routes = Self::get_default_routes()?; let best_default_node_v4 = Self::pick_best_default_node(&default_routes, true); let best_default_node_v6 = Self::pick_best_default_node(&default_routes, false); - let mut establish_baseline_fn = || -> Result<()> { - for normal_route in required_normal_routes.iter() { - Self::add_route(&normal_route).wait()?; - added_routes.insert(normal_route.clone()); - } - - for prefix in required_default_routes.iter() { - match ( - prefix.is_ipv4(), - &best_default_node_v4, - &best_default_node_v6, - ) { - (false, _, Some(default_node)) | (true, Some(default_node), _) => { - // best to pick a single node identifier rather than device + ip - let route = Route::new(default_node.clone(), *prefix); - Self::add_route(&route).wait()?; - added_routes.insert(route); - } - // at this point in time, there exists no default route for the given IP version - // so no routes will be added. The assumption is that routing ipv4 through ipv6 - // nodes may or may not be bonkers - _ => continue, - } - } - Ok(()) - }; - - if let Err(e) = establish_baseline_fn() { - for setup_route in added_routes { - if let Err(removal_err) = Self::delete_route(&setup_route).wait() { - log::error!( - "Failed to remove route whilst cleaning up failed initialization -of route monitor -{}", - removal_err - ); - } - } - return Err(e); - } - - - Ok(Self { + let mut monitor = Self { changes, required_default_routes, - added_routes, + added_routes: HashSet::new(), default_routes, best_default_node_v4, best_default_node_v6, - needed_changes: VecDeque::new(), - pending_change: None, - shutdown_finished_tx: None, shutdown_rx, should_shut_down: false, - }) + }; + for normal_route in required_normal_routes.iter() { + monitor.add_route(&normal_route)?; + } + + for prefix in monitor.required_default_routes.clone().into_iter() { + if let (false, _, Some(default_node)) | (true, Some(default_node), _) = ( + prefix.is_ipv4(), + &monitor.best_default_node_v4, + &monitor.best_default_node_v6, + ) { + // best to pick a single node identifier rather than device + ip + let route = Route::new(default_node.clone(), prefix); + monitor.add_route(&route)?; + } + } + Ok(monitor) } fn process_route_table_change(&mut self) -> Result<()> { @@ -175,33 +142,19 @@ of route monitor -{}", } fn process_new_route(&mut self, route: Route) { - self.needed_changes.retain(|change| { - if let RouteChange::Add(old_route) = change { - old_route != &route - } else { - true - } - }); if route.prefix.prefix() == 0 { self.default_routes.insert(route); - self.update_default_rotues(); + self.update_default_routes(); } } fn process_deleted_route(&mut self, route: Route) { - self.needed_changes.retain(|change| { - if let RouteChange::Remove(old_route) = change { - old_route != &route - } else { - true - } - }); if route.prefix.prefix() == 0 { - self.update_default_rotues(); + self.update_default_routes(); } } - fn update_default_rotues(&mut self) { + fn update_default_routes(&mut self) { let new_best_v4 = Self::pick_best_default_node(&self.default_routes, true); if self.best_default_node_v4 != new_best_v4 && new_best_v4.is_some() { let new_node = new_best_v4.unwrap(); @@ -213,17 +166,16 @@ of route monitor -{}", .cloned() .collect(); for destination in v4_destinations { + let new_route = Route::new(new_node.clone(), destination); if let Some(old_node) = &old_node { - self.enque_route_change(RouteChange::Remove(Route::new( - old_node.clone(), - destination, - ))); + let old_route = Route::new(old_node.clone(), destination); + if let Err(e) = self.delete_route(&old_route) { + log::error!("Failed to remove old route {} - {}", &old_route, e); + } + } + if let Err(e) = self.add_route(&new_route) { + log::error!("Failed to add new route {} - {}", &new_node, e); } - - self.enque_route_change(RouteChange::Add(Route::new( - new_node.clone(), - destination, - ))); } self.best_default_node_v4 = Some(new_node); } @@ -240,38 +192,22 @@ of route monitor -{}", .collect(); for destination in v6_destinations { + let new_route = Route::new(new_node.clone(), destination); if let Some(old_node) = &old_node { - self.enque_route_change(RouteChange::Remove(Route::new( - old_node.clone(), - destination, - ))); + let old_route = Route::new(old_node.clone(), destination); + + if let Err(e) = self.delete_route(&old_route) { + log::error!("Failed to remove old route {} - {}", &old_route, e); + } + } + if let Err(e) = self.add_route(&new_route) { + log::error!("Failed to add new route {} - {}", &new_node, e); } - self.enque_route_change(RouteChange::Add(Route::new( - new_node.clone(), - destination, - ))); } self.best_default_node_v6 = Some(new_node); } } - fn enque_route_change(&mut self, route_change: RouteChange) { - // Only add a route change to the queue of changes if a change like this doesn't exist - // already. - if self - .pending_change - .as_ref() - .map(|pending_change| pending_change.change != route_change) - .unwrap_or(true) - && self - .needed_changes - .iter() - .all(|enqued_change| enqued_change != &route_change) - { - self.needed_changes.push_back(route_change); - } - } - fn pick_best_default_node(routes: &HashSet<Route>, v4: bool) -> Option<Node> { // Pick the route with the lowest metric - thus the most favourable route. routes @@ -293,46 +229,6 @@ of route monitor -{}", .map(|route| route.node) } - // Try and apply changes to the routing table if any are necessary. - // Returns true if no more changes are to be made. - fn apply_route_table_changes(&mut self) -> Result<bool> { - let mut should_stop = false; - while !should_stop { - if self.pending_change.is_none() { - if let Some(change) = self.needed_changes.pop_front() { - let process = match &change { - RouteChange::Add(route) => Self::add_route(route), - RouteChange::Remove(route) => Self::delete_route(route), - }; - self.pending_change = Some(PendingChange { change, process }); - } - } - - if let Some(mut change) = self.pending_change.take() { - match change.process.poll()? { - Async::NotReady => { - self.pending_change = Some(change); - should_stop = true; - } - Async::Ready(_) => { - match change.change { - RouteChange::Add(route) => { - self.added_routes.insert(route); - } - RouteChange::Remove(route) => { - self.added_routes.remove(&route); - } - }; - } - }; - } else { - should_stop = true; - } - } - - Ok(self.pending_change.is_none() && self.needed_changes.is_empty()) - } - fn route_cmd(action: &str, route: &Route) -> Command { let mut cmd = Command::new("ip"); @@ -354,74 +250,65 @@ of route monitor -{}", cmd } - fn run_cmd(mut cmd: Command, err: Error) -> Box<dyn Future<Item = (), Error = Error> + Send> { + fn run_cmd(mut cmd: Command, err: impl Fn(io::Error) -> Error) -> Result<()> { log::trace!("running cmd - {:?}", &cmd); - Box::new( - cmd.spawn_async() - .into_future() - .flatten() - .map_err(Error::FailedToRunIp) - .and_then(|exit_status| { - if exit_status.success() { - Ok(()) - } else { - Err(err) - } - }), - ) + cmd.output().map_err(err).map(|_| ()) } - fn get_default_routes_inner( - ip_version: IpVersion, - ) -> impl Future<Item = Vec<Route>, Error = Error> { + fn get_default_routes_inner(ip_version: IpVersion) -> Result<Vec<Route>> { let mut cmd = Command::new("ip"); cmd.arg(ip_version.to_route_arg()).arg("route").arg("show"); - Box::new( - cmd.stdout(Stdio::piped()) - .spawn_async() - .map_err(Error::FailedToRunIp) - .into_future() - .and_then(|proc| proc.wait_with_output().map_err(Error::FailedToRunIp)) - .and_then(move |output| { - let output_lines = String::from_utf8(output.stdout.clone()) - .map_err(|_| Error::UnexpectedOutput)?; - Ok(output_lines - .lines() - .filter_map(|line| { - if line.starts_with("default") { - parse_ip_route_show_line(line, ip_version) - } else { - None - } - }) - .collect()) - }), - ) + cmd.stdout(Stdio::piped()) + .output() + .map_err(Error::FailedToRunIp) + .and_then(move |output| { + let output_lines = String::from_utf8(output.stdout.clone()) + .map_err(|_| Error::UnexpectedOutput)?; + Ok(output_lines + .lines() + .filter_map(|line| { + if line.starts_with("default") { + parse_ip_route_show_line(line, ip_version) + } else { + None + } + }) + .collect()) + }) } /// Adds routes to the system routing table. - fn add_route(route: &Route) -> Box<dyn Future<Item = (), Error = Error> + Send> { + fn add_route(&mut self, route: &Route) -> Result<()> { let cmd = Self::route_cmd("replace", route); - Self::run_cmd(cmd, Error::FailedToAddRoute) + Self::run_cmd(cmd, Error::FailedToAddRoute)?; + self.added_routes.insert(route.clone()); + Ok(()) } /// Removes previously set routes. If routes were set for specific tables, the whole tables /// will be removed. - fn delete_route(route: &Route) -> Box<dyn Future<Item = (), Error = Error> + Send> { + fn delete_route(&mut self, route: &Route) -> Result<()> { let cmd = Self::route_cmd("delete", route); - Self::run_cmd(cmd, Error::FailedToRemoveRoute) + Self::run_cmd(cmd, Error::FailedToRemoveRoute)?; + self.added_routes.remove(route); + Ok(()) + } + + fn cleanup_routes(&mut self) { + for route in self.added_routes.drain().collect::<Vec<_>>().iter() { + if let Err(e) = self.delete_route(&route) { + log::error!("Failed to remove route - {} - {}", route, e); + } + } } + /// Retrieves the gateway for the default route - fn get_default_routes() -> Box<dyn Future<Item = HashSet<Route>, Error = Error> + Send> { - Box::new( - Self::get_default_routes_inner(IpVersion::V4) - .join(Self::get_default_routes_inner(IpVersion::V6)) - .map(|(v4_routes, v6_routes)| { - v4_routes.into_iter().chain(v6_routes.into_iter()).collect() - }), - ) + fn get_default_routes() -> Result<HashSet<Route>> { + let v4_routes = Self::get_default_routes_inner(IpVersion::V4)?; + let v6_routes = Self::get_default_routes_inner(IpVersion::V6)?; + Ok(v4_routes.into_iter().chain(v6_routes.into_iter()).collect()) } } @@ -457,8 +344,8 @@ impl Future for RouteManagerImpl { }; self.process_route_table_change()?; } - let all_changes_applied = self.apply_route_table_changes()?; - if all_changes_applied && self.should_shut_down { + if self.should_shut_down { + self.cleanup_routes(); if let Some(tx) = self.shutdown_finished_tx.take() { if tx.send(()).is_err() { log::error!("RouteManagerHandle already stopped"); @@ -471,6 +358,12 @@ impl Future for RouteManagerImpl { } } +impl Drop for RouteManagerImpl { + fn drop(&mut self) { + self.cleanup_routes(); + } +} + // intended to parse lines sucha as the following: // default via 192.168.1.1 dev wlp61s0 proto dhcp metric 600 fn parse_ip_route_show_line(line: &str, ip_version: IpVersion) -> Option<Route> { @@ -534,8 +427,3 @@ enum RouteChange { Add(Route), Remove(Route), } - -struct PendingChange { - change: RouteChange, - process: Box<dyn Future<Item = (), Error = Error> + Send>, -} |
