summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--mullvad-daemon/src/lib.rs29
-rw-r--r--mullvad-daemon/src/version_check.rs81
-rw-r--r--talpid-core/src/future_retry.rs2
3 files changed, 92 insertions, 20 deletions
diff --git a/mullvad-daemon/src/lib.rs b/mullvad-daemon/src/lib.rs
index 818c75b908..19692d57fa 100644
--- a/mullvad-daemon/src/lib.rs
+++ b/mullvad-daemon/src/lib.rs
@@ -1659,14 +1659,29 @@ where
if self.app_version_info.is_none() {
log::debug!("No version cache found. Fetching new info");
let mut handle = self.version_updater_handle.clone();
- handle.run_version_check().await;
+ tokio::spawn(async move {
+ Self::oneshot_send(
+ tx,
+ handle
+ .run_version_check()
+ .await
+ .map_err(|error| {
+ log::error!(
+ "{}",
+ error.display_chain_with_msg("Error running version check")
+ )
+ })
+ .ok(),
+ "get_version_info response",
+ );
+ });
+ } else {
+ Self::oneshot_send(
+ tx,
+ self.app_version_info.clone(),
+ "get_version_info response",
+ );
}
-
- Self::oneshot_send(
- tx,
- self.app_version_info.clone(),
- "get_version_info response",
- );
}
fn on_get_current_version(&mut self, tx: oneshot::Sender<AppVersion>) {
diff --git a/mullvad-daemon/src/version_check.rs b/mullvad-daemon/src/version_check.rs
index b06466bb1e..44259eb1c6 100644
--- a/mullvad-daemon/src/version_check.rs
+++ b/mullvad-daemon/src/version_check.rs
@@ -2,7 +2,11 @@ use crate::{
version::{is_beta_version, PRODUCT_VERSION},
DaemonEventSender,
};
-use futures::{channel::mpsc, stream::FusedStream, FutureExt, SinkExt, StreamExt, TryFutureExt};
+use futures::{
+ channel::{mpsc, oneshot},
+ stream::FusedStream,
+ FutureExt, SinkExt, StreamExt, TryFutureExt,
+};
use mullvad_rpc::{availability::ApiAvailabilityHandle, rest::MullvadRestHandle, AppVersionProxy};
use mullvad_types::version::{AppVersionInfo, ParsedAppVersion};
use serde::{Deserialize, Serialize};
@@ -33,6 +37,9 @@ const UPDATE_CHECK_INTERVAL: Duration = Duration::from_secs(60 * 5);
const UPDATE_INTERVAL: Duration = Duration::from_secs(60 * 60 * 24);
/// Wait this long until next try if an update failed
const UPDATE_INTERVAL_ERROR: Duration = Duration::from_secs(60 * 60 * 6);
+/// Retry interval for `RunVersionCheck`.
+const IMMEDIATE_UPDATE_INTERVAL_ERROR: Duration = Duration::from_secs(1);
+const IMMEDIATE_UPDATE_MAX_RETRIES: usize = 2;
#[cfg(target_os = "linux")]
const PLATFORM: &str = "linux";
@@ -83,6 +90,12 @@ pub enum Error {
#[error(display = "Clearing version check cache due to a version mismatch")]
CacheVersionMismatch,
+
+ #[error(display = "Version updater is down")]
+ VersionUpdaterDown,
+
+ #[error(display = "Version cache update was aborted")]
+ UpdateAborted,
}
@@ -96,6 +109,7 @@ pub(crate) struct VersionUpdater {
show_beta_releases: bool,
rx: Option<mpsc::Receiver<VersionUpdaterCommand>>,
availability_handle: ApiAvailabilityHandle,
+ internal_done_tx: Option<oneshot::Sender<AppVersionInfo>>,
}
#[derive(Clone)]
@@ -105,7 +119,7 @@ pub(crate) struct VersionUpdaterHandle {
enum VersionUpdaterCommand {
SetShowBetaReleases(bool),
- RunVersionCheck,
+ RunVersionCheck(oneshot::Sender<AppVersionInfo>),
}
impl VersionUpdaterHandle {
@@ -122,14 +136,17 @@ impl VersionUpdaterHandle {
}
}
- pub async fn run_version_check(&mut self) {
+ pub async fn run_version_check(&mut self) -> Result<AppVersionInfo, Error> {
+ let (done_tx, done_rx) = oneshot::channel();
if self
.tx
- .send(VersionUpdaterCommand::RunVersionCheck)
+ .send(VersionUpdaterCommand::RunVersionCheck(done_tx))
.await
.is_err()
{
- log::error!("Version updater already down");
+ Err(Error::VersionUpdaterDown)
+ } else {
+ done_rx.await.map_err(|_| Error::UpdateAborted)
}
}
}
@@ -160,14 +177,45 @@ impl VersionUpdater {
show_beta_releases,
rx: Some(rx),
availability_handle,
+ internal_done_tx: None,
},
VersionUpdaterHandle { tx },
)
}
fn create_update_future(
+ &mut self,
+ done_tx: oneshot::Sender<AppVersionInfo>,
+ ) -> std::pin::Pin<
+ Box<dyn Future<Output = Result<mullvad_rpc::AppVersionResponse, Error>> + Send + 'static>,
+ > {
+ self.internal_done_tx = Some(done_tx);
+
+ let api_handle = self.availability_handle.clone();
+ let version_proxy = self.version_proxy.clone();
+ let platform_version = self.platform_version.clone();
+ let download_future_factory = move || {
+ version_proxy
+ .version_check(
+ PRODUCT_VERSION.to_owned(),
+ PLATFORM,
+ platform_version.clone(),
+ )
+ .map_err(Error::Download)
+ };
+
+ Box::pin(talpid_core::future_retry::retry_future_with_backoff(
+ download_future_factory,
+ move |result| result.is_err() && !api_handle.get_state().is_offline(),
+ std::iter::repeat(IMMEDIATE_UPDATE_INTERVAL_ERROR).take(IMMEDIATE_UPDATE_MAX_RETRIES),
+ ))
+ }
+
+ fn create_update_background_future(
&self,
- ) -> impl Future<Output = Result<mullvad_rpc::AppVersionResponse, Error>> + Send + 'static {
+ ) -> std::pin::Pin<
+ Box<dyn Future<Output = Result<mullvad_rpc::AppVersionResponse, Error>> + Send + 'static>,
+ > {
let api_handle = self.availability_handle.clone();
let version_proxy = self.version_proxy.clone();
let platform_version = self.platform_version.clone();
@@ -184,11 +232,9 @@ impl VersionUpdater {
}
};
- let should_retry = |result: &Result<_, _>| -> bool { result.is_err() };
-
Box::pin(talpid_core::future_retry::retry_future_with_backoff(
download_future_factory,
- should_retry,
+ |result| result.is_err(),
std::iter::repeat(UPDATE_INTERVAL_ERROR),
))
}
@@ -267,6 +313,10 @@ impl VersionUpdater {
}
async fn update_version_info(&mut self, new_version_info: AppVersionInfo) {
+ if let Some(done_tx) = self.internal_done_tx.take() {
+ let _ = done_tx.send(new_version_info.clone());
+ }
+
// if daemon can't be reached, return immediately
if self.update_sender.send(new_version_info.clone()).is_err() {
return;
@@ -317,11 +367,11 @@ impl VersionUpdater {
}).await;
}
}
- Some(VersionUpdaterCommand::RunVersionCheck) => {
+ Some(VersionUpdaterCommand::RunVersionCheck(done_tx)) => {
if self.update_sender.is_closed() {
return;
}
- let download_future = self.create_update_future().fuse();
+ let download_future = self.create_update_future(done_tx).fuse();
version_check = download_future;
}
// time to shut down
@@ -335,9 +385,13 @@ impl VersionUpdater {
if rx.is_terminated() || self.update_sender.is_closed() {
return;
}
+ if self.internal_done_tx.is_some() {
+ // Sync check in progress
+ continue;
+ }
if Instant::now() > self.next_update_time {
- let download_future = self.create_update_future().fuse();
+ let download_future = self.create_update_background_future().fuse();
version_check = download_future;
} else {
check_delay = next_delay();
@@ -358,7 +412,8 @@ impl VersionUpdater {
self.update_version_info(new_version_info).await;
},
Err(err) => {
- log::error!("Failed to get fetch version info - {}", err);
+ log::error!("Failed to fetch version info - {}", err);
+ self.internal_done_tx = None;
},
}
diff --git a/talpid-core/src/future_retry.rs b/talpid-core/src/future_retry.rs
index 6e7c05fc35..0e494cc96c 100644
--- a/talpid-core/src/future_retry.rs
+++ b/talpid-core/src/future_retry.rs
@@ -22,6 +22,8 @@ pub async fn retry_future_with_backoff<
if should_retry(&current_result) {
if let Some(delay) = delays.next() {
sleep(delay).await;
+ } else {
+ return current_result;
}
} else {
return current_result;