diff options
| author | Emīls Piņķis <emils@mullvad.net> | 2019-05-09 12:04:21 +0100 |
|---|---|---|
| committer | Emīls Piņķis <emils@mullvad.net> | 2019-05-09 12:04:21 +0100 |
| commit | abf84cc6adc4f975aa6514597bf7862d6489a6b4 (patch) | |
| tree | 3fd02a8c95708ef30b385171a0476609d5ad19df | |
| parent | 304695494a8e4a3f34fd9882db43b3305ca88503 (diff) | |
| parent | 6b941de39a182a9a5b3cfe3b4730f6078812ec05 (diff) | |
| download | mullvadvpn-abf84cc6adc4f975aa6514597bf7862d6489a6b4.tar.xz mullvadvpn-abf84cc6adc4f975aa6514597bf7862d6489a6b4.zip | |
Merge branch 'improve-routing'
| -rw-r--r-- | .dockerignore | 1 | ||||
| -rw-r--r-- | .travis.yml | 55 | ||||
| -rw-r--r-- | CHANGELOG.md | 2 | ||||
| -rw-r--r-- | Cargo.lock | 52 | ||||
| -rw-r--r-- | Dockerfile | 20 | ||||
| -rw-r--r-- | ci/rust-linux-script.sh | 15 | ||||
| m--------- | dist-assets/binaries | 0 | ||||
| -rw-r--r-- | mullvad-daemon/src/logging.rs | 1 | ||||
| -rw-r--r-- | talpid-core/Cargo.toml | 3 | ||||
| -rw-r--r-- | talpid-core/src/routing/android.rs | 35 | ||||
| -rw-r--r-- | talpid-core/src/routing/linux.rs | 141 | ||||
| -rw-r--r-- | talpid-core/src/routing/linux/change_listener.rs | 241 | ||||
| -rw-r--r-- | talpid-core/src/routing/linux/mod.rs | 541 | ||||
| -rw-r--r-- | talpid-core/src/routing/macos.rs | 475 | ||||
| -rw-r--r-- | talpid-core/src/routing/mod.rs | 208 | ||||
| -rw-r--r-- | talpid-core/src/routing/subprocess.rs | 46 | ||||
| -rw-r--r-- | talpid-core/src/tunnel/wireguard/mod.rs | 68 |
17 files changed, 1485 insertions, 419 deletions
diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000000..91224e5de8 --- /dev/null +++ b/.dockerignore @@ -0,0 +1 @@ +**/* diff --git a/.travis.yml b/.travis.yml index ed1c113976..c6ae998b4c 100644 --- a/.travis.yml +++ b/.travis.yml @@ -58,48 +58,43 @@ matrix: - language: rust rust: stable os: osx - before_script: &rust_before_script + before_script: - source env.sh - export RUSTFLAGS="--deny unused_imports --deny dead_code" - env - script: &rust_script + script: - cargo build --verbose - cargo test --verbose # Daemon - Linux - - language: rust - rust: nightly - os: linux - addons: &rust_linux_addons - apt: - sources: - - sourceline: "ppa:jonathonf/gcc-7.3" - key_url: "https://pgp.key-server.io/download/0xE03043828C3FF4BB" - packages: - - binutils - - libdbus-1-dev - before_script: *rust_before_script + - os: linux + name: Daemon - nigtly Rust + language: minimal + dist: xenial + services: docker + before_script: &rust_before_script + - docker run -d --name mvd-build -v $(pwd):/travis -w /travis mullvadvpn/mullvadvpn-app-build:latest tail -f /dev/null + - docker ps script: - - cargo build --verbose - - cargo test --verbose - # Install and run rustfmt on nightly only until rustfmt.toml settings are stabilized. - - rustup component add rustfmt-preview - - cargo fmt --version || true - - cargo fmt -- --check --unstable-features + - docker exec -t mvd-build bash ci/rust-linux-script.sh nightly - - language: rust - rust: beta - os: linux - addons: *rust_linux_addons + - os: linux + name: Daemon - beta Rust + language: minimal + dist: xenial + services: docker before_script: *rust_before_script - script: *rust_script + script: + - docker exec -t mvd-build bash ci/rust-linux-script.sh beta - - language: rust - rust: stable - os: linux - addons: *rust_linux_addons + - os: linux + name: Daemon - stable Rust + language: minimal + dist: xenial + services: docker before_script: *rust_before_script - script: *rust_script + script: + - docker exec -t mvd-build bash ci/rust-linux-script.sh stable notifications: diff --git a/CHANGELOG.md b/CHANGELOG.md index a1b5a83339..b4a7dacd9c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,6 +24,8 @@ Line wrap the file at 100 chars. Th ## [Unreleased] ### Added +- Add support for roaming between connections when using wireguard + #### Linux - Add standard window decorations to the application window. - Allow a subset of NDP (Router solicitation, router advertisement and redirects) in the firewall. diff --git a/Cargo.lock b/Cargo.lock index 99dcef5a24..65af2c717c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -22,6 +22,11 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] +name = "arc-swap" +version = "0.3.11" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] name = "argon2rs" version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1850,6 +1855,15 @@ version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] +name = "signal-hook" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "arc-swap 0.3.11 (registry+https://github.com/rust-lang/crates.io-index)", + "libc 0.2.51 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] name = "simple-signal" version = "1.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -2013,6 +2027,9 @@ dependencies = [ "talpid-types 0.1.0", "tempfile 3.0.7 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-core 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-executor 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-io 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-process 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)", "tun 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)", "uuid 0.7.4 (registry+https://github.com/rust-lang/crates.io-index)", "which 2.0.1 (registry+https://github.com/rust-lang/crates.io-index)", @@ -2243,6 +2260,21 @@ dependencies = [ ] [[package]] +name = "tokio-process" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)", + "libc 0.2.51 (registry+https://github.com/rust-lang/crates.io-index)", + "mio 0.6.16 (registry+https://github.com/rust-lang/crates.io-index)", + "mio-named-pipes 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-io 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-reactor 0.1.9 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-signal 0.2.7 (registry+https://github.com/rust-lang/crates.io-index)", + "winapi 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] name = "tokio-proto" version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -2286,6 +2318,22 @@ dependencies = [ ] [[package]] +name = "tokio-signal" +version = "0.2.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)", + "libc 0.2.51 (registry+https://github.com/rust-lang/crates.io-index)", + "mio 0.6.16 (registry+https://github.com/rust-lang/crates.io-index)", + "mio-uds 0.6.7 (registry+https://github.com/rust-lang/crates.io-index)", + "signal-hook 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-executor 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-io 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-reactor 0.1.9 (registry+https://github.com/rust-lang/crates.io-index)", + "winapi 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] name = "tokio-sync" version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -2611,6 +2659,7 @@ dependencies = [ "checksum aho-corasick 0.6.10 (registry+https://github.com/rust-lang/crates.io-index)" = "81ce3d38065e618af2d7b77e10c5ad9a069859b4be3c2250f674af3840d9c8a5" "checksum ansi_term 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ee49baf6cb617b853aa8d93bf420db2383fab46d314482ca2803b40d5fde979b" "checksum antidote 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "34fde25430d87a9388dadbe6e34d7f72a462c8b43ac8d309b42b0a8505d7e2a5" +"checksum arc-swap 0.3.11 (registry+https://github.com/rust-lang/crates.io-index)" = "bc4662175ead9cd84451d5c35070517777949a2ed84551764129cedb88384841" "checksum argon2rs 0.2.5 (registry+https://github.com/rust-lang/crates.io-index)" = "3f67b0b6a86dae6e67ff4ca2b6201396074996379fba2b92ff649126f37cb392" "checksum arrayvec 0.4.10 (registry+https://github.com/rust-lang/crates.io-index)" = "92c7fb76bc8826a8b33b4ee5bb07a247a81e76764ab4d55e8f73e3a4d8808c71" "checksum assert_matches 1.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "7deb0a829ca7bcfaf5da70b073a8d128619259a7be8216a355e23f00763059e5" @@ -2794,6 +2843,7 @@ dependencies = [ "checksum serde_json 1.0.39 (registry+https://github.com/rust-lang/crates.io-index)" = "5a23aa71d4a4d43fdbfaac00eff68ba8a06a51759a89ac3304323e800c4dd40d" "checksum shared_child 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "7e254d79655b3a1cb86b4079fc080c91664af57d0a81facb59fc828503cd4f48" "checksum shell-escape 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "170a13e64f2a51b77a45702ba77287f5c6829375b04a69cf2222acd17d0cfab9" +"checksum signal-hook 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "97a47ae722318beceb0294e6f3d601205a1e6abaa4437d9d33e3a212233e3021" "checksum simple-signal 1.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "53f7da44adcc42667d57483bd93f81295f27d66897804b757573b61b6f13288b" "checksum slab 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "17b4fcaed89ab08ef143da37bc52adbcc04d4a69014f4c1208d6b51f0c47bc23" "checksum slab 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "c111b5bd5695e56cffe5129854aa230b39c93a305372fdbb2668ca2394eea9f8" @@ -2826,9 +2876,11 @@ dependencies = [ "checksum tokio-io 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)" = "5090db468dad16e1a7a54c8c67280c5e4b544f3d3e018f0b913b400261f85926" "checksum tokio-named-pipes 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "9d282d483052288b2308ba5ee795f5673b159c9bdf63c385a05609da782a5eae" "checksum tokio-openssl 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "4646ae1fd623393de3d796ea53af75acd02938dd5579544fbd6d236d041978a6" +"checksum tokio-process 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "88e1281e412013f1ff5787def044a9577a0bed059f451e835f1643201f8b777d" "checksum tokio-proto 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "8fbb47ae81353c63c487030659494b295f6cb6576242f907f203473b191b0389" "checksum tokio-reactor 0.1.9 (registry+https://github.com/rust-lang/crates.io-index)" = "6af16bfac7e112bea8b0442542161bfc41cbfa4466b580bdda7d18cb88b911ce" "checksum tokio-service 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "24da22d077e0f15f55162bdbdc661228c1581892f52074fb242678d015b45162" +"checksum tokio-signal 0.2.7 (registry+https://github.com/rust-lang/crates.io-index)" = "dd6dc5276ea05ce379a16de90083ec80836440d5ef8a6a39545a3207373b8296" "checksum tokio-sync 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "fda385df506bf7546e70872767f71e81640f1f251bdf2fd8eb81a0eaec5fe022" "checksum tokio-tcp 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "1d14b10654be682ac43efee27401d792507e30fd8d26389e1da3b185de2e4119" "checksum tokio-threadpool 0.1.13 (registry+https://github.com/rust-lang/crates.io-index)" = "ec5759cf26cf9659555f36c431b515e3d05f66831741c85b4b5d5dfb9cf1323c" diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000000..44490598d1 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,20 @@ +# To build the image: +# docker build . -t mullvadvpn/mullvadvpn-app-build +# To push the image to our docker hub: +# docker push mullvadvpn/mullvadvpn-app-build +FROM debian:unstable +RUN apt update -y +RUN apt install build-essential \ + gcc \ + libdbus-1-dev \ + rpm \ + gconf2 \ + go2 \ + binutils \ + curl \ + p7zip-full \ + git -y +RUN curl https://sh.rustup.rs -sSf | bash -s -- -y +ENV PATH="/root/.cargo/bin/:${PATH}" +RUN mkdir /mvd +CMD tail -f /dev/null diff --git a/ci/rust-linux-script.sh b/ci/rust-linux-script.sh new file mode 100644 index 0000000000..2e05b76a97 --- /dev/null +++ b/ci/rust-linux-script.sh @@ -0,0 +1,15 @@ +set -eu +RUST_TOOLCHAIN_CHANNEL=$1 +RUSTFLAGS="--deny unused_imports --deny dead_code" + +source env.sh "" +rustup update $RUST_TOOLCHAIN_CHANNEL +rustup default $RUST_TOOLCHAIN_CHANNEL + +cargo build --verbose +cargo test --verbose +if [ "${RUST_TOOLCHAIN_CHANNEL}" = "nightly" ]; then + rustup component add rustfmt-preview; + rustfmt --version; + cargo fmt -- --check --unstable-features; +fi diff --git a/dist-assets/binaries b/dist-assets/binaries -Subproject 4b0ad1d6cc92d7f327443e7698b90df0152f4b8 +Subproject 9af1ef2d2f677aeaf5ddebcc6d2ee1df84c19e6 diff --git a/mullvad-daemon/src/logging.rs b/mullvad-daemon/src/logging.rs index cacd35fa80..d56196f9c8 100644 --- a/mullvad-daemon/src/logging.rs +++ b/mullvad-daemon/src/logging.rs @@ -40,6 +40,7 @@ const SILENCED_CRATES: &[&str] = &[ "hyper", "rtnetlink", "netlink_proto", + "netlink_sys", "iproute2", ]; const SLIGHTLY_SILENCED_CRATES: &[&str] = &["mnl", "nftnl"]; diff --git a/talpid-core/Cargo.toml b/talpid-core/Cargo.toml index 982b1d500d..833b2cb9b6 100644 --- a/talpid-core/Cargo.toml +++ b/talpid-core/Cargo.toml @@ -32,6 +32,9 @@ hex = "0.3" ipnetwork = "0.14" lazy_static = "1.0" nix = "0.13" +tokio-process = "0.2" +tokio-executor = "0.1" +tokio-io = "0.1" [target.'cfg(target_os = "linux")'.dependencies] diff --git a/talpid-core/src/routing/android.rs b/talpid-core/src/routing/android.rs index 7c3ba70230..bd21629906 100644 --- a/talpid-core/src/routing/android.rs +++ b/talpid-core/src/routing/android.rs @@ -1,29 +1,28 @@ -use super::RequiredRoutes; -use std::net::{IpAddr, Ipv4Addr}; +use futures::{sync::oneshot, Async, Future}; +use ipnetwork::IpNetwork; +use std::collections::HashMap; /// Stub error type for routing errors on Android. #[derive(Debug, err_derive::Error)] #[error(display = "Unknown Android routing error")] pub struct Error; -pub struct RouteManager; +/// Stub route manager for Android +pub struct RouteManagerImpl; -impl super::RoutingT for RouteManager { - type Error = Error; - - fn new() -> Result<Self, Self::Error> { - Ok(RouteManager) - } - - fn add_routes(&mut self, _required_routes: RequiredRoutes) -> Result<(), Self::Error> { - Ok(()) - } - - fn delete_routes(&mut self) -> Result<(), Self::Error> { - Ok(()) +impl RouteManagerImpl { + pub fn new( + _required_routes: HashMap<IpNetwork, super::NetNode>, + _shutdown_rx: oneshot::Receiver<oneshot::Sender<()>>, + ) -> Result<Self, Error> { + Ok(Self {}) } +} - fn get_default_route_node(&mut self) -> Result<IpAddr, Self::Error> { - Ok(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0))) +impl Future for RouteManagerImpl { + type Item = (); + type Error = Error; + fn poll(&mut self) -> Result<Async<()>, Error> { + Ok(Async::Ready(())) } } diff --git a/talpid-core/src/routing/linux.rs b/talpid-core/src/routing/linux.rs deleted file mode 100644 index b229b192c2..0000000000 --- a/talpid-core/src/routing/linux.rs +++ /dev/null @@ -1,141 +0,0 @@ -use super::{NetNode, RequiredRoutes, Route}; - -use super::subprocess::{Exec, RunExpr}; -use std::{ - collections::HashSet, - io, - net::{AddrParseError, IpAddr}, -}; - -pub type Result<T> = std::result::Result<T, Error>; - -/// Errors that can happen in the Linux routing integration -#[derive(err_derive::Error, Debug)] -pub enum Error { - /// Failed to add route. - #[error(display = "Failed to add route")] - FailedToAddRoute(#[error(cause)] io::Error), - - /// Failed to remove route. - #[error(display = "Failed to remove route")] - FailedToRemoveRoute(#[error(cause)] io::Error), - - /// Error while running "ip route". - #[error(display = "Error while running \"ip route\"")] - FailedToRunIp(#[error(cause)] io::Error), - - /// No default route in "ip route" output. - #[error(display = "No default route in \"ip route\" output")] - NoDefaultRoute, - - /// Failed to parse default route as IP. - #[error(display = "Failed to parse default route as IP: {}", _0)] - ParseDefaultRoute(String, #[error(cause)] AddrParseError), -} - - -pub struct RouteManager { - added_routes: HashSet<super::Route>, -} - -impl RouteManager { - fn add_route(&mut self, route: super::Route) -> Result<()> { - if route.prefix.prefix() == 0 { - return if route.prefix.is_ipv4() { - self.add_route(Route::new("0.0.0.0/1".parse().unwrap(), route.node.clone()))?; - self.add_route(Route::new( - "128.0.0.0/1".parse().unwrap(), - route.node.clone(), - )) - } else { - self.add_route(Route::new("::/1".parse().unwrap(), route.node.clone()))?; - self.add_route(Route::new("8000::/1".parse().unwrap(), route.node.clone())) - }; - } - - let cmd = Self::add_route_cmd(&route); - - cmd.into_expr() - .run_expr() - .map_err(Error::FailedToAddRoute)?; - - self.added_routes.insert(route); - Ok(()) - } - - fn add_route_cmd(route: &Route) -> Exec { - let cmd = Exec::cmd("ip") - .arg(ip_vers(&route)) - .arg("route") - .arg("add") - .arg(route.prefix.to_string()); - match &route.node { - NetNode::Address(ref addr) => cmd.arg("via").arg(addr.to_string()), - NetNode::Device(ref device) => cmd.arg("dev").arg(device), - } - } -} - -fn ip_vers(route: &Route) -> &'static str { - if route.prefix.is_ipv4() { - "-4" - } else { - "-6" - } -} - - -impl super::RoutingT for RouteManager { - type Error = Error; - fn new() -> Result<Self> { - Ok(RouteManager { - added_routes: HashSet::new(), - }) - } - - fn add_routes(&mut self, required_routes: RequiredRoutes) -> Result<()> { - for route in required_routes.routes.into_iter() { - if let Err(e) = self.add_route(route) { - let _ = self.delete_routes(); - return Err(e); - } - } - Ok(()) - } - - fn delete_routes(&mut self) -> Result<()> { - let mut end_result = Ok(()); - for route in self.added_routes.drain() { - let result = duct::cmd!( - "ip", - ip_vers(&route), - "route", - "delete", - route.prefix.to_string() - ) - .run_expr() - .map_err(Error::FailedToRemoveRoute); - if let Err(e) = result { - log::error!("Failed to remove route {} - {}", route.prefix, e); - end_result = Err(e); - } - } - end_result - } - - /// Retrieves the gateway for the default route - fn get_default_route_node(&mut self) -> Result<IpAddr> { - let output = duct::cmd!("ip", "route") - .stdout() - .map_err(Error::FailedToRunIp)?; - let ip_str: &str = output - .lines() - .find(|line| line.trim().starts_with("default via ")) - .and_then(|line| line.trim().split_whitespace().nth(2)) - .ok_or(Error::NoDefaultRoute)?; - - ip_str - .parse() - .map_err(|e| Error::ParseDefaultRoute(ip_str.to_owned(), e)) - } -} diff --git a/talpid-core/src/routing/linux/change_listener.rs b/talpid-core/src/routing/linux/change_listener.rs new file mode 100644 index 0000000000..44148a1d3b --- /dev/null +++ b/talpid-core/src/routing/linux/change_listener.rs @@ -0,0 +1,241 @@ +use super::{ + super::{Node, Route}, + RouteChange, +}; +use futures::{future::Either, sync::mpsc, Async, Future, Stream}; +use std::{collections::BTreeMap, io, net::IpAddr}; + +use netlink_packet::{ + LinkMessage, LinkNla, NetlinkMessage, NetlinkPayload, RouteMessage, RouteNla, RtnlMessage, +}; +use netlink_sys::SocketAddr; +use rtnetlink::constants::{ + AF_INET, AF_INET6, RTMGRP_IPV4_ROUTE, RTMGRP_IPV6_ROUTE, RTMGRP_LINK, RTMGRP_NOTIFY, +}; + +#[derive(err_derive::Error, Debug)] +pub enum Error { + #[error(display = "Netlink connection failed")] + NetlinkError(#[error(cause)] failure::Compat<rtnetlink::Error>), + #[error(display = "Netlink protocol error")] + NetlinkProtocolError(#[error(cause)] failure::Compat<netlink_proto::Error>), + #[error(display = "Failed to open a netlink connection")] + ConnectError(#[error(cause)] io::Error), + #[error(display = "Route without a valid node")] + InvalidRoute, + #[error(display = "Invalid length of byte buffer for IP address")] + InvalidIpBytes, + #[error(display = "Invalid network prefix")] + InvalidNetworkPrefix(#[error(cause)] ipnetwork::IpNetworkError), + #[error(display = "Unknown device index - {}", _0)] + UnknownDeviceIndex(u32), + #[error(display = "Failed to bind netlink socket")] + BindError(#[error(cause)] io::Error), + #[error(display = "Netlink connection stopped sending messages")] + NetlinkConnectionClosed, +} + +type Result<T> = ::std::result::Result<T, Error>; + +pub(super) struct RouteChangeListener { + connection: rtnetlink::Connection, + messages: mpsc::UnboundedReceiver<NetlinkMessage>, + iface_map: BTreeMap<u32, String>, +} + +impl RouteChangeListener { + pub fn new() -> Result<Self> { + let (mut connection, handle, messages) = + rtnetlink::new_connection_with_messages().map_err(Error::ConnectError)?; + + let mgroup_flags = RTMGRP_IPV4_ROUTE | RTMGRP_IPV6_ROUTE | RTMGRP_LINK | RTMGRP_NOTIFY; + let addr = SocketAddr::new(0, mgroup_flags); + connection + .socket_mut() + .bind(&addr) + .map_err(Error::BindError)?; + + let (iface_map, connection) = Self::initialize_link_map(connection, handle)?; + + Ok(Self { + connection, + messages, + iface_map, + }) + } + + fn map_netlink_to_route_change(&mut self, msg: NetlinkMessage) -> Result<Option<RouteChange>> { + match msg.payload { + NetlinkPayload::Rtnl(RtnlMessage::NewLink(new_link)) => { + if let Some((idx, name)) = Self::map_iface_name_to_idx(new_link) { + self.iface_map.insert(idx, name); + } + Ok(None) + } + NetlinkPayload::Rtnl(RtnlMessage::DelLink(old_link)) => { + if let Some((idx, _)) = Self::map_iface_name_to_idx(old_link) { + self.iface_map.remove(&idx); + } + Ok(None) + } + + NetlinkPayload::Rtnl(RtnlMessage::NewRoute(new_route)) => { + self.get_route(new_route).map(RouteChange::Add).map(Some) + } + NetlinkPayload::Rtnl(RtnlMessage::DelRoute(old_route)) => { + self.get_route(old_route).map(RouteChange::Remove).map(Some) + } + _ => Ok(None), + } + } + + // Tries to coax a Route out of a RouteMessage + fn get_route(&self, msg: RouteMessage) -> Result<Route> { + let mut prefix = None; + let mut node_addr = None; + let mut device = None; + let mut metric = None; + let mut gateway = None; + + let destination_length = msg.header.destination_length; + let af_spec = msg.header.address_family; + + for nla in msg.nlas.iter() { + match nla { + RouteNla::Oif(device_idx) => { + match self.iface_map.get(&device_idx) { + Some(device_name) => device = Some(device_name.to_string()), + None => { + return Err(Error::UnknownDeviceIndex(*device_idx)); + } + }; + } + + RouteNla::Via(addr) => { + node_addr = Self::parse_ip(&addr).map(Some)?; + } + + RouteNla::Destination(addr) => { + prefix = Self::parse_ip(&addr) + .and_then(|ip| { + ipnetwork::IpNetwork::new(ip, destination_length) + .map_err(Error::InvalidNetworkPrefix) + }) + .map(Some)?; + } + + // gateway NLAs indicate that this is actually a default route + RouteNla::Gateway(gateway_ip) => { + gateway = Self::parse_ip(&gateway_ip).map(Some)?; + } + + RouteNla::Priority(priority) => { + metric = Some(*priority); + } + _ => continue, + } + } + + // when a gateway is specified but prefix is none, then this is a default route + if prefix.is_none() && gateway.is_some() { + prefix = match af_spec as u16 { + AF_INET => Some("0.0.0.0/0".parse().expect("failed to parse ipnetwork")), + AF_INET6 => Some("::/0".parse().expect("failed to parse ipnetwork")), + _ => None, + }; + } + + if device.is_none() && node_addr.is_none() || prefix.is_none() { + return Err(Error::InvalidRoute); + } + + + let node = Node { + ip: node_addr, + device, + }; + + Ok(Route { + node, + prefix: prefix.unwrap(), + metric, + }) + } + + fn map_iface_name_to_idx(msg: LinkMessage) -> Option<(u32, String)> { + let index = msg.header.index; + for nla in msg.nlas { + match nla { + LinkNla::IfName(name) => return Some((index, name)), + _ => continue, + } + } + None + } + + fn parse_ip(bytes: &[u8]) -> Result<IpAddr> { + if bytes.len() == 4 { + let mut ipv4_bytes = [0u8; 4]; + ipv4_bytes.copy_from_slice(bytes); + Ok(IpAddr::from(ipv4_bytes)) + } else if bytes.len() == 16 { + let mut ipv6_bytes = [0u8; 16]; + ipv6_bytes.copy_from_slice(bytes); + Ok(IpAddr::from(ipv6_bytes)) + } else { + log::error!("Expected either 4 or 16 bytes, got {} bytes", bytes.len()); + Err(Error::InvalidIpBytes) + } + } + + pub fn initialize_link_map( + connection: rtnetlink::Connection, + handle: rtnetlink::Handle, + ) -> Result<(BTreeMap<u32, String>, rtnetlink::Connection)> { + let request = handle + .link() + .get() + .execute() + .filter_map(Self::map_iface_name_to_idx) + .collect(); + + match connection.select2(request).wait() { + Ok(Either::A(_)) => Err(Error::NetlinkConnectionClosed), + Err(Either::A((error, _))) => { + Err(Error::NetlinkProtocolError(failure::Fail::compat(error))) + } + Ok(Either::B((links, connection))) => Ok((links.into_iter().collect(), connection)), + Err(Either::B((error, _))) => Err(Error::NetlinkError(failure::Fail::compat(error))), + } + } +} + +impl Stream for RouteChangeListener { + type Item = RouteChange; + type Error = Error; + + fn poll(&mut self) -> Result<Async<Option<RouteChange>>> { + self.connection + .poll() + .map_err(failure::Fail::compat) + .map_err(Error::NetlinkProtocolError)?; + + loop { + match futures::try_ready!(self + .messages + .poll() + .map_err(|_| Error::NetlinkConnectionClosed)) + { + Some(message) => { + if let Some(route_change) = self.map_netlink_to_route_change(message)? { + return Ok(Async::Ready(Some(route_change))); + }; + continue; + } + None => { + return Err(Error::NetlinkConnectionClosed); + } + } + } + } +} diff --git a/talpid-core/src/routing/linux/mod.rs b/talpid-core/src/routing/linux/mod.rs new file mode 100644 index 0000000000..c5056f08eb --- /dev/null +++ b/talpid-core/src/routing/linux/mod.rs @@ -0,0 +1,541 @@ +use super::{NetNode, Node, Route}; + +use ipnetwork::IpNetwork; +use std::{ + collections::{HashMap, HashSet, VecDeque}, + io, + process::{Command, Stdio}, +}; + +mod change_listener; +use change_listener::{Error as RouteChangeListenerError, RouteChangeListener}; + +use futures::{sync::oneshot, Async, Future, IntoFuture, Stream}; +use tokio_process::CommandExt; + +pub type Result<T> = std::result::Result<T, Error>; + +/// Errors that can happen in the Linux routing integration +#[derive(err_derive::Error, Debug)] +pub enum Error { + /// Failed to add route. + #[error(display = "Failed to add route")] + FailedToAddRoute, + + /// Failed to remove route. + #[error(display = "Failed to remove route")] + FailedToRemoveRoute, + + /// Error while running "ip route". + #[error(display = "Error while running \"ip route\"")] + FailedToRunIp(#[error(cause)] io::Error), + + /// Invocation of `ip route` ended with a non-zero exit code + #[error(display = "ip returend a non-zero exit code")] + ErrorIpFailed, + + /// Received unexpected output from `ip route` + #[error(display = "Received unexpected output from \"ip\"")] + UnexpectedOutput, + + /// No default route exists + #[error(display = "No default route in \"ip route\" output")] + NoDefaultRoute, + + /// Route table change stream failed. + #[error(display = "Route change listener failed")] + ChangeListenerError(#[error(cause)] RouteChangeListenerError), + + /// Route table change stream failed. + #[error(display = "Route change listener closed unexpectedly")] + ChangeListenerClosed, +} + +pub struct RouteManagerImpl { + changes: RouteChangeListener, + + // currently added routes + added_routes: HashSet<Route>, + // default route tracking + // destinations that should be routed through the default route + required_default_routes: HashSet<IpNetwork>, + default_routes: HashSet<Route>, + best_default_node_v4: Option<Node>, + best_default_node_v6: Option<Node>, + + // pending changes + needed_changes: VecDeque<RouteChange>, + pending_change: Option<PendingChange>, + + // if the stop channel is set, the future should wind down - remove added routes and send a + // signal. + shutdown_finished_tx: Option<oneshot::Sender<()>>, + shutdown_rx: oneshot::Receiver<oneshot::Sender<()>>, + should_shut_down: bool, +} + +impl RouteManagerImpl { + /// Creates a new RouteManager. + pub fn new( + required_routes: HashMap<IpNetwork, NetNode>, + shutdown_rx: oneshot::Receiver<oneshot::Sender<()>>, + ) -> Result<Self> { + let changes = RouteChangeListener::new().map_err(Error::ChangeListenerError)?; + + let mut required_normal_routes = HashSet::new(); + let mut required_default_routes = HashSet::new(); + let mut added_routes = HashSet::new(); + + for (destination, node) in required_routes { + match node { + NetNode::RealNode(node) => { + required_normal_routes.insert(Route::new(node, destination)); + } + NetNode::DefaultNode => { + required_default_routes.insert(destination); + } + } + } + + let default_routes = Self::get_default_routes().wait()?; + + let best_default_node_v4 = Self::pick_best_default_node(&default_routes, true); + let best_default_node_v6 = Self::pick_best_default_node(&default_routes, false); + + let mut establish_baseline_fn = || -> Result<()> { + for normal_route in required_normal_routes.iter() { + Self::add_route(&normal_route).wait()?; + added_routes.insert(normal_route.clone()); + } + + for prefix in required_default_routes.iter() { + match ( + prefix.is_ipv4(), + &best_default_node_v4, + &best_default_node_v6, + ) { + (false, _, Some(default_node)) | (true, Some(default_node), _) => { + // best to pick a single node identifier rather than device + ip + let route = Route::new(default_node.clone(), *prefix); + Self::add_route(&route).wait()?; + added_routes.insert(route); + } + // at this point in time, there exists no default route for the given IP version + // so no routes will be added. The assumption is that routing ipv4 through ipv6 + // nodes may or may not be bonkers + _ => continue, + } + } + Ok(()) + }; + + if let Err(e) = establish_baseline_fn() { + for setup_route in added_routes { + if let Err(removal_err) = Self::delete_route(&setup_route).wait() { + log::error!( + "Failed to remove route whilst cleaning up failed initialization +of route monitor -{}", + removal_err + ); + } + } + return Err(e); + } + + + Ok(Self { + changes, + + required_default_routes, + added_routes, + + default_routes, + best_default_node_v4, + best_default_node_v6, + + needed_changes: VecDeque::new(), + pending_change: None, + + shutdown_finished_tx: None, + shutdown_rx, + should_shut_down: false, + }) + } + + fn process_route_table_change(&mut self) -> Result<()> { + loop { + let change = self.changes.poll().map_err(Error::ChangeListenerError)?; + match change { + Async::NotReady => return Ok(()), + Async::Ready(Some(RouteChange::Add(route))) => self.process_new_route(route), + Async::Ready(Some(RouteChange::Remove(route))) => self.process_deleted_route(route), + Async::Ready(None) => return Err(Error::ChangeListenerClosed), + } + } + } + + fn process_new_route(&mut self, route: Route) { + self.needed_changes.retain(|change| { + if let RouteChange::Add(old_route) = change { + old_route != &route + } else { + true + } + }); + if route.prefix.prefix() == 0 { + self.default_routes.insert(route); + self.update_default_rotues(); + } + } + + fn process_deleted_route(&mut self, route: Route) { + self.needed_changes.retain(|change| { + if let RouteChange::Remove(old_route) = change { + old_route != &route + } else { + true + } + }); + if route.prefix.prefix() == 0 { + self.update_default_rotues(); + } + } + + fn update_default_rotues(&mut self) { + let new_best_v4 = Self::pick_best_default_node(&self.default_routes, true); + if self.best_default_node_v4 != new_best_v4 && new_best_v4.is_some() { + let new_node = new_best_v4.unwrap(); + let old_node = self.best_default_node_v4.take(); + let v4_destinations: Vec<_> = self + .required_default_routes + .iter() + .filter(|ip| ip.is_ipv4()) + .cloned() + .collect(); + for destination in v4_destinations { + if let Some(old_node) = &old_node { + self.enque_route_change(RouteChange::Remove(Route::new( + old_node.clone(), + destination, + ))); + } + + self.enque_route_change(RouteChange::Add(Route::new( + new_node.clone(), + destination, + ))); + } + self.best_default_node_v4 = Some(new_node); + } + + let new_best_v6 = Self::pick_best_default_node(&self.default_routes, false); + if self.best_default_node_v6 != new_best_v6 && new_best_v6.is_some() { + let new_node = new_best_v6.unwrap(); + let old_node = self.best_default_node_v6.take(); + let v6_destinations: Vec<_> = self + .required_default_routes + .iter() + .filter(|ip| !ip.is_ipv4()) + .cloned() + .collect(); + + for destination in v6_destinations { + if let Some(old_node) = &old_node { + self.enque_route_change(RouteChange::Remove(Route::new( + old_node.clone(), + destination, + ))); + } + self.enque_route_change(RouteChange::Add(Route::new( + new_node.clone(), + destination, + ))); + } + self.best_default_node_v6 = Some(new_node); + } + } + + fn enque_route_change(&mut self, route_change: RouteChange) { + // Only add a route change to the queue of changes if a change like this doesn't exist + // already. + if self + .pending_change + .as_ref() + .map(|pending_change| &pending_change.change != &route_change) + .unwrap_or(true) + && self + .needed_changes + .iter() + .all(|enqued_change| enqued_change != &route_change) + { + self.needed_changes.push_back(route_change); + } + } + + fn pick_best_default_node(routes: &HashSet<Route>, v4: bool) -> Option<Node> { + // Pick the route with the lowest metric - thus the most favourable route. + routes + .iter() + .filter(|route| route.prefix.is_ipv4() == v4) + .fold( + None, + |best_route: Option<Route>, next_route| match best_route { + Some(current_best) => { + if current_best.metric.unwrap_or(0) > next_route.metric.unwrap_or(0) { + Some(next_route.clone()) + } else { + Some(current_best) + } + } + None => Some(next_route.clone()), + }, + ) + .map(|route| route.node) + } + + // Try and apply changes to the routing table if any are necessary. + // Returns true if no more changes are to be made. + fn apply_route_table_changes(&mut self) -> Result<bool> { + let mut should_stop = false; + while !should_stop { + if self.pending_change.is_none() { + if let Some(change) = self.needed_changes.pop_front() { + let process = match &change { + RouteChange::Add(route) => Self::add_route(route), + RouteChange::Remove(route) => Self::delete_route(route), + }; + self.pending_change = Some(PendingChange { change, process }); + } + } + + if let Some(mut change) = self.pending_change.take() { + match change.process.poll()? { + Async::NotReady => { + self.pending_change = Some(change); + should_stop = true; + } + Async::Ready(_) => { + match change.change { + RouteChange::Add(route) => { + self.added_routes.insert(route); + } + RouteChange::Remove(route) => { + self.added_routes.remove(&route); + } + }; + } + }; + } else { + should_stop = true; + } + } + + Ok(self.pending_change.is_none() && self.needed_changes.len() == 0) + } + + fn route_cmd(action: &str, route: &Route) -> Command { + let mut cmd = Command::new("ip"); + + cmd.arg(ip_vers(&route)) + .arg("route") + .arg(action) + .arg(route.prefix.to_string()); + + if let Some(addr) = route.node.get_address() { + cmd.arg("via").arg(addr.to_string()); + }; + if let Some(device) = route.node.get_device() { + cmd.arg("dev").arg(device); + }; + if let Some(metric) = route.metric { + cmd.arg("metric").arg(metric.to_string()); + }; + + cmd + } + + fn run_cmd(mut cmd: Command, err: Error) -> Box<dyn Future<Item = (), Error = Error> + Send> { + log::trace!("running cmd - {:?}", &cmd); + Box::new( + cmd.spawn_async() + .into_future() + .flatten() + .map_err(Error::FailedToRunIp) + .and_then(|exit_status| { + if exit_status.success() { + Ok(()) + } else { + Err(err) + } + }), + ) + } + + fn get_default_routes_inner( + ip_version: IpVersion, + ) -> impl Future<Item = Vec<Route>, Error = Error> { + let mut cmd = Command::new("ip"); + cmd.arg(ip_version.to_route_arg()).arg("route").arg("show"); + + Box::new( + cmd.stdout(Stdio::piped()) + .spawn_async() + .map_err(Error::FailedToRunIp) + .into_future() + .and_then(|proc| proc.wait_with_output().map_err(Error::FailedToRunIp)) + .and_then(move |output| { + let output_lines = String::from_utf8(output.stdout.clone()) + .map_err(|_| Error::UnexpectedOutput)?; + Ok(output_lines + .lines() + .filter_map(|line| { + if line.starts_with("default") { + parse_ip_route_show_line(line, ip_version) + } else { + None + } + }) + .collect()) + }), + ) + } + + /// Adds routes to the system routing table. + fn add_route(route: &Route) -> Box<dyn Future<Item = (), Error = Error> + Send> { + let cmd = Self::route_cmd("replace", route); + Self::run_cmd(cmd, Error::FailedToAddRoute) + } + + /// Removes previously set routes. If routes were set for specific tables, the whole tables + /// will be removed. + fn delete_route(route: &Route) -> Box<dyn Future<Item = (), Error = Error> + Send> { + let cmd = Self::route_cmd("delete", route); + Self::run_cmd(cmd, Error::FailedToRemoveRoute) + } + + /// Retrieves the gateway for the default route + fn get_default_routes() -> Box<dyn Future<Item = HashSet<Route>, Error = Error> + Send> { + Box::new( + Self::get_default_routes_inner(IpVersion::V4) + .join(Self::get_default_routes_inner(IpVersion::V6)) + .map(|(v4_routes, v6_routes)| { + v4_routes.into_iter().chain(v6_routes.into_iter()).collect() + }), + ) + } +} + +#[derive(Debug, Copy, Clone)] +enum IpVersion { + V4, + V6, +} + +impl IpVersion { + fn to_route_arg(&self) -> &'static str { + match self { + IpVersion::V4 => "-4", + IpVersion::V6 => "-6", + } + } +} + +impl Future for RouteManagerImpl { + type Item = (); + type Error = Error; + fn poll(&mut self) -> Result<Async<()>> { + if !self.should_shut_down { + match self.shutdown_rx.poll() { + Ok(Async::NotReady) => (), + Ok(Async::Ready(tx)) => { + self.should_shut_down = true; + self.shutdown_finished_tx = Some(tx); + } + Err(_) => { + self.should_shut_down = true; + } + }; + self.process_route_table_change()?; + } + let all_changes_applied = self.apply_route_table_changes()?; + if all_changes_applied && self.should_shut_down { + if let Some(tx) = self.shutdown_finished_tx.take() { + if let Err(_) = tx.send(()) { + log::error!("RouteManagerHandle already stopped"); + } + } + Ok(Async::Ready(())) + } else { + Ok(Async::NotReady) + } + } +} + +// intended to parse lines sucha as the following: +// default via 192.168.1.1 dev wlp61s0 proto dhcp metric 600 +fn parse_ip_route_show_line(line: &str, ip_version: IpVersion) -> Option<Route> { + let mut node_ip = None; + let mut device = None; + let mut metric = None; + + let mut tokens = line.split_whitespace(); + let prefix_str = tokens.next()?; + let prefix = match prefix_str { + "default" => match ip_version { + IpVersion::V4 => "0.0.0.0/0".parse().unwrap(), + IpVersion::V6 => "::/0".parse().unwrap(), + }, + prefix_str => prefix_str.parse().ok()?, + }; + + let tokens: Vec<&str> = tokens.collect(); + for pair in tokens.chunks(2) { + if pair.len() != 2 { + log::error!("unexpected output from ip"); + break; + } + let kind = pair[0]; + let value = pair[1]; + + match kind { + "via" => node_ip = value.parse().ok(), + "dev" => device = Some(value.to_string()), + "metric" => metric = value.parse().ok(), + _ => continue, + }; + } + + if node_ip.is_none() && device.is_none() { + None + } else { + let node = super::Node { + ip: node_ip, + device, + }; + + Some(super::Route { + node, + prefix, + metric, + }) + } +} + +fn ip_vers(route: &Route) -> &'static str { + if route.prefix.is_ipv4() { + "-4" + } else { + "-6" + } +} + +#[derive(Debug, PartialEq)] +enum RouteChange { + Add(Route), + Remove(Route), +} + +struct PendingChange { + change: RouteChange, + process: Box<dyn Future<Item = (), Error = Error> + Send>, +} diff --git a/talpid-core/src/routing/macos.rs b/talpid-core/src/routing/macos.rs index 722efbb320..987ffc14c9 100644 --- a/talpid-core/src/routing/macos.rs +++ b/talpid-core/src/routing/macos.rs @@ -1,12 +1,16 @@ -use super::{NetNode, RequiredRoutes, Route}; +use super::{NetNode, Node, Route}; -use super::subprocess::{Exec, RunExpr}; +use ipnetwork::IpNetwork; use std::{ - collections::HashSet, + collections::{HashMap, HashSet}, io, - net::{AddrParseError, IpAddr}, + net::IpAddr, + process::{Command, ExitStatus, Stdio}, }; +use futures::{stream, sync::oneshot, Async, Future, IntoFuture, Stream}; +use tokio_process::{Child, CommandExt}; + pub type Result<T> = std::result::Result<T, Error>; @@ -22,118 +26,419 @@ pub enum Error { FailedToRemoveRoute(#[error(cause)] io::Error), /// Error while running "ip route". - #[error(display = "Error while running \"ip route\"")] - FailedToRunIp(#[error(cause)] io::Error), + #[error(display = "Error while running \"route get\"")] + FailedToRunRoute(#[error(cause)] io::Error), + + /// Error while monitoring routes with `route -nv monitor` + #[error(display = "Error while running \"route -nv monitor\"")] + FailedToMonitorRoutes(#[error(cause)] io::Error), /// No default route in "ip route" output. #[error(display = "No default route in \"ip route\" output")] NoDefaultRoute, - /// Failed to parse default route as IP. - #[error(display = "Failed to parse default route as IP: {}", _0)] - ParseDefaultRoute(String, #[error(cause)] AddrParseError), + /// Unexpected output from netstat + #[error(display = "Unexpected output from netstat")] + BadOutputFromNetstat, } -pub struct RouteManager { - set_routes: HashSet<Route>, +enum RouteManagerState { + Listening(ChangeListener), + ObtainingDefaultRoutes( + Box<dyn Future<Item = (Option<Node>, Option<Node>), Error = Error> + Send>, + ), + Applying(Box<dyn Future<Item = (), Error = ()> + Send>), + Shutdown(Box<dyn Future<Item = (), Error = ()> + Send>), } -impl RouteManager { - fn add_route(&mut self, route: Route) -> Result<()> { - if route.prefix.prefix() == 0 { - return if route.prefix.is_ipv4() { - self.add_route(Route::new("0.0.0.0/1".parse().unwrap(), route.node.clone()))?; - self.add_route(Route::new( - "128.0.0.0/1".parse().unwrap(), - route.node.clone(), - )) - } else { - self.add_route(Route::new("::/1".parse().unwrap(), route.node.clone()))?; - self.add_route(Route::new("8000::/1".parse().unwrap(), route.node.clone())) - }; +/// Route manager can be in 1 of 4 states - +/// - waiting for a route to be added or removed from the route table +/// - obtaining default routes +/// - applying changes to the route table +/// - shutting down +/// +/// Only the _shutting down_ state can be reached from all other states, but during normal +/// operation, the route manager will add all the required routes during startup and will start +/// waiting for changes to the route table. If any change is detected, it will stop listening for +/// new changes, obtain new default routes and reapply routes that should be routed through the +/// default nodes. Once the routes are reapplied, the route table changes are monitored again. +pub struct RouteManagerImpl { + default_destinations: HashSet<IpNetwork>, + applied_routes: HashSet<Route>, + current_state: RouteManagerState, + v4_gateway: Option<Node>, + v6_gateway: Option<Node>, + shutdown_rx: Option<oneshot::Receiver<oneshot::Sender<()>>>, +} + + +impl RouteManagerImpl { + pub fn new( + required_routes: HashMap<IpNetwork, NetNode>, + shutdown_rx: oneshot::Receiver<oneshot::Sender<()>>, + ) -> Result<Self> { + let mut applied_routes = HashSet::new(); + let mut routes_to_apply = vec![]; + let mut default_destinations = HashSet::new(); + + let v4_gateway = Self::get_default_node_cmd("-inet").wait()?; + let v6_gateway = Self::get_default_node_cmd("-inet6").wait()?; + + if v4_gateway.is_none() && v6_gateway.is_none() { + return Err(Error::NoDefaultRoute); + } + + for (destination, node) in required_routes.into_iter() { + match node { + NetNode::DefaultNode => { + default_destinations.insert(destination); + } + + NetNode::RealNode(node) => routes_to_apply.push(Route::new(node, destination)), + } + } + + let apply_routes_fn = || -> Result<()> { + for route in routes_to_apply { + Self::add_route(&route).wait()?; + applied_routes.insert(route); + } + for destination in default_destinations.iter() { + match (&v4_gateway, &v6_gateway, destination.is_ipv4()) { + (Some(gateway), _, true) | (_, Some(gateway), false) => { + let route = Route::new(gateway.clone(), *destination); + Self::add_route(&route).wait()?; + applied_routes.insert(route); + } + _ => (), + }; + } + + Ok(()) + }; + + if let Err(e) = apply_routes_fn() { + log::error!("Failed to apply routes - {}", e); + for applied_route in applied_routes.iter() { + if let Err(removal_err) = Self::delete_route(applied_route.prefix).wait() { + log::error!( + "Failed to clean up routes after failing to set them up - {}", + removal_err + ); + } + } + return Err(e); + } + let change_listener = ChangeListener::new().map_err(Error::FailedToMonitorRoutes)?; + + Ok(Self { + default_destinations, + applied_routes, + current_state: RouteManagerState::Listening(change_listener), + shutdown_rx: Some(shutdown_rx), + v4_gateway, + v6_gateway, + }) + } + + // Retrieves the node that's currently used to reach 0.0.0.0/0 + // Arguments can be either -inet or -inet6 + fn get_default_node_cmd( + if_family: &'static str, + ) -> impl Future<Item = Option<Node>, Error = Error> { + let mut cmd = Command::new("route"); + cmd.arg("-n").arg("get").arg(if_family).arg("default"); + + cmd.output_async() + .map_err(Error::FailedToRunRoute) + .and_then(|output| { + let output = String::from_utf8(output.stdout).map_err(|e| { + log::error!("Failed to parse utf-8 bytes from output of netstat - {}", e); + Error::BadOutputFromNetstat + })?; + Ok(Self::parse_route(&output)) + }) + } + + fn parse_route(route_output: &str) -> Option<Node> { + let mut address: Option<IpAddr> = None; + let mut device = None; + for line in route_output.lines() { + // we're looking for just 2 different lines: + // interface: utun0 + // gateway: 192.168.3.1 + let tokens: Vec<_> = line.split_whitespace().collect(); + if tokens.len() == 2 { + match tokens[0].trim() { + "interface:" => { + device = Some(tokens[1].to_string()); + } + "gateway:" => { + address = Self::parse_gateway_line(tokens[1]); + } + _ => continue, + } + } } - let mut cmd = Exec::cmd("route") - .arg("-q") + match (address, device) { + (Some(address), Some(device)) => Some(Node::new(address, device)), + (Some(address), None) => Some(Node::address(address)), + (None, Some(device)) => Some(Node::device(device)), + _ => None, + } + } + + fn parse_gateway_line(line: &str) -> Option<IpAddr> { + // IPv6 addresses may contain interfaces + // if line contains '%' it should be split off + line.split('%') + .next() + .and_then(|ip_str| ip_str.parse().ok()) + } + + pub fn delete_route( + destination: IpNetwork, + ) -> impl Future<Item = ExitStatus, Error = Error> + Send { + let mut cmd = Command::new("route"); + cmd.arg("-q") + .arg("-n") + .arg("delete") + .arg(ip_vers(destination)) + .arg(destination.to_string()); + + futures::lazy(move || cmd.spawn_async().into_future().and_then(|f| f)) + .map_err(Error::FailedToRemoveRoute) + } + + fn add_route(route: &Route) -> impl Future<Item = ExitStatus, Error = Error> + Send { + let mut cmd = Command::new("route"); + cmd.arg("-q") .arg("-n") .arg("add") - .arg(ip_vers(&route)) + .arg(ip_vers(route.prefix)) .arg(route.prefix.to_string()); - cmd = match &route.node { - NetNode::Address(ref addr) => cmd.arg("-gateway").arg(addr.to_string()), - NetNode::Device(device) => cmd.arg("-interface").arg(&device), + + if let Some(addr) = route.node.get_address() { + cmd.arg("-gateway").arg(addr.to_string()); + } else if let Some(device) = route.node.get_device() { + cmd.arg("-interface").arg(device); + } + + futures::lazy(move || cmd.spawn_async().into_future().and_then(|f| f)) + .map_err(Error::FailedToAddRoute) + } + + fn shutdown_future( + &self, + shutdown_done_tx: Option<oneshot::Sender<()>>, + ) -> impl Future<Item = (), Error = ()> + Send { + let remove_route_future = |route: &Route| { + Self::delete_route(route.prefix).then(|removal| { + match removal { + Ok(status) => { + if !status.success() { + log::debug!("Failed to remove route during shutdown"); + } + } + Err(e) => log::error!("Failed to remove route during shutdown - {}", e), + }; + Ok(()) + }) + }; + let mut routes_to_remove: Vec<_> = self + .applied_routes + .iter() + .map(remove_route_future) + .collect(); + routes_to_remove.extend(self.default_destinations.iter().filter_map(|dest| { + match (&self.v4_gateway, &self.v6_gateway, dest.is_ipv4()) { + (Some(gateway), _, true) | (_, Some(gateway), false) => { + let route = Route::new(gateway.clone(), *dest); + Some(remove_route_future(&route)) + } + _ => None, + } + })); + stream::futures_ordered(routes_to_remove) + .for_each(|_| Ok(())) + .and_then(|_| { + if let Some(tx) = shutdown_done_tx { + if tx.send(()).is_err() { + log::debug!("RouteManager already dropped") + } + } + Ok(()) + }) + } + + fn apply_new_default_routes( + &self, + new_v4: Option<Node>, + new_v6: Option<Node>, + ) -> impl Future<Item = (), Error = ()> + Send { + let apply_route_future = |route: &Route| { + let add_route_future = Self::add_route(route); + // always try to remove old route first - if it's still set, the new one won't be + // applied + Self::delete_route(route.prefix) + .then(|_| add_route_future) + .then(|addition| { + match addition { + Ok(status) => { + if !status.success() { + log::info!("Failed to reapply route"); + } + } + Err(e) => log::error!("Failed to reset route: {}", e), + } + Ok(()) + }) }; - cmd.into_expr() - .run_expr() - .map_err(Error::FailedToAddRoute)?; - self.set_routes.insert(route); - Ok(()) + + let add_new_routes = self.default_destinations.iter().filter_map(|dest| { + match (&new_v4, &new_v6, dest.is_ipv4()) { + (Some(gateway), _, true) | (_, Some(gateway), false) => { + let new_route = Route::new(gateway.clone(), *dest); + Some(apply_route_future(&new_route)) + } + + _ => None, + } + }); + + stream::futures_ordered(add_new_routes).for_each(|_| Ok(())) } -} -fn ip_vers(route: &Route) -> &'static str { - if route.prefix.is_ipv4() { - "-inet" - } else { - "-inet6" + fn get_default_routes_future( + &self, + ) -> impl Future<Item = (Option<Node>, Option<Node>), Error = Error> + Send { + Self::get_default_node_cmd("-inet").join(Self::get_default_node_cmd("-inet6")) } } -impl super::RoutingT for RouteManager { +impl Future for RouteManagerImpl { + type Item = (); type Error = Error; + fn poll(&mut self) -> Result<Async<()>> { + if let Some(mut shutdown_rx) = self.shutdown_rx.take() { + match shutdown_rx.poll() { + Ok(Async::Ready(shutdown_tx)) => { + self.current_state = RouteManagerState::Shutdown(Box::new( + self.shutdown_future(Some(shutdown_tx)), + )); + } + // handle is already dropped + Err(_) => { + self.current_state = + RouteManagerState::Shutdown(Box::new(self.shutdown_future(None))); + } + Ok(Async::NotReady) => { + self.shutdown_rx = Some(shutdown_rx); + } + }; + } - fn new() -> Result<Self> { - Ok(Self { - set_routes: HashSet::new(), - }) - } - fn add_routes(&mut self, required_routes: RequiredRoutes) -> Result<()> { - for route in required_routes.routes.into_iter() { - if let Err(e) = self.add_route(route) { - let _ = self.delete_routes(); - return Err(e); + loop { + match &mut self.current_state { + RouteManagerState::Listening(listener) => { + match listener.poll().map_err(Error::FailedToMonitorRoutes)? { + Async::Ready(()) => { + self.current_state = RouteManagerState::ObtainingDefaultRoutes( + Box::new(self.get_default_routes_future()), + ); + } + Async::NotReady => break, + } + } + + RouteManagerState::ObtainingDefaultRoutes(f) => match f.poll()? { + Async::Ready((v4_gateway, v6_gateway)) => { + self.current_state = RouteManagerState::Applying(Box::new( + self.apply_new_default_routes(v4_gateway, v6_gateway), + )); + } + Async::NotReady => break, + }, + + RouteManagerState::Applying(f) => { + match f.poll() { + // the future for reapplying routes never fails - just logs failures + Err(_) => unreachable!(), + Ok(Async::NotReady) => break, + Ok(Async::Ready(_)) => { + self.current_state = RouteManagerState::Listening( + ChangeListener::new().map_err(Error::FailedToMonitorRoutes)?, + ); + } + } + } + + RouteManagerState::Shutdown(shutdown_future) => { + return Ok(shutdown_future.poll().unwrap_or(Async::Ready(()))); + } } } - Ok(()) + + Ok(Async::NotReady) } +} - fn delete_routes(&mut self) -> Result<()> { - let mut end_result = Ok(()); - for route in self.set_routes.drain() { - let result = duct::cmd!( - "route", - "-q", - "-n", - "delete", - ip_vers(&route), - route.prefix.to_string() - ) - .run_expr() - .map_err(Error::FailedToRemoveRoute); - if let Err(e) = result { - log::error!("failed to reset remove route: {}", e); - end_result = Err(e); - } - } - // returning the last error as to signal some kind of failure. - end_result +fn ip_vers(prefix: IpNetwork) -> &'static str { + if prefix.is_ipv4() { + "-inet" + } else { + "-inet6" } +} - fn get_default_route_node(&mut self) -> Result<IpAddr> { - let output = duct::cmd!("route", "-n", "get", "default") - .stdout() - .map_err(Error::FailedToRunIp)?; - let ip_str: &str = output - .lines() - .find(|line| line.trim().starts_with("gateway: ")) - .and_then(|line| line.trim().split_whitespace().skip(1).next()) - .ok_or(Error::NoDefaultRoute)?; +pub struct ChangeListener { + process: Child, + lines: tokio_io::io::Lines<std::io::BufReader<tokio_process::ChildStdout>>, +} + +impl ChangeListener { + pub fn new() -> ::std::io::Result<Self> { + let mut cmd = Command::new("route"); + cmd.arg("-vn").arg("monitor").stdout(Stdio::piped()); - ip_str - .parse() - .map_err(|e| Error::ParseDefaultRoute(ip_str.to_owned(), e)) + let mut process = cmd.spawn_async()?; + + let reader = ::std::io::BufReader::new(process.stdout().take().unwrap()); + let lines = tokio_io::io::lines(reader); + + Ok(Self { process, lines }) + } +} + +impl Future for ChangeListener { + type Item = (); + type Error = ::std::io::Error; + + fn poll(&mut self) -> ::std::io::Result<Async<Self::Item>> { + match self.process.poll() { + Ok(Async::NotReady) => (), + Ok(Async::Ready(status)) => { + log::debug!("route listener exited - {:?}", status); + return Ok(Async::Ready(())); + } + Err(e) => return Err(e), + }; + loop { + return match self.lines.poll()? { + Async::NotReady => Ok(Async::NotReady), + Async::Ready(Some(line)) => { + if line.starts_with("RTM_DELETE") || line.starts_with("RTM_ADD") { + Ok(Async::Ready(())) + } else { + continue; + } + } + Async::Ready(None) => Ok(Async::Ready(())), + }; + } } } diff --git a/talpid-core/src/routing/mod.rs b/talpid-core/src/routing/mod.rs index 1fdd818025..2651e9e2f7 100644 --- a/talpid-core/src/routing/mod.rs +++ b/talpid-core/src/routing/mod.rs @@ -1,107 +1,181 @@ +#![cfg_attr(target_os = "android", allow(dead_code))] +// TODO: remove the allow(dead_code) for android once it's up to scratch. +use futures::{sync::oneshot, Future}; use ipnetwork::IpNetwork; -use std::net::IpAddr; +use std::{collections::HashMap, net::IpAddr}; +use tokio_executor::Executor; #[cfg(target_os = "macos")] #[path = "macos.rs"] mod imp; #[cfg(target_os = "linux")] -#[path = "linux.rs"] +#[path = "linux/mod.rs"] mod imp; #[cfg(target_os = "android")] #[path = "android.rs"] mod imp; -pub use self::imp::Error; +pub use imp::Error as PlatformError; -#[cfg(any(target_os = "macos", target_os = "linux"))] -mod subprocess; +/// Errors that can be encountered whilst initializing RouteManager +#[derive(err_derive::Error, Debug)] +pub enum Error { + /// Platform sepcific error occured + #[error(display = "Failed to create route manager")] + FailedToInitializeManager(#[error(cause)] imp::Error), + /// Failed to spawn route manager future + #[error(display = "Failed to spawn route manager on the provided executor")] + FailedToSpawnManager, +} -/// A single route -#[derive(Hash, Eq, PartialEq)] -pub struct Route { - /// Route prefix - pub prefix: IpNetwork, - /// Route node - pub node: NetNode, +/// RouteManager applies a set of routes to the route table. +/// If a destination has to be routed through the default node, +/// the route will be adjusted dynamically when the default route changes. +pub struct RouteManager { + tx: Option<oneshot::Sender<oneshot::Sender<()>>>, } -impl Route { - /// Create a new route - pub fn new(prefix: IpNetwork, node: NetNode) -> Self { - Self { prefix, node } +impl RouteManager { + /// Constructs a RouteManager and applies the required routes. + /// Takes a map of network destinations and network nodes as an argument, and applies said + /// routes. + pub fn new( + required_routes: HashMap<IpNetwork, NetNode>, + exec: &mut impl Executor, + ) -> Result<Self, Error> { + let (tx, rx) = oneshot::channel(); + + + let route_manager = imp::RouteManagerImpl::new(required_routes, rx) + .map_err(Error::FailedToInitializeManager)?; + exec.spawn(Box::new( + route_manager.map_err(|e| log::error!("Routing manager failed - {}", e)), + )) + .map_err(|_| Error::FailedToSpawnManager)?; + + Ok(Self { tx: Some(tx) }) } -} -/// A network node for a given route -#[derive(Hash, Eq, PartialEq, Clone)] -pub enum NetNode { - /// For routing something through a network host - Address(IpAddr), - /// For routing something through an interface - Device(String), + /// Stops RouteManager and removes all of the applied routes. + pub fn stop(&mut self) { + if let Some(tx) = self.tx.take() { + let (wait_tx, wait_rx) = oneshot::channel(); + if let Err(_) = tx.send(wait_tx) { + log::error!("RouteManager already down!"); + return; + } + + if let Err(_) = wait_rx.wait() { + log::error!("RouteManager paniced while shutting down"); + } + } + } } -/// Contains a set of routes to be added -pub struct RequiredRoutes { - /// List of routes to be applied to the routing table. - pub routes: Vec<Route>, +impl Drop for RouteManager { + fn drop(&mut self) { + self.stop(); + } } -/// Manages adding and removing routes from the routing table. -pub struct RouteManager { - inner: imp::RouteManager, + +/// A netowrk route with a specific network node, destinaiton and an optional metric. +#[derive(Debug, Hash, Eq, PartialEq, Clone)] +struct Route { + node: Node, + prefix: IpNetwork, + metric: Option<u32>, } -impl RouteManager { - /// Creates a new RouteManager. - pub fn new() -> Result<Self, Error> { - Ok(RouteManager { - inner: imp::RouteManager::new()?, - }) +impl Route { + fn new(node: Node, prefix: IpNetwork) -> Self { + Self { + node, + prefix, + metric: None, + } } +} - /// Set routes in the routing table. - pub fn add_routes(&mut self, required_routes: RequiredRoutes) -> Result<(), Error> { - self.inner.add_routes(required_routes) - } +/// A network route that should be applied by the RouteManager. +/// It can either be routed through a specific network node or it can be routed through the current +/// default route. +#[derive(Debug, Hash, Eq, PartialEq, Clone)] +pub struct RequiredRoute { + prefix: IpNetwork, + node: NetNode, +} - /// Remove previously set routes from the routing table. - pub fn delete_routes(&mut self) -> Result<(), Error> { - self.inner.delete_routes() +impl RequiredRoute { + /// Constructs a new required route. + pub fn new(prefix: IpNetwork, node: impl Into<NetNode>) -> Self { + Self { + node: node.into(), + prefix, + } } +} - /// Retrieves the gateway for the default route. - pub fn get_default_route_node(&mut self) -> Result<std::net::IpAddr, Error> { - // use routing::RoutingT; - self.inner.get_default_route_node() - } +/// A NetNode represents a network node - either a real one or a symbolic default one. +/// A route with a symbolic default node will be changed whenever a new default route is created. +#[derive(Debug, Hash, Eq, PartialEq, Clone)] +pub enum NetNode { + /// A real node will be used to set a regular route that will remain unchanged for the lifetime + /// of the RouteManager + RealNode(Node), + /// A default node is a symbolic node that will resolve to the network node used in the current + /// most preferable default route + DefaultNode, } -impl Drop for RouteManager { - fn drop(&mut self) { - if let Err(e) = self.delete_routes() { - log::error!("Failed to reset routes on drop - {}", e); - } +impl From<Node> for NetNode { + fn from(node: Node) -> NetNode { + NetNode::RealNode(node) } } -/// This trait unifies platform specific implementations of route managers -pub trait RoutingT: Sized { - /// Error type of the implementation - type Error: std::error::Error; +/// Node represents a real network node - it can be identified by a network interface name, an IP +/// address or both. +#[derive(Debug, Hash, Eq, PartialEq, Clone)] +pub struct Node { + ip: Option<IpAddr>, + device: Option<String>, +} - /// Creates a new router - fn new() -> Result<Self, Self::Error>; +impl Node { + /// Construct an Node with both an IP address and an interface name. + pub fn new(address: IpAddr, iface_name: String) -> Self { + Self { + ip: Some(address), + device: Some(iface_name), + } + } + + /// Construct an Node from an IP address. + pub fn address(address: IpAddr) -> Node { + Self { + ip: Some(address), + device: None, + } + } - /// Adds routes to the system routing table. - fn add_routes(&mut self, required_routes: RequiredRoutes) -> Result<(), Self::Error>; + /// Construct a Node from a network interface name. + pub fn device(iface_name: String) -> Node { + Self { + ip: None, + device: Some(iface_name), + } + } - /// Removes previously set routes. If routes were set for specific tables, the whole tables - /// will be removed. - fn delete_routes(&mut self) -> Result<(), Self::Error>; + /// Retrieve a node's IP address + pub fn get_address(&self) -> Option<IpAddr> { + self.ip + } - /// Retrieves the gateway for the default route - fn get_default_route_node(&mut self) -> Result<std::net::IpAddr, Self::Error>; + /// Retrieve a node's network interface name + pub fn get_device(&self) -> Option<&str> { + self.device.as_ref().map(|s| s.as_ref()) + } } diff --git a/talpid-core/src/routing/subprocess.rs b/talpid-core/src/routing/subprocess.rs deleted file mode 100644 index 8b6292c1b4..0000000000 --- a/talpid-core/src/routing/subprocess.rs +++ /dev/null @@ -1,46 +0,0 @@ -use duct::Expression; -use std::ffi::{OsStr, OsString}; - -pub trait RunExpr: Sized { - fn run_expr(self) -> ::std::io::Result<()>; - fn stdout(self) -> ::std::io::Result<String>; -} - - -impl RunExpr for Expression { - fn run_expr(self) -> ::std::io::Result<()> { - log::trace!("Executing command - {:?}", self); - self.run().map(|_| ()) - } - - fn stdout(self) -> ::std::io::Result<String> { - log::trace!("Executing command - {:?}", self); - self.stdout_capture() - .run() - .map(|output| String::from_utf8_lossy(&output.stdout).into_owned()) - } -} - - -pub struct Exec { - cmd: OsString, - args: Vec<OsString>, -} - -impl Exec { - pub fn cmd<S: AsRef<OsStr>>(cmd: S) -> Exec { - Exec { - cmd: cmd.as_ref().to_owned(), - args: vec![], - } - } - - pub fn arg<S: AsRef<OsStr>>(mut self, arg: S) -> Exec { - self.args.push(arg.as_ref().to_owned()); - self - } - - pub fn into_expr(self) -> Expression { - duct::cmd(self.cmd, self.args) - } -} diff --git a/talpid-core/src/tunnel/wireguard/mod.rs b/talpid-core/src/tunnel/wireguard/mod.rs index 79034f22cd..24805f7dbe 100644 --- a/talpid-core/src/tunnel/wireguard/mod.rs +++ b/talpid-core/src/tunnel/wireguard/mod.rs @@ -3,7 +3,7 @@ use self::config::Config; use super::{TunnelEvent, TunnelMetadata}; use crate::routing; -use std::{io, path::Path, sync::mpsc}; +use std::{collections::HashMap, io, path::Path, sync::mpsc}; pub mod config; mod ping_monitor; @@ -53,7 +53,7 @@ pub struct WireguardMonitor { /// Tunnel implementation tunnel: Box<dyn Tunnel>, /// Route manager - router: routing::RouteManager, + route_handle: routing::RouteManager, /// Callback to signal tunnel events event_callback: Box<dyn Fn(TunnelEvent) + Send + Sync + 'static>, close_msg_sender: mpsc::Sender<CloseMsg>, @@ -67,17 +67,21 @@ impl WireguardMonitor { on_event: F, ) -> Result<WireguardMonitor> { let tunnel = Box::new(WgGoTunnel::start_tunnel(&config, log_path)?); - let router = routing::RouteManager::new().map_err(Error::SetupRoutingError)?; + let iface_name = tunnel.get_interface_name(); + let route_handle = routing::RouteManager::new( + Self::get_routes(iface_name, &config), + &mut tokio_executor::DefaultExecutor::current(), + ) + .map_err(Error::SetupRoutingError)?; let event_callback = Box::new(on_event.clone()); let (close_msg_sender, close_msg_receiver) = mpsc::channel(); - let mut monitor = WireguardMonitor { + let monitor = WireguardMonitor { tunnel, - router, + route_handle, event_callback, close_msg_sender, close_msg_receiver, }; - monitor.setup_routing(&config)?; let metadata = monitor.tunnel_metadata(&config); let iface_name = monitor.tunnel.get_interface_name().to_string(); @@ -101,7 +105,6 @@ impl WireguardMonitor { let _ = close_sender.send(CloseMsg::PingErr); }); - Ok(monitor) } @@ -121,9 +124,7 @@ impl WireguardMonitor { // Clear routes manually - otherwise there will be some log spam since the tunnel device // can be removed before the routes are cleared, which automatically clears some of the // routes that were set. - if let Err(e) = self.router.delete_routes() { - log::error!("Failed to remove a route from the routing table - {}", e); - } + self.route_handle.stop(); if let Err(e) = self.tunnel.stop() { log::error!("Failed to stop tunnel - {}", e); @@ -132,40 +133,44 @@ impl WireguardMonitor { wait_result } - fn setup_routing(&mut self, config: &Config) -> Result<()> { - let iface_name = self.tunnel.get_interface_name(); - let mut routes: Vec<_> = config + fn get_routes( + iface_name: &str, + config: &Config, + ) -> HashMap<ipnetwork::IpNetwork, crate::routing::NetNode> { + let node = routing::Node::device(iface_name.to_string()); + let mut routes: HashMap<_, _> = config .peers .iter() .flat_map(|peer| peer.allowed_ips.iter()) .cloned() - .map(|allowed_ip| { - routing::Route::new(allowed_ip, routing::NetNode::Device(iface_name.to_string())) + .flat_map(|allowed_ip| { + if allowed_ip.prefix() == 0 { + if allowed_ip.is_ipv4() { + vec![ + ("0.0.0.0/1".parse().unwrap(), node.clone().into()), + ("128.0.0.0/1".parse().unwrap(), node.clone().into()), + ] + } else { + vec![ + ("8000::/1".parse().unwrap(), node.clone().into()), + ("::/1".parse().unwrap(), node.clone().into()), + ] + } + } else { + vec![(allowed_ip, node.clone().into())] + } }) .collect(); - // To survive network roaming, we should listen for new routes and reapply them - // here - probably would need RouteManager be extended. Or maybe RouteManager can deal - // with it on it's own - let default_node = self - .router - .get_default_route_node() - .map_err(Error::SetupRoutingError)?; - // route endpoints with specific routes for peer in config.peers.iter() { - let default_route = routing::Route::new( + routes.insert( peer.endpoint.ip().into(), - routing::NetNode::Address(default_node), + routing::NetNode::DefaultNode.into(), ); - routes.push(default_route); } - let required_routes = routing::RequiredRoutes { routes }; - - self.router - .add_routes(required_routes) - .map_err(Error::SetupRoutingError) + routes } fn tunnel_metadata(&self, config: &Config) -> TunnelMetadata { @@ -189,7 +194,6 @@ pub struct CloseHandle { chan: mpsc::Sender<CloseMsg>, } - impl CloseHandle { pub fn close(&mut self) { if let Err(e) = self.chan.send(CloseMsg::Stop) { |
