diff options
| author | David Lönnhager <david.l@mullvad.net> | 2021-11-03 19:21:04 +0100 |
|---|---|---|
| committer | David Lönnhager <david.l@mullvad.net> | 2021-11-09 10:11:38 +0100 |
| commit | 8e5d0a4862b37bab8f2b67da9db802579d29cbde (patch) | |
| tree | db2a7edcefc2bb63b641b35f2cc1a7798a71e10c | |
| parent | a1b1067d7e8fb30a7cda35ec4e39fcb95f8e7e78 (diff) | |
| download | mullvadvpn-8e5d0a4862b37bab8f2b67da9db802579d29cbde.tar.xz mullvadvpn-8e5d0a4862b37bab8f2b67da9db802579d29cbde.zip | |
Close management interface server when all broadcasters are closed
| -rw-r--r-- | Cargo.lock | 2 | ||||
| -rw-r--r-- | mullvad-daemon/Cargo.toml | 1 | ||||
| -rw-r--r-- | mullvad-daemon/src/main.rs | 10 | ||||
| -rw-r--r-- | mullvad-daemon/src/management_interface.rs | 84 | ||||
| -rw-r--r-- | mullvad-management-interface/Cargo.toml | 1 | ||||
| -rw-r--r-- | mullvad-management-interface/src/lib.rs | 24 |
6 files changed, 44 insertions, 78 deletions
diff --git a/Cargo.lock b/Cargo.lock index e49f9d1f00..1cc44a6f43 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1254,7 +1254,6 @@ dependencies = [ "talpid-types", "tokio", "tokio-stream", - "triggered", "uuid", "winapi 0.3.9", "windows-service", @@ -1312,7 +1311,6 @@ dependencies = [ "tonic", "tonic-build", "tower", - "triggered", ] [[package]] diff --git a/mullvad-daemon/Cargo.toml b/mullvad-daemon/Cargo.toml index cbe5d25e1c..25c85e5b87 100644 --- a/mullvad-daemon/Cargo.toml +++ b/mullvad-daemon/Cargo.toml @@ -36,7 +36,6 @@ talpid-types = { path = "../talpid-types" } talpid-platform-metadata = { path = "../talpid-platform-metadata" } [target.'cfg(not(target_os="android"))'.dependencies] -triggered = "0.1.1" mullvad-management-interface = { path = "../mullvad-management-interface" } [target.'cfg(target_os="android")'.dependencies] diff --git a/mullvad-daemon/src/main.rs b/mullvad-daemon/src/main.rs index 6ea88dd836..b438e112cc 100644 --- a/mullvad-daemon/src/main.rs +++ b/mullvad-daemon/src/main.rs @@ -157,15 +157,13 @@ async fn create_daemon( async fn spawn_management_interface( command_sender: DaemonCommandSender, ) -> Result<ManagementInterfaceEventBroadcaster, String> { - let server = ManagementInterfaceServer::start(command_sender) + let (socket_path, event_broadcaster) = ManagementInterfaceServer::start(command_sender) .await .map_err(|error| { - error.display_chain_with_msg("Unable to start management interface server") - })?; - let event_broadcaster = server.event_broadcaster(); + error.display_chain_with_msg("Unable to start management interface server") + })?; - info!("Management interface listening on {}", server.socket_path()); - tokio::spawn(server.run()); + info!("Management interface listening on {}", socket_path); Ok(event_broadcaster) } diff --git a/mullvad-daemon/src/management_interface.rs b/mullvad-daemon/src/management_interface.rs index fea356a01f..222e088193 100644 --- a/mullvad-daemon/src/management_interface.rs +++ b/mullvad-daemon/src/management_interface.rs @@ -1,5 +1,8 @@ use crate::{account_history, settings, DaemonCommand, DaemonCommandSender, EventListener}; -use futures::channel::oneshot; +use futures::{ + channel::{mpsc, oneshot}, + StreamExt, +}; use mullvad_management_interface::{ types::{self, daemon_event, management_service_server::ManagementService}, Code, Request, Response, Status, @@ -23,7 +26,7 @@ use std::path::PathBuf; use std::{ cmp, convert::{TryFrom, TryInto}, - sync::{mpsc, Arc}, + sync::Arc, time::Duration, }; use talpid_types::ErrorExt; @@ -719,70 +722,43 @@ impl ManagementServiceImpl { } } -pub struct ManagementInterfaceServer { - subscriptions: Arc<RwLock<Vec<EventsListenerSender>>>, - socket_path: String, - server_abort_tx: triggered::Trigger, - server_join_handle: Option< - tokio::task::JoinHandle<std::result::Result<(), mullvad_management_interface::Error>>, - >, -} +pub struct ManagementInterfaceServer(()); impl ManagementInterfaceServer { - pub async fn start(tunnel_tx: DaemonCommandSender) -> Result<Self, Error> { + pub async fn start( + tunnel_tx: DaemonCommandSender, + ) -> Result<(String, ManagementInterfaceEventBroadcaster), Error> { let subscriptions = Arc::<RwLock<Vec<EventsListenerSender>>>::default(); let socket_path = mullvad_paths::get_rpc_socket_path() .to_string_lossy() .to_string(); - let (server_abort_tx, server_abort_rx) = triggered::trigger(); - let (start_tx, start_rx) = mpsc::channel(); + let (server_abort_tx, server_abort_rx) = mpsc::channel(0); let server = ManagementServiceImpl { daemon_tx: tunnel_tx, subscriptions: subscriptions.clone(), }; - let server_join_handle = tokio::spawn(mullvad_management_interface::spawn_rpc_server( - server, - start_tx, - server_abort_rx, - )); - - if let Err(_) = start_rx.recv() { - return Err(server_join_handle - .await - .expect("Failed to resolve quit handle future") - .map_err(Error::SetupError) - .unwrap_err()); - } - - Ok(ManagementInterfaceServer { - subscriptions, - socket_path, - server_abort_tx, - server_join_handle: Some(server_join_handle), + let join_handle = mullvad_management_interface::spawn_rpc_server(server, async move { + server_abort_rx.into_future().await; }) - } + .await + .map_err(Error::SetupError)?; - pub fn socket_path(&self) -> &str { - &self.socket_path - } - - pub fn event_broadcaster(&self) -> ManagementInterfaceEventBroadcaster { - ManagementInterfaceEventBroadcaster { - subscriptions: self.subscriptions.clone(), - close_handle: self.server_abort_tx.clone(), - } - } - - /// Consumes the server and waits for it to finish. - pub async fn run(self) { - if let Some(server_join_handle) = self.server_join_handle { - if let Err(error) = server_join_handle.await { - log::error!("Management server panic: {:?}", error); + tokio::spawn(async move { + if let Err(error) = join_handle.await { + log::error!("Management server panic: {}", error); } log::info!("Management interface shut down"); - } + }); + + Ok(( + socket_path, + ManagementInterfaceEventBroadcaster { + subscriptions, + _close_handle: server_abort_tx, + }, + )) } } @@ -790,7 +766,7 @@ impl ManagementInterfaceServer { #[derive(Clone)] pub struct ManagementInterfaceEventBroadcaster { subscriptions: Arc<RwLock<Vec<EventsListenerSender>>>, - close_handle: triggered::Trigger, + _close_handle: mpsc::Sender<()>, } impl EventListener for ManagementInterfaceEventBroadcaster { @@ -857,12 +833,6 @@ impl ManagementInterfaceEventBroadcaster { } } -impl Drop for ManagementInterfaceEventBroadcaster { - fn drop(&mut self) { - self.close_handle.trigger(); - } -} - /// Converts [`mullvad_daemon::Error`] into a tonic status. fn map_daemon_error(error: crate::Error) -> Status { use crate::Error as DaemonError; diff --git a/mullvad-management-interface/Cargo.toml b/mullvad-management-interface/Cargo.toml index 346bf7f9b1..6a4100715b 100644 --- a/mullvad-management-interface/Cargo.toml +++ b/mullvad-management-interface/Cargo.toml @@ -19,7 +19,6 @@ prost-types = "0.8" parity-tokio-ipc = "0.9" futures = "0.3" tokio = { version = "1.8", features = [ "rt" ] } -triggered = "0.1.1" log = "0.4" [target.'cfg(unix)'.dependencies] diff --git a/mullvad-management-interface/src/lib.rs b/mullvad-management-interface/src/lib.rs index 69cdc01d74..c5b4d80487 100644 --- a/mullvad-management-interface/src/lib.rs +++ b/mullvad-management-interface/src/lib.rs @@ -4,6 +4,7 @@ use parity_tokio_ipc::Endpoint as IpcEndpoint; #[cfg(unix)] use std::{env, fs, os::unix::fs::PermissionsExt}; use std::{ + future::Future, io, pin::Pin, task::{Context, Poll}, @@ -66,11 +67,12 @@ pub async fn new_rpc_client() -> Result<ManagementServiceClient, Error> { Ok(ManagementServiceClient::new(channel)) } -pub async fn spawn_rpc_server<T: ManagementService>( +pub type ServerJoinHandle = tokio::task::JoinHandle<Result<(), Error>>; + +pub async fn spawn_rpc_server<T: ManagementService, F: Future<Output = ()> + Send + 'static>( service: T, - server_start_tx: std::sync::mpsc::Sender<()>, - abort_rx: triggered::Listener, -) -> std::result::Result<(), Error> { + abort_rx: F, +) -> std::result::Result<ServerJoinHandle, Error> { use futures::stream::TryStreamExt; use parity_tokio_ipc::SecurityAttributes; @@ -95,13 +97,13 @@ pub async fn spawn_rpc_server<T: ManagementService>( .map_err(Error::PermissionsError)?; } - let _ = server_start_tx.send(()); - - Server::builder() - .add_service(ManagementServiceServer::new(service)) - .serve_with_incoming_shutdown(incoming.map_ok(StreamBox), abort_rx) - .await - .map_err(Error::GrpcTransportError) + Ok(tokio::spawn(async move { + Server::builder() + .add_service(ManagementServiceServer::new(service)) + .serve_with_incoming_shutdown(incoming.map_ok(StreamBox), abort_rx) + .await + .map_err(Error::GrpcTransportError) + })) } #[derive(Debug)] |
