summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorLinus Färnstrand <linus@mullvad.net>2017-05-18 17:04:09 +0200
committerLinus Färnstrand <linus@mullvad.net>2017-05-23 08:53:41 +0200
commit078b706f309253c9cd97b2689c5505d198acc52d (patch)
treedfaca1099212016b993dd29c799c5e5ec2ac1357
parentce6f18887987675762df78a6499bb4d8f3c56618 (diff)
downloadmullvadvpn-078b706f309253c9cd97b2689c5505d198acc52d.tar.xz
mullvadvpn-078b706f309253c9cd97b2689c5505d198acc52d.zip
Add initial WsIpcClient
-rw-r--r--talpid_ipc/Cargo.toml3
-rw-r--r--talpid_ipc/src/client.rs132
-rw-r--r--talpid_ipc/src/lib.rs6
3 files changed, 141 insertions, 0 deletions
diff --git a/talpid_ipc/Cargo.toml b/talpid_ipc/Cargo.toml
index a0b24de4a3..d9b0b2dd77 100644
--- a/talpid_ipc/Cargo.toml
+++ b/talpid_ipc/Cargo.toml
@@ -11,9 +11,12 @@ serde_json = "1.0"
log = "0.3"
jsonrpc-core = { git = "https://github.com/faern/jsonrpc", branch = "bind-zero" }
jsonrpc-ws-server = { git = "https://github.com/faern/jsonrpc", branch = "bind-zero" }
+ws = { git = "https://github.com/faern/ws-rs", branch = "arbitrary-handshake-responses" }
+url = "1.4"
[target.'cfg(not(windows))'.dependencies]
zmq = "0.8"
[dev-dependencies]
assert_matches = "1.0"
+env_logger = "0.4"
diff --git a/talpid_ipc/src/client.rs b/talpid_ipc/src/client.rs
new file mode 100644
index 0000000000..1776d46273
--- /dev/null
+++ b/talpid_ipc/src/client.rs
@@ -0,0 +1,132 @@
+use serde;
+use serde_json;
+use std::sync::mpsc;
+use url;
+use ws;
+
+mod errors {
+ error_chain!{}
+}
+pub use self::errors::*;
+
+
+struct Factory {
+ request: String,
+ result_tx: mpsc::Sender<Result<()>>,
+}
+
+impl ws::Factory for Factory {
+ type Handler = Handler;
+
+ fn connection_made(&mut self, sender: ws::Sender) -> Self::Handler {
+ debug!("Sending: {}", self.request);
+ if let Err(e) =
+ sender.send(&self.request[..]).chain_err(|| "Unable to send jsonrpc request") {
+ self.result_tx.send(Err(e)).unwrap();
+ }
+ Handler {
+ sender,
+ result_tx: self.result_tx.clone(),
+ }
+ }
+}
+
+
+struct Handler {
+ sender: ws::Sender,
+ result_tx: mpsc::Sender<Result<()>>,
+}
+
+impl Handler {
+ fn validate_reply(&self, msg: ws::Message) -> ws::Result<()> {
+ let json: serde_json::Value = match msg {
+ ws::Message::Text(s) => serde_json::from_str(&s),
+ ws::Message::Binary(b) => serde_json::from_slice(&b),
+ }
+ .map_err(|e| ws::Error::from(Box::new(e)))?;
+ debug!("JSON response: {}", json);
+ // TODO(linus): Properly validate reply
+ Ok(())
+ }
+}
+
+impl ws::Handler for Handler {
+ fn on_message(&mut self, msg: ws::Message) -> ws::Result<()> {
+ self.validate_reply(msg)?;
+ let close_result = self.sender.close(ws::CloseCode::Normal);
+ if let Err(e) = close_result.chain_err(|| "Unable to close WebSocket") {
+ self.result_tx.send(Err(e)).unwrap();
+ }
+ self.result_tx.send(Ok(())).unwrap();
+ Ok(())
+ }
+}
+
+
+pub struct WsIpcClient {
+ url: url::Url,
+ next_id: i64,
+}
+
+impl WsIpcClient {
+ pub fn new(server_id: ::IpcServerId) -> Result<Self> {
+ let url = url::Url::parse(&server_id).chain_err(|| "Unable to parse server_id as url")?;
+ Ok(WsIpcClient { url, next_id: 1 })
+ }
+
+ pub fn call<T>(&mut self, method: &str, params: &T) -> Result<()>
+ where T: serde::Serialize
+ {
+ let (result_tx, result_rx) = mpsc::channel();
+ let factory = Factory {
+ request: self.get_json(method, params),
+ result_tx: result_tx,
+ };
+ let mut ws = ws::WebSocket::new(factory).chain_err(|| "Unable to create WebSocket")?;
+ ws.connect(self.url.clone()).chain_err(|| "Unable to connect WebSocket to url")?;
+ ws.run().chain_err(|| "Error while running WebSocket event loop")?;
+
+ match result_rx.try_recv() {
+ Ok(result) => result,
+ Err(_) => bail!("Internal error, no WebSocket status"),
+ }
+ }
+
+ fn get_json<T>(&mut self, method: &str, params: &T) -> String
+ where T: serde::Serialize
+ {
+ let request_json = json!({
+ "jsonrpc": "2.0",
+ "id": self.get_id(),
+ "method": method,
+ "params": params,
+ });
+ format!("{}", request_json)
+ }
+
+ fn get_id(&mut self) -> i64 {
+ let id = self.next_id;
+ self.next_id += 1;
+ id
+ }
+}
+
+
+#[cfg(test)]
+mod tests {
+ extern crate env_logger;
+ use super::*;
+
+ // TODO(linus): This is not a test. Just an ugly way to quickly test the client implementation
+ #[test]
+ #[ignore]
+ fn ws_ipc_client_tester() {
+ env_logger::init().unwrap();
+
+ let mut ws = WsIpcClient::new("ws://127.0.0.1:INSERT_PORT".to_owned()).unwrap();
+ let event = serde_json::Value::String("Up".to_owned());
+ let env = serde_json::Value::Object(serde_json::Map::new());
+ let params = serde_json::Value::Array(vec![event, env]);
+ println!("CALL RESULT: {:?}", ws.call("openvpn_event", &params));
+ }
+}
diff --git a/talpid_ipc/src/lib.rs b/talpid_ipc/src/lib.rs
index 8452a8db92..f0f50e5f2b 100644
--- a/talpid_ipc/src/lib.rs
+++ b/talpid_ipc/src/lib.rs
@@ -4,9 +4,13 @@ extern crate error_chain;
extern crate log;
extern crate serde;
+#[macro_use]
+extern crate serde_json;
extern crate jsonrpc_core;
extern crate jsonrpc_ws_server;
+extern crate ws;
+extern crate url;
use jsonrpc_core::{MetaIoHandler, Metadata};
use jsonrpc_ws_server::{MetaExtractor, NoopExtractor, Server, ServerBuilder};
@@ -24,6 +28,8 @@ mod ipc_impl;
pub use self::ipc_impl::*;
+mod client;
+pub use client::*;
/// An Id created by the Ipc server that the client can use to connect to it
pub type IpcServerId = String;