summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--talpid_ipc/src/zmq_ipc.rs40
-rw-r--r--talpid_ipc/tests/zmq_integration_tests.rs13
2 files changed, 16 insertions, 37 deletions
diff --git a/talpid_ipc/src/zmq_ipc.rs b/talpid_ipc/src/zmq_ipc.rs
index ff4c4372a3..0afab082b2 100644
--- a/talpid_ipc/src/zmq_ipc.rs
+++ b/talpid_ipc/src/zmq_ipc.rs
@@ -3,46 +3,30 @@ extern crate zmq;
use super::{OnMessage, ErrorKind, Result, ResultExt};
use std::thread;
-/// The signature of functions that can be used to parse the incoming data
-/// This is very very similar to `TryFrom` on purpose because I wanted `TryFrom`,
-/// but I couldn't get it to work.
-type MessageParser<MessageType> = Fn(Vec<u8>) -> Result<MessageType> + Send + 'static;
-
type IpcServerId = String;
/// The server end of our Inter-Process Communcation implementation.
-/// It can parse any kind of message, so when you create
-/// it you supply it with a `parser` that converts
-/// the read bytes into the type you want.
-pub struct Server<T>
- where T: 'static
-{
- parser: Box<MessageParser<T>>,
+pub struct Server {
port: u16,
}
-impl<T> Server<T>
- where T: 'static
-{
- pub fn new(port: u16, parser: Box<MessageParser<T>>) -> Self {
- Server {
- port: port,
- parser: parser,
- }
+impl Server {
+ pub fn new(port: u16) -> Self {
+ Server { port: port }
}
/// Starts listening to incoming IPC connections on the specified port.
/// Messages are sent to the `on_message` callback. If anything went wrong
- /// when reading or parsing the message, the message will be an `Err`.
+ /// when reading the message, the message will be an `Err`.
/// NOTE that this does not apply to errors regarding whether the server
/// could start or not, those are returned directly by this function.
///
/// This function is non-blocking and thus spawns a thread where it
/// listens to messages.
- pub fn start(self, on_message: Box<OnMessage<T>>) -> Result<IpcServerId> {
+ pub fn start(self, on_message: Box<OnMessage<Vec<u8>>>) -> Result<IpcServerId> {
let socket =
Self::start_zmq_server(self.port).chain_err(|| ErrorKind::CouldNotStartServer)?;
- let _ = Self::start_receive_loop(socket, on_message, self.parser);
+ let _ = Self::start_receive_loop(socket, on_message);
Ok(format!("localhost:{}", self.port))
}
@@ -57,18 +41,12 @@ impl<T> Server<T>
}
fn start_receive_loop(socket: zmq::Socket,
- mut on_message: Box<OnMessage<T>>,
- parser: Box<MessageParser<T>>)
+ mut on_message: Box<OnMessage<Vec<u8>>>)
-> thread::JoinHandle<()> {
thread::spawn(move || loop {
- let read_res = Self::read(&socket, &parser);
+ let read_res = socket.recv_bytes(0).chain_err(|| ErrorKind::ReadFailure);
on_message(read_res)
})
}
-
- fn read(socket: &zmq::Socket, parser: &Box<MessageParser<T>>) -> Result<T> {
- let bytes = socket.recv_bytes(0).chain_err(|| ErrorKind::ReadFailure)?;
- parser(bytes)
- }
}
diff --git a/talpid_ipc/tests/zmq_integration_tests.rs b/talpid_ipc/tests/zmq_integration_tests.rs
index 6a6314c57e..a076c6fd7a 100644
--- a/talpid_ipc/tests/zmq_integration_tests.rs
+++ b/talpid_ipc/tests/zmq_integration_tests.rs
@@ -15,7 +15,7 @@ const AN_INVALID_MESSAGE: u8 = 2;
#[test]
fn returns_connection_string_when_started() {
let port = 1341;
- let connection_string = talpid_ipc::Server::new(port, Box::new(|_| Ok(())))
+ let connection_string = talpid_ipc::Server::new(port)
.start(Box::new(|_| {}))
.expect("Unable to start server");
@@ -29,8 +29,8 @@ fn returns_connection_string_when_started() {
fn gives_error_when_unable_to_start() {
let port = 1340;
- let ipc_server1 = talpid_ipc::Server::new(port, Box::new(parse_to_test_enum));
- let ipc_server2 = talpid_ipc::Server::new(port, Box::new(parse_to_test_enum));
+ let ipc_server1 = talpid_ipc::Server::new(port);
+ let ipc_server2 = talpid_ipc::Server::new(port);
ipc_server1.start(Box::new(|_| {}))
.expect("Unable to start the first server. Results inconclusive");
@@ -64,9 +64,10 @@ fn does_not_publish_unknown_messages() {
fn connect_and_send(port: u16, message: u8) -> Receiver<Result<TestMessage>> {
let (tx, rx) = mpsc::channel();
- let ipc_server = talpid_ipc::Server::new(port, Box::new(parse_to_test_enum));
- ipc_server.start(Box::new(move |message| { let _ = tx.send(message); }))
- .expect("Could not start the server");
+ let ipc_server = talpid_ipc::Server::new(port);
+ ipc_server.start(Box::new(move |message| {
+ let _ = tx.send(message.and_then(parse_to_test_enum));
+ })).expect("Could not start the server");
let socket = connect_to_server(port).expect("Could not connect to the server");
socket.send(&[message], 0).expect("Could not send message");