summaryrefslogtreecommitdiffhomepage
path: root/talpid-core/src
diff options
context:
space:
mode:
authorDavid Lönnhager <david.l@mullvad.net>2020-05-06 20:29:29 +0200
committerDavid Lönnhager <david.l@mullvad.net>2020-05-13 17:56:36 +0200
commit3369c5116d10b53a2fcbcdf6173cee61fcd5d483 (patch)
tree474c9589864a69fc871da78eac6a18554a79d596 /talpid-core/src
parentba215529bbf0c6bbe708c538d0260f565833d77d (diff)
downloadmullvadvpn-3369c5116d10b53a2fcbcdf6173cee61fcd5d483.tar.xz
mullvadvpn-3369c5116d10b53a2fcbcdf6173cee61fcd5d483.zip
Add RouteManagerCommand
Diffstat (limited to 'talpid-core/src')
-rw-r--r--talpid-core/src/routing/linux.rs57
-rw-r--r--talpid-core/src/routing/unix.rs31
2 files changed, 60 insertions, 28 deletions
diff --git a/talpid-core/src/routing/linux.rs b/talpid-core/src/routing/linux.rs
index 124c25e319..f128c71fc9 100644
--- a/talpid-core/src/routing/linux.rs
+++ b/talpid-core/src/routing/linux.rs
@@ -1,18 +1,16 @@
-use crate::routing::{NetNode, Node, RequiredRoute, Route};
+use crate::routing::{imp::RouteManagerCommand, NetNode, Node, RequiredRoute, Route};
use ipnetwork::IpNetwork;
use std::{
collections::{BTreeMap, HashSet},
io,
net::IpAddr,
+ thread,
};
-use futures01::sync::oneshot as old_oneshot;
+use futures01::{stream::Stream as old_stream, sync::mpsc as old_mpsc};
-use futures::{
- channel::mpsc::UnboundedReceiver, compat::Future01CompatExt, future::FutureExt, StreamExt,
- TryStreamExt,
-};
+use futures::{channel::mpsc::UnboundedReceiver, future::FutureExt, StreamExt, TryStreamExt};
use netlink_packet_route::{
@@ -65,7 +63,7 @@ pub enum Error {
}
pub struct RouteManagerImpl {
- shutdown_rx: old_oneshot::Receiver<old_oneshot::Sender<()>>,
+ manage_rx: old_mpsc::UnboundedReceiver<RouteManagerCommand>,
manager: RouteManagerImplInner,
runtime: tokio02::runtime::Runtime,
}
@@ -74,7 +72,7 @@ impl RouteManagerImpl {
/// Creates a new RouteManagerImplInner.
pub fn new(
required_routes: HashSet<RequiredRoute>,
- shutdown_rx: old_oneshot::Receiver<old_oneshot::Sender<()>>,
+ manage_rx: old_mpsc::UnboundedReceiver<RouteManagerCommand>,
) -> Result<Self> {
let mut runtime = tokio02::runtime::Builder::new()
.basic_scheduler()
@@ -87,7 +85,7 @@ impl RouteManagerImpl {
let manager = runtime.block_on(RouteManagerImplInner::new(required_routes))?;
Ok(Self {
- shutdown_rx,
+ manage_rx,
runtime,
manager,
})
@@ -95,11 +93,28 @@ impl RouteManagerImpl {
pub fn wait(self) -> Result<()> {
let Self {
- shutdown_rx,
+ manage_rx,
mut runtime,
manager,
} = self;
- runtime.block_on(manager.into_future(shutdown_rx))
+
+ let (new_manage_tx, new_manage_rx) = futures::channel::mpsc::unbounded();
+
+ thread::spawn(move || {
+ for msg in manage_rx.wait() {
+ match msg {
+ Ok(msg) => {
+ if new_manage_tx.unbounded_send(msg).is_err() {
+ log::error!("RouteManager receiver unexpectedly dropped");
+ break;
+ }
+ }
+ Err(_) => break,
+ }
+ }
+ });
+
+ runtime.block_on(manager.into_future(new_manage_rx))
}
}
@@ -159,7 +174,6 @@ impl RouteManagerImplInner {
}
}
-
let mut monitor = Self {
iface_map,
handle,
@@ -398,19 +412,20 @@ impl RouteManagerImplInner {
pub async fn into_future(
mut self,
- shutdown_rx: futures01::sync::oneshot::Receiver<futures01::sync::oneshot::Sender<()>>,
+ mut manage_rx: UnboundedReceiver<RouteManagerCommand>,
) -> Result<()> {
- let mut shutdown = shutdown_rx.compat().fuse();
loop {
futures::select! {
- shutdown_signal = shutdown => {
- log::trace!("Shutting down route manager");
- self.cleanup_routes().await;
- log::trace!("Route manager done");
- if let Ok(shutdown_signal) = shutdown_signal {
- let _ = shutdown_signal.send(());
+ command = manage_rx.select_next_some().fuse() => {
+ match command {
+ RouteManagerCommand::Shutdown(shutdown_signal) => {
+ log::trace!("Shutting down route manager");
+ self.cleanup_routes().await;
+ log::trace!("Route manager done");
+ let _ = shutdown_signal.send(());
+ return Ok(());
+ }
}
- return Ok(());
},
(route_change, socket) = self.messages.select_next_some().fuse() => {
self.process_netlink_message(route_change).await?;
diff --git a/talpid-core/src/routing/unix.rs b/talpid-core/src/routing/unix.rs
index 8dbfdf4d56..5b123d180f 100644
--- a/talpid-core/src/routing/unix.rs
+++ b/talpid-core/src/routing/unix.rs
@@ -2,7 +2,13 @@
#![cfg_attr(target_os = "windows", allow(dead_code))]
// TODO: remove the allow(dead_code) for android once it's up to scratch.
use super::RequiredRoute;
-use futures01::{sync::oneshot, Future};
+use futures01::{
+ sync::{
+ mpsc::{unbounded, UnboundedSender},
+ oneshot,
+ },
+ Future,
+};
use std::{collections::HashSet, sync::mpsc::sync_channel};
#[cfg(target_os = "macos")]
@@ -33,11 +39,16 @@ pub enum Error {
FailedToSpawnManager,
}
+#[derive(Debug)]
+pub enum RouteManagerCommand {
+ Shutdown(oneshot::Sender<()>),
+}
+
/// RouteManager applies a set of routes to the route table.
/// If a destination has to be routed through the default node,
/// the route will be adjusted dynamically when the default route changes.
pub struct RouteManager {
- tx: Option<oneshot::Sender<oneshot::Sender<()>>>,
+ manage_tx: Option<UnboundedSender<RouteManagerCommand>>,
}
impl RouteManager {
@@ -45,11 +56,11 @@ impl RouteManager {
/// Takes a set of network destinations and network nodes as an argument, and applies said
/// routes.
pub fn new(required_routes: HashSet<RequiredRoute>) -> Result<Self, Error> {
- let (tx, rx) = oneshot::channel();
+ let (manage_tx, manage_rx) = unbounded();
let (start_tx, start_rx) = sync_channel(1);
std::thread::spawn(
- move || match imp::RouteManagerImpl::new(required_routes, rx) {
+ move || match imp::RouteManagerImpl::new(required_routes, manage_rx) {
Ok(route_manager) => {
let _ = start_tx.send(Ok(()));
if let Err(e) = route_manager.wait() {
@@ -62,7 +73,9 @@ impl RouteManager {
},
);
match start_rx.recv() {
- Ok(Ok(())) => Ok(Self { tx: Some(tx) }),
+ Ok(Ok(())) => Ok(Self {
+ manage_tx: Some(manage_tx),
+ }),
Ok(Err(e)) => Err(e),
Err(_) => Err(Error::RoutingManagerThreadPanic),
}
@@ -70,9 +83,13 @@ impl RouteManager {
/// Stops RouteManager and removes all of the applied routes.
pub fn stop(&mut self) {
- if let Some(tx) = self.tx.take() {
+ if let Some(tx) = self.manage_tx.take() {
let (wait_tx, wait_rx) = oneshot::channel();
- if tx.send(wait_tx).is_err() {
+
+ if tx
+ .unbounded_send(RouteManagerCommand::Shutdown(wait_tx))
+ .is_err()
+ {
log::error!("RouteManager already down!");
return;
}