summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorDavid Lönnhager <david.l@mullvad.net>2023-10-11 17:11:23 +0200
committerDavid Lönnhager <david.l@mullvad.net>2023-10-11 17:11:23 +0200
commit51e06a48b68c7aeb2fd2c19205a107c030ac649e (patch)
tree274a3118b124eee271cdd940fc5dffb77ed08078
parent9383325ea310ded42066b68402c1c3fc7ff63955 (diff)
parent968c1ebaf942976755974905553cde8f20cb7678 (diff)
downloadmullvadvpn-51e06a48b68c7aeb2fd2c19205a107c030ac649e.tar.xz
mullvadvpn-51e06a48b68c7aeb2fd2c19205a107c030ac649e.zip
Merge branch 'weekly-cleanup/rewrite-talpid-openvpn-to-be-more-async-des-285' into main
-rw-r--r--Cargo.lock2
-rw-r--r--Cargo.toml1
-rw-r--r--android/translations-converter/Cargo.toml2
-rw-r--r--mullvad-api/Cargo.toml2
-rw-r--r--mullvad-daemon/Cargo.toml2
-rw-r--r--mullvad-management-interface/Cargo.toml2
-rw-r--r--mullvad-paths/Cargo.toml2
-rw-r--r--mullvad-problem-report/Cargo.toml2
-rw-r--r--mullvad-relay-selector/Cargo.toml2
-rw-r--r--mullvad-setup/Cargo.toml2
-rw-r--r--mullvad-types/Cargo.toml2
-rw-r--r--talpid-core/Cargo.toml5
-rw-r--r--talpid-dbus/Cargo.toml2
-rw-r--r--talpid-openvpn/Cargo.toml4
-rw-r--r--talpid-openvpn/src/lib.rs286
-rw-r--r--talpid-openvpn/src/process/mod.rs3
-rw-r--r--talpid-openvpn/src/process/openvpn.rs85
-rw-r--r--talpid-openvpn/src/process/stoppable_process.rs53
-rw-r--r--talpid-routing/Cargo.toml2
-rw-r--r--talpid-wireguard/Cargo.toml2
20 files changed, 199 insertions, 264 deletions
diff --git a/Cargo.lock b/Cargo.lock
index b3b830e0ce..c398c67f7a 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3487,10 +3487,8 @@ name = "talpid-openvpn"
version = "0.0.0"
dependencies = [
"async-trait",
- "duct",
"err-derive",
"futures",
- "is-terminal",
"log",
"once_cell",
"os_pipe",
diff --git a/Cargo.toml b/Cargo.toml
index ffa8d8784a..f68e491d9f 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -60,6 +60,7 @@ windows-sys = "0.48.0"
chrono = { version = "0.4.26", default-features = false}
clap = { version = "4.2.7", features = ["cargo", "derive"] }
+once_cell = "1.13"
[profile.release]
diff --git a/android/translations-converter/Cargo.toml b/android/translations-converter/Cargo.toml
index d0868d70e1..1c3b89e1d8 100644
--- a/android/translations-converter/Cargo.toml
+++ b/android/translations-converter/Cargo.toml
@@ -11,7 +11,7 @@ publish.workspace = true
[dependencies]
err-derive = { workspace = true }
htmlize = { version = "1.0.2", features = ["unescape"] }
-once_cell = "1.13"
+once_cell = { workspace = true }
regex = "1"
serde = { version = "1", features = ["derive"] }
quick-xml = { version = "0.27.1", features = ["serialize"] }
diff --git a/mullvad-api/Cargo.toml b/mullvad-api/Cargo.toml
index 83d8bf0723..08db7de306 100644
--- a/mullvad-api/Cargo.toml
+++ b/mullvad-api/Cargo.toml
@@ -26,7 +26,7 @@ tokio = { workspace = true, features = ["macros", "time", "rt-multi-thread", "ne
tokio-rustls = "0.24.1"
tokio-socks = "0.5.1"
rustls-pemfile = "1.0.3"
-once_cell = "1.13"
+once_cell = { workspace = true }
mullvad-fs = { path = "../mullvad-fs" }
mullvad-types = { path = "../mullvad-types" }
diff --git a/mullvad-daemon/Cargo.toml b/mullvad-daemon/Cargo.toml
index aaa893e170..cc89500abd 100644
--- a/mullvad-daemon/Cargo.toml
+++ b/mullvad-daemon/Cargo.toml
@@ -17,7 +17,7 @@ chrono = { workspace = true }
err-derive = { workspace = true }
fern = { version = "0.6", features = ["colored"] }
futures = "0.3"
-once_cell = "1.13"
+once_cell = { workspace = true }
libc = "0.2"
log = { workspace = true }
regex = "1.0"
diff --git a/mullvad-management-interface/Cargo.toml b/mullvad-management-interface/Cargo.toml
index 5a36da5a29..dc70a47905 100644
--- a/mullvad-management-interface/Cargo.toml
+++ b/mullvad-management-interface/Cargo.toml
@@ -25,7 +25,7 @@ log = { workspace = true }
[target.'cfg(unix)'.dependencies]
nix = "0.23"
-once_cell = "1.13"
+once_cell = { workspace = true }
[build-dependencies]
tonic-build = { workspace = true, default-features = false, features = ["transport", "prost"] }
diff --git a/mullvad-paths/Cargo.toml b/mullvad-paths/Cargo.toml
index 3b4e017920..78ddeefa9a 100644
--- a/mullvad-paths/Cargo.toml
+++ b/mullvad-paths/Cargo.toml
@@ -16,7 +16,7 @@ log = { workspace = true }
[target.'cfg(windows)'.dependencies]
widestring = "1.0"
-once_cell = "1.13"
+once_cell = { workspace = true }
[target.'cfg(windows)'.dependencies.windows-sys]
workspace = true
diff --git a/mullvad-problem-report/Cargo.toml b/mullvad-problem-report/Cargo.toml
index 5bddff5640..9001b4a181 100644
--- a/mullvad-problem-report/Cargo.toml
+++ b/mullvad-problem-report/Cargo.toml
@@ -11,7 +11,7 @@ publish.workspace = true
[dependencies]
dirs = "5.0.1"
err-derive = { workspace = true }
-once_cell = "1.13"
+once_cell = { workspace = true }
log = { workspace = true }
regex = "1.0"
uuid = { version = "1.4.1", features = ["v4"] }
diff --git a/mullvad-relay-selector/Cargo.toml b/mullvad-relay-selector/Cargo.toml
index 6574b866a4..aa954ba610 100644
--- a/mullvad-relay-selector/Cargo.toml
+++ b/mullvad-relay-selector/Cargo.toml
@@ -25,4 +25,4 @@ mullvad-api = { path = "../mullvad-api" }
mullvad-types = { path = "../mullvad-types" }
[dev-dependencies]
-once_cell = "1.13"
+once_cell = { workspace = true }
diff --git a/mullvad-setup/Cargo.toml b/mullvad-setup/Cargo.toml
index df509a5e80..22fc63b54f 100644
--- a/mullvad-setup/Cargo.toml
+++ b/mullvad-setup/Cargo.toml
@@ -16,7 +16,7 @@ path = "src/main.rs"
clap = { workspace = true }
env_logger = { workspace = true }
err-derive = { workspace = true }
-once_cell = "1.13"
+once_cell = { workspace = true }
mullvad-management-interface = { path = "../mullvad-management-interface" }
diff --git a/mullvad-types/Cargo.toml b/mullvad-types/Cargo.toml
index db300ef00c..603f9df388 100644
--- a/mullvad-types/Cargo.toml
+++ b/mullvad-types/Cargo.toml
@@ -12,7 +12,7 @@ publish.workspace = true
chrono = { workspace = true, features = ["clock", "serde"] }
err-derive = { workspace = true }
ipnetwork = "0.16"
-once_cell = "1.13"
+once_cell = { workspace = true }
log = { workspace = true }
regex = "1"
serde = { version = "1.0", features = ["derive"] }
diff --git a/talpid-core/Cargo.toml b/talpid-core/Cargo.toml
index b8c730a1bc..e39b07f244 100644
--- a/talpid-core/Cargo.toml
+++ b/talpid-core/Cargo.toml
@@ -9,11 +9,10 @@ edition.workspace = true
publish.workspace = true
[dependencies]
-duct = "0.13"
err-derive = { workspace = true }
futures = "0.3.15"
ipnetwork = "0.16"
-once_cell = "1.13"
+once_cell = { workspace = true }
libc = "0.2"
log = { workspace = true }
parking_lot = "0.12.0"
@@ -42,6 +41,7 @@ nftnl = { version = "0.6.2", features = ["nftnl-1-1-0"] }
mnl = { version = "0.2.2", features = ["mnl-1-0-4"] }
which = { version = "4.0", default-features = false }
talpid-dbus = { path = "../talpid-dbus" }
+duct = "0.13"
[target.'cfg(target_os = "macos")'.dependencies]
@@ -51,6 +51,7 @@ trust-dns-server = { version = "0.23.0", features = ["resolver"] }
trust-dns-proto = "0.23.0"
subslice = "0.2"
async-trait = "0.1"
+duct = "0.13"
[target.'cfg(windows)'.dependencies]
diff --git a/talpid-dbus/Cargo.toml b/talpid-dbus/Cargo.toml
index aa37b32640..60ab88f973 100644
--- a/talpid-dbus/Cargo.toml
+++ b/talpid-dbus/Cargo.toml
@@ -10,7 +10,7 @@ publish.workspace = true
[target.'cfg(target_os = "linux")'.dependencies]
dbus = "0.9"
err-derive = { workspace = true }
-once_cell = "1.13"
+once_cell = { workspace = true }
log = { workspace = true }
libc = "0.2"
tokio = { workspace = true, features = ["rt"] }
diff --git a/talpid-openvpn/Cargo.toml b/talpid-openvpn/Cargo.toml
index 259bfa9b66..8106c4816d 100644
--- a/talpid-openvpn/Cargo.toml
+++ b/talpid-openvpn/Cargo.toml
@@ -11,11 +11,9 @@ publish.workspace = true
[dependencies]
async-trait = "0.1"
-duct = "0.13"
err-derive = { workspace = true }
futures = "0.3.15"
-is-terminal = "0.4.2"
-once_cell = "1.13"
+once_cell = { workspace = true }
log = { workspace = true }
os_pipe = "1.1.4"
parking_lot = "0.12.0"
diff --git a/talpid-openvpn/src/lib.rs b/talpid-openvpn/src/lib.rs
index 6d2989160e..e6080044cb 100644
--- a/talpid-openvpn/src/lib.rs
+++ b/talpid-openvpn/src/lib.rs
@@ -7,10 +7,7 @@ use crate::proxy::{ProxyMonitor, ProxyResourceData};
use futures::channel::oneshot;
#[cfg(windows)]
use once_cell::sync::Lazy;
-use process::{
- openvpn::{OpenVpnCommand, OpenVpnProcHandle},
- stoppable_process::StoppableProcess,
-};
+use process::openvpn::{OpenVpnCommand, OpenVpnProcHandle};
#[cfg(target_os = "linux")]
use std::collections::{HashMap, HashSet};
#[cfg(windows)]
@@ -22,16 +19,15 @@ use std::{
process::ExitStatus,
sync::{
atomic::{AtomicBool, Ordering},
- mpsc, Arc, Mutex,
+ mpsc, Arc,
},
- thread,
time::Duration,
};
#[cfg(target_os = "linux")]
use talpid_routing::{self, RequiredRoute};
use talpid_tunnel::TunnelEvent;
use talpid_types::{net::openvpn, ErrorExt};
-use tokio::task;
+use tokio::{sync::Mutex, task};
#[cfg(windows)]
use widestring::U16CString;
@@ -170,7 +166,7 @@ pub struct OpenVpnMonitor<C: OpenVpnBuilder = OpenVpnCommand> {
>,
abort_spawn: futures::future::AbortHandle,
- child: Arc<Mutex<Option<Arc<C::ProcessHandle>>>>,
+ child: Arc<Mutex<Option<C::ProcessHandle>>>,
proxy_monitor: Option<Box<dyn ProxyMonitor>>,
closed: Arc<AtomicBool>,
/// Keep the `TempFile` for the user-pass file in the struct, so it's removed on drop.
@@ -437,16 +433,12 @@ impl<C: OpenVpnBuilder + Send + 'static> OpenVpnMonitor<C> {
let close_handle = monitor.close_handle();
tokio::spawn(async move {
if tunnel_close_rx.await.is_ok() {
- tokio::task::spawn_blocking(move || {
- if let Err(error) = close_handle.close() {
- log::error!(
- "{}",
- error.display_chain_with_msg("Failed to close the tunnel")
- );
- }
- })
- .await
- .expect("close handle panic");
+ if let Err(error) = close_handle.close().await {
+ log::error!(
+ "{}",
+ error.display_chain_with_msg("Failed to close the tunnel")
+ );
+ }
}
});
@@ -491,17 +483,18 @@ impl<C: OpenVpnBuilder + Send + 'static> OpenVpnMonitor<C> {
}
let handle = self.runtime.clone();
-
- thread::spawn(move || {
- tx_tunnel.send(Stopped::Tunnel(self.wait_tunnel())).unwrap();
+ handle.spawn(async move {
+ tx_tunnel
+ .send(Stopped::Tunnel(self.wait_tunnel().await))
+ .unwrap();
let _ = proxy_close_handle.close();
});
- thread::spawn(move || {
+ handle.spawn(async move {
tx_proxy
- .send(Stopped::Proxy(handle.block_on(proxy_monitor.wait())))
+ .send(Stopped::Proxy(proxy_monitor.wait().await))
.unwrap();
- let _ = tunnel_close_handle.close();
+ tunnel_close_handle.close().await
});
let result = rx.recv().expect("wait got no result");
@@ -516,13 +509,19 @@ impl<C: OpenVpnBuilder + Send + 'static> OpenVpnMonitor<C> {
}
} else {
// No proxy active, wait only for the tunnel.
- self.wait_tunnel()
+ let handle = self.runtime.clone();
+ let (tx_tunnel, rx) = mpsc::channel();
+ handle.spawn(async move {
+ let x = self.wait_tunnel();
+ tx_tunnel.send(x.await).unwrap();
+ });
+ rx.recv().expect("wait_tunnel got no result")
}
}
/// Supplement `inner_wait_tunnel()` with logging and error handling.
- fn wait_tunnel(self) -> Result<()> {
- let result = self.inner_wait_tunnel();
+ async fn wait_tunnel(self) -> Result<()> {
+ let result = self.inner_wait_tunnel().await;
match result {
WaitResult::Preparation(result) => match result {
Err(error) => {
@@ -559,13 +558,15 @@ impl<C: OpenVpnBuilder + Send + 'static> OpenVpnMonitor<C> {
/// Waits for both the child process and the event dispatcher in parallel. After both have
/// returned this returns the earliest result.
- fn inner_wait_tunnel(mut self) -> WaitResult {
+ async fn inner_wait_tunnel(mut self) -> WaitResult {
let child = match self
- .runtime
- .block_on(self.spawn_task.take().unwrap())
+ .spawn_task
+ .take()
+ .unwrap()
+ .await
.expect("spawn task panicked")
{
- Ok(Ok(child)) => Arc::new(child),
+ Ok(Ok(child)) => child,
Ok(Err(error)) => {
self.closed.swap(true, Ordering::SeqCst);
return WaitResult::Preparation(Err(error));
@@ -574,41 +575,33 @@ impl<C: OpenVpnBuilder + Send + 'static> OpenVpnMonitor<C> {
};
if self.closed.load(Ordering::SeqCst) {
- let _ = child.kill();
+ let _ = child.kill().await;
return WaitResult::Preparation(Ok(()));
}
{
- self.child.lock().unwrap().replace(child.clone());
+ self.child.lock().await.replace(child);
}
- let closed_handle = self.closed.clone();
- let child_close_handle = self.close_handle();
-
- let (child_tx, rx) = mpsc::channel();
- let dispatcher_tx = child_tx.clone();
-
let event_server_abort_tx = self.event_server_abort_tx.clone();
- thread::spawn(move || {
- let result = child.wait();
- let closed = closed_handle.load(Ordering::SeqCst);
- child_tx.send(WaitResult::Child(result, closed)).unwrap();
+ let kill_child = async move {
+ let result = self.child.lock().await.as_ref().unwrap().wait().await;
+ let closed = self.closed.load(Ordering::SeqCst);
+ let result = WaitResult::Child(result, closed);
event_server_abort_tx.trigger();
- });
-
- let server_join_handle = self
- .server_join_handle
- .take()
- .expect("No event server quit handle");
- self.runtime.spawn(async move {
+ result
+ };
+ let kill_event_dispatcher = async move {
+ let server_join_handle = self
+ .server_join_handle
+ .take()
+ .expect("No event server quit handle");
let _ = server_join_handle.await;
- dispatcher_tx.send(WaitResult::EventDispatcher).unwrap();
- let _ = child_close_handle.close();
- });
+ WaitResult::EventDispatcher
+ };
- let result = rx.recv().expect("inner_wait_tunnel no result");
- let _ = rx.recv().expect("inner_wait_tunnel no second result");
+ let (result, _) = tokio::join!(kill_child, kill_event_dispatcher);
result
}
@@ -726,18 +719,18 @@ impl<C: OpenVpnBuilder + Send + 'static> OpenVpnMonitor<C> {
/// A handle to an `OpenVpnMonitor` for closing it.
#[derive(Debug, Clone)]
pub struct OpenVpnCloseHandle<H: ProcessHandle = OpenVpnProcHandle> {
- child: Arc<Mutex<Option<Arc<H>>>>,
+ child: Arc<Mutex<Option<H>>>,
abort_spawn: futures::future::AbortHandle,
closed: Arc<AtomicBool>,
}
impl<H: ProcessHandle> OpenVpnCloseHandle<H> {
/// Kills the underlying OpenVPN process, making the `OpenVpnMonitor::wait` method return.
- pub fn close(self) -> io::Result<()> {
+ pub async fn close(self) -> io::Result<()> {
if !self.closed.swap(true, Ordering::SeqCst) {
self.abort_spawn.abort();
- if let Some(child) = self.child.lock().unwrap().as_ref() {
- child.kill()
+ if let Some(child) = self.child.lock().await.as_ref() {
+ child.kill().await
} else {
Ok(())
}
@@ -775,12 +768,13 @@ pub trait OpenVpnBuilder {
}
/// Trait for types acting as handles to subprocesses for `OpenVpnMonitor`
+#[async_trait::async_trait]
pub trait ProcessHandle: Send + Sync + 'static {
/// Block until the subprocess exits or there is an error in the wait syscall.
- fn wait(&self) -> io::Result<ExitStatus>;
+ async fn wait(&self) -> io::Result<ExitStatus>;
/// Kill the subprocess.
- fn kill(&self) -> io::Result<()>;
+ async fn kill(&self) -> io::Result<()>;
}
impl OpenVpnBuilder for OpenVpnCommand {
@@ -799,7 +793,7 @@ impl OpenVpnBuilder for OpenVpnCommand {
}
fn start(&self) -> io::Result<OpenVpnProcHandle> {
- OpenVpnProcHandle::new(self.build())
+ OpenVpnProcHandle::new(&mut self.build())
}
#[cfg(target_os = "linux")]
@@ -809,13 +803,14 @@ impl OpenVpnBuilder for OpenVpnCommand {
}
}
+#[async_trait::async_trait]
impl ProcessHandle for OpenVpnProcHandle {
- fn wait(&self) -> io::Result<ExitStatus> {
- self.inner.wait().map(|output| output.status)
+ async fn wait(&self) -> io::Result<ExitStatus> {
+ self.wait().await
}
- fn kill(&self) -> io::Result<()> {
- self.nice_kill(OPENVPN_DIE_TIMEOUT)
+ async fn kill(&self) -> io::Result<()> {
+ self.nice_kill(OPENVPN_DIE_TIMEOUT).await
}
}
@@ -1219,32 +1214,25 @@ mod tests {
#[derive(Debug, Copy, Clone)]
struct TestProcessHandle(i32);
+ #[async_trait::async_trait]
impl ProcessHandle for TestProcessHandle {
#[cfg(unix)]
- fn wait(&self) -> io::Result<ExitStatus> {
+ async fn wait(&self) -> io::Result<ExitStatus> {
use std::os::unix::process::ExitStatusExt;
Ok(ExitStatus::from_raw(self.0))
}
#[cfg(windows)]
- fn wait(&self) -> io::Result<ExitStatus> {
+ async fn wait(&self) -> io::Result<ExitStatus> {
use std::os::windows::process::ExitStatusExt;
Ok(ExitStatus::from_raw(self.0 as u32))
}
- fn kill(&self) -> io::Result<()> {
+ async fn kill(&self) -> io::Result<()> {
Ok(())
}
}
- fn new_runtime() -> Result<tokio::runtime::Runtime> {
- tokio::runtime::Builder::new_multi_thread()
- .worker_threads(2)
- .enable_all()
- .build()
- .map_err(Error::RuntimeError)
- }
-
fn create_init_args_plugin_log(
plugin_path: PathBuf,
log_path: Option<PathBuf>,
@@ -1269,131 +1257,111 @@ mod tests {
create_init_args_plugin_log("".into(), None)
}
- #[test]
- fn sets_plugin() {
+ #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+ async fn sets_plugin() {
let builder = TestOpenVpnBuilder::default();
- let runtime = new_runtime().unwrap();
let openvpn_init_args = create_init_args_plugin_log("./my_test_plugin".into(), None);
- let _ = runtime.block_on(async {
- OpenVpnMonitor::new_internal(
- builder.clone(),
- openvpn_init_args,
- TestOpenvpnEventProxy {},
- #[cfg(windows)]
- Box::new(TestWintunContext {}),
- )
- });
+ let _ = OpenVpnMonitor::new_internal(
+ builder.clone(),
+ openvpn_init_args,
+ TestOpenvpnEventProxy {},
+ #[cfg(windows)]
+ Box::new(TestWintunContext {}),
+ );
assert_eq!(
Some(PathBuf::from("./my_test_plugin")),
*builder.plugin.lock()
);
}
- #[test]
- fn sets_log() {
+ #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+ async fn sets_log() {
let builder = TestOpenVpnBuilder::default();
- let runtime = new_runtime().unwrap();
let openvpn_init_args =
create_init_args_plugin_log("".into(), Some(PathBuf::from("./my_test_log_file")));
- let _ = runtime.block_on(async {
- OpenVpnMonitor::new_internal(
- builder.clone(),
- openvpn_init_args,
- TestOpenvpnEventProxy {},
- #[cfg(windows)]
- Box::new(TestWintunContext {}),
- )
- });
+ let _ = OpenVpnMonitor::new_internal(
+ builder.clone(),
+ openvpn_init_args,
+ TestOpenvpnEventProxy {},
+ #[cfg(windows)]
+ Box::new(TestWintunContext {}),
+ );
assert_eq!(
Some(PathBuf::from("./my_test_log_file")),
*builder.log.lock()
);
}
- #[test]
- fn exit_successfully() {
+ #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+ async fn exit_successfully() {
let builder = TestOpenVpnBuilder {
process_handle: Some(TestProcessHandle(0)),
..Default::default()
};
- let runtime = new_runtime().unwrap();
let openvpn_init_args = create_init_args();
- let testee = runtime
- .block_on(async {
- OpenVpnMonitor::new_internal(
- builder,
- openvpn_init_args,
- TestOpenvpnEventProxy {},
- #[cfg(windows)]
- Box::new(TestWintunContext {}),
- )
- })
- .unwrap();
+ let testee = OpenVpnMonitor::new_internal(
+ builder,
+ openvpn_init_args,
+ TestOpenvpnEventProxy {},
+ #[cfg(windows)]
+ Box::new(TestWintunContext {}),
+ )
+ .unwrap();
assert!(testee.wait().is_ok());
}
- #[test]
- fn exit_error() {
+ #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+ async fn exit_error() {
let builder = TestOpenVpnBuilder {
process_handle: Some(TestProcessHandle(1)),
..Default::default()
};
- let runtime = new_runtime().unwrap();
let openvpn_init_args = create_init_args();
- let testee = runtime
- .block_on(async move {
- OpenVpnMonitor::new_internal(
- builder,
- openvpn_init_args,
- TestOpenvpnEventProxy {},
- #[cfg(windows)]
- Box::new(TestWintunContext {}),
- )
- })
- .unwrap();
+ let testee = OpenVpnMonitor::new_internal(
+ builder,
+ openvpn_init_args,
+ TestOpenvpnEventProxy {},
+ #[cfg(windows)]
+ Box::new(TestWintunContext {}),
+ )
+ .unwrap();
assert!(testee.wait().is_err());
}
- #[test]
- fn wait_closed() {
+ #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+ async fn wait_closed() {
let builder = TestOpenVpnBuilder {
process_handle: Some(TestProcessHandle(1)),
..Default::default()
};
- let runtime = new_runtime().unwrap();
let openvpn_init_args = create_init_args();
- let testee = runtime
- .block_on(async {
- OpenVpnMonitor::new_internal(
- builder,
- openvpn_init_args,
- TestOpenvpnEventProxy {},
- #[cfg(windows)]
- Box::new(TestWintunContext {}),
- )
- })
- .unwrap();
+ let testee = OpenVpnMonitor::new_internal(
+ builder,
+ openvpn_init_args,
+ TestOpenvpnEventProxy {},
+ #[cfg(windows)]
+ Box::new(TestWintunContext {}),
+ )
+ .unwrap();
- testee.close_handle().close().unwrap();
- assert!(testee.wait().is_ok());
+ testee.close_handle().close().await.unwrap();
+ let result = testee.wait();
+ println!("[testee.wait(): {:?}]", result);
+ assert!(result.is_ok());
}
- #[test]
- fn failed_process_start() {
+ #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+ async fn failed_process_start() {
let builder = TestOpenVpnBuilder::default();
- let runtime = new_runtime().unwrap();
let openvpn_init_args = create_init_args();
- let result = runtime
- .block_on(async {
- OpenVpnMonitor::new_internal(
- builder,
- openvpn_init_args,
- TestOpenvpnEventProxy {},
- #[cfg(windows)]
- Box::new(TestWintunContext {}),
- )
- })
- .unwrap();
+ let result = OpenVpnMonitor::new_internal(
+ builder,
+ openvpn_init_args,
+ TestOpenvpnEventProxy {},
+ #[cfg(windows)]
+ Box::new(TestWintunContext {}),
+ )
+ .unwrap();
match result.wait() {
Err(Error::StartProcessError) => (),
_ => panic!("Wrong error"),
diff --git a/talpid-openvpn/src/process/mod.rs b/talpid-openvpn/src/process/mod.rs
index 54cdeb9042..f496df31ae 100644
--- a/talpid-openvpn/src/process/mod.rs
+++ b/talpid-openvpn/src/process/mod.rs
@@ -1,6 +1,3 @@
/// A module for all OpenVPN related process management.
#[cfg(not(target_os = "android"))]
pub mod openvpn;
-
-/// A trait for stopping subprocesses gracefully.
-pub mod stoppable_process;
diff --git a/talpid-openvpn/src/process/openvpn.rs b/talpid-openvpn/src/process/openvpn.rs
index 722468e3ca..44e00d0eda 100644
--- a/talpid-openvpn/src/process/openvpn.rs
+++ b/talpid-openvpn/src/process/openvpn.rs
@@ -1,6 +1,3 @@
-use duct;
-
-use super::stoppable_process::StoppableProcess;
use os_pipe::{pipe, PipeWriter};
use parking_lot::Mutex;
use shell_escape;
@@ -190,9 +187,11 @@ impl OpenVpnCommand {
}
/// Build a runnable expression from the current state of the command.
- pub fn build(&self) -> duct::Expression {
+ pub fn build(&self) -> tokio::process::Command {
log::debug!("Building expression: {}", &self);
- duct::cmd(&self.openvpn_bin, self.get_arguments()).unchecked()
+ let mut handle = tokio::process::Command::new(&self.openvpn_bin);
+ handle.args(self.get_arguments());
+ handle
}
/// Returns all arguments that the subprocess would be spawned with.
@@ -365,8 +364,11 @@ impl fmt::Display for OpenVpnCommand {
/// Handle to a running OpenVPN process.
pub struct OpenVpnProcHandle {
- /// Duct handle
- pub inner: duct::Handle,
+ /// Handle to the child process running OpenVPN.
+ ///
+ /// This handle is acquired by calling [`OpenVpnCommand::build`] (or
+ /// [`tokio::process::Command::spawn`]).
+ pub inner: std::sync::Arc<tokio::sync::Mutex<tokio::process::Child>>,
/// Pipe handle to stdin of the OpenVPN process. Our custom fork of OpenVPN
/// has been changed so that it exits cleanly when stdin is closed. This is a hack
/// solution to cleanly shut OpenVPN down without using the
@@ -377,62 +379,85 @@ pub struct OpenVpnProcHandle {
impl OpenVpnProcHandle {
/// Configures the expression to run OpenVPN in a way compatible with this handle
/// and spawns it. Returns the handle.
- pub fn new(mut cmd: duct::Expression) -> io::Result<Self> {
- use is_terminal::IsTerminal;
+ pub fn new(mut cmd: &mut tokio::process::Command) -> io::Result<Self> {
+ use std::io::IsTerminal;
if !std::io::stdout().is_terminal() {
- cmd = cmd.stdout_null();
+ cmd = cmd.stdout(std::process::Stdio::null())
}
if !std::io::stderr().is_terminal() {
- cmd = cmd.stderr_null();
+ cmd = cmd.stderr(std::process::Stdio::null())
}
let (reader, writer) = pipe()?;
- let proc_handle = cmd.stdin_file(reader).start()?;
+ let proc_handle = cmd.stdin(reader).spawn()?;
Ok(Self {
- inner: proc_handle,
+ inner: std::sync::Arc::new(tokio::sync::Mutex::new(proc_handle)),
stdin: Mutex::new(Some(writer)),
})
}
-}
-impl StoppableProcess for OpenVpnProcHandle {
- fn stop(&self) {
+ /// Attempts to stop the OpenVPN process gracefully in the given time
+ /// period, otherwise kills the process.
+ pub async fn nice_kill(&self, timeout: std::time::Duration) -> io::Result<()> {
+ log::debug!("Trying to stop child process gracefully");
+ self.stop().await;
+
+ // Wait for the process to die for a maximum of `timeout`.
+ let wait_result = tokio::time::timeout(timeout, self.wait()).await;
+ match wait_result {
+ Ok(_) => log::debug!("Child process terminated gracefully"),
+ Err(_) => {
+ log::warn!(
+ "Child process did not terminate gracefully within timeout, forcing termination"
+ );
+ self.kill().await?;
+ }
+ }
+ Ok(())
+ }
+
+ /// Waits for the child to exit completely, returning the status that it
+ /// exited with. See [tokio::process::Child::wait] for in-depth
+ /// documentation.
+ async fn wait(&self) -> io::Result<std::process::ExitStatus> {
+ self.inner.lock().await.wait().await
+ }
+
+ /// Kill the OpenVPN process and drop its stdin handle.
+ async fn stop(&self) {
// Dropping our stdin handle so that it is closed once. Closing the handle should
// gracefully stop our OpenVPN child process.
if self.stdin.lock().take().is_none() {
log::warn!("Tried to close OpenVPN stdin handle twice, this is a bug");
}
+ self.clean_up().await
}
- fn kill(&self) -> io::Result<()> {
+ async fn kill(&self) -> io::Result<()> {
log::warn!("Killing OpenVPN process");
- self.inner.kill()?;
+ self.inner.lock().await.kill().await?;
log::debug!("OpenVPN forcefully killed");
Ok(())
}
- fn has_stopped(&self) -> io::Result<bool> {
- match self.inner.try_wait() {
- Ok(None) => Ok(false),
- Ok(Some(_)) => Ok(true),
- Err(e) => Err(e),
- }
+ async fn has_stopped(&self) -> io::Result<bool> {
+ let exit_status = self.inner.lock().await.try_wait()?;
+ Ok(exit_status.is_some())
}
-}
-impl Drop for OpenVpnProcHandle {
- fn drop(&mut self) {
- let result = match self.has_stopped() {
- Ok(false) => self.kill(),
+ /// Try to kill the OpenVPN process.
+ async fn clean_up(&self) {
+ let result = match self.has_stopped().await {
+ Ok(false) => self.kill().await,
Err(e) => {
log::error!(
"{}",
e.display_chain_with_msg("Failed to check if OpenVPN is running")
);
- self.kill()
+ self.kill().await
}
_ => Ok(()),
};
diff --git a/talpid-openvpn/src/process/stoppable_process.rs b/talpid-openvpn/src/process/stoppable_process.rs
deleted file mode 100644
index 3681c6cfa8..0000000000
--- a/talpid-openvpn/src/process/stoppable_process.rs
+++ /dev/null
@@ -1,53 +0,0 @@
-use std::{
- io, thread,
- time::{Duration, Instant},
-};
-
-static POLL_INTERVAL_MS: Duration = Duration::from_millis(50);
-
-/// A best effort attempt at stopping a subprocess whilst also ensuring that the subprocess is
-/// killed eventually.
-pub trait StoppableProcess
-where
- Self: Sized,
-{
- /// Gracefully stops a process.
- fn stop(&self);
-
- /// Kills a process unconditionally. Implementations should strive to never fail.
- fn kill(&self) -> io::Result<()>;
-
- /// Check if process is stopped. This method must not block.
- fn has_stopped(&self) -> io::Result<bool>;
-
- /// Attempts to stop a process gracefully in the given time period, otherwise kills the
- /// process.
- fn nice_kill(&self, timeout: Duration) -> io::Result<()> {
- log::debug!("Trying to stop child process gracefully");
- self.stop();
- if wait_timeout(self, timeout)? {
- log::debug!("Child process terminated gracefully");
- } else {
- log::warn!(
- "Child process did not terminate gracefully within timeout, forcing termination"
- );
- self.kill()?;
- }
- Ok(())
- }
-}
-/// Wait for a process to die for a maximum of `timeout`. Returns true if the process died within
-/// the timeout.
-fn wait_timeout<T>(process: &T, timeout: Duration) -> io::Result<bool>
-where
- T: StoppableProcess + Sized,
-{
- let timer = Instant::now();
- while timer.elapsed() < timeout {
- if process.has_stopped()? {
- return Ok(true);
- }
- thread::sleep(POLL_INTERVAL_MS);
- }
- Ok(false)
-}
diff --git a/talpid-routing/Cargo.toml b/talpid-routing/Cargo.toml
index 50bd4c1477..56fb12b13c 100644
--- a/talpid-routing/Cargo.toml
+++ b/talpid-routing/Cargo.toml
@@ -21,7 +21,7 @@ talpid-types = { path = "../talpid-types" }
[target.'cfg(target_os = "linux")'.dependencies]
libc = "0.2"
-once_cell = "1.13"
+once_cell = { workspace = true }
rtnetlink = "0.11"
netlink-packet-route = "0.13"
netlink-sys = "0.8.3"
diff --git a/talpid-wireguard/Cargo.toml b/talpid-wireguard/Cargo.toml
index 04ea3fc00b..f8e854a7c5 100644
--- a/talpid-wireguard/Cargo.toml
+++ b/talpid-wireguard/Cargo.toml
@@ -14,7 +14,7 @@ err-derive = { workspace = true }
futures = "0.3.15"
hex = "0.4"
ipnetwork = "0.16"
-once_cell = "1.13"
+once_cell = { workspace = true }
libc = "0.2"
log = { workspace = true }
parking_lot = "0.12.0"