diff options
| author | David Lönnhager <david.l@mullvad.net> | 2022-03-09 17:29:43 +0100 |
|---|---|---|
| committer | David Lönnhager <david.l@mullvad.net> | 2022-03-15 09:12:52 +0100 |
| commit | ea725658a7ca446b6616cfbacfca147ffeaa979d (patch) | |
| tree | 6c4272660fb175a2ff58104a50f76416b5d180c3 | |
| parent | 6312a3037ed6e72db3e07c62f2b13ae4b8f0a706 (diff) | |
| download | mullvadvpn-ea725658a7ca446b6616cfbacfca147ffeaa979d.tar.xz mullvadvpn-ea725658a7ca446b6616cfbacfca147ffeaa979d.zip | |
Shut down REST request service when all handles have been dropped
| -rw-r--r-- | mullvad-rpc/src/rest.rs | 37 |
1 files changed, 16 insertions, 21 deletions
diff --git a/mullvad-rpc/src/rest.rs b/mullvad-rpc/src/rest.rs index 6f36a2a096..292bcd0cbb 100644 --- a/mullvad-rpc/src/rest.rs +++ b/mullvad-rpc/src/rest.rs @@ -9,7 +9,6 @@ use crate::{ }; use futures::{ channel::{mpsc, oneshot}, - sink::SinkExt, stream::StreamExt, Stream, TryFutureExt, }; @@ -22,6 +21,7 @@ use mullvad_types::account::AccountToken; use std::{ future::Future, str::FromStr, + sync::{Arc, Weak}, time::{Duration, Instant}, }; use talpid_types::ErrorExt; @@ -114,8 +114,8 @@ pub(crate) struct RequestService< T: Stream<Item = ApiConnectionMode>, F: ApiEndpointUpdateCallback + Send, > { - command_tx: mpsc::Sender<RequestCommand>, - command_rx: mpsc::Receiver<RequestCommand>, + command_tx: Weak<mpsc::UnboundedSender<RequestCommand>>, + command_rx: mpsc::UnboundedReceiver<RequestCommand>, connector_handle: HttpsConnectorWithSniHandle, client: hyper::Client<HttpsConnectorWithSni, hyper::Body>, proxy_config_provider: T, @@ -150,11 +150,13 @@ impl< .await .map(|config| connector_handle.set_connection_mode(config)); - let (command_tx, command_rx) = mpsc::channel(1); + let (command_tx, command_rx) = mpsc::unbounded(); let client = Client::builder().build(connector); + let command_tx = Arc::new(command_tx); + let service = Self { - command_tx, + command_tx: Arc::downgrade(&command_tx), command_rx, connector_handle, client, @@ -163,21 +165,15 @@ impl< address_cache, api_availability, }; - let handle = service.handle(); + let handle = RequestServiceHandle { tx: command_tx }; tokio::spawn(service.into_future()); handle } - fn handle(&self) -> RequestServiceHandle { - RequestServiceHandle { - tx: self.command_tx.clone(), - } - } - async fn process_command(&mut self, command: RequestCommand) { match command { RequestCommand::NewRequest(request, completion_tx) => { - let mut tx = self.command_tx.clone(); + let tx = self.command_tx.upgrade(); let timeout = request.timeout(); let hyper_request = request.into_request(); @@ -201,7 +197,9 @@ impl< if let Err(err) = &response { if err.is_network_error() && !api_availability.get_state().is_offline() { log::error!("{}", err.display_chain_with_msg("HTTP request failed")); - let _ = tx.send(RequestCommand::NextApiConfig).await; + if let Some(tx) = tx { + let _ = tx.unbounded_send(RequestCommand::NextApiConfig); + } } } @@ -242,24 +240,21 @@ impl< #[derive(Clone)] /// A handle to interact with a spawned `RequestService`. pub struct RequestServiceHandle { - tx: mpsc::Sender<RequestCommand>, + tx: Arc<mpsc::UnboundedSender<RequestCommand>>, } impl RequestServiceHandle { /// Resets the corresponding RequestService, dropping all in-flight requests. pub async fn reset(&self) { - let mut tx = self.tx.clone(); - let _ = tx.send(RequestCommand::Reset).await; + let _ = self.tx.unbounded_send(RequestCommand::Reset); } /// Submits a `RestRequest` for exectuion to the request service. pub async fn request(&self, request: RestRequest) -> Result<Response> { let (completion_tx, completion_rx) = oneshot::channel(); - let mut tx = self.tx.clone(); - tx.send(RequestCommand::NewRequest(request, completion_tx)) - .await + self.tx + .unbounded_send(RequestCommand::NewRequest(request, completion_tx)) .map_err(|_| Error::SendError)?; - completion_rx.await.map_err(|_| Error::ReceiveError)? } } |
