summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorLinus Färnstrand <linus@mullvad.net>2017-06-09 13:20:18 +0200
committerLinus Färnstrand <linus@mullvad.net>2017-06-09 13:20:18 +0200
commitc850ba6d06010f3c0d528a750659707ac48fd2ae (patch)
treec26bb0035409e98cca29a0d7ec3201a875ab7315
parentc2aaa4eb26af015ef949785668751cded783675d (diff)
parent7deaeb4712d334bd34f8f98bbc168978020862f7 (diff)
downloadmullvadvpn-c850ba6d06010f3c0d528a750659707ac48fd2ae.tar.xz
mullvadvpn-c850ba6d06010f3c0d528a750659707ac48fd2ae.zip
Merge branch 'low-level-close-handle' into master-new-daemon
-rw-r--r--Cargo.lock50
-rw-r--r--mullvad_daemon/Cargo.toml8
-rw-r--r--talpid_core/Cargo.toml4
-rw-r--r--talpid_core/src/process/mod.rs3
-rw-r--r--talpid_core/src/process/monitor.rs113
-rw-r--r--talpid_core/src/tunnel/openvpn.rs202
-rw-r--r--talpid_ipc/Cargo.toml6
-rw-r--r--talpid_ipc/src/lib.rs31
8 files changed, 170 insertions, 247 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 9e86a409c1..8d587baee0 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -213,7 +213,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "jsonrpc-core"
version = "7.0.0"
-source = "git+https://github.com/faern/jsonrpc?branch=bind-zero#863467e499d3c8e0262d1dc211d0525352471d7a"
+source = "git+https://github.com/faern/jsonrpc?branch=ws-close-handle#8c47e034f05b8e42d3440078b758dd732e85894c"
dependencies = [
"futures 0.1.13 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -225,19 +225,19 @@ dependencies = [
[[package]]
name = "jsonrpc-macros"
version = "7.0.0"
-source = "git+https://github.com/faern/jsonrpc?branch=bind-zero#863467e499d3c8e0262d1dc211d0525352471d7a"
+source = "git+https://github.com/faern/jsonrpc?branch=ws-close-handle#8c47e034f05b8e42d3440078b758dd732e85894c"
dependencies = [
- "jsonrpc-core 7.0.0 (git+https://github.com/faern/jsonrpc?branch=bind-zero)",
- "jsonrpc-pubsub 7.0.0 (git+https://github.com/faern/jsonrpc?branch=bind-zero)",
+ "jsonrpc-core 7.0.0 (git+https://github.com/faern/jsonrpc?branch=ws-close-handle)",
+ "jsonrpc-pubsub 7.0.0 (git+https://github.com/faern/jsonrpc?branch=ws-close-handle)",
"serde 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "jsonrpc-pubsub"
version = "7.0.0"
-source = "git+https://github.com/faern/jsonrpc?branch=bind-zero#863467e499d3c8e0262d1dc211d0525352471d7a"
+source = "git+https://github.com/faern/jsonrpc?branch=ws-close-handle#8c47e034f05b8e42d3440078b758dd732e85894c"
dependencies = [
- "jsonrpc-core 7.0.0 (git+https://github.com/faern/jsonrpc?branch=bind-zero)",
+ "jsonrpc-core 7.0.0 (git+https://github.com/faern/jsonrpc?branch=ws-close-handle)",
"log 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)",
"parking_lot 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)",
]
@@ -245,10 +245,10 @@ dependencies = [
[[package]]
name = "jsonrpc-server-utils"
version = "7.0.0"
-source = "git+https://github.com/faern/jsonrpc?branch=bind-zero#863467e499d3c8e0262d1dc211d0525352471d7a"
+source = "git+https://github.com/faern/jsonrpc?branch=ws-close-handle#8c47e034f05b8e42d3440078b758dd732e85894c"
dependencies = [
"globset 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)",
- "jsonrpc-core 7.0.0 (git+https://github.com/faern/jsonrpc?branch=bind-zero)",
+ "jsonrpc-core 7.0.0 (git+https://github.com/faern/jsonrpc?branch=ws-close-handle)",
"log 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-core 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-io 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -257,10 +257,10 @@ dependencies = [
[[package]]
name = "jsonrpc-ws-server"
version = "7.0.0"
-source = "git+https://github.com/faern/jsonrpc?branch=bind-zero#863467e499d3c8e0262d1dc211d0525352471d7a"
+source = "git+https://github.com/faern/jsonrpc?branch=ws-close-handle#8c47e034f05b8e42d3440078b758dd732e85894c"
dependencies = [
- "jsonrpc-core 7.0.0 (git+https://github.com/faern/jsonrpc?branch=bind-zero)",
- "jsonrpc-server-utils 7.0.0 (git+https://github.com/faern/jsonrpc?branch=bind-zero)",
+ "jsonrpc-core 7.0.0 (git+https://github.com/faern/jsonrpc?branch=ws-close-handle)",
+ "jsonrpc-server-utils 7.0.0 (git+https://github.com/faern/jsonrpc?branch=ws-close-handle)",
"log 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)",
"ws 0.7.1 (git+https://github.com/tomusdrw/ws-rs)",
]
@@ -341,10 +341,10 @@ dependencies = [
"assert_matches 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"env_logger 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)",
"error-chain 0.10.0 (registry+https://github.com/rust-lang/crates.io-index)",
- "jsonrpc-core 7.0.0 (git+https://github.com/faern/jsonrpc?branch=bind-zero)",
- "jsonrpc-macros 7.0.0 (git+https://github.com/faern/jsonrpc?branch=bind-zero)",
- "jsonrpc-pubsub 7.0.0 (git+https://github.com/faern/jsonrpc?branch=bind-zero)",
- "jsonrpc-ws-server 7.0.0 (git+https://github.com/faern/jsonrpc?branch=bind-zero)",
+ "jsonrpc-core 7.0.0 (git+https://github.com/faern/jsonrpc?branch=ws-close-handle)",
+ "jsonrpc-macros 7.0.0 (git+https://github.com/faern/jsonrpc?branch=ws-close-handle)",
+ "jsonrpc-pubsub 7.0.0 (git+https://github.com/faern/jsonrpc?branch=ws-close-handle)",
+ "jsonrpc-ws-server 7.0.0 (git+https://github.com/faern/jsonrpc?branch=ws-close-handle)",
"log 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)",
"serde 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)",
"serde_derive 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -575,8 +575,8 @@ dependencies = [
"assert_matches 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"duct 0.8.2 (registry+https://github.com/rust-lang/crates.io-index)",
"error-chain 0.10.0 (registry+https://github.com/rust-lang/crates.io-index)",
- "jsonrpc-core 7.0.0 (git+https://github.com/faern/jsonrpc?branch=bind-zero)",
- "jsonrpc-macros 7.0.0 (git+https://github.com/faern/jsonrpc?branch=bind-zero)",
+ "jsonrpc-core 7.0.0 (git+https://github.com/faern/jsonrpc?branch=ws-close-handle)",
+ "jsonrpc-macros 7.0.0 (git+https://github.com/faern/jsonrpc?branch=ws-close-handle)",
"log 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)",
"openvpn_ffi 0.1.0",
"talpid_ipc 0.1.0",
@@ -589,9 +589,9 @@ dependencies = [
"assert_matches 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"env_logger 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)",
"error-chain 0.10.0 (registry+https://github.com/rust-lang/crates.io-index)",
- "jsonrpc-core 7.0.0 (git+https://github.com/faern/jsonrpc?branch=bind-zero)",
- "jsonrpc-macros 7.0.0 (git+https://github.com/faern/jsonrpc?branch=bind-zero)",
- "jsonrpc-ws-server 7.0.0 (git+https://github.com/faern/jsonrpc?branch=bind-zero)",
+ "jsonrpc-core 7.0.0 (git+https://github.com/faern/jsonrpc?branch=ws-close-handle)",
+ "jsonrpc-macros 7.0.0 (git+https://github.com/faern/jsonrpc?branch=ws-close-handle)",
+ "jsonrpc-ws-server 7.0.0 (git+https://github.com/faern/jsonrpc?branch=ws-close-handle)",
"log 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)",
"serde 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)",
"serde_json 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -774,11 +774,11 @@ dependencies = [
"checksum idna 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "6ac85ec3f80c8e4e99d9325521337e14ec7555c458a14e377d189659a427f375"
"checksum iovec 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "29d062ee61fccdf25be172e70f34c9f6efc597e1fb8f6526e8437b2046ab26be"
"checksum itoa 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "eb2f404fbc66fd9aac13e998248505e7ecb2ad8e44ab6388684c5fb11c6c251c"
-"checksum jsonrpc-core 7.0.0 (git+https://github.com/faern/jsonrpc?branch=bind-zero)" = "<none>"
-"checksum jsonrpc-macros 7.0.0 (git+https://github.com/faern/jsonrpc?branch=bind-zero)" = "<none>"
-"checksum jsonrpc-pubsub 7.0.0 (git+https://github.com/faern/jsonrpc?branch=bind-zero)" = "<none>"
-"checksum jsonrpc-server-utils 7.0.0 (git+https://github.com/faern/jsonrpc?branch=bind-zero)" = "<none>"
-"checksum jsonrpc-ws-server 7.0.0 (git+https://github.com/faern/jsonrpc?branch=bind-zero)" = "<none>"
+"checksum jsonrpc-core 7.0.0 (git+https://github.com/faern/jsonrpc?branch=ws-close-handle)" = "<none>"
+"checksum jsonrpc-macros 7.0.0 (git+https://github.com/faern/jsonrpc?branch=ws-close-handle)" = "<none>"
+"checksum jsonrpc-pubsub 7.0.0 (git+https://github.com/faern/jsonrpc?branch=ws-close-handle)" = "<none>"
+"checksum jsonrpc-server-utils 7.0.0 (git+https://github.com/faern/jsonrpc?branch=ws-close-handle)" = "<none>"
+"checksum jsonrpc-ws-server 7.0.0 (git+https://github.com/faern/jsonrpc?branch=ws-close-handle)" = "<none>"
"checksum kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7507624b29483431c0ba2d82aece8ca6cdba9382bff4ddd0f7490560c056098d"
"checksum lazycell 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ce12306c4739d86ee97c23139f3a34ddf0387bbf181bc7929d287025a8c3ef6b"
"checksum lazycell 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "3b585b7a6811fb03aa10e74b278a0f00f8dd9b45dc681f148bb29fa5cb61859b"
diff --git a/mullvad_daemon/Cargo.toml b/mullvad_daemon/Cargo.toml
index 9982a1bbaa..e425e71515 100644
--- a/mullvad_daemon/Cargo.toml
+++ b/mullvad_daemon/Cargo.toml
@@ -10,10 +10,10 @@ serde = "1.0"
serde_derive = "1.0"
log = "0.3"
env_logger = "0.4"
-jsonrpc-core = { git = "https://github.com/faern/jsonrpc", branch = "bind-zero" }
-jsonrpc-macros = { git = "https://github.com/faern/jsonrpc", branch = "bind-zero" }
-jsonrpc-pubsub = { git = "https://github.com/faern/jsonrpc", branch = "bind-zero" }
-jsonrpc-ws-server = { git = "https://github.com/faern/jsonrpc", branch = "bind-zero" }
+jsonrpc-core = { git = "https://github.com/faern/jsonrpc", branch = "ws-close-handle" }
+jsonrpc-macros = { git = "https://github.com/faern/jsonrpc", branch = "ws-close-handle" }
+jsonrpc-pubsub = { git = "https://github.com/faern/jsonrpc", branch = "ws-close-handle" }
+jsonrpc-ws-server = { git = "https://github.com/faern/jsonrpc", branch = "ws-close-handle" }
[dependencies.talpid_ipc]
path = "../talpid_ipc"
diff --git a/talpid_core/Cargo.toml b/talpid_core/Cargo.toml
index 18c998cf93..6fd7222f27 100644
--- a/talpid_core/Cargo.toml
+++ b/talpid_core/Cargo.toml
@@ -8,8 +8,8 @@ description = "Core backend functionality of the Mullvad VPN client"
duct = "0.8"
error-chain = "0.10"
log = "0.3"
-jsonrpc-core = { git = "https://github.com/faern/jsonrpc", branch = "bind-zero" }
-jsonrpc-macros = { git = "https://github.com/faern/jsonrpc", branch = "bind-zero" }
+jsonrpc-core = { git = "https://github.com/faern/jsonrpc", branch = "ws-close-handle" }
+jsonrpc-macros = { git = "https://github.com/faern/jsonrpc", branch = "ws-close-handle" }
[dependencies.talpid_ipc]
path = "../talpid_ipc"
diff --git a/talpid_core/src/process/mod.rs b/talpid_core/src/process/mod.rs
index a7d76ea9f4..6ae568907b 100644
--- a/talpid_core/src/process/mod.rs
+++ b/talpid_core/src/process/mod.rs
@@ -1,5 +1,2 @@
-/// A module for monitoring child processes and get notified of events on them.
-pub mod monitor;
-
/// A module for all OpenVPN related process management.
pub mod openvpn;
diff --git a/talpid_core/src/process/monitor.rs b/talpid_core/src/process/monitor.rs
deleted file mode 100644
index 1568d80615..0000000000
--- a/talpid_core/src/process/monitor.rs
+++ /dev/null
@@ -1,113 +0,0 @@
-use duct;
-
-use std::io;
-use std::process;
-use std::sync::Arc;
-use std::thread;
-
-/// A child process monitor. Takes care of starting and monitoring a child process and calls the
-/// listener on child exit. If the child is still running when a `ChildMonitor` instance goes out
-/// of scope it will kill the child and wait for it to exit properly.
-pub struct ChildMonitor {
- child: Arc<duct::Handle>,
- thread: Option<thread::JoinHandle<()>>,
-}
-
-impl ChildMonitor {
- /// Starts the child process and begins to monitor it. `on_exit` will be called as soon as the
- /// child process exits.
- pub fn start<L>(expression: &duct::Expression, mut on_exit: L) -> io::Result<Self>
- where L: FnMut(io::Result<&process::Output>) + Send + 'static
- {
- let child = Arc::new(expression.start()?);
- let child_clone = child.clone();
- let thread = Some(thread::spawn(move || on_exit(child_clone.wait())));
- Ok(ChildMonitor { child, thread })
- }
-
- /// Wait for the child to exit. Blocking the current thread. The `on_exit` callback is
- /// guaranteed to fire before this method returns.
- pub fn wait(&mut self) -> io::Result<&process::Output> {
- if let Some(thread) = self.thread.take() {
- if let Err(e) = thread.join() {
- error!("Panic in the on_exit callback in ChildMonitor: {:?}", e);
- }
- }
- self.child.wait()
- }
-
- /// Send a kill signal to the child. No need to call `wait` after to free the PID. The monitor
- /// will wait for the process for you.
- pub fn kill(&self) -> io::Result<()> {
- self.child.kill()
- }
-}
-
-impl Drop for ChildMonitor {
- fn drop(&mut self) {
- let _ = self.kill();
- let _ = self.child.wait();
- }
-}
-
-
-#[cfg(test)]
-mod child_monitor_tests {
- use super::*;
- use duct::{Expression, cmd};
-
- use std::sync::mpsc;
- use std::time::Duration;
-
- fn echo_cmd(s: &str) -> Expression {
- cmd("echo", &[s]).stdout_capture().unchecked()
- }
-
- fn invalid_cmd() -> Expression {
- cmd("this command does not exist", &[""]).unchecked()
- }
-
- fn sleep_cmd(secs: u32) -> Expression {
- if cfg!(windows) {
- cmd("ping", &["127.0.0.1", "-n", &(secs + 1).to_string()]).unchecked()
- } else {
- cmd("sleep", &[secs.to_string()]).unchecked()
- }
- }
-
- fn spawn(cmd: &Expression) -> (ChildMonitor, mpsc::Receiver<io::Result<process::Output>>) {
- let (tx, rx) = mpsc::channel();
- let child =
- ChildMonitor::start(cmd, move |res| tx.send(res.map(|out| out.clone())).unwrap())
- .expect("Unable to start process");
- (child, rx)
- }
-
- #[test]
- fn echo() {
- let (mut child, rx) = spawn(&echo_cmd("foobar"));
- let wait_output = child.wait().unwrap();
- let callback_output = rx.try_recv().unwrap().unwrap();
-
- assert!(callback_output.status.success());
- assert_eq!("foobar\n".as_bytes(), &callback_output.stdout[..]);
- assert_eq!(wait_output.status, callback_output.status);
- assert_eq!(wait_output.stdout, callback_output.stdout);
- }
-
- #[test]
- fn invalid_command() {
- assert!(ChildMonitor::start(&invalid_cmd(), |_| {}).is_err());
- }
-
- #[test]
- fn callback_after_kill() {
- let (child, rx) = spawn(&sleep_cmd(100000));
- // Make sure on_exit is not triggered within the first second. It should not be called
- // until we kill the process.
- assert!(rx.recv_timeout(Duration::from_secs(1)).is_err());
-
- child.kill().unwrap();
- assert!(!rx.recv_timeout(Duration::from_secs(10)).unwrap().unwrap().status.success());
- }
-}
diff --git a/talpid_core/src/tunnel/openvpn.rs b/talpid_core/src/tunnel/openvpn.rs
index 9d366ebb4a..cf33484f38 100644
--- a/talpid_core/src/tunnel/openvpn.rs
+++ b/talpid_core/src/tunnel/openvpn.rs
@@ -1,30 +1,29 @@
+use duct;
use jsonrpc_core::{Error, IoHandler};
-use openvpn_ffi;
-use process::monitor::ChildMonitor;
+use openvpn_ffi::{OpenVpnEnv, OpenVpnPluginEvent};
use process::openvpn::OpenVpnCommand;
use std::io;
-use std::path::{Path, PathBuf};
+use std::path::Path;
use std::process;
use std::result::Result as StdResult;
-use std::sync::{Arc, Mutex};
+use std::sync::{Arc, mpsc};
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::thread;
use talpid_ipc;
mod errors {
error_chain!{
errors {
- /// The `OpenVpnMonitor` is in an invalid state for the requested operation.
- InvalidState {
- description("Invalid state. OpenVPN is already running")
+ /// Unable to start, wait for or kill the OpenVPN process.
+ ChildProcessError(msg: &'static str) {
+ description("Unable to start, wait for or kill the OpenVPN process")
+ display("OpenVPN process error: {}", msg)
}
- /// Unable to start or kill the OpenVPN process.
- ChildProcessError {
- description("Unable to start or kill the OpenVPN process")
- }
- /// Unable to start or manage the IPC server listening for events from OpenVPN
- IpcServerError {
- description("Unable to start or manage the IPC server")
+ /// Unable to start or manage the IPC server listening for events from OpenVPN.
+ EventDispatcherError {
+ description("Unable to start or manage the event dispatcher IPC server")
}
}
}
@@ -32,89 +31,127 @@ mod errors {
pub use self::errors::*;
-/// Possible events from OpenVPN
-#[derive(Debug)]
-pub enum OpenVpnEvent {
- /// An event from the plugin loaded into OpenVPN.
- PluginEvent(openvpn_ffi::OpenVpnPluginEvent, openvpn_ffi::OpenVpnEnv),
- /// The OpenVPN process exited. Containing the result of waiting for the process.
- Shutdown(io::Result<process::ExitStatus>),
-}
-
-/// Struct for monitoring OpenVPN processes.
+/// Struct for monitoring an OpenVPN process.
pub struct OpenVpnMonitor {
- on_event: Arc<Fn(OpenVpnEvent) + Send + Sync + 'static>,
- plugin_path: PathBuf,
- child: Arc<Mutex<Option<ChildMonitor>>>,
- event_dispatcher: OpenVpnEventDispatcher,
+ child: Arc<duct::Handle>,
+ event_dispatcher: Option<OpenVpnEventDispatcher>,
+ closed: Arc<AtomicBool>,
}
impl OpenVpnMonitor {
/// Creates a new `OpenVpnMonitor` with the given listener and using the plugin at the given
/// path.
- pub fn new<L, P>(on_event: L, plugin_path: P) -> Result<Self>
- where L: Fn(OpenVpnEvent) + Send + Sync + 'static,
+ pub fn new<L, P>(mut cmd: OpenVpnCommand, on_event: L, plugin_path: P) -> Result<Self>
+ where L: Fn(OpenVpnPluginEvent, OpenVpnEnv) + Send + Sync + 'static,
P: AsRef<Path>
{
- let on_event = Arc::new(on_event);
- let event_dispatcher = Self::start_event_dispatcher(on_event.clone())?;
+ let event_dispatcher = OpenVpnEventDispatcher::start(on_event)
+ .chain_err(|| ErrorKind::EventDispatcherError)?;
+
+ cmd.plugin(plugin_path, vec![event_dispatcher.address().to_owned()]);
+ let child = cmd.build()
+ .start()
+ .chain_err(|| ErrorKind::ChildProcessError("Failed to start"))?;
+
Ok(
OpenVpnMonitor {
- on_event,
- plugin_path: plugin_path.as_ref().to_owned(),
- child: Arc::new(Mutex::new(None)),
- event_dispatcher,
+ child: Arc::new(child),
+ event_dispatcher: Some(event_dispatcher),
+ closed: Arc::new(AtomicBool::new(false)),
},
)
}
- fn start_event_dispatcher(on_event: Arc<Fn(OpenVpnEvent) + Send + Sync + 'static>)
- -> Result<OpenVpnEventDispatcher> {
- let on_plugin_event = move |event, env| (*on_event)(OpenVpnEvent::PluginEvent(event, env));
- OpenVpnEventDispatcher::start(on_plugin_event).chain_err(|| ErrorKind::IpcServerError)
+ /// Creates a handle to this monitor, allowing the tunnel to be closed while some other thread
+ /// is blocked in `wait`.
+ pub fn close_handle(&self) -> OpenVpnCloseHandle {
+ OpenVpnCloseHandle {
+ child: self.child.clone(),
+ closed: self.closed.clone(),
+ }
}
- /// Tries to start a new OpenVPN process if one is not already running.
- /// If this `OpenVpnMonitor is already monitoring a running process it will return an
- /// `InvalidState` error.
- pub fn start(&self, cmd: OpenVpnCommand) -> Result<()> {
- let mut child_lock = self.child.lock().unwrap();
- if child_lock.is_some() {
- bail!(ErrorKind::InvalidState);
+ /// Consumes the monitor and blocks until OpenVPN exits or there is an error in either waiting
+ /// for the process or in the event dispatcher.
+ pub fn wait(mut self) -> Result<()> {
+ match self.wait_result() {
+ WaitResult::Child(Ok(exit_status)) => {
+ if exit_status.success() || self.closed.load(Ordering::SeqCst) {
+ debug!(
+ "OpenVPN exited, as expected, with exit status: {}",
+ exit_status
+ );
+ Ok(())
+ } else {
+ error!("OpenVPN died unexpectedly with status: {}", exit_status);
+ Err(ErrorKind::ChildProcessError("Died unexpectedly").into())
+ }
+ }
+ WaitResult::Child(Err(e)) => {
+ error!("OpenVPN process wait error: {}", e);
+ Err(e).chain_err(|| ErrorKind::ChildProcessError("Error when waiting"))
+ }
+ WaitResult::EventDispatcher(result) => {
+ error!("OpenVpnEventDispatcher exited unexpectedly: {:?}", result);
+ match result {
+ Ok(()) => Err(ErrorKind::EventDispatcherError.into()),
+ Err(e) => Err(e).chain_err(|| ErrorKind::EventDispatcherError),
+ }
+ }
}
- *child_lock = Some(self.start_child_monitor(cmd)?);
- Ok(())
}
- fn start_child_monitor(&self, mut cmd: OpenVpnCommand) -> Result<ChildMonitor> {
- self.set_plugin(&mut cmd);
+ /// Waits for both the child process and the event dispatcher in parallel. After both have
+ /// returned this returns the earliest result.
+ fn wait_result(&mut self) -> WaitResult {
+ let child_wait_handle = self.child.clone();
+ let child_kill_handle = self.child.clone();
+ let event_dispatcher = self.event_dispatcher.take().unwrap();
+ let dispatcher_handle = event_dispatcher.close_handle();
- let child = self.child.clone();
- let on_event = self.on_event.clone();
+ let (child_tx, rx) = mpsc::channel();
+ let dispatcher_tx = child_tx.clone();
- let on_exit = move |exit_status: io::Result<&process::Output>| {
- *child.lock().unwrap() = None;
- (*on_event)(OpenVpnEvent::Shutdown(exit_status.map(|output| output.status)),)
- };
- ChildMonitor::start(&cmd.build(), on_exit).chain_err(|| ErrorKind::ChildProcessError)
- }
+ thread::spawn(
+ move || {
+ let result = child_wait_handle.wait().map(|output| output.status);
+ child_tx.send(WaitResult::Child(result)).unwrap();
+ dispatcher_handle.close();
+ },
+ );
+ thread::spawn(
+ move || {
+ let result = event_dispatcher.wait();
+ dispatcher_tx.send(WaitResult::EventDispatcher(result)).unwrap();
+ let _ = child_kill_handle.kill();
+ },
+ );
- fn set_plugin(&self, cmd: &mut OpenVpnCommand) {
- let event_dispatcher_address = self.event_dispatcher.address().to_string();
- cmd.plugin(&self.plugin_path, vec![event_dispatcher_address]);
+ let result = rx.recv().unwrap();
+ let _ = rx.recv().unwrap();
+ result
}
+}
- /// Tries to kill the OpenVPN process if it is running. If it is already dead, this does
- /// nothing.
- pub fn kill(&self) -> Result<()> {
- if let Some(ref child) = *self.child.lock().unwrap() {
- child.kill().chain_err(|| ErrorKind::ChildProcessError)?;
- }
- Ok(())
- }
+/// A handle to an `OpenVpnMonitor` for closing it.
+pub struct OpenVpnCloseHandle {
+ child: Arc<duct::Handle>,
+ closed: Arc<AtomicBool>,
}
+impl OpenVpnCloseHandle {
+ /// Kills the underlying OpenVPN process, making the `OpenVpnMonitor::wait` method return.
+ pub fn close(&self) -> io::Result<()> {
+ self.closed.store(true, Ordering::SeqCst);
+ self.child.kill()
+ }
+}
+/// Internal enum to differentiate between if the child process or the event dispatcher died first.
+enum WaitResult {
+ Child(io::Result<process::ExitStatus>),
+ EventDispatcher(talpid_ipc::Result<()>),
+}
/// IPC server for listening to events coming from plugin loaded into OpenVPN.
@@ -125,8 +162,7 @@ pub struct OpenVpnEventDispatcher {
impl OpenVpnEventDispatcher {
/// Construct and start the IPC server with the given event listener callback.
pub fn start<L>(on_event: L) -> talpid_ipc::Result<Self>
- where L: Fn(openvpn_ffi::OpenVpnPluginEvent, openvpn_ffi::OpenVpnEnv),
- L: Send + Sync + 'static
+ where L: Fn(OpenVpnPluginEvent, OpenVpnEnv) + Send + Sync + 'static
{
let rpc = OpenVpnEventApiImpl { on_event };
let mut io = IoHandler::new();
@@ -140,7 +176,14 @@ impl OpenVpnEventDispatcher {
self.server.address()
}
- /// Consumes the server and waits for it to finish.
+ /// Creates a handle to this event dispatcher, allowing the listening server to be closed while
+ /// some other thread is blocked in `wait`.
+ pub fn close_handle(&self) -> talpid_ipc::CloseHandle {
+ self.server.close_handle()
+ }
+
+ /// Consumes the server and waits for it to finish. Returns an error if the server exited
+ /// due to an error.
pub fn wait(self) -> talpid_ipc::Result<()> {
self.server.wait()
}
@@ -153,8 +196,8 @@ mod api {
pub trait OpenVpnEventApi {
#[rpc(name = "openvpn_event")]
fn openvpn_event(&self,
- openvpn_ffi::OpenVpnPluginEvent,
- openvpn_ffi::OpenVpnEnv)
+ OpenVpnPluginEvent,
+ OpenVpnEnv)
-> StdResult<(), Error>;
}
}
@@ -162,18 +205,15 @@ mod api {
use self::api::*;
struct OpenVpnEventApiImpl<L>
- where L: Fn(openvpn_ffi::OpenVpnPluginEvent, openvpn_ffi::OpenVpnEnv) + Send + Sync + 'static
+ where L: Fn(OpenVpnPluginEvent, OpenVpnEnv) + Send + Sync + 'static
{
on_event: L,
}
impl<L> OpenVpnEventApi for OpenVpnEventApiImpl<L>
- where L: Fn(openvpn_ffi::OpenVpnPluginEvent, openvpn_ffi::OpenVpnEnv) + Send + Sync + 'static
+ where L: Fn(OpenVpnPluginEvent, OpenVpnEnv) + Send + Sync + 'static
{
- fn openvpn_event(&self,
- event: openvpn_ffi::OpenVpnPluginEvent,
- env: openvpn_ffi::OpenVpnEnv)
- -> StdResult<(), Error> {
+ fn openvpn_event(&self, event: OpenVpnPluginEvent, env: OpenVpnEnv) -> StdResult<(), Error> {
debug!("OpenVPN event {:?}", event);
(self.on_event)(event, env);
Ok(())
diff --git a/talpid_ipc/Cargo.toml b/talpid_ipc/Cargo.toml
index d1a2bb13e2..630cb1ced1 100644
--- a/talpid_ipc/Cargo.toml
+++ b/talpid_ipc/Cargo.toml
@@ -9,12 +9,12 @@ error-chain = "0.10"
serde = "1.0"
serde_json = "1.0"
log = "0.3"
-jsonrpc-core = { git = "https://github.com/faern/jsonrpc", branch = "bind-zero" }
-jsonrpc-ws-server = { git = "https://github.com/faern/jsonrpc", branch = "bind-zero" }
+jsonrpc-core = { git = "https://github.com/faern/jsonrpc", branch = "ws-close-handle" }
+jsonrpc-ws-server = { git = "https://github.com/faern/jsonrpc", branch = "ws-close-handle" }
ws = { git = "https://github.com/tomusdrw/ws-rs" }
url = "1.4"
[dev-dependencies]
assert_matches = "1.0"
env_logger = "0.4"
-jsonrpc-macros = { git = "https://github.com/faern/jsonrpc", branch = "bind-zero" }
+jsonrpc-macros = { git = "https://github.com/faern/jsonrpc", branch = "ws-close-handle" }
diff --git a/talpid_ipc/src/lib.rs b/talpid_ipc/src/lib.rs
index f6d4acea19..3ef9bf4733 100644
--- a/talpid_ipc/src/lib.rs
+++ b/talpid_ipc/src/lib.rs
@@ -26,18 +26,6 @@ pub type IpcServerId = String;
error_chain!{
errors {
- ReadFailure {
- description("Could not read IPC message")
- }
- ParseFailure {
- description("Unable to serialize/deserialize message")
- }
- CouldNotStartServer {
- description("Failed to start the IPC server")
- }
- SendError {
- description("Unable to send message")
- }
IpcServerError {
description("Error in IPC server")
}
@@ -74,17 +62,28 @@ impl IpcServer {
.chain_err(|| ErrorKind::IpcServerError)
}
+ /// Returns the localhost address this `IpcServer` is listening on.
pub fn address(&self) -> &str {
&self.address
}
- /// Consumes the server, stops it and waits for it to finish.
- pub fn stop(self) {
- self.server.close();
+ /// Creates a handle bound to this `IpcServer` that can be used to shut it down.
+ pub fn close_handle(&self) -> CloseHandle {
+ CloseHandle(self.server.close_handle())
}
- /// Consumes the server and waits for it to finish.
+ /// Consumes the server and waits for it to finish. Get an `CloseHandle` before calling this
+ /// if you want to be able to shut the server down.
pub fn wait(self) -> Result<()> {
self.server.wait().chain_err(|| ErrorKind::IpcServerError)
}
}
+
+#[derive(Clone)]
+pub struct CloseHandle(jsonrpc_ws_server::CloseHandle);
+
+impl CloseHandle {
+ pub fn close(self) {
+ self.0.close();
+ }
+}