summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorLinus Färnstrand <linus@mullvad.net>2017-03-06 13:36:55 +0100
committerLinus Färnstrand <linus@mullvad.net>2017-03-06 13:36:55 +0100
commitc8f40a5088199ad18078418c503277dd24f6dfc8 (patch)
tree078e623cf008d02edcea63d9224a6851a419e0d8
parent6ba372e47e0bbc6fb8ecd449ebf10a4bef2658ec (diff)
parenta044c1b2d8ea5035d21c2e9228475298204fd6fa (diff)
downloadmullvadvpn-c8f40a5088199ad18078418c503277dd24f6dfc8.tar.xz
mullvadvpn-c8f40a5088199ad18078418c503277dd24f6dfc8.zip
Merge branch 'ipc-client'
-rw-r--r--talpid_ipc/src/lib.rs4
-rw-r--r--talpid_ipc/src/nop_ipc.rs11
-rw-r--r--talpid_ipc/src/zmq_ipc.rs33
-rw-r--r--talpid_ipc/tests/zmq_integration_tests.rs60
4 files changed, 62 insertions, 46 deletions
diff --git a/talpid_ipc/src/lib.rs b/talpid_ipc/src/lib.rs
index d9b8c53aed..324fb36f05 100644
--- a/talpid_ipc/src/lib.rs
+++ b/talpid_ipc/src/lib.rs
@@ -27,5 +27,9 @@ error_chain!{
CouldNotStartServer {
description("Failed to start the IPC server")
}
+
+ SendError {
+ description("Unable to send message")
+ }
}
}
diff --git a/talpid_ipc/src/nop_ipc.rs b/talpid_ipc/src/nop_ipc.rs
index 6db87f3b54..8faa58ea34 100644
--- a/talpid_ipc/src/nop_ipc.rs
+++ b/talpid_ipc/src/nop_ipc.rs
@@ -9,3 +9,14 @@ use super::{OnMessage, ErrorKind, Result, IpcServerId};
fn start_new_server(_on_message: Box<OnMessage<Vec<u8>>>) -> Result<IpcServerId> {
Err(ErrorKind::CouldNotStartServer.into())
}
+
+pub struct IpcClient;
+impl IpcClient {
+ pub fn new(server_id: IpcServerId) -> Self {
+ IpcClient
+ }
+
+ pub fn send(mut self, message: &[u8]) -> Result<()> {
+ Err(ErrorKind::SendError.into())
+ }
+}
diff --git a/talpid_ipc/src/zmq_ipc.rs b/talpid_ipc/src/zmq_ipc.rs
index 0cbb4bcfd0..b7e77a15c4 100644
--- a/talpid_ipc/src/zmq_ipc.rs
+++ b/talpid_ipc/src/zmq_ipc.rs
@@ -45,3 +45,36 @@ fn start_receive_loop(socket: zmq::Socket,
on_message(read_res);
})
}
+
+pub struct IpcClient {
+ server_address: IpcServerId,
+ socket: Option<zmq::Socket>,
+}
+impl IpcClient {
+ pub fn new(server_id: IpcServerId) -> Self {
+ IpcClient {
+ server_address: server_id,
+ socket: None,
+ }
+ }
+
+ pub fn send(&mut self, message: &[u8]) -> Result<()> {
+ if self.socket.is_none() {
+ self.connect().chain_err(|| ErrorKind::SendError)?;
+ }
+
+ let socket = self.socket.as_ref().unwrap();
+ socket.send(message, 0).chain_err(|| ErrorKind::SendError)
+ }
+
+ fn connect(&mut self) -> Result<()> {
+ let ctx = zmq::Context::new();
+ let socket = ctx.socket(zmq::PUSH)
+ .chain_err(|| "Could not create ZeroMQ PUSH socket".to_owned())?;
+ socket.connect(&self.server_address)
+ .chain_err(|| format!("Could not connect to {:?}", self.server_address))?;
+
+ self.socket = Some(socket);
+ Ok(())
+ }
+}
diff --git a/talpid_ipc/tests/zmq_integration_tests.rs b/talpid_ipc/tests/zmq_integration_tests.rs
index 2b976c877d..07469a8f50 100644
--- a/talpid_ipc/tests/zmq_integration_tests.rs
+++ b/talpid_ipc/tests/zmq_integration_tests.rs
@@ -9,64 +9,32 @@ mod zmq_integration_tests {
extern crate talpid_ipc;
extern crate zmq;
-
- use self::talpid_ipc::Result;
+ use self::talpid_ipc::{Result, IpcClient};
use std::sync::mpsc::{self, Receiver};
use std::time::Duration;
- const A_VALID_MESSAGE: [u8; 1] = [1];
-
#[test]
- fn can_connect_to_server_with_the_returned_id() {
- let connection_string = talpid_ipc::start_new_server(Box::new(|_| {}))
- .expect("Unable to start server");
+ fn can_connect_and_send_and_receive_messages() {
+ let (connection_string, new_messages_rx) = start_server();
- let connection_res = connect_to_server(&connection_string);
- assert!(connection_res.is_ok(),
- "Unable to connect to the server with the given connection string");
- }
-
- #[test]
- fn publishes_incoming_messages_to_channel() {
- let new_messages_rx = connect_and_send(&A_VALID_MESSAGE);
+ let mut ipc_client = IpcClient::new(connection_string);
+ ipc_client.send(&[1, 3, 3, 7]).expect("Could not send message");
let message = new_messages_rx.recv_timeout(Duration::from_millis(1000))
.expect("Did not receive a message");
- assert_matches!(message, Ok(TestMessage::Hello));
- }
-
- fn connect_and_send(message: &[u8]) -> Receiver<Result<TestMessage>> {
- let (tx, rx) = mpsc::channel();
-
- let connection_string = talpid_ipc::start_new_server(Box::new(move |message| {
- let _ = tx.send(message.and_then(parse_to_test_enum));
- })).expect("Could not start the server");
- let socket = connect_to_server(&connection_string)
- .expect("Could not connect to the server");
- socket.send(message, 0).expect("Could not send message");
-
- rx
+ assert_eq!(message.unwrap(),
+ &[1, 3, 3, 7],
+ "Read data does not match sent data");
}
- fn connect_to_server(connection_string: &str) -> zmq::Result<zmq::Socket> {
- let ctx = zmq::Context::new();
-
- let socket = ctx.socket(zmq::PUSH)?;
- socket.connect(connection_string)?;
- Ok(socket)
- }
+ fn start_server() -> (String, Receiver<Result<Vec<u8>>>) {
+ let (tx, rx) = mpsc::channel();
- fn parse_to_test_enum(message_as_bytes: Vec<u8>) -> Result<TestMessage> {
- if message_as_bytes == A_VALID_MESSAGE {
- Ok(TestMessage::Hello)
- } else {
- Err(format!("Invalid message: {:?}", message_as_bytes).into())
- }
- }
+ let connection_string =
+ talpid_ipc::start_new_server(Box::new(move |message| { let _ = tx.send(message); }))
+ .expect("Could not start the server");
- #[derive(Debug, PartialEq)]
- pub enum TestMessage {
- Hello,
+ (connection_string, rx)
}
}