summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorDavid Lönnhager <david.l@mullvad.net>2021-11-09 10:12:25 +0100
committerDavid Lönnhager <david.l@mullvad.net>2021-11-09 10:12:25 +0100
commit51514cf10148bc797a52354903c736afe104b233 (patch)
treedb2a7edcefc2bb63b641b35f2cc1a7798a71e10c
parent43f278edf6e2723caa09c8217211490462bab5d8 (diff)
parent8e5d0a4862b37bab8f2b67da9db802579d29cbde (diff)
downloadmullvadvpn-51514cf10148bc797a52354903c736afe104b233.tar.xz
mullvadvpn-51514cf10148bc797a52354903c736afe104b233.zip
Merge branch 'fix-management-interface-cleanup'
-rw-r--r--Cargo.lock2
-rw-r--r--mullvad-daemon/Cargo.toml1
-rw-r--r--mullvad-daemon/src/lib.rs6
-rw-r--r--mullvad-daemon/src/main.rs10
-rw-r--r--mullvad-daemon/src/management_interface.rs84
-rw-r--r--mullvad-daemon/src/relays.rs38
-rw-r--r--mullvad-management-interface/Cargo.toml1
-rw-r--r--mullvad-management-interface/src/lib.rs24
8 files changed, 54 insertions, 112 deletions
diff --git a/Cargo.lock b/Cargo.lock
index e49f9d1f00..1cc44a6f43 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1254,7 +1254,6 @@ dependencies = [
"talpid-types",
"tokio",
"tokio-stream",
- "triggered",
"uuid",
"winapi 0.3.9",
"windows-service",
@@ -1312,7 +1311,6 @@ dependencies = [
"tonic",
"tonic-build",
"tower",
- "triggered",
]
[[package]]
diff --git a/mullvad-daemon/Cargo.toml b/mullvad-daemon/Cargo.toml
index cbe5d25e1c..25c85e5b87 100644
--- a/mullvad-daemon/Cargo.toml
+++ b/mullvad-daemon/Cargo.toml
@@ -36,7 +36,6 @@ talpid-types = { path = "../talpid-types" }
talpid-platform-metadata = { path = "../talpid-platform-metadata" }
[target.'cfg(not(target_os="android"))'.dependencies]
-triggered = "0.1.1"
mullvad-management-interface = { path = "../mullvad-management-interface" }
[target.'cfg(target_os="android")'.dependencies]
diff --git a/mullvad-daemon/src/lib.rs b/mullvad-daemon/src/lib.rs
index 3bc7b486bc..9d64a67bea 100644
--- a/mullvad-daemon/src/lib.rs
+++ b/mullvad-daemon/src/lib.rs
@@ -735,11 +735,7 @@ where
);
// Attempt to download a fresh relay list
- let mut relay_handle = relay_selector.updater_handle();
- relay_handle
- .update_relay_list_deferred()
- .await
- .expect("Relay list updated thread has stopped unexpectedly");
+ relay_selector.update().await;
let mut daemon = Daemon {
tunnel_command_tx,
diff --git a/mullvad-daemon/src/main.rs b/mullvad-daemon/src/main.rs
index 6ea88dd836..b438e112cc 100644
--- a/mullvad-daemon/src/main.rs
+++ b/mullvad-daemon/src/main.rs
@@ -157,15 +157,13 @@ async fn create_daemon(
async fn spawn_management_interface(
command_sender: DaemonCommandSender,
) -> Result<ManagementInterfaceEventBroadcaster, String> {
- let server = ManagementInterfaceServer::start(command_sender)
+ let (socket_path, event_broadcaster) = ManagementInterfaceServer::start(command_sender)
.await
.map_err(|error| {
- error.display_chain_with_msg("Unable to start management interface server")
- })?;
- let event_broadcaster = server.event_broadcaster();
+ error.display_chain_with_msg("Unable to start management interface server")
+ })?;
- info!("Management interface listening on {}", server.socket_path());
- tokio::spawn(server.run());
+ info!("Management interface listening on {}", socket_path);
Ok(event_broadcaster)
}
diff --git a/mullvad-daemon/src/management_interface.rs b/mullvad-daemon/src/management_interface.rs
index fea356a01f..222e088193 100644
--- a/mullvad-daemon/src/management_interface.rs
+++ b/mullvad-daemon/src/management_interface.rs
@@ -1,5 +1,8 @@
use crate::{account_history, settings, DaemonCommand, DaemonCommandSender, EventListener};
-use futures::channel::oneshot;
+use futures::{
+ channel::{mpsc, oneshot},
+ StreamExt,
+};
use mullvad_management_interface::{
types::{self, daemon_event, management_service_server::ManagementService},
Code, Request, Response, Status,
@@ -23,7 +26,7 @@ use std::path::PathBuf;
use std::{
cmp,
convert::{TryFrom, TryInto},
- sync::{mpsc, Arc},
+ sync::Arc,
time::Duration,
};
use talpid_types::ErrorExt;
@@ -719,70 +722,43 @@ impl ManagementServiceImpl {
}
}
-pub struct ManagementInterfaceServer {
- subscriptions: Arc<RwLock<Vec<EventsListenerSender>>>,
- socket_path: String,
- server_abort_tx: triggered::Trigger,
- server_join_handle: Option<
- tokio::task::JoinHandle<std::result::Result<(), mullvad_management_interface::Error>>,
- >,
-}
+pub struct ManagementInterfaceServer(());
impl ManagementInterfaceServer {
- pub async fn start(tunnel_tx: DaemonCommandSender) -> Result<Self, Error> {
+ pub async fn start(
+ tunnel_tx: DaemonCommandSender,
+ ) -> Result<(String, ManagementInterfaceEventBroadcaster), Error> {
let subscriptions = Arc::<RwLock<Vec<EventsListenerSender>>>::default();
let socket_path = mullvad_paths::get_rpc_socket_path()
.to_string_lossy()
.to_string();
- let (server_abort_tx, server_abort_rx) = triggered::trigger();
- let (start_tx, start_rx) = mpsc::channel();
+ let (server_abort_tx, server_abort_rx) = mpsc::channel(0);
let server = ManagementServiceImpl {
daemon_tx: tunnel_tx,
subscriptions: subscriptions.clone(),
};
- let server_join_handle = tokio::spawn(mullvad_management_interface::spawn_rpc_server(
- server,
- start_tx,
- server_abort_rx,
- ));
-
- if let Err(_) = start_rx.recv() {
- return Err(server_join_handle
- .await
- .expect("Failed to resolve quit handle future")
- .map_err(Error::SetupError)
- .unwrap_err());
- }
-
- Ok(ManagementInterfaceServer {
- subscriptions,
- socket_path,
- server_abort_tx,
- server_join_handle: Some(server_join_handle),
+ let join_handle = mullvad_management_interface::spawn_rpc_server(server, async move {
+ server_abort_rx.into_future().await;
})
- }
+ .await
+ .map_err(Error::SetupError)?;
- pub fn socket_path(&self) -> &str {
- &self.socket_path
- }
-
- pub fn event_broadcaster(&self) -> ManagementInterfaceEventBroadcaster {
- ManagementInterfaceEventBroadcaster {
- subscriptions: self.subscriptions.clone(),
- close_handle: self.server_abort_tx.clone(),
- }
- }
-
- /// Consumes the server and waits for it to finish.
- pub async fn run(self) {
- if let Some(server_join_handle) = self.server_join_handle {
- if let Err(error) = server_join_handle.await {
- log::error!("Management server panic: {:?}", error);
+ tokio::spawn(async move {
+ if let Err(error) = join_handle.await {
+ log::error!("Management server panic: {}", error);
}
log::info!("Management interface shut down");
- }
+ });
+
+ Ok((
+ socket_path,
+ ManagementInterfaceEventBroadcaster {
+ subscriptions,
+ _close_handle: server_abort_tx,
+ },
+ ))
}
}
@@ -790,7 +766,7 @@ impl ManagementInterfaceServer {
#[derive(Clone)]
pub struct ManagementInterfaceEventBroadcaster {
subscriptions: Arc<RwLock<Vec<EventsListenerSender>>>,
- close_handle: triggered::Trigger,
+ _close_handle: mpsc::Sender<()>,
}
impl EventListener for ManagementInterfaceEventBroadcaster {
@@ -857,12 +833,6 @@ impl ManagementInterfaceEventBroadcaster {
}
}
-impl Drop for ManagementInterfaceEventBroadcaster {
- fn drop(&mut self) {
- self.close_handle.trigger();
- }
-}
-
/// Converts [`mullvad_daemon::Error`] into a tonic status.
fn map_daemon_error(error: crate::Error) -> Status {
use crate::Error as DaemonError;
diff --git a/mullvad-daemon/src/relays.rs b/mullvad-daemon/src/relays.rs
index f1f5ec8e9e..45541069f3 100644
--- a/mullvad-daemon/src/relays.rs
+++ b/mullvad-daemon/src/relays.rs
@@ -224,20 +224,12 @@ impl RelaySelector {
}
/// Download the newest relay list.
- pub fn update(&mut self) -> impl Future<Output = ()> {
- let mut updater = self.updater.as_ref().unwrap().clone();
- async move {
- updater
- .update_relay_list()
- .await
- .expect("Relay list updated thread has stopped unexpectedly");
+ pub async fn update(&self) {
+ if let Some(mut updater) = self.updater.clone() {
+ let _ = updater.update_relay_list().await;
}
}
- pub fn updater_handle(&self) -> RelayListUpdaterHandle {
- self.updater.as_ref().unwrap().clone()
- }
-
/// Returns all countries and cities. The cities in the object returned does not have any
/// relays in them.
pub fn get_locations(&mut self) -> RelayList {
@@ -992,20 +984,13 @@ impl RelaySelector {
#[derive(Clone)]
pub struct RelayListUpdaterHandle {
- tx: mpsc::Sender<bool>,
+ tx: mpsc::Sender<()>,
}
impl RelayListUpdaterHandle {
pub async fn update_relay_list(&mut self) -> Result<(), Error> {
self.tx
- .send(false)
- .await
- .map_err(|_| Error::DownloaderShutDown)
- }
-
- pub async fn update_relay_list_deferred(&mut self) -> Result<(), Error> {
- self.tx
- .send(true)
+ .send(())
.await
.map_err(|_| Error::DownloaderShutDown)
}
@@ -1045,7 +1030,7 @@ impl RelayListUpdater {
RelayListUpdaterHandle { tx }
}
- async fn run(mut self, mut cmd_rx: mpsc::Receiver<bool>) {
+ async fn run(mut self, mut cmd_rx: mpsc::Receiver<()>) {
let mut check_interval =
tokio_stream::wrappers::IntervalStream::new(tokio::time::interval_at(
(Instant::now() + UPDATE_CHECK_INTERVAL).into(),
@@ -1070,17 +1055,12 @@ impl RelayListUpdater {
cmd = cmd_rx.next() => {
match cmd {
- Some(defer) => {
+ Some(()) => {
let tag = self.parsed_relays.lock().tag().map(|tag| tag.to_string());
- if defer {
- let download_future = Self::download_relay_list(self.api_availability.clone(), self.rpc_client.clone(), tag);
- self.consume_new_relay_list(download_future.await).await;
- } else {
- self.consume_new_relay_list(self.rpc_client.relay_list(tag).await.map_err(mullvad_rpc::Error::from)).await;
- }
+ download_future = Box::pin(Self::download_relay_list(self.api_availability.clone(), self.rpc_client.clone(), tag).fuse());
},
None => {
- log::error!("Relay list updater shutting down");
+ log::trace!("Relay list updater shutting down");
return;
}
}
diff --git a/mullvad-management-interface/Cargo.toml b/mullvad-management-interface/Cargo.toml
index 346bf7f9b1..6a4100715b 100644
--- a/mullvad-management-interface/Cargo.toml
+++ b/mullvad-management-interface/Cargo.toml
@@ -19,7 +19,6 @@ prost-types = "0.8"
parity-tokio-ipc = "0.9"
futures = "0.3"
tokio = { version = "1.8", features = [ "rt" ] }
-triggered = "0.1.1"
log = "0.4"
[target.'cfg(unix)'.dependencies]
diff --git a/mullvad-management-interface/src/lib.rs b/mullvad-management-interface/src/lib.rs
index 69cdc01d74..c5b4d80487 100644
--- a/mullvad-management-interface/src/lib.rs
+++ b/mullvad-management-interface/src/lib.rs
@@ -4,6 +4,7 @@ use parity_tokio_ipc::Endpoint as IpcEndpoint;
#[cfg(unix)]
use std::{env, fs, os::unix::fs::PermissionsExt};
use std::{
+ future::Future,
io,
pin::Pin,
task::{Context, Poll},
@@ -66,11 +67,12 @@ pub async fn new_rpc_client() -> Result<ManagementServiceClient, Error> {
Ok(ManagementServiceClient::new(channel))
}
-pub async fn spawn_rpc_server<T: ManagementService>(
+pub type ServerJoinHandle = tokio::task::JoinHandle<Result<(), Error>>;
+
+pub async fn spawn_rpc_server<T: ManagementService, F: Future<Output = ()> + Send + 'static>(
service: T,
- server_start_tx: std::sync::mpsc::Sender<()>,
- abort_rx: triggered::Listener,
-) -> std::result::Result<(), Error> {
+ abort_rx: F,
+) -> std::result::Result<ServerJoinHandle, Error> {
use futures::stream::TryStreamExt;
use parity_tokio_ipc::SecurityAttributes;
@@ -95,13 +97,13 @@ pub async fn spawn_rpc_server<T: ManagementService>(
.map_err(Error::PermissionsError)?;
}
- let _ = server_start_tx.send(());
-
- Server::builder()
- .add_service(ManagementServiceServer::new(service))
- .serve_with_incoming_shutdown(incoming.map_ok(StreamBox), abort_rx)
- .await
- .map_err(Error::GrpcTransportError)
+ Ok(tokio::spawn(async move {
+ Server::builder()
+ .add_service(ManagementServiceServer::new(service))
+ .serve_with_incoming_shutdown(incoming.map_ok(StreamBox), abort_rx)
+ .await
+ .map_err(Error::GrpcTransportError)
+ }))
}
#[derive(Debug)]