diff options
| author | David Lönnhager <david.l@mullvad.net> | 2021-11-01 10:55:23 +0100 |
|---|---|---|
| committer | David Lönnhager <david.l@mullvad.net> | 2021-11-01 10:55:23 +0100 |
| commit | 6a6bc26be3d4033be0bb37666a0cda7211dd5aa1 (patch) | |
| tree | d815d64ed044c02c4381d5d65d3a30adbb1cc21d | |
| parent | 8952904a4202a1517d247043ca9a462dc819f1e5 (diff) | |
| parent | 4967100c75d7f14e67a650cee83278afed3dd41a (diff) | |
| download | mullvadvpn-6a6bc26be3d4033be0bb37666a0cda7211dd5aa1.tar.xz mullvadvpn-6a6bc26be3d4033be0bb37666a0cda7211dd5aa1.zip | |
Merge branch 'delay-startup-reqs'
| -rw-r--r-- | CHANGELOG.md | 2 | ||||
| -rw-r--r-- | mullvad-daemon/Cargo.toml | 2 | ||||
| -rw-r--r-- | mullvad-daemon/src/account.rs | 12 | ||||
| -rw-r--r-- | mullvad-daemon/src/lib.rs | 143 | ||||
| -rw-r--r-- | mullvad-daemon/src/relays.rs | 12 | ||||
| -rw-r--r-- | mullvad-daemon/src/version_check.rs | 2 | ||||
| -rw-r--r-- | mullvad-daemon/src/wireguard.rs | 4 | ||||
| -rw-r--r-- | mullvad-problem-report/src/lib.rs | 1 | ||||
| -rw-r--r-- | mullvad-rpc/src/address_cache.rs | 20 | ||||
| -rw-r--r-- | mullvad-rpc/src/availability.rs | 51 | ||||
| -rw-r--r-- | mullvad-rpc/src/lib.rs | 32 | ||||
| -rw-r--r-- | mullvad-rpc/src/rest.rs | 13 | ||||
| -rw-r--r-- | mullvad-setup/src/main.rs | 1 |
13 files changed, 148 insertions, 147 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md index e0a7526ade..29514318ca 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -40,6 +40,8 @@ Line wrap the file at 100 chars. Th ### Fixed - Fix desktop app showing a future date for when WireGuard key was generated. - Fix desktop app split tunneling view to not overflow on very long application names. +- Prevent API requests from being made prior to the tunnel state machine being set up. + Rarely, failed requests could result in a deadlock. #### Windows - Fix detection of Windows 11. Problem reports will now correctly report Windows 11 instead diff --git a/mullvad-daemon/Cargo.toml b/mullvad-daemon/Cargo.toml index 24d21209f3..dc00d46853 100644 --- a/mullvad-daemon/Cargo.toml +++ b/mullvad-daemon/Cargo.toml @@ -24,7 +24,7 @@ rand = "0.7" regex = "1.0" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" -tokio = { version = "1.8", features = [ "fs", "rt-multi-thread", "sync" ] } +tokio = { version = "1.8", features = [ "fs", "rt-multi-thread", "sync", "time" ] } tokio-stream = "0.1" uuid = { version = "0.8", features = ["v4"] } diff --git a/mullvad-daemon/src/account.rs b/mullvad-daemon/src/account.rs index 9fda91a813..26795072ab 100644 --- a/mullvad-daemon/src/account.rs +++ b/mullvad-daemon/src/account.rs @@ -86,7 +86,7 @@ impl AccountHandle { .await; if result.is_ok() { self.initial_check_abort_handle.abort(); - self.api_availability.resume(); + self.api_availability.resume_background(); } result } @@ -107,7 +107,7 @@ impl Account { api_availability: ApiAvailabilityHandle, ) -> AccountHandle { let accounts_proxy = AccountsProxy::new(rpc_handle); - api_availability.pause(); + api_availability.pause_background(); let api_availability_copy = api_availability.clone(); let accounts_proxy_copy = accounts_proxy.clone(); @@ -116,7 +116,7 @@ impl Account { let token = if let Some(token) = token { token } else { - api_availability.pause(); + api_availability.pause_background(); return; }; @@ -155,16 +155,16 @@ fn handle_expiry_result_inner( ) -> bool { match result { Ok(_expiry) if *_expiry >= chrono::Utc::now() => { - api_availability.resume(); + api_availability.resume_background(); true } Ok(_expiry) => { - api_availability.pause(); + api_availability.pause_background(); true } Err(mullvad_rpc::rest::Error::ApiError(_status, code)) => { if code == mullvad_rpc::INVALID_ACCOUNT || code == mullvad_rpc::INVALID_AUTH { - api_availability.pause(); + api_availability.pause_background(); return true; } false diff --git a/mullvad-daemon/src/lib.rs b/mullvad-daemon/src/lib.rs index 7597f01020..21ca9298b8 100644 --- a/mullvad-daemon/src/lib.rs +++ b/mullvad-daemon/src/lib.rs @@ -24,7 +24,7 @@ mod version_check; use futures::{ channel::{mpsc, oneshot}, future::{abortable, AbortHandle, Future}, - SinkExt, StreamExt, + StreamExt, }; use log::{debug, error, info, warn}; use mullvad_rpc::availability::ApiAvailabilityHandle; @@ -559,49 +559,6 @@ where let runtime = tokio::runtime::Handle::current(); let (internal_event_tx, internal_event_rx) = command_channel.destructure(); - let (address_change_tx, mut address_change_rx) = mpsc::channel(0); - let address_change_tx = std::sync::Mutex::new(address_change_tx); - let address_change_runtime = runtime.clone(); - - let mut rpc_runtime = mullvad_rpc::MullvadRpcRuntime::with_cache( - runtime.clone(), - Some(&resource_dir), - &cache_dir, - true, - move |address| { - let (result_tx, result_rx) = oneshot::channel(); - - let mut tx = address_change_tx.lock().unwrap().clone(); - address_change_runtime.block_on(async move { - let tunnel_command = TunnelCommand::AllowEndpoint( - Endpoint::from_socket_address(address, TransportProtocol::Tcp), - result_tx, - ); - let _ = tx.send(tunnel_command).await; - result_rx.await.map_err(|_| ()) - }) - }, - #[cfg(target_os = "android")] - Self::create_bypass_tx(&internal_event_tx), - ) - .await - .map_err(Error::InitRpcFactory)?; - let rpc_handle = rpc_runtime.mullvad_rest_handle(); - let api_availability = rpc_runtime.availability_handle(); - - let relay_list_listener = event_listener.clone(); - let on_relay_list_update = move |relay_list: &RelayList| { - relay_list_listener.notify_relay_list(relay_list.clone()); - }; - - let relay_selector = relays::RelaySelector::new( - rpc_handle.clone(), - on_relay_list_update, - &resource_dir, - &cache_dir, - api_availability.clone(), - ); - if let Err(error) = migrations::migrate_all(&cache_dir, &settings_dir).await { log::error!( @@ -615,21 +572,6 @@ where let _ = settings.set_show_beta_releases(true).await; } - let app_version_info = version_check::load_cache(&cache_dir).await; - let (version_updater, version_updater_handle) = version_check::VersionUpdater::new( - rpc_handle.clone(), - api_availability.clone(), - cache_dir.clone(), - internal_event_tx.to_specialized_sender(), - app_version_info.clone(), - settings.show_beta_releases, - ); - tokio::spawn(version_updater.run()); - let account_history = - account_history::AccountHistory::new(&settings_dir, settings.get_account_token()) - .await - .map_err(Error::LoadAccountHistory)?; - // Restore the tunnel to a previous state let target_cache = cache_dir.join(TARGET_START_STATE_FILE); let cached_target_state: Option<TargetState> = @@ -676,10 +618,6 @@ where }; Self::cache_target_state(&cache_dir, initial_target_state).await; - let initial_api_endpoint = Endpoint::from_socket_address( - rpc_runtime.address_cache.peek_address(), - TransportProtocol::Tcp, - ); #[cfg(windows)] let exclude_paths = if settings.split_tunnel.enable_exclusions { settings @@ -692,8 +630,26 @@ where vec![] }; - let (offline_state_tx, offline_state_rx) = mpsc::unbounded(); + let mut rpc_runtime = mullvad_rpc::MullvadRpcRuntime::with_cache( + runtime.clone(), + Some(&resource_dir), + &cache_dir, + true, + #[cfg(target_os = "android")] + Self::create_bypass_tx(&internal_event_tx), + ) + .await + .map_err(Error::InitRpcFactory)?; + + let api_availability = rpc_runtime.availability_handle(); + api_availability.suspend(); + let initial_api_endpoint = Endpoint::from_socket_address( + rpc_runtime.address_cache.peek_address(), + TransportProtocol::Tcp, + ); + + let (offline_state_tx, offline_state_rx) = mpsc::unbounded(); let tunnel_command_tx = tunnel_state_machine::spawn( runtime.clone(), tunnel_state_machine::InitialTunnelState { @@ -707,7 +663,7 @@ where }, tunnel_parameters_generator, log_dir, - resource_dir, + resource_dir.clone(), cache_dir.clone(), internal_event_tx.to_specialized_sender(), offline_state_tx, @@ -718,19 +674,56 @@ where .await .map_err(Error::TunnelError)?; - Self::forward_offline_state(&runtime, api_availability.clone(), offline_state_rx).await; - - let tsm_api_address_change_tx = Arc::downgrade(&tunnel_command_tx); - tokio::spawn(async move { - while let Some(address_change) = address_change_rx.next().await { - if let Some(tx) = tsm_api_address_change_tx.upgrade() { - let _ = tx.unbounded_send(address_change); + let address_change_runtime = runtime.clone(); + let tunnel_cmd_weak_tx = Arc::downgrade(&tunnel_command_tx); + rpc_runtime.set_address_change_listener(move |address| { + let (result_tx, result_rx) = oneshot::channel(); + let tx = tunnel_cmd_weak_tx.clone(); + address_change_runtime.block_on(async move { + if let Some(tx) = tx.upgrade() { + let _ = tx.unbounded_send(TunnelCommand::AllowEndpoint( + Endpoint::from_socket_address(address, TransportProtocol::Tcp), + result_tx, + )); + result_rx.await.map_err(|_| ()) } else { - return; + Err(()) } - } + }) }); + let rpc_handle = rpc_runtime.mullvad_rest_handle(); + + Self::forward_offline_state(&runtime, api_availability.clone(), offline_state_rx).await; + + let relay_list_listener = event_listener.clone(); + let on_relay_list_update = move |relay_list: &RelayList| { + relay_list_listener.notify_relay_list(relay_list.clone()); + }; + + let relay_selector = relays::RelaySelector::new( + rpc_handle.clone(), + on_relay_list_update, + &resource_dir, + &cache_dir, + api_availability.clone(), + ); + + let app_version_info = version_check::load_cache(&cache_dir).await; + let (version_updater, version_updater_handle) = version_check::VersionUpdater::new( + rpc_handle.clone(), + api_availability.clone(), + cache_dir.clone(), + internal_event_tx.to_specialized_sender(), + app_version_info.clone(), + settings.show_beta_releases, + ); + tokio::spawn(version_updater.run()); + let account_history = + account_history::AccountHistory::new(&settings_dir, settings.get_account_token()) + .await + .map_err(Error::LoadAccountHistory)?; + let wireguard_key_manager = wireguard::KeyManager::new( internal_event_tx.clone(), api_availability.clone(), @@ -781,6 +774,8 @@ where daemon.ensure_wireguard_keys_for_current_account().await; + api_availability.unsuspend(); + Ok(daemon) } diff --git a/mullvad-daemon/src/relays.rs b/mullvad-daemon/src/relays.rs index 9811864630..f1f5ec8e9e 100644 --- a/mullvad-daemon/src/relays.rs +++ b/mullvad-daemon/src/relays.rs @@ -1046,10 +1046,12 @@ impl RelayListUpdater { } async fn run(mut self, mut cmd_rx: mpsc::Receiver<bool>) { - let mut check_interval = tokio_stream::wrappers::IntervalStream::new( - tokio::time::interval(UPDATE_CHECK_INTERVAL), - ) - .fuse(); + let mut check_interval = + tokio_stream::wrappers::IntervalStream::new(tokio::time::interval_at( + (Instant::now() + UPDATE_CHECK_INTERVAL).into(), + UPDATE_CHECK_INTERVAL, + )) + .fuse(); let mut download_future = Box::pin(Fuse::terminated()); loop { futures::select! { @@ -1129,7 +1131,7 @@ impl RelayListUpdater { tag: Option<String>, ) -> impl Future<Output = Result<Option<RelayList>, mullvad_rpc::Error>> + 'static { let download_futures = move || { - let available = api_handle.wait_available(); + let available = api_handle.wait_background(); let req = rpc_handle.relay_list(tag.clone()); async move { available.await?; diff --git a/mullvad-daemon/src/version_check.rs b/mullvad-daemon/src/version_check.rs index dfcc82b6e8..a97e6a7478 100644 --- a/mullvad-daemon/src/version_check.rs +++ b/mullvad-daemon/src/version_check.rs @@ -233,7 +233,7 @@ impl VersionUpdater { let version_proxy = self.version_proxy.clone(); let platform_version = self.platform_version.clone(); let download_future_factory = move || { - let when_available = api_handle.wait_available(); + let when_available = api_handle.wait_background(); let request = version_proxy.version_check( PRODUCT_VERSION.to_owned(), PLATFORM, diff --git a/mullvad-daemon/src/wireguard.rs b/mullvad-daemon/src/wireguard.rs index 8f0eacd454..da6490cb5d 100644 --- a/mullvad-daemon/src/wireguard.rs +++ b/mullvad-daemon/src/wireguard.rs @@ -229,7 +229,7 @@ impl KeyManager { let availability_handle = self.availability_handle.clone(); let future_generator = move || { - let wait_available = availability_handle.wait_available(); + let wait_available = availability_handle.wait_background(); let fut = inner_future_generator(); let error_tx = error_tx.clone(); let error_account = error_account.clone(); @@ -381,7 +381,7 @@ impl KeyManager { let rotate_key_for_account = move |old_key: &PublicKey| -> Pin<Box<dyn Future<Output = Result<PublicKey>> + Send>> { - let wait_available = availability_handle.wait_available(); + let wait_available = availability_handle.wait_background(); let rotate = Self::rotate_key( daemon_tx.clone(), http_handle.clone(), diff --git a/mullvad-problem-report/src/lib.rs b/mullvad-problem-report/src/lib.rs index e6312062ff..9a5bca1a61 100644 --- a/mullvad-problem-report/src/lib.rs +++ b/mullvad-problem-report/src/lib.rs @@ -283,7 +283,6 @@ pub fn send_problem_report( None, cache_dir, false, - |_| Ok(()), #[cfg(target_os = "android")] None, )) diff --git a/mullvad-rpc/src/address_cache.rs b/mullvad-rpc/src/address_cache.rs index b681265dc5..44a7127f7d 100644 --- a/mullvad-rpc/src/address_cache.rs +++ b/mullvad-rpc/src/address_cache.rs @@ -44,11 +44,7 @@ pub struct AddressCache { impl AddressCache { /// Initialize cache using the given list, and write changes to `write_path`. - pub fn new( - addresses: Vec<SocketAddr>, - write_path: Option<Box<Path>>, - change_listener: Arc<Box<CurrentAddressChangeListener>>, - ) -> Result<Self, Error> { + pub fn new(addresses: Vec<SocketAddr>, write_path: Option<Box<Path>>) -> Result<Self, Error> { let mut cache = AddressCacheInner::from_addresses(addresses)?; cache.shuffle_tail(); log::trace!("API address cache: {:?}", cache.addresses); @@ -57,23 +53,15 @@ impl AddressCache { let address_cache = Self { inner: Arc::new(Mutex::new(cache)), write_path: write_path.map(|cache| Arc::from(cache)), - change_listener, + change_listener: Arc::new(Box::new(|_| Ok(()))), }; Ok(address_cache) } /// Initialize cache using `read_path`, and write changes to `write_path`. - pub async fn from_file( - read_path: &Path, - write_path: Option<Box<Path>>, - change_listener: Arc<Box<CurrentAddressChangeListener>>, - ) -> Result<Self, Error> { + pub async fn from_file(read_path: &Path, write_path: Option<Box<Path>>) -> Result<Self, Error> { log::debug!("Loading API addresses from {:?}", read_path); - Self::new( - read_address_file(read_path).await?, - write_path, - change_listener, - ) + Self::new(read_address_file(read_path).await?, write_path) } pub fn set_change_listener(&mut self, change_listener: Arc<Box<CurrentAddressChangeListener>>) { diff --git a/mullvad-rpc/src/availability.rs b/mullvad-rpc/src/availability.rs index 227bc0cd35..77803b44b2 100644 --- a/mullvad-rpc/src/availability.rs +++ b/mullvad-rpc/src/availability.rs @@ -18,21 +18,22 @@ pub enum Error { #[derive(PartialEq, Eq, Clone, Copy, Debug, Default)] pub struct State { - pause_automatic: bool, + suspended: bool, + pause_background: bool, offline: bool, } impl State { - pub fn is_paused(&self) -> bool { - self.pause_automatic + pub fn is_suspended(&self) -> bool { + self.suspended } - pub fn is_offline(&self) -> bool { - self.offline + pub fn is_background_paused(&self) -> bool { + self.offline || self.pause_background || self.suspended } - pub fn is_available(&self) -> bool { - !self.is_paused() && !self.is_offline() + pub fn is_offline(&self) -> bool { + self.offline } } @@ -67,18 +68,34 @@ pub struct ApiAvailabilityHandle { } impl ApiAvailabilityHandle { - pub fn pause(&self) { + pub fn suspend(&self) { + let mut state = self.state.lock().unwrap(); + if !state.suspended { + state.suspended = true; + let _ = self.tx.send(*state); + } + } + + pub fn unsuspend(&self) { + let mut state = self.state.lock().unwrap(); + if state.suspended { + state.suspended = false; + let _ = self.tx.send(*state); + } + } + + pub fn pause_background(&self) { let mut state = self.state.lock().unwrap(); - if !state.pause_automatic { - state.pause_automatic = true; + if !state.pause_background { + state.pause_background = true; let _ = self.tx.send(*state); } } - pub fn resume(&self) { + pub fn resume_background(&self) { let mut state = self.state.lock().unwrap(); - if state.pause_automatic { - state.pause_automatic = false; + if state.pause_background { + state.pause_background = false; let _ = self.tx.send(*state); } } @@ -95,8 +112,12 @@ impl ApiAvailabilityHandle { *self.state.lock().unwrap() } - pub fn wait_available(&self) -> impl Future<Output = Result<(), Error>> { - self.wait_for_state(|state| state.is_available()) + pub fn wait_for_unsuspend(&self) -> impl Future<Output = Result<(), Error>> { + self.wait_for_state(|state| !state.is_suspended()) + } + + pub fn wait_background(&self) -> impl Future<Output = Result<(), Error>> { + self.wait_for_state(|state| !state.is_background_paused()) } pub fn wait_online(&self) -> impl Future<Output = Result<(), Error>> { diff --git a/mullvad-rpc/src/lib.rs b/mullvad-rpc/src/lib.rs index 38f1499d02..61654cab1c 100644 --- a/mullvad-rpc/src/lib.rs +++ b/mullvad-rpc/src/lib.rs @@ -117,11 +117,7 @@ impl MullvadRpcRuntime { ) -> Result<Self, Error> { Ok(MullvadRpcRuntime { handle, - address_cache: AddressCache::new( - vec![API_ADDRESS.clone()], - None, - Arc::new(Box::new(|_| Ok(()))), - )?, + address_cache: AddressCache::new(vec![API_ADDRESS.clone()], None)?, api_availability: ApiAvailability::new(availability::State::default()), #[cfg(target_os = "android")] socket_bypass_tx, @@ -136,7 +132,6 @@ impl MullvadRpcRuntime { resource_dir: Option<&Path>, cache_dir: &Path, write_changes: bool, - address_change_listener: impl Fn(SocketAddr) -> Result<(), ()> + Send + Sync + 'static, #[cfg(target_os = "android")] socket_bypass_tx: Option<mpsc::Sender<SocketBypassRequest>>, ) -> Result<Self, Error> { #[cfg(feature = "api-override")] @@ -155,16 +150,7 @@ impl MullvadRpcRuntime { None }; - let address_change_listener = - Arc::<Box<CurrentAddressChangeListener>>::new(Box::new(address_change_listener)); - - let address_cache = match AddressCache::from_file( - &cache_file, - write_file.clone(), - address_change_listener.clone(), - ) - .await - { + let address_cache = match AddressCache::from_file(&cache_file, write_file.clone()).await { Ok(cache) => cache, Err(error) => { let cache_exists = cache_file.exists(); @@ -181,12 +167,8 @@ impl MullvadRpcRuntime { match resource_dir { Some(resource_dir) => { let read_file = resource_dir.join(API_IP_CACHE_FILENAME); - let empty_listener = - Arc::<Box<CurrentAddressChangeListener>>::new(Box::new(|_| Ok(()))); - let mut cache = - AddressCache::from_file(&read_file, write_file, empty_listener).await?; + let cache = AddressCache::from_file(&read_file, write_file).await?; cache.randomize().await?; - cache.set_change_listener(address_change_listener); cache } None => return Err(Error::AddressCacheError(error)), @@ -203,6 +185,14 @@ impl MullvadRpcRuntime { }) } + pub fn set_address_change_listener( + &mut self, + address_change_listener: impl Fn(SocketAddr) -> Result<(), ()> + Send + Sync + 'static, + ) { + self.address_cache + .set_change_listener(Arc::new(Box::new(address_change_listener))); + } + /// Creates a new request service and returns a handle to it. fn new_request_service(&mut self, sni_hostname: Option<String>) -> rest::RequestServiceHandle { let https_connector = HttpsConnectorWithSni::new( diff --git a/mullvad-rpc/src/rest.rs b/mullvad-rpc/src/rest.rs index 779da6ccf5..cc2510f97f 100644 --- a/mullvad-rpc/src/rest.rs +++ b/mullvad-rpc/src/rest.rs @@ -142,11 +142,16 @@ impl RequestService { let hyper_request = request.into_request(); let host_addr = get_request_socket_addr(&hyper_request); - let (request_future, abort_handle) = - abortable(self.client.request(hyper_request).map_err(Error::from)); + let api_availability = self.api_availability.clone(); + let suspend_fut = api_availability.wait_for_unsuspend(); + let request_fut = self.client.request(hyper_request).map_err(Error::from); + + let (request_future, abort_handle) = abortable(async move { + let _ = suspend_fut.await; + request_fut.await + }); let address_cache = self.address_cache.clone(); let handle = self.handle.clone(); - let api_availability = self.api_availability.clone(); let future = async move { let response = @@ -646,7 +651,7 @@ impl MullvadRestHandle { loop { interval.tick().await; if next_check < Instant::now() { - if let Err(error) = availability.wait_available().await { + if let Err(error) = availability.wait_background().await { log::error!("Failed while waiting for API: {}", error); next_check = next_error_check(); continue; diff --git a/mullvad-setup/src/main.rs b/mullvad-setup/src/main.rs index 396d456afa..eb09513a1f 100644 --- a/mullvad-setup/src/main.rs +++ b/mullvad-setup/src/main.rs @@ -178,7 +178,6 @@ async fn remove_wireguard_key() -> Result<(), Error> { None, &cache_path, false, - |_| Ok(()), ) .await .map_err(Error::RpcInitializationError)?; |
