diff options
| author | Linus Färnstrand <linus@mullvad.net> | 2018-08-07 15:21:08 +0200 |
|---|---|---|
| committer | Emīls Piņķis <emils@mullvad.net> | 2018-08-29 16:27:51 +0100 |
| commit | 3b62a06e9bfec75043888fa58c4ccba17689be4c (patch) | |
| tree | c677ab18ab5abea0cb98aa805e78d8cbb746d498 | |
| parent | 559a81c6044d620159d8229295bee3b2e909eca4 (diff) | |
| download | mullvadvpn-3b62a06e9bfec75043888fa58c4ccba17689be4c.tar.xz mullvadvpn-3b62a06e9bfec75043888fa58c4ccba17689be4c.zip | |
Use IPC instead of a websocket to communicate between daemon and plugin
| -rw-r--r-- | mullvad-daemon/Cargo.toml | 9 | ||||
| -rw-r--r-- | mullvad-daemon/src/main.rs | 25 | ||||
| -rw-r--r-- | mullvad-daemon/src/management_interface.rs | 41 | ||||
| -rw-r--r-- | mullvad-daemon/src/relays.rs | 9 | ||||
| -rw-r--r-- | mullvad-daemon/src/rpc_uniqueness_check.rs | 4 | ||||
| -rw-r--r-- | talpid-core/Cargo.toml | 4 | ||||
| -rw-r--r-- | talpid-core/src/tunnel/openvpn.rs | 102 | ||||
| -rw-r--r-- | talpid-ipc/Cargo.toml | 13 | ||||
| -rw-r--r-- | talpid-ipc/src/client.rs | 585 | ||||
| -rw-r--r-- | talpid-ipc/src/lib.rs | 47 | ||||
| -rw-r--r-- | talpid-ipc/tests/ipc-client-server.rs | 54 | ||||
| -rw-r--r-- | talpid-openvpn-plugin/Cargo.toml | 5 | ||||
| -rw-r--r-- | talpid-openvpn-plugin/src/lib.rs | 55 | ||||
| -rw-r--r-- | talpid-openvpn-plugin/src/processing.rs | 71 |
14 files changed, 242 insertions, 782 deletions
diff --git a/mullvad-daemon/Cargo.toml b/mullvad-daemon/Cargo.toml index bd6add0daf..9f491e2dd4 100644 --- a/mullvad-daemon/Cargo.toml +++ b/mullvad-daemon/Cargo.toml @@ -15,10 +15,11 @@ serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" log = "0.4" log-panics = "2.0.0" -jsonrpc-core = { git = "https://github.com/paritytech/jsonrpc", tag = "v8.0.1" } -jsonrpc-macros = { git = "https://github.com/paritytech/jsonrpc", tag = "v8.0.1" } -jsonrpc-pubsub = { git = "https://github.com/paritytech/jsonrpc", tag = "v8.0.1" } -jsonrpc-ws-server = { git = "https://github.com/paritytech/jsonrpc", tag = "v8.0.1" } +jsonrpc-core = { git = "https://github.com/paritytech/jsonrpc", branch = "master" } +jsonrpc-macros = { git = "https://github.com/paritytech/jsonrpc", branch = "master" } +jsonrpc-pubsub = { git = "https://github.com/paritytech/jsonrpc", branch = "master" } +jsonrpc-ws-server = { git = "https://github.com/paritytech/jsonrpc", branch = "master" } +jsonrpc-ipc-server = { git = "https://github.com/paritytech/jsonrpc", branch = "master" } uuid = { version = "0.6", features = ["v4"] } lazy_static = "1.0" rand = "0.5" diff --git a/mullvad-daemon/src/main.rs b/mullvad-daemon/src/main.rs index 848207fbb1..a859baec4c 100644 --- a/mullvad-daemon/src/main.rs +++ b/mullvad-daemon/src/main.rs @@ -25,6 +25,7 @@ extern crate serde_json; extern crate jsonrpc_core; #[macro_use] extern crate jsonrpc_macros; +extern crate jsonrpc_ipc_server; extern crate jsonrpc_pubsub; extern crate jsonrpc_ws_server; extern crate rand; @@ -125,8 +126,9 @@ pub enum DaemonEvent { TunnelStateTransition(TunnelStateTransition), /// An event coming from the JSONRPC-2.0 management interface. ManagementInterfaceEvent(ManagementCommand), + // TODO(emilsp): try and get an error from the management interface /// Triggered if the server hosting the JSONRPC-2.0 management interface dies unexpectedly. - ManagementInterfaceExited(talpid_ipc::Result<()>), + ManagementInterfaceExited, /// Daemon shutdown triggered by a signal, ctrl-c or similar. TriggerShutdown, } @@ -308,7 +310,7 @@ impl Daemon { thread::spawn(move || { let result = server.wait(); error!("Mullvad management interface shut down"); - let _ = exit_tx.send(DaemonEvent::ManagementInterfaceExited(result)); + let _ = exit_tx.send(DaemonEvent::ManagementInterfaceExited); }); } @@ -333,7 +335,7 @@ impl Daemon { match event { TunnelStateTransition(transition) => self.handle_tunnel_state_transition(transition), ManagementInterfaceEvent(event) => self.handle_management_interface_event(event), - ManagementInterfaceExited(result) => self.handle_management_interface_exited(result), + ManagementInterfaceExited => self.handle_management_interface_exited(), TriggerShutdown => self.handle_trigger_shutdown_event(), } } @@ -599,12 +601,17 @@ impl Daemon { } } - fn handle_management_interface_exited(&self, result: talpid_ipc::Result<()>) -> Result<()> { - let error = ErrorKind::ManagementInterfaceError("Server exited unexpectedly"); - match result { - Ok(()) => Err(error.into()), - Err(e) => Err(e).chain_err(|| error), - } + // TODO: (emilsp) fix this + // fn handle_management_interface_exited(&self, result: talpid_ipc_ws::Result<()>) -> + // Result<()> { let error = ErrorKind::ManagementInterfaceError("Server exited + // unexpectedly"); match result { + // Ok(()) => Err(error.into()), + // Err(e) => Err(e).chain_err(|| error), + // } + // } + + fn handle_management_interface_exited(&self) -> Result<()> { + Err(ErrorKind::ManagementInterfaceError("Server exited unexpectedly").into()) } fn handle_trigger_shutdown_event(&mut self) -> Result<()> { diff --git a/mullvad-daemon/src/management_interface.rs b/mullvad-daemon/src/management_interface.rs index 3f2d920180..bb2e540cc1 100644 --- a/mullvad-daemon/src/management_interface.rs +++ b/mullvad-daemon/src/management_interface.rs @@ -4,13 +4,14 @@ use error_chain::ChainedError; use jsonrpc_core::futures::sync::oneshot::Sender as OneshotSender; use jsonrpc_core::futures::{future, sync, Future}; use jsonrpc_core::{Error, ErrorCode, MetaIoHandler, Metadata}; +use jsonrpc_ipc_server; use jsonrpc_macros::pubsub; use jsonrpc_pubsub::{PubSubHandler, PubSubMetadata, Session, SubscriptionId}; -use jsonrpc_ws_server; use mullvad_rpc; use mullvad_types::account::{AccountData, AccountToken}; use mullvad_types::location::GeoIpLocation; +use mullvad_paths; use mullvad_types::relay_constraints::{RelaySettings, RelaySettingsUpdate}; use mullvad_types::relay_list::RelayList; use mullvad_types::states::{DaemonState, TargetState}; @@ -237,7 +238,12 @@ impl ManagementInterfaceServer { let mut io = PubSubHandler::default(); io.extend_with(rpc.to_delegate()); let meta_io: MetaIoHandler<Meta> = io.into(); - let server = talpid_ipc::IpcServer::start_with_metadata(meta_io, meta_extractor)?; + let path = mullvad_paths::get_rpc_socket_path(); + let server = talpid_ipc::IpcServer::start_with_metadata( + meta_io, + meta_extractor, + path.to_string_lossy().to_string(), + )?; Ok(ManagementInterfaceServer { server, subscriptions, @@ -245,7 +251,7 @@ impl ManagementInterfaceServer { } pub fn address(&self) -> &str { - self.server.address() + self.server.path() } pub fn event_broadcaster(&self) -> EventBroadcaster { @@ -256,11 +262,19 @@ impl ManagementInterfaceServer { /// Consumes the server and waits for it to finish. Returns an error if the server exited /// due to an error. - pub fn wait(self) -> talpid_ipc::Result<()> { + pub fn wait(self) { self.server.wait() } } +fn ipc_path() -> String { + if cfg!(windows) { + "//./pipe/mullvad_daemon_socket".to_owned() + } else { + "/tmp/mullvad_daemon_socket".to_owned() + } +} + /// A handle that allows broadcasting messages to all subscribers of the management interface. pub struct EventBroadcaster { @@ -380,13 +394,14 @@ impl<T: From<ManagementCommand> + 'static + Send> ManagementInterface<T> { } fn check_auth(&self, meta: &Meta) -> Result<(), Error> { - if meta.authenticated.load(Ordering::SeqCst) { - trace!("auth success"); - Ok(()) - } else { - trace!("auth failed"); - Err(Error::invalid_request()) - } + Ok(()) + // if meta.authenticated.load(Ordering::SeqCst) { + // trace!("auth success"); + // Ok(()) + // } else { + // trace!("auth failed"); + // Err(Error::invalid_request()) + // } } fn load_history(&self) -> Result<AccountHistory, AccountHistoryError> { @@ -751,9 +766,9 @@ impl PubSubMetadata for Meta { } /// Metadata extractor function for `Meta`. -fn meta_extractor(context: &jsonrpc_ws_server::RequestContext) -> Meta { +fn meta_extractor(context: &jsonrpc_ipc_server::RequestContext) -> Meta { Meta { - session: Some(Arc::new(Session::new(context.sender()))), + session: Some(Arc::new(Session::new(context.sender.clone()))), authenticated: Arc::new(AtomicBool::new(false)), } } diff --git a/mullvad-daemon/src/relays.rs b/mullvad-daemon/src/relays.rs index 4b0b07217e..28edaff9cc 100644 --- a/mullvad-daemon/src/relays.rs +++ b/mullvad-daemon/src/relays.rs @@ -24,15 +24,18 @@ use rand::{self, Rng, ThreadRng}; use tokio_timer::{TimeoutError, Timer}; const RELAYS_FILENAME: &str = "relays.json"; -const DOWNLOAD_TIMEOUT: Duration = Duration::from_secs(15); -const UPDATE_INTERVAL: Duration = Duration::from_secs(60 * 60); +const DOWNLOAD_TIMEOUT: Duration = Duration::from_secs(30); +const UPDATE_INTERVAL: Duration = Duration::from_secs(15); const MAX_CACHE_AGE: Duration = Duration::from_secs(60 * 60 * 24); error_chain! { errors { RelayCacheError { description("Error with relay cache on disk") } DownloadError { description("Error when trying to download the list of relays") } - DownloadTimeoutError { description("Timed out when trying to download the list of relays") } + DownloadTimeoutError(error: tokio_timer::Error) { + display("Timer error - {}", error) + description("Timed out when trying to download the list of relays") + } NoRelay { description("No relays matching current constraints") } SerializationError { description("Error in serialization of relaylist") } } diff --git a/mullvad-daemon/src/rpc_uniqueness_check.rs b/mullvad-daemon/src/rpc_uniqueness_check.rs index 76130f72e8..2766aa9199 100644 --- a/mullvad-daemon/src/rpc_uniqueness_check.rs +++ b/mullvad-daemon/src/rpc_uniqueness_check.rs @@ -1,7 +1,7 @@ use error_chain::ChainedError; use log::Level; -use mullvad_ipc_client::DaemonRpcClient; +use mullvad_ipc_client::new_standalone_ipc_client; /// Checks if there is another instance of the daemon running. @@ -9,7 +9,7 @@ use mullvad_ipc_client::DaemonRpcClient; /// Tries to connect to another daemon and perform a simple RPC call. If it fails, assumes the /// other daemon has stopped. pub fn is_another_instance_running() -> bool { - match DaemonRpcClient::new() { + match new_standalone_ipc_client() { Ok(_) => true, Err(error) => { let msg = diff --git a/talpid-core/Cargo.toml b/talpid-core/Cargo.toml index 52297f214d..23df76dfd8 100644 --- a/talpid-core/Cargo.toml +++ b/talpid-core/Cargo.toml @@ -10,8 +10,8 @@ atty = "0.2" duct = "0.11" error-chain = "0.12" futures = "0.1" -jsonrpc-core = { git = "https://github.com/paritytech/jsonrpc", tag = "v8.0.1" } -jsonrpc-macros = { git = "https://github.com/paritytech/jsonrpc", tag = "v8.0.1" } +jsonrpc-core = { git = "https://github.com/paritytech/jsonrpc", branch = "master" } +jsonrpc-macros = { git = "https://github.com/paritytech/jsonrpc", branch = "master" } libc = "0.2.20" log = "0.4" openvpn-plugin = { version = "0.3", features = ["serde"] } diff --git a/talpid-core/src/tunnel/openvpn.rs b/talpid-core/src/tunnel/openvpn.rs index c63429a4f5..8e89ed4a5c 100644 --- a/talpid-core/src/tunnel/openvpn.rs +++ b/talpid-core/src/tunnel/openvpn.rs @@ -12,7 +12,6 @@ use std::thread; use std::time::Duration; use talpid_ipc; -use uuid; mod errors { error_chain!{ @@ -60,14 +59,10 @@ impl<C: OpenVpnBuilder> OpenVpnMonitor<C> { L: Fn(OpenVpnPluginEvent, HashMap<String, String>) + Send + Sync + 'static, P: AsRef<Path>, { - let credentials = uuid::Uuid::new_v4().to_string(); - let event_dispatcher = event_server::start(credentials.clone(), on_event) - .chain_err(|| ErrorKind::EventDispatcherError)?; + let event_dispatcher = + event_server::start(on_event).chain_err(|| ErrorKind::EventDispatcherError)?; - cmd.plugin( - plugin_path, - vec![event_dispatcher.address().to_owned(), credentials], - ); + cmd.plugin(plugin_path, vec![event_dispatcher.path().to_owned()]); let child = cmd .start() .chain_err(|| ErrorKind::ChildProcessError("Failed to start"))?; @@ -108,12 +103,9 @@ impl<C: OpenVpnBuilder> OpenVpnMonitor<C> { error!("OpenVPN process wait error: {}", e); Err(e).chain_err(|| ErrorKind::ChildProcessError("Error when waiting")) } - WaitResult::EventDispatcher(result) => { - error!("OpenVPN Event server exited unexpectedly: {:?}", result); - match result { - Ok(()) => Err(ErrorKind::EventDispatcherError.into()), - Err(e) => Err(e).chain_err(|| ErrorKind::EventDispatcherError), - } + WaitResult::EventDispatcher => { + error!("OpenVPN Event server exited unexpectedly"); + Err(ErrorKind::EventDispatcherError.into()) } } } @@ -137,10 +129,8 @@ impl<C: OpenVpnBuilder> OpenVpnMonitor<C> { dispatcher_handle.close(); }); thread::spawn(move || { - let result = event_dispatcher.wait(); - dispatcher_tx - .send(WaitResult::EventDispatcher(result)) - .unwrap(); + event_dispatcher.wait(); + dispatcher_tx.send(WaitResult::EventDispatcher).unwrap(); let _ = child_close_handle.close(); }); @@ -172,7 +162,7 @@ impl<H: ProcessHandle> OpenVpnCloseHandle<H> { #[derive(Debug)] enum WaitResult { Child(io::Result<ExitStatus>, bool), - EventDispatcher(talpid_ipc::Result<()>), + EventDispatcher, } /// Trait for types acting as OpenVPN process starters for `OpenVpnMonitor`. @@ -221,41 +211,39 @@ impl ProcessHandle for OpenVpnProcHandle { mod event_server { use super::OpenVpnPluginEvent; - use jsonrpc_core::{Compatibility, Error, MetaIoHandler, Metadata}; + use jsonrpc_core::{Error, IoHandler, MetaIoHandler}; use std::collections::HashMap; - use std::sync::atomic::{AtomicBool, Ordering}; - use std::sync::Arc; use talpid_ipc; + use uuid; /// Construct and start the IPC server with the given event listener callback. - pub fn start<L>(credentials: String, on_event: L) -> talpid_ipc::Result<talpid_ipc::IpcServer> + pub fn start<L>(on_event: L) -> talpid_ipc::Result<talpid_ipc::IpcServer> where L: Fn(OpenVpnPluginEvent, HashMap<String, String>) + Send + Sync + 'static, { - let rpc = OpenVpnEventApiImpl { - credentials, - on_event, + let uuid = uuid::Uuid::new_v4().to_string(); + let ipc_path = if cfg!(windows) { + format!("//./pipe/talpid-openvpn-{}", uuid) + } else { + format!("/tmp/talpid-openvpn-{}", uuid) }; - let mut io = MetaIoHandler::with_compatibility(Compatibility::V2); + let rpc = OpenVpnEventApiImpl { on_event }; + let mut io = IoHandler::new(); io.extend_with(rpc.to_delegate()); - talpid_ipc::IpcServer::start(io.into()) + let meta_io: MetaIoHandler<()> = MetaIoHandler::from(io); + talpid_ipc::IpcServer::start(meta_io, ipc_path) } build_rpc_trait! { pub trait OpenVpnEventApi { - type Metadata; - - #[rpc(meta, name = "authenticate")] - fn authenticate(&self, Self::Metadata, String) -> Result<bool, Error>; - #[rpc(meta, name = "openvpn_event")] - fn openvpn_event(&self, Self::Metadata, OpenVpnPluginEvent, HashMap<String, String>) + #[rpc(name = "openvpn_event")] + fn openvpn_event(&self, OpenVpnPluginEvent, HashMap<String, String>) -> Result<(), Error>; } } struct OpenVpnEventApiImpl<L> { - credentials: String, on_event: L, } @@ -263,60 +251,16 @@ mod event_server { where L: Fn(OpenVpnPluginEvent, HashMap<String, String>) + Send + Sync + 'static, { - type Metadata = Meta; - - fn authenticate( - &self, - metadata: Self::Metadata, - credentials: String, - ) -> Result<bool, Error> { - if credentials == self.credentials { - metadata.authenticated.store(true, Ordering::Relaxed); - Ok(true) - } else { - Ok(false) - } - } - fn openvpn_event( &self, - metadata: Self::Metadata, event: OpenVpnPluginEvent, env: HashMap<String, String>, ) -> Result<(), Error> { trace!("OpenVPN event {:?}", event); - metadata.check_authentication()?; (self.on_event)(event, env); Ok(()) } } - - #[derive(Clone)] - struct Meta { - authenticated: Arc<AtomicBool>, - } - - impl Default for Meta { - fn default() -> Self { - Meta { - authenticated: Arc::new(AtomicBool::new(false)), - } - } - } - - impl Meta { - fn check_authentication(&self) -> Result<(), Error> { - if self.authenticated.load(Ordering::Relaxed) { - trace!("Authenticated"); - Ok(()) - } else { - trace!("Not authenticated"); - Err(Error::invalid_request()) - } - } - } - - impl Metadata for Meta {} } diff --git a/talpid-ipc/Cargo.toml b/talpid-ipc/Cargo.toml index 591b0b1803..80b56f0b4f 100644 --- a/talpid-ipc/Cargo.toml +++ b/talpid-ipc/Cargo.toml @@ -10,13 +10,18 @@ error-chain = "0.12" serde = "1.0" serde_json = "1.0" log = "0.4" -jsonrpc-core = { git = "https://github.com/paritytech/jsonrpc", tag = "v8.0.1" } -jsonrpc-pubsub = { git = "https://github.com/paritytech/jsonrpc", tag = "v8.0.1" } -jsonrpc-ws-server = { git = "https://github.com/paritytech/jsonrpc", tag = "v8.0.1" } +jsonrpc-core = { git = "https://github.com/paritytech/jsonrpc", branch = "master" } +jsonrpc-pubsub = { git = "https://github.com/paritytech/jsonrpc", branch = "master" } +jsonrpc-ipc-server = { git = "https://github.com/paritytech/jsonrpc", branch = "master" } +jsonrpc-client-core = { git = "https://github.com/mullvad/jsonrpc-client-rs" } +jsonrpc-client-ipc = { git = "https://github.com/mullvad/jsonrpc-client-rs" } +tokio-core = "0.1" ws = { git = "https://github.com/tomusdrw/ws-rs" } url = "1.4" [dev-dependencies] assert_matches = "1.0" env_logger = "0.5" -jsonrpc-macros = { git = "https://github.com/paritytech/jsonrpc", tag = "v8.0.1" } +jsonrpc-macros = { git = "https://github.com/paritytech/jsonrpc", branch = "master" } +uuid = { version = "0.6", features = ["v4"] } +futures = "0.1.23" diff --git a/talpid-ipc/src/client.rs b/talpid-ipc/src/client.rs deleted file mode 100644 index 63c95cc74f..0000000000 --- a/talpid-ipc/src/client.rs +++ /dev/null @@ -1,585 +0,0 @@ -use std::collections::HashMap; -use std::sync::mpsc; -use std::thread; - -use error_chain::ChainedError; -use jsonrpc_pubsub::SubscriptionId; -use serde; -use serde_json::{self, Result as JsonResult, Value as JsonValue}; -use url::Url; -use ws; - -type JsonMap = serde_json::map::Map<String, JsonValue>; - -mod errors { - error_chain! { - errors { - ConnectError(details: &'static str) { - description("Failed to connect to RPC server") - display("Failed to connect to RPC server: {}", details) - } - - ConnectionHandlerStopped { - description("The WebSocket connection handler thread has stopped") - } - - ErrorResponse(error_message: String) { - description("Received an RPC error response") - display("Received an RPC error response: {}", error_message) - } - - DeserializeResponseError { - description("Failed to deserialize response") - } - - DeserializeSubscriptionEvent(event: String) { - description("Failed to deserialize RPC subscription event") - display("Failed to deserialize RPC subscription event {}", event) - } - - ForwardSubscriptionEvent(event: String) { - description("Failed to forward RPC subscription event") - display("Failed to forward RPC subscription event {}", event) - } - - InvalidJsonRpcResponse(details: &'static str) { - description("Received an invalid JSON-RPC response") - display("Received an invalid JSON-RPC response: {}", details) - } - - InvalidServerIdUrl(server_id: ::IpcServerId) { - description("Unable to parse given server ID as a URL") - display("Unable to parse given server ID as a URL: {}", server_id) - } - - InvalidSubscriptionEvent(details: &'static str) { - description("Received an invalid JSON-RPC PubSub event") - display("Received an invalid JSON-RPC PubSub event: {}", details) - } - - InvalidSubscriptionId(raw_id: ::serde_json::Value) { - description("Received an invalid JSON-RPC subscription ID for subscribe request") - display( - "Received an invalid JSON-RPC subscription ID for subscribe request: {}", - raw_id, - ) - } - - MissingResponse { - description("No response received") - } - - SendRequestError(method: String) { - description("Failed to send a request to call a remote JSON-RPC procedure") - display( - "Failed to send a request to call the \"{}\" remote JSON-RPC procedure", - method - ) - } - - SerializeArgumentsError { - description("Failed to serialize JSON-RPC request arguments") - } - - SerializeSubscriptionId { - description("Failed to serialize JSON-RPC subscription ID") - } - - UnsubscribeError { - description("Failed to unsubscribe from a remote event") - } - - WebSocketError { - description("Error with WebSocket connection") - } - } - } -} -pub use self::errors::*; - -#[derive(Debug, Eq, PartialEq)] -pub enum SubscriptionHandlerResult { - Active, - Finished, -} - -type SubscriptionHandler = Box<Fn(JsonValue) -> SubscriptionHandlerResult + Send>; - -struct ActiveRequest { - id: i64, - response_tx: mpsc::Sender<Result<JsonValue>>, -} - -impl ActiveRequest { - pub fn new(id: i64, response_tx: mpsc::Sender<Result<JsonValue>>) -> Self { - ActiveRequest { id, response_tx } - } - - pub fn id(&self) -> i64 { - self.id - } - - pub fn send_response(&mut self, response: Result<JsonValue>) { - let _ = self.response_tx.send(response); - } -} - -enum WsIpcCommand { - Call { - method: String, - arguments: JsonValue, - response_tx: mpsc::Sender<Result<JsonValue>>, - }, - - Subscribe { - id: SubscriptionId, - handler: SubscriptionHandler, - unsubscribe_method: String, - }, - - Response { - id: i64, - result: Result<JsonValue>, - }, - - Notification { - subscription: SubscriptionId, - event: JsonValue, - }, - - Error(Error), -} - -struct Factory { - connection_tx: mpsc::Sender<WsIpcCommand>, - sender_tx: mpsc::Sender<ws::Sender>, -} - -impl ws::Factory for Factory { - type Handler = Handler; - - fn connection_made(&mut self, sender: ws::Sender) -> Self::Handler { - trace!("Connection established"); - - let _ = self.sender_tx.send(sender); - - Handler { - connection_tx: self.connection_tx.clone(), - } - } -} - - -struct Handler { - connection_tx: mpsc::Sender<WsIpcCommand>, -} - -impl Handler { - fn process_message(&mut self, msg: ws::Message) -> Result<()> { - trace!("WsIpcClient incoming message: {:?}", msg); - let mut message_json_object = self.parse_message_object(msg)?; - let response_id = self.parse_response_id(&mut message_json_object)?; - - let command = if let Some(id) = response_id { - let result = self.parse_response_result(message_json_object); - - WsIpcCommand::Response { id, result } - } else { - let (subscription, event) = self.parse_subscription_event(message_json_object)?; - - WsIpcCommand::Notification { - subscription, - event, - } - }; - - self.connection_tx - .send(command) - .chain_err(|| ErrorKind::ConnectionHandlerStopped) - } - - fn parse_message_object(&self, msg: ws::Message) -> Result<JsonMap> { - let parsed_json: JsonResult<JsonValue> = match msg { - ws::Message::Text(s) => serde_json::from_str(&s), - ws::Message::Binary(b) => serde_json::from_slice(&b), - }; - let json = parsed_json.chain_err(|| { - ErrorKind::InvalidJsonRpcResponse("Unable to deserialize ws message as JSON") - })?; - - let mut json_object_map = match json { - JsonValue::Object(object_map) => object_map, - _ => bail!(ErrorKind::InvalidJsonRpcResponse( - "Received response is not a JSON object" - )), - }; - - ensure!( - json_object_map.remove("jsonrpc") == Some(JsonValue::String("2.0".to_owned())), - ErrorKind::InvalidJsonRpcResponse("Invalid JSON-RPC version field in response") - ); - - Ok(json_object_map) - } - - fn parse_response_id(&self, json_object_map: &mut JsonMap) -> Result<Option<i64>> { - match json_object_map.remove("id") { - Some(JsonValue::Number(id)) => id.as_i64().map(Some).ok_or_else(|| { - ErrorKind::InvalidJsonRpcResponse("Invalid request ID number").into() - }), - Some(_) => Err(ErrorKind::InvalidJsonRpcResponse("Invalid request ID value").into()), - None => Ok(None), - } - } - - fn parse_response_result(&self, mut json_object_map: JsonMap) -> Result<JsonValue> { - let result = json_object_map.remove("result"); - let error = json_object_map.remove("error"); - - match (result, error) { - (Some(remote_result), None) => Ok(remote_result), - (None, Some(JsonValue::String(remote_error))) => { - Err(ErrorKind::ErrorResponse(remote_error).into()) - } - (None, Some(json_value)) => { - Err(ErrorKind::ErrorResponse(json_value.to_string()).into()) - } - (None, None) => Err(ErrorKind::InvalidJsonRpcResponse("Missing RPC result").into()), - (Some(_), Some(_)) => Err(ErrorKind::InvalidJsonRpcResponse( - "Response is ambiguous, contains both a successful result and an error", - ).into()), - } - } - - fn parse_subscription_event( - &mut self, - mut notification: JsonMap, - ) -> Result<(SubscriptionId, JsonValue)> { - match notification.remove("params") { - Some(JsonValue::Object(mut parameters)) => { - let raw_id = parameters.remove("subscription").ok_or_else(|| { - ErrorKind::InvalidSubscriptionEvent("Missing subscription ID") - })?; - let id = SubscriptionId::parse_value(&raw_id).ok_or_else(|| { - ErrorKind::InvalidSubscriptionEvent("Invalid subscription ID") - })?; - let event = parameters - .remove("result") - .ok_or_else(|| ErrorKind::InvalidSubscriptionEvent("Missing event data"))?; - - Ok((id, event)) - } - Some(_) => bail!(ErrorKind::InvalidSubscriptionEvent( - "RPC parameters is not a JSON object map" - )), - None => bail!(ErrorKind::InvalidSubscriptionEvent( - "Missing RPC parameters" - )), - } - } -} - -impl ws::Handler for Handler { - fn on_message(&mut self, msg: ws::Message) -> ws::Result<()> { - if let Err(error) = self.process_message(msg) { - let chained_error = error.chain_err(|| "Failed to process RPC message"); - error!("{}", chained_error.display_chain()); - } - - Ok(()) - } - - fn on_error(&mut self, error: ws::Error) { - let error = Error::with_chain(error, ErrorKind::WebSocketError); - - let _ = self.connection_tx.send(WsIpcCommand::Error(error)); - } -} - -pub struct WsIpcClient { - connection_tx: mpsc::Sender<WsIpcCommand>, -} - -impl WsIpcClient { - pub fn connect(server_id: &::IpcServerId) -> Result<Self> { - let url = Url::parse(&server_id) - .chain_err(|| ErrorKind::InvalidServerIdUrl(server_id.to_owned()))?; - let (connection_tx, connection_rx) = mpsc::channel(); - let sender = Self::open_websocket(url, connection_tx.clone())?; - - WsIpcClientConnection::spawn(sender, connection_rx); - - Ok(WsIpcClient { connection_tx }) - } - - fn open_websocket(url: Url, connection_tx: mpsc::Sender<WsIpcCommand>) -> Result<ws::Sender> { - let (sender_tx, sender_rx) = mpsc::channel(); - let factory = Factory { - connection_tx, - sender_tx, - }; - - let mut websocket = ws::WebSocket::new(factory) - .chain_err(|| ErrorKind::ConnectError("Unable to create WebSocket"))?; - - websocket - .connect(url) - .chain_err(|| ErrorKind::ConnectError("Unable to connect WebSocket to URL"))?; - - thread::spawn(move || { - let result = websocket - .run() - .chain_err(|| ErrorKind::ConnectError("Error while running WebSocket event loop")); - - if let Err(error) = result { - error!("{}", error.display_chain()); - } - }); - - sender_rx - .recv() - .chain_err(|| ErrorKind::ConnectError("WebSocket connection failed")) - } - - pub fn subscribe<V, M>( - &mut self, - subscribe_method: String, - unsubscribe_method: String, - sender: mpsc::Sender<M>, - ) -> Result<()> - where - V: for<'de> serde::Deserialize<'de>, - M: From<V> + Send + 'static, - { - let raw_subscription_id = self.call(&subscribe_method, &[] as &[u8; 0])?; - let subscription_id = SubscriptionId::parse_value(&raw_subscription_id) - .ok_or_else(|| ErrorKind::InvalidSubscriptionId(raw_subscription_id))?; - - let handler = move |json_value| match forward_subscription_event( - &subscribe_method, - json_value, - &sender, - ) { - Ok(()) => SubscriptionHandlerResult::Active, - Err(error) => { - error!("{}", error.display_chain()); - SubscriptionHandlerResult::Finished - } - }; - - self.register_subscription(subscription_id, handler, unsubscribe_method)?; - - Ok(()) - } - - fn register_subscription<H>( - &mut self, - id: SubscriptionId, - handler: H, - unsubscribe_method: String, - ) -> Result<()> - where - H: Fn(JsonValue) -> SubscriptionHandlerResult + Send + 'static, - { - self.connection_tx - .send(WsIpcCommand::Subscribe { - id, - handler: Box::new(handler), - unsubscribe_method, - }).chain_err(|| ErrorKind::ConnectionHandlerStopped) - } - - pub fn call<S, T, O>(&mut self, method: S, params: &T) -> Result<O> - where - S: ToString, - T: serde::Serialize, - O: for<'de> serde::Deserialize<'de>, - { - let arguments = - serde_json::to_value(params).chain_err(|| ErrorKind::SerializeArgumentsError)?; - let (response_tx, response_rx) = mpsc::channel(); - let command = WsIpcCommand::Call { - method: method.to_string(), - arguments, - response_tx, - }; - - self.connection_tx - .send(command) - .chain_err(|| ErrorKind::ConnectionHandlerStopped)?; - - let json_result = response_rx - .recv() - .chain_err(|| ErrorKind::MissingResponse)?; - - Ok(serde_json::from_value(json_result?) - .chain_err(|| ErrorKind::DeserializeResponseError)?) - } -} - -struct WsIpcClientConnection { - next_id: i64, - active_request: Option<ActiveRequest>, - active_subscriptions: HashMap<SubscriptionId, (SubscriptionHandler, String)>, - sender: ws::Sender, -} - -impl WsIpcClientConnection { - pub fn spawn(sender: ws::Sender, commands: mpsc::Receiver<WsIpcCommand>) { - let mut instance = WsIpcClientConnection { - next_id: 1, - active_request: None, - active_subscriptions: HashMap::new(), - sender, - }; - - thread::spawn(move || { - if let Err(error) = instance.run(commands) { - let chained_error = Error::with_chain(error, "WsIpcClient event loop error"); - error!("{}", chained_error.display_chain()); - } - }); - } - - fn run(&mut self, commands: mpsc::Receiver<WsIpcCommand>) -> Result<()> { - use self::WsIpcCommand::*; - - for command in commands { - match command { - Call { - method, - arguments, - response_tx, - } => self.call(method, arguments, response_tx)?, - Subscribe { - id, - handler, - unsubscribe_method, - } => { - self.active_subscriptions - .insert(id, (handler, unsubscribe_method)); - } - Response { id, result } => self.handle_response(id, result)?, - Notification { - subscription, - event, - } => self.handle_notification(subscription, event)?, - Error(error) => self.handle_error(error), - } - } - - Ok(()) - } - - fn call( - &mut self, - method: String, - arguments: JsonValue, - response_tx: mpsc::Sender<Result<JsonValue>>, - ) -> Result<()> { - let id = self.new_id(); - self.queue_request_response(id, response_tx); - self.send_request(id, method, arguments) - } - - fn new_id(&mut self) -> i64 { - let id = self.next_id; - self.next_id += 1; - id - } - - fn queue_request_response(&mut self, id: i64, response_tx: mpsc::Sender<Result<JsonValue>>) { - self.active_request = Some(ActiveRequest::new(id, response_tx)); - } - - fn send_request(&mut self, id: i64, method: String, arguments: JsonValue) -> Result<()> { - let json_request = self.build_json_request(id, &method, arguments); - - self.sender - .send(json_request.as_bytes()) - .chain_err(|| ErrorKind::SendRequestError(method)) - } - - fn build_json_request(&mut self, id: i64, method: &str, params: JsonValue) -> String { - let request_json = json!({ - "jsonrpc": "2.0", - "id": id, - "method": method, - "params": params, - }); - format!("{}", request_json) - } - - fn handle_response(&mut self, id: i64, result: Result<JsonValue>) -> Result<()> { - if let Some(mut request) = self.active_request.take() { - if request.id() == id { - request.send_response(result); - } else { - self.active_request = Some(request); - warn!("Received an unexpected response with ID {}", id); - } - } else { - warn!("Received an unexpected response with ID {}", id); - } - - Ok(()) - } - - fn handle_notification(&mut self, id: SubscriptionId, event: JsonValue) -> Result<()> { - let unsubscribe_method = - if let Some((handler, unsubscribe_method)) = self.active_subscriptions.get(&id) { - match handler(event) { - SubscriptionHandlerResult::Active => None, - SubscriptionHandlerResult::Finished => Some(unsubscribe_method.clone()), - } - } else { - warn!("Received an unexpected notification"); - None - }; - - if let Some(method) = unsubscribe_method { - self.unsubscribe(method, id)?; - } - - Ok(()) - } - - fn unsubscribe(&mut self, method: String, id: SubscriptionId) -> Result<()> { - self.active_subscriptions.remove(&id); - - let (result_tx, _) = mpsc::channel(); - let arguments = match id { - SubscriptionId::Number(id) => serde_json::to_value(&[id]), - SubscriptionId::String(id) => serde_json::to_value(&[id]), - }.chain_err(|| ErrorKind::SerializeSubscriptionId); - - self.call(method, arguments?, result_tx) - .chain_err(|| ErrorKind::UnsubscribeError) - } - - fn handle_error(&mut self, error: Error) { - if let Some(ref mut request) = self.active_request { - let _ = request.response_tx.send(Err(error)); - } else { - error!("{}", error.display_chain()); - } - } -} - -fn forward_subscription_event<V, M>( - subscribe_method: &String, - json_value: JsonValue, - sender: &mpsc::Sender<M>, -) -> Result<()> -where - V: for<'de> serde::Deserialize<'de>, - M: From<V> + Send + 'static, -{ - let value: V = serde_json::from_value(json_value) - .chain_err(|| ErrorKind::DeserializeSubscriptionEvent(subscribe_method.clone()))?; - let message = M::from(value); - - sender - .send(message) - .chain_err(|| ErrorKind::ForwardSubscriptionEvent(subscribe_method.clone())) -} diff --git a/talpid-ipc/src/lib.rs b/talpid-ipc/src/lib.rs index 4c6c737165..d775dfbcaa 100644 --- a/talpid-ipc/src/lib.rs +++ b/talpid-ipc/src/lib.rs @@ -18,20 +18,20 @@ extern crate serde; extern crate serde_json; extern crate jsonrpc_core; +extern crate jsonrpc_ipc_server; extern crate jsonrpc_pubsub; -extern crate jsonrpc_ws_server; +#[macro_use] +extern crate jsonrpc_client_core; +extern crate jsonrpc_client_ipc; +extern crate tokio_core; extern crate url; extern crate ws; use jsonrpc_core::{MetaIoHandler, Metadata}; -use jsonrpc_ws_server::{MetaExtractor, NoopExtractor, Server, ServerBuilder}; +use jsonrpc_ipc_server::{MetaExtractor, NoopExtractor, Server, ServerBuilder}; use std::fmt; -use std::net::{IpAddr, Ipv4Addr, SocketAddr}; - -mod client; -pub use client::*; /// An Id created by the Ipc server that the client can use to connect to it pub type IpcServerId = String; @@ -46,33 +46,36 @@ error_chain!{ pub struct IpcServer { - address: String, + path: String, server: Server, } impl IpcServer { - pub fn start<M: Metadata>(handler: MetaIoHandler<M>) -> Result<Self> { - Self::start_with_metadata(handler, NoopExtractor) + pub fn start<M: Metadata + Default>(handler: MetaIoHandler<M>, path: String) -> Result<Self> { + Self::start_with_metadata(handler, NoopExtractor, path) } - pub fn start_with_metadata<M, E>(handler: MetaIoHandler<M>, meta_extractor: E) -> Result<Self> + pub fn start_with_metadata<M, E>( + handler: MetaIoHandler<M>, + meta_extractor: E, + path: String, + ) -> Result<Self> where - M: Metadata, + M: Metadata + Default, E: MetaExtractor<M>, { - let listen_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0); ServerBuilder::new(handler) .session_meta_extractor(meta_extractor) - .start(&listen_addr) + .start(&path) .map(|server| IpcServer { - address: format!("ws://{}", server.addr()), + path: path.to_owned(), server: server, }).chain_err(|| ErrorKind::IpcServerError) } - /// Returns the localhost address this `IpcServer` is listening on. - pub fn address(&self) -> &str { - &self.address + /// Returns the uds/named pipe path this `IpcServer` is listening on. + pub fn path(&self) -> &str { + &self.path } /// Creates a handle bound to this `IpcServer` that can be used to shut it down. @@ -80,10 +83,10 @@ impl IpcServer { CloseHandle(self.server.close_handle()) } - /// Consumes the server and waits for it to finish. Get an `CloseHandle` before calling this + /// Consumes the server and waits for it to finish. Get a `CloseHandle` before calling this /// if you want to be able to shut the server down. - pub fn wait(self) -> Result<()> { - self.server.wait().chain_err(|| ErrorKind::IpcServerError) + pub fn wait(self) { + self.server.wait() } } @@ -92,14 +95,14 @@ impl IpcServer { impl fmt::Debug for IpcServer { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.debug_struct("IpcServer") - .field("address", &self.address) + .field("path", &self.path) .finish() } } #[derive(Clone)] -pub struct CloseHandle(jsonrpc_ws_server::CloseHandle); +pub struct CloseHandle(jsonrpc_ipc_server::CloseHandle); impl CloseHandle { pub fn close(self) { diff --git a/talpid-ipc/tests/ipc-client-server.rs b/talpid-ipc/tests/ipc-client-server.rs index 34b44beb68..b7ae7c2f2f 100644 --- a/talpid-ipc/tests/ipc-client-server.rs +++ b/talpid-ipc/tests/ipc-client-server.rs @@ -1,11 +1,22 @@ #[macro_use] extern crate assert_matches; extern crate env_logger; +extern crate jsonrpc_client_core; +extern crate jsonrpc_client_ipc; extern crate jsonrpc_core; #[macro_use] extern crate jsonrpc_macros; extern crate talpid_ipc; +extern crate tokio_core; +extern crate uuid; +extern crate futures; + +use futures::sync::oneshot; +use futures::Future; +use tokio_core::reactor::Core; + +use jsonrpc_client_core::Error as ClientError; use jsonrpc_core::{Error, IoHandler}; use std::sync::{mpsc, Mutex}; use std::time::Duration; @@ -35,13 +46,13 @@ fn can_call_rpcs_on_server() { env_logger::init(); let (server, rx) = create_server(); - let server_id = server.address().to_owned(); - let mut client = create_client(&server_id); + let server_path = server.path().to_owned(); + let mut client = create_client(server_path); - let _result: () = client.call("foo", &[97]).unwrap(); + let _result: () = client.call_method("foo", &[97]).wait().unwrap(); assert_eq!(Ok(97), rx.recv_timeout(Duration::from_millis(500))); - let result: Result<(), _> = client.call("invalid_method", &[0]); + let result: Result<(), ClientError> = client.call_method("invalid_method", &[0]).wait(); assert_matches!(result, Err(_)); server.close_handle().close(); } @@ -51,14 +62,7 @@ fn can_call_rpcs_on_server() { #[test] #[should_panic] fn ipc_client_invalid_url() { - create_client(&"INVALID ID".to_owned()); -} - -#[test] -fn ipc_client_bad_connection() { - let mut client = create_client(&"ws://127.0.0.1:9876".to_owned()); - let result: Result<(), _> = client.call("invalid_method", &[0]); - assert_matches!(result, Err(_)); + create_client("INVALID ID".to_owned()); } fn create_server() -> (talpid_ipc::IpcServer, mpsc::Receiver<i64>) { @@ -67,10 +71,30 @@ fn create_server() -> (talpid_ipc::IpcServer, mpsc::Receiver<i64>) { let mut io = IoHandler::new(); io.extend_with(rpc.to_delegate()); - let server = talpid_ipc::IpcServer::start(io.into()).unwrap(); + let uuid = uuid::Uuid::new_v4().to_string(); + let ipc_path = if cfg!(windows) { + format!(r"\\.\pipe\ipc-test-{}", uuid) + } else { + format!("/tmp/ipc-test-{}", uuid) + }; + let server = talpid_ipc::IpcServer::start(io.into(), ipc_path).unwrap(); (server, rx) } -fn create_client(id: &talpid_ipc::IpcServerId) -> talpid_ipc::WsIpcClient { - talpid_ipc::WsIpcClient::connect(id).unwrap() +fn create_client(ipc_path: String) -> jsonrpc_client_core::ClientHandle { + use std::thread; + let (tx, rx) = oneshot::channel(); + + thread::spawn(move || { + let mut core = Core::new().expect("failed to spawn reactor"); + let (client, client_handle) = + jsonrpc_client_ipc::IpcTransport::new(&ipc_path, &core.handle()) + .expect("failed to construct a transport") + .into_client(); + tx.send(client_handle); + core.run(client); + }); + + let handle = rx.wait().expect("Failed to construct a valid client"); + handle } diff --git a/talpid-openvpn-plugin/Cargo.toml b/talpid-openvpn-plugin/Cargo.toml index c5987d68ef..165a4976be 100644 --- a/talpid-openvpn-plugin/Cargo.toml +++ b/talpid-openvpn-plugin/Cargo.toml @@ -12,9 +12,14 @@ crate-type = ["cdylib"] error-chain = "0.12" log = "0.4" env_logger = "0.5" +jsonrpc-client-core = { git = "https://github.com/mullvad/jsonrpc-client-rs" } +jsonrpc-client-ipc = { git = "https://github.com/mullvad/jsonrpc-client-rs" } +tokio-core = "0.1" +futures = "0.1" openvpn-plugin = { version = "0.3", features = ["serde", "log"] } talpid-ipc = { path = "../talpid-ipc" } + [target.'cfg(windows)'.build-dependencies] windres = "0.2" diff --git a/talpid-openvpn-plugin/src/lib.rs b/talpid-openvpn-plugin/src/lib.rs index 9c6c690339..ed358a5df7 100644 --- a/talpid-openvpn-plugin/src/lib.rs +++ b/talpid-openvpn-plugin/src/lib.rs @@ -13,12 +13,19 @@ extern crate error_chain; extern crate log; #[macro_use] +extern crate jsonrpc_client_core; +extern crate futures; +extern crate jsonrpc_client_ipc; +#[macro_use] extern crate openvpn_plugin; -extern crate talpid_ipc; +extern crate tokio_core; +use error_chain::ChainedError; use openvpn_plugin::types::{EventResult, OpenVpnPluginEvent}; use std::collections::HashMap; use std::ffi::CString; +use std::sync::Mutex; + mod processing; use processing::EventProcessor; @@ -54,26 +61,28 @@ openvpn_plugin!( ::openvpn_open, ::openvpn_close, ::openvpn_event, - ::EventProcessor + ::Mutex<EventProcessor> ); pub struct Arguments { - server_id: talpid_ipc::IpcServerId, - credentials: String, + ipc_socket_path: String, } fn openvpn_open( args: Vec<CString>, _env: HashMap<CString, CString>, -) -> Result<(Vec<OpenVpnPluginEvent>, EventProcessor)> { +) -> Result<(Vec<OpenVpnPluginEvent>, Mutex<EventProcessor>)> { env_logger::init(); debug!("Initializing plugin"); let arguments = parse_args(&args)?; - info!("Connecting back to talpid core at {}", arguments.server_id); - let processor = EventProcessor::new(&arguments).chain_err(|| ErrorKind::InitHandleFailed)?; + info!( + "Connecting back to talpid core at {}", + arguments.ipc_socket_path + ); + let processor = EventProcessor::new(arguments).chain_err(|| ErrorKind::InitHandleFailed)?; - Ok((INTERESTING_EVENTS.to_vec(), processor)) + Ok((INTERESTING_EVENTS.to_vec(), Mutex::new(processor))) } fn parse_args(args: &[CString]) -> Result<Arguments> { @@ -82,37 +91,39 @@ fn parse_args(args: &[CString]) -> Result<Arguments> { .into_iter(); let _plugin_path = args_iter.next(); - let server_id: talpid_ipc::IpcServerId = args_iter + let ipc_socket_path: String = args_iter .next() .ok_or_else(|| ErrorKind::Msg("No core server id given as first argument".to_owned()))?; - let credentials = args_iter - .next() - .ok_or_else(|| ErrorKind::Msg("No IPC credentials given as second argument".to_owned()))?; - Ok(Arguments { - server_id, - credentials, - }) + Ok(Arguments { ipc_socket_path }) } -fn openvpn_close(_handle: EventProcessor) { - debug!("Unloading plugin"); +fn openvpn_close(_handle: Mutex<EventProcessor>) { + info!("Unloading plugin"); } fn openvpn_event( event: OpenVpnPluginEvent, _args: Vec<CString>, env: HashMap<CString, CString>, - handle: &mut EventProcessor, + handle: &mut Mutex<EventProcessor>, ) -> Result<EventResult> { debug!("Received event: {:?}", event); let parsed_env = openvpn_plugin::ffi::parse::env_utf8(&env).chain_err(|| ErrorKind::ParseEnvFailed)?; - handle + let result = handle + .lock() + .expect("failed to obtain mutex for EventProcessor") .process_event(event, parsed_env) - .chain_err(|| ErrorKind::EventProcessingFailed)?; - Ok(EventResult::Success) + .chain_err(|| ErrorKind::EventProcessingFailed); + match result { + Ok(()) => Ok(EventResult::Success), + Err(e) => { + error!("{}", e.display_chain()); + Ok(EventResult::Failure) + } + } } diff --git a/talpid-openvpn-plugin/src/processing.rs b/talpid-openvpn-plugin/src/processing.rs index 99533945ed..66d009b44f 100644 --- a/talpid-openvpn-plugin/src/processing.rs +++ b/talpid-openvpn-plugin/src/processing.rs @@ -1,42 +1,55 @@ use openvpn_plugin; use std::collections::HashMap; -use std::sync::Mutex; -use talpid_ipc::WsIpcClient; + +extern crate futures; + +use jsonrpc_client_core::{Future, Result as ClientResult, Transport}; +use jsonrpc_client_ipc::IpcTransport; +use tokio_core::reactor::Core; use super::Arguments; error_chain! { errors { - AuthDenied { - description("Failed to authenticate with Talpid IPC server") - } IpcSendingError { description("Failed while sending an event over the IPC channel") } + + Shutdown { + description("Connection is shut down") + } + } } /// Struct processing OpenVPN events and notifies listeners over IPC pub struct EventProcessor { - ipc_client: Mutex<WsIpcClient>, + ipc_client: EventProxy, + client_stop: ::std::sync::mpsc::Receiver<ClientResult<()>>, + core: Core, } impl EventProcessor { - pub fn new(arguments: &Arguments) -> Result<EventProcessor> { + pub fn new(arguments: Arguments) -> Result<EventProcessor> { trace!("Creating EventProcessor"); - let mut ipc_client = WsIpcClient::connect(&arguments.server_id) - .chain_err(|| "Unable to create IPC client")?; + let core = Core::new().chain_err(|| "Unable to initialize Tokio Core")?; + let handle = core.handle(); + let (client, client_handle) = IpcTransport::new(&arguments.ipc_socket_path, &handle) + .chain_err(|| "Unable to create IPC transport")? + .into_client(); - trace!("Authenticating EventProcessor"); - match ipc_client.call("authenticate", &[&arguments.credentials]) { - Ok(true) => trace!("Credentials accepted"), - Ok(false) => bail!(ErrorKind::AuthDenied), - Err(error) => bail!(Error::with_chain(error, ErrorKind::AuthDenied)), - } + let (tx, client_stop) = ::std::sync::mpsc::channel(); + + let client_future = client.then(move |result| tx.send(result)).map_err(|_| ()); + handle.spawn(client_future); + + let ipc_client = EventProxy::new(client_handle); Ok(EventProcessor { - ipc_client: Mutex::new(ipc_client), + ipc_client, + client_stop, + core, }) } @@ -46,11 +59,25 @@ impl EventProcessor { env: HashMap<String, String>, ) -> Result<()> { trace!("Processing \"{:?}\" event", event); - self.ipc_client - .lock() - .expect("a thread panicked while using the RPC client in the OpenVPN plugin") - .call("openvpn_event", &(event, env)) - .map(|_: Option<()>| ()) - .chain_err(|| ErrorKind::IpcSendingError) + let call_future = self + .ipc_client + .openvpn_event(event, env) + .map_err(|e| Error::with_chain(e, ErrorKind::IpcSendingError)); + self.core.run(call_future)?; + self.check_client_status() + } + + fn check_client_status(&mut self) -> Result<()> { + use std::sync::mpsc::TryRecvError::*; + match self.client_stop.try_recv() { + Err(Empty) => Ok(()), + Err(Disconnected) => Err(ErrorKind::Shutdown.into()), + Ok(Ok(_)) => Err(ErrorKind::Shutdown.into()), + Ok(Err(e)) => Err(Error::with_chain(e, ErrorKind::IpcSendingError)), + } } } + +jsonrpc_client!(pub struct EventProxy { + pub fn openvpn_event(&mut self, event: openvpn_plugin::types::OpenVpnPluginEvent, env: HashMap<String, String>) -> Future<()>; +}); |
