diff options
| -rw-r--r-- | talpid_ipc/Cargo.toml | 3 | ||||
| -rw-r--r-- | talpid_ipc/src/lib.rs | 10 | ||||
| -rw-r--r-- | talpid_ipc/src/nop_ipc.rs | 34 | ||||
| -rw-r--r-- | talpid_ipc/src/zmq_ipc.rs | 113 | ||||
| -rw-r--r-- | talpid_ipc/tests/zmq_integration_tests.rs | 37 |
5 files changed, 0 insertions, 197 deletions
diff --git a/talpid_ipc/Cargo.toml b/talpid_ipc/Cargo.toml index f7ccfef19f..d1a2bb13e2 100644 --- a/talpid_ipc/Cargo.toml +++ b/talpid_ipc/Cargo.toml @@ -14,9 +14,6 @@ jsonrpc-ws-server = { git = "https://github.com/faern/jsonrpc", branch = "bind-z ws = { git = "https://github.com/tomusdrw/ws-rs" } url = "1.4" -[target.'cfg(not(windows))'.dependencies] -zmq = "0.8" - [dev-dependencies] assert_matches = "1.0" env_logger = "0.4" diff --git a/talpid_ipc/src/lib.rs b/talpid_ipc/src/lib.rs index f0f50e5f2b..f6d4acea19 100644 --- a/talpid_ipc/src/lib.rs +++ b/talpid_ipc/src/lib.rs @@ -18,16 +18,6 @@ use jsonrpc_ws_server::{MetaExtractor, NoopExtractor, Server, ServerBuilder}; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; -#[cfg(windows)] -#[path = "nop_ipc.rs"] -mod ipc_impl; - -#[cfg(not(windows))] -#[path = "zmq_ipc.rs"] -mod ipc_impl; - -pub use self::ipc_impl::*; - mod client; pub use client::*; diff --git a/talpid_ipc/src/nop_ipc.rs b/talpid_ipc/src/nop_ipc.rs deleted file mode 100644 index 8c7a24bc63..0000000000 --- a/talpid_ipc/src/nop_ipc.rs +++ /dev/null @@ -1,34 +0,0 @@ -use super::{ErrorKind, IpcServerId, Result}; - -use serde; - -/// This implementation only exists because we cannot get ZeroMQ to work on -/// Windows. This is not a valid IPC implementation and us using -/// it on Windows will result in a non-functioning client. -/// -/// We plan on trying with ZMQ again in the future. -/// Erik, 2017-02-09 -pub fn start_new_server<T, F>(_on_message: F) -> Result<IpcServerId> - where for<'de> T: serde::Deserialize<'de> + 'static, - F: FnMut(Result<T>) + Send + 'static -{ - bail!(ErrorKind::CouldNotStartServer); -} - -pub struct IpcClient<T> - where T: serde::Serialize -{ - _phantom: ::std::marker::PhantomData<T>, -} - -impl<T> IpcClient<T> - where T: serde::Serialize -{ - pub fn new(_server_id: IpcServerId) -> Self { - IpcClient { _phantom: ::std::marker::PhantomData } - } - - pub fn send(&mut self, _message: &T) -> Result<()> { - bail!(ErrorKind::SendError); - } -} diff --git a/talpid_ipc/src/zmq_ipc.rs b/talpid_ipc/src/zmq_ipc.rs deleted file mode 100644 index 35fe5e59a9..0000000000 --- a/talpid_ipc/src/zmq_ipc.rs +++ /dev/null @@ -1,113 +0,0 @@ -extern crate zmq; -extern crate serde_json; - -use super::{ErrorKind, IpcServerId, Result, ResultExt}; - -use serde; - -use std::thread; - -/// Starts the server end of an IPC channel. The returned `IpcServerId` is the unique identifier -/// allowing an `IpcClient` to connect to this server instance. Returns an error if unable to set -/// up the server. -/// -/// Incoming messages are sent as `Ok` results to the `on_message` callback. IO errors will be sent -/// as `Err` results to the `on_message` callback. -/// -/// This function is non-blocking and thus spawns a thread where it listens to messages. -pub fn start_new_server<T, F>(on_message: F) -> Result<IpcServerId> - where for<'de> T: serde::Deserialize<'de> + 'static, - F: FnMut(Result<T>) + Send + 'static -{ - for port in 5000..5010 { - let connection_string = format!("tcp://127.0.0.1:{}", port); - if let Ok(socket) = start_zmq_server(&connection_string) { - let _ = start_receive_loop(socket, on_message); - debug!("Listening on {}", connection_string); - return Ok(connection_string); - } - } - bail!(ErrorKind::CouldNotStartServer); -} - -fn start_zmq_server(connection_string: &str) -> zmq::Result<zmq::Socket> { - let ctx = zmq::Context::new(); - - let socket = ctx.socket(zmq::PULL)?; - socket.bind(connection_string)?; - - Ok(socket) -} - -fn start_receive_loop<T, F>(socket: zmq::Socket, mut on_message: F) -> thread::JoinHandle<()> - where for<'de> T: serde::Deserialize<'de> + 'static, - F: FnMut(Result<T>) + Send + 'static -{ - thread::spawn( - move || loop { - let read_res = socket - .recv_bytes(0) - .chain_err(|| ErrorKind::ReadFailure) - .and_then(|a| parse_message(&a)); - on_message(read_res); - }, - ) -} - -fn parse_message<'a, T>(message: &'a [u8]) -> Result<T> - where T: serde::Deserialize<'a> + 'static -{ - serde_json::from_slice(message).chain_err(|| ErrorKind::ParseFailure) -} - - -pub struct IpcClient<T> - where T: serde::Serialize -{ - server_id: IpcServerId, - socket: Option<zmq::Socket>, - _phantom: ::std::marker::PhantomData<T>, -} - -impl<T> IpcClient<T> - where T: serde::Serialize -{ - pub fn new(server_id: IpcServerId) -> Self { - IpcClient { - server_id: server_id, - socket: None, - _phantom: ::std::marker::PhantomData, - } - } - - pub fn send(&mut self, message: &T) -> Result<()> { - let bytes = Self::serialize(message)?; - self.send_bytes(bytes.as_slice()) - } - - fn serialize(t: &T) -> Result<Vec<u8>> { - serde_json::to_vec(t).chain_err(|| ErrorKind::ParseFailure) - } - - fn send_bytes(&mut self, message: &[u8]) -> Result<()> { - if self.socket.is_none() { - self.connect().chain_err(|| ErrorKind::SendError)?; - } - - let socket = self.socket.as_ref().unwrap(); - socket.send(message, 0).chain_err(|| ErrorKind::SendError) - } - - fn connect(&mut self) -> Result<()> { - debug!("Trying to establish connection to {}", self.server_id); - let ctx = zmq::Context::new(); - let socket = ctx.socket(zmq::PUSH) - .chain_err(|| "Could not create ZeroMQ PUSH socket".to_owned())?; - socket - .connect(&self.server_id) - .chain_err(|| format!("Could not connect to {:?}", self.server_id))?; - - self.socket = Some(socket); - Ok(()) - } -} diff --git a/talpid_ipc/tests/zmq_integration_tests.rs b/talpid_ipc/tests/zmq_integration_tests.rs deleted file mode 100644 index db9650f7a0..0000000000 --- a/talpid_ipc/tests/zmq_integration_tests.rs +++ /dev/null @@ -1,37 +0,0 @@ -#[cfg(all(test, not(windows)))] -mod zmq_integration_tests { - extern crate serde; - extern crate talpid_ipc; - - use self::talpid_ipc::{IpcClient, IpcServerId, Result}; - - use std::sync::mpsc::{self, Receiver}; - use std::time::Duration; - - #[test] - fn can_connect_and_send_and_receive_messages() { - let (connection_string, new_messages_rx) = start_server::<String>(); - - let mut ipc_client = IpcClient::new(connection_string); - let msg = "Hello".to_owned(); - ipc_client.send(&msg).expect("Could not send message"); - - let message = new_messages_rx - .recv_timeout(Duration::from_millis(1000)) - .expect("Did not receive a message"); - - assert_eq!(message.unwrap(), "Hello", "Got wrong message"); - } - - fn start_server<T>() -> (IpcServerId, Receiver<Result<T>>) - where for<'de> T: serde::Deserialize<'de> + Send + 'static - { - let (tx, rx) = mpsc::channel(); - - let callback = move |message: Result<T>| { let _ = tx.send(message); }; - let connection_string = - talpid_ipc::start_new_server(callback).expect("Could not start the server"); - - (connection_string, rx) - } -} |
