summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorErik Larkö <erik@mullvad.net>2017-03-22 12:34:33 +0800
committerErik Larkö <erik@mullvad.net>2017-03-22 14:24:48 +0800
commit0b849ba02420a6c1d47b7f13fa7c7105e232d2f3 (patch)
tree8c365b55a9d5b867af5e2f618e3d932329c265dc
parentd7cbed11b6569098b7ff569db17ed48f75e94827 (diff)
downloadmullvadvpn-0b849ba02420a6c1d47b7f13fa7c7105e232d2f3.tar.xz
mullvadvpn-0b849ba02420a6c1d47b7f13fa7c7105e232d2f3.zip
Have the IPC API support responses
-rw-r--r--talpid_ipc/src/http_ipc.rs82
-rw-r--r--talpid_ipc/src/lib.rs3
-rw-r--r--talpid_ipc/tests/http_integration_tests.rs8
3 files changed, 66 insertions, 27 deletions
diff --git a/talpid_ipc/src/http_ipc.rs b/talpid_ipc/src/http_ipc.rs
index 2ca438d666..ce06c2e356 100644
--- a/talpid_ipc/src/http_ipc.rs
+++ b/talpid_ipc/src/http_ipc.rs
@@ -4,11 +4,13 @@ extern crate hyper;
use super::{ErrorKind, Result, ResultExt, IpcServerId};
use serde;
+use std::io::Read;
use std::thread;
-pub fn start_new_server<T, F>(on_message: F) -> Result<IpcServerId>
+pub fn start_new_server<T, U, F>(on_message: F) -> Result<IpcServerId>
where T: serde::Deserialize + 'static,
- F: FnMut(Result<T>) + Send + 'static
+ U: serde::Serialize,
+ F: FnMut(Result<T>) -> U + Send + 'static
{
let addr = "127.0.0.1:5000";
@@ -26,58 +28,90 @@ fn chain_boxed_err(boxed_cause: Box<::std::error::Error>, new_error: ErrorKind)
super::Error::with_chain(cause, new_error)
}
-fn start_receive_loop<T, F>(server: tiny_http::Server, mut on_message: F)
+fn start_receive_loop<T, U, F>(server: tiny_http::Server, mut on_message: F)
where T: serde::Deserialize + 'static,
- F: FnMut(Result<T>) + Send + 'static
+ U: serde::Serialize,
+ F: FnMut(Result<T>) -> U + Send + 'static
{
thread::spawn(move || for mut request in server.incoming_requests() {
- let read_res = read_body_as_string(&mut request)
- .and_then(|s| serde_json::from_str(&s).chain_err(|| ErrorKind::ParseFailure));
+ let read_res = parse_request(&mut request);
+ let response = on_message(read_res);
+ let reply_res = send_response(&response, request);
- on_message(read_res);
-
- let _ = request.respond(tiny_http::Response::from_string("Ok"));
+ if reply_res.is_err() {
+ error!("Failed sending reply to request, {}",
+ reply_res.unwrap_err());
+ }
});
}
+fn parse_request<T: serde::Deserialize>(request: &mut tiny_http::Request) -> Result<T> {
+ read_body_as_string(request)
+ .and_then(|s| serde_json::from_str(&s).chain_err(|| ErrorKind::ParseFailure))
+}
+
fn read_body_as_string(request: &mut tiny_http::Request) -> Result<String> {
- let mut buffer = Vec::new();
- request.as_reader()
- .read_to_end(&mut buffer)
- .chain_err(|| ErrorKind::ReadFailure)?;
+ let mut buffer = String::new();
+ let res = request.as_reader()
+ .read_to_string(&mut buffer)
+ .chain_err(|| ErrorKind::ReadFailure)
+ .map(|_| buffer);
- let res = String::from_utf8(buffer).chain_err(|| ErrorKind::ReadFailure);
debug!("HTTP IPC read body {:?}", res);
res
}
-pub struct IpcClient<T>
- where T: serde::Serialize
+fn send_response<U: serde::Serialize>(response: &U, request: tiny_http::Request) -> Result<()> {
+ serde_json::to_string(response)
+ .chain_err(|| ErrorKind::ReplyFailure)
+ .and_then(|response_as_string| {
+
+ debug!("HTTP IPC responding with {:?}", response_as_string);
+ request.respond(tiny_http::Response::from_string(response_as_string))
+ .chain_err(|| ErrorKind::ReplyFailure)
+ })
+}
+
+pub struct IpcClient<T, U>
+ where T: serde::Serialize,
+ U: serde::Deserialize
{
server_id: IpcServerId,
client: hyper::Client,
- _phantom: ::std::marker::PhantomData<T>,
+ _phantom_t: ::std::marker::PhantomData<T>,
+ _phantom_u: ::std::marker::PhantomData<U>,
}
-impl<T> IpcClient<T>
- where T: serde::Serialize
+impl<T, U> IpcClient<T, U>
+ where T: serde::Serialize,
+ U: serde::Deserialize
{
pub fn new(server_id: IpcServerId) -> Self {
IpcClient {
server_id: server_id,
client: hyper::Client::new(),
- _phantom: ::std::marker::PhantomData,
+ _phantom_t: ::std::marker::PhantomData,
+ _phantom_u: ::std::marker::PhantomData,
}
}
- pub fn send(&mut self, message: &T) -> Result<()> {
+ pub fn send(&mut self, message: &T) -> Result<U> {
let message_json = serde_json::to_string(message).chain_err(|| ErrorKind::ParseFailure)?;
debug!("HTTP IPC sending {}", message_json);
- self.client
+ let mut reply = self.client
.post(&self.server_id)
.body(&message_json)
.send()
- .map(|_| ())
- .chain_err(|| ErrorKind::SendError)
+ .chain_err(|| ErrorKind::SendError)?;
+
+
+ Self::parse_reply(&mut reply)
+ }
+
+ fn parse_reply(reply: &mut hyper::client::response::Response) -> Result<U> {
+ let mut buffer = String::new();
+ reply.read_to_string(&mut buffer).chain_err(|| ErrorKind::ParseFailure)?;
+
+ serde_json::from_str(&buffer).chain_err(|| ErrorKind::ParseFailure)
}
}
diff --git a/talpid_ipc/src/lib.rs b/talpid_ipc/src/lib.rs
index 1fcfe47962..2e32997620 100644
--- a/talpid_ipc/src/lib.rs
+++ b/talpid_ipc/src/lib.rs
@@ -35,5 +35,8 @@ error_chain!{
SendError {
description("Unable to send message")
}
+ ReplyFailure {
+ description("The server failed to reply to the request")
+ }
}
}
diff --git a/talpid_ipc/tests/http_integration_tests.rs b/talpid_ipc/tests/http_integration_tests.rs
index b9fcb942b1..72f6d190bc 100644
--- a/talpid_ipc/tests/http_integration_tests.rs
+++ b/talpid_ipc/tests/http_integration_tests.rs
@@ -11,16 +11,17 @@ mod http_integration_tests {
#[test]
fn can_connect_and_send_and_receive_messages() {
- let (connection_string, new_messages_rx) = start_server::<String>();
+ let (connection_string, server_messages) = start_server::<String>();
let mut ipc_client = http_ipc::IpcClient::new(connection_string);
let msg = "Hello".to_owned();
- ipc_client.send(&msg).expect("Could not send message");
+ let response: String = ipc_client.send(&msg).expect("Could not send message");
- let message = new_messages_rx.recv_timeout(Duration::from_millis(1000))
+ let message = server_messages.recv_timeout(Duration::from_millis(1000))
.expect("Did not receive a message");
assert_eq!(message.unwrap(), "Hello", "Got wrong message");
+ assert_eq!(response, "RESPONSE");
}
fn start_server<T>() -> (IpcServerId, Receiver<Result<T>>)
@@ -30,6 +31,7 @@ mod http_integration_tests {
let connection_string = http_ipc::start_new_server(move |message: Result<T>| {
let _ = tx.send(message);
+ "RESPONSE"
})
.expect("Could not start the server");