diff options
| author | Janito Vaqueiro Ferreira Filho <janito@mullvad.net> | 2018-05-14 17:25:40 -0300 |
|---|---|---|
| committer | Janito Vaqueiro Ferreira Filho <janito@mullvad.net> | 2018-05-16 09:22:18 -0300 |
| commit | 12b623ba572503434b8264fa937345eccca47ef1 (patch) | |
| tree | b500bc54d76855cebe536f74430e1aad0be78fc2 | |
| parent | 41a4456b3e8df29d6688fcad3ac63625cfca7cb1 (diff) | |
| download | mullvadvpn-12b623ba572503434b8264fa937345eccca47ef1.tar.xz mullvadvpn-12b623ba572503434b8264fa937345eccca47ef1.zip | |
Use a separate thread to handle RPC connection
| -rw-r--r-- | talpid-ipc/src/client.rs | 200 |
1 files changed, 137 insertions, 63 deletions
diff --git a/talpid-ipc/src/client.rs b/talpid-ipc/src/client.rs index a1ff460ce5..9a32a82e3a 100644 --- a/talpid-ipc/src/client.rs +++ b/talpid-ipc/src/client.rs @@ -1,4 +1,4 @@ -use std::sync::{mpsc, Arc, Mutex, MutexGuard}; +use std::sync::mpsc; use std::thread; use error_chain::ChainedError; @@ -17,6 +17,10 @@ mod errors { display("Failed to connect to RPC server: {}", details) } + ConnectionHandlerStopped { + description("The WebSocket connection handler thread has stopped") + } + ErrorResponse(error_message: String) { description("Received an RPC error response") display("Received an RPC error response: {}", error_message) @@ -48,6 +52,10 @@ mod errors { ) } + SerializeArgumentsError { + description("Failed to serialize JSON-RPC request arguments") + } + WebSocketError { description("Error with WebSocket connection") } @@ -76,8 +84,21 @@ impl ActiveRequest { } } +enum WsIpcCommand { + Call { + method: String, + arguments: JsonValue, + response_tx: mpsc::Sender<Result<JsonValue>>, + }, + Response { + id: i64, + result: Result<JsonValue>, + }, + Error(Error), +} + struct Factory { - active_request: Arc<Mutex<Option<ActiveRequest>>>, + connection_tx: mpsc::Sender<WsIpcCommand>, sender_tx: mpsc::Sender<ws::Sender>, } @@ -90,14 +111,14 @@ impl ws::Factory for Factory { let _ = self.sender_tx.send(sender); Handler { - active_request: self.active_request.clone(), + connection_tx: self.connection_tx.clone(), } } } struct Handler { - active_request: Arc<Mutex<Option<ActiveRequest>>>, + connection_tx: mpsc::Sender<WsIpcCommand>, } impl Handler { @@ -107,18 +128,12 @@ impl Handler { let response_id = self.parse_response_id(&mut response_json_object)?; let rpc_result = self.parse_response_result(response_json_object); - let mut active_request = self.lock_active_request(); - - if let Some(mut request) = active_request.take() { - if response_id == request.id() { - let _ = request.send_response(rpc_result); - } else { - warn!("Received an unexpect JSON-RPC message"); - *active_request = Some(request); - } - } - - Ok(()) + self.connection_tx + .send(WsIpcCommand::Response { + id: response_id, + result: rpc_result, + }) + .chain_err(|| ErrorKind::ConnectionHandlerStopped) } fn parse_message_object(&self, msg: ws::Message) -> Result<JsonMap> { @@ -173,12 +188,6 @@ impl Handler { ).into()), } } - - fn lock_active_request(&mut self) -> MutexGuard<Option<ActiveRequest>> { - self.active_request - .lock() - .expect("a thread panicked while using the active JSON-RPC request") - } } impl ws::Handler for Handler { @@ -192,40 +201,32 @@ impl ws::Handler for Handler { } fn on_error(&mut self, error: ws::Error) { - if let Some(active_request) = self.lock_active_request().as_mut() { - active_request.send_response(Err(error).chain_err(|| ErrorKind::WebSocketError)); - } + let error = Error::with_chain(error, ErrorKind::WebSocketError); + + let _ = self.connection_tx.send(WsIpcCommand::Error(error)); } } - pub struct WsIpcClient { - next_id: i64, - active_request: Arc<Mutex<Option<ActiveRequest>>>, - sender: ws::Sender, + connection_tx: mpsc::Sender<WsIpcCommand>, } impl WsIpcClient { pub fn connect(server_id: &::IpcServerId) -> Result<Self> { let url = Url::parse(&server_id) .chain_err(|| ErrorKind::InvalidServerIdUrl(server_id.to_owned()))?; - let active_request = Arc::new(Mutex::new(None)); - let sender = Self::open_websocket(url, active_request.clone())?; + let (connection_tx, connection_rx) = mpsc::channel(); + let sender = Self::open_websocket(url, connection_tx.clone())?; - Ok(WsIpcClient { - next_id: 1, - active_request, - sender, - }) + WsIpcClientConnection::spawn(sender, connection_rx); + + Ok(WsIpcClient { connection_tx }) } - fn open_websocket( - url: Url, - active_request: Arc<Mutex<Option<ActiveRequest>>>, - ) -> Result<ws::Sender> { + fn open_websocket(url: Url, connection_tx: mpsc::Sender<WsIpcCommand>) -> Result<ws::Sender> { let (sender_tx, sender_rx) = mpsc::channel(); let factory = Factory { - active_request, + connection_tx, sender_tx, }; @@ -251,21 +252,81 @@ impl WsIpcClient { .chain_err(|| ErrorKind::ConnectError("WebSocket connection failed")) } - pub fn call<T, O>(&mut self, method: &str, params: &T) -> Result<O> + pub fn call<S, T, O>(&mut self, method: S, params: &T) -> Result<O> where + S: ToString, T: serde::Serialize, O: for<'de> serde::Deserialize<'de>, { - let id = self.new_id(); - let (result_tx, result_rx) = mpsc::channel(); + let arguments = + serde_json::to_value(params).chain_err(|| ErrorKind::SerializeArgumentsError)?; + let (response_tx, response_rx) = mpsc::channel(); + let command = WsIpcCommand::Call { + method: method.to_string(), + arguments, + response_tx, + }; - self.queue_request_response(id, result_tx); - self.send_request(id, method, params)?; + self.connection_tx + .send(command) + .chain_err(|| ErrorKind::ConnectionHandlerStopped)?; - let json_result = result_rx.recv().chain_err(|| ErrorKind::MissingResponse)?; + let json_result = response_rx.recv().chain_err(|| ErrorKind::MissingResponse)?; Ok(serde_json::from_value(json_result?).chain_err(|| ErrorKind::DeserializeResponseError)?) } +} + +struct WsIpcClientConnection { + next_id: i64, + active_request: Option<ActiveRequest>, + sender: ws::Sender, +} + +impl WsIpcClientConnection { + pub fn spawn(sender: ws::Sender, commands: mpsc::Receiver<WsIpcCommand>) { + let mut instance = WsIpcClientConnection { + next_id: 1, + active_request: None, + sender, + }; + + thread::spawn(move || { + if let Err(error) = instance.run(commands) { + let chained_error = Error::with_chain(error, "WsIpcClient event loop error"); + error!("{}", chained_error.display_chain()); + } + }); + } + + fn run(&mut self, commands: mpsc::Receiver<WsIpcCommand>) -> Result<()> { + use self::WsIpcCommand::*; + + for command in commands { + match command { + Call { + method, + arguments, + response_tx, + } => self.call(method, arguments, response_tx)?, + Response { id, result } => self.handle_response(id, result)?, + Error(error) => self.handle_error(error), + } + } + + Ok(()) + } + + fn call( + &mut self, + method: String, + arguments: JsonValue, + response_tx: mpsc::Sender<Result<JsonValue>>, + ) -> Result<()> { + let id = self.new_id(); + self.queue_request_response(id, response_tx); + self.send_request(id, method, arguments) + } fn new_id(&mut self) -> i64 { let id = self.next_id; @@ -273,29 +334,19 @@ impl WsIpcClient { id } - fn queue_request_response(&mut self, id: i64, result_tx: mpsc::Sender<Result<JsonValue>>) { - let mut active_request = self.active_request - .lock() - .expect("a thread panicked using the active RPC request map"); - - *active_request = Some(ActiveRequest::new(id, result_tx)); + fn queue_request_response(&mut self, id: i64, response_tx: mpsc::Sender<Result<JsonValue>>) { + self.active_request = Some(ActiveRequest::new(id, response_tx)); } - fn send_request<T>(&mut self, id: i64, method: &str, params: &T) -> Result<()> - where - T: serde::Serialize, - { - let json_request = self.build_json_request(id, method, params); + fn send_request(&mut self, id: i64, method: String, arguments: JsonValue) -> Result<()> { + let json_request = self.build_json_request(id, &method, arguments); self.sender .send(json_request.as_bytes()) - .chain_err(|| ErrorKind::SendRequestError(method.to_owned())) + .chain_err(|| ErrorKind::SendRequestError(method)) } - fn build_json_request<T>(&mut self, id: i64, method: &str, params: &T) -> String - where - T: serde::Serialize, - { + fn build_json_request(&mut self, id: i64, method: &str, params: JsonValue) -> String { let request_json = json!({ "jsonrpc": "2.0", "id": id, @@ -304,4 +355,27 @@ impl WsIpcClient { }); format!("{}", request_json) } + + fn handle_response(&mut self, id: i64, result: Result<JsonValue>) -> Result<()> { + if let Some(mut request) = self.active_request.take() { + if request.id() == id { + request.send_response(result); + } else { + self.active_request = Some(request); + warn!("Received an unexpected response with ID {}", id); + } + } else { + warn!("Received an unexpected response with ID {}", id); + } + + Ok(()) + } + + fn handle_error(&mut self, error: Error) { + if let Some(ref mut request) = self.active_request { + let _ = request.response_tx.send(Err(error)); + } else { + error!("{}", error.display_chain()); + } + } } |
