diff options
| author | Linus Färnstrand <linus@mullvad.net> | 2017-05-18 17:04:09 +0200 |
|---|---|---|
| committer | Linus Färnstrand <linus@mullvad.net> | 2017-05-23 08:53:41 +0200 |
| commit | 078b706f309253c9cd97b2689c5505d198acc52d (patch) | |
| tree | dfaca1099212016b993dd29c799c5e5ec2ac1357 | |
| parent | ce6f18887987675762df78a6499bb4d8f3c56618 (diff) | |
| download | mullvadvpn-078b706f309253c9cd97b2689c5505d198acc52d.tar.xz mullvadvpn-078b706f309253c9cd97b2689c5505d198acc52d.zip | |
Add initial WsIpcClient
| -rw-r--r-- | talpid_ipc/Cargo.toml | 3 | ||||
| -rw-r--r-- | talpid_ipc/src/client.rs | 132 | ||||
| -rw-r--r-- | talpid_ipc/src/lib.rs | 6 |
3 files changed, 141 insertions, 0 deletions
diff --git a/talpid_ipc/Cargo.toml b/talpid_ipc/Cargo.toml index a0b24de4a3..d9b0b2dd77 100644 --- a/talpid_ipc/Cargo.toml +++ b/talpid_ipc/Cargo.toml @@ -11,9 +11,12 @@ serde_json = "1.0" log = "0.3" jsonrpc-core = { git = "https://github.com/faern/jsonrpc", branch = "bind-zero" } jsonrpc-ws-server = { git = "https://github.com/faern/jsonrpc", branch = "bind-zero" } +ws = { git = "https://github.com/faern/ws-rs", branch = "arbitrary-handshake-responses" } +url = "1.4" [target.'cfg(not(windows))'.dependencies] zmq = "0.8" [dev-dependencies] assert_matches = "1.0" +env_logger = "0.4" diff --git a/talpid_ipc/src/client.rs b/talpid_ipc/src/client.rs new file mode 100644 index 0000000000..1776d46273 --- /dev/null +++ b/talpid_ipc/src/client.rs @@ -0,0 +1,132 @@ +use serde; +use serde_json; +use std::sync::mpsc; +use url; +use ws; + +mod errors { + error_chain!{} +} +pub use self::errors::*; + + +struct Factory { + request: String, + result_tx: mpsc::Sender<Result<()>>, +} + +impl ws::Factory for Factory { + type Handler = Handler; + + fn connection_made(&mut self, sender: ws::Sender) -> Self::Handler { + debug!("Sending: {}", self.request); + if let Err(e) = + sender.send(&self.request[..]).chain_err(|| "Unable to send jsonrpc request") { + self.result_tx.send(Err(e)).unwrap(); + } + Handler { + sender, + result_tx: self.result_tx.clone(), + } + } +} + + +struct Handler { + sender: ws::Sender, + result_tx: mpsc::Sender<Result<()>>, +} + +impl Handler { + fn validate_reply(&self, msg: ws::Message) -> ws::Result<()> { + let json: serde_json::Value = match msg { + ws::Message::Text(s) => serde_json::from_str(&s), + ws::Message::Binary(b) => serde_json::from_slice(&b), + } + .map_err(|e| ws::Error::from(Box::new(e)))?; + debug!("JSON response: {}", json); + // TODO(linus): Properly validate reply + Ok(()) + } +} + +impl ws::Handler for Handler { + fn on_message(&mut self, msg: ws::Message) -> ws::Result<()> { + self.validate_reply(msg)?; + let close_result = self.sender.close(ws::CloseCode::Normal); + if let Err(e) = close_result.chain_err(|| "Unable to close WebSocket") { + self.result_tx.send(Err(e)).unwrap(); + } + self.result_tx.send(Ok(())).unwrap(); + Ok(()) + } +} + + +pub struct WsIpcClient { + url: url::Url, + next_id: i64, +} + +impl WsIpcClient { + pub fn new(server_id: ::IpcServerId) -> Result<Self> { + let url = url::Url::parse(&server_id).chain_err(|| "Unable to parse server_id as url")?; + Ok(WsIpcClient { url, next_id: 1 }) + } + + pub fn call<T>(&mut self, method: &str, params: &T) -> Result<()> + where T: serde::Serialize + { + let (result_tx, result_rx) = mpsc::channel(); + let factory = Factory { + request: self.get_json(method, params), + result_tx: result_tx, + }; + let mut ws = ws::WebSocket::new(factory).chain_err(|| "Unable to create WebSocket")?; + ws.connect(self.url.clone()).chain_err(|| "Unable to connect WebSocket to url")?; + ws.run().chain_err(|| "Error while running WebSocket event loop")?; + + match result_rx.try_recv() { + Ok(result) => result, + Err(_) => bail!("Internal error, no WebSocket status"), + } + } + + fn get_json<T>(&mut self, method: &str, params: &T) -> String + where T: serde::Serialize + { + let request_json = json!({ + "jsonrpc": "2.0", + "id": self.get_id(), + "method": method, + "params": params, + }); + format!("{}", request_json) + } + + fn get_id(&mut self) -> i64 { + let id = self.next_id; + self.next_id += 1; + id + } +} + + +#[cfg(test)] +mod tests { + extern crate env_logger; + use super::*; + + // TODO(linus): This is not a test. Just an ugly way to quickly test the client implementation + #[test] + #[ignore] + fn ws_ipc_client_tester() { + env_logger::init().unwrap(); + + let mut ws = WsIpcClient::new("ws://127.0.0.1:INSERT_PORT".to_owned()).unwrap(); + let event = serde_json::Value::String("Up".to_owned()); + let env = serde_json::Value::Object(serde_json::Map::new()); + let params = serde_json::Value::Array(vec![event, env]); + println!("CALL RESULT: {:?}", ws.call("openvpn_event", ¶ms)); + } +} diff --git a/talpid_ipc/src/lib.rs b/talpid_ipc/src/lib.rs index 8452a8db92..f0f50e5f2b 100644 --- a/talpid_ipc/src/lib.rs +++ b/talpid_ipc/src/lib.rs @@ -4,9 +4,13 @@ extern crate error_chain; extern crate log; extern crate serde; +#[macro_use] +extern crate serde_json; extern crate jsonrpc_core; extern crate jsonrpc_ws_server; +extern crate ws; +extern crate url; use jsonrpc_core::{MetaIoHandler, Metadata}; use jsonrpc_ws_server::{MetaExtractor, NoopExtractor, Server, ServerBuilder}; @@ -24,6 +28,8 @@ mod ipc_impl; pub use self::ipc_impl::*; +mod client; +pub use client::*; /// An Id created by the Ipc server that the client can use to connect to it pub type IpcServerId = String; |
