summaryrefslogtreecommitdiffhomepage
path: root/talpid_core
diff options
context:
space:
mode:
authorLinus Färnstrand <linus@mullvad.net>2017-06-08 13:39:53 +0200
committerLinus Färnstrand <linus@mullvad.net>2017-06-08 13:42:27 +0200
commit630d103f0d41edf9178a05729d8b28531de23a4d (patch)
tree950b327e56376da9c575f3ffcf8e557993691a0a /talpid_core
parent85b477d93760fed42dd2fcfda9cdf3384fe7925d (diff)
downloadmullvadvpn-630d103f0d41edf9178a05729d8b28531de23a4d.tar.xz
mullvadvpn-630d103f0d41edf9178a05729d8b28531de23a4d.zip
Adapt OpenVpnMonitor to the CloseHandle design
Diffstat (limited to 'talpid_core')
-rw-r--r--talpid_core/Cargo.toml4
-rw-r--r--talpid_core/src/tunnel/openvpn.rs202
2 files changed, 123 insertions, 83 deletions
diff --git a/talpid_core/Cargo.toml b/talpid_core/Cargo.toml
index 18c998cf93..6fd7222f27 100644
--- a/talpid_core/Cargo.toml
+++ b/talpid_core/Cargo.toml
@@ -8,8 +8,8 @@ description = "Core backend functionality of the Mullvad VPN client"
duct = "0.8"
error-chain = "0.10"
log = "0.3"
-jsonrpc-core = { git = "https://github.com/faern/jsonrpc", branch = "bind-zero" }
-jsonrpc-macros = { git = "https://github.com/faern/jsonrpc", branch = "bind-zero" }
+jsonrpc-core = { git = "https://github.com/faern/jsonrpc", branch = "ws-close-handle" }
+jsonrpc-macros = { git = "https://github.com/faern/jsonrpc", branch = "ws-close-handle" }
[dependencies.talpid_ipc]
path = "../talpid_ipc"
diff --git a/talpid_core/src/tunnel/openvpn.rs b/talpid_core/src/tunnel/openvpn.rs
index 9d366ebb4a..b805d3563a 100644
--- a/talpid_core/src/tunnel/openvpn.rs
+++ b/talpid_core/src/tunnel/openvpn.rs
@@ -1,30 +1,29 @@
+use duct;
use jsonrpc_core::{Error, IoHandler};
-use openvpn_ffi;
-use process::monitor::ChildMonitor;
+use openvpn_ffi::{OpenVpnEnv, OpenVpnPluginEvent};
use process::openvpn::OpenVpnCommand;
use std::io;
-use std::path::{Path, PathBuf};
+use std::path::Path;
use std::process;
use std::result::Result as StdResult;
-use std::sync::{Arc, Mutex};
+use std::sync::{Arc, mpsc};
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::thread;
use talpid_ipc;
mod errors {
error_chain!{
errors {
- /// The `OpenVpnMonitor` is in an invalid state for the requested operation.
- InvalidState {
- description("Invalid state. OpenVPN is already running")
+ /// Unable to start, wait for or kill the OpenVPN process.
+ ChildProcessError(msg: &'static str) {
+ description("Unable to start, wait for or kill the OpenVPN process")
+ display("OpenVPN process error: {}", msg)
}
- /// Unable to start or kill the OpenVPN process.
- ChildProcessError {
- description("Unable to start or kill the OpenVPN process")
- }
- /// Unable to start or manage the IPC server listening for events from OpenVPN
- IpcServerError {
- description("Unable to start or manage the IPC server")
+ /// Unable to start or manage the IPC server listening for events from OpenVPN.
+ EventDispatcherError {
+ description("Unable to start or manage the event dispatcher IPC server")
}
}
}
@@ -32,89 +31,127 @@ mod errors {
pub use self::errors::*;
-/// Possible events from OpenVPN
-#[derive(Debug)]
-pub enum OpenVpnEvent {
- /// An event from the plugin loaded into OpenVPN.
- PluginEvent(openvpn_ffi::OpenVpnPluginEvent, openvpn_ffi::OpenVpnEnv),
- /// The OpenVPN process exited. Containing the result of waiting for the process.
- Shutdown(io::Result<process::ExitStatus>),
-}
-
-/// Struct for monitoring OpenVPN processes.
+/// Struct for monitoring an OpenVPN process.
pub struct OpenVpnMonitor {
- on_event: Arc<Fn(OpenVpnEvent) + Send + Sync + 'static>,
- plugin_path: PathBuf,
- child: Arc<Mutex<Option<ChildMonitor>>>,
- event_dispatcher: OpenVpnEventDispatcher,
+ child: Arc<duct::Handle>,
+ event_dispatcher: Option<OpenVpnEventDispatcher>,
+ closed: Arc<AtomicBool>,
}
impl OpenVpnMonitor {
/// Creates a new `OpenVpnMonitor` with the given listener and using the plugin at the given
/// path.
- pub fn new<L, P>(on_event: L, plugin_path: P) -> Result<Self>
- where L: Fn(OpenVpnEvent) + Send + Sync + 'static,
+ pub fn new<L, P>(mut cmd: OpenVpnCommand, on_event: L, plugin_path: P) -> Result<Self>
+ where L: Fn(OpenVpnPluginEvent, OpenVpnEnv) + Send + Sync + 'static,
P: AsRef<Path>
{
- let on_event = Arc::new(on_event);
- let event_dispatcher = Self::start_event_dispatcher(on_event.clone())?;
+ let event_dispatcher = OpenVpnEventDispatcher::start(on_event)
+ .chain_err(|| ErrorKind::EventDispatcherError)?;
+
+ cmd.plugin(plugin_path, vec![event_dispatcher.address().to_owned()]);
+ let child = cmd.build()
+ .start()
+ .chain_err(|| ErrorKind::ChildProcessError("Failed to start"))?;
+
Ok(
OpenVpnMonitor {
- on_event,
- plugin_path: plugin_path.as_ref().to_owned(),
- child: Arc::new(Mutex::new(None)),
- event_dispatcher,
+ child: Arc::new(child),
+ event_dispatcher: Some(event_dispatcher),
+ closed: Arc::new(AtomicBool::new(false)),
},
)
}
- fn start_event_dispatcher(on_event: Arc<Fn(OpenVpnEvent) + Send + Sync + 'static>)
- -> Result<OpenVpnEventDispatcher> {
- let on_plugin_event = move |event, env| (*on_event)(OpenVpnEvent::PluginEvent(event, env));
- OpenVpnEventDispatcher::start(on_plugin_event).chain_err(|| ErrorKind::IpcServerError)
+ /// Creates a handle to this monitor, allowing the tunnel to be closed while some other thread
+ /// is blocked in `wait`.
+ pub fn close_handle(&self) -> OpenVpnCloseHandle {
+ OpenVpnCloseHandle {
+ child: self.child.clone(),
+ closed: self.closed.clone(),
+ }
}
- /// Tries to start a new OpenVPN process if one is not already running.
- /// If this `OpenVpnMonitor is already monitoring a running process it will return an
- /// `InvalidState` error.
- pub fn start(&self, cmd: OpenVpnCommand) -> Result<()> {
- let mut child_lock = self.child.lock().unwrap();
- if child_lock.is_some() {
- bail!(ErrorKind::InvalidState);
+ /// Consumes the monitor and blocks until OpenVPN exits or there is an error in either waiting
+ /// for the process or in the event dispatcher.
+ pub fn wait(mut self) -> Result<()> {
+ match self.wait_result() {
+ WaitResult::Child(Ok(exit_status)) => {
+ if exit_status.success() || self.closed.load(Ordering::SeqCst) {
+ debug!(
+ "OpenVPN exited, as expected, with exit status: {}",
+ exit_status
+ );
+ Ok(())
+ } else {
+ error!("OpenVPN died unepectedly with status: {}", exit_status);
+ Err(ErrorKind::ChildProcessError("Died unexpectedly").into())
+ }
+ }
+ WaitResult::Child(Err(e)) => {
+ error!("OpenVPN process wait error: {}", e);
+ Err(e).chain_err(|| ErrorKind::ChildProcessError("Error when waiting"))
+ }
+ WaitResult::EventDispatcher(result) => {
+ error!("OpenVpnEventDispatcher exited unexpectedly: {:?}", result);
+ match result {
+ Ok(()) => Err(ErrorKind::EventDispatcherError.into()),
+ Err(e) => Err(e).chain_err(|| ErrorKind::EventDispatcherError),
+ }
+ }
}
- *child_lock = Some(self.start_child_monitor(cmd)?);
- Ok(())
}
- fn start_child_monitor(&self, mut cmd: OpenVpnCommand) -> Result<ChildMonitor> {
- self.set_plugin(&mut cmd);
+ /// Waits for both the child process and the event dispatcher in parallel. After both have
+ /// returned this returns the earliest result.
+ fn wait_result(&mut self) -> WaitResult {
+ let child_wait_handle = self.child.clone();
+ let child_kill_handle = self.child.clone();
+ let event_dispatcher = self.event_dispatcher.take().unwrap();
+ let dispatcher_handle = event_dispatcher.close_handle();
- let child = self.child.clone();
- let on_event = self.on_event.clone();
+ let (child_tx, rx) = mpsc::channel();
+ let dispatcher_tx = child_tx.clone();
- let on_exit = move |exit_status: io::Result<&process::Output>| {
- *child.lock().unwrap() = None;
- (*on_event)(OpenVpnEvent::Shutdown(exit_status.map(|output| output.status)),)
- };
- ChildMonitor::start(&cmd.build(), on_exit).chain_err(|| ErrorKind::ChildProcessError)
- }
+ thread::spawn(
+ move || {
+ let result = child_wait_handle.wait().map(|output| output.status);
+ child_tx.send(WaitResult::Child(result)).unwrap();
+ dispatcher_handle.close();
+ },
+ );
+ thread::spawn(
+ move || {
+ let result = event_dispatcher.wait();
+ dispatcher_tx.send(WaitResult::EventDispatcher(result)).unwrap();
+ let _ = child_kill_handle.kill();
+ },
+ );
- fn set_plugin(&self, cmd: &mut OpenVpnCommand) {
- let event_dispatcher_address = self.event_dispatcher.address().to_string();
- cmd.plugin(&self.plugin_path, vec![event_dispatcher_address]);
+ let result = rx.recv().unwrap();
+ let _ = rx.recv().unwrap();
+ result
}
+}
- /// Tries to kill the OpenVPN process if it is running. If it is already dead, this does
- /// nothing.
- pub fn kill(&self) -> Result<()> {
- if let Some(ref child) = *self.child.lock().unwrap() {
- child.kill().chain_err(|| ErrorKind::ChildProcessError)?;
- }
- Ok(())
- }
+/// A handle to an `OpenVpnMonitor` for closing it.
+pub struct OpenVpnCloseHandle {
+ child: Arc<duct::Handle>,
+ closed: Arc<AtomicBool>,
}
+impl OpenVpnCloseHandle {
+ /// Kills the underlying OpenVPN process, making the `OpenVpnMonitor::wait` method return.
+ pub fn close(&self) -> io::Result<()> {
+ self.closed.store(true, Ordering::SeqCst);
+ self.child.kill()
+ }
+}
+/// Internal enum to differentiate between if the child process or the event dispatcher died first.
+enum WaitResult {
+ Child(io::Result<process::ExitStatus>),
+ EventDispatcher(talpid_ipc::Result<()>),
+}
/// IPC server for listening to events coming from plugin loaded into OpenVPN.
@@ -125,8 +162,7 @@ pub struct OpenVpnEventDispatcher {
impl OpenVpnEventDispatcher {
/// Construct and start the IPC server with the given event listener callback.
pub fn start<L>(on_event: L) -> talpid_ipc::Result<Self>
- where L: Fn(openvpn_ffi::OpenVpnPluginEvent, openvpn_ffi::OpenVpnEnv),
- L: Send + Sync + 'static
+ where L: Fn(OpenVpnPluginEvent, OpenVpnEnv) + Send + Sync + 'static
{
let rpc = OpenVpnEventApiImpl { on_event };
let mut io = IoHandler::new();
@@ -140,7 +176,14 @@ impl OpenVpnEventDispatcher {
self.server.address()
}
- /// Consumes the server and waits for it to finish.
+ /// Creates a handle to this event dispatcher, allowing the listening server to be closed while
+ /// some other thread is blocked in `wait`.
+ pub fn close_handle(&self) -> talpid_ipc::CloseHandle {
+ self.server.close_handle()
+ }
+
+ /// Consumes the server and waits for it to finish. Returns an error if the server exited
+ /// due to an error.
pub fn wait(self) -> talpid_ipc::Result<()> {
self.server.wait()
}
@@ -153,8 +196,8 @@ mod api {
pub trait OpenVpnEventApi {
#[rpc(name = "openvpn_event")]
fn openvpn_event(&self,
- openvpn_ffi::OpenVpnPluginEvent,
- openvpn_ffi::OpenVpnEnv)
+ OpenVpnPluginEvent,
+ OpenVpnEnv)
-> StdResult<(), Error>;
}
}
@@ -162,18 +205,15 @@ mod api {
use self::api::*;
struct OpenVpnEventApiImpl<L>
- where L: Fn(openvpn_ffi::OpenVpnPluginEvent, openvpn_ffi::OpenVpnEnv) + Send + Sync + 'static
+ where L: Fn(OpenVpnPluginEvent, OpenVpnEnv) + Send + Sync + 'static
{
on_event: L,
}
impl<L> OpenVpnEventApi for OpenVpnEventApiImpl<L>
- where L: Fn(openvpn_ffi::OpenVpnPluginEvent, openvpn_ffi::OpenVpnEnv) + Send + Sync + 'static
+ where L: Fn(OpenVpnPluginEvent, OpenVpnEnv) + Send + Sync + 'static
{
- fn openvpn_event(&self,
- event: openvpn_ffi::OpenVpnPluginEvent,
- env: openvpn_ffi::OpenVpnEnv)
- -> StdResult<(), Error> {
+ fn openvpn_event(&self, event: OpenVpnPluginEvent, env: OpenVpnEnv) -> StdResult<(), Error> {
debug!("OpenVPN event {:?}", event);
(self.on_event)(event, env);
Ok(())