diff options
| -rw-r--r-- | Cargo.lock | 1 | ||||
| -rw-r--r-- | mullvad-daemon/src/lib.rs | 6 | ||||
| -rw-r--r-- | mullvad-daemon/src/wireguard.rs | 3 | ||||
| -rw-r--r-- | mullvad-rpc/Cargo.toml | 1 | ||||
| -rw-r--r-- | mullvad-rpc/src/rest.rs | 85 | ||||
| -rw-r--r-- | talpid-core/src/lib.rs | 3 |
6 files changed, 17 insertions, 82 deletions
diff --git a/Cargo.lock b/Cargo.lock index e9415bc4b6..2fcdf232d9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1527,6 +1527,7 @@ dependencies = [ "regex 1.3.1 (registry+https://github.com/rust-lang/crates.io-index)", "serde 1.0.101 (registry+https://github.com/rust-lang/crates.io-index)", "serde_json 1.0.41 (registry+https://github.com/rust-lang/crates.io-index)", + "talpid-core 0.1.0", "talpid-types 0.1.0", "tempfile 3.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "tokio 0.2.21 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/mullvad-daemon/src/lib.rs b/mullvad-daemon/src/lib.rs index f7755fbf75..3e5ba1dda9 100644 --- a/mullvad-daemon/src/lib.rs +++ b/mullvad-daemon/src/lib.rs @@ -28,10 +28,7 @@ use futures01::{ Future, Stream, }; use log::{debug, error, info, warn}; -use mullvad_rpc::{ - rest::{CancelHandle, Cancellable}, - AccountsProxy, -}; +use mullvad_rpc::AccountsProxy; use mullvad_types::{ account::{AccountData, AccountToken, VoucherSubmission}, endpoint::MullvadEndpoint, @@ -61,6 +58,7 @@ use std::{ #[cfg(target_os = "linux")] use talpid_core::split_tunnel; use talpid_core::{ + future_cancel::{CancelHandle, Cancellable}, mpsc::Sender, tunnel_state_machine::{self, TunnelCommand, TunnelParametersGenerator}, }; diff --git a/mullvad-daemon/src/wireguard.rs b/mullvad-daemon/src/wireguard.rs index 923fc4fb34..9206f97be2 100644 --- a/mullvad-daemon/src/wireguard.rs +++ b/mullvad-daemon/src/wireguard.rs @@ -1,6 +1,6 @@ use crate::{account_history::AccountHistory, DaemonEventSender, InternalDaemonEvent}; use chrono::offset::Utc; -use mullvad_rpc::rest::{CancelHandle, Cancellable, Error as RestError, MullvadRestHandle}; +use mullvad_rpc::rest::{Error as RestError, MullvadRestHandle}; use mullvad_types::account::AccountToken; pub use mullvad_types::wireguard::*; use std::{ @@ -10,6 +10,7 @@ use std::{ }; use talpid_core::{ + future_cancel::{CancelHandle, Cancellable}, future_retry::{retry_future_with_backoff, ExponentialBackoff, Jittered}, mpsc::Sender, }; diff --git a/mullvad-rpc/Cargo.toml b/mullvad-rpc/Cargo.toml index 3c5a29a617..ad82fd736d 100644 --- a/mullvad-rpc/Cargo.toml +++ b/mullvad-rpc/Cargo.toml @@ -28,6 +28,7 @@ webpki = { version = "0.21", features = [] } mullvad-types = { path = "../mullvad-types" } talpid-types = { path = "../talpid-types" } +talpid-core = { path = "../talpid-core" } [dev-dependencies] filetime = "0.2" diff --git a/mullvad-rpc/src/rest.rs b/mullvad-rpc/src/rest.rs index c2677c7212..4a8220749b 100644 --- a/mullvad-rpc/src/rest.rs +++ b/mullvad-rpc/src/rest.rs @@ -2,7 +2,7 @@ use futures::{ channel::{mpsc, oneshot}, sink::SinkExt, stream::StreamExt, - FutureExt, TryFutureExt, + TryFutureExt, }; use futures01::Future as OldFuture; use hyper::{ @@ -10,16 +10,8 @@ use hyper::{ header::{self, HeaderValue}, Method, Uri, }; -use std::{ - collections::BTreeMap, - future::Future, - mem, - net::IpAddr, - pin::Pin, - str::FromStr, - task::{Context, Poll}, - time::Duration, -}; +use std::{collections::BTreeMap, future::Future, mem, net::IpAddr, str::FromStr, time::Duration}; +use talpid_core::future_cancel::{CancelErr, CancelHandle, Cancellable}; use tokio::runtime::Handle; pub use hyper::StatusCode; @@ -122,12 +114,10 @@ impl<C: Connect + Clone + Send + Sync + 'static> RequestService<C> { ); let future = async move { - let response = tokio::time::timeout( - timeout, - request_future.into_future().map_err(Error::Cancelled), - ) - .await - .map_err(Error::TimeoutError); + let response = + tokio::time::timeout(timeout, request_future.map_err(Error::Cancelled)) + .await + .map_err(Error::TimeoutError); let response = flatten_result(flatten_result(response)); @@ -219,7 +209,7 @@ impl RequestServiceHandle { }); - rx.map_err(|_| Error::Cancelled(CancelErr(()))).flatten() + rx.map_err(|_| Error::ReceiveError).flatten() } /// Spawns a future on the RPC runtime. @@ -425,65 +415,6 @@ impl RequestFactory { } -#[derive(Debug)] -pub struct CancelErr(()); - -pub struct Cancellable<F: Future> { - rx: oneshot::Receiver<()>, - f: F, -} - -pub struct CancelHandle { - tx: oneshot::Sender<()>, -} - -impl CancelHandle { - pub fn cancel(self) { - let _ = self.tx.send(()); - } -} - - -impl<F> Cancellable<F> -where - F: Future, -{ - pub fn new(f: F) -> (Self, CancelHandle) { - let (tx, rx) = oneshot::channel(); - (Self { f, rx }, CancelHandle { tx }) - } - - async fn into_future(self) -> std::result::Result<F::Output, CancelErr> { - futures::select! { - _cancelled = self.rx.fuse() => { - Err(CancelErr(())) - }, - value = self.f.fuse() => { - Ok(value) - } - } - } -} - - -impl<F: Future<Output = T> + Unpin, T: Unpin> Future for Cancellable<F> { - type Output = std::result::Result<T, CancelErr>; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { - let inner = self.get_mut(); - - if let Poll::Ready(ready) = inner.f.poll_unpin(cx) { - return Poll::Ready(Ok(ready)); - } - - if let Poll::Ready(_) = inner.rx.poll_unpin(cx) { - return Poll::Ready(Err(CancelErr(()))); - } - - Poll::Pending - } -} - pub fn get_request<T: serde::de::DeserializeOwned>( factory: &RequestFactory, service: RequestServiceHandle, diff --git a/talpid-core/src/lib.rs b/talpid-core/src/lib.rs index fbf06c8815..f41031ad56 100644 --- a/talpid-core/src/lib.rs +++ b/talpid-core/src/lib.rs @@ -48,6 +48,9 @@ pub mod tunnel_state_machine; /// Future utilities pub mod future_retry; +/// Cancel a future remotely +pub mod future_cancel; + #[cfg(not(target_os = "android"))] /// Internal code for managing bundled proxy software. mod proxy; |
