diff options
| -rw-r--r-- | talpid_ipc/src/ipc/mod.rs | 43 | ||||
| -rw-r--r-- | talpid_ipc/src/ipc/nop_ipc.rs | 17 | ||||
| -rw-r--r-- | talpid_ipc/src/ipc/zmq_ipc.rs | 156 | ||||
| -rw-r--r-- | talpid_ipc/src/lib.rs | 34 | ||||
| -rw-r--r-- | talpid_ipc/src/nop_ipc.rs | 16 | ||||
| -rw-r--r-- | talpid_ipc/src/zmq_ipc.rs | 74 | ||||
| -rw-r--r-- | talpid_ipc/tests/zmq_integration_tests.rs | 97 |
7 files changed, 215 insertions, 222 deletions
diff --git a/talpid_ipc/src/ipc/mod.rs b/talpid_ipc/src/ipc/mod.rs deleted file mode 100644 index 93130ee4ac..0000000000 --- a/talpid_ipc/src/ipc/mod.rs +++ /dev/null @@ -1,43 +0,0 @@ -#[cfg(windows)] -#[path = "nop_ipc.rs"] -mod ipc_impl; - -#[cfg(not(windows))] -#[path = "zmq_ipc.rs"] -mod ipc_impl; - -pub use self::ipc_impl::*; - -/// The type signature for functions accepting messages from the server. -/// If the server fails in delivering the message for any reason it will -/// put the cause in the Err part of the `Result`. -pub type OnMessage<MessageType> = FnMut(Result<MessageType>) + Send + 'static; - -/// The server end of our Inter-Process Communcation implementation. -pub trait IpcServer { - type MessageType; - - /// Starts listening to incoming IPC connections on the specified port. - /// Messages are sent to the `on_message` callback. If anything went wrong - /// when reading or parsing the message, the message will be an `Err`. - /// NOTE that this does not apply to errors regarding whether the server - /// could start or not, those are returned directly by this function. - /// - /// This function is non-blocking and thus spawns a thread where it - /// listens to messages. - fn start(self, on_message: Box<OnMessage<Self::MessageType>>) -> Result<()>; -} - -error_chain!{ - errors { - ReadFailure { - description("Could not read IPC message") - } - CouldNotStartServer { - description("Failed to start the IPC server") - } - InvalidMessage(message: Vec<u8>) { - description("The IPC server got a message it did not know how to handle") - } - } -} diff --git a/talpid_ipc/src/ipc/nop_ipc.rs b/talpid_ipc/src/ipc/nop_ipc.rs deleted file mode 100644 index 56e3eb8799..0000000000 --- a/talpid_ipc/src/ipc/nop_ipc.rs +++ /dev/null @@ -1,17 +0,0 @@ -/// This file only exists because we cannot get ZeroMQ to work on -/// Windows. This is not a valid IPC implementation and us using -/// it on Windows will result in a non-functioning client. -/// -/// We plan on trying with ZMQ again in the future. -/// Erik, 2017-02-09 - -use ipc::{IpcServer, OnMessage, ErrorKind, Result}; - -pub struct NopIpcServer; -impl IpcServer for NopIpcServer { - type MessageType = String; - - fn start(self, _on_message: Box<OnMessage<Self::MessageType>>) -> Result<()> { - Err(ErrorKind::CouldNotStartServer.into()) - } -} diff --git a/talpid_ipc/src/ipc/zmq_ipc.rs b/talpid_ipc/src/ipc/zmq_ipc.rs deleted file mode 100644 index 733f42ca42..0000000000 --- a/talpid_ipc/src/ipc/zmq_ipc.rs +++ /dev/null @@ -1,156 +0,0 @@ -extern crate zmq; - -use ipc::{IpcServer, OnMessage, ErrorKind, Result, ResultExt}; -use std::thread; - -/// The signature of functions that can be used to parse the incoming data -/// This is very very similar to `TryFrom` on purpose because I wanted `TryFrom`, -/// but I couldn't get it to work. -type MessageParser<MessageType> = Fn(Vec<u8>) -> Result<MessageType> + Send + 'static; - -/// Implements the `IpcServer` trait using a ZeroMQ PULL socket -/// The IPC server can parse any kind of message, so when you create -/// the `ZmqIpcServer` you supply it with a `parser` that converts -/// the read bytes into the type you want. -pub struct ZmqIpcServer<T> - where T: 'static -{ - parser: Box<MessageParser<T>>, - port: u16, -} - -impl<T> IpcServer for ZmqIpcServer<T> - where T: 'static -{ - type MessageType = T; - - fn start(self, on_message: Box<OnMessage<T>>) -> Result<()> { - let socket = - Self::start_zmq_server(self.port).chain_err(|| ErrorKind::CouldNotStartServer)?; - let _ = Self::start_receive_loop(socket, on_message, self.parser); - Ok(()) - } -} - -impl<T> ZmqIpcServer<T> - where T: 'static -{ - fn start_zmq_server(port: u16) -> zmq::Result<zmq::Socket> { - let ctx = zmq::Context::new(); - - let socket = ctx.socket(zmq::PULL)?; - let connection_string = format!("tcp://127.0.0.1:{}", port); - socket.bind(&connection_string)?; - - Ok(socket) - } - - fn start_receive_loop(socket: zmq::Socket, - mut on_message: Box<OnMessage<T>>, - parser: Box<MessageParser<T>>) - -> thread::JoinHandle<()> { - - thread::spawn(move || loop { - let read_res = Self::read(&socket, &parser); - on_message(read_res) - }) - } - - fn read(socket: &zmq::Socket, parser: &Box<MessageParser<T>>) -> Result<T> { - let bytes = socket.recv_bytes(0).chain_err(|| ErrorKind::ReadFailure)?; - parser(bytes) - } -} - -#[cfg(test)] -mod tests { - use super::*; - use ipc::{IpcServer, Result, ErrorKind}; - use std::result; - use std::sync::mpsc::{self, Receiver}; - use std::time::Duration; - extern crate zmq; - - const A_VALID_MESSAGE: u8 = 1; - const AN_INVALID_MESSAGE: u8 = 2; - - #[test] - fn gives_error_when_unable_to_start() { - let port = 1340; - - let ipc_server1 = ZmqIpcServer { - port: port, - parser: Box::new(parse_to_test_enum), - }; - let ipc_server2 = ZmqIpcServer { - port: port, - parser: Box::new(parse_to_test_enum), - }; - - ipc_server1.start(Box::new(|_| {})) - .expect("Unable to start the first server. Results inconclusive"); - let start_res = ipc_server2.start(Box::new(|_| {})); - - assert!(start_res.is_err()); - let err = start_res.unwrap_err(); - assert_matches!(err.kind(), &ErrorKind::CouldNotStartServer); - assert!(err.iter().count() > 1) - } - - #[test] - fn publishes_incoming_messages_to_channel() { - let new_messages_rx = connect_and_send(1337, A_VALID_MESSAGE); - - let message = new_messages_rx.recv_timeout(Duration::from_millis(1000)) - .expect("Did not receive a message"); - assert_matches!(message, Ok(TestMessage::HELLO)); - } - - #[test] - fn does_not_publish_unknown_messages() { - let rx = connect_and_send(1338, AN_INVALID_MESSAGE); - - let message = rx.recv_timeout(Duration::from_millis(1000)) - .expect("Did not receive message"); - - assert_matches!(message.unwrap_err().kind(), &ErrorKind::InvalidMessage(_)); - } - - fn connect_and_send(port: u16, message: u8) -> Receiver<Result<TestMessage>> { - let (tx, rx) = mpsc::channel(); - - let ipc_server = ZmqIpcServer { - port: port, - parser: Box::new(parse_to_test_enum), - }; - ipc_server.start(Box::new(move |message| { let _ = tx.send(message); })) - .expect("Could not start the server"); - - let socket = connect_to_server(port).expect("Could not connect to the server"); - socket.send(&[message], 0).expect("Could not send message"); - - rx - } - - fn connect_to_server(port: u16) -> result::Result<zmq::Socket, zmq::Error> { - let ctx = zmq::Context::new(); - - let socket = ctx.socket(zmq::PUSH)?; - let connection_string: String = format!("tcp://127.0.0.1:{}", port); - try!(socket.connect(connection_string.as_str())); - Ok(socket) - } - - fn parse_to_test_enum(message_as_bytes: Vec<u8>) -> Result<TestMessage> { - if message_as_bytes[0] == A_VALID_MESSAGE { - Ok(TestMessage::HELLO) - } else { - Err(ErrorKind::InvalidMessage(message_as_bytes).into()) - } - } - - #[derive(Debug, PartialEq)] - pub enum TestMessage { - HELLO, - } -} diff --git a/talpid_ipc/src/lib.rs b/talpid_ipc/src/lib.rs index 1cc3ec497c..75ace1d87e 100644 --- a/talpid_ipc/src/lib.rs +++ b/talpid_ipc/src/lib.rs @@ -1,9 +1,31 @@ -#[cfg(test)] -#[macro_use] -extern crate assert_matches; - #[macro_use] extern crate error_chain; -mod ipc; -pub use ipc::*; +#[cfg(windows)] +#[path = "nop_ipc.rs"] +mod ipc_impl; + +#[cfg(not(windows))] +#[path = "zmq_ipc.rs"] +mod ipc_impl; + +pub use self::ipc_impl::*; + +/// The type signature for functions accepting messages from the server. +/// If the server fails in delivering the message for any reason it will +/// put the cause in the Err part of the `Result`. +pub type OnMessage<MessageType> = FnMut(Result<MessageType>) + Send + 'static; + +error_chain!{ + errors { + ReadFailure { + description("Could not read IPC message") + } + CouldNotStartServer { + description("Failed to start the IPC server") + } + InvalidMessage(message: Vec<u8>) { + description("The IPC server got a message it did not know how to handle") + } + } +} diff --git a/talpid_ipc/src/nop_ipc.rs b/talpid_ipc/src/nop_ipc.rs new file mode 100644 index 0000000000..6378dd3213 --- /dev/null +++ b/talpid_ipc/src/nop_ipc.rs @@ -0,0 +1,16 @@ +use super::{OnMessage, ErrorKind, Result}; + +/// This implementation only exists because we cannot get ZeroMQ to work on +/// Windows. This is not a valid IPC implementation and us using +/// it on Windows will result in a non-functioning client. +/// +/// We plan on trying with ZMQ again in the future. +/// Erik, 2017-02-09 +pub struct Server; +impl<T> Server<T> + where T: 'static +{ + fn start(self, _on_message: Box<OnMessage<T>>) -> Result<()> { + Err(ErrorKind::CouldNotStartServer.into()) + } +} diff --git a/talpid_ipc/src/zmq_ipc.rs b/talpid_ipc/src/zmq_ipc.rs new file mode 100644 index 0000000000..ff4c4372a3 --- /dev/null +++ b/talpid_ipc/src/zmq_ipc.rs @@ -0,0 +1,74 @@ +extern crate zmq; + +use super::{OnMessage, ErrorKind, Result, ResultExt}; +use std::thread; + +/// The signature of functions that can be used to parse the incoming data +/// This is very very similar to `TryFrom` on purpose because I wanted `TryFrom`, +/// but I couldn't get it to work. +type MessageParser<MessageType> = Fn(Vec<u8>) -> Result<MessageType> + Send + 'static; + +type IpcServerId = String; + +/// The server end of our Inter-Process Communcation implementation. +/// It can parse any kind of message, so when you create +/// it you supply it with a `parser` that converts +/// the read bytes into the type you want. +pub struct Server<T> + where T: 'static +{ + parser: Box<MessageParser<T>>, + port: u16, +} + +impl<T> Server<T> + where T: 'static +{ + pub fn new(port: u16, parser: Box<MessageParser<T>>) -> Self { + Server { + port: port, + parser: parser, + } + } + + /// Starts listening to incoming IPC connections on the specified port. + /// Messages are sent to the `on_message` callback. If anything went wrong + /// when reading or parsing the message, the message will be an `Err`. + /// NOTE that this does not apply to errors regarding whether the server + /// could start or not, those are returned directly by this function. + /// + /// This function is non-blocking and thus spawns a thread where it + /// listens to messages. + pub fn start(self, on_message: Box<OnMessage<T>>) -> Result<IpcServerId> { + let socket = + Self::start_zmq_server(self.port).chain_err(|| ErrorKind::CouldNotStartServer)?; + let _ = Self::start_receive_loop(socket, on_message, self.parser); + Ok(format!("localhost:{}", self.port)) + } + + fn start_zmq_server(port: u16) -> zmq::Result<zmq::Socket> { + let ctx = zmq::Context::new(); + + let socket = ctx.socket(zmq::PULL)?; + let connection_string = format!("tcp://127.0.0.1:{}", port); + socket.bind(&connection_string)?; + + Ok(socket) + } + + fn start_receive_loop(socket: zmq::Socket, + mut on_message: Box<OnMessage<T>>, + parser: Box<MessageParser<T>>) + -> thread::JoinHandle<()> { + + thread::spawn(move || loop { + let read_res = Self::read(&socket, &parser); + on_message(read_res) + }) + } + + fn read(socket: &zmq::Socket, parser: &Box<MessageParser<T>>) -> Result<T> { + let bytes = socket.recv_bytes(0).chain_err(|| ErrorKind::ReadFailure)?; + parser(bytes) + } +} diff --git a/talpid_ipc/tests/zmq_integration_tests.rs b/talpid_ipc/tests/zmq_integration_tests.rs new file mode 100644 index 0000000000..6a6314c57e --- /dev/null +++ b/talpid_ipc/tests/zmq_integration_tests.rs @@ -0,0 +1,97 @@ +extern crate talpid_ipc; +extern crate zmq; + +#[macro_use] +extern crate assert_matches; + +use std::result; +use std::sync::mpsc::{self, Receiver}; +use std::time::Duration; +use talpid_ipc::{ErrorKind, Result}; + +const A_VALID_MESSAGE: u8 = 1; +const AN_INVALID_MESSAGE: u8 = 2; + +#[test] +fn returns_connection_string_when_started() { + let port = 1341; + let connection_string = talpid_ipc::Server::new(port, Box::new(|_| Ok(()))) + .start(Box::new(|_| {})) + .expect("Unable to start server"); + + assert!(connection_string.contains("localhost"), + format!("'{}' did not contain 'localhost'", connection_string)); + assert!(connection_string.contains(&port.to_string()), + format!("'{}' did not contain the port", connection_string)); +} + +#[test] +fn gives_error_when_unable_to_start() { + let port = 1340; + + let ipc_server1 = talpid_ipc::Server::new(port, Box::new(parse_to_test_enum)); + let ipc_server2 = talpid_ipc::Server::new(port, Box::new(parse_to_test_enum)); + + ipc_server1.start(Box::new(|_| {})) + .expect("Unable to start the first server. Results inconclusive"); + let start_res = ipc_server2.start(Box::new(|_| {})); + + assert!(start_res.is_err()); + let err = start_res.unwrap_err(); + assert_matches!(err.kind(), &ErrorKind::CouldNotStartServer); + assert!(err.iter().count() > 1) +} + +#[test] +fn publishes_incoming_messages_to_channel() { + let new_messages_rx = connect_and_send(1337, A_VALID_MESSAGE); + + let message = new_messages_rx.recv_timeout(Duration::from_millis(1000)) + .expect("Did not receive a message"); + assert_matches!(message, Ok(TestMessage::HELLO)); +} + +#[test] +fn does_not_publish_unknown_messages() { + let rx = connect_and_send(1338, AN_INVALID_MESSAGE); + + let message = rx.recv_timeout(Duration::from_millis(1000)) + .expect("Did not receive message"); + + assert_matches!(message.unwrap_err().kind(), &ErrorKind::InvalidMessage(_)); +} + +fn connect_and_send(port: u16, message: u8) -> Receiver<Result<TestMessage>> { + let (tx, rx) = mpsc::channel(); + + let ipc_server = talpid_ipc::Server::new(port, Box::new(parse_to_test_enum)); + ipc_server.start(Box::new(move |message| { let _ = tx.send(message); })) + .expect("Could not start the server"); + + let socket = connect_to_server(port).expect("Could not connect to the server"); + socket.send(&[message], 0).expect("Could not send message"); + + rx +} + +fn connect_to_server(port: u16) -> result::Result<zmq::Socket, zmq::Error> { + let ctx = zmq::Context::new(); + + let socket = ctx.socket(zmq::PUSH)?; + let connection_string: String = format!("tcp://127.0.0.1:{}", port); + try!(socket.connect(connection_string.as_str())); + Ok(socket) +} + +fn parse_to_test_enum(message_as_bytes: Vec<u8>) -> Result<TestMessage> { + if message_as_bytes[0] == A_VALID_MESSAGE { + Ok(TestMessage::HELLO) + } else { + Err(ErrorKind::InvalidMessage(message_as_bytes).into()) + } +} + +#[derive(Debug, PartialEq)] +pub enum TestMessage { + HELLO, +} |
