diff options
| author | Sebastian Holmin <sebastian.holmin@mullvad.net> | 2025-04-09 13:47:36 +0200 |
|---|---|---|
| committer | Sebastian Holmin <sebastian.holmin@mullvad.net> | 2025-05-28 13:25:27 +0200 |
| commit | 23e7acba0f8afd4d238df067c836ae649fa80b84 (patch) | |
| tree | bd658b140a9f27066d1d7ae6788d84c3f4af2eef | |
| parent | c525c77d54f5c449f872094a804d77b7e85bfc55 (diff) | |
| download | mullvadvpn-23e7acba0f8afd4d238df067c836ae649fa80b84.tar.xz mullvadvpn-23e7acba0f8afd4d238df067c836ae649fa80b84.zip | |
Add in app upgrades to the daemon
---------
Co-authored-by: Markus Pettersson <markus.pettersson@mullvad.net>
| -rw-r--r-- | Cargo.lock | 1 | ||||
| -rw-r--r-- | installer-downloader/src/controller.rs | 2 | ||||
| -rw-r--r-- | mullvad-daemon/Cargo.toml | 2 | ||||
| -rw-r--r-- | mullvad-daemon/build.rs | 21 | ||||
| -rw-r--r-- | mullvad-daemon/src/lib.rs | 29 | ||||
| -rw-r--r-- | mullvad-daemon/src/management_interface.rs | 54 | ||||
| -rw-r--r-- | mullvad-daemon/src/version/check.rs | 2 | ||||
| -rw-r--r-- | mullvad-daemon/src/version/downloader.rs | 215 | ||||
| -rw-r--r-- | mullvad-daemon/src/version/mod.rs | 3 | ||||
| -rw-r--r-- | mullvad-daemon/src/version/router.rs | 604 | ||||
| -rw-r--r-- | mullvad-management-interface/proto/management_interface.proto | 4 | ||||
| -rw-r--r-- | mullvad-management-interface/src/types/conversions/version.rs | 20 | ||||
| -rw-r--r-- | mullvad-paths/src/cache.rs | 9 | ||||
| -rw-r--r-- | mullvad-paths/src/lib.rs | 17 | ||||
| -rw-r--r-- | mullvad-paths/src/logs.rs | 16 | ||||
| -rw-r--r-- | mullvad-paths/src/settings.rs | 10 | ||||
| -rw-r--r-- | mullvad-paths/src/unix.rs | 30 | ||||
| -rw-r--r-- | mullvad-paths/src/windows.rs | 46 | ||||
| -rw-r--r-- | mullvad-types/src/version.rs | 23 |
19 files changed, 598 insertions, 510 deletions
diff --git a/Cargo.lock b/Cargo.lock index c7097fc645..08708f4541 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5809,6 +5809,7 @@ dependencies = [ "futures-core", "pin-project-lite", "tokio", + "tokio-util 0.7.10", ] [[package]] diff --git a/installer-downloader/src/controller.rs b/installer-downloader/src/controller.rs index cf63154719..a279498e27 100644 --- a/installer-downloader/src/controller.rs +++ b/installer-downloader/src/controller.rs @@ -328,7 +328,7 @@ impl<D: AppDelegate + 'static, A: From<UiAppDownloaderParameters<D>> + AppDownlo } }; - log::debug!("Download directory: {}", download_dir.display()); + log::trace!("Download directory: {}", download_dir.display()); // Begin download let (tx, rx) = oneshot::channel(); diff --git a/mullvad-daemon/Cargo.toml b/mullvad-daemon/Cargo.toml index 6078473c06..c7a25031fa 100644 --- a/mullvad-daemon/Cargo.toml +++ b/mullvad-daemon/Cargo.toml @@ -29,7 +29,7 @@ regex = "1.0" serde = { workspace = true, features = ["derive"] } serde_json = { workspace = true } tokio = { workspace = true, features = ["fs", "io-util", "rt-multi-thread", "sync", "time"] } -tokio-stream = "0.1" +tokio-stream = { version = "0.1", features = ["sync"]} socket2 = { workspace = true } mullvad-relay-selector = { path = "../mullvad-relay-selector" } diff --git a/mullvad-daemon/build.rs b/mullvad-daemon/build.rs index 40065c2a89..a1d1c69b4a 100644 --- a/mullvad-daemon/build.rs +++ b/mullvad-daemon/build.rs @@ -36,7 +36,7 @@ fn main() { // Enable in-app upgrades on macOS and Windows println!("cargo::rustc-check-cfg=cfg(update)"); - if cfg!(any(target_os = "macos", target_os = "windows")) { + if matches!(target_os(), Os::Windows | Os::Macos) { println!(r#"cargo::rustc-cfg=update"#); } } @@ -51,3 +51,22 @@ fn commit_date() -> String { .trim() .to_owned() } + +#[derive(PartialEq, Eq, Clone, Copy)] +enum Os { + Windows, + Macos, + Linux, + Android, +} + +fn target_os() -> Os { + let target_os = env::var("CARGO_CFG_TARGET_OS").unwrap(); + match target_os.as_str() { + "windows" => Os::Windows, + "macos" => Os::Macos, + "linux" => Os::Linux, + "android" => Os::Android, + _ => panic!("Unsupported target os: {target_os}"), + } +} diff --git a/mullvad-daemon/src/lib.rs b/mullvad-daemon/src/lib.rs index 2b59d7f62d..60832c708e 100644 --- a/mullvad-daemon/src/lib.rs +++ b/mullvad-daemon/src/lib.rs @@ -405,9 +405,9 @@ pub enum DaemonCommand { /// Prompt the daemon to start an app version upgrade. /// /// If an upgrade had previously been started but not completed the daemon should continue the upgrade process at the appropriate step. The client need not be notified about this detail. - AppUpgrade(ResponseTx<(), Error>), + AppUpgrade(ResponseTx<(), version::Error>), /// Prompt the daemon to abort the current upgrade. - AppUpgradeAbort(ResponseTx<(), Error>), + AppUpgradeAbort(ResponseTx<(), version::Error>), } /// All events that can happen in the daemon. Sent from various threads and exposed interfaces. @@ -659,9 +659,13 @@ impl Daemon { macos::bump_filehandle_limit(); let command_sender = daemon_command_channel.sender(); - let management_interface = - ManagementInterfaceServer::start(command_sender, config.rpc_socket_path) - .map_err(Error::ManagementInterfaceError)?; + let app_upgrade_broadcast = tokio::sync::broadcast::channel(128).0; // TODO: look over bufsize + let management_interface = ManagementInterfaceServer::start( + command_sender, + config.rpc_socket_path, + app_upgrade_broadcast.clone(), + ) + .map_err(Error::ManagementInterfaceError)?; let (internal_event_tx, internal_event_rx) = daemon_command_channel.destructure(); @@ -902,6 +906,7 @@ impl Daemon { config.cache_dir.clone(), internal_event_tx.to_specialized_sender(), settings.show_beta_releases, + app_upgrade_broadcast, ); // Attempt to download a fresh relay list @@ -1478,8 +1483,8 @@ impl Daemon { GetFeatureIndicators(tx) => self.on_get_feature_indicators(tx), DisableRelay { relay, tx } => self.on_toggle_relay(relay, false, tx), EnableRelay { relay, tx } => self.on_toggle_relay(relay, true, tx), - AppUpgrade(tx) => self.on_app_upgrade(tx), - AppUpgradeAbort(tx) => self.on_app_upgrade_abort(tx), + AppUpgrade(tx) => self.on_app_upgrade(tx).await, + AppUpgradeAbort(tx) => self.on_app_upgrade_abort(tx).await, } } @@ -3223,15 +3228,13 @@ impl Daemon { Self::oneshot_send(tx, (), "on_toggle_relay response"); } - fn on_app_upgrade(&self, tx: ResponseTx<(), Error>) { - // TODO: Call the Downloader - let result = Ok(()); + async fn on_app_upgrade(&self, tx: ResponseTx<(), version::Error>) { + let result = self.version_handle.update_application().await; Self::oneshot_send(tx, result, "on_app_upgrade response"); } - fn on_app_upgrade_abort(&self, tx: ResponseTx<(), Error>) { - // TODO: Abort the Downloader - let result = Ok(()); + async fn on_app_upgrade_abort(&self, tx: ResponseTx<(), version::Error>) { + let result = self.version_handle.cancel_update().await; Self::oneshot_send(tx, result, "on_app_upgrade_abort response"); } diff --git a/mullvad-daemon/src/management_interface.rs b/mullvad-daemon/src/management_interface.rs index 96a541f87b..764d98421a 100644 --- a/mullvad-daemon/src/management_interface.rs +++ b/mullvad-daemon/src/management_interface.rs @@ -38,10 +38,12 @@ pub enum Error { SetupError(#[source] mullvad_management_interface::Error), } +pub type AppUpgradeBroadcast = tokio::sync::broadcast::Sender<version::AppUpgradeEvent>; + struct ManagementServiceImpl { daemon_tx: DaemonCommandSender, subscriptions: Arc<Mutex<Vec<EventsListenerSender>>>, - app_upgrade_event_subscribers: Arc<Mutex<Vec<AppUpgradeEventListenerSender>>>, + pub app_upgrade_broadcast: AppUpgradeBroadcast, } pub type ServiceResult<T> = std::result::Result<Response<T>, Status>; @@ -49,9 +51,7 @@ type EventsListenerReceiver = UnboundedReceiverStream<Result<types::DaemonEvent, type EventsListenerSender = tokio::sync::mpsc::UnboundedSender<Result<types::DaemonEvent, Status>>; type AppUpgradeEventListenerReceiver = - UnboundedReceiverStream<Result<types::AppUpgradeEvent, Status>>; -type AppUpgradeEventListenerSender = - tokio::sync::mpsc::UnboundedSender<Result<types::AppUpgradeEvent, Status>>; + Box<dyn futures::Stream<Item = Result<types::AppUpgradeEvent, Status>> + Send + Unpin>; const INVALID_VOUCHER_MESSAGE: &str = "This voucher code is invalid"; const USED_VOUCHER_MESSAGE: &str = "This voucher code has already been used"; @@ -1131,7 +1131,9 @@ impl ManagementService for ManagementServiceImpl { let (tx, rx) = oneshot::channel(); self.send_command_to_daemon(DaemonCommand::AppUpgrade(tx))?; - self.wait_for_result(rx).await?.map_err(map_daemon_error)?; + self.wait_for_result(rx) + .await? + .map_err(map_version_check_error)?; Ok(Response::new(())) } @@ -1142,7 +1144,9 @@ impl ManagementService for ManagementServiceImpl { let (tx, rx) = oneshot::channel(); self.send_command_to_daemon(DaemonCommand::AppUpgradeAbort(tx))?; - self.wait_for_result(rx).await?.map_err(map_daemon_error)?; + self.wait_for_result(rx) + .await? + .map_err(map_version_check_error)?; Ok(Response::new(())) } @@ -1151,13 +1155,19 @@ impl ManagementService for ManagementServiceImpl { &self, _: Request<()>, ) -> ServiceResult<Self::AppUpgradeEventsListenStream> { - let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); - - let mut subscriptions = self.app_upgrade_event_subscribers.lock().unwrap(); - subscriptions.push(tx); + log::debug!("app_upgrade_events_listen"); + let rx = self.app_upgrade_broadcast.subscribe(); + let upgrade_event_stream = + tokio_stream::wrappers::BroadcastStream::new(rx).map(|result| match result { + Ok(event) => Ok(event.into()), + Err(error) => Err(Status::internal(format!( + "Failed to receive app upgrade event: {error}" + ))), + }); - let upgrade_event_stream = UnboundedReceiverStream::new(rx); - Ok(Response::new(upgrade_event_stream)) + Ok(Response::new( + Box::new(upgrade_event_stream) as Self::AppUpgradeEventsListenStream + )) } } @@ -1191,18 +1201,19 @@ impl ManagementInterfaceServer { pub fn start( daemon_tx: DaemonCommandSender, rpc_socket_path: impl AsRef<Path>, + app_upgrade_broadcast: tokio::sync::broadcast::Sender<version::AppUpgradeEvent>, ) -> Result<ManagementInterfaceServer, Error> { let subscriptions = Arc::<Mutex<Vec<EventsListenerSender>>>::default(); - let app_upgrade_event_subscriptions = - Arc::<Mutex<Vec<AppUpgradeEventListenerSender>>>::default(); + // NOTE: It is important that the channel buffer size is kept at 0. When sending a signal // to abort the gRPC server, the sender can be awaited to know when the gRPC server has // received and started processing the shutdown signal. let (server_abort_tx, server_abort_rx) = mpsc::channel(0); + let server = ManagementServiceImpl { daemon_tx, subscriptions: subscriptions.clone(), - app_upgrade_event_subscribers: app_upgrade_event_subscriptions.clone(), + app_upgrade_broadcast, }; let rpc_server_join_handle = mullvad_management_interface::spawn_rpc_server( server, @@ -1218,10 +1229,7 @@ impl ManagementInterfaceServer { rpc_socket_path.as_ref().display() ); - let broadcast = ManagementInterfaceEventBroadcaster { - subscriptions, - app_upgrade_event_subscriptions, - }; + let broadcast = ManagementInterfaceEventBroadcaster { subscriptions }; Ok(ManagementInterfaceServer { rpc_server_join_handle, @@ -1261,7 +1269,6 @@ impl ManagementInterfaceServer { #[derive(Clone)] pub struct ManagementInterfaceEventBroadcaster { subscriptions: Arc<Mutex<Vec<EventsListenerSender>>>, - app_upgrade_event_subscriptions: Arc<Mutex<Vec<AppUpgradeEventListenerSender>>>, } impl ManagementInterfaceEventBroadcaster { @@ -1270,11 +1277,6 @@ impl ManagementInterfaceEventBroadcaster { subscriptions.retain(|tx| tx.send(Ok(value.clone())).is_ok()); } - pub(crate) fn notify_upgrade_event(&self, value: version::AppUpgradeEvent) { - let mut subscriptions = self.app_upgrade_event_subscriptions.lock().unwrap(); - subscriptions.retain(|tx| tx.send(Ok(value.clone().into())).is_ok()); - } - /// Notify that the tunnel state changed. /// /// Sends a new state update to all `new_state` subscribers of the management interface. @@ -1313,7 +1315,7 @@ impl ManagementInterfaceEventBroadcaster { /// Notify that info about the latest available app version changed. /// Or some flag about the currently running version is changed. pub(crate) fn notify_app_version(&self, app_version_info: version::AppVersionInfo) { - log::debug!("Broadcasting new app version info"); + log::debug!("Broadcasting app version info:\n{app_version_info}"); self.notify(types::DaemonEvent { event: Some(daemon_event::Event::VersionInfo( types::AppVersionInfo::from(app_version_info), diff --git a/mullvad-daemon/src/version/check.rs b/mullvad-daemon/src/version/check.rs index d2221a55d5..e6ebff9920 100644 --- a/mullvad-daemon/src/version/check.rs +++ b/mullvad-daemon/src/version/check.rs @@ -468,7 +468,7 @@ async fn try_load_cache(cache_dir: &Path) -> Result<(VersionCache, SystemTime), let cache: VersionCache = serde_json::from_str(&content).map_err(Error::Deserialize)?; - if cache_is_old(&cache.latest_version, &*APP_VERSION) { + if cache_is_old(&cache.latest_version, &APP_VERSION) { return Err(Error::OutdatedVersion); } diff --git a/mullvad-daemon/src/version/downloader.rs b/mullvad-daemon/src/version/downloader.rs index d1d9126f5b..f7cd2cc5cd 100644 --- a/mullvad-daemon/src/version/downloader.rs +++ b/mullvad-daemon/src/version/downloader.rs @@ -1,107 +1,133 @@ #![cfg(update)] -use futures::channel::{mpsc, oneshot}; -use mullvad_update::app::{AppDownloader, AppDownloaderParameters, HttpAppDownloader}; +use mullvad_types::version::{AppUpgradeDownloadProgress, AppUpgradeError, AppUpgradeEvent}; +use mullvad_update::app::{ + AppDownloader, AppDownloaderParameters, DownloadError, HttpAppDownloader, +}; use rand::seq::SliceRandom; -use std::time::Duration; -use std::{future::Future, path::PathBuf}; +use std::path::PathBuf; +use std::time::{Duration, Instant}; use tokio::fs; +use tokio::sync::broadcast; -type Result<T> = std::result::Result<T, Error>; - -pub struct Downloader(()); - -pub type AbortHandle = oneshot::Sender<()>; - -/// App updater event -pub enum UpdateEvent { - /// Download progress update - Downloading { - /// Server that the app is being downloaded from - server: String, - /// A fraction in `[0,1]` that describes how much of the installer has been downloaded - complete_frac: f32, - /// Estimated time left - time_left: Duration, - }, - /// Download failed due to some error - DownloadFailed, - /// Download completed, so verifying now - Verifying, - /// The verification failed due to some error - VerificationFailed, - /// There is a downloaded and verified installer available - Verified { verified_installer_path: PathBuf }, -} - +#[derive(thiserror::Error, Debug)] pub enum Error { #[error("Failed to get download directory")] GetDownloadDir(#[from] mullvad_paths::Error), #[error("Failed to create download directory")] - CreateDownloadDir(#[source] io::Error), + CreateDownloadDir(#[source] std::io::Error), + + #[error("Failed to download app")] + Download(#[from] DownloadError), + + #[error("Download was cancelled or panicked")] + JoinError(#[from] tokio::task::JoinError), #[error("Could not select URL for app update")] NoUrlFound, } -impl Downloader { - /// Begin or resume download of `version` - pub async fn start( - version: mullvad_update::version::Version, - event_tx: mpsc::UnboundedSender<UpdateEvent>, - ) -> Result<impl Future<Output = ()>> { - let url = select_cdn_url(&version.urls) - .ok_or(Error::NoUrlFound)? - .to_owned(); +type Result<T> = std::result::Result<T, Error>; - let download_dir = mullvad_paths::cache_dir()?.join("mullvad-update"); - fs::create_dir_all(&download_dir) - .await - .map_err(Error::CreateDownloadDir)?; +#[derive(Debug)] +pub struct DownloaderHandle { + /// Handle to the downloader task + task: tokio::task::JoinHandle<std::result::Result<PathBuf, Error>>, + /// Handle to send `AppUpgradeEvent::Aborted` when the downloader is dropped + dropped_tx: Option<broadcast::Sender<AppUpgradeEvent>>, +} - let params = AppDownloaderParameters { - app_version: version.version, - app_url: url.clone(), - app_size: version.size, - app_progress: ProgressUpdater::new(server_from_url(&url), event_tx.clone()), - app_sha256: version.sha256, - cache_dir: download_dir, - }; - let mut downloader = HttpAppDownloader::from(params); +impl Drop for DownloaderHandle { + fn drop(&mut self) { + self.task.abort(); + if let Some(dropped_tx) = self.dropped_tx.take() { + // If the downloader is dropped, send an event to notify that it was aborted + let _ = dropped_tx.send(AppUpgradeEvent::Aborted); + } + } +} - Ok(async move { - if let Err(_error) = downloader.download_executable().await { - let _ = event_tx.unbounded_send(UpdateEvent::DownloadFailed); - return; - } +impl DownloaderHandle { + /// Wait for the downloader to finish + pub async fn wait(&mut self) -> Result<PathBuf> { + let path = (&mut self.task).await?; + self.dropped_tx = None; // Prevent sending the aborted event after successful download + path + } +} - let _ = event_tx.unbounded_send(UpdateEvent::Verifying); +pub fn spawn_downloader( + version: mullvad_update::version::Version, + event_tx: broadcast::Sender<AppUpgradeEvent>, +) -> DownloaderHandle { + DownloaderHandle { + task: tokio::spawn(start(version, event_tx.clone())), + dropped_tx: Some(event_tx), + } +} - if let Err(_error) = downloader.verify().await { - let _ = event_tx.unbounded_send(UpdateEvent::VerificationFailed); - return; - } +/// Begin or resume download of `version` +async fn start( + version: mullvad_update::version::Version, + event_tx: broadcast::Sender<AppUpgradeEvent>, +) -> Result<PathBuf> { + let url = select_cdn_url(&version.urls) + .ok_or(Error::NoUrlFound)? + .to_owned(); - let _ = event_tx.unbounded_send(UpdateEvent::Verified { - verified_installer_path: downloader.bin_path(), - }); - }) - } + log::info!("Downloading app version '{}' from {url}", version.version); + + let download_dir = mullvad_paths::cache_dir()?.join("mullvad-update"); + log::trace!("Download directory: {download_dir:?}"); + fs::create_dir_all(&download_dir) + .await + .map_err(Error::CreateDownloadDir)?; + + let params = AppDownloaderParameters { + app_version: version.version, + app_url: url.clone(), + app_size: version.size, + app_progress: ProgressUpdater::new(server_from_url(&url), event_tx.clone()), + app_sha256: version.sha256, + cache_dir: download_dir, + }; + let mut downloader = HttpAppDownloader::from(params); + + if let Err(download_err) = downloader.download_executable().await { + log::error!("Failed to download app: {download_err}"); + let _ = event_tx.send(AppUpgradeEvent::Error(AppUpgradeError::DownloadFailed)); + return Err(download_err.into()); + }; + + let _ = event_tx.send(AppUpgradeEvent::VerifyingInstaller); + + if let Err(verify_err) = downloader.verify().await { + log::error!("Failed to verify downloaded app: {verify_err}"); + let _ = event_tx.send(AppUpgradeEvent::Error(AppUpgradeError::VerificationFailed)); + return Err(verify_err.into()); + }; + + let _ = event_tx.send(AppUpgradeEvent::VerifiedInstaller); + Ok(downloader.bin_path()) } struct ProgressUpdater { server: String, - event_tx: mpsc::UnboundedSender<UpdateEvent>, + event_tx: broadcast::Sender<AppUpgradeEvent>, complete_frac: f32, + start_time: Instant, + complete_frac_at_start: Option<f32>, } impl ProgressUpdater { - fn new(server: String, event_tx: mpsc::UnboundedSender<UpdateEvent>) -> Self { + fn new(server: String, event_tx: broadcast::Sender<AppUpgradeEvent>) -> Self { Self { server, event_tx, complete_frac: 0., + start_time: Instant::now(), + complete_frac_at_start: None, } } } @@ -115,29 +141,52 @@ impl mullvad_update::fetch::ProgressUpdater for ProgressUpdater { if (self.complete_frac - fraction_complete).abs() < 0.01 { return; } + let complete_frac_at_start = self.complete_frac_at_start.get_or_insert(fraction_complete); self.complete_frac = fraction_complete; - let _ = self.event_tx.unbounded_send(UpdateEvent::Downloading { - server: self.server.clone(), - complete_frac: fraction_complete, - // TODO: estimate time left based on how much was downloaded (maybe in last n seconds) - time_left: Duration::ZERO, - }); + let _ = self.event_tx.send(AppUpgradeEvent::DownloadProgress( + AppUpgradeDownloadProgress { + server: self.server.clone(), + progress: (fraction_complete * 100.0) as u32, + time_left: estimate_time_left( + self.start_time, + fraction_complete, + *complete_frac_at_start, + ), + }, + )); } fn clear_progress(&mut self) { self.complete_frac = 0.; - let _ = self.event_tx.unbounded_send(UpdateEvent::Downloading { - server: self.server.clone(), - complete_frac: 0., - // TODO: Check if this is reasonable - time_left: Duration::ZERO, - }); + let _ = self.event_tx.send(AppUpgradeEvent::DownloadProgress( + AppUpgradeDownloadProgress { + server: self.server.clone(), + progress: 0, + time_left: None, + }, + )); } } +fn estimate_time_left( + start_time: Instant, + fraction_complete: f32, + complete_frac_at_start: f32, +) -> Option<Duration> { + let completed_frac_since_start = fraction_complete - complete_frac_at_start; + // Don't estimate time left if the progress is less than 1%, to avoid division numerical instability + if completed_frac_since_start <= 0.01 { + return None; + } + let remaining_frac = 1.0 - fraction_complete; + + let elapsed = start_time.elapsed(); + Some(elapsed.mul_f32(remaining_frac / completed_frac_since_start)) +} + /// Select a mirror to download from /// Currently, the selection is random fn select_cdn_url(urls: &[String]) -> Option<&str> { diff --git a/mullvad-daemon/src/version/mod.rs b/mullvad-daemon/src/version/mod.rs index 7c769ceb99..e83082ae64 100644 --- a/mullvad-daemon/src/version/mod.rs +++ b/mullvad-daemon/src/version/mod.rs @@ -38,9 +38,6 @@ pub enum Error { #[error("Version cache update was aborted")] UpdateAborted, - - #[cfg(update)] - Update(#[transparent] downloader::Error), } /// Contains the date of the git commit this was built from diff --git a/mullvad-daemon/src/version/router.rs b/mullvad-daemon/src/version/router.rs index 7e62c0e2fb..6e9eec9f45 100644 --- a/mullvad-daemon/src/version/router.rs +++ b/mullvad-daemon/src/version/router.rs @@ -11,6 +11,7 @@ use mullvad_types::version::{AppVersionInfo, SuggestedUpgrade}; use mullvad_update::version::VersionInfo; use talpid_core::mpsc::Sender; +use crate::management_interface::AppUpgradeBroadcast; use crate::DaemonEventSender; use super::{ @@ -20,9 +21,6 @@ use super::{ #[cfg(update)] use super::downloader; -#[cfg(update)] -use mullvad_types::version::AppUpgradeEvent; -#[cfg(update)] use std::mem; type Result<T> = std::result::Result<T, Error>; @@ -41,7 +39,7 @@ impl VersionRouterHandle { result_rx.await.map_err(|_| Error::VersionRouterClosed) } - pub async fn get_latest_version(&self) -> Result<mullvad_types::version::AppVersionInfo> { + pub async fn get_latest_version(&self) -> Result<AppVersionInfo> { let (result_tx, result_rx) = oneshot::channel(); self.tx .send(Message::GetLatestVersion(result_tx)) @@ -66,17 +64,6 @@ impl VersionRouterHandle { .map_err(|_| Error::VersionRouterClosed)?; result_rx.await.map_err(|_| Error::VersionRouterClosed) } - - #[cfg(update)] - pub fn new_upgrade_event_listener( - &self, - ) -> Result<mpsc::UnboundedReceiver<mullvad_types::version::AppUpgradeEvent>> { - let (event_tx, event_rx) = mpsc::unbounded(); - self.tx - .send(Message::NewUpgradeEventListener { event_tx }) - .map_err(|_| Error::VersionRouterClosed)?; - Ok(event_rx) - } } /// Router of version updates and update requests. @@ -87,9 +74,9 @@ impl VersionRouterHandle { /// in case new version info is received while the update is in progress. pub struct VersionRouter { rx: mpsc::UnboundedReceiver<Message>, - state: RoutingState, + state: State, beta_program: bool, - version_event_sender: DaemonEventSender<mullvad_types::version::AppVersionInfo>, + version_event_sender: DaemonEventSender<AppVersionInfo>, /// Version updater version_check: check::VersionUpdaterHandle, /// Channel used to receive updates from `version_check` @@ -97,31 +84,11 @@ pub struct VersionRouter { /// Future that resolves when `get_latest_version` resolves version_request: Fuse<Pin<Box<dyn Future<Output = Result<VersionCache>> + Send>>>, /// Channels that receive responses to `get_latest_version` - version_request_channels: Vec<oneshot::Sender<Result<mullvad_types::version::AppVersionInfo>>>, - #[cfg(update)] - update: Update, -} - -#[cfg(update)] -struct Update { - /// Channel used to send upgrade events from [downloader::Downloader] - update_event_tx: mpsc::UnboundedSender<downloader::UpdateEvent>, - /// Channel used to receive upgrade events from [downloader::Downloader] - update_event_rx: mpsc::UnboundedReceiver<downloader::UpdateEvent>, - /// Clients that will also receive events - upgrade_listeners: Vec<mpsc::UnboundedSender<AppUpgradeEvent>>, -} + version_request_channels: Vec<oneshot::Sender<Result<AppVersionInfo>>>, -#[cfg(update)] -impl Update { - fn new() -> Self { - let (update_event_tx, update_event_rx) = mpsc::unbounded(); - Self { - update_event_tx, - update_event_rx, - upgrade_listeners: Vec::default(), - } - } + /// Broadcast channel for app upgrade events + #[cfg(update)] + app_upgrade_broadcast: AppUpgradeBroadcast, } enum Message { @@ -131,56 +98,93 @@ enum Message { result_tx: oneshot::Sender<()>, }, /// Check for updates - GetLatestVersion(oneshot::Sender<Result<mullvad_types::version::AppVersionInfo>>), + GetLatestVersion(oneshot::Sender<Result<AppVersionInfo>>), /// Update the application #[cfg(update)] UpdateApplication { result_tx: oneshot::Sender<()> }, /// Cancel the ongoing update #[cfg(update)] CancelUpdate { result_tx: oneshot::Sender<()> }, - /// Listen for events - #[cfg(update)] - NewUpgradeEventListener { - /// Channel for receiving update events - event_tx: mpsc::UnboundedSender<AppUpgradeEvent>, - }, } #[derive(Debug)] -enum RoutingState { +enum State { /// There is no version available yet NoVersion, /// Running version checker, no upgrade in progress - HasVersion { version_info: VersionCache }, + HasVersion { version_cache: VersionCache }, /// Download is in progress, so we don't forward version checks + #[cfg(update)] Downloading { /// Version info received from `HasVersion` - version_info: VersionCache, - /// The version being upgraded to (derived from `suggested_upgrade`). - /// Should be one of the versions in `version_info`. + version_cache: VersionCache, + /// The version being upgraded to, derived from `version_info` and beta program state upgrading_to_version: mullvad_update::version::Version, - /// Version check update received while paused - /// When transitioning out of `Upgrading`, this will cause `version_info` to be updated - new_version: Option<VersionCache>, /// Tokio task for the downloader handle - downloader_handle: tokio::task::JoinHandle<()>, + downloader_handle: downloader::DownloaderHandle, }, /// Download is complete. We have a verified binary + #[cfg(update)] Downloaded { /// Version info received from `HasVersion` - version_info: VersionCache, + version_cache: VersionCache, /// Path to verified installer verified_installer_path: PathBuf, }, } +impl std::fmt::Display for State { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + State::NoVersion => write!(f, "NoVersion"), + State::HasVersion { .. } => write!(f, "HasVersion"), + #[cfg(update)] + State::Downloading { + upgrading_to_version, + .. + } => write!(f, "Downloading '{}'", upgrading_to_version.version), + #[cfg(update)] + State::Downloaded { + verified_installer_path, + .. + } => write!(f, "Downloaded '{}'", verified_installer_path.display()), + } + } +} + +impl State { + fn get_version_cache(&self) -> Option<&VersionCache> { + match self { + State::NoVersion => None, + State::HasVersion { version_cache, .. } => Some(version_cache), + #[cfg(update)] + State::Downloading { version_cache, .. } | State::Downloaded { version_cache, .. } => { + Some(version_cache) + } + } + } + + fn get_verified_installer_path(&self) -> Option<&PathBuf> { + match self { + #[cfg(update)] + State::Downloaded { + verified_installer_path, + .. + } => Some(verified_installer_path), + _ => None, + } + } +} + impl VersionRouter { + #[cfg_attr(not(update), allow(unused_variables))] pub(crate) fn spawn( api_handle: MullvadRestHandle, availability_handle: ApiAvailability, cache_dir: PathBuf, - version_event_sender: DaemonEventSender<mullvad_types::version::AppVersionInfo>, + version_event_sender: DaemonEventSender<AppVersionInfo>, beta_program: bool, + app_upgrade_broadcast: AppUpgradeBroadcast, ) -> VersionRouterHandle { let (tx, rx) = mpsc::unbounded(); @@ -190,10 +194,9 @@ impl VersionRouter { VersionUpdater::spawn(api_handle, availability_handle, cache_dir, new_version_tx) .await; - // TODO: tokio::join! here? Self { rx, - state: RoutingState::NoVersion, + state: State::NoVersion, beta_program, version_check, version_event_sender, @@ -201,7 +204,7 @@ impl VersionRouter { version_request: Fuse::terminated(), version_request_channels: vec![], #[cfg(update)] - update: Update::new(), + app_upgrade_broadcast, } .run() .await; @@ -210,22 +213,6 @@ impl VersionRouter { } async fn run(mut self) { - // HACK: We can (should) only handle update events on some targets. - // Trying to cfg a branch in `tokio::select!` will not work, so creating - // a closure for conditionally responding to upgrade events will have to do. - #[cfg(update)] - let handle_update_event = || async { - // Received upgrade event from `downloader` - if let Some(update_event) = self.update.update_event_rx.next().await { - self.handle_update_event(update_event); - }; - }; - - #[cfg(not(update))] - let handle_update_event = || async { - let () = std::future::pending().await; - }; - loop { tokio::select! { // Respond to version info requests @@ -247,9 +234,13 @@ impl VersionRouter { Some(new_version) = self.new_version_rx.next() => { self.on_new_version(new_version); } - // Received & handled update event from `downloader` - () = handle_update_event() => { }, - Some(message) = self.rx.next() => self.handle_message(message).await, + res = wait_for_update(&mut self.state) => { + // If the download was successful, we send the new version + if let Some(app_update_info) = res { + let _ = self.version_event_sender.send(app_update_info); + } + }, + Some(message) = self.rx.next() => self.handle_message(message), else => break, } } @@ -257,8 +248,7 @@ impl VersionRouter { } /// Handle [Message] sent by user - #[cfg_attr(not(update), allow(clippy::unused_async))] - async fn handle_message(&mut self, message: Message) { + fn handle_message(&mut self, message: Message) { match message { Message::SetBetaProgram { state, result_tx } => { self.set_beta_program(state); @@ -271,298 +261,294 @@ impl VersionRouter { } #[cfg(update)] Message::UpdateApplication { result_tx } => { - self.update_application().await; + self.update_application(); let _ = result_tx.send(()); } #[cfg(update)] Message::CancelUpdate { result_tx } => { - self.cancel_upgrade().await; + self.cancel_upgrade(); let _ = result_tx.send(()); } + } + } + + /// Handle new version info + /// + /// If the router is in the process of upgrading, it will not propagate versions, but only + /// remember it for when it transitions back into the "idle" (version check) state. + fn on_new_version(&mut self, version_cache: VersionCache) { + match &mut self.state { + State::NoVersion => { + // Receive first version + let app_version_info = to_app_version_info(&version_cache, self.beta_program, None); + let _ = self.version_event_sender.send(app_version_info.clone()); + self.state = State::HasVersion { version_cache }; + } + // Already have version info, just update it + State::HasVersion { + version_cache: prev_cache, + } => { + if let Some(version_info) = updated_app_version_info_on_new_version_cache( + prev_cache, + &version_cache, + self.beta_program, + ) { + // New version available + let _ = self.version_event_sender.send(version_info.clone()); + } + self.state = State::HasVersion { version_cache }; + } #[cfg(update)] - Message::NewUpgradeEventListener { - event_tx: result_tx, + State::Downloaded { + version_cache: ref mut prev_cache, + .. + } + | State::Downloading { + version_cache: ref mut prev_cache, + .. } => { - self.update.upgrade_listeners.push(result_tx); + // If version changed, cancel download + if let Some(version_info) = updated_app_version_info_on_new_version_cache( + prev_cache, + &version_cache, + self.beta_program, + ) { + log::warn!("Received new version while upgrading: {version_info:?}, aborting"); + + let _ = self.version_event_sender.send(version_info.clone()); + self.state = State::HasVersion { version_cache }; + } else { + *prev_cache = version_cache; + } } } + + // Notify version requesters + if let Some(cache) = self.state.get_version_cache() { + self.notify_version_requesters(to_app_version_info( + cache, + self.beta_program, + self.state.get_verified_installer_path().cloned(), + )); + } + } + + fn notify_version_requesters(&mut self, new_app_version_info: AppVersionInfo) { + // Cancel update notifications + self.version_request = Fuse::terminated(); + // Notify all requesters + for tx in self.version_request_channels.drain(..) { + let _ = tx.send(Ok(new_app_version_info.clone())); + } } fn set_beta_program(&mut self, new_state: bool) { - let prev_state = self.beta_program; - if new_state == prev_state { + if new_state == self.beta_program { return; } + let previous_state = self.beta_program; self.beta_program = new_state; + let Some(new_app_version_info) = self.state.get_version_cache().and_then(|version_cache| { + updated_app_version_info_on_new_beta(version_cache, previous_state, new_state) + }) else { + return; + }; - match &self.state { - // Emit version event if suggested upgrade changes - RoutingState::HasVersion { version_info } - | RoutingState::Downloaded { version_info, .. } => { - let prev_app_version_info = to_app_version_info(version_info, prev_state); - let new_app_version_info = to_app_version_info(version_info, new_state); + // Always cancel download if the suggested upgrade changes - if new_app_version_info != prev_app_version_info { - let _ = self.version_event_sender.send(new_app_version_info); + let version_cache = match mem::replace(&mut self.state, State::NoVersion) { + #[cfg(update)] + State::Downloaded { version_cache, .. } | State::Downloading { version_cache, .. } => { + log::warn!("Switching beta after while updating resulted in new suggested upgrade: {:?}, aborting", new_app_version_info.suggested_upgrade); + version_cache + } + State::HasVersion { version_cache } => version_cache, + State::NoVersion => { + unreachable!("Can't get recommended upgrade on beta change without version") + } + }; - // Note: If we're in the `Downloaded` state, this resets the state to `HasVersion` - self.state = RoutingState::HasVersion { - version_info: version_info.clone(), - }; + self.state = State::HasVersion { version_cache }; + let _ = self.version_event_sender.send(new_app_version_info.clone()); - self.notify_version_requesters(); - } - } - // If there's no version or upgrading, do nothing - RoutingState::NoVersion | RoutingState::Downloading { .. } => (), - } + self.notify_version_requesters(new_app_version_info); } fn get_latest_version( &mut self, result_tx: oneshot::Sender<std::result::Result<AppVersionInfo, Error>>, ) { - match &self.state { - // When not upgrading, potentially fetch new version info, and append `result_tx` to - // list of channels to notify. - // We don't wait on `get_version_info` so that we don't block user commands. - RoutingState::NoVersion - | RoutingState::HasVersion { .. } - | RoutingState::Downloaded { .. } => { - // Start a version request unless already in progress - if self.version_request.is_terminated() { - let check = self.version_check.clone(); - let check_fut: Pin<Box<dyn Future<Output = Result<VersionCache>> + Send>> = - Box::pin(async move { check.get_version_info().await }); - self.version_request = check_fut.fuse(); - } - // Append to response channels - self.version_request_channels.push(result_tx); - } - // During upgrades, just pass on the last known version - RoutingState::Downloading { - version_info, - upgrading_to_version, - new_version: _, - downloader_handle: _, - } => { - let suggested_upgrade = suggested_upgrade_for_version(upgrading_to_version); - let info = AppVersionInfo { - current_version_supported: version_info.current_version_supported, - suggested_upgrade: Some(suggested_upgrade), - }; - let _ = result_tx.send(Ok(info)); - } + // Start a version request unless already in progress + match self + .refresh_version_check_tx + .unbounded_send(()) + .map_err(|_e| Error::VersionRouterClosed) + { + // Append to response channels + Ok(()) => self.version_request_channels.push(result_tx), + Err(err) => result_tx + .send(Err(err)) + .unwrap_or_else(|e| log::warn!("Failed to send version request result: {e:?}")), } + // Append to response channels + self.version_request_channels.push(result_tx); } #[cfg(update)] - async fn update_application(&mut self) { - match mem::replace(&mut self.state, RoutingState::NoVersion) { - // Checking state: start upgrade, if upgrade is available - RoutingState::HasVersion { version_info } => { - let Some(suggested_upgrade) = - suggested_upgrade(&version_info.latest_version, self.beta_program) + fn update_application(&mut self) { + use crate::version::downloader::spawn_downloader; + + match mem::replace(&mut self.state, State::NoVersion) { + // If we're already downloading or have a version, do nothing + State::Downloaded { version_cache, .. } | State::HasVersion { version_cache } => { + let Some(upgrading_to_version) = + recommended_version_upgrade(&version_cache.latest_version, self.beta_program) else { // If there's no suggested upgrade, do nothing - log::trace!("Received update request without suggested upgrade"); - self.state = RoutingState::HasVersion { version_info }; + log::debug!("Received update request without suggested upgrade"); + self.state = State::HasVersion { version_cache }; return; }; + log::info!( + "Starting upgrade to version {}", + upgrading_to_version.version + ); - let downloader_handle = tokio::spawn( - downloader::Downloader::start( - suggested_upgrade.clone(), - self.update_event_tx.clone(), - ) - .await - .expect("TODO: handle err"), + let downloader_handle = spawn_downloader( + upgrading_to_version.clone(), + self.app_upgrade_broadcast.clone(), ); - log::debug!("Starting upgrade"); - self.state = RoutingState::Downloading { - version_info, - upgrading_to_version: suggested_upgrade, - new_version: None, + self.state = State::Downloading { + version_cache, + upgrading_to_version, downloader_handle, }; - - // Notify callers of `get_latest_version`: cancel the version check and - // advertise the last known version as latest - self.notify_version_requesters(); } // Already downloading/downloaded or there is no version: do nothing state => { + log::debug!("Ignoring update request while in state {:?}", state); self.state = state; } } } #[cfg(update)] - async fn cancel_upgrade(&mut self) { - match mem::replace(&mut self.state, RoutingState::NoVersion) { + fn cancel_upgrade(&mut self) { + match mem::replace(&mut self.state, State::NoVersion) { // If we're upgrading, emit an event if a version was received during the upgrade // Otherwise, just reset upgrade info to last known state - RoutingState::Downloading { - version_info, - upgrading_to_version: _, - new_version, - downloader_handle, - } => { - // Abort download - downloader_handle.abort(); - let _ = downloader_handle.await; - - // Reset app version info to last known state - self.state = RoutingState::HasVersion { version_info }; - - // If we also received an upgrade, emit new version event - if let Some(version) = new_version { - let app_version = to_app_version_info(&version, self.beta_program); - let _ = self.version_event_sender.send(app_version); - } + State::Downloaded { version_cache, .. } | State::Downloading { version_cache, .. } => { + self.state = State::HasVersion { version_cache }; } // No-op unless we're downloading something right now // In the `Downloaded` state, we also do nothing state => self.state = state, }; + debug_assert!(!matches!( + self.state, + State::Downloading { .. } | State::NoVersion + )); } +} - /// Handle new version info - /// - /// If the router is in the process of upgrading, it will not propagate versions, but only - /// remember it for when it transitions back into the "idle" (version check) state. - fn on_new_version(&mut self, version: VersionCache) { - match &mut self.state { - // Set app version info - RoutingState::NoVersion => { - self.state = RoutingState::HasVersion { - version_info: version.clone(), - }; +fn updated_app_version_info_on_new_version_cache( + version_cache: &VersionCache, + new_version_cache: &VersionCache, + beta_program: bool, +) -> Option<AppVersionInfo> { + let prev_app_version = to_app_version_info(version_cache, beta_program, None); + let new_app_version = to_app_version_info(new_version_cache, beta_program, None); - // Initial version is propagated - let app_version_info = to_app_version_info(&version, self.beta_program); - let _ = self.version_event_sender.send(app_version_info); - } - // Update app version info - RoutingState::HasVersion { - version_info: prev_version, - .. - } - | RoutingState::Downloaded { - version_info: prev_version, - .. - } => { - // If the version changed, notify channel - // Note: Must account for beta program state - let prev_app_version = to_app_version_info(prev_version, self.beta_program); - let new_app_version = to_app_version_info(&version, self.beta_program); - if new_app_version != prev_app_version { - let _ = self.version_event_sender.send(new_app_version); - } + // Update version info + if new_app_version != prev_app_version { + Some(new_app_version) + } else { + None + } +} - // Note: If we're in the `Downloaded` state, this resets the state to `HasVersion` - if prev_version != &version { - self.state = RoutingState::HasVersion { - version_info: version, - }; - } - } - // If we're upgrading, remember the new version, but don't send any notification - RoutingState::Downloading { - ref mut new_version, - .. - } => { - *new_version = Some(version); - } - } +fn updated_app_version_info_on_new_beta( + version_cache: &VersionCache, + previous_beta_state: bool, + new_beta_state: bool, +) -> Option<AppVersionInfo> { + let prev_app_version = to_app_version_info(version_cache, previous_beta_state, None); + let new_app_version = to_app_version_info(version_cache, new_beta_state, None); - // Notify callers of `get_latest_version` - self.notify_version_requesters(); + // Update version info + if new_app_version != prev_app_version { + Some(new_app_version) + } else { + None } +} +/// Wait for the update to finish. In case no update is in progress (or the platform does not +/// support in-app upgrades), then the future will never resolve as to not escape the select statement. +#[allow(clippy::unused_async, unused_variables)] +async fn wait_for_update(state: &mut State) -> Option<AppVersionInfo> { #[cfg(update)] - fn handle_update_event(&mut self, event: downloader::UpdateEvent) { - debug_assert!( - matches!(self.state, RoutingState::Downloading { .. }), - "unexpected routing state: {:?}", - self.state - ); - - use downloader::UpdateEvent; - - match event { - UpdateEvent::Downloading { - server, - complete_frac: f32, - time_left, - } => { - // TODO: emit version event to clients - } - UpdateEvent::DownloadFailed => { - // TODO: transition to HasVersion state - // TODO: emit version event to clients - } - UpdateEvent::Verifying => { - // TODO: emit version event to clients - } - UpdateEvent::VerificationFailed => { - // TODO: transition to HasVersion state - // TODO: emit version event to clients + match state { + State::Downloading { + version_cache, + ref mut downloader_handle, + upgrading_to_version, + .. + } => match downloader_handle.wait().await { + Ok(verified_installer_path) => { + let app_update_info = AppVersionInfo { + current_version_supported: version_cache.current_version_supported, + suggested_upgrade: Some({ + SuggestedUpgrade { + version: upgrading_to_version.version.clone(), + changelog: upgrading_to_version.changelog.clone(), + verified_installer_path: Some(verified_installer_path.clone()), + } + }), + }; + *state = State::Downloaded { + version_cache: version_cache.clone(), + verified_installer_path, + }; + Some(app_update_info) } - UpdateEvent::Verified { - verified_installer_path, - } => { - // TODO: transition to Downloaded state - // TODO: emit version event to clients + Err(err) => { + log::trace!("Downloader task ended: {err}"); + *state = State::HasVersion { + version_cache: version_cache.clone(), + }; + None } + }, + _ => { + let () = std::future::pending().await; + unreachable!() } } - - /// Notify clients requesting a version - fn notify_version_requesters(&mut self) { - // Cancel update notifications - self.version_request = Fuse::terminated(); - - let version_info = match &self.state { - RoutingState::NoVersion => { - log::error!("Dropping version request channels since there's no version"); - self.version_request_channels.clear(); - return; - } - // Update app version info - RoutingState::HasVersion { version_info } - | RoutingState::Downloaded { version_info, .. } => { - to_app_version_info(version_info, self.beta_program) - } - // If we're upgrading, emit the version we're currently upgrading to - RoutingState::Downloading { - version_info, - upgrading_to_version, - .. - } => { - let suggested_upgrade = suggested_upgrade_for_version(upgrading_to_version); - AppVersionInfo { - current_version_supported: version_info.current_version_supported, - suggested_upgrade: Some(suggested_upgrade), - } - } - }; - - // Notify all requesters - for tx in self.version_request_channels.drain(..) { - let _ = tx.send(Ok(version_info.clone())); - } + #[cfg(not(update))] + { + let () = std::future::pending().await; + unreachable!() } } /// Extract [`AppVersionInfo`], containing upgrade version and `current_version_supported` /// from [VersionCache] and beta program state. -fn to_app_version_info(cache: &VersionCache, beta_program: bool) -> AppVersionInfo { +fn to_app_version_info( + cache: &VersionCache, + beta_program: bool, + verified_installer_path: Option<PathBuf>, +) -> AppVersionInfo { let current_version_supported = cache.current_version_supported; - let suggested_upgrade = suggested_upgrade(&cache.latest_version, beta_program) - .as_ref() - .map(suggested_upgrade_for_version); + let suggested_upgrade = + recommended_version_upgrade(&cache.latest_version, beta_program).map(|version| { + SuggestedUpgrade { + version: version.version, + changelog: version.changelog, + verified_installer_path, + } + }); AppVersionInfo { current_version_supported, suggested_upgrade, @@ -570,7 +556,7 @@ fn to_app_version_info(cache: &VersionCache, beta_program: bool) -> AppVersionIn } /// Extract upgrade version from [VersionCache] based on `beta_program` -fn suggested_upgrade( +fn recommended_version_upgrade( version_info: &VersionInfo, beta_program: bool, ) -> Option<mullvad_update::version::Version> { @@ -588,15 +574,3 @@ fn suggested_upgrade( None } } - -/// Convert [mullvad_update::version::Version] to [SuggestedUpgrade] -fn suggested_upgrade_for_version( - version_details: &mullvad_update::version::Version, -) -> SuggestedUpgrade { - SuggestedUpgrade { - version: version_details.version.clone(), - changelog: Some(version_details.changelog.clone()), - // TODO: This should return the downloaded & verified path, if it exists - verified_installer_path: None, - } -} diff --git a/mullvad-management-interface/proto/management_interface.proto b/mullvad-management-interface/proto/management_interface.proto index 4793007fb0..a7e5e7a18c 100644 --- a/mullvad-management-interface/proto/management_interface.proto +++ b/mullvad-management-interface/proto/management_interface.proto @@ -153,7 +153,7 @@ message AppUpgradeDownloadStarting {} message AppUpgradeDownloadProgress { string server = 1; uint32 progress = 2; - google.protobuf.Duration time_left = 3; + optional google.protobuf.Duration time_left = 3; } message AppUpgradeAborted {} message AppUpgradeVerifyingInstaller {} @@ -665,7 +665,7 @@ message ExcludedProcessList { repeated ExcludedProcess processes = 1; } message SuggestedUpgrade { string version = 1; - optional string changelog = 2; + string changelog = 2; optional string verified_installer_path = 3; } diff --git a/mullvad-management-interface/src/types/conversions/version.rs b/mullvad-management-interface/src/types/conversions/version.rs index 3c66941f3e..381dbbc0df 100644 --- a/mullvad-management-interface/src/types/conversions/version.rs +++ b/mullvad-management-interface/src/types/conversions/version.rs @@ -121,12 +121,15 @@ impl TryFrom<proto::AppUpgradeEvent> for AppUpgradeEvent { impl From<AppUpgradeDownloadProgress> for proto::AppUpgradeDownloadProgress { fn from(value: AppUpgradeDownloadProgress) -> Self { // From the docs: Converts a std::time::Duration to a Duration, failing if the duration is too large. - let time_left = prost_types::Duration::try_from(value.time_left) - .expect("Failed to convert duration to protobuf"); + let time_left = value + .time_left + .map(prost_types::Duration::try_from) + .transpose() + .expect("Failed to convert duration to protobuf, duration is too large"); proto::AppUpgradeDownloadProgress { server: value.server, progress: value.progress, - time_left: Some(time_left), + time_left, } } } @@ -135,14 +138,13 @@ impl TryFrom<proto::AppUpgradeDownloadProgress> for AppUpgradeDownloadProgress { type Error = FromProtobufTypeError; fn try_from(value: proto::AppUpgradeDownloadProgress) -> Result<Self, Self::Error> { - let Some(time_left) = value.time_left else { - return Err(FromProtobufTypeError::InvalidArgument( - "Non-existent AppUpgradeDownloadProgress::time_left", - )); - }; // From the docs: Converts a Duration to a std::time::Duration, failing if the duration is negative. - let time_left = std::time::Duration::try_from(time_left) + let time_left = value + .time_left + .map(std::time::Duration::try_from) + .transpose() .expect("Failed to convert duration to std::time::Duration"); + let progress = AppUpgradeDownloadProgress { server: value.server, progress: value.progress, diff --git a/mullvad-paths/src/cache.rs b/mullvad-paths/src/cache.rs index 3b31a9460b..26f36a4629 100644 --- a/mullvad-paths/src/cache.rs +++ b/mullvad-paths/src/cache.rs @@ -4,10 +4,11 @@ use std::{env, path::PathBuf}; /// Creates and returns the cache directory pointed to by `MULLVAD_CACHE_DIR`, or the default /// one if that variable is unset. pub fn cache_dir() -> Result<PathBuf> { - #[cfg(unix)] - let permissions = crate::unix::Permissions::ReadExecOnly; - #[cfg(target_os = "windows")] - let permissions = true; + let permissions = Some(crate::UserPermissions { + read: true, + write: false, + execute: true, + }); crate::create_dir(get_cache_dir()?, permissions) } diff --git a/mullvad-paths/src/lib.rs b/mullvad-paths/src/lib.rs index c5eb5b9a52..508a3e8956 100644 --- a/mullvad-paths/src/lib.rs +++ b/mullvad-paths/src/lib.rs @@ -46,6 +46,23 @@ pub enum Error { NoDataDir, } +#[derive(Clone, Copy)] +pub struct UserPermissions { + pub read: bool, + pub write: bool, + pub execute: bool, +} + +impl UserPermissions { + pub fn read_only() -> Self { + UserPermissions { + read: true, + write: false, + execute: false, + } + } +} + #[cfg(unix)] use unix::create_dir; diff --git a/mullvad-paths/src/logs.rs b/mullvad-paths/src/logs.rs index 03ab990632..5a1fb2034d 100644 --- a/mullvad-paths/src/logs.rs +++ b/mullvad-paths/src/logs.rs @@ -4,14 +4,14 @@ use std::{env, path::PathBuf}; /// Creates and returns the logging directory pointed to by `MULLVAD_LOG_DIR`, or the default /// one if that variable is unset. pub fn log_dir() -> Result<PathBuf> { - #[cfg(unix)] - { - crate::create_dir(get_log_dir()?, crate::unix::Permissions::ReadExecOnly) - } - #[cfg(target_os = "windows")] - { - crate::create_dir(get_log_dir()?, true) - } + let permissions = Some(crate::UserPermissions { + read: true, + write: false, + // Unix: Make directory contents readable + execute: cfg!(unix), + }); + + crate::create_dir(get_log_dir()?, permissions) } /// Get the logging directory, but don't try to create it. diff --git a/mullvad-paths/src/settings.rs b/mullvad-paths/src/settings.rs index 016e158bee..3852c56ddd 100644 --- a/mullvad-paths/src/settings.rs +++ b/mullvad-paths/src/settings.rs @@ -4,15 +4,7 @@ use std::{env, path::PathBuf}; /// Creates and returns the settings directory pointed to by `MULLVAD_SETTINGS_DIR`, or the default /// one if that variable is unset. pub fn settings_dir() -> Result<PathBuf> { - #[cfg(unix)] - { - crate::create_dir(get_settings_dir()?, crate::unix::Permissions::Any) - } - - #[cfg(target_os = "windows")] - { - crate::create_dir(get_settings_dir()?, false) - } + crate::create_dir(get_settings_dir()?, None) } fn get_settings_dir() -> Result<PathBuf> { diff --git a/mullvad-paths/src/unix.rs b/mullvad-paths/src/unix.rs index 90a5d2fc5c..5d5b3b03a1 100644 --- a/mullvad-paths/src/unix.rs +++ b/mullvad-paths/src/unix.rs @@ -4,35 +4,29 @@ use std::{ path::{Path, PathBuf}, }; -use crate::{Error, Result}; +use crate::{Error, Result, UserPermissions}; pub const PRODUCT_NAME: &str = "mullvad-vpn"; -#[derive(Clone, Copy, PartialEq)] -pub enum Permissions { - /// Do not set any particular permissions. They will be inherited instead. - Any, - /// Only root should have write access. Other users will have - /// read and execute permissions (0o755). - ReadExecOnly, -} +impl UserPermissions { + fn fs_permissions(self) -> fs::Permissions { + const OWNER_BITS: u32 = 0o700; -impl Permissions { - fn fs_permissions(self) -> Option<fs::Permissions> { - match self { - Permissions::Any => None, - Permissions::ReadExecOnly => Some(std::os::unix::fs::PermissionsExt::from_mode(0o755)), - } + let rbits = if self.read { 0o044 } else { 0 }; + let wbits = if self.write { 0o022 } else { 0 }; + let ebits = if self.execute { 0o011 } else { 0 }; + + std::os::unix::fs::PermissionsExt::from_mode(OWNER_BITS | rbits | wbits | ebits) } } /// Create a directory at `dir`, setting the permissions given by `permissions`, unless it exists. /// If the directory already exists, but the permissions are not at least as strict as expected, /// then it will be deleted and recreated. -pub fn create_dir(dir: PathBuf, permissions: Permissions) -> Result<PathBuf> { +pub fn create_dir(dir: PathBuf, permissions: Option<UserPermissions>) -> Result<PathBuf> { let mut dir_builder = fs::DirBuilder::new(); - let fs_perms = permissions.fs_permissions(); - if let Some(fs_perms) = fs_perms.as_ref() { + let fs_perms = permissions.as_ref().map(|perms| perms.fs_permissions()); + if let Some(fs_perms) = &fs_perms { dir_builder.mode(fs_perms.mode()); } match dir_builder.create(&dir) { diff --git a/mullvad-paths/src/windows.rs b/mullvad-paths/src/windows.rs index 102fa53f5f..5bab1c68f3 100644 --- a/mullvad-paths/src/windows.rs +++ b/mullvad-paths/src/windows.rs @@ -1,6 +1,6 @@ #![allow(clippy::undocumented_unsafe_blocks)] // Remove me if you dare. -use crate::{Error, Result}; +use crate::{Error, Result, UserPermissions}; use once_cell::sync::OnceCell; use std::{ ffi::OsStr, @@ -15,7 +15,7 @@ use windows_sys::{ Win32::{ Foundation::{ CloseHandle, LocalFree, ERROR_INSUFFICIENT_BUFFER, ERROR_SUCCESS, GENERIC_ALL, - GENERIC_READ, HANDLE, INVALID_HANDLE_VALUE, LUID, S_OK, + GENERIC_EXECUTE, GENERIC_READ, GENERIC_WRITE, HANDLE, INVALID_HANDLE_VALUE, LUID, S_OK, }, Security::{ self, AdjustTokenPrivileges, @@ -58,9 +58,10 @@ pub fn get_allusersprofile_dir() -> Result<PathBuf> { /// file permissions corresponding to Authenticated Users - Read Only and Administrators - Full /// Access. Only directories that do not already exist and the leaf directory will have their /// permissions set. -pub fn create_dir(path: PathBuf, set_security_permissions: bool) -> Result<PathBuf> { - if set_security_permissions { - create_dir_with_permissions_recursive(&path)?; +#[cfg(windows)] +pub fn create_dir(path: PathBuf, user_permissions: Option<UserPermissions>) -> Result<PathBuf> { + if let Some(user_permissions) = user_permissions { + create_dir_recursive_with_permissions(&path, user_permissions)?; } else { std::fs::create_dir_all(&path).map_err(|e| { Error::CreateDirFailed( @@ -93,12 +94,31 @@ fn get_wide_str<S: AsRef<OsStr>>(string: S) -> Vec<u16> { wide_string } +impl UserPermissions { + fn flags(self) -> u32 { + let mut flags = 0; + if self.read { + flags |= GENERIC_READ; + } + if self.write { + flags |= GENERIC_WRITE; + } + if self.execute { + flags |= GENERIC_EXECUTE; + } + flags + } +} + /// If directory at path already exists, set permissions for it. /// If directory at path don't exist but parent does, create directory and set permissions. /// If parent directory at path does not exist then recurse and create parent directory and set /// permissions for it, then create child directory and set permissions. /// This does not set permissions for parent directories that already exists. -fn create_dir_with_permissions_recursive(path: &Path) -> Result<()> { +fn create_dir_recursive_with_permissions( + path: &Path, + user_permissions: UserPermissions, +) -> Result<()> { // No directory to create if path == Path::new("") { return Ok(()); @@ -106,13 +126,13 @@ fn create_dir_with_permissions_recursive(path: &Path) -> Result<()> { match std::fs::create_dir(path) { Ok(()) => { - return set_security_permissions(path); + return set_security_permissions(path, user_permissions); } // Could not find parent directory, try creating parent Err(e) if e.kind() == io::ErrorKind::NotFound => (), // Directory already exists, set permissions Err(e) if e.kind() == io::ErrorKind::AlreadyExists && path.is_dir() => { - return set_security_permissions(path); + return set_security_permissions(path, user_permissions); } Err(e) => { return Err(Error::CreateDirFailed( @@ -124,7 +144,7 @@ fn create_dir_with_permissions_recursive(path: &Path) -> Result<()> { match path.parent() { // Create parent directory - Some(parent) => create_dir_with_permissions_recursive(parent)?, + Some(parent) => create_dir_recursive_with_permissions(parent, user_permissions)?, None => { // Reached the top of the tree but when creating directories only got NotFound for some // reason @@ -139,19 +159,19 @@ fn create_dir_with_permissions_recursive(path: &Path) -> Result<()> { } std::fs::create_dir(path).map_err(|e| Error::CreateDirFailed(path.display().to_string(), e))?; - set_security_permissions(path) + set_security_permissions(path, user_permissions) } /// Recursively creates directories for the given path with permissions that give full access to /// admins and read only access to authenticated users. If any of the directories already exist this /// will not return an error, instead it will apply the permissions and if successful return Ok(()). pub fn create_privileged_directory(path: &Path) -> Result<()> { - create_dir_with_permissions_recursive(path) + create_dir_recursive_with_permissions(path, UserPermissions::read_only()) } /// Sets security permissions for path such that admin has full ownership and access while /// authenticated users only have read access. -fn set_security_permissions(path: &Path) -> Result<()> { +fn set_security_permissions(path: &Path, user_permissions: UserPermissions) -> Result<()> { let wide_path = get_wide_str(path); let security_information = Security::DACL_SECURITY_INFORMATION | Security::PROTECTED_DACL_SECURITY_INFORMATION @@ -216,7 +236,7 @@ fn set_security_permissions(path: &Path) -> Result<()> { }; let authenticated_users_ea = EXPLICIT_ACCESS_W { - grfAccessPermissions: GENERIC_READ, + grfAccessPermissions: user_permissions.flags(), grfAccessMode: SET_ACCESS, grfInheritance: NO_INHERITANCE | SUB_CONTAINERS_AND_OBJECTS_INHERIT, Trustee: trustee, diff --git a/mullvad-types/src/version.rs b/mullvad-types/src/version.rs index 2ca7218114..e2e73b4ea7 100644 --- a/mullvad-types/src/version.rs +++ b/mullvad-types/src/version.rs @@ -1,4 +1,4 @@ -use std::path::PathBuf; +use std::{fmt::Display, path::PathBuf}; use serde::{Deserialize, Serialize}; @@ -18,12 +18,29 @@ pub struct AppVersionInfo { pub suggested_upgrade: Option<SuggestedUpgrade>, } +impl Display for AppVersionInfo { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + if let Some(suggested_upgrade) = &self.suggested_upgrade { + writeln!(f, "Suggested upgrade: {}", suggested_upgrade.version)?; + if let Some(path) = &suggested_upgrade.verified_installer_path { + writeln!(f, "verified installer path: '{}'", path.display())?; + } + } + if self.current_version_supported { + write!(f, "Current version supported")?; + } else { + write!(f, "Current version not supported")?; + } + Ok(()) + } +} + #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] pub struct SuggestedUpgrade { /// Version available for update pub version: mullvad_version::Version, /// Changelog - pub changelog: Option<String>, + pub changelog: String, /// Path to the available installer, iff it has been verified pub verified_installer_path: Option<PathBuf>, } @@ -32,7 +49,7 @@ pub struct SuggestedUpgrade { pub struct AppUpgradeDownloadProgress { pub server: String, pub progress: u32, - pub time_left: std::time::Duration, + pub time_left: Option<std::time::Duration>, } #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] |
