diff options
| author | David Lönnhager <david.l@mullvad.net> | 2022-02-28 17:59:49 +0100 |
|---|---|---|
| committer | David Lönnhager <david.l@mullvad.net> | 2022-03-01 15:30:23 +0100 |
| commit | d2bdcd5878ae6ea748e10fe8e430b80548ec2fb5 (patch) | |
| tree | 47690c0d69b5793eaf0348faaa4440fe92abf84e | |
| parent | bcf3278eeb1b63f2ff8fa6ee68ab4cc8bb8b76fd (diff) | |
| download | mullvadvpn-d2bdcd5878ae6ea748e10fe8e430b80548ec2fb5.tar.xz mullvadvpn-d2bdcd5878ae6ea748e10fe8e430b80548ec2fb5.zip | |
Add proxy config generator to daemon
| -rw-r--r-- | mullvad-daemon/src/api.rs | 54 | ||||
| -rw-r--r-- | mullvad-daemon/src/lib.rs | 120 | ||||
| -rw-r--r-- | mullvad-daemon/src/relays/mod.rs | 12 | ||||
| -rw-r--r-- | mullvad-problem-report/src/lib.rs | 86 | ||||
| -rw-r--r-- | mullvad-setup/src/main.rs | 16 |
5 files changed, 221 insertions, 67 deletions
diff --git a/mullvad-daemon/src/api.rs b/mullvad-daemon/src/api.rs new file mode 100644 index 0000000000..2ed689b418 --- /dev/null +++ b/mullvad-daemon/src/api.rs @@ -0,0 +1,54 @@ +use crate::DaemonEventSender; +use futures::{channel::oneshot, stream, Stream, StreamExt}; +use mullvad_rpc::proxy::ApiConnectionMode; +use talpid_core::mpsc::Sender; +use talpid_types::ErrorExt; + +pub(crate) struct ApiConnectionModeRequest { + pub response_tx: oneshot::Sender<ApiConnectionMode>, + pub retry_attempt: u32, +} + +/// Returns a stream that returns the next API bridge to try. +/// `initial_config` refers to the first config returned by the stream. The daemon is not notified +/// of this. +pub(crate) fn create_api_config_provider( + daemon_sender: DaemonEventSender<ApiConnectionModeRequest>, + initial_config: ApiConnectionMode, +) -> impl Stream<Item = ApiConnectionMode> + Unpin { + struct Context { + attempt: u32, + daemon_sender: DaemonEventSender<ApiConnectionModeRequest>, + } + + let ctx = Context { + attempt: 1, + daemon_sender, + }; + + Box::pin( + stream::once(async move { initial_config }).chain(stream::unfold( + ctx, + |mut ctx| async move { + ctx.attempt = ctx.attempt.wrapping_add(1); + let (response_tx, response_rx) = oneshot::channel(); + + let _ = ctx.daemon_sender.send(ApiConnectionModeRequest { + response_tx, + retry_attempt: ctx.attempt, + }); + + let new_config = response_rx.await.unwrap_or_else(|error| { + log::error!( + "{}", + error.display_chain_with_msg("Failed to receive API proxy config") + ); + // Fall back on unbridged connection + ApiConnectionMode::Direct + }); + + Some((new_config, ctx)) + }, + )), + ) +} diff --git a/mullvad-daemon/src/lib.rs b/mullvad-daemon/src/lib.rs index f8890abc9b..35b6f7ead7 100644 --- a/mullvad-daemon/src/lib.rs +++ b/mullvad-daemon/src/lib.rs @@ -6,6 +6,7 @@ extern crate serde; mod account; pub mod account_history; +mod api; pub mod exception_logging; #[cfg(target_os = "macos")] pub mod exclusion_gid; @@ -29,7 +30,10 @@ use futures::{ future::{abortable, AbortHandle, Future}, StreamExt, }; -use mullvad_rpc::availability::ApiAvailabilityHandle; +use mullvad_rpc::{ + availability::ApiAvailabilityHandle, + proxy::{ApiConnectionMode, ProxyConfig}, +}; use mullvad_types::{ account::{AccountData, AccountToken, VoucherSubmission}, endpoint::MullvadEndpoint, @@ -54,7 +58,7 @@ use std::{collections::HashSet, ffi::OsString}; use std::{ marker::PhantomData, mem, - net::{IpAddr, Ipv4Addr}, + net::{IpAddr, Ipv4Addr, SocketAddr}, path::PathBuf, pin::Pin, sync::{mpsc as sync_mpsc, Arc, Weak}, @@ -70,8 +74,8 @@ use talpid_core::{ use talpid_types::android::AndroidContext; use talpid_types::{ net::{ - openvpn, AllowedEndpoint, Endpoint, TransportProtocol, TunnelEndpoint, TunnelParameters, - TunnelType, + openvpn::{self, ProxySettings}, + AllowedEndpoint, Endpoint, TransportProtocol, TunnelEndpoint, TunnelParameters, TunnelType, }, tunnel::{ErrorStateCause, ParameterGenerationError, TunnelStateTransition}, ErrorExt, @@ -327,6 +331,8 @@ pub(crate) enum InternalDaemonEvent { NewAccountEvent(AccountToken, oneshot::Sender<Result<String, Error>>), /// The background job fetching new `AppVersionInfo`s got a new info object. NewAppVersionInfo(AppVersionInfo), + /// Request from REST client to use a different API endpoint. + GenerateApiConnectionMode(api::ApiConnectionModeRequest), /// The split tunnel paths or state were updated. #[cfg(target_os = "windows")] ExcludedPathsEvent(ExcludedPathsUpdate, oneshot::Sender<Result<(), Error>>), @@ -356,6 +362,12 @@ impl From<AppVersionInfo> for InternalDaemonEvent { } } +impl From<api::ApiConnectionModeRequest> for InternalDaemonEvent { + fn from(request: api::ApiConnectionModeRequest) -> Self { + InternalDaemonEvent::GenerateApiConnectionMode(request) + } +} + #[derive(Clone, Debug, Eq, PartialEq)] enum DaemonExecutionState { Running, @@ -546,6 +558,7 @@ pub struct Daemon<L: EventListener> { app_version_info: Option<AppVersionInfo>, shutdown_tasks: Vec<Pin<Box<dyn Future<Output = ()>>>>, tunnel_state_machine_handle: tunnel_state_machine::JoinHandle, + cache_dir: PathBuf, #[cfg(target_os = "windows")] volume_update_tx: mpsc::UnboundedSender<()>, } @@ -569,6 +582,8 @@ where exclusion_gid::set_exclusion_gid().map_err(Error::GroupIdError)? }; + mullvad_rpc::proxy::ApiConnectionMode::try_delete_cache(&cache_dir).await; + let runtime = tokio::runtime::Handle::current(); let (internal_event_tx, internal_event_rx) = command_channel.destructure(); @@ -606,7 +621,7 @@ where vec![] }; - let mut rpc_runtime = mullvad_rpc::MullvadRpcRuntime::with_cache( + let rpc_runtime = mullvad_rpc::MullvadRpcRuntime::with_cache( &cache_dir, true, #[cfg(target_os = "android")] @@ -619,7 +634,7 @@ where api_availability.suspend(); let initial_api_endpoint = - Self::get_allowed_endpoint(rpc_runtime.address_cache.get_address()); + Self::get_allowed_endpoint(rpc_runtime.address_cache.get_address().await); let (offline_state_tx, offline_state_rx) = mpsc::unbounded(); #[cfg(target_os = "windows")] @@ -649,25 +664,37 @@ where .await .map_err(Error::TunnelError)?; - let address_change_runtime = runtime.clone(); - let tunnel_cmd_weak_tx = Arc::downgrade(&tunnel_command_tx); - rpc_runtime.set_address_change_listener(move |address| { - let (result_tx, result_rx) = oneshot::channel(); - let tx = tunnel_cmd_weak_tx.clone(); - address_change_runtime.block_on(async move { - if let Some(tx) = tx.upgrade() { - let _ = tx.unbounded_send(TunnelCommand::AllowEndpoint( - Self::get_allowed_endpoint(address), + let api_endpoint_tunnel_tx = Arc::downgrade(&tunnel_command_tx); + let api_endpoint_handler = move |address: SocketAddr| { + let tunnel_tx = api_endpoint_tunnel_tx.clone(); + async move { + let (result_tx, result_rx) = oneshot::channel(); + if let Some(tunnel_tx) = tunnel_tx.upgrade() { + let _ = tunnel_tx.unbounded_send(TunnelCommand::AllowEndpoint( + Self::get_allowed_endpoint(address.clone()), result_tx, )); - result_rx.await.map_err(|_| ()) + if result_rx.await.is_ok() { + log::debug!("API endpoint: {}", address); + true + } else { + log::error!("Failed to update allowed endpoint"); + false + } } else { - Err(()) + log::error!("Tunnel state machine is down"); + false } - }) - }); + } + }; - let rpc_handle = rpc_runtime.mullvad_rest_handle(); + let proxy_provider = api::create_api_config_provider( + internal_event_tx.to_specialized_sender(), + ApiConnectionMode::Direct, + ); + let rpc_handle = rpc_runtime + .mullvad_rest_handle(proxy_provider, api_endpoint_handler) + .await; Self::forward_offline_state(api_availability.clone(), offline_state_rx).await; @@ -740,6 +767,7 @@ where app_version_info, shutdown_tasks: vec![], tunnel_state_machine_handle, + cache_dir, #[cfg(target_os = "windows")] volume_update_tx, }; @@ -902,6 +930,9 @@ where NewAppVersionInfo(app_version_info) => { self.handle_new_app_version_info(app_version_info) } + GenerateApiConnectionMode(request) => { + self.handle_generate_api_connection_mode(request).await + } #[cfg(windows)] ExcludedPathsEvent(update, tx) => self.handle_new_excluded_paths(update, tx).await, } @@ -1072,7 +1103,7 @@ where BridgeState::On => { let (bridge_settings, bridge_relay) = self .relay_selector - .get_proxy_settings(&bridge_constraints, location) + .get_proxy_settings(&bridge_constraints, Some(location)) .ok_or(Error::NoBridgeAvailable)?; self.last_generated_bridge_relay = Some(bridge_relay); Some(bridge_settings) @@ -1081,7 +1112,7 @@ where if let Some((bridge_settings, bridge_relay)) = self.relay_selector.get_auto_proxy_settings( &bridge_constraints, - location, + Some(location), retry_attempt, ) { @@ -1348,6 +1379,43 @@ where self.event_listener.notify_app_version(app_version_info); } + async fn handle_generate_api_connection_mode( + &mut self, + request: api::ApiConnectionModeRequest, + ) { + let constraints = InternalBridgeConstraints { + location: Constraint::Any, + providers: Constraint::Any, + transport_protocol: Constraint::Only(TransportProtocol::Tcp), + }; + + let bridge = if request.retry_attempt % 3 > 0 { + self.relay_selector.get_proxy_settings(&constraints, None) + } else { + None + }; + let config = match bridge { + Some((settings, _relay)) => match settings { + ProxySettings::Shadowsocks(ss_settings) => { + ApiConnectionMode::Proxied(ProxyConfig::Shadowsocks(ss_settings)) + } + _ => { + log::error!("Received unexpected proxy settings type"); + ApiConnectionMode::Direct + } + }, + None => ApiConnectionMode::Direct, + }; + + if let Err(error) = config.save(&self.cache_dir).await { + log::debug!( + "{}", + error.display_chain_with_msg("Failed to save API endpoint") + ); + } + let _ = request.response_tx.send(config); + } + #[cfg(windows)] async fn handle_new_excluded_paths( &mut self, @@ -1406,7 +1474,7 @@ where match &self.tunnel_state { Disconnected => { - let location = self.get_geo_location(); + let location = self.get_geo_location().await; tokio::spawn(async { Self::oneshot_send(tx, location.await.ok(), "current location"); }); @@ -1419,7 +1487,7 @@ where } Connected { location, .. } => { let relay_location = location.clone(); - let location_future = self.get_geo_location(); + let location_future = self.get_geo_location().await; tokio::spawn(async { let location = location_future.await; Self::oneshot_send( @@ -1440,8 +1508,8 @@ where } } - fn get_geo_location(&mut self) -> impl Future<Output = Result<GeoIpLocation, ()>> { - let rpc_service = self.rpc_runtime.rest_handle(); + async fn get_geo_location(&mut self) -> impl Future<Output = Result<GeoIpLocation, ()>> { + let rpc_service = self.rpc_runtime.rest_handle().await; async { geoip::send_location_request(rpc_service) .await diff --git a/mullvad-daemon/src/relays/mod.rs b/mullvad-daemon/src/relays/mod.rs index 4f96a6ddcd..43bd2a697a 100644 --- a/mullvad-daemon/src/relays/mod.rs +++ b/mullvad-daemon/src/relays/mod.rs @@ -638,7 +638,7 @@ impl RelaySelector { pub fn get_auto_proxy_settings( &mut self, bridge_constraints: &InternalBridgeConstraints, - location: &Location, + location: Option<&Location>, retry_attempt: u32, ) -> Option<(ProxySettings, Relay)> { if !self.should_use_bridge(retry_attempt) { @@ -666,7 +666,7 @@ impl RelaySelector { pub fn get_proxy_settings( &mut self, constraints: &InternalBridgeConstraints, - location: &Location, + location: Option<&Location>, ) -> Option<(ProxySettings, Relay)> { let mut matching_relays: Vec<Relay> = self .parsed_relays @@ -681,9 +681,11 @@ impl RelaySelector { return None; } - matching_relays.sort_by_cached_key(|relay| { - (relay.location.as_ref().unwrap().distance_from(&location) * 1000.0) as i64 - }); + if let Some(location) = location { + matching_relays.sort_by_cached_key(|relay| { + (relay.location.as_ref().unwrap().distance_from(&location) * 1000.0) as i64 + }); + } matching_relays.get(0).and_then(|relay| { self.pick_random_bridge(&relay) .map(|bridge| (bridge, relay.clone())) diff --git a/mullvad-problem-report/src/lib.rs b/mullvad-problem-report/src/lib.rs index b652a0411f..4b9def6adb 100644 --- a/mullvad-problem-report/src/lib.rs +++ b/mullvad-problem-report/src/lib.rs @@ -1,6 +1,7 @@ #![deny(rust_2018_idioms)] use lazy_static::lazy_static; +use mullvad_rpc::proxy::ApiConnectionMode; use regex::Regex; use std::{ borrow::Cow, @@ -270,49 +271,70 @@ pub fn send_problem_report( } })?, ); - let metadata = - ProblemReport::parse_metadata(&report_content).unwrap_or_else(|| metadata::collect()); let runtime = tokio::runtime::Builder::new_multi_thread() .worker_threads(2) .enable_all() .build() .map_err(Error::CreateRuntime)?; + runtime.block_on(send_problem_report_inner( + user_email, + user_message, + &report_content, + cache_dir, + )) +} - let mut rpc_manager = runtime - .block_on(mullvad_rpc::MullvadRpcRuntime::with_cache( - cache_dir, - false, - #[cfg(target_os = "android")] - None, - )) - .map_err(Error::CreateRpcClientError)?; - let rpc_client = mullvad_rpc::ProblemReportProxy::new(rpc_manager.mullvad_rest_handle()); +async fn send_problem_report_inner( + user_email: &str, + user_message: &str, + report_content: &str, + cache_dir: &Path, +) -> Result<(), Error> { + let metadata = + ProblemReport::parse_metadata(&report_content).unwrap_or_else(|| metadata::collect()); + let rpc_runtime = mullvad_rpc::MullvadRpcRuntime::with_cache( + cache_dir, + false, + #[cfg(target_os = "android")] + None, + ) + .await + .map_err(Error::CreateRpcClientError)?; - runtime.block_on(async move { - for _attempt in 0..MAX_SEND_ATTEMPTS { - match rpc_client - .problem_report(user_email, user_message, &report_content, &metadata) - .await - { - Ok(()) => { - return Ok(()); - } - Err(error) => { - if !error.is_network_error() { - return Err(Error::SendProblemReportError(error)); - } - log::error!( - "{}", - error.display_chain_with_msg( - "Failed to send problem report due to network error" - ) - ); + let rpc_client = mullvad_rpc::ProblemReportProxy::new( + rpc_runtime + .mullvad_rest_handle( + ApiConnectionMode::try_from_cache(cache_dir) + .await + .into_repeat(), + |_| async { true }, + ) + .await, + ); + + for _attempt in 0..MAX_SEND_ATTEMPTS { + match rpc_client + .problem_report(user_email, user_message, &report_content, &metadata) + .await + { + Ok(()) => { + return Ok(()); + } + Err(error) => { + if !error.is_network_error() { + return Err(Error::SendProblemReportError(error)); } + log::error!( + "{}", + error.display_chain_with_msg( + "Failed to send problem report due to network error" + ) + ); } } - Err(Error::SendFailedTooManyTimes) - }) + } + Err(Error::SendFailedTooManyTimes) } fn write_problem_report(path: &Path, problem_report: &ProblemReport) -> io::Result<()> { diff --git a/mullvad-setup/src/main.rs b/mullvad-setup/src/main.rs index 36e1004742..e65b1278f8 100644 --- a/mullvad-setup/src/main.rs +++ b/mullvad-setup/src/main.rs @@ -1,6 +1,6 @@ use clap::{crate_authors, crate_description, crate_name, App}; use mullvad_management_interface::new_rpc_client; -use mullvad_rpc::MullvadRpcRuntime; +use mullvad_rpc::{proxy::ApiConnectionMode, MullvadRpcRuntime}; use mullvad_types::version::ParsedAppVersion; use std::{path::PathBuf, process, time::Duration}; use talpid_core::{ @@ -165,11 +165,19 @@ async fn remove_wireguard_key() -> Result<(), Error> { if let Some(token) = settings.get_account_token() { if let Some(wg_data) = settings.get_wireguard() { - let mut rpc_runtime = MullvadRpcRuntime::with_cache(&cache_path, false) + let rpc_runtime = MullvadRpcRuntime::with_cache(&cache_path, false) .await .map_err(Error::RpcInitializationError)?; - let mut key_proxy = - mullvad_rpc::WireguardKeyProxy::new(rpc_runtime.mullvad_rest_handle()); + let mut key_proxy = mullvad_rpc::WireguardKeyProxy::new( + rpc_runtime + .mullvad_rest_handle( + ApiConnectionMode::try_from_cache(&cache_path) + .await + .into_repeat(), + |_| async { true }, + ) + .await, + ); retry_future_n( move || { key_proxy.remove_wireguard_key(token.clone(), wg_data.private_key.public_key()) |
