diff options
| author | David Lönnhager <david.l@mullvad.net> | 2021-07-08 17:02:41 +0200 |
|---|---|---|
| committer | David Lönnhager <david.l@mullvad.net> | 2021-07-13 15:57:38 +0200 |
| commit | aa24d8a7e45f5712e380bacd57c3e463a6f9a006 (patch) | |
| tree | 18ce74ec55b7b0018c57756725efecf003ac5965 | |
| parent | e50e220dbc248f84903f85bad97c4e609415ff54 (diff) | |
| download | mullvadvpn-aa24d8a7e45f5712e380bacd57c3e463a6f9a006.tar.xz mullvadvpn-aa24d8a7e45f5712e380bacd57c3e463a6f9a006.zip | |
Fix management interface streams
| -rw-r--r-- | mullvad-daemon/src/management_interface.rs | 19 |
1 files changed, 9 insertions, 10 deletions
diff --git a/mullvad-daemon/src/management_interface.rs b/mullvad-daemon/src/management_interface.rs index ed69f84838..bdc5c4d4bb 100644 --- a/mullvad-daemon/src/management_interface.rs +++ b/mullvad-daemon/src/management_interface.rs @@ -27,6 +27,7 @@ use std::{ time::Duration, }; use talpid_types::ErrorExt; +use tokio_stream::wrappers::{ReceiverStream, UnboundedReceiverStream}; #[derive(err_derive::Error, Debug)] #[error(no_from)] @@ -42,8 +43,7 @@ struct ManagementServiceImpl { } pub type ServiceResult<T> = std::result::Result<Response<T>, Status>; -type EventsListenerReceiver = - tokio::sync::mpsc::UnboundedReceiver<Result<types::DaemonEvent, Status>>; +type EventsListenerReceiver = UnboundedReceiverStream<Result<types::DaemonEvent, Status>>; type EventsListenerSender = tokio::sync::mpsc::UnboundedSender<Result<types::DaemonEvent, Status>>; const INVALID_VOUCHER_MESSAGE: &str = "This voucher code is invalid"; @@ -51,9 +51,8 @@ const USED_VOUCHER_MESSAGE: &str = "This voucher code has already been used"; #[mullvad_management_interface::async_trait] impl ManagementService for ManagementServiceImpl { - type GetRelayLocationsStream = - tokio::sync::mpsc::Receiver<Result<types::RelayListCountry, Status>>; - type GetSplitTunnelProcessesStream = tokio::sync::mpsc::UnboundedReceiver<Result<i32, Status>>; + type GetRelayLocationsStream = ReceiverStream<Result<types::RelayListCountry, Status>>; + type GetSplitTunnelProcessesStream = UnboundedReceiverStream<Result<i32, Status>>; type EventsListenStream = EventsListenerReceiver; // Control and get the tunnel state @@ -102,7 +101,7 @@ impl ManagementService for ManagementServiceImpl { let mut subscriptions = self.subscriptions.write(); subscriptions.push(tx); - Ok(Response::new(rx)) + Ok(Response::new(UnboundedReceiverStream::new(rx))) } async fn prepare_restart(&self, _: Request<()>) -> ServiceResult<()> { @@ -194,7 +193,7 @@ impl ManagementService for ManagementServiceImpl { self.send_command_to_daemon(DaemonCommand::GetRelayLocations(tx))?; let locations = self.wait_for_result(rx).await?; - let (mut stream_tx, stream_rx) = + let (stream_tx, stream_rx) = tokio::sync::mpsc::channel(cmp::max(1, locations.countries.len())); tokio::spawn(async move { @@ -211,7 +210,7 @@ impl ManagementService for ManagementServiceImpl { } }); - Ok(Response::new(stream_rx)) + Ok(Response::new(ReceiverStream::new(stream_rx))) } async fn get_current_location(&self, _: Request<()>) -> ServiceResult<types::GeoIpLocation> { @@ -586,12 +585,12 @@ impl ManagementService for ManagementServiceImpl { } }); - Ok(Response::new(rx)) + Ok(Response::new(UnboundedReceiverStream::new(rx))) } #[cfg(not(target_os = "linux"))] { let (_, rx) = tokio::sync::mpsc::unbounded_channel(); - Ok(Response::new(rx)) + Ok(Response::new(UnboundedReceiverStream::new(rx))) } } |
