diff options
| -rw-r--r-- | CHANGELOG.md | 1 | ||||
| -rw-r--r-- | mullvad-daemon/src/lib.rs | 14 | ||||
| -rw-r--r-- | mullvad-daemon/src/relays.rs | 229 | ||||
| -rw-r--r-- | mullvad-rpc/src/bin/relay_list.rs | 3 | ||||
| -rw-r--r-- | mullvad-rpc/src/relay_list.rs | 8 |
5 files changed, 159 insertions, 96 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md index ac571d49f6..23a588d85b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -38,6 +38,7 @@ Line wrap the file at 100 chars. Th - Make connectivity checker more resilient to suspension. - Make uninstaller on desktop platforms attempt to remove WireGuard keys from accounts. - Make important notifications not timeout on macOS and remain in the notification list on Linux. +- Add exponential backoff to relay list downloader. #### Android - Show a system notification when the account time will soon run out. diff --git a/mullvad-daemon/src/lib.rs b/mullvad-daemon/src/lib.rs index 997770f8de..4dc7dcfb47 100644 --- a/mullvad-daemon/src/lib.rs +++ b/mullvad-daemon/src/lib.rs @@ -508,7 +508,7 @@ where let on_relay_list_update = move |relay_list: &RelayList| { relay_list_listener.notify_relay_list(relay_list.clone()); }; - let relay_selector = relays::RelaySelector::new( + let mut relay_selector = relays::RelaySelector::new( rpc_handle.clone(), on_relay_list_update, &resource_dir, @@ -578,7 +578,7 @@ where wireguard::KeyManager::new(internal_event_tx.clone(), rpc_handle.clone()); // Attempt to download a fresh relay list - relay_selector.update(); + rpc_runtime.runtime().block_on(relay_selector.update()); let initial_target_state = if settings.get_account_token().is_some() { if settings.auto_connect { @@ -972,6 +972,13 @@ where self.rpc_runtime.runtime().spawn(fut); } + fn block_on_future<F>(&mut self, fut: F) -> F::Output + where + F: std::future::Future, + { + self.rpc_runtime.runtime().block_on(fut) + } + fn handle_command(&mut self, command: DaemonCommand) { use self::DaemonCommand::*; @@ -1272,7 +1279,8 @@ where } fn on_update_relay_locations(&mut self) { - self.relay_selector.update(); + let update_future = self.relay_selector.update(); + self.block_on_future(update_future); } fn on_set_account(&mut self, tx: oneshot::Sender<()>, account_token: Option<String>) { diff --git a/mullvad-daemon/src/relays.rs b/mullvad-daemon/src/relays.rs index 320994c743..4f10f6e3fd 100644 --- a/mullvad-daemon/src/relays.rs +++ b/mullvad-daemon/src/relays.rs @@ -2,7 +2,12 @@ //! updated as well. use chrono::{DateTime, Local}; -use futures01::Future; +use futures::{ + channel::mpsc, + future::{Fuse, FusedFuture}, + FutureExt, SinkExt, StreamExt, +}; +use log::{debug, error, info, warn}; use mullvad_rpc::{rest::MullvadRestHandle, RelayListProxy}; use mullvad_types::{ endpoint::MullvadEndpoint, @@ -14,27 +19,24 @@ use mullvad_types::{ relay_list::{OpenVpnEndpointData, Relay, RelayList, RelayTunnels, WireguardEndpointData}, }; use parking_lot::Mutex; +use rand::{self, rngs::ThreadRng, seq::SliceRandom, Rng}; use std::{ - fs::File, + future::Future, io, net::{IpAddr, SocketAddr}, path::{Path, PathBuf}, - sync::{mpsc, Arc}, - thread, - time::{self, Duration, SystemTime}, + sync::Arc, + time::{self, Duration, Instant, SystemTime}, }; +use talpid_core::future_retry::{retry_future_with_backoff, ExponentialBackoff, Jittered}; use talpid_types::{ net::{all_of_the_internet, openvpn::ProxySettings, wireguard, TransportProtocol, TunnelType}, ErrorExt, }; - -use log::{debug, error, info, warn}; -use rand::{self, rngs::ThreadRng, seq::SliceRandom, Rng}; -use tokio_timer::{TimeoutError, Timer}; +use tokio02::fs::File; const DATE_TIME_FORMAT_STR: &str = "%Y-%m-%d %H:%M:%S%.3f"; const RELAYS_FILENAME: &str = "relays.json"; -const DOWNLOAD_TIMEOUT: Duration = Duration::from_secs(15); /// How often the updater should wake up to check the cache of the in-memory cache of relays. /// This check is very cheap. The only reason to not have it very often is because if downloading /// constantly fails it will try very often and fill the logs etc. @@ -42,32 +44,27 @@ const UPDATE_CHECK_INTERVAL: Duration = Duration::from_secs(60 * 15); /// How old the cached relays need to be to trigger an update const UPDATE_INTERVAL: Duration = Duration::from_secs(60 * 60); +/// First delay of exponential backoff in milliseconds +const EXPONENTIAL_BACKOFF_DELAY_MS: u64 = 30; +const EXPONENTIAL_BACKOFF_FACTOR: u64 = 2000; + #[derive(err_derive::Error, Debug)] #[error(no_from)] pub enum Error { - #[error(display = "Failed to open relay cache file for reading")] - ReadCachedRelays(#[error(source)] io::Error), + #[error(display = "Failed to open relay cache file")] + OpenRelayCache(#[error(source)] io::Error), - #[error(display = "Failed to open relay cache file for writing")] + #[error(display = "Failed to write relay cache file to disk")] WriteRelayCache(#[error(source)] io::Error), - #[error(display = "Failed to download the list of relays")] - Download(#[error(source)] mullvad_rpc::rest::Error), - - #[error(display = "Timed out when trying to download the list of relays")] - DownloadTimeout, - #[error(display = "No relays matching current constraints")] NoRelay, #[error(display = "Failure in serialization of the relay list")] Serialize(#[error(source)] serde_json::Error), -} -impl<F> From<TimeoutError<F>> for Error { - fn from(_: TimeoutError<F>) -> Error { - Error::DownloadTimeout - } + #[error(display = "Downloader already shut down")] + DownloaderShutDown, } struct ParsedRelays { @@ -119,15 +116,15 @@ impl ParsedRelays { pub fn from_file(path: impl AsRef<Path>) -> Result<Self, Error> { debug!("Reading relays from {}", path.as_ref().display()); let (last_modified, file) = - Self::open_file(path.as_ref()).map_err(Error::ReadCachedRelays)?; + Self::open_file(path.as_ref()).map_err(Error::OpenRelayCache)?; let relay_list = serde_json::from_reader(io::BufReader::new(file)).map_err(Error::Serialize)?; Ok(Self::from_relay_list(relay_list, last_modified)) } - fn open_file(path: &Path) -> io::Result<(SystemTime, File)> { - let file = File::open(path)?; + fn open_file(path: &Path) -> io::Result<(SystemTime, std::fs::File)> { + let file = std::fs::File::open(path)?; let last_modified = file.metadata()?.modified()?; Ok((last_modified, file)) } @@ -177,12 +174,15 @@ impl RelaySelector { .format(DATE_TIME_FORMAT_STR) ); let parsed_relays = Arc::new(Mutex::new(unsynchronized_parsed_relays)); - let updater = RelayListUpdater::spawn( + + let updater = RelayListUpdater::new( rpc_handle, cache_path, parsed_relays.clone(), Box::new(on_update), ); + + RelaySelector { parsed_relays, rng: rand::thread_rng(), @@ -191,10 +191,14 @@ impl RelaySelector { } /// Download the newest relay list. - pub fn update(&self) { - self.updater - .send(()) - .expect("Relay list updated thread has stopped unexpectedly"); + pub fn update(&mut self) -> impl Future<Output = ()> { + let mut updater = self.updater.clone(); + async move { + updater + .update_relay_list() + .await + .expect("Relay list updated thread has stopped unexpectedly"); + } } /// Returns all countries and cities. The cities in the object returned does not have any @@ -776,75 +780,113 @@ impl RelaySelector { } } -type RelayListUpdaterHandle = mpsc::Sender<()>; +#[derive(Clone)] +pub struct RelayListUpdaterHandle { + tx: mpsc::Sender<()>, +} + +impl RelayListUpdaterHandle { + async fn update_relay_list(&mut self) -> Result<(), Error> { + self.tx + .send(()) + .await + .map_err(|_| Error::DownloaderShutDown) + } +} struct RelayListUpdater { rpc_client: RelayListProxy, cache_path: PathBuf, parsed_relays: Arc<Mutex<ParsedRelays>>, - on_update: Box<dyn Fn(&RelayList)>, - close_handle: mpsc::Receiver<()>, + on_update: Box<dyn Fn(&RelayList) + Send + 'static>, + earliest_next_try: Instant, } impl RelayListUpdater { - pub fn spawn( + pub fn new( rpc_handle: MullvadRestHandle, cache_path: PathBuf, parsed_relays: Arc<Mutex<ParsedRelays>>, on_update: Box<dyn Fn(&RelayList) + Send + 'static>, ) -> RelayListUpdaterHandle { - let (tx, rx) = mpsc::channel(); - - thread::spawn(move || { - Self::new(rpc_handle, cache_path, parsed_relays, on_update, rx).run() - }); - - tx - } - - fn new( - rpc_handle: MullvadRestHandle, - cache_path: PathBuf, - parsed_relays: Arc<Mutex<ParsedRelays>>, - on_update: Box<dyn Fn(&RelayList)>, - close_handle: mpsc::Receiver<()>, - ) -> Self { + let (tx, cmd_rx) = mpsc::channel(1); + let service = rpc_handle.service(); let rpc_client = RelayListProxy::new(rpc_handle); - - RelayListUpdater { + let updater = RelayListUpdater { rpc_client, cache_path, parsed_relays, on_update, - close_handle, - } + earliest_next_try: Instant::now() + UPDATE_INTERVAL, + }; + + service.spawn(updater.run(cmd_rx)); + + RelayListUpdaterHandle { tx } } - fn run(&mut self) { - debug!("Starting relay list updater thread"); + async fn run(mut self, mut cmd_rx: mpsc::Receiver<()>) { loop { - let should_update = match self.close_handle.recv_timeout(UPDATE_CHECK_INTERVAL) { - // Someone sent an explicit update command - Ok(()) => true, - // Normal timeout, check cache age - Err(mpsc::RecvTimeoutError::Timeout) => self.should_update(), - // We have been canceled - Err(mpsc::RecvTimeoutError::Disconnected) => break, + let mut check_interval = tokio02::time::interval(UPDATE_CHECK_INTERVAL).fuse(); + let mut download_future = Box::pin(Fuse::terminated()); + + + futures::select! { + _check_update = check_interval.next() => { + if !download_future.is_terminated() && self.should_update() { + download_future = Box::pin(Self::download_relay_list(self.rpc_client.clone()).fuse()); + self.earliest_next_try = Instant::now() + UPDATE_INTERVAL; + } + }, + + new_relay_list = download_future => { + self.consume_new_relay_list(new_relay_list).await; + + }, + + cmd = cmd_rx.next() => { + match cmd { + Some(_) => { + self.consume_new_relay_list(self.rpc_client.relay_list().await).await; + }, + None => { + log::error!("Relay list updater shutting down"); + return; + } + } + } + }; - if should_update { - match self.update() { - Ok(()) => info!("Updated list of relays"), - Err(error) => error!("{}", error.display_chain()), + } + } + + async fn consume_new_relay_list( + &mut self, + result: Result<RelayList, mullvad_rpc::rest::Error>, + ) { + match result { + Ok(relay_list) => { + if let Err(err) = self.update_cache(relay_list).await { + log::error!("Failed to update relay list cache: {}", err); } } + Err(err) => { + log::error!( + "Failed to fetch new relay list: {}. Will retry in {} minutes", + err, + self.earliest_next_try + .saturating_duration_since(Instant::now()) + .as_secs() + / 60 + ); + } } - debug!("Relay list updater thread has finished"); } /// Returns true if the current parsed_relays is older than UPDATE_INTERVAL fn should_update(&mut self) -> bool { match SystemTime::now().duration_since(self.parsed_relays.lock().last_updated()) { - Ok(duration) => duration > UPDATE_INTERVAL, + Ok(duration) => duration > UPDATE_INTERVAL && self.earliest_next_try <= Instant::now(), // If the clock is skewed we have no idea by how much or when the last update // actually was, better download again to get in sync and get a `last_updated` // timestamp corresponding to the new time. @@ -852,10 +894,25 @@ impl RelayListUpdater { } } - fn update(&mut self) -> Result<(), Error> { - let new_relay_list = self.download_relay_list()?; + fn download_relay_list( + rpc_handle: RelayListProxy, + ) -> impl Future<Output = Result<RelayList, mullvad_rpc::rest::Error>> + 'static { + let download_futures = move || rpc_handle.relay_list(); + + let exponential_backoff = ExponentialBackoff::from_millis(EXPONENTIAL_BACKOFF_DELAY_MS) + .factor(EXPONENTIAL_BACKOFF_FACTOR) + .max_delay(UPDATE_INTERVAL * 2); - if let Err(error) = self.cache_relays(&new_relay_list) { + let download_future = retry_future_with_backoff( + download_futures, + |result| result.is_err(), + Jittered::jitter(exponential_backoff), + ); + download_future + } + + async fn update_cache(&mut self, new_relay_list: RelayList) -> Result<(), Error> { + if let Err(error) = Self::cache_relays(&self.cache_path, &new_relay_list).await { error!( "{}", error.display_chain_with_msg("Failed to update relay cache on disk") @@ -874,19 +931,17 @@ impl RelayListUpdater { Ok(()) } - fn download_relay_list(&mut self) -> Result<RelayList, Error> { - let download_future = self.rpc_client.relay_list().map_err(Error::Download); - let relay_list = Timer::default() - .timeout(download_future, DOWNLOAD_TIMEOUT) - .wait()?; - - Ok(relay_list) - } - /// Write a `RelayList` to the cache file. - fn cache_relays(&self, relays: &RelayList) -> Result<(), Error> { - debug!("Writing relays cache to {}", self.cache_path.display()); - let file = File::create(&self.cache_path).map_err(Error::WriteRelayCache)?; - serde_json::to_writer_pretty(io::BufWriter::new(file), relays).map_err(Error::Serialize) + async fn cache_relays(cache_path: &Path, relays: &RelayList) -> Result<(), Error> { + debug!("Writing relays cache to {}", cache_path.display()); + let mut file = File::create(cache_path) + .await + .map_err(Error::OpenRelayCache)?; + let bytes = serde_json::to_vec_pretty(relays).map_err(Error::Serialize)?; + let mut slice: &[u8] = bytes.as_slice(); + let _ = tokio02::io::copy(&mut slice, &mut file) + .await + .map_err(Error::WriteRelayCache)?; + Ok(()) } } diff --git a/mullvad-rpc/src/bin/relay_list.rs b/mullvad-rpc/src/bin/relay_list.rs index 71c9c30917..27601d4b21 100644 --- a/mullvad-rpc/src/bin/relay_list.rs +++ b/mullvad-rpc/src/bin/relay_list.rs @@ -1,6 +1,5 @@ /// Intended to be used to pre-load a relay list when creating an installer for the Mullvad VPN /// app. -use futures01::future::Future; use mullvad_rpc::{rest::Error as RestError, MullvadRpcRuntime, RelayListProxy}; use std::process; use talpid_types::ErrorExt; @@ -10,7 +9,7 @@ fn main() { let relay_list_request = RelayListProxy::new(runtime.mullvad_rest_handle()).relay_list(); - let relay_list = match relay_list_request.wait() { + let relay_list = match runtime.runtime().block_on(relay_list_request) { Ok(relay_list) => relay_list, Err(RestError::TimeoutError(_)) => { eprintln!("Request timed out"); diff --git a/mullvad-rpc/src/relay_list.rs b/mullvad-rpc/src/relay_list.rs index 5e98d4f2c3..b6784d3fda 100644 --- a/mullvad-rpc/src/relay_list.rs +++ b/mullvad-rpc/src/relay_list.rs @@ -7,11 +7,13 @@ use talpid_types::net::wireguard; use std::{ collections::BTreeMap, + future::Future, net::{Ipv4Addr, Ipv6Addr}, time::Duration, }; /// Fetches relay list from https://api.mullvad.net/v1/relays +#[derive(Clone)] pub struct RelayListProxy { handle: rest::MullvadRestHandle, } @@ -25,9 +27,7 @@ impl RelayListProxy { } /// Fetch the relay list - pub fn relay_list( - &self, - ) -> impl futures01::future::Future<Item = relay_list::RelayList, Error = rest::Error> { + pub fn relay_list(&self) -> impl Future<Output = Result<relay_list::RelayList, rest::Error>> { let service = self.handle.service.clone(); let request = self.handle.factory.request("/v1/relays", Method::GET); @@ -43,7 +43,7 @@ impl RelayListProxy { .await? .into_relay_list()) }; - self.handle.service.compat_spawn(future) + future } } |
