diff options
| author | Erik Larkö <erik@mullvad.net> | 2017-03-02 22:05:46 +0800 |
|---|---|---|
| committer | Linus Färnstrand <linus@mullvad.net> | 2017-03-06 13:22:36 +0100 |
| commit | e8c5b8c69b993d85235923a1b7d7cec4f76238bb (patch) | |
| tree | 12ec659c67ebc5f2891e0ce5c499bf586b7db8e1 | |
| parent | 6ba372e47e0bbc6fb8ecd449ebf10a4bef2658ec (diff) | |
| download | mullvadvpn-e8c5b8c69b993d85235923a1b7d7cec4f76238bb.tar.xz mullvadvpn-e8c5b8c69b993d85235923a1b7d7cec4f76238bb.zip | |
Add IpcClient
| -rw-r--r-- | talpid_ipc/src/lib.rs | 7 | ||||
| -rw-r--r-- | talpid_ipc/src/nop_ipc.rs | 11 | ||||
| -rw-r--r-- | talpid_ipc/src/zmq_ipc.rs | 33 | ||||
| -rw-r--r-- | talpid_ipc/tests/zmq_integration_tests.rs | 60 |
4 files changed, 65 insertions, 46 deletions
diff --git a/talpid_ipc/src/lib.rs b/talpid_ipc/src/lib.rs index d9b8c53aed..5987ca1c2a 100644 --- a/talpid_ipc/src/lib.rs +++ b/talpid_ipc/src/lib.rs @@ -27,5 +27,12 @@ error_chain!{ CouldNotStartServer { description("Failed to start the IPC server") } + + NotConnected { + description("The IPC client is not connected to the server") + } + SendError { + description("Unable to send message") + } } } diff --git a/talpid_ipc/src/nop_ipc.rs b/talpid_ipc/src/nop_ipc.rs index 6db87f3b54..c8c0bbcc2f 100644 --- a/talpid_ipc/src/nop_ipc.rs +++ b/talpid_ipc/src/nop_ipc.rs @@ -9,3 +9,14 @@ use super::{OnMessage, ErrorKind, Result, IpcServerId}; fn start_new_server(_on_message: Box<OnMessage<Vec<u8>>>) -> Result<IpcServerId> { Err(ErrorKind::CouldNotStartServer.into()) } + +pub struct IpcClient; +impl IpcClient { + pub fn new(server_id: IpcServerId) -> Self { + IpcClient; + } + + pub fn send(mut self, message: &[u8]) -> Result<()> { + Err(ErrorKind::SendError.into()) + } +} diff --git a/talpid_ipc/src/zmq_ipc.rs b/talpid_ipc/src/zmq_ipc.rs index 0cbb4bcfd0..0e8f18517f 100644 --- a/talpid_ipc/src/zmq_ipc.rs +++ b/talpid_ipc/src/zmq_ipc.rs @@ -45,3 +45,36 @@ fn start_receive_loop(socket: zmq::Socket, on_message(read_res); }) } + +pub struct IpcClient { + server_address: IpcServerId, + socket: Option<zmq::Socket>, +} +impl IpcClient { + pub fn new(server_id: IpcServerId) -> Self { + IpcClient { + server_address: server_id, + socket: None, + } + } + + pub fn send(mut self, message: &[u8]) -> Result<()> { + if self.socket.is_none() { + self.connect().chain_err(|| ErrorKind::SendError)?; + } + + let socket = self.socket.unwrap(); + socket.send(message, 0).chain_err(|| ErrorKind::SendError) + } + + fn connect(&mut self) -> Result<()> { + let ctx = zmq::Context::new(); + let socket = ctx.socket(zmq::PUSH) + .chain_err(|| format!("Could not connect to {:?}", self.server_address))?; + socket.connect(&self.server_address) + .chain_err(|| format!("Could not connect to {:?}", self.server_address))?; + + self.socket = Some(socket); + Ok(()) + } +} diff --git a/talpid_ipc/tests/zmq_integration_tests.rs b/talpid_ipc/tests/zmq_integration_tests.rs index 2b976c877d..c72549a837 100644 --- a/talpid_ipc/tests/zmq_integration_tests.rs +++ b/talpid_ipc/tests/zmq_integration_tests.rs @@ -9,64 +9,32 @@ mod zmq_integration_tests { extern crate talpid_ipc; extern crate zmq; - - use self::talpid_ipc::Result; + use self::talpid_ipc::{Result, IpcClient}; use std::sync::mpsc::{self, Receiver}; use std::time::Duration; - const A_VALID_MESSAGE: [u8; 1] = [1]; - #[test] - fn can_connect_to_server_with_the_returned_id() { - let connection_string = talpid_ipc::start_new_server(Box::new(|_| {})) - .expect("Unable to start server"); + fn can_connect_and_send_and_receive_messages() { + let (connection_string, new_messages_rx) = start_server(); - let connection_res = connect_to_server(&connection_string); - assert!(connection_res.is_ok(), - "Unable to connect to the server with the given connection string"); - } - - #[test] - fn publishes_incoming_messages_to_channel() { - let new_messages_rx = connect_and_send(&A_VALID_MESSAGE); + let ipc_client = IpcClient::new(connection_string); + ipc_client.send(&[1, 3, 3, 7]).expect("Could not send message"); let message = new_messages_rx.recv_timeout(Duration::from_millis(1000)) .expect("Did not receive a message"); - assert_matches!(message, Ok(TestMessage::Hello)); - } - - fn connect_and_send(message: &[u8]) -> Receiver<Result<TestMessage>> { - let (tx, rx) = mpsc::channel(); - - let connection_string = talpid_ipc::start_new_server(Box::new(move |message| { - let _ = tx.send(message.and_then(parse_to_test_enum)); - })).expect("Could not start the server"); - let socket = connect_to_server(&connection_string) - .expect("Could not connect to the server"); - socket.send(message, 0).expect("Could not send message"); - - rx + let a = vec![1, 3, 3, 7]; + assert!(message.is_ok(), "Got error"); + assert_eq!(message.unwrap(), a, "Got wrong message"); } - fn connect_to_server(connection_string: &str) -> zmq::Result<zmq::Socket> { - let ctx = zmq::Context::new(); - - let socket = ctx.socket(zmq::PUSH)?; - socket.connect(connection_string)?; - Ok(socket) - } + fn start_server() -> (String, Receiver<Result<Vec<u8>>>) { + let (tx, rx) = mpsc::channel(); - fn parse_to_test_enum(message_as_bytes: Vec<u8>) -> Result<TestMessage> { - if message_as_bytes == A_VALID_MESSAGE { - Ok(TestMessage::Hello) - } else { - Err(format!("Invalid message: {:?}", message_as_bytes).into()) - } - } + let connection_string = + talpid_ipc::start_new_server(Box::new(move |message| { let _ = tx.send(message); })) + .expect("Could not start the server"); - #[derive(Debug, PartialEq)] - pub enum TestMessage { - Hello, + (connection_string, rx) } } |
