summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorMarkus Pettersson <markus.pettersson@mullvad.net>2024-08-02 15:05:08 +0200
committerMarkus Pettersson <markus.pettersson@mullvad.net>2024-08-06 07:57:01 +0200
commit4df215fdd188e59f22cfcc514177548d55ab21de (patch)
treef31bfced4638bcac2828b2d9b4aa9d5fe27a4674
parent79f4d1276226f2495035f7b8ce93a1074cfdac52 (diff)
downloadmullvadvpn-4df215fdd188e59f22cfcc514177548d55ab21de.tar.xz
mullvadvpn-4df215fdd188e59f22cfcc514177548d55ab21de.zip
Shut down gRPC server gracefully
This commit also removes the `EventListener` trait and the daemon is no longer parameterized over it.
-rw-r--r--mullvad-daemon/src/access_method.rs7
-rw-r--r--mullvad-daemon/src/custom_list.rs7
-rw-r--r--mullvad-daemon/src/lib.rs90
-rw-r--r--mullvad-daemon/src/main.rs40
-rw-r--r--mullvad-daemon/src/management_interface.rs123
-rw-r--r--mullvad-jni/src/lib.rs16
-rw-r--r--mullvad-management-interface/src/lib.rs8
7 files changed, 144 insertions, 147 deletions
diff --git a/mullvad-daemon/src/access_method.rs b/mullvad-daemon/src/access_method.rs
index e683165685..d4e0fc951b 100644
--- a/mullvad-daemon/src/access_method.rs
+++ b/mullvad-daemon/src/access_method.rs
@@ -1,4 +1,4 @@
-use crate::{api, settings, Daemon, EventListener};
+use crate::{api, settings, Daemon};
use mullvad_api::{proxy::ApiConnectionMode, rest, ApiProxy};
use mullvad_types::{
access_method::{self, AccessMethod, AccessMethodSetting},
@@ -28,10 +28,7 @@ pub enum Error {
Settings(#[from] settings::Error),
}
-impl<L> Daemon<L>
-where
- L: EventListener,
-{
+impl Daemon {
/// Add a [`AccessMethod`] to the daemon's settings.
///
/// If the daemon settings are successfully updated, the
diff --git a/mullvad-daemon/src/custom_list.rs b/mullvad-daemon/src/custom_list.rs
index 459ef9b932..b989a7fe56 100644
--- a/mullvad-daemon/src/custom_list.rs
+++ b/mullvad-daemon/src/custom_list.rs
@@ -1,4 +1,4 @@
-use crate::{new_selector_config, Daemon, Error, EventListener};
+use crate::{new_selector_config, Daemon, Error};
use mullvad_types::{
constraints::Constraint,
custom_list::{CustomList, Id},
@@ -6,10 +6,7 @@ use mullvad_types::{
};
use talpid_types::net::TunnelType;
-impl<L> Daemon<L>
-where
- L: EventListener,
-{
+impl Daemon {
/// Create a new custom list.
///
/// Returns an error if the name is not unique.
diff --git a/mullvad-daemon/src/lib.rs b/mullvad-daemon/src/lib.rs
index ba83da27f5..19a9a2e949 100644
--- a/mullvad-daemon/src/lib.rs
+++ b/mullvad-daemon/src/lib.rs
@@ -37,6 +37,7 @@ use futures::{
StreamExt,
};
use geoip::GeoIpHandler;
+use management_interface::ManagementInterfaceServer;
use mullvad_relay_selector::{
AdditionalRelayConstraints, AdditionalWireguardConstraints, RelaySelector, SelectorConfig,
};
@@ -111,6 +112,9 @@ pub enum Error {
#[error("REST request failed")]
RestError(#[source] mullvad_api::rest::Error),
+ #[error("Management interface error")]
+ ManagementInterfaceError(#[source] management_interface::Error),
+
#[error("API availability check failed")]
ApiCheckError(#[source] mullvad_api::availability::Error),
@@ -549,32 +553,7 @@ where
}
}
-/// Trait representing something that can broadcast daemon events.
-pub trait EventListener: Clone + Send + Sync + 'static {
- /// Notify that the tunnel state changed.
- fn notify_new_state(&self, new_state: TunnelState);
-
- /// Notify that the settings changed.
- fn notify_settings(&self, settings: Settings);
-
- /// Notify that the relay list changed.
- fn notify_relay_list(&self, relay_list: RelayList);
-
- /// Notify that info about the latest available app version changed.
- /// Or some flag about the currently running version is changed.
- fn notify_app_version(&self, app_version_info: AppVersionInfo);
-
- /// Notify that device changed (login, logout, or key rotation).
- fn notify_device_event(&self, event: DeviceEvent);
-
- /// Notify that a device was revoked using `RemoveDevice`.
- fn notify_remove_device_event(&self, event: RemoveDeviceEvent);
-
- /// Notify that the api access method changed.
- fn notify_new_access_method_event(&self, new_access_method: AccessMethodSetting);
-}
-
-pub struct Daemon<L: EventListener> {
+pub struct Daemon {
tunnel_state: TunnelState,
target_state: PersistentTargetState,
#[cfg(target_os = "linux")]
@@ -582,7 +561,7 @@ pub struct Daemon<L: EventListener> {
rx: mpsc::UnboundedReceiver<InternalDaemonEvent>,
tx: DaemonEventSender,
reconnection_job: Option<AbortHandle>,
- event_listener: L,
+ management_interface: ManagementInterfaceServer,
migration_complete: migrations::MigrationComplete,
settings: SettingsPersister,
account_history: account_history::AccountHistory,
@@ -602,26 +581,29 @@ pub struct Daemon<L: EventListener> {
location_handler: GeoIpHandler,
}
-impl<L> Daemon<L>
-where
- L: EventListener,
-{
+impl Daemon {
+ #[allow(clippy::too_many_arguments)]
pub async fn start(
log_dir: Option<PathBuf>,
resource_dir: PathBuf,
settings_dir: PathBuf,
cache_dir: PathBuf,
- event_listener: L,
- command_channel: DaemonCommandChannel,
+ rpc_socket_path: PathBuf,
#[cfg(target_os = "android")] android_context: AndroidContext,
) -> Result<Self, Error> {
#[cfg(target_os = "macos")]
macos::bump_filehandle_limit();
- mullvad_api::proxy::ApiConnectionMode::try_delete_cache(&cache_dir).await;
+ let command_channel = DaemonCommandChannel::new();
+ let command_sender = command_channel.sender();
+
+ let management_interface =
+ ManagementInterfaceServer::start(command_sender, rpc_socket_path)
+ .map_err(Error::ManagementInterfaceError)?;
let (internal_event_tx, internal_event_rx) = command_channel.destructure();
+ mullvad_api::proxy::ApiConnectionMode::try_delete_cache(&cache_dir).await;
let api_runtime = mullvad_api::Runtime::with_cache(
&cache_dir,
true,
@@ -644,7 +626,7 @@ where
None
});
- let settings_event_listener = event_listener.clone();
+ let settings_event_listener = management_interface.notifier().clone();
let mut settings = SettingsPersister::load(&settings_dir).await;
settings.register_change_listener(move |settings| {
// Notify management interface server of changes to the settings
@@ -804,7 +786,7 @@ where
api::forward_offline_state(api_availability.clone(), offline_state_rx);
- let relay_list_listener = event_listener.clone();
+ let relay_list_listener = management_interface.notifier().clone();
let on_relay_list_update = move |relay_list: &RelayList| {
relay_list_listener.notify_relay_list(relay_list.clone());
};
@@ -844,7 +826,7 @@ where
rx: internal_event_rx,
tx: internal_event_tx,
reconnection_job: None,
- event_listener,
+ management_interface,
migration_complete,
settings,
account_history,
@@ -915,7 +897,7 @@ where
/// be destroyed, and executing shutdown tasks
async fn finalize(self) {
let Daemon {
- event_listener,
+ management_interface,
shutdown_tasks,
api_runtime,
tunnel_state_machine_handle,
@@ -932,8 +914,9 @@ where
account_manager.shutdown().await;
tunnel_state_machine_handle.try_join().await;
+ // Wait for the management interface server to shut down
+ management_interface.stop().await;
- drop(event_listener);
drop(api_runtime);
}
@@ -1042,7 +1025,9 @@ where
}
self.tunnel_state = tunnel_state.clone();
- self.event_listener.notify_new_state(tunnel_state);
+ self.management_interface
+ .notifier()
+ .notify_new_state(tunnel_state);
self.fetch_am_i_mullvad();
}
@@ -1110,7 +1095,8 @@ where
_ => return,
};
- self.event_listener
+ self.management_interface
+ .notifier()
.notify_new_state(self.tunnel_state.clone());
}
@@ -1125,7 +1111,8 @@ where
// Make sure to update the daemon's actual tunnel state. Otherwise feature indicator changes won't be persisted.
self.tunnel_state
.set_feature_indicators(new_feature_indicators);
- self.event_listener
+ self.management_interface
+ .notifier()
.notify_new_state(self.tunnel_state.clone());
}
}
@@ -1287,7 +1274,9 @@ where
}
fn handle_new_app_version_info(&mut self, app_version_info: AppVersionInfo) {
- self.event_listener.notify_app_version(app_version_info);
+ self.management_interface
+ .notifier()
+ .notify_app_version(app_version_info);
}
async fn handle_device_event(&mut self, event: AccountEvent) {
@@ -1338,7 +1327,8 @@ where
_ => (),
}
if let AccountEvent::Device(event) = event {
- self.event_listener
+ self.management_interface
+ .notifier()
.notify_device_event(DeviceEvent::from(event));
}
}
@@ -1367,14 +1357,14 @@ where
// currently active access method. The announcement should be
// made after the firewall policy has been updated, since the
// new access method will be useless before then.
- let event_listener = self.event_listener.clone();
+ let notifier = self.management_interface.notifier().clone();
tokio::spawn(async move {
// Wait for the firewall policy to be updated.
let _ = completion_rx.await;
// Let the emitter of this event know that the firewall has been updated.
let _ = endpoint_active_tx.send(());
// Notify clients about the change if necessary.
- event_listener.notify_new_access_method_event(setting);
+ notifier.notify_new_access_method_event(setting);
});
}
}
@@ -1385,7 +1375,7 @@ where
result: Result<PrivateAccountAndDevice, device::Error>,
) {
let account_manager = self.account_manager.clone();
- let event_listener = self.event_listener.clone();
+ let notifier = self.management_interface.notifier().clone();
tokio::spawn(async move {
if let Ok(Some(_)) = account_manager
.data_after_login()
@@ -1414,7 +1404,7 @@ where
new_state: DeviceState::LoggedOut,
},
};
- event_listener.notify_device_event(event);
+ notifier.notify_device_event(event);
}
});
}
@@ -1639,7 +1629,7 @@ where
device_id: DeviceId,
) {
let device_service = self.account_manager.device_service.clone();
- let event_listener = self.event_listener.clone();
+ let notifier = self.management_interface.notifier().clone();
tokio::spawn(async move {
let result = device_service
@@ -1648,7 +1638,7 @@ where
.map(move |new_devices| {
// FIXME: We should be able to get away with only returning the removed ID,
// and not have to request the list from the API.
- event_listener.notify_remove_device_event(RemoveDeviceEvent {
+ notifier.notify_remove_device_event(RemoveDeviceEvent {
account_token,
new_devices,
});
diff --git a/mullvad-daemon/src/main.rs b/mullvad-daemon/src/main.rs
index 253f804965..86b718e323 100644
--- a/mullvad-daemon/src/main.rs
+++ b/mullvad-daemon/src/main.rs
@@ -1,15 +1,8 @@
+use std::{path::PathBuf, thread, time::Duration};
+
#[cfg(not(windows))]
use mullvad_daemon::cleanup_old_rpc_socket;
-use mullvad_daemon::{
- logging,
- management_interface::{ManagementInterfaceEventBroadcaster, ManagementInterfaceServer},
- rpc_uniqueness_check, runtime, version, Daemon, DaemonCommandChannel, DaemonCommandSender,
-};
-use std::{
- path::{Path, PathBuf},
- thread,
- time::Duration,
-};
+use mullvad_daemon::{logging, rpc_uniqueness_check, runtime, version, Daemon};
use talpid_types::ErrorExt;
mod cli;
@@ -196,9 +189,7 @@ async fn run_standalone(log_dir: Option<PathBuf>) -> Result<(), String> {
Ok(())
}
-async fn create_daemon(
- log_dir: Option<PathBuf>,
-) -> Result<Daemon<ManagementInterfaceEventBroadcaster>, String> {
+async fn create_daemon(log_dir: Option<PathBuf>) -> Result<Daemon, String> {
let rpc_socket_path = mullvad_paths::get_rpc_socket_path();
let resource_dir = mullvad_paths::get_resource_dir();
let settings_dir = mullvad_paths::settings_dir()
@@ -206,38 +197,17 @@ async fn create_daemon(
let cache_dir = mullvad_paths::cache_dir()
.map_err(|e| e.display_chain_with_msg("Unable to get cache dir"))?;
- let command_channel = DaemonCommandChannel::new();
- let event_listener = spawn_management_interface(command_channel.sender(), rpc_socket_path)?;
-
Daemon::start(
log_dir,
resource_dir,
settings_dir,
cache_dir,
- event_listener,
- command_channel,
+ rpc_socket_path,
)
.await
.map_err(|e| e.display_chain_with_msg("Unable to initialize daemon"))
}
-fn spawn_management_interface(
- command_sender: DaemonCommandSender,
- rpc_socket_path: impl AsRef<Path>,
-) -> Result<ManagementInterfaceEventBroadcaster, String> {
- let event_broadcaster = ManagementInterfaceServer::start(command_sender, &rpc_socket_path)
- .map_err(|error| {
- error.display_chain_with_msg("Unable to start management interface server")
- })?;
-
- log::info!(
- "Management interface listening on {}",
- rpc_socket_path.as_ref().display()
- );
-
- Ok(event_broadcaster)
-}
-
#[cfg(unix)]
fn running_as_admin() -> bool {
let uid = unsafe { libc::getuid() };
diff --git a/mullvad-daemon/src/management_interface.rs b/mullvad-daemon/src/management_interface.rs
index a7e2b23f4f..4d0f558a97 100644
--- a/mullvad-daemon/src/management_interface.rs
+++ b/mullvad-daemon/src/management_interface.rs
@@ -1,6 +1,4 @@
-use crate::{
- account_history, device, version_check, DaemonCommand, DaemonCommandSender, EventListener,
-};
+use crate::{account_history, device, version_check, DaemonCommand, DaemonCommandSender};
use futures::{
channel::{mpsc, oneshot},
StreamExt,
@@ -8,7 +6,7 @@ use futures::{
use mullvad_api::{rest::Error as RestError, StatusCode};
use mullvad_management_interface::{
types::{self, daemon_event, management_service_server::ManagementService},
- Code, Request, Response, Status,
+ Code, Request, Response, ServerJoinHandle, Status,
};
use mullvad_types::{
account::AccountToken,
@@ -28,8 +26,11 @@ use std::{
time::Duration,
};
use talpid_types::ErrorExt;
+use tokio::time::timeout;
use tokio_stream::wrappers::UnboundedReceiverStream;
+const RPC_SERVER_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(1);
+
#[derive(thiserror::Error, Debug)]
pub enum Error {
// Unable to start the management interface server
@@ -1054,53 +1055,98 @@ impl ManagementServiceImpl {
}
}
-pub struct ManagementInterfaceServer(());
+/// The running management interface serving gRPC requests.
+pub struct ManagementInterfaceServer {
+ /// The rpc server spawned by [`Self::start`]. When the underlying join handle yields, the rpc
+ /// server has shutdown.
+ rpc_server_join_handle: ServerJoinHandle,
+ /// Channel used to signal the running gRPC server to shutdown. This needs to be done before
+ /// awaiting trying to join [`Self::rpc_server_join_handle`].
+ server_abort_tx: mpsc::Sender<()>,
+ /// A reference to the associated [`ManagementInterfaceEventBroadcaster`]. This may be used to
+ /// broadcast certain events to all subscribers of the management interface.
+ broadcast: ManagementInterfaceEventBroadcaster,
+}
impl ManagementInterfaceServer {
pub fn start(
- tunnel_tx: DaemonCommandSender,
+ daemon_tx: DaemonCommandSender,
rpc_socket_path: impl AsRef<Path>,
- ) -> Result<ManagementInterfaceEventBroadcaster, Error> {
+ ) -> Result<ManagementInterfaceServer, Error> {
let subscriptions = Arc::<Mutex<Vec<EventsListenerSender>>>::default();
-
+ // NOTE: It is important that the channel buffer size is kept at 0. When sending a signal
+ // to abort the gRPC server, the sender can be awaited to know when the gRPC server has
+ // received and started processing the shutdown signal.
let (server_abort_tx, server_abort_rx) = mpsc::channel(0);
let server = ManagementServiceImpl {
- daemon_tx: tunnel_tx,
+ daemon_tx,
subscriptions: subscriptions.clone(),
};
- let join_handle = mullvad_management_interface::spawn_rpc_server(
+ let rpc_server_join_handle = mullvad_management_interface::spawn_rpc_server(
server,
async move {
server_abort_rx.into_future().await;
},
- rpc_socket_path,
+ &rpc_socket_path,
)
.map_err(Error::SetupError)?;
- tokio::spawn(async move {
- if let Err(error) = join_handle.await {
- log::error!("Management server panic: {}", error);
- }
- log::info!("Management interface shut down");
- });
+ log::info!(
+ "Management interface listening on {}",
+ rpc_socket_path.as_ref().display()
+ );
+
+ let broadcast = ManagementInterfaceEventBroadcaster { subscriptions };
- Ok(ManagementInterfaceEventBroadcaster {
- subscriptions,
- _close_handle: server_abort_tx,
+ Ok(ManagementInterfaceServer {
+ rpc_server_join_handle,
+ server_abort_tx,
+ broadcast,
})
}
+
+ /// Wait for the server to shut down gracefully. If that does not happend within [`RPC_SERVER_SHUTDOWN_TIMEOUT`],
+ /// the gRPC server is aborted and we yield the async execution.
+ pub async fn stop(mut self) {
+ use futures::SinkExt;
+ // Send a singal to the underlying RPC server to shut down.
+ let _ = self.server_abort_tx.send(()).await;
+
+ match timeout(RPC_SERVER_SHUTDOWN_TIMEOUT, self.rpc_server_join_handle).await {
+ // Joining the rpc server handle timed out
+ Err(timeout) => {
+ log::error!("Timed out while shutting down management server: {timeout}");
+ }
+ Ok(join_result) => {
+ if let Err(_error) = join_result {
+ log::error!("Management server task failed to execute until completion");
+ }
+ }
+ }
+ }
+
+ /// Obtain a reference to the associated [`ManagementInterfaceEventBroadcaster`].
+ pub const fn notifier(&self) -> &ManagementInterfaceEventBroadcaster {
+ &self.broadcast
+ }
}
/// A handle that allows broadcasting messages to all subscribers of the management interface.
#[derive(Clone)]
pub struct ManagementInterfaceEventBroadcaster {
subscriptions: Arc<Mutex<Vec<EventsListenerSender>>>,
- _close_handle: mpsc::Sender<()>,
}
-impl EventListener for ManagementInterfaceEventBroadcaster {
+impl ManagementInterfaceEventBroadcaster {
+ fn notify(&self, value: types::DaemonEvent) {
+ let mut subscriptions = self.subscriptions.lock().unwrap();
+ subscriptions.retain(|tx| tx.send(Ok(value.clone())).is_ok());
+ }
+
+ /// Notify that the tunnel state changed.
+ ///
/// Sends a new state update to all `new_state` subscribers of the management interface.
- fn notify_new_state(&self, new_state: TunnelState) {
+ pub(crate) fn notify_new_state(&self, new_state: TunnelState) {
self.notify(types::DaemonEvent {
event: Some(daemon_event::Event::TunnelState(types::TunnelState::from(
new_state,
@@ -1108,8 +1154,10 @@ impl EventListener for ManagementInterfaceEventBroadcaster {
})
}
+ /// Notify that the settings changed.
+ ///
/// Sends settings to all `settings` subscribers of the management interface.
- fn notify_settings(&self, settings: Settings) {
+ pub(crate) fn notify_settings(&self, settings: Settings) {
log::debug!("Broadcasting new settings");
self.notify(types::DaemonEvent {
event: Some(daemon_event::Event::Settings(types::Settings::from(
@@ -1118,8 +1166,10 @@ impl EventListener for ManagementInterfaceEventBroadcaster {
})
}
+ /// Notify that the relay list changed.
+ ///
/// Sends relays to all subscribers of the management interface.
- fn notify_relay_list(&self, relay_list: RelayList) {
+ pub(crate) fn notify_relay_list(&self, relay_list: RelayList) {
log::debug!("Broadcasting new relay list");
self.notify(types::DaemonEvent {
event: Some(daemon_event::Event::RelayList(types::RelayList::from(
@@ -1128,7 +1178,9 @@ impl EventListener for ManagementInterfaceEventBroadcaster {
})
}
- fn notify_app_version(&self, app_version_info: version::AppVersionInfo) {
+ /// Notify that info about the latest available app version changed.
+ /// Or some flag about the currently running version is changed.
+ pub(crate) fn notify_app_version(&self, app_version_info: version::AppVersionInfo) {
log::debug!("Broadcasting new app version info");
self.notify(types::DaemonEvent {
event: Some(daemon_event::Event::VersionInfo(
@@ -1137,7 +1189,8 @@ impl EventListener for ManagementInterfaceEventBroadcaster {
})
}
- fn notify_device_event(&self, device: mullvad_types::device::DeviceEvent) {
+ /// Notify that device changed (login, logout, or key rotation).
+ pub(crate) fn notify_device_event(&self, device: mullvad_types::device::DeviceEvent) {
log::debug!("Broadcasting device event");
self.notify(types::DaemonEvent {
event: Some(daemon_event::Event::Device(types::DeviceEvent::from(
@@ -1146,7 +1199,11 @@ impl EventListener for ManagementInterfaceEventBroadcaster {
})
}
- fn notify_remove_device_event(&self, remove_event: mullvad_types::device::RemoveDeviceEvent) {
+ /// Notify that a device was revoked using `RemoveDevice`.
+ pub(crate) fn notify_remove_device_event(
+ &self,
+ remove_event: mullvad_types::device::RemoveDeviceEvent,
+ ) {
log::debug!("Broadcasting remove device event");
self.notify(types::DaemonEvent {
event: Some(daemon_event::Event::RemoveDevice(
@@ -1155,7 +1212,8 @@ impl EventListener for ManagementInterfaceEventBroadcaster {
})
}
- fn notify_new_access_method_event(
+ /// Notify that the api access method changed.
+ pub(crate) fn notify_new_access_method_event(
&self,
new_access_method: mullvad_types::access_method::AccessMethodSetting,
) {
@@ -1168,13 +1226,6 @@ impl EventListener for ManagementInterfaceEventBroadcaster {
}
}
-impl ManagementInterfaceEventBroadcaster {
- fn notify(&self, value: types::DaemonEvent) {
- let mut subscriptions = self.subscriptions.lock().unwrap();
- subscriptions.retain(|tx| tx.send(Ok(value.clone())).is_ok());
- }
-}
-
/// Converts [`crate::Error`] into a tonic status.
fn map_daemon_error(error: crate::Error) -> Status {
use crate::Error as DaemonError;
diff --git a/mullvad-jni/src/lib.rs b/mullvad-jni/src/lib.rs
index 1cd6765439..6c045b061f 100644
--- a/mullvad-jni/src/lib.rs
+++ b/mullvad-jni/src/lib.rs
@@ -13,8 +13,7 @@ use jnix::{
FromJava, JnixEnv,
};
use mullvad_daemon::{
- cleanup_old_rpc_socket, exception_logging, logging,
- management_interface::ManagementInterfaceServer, runtime::new_multi_thread, version, Daemon,
+ cleanup_old_rpc_socket, exception_logging, logging, runtime::new_multi_thread, version, Daemon,
DaemonCommandChannel, DaemonCommandSender,
};
use std::{
@@ -48,9 +47,6 @@ pub enum Error {
#[error("Failed to init Tokio runtime")]
InitTokio(#[source] io::Error),
-
- #[error("Failed to spawn the management interface")]
- SpawnManagementInterface(#[source] mullvad_daemon::management_interface::Error),
}
/// Throw a Java exception and return if `result` is an error
@@ -169,7 +165,6 @@ fn spawn_daemon(
rpc_socket,
files_dir,
cache_dir,
- daemon_command_channel,
android_context,
))?;
@@ -184,23 +179,16 @@ async fn spawn_daemon_inner(
rpc_socket: PathBuf,
files_dir: PathBuf,
cache_dir: PathBuf,
- command_channel: DaemonCommandChannel,
android_context: AndroidContext,
) -> Result<tokio::task::JoinHandle<()>, Error> {
cleanup_old_rpc_socket(&rpc_socket).await;
- let event_listener = ManagementInterfaceServer::start(command_channel.sender(), &rpc_socket)
- .map_err(Error::SpawnManagementInterface)?;
-
- log::info!("Management interface listening on {}", rpc_socket.display());
-
let daemon = Daemon::start(
Some(files_dir.clone()),
files_dir.clone(),
files_dir,
cache_dir,
- event_listener,
- command_channel,
+ rpc_socket,
android_context,
)
.await
diff --git a/mullvad-management-interface/src/lib.rs b/mullvad-management-interface/src/lib.rs
index 43b9974d59..76564ff3db 100644
--- a/mullvad-management-interface/src/lib.rs
+++ b/mullvad-management-interface/src/lib.rs
@@ -133,7 +133,7 @@ pub async fn new_rpc_client() -> Result<ManagementServiceClient, Error> {
#[cfg(not(target_os = "android"))]
pub use client::MullvadProxyClient;
-pub type ServerJoinHandle = tokio::task::JoinHandle<Result<(), Error>>;
+pub type ServerJoinHandle = tokio::task::JoinHandle<()>;
pub fn spawn_rpc_server<T: ManagementService, F: Future<Output = ()> + Send + 'static>(
service: T,
@@ -164,11 +164,15 @@ pub fn spawn_rpc_server<T: ManagementService, F: Future<Output = ()> + Send + 's
}
Ok(tokio::spawn(async move {
- Server::builder()
+ if let Err(execution_error) = Server::builder()
.add_service(ManagementServiceServer::new(service))
.serve_with_incoming_shutdown(incoming.map_ok(StreamBox), abort_rx)
.await
.map_err(Error::GrpcTransportError)
+ {
+ log::error!("Management server panic: {execution_error}");
+ }
+ log::trace!("gRPC server is shutting down");
}))
}