summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorErik Larkö <erik@mullvad.net>2017-03-23 22:35:44 +0800
committerErik Larkö <erik@mullvad.net>2017-03-27 16:22:16 +0800
commit674e00e0bd2a86f322a0e076089c3136751809ad (patch)
treed15226d020f3bbaf3e5c36d3f3cd202e8919e078
parent49eadad3da54f0a549ff32cbfb4da31814559433 (diff)
downloadmullvadvpn-674e00e0bd2a86f322a0e076089c3136751809ad.tar.xz
mullvadvpn-674e00e0bd2a86f322a0e076089c3136751809ad.zip
Can stop the Http server
-rw-r--r--talpid_cli/src/main.rs1
-rw-r--r--talpid_ipc/src/http_ipc.rs95
2 files changed, 73 insertions, 23 deletions
diff --git a/talpid_cli/src/main.rs b/talpid_cli/src/main.rs
index 9bb7cfdc96..28e07a84f8 100644
--- a/talpid_cli/src/main.rs
+++ b/talpid_cli/src/main.rs
@@ -103,3 +103,4 @@ fn pass_io<I, O>(mut input: I, mut output: O)
{
thread::spawn(move || { io::copy(&mut input, &mut output).unwrap(); });
}
+
diff --git a/talpid_ipc/src/http_ipc.rs b/talpid_ipc/src/http_ipc.rs
index c7ad44586b..699bd80177 100644
--- a/talpid_ipc/src/http_ipc.rs
+++ b/talpid_ipc/src/http_ipc.rs
@@ -4,45 +4,94 @@ extern crate serde_json;
use super::{ErrorKind, Result, ResultExt, IpcServerId};
use serde;
use std::thread;
+use std::time::Duration;
+use std::sync::mpsc;
-pub fn start_new_server<T, U, F>(on_message: F) -> Result<IpcServerId>
+pub struct HttpServerHandle{
+ pub address: IpcServerId,
+ stop_tx: mpsc::SyncSender<u8>
+}
+impl HttpServerHandle {
+ pub fn stop(&self) {
+ let _ = self.stop_tx.send(0);
+ }
+}
+impl Drop for HttpServerHandle {
+ fn drop(&mut self) {
+ self.stop();
+ }
+}
+
+pub fn start_server<T, U, F>(on_message: F) -> Result<HttpServerHandle>
where T: serde::Deserialize + 'static,
- U: serde::Serialize,
- F: FnMut(Result<T>) -> U + Send + 'static
+ U: serde::Serialize,
+ F: FnMut(Result<T>) -> U + Send + 'static
{
- for port in 5000..5010 {
- let addr = format!("127.0.0.1:{}", port);
+ for port in 5000..5010 {
+ let addr = format!("127.0.0.1:{}", port);
- if let Ok(server) = start_http_server(&addr) {
- let _ = start_receive_loop(server, on_message);
- debug!("Started a HTTP IPC server on {}", addr);
- return Ok(format!("http://{}", addr));
- }
- }
+ if let Ok(server) = start_http_server(&addr) {
+ let (stop_tx, stop_rx) = mpsc::sync_channel(0);
+ let handle = HttpServerHandle {
+ stop_tx: stop_tx,
+ address: format!("http://{}", addr),
+ };
- bail!(ErrorKind::CouldNotStartServer)
+ start_receive_loop(on_message, server, stop_rx);
+ debug!("Started a HTTP IPC server on {}", addr);
+ return Ok(handle);
+ }
+ }
+ bail!(ErrorKind::CouldNotStartServer)
}
fn start_http_server(addr: &str) -> Result<tiny_http::Server> {
tiny_http::Server::http(addr).map_err(|e| ErrorKind::Msg(e.to_string()).into())
}
-fn start_receive_loop<T, U, F>(server: tiny_http::Server, mut on_message: F)
+fn start_receive_loop<T, U, F>(mut on_message: F, http_server: tiny_http::Server, stop_rx: mpsc::Receiver<u8>)
where T: serde::Deserialize + 'static,
- U: serde::Serialize,
- F: FnMut(Result<T>) -> U + Send + 'static
+ U: serde::Serialize,
+ F: FnMut(Result<T>) -> U + Send + 'static
{
- thread::spawn(move || for mut request in server.incoming_requests() {
- let read_res = parse_request(&mut request);
- let response = on_message(read_res);
- let reply_res = send_response(&response, request);
-
- if let Err(e) = reply_res {
- error!("Failed sending reply to request, {}", e);
+ thread::spawn(move || loop {
+ if should_stop(&stop_rx) {
+ debug!("Stopping the server");
+ break;
}
+
+ receive(&mut on_message, &http_server);
});
}
+fn should_stop(stop_rx: &mpsc::Receiver<u8>) -> bool {
+ match stop_rx.try_recv() {
+ Err(mpsc::TryRecvError::Empty) => false,
+ _ => true
+ }
+}
+
+fn receive<T, U, F>(on_message: &mut F, http_server: &tiny_http::Server)
+ where T: serde::Deserialize + 'static,
+ U: serde::Serialize,
+ F: FnMut(Result<T>) -> U + Send + 'static
+{
+ let req_res = http_server.recv_timeout(Duration::from_millis(1000));
+ match req_res {
+ Ok(Some(mut request)) => {
+ let read_res = parse_request(&mut request);
+ let response = on_message(read_res);
+ let reply_res = send_response(&response, request);
+
+ if let Err(e) = reply_res {
+ error!("Failed sending reply to request, {}", e);
+ }
+ }
+ Ok(None) => (),
+ Err(e) => error!("Failed receiving request: {}", e),
+ }
+}
+
fn parse_request<T: serde::Deserialize>(request: &mut tiny_http::Request) -> Result<T> {
let reader = request.as_reader();
let mut buffer = String::new();
@@ -62,4 +111,4 @@ fn send_response<U: serde::Serialize>(response: &U, request: tiny_http::Request)
request.respond(tiny_http::Response::from_string(response_as_string))
.chain_err(|| "Failed responding to HTTP request")
})
-} \ No newline at end of file
+}