diff options
| -rw-r--r-- | mullvad-daemon/src/lib.rs | 5 | ||||
| -rw-r--r-- | mullvad-daemon/src/management_interface.rs | 50 | ||||
| -rw-r--r-- | mullvad-daemon/src/rpc_uniqueness_check.rs | 2 | ||||
| -rw-r--r-- | mullvad-management-interface/src/client.rs | 8 | ||||
| -rw-r--r-- | mullvad-management-interface/src/lib.rs | 53 |
5 files changed, 92 insertions, 26 deletions
diff --git a/mullvad-daemon/src/lib.rs b/mullvad-daemon/src/lib.rs index 7ec577fd8f..4d4e80d5c5 100644 --- a/mullvad-daemon/src/lib.rs +++ b/mullvad-daemon/src/lib.rs @@ -81,11 +81,11 @@ use mullvad_types::{ use mullvad_update::version::rollout::Rollout; use relay_list::{RelayListUpdater, RelayListUpdaterHandle}; use settings::SettingsPersister; -use std::collections::BTreeSet; #[cfg(any(target_os = "windows", target_os = "android", target_os = "macos"))] use std::collections::HashSet; #[cfg(target_os = "android")] use std::os::unix::io::RawFd; +use std::{collections::BTreeSet, net::SocketAddr}; use std::{ marker::PhantomData, path::PathBuf, @@ -706,6 +706,8 @@ impl Daemon { #[cfg(target_os = "macos")] macos::bump_filehandle_limit(); + let http_socket_address: SocketAddr = "127.0.0.1:3000".parse().unwrap(); + let command_sender = daemon_command_channel.sender(); let app_upgrade_broadcast = tokio::sync::broadcast::channel(32).0; let management_interface = ManagementInterfaceServer::start( @@ -713,6 +715,7 @@ impl Daemon { config.rpc_socket_path, app_upgrade_broadcast.clone(), config.log_handle, + http_socket_address, ) .map_err(Error::ManagementInterfaceError)?; diff --git a/mullvad-daemon/src/management_interface.rs b/mullvad-daemon/src/management_interface.rs index eba149f811..9fb1c573ae 100644 --- a/mullvad-daemon/src/management_interface.rs +++ b/mullvad-daemon/src/management_interface.rs @@ -21,7 +21,7 @@ use mullvad_types::{ version, wireguard::{RotationInterval, RotationIntervalError}, }; -use std::collections::BTreeSet; +use std::{collections::BTreeSet, net::SocketAddr}; use std::{ path::PathBuf, str::FromStr, @@ -1306,7 +1306,8 @@ impl ManagementServiceImpl { pub struct ManagementInterfaceServer { /// The rpc server spawned by [`Self::start`]. When the underlying join handle yields, the rpc /// server has shutdown. - rpc_server_join_handle: ServerJoinHandle, + uds_rpc_server_join_handle: ServerJoinHandle, + http_rpc_server_join_handle: ServerJoinHandle, /// Channel used to signal the running gRPC server to shutdown. This needs to be done before /// awaiting trying to join [`Self::rpc_server_join_handle`]. server_abort_tx: mpsc::Sender<()>, @@ -1321,6 +1322,7 @@ impl ManagementInterfaceServer { rpc_socket_path: PathBuf, app_upgrade_broadcast: AppUpgradeBroadcast, log_reload_handle: crate::logging::LogHandle, + http_socket_address: SocketAddr, ) -> Result<ManagementInterfaceServer, Error> { let subscriptions = Arc::<Mutex<Vec<EventsListenerSender>>>::default(); @@ -1330,12 +1332,12 @@ impl ManagementInterfaceServer { let (server_abort_tx, server_abort_rx) = mpsc::channel(0); let server = ManagementServiceImpl { - daemon_tx, + daemon_tx: daemon_tx.clone(), subscriptions: subscriptions.clone(), - app_upgrade_broadcast, - log_reload_handle, + app_upgrade_broadcast: app_upgrade_broadcast.clone(), + log_reload_handle: log_reload_handle.clone(), }; - let rpc_server_join_handle = mullvad_management_interface::spawn_rpc_server( + let uds_rpc_server_join_handle = mullvad_management_interface::spawn_uds_rpc_server( server, async move { StreamExt::into_future(server_abort_rx).await; @@ -1344,6 +1346,16 @@ impl ManagementInterfaceServer { ) .map_err(Error::SetupError)?; + let server = ManagementServiceImpl { + daemon_tx, + subscriptions: subscriptions.clone(), + app_upgrade_broadcast, + log_reload_handle, + }; + let http_rpc_server_join_handle = + mullvad_management_interface::spawn_http_rpc_server(server, http_socket_address) + .map_err(Error::SetupError)?; + log::info!( "Management interface listening on {}", rpc_socket_path.display() @@ -1352,7 +1364,8 @@ impl ManagementInterfaceServer { let broadcast = ManagementInterfaceEventBroadcaster { subscriptions }; Ok(ManagementInterfaceServer { - rpc_server_join_handle, + uds_rpc_server_join_handle, + http_rpc_server_join_handle, server_abort_tx, broadcast, }) @@ -1366,14 +1379,31 @@ impl ManagementInterfaceServer { // Send a singal to the underlying RPC server to shut down. let _ = self.server_abort_tx.send(()).await; - match timeout(RPC_SERVER_SHUTDOWN_TIMEOUT, self.rpc_server_join_handle).await { + match timeout(RPC_SERVER_SHUTDOWN_TIMEOUT, self.uds_rpc_server_join_handle).await { + // Joining the rpc server handle timed out + Err(timeout) => { + log::error!("Timed out while shutting down UDS management server: {timeout}"); + } + Ok(join_result) => { + if let Err(_error) = join_result { + log::error!("UDS Management server task failed to execute until completion"); + } + } + } + + match timeout( + RPC_SERVER_SHUTDOWN_TIMEOUT, + self.http_rpc_server_join_handle, + ) + .await + { // Joining the rpc server handle timed out Err(timeout) => { - log::error!("Timed out while shutting down management server: {timeout}"); + log::error!("Timed out while shutting down HTTP management server: {timeout}"); } Ok(join_result) => { if let Err(_error) = join_result { - log::error!("Management server task failed to execute until completion"); + log::error!("HTTP Management server task failed to execute until completion"); } } } diff --git a/mullvad-daemon/src/rpc_uniqueness_check.rs b/mullvad-daemon/src/rpc_uniqueness_check.rs index 89e08eb47f..c1b2172c99 100644 --- a/mullvad-daemon/src/rpc_uniqueness_check.rs +++ b/mullvad-daemon/src/rpc_uniqueness_check.rs @@ -6,7 +6,7 @@ use talpid_types::ErrorExt; /// Tries to connect to another daemon and perform a simple RPC call. If it fails, assumes the /// other daemon has stopped. pub async fn is_another_instance_running() -> bool { - match MullvadProxyClient::new().await { + match MullvadProxyClient::new_without_http().await { Ok(_) => true, Err(error) => { let msg = diff --git a/mullvad-management-interface/src/client.rs b/mullvad-management-interface/src/client.rs index db200c1050..b0db53ebf3 100644 --- a/mullvad-management-interface/src/client.rs +++ b/mullvad-management-interface/src/client.rs @@ -92,8 +92,14 @@ impl TryFrom<types::daemon_event::Event> for DaemonEvent { #[cfg(not(target_os = "android"))] impl MullvadProxyClient { pub async fn new() -> Result<Self> { + let remote_http_addr = Some("http://127.0.0.1:3000".to_string()); #[expect(deprecated)] - super::new_rpc_client().await.map(Self) + super::new_rpc_client(remote_http_addr).await.map(Self) + } + + pub async fn new_without_http() -> Result<Self> { + #[expect(deprecated)] + super::new_rpc_client(None).await.map(Self) } pub fn from_rpc_client(client: crate::ManagementServiceClient) -> Self { diff --git a/mullvad-management-interface/src/lib.rs b/mullvad-management-interface/src/lib.rs index 091714acf6..5e7d7fe16c 100644 --- a/mullvad-management-interface/src/lib.rs +++ b/mullvad-management-interface/src/lib.rs @@ -6,6 +6,7 @@ use std::{env, fs, os::unix::fs::PermissionsExt}; use std::{ future::Future, io, + net::SocketAddr, path::PathBuf, pin::Pin, task::{Context, Poll}, @@ -132,20 +133,29 @@ impl From<tonic::Status> for Error { #[cfg(not(target_os = "android"))] #[deprecated(note = "Prefer MullvadProxyClient")] -pub async fn new_rpc_client() -> Result<ManagementServiceClient, Error> { +pub async fn new_rpc_client( + remote_http_addr: Option<String>, +) -> Result<ManagementServiceClient, Error> { use futures::TryFutureExt; - let ipc_path = mullvad_paths::get_rpc_socket_path(); + if let Some(http_address) = remote_http_addr { + let management_client = ManagementServiceClient::connect(http_address) + .await + .unwrap(); + Ok(management_client) + } else { + let ipc_path = mullvad_paths::get_rpc_socket_path(); - // The URI will be ignored - let channel = Endpoint::from_static("lttp://[::]:50051") - .connect_with_connector(service_fn(move |_: Uri| { - IpcEndpoint::connect(ipc_path.clone()).map_ok(hyper_util::rt::tokio::TokioIo::new) - })) - .await - .map_err(Error::GrpcTransportError)?; + // The URI will be ignored + let channel = Endpoint::from_static("lttp://[::]:50051") + .connect_with_connector(service_fn(move |_: Uri| { + IpcEndpoint::connect(ipc_path.clone()).map_ok(hyper_util::rt::tokio::TokioIo::new) + })) + .await + .map_err(Error::GrpcTransportError)?; - Ok(ManagementServiceClient::new(channel)) + Ok(ManagementServiceClient::new(channel)) + } } #[cfg(not(target_os = "android"))] @@ -153,7 +163,7 @@ pub use client::MullvadProxyClient; pub type ServerJoinHandle = tokio::task::JoinHandle<()>; -pub fn spawn_rpc_server<T: ManagementService, F: Future<Output = ()> + Send + 'static>( +pub fn spawn_uds_rpc_server<T: ManagementService, F: Future<Output = ()> + Send + 'static>( service: T, abort_rx: F, rpc_socket_path: PathBuf, @@ -188,9 +198,26 @@ pub fn spawn_rpc_server<T: ManagementService, F: Future<Output = ()> + Send + 's .await .map_err(Error::GrpcTransportError) { - log::error!("Management server panic: {execution_error}"); + log::error!("UDP Management server panic: {execution_error}"); + } + log::trace!("UDP gRPC server is shutting down"); + })) +} + +pub fn spawn_http_rpc_server<T: ManagementService>( + service: T, + socket_address: SocketAddr, +) -> std::result::Result<ServerJoinHandle, Error> { + Ok(tokio::spawn(async move { + if let Err(execution_error) = Server::builder() + .add_service(ManagementServiceServer::new(service)) + .serve(socket_address) + .await + .map_err(Error::GrpcTransportError) + { + log::error!("HTTP Management server panic: {execution_error}"); } - log::trace!("gRPC server is shutting down"); + log::trace!("HTTP gRPC server is shutting down"); })) } |
