diff options
| author | Janito Vaqueiro Ferreira Filho <janito@mullvad.net> | 2020-02-24 09:48:13 -0300 |
|---|---|---|
| committer | Janito Vaqueiro Ferreira Filho <janito@mullvad.net> | 2020-02-24 09:48:13 -0300 |
| commit | 807de13a4577db8d97b8718f43fe9aef6be84b6d (patch) | |
| tree | ffb55ef18e6355f9aa6eb5e5a2fee48203ecc6b3 | |
| parent | ec5549a908c580aff9db9218800b0f73fcbe2b6c (diff) | |
| parent | 57e13fa5107d6a6392c09ba0bedeecf786206b9c (diff) | |
| download | mullvadvpn-807de13a4577db8d97b8718f43fe9aef6be84b6d.tar.xz mullvadvpn-807de13a4577db8d97b8718f43fe9aef6be84b6d.zip | |
Merge branch 'uncouple-management-interface'
| -rw-r--r-- | mullvad-daemon/src/lib.rs | 427 | ||||
| -rw-r--r-- | mullvad-daemon/src/main.rs | 45 | ||||
| -rw-r--r-- | mullvad-daemon/src/management_interface.rs | 185 | ||||
| -rw-r--r-- | mullvad-daemon/src/version_check.rs | 21 | ||||
| -rw-r--r-- | mullvad-daemon/src/wireguard.rs | 27 | ||||
| -rw-r--r-- | mullvad-jni/src/daemon_interface.rs | 47 | ||||
| -rw-r--r-- | mullvad-jni/src/lib.rs | 25 | ||||
| -rw-r--r-- | talpid-core/src/mpsc.rs | 87 | ||||
| -rw-r--r-- | talpid-core/src/tunnel_state_machine/mod.rs | 23 |
9 files changed, 395 insertions, 492 deletions
diff --git a/mullvad-daemon/src/lib.rs b/mullvad-daemon/src/lib.rs index f5e76d9b99..ffce1b4187 100644 --- a/mullvad-daemon/src/lib.rs +++ b/mullvad-daemon/src/lib.rs @@ -7,17 +7,15 @@ extern crate serde; mod account_history; mod geoip; pub mod logging; -mod management_interface; +#[cfg(not(target_os = "android"))] +pub mod management_interface; mod relays; -mod rpc_uniqueness_check; +#[cfg(not(target_os = "android"))] +pub mod rpc_uniqueness_check; mod settings; pub mod version; mod version_check; -pub use crate::management_interface::ManagementCommand; -use crate::management_interface::{ - BoxFuture, ManagementInterfaceEventBroadcaster, ManagementInterfaceServer, -}; use futures::{ future::{self, Executor}, stream::Wait, @@ -46,14 +44,16 @@ use settings::Settings; #[cfg(not(target_os = "android"))] use std::path::Path; use std::{ - io, mem, + io, + marker::PhantomData, + mem, path::PathBuf, - sync::{mpsc, Arc}, + sync::{mpsc, Arc, Weak}, thread, time::Duration, }; use talpid_core::{ - mpsc::IntoSender, + mpsc::Sender, tunnel_state_machine::{self, TunnelCommand, TunnelParametersGenerator}, }; #[cfg(target_os = "android")] @@ -67,14 +67,14 @@ use talpid_types::{ #[path = "wireguard.rs"] mod wireguard; -pub type Result<T> = std::result::Result<T, Error>; +/// FIXME(linus): This is here just because the futures crate has deprecated it and jsonrpc_core +/// did not introduce their own yet (https://github.com/paritytech/jsonrpc/pull/196). +/// Remove this and use the one in jsonrpc_core when that is released. +type BoxFuture<T, E> = Box<dyn Future<Item = T, Error = E> + Send>; #[derive(err_derive::Error, Debug)] #[error(no_from)] pub enum Error { - #[error(display = "Another instance of the daemon is already running")] - DaemonIsAlreadyRunning, - #[error(display = "Failed to send command to daemon because it is not running")] DaemonUnavailable, @@ -90,13 +90,6 @@ pub enum Error { #[error(display = "Unable to load account history with wireguard key cache")] LoadAccountHistory(#[error(source)] account_history::Error), - /// Error in the management interface - #[error(display = "Unable to start management interface server")] - StartManagementInterface(#[error(source)] talpid_ipc::Error), - - #[error(display = "Management interface server exited unexpectedly")] - ManagementInterfaceExited, - #[error(display = "No wireguard private key available")] NoKeyAvailable, @@ -131,32 +124,109 @@ pub enum Error { ReadDirError(#[error(source)] io::Error), } +/// Enum representing commands that can be sent to the daemon. +pub enum DaemonCommand { + /// Set target state. Does nothing if the daemon already has the state that is being set. + SetTargetState(oneshot::Sender<std::result::Result<(), ()>>, TargetState), + /// Reconnect the tunnel, if one is connecting/connected. + Reconnect, + /// Request the current state. + GetState(oneshot::Sender<TunnelState>), + /// Get the current geographical location. + GetCurrentLocation(oneshot::Sender<Option<GeoIpLocation>>), + CreateNewAccount(oneshot::Sender<std::result::Result<String, mullvad_rpc::Error>>), + /// Request the metadata for an account. + GetAccountData( + oneshot::Sender<BoxFuture<AccountData, mullvad_rpc::Error>>, + AccountToken, + ), + /// Request www auth token for an account + GetWwwAuthToken(oneshot::Sender<BoxFuture<String, mullvad_rpc::Error>>), + /// Submit voucher to add time to the current account. Returns time added in seconds + SubmitVoucher( + oneshot::Sender<BoxFuture<VoucherSubmission, mullvad_rpc::Error>>, + String, + ), + /// Request account history + GetAccountHistory(oneshot::Sender<Vec<AccountToken>>), + /// Request account history + RemoveAccountFromHistory(oneshot::Sender<()>, AccountToken), + /// Get the list of countries and cities where there are relays. + GetRelayLocations(oneshot::Sender<RelayList>), + /// Trigger an asynchronous relay list update. This returns before the relay list is actually + /// updated. + UpdateRelayLocations, + /// Set which account token to use for subsequent connection attempts. + SetAccount(oneshot::Sender<()>, Option<AccountToken>), + /// Place constraints on the type of tunnel and relay + UpdateRelaySettings(oneshot::Sender<()>, RelaySettingsUpdate), + /// Set the allow LAN setting. + SetAllowLan(oneshot::Sender<()>, bool), + /// Set the block_when_disconnected setting. + SetBlockWhenDisconnected(oneshot::Sender<()>, bool), + /// Set the auto-connect setting. + SetAutoConnect(oneshot::Sender<()>, bool), + /// Set the mssfix argument for OpenVPN + SetOpenVpnMssfix(oneshot::Sender<()>, Option<u16>), + /// Set proxy details for OpenVPN + SetBridgeSettings( + oneshot::Sender<std::result::Result<(), settings::Error>>, + BridgeSettings, + ), + /// Set proxy state + SetBridgeState( + oneshot::Sender<std::result::Result<(), settings::Error>>, + BridgeState, + ), + /// Set if IPv6 should be enabled in the tunnel + SetEnableIpv6(oneshot::Sender<()>, bool), + /// Set MTU for wireguard tunnels + SetWireguardMtu(oneshot::Sender<()>, Option<u16>), + /// Set automatic key rotation interval for wireguard tunnels + SetWireguardRotationInterval(oneshot::Sender<()>, Option<u32>), + /// Get the daemon settings + GetSettings(oneshot::Sender<Settings>), + /// Generate new wireguard key + GenerateWireguardKey(oneshot::Sender<wireguard::KeygenEvent>), + /// Return a public key of the currently set wireguard private key, if there is one + GetWireguardKey(oneshot::Sender<Option<wireguard::PublicKey>>), + /// Verify if the currently set wireguard key is valid. + VerifyWireguardKey(oneshot::Sender<bool>), + /// Get information about the currently running and latest app versions + GetVersionInfo(oneshot::Sender<AppVersionInfo>), + /// Get current version of the app + GetCurrentVersion(oneshot::Sender<AppVersion>), + /// Remove settings and clear the cache + #[cfg(not(target_os = "android"))] + FactoryReset(oneshot::Sender<()>), + /// Makes the daemon exit the main loop and quit. + Shutdown, +} + /// All events that can happen in the daemon. Sent from various threads and exposed interfaces. pub(crate) enum InternalDaemonEvent { /// Tunnel has changed state. TunnelStateTransition(TunnelStateTransition), /// Request from the `MullvadTunnelParametersGenerator` to obtain a new relay. GenerateTunnelParameters( - mpsc::Sender<std::result::Result<TunnelParameters, ParameterGenerationError>>, + mpsc::Sender<Result<TunnelParameters, ParameterGenerationError>>, u32, ), - /// An event coming from the JSONRPC-2.0 management interface. - ManagementInterfaceEvent(ManagementCommand), - /// Triggered if the server hosting the JSONRPC-2.0 management interface dies unexpectedly. - ManagementInterfaceExited, + /// A command sent to the daemon. + Command(DaemonCommand), /// Daemon shutdown triggered by a signal, ctrl-c or similar. TriggerShutdown, /// Wireguard key generation event WgKeyEvent( ( AccountToken, - std::result::Result<mullvad_types::wireguard::WireguardData, wireguard::Error>, + Result<mullvad_types::wireguard::WireguardData, wireguard::Error>, ), ), /// New Account created NewAccountEvent( AccountToken, - oneshot::Sender<std::result::Result<String, mullvad_rpc::Error>>, + oneshot::Sender<Result<String, mullvad_rpc::Error>>, ), /// The background job fetching new `AppVersionInfo`s got a new info object. NewAppVersionInfo(AppVersionInfo), @@ -168,9 +238,9 @@ impl From<TunnelStateTransition> for InternalDaemonEvent { } } -impl From<ManagementCommand> for InternalDaemonEvent { - fn from(command: ManagementCommand) -> Self { - InternalDaemonEvent::ManagementInterfaceEvent(command) +impl From<DaemonCommand> for InternalDaemonEvent { + fn from(command: DaemonCommand) -> Self { + InternalDaemonEvent::Command(command) } } @@ -223,15 +293,101 @@ impl DaemonExecutionState { } } -pub struct DaemonCommandSender(IntoSender<ManagementCommand, InternalDaemonEvent>); +pub struct DaemonCommandChannel { + sender: DaemonCommandSender, + receiver: UnboundedReceiver<InternalDaemonEvent>, +} + +impl DaemonCommandChannel { + pub fn new() -> Self { + let (untracked_sender, receiver) = futures::sync::mpsc::unbounded(); + let sender = DaemonCommandSender(Arc::new(untracked_sender)); + + Self { sender, receiver } + } + + pub fn sender(&self) -> DaemonCommandSender { + self.sender.clone() + } + + fn destructure(self) -> (DaemonEventSender, UnboundedReceiver<InternalDaemonEvent>) { + let event_sender = DaemonEventSender::new(Arc::downgrade(&self.sender.0)); + + (event_sender, self.receiver) + } +} + +#[derive(Clone)] +pub struct DaemonCommandSender(Arc<UnboundedSender<InternalDaemonEvent>>); impl DaemonCommandSender { - pub(crate) fn new(internal_event_sender: UnboundedSender<InternalDaemonEvent>) -> Self { - DaemonCommandSender(IntoSender::from(internal_event_sender)) + pub fn send(&self, command: DaemonCommand) -> Result<(), Error> { + self.0 + .unbounded_send(InternalDaemonEvent::Command(command)) + .map_err(|_| Error::DaemonUnavailable) + } +} + +pub(crate) struct DaemonEventSender<E = InternalDaemonEvent> { + sender: Weak<UnboundedSender<InternalDaemonEvent>>, + _event: PhantomData<E>, +} + +impl<E> Clone for DaemonEventSender<E> +where + InternalDaemonEvent: From<E>, +{ + fn clone(&self) -> Self { + DaemonEventSender { + sender: self.sender.clone(), + _event: PhantomData, + } + } +} + +impl DaemonEventSender { + pub fn new(sender: Weak<UnboundedSender<InternalDaemonEvent>>) -> Self { + DaemonEventSender { + sender, + _event: PhantomData, + } + } + + pub fn to_specialized_sender<E>(&self) -> DaemonEventSender<E> + where + InternalDaemonEvent: From<E>, + { + DaemonEventSender { + sender: self.sender.clone(), + _event: PhantomData, + } + } +} + +impl<E> DaemonEventSender<E> +where + InternalDaemonEvent: From<E>, +{ + pub fn is_closed(&self) -> bool { + self.sender + .upgrade() + .map(|sender| sender.is_closed()) + .unwrap_or(true) } +} - pub fn send(&self, command: ManagementCommand) -> Result<()> { - self.0.send(command).map_err(|_| Error::DaemonUnavailable) +impl<E> Sender<E> for DaemonEventSender<E> +where + InternalDaemonEvent: From<E>, +{ + fn send(&self, event: E) -> Result<(), ()> { + if let Some(sender) = self.sender.upgrade() { + sender + .unbounded_send(InternalDaemonEvent::from(event)) + .map_err(|_| ()) + } else { + Err(()) + } } } @@ -254,13 +410,13 @@ pub trait EventListener { fn notify_key_event(&self, key_event: KeygenEvent); } -pub struct Daemon<L: EventListener = ManagementInterfaceEventBroadcaster> { +pub struct Daemon<L: EventListener> { tunnel_command_tx: Arc<UnboundedSender<TunnelCommand>>, tunnel_state: TunnelState, target_state: TargetState, state: DaemonExecutionState, rx: Wait<UnboundedReceiver<InternalDaemonEvent>>, - tx: UnboundedSender<InternalDaemonEvent>, + tx: DaemonEventSender, reconnection_loop_tx: Option<mpsc::Sender<()>>, event_listener: L, settings: Settings, @@ -277,100 +433,18 @@ pub struct Daemon<L: EventListener = ManagementInterfaceEventBroadcaster> { shutdown_callbacks: Vec<Box<dyn FnOnce()>>, } -impl Daemon<ManagementInterfaceEventBroadcaster> { - pub fn start( - log_dir: Option<PathBuf>, - resource_dir: PathBuf, - cache_dir: PathBuf, - // TODO: Remove this once `ManagementInterface` is less coupled to the constructor. - #[cfg(target_os = "android")] android_context: AndroidContext, - ) -> Result<Self> { - if rpc_uniqueness_check::is_another_instance_running() { - return Err(Error::DaemonIsAlreadyRunning); - } - let (tx, rx) = futures::sync::mpsc::unbounded(); - let management_interface_broadcaster = Self::start_management_interface(tx.clone())?; - - Self::start_internal( - tx, - rx, - management_interface_broadcaster, - log_dir, - resource_dir, - cache_dir, - #[cfg(target_os = "android")] - android_context, - ) - } - - // Starts the management interface and spawns a thread that will process it. - // Returns a handle that allows notifying all subscribers on events. - fn start_management_interface( - event_tx: UnboundedSender<InternalDaemonEvent>, - ) -> Result<ManagementInterfaceEventBroadcaster> { - let multiplex_event_tx = IntoSender::from(event_tx.clone()); - let server = Self::start_management_interface_server(multiplex_event_tx)?; - let event_broadcaster = server.event_broadcaster(); - Self::spawn_management_interface_wait_thread(server, event_tx); - Ok(event_broadcaster) - } - - fn start_management_interface_server( - event_tx: IntoSender<ManagementCommand, InternalDaemonEvent>, - ) -> Result<ManagementInterfaceServer> { - let server = - ManagementInterfaceServer::start(event_tx).map_err(Error::StartManagementInterface)?; - info!("Management interface listening on {}", server.socket_path()); - - Ok(server) - } - - fn spawn_management_interface_wait_thread( - server: ManagementInterfaceServer, - exit_tx: UnboundedSender<InternalDaemonEvent>, - ) { - thread::spawn(move || { - server.wait(); - info!("Management interface shut down"); - let _ = exit_tx.unbounded_send(InternalDaemonEvent::ManagementInterfaceExited); - }); - } -} - impl<L> Daemon<L> where L: EventListener + Clone + Send + 'static, { - pub fn start_with_event_listener( - event_listener: L, + pub fn start( log_dir: Option<PathBuf>, resource_dir: PathBuf, cache_dir: PathBuf, - #[cfg(target_os = "android")] android_context: AndroidContext, - ) -> Result<Self> { - let (tx, rx) = futures::sync::mpsc::unbounded(); - - Self::start_internal( - tx, - rx, - event_listener, - log_dir, - resource_dir, - cache_dir, - #[cfg(target_os = "android")] - android_context, - ) - } - - fn start_internal( - internal_event_tx: UnboundedSender<InternalDaemonEvent>, - internal_event_rx: UnboundedReceiver<InternalDaemonEvent>, event_listener: L, - log_dir: Option<PathBuf>, - resource_dir: PathBuf, - cache_dir: PathBuf, + command_channel: DaemonCommandChannel, #[cfg(target_os = "android")] android_context: AndroidContext, - ) -> Result<Self> { + ) -> Result<Self, Error> { let ca_path = resource_dir.join(mullvad_paths::resources::API_CA_FILENAME); let mut rpc_manager = mullvad_rpc::MullvadRpcFactory::with_cache_dir(&cache_dir, &ca_path); @@ -399,11 +473,13 @@ where &cache_dir, ); + let (internal_event_tx, internal_event_rx) = command_channel.destructure(); + let app_version_info = version_check::load_cache(&cache_dir); let version_check_future = version_check::VersionUpdater::new( rpc_handle.clone(), cache_dir.clone(), - internal_event_tx.clone(), + internal_event_tx.to_specialized_sender(), app_version_info.clone(), ); tokio_remote.spawn(|_| version_check_future); @@ -427,7 +503,7 @@ where log_dir, resource_dir, cache_dir, - IntoSender::from(internal_event_tx.clone()), + internal_event_tx.to_specialized_sender(), #[cfg(target_os = "android")] android_context, ) @@ -483,21 +559,16 @@ where Ok(daemon) } - /// Retrieve a channel for sending daemon commands. - pub fn command_sender(&self) -> DaemonCommandSender { - DaemonCommandSender::new(self.tx.clone()) - } - /// Consume the `Daemon` and run the main event loop. Blocks until an error happens or a /// shutdown event is received. - pub fn run(mut self) -> Result<()> { + pub fn run(mut self) -> Result<(), Error> { if self.settings.get_auto_connect() && self.settings.get_account_token().is_some() { info!("Automatically connecting since auto-connect is turned on"); self.set_target_state(TargetState::Secured); } while let Some(Ok(event)) = self.rx.next() { - self.handle_event(event)?; + self.handle_event(event); if self.state == DaemonExecutionState::Finished { break; } @@ -515,8 +586,8 @@ where mem::drop(event_listener); } - /// Shuts down the daemon without shutting down the underlying management interface event - /// listener and the shutdown callbacks + /// Shuts down the daemon without shutting down the underlying event listener and the shutdown + /// callbacks fn shutdown(self) -> (L, Vec<Box<dyn FnOnce()>>) { let Daemon { event_listener, @@ -527,17 +598,14 @@ where } - fn handle_event(&mut self, event: InternalDaemonEvent) -> Result<()> { + fn handle_event(&mut self, event: InternalDaemonEvent) { use self::InternalDaemonEvent::*; match event { TunnelStateTransition(transition) => self.handle_tunnel_state_transition(transition), GenerateTunnelParameters(tunnel_parameters_tx, retry_attempt) => { self.handle_generate_tunnel_parameters(&tunnel_parameters_tx, retry_attempt) } - ManagementInterfaceEvent(event) => self.handle_management_interface_event(event), - ManagementInterfaceExited => { - return Err(Error::ManagementInterfaceExited); - } + Command(command) => self.handle_command(command), TriggerShutdown => self.trigger_shutdown_event(), WgKeyEvent(key_event) => self.handle_wireguard_key_event(key_event), NewAccountEvent(account_token, tx) => self.handle_new_account_event(account_token, tx), @@ -545,7 +613,6 @@ where self.handle_new_app_version_info(app_version_info) } } - Ok(()) } fn handle_tunnel_state_transition(&mut self, tunnel_state_transition: TunnelStateTransition) { @@ -596,9 +663,7 @@ where fn handle_generate_tunnel_parameters( &mut self, - tunnel_parameters_tx: &mpsc::Sender< - std::result::Result<TunnelParameters, ParameterGenerationError>, - >, + tunnel_parameters_tx: &mpsc::Sender<Result<TunnelParameters, ParameterGenerationError>>, retry_attempt: u32, ) { if let Some(account_token) = self.settings.get_account_token() { @@ -663,7 +728,7 @@ where endpoint: MullvadEndpoint, account_token: String, retry_attempt: u32, - ) -> Result<TunnelParameters> { + ) -> Result<TunnelParameters, Error> { let tunnel_options = self.settings.get_tunnel_options().clone(); let location = relay.location.as_ref().expect("Relay has no location set"); self.last_generated_bridge_relay = None; @@ -763,7 +828,7 @@ where } fn schedule_reconnect(&mut self, delay: Duration) { - let tunnel_command_tx = self.tx.clone(); + let tunnel_command_tx = self.tx.to_specialized_sender(); let (tx, rx) = mpsc::channel(); self.reconnection_loop_tx = Some(tx); @@ -773,11 +838,10 @@ where if let Err(mpsc::RecvTimeoutError::Timeout) = rx.recv_timeout(delay) { debug!("Attempting to reconnect"); - let _ = tunnel_command_tx.unbounded_send( - InternalDaemonEvent::ManagementInterfaceEvent( - ManagementCommand::SetTargetState(result_tx, TargetState::Secured), - ), - ); + let _ = tunnel_command_tx.send(DaemonCommand::SetTargetState( + result_tx, + TargetState::Secured, + )); } }); } @@ -788,13 +852,13 @@ where } } - fn handle_management_interface_event(&mut self, event: ManagementCommand) { - use self::ManagementCommand::*; + fn handle_command(&mut self, command: DaemonCommand) { + use self::DaemonCommand::*; if !self.state.is_running() { - log::trace!("Dropping management command because the daemon is shutting down",); + log::trace!("Dropping daemon command because the daemon is shutting down",); return; } - match event { + match command { SetTargetState(tx, state) => self.on_set_target_state(tx, state), Reconnect => self.on_reconnect(), GetState(tx) => self.on_get_state(tx), @@ -842,7 +906,7 @@ where &mut self, event: ( AccountToken, - std::result::Result<mullvad_types::wireguard::WireguardData, wireguard::Error>, + Result<mullvad_types::wireguard::WireguardData, wireguard::Error>, ), ) { let (account, result) = event; @@ -905,7 +969,7 @@ where fn handle_new_account_event( &mut self, new_token: AccountToken, - tx: oneshot::Sender<std::result::Result<String, mullvad_rpc::Error>>, + tx: oneshot::Sender<Result<String, mullvad_rpc::Error>>, ) { match self.set_account(Some(new_token.clone())) { Ok(_) => { @@ -925,7 +989,7 @@ where fn on_set_target_state( &mut self, - tx: oneshot::Sender<std::result::Result<(), ()>>, + tx: oneshot::Sender<Result<(), ()>>, new_target_state: TargetState, ) { if self.state.is_running() { @@ -1008,27 +1072,23 @@ where }) } - fn on_create_new_account( - &mut self, - tx: oneshot::Sender<std::result::Result<String, mullvad_rpc::Error>>, - ) { + fn on_create_new_account(&mut self, tx: oneshot::Sender<Result<String, mullvad_rpc::Error>>) { let daemon_tx = self.tx.clone(); - let future = self.accounts_proxy.create_account().then( - move |result| -> std::result::Result<(), ()> { + let future = self + .accounts_proxy + .create_account() + .then(move |result| -> Result<(), ()> { match result { Ok(account_token) => { - let _ = daemon_tx.unbounded_send(InternalDaemonEvent::NewAccountEvent( - account_token, - tx, - )); + let _ = + daemon_tx.send(InternalDaemonEvent::NewAccountEvent(account_token, tx)); } Err(err) => { let _ = tx.send(Err(err)); } }; Ok(()) - }, - ); + }); if self.tokio_remote.execute(future).is_err() { log::error!("Failed to spawn future for creating a new account"); @@ -1099,10 +1159,7 @@ where } } - fn set_account( - &mut self, - account_token: Option<String>, - ) -> std::result::Result<bool, settings::Error> { + fn set_account(&mut self, account_token: Option<String>) -> Result<bool, settings::Error> { let account_changed = self.settings.set_account_token(account_token.clone())?; if account_changed { self.event_listener.notify_settings(self.settings.clone()); @@ -1284,7 +1341,7 @@ where fn on_set_bridge_settings( &mut self, - tx: oneshot::Sender<std::result::Result<(), settings::Error>>, + tx: oneshot::Sender<Result<(), settings::Error>>, new_settings: BridgeSettings, ) { match self.settings.set_bridge_settings(new_settings) { @@ -1308,7 +1365,7 @@ where fn on_set_bridge_state( &mut self, - tx: oneshot::Sender<std::result::Result<(), settings::Error>>, + tx: oneshot::Sender<Result<(), settings::Error>>, bridge_state: BridgeState, ) { let result = match self.settings.set_bridge_state(bridge_state) { @@ -1415,7 +1472,7 @@ where } fn on_generate_wireguard_key(&mut self, tx: oneshot::Sender<KeygenEvent>) { - let mut result = || -> std::result::Result<KeygenEvent, String> { + let mut result = || -> Result<KeygenEvent, String> { let account_token = self .settings .get_account_token() @@ -1539,7 +1596,7 @@ where fn oneshot_send<T>(tx: oneshot::Sender<T>, t: T, msg: &'static str) { if tx.send(t).is_err() { - warn!("Unable to send {} to management interface client", msg); + warn!("Unable to send {} to the daemon command sender", msg); } } @@ -1598,19 +1655,19 @@ where } #[cfg(not(target_os = "android"))] - fn clear_log_directory() -> Result<()> { + fn clear_log_directory() -> Result<(), Error> { let log_dir = mullvad_paths::get_log_dir().map_err(Error::PathError)?; Self::clear_directory(&log_dir) } #[cfg(not(target_os = "android"))] - fn clear_cache_directory() -> Result<()> { + fn clear_cache_directory() -> Result<(), Error> { let cache_dir = mullvad_paths::cache_dir().map_err(Error::PathError)?; Self::clear_directory(&cache_dir) } #[cfg(not(target_os = "android"))] - fn clear_directory(path: &Path) -> Result<()> { + fn clear_directory(path: &Path) -> Result<(), Error> { use std::fs; #[cfg(not(target_os = "windows"))] { @@ -1640,7 +1697,7 @@ where Error::RemoveDirError(entry.path().display().to_string(), e) }) }) - .collect::<Result<()>>() + .collect::<Result<(), Error>>() }) } } @@ -1654,28 +1711,28 @@ where } pub struct DaemonShutdownHandle { - tx: UnboundedSender<InternalDaemonEvent>, + tx: DaemonEventSender, } impl DaemonShutdownHandle { pub fn shutdown(&self) { - let _ = self.tx.unbounded_send(InternalDaemonEvent::TriggerShutdown); + let _ = self.tx.send(InternalDaemonEvent::TriggerShutdown); } } struct MullvadTunnelParametersGenerator { - tx: UnboundedSender<InternalDaemonEvent>, + tx: DaemonEventSender, } impl TunnelParametersGenerator for MullvadTunnelParametersGenerator { fn generate( &mut self, retry_attempt: u32, - ) -> std::result::Result<TunnelParameters, ParameterGenerationError> { + ) -> Result<TunnelParameters, ParameterGenerationError> { let (response_tx, response_rx) = mpsc::channel(); if self .tx - .unbounded_send(InternalDaemonEvent::GenerateTunnelParameters( + .send(InternalDaemonEvent::GenerateTunnelParameters( response_tx, retry_attempt, )) diff --git a/mullvad-daemon/src/main.rs b/mullvad-daemon/src/main.rs index 4fd4ac6796..4928ba277e 100644 --- a/mullvad-daemon/src/main.rs +++ b/mullvad-daemon/src/main.rs @@ -1,7 +1,11 @@ #![deny(rust_2018_idioms)] use log::{debug, error, info, warn}; -use mullvad_daemon::{logging, version, Daemon}; +use mullvad_daemon::{ + logging, + management_interface::{ManagementInterfaceEventBroadcaster, ManagementInterfaceServer}, + rpc_uniqueness_check, version, Daemon, DaemonCommandChannel, DaemonCommandSender, +}; use std::{path::PathBuf, thread, time::Duration}; use talpid_types::ErrorExt; @@ -84,6 +88,10 @@ fn run_platform(_config: &cli::Config, log_dir: Option<PathBuf>) -> Result<(), S } fn run_standalone(log_dir: Option<PathBuf>) -> Result<(), String> { + if rpc_uniqueness_check::is_another_instance_running() { + return Err("Another instance of the daemon is already running".to_owned()); + } + if !running_as_admin() { warn!("Running daemon as a non-administrator user, clients might refuse to connect"); } @@ -101,13 +109,42 @@ fn run_standalone(log_dir: Option<PathBuf>) -> Result<(), String> { Ok(()) } -fn create_daemon(log_dir: Option<PathBuf>) -> Result<Daemon, String> { +fn create_daemon( + log_dir: Option<PathBuf>, +) -> Result<Daemon<ManagementInterfaceEventBroadcaster>, String> { let resource_dir = mullvad_paths::get_resource_dir(); let cache_dir = mullvad_paths::cache_dir() .map_err(|e| e.display_chain_with_msg("Unable to get cache dir"))?; - Daemon::start(log_dir, resource_dir, cache_dir) - .map_err(|e| e.display_chain_with_msg("Unable to initialize daemon")) + let command_channel = DaemonCommandChannel::new(); + let event_listener = spawn_management_interface(command_channel.sender())?; + + Daemon::start( + log_dir, + resource_dir, + cache_dir, + event_listener, + command_channel, + ) + .map_err(|e| e.display_chain_with_msg("Unable to initialize daemon")) +} + +fn spawn_management_interface( + command_sender: DaemonCommandSender, +) -> Result<ManagementInterfaceEventBroadcaster, String> { + let server = ManagementInterfaceServer::start(command_sender).map_err(|error| { + error.display_chain_with_msg("Unable to start management interface server") + })?; + let event_broadcaster = server.event_broadcaster(); + + info!("Management interface listening on {}", server.socket_path()); + + thread::spawn(|| { + server.wait(); + info!("Management interface shut down"); + }); + + Ok(event_broadcaster) } #[cfg(unix)] diff --git a/mullvad-daemon/src/management_interface.rs b/mullvad-daemon/src/management_interface.rs index 22358881ed..8da3d4dabf 100644 --- a/mullvad-daemon/src/management_interface.rs +++ b/mullvad-daemon/src/management_interface.rs @@ -1,10 +1,6 @@ -use crate::EventListener; +use crate::{BoxFuture, DaemonCommand, DaemonCommandSender, EventListener}; use jsonrpc_core::{ - futures::{ - future, - sync::{self, oneshot::Sender as OneshotSender}, - Future, - }, + futures::{future, sync, Future}, Error, ErrorCode, MetaIoHandler, Metadata, }; use jsonrpc_ipc_server; @@ -21,21 +17,15 @@ use mullvad_types::{ states::{TargetState, TunnelState}, version, wireguard, DaemonEvent, }; -use parking_lot::{Mutex, RwLock}; +use parking_lot::RwLock; use std::{ collections::{hash_map::Entry, HashMap}, sync::Arc, }; -use talpid_core::mpsc::IntoSender; use talpid_ipc; use talpid_types::ErrorExt; use uuid; -/// FIXME(linus): This is here just because the futures crate has deprecated it and jsonrpc_core -/// did not introduce their own yet (https://github.com/paritytech/jsonrpc/pull/196). -/// Remove this and use the one in jsonrpc_core when that is released. -pub type BoxFuture<T, E> = Box<dyn Future<Item = T, Error = E> + Send>; - build_rpc_trait! { pub trait ManagementInterfaceApi { type Metadata; @@ -190,90 +180,13 @@ build_rpc_trait! { } } - -/// Enum representing commands coming in on the management interface. -pub enum ManagementCommand { - /// Set target state. Does nothing if the daemon already has the state that is being set. - SetTargetState(OneshotSender<Result<(), ()>>, TargetState), - /// Reconnect the tunnel, if one is connecting/connected. - Reconnect, - /// Request the current state. - GetState(OneshotSender<TunnelState>), - /// Get the current geographical location. - GetCurrentLocation(OneshotSender<Option<GeoIpLocation>>), - CreateNewAccount(OneshotSender<std::result::Result<String, mullvad_rpc::Error>>), - /// Request the metadata for an account. - GetAccountData( - OneshotSender<BoxFuture<AccountData, mullvad_rpc::Error>>, - AccountToken, - ), - /// Request www auth token for an account - GetWwwAuthToken(OneshotSender<BoxFuture<String, mullvad_rpc::Error>>), - /// Submit voucher to add time to the current account. Returns time added in seconds - SubmitVoucher( - OneshotSender<BoxFuture<VoucherSubmission, mullvad_rpc::Error>>, - String, - ), - /// Request account history - GetAccountHistory(OneshotSender<Vec<AccountToken>>), - /// Request account history - RemoveAccountFromHistory(OneshotSender<()>, AccountToken), - /// Get the list of countries and cities where there are relays. - GetRelayLocations(OneshotSender<RelayList>), - /// Trigger an asynchronous relay list update. This returns before the relay list is actually - /// updated. - UpdateRelayLocations, - /// Set which account token to use for subsequent connection attempts. - SetAccount(OneshotSender<()>, Option<AccountToken>), - /// Place constraints on the type of tunnel and relay - UpdateRelaySettings(OneshotSender<()>, RelaySettingsUpdate), - /// Set the allow LAN setting. - SetAllowLan(OneshotSender<()>, bool), - /// Set the block_when_disconnected setting. - SetBlockWhenDisconnected(OneshotSender<()>, bool), - /// Set the auto-connect setting. - SetAutoConnect(OneshotSender<()>, bool), - /// Set the mssfix argument for OpenVPN - SetOpenVpnMssfix(OneshotSender<()>, Option<u16>), - /// Set proxy details for OpenVPN - SetBridgeSettings(OneshotSender<Result<(), settings::Error>>, BridgeSettings), - /// Set proxy state - SetBridgeState(OneshotSender<Result<(), settings::Error>>, BridgeState), - /// Set if IPv6 should be enabled in the tunnel - SetEnableIpv6(OneshotSender<()>, bool), - /// Set MTU for wireguard tunnels - SetWireguardMtu(OneshotSender<()>, Option<u16>), - /// Set automatic key rotation interval for wireguard tunnels - SetWireguardRotationInterval(OneshotSender<()>, Option<u32>), - /// Get the daemon settings - GetSettings(OneshotSender<Settings>), - /// Generate new wireguard key - GenerateWireguardKey(OneshotSender<wireguard::KeygenEvent>), - /// Return a public key of the currently set wireguard private key, if there is one - GetWireguardKey(OneshotSender<Option<wireguard::PublicKey>>), - /// Verify if the currently set wireguard key is valid. - VerifyWireguardKey(OneshotSender<bool>), - /// Get information about the currently running and latest app versions - GetVersionInfo(OneshotSender<version::AppVersionInfo>), - /// Get current version of the app - GetCurrentVersion(OneshotSender<version::AppVersion>), - /// Remove settings and clear the cache - #[cfg(not(target_os = "android"))] - FactoryReset(OneshotSender<()>), - /// Makes the daemon exit the main loop and quit. - Shutdown, -} - pub struct ManagementInterfaceServer { server: talpid_ipc::IpcServer, subscriptions: Arc<RwLock<HashMap<SubscriptionId, pubsub::Sink<DaemonEvent>>>>, } impl ManagementInterfaceServer { - pub fn start<T>(tunnel_tx: IntoSender<ManagementCommand, T>) -> Result<Self, talpid_ipc::Error> - where - T: From<ManagementCommand> + 'static + Send, - { + pub fn start(tunnel_tx: DaemonCommandSender) -> Result<Self, talpid_ipc::Error> { let rpc = ManagementInterface::new(tunnel_tx); let subscriptions = rpc.subscriptions.clone(); @@ -363,25 +276,25 @@ impl Drop for ManagementInterfaceEventBroadcaster { } } -struct ManagementInterface<T: From<ManagementCommand> + 'static + Send> { +struct ManagementInterface { subscriptions: Arc<RwLock<HashMap<SubscriptionId, pubsub::Sink<DaemonEvent>>>>, - tx: Mutex<IntoSender<ManagementCommand, T>>, + tx: DaemonCommandSender, } -impl<T: From<ManagementCommand> + 'static + Send> ManagementInterface<T> { - pub fn new(tx: IntoSender<ManagementCommand, T>) -> Self { +impl ManagementInterface { + pub fn new(tx: DaemonCommandSender) -> Self { ManagementInterface { subscriptions: Default::default(), - tx: Mutex::new(tx), + tx, } } /// Sends a command to the daemon and maps the error to an RPC error. fn send_command_to_daemon( &self, - command: ManagementCommand, + command: DaemonCommand, ) -> impl Future<Item = (), Error = Error> { - future::result(self.tx.lock().send(command)).map_err(|_| Error::internal_error()) + future::result(self.tx.send(command)).map_err(|_| Error::internal_error()) } /// Converts the given error to an error that can be given to the caller of the API. @@ -403,15 +316,13 @@ impl<T: From<ManagementCommand> + 'static + Send> ManagementInterface<T> { } } -impl<T: From<ManagementCommand> + 'static + Send> ManagementInterfaceApi - for ManagementInterface<T> -{ +impl ManagementInterfaceApi for ManagementInterface { type Metadata = Meta; fn create_new_account(&self, _: Self::Metadata) -> BoxFuture<String, Error> { let (tx, rx) = sync::oneshot::channel(); let future = self - .send_command_to_daemon(ManagementCommand::CreateNewAccount(tx)) + .send_command_to_daemon(DaemonCommand::CreateNewAccount(tx)) .and_then(|_| rx.map_err(|_| Error::internal_error())) .and_then(|result| match result { Ok(account_token) => Ok(account_token), @@ -429,7 +340,7 @@ impl<T: From<ManagementCommand> + 'static + Send> ManagementInterfaceApi log::debug!("get_account_data"); let (tx, rx) = sync::oneshot::channel(); let future = self - .send_command_to_daemon(ManagementCommand::GetAccountData(tx, account_token)) + .send_command_to_daemon(DaemonCommand::GetAccountData(tx, account_token)) .and_then(|_| rx.map_err(|_| Error::internal_error())) .and_then(|rpc_future| { rpc_future.map_err(|error: mullvad_rpc::Error| { @@ -447,7 +358,7 @@ impl<T: From<ManagementCommand> + 'static + Send> ManagementInterfaceApi log::debug!("get_account_data"); let (tx, rx) = sync::oneshot::channel(); let future = self - .send_command_to_daemon(ManagementCommand::GetWwwAuthToken(tx)) + .send_command_to_daemon(DaemonCommand::GetWwwAuthToken(tx)) .and_then(|_| rx.map_err(|_| Error::internal_error())) .and_then(|rpc_future| { rpc_future.map_err(|error: mullvad_rpc::Error| { @@ -469,7 +380,7 @@ impl<T: From<ManagementCommand> + 'static + Send> ManagementInterfaceApi log::debug!("submit_voucher"); let (tx, rx) = sync::oneshot::channel(); let future = self - .send_command_to_daemon(ManagementCommand::SubmitVoucher(tx, voucher)) + .send_command_to_daemon(DaemonCommand::SubmitVoucher(tx, voucher)) .and_then(|_| rx.map_err(|_| Error::internal_error())) .and_then(|f| f.map_err(|e| Self::map_rpc_error(&e))); Box::new(future) @@ -479,14 +390,14 @@ impl<T: From<ManagementCommand> + 'static + Send> ManagementInterfaceApi log::debug!("get_relay_locations"); let (tx, rx) = sync::oneshot::channel(); let future = self - .send_command_to_daemon(ManagementCommand::GetRelayLocations(tx)) + .send_command_to_daemon(DaemonCommand::GetRelayLocations(tx)) .and_then(|_| rx.map_err(|_| Error::internal_error())); Box::new(future) } fn update_relay_locations(&self, _: Self::Metadata) -> BoxFuture<(), Error> { log::debug!("update_relay_locations"); - Box::new(self.send_command_to_daemon(ManagementCommand::UpdateRelayLocations)) + Box::new(self.send_command_to_daemon(DaemonCommand::UpdateRelayLocations)) } fn set_account( @@ -497,7 +408,7 @@ impl<T: From<ManagementCommand> + 'static + Send> ManagementInterfaceApi log::debug!("set_account"); let (tx, rx) = sync::oneshot::channel(); let future = self - .send_command_to_daemon(ManagementCommand::SetAccount(tx, account_token)) + .send_command_to_daemon(DaemonCommand::SetAccount(tx, account_token)) .and_then(|_| rx.map_err(|_| Error::internal_error())); Box::new(future) } @@ -510,7 +421,7 @@ impl<T: From<ManagementCommand> + 'static + Send> ManagementInterfaceApi log::debug!("update_relay_settings"); let (tx, rx) = sync::oneshot::channel(); - let message = ManagementCommand::UpdateRelaySettings(tx, constraints_update); + let message = DaemonCommand::UpdateRelaySettings(tx, constraints_update); let future = self .send_command_to_daemon(message) .and_then(|_| rx.map_err(|_| Error::internal_error())); @@ -521,7 +432,7 @@ impl<T: From<ManagementCommand> + 'static + Send> ManagementInterfaceApi log::debug!("set_allow_lan({})", allow_lan); let (tx, rx) = sync::oneshot::channel(); let future = self - .send_command_to_daemon(ManagementCommand::SetAllowLan(tx, allow_lan)) + .send_command_to_daemon(DaemonCommand::SetAllowLan(tx, allow_lan)) .and_then(|_| rx.map_err(|_| Error::internal_error())); Box::new(future) } @@ -534,7 +445,7 @@ impl<T: From<ManagementCommand> + 'static + Send> ManagementInterfaceApi log::debug!("set_block_when_disconnected({})", block_when_disconnected); let (tx, rx) = sync::oneshot::channel(); let future = self - .send_command_to_daemon(ManagementCommand::SetBlockWhenDisconnected( + .send_command_to_daemon(DaemonCommand::SetBlockWhenDisconnected( tx, block_when_disconnected, )) @@ -546,7 +457,7 @@ impl<T: From<ManagementCommand> + 'static + Send> ManagementInterfaceApi log::debug!("set_auto_connect({})", auto_connect); let (tx, rx) = sync::oneshot::channel(); let future = self - .send_command_to_daemon(ManagementCommand::SetAutoConnect(tx, auto_connect)) + .send_command_to_daemon(DaemonCommand::SetAutoConnect(tx, auto_connect)) .and_then(|_| rx.map_err(|_| Error::internal_error())); Box::new(future) } @@ -555,7 +466,7 @@ impl<T: From<ManagementCommand> + 'static + Send> ManagementInterfaceApi log::debug!("connect"); let (tx, rx) = sync::oneshot::channel(); let future = self - .send_command_to_daemon(ManagementCommand::SetTargetState(tx, TargetState::Secured)) + .send_command_to_daemon(DaemonCommand::SetTargetState(tx, TargetState::Secured)) .and_then(|_| rx.map_err(|_| Error::internal_error())) .and_then(|result| match result { Ok(()) => future::ok(()), @@ -572,17 +483,14 @@ impl<T: From<ManagementCommand> + 'static + Send> ManagementInterfaceApi log::debug!("disconnect"); let (tx, _) = sync::oneshot::channel(); let future = self - .send_command_to_daemon(ManagementCommand::SetTargetState( - tx, - TargetState::Unsecured, - )) + .send_command_to_daemon(DaemonCommand::SetTargetState(tx, TargetState::Unsecured)) .then(|_| future::ok(())); Box::new(future) } fn reconnect(&self, _: Self::Metadata) -> BoxFuture<(), Error> { log::debug!("reconnect"); - let future = self.send_command_to_daemon(ManagementCommand::Reconnect); + let future = self.send_command_to_daemon(DaemonCommand::Reconnect); Box::new(future) } @@ -590,7 +498,7 @@ impl<T: From<ManagementCommand> + 'static + Send> ManagementInterfaceApi log::debug!("get_state"); let (state_tx, state_rx) = sync::oneshot::channel(); let future = self - .send_command_to_daemon(ManagementCommand::GetState(state_tx)) + .send_command_to_daemon(DaemonCommand::GetState(state_tx)) .and_then(|_| state_rx.map_err(|_| Error::internal_error())); Box::new(future) } @@ -599,21 +507,21 @@ impl<T: From<ManagementCommand> + 'static + Send> ManagementInterfaceApi log::debug!("get_current_location"); let (tx, rx) = sync::oneshot::channel(); let future = self - .send_command_to_daemon(ManagementCommand::GetCurrentLocation(tx)) + .send_command_to_daemon(DaemonCommand::GetCurrentLocation(tx)) .and_then(|_| rx.map_err(|_| Error::internal_error())); Box::new(future) } fn shutdown(&self, _: Self::Metadata) -> BoxFuture<(), Error> { log::debug!("shutdown"); - Box::new(self.send_command_to_daemon(ManagementCommand::Shutdown)) + Box::new(self.send_command_to_daemon(DaemonCommand::Shutdown)) } fn get_account_history(&self, _: Self::Metadata) -> BoxFuture<Vec<AccountToken>, Error> { log::debug!("get_account_history"); let (tx, rx) = sync::oneshot::channel(); let future = self - .send_command_to_daemon(ManagementCommand::GetAccountHistory(tx)) + .send_command_to_daemon(DaemonCommand::GetAccountHistory(tx)) .and_then(|_| rx.map_err(|_| Error::internal_error())); Box::new(future) } @@ -626,10 +534,7 @@ impl<T: From<ManagementCommand> + 'static + Send> ManagementInterfaceApi log::debug!("remove_account_from_history"); let (tx, rx) = sync::oneshot::channel(); let future = self - .send_command_to_daemon(ManagementCommand::RemoveAccountFromHistory( - tx, - account_token, - )) + .send_command_to_daemon(DaemonCommand::RemoveAccountFromHistory(tx, account_token)) .and_then(|_| rx.map_err(|_| Error::internal_error())); Box::new(future) } @@ -638,7 +543,7 @@ impl<T: From<ManagementCommand> + 'static + Send> ManagementInterfaceApi log::debug!("set_openvpn_mssfix({:?})", mssfix); let (tx, rx) = sync::oneshot::channel(); let future = self - .send_command_to_daemon(ManagementCommand::SetOpenVpnMssfix(tx, mssfix)) + .send_command_to_daemon(DaemonCommand::SetOpenVpnMssfix(tx, mssfix)) .and_then(|_| rx.map_err(|_| Error::internal_error())); Box::new(future) @@ -652,7 +557,7 @@ impl<T: From<ManagementCommand> + 'static + Send> ManagementInterfaceApi log::debug!("set_bridge_settings({:?})", bridge_settings); let (tx, rx) = sync::oneshot::channel(); let future = self - .send_command_to_daemon(ManagementCommand::SetBridgeSettings(tx, bridge_settings)) + .send_command_to_daemon(DaemonCommand::SetBridgeSettings(tx, bridge_settings)) .and_then(|_| rx.map_err(|_| Error::internal_error())) .and_then(|settings_result| { settings_result.map_err(|error| match error { @@ -672,7 +577,7 @@ impl<T: From<ManagementCommand> + 'static + Send> ManagementInterfaceApi log::debug!("set_bridge_state({:?})", bridge_state); let (tx, rx) = sync::oneshot::channel(); let future = self - .send_command_to_daemon(ManagementCommand::SetBridgeState(tx, bridge_state)) + .send_command_to_daemon(DaemonCommand::SetBridgeState(tx, bridge_state)) .and_then(|_| rx.map_err(|_| Error::internal_error())) .and_then(|settings_result| settings_result.map_err(|_| Error::internal_error())); @@ -683,7 +588,7 @@ impl<T: From<ManagementCommand> + 'static + Send> ManagementInterfaceApi log::debug!("set_enable_ipv6({})", enable_ipv6); let (tx, rx) = sync::oneshot::channel(); let future = self - .send_command_to_daemon(ManagementCommand::SetEnableIpv6(tx, enable_ipv6)) + .send_command_to_daemon(DaemonCommand::SetEnableIpv6(tx, enable_ipv6)) .and_then(|_| rx.map_err(|_| Error::internal_error())); Box::new(future) @@ -694,7 +599,7 @@ impl<T: From<ManagementCommand> + 'static + Send> ManagementInterfaceApi log::debug!("set_wireguard_mtu({:?})", mtu); let (tx, rx) = sync::oneshot::channel(); let future = self - .send_command_to_daemon(ManagementCommand::SetWireguardMtu(tx, mtu)) + .send_command_to_daemon(DaemonCommand::SetWireguardMtu(tx, mtu)) .and_then(|_| rx.map_err(|_| Error::internal_error())); Box::new(future) } @@ -708,9 +613,7 @@ impl<T: From<ManagementCommand> + 'static + Send> ManagementInterfaceApi log::debug!("set_wireguard_rotation_interval({:?})", interval); let (tx, rx) = sync::oneshot::channel(); let future = self - .send_command_to_daemon(ManagementCommand::SetWireguardRotationInterval( - tx, interval, - )) + .send_command_to_daemon(DaemonCommand::SetWireguardRotationInterval(tx, interval)) .and_then(|_| rx.map_err(|_| Error::internal_error())); Box::new(future) } @@ -719,7 +622,7 @@ impl<T: From<ManagementCommand> + 'static + Send> ManagementInterfaceApi log::debug!("get_settings"); let (tx, rx) = sync::oneshot::channel(); let future = self - .send_command_to_daemon(ManagementCommand::GetSettings(tx)) + .send_command_to_daemon(DaemonCommand::GetSettings(tx)) .and_then(|_| rx.map_err(|_| Error::internal_error())); Box::new(future) } @@ -731,7 +634,7 @@ impl<T: From<ManagementCommand> + 'static + Send> ManagementInterfaceApi log::debug!("generate_wireguard_key"); let (tx, rx) = sync::oneshot::channel(); let future = self - .send_command_to_daemon(ManagementCommand::GenerateWireguardKey(tx)) + .send_command_to_daemon(DaemonCommand::GenerateWireguardKey(tx)) .and_then(|_| rx.map_err(|_| Error::internal_error())); Box::new(future) } @@ -743,7 +646,7 @@ impl<T: From<ManagementCommand> + 'static + Send> ManagementInterfaceApi log::debug!("get_wireguard_key"); let (tx, rx) = sync::oneshot::channel(); let future = self - .send_command_to_daemon(ManagementCommand::GetWireguardKey(tx)) + .send_command_to_daemon(DaemonCommand::GetWireguardKey(tx)) .and_then(|_| rx.map_err(|_| Error::internal_error())); Box::new(future) } @@ -752,7 +655,7 @@ impl<T: From<ManagementCommand> + 'static + Send> ManagementInterfaceApi log::debug!("verify_wireguard_key"); let (tx, rx) = sync::oneshot::channel(); let future = self - .send_command_to_daemon(ManagementCommand::VerifyWireguardKey(tx)) + .send_command_to_daemon(DaemonCommand::VerifyWireguardKey(tx)) .and_then(|_| rx.map_err(|_| Error::internal_error())); Box::new(future) } @@ -761,7 +664,7 @@ impl<T: From<ManagementCommand> + 'static + Send> ManagementInterfaceApi log::debug!("get_current_version"); let (tx, rx) = sync::oneshot::channel(); let future = self - .send_command_to_daemon(ManagementCommand::GetCurrentVersion(tx)) + .send_command_to_daemon(DaemonCommand::GetCurrentVersion(tx)) .and_then(|_| rx.map_err(|_| Error::internal_error())); Box::new(future) @@ -771,7 +674,7 @@ impl<T: From<ManagementCommand> + 'static + Send> ManagementInterfaceApi log::debug!("get_version_info"); let (tx, rx) = sync::oneshot::channel(); let future = self - .send_command_to_daemon(ManagementCommand::GetVersionInfo(tx)) + .send_command_to_daemon(DaemonCommand::GetVersionInfo(tx)) .and_then(|_| rx.map_err(|_| Error::internal_error())); Box::new(future) @@ -783,7 +686,7 @@ impl<T: From<ManagementCommand> + 'static + Send> ManagementInterfaceApi log::debug!("factory_reset"); let (tx, rx) = sync::oneshot::channel(); let future = self - .send_command_to_daemon(ManagementCommand::FactoryReset(tx)) + .send_command_to_daemon(DaemonCommand::FactoryReset(tx)) .and_then(|_| rx.map_err(|_| Error::internal_error())); Box::new(future) diff --git a/mullvad-daemon/src/version_check.rs b/mullvad-daemon/src/version_check.rs index da24b85ea3..65b960a7b1 100644 --- a/mullvad-daemon/src/version_check.rs +++ b/mullvad-daemon/src/version_check.rs @@ -1,5 +1,5 @@ -use crate::version::PRODUCT_VERSION; -use futures::{sync::mpsc::UnboundedSender, Async, Future, Poll}; +use crate::{version::PRODUCT_VERSION, DaemonEventSender}; +use futures::{Async, Future, Poll}; use mullvad_rpc::{AppVersionProxy, HttpHandle}; use mullvad_types::version::AppVersionInfo; use serde::{Deserialize, Serialize}; @@ -9,6 +9,7 @@ use std::{ path::{Path, PathBuf}, time::{Duration, Instant}, }; +use talpid_core::mpsc::Sender; use talpid_types::ErrorExt; use tokio_timer::{TimeoutError, Timer}; @@ -80,10 +81,10 @@ impl<T> From<TimeoutError<T>> for Error { } -pub struct VersionUpdater<T: From<AppVersionInfo>> { +pub(crate) struct VersionUpdater { version_proxy: AppVersionProxy<HttpHandle>, cache_path: PathBuf, - update_sender: UnboundedSender<T>, + update_sender: DaemonEventSender<AppVersionInfo>, last_app_version_info: AppVersionInfo, next_update_time: Instant, state: VersionUpdaterState, @@ -94,11 +95,11 @@ enum VersionUpdaterState { Updating(Box<dyn Future<Item = AppVersionInfo, Error = Error> + Send + 'static>), } -impl<T: From<AppVersionInfo>> VersionUpdater<T> { +impl VersionUpdater { pub fn new( rpc_handle: HttpHandle, cache_dir: PathBuf, - update_sender: UnboundedSender<T>, + update_sender: DaemonEventSender<AppVersionInfo>, last_app_version_info: AppVersionInfo, ) -> Self { let version_proxy = AppVersionProxy::new(rpc_handle); @@ -140,7 +141,7 @@ impl<T: From<AppVersionInfo>> VersionUpdater<T> { } } -impl<T: From<AppVersionInfo>> Future for VersionUpdater<T> { +impl Future for VersionUpdater { type Item = (); type Error = (); @@ -176,11 +177,7 @@ impl<T: From<AppVersionInfo>> Future for VersionUpdater<T> { log::debug!("Got new version check: {:?}", app_version_info); self.next_update_time = Instant::now() + UPDATE_INTERVAL; if app_version_info != self.last_app_version_info { - if self - .update_sender - .unbounded_send(app_version_info.clone().into()) - .is_err() - { + if self.update_sender.send(app_version_info.clone()).is_err() { log::warn!( "Version update receiver is closed, stopping version updater" ); diff --git a/mullvad-daemon/src/wireguard.rs b/mullvad-daemon/src/wireguard.rs index 09c5f01461..9b234dfaab 100644 --- a/mullvad-daemon/src/wireguard.rs +++ b/mullvad-daemon/src/wireguard.rs @@ -1,15 +1,11 @@ -use crate::{account_history::AccountHistory, InternalDaemonEvent}; +use crate::{account_history::AccountHistory, DaemonEventSender, InternalDaemonEvent}; use chrono::offset::Utc; -use futures::{ - future::Executor, - stream::Stream, - sync::{mpsc::UnboundedSender, oneshot}, - Async, Future, Poll, -}; +use futures::{future::Executor, stream::Stream, sync::oneshot, Async, Future, Poll}; use jsonrpc_client_core::Error as JsonRpcError; use mullvad_types::account::AccountToken; pub use mullvad_types::wireguard::*; use std::time::Duration; +use talpid_core::mpsc::Sender; pub use talpid_types::net::wireguard::{ ConnectionConfig, PrivateKey, TunnelConfig, TunnelParameters, }; @@ -46,7 +42,7 @@ pub enum Error { pub type Result<T> = std::result::Result<T, Error>; pub struct KeyManager { - daemon_tx: UnboundedSender<InternalDaemonEvent>, + daemon_tx: DaemonEventSender, http_handle: mullvad_rpc::HttpHandle, tokio_remote: Remote, current_job: Option<CancelHandle>, @@ -57,7 +53,7 @@ pub struct KeyManager { impl KeyManager { pub(crate) fn new( - daemon_tx: UnboundedSender<InternalDaemonEvent>, + daemon_tx: DaemonEventSender, http_handle: mullvad_rpc::HttpHandle, tokio_remote: Remote, ) -> Self { @@ -199,14 +195,13 @@ impl KeyManager { let fut = fut.then(move |result| { match result { Ok(wireguard_data) => { - let _ = daemon_tx.unbounded_send(InternalDaemonEvent::WgKeyEvent(( + let _ = daemon_tx.send(InternalDaemonEvent::WgKeyEvent(( account, Ok(wireguard_data), ))); } Err(CancelErr::Inner(e)) => { - let _ = daemon_tx - .unbounded_send(InternalDaemonEvent::WgKeyEvent((account, Err(e)))); + let _ = daemon_tx.send(InternalDaemonEvent::WgKeyEvent((account, Err(e)))); } Err(CancelErr::Cancelled) => { log::error!("Key generation cancelled"); @@ -294,7 +289,7 @@ impl KeyManager { } fn next_automatic_rotation( - daemon_tx: UnboundedSender<InternalDaemonEvent>, + daemon_tx: DaemonEventSender, http_handle: mullvad_rpc::HttpHandle, public_key: PublicKey, rotation_interval_secs: u64, @@ -315,14 +310,14 @@ impl KeyManager { match rpc_result { Ok(data) => { // Update account data - let _ = daemon_tx.unbounded_send(InternalDaemonEvent::WgKeyEvent(( + let _ = daemon_tx.send(InternalDaemonEvent::WgKeyEvent(( account_token_copy, Ok(data.clone()), ))); Ok(data.get_public_key()) } Err(Error::TooManyKeys) => { - let _ = daemon_tx.unbounded_send(InternalDaemonEvent::WgKeyEvent(( + let _ = daemon_tx.send(InternalDaemonEvent::WgKeyEvent(( account_token_copy, Err(Error::TooManyKeys), ))); @@ -334,7 +329,7 @@ impl KeyManager { } fn create_automatic_rotation( - daemon_tx: UnboundedSender<InternalDaemonEvent>, + daemon_tx: DaemonEventSender, http_handle: mullvad_rpc::HttpHandle, public_key: PublicKey, rotation_interval_secs: u64, diff --git a/mullvad-jni/src/daemon_interface.rs b/mullvad-jni/src/daemon_interface.rs index ceda7b5fd6..4be2a35c94 100644 --- a/mullvad-jni/src/daemon_interface.rs +++ b/mullvad-jni/src/daemon_interface.rs @@ -1,5 +1,5 @@ use futures::{sync::oneshot, Future}; -use mullvad_daemon::{DaemonCommandSender, ManagementCommand}; +use mullvad_daemon::{DaemonCommand, DaemonCommandSender}; use mullvad_types::{ account::AccountData, location::GeoIpLocation, @@ -40,7 +40,7 @@ impl DaemonInterface { pub fn connect(&self) -> Result<()> { let (tx, rx) = oneshot::channel(); - self.send_command(ManagementCommand::SetTargetState(tx, TargetState::Secured))?; + self.send_command(DaemonCommand::SetTargetState(tx, TargetState::Secured))?; rx.wait().map_err(|_| Error::NoResponse)?.unwrap(); @@ -50,10 +50,7 @@ impl DaemonInterface { pub fn disconnect(&self) -> Result<()> { let (tx, rx) = oneshot::channel(); - self.send_command(ManagementCommand::SetTargetState( - tx, - TargetState::Unsecured, - ))?; + self.send_command(DaemonCommand::SetTargetState(tx, TargetState::Unsecured))?; rx.wait().map_err(|_| Error::NoResponse)?.unwrap(); @@ -63,7 +60,7 @@ impl DaemonInterface { pub fn generate_wireguard_key(&self) -> Result<KeygenEvent> { let (tx, rx) = oneshot::channel(); - self.send_command(ManagementCommand::GenerateWireguardKey(tx))?; + self.send_command(DaemonCommand::GenerateWireguardKey(tx))?; rx.wait().map_err(|_| Error::NoResponse) } @@ -71,7 +68,7 @@ impl DaemonInterface { pub fn get_account_data(&self, account_token: String) -> Result<AccountData> { let (tx, rx) = oneshot::channel(); - self.send_command(ManagementCommand::GetAccountData(tx, account_token))?; + self.send_command(DaemonCommand::GetAccountData(tx, account_token))?; rx.wait() .map_err(|_| Error::NoResponse)? @@ -82,7 +79,7 @@ impl DaemonInterface { pub fn get_account_history(&self) -> Result<Vec<String>> { let (tx, rx) = oneshot::channel(); - self.send_command(ManagementCommand::GetAccountHistory(tx))?; + self.send_command(DaemonCommand::GetAccountHistory(tx))?; rx.wait().map_err(|_| Error::NoResponse) } @@ -90,7 +87,7 @@ impl DaemonInterface { pub fn get_www_auth_token(&self) -> Result<String> { let (tx, rx) = oneshot::channel(); - self.send_command(ManagementCommand::GetWwwAuthToken(tx))?; + self.send_command(DaemonCommand::GetWwwAuthToken(tx))?; rx.wait() .map_err(|_| Error::NoResponse)? @@ -101,7 +98,7 @@ impl DaemonInterface { pub fn get_current_location(&self) -> Result<Option<GeoIpLocation>> { let (tx, rx) = oneshot::channel(); - self.send_command(ManagementCommand::GetCurrentLocation(tx))?; + self.send_command(DaemonCommand::GetCurrentLocation(tx))?; Ok(rx.wait().map_err(|_| Error::NoResponse)?) } @@ -109,7 +106,7 @@ impl DaemonInterface { pub fn get_current_version(&self) -> Result<String> { let (tx, rx) = oneshot::channel(); - self.send_command(ManagementCommand::GetCurrentVersion(tx))?; + self.send_command(DaemonCommand::GetCurrentVersion(tx))?; Ok(rx.wait().map_err(|_| Error::NoResponse)?) } @@ -117,7 +114,7 @@ impl DaemonInterface { pub fn get_relay_locations(&self) -> Result<RelayList> { let (tx, rx) = oneshot::channel(); - self.send_command(ManagementCommand::GetRelayLocations(tx))?; + self.send_command(DaemonCommand::GetRelayLocations(tx))?; Ok(rx.wait().map_err(|_| Error::NoResponse)?) } @@ -125,7 +122,7 @@ impl DaemonInterface { pub fn get_settings(&self) -> Result<Settings> { let (tx, rx) = oneshot::channel(); - self.send_command(ManagementCommand::GetSettings(tx))?; + self.send_command(DaemonCommand::GetSettings(tx))?; Ok(rx.wait().map_err(|_| Error::NoResponse)?) } @@ -133,7 +130,7 @@ impl DaemonInterface { pub fn get_state(&self) -> Result<TunnelState> { let (tx, rx) = oneshot::channel(); - self.send_command(ManagementCommand::GetState(tx))?; + self.send_command(DaemonCommand::GetState(tx))?; Ok(rx.wait().map_err(|_| Error::NoResponse)?) } @@ -141,13 +138,13 @@ impl DaemonInterface { pub fn get_version_info(&self) -> Result<AppVersionInfo> { let (tx, rx) = oneshot::channel(); - self.send_command(ManagementCommand::GetVersionInfo(tx))?; + self.send_command(DaemonCommand::GetVersionInfo(tx))?; rx.wait().map_err(|_| Error::NoResponse) } pub fn reconnect(&self) -> Result<()> { - self.send_command(ManagementCommand::Reconnect)?; + self.send_command(DaemonCommand::Reconnect)?; Ok(()) } @@ -155,7 +152,7 @@ impl DaemonInterface { pub fn get_wireguard_key(&self) -> Result<Option<wireguard::PublicKey>> { let (tx, rx) = oneshot::channel(); - self.send_command(ManagementCommand::GetWireguardKey(tx))?; + self.send_command(DaemonCommand::GetWireguardKey(tx))?; rx.wait().map_err(|_| Error::NoResponse) } @@ -163,14 +160,14 @@ impl DaemonInterface { pub fn verify_wireguard_key(&self) -> Result<bool> { let (tx, rx) = oneshot::channel(); - self.send_command(ManagementCommand::VerifyWireguardKey(tx))?; + self.send_command(DaemonCommand::VerifyWireguardKey(tx))?; rx.wait().map_err(|_| Error::NoResponse) } pub fn set_account(&self, account_token: Option<String>) -> Result<()> { let (tx, rx) = oneshot::channel(); - self.send_command(ManagementCommand::SetAccount(tx, account_token))?; + self.send_command(DaemonCommand::SetAccount(tx, account_token))?; rx.wait().map_err(|_| Error::NoResponse) } @@ -178,7 +175,7 @@ impl DaemonInterface { pub fn set_allow_lan(&self, allow_lan: bool) -> Result<()> { let (tx, rx) = oneshot::channel(); - self.send_command(ManagementCommand::SetAllowLan(tx, allow_lan))?; + self.send_command(DaemonCommand::SetAllowLan(tx, allow_lan))?; rx.wait().map_err(|_| Error::NoResponse) } @@ -186,24 +183,24 @@ impl DaemonInterface { pub fn set_auto_connect(&self, auto_connect: bool) -> Result<()> { let (tx, rx) = oneshot::channel(); - self.send_command(ManagementCommand::SetAutoConnect(tx, auto_connect))?; + self.send_command(DaemonCommand::SetAutoConnect(tx, auto_connect))?; rx.wait().map_err(|_| Error::NoResponse) } pub fn shutdown(&self) -> Result<()> { - self.send_command(ManagementCommand::Shutdown) + self.send_command(DaemonCommand::Shutdown) } pub fn update_relay_settings(&self, update: RelaySettingsUpdate) -> Result<()> { let (tx, rx) = oneshot::channel(); - self.send_command(ManagementCommand::UpdateRelaySettings(tx, update))?; + self.send_command(DaemonCommand::UpdateRelaySettings(tx, update))?; rx.wait().map_err(|_| Error::NoResponse) } - fn send_command(&self, command: ManagementCommand) -> Result<()> { + fn send_command(&self, command: DaemonCommand) -> Result<()> { self.command_sender.send(command).map_err(Error::NoDaemon) } } diff --git a/mullvad-jni/src/lib.rs b/mullvad-jni/src/lib.rs index 3be27632b4..b9ff6a40fb 100644 --- a/mullvad-jni/src/lib.rs +++ b/mullvad-jni/src/lib.rs @@ -17,7 +17,7 @@ use jnix::{ FromJava, IntoJava, JnixEnv, }; use lazy_static::lazy_static; -use mullvad_daemon::{logging, version, Daemon, DaemonCommandSender}; +use mullvad_daemon::{logging, version, Daemon, DaemonCommandChannel}; use mullvad_types::account::AccountData; use std::{ path::{Path, PathBuf}, @@ -131,8 +131,10 @@ fn initialize( log_dir: PathBuf, ) -> Result<(), Error> { let android_context = create_android_context(env, *vpn_service)?; - let daemon_command_sender = spawn_daemon(env, this, log_dir, android_context)?; - let daemon_interface = Box::new(DaemonInterface::new(daemon_command_sender)); + let daemon_command_channel = DaemonCommandChannel::new(); + let daemon_interface = Box::new(DaemonInterface::new(daemon_command_channel.sender())); + + spawn_daemon(env, this, log_dir, daemon_command_channel, android_context)?; set_daemon_interface_address(env, this, Box::into_raw(daemon_interface) as jlong); @@ -155,8 +157,9 @@ fn spawn_daemon( env: &JnixEnv<'_>, this: &JObject<'_>, log_dir: PathBuf, + command_channel: DaemonCommandChannel, android_context: AndroidContext, -) -> Result<DaemonCommandSender, Error> { +) -> Result<(), Error> { let listener = JniEventListener::spawn(env, this).map_err(Error::SpawnJniEventListener)?; let daemon_object = env .new_global_ref(*this) @@ -166,9 +169,9 @@ fn spawn_daemon( thread::spawn(move || { let jvm = android_context.jvm.clone(); - match create_daemon(listener, log_dir, android_context) { + match create_daemon(listener, log_dir, command_channel, android_context) { Ok(daemon) => { - let _ = tx.send(Ok(daemon.command_sender())); + let _ = tx.send(Ok(())); match daemon.run() { Ok(()) => log::info!("Mullvad daemon has stopped"), Err(error) => log::error!("{}", error.display_chain()), @@ -188,21 +191,21 @@ fn spawn_daemon( fn create_daemon( listener: JniEventListener, log_dir: PathBuf, + command_channel: DaemonCommandChannel, android_context: AndroidContext, ) -> Result<Daemon<JniEventListener>, Error> { let resource_dir = mullvad_paths::get_resource_dir(); let cache_dir = mullvad_paths::cache_dir().map_err(Error::GetCacheDir)?; - let daemon = Daemon::start_with_event_listener( - listener, + Daemon::start( Some(log_dir), resource_dir, cache_dir, + listener, + command_channel, android_context, ) - .map_err(Error::InitializeDaemon)?; - - Ok(daemon) + .map_err(Error::InitializeDaemon) } fn notify_daemon_stopped(jvm: Arc<JavaVM>, daemon_object: GlobalRef) { diff --git a/talpid-core/src/mpsc.rs b/talpid-core/src/mpsc.rs index 21807ca377..050b90c81c 100644 --- a/talpid-core/src/mpsc.rs +++ b/talpid-core/src/mpsc.rs @@ -1,84 +1,5 @@ -use futures::sync::mpsc::{SendError, UnboundedSender}; -use std::marker::PhantomData; - -/// Abstraction over an `mpsc::Sender` that first converts the value to another type before sending. -#[derive(Debug, Clone)] -pub struct IntoSender<T, U> { - sender: UnboundedSender<U>, - _marker: PhantomData<T>, -} - -impl<T, U> IntoSender<T, U> -where - T: Into<U>, -{ - /// Converts the `T` into a `U` and sends it on the channel. - pub fn send(&self, t: T) -> Result<(), SendError<U>> { - self.sender.unbounded_send(t.into()) - } -} - -impl<T, U> From<UnboundedSender<U>> for IntoSender<T, U> -where - T: Into<U>, -{ - fn from(sender: UnboundedSender<U>) -> Self { - IntoSender { - sender, - _marker: PhantomData, - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - use futures::{sync::mpsc, Stream}; - use std::thread; - - #[derive(Debug, Eq, PartialEq)] - enum Inner { - One, - Two, - } - - #[derive(Debug, Eq, PartialEq)] - enum Outer { - Inner(Inner), - Other, - } - - impl From<Inner> for Outer { - fn from(o: Inner) -> Self { - Outer::Inner(o) - } - } - - #[test] - fn sender() { - let (tx, rx) = mpsc::unbounded(); - let inner_tx: IntoSender<Inner, Outer> = tx.clone().into(); - - tx.unbounded_send(Outer::Other).unwrap(); - inner_tx.send(Inner::Two).unwrap(); - - let mut sync_rx = rx.wait(); - - assert_eq!(Outer::Other, sync_rx.next().unwrap().unwrap()); - assert_eq!(Outer::Inner(Inner::Two), sync_rx.next().unwrap().unwrap()); - } - - #[test] - fn send_between_thread() { - let (tx, rx) = mpsc::unbounded(); - let inner_tx: IntoSender<Inner, Outer> = tx.clone().into(); - - thread::spawn(move || { - inner_tx.send(Inner::One).unwrap(); - }); - - let mut sync_rx = rx.wait(); - - assert_eq!(Outer::Inner(Inner::One), sync_rx.next().unwrap().unwrap()); - } +/// Abstraction over any type that can be used similarly to an `std::mpsc::Sender`. +pub trait Sender<T> { + /// Sends an item over the underlying channel, failing only if the channel is closed. + fn send(&self, item: T) -> Result<(), ()>; } diff --git a/talpid-core/src/tunnel_state_machine/mod.rs b/talpid-core/src/tunnel_state_machine/mod.rs index 3fb2e4e757..773a14e8da 100644 --- a/talpid-core/src/tunnel_state_machine/mod.rs +++ b/talpid-core/src/tunnel_state_machine/mod.rs @@ -17,7 +17,7 @@ use self::{ use crate::{ dns::DnsMonitor, firewall::{Firewall, FirewallArguments}, - mpsc::IntoSender, + mpsc::Sender, offline, tunnel::tun_provider::TunProvider, }; @@ -62,20 +62,16 @@ pub enum Error { } /// Spawn the tunnel state machine thread, returning a channel for sending tunnel commands. -pub fn spawn<P, T>( +pub fn spawn( allow_lan: bool, block_when_disconnected: bool, tunnel_parameters_generator: impl TunnelParametersGenerator, log_dir: Option<PathBuf>, resource_dir: PathBuf, - cache_dir: P, - state_change_listener: IntoSender<TunnelStateTransition, T>, + cache_dir: impl AsRef<Path> + Send + 'static, + state_change_listener: impl Sender<TunnelStateTransition> + Send + 'static, #[cfg(target_os = "android")] android_context: AndroidContext, -) -> Result<Arc<mpsc::UnboundedSender<TunnelCommand>>, Error> -where - P: AsRef<Path> + Send + 'static, - T: From<TunnelStateTransition> + Send + 'static, -{ +) -> Result<Arc<mpsc::UnboundedSender<TunnelCommand>>, Error> { let (command_tx, command_rx) = mpsc::unbounded(); let command_tx = Arc::new(command_tx); let offline_monitor = offline::spawn_monitor( @@ -134,7 +130,7 @@ where Ok(command_tx) } -fn create_event_loop<T>( +fn create_event_loop( allow_lan: bool, block_when_disconnected: bool, is_offline: bool, @@ -144,11 +140,8 @@ fn create_event_loop<T>( resource_dir: PathBuf, cache_dir: impl AsRef<Path>, commands: mpsc::UnboundedReceiver<TunnelCommand>, - state_change_listener: IntoSender<TunnelStateTransition, T>, -) -> Result<(Core, impl Future<Item = (), Error = Error>), Error> -where - T: From<TunnelStateTransition> + Send + 'static, -{ + state_change_listener: impl Sender<TunnelStateTransition>, +) -> Result<(Core, impl Future<Item = (), Error = Error>), Error> { let reactor = Core::new().map_err(Error::ReactorError)?; let state_machine = TunnelStateMachine::new( allow_lan, |
