summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorDavid Lönnhager <david.l@mullvad.net>2021-11-01 10:55:23 +0100
committerDavid Lönnhager <david.l@mullvad.net>2021-11-01 10:55:23 +0100
commit6a6bc26be3d4033be0bb37666a0cda7211dd5aa1 (patch)
treed815d64ed044c02c4381d5d65d3a30adbb1cc21d
parent8952904a4202a1517d247043ca9a462dc819f1e5 (diff)
parent4967100c75d7f14e67a650cee83278afed3dd41a (diff)
downloadmullvadvpn-6a6bc26be3d4033be0bb37666a0cda7211dd5aa1.tar.xz
mullvadvpn-6a6bc26be3d4033be0bb37666a0cda7211dd5aa1.zip
Merge branch 'delay-startup-reqs'
-rw-r--r--CHANGELOG.md2
-rw-r--r--mullvad-daemon/Cargo.toml2
-rw-r--r--mullvad-daemon/src/account.rs12
-rw-r--r--mullvad-daemon/src/lib.rs143
-rw-r--r--mullvad-daemon/src/relays.rs12
-rw-r--r--mullvad-daemon/src/version_check.rs2
-rw-r--r--mullvad-daemon/src/wireguard.rs4
-rw-r--r--mullvad-problem-report/src/lib.rs1
-rw-r--r--mullvad-rpc/src/address_cache.rs20
-rw-r--r--mullvad-rpc/src/availability.rs51
-rw-r--r--mullvad-rpc/src/lib.rs32
-rw-r--r--mullvad-rpc/src/rest.rs13
-rw-r--r--mullvad-setup/src/main.rs1
13 files changed, 148 insertions, 147 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md
index e0a7526ade..29514318ca 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -40,6 +40,8 @@ Line wrap the file at 100 chars. Th
### Fixed
- Fix desktop app showing a future date for when WireGuard key was generated.
- Fix desktop app split tunneling view to not overflow on very long application names.
+- Prevent API requests from being made prior to the tunnel state machine being set up.
+ Rarely, failed requests could result in a deadlock.
#### Windows
- Fix detection of Windows 11. Problem reports will now correctly report Windows 11 instead
diff --git a/mullvad-daemon/Cargo.toml b/mullvad-daemon/Cargo.toml
index 24d21209f3..dc00d46853 100644
--- a/mullvad-daemon/Cargo.toml
+++ b/mullvad-daemon/Cargo.toml
@@ -24,7 +24,7 @@ rand = "0.7"
regex = "1.0"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
-tokio = { version = "1.8", features = [ "fs", "rt-multi-thread", "sync" ] }
+tokio = { version = "1.8", features = [ "fs", "rt-multi-thread", "sync", "time" ] }
tokio-stream = "0.1"
uuid = { version = "0.8", features = ["v4"] }
diff --git a/mullvad-daemon/src/account.rs b/mullvad-daemon/src/account.rs
index 9fda91a813..26795072ab 100644
--- a/mullvad-daemon/src/account.rs
+++ b/mullvad-daemon/src/account.rs
@@ -86,7 +86,7 @@ impl AccountHandle {
.await;
if result.is_ok() {
self.initial_check_abort_handle.abort();
- self.api_availability.resume();
+ self.api_availability.resume_background();
}
result
}
@@ -107,7 +107,7 @@ impl Account {
api_availability: ApiAvailabilityHandle,
) -> AccountHandle {
let accounts_proxy = AccountsProxy::new(rpc_handle);
- api_availability.pause();
+ api_availability.pause_background();
let api_availability_copy = api_availability.clone();
let accounts_proxy_copy = accounts_proxy.clone();
@@ -116,7 +116,7 @@ impl Account {
let token = if let Some(token) = token {
token
} else {
- api_availability.pause();
+ api_availability.pause_background();
return;
};
@@ -155,16 +155,16 @@ fn handle_expiry_result_inner(
) -> bool {
match result {
Ok(_expiry) if *_expiry >= chrono::Utc::now() => {
- api_availability.resume();
+ api_availability.resume_background();
true
}
Ok(_expiry) => {
- api_availability.pause();
+ api_availability.pause_background();
true
}
Err(mullvad_rpc::rest::Error::ApiError(_status, code)) => {
if code == mullvad_rpc::INVALID_ACCOUNT || code == mullvad_rpc::INVALID_AUTH {
- api_availability.pause();
+ api_availability.pause_background();
return true;
}
false
diff --git a/mullvad-daemon/src/lib.rs b/mullvad-daemon/src/lib.rs
index 7597f01020..21ca9298b8 100644
--- a/mullvad-daemon/src/lib.rs
+++ b/mullvad-daemon/src/lib.rs
@@ -24,7 +24,7 @@ mod version_check;
use futures::{
channel::{mpsc, oneshot},
future::{abortable, AbortHandle, Future},
- SinkExt, StreamExt,
+ StreamExt,
};
use log::{debug, error, info, warn};
use mullvad_rpc::availability::ApiAvailabilityHandle;
@@ -559,49 +559,6 @@ where
let runtime = tokio::runtime::Handle::current();
let (internal_event_tx, internal_event_rx) = command_channel.destructure();
- let (address_change_tx, mut address_change_rx) = mpsc::channel(0);
- let address_change_tx = std::sync::Mutex::new(address_change_tx);
- let address_change_runtime = runtime.clone();
-
- let mut rpc_runtime = mullvad_rpc::MullvadRpcRuntime::with_cache(
- runtime.clone(),
- Some(&resource_dir),
- &cache_dir,
- true,
- move |address| {
- let (result_tx, result_rx) = oneshot::channel();
-
- let mut tx = address_change_tx.lock().unwrap().clone();
- address_change_runtime.block_on(async move {
- let tunnel_command = TunnelCommand::AllowEndpoint(
- Endpoint::from_socket_address(address, TransportProtocol::Tcp),
- result_tx,
- );
- let _ = tx.send(tunnel_command).await;
- result_rx.await.map_err(|_| ())
- })
- },
- #[cfg(target_os = "android")]
- Self::create_bypass_tx(&internal_event_tx),
- )
- .await
- .map_err(Error::InitRpcFactory)?;
- let rpc_handle = rpc_runtime.mullvad_rest_handle();
- let api_availability = rpc_runtime.availability_handle();
-
- let relay_list_listener = event_listener.clone();
- let on_relay_list_update = move |relay_list: &RelayList| {
- relay_list_listener.notify_relay_list(relay_list.clone());
- };
-
- let relay_selector = relays::RelaySelector::new(
- rpc_handle.clone(),
- on_relay_list_update,
- &resource_dir,
- &cache_dir,
- api_availability.clone(),
- );
-
if let Err(error) = migrations::migrate_all(&cache_dir, &settings_dir).await {
log::error!(
@@ -615,21 +572,6 @@ where
let _ = settings.set_show_beta_releases(true).await;
}
- let app_version_info = version_check::load_cache(&cache_dir).await;
- let (version_updater, version_updater_handle) = version_check::VersionUpdater::new(
- rpc_handle.clone(),
- api_availability.clone(),
- cache_dir.clone(),
- internal_event_tx.to_specialized_sender(),
- app_version_info.clone(),
- settings.show_beta_releases,
- );
- tokio::spawn(version_updater.run());
- let account_history =
- account_history::AccountHistory::new(&settings_dir, settings.get_account_token())
- .await
- .map_err(Error::LoadAccountHistory)?;
-
// Restore the tunnel to a previous state
let target_cache = cache_dir.join(TARGET_START_STATE_FILE);
let cached_target_state: Option<TargetState> =
@@ -676,10 +618,6 @@ where
};
Self::cache_target_state(&cache_dir, initial_target_state).await;
- let initial_api_endpoint = Endpoint::from_socket_address(
- rpc_runtime.address_cache.peek_address(),
- TransportProtocol::Tcp,
- );
#[cfg(windows)]
let exclude_paths = if settings.split_tunnel.enable_exclusions {
settings
@@ -692,8 +630,26 @@ where
vec![]
};
- let (offline_state_tx, offline_state_rx) = mpsc::unbounded();
+ let mut rpc_runtime = mullvad_rpc::MullvadRpcRuntime::with_cache(
+ runtime.clone(),
+ Some(&resource_dir),
+ &cache_dir,
+ true,
+ #[cfg(target_os = "android")]
+ Self::create_bypass_tx(&internal_event_tx),
+ )
+ .await
+ .map_err(Error::InitRpcFactory)?;
+
+ let api_availability = rpc_runtime.availability_handle();
+ api_availability.suspend();
+ let initial_api_endpoint = Endpoint::from_socket_address(
+ rpc_runtime.address_cache.peek_address(),
+ TransportProtocol::Tcp,
+ );
+
+ let (offline_state_tx, offline_state_rx) = mpsc::unbounded();
let tunnel_command_tx = tunnel_state_machine::spawn(
runtime.clone(),
tunnel_state_machine::InitialTunnelState {
@@ -707,7 +663,7 @@ where
},
tunnel_parameters_generator,
log_dir,
- resource_dir,
+ resource_dir.clone(),
cache_dir.clone(),
internal_event_tx.to_specialized_sender(),
offline_state_tx,
@@ -718,19 +674,56 @@ where
.await
.map_err(Error::TunnelError)?;
- Self::forward_offline_state(&runtime, api_availability.clone(), offline_state_rx).await;
-
- let tsm_api_address_change_tx = Arc::downgrade(&tunnel_command_tx);
- tokio::spawn(async move {
- while let Some(address_change) = address_change_rx.next().await {
- if let Some(tx) = tsm_api_address_change_tx.upgrade() {
- let _ = tx.unbounded_send(address_change);
+ let address_change_runtime = runtime.clone();
+ let tunnel_cmd_weak_tx = Arc::downgrade(&tunnel_command_tx);
+ rpc_runtime.set_address_change_listener(move |address| {
+ let (result_tx, result_rx) = oneshot::channel();
+ let tx = tunnel_cmd_weak_tx.clone();
+ address_change_runtime.block_on(async move {
+ if let Some(tx) = tx.upgrade() {
+ let _ = tx.unbounded_send(TunnelCommand::AllowEndpoint(
+ Endpoint::from_socket_address(address, TransportProtocol::Tcp),
+ result_tx,
+ ));
+ result_rx.await.map_err(|_| ())
} else {
- return;
+ Err(())
}
- }
+ })
});
+ let rpc_handle = rpc_runtime.mullvad_rest_handle();
+
+ Self::forward_offline_state(&runtime, api_availability.clone(), offline_state_rx).await;
+
+ let relay_list_listener = event_listener.clone();
+ let on_relay_list_update = move |relay_list: &RelayList| {
+ relay_list_listener.notify_relay_list(relay_list.clone());
+ };
+
+ let relay_selector = relays::RelaySelector::new(
+ rpc_handle.clone(),
+ on_relay_list_update,
+ &resource_dir,
+ &cache_dir,
+ api_availability.clone(),
+ );
+
+ let app_version_info = version_check::load_cache(&cache_dir).await;
+ let (version_updater, version_updater_handle) = version_check::VersionUpdater::new(
+ rpc_handle.clone(),
+ api_availability.clone(),
+ cache_dir.clone(),
+ internal_event_tx.to_specialized_sender(),
+ app_version_info.clone(),
+ settings.show_beta_releases,
+ );
+ tokio::spawn(version_updater.run());
+ let account_history =
+ account_history::AccountHistory::new(&settings_dir, settings.get_account_token())
+ .await
+ .map_err(Error::LoadAccountHistory)?;
+
let wireguard_key_manager = wireguard::KeyManager::new(
internal_event_tx.clone(),
api_availability.clone(),
@@ -781,6 +774,8 @@ where
daemon.ensure_wireguard_keys_for_current_account().await;
+ api_availability.unsuspend();
+
Ok(daemon)
}
diff --git a/mullvad-daemon/src/relays.rs b/mullvad-daemon/src/relays.rs
index 9811864630..f1f5ec8e9e 100644
--- a/mullvad-daemon/src/relays.rs
+++ b/mullvad-daemon/src/relays.rs
@@ -1046,10 +1046,12 @@ impl RelayListUpdater {
}
async fn run(mut self, mut cmd_rx: mpsc::Receiver<bool>) {
- let mut check_interval = tokio_stream::wrappers::IntervalStream::new(
- tokio::time::interval(UPDATE_CHECK_INTERVAL),
- )
- .fuse();
+ let mut check_interval =
+ tokio_stream::wrappers::IntervalStream::new(tokio::time::interval_at(
+ (Instant::now() + UPDATE_CHECK_INTERVAL).into(),
+ UPDATE_CHECK_INTERVAL,
+ ))
+ .fuse();
let mut download_future = Box::pin(Fuse::terminated());
loop {
futures::select! {
@@ -1129,7 +1131,7 @@ impl RelayListUpdater {
tag: Option<String>,
) -> impl Future<Output = Result<Option<RelayList>, mullvad_rpc::Error>> + 'static {
let download_futures = move || {
- let available = api_handle.wait_available();
+ let available = api_handle.wait_background();
let req = rpc_handle.relay_list(tag.clone());
async move {
available.await?;
diff --git a/mullvad-daemon/src/version_check.rs b/mullvad-daemon/src/version_check.rs
index dfcc82b6e8..a97e6a7478 100644
--- a/mullvad-daemon/src/version_check.rs
+++ b/mullvad-daemon/src/version_check.rs
@@ -233,7 +233,7 @@ impl VersionUpdater {
let version_proxy = self.version_proxy.clone();
let platform_version = self.platform_version.clone();
let download_future_factory = move || {
- let when_available = api_handle.wait_available();
+ let when_available = api_handle.wait_background();
let request = version_proxy.version_check(
PRODUCT_VERSION.to_owned(),
PLATFORM,
diff --git a/mullvad-daemon/src/wireguard.rs b/mullvad-daemon/src/wireguard.rs
index 8f0eacd454..da6490cb5d 100644
--- a/mullvad-daemon/src/wireguard.rs
+++ b/mullvad-daemon/src/wireguard.rs
@@ -229,7 +229,7 @@ impl KeyManager {
let availability_handle = self.availability_handle.clone();
let future_generator = move || {
- let wait_available = availability_handle.wait_available();
+ let wait_available = availability_handle.wait_background();
let fut = inner_future_generator();
let error_tx = error_tx.clone();
let error_account = error_account.clone();
@@ -381,7 +381,7 @@ impl KeyManager {
let rotate_key_for_account =
move |old_key: &PublicKey| -> Pin<Box<dyn Future<Output = Result<PublicKey>> + Send>> {
- let wait_available = availability_handle.wait_available();
+ let wait_available = availability_handle.wait_background();
let rotate = Self::rotate_key(
daemon_tx.clone(),
http_handle.clone(),
diff --git a/mullvad-problem-report/src/lib.rs b/mullvad-problem-report/src/lib.rs
index e6312062ff..9a5bca1a61 100644
--- a/mullvad-problem-report/src/lib.rs
+++ b/mullvad-problem-report/src/lib.rs
@@ -283,7 +283,6 @@ pub fn send_problem_report(
None,
cache_dir,
false,
- |_| Ok(()),
#[cfg(target_os = "android")]
None,
))
diff --git a/mullvad-rpc/src/address_cache.rs b/mullvad-rpc/src/address_cache.rs
index b681265dc5..44a7127f7d 100644
--- a/mullvad-rpc/src/address_cache.rs
+++ b/mullvad-rpc/src/address_cache.rs
@@ -44,11 +44,7 @@ pub struct AddressCache {
impl AddressCache {
/// Initialize cache using the given list, and write changes to `write_path`.
- pub fn new(
- addresses: Vec<SocketAddr>,
- write_path: Option<Box<Path>>,
- change_listener: Arc<Box<CurrentAddressChangeListener>>,
- ) -> Result<Self, Error> {
+ pub fn new(addresses: Vec<SocketAddr>, write_path: Option<Box<Path>>) -> Result<Self, Error> {
let mut cache = AddressCacheInner::from_addresses(addresses)?;
cache.shuffle_tail();
log::trace!("API address cache: {:?}", cache.addresses);
@@ -57,23 +53,15 @@ impl AddressCache {
let address_cache = Self {
inner: Arc::new(Mutex::new(cache)),
write_path: write_path.map(|cache| Arc::from(cache)),
- change_listener,
+ change_listener: Arc::new(Box::new(|_| Ok(()))),
};
Ok(address_cache)
}
/// Initialize cache using `read_path`, and write changes to `write_path`.
- pub async fn from_file(
- read_path: &Path,
- write_path: Option<Box<Path>>,
- change_listener: Arc<Box<CurrentAddressChangeListener>>,
- ) -> Result<Self, Error> {
+ pub async fn from_file(read_path: &Path, write_path: Option<Box<Path>>) -> Result<Self, Error> {
log::debug!("Loading API addresses from {:?}", read_path);
- Self::new(
- read_address_file(read_path).await?,
- write_path,
- change_listener,
- )
+ Self::new(read_address_file(read_path).await?, write_path)
}
pub fn set_change_listener(&mut self, change_listener: Arc<Box<CurrentAddressChangeListener>>) {
diff --git a/mullvad-rpc/src/availability.rs b/mullvad-rpc/src/availability.rs
index 227bc0cd35..77803b44b2 100644
--- a/mullvad-rpc/src/availability.rs
+++ b/mullvad-rpc/src/availability.rs
@@ -18,21 +18,22 @@ pub enum Error {
#[derive(PartialEq, Eq, Clone, Copy, Debug, Default)]
pub struct State {
- pause_automatic: bool,
+ suspended: bool,
+ pause_background: bool,
offline: bool,
}
impl State {
- pub fn is_paused(&self) -> bool {
- self.pause_automatic
+ pub fn is_suspended(&self) -> bool {
+ self.suspended
}
- pub fn is_offline(&self) -> bool {
- self.offline
+ pub fn is_background_paused(&self) -> bool {
+ self.offline || self.pause_background || self.suspended
}
- pub fn is_available(&self) -> bool {
- !self.is_paused() && !self.is_offline()
+ pub fn is_offline(&self) -> bool {
+ self.offline
}
}
@@ -67,18 +68,34 @@ pub struct ApiAvailabilityHandle {
}
impl ApiAvailabilityHandle {
- pub fn pause(&self) {
+ pub fn suspend(&self) {
+ let mut state = self.state.lock().unwrap();
+ if !state.suspended {
+ state.suspended = true;
+ let _ = self.tx.send(*state);
+ }
+ }
+
+ pub fn unsuspend(&self) {
+ let mut state = self.state.lock().unwrap();
+ if state.suspended {
+ state.suspended = false;
+ let _ = self.tx.send(*state);
+ }
+ }
+
+ pub fn pause_background(&self) {
let mut state = self.state.lock().unwrap();
- if !state.pause_automatic {
- state.pause_automatic = true;
+ if !state.pause_background {
+ state.pause_background = true;
let _ = self.tx.send(*state);
}
}
- pub fn resume(&self) {
+ pub fn resume_background(&self) {
let mut state = self.state.lock().unwrap();
- if state.pause_automatic {
- state.pause_automatic = false;
+ if state.pause_background {
+ state.pause_background = false;
let _ = self.tx.send(*state);
}
}
@@ -95,8 +112,12 @@ impl ApiAvailabilityHandle {
*self.state.lock().unwrap()
}
- pub fn wait_available(&self) -> impl Future<Output = Result<(), Error>> {
- self.wait_for_state(|state| state.is_available())
+ pub fn wait_for_unsuspend(&self) -> impl Future<Output = Result<(), Error>> {
+ self.wait_for_state(|state| !state.is_suspended())
+ }
+
+ pub fn wait_background(&self) -> impl Future<Output = Result<(), Error>> {
+ self.wait_for_state(|state| !state.is_background_paused())
}
pub fn wait_online(&self) -> impl Future<Output = Result<(), Error>> {
diff --git a/mullvad-rpc/src/lib.rs b/mullvad-rpc/src/lib.rs
index 38f1499d02..61654cab1c 100644
--- a/mullvad-rpc/src/lib.rs
+++ b/mullvad-rpc/src/lib.rs
@@ -117,11 +117,7 @@ impl MullvadRpcRuntime {
) -> Result<Self, Error> {
Ok(MullvadRpcRuntime {
handle,
- address_cache: AddressCache::new(
- vec![API_ADDRESS.clone()],
- None,
- Arc::new(Box::new(|_| Ok(()))),
- )?,
+ address_cache: AddressCache::new(vec![API_ADDRESS.clone()], None)?,
api_availability: ApiAvailability::new(availability::State::default()),
#[cfg(target_os = "android")]
socket_bypass_tx,
@@ -136,7 +132,6 @@ impl MullvadRpcRuntime {
resource_dir: Option<&Path>,
cache_dir: &Path,
write_changes: bool,
- address_change_listener: impl Fn(SocketAddr) -> Result<(), ()> + Send + Sync + 'static,
#[cfg(target_os = "android")] socket_bypass_tx: Option<mpsc::Sender<SocketBypassRequest>>,
) -> Result<Self, Error> {
#[cfg(feature = "api-override")]
@@ -155,16 +150,7 @@ impl MullvadRpcRuntime {
None
};
- let address_change_listener =
- Arc::<Box<CurrentAddressChangeListener>>::new(Box::new(address_change_listener));
-
- let address_cache = match AddressCache::from_file(
- &cache_file,
- write_file.clone(),
- address_change_listener.clone(),
- )
- .await
- {
+ let address_cache = match AddressCache::from_file(&cache_file, write_file.clone()).await {
Ok(cache) => cache,
Err(error) => {
let cache_exists = cache_file.exists();
@@ -181,12 +167,8 @@ impl MullvadRpcRuntime {
match resource_dir {
Some(resource_dir) => {
let read_file = resource_dir.join(API_IP_CACHE_FILENAME);
- let empty_listener =
- Arc::<Box<CurrentAddressChangeListener>>::new(Box::new(|_| Ok(())));
- let mut cache =
- AddressCache::from_file(&read_file, write_file, empty_listener).await?;
+ let cache = AddressCache::from_file(&read_file, write_file).await?;
cache.randomize().await?;
- cache.set_change_listener(address_change_listener);
cache
}
None => return Err(Error::AddressCacheError(error)),
@@ -203,6 +185,14 @@ impl MullvadRpcRuntime {
})
}
+ pub fn set_address_change_listener(
+ &mut self,
+ address_change_listener: impl Fn(SocketAddr) -> Result<(), ()> + Send + Sync + 'static,
+ ) {
+ self.address_cache
+ .set_change_listener(Arc::new(Box::new(address_change_listener)));
+ }
+
/// Creates a new request service and returns a handle to it.
fn new_request_service(&mut self, sni_hostname: Option<String>) -> rest::RequestServiceHandle {
let https_connector = HttpsConnectorWithSni::new(
diff --git a/mullvad-rpc/src/rest.rs b/mullvad-rpc/src/rest.rs
index 779da6ccf5..cc2510f97f 100644
--- a/mullvad-rpc/src/rest.rs
+++ b/mullvad-rpc/src/rest.rs
@@ -142,11 +142,16 @@ impl RequestService {
let hyper_request = request.into_request();
let host_addr = get_request_socket_addr(&hyper_request);
- let (request_future, abort_handle) =
- abortable(self.client.request(hyper_request).map_err(Error::from));
+ let api_availability = self.api_availability.clone();
+ let suspend_fut = api_availability.wait_for_unsuspend();
+ let request_fut = self.client.request(hyper_request).map_err(Error::from);
+
+ let (request_future, abort_handle) = abortable(async move {
+ let _ = suspend_fut.await;
+ request_fut.await
+ });
let address_cache = self.address_cache.clone();
let handle = self.handle.clone();
- let api_availability = self.api_availability.clone();
let future = async move {
let response =
@@ -646,7 +651,7 @@ impl MullvadRestHandle {
loop {
interval.tick().await;
if next_check < Instant::now() {
- if let Err(error) = availability.wait_available().await {
+ if let Err(error) = availability.wait_background().await {
log::error!("Failed while waiting for API: {}", error);
next_check = next_error_check();
continue;
diff --git a/mullvad-setup/src/main.rs b/mullvad-setup/src/main.rs
index 396d456afa..eb09513a1f 100644
--- a/mullvad-setup/src/main.rs
+++ b/mullvad-setup/src/main.rs
@@ -178,7 +178,6 @@ async fn remove_wireguard_key() -> Result<(), Error> {
None,
&cache_path,
false,
- |_| Ok(()),
)
.await
.map_err(Error::RpcInitializationError)?;