summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorDavid Lönnhager <david.l@mullvad.net>2022-02-28 17:59:49 +0100
committerDavid Lönnhager <david.l@mullvad.net>2022-03-01 15:30:23 +0100
commitd2bdcd5878ae6ea748e10fe8e430b80548ec2fb5 (patch)
tree47690c0d69b5793eaf0348faaa4440fe92abf84e
parentbcf3278eeb1b63f2ff8fa6ee68ab4cc8bb8b76fd (diff)
downloadmullvadvpn-d2bdcd5878ae6ea748e10fe8e430b80548ec2fb5.tar.xz
mullvadvpn-d2bdcd5878ae6ea748e10fe8e430b80548ec2fb5.zip
Add proxy config generator to daemon
-rw-r--r--mullvad-daemon/src/api.rs54
-rw-r--r--mullvad-daemon/src/lib.rs120
-rw-r--r--mullvad-daemon/src/relays/mod.rs12
-rw-r--r--mullvad-problem-report/src/lib.rs86
-rw-r--r--mullvad-setup/src/main.rs16
5 files changed, 221 insertions, 67 deletions
diff --git a/mullvad-daemon/src/api.rs b/mullvad-daemon/src/api.rs
new file mode 100644
index 0000000000..2ed689b418
--- /dev/null
+++ b/mullvad-daemon/src/api.rs
@@ -0,0 +1,54 @@
+use crate::DaemonEventSender;
+use futures::{channel::oneshot, stream, Stream, StreamExt};
+use mullvad_rpc::proxy::ApiConnectionMode;
+use talpid_core::mpsc::Sender;
+use talpid_types::ErrorExt;
+
+pub(crate) struct ApiConnectionModeRequest {
+ pub response_tx: oneshot::Sender<ApiConnectionMode>,
+ pub retry_attempt: u32,
+}
+
+/// Returns a stream that returns the next API bridge to try.
+/// `initial_config` refers to the first config returned by the stream. The daemon is not notified
+/// of this.
+pub(crate) fn create_api_config_provider(
+ daemon_sender: DaemonEventSender<ApiConnectionModeRequest>,
+ initial_config: ApiConnectionMode,
+) -> impl Stream<Item = ApiConnectionMode> + Unpin {
+ struct Context {
+ attempt: u32,
+ daemon_sender: DaemonEventSender<ApiConnectionModeRequest>,
+ }
+
+ let ctx = Context {
+ attempt: 1,
+ daemon_sender,
+ };
+
+ Box::pin(
+ stream::once(async move { initial_config }).chain(stream::unfold(
+ ctx,
+ |mut ctx| async move {
+ ctx.attempt = ctx.attempt.wrapping_add(1);
+ let (response_tx, response_rx) = oneshot::channel();
+
+ let _ = ctx.daemon_sender.send(ApiConnectionModeRequest {
+ response_tx,
+ retry_attempt: ctx.attempt,
+ });
+
+ let new_config = response_rx.await.unwrap_or_else(|error| {
+ log::error!(
+ "{}",
+ error.display_chain_with_msg("Failed to receive API proxy config")
+ );
+ // Fall back on unbridged connection
+ ApiConnectionMode::Direct
+ });
+
+ Some((new_config, ctx))
+ },
+ )),
+ )
+}
diff --git a/mullvad-daemon/src/lib.rs b/mullvad-daemon/src/lib.rs
index f8890abc9b..35b6f7ead7 100644
--- a/mullvad-daemon/src/lib.rs
+++ b/mullvad-daemon/src/lib.rs
@@ -6,6 +6,7 @@ extern crate serde;
mod account;
pub mod account_history;
+mod api;
pub mod exception_logging;
#[cfg(target_os = "macos")]
pub mod exclusion_gid;
@@ -29,7 +30,10 @@ use futures::{
future::{abortable, AbortHandle, Future},
StreamExt,
};
-use mullvad_rpc::availability::ApiAvailabilityHandle;
+use mullvad_rpc::{
+ availability::ApiAvailabilityHandle,
+ proxy::{ApiConnectionMode, ProxyConfig},
+};
use mullvad_types::{
account::{AccountData, AccountToken, VoucherSubmission},
endpoint::MullvadEndpoint,
@@ -54,7 +58,7 @@ use std::{collections::HashSet, ffi::OsString};
use std::{
marker::PhantomData,
mem,
- net::{IpAddr, Ipv4Addr},
+ net::{IpAddr, Ipv4Addr, SocketAddr},
path::PathBuf,
pin::Pin,
sync::{mpsc as sync_mpsc, Arc, Weak},
@@ -70,8 +74,8 @@ use talpid_core::{
use talpid_types::android::AndroidContext;
use talpid_types::{
net::{
- openvpn, AllowedEndpoint, Endpoint, TransportProtocol, TunnelEndpoint, TunnelParameters,
- TunnelType,
+ openvpn::{self, ProxySettings},
+ AllowedEndpoint, Endpoint, TransportProtocol, TunnelEndpoint, TunnelParameters, TunnelType,
},
tunnel::{ErrorStateCause, ParameterGenerationError, TunnelStateTransition},
ErrorExt,
@@ -327,6 +331,8 @@ pub(crate) enum InternalDaemonEvent {
NewAccountEvent(AccountToken, oneshot::Sender<Result<String, Error>>),
/// The background job fetching new `AppVersionInfo`s got a new info object.
NewAppVersionInfo(AppVersionInfo),
+ /// Request from REST client to use a different API endpoint.
+ GenerateApiConnectionMode(api::ApiConnectionModeRequest),
/// The split tunnel paths or state were updated.
#[cfg(target_os = "windows")]
ExcludedPathsEvent(ExcludedPathsUpdate, oneshot::Sender<Result<(), Error>>),
@@ -356,6 +362,12 @@ impl From<AppVersionInfo> for InternalDaemonEvent {
}
}
+impl From<api::ApiConnectionModeRequest> for InternalDaemonEvent {
+ fn from(request: api::ApiConnectionModeRequest) -> Self {
+ InternalDaemonEvent::GenerateApiConnectionMode(request)
+ }
+}
+
#[derive(Clone, Debug, Eq, PartialEq)]
enum DaemonExecutionState {
Running,
@@ -546,6 +558,7 @@ pub struct Daemon<L: EventListener> {
app_version_info: Option<AppVersionInfo>,
shutdown_tasks: Vec<Pin<Box<dyn Future<Output = ()>>>>,
tunnel_state_machine_handle: tunnel_state_machine::JoinHandle,
+ cache_dir: PathBuf,
#[cfg(target_os = "windows")]
volume_update_tx: mpsc::UnboundedSender<()>,
}
@@ -569,6 +582,8 @@ where
exclusion_gid::set_exclusion_gid().map_err(Error::GroupIdError)?
};
+ mullvad_rpc::proxy::ApiConnectionMode::try_delete_cache(&cache_dir).await;
+
let runtime = tokio::runtime::Handle::current();
let (internal_event_tx, internal_event_rx) = command_channel.destructure();
@@ -606,7 +621,7 @@ where
vec![]
};
- let mut rpc_runtime = mullvad_rpc::MullvadRpcRuntime::with_cache(
+ let rpc_runtime = mullvad_rpc::MullvadRpcRuntime::with_cache(
&cache_dir,
true,
#[cfg(target_os = "android")]
@@ -619,7 +634,7 @@ where
api_availability.suspend();
let initial_api_endpoint =
- Self::get_allowed_endpoint(rpc_runtime.address_cache.get_address());
+ Self::get_allowed_endpoint(rpc_runtime.address_cache.get_address().await);
let (offline_state_tx, offline_state_rx) = mpsc::unbounded();
#[cfg(target_os = "windows")]
@@ -649,25 +664,37 @@ where
.await
.map_err(Error::TunnelError)?;
- let address_change_runtime = runtime.clone();
- let tunnel_cmd_weak_tx = Arc::downgrade(&tunnel_command_tx);
- rpc_runtime.set_address_change_listener(move |address| {
- let (result_tx, result_rx) = oneshot::channel();
- let tx = tunnel_cmd_weak_tx.clone();
- address_change_runtime.block_on(async move {
- if let Some(tx) = tx.upgrade() {
- let _ = tx.unbounded_send(TunnelCommand::AllowEndpoint(
- Self::get_allowed_endpoint(address),
+ let api_endpoint_tunnel_tx = Arc::downgrade(&tunnel_command_tx);
+ let api_endpoint_handler = move |address: SocketAddr| {
+ let tunnel_tx = api_endpoint_tunnel_tx.clone();
+ async move {
+ let (result_tx, result_rx) = oneshot::channel();
+ if let Some(tunnel_tx) = tunnel_tx.upgrade() {
+ let _ = tunnel_tx.unbounded_send(TunnelCommand::AllowEndpoint(
+ Self::get_allowed_endpoint(address.clone()),
result_tx,
));
- result_rx.await.map_err(|_| ())
+ if result_rx.await.is_ok() {
+ log::debug!("API endpoint: {}", address);
+ true
+ } else {
+ log::error!("Failed to update allowed endpoint");
+ false
+ }
} else {
- Err(())
+ log::error!("Tunnel state machine is down");
+ false
}
- })
- });
+ }
+ };
- let rpc_handle = rpc_runtime.mullvad_rest_handle();
+ let proxy_provider = api::create_api_config_provider(
+ internal_event_tx.to_specialized_sender(),
+ ApiConnectionMode::Direct,
+ );
+ let rpc_handle = rpc_runtime
+ .mullvad_rest_handle(proxy_provider, api_endpoint_handler)
+ .await;
Self::forward_offline_state(api_availability.clone(), offline_state_rx).await;
@@ -740,6 +767,7 @@ where
app_version_info,
shutdown_tasks: vec![],
tunnel_state_machine_handle,
+ cache_dir,
#[cfg(target_os = "windows")]
volume_update_tx,
};
@@ -902,6 +930,9 @@ where
NewAppVersionInfo(app_version_info) => {
self.handle_new_app_version_info(app_version_info)
}
+ GenerateApiConnectionMode(request) => {
+ self.handle_generate_api_connection_mode(request).await
+ }
#[cfg(windows)]
ExcludedPathsEvent(update, tx) => self.handle_new_excluded_paths(update, tx).await,
}
@@ -1072,7 +1103,7 @@ where
BridgeState::On => {
let (bridge_settings, bridge_relay) = self
.relay_selector
- .get_proxy_settings(&bridge_constraints, location)
+ .get_proxy_settings(&bridge_constraints, Some(location))
.ok_or(Error::NoBridgeAvailable)?;
self.last_generated_bridge_relay = Some(bridge_relay);
Some(bridge_settings)
@@ -1081,7 +1112,7 @@ where
if let Some((bridge_settings, bridge_relay)) =
self.relay_selector.get_auto_proxy_settings(
&bridge_constraints,
- location,
+ Some(location),
retry_attempt,
)
{
@@ -1348,6 +1379,43 @@ where
self.event_listener.notify_app_version(app_version_info);
}
+ async fn handle_generate_api_connection_mode(
+ &mut self,
+ request: api::ApiConnectionModeRequest,
+ ) {
+ let constraints = InternalBridgeConstraints {
+ location: Constraint::Any,
+ providers: Constraint::Any,
+ transport_protocol: Constraint::Only(TransportProtocol::Tcp),
+ };
+
+ let bridge = if request.retry_attempt % 3 > 0 {
+ self.relay_selector.get_proxy_settings(&constraints, None)
+ } else {
+ None
+ };
+ let config = match bridge {
+ Some((settings, _relay)) => match settings {
+ ProxySettings::Shadowsocks(ss_settings) => {
+ ApiConnectionMode::Proxied(ProxyConfig::Shadowsocks(ss_settings))
+ }
+ _ => {
+ log::error!("Received unexpected proxy settings type");
+ ApiConnectionMode::Direct
+ }
+ },
+ None => ApiConnectionMode::Direct,
+ };
+
+ if let Err(error) = config.save(&self.cache_dir).await {
+ log::debug!(
+ "{}",
+ error.display_chain_with_msg("Failed to save API endpoint")
+ );
+ }
+ let _ = request.response_tx.send(config);
+ }
+
#[cfg(windows)]
async fn handle_new_excluded_paths(
&mut self,
@@ -1406,7 +1474,7 @@ where
match &self.tunnel_state {
Disconnected => {
- let location = self.get_geo_location();
+ let location = self.get_geo_location().await;
tokio::spawn(async {
Self::oneshot_send(tx, location.await.ok(), "current location");
});
@@ -1419,7 +1487,7 @@ where
}
Connected { location, .. } => {
let relay_location = location.clone();
- let location_future = self.get_geo_location();
+ let location_future = self.get_geo_location().await;
tokio::spawn(async {
let location = location_future.await;
Self::oneshot_send(
@@ -1440,8 +1508,8 @@ where
}
}
- fn get_geo_location(&mut self) -> impl Future<Output = Result<GeoIpLocation, ()>> {
- let rpc_service = self.rpc_runtime.rest_handle();
+ async fn get_geo_location(&mut self) -> impl Future<Output = Result<GeoIpLocation, ()>> {
+ let rpc_service = self.rpc_runtime.rest_handle().await;
async {
geoip::send_location_request(rpc_service)
.await
diff --git a/mullvad-daemon/src/relays/mod.rs b/mullvad-daemon/src/relays/mod.rs
index 4f96a6ddcd..43bd2a697a 100644
--- a/mullvad-daemon/src/relays/mod.rs
+++ b/mullvad-daemon/src/relays/mod.rs
@@ -638,7 +638,7 @@ impl RelaySelector {
pub fn get_auto_proxy_settings(
&mut self,
bridge_constraints: &InternalBridgeConstraints,
- location: &Location,
+ location: Option<&Location>,
retry_attempt: u32,
) -> Option<(ProxySettings, Relay)> {
if !self.should_use_bridge(retry_attempt) {
@@ -666,7 +666,7 @@ impl RelaySelector {
pub fn get_proxy_settings(
&mut self,
constraints: &InternalBridgeConstraints,
- location: &Location,
+ location: Option<&Location>,
) -> Option<(ProxySettings, Relay)> {
let mut matching_relays: Vec<Relay> = self
.parsed_relays
@@ -681,9 +681,11 @@ impl RelaySelector {
return None;
}
- matching_relays.sort_by_cached_key(|relay| {
- (relay.location.as_ref().unwrap().distance_from(&location) * 1000.0) as i64
- });
+ if let Some(location) = location {
+ matching_relays.sort_by_cached_key(|relay| {
+ (relay.location.as_ref().unwrap().distance_from(&location) * 1000.0) as i64
+ });
+ }
matching_relays.get(0).and_then(|relay| {
self.pick_random_bridge(&relay)
.map(|bridge| (bridge, relay.clone()))
diff --git a/mullvad-problem-report/src/lib.rs b/mullvad-problem-report/src/lib.rs
index b652a0411f..4b9def6adb 100644
--- a/mullvad-problem-report/src/lib.rs
+++ b/mullvad-problem-report/src/lib.rs
@@ -1,6 +1,7 @@
#![deny(rust_2018_idioms)]
use lazy_static::lazy_static;
+use mullvad_rpc::proxy::ApiConnectionMode;
use regex::Regex;
use std::{
borrow::Cow,
@@ -270,49 +271,70 @@ pub fn send_problem_report(
}
})?,
);
- let metadata =
- ProblemReport::parse_metadata(&report_content).unwrap_or_else(|| metadata::collect());
let runtime = tokio::runtime::Builder::new_multi_thread()
.worker_threads(2)
.enable_all()
.build()
.map_err(Error::CreateRuntime)?;
+ runtime.block_on(send_problem_report_inner(
+ user_email,
+ user_message,
+ &report_content,
+ cache_dir,
+ ))
+}
- let mut rpc_manager = runtime
- .block_on(mullvad_rpc::MullvadRpcRuntime::with_cache(
- cache_dir,
- false,
- #[cfg(target_os = "android")]
- None,
- ))
- .map_err(Error::CreateRpcClientError)?;
- let rpc_client = mullvad_rpc::ProblemReportProxy::new(rpc_manager.mullvad_rest_handle());
+async fn send_problem_report_inner(
+ user_email: &str,
+ user_message: &str,
+ report_content: &str,
+ cache_dir: &Path,
+) -> Result<(), Error> {
+ let metadata =
+ ProblemReport::parse_metadata(&report_content).unwrap_or_else(|| metadata::collect());
+ let rpc_runtime = mullvad_rpc::MullvadRpcRuntime::with_cache(
+ cache_dir,
+ false,
+ #[cfg(target_os = "android")]
+ None,
+ )
+ .await
+ .map_err(Error::CreateRpcClientError)?;
- runtime.block_on(async move {
- for _attempt in 0..MAX_SEND_ATTEMPTS {
- match rpc_client
- .problem_report(user_email, user_message, &report_content, &metadata)
- .await
- {
- Ok(()) => {
- return Ok(());
- }
- Err(error) => {
- if !error.is_network_error() {
- return Err(Error::SendProblemReportError(error));
- }
- log::error!(
- "{}",
- error.display_chain_with_msg(
- "Failed to send problem report due to network error"
- )
- );
+ let rpc_client = mullvad_rpc::ProblemReportProxy::new(
+ rpc_runtime
+ .mullvad_rest_handle(
+ ApiConnectionMode::try_from_cache(cache_dir)
+ .await
+ .into_repeat(),
+ |_| async { true },
+ )
+ .await,
+ );
+
+ for _attempt in 0..MAX_SEND_ATTEMPTS {
+ match rpc_client
+ .problem_report(user_email, user_message, &report_content, &metadata)
+ .await
+ {
+ Ok(()) => {
+ return Ok(());
+ }
+ Err(error) => {
+ if !error.is_network_error() {
+ return Err(Error::SendProblemReportError(error));
}
+ log::error!(
+ "{}",
+ error.display_chain_with_msg(
+ "Failed to send problem report due to network error"
+ )
+ );
}
}
- Err(Error::SendFailedTooManyTimes)
- })
+ }
+ Err(Error::SendFailedTooManyTimes)
}
fn write_problem_report(path: &Path, problem_report: &ProblemReport) -> io::Result<()> {
diff --git a/mullvad-setup/src/main.rs b/mullvad-setup/src/main.rs
index 36e1004742..e65b1278f8 100644
--- a/mullvad-setup/src/main.rs
+++ b/mullvad-setup/src/main.rs
@@ -1,6 +1,6 @@
use clap::{crate_authors, crate_description, crate_name, App};
use mullvad_management_interface::new_rpc_client;
-use mullvad_rpc::MullvadRpcRuntime;
+use mullvad_rpc::{proxy::ApiConnectionMode, MullvadRpcRuntime};
use mullvad_types::version::ParsedAppVersion;
use std::{path::PathBuf, process, time::Duration};
use talpid_core::{
@@ -165,11 +165,19 @@ async fn remove_wireguard_key() -> Result<(), Error> {
if let Some(token) = settings.get_account_token() {
if let Some(wg_data) = settings.get_wireguard() {
- let mut rpc_runtime = MullvadRpcRuntime::with_cache(&cache_path, false)
+ let rpc_runtime = MullvadRpcRuntime::with_cache(&cache_path, false)
.await
.map_err(Error::RpcInitializationError)?;
- let mut key_proxy =
- mullvad_rpc::WireguardKeyProxy::new(rpc_runtime.mullvad_rest_handle());
+ let mut key_proxy = mullvad_rpc::WireguardKeyProxy::new(
+ rpc_runtime
+ .mullvad_rest_handle(
+ ApiConnectionMode::try_from_cache(&cache_path)
+ .await
+ .into_repeat(),
+ |_| async { true },
+ )
+ .await,
+ );
retry_future_n(
move || {
key_proxy.remove_wireguard_key(token.clone(), wg_data.private_key.public_key())