summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorLinus Färnstrand <linus@mullvad.net>2017-03-07 08:41:29 +0100
committerLinus Färnstrand <linus@mullvad.net>2017-03-07 08:41:29 +0100
commitf085642d25c2f91095ce01d324a59b5cee23e8b0 (patch)
tree663b6c7e56f1e561ae90275746eb12d1387cbab3
parent6fbf16b682fe6c399e5c7f5d5f4a8f96a5071093 (diff)
parente878b14618f56b106f42df417582a7468f12c9b8 (diff)
downloadmullvadvpn-f085642d25c2f91095ce01d324a59b5cee23e8b0.tar.xz
mullvadvpn-f085642d25c2f91095ce01d324a59b5cee23e8b0.zip
Merge branch 'openvpn-monitor'
-rw-r--r--Cargo.lock1
-rw-r--r--src/lib.rs2
-rw-r--r--src/process/openvpn.rs86
-rw-r--r--talpid_cli/src/main.rs28
-rw-r--r--talpid_openvpn_plugin/Cargo.toml3
-rw-r--r--talpid_openvpn_plugin/src/ffi/structs.rs2
-rw-r--r--talpid_openvpn_plugin/src/lib.rs32
-rw-r--r--talpid_openvpn_plugin/src/processing.rs30
8 files changed, 154 insertions, 30 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 63d6b95fa8..ab64cc5d51 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -6,6 +6,7 @@ dependencies = [
"env_logger 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)",
"error-chain 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
+ "talpid_ipc 0.1.0",
]
[[package]]
diff --git a/src/lib.rs b/src/lib.rs
index f8e5e96a46..00c4061801 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -11,6 +11,8 @@ extern crate clonablechild;
#[macro_use]
extern crate error_chain;
+extern crate talpid_ipc;
+
/// Working with processes.
pub mod process;
diff --git a/src/process/openvpn.rs b/src/process/openvpn.rs
index 65f87064ff..2d25deb965 100644
--- a/src/process/openvpn.rs
+++ b/src/process/openvpn.rs
@@ -1,14 +1,30 @@
-use super::monitor::ChildSpawner;
+use super::monitor::{ChildSpawner, ChildMonitor};
use clonablechild::{ClonableChild, ChildExt};
use net::{RemoteAddr, ToRemoteAddrs};
+use std::collections::HashMap;
use std::ffi::{OsString, OsStr};
use std::fmt;
use std::io;
+use std::ops::DerefMut;
use std::path::{Path, PathBuf};
-use std::process::{Command, Child, Stdio};
+use std::process::{Command, Child, Stdio, ChildStdout, ChildStderr};
+use std::sync::{Arc, Mutex};
+
+use talpid_ipc;
+
+error_chain!{
+ errors {
+ /// Error while communicating with the OpenVPN plugin
+ PluginCommunicationError
+ }
+ links {
+ ChildMonitorError(::process::monitor::Error, ::process::monitor::ErrorKind)
+ #[doc="Something went wrong in the underlying ChildMonitor"];
+ }
+}
/// An OpenVPN process builder, providing control over the different arguments that the OpenVPN
/// binary accepts.
@@ -141,6 +157,72 @@ impl ChildSpawner for OpenVpnCommand {
}
+/// Possible events from OpenVPN
+pub enum OpenVpnEvent {
+ /// An event from the plugin loaded into OpenVPN.
+ PluginEvent(Result<HashMap<String, String>>),
+ /// The OpenVPN process exited. The bool indicates if the process exited cleanly.
+ Shutdown(bool),
+}
+
+/// A struct able to start and monitor OpenVPN processes.
+pub struct OpenVpnMonitor {
+ command: OpenVpnCommand,
+ monitor: ChildMonitor<OpenVpnCommand>,
+}
+
+impl OpenVpnMonitor {
+ /// Creates a new `OpenVpnMonitor` based on the given command
+ pub fn new(command: OpenVpnCommand) -> Self {
+ OpenVpnMonitor {
+ command: command.clone(),
+ monitor: ChildMonitor::new(command),
+ }
+ }
+
+ /// Starts OpenVPN and begins to monitor it.
+ pub fn start<L>(&mut self, listener: L) -> Result<(Option<ChildStdout>, Option<ChildStderr>)>
+ where L: FnMut(OpenVpnEvent) + Send + 'static
+ {
+ let shared_listener = Arc::new(Mutex::new(listener));
+ self.start_plugin_listener(shared_listener.clone())?;
+ self.start_child_monitor(shared_listener)
+ }
+
+ fn start_plugin_listener<L>(&mut self, shared_listener: Arc<Mutex<L>>) -> Result<()>
+ where L: FnMut(OpenVpnEvent) + Send + 'static
+ {
+ let server_id = talpid_ipc::start_new_server(move |msg| {
+ let chained_msg = msg.chain_err(|| ErrorKind::PluginCommunicationError);
+ let mut listener = shared_listener.lock().unwrap();
+ (listener.deref_mut())(OpenVpnEvent::PluginEvent(chained_msg));
+ }).chain_err(|| ErrorKind::PluginCommunicationError)?;
+ self.command.plugin("./target/debug/libtalpid_openvpn_plugin.so",
+ vec![server_id]);
+ Ok(())
+ }
+
+ fn start_child_monitor<L>(&mut self,
+ shared_listener: Arc<Mutex<L>>)
+ -> Result<(Option<ChildStdout>, Option<ChildStderr>)>
+ where L: FnMut(OpenVpnEvent) + Send + 'static
+ {
+ let callback = move |clean_exit| {
+ let mut listener = shared_listener.lock().unwrap();
+ (listener.deref_mut())(OpenVpnEvent::Shutdown(clean_exit));
+ };
+
+ self.monitor = ChildMonitor::new(self.command.clone());
+ Ok(self.monitor.start(callback)?)
+ }
+
+ /// Forwards a stop call to the underlying `ChildMonitor`.
+ pub fn stop(&self) -> Result<()> {
+ Ok(self.monitor.stop()?)
+ }
+}
+
+
#[cfg(test)]
mod tests {
use super::OpenVpnCommand;
diff --git a/talpid_cli/src/main.rs b/talpid_cli/src/main.rs
index 552e47bdc7..b6ce7654ff 100644
--- a/talpid_cli/src/main.rs
+++ b/talpid_cli/src/main.rs
@@ -11,8 +11,7 @@ use std::io::{self, Read, Write};
use std::sync::mpsc::{self, Receiver};
use std::thread;
-use talpid_core::process::monitor::{ChildMonitor, ChildSpawner};
-use talpid_core::process::openvpn::OpenVpnCommand;
+use talpid_core::process::openvpn::{OpenVpnCommand, OpenVpnEvent, OpenVpnMonitor};
mod cli;
@@ -21,7 +20,7 @@ use cli::Args;
error_chain! {
links {
- Monitor(talpid_core::process::monitor::Error, talpid_core::process::monitor::ErrorKind);
+ Monitor(talpid_core::process::openvpn::Error, talpid_core::process::openvpn::ErrorKind);
}
}
@@ -42,7 +41,7 @@ fn main() {
fn run() -> Result<()> {
let args = cli::parse_args_or_exit();
let command = create_openvpn_command(&args);
- let monitor = ChildMonitor::new(command);
+ let monitor = OpenVpnMonitor::new(command);
main_loop(monitor)
}
@@ -56,20 +55,25 @@ fn create_openvpn_command(args: &Args) -> OpenVpnCommand {
command
}
-fn main_loop<S>(mut monitor: ChildMonitor<S>) -> Result<()>
- where S: ChildSpawner
-{
+fn main_loop(mut monitor: OpenVpnMonitor) -> Result<()> {
loop {
let rx = start_monitor(&mut monitor).chain_err(|| "Unable to start OpenVPN")?;
- let clean_exit = rx.recv().unwrap();
- println!("Monitored process exited. clean: {}", clean_exit);
+ while let Ok(msg) = rx.recv() {
+ match msg {
+ OpenVpnEvent::Shutdown(clean) => {
+ println!("Monitored process exited. clean: {}", clean);
+ break;
+ }
+ OpenVpnEvent::PluginEvent(env) => {
+ println!("OpenVPN event with env:\n{:?}", env);
+ }
+ }
+ }
std::thread::sleep(std::time::Duration::from_millis(500));
}
}
-fn start_monitor<S>(monitor: &mut ChildMonitor<S>) -> Result<Receiver<bool>>
- where S: ChildSpawner
-{
+fn start_monitor(monitor: &mut OpenVpnMonitor) -> Result<Receiver<OpenVpnEvent>> {
let (tx, rx) = mpsc::channel();
let callback = move |clean| tx.send(clean).unwrap();
Ok(monitor.start(callback)
diff --git a/talpid_openvpn_plugin/Cargo.toml b/talpid_openvpn_plugin/Cargo.toml
index 79c0f387cb..ca870cb1cb 100644
--- a/talpid_openvpn_plugin/Cargo.toml
+++ b/talpid_openvpn_plugin/Cargo.toml
@@ -12,5 +12,8 @@ error-chain = "0.8"
log = "0.3"
env_logger = "0.4"
+[dependencies.talpid_ipc]
+path = "../talpid_ipc"
+
[dev-dependencies]
assert_matches = "1.0"
diff --git a/talpid_openvpn_plugin/src/ffi/structs.rs b/talpid_openvpn_plugin/src/ffi/structs.rs
index ba5bc2b343..b80f7ae2a2 100644
--- a/talpid_openvpn_plugin/src/ffi/structs.rs
+++ b/talpid_openvpn_plugin/src/ffi/structs.rs
@@ -4,7 +4,7 @@ use std::os::raw::{c_char, c_int, c_uint, c_void};
#[repr(C)]
pub struct openvpn_plugin_args_open_in {
type_mask: c_int,
- argv: *const *const c_char,
+ pub argv: *const *const c_char,
envp: *const *const c_char,
callbacks: *const c_void,
ssl_api: ovpnSSLAPI,
diff --git a/talpid_openvpn_plugin/src/lib.rs b/talpid_openvpn_plugin/src/lib.rs
index 4eb2a0f336..58343de791 100644
--- a/talpid_openvpn_plugin/src/lib.rs
+++ b/talpid_openvpn_plugin/src/lib.rs
@@ -9,6 +9,8 @@ extern crate env_logger;
#[macro_use]
extern crate assert_matches;
+extern crate talpid_ipc;
+
use std::os::raw::{c_int, c_void};
mod ffi;
@@ -29,6 +31,12 @@ error_chain!{
ParseEnvFailed {
description("Unable to parse environment variables from OpenVPN")
}
+ ParseArgsFailed {
+ description("Unable to parse arguments from OpenVPN")
+ }
+ EventProcessingFailed {
+ description("Failed to process the event")
+ }
}
}
@@ -45,13 +53,13 @@ pub static INTERESTING_EVENTS: &'static [OpenVpnPluginEvent] = &[OpenVpnPluginEv
/// plugin.
#[no_mangle]
pub extern "C" fn openvpn_plugin_open_v3(_version: c_int,
- _args: *const ffi::openvpn_plugin_args_open_in,
+ args: *const ffi::openvpn_plugin_args_open_in,
retptr: *mut ffi::openvpn_plugin_args_open_return)
-> c_int {
if init_logger().is_err() {
return ffi::OPENVPN_PLUGIN_FUNC_ERROR;
}
- match openvpn_plugin_open_v3_internal(retptr) {
+ match openvpn_plugin_open_v3_internal(args, retptr) {
Ok(_) => ffi::OPENVPN_PLUGIN_FUNC_SUCCESS,
Err(e) => {
log_error("Unable to initialize plugin", &e);
@@ -60,19 +68,31 @@ pub extern "C" fn openvpn_plugin_open_v3(_version: c_int,
}
}
-fn openvpn_plugin_open_v3_internal(retptr: *mut ffi::openvpn_plugin_args_open_return)
+fn openvpn_plugin_open_v3_internal(args: *const ffi::openvpn_plugin_args_open_in,
+ retptr: *mut ffi::openvpn_plugin_args_open_return)
-> Result<()> {
debug!("Initializing plugin");
- let handle = Box::new(EventProcessor::new().chain_err(|| ErrorKind::InitHandleFailed)?);
+ let core_server_id = parse_args(args)?;
+ let processor = EventProcessor::new(core_server_id).chain_err(|| ErrorKind::InitHandleFailed)?;
unsafe {
(*retptr).type_mask = ffi::events_to_bitmask(INTERESTING_EVENTS);
// Converting the handle into a raw pointer will make it escape Rust deallocation. See
// `openvpn_plugin_close_v1` for deallocation.
- (*retptr).handle = Box::into_raw(handle) as *const c_void;
+ (*retptr).handle = Box::into_raw(Box::new(processor)) as *const c_void;
}
Ok(())
}
+fn parse_args(args: *const ffi::openvpn_plugin_args_open_in) -> Result<talpid_ipc::IpcServerId> {
+ let mut args_iter = unsafe { ffi::parse::string_array((*args).argv) }
+ .chain_err(|| ErrorKind::ParseArgsFailed)?
+ .into_iter();
+ let _plugin_path = args_iter.next();
+ let core_server_id: String = args_iter.next()
+ .ok_or(ErrorKind::Msg("No core server id given as first argument".to_owned()))?;
+ Ok(core_server_id)
+}
+
/// Called by OpenVPN just before the plugin is unloaded. Should correctly close the plugin and
/// deallocate any `handle` initialized by the plugin in `openvpn_plugin_open_v3`
@@ -108,7 +128,7 @@ fn openvpn_plugin_func_v3_internal(args: *const ffi::openvpn_plugin_args_func_in
let env = unsafe { ffi::parse::env((*args).envp) }.chain_err(|| ErrorKind::ParseEnvFailed)?;
let mut handle = unsafe { Box::from_raw((*args).handle as *mut EventProcessor) };
- handle.process_event(event, env);
+ handle.process_event(event, env).chain_err(|| ErrorKind::EventProcessingFailed)?;
// Convert the handle back to a raw pointer to not deallocate it when we return.
Box::into_raw(handle);
diff --git a/talpid_openvpn_plugin/src/processing.rs b/talpid_openvpn_plugin/src/processing.rs
index 337030744e..586d0574ec 100644
--- a/talpid_openvpn_plugin/src/processing.rs
+++ b/talpid_openvpn_plugin/src/processing.rs
@@ -1,24 +1,36 @@
-
-
use ffi::OpenVpnPluginEvent;
+
use std::collections::HashMap;
+use talpid_ipc::{IpcClient, IpcServerId};
-error_chain!{}
+error_chain! {
+ errors {
+ IpcSendingError {
+ description("Failed while sending an event over the IPC channel")
+ }
+ }
+}
/// Struct processing OpenVPN events and notifies listeners over IPC
-pub struct EventProcessor;
+pub struct EventProcessor {
+ ipc_client: IpcClient<HashMap<String, String>>,
+}
impl EventProcessor {
- pub fn new() -> Result<EventProcessor> {
+ pub fn new(server_id: IpcServerId) -> Result<EventProcessor> {
debug!("Creating EventProcessor");
- Ok(EventProcessor)
+ let ipc_client = IpcClient::new(server_id);
+ Ok(EventProcessor { ipc_client: ipc_client })
}
- pub fn process_event(&mut self, event: OpenVpnPluginEvent, _env: HashMap<String, String>) {
- // TODO(linus): This is where we should send events to core.
- trace!("Hello from EventProcessor: {:?}", event);
+ pub fn process_event(&mut self,
+ event: OpenVpnPluginEvent,
+ env: HashMap<String, String>)
+ -> Result<()> {
+ trace!("Processing \"{:?}\" event", event);
+ self.ipc_client.send(&env).chain_err(|| ErrorKind::IpcSendingError)
}
}