summaryrefslogtreecommitdiffhomepage
path: root/mullvad-daemon/src/lib.rs
diff options
context:
space:
mode:
authorDavid Lönnhager <david.l@mullvad.net>2022-02-28 17:59:49 +0100
committerDavid Lönnhager <david.l@mullvad.net>2022-03-01 15:30:23 +0100
commitd2bdcd5878ae6ea748e10fe8e430b80548ec2fb5 (patch)
tree47690c0d69b5793eaf0348faaa4440fe92abf84e /mullvad-daemon/src/lib.rs
parentbcf3278eeb1b63f2ff8fa6ee68ab4cc8bb8b76fd (diff)
downloadmullvadvpn-d2bdcd5878ae6ea748e10fe8e430b80548ec2fb5.tar.xz
mullvadvpn-d2bdcd5878ae6ea748e10fe8e430b80548ec2fb5.zip
Add proxy config generator to daemon
Diffstat (limited to 'mullvad-daemon/src/lib.rs')
-rw-r--r--mullvad-daemon/src/lib.rs120
1 files changed, 94 insertions, 26 deletions
diff --git a/mullvad-daemon/src/lib.rs b/mullvad-daemon/src/lib.rs
index f8890abc9b..35b6f7ead7 100644
--- a/mullvad-daemon/src/lib.rs
+++ b/mullvad-daemon/src/lib.rs
@@ -6,6 +6,7 @@ extern crate serde;
mod account;
pub mod account_history;
+mod api;
pub mod exception_logging;
#[cfg(target_os = "macos")]
pub mod exclusion_gid;
@@ -29,7 +30,10 @@ use futures::{
future::{abortable, AbortHandle, Future},
StreamExt,
};
-use mullvad_rpc::availability::ApiAvailabilityHandle;
+use mullvad_rpc::{
+ availability::ApiAvailabilityHandle,
+ proxy::{ApiConnectionMode, ProxyConfig},
+};
use mullvad_types::{
account::{AccountData, AccountToken, VoucherSubmission},
endpoint::MullvadEndpoint,
@@ -54,7 +58,7 @@ use std::{collections::HashSet, ffi::OsString};
use std::{
marker::PhantomData,
mem,
- net::{IpAddr, Ipv4Addr},
+ net::{IpAddr, Ipv4Addr, SocketAddr},
path::PathBuf,
pin::Pin,
sync::{mpsc as sync_mpsc, Arc, Weak},
@@ -70,8 +74,8 @@ use talpid_core::{
use talpid_types::android::AndroidContext;
use talpid_types::{
net::{
- openvpn, AllowedEndpoint, Endpoint, TransportProtocol, TunnelEndpoint, TunnelParameters,
- TunnelType,
+ openvpn::{self, ProxySettings},
+ AllowedEndpoint, Endpoint, TransportProtocol, TunnelEndpoint, TunnelParameters, TunnelType,
},
tunnel::{ErrorStateCause, ParameterGenerationError, TunnelStateTransition},
ErrorExt,
@@ -327,6 +331,8 @@ pub(crate) enum InternalDaemonEvent {
NewAccountEvent(AccountToken, oneshot::Sender<Result<String, Error>>),
/// The background job fetching new `AppVersionInfo`s got a new info object.
NewAppVersionInfo(AppVersionInfo),
+ /// Request from REST client to use a different API endpoint.
+ GenerateApiConnectionMode(api::ApiConnectionModeRequest),
/// The split tunnel paths or state were updated.
#[cfg(target_os = "windows")]
ExcludedPathsEvent(ExcludedPathsUpdate, oneshot::Sender<Result<(), Error>>),
@@ -356,6 +362,12 @@ impl From<AppVersionInfo> for InternalDaemonEvent {
}
}
+impl From<api::ApiConnectionModeRequest> for InternalDaemonEvent {
+ fn from(request: api::ApiConnectionModeRequest) -> Self {
+ InternalDaemonEvent::GenerateApiConnectionMode(request)
+ }
+}
+
#[derive(Clone, Debug, Eq, PartialEq)]
enum DaemonExecutionState {
Running,
@@ -546,6 +558,7 @@ pub struct Daemon<L: EventListener> {
app_version_info: Option<AppVersionInfo>,
shutdown_tasks: Vec<Pin<Box<dyn Future<Output = ()>>>>,
tunnel_state_machine_handle: tunnel_state_machine::JoinHandle,
+ cache_dir: PathBuf,
#[cfg(target_os = "windows")]
volume_update_tx: mpsc::UnboundedSender<()>,
}
@@ -569,6 +582,8 @@ where
exclusion_gid::set_exclusion_gid().map_err(Error::GroupIdError)?
};
+ mullvad_rpc::proxy::ApiConnectionMode::try_delete_cache(&cache_dir).await;
+
let runtime = tokio::runtime::Handle::current();
let (internal_event_tx, internal_event_rx) = command_channel.destructure();
@@ -606,7 +621,7 @@ where
vec![]
};
- let mut rpc_runtime = mullvad_rpc::MullvadRpcRuntime::with_cache(
+ let rpc_runtime = mullvad_rpc::MullvadRpcRuntime::with_cache(
&cache_dir,
true,
#[cfg(target_os = "android")]
@@ -619,7 +634,7 @@ where
api_availability.suspend();
let initial_api_endpoint =
- Self::get_allowed_endpoint(rpc_runtime.address_cache.get_address());
+ Self::get_allowed_endpoint(rpc_runtime.address_cache.get_address().await);
let (offline_state_tx, offline_state_rx) = mpsc::unbounded();
#[cfg(target_os = "windows")]
@@ -649,25 +664,37 @@ where
.await
.map_err(Error::TunnelError)?;
- let address_change_runtime = runtime.clone();
- let tunnel_cmd_weak_tx = Arc::downgrade(&tunnel_command_tx);
- rpc_runtime.set_address_change_listener(move |address| {
- let (result_tx, result_rx) = oneshot::channel();
- let tx = tunnel_cmd_weak_tx.clone();
- address_change_runtime.block_on(async move {
- if let Some(tx) = tx.upgrade() {
- let _ = tx.unbounded_send(TunnelCommand::AllowEndpoint(
- Self::get_allowed_endpoint(address),
+ let api_endpoint_tunnel_tx = Arc::downgrade(&tunnel_command_tx);
+ let api_endpoint_handler = move |address: SocketAddr| {
+ let tunnel_tx = api_endpoint_tunnel_tx.clone();
+ async move {
+ let (result_tx, result_rx) = oneshot::channel();
+ if let Some(tunnel_tx) = tunnel_tx.upgrade() {
+ let _ = tunnel_tx.unbounded_send(TunnelCommand::AllowEndpoint(
+ Self::get_allowed_endpoint(address.clone()),
result_tx,
));
- result_rx.await.map_err(|_| ())
+ if result_rx.await.is_ok() {
+ log::debug!("API endpoint: {}", address);
+ true
+ } else {
+ log::error!("Failed to update allowed endpoint");
+ false
+ }
} else {
- Err(())
+ log::error!("Tunnel state machine is down");
+ false
}
- })
- });
+ }
+ };
- let rpc_handle = rpc_runtime.mullvad_rest_handle();
+ let proxy_provider = api::create_api_config_provider(
+ internal_event_tx.to_specialized_sender(),
+ ApiConnectionMode::Direct,
+ );
+ let rpc_handle = rpc_runtime
+ .mullvad_rest_handle(proxy_provider, api_endpoint_handler)
+ .await;
Self::forward_offline_state(api_availability.clone(), offline_state_rx).await;
@@ -740,6 +767,7 @@ where
app_version_info,
shutdown_tasks: vec![],
tunnel_state_machine_handle,
+ cache_dir,
#[cfg(target_os = "windows")]
volume_update_tx,
};
@@ -902,6 +930,9 @@ where
NewAppVersionInfo(app_version_info) => {
self.handle_new_app_version_info(app_version_info)
}
+ GenerateApiConnectionMode(request) => {
+ self.handle_generate_api_connection_mode(request).await
+ }
#[cfg(windows)]
ExcludedPathsEvent(update, tx) => self.handle_new_excluded_paths(update, tx).await,
}
@@ -1072,7 +1103,7 @@ where
BridgeState::On => {
let (bridge_settings, bridge_relay) = self
.relay_selector
- .get_proxy_settings(&bridge_constraints, location)
+ .get_proxy_settings(&bridge_constraints, Some(location))
.ok_or(Error::NoBridgeAvailable)?;
self.last_generated_bridge_relay = Some(bridge_relay);
Some(bridge_settings)
@@ -1081,7 +1112,7 @@ where
if let Some((bridge_settings, bridge_relay)) =
self.relay_selector.get_auto_proxy_settings(
&bridge_constraints,
- location,
+ Some(location),
retry_attempt,
)
{
@@ -1348,6 +1379,43 @@ where
self.event_listener.notify_app_version(app_version_info);
}
+ async fn handle_generate_api_connection_mode(
+ &mut self,
+ request: api::ApiConnectionModeRequest,
+ ) {
+ let constraints = InternalBridgeConstraints {
+ location: Constraint::Any,
+ providers: Constraint::Any,
+ transport_protocol: Constraint::Only(TransportProtocol::Tcp),
+ };
+
+ let bridge = if request.retry_attempt % 3 > 0 {
+ self.relay_selector.get_proxy_settings(&constraints, None)
+ } else {
+ None
+ };
+ let config = match bridge {
+ Some((settings, _relay)) => match settings {
+ ProxySettings::Shadowsocks(ss_settings) => {
+ ApiConnectionMode::Proxied(ProxyConfig::Shadowsocks(ss_settings))
+ }
+ _ => {
+ log::error!("Received unexpected proxy settings type");
+ ApiConnectionMode::Direct
+ }
+ },
+ None => ApiConnectionMode::Direct,
+ };
+
+ if let Err(error) = config.save(&self.cache_dir).await {
+ log::debug!(
+ "{}",
+ error.display_chain_with_msg("Failed to save API endpoint")
+ );
+ }
+ let _ = request.response_tx.send(config);
+ }
+
#[cfg(windows)]
async fn handle_new_excluded_paths(
&mut self,
@@ -1406,7 +1474,7 @@ where
match &self.tunnel_state {
Disconnected => {
- let location = self.get_geo_location();
+ let location = self.get_geo_location().await;
tokio::spawn(async {
Self::oneshot_send(tx, location.await.ok(), "current location");
});
@@ -1419,7 +1487,7 @@ where
}
Connected { location, .. } => {
let relay_location = location.clone();
- let location_future = self.get_geo_location();
+ let location_future = self.get_geo_location().await;
tokio::spawn(async {
let location = location_future.await;
Self::oneshot_send(
@@ -1440,8 +1508,8 @@ where
}
}
- fn get_geo_location(&mut self) -> impl Future<Output = Result<GeoIpLocation, ()>> {
- let rpc_service = self.rpc_runtime.rest_handle();
+ async fn get_geo_location(&mut self) -> impl Future<Output = Result<GeoIpLocation, ()>> {
+ let rpc_service = self.rpc_runtime.rest_handle().await;
async {
geoip::send_location_request(rpc_service)
.await