diff options
| author | Linus Färnstrand <linus@mullvad.net> | 2017-03-06 14:11:37 +0100 |
|---|---|---|
| committer | Linus Färnstrand <linus@mullvad.net> | 2017-03-06 15:28:53 +0100 |
| commit | f5597b36237dce1b92b9b3c67b7adeb102906347 (patch) | |
| tree | aca7f0c049eac33895383dc65b3958e3ace51dc0 | |
| parent | 811a5f527d5eb90f93f02b543cca55273891b583 (diff) | |
| download | mullvadvpn-f5597b36237dce1b92b9b3c67b7adeb102906347.tar.xz mullvadvpn-f5597b36237dce1b92b9b3c67b7adeb102906347.zip | |
Add serde serialization to ipc
| -rw-r--r-- | talpid_ipc/src/lib.rs | 5 | ||||
| -rw-r--r-- | talpid_ipc/src/zmq_ipc.rs | 35 | ||||
| -rw-r--r-- | talpid_ipc/tests/zmq_integration_tests.rs | 26 |
3 files changed, 46 insertions, 20 deletions
diff --git a/talpid_ipc/src/lib.rs b/talpid_ipc/src/lib.rs index 895f5a56ff..8698adb55c 100644 --- a/talpid_ipc/src/lib.rs +++ b/talpid_ipc/src/lib.rs @@ -15,13 +15,16 @@ pub use self::ipc_impl::*; /// An Id created by the Ipc server that the client can use to connect to it -type IpcServerId = String; +pub type IpcServerId = String; error_chain!{ errors { ReadFailure { description("Could not read IPC message") } + ParseFailure { + description("Unable to serialize/deserialize message") + } CouldNotStartServer { description("Failed to start the IPC server") } diff --git a/talpid_ipc/src/zmq_ipc.rs b/talpid_ipc/src/zmq_ipc.rs index 1967839415..dc2ab5e0bc 100644 --- a/talpid_ipc/src/zmq_ipc.rs +++ b/talpid_ipc/src/zmq_ipc.rs @@ -1,4 +1,6 @@ extern crate zmq; +extern crate serde_json; + use super::{ErrorKind, Result, ResultExt, IpcServerId}; use serde; @@ -45,24 +47,49 @@ fn start_receive_loop<T, F>(socket: zmq::Socket, mut on_message: F) -> thread::J F: FnMut(Result<T>) + Send + 'static { thread::spawn(move || loop { - let read_res = socket.recv_bytes(0).chain_err(|| ErrorKind::ReadFailure); + let read_res = socket.recv_bytes(0) + .chain_err(|| ErrorKind::ReadFailure) + .and_then(|a| parse_message(&a)); on_message(read_res); }) } -pub struct IpcClient { +fn parse_message<T>(message: &[u8]) -> Result<T> + where T: serde::Deserialize + 'static +{ + serde_json::from_slice(message).chain_err(|| ErrorKind::ParseFailure) +} + + +pub struct IpcClient<T> + where T: serde::Serialize +{ server_address: IpcServerId, socket: Option<zmq::Socket>, + _phantom: ::std::marker::PhantomData<T>, } -impl IpcClient { + +impl<T> IpcClient<T> + where T: serde::Serialize +{ pub fn new(server_id: IpcServerId) -> Self { IpcClient { server_address: server_id, socket: None, + _phantom: ::std::marker::PhantomData, } } - pub fn send(&mut self, message: &[u8]) -> Result<()> { + pub fn send(&mut self, message: &T) -> Result<()> { + let bytes = Self::serialize(message)?; + self.send_bytes(bytes.as_slice()) + } + + fn serialize(t: &T) -> Result<Vec<u8>> { + serde_json::to_vec(t).chain_err(|| ErrorKind::ParseFailure) + } + + fn send_bytes(&mut self, message: &[u8]) -> Result<()> { if self.socket.is_none() { self.connect().chain_err(|| ErrorKind::SendError)?; } diff --git a/talpid_ipc/tests/zmq_integration_tests.rs b/talpid_ipc/tests/zmq_integration_tests.rs index 07469a8f50..498c3af820 100644 --- a/talpid_ipc/tests/zmq_integration_tests.rs +++ b/talpid_ipc/tests/zmq_integration_tests.rs @@ -1,38 +1,34 @@ -#[macro_use] -extern crate error_chain; - -#[macro_use] -extern crate assert_matches; - #[cfg(all(test, not(windows)))] mod zmq_integration_tests { + extern crate serde; extern crate talpid_ipc; - extern crate zmq; - use self::talpid_ipc::{Result, IpcClient}; + use self::talpid_ipc::{Result, IpcServerId, IpcClient}; + use std::sync::mpsc::{self, Receiver}; use std::time::Duration; #[test] fn can_connect_and_send_and_receive_messages() { - let (connection_string, new_messages_rx) = start_server(); + let (connection_string, new_messages_rx) = start_server::<String>(); let mut ipc_client = IpcClient::new(connection_string); - ipc_client.send(&[1, 3, 3, 7]).expect("Could not send message"); + let msg = "Hello".to_owned(); + ipc_client.send(&msg).expect("Could not send message"); let message = new_messages_rx.recv_timeout(Duration::from_millis(1000)) .expect("Did not receive a message"); - assert_eq!(message.unwrap(), - &[1, 3, 3, 7], - "Read data does not match sent data"); + assert_eq!(message.unwrap(), "Hello", "Got wrong message"); } - fn start_server() -> (String, Receiver<Result<Vec<u8>>>) { + fn start_server<T>() -> (IpcServerId, Receiver<Result<T>>) + where T: serde::Deserialize + Send + 'static + { let (tx, rx) = mpsc::channel(); let connection_string = - talpid_ipc::start_new_server(Box::new(move |message| { let _ = tx.send(message); })) + talpid_ipc::start_new_server(move |message: Result<T>| { let _ = tx.send(message); }) .expect("Could not start the server"); (connection_string, rx) |
