summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorEmīls <emils@mullvad.net>2019-11-27 12:12:11 +0000
committerEmīls <emils@mullvad.net>2019-11-27 12:12:11 +0000
commit9e326e4b058d2937391e086d7c87d4efc930365b (patch)
tree8f22494d939aac49b7cdd7542328eadef3cb5807
parentb2486cd53338f44dd721166c5db02faef4b7b396 (diff)
parent1aa94c35e25a3892b991379982244785f1a9e0f7 (diff)
downloadmullvadvpn-9e326e4b058d2937391e086d7c87d4efc930365b.tar.xz
mullvadvpn-9e326e4b058d2937391e086d7c87d4efc930365b.zip
Merge branch 'linux-routing-use-sync-subprocess'
-rw-r--r--CHANGELOG.md1
-rw-r--r--talpid-core/src/routing/linux/mod.rs300
-rw-r--r--talpid-core/src/routing/mod.rs25
3 files changed, 119 insertions, 207 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md
index f8d7728f7a..a6dcdd8f19 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -49,6 +49,7 @@ Line wrap the file at 100 chars. Th
- Improve stability on Linux by using the routing netlink socket in its own thread.
- When trying to use `resolvconf` for managing DNS, the daemon will check if
`dnsmasq` is running and misconfigured.
+- Improve stability on Linux by simplifying route management code.
#### Windows
- Detect removal of the OpenVPN TAP adapter on reconnection attempts.
diff --git a/talpid-core/src/routing/linux/mod.rs b/talpid-core/src/routing/linux/mod.rs
index 98fc929db7..00020eccbc 100644
--- a/talpid-core/src/routing/linux/mod.rs
+++ b/talpid-core/src/routing/linux/mod.rs
@@ -2,7 +2,7 @@ use super::{NetNode, Node, Route};
use ipnetwork::IpNetwork;
use std::{
- collections::{HashMap, HashSet, VecDeque},
+ collections::{HashMap, HashSet},
io,
process::{Command, Stdio},
};
@@ -10,21 +10,21 @@ use std::{
mod change_listener;
use change_listener::{Error as RouteChangeListenerError, RouteChangeListener};
-use futures::{sync::oneshot, Async, Future, IntoFuture, Stream};
-use tokio_process::CommandExt;
+use futures::{sync::oneshot, Async, Future, Stream};
pub type Result<T> = std::result::Result<T, Error>;
/// Errors that can happen in the Linux routing integration
#[derive(err_derive::Error, Debug)]
+#[error(no_from)]
pub enum Error {
/// Failed to add route.
#[error(display = "Failed to add route")]
- FailedToAddRoute,
+ FailedToAddRoute(#[error(source)] io::Error),
/// Failed to remove route.
#[error(display = "Failed to remove route")]
- FailedToRemoveRoute,
+ FailedToRemoveRoute(#[error(source)] io::Error),
/// Error while running "ip route".
#[error(display = "Error while running \"ip route\"")]
@@ -63,10 +63,6 @@ pub struct RouteManagerImpl {
best_default_node_v4: Option<Node>,
best_default_node_v6: Option<Node>,
- // pending changes
- needed_changes: VecDeque<RouteChange>,
- pending_change: Option<PendingChange>,
-
// if the stop channel is set, the future should wind down - remove added routes and send a
// signal.
shutdown_finished_tx: Option<oneshot::Sender<()>>,
@@ -84,7 +80,6 @@ impl RouteManagerImpl {
let mut required_normal_routes = HashSet::new();
let mut required_default_routes = HashSet::new();
- let mut added_routes = HashSet::new();
for (destination, node) in required_routes {
match node {
@@ -97,69 +92,41 @@ impl RouteManagerImpl {
}
}
- let default_routes = Self::get_default_routes().wait()?;
+ let default_routes = Self::get_default_routes()?;
let best_default_node_v4 = Self::pick_best_default_node(&default_routes, true);
let best_default_node_v6 = Self::pick_best_default_node(&default_routes, false);
- let mut establish_baseline_fn = || -> Result<()> {
- for normal_route in required_normal_routes.iter() {
- Self::add_route(&normal_route).wait()?;
- added_routes.insert(normal_route.clone());
- }
-
- for prefix in required_default_routes.iter() {
- match (
- prefix.is_ipv4(),
- &best_default_node_v4,
- &best_default_node_v6,
- ) {
- (false, _, Some(default_node)) | (true, Some(default_node), _) => {
- // best to pick a single node identifier rather than device + ip
- let route = Route::new(default_node.clone(), *prefix);
- Self::add_route(&route).wait()?;
- added_routes.insert(route);
- }
- // at this point in time, there exists no default route for the given IP version
- // so no routes will be added. The assumption is that routing ipv4 through ipv6
- // nodes may or may not be bonkers
- _ => continue,
- }
- }
- Ok(())
- };
-
- if let Err(e) = establish_baseline_fn() {
- for setup_route in added_routes {
- if let Err(removal_err) = Self::delete_route(&setup_route).wait() {
- log::error!(
- "Failed to remove route whilst cleaning up failed initialization
-of route monitor -{}",
- removal_err
- );
- }
- }
- return Err(e);
- }
-
-
- Ok(Self {
+ let mut monitor = Self {
changes,
required_default_routes,
- added_routes,
+ added_routes: HashSet::new(),
default_routes,
best_default_node_v4,
best_default_node_v6,
- needed_changes: VecDeque::new(),
- pending_change: None,
-
shutdown_finished_tx: None,
shutdown_rx,
should_shut_down: false,
- })
+ };
+ for normal_route in required_normal_routes.iter() {
+ monitor.add_route(&normal_route)?;
+ }
+
+ for prefix in monitor.required_default_routes.clone().into_iter() {
+ if let (false, _, Some(default_node)) | (true, Some(default_node), _) = (
+ prefix.is_ipv4(),
+ &monitor.best_default_node_v4,
+ &monitor.best_default_node_v6,
+ ) {
+ // best to pick a single node identifier rather than device + ip
+ let route = Route::new(default_node.clone(), prefix);
+ monitor.add_route(&route)?;
+ }
+ }
+ Ok(monitor)
}
fn process_route_table_change(&mut self) -> Result<()> {
@@ -175,33 +142,19 @@ of route monitor -{}",
}
fn process_new_route(&mut self, route: Route) {
- self.needed_changes.retain(|change| {
- if let RouteChange::Add(old_route) = change {
- old_route != &route
- } else {
- true
- }
- });
if route.prefix.prefix() == 0 {
self.default_routes.insert(route);
- self.update_default_rotues();
+ self.update_default_routes();
}
}
fn process_deleted_route(&mut self, route: Route) {
- self.needed_changes.retain(|change| {
- if let RouteChange::Remove(old_route) = change {
- old_route != &route
- } else {
- true
- }
- });
if route.prefix.prefix() == 0 {
- self.update_default_rotues();
+ self.update_default_routes();
}
}
- fn update_default_rotues(&mut self) {
+ fn update_default_routes(&mut self) {
let new_best_v4 = Self::pick_best_default_node(&self.default_routes, true);
if self.best_default_node_v4 != new_best_v4 && new_best_v4.is_some() {
let new_node = new_best_v4.unwrap();
@@ -213,17 +166,16 @@ of route monitor -{}",
.cloned()
.collect();
for destination in v4_destinations {
+ let new_route = Route::new(new_node.clone(), destination);
if let Some(old_node) = &old_node {
- self.enque_route_change(RouteChange::Remove(Route::new(
- old_node.clone(),
- destination,
- )));
+ let old_route = Route::new(old_node.clone(), destination);
+ if let Err(e) = self.delete_route(&old_route) {
+ log::error!("Failed to remove old route {} - {}", &old_route, e);
+ }
+ }
+ if let Err(e) = self.add_route(&new_route) {
+ log::error!("Failed to add new route {} - {}", &new_node, e);
}
-
- self.enque_route_change(RouteChange::Add(Route::new(
- new_node.clone(),
- destination,
- )));
}
self.best_default_node_v4 = Some(new_node);
}
@@ -240,38 +192,22 @@ of route monitor -{}",
.collect();
for destination in v6_destinations {
+ let new_route = Route::new(new_node.clone(), destination);
if let Some(old_node) = &old_node {
- self.enque_route_change(RouteChange::Remove(Route::new(
- old_node.clone(),
- destination,
- )));
+ let old_route = Route::new(old_node.clone(), destination);
+
+ if let Err(e) = self.delete_route(&old_route) {
+ log::error!("Failed to remove old route {} - {}", &old_route, e);
+ }
+ }
+ if let Err(e) = self.add_route(&new_route) {
+ log::error!("Failed to add new route {} - {}", &new_node, e);
}
- self.enque_route_change(RouteChange::Add(Route::new(
- new_node.clone(),
- destination,
- )));
}
self.best_default_node_v6 = Some(new_node);
}
}
- fn enque_route_change(&mut self, route_change: RouteChange) {
- // Only add a route change to the queue of changes if a change like this doesn't exist
- // already.
- if self
- .pending_change
- .as_ref()
- .map(|pending_change| pending_change.change != route_change)
- .unwrap_or(true)
- && self
- .needed_changes
- .iter()
- .all(|enqued_change| enqued_change != &route_change)
- {
- self.needed_changes.push_back(route_change);
- }
- }
-
fn pick_best_default_node(routes: &HashSet<Route>, v4: bool) -> Option<Node> {
// Pick the route with the lowest metric - thus the most favourable route.
routes
@@ -293,46 +229,6 @@ of route monitor -{}",
.map(|route| route.node)
}
- // Try and apply changes to the routing table if any are necessary.
- // Returns true if no more changes are to be made.
- fn apply_route_table_changes(&mut self) -> Result<bool> {
- let mut should_stop = false;
- while !should_stop {
- if self.pending_change.is_none() {
- if let Some(change) = self.needed_changes.pop_front() {
- let process = match &change {
- RouteChange::Add(route) => Self::add_route(route),
- RouteChange::Remove(route) => Self::delete_route(route),
- };
- self.pending_change = Some(PendingChange { change, process });
- }
- }
-
- if let Some(mut change) = self.pending_change.take() {
- match change.process.poll()? {
- Async::NotReady => {
- self.pending_change = Some(change);
- should_stop = true;
- }
- Async::Ready(_) => {
- match change.change {
- RouteChange::Add(route) => {
- self.added_routes.insert(route);
- }
- RouteChange::Remove(route) => {
- self.added_routes.remove(&route);
- }
- };
- }
- };
- } else {
- should_stop = true;
- }
- }
-
- Ok(self.pending_change.is_none() && self.needed_changes.is_empty())
- }
-
fn route_cmd(action: &str, route: &Route) -> Command {
let mut cmd = Command::new("ip");
@@ -354,74 +250,65 @@ of route monitor -{}",
cmd
}
- fn run_cmd(mut cmd: Command, err: Error) -> Box<dyn Future<Item = (), Error = Error> + Send> {
+ fn run_cmd(mut cmd: Command, err: impl Fn(io::Error) -> Error) -> Result<()> {
log::trace!("running cmd - {:?}", &cmd);
- Box::new(
- cmd.spawn_async()
- .into_future()
- .flatten()
- .map_err(Error::FailedToRunIp)
- .and_then(|exit_status| {
- if exit_status.success() {
- Ok(())
- } else {
- Err(err)
- }
- }),
- )
+ cmd.output().map_err(err).map(|_| ())
}
- fn get_default_routes_inner(
- ip_version: IpVersion,
- ) -> impl Future<Item = Vec<Route>, Error = Error> {
+ fn get_default_routes_inner(ip_version: IpVersion) -> Result<Vec<Route>> {
let mut cmd = Command::new("ip");
cmd.arg(ip_version.to_route_arg()).arg("route").arg("show");
- Box::new(
- cmd.stdout(Stdio::piped())
- .spawn_async()
- .map_err(Error::FailedToRunIp)
- .into_future()
- .and_then(|proc| proc.wait_with_output().map_err(Error::FailedToRunIp))
- .and_then(move |output| {
- let output_lines = String::from_utf8(output.stdout.clone())
- .map_err(|_| Error::UnexpectedOutput)?;
- Ok(output_lines
- .lines()
- .filter_map(|line| {
- if line.starts_with("default") {
- parse_ip_route_show_line(line, ip_version)
- } else {
- None
- }
- })
- .collect())
- }),
- )
+ cmd.stdout(Stdio::piped())
+ .output()
+ .map_err(Error::FailedToRunIp)
+ .and_then(move |output| {
+ let output_lines = String::from_utf8(output.stdout.clone())
+ .map_err(|_| Error::UnexpectedOutput)?;
+ Ok(output_lines
+ .lines()
+ .filter_map(|line| {
+ if line.starts_with("default") {
+ parse_ip_route_show_line(line, ip_version)
+ } else {
+ None
+ }
+ })
+ .collect())
+ })
}
/// Adds routes to the system routing table.
- fn add_route(route: &Route) -> Box<dyn Future<Item = (), Error = Error> + Send> {
+ fn add_route(&mut self, route: &Route) -> Result<()> {
let cmd = Self::route_cmd("replace", route);
- Self::run_cmd(cmd, Error::FailedToAddRoute)
+ Self::run_cmd(cmd, Error::FailedToAddRoute)?;
+ self.added_routes.insert(route.clone());
+ Ok(())
}
/// Removes previously set routes. If routes were set for specific tables, the whole tables
/// will be removed.
- fn delete_route(route: &Route) -> Box<dyn Future<Item = (), Error = Error> + Send> {
+ fn delete_route(&mut self, route: &Route) -> Result<()> {
let cmd = Self::route_cmd("delete", route);
- Self::run_cmd(cmd, Error::FailedToRemoveRoute)
+ Self::run_cmd(cmd, Error::FailedToRemoveRoute)?;
+ self.added_routes.remove(route);
+ Ok(())
+ }
+
+ fn cleanup_routes(&mut self) {
+ for route in self.added_routes.drain().collect::<Vec<_>>().iter() {
+ if let Err(e) = self.delete_route(&route) {
+ log::error!("Failed to remove route - {} - {}", route, e);
+ }
+ }
}
+
/// Retrieves the gateway for the default route
- fn get_default_routes() -> Box<dyn Future<Item = HashSet<Route>, Error = Error> + Send> {
- Box::new(
- Self::get_default_routes_inner(IpVersion::V4)
- .join(Self::get_default_routes_inner(IpVersion::V6))
- .map(|(v4_routes, v6_routes)| {
- v4_routes.into_iter().chain(v6_routes.into_iter()).collect()
- }),
- )
+ fn get_default_routes() -> Result<HashSet<Route>> {
+ let v4_routes = Self::get_default_routes_inner(IpVersion::V4)?;
+ let v6_routes = Self::get_default_routes_inner(IpVersion::V6)?;
+ Ok(v4_routes.into_iter().chain(v6_routes.into_iter()).collect())
}
}
@@ -457,8 +344,8 @@ impl Future for RouteManagerImpl {
};
self.process_route_table_change()?;
}
- let all_changes_applied = self.apply_route_table_changes()?;
- if all_changes_applied && self.should_shut_down {
+ if self.should_shut_down {
+ self.cleanup_routes();
if let Some(tx) = self.shutdown_finished_tx.take() {
if tx.send(()).is_err() {
log::error!("RouteManagerHandle already stopped");
@@ -471,6 +358,12 @@ impl Future for RouteManagerImpl {
}
}
+impl Drop for RouteManagerImpl {
+ fn drop(&mut self) {
+ self.cleanup_routes();
+ }
+}
+
// intended to parse lines sucha as the following:
// default via 192.168.1.1 dev wlp61s0 proto dhcp metric 600
fn parse_ip_route_show_line(line: &str, ip_version: IpVersion) -> Option<Route> {
@@ -534,8 +427,3 @@ enum RouteChange {
Add(Route),
Remove(Route),
}
-
-struct PendingChange {
- change: RouteChange,
- process: Box<dyn Future<Item = (), Error = Error> + Send>,
-}
diff --git a/talpid-core/src/routing/mod.rs b/talpid-core/src/routing/mod.rs
index a01ef01723..517e9e7d44 100644
--- a/talpid-core/src/routing/mod.rs
+++ b/talpid-core/src/routing/mod.rs
@@ -3,7 +3,7 @@
// TODO: remove the allow(dead_code) for android once it's up to scratch.
use futures::{sync::oneshot, Future};
use ipnetwork::IpNetwork;
-use std::{collections::HashMap, net::IpAddr};
+use std::{collections::HashMap, fmt, net::IpAddr};
#[cfg(target_os = "macos")]
#[path = "macos.rs"]
@@ -144,6 +144,16 @@ impl Route {
}
}
+impl fmt::Display for Route {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ write!(f, "{} via {}", self.prefix, self.node)?;
+ if let Some(metric) = &self.metric {
+ write!(f, " metric {}", *metric)?;
+ }
+ Ok(())
+ }
+}
+
/// A network route that should be applied by the RouteManager.
/// It can either be routed through a specific network node or it can be routed through the current
/// default route.
@@ -224,3 +234,16 @@ impl Node {
self.device.as_ref().map(|s| s.as_ref())
}
}
+
+impl fmt::Display for Node {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ if let Some(ip) = &self.ip {
+ write!(f, "{}", ip)?;
+ }
+ if let Some(device) = &self.device {
+ let extra_space = if self.ip.is_some() { " " } else { "" };
+ write!(f, "{}dev {}", extra_space, device)?;
+ }
+ Ok(())
+ }
+}