diff options
| author | David Lönnhager <david.l@mullvad.net> | 2020-05-06 20:29:29 +0200 |
|---|---|---|
| committer | David Lönnhager <david.l@mullvad.net> | 2020-05-13 17:56:36 +0200 |
| commit | 3369c5116d10b53a2fcbcdf6173cee61fcd5d483 (patch) | |
| tree | 474c9589864a69fc871da78eac6a18554a79d596 /talpid-core/src | |
| parent | ba215529bbf0c6bbe708c538d0260f565833d77d (diff) | |
| download | mullvadvpn-3369c5116d10b53a2fcbcdf6173cee61fcd5d483.tar.xz mullvadvpn-3369c5116d10b53a2fcbcdf6173cee61fcd5d483.zip | |
Add RouteManagerCommand
Diffstat (limited to 'talpid-core/src')
| -rw-r--r-- | talpid-core/src/routing/linux.rs | 57 | ||||
| -rw-r--r-- | talpid-core/src/routing/unix.rs | 31 |
2 files changed, 60 insertions, 28 deletions
diff --git a/talpid-core/src/routing/linux.rs b/talpid-core/src/routing/linux.rs index 124c25e319..f128c71fc9 100644 --- a/talpid-core/src/routing/linux.rs +++ b/talpid-core/src/routing/linux.rs @@ -1,18 +1,16 @@ -use crate::routing::{NetNode, Node, RequiredRoute, Route}; +use crate::routing::{imp::RouteManagerCommand, NetNode, Node, RequiredRoute, Route}; use ipnetwork::IpNetwork; use std::{ collections::{BTreeMap, HashSet}, io, net::IpAddr, + thread, }; -use futures01::sync::oneshot as old_oneshot; +use futures01::{stream::Stream as old_stream, sync::mpsc as old_mpsc}; -use futures::{ - channel::mpsc::UnboundedReceiver, compat::Future01CompatExt, future::FutureExt, StreamExt, - TryStreamExt, -}; +use futures::{channel::mpsc::UnboundedReceiver, future::FutureExt, StreamExt, TryStreamExt}; use netlink_packet_route::{ @@ -65,7 +63,7 @@ pub enum Error { } pub struct RouteManagerImpl { - shutdown_rx: old_oneshot::Receiver<old_oneshot::Sender<()>>, + manage_rx: old_mpsc::UnboundedReceiver<RouteManagerCommand>, manager: RouteManagerImplInner, runtime: tokio02::runtime::Runtime, } @@ -74,7 +72,7 @@ impl RouteManagerImpl { /// Creates a new RouteManagerImplInner. pub fn new( required_routes: HashSet<RequiredRoute>, - shutdown_rx: old_oneshot::Receiver<old_oneshot::Sender<()>>, + manage_rx: old_mpsc::UnboundedReceiver<RouteManagerCommand>, ) -> Result<Self> { let mut runtime = tokio02::runtime::Builder::new() .basic_scheduler() @@ -87,7 +85,7 @@ impl RouteManagerImpl { let manager = runtime.block_on(RouteManagerImplInner::new(required_routes))?; Ok(Self { - shutdown_rx, + manage_rx, runtime, manager, }) @@ -95,11 +93,28 @@ impl RouteManagerImpl { pub fn wait(self) -> Result<()> { let Self { - shutdown_rx, + manage_rx, mut runtime, manager, } = self; - runtime.block_on(manager.into_future(shutdown_rx)) + + let (new_manage_tx, new_manage_rx) = futures::channel::mpsc::unbounded(); + + thread::spawn(move || { + for msg in manage_rx.wait() { + match msg { + Ok(msg) => { + if new_manage_tx.unbounded_send(msg).is_err() { + log::error!("RouteManager receiver unexpectedly dropped"); + break; + } + } + Err(_) => break, + } + } + }); + + runtime.block_on(manager.into_future(new_manage_rx)) } } @@ -159,7 +174,6 @@ impl RouteManagerImplInner { } } - let mut monitor = Self { iface_map, handle, @@ -398,19 +412,20 @@ impl RouteManagerImplInner { pub async fn into_future( mut self, - shutdown_rx: futures01::sync::oneshot::Receiver<futures01::sync::oneshot::Sender<()>>, + mut manage_rx: UnboundedReceiver<RouteManagerCommand>, ) -> Result<()> { - let mut shutdown = shutdown_rx.compat().fuse(); loop { futures::select! { - shutdown_signal = shutdown => { - log::trace!("Shutting down route manager"); - self.cleanup_routes().await; - log::trace!("Route manager done"); - if let Ok(shutdown_signal) = shutdown_signal { - let _ = shutdown_signal.send(()); + command = manage_rx.select_next_some().fuse() => { + match command { + RouteManagerCommand::Shutdown(shutdown_signal) => { + log::trace!("Shutting down route manager"); + self.cleanup_routes().await; + log::trace!("Route manager done"); + let _ = shutdown_signal.send(()); + return Ok(()); + } } - return Ok(()); }, (route_change, socket) = self.messages.select_next_some().fuse() => { self.process_netlink_message(route_change).await?; diff --git a/talpid-core/src/routing/unix.rs b/talpid-core/src/routing/unix.rs index 8dbfdf4d56..5b123d180f 100644 --- a/talpid-core/src/routing/unix.rs +++ b/talpid-core/src/routing/unix.rs @@ -2,7 +2,13 @@ #![cfg_attr(target_os = "windows", allow(dead_code))] // TODO: remove the allow(dead_code) for android once it's up to scratch. use super::RequiredRoute; -use futures01::{sync::oneshot, Future}; +use futures01::{ + sync::{ + mpsc::{unbounded, UnboundedSender}, + oneshot, + }, + Future, +}; use std::{collections::HashSet, sync::mpsc::sync_channel}; #[cfg(target_os = "macos")] @@ -33,11 +39,16 @@ pub enum Error { FailedToSpawnManager, } +#[derive(Debug)] +pub enum RouteManagerCommand { + Shutdown(oneshot::Sender<()>), +} + /// RouteManager applies a set of routes to the route table. /// If a destination has to be routed through the default node, /// the route will be adjusted dynamically when the default route changes. pub struct RouteManager { - tx: Option<oneshot::Sender<oneshot::Sender<()>>>, + manage_tx: Option<UnboundedSender<RouteManagerCommand>>, } impl RouteManager { @@ -45,11 +56,11 @@ impl RouteManager { /// Takes a set of network destinations and network nodes as an argument, and applies said /// routes. pub fn new(required_routes: HashSet<RequiredRoute>) -> Result<Self, Error> { - let (tx, rx) = oneshot::channel(); + let (manage_tx, manage_rx) = unbounded(); let (start_tx, start_rx) = sync_channel(1); std::thread::spawn( - move || match imp::RouteManagerImpl::new(required_routes, rx) { + move || match imp::RouteManagerImpl::new(required_routes, manage_rx) { Ok(route_manager) => { let _ = start_tx.send(Ok(())); if let Err(e) = route_manager.wait() { @@ -62,7 +73,9 @@ impl RouteManager { }, ); match start_rx.recv() { - Ok(Ok(())) => Ok(Self { tx: Some(tx) }), + Ok(Ok(())) => Ok(Self { + manage_tx: Some(manage_tx), + }), Ok(Err(e)) => Err(e), Err(_) => Err(Error::RoutingManagerThreadPanic), } @@ -70,9 +83,13 @@ impl RouteManager { /// Stops RouteManager and removes all of the applied routes. pub fn stop(&mut self) { - if let Some(tx) = self.tx.take() { + if let Some(tx) = self.manage_tx.take() { let (wait_tx, wait_rx) = oneshot::channel(); - if tx.send(wait_tx).is_err() { + + if tx + .unbounded_send(RouteManagerCommand::Shutdown(wait_tx)) + .is_err() + { log::error!("RouteManager already down!"); return; } |
