summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorDavid Lönnhager <david.l@mullvad.net>2021-10-01 15:49:30 +0200
committerDavid Lönnhager <david.l@mullvad.net>2021-10-01 15:49:30 +0200
commit7feeb5252d8f76ca7823b2e7f8c340e8a7e802e7 (patch)
tree6e7faa1ee1f23e2c985c6abf5b791b80a2a5b434
parent478e3ed78bc07719283952b9743adda9e3bc6d96 (diff)
parent8e78e7bf97f59a39055c0d7183b836e057164cd6 (diff)
downloadmullvadvpn-7feeb5252d8f76ca7823b2e7f8c340e8a7e802e7.tar.xz
mullvadvpn-7feeb5252d8f76ca7823b2e7f8c340e8a7e802e7.zip
Merge branch 'retry-frontend-api-reqs'
-rw-r--r--mullvad-daemon/src/account.rs82
-rw-r--r--mullvad-daemon/src/lib.rs7
-rw-r--r--mullvad-daemon/src/relays.rs4
-rw-r--r--mullvad-daemon/src/version_check.rs23
-rw-r--r--mullvad-daemon/src/wireguard.rs68
-rw-r--r--mullvad-problem-report/src/lib.rs8
-rw-r--r--mullvad-rpc/src/lib.rs17
-rw-r--r--mullvad-rpc/src/rest.rs50
-rw-r--r--mullvad-setup/src/main.rs27
-rw-r--r--talpid-core/src/future_retry.rs33
10 files changed, 239 insertions, 80 deletions
diff --git a/mullvad-daemon/src/account.rs b/mullvad-daemon/src/account.rs
index 88b996743a..9fda91a813 100644
--- a/mullvad-daemon/src/account.rs
+++ b/mullvad-daemon/src/account.rs
@@ -2,16 +2,21 @@ use chrono::{DateTime, Utc};
use futures::future::{abortable, AbortHandle};
use mullvad_rpc::{
availability::ApiAvailabilityHandle,
- rest::{self, MullvadRestHandle},
+ rest::{self, Error as RestError, MullvadRestHandle},
AccountsProxy,
};
use mullvad_types::account::{AccountToken, VoucherSubmission};
-use std::time::Duration;
-use talpid_core::future_retry::{retry_future_with_backoff, ExponentialBackoff, Jittered};
+use std::{future::Future, time::Duration};
+use talpid_core::future_retry::{
+ constant_interval, retry_future, retry_future_n, ExponentialBackoff, Jittered,
+};
+
+const RETRY_ACTION_INTERVAL: Duration = Duration::ZERO;
+const RETRY_ACTION_MAX_RETRIES: usize = 2;
-const RETRY_INTERVAL_INITIAL: Duration = Duration::from_secs(4);
-const RETRY_INTERVAL_FACTOR: u32 = 5;
-const RETRY_INTERVAL_MAX: Duration = Duration::from_secs(24 * 60 * 60);
+const RETRY_EXPIRY_CHECK_INTERVAL_INITIAL: Duration = Duration::from_secs(4);
+const RETRY_EXPIRY_CHECK_INTERVAL_FACTOR: u32 = 5;
+const RETRY_EXPIRY_CHECK_INTERVAL_MAX: Duration = Duration::from_secs(24 * 60 * 60);
pub struct Account(());
@@ -20,12 +25,45 @@ pub struct Account(());
pub struct AccountHandle {
api_availability: ApiAvailabilityHandle,
initial_check_abort_handle: AbortHandle,
- pub proxy: AccountsProxy,
+ proxy: AccountsProxy,
}
impl AccountHandle {
+ pub fn create_account(&self) -> impl Future<Output = Result<AccountToken, rest::Error>> {
+ let mut proxy = self.proxy.clone();
+ let api_handle = self.api_availability.clone();
+ retry_future_n(
+ move || proxy.create_account(),
+ move |result| Self::should_retry(result, &api_handle),
+ constant_interval(RETRY_ACTION_INTERVAL),
+ RETRY_ACTION_MAX_RETRIES,
+ )
+ }
+
+ pub fn get_www_auth_token(
+ &self,
+ account: AccountToken,
+ ) -> impl Future<Output = Result<String, rest::Error>> {
+ let proxy = self.proxy.clone();
+ let api_handle = self.api_availability.clone();
+ retry_future_n(
+ move || proxy.get_www_auth_token(account.clone()),
+ move |result| Self::should_retry(result, &api_handle),
+ constant_interval(RETRY_ACTION_INTERVAL),
+ RETRY_ACTION_MAX_RETRIES,
+ )
+ }
+
pub async fn check_expiry(&self, token: AccountToken) -> Result<DateTime<Utc>, rest::Error> {
- let result = self.proxy.get_expiry(token).await;
+ let proxy = self.proxy.clone();
+ let api_handle = self.api_availability.clone();
+ let result = retry_future_n(
+ move || proxy.get_expiry(token.clone()),
+ move |result| Self::should_retry(result, &api_handle),
+ constant_interval(RETRY_ACTION_INTERVAL),
+ RETRY_ACTION_MAX_RETRIES,
+ )
+ .await;
if handle_expiry_result_inner(&result, &self.api_availability) {
self.initial_check_abort_handle.abort();
}
@@ -37,13 +75,28 @@ impl AccountHandle {
account_token: AccountToken,
voucher: String,
) -> Result<VoucherSubmission, rest::Error> {
- let result = self.proxy.submit_voucher(account_token, voucher).await;
+ let mut proxy = self.proxy.clone();
+ let api_handle = self.api_availability.clone();
+ let result = retry_future_n(
+ move || proxy.submit_voucher(account_token.clone(), voucher.clone()),
+ move |result| Self::should_retry(result, &api_handle),
+ constant_interval(RETRY_ACTION_INTERVAL),
+ RETRY_ACTION_MAX_RETRIES,
+ )
+ .await;
if result.is_ok() {
self.initial_check_abort_handle.abort();
self.api_availability.resume();
}
result
}
+
+ fn should_retry<T>(result: &Result<T, RestError>, api_handle: &ApiAvailabilityHandle) -> bool {
+ match result {
+ Err(error) if error.is_network_error() => !api_handle.get_state().is_offline(),
+ _ => false,
+ }
+ }
}
impl Account {
@@ -68,8 +121,11 @@ impl Account {
};
let retry_strategy = Jittered::jitter(
- ExponentialBackoff::new(RETRY_INTERVAL_INITIAL, RETRY_INTERVAL_FACTOR)
- .max_delay(RETRY_INTERVAL_MAX),
+ ExponentialBackoff::new(
+ RETRY_EXPIRY_CHECK_INTERVAL_INITIAL,
+ RETRY_EXPIRY_CHECK_INTERVAL_FACTOR,
+ )
+ .max_delay(RETRY_EXPIRY_CHECK_INTERVAL_MAX),
);
let future_generator = move || {
let wait_online = api_availability.wait_online();
@@ -81,9 +137,7 @@ impl Account {
}
};
let should_retry = move |state_was_updated: &bool| -> bool { !*state_was_updated };
- let retry_future =
- retry_future_with_backoff(future_generator, should_retry, retry_strategy);
- retry_future.await;
+ retry_future(future_generator, should_retry, retry_strategy).await;
});
runtime.spawn(future);
diff --git a/mullvad-daemon/src/lib.rs b/mullvad-daemon/src/lib.rs
index a7493f1d82..15733d2f29 100644
--- a/mullvad-daemon/src/lib.rs
+++ b/mullvad-daemon/src/lib.rs
@@ -1452,8 +1452,7 @@ where
async fn on_create_new_account(&mut self, tx: ResponseTx<String, Error>) {
let daemon_tx = self.tx.clone();
- let future = self.account.proxy.create_account();
-
+ let future = self.account.create_account();
tokio::spawn(async move {
match future.await {
Ok(account_token) => {
@@ -1484,7 +1483,7 @@ where
async fn on_get_www_auth_token(&mut self, tx: ResponseTx<String, Error>) {
if let Some(account_token) = self.settings.get_account_token() {
- let future = self.account.proxy.get_www_auth_token(account_token);
+ let future = self.account.get_www_auth_token(account_token);
let rpc_call = async {
Self::oneshot_send(
tx,
@@ -1592,7 +1591,7 @@ where
{
let remove_key = self
.wireguard_key_manager
- .remove_key(previous_token, previous_key);
+ .remove_key_with_backoff(previous_token, previous_key);
tokio::spawn(async move {
if let Err(error) = remove_key.await {
log::error!(
diff --git a/mullvad-daemon/src/relays.rs b/mullvad-daemon/src/relays.rs
index 607724a534..9811864630 100644
--- a/mullvad-daemon/src/relays.rs
+++ b/mullvad-daemon/src/relays.rs
@@ -29,7 +29,7 @@ use std::{
sync::Arc,
time::{self, Duration, Instant, SystemTime},
};
-use talpid_core::future_retry::{retry_future_with_backoff, ExponentialBackoff, Jittered};
+use talpid_core::future_retry::{retry_future, ExponentialBackoff, Jittered};
use talpid_types::{
net::{
all_of_the_internet, openvpn::ProxySettings, wireguard, IpVersion, TransportProtocol,
@@ -1141,7 +1141,7 @@ impl RelayListUpdater {
ExponentialBackoff::new(EXPONENTIAL_BACKOFF_INITIAL, EXPONENTIAL_BACKOFF_FACTOR)
.max_delay(UPDATE_INTERVAL * 2);
- let download_future = retry_future_with_backoff(
+ let download_future = retry_future(
download_futures,
|result| result.is_err(),
Jittered::jitter(exponential_backoff),
diff --git a/mullvad-daemon/src/version_check.rs b/mullvad-daemon/src/version_check.rs
index 44259eb1c6..dfcc82b6e8 100644
--- a/mullvad-daemon/src/version_check.rs
+++ b/mullvad-daemon/src/version_check.rs
@@ -38,7 +38,7 @@ const UPDATE_INTERVAL: Duration = Duration::from_secs(60 * 60 * 24);
/// Wait this long until next try if an update failed
const UPDATE_INTERVAL_ERROR: Duration = Duration::from_secs(60 * 60 * 6);
/// Retry interval for `RunVersionCheck`.
-const IMMEDIATE_UPDATE_INTERVAL_ERROR: Duration = Duration::from_secs(1);
+const IMMEDIATE_UPDATE_INTERVAL_ERROR: Duration = Duration::ZERO;
const IMMEDIATE_UPDATE_MAX_RETRIES: usize = 2;
#[cfg(target_os = "linux")]
@@ -204,13 +204,26 @@ impl VersionUpdater {
.map_err(Error::Download)
};
- Box::pin(talpid_core::future_retry::retry_future_with_backoff(
+ Box::pin(talpid_core::future_retry::retry_future_n(
download_future_factory,
- move |result| result.is_err() && !api_handle.get_state().is_offline(),
- std::iter::repeat(IMMEDIATE_UPDATE_INTERVAL_ERROR).take(IMMEDIATE_UPDATE_MAX_RETRIES),
+ move |result| Self::should_retry_immediate(result, &api_handle),
+ std::iter::repeat(IMMEDIATE_UPDATE_INTERVAL_ERROR),
+ IMMEDIATE_UPDATE_MAX_RETRIES,
))
}
+ fn should_retry_immediate<T>(
+ result: &Result<T, Error>,
+ api_handle: &ApiAvailabilityHandle,
+ ) -> bool {
+ match result {
+ Err(Error::Download(error)) if error.is_network_error() => {
+ !api_handle.get_state().is_offline()
+ }
+ _ => false,
+ }
+ }
+
fn create_update_background_future(
&self,
) -> std::pin::Pin<
@@ -232,7 +245,7 @@ impl VersionUpdater {
}
};
- Box::pin(talpid_core::future_retry::retry_future_with_backoff(
+ Box::pin(talpid_core::future_retry::retry_future(
download_future_factory,
|result| result.is_err(),
std::iter::repeat(UPDATE_INTERVAL_ERROR),
diff --git a/mullvad-daemon/src/wireguard.rs b/mullvad-daemon/src/wireguard.rs
index c0c260de29..8f0eacd454 100644
--- a/mullvad-daemon/src/wireguard.rs
+++ b/mullvad-daemon/src/wireguard.rs
@@ -9,8 +9,10 @@ pub use mullvad_types::wireguard::*;
use std::{future::Future, pin::Pin, time::Duration};
use futures::future::{abortable, AbortHandle};
+#[cfg(not(target_os = "android"))]
+use talpid_core::future_retry::constant_interval;
use talpid_core::{
- future_retry::{retry_future_with_backoff, ExponentialBackoff, Jittered},
+ future_retry::{retry_future, retry_future_n, ExponentialBackoff, Jittered},
mpsc::Sender,
};
@@ -30,6 +32,11 @@ const RETRY_INTERVAL_INITIAL: Duration = Duration::from_secs(4);
const RETRY_INTERVAL_FACTOR: u32 = 5;
const RETRY_INTERVAL_MAX: Duration = Duration::from_secs(24 * 60 * 60);
+#[cfg(not(target_os = "android"))]
+const SHORT_RETRY_INTERVAL: Duration = Duration::ZERO;
+
+const MAX_KEY_REMOVAL_RETRIES: usize = 2;
+
#[derive(err_derive::Error, Debug)]
pub enum Error {
#[error(display = "Unexpected HTTP request error")]
@@ -138,17 +145,61 @@ impl KeyManager {
}
/// Removes a key from an account
+ #[cfg(not(target_os = "android"))]
pub fn remove_key(
&self,
account: AccountToken,
key: talpid_types::net::wireguard::PublicKey,
) -> impl Future<Output = Result<()>> {
+ self.remove_key_inner(account, key, constant_interval(SHORT_RETRY_INTERVAL), false)
+ }
+
+ /// Removes a key from an account
+ pub fn remove_key_with_backoff(
+ &self,
+ account: AccountToken,
+ key: talpid_types::net::wireguard::PublicKey,
+ ) -> impl Future<Output = Result<()>> {
+ let retry_strategy = Jittered::jitter(
+ ExponentialBackoff::new(RETRY_INTERVAL_INITIAL, RETRY_INTERVAL_FACTOR)
+ .max_delay(RETRY_INTERVAL_MAX),
+ );
+ self.remove_key_inner(account, key, retry_strategy, true)
+ }
+
+ fn remove_key_inner<D: Iterator<Item = Duration> + 'static>(
+ &self,
+ account: AccountToken,
+ key: talpid_types::net::wireguard::PublicKey,
+ retry_strategy: D,
+ offline_check: bool,
+ ) -> impl Future<Output = Result<()>> {
let mut rpc = mullvad_rpc::WireguardKeyProxy::new(self.http_handle.clone());
- async move {
- rpc.remove_wireguard_key(account, &key)
- .await
- .map_err(Self::map_rpc_error)
- }
+ let api_handle = self.availability_handle.clone();
+ let api_handle_2 = api_handle.clone();
+ let future = retry_future_n(
+ move || {
+ let remove_key = rpc.remove_wireguard_key(account.clone(), key.clone());
+ let wait_future = api_handle.wait_online();
+ async move {
+ if offline_check {
+ let _ = wait_future.await;
+ }
+ remove_key.await
+ }
+ },
+ move |result| match result {
+ Ok(_) => false,
+ Err(error) => Self::should_retry_removal(error, &api_handle_2),
+ },
+ retry_strategy,
+ MAX_KEY_REMOVAL_RETRIES,
+ );
+ async move { future.await.map_err(Self::map_rpc_error) }
+ }
+
+ fn should_retry_removal(error: &RestError, api_handle: &ApiAvailabilityHandle) -> bool {
+ error.is_network_error() && !api_handle.get_state().is_offline()
}
fn should_retry(error: &RestError) -> bool {
@@ -218,8 +269,7 @@ impl KeyManager {
}
};
- let upload_future =
- retry_future_with_backoff(future_generator, should_retry, retry_strategy);
+ let upload_future = retry_future(future_generator, should_retry, retry_strategy);
let (cancellable_upload, abort_handle) = abortable(Box::pin(upload_future));
@@ -417,7 +467,7 @@ impl KeyManager {
}
};
- retry_future_with_backoff(
+ retry_future(
move || rotate_key.clone()(&old_key),
should_retry,
retry_strategy,
diff --git a/mullvad-problem-report/src/lib.rs b/mullvad-problem-report/src/lib.rs
index 26cbcb4fb1..e6312062ff 100644
--- a/mullvad-problem-report/src/lib.rs
+++ b/mullvad-problem-report/src/lib.rs
@@ -10,7 +10,6 @@ use std::{
fs::{self, File},
io::{self, BufWriter, Read, Seek, SeekFrom, Write},
path::{Path, PathBuf},
- time::Duration,
};
use talpid_types::ErrorExt;
@@ -35,7 +34,6 @@ const LINE_SEPARATOR: &str = "\n";
const LINE_SEPARATOR: &str = "\r\n";
const MAX_SEND_ATTEMPTS: usize = 3;
-const RETRY_INTERVAL: Duration = Duration::from_millis(500);
/// Custom macro to write a line to an output formatter that uses platform-specific newline
/// character sequences.
@@ -306,10 +304,12 @@ pub fn send_problem_report(
eprintln!(
"{}",
error.display_chain_with_msg("Failed to send problem report")
- )
+ );
+ if !error.is_network_error() {
+ break;
+ }
}
}
- tokio::time::sleep(RETRY_INTERVAL).await;
}
Err(Error::SendProblemReportError)
})
diff --git a/mullvad-rpc/src/lib.rs b/mullvad-rpc/src/lib.rs
index 098ed2f0b4..883128f8aa 100644
--- a/mullvad-rpc/src/lib.rs
+++ b/mullvad-rpc/src/lib.rs
@@ -492,14 +492,13 @@ impl WireguardKeyProxy {
rest::deserialize_body(response).await
}
- pub async fn remove_wireguard_key(
+ pub fn remove_wireguard_key(
&mut self,
account_token: AccountToken,
- key: &wireguard::PublicKey,
- ) -> Result<(), rest::Error> {
+ key: wireguard::PublicKey,
+ ) -> impl Future<Output = Result<(), rest::Error>> {
let service = self.handle.service.clone();
-
- let _ = rest::send_request(
+ let future = rest::send_request(
&self.handle.factory,
service,
&format!(
@@ -509,9 +508,11 @@ impl WireguardKeyProxy {
Method::DELETE,
Some(account_token),
StatusCode::NO_CONTENT,
- )
- .await?;
- Ok(())
+ );
+ async move {
+ let _ = future.await?;
+ Ok(())
+ }
}
}
diff --git a/mullvad-rpc/src/rest.rs b/mullvad-rpc/src/rest.rs
index 77bf06fd55..779da6ccf5 100644
--- a/mullvad-rpc/src/rest.rs
+++ b/mullvad-rpc/src/rest.rs
@@ -75,6 +75,15 @@ pub enum Error {
UriError(#[error(source)] http::uri::InvalidUri),
}
+impl Error {
+ pub fn is_network_error(&self) -> bool {
+ match self {
+ Error::HyperError(_) | Error::TimeoutError(_) => true,
+ _ => false,
+ }
+ }
+}
+
/// A service that executes HTTP requests, allowing for on-demand termination of all in-flight
/// requests
pub(crate) struct RequestService {
@@ -148,30 +157,27 @@ impl RequestService {
let response = flatten_result(flatten_result(response));
if let Some(host_addr) = host_addr {
if let Err(err) = &response {
- match err {
- Error::HyperError(_) | Error::TimeoutError(_) => {
- log::error!(
- "{}",
- err.display_chain_with_msg("HTTP request failed")
- );
- if !api_availability.get_state().is_offline() {
- let current_address = address_cache.peek_address();
- if current_address == host_addr
- && address_cache.has_tried_current_address()
- {
- handle.spawn(async move {
- address_cache.select_new_address().await;
- let new_address = address_cache.peek_address();
- log::error!(
- "Request failed using address {}. Trying next API address: {}",
- current_address,
- new_address,
- );
- });
- }
+ if err.is_network_error() {
+ log::error!(
+ "{}",
+ err.display_chain_with_msg("HTTP request failed")
+ );
+ if !api_availability.get_state().is_offline() {
+ let current_address = address_cache.peek_address();
+ if current_address == host_addr
+ && address_cache.has_tried_current_address()
+ {
+ handle.spawn(async move {
+ address_cache.select_new_address().await;
+ let new_address = address_cache.peek_address();
+ log::error!(
+ "Request failed using address {}. Trying next API address: {}",
+ current_address,
+ new_address,
+ );
+ });
}
}
- _ => (),
}
}
}
diff --git a/mullvad-setup/src/main.rs b/mullvad-setup/src/main.rs
index 5f88e3456f..396d456afa 100644
--- a/mullvad-setup/src/main.rs
+++ b/mullvad-setup/src/main.rs
@@ -2,8 +2,11 @@ use clap::{crate_authors, crate_description, crate_name, SubCommand};
use mullvad_management_interface::new_rpc_client;
use mullvad_rpc::MullvadRpcRuntime;
use mullvad_types::version::ParsedAppVersion;
-use std::{path::PathBuf, process};
-use talpid_core::firewall::{self, Firewall, FirewallArguments};
+use std::{path::PathBuf, process, time::Duration};
+use talpid_core::{
+ firewall::{self, Firewall, FirewallArguments},
+ future_retry::{constant_interval, retry_future_n},
+};
use talpid_types::ErrorExt;
pub const PRODUCT_VERSION: &str = include_str!(concat!(env!("OUT_DIR"), "/product-version.txt"));
@@ -13,6 +16,9 @@ lazy_static::lazy_static! {
static ref IS_DEV_BUILD: bool = APP_VERSION.is_dev();
}
+const KEY_RETRY_INTERVAL: Duration = Duration::ZERO;
+const KEY_RETRY_MAX_RETRIES: usize = 2;
+
#[repr(i32)]
enum ExitStatus {
Ok = 0,
@@ -178,10 +184,19 @@ async fn remove_wireguard_key() -> Result<(), Error> {
.map_err(Error::RpcInitializationError)?;
let mut key_proxy =
mullvad_rpc::WireguardKeyProxy::new(rpc_runtime.mullvad_rest_handle());
- key_proxy
- .remove_wireguard_key(token, &wg_data.private_key.public_key())
- .await
- .map_err(Error::RemoveKeyError)?;
+ retry_future_n(
+ move || {
+ key_proxy.remove_wireguard_key(token.clone(), wg_data.private_key.public_key())
+ },
+ move |result| match result {
+ Err(error) => error.is_network_error(),
+ _ => false,
+ },
+ constant_interval(KEY_RETRY_INTERVAL),
+ KEY_RETRY_MAX_RETRIES,
+ )
+ .await
+ .map_err(Error::RemoveKeyError)?;
settings
.set_wireguard(None)
.await
diff --git a/talpid-core/src/future_retry.rs b/talpid-core/src/future_retry.rs
index 0e494cc96c..3c640a9037 100644
--- a/talpid-core/src/future_retry.rs
+++ b/talpid-core/src/future_retry.rs
@@ -5,8 +5,26 @@ use std::{future::Future, time::Duration};
/// required - run a timer for 60 seconds until a delay is shorter than 5 minutes.
const MAX_SINGLE_DELAY: Duration = Duration::from_secs(5 * 60);
-/// Retries a future until it should stop as determined by the retry function.
-pub async fn retry_future_with_backoff<
+/// Convenience function that works like [`retry_future`] but limits the number
+/// of retries to `max_retries`.
+pub async fn retry_future_n<
+ F: FnMut() -> O + 'static,
+ R: FnMut(&T) -> bool + 'static,
+ D: Iterator<Item = Duration> + 'static,
+ O: Future<Output = T>,
+ T,
+>(
+ factory: F,
+ should_retry: R,
+ delays: D,
+ max_retries: usize,
+) -> T {
+ retry_future(factory, should_retry, delays.take(max_retries)).await
+}
+
+/// Retries a future until it should stop as determined by the retry function, or when
+/// the iterator returns `None`.
+pub async fn retry_future<
F: FnMut() -> O + 'static,
R: FnMut(&T) -> bool + 'static,
D: Iterator<Item = Duration> + 'static,
@@ -22,15 +40,18 @@ pub async fn retry_future_with_backoff<
if should_retry(&current_result) {
if let Some(delay) = delays.next() {
sleep(delay).await;
- } else {
- return current_result;
+ continue;
}
- } else {
- return current_result;
}
+ return current_result;
}
}
+/// Returns an iterator that repeats the same interval.
+pub fn constant_interval(interval: Duration) -> impl Iterator<Item = Duration> {
+ std::iter::repeat(interval)
+}
+
async fn sleep(mut delay: Duration) {
while delay > MAX_SINGLE_DELAY {
delay -= MAX_SINGLE_DELAY;