diff options
| author | Emīls <emils@mullvad.net> | 2020-06-04 17:02:58 +0100 |
|---|---|---|
| committer | Emīls <emils@mullvad.net> | 2020-06-05 15:02:29 +0100 |
| commit | 7e988ff92914d7582ca6cb7dc92ec844fa6edcdc (patch) | |
| tree | f9c4ee99b28813e5e1793d27a53f3d532f40759b /talpid-core | |
| parent | 6f70216af1c0727b12ba578991b4908727798342 (diff) | |
| download | mullvadvpn-7e988ff92914d7582ca6cb7dc92ec844fa6edcdc.tar.xz mullvadvpn-7e988ff92914d7582ca6cb7dc92ec844fa6edcdc.zip | |
Simplify routing code on Linux
Diffstat (limited to 'talpid-core')
| -rw-r--r-- | talpid-core/src/routing/linux.rs | 83 | ||||
| -rw-r--r-- | talpid-core/src/routing/unix.rs | 6 |
2 files changed, 12 insertions, 77 deletions
diff --git a/talpid-core/src/routing/linux.rs b/talpid-core/src/routing/linux.rs index 71becd2a63..ba5ffb66a6 100644 --- a/talpid-core/src/routing/linux.rs +++ b/talpid-core/src/routing/linux.rs @@ -13,16 +13,9 @@ use std::{ io::{self, BufRead, BufReader, Read, Seek, Write}, net::{IpAddr, Ipv4Addr}, process::Command, - thread, }; -use futures01::{stream::Stream as old_stream, sync::mpsc as old_mpsc}; - -use futures::{ - channel::mpsc::{self, UnboundedReceiver}, - future::FutureExt, - StreamExt, TryStreamExt, -}; +use futures::{channel::mpsc::UnboundedReceiver, future::FutureExt, StreamExt, TryStreamExt}; use netlink_packet_route::{ @@ -96,69 +89,13 @@ pub enum Error { IpFailed, } -pub struct RouteManagerImpl { - manage_rx: old_mpsc::UnboundedReceiver<RouteManagerCommand>, - manager: RouteManagerImplInner, - runtime: tokio02::runtime::Runtime, -} - -impl RouteManagerImpl { - /// Creates a new RouteManagerImplInner. - pub fn new( - required_routes: HashSet<RequiredRoute>, - manage_rx: old_mpsc::UnboundedReceiver<RouteManagerCommand>, - ) -> Result<Self> { - let mut runtime = tokio02::runtime::Builder::new() - .basic_scheduler() - .core_threads(1) - .enable_all() - .thread_name("mullvad-route-manager-event-loop") - .build() - .map_err(Error::EventLoopError)?; - - let manager = runtime.block_on(RouteManagerImplInner::new(required_routes))?; - - Ok(Self { - manage_rx, - runtime, - manager, - }) - } - - pub fn wait(self) -> Result<()> { - let Self { - manage_rx, - mut runtime, - manager, - } = self; - - let (new_manage_tx, new_manage_rx) = mpsc::unbounded(); - - thread::spawn(move || { - for msg in manage_rx.wait() { - match msg { - Ok(msg) => { - if new_manage_tx.unbounded_send(msg).is_err() { - log::error!("RouteManager receiver unexpectedly dropped"); - break; - } - } - Err(_) => break, - } - } - }); - - runtime.block_on(manager.into_future(new_manage_rx)) - } -} - #[derive(Debug, Clone, Copy, Hash, PartialEq, Eq, PartialOrd, Ord)] struct RequiredDefaultRoute { table_id: u8, destination: IpNetwork, } -pub struct RouteManagerImplInner { +pub struct RouteManagerImpl { handle: Handle, messages: UnboundedReceiver<(NetlinkMessage<RtnlMessage>, SocketAddr)>, iface_map: BTreeMap<u32, String>, @@ -175,7 +112,7 @@ pub struct RouteManagerImplInner { split_table_id: i32, } -impl RouteManagerImplInner { +impl RouteManagerImpl { pub async fn new(required_routes: HashSet<RequiredRoute>) -> Result<Self> { let (mut connection, handle, messages) = rtnetlink::new_connection().map_err(Error::ConnectError)?; @@ -627,13 +564,11 @@ impl RouteManagerImplInner { } - pub async fn into_future( - mut self, - mut manage_rx: UnboundedReceiver<RouteManagerCommand>, - ) -> Result<()> { + pub async fn run(mut self, manage_rx: UnboundedReceiver<RouteManagerCommand>) -> Result<()> { + let mut manage_rx = manage_rx.fuse(); loop { futures::select! { - command = manage_rx.select_next_some().fuse() => { + command = manage_rx.select_next_some() => { self.process_command(command).await?; }, (route_change, socket) = self.messages.select_next_some().fuse() => { @@ -932,7 +867,7 @@ impl RouteManagerImplInner { } } -impl Drop for RouteManagerImplInner { +impl Drop for RouteManagerImpl { fn drop(&mut self) { futures::executor::block_on(self.cleanup_routes()) } @@ -970,7 +905,7 @@ mod test { fn test_drop_in_executor() { let mut runtime = tokio02::runtime::Runtime::new().expect("Failed to initialize runtime"); runtime.block_on(async { - let manager = RouteManagerImplInner::new(HashSet::new()) + let manager = RouteManagerImpl::new(HashSet::new()) .await .expect("Failed to initialize route manager"); std::mem::drop(manager); @@ -982,7 +917,7 @@ mod test { fn test_drop() { let mut runtime = tokio02::runtime::Runtime::new().expect("Failed to initialize runtime"); let manager = runtime.block_on(async { - RouteManagerImplInner::new(HashSet::new()) + RouteManagerImpl::new(HashSet::new()) .await .expect("Failed to initialize route manager") }); diff --git a/talpid-core/src/routing/unix.rs b/talpid-core/src/routing/unix.rs index 90bbc19b03..38896a4392 100644 --- a/talpid-core/src/routing/unix.rs +++ b/talpid-core/src/routing/unix.rs @@ -148,7 +148,7 @@ impl RouteManager { /// Route PID-associated packets through the physical interface. #[cfg(target_os = "linux")] - pub fn enable_exclusions_routes(&self) -> Result<(), Error> { + pub fn enable_exclusions_routes(&mut self) -> Result<(), Error> { if let Some(tx) = &self.manage_tx { let (result_tx, result_rx) = oneshot::channel(); if tx @@ -158,7 +158,7 @@ impl RouteManager { return Err(Error::RouteManagerDown); } - match result_rx.wait() { + match self.runtime.block_on(result_rx) { Ok(result) => result.map_err(Error::PlatformError), Err(error) => { log::trace!("{}", error.display_chain_with_msg("channel is closed")); @@ -206,7 +206,7 @@ impl RouteManager { return Err(Error::RouteManagerDown); } - match result_rx.wait() { + match self.runtime.block_on(result_rx) { Ok(result) => result.map_err(Error::PlatformError), Err(error) => { log::trace!("{}", error.display_chain_with_msg("channel is closed")); |
