diff options
| author | Erik Larkö <erik@mullvad.net> | 2017-03-23 22:35:44 +0800 |
|---|---|---|
| committer | Erik Larkö <erik@mullvad.net> | 2017-03-27 16:22:16 +0800 |
| commit | 674e00e0bd2a86f322a0e076089c3136751809ad (patch) | |
| tree | d15226d020f3bbaf3e5c36d3f3cd202e8919e078 | |
| parent | 49eadad3da54f0a549ff32cbfb4da31814559433 (diff) | |
| download | mullvadvpn-674e00e0bd2a86f322a0e076089c3136751809ad.tar.xz mullvadvpn-674e00e0bd2a86f322a0e076089c3136751809ad.zip | |
Can stop the Http server
| -rw-r--r-- | talpid_cli/src/main.rs | 1 | ||||
| -rw-r--r-- | talpid_ipc/src/http_ipc.rs | 95 |
2 files changed, 73 insertions, 23 deletions
diff --git a/talpid_cli/src/main.rs b/talpid_cli/src/main.rs index 9bb7cfdc96..28e07a84f8 100644 --- a/talpid_cli/src/main.rs +++ b/talpid_cli/src/main.rs @@ -103,3 +103,4 @@ fn pass_io<I, O>(mut input: I, mut output: O) { thread::spawn(move || { io::copy(&mut input, &mut output).unwrap(); }); } + diff --git a/talpid_ipc/src/http_ipc.rs b/talpid_ipc/src/http_ipc.rs index c7ad44586b..699bd80177 100644 --- a/talpid_ipc/src/http_ipc.rs +++ b/talpid_ipc/src/http_ipc.rs @@ -4,45 +4,94 @@ extern crate serde_json; use super::{ErrorKind, Result, ResultExt, IpcServerId}; use serde; use std::thread; +use std::time::Duration; +use std::sync::mpsc; -pub fn start_new_server<T, U, F>(on_message: F) -> Result<IpcServerId> +pub struct HttpServerHandle{ + pub address: IpcServerId, + stop_tx: mpsc::SyncSender<u8> +} +impl HttpServerHandle { + pub fn stop(&self) { + let _ = self.stop_tx.send(0); + } +} +impl Drop for HttpServerHandle { + fn drop(&mut self) { + self.stop(); + } +} + +pub fn start_server<T, U, F>(on_message: F) -> Result<HttpServerHandle> where T: serde::Deserialize + 'static, - U: serde::Serialize, - F: FnMut(Result<T>) -> U + Send + 'static + U: serde::Serialize, + F: FnMut(Result<T>) -> U + Send + 'static { - for port in 5000..5010 { - let addr = format!("127.0.0.1:{}", port); + for port in 5000..5010 { + let addr = format!("127.0.0.1:{}", port); - if let Ok(server) = start_http_server(&addr) { - let _ = start_receive_loop(server, on_message); - debug!("Started a HTTP IPC server on {}", addr); - return Ok(format!("http://{}", addr)); - } - } + if let Ok(server) = start_http_server(&addr) { + let (stop_tx, stop_rx) = mpsc::sync_channel(0); + let handle = HttpServerHandle { + stop_tx: stop_tx, + address: format!("http://{}", addr), + }; - bail!(ErrorKind::CouldNotStartServer) + start_receive_loop(on_message, server, stop_rx); + debug!("Started a HTTP IPC server on {}", addr); + return Ok(handle); + } + } + bail!(ErrorKind::CouldNotStartServer) } fn start_http_server(addr: &str) -> Result<tiny_http::Server> { tiny_http::Server::http(addr).map_err(|e| ErrorKind::Msg(e.to_string()).into()) } -fn start_receive_loop<T, U, F>(server: tiny_http::Server, mut on_message: F) +fn start_receive_loop<T, U, F>(mut on_message: F, http_server: tiny_http::Server, stop_rx: mpsc::Receiver<u8>) where T: serde::Deserialize + 'static, - U: serde::Serialize, - F: FnMut(Result<T>) -> U + Send + 'static + U: serde::Serialize, + F: FnMut(Result<T>) -> U + Send + 'static { - thread::spawn(move || for mut request in server.incoming_requests() { - let read_res = parse_request(&mut request); - let response = on_message(read_res); - let reply_res = send_response(&response, request); - - if let Err(e) = reply_res { - error!("Failed sending reply to request, {}", e); + thread::spawn(move || loop { + if should_stop(&stop_rx) { + debug!("Stopping the server"); + break; } + + receive(&mut on_message, &http_server); }); } +fn should_stop(stop_rx: &mpsc::Receiver<u8>) -> bool { + match stop_rx.try_recv() { + Err(mpsc::TryRecvError::Empty) => false, + _ => true + } +} + +fn receive<T, U, F>(on_message: &mut F, http_server: &tiny_http::Server) + where T: serde::Deserialize + 'static, + U: serde::Serialize, + F: FnMut(Result<T>) -> U + Send + 'static +{ + let req_res = http_server.recv_timeout(Duration::from_millis(1000)); + match req_res { + Ok(Some(mut request)) => { + let read_res = parse_request(&mut request); + let response = on_message(read_res); + let reply_res = send_response(&response, request); + + if let Err(e) = reply_res { + error!("Failed sending reply to request, {}", e); + } + } + Ok(None) => (), + Err(e) => error!("Failed receiving request: {}", e), + } +} + fn parse_request<T: serde::Deserialize>(request: &mut tiny_http::Request) -> Result<T> { let reader = request.as_reader(); let mut buffer = String::new(); @@ -62,4 +111,4 @@ fn send_response<U: serde::Serialize>(response: &U, request: tiny_http::Request) request.respond(tiny_http::Response::from_string(response_as_string)) .chain_err(|| "Failed responding to HTTP request") }) -}
\ No newline at end of file +} |
