diff options
| author | David Lönnhager <david.l@mullvad.net> | 2021-10-01 15:49:30 +0200 |
|---|---|---|
| committer | David Lönnhager <david.l@mullvad.net> | 2021-10-01 15:49:30 +0200 |
| commit | 7feeb5252d8f76ca7823b2e7f8c340e8a7e802e7 (patch) | |
| tree | 6e7faa1ee1f23e2c985c6abf5b791b80a2a5b434 | |
| parent | 478e3ed78bc07719283952b9743adda9e3bc6d96 (diff) | |
| parent | 8e78e7bf97f59a39055c0d7183b836e057164cd6 (diff) | |
| download | mullvadvpn-7feeb5252d8f76ca7823b2e7f8c340e8a7e802e7.tar.xz mullvadvpn-7feeb5252d8f76ca7823b2e7f8c340e8a7e802e7.zip | |
Merge branch 'retry-frontend-api-reqs'
| -rw-r--r-- | mullvad-daemon/src/account.rs | 82 | ||||
| -rw-r--r-- | mullvad-daemon/src/lib.rs | 7 | ||||
| -rw-r--r-- | mullvad-daemon/src/relays.rs | 4 | ||||
| -rw-r--r-- | mullvad-daemon/src/version_check.rs | 23 | ||||
| -rw-r--r-- | mullvad-daemon/src/wireguard.rs | 68 | ||||
| -rw-r--r-- | mullvad-problem-report/src/lib.rs | 8 | ||||
| -rw-r--r-- | mullvad-rpc/src/lib.rs | 17 | ||||
| -rw-r--r-- | mullvad-rpc/src/rest.rs | 50 | ||||
| -rw-r--r-- | mullvad-setup/src/main.rs | 27 | ||||
| -rw-r--r-- | talpid-core/src/future_retry.rs | 33 |
10 files changed, 239 insertions, 80 deletions
diff --git a/mullvad-daemon/src/account.rs b/mullvad-daemon/src/account.rs index 88b996743a..9fda91a813 100644 --- a/mullvad-daemon/src/account.rs +++ b/mullvad-daemon/src/account.rs @@ -2,16 +2,21 @@ use chrono::{DateTime, Utc}; use futures::future::{abortable, AbortHandle}; use mullvad_rpc::{ availability::ApiAvailabilityHandle, - rest::{self, MullvadRestHandle}, + rest::{self, Error as RestError, MullvadRestHandle}, AccountsProxy, }; use mullvad_types::account::{AccountToken, VoucherSubmission}; -use std::time::Duration; -use talpid_core::future_retry::{retry_future_with_backoff, ExponentialBackoff, Jittered}; +use std::{future::Future, time::Duration}; +use talpid_core::future_retry::{ + constant_interval, retry_future, retry_future_n, ExponentialBackoff, Jittered, +}; + +const RETRY_ACTION_INTERVAL: Duration = Duration::ZERO; +const RETRY_ACTION_MAX_RETRIES: usize = 2; -const RETRY_INTERVAL_INITIAL: Duration = Duration::from_secs(4); -const RETRY_INTERVAL_FACTOR: u32 = 5; -const RETRY_INTERVAL_MAX: Duration = Duration::from_secs(24 * 60 * 60); +const RETRY_EXPIRY_CHECK_INTERVAL_INITIAL: Duration = Duration::from_secs(4); +const RETRY_EXPIRY_CHECK_INTERVAL_FACTOR: u32 = 5; +const RETRY_EXPIRY_CHECK_INTERVAL_MAX: Duration = Duration::from_secs(24 * 60 * 60); pub struct Account(()); @@ -20,12 +25,45 @@ pub struct Account(()); pub struct AccountHandle { api_availability: ApiAvailabilityHandle, initial_check_abort_handle: AbortHandle, - pub proxy: AccountsProxy, + proxy: AccountsProxy, } impl AccountHandle { + pub fn create_account(&self) -> impl Future<Output = Result<AccountToken, rest::Error>> { + let mut proxy = self.proxy.clone(); + let api_handle = self.api_availability.clone(); + retry_future_n( + move || proxy.create_account(), + move |result| Self::should_retry(result, &api_handle), + constant_interval(RETRY_ACTION_INTERVAL), + RETRY_ACTION_MAX_RETRIES, + ) + } + + pub fn get_www_auth_token( + &self, + account: AccountToken, + ) -> impl Future<Output = Result<String, rest::Error>> { + let proxy = self.proxy.clone(); + let api_handle = self.api_availability.clone(); + retry_future_n( + move || proxy.get_www_auth_token(account.clone()), + move |result| Self::should_retry(result, &api_handle), + constant_interval(RETRY_ACTION_INTERVAL), + RETRY_ACTION_MAX_RETRIES, + ) + } + pub async fn check_expiry(&self, token: AccountToken) -> Result<DateTime<Utc>, rest::Error> { - let result = self.proxy.get_expiry(token).await; + let proxy = self.proxy.clone(); + let api_handle = self.api_availability.clone(); + let result = retry_future_n( + move || proxy.get_expiry(token.clone()), + move |result| Self::should_retry(result, &api_handle), + constant_interval(RETRY_ACTION_INTERVAL), + RETRY_ACTION_MAX_RETRIES, + ) + .await; if handle_expiry_result_inner(&result, &self.api_availability) { self.initial_check_abort_handle.abort(); } @@ -37,13 +75,28 @@ impl AccountHandle { account_token: AccountToken, voucher: String, ) -> Result<VoucherSubmission, rest::Error> { - let result = self.proxy.submit_voucher(account_token, voucher).await; + let mut proxy = self.proxy.clone(); + let api_handle = self.api_availability.clone(); + let result = retry_future_n( + move || proxy.submit_voucher(account_token.clone(), voucher.clone()), + move |result| Self::should_retry(result, &api_handle), + constant_interval(RETRY_ACTION_INTERVAL), + RETRY_ACTION_MAX_RETRIES, + ) + .await; if result.is_ok() { self.initial_check_abort_handle.abort(); self.api_availability.resume(); } result } + + fn should_retry<T>(result: &Result<T, RestError>, api_handle: &ApiAvailabilityHandle) -> bool { + match result { + Err(error) if error.is_network_error() => !api_handle.get_state().is_offline(), + _ => false, + } + } } impl Account { @@ -68,8 +121,11 @@ impl Account { }; let retry_strategy = Jittered::jitter( - ExponentialBackoff::new(RETRY_INTERVAL_INITIAL, RETRY_INTERVAL_FACTOR) - .max_delay(RETRY_INTERVAL_MAX), + ExponentialBackoff::new( + RETRY_EXPIRY_CHECK_INTERVAL_INITIAL, + RETRY_EXPIRY_CHECK_INTERVAL_FACTOR, + ) + .max_delay(RETRY_EXPIRY_CHECK_INTERVAL_MAX), ); let future_generator = move || { let wait_online = api_availability.wait_online(); @@ -81,9 +137,7 @@ impl Account { } }; let should_retry = move |state_was_updated: &bool| -> bool { !*state_was_updated }; - let retry_future = - retry_future_with_backoff(future_generator, should_retry, retry_strategy); - retry_future.await; + retry_future(future_generator, should_retry, retry_strategy).await; }); runtime.spawn(future); diff --git a/mullvad-daemon/src/lib.rs b/mullvad-daemon/src/lib.rs index a7493f1d82..15733d2f29 100644 --- a/mullvad-daemon/src/lib.rs +++ b/mullvad-daemon/src/lib.rs @@ -1452,8 +1452,7 @@ where async fn on_create_new_account(&mut self, tx: ResponseTx<String, Error>) { let daemon_tx = self.tx.clone(); - let future = self.account.proxy.create_account(); - + let future = self.account.create_account(); tokio::spawn(async move { match future.await { Ok(account_token) => { @@ -1484,7 +1483,7 @@ where async fn on_get_www_auth_token(&mut self, tx: ResponseTx<String, Error>) { if let Some(account_token) = self.settings.get_account_token() { - let future = self.account.proxy.get_www_auth_token(account_token); + let future = self.account.get_www_auth_token(account_token); let rpc_call = async { Self::oneshot_send( tx, @@ -1592,7 +1591,7 @@ where { let remove_key = self .wireguard_key_manager - .remove_key(previous_token, previous_key); + .remove_key_with_backoff(previous_token, previous_key); tokio::spawn(async move { if let Err(error) = remove_key.await { log::error!( diff --git a/mullvad-daemon/src/relays.rs b/mullvad-daemon/src/relays.rs index 607724a534..9811864630 100644 --- a/mullvad-daemon/src/relays.rs +++ b/mullvad-daemon/src/relays.rs @@ -29,7 +29,7 @@ use std::{ sync::Arc, time::{self, Duration, Instant, SystemTime}, }; -use talpid_core::future_retry::{retry_future_with_backoff, ExponentialBackoff, Jittered}; +use talpid_core::future_retry::{retry_future, ExponentialBackoff, Jittered}; use talpid_types::{ net::{ all_of_the_internet, openvpn::ProxySettings, wireguard, IpVersion, TransportProtocol, @@ -1141,7 +1141,7 @@ impl RelayListUpdater { ExponentialBackoff::new(EXPONENTIAL_BACKOFF_INITIAL, EXPONENTIAL_BACKOFF_FACTOR) .max_delay(UPDATE_INTERVAL * 2); - let download_future = retry_future_with_backoff( + let download_future = retry_future( download_futures, |result| result.is_err(), Jittered::jitter(exponential_backoff), diff --git a/mullvad-daemon/src/version_check.rs b/mullvad-daemon/src/version_check.rs index 44259eb1c6..dfcc82b6e8 100644 --- a/mullvad-daemon/src/version_check.rs +++ b/mullvad-daemon/src/version_check.rs @@ -38,7 +38,7 @@ const UPDATE_INTERVAL: Duration = Duration::from_secs(60 * 60 * 24); /// Wait this long until next try if an update failed const UPDATE_INTERVAL_ERROR: Duration = Duration::from_secs(60 * 60 * 6); /// Retry interval for `RunVersionCheck`. -const IMMEDIATE_UPDATE_INTERVAL_ERROR: Duration = Duration::from_secs(1); +const IMMEDIATE_UPDATE_INTERVAL_ERROR: Duration = Duration::ZERO; const IMMEDIATE_UPDATE_MAX_RETRIES: usize = 2; #[cfg(target_os = "linux")] @@ -204,13 +204,26 @@ impl VersionUpdater { .map_err(Error::Download) }; - Box::pin(talpid_core::future_retry::retry_future_with_backoff( + Box::pin(talpid_core::future_retry::retry_future_n( download_future_factory, - move |result| result.is_err() && !api_handle.get_state().is_offline(), - std::iter::repeat(IMMEDIATE_UPDATE_INTERVAL_ERROR).take(IMMEDIATE_UPDATE_MAX_RETRIES), + move |result| Self::should_retry_immediate(result, &api_handle), + std::iter::repeat(IMMEDIATE_UPDATE_INTERVAL_ERROR), + IMMEDIATE_UPDATE_MAX_RETRIES, )) } + fn should_retry_immediate<T>( + result: &Result<T, Error>, + api_handle: &ApiAvailabilityHandle, + ) -> bool { + match result { + Err(Error::Download(error)) if error.is_network_error() => { + !api_handle.get_state().is_offline() + } + _ => false, + } + } + fn create_update_background_future( &self, ) -> std::pin::Pin< @@ -232,7 +245,7 @@ impl VersionUpdater { } }; - Box::pin(talpid_core::future_retry::retry_future_with_backoff( + Box::pin(talpid_core::future_retry::retry_future( download_future_factory, |result| result.is_err(), std::iter::repeat(UPDATE_INTERVAL_ERROR), diff --git a/mullvad-daemon/src/wireguard.rs b/mullvad-daemon/src/wireguard.rs index c0c260de29..8f0eacd454 100644 --- a/mullvad-daemon/src/wireguard.rs +++ b/mullvad-daemon/src/wireguard.rs @@ -9,8 +9,10 @@ pub use mullvad_types::wireguard::*; use std::{future::Future, pin::Pin, time::Duration}; use futures::future::{abortable, AbortHandle}; +#[cfg(not(target_os = "android"))] +use talpid_core::future_retry::constant_interval; use talpid_core::{ - future_retry::{retry_future_with_backoff, ExponentialBackoff, Jittered}, + future_retry::{retry_future, retry_future_n, ExponentialBackoff, Jittered}, mpsc::Sender, }; @@ -30,6 +32,11 @@ const RETRY_INTERVAL_INITIAL: Duration = Duration::from_secs(4); const RETRY_INTERVAL_FACTOR: u32 = 5; const RETRY_INTERVAL_MAX: Duration = Duration::from_secs(24 * 60 * 60); +#[cfg(not(target_os = "android"))] +const SHORT_RETRY_INTERVAL: Duration = Duration::ZERO; + +const MAX_KEY_REMOVAL_RETRIES: usize = 2; + #[derive(err_derive::Error, Debug)] pub enum Error { #[error(display = "Unexpected HTTP request error")] @@ -138,17 +145,61 @@ impl KeyManager { } /// Removes a key from an account + #[cfg(not(target_os = "android"))] pub fn remove_key( &self, account: AccountToken, key: talpid_types::net::wireguard::PublicKey, ) -> impl Future<Output = Result<()>> { + self.remove_key_inner(account, key, constant_interval(SHORT_RETRY_INTERVAL), false) + } + + /// Removes a key from an account + pub fn remove_key_with_backoff( + &self, + account: AccountToken, + key: talpid_types::net::wireguard::PublicKey, + ) -> impl Future<Output = Result<()>> { + let retry_strategy = Jittered::jitter( + ExponentialBackoff::new(RETRY_INTERVAL_INITIAL, RETRY_INTERVAL_FACTOR) + .max_delay(RETRY_INTERVAL_MAX), + ); + self.remove_key_inner(account, key, retry_strategy, true) + } + + fn remove_key_inner<D: Iterator<Item = Duration> + 'static>( + &self, + account: AccountToken, + key: talpid_types::net::wireguard::PublicKey, + retry_strategy: D, + offline_check: bool, + ) -> impl Future<Output = Result<()>> { let mut rpc = mullvad_rpc::WireguardKeyProxy::new(self.http_handle.clone()); - async move { - rpc.remove_wireguard_key(account, &key) - .await - .map_err(Self::map_rpc_error) - } + let api_handle = self.availability_handle.clone(); + let api_handle_2 = api_handle.clone(); + let future = retry_future_n( + move || { + let remove_key = rpc.remove_wireguard_key(account.clone(), key.clone()); + let wait_future = api_handle.wait_online(); + async move { + if offline_check { + let _ = wait_future.await; + } + remove_key.await + } + }, + move |result| match result { + Ok(_) => false, + Err(error) => Self::should_retry_removal(error, &api_handle_2), + }, + retry_strategy, + MAX_KEY_REMOVAL_RETRIES, + ); + async move { future.await.map_err(Self::map_rpc_error) } + } + + fn should_retry_removal(error: &RestError, api_handle: &ApiAvailabilityHandle) -> bool { + error.is_network_error() && !api_handle.get_state().is_offline() } fn should_retry(error: &RestError) -> bool { @@ -218,8 +269,7 @@ impl KeyManager { } }; - let upload_future = - retry_future_with_backoff(future_generator, should_retry, retry_strategy); + let upload_future = retry_future(future_generator, should_retry, retry_strategy); let (cancellable_upload, abort_handle) = abortable(Box::pin(upload_future)); @@ -417,7 +467,7 @@ impl KeyManager { } }; - retry_future_with_backoff( + retry_future( move || rotate_key.clone()(&old_key), should_retry, retry_strategy, diff --git a/mullvad-problem-report/src/lib.rs b/mullvad-problem-report/src/lib.rs index 26cbcb4fb1..e6312062ff 100644 --- a/mullvad-problem-report/src/lib.rs +++ b/mullvad-problem-report/src/lib.rs @@ -10,7 +10,6 @@ use std::{ fs::{self, File}, io::{self, BufWriter, Read, Seek, SeekFrom, Write}, path::{Path, PathBuf}, - time::Duration, }; use talpid_types::ErrorExt; @@ -35,7 +34,6 @@ const LINE_SEPARATOR: &str = "\n"; const LINE_SEPARATOR: &str = "\r\n"; const MAX_SEND_ATTEMPTS: usize = 3; -const RETRY_INTERVAL: Duration = Duration::from_millis(500); /// Custom macro to write a line to an output formatter that uses platform-specific newline /// character sequences. @@ -306,10 +304,12 @@ pub fn send_problem_report( eprintln!( "{}", error.display_chain_with_msg("Failed to send problem report") - ) + ); + if !error.is_network_error() { + break; + } } } - tokio::time::sleep(RETRY_INTERVAL).await; } Err(Error::SendProblemReportError) }) diff --git a/mullvad-rpc/src/lib.rs b/mullvad-rpc/src/lib.rs index 098ed2f0b4..883128f8aa 100644 --- a/mullvad-rpc/src/lib.rs +++ b/mullvad-rpc/src/lib.rs @@ -492,14 +492,13 @@ impl WireguardKeyProxy { rest::deserialize_body(response).await } - pub async fn remove_wireguard_key( + pub fn remove_wireguard_key( &mut self, account_token: AccountToken, - key: &wireguard::PublicKey, - ) -> Result<(), rest::Error> { + key: wireguard::PublicKey, + ) -> impl Future<Output = Result<(), rest::Error>> { let service = self.handle.service.clone(); - - let _ = rest::send_request( + let future = rest::send_request( &self.handle.factory, service, &format!( @@ -509,9 +508,11 @@ impl WireguardKeyProxy { Method::DELETE, Some(account_token), StatusCode::NO_CONTENT, - ) - .await?; - Ok(()) + ); + async move { + let _ = future.await?; + Ok(()) + } } } diff --git a/mullvad-rpc/src/rest.rs b/mullvad-rpc/src/rest.rs index 77bf06fd55..779da6ccf5 100644 --- a/mullvad-rpc/src/rest.rs +++ b/mullvad-rpc/src/rest.rs @@ -75,6 +75,15 @@ pub enum Error { UriError(#[error(source)] http::uri::InvalidUri), } +impl Error { + pub fn is_network_error(&self) -> bool { + match self { + Error::HyperError(_) | Error::TimeoutError(_) => true, + _ => false, + } + } +} + /// A service that executes HTTP requests, allowing for on-demand termination of all in-flight /// requests pub(crate) struct RequestService { @@ -148,30 +157,27 @@ impl RequestService { let response = flatten_result(flatten_result(response)); if let Some(host_addr) = host_addr { if let Err(err) = &response { - match err { - Error::HyperError(_) | Error::TimeoutError(_) => { - log::error!( - "{}", - err.display_chain_with_msg("HTTP request failed") - ); - if !api_availability.get_state().is_offline() { - let current_address = address_cache.peek_address(); - if current_address == host_addr - && address_cache.has_tried_current_address() - { - handle.spawn(async move { - address_cache.select_new_address().await; - let new_address = address_cache.peek_address(); - log::error!( - "Request failed using address {}. Trying next API address: {}", - current_address, - new_address, - ); - }); - } + if err.is_network_error() { + log::error!( + "{}", + err.display_chain_with_msg("HTTP request failed") + ); + if !api_availability.get_state().is_offline() { + let current_address = address_cache.peek_address(); + if current_address == host_addr + && address_cache.has_tried_current_address() + { + handle.spawn(async move { + address_cache.select_new_address().await; + let new_address = address_cache.peek_address(); + log::error!( + "Request failed using address {}. Trying next API address: {}", + current_address, + new_address, + ); + }); } } - _ => (), } } } diff --git a/mullvad-setup/src/main.rs b/mullvad-setup/src/main.rs index 5f88e3456f..396d456afa 100644 --- a/mullvad-setup/src/main.rs +++ b/mullvad-setup/src/main.rs @@ -2,8 +2,11 @@ use clap::{crate_authors, crate_description, crate_name, SubCommand}; use mullvad_management_interface::new_rpc_client; use mullvad_rpc::MullvadRpcRuntime; use mullvad_types::version::ParsedAppVersion; -use std::{path::PathBuf, process}; -use talpid_core::firewall::{self, Firewall, FirewallArguments}; +use std::{path::PathBuf, process, time::Duration}; +use talpid_core::{ + firewall::{self, Firewall, FirewallArguments}, + future_retry::{constant_interval, retry_future_n}, +}; use talpid_types::ErrorExt; pub const PRODUCT_VERSION: &str = include_str!(concat!(env!("OUT_DIR"), "/product-version.txt")); @@ -13,6 +16,9 @@ lazy_static::lazy_static! { static ref IS_DEV_BUILD: bool = APP_VERSION.is_dev(); } +const KEY_RETRY_INTERVAL: Duration = Duration::ZERO; +const KEY_RETRY_MAX_RETRIES: usize = 2; + #[repr(i32)] enum ExitStatus { Ok = 0, @@ -178,10 +184,19 @@ async fn remove_wireguard_key() -> Result<(), Error> { .map_err(Error::RpcInitializationError)?; let mut key_proxy = mullvad_rpc::WireguardKeyProxy::new(rpc_runtime.mullvad_rest_handle()); - key_proxy - .remove_wireguard_key(token, &wg_data.private_key.public_key()) - .await - .map_err(Error::RemoveKeyError)?; + retry_future_n( + move || { + key_proxy.remove_wireguard_key(token.clone(), wg_data.private_key.public_key()) + }, + move |result| match result { + Err(error) => error.is_network_error(), + _ => false, + }, + constant_interval(KEY_RETRY_INTERVAL), + KEY_RETRY_MAX_RETRIES, + ) + .await + .map_err(Error::RemoveKeyError)?; settings .set_wireguard(None) .await diff --git a/talpid-core/src/future_retry.rs b/talpid-core/src/future_retry.rs index 0e494cc96c..3c640a9037 100644 --- a/talpid-core/src/future_retry.rs +++ b/talpid-core/src/future_retry.rs @@ -5,8 +5,26 @@ use std::{future::Future, time::Duration}; /// required - run a timer for 60 seconds until a delay is shorter than 5 minutes. const MAX_SINGLE_DELAY: Duration = Duration::from_secs(5 * 60); -/// Retries a future until it should stop as determined by the retry function. -pub async fn retry_future_with_backoff< +/// Convenience function that works like [`retry_future`] but limits the number +/// of retries to `max_retries`. +pub async fn retry_future_n< + F: FnMut() -> O + 'static, + R: FnMut(&T) -> bool + 'static, + D: Iterator<Item = Duration> + 'static, + O: Future<Output = T>, + T, +>( + factory: F, + should_retry: R, + delays: D, + max_retries: usize, +) -> T { + retry_future(factory, should_retry, delays.take(max_retries)).await +} + +/// Retries a future until it should stop as determined by the retry function, or when +/// the iterator returns `None`. +pub async fn retry_future< F: FnMut() -> O + 'static, R: FnMut(&T) -> bool + 'static, D: Iterator<Item = Duration> + 'static, @@ -22,15 +40,18 @@ pub async fn retry_future_with_backoff< if should_retry(¤t_result) { if let Some(delay) = delays.next() { sleep(delay).await; - } else { - return current_result; + continue; } - } else { - return current_result; } + return current_result; } } +/// Returns an iterator that repeats the same interval. +pub fn constant_interval(interval: Duration) -> impl Iterator<Item = Duration> { + std::iter::repeat(interval) +} + async fn sleep(mut delay: Duration) { while delay > MAX_SINGLE_DELAY { delay -= MAX_SINGLE_DELAY; |
