summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--mullvad-daemon/src/lib.rs32
-rw-r--r--mullvad-daemon/src/management_interface.rs505
2 files changed, 232 insertions, 305 deletions
diff --git a/mullvad-daemon/src/lib.rs b/mullvad-daemon/src/lib.rs
index 414d64e267..c987e0d5c1 100644
--- a/mullvad-daemon/src/lib.rs
+++ b/mullvad-daemon/src/lib.rs
@@ -18,15 +18,18 @@ mod settings;
pub mod version;
mod version_check;
-use futures::future::{abortable, AbortHandle};
-use futures01::{
- future::{self, Executor},
- stream::Wait,
- sync::{
+use futures::{
+ channel::{
mpsc::{UnboundedReceiver, UnboundedSender},
oneshot,
},
- Future, Stream,
+ executor::BlockingStream,
+ future::{abortable, AbortHandle},
+};
+use futures01::{
+ future::{self, Executor},
+ sync::{mpsc as old_mpsc, oneshot as old_oneshot},
+ Future,
};
use log::{debug, error, info, warn};
use mullvad_rpc::AccountsProxy;
@@ -343,7 +346,7 @@ pub struct DaemonCommandChannel {
impl DaemonCommandChannel {
pub fn new() -> Self {
- let (untracked_sender, receiver) = futures01::sync::mpsc::unbounded();
+ let (untracked_sender, receiver) = futures::channel::mpsc::unbounded();
let sender = DaemonCommandSender(Arc::new(untracked_sender));
Self { sender, receiver }
@@ -454,13 +457,13 @@ pub trait EventListener {
}
pub struct Daemon<L: EventListener> {
- tunnel_command_tx: Arc<UnboundedSender<TunnelCommand>>,
+ tunnel_command_tx: Arc<old_mpsc::UnboundedSender<TunnelCommand>>,
tunnel_state: TunnelState,
target_state: TargetState,
state: DaemonExecutionState,
#[cfg(target_os = "linux")]
exclude_pids: split_tunnel::PidManager,
- rx: Wait<UnboundedReceiver<InternalDaemonEvent>>,
+ rx: BlockingStream<UnboundedReceiver<InternalDaemonEvent>>,
tx: DaemonEventSender,
reconnection_job: Option<AbortHandle>,
event_listener: L,
@@ -478,7 +481,7 @@ pub struct Daemon<L: EventListener> {
app_version_info: AppVersionInfo,
shutdown_callbacks: Vec<Box<dyn FnOnce()>>,
/// oneshot channel that completes once the tunnel state machine has been shut down
- tunnel_state_machine_shutdown_signal: oneshot::Receiver<()>,
+ tunnel_state_machine_shutdown_signal: old_oneshot::Receiver<()>,
cache_dir: PathBuf,
}
@@ -496,7 +499,7 @@ where
#[cfg(target_os = "android")] android_context: AndroidContext,
) -> Result<Self, Error> {
let (tunnel_state_machine_shutdown_tx, tunnel_state_machine_shutdown_signal) =
- oneshot::channel();
+ old_oneshot::channel();
let mut rpc_runtime = mullvad_rpc::MullvadRpcRuntime::with_cache_dir(&cache_dir)
.map_err(Error::InitRpcFactory)?;
@@ -605,7 +608,7 @@ where
state: DaemonExecutionState::Running,
#[cfg(target_os = "linux")]
exclude_pids: split_tunnel::PidManager::new().map_err(Error::InitSplitTunneling)?,
- rx: internal_event_rx.wait(),
+ rx: futures::executor::block_on_stream(internal_event_rx),
tx: internal_event_tx,
reconnection_job: None,
event_listener,
@@ -650,7 +653,8 @@ where
if self.target_state == TargetState::Secured {
self.connect_tunnel();
}
- while let Some(Ok(event)) = self.rx.next() {
+
+ while let Some(event) = self.rx.next() {
self.handle_event(event);
if self.state == DaemonExecutionState::Finished {
break;
@@ -690,7 +694,7 @@ where
/// Shuts down the daemon without shutting down the underlying event listener and the shutdown
/// callbacks
- fn shutdown(self) -> (L, Vec<Box<dyn FnOnce()>>, oneshot::Receiver<()>) {
+ fn shutdown(self) -> (L, Vec<Box<dyn FnOnce()>>, old_oneshot::Receiver<()>) {
let Daemon {
event_listener,
shutdown_callbacks,
diff --git a/mullvad-daemon/src/management_interface.rs b/mullvad-daemon/src/management_interface.rs
index dee8cb3eea..24a213e7cb 100644
--- a/mullvad-daemon/src/management_interface.rs
+++ b/mullvad-daemon/src/management_interface.rs
@@ -1,6 +1,6 @@
use crate::{DaemonCommand, DaemonCommandSender, EventListener};
-use futures::compat::Future01CompatExt;
-use futures01::{future, sync, Future};
+use futures::{channel::oneshot, compat::Future01CompatExt};
+use futures01::Future as OldFuture;
use mullvad_management_interface::{
types::{self, daemon_event, management_service_server::ManagementService},
Code, Request, Response, Status,
@@ -53,7 +53,6 @@ type EventsListenerReceiver =
type EventsListenerSender =
tokio02::sync::mpsc::UnboundedSender<Result<types::DaemonEvent, Status>>;
-const INVALID_ACCOUNT_TOKEN_MESSAGE: &str = "No valid account token configured";
const INVALID_VOUCHER_MESSAGE: &str = "This voucher code is invalid";
const USED_VOUCHER_MESSAGE: &str = "This voucher code has already been used";
@@ -71,46 +70,35 @@ impl ManagementService for ManagementServiceImpl {
async fn connect_tunnel(&self, _: Request<()>) -> ServiceResult<()> {
log::debug!("connect_tunnel");
- let (tx, rx) = sync::oneshot::channel();
- self.send_command_to_daemon(DaemonCommand::SetTargetState(tx, TargetState::Secured))
- .and_then(|_| rx.map_err(|_| Status::internal("internal error")))
- .and_then(|result| match result {
- Ok(()) => Ok(Response::new(())),
- Err(()) => Err(Status::new(
- Code::Unauthenticated,
- INVALID_ACCOUNT_TOKEN_MESSAGE,
- )),
- })
- .compat()
- .await
+ let (tx, rx) = oneshot::channel();
+ self.send_command_to_daemon(DaemonCommand::SetTargetState(tx, TargetState::Secured))?;
+ let result = rx.await.map_err(|_| Status::internal("internal error"))?;
+ match result {
+ Ok(()) => Ok(Response::new(())),
+ Err(()) => Err(Status::new(Code::from(-900), "No account token configured")),
+ }
}
async fn disconnect_tunnel(&self, _: Request<()>) -> ServiceResult<()> {
log::debug!("disconnect_tunnel");
- let (tx, _) = sync::oneshot::channel();
- self.send_command_to_daemon(DaemonCommand::SetTargetState(tx, TargetState::Unsecured))
- .then(|_| Ok(Response::new(())))
- .compat()
- .await
+ let (tx, _) = oneshot::channel();
+ self.send_command_to_daemon(DaemonCommand::SetTargetState(tx, TargetState::Unsecured))?;
+ Ok(Response::new(()))
}
async fn reconnect_tunnel(&self, _: Request<()>) -> ServiceResult<()> {
log::debug!("reconnect_tunnel");
- self.send_command_to_daemon(DaemonCommand::Reconnect)
- .map(Response::new)
- .compat()
- .await
+ self.send_command_to_daemon(DaemonCommand::Reconnect)?;
+ Ok(Response::new(()))
}
async fn get_tunnel_state(&self, _: Request<()>) -> ServiceResult<types::TunnelState> {
log::debug!("get_tunnel_state");
- let (tx, rx) = sync::oneshot::channel();
- self.send_command_to_daemon(DaemonCommand::GetState(tx))
- .and_then(|_| rx.map_err(|_| Status::internal("internal error")))
- .and_then(|state| Ok(Response::new(convert_state(state))))
- .compat()
- .await
+ let (tx, rx) = oneshot::channel();
+ self.send_command_to_daemon(DaemonCommand::GetState(tx))?;
+ let state = rx.await.map_err(|_| Status::internal("internal error"))?;
+ Ok(Response::new(convert_state(state)))
}
// Control the daemon and receive events
@@ -127,30 +115,25 @@ impl ManagementService for ManagementServiceImpl {
async fn prepare_restart(&self, _: Request<()>) -> ServiceResult<()> {
log::debug!("prepare_restart");
- self.send_command_to_daemon(DaemonCommand::PrepareRestart)
- .map(Response::new)
- .compat()
- .await
+ self.send_command_to_daemon(DaemonCommand::PrepareRestart)?;
+ Ok(Response::new(()))
}
async fn shutdown(&self, _: Request<()>) -> ServiceResult<()> {
log::debug!("shutdown");
- self.send_command_to_daemon(DaemonCommand::Shutdown)
- .map(Response::new)
- .compat()
- .await
+ self.send_command_to_daemon(DaemonCommand::Shutdown)?;
+ Ok(Response::new(()))
}
async fn factory_reset(&self, _: Request<()>) -> ServiceResult<()> {
#[cfg(not(target_os = "android"))]
{
log::debug!("factory_reset");
- let (tx, rx) = sync::oneshot::channel();
- self.send_command_to_daemon(DaemonCommand::FactoryReset(tx))
- .and_then(|_| rx.map_err(|_| Status::internal("internal error")))
+ let (tx, rx) = oneshot::channel();
+ self.send_command_to_daemon(DaemonCommand::FactoryReset(tx))?;
+ rx.await
.map(Response::new)
- .compat()
- .await
+ .map_err(|_| Status::internal("internal error"))
}
#[cfg(target_os = "android")]
{
@@ -160,25 +143,22 @@ impl ManagementService for ManagementServiceImpl {
async fn get_current_version(&self, _: Request<()>) -> ServiceResult<String> {
log::debug!("get_current_version");
- let (tx, rx) = sync::oneshot::channel();
- self.send_command_to_daemon(DaemonCommand::GetCurrentVersion(tx))
- .and_then(|_| rx.map_err(|_| Status::internal("internal error")))
+ let (tx, rx) = oneshot::channel();
+ self.send_command_to_daemon(DaemonCommand::GetCurrentVersion(tx))?;
+ rx.await
.map(Response::new)
- .compat()
- .await
+ .map_err(|_| Status::internal("internal error"))
}
async fn get_version_info(&self, _: Request<()>) -> ServiceResult<types::AppVersionInfo> {
log::debug!("get_version_info");
- let (tx, rx) = sync::oneshot::channel();
- let app_version_info = self
- .send_command_to_daemon(DaemonCommand::GetVersionInfo(tx))
- .and_then(|_| rx.map_err(|_| Status::internal("internal error")))
- .compat()
- .await?;
-
- Ok(Response::new(convert_version_info(&app_version_info)))
+ let (tx, rx) = oneshot::channel();
+ self.send_command_to_daemon(DaemonCommand::GetVersionInfo(tx))?;
+ rx.await
+ .map_err(|_| Status::internal("internal error"))
+ .map(|version_info| convert_version_info(&version_info))
+ .map(Response::new)
}
// Relays and tunnel constraints
@@ -186,10 +166,8 @@ impl ManagementService for ManagementServiceImpl {
async fn update_relay_locations(&self, _: Request<()>) -> ServiceResult<()> {
log::debug!("update_relay_locations");
- self.send_command_to_daemon(DaemonCommand::UpdateRelayLocations)
- .compat()
- .await
- .map(Response::new)
+ self.send_command_to_daemon(DaemonCommand::UpdateRelayLocations)?;
+ Ok(Response::new(()))
}
async fn update_relay_settings(
@@ -197,15 +175,14 @@ impl ManagementService for ManagementServiceImpl {
request: Request<types::RelaySettingsUpdate>,
) -> ServiceResult<()> {
log::debug!("update_relay_settings");
- let (tx, rx) = sync::oneshot::channel();
+ let (tx, rx) = oneshot::channel();
let constraints_update = convert_relay_settings_update(&request.into_inner())?;
let message = DaemonCommand::UpdateRelaySettings(tx, constraints_update);
- self.send_command_to_daemon(message)
- .and_then(|_| rx.map_err(|_| Status::internal("internal error")))
+ self.send_command_to_daemon(message)?;
+ rx.await
.map(Response::new)
- .compat()
- .await
+ .map_err(|_| Status::internal("internal error"))
}
async fn get_relay_locations(
@@ -214,12 +191,9 @@ impl ManagementService for ManagementServiceImpl {
) -> ServiceResult<Self::GetRelayLocationsStream> {
log::debug!("get_relay_locations");
- let (tx, rx) = sync::oneshot::channel();
- let locations = self
- .send_command_to_daemon(DaemonCommand::GetRelayLocations(tx))
- .and_then(|_| rx.map_err(|_| Status::internal("internal error")))
- .compat()
- .await?;
+ let (tx, rx) = oneshot::channel();
+ self.send_command_to_daemon(DaemonCommand::GetRelayLocations(tx))?;
+ let locations = rx.await.map_err(|_| Status::internal("internal error"))?;
let (mut stream_tx, stream_rx) =
tokio02::sync::mpsc::channel(cmp::max(1, locations.countries.len()));
@@ -243,18 +217,13 @@ impl ManagementService for ManagementServiceImpl {
async fn get_current_location(&self, _: Request<()>) -> ServiceResult<types::GeoIpLocation> {
log::debug!("get_current_location");
- let (tx, rx) = sync::oneshot::channel();
- self.send_command_to_daemon(DaemonCommand::GetCurrentLocation(tx))
- .and_then(|_| rx.map_err(|_| Status::internal("internal error")))
- .and_then(|geoip| {
- if let Some(geoip) = geoip {
- Ok(Response::new(convert_geoip_location(geoip)))
- } else {
- Err(Status::not_found("no location was found"))
- }
- })
- .compat()
- .await
+ let (tx, rx) = oneshot::channel();
+ self.send_command_to_daemon(DaemonCommand::GetCurrentLocation(tx))?;
+ let result = rx.await.map_err(|_| Status::internal("internal error"))?;
+ match result {
+ Some(geoip) => Ok(Response::new(convert_geoip_location(geoip))),
+ None => Err(Status::not_found("no location was found")),
+ }
}
async fn set_bridge_settings(
@@ -328,15 +297,12 @@ impl ManagementService for ManagementServiceImpl {
log::debug!("set_bridge_settings({:?})", settings);
- let (tx, rx) = sync::oneshot::channel();
- self.send_command_to_daemon(DaemonCommand::SetBridgeSettings(tx, settings))
- .and_then(|_| rx.map_err(|_| Status::internal("internal error")))
- .and_then(|settings_result| {
- settings_result.map_err(|_| Status::internal("internal error"))
- })
+ let (tx, rx) = oneshot::channel();
+ self.send_command_to_daemon(DaemonCommand::SetBridgeSettings(tx, settings))?;
+ let settings_result = rx.await.map_err(|_| Status::internal("internal error"))?;
+ settings_result
.map(Response::new)
- .compat()
- .await
+ .map_err(|_| Status::internal("internal error"))
}
async fn set_bridge_state(&self, request: Request<types::BridgeState>) -> ServiceResult<()> {
@@ -350,15 +316,12 @@ impl ManagementService for ManagementServiceImpl {
};
log::debug!("set_bridge_state({:?})", bridge_state);
- let (tx, rx) = sync::oneshot::channel();
- self.send_command_to_daemon(DaemonCommand::SetBridgeState(tx, bridge_state))
- .and_then(|_| rx.map_err(|_| Status::internal("internal error")))
- .and_then(|settings_result| {
- settings_result.map_err(|_| Status::internal("internal error"))
- })
+ let (tx, rx) = oneshot::channel();
+ self.send_command_to_daemon(DaemonCommand::SetBridgeState(tx, bridge_state))?;
+ let settings_result = rx.await.map_err(|_| Status::internal("internal error"))?;
+ settings_result
.map(Response::new)
- .compat()
- .await
+ .map_err(|_| Status::internal("internal error"))
}
// Settings
@@ -366,59 +329,54 @@ impl ManagementService for ManagementServiceImpl {
async fn get_settings(&self, _: Request<()>) -> ServiceResult<types::Settings> {
log::debug!("get_settings");
- let (tx, rx) = sync::oneshot::channel();
- self.send_command_to_daemon(DaemonCommand::GetSettings(tx))
- .and_then(|_| rx.map_err(|_| Status::internal("internal error")))
+ let (tx, rx) = oneshot::channel();
+ self.send_command_to_daemon(DaemonCommand::GetSettings(tx))?;
+ rx.await
.map(|settings| Response::new(convert_settings(&settings)))
- .compat()
- .await
+ .map_err(|_| Status::internal("internal error"))
}
async fn set_allow_lan(&self, request: Request<bool>) -> ServiceResult<()> {
let allow_lan = request.into_inner();
log::debug!("set_allow_lan({})", allow_lan);
- let (tx, rx) = sync::oneshot::channel();
- self.send_command_to_daemon(DaemonCommand::SetAllowLan(tx, allow_lan))
- .and_then(|_| rx.map_err(|_| Status::internal("internal error")))
+ let (tx, rx) = oneshot::channel();
+ self.send_command_to_daemon(DaemonCommand::SetAllowLan(tx, allow_lan))?;
+ rx.await
.map(Response::new)
- .compat()
- .await
+ .map_err(|_| Status::internal("internal error"))
}
async fn set_show_beta_releases(&self, request: Request<bool>) -> ServiceResult<()> {
let enabled = request.into_inner();
log::debug!("set_show_beta_releases({})", enabled);
- let (tx, rx) = sync::oneshot::channel();
- self.send_command_to_daemon(DaemonCommand::SetShowBetaReleases(tx, enabled))
- .and_then(|_| rx.map_err(|_| Status::internal("internal error")))
+ let (tx, rx) = oneshot::channel();
+ self.send_command_to_daemon(DaemonCommand::SetShowBetaReleases(tx, enabled))?;
+ rx.await
.map(Response::new)
- .compat()
- .await
+ .map_err(|_| Status::internal("internal error"))
}
async fn set_block_when_disconnected(&self, request: Request<bool>) -> ServiceResult<()> {
let block_when_disconnected = request.into_inner();
log::debug!("set_block_when_disconnected({})", block_when_disconnected);
- let (tx, rx) = sync::oneshot::channel();
+ let (tx, rx) = oneshot::channel();
self.send_command_to_daemon(DaemonCommand::SetBlockWhenDisconnected(
tx,
block_when_disconnected,
- ))
- .and_then(|_| rx.map_err(|_| Status::internal("internal error")))
- .map(Response::new)
- .compat()
- .await
+ ))?;
+ rx.await
+ .map(Response::new)
+ .map_err(|_| Status::internal("internal error"))
}
async fn set_auto_connect(&self, request: Request<bool>) -> ServiceResult<()> {
let auto_connect = request.into_inner();
log::debug!("set_auto_connect({})", auto_connect);
- let (tx, rx) = sync::oneshot::channel();
- self.send_command_to_daemon(DaemonCommand::SetAutoConnect(tx, auto_connect))
- .and_then(|_| rx.map_err(|_| Status::internal("internal error")))
+ let (tx, rx) = oneshot::channel();
+ self.send_command_to_daemon(DaemonCommand::SetAutoConnect(tx, auto_connect))?;
+ rx.await
.map(Response::new)
- .compat()
- .await
+ .map_err(|_| Status::internal("internal error"))
}
async fn set_openvpn_mssfix(&self, request: Request<u32>) -> ServiceResult<()> {
@@ -429,50 +387,45 @@ impl ManagementService for ManagementServiceImpl {
None
};
log::debug!("set_openvpn_mssfix({:?})", mssfix);
- let (tx, rx) = sync::oneshot::channel();
- self.send_command_to_daemon(DaemonCommand::SetOpenVpnMssfix(tx, mssfix))
- .and_then(|_| rx.map_err(|_| Status::internal("internal error")))
+ let (tx, rx) = oneshot::channel();
+ self.send_command_to_daemon(DaemonCommand::SetOpenVpnMssfix(tx, mssfix))?;
+ rx.await
.map(Response::new)
- .compat()
- .await
+ .map_err(|_| Status::internal("internal error"))
}
async fn set_wireguard_mtu(&self, request: Request<u32>) -> ServiceResult<()> {
let mtu = request.into_inner();
let mtu = if mtu != 0 { Some(mtu as u16) } else { None };
log::debug!("set_wireguard_mtu({:?})", mtu);
- let (tx, rx) = sync::oneshot::channel();
- self.send_command_to_daemon(DaemonCommand::SetWireguardMtu(tx, mtu))
- .and_then(|_| rx.map_err(|_| Status::internal("internal error")))
+ let (tx, rx) = oneshot::channel();
+ self.send_command_to_daemon(DaemonCommand::SetWireguardMtu(tx, mtu))?;
+ rx.await
.map(Response::new)
- .compat()
- .await
+ .map_err(|_| Status::internal("internal error"))
}
async fn set_enable_ipv6(&self, request: Request<bool>) -> ServiceResult<()> {
let enable_ipv6 = request.into_inner();
log::debug!("set_enable_ipv6({})", enable_ipv6);
- let (tx, rx) = sync::oneshot::channel();
- self.send_command_to_daemon(DaemonCommand::SetEnableIpv6(tx, enable_ipv6))
- .and_then(|_| rx.map_err(|_| Status::internal("internal error")))
+ let (tx, rx) = oneshot::channel();
+ self.send_command_to_daemon(DaemonCommand::SetEnableIpv6(tx, enable_ipv6))?;
+ rx.await
.map(Response::new)
- .compat()
- .await
+ .map_err(|_| Status::internal("internal error"))
}
// Account management
//
async fn create_new_account(&self, _: Request<()>) -> ServiceResult<String> {
- let (tx, rx) = sync::oneshot::channel();
- self.send_command_to_daemon(DaemonCommand::CreateNewAccount(tx))
- .and_then(|_| rx.map_err(|_| Status::internal("internal error")))
- .and_then(|result| match result {
- Ok(account_token) => Ok(Response::new(account_token)),
- Err(_) => Err(Status::internal("internal error")),
- })
- .compat()
- .await
+ let (tx, rx) = oneshot::channel();
+ self.send_command_to_daemon(DaemonCommand::CreateNewAccount(tx))?;
+ let result = rx.await.map_err(|_| Status::internal("internal error"))?;
+ match result {
+ Ok(account_token) => Ok(Response::new(account_token)),
+ Err(_) => Err(Status::internal("internal error")),
+ }
}
async fn set_account(&self, request: Request<AccountToken>) -> ServiceResult<()> {
@@ -483,14 +436,11 @@ impl ManagementService for ManagementServiceImpl {
} else {
Some(account_token)
};
- let (tx, rx) = sync::oneshot::channel();
- self.send_command_to_daemon(DaemonCommand::SetAccount(tx, account_token))
- .and_then(|_| {
- rx.map(Response::new)
- .map_err(|_| Status::internal("internal error"))
- })
- .compat()
- .await
+ let (tx, rx) = oneshot::channel();
+ self.send_command_to_daemon(DaemonCommand::SetAccount(tx, account_token))?;
+ rx.await
+ .map(Response::new)
+ .map_err(|_| Status::internal("internal error"))
}
async fn get_account_data(
@@ -499,26 +449,24 @@ impl ManagementService for ManagementServiceImpl {
) -> ServiceResult<types::AccountData> {
log::debug!("get_account_data");
let account_token = request.into_inner();
- let (tx, rx) = sync::oneshot::channel();
- self.send_command_to_daemon(DaemonCommand::GetAccountData(tx, account_token))
- .and_then(|_| rx.map_err(|_| Status::internal("internal error")))
- .and_then(|rpc_future| {
- rpc_future
- .map(|account_data| {
- Response::new(types::AccountData {
- expiry: Some(types::Timestamp {
- seconds: account_data.expiry.timestamp(),
- nanos: 0,
- }),
- })
- })
- .map_err(|error: RestError| {
- log::error!(
- "Unable to get account data from API: {}",
- error.display_chain()
- );
- map_rest_account_error(error)
- })
+ let (tx, rx) = oneshot::channel();
+ self.send_command_to_daemon(DaemonCommand::GetAccountData(tx, account_token))?;
+ let rpc_future = rx.await.map_err(|_| Status::internal("internal error"))?;
+ rpc_future
+ .map(|account_data| {
+ Response::new(types::AccountData {
+ expiry: Some(types::Timestamp {
+ seconds: account_data.expiry.timestamp(),
+ nanos: 0,
+ }),
+ })
+ })
+ .map_err(|error: RestError| {
+ log::error!(
+ "Unable to get account data from API: {}",
+ error.display_chain()
+ );
+ map_rest_account_error(error)
})
.compat()
.await
@@ -527,12 +475,11 @@ impl ManagementService for ManagementServiceImpl {
async fn get_account_history(&self, _: Request<()>) -> ServiceResult<types::AccountHistory> {
// TODO: this might be a stream
log::debug!("get_account_history");
- let (tx, rx) = sync::oneshot::channel();
- self.send_command_to_daemon(DaemonCommand::GetAccountHistory(tx))
- .and_then(|_| rx.map_err(|_| Status::internal("internal error")))
+ let (tx, rx) = oneshot::channel();
+ self.send_command_to_daemon(DaemonCommand::GetAccountHistory(tx))?;
+ rx.await
+ .map_err(|_| Status::internal("internal error"))
.map(|history| Response::new(types::AccountHistory { token: history }))
- .compat()
- .await
}
async fn remove_account_from_history(
@@ -541,39 +488,35 @@ impl ManagementService for ManagementServiceImpl {
) -> ServiceResult<()> {
log::debug!("remove_account_from_history");
let account_token = request.into_inner();
- let (tx, rx) = sync::oneshot::channel();
- self.send_command_to_daemon(DaemonCommand::RemoveAccountFromHistory(tx, account_token))
- .and_then(|_| rx.map_err(|_| Status::internal("internal error")))
+ let (tx, rx) = oneshot::channel();
+ self.send_command_to_daemon(DaemonCommand::RemoveAccountFromHistory(tx, account_token))?;
+ rx.await
.map(Response::new)
- .compat()
- .await
+ .map_err(|_| Status::internal("internal error"))
}
async fn clear_account_history(&self, _: Request<()>) -> ServiceResult<()> {
log::debug!("clear_account_history");
- let (tx, rx) = sync::oneshot::channel();
- self.send_command_to_daemon(DaemonCommand::ClearAccountHistory(tx))
- .and_then(|_| rx.map_err(|_| Status::internal("internal error")))
+ let (tx, rx) = oneshot::channel();
+ self.send_command_to_daemon(DaemonCommand::ClearAccountHistory(tx))?;
+ rx.await
.map(Response::new)
- .compat()
- .await
+ .map_err(|_| Status::internal("internal error"))
}
async fn get_www_auth_token(&self, _: Request<()>) -> ServiceResult<String> {
log::debug!("get_www_auth_token");
- let (tx, rx) = sync::oneshot::channel();
- self.send_command_to_daemon(DaemonCommand::GetWwwAuthToken(tx))
- .and_then(|_| rx.map_err(|_| Status::internal("internal error")))
- .and_then(|rpc_future| {
- rpc_future
- .map(Response::new)
- .map_err(|error: mullvad_rpc::rest::Error| {
- log::error!(
- "Unable to get account data from API: {}",
- error.display_chain()
- );
- map_rest_account_error(error)
- })
+ let (tx, rx) = oneshot::channel();
+ self.send_command_to_daemon(DaemonCommand::GetWwwAuthToken(tx))?;
+ let rpc_future = rx.await.map_err(|_| Status::internal("internal error"))?;
+ rpc_future
+ .map(Response::new)
+ .map_err(|error: mullvad_rpc::rest::Error| {
+ log::error!(
+ "Unable to get account data from API: {}",
+ error.display_chain()
+ );
+ map_rest_account_error(error)
})
.compat()
.await
@@ -585,35 +528,32 @@ impl ManagementService for ManagementServiceImpl {
) -> ServiceResult<types::VoucherSubmission> {
log::debug!("submit_voucher");
let voucher = request.into_inner();
- let (tx, rx) = sync::oneshot::channel();
- self.send_command_to_daemon(DaemonCommand::SubmitVoucher(tx, voucher))
- .and_then(|_| rx.map_err(|_| Status::internal("internal error")))
- .and_then(|f| {
- f.map(|submission| {
- Response::new(types::VoucherSubmission {
- seconds_added: submission.time_added,
- new_expiry: Some(types::Timestamp {
- seconds: submission.new_expiry.timestamp(),
- nanos: 0,
- }),
- })
+ let (tx, rx) = oneshot::channel();
+ self.send_command_to_daemon(DaemonCommand::SubmitVoucher(tx, voucher))?;
+ let result = rx.await.map_err(|_| Status::internal("internal error"))?;
+ result
+ .map(|submission| {
+ Response::new(types::VoucherSubmission {
+ seconds_added: submission.time_added,
+ new_expiry: Some(types::Timestamp {
+ seconds: submission.new_expiry.timestamp(),
+ nanos: 0,
+ }),
})
- .map_err(|e| match e {
- RestError::ApiError(StatusCode::BAD_REQUEST, message) => {
- match &message.as_str() {
- &mullvad_rpc::INVALID_VOUCHER => {
- Status::new(Code::NotFound, INVALID_VOUCHER_MESSAGE)
- }
-
- &mullvad_rpc::VOUCHER_USED => {
- Status::new(Code::ResourceExhausted, USED_VOUCHER_MESSAGE)
- }
+ })
+ .map_err(|e| match e {
+ RestError::ApiError(StatusCode::BAD_REQUEST, message) => match &message.as_str() {
+ &mullvad_rpc::INVALID_VOUCHER => {
+ Status::new(Code::NotFound, INVALID_VOUCHER_MESSAGE)
+ }
- _ => Status::internal("internal error"),
- }
+ &mullvad_rpc::VOUCHER_USED => {
+ Status::new(Code::ResourceExhausted, USED_VOUCHER_MESSAGE)
}
+
_ => Status::internal("internal error"),
- })
+ },
+ _ => Status::internal("internal error"),
})
.compat()
.await
@@ -626,61 +566,55 @@ impl ManagementService for ManagementServiceImpl {
let interval = request.into_inner();
log::debug!("set_wireguard_rotation_interval({:?})", interval);
- let (tx, rx) = sync::oneshot::channel();
+ let (tx, rx) = oneshot::channel();
self.send_command_to_daemon(DaemonCommand::SetWireguardRotationInterval(
tx,
Some(interval),
- ))
- .and_then(|_| rx.map_err(|_| Status::internal("internal error")))
- .map(Response::new)
- .compat()
- .await
+ ))?;
+ rx.await
+ .map_err(|_| Status::internal("internal error"))
+ .map(Response::new)
}
async fn reset_wireguard_rotation_interval(&self, _: Request<()>) -> ServiceResult<()> {
log::debug!("reset_wireguard_rotation_interval");
- let (tx, rx) = sync::oneshot::channel();
- self.send_command_to_daemon(DaemonCommand::SetWireguardRotationInterval(tx, None))
- .and_then(|_| rx.map_err(|_| Status::internal("internal error")))
+ let (tx, rx) = oneshot::channel();
+ self.send_command_to_daemon(DaemonCommand::SetWireguardRotationInterval(tx, None))?;
+ rx.await
+ .map_err(|_| Status::internal("internal error"))
.map(Response::new)
- .compat()
- .await
}
async fn generate_wireguard_key(&self, _: Request<()>) -> ServiceResult<types::KeygenEvent> {
// TODO: return error for TooManyKeys, GenerationFailure
// on success, simply return the new key or nil
log::debug!("generate_wireguard_key");
- let (tx, rx) = sync::oneshot::channel();
- self.send_command_to_daemon(DaemonCommand::GenerateWireguardKey(tx))
- .and_then(|_| rx.map_err(|_| Status::internal("internal error")))
+ let (tx, rx) = oneshot::channel();
+ self.send_command_to_daemon(DaemonCommand::GenerateWireguardKey(tx))?;
+ rx.await
+ .map_err(|_| Status::internal("internal error"))
.map(|event| Response::new(convert_wireguard_key_event(&event)))
- .compat()
- .await
}
async fn get_wireguard_key(&self, _: Request<()>) -> ServiceResult<types::PublicKey> {
log::debug!("get_wireguard_key");
- let (tx, rx) = sync::oneshot::channel();
- self.send_command_to_daemon(DaemonCommand::GetWireguardKey(tx))
- .and_then(|_| rx.map_err(|_| Status::internal("internal error")))
- .then(|response| match response {
- Ok(Some(key)) => Ok(Response::new(convert_public_key(&key))),
- Ok(None) => Err(Status::not_found("no WireGuard key was found")),
- Err(e) => Err(e),
- })
- .compat()
- .await
+ let (tx, rx) = oneshot::channel();
+ self.send_command_to_daemon(DaemonCommand::GetWireguardKey(tx))?;
+ let response = rx.await.map_err(|_| Status::internal("internal error"));
+ match response {
+ Ok(Some(key)) => Ok(Response::new(convert_public_key(&key))),
+ Ok(None) => Err(Status::not_found("no WireGuard key was found")),
+ Err(e) => Err(e),
+ }
}
async fn verify_wireguard_key(&self, _: Request<()>) -> ServiceResult<bool> {
log::debug!("verify_wireguard_key");
- let (tx, rx) = sync::oneshot::channel();
- self.send_command_to_daemon(DaemonCommand::VerifyWireguardKey(tx))
- .and_then(|_| rx.map_err(|_| Status::internal("internal error")))
+ let (tx, rx) = oneshot::channel();
+ self.send_command_to_daemon(DaemonCommand::VerifyWireguardKey(tx))?;
+ rx.await
+ .map_err(|_| Status::internal("internal error"))
.map(Response::new)
- .compat()
- .await
}
// Split tunneling
@@ -693,12 +627,9 @@ impl ManagementService for ManagementServiceImpl {
#[cfg(target_os = "linux")]
{
log::debug!("get_split_tunnel_processes");
- let (tx, rx) = sync::oneshot::channel();
- let pids = self
- .send_command_to_daemon(DaemonCommand::GetSplitTunnelProcesses(tx))
- .and_then(|_| rx.map_err(|_| Status::internal("internal error")))
- .compat()
- .await?;
+ let (tx, rx) = oneshot::channel();
+ self.send_command_to_daemon(DaemonCommand::GetSplitTunnelProcesses(tx))?;
+ let pids = rx.await.map_err(|_| Status::internal("internal error"))?;
let (tx, rx) = tokio02::sync::mpsc::unbounded_channel();
tokio02::spawn(async move {
@@ -720,12 +651,11 @@ impl ManagementService for ManagementServiceImpl {
async fn add_split_tunnel_process(&self, request: Request<i32>) -> ServiceResult<()> {
let pid = request.into_inner();
log::debug!("add_split_tunnel_process");
- let (tx, rx) = sync::oneshot::channel();
- self.send_command_to_daemon(DaemonCommand::AddSplitTunnelProcess(tx, pid))
- .and_then(|_| rx.map_err(|_| Status::internal("internal error")))
+ let (tx, rx) = oneshot::channel();
+ self.send_command_to_daemon(DaemonCommand::AddSplitTunnelProcess(tx, pid))?;
+ rx.await
+ .map_err(|_| Status::internal("internal error"))
.map(Response::new)
- .compat()
- .await
}
#[cfg(not(target_os = "linux"))]
async fn add_split_tunnel_process(&self, _: Request<i32>) -> ServiceResult<()> {
@@ -736,12 +666,11 @@ impl ManagementService for ManagementServiceImpl {
async fn remove_split_tunnel_process(&self, request: Request<i32>) -> ServiceResult<()> {
let pid = request.into_inner();
log::debug!("remove_split_tunnel_process");
- let (tx, rx) = sync::oneshot::channel();
- self.send_command_to_daemon(DaemonCommand::RemoveSplitTunnelProcess(tx, pid))
- .and_then(|_| rx.map_err(|_| Status::internal("internal error")))
+ let (tx, rx) = oneshot::channel();
+ self.send_command_to_daemon(DaemonCommand::RemoveSplitTunnelProcess(tx, pid))?;
+ rx.await
+ .map_err(|_| Status::internal("internal error"))
.map(Response::new)
- .compat()
- .await
}
#[cfg(not(target_os = "linux"))]
async fn remove_split_tunnel_process(&self, _: Request<i32>) -> ServiceResult<()> {
@@ -752,12 +681,11 @@ impl ManagementService for ManagementServiceImpl {
#[cfg(target_os = "linux")]
{
log::debug!("clear_split_tunnel_processes");
- let (tx, rx) = sync::oneshot::channel();
- self.send_command_to_daemon(DaemonCommand::ClearSplitTunnelProcesses(tx))
- .and_then(|_| rx.map_err(|_| Status::internal("internal error")))
+ let (tx, rx) = oneshot::channel();
+ self.send_command_to_daemon(DaemonCommand::ClearSplitTunnelProcesses(tx))?;
+ rx.await
+ .map_err(|_| Status::internal("internal error"))
.map(Response::new)
- .compat()
- .await
}
#[cfg(not(target_os = "linux"))]
{
@@ -768,15 +696,10 @@ impl ManagementService for ManagementServiceImpl {
impl ManagementServiceImpl {
/// Sends a command to the daemon and maps the error to an RPC error.
- fn send_command_to_daemon(
- &self,
- command: DaemonCommand,
- ) -> impl Future<Item = (), Error = Status> {
- future::result(
- self.daemon_tx
- .send(command)
- .map_err(|_| Status::internal("internal error")),
- )
+ fn send_command_to_daemon(&self, command: DaemonCommand) -> Result<(), Status> {
+ self.daemon_tx
+ .send(command)
+ .map_err(|_| Status::internal("internal error"))
}
}