summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorEmīls <emils@mullvad.net>2020-04-15 10:39:06 +0100
committerEmīls Piņķis <emils@mullvad.net>2020-04-27 11:17:00 +0100
commit6d7141d24d17f2642eb3bc19675dd05b57fb6265 (patch)
tree6112b0877b13da0fc7099bfb60c8bc933e4f2946
parent816326f503fbb3e8ba098b96060f95878e788326 (diff)
downloadmullvadvpn-6d7141d24d17f2642eb3bc19675dd05b57fb6265.tar.xz
mullvadvpn-6d7141d24d17f2642eb3bc19675dd05b57fb6265.zip
Rework routing on Linux
-rw-r--r--talpid-core/Cargo.toml12
-rw-r--r--talpid-core/src/routing/linux.rs666
-rw-r--r--talpid-core/src/routing/linux/change_listener.rs241
-rw-r--r--talpid-core/src/routing/linux/mod.rs440
-rw-r--r--talpid-core/src/routing/mod.rs4
-rw-r--r--talpid-core/src/routing/unix.rs2
6 files changed, 675 insertions, 690 deletions
diff --git a/talpid-core/Cargo.toml b/talpid-core/Cargo.toml
index 4ac081cd7e..3e85b7dfe5 100644
--- a/talpid-core/Cargo.toml
+++ b/talpid-core/Cargo.toml
@@ -13,6 +13,7 @@ cfg-if = "0.1"
duct = "0.13"
err-derive = "0.2.1"
futures01 = { package = "futures", version = "0.1" }
+futures = { package = "futures", version = "0.3", features = [ "compat" ]}
hex = "0.4"
ipnetwork = "0.15"
jsonrpc-core = { git = "https://github.com/mullvad/jsonrpc", branch = "mullvad-fork" }
@@ -50,10 +51,13 @@ dbus = "0.6"
failure = "0.1"
notify = "4.0"
resolv-conf = "0.6.1"
-rtnetlink = { git = "https://github.com/mullvad/netlink", rev = "f768adfcc8c6b064ef7ae3c792c4c21d0d96d0b5" }
-netlink-proto = { git = "https://github.com/mullvad/netlink", rev = "f768adfcc8c6b064ef7ae3c792c4c21d0d96d0b5" }
-netlink-packet = { git = "https://github.com/mullvad/netlink", rev = "f768adfcc8c6b064ef7ae3c792c4c21d0d96d0b5" }
-netlink-sys = { git = "https://github.com/mullvad/netlink", rev = "f768adfcc8c6b064ef7ae3c792c4c21d0d96d0b5" }
+async-stream = "0.2"
+rtnetlink = "0.2"
+netlink-packet-route = "0.2"
+netlink-proto = "0.2"
+netlink-sys = "0.2"
+futures = { package = "futures", version = "0.3" }
+tokio02 = { package = "tokio", version = "0.2", features = [ "rt-core", "rt-threaded"] }
nftnl = { version = "0.3", features = ["nftnl-1-1-0"] }
mnl = { version = "0.2.0", features = ["mnl-1-0-4"] }
which = { version = "3.1", default-features = false }
diff --git a/talpid-core/src/routing/linux.rs b/talpid-core/src/routing/linux.rs
new file mode 100644
index 0000000000..7d21fcf647
--- /dev/null
+++ b/talpid-core/src/routing/linux.rs
@@ -0,0 +1,666 @@
+use crate::routing::{NetNode, Node, Route};
+
+use ipnetwork::IpNetwork;
+use std::{
+ collections::{BTreeMap, HashMap, HashSet},
+ io,
+ net::IpAddr,
+};
+
+use futures01::sync::oneshot as old_oneshot;
+
+use futures::{
+ channel::mpsc::UnboundedReceiver, compat::Future01CompatExt, future::FutureExt, StreamExt,
+ TryStreamExt,
+};
+
+
+use netlink_packet_route::{
+ link::{nlas::Nla as LinkNla, LinkMessage},
+ route::{nlas::Nla as RouteNla, RouteHeader, RouteMessage},
+ rtnl::{
+ constants::{RTN_UNICAST, RTPROT_STATIC, RT_SCOPE_UNIVERSE, RT_TABLE_MAIN},
+ RouteFlags,
+ },
+ NetlinkMessage, NetlinkPayload, RtnlMessage,
+};
+use netlink_sys::SocketAddr;
+use rtnetlink::{
+ constants::{RTMGRP_IPV4_ROUTE, RTMGRP_IPV6_ROUTE, RTMGRP_LINK, RTMGRP_NOTIFY},
+ Handle, IpVersion,
+};
+
+use libc::{AF_INET, AF_INET6};
+
+
+pub type Result<T> = std::result::Result<T, Error>;
+
+/// Errors that can happen in the Linux routing integration
+#[derive(err_derive::Error, Debug)]
+#[error(no_from)]
+pub enum Error {
+ /// Failed to add route.
+ #[error(display = "Failed to add route")]
+ FailedToAddRoute(#[error(source)] io::Error),
+
+ /// Failed to remove route.
+ #[error(display = "Failed to remove route")]
+ FailedToRemoveRoute(#[error(source)] io::Error),
+
+ /// Error while running "ip route".
+ #[error(display = "Error while running \"ip route\"")]
+ FailedToRunIp(#[error(source)] io::Error),
+
+ /// Invocation of `ip route` ended with a non-zero exit code
+ #[error(display = "ip returend a non-zero exit code")]
+ ErrorIpFailed,
+
+ /// Received unexpected output from `ip route`
+ #[error(display = "Received unexpected output from \"ip\"")]
+ UnexpectedOutput,
+
+ /// No default route exists
+ #[error(display = "No default route in \"ip route\" output")]
+ NoDefaultRoute,
+
+ /// Route table change stream failed.
+ #[error(display = "Route change listener failed")]
+ NetlinkConnectionError(#[error(source)] failure::Compat<rtnetlink::Error>),
+
+ #[error(display = "Failed to open a netlink connection")]
+ ConnectError(#[error(source)] io::Error),
+
+ #[error(display = "Failed to bind netlink socket")]
+ BindError(#[error(source)] io::Error),
+
+ #[error(display = "Netlink error")]
+ NetlinkError(#[error(source)] failure::Compat<rtnetlink::Error>),
+
+ #[error(display = "Route without a valid node")]
+ InvalidRoute,
+
+ #[error(display = "Invalid length of byte buffer for IP address")]
+ InvalidIpBytes,
+
+ #[error(display = "Invalid network prefix")]
+ InvalidNetworkPrefix(#[error(source)] ipnetwork::IpNetworkError),
+
+ #[error(display = "Failed to initialize event loop")]
+ EventLoopError(#[error(source)] io::Error),
+
+ #[error(display = "Unknown device index - {}", _0)]
+ UnknownDeviceIndex(u32),
+}
+
+pub struct RouteManagerImpl {
+ shutdown_rx: old_oneshot::Receiver<old_oneshot::Sender<()>>,
+ manager: RouteManagerImplInner,
+ runtime: tokio02::runtime::Runtime,
+}
+
+impl RouteManagerImpl {
+ /// Creates a new RouteManagerImplInner.
+ pub fn new(
+ required_routes: HashMap<IpNetwork, NetNode>,
+ shutdown_rx: old_oneshot::Receiver<old_oneshot::Sender<()>>,
+ ) -> Result<Self> {
+ let mut runtime = tokio02::runtime::Builder::new()
+ .basic_scheduler()
+ .core_threads(1)
+ .enable_all()
+ .thread_name("mullvad-route-manager-event-loop")
+ .build()
+ .map_err(Error::EventLoopError)?;
+
+ let manager = runtime.block_on(RouteManagerImplInner::new(required_routes))?;
+
+ Ok(Self {
+ shutdown_rx,
+ runtime,
+ manager,
+ })
+ }
+
+ pub fn wait(self) -> Result<()> {
+ let Self {
+ shutdown_rx,
+ mut runtime,
+ manager,
+ } = self;
+ runtime.block_on(manager.into_future(shutdown_rx))
+ }
+}
+
+pub struct RouteManagerImplInner {
+ handle: Handle,
+ messages: UnboundedReceiver<(NetlinkMessage<RtnlMessage>, SocketAddr)>,
+ iface_map: BTreeMap<u32, String>,
+
+ // currently added routes
+ added_routes: HashSet<Route>,
+ // default route tracking
+ // destinations that should be routed through the default route
+ required_default_routes: HashSet<IpNetwork>,
+ default_routes: HashSet<Route>,
+ best_default_node_v4: Option<Node>,
+ best_default_node_v6: Option<Node>,
+}
+
+impl RouteManagerImplInner {
+ pub async fn new(required_routes: HashMap<IpNetwork, NetNode>) -> Result<Self> {
+ let (mut connection, handle, messages) =
+ rtnetlink::new_connection().map_err(Error::ConnectError)?;
+
+ let mgroup_flags = RTMGRP_IPV4_ROUTE | RTMGRP_IPV6_ROUTE | RTMGRP_LINK | RTMGRP_NOTIFY;
+ let addr = SocketAddr::new(0, mgroup_flags);
+ connection
+ .socket_mut()
+ .bind(&addr)
+ .map_err(Error::BindError)?;
+
+ tokio02::spawn(connection);
+
+ let iface_map = Self::initialize_link_map(&handle).await?;
+
+
+ let mut required_normal_routes = HashSet::new();
+ let mut required_default_routes = HashSet::new();
+
+ for (destination, node) in required_routes {
+ match node {
+ NetNode::RealNode(node) => {
+ required_normal_routes.insert(Route::new(node, destination));
+ }
+ NetNode::DefaultNode => {
+ required_default_routes.insert(destination);
+ }
+ }
+ }
+
+
+ let mut monitor = Self {
+ iface_map,
+ handle,
+ messages,
+
+ required_default_routes,
+ added_routes: HashSet::new(),
+
+ default_routes: HashSet::new(),
+ best_default_node_v4: None,
+ best_default_node_v6: None,
+ };
+
+ monitor.default_routes = monitor.get_default_routes().await?;
+ monitor.best_default_node_v4 =
+ Self::pick_best_default_node(&monitor.default_routes, IpVersion::V4);
+ monitor.best_default_node_v6 =
+ Self::pick_best_default_node(&monitor.default_routes, IpVersion::V6);
+
+
+ for normal_route in required_normal_routes.into_iter() {
+ monitor.add_route(normal_route).await?;
+ }
+
+ for prefix in monitor.required_default_routes.clone().into_iter() {
+ if let (false, _, Some(default_node)) | (true, Some(default_node), _) = (
+ prefix.is_ipv4(),
+ &monitor.best_default_node_v4,
+ &monitor.best_default_node_v6,
+ ) {
+ // best to pick a single node identifier rather than device + ip
+ let route = Route::new(default_node.clone(), prefix);
+ monitor.add_route(route).await?;
+ }
+ }
+ Ok(monitor)
+ }
+
+ async fn get_default_routes(&self) -> Result<HashSet<Route>> {
+ let mut routes = self.get_default_routes_inner(IpVersion::V4).await?;
+ routes.extend(self.get_default_routes_inner(IpVersion::V6).await?);
+ Ok(routes)
+ }
+
+ async fn get_default_routes_inner(&self, version: IpVersion) -> Result<HashSet<Route>> {
+ let mut routes = HashSet::new();
+ let mut route_request = self.handle.route().get(version).execute();
+ if let Some(route) = route_request
+ .try_next()
+ .await
+ .map_err(failure::Fail::compat)
+ .map_err(Error::NetlinkError)?
+ {
+ if route.header.destination_prefix_length == 0 {
+ if let Some(default_route) = self.parse_route_message(route)? {
+ routes.insert(default_route);
+ }
+ }
+ };
+ Ok(routes)
+ }
+
+ async fn initialize_link_map(handle: &rtnetlink::Handle) -> Result<BTreeMap<u32, String>> {
+ let mut link_map = BTreeMap::new();
+ let mut link_request = handle.link().get().execute();
+ while let Some(link) = link_request
+ .try_next()
+ .await
+ .map_err(failure::Fail::compat)
+ .map_err(Error::NetlinkError)?
+ {
+ if let Some((idx, link_name)) = Self::map_iface_name_to_idx(link) {
+ link_map.insert(idx, link_name);
+ }
+ }
+
+ Ok(link_map)
+ }
+
+ fn find_iface_idx(&self, iface_name: &str) -> Option<u32> {
+ self.iface_map
+ .iter()
+ .find(|(_idx, name)| name.as_str() == iface_name)
+ .map(|(idx, _name)| *idx)
+ }
+
+
+ async fn process_new_route(&mut self, route: Route) -> Result<()> {
+ if route.prefix.prefix() == 0 {
+ self.default_routes.insert(route);
+ self.update_default_routes().await?;
+ }
+ Ok(())
+ }
+
+ async fn process_deleted_route(&mut self, route: Route) -> Result<()> {
+ if route.prefix.prefix() == 0 {
+ self.default_routes.remove(&route);
+ self.update_default_routes().await?;
+ }
+ if self.added_routes.contains(&route) {
+ self.added_routes.remove(&route);
+ }
+ Ok(())
+ }
+
+ async fn update_default_routes(&mut self) -> Result<()> {
+ let new_best_v4 = Self::pick_best_default_node(&self.default_routes, IpVersion::V4);
+ if self.best_default_node_v4 != new_best_v4 && new_best_v4.is_some() {
+ let new_node = new_best_v4.unwrap();
+ let old_node = self.best_default_node_v4.take();
+ let v4_destinations: Vec<_> = self
+ .required_default_routes
+ .iter()
+ .filter(|ip| ip.is_ipv4())
+ .cloned()
+ .collect();
+ for destination in v4_destinations {
+ let new_route = Route::new(new_node.clone(), destination);
+ if let Some(old_node) = &old_node {
+ let old_route = Route::new(old_node.clone(), destination);
+ if let Err(e) = self.delete_route(&old_route).await {
+ log::error!("Failed to remove old route {} - {}", &old_route, e);
+ }
+ }
+ if let Err(e) = self.add_route(new_route).await {
+ log::error!("Failed to add new route {} - {}", &new_node, e);
+ }
+ }
+ self.best_default_node_v4 = Some(new_node);
+ }
+
+ let new_best_v6 = Self::pick_best_default_node(&self.default_routes, IpVersion::V6);
+ if self.best_default_node_v6 != new_best_v6 && new_best_v6.is_some() {
+ let new_node = new_best_v6.unwrap();
+ let old_node = self.best_default_node_v6.take();
+ let v6_destinations: Vec<_> = self
+ .required_default_routes
+ .iter()
+ .filter(|ip| !ip.is_ipv4())
+ .cloned()
+ .collect();
+
+ for destination in v6_destinations {
+ let new_route = Route::new(new_node.clone(), destination);
+ if let Some(old_node) = &old_node {
+ let old_route = Route::new(old_node.clone(), destination);
+
+ if let Err(e) = self.delete_route(&old_route).await {
+ log::error!("Failed to remove old route {} - {}", &old_route, e);
+ }
+ }
+ if let Err(e) = self.add_route(new_route).await {
+ log::error!("Failed to add new route {} - {}", &new_node, e);
+ }
+ }
+ self.best_default_node_v6 = Some(new_node);
+ }
+
+ Ok(())
+ }
+
+ fn pick_best_default_node(routes: &HashSet<Route>, version: IpVersion) -> Option<Node> {
+ // Pick the route with the lowest metric - thus the most favourable route.
+ routes
+ .iter()
+ .filter(|route| route.prefix.is_ipv4() == (version == IpVersion::V4))
+ .fold(
+ None,
+ |best_route: Option<Route>, next_route| match best_route {
+ Some(current_best) => {
+ if current_best.metric.unwrap_or(0) > next_route.metric.unwrap_or(0) {
+ Some(next_route.clone())
+ } else {
+ Some(current_best)
+ }
+ }
+ None => Some(next_route.clone()),
+ },
+ )
+ .map(|route| route.node)
+ }
+
+ async fn cleanup_routes(&mut self) {
+ for route in self.added_routes.drain().collect::<Vec<_>>().iter() {
+ if let Err(e) = self.delete_route(&route).await {
+ if let Error::NetlinkError(err) = &e {
+ if let rtnetlink::ErrorKind::NetlinkError(msg) = err.get_ref().kind() {
+ // -3 means that the route doesn't exist anymore anyway
+ if msg.code == -3 {
+ continue;
+ }
+ }
+ }
+ log::error!("Failed to remove route - {} - {}", route, e);
+ }
+ }
+ }
+
+
+ pub async fn into_future(
+ mut self,
+ shutdown_rx: futures01::sync::oneshot::Receiver<futures01::sync::oneshot::Sender<()>>,
+ ) -> Result<()> {
+ futures::select! {
+ shutdown_signal = shutdown_rx.compat().fuse() => {
+ log::trace!("Shutting down route manager");
+ self.cleanup_routes().await;
+ log::trace!("Route manager done");
+ if let Ok(shutdown_signal) = shutdown_signal {
+ let _ = shutdown_signal.send(());
+ }
+ return Ok(());
+ },
+ (route_change, socket) = self.messages.select_next_some().fuse() => {
+ self.process_netlink_message(route_change).await?;
+ }
+ };
+ Ok(())
+ }
+
+ async fn process_netlink_message(&mut self, msg: NetlinkMessage<RtnlMessage>) -> Result<()> {
+ match msg.payload {
+ NetlinkPayload::InnerMessage(RtnlMessage::NewLink(new_link)) => {
+ if let Some((idx, name)) = Self::map_iface_name_to_idx(new_link) {
+ self.iface_map.insert(idx, name);
+ }
+ }
+ NetlinkPayload::InnerMessage(RtnlMessage::DelLink(old_link)) => {
+ if let Some((idx, _)) = Self::map_iface_name_to_idx(old_link) {
+ self.iface_map.remove(&idx);
+ }
+ }
+
+ NetlinkPayload::InnerMessage(RtnlMessage::NewRoute(new_route)) => {
+ if let Some(new_route) = self.parse_route_message(new_route)? {
+ self.process_new_route(new_route).await?;
+ }
+ }
+ NetlinkPayload::InnerMessage(RtnlMessage::DelRoute(old_route)) => {
+ if let Some(deletion) = self.parse_route_message(old_route)? {
+ self.process_deleted_route(deletion).await?;
+ }
+ }
+ _ => (),
+ };
+ Ok(())
+ }
+
+ // Tries to coax a Route out of a RouteMessage, but only if it's a route from the main routing
+ // table
+ // TODO: Change to account for different routing tables.
+ fn parse_route_message(&self, msg: RouteMessage) -> Result<Option<Route>> {
+ if msg.header.table != RT_TABLE_MAIN {
+ return Ok(None);
+ }
+
+
+ let mut prefix = None;
+ let mut node_addr = None;
+ let mut device = None;
+ let mut metric = None;
+ let mut gateway = None;
+
+ let destination_length = msg.header.destination_prefix_length;
+ let af_spec = msg.header.address_family;
+
+ for nla in msg.nlas.iter() {
+ match nla {
+ RouteNla::Oif(device_idx) => {
+ match self.iface_map.get(&device_idx) {
+ Some(device_name) => device = Some(device_name.to_string()),
+ None => {
+ return Err(Error::UnknownDeviceIndex(*device_idx));
+ }
+ };
+ }
+
+ RouteNla::Via(addr) => {
+ node_addr = Self::parse_ip(&addr).map(Some)?;
+ }
+
+ RouteNla::Destination(addr) => {
+ prefix = Self::parse_ip(&addr)
+ .and_then(|ip| {
+ ipnetwork::IpNetwork::new(ip, destination_length)
+ .map_err(Error::InvalidNetworkPrefix)
+ })
+ .map(Some)?;
+ }
+
+ // gateway NLAs indicate that this is actually a default route
+ RouteNla::Gateway(gateway_ip) => {
+ gateway = Self::parse_ip(&gateway_ip).map(Some)?;
+ }
+
+ RouteNla::Priority(priority) => {
+ metric = Some(*priority);
+ }
+ _ => continue,
+ }
+ }
+
+ // when a gateway is specified but prefix is none, then this is a default route
+ if prefix.is_none() && gateway.is_some() {
+ prefix = match af_spec as i32 {
+ AF_INET => Some("0.0.0.0/0".parse().expect("failed to parse ipnetwork")),
+ AF_INET6 => Some("::/0".parse().expect("failed to parse ipnetwork")),
+ _ => None,
+ };
+ }
+
+ if device.is_none() && node_addr.is_none() || prefix.is_none() {
+ return Err(Error::InvalidRoute);
+ }
+
+
+ let node = Node {
+ ip: node_addr.or(gateway),
+ device,
+ };
+
+ Ok(Some(Route {
+ node,
+ prefix: prefix.unwrap(),
+ metric,
+ }))
+ }
+
+ fn map_iface_name_to_idx(msg: LinkMessage) -> Option<(u32, String)> {
+ let index = msg.header.index;
+ for nla in msg.nlas {
+ if let LinkNla::IfName(name) = nla {
+ return Some((index, name));
+ }
+ }
+ None
+ }
+
+ fn parse_ip(bytes: &[u8]) -> Result<IpAddr> {
+ if bytes.len() == 4 {
+ let mut ipv4_bytes = [0u8; 4];
+ ipv4_bytes.copy_from_slice(bytes);
+ Ok(IpAddr::from(ipv4_bytes))
+ } else if bytes.len() == 16 {
+ let mut ipv6_bytes = [0u8; 16];
+ ipv6_bytes.copy_from_slice(bytes);
+ Ok(IpAddr::from(ipv6_bytes))
+ } else {
+ log::error!("Expected either 4 or 16 bytes, got {} bytes", bytes.len());
+ Err(Error::InvalidIpBytes)
+ }
+ }
+
+ async fn delete_route(&self, route: &Route) -> Result<()> {
+ let mut route_message = RouteMessage {
+ header: RouteHeader {
+ address_family: if route.prefix.is_ipv4() {
+ AF_INET as u8
+ } else {
+ AF_INET6 as u8
+ },
+ source_prefix_length: 0,
+ destination_prefix_length: route.prefix.prefix(),
+ tos: 0u8,
+ table: RT_TABLE_MAIN,
+ protocol: RTPROT_STATIC,
+ scope: RT_SCOPE_UNIVERSE,
+ kind: RTN_UNICAST,
+ flags: RouteFlags::empty(),
+ },
+ nlas: vec![RouteNla::Destination(ip_to_bytes(route.prefix.ip()))],
+ };
+ if let Some(interface_name) = route.node.get_device() {
+ if let Some(iface_idx) = self.find_iface_idx(interface_name) {
+ route_message.nlas.push(RouteNla::Oif(iface_idx));
+ }
+ }
+
+ if let Some(gateway) = route.node.get_address() {
+ let gateway_nla = if route.node.get_device().is_some() {
+ RouteNla::Gateway(ip_to_bytes(gateway))
+ } else {
+ RouteNla::Via(ip_to_bytes(gateway))
+ };
+ route_message.nlas.push(gateway_nla);
+ }
+
+
+ self.handle
+ .route()
+ .del(route_message)
+ .execute()
+ .await
+ .map_err(failure::Fail::compat)
+ .map_err(Error::NetlinkError)
+ }
+
+ async fn add_route(&mut self, route: Route) -> Result<()> {
+ let add_message = match &route.prefix {
+ IpNetwork::V4(v4_prefix) => {
+ let mut add_message = self
+ .handle
+ .route()
+ .add_v4()
+ .destination_prefix(v4_prefix.ip(), v4_prefix.prefix());
+
+ if v4_prefix.size() > 1 {
+ add_message = add_message.scope(RT_SCOPE_LINK)
+ }
+
+ if let Some(IpAddr::V4(node_address)) = route.node.get_address() {
+ add_message = add_message.gateway(node_address);
+ }
+
+ if let Some(interface_name) = route.node.get_device() {
+ if let Some(iface_idx) = self.find_iface_idx(interface_name) {
+ add_message = add_message.output_interface(iface_idx);
+ }
+ }
+
+ add_message.message_mut().clone()
+ }
+
+ IpNetwork::V6(v6_prefix) => {
+ let mut add_message = self
+ .handle
+ .route()
+ .add_v6()
+ .destination_prefix(v6_prefix.ip(), v6_prefix.prefix());
+
+ if v6_prefix.size() > 1 {
+ add_message = add_message.scope(RT_SCOPE_LINK)
+ }
+
+ if let Some(IpAddr::V6(node_address)) = route.node.get_address() {
+ add_message = add_message.gateway(node_address);
+ }
+
+ if let Some(interface_name) = route.node.get_device() {
+ if let Some(iface_idx) = self.find_iface_idx(interface_name) {
+ add_message = add_message.output_interface(iface_idx);
+ }
+ }
+
+ add_message.message_mut().clone()
+ }
+ };
+
+ // Need to modify the request in place to set the correct flags to be able to replace any
+ // existing routes - self.handle.route().add_v4().execute() sets the NLM_F_EXCL flag which
+ // will make the request fail if a route with the same destination already exists.
+ use netlink_packet_route::constants::*;
+ let mut req = NetlinkMessage::from(RtnlMessage::NewRoute(add_message));
+ req.header.flags = NLM_F_REQUEST | NLM_F_ACK | NLM_F_CREATE | NLM_F_REPLACE;
+
+ let mut response = self
+ .handle
+ .request(req)
+ .map_err(failure::Fail::compat)
+ .map_err(Error::NetlinkError)?;
+
+ while let Some(message) = response.next().await {
+ if let NetlinkPayload::Error(err) = message.payload {
+ let compat_err =
+ failure::Fail::compat(rtnetlink::ErrorKind::NetlinkError(err).into());
+ return Err(Error::NetlinkError(compat_err));
+ }
+ }
+ self.added_routes.insert(route.clone());
+ Ok(())
+ }
+}
+
+impl Drop for RouteManagerImplInner {
+ fn drop(&mut self) {
+ futures::executor::block_on(self.cleanup_routes())
+ }
+}
+
+fn ip_to_bytes(addr: IpAddr) -> Vec<u8> {
+ match addr {
+ IpAddr::V4(addr) => addr.octets().to_vec(),
+ IpAddr::V6(addr) => addr.octets().to_vec(),
+ }
+}
diff --git a/talpid-core/src/routing/linux/change_listener.rs b/talpid-core/src/routing/linux/change_listener.rs
deleted file mode 100644
index 82608a59a7..0000000000
--- a/talpid-core/src/routing/linux/change_listener.rs
+++ /dev/null
@@ -1,241 +0,0 @@
-use crate::routing::{Node, Route};
-
-use super::RouteChange;
-use futures::{future::Either, sync::mpsc, Async, Future, Stream};
-use std::{collections::BTreeMap, io, net::IpAddr};
-
-use netlink_packet::{
- LinkMessage, LinkNla, NetlinkMessage, NetlinkPayload, RouteMessage, RouteNla, RtnlMessage,
-};
-use netlink_sys::SocketAddr;
-use rtnetlink::constants::{
- AF_INET, AF_INET6, RTMGRP_IPV4_ROUTE, RTMGRP_IPV6_ROUTE, RTMGRP_LINK, RTMGRP_NOTIFY,
-};
-
-#[derive(err_derive::Error, Debug)]
-#[error(no_from)]
-pub enum Error {
- #[error(display = "Netlink connection failed")]
- NetlinkError(#[error(source)] failure::Compat<rtnetlink::Error>),
- #[error(display = "Netlink protocol error")]
- NetlinkProtocolError(#[error(source)] failure::Compat<netlink_proto::Error>),
- #[error(display = "Failed to open a netlink connection")]
- ConnectError(#[error(source)] io::Error),
- #[error(display = "Route without a valid node")]
- InvalidRoute,
- #[error(display = "Invalid length of byte buffer for IP address")]
- InvalidIpBytes,
- #[error(display = "Invalid network prefix")]
- InvalidNetworkPrefix(#[error(source)] ipnetwork::IpNetworkError),
- #[error(display = "Unknown device index - {}", _0)]
- UnknownDeviceIndex(u32),
- #[error(display = "Failed to bind netlink socket")]
- BindError(#[error(source)] io::Error),
- #[error(display = "Netlink connection stopped sending messages")]
- NetlinkConnectionClosed,
-}
-
-type Result<T> = std::result::Result<T, Error>;
-
-pub(super) struct RouteChangeListener {
- connection: rtnetlink::Connection,
- messages: mpsc::UnboundedReceiver<NetlinkMessage>,
- iface_map: BTreeMap<u32, String>,
-}
-
-impl RouteChangeListener {
- pub fn new() -> Result<Self> {
- let (mut connection, handle, messages) =
- rtnetlink::new_connection_with_messages().map_err(Error::ConnectError)?;
-
- let mgroup_flags = RTMGRP_IPV4_ROUTE | RTMGRP_IPV6_ROUTE | RTMGRP_LINK | RTMGRP_NOTIFY;
- let addr = SocketAddr::new(0, mgroup_flags);
- connection
- .socket_mut()
- .bind(&addr)
- .map_err(Error::BindError)?;
-
- let (iface_map, connection) = Self::initialize_link_map(connection, handle)?;
-
- Ok(Self {
- connection,
- messages,
- iface_map,
- })
- }
-
- fn map_netlink_to_route_change(&mut self, msg: NetlinkMessage) -> Result<Option<RouteChange>> {
- match msg.payload {
- NetlinkPayload::Rtnl(RtnlMessage::NewLink(new_link)) => {
- if let Some((idx, name)) = Self::map_iface_name_to_idx(new_link) {
- self.iface_map.insert(idx, name);
- }
- Ok(None)
- }
- NetlinkPayload::Rtnl(RtnlMessage::DelLink(old_link)) => {
- if let Some((idx, _)) = Self::map_iface_name_to_idx(old_link) {
- self.iface_map.remove(&idx);
- }
- Ok(None)
- }
-
- NetlinkPayload::Rtnl(RtnlMessage::NewRoute(new_route)) => {
- self.get_route(new_route).map(RouteChange::Add).map(Some)
- }
- NetlinkPayload::Rtnl(RtnlMessage::DelRoute(old_route)) => {
- self.get_route(old_route).map(RouteChange::Remove).map(Some)
- }
- _ => Ok(None),
- }
- }
-
- // Tries to coax a Route out of a RouteMessage
- fn get_route(&self, msg: RouteMessage) -> Result<Route> {
- let mut prefix = None;
- let mut node_addr = None;
- let mut device = None;
- let mut metric = None;
- let mut gateway = None;
-
- let destination_length = msg.header.destination_length;
- let af_spec = msg.header.address_family;
-
- for nla in msg.nlas.iter() {
- match nla {
- RouteNla::Oif(device_idx) => {
- match self.iface_map.get(&device_idx) {
- Some(device_name) => device = Some(device_name.to_string()),
- None => {
- return Err(Error::UnknownDeviceIndex(*device_idx));
- }
- };
- }
-
- RouteNla::Via(addr) => {
- node_addr = Self::parse_ip(&addr).map(Some)?;
- }
-
- RouteNla::Destination(addr) => {
- prefix = Self::parse_ip(&addr)
- .and_then(|ip| {
- ipnetwork::IpNetwork::new(ip, destination_length)
- .map_err(Error::InvalidNetworkPrefix)
- })
- .map(Some)?;
- }
-
- // gateway NLAs indicate that this is actually a default route
- RouteNla::Gateway(gateway_ip) => {
- gateway = Self::parse_ip(&gateway_ip).map(Some)?;
- }
-
- RouteNla::Priority(priority) => {
- metric = Some(*priority);
- }
- _ => continue,
- }
- }
-
- // when a gateway is specified but prefix is none, then this is a default route
- if prefix.is_none() && gateway.is_some() {
- prefix = match af_spec as u16 {
- AF_INET => Some("0.0.0.0/0".parse().expect("failed to parse ipnetwork")),
- AF_INET6 => Some("::/0".parse().expect("failed to parse ipnetwork")),
- _ => None,
- };
- }
-
- if device.is_none() && node_addr.is_none() || prefix.is_none() {
- return Err(Error::InvalidRoute);
- }
-
-
- let node = Node {
- ip: node_addr,
- device,
- };
-
- Ok(Route {
- node,
- prefix: prefix.unwrap(),
- metric,
- })
- }
-
- fn map_iface_name_to_idx(msg: LinkMessage) -> Option<(u32, String)> {
- let index = msg.header.index;
- for nla in msg.nlas {
- match nla {
- LinkNla::IfName(name) => return Some((index, name)),
- _ => continue,
- }
- }
- None
- }
-
- fn parse_ip(bytes: &[u8]) -> Result<IpAddr> {
- if bytes.len() == 4 {
- let mut ipv4_bytes = [0u8; 4];
- ipv4_bytes.copy_from_slice(bytes);
- Ok(IpAddr::from(ipv4_bytes))
- } else if bytes.len() == 16 {
- let mut ipv6_bytes = [0u8; 16];
- ipv6_bytes.copy_from_slice(bytes);
- Ok(IpAddr::from(ipv6_bytes))
- } else {
- log::error!("Expected either 4 or 16 bytes, got {} bytes", bytes.len());
- Err(Error::InvalidIpBytes)
- }
- }
-
- pub fn initialize_link_map(
- connection: rtnetlink::Connection,
- handle: rtnetlink::Handle,
- ) -> Result<(BTreeMap<u32, String>, rtnetlink::Connection)> {
- let request = handle
- .link()
- .get()
- .execute()
- .filter_map(Self::map_iface_name_to_idx)
- .collect();
-
- match connection.select2(request).wait() {
- Ok(Either::A(_)) => Err(Error::NetlinkConnectionClosed),
- Err(Either::A((error, _))) => {
- Err(Error::NetlinkProtocolError(failure::Fail::compat(error)))
- }
- Ok(Either::B((links, connection))) => Ok((links.into_iter().collect(), connection)),
- Err(Either::B((error, _))) => Err(Error::NetlinkError(failure::Fail::compat(error))),
- }
- }
-}
-
-impl Stream for RouteChangeListener {
- type Item = RouteChange;
- type Error = Error;
-
- fn poll(&mut self) -> Result<Async<Option<RouteChange>>> {
- self.connection
- .poll()
- .map_err(failure::Fail::compat)
- .map_err(Error::NetlinkProtocolError)?;
-
- loop {
- match futures::try_ready!(self
- .messages
- .poll()
- .map_err(|_| Error::NetlinkConnectionClosed))
- {
- Some(message) => {
- if let Some(route_change) = self.map_netlink_to_route_change(message)? {
- return Ok(Async::Ready(Some(route_change)));
- };
- continue;
- }
- None => {
- return Err(Error::NetlinkConnectionClosed);
- }
- }
- }
- }
-}
diff --git a/talpid-core/src/routing/linux/mod.rs b/talpid-core/src/routing/linux/mod.rs
deleted file mode 100644
index cdc57c87e1..0000000000
--- a/talpid-core/src/routing/linux/mod.rs
+++ /dev/null
@@ -1,440 +0,0 @@
-use crate::routing::{NetNode, Node, Route};
-
-use ipnetwork::IpNetwork;
-use std::{
- collections::{HashMap, HashSet},
- io,
- process::{Command, Stdio},
-};
-
-mod change_listener;
-use change_listener::{Error as RouteChangeListenerError, RouteChangeListener};
-
-use futures::{sync::oneshot, Async, Future, Stream};
-
-pub type Result<T> = std::result::Result<T, Error>;
-
-/// Errors that can happen in the Linux routing integration
-#[derive(err_derive::Error, Debug)]
-#[error(no_from)]
-pub enum Error {
- /// Failed to add route.
- #[error(display = "Failed to add route")]
- FailedToAddRoute(#[error(source)] io::Error),
-
- /// Failed to remove route.
- #[error(display = "Failed to remove route")]
- FailedToRemoveRoute(#[error(source)] io::Error),
-
- /// Error while running "ip route".
- #[error(display = "Error while running \"ip route\"")]
- FailedToRunIp(#[error(source)] io::Error),
-
- /// Invocation of `ip route` ended with a non-zero exit code
- #[error(display = "ip returend a non-zero exit code")]
- ErrorIpFailed,
-
- /// Received unexpected output from `ip route`
- #[error(display = "Received unexpected output from \"ip\"")]
- UnexpectedOutput,
-
- /// No default route exists
- #[error(display = "No default route in \"ip route\" output")]
- NoDefaultRoute,
-
- /// Route table change stream failed.
- #[error(display = "Route change listener failed")]
- ChangeListenerError(#[error(source)] RouteChangeListenerError),
-
- /// Route table change stream failed.
- #[error(display = "Route change listener closed unexpectedly")]
- ChangeListenerClosed,
-}
-
-pub struct RouteManagerImpl {
- changes: RouteChangeListener,
-
- // currently added routes
- added_routes: HashSet<Route>,
- // default route tracking
- // destinations that should be routed through the default route
- required_default_routes: HashSet<IpNetwork>,
- default_routes: HashSet<Route>,
- best_default_node_v4: Option<Node>,
- best_default_node_v6: Option<Node>,
-
- // if the stop channel is set, the future should wind down - remove added routes and send a
- // signal.
- shutdown_finished_tx: Option<oneshot::Sender<()>>,
- shutdown_rx: oneshot::Receiver<oneshot::Sender<()>>,
- should_shut_down: bool,
-}
-
-impl RouteManagerImpl {
- /// Creates a new RouteManager.
- pub fn new(
- required_routes: HashMap<IpNetwork, NetNode>,
- shutdown_rx: oneshot::Receiver<oneshot::Sender<()>>,
- ) -> Result<Self> {
- let changes = RouteChangeListener::new().map_err(Error::ChangeListenerError)?;
-
- let mut required_normal_routes = HashSet::new();
- let mut required_default_routes = HashSet::new();
-
- for (destination, node) in required_routes {
- match node {
- NetNode::RealNode(node) => {
- required_normal_routes.insert(Route::new(node, destination));
- }
- NetNode::DefaultNode => {
- required_default_routes.insert(destination);
- }
- }
- }
-
- let default_routes = Self::get_default_routes()?;
-
- let best_default_node_v4 = Self::pick_best_default_node(&default_routes, true);
- let best_default_node_v6 = Self::pick_best_default_node(&default_routes, false);
-
- let mut monitor = Self {
- changes,
-
- required_default_routes,
- added_routes: HashSet::new(),
-
- default_routes,
- best_default_node_v4,
- best_default_node_v6,
-
- shutdown_finished_tx: None,
- shutdown_rx,
- should_shut_down: false,
- };
- for normal_route in required_normal_routes.iter() {
- monitor.add_route(&normal_route)?;
- }
-
- for prefix in monitor.required_default_routes.clone().into_iter() {
- if let (false, _, Some(default_node)) | (true, Some(default_node), _) = (
- prefix.is_ipv4(),
- &monitor.best_default_node_v4,
- &monitor.best_default_node_v6,
- ) {
- // best to pick a single node identifier rather than device + ip
- let route = Route::new(default_node.clone(), prefix);
- monitor.add_route(&route)?;
- }
- }
- Ok(monitor)
- }
-
- fn process_route_table_change(&mut self) -> Result<()> {
- loop {
- let change = self.changes.poll().map_err(Error::ChangeListenerError)?;
- match change {
- Async::NotReady => return Ok(()),
- Async::Ready(Some(RouteChange::Add(route))) => self.process_new_route(route),
- Async::Ready(Some(RouteChange::Remove(route))) => self.process_deleted_route(route),
- Async::Ready(None) => return Err(Error::ChangeListenerClosed),
- }
- }
- }
-
- fn process_new_route(&mut self, route: Route) {
- if route.prefix.prefix() == 0 {
- self.default_routes.insert(route);
- self.update_default_routes();
- }
- }
-
- fn process_deleted_route(&mut self, route: Route) {
- if route.prefix.prefix() == 0 {
- self.update_default_routes();
- }
- }
-
- fn update_default_routes(&mut self) {
- let new_best_v4 = Self::pick_best_default_node(&self.default_routes, true);
- if self.best_default_node_v4 != new_best_v4 && new_best_v4.is_some() {
- let new_node = new_best_v4.unwrap();
- let old_node = self.best_default_node_v4.take();
- let v4_destinations: Vec<_> = self
- .required_default_routes
- .iter()
- .filter(|ip| ip.is_ipv4())
- .cloned()
- .collect();
- for destination in v4_destinations {
- let new_route = Route::new(new_node.clone(), destination);
- if let Some(old_node) = &old_node {
- let old_route = Route::new(old_node.clone(), destination);
- if let Err(e) = self.delete_route(&old_route) {
- log::error!("Failed to remove old route {} - {}", &old_route, e);
- }
- }
- if let Err(e) = self.add_route(&new_route) {
- log::error!("Failed to add new route {} - {}", &new_node, e);
- }
- }
- self.best_default_node_v4 = Some(new_node);
- }
-
- let new_best_v6 = Self::pick_best_default_node(&self.default_routes, false);
- if self.best_default_node_v6 != new_best_v6 && new_best_v6.is_some() {
- let new_node = new_best_v6.unwrap();
- let old_node = self.best_default_node_v6.take();
- let v6_destinations: Vec<_> = self
- .required_default_routes
- .iter()
- .filter(|ip| !ip.is_ipv4())
- .cloned()
- .collect();
-
- for destination in v6_destinations {
- let new_route = Route::new(new_node.clone(), destination);
- if let Some(old_node) = &old_node {
- let old_route = Route::new(old_node.clone(), destination);
-
- if let Err(e) = self.delete_route(&old_route) {
- log::error!("Failed to remove old route {} - {}", &old_route, e);
- }
- }
- if let Err(e) = self.add_route(&new_route) {
- log::error!("Failed to add new route {} - {}", &new_node, e);
- }
- }
- self.best_default_node_v6 = Some(new_node);
- }
- }
-
- fn pick_best_default_node(routes: &HashSet<Route>, v4: bool) -> Option<Node> {
- // Pick the route with the lowest metric - thus the most favourable route.
- routes
- .iter()
- .filter(|route| route.prefix.is_ipv4() == v4)
- .fold(
- None,
- |best_route: Option<Route>, next_route| match best_route {
- Some(current_best) => {
- if current_best.metric.unwrap_or(0) > next_route.metric.unwrap_or(0) {
- Some(next_route.clone())
- } else {
- Some(current_best)
- }
- }
- None => Some(next_route.clone()),
- },
- )
- .map(|route| route.node)
- }
-
- fn route_cmd(action: &str, route: &Route) -> Command {
- let mut cmd = Command::new("ip");
-
- cmd.arg(ip_vers(&route))
- .arg("route")
- .arg(action)
- .arg(route.prefix.to_string());
-
- if let Some(addr) = route.node.get_address() {
- cmd.arg("via").arg(addr.to_string());
- };
- if let Some(device) = route.node.get_device() {
- cmd.arg("dev").arg(device);
- };
- if let Some(metric) = route.metric {
- cmd.arg("metric").arg(metric.to_string());
- };
-
- cmd
- }
-
- fn run_cmd(mut cmd: Command, err: impl Fn(io::Error) -> Error) -> Result<()> {
- log::trace!("running cmd - {:?}", &cmd);
- let status = cmd.status().map_err(|e| err(e))?;
- match status.code() {
- Some(0) => Ok(()),
- Some(i) => Err(err(io::Error::new(
- io::ErrorKind::Other,
- format!("exit status {}", i),
- ))),
- None => Err(err(io::Error::new(
- io::ErrorKind::Other,
- "interrupted by signal",
- ))),
- }
- }
-
- fn get_default_routes_inner(ip_version: IpVersion) -> Result<Vec<Route>> {
- let mut cmd = Command::new("ip");
- cmd.arg(ip_version.to_route_arg()).arg("route").arg("show");
-
- cmd.stdout(Stdio::piped())
- .output()
- .map_err(Error::FailedToRunIp)
- .and_then(move |output| {
- let output_lines = String::from_utf8(output.stdout.clone())
- .map_err(|_| Error::UnexpectedOutput)?;
- Ok(output_lines
- .lines()
- .filter_map(|line| {
- if line.starts_with("default") {
- parse_ip_route_show_line(line, ip_version)
- } else {
- None
- }
- })
- .collect())
- })
- }
-
- /// Adds routes to the system routing table.
- fn add_route(&mut self, route: &Route) -> Result<()> {
- let cmd = Self::route_cmd("replace", route);
- Self::run_cmd(cmd, Error::FailedToAddRoute)?;
- self.added_routes.insert(route.clone());
- Ok(())
- }
-
- /// Removes previously set routes. If routes were set for specific tables, the whole tables
- /// will be removed.
- fn delete_route(&mut self, route: &Route) -> Result<()> {
- let cmd = Self::route_cmd("delete", route);
- Self::run_cmd(cmd, Error::FailedToRemoveRoute)?;
- self.added_routes.remove(route);
- Ok(())
- }
-
- fn cleanup_routes(&mut self) {
- for route in self.added_routes.drain().collect::<Vec<_>>().iter() {
- if let Err(e) = self.delete_route(&route) {
- log::error!("Failed to remove route - {} - {}", route, e);
- }
- }
- }
-
-
- /// Retrieves the gateway for the default route
- fn get_default_routes() -> Result<HashSet<Route>> {
- let v4_routes = Self::get_default_routes_inner(IpVersion::V4)?;
- let v6_routes = Self::get_default_routes_inner(IpVersion::V6)?;
- Ok(v4_routes.into_iter().chain(v6_routes.into_iter()).collect())
- }
-}
-
-#[derive(Debug, Copy, Clone)]
-enum IpVersion {
- V4,
- V6,
-}
-
-impl IpVersion {
- fn to_route_arg(self) -> &'static str {
- match self {
- IpVersion::V4 => "-4",
- IpVersion::V6 => "-6",
- }
- }
-}
-
-impl Future for RouteManagerImpl {
- type Item = ();
- type Error = Error;
- fn poll(&mut self) -> Result<Async<()>> {
- if !self.should_shut_down {
- match self.shutdown_rx.poll() {
- Ok(Async::NotReady) => (),
- Ok(Async::Ready(tx)) => {
- self.should_shut_down = true;
- self.shutdown_finished_tx = Some(tx);
- }
- Err(_) => {
- self.should_shut_down = true;
- }
- };
- self.process_route_table_change()?;
- }
- if self.should_shut_down {
- self.cleanup_routes();
- if let Some(tx) = self.shutdown_finished_tx.take() {
- if tx.send(()).is_err() {
- log::error!("RouteManagerHandle already stopped");
- }
- }
- Ok(Async::Ready(()))
- } else {
- Ok(Async::NotReady)
- }
- }
-}
-
-impl Drop for RouteManagerImpl {
- fn drop(&mut self) {
- self.cleanup_routes();
- }
-}
-
-// intended to parse lines sucha as the following:
-// default via 192.168.1.1 dev wlp61s0 proto dhcp metric 600
-fn parse_ip_route_show_line(line: &str, ip_version: IpVersion) -> Option<Route> {
- let mut node_ip = None;
- let mut device = None;
- let mut metric = None;
-
- let mut tokens = line.split_whitespace();
- let prefix_str = tokens.next()?;
- let prefix = match prefix_str {
- "default" => match ip_version {
- IpVersion::V4 => "0.0.0.0/0".parse().unwrap(),
- IpVersion::V6 => "::/0".parse().unwrap(),
- },
- prefix_str => prefix_str.parse().ok()?,
- };
-
- let tokens: Vec<&str> = tokens.collect();
- for pair in tokens.chunks(2) {
- if pair.len() != 2 {
- log::error!("unexpected output from ip");
- break;
- }
- let kind = pair[0];
- let value = pair[1];
-
- match kind {
- "via" => node_ip = value.parse().ok(),
- "dev" => device = Some(value.to_string()),
- "metric" => metric = value.parse().ok(),
- _ => continue,
- };
- }
-
- if node_ip.is_none() && device.is_none() {
- None
- } else {
- let node = Node {
- ip: node_ip,
- device,
- };
-
- Some(Route {
- node,
- prefix,
- metric,
- })
- }
-}
-
-fn ip_vers(route: &Route) -> &'static str {
- if route.prefix.is_ipv4() {
- "-4"
- } else {
- "-6"
- }
-}
-
-#[derive(Debug, PartialEq)]
-enum RouteChange {
- Add(Route),
- Remove(Route),
-}
diff --git a/talpid-core/src/routing/mod.rs b/talpid-core/src/routing/mod.rs
index e98bf1992f..2baf625f2f 100644
--- a/talpid-core/src/routing/mod.rs
+++ b/talpid-core/src/routing/mod.rs
@@ -30,10 +30,6 @@ impl Route {
metric: None,
}
}
-
- fn is_ipv4(&self) -> bool {
- self.prefix.is_ipv4()
- }
}
impl fmt::Display for Route {
diff --git a/talpid-core/src/routing/unix.rs b/talpid-core/src/routing/unix.rs
index 6a379e1eca..dffd6c21b4 100644
--- a/talpid-core/src/routing/unix.rs
+++ b/talpid-core/src/routing/unix.rs
@@ -11,7 +11,7 @@ use std::{collections::HashMap, sync::mpsc::sync_channel};
mod imp;
#[cfg(target_os = "linux")]
-#[path = "linux/mod.rs"]
+#[path = "linux.rs"]
mod imp;
#[cfg(target_os = "android")]