diff options
| author | David Lönnhager <david.l@mullvad.net> | 2022-04-11 18:24:25 +0200 |
|---|---|---|
| committer | David Lönnhager <david.l@mullvad.net> | 2022-04-11 18:24:25 +0200 |
| commit | adbbf8c48b2e9326bca2508d72da2b13aa2636b8 (patch) | |
| tree | 60bd266695902efeb7dd908655a4acf853831b7a | |
| parent | 744fb04aa9e7180f37c00d63cd8d068fb04882b1 (diff) | |
| parent | 4e64a6d284828138ffe87aa54755995351658b8a (diff) | |
| download | mullvadvpn-adbbf8c48b2e9326bca2508d72da2b13aa2636b8.tar.xz mullvadvpn-adbbf8c48b2e9326bca2508d72da2b13aa2636b8.zip | |
Merge branch 'fix-long-running-timers'
| -rw-r--r-- | CHANGELOG.md | 1 | ||||
| -rw-r--r-- | Cargo.lock | 11 | ||||
| -rw-r--r-- | Cargo.toml | 1 | ||||
| -rw-r--r-- | mullvad-api/Cargo.toml | 1 | ||||
| -rw-r--r-- | mullvad-api/src/rest.rs | 74 | ||||
| -rw-r--r-- | mullvad-daemon/Cargo.toml | 1 | ||||
| -rw-r--r-- | mullvad-daemon/src/relays/updater.rs | 39 | ||||
| -rw-r--r-- | mullvad-daemon/src/version_check.rs | 22 | ||||
| -rw-r--r-- | talpid-core/Cargo.toml | 1 | ||||
| -rw-r--r-- | talpid-core/src/future_retry.rs | 22 | ||||
| -rw-r--r-- | talpid-time/Cargo.toml | 12 | ||||
| -rw-r--r-- | talpid-time/src/lib.rs | 53 | ||||
| -rw-r--r-- | talpid-time/src/unix.rs | 58 |
13 files changed, 194 insertions, 102 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md index 541c335243..cce48a610d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -56,6 +56,7 @@ Line wrap the file at 100 chars. Th - Fix issue where sockets didn't close after disconnecting from WireGuard servers over TCP by updating `udp-over-tcp` to 0.2. - Parse old account history formats correctly when they are empty. +- Use suspend-aware timers for relay list updates and version checks on all platforms. #### Windows - Fix "Open Mullvad VPN" tray context menu item not working after toggling unpinned window setting. diff --git a/Cargo.lock b/Cargo.lock index 3505f87adb..cfdf50934d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1658,6 +1658,7 @@ dependencies = [ "serde", "serde_json", "shadowsocks", + "talpid-time", "talpid-types", "tokio", "tokio-rustls", @@ -1722,6 +1723,7 @@ dependencies = [ "simple-signal", "talpid-core", "talpid-platform-metadata", + "talpid-time", "talpid-types", "tokio", "tokio-stream", @@ -3253,6 +3255,7 @@ dependencies = [ "system-configuration", "talpid-dbus", "talpid-platform-metadata", + "talpid-time", "talpid-types", "tempfile", "tokio", @@ -3312,6 +3315,14 @@ dependencies = [ ] [[package]] +name = "talpid-time" +version = "0.1.0" +dependencies = [ + "libc", + "tokio", +] + +[[package]] name = "talpid-types" version = "0.1.0" dependencies = [ diff --git a/Cargo.toml b/Cargo.toml index d5fff68b4b..f40610b778 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,6 +15,7 @@ members = [ "talpid-core", "talpid-dbus", "talpid-platform-metadata", + "talpid-time", "mullvad-management-interface", "tunnel-obfuscation", ] diff --git a/mullvad-api/Cargo.toml b/mullvad-api/Cargo.toml index cf3c3debd1..074d67c7c1 100644 --- a/mullvad-api/Cargo.toml +++ b/mullvad-api/Cargo.toml @@ -33,6 +33,7 @@ lazy_static = "1.1.0" mullvad-types = { path = "../mullvad-types" } talpid-types = { path = "../talpid-types" } +talpid-time = { path = "../talpid-time" } shadowsocks = { git = "https://github.com/shadowsocks/shadowsocks-rust", rev = "7388ddfb7d36d5b84908c476daabc91c8b065a37", default-features = false, features = ["stream-cipher"] } diff --git a/mullvad-api/src/rest.rs b/mullvad-api/src/rest.rs index 292bcd0cbb..c3f48b3e8c 100644 --- a/mullvad-api/src/rest.rs +++ b/mullvad-api/src/rest.rs @@ -22,7 +22,7 @@ use std::{ future::Future, str::FromStr, sync::{Arc, Weak}, - time::{Duration, Instant}, + time::Duration, }; use talpid_types::ErrorExt; @@ -33,8 +33,7 @@ pub type Response = hyper::Response<hyper::Body>; const USER_AGENT: &str = "mullvad-app"; -const TIMER_CHECK_INTERVAL: Duration = Duration::from_secs(60); -const API_IP_CHECK_DELAY: Duration = Duration::from_secs(15 * 60); +const API_IP_CHECK_INITIAL: Duration = Duration::from_secs(15 * 60); const API_IP_CHECK_INTERVAL: Duration = Duration::from_secs(24 * 60 * 60); const API_IP_CHECK_ERROR_INTERVAL: Duration = Duration::from_secs(15 * 60); @@ -625,50 +624,41 @@ impl MullvadRestHandle { let availability = self.availability.clone(); tokio::spawn(async move { - // always start the fetch after 15 minutes let api_proxy = crate::ApiProxy::new(handle); - let mut next_check = Instant::now() + API_IP_CHECK_DELAY; - - let next_error_check = || Instant::now() + API_IP_CHECK_ERROR_INTERVAL; - let next_regular_check = || Instant::now() + API_IP_CHECK_INTERVAL; - - let mut interval = tokio::time::interval_at(next_check.into(), TIMER_CHECK_INTERVAL); + let mut next_delay = API_IP_CHECK_INITIAL; loop { - interval.tick().await; - if next_check < Instant::now() { - if let Err(error) = availability.wait_background().await { - log::error!("Failed while waiting for API: {}", error); - next_check = next_error_check(); - continue; - } - match api_proxy.clone().get_api_addrs().await { - Ok(new_addrs) => { - if let Some(addr) = new_addrs.get(0) { - log::debug!( - "Fetched new API address {:?}. Fetching again in {} hours", - addr, - API_IP_CHECK_INTERVAL.as_secs() / (60 * 60) - ); - if let Err(err) = address_cache.set_address(*addr).await { - log::error!( - "Failed to save newly updated API address: {}", - err - ); - } - } else { - log::error!("API returned no API addresses"); - } - next_check = next_regular_check(); - } - Err(err) => { - log::error!( - "Failed to fetch new API addresses: {}. Retrying in {} seconds", - err, - API_IP_CHECK_ERROR_INTERVAL.as_secs() + talpid_time::sleep(next_delay).await; + + if let Err(error) = availability.wait_background().await { + log::error!("Failed while waiting for API: {}", error); + continue; + } + match api_proxy.clone().get_api_addrs().await { + Ok(new_addrs) => { + if let Some(addr) = new_addrs.get(0) { + log::debug!( + "Fetched new API address {:?}. Fetching again in {} hours", + addr, + API_IP_CHECK_INTERVAL.as_secs() / (60 * 60) ); - next_check = next_error_check(); + if let Err(err) = address_cache.set_address(*addr).await { + log::error!("Failed to save newly updated API address: {}", err); + } + } else { + log::error!("API returned no API addresses"); } + + next_delay = API_IP_CHECK_INTERVAL; + } + Err(err) => { + log::error!( + "Failed to fetch new API addresses: {}. Retrying in {} seconds", + err, + API_IP_CHECK_ERROR_INTERVAL.as_secs() + ); + + next_delay = API_IP_CHECK_ERROR_INTERVAL; } } } diff --git a/mullvad-daemon/Cargo.toml b/mullvad-daemon/Cargo.toml index b2905f9aea..f970250385 100644 --- a/mullvad-daemon/Cargo.toml +++ b/mullvad-daemon/Cargo.toml @@ -35,6 +35,7 @@ mullvad-api = { path = "../mullvad-api" } talpid-core = { path = "../talpid-core" } talpid-types = { path = "../talpid-types" } talpid-platform-metadata = { path = "../talpid-platform-metadata" } +talpid-time = { path = "../talpid-time" } [target.'cfg(not(target_os="android"))'.dependencies] mullvad-management-interface = { path = "../mullvad-management-interface" } diff --git a/mullvad-daemon/src/relays/updater.rs b/mullvad-daemon/src/relays/updater.rs index ae55a998f6..35f431125a 100644 --- a/mullvad-daemon/src/relays/updater.rs +++ b/mullvad-daemon/src/relays/updater.rs @@ -10,7 +10,7 @@ use parking_lot::Mutex; use std::{ path::{Path, PathBuf}, sync::Arc, - time::{Duration, Instant, SystemTime}, + time::{Duration, SystemTime, UNIX_EPOCH}, }; use talpid_core::future_retry::{retry_future, ExponentialBackoff, Jittered}; use talpid_types::ErrorExt; @@ -45,7 +45,7 @@ pub struct RelayListUpdater { cache_path: PathBuf, parsed_relays: Arc<Mutex<ParsedRelays>>, on_update: Box<dyn Fn(&RelayList) + Send + 'static>, - earliest_next_try: Instant, + last_check: SystemTime, api_availability: ApiAvailabilityHandle, } @@ -64,7 +64,7 @@ impl RelayListUpdater { cache_path, parsed_relays, on_update, - earliest_next_try: Instant::now() + UPDATE_INTERVAL, + last_check: UNIX_EPOCH, api_availability, }; @@ -74,20 +74,17 @@ impl RelayListUpdater { } async fn run(mut self, mut cmd_rx: mpsc::Receiver<()>) { - let mut check_interval = tokio::time::interval_at( - (Instant::now() + UPDATE_CHECK_INTERVAL).into(), - UPDATE_CHECK_INTERVAL, - ); - check_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); - let mut ticker = tokio_stream::wrappers::IntervalStream::new(check_interval).fuse(); let mut download_future = Box::pin(Fuse::terminated()); loop { + let next_check = tokio::time::sleep(UPDATE_CHECK_INTERVAL).fuse(); + tokio::pin!(next_check); + futures::select! { - _check_update = ticker.select_next_some() => { + _check_update = next_check => { if download_future.is_terminated() && self.should_update() { let tag = self.parsed_relays.lock().tag().map(|tag| tag.to_string()); download_future = Box::pin(Self::download_relay_list(self.api_availability.clone(), self.api_client.clone(), tag).fuse()); - self.earliest_next_try = Instant::now() + UPDATE_INTERVAL; + self.last_check = SystemTime::now(); } }, @@ -100,6 +97,7 @@ impl RelayListUpdater { Some(()) => { let tag = self.parsed_relays.lock().tag().map(|tag| tag.to_string()); download_future = Box::pin(Self::download_relay_list(self.api_availability.clone(), self.api_client.clone(), tag).fuse()); + self.last_check = SystemTime::now(); }, None => { log::trace!("Relay list updater shutting down"); @@ -123,23 +121,18 @@ impl RelayListUpdater { } } Ok(None) => log::debug!("Relay list is up-to-date"), - Err(err) => { - log::error!( - "Failed to fetch new relay list: {}. Will retry in {} minutes", - err, - self.earliest_next_try - .saturating_duration_since(Instant::now()) - .as_secs() - / 60 - ); - } + Err(error) => log::error!( + "{}", + error.display_chain_with_msg("Failed to fetch new relay list") + ), } } /// Returns true if the current parsed_relays is older than UPDATE_INTERVAL fn should_update(&mut self) -> bool { - match SystemTime::now().duration_since(self.parsed_relays.lock().last_updated()) { - Ok(duration) => duration > UPDATE_INTERVAL && self.earliest_next_try <= Instant::now(), + let last_check = std::cmp::max(self.parsed_relays.lock().last_updated(), self.last_check); + match SystemTime::now().duration_since(last_check) { + Ok(duration) => duration >= UPDATE_INTERVAL, // If the clock is skewed we have no idea by how much or when the last update // actually was, better download again to get in sync and get a `last_updated` // timestamp corresponding to the new time. diff --git a/mullvad-daemon/src/version_check.rs b/mullvad-daemon/src/version_check.rs index 13792383de..ffa5abc979 100644 --- a/mullvad-daemon/src/version_check.rs +++ b/mullvad-daemon/src/version_check.rs @@ -14,7 +14,7 @@ use std::{ future::Future, io, path::{Path, PathBuf}, - time::{Duration, Instant}, + time::Duration, }; use talpid_core::mpsc::Sender; use talpid_types::ErrorExt; @@ -28,11 +28,7 @@ lazy_static::lazy_static! { } const DOWNLOAD_TIMEOUT: Duration = Duration::from_secs(15); -/// How often the updater should wake up to check the in-memory cache. -/// This exist to prevent problems around sleeping. If you set it to sleep -/// for `UPDATE_INTERVAL` directly and the computer is suspended, that clock -/// won't tick, and the next update will be after 24 hours of the computer being *on*. -const UPDATE_CHECK_INTERVAL: Duration = Duration::from_secs(60 * 5); + /// Wait this long until next check after a successful check const UPDATE_INTERVAL: Duration = Duration::from_secs(60 * 60 * 24); /// Wait this long until next try if an update failed @@ -103,7 +99,6 @@ pub(crate) struct VersionUpdater { update_sender: DaemonEventSender<AppVersionInfo>, last_app_version_info: Option<AppVersionInfo>, platform_version: String, - next_update_time: Instant, show_beta_releases: bool, rx: Option<mpsc::Receiver<VersionUpdaterCommand>>, availability_handle: ApiAvailabilityHandle, @@ -171,7 +166,6 @@ impl VersionUpdater { update_sender, last_app_version_info, platform_version, - next_update_time: Instant::now(), show_beta_releases, rx: Some(rx), availability_handle, @@ -341,7 +335,7 @@ impl VersionUpdater { pub async fn run(mut self) { let mut rx = self.rx.take().unwrap().fuse(); - let next_delay = || Box::pin(tokio::time::sleep(UPDATE_CHECK_INTERVAL)).fuse(); + let next_delay = || Box::pin(talpid_time::sleep(UPDATE_INTERVAL)).fuse(); let mut check_delay = next_delay(); let mut version_check = futures::future::Fuse::terminated(); @@ -400,21 +394,13 @@ impl VersionUpdater { // Sync check in progress continue; } - - if Instant::now() > self.next_update_time { - let download_future = self.create_update_background_future().fuse(); - version_check = download_future; - } else { - check_delay = next_delay(); - } - + version_check = self.create_update_background_future().fuse(); }, response = version_check => { if rx.is_terminated() || self.update_sender.is_closed() { return; } - self.next_update_time = Instant::now() + UPDATE_INTERVAL; match response { Ok(version_info_response) => { diff --git a/talpid-core/Cargo.toml b/talpid-core/Cargo.toml index e1ea19e3cf..d9b6ddd5d9 100644 --- a/talpid-core/Cargo.toml +++ b/talpid-core/Cargo.toml @@ -25,6 +25,7 @@ parking_lot = "0.11" regex = "1.1.0" shell-escape = "0.1" talpid-types = { path = "../talpid-types" } +talpid-time = { path = "../talpid-time" } uuid = { version = "0.8", features = ["v4"] } zeroize = "1" chrono = "0.4.19" diff --git a/talpid-core/src/future_retry.rs b/talpid-core/src/future_retry.rs index 37937e173a..604e4513e1 100644 --- a/talpid-core/src/future_retry.rs +++ b/talpid-core/src/future_retry.rs @@ -1,9 +1,6 @@ use rand::{distributions::OpenClosed01, Rng}; use std::{future::Future, time::Duration}; - -/// Since timers often exhibit weird behavior if they are running for too long, a workaround is -/// required - run a timer for 60 seconds until a delay is shorter than 5 minutes. -const MAX_SINGLE_DELAY: Duration = Duration::from_secs(5 * 60); +use talpid_time::sleep; /// Convenience function that works like [`retry_future`] but limits the number /// of retries to `max_retries`. @@ -52,15 +49,6 @@ pub fn constant_interval(interval: Duration) -> impl Iterator<Item = Duration> { std::iter::repeat(interval) } -async fn sleep(mut delay: Duration) { - while delay > MAX_SINGLE_DELAY { - delay -= MAX_SINGLE_DELAY; - tokio::time::sleep(MAX_SINGLE_DELAY).await; - } - - tokio::time::sleep(delay).await; -} - /// Provides an exponential back-off timer to delay the next retry of a failed operation. pub struct ExponentialBackoff { next: Duration, @@ -206,6 +194,8 @@ mod test { assert!(jittered_duration <= unjittered_duration); } + // NOTE: The test is disabled because the clock does not advance. + #[ignore] #[tokio::test] async fn test_exponential_backoff_delay() { let retry_interval_initial = Duration::from_secs(4); @@ -222,10 +212,4 @@ mod test { ) .await; } - - #[tokio::test] - async fn test_timer_advancement() { - tokio::time::pause(); - sleep(Duration::from_secs(60 * 60)).await - } } diff --git a/talpid-time/Cargo.toml b/talpid-time/Cargo.toml new file mode 100644 index 0000000000..754007e84a --- /dev/null +++ b/talpid-time/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "talpid-time" +version = "0.1.0" +authors = ["Mullvad VPN"] +description = "Time functions" +license = "GPL-3.0" +edition = "2021" +publish = false + +[dependencies] +tokio = { version = "1.8", features = ["time"] } +libc = "0.2" diff --git a/talpid-time/src/lib.rs b/talpid-time/src/lib.rs new file mode 100644 index 0000000000..3b536eda54 --- /dev/null +++ b/talpid-time/src/lib.rs @@ -0,0 +1,53 @@ +use std::time::Duration; + +#[cfg(target_os = "windows")] +mod inner { + pub use std::time::Instant; +} + +#[cfg(unix)] +#[path = "unix.rs"] +mod inner; + +const MAX_SLEEP_INTERVAL: Duration = Duration::from_secs(60); + +/// Represents a measurement of a monotonic clock. +/// Unlike [std::time::Instant], the difference between two +/// instances is guaranteed to include time spent in system +/// sleep. +#[derive(Clone, Copy)] +pub struct Instant { + t: inner::Instant, +} + +impl Instant { + pub fn now() -> Self { + Self { + t: inner::Instant::now(), + } + } + + pub fn duration_since(&self, earlier: Instant) -> Duration { + self.t.duration_since(earlier.t) + } +} + +/// Waits for the specified interval while taking into account system sleep or suspension. +/// The accuracy is to within about one minute. +pub async fn sleep(duration: Duration) { + let started = Instant::now(); + + loop { + let elapsed = Instant::now().duration_since(started); + + if elapsed >= duration { + return; + } + + tokio::time::sleep(std::cmp::min( + MAX_SLEEP_INTERVAL, + duration.saturating_sub(elapsed), + )) + .await; + } +} diff --git a/talpid-time/src/unix.rs b/talpid-time/src/unix.rs new file mode 100644 index 0000000000..b9721dd27a --- /dev/null +++ b/talpid-time/src/unix.rs @@ -0,0 +1,58 @@ +use libc::{clock_gettime, clockid_t, timespec}; +use std::{mem::MaybeUninit, time::Duration}; + +const NSEC_PER_SEC: i64 = 1000000000; + +#[cfg(target_os = "macos")] +const CLOCK_ID: clockid_t = libc::CLOCK_MONOTONIC; + +#[cfg(any(target_os = "linux", target_os = "android"))] +const CLOCK_ID: clockid_t = libc::CLOCK_BOOTTIME; + +#[derive(Clone, Copy)] +pub struct Instant { + t: timespec, +} + +impl Instant { + pub fn now() -> Self { + Self { t: now() } + } + + fn checked_duration_since(&self, earlier: Instant) -> Option<Duration> { + // Assumptions: + // * `tv_sec >= 0` + // * `tv_nsec < NSEC_PER_SEC` + + let (tv_sec, tv_nsec) = if self.t.tv_nsec < earlier.t.tv_nsec { + ( + self.t.tv_sec - earlier.t.tv_sec - 1, + NSEC_PER_SEC - earlier.t.tv_nsec + self.t.tv_nsec, + ) + } else { + ( + self.t.tv_sec - earlier.t.tv_sec, + self.t.tv_nsec - earlier.t.tv_nsec, + ) + }; + + if tv_sec < 0 { + return None; + } + + Some(Duration::new(tv_sec as _, tv_nsec as _)) + } + + pub fn duration_since(&self, earlier: Instant) -> Duration { + self.checked_duration_since(earlier) + .unwrap_or(Duration::ZERO) + } +} + +fn now() -> timespec { + let mut t = MaybeUninit::zeroed(); + unsafe { + clock_gettime(CLOCK_ID, t.as_mut_ptr()); + t.assume_init() + } +} |
