diff options
| author | Sebastian Holmin <sebastian.holmin@mullvad.net> | 2025-04-09 13:47:36 +0200 |
|---|---|---|
| committer | Sebastian Holmin <sebastian.holmin@mullvad.net> | 2025-05-28 13:25:27 +0200 |
| commit | 23e7acba0f8afd4d238df067c836ae649fa80b84 (patch) | |
| tree | bd658b140a9f27066d1d7ae6788d84c3f4af2eef /mullvad-daemon/src/version | |
| parent | c525c77d54f5c449f872094a804d77b7e85bfc55 (diff) | |
| download | mullvadvpn-23e7acba0f8afd4d238df067c836ae649fa80b84.tar.xz mullvadvpn-23e7acba0f8afd4d238df067c836ae649fa80b84.zip | |
Add in app upgrades to the daemon
---------
Co-authored-by: Markus Pettersson <markus.pettersson@mullvad.net>
Diffstat (limited to 'mullvad-daemon/src/version')
| -rw-r--r-- | mullvad-daemon/src/version/check.rs | 2 | ||||
| -rw-r--r-- | mullvad-daemon/src/version/downloader.rs | 215 | ||||
| -rw-r--r-- | mullvad-daemon/src/version/mod.rs | 3 | ||||
| -rw-r--r-- | mullvad-daemon/src/version/router.rs | 604 |
4 files changed, 422 insertions, 402 deletions
diff --git a/mullvad-daemon/src/version/check.rs b/mullvad-daemon/src/version/check.rs index d2221a55d5..e6ebff9920 100644 --- a/mullvad-daemon/src/version/check.rs +++ b/mullvad-daemon/src/version/check.rs @@ -468,7 +468,7 @@ async fn try_load_cache(cache_dir: &Path) -> Result<(VersionCache, SystemTime), let cache: VersionCache = serde_json::from_str(&content).map_err(Error::Deserialize)?; - if cache_is_old(&cache.latest_version, &*APP_VERSION) { + if cache_is_old(&cache.latest_version, &APP_VERSION) { return Err(Error::OutdatedVersion); } diff --git a/mullvad-daemon/src/version/downloader.rs b/mullvad-daemon/src/version/downloader.rs index d1d9126f5b..f7cd2cc5cd 100644 --- a/mullvad-daemon/src/version/downloader.rs +++ b/mullvad-daemon/src/version/downloader.rs @@ -1,107 +1,133 @@ #![cfg(update)] -use futures::channel::{mpsc, oneshot}; -use mullvad_update::app::{AppDownloader, AppDownloaderParameters, HttpAppDownloader}; +use mullvad_types::version::{AppUpgradeDownloadProgress, AppUpgradeError, AppUpgradeEvent}; +use mullvad_update::app::{ + AppDownloader, AppDownloaderParameters, DownloadError, HttpAppDownloader, +}; use rand::seq::SliceRandom; -use std::time::Duration; -use std::{future::Future, path::PathBuf}; +use std::path::PathBuf; +use std::time::{Duration, Instant}; use tokio::fs; +use tokio::sync::broadcast; -type Result<T> = std::result::Result<T, Error>; - -pub struct Downloader(()); - -pub type AbortHandle = oneshot::Sender<()>; - -/// App updater event -pub enum UpdateEvent { - /// Download progress update - Downloading { - /// Server that the app is being downloaded from - server: String, - /// A fraction in `[0,1]` that describes how much of the installer has been downloaded - complete_frac: f32, - /// Estimated time left - time_left: Duration, - }, - /// Download failed due to some error - DownloadFailed, - /// Download completed, so verifying now - Verifying, - /// The verification failed due to some error - VerificationFailed, - /// There is a downloaded and verified installer available - Verified { verified_installer_path: PathBuf }, -} - +#[derive(thiserror::Error, Debug)] pub enum Error { #[error("Failed to get download directory")] GetDownloadDir(#[from] mullvad_paths::Error), #[error("Failed to create download directory")] - CreateDownloadDir(#[source] io::Error), + CreateDownloadDir(#[source] std::io::Error), + + #[error("Failed to download app")] + Download(#[from] DownloadError), + + #[error("Download was cancelled or panicked")] + JoinError(#[from] tokio::task::JoinError), #[error("Could not select URL for app update")] NoUrlFound, } -impl Downloader { - /// Begin or resume download of `version` - pub async fn start( - version: mullvad_update::version::Version, - event_tx: mpsc::UnboundedSender<UpdateEvent>, - ) -> Result<impl Future<Output = ()>> { - let url = select_cdn_url(&version.urls) - .ok_or(Error::NoUrlFound)? - .to_owned(); +type Result<T> = std::result::Result<T, Error>; - let download_dir = mullvad_paths::cache_dir()?.join("mullvad-update"); - fs::create_dir_all(&download_dir) - .await - .map_err(Error::CreateDownloadDir)?; +#[derive(Debug)] +pub struct DownloaderHandle { + /// Handle to the downloader task + task: tokio::task::JoinHandle<std::result::Result<PathBuf, Error>>, + /// Handle to send `AppUpgradeEvent::Aborted` when the downloader is dropped + dropped_tx: Option<broadcast::Sender<AppUpgradeEvent>>, +} - let params = AppDownloaderParameters { - app_version: version.version, - app_url: url.clone(), - app_size: version.size, - app_progress: ProgressUpdater::new(server_from_url(&url), event_tx.clone()), - app_sha256: version.sha256, - cache_dir: download_dir, - }; - let mut downloader = HttpAppDownloader::from(params); +impl Drop for DownloaderHandle { + fn drop(&mut self) { + self.task.abort(); + if let Some(dropped_tx) = self.dropped_tx.take() { + // If the downloader is dropped, send an event to notify that it was aborted + let _ = dropped_tx.send(AppUpgradeEvent::Aborted); + } + } +} - Ok(async move { - if let Err(_error) = downloader.download_executable().await { - let _ = event_tx.unbounded_send(UpdateEvent::DownloadFailed); - return; - } +impl DownloaderHandle { + /// Wait for the downloader to finish + pub async fn wait(&mut self) -> Result<PathBuf> { + let path = (&mut self.task).await?; + self.dropped_tx = None; // Prevent sending the aborted event after successful download + path + } +} - let _ = event_tx.unbounded_send(UpdateEvent::Verifying); +pub fn spawn_downloader( + version: mullvad_update::version::Version, + event_tx: broadcast::Sender<AppUpgradeEvent>, +) -> DownloaderHandle { + DownloaderHandle { + task: tokio::spawn(start(version, event_tx.clone())), + dropped_tx: Some(event_tx), + } +} - if let Err(_error) = downloader.verify().await { - let _ = event_tx.unbounded_send(UpdateEvent::VerificationFailed); - return; - } +/// Begin or resume download of `version` +async fn start( + version: mullvad_update::version::Version, + event_tx: broadcast::Sender<AppUpgradeEvent>, +) -> Result<PathBuf> { + let url = select_cdn_url(&version.urls) + .ok_or(Error::NoUrlFound)? + .to_owned(); - let _ = event_tx.unbounded_send(UpdateEvent::Verified { - verified_installer_path: downloader.bin_path(), - }); - }) - } + log::info!("Downloading app version '{}' from {url}", version.version); + + let download_dir = mullvad_paths::cache_dir()?.join("mullvad-update"); + log::trace!("Download directory: {download_dir:?}"); + fs::create_dir_all(&download_dir) + .await + .map_err(Error::CreateDownloadDir)?; + + let params = AppDownloaderParameters { + app_version: version.version, + app_url: url.clone(), + app_size: version.size, + app_progress: ProgressUpdater::new(server_from_url(&url), event_tx.clone()), + app_sha256: version.sha256, + cache_dir: download_dir, + }; + let mut downloader = HttpAppDownloader::from(params); + + if let Err(download_err) = downloader.download_executable().await { + log::error!("Failed to download app: {download_err}"); + let _ = event_tx.send(AppUpgradeEvent::Error(AppUpgradeError::DownloadFailed)); + return Err(download_err.into()); + }; + + let _ = event_tx.send(AppUpgradeEvent::VerifyingInstaller); + + if let Err(verify_err) = downloader.verify().await { + log::error!("Failed to verify downloaded app: {verify_err}"); + let _ = event_tx.send(AppUpgradeEvent::Error(AppUpgradeError::VerificationFailed)); + return Err(verify_err.into()); + }; + + let _ = event_tx.send(AppUpgradeEvent::VerifiedInstaller); + Ok(downloader.bin_path()) } struct ProgressUpdater { server: String, - event_tx: mpsc::UnboundedSender<UpdateEvent>, + event_tx: broadcast::Sender<AppUpgradeEvent>, complete_frac: f32, + start_time: Instant, + complete_frac_at_start: Option<f32>, } impl ProgressUpdater { - fn new(server: String, event_tx: mpsc::UnboundedSender<UpdateEvent>) -> Self { + fn new(server: String, event_tx: broadcast::Sender<AppUpgradeEvent>) -> Self { Self { server, event_tx, complete_frac: 0., + start_time: Instant::now(), + complete_frac_at_start: None, } } } @@ -115,29 +141,52 @@ impl mullvad_update::fetch::ProgressUpdater for ProgressUpdater { if (self.complete_frac - fraction_complete).abs() < 0.01 { return; } + let complete_frac_at_start = self.complete_frac_at_start.get_or_insert(fraction_complete); self.complete_frac = fraction_complete; - let _ = self.event_tx.unbounded_send(UpdateEvent::Downloading { - server: self.server.clone(), - complete_frac: fraction_complete, - // TODO: estimate time left based on how much was downloaded (maybe in last n seconds) - time_left: Duration::ZERO, - }); + let _ = self.event_tx.send(AppUpgradeEvent::DownloadProgress( + AppUpgradeDownloadProgress { + server: self.server.clone(), + progress: (fraction_complete * 100.0) as u32, + time_left: estimate_time_left( + self.start_time, + fraction_complete, + *complete_frac_at_start, + ), + }, + )); } fn clear_progress(&mut self) { self.complete_frac = 0.; - let _ = self.event_tx.unbounded_send(UpdateEvent::Downloading { - server: self.server.clone(), - complete_frac: 0., - // TODO: Check if this is reasonable - time_left: Duration::ZERO, - }); + let _ = self.event_tx.send(AppUpgradeEvent::DownloadProgress( + AppUpgradeDownloadProgress { + server: self.server.clone(), + progress: 0, + time_left: None, + }, + )); } } +fn estimate_time_left( + start_time: Instant, + fraction_complete: f32, + complete_frac_at_start: f32, +) -> Option<Duration> { + let completed_frac_since_start = fraction_complete - complete_frac_at_start; + // Don't estimate time left if the progress is less than 1%, to avoid division numerical instability + if completed_frac_since_start <= 0.01 { + return None; + } + let remaining_frac = 1.0 - fraction_complete; + + let elapsed = start_time.elapsed(); + Some(elapsed.mul_f32(remaining_frac / completed_frac_since_start)) +} + /// Select a mirror to download from /// Currently, the selection is random fn select_cdn_url(urls: &[String]) -> Option<&str> { diff --git a/mullvad-daemon/src/version/mod.rs b/mullvad-daemon/src/version/mod.rs index 7c769ceb99..e83082ae64 100644 --- a/mullvad-daemon/src/version/mod.rs +++ b/mullvad-daemon/src/version/mod.rs @@ -38,9 +38,6 @@ pub enum Error { #[error("Version cache update was aborted")] UpdateAborted, - - #[cfg(update)] - Update(#[transparent] downloader::Error), } /// Contains the date of the git commit this was built from diff --git a/mullvad-daemon/src/version/router.rs b/mullvad-daemon/src/version/router.rs index 7e62c0e2fb..6e9eec9f45 100644 --- a/mullvad-daemon/src/version/router.rs +++ b/mullvad-daemon/src/version/router.rs @@ -11,6 +11,7 @@ use mullvad_types::version::{AppVersionInfo, SuggestedUpgrade}; use mullvad_update::version::VersionInfo; use talpid_core::mpsc::Sender; +use crate::management_interface::AppUpgradeBroadcast; use crate::DaemonEventSender; use super::{ @@ -20,9 +21,6 @@ use super::{ #[cfg(update)] use super::downloader; -#[cfg(update)] -use mullvad_types::version::AppUpgradeEvent; -#[cfg(update)] use std::mem; type Result<T> = std::result::Result<T, Error>; @@ -41,7 +39,7 @@ impl VersionRouterHandle { result_rx.await.map_err(|_| Error::VersionRouterClosed) } - pub async fn get_latest_version(&self) -> Result<mullvad_types::version::AppVersionInfo> { + pub async fn get_latest_version(&self) -> Result<AppVersionInfo> { let (result_tx, result_rx) = oneshot::channel(); self.tx .send(Message::GetLatestVersion(result_tx)) @@ -66,17 +64,6 @@ impl VersionRouterHandle { .map_err(|_| Error::VersionRouterClosed)?; result_rx.await.map_err(|_| Error::VersionRouterClosed) } - - #[cfg(update)] - pub fn new_upgrade_event_listener( - &self, - ) -> Result<mpsc::UnboundedReceiver<mullvad_types::version::AppUpgradeEvent>> { - let (event_tx, event_rx) = mpsc::unbounded(); - self.tx - .send(Message::NewUpgradeEventListener { event_tx }) - .map_err(|_| Error::VersionRouterClosed)?; - Ok(event_rx) - } } /// Router of version updates and update requests. @@ -87,9 +74,9 @@ impl VersionRouterHandle { /// in case new version info is received while the update is in progress. pub struct VersionRouter { rx: mpsc::UnboundedReceiver<Message>, - state: RoutingState, + state: State, beta_program: bool, - version_event_sender: DaemonEventSender<mullvad_types::version::AppVersionInfo>, + version_event_sender: DaemonEventSender<AppVersionInfo>, /// Version updater version_check: check::VersionUpdaterHandle, /// Channel used to receive updates from `version_check` @@ -97,31 +84,11 @@ pub struct VersionRouter { /// Future that resolves when `get_latest_version` resolves version_request: Fuse<Pin<Box<dyn Future<Output = Result<VersionCache>> + Send>>>, /// Channels that receive responses to `get_latest_version` - version_request_channels: Vec<oneshot::Sender<Result<mullvad_types::version::AppVersionInfo>>>, - #[cfg(update)] - update: Update, -} - -#[cfg(update)] -struct Update { - /// Channel used to send upgrade events from [downloader::Downloader] - update_event_tx: mpsc::UnboundedSender<downloader::UpdateEvent>, - /// Channel used to receive upgrade events from [downloader::Downloader] - update_event_rx: mpsc::UnboundedReceiver<downloader::UpdateEvent>, - /// Clients that will also receive events - upgrade_listeners: Vec<mpsc::UnboundedSender<AppUpgradeEvent>>, -} + version_request_channels: Vec<oneshot::Sender<Result<AppVersionInfo>>>, -#[cfg(update)] -impl Update { - fn new() -> Self { - let (update_event_tx, update_event_rx) = mpsc::unbounded(); - Self { - update_event_tx, - update_event_rx, - upgrade_listeners: Vec::default(), - } - } + /// Broadcast channel for app upgrade events + #[cfg(update)] + app_upgrade_broadcast: AppUpgradeBroadcast, } enum Message { @@ -131,56 +98,93 @@ enum Message { result_tx: oneshot::Sender<()>, }, /// Check for updates - GetLatestVersion(oneshot::Sender<Result<mullvad_types::version::AppVersionInfo>>), + GetLatestVersion(oneshot::Sender<Result<AppVersionInfo>>), /// Update the application #[cfg(update)] UpdateApplication { result_tx: oneshot::Sender<()> }, /// Cancel the ongoing update #[cfg(update)] CancelUpdate { result_tx: oneshot::Sender<()> }, - /// Listen for events - #[cfg(update)] - NewUpgradeEventListener { - /// Channel for receiving update events - event_tx: mpsc::UnboundedSender<AppUpgradeEvent>, - }, } #[derive(Debug)] -enum RoutingState { +enum State { /// There is no version available yet NoVersion, /// Running version checker, no upgrade in progress - HasVersion { version_info: VersionCache }, + HasVersion { version_cache: VersionCache }, /// Download is in progress, so we don't forward version checks + #[cfg(update)] Downloading { /// Version info received from `HasVersion` - version_info: VersionCache, - /// The version being upgraded to (derived from `suggested_upgrade`). - /// Should be one of the versions in `version_info`. + version_cache: VersionCache, + /// The version being upgraded to, derived from `version_info` and beta program state upgrading_to_version: mullvad_update::version::Version, - /// Version check update received while paused - /// When transitioning out of `Upgrading`, this will cause `version_info` to be updated - new_version: Option<VersionCache>, /// Tokio task for the downloader handle - downloader_handle: tokio::task::JoinHandle<()>, + downloader_handle: downloader::DownloaderHandle, }, /// Download is complete. We have a verified binary + #[cfg(update)] Downloaded { /// Version info received from `HasVersion` - version_info: VersionCache, + version_cache: VersionCache, /// Path to verified installer verified_installer_path: PathBuf, }, } +impl std::fmt::Display for State { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + State::NoVersion => write!(f, "NoVersion"), + State::HasVersion { .. } => write!(f, "HasVersion"), + #[cfg(update)] + State::Downloading { + upgrading_to_version, + .. + } => write!(f, "Downloading '{}'", upgrading_to_version.version), + #[cfg(update)] + State::Downloaded { + verified_installer_path, + .. + } => write!(f, "Downloaded '{}'", verified_installer_path.display()), + } + } +} + +impl State { + fn get_version_cache(&self) -> Option<&VersionCache> { + match self { + State::NoVersion => None, + State::HasVersion { version_cache, .. } => Some(version_cache), + #[cfg(update)] + State::Downloading { version_cache, .. } | State::Downloaded { version_cache, .. } => { + Some(version_cache) + } + } + } + + fn get_verified_installer_path(&self) -> Option<&PathBuf> { + match self { + #[cfg(update)] + State::Downloaded { + verified_installer_path, + .. + } => Some(verified_installer_path), + _ => None, + } + } +} + impl VersionRouter { + #[cfg_attr(not(update), allow(unused_variables))] pub(crate) fn spawn( api_handle: MullvadRestHandle, availability_handle: ApiAvailability, cache_dir: PathBuf, - version_event_sender: DaemonEventSender<mullvad_types::version::AppVersionInfo>, + version_event_sender: DaemonEventSender<AppVersionInfo>, beta_program: bool, + app_upgrade_broadcast: AppUpgradeBroadcast, ) -> VersionRouterHandle { let (tx, rx) = mpsc::unbounded(); @@ -190,10 +194,9 @@ impl VersionRouter { VersionUpdater::spawn(api_handle, availability_handle, cache_dir, new_version_tx) .await; - // TODO: tokio::join! here? Self { rx, - state: RoutingState::NoVersion, + state: State::NoVersion, beta_program, version_check, version_event_sender, @@ -201,7 +204,7 @@ impl VersionRouter { version_request: Fuse::terminated(), version_request_channels: vec![], #[cfg(update)] - update: Update::new(), + app_upgrade_broadcast, } .run() .await; @@ -210,22 +213,6 @@ impl VersionRouter { } async fn run(mut self) { - // HACK: We can (should) only handle update events on some targets. - // Trying to cfg a branch in `tokio::select!` will not work, so creating - // a closure for conditionally responding to upgrade events will have to do. - #[cfg(update)] - let handle_update_event = || async { - // Received upgrade event from `downloader` - if let Some(update_event) = self.update.update_event_rx.next().await { - self.handle_update_event(update_event); - }; - }; - - #[cfg(not(update))] - let handle_update_event = || async { - let () = std::future::pending().await; - }; - loop { tokio::select! { // Respond to version info requests @@ -247,9 +234,13 @@ impl VersionRouter { Some(new_version) = self.new_version_rx.next() => { self.on_new_version(new_version); } - // Received & handled update event from `downloader` - () = handle_update_event() => { }, - Some(message) = self.rx.next() => self.handle_message(message).await, + res = wait_for_update(&mut self.state) => { + // If the download was successful, we send the new version + if let Some(app_update_info) = res { + let _ = self.version_event_sender.send(app_update_info); + } + }, + Some(message) = self.rx.next() => self.handle_message(message), else => break, } } @@ -257,8 +248,7 @@ impl VersionRouter { } /// Handle [Message] sent by user - #[cfg_attr(not(update), allow(clippy::unused_async))] - async fn handle_message(&mut self, message: Message) { + fn handle_message(&mut self, message: Message) { match message { Message::SetBetaProgram { state, result_tx } => { self.set_beta_program(state); @@ -271,298 +261,294 @@ impl VersionRouter { } #[cfg(update)] Message::UpdateApplication { result_tx } => { - self.update_application().await; + self.update_application(); let _ = result_tx.send(()); } #[cfg(update)] Message::CancelUpdate { result_tx } => { - self.cancel_upgrade().await; + self.cancel_upgrade(); let _ = result_tx.send(()); } + } + } + + /// Handle new version info + /// + /// If the router is in the process of upgrading, it will not propagate versions, but only + /// remember it for when it transitions back into the "idle" (version check) state. + fn on_new_version(&mut self, version_cache: VersionCache) { + match &mut self.state { + State::NoVersion => { + // Receive first version + let app_version_info = to_app_version_info(&version_cache, self.beta_program, None); + let _ = self.version_event_sender.send(app_version_info.clone()); + self.state = State::HasVersion { version_cache }; + } + // Already have version info, just update it + State::HasVersion { + version_cache: prev_cache, + } => { + if let Some(version_info) = updated_app_version_info_on_new_version_cache( + prev_cache, + &version_cache, + self.beta_program, + ) { + // New version available + let _ = self.version_event_sender.send(version_info.clone()); + } + self.state = State::HasVersion { version_cache }; + } #[cfg(update)] - Message::NewUpgradeEventListener { - event_tx: result_tx, + State::Downloaded { + version_cache: ref mut prev_cache, + .. + } + | State::Downloading { + version_cache: ref mut prev_cache, + .. } => { - self.update.upgrade_listeners.push(result_tx); + // If version changed, cancel download + if let Some(version_info) = updated_app_version_info_on_new_version_cache( + prev_cache, + &version_cache, + self.beta_program, + ) { + log::warn!("Received new version while upgrading: {version_info:?}, aborting"); + + let _ = self.version_event_sender.send(version_info.clone()); + self.state = State::HasVersion { version_cache }; + } else { + *prev_cache = version_cache; + } } } + + // Notify version requesters + if let Some(cache) = self.state.get_version_cache() { + self.notify_version_requesters(to_app_version_info( + cache, + self.beta_program, + self.state.get_verified_installer_path().cloned(), + )); + } + } + + fn notify_version_requesters(&mut self, new_app_version_info: AppVersionInfo) { + // Cancel update notifications + self.version_request = Fuse::terminated(); + // Notify all requesters + for tx in self.version_request_channels.drain(..) { + let _ = tx.send(Ok(new_app_version_info.clone())); + } } fn set_beta_program(&mut self, new_state: bool) { - let prev_state = self.beta_program; - if new_state == prev_state { + if new_state == self.beta_program { return; } + let previous_state = self.beta_program; self.beta_program = new_state; + let Some(new_app_version_info) = self.state.get_version_cache().and_then(|version_cache| { + updated_app_version_info_on_new_beta(version_cache, previous_state, new_state) + }) else { + return; + }; - match &self.state { - // Emit version event if suggested upgrade changes - RoutingState::HasVersion { version_info } - | RoutingState::Downloaded { version_info, .. } => { - let prev_app_version_info = to_app_version_info(version_info, prev_state); - let new_app_version_info = to_app_version_info(version_info, new_state); + // Always cancel download if the suggested upgrade changes - if new_app_version_info != prev_app_version_info { - let _ = self.version_event_sender.send(new_app_version_info); + let version_cache = match mem::replace(&mut self.state, State::NoVersion) { + #[cfg(update)] + State::Downloaded { version_cache, .. } | State::Downloading { version_cache, .. } => { + log::warn!("Switching beta after while updating resulted in new suggested upgrade: {:?}, aborting", new_app_version_info.suggested_upgrade); + version_cache + } + State::HasVersion { version_cache } => version_cache, + State::NoVersion => { + unreachable!("Can't get recommended upgrade on beta change without version") + } + }; - // Note: If we're in the `Downloaded` state, this resets the state to `HasVersion` - self.state = RoutingState::HasVersion { - version_info: version_info.clone(), - }; + self.state = State::HasVersion { version_cache }; + let _ = self.version_event_sender.send(new_app_version_info.clone()); - self.notify_version_requesters(); - } - } - // If there's no version or upgrading, do nothing - RoutingState::NoVersion | RoutingState::Downloading { .. } => (), - } + self.notify_version_requesters(new_app_version_info); } fn get_latest_version( &mut self, result_tx: oneshot::Sender<std::result::Result<AppVersionInfo, Error>>, ) { - match &self.state { - // When not upgrading, potentially fetch new version info, and append `result_tx` to - // list of channels to notify. - // We don't wait on `get_version_info` so that we don't block user commands. - RoutingState::NoVersion - | RoutingState::HasVersion { .. } - | RoutingState::Downloaded { .. } => { - // Start a version request unless already in progress - if self.version_request.is_terminated() { - let check = self.version_check.clone(); - let check_fut: Pin<Box<dyn Future<Output = Result<VersionCache>> + Send>> = - Box::pin(async move { check.get_version_info().await }); - self.version_request = check_fut.fuse(); - } - // Append to response channels - self.version_request_channels.push(result_tx); - } - // During upgrades, just pass on the last known version - RoutingState::Downloading { - version_info, - upgrading_to_version, - new_version: _, - downloader_handle: _, - } => { - let suggested_upgrade = suggested_upgrade_for_version(upgrading_to_version); - let info = AppVersionInfo { - current_version_supported: version_info.current_version_supported, - suggested_upgrade: Some(suggested_upgrade), - }; - let _ = result_tx.send(Ok(info)); - } + // Start a version request unless already in progress + match self + .refresh_version_check_tx + .unbounded_send(()) + .map_err(|_e| Error::VersionRouterClosed) + { + // Append to response channels + Ok(()) => self.version_request_channels.push(result_tx), + Err(err) => result_tx + .send(Err(err)) + .unwrap_or_else(|e| log::warn!("Failed to send version request result: {e:?}")), } + // Append to response channels + self.version_request_channels.push(result_tx); } #[cfg(update)] - async fn update_application(&mut self) { - match mem::replace(&mut self.state, RoutingState::NoVersion) { - // Checking state: start upgrade, if upgrade is available - RoutingState::HasVersion { version_info } => { - let Some(suggested_upgrade) = - suggested_upgrade(&version_info.latest_version, self.beta_program) + fn update_application(&mut self) { + use crate::version::downloader::spawn_downloader; + + match mem::replace(&mut self.state, State::NoVersion) { + // If we're already downloading or have a version, do nothing + State::Downloaded { version_cache, .. } | State::HasVersion { version_cache } => { + let Some(upgrading_to_version) = + recommended_version_upgrade(&version_cache.latest_version, self.beta_program) else { // If there's no suggested upgrade, do nothing - log::trace!("Received update request without suggested upgrade"); - self.state = RoutingState::HasVersion { version_info }; + log::debug!("Received update request without suggested upgrade"); + self.state = State::HasVersion { version_cache }; return; }; + log::info!( + "Starting upgrade to version {}", + upgrading_to_version.version + ); - let downloader_handle = tokio::spawn( - downloader::Downloader::start( - suggested_upgrade.clone(), - self.update_event_tx.clone(), - ) - .await - .expect("TODO: handle err"), + let downloader_handle = spawn_downloader( + upgrading_to_version.clone(), + self.app_upgrade_broadcast.clone(), ); - log::debug!("Starting upgrade"); - self.state = RoutingState::Downloading { - version_info, - upgrading_to_version: suggested_upgrade, - new_version: None, + self.state = State::Downloading { + version_cache, + upgrading_to_version, downloader_handle, }; - - // Notify callers of `get_latest_version`: cancel the version check and - // advertise the last known version as latest - self.notify_version_requesters(); } // Already downloading/downloaded or there is no version: do nothing state => { + log::debug!("Ignoring update request while in state {:?}", state); self.state = state; } } } #[cfg(update)] - async fn cancel_upgrade(&mut self) { - match mem::replace(&mut self.state, RoutingState::NoVersion) { + fn cancel_upgrade(&mut self) { + match mem::replace(&mut self.state, State::NoVersion) { // If we're upgrading, emit an event if a version was received during the upgrade // Otherwise, just reset upgrade info to last known state - RoutingState::Downloading { - version_info, - upgrading_to_version: _, - new_version, - downloader_handle, - } => { - // Abort download - downloader_handle.abort(); - let _ = downloader_handle.await; - - // Reset app version info to last known state - self.state = RoutingState::HasVersion { version_info }; - - // If we also received an upgrade, emit new version event - if let Some(version) = new_version { - let app_version = to_app_version_info(&version, self.beta_program); - let _ = self.version_event_sender.send(app_version); - } + State::Downloaded { version_cache, .. } | State::Downloading { version_cache, .. } => { + self.state = State::HasVersion { version_cache }; } // No-op unless we're downloading something right now // In the `Downloaded` state, we also do nothing state => self.state = state, }; + debug_assert!(!matches!( + self.state, + State::Downloading { .. } | State::NoVersion + )); } +} - /// Handle new version info - /// - /// If the router is in the process of upgrading, it will not propagate versions, but only - /// remember it for when it transitions back into the "idle" (version check) state. - fn on_new_version(&mut self, version: VersionCache) { - match &mut self.state { - // Set app version info - RoutingState::NoVersion => { - self.state = RoutingState::HasVersion { - version_info: version.clone(), - }; +fn updated_app_version_info_on_new_version_cache( + version_cache: &VersionCache, + new_version_cache: &VersionCache, + beta_program: bool, +) -> Option<AppVersionInfo> { + let prev_app_version = to_app_version_info(version_cache, beta_program, None); + let new_app_version = to_app_version_info(new_version_cache, beta_program, None); - // Initial version is propagated - let app_version_info = to_app_version_info(&version, self.beta_program); - let _ = self.version_event_sender.send(app_version_info); - } - // Update app version info - RoutingState::HasVersion { - version_info: prev_version, - .. - } - | RoutingState::Downloaded { - version_info: prev_version, - .. - } => { - // If the version changed, notify channel - // Note: Must account for beta program state - let prev_app_version = to_app_version_info(prev_version, self.beta_program); - let new_app_version = to_app_version_info(&version, self.beta_program); - if new_app_version != prev_app_version { - let _ = self.version_event_sender.send(new_app_version); - } + // Update version info + if new_app_version != prev_app_version { + Some(new_app_version) + } else { + None + } +} - // Note: If we're in the `Downloaded` state, this resets the state to `HasVersion` - if prev_version != &version { - self.state = RoutingState::HasVersion { - version_info: version, - }; - } - } - // If we're upgrading, remember the new version, but don't send any notification - RoutingState::Downloading { - ref mut new_version, - .. - } => { - *new_version = Some(version); - } - } +fn updated_app_version_info_on_new_beta( + version_cache: &VersionCache, + previous_beta_state: bool, + new_beta_state: bool, +) -> Option<AppVersionInfo> { + let prev_app_version = to_app_version_info(version_cache, previous_beta_state, None); + let new_app_version = to_app_version_info(version_cache, new_beta_state, None); - // Notify callers of `get_latest_version` - self.notify_version_requesters(); + // Update version info + if new_app_version != prev_app_version { + Some(new_app_version) + } else { + None } +} +/// Wait for the update to finish. In case no update is in progress (or the platform does not +/// support in-app upgrades), then the future will never resolve as to not escape the select statement. +#[allow(clippy::unused_async, unused_variables)] +async fn wait_for_update(state: &mut State) -> Option<AppVersionInfo> { #[cfg(update)] - fn handle_update_event(&mut self, event: downloader::UpdateEvent) { - debug_assert!( - matches!(self.state, RoutingState::Downloading { .. }), - "unexpected routing state: {:?}", - self.state - ); - - use downloader::UpdateEvent; - - match event { - UpdateEvent::Downloading { - server, - complete_frac: f32, - time_left, - } => { - // TODO: emit version event to clients - } - UpdateEvent::DownloadFailed => { - // TODO: transition to HasVersion state - // TODO: emit version event to clients - } - UpdateEvent::Verifying => { - // TODO: emit version event to clients - } - UpdateEvent::VerificationFailed => { - // TODO: transition to HasVersion state - // TODO: emit version event to clients + match state { + State::Downloading { + version_cache, + ref mut downloader_handle, + upgrading_to_version, + .. + } => match downloader_handle.wait().await { + Ok(verified_installer_path) => { + let app_update_info = AppVersionInfo { + current_version_supported: version_cache.current_version_supported, + suggested_upgrade: Some({ + SuggestedUpgrade { + version: upgrading_to_version.version.clone(), + changelog: upgrading_to_version.changelog.clone(), + verified_installer_path: Some(verified_installer_path.clone()), + } + }), + }; + *state = State::Downloaded { + version_cache: version_cache.clone(), + verified_installer_path, + }; + Some(app_update_info) } - UpdateEvent::Verified { - verified_installer_path, - } => { - // TODO: transition to Downloaded state - // TODO: emit version event to clients + Err(err) => { + log::trace!("Downloader task ended: {err}"); + *state = State::HasVersion { + version_cache: version_cache.clone(), + }; + None } + }, + _ => { + let () = std::future::pending().await; + unreachable!() } } - - /// Notify clients requesting a version - fn notify_version_requesters(&mut self) { - // Cancel update notifications - self.version_request = Fuse::terminated(); - - let version_info = match &self.state { - RoutingState::NoVersion => { - log::error!("Dropping version request channels since there's no version"); - self.version_request_channels.clear(); - return; - } - // Update app version info - RoutingState::HasVersion { version_info } - | RoutingState::Downloaded { version_info, .. } => { - to_app_version_info(version_info, self.beta_program) - } - // If we're upgrading, emit the version we're currently upgrading to - RoutingState::Downloading { - version_info, - upgrading_to_version, - .. - } => { - let suggested_upgrade = suggested_upgrade_for_version(upgrading_to_version); - AppVersionInfo { - current_version_supported: version_info.current_version_supported, - suggested_upgrade: Some(suggested_upgrade), - } - } - }; - - // Notify all requesters - for tx in self.version_request_channels.drain(..) { - let _ = tx.send(Ok(version_info.clone())); - } + #[cfg(not(update))] + { + let () = std::future::pending().await; + unreachable!() } } /// Extract [`AppVersionInfo`], containing upgrade version and `current_version_supported` /// from [VersionCache] and beta program state. -fn to_app_version_info(cache: &VersionCache, beta_program: bool) -> AppVersionInfo { +fn to_app_version_info( + cache: &VersionCache, + beta_program: bool, + verified_installer_path: Option<PathBuf>, +) -> AppVersionInfo { let current_version_supported = cache.current_version_supported; - let suggested_upgrade = suggested_upgrade(&cache.latest_version, beta_program) - .as_ref() - .map(suggested_upgrade_for_version); + let suggested_upgrade = + recommended_version_upgrade(&cache.latest_version, beta_program).map(|version| { + SuggestedUpgrade { + version: version.version, + changelog: version.changelog, + verified_installer_path, + } + }); AppVersionInfo { current_version_supported, suggested_upgrade, @@ -570,7 +556,7 @@ fn to_app_version_info(cache: &VersionCache, beta_program: bool) -> AppVersionIn } /// Extract upgrade version from [VersionCache] based on `beta_program` -fn suggested_upgrade( +fn recommended_version_upgrade( version_info: &VersionInfo, beta_program: bool, ) -> Option<mullvad_update::version::Version> { @@ -588,15 +574,3 @@ fn suggested_upgrade( None } } - -/// Convert [mullvad_update::version::Version] to [SuggestedUpgrade] -fn suggested_upgrade_for_version( - version_details: &mullvad_update::version::Version, -) -> SuggestedUpgrade { - SuggestedUpgrade { - version: version_details.version.clone(), - changelog: Some(version_details.changelog.clone()), - // TODO: This should return the downloaded & verified path, if it exists - verified_installer_path: None, - } -} |
