diff options
| author | Emīls <emils@mullvad.net> | 2019-11-27 12:12:11 +0000 |
|---|---|---|
| committer | Emīls <emils@mullvad.net> | 2019-11-27 12:12:11 +0000 |
| commit | 9e326e4b058d2937391e086d7c87d4efc930365b (patch) | |
| tree | 8f22494d939aac49b7cdd7542328eadef3cb5807 | |
| parent | b2486cd53338f44dd721166c5db02faef4b7b396 (diff) | |
| parent | 1aa94c35e25a3892b991379982244785f1a9e0f7 (diff) | |
| download | mullvadvpn-9e326e4b058d2937391e086d7c87d4efc930365b.tar.xz mullvadvpn-9e326e4b058d2937391e086d7c87d4efc930365b.zip | |
Merge branch 'linux-routing-use-sync-subprocess'
| -rw-r--r-- | CHANGELOG.md | 1 | ||||
| -rw-r--r-- | talpid-core/src/routing/linux/mod.rs | 300 | ||||
| -rw-r--r-- | talpid-core/src/routing/mod.rs | 25 |
3 files changed, 119 insertions, 207 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md index f8d7728f7a..a6dcdd8f19 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -49,6 +49,7 @@ Line wrap the file at 100 chars. Th - Improve stability on Linux by using the routing netlink socket in its own thread. - When trying to use `resolvconf` for managing DNS, the daemon will check if `dnsmasq` is running and misconfigured. +- Improve stability on Linux by simplifying route management code. #### Windows - Detect removal of the OpenVPN TAP adapter on reconnection attempts. diff --git a/talpid-core/src/routing/linux/mod.rs b/talpid-core/src/routing/linux/mod.rs index 98fc929db7..00020eccbc 100644 --- a/talpid-core/src/routing/linux/mod.rs +++ b/talpid-core/src/routing/linux/mod.rs @@ -2,7 +2,7 @@ use super::{NetNode, Node, Route}; use ipnetwork::IpNetwork; use std::{ - collections::{HashMap, HashSet, VecDeque}, + collections::{HashMap, HashSet}, io, process::{Command, Stdio}, }; @@ -10,21 +10,21 @@ use std::{ mod change_listener; use change_listener::{Error as RouteChangeListenerError, RouteChangeListener}; -use futures::{sync::oneshot, Async, Future, IntoFuture, Stream}; -use tokio_process::CommandExt; +use futures::{sync::oneshot, Async, Future, Stream}; pub type Result<T> = std::result::Result<T, Error>; /// Errors that can happen in the Linux routing integration #[derive(err_derive::Error, Debug)] +#[error(no_from)] pub enum Error { /// Failed to add route. #[error(display = "Failed to add route")] - FailedToAddRoute, + FailedToAddRoute(#[error(source)] io::Error), /// Failed to remove route. #[error(display = "Failed to remove route")] - FailedToRemoveRoute, + FailedToRemoveRoute(#[error(source)] io::Error), /// Error while running "ip route". #[error(display = "Error while running \"ip route\"")] @@ -63,10 +63,6 @@ pub struct RouteManagerImpl { best_default_node_v4: Option<Node>, best_default_node_v6: Option<Node>, - // pending changes - needed_changes: VecDeque<RouteChange>, - pending_change: Option<PendingChange>, - // if the stop channel is set, the future should wind down - remove added routes and send a // signal. shutdown_finished_tx: Option<oneshot::Sender<()>>, @@ -84,7 +80,6 @@ impl RouteManagerImpl { let mut required_normal_routes = HashSet::new(); let mut required_default_routes = HashSet::new(); - let mut added_routes = HashSet::new(); for (destination, node) in required_routes { match node { @@ -97,69 +92,41 @@ impl RouteManagerImpl { } } - let default_routes = Self::get_default_routes().wait()?; + let default_routes = Self::get_default_routes()?; let best_default_node_v4 = Self::pick_best_default_node(&default_routes, true); let best_default_node_v6 = Self::pick_best_default_node(&default_routes, false); - let mut establish_baseline_fn = || -> Result<()> { - for normal_route in required_normal_routes.iter() { - Self::add_route(&normal_route).wait()?; - added_routes.insert(normal_route.clone()); - } - - for prefix in required_default_routes.iter() { - match ( - prefix.is_ipv4(), - &best_default_node_v4, - &best_default_node_v6, - ) { - (false, _, Some(default_node)) | (true, Some(default_node), _) => { - // best to pick a single node identifier rather than device + ip - let route = Route::new(default_node.clone(), *prefix); - Self::add_route(&route).wait()?; - added_routes.insert(route); - } - // at this point in time, there exists no default route for the given IP version - // so no routes will be added. The assumption is that routing ipv4 through ipv6 - // nodes may or may not be bonkers - _ => continue, - } - } - Ok(()) - }; - - if let Err(e) = establish_baseline_fn() { - for setup_route in added_routes { - if let Err(removal_err) = Self::delete_route(&setup_route).wait() { - log::error!( - "Failed to remove route whilst cleaning up failed initialization -of route monitor -{}", - removal_err - ); - } - } - return Err(e); - } - - - Ok(Self { + let mut monitor = Self { changes, required_default_routes, - added_routes, + added_routes: HashSet::new(), default_routes, best_default_node_v4, best_default_node_v6, - needed_changes: VecDeque::new(), - pending_change: None, - shutdown_finished_tx: None, shutdown_rx, should_shut_down: false, - }) + }; + for normal_route in required_normal_routes.iter() { + monitor.add_route(&normal_route)?; + } + + for prefix in monitor.required_default_routes.clone().into_iter() { + if let (false, _, Some(default_node)) | (true, Some(default_node), _) = ( + prefix.is_ipv4(), + &monitor.best_default_node_v4, + &monitor.best_default_node_v6, + ) { + // best to pick a single node identifier rather than device + ip + let route = Route::new(default_node.clone(), prefix); + monitor.add_route(&route)?; + } + } + Ok(monitor) } fn process_route_table_change(&mut self) -> Result<()> { @@ -175,33 +142,19 @@ of route monitor -{}", } fn process_new_route(&mut self, route: Route) { - self.needed_changes.retain(|change| { - if let RouteChange::Add(old_route) = change { - old_route != &route - } else { - true - } - }); if route.prefix.prefix() == 0 { self.default_routes.insert(route); - self.update_default_rotues(); + self.update_default_routes(); } } fn process_deleted_route(&mut self, route: Route) { - self.needed_changes.retain(|change| { - if let RouteChange::Remove(old_route) = change { - old_route != &route - } else { - true - } - }); if route.prefix.prefix() == 0 { - self.update_default_rotues(); + self.update_default_routes(); } } - fn update_default_rotues(&mut self) { + fn update_default_routes(&mut self) { let new_best_v4 = Self::pick_best_default_node(&self.default_routes, true); if self.best_default_node_v4 != new_best_v4 && new_best_v4.is_some() { let new_node = new_best_v4.unwrap(); @@ -213,17 +166,16 @@ of route monitor -{}", .cloned() .collect(); for destination in v4_destinations { + let new_route = Route::new(new_node.clone(), destination); if let Some(old_node) = &old_node { - self.enque_route_change(RouteChange::Remove(Route::new( - old_node.clone(), - destination, - ))); + let old_route = Route::new(old_node.clone(), destination); + if let Err(e) = self.delete_route(&old_route) { + log::error!("Failed to remove old route {} - {}", &old_route, e); + } + } + if let Err(e) = self.add_route(&new_route) { + log::error!("Failed to add new route {} - {}", &new_node, e); } - - self.enque_route_change(RouteChange::Add(Route::new( - new_node.clone(), - destination, - ))); } self.best_default_node_v4 = Some(new_node); } @@ -240,38 +192,22 @@ of route monitor -{}", .collect(); for destination in v6_destinations { + let new_route = Route::new(new_node.clone(), destination); if let Some(old_node) = &old_node { - self.enque_route_change(RouteChange::Remove(Route::new( - old_node.clone(), - destination, - ))); + let old_route = Route::new(old_node.clone(), destination); + + if let Err(e) = self.delete_route(&old_route) { + log::error!("Failed to remove old route {} - {}", &old_route, e); + } + } + if let Err(e) = self.add_route(&new_route) { + log::error!("Failed to add new route {} - {}", &new_node, e); } - self.enque_route_change(RouteChange::Add(Route::new( - new_node.clone(), - destination, - ))); } self.best_default_node_v6 = Some(new_node); } } - fn enque_route_change(&mut self, route_change: RouteChange) { - // Only add a route change to the queue of changes if a change like this doesn't exist - // already. - if self - .pending_change - .as_ref() - .map(|pending_change| pending_change.change != route_change) - .unwrap_or(true) - && self - .needed_changes - .iter() - .all(|enqued_change| enqued_change != &route_change) - { - self.needed_changes.push_back(route_change); - } - } - fn pick_best_default_node(routes: &HashSet<Route>, v4: bool) -> Option<Node> { // Pick the route with the lowest metric - thus the most favourable route. routes @@ -293,46 +229,6 @@ of route monitor -{}", .map(|route| route.node) } - // Try and apply changes to the routing table if any are necessary. - // Returns true if no more changes are to be made. - fn apply_route_table_changes(&mut self) -> Result<bool> { - let mut should_stop = false; - while !should_stop { - if self.pending_change.is_none() { - if let Some(change) = self.needed_changes.pop_front() { - let process = match &change { - RouteChange::Add(route) => Self::add_route(route), - RouteChange::Remove(route) => Self::delete_route(route), - }; - self.pending_change = Some(PendingChange { change, process }); - } - } - - if let Some(mut change) = self.pending_change.take() { - match change.process.poll()? { - Async::NotReady => { - self.pending_change = Some(change); - should_stop = true; - } - Async::Ready(_) => { - match change.change { - RouteChange::Add(route) => { - self.added_routes.insert(route); - } - RouteChange::Remove(route) => { - self.added_routes.remove(&route); - } - }; - } - }; - } else { - should_stop = true; - } - } - - Ok(self.pending_change.is_none() && self.needed_changes.is_empty()) - } - fn route_cmd(action: &str, route: &Route) -> Command { let mut cmd = Command::new("ip"); @@ -354,74 +250,65 @@ of route monitor -{}", cmd } - fn run_cmd(mut cmd: Command, err: Error) -> Box<dyn Future<Item = (), Error = Error> + Send> { + fn run_cmd(mut cmd: Command, err: impl Fn(io::Error) -> Error) -> Result<()> { log::trace!("running cmd - {:?}", &cmd); - Box::new( - cmd.spawn_async() - .into_future() - .flatten() - .map_err(Error::FailedToRunIp) - .and_then(|exit_status| { - if exit_status.success() { - Ok(()) - } else { - Err(err) - } - }), - ) + cmd.output().map_err(err).map(|_| ()) } - fn get_default_routes_inner( - ip_version: IpVersion, - ) -> impl Future<Item = Vec<Route>, Error = Error> { + fn get_default_routes_inner(ip_version: IpVersion) -> Result<Vec<Route>> { let mut cmd = Command::new("ip"); cmd.arg(ip_version.to_route_arg()).arg("route").arg("show"); - Box::new( - cmd.stdout(Stdio::piped()) - .spawn_async() - .map_err(Error::FailedToRunIp) - .into_future() - .and_then(|proc| proc.wait_with_output().map_err(Error::FailedToRunIp)) - .and_then(move |output| { - let output_lines = String::from_utf8(output.stdout.clone()) - .map_err(|_| Error::UnexpectedOutput)?; - Ok(output_lines - .lines() - .filter_map(|line| { - if line.starts_with("default") { - parse_ip_route_show_line(line, ip_version) - } else { - None - } - }) - .collect()) - }), - ) + cmd.stdout(Stdio::piped()) + .output() + .map_err(Error::FailedToRunIp) + .and_then(move |output| { + let output_lines = String::from_utf8(output.stdout.clone()) + .map_err(|_| Error::UnexpectedOutput)?; + Ok(output_lines + .lines() + .filter_map(|line| { + if line.starts_with("default") { + parse_ip_route_show_line(line, ip_version) + } else { + None + } + }) + .collect()) + }) } /// Adds routes to the system routing table. - fn add_route(route: &Route) -> Box<dyn Future<Item = (), Error = Error> + Send> { + fn add_route(&mut self, route: &Route) -> Result<()> { let cmd = Self::route_cmd("replace", route); - Self::run_cmd(cmd, Error::FailedToAddRoute) + Self::run_cmd(cmd, Error::FailedToAddRoute)?; + self.added_routes.insert(route.clone()); + Ok(()) } /// Removes previously set routes. If routes were set for specific tables, the whole tables /// will be removed. - fn delete_route(route: &Route) -> Box<dyn Future<Item = (), Error = Error> + Send> { + fn delete_route(&mut self, route: &Route) -> Result<()> { let cmd = Self::route_cmd("delete", route); - Self::run_cmd(cmd, Error::FailedToRemoveRoute) + Self::run_cmd(cmd, Error::FailedToRemoveRoute)?; + self.added_routes.remove(route); + Ok(()) + } + + fn cleanup_routes(&mut self) { + for route in self.added_routes.drain().collect::<Vec<_>>().iter() { + if let Err(e) = self.delete_route(&route) { + log::error!("Failed to remove route - {} - {}", route, e); + } + } } + /// Retrieves the gateway for the default route - fn get_default_routes() -> Box<dyn Future<Item = HashSet<Route>, Error = Error> + Send> { - Box::new( - Self::get_default_routes_inner(IpVersion::V4) - .join(Self::get_default_routes_inner(IpVersion::V6)) - .map(|(v4_routes, v6_routes)| { - v4_routes.into_iter().chain(v6_routes.into_iter()).collect() - }), - ) + fn get_default_routes() -> Result<HashSet<Route>> { + let v4_routes = Self::get_default_routes_inner(IpVersion::V4)?; + let v6_routes = Self::get_default_routes_inner(IpVersion::V6)?; + Ok(v4_routes.into_iter().chain(v6_routes.into_iter()).collect()) } } @@ -457,8 +344,8 @@ impl Future for RouteManagerImpl { }; self.process_route_table_change()?; } - let all_changes_applied = self.apply_route_table_changes()?; - if all_changes_applied && self.should_shut_down { + if self.should_shut_down { + self.cleanup_routes(); if let Some(tx) = self.shutdown_finished_tx.take() { if tx.send(()).is_err() { log::error!("RouteManagerHandle already stopped"); @@ -471,6 +358,12 @@ impl Future for RouteManagerImpl { } } +impl Drop for RouteManagerImpl { + fn drop(&mut self) { + self.cleanup_routes(); + } +} + // intended to parse lines sucha as the following: // default via 192.168.1.1 dev wlp61s0 proto dhcp metric 600 fn parse_ip_route_show_line(line: &str, ip_version: IpVersion) -> Option<Route> { @@ -534,8 +427,3 @@ enum RouteChange { Add(Route), Remove(Route), } - -struct PendingChange { - change: RouteChange, - process: Box<dyn Future<Item = (), Error = Error> + Send>, -} diff --git a/talpid-core/src/routing/mod.rs b/talpid-core/src/routing/mod.rs index a01ef01723..517e9e7d44 100644 --- a/talpid-core/src/routing/mod.rs +++ b/talpid-core/src/routing/mod.rs @@ -3,7 +3,7 @@ // TODO: remove the allow(dead_code) for android once it's up to scratch. use futures::{sync::oneshot, Future}; use ipnetwork::IpNetwork; -use std::{collections::HashMap, net::IpAddr}; +use std::{collections::HashMap, fmt, net::IpAddr}; #[cfg(target_os = "macos")] #[path = "macos.rs"] @@ -144,6 +144,16 @@ impl Route { } } +impl fmt::Display for Route { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{} via {}", self.prefix, self.node)?; + if let Some(metric) = &self.metric { + write!(f, " metric {}", *metric)?; + } + Ok(()) + } +} + /// A network route that should be applied by the RouteManager. /// It can either be routed through a specific network node or it can be routed through the current /// default route. @@ -224,3 +234,16 @@ impl Node { self.device.as_ref().map(|s| s.as_ref()) } } + +impl fmt::Display for Node { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + if let Some(ip) = &self.ip { + write!(f, "{}", ip)?; + } + if let Some(device) = &self.device { + let extra_space = if self.ip.is_some() { " " } else { "" }; + write!(f, "{}dev {}", extra_space, device)?; + } + Ok(()) + } +} |
