summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--Cargo.lock2
-rw-r--r--mullvad-daemon/Cargo.toml1
-rw-r--r--mullvad-daemon/src/main.rs10
-rw-r--r--mullvad-daemon/src/management_interface.rs84
-rw-r--r--mullvad-management-interface/Cargo.toml1
-rw-r--r--mullvad-management-interface/src/lib.rs24
6 files changed, 44 insertions, 78 deletions
diff --git a/Cargo.lock b/Cargo.lock
index e49f9d1f00..1cc44a6f43 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1254,7 +1254,6 @@ dependencies = [
"talpid-types",
"tokio",
"tokio-stream",
- "triggered",
"uuid",
"winapi 0.3.9",
"windows-service",
@@ -1312,7 +1311,6 @@ dependencies = [
"tonic",
"tonic-build",
"tower",
- "triggered",
]
[[package]]
diff --git a/mullvad-daemon/Cargo.toml b/mullvad-daemon/Cargo.toml
index cbe5d25e1c..25c85e5b87 100644
--- a/mullvad-daemon/Cargo.toml
+++ b/mullvad-daemon/Cargo.toml
@@ -36,7 +36,6 @@ talpid-types = { path = "../talpid-types" }
talpid-platform-metadata = { path = "../talpid-platform-metadata" }
[target.'cfg(not(target_os="android"))'.dependencies]
-triggered = "0.1.1"
mullvad-management-interface = { path = "../mullvad-management-interface" }
[target.'cfg(target_os="android")'.dependencies]
diff --git a/mullvad-daemon/src/main.rs b/mullvad-daemon/src/main.rs
index 6ea88dd836..b438e112cc 100644
--- a/mullvad-daemon/src/main.rs
+++ b/mullvad-daemon/src/main.rs
@@ -157,15 +157,13 @@ async fn create_daemon(
async fn spawn_management_interface(
command_sender: DaemonCommandSender,
) -> Result<ManagementInterfaceEventBroadcaster, String> {
- let server = ManagementInterfaceServer::start(command_sender)
+ let (socket_path, event_broadcaster) = ManagementInterfaceServer::start(command_sender)
.await
.map_err(|error| {
- error.display_chain_with_msg("Unable to start management interface server")
- })?;
- let event_broadcaster = server.event_broadcaster();
+ error.display_chain_with_msg("Unable to start management interface server")
+ })?;
- info!("Management interface listening on {}", server.socket_path());
- tokio::spawn(server.run());
+ info!("Management interface listening on {}", socket_path);
Ok(event_broadcaster)
}
diff --git a/mullvad-daemon/src/management_interface.rs b/mullvad-daemon/src/management_interface.rs
index fea356a01f..222e088193 100644
--- a/mullvad-daemon/src/management_interface.rs
+++ b/mullvad-daemon/src/management_interface.rs
@@ -1,5 +1,8 @@
use crate::{account_history, settings, DaemonCommand, DaemonCommandSender, EventListener};
-use futures::channel::oneshot;
+use futures::{
+ channel::{mpsc, oneshot},
+ StreamExt,
+};
use mullvad_management_interface::{
types::{self, daemon_event, management_service_server::ManagementService},
Code, Request, Response, Status,
@@ -23,7 +26,7 @@ use std::path::PathBuf;
use std::{
cmp,
convert::{TryFrom, TryInto},
- sync::{mpsc, Arc},
+ sync::Arc,
time::Duration,
};
use talpid_types::ErrorExt;
@@ -719,70 +722,43 @@ impl ManagementServiceImpl {
}
}
-pub struct ManagementInterfaceServer {
- subscriptions: Arc<RwLock<Vec<EventsListenerSender>>>,
- socket_path: String,
- server_abort_tx: triggered::Trigger,
- server_join_handle: Option<
- tokio::task::JoinHandle<std::result::Result<(), mullvad_management_interface::Error>>,
- >,
-}
+pub struct ManagementInterfaceServer(());
impl ManagementInterfaceServer {
- pub async fn start(tunnel_tx: DaemonCommandSender) -> Result<Self, Error> {
+ pub async fn start(
+ tunnel_tx: DaemonCommandSender,
+ ) -> Result<(String, ManagementInterfaceEventBroadcaster), Error> {
let subscriptions = Arc::<RwLock<Vec<EventsListenerSender>>>::default();
let socket_path = mullvad_paths::get_rpc_socket_path()
.to_string_lossy()
.to_string();
- let (server_abort_tx, server_abort_rx) = triggered::trigger();
- let (start_tx, start_rx) = mpsc::channel();
+ let (server_abort_tx, server_abort_rx) = mpsc::channel(0);
let server = ManagementServiceImpl {
daemon_tx: tunnel_tx,
subscriptions: subscriptions.clone(),
};
- let server_join_handle = tokio::spawn(mullvad_management_interface::spawn_rpc_server(
- server,
- start_tx,
- server_abort_rx,
- ));
-
- if let Err(_) = start_rx.recv() {
- return Err(server_join_handle
- .await
- .expect("Failed to resolve quit handle future")
- .map_err(Error::SetupError)
- .unwrap_err());
- }
-
- Ok(ManagementInterfaceServer {
- subscriptions,
- socket_path,
- server_abort_tx,
- server_join_handle: Some(server_join_handle),
+ let join_handle = mullvad_management_interface::spawn_rpc_server(server, async move {
+ server_abort_rx.into_future().await;
})
- }
+ .await
+ .map_err(Error::SetupError)?;
- pub fn socket_path(&self) -> &str {
- &self.socket_path
- }
-
- pub fn event_broadcaster(&self) -> ManagementInterfaceEventBroadcaster {
- ManagementInterfaceEventBroadcaster {
- subscriptions: self.subscriptions.clone(),
- close_handle: self.server_abort_tx.clone(),
- }
- }
-
- /// Consumes the server and waits for it to finish.
- pub async fn run(self) {
- if let Some(server_join_handle) = self.server_join_handle {
- if let Err(error) = server_join_handle.await {
- log::error!("Management server panic: {:?}", error);
+ tokio::spawn(async move {
+ if let Err(error) = join_handle.await {
+ log::error!("Management server panic: {}", error);
}
log::info!("Management interface shut down");
- }
+ });
+
+ Ok((
+ socket_path,
+ ManagementInterfaceEventBroadcaster {
+ subscriptions,
+ _close_handle: server_abort_tx,
+ },
+ ))
}
}
@@ -790,7 +766,7 @@ impl ManagementInterfaceServer {
#[derive(Clone)]
pub struct ManagementInterfaceEventBroadcaster {
subscriptions: Arc<RwLock<Vec<EventsListenerSender>>>,
- close_handle: triggered::Trigger,
+ _close_handle: mpsc::Sender<()>,
}
impl EventListener for ManagementInterfaceEventBroadcaster {
@@ -857,12 +833,6 @@ impl ManagementInterfaceEventBroadcaster {
}
}
-impl Drop for ManagementInterfaceEventBroadcaster {
- fn drop(&mut self) {
- self.close_handle.trigger();
- }
-}
-
/// Converts [`mullvad_daemon::Error`] into a tonic status.
fn map_daemon_error(error: crate::Error) -> Status {
use crate::Error as DaemonError;
diff --git a/mullvad-management-interface/Cargo.toml b/mullvad-management-interface/Cargo.toml
index 346bf7f9b1..6a4100715b 100644
--- a/mullvad-management-interface/Cargo.toml
+++ b/mullvad-management-interface/Cargo.toml
@@ -19,7 +19,6 @@ prost-types = "0.8"
parity-tokio-ipc = "0.9"
futures = "0.3"
tokio = { version = "1.8", features = [ "rt" ] }
-triggered = "0.1.1"
log = "0.4"
[target.'cfg(unix)'.dependencies]
diff --git a/mullvad-management-interface/src/lib.rs b/mullvad-management-interface/src/lib.rs
index 69cdc01d74..c5b4d80487 100644
--- a/mullvad-management-interface/src/lib.rs
+++ b/mullvad-management-interface/src/lib.rs
@@ -4,6 +4,7 @@ use parity_tokio_ipc::Endpoint as IpcEndpoint;
#[cfg(unix)]
use std::{env, fs, os::unix::fs::PermissionsExt};
use std::{
+ future::Future,
io,
pin::Pin,
task::{Context, Poll},
@@ -66,11 +67,12 @@ pub async fn new_rpc_client() -> Result<ManagementServiceClient, Error> {
Ok(ManagementServiceClient::new(channel))
}
-pub async fn spawn_rpc_server<T: ManagementService>(
+pub type ServerJoinHandle = tokio::task::JoinHandle<Result<(), Error>>;
+
+pub async fn spawn_rpc_server<T: ManagementService, F: Future<Output = ()> + Send + 'static>(
service: T,
- server_start_tx: std::sync::mpsc::Sender<()>,
- abort_rx: triggered::Listener,
-) -> std::result::Result<(), Error> {
+ abort_rx: F,
+) -> std::result::Result<ServerJoinHandle, Error> {
use futures::stream::TryStreamExt;
use parity_tokio_ipc::SecurityAttributes;
@@ -95,13 +97,13 @@ pub async fn spawn_rpc_server<T: ManagementService>(
.map_err(Error::PermissionsError)?;
}
- let _ = server_start_tx.send(());
-
- Server::builder()
- .add_service(ManagementServiceServer::new(service))
- .serve_with_incoming_shutdown(incoming.map_ok(StreamBox), abort_rx)
- .await
- .map_err(Error::GrpcTransportError)
+ Ok(tokio::spawn(async move {
+ Server::builder()
+ .add_service(ManagementServiceServer::new(service))
+ .serve_with_incoming_shutdown(incoming.map_ok(StreamBox), abort_rx)
+ .await
+ .map_err(Error::GrpcTransportError)
+ }))
}
#[derive(Debug)]