diff options
| author | Erik Larkö <erik@mullvad.net> | 2017-03-22 12:34:33 +0800 |
|---|---|---|
| committer | Erik Larkö <erik@mullvad.net> | 2017-03-22 14:24:48 +0800 |
| commit | 0b849ba02420a6c1d47b7f13fa7c7105e232d2f3 (patch) | |
| tree | 8c365b55a9d5b867af5e2f618e3d932329c265dc | |
| parent | d7cbed11b6569098b7ff569db17ed48f75e94827 (diff) | |
| download | mullvadvpn-0b849ba02420a6c1d47b7f13fa7c7105e232d2f3.tar.xz mullvadvpn-0b849ba02420a6c1d47b7f13fa7c7105e232d2f3.zip | |
Have the IPC API support responses
| -rw-r--r-- | talpid_ipc/src/http_ipc.rs | 82 | ||||
| -rw-r--r-- | talpid_ipc/src/lib.rs | 3 | ||||
| -rw-r--r-- | talpid_ipc/tests/http_integration_tests.rs | 8 |
3 files changed, 66 insertions, 27 deletions
diff --git a/talpid_ipc/src/http_ipc.rs b/talpid_ipc/src/http_ipc.rs index 2ca438d666..ce06c2e356 100644 --- a/talpid_ipc/src/http_ipc.rs +++ b/talpid_ipc/src/http_ipc.rs @@ -4,11 +4,13 @@ extern crate hyper; use super::{ErrorKind, Result, ResultExt, IpcServerId}; use serde; +use std::io::Read; use std::thread; -pub fn start_new_server<T, F>(on_message: F) -> Result<IpcServerId> +pub fn start_new_server<T, U, F>(on_message: F) -> Result<IpcServerId> where T: serde::Deserialize + 'static, - F: FnMut(Result<T>) + Send + 'static + U: serde::Serialize, + F: FnMut(Result<T>) -> U + Send + 'static { let addr = "127.0.0.1:5000"; @@ -26,58 +28,90 @@ fn chain_boxed_err(boxed_cause: Box<::std::error::Error>, new_error: ErrorKind) super::Error::with_chain(cause, new_error) } -fn start_receive_loop<T, F>(server: tiny_http::Server, mut on_message: F) +fn start_receive_loop<T, U, F>(server: tiny_http::Server, mut on_message: F) where T: serde::Deserialize + 'static, - F: FnMut(Result<T>) + Send + 'static + U: serde::Serialize, + F: FnMut(Result<T>) -> U + Send + 'static { thread::spawn(move || for mut request in server.incoming_requests() { - let read_res = read_body_as_string(&mut request) - .and_then(|s| serde_json::from_str(&s).chain_err(|| ErrorKind::ParseFailure)); + let read_res = parse_request(&mut request); + let response = on_message(read_res); + let reply_res = send_response(&response, request); - on_message(read_res); - - let _ = request.respond(tiny_http::Response::from_string("Ok")); + if reply_res.is_err() { + error!("Failed sending reply to request, {}", + reply_res.unwrap_err()); + } }); } +fn parse_request<T: serde::Deserialize>(request: &mut tiny_http::Request) -> Result<T> { + read_body_as_string(request) + .and_then(|s| serde_json::from_str(&s).chain_err(|| ErrorKind::ParseFailure)) +} + fn read_body_as_string(request: &mut tiny_http::Request) -> Result<String> { - let mut buffer = Vec::new(); - request.as_reader() - .read_to_end(&mut buffer) - .chain_err(|| ErrorKind::ReadFailure)?; + let mut buffer = String::new(); + let res = request.as_reader() + .read_to_string(&mut buffer) + .chain_err(|| ErrorKind::ReadFailure) + .map(|_| buffer); - let res = String::from_utf8(buffer).chain_err(|| ErrorKind::ReadFailure); debug!("HTTP IPC read body {:?}", res); res } -pub struct IpcClient<T> - where T: serde::Serialize +fn send_response<U: serde::Serialize>(response: &U, request: tiny_http::Request) -> Result<()> { + serde_json::to_string(response) + .chain_err(|| ErrorKind::ReplyFailure) + .and_then(|response_as_string| { + + debug!("HTTP IPC responding with {:?}", response_as_string); + request.respond(tiny_http::Response::from_string(response_as_string)) + .chain_err(|| ErrorKind::ReplyFailure) + }) +} + +pub struct IpcClient<T, U> + where T: serde::Serialize, + U: serde::Deserialize { server_id: IpcServerId, client: hyper::Client, - _phantom: ::std::marker::PhantomData<T>, + _phantom_t: ::std::marker::PhantomData<T>, + _phantom_u: ::std::marker::PhantomData<U>, } -impl<T> IpcClient<T> - where T: serde::Serialize +impl<T, U> IpcClient<T, U> + where T: serde::Serialize, + U: serde::Deserialize { pub fn new(server_id: IpcServerId) -> Self { IpcClient { server_id: server_id, client: hyper::Client::new(), - _phantom: ::std::marker::PhantomData, + _phantom_t: ::std::marker::PhantomData, + _phantom_u: ::std::marker::PhantomData, } } - pub fn send(&mut self, message: &T) -> Result<()> { + pub fn send(&mut self, message: &T) -> Result<U> { let message_json = serde_json::to_string(message).chain_err(|| ErrorKind::ParseFailure)?; debug!("HTTP IPC sending {}", message_json); - self.client + let mut reply = self.client .post(&self.server_id) .body(&message_json) .send() - .map(|_| ()) - .chain_err(|| ErrorKind::SendError) + .chain_err(|| ErrorKind::SendError)?; + + + Self::parse_reply(&mut reply) + } + + fn parse_reply(reply: &mut hyper::client::response::Response) -> Result<U> { + let mut buffer = String::new(); + reply.read_to_string(&mut buffer).chain_err(|| ErrorKind::ParseFailure)?; + + serde_json::from_str(&buffer).chain_err(|| ErrorKind::ParseFailure) } } diff --git a/talpid_ipc/src/lib.rs b/talpid_ipc/src/lib.rs index 1fcfe47962..2e32997620 100644 --- a/talpid_ipc/src/lib.rs +++ b/talpid_ipc/src/lib.rs @@ -35,5 +35,8 @@ error_chain!{ SendError { description("Unable to send message") } + ReplyFailure { + description("The server failed to reply to the request") + } } } diff --git a/talpid_ipc/tests/http_integration_tests.rs b/talpid_ipc/tests/http_integration_tests.rs index b9fcb942b1..72f6d190bc 100644 --- a/talpid_ipc/tests/http_integration_tests.rs +++ b/talpid_ipc/tests/http_integration_tests.rs @@ -11,16 +11,17 @@ mod http_integration_tests { #[test] fn can_connect_and_send_and_receive_messages() { - let (connection_string, new_messages_rx) = start_server::<String>(); + let (connection_string, server_messages) = start_server::<String>(); let mut ipc_client = http_ipc::IpcClient::new(connection_string); let msg = "Hello".to_owned(); - ipc_client.send(&msg).expect("Could not send message"); + let response: String = ipc_client.send(&msg).expect("Could not send message"); - let message = new_messages_rx.recv_timeout(Duration::from_millis(1000)) + let message = server_messages.recv_timeout(Duration::from_millis(1000)) .expect("Did not receive a message"); assert_eq!(message.unwrap(), "Hello", "Got wrong message"); + assert_eq!(response, "RESPONSE"); } fn start_server<T>() -> (IpcServerId, Receiver<Result<T>>) @@ -30,6 +31,7 @@ mod http_integration_tests { let connection_string = http_ipc::start_new_server(move |message: Result<T>| { let _ = tx.send(message); + "RESPONSE" }) .expect("Could not start the server"); |
