diff options
| author | David Lönnhager <david.l@mullvad.net> | 2022-02-04 17:01:30 +0100 |
|---|---|---|
| committer | David Lönnhager <david.l@mullvad.net> | 2022-03-01 15:30:25 +0100 |
| commit | 0e3275fb181f2b17ebde7f8a2b68713f491c73d4 (patch) | |
| tree | 5376677ba84874df1f003e370a887af3b3a63046 | |
| parent | b21c4a0ead284f907f6c50acb840d58642c4f10b (diff) | |
| download | mullvadvpn-0e3275fb181f2b17ebde7f8a2b68713f491c73d4.tar.xz mullvadvpn-0e3275fb181f2b17ebde7f8a2b68713f491c73d4.zip | |
Use tokio::spawn in mullvad-rpc crate
| -rw-r--r-- | mullvad-daemon/src/relays/updater.rs | 3 | ||||
| -rw-r--r-- | mullvad-rpc/src/https_client_with_sni.rs | 5 | ||||
| -rw-r--r-- | mullvad-rpc/src/lib.rs | 7 | ||||
| -rw-r--r-- | mullvad-rpc/src/rest.rs | 30 |
4 files changed, 15 insertions, 30 deletions
diff --git a/mullvad-daemon/src/relays/updater.rs b/mullvad-daemon/src/relays/updater.rs index 0c06beb43f..381e5bd54c 100644 --- a/mullvad-daemon/src/relays/updater.rs +++ b/mullvad-daemon/src/relays/updater.rs @@ -58,7 +58,6 @@ impl RelayListUpdater { api_availability: ApiAvailabilityHandle, ) -> RelayListUpdaterHandle { let (tx, cmd_rx) = mpsc::channel(1); - let service = rpc_handle.service(); let rpc_client = RelayListProxy::new(rpc_handle); let updater = RelayListUpdater { rpc_client, @@ -69,7 +68,7 @@ impl RelayListUpdater { api_availability, }; - service.spawn(updater.run(cmd_rx)); + tokio::spawn(updater.run(cmd_rx)); RelayListUpdaterHandle { tx } } diff --git a/mullvad-rpc/src/https_client_with_sni.rs b/mullvad-rpc/src/https_client_with_sni.rs index 8328ed93d9..238b191491 100644 --- a/mullvad-rpc/src/https_client_with_sni.rs +++ b/mullvad-rpc/src/https_client_with_sni.rs @@ -37,7 +37,7 @@ use talpid_types::ErrorExt; #[cfg(target_os = "android")] use tokio::net::TcpSocket; -use tokio::{net::TcpStream, runtime::Handle, time::timeout}; +use tokio::{net::TcpStream, time::timeout}; const CONNECT_TIMEOUT: Duration = Duration::from_secs(5); @@ -132,7 +132,6 @@ pub type SocketBypassRequest = (RawFd, oneshot::Sender<()>); impl HttpsConnectorWithSni { pub fn new( - handle: Handle, sni_hostname: Option<String>, address_cache: AddressCache, #[cfg(target_os = "android")] socket_bypass_tx: Option<mpsc::Sender<SocketBypassRequest>>, @@ -146,7 +145,7 @@ impl HttpsConnectorWithSni { let inner_copy = inner.clone(); let notify = abort_notify.clone(); - handle.spawn(async move { + tokio::spawn(async move { // Handle requests by `HttpsConnectorWithSniHandle`s while let Some(request) = rx.next().await { let handles = { diff --git a/mullvad-rpc/src/lib.rs b/mullvad-rpc/src/lib.rs index e625dca376..cef6a73cc0 100644 --- a/mullvad-rpc/src/lib.rs +++ b/mullvad-rpc/src/lib.rs @@ -217,8 +217,7 @@ impl MullvadRpcRuntime { new_address_callback: impl (Fn(SocketAddr) -> AcceptedNewEndpoint) + Send + Sync + 'static, #[cfg(target_os = "android")] socket_bypass_tx: Option<mpsc::Sender<SocketBypassRequest>>, ) -> rest::RequestServiceHandle { - let service = rest::RequestService::new( - self.handle.clone(), + let service_handle = rest::RequestService::new( sni_hostname, self.api_availability.handle(), self.address_cache.clone(), @@ -228,9 +227,7 @@ impl MullvadRpcRuntime { socket_bypass_tx, ) .await; - let handle = service.handle(); - self.handle.spawn(service.into_future()); - handle + service_handle } /// Returns a request factory initialized to create requests for the master API diff --git a/mullvad-rpc/src/rest.rs b/mullvad-rpc/src/rest.rs index 3b7d0a0bc7..146d92f395 100644 --- a/mullvad-rpc/src/rest.rs +++ b/mullvad-rpc/src/rest.rs @@ -27,7 +27,6 @@ use std::{ time::{Duration, Instant}, }; use talpid_types::ErrorExt; -use tokio::runtime::Handle; pub use hyper::StatusCode; @@ -98,7 +97,6 @@ pub(crate) struct RequestService< command_rx: mpsc::Receiver<RequestCommand>, connector_handle: HttpsConnectorWithSniHandle, client: hyper::Client<HttpsConnectorWithSni, hyper::Body>, - handle: Handle, next_id: u64, in_flight_requests: BTreeMap<u64, AbortHandle>, proxy_config_provider: T, @@ -115,16 +113,14 @@ impl< { /// Constructs a new request service. pub async fn new( - handle: Handle, sni_hostname: Option<String>, api_availability: ApiAvailabilityHandle, address_cache: AddressCache, mut proxy_config_provider: T, new_address_callback: F, #[cfg(target_os = "android")] socket_bypass_tx: Option<mpsc::Sender<SocketBypassRequest>>, - ) -> RequestService<T> { + ) -> RequestServiceHandle { let (connector, connector_handle) = HttpsConnectorWithSni::new( - handle.clone(), sni_hostname, address_cache.clone(), #[cfg(target_os = "android")] @@ -139,26 +135,26 @@ impl< let (command_tx, command_rx) = mpsc::channel(1); let client = Client::builder().build(connector); - Self { + let service = Self { command_tx, command_rx, connector_handle, client, - handle, in_flight_requests: BTreeMap::new(), next_id: 0, proxy_config_provider, new_address_callback, address_cache, api_availability, - } + }; + let handle = service.handle(); + tokio::spawn(service.into_future()); + handle } - /// Constructs a handle - pub fn handle(&self) -> RequestServiceHandle { + fn handle(&self) -> RequestServiceHandle { RequestServiceHandle { tx: self.command_tx.clone(), - handle: self.handle.clone(), } } @@ -204,7 +200,7 @@ impl< }; self.in_flight_requests.insert(id, abort_handle); - self.handle.spawn(future); + tokio::spawn(future); } RequestCommand::RequestFinished(id) => { self.in_flight_requests.remove(&id); @@ -243,7 +239,7 @@ impl< id } - pub async fn into_future(mut self) { + async fn into_future(mut self) { while let Some(command) = self.command_rx.next().await { self.process_command(command).await; } @@ -255,7 +251,6 @@ impl< /// A handle to interact with a spawned `RequestService`. pub struct RequestServiceHandle { tx: mpsc::Sender<RequestCommand>, - handle: Handle, } impl RequestServiceHandle { @@ -278,11 +273,6 @@ impl RequestServiceHandle { completion_rx.await.map_err(|_| Error::ReceiveError)? } - - /// Spawns a future on the RPC runtime. - pub fn spawn<T: Send + 'static>(&self, future: impl Future<Output = T> + Send + 'static) { - let _ = self.handle.spawn(future); - } } #[derive(Debug)] @@ -603,7 +593,7 @@ impl MullvadRestHandle { let handle = self.clone(); let availability = self.availability.clone(); - self.service.spawn(async move { + tokio::spawn(async move { // always start the fetch after 15 minutes let api_proxy = crate::ApiProxy::new(handle); let mut next_check = Instant::now() + API_IP_CHECK_DELAY; |
