summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--talpid_ipc/src/ipc/mod.rs43
-rw-r--r--talpid_ipc/src/ipc/nop_ipc.rs17
-rw-r--r--talpid_ipc/src/ipc/zmq_ipc.rs156
-rw-r--r--talpid_ipc/src/lib.rs34
-rw-r--r--talpid_ipc/src/nop_ipc.rs16
-rw-r--r--talpid_ipc/src/zmq_ipc.rs74
-rw-r--r--talpid_ipc/tests/zmq_integration_tests.rs97
7 files changed, 215 insertions, 222 deletions
diff --git a/talpid_ipc/src/ipc/mod.rs b/talpid_ipc/src/ipc/mod.rs
deleted file mode 100644
index 93130ee4ac..0000000000
--- a/talpid_ipc/src/ipc/mod.rs
+++ /dev/null
@@ -1,43 +0,0 @@
-#[cfg(windows)]
-#[path = "nop_ipc.rs"]
-mod ipc_impl;
-
-#[cfg(not(windows))]
-#[path = "zmq_ipc.rs"]
-mod ipc_impl;
-
-pub use self::ipc_impl::*;
-
-/// The type signature for functions accepting messages from the server.
-/// If the server fails in delivering the message for any reason it will
-/// put the cause in the Err part of the `Result`.
-pub type OnMessage<MessageType> = FnMut(Result<MessageType>) + Send + 'static;
-
-/// The server end of our Inter-Process Communcation implementation.
-pub trait IpcServer {
- type MessageType;
-
- /// Starts listening to incoming IPC connections on the specified port.
- /// Messages are sent to the `on_message` callback. If anything went wrong
- /// when reading or parsing the message, the message will be an `Err`.
- /// NOTE that this does not apply to errors regarding whether the server
- /// could start or not, those are returned directly by this function.
- ///
- /// This function is non-blocking and thus spawns a thread where it
- /// listens to messages.
- fn start(self, on_message: Box<OnMessage<Self::MessageType>>) -> Result<()>;
-}
-
-error_chain!{
- errors {
- ReadFailure {
- description("Could not read IPC message")
- }
- CouldNotStartServer {
- description("Failed to start the IPC server")
- }
- InvalidMessage(message: Vec<u8>) {
- description("The IPC server got a message it did not know how to handle")
- }
- }
-}
diff --git a/talpid_ipc/src/ipc/nop_ipc.rs b/talpid_ipc/src/ipc/nop_ipc.rs
deleted file mode 100644
index 56e3eb8799..0000000000
--- a/talpid_ipc/src/ipc/nop_ipc.rs
+++ /dev/null
@@ -1,17 +0,0 @@
-/// This file only exists because we cannot get ZeroMQ to work on
-/// Windows. This is not a valid IPC implementation and us using
-/// it on Windows will result in a non-functioning client.
-///
-/// We plan on trying with ZMQ again in the future.
-/// Erik, 2017-02-09
-
-use ipc::{IpcServer, OnMessage, ErrorKind, Result};
-
-pub struct NopIpcServer;
-impl IpcServer for NopIpcServer {
- type MessageType = String;
-
- fn start(self, _on_message: Box<OnMessage<Self::MessageType>>) -> Result<()> {
- Err(ErrorKind::CouldNotStartServer.into())
- }
-}
diff --git a/talpid_ipc/src/ipc/zmq_ipc.rs b/talpid_ipc/src/ipc/zmq_ipc.rs
deleted file mode 100644
index 733f42ca42..0000000000
--- a/talpid_ipc/src/ipc/zmq_ipc.rs
+++ /dev/null
@@ -1,156 +0,0 @@
-extern crate zmq;
-
-use ipc::{IpcServer, OnMessage, ErrorKind, Result, ResultExt};
-use std::thread;
-
-/// The signature of functions that can be used to parse the incoming data
-/// This is very very similar to `TryFrom` on purpose because I wanted `TryFrom`,
-/// but I couldn't get it to work.
-type MessageParser<MessageType> = Fn(Vec<u8>) -> Result<MessageType> + Send + 'static;
-
-/// Implements the `IpcServer` trait using a ZeroMQ PULL socket
-/// The IPC server can parse any kind of message, so when you create
-/// the `ZmqIpcServer` you supply it with a `parser` that converts
-/// the read bytes into the type you want.
-pub struct ZmqIpcServer<T>
- where T: 'static
-{
- parser: Box<MessageParser<T>>,
- port: u16,
-}
-
-impl<T> IpcServer for ZmqIpcServer<T>
- where T: 'static
-{
- type MessageType = T;
-
- fn start(self, on_message: Box<OnMessage<T>>) -> Result<()> {
- let socket =
- Self::start_zmq_server(self.port).chain_err(|| ErrorKind::CouldNotStartServer)?;
- let _ = Self::start_receive_loop(socket, on_message, self.parser);
- Ok(())
- }
-}
-
-impl<T> ZmqIpcServer<T>
- where T: 'static
-{
- fn start_zmq_server(port: u16) -> zmq::Result<zmq::Socket> {
- let ctx = zmq::Context::new();
-
- let socket = ctx.socket(zmq::PULL)?;
- let connection_string = format!("tcp://127.0.0.1:{}", port);
- socket.bind(&connection_string)?;
-
- Ok(socket)
- }
-
- fn start_receive_loop(socket: zmq::Socket,
- mut on_message: Box<OnMessage<T>>,
- parser: Box<MessageParser<T>>)
- -> thread::JoinHandle<()> {
-
- thread::spawn(move || loop {
- let read_res = Self::read(&socket, &parser);
- on_message(read_res)
- })
- }
-
- fn read(socket: &zmq::Socket, parser: &Box<MessageParser<T>>) -> Result<T> {
- let bytes = socket.recv_bytes(0).chain_err(|| ErrorKind::ReadFailure)?;
- parser(bytes)
- }
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
- use ipc::{IpcServer, Result, ErrorKind};
- use std::result;
- use std::sync::mpsc::{self, Receiver};
- use std::time::Duration;
- extern crate zmq;
-
- const A_VALID_MESSAGE: u8 = 1;
- const AN_INVALID_MESSAGE: u8 = 2;
-
- #[test]
- fn gives_error_when_unable_to_start() {
- let port = 1340;
-
- let ipc_server1 = ZmqIpcServer {
- port: port,
- parser: Box::new(parse_to_test_enum),
- };
- let ipc_server2 = ZmqIpcServer {
- port: port,
- parser: Box::new(parse_to_test_enum),
- };
-
- ipc_server1.start(Box::new(|_| {}))
- .expect("Unable to start the first server. Results inconclusive");
- let start_res = ipc_server2.start(Box::new(|_| {}));
-
- assert!(start_res.is_err());
- let err = start_res.unwrap_err();
- assert_matches!(err.kind(), &ErrorKind::CouldNotStartServer);
- assert!(err.iter().count() > 1)
- }
-
- #[test]
- fn publishes_incoming_messages_to_channel() {
- let new_messages_rx = connect_and_send(1337, A_VALID_MESSAGE);
-
- let message = new_messages_rx.recv_timeout(Duration::from_millis(1000))
- .expect("Did not receive a message");
- assert_matches!(message, Ok(TestMessage::HELLO));
- }
-
- #[test]
- fn does_not_publish_unknown_messages() {
- let rx = connect_and_send(1338, AN_INVALID_MESSAGE);
-
- let message = rx.recv_timeout(Duration::from_millis(1000))
- .expect("Did not receive message");
-
- assert_matches!(message.unwrap_err().kind(), &ErrorKind::InvalidMessage(_));
- }
-
- fn connect_and_send(port: u16, message: u8) -> Receiver<Result<TestMessage>> {
- let (tx, rx) = mpsc::channel();
-
- let ipc_server = ZmqIpcServer {
- port: port,
- parser: Box::new(parse_to_test_enum),
- };
- ipc_server.start(Box::new(move |message| { let _ = tx.send(message); }))
- .expect("Could not start the server");
-
- let socket = connect_to_server(port).expect("Could not connect to the server");
- socket.send(&[message], 0).expect("Could not send message");
-
- rx
- }
-
- fn connect_to_server(port: u16) -> result::Result<zmq::Socket, zmq::Error> {
- let ctx = zmq::Context::new();
-
- let socket = ctx.socket(zmq::PUSH)?;
- let connection_string: String = format!("tcp://127.0.0.1:{}", port);
- try!(socket.connect(connection_string.as_str()));
- Ok(socket)
- }
-
- fn parse_to_test_enum(message_as_bytes: Vec<u8>) -> Result<TestMessage> {
- if message_as_bytes[0] == A_VALID_MESSAGE {
- Ok(TestMessage::HELLO)
- } else {
- Err(ErrorKind::InvalidMessage(message_as_bytes).into())
- }
- }
-
- #[derive(Debug, PartialEq)]
- pub enum TestMessage {
- HELLO,
- }
-}
diff --git a/talpid_ipc/src/lib.rs b/talpid_ipc/src/lib.rs
index 1cc3ec497c..75ace1d87e 100644
--- a/talpid_ipc/src/lib.rs
+++ b/talpid_ipc/src/lib.rs
@@ -1,9 +1,31 @@
-#[cfg(test)]
-#[macro_use]
-extern crate assert_matches;
-
#[macro_use]
extern crate error_chain;
-mod ipc;
-pub use ipc::*;
+#[cfg(windows)]
+#[path = "nop_ipc.rs"]
+mod ipc_impl;
+
+#[cfg(not(windows))]
+#[path = "zmq_ipc.rs"]
+mod ipc_impl;
+
+pub use self::ipc_impl::*;
+
+/// The type signature for functions accepting messages from the server.
+/// If the server fails in delivering the message for any reason it will
+/// put the cause in the Err part of the `Result`.
+pub type OnMessage<MessageType> = FnMut(Result<MessageType>) + Send + 'static;
+
+error_chain!{
+ errors {
+ ReadFailure {
+ description("Could not read IPC message")
+ }
+ CouldNotStartServer {
+ description("Failed to start the IPC server")
+ }
+ InvalidMessage(message: Vec<u8>) {
+ description("The IPC server got a message it did not know how to handle")
+ }
+ }
+}
diff --git a/talpid_ipc/src/nop_ipc.rs b/talpid_ipc/src/nop_ipc.rs
new file mode 100644
index 0000000000..6378dd3213
--- /dev/null
+++ b/talpid_ipc/src/nop_ipc.rs
@@ -0,0 +1,16 @@
+use super::{OnMessage, ErrorKind, Result};
+
+/// This implementation only exists because we cannot get ZeroMQ to work on
+/// Windows. This is not a valid IPC implementation and us using
+/// it on Windows will result in a non-functioning client.
+///
+/// We plan on trying with ZMQ again in the future.
+/// Erik, 2017-02-09
+pub struct Server;
+impl<T> Server<T>
+ where T: 'static
+{
+ fn start(self, _on_message: Box<OnMessage<T>>) -> Result<()> {
+ Err(ErrorKind::CouldNotStartServer.into())
+ }
+}
diff --git a/talpid_ipc/src/zmq_ipc.rs b/talpid_ipc/src/zmq_ipc.rs
new file mode 100644
index 0000000000..ff4c4372a3
--- /dev/null
+++ b/talpid_ipc/src/zmq_ipc.rs
@@ -0,0 +1,74 @@
+extern crate zmq;
+
+use super::{OnMessage, ErrorKind, Result, ResultExt};
+use std::thread;
+
+/// The signature of functions that can be used to parse the incoming data
+/// This is very very similar to `TryFrom` on purpose because I wanted `TryFrom`,
+/// but I couldn't get it to work.
+type MessageParser<MessageType> = Fn(Vec<u8>) -> Result<MessageType> + Send + 'static;
+
+type IpcServerId = String;
+
+/// The server end of our Inter-Process Communcation implementation.
+/// It can parse any kind of message, so when you create
+/// it you supply it with a `parser` that converts
+/// the read bytes into the type you want.
+pub struct Server<T>
+ where T: 'static
+{
+ parser: Box<MessageParser<T>>,
+ port: u16,
+}
+
+impl<T> Server<T>
+ where T: 'static
+{
+ pub fn new(port: u16, parser: Box<MessageParser<T>>) -> Self {
+ Server {
+ port: port,
+ parser: parser,
+ }
+ }
+
+ /// Starts listening to incoming IPC connections on the specified port.
+ /// Messages are sent to the `on_message` callback. If anything went wrong
+ /// when reading or parsing the message, the message will be an `Err`.
+ /// NOTE that this does not apply to errors regarding whether the server
+ /// could start or not, those are returned directly by this function.
+ ///
+ /// This function is non-blocking and thus spawns a thread where it
+ /// listens to messages.
+ pub fn start(self, on_message: Box<OnMessage<T>>) -> Result<IpcServerId> {
+ let socket =
+ Self::start_zmq_server(self.port).chain_err(|| ErrorKind::CouldNotStartServer)?;
+ let _ = Self::start_receive_loop(socket, on_message, self.parser);
+ Ok(format!("localhost:{}", self.port))
+ }
+
+ fn start_zmq_server(port: u16) -> zmq::Result<zmq::Socket> {
+ let ctx = zmq::Context::new();
+
+ let socket = ctx.socket(zmq::PULL)?;
+ let connection_string = format!("tcp://127.0.0.1:{}", port);
+ socket.bind(&connection_string)?;
+
+ Ok(socket)
+ }
+
+ fn start_receive_loop(socket: zmq::Socket,
+ mut on_message: Box<OnMessage<T>>,
+ parser: Box<MessageParser<T>>)
+ -> thread::JoinHandle<()> {
+
+ thread::spawn(move || loop {
+ let read_res = Self::read(&socket, &parser);
+ on_message(read_res)
+ })
+ }
+
+ fn read(socket: &zmq::Socket, parser: &Box<MessageParser<T>>) -> Result<T> {
+ let bytes = socket.recv_bytes(0).chain_err(|| ErrorKind::ReadFailure)?;
+ parser(bytes)
+ }
+}
diff --git a/talpid_ipc/tests/zmq_integration_tests.rs b/talpid_ipc/tests/zmq_integration_tests.rs
new file mode 100644
index 0000000000..6a6314c57e
--- /dev/null
+++ b/talpid_ipc/tests/zmq_integration_tests.rs
@@ -0,0 +1,97 @@
+extern crate talpid_ipc;
+extern crate zmq;
+
+#[macro_use]
+extern crate assert_matches;
+
+use std::result;
+use std::sync::mpsc::{self, Receiver};
+use std::time::Duration;
+use talpid_ipc::{ErrorKind, Result};
+
+const A_VALID_MESSAGE: u8 = 1;
+const AN_INVALID_MESSAGE: u8 = 2;
+
+#[test]
+fn returns_connection_string_when_started() {
+ let port = 1341;
+ let connection_string = talpid_ipc::Server::new(port, Box::new(|_| Ok(())))
+ .start(Box::new(|_| {}))
+ .expect("Unable to start server");
+
+ assert!(connection_string.contains("localhost"),
+ format!("'{}' did not contain 'localhost'", connection_string));
+ assert!(connection_string.contains(&port.to_string()),
+ format!("'{}' did not contain the port", connection_string));
+}
+
+#[test]
+fn gives_error_when_unable_to_start() {
+ let port = 1340;
+
+ let ipc_server1 = talpid_ipc::Server::new(port, Box::new(parse_to_test_enum));
+ let ipc_server2 = talpid_ipc::Server::new(port, Box::new(parse_to_test_enum));
+
+ ipc_server1.start(Box::new(|_| {}))
+ .expect("Unable to start the first server. Results inconclusive");
+ let start_res = ipc_server2.start(Box::new(|_| {}));
+
+ assert!(start_res.is_err());
+ let err = start_res.unwrap_err();
+ assert_matches!(err.kind(), &ErrorKind::CouldNotStartServer);
+ assert!(err.iter().count() > 1)
+}
+
+#[test]
+fn publishes_incoming_messages_to_channel() {
+ let new_messages_rx = connect_and_send(1337, A_VALID_MESSAGE);
+
+ let message = new_messages_rx.recv_timeout(Duration::from_millis(1000))
+ .expect("Did not receive a message");
+ assert_matches!(message, Ok(TestMessage::HELLO));
+}
+
+#[test]
+fn does_not_publish_unknown_messages() {
+ let rx = connect_and_send(1338, AN_INVALID_MESSAGE);
+
+ let message = rx.recv_timeout(Duration::from_millis(1000))
+ .expect("Did not receive message");
+
+ assert_matches!(message.unwrap_err().kind(), &ErrorKind::InvalidMessage(_));
+}
+
+fn connect_and_send(port: u16, message: u8) -> Receiver<Result<TestMessage>> {
+ let (tx, rx) = mpsc::channel();
+
+ let ipc_server = talpid_ipc::Server::new(port, Box::new(parse_to_test_enum));
+ ipc_server.start(Box::new(move |message| { let _ = tx.send(message); }))
+ .expect("Could not start the server");
+
+ let socket = connect_to_server(port).expect("Could not connect to the server");
+ socket.send(&[message], 0).expect("Could not send message");
+
+ rx
+}
+
+fn connect_to_server(port: u16) -> result::Result<zmq::Socket, zmq::Error> {
+ let ctx = zmq::Context::new();
+
+ let socket = ctx.socket(zmq::PUSH)?;
+ let connection_string: String = format!("tcp://127.0.0.1:{}", port);
+ try!(socket.connect(connection_string.as_str()));
+ Ok(socket)
+}
+
+fn parse_to_test_enum(message_as_bytes: Vec<u8>) -> Result<TestMessage> {
+ if message_as_bytes[0] == A_VALID_MESSAGE {
+ Ok(TestMessage::HELLO)
+ } else {
+ Err(ErrorKind::InvalidMessage(message_as_bytes).into())
+ }
+}
+
+#[derive(Debug, PartialEq)]
+pub enum TestMessage {
+ HELLO,
+}