summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorEmīls <emils@mullvad.net>2020-07-29 12:25:32 +0100
committerEmīls <emils@mullvad.net>2020-07-29 12:25:32 +0100
commitcdbef6f3d5d6087fd481d38d137b38f9e2ce586c (patch)
tree16e9a4c49e4ad05e6a9b938b4e98e32cc5196cd1
parent7c5b996b1d7ca1565641ca6e465c4ea8d7135461 (diff)
parent6b6be962b999975d39abfdd353617ea434391534 (diff)
downloadmullvadvpn-cdbef6f3d5d6087fd481d38d137b38f9e2ce586c.tar.xz
mullvadvpn-cdbef6f3d5d6087fd481d38d137b38f9e2ce586c.zip
Merge branch 'add-exponential-backoff-for-relay-list'
-rw-r--r--CHANGELOG.md1
-rw-r--r--mullvad-daemon/src/lib.rs14
-rw-r--r--mullvad-daemon/src/relays.rs229
-rw-r--r--mullvad-rpc/src/bin/relay_list.rs3
-rw-r--r--mullvad-rpc/src/relay_list.rs8
5 files changed, 159 insertions, 96 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md
index ac571d49f6..23a588d85b 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -38,6 +38,7 @@ Line wrap the file at 100 chars. Th
- Make connectivity checker more resilient to suspension.
- Make uninstaller on desktop platforms attempt to remove WireGuard keys from accounts.
- Make important notifications not timeout on macOS and remain in the notification list on Linux.
+- Add exponential backoff to relay list downloader.
#### Android
- Show a system notification when the account time will soon run out.
diff --git a/mullvad-daemon/src/lib.rs b/mullvad-daemon/src/lib.rs
index 997770f8de..4dc7dcfb47 100644
--- a/mullvad-daemon/src/lib.rs
+++ b/mullvad-daemon/src/lib.rs
@@ -508,7 +508,7 @@ where
let on_relay_list_update = move |relay_list: &RelayList| {
relay_list_listener.notify_relay_list(relay_list.clone());
};
- let relay_selector = relays::RelaySelector::new(
+ let mut relay_selector = relays::RelaySelector::new(
rpc_handle.clone(),
on_relay_list_update,
&resource_dir,
@@ -578,7 +578,7 @@ where
wireguard::KeyManager::new(internal_event_tx.clone(), rpc_handle.clone());
// Attempt to download a fresh relay list
- relay_selector.update();
+ rpc_runtime.runtime().block_on(relay_selector.update());
let initial_target_state = if settings.get_account_token().is_some() {
if settings.auto_connect {
@@ -972,6 +972,13 @@ where
self.rpc_runtime.runtime().spawn(fut);
}
+ fn block_on_future<F>(&mut self, fut: F) -> F::Output
+ where
+ F: std::future::Future,
+ {
+ self.rpc_runtime.runtime().block_on(fut)
+ }
+
fn handle_command(&mut self, command: DaemonCommand) {
use self::DaemonCommand::*;
@@ -1272,7 +1279,8 @@ where
}
fn on_update_relay_locations(&mut self) {
- self.relay_selector.update();
+ let update_future = self.relay_selector.update();
+ self.block_on_future(update_future);
}
fn on_set_account(&mut self, tx: oneshot::Sender<()>, account_token: Option<String>) {
diff --git a/mullvad-daemon/src/relays.rs b/mullvad-daemon/src/relays.rs
index 320994c743..4f10f6e3fd 100644
--- a/mullvad-daemon/src/relays.rs
+++ b/mullvad-daemon/src/relays.rs
@@ -2,7 +2,12 @@
//! updated as well.
use chrono::{DateTime, Local};
-use futures01::Future;
+use futures::{
+ channel::mpsc,
+ future::{Fuse, FusedFuture},
+ FutureExt, SinkExt, StreamExt,
+};
+use log::{debug, error, info, warn};
use mullvad_rpc::{rest::MullvadRestHandle, RelayListProxy};
use mullvad_types::{
endpoint::MullvadEndpoint,
@@ -14,27 +19,24 @@ use mullvad_types::{
relay_list::{OpenVpnEndpointData, Relay, RelayList, RelayTunnels, WireguardEndpointData},
};
use parking_lot::Mutex;
+use rand::{self, rngs::ThreadRng, seq::SliceRandom, Rng};
use std::{
- fs::File,
+ future::Future,
io,
net::{IpAddr, SocketAddr},
path::{Path, PathBuf},
- sync::{mpsc, Arc},
- thread,
- time::{self, Duration, SystemTime},
+ sync::Arc,
+ time::{self, Duration, Instant, SystemTime},
};
+use talpid_core::future_retry::{retry_future_with_backoff, ExponentialBackoff, Jittered};
use talpid_types::{
net::{all_of_the_internet, openvpn::ProxySettings, wireguard, TransportProtocol, TunnelType},
ErrorExt,
};
-
-use log::{debug, error, info, warn};
-use rand::{self, rngs::ThreadRng, seq::SliceRandom, Rng};
-use tokio_timer::{TimeoutError, Timer};
+use tokio02::fs::File;
const DATE_TIME_FORMAT_STR: &str = "%Y-%m-%d %H:%M:%S%.3f";
const RELAYS_FILENAME: &str = "relays.json";
-const DOWNLOAD_TIMEOUT: Duration = Duration::from_secs(15);
/// How often the updater should wake up to check the cache of the in-memory cache of relays.
/// This check is very cheap. The only reason to not have it very often is because if downloading
/// constantly fails it will try very often and fill the logs etc.
@@ -42,32 +44,27 @@ const UPDATE_CHECK_INTERVAL: Duration = Duration::from_secs(60 * 15);
/// How old the cached relays need to be to trigger an update
const UPDATE_INTERVAL: Duration = Duration::from_secs(60 * 60);
+/// First delay of exponential backoff in milliseconds
+const EXPONENTIAL_BACKOFF_DELAY_MS: u64 = 30;
+const EXPONENTIAL_BACKOFF_FACTOR: u64 = 2000;
+
#[derive(err_derive::Error, Debug)]
#[error(no_from)]
pub enum Error {
- #[error(display = "Failed to open relay cache file for reading")]
- ReadCachedRelays(#[error(source)] io::Error),
+ #[error(display = "Failed to open relay cache file")]
+ OpenRelayCache(#[error(source)] io::Error),
- #[error(display = "Failed to open relay cache file for writing")]
+ #[error(display = "Failed to write relay cache file to disk")]
WriteRelayCache(#[error(source)] io::Error),
- #[error(display = "Failed to download the list of relays")]
- Download(#[error(source)] mullvad_rpc::rest::Error),
-
- #[error(display = "Timed out when trying to download the list of relays")]
- DownloadTimeout,
-
#[error(display = "No relays matching current constraints")]
NoRelay,
#[error(display = "Failure in serialization of the relay list")]
Serialize(#[error(source)] serde_json::Error),
-}
-impl<F> From<TimeoutError<F>> for Error {
- fn from(_: TimeoutError<F>) -> Error {
- Error::DownloadTimeout
- }
+ #[error(display = "Downloader already shut down")]
+ DownloaderShutDown,
}
struct ParsedRelays {
@@ -119,15 +116,15 @@ impl ParsedRelays {
pub fn from_file(path: impl AsRef<Path>) -> Result<Self, Error> {
debug!("Reading relays from {}", path.as_ref().display());
let (last_modified, file) =
- Self::open_file(path.as_ref()).map_err(Error::ReadCachedRelays)?;
+ Self::open_file(path.as_ref()).map_err(Error::OpenRelayCache)?;
let relay_list =
serde_json::from_reader(io::BufReader::new(file)).map_err(Error::Serialize)?;
Ok(Self::from_relay_list(relay_list, last_modified))
}
- fn open_file(path: &Path) -> io::Result<(SystemTime, File)> {
- let file = File::open(path)?;
+ fn open_file(path: &Path) -> io::Result<(SystemTime, std::fs::File)> {
+ let file = std::fs::File::open(path)?;
let last_modified = file.metadata()?.modified()?;
Ok((last_modified, file))
}
@@ -177,12 +174,15 @@ impl RelaySelector {
.format(DATE_TIME_FORMAT_STR)
);
let parsed_relays = Arc::new(Mutex::new(unsynchronized_parsed_relays));
- let updater = RelayListUpdater::spawn(
+
+ let updater = RelayListUpdater::new(
rpc_handle,
cache_path,
parsed_relays.clone(),
Box::new(on_update),
);
+
+
RelaySelector {
parsed_relays,
rng: rand::thread_rng(),
@@ -191,10 +191,14 @@ impl RelaySelector {
}
/// Download the newest relay list.
- pub fn update(&self) {
- self.updater
- .send(())
- .expect("Relay list updated thread has stopped unexpectedly");
+ pub fn update(&mut self) -> impl Future<Output = ()> {
+ let mut updater = self.updater.clone();
+ async move {
+ updater
+ .update_relay_list()
+ .await
+ .expect("Relay list updated thread has stopped unexpectedly");
+ }
}
/// Returns all countries and cities. The cities in the object returned does not have any
@@ -776,75 +780,113 @@ impl RelaySelector {
}
}
-type RelayListUpdaterHandle = mpsc::Sender<()>;
+#[derive(Clone)]
+pub struct RelayListUpdaterHandle {
+ tx: mpsc::Sender<()>,
+}
+
+impl RelayListUpdaterHandle {
+ async fn update_relay_list(&mut self) -> Result<(), Error> {
+ self.tx
+ .send(())
+ .await
+ .map_err(|_| Error::DownloaderShutDown)
+ }
+}
struct RelayListUpdater {
rpc_client: RelayListProxy,
cache_path: PathBuf,
parsed_relays: Arc<Mutex<ParsedRelays>>,
- on_update: Box<dyn Fn(&RelayList)>,
- close_handle: mpsc::Receiver<()>,
+ on_update: Box<dyn Fn(&RelayList) + Send + 'static>,
+ earliest_next_try: Instant,
}
impl RelayListUpdater {
- pub fn spawn(
+ pub fn new(
rpc_handle: MullvadRestHandle,
cache_path: PathBuf,
parsed_relays: Arc<Mutex<ParsedRelays>>,
on_update: Box<dyn Fn(&RelayList) + Send + 'static>,
) -> RelayListUpdaterHandle {
- let (tx, rx) = mpsc::channel();
-
- thread::spawn(move || {
- Self::new(rpc_handle, cache_path, parsed_relays, on_update, rx).run()
- });
-
- tx
- }
-
- fn new(
- rpc_handle: MullvadRestHandle,
- cache_path: PathBuf,
- parsed_relays: Arc<Mutex<ParsedRelays>>,
- on_update: Box<dyn Fn(&RelayList)>,
- close_handle: mpsc::Receiver<()>,
- ) -> Self {
+ let (tx, cmd_rx) = mpsc::channel(1);
+ let service = rpc_handle.service();
let rpc_client = RelayListProxy::new(rpc_handle);
-
- RelayListUpdater {
+ let updater = RelayListUpdater {
rpc_client,
cache_path,
parsed_relays,
on_update,
- close_handle,
- }
+ earliest_next_try: Instant::now() + UPDATE_INTERVAL,
+ };
+
+ service.spawn(updater.run(cmd_rx));
+
+ RelayListUpdaterHandle { tx }
}
- fn run(&mut self) {
- debug!("Starting relay list updater thread");
+ async fn run(mut self, mut cmd_rx: mpsc::Receiver<()>) {
loop {
- let should_update = match self.close_handle.recv_timeout(UPDATE_CHECK_INTERVAL) {
- // Someone sent an explicit update command
- Ok(()) => true,
- // Normal timeout, check cache age
- Err(mpsc::RecvTimeoutError::Timeout) => self.should_update(),
- // We have been canceled
- Err(mpsc::RecvTimeoutError::Disconnected) => break,
+ let mut check_interval = tokio02::time::interval(UPDATE_CHECK_INTERVAL).fuse();
+ let mut download_future = Box::pin(Fuse::terminated());
+
+
+ futures::select! {
+ _check_update = check_interval.next() => {
+ if !download_future.is_terminated() && self.should_update() {
+ download_future = Box::pin(Self::download_relay_list(self.rpc_client.clone()).fuse());
+ self.earliest_next_try = Instant::now() + UPDATE_INTERVAL;
+ }
+ },
+
+ new_relay_list = download_future => {
+ self.consume_new_relay_list(new_relay_list).await;
+
+ },
+
+ cmd = cmd_rx.next() => {
+ match cmd {
+ Some(_) => {
+ self.consume_new_relay_list(self.rpc_client.relay_list().await).await;
+ },
+ None => {
+ log::error!("Relay list updater shutting down");
+ return;
+ }
+ }
+ }
+
};
- if should_update {
- match self.update() {
- Ok(()) => info!("Updated list of relays"),
- Err(error) => error!("{}", error.display_chain()),
+ }
+ }
+
+ async fn consume_new_relay_list(
+ &mut self,
+ result: Result<RelayList, mullvad_rpc::rest::Error>,
+ ) {
+ match result {
+ Ok(relay_list) => {
+ if let Err(err) = self.update_cache(relay_list).await {
+ log::error!("Failed to update relay list cache: {}", err);
}
}
+ Err(err) => {
+ log::error!(
+ "Failed to fetch new relay list: {}. Will retry in {} minutes",
+ err,
+ self.earliest_next_try
+ .saturating_duration_since(Instant::now())
+ .as_secs()
+ / 60
+ );
+ }
}
- debug!("Relay list updater thread has finished");
}
/// Returns true if the current parsed_relays is older than UPDATE_INTERVAL
fn should_update(&mut self) -> bool {
match SystemTime::now().duration_since(self.parsed_relays.lock().last_updated()) {
- Ok(duration) => duration > UPDATE_INTERVAL,
+ Ok(duration) => duration > UPDATE_INTERVAL && self.earliest_next_try <= Instant::now(),
// If the clock is skewed we have no idea by how much or when the last update
// actually was, better download again to get in sync and get a `last_updated`
// timestamp corresponding to the new time.
@@ -852,10 +894,25 @@ impl RelayListUpdater {
}
}
- fn update(&mut self) -> Result<(), Error> {
- let new_relay_list = self.download_relay_list()?;
+ fn download_relay_list(
+ rpc_handle: RelayListProxy,
+ ) -> impl Future<Output = Result<RelayList, mullvad_rpc::rest::Error>> + 'static {
+ let download_futures = move || rpc_handle.relay_list();
+
+ let exponential_backoff = ExponentialBackoff::from_millis(EXPONENTIAL_BACKOFF_DELAY_MS)
+ .factor(EXPONENTIAL_BACKOFF_FACTOR)
+ .max_delay(UPDATE_INTERVAL * 2);
- if let Err(error) = self.cache_relays(&new_relay_list) {
+ let download_future = retry_future_with_backoff(
+ download_futures,
+ |result| result.is_err(),
+ Jittered::jitter(exponential_backoff),
+ );
+ download_future
+ }
+
+ async fn update_cache(&mut self, new_relay_list: RelayList) -> Result<(), Error> {
+ if let Err(error) = Self::cache_relays(&self.cache_path, &new_relay_list).await {
error!(
"{}",
error.display_chain_with_msg("Failed to update relay cache on disk")
@@ -874,19 +931,17 @@ impl RelayListUpdater {
Ok(())
}
- fn download_relay_list(&mut self) -> Result<RelayList, Error> {
- let download_future = self.rpc_client.relay_list().map_err(Error::Download);
- let relay_list = Timer::default()
- .timeout(download_future, DOWNLOAD_TIMEOUT)
- .wait()?;
-
- Ok(relay_list)
- }
-
/// Write a `RelayList` to the cache file.
- fn cache_relays(&self, relays: &RelayList) -> Result<(), Error> {
- debug!("Writing relays cache to {}", self.cache_path.display());
- let file = File::create(&self.cache_path).map_err(Error::WriteRelayCache)?;
- serde_json::to_writer_pretty(io::BufWriter::new(file), relays).map_err(Error::Serialize)
+ async fn cache_relays(cache_path: &Path, relays: &RelayList) -> Result<(), Error> {
+ debug!("Writing relays cache to {}", cache_path.display());
+ let mut file = File::create(cache_path)
+ .await
+ .map_err(Error::OpenRelayCache)?;
+ let bytes = serde_json::to_vec_pretty(relays).map_err(Error::Serialize)?;
+ let mut slice: &[u8] = bytes.as_slice();
+ let _ = tokio02::io::copy(&mut slice, &mut file)
+ .await
+ .map_err(Error::WriteRelayCache)?;
+ Ok(())
}
}
diff --git a/mullvad-rpc/src/bin/relay_list.rs b/mullvad-rpc/src/bin/relay_list.rs
index 71c9c30917..27601d4b21 100644
--- a/mullvad-rpc/src/bin/relay_list.rs
+++ b/mullvad-rpc/src/bin/relay_list.rs
@@ -1,6 +1,5 @@
/// Intended to be used to pre-load a relay list when creating an installer for the Mullvad VPN
/// app.
-use futures01::future::Future;
use mullvad_rpc::{rest::Error as RestError, MullvadRpcRuntime, RelayListProxy};
use std::process;
use talpid_types::ErrorExt;
@@ -10,7 +9,7 @@ fn main() {
let relay_list_request = RelayListProxy::new(runtime.mullvad_rest_handle()).relay_list();
- let relay_list = match relay_list_request.wait() {
+ let relay_list = match runtime.runtime().block_on(relay_list_request) {
Ok(relay_list) => relay_list,
Err(RestError::TimeoutError(_)) => {
eprintln!("Request timed out");
diff --git a/mullvad-rpc/src/relay_list.rs b/mullvad-rpc/src/relay_list.rs
index 5e98d4f2c3..b6784d3fda 100644
--- a/mullvad-rpc/src/relay_list.rs
+++ b/mullvad-rpc/src/relay_list.rs
@@ -7,11 +7,13 @@ use talpid_types::net::wireguard;
use std::{
collections::BTreeMap,
+ future::Future,
net::{Ipv4Addr, Ipv6Addr},
time::Duration,
};
/// Fetches relay list from https://api.mullvad.net/v1/relays
+#[derive(Clone)]
pub struct RelayListProxy {
handle: rest::MullvadRestHandle,
}
@@ -25,9 +27,7 @@ impl RelayListProxy {
}
/// Fetch the relay list
- pub fn relay_list(
- &self,
- ) -> impl futures01::future::Future<Item = relay_list::RelayList, Error = rest::Error> {
+ pub fn relay_list(&self) -> impl Future<Output = Result<relay_list::RelayList, rest::Error>> {
let service = self.handle.service.clone();
let request = self.handle.factory.request("/v1/relays", Method::GET);
@@ -43,7 +43,7 @@ impl RelayListProxy {
.await?
.into_relay_list())
};
- self.handle.service.compat_spawn(future)
+ future
}
}