summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorDavid Lönnhager <david.l@mullvad.net>2021-07-08 17:02:41 +0200
committerDavid Lönnhager <david.l@mullvad.net>2021-07-13 15:57:38 +0200
commitaa24d8a7e45f5712e380bacd57c3e463a6f9a006 (patch)
tree18ce74ec55b7b0018c57756725efecf003ac5965
parente50e220dbc248f84903f85bad97c4e609415ff54 (diff)
downloadmullvadvpn-aa24d8a7e45f5712e380bacd57c3e463a6f9a006.tar.xz
mullvadvpn-aa24d8a7e45f5712e380bacd57c3e463a6f9a006.zip
Fix management interface streams
-rw-r--r--mullvad-daemon/src/management_interface.rs19
1 files changed, 9 insertions, 10 deletions
diff --git a/mullvad-daemon/src/management_interface.rs b/mullvad-daemon/src/management_interface.rs
index ed69f84838..bdc5c4d4bb 100644
--- a/mullvad-daemon/src/management_interface.rs
+++ b/mullvad-daemon/src/management_interface.rs
@@ -27,6 +27,7 @@ use std::{
time::Duration,
};
use talpid_types::ErrorExt;
+use tokio_stream::wrappers::{ReceiverStream, UnboundedReceiverStream};
#[derive(err_derive::Error, Debug)]
#[error(no_from)]
@@ -42,8 +43,7 @@ struct ManagementServiceImpl {
}
pub type ServiceResult<T> = std::result::Result<Response<T>, Status>;
-type EventsListenerReceiver =
- tokio::sync::mpsc::UnboundedReceiver<Result<types::DaemonEvent, Status>>;
+type EventsListenerReceiver = UnboundedReceiverStream<Result<types::DaemonEvent, Status>>;
type EventsListenerSender = tokio::sync::mpsc::UnboundedSender<Result<types::DaemonEvent, Status>>;
const INVALID_VOUCHER_MESSAGE: &str = "This voucher code is invalid";
@@ -51,9 +51,8 @@ const USED_VOUCHER_MESSAGE: &str = "This voucher code has already been used";
#[mullvad_management_interface::async_trait]
impl ManagementService for ManagementServiceImpl {
- type GetRelayLocationsStream =
- tokio::sync::mpsc::Receiver<Result<types::RelayListCountry, Status>>;
- type GetSplitTunnelProcessesStream = tokio::sync::mpsc::UnboundedReceiver<Result<i32, Status>>;
+ type GetRelayLocationsStream = ReceiverStream<Result<types::RelayListCountry, Status>>;
+ type GetSplitTunnelProcessesStream = UnboundedReceiverStream<Result<i32, Status>>;
type EventsListenStream = EventsListenerReceiver;
// Control and get the tunnel state
@@ -102,7 +101,7 @@ impl ManagementService for ManagementServiceImpl {
let mut subscriptions = self.subscriptions.write();
subscriptions.push(tx);
- Ok(Response::new(rx))
+ Ok(Response::new(UnboundedReceiverStream::new(rx)))
}
async fn prepare_restart(&self, _: Request<()>) -> ServiceResult<()> {
@@ -194,7 +193,7 @@ impl ManagementService for ManagementServiceImpl {
self.send_command_to_daemon(DaemonCommand::GetRelayLocations(tx))?;
let locations = self.wait_for_result(rx).await?;
- let (mut stream_tx, stream_rx) =
+ let (stream_tx, stream_rx) =
tokio::sync::mpsc::channel(cmp::max(1, locations.countries.len()));
tokio::spawn(async move {
@@ -211,7 +210,7 @@ impl ManagementService for ManagementServiceImpl {
}
});
- Ok(Response::new(stream_rx))
+ Ok(Response::new(ReceiverStream::new(stream_rx)))
}
async fn get_current_location(&self, _: Request<()>) -> ServiceResult<types::GeoIpLocation> {
@@ -586,12 +585,12 @@ impl ManagementService for ManagementServiceImpl {
}
});
- Ok(Response::new(rx))
+ Ok(Response::new(UnboundedReceiverStream::new(rx)))
}
#[cfg(not(target_os = "linux"))]
{
let (_, rx) = tokio::sync::mpsc::unbounded_channel();
- Ok(Response::new(rx))
+ Ok(Response::new(UnboundedReceiverStream::new(rx)))
}
}