diff options
| -rw-r--r-- | talpid_core/Cargo.toml | 4 | ||||
| -rw-r--r-- | talpid_core/src/tunnel/openvpn.rs | 202 |
2 files changed, 123 insertions, 83 deletions
diff --git a/talpid_core/Cargo.toml b/talpid_core/Cargo.toml index 18c998cf93..6fd7222f27 100644 --- a/talpid_core/Cargo.toml +++ b/talpid_core/Cargo.toml @@ -8,8 +8,8 @@ description = "Core backend functionality of the Mullvad VPN client" duct = "0.8" error-chain = "0.10" log = "0.3" -jsonrpc-core = { git = "https://github.com/faern/jsonrpc", branch = "bind-zero" } -jsonrpc-macros = { git = "https://github.com/faern/jsonrpc", branch = "bind-zero" } +jsonrpc-core = { git = "https://github.com/faern/jsonrpc", branch = "ws-close-handle" } +jsonrpc-macros = { git = "https://github.com/faern/jsonrpc", branch = "ws-close-handle" } [dependencies.talpid_ipc] path = "../talpid_ipc" diff --git a/talpid_core/src/tunnel/openvpn.rs b/talpid_core/src/tunnel/openvpn.rs index 9d366ebb4a..b805d3563a 100644 --- a/talpid_core/src/tunnel/openvpn.rs +++ b/talpid_core/src/tunnel/openvpn.rs @@ -1,30 +1,29 @@ +use duct; use jsonrpc_core::{Error, IoHandler}; -use openvpn_ffi; -use process::monitor::ChildMonitor; +use openvpn_ffi::{OpenVpnEnv, OpenVpnPluginEvent}; use process::openvpn::OpenVpnCommand; use std::io; -use std::path::{Path, PathBuf}; +use std::path::Path; use std::process; use std::result::Result as StdResult; -use std::sync::{Arc, Mutex}; +use std::sync::{Arc, mpsc}; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::thread; use talpid_ipc; mod errors { error_chain!{ errors { - /// The `OpenVpnMonitor` is in an invalid state for the requested operation. - InvalidState { - description("Invalid state. OpenVPN is already running") + /// Unable to start, wait for or kill the OpenVPN process. + ChildProcessError(msg: &'static str) { + description("Unable to start, wait for or kill the OpenVPN process") + display("OpenVPN process error: {}", msg) } - /// Unable to start or kill the OpenVPN process. - ChildProcessError { - description("Unable to start or kill the OpenVPN process") - } - /// Unable to start or manage the IPC server listening for events from OpenVPN - IpcServerError { - description("Unable to start or manage the IPC server") + /// Unable to start or manage the IPC server listening for events from OpenVPN. + EventDispatcherError { + description("Unable to start or manage the event dispatcher IPC server") } } } @@ -32,89 +31,127 @@ mod errors { pub use self::errors::*; -/// Possible events from OpenVPN -#[derive(Debug)] -pub enum OpenVpnEvent { - /// An event from the plugin loaded into OpenVPN. - PluginEvent(openvpn_ffi::OpenVpnPluginEvent, openvpn_ffi::OpenVpnEnv), - /// The OpenVPN process exited. Containing the result of waiting for the process. - Shutdown(io::Result<process::ExitStatus>), -} - -/// Struct for monitoring OpenVPN processes. +/// Struct for monitoring an OpenVPN process. pub struct OpenVpnMonitor { - on_event: Arc<Fn(OpenVpnEvent) + Send + Sync + 'static>, - plugin_path: PathBuf, - child: Arc<Mutex<Option<ChildMonitor>>>, - event_dispatcher: OpenVpnEventDispatcher, + child: Arc<duct::Handle>, + event_dispatcher: Option<OpenVpnEventDispatcher>, + closed: Arc<AtomicBool>, } impl OpenVpnMonitor { /// Creates a new `OpenVpnMonitor` with the given listener and using the plugin at the given /// path. - pub fn new<L, P>(on_event: L, plugin_path: P) -> Result<Self> - where L: Fn(OpenVpnEvent) + Send + Sync + 'static, + pub fn new<L, P>(mut cmd: OpenVpnCommand, on_event: L, plugin_path: P) -> Result<Self> + where L: Fn(OpenVpnPluginEvent, OpenVpnEnv) + Send + Sync + 'static, P: AsRef<Path> { - let on_event = Arc::new(on_event); - let event_dispatcher = Self::start_event_dispatcher(on_event.clone())?; + let event_dispatcher = OpenVpnEventDispatcher::start(on_event) + .chain_err(|| ErrorKind::EventDispatcherError)?; + + cmd.plugin(plugin_path, vec![event_dispatcher.address().to_owned()]); + let child = cmd.build() + .start() + .chain_err(|| ErrorKind::ChildProcessError("Failed to start"))?; + Ok( OpenVpnMonitor { - on_event, - plugin_path: plugin_path.as_ref().to_owned(), - child: Arc::new(Mutex::new(None)), - event_dispatcher, + child: Arc::new(child), + event_dispatcher: Some(event_dispatcher), + closed: Arc::new(AtomicBool::new(false)), }, ) } - fn start_event_dispatcher(on_event: Arc<Fn(OpenVpnEvent) + Send + Sync + 'static>) - -> Result<OpenVpnEventDispatcher> { - let on_plugin_event = move |event, env| (*on_event)(OpenVpnEvent::PluginEvent(event, env)); - OpenVpnEventDispatcher::start(on_plugin_event).chain_err(|| ErrorKind::IpcServerError) + /// Creates a handle to this monitor, allowing the tunnel to be closed while some other thread + /// is blocked in `wait`. + pub fn close_handle(&self) -> OpenVpnCloseHandle { + OpenVpnCloseHandle { + child: self.child.clone(), + closed: self.closed.clone(), + } } - /// Tries to start a new OpenVPN process if one is not already running. - /// If this `OpenVpnMonitor is already monitoring a running process it will return an - /// `InvalidState` error. - pub fn start(&self, cmd: OpenVpnCommand) -> Result<()> { - let mut child_lock = self.child.lock().unwrap(); - if child_lock.is_some() { - bail!(ErrorKind::InvalidState); + /// Consumes the monitor and blocks until OpenVPN exits or there is an error in either waiting + /// for the process or in the event dispatcher. + pub fn wait(mut self) -> Result<()> { + match self.wait_result() { + WaitResult::Child(Ok(exit_status)) => { + if exit_status.success() || self.closed.load(Ordering::SeqCst) { + debug!( + "OpenVPN exited, as expected, with exit status: {}", + exit_status + ); + Ok(()) + } else { + error!("OpenVPN died unepectedly with status: {}", exit_status); + Err(ErrorKind::ChildProcessError("Died unexpectedly").into()) + } + } + WaitResult::Child(Err(e)) => { + error!("OpenVPN process wait error: {}", e); + Err(e).chain_err(|| ErrorKind::ChildProcessError("Error when waiting")) + } + WaitResult::EventDispatcher(result) => { + error!("OpenVpnEventDispatcher exited unexpectedly: {:?}", result); + match result { + Ok(()) => Err(ErrorKind::EventDispatcherError.into()), + Err(e) => Err(e).chain_err(|| ErrorKind::EventDispatcherError), + } + } } - *child_lock = Some(self.start_child_monitor(cmd)?); - Ok(()) } - fn start_child_monitor(&self, mut cmd: OpenVpnCommand) -> Result<ChildMonitor> { - self.set_plugin(&mut cmd); + /// Waits for both the child process and the event dispatcher in parallel. After both have + /// returned this returns the earliest result. + fn wait_result(&mut self) -> WaitResult { + let child_wait_handle = self.child.clone(); + let child_kill_handle = self.child.clone(); + let event_dispatcher = self.event_dispatcher.take().unwrap(); + let dispatcher_handle = event_dispatcher.close_handle(); - let child = self.child.clone(); - let on_event = self.on_event.clone(); + let (child_tx, rx) = mpsc::channel(); + let dispatcher_tx = child_tx.clone(); - let on_exit = move |exit_status: io::Result<&process::Output>| { - *child.lock().unwrap() = None; - (*on_event)(OpenVpnEvent::Shutdown(exit_status.map(|output| output.status)),) - }; - ChildMonitor::start(&cmd.build(), on_exit).chain_err(|| ErrorKind::ChildProcessError) - } + thread::spawn( + move || { + let result = child_wait_handle.wait().map(|output| output.status); + child_tx.send(WaitResult::Child(result)).unwrap(); + dispatcher_handle.close(); + }, + ); + thread::spawn( + move || { + let result = event_dispatcher.wait(); + dispatcher_tx.send(WaitResult::EventDispatcher(result)).unwrap(); + let _ = child_kill_handle.kill(); + }, + ); - fn set_plugin(&self, cmd: &mut OpenVpnCommand) { - let event_dispatcher_address = self.event_dispatcher.address().to_string(); - cmd.plugin(&self.plugin_path, vec![event_dispatcher_address]); + let result = rx.recv().unwrap(); + let _ = rx.recv().unwrap(); + result } +} - /// Tries to kill the OpenVPN process if it is running. If it is already dead, this does - /// nothing. - pub fn kill(&self) -> Result<()> { - if let Some(ref child) = *self.child.lock().unwrap() { - child.kill().chain_err(|| ErrorKind::ChildProcessError)?; - } - Ok(()) - } +/// A handle to an `OpenVpnMonitor` for closing it. +pub struct OpenVpnCloseHandle { + child: Arc<duct::Handle>, + closed: Arc<AtomicBool>, } +impl OpenVpnCloseHandle { + /// Kills the underlying OpenVPN process, making the `OpenVpnMonitor::wait` method return. + pub fn close(&self) -> io::Result<()> { + self.closed.store(true, Ordering::SeqCst); + self.child.kill() + } +} +/// Internal enum to differentiate between if the child process or the event dispatcher died first. +enum WaitResult { + Child(io::Result<process::ExitStatus>), + EventDispatcher(talpid_ipc::Result<()>), +} /// IPC server for listening to events coming from plugin loaded into OpenVPN. @@ -125,8 +162,7 @@ pub struct OpenVpnEventDispatcher { impl OpenVpnEventDispatcher { /// Construct and start the IPC server with the given event listener callback. pub fn start<L>(on_event: L) -> talpid_ipc::Result<Self> - where L: Fn(openvpn_ffi::OpenVpnPluginEvent, openvpn_ffi::OpenVpnEnv), - L: Send + Sync + 'static + where L: Fn(OpenVpnPluginEvent, OpenVpnEnv) + Send + Sync + 'static { let rpc = OpenVpnEventApiImpl { on_event }; let mut io = IoHandler::new(); @@ -140,7 +176,14 @@ impl OpenVpnEventDispatcher { self.server.address() } - /// Consumes the server and waits for it to finish. + /// Creates a handle to this event dispatcher, allowing the listening server to be closed while + /// some other thread is blocked in `wait`. + pub fn close_handle(&self) -> talpid_ipc::CloseHandle { + self.server.close_handle() + } + + /// Consumes the server and waits for it to finish. Returns an error if the server exited + /// due to an error. pub fn wait(self) -> talpid_ipc::Result<()> { self.server.wait() } @@ -153,8 +196,8 @@ mod api { pub trait OpenVpnEventApi { #[rpc(name = "openvpn_event")] fn openvpn_event(&self, - openvpn_ffi::OpenVpnPluginEvent, - openvpn_ffi::OpenVpnEnv) + OpenVpnPluginEvent, + OpenVpnEnv) -> StdResult<(), Error>; } } @@ -162,18 +205,15 @@ mod api { use self::api::*; struct OpenVpnEventApiImpl<L> - where L: Fn(openvpn_ffi::OpenVpnPluginEvent, openvpn_ffi::OpenVpnEnv) + Send + Sync + 'static + where L: Fn(OpenVpnPluginEvent, OpenVpnEnv) + Send + Sync + 'static { on_event: L, } impl<L> OpenVpnEventApi for OpenVpnEventApiImpl<L> - where L: Fn(openvpn_ffi::OpenVpnPluginEvent, openvpn_ffi::OpenVpnEnv) + Send + Sync + 'static + where L: Fn(OpenVpnPluginEvent, OpenVpnEnv) + Send + Sync + 'static { - fn openvpn_event(&self, - event: openvpn_ffi::OpenVpnPluginEvent, - env: openvpn_ffi::OpenVpnEnv) - -> StdResult<(), Error> { + fn openvpn_event(&self, event: OpenVpnPluginEvent, env: OpenVpnEnv) -> StdResult<(), Error> { debug!("OpenVPN event {:?}", event); (self.on_event)(event, env); Ok(()) |
