diff options
| author | Linus Färnstrand <linus@mullvad.net> | 2017-03-06 13:36:55 +0100 |
|---|---|---|
| committer | Linus Färnstrand <linus@mullvad.net> | 2017-03-06 13:36:55 +0100 |
| commit | c8f40a5088199ad18078418c503277dd24f6dfc8 (patch) | |
| tree | 078e623cf008d02edcea63d9224a6851a419e0d8 | |
| parent | 6ba372e47e0bbc6fb8ecd449ebf10a4bef2658ec (diff) | |
| parent | a044c1b2d8ea5035d21c2e9228475298204fd6fa (diff) | |
| download | mullvadvpn-c8f40a5088199ad18078418c503277dd24f6dfc8.tar.xz mullvadvpn-c8f40a5088199ad18078418c503277dd24f6dfc8.zip | |
Merge branch 'ipc-client'
| -rw-r--r-- | talpid_ipc/src/lib.rs | 4 | ||||
| -rw-r--r-- | talpid_ipc/src/nop_ipc.rs | 11 | ||||
| -rw-r--r-- | talpid_ipc/src/zmq_ipc.rs | 33 | ||||
| -rw-r--r-- | talpid_ipc/tests/zmq_integration_tests.rs | 60 |
4 files changed, 62 insertions, 46 deletions
diff --git a/talpid_ipc/src/lib.rs b/talpid_ipc/src/lib.rs index d9b8c53aed..324fb36f05 100644 --- a/talpid_ipc/src/lib.rs +++ b/talpid_ipc/src/lib.rs @@ -27,5 +27,9 @@ error_chain!{ CouldNotStartServer { description("Failed to start the IPC server") } + + SendError { + description("Unable to send message") + } } } diff --git a/talpid_ipc/src/nop_ipc.rs b/talpid_ipc/src/nop_ipc.rs index 6db87f3b54..8faa58ea34 100644 --- a/talpid_ipc/src/nop_ipc.rs +++ b/talpid_ipc/src/nop_ipc.rs @@ -9,3 +9,14 @@ use super::{OnMessage, ErrorKind, Result, IpcServerId}; fn start_new_server(_on_message: Box<OnMessage<Vec<u8>>>) -> Result<IpcServerId> { Err(ErrorKind::CouldNotStartServer.into()) } + +pub struct IpcClient; +impl IpcClient { + pub fn new(server_id: IpcServerId) -> Self { + IpcClient + } + + pub fn send(mut self, message: &[u8]) -> Result<()> { + Err(ErrorKind::SendError.into()) + } +} diff --git a/talpid_ipc/src/zmq_ipc.rs b/talpid_ipc/src/zmq_ipc.rs index 0cbb4bcfd0..b7e77a15c4 100644 --- a/talpid_ipc/src/zmq_ipc.rs +++ b/talpid_ipc/src/zmq_ipc.rs @@ -45,3 +45,36 @@ fn start_receive_loop(socket: zmq::Socket, on_message(read_res); }) } + +pub struct IpcClient { + server_address: IpcServerId, + socket: Option<zmq::Socket>, +} +impl IpcClient { + pub fn new(server_id: IpcServerId) -> Self { + IpcClient { + server_address: server_id, + socket: None, + } + } + + pub fn send(&mut self, message: &[u8]) -> Result<()> { + if self.socket.is_none() { + self.connect().chain_err(|| ErrorKind::SendError)?; + } + + let socket = self.socket.as_ref().unwrap(); + socket.send(message, 0).chain_err(|| ErrorKind::SendError) + } + + fn connect(&mut self) -> Result<()> { + let ctx = zmq::Context::new(); + let socket = ctx.socket(zmq::PUSH) + .chain_err(|| "Could not create ZeroMQ PUSH socket".to_owned())?; + socket.connect(&self.server_address) + .chain_err(|| format!("Could not connect to {:?}", self.server_address))?; + + self.socket = Some(socket); + Ok(()) + } +} diff --git a/talpid_ipc/tests/zmq_integration_tests.rs b/talpid_ipc/tests/zmq_integration_tests.rs index 2b976c877d..07469a8f50 100644 --- a/talpid_ipc/tests/zmq_integration_tests.rs +++ b/talpid_ipc/tests/zmq_integration_tests.rs @@ -9,64 +9,32 @@ mod zmq_integration_tests { extern crate talpid_ipc; extern crate zmq; - - use self::talpid_ipc::Result; + use self::talpid_ipc::{Result, IpcClient}; use std::sync::mpsc::{self, Receiver}; use std::time::Duration; - const A_VALID_MESSAGE: [u8; 1] = [1]; - #[test] - fn can_connect_to_server_with_the_returned_id() { - let connection_string = talpid_ipc::start_new_server(Box::new(|_| {})) - .expect("Unable to start server"); + fn can_connect_and_send_and_receive_messages() { + let (connection_string, new_messages_rx) = start_server(); - let connection_res = connect_to_server(&connection_string); - assert!(connection_res.is_ok(), - "Unable to connect to the server with the given connection string"); - } - - #[test] - fn publishes_incoming_messages_to_channel() { - let new_messages_rx = connect_and_send(&A_VALID_MESSAGE); + let mut ipc_client = IpcClient::new(connection_string); + ipc_client.send(&[1, 3, 3, 7]).expect("Could not send message"); let message = new_messages_rx.recv_timeout(Duration::from_millis(1000)) .expect("Did not receive a message"); - assert_matches!(message, Ok(TestMessage::Hello)); - } - - fn connect_and_send(message: &[u8]) -> Receiver<Result<TestMessage>> { - let (tx, rx) = mpsc::channel(); - - let connection_string = talpid_ipc::start_new_server(Box::new(move |message| { - let _ = tx.send(message.and_then(parse_to_test_enum)); - })).expect("Could not start the server"); - let socket = connect_to_server(&connection_string) - .expect("Could not connect to the server"); - socket.send(message, 0).expect("Could not send message"); - - rx + assert_eq!(message.unwrap(), + &[1, 3, 3, 7], + "Read data does not match sent data"); } - fn connect_to_server(connection_string: &str) -> zmq::Result<zmq::Socket> { - let ctx = zmq::Context::new(); - - let socket = ctx.socket(zmq::PUSH)?; - socket.connect(connection_string)?; - Ok(socket) - } + fn start_server() -> (String, Receiver<Result<Vec<u8>>>) { + let (tx, rx) = mpsc::channel(); - fn parse_to_test_enum(message_as_bytes: Vec<u8>) -> Result<TestMessage> { - if message_as_bytes == A_VALID_MESSAGE { - Ok(TestMessage::Hello) - } else { - Err(format!("Invalid message: {:?}", message_as_bytes).into()) - } - } + let connection_string = + talpid_ipc::start_new_server(Box::new(move |message| { let _ = tx.send(message); })) + .expect("Could not start the server"); - #[derive(Debug, PartialEq)] - pub enum TestMessage { - Hello, + (connection_string, rx) } } |
