diff options
| author | Emīls <emils@mullvad.net> | 2020-04-15 10:39:06 +0100 |
|---|---|---|
| committer | Emīls Piņķis <emils@mullvad.net> | 2020-04-27 11:17:00 +0100 |
| commit | 6d7141d24d17f2642eb3bc19675dd05b57fb6265 (patch) | |
| tree | 6112b0877b13da0fc7099bfb60c8bc933e4f2946 | |
| parent | 816326f503fbb3e8ba098b96060f95878e788326 (diff) | |
| download | mullvadvpn-6d7141d24d17f2642eb3bc19675dd05b57fb6265.tar.xz mullvadvpn-6d7141d24d17f2642eb3bc19675dd05b57fb6265.zip | |
Rework routing on Linux
| -rw-r--r-- | talpid-core/Cargo.toml | 12 | ||||
| -rw-r--r-- | talpid-core/src/routing/linux.rs | 666 | ||||
| -rw-r--r-- | talpid-core/src/routing/linux/change_listener.rs | 241 | ||||
| -rw-r--r-- | talpid-core/src/routing/linux/mod.rs | 440 | ||||
| -rw-r--r-- | talpid-core/src/routing/mod.rs | 4 | ||||
| -rw-r--r-- | talpid-core/src/routing/unix.rs | 2 |
6 files changed, 675 insertions, 690 deletions
diff --git a/talpid-core/Cargo.toml b/talpid-core/Cargo.toml index 4ac081cd7e..3e85b7dfe5 100644 --- a/talpid-core/Cargo.toml +++ b/talpid-core/Cargo.toml @@ -13,6 +13,7 @@ cfg-if = "0.1" duct = "0.13" err-derive = "0.2.1" futures01 = { package = "futures", version = "0.1" } +futures = { package = "futures", version = "0.3", features = [ "compat" ]} hex = "0.4" ipnetwork = "0.15" jsonrpc-core = { git = "https://github.com/mullvad/jsonrpc", branch = "mullvad-fork" } @@ -50,10 +51,13 @@ dbus = "0.6" failure = "0.1" notify = "4.0" resolv-conf = "0.6.1" -rtnetlink = { git = "https://github.com/mullvad/netlink", rev = "f768adfcc8c6b064ef7ae3c792c4c21d0d96d0b5" } -netlink-proto = { git = "https://github.com/mullvad/netlink", rev = "f768adfcc8c6b064ef7ae3c792c4c21d0d96d0b5" } -netlink-packet = { git = "https://github.com/mullvad/netlink", rev = "f768adfcc8c6b064ef7ae3c792c4c21d0d96d0b5" } -netlink-sys = { git = "https://github.com/mullvad/netlink", rev = "f768adfcc8c6b064ef7ae3c792c4c21d0d96d0b5" } +async-stream = "0.2" +rtnetlink = "0.2" +netlink-packet-route = "0.2" +netlink-proto = "0.2" +netlink-sys = "0.2" +futures = { package = "futures", version = "0.3" } +tokio02 = { package = "tokio", version = "0.2", features = [ "rt-core", "rt-threaded"] } nftnl = { version = "0.3", features = ["nftnl-1-1-0"] } mnl = { version = "0.2.0", features = ["mnl-1-0-4"] } which = { version = "3.1", default-features = false } diff --git a/talpid-core/src/routing/linux.rs b/talpid-core/src/routing/linux.rs new file mode 100644 index 0000000000..7d21fcf647 --- /dev/null +++ b/talpid-core/src/routing/linux.rs @@ -0,0 +1,666 @@ +use crate::routing::{NetNode, Node, Route}; + +use ipnetwork::IpNetwork; +use std::{ + collections::{BTreeMap, HashMap, HashSet}, + io, + net::IpAddr, +}; + +use futures01::sync::oneshot as old_oneshot; + +use futures::{ + channel::mpsc::UnboundedReceiver, compat::Future01CompatExt, future::FutureExt, StreamExt, + TryStreamExt, +}; + + +use netlink_packet_route::{ + link::{nlas::Nla as LinkNla, LinkMessage}, + route::{nlas::Nla as RouteNla, RouteHeader, RouteMessage}, + rtnl::{ + constants::{RTN_UNICAST, RTPROT_STATIC, RT_SCOPE_UNIVERSE, RT_TABLE_MAIN}, + RouteFlags, + }, + NetlinkMessage, NetlinkPayload, RtnlMessage, +}; +use netlink_sys::SocketAddr; +use rtnetlink::{ + constants::{RTMGRP_IPV4_ROUTE, RTMGRP_IPV6_ROUTE, RTMGRP_LINK, RTMGRP_NOTIFY}, + Handle, IpVersion, +}; + +use libc::{AF_INET, AF_INET6}; + + +pub type Result<T> = std::result::Result<T, Error>; + +/// Errors that can happen in the Linux routing integration +#[derive(err_derive::Error, Debug)] +#[error(no_from)] +pub enum Error { + /// Failed to add route. + #[error(display = "Failed to add route")] + FailedToAddRoute(#[error(source)] io::Error), + + /// Failed to remove route. + #[error(display = "Failed to remove route")] + FailedToRemoveRoute(#[error(source)] io::Error), + + /// Error while running "ip route". + #[error(display = "Error while running \"ip route\"")] + FailedToRunIp(#[error(source)] io::Error), + + /// Invocation of `ip route` ended with a non-zero exit code + #[error(display = "ip returend a non-zero exit code")] + ErrorIpFailed, + + /// Received unexpected output from `ip route` + #[error(display = "Received unexpected output from \"ip\"")] + UnexpectedOutput, + + /// No default route exists + #[error(display = "No default route in \"ip route\" output")] + NoDefaultRoute, + + /// Route table change stream failed. + #[error(display = "Route change listener failed")] + NetlinkConnectionError(#[error(source)] failure::Compat<rtnetlink::Error>), + + #[error(display = "Failed to open a netlink connection")] + ConnectError(#[error(source)] io::Error), + + #[error(display = "Failed to bind netlink socket")] + BindError(#[error(source)] io::Error), + + #[error(display = "Netlink error")] + NetlinkError(#[error(source)] failure::Compat<rtnetlink::Error>), + + #[error(display = "Route without a valid node")] + InvalidRoute, + + #[error(display = "Invalid length of byte buffer for IP address")] + InvalidIpBytes, + + #[error(display = "Invalid network prefix")] + InvalidNetworkPrefix(#[error(source)] ipnetwork::IpNetworkError), + + #[error(display = "Failed to initialize event loop")] + EventLoopError(#[error(source)] io::Error), + + #[error(display = "Unknown device index - {}", _0)] + UnknownDeviceIndex(u32), +} + +pub struct RouteManagerImpl { + shutdown_rx: old_oneshot::Receiver<old_oneshot::Sender<()>>, + manager: RouteManagerImplInner, + runtime: tokio02::runtime::Runtime, +} + +impl RouteManagerImpl { + /// Creates a new RouteManagerImplInner. + pub fn new( + required_routes: HashMap<IpNetwork, NetNode>, + shutdown_rx: old_oneshot::Receiver<old_oneshot::Sender<()>>, + ) -> Result<Self> { + let mut runtime = tokio02::runtime::Builder::new() + .basic_scheduler() + .core_threads(1) + .enable_all() + .thread_name("mullvad-route-manager-event-loop") + .build() + .map_err(Error::EventLoopError)?; + + let manager = runtime.block_on(RouteManagerImplInner::new(required_routes))?; + + Ok(Self { + shutdown_rx, + runtime, + manager, + }) + } + + pub fn wait(self) -> Result<()> { + let Self { + shutdown_rx, + mut runtime, + manager, + } = self; + runtime.block_on(manager.into_future(shutdown_rx)) + } +} + +pub struct RouteManagerImplInner { + handle: Handle, + messages: UnboundedReceiver<(NetlinkMessage<RtnlMessage>, SocketAddr)>, + iface_map: BTreeMap<u32, String>, + + // currently added routes + added_routes: HashSet<Route>, + // default route tracking + // destinations that should be routed through the default route + required_default_routes: HashSet<IpNetwork>, + default_routes: HashSet<Route>, + best_default_node_v4: Option<Node>, + best_default_node_v6: Option<Node>, +} + +impl RouteManagerImplInner { + pub async fn new(required_routes: HashMap<IpNetwork, NetNode>) -> Result<Self> { + let (mut connection, handle, messages) = + rtnetlink::new_connection().map_err(Error::ConnectError)?; + + let mgroup_flags = RTMGRP_IPV4_ROUTE | RTMGRP_IPV6_ROUTE | RTMGRP_LINK | RTMGRP_NOTIFY; + let addr = SocketAddr::new(0, mgroup_flags); + connection + .socket_mut() + .bind(&addr) + .map_err(Error::BindError)?; + + tokio02::spawn(connection); + + let iface_map = Self::initialize_link_map(&handle).await?; + + + let mut required_normal_routes = HashSet::new(); + let mut required_default_routes = HashSet::new(); + + for (destination, node) in required_routes { + match node { + NetNode::RealNode(node) => { + required_normal_routes.insert(Route::new(node, destination)); + } + NetNode::DefaultNode => { + required_default_routes.insert(destination); + } + } + } + + + let mut monitor = Self { + iface_map, + handle, + messages, + + required_default_routes, + added_routes: HashSet::new(), + + default_routes: HashSet::new(), + best_default_node_v4: None, + best_default_node_v6: None, + }; + + monitor.default_routes = monitor.get_default_routes().await?; + monitor.best_default_node_v4 = + Self::pick_best_default_node(&monitor.default_routes, IpVersion::V4); + monitor.best_default_node_v6 = + Self::pick_best_default_node(&monitor.default_routes, IpVersion::V6); + + + for normal_route in required_normal_routes.into_iter() { + monitor.add_route(normal_route).await?; + } + + for prefix in monitor.required_default_routes.clone().into_iter() { + if let (false, _, Some(default_node)) | (true, Some(default_node), _) = ( + prefix.is_ipv4(), + &monitor.best_default_node_v4, + &monitor.best_default_node_v6, + ) { + // best to pick a single node identifier rather than device + ip + let route = Route::new(default_node.clone(), prefix); + monitor.add_route(route).await?; + } + } + Ok(monitor) + } + + async fn get_default_routes(&self) -> Result<HashSet<Route>> { + let mut routes = self.get_default_routes_inner(IpVersion::V4).await?; + routes.extend(self.get_default_routes_inner(IpVersion::V6).await?); + Ok(routes) + } + + async fn get_default_routes_inner(&self, version: IpVersion) -> Result<HashSet<Route>> { + let mut routes = HashSet::new(); + let mut route_request = self.handle.route().get(version).execute(); + if let Some(route) = route_request + .try_next() + .await + .map_err(failure::Fail::compat) + .map_err(Error::NetlinkError)? + { + if route.header.destination_prefix_length == 0 { + if let Some(default_route) = self.parse_route_message(route)? { + routes.insert(default_route); + } + } + }; + Ok(routes) + } + + async fn initialize_link_map(handle: &rtnetlink::Handle) -> Result<BTreeMap<u32, String>> { + let mut link_map = BTreeMap::new(); + let mut link_request = handle.link().get().execute(); + while let Some(link) = link_request + .try_next() + .await + .map_err(failure::Fail::compat) + .map_err(Error::NetlinkError)? + { + if let Some((idx, link_name)) = Self::map_iface_name_to_idx(link) { + link_map.insert(idx, link_name); + } + } + + Ok(link_map) + } + + fn find_iface_idx(&self, iface_name: &str) -> Option<u32> { + self.iface_map + .iter() + .find(|(_idx, name)| name.as_str() == iface_name) + .map(|(idx, _name)| *idx) + } + + + async fn process_new_route(&mut self, route: Route) -> Result<()> { + if route.prefix.prefix() == 0 { + self.default_routes.insert(route); + self.update_default_routes().await?; + } + Ok(()) + } + + async fn process_deleted_route(&mut self, route: Route) -> Result<()> { + if route.prefix.prefix() == 0 { + self.default_routes.remove(&route); + self.update_default_routes().await?; + } + if self.added_routes.contains(&route) { + self.added_routes.remove(&route); + } + Ok(()) + } + + async fn update_default_routes(&mut self) -> Result<()> { + let new_best_v4 = Self::pick_best_default_node(&self.default_routes, IpVersion::V4); + if self.best_default_node_v4 != new_best_v4 && new_best_v4.is_some() { + let new_node = new_best_v4.unwrap(); + let old_node = self.best_default_node_v4.take(); + let v4_destinations: Vec<_> = self + .required_default_routes + .iter() + .filter(|ip| ip.is_ipv4()) + .cloned() + .collect(); + for destination in v4_destinations { + let new_route = Route::new(new_node.clone(), destination); + if let Some(old_node) = &old_node { + let old_route = Route::new(old_node.clone(), destination); + if let Err(e) = self.delete_route(&old_route).await { + log::error!("Failed to remove old route {} - {}", &old_route, e); + } + } + if let Err(e) = self.add_route(new_route).await { + log::error!("Failed to add new route {} - {}", &new_node, e); + } + } + self.best_default_node_v4 = Some(new_node); + } + + let new_best_v6 = Self::pick_best_default_node(&self.default_routes, IpVersion::V6); + if self.best_default_node_v6 != new_best_v6 && new_best_v6.is_some() { + let new_node = new_best_v6.unwrap(); + let old_node = self.best_default_node_v6.take(); + let v6_destinations: Vec<_> = self + .required_default_routes + .iter() + .filter(|ip| !ip.is_ipv4()) + .cloned() + .collect(); + + for destination in v6_destinations { + let new_route = Route::new(new_node.clone(), destination); + if let Some(old_node) = &old_node { + let old_route = Route::new(old_node.clone(), destination); + + if let Err(e) = self.delete_route(&old_route).await { + log::error!("Failed to remove old route {} - {}", &old_route, e); + } + } + if let Err(e) = self.add_route(new_route).await { + log::error!("Failed to add new route {} - {}", &new_node, e); + } + } + self.best_default_node_v6 = Some(new_node); + } + + Ok(()) + } + + fn pick_best_default_node(routes: &HashSet<Route>, version: IpVersion) -> Option<Node> { + // Pick the route with the lowest metric - thus the most favourable route. + routes + .iter() + .filter(|route| route.prefix.is_ipv4() == (version == IpVersion::V4)) + .fold( + None, + |best_route: Option<Route>, next_route| match best_route { + Some(current_best) => { + if current_best.metric.unwrap_or(0) > next_route.metric.unwrap_or(0) { + Some(next_route.clone()) + } else { + Some(current_best) + } + } + None => Some(next_route.clone()), + }, + ) + .map(|route| route.node) + } + + async fn cleanup_routes(&mut self) { + for route in self.added_routes.drain().collect::<Vec<_>>().iter() { + if let Err(e) = self.delete_route(&route).await { + if let Error::NetlinkError(err) = &e { + if let rtnetlink::ErrorKind::NetlinkError(msg) = err.get_ref().kind() { + // -3 means that the route doesn't exist anymore anyway + if msg.code == -3 { + continue; + } + } + } + log::error!("Failed to remove route - {} - {}", route, e); + } + } + } + + + pub async fn into_future( + mut self, + shutdown_rx: futures01::sync::oneshot::Receiver<futures01::sync::oneshot::Sender<()>>, + ) -> Result<()> { + futures::select! { + shutdown_signal = shutdown_rx.compat().fuse() => { + log::trace!("Shutting down route manager"); + self.cleanup_routes().await; + log::trace!("Route manager done"); + if let Ok(shutdown_signal) = shutdown_signal { + let _ = shutdown_signal.send(()); + } + return Ok(()); + }, + (route_change, socket) = self.messages.select_next_some().fuse() => { + self.process_netlink_message(route_change).await?; + } + }; + Ok(()) + } + + async fn process_netlink_message(&mut self, msg: NetlinkMessage<RtnlMessage>) -> Result<()> { + match msg.payload { + NetlinkPayload::InnerMessage(RtnlMessage::NewLink(new_link)) => { + if let Some((idx, name)) = Self::map_iface_name_to_idx(new_link) { + self.iface_map.insert(idx, name); + } + } + NetlinkPayload::InnerMessage(RtnlMessage::DelLink(old_link)) => { + if let Some((idx, _)) = Self::map_iface_name_to_idx(old_link) { + self.iface_map.remove(&idx); + } + } + + NetlinkPayload::InnerMessage(RtnlMessage::NewRoute(new_route)) => { + if let Some(new_route) = self.parse_route_message(new_route)? { + self.process_new_route(new_route).await?; + } + } + NetlinkPayload::InnerMessage(RtnlMessage::DelRoute(old_route)) => { + if let Some(deletion) = self.parse_route_message(old_route)? { + self.process_deleted_route(deletion).await?; + } + } + _ => (), + }; + Ok(()) + } + + // Tries to coax a Route out of a RouteMessage, but only if it's a route from the main routing + // table + // TODO: Change to account for different routing tables. + fn parse_route_message(&self, msg: RouteMessage) -> Result<Option<Route>> { + if msg.header.table != RT_TABLE_MAIN { + return Ok(None); + } + + + let mut prefix = None; + let mut node_addr = None; + let mut device = None; + let mut metric = None; + let mut gateway = None; + + let destination_length = msg.header.destination_prefix_length; + let af_spec = msg.header.address_family; + + for nla in msg.nlas.iter() { + match nla { + RouteNla::Oif(device_idx) => { + match self.iface_map.get(&device_idx) { + Some(device_name) => device = Some(device_name.to_string()), + None => { + return Err(Error::UnknownDeviceIndex(*device_idx)); + } + }; + } + + RouteNla::Via(addr) => { + node_addr = Self::parse_ip(&addr).map(Some)?; + } + + RouteNla::Destination(addr) => { + prefix = Self::parse_ip(&addr) + .and_then(|ip| { + ipnetwork::IpNetwork::new(ip, destination_length) + .map_err(Error::InvalidNetworkPrefix) + }) + .map(Some)?; + } + + // gateway NLAs indicate that this is actually a default route + RouteNla::Gateway(gateway_ip) => { + gateway = Self::parse_ip(&gateway_ip).map(Some)?; + } + + RouteNla::Priority(priority) => { + metric = Some(*priority); + } + _ => continue, + } + } + + // when a gateway is specified but prefix is none, then this is a default route + if prefix.is_none() && gateway.is_some() { + prefix = match af_spec as i32 { + AF_INET => Some("0.0.0.0/0".parse().expect("failed to parse ipnetwork")), + AF_INET6 => Some("::/0".parse().expect("failed to parse ipnetwork")), + _ => None, + }; + } + + if device.is_none() && node_addr.is_none() || prefix.is_none() { + return Err(Error::InvalidRoute); + } + + + let node = Node { + ip: node_addr.or(gateway), + device, + }; + + Ok(Some(Route { + node, + prefix: prefix.unwrap(), + metric, + })) + } + + fn map_iface_name_to_idx(msg: LinkMessage) -> Option<(u32, String)> { + let index = msg.header.index; + for nla in msg.nlas { + if let LinkNla::IfName(name) = nla { + return Some((index, name)); + } + } + None + } + + fn parse_ip(bytes: &[u8]) -> Result<IpAddr> { + if bytes.len() == 4 { + let mut ipv4_bytes = [0u8; 4]; + ipv4_bytes.copy_from_slice(bytes); + Ok(IpAddr::from(ipv4_bytes)) + } else if bytes.len() == 16 { + let mut ipv6_bytes = [0u8; 16]; + ipv6_bytes.copy_from_slice(bytes); + Ok(IpAddr::from(ipv6_bytes)) + } else { + log::error!("Expected either 4 or 16 bytes, got {} bytes", bytes.len()); + Err(Error::InvalidIpBytes) + } + } + + async fn delete_route(&self, route: &Route) -> Result<()> { + let mut route_message = RouteMessage { + header: RouteHeader { + address_family: if route.prefix.is_ipv4() { + AF_INET as u8 + } else { + AF_INET6 as u8 + }, + source_prefix_length: 0, + destination_prefix_length: route.prefix.prefix(), + tos: 0u8, + table: RT_TABLE_MAIN, + protocol: RTPROT_STATIC, + scope: RT_SCOPE_UNIVERSE, + kind: RTN_UNICAST, + flags: RouteFlags::empty(), + }, + nlas: vec![RouteNla::Destination(ip_to_bytes(route.prefix.ip()))], + }; + if let Some(interface_name) = route.node.get_device() { + if let Some(iface_idx) = self.find_iface_idx(interface_name) { + route_message.nlas.push(RouteNla::Oif(iface_idx)); + } + } + + if let Some(gateway) = route.node.get_address() { + let gateway_nla = if route.node.get_device().is_some() { + RouteNla::Gateway(ip_to_bytes(gateway)) + } else { + RouteNla::Via(ip_to_bytes(gateway)) + }; + route_message.nlas.push(gateway_nla); + } + + + self.handle + .route() + .del(route_message) + .execute() + .await + .map_err(failure::Fail::compat) + .map_err(Error::NetlinkError) + } + + async fn add_route(&mut self, route: Route) -> Result<()> { + let add_message = match &route.prefix { + IpNetwork::V4(v4_prefix) => { + let mut add_message = self + .handle + .route() + .add_v4() + .destination_prefix(v4_prefix.ip(), v4_prefix.prefix()); + + if v4_prefix.size() > 1 { + add_message = add_message.scope(RT_SCOPE_LINK) + } + + if let Some(IpAddr::V4(node_address)) = route.node.get_address() { + add_message = add_message.gateway(node_address); + } + + if let Some(interface_name) = route.node.get_device() { + if let Some(iface_idx) = self.find_iface_idx(interface_name) { + add_message = add_message.output_interface(iface_idx); + } + } + + add_message.message_mut().clone() + } + + IpNetwork::V6(v6_prefix) => { + let mut add_message = self + .handle + .route() + .add_v6() + .destination_prefix(v6_prefix.ip(), v6_prefix.prefix()); + + if v6_prefix.size() > 1 { + add_message = add_message.scope(RT_SCOPE_LINK) + } + + if let Some(IpAddr::V6(node_address)) = route.node.get_address() { + add_message = add_message.gateway(node_address); + } + + if let Some(interface_name) = route.node.get_device() { + if let Some(iface_idx) = self.find_iface_idx(interface_name) { + add_message = add_message.output_interface(iface_idx); + } + } + + add_message.message_mut().clone() + } + }; + + // Need to modify the request in place to set the correct flags to be able to replace any + // existing routes - self.handle.route().add_v4().execute() sets the NLM_F_EXCL flag which + // will make the request fail if a route with the same destination already exists. + use netlink_packet_route::constants::*; + let mut req = NetlinkMessage::from(RtnlMessage::NewRoute(add_message)); + req.header.flags = NLM_F_REQUEST | NLM_F_ACK | NLM_F_CREATE | NLM_F_REPLACE; + + let mut response = self + .handle + .request(req) + .map_err(failure::Fail::compat) + .map_err(Error::NetlinkError)?; + + while let Some(message) = response.next().await { + if let NetlinkPayload::Error(err) = message.payload { + let compat_err = + failure::Fail::compat(rtnetlink::ErrorKind::NetlinkError(err).into()); + return Err(Error::NetlinkError(compat_err)); + } + } + self.added_routes.insert(route.clone()); + Ok(()) + } +} + +impl Drop for RouteManagerImplInner { + fn drop(&mut self) { + futures::executor::block_on(self.cleanup_routes()) + } +} + +fn ip_to_bytes(addr: IpAddr) -> Vec<u8> { + match addr { + IpAddr::V4(addr) => addr.octets().to_vec(), + IpAddr::V6(addr) => addr.octets().to_vec(), + } +} diff --git a/talpid-core/src/routing/linux/change_listener.rs b/talpid-core/src/routing/linux/change_listener.rs deleted file mode 100644 index 82608a59a7..0000000000 --- a/talpid-core/src/routing/linux/change_listener.rs +++ /dev/null @@ -1,241 +0,0 @@ -use crate::routing::{Node, Route}; - -use super::RouteChange; -use futures::{future::Either, sync::mpsc, Async, Future, Stream}; -use std::{collections::BTreeMap, io, net::IpAddr}; - -use netlink_packet::{ - LinkMessage, LinkNla, NetlinkMessage, NetlinkPayload, RouteMessage, RouteNla, RtnlMessage, -}; -use netlink_sys::SocketAddr; -use rtnetlink::constants::{ - AF_INET, AF_INET6, RTMGRP_IPV4_ROUTE, RTMGRP_IPV6_ROUTE, RTMGRP_LINK, RTMGRP_NOTIFY, -}; - -#[derive(err_derive::Error, Debug)] -#[error(no_from)] -pub enum Error { - #[error(display = "Netlink connection failed")] - NetlinkError(#[error(source)] failure::Compat<rtnetlink::Error>), - #[error(display = "Netlink protocol error")] - NetlinkProtocolError(#[error(source)] failure::Compat<netlink_proto::Error>), - #[error(display = "Failed to open a netlink connection")] - ConnectError(#[error(source)] io::Error), - #[error(display = "Route without a valid node")] - InvalidRoute, - #[error(display = "Invalid length of byte buffer for IP address")] - InvalidIpBytes, - #[error(display = "Invalid network prefix")] - InvalidNetworkPrefix(#[error(source)] ipnetwork::IpNetworkError), - #[error(display = "Unknown device index - {}", _0)] - UnknownDeviceIndex(u32), - #[error(display = "Failed to bind netlink socket")] - BindError(#[error(source)] io::Error), - #[error(display = "Netlink connection stopped sending messages")] - NetlinkConnectionClosed, -} - -type Result<T> = std::result::Result<T, Error>; - -pub(super) struct RouteChangeListener { - connection: rtnetlink::Connection, - messages: mpsc::UnboundedReceiver<NetlinkMessage>, - iface_map: BTreeMap<u32, String>, -} - -impl RouteChangeListener { - pub fn new() -> Result<Self> { - let (mut connection, handle, messages) = - rtnetlink::new_connection_with_messages().map_err(Error::ConnectError)?; - - let mgroup_flags = RTMGRP_IPV4_ROUTE | RTMGRP_IPV6_ROUTE | RTMGRP_LINK | RTMGRP_NOTIFY; - let addr = SocketAddr::new(0, mgroup_flags); - connection - .socket_mut() - .bind(&addr) - .map_err(Error::BindError)?; - - let (iface_map, connection) = Self::initialize_link_map(connection, handle)?; - - Ok(Self { - connection, - messages, - iface_map, - }) - } - - fn map_netlink_to_route_change(&mut self, msg: NetlinkMessage) -> Result<Option<RouteChange>> { - match msg.payload { - NetlinkPayload::Rtnl(RtnlMessage::NewLink(new_link)) => { - if let Some((idx, name)) = Self::map_iface_name_to_idx(new_link) { - self.iface_map.insert(idx, name); - } - Ok(None) - } - NetlinkPayload::Rtnl(RtnlMessage::DelLink(old_link)) => { - if let Some((idx, _)) = Self::map_iface_name_to_idx(old_link) { - self.iface_map.remove(&idx); - } - Ok(None) - } - - NetlinkPayload::Rtnl(RtnlMessage::NewRoute(new_route)) => { - self.get_route(new_route).map(RouteChange::Add).map(Some) - } - NetlinkPayload::Rtnl(RtnlMessage::DelRoute(old_route)) => { - self.get_route(old_route).map(RouteChange::Remove).map(Some) - } - _ => Ok(None), - } - } - - // Tries to coax a Route out of a RouteMessage - fn get_route(&self, msg: RouteMessage) -> Result<Route> { - let mut prefix = None; - let mut node_addr = None; - let mut device = None; - let mut metric = None; - let mut gateway = None; - - let destination_length = msg.header.destination_length; - let af_spec = msg.header.address_family; - - for nla in msg.nlas.iter() { - match nla { - RouteNla::Oif(device_idx) => { - match self.iface_map.get(&device_idx) { - Some(device_name) => device = Some(device_name.to_string()), - None => { - return Err(Error::UnknownDeviceIndex(*device_idx)); - } - }; - } - - RouteNla::Via(addr) => { - node_addr = Self::parse_ip(&addr).map(Some)?; - } - - RouteNla::Destination(addr) => { - prefix = Self::parse_ip(&addr) - .and_then(|ip| { - ipnetwork::IpNetwork::new(ip, destination_length) - .map_err(Error::InvalidNetworkPrefix) - }) - .map(Some)?; - } - - // gateway NLAs indicate that this is actually a default route - RouteNla::Gateway(gateway_ip) => { - gateway = Self::parse_ip(&gateway_ip).map(Some)?; - } - - RouteNla::Priority(priority) => { - metric = Some(*priority); - } - _ => continue, - } - } - - // when a gateway is specified but prefix is none, then this is a default route - if prefix.is_none() && gateway.is_some() { - prefix = match af_spec as u16 { - AF_INET => Some("0.0.0.0/0".parse().expect("failed to parse ipnetwork")), - AF_INET6 => Some("::/0".parse().expect("failed to parse ipnetwork")), - _ => None, - }; - } - - if device.is_none() && node_addr.is_none() || prefix.is_none() { - return Err(Error::InvalidRoute); - } - - - let node = Node { - ip: node_addr, - device, - }; - - Ok(Route { - node, - prefix: prefix.unwrap(), - metric, - }) - } - - fn map_iface_name_to_idx(msg: LinkMessage) -> Option<(u32, String)> { - let index = msg.header.index; - for nla in msg.nlas { - match nla { - LinkNla::IfName(name) => return Some((index, name)), - _ => continue, - } - } - None - } - - fn parse_ip(bytes: &[u8]) -> Result<IpAddr> { - if bytes.len() == 4 { - let mut ipv4_bytes = [0u8; 4]; - ipv4_bytes.copy_from_slice(bytes); - Ok(IpAddr::from(ipv4_bytes)) - } else if bytes.len() == 16 { - let mut ipv6_bytes = [0u8; 16]; - ipv6_bytes.copy_from_slice(bytes); - Ok(IpAddr::from(ipv6_bytes)) - } else { - log::error!("Expected either 4 or 16 bytes, got {} bytes", bytes.len()); - Err(Error::InvalidIpBytes) - } - } - - pub fn initialize_link_map( - connection: rtnetlink::Connection, - handle: rtnetlink::Handle, - ) -> Result<(BTreeMap<u32, String>, rtnetlink::Connection)> { - let request = handle - .link() - .get() - .execute() - .filter_map(Self::map_iface_name_to_idx) - .collect(); - - match connection.select2(request).wait() { - Ok(Either::A(_)) => Err(Error::NetlinkConnectionClosed), - Err(Either::A((error, _))) => { - Err(Error::NetlinkProtocolError(failure::Fail::compat(error))) - } - Ok(Either::B((links, connection))) => Ok((links.into_iter().collect(), connection)), - Err(Either::B((error, _))) => Err(Error::NetlinkError(failure::Fail::compat(error))), - } - } -} - -impl Stream for RouteChangeListener { - type Item = RouteChange; - type Error = Error; - - fn poll(&mut self) -> Result<Async<Option<RouteChange>>> { - self.connection - .poll() - .map_err(failure::Fail::compat) - .map_err(Error::NetlinkProtocolError)?; - - loop { - match futures::try_ready!(self - .messages - .poll() - .map_err(|_| Error::NetlinkConnectionClosed)) - { - Some(message) => { - if let Some(route_change) = self.map_netlink_to_route_change(message)? { - return Ok(Async::Ready(Some(route_change))); - }; - continue; - } - None => { - return Err(Error::NetlinkConnectionClosed); - } - } - } - } -} diff --git a/talpid-core/src/routing/linux/mod.rs b/talpid-core/src/routing/linux/mod.rs deleted file mode 100644 index cdc57c87e1..0000000000 --- a/talpid-core/src/routing/linux/mod.rs +++ /dev/null @@ -1,440 +0,0 @@ -use crate::routing::{NetNode, Node, Route}; - -use ipnetwork::IpNetwork; -use std::{ - collections::{HashMap, HashSet}, - io, - process::{Command, Stdio}, -}; - -mod change_listener; -use change_listener::{Error as RouteChangeListenerError, RouteChangeListener}; - -use futures::{sync::oneshot, Async, Future, Stream}; - -pub type Result<T> = std::result::Result<T, Error>; - -/// Errors that can happen in the Linux routing integration -#[derive(err_derive::Error, Debug)] -#[error(no_from)] -pub enum Error { - /// Failed to add route. - #[error(display = "Failed to add route")] - FailedToAddRoute(#[error(source)] io::Error), - - /// Failed to remove route. - #[error(display = "Failed to remove route")] - FailedToRemoveRoute(#[error(source)] io::Error), - - /// Error while running "ip route". - #[error(display = "Error while running \"ip route\"")] - FailedToRunIp(#[error(source)] io::Error), - - /// Invocation of `ip route` ended with a non-zero exit code - #[error(display = "ip returend a non-zero exit code")] - ErrorIpFailed, - - /// Received unexpected output from `ip route` - #[error(display = "Received unexpected output from \"ip\"")] - UnexpectedOutput, - - /// No default route exists - #[error(display = "No default route in \"ip route\" output")] - NoDefaultRoute, - - /// Route table change stream failed. - #[error(display = "Route change listener failed")] - ChangeListenerError(#[error(source)] RouteChangeListenerError), - - /// Route table change stream failed. - #[error(display = "Route change listener closed unexpectedly")] - ChangeListenerClosed, -} - -pub struct RouteManagerImpl { - changes: RouteChangeListener, - - // currently added routes - added_routes: HashSet<Route>, - // default route tracking - // destinations that should be routed through the default route - required_default_routes: HashSet<IpNetwork>, - default_routes: HashSet<Route>, - best_default_node_v4: Option<Node>, - best_default_node_v6: Option<Node>, - - // if the stop channel is set, the future should wind down - remove added routes and send a - // signal. - shutdown_finished_tx: Option<oneshot::Sender<()>>, - shutdown_rx: oneshot::Receiver<oneshot::Sender<()>>, - should_shut_down: bool, -} - -impl RouteManagerImpl { - /// Creates a new RouteManager. - pub fn new( - required_routes: HashMap<IpNetwork, NetNode>, - shutdown_rx: oneshot::Receiver<oneshot::Sender<()>>, - ) -> Result<Self> { - let changes = RouteChangeListener::new().map_err(Error::ChangeListenerError)?; - - let mut required_normal_routes = HashSet::new(); - let mut required_default_routes = HashSet::new(); - - for (destination, node) in required_routes { - match node { - NetNode::RealNode(node) => { - required_normal_routes.insert(Route::new(node, destination)); - } - NetNode::DefaultNode => { - required_default_routes.insert(destination); - } - } - } - - let default_routes = Self::get_default_routes()?; - - let best_default_node_v4 = Self::pick_best_default_node(&default_routes, true); - let best_default_node_v6 = Self::pick_best_default_node(&default_routes, false); - - let mut monitor = Self { - changes, - - required_default_routes, - added_routes: HashSet::new(), - - default_routes, - best_default_node_v4, - best_default_node_v6, - - shutdown_finished_tx: None, - shutdown_rx, - should_shut_down: false, - }; - for normal_route in required_normal_routes.iter() { - monitor.add_route(&normal_route)?; - } - - for prefix in monitor.required_default_routes.clone().into_iter() { - if let (false, _, Some(default_node)) | (true, Some(default_node), _) = ( - prefix.is_ipv4(), - &monitor.best_default_node_v4, - &monitor.best_default_node_v6, - ) { - // best to pick a single node identifier rather than device + ip - let route = Route::new(default_node.clone(), prefix); - monitor.add_route(&route)?; - } - } - Ok(monitor) - } - - fn process_route_table_change(&mut self) -> Result<()> { - loop { - let change = self.changes.poll().map_err(Error::ChangeListenerError)?; - match change { - Async::NotReady => return Ok(()), - Async::Ready(Some(RouteChange::Add(route))) => self.process_new_route(route), - Async::Ready(Some(RouteChange::Remove(route))) => self.process_deleted_route(route), - Async::Ready(None) => return Err(Error::ChangeListenerClosed), - } - } - } - - fn process_new_route(&mut self, route: Route) { - if route.prefix.prefix() == 0 { - self.default_routes.insert(route); - self.update_default_routes(); - } - } - - fn process_deleted_route(&mut self, route: Route) { - if route.prefix.prefix() == 0 { - self.update_default_routes(); - } - } - - fn update_default_routes(&mut self) { - let new_best_v4 = Self::pick_best_default_node(&self.default_routes, true); - if self.best_default_node_v4 != new_best_v4 && new_best_v4.is_some() { - let new_node = new_best_v4.unwrap(); - let old_node = self.best_default_node_v4.take(); - let v4_destinations: Vec<_> = self - .required_default_routes - .iter() - .filter(|ip| ip.is_ipv4()) - .cloned() - .collect(); - for destination in v4_destinations { - let new_route = Route::new(new_node.clone(), destination); - if let Some(old_node) = &old_node { - let old_route = Route::new(old_node.clone(), destination); - if let Err(e) = self.delete_route(&old_route) { - log::error!("Failed to remove old route {} - {}", &old_route, e); - } - } - if let Err(e) = self.add_route(&new_route) { - log::error!("Failed to add new route {} - {}", &new_node, e); - } - } - self.best_default_node_v4 = Some(new_node); - } - - let new_best_v6 = Self::pick_best_default_node(&self.default_routes, false); - if self.best_default_node_v6 != new_best_v6 && new_best_v6.is_some() { - let new_node = new_best_v6.unwrap(); - let old_node = self.best_default_node_v6.take(); - let v6_destinations: Vec<_> = self - .required_default_routes - .iter() - .filter(|ip| !ip.is_ipv4()) - .cloned() - .collect(); - - for destination in v6_destinations { - let new_route = Route::new(new_node.clone(), destination); - if let Some(old_node) = &old_node { - let old_route = Route::new(old_node.clone(), destination); - - if let Err(e) = self.delete_route(&old_route) { - log::error!("Failed to remove old route {} - {}", &old_route, e); - } - } - if let Err(e) = self.add_route(&new_route) { - log::error!("Failed to add new route {} - {}", &new_node, e); - } - } - self.best_default_node_v6 = Some(new_node); - } - } - - fn pick_best_default_node(routes: &HashSet<Route>, v4: bool) -> Option<Node> { - // Pick the route with the lowest metric - thus the most favourable route. - routes - .iter() - .filter(|route| route.prefix.is_ipv4() == v4) - .fold( - None, - |best_route: Option<Route>, next_route| match best_route { - Some(current_best) => { - if current_best.metric.unwrap_or(0) > next_route.metric.unwrap_or(0) { - Some(next_route.clone()) - } else { - Some(current_best) - } - } - None => Some(next_route.clone()), - }, - ) - .map(|route| route.node) - } - - fn route_cmd(action: &str, route: &Route) -> Command { - let mut cmd = Command::new("ip"); - - cmd.arg(ip_vers(&route)) - .arg("route") - .arg(action) - .arg(route.prefix.to_string()); - - if let Some(addr) = route.node.get_address() { - cmd.arg("via").arg(addr.to_string()); - }; - if let Some(device) = route.node.get_device() { - cmd.arg("dev").arg(device); - }; - if let Some(metric) = route.metric { - cmd.arg("metric").arg(metric.to_string()); - }; - - cmd - } - - fn run_cmd(mut cmd: Command, err: impl Fn(io::Error) -> Error) -> Result<()> { - log::trace!("running cmd - {:?}", &cmd); - let status = cmd.status().map_err(|e| err(e))?; - match status.code() { - Some(0) => Ok(()), - Some(i) => Err(err(io::Error::new( - io::ErrorKind::Other, - format!("exit status {}", i), - ))), - None => Err(err(io::Error::new( - io::ErrorKind::Other, - "interrupted by signal", - ))), - } - } - - fn get_default_routes_inner(ip_version: IpVersion) -> Result<Vec<Route>> { - let mut cmd = Command::new("ip"); - cmd.arg(ip_version.to_route_arg()).arg("route").arg("show"); - - cmd.stdout(Stdio::piped()) - .output() - .map_err(Error::FailedToRunIp) - .and_then(move |output| { - let output_lines = String::from_utf8(output.stdout.clone()) - .map_err(|_| Error::UnexpectedOutput)?; - Ok(output_lines - .lines() - .filter_map(|line| { - if line.starts_with("default") { - parse_ip_route_show_line(line, ip_version) - } else { - None - } - }) - .collect()) - }) - } - - /// Adds routes to the system routing table. - fn add_route(&mut self, route: &Route) -> Result<()> { - let cmd = Self::route_cmd("replace", route); - Self::run_cmd(cmd, Error::FailedToAddRoute)?; - self.added_routes.insert(route.clone()); - Ok(()) - } - - /// Removes previously set routes. If routes were set for specific tables, the whole tables - /// will be removed. - fn delete_route(&mut self, route: &Route) -> Result<()> { - let cmd = Self::route_cmd("delete", route); - Self::run_cmd(cmd, Error::FailedToRemoveRoute)?; - self.added_routes.remove(route); - Ok(()) - } - - fn cleanup_routes(&mut self) { - for route in self.added_routes.drain().collect::<Vec<_>>().iter() { - if let Err(e) = self.delete_route(&route) { - log::error!("Failed to remove route - {} - {}", route, e); - } - } - } - - - /// Retrieves the gateway for the default route - fn get_default_routes() -> Result<HashSet<Route>> { - let v4_routes = Self::get_default_routes_inner(IpVersion::V4)?; - let v6_routes = Self::get_default_routes_inner(IpVersion::V6)?; - Ok(v4_routes.into_iter().chain(v6_routes.into_iter()).collect()) - } -} - -#[derive(Debug, Copy, Clone)] -enum IpVersion { - V4, - V6, -} - -impl IpVersion { - fn to_route_arg(self) -> &'static str { - match self { - IpVersion::V4 => "-4", - IpVersion::V6 => "-6", - } - } -} - -impl Future for RouteManagerImpl { - type Item = (); - type Error = Error; - fn poll(&mut self) -> Result<Async<()>> { - if !self.should_shut_down { - match self.shutdown_rx.poll() { - Ok(Async::NotReady) => (), - Ok(Async::Ready(tx)) => { - self.should_shut_down = true; - self.shutdown_finished_tx = Some(tx); - } - Err(_) => { - self.should_shut_down = true; - } - }; - self.process_route_table_change()?; - } - if self.should_shut_down { - self.cleanup_routes(); - if let Some(tx) = self.shutdown_finished_tx.take() { - if tx.send(()).is_err() { - log::error!("RouteManagerHandle already stopped"); - } - } - Ok(Async::Ready(())) - } else { - Ok(Async::NotReady) - } - } -} - -impl Drop for RouteManagerImpl { - fn drop(&mut self) { - self.cleanup_routes(); - } -} - -// intended to parse lines sucha as the following: -// default via 192.168.1.1 dev wlp61s0 proto dhcp metric 600 -fn parse_ip_route_show_line(line: &str, ip_version: IpVersion) -> Option<Route> { - let mut node_ip = None; - let mut device = None; - let mut metric = None; - - let mut tokens = line.split_whitespace(); - let prefix_str = tokens.next()?; - let prefix = match prefix_str { - "default" => match ip_version { - IpVersion::V4 => "0.0.0.0/0".parse().unwrap(), - IpVersion::V6 => "::/0".parse().unwrap(), - }, - prefix_str => prefix_str.parse().ok()?, - }; - - let tokens: Vec<&str> = tokens.collect(); - for pair in tokens.chunks(2) { - if pair.len() != 2 { - log::error!("unexpected output from ip"); - break; - } - let kind = pair[0]; - let value = pair[1]; - - match kind { - "via" => node_ip = value.parse().ok(), - "dev" => device = Some(value.to_string()), - "metric" => metric = value.parse().ok(), - _ => continue, - }; - } - - if node_ip.is_none() && device.is_none() { - None - } else { - let node = Node { - ip: node_ip, - device, - }; - - Some(Route { - node, - prefix, - metric, - }) - } -} - -fn ip_vers(route: &Route) -> &'static str { - if route.prefix.is_ipv4() { - "-4" - } else { - "-6" - } -} - -#[derive(Debug, PartialEq)] -enum RouteChange { - Add(Route), - Remove(Route), -} diff --git a/talpid-core/src/routing/mod.rs b/talpid-core/src/routing/mod.rs index e98bf1992f..2baf625f2f 100644 --- a/talpid-core/src/routing/mod.rs +++ b/talpid-core/src/routing/mod.rs @@ -30,10 +30,6 @@ impl Route { metric: None, } } - - fn is_ipv4(&self) -> bool { - self.prefix.is_ipv4() - } } impl fmt::Display for Route { diff --git a/talpid-core/src/routing/unix.rs b/talpid-core/src/routing/unix.rs index 6a379e1eca..dffd6c21b4 100644 --- a/talpid-core/src/routing/unix.rs +++ b/talpid-core/src/routing/unix.rs @@ -11,7 +11,7 @@ use std::{collections::HashMap, sync::mpsc::sync_channel}; mod imp; #[cfg(target_os = "linux")] -#[path = "linux/mod.rs"] +#[path = "linux.rs"] mod imp; #[cfg(target_os = "android")] |
