diff options
| author | Emīls <emils@mullvad.net> | 2020-07-21 10:51:59 +0100 |
|---|---|---|
| committer | Emīls <emils@mullvad.net> | 2020-07-21 10:51:59 +0100 |
| commit | 5dd98f4417e7c69f32f2368086926989526d09ff (patch) | |
| tree | b3629b44a533b190f922214dea62cca4cfcc428e | |
| parent | 4c40147f34fed91410b887de97794fbb1a3bc90f (diff) | |
| parent | f25ca6da98cf7180c0659f70a4c8bc92811821b1 (diff) | |
| download | mullvadvpn-5dd98f4417e7c69f32f2368086926989526d09ff.tar.xz mullvadvpn-5dd98f4417e7c69f32f2368086926989526d09ff.zip | |
Merge branch 'move-cancel-hanlde-to-talpid-core'
| -rw-r--r-- | mullvad-daemon/src/lib.rs | 14 | ||||
| -rw-r--r-- | mullvad-daemon/src/wireguard.rs | 23 | ||||
| -rw-r--r-- | mullvad-rpc/src/rest.rs | 97 |
3 files changed, 32 insertions, 102 deletions
diff --git a/mullvad-daemon/src/lib.rs b/mullvad-daemon/src/lib.rs index f7755fbf75..997770f8de 100644 --- a/mullvad-daemon/src/lib.rs +++ b/mullvad-daemon/src/lib.rs @@ -18,6 +18,7 @@ mod settings; pub mod version; mod version_check; +use futures::future::{abortable, AbortHandle}; use futures01::{ future::{self, Executor}, stream::Wait, @@ -28,10 +29,7 @@ use futures01::{ Future, Stream, }; use log::{debug, error, info, warn}; -use mullvad_rpc::{ - rest::{CancelHandle, Cancellable}, - AccountsProxy, -}; +use mullvad_rpc::AccountsProxy; use mullvad_types::{ account::{AccountData, AccountToken, VoucherSubmission}, endpoint::MullvadEndpoint, @@ -464,7 +462,7 @@ pub struct Daemon<L: EventListener> { exclude_pids: split_tunnel::PidManager, rx: Wait<UnboundedReceiver<InternalDaemonEvent>>, tx: DaemonEventSender, - reconnection_job: Option<CancelHandle>, + reconnection_job: Option<AbortHandle>, event_listener: L, settings: SettingsPersister, account_history: account_history::AccountHistory, @@ -950,19 +948,19 @@ where fn schedule_reconnect(&mut self, delay: Duration) { let tunnel_command_tx = self.tx.to_specialized_sender(); - let (future, cancel_handle) = Cancellable::new(Box::pin(async move { + let (future, abort_handle) = abortable(Box::pin(async move { tokio02::time::delay_for(delay).await; log::debug!("Attempting to reconnect"); let _ = tunnel_command_tx.send(DaemonCommand::Reconnect); })); self.spawn_future(future); - self.reconnection_job = Some(cancel_handle); + self.reconnection_job = Some(abort_handle); } fn unschedule_reconnect(&mut self) { if let Some(job) = self.reconnection_job.take() { - job.cancel(); + job.abort(); } } diff --git a/mullvad-daemon/src/wireguard.rs b/mullvad-daemon/src/wireguard.rs index 923fc4fb34..2ebd5fae50 100644 --- a/mullvad-daemon/src/wireguard.rs +++ b/mullvad-daemon/src/wireguard.rs @@ -1,6 +1,6 @@ use crate::{account_history::AccountHistory, DaemonEventSender, InternalDaemonEvent}; use chrono::offset::Utc; -use mullvad_rpc::rest::{CancelHandle, Cancellable, Error as RestError, MullvadRestHandle}; +use mullvad_rpc::rest::{Error as RestError, MullvadRestHandle}; use mullvad_types::account::AccountToken; pub use mullvad_types::wireguard::*; use std::{ @@ -9,6 +9,7 @@ use std::{ time::{Duration, Instant}, }; +use futures::future::{abortable, AbortHandle}; use talpid_core::{ future_retry::{retry_future_with_backoff, ExponentialBackoff, Jittered}, mpsc::Sender, @@ -43,9 +44,9 @@ pub type Result<T> = std::result::Result<T, Error>; pub struct KeyManager { daemon_tx: DaemonEventSender, http_handle: MullvadRestHandle, - current_job: Option<CancelHandle>, + current_job: Option<AbortHandle>, - abort_scheduler_tx: Option<CancelHandle>, + abort_scheduler_tx: Option<AbortHandle>, auto_rotation_interval: Duration, } @@ -98,7 +99,7 @@ impl KeyManager { /// Stop current key generation pub fn reset(&mut self) { if let Some(job) = self.current_job.take() { - job.cancel() + job.abort() } } @@ -203,7 +204,7 @@ impl KeyManager { retry_future_with_backoff(future_generator, should_retry, retry_strategy); - let (cancellable_upload, cancel_handle) = Cancellable::new(Box::pin(upload_future)); + let (cancellable_upload, abort_handle) = abortable(Box::pin(upload_future)); let daemon_tx = self.daemon_tx.clone(); let future = async move { match cancellable_upload.await { @@ -222,7 +223,7 @@ impl KeyManager { self.http_handle.service().spawn(Box::pin(future)); - self.current_job = Some(cancel_handle); + self.current_job = Some(abort_handle); } @@ -391,16 +392,16 @@ impl KeyManager { self.auto_rotation_interval.as_secs(), account_token, ); - let (cancellable, cancel_handle) = Cancellable::new(Box::pin(fut)); + let (request, abort_handle) = abortable(Box::pin(fut)); - self.http_handle.service().spawn(cancellable); - self.abort_scheduler_tx = Some(cancel_handle); + self.http_handle.service().spawn(request); + self.abort_scheduler_tx = Some(abort_handle); } fn stop_automatic_rotation(&mut self) { - if let Some(cancel_handle) = self.abort_scheduler_tx.take() { + if let Some(abort_handle) = self.abort_scheduler_tx.take() { log::info!("Stopping automatic key rotation"); - cancel_handle.cancel(); + abort_handle.abort(); } } } diff --git a/mullvad-rpc/src/rest.rs b/mullvad-rpc/src/rest.rs index c2677c7212..205f350ea5 100644 --- a/mullvad-rpc/src/rest.rs +++ b/mullvad-rpc/src/rest.rs @@ -1,8 +1,9 @@ use futures::{ channel::{mpsc, oneshot}, + future::{abortable, AbortHandle, Aborted}, sink::SinkExt, stream::StreamExt, - FutureExt, TryFutureExt, + TryFutureExt, }; use futures01::Future as OldFuture; use hyper::{ @@ -10,16 +11,7 @@ use hyper::{ header::{self, HeaderValue}, Method, Uri, }; -use std::{ - collections::BTreeMap, - future::Future, - mem, - net::IpAddr, - pin::Pin, - str::FromStr, - task::{Context, Poll}, - time::Duration, -}; +use std::{collections::BTreeMap, future::Future, mem, net::IpAddr, str::FromStr, time::Duration}; use tokio::runtime::Handle; pub use hyper::StatusCode; @@ -35,7 +27,7 @@ const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10); #[derive(err_derive::Error, Debug)] pub enum Error { #[error(display = "Request cancelled")] - Cancelled(CancelErr), + Aborted(Aborted), #[error(display = "Hyper error")] HyperError(#[error(source)] hyper::Error), @@ -76,7 +68,7 @@ pub(crate) struct RequestService<C> { connector: C, handle: Handle, next_id: u64, - in_flight_requests: BTreeMap<u64, CancelHandle>, + in_flight_requests: BTreeMap<u64, AbortHandle>, } impl<C: Connect + Clone + Send + Sync + 'static> RequestService<C> { @@ -115,19 +107,17 @@ impl<C: Connect + Clone + Send + Sync + 'static> RequestService<C> { let mut tx = self.command_tx.clone(); let timeout = request.timeout(); - let (request_future, cancel_handle) = Cancellable::new( + let (request_future, abort_handle) = abortable( self.client .request(request.into_request()) .map_err(Error::from), ); let future = async move { - let response = tokio::time::timeout( - timeout, - request_future.into_future().map_err(Error::Cancelled), - ) - .await - .map_err(Error::TimeoutError); + let response = + tokio::time::timeout(timeout, request_future.map_err(Error::Aborted)) + .await + .map_err(Error::TimeoutError); let response = flatten_result(flatten_result(response)); @@ -141,7 +131,7 @@ impl<C: Connect + Clone + Send + Sync + 'static> RequestService<C> { self.handle.spawn(future); - self.in_flight_requests.insert(id, cancel_handle); + self.in_flight_requests.insert(id, abort_handle); } RequestCommand::RequestFinished(id) => { @@ -156,8 +146,8 @@ impl<C: Connect + Clone + Send + Sync + 'static> RequestService<C> { fn reset(&mut self) { let old_requests = mem::replace(&mut self.in_flight_requests, BTreeMap::new()); - for (_, cancel_handle) in old_requests.into_iter() { - cancel_handle.cancel(); + for (_, abort_handle) in old_requests.into_iter() { + abort_handle.abort(); } let _ = mem::replace(&mut self.client, Self::new_client(self.connector.clone())); self.next_id = 0; @@ -219,7 +209,7 @@ impl RequestServiceHandle { }); - rx.map_err(|_| Error::Cancelled(CancelErr(()))).flatten() + rx.map_err(|_| Error::ReceiveError).flatten() } /// Spawns a future on the RPC runtime. @@ -425,65 +415,6 @@ impl RequestFactory { } -#[derive(Debug)] -pub struct CancelErr(()); - -pub struct Cancellable<F: Future> { - rx: oneshot::Receiver<()>, - f: F, -} - -pub struct CancelHandle { - tx: oneshot::Sender<()>, -} - -impl CancelHandle { - pub fn cancel(self) { - let _ = self.tx.send(()); - } -} - - -impl<F> Cancellable<F> -where - F: Future, -{ - pub fn new(f: F) -> (Self, CancelHandle) { - let (tx, rx) = oneshot::channel(); - (Self { f, rx }, CancelHandle { tx }) - } - - async fn into_future(self) -> std::result::Result<F::Output, CancelErr> { - futures::select! { - _cancelled = self.rx.fuse() => { - Err(CancelErr(())) - }, - value = self.f.fuse() => { - Ok(value) - } - } - } -} - - -impl<F: Future<Output = T> + Unpin, T: Unpin> Future for Cancellable<F> { - type Output = std::result::Result<T, CancelErr>; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { - let inner = self.get_mut(); - - if let Poll::Ready(ready) = inner.f.poll_unpin(cx) { - return Poll::Ready(Ok(ready)); - } - - if let Poll::Ready(_) = inner.rx.poll_unpin(cx) { - return Poll::Ready(Err(CancelErr(()))); - } - - Poll::Pending - } -} - pub fn get_request<T: serde::de::DeserializeOwned>( factory: &RequestFactory, service: RequestServiceHandle, |
