summaryrefslogtreecommitdiffhomepage
path: root/talpid-core
diff options
context:
space:
mode:
authorEmīls <emils@mullvad.net>2020-06-04 17:02:58 +0100
committerEmīls <emils@mullvad.net>2020-06-05 15:02:29 +0100
commit7e988ff92914d7582ca6cb7dc92ec844fa6edcdc (patch)
treef9c4ee99b28813e5e1793d27a53f3d532f40759b /talpid-core
parent6f70216af1c0727b12ba578991b4908727798342 (diff)
downloadmullvadvpn-7e988ff92914d7582ca6cb7dc92ec844fa6edcdc.tar.xz
mullvadvpn-7e988ff92914d7582ca6cb7dc92ec844fa6edcdc.zip
Simplify routing code on Linux
Diffstat (limited to 'talpid-core')
-rw-r--r--talpid-core/src/routing/linux.rs83
-rw-r--r--talpid-core/src/routing/unix.rs6
2 files changed, 12 insertions, 77 deletions
diff --git a/talpid-core/src/routing/linux.rs b/talpid-core/src/routing/linux.rs
index 71becd2a63..ba5ffb66a6 100644
--- a/talpid-core/src/routing/linux.rs
+++ b/talpid-core/src/routing/linux.rs
@@ -13,16 +13,9 @@ use std::{
io::{self, BufRead, BufReader, Read, Seek, Write},
net::{IpAddr, Ipv4Addr},
process::Command,
- thread,
};
-use futures01::{stream::Stream as old_stream, sync::mpsc as old_mpsc};
-
-use futures::{
- channel::mpsc::{self, UnboundedReceiver},
- future::FutureExt,
- StreamExt, TryStreamExt,
-};
+use futures::{channel::mpsc::UnboundedReceiver, future::FutureExt, StreamExt, TryStreamExt};
use netlink_packet_route::{
@@ -96,69 +89,13 @@ pub enum Error {
IpFailed,
}
-pub struct RouteManagerImpl {
- manage_rx: old_mpsc::UnboundedReceiver<RouteManagerCommand>,
- manager: RouteManagerImplInner,
- runtime: tokio02::runtime::Runtime,
-}
-
-impl RouteManagerImpl {
- /// Creates a new RouteManagerImplInner.
- pub fn new(
- required_routes: HashSet<RequiredRoute>,
- manage_rx: old_mpsc::UnboundedReceiver<RouteManagerCommand>,
- ) -> Result<Self> {
- let mut runtime = tokio02::runtime::Builder::new()
- .basic_scheduler()
- .core_threads(1)
- .enable_all()
- .thread_name("mullvad-route-manager-event-loop")
- .build()
- .map_err(Error::EventLoopError)?;
-
- let manager = runtime.block_on(RouteManagerImplInner::new(required_routes))?;
-
- Ok(Self {
- manage_rx,
- runtime,
- manager,
- })
- }
-
- pub fn wait(self) -> Result<()> {
- let Self {
- manage_rx,
- mut runtime,
- manager,
- } = self;
-
- let (new_manage_tx, new_manage_rx) = mpsc::unbounded();
-
- thread::spawn(move || {
- for msg in manage_rx.wait() {
- match msg {
- Ok(msg) => {
- if new_manage_tx.unbounded_send(msg).is_err() {
- log::error!("RouteManager receiver unexpectedly dropped");
- break;
- }
- }
- Err(_) => break,
- }
- }
- });
-
- runtime.block_on(manager.into_future(new_manage_rx))
- }
-}
-
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq, PartialOrd, Ord)]
struct RequiredDefaultRoute {
table_id: u8,
destination: IpNetwork,
}
-pub struct RouteManagerImplInner {
+pub struct RouteManagerImpl {
handle: Handle,
messages: UnboundedReceiver<(NetlinkMessage<RtnlMessage>, SocketAddr)>,
iface_map: BTreeMap<u32, String>,
@@ -175,7 +112,7 @@ pub struct RouteManagerImplInner {
split_table_id: i32,
}
-impl RouteManagerImplInner {
+impl RouteManagerImpl {
pub async fn new(required_routes: HashSet<RequiredRoute>) -> Result<Self> {
let (mut connection, handle, messages) =
rtnetlink::new_connection().map_err(Error::ConnectError)?;
@@ -627,13 +564,11 @@ impl RouteManagerImplInner {
}
- pub async fn into_future(
- mut self,
- mut manage_rx: UnboundedReceiver<RouteManagerCommand>,
- ) -> Result<()> {
+ pub async fn run(mut self, manage_rx: UnboundedReceiver<RouteManagerCommand>) -> Result<()> {
+ let mut manage_rx = manage_rx.fuse();
loop {
futures::select! {
- command = manage_rx.select_next_some().fuse() => {
+ command = manage_rx.select_next_some() => {
self.process_command(command).await?;
},
(route_change, socket) = self.messages.select_next_some().fuse() => {
@@ -932,7 +867,7 @@ impl RouteManagerImplInner {
}
}
-impl Drop for RouteManagerImplInner {
+impl Drop for RouteManagerImpl {
fn drop(&mut self) {
futures::executor::block_on(self.cleanup_routes())
}
@@ -970,7 +905,7 @@ mod test {
fn test_drop_in_executor() {
let mut runtime = tokio02::runtime::Runtime::new().expect("Failed to initialize runtime");
runtime.block_on(async {
- let manager = RouteManagerImplInner::new(HashSet::new())
+ let manager = RouteManagerImpl::new(HashSet::new())
.await
.expect("Failed to initialize route manager");
std::mem::drop(manager);
@@ -982,7 +917,7 @@ mod test {
fn test_drop() {
let mut runtime = tokio02::runtime::Runtime::new().expect("Failed to initialize runtime");
let manager = runtime.block_on(async {
- RouteManagerImplInner::new(HashSet::new())
+ RouteManagerImpl::new(HashSet::new())
.await
.expect("Failed to initialize route manager")
});
diff --git a/talpid-core/src/routing/unix.rs b/talpid-core/src/routing/unix.rs
index 90bbc19b03..38896a4392 100644
--- a/talpid-core/src/routing/unix.rs
+++ b/talpid-core/src/routing/unix.rs
@@ -148,7 +148,7 @@ impl RouteManager {
/// Route PID-associated packets through the physical interface.
#[cfg(target_os = "linux")]
- pub fn enable_exclusions_routes(&self) -> Result<(), Error> {
+ pub fn enable_exclusions_routes(&mut self) -> Result<(), Error> {
if let Some(tx) = &self.manage_tx {
let (result_tx, result_rx) = oneshot::channel();
if tx
@@ -158,7 +158,7 @@ impl RouteManager {
return Err(Error::RouteManagerDown);
}
- match result_rx.wait() {
+ match self.runtime.block_on(result_rx) {
Ok(result) => result.map_err(Error::PlatformError),
Err(error) => {
log::trace!("{}", error.display_chain_with_msg("channel is closed"));
@@ -206,7 +206,7 @@ impl RouteManager {
return Err(Error::RouteManagerDown);
}
- match result_rx.wait() {
+ match self.runtime.block_on(result_rx) {
Ok(result) => result.map_err(Error::PlatformError),
Err(error) => {
log::trace!("{}", error.display_chain_with_msg("channel is closed"));