summaryrefslogtreecommitdiffhomepage
path: root/mullvad-rpc/src
diff options
context:
space:
mode:
authorDavid Lönnhager <david.l@mullvad.net>2022-03-09 17:29:43 +0100
committerDavid Lönnhager <david.l@mullvad.net>2022-03-15 09:12:52 +0100
commitea725658a7ca446b6616cfbacfca147ffeaa979d (patch)
tree6c4272660fb175a2ff58104a50f76416b5d180c3 /mullvad-rpc/src
parent6312a3037ed6e72db3e07c62f2b13ae4b8f0a706 (diff)
downloadmullvadvpn-ea725658a7ca446b6616cfbacfca147ffeaa979d.tar.xz
mullvadvpn-ea725658a7ca446b6616cfbacfca147ffeaa979d.zip
Shut down REST request service when all handles have been dropped
Diffstat (limited to 'mullvad-rpc/src')
-rw-r--r--mullvad-rpc/src/rest.rs37
1 files changed, 16 insertions, 21 deletions
diff --git a/mullvad-rpc/src/rest.rs b/mullvad-rpc/src/rest.rs
index 6f36a2a096..292bcd0cbb 100644
--- a/mullvad-rpc/src/rest.rs
+++ b/mullvad-rpc/src/rest.rs
@@ -9,7 +9,6 @@ use crate::{
};
use futures::{
channel::{mpsc, oneshot},
- sink::SinkExt,
stream::StreamExt,
Stream, TryFutureExt,
};
@@ -22,6 +21,7 @@ use mullvad_types::account::AccountToken;
use std::{
future::Future,
str::FromStr,
+ sync::{Arc, Weak},
time::{Duration, Instant},
};
use talpid_types::ErrorExt;
@@ -114,8 +114,8 @@ pub(crate) struct RequestService<
T: Stream<Item = ApiConnectionMode>,
F: ApiEndpointUpdateCallback + Send,
> {
- command_tx: mpsc::Sender<RequestCommand>,
- command_rx: mpsc::Receiver<RequestCommand>,
+ command_tx: Weak<mpsc::UnboundedSender<RequestCommand>>,
+ command_rx: mpsc::UnboundedReceiver<RequestCommand>,
connector_handle: HttpsConnectorWithSniHandle,
client: hyper::Client<HttpsConnectorWithSni, hyper::Body>,
proxy_config_provider: T,
@@ -150,11 +150,13 @@ impl<
.await
.map(|config| connector_handle.set_connection_mode(config));
- let (command_tx, command_rx) = mpsc::channel(1);
+ let (command_tx, command_rx) = mpsc::unbounded();
let client = Client::builder().build(connector);
+ let command_tx = Arc::new(command_tx);
+
let service = Self {
- command_tx,
+ command_tx: Arc::downgrade(&command_tx),
command_rx,
connector_handle,
client,
@@ -163,21 +165,15 @@ impl<
address_cache,
api_availability,
};
- let handle = service.handle();
+ let handle = RequestServiceHandle { tx: command_tx };
tokio::spawn(service.into_future());
handle
}
- fn handle(&self) -> RequestServiceHandle {
- RequestServiceHandle {
- tx: self.command_tx.clone(),
- }
- }
-
async fn process_command(&mut self, command: RequestCommand) {
match command {
RequestCommand::NewRequest(request, completion_tx) => {
- let mut tx = self.command_tx.clone();
+ let tx = self.command_tx.upgrade();
let timeout = request.timeout();
let hyper_request = request.into_request();
@@ -201,7 +197,9 @@ impl<
if let Err(err) = &response {
if err.is_network_error() && !api_availability.get_state().is_offline() {
log::error!("{}", err.display_chain_with_msg("HTTP request failed"));
- let _ = tx.send(RequestCommand::NextApiConfig).await;
+ if let Some(tx) = tx {
+ let _ = tx.unbounded_send(RequestCommand::NextApiConfig);
+ }
}
}
@@ -242,24 +240,21 @@ impl<
#[derive(Clone)]
/// A handle to interact with a spawned `RequestService`.
pub struct RequestServiceHandle {
- tx: mpsc::Sender<RequestCommand>,
+ tx: Arc<mpsc::UnboundedSender<RequestCommand>>,
}
impl RequestServiceHandle {
/// Resets the corresponding RequestService, dropping all in-flight requests.
pub async fn reset(&self) {
- let mut tx = self.tx.clone();
- let _ = tx.send(RequestCommand::Reset).await;
+ let _ = self.tx.unbounded_send(RequestCommand::Reset);
}
/// Submits a `RestRequest` for exectuion to the request service.
pub async fn request(&self, request: RestRequest) -> Result<Response> {
let (completion_tx, completion_rx) = oneshot::channel();
- let mut tx = self.tx.clone();
- tx.send(RequestCommand::NewRequest(request, completion_tx))
- .await
+ self.tx
+ .unbounded_send(RequestCommand::NewRequest(request, completion_tx))
.map_err(|_| Error::SendError)?;
-
completion_rx.await.map_err(|_| Error::ReceiveError)?
}
}