diff options
| -rw-r--r-- | mullvad-daemon/src/lib.rs | 32 | ||||
| -rw-r--r-- | mullvad-daemon/src/management_interface.rs | 505 |
2 files changed, 232 insertions, 305 deletions
diff --git a/mullvad-daemon/src/lib.rs b/mullvad-daemon/src/lib.rs index 414d64e267..c987e0d5c1 100644 --- a/mullvad-daemon/src/lib.rs +++ b/mullvad-daemon/src/lib.rs @@ -18,15 +18,18 @@ mod settings; pub mod version; mod version_check; -use futures::future::{abortable, AbortHandle}; -use futures01::{ - future::{self, Executor}, - stream::Wait, - sync::{ +use futures::{ + channel::{ mpsc::{UnboundedReceiver, UnboundedSender}, oneshot, }, - Future, Stream, + executor::BlockingStream, + future::{abortable, AbortHandle}, +}; +use futures01::{ + future::{self, Executor}, + sync::{mpsc as old_mpsc, oneshot as old_oneshot}, + Future, }; use log::{debug, error, info, warn}; use mullvad_rpc::AccountsProxy; @@ -343,7 +346,7 @@ pub struct DaemonCommandChannel { impl DaemonCommandChannel { pub fn new() -> Self { - let (untracked_sender, receiver) = futures01::sync::mpsc::unbounded(); + let (untracked_sender, receiver) = futures::channel::mpsc::unbounded(); let sender = DaemonCommandSender(Arc::new(untracked_sender)); Self { sender, receiver } @@ -454,13 +457,13 @@ pub trait EventListener { } pub struct Daemon<L: EventListener> { - tunnel_command_tx: Arc<UnboundedSender<TunnelCommand>>, + tunnel_command_tx: Arc<old_mpsc::UnboundedSender<TunnelCommand>>, tunnel_state: TunnelState, target_state: TargetState, state: DaemonExecutionState, #[cfg(target_os = "linux")] exclude_pids: split_tunnel::PidManager, - rx: Wait<UnboundedReceiver<InternalDaemonEvent>>, + rx: BlockingStream<UnboundedReceiver<InternalDaemonEvent>>, tx: DaemonEventSender, reconnection_job: Option<AbortHandle>, event_listener: L, @@ -478,7 +481,7 @@ pub struct Daemon<L: EventListener> { app_version_info: AppVersionInfo, shutdown_callbacks: Vec<Box<dyn FnOnce()>>, /// oneshot channel that completes once the tunnel state machine has been shut down - tunnel_state_machine_shutdown_signal: oneshot::Receiver<()>, + tunnel_state_machine_shutdown_signal: old_oneshot::Receiver<()>, cache_dir: PathBuf, } @@ -496,7 +499,7 @@ where #[cfg(target_os = "android")] android_context: AndroidContext, ) -> Result<Self, Error> { let (tunnel_state_machine_shutdown_tx, tunnel_state_machine_shutdown_signal) = - oneshot::channel(); + old_oneshot::channel(); let mut rpc_runtime = mullvad_rpc::MullvadRpcRuntime::with_cache_dir(&cache_dir) .map_err(Error::InitRpcFactory)?; @@ -605,7 +608,7 @@ where state: DaemonExecutionState::Running, #[cfg(target_os = "linux")] exclude_pids: split_tunnel::PidManager::new().map_err(Error::InitSplitTunneling)?, - rx: internal_event_rx.wait(), + rx: futures::executor::block_on_stream(internal_event_rx), tx: internal_event_tx, reconnection_job: None, event_listener, @@ -650,7 +653,8 @@ where if self.target_state == TargetState::Secured { self.connect_tunnel(); } - while let Some(Ok(event)) = self.rx.next() { + + while let Some(event) = self.rx.next() { self.handle_event(event); if self.state == DaemonExecutionState::Finished { break; @@ -690,7 +694,7 @@ where /// Shuts down the daemon without shutting down the underlying event listener and the shutdown /// callbacks - fn shutdown(self) -> (L, Vec<Box<dyn FnOnce()>>, oneshot::Receiver<()>) { + fn shutdown(self) -> (L, Vec<Box<dyn FnOnce()>>, old_oneshot::Receiver<()>) { let Daemon { event_listener, shutdown_callbacks, diff --git a/mullvad-daemon/src/management_interface.rs b/mullvad-daemon/src/management_interface.rs index dee8cb3eea..24a213e7cb 100644 --- a/mullvad-daemon/src/management_interface.rs +++ b/mullvad-daemon/src/management_interface.rs @@ -1,6 +1,6 @@ use crate::{DaemonCommand, DaemonCommandSender, EventListener}; -use futures::compat::Future01CompatExt; -use futures01::{future, sync, Future}; +use futures::{channel::oneshot, compat::Future01CompatExt}; +use futures01::Future as OldFuture; use mullvad_management_interface::{ types::{self, daemon_event, management_service_server::ManagementService}, Code, Request, Response, Status, @@ -53,7 +53,6 @@ type EventsListenerReceiver = type EventsListenerSender = tokio02::sync::mpsc::UnboundedSender<Result<types::DaemonEvent, Status>>; -const INVALID_ACCOUNT_TOKEN_MESSAGE: &str = "No valid account token configured"; const INVALID_VOUCHER_MESSAGE: &str = "This voucher code is invalid"; const USED_VOUCHER_MESSAGE: &str = "This voucher code has already been used"; @@ -71,46 +70,35 @@ impl ManagementService for ManagementServiceImpl { async fn connect_tunnel(&self, _: Request<()>) -> ServiceResult<()> { log::debug!("connect_tunnel"); - let (tx, rx) = sync::oneshot::channel(); - self.send_command_to_daemon(DaemonCommand::SetTargetState(tx, TargetState::Secured)) - .and_then(|_| rx.map_err(|_| Status::internal("internal error"))) - .and_then(|result| match result { - Ok(()) => Ok(Response::new(())), - Err(()) => Err(Status::new( - Code::Unauthenticated, - INVALID_ACCOUNT_TOKEN_MESSAGE, - )), - }) - .compat() - .await + let (tx, rx) = oneshot::channel(); + self.send_command_to_daemon(DaemonCommand::SetTargetState(tx, TargetState::Secured))?; + let result = rx.await.map_err(|_| Status::internal("internal error"))?; + match result { + Ok(()) => Ok(Response::new(())), + Err(()) => Err(Status::new(Code::from(-900), "No account token configured")), + } } async fn disconnect_tunnel(&self, _: Request<()>) -> ServiceResult<()> { log::debug!("disconnect_tunnel"); - let (tx, _) = sync::oneshot::channel(); - self.send_command_to_daemon(DaemonCommand::SetTargetState(tx, TargetState::Unsecured)) - .then(|_| Ok(Response::new(()))) - .compat() - .await + let (tx, _) = oneshot::channel(); + self.send_command_to_daemon(DaemonCommand::SetTargetState(tx, TargetState::Unsecured))?; + Ok(Response::new(())) } async fn reconnect_tunnel(&self, _: Request<()>) -> ServiceResult<()> { log::debug!("reconnect_tunnel"); - self.send_command_to_daemon(DaemonCommand::Reconnect) - .map(Response::new) - .compat() - .await + self.send_command_to_daemon(DaemonCommand::Reconnect)?; + Ok(Response::new(())) } async fn get_tunnel_state(&self, _: Request<()>) -> ServiceResult<types::TunnelState> { log::debug!("get_tunnel_state"); - let (tx, rx) = sync::oneshot::channel(); - self.send_command_to_daemon(DaemonCommand::GetState(tx)) - .and_then(|_| rx.map_err(|_| Status::internal("internal error"))) - .and_then(|state| Ok(Response::new(convert_state(state)))) - .compat() - .await + let (tx, rx) = oneshot::channel(); + self.send_command_to_daemon(DaemonCommand::GetState(tx))?; + let state = rx.await.map_err(|_| Status::internal("internal error"))?; + Ok(Response::new(convert_state(state))) } // Control the daemon and receive events @@ -127,30 +115,25 @@ impl ManagementService for ManagementServiceImpl { async fn prepare_restart(&self, _: Request<()>) -> ServiceResult<()> { log::debug!("prepare_restart"); - self.send_command_to_daemon(DaemonCommand::PrepareRestart) - .map(Response::new) - .compat() - .await + self.send_command_to_daemon(DaemonCommand::PrepareRestart)?; + Ok(Response::new(())) } async fn shutdown(&self, _: Request<()>) -> ServiceResult<()> { log::debug!("shutdown"); - self.send_command_to_daemon(DaemonCommand::Shutdown) - .map(Response::new) - .compat() - .await + self.send_command_to_daemon(DaemonCommand::Shutdown)?; + Ok(Response::new(())) } async fn factory_reset(&self, _: Request<()>) -> ServiceResult<()> { #[cfg(not(target_os = "android"))] { log::debug!("factory_reset"); - let (tx, rx) = sync::oneshot::channel(); - self.send_command_to_daemon(DaemonCommand::FactoryReset(tx)) - .and_then(|_| rx.map_err(|_| Status::internal("internal error"))) + let (tx, rx) = oneshot::channel(); + self.send_command_to_daemon(DaemonCommand::FactoryReset(tx))?; + rx.await .map(Response::new) - .compat() - .await + .map_err(|_| Status::internal("internal error")) } #[cfg(target_os = "android")] { @@ -160,25 +143,22 @@ impl ManagementService for ManagementServiceImpl { async fn get_current_version(&self, _: Request<()>) -> ServiceResult<String> { log::debug!("get_current_version"); - let (tx, rx) = sync::oneshot::channel(); - self.send_command_to_daemon(DaemonCommand::GetCurrentVersion(tx)) - .and_then(|_| rx.map_err(|_| Status::internal("internal error"))) + let (tx, rx) = oneshot::channel(); + self.send_command_to_daemon(DaemonCommand::GetCurrentVersion(tx))?; + rx.await .map(Response::new) - .compat() - .await + .map_err(|_| Status::internal("internal error")) } async fn get_version_info(&self, _: Request<()>) -> ServiceResult<types::AppVersionInfo> { log::debug!("get_version_info"); - let (tx, rx) = sync::oneshot::channel(); - let app_version_info = self - .send_command_to_daemon(DaemonCommand::GetVersionInfo(tx)) - .and_then(|_| rx.map_err(|_| Status::internal("internal error"))) - .compat() - .await?; - - Ok(Response::new(convert_version_info(&app_version_info))) + let (tx, rx) = oneshot::channel(); + self.send_command_to_daemon(DaemonCommand::GetVersionInfo(tx))?; + rx.await + .map_err(|_| Status::internal("internal error")) + .map(|version_info| convert_version_info(&version_info)) + .map(Response::new) } // Relays and tunnel constraints @@ -186,10 +166,8 @@ impl ManagementService for ManagementServiceImpl { async fn update_relay_locations(&self, _: Request<()>) -> ServiceResult<()> { log::debug!("update_relay_locations"); - self.send_command_to_daemon(DaemonCommand::UpdateRelayLocations) - .compat() - .await - .map(Response::new) + self.send_command_to_daemon(DaemonCommand::UpdateRelayLocations)?; + Ok(Response::new(())) } async fn update_relay_settings( @@ -197,15 +175,14 @@ impl ManagementService for ManagementServiceImpl { request: Request<types::RelaySettingsUpdate>, ) -> ServiceResult<()> { log::debug!("update_relay_settings"); - let (tx, rx) = sync::oneshot::channel(); + let (tx, rx) = oneshot::channel(); let constraints_update = convert_relay_settings_update(&request.into_inner())?; let message = DaemonCommand::UpdateRelaySettings(tx, constraints_update); - self.send_command_to_daemon(message) - .and_then(|_| rx.map_err(|_| Status::internal("internal error"))) + self.send_command_to_daemon(message)?; + rx.await .map(Response::new) - .compat() - .await + .map_err(|_| Status::internal("internal error")) } async fn get_relay_locations( @@ -214,12 +191,9 @@ impl ManagementService for ManagementServiceImpl { ) -> ServiceResult<Self::GetRelayLocationsStream> { log::debug!("get_relay_locations"); - let (tx, rx) = sync::oneshot::channel(); - let locations = self - .send_command_to_daemon(DaemonCommand::GetRelayLocations(tx)) - .and_then(|_| rx.map_err(|_| Status::internal("internal error"))) - .compat() - .await?; + let (tx, rx) = oneshot::channel(); + self.send_command_to_daemon(DaemonCommand::GetRelayLocations(tx))?; + let locations = rx.await.map_err(|_| Status::internal("internal error"))?; let (mut stream_tx, stream_rx) = tokio02::sync::mpsc::channel(cmp::max(1, locations.countries.len())); @@ -243,18 +217,13 @@ impl ManagementService for ManagementServiceImpl { async fn get_current_location(&self, _: Request<()>) -> ServiceResult<types::GeoIpLocation> { log::debug!("get_current_location"); - let (tx, rx) = sync::oneshot::channel(); - self.send_command_to_daemon(DaemonCommand::GetCurrentLocation(tx)) - .and_then(|_| rx.map_err(|_| Status::internal("internal error"))) - .and_then(|geoip| { - if let Some(geoip) = geoip { - Ok(Response::new(convert_geoip_location(geoip))) - } else { - Err(Status::not_found("no location was found")) - } - }) - .compat() - .await + let (tx, rx) = oneshot::channel(); + self.send_command_to_daemon(DaemonCommand::GetCurrentLocation(tx))?; + let result = rx.await.map_err(|_| Status::internal("internal error"))?; + match result { + Some(geoip) => Ok(Response::new(convert_geoip_location(geoip))), + None => Err(Status::not_found("no location was found")), + } } async fn set_bridge_settings( @@ -328,15 +297,12 @@ impl ManagementService for ManagementServiceImpl { log::debug!("set_bridge_settings({:?})", settings); - let (tx, rx) = sync::oneshot::channel(); - self.send_command_to_daemon(DaemonCommand::SetBridgeSettings(tx, settings)) - .and_then(|_| rx.map_err(|_| Status::internal("internal error"))) - .and_then(|settings_result| { - settings_result.map_err(|_| Status::internal("internal error")) - }) + let (tx, rx) = oneshot::channel(); + self.send_command_to_daemon(DaemonCommand::SetBridgeSettings(tx, settings))?; + let settings_result = rx.await.map_err(|_| Status::internal("internal error"))?; + settings_result .map(Response::new) - .compat() - .await + .map_err(|_| Status::internal("internal error")) } async fn set_bridge_state(&self, request: Request<types::BridgeState>) -> ServiceResult<()> { @@ -350,15 +316,12 @@ impl ManagementService for ManagementServiceImpl { }; log::debug!("set_bridge_state({:?})", bridge_state); - let (tx, rx) = sync::oneshot::channel(); - self.send_command_to_daemon(DaemonCommand::SetBridgeState(tx, bridge_state)) - .and_then(|_| rx.map_err(|_| Status::internal("internal error"))) - .and_then(|settings_result| { - settings_result.map_err(|_| Status::internal("internal error")) - }) + let (tx, rx) = oneshot::channel(); + self.send_command_to_daemon(DaemonCommand::SetBridgeState(tx, bridge_state))?; + let settings_result = rx.await.map_err(|_| Status::internal("internal error"))?; + settings_result .map(Response::new) - .compat() - .await + .map_err(|_| Status::internal("internal error")) } // Settings @@ -366,59 +329,54 @@ impl ManagementService for ManagementServiceImpl { async fn get_settings(&self, _: Request<()>) -> ServiceResult<types::Settings> { log::debug!("get_settings"); - let (tx, rx) = sync::oneshot::channel(); - self.send_command_to_daemon(DaemonCommand::GetSettings(tx)) - .and_then(|_| rx.map_err(|_| Status::internal("internal error"))) + let (tx, rx) = oneshot::channel(); + self.send_command_to_daemon(DaemonCommand::GetSettings(tx))?; + rx.await .map(|settings| Response::new(convert_settings(&settings))) - .compat() - .await + .map_err(|_| Status::internal("internal error")) } async fn set_allow_lan(&self, request: Request<bool>) -> ServiceResult<()> { let allow_lan = request.into_inner(); log::debug!("set_allow_lan({})", allow_lan); - let (tx, rx) = sync::oneshot::channel(); - self.send_command_to_daemon(DaemonCommand::SetAllowLan(tx, allow_lan)) - .and_then(|_| rx.map_err(|_| Status::internal("internal error"))) + let (tx, rx) = oneshot::channel(); + self.send_command_to_daemon(DaemonCommand::SetAllowLan(tx, allow_lan))?; + rx.await .map(Response::new) - .compat() - .await + .map_err(|_| Status::internal("internal error")) } async fn set_show_beta_releases(&self, request: Request<bool>) -> ServiceResult<()> { let enabled = request.into_inner(); log::debug!("set_show_beta_releases({})", enabled); - let (tx, rx) = sync::oneshot::channel(); - self.send_command_to_daemon(DaemonCommand::SetShowBetaReleases(tx, enabled)) - .and_then(|_| rx.map_err(|_| Status::internal("internal error"))) + let (tx, rx) = oneshot::channel(); + self.send_command_to_daemon(DaemonCommand::SetShowBetaReleases(tx, enabled))?; + rx.await .map(Response::new) - .compat() - .await + .map_err(|_| Status::internal("internal error")) } async fn set_block_when_disconnected(&self, request: Request<bool>) -> ServiceResult<()> { let block_when_disconnected = request.into_inner(); log::debug!("set_block_when_disconnected({})", block_when_disconnected); - let (tx, rx) = sync::oneshot::channel(); + let (tx, rx) = oneshot::channel(); self.send_command_to_daemon(DaemonCommand::SetBlockWhenDisconnected( tx, block_when_disconnected, - )) - .and_then(|_| rx.map_err(|_| Status::internal("internal error"))) - .map(Response::new) - .compat() - .await + ))?; + rx.await + .map(Response::new) + .map_err(|_| Status::internal("internal error")) } async fn set_auto_connect(&self, request: Request<bool>) -> ServiceResult<()> { let auto_connect = request.into_inner(); log::debug!("set_auto_connect({})", auto_connect); - let (tx, rx) = sync::oneshot::channel(); - self.send_command_to_daemon(DaemonCommand::SetAutoConnect(tx, auto_connect)) - .and_then(|_| rx.map_err(|_| Status::internal("internal error"))) + let (tx, rx) = oneshot::channel(); + self.send_command_to_daemon(DaemonCommand::SetAutoConnect(tx, auto_connect))?; + rx.await .map(Response::new) - .compat() - .await + .map_err(|_| Status::internal("internal error")) } async fn set_openvpn_mssfix(&self, request: Request<u32>) -> ServiceResult<()> { @@ -429,50 +387,45 @@ impl ManagementService for ManagementServiceImpl { None }; log::debug!("set_openvpn_mssfix({:?})", mssfix); - let (tx, rx) = sync::oneshot::channel(); - self.send_command_to_daemon(DaemonCommand::SetOpenVpnMssfix(tx, mssfix)) - .and_then(|_| rx.map_err(|_| Status::internal("internal error"))) + let (tx, rx) = oneshot::channel(); + self.send_command_to_daemon(DaemonCommand::SetOpenVpnMssfix(tx, mssfix))?; + rx.await .map(Response::new) - .compat() - .await + .map_err(|_| Status::internal("internal error")) } async fn set_wireguard_mtu(&self, request: Request<u32>) -> ServiceResult<()> { let mtu = request.into_inner(); let mtu = if mtu != 0 { Some(mtu as u16) } else { None }; log::debug!("set_wireguard_mtu({:?})", mtu); - let (tx, rx) = sync::oneshot::channel(); - self.send_command_to_daemon(DaemonCommand::SetWireguardMtu(tx, mtu)) - .and_then(|_| rx.map_err(|_| Status::internal("internal error"))) + let (tx, rx) = oneshot::channel(); + self.send_command_to_daemon(DaemonCommand::SetWireguardMtu(tx, mtu))?; + rx.await .map(Response::new) - .compat() - .await + .map_err(|_| Status::internal("internal error")) } async fn set_enable_ipv6(&self, request: Request<bool>) -> ServiceResult<()> { let enable_ipv6 = request.into_inner(); log::debug!("set_enable_ipv6({})", enable_ipv6); - let (tx, rx) = sync::oneshot::channel(); - self.send_command_to_daemon(DaemonCommand::SetEnableIpv6(tx, enable_ipv6)) - .and_then(|_| rx.map_err(|_| Status::internal("internal error"))) + let (tx, rx) = oneshot::channel(); + self.send_command_to_daemon(DaemonCommand::SetEnableIpv6(tx, enable_ipv6))?; + rx.await .map(Response::new) - .compat() - .await + .map_err(|_| Status::internal("internal error")) } // Account management // async fn create_new_account(&self, _: Request<()>) -> ServiceResult<String> { - let (tx, rx) = sync::oneshot::channel(); - self.send_command_to_daemon(DaemonCommand::CreateNewAccount(tx)) - .and_then(|_| rx.map_err(|_| Status::internal("internal error"))) - .and_then(|result| match result { - Ok(account_token) => Ok(Response::new(account_token)), - Err(_) => Err(Status::internal("internal error")), - }) - .compat() - .await + let (tx, rx) = oneshot::channel(); + self.send_command_to_daemon(DaemonCommand::CreateNewAccount(tx))?; + let result = rx.await.map_err(|_| Status::internal("internal error"))?; + match result { + Ok(account_token) => Ok(Response::new(account_token)), + Err(_) => Err(Status::internal("internal error")), + } } async fn set_account(&self, request: Request<AccountToken>) -> ServiceResult<()> { @@ -483,14 +436,11 @@ impl ManagementService for ManagementServiceImpl { } else { Some(account_token) }; - let (tx, rx) = sync::oneshot::channel(); - self.send_command_to_daemon(DaemonCommand::SetAccount(tx, account_token)) - .and_then(|_| { - rx.map(Response::new) - .map_err(|_| Status::internal("internal error")) - }) - .compat() - .await + let (tx, rx) = oneshot::channel(); + self.send_command_to_daemon(DaemonCommand::SetAccount(tx, account_token))?; + rx.await + .map(Response::new) + .map_err(|_| Status::internal("internal error")) } async fn get_account_data( @@ -499,26 +449,24 @@ impl ManagementService for ManagementServiceImpl { ) -> ServiceResult<types::AccountData> { log::debug!("get_account_data"); let account_token = request.into_inner(); - let (tx, rx) = sync::oneshot::channel(); - self.send_command_to_daemon(DaemonCommand::GetAccountData(tx, account_token)) - .and_then(|_| rx.map_err(|_| Status::internal("internal error"))) - .and_then(|rpc_future| { - rpc_future - .map(|account_data| { - Response::new(types::AccountData { - expiry: Some(types::Timestamp { - seconds: account_data.expiry.timestamp(), - nanos: 0, - }), - }) - }) - .map_err(|error: RestError| { - log::error!( - "Unable to get account data from API: {}", - error.display_chain() - ); - map_rest_account_error(error) - }) + let (tx, rx) = oneshot::channel(); + self.send_command_to_daemon(DaemonCommand::GetAccountData(tx, account_token))?; + let rpc_future = rx.await.map_err(|_| Status::internal("internal error"))?; + rpc_future + .map(|account_data| { + Response::new(types::AccountData { + expiry: Some(types::Timestamp { + seconds: account_data.expiry.timestamp(), + nanos: 0, + }), + }) + }) + .map_err(|error: RestError| { + log::error!( + "Unable to get account data from API: {}", + error.display_chain() + ); + map_rest_account_error(error) }) .compat() .await @@ -527,12 +475,11 @@ impl ManagementService for ManagementServiceImpl { async fn get_account_history(&self, _: Request<()>) -> ServiceResult<types::AccountHistory> { // TODO: this might be a stream log::debug!("get_account_history"); - let (tx, rx) = sync::oneshot::channel(); - self.send_command_to_daemon(DaemonCommand::GetAccountHistory(tx)) - .and_then(|_| rx.map_err(|_| Status::internal("internal error"))) + let (tx, rx) = oneshot::channel(); + self.send_command_to_daemon(DaemonCommand::GetAccountHistory(tx))?; + rx.await + .map_err(|_| Status::internal("internal error")) .map(|history| Response::new(types::AccountHistory { token: history })) - .compat() - .await } async fn remove_account_from_history( @@ -541,39 +488,35 @@ impl ManagementService for ManagementServiceImpl { ) -> ServiceResult<()> { log::debug!("remove_account_from_history"); let account_token = request.into_inner(); - let (tx, rx) = sync::oneshot::channel(); - self.send_command_to_daemon(DaemonCommand::RemoveAccountFromHistory(tx, account_token)) - .and_then(|_| rx.map_err(|_| Status::internal("internal error"))) + let (tx, rx) = oneshot::channel(); + self.send_command_to_daemon(DaemonCommand::RemoveAccountFromHistory(tx, account_token))?; + rx.await .map(Response::new) - .compat() - .await + .map_err(|_| Status::internal("internal error")) } async fn clear_account_history(&self, _: Request<()>) -> ServiceResult<()> { log::debug!("clear_account_history"); - let (tx, rx) = sync::oneshot::channel(); - self.send_command_to_daemon(DaemonCommand::ClearAccountHistory(tx)) - .and_then(|_| rx.map_err(|_| Status::internal("internal error"))) + let (tx, rx) = oneshot::channel(); + self.send_command_to_daemon(DaemonCommand::ClearAccountHistory(tx))?; + rx.await .map(Response::new) - .compat() - .await + .map_err(|_| Status::internal("internal error")) } async fn get_www_auth_token(&self, _: Request<()>) -> ServiceResult<String> { log::debug!("get_www_auth_token"); - let (tx, rx) = sync::oneshot::channel(); - self.send_command_to_daemon(DaemonCommand::GetWwwAuthToken(tx)) - .and_then(|_| rx.map_err(|_| Status::internal("internal error"))) - .and_then(|rpc_future| { - rpc_future - .map(Response::new) - .map_err(|error: mullvad_rpc::rest::Error| { - log::error!( - "Unable to get account data from API: {}", - error.display_chain() - ); - map_rest_account_error(error) - }) + let (tx, rx) = oneshot::channel(); + self.send_command_to_daemon(DaemonCommand::GetWwwAuthToken(tx))?; + let rpc_future = rx.await.map_err(|_| Status::internal("internal error"))?; + rpc_future + .map(Response::new) + .map_err(|error: mullvad_rpc::rest::Error| { + log::error!( + "Unable to get account data from API: {}", + error.display_chain() + ); + map_rest_account_error(error) }) .compat() .await @@ -585,35 +528,32 @@ impl ManagementService for ManagementServiceImpl { ) -> ServiceResult<types::VoucherSubmission> { log::debug!("submit_voucher"); let voucher = request.into_inner(); - let (tx, rx) = sync::oneshot::channel(); - self.send_command_to_daemon(DaemonCommand::SubmitVoucher(tx, voucher)) - .and_then(|_| rx.map_err(|_| Status::internal("internal error"))) - .and_then(|f| { - f.map(|submission| { - Response::new(types::VoucherSubmission { - seconds_added: submission.time_added, - new_expiry: Some(types::Timestamp { - seconds: submission.new_expiry.timestamp(), - nanos: 0, - }), - }) + let (tx, rx) = oneshot::channel(); + self.send_command_to_daemon(DaemonCommand::SubmitVoucher(tx, voucher))?; + let result = rx.await.map_err(|_| Status::internal("internal error"))?; + result + .map(|submission| { + Response::new(types::VoucherSubmission { + seconds_added: submission.time_added, + new_expiry: Some(types::Timestamp { + seconds: submission.new_expiry.timestamp(), + nanos: 0, + }), }) - .map_err(|e| match e { - RestError::ApiError(StatusCode::BAD_REQUEST, message) => { - match &message.as_str() { - &mullvad_rpc::INVALID_VOUCHER => { - Status::new(Code::NotFound, INVALID_VOUCHER_MESSAGE) - } - - &mullvad_rpc::VOUCHER_USED => { - Status::new(Code::ResourceExhausted, USED_VOUCHER_MESSAGE) - } + }) + .map_err(|e| match e { + RestError::ApiError(StatusCode::BAD_REQUEST, message) => match &message.as_str() { + &mullvad_rpc::INVALID_VOUCHER => { + Status::new(Code::NotFound, INVALID_VOUCHER_MESSAGE) + } - _ => Status::internal("internal error"), - } + &mullvad_rpc::VOUCHER_USED => { + Status::new(Code::ResourceExhausted, USED_VOUCHER_MESSAGE) } + _ => Status::internal("internal error"), - }) + }, + _ => Status::internal("internal error"), }) .compat() .await @@ -626,61 +566,55 @@ impl ManagementService for ManagementServiceImpl { let interval = request.into_inner(); log::debug!("set_wireguard_rotation_interval({:?})", interval); - let (tx, rx) = sync::oneshot::channel(); + let (tx, rx) = oneshot::channel(); self.send_command_to_daemon(DaemonCommand::SetWireguardRotationInterval( tx, Some(interval), - )) - .and_then(|_| rx.map_err(|_| Status::internal("internal error"))) - .map(Response::new) - .compat() - .await + ))?; + rx.await + .map_err(|_| Status::internal("internal error")) + .map(Response::new) } async fn reset_wireguard_rotation_interval(&self, _: Request<()>) -> ServiceResult<()> { log::debug!("reset_wireguard_rotation_interval"); - let (tx, rx) = sync::oneshot::channel(); - self.send_command_to_daemon(DaemonCommand::SetWireguardRotationInterval(tx, None)) - .and_then(|_| rx.map_err(|_| Status::internal("internal error"))) + let (tx, rx) = oneshot::channel(); + self.send_command_to_daemon(DaemonCommand::SetWireguardRotationInterval(tx, None))?; + rx.await + .map_err(|_| Status::internal("internal error")) .map(Response::new) - .compat() - .await } async fn generate_wireguard_key(&self, _: Request<()>) -> ServiceResult<types::KeygenEvent> { // TODO: return error for TooManyKeys, GenerationFailure // on success, simply return the new key or nil log::debug!("generate_wireguard_key"); - let (tx, rx) = sync::oneshot::channel(); - self.send_command_to_daemon(DaemonCommand::GenerateWireguardKey(tx)) - .and_then(|_| rx.map_err(|_| Status::internal("internal error"))) + let (tx, rx) = oneshot::channel(); + self.send_command_to_daemon(DaemonCommand::GenerateWireguardKey(tx))?; + rx.await + .map_err(|_| Status::internal("internal error")) .map(|event| Response::new(convert_wireguard_key_event(&event))) - .compat() - .await } async fn get_wireguard_key(&self, _: Request<()>) -> ServiceResult<types::PublicKey> { log::debug!("get_wireguard_key"); - let (tx, rx) = sync::oneshot::channel(); - self.send_command_to_daemon(DaemonCommand::GetWireguardKey(tx)) - .and_then(|_| rx.map_err(|_| Status::internal("internal error"))) - .then(|response| match response { - Ok(Some(key)) => Ok(Response::new(convert_public_key(&key))), - Ok(None) => Err(Status::not_found("no WireGuard key was found")), - Err(e) => Err(e), - }) - .compat() - .await + let (tx, rx) = oneshot::channel(); + self.send_command_to_daemon(DaemonCommand::GetWireguardKey(tx))?; + let response = rx.await.map_err(|_| Status::internal("internal error")); + match response { + Ok(Some(key)) => Ok(Response::new(convert_public_key(&key))), + Ok(None) => Err(Status::not_found("no WireGuard key was found")), + Err(e) => Err(e), + } } async fn verify_wireguard_key(&self, _: Request<()>) -> ServiceResult<bool> { log::debug!("verify_wireguard_key"); - let (tx, rx) = sync::oneshot::channel(); - self.send_command_to_daemon(DaemonCommand::VerifyWireguardKey(tx)) - .and_then(|_| rx.map_err(|_| Status::internal("internal error"))) + let (tx, rx) = oneshot::channel(); + self.send_command_to_daemon(DaemonCommand::VerifyWireguardKey(tx))?; + rx.await + .map_err(|_| Status::internal("internal error")) .map(Response::new) - .compat() - .await } // Split tunneling @@ -693,12 +627,9 @@ impl ManagementService for ManagementServiceImpl { #[cfg(target_os = "linux")] { log::debug!("get_split_tunnel_processes"); - let (tx, rx) = sync::oneshot::channel(); - let pids = self - .send_command_to_daemon(DaemonCommand::GetSplitTunnelProcesses(tx)) - .and_then(|_| rx.map_err(|_| Status::internal("internal error"))) - .compat() - .await?; + let (tx, rx) = oneshot::channel(); + self.send_command_to_daemon(DaemonCommand::GetSplitTunnelProcesses(tx))?; + let pids = rx.await.map_err(|_| Status::internal("internal error"))?; let (tx, rx) = tokio02::sync::mpsc::unbounded_channel(); tokio02::spawn(async move { @@ -720,12 +651,11 @@ impl ManagementService for ManagementServiceImpl { async fn add_split_tunnel_process(&self, request: Request<i32>) -> ServiceResult<()> { let pid = request.into_inner(); log::debug!("add_split_tunnel_process"); - let (tx, rx) = sync::oneshot::channel(); - self.send_command_to_daemon(DaemonCommand::AddSplitTunnelProcess(tx, pid)) - .and_then(|_| rx.map_err(|_| Status::internal("internal error"))) + let (tx, rx) = oneshot::channel(); + self.send_command_to_daemon(DaemonCommand::AddSplitTunnelProcess(tx, pid))?; + rx.await + .map_err(|_| Status::internal("internal error")) .map(Response::new) - .compat() - .await } #[cfg(not(target_os = "linux"))] async fn add_split_tunnel_process(&self, _: Request<i32>) -> ServiceResult<()> { @@ -736,12 +666,11 @@ impl ManagementService for ManagementServiceImpl { async fn remove_split_tunnel_process(&self, request: Request<i32>) -> ServiceResult<()> { let pid = request.into_inner(); log::debug!("remove_split_tunnel_process"); - let (tx, rx) = sync::oneshot::channel(); - self.send_command_to_daemon(DaemonCommand::RemoveSplitTunnelProcess(tx, pid)) - .and_then(|_| rx.map_err(|_| Status::internal("internal error"))) + let (tx, rx) = oneshot::channel(); + self.send_command_to_daemon(DaemonCommand::RemoveSplitTunnelProcess(tx, pid))?; + rx.await + .map_err(|_| Status::internal("internal error")) .map(Response::new) - .compat() - .await } #[cfg(not(target_os = "linux"))] async fn remove_split_tunnel_process(&self, _: Request<i32>) -> ServiceResult<()> { @@ -752,12 +681,11 @@ impl ManagementService for ManagementServiceImpl { #[cfg(target_os = "linux")] { log::debug!("clear_split_tunnel_processes"); - let (tx, rx) = sync::oneshot::channel(); - self.send_command_to_daemon(DaemonCommand::ClearSplitTunnelProcesses(tx)) - .and_then(|_| rx.map_err(|_| Status::internal("internal error"))) + let (tx, rx) = oneshot::channel(); + self.send_command_to_daemon(DaemonCommand::ClearSplitTunnelProcesses(tx))?; + rx.await + .map_err(|_| Status::internal("internal error")) .map(Response::new) - .compat() - .await } #[cfg(not(target_os = "linux"))] { @@ -768,15 +696,10 @@ impl ManagementService for ManagementServiceImpl { impl ManagementServiceImpl { /// Sends a command to the daemon and maps the error to an RPC error. - fn send_command_to_daemon( - &self, - command: DaemonCommand, - ) -> impl Future<Item = (), Error = Status> { - future::result( - self.daemon_tx - .send(command) - .map_err(|_| Status::internal("internal error")), - ) + fn send_command_to_daemon(&self, command: DaemonCommand) -> Result<(), Status> { + self.daemon_tx + .send(command) + .map_err(|_| Status::internal("internal error")) } } |
