summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorDavid Lönnhager <david.l@mullvad.net>2022-02-04 17:01:30 +0100
committerDavid Lönnhager <david.l@mullvad.net>2022-03-01 15:30:25 +0100
commit0e3275fb181f2b17ebde7f8a2b68713f491c73d4 (patch)
tree5376677ba84874df1f003e370a887af3b3a63046
parentb21c4a0ead284f907f6c50acb840d58642c4f10b (diff)
downloadmullvadvpn-0e3275fb181f2b17ebde7f8a2b68713f491c73d4.tar.xz
mullvadvpn-0e3275fb181f2b17ebde7f8a2b68713f491c73d4.zip
Use tokio::spawn in mullvad-rpc crate
-rw-r--r--mullvad-daemon/src/relays/updater.rs3
-rw-r--r--mullvad-rpc/src/https_client_with_sni.rs5
-rw-r--r--mullvad-rpc/src/lib.rs7
-rw-r--r--mullvad-rpc/src/rest.rs30
4 files changed, 15 insertions, 30 deletions
diff --git a/mullvad-daemon/src/relays/updater.rs b/mullvad-daemon/src/relays/updater.rs
index 0c06beb43f..381e5bd54c 100644
--- a/mullvad-daemon/src/relays/updater.rs
+++ b/mullvad-daemon/src/relays/updater.rs
@@ -58,7 +58,6 @@ impl RelayListUpdater {
api_availability: ApiAvailabilityHandle,
) -> RelayListUpdaterHandle {
let (tx, cmd_rx) = mpsc::channel(1);
- let service = rpc_handle.service();
let rpc_client = RelayListProxy::new(rpc_handle);
let updater = RelayListUpdater {
rpc_client,
@@ -69,7 +68,7 @@ impl RelayListUpdater {
api_availability,
};
- service.spawn(updater.run(cmd_rx));
+ tokio::spawn(updater.run(cmd_rx));
RelayListUpdaterHandle { tx }
}
diff --git a/mullvad-rpc/src/https_client_with_sni.rs b/mullvad-rpc/src/https_client_with_sni.rs
index 8328ed93d9..238b191491 100644
--- a/mullvad-rpc/src/https_client_with_sni.rs
+++ b/mullvad-rpc/src/https_client_with_sni.rs
@@ -37,7 +37,7 @@ use talpid_types::ErrorExt;
#[cfg(target_os = "android")]
use tokio::net::TcpSocket;
-use tokio::{net::TcpStream, runtime::Handle, time::timeout};
+use tokio::{net::TcpStream, time::timeout};
const CONNECT_TIMEOUT: Duration = Duration::from_secs(5);
@@ -132,7 +132,6 @@ pub type SocketBypassRequest = (RawFd, oneshot::Sender<()>);
impl HttpsConnectorWithSni {
pub fn new(
- handle: Handle,
sni_hostname: Option<String>,
address_cache: AddressCache,
#[cfg(target_os = "android")] socket_bypass_tx: Option<mpsc::Sender<SocketBypassRequest>>,
@@ -146,7 +145,7 @@ impl HttpsConnectorWithSni {
let inner_copy = inner.clone();
let notify = abort_notify.clone();
- handle.spawn(async move {
+ tokio::spawn(async move {
// Handle requests by `HttpsConnectorWithSniHandle`s
while let Some(request) = rx.next().await {
let handles = {
diff --git a/mullvad-rpc/src/lib.rs b/mullvad-rpc/src/lib.rs
index e625dca376..cef6a73cc0 100644
--- a/mullvad-rpc/src/lib.rs
+++ b/mullvad-rpc/src/lib.rs
@@ -217,8 +217,7 @@ impl MullvadRpcRuntime {
new_address_callback: impl (Fn(SocketAddr) -> AcceptedNewEndpoint) + Send + Sync + 'static,
#[cfg(target_os = "android")] socket_bypass_tx: Option<mpsc::Sender<SocketBypassRequest>>,
) -> rest::RequestServiceHandle {
- let service = rest::RequestService::new(
- self.handle.clone(),
+ let service_handle = rest::RequestService::new(
sni_hostname,
self.api_availability.handle(),
self.address_cache.clone(),
@@ -228,9 +227,7 @@ impl MullvadRpcRuntime {
socket_bypass_tx,
)
.await;
- let handle = service.handle();
- self.handle.spawn(service.into_future());
- handle
+ service_handle
}
/// Returns a request factory initialized to create requests for the master API
diff --git a/mullvad-rpc/src/rest.rs b/mullvad-rpc/src/rest.rs
index 3b7d0a0bc7..146d92f395 100644
--- a/mullvad-rpc/src/rest.rs
+++ b/mullvad-rpc/src/rest.rs
@@ -27,7 +27,6 @@ use std::{
time::{Duration, Instant},
};
use talpid_types::ErrorExt;
-use tokio::runtime::Handle;
pub use hyper::StatusCode;
@@ -98,7 +97,6 @@ pub(crate) struct RequestService<
command_rx: mpsc::Receiver<RequestCommand>,
connector_handle: HttpsConnectorWithSniHandle,
client: hyper::Client<HttpsConnectorWithSni, hyper::Body>,
- handle: Handle,
next_id: u64,
in_flight_requests: BTreeMap<u64, AbortHandle>,
proxy_config_provider: T,
@@ -115,16 +113,14 @@ impl<
{
/// Constructs a new request service.
pub async fn new(
- handle: Handle,
sni_hostname: Option<String>,
api_availability: ApiAvailabilityHandle,
address_cache: AddressCache,
mut proxy_config_provider: T,
new_address_callback: F,
#[cfg(target_os = "android")] socket_bypass_tx: Option<mpsc::Sender<SocketBypassRequest>>,
- ) -> RequestService<T> {
+ ) -> RequestServiceHandle {
let (connector, connector_handle) = HttpsConnectorWithSni::new(
- handle.clone(),
sni_hostname,
address_cache.clone(),
#[cfg(target_os = "android")]
@@ -139,26 +135,26 @@ impl<
let (command_tx, command_rx) = mpsc::channel(1);
let client = Client::builder().build(connector);
- Self {
+ let service = Self {
command_tx,
command_rx,
connector_handle,
client,
- handle,
in_flight_requests: BTreeMap::new(),
next_id: 0,
proxy_config_provider,
new_address_callback,
address_cache,
api_availability,
- }
+ };
+ let handle = service.handle();
+ tokio::spawn(service.into_future());
+ handle
}
- /// Constructs a handle
- pub fn handle(&self) -> RequestServiceHandle {
+ fn handle(&self) -> RequestServiceHandle {
RequestServiceHandle {
tx: self.command_tx.clone(),
- handle: self.handle.clone(),
}
}
@@ -204,7 +200,7 @@ impl<
};
self.in_flight_requests.insert(id, abort_handle);
- self.handle.spawn(future);
+ tokio::spawn(future);
}
RequestCommand::RequestFinished(id) => {
self.in_flight_requests.remove(&id);
@@ -243,7 +239,7 @@ impl<
id
}
- pub async fn into_future(mut self) {
+ async fn into_future(mut self) {
while let Some(command) = self.command_rx.next().await {
self.process_command(command).await;
}
@@ -255,7 +251,6 @@ impl<
/// A handle to interact with a spawned `RequestService`.
pub struct RequestServiceHandle {
tx: mpsc::Sender<RequestCommand>,
- handle: Handle,
}
impl RequestServiceHandle {
@@ -278,11 +273,6 @@ impl RequestServiceHandle {
completion_rx.await.map_err(|_| Error::ReceiveError)?
}
-
- /// Spawns a future on the RPC runtime.
- pub fn spawn<T: Send + 'static>(&self, future: impl Future<Output = T> + Send + 'static) {
- let _ = self.handle.spawn(future);
- }
}
#[derive(Debug)]
@@ -603,7 +593,7 @@ impl MullvadRestHandle {
let handle = self.clone();
let availability = self.availability.clone();
- self.service.spawn(async move {
+ tokio::spawn(async move {
// always start the fetch after 15 minutes
let api_proxy = crate::ApiProxy::new(handle);
let mut next_check = Instant::now() + API_IP_CHECK_DELAY;