diff options
| author | Linus Färnstrand <linus@mullvad.net> | 2017-06-09 13:20:18 +0200 |
|---|---|---|
| committer | Linus Färnstrand <linus@mullvad.net> | 2017-06-09 13:20:18 +0200 |
| commit | c850ba6d06010f3c0d528a750659707ac48fd2ae (patch) | |
| tree | c26bb0035409e98cca29a0d7ec3201a875ab7315 | |
| parent | c2aaa4eb26af015ef949785668751cded783675d (diff) | |
| parent | 7deaeb4712d334bd34f8f98bbc168978020862f7 (diff) | |
| download | mullvadvpn-c850ba6d06010f3c0d528a750659707ac48fd2ae.tar.xz mullvadvpn-c850ba6d06010f3c0d528a750659707ac48fd2ae.zip | |
Merge branch 'low-level-close-handle' into master-new-daemon
| -rw-r--r-- | Cargo.lock | 50 | ||||
| -rw-r--r-- | mullvad_daemon/Cargo.toml | 8 | ||||
| -rw-r--r-- | talpid_core/Cargo.toml | 4 | ||||
| -rw-r--r-- | talpid_core/src/process/mod.rs | 3 | ||||
| -rw-r--r-- | talpid_core/src/process/monitor.rs | 113 | ||||
| -rw-r--r-- | talpid_core/src/tunnel/openvpn.rs | 202 | ||||
| -rw-r--r-- | talpid_ipc/Cargo.toml | 6 | ||||
| -rw-r--r-- | talpid_ipc/src/lib.rs | 31 |
8 files changed, 170 insertions, 247 deletions
diff --git a/Cargo.lock b/Cargo.lock index 9e86a409c1..8d587baee0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -213,7 +213,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] name = "jsonrpc-core" version = "7.0.0" -source = "git+https://github.com/faern/jsonrpc?branch=bind-zero#863467e499d3c8e0262d1dc211d0525352471d7a" +source = "git+https://github.com/faern/jsonrpc?branch=ws-close-handle#8c47e034f05b8e42d3440078b758dd732e85894c" dependencies = [ "futures 0.1.13 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)", @@ -225,19 +225,19 @@ dependencies = [ [[package]] name = "jsonrpc-macros" version = "7.0.0" -source = "git+https://github.com/faern/jsonrpc?branch=bind-zero#863467e499d3c8e0262d1dc211d0525352471d7a" +source = "git+https://github.com/faern/jsonrpc?branch=ws-close-handle#8c47e034f05b8e42d3440078b758dd732e85894c" dependencies = [ - "jsonrpc-core 7.0.0 (git+https://github.com/faern/jsonrpc?branch=bind-zero)", - "jsonrpc-pubsub 7.0.0 (git+https://github.com/faern/jsonrpc?branch=bind-zero)", + "jsonrpc-core 7.0.0 (git+https://github.com/faern/jsonrpc?branch=ws-close-handle)", + "jsonrpc-pubsub 7.0.0 (git+https://github.com/faern/jsonrpc?branch=ws-close-handle)", "serde 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] name = "jsonrpc-pubsub" version = "7.0.0" -source = "git+https://github.com/faern/jsonrpc?branch=bind-zero#863467e499d3c8e0262d1dc211d0525352471d7a" +source = "git+https://github.com/faern/jsonrpc?branch=ws-close-handle#8c47e034f05b8e42d3440078b758dd732e85894c" dependencies = [ - "jsonrpc-core 7.0.0 (git+https://github.com/faern/jsonrpc?branch=bind-zero)", + "jsonrpc-core 7.0.0 (git+https://github.com/faern/jsonrpc?branch=ws-close-handle)", "log 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)", "parking_lot 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -245,10 +245,10 @@ dependencies = [ [[package]] name = "jsonrpc-server-utils" version = "7.0.0" -source = "git+https://github.com/faern/jsonrpc?branch=bind-zero#863467e499d3c8e0262d1dc211d0525352471d7a" +source = "git+https://github.com/faern/jsonrpc?branch=ws-close-handle#8c47e034f05b8e42d3440078b758dd732e85894c" dependencies = [ "globset 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", - "jsonrpc-core 7.0.0 (git+https://github.com/faern/jsonrpc?branch=bind-zero)", + "jsonrpc-core 7.0.0 (git+https://github.com/faern/jsonrpc?branch=ws-close-handle)", "log 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-core 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-io 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", @@ -257,10 +257,10 @@ dependencies = [ [[package]] name = "jsonrpc-ws-server" version = "7.0.0" -source = "git+https://github.com/faern/jsonrpc?branch=bind-zero#863467e499d3c8e0262d1dc211d0525352471d7a" +source = "git+https://github.com/faern/jsonrpc?branch=ws-close-handle#8c47e034f05b8e42d3440078b758dd732e85894c" dependencies = [ - "jsonrpc-core 7.0.0 (git+https://github.com/faern/jsonrpc?branch=bind-zero)", - "jsonrpc-server-utils 7.0.0 (git+https://github.com/faern/jsonrpc?branch=bind-zero)", + "jsonrpc-core 7.0.0 (git+https://github.com/faern/jsonrpc?branch=ws-close-handle)", + "jsonrpc-server-utils 7.0.0 (git+https://github.com/faern/jsonrpc?branch=ws-close-handle)", "log 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)", "ws 0.7.1 (git+https://github.com/tomusdrw/ws-rs)", ] @@ -341,10 +341,10 @@ dependencies = [ "assert_matches 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "env_logger 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)", "error-chain 0.10.0 (registry+https://github.com/rust-lang/crates.io-index)", - "jsonrpc-core 7.0.0 (git+https://github.com/faern/jsonrpc?branch=bind-zero)", - "jsonrpc-macros 7.0.0 (git+https://github.com/faern/jsonrpc?branch=bind-zero)", - "jsonrpc-pubsub 7.0.0 (git+https://github.com/faern/jsonrpc?branch=bind-zero)", - "jsonrpc-ws-server 7.0.0 (git+https://github.com/faern/jsonrpc?branch=bind-zero)", + "jsonrpc-core 7.0.0 (git+https://github.com/faern/jsonrpc?branch=ws-close-handle)", + "jsonrpc-macros 7.0.0 (git+https://github.com/faern/jsonrpc?branch=ws-close-handle)", + "jsonrpc-pubsub 7.0.0 (git+https://github.com/faern/jsonrpc?branch=ws-close-handle)", + "jsonrpc-ws-server 7.0.0 (git+https://github.com/faern/jsonrpc?branch=ws-close-handle)", "log 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)", "serde 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)", "serde_derive 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)", @@ -575,8 +575,8 @@ dependencies = [ "assert_matches 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "duct 0.8.2 (registry+https://github.com/rust-lang/crates.io-index)", "error-chain 0.10.0 (registry+https://github.com/rust-lang/crates.io-index)", - "jsonrpc-core 7.0.0 (git+https://github.com/faern/jsonrpc?branch=bind-zero)", - "jsonrpc-macros 7.0.0 (git+https://github.com/faern/jsonrpc?branch=bind-zero)", + "jsonrpc-core 7.0.0 (git+https://github.com/faern/jsonrpc?branch=ws-close-handle)", + "jsonrpc-macros 7.0.0 (git+https://github.com/faern/jsonrpc?branch=ws-close-handle)", "log 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)", "openvpn_ffi 0.1.0", "talpid_ipc 0.1.0", @@ -589,9 +589,9 @@ dependencies = [ "assert_matches 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "env_logger 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)", "error-chain 0.10.0 (registry+https://github.com/rust-lang/crates.io-index)", - "jsonrpc-core 7.0.0 (git+https://github.com/faern/jsonrpc?branch=bind-zero)", - "jsonrpc-macros 7.0.0 (git+https://github.com/faern/jsonrpc?branch=bind-zero)", - "jsonrpc-ws-server 7.0.0 (git+https://github.com/faern/jsonrpc?branch=bind-zero)", + "jsonrpc-core 7.0.0 (git+https://github.com/faern/jsonrpc?branch=ws-close-handle)", + "jsonrpc-macros 7.0.0 (git+https://github.com/faern/jsonrpc?branch=ws-close-handle)", + "jsonrpc-ws-server 7.0.0 (git+https://github.com/faern/jsonrpc?branch=ws-close-handle)", "log 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)", "serde 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)", "serde_json 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)", @@ -774,11 +774,11 @@ dependencies = [ "checksum idna 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "6ac85ec3f80c8e4e99d9325521337e14ec7555c458a14e377d189659a427f375" "checksum iovec 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "29d062ee61fccdf25be172e70f34c9f6efc597e1fb8f6526e8437b2046ab26be" "checksum itoa 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "eb2f404fbc66fd9aac13e998248505e7ecb2ad8e44ab6388684c5fb11c6c251c" -"checksum jsonrpc-core 7.0.0 (git+https://github.com/faern/jsonrpc?branch=bind-zero)" = "<none>" -"checksum jsonrpc-macros 7.0.0 (git+https://github.com/faern/jsonrpc?branch=bind-zero)" = "<none>" -"checksum jsonrpc-pubsub 7.0.0 (git+https://github.com/faern/jsonrpc?branch=bind-zero)" = "<none>" -"checksum jsonrpc-server-utils 7.0.0 (git+https://github.com/faern/jsonrpc?branch=bind-zero)" = "<none>" -"checksum jsonrpc-ws-server 7.0.0 (git+https://github.com/faern/jsonrpc?branch=bind-zero)" = "<none>" +"checksum jsonrpc-core 7.0.0 (git+https://github.com/faern/jsonrpc?branch=ws-close-handle)" = "<none>" +"checksum jsonrpc-macros 7.0.0 (git+https://github.com/faern/jsonrpc?branch=ws-close-handle)" = "<none>" +"checksum jsonrpc-pubsub 7.0.0 (git+https://github.com/faern/jsonrpc?branch=ws-close-handle)" = "<none>" +"checksum jsonrpc-server-utils 7.0.0 (git+https://github.com/faern/jsonrpc?branch=ws-close-handle)" = "<none>" +"checksum jsonrpc-ws-server 7.0.0 (git+https://github.com/faern/jsonrpc?branch=ws-close-handle)" = "<none>" "checksum kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7507624b29483431c0ba2d82aece8ca6cdba9382bff4ddd0f7490560c056098d" "checksum lazycell 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ce12306c4739d86ee97c23139f3a34ddf0387bbf181bc7929d287025a8c3ef6b" "checksum lazycell 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "3b585b7a6811fb03aa10e74b278a0f00f8dd9b45dc681f148bb29fa5cb61859b" diff --git a/mullvad_daemon/Cargo.toml b/mullvad_daemon/Cargo.toml index 9982a1bbaa..e425e71515 100644 --- a/mullvad_daemon/Cargo.toml +++ b/mullvad_daemon/Cargo.toml @@ -10,10 +10,10 @@ serde = "1.0" serde_derive = "1.0" log = "0.3" env_logger = "0.4" -jsonrpc-core = { git = "https://github.com/faern/jsonrpc", branch = "bind-zero" } -jsonrpc-macros = { git = "https://github.com/faern/jsonrpc", branch = "bind-zero" } -jsonrpc-pubsub = { git = "https://github.com/faern/jsonrpc", branch = "bind-zero" } -jsonrpc-ws-server = { git = "https://github.com/faern/jsonrpc", branch = "bind-zero" } +jsonrpc-core = { git = "https://github.com/faern/jsonrpc", branch = "ws-close-handle" } +jsonrpc-macros = { git = "https://github.com/faern/jsonrpc", branch = "ws-close-handle" } +jsonrpc-pubsub = { git = "https://github.com/faern/jsonrpc", branch = "ws-close-handle" } +jsonrpc-ws-server = { git = "https://github.com/faern/jsonrpc", branch = "ws-close-handle" } [dependencies.talpid_ipc] path = "../talpid_ipc" diff --git a/talpid_core/Cargo.toml b/talpid_core/Cargo.toml index 18c998cf93..6fd7222f27 100644 --- a/talpid_core/Cargo.toml +++ b/talpid_core/Cargo.toml @@ -8,8 +8,8 @@ description = "Core backend functionality of the Mullvad VPN client" duct = "0.8" error-chain = "0.10" log = "0.3" -jsonrpc-core = { git = "https://github.com/faern/jsonrpc", branch = "bind-zero" } -jsonrpc-macros = { git = "https://github.com/faern/jsonrpc", branch = "bind-zero" } +jsonrpc-core = { git = "https://github.com/faern/jsonrpc", branch = "ws-close-handle" } +jsonrpc-macros = { git = "https://github.com/faern/jsonrpc", branch = "ws-close-handle" } [dependencies.talpid_ipc] path = "../talpid_ipc" diff --git a/talpid_core/src/process/mod.rs b/talpid_core/src/process/mod.rs index a7d76ea9f4..6ae568907b 100644 --- a/talpid_core/src/process/mod.rs +++ b/talpid_core/src/process/mod.rs @@ -1,5 +1,2 @@ -/// A module for monitoring child processes and get notified of events on them. -pub mod monitor; - /// A module for all OpenVPN related process management. pub mod openvpn; diff --git a/talpid_core/src/process/monitor.rs b/talpid_core/src/process/monitor.rs deleted file mode 100644 index 1568d80615..0000000000 --- a/talpid_core/src/process/monitor.rs +++ /dev/null @@ -1,113 +0,0 @@ -use duct; - -use std::io; -use std::process; -use std::sync::Arc; -use std::thread; - -/// A child process monitor. Takes care of starting and monitoring a child process and calls the -/// listener on child exit. If the child is still running when a `ChildMonitor` instance goes out -/// of scope it will kill the child and wait for it to exit properly. -pub struct ChildMonitor { - child: Arc<duct::Handle>, - thread: Option<thread::JoinHandle<()>>, -} - -impl ChildMonitor { - /// Starts the child process and begins to monitor it. `on_exit` will be called as soon as the - /// child process exits. - pub fn start<L>(expression: &duct::Expression, mut on_exit: L) -> io::Result<Self> - where L: FnMut(io::Result<&process::Output>) + Send + 'static - { - let child = Arc::new(expression.start()?); - let child_clone = child.clone(); - let thread = Some(thread::spawn(move || on_exit(child_clone.wait()))); - Ok(ChildMonitor { child, thread }) - } - - /// Wait for the child to exit. Blocking the current thread. The `on_exit` callback is - /// guaranteed to fire before this method returns. - pub fn wait(&mut self) -> io::Result<&process::Output> { - if let Some(thread) = self.thread.take() { - if let Err(e) = thread.join() { - error!("Panic in the on_exit callback in ChildMonitor: {:?}", e); - } - } - self.child.wait() - } - - /// Send a kill signal to the child. No need to call `wait` after to free the PID. The monitor - /// will wait for the process for you. - pub fn kill(&self) -> io::Result<()> { - self.child.kill() - } -} - -impl Drop for ChildMonitor { - fn drop(&mut self) { - let _ = self.kill(); - let _ = self.child.wait(); - } -} - - -#[cfg(test)] -mod child_monitor_tests { - use super::*; - use duct::{Expression, cmd}; - - use std::sync::mpsc; - use std::time::Duration; - - fn echo_cmd(s: &str) -> Expression { - cmd("echo", &[s]).stdout_capture().unchecked() - } - - fn invalid_cmd() -> Expression { - cmd("this command does not exist", &[""]).unchecked() - } - - fn sleep_cmd(secs: u32) -> Expression { - if cfg!(windows) { - cmd("ping", &["127.0.0.1", "-n", &(secs + 1).to_string()]).unchecked() - } else { - cmd("sleep", &[secs.to_string()]).unchecked() - } - } - - fn spawn(cmd: &Expression) -> (ChildMonitor, mpsc::Receiver<io::Result<process::Output>>) { - let (tx, rx) = mpsc::channel(); - let child = - ChildMonitor::start(cmd, move |res| tx.send(res.map(|out| out.clone())).unwrap()) - .expect("Unable to start process"); - (child, rx) - } - - #[test] - fn echo() { - let (mut child, rx) = spawn(&echo_cmd("foobar")); - let wait_output = child.wait().unwrap(); - let callback_output = rx.try_recv().unwrap().unwrap(); - - assert!(callback_output.status.success()); - assert_eq!("foobar\n".as_bytes(), &callback_output.stdout[..]); - assert_eq!(wait_output.status, callback_output.status); - assert_eq!(wait_output.stdout, callback_output.stdout); - } - - #[test] - fn invalid_command() { - assert!(ChildMonitor::start(&invalid_cmd(), |_| {}).is_err()); - } - - #[test] - fn callback_after_kill() { - let (child, rx) = spawn(&sleep_cmd(100000)); - // Make sure on_exit is not triggered within the first second. It should not be called - // until we kill the process. - assert!(rx.recv_timeout(Duration::from_secs(1)).is_err()); - - child.kill().unwrap(); - assert!(!rx.recv_timeout(Duration::from_secs(10)).unwrap().unwrap().status.success()); - } -} diff --git a/talpid_core/src/tunnel/openvpn.rs b/talpid_core/src/tunnel/openvpn.rs index 9d366ebb4a..cf33484f38 100644 --- a/talpid_core/src/tunnel/openvpn.rs +++ b/talpid_core/src/tunnel/openvpn.rs @@ -1,30 +1,29 @@ +use duct; use jsonrpc_core::{Error, IoHandler}; -use openvpn_ffi; -use process::monitor::ChildMonitor; +use openvpn_ffi::{OpenVpnEnv, OpenVpnPluginEvent}; use process::openvpn::OpenVpnCommand; use std::io; -use std::path::{Path, PathBuf}; +use std::path::Path; use std::process; use std::result::Result as StdResult; -use std::sync::{Arc, Mutex}; +use std::sync::{Arc, mpsc}; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::thread; use talpid_ipc; mod errors { error_chain!{ errors { - /// The `OpenVpnMonitor` is in an invalid state for the requested operation. - InvalidState { - description("Invalid state. OpenVPN is already running") + /// Unable to start, wait for or kill the OpenVPN process. + ChildProcessError(msg: &'static str) { + description("Unable to start, wait for or kill the OpenVPN process") + display("OpenVPN process error: {}", msg) } - /// Unable to start or kill the OpenVPN process. - ChildProcessError { - description("Unable to start or kill the OpenVPN process") - } - /// Unable to start or manage the IPC server listening for events from OpenVPN - IpcServerError { - description("Unable to start or manage the IPC server") + /// Unable to start or manage the IPC server listening for events from OpenVPN. + EventDispatcherError { + description("Unable to start or manage the event dispatcher IPC server") } } } @@ -32,89 +31,127 @@ mod errors { pub use self::errors::*; -/// Possible events from OpenVPN -#[derive(Debug)] -pub enum OpenVpnEvent { - /// An event from the plugin loaded into OpenVPN. - PluginEvent(openvpn_ffi::OpenVpnPluginEvent, openvpn_ffi::OpenVpnEnv), - /// The OpenVPN process exited. Containing the result of waiting for the process. - Shutdown(io::Result<process::ExitStatus>), -} - -/// Struct for monitoring OpenVPN processes. +/// Struct for monitoring an OpenVPN process. pub struct OpenVpnMonitor { - on_event: Arc<Fn(OpenVpnEvent) + Send + Sync + 'static>, - plugin_path: PathBuf, - child: Arc<Mutex<Option<ChildMonitor>>>, - event_dispatcher: OpenVpnEventDispatcher, + child: Arc<duct::Handle>, + event_dispatcher: Option<OpenVpnEventDispatcher>, + closed: Arc<AtomicBool>, } impl OpenVpnMonitor { /// Creates a new `OpenVpnMonitor` with the given listener and using the plugin at the given /// path. - pub fn new<L, P>(on_event: L, plugin_path: P) -> Result<Self> - where L: Fn(OpenVpnEvent) + Send + Sync + 'static, + pub fn new<L, P>(mut cmd: OpenVpnCommand, on_event: L, plugin_path: P) -> Result<Self> + where L: Fn(OpenVpnPluginEvent, OpenVpnEnv) + Send + Sync + 'static, P: AsRef<Path> { - let on_event = Arc::new(on_event); - let event_dispatcher = Self::start_event_dispatcher(on_event.clone())?; + let event_dispatcher = OpenVpnEventDispatcher::start(on_event) + .chain_err(|| ErrorKind::EventDispatcherError)?; + + cmd.plugin(plugin_path, vec![event_dispatcher.address().to_owned()]); + let child = cmd.build() + .start() + .chain_err(|| ErrorKind::ChildProcessError("Failed to start"))?; + Ok( OpenVpnMonitor { - on_event, - plugin_path: plugin_path.as_ref().to_owned(), - child: Arc::new(Mutex::new(None)), - event_dispatcher, + child: Arc::new(child), + event_dispatcher: Some(event_dispatcher), + closed: Arc::new(AtomicBool::new(false)), }, ) } - fn start_event_dispatcher(on_event: Arc<Fn(OpenVpnEvent) + Send + Sync + 'static>) - -> Result<OpenVpnEventDispatcher> { - let on_plugin_event = move |event, env| (*on_event)(OpenVpnEvent::PluginEvent(event, env)); - OpenVpnEventDispatcher::start(on_plugin_event).chain_err(|| ErrorKind::IpcServerError) + /// Creates a handle to this monitor, allowing the tunnel to be closed while some other thread + /// is blocked in `wait`. + pub fn close_handle(&self) -> OpenVpnCloseHandle { + OpenVpnCloseHandle { + child: self.child.clone(), + closed: self.closed.clone(), + } } - /// Tries to start a new OpenVPN process if one is not already running. - /// If this `OpenVpnMonitor is already monitoring a running process it will return an - /// `InvalidState` error. - pub fn start(&self, cmd: OpenVpnCommand) -> Result<()> { - let mut child_lock = self.child.lock().unwrap(); - if child_lock.is_some() { - bail!(ErrorKind::InvalidState); + /// Consumes the monitor and blocks until OpenVPN exits or there is an error in either waiting + /// for the process or in the event dispatcher. + pub fn wait(mut self) -> Result<()> { + match self.wait_result() { + WaitResult::Child(Ok(exit_status)) => { + if exit_status.success() || self.closed.load(Ordering::SeqCst) { + debug!( + "OpenVPN exited, as expected, with exit status: {}", + exit_status + ); + Ok(()) + } else { + error!("OpenVPN died unexpectedly with status: {}", exit_status); + Err(ErrorKind::ChildProcessError("Died unexpectedly").into()) + } + } + WaitResult::Child(Err(e)) => { + error!("OpenVPN process wait error: {}", e); + Err(e).chain_err(|| ErrorKind::ChildProcessError("Error when waiting")) + } + WaitResult::EventDispatcher(result) => { + error!("OpenVpnEventDispatcher exited unexpectedly: {:?}", result); + match result { + Ok(()) => Err(ErrorKind::EventDispatcherError.into()), + Err(e) => Err(e).chain_err(|| ErrorKind::EventDispatcherError), + } + } } - *child_lock = Some(self.start_child_monitor(cmd)?); - Ok(()) } - fn start_child_monitor(&self, mut cmd: OpenVpnCommand) -> Result<ChildMonitor> { - self.set_plugin(&mut cmd); + /// Waits for both the child process and the event dispatcher in parallel. After both have + /// returned this returns the earliest result. + fn wait_result(&mut self) -> WaitResult { + let child_wait_handle = self.child.clone(); + let child_kill_handle = self.child.clone(); + let event_dispatcher = self.event_dispatcher.take().unwrap(); + let dispatcher_handle = event_dispatcher.close_handle(); - let child = self.child.clone(); - let on_event = self.on_event.clone(); + let (child_tx, rx) = mpsc::channel(); + let dispatcher_tx = child_tx.clone(); - let on_exit = move |exit_status: io::Result<&process::Output>| { - *child.lock().unwrap() = None; - (*on_event)(OpenVpnEvent::Shutdown(exit_status.map(|output| output.status)),) - }; - ChildMonitor::start(&cmd.build(), on_exit).chain_err(|| ErrorKind::ChildProcessError) - } + thread::spawn( + move || { + let result = child_wait_handle.wait().map(|output| output.status); + child_tx.send(WaitResult::Child(result)).unwrap(); + dispatcher_handle.close(); + }, + ); + thread::spawn( + move || { + let result = event_dispatcher.wait(); + dispatcher_tx.send(WaitResult::EventDispatcher(result)).unwrap(); + let _ = child_kill_handle.kill(); + }, + ); - fn set_plugin(&self, cmd: &mut OpenVpnCommand) { - let event_dispatcher_address = self.event_dispatcher.address().to_string(); - cmd.plugin(&self.plugin_path, vec![event_dispatcher_address]); + let result = rx.recv().unwrap(); + let _ = rx.recv().unwrap(); + result } +} - /// Tries to kill the OpenVPN process if it is running. If it is already dead, this does - /// nothing. - pub fn kill(&self) -> Result<()> { - if let Some(ref child) = *self.child.lock().unwrap() { - child.kill().chain_err(|| ErrorKind::ChildProcessError)?; - } - Ok(()) - } +/// A handle to an `OpenVpnMonitor` for closing it. +pub struct OpenVpnCloseHandle { + child: Arc<duct::Handle>, + closed: Arc<AtomicBool>, } +impl OpenVpnCloseHandle { + /// Kills the underlying OpenVPN process, making the `OpenVpnMonitor::wait` method return. + pub fn close(&self) -> io::Result<()> { + self.closed.store(true, Ordering::SeqCst); + self.child.kill() + } +} +/// Internal enum to differentiate between if the child process or the event dispatcher died first. +enum WaitResult { + Child(io::Result<process::ExitStatus>), + EventDispatcher(talpid_ipc::Result<()>), +} /// IPC server for listening to events coming from plugin loaded into OpenVPN. @@ -125,8 +162,7 @@ pub struct OpenVpnEventDispatcher { impl OpenVpnEventDispatcher { /// Construct and start the IPC server with the given event listener callback. pub fn start<L>(on_event: L) -> talpid_ipc::Result<Self> - where L: Fn(openvpn_ffi::OpenVpnPluginEvent, openvpn_ffi::OpenVpnEnv), - L: Send + Sync + 'static + where L: Fn(OpenVpnPluginEvent, OpenVpnEnv) + Send + Sync + 'static { let rpc = OpenVpnEventApiImpl { on_event }; let mut io = IoHandler::new(); @@ -140,7 +176,14 @@ impl OpenVpnEventDispatcher { self.server.address() } - /// Consumes the server and waits for it to finish. + /// Creates a handle to this event dispatcher, allowing the listening server to be closed while + /// some other thread is blocked in `wait`. + pub fn close_handle(&self) -> talpid_ipc::CloseHandle { + self.server.close_handle() + } + + /// Consumes the server and waits for it to finish. Returns an error if the server exited + /// due to an error. pub fn wait(self) -> talpid_ipc::Result<()> { self.server.wait() } @@ -153,8 +196,8 @@ mod api { pub trait OpenVpnEventApi { #[rpc(name = "openvpn_event")] fn openvpn_event(&self, - openvpn_ffi::OpenVpnPluginEvent, - openvpn_ffi::OpenVpnEnv) + OpenVpnPluginEvent, + OpenVpnEnv) -> StdResult<(), Error>; } } @@ -162,18 +205,15 @@ mod api { use self::api::*; struct OpenVpnEventApiImpl<L> - where L: Fn(openvpn_ffi::OpenVpnPluginEvent, openvpn_ffi::OpenVpnEnv) + Send + Sync + 'static + where L: Fn(OpenVpnPluginEvent, OpenVpnEnv) + Send + Sync + 'static { on_event: L, } impl<L> OpenVpnEventApi for OpenVpnEventApiImpl<L> - where L: Fn(openvpn_ffi::OpenVpnPluginEvent, openvpn_ffi::OpenVpnEnv) + Send + Sync + 'static + where L: Fn(OpenVpnPluginEvent, OpenVpnEnv) + Send + Sync + 'static { - fn openvpn_event(&self, - event: openvpn_ffi::OpenVpnPluginEvent, - env: openvpn_ffi::OpenVpnEnv) - -> StdResult<(), Error> { + fn openvpn_event(&self, event: OpenVpnPluginEvent, env: OpenVpnEnv) -> StdResult<(), Error> { debug!("OpenVPN event {:?}", event); (self.on_event)(event, env); Ok(()) diff --git a/talpid_ipc/Cargo.toml b/talpid_ipc/Cargo.toml index d1a2bb13e2..630cb1ced1 100644 --- a/talpid_ipc/Cargo.toml +++ b/talpid_ipc/Cargo.toml @@ -9,12 +9,12 @@ error-chain = "0.10" serde = "1.0" serde_json = "1.0" log = "0.3" -jsonrpc-core = { git = "https://github.com/faern/jsonrpc", branch = "bind-zero" } -jsonrpc-ws-server = { git = "https://github.com/faern/jsonrpc", branch = "bind-zero" } +jsonrpc-core = { git = "https://github.com/faern/jsonrpc", branch = "ws-close-handle" } +jsonrpc-ws-server = { git = "https://github.com/faern/jsonrpc", branch = "ws-close-handle" } ws = { git = "https://github.com/tomusdrw/ws-rs" } url = "1.4" [dev-dependencies] assert_matches = "1.0" env_logger = "0.4" -jsonrpc-macros = { git = "https://github.com/faern/jsonrpc", branch = "bind-zero" } +jsonrpc-macros = { git = "https://github.com/faern/jsonrpc", branch = "ws-close-handle" } diff --git a/talpid_ipc/src/lib.rs b/talpid_ipc/src/lib.rs index f6d4acea19..3ef9bf4733 100644 --- a/talpid_ipc/src/lib.rs +++ b/talpid_ipc/src/lib.rs @@ -26,18 +26,6 @@ pub type IpcServerId = String; error_chain!{ errors { - ReadFailure { - description("Could not read IPC message") - } - ParseFailure { - description("Unable to serialize/deserialize message") - } - CouldNotStartServer { - description("Failed to start the IPC server") - } - SendError { - description("Unable to send message") - } IpcServerError { description("Error in IPC server") } @@ -74,17 +62,28 @@ impl IpcServer { .chain_err(|| ErrorKind::IpcServerError) } + /// Returns the localhost address this `IpcServer` is listening on. pub fn address(&self) -> &str { &self.address } - /// Consumes the server, stops it and waits for it to finish. - pub fn stop(self) { - self.server.close(); + /// Creates a handle bound to this `IpcServer` that can be used to shut it down. + pub fn close_handle(&self) -> CloseHandle { + CloseHandle(self.server.close_handle()) } - /// Consumes the server and waits for it to finish. + /// Consumes the server and waits for it to finish. Get an `CloseHandle` before calling this + /// if you want to be able to shut the server down. pub fn wait(self) -> Result<()> { self.server.wait().chain_err(|| ErrorKind::IpcServerError) } } + +#[derive(Clone)] +pub struct CloseHandle(jsonrpc_ws_server::CloseHandle); + +impl CloseHandle { + pub fn close(self) { + self.0.close(); + } +} |
