summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorJanito Vaqueiro Ferreira Filho <janito@mullvad.net>2020-02-24 09:48:13 -0300
committerJanito Vaqueiro Ferreira Filho <janito@mullvad.net>2020-02-24 09:48:13 -0300
commit807de13a4577db8d97b8718f43fe9aef6be84b6d (patch)
treeffb55ef18e6355f9aa6eb5e5a2fee48203ecc6b3
parentec5549a908c580aff9db9218800b0f73fcbe2b6c (diff)
parent57e13fa5107d6a6392c09ba0bedeecf786206b9c (diff)
downloadmullvadvpn-807de13a4577db8d97b8718f43fe9aef6be84b6d.tar.xz
mullvadvpn-807de13a4577db8d97b8718f43fe9aef6be84b6d.zip
Merge branch 'uncouple-management-interface'
-rw-r--r--mullvad-daemon/src/lib.rs427
-rw-r--r--mullvad-daemon/src/main.rs45
-rw-r--r--mullvad-daemon/src/management_interface.rs185
-rw-r--r--mullvad-daemon/src/version_check.rs21
-rw-r--r--mullvad-daemon/src/wireguard.rs27
-rw-r--r--mullvad-jni/src/daemon_interface.rs47
-rw-r--r--mullvad-jni/src/lib.rs25
-rw-r--r--talpid-core/src/mpsc.rs87
-rw-r--r--talpid-core/src/tunnel_state_machine/mod.rs23
9 files changed, 395 insertions, 492 deletions
diff --git a/mullvad-daemon/src/lib.rs b/mullvad-daemon/src/lib.rs
index f5e76d9b99..ffce1b4187 100644
--- a/mullvad-daemon/src/lib.rs
+++ b/mullvad-daemon/src/lib.rs
@@ -7,17 +7,15 @@ extern crate serde;
mod account_history;
mod geoip;
pub mod logging;
-mod management_interface;
+#[cfg(not(target_os = "android"))]
+pub mod management_interface;
mod relays;
-mod rpc_uniqueness_check;
+#[cfg(not(target_os = "android"))]
+pub mod rpc_uniqueness_check;
mod settings;
pub mod version;
mod version_check;
-pub use crate::management_interface::ManagementCommand;
-use crate::management_interface::{
- BoxFuture, ManagementInterfaceEventBroadcaster, ManagementInterfaceServer,
-};
use futures::{
future::{self, Executor},
stream::Wait,
@@ -46,14 +44,16 @@ use settings::Settings;
#[cfg(not(target_os = "android"))]
use std::path::Path;
use std::{
- io, mem,
+ io,
+ marker::PhantomData,
+ mem,
path::PathBuf,
- sync::{mpsc, Arc},
+ sync::{mpsc, Arc, Weak},
thread,
time::Duration,
};
use talpid_core::{
- mpsc::IntoSender,
+ mpsc::Sender,
tunnel_state_machine::{self, TunnelCommand, TunnelParametersGenerator},
};
#[cfg(target_os = "android")]
@@ -67,14 +67,14 @@ use talpid_types::{
#[path = "wireguard.rs"]
mod wireguard;
-pub type Result<T> = std::result::Result<T, Error>;
+/// FIXME(linus): This is here just because the futures crate has deprecated it and jsonrpc_core
+/// did not introduce their own yet (https://github.com/paritytech/jsonrpc/pull/196).
+/// Remove this and use the one in jsonrpc_core when that is released.
+type BoxFuture<T, E> = Box<dyn Future<Item = T, Error = E> + Send>;
#[derive(err_derive::Error, Debug)]
#[error(no_from)]
pub enum Error {
- #[error(display = "Another instance of the daemon is already running")]
- DaemonIsAlreadyRunning,
-
#[error(display = "Failed to send command to daemon because it is not running")]
DaemonUnavailable,
@@ -90,13 +90,6 @@ pub enum Error {
#[error(display = "Unable to load account history with wireguard key cache")]
LoadAccountHistory(#[error(source)] account_history::Error),
- /// Error in the management interface
- #[error(display = "Unable to start management interface server")]
- StartManagementInterface(#[error(source)] talpid_ipc::Error),
-
- #[error(display = "Management interface server exited unexpectedly")]
- ManagementInterfaceExited,
-
#[error(display = "No wireguard private key available")]
NoKeyAvailable,
@@ -131,32 +124,109 @@ pub enum Error {
ReadDirError(#[error(source)] io::Error),
}
+/// Enum representing commands that can be sent to the daemon.
+pub enum DaemonCommand {
+ /// Set target state. Does nothing if the daemon already has the state that is being set.
+ SetTargetState(oneshot::Sender<std::result::Result<(), ()>>, TargetState),
+ /// Reconnect the tunnel, if one is connecting/connected.
+ Reconnect,
+ /// Request the current state.
+ GetState(oneshot::Sender<TunnelState>),
+ /// Get the current geographical location.
+ GetCurrentLocation(oneshot::Sender<Option<GeoIpLocation>>),
+ CreateNewAccount(oneshot::Sender<std::result::Result<String, mullvad_rpc::Error>>),
+ /// Request the metadata for an account.
+ GetAccountData(
+ oneshot::Sender<BoxFuture<AccountData, mullvad_rpc::Error>>,
+ AccountToken,
+ ),
+ /// Request www auth token for an account
+ GetWwwAuthToken(oneshot::Sender<BoxFuture<String, mullvad_rpc::Error>>),
+ /// Submit voucher to add time to the current account. Returns time added in seconds
+ SubmitVoucher(
+ oneshot::Sender<BoxFuture<VoucherSubmission, mullvad_rpc::Error>>,
+ String,
+ ),
+ /// Request account history
+ GetAccountHistory(oneshot::Sender<Vec<AccountToken>>),
+ /// Request account history
+ RemoveAccountFromHistory(oneshot::Sender<()>, AccountToken),
+ /// Get the list of countries and cities where there are relays.
+ GetRelayLocations(oneshot::Sender<RelayList>),
+ /// Trigger an asynchronous relay list update. This returns before the relay list is actually
+ /// updated.
+ UpdateRelayLocations,
+ /// Set which account token to use for subsequent connection attempts.
+ SetAccount(oneshot::Sender<()>, Option<AccountToken>),
+ /// Place constraints on the type of tunnel and relay
+ UpdateRelaySettings(oneshot::Sender<()>, RelaySettingsUpdate),
+ /// Set the allow LAN setting.
+ SetAllowLan(oneshot::Sender<()>, bool),
+ /// Set the block_when_disconnected setting.
+ SetBlockWhenDisconnected(oneshot::Sender<()>, bool),
+ /// Set the auto-connect setting.
+ SetAutoConnect(oneshot::Sender<()>, bool),
+ /// Set the mssfix argument for OpenVPN
+ SetOpenVpnMssfix(oneshot::Sender<()>, Option<u16>),
+ /// Set proxy details for OpenVPN
+ SetBridgeSettings(
+ oneshot::Sender<std::result::Result<(), settings::Error>>,
+ BridgeSettings,
+ ),
+ /// Set proxy state
+ SetBridgeState(
+ oneshot::Sender<std::result::Result<(), settings::Error>>,
+ BridgeState,
+ ),
+ /// Set if IPv6 should be enabled in the tunnel
+ SetEnableIpv6(oneshot::Sender<()>, bool),
+ /// Set MTU for wireguard tunnels
+ SetWireguardMtu(oneshot::Sender<()>, Option<u16>),
+ /// Set automatic key rotation interval for wireguard tunnels
+ SetWireguardRotationInterval(oneshot::Sender<()>, Option<u32>),
+ /// Get the daemon settings
+ GetSettings(oneshot::Sender<Settings>),
+ /// Generate new wireguard key
+ GenerateWireguardKey(oneshot::Sender<wireguard::KeygenEvent>),
+ /// Return a public key of the currently set wireguard private key, if there is one
+ GetWireguardKey(oneshot::Sender<Option<wireguard::PublicKey>>),
+ /// Verify if the currently set wireguard key is valid.
+ VerifyWireguardKey(oneshot::Sender<bool>),
+ /// Get information about the currently running and latest app versions
+ GetVersionInfo(oneshot::Sender<AppVersionInfo>),
+ /// Get current version of the app
+ GetCurrentVersion(oneshot::Sender<AppVersion>),
+ /// Remove settings and clear the cache
+ #[cfg(not(target_os = "android"))]
+ FactoryReset(oneshot::Sender<()>),
+ /// Makes the daemon exit the main loop and quit.
+ Shutdown,
+}
+
/// All events that can happen in the daemon. Sent from various threads and exposed interfaces.
pub(crate) enum InternalDaemonEvent {
/// Tunnel has changed state.
TunnelStateTransition(TunnelStateTransition),
/// Request from the `MullvadTunnelParametersGenerator` to obtain a new relay.
GenerateTunnelParameters(
- mpsc::Sender<std::result::Result<TunnelParameters, ParameterGenerationError>>,
+ mpsc::Sender<Result<TunnelParameters, ParameterGenerationError>>,
u32,
),
- /// An event coming from the JSONRPC-2.0 management interface.
- ManagementInterfaceEvent(ManagementCommand),
- /// Triggered if the server hosting the JSONRPC-2.0 management interface dies unexpectedly.
- ManagementInterfaceExited,
+ /// A command sent to the daemon.
+ Command(DaemonCommand),
/// Daemon shutdown triggered by a signal, ctrl-c or similar.
TriggerShutdown,
/// Wireguard key generation event
WgKeyEvent(
(
AccountToken,
- std::result::Result<mullvad_types::wireguard::WireguardData, wireguard::Error>,
+ Result<mullvad_types::wireguard::WireguardData, wireguard::Error>,
),
),
/// New Account created
NewAccountEvent(
AccountToken,
- oneshot::Sender<std::result::Result<String, mullvad_rpc::Error>>,
+ oneshot::Sender<Result<String, mullvad_rpc::Error>>,
),
/// The background job fetching new `AppVersionInfo`s got a new info object.
NewAppVersionInfo(AppVersionInfo),
@@ -168,9 +238,9 @@ impl From<TunnelStateTransition> for InternalDaemonEvent {
}
}
-impl From<ManagementCommand> for InternalDaemonEvent {
- fn from(command: ManagementCommand) -> Self {
- InternalDaemonEvent::ManagementInterfaceEvent(command)
+impl From<DaemonCommand> for InternalDaemonEvent {
+ fn from(command: DaemonCommand) -> Self {
+ InternalDaemonEvent::Command(command)
}
}
@@ -223,15 +293,101 @@ impl DaemonExecutionState {
}
}
-pub struct DaemonCommandSender(IntoSender<ManagementCommand, InternalDaemonEvent>);
+pub struct DaemonCommandChannel {
+ sender: DaemonCommandSender,
+ receiver: UnboundedReceiver<InternalDaemonEvent>,
+}
+
+impl DaemonCommandChannel {
+ pub fn new() -> Self {
+ let (untracked_sender, receiver) = futures::sync::mpsc::unbounded();
+ let sender = DaemonCommandSender(Arc::new(untracked_sender));
+
+ Self { sender, receiver }
+ }
+
+ pub fn sender(&self) -> DaemonCommandSender {
+ self.sender.clone()
+ }
+
+ fn destructure(self) -> (DaemonEventSender, UnboundedReceiver<InternalDaemonEvent>) {
+ let event_sender = DaemonEventSender::new(Arc::downgrade(&self.sender.0));
+
+ (event_sender, self.receiver)
+ }
+}
+
+#[derive(Clone)]
+pub struct DaemonCommandSender(Arc<UnboundedSender<InternalDaemonEvent>>);
impl DaemonCommandSender {
- pub(crate) fn new(internal_event_sender: UnboundedSender<InternalDaemonEvent>) -> Self {
- DaemonCommandSender(IntoSender::from(internal_event_sender))
+ pub fn send(&self, command: DaemonCommand) -> Result<(), Error> {
+ self.0
+ .unbounded_send(InternalDaemonEvent::Command(command))
+ .map_err(|_| Error::DaemonUnavailable)
+ }
+}
+
+pub(crate) struct DaemonEventSender<E = InternalDaemonEvent> {
+ sender: Weak<UnboundedSender<InternalDaemonEvent>>,
+ _event: PhantomData<E>,
+}
+
+impl<E> Clone for DaemonEventSender<E>
+where
+ InternalDaemonEvent: From<E>,
+{
+ fn clone(&self) -> Self {
+ DaemonEventSender {
+ sender: self.sender.clone(),
+ _event: PhantomData,
+ }
+ }
+}
+
+impl DaemonEventSender {
+ pub fn new(sender: Weak<UnboundedSender<InternalDaemonEvent>>) -> Self {
+ DaemonEventSender {
+ sender,
+ _event: PhantomData,
+ }
+ }
+
+ pub fn to_specialized_sender<E>(&self) -> DaemonEventSender<E>
+ where
+ InternalDaemonEvent: From<E>,
+ {
+ DaemonEventSender {
+ sender: self.sender.clone(),
+ _event: PhantomData,
+ }
+ }
+}
+
+impl<E> DaemonEventSender<E>
+where
+ InternalDaemonEvent: From<E>,
+{
+ pub fn is_closed(&self) -> bool {
+ self.sender
+ .upgrade()
+ .map(|sender| sender.is_closed())
+ .unwrap_or(true)
}
+}
- pub fn send(&self, command: ManagementCommand) -> Result<()> {
- self.0.send(command).map_err(|_| Error::DaemonUnavailable)
+impl<E> Sender<E> for DaemonEventSender<E>
+where
+ InternalDaemonEvent: From<E>,
+{
+ fn send(&self, event: E) -> Result<(), ()> {
+ if let Some(sender) = self.sender.upgrade() {
+ sender
+ .unbounded_send(InternalDaemonEvent::from(event))
+ .map_err(|_| ())
+ } else {
+ Err(())
+ }
}
}
@@ -254,13 +410,13 @@ pub trait EventListener {
fn notify_key_event(&self, key_event: KeygenEvent);
}
-pub struct Daemon<L: EventListener = ManagementInterfaceEventBroadcaster> {
+pub struct Daemon<L: EventListener> {
tunnel_command_tx: Arc<UnboundedSender<TunnelCommand>>,
tunnel_state: TunnelState,
target_state: TargetState,
state: DaemonExecutionState,
rx: Wait<UnboundedReceiver<InternalDaemonEvent>>,
- tx: UnboundedSender<InternalDaemonEvent>,
+ tx: DaemonEventSender,
reconnection_loop_tx: Option<mpsc::Sender<()>>,
event_listener: L,
settings: Settings,
@@ -277,100 +433,18 @@ pub struct Daemon<L: EventListener = ManagementInterfaceEventBroadcaster> {
shutdown_callbacks: Vec<Box<dyn FnOnce()>>,
}
-impl Daemon<ManagementInterfaceEventBroadcaster> {
- pub fn start(
- log_dir: Option<PathBuf>,
- resource_dir: PathBuf,
- cache_dir: PathBuf,
- // TODO: Remove this once `ManagementInterface` is less coupled to the constructor.
- #[cfg(target_os = "android")] android_context: AndroidContext,
- ) -> Result<Self> {
- if rpc_uniqueness_check::is_another_instance_running() {
- return Err(Error::DaemonIsAlreadyRunning);
- }
- let (tx, rx) = futures::sync::mpsc::unbounded();
- let management_interface_broadcaster = Self::start_management_interface(tx.clone())?;
-
- Self::start_internal(
- tx,
- rx,
- management_interface_broadcaster,
- log_dir,
- resource_dir,
- cache_dir,
- #[cfg(target_os = "android")]
- android_context,
- )
- }
-
- // Starts the management interface and spawns a thread that will process it.
- // Returns a handle that allows notifying all subscribers on events.
- fn start_management_interface(
- event_tx: UnboundedSender<InternalDaemonEvent>,
- ) -> Result<ManagementInterfaceEventBroadcaster> {
- let multiplex_event_tx = IntoSender::from(event_tx.clone());
- let server = Self::start_management_interface_server(multiplex_event_tx)?;
- let event_broadcaster = server.event_broadcaster();
- Self::spawn_management_interface_wait_thread(server, event_tx);
- Ok(event_broadcaster)
- }
-
- fn start_management_interface_server(
- event_tx: IntoSender<ManagementCommand, InternalDaemonEvent>,
- ) -> Result<ManagementInterfaceServer> {
- let server =
- ManagementInterfaceServer::start(event_tx).map_err(Error::StartManagementInterface)?;
- info!("Management interface listening on {}", server.socket_path());
-
- Ok(server)
- }
-
- fn spawn_management_interface_wait_thread(
- server: ManagementInterfaceServer,
- exit_tx: UnboundedSender<InternalDaemonEvent>,
- ) {
- thread::spawn(move || {
- server.wait();
- info!("Management interface shut down");
- let _ = exit_tx.unbounded_send(InternalDaemonEvent::ManagementInterfaceExited);
- });
- }
-}
-
impl<L> Daemon<L>
where
L: EventListener + Clone + Send + 'static,
{
- pub fn start_with_event_listener(
- event_listener: L,
+ pub fn start(
log_dir: Option<PathBuf>,
resource_dir: PathBuf,
cache_dir: PathBuf,
- #[cfg(target_os = "android")] android_context: AndroidContext,
- ) -> Result<Self> {
- let (tx, rx) = futures::sync::mpsc::unbounded();
-
- Self::start_internal(
- tx,
- rx,
- event_listener,
- log_dir,
- resource_dir,
- cache_dir,
- #[cfg(target_os = "android")]
- android_context,
- )
- }
-
- fn start_internal(
- internal_event_tx: UnboundedSender<InternalDaemonEvent>,
- internal_event_rx: UnboundedReceiver<InternalDaemonEvent>,
event_listener: L,
- log_dir: Option<PathBuf>,
- resource_dir: PathBuf,
- cache_dir: PathBuf,
+ command_channel: DaemonCommandChannel,
#[cfg(target_os = "android")] android_context: AndroidContext,
- ) -> Result<Self> {
+ ) -> Result<Self, Error> {
let ca_path = resource_dir.join(mullvad_paths::resources::API_CA_FILENAME);
let mut rpc_manager = mullvad_rpc::MullvadRpcFactory::with_cache_dir(&cache_dir, &ca_path);
@@ -399,11 +473,13 @@ where
&cache_dir,
);
+ let (internal_event_tx, internal_event_rx) = command_channel.destructure();
+
let app_version_info = version_check::load_cache(&cache_dir);
let version_check_future = version_check::VersionUpdater::new(
rpc_handle.clone(),
cache_dir.clone(),
- internal_event_tx.clone(),
+ internal_event_tx.to_specialized_sender(),
app_version_info.clone(),
);
tokio_remote.spawn(|_| version_check_future);
@@ -427,7 +503,7 @@ where
log_dir,
resource_dir,
cache_dir,
- IntoSender::from(internal_event_tx.clone()),
+ internal_event_tx.to_specialized_sender(),
#[cfg(target_os = "android")]
android_context,
)
@@ -483,21 +559,16 @@ where
Ok(daemon)
}
- /// Retrieve a channel for sending daemon commands.
- pub fn command_sender(&self) -> DaemonCommandSender {
- DaemonCommandSender::new(self.tx.clone())
- }
-
/// Consume the `Daemon` and run the main event loop. Blocks until an error happens or a
/// shutdown event is received.
- pub fn run(mut self) -> Result<()> {
+ pub fn run(mut self) -> Result<(), Error> {
if self.settings.get_auto_connect() && self.settings.get_account_token().is_some() {
info!("Automatically connecting since auto-connect is turned on");
self.set_target_state(TargetState::Secured);
}
while let Some(Ok(event)) = self.rx.next() {
- self.handle_event(event)?;
+ self.handle_event(event);
if self.state == DaemonExecutionState::Finished {
break;
}
@@ -515,8 +586,8 @@ where
mem::drop(event_listener);
}
- /// Shuts down the daemon without shutting down the underlying management interface event
- /// listener and the shutdown callbacks
+ /// Shuts down the daemon without shutting down the underlying event listener and the shutdown
+ /// callbacks
fn shutdown(self) -> (L, Vec<Box<dyn FnOnce()>>) {
let Daemon {
event_listener,
@@ -527,17 +598,14 @@ where
}
- fn handle_event(&mut self, event: InternalDaemonEvent) -> Result<()> {
+ fn handle_event(&mut self, event: InternalDaemonEvent) {
use self::InternalDaemonEvent::*;
match event {
TunnelStateTransition(transition) => self.handle_tunnel_state_transition(transition),
GenerateTunnelParameters(tunnel_parameters_tx, retry_attempt) => {
self.handle_generate_tunnel_parameters(&tunnel_parameters_tx, retry_attempt)
}
- ManagementInterfaceEvent(event) => self.handle_management_interface_event(event),
- ManagementInterfaceExited => {
- return Err(Error::ManagementInterfaceExited);
- }
+ Command(command) => self.handle_command(command),
TriggerShutdown => self.trigger_shutdown_event(),
WgKeyEvent(key_event) => self.handle_wireguard_key_event(key_event),
NewAccountEvent(account_token, tx) => self.handle_new_account_event(account_token, tx),
@@ -545,7 +613,6 @@ where
self.handle_new_app_version_info(app_version_info)
}
}
- Ok(())
}
fn handle_tunnel_state_transition(&mut self, tunnel_state_transition: TunnelStateTransition) {
@@ -596,9 +663,7 @@ where
fn handle_generate_tunnel_parameters(
&mut self,
- tunnel_parameters_tx: &mpsc::Sender<
- std::result::Result<TunnelParameters, ParameterGenerationError>,
- >,
+ tunnel_parameters_tx: &mpsc::Sender<Result<TunnelParameters, ParameterGenerationError>>,
retry_attempt: u32,
) {
if let Some(account_token) = self.settings.get_account_token() {
@@ -663,7 +728,7 @@ where
endpoint: MullvadEndpoint,
account_token: String,
retry_attempt: u32,
- ) -> Result<TunnelParameters> {
+ ) -> Result<TunnelParameters, Error> {
let tunnel_options = self.settings.get_tunnel_options().clone();
let location = relay.location.as_ref().expect("Relay has no location set");
self.last_generated_bridge_relay = None;
@@ -763,7 +828,7 @@ where
}
fn schedule_reconnect(&mut self, delay: Duration) {
- let tunnel_command_tx = self.tx.clone();
+ let tunnel_command_tx = self.tx.to_specialized_sender();
let (tx, rx) = mpsc::channel();
self.reconnection_loop_tx = Some(tx);
@@ -773,11 +838,10 @@ where
if let Err(mpsc::RecvTimeoutError::Timeout) = rx.recv_timeout(delay) {
debug!("Attempting to reconnect");
- let _ = tunnel_command_tx.unbounded_send(
- InternalDaemonEvent::ManagementInterfaceEvent(
- ManagementCommand::SetTargetState(result_tx, TargetState::Secured),
- ),
- );
+ let _ = tunnel_command_tx.send(DaemonCommand::SetTargetState(
+ result_tx,
+ TargetState::Secured,
+ ));
}
});
}
@@ -788,13 +852,13 @@ where
}
}
- fn handle_management_interface_event(&mut self, event: ManagementCommand) {
- use self::ManagementCommand::*;
+ fn handle_command(&mut self, command: DaemonCommand) {
+ use self::DaemonCommand::*;
if !self.state.is_running() {
- log::trace!("Dropping management command because the daemon is shutting down",);
+ log::trace!("Dropping daemon command because the daemon is shutting down",);
return;
}
- match event {
+ match command {
SetTargetState(tx, state) => self.on_set_target_state(tx, state),
Reconnect => self.on_reconnect(),
GetState(tx) => self.on_get_state(tx),
@@ -842,7 +906,7 @@ where
&mut self,
event: (
AccountToken,
- std::result::Result<mullvad_types::wireguard::WireguardData, wireguard::Error>,
+ Result<mullvad_types::wireguard::WireguardData, wireguard::Error>,
),
) {
let (account, result) = event;
@@ -905,7 +969,7 @@ where
fn handle_new_account_event(
&mut self,
new_token: AccountToken,
- tx: oneshot::Sender<std::result::Result<String, mullvad_rpc::Error>>,
+ tx: oneshot::Sender<Result<String, mullvad_rpc::Error>>,
) {
match self.set_account(Some(new_token.clone())) {
Ok(_) => {
@@ -925,7 +989,7 @@ where
fn on_set_target_state(
&mut self,
- tx: oneshot::Sender<std::result::Result<(), ()>>,
+ tx: oneshot::Sender<Result<(), ()>>,
new_target_state: TargetState,
) {
if self.state.is_running() {
@@ -1008,27 +1072,23 @@ where
})
}
- fn on_create_new_account(
- &mut self,
- tx: oneshot::Sender<std::result::Result<String, mullvad_rpc::Error>>,
- ) {
+ fn on_create_new_account(&mut self, tx: oneshot::Sender<Result<String, mullvad_rpc::Error>>) {
let daemon_tx = self.tx.clone();
- let future = self.accounts_proxy.create_account().then(
- move |result| -> std::result::Result<(), ()> {
+ let future = self
+ .accounts_proxy
+ .create_account()
+ .then(move |result| -> Result<(), ()> {
match result {
Ok(account_token) => {
- let _ = daemon_tx.unbounded_send(InternalDaemonEvent::NewAccountEvent(
- account_token,
- tx,
- ));
+ let _ =
+ daemon_tx.send(InternalDaemonEvent::NewAccountEvent(account_token, tx));
}
Err(err) => {
let _ = tx.send(Err(err));
}
};
Ok(())
- },
- );
+ });
if self.tokio_remote.execute(future).is_err() {
log::error!("Failed to spawn future for creating a new account");
@@ -1099,10 +1159,7 @@ where
}
}
- fn set_account(
- &mut self,
- account_token: Option<String>,
- ) -> std::result::Result<bool, settings::Error> {
+ fn set_account(&mut self, account_token: Option<String>) -> Result<bool, settings::Error> {
let account_changed = self.settings.set_account_token(account_token.clone())?;
if account_changed {
self.event_listener.notify_settings(self.settings.clone());
@@ -1284,7 +1341,7 @@ where
fn on_set_bridge_settings(
&mut self,
- tx: oneshot::Sender<std::result::Result<(), settings::Error>>,
+ tx: oneshot::Sender<Result<(), settings::Error>>,
new_settings: BridgeSettings,
) {
match self.settings.set_bridge_settings(new_settings) {
@@ -1308,7 +1365,7 @@ where
fn on_set_bridge_state(
&mut self,
- tx: oneshot::Sender<std::result::Result<(), settings::Error>>,
+ tx: oneshot::Sender<Result<(), settings::Error>>,
bridge_state: BridgeState,
) {
let result = match self.settings.set_bridge_state(bridge_state) {
@@ -1415,7 +1472,7 @@ where
}
fn on_generate_wireguard_key(&mut self, tx: oneshot::Sender<KeygenEvent>) {
- let mut result = || -> std::result::Result<KeygenEvent, String> {
+ let mut result = || -> Result<KeygenEvent, String> {
let account_token = self
.settings
.get_account_token()
@@ -1539,7 +1596,7 @@ where
fn oneshot_send<T>(tx: oneshot::Sender<T>, t: T, msg: &'static str) {
if tx.send(t).is_err() {
- warn!("Unable to send {} to management interface client", msg);
+ warn!("Unable to send {} to the daemon command sender", msg);
}
}
@@ -1598,19 +1655,19 @@ where
}
#[cfg(not(target_os = "android"))]
- fn clear_log_directory() -> Result<()> {
+ fn clear_log_directory() -> Result<(), Error> {
let log_dir = mullvad_paths::get_log_dir().map_err(Error::PathError)?;
Self::clear_directory(&log_dir)
}
#[cfg(not(target_os = "android"))]
- fn clear_cache_directory() -> Result<()> {
+ fn clear_cache_directory() -> Result<(), Error> {
let cache_dir = mullvad_paths::cache_dir().map_err(Error::PathError)?;
Self::clear_directory(&cache_dir)
}
#[cfg(not(target_os = "android"))]
- fn clear_directory(path: &Path) -> Result<()> {
+ fn clear_directory(path: &Path) -> Result<(), Error> {
use std::fs;
#[cfg(not(target_os = "windows"))]
{
@@ -1640,7 +1697,7 @@ where
Error::RemoveDirError(entry.path().display().to_string(), e)
})
})
- .collect::<Result<()>>()
+ .collect::<Result<(), Error>>()
})
}
}
@@ -1654,28 +1711,28 @@ where
}
pub struct DaemonShutdownHandle {
- tx: UnboundedSender<InternalDaemonEvent>,
+ tx: DaemonEventSender,
}
impl DaemonShutdownHandle {
pub fn shutdown(&self) {
- let _ = self.tx.unbounded_send(InternalDaemonEvent::TriggerShutdown);
+ let _ = self.tx.send(InternalDaemonEvent::TriggerShutdown);
}
}
struct MullvadTunnelParametersGenerator {
- tx: UnboundedSender<InternalDaemonEvent>,
+ tx: DaemonEventSender,
}
impl TunnelParametersGenerator for MullvadTunnelParametersGenerator {
fn generate(
&mut self,
retry_attempt: u32,
- ) -> std::result::Result<TunnelParameters, ParameterGenerationError> {
+ ) -> Result<TunnelParameters, ParameterGenerationError> {
let (response_tx, response_rx) = mpsc::channel();
if self
.tx
- .unbounded_send(InternalDaemonEvent::GenerateTunnelParameters(
+ .send(InternalDaemonEvent::GenerateTunnelParameters(
response_tx,
retry_attempt,
))
diff --git a/mullvad-daemon/src/main.rs b/mullvad-daemon/src/main.rs
index 4fd4ac6796..4928ba277e 100644
--- a/mullvad-daemon/src/main.rs
+++ b/mullvad-daemon/src/main.rs
@@ -1,7 +1,11 @@
#![deny(rust_2018_idioms)]
use log::{debug, error, info, warn};
-use mullvad_daemon::{logging, version, Daemon};
+use mullvad_daemon::{
+ logging,
+ management_interface::{ManagementInterfaceEventBroadcaster, ManagementInterfaceServer},
+ rpc_uniqueness_check, version, Daemon, DaemonCommandChannel, DaemonCommandSender,
+};
use std::{path::PathBuf, thread, time::Duration};
use talpid_types::ErrorExt;
@@ -84,6 +88,10 @@ fn run_platform(_config: &cli::Config, log_dir: Option<PathBuf>) -> Result<(), S
}
fn run_standalone(log_dir: Option<PathBuf>) -> Result<(), String> {
+ if rpc_uniqueness_check::is_another_instance_running() {
+ return Err("Another instance of the daemon is already running".to_owned());
+ }
+
if !running_as_admin() {
warn!("Running daemon as a non-administrator user, clients might refuse to connect");
}
@@ -101,13 +109,42 @@ fn run_standalone(log_dir: Option<PathBuf>) -> Result<(), String> {
Ok(())
}
-fn create_daemon(log_dir: Option<PathBuf>) -> Result<Daemon, String> {
+fn create_daemon(
+ log_dir: Option<PathBuf>,
+) -> Result<Daemon<ManagementInterfaceEventBroadcaster>, String> {
let resource_dir = mullvad_paths::get_resource_dir();
let cache_dir = mullvad_paths::cache_dir()
.map_err(|e| e.display_chain_with_msg("Unable to get cache dir"))?;
- Daemon::start(log_dir, resource_dir, cache_dir)
- .map_err(|e| e.display_chain_with_msg("Unable to initialize daemon"))
+ let command_channel = DaemonCommandChannel::new();
+ let event_listener = spawn_management_interface(command_channel.sender())?;
+
+ Daemon::start(
+ log_dir,
+ resource_dir,
+ cache_dir,
+ event_listener,
+ command_channel,
+ )
+ .map_err(|e| e.display_chain_with_msg("Unable to initialize daemon"))
+}
+
+fn spawn_management_interface(
+ command_sender: DaemonCommandSender,
+) -> Result<ManagementInterfaceEventBroadcaster, String> {
+ let server = ManagementInterfaceServer::start(command_sender).map_err(|error| {
+ error.display_chain_with_msg("Unable to start management interface server")
+ })?;
+ let event_broadcaster = server.event_broadcaster();
+
+ info!("Management interface listening on {}", server.socket_path());
+
+ thread::spawn(|| {
+ server.wait();
+ info!("Management interface shut down");
+ });
+
+ Ok(event_broadcaster)
}
#[cfg(unix)]
diff --git a/mullvad-daemon/src/management_interface.rs b/mullvad-daemon/src/management_interface.rs
index 22358881ed..8da3d4dabf 100644
--- a/mullvad-daemon/src/management_interface.rs
+++ b/mullvad-daemon/src/management_interface.rs
@@ -1,10 +1,6 @@
-use crate::EventListener;
+use crate::{BoxFuture, DaemonCommand, DaemonCommandSender, EventListener};
use jsonrpc_core::{
- futures::{
- future,
- sync::{self, oneshot::Sender as OneshotSender},
- Future,
- },
+ futures::{future, sync, Future},
Error, ErrorCode, MetaIoHandler, Metadata,
};
use jsonrpc_ipc_server;
@@ -21,21 +17,15 @@ use mullvad_types::{
states::{TargetState, TunnelState},
version, wireguard, DaemonEvent,
};
-use parking_lot::{Mutex, RwLock};
+use parking_lot::RwLock;
use std::{
collections::{hash_map::Entry, HashMap},
sync::Arc,
};
-use talpid_core::mpsc::IntoSender;
use talpid_ipc;
use talpid_types::ErrorExt;
use uuid;
-/// FIXME(linus): This is here just because the futures crate has deprecated it and jsonrpc_core
-/// did not introduce their own yet (https://github.com/paritytech/jsonrpc/pull/196).
-/// Remove this and use the one in jsonrpc_core when that is released.
-pub type BoxFuture<T, E> = Box<dyn Future<Item = T, Error = E> + Send>;
-
build_rpc_trait! {
pub trait ManagementInterfaceApi {
type Metadata;
@@ -190,90 +180,13 @@ build_rpc_trait! {
}
}
-
-/// Enum representing commands coming in on the management interface.
-pub enum ManagementCommand {
- /// Set target state. Does nothing if the daemon already has the state that is being set.
- SetTargetState(OneshotSender<Result<(), ()>>, TargetState),
- /// Reconnect the tunnel, if one is connecting/connected.
- Reconnect,
- /// Request the current state.
- GetState(OneshotSender<TunnelState>),
- /// Get the current geographical location.
- GetCurrentLocation(OneshotSender<Option<GeoIpLocation>>),
- CreateNewAccount(OneshotSender<std::result::Result<String, mullvad_rpc::Error>>),
- /// Request the metadata for an account.
- GetAccountData(
- OneshotSender<BoxFuture<AccountData, mullvad_rpc::Error>>,
- AccountToken,
- ),
- /// Request www auth token for an account
- GetWwwAuthToken(OneshotSender<BoxFuture<String, mullvad_rpc::Error>>),
- /// Submit voucher to add time to the current account. Returns time added in seconds
- SubmitVoucher(
- OneshotSender<BoxFuture<VoucherSubmission, mullvad_rpc::Error>>,
- String,
- ),
- /// Request account history
- GetAccountHistory(OneshotSender<Vec<AccountToken>>),
- /// Request account history
- RemoveAccountFromHistory(OneshotSender<()>, AccountToken),
- /// Get the list of countries and cities where there are relays.
- GetRelayLocations(OneshotSender<RelayList>),
- /// Trigger an asynchronous relay list update. This returns before the relay list is actually
- /// updated.
- UpdateRelayLocations,
- /// Set which account token to use for subsequent connection attempts.
- SetAccount(OneshotSender<()>, Option<AccountToken>),
- /// Place constraints on the type of tunnel and relay
- UpdateRelaySettings(OneshotSender<()>, RelaySettingsUpdate),
- /// Set the allow LAN setting.
- SetAllowLan(OneshotSender<()>, bool),
- /// Set the block_when_disconnected setting.
- SetBlockWhenDisconnected(OneshotSender<()>, bool),
- /// Set the auto-connect setting.
- SetAutoConnect(OneshotSender<()>, bool),
- /// Set the mssfix argument for OpenVPN
- SetOpenVpnMssfix(OneshotSender<()>, Option<u16>),
- /// Set proxy details for OpenVPN
- SetBridgeSettings(OneshotSender<Result<(), settings::Error>>, BridgeSettings),
- /// Set proxy state
- SetBridgeState(OneshotSender<Result<(), settings::Error>>, BridgeState),
- /// Set if IPv6 should be enabled in the tunnel
- SetEnableIpv6(OneshotSender<()>, bool),
- /// Set MTU for wireguard tunnels
- SetWireguardMtu(OneshotSender<()>, Option<u16>),
- /// Set automatic key rotation interval for wireguard tunnels
- SetWireguardRotationInterval(OneshotSender<()>, Option<u32>),
- /// Get the daemon settings
- GetSettings(OneshotSender<Settings>),
- /// Generate new wireguard key
- GenerateWireguardKey(OneshotSender<wireguard::KeygenEvent>),
- /// Return a public key of the currently set wireguard private key, if there is one
- GetWireguardKey(OneshotSender<Option<wireguard::PublicKey>>),
- /// Verify if the currently set wireguard key is valid.
- VerifyWireguardKey(OneshotSender<bool>),
- /// Get information about the currently running and latest app versions
- GetVersionInfo(OneshotSender<version::AppVersionInfo>),
- /// Get current version of the app
- GetCurrentVersion(OneshotSender<version::AppVersion>),
- /// Remove settings and clear the cache
- #[cfg(not(target_os = "android"))]
- FactoryReset(OneshotSender<()>),
- /// Makes the daemon exit the main loop and quit.
- Shutdown,
-}
-
pub struct ManagementInterfaceServer {
server: talpid_ipc::IpcServer,
subscriptions: Arc<RwLock<HashMap<SubscriptionId, pubsub::Sink<DaemonEvent>>>>,
}
impl ManagementInterfaceServer {
- pub fn start<T>(tunnel_tx: IntoSender<ManagementCommand, T>) -> Result<Self, talpid_ipc::Error>
- where
- T: From<ManagementCommand> + 'static + Send,
- {
+ pub fn start(tunnel_tx: DaemonCommandSender) -> Result<Self, talpid_ipc::Error> {
let rpc = ManagementInterface::new(tunnel_tx);
let subscriptions = rpc.subscriptions.clone();
@@ -363,25 +276,25 @@ impl Drop for ManagementInterfaceEventBroadcaster {
}
}
-struct ManagementInterface<T: From<ManagementCommand> + 'static + Send> {
+struct ManagementInterface {
subscriptions: Arc<RwLock<HashMap<SubscriptionId, pubsub::Sink<DaemonEvent>>>>,
- tx: Mutex<IntoSender<ManagementCommand, T>>,
+ tx: DaemonCommandSender,
}
-impl<T: From<ManagementCommand> + 'static + Send> ManagementInterface<T> {
- pub fn new(tx: IntoSender<ManagementCommand, T>) -> Self {
+impl ManagementInterface {
+ pub fn new(tx: DaemonCommandSender) -> Self {
ManagementInterface {
subscriptions: Default::default(),
- tx: Mutex::new(tx),
+ tx,
}
}
/// Sends a command to the daemon and maps the error to an RPC error.
fn send_command_to_daemon(
&self,
- command: ManagementCommand,
+ command: DaemonCommand,
) -> impl Future<Item = (), Error = Error> {
- future::result(self.tx.lock().send(command)).map_err(|_| Error::internal_error())
+ future::result(self.tx.send(command)).map_err(|_| Error::internal_error())
}
/// Converts the given error to an error that can be given to the caller of the API.
@@ -403,15 +316,13 @@ impl<T: From<ManagementCommand> + 'static + Send> ManagementInterface<T> {
}
}
-impl<T: From<ManagementCommand> + 'static + Send> ManagementInterfaceApi
- for ManagementInterface<T>
-{
+impl ManagementInterfaceApi for ManagementInterface {
type Metadata = Meta;
fn create_new_account(&self, _: Self::Metadata) -> BoxFuture<String, Error> {
let (tx, rx) = sync::oneshot::channel();
let future = self
- .send_command_to_daemon(ManagementCommand::CreateNewAccount(tx))
+ .send_command_to_daemon(DaemonCommand::CreateNewAccount(tx))
.and_then(|_| rx.map_err(|_| Error::internal_error()))
.and_then(|result| match result {
Ok(account_token) => Ok(account_token),
@@ -429,7 +340,7 @@ impl<T: From<ManagementCommand> + 'static + Send> ManagementInterfaceApi
log::debug!("get_account_data");
let (tx, rx) = sync::oneshot::channel();
let future = self
- .send_command_to_daemon(ManagementCommand::GetAccountData(tx, account_token))
+ .send_command_to_daemon(DaemonCommand::GetAccountData(tx, account_token))
.and_then(|_| rx.map_err(|_| Error::internal_error()))
.and_then(|rpc_future| {
rpc_future.map_err(|error: mullvad_rpc::Error| {
@@ -447,7 +358,7 @@ impl<T: From<ManagementCommand> + 'static + Send> ManagementInterfaceApi
log::debug!("get_account_data");
let (tx, rx) = sync::oneshot::channel();
let future = self
- .send_command_to_daemon(ManagementCommand::GetWwwAuthToken(tx))
+ .send_command_to_daemon(DaemonCommand::GetWwwAuthToken(tx))
.and_then(|_| rx.map_err(|_| Error::internal_error()))
.and_then(|rpc_future| {
rpc_future.map_err(|error: mullvad_rpc::Error| {
@@ -469,7 +380,7 @@ impl<T: From<ManagementCommand> + 'static + Send> ManagementInterfaceApi
log::debug!("submit_voucher");
let (tx, rx) = sync::oneshot::channel();
let future = self
- .send_command_to_daemon(ManagementCommand::SubmitVoucher(tx, voucher))
+ .send_command_to_daemon(DaemonCommand::SubmitVoucher(tx, voucher))
.and_then(|_| rx.map_err(|_| Error::internal_error()))
.and_then(|f| f.map_err(|e| Self::map_rpc_error(&e)));
Box::new(future)
@@ -479,14 +390,14 @@ impl<T: From<ManagementCommand> + 'static + Send> ManagementInterfaceApi
log::debug!("get_relay_locations");
let (tx, rx) = sync::oneshot::channel();
let future = self
- .send_command_to_daemon(ManagementCommand::GetRelayLocations(tx))
+ .send_command_to_daemon(DaemonCommand::GetRelayLocations(tx))
.and_then(|_| rx.map_err(|_| Error::internal_error()));
Box::new(future)
}
fn update_relay_locations(&self, _: Self::Metadata) -> BoxFuture<(), Error> {
log::debug!("update_relay_locations");
- Box::new(self.send_command_to_daemon(ManagementCommand::UpdateRelayLocations))
+ Box::new(self.send_command_to_daemon(DaemonCommand::UpdateRelayLocations))
}
fn set_account(
@@ -497,7 +408,7 @@ impl<T: From<ManagementCommand> + 'static + Send> ManagementInterfaceApi
log::debug!("set_account");
let (tx, rx) = sync::oneshot::channel();
let future = self
- .send_command_to_daemon(ManagementCommand::SetAccount(tx, account_token))
+ .send_command_to_daemon(DaemonCommand::SetAccount(tx, account_token))
.and_then(|_| rx.map_err(|_| Error::internal_error()));
Box::new(future)
}
@@ -510,7 +421,7 @@ impl<T: From<ManagementCommand> + 'static + Send> ManagementInterfaceApi
log::debug!("update_relay_settings");
let (tx, rx) = sync::oneshot::channel();
- let message = ManagementCommand::UpdateRelaySettings(tx, constraints_update);
+ let message = DaemonCommand::UpdateRelaySettings(tx, constraints_update);
let future = self
.send_command_to_daemon(message)
.and_then(|_| rx.map_err(|_| Error::internal_error()));
@@ -521,7 +432,7 @@ impl<T: From<ManagementCommand> + 'static + Send> ManagementInterfaceApi
log::debug!("set_allow_lan({})", allow_lan);
let (tx, rx) = sync::oneshot::channel();
let future = self
- .send_command_to_daemon(ManagementCommand::SetAllowLan(tx, allow_lan))
+ .send_command_to_daemon(DaemonCommand::SetAllowLan(tx, allow_lan))
.and_then(|_| rx.map_err(|_| Error::internal_error()));
Box::new(future)
}
@@ -534,7 +445,7 @@ impl<T: From<ManagementCommand> + 'static + Send> ManagementInterfaceApi
log::debug!("set_block_when_disconnected({})", block_when_disconnected);
let (tx, rx) = sync::oneshot::channel();
let future = self
- .send_command_to_daemon(ManagementCommand::SetBlockWhenDisconnected(
+ .send_command_to_daemon(DaemonCommand::SetBlockWhenDisconnected(
tx,
block_when_disconnected,
))
@@ -546,7 +457,7 @@ impl<T: From<ManagementCommand> + 'static + Send> ManagementInterfaceApi
log::debug!("set_auto_connect({})", auto_connect);
let (tx, rx) = sync::oneshot::channel();
let future = self
- .send_command_to_daemon(ManagementCommand::SetAutoConnect(tx, auto_connect))
+ .send_command_to_daemon(DaemonCommand::SetAutoConnect(tx, auto_connect))
.and_then(|_| rx.map_err(|_| Error::internal_error()));
Box::new(future)
}
@@ -555,7 +466,7 @@ impl<T: From<ManagementCommand> + 'static + Send> ManagementInterfaceApi
log::debug!("connect");
let (tx, rx) = sync::oneshot::channel();
let future = self
- .send_command_to_daemon(ManagementCommand::SetTargetState(tx, TargetState::Secured))
+ .send_command_to_daemon(DaemonCommand::SetTargetState(tx, TargetState::Secured))
.and_then(|_| rx.map_err(|_| Error::internal_error()))
.and_then(|result| match result {
Ok(()) => future::ok(()),
@@ -572,17 +483,14 @@ impl<T: From<ManagementCommand> + 'static + Send> ManagementInterfaceApi
log::debug!("disconnect");
let (tx, _) = sync::oneshot::channel();
let future = self
- .send_command_to_daemon(ManagementCommand::SetTargetState(
- tx,
- TargetState::Unsecured,
- ))
+ .send_command_to_daemon(DaemonCommand::SetTargetState(tx, TargetState::Unsecured))
.then(|_| future::ok(()));
Box::new(future)
}
fn reconnect(&self, _: Self::Metadata) -> BoxFuture<(), Error> {
log::debug!("reconnect");
- let future = self.send_command_to_daemon(ManagementCommand::Reconnect);
+ let future = self.send_command_to_daemon(DaemonCommand::Reconnect);
Box::new(future)
}
@@ -590,7 +498,7 @@ impl<T: From<ManagementCommand> + 'static + Send> ManagementInterfaceApi
log::debug!("get_state");
let (state_tx, state_rx) = sync::oneshot::channel();
let future = self
- .send_command_to_daemon(ManagementCommand::GetState(state_tx))
+ .send_command_to_daemon(DaemonCommand::GetState(state_tx))
.and_then(|_| state_rx.map_err(|_| Error::internal_error()));
Box::new(future)
}
@@ -599,21 +507,21 @@ impl<T: From<ManagementCommand> + 'static + Send> ManagementInterfaceApi
log::debug!("get_current_location");
let (tx, rx) = sync::oneshot::channel();
let future = self
- .send_command_to_daemon(ManagementCommand::GetCurrentLocation(tx))
+ .send_command_to_daemon(DaemonCommand::GetCurrentLocation(tx))
.and_then(|_| rx.map_err(|_| Error::internal_error()));
Box::new(future)
}
fn shutdown(&self, _: Self::Metadata) -> BoxFuture<(), Error> {
log::debug!("shutdown");
- Box::new(self.send_command_to_daemon(ManagementCommand::Shutdown))
+ Box::new(self.send_command_to_daemon(DaemonCommand::Shutdown))
}
fn get_account_history(&self, _: Self::Metadata) -> BoxFuture<Vec<AccountToken>, Error> {
log::debug!("get_account_history");
let (tx, rx) = sync::oneshot::channel();
let future = self
- .send_command_to_daemon(ManagementCommand::GetAccountHistory(tx))
+ .send_command_to_daemon(DaemonCommand::GetAccountHistory(tx))
.and_then(|_| rx.map_err(|_| Error::internal_error()));
Box::new(future)
}
@@ -626,10 +534,7 @@ impl<T: From<ManagementCommand> + 'static + Send> ManagementInterfaceApi
log::debug!("remove_account_from_history");
let (tx, rx) = sync::oneshot::channel();
let future = self
- .send_command_to_daemon(ManagementCommand::RemoveAccountFromHistory(
- tx,
- account_token,
- ))
+ .send_command_to_daemon(DaemonCommand::RemoveAccountFromHistory(tx, account_token))
.and_then(|_| rx.map_err(|_| Error::internal_error()));
Box::new(future)
}
@@ -638,7 +543,7 @@ impl<T: From<ManagementCommand> + 'static + Send> ManagementInterfaceApi
log::debug!("set_openvpn_mssfix({:?})", mssfix);
let (tx, rx) = sync::oneshot::channel();
let future = self
- .send_command_to_daemon(ManagementCommand::SetOpenVpnMssfix(tx, mssfix))
+ .send_command_to_daemon(DaemonCommand::SetOpenVpnMssfix(tx, mssfix))
.and_then(|_| rx.map_err(|_| Error::internal_error()));
Box::new(future)
@@ -652,7 +557,7 @@ impl<T: From<ManagementCommand> + 'static + Send> ManagementInterfaceApi
log::debug!("set_bridge_settings({:?})", bridge_settings);
let (tx, rx) = sync::oneshot::channel();
let future = self
- .send_command_to_daemon(ManagementCommand::SetBridgeSettings(tx, bridge_settings))
+ .send_command_to_daemon(DaemonCommand::SetBridgeSettings(tx, bridge_settings))
.and_then(|_| rx.map_err(|_| Error::internal_error()))
.and_then(|settings_result| {
settings_result.map_err(|error| match error {
@@ -672,7 +577,7 @@ impl<T: From<ManagementCommand> + 'static + Send> ManagementInterfaceApi
log::debug!("set_bridge_state({:?})", bridge_state);
let (tx, rx) = sync::oneshot::channel();
let future = self
- .send_command_to_daemon(ManagementCommand::SetBridgeState(tx, bridge_state))
+ .send_command_to_daemon(DaemonCommand::SetBridgeState(tx, bridge_state))
.and_then(|_| rx.map_err(|_| Error::internal_error()))
.and_then(|settings_result| settings_result.map_err(|_| Error::internal_error()));
@@ -683,7 +588,7 @@ impl<T: From<ManagementCommand> + 'static + Send> ManagementInterfaceApi
log::debug!("set_enable_ipv6({})", enable_ipv6);
let (tx, rx) = sync::oneshot::channel();
let future = self
- .send_command_to_daemon(ManagementCommand::SetEnableIpv6(tx, enable_ipv6))
+ .send_command_to_daemon(DaemonCommand::SetEnableIpv6(tx, enable_ipv6))
.and_then(|_| rx.map_err(|_| Error::internal_error()));
Box::new(future)
@@ -694,7 +599,7 @@ impl<T: From<ManagementCommand> + 'static + Send> ManagementInterfaceApi
log::debug!("set_wireguard_mtu({:?})", mtu);
let (tx, rx) = sync::oneshot::channel();
let future = self
- .send_command_to_daemon(ManagementCommand::SetWireguardMtu(tx, mtu))
+ .send_command_to_daemon(DaemonCommand::SetWireguardMtu(tx, mtu))
.and_then(|_| rx.map_err(|_| Error::internal_error()));
Box::new(future)
}
@@ -708,9 +613,7 @@ impl<T: From<ManagementCommand> + 'static + Send> ManagementInterfaceApi
log::debug!("set_wireguard_rotation_interval({:?})", interval);
let (tx, rx) = sync::oneshot::channel();
let future = self
- .send_command_to_daemon(ManagementCommand::SetWireguardRotationInterval(
- tx, interval,
- ))
+ .send_command_to_daemon(DaemonCommand::SetWireguardRotationInterval(tx, interval))
.and_then(|_| rx.map_err(|_| Error::internal_error()));
Box::new(future)
}
@@ -719,7 +622,7 @@ impl<T: From<ManagementCommand> + 'static + Send> ManagementInterfaceApi
log::debug!("get_settings");
let (tx, rx) = sync::oneshot::channel();
let future = self
- .send_command_to_daemon(ManagementCommand::GetSettings(tx))
+ .send_command_to_daemon(DaemonCommand::GetSettings(tx))
.and_then(|_| rx.map_err(|_| Error::internal_error()));
Box::new(future)
}
@@ -731,7 +634,7 @@ impl<T: From<ManagementCommand> + 'static + Send> ManagementInterfaceApi
log::debug!("generate_wireguard_key");
let (tx, rx) = sync::oneshot::channel();
let future = self
- .send_command_to_daemon(ManagementCommand::GenerateWireguardKey(tx))
+ .send_command_to_daemon(DaemonCommand::GenerateWireguardKey(tx))
.and_then(|_| rx.map_err(|_| Error::internal_error()));
Box::new(future)
}
@@ -743,7 +646,7 @@ impl<T: From<ManagementCommand> + 'static + Send> ManagementInterfaceApi
log::debug!("get_wireguard_key");
let (tx, rx) = sync::oneshot::channel();
let future = self
- .send_command_to_daemon(ManagementCommand::GetWireguardKey(tx))
+ .send_command_to_daemon(DaemonCommand::GetWireguardKey(tx))
.and_then(|_| rx.map_err(|_| Error::internal_error()));
Box::new(future)
}
@@ -752,7 +655,7 @@ impl<T: From<ManagementCommand> + 'static + Send> ManagementInterfaceApi
log::debug!("verify_wireguard_key");
let (tx, rx) = sync::oneshot::channel();
let future = self
- .send_command_to_daemon(ManagementCommand::VerifyWireguardKey(tx))
+ .send_command_to_daemon(DaemonCommand::VerifyWireguardKey(tx))
.and_then(|_| rx.map_err(|_| Error::internal_error()));
Box::new(future)
}
@@ -761,7 +664,7 @@ impl<T: From<ManagementCommand> + 'static + Send> ManagementInterfaceApi
log::debug!("get_current_version");
let (tx, rx) = sync::oneshot::channel();
let future = self
- .send_command_to_daemon(ManagementCommand::GetCurrentVersion(tx))
+ .send_command_to_daemon(DaemonCommand::GetCurrentVersion(tx))
.and_then(|_| rx.map_err(|_| Error::internal_error()));
Box::new(future)
@@ -771,7 +674,7 @@ impl<T: From<ManagementCommand> + 'static + Send> ManagementInterfaceApi
log::debug!("get_version_info");
let (tx, rx) = sync::oneshot::channel();
let future = self
- .send_command_to_daemon(ManagementCommand::GetVersionInfo(tx))
+ .send_command_to_daemon(DaemonCommand::GetVersionInfo(tx))
.and_then(|_| rx.map_err(|_| Error::internal_error()));
Box::new(future)
@@ -783,7 +686,7 @@ impl<T: From<ManagementCommand> + 'static + Send> ManagementInterfaceApi
log::debug!("factory_reset");
let (tx, rx) = sync::oneshot::channel();
let future = self
- .send_command_to_daemon(ManagementCommand::FactoryReset(tx))
+ .send_command_to_daemon(DaemonCommand::FactoryReset(tx))
.and_then(|_| rx.map_err(|_| Error::internal_error()));
Box::new(future)
diff --git a/mullvad-daemon/src/version_check.rs b/mullvad-daemon/src/version_check.rs
index da24b85ea3..65b960a7b1 100644
--- a/mullvad-daemon/src/version_check.rs
+++ b/mullvad-daemon/src/version_check.rs
@@ -1,5 +1,5 @@
-use crate::version::PRODUCT_VERSION;
-use futures::{sync::mpsc::UnboundedSender, Async, Future, Poll};
+use crate::{version::PRODUCT_VERSION, DaemonEventSender};
+use futures::{Async, Future, Poll};
use mullvad_rpc::{AppVersionProxy, HttpHandle};
use mullvad_types::version::AppVersionInfo;
use serde::{Deserialize, Serialize};
@@ -9,6 +9,7 @@ use std::{
path::{Path, PathBuf},
time::{Duration, Instant},
};
+use talpid_core::mpsc::Sender;
use talpid_types::ErrorExt;
use tokio_timer::{TimeoutError, Timer};
@@ -80,10 +81,10 @@ impl<T> From<TimeoutError<T>> for Error {
}
-pub struct VersionUpdater<T: From<AppVersionInfo>> {
+pub(crate) struct VersionUpdater {
version_proxy: AppVersionProxy<HttpHandle>,
cache_path: PathBuf,
- update_sender: UnboundedSender<T>,
+ update_sender: DaemonEventSender<AppVersionInfo>,
last_app_version_info: AppVersionInfo,
next_update_time: Instant,
state: VersionUpdaterState,
@@ -94,11 +95,11 @@ enum VersionUpdaterState {
Updating(Box<dyn Future<Item = AppVersionInfo, Error = Error> + Send + 'static>),
}
-impl<T: From<AppVersionInfo>> VersionUpdater<T> {
+impl VersionUpdater {
pub fn new(
rpc_handle: HttpHandle,
cache_dir: PathBuf,
- update_sender: UnboundedSender<T>,
+ update_sender: DaemonEventSender<AppVersionInfo>,
last_app_version_info: AppVersionInfo,
) -> Self {
let version_proxy = AppVersionProxy::new(rpc_handle);
@@ -140,7 +141,7 @@ impl<T: From<AppVersionInfo>> VersionUpdater<T> {
}
}
-impl<T: From<AppVersionInfo>> Future for VersionUpdater<T> {
+impl Future for VersionUpdater {
type Item = ();
type Error = ();
@@ -176,11 +177,7 @@ impl<T: From<AppVersionInfo>> Future for VersionUpdater<T> {
log::debug!("Got new version check: {:?}", app_version_info);
self.next_update_time = Instant::now() + UPDATE_INTERVAL;
if app_version_info != self.last_app_version_info {
- if self
- .update_sender
- .unbounded_send(app_version_info.clone().into())
- .is_err()
- {
+ if self.update_sender.send(app_version_info.clone()).is_err() {
log::warn!(
"Version update receiver is closed, stopping version updater"
);
diff --git a/mullvad-daemon/src/wireguard.rs b/mullvad-daemon/src/wireguard.rs
index 09c5f01461..9b234dfaab 100644
--- a/mullvad-daemon/src/wireguard.rs
+++ b/mullvad-daemon/src/wireguard.rs
@@ -1,15 +1,11 @@
-use crate::{account_history::AccountHistory, InternalDaemonEvent};
+use crate::{account_history::AccountHistory, DaemonEventSender, InternalDaemonEvent};
use chrono::offset::Utc;
-use futures::{
- future::Executor,
- stream::Stream,
- sync::{mpsc::UnboundedSender, oneshot},
- Async, Future, Poll,
-};
+use futures::{future::Executor, stream::Stream, sync::oneshot, Async, Future, Poll};
use jsonrpc_client_core::Error as JsonRpcError;
use mullvad_types::account::AccountToken;
pub use mullvad_types::wireguard::*;
use std::time::Duration;
+use talpid_core::mpsc::Sender;
pub use talpid_types::net::wireguard::{
ConnectionConfig, PrivateKey, TunnelConfig, TunnelParameters,
};
@@ -46,7 +42,7 @@ pub enum Error {
pub type Result<T> = std::result::Result<T, Error>;
pub struct KeyManager {
- daemon_tx: UnboundedSender<InternalDaemonEvent>,
+ daemon_tx: DaemonEventSender,
http_handle: mullvad_rpc::HttpHandle,
tokio_remote: Remote,
current_job: Option<CancelHandle>,
@@ -57,7 +53,7 @@ pub struct KeyManager {
impl KeyManager {
pub(crate) fn new(
- daemon_tx: UnboundedSender<InternalDaemonEvent>,
+ daemon_tx: DaemonEventSender,
http_handle: mullvad_rpc::HttpHandle,
tokio_remote: Remote,
) -> Self {
@@ -199,14 +195,13 @@ impl KeyManager {
let fut = fut.then(move |result| {
match result {
Ok(wireguard_data) => {
- let _ = daemon_tx.unbounded_send(InternalDaemonEvent::WgKeyEvent((
+ let _ = daemon_tx.send(InternalDaemonEvent::WgKeyEvent((
account,
Ok(wireguard_data),
)));
}
Err(CancelErr::Inner(e)) => {
- let _ = daemon_tx
- .unbounded_send(InternalDaemonEvent::WgKeyEvent((account, Err(e))));
+ let _ = daemon_tx.send(InternalDaemonEvent::WgKeyEvent((account, Err(e))));
}
Err(CancelErr::Cancelled) => {
log::error!("Key generation cancelled");
@@ -294,7 +289,7 @@ impl KeyManager {
}
fn next_automatic_rotation(
- daemon_tx: UnboundedSender<InternalDaemonEvent>,
+ daemon_tx: DaemonEventSender,
http_handle: mullvad_rpc::HttpHandle,
public_key: PublicKey,
rotation_interval_secs: u64,
@@ -315,14 +310,14 @@ impl KeyManager {
match rpc_result {
Ok(data) => {
// Update account data
- let _ = daemon_tx.unbounded_send(InternalDaemonEvent::WgKeyEvent((
+ let _ = daemon_tx.send(InternalDaemonEvent::WgKeyEvent((
account_token_copy,
Ok(data.clone()),
)));
Ok(data.get_public_key())
}
Err(Error::TooManyKeys) => {
- let _ = daemon_tx.unbounded_send(InternalDaemonEvent::WgKeyEvent((
+ let _ = daemon_tx.send(InternalDaemonEvent::WgKeyEvent((
account_token_copy,
Err(Error::TooManyKeys),
)));
@@ -334,7 +329,7 @@ impl KeyManager {
}
fn create_automatic_rotation(
- daemon_tx: UnboundedSender<InternalDaemonEvent>,
+ daemon_tx: DaemonEventSender,
http_handle: mullvad_rpc::HttpHandle,
public_key: PublicKey,
rotation_interval_secs: u64,
diff --git a/mullvad-jni/src/daemon_interface.rs b/mullvad-jni/src/daemon_interface.rs
index ceda7b5fd6..4be2a35c94 100644
--- a/mullvad-jni/src/daemon_interface.rs
+++ b/mullvad-jni/src/daemon_interface.rs
@@ -1,5 +1,5 @@
use futures::{sync::oneshot, Future};
-use mullvad_daemon::{DaemonCommandSender, ManagementCommand};
+use mullvad_daemon::{DaemonCommand, DaemonCommandSender};
use mullvad_types::{
account::AccountData,
location::GeoIpLocation,
@@ -40,7 +40,7 @@ impl DaemonInterface {
pub fn connect(&self) -> Result<()> {
let (tx, rx) = oneshot::channel();
- self.send_command(ManagementCommand::SetTargetState(tx, TargetState::Secured))?;
+ self.send_command(DaemonCommand::SetTargetState(tx, TargetState::Secured))?;
rx.wait().map_err(|_| Error::NoResponse)?.unwrap();
@@ -50,10 +50,7 @@ impl DaemonInterface {
pub fn disconnect(&self) -> Result<()> {
let (tx, rx) = oneshot::channel();
- self.send_command(ManagementCommand::SetTargetState(
- tx,
- TargetState::Unsecured,
- ))?;
+ self.send_command(DaemonCommand::SetTargetState(tx, TargetState::Unsecured))?;
rx.wait().map_err(|_| Error::NoResponse)?.unwrap();
@@ -63,7 +60,7 @@ impl DaemonInterface {
pub fn generate_wireguard_key(&self) -> Result<KeygenEvent> {
let (tx, rx) = oneshot::channel();
- self.send_command(ManagementCommand::GenerateWireguardKey(tx))?;
+ self.send_command(DaemonCommand::GenerateWireguardKey(tx))?;
rx.wait().map_err(|_| Error::NoResponse)
}
@@ -71,7 +68,7 @@ impl DaemonInterface {
pub fn get_account_data(&self, account_token: String) -> Result<AccountData> {
let (tx, rx) = oneshot::channel();
- self.send_command(ManagementCommand::GetAccountData(tx, account_token))?;
+ self.send_command(DaemonCommand::GetAccountData(tx, account_token))?;
rx.wait()
.map_err(|_| Error::NoResponse)?
@@ -82,7 +79,7 @@ impl DaemonInterface {
pub fn get_account_history(&self) -> Result<Vec<String>> {
let (tx, rx) = oneshot::channel();
- self.send_command(ManagementCommand::GetAccountHistory(tx))?;
+ self.send_command(DaemonCommand::GetAccountHistory(tx))?;
rx.wait().map_err(|_| Error::NoResponse)
}
@@ -90,7 +87,7 @@ impl DaemonInterface {
pub fn get_www_auth_token(&self) -> Result<String> {
let (tx, rx) = oneshot::channel();
- self.send_command(ManagementCommand::GetWwwAuthToken(tx))?;
+ self.send_command(DaemonCommand::GetWwwAuthToken(tx))?;
rx.wait()
.map_err(|_| Error::NoResponse)?
@@ -101,7 +98,7 @@ impl DaemonInterface {
pub fn get_current_location(&self) -> Result<Option<GeoIpLocation>> {
let (tx, rx) = oneshot::channel();
- self.send_command(ManagementCommand::GetCurrentLocation(tx))?;
+ self.send_command(DaemonCommand::GetCurrentLocation(tx))?;
Ok(rx.wait().map_err(|_| Error::NoResponse)?)
}
@@ -109,7 +106,7 @@ impl DaemonInterface {
pub fn get_current_version(&self) -> Result<String> {
let (tx, rx) = oneshot::channel();
- self.send_command(ManagementCommand::GetCurrentVersion(tx))?;
+ self.send_command(DaemonCommand::GetCurrentVersion(tx))?;
Ok(rx.wait().map_err(|_| Error::NoResponse)?)
}
@@ -117,7 +114,7 @@ impl DaemonInterface {
pub fn get_relay_locations(&self) -> Result<RelayList> {
let (tx, rx) = oneshot::channel();
- self.send_command(ManagementCommand::GetRelayLocations(tx))?;
+ self.send_command(DaemonCommand::GetRelayLocations(tx))?;
Ok(rx.wait().map_err(|_| Error::NoResponse)?)
}
@@ -125,7 +122,7 @@ impl DaemonInterface {
pub fn get_settings(&self) -> Result<Settings> {
let (tx, rx) = oneshot::channel();
- self.send_command(ManagementCommand::GetSettings(tx))?;
+ self.send_command(DaemonCommand::GetSettings(tx))?;
Ok(rx.wait().map_err(|_| Error::NoResponse)?)
}
@@ -133,7 +130,7 @@ impl DaemonInterface {
pub fn get_state(&self) -> Result<TunnelState> {
let (tx, rx) = oneshot::channel();
- self.send_command(ManagementCommand::GetState(tx))?;
+ self.send_command(DaemonCommand::GetState(tx))?;
Ok(rx.wait().map_err(|_| Error::NoResponse)?)
}
@@ -141,13 +138,13 @@ impl DaemonInterface {
pub fn get_version_info(&self) -> Result<AppVersionInfo> {
let (tx, rx) = oneshot::channel();
- self.send_command(ManagementCommand::GetVersionInfo(tx))?;
+ self.send_command(DaemonCommand::GetVersionInfo(tx))?;
rx.wait().map_err(|_| Error::NoResponse)
}
pub fn reconnect(&self) -> Result<()> {
- self.send_command(ManagementCommand::Reconnect)?;
+ self.send_command(DaemonCommand::Reconnect)?;
Ok(())
}
@@ -155,7 +152,7 @@ impl DaemonInterface {
pub fn get_wireguard_key(&self) -> Result<Option<wireguard::PublicKey>> {
let (tx, rx) = oneshot::channel();
- self.send_command(ManagementCommand::GetWireguardKey(tx))?;
+ self.send_command(DaemonCommand::GetWireguardKey(tx))?;
rx.wait().map_err(|_| Error::NoResponse)
}
@@ -163,14 +160,14 @@ impl DaemonInterface {
pub fn verify_wireguard_key(&self) -> Result<bool> {
let (tx, rx) = oneshot::channel();
- self.send_command(ManagementCommand::VerifyWireguardKey(tx))?;
+ self.send_command(DaemonCommand::VerifyWireguardKey(tx))?;
rx.wait().map_err(|_| Error::NoResponse)
}
pub fn set_account(&self, account_token: Option<String>) -> Result<()> {
let (tx, rx) = oneshot::channel();
- self.send_command(ManagementCommand::SetAccount(tx, account_token))?;
+ self.send_command(DaemonCommand::SetAccount(tx, account_token))?;
rx.wait().map_err(|_| Error::NoResponse)
}
@@ -178,7 +175,7 @@ impl DaemonInterface {
pub fn set_allow_lan(&self, allow_lan: bool) -> Result<()> {
let (tx, rx) = oneshot::channel();
- self.send_command(ManagementCommand::SetAllowLan(tx, allow_lan))?;
+ self.send_command(DaemonCommand::SetAllowLan(tx, allow_lan))?;
rx.wait().map_err(|_| Error::NoResponse)
}
@@ -186,24 +183,24 @@ impl DaemonInterface {
pub fn set_auto_connect(&self, auto_connect: bool) -> Result<()> {
let (tx, rx) = oneshot::channel();
- self.send_command(ManagementCommand::SetAutoConnect(tx, auto_connect))?;
+ self.send_command(DaemonCommand::SetAutoConnect(tx, auto_connect))?;
rx.wait().map_err(|_| Error::NoResponse)
}
pub fn shutdown(&self) -> Result<()> {
- self.send_command(ManagementCommand::Shutdown)
+ self.send_command(DaemonCommand::Shutdown)
}
pub fn update_relay_settings(&self, update: RelaySettingsUpdate) -> Result<()> {
let (tx, rx) = oneshot::channel();
- self.send_command(ManagementCommand::UpdateRelaySettings(tx, update))?;
+ self.send_command(DaemonCommand::UpdateRelaySettings(tx, update))?;
rx.wait().map_err(|_| Error::NoResponse)
}
- fn send_command(&self, command: ManagementCommand) -> Result<()> {
+ fn send_command(&self, command: DaemonCommand) -> Result<()> {
self.command_sender.send(command).map_err(Error::NoDaemon)
}
}
diff --git a/mullvad-jni/src/lib.rs b/mullvad-jni/src/lib.rs
index 3be27632b4..b9ff6a40fb 100644
--- a/mullvad-jni/src/lib.rs
+++ b/mullvad-jni/src/lib.rs
@@ -17,7 +17,7 @@ use jnix::{
FromJava, IntoJava, JnixEnv,
};
use lazy_static::lazy_static;
-use mullvad_daemon::{logging, version, Daemon, DaemonCommandSender};
+use mullvad_daemon::{logging, version, Daemon, DaemonCommandChannel};
use mullvad_types::account::AccountData;
use std::{
path::{Path, PathBuf},
@@ -131,8 +131,10 @@ fn initialize(
log_dir: PathBuf,
) -> Result<(), Error> {
let android_context = create_android_context(env, *vpn_service)?;
- let daemon_command_sender = spawn_daemon(env, this, log_dir, android_context)?;
- let daemon_interface = Box::new(DaemonInterface::new(daemon_command_sender));
+ let daemon_command_channel = DaemonCommandChannel::new();
+ let daemon_interface = Box::new(DaemonInterface::new(daemon_command_channel.sender()));
+
+ spawn_daemon(env, this, log_dir, daemon_command_channel, android_context)?;
set_daemon_interface_address(env, this, Box::into_raw(daemon_interface) as jlong);
@@ -155,8 +157,9 @@ fn spawn_daemon(
env: &JnixEnv<'_>,
this: &JObject<'_>,
log_dir: PathBuf,
+ command_channel: DaemonCommandChannel,
android_context: AndroidContext,
-) -> Result<DaemonCommandSender, Error> {
+) -> Result<(), Error> {
let listener = JniEventListener::spawn(env, this).map_err(Error::SpawnJniEventListener)?;
let daemon_object = env
.new_global_ref(*this)
@@ -166,9 +169,9 @@ fn spawn_daemon(
thread::spawn(move || {
let jvm = android_context.jvm.clone();
- match create_daemon(listener, log_dir, android_context) {
+ match create_daemon(listener, log_dir, command_channel, android_context) {
Ok(daemon) => {
- let _ = tx.send(Ok(daemon.command_sender()));
+ let _ = tx.send(Ok(()));
match daemon.run() {
Ok(()) => log::info!("Mullvad daemon has stopped"),
Err(error) => log::error!("{}", error.display_chain()),
@@ -188,21 +191,21 @@ fn spawn_daemon(
fn create_daemon(
listener: JniEventListener,
log_dir: PathBuf,
+ command_channel: DaemonCommandChannel,
android_context: AndroidContext,
) -> Result<Daemon<JniEventListener>, Error> {
let resource_dir = mullvad_paths::get_resource_dir();
let cache_dir = mullvad_paths::cache_dir().map_err(Error::GetCacheDir)?;
- let daemon = Daemon::start_with_event_listener(
- listener,
+ Daemon::start(
Some(log_dir),
resource_dir,
cache_dir,
+ listener,
+ command_channel,
android_context,
)
- .map_err(Error::InitializeDaemon)?;
-
- Ok(daemon)
+ .map_err(Error::InitializeDaemon)
}
fn notify_daemon_stopped(jvm: Arc<JavaVM>, daemon_object: GlobalRef) {
diff --git a/talpid-core/src/mpsc.rs b/talpid-core/src/mpsc.rs
index 21807ca377..050b90c81c 100644
--- a/talpid-core/src/mpsc.rs
+++ b/talpid-core/src/mpsc.rs
@@ -1,84 +1,5 @@
-use futures::sync::mpsc::{SendError, UnboundedSender};
-use std::marker::PhantomData;
-
-/// Abstraction over an `mpsc::Sender` that first converts the value to another type before sending.
-#[derive(Debug, Clone)]
-pub struct IntoSender<T, U> {
- sender: UnboundedSender<U>,
- _marker: PhantomData<T>,
-}
-
-impl<T, U> IntoSender<T, U>
-where
- T: Into<U>,
-{
- /// Converts the `T` into a `U` and sends it on the channel.
- pub fn send(&self, t: T) -> Result<(), SendError<U>> {
- self.sender.unbounded_send(t.into())
- }
-}
-
-impl<T, U> From<UnboundedSender<U>> for IntoSender<T, U>
-where
- T: Into<U>,
-{
- fn from(sender: UnboundedSender<U>) -> Self {
- IntoSender {
- sender,
- _marker: PhantomData,
- }
- }
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
- use futures::{sync::mpsc, Stream};
- use std::thread;
-
- #[derive(Debug, Eq, PartialEq)]
- enum Inner {
- One,
- Two,
- }
-
- #[derive(Debug, Eq, PartialEq)]
- enum Outer {
- Inner(Inner),
- Other,
- }
-
- impl From<Inner> for Outer {
- fn from(o: Inner) -> Self {
- Outer::Inner(o)
- }
- }
-
- #[test]
- fn sender() {
- let (tx, rx) = mpsc::unbounded();
- let inner_tx: IntoSender<Inner, Outer> = tx.clone().into();
-
- tx.unbounded_send(Outer::Other).unwrap();
- inner_tx.send(Inner::Two).unwrap();
-
- let mut sync_rx = rx.wait();
-
- assert_eq!(Outer::Other, sync_rx.next().unwrap().unwrap());
- assert_eq!(Outer::Inner(Inner::Two), sync_rx.next().unwrap().unwrap());
- }
-
- #[test]
- fn send_between_thread() {
- let (tx, rx) = mpsc::unbounded();
- let inner_tx: IntoSender<Inner, Outer> = tx.clone().into();
-
- thread::spawn(move || {
- inner_tx.send(Inner::One).unwrap();
- });
-
- let mut sync_rx = rx.wait();
-
- assert_eq!(Outer::Inner(Inner::One), sync_rx.next().unwrap().unwrap());
- }
+/// Abstraction over any type that can be used similarly to an `std::mpsc::Sender`.
+pub trait Sender<T> {
+ /// Sends an item over the underlying channel, failing only if the channel is closed.
+ fn send(&self, item: T) -> Result<(), ()>;
}
diff --git a/talpid-core/src/tunnel_state_machine/mod.rs b/talpid-core/src/tunnel_state_machine/mod.rs
index 3fb2e4e757..773a14e8da 100644
--- a/talpid-core/src/tunnel_state_machine/mod.rs
+++ b/talpid-core/src/tunnel_state_machine/mod.rs
@@ -17,7 +17,7 @@ use self::{
use crate::{
dns::DnsMonitor,
firewall::{Firewall, FirewallArguments},
- mpsc::IntoSender,
+ mpsc::Sender,
offline,
tunnel::tun_provider::TunProvider,
};
@@ -62,20 +62,16 @@ pub enum Error {
}
/// Spawn the tunnel state machine thread, returning a channel for sending tunnel commands.
-pub fn spawn<P, T>(
+pub fn spawn(
allow_lan: bool,
block_when_disconnected: bool,
tunnel_parameters_generator: impl TunnelParametersGenerator,
log_dir: Option<PathBuf>,
resource_dir: PathBuf,
- cache_dir: P,
- state_change_listener: IntoSender<TunnelStateTransition, T>,
+ cache_dir: impl AsRef<Path> + Send + 'static,
+ state_change_listener: impl Sender<TunnelStateTransition> + Send + 'static,
#[cfg(target_os = "android")] android_context: AndroidContext,
-) -> Result<Arc<mpsc::UnboundedSender<TunnelCommand>>, Error>
-where
- P: AsRef<Path> + Send + 'static,
- T: From<TunnelStateTransition> + Send + 'static,
-{
+) -> Result<Arc<mpsc::UnboundedSender<TunnelCommand>>, Error> {
let (command_tx, command_rx) = mpsc::unbounded();
let command_tx = Arc::new(command_tx);
let offline_monitor = offline::spawn_monitor(
@@ -134,7 +130,7 @@ where
Ok(command_tx)
}
-fn create_event_loop<T>(
+fn create_event_loop(
allow_lan: bool,
block_when_disconnected: bool,
is_offline: bool,
@@ -144,11 +140,8 @@ fn create_event_loop<T>(
resource_dir: PathBuf,
cache_dir: impl AsRef<Path>,
commands: mpsc::UnboundedReceiver<TunnelCommand>,
- state_change_listener: IntoSender<TunnelStateTransition, T>,
-) -> Result<(Core, impl Future<Item = (), Error = Error>), Error>
-where
- T: From<TunnelStateTransition> + Send + 'static,
-{
+ state_change_listener: impl Sender<TunnelStateTransition>,
+) -> Result<(Core, impl Future<Item = (), Error = Error>), Error> {
let reactor = Core::new().map_err(Error::ReactorError)?;
let state_machine = TunnelStateMachine::new(
allow_lan,