summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorJanito Vaqueiro Ferreira Filho <janito@mullvad.net>2018-05-14 17:25:40 -0300
committerJanito Vaqueiro Ferreira Filho <janito@mullvad.net>2018-05-16 09:22:18 -0300
commit12b623ba572503434b8264fa937345eccca47ef1 (patch)
treeb500bc54d76855cebe536f74430e1aad0be78fc2
parent41a4456b3e8df29d6688fcad3ac63625cfca7cb1 (diff)
downloadmullvadvpn-12b623ba572503434b8264fa937345eccca47ef1.tar.xz
mullvadvpn-12b623ba572503434b8264fa937345eccca47ef1.zip
Use a separate thread to handle RPC connection
-rw-r--r--talpid-ipc/src/client.rs200
1 files changed, 137 insertions, 63 deletions
diff --git a/talpid-ipc/src/client.rs b/talpid-ipc/src/client.rs
index a1ff460ce5..9a32a82e3a 100644
--- a/talpid-ipc/src/client.rs
+++ b/talpid-ipc/src/client.rs
@@ -1,4 +1,4 @@
-use std::sync::{mpsc, Arc, Mutex, MutexGuard};
+use std::sync::mpsc;
use std::thread;
use error_chain::ChainedError;
@@ -17,6 +17,10 @@ mod errors {
display("Failed to connect to RPC server: {}", details)
}
+ ConnectionHandlerStopped {
+ description("The WebSocket connection handler thread has stopped")
+ }
+
ErrorResponse(error_message: String) {
description("Received an RPC error response")
display("Received an RPC error response: {}", error_message)
@@ -48,6 +52,10 @@ mod errors {
)
}
+ SerializeArgumentsError {
+ description("Failed to serialize JSON-RPC request arguments")
+ }
+
WebSocketError {
description("Error with WebSocket connection")
}
@@ -76,8 +84,21 @@ impl ActiveRequest {
}
}
+enum WsIpcCommand {
+ Call {
+ method: String,
+ arguments: JsonValue,
+ response_tx: mpsc::Sender<Result<JsonValue>>,
+ },
+ Response {
+ id: i64,
+ result: Result<JsonValue>,
+ },
+ Error(Error),
+}
+
struct Factory {
- active_request: Arc<Mutex<Option<ActiveRequest>>>,
+ connection_tx: mpsc::Sender<WsIpcCommand>,
sender_tx: mpsc::Sender<ws::Sender>,
}
@@ -90,14 +111,14 @@ impl ws::Factory for Factory {
let _ = self.sender_tx.send(sender);
Handler {
- active_request: self.active_request.clone(),
+ connection_tx: self.connection_tx.clone(),
}
}
}
struct Handler {
- active_request: Arc<Mutex<Option<ActiveRequest>>>,
+ connection_tx: mpsc::Sender<WsIpcCommand>,
}
impl Handler {
@@ -107,18 +128,12 @@ impl Handler {
let response_id = self.parse_response_id(&mut response_json_object)?;
let rpc_result = self.parse_response_result(response_json_object);
- let mut active_request = self.lock_active_request();
-
- if let Some(mut request) = active_request.take() {
- if response_id == request.id() {
- let _ = request.send_response(rpc_result);
- } else {
- warn!("Received an unexpect JSON-RPC message");
- *active_request = Some(request);
- }
- }
-
- Ok(())
+ self.connection_tx
+ .send(WsIpcCommand::Response {
+ id: response_id,
+ result: rpc_result,
+ })
+ .chain_err(|| ErrorKind::ConnectionHandlerStopped)
}
fn parse_message_object(&self, msg: ws::Message) -> Result<JsonMap> {
@@ -173,12 +188,6 @@ impl Handler {
).into()),
}
}
-
- fn lock_active_request(&mut self) -> MutexGuard<Option<ActiveRequest>> {
- self.active_request
- .lock()
- .expect("a thread panicked while using the active JSON-RPC request")
- }
}
impl ws::Handler for Handler {
@@ -192,40 +201,32 @@ impl ws::Handler for Handler {
}
fn on_error(&mut self, error: ws::Error) {
- if let Some(active_request) = self.lock_active_request().as_mut() {
- active_request.send_response(Err(error).chain_err(|| ErrorKind::WebSocketError));
- }
+ let error = Error::with_chain(error, ErrorKind::WebSocketError);
+
+ let _ = self.connection_tx.send(WsIpcCommand::Error(error));
}
}
-
pub struct WsIpcClient {
- next_id: i64,
- active_request: Arc<Mutex<Option<ActiveRequest>>>,
- sender: ws::Sender,
+ connection_tx: mpsc::Sender<WsIpcCommand>,
}
impl WsIpcClient {
pub fn connect(server_id: &::IpcServerId) -> Result<Self> {
let url = Url::parse(&server_id)
.chain_err(|| ErrorKind::InvalidServerIdUrl(server_id.to_owned()))?;
- let active_request = Arc::new(Mutex::new(None));
- let sender = Self::open_websocket(url, active_request.clone())?;
+ let (connection_tx, connection_rx) = mpsc::channel();
+ let sender = Self::open_websocket(url, connection_tx.clone())?;
- Ok(WsIpcClient {
- next_id: 1,
- active_request,
- sender,
- })
+ WsIpcClientConnection::spawn(sender, connection_rx);
+
+ Ok(WsIpcClient { connection_tx })
}
- fn open_websocket(
- url: Url,
- active_request: Arc<Mutex<Option<ActiveRequest>>>,
- ) -> Result<ws::Sender> {
+ fn open_websocket(url: Url, connection_tx: mpsc::Sender<WsIpcCommand>) -> Result<ws::Sender> {
let (sender_tx, sender_rx) = mpsc::channel();
let factory = Factory {
- active_request,
+ connection_tx,
sender_tx,
};
@@ -251,21 +252,81 @@ impl WsIpcClient {
.chain_err(|| ErrorKind::ConnectError("WebSocket connection failed"))
}
- pub fn call<T, O>(&mut self, method: &str, params: &T) -> Result<O>
+ pub fn call<S, T, O>(&mut self, method: S, params: &T) -> Result<O>
where
+ S: ToString,
T: serde::Serialize,
O: for<'de> serde::Deserialize<'de>,
{
- let id = self.new_id();
- let (result_tx, result_rx) = mpsc::channel();
+ let arguments =
+ serde_json::to_value(params).chain_err(|| ErrorKind::SerializeArgumentsError)?;
+ let (response_tx, response_rx) = mpsc::channel();
+ let command = WsIpcCommand::Call {
+ method: method.to_string(),
+ arguments,
+ response_tx,
+ };
- self.queue_request_response(id, result_tx);
- self.send_request(id, method, params)?;
+ self.connection_tx
+ .send(command)
+ .chain_err(|| ErrorKind::ConnectionHandlerStopped)?;
- let json_result = result_rx.recv().chain_err(|| ErrorKind::MissingResponse)?;
+ let json_result = response_rx.recv().chain_err(|| ErrorKind::MissingResponse)?;
Ok(serde_json::from_value(json_result?).chain_err(|| ErrorKind::DeserializeResponseError)?)
}
+}
+
+struct WsIpcClientConnection {
+ next_id: i64,
+ active_request: Option<ActiveRequest>,
+ sender: ws::Sender,
+}
+
+impl WsIpcClientConnection {
+ pub fn spawn(sender: ws::Sender, commands: mpsc::Receiver<WsIpcCommand>) {
+ let mut instance = WsIpcClientConnection {
+ next_id: 1,
+ active_request: None,
+ sender,
+ };
+
+ thread::spawn(move || {
+ if let Err(error) = instance.run(commands) {
+ let chained_error = Error::with_chain(error, "WsIpcClient event loop error");
+ error!("{}", chained_error.display_chain());
+ }
+ });
+ }
+
+ fn run(&mut self, commands: mpsc::Receiver<WsIpcCommand>) -> Result<()> {
+ use self::WsIpcCommand::*;
+
+ for command in commands {
+ match command {
+ Call {
+ method,
+ arguments,
+ response_tx,
+ } => self.call(method, arguments, response_tx)?,
+ Response { id, result } => self.handle_response(id, result)?,
+ Error(error) => self.handle_error(error),
+ }
+ }
+
+ Ok(())
+ }
+
+ fn call(
+ &mut self,
+ method: String,
+ arguments: JsonValue,
+ response_tx: mpsc::Sender<Result<JsonValue>>,
+ ) -> Result<()> {
+ let id = self.new_id();
+ self.queue_request_response(id, response_tx);
+ self.send_request(id, method, arguments)
+ }
fn new_id(&mut self) -> i64 {
let id = self.next_id;
@@ -273,29 +334,19 @@ impl WsIpcClient {
id
}
- fn queue_request_response(&mut self, id: i64, result_tx: mpsc::Sender<Result<JsonValue>>) {
- let mut active_request = self.active_request
- .lock()
- .expect("a thread panicked using the active RPC request map");
-
- *active_request = Some(ActiveRequest::new(id, result_tx));
+ fn queue_request_response(&mut self, id: i64, response_tx: mpsc::Sender<Result<JsonValue>>) {
+ self.active_request = Some(ActiveRequest::new(id, response_tx));
}
- fn send_request<T>(&mut self, id: i64, method: &str, params: &T) -> Result<()>
- where
- T: serde::Serialize,
- {
- let json_request = self.build_json_request(id, method, params);
+ fn send_request(&mut self, id: i64, method: String, arguments: JsonValue) -> Result<()> {
+ let json_request = self.build_json_request(id, &method, arguments);
self.sender
.send(json_request.as_bytes())
- .chain_err(|| ErrorKind::SendRequestError(method.to_owned()))
+ .chain_err(|| ErrorKind::SendRequestError(method))
}
- fn build_json_request<T>(&mut self, id: i64, method: &str, params: &T) -> String
- where
- T: serde::Serialize,
- {
+ fn build_json_request(&mut self, id: i64, method: &str, params: JsonValue) -> String {
let request_json = json!({
"jsonrpc": "2.0",
"id": id,
@@ -304,4 +355,27 @@ impl WsIpcClient {
});
format!("{}", request_json)
}
+
+ fn handle_response(&mut self, id: i64, result: Result<JsonValue>) -> Result<()> {
+ if let Some(mut request) = self.active_request.take() {
+ if request.id() == id {
+ request.send_response(result);
+ } else {
+ self.active_request = Some(request);
+ warn!("Received an unexpected response with ID {}", id);
+ }
+ } else {
+ warn!("Received an unexpected response with ID {}", id);
+ }
+
+ Ok(())
+ }
+
+ fn handle_error(&mut self, error: Error) {
+ if let Some(ref mut request) = self.active_request {
+ let _ = request.response_tx.send(Err(error));
+ } else {
+ error!("{}", error.display_chain());
+ }
+ }
}