diff options
| author | David Lönnhager <david.l@mullvad.net> | 2022-04-21 15:31:45 +0200 |
|---|---|---|
| committer | David Lönnhager <david.l@mullvad.net> | 2022-04-26 10:44:05 +0200 |
| commit | abfc684140de6d55dd4a31f0161d59dd58c926e2 (patch) | |
| tree | 61c3395d1fd497b00168034c0097fba40cfdfc58 | |
| parent | 1f43ff79447f553edf3f328ae8de3c65c05a1ef5 (diff) | |
| download | mullvadvpn-abfc684140de6d55dd4a31f0161d59dd58c926e2.tar.xz mullvadvpn-abfc684140de6d55dd4a31f0161d59dd58c926e2.zip | |
Decouple API bridge selection from the daemon
| -rw-r--r-- | mullvad-daemon/src/api.rs | 168 | ||||
| -rw-r--r-- | mullvad-daemon/src/lib.rs | 90 | ||||
| -rw-r--r-- | mullvad-relay-selector/src/lib.rs | 58 |
3 files changed, 157 insertions, 159 deletions
diff --git a/mullvad-daemon/src/api.rs b/mullvad-daemon/src/api.rs index 99d614a741..48870049c7 100644 --- a/mullvad-daemon/src/api.rs +++ b/mullvad-daemon/src/api.rs @@ -1,66 +1,148 @@ -use crate::DaemonEventSender; use futures::{ channel::{mpsc, oneshot}, - stream, Stream, StreamExt, + Future, Stream, }; -use mullvad_api::{proxy::ApiConnectionMode, ApiEndpointUpdateCallback}; +use mullvad_api::{ + proxy::{ApiConnectionMode, ProxyConfig}, + ApiEndpointUpdateCallback, +}; +use mullvad_relay_selector::RelaySelector; use std::{ net::SocketAddr, + path::PathBuf, + pin::Pin, sync::{Arc, Mutex, Weak}, + task::Poll, }; -use talpid_core::{mpsc::Sender, tunnel_state_machine::TunnelCommand}; +use talpid_core::tunnel_state_machine::TunnelCommand; use talpid_types::{ - net::{AllowedEndpoint, Endpoint, TransportProtocol}, + net::{openvpn::ProxySettings, AllowedEndpoint, Endpoint, TransportProtocol}, ErrorExt, }; -pub(crate) struct ApiConnectionModeRequest { - pub response_tx: oneshot::Sender<ApiConnectionMode>, - pub retry_attempt: u32, +/// A stream that returns the next API connection mode to use for reaching the API. +/// +/// When `mullvad-api` fails to contact the API, it requests a new connection mode. +/// The API can be connected to either directly (i.e., [`ApiConnectionMode::Direct`]) +/// or from a bridge ([`ApiConnectionMode::Proxied`]). +/// +/// * Every 3rd attempt returns [`ApiConnectionMode::Direct`]. +/// * Any other attempt returns a configuration for the bridge that is closest to the selected relay +/// location and matches all bridge constraints. +/// * When no matching bridge is found, e.g. if the selected hosting providers don't match any +/// bridge, [`ApiConnectionMode::Direct`] is returned. +pub struct ApiConnectionModeProvider { + cache_dir: PathBuf, + + selector_rx: Option<oneshot::Receiver<RelaySelector>>, + + relay_selector: Option<RelaySelector>, + retry_attempt: u32, + + current_task: Option<Pin<Box<dyn Future<Output = ApiConnectionMode> + Send>>>, } -/// Returns a stream that returns the next API bridge to try. -/// `initial_config` refers to the first config returned by the stream. The daemon is not notified -/// of this. -pub(crate) fn create_api_config_provider( - daemon_sender: DaemonEventSender<ApiConnectionModeRequest>, - initial_config: ApiConnectionMode, -) -> impl Stream<Item = ApiConnectionMode> + Unpin { - struct Context { - attempt: u32, - daemon_sender: DaemonEventSender<ApiConnectionModeRequest>, +impl Stream for ApiConnectionModeProvider { + type Item = ApiConnectionMode; + + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll<Option<Self::Item>> { + // First, obtain a relay selector handle + // Until we have obtained the handle, just return `ApiConnectionMode::Direct`. + if let Some(rx) = self.selector_rx.as_mut() { + match rx.try_recv() { + Ok(Some(selector)) => { + self.selector_rx = None; + self.relay_selector = Some(selector); + } + Ok(None) | Err(_) => return Poll::Ready(Some(ApiConnectionMode::Direct)), + } + } + + // Poll the current task + if let Some(task) = self.current_task.as_mut() { + return match task.as_mut().poll(cx) { + Poll::Ready(mode) => { + self.current_task = None; + Poll::Ready(Some(mode)) + } + Poll::Pending => Poll::Pending, + }; + } + + // Create a new task. + // Unwrapping is safe since we must have a handle at this point. + let selector = self.relay_selector.as_ref().unwrap(); + + let config = if Self::should_use_bridge(self.retry_attempt) { + selector + .get_api_bridge() + .map(|settings| match settings { + ProxySettings::Shadowsocks(ss_settings) => { + ApiConnectionMode::Proxied(ProxyConfig::Shadowsocks(ss_settings)) + } + _ => { + log::error!("Received unexpected proxy settings type"); + ApiConnectionMode::Direct + } + }) + .unwrap_or(ApiConnectionMode::Direct) + } else { + ApiConnectionMode::Direct + }; + + self.retry_attempt = self.retry_attempt.wrapping_add(1); + + let cache_dir = self.cache_dir.clone(); + self.current_task = Some(Box::pin(async move { + if let Err(error) = config.save(&cache_dir).await { + log::debug!( + "{}", + error.display_chain_with_msg("Failed to save API endpoint") + ); + } + config + })); + + return self.poll_next(cx); } +} - let ctx = Context { - attempt: 1, - daemon_sender, - }; +impl ApiConnectionModeProvider { + pub(crate) fn new(cache_dir: PathBuf) -> (Self, ApiConnectionModeProviderHandle) { + let (selector_tx, selector_rx) = oneshot::channel(); - Box::pin( - stream::once(async move { initial_config }).chain(stream::unfold( - ctx, - |mut ctx| async move { - ctx.attempt = ctx.attempt.wrapping_add(1); - let (response_tx, response_rx) = oneshot::channel(); + ( + Self { + cache_dir, - let _ = ctx.daemon_sender.send(ApiConnectionModeRequest { - response_tx, - retry_attempt: ctx.attempt, - }); + selector_rx: Some(selector_rx), - let new_config = response_rx.await.unwrap_or_else(|error| { - log::error!( - "{}", - error.display_chain_with_msg("Failed to receive API proxy config") - ); - // Fall back on unbridged connection - ApiConnectionMode::Direct - }); + relay_selector: None, + retry_attempt: 0, - Some((new_config, ctx)) + current_task: None, }, - )), - ) + ApiConnectionModeProviderHandle { tx: selector_tx }, + ) + } + + fn should_use_bridge(retry_attempt: u32) -> bool { + retry_attempt % 3 > 0 + } +} + +/// Used to initialize [ApiConnectionModeProvider]'s relay selector handle. +pub(crate) struct ApiConnectionModeProviderHandle { + tx: oneshot::Sender<RelaySelector>, +} + +impl ApiConnectionModeProviderHandle { + pub fn set_relay_selector(self, selector: RelaySelector) { + let _ = self.tx.send(selector); + } } /// Notifies the tunnel state machine that the API (real or proxied) endpoint has diff --git a/mullvad-daemon/src/lib.rs b/mullvad-daemon/src/lib.rs index 9db0afd719..54c6c66717 100644 --- a/mullvad-daemon/src/lib.rs +++ b/mullvad-daemon/src/lib.rs @@ -362,8 +362,6 @@ pub(crate) enum InternalDaemonEvent { TriggerShutdown, /// The background job fetching new `AppVersionInfo`s got a new info object. NewAppVersionInfo(AppVersionInfo), - /// Request from REST client to use a different API endpoint. - GenerateApiConnectionMode(api::ApiConnectionModeRequest), /// Sent when a device is updated in any way (key rotation, login, logout, etc.). DeviceEvent(InnerDeviceEvent), /// Handles updates from versions without devices. @@ -397,12 +395,6 @@ impl From<AppVersionInfo> for InternalDaemonEvent { } } -impl From<api::ApiConnectionModeRequest> for InternalDaemonEvent { - fn from(request: api::ApiConnectionModeRequest) -> Self { - InternalDaemonEvent::GenerateApiConnectionMode(request) - } -} - impl From<InnerDeviceEvent> for InternalDaemonEvent { fn from(event: InnerDeviceEvent) -> Self { InternalDaemonEvent::DeviceEvent(event) @@ -644,10 +636,8 @@ where let endpoint_updater = api::ApiEndpointUpdaterHandle::new(); - let proxy_provider = api::create_api_config_provider( - internal_event_tx.to_specialized_sender(), - ApiConnectionMode::Direct, - ); + let (proxy_provider, proxy_provider_handle) = + api::ApiConnectionModeProvider::new(cache_dir.clone()); let api_handle = api_runtime .mullvad_rest_handle(proxy_provider, endpoint_updater.callback()) .await; @@ -762,6 +752,8 @@ where let initial_selector_config = new_selector_config(&settings); let relay_selector = RelaySelector::new(initial_selector_config, &resource_dir, &cache_dir); + proxy_provider_handle.set_relay_selector(relay_selector.clone()); + let mut relay_list_updater = RelayListUpdater::new( relay_selector.clone(), api_handle.clone(), @@ -952,9 +944,6 @@ where NewAppVersionInfo(app_version_info) => { self.handle_new_app_version_info(app_version_info) } - GenerateApiConnectionMode(request) => { - self.handle_generate_api_connection_mode(request).await - } DeviceEvent(event) => self.handle_device_event(event).await, DeviceMigrationEvent(event) => self.handle_device_migration_event(event).await, #[cfg(windows)] @@ -1291,77 +1280,6 @@ where self.event_listener.notify_app_version(app_version_info); } - /// Returns the next API connection mode to use for reaching the API. - /// - /// When `mullvad-api` fails to contact the API, it requests a new connection mode - /// from this function, which will be used for future requests. The API can be - /// connected to either directly (i.e., [`ApiConnectionMode::Direct`]) or from - /// a bridge ([`ApiConnectionMode::Proxied`]). - /// - /// * Every 3rd attempt returns [`ApiConnectionMode::Direct`] (i.e., no bridge). - /// * For any other attempt, this function returns a configuration for the bridge that is - /// closest to the selected relay location[^note] and matches all bridge constraints. - /// * When no matching bridge is found, e.g. if the selected hosting providers don't match any - /// bridge, [`ApiConnectionMode::Direct`] is returned. - /// - /// [^note]: The "selected relay location" is the location of the last relay that - /// the daemon connected to, or, if no relay was connected to, the "midpoint" of - /// all relays that match the selected relay location constraint. - async fn handle_generate_api_connection_mode( - &mut self, - request: api::ApiConnectionModeRequest, - ) { - let location = self - .last_generated_relays - .as_ref() - .and_then(LastSelectedRelays::first_hop_coordinates) - .or_else(|| { - if let RelaySettings::Normal(settings) = self.settings.get_relay_settings() { - self.relay_selector.get_relay_midpoint(&settings) - } else { - None - } - }); - let bridge = if request.retry_attempt % 3 > 0 { - let constraints = match &self.settings.bridge_settings { - BridgeSettings::Normal(settings) => InternalBridgeConstraints { - location: settings.location.clone(), - providers: settings.providers.clone(), - transport_protocol: Constraint::Only(TransportProtocol::Tcp), - }, - _ => InternalBridgeConstraints { - location: Constraint::Any, - providers: Constraint::Any, - transport_protocol: Constraint::Only(TransportProtocol::Tcp), - }, - }; - self.relay_selector - .get_proxy_settings(&constraints, location) - } else { - None - }; - let config = match bridge { - Some((settings, _relay)) => match settings { - ProxySettings::Shadowsocks(ss_settings) => { - ApiConnectionMode::Proxied(ProxyConfig::Shadowsocks(ss_settings)) - } - _ => { - log::error!("Received unexpected proxy settings type"); - ApiConnectionMode::Direct - } - }, - None => ApiConnectionMode::Direct, - }; - - if let Err(error) = config.save(&self.cache_dir).await { - log::debug!( - "{}", - error.display_chain_with_msg("Failed to save API endpoint") - ); - } - let _ = request.response_tx.send(config); - } - async fn handle_device_event(&mut self, event: InnerDeviceEvent) { match &event { InnerDeviceEvent::Login(device) => { diff --git a/mullvad-relay-selector/src/lib.rs b/mullvad-relay-selector/src/lib.rs index 988e5a2368..6df586027c 100644 --- a/mullvad-relay-selector/src/lib.rs +++ b/mullvad-relay-selector/src/lib.rs @@ -671,21 +671,12 @@ impl RelaySelector { relay, }))) } - BridgeState::Auto => { - if let Some((settings, relay)) = self.get_auto_proxy_settings( - &bridge_constraints, - Some(location), - retry_attempt, - ) { - Ok(Some(SelectedBridge::Normal(NormalSelectedBridge { - settings, - relay, - }))) - } else { - Ok(None) - } - } - BridgeState::Off => Ok(None), + BridgeState::Auto if self.should_use_bridge(retry_attempt) => Ok(self + .get_proxy_settings(&bridge_constraints, Some(location)) + .map(|(settings, relay)| { + SelectedBridge::Normal(NormalSelectedBridge { settings, relay }) + })), + BridgeState::Auto | BridgeState::Off => Ok(None), } } BridgeSettings::Custom(bridge_settings) => match config.bridge_state { @@ -698,23 +689,30 @@ impl RelaySelector { } } - #[cfg(not(target_os = "android"))] - fn get_auto_proxy_settings<T: Into<Coordinates>>( - &self, - bridge_constraints: &InternalBridgeConstraints, - location: Option<T>, - retry_attempt: u32, - ) -> Option<(ProxySettings, Relay)> { - if !self.should_use_bridge(retry_attempt) { - return None; - } + /// Returns a bridge based on the relay and bridge constraints, ignoring the bridge state. + pub fn get_api_bridge(&self) -> Option<ProxySettings> { + let config = self.config.lock(); - // For now, only TCP tunnels are supported. - if let Constraint::Only(TransportProtocol::Udp) = bridge_constraints.transport_protocol { - return None; - } + let near_location = match &config.relay_settings { + RelaySettings::Normal(settings) => self.get_relay_midpoint(settings), + _ => None, + }; + + let constraints = match &config.bridge_settings { + BridgeSettings::Normal(settings) => InternalBridgeConstraints { + location: settings.location.clone(), + providers: settings.providers.clone(), + transport_protocol: Constraint::Only(TransportProtocol::Tcp), + }, + BridgeSettings::Custom(_bridge_settings) => InternalBridgeConstraints { + location: Constraint::Any, + providers: Constraint::Any, + transport_protocol: Constraint::Only(TransportProtocol::Tcp), + }, + }; - self.get_proxy_settings(bridge_constraints, location) + self.get_proxy_settings(&constraints, near_location) + .map(|(settings, _relay)| settings) } #[cfg(not(target_os = "android"))] |
