diff options
| author | Markus Pettersson <markus.pettersson@mullvad.net> | 2024-08-02 15:05:08 +0200 |
|---|---|---|
| committer | Markus Pettersson <markus.pettersson@mullvad.net> | 2024-08-06 07:57:01 +0200 |
| commit | 4df215fdd188e59f22cfcc514177548d55ab21de (patch) | |
| tree | f31bfced4638bcac2828b2d9b4aa9d5fe27a4674 | |
| parent | 79f4d1276226f2495035f7b8ce93a1074cfdac52 (diff) | |
| download | mullvadvpn-4df215fdd188e59f22cfcc514177548d55ab21de.tar.xz mullvadvpn-4df215fdd188e59f22cfcc514177548d55ab21de.zip | |
Shut down gRPC server gracefully
This commit also removes the `EventListener` trait and the daemon is no
longer parameterized over it.
| -rw-r--r-- | mullvad-daemon/src/access_method.rs | 7 | ||||
| -rw-r--r-- | mullvad-daemon/src/custom_list.rs | 7 | ||||
| -rw-r--r-- | mullvad-daemon/src/lib.rs | 90 | ||||
| -rw-r--r-- | mullvad-daemon/src/main.rs | 40 | ||||
| -rw-r--r-- | mullvad-daemon/src/management_interface.rs | 123 | ||||
| -rw-r--r-- | mullvad-jni/src/lib.rs | 16 | ||||
| -rw-r--r-- | mullvad-management-interface/src/lib.rs | 8 |
7 files changed, 144 insertions, 147 deletions
diff --git a/mullvad-daemon/src/access_method.rs b/mullvad-daemon/src/access_method.rs index e683165685..d4e0fc951b 100644 --- a/mullvad-daemon/src/access_method.rs +++ b/mullvad-daemon/src/access_method.rs @@ -1,4 +1,4 @@ -use crate::{api, settings, Daemon, EventListener}; +use crate::{api, settings, Daemon}; use mullvad_api::{proxy::ApiConnectionMode, rest, ApiProxy}; use mullvad_types::{ access_method::{self, AccessMethod, AccessMethodSetting}, @@ -28,10 +28,7 @@ pub enum Error { Settings(#[from] settings::Error), } -impl<L> Daemon<L> -where - L: EventListener, -{ +impl Daemon { /// Add a [`AccessMethod`] to the daemon's settings. /// /// If the daemon settings are successfully updated, the diff --git a/mullvad-daemon/src/custom_list.rs b/mullvad-daemon/src/custom_list.rs index 459ef9b932..b989a7fe56 100644 --- a/mullvad-daemon/src/custom_list.rs +++ b/mullvad-daemon/src/custom_list.rs @@ -1,4 +1,4 @@ -use crate::{new_selector_config, Daemon, Error, EventListener}; +use crate::{new_selector_config, Daemon, Error}; use mullvad_types::{ constraints::Constraint, custom_list::{CustomList, Id}, @@ -6,10 +6,7 @@ use mullvad_types::{ }; use talpid_types::net::TunnelType; -impl<L> Daemon<L> -where - L: EventListener, -{ +impl Daemon { /// Create a new custom list. /// /// Returns an error if the name is not unique. diff --git a/mullvad-daemon/src/lib.rs b/mullvad-daemon/src/lib.rs index ba83da27f5..19a9a2e949 100644 --- a/mullvad-daemon/src/lib.rs +++ b/mullvad-daemon/src/lib.rs @@ -37,6 +37,7 @@ use futures::{ StreamExt, }; use geoip::GeoIpHandler; +use management_interface::ManagementInterfaceServer; use mullvad_relay_selector::{ AdditionalRelayConstraints, AdditionalWireguardConstraints, RelaySelector, SelectorConfig, }; @@ -111,6 +112,9 @@ pub enum Error { #[error("REST request failed")] RestError(#[source] mullvad_api::rest::Error), + #[error("Management interface error")] + ManagementInterfaceError(#[source] management_interface::Error), + #[error("API availability check failed")] ApiCheckError(#[source] mullvad_api::availability::Error), @@ -549,32 +553,7 @@ where } } -/// Trait representing something that can broadcast daemon events. -pub trait EventListener: Clone + Send + Sync + 'static { - /// Notify that the tunnel state changed. - fn notify_new_state(&self, new_state: TunnelState); - - /// Notify that the settings changed. - fn notify_settings(&self, settings: Settings); - - /// Notify that the relay list changed. - fn notify_relay_list(&self, relay_list: RelayList); - - /// Notify that info about the latest available app version changed. - /// Or some flag about the currently running version is changed. - fn notify_app_version(&self, app_version_info: AppVersionInfo); - - /// Notify that device changed (login, logout, or key rotation). - fn notify_device_event(&self, event: DeviceEvent); - - /// Notify that a device was revoked using `RemoveDevice`. - fn notify_remove_device_event(&self, event: RemoveDeviceEvent); - - /// Notify that the api access method changed. - fn notify_new_access_method_event(&self, new_access_method: AccessMethodSetting); -} - -pub struct Daemon<L: EventListener> { +pub struct Daemon { tunnel_state: TunnelState, target_state: PersistentTargetState, #[cfg(target_os = "linux")] @@ -582,7 +561,7 @@ pub struct Daemon<L: EventListener> { rx: mpsc::UnboundedReceiver<InternalDaemonEvent>, tx: DaemonEventSender, reconnection_job: Option<AbortHandle>, - event_listener: L, + management_interface: ManagementInterfaceServer, migration_complete: migrations::MigrationComplete, settings: SettingsPersister, account_history: account_history::AccountHistory, @@ -602,26 +581,29 @@ pub struct Daemon<L: EventListener> { location_handler: GeoIpHandler, } -impl<L> Daemon<L> -where - L: EventListener, -{ +impl Daemon { + #[allow(clippy::too_many_arguments)] pub async fn start( log_dir: Option<PathBuf>, resource_dir: PathBuf, settings_dir: PathBuf, cache_dir: PathBuf, - event_listener: L, - command_channel: DaemonCommandChannel, + rpc_socket_path: PathBuf, #[cfg(target_os = "android")] android_context: AndroidContext, ) -> Result<Self, Error> { #[cfg(target_os = "macos")] macos::bump_filehandle_limit(); - mullvad_api::proxy::ApiConnectionMode::try_delete_cache(&cache_dir).await; + let command_channel = DaemonCommandChannel::new(); + let command_sender = command_channel.sender(); + + let management_interface = + ManagementInterfaceServer::start(command_sender, rpc_socket_path) + .map_err(Error::ManagementInterfaceError)?; let (internal_event_tx, internal_event_rx) = command_channel.destructure(); + mullvad_api::proxy::ApiConnectionMode::try_delete_cache(&cache_dir).await; let api_runtime = mullvad_api::Runtime::with_cache( &cache_dir, true, @@ -644,7 +626,7 @@ where None }); - let settings_event_listener = event_listener.clone(); + let settings_event_listener = management_interface.notifier().clone(); let mut settings = SettingsPersister::load(&settings_dir).await; settings.register_change_listener(move |settings| { // Notify management interface server of changes to the settings @@ -804,7 +786,7 @@ where api::forward_offline_state(api_availability.clone(), offline_state_rx); - let relay_list_listener = event_listener.clone(); + let relay_list_listener = management_interface.notifier().clone(); let on_relay_list_update = move |relay_list: &RelayList| { relay_list_listener.notify_relay_list(relay_list.clone()); }; @@ -844,7 +826,7 @@ where rx: internal_event_rx, tx: internal_event_tx, reconnection_job: None, - event_listener, + management_interface, migration_complete, settings, account_history, @@ -915,7 +897,7 @@ where /// be destroyed, and executing shutdown tasks async fn finalize(self) { let Daemon { - event_listener, + management_interface, shutdown_tasks, api_runtime, tunnel_state_machine_handle, @@ -932,8 +914,9 @@ where account_manager.shutdown().await; tunnel_state_machine_handle.try_join().await; + // Wait for the management interface server to shut down + management_interface.stop().await; - drop(event_listener); drop(api_runtime); } @@ -1042,7 +1025,9 @@ where } self.tunnel_state = tunnel_state.clone(); - self.event_listener.notify_new_state(tunnel_state); + self.management_interface + .notifier() + .notify_new_state(tunnel_state); self.fetch_am_i_mullvad(); } @@ -1110,7 +1095,8 @@ where _ => return, }; - self.event_listener + self.management_interface + .notifier() .notify_new_state(self.tunnel_state.clone()); } @@ -1125,7 +1111,8 @@ where // Make sure to update the daemon's actual tunnel state. Otherwise feature indicator changes won't be persisted. self.tunnel_state .set_feature_indicators(new_feature_indicators); - self.event_listener + self.management_interface + .notifier() .notify_new_state(self.tunnel_state.clone()); } } @@ -1287,7 +1274,9 @@ where } fn handle_new_app_version_info(&mut self, app_version_info: AppVersionInfo) { - self.event_listener.notify_app_version(app_version_info); + self.management_interface + .notifier() + .notify_app_version(app_version_info); } async fn handle_device_event(&mut self, event: AccountEvent) { @@ -1338,7 +1327,8 @@ where _ => (), } if let AccountEvent::Device(event) = event { - self.event_listener + self.management_interface + .notifier() .notify_device_event(DeviceEvent::from(event)); } } @@ -1367,14 +1357,14 @@ where // currently active access method. The announcement should be // made after the firewall policy has been updated, since the // new access method will be useless before then. - let event_listener = self.event_listener.clone(); + let notifier = self.management_interface.notifier().clone(); tokio::spawn(async move { // Wait for the firewall policy to be updated. let _ = completion_rx.await; // Let the emitter of this event know that the firewall has been updated. let _ = endpoint_active_tx.send(()); // Notify clients about the change if necessary. - event_listener.notify_new_access_method_event(setting); + notifier.notify_new_access_method_event(setting); }); } } @@ -1385,7 +1375,7 @@ where result: Result<PrivateAccountAndDevice, device::Error>, ) { let account_manager = self.account_manager.clone(); - let event_listener = self.event_listener.clone(); + let notifier = self.management_interface.notifier().clone(); tokio::spawn(async move { if let Ok(Some(_)) = account_manager .data_after_login() @@ -1414,7 +1404,7 @@ where new_state: DeviceState::LoggedOut, }, }; - event_listener.notify_device_event(event); + notifier.notify_device_event(event); } }); } @@ -1639,7 +1629,7 @@ where device_id: DeviceId, ) { let device_service = self.account_manager.device_service.clone(); - let event_listener = self.event_listener.clone(); + let notifier = self.management_interface.notifier().clone(); tokio::spawn(async move { let result = device_service @@ -1648,7 +1638,7 @@ where .map(move |new_devices| { // FIXME: We should be able to get away with only returning the removed ID, // and not have to request the list from the API. - event_listener.notify_remove_device_event(RemoveDeviceEvent { + notifier.notify_remove_device_event(RemoveDeviceEvent { account_token, new_devices, }); diff --git a/mullvad-daemon/src/main.rs b/mullvad-daemon/src/main.rs index 253f804965..86b718e323 100644 --- a/mullvad-daemon/src/main.rs +++ b/mullvad-daemon/src/main.rs @@ -1,15 +1,8 @@ +use std::{path::PathBuf, thread, time::Duration}; + #[cfg(not(windows))] use mullvad_daemon::cleanup_old_rpc_socket; -use mullvad_daemon::{ - logging, - management_interface::{ManagementInterfaceEventBroadcaster, ManagementInterfaceServer}, - rpc_uniqueness_check, runtime, version, Daemon, DaemonCommandChannel, DaemonCommandSender, -}; -use std::{ - path::{Path, PathBuf}, - thread, - time::Duration, -}; +use mullvad_daemon::{logging, rpc_uniqueness_check, runtime, version, Daemon}; use talpid_types::ErrorExt; mod cli; @@ -196,9 +189,7 @@ async fn run_standalone(log_dir: Option<PathBuf>) -> Result<(), String> { Ok(()) } -async fn create_daemon( - log_dir: Option<PathBuf>, -) -> Result<Daemon<ManagementInterfaceEventBroadcaster>, String> { +async fn create_daemon(log_dir: Option<PathBuf>) -> Result<Daemon, String> { let rpc_socket_path = mullvad_paths::get_rpc_socket_path(); let resource_dir = mullvad_paths::get_resource_dir(); let settings_dir = mullvad_paths::settings_dir() @@ -206,38 +197,17 @@ async fn create_daemon( let cache_dir = mullvad_paths::cache_dir() .map_err(|e| e.display_chain_with_msg("Unable to get cache dir"))?; - let command_channel = DaemonCommandChannel::new(); - let event_listener = spawn_management_interface(command_channel.sender(), rpc_socket_path)?; - Daemon::start( log_dir, resource_dir, settings_dir, cache_dir, - event_listener, - command_channel, + rpc_socket_path, ) .await .map_err(|e| e.display_chain_with_msg("Unable to initialize daemon")) } -fn spawn_management_interface( - command_sender: DaemonCommandSender, - rpc_socket_path: impl AsRef<Path>, -) -> Result<ManagementInterfaceEventBroadcaster, String> { - let event_broadcaster = ManagementInterfaceServer::start(command_sender, &rpc_socket_path) - .map_err(|error| { - error.display_chain_with_msg("Unable to start management interface server") - })?; - - log::info!( - "Management interface listening on {}", - rpc_socket_path.as_ref().display() - ); - - Ok(event_broadcaster) -} - #[cfg(unix)] fn running_as_admin() -> bool { let uid = unsafe { libc::getuid() }; diff --git a/mullvad-daemon/src/management_interface.rs b/mullvad-daemon/src/management_interface.rs index a7e2b23f4f..4d0f558a97 100644 --- a/mullvad-daemon/src/management_interface.rs +++ b/mullvad-daemon/src/management_interface.rs @@ -1,6 +1,4 @@ -use crate::{ - account_history, device, version_check, DaemonCommand, DaemonCommandSender, EventListener, -}; +use crate::{account_history, device, version_check, DaemonCommand, DaemonCommandSender}; use futures::{ channel::{mpsc, oneshot}, StreamExt, @@ -8,7 +6,7 @@ use futures::{ use mullvad_api::{rest::Error as RestError, StatusCode}; use mullvad_management_interface::{ types::{self, daemon_event, management_service_server::ManagementService}, - Code, Request, Response, Status, + Code, Request, Response, ServerJoinHandle, Status, }; use mullvad_types::{ account::AccountToken, @@ -28,8 +26,11 @@ use std::{ time::Duration, }; use talpid_types::ErrorExt; +use tokio::time::timeout; use tokio_stream::wrappers::UnboundedReceiverStream; +const RPC_SERVER_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(1); + #[derive(thiserror::Error, Debug)] pub enum Error { // Unable to start the management interface server @@ -1054,53 +1055,98 @@ impl ManagementServiceImpl { } } -pub struct ManagementInterfaceServer(()); +/// The running management interface serving gRPC requests. +pub struct ManagementInterfaceServer { + /// The rpc server spawned by [`Self::start`]. When the underlying join handle yields, the rpc + /// server has shutdown. + rpc_server_join_handle: ServerJoinHandle, + /// Channel used to signal the running gRPC server to shutdown. This needs to be done before + /// awaiting trying to join [`Self::rpc_server_join_handle`]. + server_abort_tx: mpsc::Sender<()>, + /// A reference to the associated [`ManagementInterfaceEventBroadcaster`]. This may be used to + /// broadcast certain events to all subscribers of the management interface. + broadcast: ManagementInterfaceEventBroadcaster, +} impl ManagementInterfaceServer { pub fn start( - tunnel_tx: DaemonCommandSender, + daemon_tx: DaemonCommandSender, rpc_socket_path: impl AsRef<Path>, - ) -> Result<ManagementInterfaceEventBroadcaster, Error> { + ) -> Result<ManagementInterfaceServer, Error> { let subscriptions = Arc::<Mutex<Vec<EventsListenerSender>>>::default(); - + // NOTE: It is important that the channel buffer size is kept at 0. When sending a signal + // to abort the gRPC server, the sender can be awaited to know when the gRPC server has + // received and started processing the shutdown signal. let (server_abort_tx, server_abort_rx) = mpsc::channel(0); let server = ManagementServiceImpl { - daemon_tx: tunnel_tx, + daemon_tx, subscriptions: subscriptions.clone(), }; - let join_handle = mullvad_management_interface::spawn_rpc_server( + let rpc_server_join_handle = mullvad_management_interface::spawn_rpc_server( server, async move { server_abort_rx.into_future().await; }, - rpc_socket_path, + &rpc_socket_path, ) .map_err(Error::SetupError)?; - tokio::spawn(async move { - if let Err(error) = join_handle.await { - log::error!("Management server panic: {}", error); - } - log::info!("Management interface shut down"); - }); + log::info!( + "Management interface listening on {}", + rpc_socket_path.as_ref().display() + ); + + let broadcast = ManagementInterfaceEventBroadcaster { subscriptions }; - Ok(ManagementInterfaceEventBroadcaster { - subscriptions, - _close_handle: server_abort_tx, + Ok(ManagementInterfaceServer { + rpc_server_join_handle, + server_abort_tx, + broadcast, }) } + + /// Wait for the server to shut down gracefully. If that does not happend within [`RPC_SERVER_SHUTDOWN_TIMEOUT`], + /// the gRPC server is aborted and we yield the async execution. + pub async fn stop(mut self) { + use futures::SinkExt; + // Send a singal to the underlying RPC server to shut down. + let _ = self.server_abort_tx.send(()).await; + + match timeout(RPC_SERVER_SHUTDOWN_TIMEOUT, self.rpc_server_join_handle).await { + // Joining the rpc server handle timed out + Err(timeout) => { + log::error!("Timed out while shutting down management server: {timeout}"); + } + Ok(join_result) => { + if let Err(_error) = join_result { + log::error!("Management server task failed to execute until completion"); + } + } + } + } + + /// Obtain a reference to the associated [`ManagementInterfaceEventBroadcaster`]. + pub const fn notifier(&self) -> &ManagementInterfaceEventBroadcaster { + &self.broadcast + } } /// A handle that allows broadcasting messages to all subscribers of the management interface. #[derive(Clone)] pub struct ManagementInterfaceEventBroadcaster { subscriptions: Arc<Mutex<Vec<EventsListenerSender>>>, - _close_handle: mpsc::Sender<()>, } -impl EventListener for ManagementInterfaceEventBroadcaster { +impl ManagementInterfaceEventBroadcaster { + fn notify(&self, value: types::DaemonEvent) { + let mut subscriptions = self.subscriptions.lock().unwrap(); + subscriptions.retain(|tx| tx.send(Ok(value.clone())).is_ok()); + } + + /// Notify that the tunnel state changed. + /// /// Sends a new state update to all `new_state` subscribers of the management interface. - fn notify_new_state(&self, new_state: TunnelState) { + pub(crate) fn notify_new_state(&self, new_state: TunnelState) { self.notify(types::DaemonEvent { event: Some(daemon_event::Event::TunnelState(types::TunnelState::from( new_state, @@ -1108,8 +1154,10 @@ impl EventListener for ManagementInterfaceEventBroadcaster { }) } + /// Notify that the settings changed. + /// /// Sends settings to all `settings` subscribers of the management interface. - fn notify_settings(&self, settings: Settings) { + pub(crate) fn notify_settings(&self, settings: Settings) { log::debug!("Broadcasting new settings"); self.notify(types::DaemonEvent { event: Some(daemon_event::Event::Settings(types::Settings::from( @@ -1118,8 +1166,10 @@ impl EventListener for ManagementInterfaceEventBroadcaster { }) } + /// Notify that the relay list changed. + /// /// Sends relays to all subscribers of the management interface. - fn notify_relay_list(&self, relay_list: RelayList) { + pub(crate) fn notify_relay_list(&self, relay_list: RelayList) { log::debug!("Broadcasting new relay list"); self.notify(types::DaemonEvent { event: Some(daemon_event::Event::RelayList(types::RelayList::from( @@ -1128,7 +1178,9 @@ impl EventListener for ManagementInterfaceEventBroadcaster { }) } - fn notify_app_version(&self, app_version_info: version::AppVersionInfo) { + /// Notify that info about the latest available app version changed. + /// Or some flag about the currently running version is changed. + pub(crate) fn notify_app_version(&self, app_version_info: version::AppVersionInfo) { log::debug!("Broadcasting new app version info"); self.notify(types::DaemonEvent { event: Some(daemon_event::Event::VersionInfo( @@ -1137,7 +1189,8 @@ impl EventListener for ManagementInterfaceEventBroadcaster { }) } - fn notify_device_event(&self, device: mullvad_types::device::DeviceEvent) { + /// Notify that device changed (login, logout, or key rotation). + pub(crate) fn notify_device_event(&self, device: mullvad_types::device::DeviceEvent) { log::debug!("Broadcasting device event"); self.notify(types::DaemonEvent { event: Some(daemon_event::Event::Device(types::DeviceEvent::from( @@ -1146,7 +1199,11 @@ impl EventListener for ManagementInterfaceEventBroadcaster { }) } - fn notify_remove_device_event(&self, remove_event: mullvad_types::device::RemoveDeviceEvent) { + /// Notify that a device was revoked using `RemoveDevice`. + pub(crate) fn notify_remove_device_event( + &self, + remove_event: mullvad_types::device::RemoveDeviceEvent, + ) { log::debug!("Broadcasting remove device event"); self.notify(types::DaemonEvent { event: Some(daemon_event::Event::RemoveDevice( @@ -1155,7 +1212,8 @@ impl EventListener for ManagementInterfaceEventBroadcaster { }) } - fn notify_new_access_method_event( + /// Notify that the api access method changed. + pub(crate) fn notify_new_access_method_event( &self, new_access_method: mullvad_types::access_method::AccessMethodSetting, ) { @@ -1168,13 +1226,6 @@ impl EventListener for ManagementInterfaceEventBroadcaster { } } -impl ManagementInterfaceEventBroadcaster { - fn notify(&self, value: types::DaemonEvent) { - let mut subscriptions = self.subscriptions.lock().unwrap(); - subscriptions.retain(|tx| tx.send(Ok(value.clone())).is_ok()); - } -} - /// Converts [`crate::Error`] into a tonic status. fn map_daemon_error(error: crate::Error) -> Status { use crate::Error as DaemonError; diff --git a/mullvad-jni/src/lib.rs b/mullvad-jni/src/lib.rs index 1cd6765439..6c045b061f 100644 --- a/mullvad-jni/src/lib.rs +++ b/mullvad-jni/src/lib.rs @@ -13,8 +13,7 @@ use jnix::{ FromJava, JnixEnv, }; use mullvad_daemon::{ - cleanup_old_rpc_socket, exception_logging, logging, - management_interface::ManagementInterfaceServer, runtime::new_multi_thread, version, Daemon, + cleanup_old_rpc_socket, exception_logging, logging, runtime::new_multi_thread, version, Daemon, DaemonCommandChannel, DaemonCommandSender, }; use std::{ @@ -48,9 +47,6 @@ pub enum Error { #[error("Failed to init Tokio runtime")] InitTokio(#[source] io::Error), - - #[error("Failed to spawn the management interface")] - SpawnManagementInterface(#[source] mullvad_daemon::management_interface::Error), } /// Throw a Java exception and return if `result` is an error @@ -169,7 +165,6 @@ fn spawn_daemon( rpc_socket, files_dir, cache_dir, - daemon_command_channel, android_context, ))?; @@ -184,23 +179,16 @@ async fn spawn_daemon_inner( rpc_socket: PathBuf, files_dir: PathBuf, cache_dir: PathBuf, - command_channel: DaemonCommandChannel, android_context: AndroidContext, ) -> Result<tokio::task::JoinHandle<()>, Error> { cleanup_old_rpc_socket(&rpc_socket).await; - let event_listener = ManagementInterfaceServer::start(command_channel.sender(), &rpc_socket) - .map_err(Error::SpawnManagementInterface)?; - - log::info!("Management interface listening on {}", rpc_socket.display()); - let daemon = Daemon::start( Some(files_dir.clone()), files_dir.clone(), files_dir, cache_dir, - event_listener, - command_channel, + rpc_socket, android_context, ) .await diff --git a/mullvad-management-interface/src/lib.rs b/mullvad-management-interface/src/lib.rs index 43b9974d59..76564ff3db 100644 --- a/mullvad-management-interface/src/lib.rs +++ b/mullvad-management-interface/src/lib.rs @@ -133,7 +133,7 @@ pub async fn new_rpc_client() -> Result<ManagementServiceClient, Error> { #[cfg(not(target_os = "android"))] pub use client::MullvadProxyClient; -pub type ServerJoinHandle = tokio::task::JoinHandle<Result<(), Error>>; +pub type ServerJoinHandle = tokio::task::JoinHandle<()>; pub fn spawn_rpc_server<T: ManagementService, F: Future<Output = ()> + Send + 'static>( service: T, @@ -164,11 +164,15 @@ pub fn spawn_rpc_server<T: ManagementService, F: Future<Output = ()> + Send + 's } Ok(tokio::spawn(async move { - Server::builder() + if let Err(execution_error) = Server::builder() .add_service(ManagementServiceServer::new(service)) .serve_with_incoming_shutdown(incoming.map_ok(StreamBox), abort_rx) .await .map_err(Error::GrpcTransportError) + { + log::error!("Management server panic: {execution_error}"); + } + log::trace!("gRPC server is shutting down"); })) } |
