summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorLinus Färnstrand <linus@mullvad.net>2018-08-07 15:21:08 +0200
committerEmīls Piņķis <emils@mullvad.net>2018-08-29 16:27:51 +0100
commit3b62a06e9bfec75043888fa58c4ccba17689be4c (patch)
treec677ab18ab5abea0cb98aa805e78d8cbb746d498
parent559a81c6044d620159d8229295bee3b2e909eca4 (diff)
downloadmullvadvpn-3b62a06e9bfec75043888fa58c4ccba17689be4c.tar.xz
mullvadvpn-3b62a06e9bfec75043888fa58c4ccba17689be4c.zip
Use IPC instead of a websocket to communicate between daemon and plugin
-rw-r--r--mullvad-daemon/Cargo.toml9
-rw-r--r--mullvad-daemon/src/main.rs25
-rw-r--r--mullvad-daemon/src/management_interface.rs41
-rw-r--r--mullvad-daemon/src/relays.rs9
-rw-r--r--mullvad-daemon/src/rpc_uniqueness_check.rs4
-rw-r--r--talpid-core/Cargo.toml4
-rw-r--r--talpid-core/src/tunnel/openvpn.rs102
-rw-r--r--talpid-ipc/Cargo.toml13
-rw-r--r--talpid-ipc/src/client.rs585
-rw-r--r--talpid-ipc/src/lib.rs47
-rw-r--r--talpid-ipc/tests/ipc-client-server.rs54
-rw-r--r--talpid-openvpn-plugin/Cargo.toml5
-rw-r--r--talpid-openvpn-plugin/src/lib.rs55
-rw-r--r--talpid-openvpn-plugin/src/processing.rs71
14 files changed, 242 insertions, 782 deletions
diff --git a/mullvad-daemon/Cargo.toml b/mullvad-daemon/Cargo.toml
index bd6add0daf..9f491e2dd4 100644
--- a/mullvad-daemon/Cargo.toml
+++ b/mullvad-daemon/Cargo.toml
@@ -15,10 +15,11 @@ serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
log = "0.4"
log-panics = "2.0.0"
-jsonrpc-core = { git = "https://github.com/paritytech/jsonrpc", tag = "v8.0.1" }
-jsonrpc-macros = { git = "https://github.com/paritytech/jsonrpc", tag = "v8.0.1" }
-jsonrpc-pubsub = { git = "https://github.com/paritytech/jsonrpc", tag = "v8.0.1" }
-jsonrpc-ws-server = { git = "https://github.com/paritytech/jsonrpc", tag = "v8.0.1" }
+jsonrpc-core = { git = "https://github.com/paritytech/jsonrpc", branch = "master" }
+jsonrpc-macros = { git = "https://github.com/paritytech/jsonrpc", branch = "master" }
+jsonrpc-pubsub = { git = "https://github.com/paritytech/jsonrpc", branch = "master" }
+jsonrpc-ws-server = { git = "https://github.com/paritytech/jsonrpc", branch = "master" }
+jsonrpc-ipc-server = { git = "https://github.com/paritytech/jsonrpc", branch = "master" }
uuid = { version = "0.6", features = ["v4"] }
lazy_static = "1.0"
rand = "0.5"
diff --git a/mullvad-daemon/src/main.rs b/mullvad-daemon/src/main.rs
index 848207fbb1..a859baec4c 100644
--- a/mullvad-daemon/src/main.rs
+++ b/mullvad-daemon/src/main.rs
@@ -25,6 +25,7 @@ extern crate serde_json;
extern crate jsonrpc_core;
#[macro_use]
extern crate jsonrpc_macros;
+extern crate jsonrpc_ipc_server;
extern crate jsonrpc_pubsub;
extern crate jsonrpc_ws_server;
extern crate rand;
@@ -125,8 +126,9 @@ pub enum DaemonEvent {
TunnelStateTransition(TunnelStateTransition),
/// An event coming from the JSONRPC-2.0 management interface.
ManagementInterfaceEvent(ManagementCommand),
+ // TODO(emilsp): try and get an error from the management interface
/// Triggered if the server hosting the JSONRPC-2.0 management interface dies unexpectedly.
- ManagementInterfaceExited(talpid_ipc::Result<()>),
+ ManagementInterfaceExited,
/// Daemon shutdown triggered by a signal, ctrl-c or similar.
TriggerShutdown,
}
@@ -308,7 +310,7 @@ impl Daemon {
thread::spawn(move || {
let result = server.wait();
error!("Mullvad management interface shut down");
- let _ = exit_tx.send(DaemonEvent::ManagementInterfaceExited(result));
+ let _ = exit_tx.send(DaemonEvent::ManagementInterfaceExited);
});
}
@@ -333,7 +335,7 @@ impl Daemon {
match event {
TunnelStateTransition(transition) => self.handle_tunnel_state_transition(transition),
ManagementInterfaceEvent(event) => self.handle_management_interface_event(event),
- ManagementInterfaceExited(result) => self.handle_management_interface_exited(result),
+ ManagementInterfaceExited => self.handle_management_interface_exited(),
TriggerShutdown => self.handle_trigger_shutdown_event(),
}
}
@@ -599,12 +601,17 @@ impl Daemon {
}
}
- fn handle_management_interface_exited(&self, result: talpid_ipc::Result<()>) -> Result<()> {
- let error = ErrorKind::ManagementInterfaceError("Server exited unexpectedly");
- match result {
- Ok(()) => Err(error.into()),
- Err(e) => Err(e).chain_err(|| error),
- }
+ // TODO: (emilsp) fix this
+ // fn handle_management_interface_exited(&self, result: talpid_ipc_ws::Result<()>) ->
+ // Result<()> { let error = ErrorKind::ManagementInterfaceError("Server exited
+ // unexpectedly"); match result {
+ // Ok(()) => Err(error.into()),
+ // Err(e) => Err(e).chain_err(|| error),
+ // }
+ // }
+
+ fn handle_management_interface_exited(&self) -> Result<()> {
+ Err(ErrorKind::ManagementInterfaceError("Server exited unexpectedly").into())
}
fn handle_trigger_shutdown_event(&mut self) -> Result<()> {
diff --git a/mullvad-daemon/src/management_interface.rs b/mullvad-daemon/src/management_interface.rs
index 3f2d920180..bb2e540cc1 100644
--- a/mullvad-daemon/src/management_interface.rs
+++ b/mullvad-daemon/src/management_interface.rs
@@ -4,13 +4,14 @@ use error_chain::ChainedError;
use jsonrpc_core::futures::sync::oneshot::Sender as OneshotSender;
use jsonrpc_core::futures::{future, sync, Future};
use jsonrpc_core::{Error, ErrorCode, MetaIoHandler, Metadata};
+use jsonrpc_ipc_server;
use jsonrpc_macros::pubsub;
use jsonrpc_pubsub::{PubSubHandler, PubSubMetadata, Session, SubscriptionId};
-use jsonrpc_ws_server;
use mullvad_rpc;
use mullvad_types::account::{AccountData, AccountToken};
use mullvad_types::location::GeoIpLocation;
+use mullvad_paths;
use mullvad_types::relay_constraints::{RelaySettings, RelaySettingsUpdate};
use mullvad_types::relay_list::RelayList;
use mullvad_types::states::{DaemonState, TargetState};
@@ -237,7 +238,12 @@ impl ManagementInterfaceServer {
let mut io = PubSubHandler::default();
io.extend_with(rpc.to_delegate());
let meta_io: MetaIoHandler<Meta> = io.into();
- let server = talpid_ipc::IpcServer::start_with_metadata(meta_io, meta_extractor)?;
+ let path = mullvad_paths::get_rpc_socket_path();
+ let server = talpid_ipc::IpcServer::start_with_metadata(
+ meta_io,
+ meta_extractor,
+ path.to_string_lossy().to_string(),
+ )?;
Ok(ManagementInterfaceServer {
server,
subscriptions,
@@ -245,7 +251,7 @@ impl ManagementInterfaceServer {
}
pub fn address(&self) -> &str {
- self.server.address()
+ self.server.path()
}
pub fn event_broadcaster(&self) -> EventBroadcaster {
@@ -256,11 +262,19 @@ impl ManagementInterfaceServer {
/// Consumes the server and waits for it to finish. Returns an error if the server exited
/// due to an error.
- pub fn wait(self) -> talpid_ipc::Result<()> {
+ pub fn wait(self) {
self.server.wait()
}
}
+fn ipc_path() -> String {
+ if cfg!(windows) {
+ "//./pipe/mullvad_daemon_socket".to_owned()
+ } else {
+ "/tmp/mullvad_daemon_socket".to_owned()
+ }
+}
+
/// A handle that allows broadcasting messages to all subscribers of the management interface.
pub struct EventBroadcaster {
@@ -380,13 +394,14 @@ impl<T: From<ManagementCommand> + 'static + Send> ManagementInterface<T> {
}
fn check_auth(&self, meta: &Meta) -> Result<(), Error> {
- if meta.authenticated.load(Ordering::SeqCst) {
- trace!("auth success");
- Ok(())
- } else {
- trace!("auth failed");
- Err(Error::invalid_request())
- }
+ Ok(())
+ // if meta.authenticated.load(Ordering::SeqCst) {
+ // trace!("auth success");
+ // Ok(())
+ // } else {
+ // trace!("auth failed");
+ // Err(Error::invalid_request())
+ // }
}
fn load_history(&self) -> Result<AccountHistory, AccountHistoryError> {
@@ -751,9 +766,9 @@ impl PubSubMetadata for Meta {
}
/// Metadata extractor function for `Meta`.
-fn meta_extractor(context: &jsonrpc_ws_server::RequestContext) -> Meta {
+fn meta_extractor(context: &jsonrpc_ipc_server::RequestContext) -> Meta {
Meta {
- session: Some(Arc::new(Session::new(context.sender()))),
+ session: Some(Arc::new(Session::new(context.sender.clone()))),
authenticated: Arc::new(AtomicBool::new(false)),
}
}
diff --git a/mullvad-daemon/src/relays.rs b/mullvad-daemon/src/relays.rs
index 4b0b07217e..28edaff9cc 100644
--- a/mullvad-daemon/src/relays.rs
+++ b/mullvad-daemon/src/relays.rs
@@ -24,15 +24,18 @@ use rand::{self, Rng, ThreadRng};
use tokio_timer::{TimeoutError, Timer};
const RELAYS_FILENAME: &str = "relays.json";
-const DOWNLOAD_TIMEOUT: Duration = Duration::from_secs(15);
-const UPDATE_INTERVAL: Duration = Duration::from_secs(60 * 60);
+const DOWNLOAD_TIMEOUT: Duration = Duration::from_secs(30);
+const UPDATE_INTERVAL: Duration = Duration::from_secs(15);
const MAX_CACHE_AGE: Duration = Duration::from_secs(60 * 60 * 24);
error_chain! {
errors {
RelayCacheError { description("Error with relay cache on disk") }
DownloadError { description("Error when trying to download the list of relays") }
- DownloadTimeoutError { description("Timed out when trying to download the list of relays") }
+ DownloadTimeoutError(error: tokio_timer::Error) {
+ display("Timer error - {}", error)
+ description("Timed out when trying to download the list of relays")
+ }
NoRelay { description("No relays matching current constraints") }
SerializationError { description("Error in serialization of relaylist") }
}
diff --git a/mullvad-daemon/src/rpc_uniqueness_check.rs b/mullvad-daemon/src/rpc_uniqueness_check.rs
index 76130f72e8..2766aa9199 100644
--- a/mullvad-daemon/src/rpc_uniqueness_check.rs
+++ b/mullvad-daemon/src/rpc_uniqueness_check.rs
@@ -1,7 +1,7 @@
use error_chain::ChainedError;
use log::Level;
-use mullvad_ipc_client::DaemonRpcClient;
+use mullvad_ipc_client::new_standalone_ipc_client;
/// Checks if there is another instance of the daemon running.
@@ -9,7 +9,7 @@ use mullvad_ipc_client::DaemonRpcClient;
/// Tries to connect to another daemon and perform a simple RPC call. If it fails, assumes the
/// other daemon has stopped.
pub fn is_another_instance_running() -> bool {
- match DaemonRpcClient::new() {
+ match new_standalone_ipc_client() {
Ok(_) => true,
Err(error) => {
let msg =
diff --git a/talpid-core/Cargo.toml b/talpid-core/Cargo.toml
index 52297f214d..23df76dfd8 100644
--- a/talpid-core/Cargo.toml
+++ b/talpid-core/Cargo.toml
@@ -10,8 +10,8 @@ atty = "0.2"
duct = "0.11"
error-chain = "0.12"
futures = "0.1"
-jsonrpc-core = { git = "https://github.com/paritytech/jsonrpc", tag = "v8.0.1" }
-jsonrpc-macros = { git = "https://github.com/paritytech/jsonrpc", tag = "v8.0.1" }
+jsonrpc-core = { git = "https://github.com/paritytech/jsonrpc", branch = "master" }
+jsonrpc-macros = { git = "https://github.com/paritytech/jsonrpc", branch = "master" }
libc = "0.2.20"
log = "0.4"
openvpn-plugin = { version = "0.3", features = ["serde"] }
diff --git a/talpid-core/src/tunnel/openvpn.rs b/talpid-core/src/tunnel/openvpn.rs
index c63429a4f5..8e89ed4a5c 100644
--- a/talpid-core/src/tunnel/openvpn.rs
+++ b/talpid-core/src/tunnel/openvpn.rs
@@ -12,7 +12,6 @@ use std::thread;
use std::time::Duration;
use talpid_ipc;
-use uuid;
mod errors {
error_chain!{
@@ -60,14 +59,10 @@ impl<C: OpenVpnBuilder> OpenVpnMonitor<C> {
L: Fn(OpenVpnPluginEvent, HashMap<String, String>) + Send + Sync + 'static,
P: AsRef<Path>,
{
- let credentials = uuid::Uuid::new_v4().to_string();
- let event_dispatcher = event_server::start(credentials.clone(), on_event)
- .chain_err(|| ErrorKind::EventDispatcherError)?;
+ let event_dispatcher =
+ event_server::start(on_event).chain_err(|| ErrorKind::EventDispatcherError)?;
- cmd.plugin(
- plugin_path,
- vec![event_dispatcher.address().to_owned(), credentials],
- );
+ cmd.plugin(plugin_path, vec![event_dispatcher.path().to_owned()]);
let child = cmd
.start()
.chain_err(|| ErrorKind::ChildProcessError("Failed to start"))?;
@@ -108,12 +103,9 @@ impl<C: OpenVpnBuilder> OpenVpnMonitor<C> {
error!("OpenVPN process wait error: {}", e);
Err(e).chain_err(|| ErrorKind::ChildProcessError("Error when waiting"))
}
- WaitResult::EventDispatcher(result) => {
- error!("OpenVPN Event server exited unexpectedly: {:?}", result);
- match result {
- Ok(()) => Err(ErrorKind::EventDispatcherError.into()),
- Err(e) => Err(e).chain_err(|| ErrorKind::EventDispatcherError),
- }
+ WaitResult::EventDispatcher => {
+ error!("OpenVPN Event server exited unexpectedly");
+ Err(ErrorKind::EventDispatcherError.into())
}
}
}
@@ -137,10 +129,8 @@ impl<C: OpenVpnBuilder> OpenVpnMonitor<C> {
dispatcher_handle.close();
});
thread::spawn(move || {
- let result = event_dispatcher.wait();
- dispatcher_tx
- .send(WaitResult::EventDispatcher(result))
- .unwrap();
+ event_dispatcher.wait();
+ dispatcher_tx.send(WaitResult::EventDispatcher).unwrap();
let _ = child_close_handle.close();
});
@@ -172,7 +162,7 @@ impl<H: ProcessHandle> OpenVpnCloseHandle<H> {
#[derive(Debug)]
enum WaitResult {
Child(io::Result<ExitStatus>, bool),
- EventDispatcher(talpid_ipc::Result<()>),
+ EventDispatcher,
}
/// Trait for types acting as OpenVPN process starters for `OpenVpnMonitor`.
@@ -221,41 +211,39 @@ impl ProcessHandle for OpenVpnProcHandle {
mod event_server {
use super::OpenVpnPluginEvent;
- use jsonrpc_core::{Compatibility, Error, MetaIoHandler, Metadata};
+ use jsonrpc_core::{Error, IoHandler, MetaIoHandler};
use std::collections::HashMap;
- use std::sync::atomic::{AtomicBool, Ordering};
- use std::sync::Arc;
use talpid_ipc;
+ use uuid;
/// Construct and start the IPC server with the given event listener callback.
- pub fn start<L>(credentials: String, on_event: L) -> talpid_ipc::Result<talpid_ipc::IpcServer>
+ pub fn start<L>(on_event: L) -> talpid_ipc::Result<talpid_ipc::IpcServer>
where
L: Fn(OpenVpnPluginEvent, HashMap<String, String>) + Send + Sync + 'static,
{
- let rpc = OpenVpnEventApiImpl {
- credentials,
- on_event,
+ let uuid = uuid::Uuid::new_v4().to_string();
+ let ipc_path = if cfg!(windows) {
+ format!("//./pipe/talpid-openvpn-{}", uuid)
+ } else {
+ format!("/tmp/talpid-openvpn-{}", uuid)
};
- let mut io = MetaIoHandler::with_compatibility(Compatibility::V2);
+ let rpc = OpenVpnEventApiImpl { on_event };
+ let mut io = IoHandler::new();
io.extend_with(rpc.to_delegate());
- talpid_ipc::IpcServer::start(io.into())
+ let meta_io: MetaIoHandler<()> = MetaIoHandler::from(io);
+ talpid_ipc::IpcServer::start(meta_io, ipc_path)
}
build_rpc_trait! {
pub trait OpenVpnEventApi {
- type Metadata;
-
- #[rpc(meta, name = "authenticate")]
- fn authenticate(&self, Self::Metadata, String) -> Result<bool, Error>;
- #[rpc(meta, name = "openvpn_event")]
- fn openvpn_event(&self, Self::Metadata, OpenVpnPluginEvent, HashMap<String, String>)
+ #[rpc(name = "openvpn_event")]
+ fn openvpn_event(&self, OpenVpnPluginEvent, HashMap<String, String>)
-> Result<(), Error>;
}
}
struct OpenVpnEventApiImpl<L> {
- credentials: String,
on_event: L,
}
@@ -263,60 +251,16 @@ mod event_server {
where
L: Fn(OpenVpnPluginEvent, HashMap<String, String>) + Send + Sync + 'static,
{
- type Metadata = Meta;
-
- fn authenticate(
- &self,
- metadata: Self::Metadata,
- credentials: String,
- ) -> Result<bool, Error> {
- if credentials == self.credentials {
- metadata.authenticated.store(true, Ordering::Relaxed);
- Ok(true)
- } else {
- Ok(false)
- }
- }
-
fn openvpn_event(
&self,
- metadata: Self::Metadata,
event: OpenVpnPluginEvent,
env: HashMap<String, String>,
) -> Result<(), Error> {
trace!("OpenVPN event {:?}", event);
- metadata.check_authentication()?;
(self.on_event)(event, env);
Ok(())
}
}
-
- #[derive(Clone)]
- struct Meta {
- authenticated: Arc<AtomicBool>,
- }
-
- impl Default for Meta {
- fn default() -> Self {
- Meta {
- authenticated: Arc::new(AtomicBool::new(false)),
- }
- }
- }
-
- impl Meta {
- fn check_authentication(&self) -> Result<(), Error> {
- if self.authenticated.load(Ordering::Relaxed) {
- trace!("Authenticated");
- Ok(())
- } else {
- trace!("Not authenticated");
- Err(Error::invalid_request())
- }
- }
- }
-
- impl Metadata for Meta {}
}
diff --git a/talpid-ipc/Cargo.toml b/talpid-ipc/Cargo.toml
index 591b0b1803..80b56f0b4f 100644
--- a/talpid-ipc/Cargo.toml
+++ b/talpid-ipc/Cargo.toml
@@ -10,13 +10,18 @@ error-chain = "0.12"
serde = "1.0"
serde_json = "1.0"
log = "0.4"
-jsonrpc-core = { git = "https://github.com/paritytech/jsonrpc", tag = "v8.0.1" }
-jsonrpc-pubsub = { git = "https://github.com/paritytech/jsonrpc", tag = "v8.0.1" }
-jsonrpc-ws-server = { git = "https://github.com/paritytech/jsonrpc", tag = "v8.0.1" }
+jsonrpc-core = { git = "https://github.com/paritytech/jsonrpc", branch = "master" }
+jsonrpc-pubsub = { git = "https://github.com/paritytech/jsonrpc", branch = "master" }
+jsonrpc-ipc-server = { git = "https://github.com/paritytech/jsonrpc", branch = "master" }
+jsonrpc-client-core = { git = "https://github.com/mullvad/jsonrpc-client-rs" }
+jsonrpc-client-ipc = { git = "https://github.com/mullvad/jsonrpc-client-rs" }
+tokio-core = "0.1"
ws = { git = "https://github.com/tomusdrw/ws-rs" }
url = "1.4"
[dev-dependencies]
assert_matches = "1.0"
env_logger = "0.5"
-jsonrpc-macros = { git = "https://github.com/paritytech/jsonrpc", tag = "v8.0.1" }
+jsonrpc-macros = { git = "https://github.com/paritytech/jsonrpc", branch = "master" }
+uuid = { version = "0.6", features = ["v4"] }
+futures = "0.1.23"
diff --git a/talpid-ipc/src/client.rs b/talpid-ipc/src/client.rs
deleted file mode 100644
index 63c95cc74f..0000000000
--- a/talpid-ipc/src/client.rs
+++ /dev/null
@@ -1,585 +0,0 @@
-use std::collections::HashMap;
-use std::sync::mpsc;
-use std::thread;
-
-use error_chain::ChainedError;
-use jsonrpc_pubsub::SubscriptionId;
-use serde;
-use serde_json::{self, Result as JsonResult, Value as JsonValue};
-use url::Url;
-use ws;
-
-type JsonMap = serde_json::map::Map<String, JsonValue>;
-
-mod errors {
- error_chain! {
- errors {
- ConnectError(details: &'static str) {
- description("Failed to connect to RPC server")
- display("Failed to connect to RPC server: {}", details)
- }
-
- ConnectionHandlerStopped {
- description("The WebSocket connection handler thread has stopped")
- }
-
- ErrorResponse(error_message: String) {
- description("Received an RPC error response")
- display("Received an RPC error response: {}", error_message)
- }
-
- DeserializeResponseError {
- description("Failed to deserialize response")
- }
-
- DeserializeSubscriptionEvent(event: String) {
- description("Failed to deserialize RPC subscription event")
- display("Failed to deserialize RPC subscription event {}", event)
- }
-
- ForwardSubscriptionEvent(event: String) {
- description("Failed to forward RPC subscription event")
- display("Failed to forward RPC subscription event {}", event)
- }
-
- InvalidJsonRpcResponse(details: &'static str) {
- description("Received an invalid JSON-RPC response")
- display("Received an invalid JSON-RPC response: {}", details)
- }
-
- InvalidServerIdUrl(server_id: ::IpcServerId) {
- description("Unable to parse given server ID as a URL")
- display("Unable to parse given server ID as a URL: {}", server_id)
- }
-
- InvalidSubscriptionEvent(details: &'static str) {
- description("Received an invalid JSON-RPC PubSub event")
- display("Received an invalid JSON-RPC PubSub event: {}", details)
- }
-
- InvalidSubscriptionId(raw_id: ::serde_json::Value) {
- description("Received an invalid JSON-RPC subscription ID for subscribe request")
- display(
- "Received an invalid JSON-RPC subscription ID for subscribe request: {}",
- raw_id,
- )
- }
-
- MissingResponse {
- description("No response received")
- }
-
- SendRequestError(method: String) {
- description("Failed to send a request to call a remote JSON-RPC procedure")
- display(
- "Failed to send a request to call the \"{}\" remote JSON-RPC procedure",
- method
- )
- }
-
- SerializeArgumentsError {
- description("Failed to serialize JSON-RPC request arguments")
- }
-
- SerializeSubscriptionId {
- description("Failed to serialize JSON-RPC subscription ID")
- }
-
- UnsubscribeError {
- description("Failed to unsubscribe from a remote event")
- }
-
- WebSocketError {
- description("Error with WebSocket connection")
- }
- }
- }
-}
-pub use self::errors::*;
-
-#[derive(Debug, Eq, PartialEq)]
-pub enum SubscriptionHandlerResult {
- Active,
- Finished,
-}
-
-type SubscriptionHandler = Box<Fn(JsonValue) -> SubscriptionHandlerResult + Send>;
-
-struct ActiveRequest {
- id: i64,
- response_tx: mpsc::Sender<Result<JsonValue>>,
-}
-
-impl ActiveRequest {
- pub fn new(id: i64, response_tx: mpsc::Sender<Result<JsonValue>>) -> Self {
- ActiveRequest { id, response_tx }
- }
-
- pub fn id(&self) -> i64 {
- self.id
- }
-
- pub fn send_response(&mut self, response: Result<JsonValue>) {
- let _ = self.response_tx.send(response);
- }
-}
-
-enum WsIpcCommand {
- Call {
- method: String,
- arguments: JsonValue,
- response_tx: mpsc::Sender<Result<JsonValue>>,
- },
-
- Subscribe {
- id: SubscriptionId,
- handler: SubscriptionHandler,
- unsubscribe_method: String,
- },
-
- Response {
- id: i64,
- result: Result<JsonValue>,
- },
-
- Notification {
- subscription: SubscriptionId,
- event: JsonValue,
- },
-
- Error(Error),
-}
-
-struct Factory {
- connection_tx: mpsc::Sender<WsIpcCommand>,
- sender_tx: mpsc::Sender<ws::Sender>,
-}
-
-impl ws::Factory for Factory {
- type Handler = Handler;
-
- fn connection_made(&mut self, sender: ws::Sender) -> Self::Handler {
- trace!("Connection established");
-
- let _ = self.sender_tx.send(sender);
-
- Handler {
- connection_tx: self.connection_tx.clone(),
- }
- }
-}
-
-
-struct Handler {
- connection_tx: mpsc::Sender<WsIpcCommand>,
-}
-
-impl Handler {
- fn process_message(&mut self, msg: ws::Message) -> Result<()> {
- trace!("WsIpcClient incoming message: {:?}", msg);
- let mut message_json_object = self.parse_message_object(msg)?;
- let response_id = self.parse_response_id(&mut message_json_object)?;
-
- let command = if let Some(id) = response_id {
- let result = self.parse_response_result(message_json_object);
-
- WsIpcCommand::Response { id, result }
- } else {
- let (subscription, event) = self.parse_subscription_event(message_json_object)?;
-
- WsIpcCommand::Notification {
- subscription,
- event,
- }
- };
-
- self.connection_tx
- .send(command)
- .chain_err(|| ErrorKind::ConnectionHandlerStopped)
- }
-
- fn parse_message_object(&self, msg: ws::Message) -> Result<JsonMap> {
- let parsed_json: JsonResult<JsonValue> = match msg {
- ws::Message::Text(s) => serde_json::from_str(&s),
- ws::Message::Binary(b) => serde_json::from_slice(&b),
- };
- let json = parsed_json.chain_err(|| {
- ErrorKind::InvalidJsonRpcResponse("Unable to deserialize ws message as JSON")
- })?;
-
- let mut json_object_map = match json {
- JsonValue::Object(object_map) => object_map,
- _ => bail!(ErrorKind::InvalidJsonRpcResponse(
- "Received response is not a JSON object"
- )),
- };
-
- ensure!(
- json_object_map.remove("jsonrpc") == Some(JsonValue::String("2.0".to_owned())),
- ErrorKind::InvalidJsonRpcResponse("Invalid JSON-RPC version field in response")
- );
-
- Ok(json_object_map)
- }
-
- fn parse_response_id(&self, json_object_map: &mut JsonMap) -> Result<Option<i64>> {
- match json_object_map.remove("id") {
- Some(JsonValue::Number(id)) => id.as_i64().map(Some).ok_or_else(|| {
- ErrorKind::InvalidJsonRpcResponse("Invalid request ID number").into()
- }),
- Some(_) => Err(ErrorKind::InvalidJsonRpcResponse("Invalid request ID value").into()),
- None => Ok(None),
- }
- }
-
- fn parse_response_result(&self, mut json_object_map: JsonMap) -> Result<JsonValue> {
- let result = json_object_map.remove("result");
- let error = json_object_map.remove("error");
-
- match (result, error) {
- (Some(remote_result), None) => Ok(remote_result),
- (None, Some(JsonValue::String(remote_error))) => {
- Err(ErrorKind::ErrorResponse(remote_error).into())
- }
- (None, Some(json_value)) => {
- Err(ErrorKind::ErrorResponse(json_value.to_string()).into())
- }
- (None, None) => Err(ErrorKind::InvalidJsonRpcResponse("Missing RPC result").into()),
- (Some(_), Some(_)) => Err(ErrorKind::InvalidJsonRpcResponse(
- "Response is ambiguous, contains both a successful result and an error",
- ).into()),
- }
- }
-
- fn parse_subscription_event(
- &mut self,
- mut notification: JsonMap,
- ) -> Result<(SubscriptionId, JsonValue)> {
- match notification.remove("params") {
- Some(JsonValue::Object(mut parameters)) => {
- let raw_id = parameters.remove("subscription").ok_or_else(|| {
- ErrorKind::InvalidSubscriptionEvent("Missing subscription ID")
- })?;
- let id = SubscriptionId::parse_value(&raw_id).ok_or_else(|| {
- ErrorKind::InvalidSubscriptionEvent("Invalid subscription ID")
- })?;
- let event = parameters
- .remove("result")
- .ok_or_else(|| ErrorKind::InvalidSubscriptionEvent("Missing event data"))?;
-
- Ok((id, event))
- }
- Some(_) => bail!(ErrorKind::InvalidSubscriptionEvent(
- "RPC parameters is not a JSON object map"
- )),
- None => bail!(ErrorKind::InvalidSubscriptionEvent(
- "Missing RPC parameters"
- )),
- }
- }
-}
-
-impl ws::Handler for Handler {
- fn on_message(&mut self, msg: ws::Message) -> ws::Result<()> {
- if let Err(error) = self.process_message(msg) {
- let chained_error = error.chain_err(|| "Failed to process RPC message");
- error!("{}", chained_error.display_chain());
- }
-
- Ok(())
- }
-
- fn on_error(&mut self, error: ws::Error) {
- let error = Error::with_chain(error, ErrorKind::WebSocketError);
-
- let _ = self.connection_tx.send(WsIpcCommand::Error(error));
- }
-}
-
-pub struct WsIpcClient {
- connection_tx: mpsc::Sender<WsIpcCommand>,
-}
-
-impl WsIpcClient {
- pub fn connect(server_id: &::IpcServerId) -> Result<Self> {
- let url = Url::parse(&server_id)
- .chain_err(|| ErrorKind::InvalidServerIdUrl(server_id.to_owned()))?;
- let (connection_tx, connection_rx) = mpsc::channel();
- let sender = Self::open_websocket(url, connection_tx.clone())?;
-
- WsIpcClientConnection::spawn(sender, connection_rx);
-
- Ok(WsIpcClient { connection_tx })
- }
-
- fn open_websocket(url: Url, connection_tx: mpsc::Sender<WsIpcCommand>) -> Result<ws::Sender> {
- let (sender_tx, sender_rx) = mpsc::channel();
- let factory = Factory {
- connection_tx,
- sender_tx,
- };
-
- let mut websocket = ws::WebSocket::new(factory)
- .chain_err(|| ErrorKind::ConnectError("Unable to create WebSocket"))?;
-
- websocket
- .connect(url)
- .chain_err(|| ErrorKind::ConnectError("Unable to connect WebSocket to URL"))?;
-
- thread::spawn(move || {
- let result = websocket
- .run()
- .chain_err(|| ErrorKind::ConnectError("Error while running WebSocket event loop"));
-
- if let Err(error) = result {
- error!("{}", error.display_chain());
- }
- });
-
- sender_rx
- .recv()
- .chain_err(|| ErrorKind::ConnectError("WebSocket connection failed"))
- }
-
- pub fn subscribe<V, M>(
- &mut self,
- subscribe_method: String,
- unsubscribe_method: String,
- sender: mpsc::Sender<M>,
- ) -> Result<()>
- where
- V: for<'de> serde::Deserialize<'de>,
- M: From<V> + Send + 'static,
- {
- let raw_subscription_id = self.call(&subscribe_method, &[] as &[u8; 0])?;
- let subscription_id = SubscriptionId::parse_value(&raw_subscription_id)
- .ok_or_else(|| ErrorKind::InvalidSubscriptionId(raw_subscription_id))?;
-
- let handler = move |json_value| match forward_subscription_event(
- &subscribe_method,
- json_value,
- &sender,
- ) {
- Ok(()) => SubscriptionHandlerResult::Active,
- Err(error) => {
- error!("{}", error.display_chain());
- SubscriptionHandlerResult::Finished
- }
- };
-
- self.register_subscription(subscription_id, handler, unsubscribe_method)?;
-
- Ok(())
- }
-
- fn register_subscription<H>(
- &mut self,
- id: SubscriptionId,
- handler: H,
- unsubscribe_method: String,
- ) -> Result<()>
- where
- H: Fn(JsonValue) -> SubscriptionHandlerResult + Send + 'static,
- {
- self.connection_tx
- .send(WsIpcCommand::Subscribe {
- id,
- handler: Box::new(handler),
- unsubscribe_method,
- }).chain_err(|| ErrorKind::ConnectionHandlerStopped)
- }
-
- pub fn call<S, T, O>(&mut self, method: S, params: &T) -> Result<O>
- where
- S: ToString,
- T: serde::Serialize,
- O: for<'de> serde::Deserialize<'de>,
- {
- let arguments =
- serde_json::to_value(params).chain_err(|| ErrorKind::SerializeArgumentsError)?;
- let (response_tx, response_rx) = mpsc::channel();
- let command = WsIpcCommand::Call {
- method: method.to_string(),
- arguments,
- response_tx,
- };
-
- self.connection_tx
- .send(command)
- .chain_err(|| ErrorKind::ConnectionHandlerStopped)?;
-
- let json_result = response_rx
- .recv()
- .chain_err(|| ErrorKind::MissingResponse)?;
-
- Ok(serde_json::from_value(json_result?)
- .chain_err(|| ErrorKind::DeserializeResponseError)?)
- }
-}
-
-struct WsIpcClientConnection {
- next_id: i64,
- active_request: Option<ActiveRequest>,
- active_subscriptions: HashMap<SubscriptionId, (SubscriptionHandler, String)>,
- sender: ws::Sender,
-}
-
-impl WsIpcClientConnection {
- pub fn spawn(sender: ws::Sender, commands: mpsc::Receiver<WsIpcCommand>) {
- let mut instance = WsIpcClientConnection {
- next_id: 1,
- active_request: None,
- active_subscriptions: HashMap::new(),
- sender,
- };
-
- thread::spawn(move || {
- if let Err(error) = instance.run(commands) {
- let chained_error = Error::with_chain(error, "WsIpcClient event loop error");
- error!("{}", chained_error.display_chain());
- }
- });
- }
-
- fn run(&mut self, commands: mpsc::Receiver<WsIpcCommand>) -> Result<()> {
- use self::WsIpcCommand::*;
-
- for command in commands {
- match command {
- Call {
- method,
- arguments,
- response_tx,
- } => self.call(method, arguments, response_tx)?,
- Subscribe {
- id,
- handler,
- unsubscribe_method,
- } => {
- self.active_subscriptions
- .insert(id, (handler, unsubscribe_method));
- }
- Response { id, result } => self.handle_response(id, result)?,
- Notification {
- subscription,
- event,
- } => self.handle_notification(subscription, event)?,
- Error(error) => self.handle_error(error),
- }
- }
-
- Ok(())
- }
-
- fn call(
- &mut self,
- method: String,
- arguments: JsonValue,
- response_tx: mpsc::Sender<Result<JsonValue>>,
- ) -> Result<()> {
- let id = self.new_id();
- self.queue_request_response(id, response_tx);
- self.send_request(id, method, arguments)
- }
-
- fn new_id(&mut self) -> i64 {
- let id = self.next_id;
- self.next_id += 1;
- id
- }
-
- fn queue_request_response(&mut self, id: i64, response_tx: mpsc::Sender<Result<JsonValue>>) {
- self.active_request = Some(ActiveRequest::new(id, response_tx));
- }
-
- fn send_request(&mut self, id: i64, method: String, arguments: JsonValue) -> Result<()> {
- let json_request = self.build_json_request(id, &method, arguments);
-
- self.sender
- .send(json_request.as_bytes())
- .chain_err(|| ErrorKind::SendRequestError(method))
- }
-
- fn build_json_request(&mut self, id: i64, method: &str, params: JsonValue) -> String {
- let request_json = json!({
- "jsonrpc": "2.0",
- "id": id,
- "method": method,
- "params": params,
- });
- format!("{}", request_json)
- }
-
- fn handle_response(&mut self, id: i64, result: Result<JsonValue>) -> Result<()> {
- if let Some(mut request) = self.active_request.take() {
- if request.id() == id {
- request.send_response(result);
- } else {
- self.active_request = Some(request);
- warn!("Received an unexpected response with ID {}", id);
- }
- } else {
- warn!("Received an unexpected response with ID {}", id);
- }
-
- Ok(())
- }
-
- fn handle_notification(&mut self, id: SubscriptionId, event: JsonValue) -> Result<()> {
- let unsubscribe_method =
- if let Some((handler, unsubscribe_method)) = self.active_subscriptions.get(&id) {
- match handler(event) {
- SubscriptionHandlerResult::Active => None,
- SubscriptionHandlerResult::Finished => Some(unsubscribe_method.clone()),
- }
- } else {
- warn!("Received an unexpected notification");
- None
- };
-
- if let Some(method) = unsubscribe_method {
- self.unsubscribe(method, id)?;
- }
-
- Ok(())
- }
-
- fn unsubscribe(&mut self, method: String, id: SubscriptionId) -> Result<()> {
- self.active_subscriptions.remove(&id);
-
- let (result_tx, _) = mpsc::channel();
- let arguments = match id {
- SubscriptionId::Number(id) => serde_json::to_value(&[id]),
- SubscriptionId::String(id) => serde_json::to_value(&[id]),
- }.chain_err(|| ErrorKind::SerializeSubscriptionId);
-
- self.call(method, arguments?, result_tx)
- .chain_err(|| ErrorKind::UnsubscribeError)
- }
-
- fn handle_error(&mut self, error: Error) {
- if let Some(ref mut request) = self.active_request {
- let _ = request.response_tx.send(Err(error));
- } else {
- error!("{}", error.display_chain());
- }
- }
-}
-
-fn forward_subscription_event<V, M>(
- subscribe_method: &String,
- json_value: JsonValue,
- sender: &mpsc::Sender<M>,
-) -> Result<()>
-where
- V: for<'de> serde::Deserialize<'de>,
- M: From<V> + Send + 'static,
-{
- let value: V = serde_json::from_value(json_value)
- .chain_err(|| ErrorKind::DeserializeSubscriptionEvent(subscribe_method.clone()))?;
- let message = M::from(value);
-
- sender
- .send(message)
- .chain_err(|| ErrorKind::ForwardSubscriptionEvent(subscribe_method.clone()))
-}
diff --git a/talpid-ipc/src/lib.rs b/talpid-ipc/src/lib.rs
index 4c6c737165..d775dfbcaa 100644
--- a/talpid-ipc/src/lib.rs
+++ b/talpid-ipc/src/lib.rs
@@ -18,20 +18,20 @@ extern crate serde;
extern crate serde_json;
extern crate jsonrpc_core;
+extern crate jsonrpc_ipc_server;
extern crate jsonrpc_pubsub;
-extern crate jsonrpc_ws_server;
+#[macro_use]
+extern crate jsonrpc_client_core;
+extern crate jsonrpc_client_ipc;
+extern crate tokio_core;
extern crate url;
extern crate ws;
use jsonrpc_core::{MetaIoHandler, Metadata};
-use jsonrpc_ws_server::{MetaExtractor, NoopExtractor, Server, ServerBuilder};
+use jsonrpc_ipc_server::{MetaExtractor, NoopExtractor, Server, ServerBuilder};
use std::fmt;
-use std::net::{IpAddr, Ipv4Addr, SocketAddr};
-
-mod client;
-pub use client::*;
/// An Id created by the Ipc server that the client can use to connect to it
pub type IpcServerId = String;
@@ -46,33 +46,36 @@ error_chain!{
pub struct IpcServer {
- address: String,
+ path: String,
server: Server,
}
impl IpcServer {
- pub fn start<M: Metadata>(handler: MetaIoHandler<M>) -> Result<Self> {
- Self::start_with_metadata(handler, NoopExtractor)
+ pub fn start<M: Metadata + Default>(handler: MetaIoHandler<M>, path: String) -> Result<Self> {
+ Self::start_with_metadata(handler, NoopExtractor, path)
}
- pub fn start_with_metadata<M, E>(handler: MetaIoHandler<M>, meta_extractor: E) -> Result<Self>
+ pub fn start_with_metadata<M, E>(
+ handler: MetaIoHandler<M>,
+ meta_extractor: E,
+ path: String,
+ ) -> Result<Self>
where
- M: Metadata,
+ M: Metadata + Default,
E: MetaExtractor<M>,
{
- let listen_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0);
ServerBuilder::new(handler)
.session_meta_extractor(meta_extractor)
- .start(&listen_addr)
+ .start(&path)
.map(|server| IpcServer {
- address: format!("ws://{}", server.addr()),
+ path: path.to_owned(),
server: server,
}).chain_err(|| ErrorKind::IpcServerError)
}
- /// Returns the localhost address this `IpcServer` is listening on.
- pub fn address(&self) -> &str {
- &self.address
+ /// Returns the uds/named pipe path this `IpcServer` is listening on.
+ pub fn path(&self) -> &str {
+ &self.path
}
/// Creates a handle bound to this `IpcServer` that can be used to shut it down.
@@ -80,10 +83,10 @@ impl IpcServer {
CloseHandle(self.server.close_handle())
}
- /// Consumes the server and waits for it to finish. Get an `CloseHandle` before calling this
+ /// Consumes the server and waits for it to finish. Get a `CloseHandle` before calling this
/// if you want to be able to shut the server down.
- pub fn wait(self) -> Result<()> {
- self.server.wait().chain_err(|| ErrorKind::IpcServerError)
+ pub fn wait(self) {
+ self.server.wait()
}
}
@@ -92,14 +95,14 @@ impl IpcServer {
impl fmt::Debug for IpcServer {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("IpcServer")
- .field("address", &self.address)
+ .field("path", &self.path)
.finish()
}
}
#[derive(Clone)]
-pub struct CloseHandle(jsonrpc_ws_server::CloseHandle);
+pub struct CloseHandle(jsonrpc_ipc_server::CloseHandle);
impl CloseHandle {
pub fn close(self) {
diff --git a/talpid-ipc/tests/ipc-client-server.rs b/talpid-ipc/tests/ipc-client-server.rs
index 34b44beb68..b7ae7c2f2f 100644
--- a/talpid-ipc/tests/ipc-client-server.rs
+++ b/talpid-ipc/tests/ipc-client-server.rs
@@ -1,11 +1,22 @@
#[macro_use]
extern crate assert_matches;
extern crate env_logger;
+extern crate jsonrpc_client_core;
+extern crate jsonrpc_client_ipc;
extern crate jsonrpc_core;
#[macro_use]
extern crate jsonrpc_macros;
extern crate talpid_ipc;
+extern crate tokio_core;
+extern crate uuid;
+extern crate futures;
+
+use futures::sync::oneshot;
+use futures::Future;
+use tokio_core::reactor::Core;
+
+use jsonrpc_client_core::Error as ClientError;
use jsonrpc_core::{Error, IoHandler};
use std::sync::{mpsc, Mutex};
use std::time::Duration;
@@ -35,13 +46,13 @@ fn can_call_rpcs_on_server() {
env_logger::init();
let (server, rx) = create_server();
- let server_id = server.address().to_owned();
- let mut client = create_client(&server_id);
+ let server_path = server.path().to_owned();
+ let mut client = create_client(server_path);
- let _result: () = client.call("foo", &[97]).unwrap();
+ let _result: () = client.call_method("foo", &[97]).wait().unwrap();
assert_eq!(Ok(97), rx.recv_timeout(Duration::from_millis(500)));
- let result: Result<(), _> = client.call("invalid_method", &[0]);
+ let result: Result<(), ClientError> = client.call_method("invalid_method", &[0]).wait();
assert_matches!(result, Err(_));
server.close_handle().close();
}
@@ -51,14 +62,7 @@ fn can_call_rpcs_on_server() {
#[test]
#[should_panic]
fn ipc_client_invalid_url() {
- create_client(&"INVALID ID".to_owned());
-}
-
-#[test]
-fn ipc_client_bad_connection() {
- let mut client = create_client(&"ws://127.0.0.1:9876".to_owned());
- let result: Result<(), _> = client.call("invalid_method", &[0]);
- assert_matches!(result, Err(_));
+ create_client("INVALID ID".to_owned());
}
fn create_server() -> (talpid_ipc::IpcServer, mpsc::Receiver<i64>) {
@@ -67,10 +71,30 @@ fn create_server() -> (talpid_ipc::IpcServer, mpsc::Receiver<i64>) {
let mut io = IoHandler::new();
io.extend_with(rpc.to_delegate());
- let server = talpid_ipc::IpcServer::start(io.into()).unwrap();
+ let uuid = uuid::Uuid::new_v4().to_string();
+ let ipc_path = if cfg!(windows) {
+ format!(r"\\.\pipe\ipc-test-{}", uuid)
+ } else {
+ format!("/tmp/ipc-test-{}", uuid)
+ };
+ let server = talpid_ipc::IpcServer::start(io.into(), ipc_path).unwrap();
(server, rx)
}
-fn create_client(id: &talpid_ipc::IpcServerId) -> talpid_ipc::WsIpcClient {
- talpid_ipc::WsIpcClient::connect(id).unwrap()
+fn create_client(ipc_path: String) -> jsonrpc_client_core::ClientHandle {
+ use std::thread;
+ let (tx, rx) = oneshot::channel();
+
+ thread::spawn(move || {
+ let mut core = Core::new().expect("failed to spawn reactor");
+ let (client, client_handle) =
+ jsonrpc_client_ipc::IpcTransport::new(&ipc_path, &core.handle())
+ .expect("failed to construct a transport")
+ .into_client();
+ tx.send(client_handle);
+ core.run(client);
+ });
+
+ let handle = rx.wait().expect("Failed to construct a valid client");
+ handle
}
diff --git a/talpid-openvpn-plugin/Cargo.toml b/talpid-openvpn-plugin/Cargo.toml
index c5987d68ef..165a4976be 100644
--- a/talpid-openvpn-plugin/Cargo.toml
+++ b/talpid-openvpn-plugin/Cargo.toml
@@ -12,9 +12,14 @@ crate-type = ["cdylib"]
error-chain = "0.12"
log = "0.4"
env_logger = "0.5"
+jsonrpc-client-core = { git = "https://github.com/mullvad/jsonrpc-client-rs" }
+jsonrpc-client-ipc = { git = "https://github.com/mullvad/jsonrpc-client-rs" }
+tokio-core = "0.1"
+futures = "0.1"
openvpn-plugin = { version = "0.3", features = ["serde", "log"] }
talpid-ipc = { path = "../talpid-ipc" }
+
[target.'cfg(windows)'.build-dependencies]
windres = "0.2"
diff --git a/talpid-openvpn-plugin/src/lib.rs b/talpid-openvpn-plugin/src/lib.rs
index 9c6c690339..ed358a5df7 100644
--- a/talpid-openvpn-plugin/src/lib.rs
+++ b/talpid-openvpn-plugin/src/lib.rs
@@ -13,12 +13,19 @@ extern crate error_chain;
extern crate log;
#[macro_use]
+extern crate jsonrpc_client_core;
+extern crate futures;
+extern crate jsonrpc_client_ipc;
+#[macro_use]
extern crate openvpn_plugin;
-extern crate talpid_ipc;
+extern crate tokio_core;
+use error_chain::ChainedError;
use openvpn_plugin::types::{EventResult, OpenVpnPluginEvent};
use std::collections::HashMap;
use std::ffi::CString;
+use std::sync::Mutex;
+
mod processing;
use processing::EventProcessor;
@@ -54,26 +61,28 @@ openvpn_plugin!(
::openvpn_open,
::openvpn_close,
::openvpn_event,
- ::EventProcessor
+ ::Mutex<EventProcessor>
);
pub struct Arguments {
- server_id: talpid_ipc::IpcServerId,
- credentials: String,
+ ipc_socket_path: String,
}
fn openvpn_open(
args: Vec<CString>,
_env: HashMap<CString, CString>,
-) -> Result<(Vec<OpenVpnPluginEvent>, EventProcessor)> {
+) -> Result<(Vec<OpenVpnPluginEvent>, Mutex<EventProcessor>)> {
env_logger::init();
debug!("Initializing plugin");
let arguments = parse_args(&args)?;
- info!("Connecting back to talpid core at {}", arguments.server_id);
- let processor = EventProcessor::new(&arguments).chain_err(|| ErrorKind::InitHandleFailed)?;
+ info!(
+ "Connecting back to talpid core at {}",
+ arguments.ipc_socket_path
+ );
+ let processor = EventProcessor::new(arguments).chain_err(|| ErrorKind::InitHandleFailed)?;
- Ok((INTERESTING_EVENTS.to_vec(), processor))
+ Ok((INTERESTING_EVENTS.to_vec(), Mutex::new(processor)))
}
fn parse_args(args: &[CString]) -> Result<Arguments> {
@@ -82,37 +91,39 @@ fn parse_args(args: &[CString]) -> Result<Arguments> {
.into_iter();
let _plugin_path = args_iter.next();
- let server_id: talpid_ipc::IpcServerId = args_iter
+ let ipc_socket_path: String = args_iter
.next()
.ok_or_else(|| ErrorKind::Msg("No core server id given as first argument".to_owned()))?;
- let credentials = args_iter
- .next()
- .ok_or_else(|| ErrorKind::Msg("No IPC credentials given as second argument".to_owned()))?;
- Ok(Arguments {
- server_id,
- credentials,
- })
+ Ok(Arguments { ipc_socket_path })
}
-fn openvpn_close(_handle: EventProcessor) {
- debug!("Unloading plugin");
+fn openvpn_close(_handle: Mutex<EventProcessor>) {
+ info!("Unloading plugin");
}
fn openvpn_event(
event: OpenVpnPluginEvent,
_args: Vec<CString>,
env: HashMap<CString, CString>,
- handle: &mut EventProcessor,
+ handle: &mut Mutex<EventProcessor>,
) -> Result<EventResult> {
debug!("Received event: {:?}", event);
let parsed_env =
openvpn_plugin::ffi::parse::env_utf8(&env).chain_err(|| ErrorKind::ParseEnvFailed)?;
- handle
+ let result = handle
+ .lock()
+ .expect("failed to obtain mutex for EventProcessor")
.process_event(event, parsed_env)
- .chain_err(|| ErrorKind::EventProcessingFailed)?;
- Ok(EventResult::Success)
+ .chain_err(|| ErrorKind::EventProcessingFailed);
+ match result {
+ Ok(()) => Ok(EventResult::Success),
+ Err(e) => {
+ error!("{}", e.display_chain());
+ Ok(EventResult::Failure)
+ }
+ }
}
diff --git a/talpid-openvpn-plugin/src/processing.rs b/talpid-openvpn-plugin/src/processing.rs
index 99533945ed..66d009b44f 100644
--- a/talpid-openvpn-plugin/src/processing.rs
+++ b/talpid-openvpn-plugin/src/processing.rs
@@ -1,42 +1,55 @@
use openvpn_plugin;
use std::collections::HashMap;
-use std::sync::Mutex;
-use talpid_ipc::WsIpcClient;
+
+extern crate futures;
+
+use jsonrpc_client_core::{Future, Result as ClientResult, Transport};
+use jsonrpc_client_ipc::IpcTransport;
+use tokio_core::reactor::Core;
use super::Arguments;
error_chain! {
errors {
- AuthDenied {
- description("Failed to authenticate with Talpid IPC server")
- }
IpcSendingError {
description("Failed while sending an event over the IPC channel")
}
+
+ Shutdown {
+ description("Connection is shut down")
+ }
+
}
}
/// Struct processing OpenVPN events and notifies listeners over IPC
pub struct EventProcessor {
- ipc_client: Mutex<WsIpcClient>,
+ ipc_client: EventProxy,
+ client_stop: ::std::sync::mpsc::Receiver<ClientResult<()>>,
+ core: Core,
}
impl EventProcessor {
- pub fn new(arguments: &Arguments) -> Result<EventProcessor> {
+ pub fn new(arguments: Arguments) -> Result<EventProcessor> {
trace!("Creating EventProcessor");
- let mut ipc_client = WsIpcClient::connect(&arguments.server_id)
- .chain_err(|| "Unable to create IPC client")?;
+ let core = Core::new().chain_err(|| "Unable to initialize Tokio Core")?;
+ let handle = core.handle();
+ let (client, client_handle) = IpcTransport::new(&arguments.ipc_socket_path, &handle)
+ .chain_err(|| "Unable to create IPC transport")?
+ .into_client();
- trace!("Authenticating EventProcessor");
- match ipc_client.call("authenticate", &[&arguments.credentials]) {
- Ok(true) => trace!("Credentials accepted"),
- Ok(false) => bail!(ErrorKind::AuthDenied),
- Err(error) => bail!(Error::with_chain(error, ErrorKind::AuthDenied)),
- }
+ let (tx, client_stop) = ::std::sync::mpsc::channel();
+
+ let client_future = client.then(move |result| tx.send(result)).map_err(|_| ());
+ handle.spawn(client_future);
+
+ let ipc_client = EventProxy::new(client_handle);
Ok(EventProcessor {
- ipc_client: Mutex::new(ipc_client),
+ ipc_client,
+ client_stop,
+ core,
})
}
@@ -46,11 +59,25 @@ impl EventProcessor {
env: HashMap<String, String>,
) -> Result<()> {
trace!("Processing \"{:?}\" event", event);
- self.ipc_client
- .lock()
- .expect("a thread panicked while using the RPC client in the OpenVPN plugin")
- .call("openvpn_event", &(event, env))
- .map(|_: Option<()>| ())
- .chain_err(|| ErrorKind::IpcSendingError)
+ let call_future = self
+ .ipc_client
+ .openvpn_event(event, env)
+ .map_err(|e| Error::with_chain(e, ErrorKind::IpcSendingError));
+ self.core.run(call_future)?;
+ self.check_client_status()
+ }
+
+ fn check_client_status(&mut self) -> Result<()> {
+ use std::sync::mpsc::TryRecvError::*;
+ match self.client_stop.try_recv() {
+ Err(Empty) => Ok(()),
+ Err(Disconnected) => Err(ErrorKind::Shutdown.into()),
+ Ok(Ok(_)) => Err(ErrorKind::Shutdown.into()),
+ Ok(Err(e)) => Err(Error::with_chain(e, ErrorKind::IpcSendingError)),
+ }
}
}
+
+jsonrpc_client!(pub struct EventProxy {
+ pub fn openvpn_event(&mut self, event: openvpn_plugin::types::OpenVpnPluginEvent, env: HashMap<String, String>) -> Future<()>;
+});