summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorEmīls <emils@mullvad.net>2020-07-21 10:51:59 +0100
committerEmīls <emils@mullvad.net>2020-07-21 10:51:59 +0100
commit5dd98f4417e7c69f32f2368086926989526d09ff (patch)
treeb3629b44a533b190f922214dea62cca4cfcc428e
parent4c40147f34fed91410b887de97794fbb1a3bc90f (diff)
parentf25ca6da98cf7180c0659f70a4c8bc92811821b1 (diff)
downloadmullvadvpn-5dd98f4417e7c69f32f2368086926989526d09ff.tar.xz
mullvadvpn-5dd98f4417e7c69f32f2368086926989526d09ff.zip
Merge branch 'move-cancel-hanlde-to-talpid-core'
-rw-r--r--mullvad-daemon/src/lib.rs14
-rw-r--r--mullvad-daemon/src/wireguard.rs23
-rw-r--r--mullvad-rpc/src/rest.rs97
3 files changed, 32 insertions, 102 deletions
diff --git a/mullvad-daemon/src/lib.rs b/mullvad-daemon/src/lib.rs
index f7755fbf75..997770f8de 100644
--- a/mullvad-daemon/src/lib.rs
+++ b/mullvad-daemon/src/lib.rs
@@ -18,6 +18,7 @@ mod settings;
pub mod version;
mod version_check;
+use futures::future::{abortable, AbortHandle};
use futures01::{
future::{self, Executor},
stream::Wait,
@@ -28,10 +29,7 @@ use futures01::{
Future, Stream,
};
use log::{debug, error, info, warn};
-use mullvad_rpc::{
- rest::{CancelHandle, Cancellable},
- AccountsProxy,
-};
+use mullvad_rpc::AccountsProxy;
use mullvad_types::{
account::{AccountData, AccountToken, VoucherSubmission},
endpoint::MullvadEndpoint,
@@ -464,7 +462,7 @@ pub struct Daemon<L: EventListener> {
exclude_pids: split_tunnel::PidManager,
rx: Wait<UnboundedReceiver<InternalDaemonEvent>>,
tx: DaemonEventSender,
- reconnection_job: Option<CancelHandle>,
+ reconnection_job: Option<AbortHandle>,
event_listener: L,
settings: SettingsPersister,
account_history: account_history::AccountHistory,
@@ -950,19 +948,19 @@ where
fn schedule_reconnect(&mut self, delay: Duration) {
let tunnel_command_tx = self.tx.to_specialized_sender();
- let (future, cancel_handle) = Cancellable::new(Box::pin(async move {
+ let (future, abort_handle) = abortable(Box::pin(async move {
tokio02::time::delay_for(delay).await;
log::debug!("Attempting to reconnect");
let _ = tunnel_command_tx.send(DaemonCommand::Reconnect);
}));
self.spawn_future(future);
- self.reconnection_job = Some(cancel_handle);
+ self.reconnection_job = Some(abort_handle);
}
fn unschedule_reconnect(&mut self) {
if let Some(job) = self.reconnection_job.take() {
- job.cancel();
+ job.abort();
}
}
diff --git a/mullvad-daemon/src/wireguard.rs b/mullvad-daemon/src/wireguard.rs
index 923fc4fb34..2ebd5fae50 100644
--- a/mullvad-daemon/src/wireguard.rs
+++ b/mullvad-daemon/src/wireguard.rs
@@ -1,6 +1,6 @@
use crate::{account_history::AccountHistory, DaemonEventSender, InternalDaemonEvent};
use chrono::offset::Utc;
-use mullvad_rpc::rest::{CancelHandle, Cancellable, Error as RestError, MullvadRestHandle};
+use mullvad_rpc::rest::{Error as RestError, MullvadRestHandle};
use mullvad_types::account::AccountToken;
pub use mullvad_types::wireguard::*;
use std::{
@@ -9,6 +9,7 @@ use std::{
time::{Duration, Instant},
};
+use futures::future::{abortable, AbortHandle};
use talpid_core::{
future_retry::{retry_future_with_backoff, ExponentialBackoff, Jittered},
mpsc::Sender,
@@ -43,9 +44,9 @@ pub type Result<T> = std::result::Result<T, Error>;
pub struct KeyManager {
daemon_tx: DaemonEventSender,
http_handle: MullvadRestHandle,
- current_job: Option<CancelHandle>,
+ current_job: Option<AbortHandle>,
- abort_scheduler_tx: Option<CancelHandle>,
+ abort_scheduler_tx: Option<AbortHandle>,
auto_rotation_interval: Duration,
}
@@ -98,7 +99,7 @@ impl KeyManager {
/// Stop current key generation
pub fn reset(&mut self) {
if let Some(job) = self.current_job.take() {
- job.cancel()
+ job.abort()
}
}
@@ -203,7 +204,7 @@ impl KeyManager {
retry_future_with_backoff(future_generator, should_retry, retry_strategy);
- let (cancellable_upload, cancel_handle) = Cancellable::new(Box::pin(upload_future));
+ let (cancellable_upload, abort_handle) = abortable(Box::pin(upload_future));
let daemon_tx = self.daemon_tx.clone();
let future = async move {
match cancellable_upload.await {
@@ -222,7 +223,7 @@ impl KeyManager {
self.http_handle.service().spawn(Box::pin(future));
- self.current_job = Some(cancel_handle);
+ self.current_job = Some(abort_handle);
}
@@ -391,16 +392,16 @@ impl KeyManager {
self.auto_rotation_interval.as_secs(),
account_token,
);
- let (cancellable, cancel_handle) = Cancellable::new(Box::pin(fut));
+ let (request, abort_handle) = abortable(Box::pin(fut));
- self.http_handle.service().spawn(cancellable);
- self.abort_scheduler_tx = Some(cancel_handle);
+ self.http_handle.service().spawn(request);
+ self.abort_scheduler_tx = Some(abort_handle);
}
fn stop_automatic_rotation(&mut self) {
- if let Some(cancel_handle) = self.abort_scheduler_tx.take() {
+ if let Some(abort_handle) = self.abort_scheduler_tx.take() {
log::info!("Stopping automatic key rotation");
- cancel_handle.cancel();
+ abort_handle.abort();
}
}
}
diff --git a/mullvad-rpc/src/rest.rs b/mullvad-rpc/src/rest.rs
index c2677c7212..205f350ea5 100644
--- a/mullvad-rpc/src/rest.rs
+++ b/mullvad-rpc/src/rest.rs
@@ -1,8 +1,9 @@
use futures::{
channel::{mpsc, oneshot},
+ future::{abortable, AbortHandle, Aborted},
sink::SinkExt,
stream::StreamExt,
- FutureExt, TryFutureExt,
+ TryFutureExt,
};
use futures01::Future as OldFuture;
use hyper::{
@@ -10,16 +11,7 @@ use hyper::{
header::{self, HeaderValue},
Method, Uri,
};
-use std::{
- collections::BTreeMap,
- future::Future,
- mem,
- net::IpAddr,
- pin::Pin,
- str::FromStr,
- task::{Context, Poll},
- time::Duration,
-};
+use std::{collections::BTreeMap, future::Future, mem, net::IpAddr, str::FromStr, time::Duration};
use tokio::runtime::Handle;
pub use hyper::StatusCode;
@@ -35,7 +27,7 @@ const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10);
#[derive(err_derive::Error, Debug)]
pub enum Error {
#[error(display = "Request cancelled")]
- Cancelled(CancelErr),
+ Aborted(Aborted),
#[error(display = "Hyper error")]
HyperError(#[error(source)] hyper::Error),
@@ -76,7 +68,7 @@ pub(crate) struct RequestService<C> {
connector: C,
handle: Handle,
next_id: u64,
- in_flight_requests: BTreeMap<u64, CancelHandle>,
+ in_flight_requests: BTreeMap<u64, AbortHandle>,
}
impl<C: Connect + Clone + Send + Sync + 'static> RequestService<C> {
@@ -115,19 +107,17 @@ impl<C: Connect + Clone + Send + Sync + 'static> RequestService<C> {
let mut tx = self.command_tx.clone();
let timeout = request.timeout();
- let (request_future, cancel_handle) = Cancellable::new(
+ let (request_future, abort_handle) = abortable(
self.client
.request(request.into_request())
.map_err(Error::from),
);
let future = async move {
- let response = tokio::time::timeout(
- timeout,
- request_future.into_future().map_err(Error::Cancelled),
- )
- .await
- .map_err(Error::TimeoutError);
+ let response =
+ tokio::time::timeout(timeout, request_future.map_err(Error::Aborted))
+ .await
+ .map_err(Error::TimeoutError);
let response = flatten_result(flatten_result(response));
@@ -141,7 +131,7 @@ impl<C: Connect + Clone + Send + Sync + 'static> RequestService<C> {
self.handle.spawn(future);
- self.in_flight_requests.insert(id, cancel_handle);
+ self.in_flight_requests.insert(id, abort_handle);
}
RequestCommand::RequestFinished(id) => {
@@ -156,8 +146,8 @@ impl<C: Connect + Clone + Send + Sync + 'static> RequestService<C> {
fn reset(&mut self) {
let old_requests = mem::replace(&mut self.in_flight_requests, BTreeMap::new());
- for (_, cancel_handle) in old_requests.into_iter() {
- cancel_handle.cancel();
+ for (_, abort_handle) in old_requests.into_iter() {
+ abort_handle.abort();
}
let _ = mem::replace(&mut self.client, Self::new_client(self.connector.clone()));
self.next_id = 0;
@@ -219,7 +209,7 @@ impl RequestServiceHandle {
});
- rx.map_err(|_| Error::Cancelled(CancelErr(()))).flatten()
+ rx.map_err(|_| Error::ReceiveError).flatten()
}
/// Spawns a future on the RPC runtime.
@@ -425,65 +415,6 @@ impl RequestFactory {
}
-#[derive(Debug)]
-pub struct CancelErr(());
-
-pub struct Cancellable<F: Future> {
- rx: oneshot::Receiver<()>,
- f: F,
-}
-
-pub struct CancelHandle {
- tx: oneshot::Sender<()>,
-}
-
-impl CancelHandle {
- pub fn cancel(self) {
- let _ = self.tx.send(());
- }
-}
-
-
-impl<F> Cancellable<F>
-where
- F: Future,
-{
- pub fn new(f: F) -> (Self, CancelHandle) {
- let (tx, rx) = oneshot::channel();
- (Self { f, rx }, CancelHandle { tx })
- }
-
- async fn into_future(self) -> std::result::Result<F::Output, CancelErr> {
- futures::select! {
- _cancelled = self.rx.fuse() => {
- Err(CancelErr(()))
- },
- value = self.f.fuse() => {
- Ok(value)
- }
- }
- }
-}
-
-
-impl<F: Future<Output = T> + Unpin, T: Unpin> Future for Cancellable<F> {
- type Output = std::result::Result<T, CancelErr>;
-
- fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
- let inner = self.get_mut();
-
- if let Poll::Ready(ready) = inner.f.poll_unpin(cx) {
- return Poll::Ready(Ok(ready));
- }
-
- if let Poll::Ready(_) = inner.rx.poll_unpin(cx) {
- return Poll::Ready(Err(CancelErr(())));
- }
-
- Poll::Pending
- }
-}
-
pub fn get_request<T: serde::de::DeserializeOwned>(
factory: &RequestFactory,
service: RequestServiceHandle,