summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--Cargo.lock1
-rw-r--r--mullvad-daemon/src/lib.rs6
-rw-r--r--mullvad-daemon/src/wireguard.rs3
-rw-r--r--mullvad-rpc/Cargo.toml1
-rw-r--r--mullvad-rpc/src/rest.rs85
-rw-r--r--talpid-core/src/lib.rs3
6 files changed, 17 insertions, 82 deletions
diff --git a/Cargo.lock b/Cargo.lock
index e9415bc4b6..2fcdf232d9 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1527,6 +1527,7 @@ dependencies = [
"regex 1.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
"serde 1.0.101 (registry+https://github.com/rust-lang/crates.io-index)",
"serde_json 1.0.41 (registry+https://github.com/rust-lang/crates.io-index)",
+ "talpid-core 0.1.0",
"talpid-types 0.1.0",
"tempfile 3.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio 0.2.21 (registry+https://github.com/rust-lang/crates.io-index)",
diff --git a/mullvad-daemon/src/lib.rs b/mullvad-daemon/src/lib.rs
index f7755fbf75..3e5ba1dda9 100644
--- a/mullvad-daemon/src/lib.rs
+++ b/mullvad-daemon/src/lib.rs
@@ -28,10 +28,7 @@ use futures01::{
Future, Stream,
};
use log::{debug, error, info, warn};
-use mullvad_rpc::{
- rest::{CancelHandle, Cancellable},
- AccountsProxy,
-};
+use mullvad_rpc::AccountsProxy;
use mullvad_types::{
account::{AccountData, AccountToken, VoucherSubmission},
endpoint::MullvadEndpoint,
@@ -61,6 +58,7 @@ use std::{
#[cfg(target_os = "linux")]
use talpid_core::split_tunnel;
use talpid_core::{
+ future_cancel::{CancelHandle, Cancellable},
mpsc::Sender,
tunnel_state_machine::{self, TunnelCommand, TunnelParametersGenerator},
};
diff --git a/mullvad-daemon/src/wireguard.rs b/mullvad-daemon/src/wireguard.rs
index 923fc4fb34..9206f97be2 100644
--- a/mullvad-daemon/src/wireguard.rs
+++ b/mullvad-daemon/src/wireguard.rs
@@ -1,6 +1,6 @@
use crate::{account_history::AccountHistory, DaemonEventSender, InternalDaemonEvent};
use chrono::offset::Utc;
-use mullvad_rpc::rest::{CancelHandle, Cancellable, Error as RestError, MullvadRestHandle};
+use mullvad_rpc::rest::{Error as RestError, MullvadRestHandle};
use mullvad_types::account::AccountToken;
pub use mullvad_types::wireguard::*;
use std::{
@@ -10,6 +10,7 @@ use std::{
};
use talpid_core::{
+ future_cancel::{CancelHandle, Cancellable},
future_retry::{retry_future_with_backoff, ExponentialBackoff, Jittered},
mpsc::Sender,
};
diff --git a/mullvad-rpc/Cargo.toml b/mullvad-rpc/Cargo.toml
index 3c5a29a617..ad82fd736d 100644
--- a/mullvad-rpc/Cargo.toml
+++ b/mullvad-rpc/Cargo.toml
@@ -28,6 +28,7 @@ webpki = { version = "0.21", features = [] }
mullvad-types = { path = "../mullvad-types" }
talpid-types = { path = "../talpid-types" }
+talpid-core = { path = "../talpid-core" }
[dev-dependencies]
filetime = "0.2"
diff --git a/mullvad-rpc/src/rest.rs b/mullvad-rpc/src/rest.rs
index c2677c7212..4a8220749b 100644
--- a/mullvad-rpc/src/rest.rs
+++ b/mullvad-rpc/src/rest.rs
@@ -2,7 +2,7 @@ use futures::{
channel::{mpsc, oneshot},
sink::SinkExt,
stream::StreamExt,
- FutureExt, TryFutureExt,
+ TryFutureExt,
};
use futures01::Future as OldFuture;
use hyper::{
@@ -10,16 +10,8 @@ use hyper::{
header::{self, HeaderValue},
Method, Uri,
};
-use std::{
- collections::BTreeMap,
- future::Future,
- mem,
- net::IpAddr,
- pin::Pin,
- str::FromStr,
- task::{Context, Poll},
- time::Duration,
-};
+use std::{collections::BTreeMap, future::Future, mem, net::IpAddr, str::FromStr, time::Duration};
+use talpid_core::future_cancel::{CancelErr, CancelHandle, Cancellable};
use tokio::runtime::Handle;
pub use hyper::StatusCode;
@@ -122,12 +114,10 @@ impl<C: Connect + Clone + Send + Sync + 'static> RequestService<C> {
);
let future = async move {
- let response = tokio::time::timeout(
- timeout,
- request_future.into_future().map_err(Error::Cancelled),
- )
- .await
- .map_err(Error::TimeoutError);
+ let response =
+ tokio::time::timeout(timeout, request_future.map_err(Error::Cancelled))
+ .await
+ .map_err(Error::TimeoutError);
let response = flatten_result(flatten_result(response));
@@ -219,7 +209,7 @@ impl RequestServiceHandle {
});
- rx.map_err(|_| Error::Cancelled(CancelErr(()))).flatten()
+ rx.map_err(|_| Error::ReceiveError).flatten()
}
/// Spawns a future on the RPC runtime.
@@ -425,65 +415,6 @@ impl RequestFactory {
}
-#[derive(Debug)]
-pub struct CancelErr(());
-
-pub struct Cancellable<F: Future> {
- rx: oneshot::Receiver<()>,
- f: F,
-}
-
-pub struct CancelHandle {
- tx: oneshot::Sender<()>,
-}
-
-impl CancelHandle {
- pub fn cancel(self) {
- let _ = self.tx.send(());
- }
-}
-
-
-impl<F> Cancellable<F>
-where
- F: Future,
-{
- pub fn new(f: F) -> (Self, CancelHandle) {
- let (tx, rx) = oneshot::channel();
- (Self { f, rx }, CancelHandle { tx })
- }
-
- async fn into_future(self) -> std::result::Result<F::Output, CancelErr> {
- futures::select! {
- _cancelled = self.rx.fuse() => {
- Err(CancelErr(()))
- },
- value = self.f.fuse() => {
- Ok(value)
- }
- }
- }
-}
-
-
-impl<F: Future<Output = T> + Unpin, T: Unpin> Future for Cancellable<F> {
- type Output = std::result::Result<T, CancelErr>;
-
- fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
- let inner = self.get_mut();
-
- if let Poll::Ready(ready) = inner.f.poll_unpin(cx) {
- return Poll::Ready(Ok(ready));
- }
-
- if let Poll::Ready(_) = inner.rx.poll_unpin(cx) {
- return Poll::Ready(Err(CancelErr(())));
- }
-
- Poll::Pending
- }
-}
-
pub fn get_request<T: serde::de::DeserializeOwned>(
factory: &RequestFactory,
service: RequestServiceHandle,
diff --git a/talpid-core/src/lib.rs b/talpid-core/src/lib.rs
index fbf06c8815..f41031ad56 100644
--- a/talpid-core/src/lib.rs
+++ b/talpid-core/src/lib.rs
@@ -48,6 +48,9 @@ pub mod tunnel_state_machine;
/// Future utilities
pub mod future_retry;
+/// Cancel a future remotely
+pub mod future_cancel;
+
#[cfg(not(target_os = "android"))]
/// Internal code for managing bundled proxy software.
mod proxy;