diff options
| author | Janito Vaqueiro Ferreira Filho <janito@mullvad.net> | 2019-05-22 05:58:07 -0300 |
|---|---|---|
| committer | Janito Vaqueiro Ferreira Filho <janito@mullvad.net> | 2019-05-22 05:58:07 -0300 |
| commit | 3c0099e0cfe1a29fb104513a9afc85b63ef987f7 (patch) | |
| tree | 137d8c58aeb07d45fc7caf4d8df184a4bf0d6db8 | |
| parent | 1c0fd7fa2809322547d127c13f177c717bebbac4 (diff) | |
| parent | c0d08d8d4dd00d06c0ae0be3cc25ca26b557fb94 (diff) | |
| download | mullvadvpn-3c0099e0cfe1a29fb104513a9afc85b63ef987f7.tar.xz mullvadvpn-3c0099e0cfe1a29fb104513a9afc85b63ef987f7.zip | |
Merge branch 'create-event-broadcaster-trait'
| -rw-r--r-- | mullvad-daemon/src/lib.rs | 179 | ||||
| -rw-r--r-- | mullvad-daemon/src/management_interface.rs | 34 |
2 files changed, 133 insertions, 80 deletions
diff --git a/mullvad-daemon/src/lib.rs b/mullvad-daemon/src/lib.rs index bc8bfef9ae..327252ada8 100644 --- a/mullvad-daemon/src/lib.rs +++ b/mullvad-daemon/src/lib.rs @@ -21,7 +21,9 @@ mod rpc_uniqueness_check; pub mod version; pub use crate::management_interface::ManagementCommand; -use crate::management_interface::{BoxFuture, ManagementInterfaceServer}; +use crate::management_interface::{ + BoxFuture, ManagementInterfaceEventBroadcaster, ManagementInterfaceServer, +}; use futures::{ future, sync::{mpsc::UnboundedSender, oneshot}, @@ -178,7 +180,19 @@ impl DaemonCommandSender { } } -pub struct Daemon { +/// Trait representing something that can broadcast daemon events. +pub trait EventListener { + /// Notify that the tunnel state changed. + fn notify_new_state(&self, new_state: TunnelStateTransition); + + /// Notify that the settings changed. + fn notify_settings(&self, settings: Settings); + + /// Notify that the relay list changed. + fn notify_relay_list(&self, relay_list: RelayList); +} + +pub struct Daemon<L: EventListener = ManagementInterfaceEventBroadcaster> { tunnel_command_tx: SyncUnboundedSender<TunnelCommand>, tunnel_state: TunnelStateTransition, target_state: TargetState, @@ -186,7 +200,7 @@ pub struct Daemon { rx: mpsc::Receiver<InternalDaemonEvent>, tx: mpsc::Sender<InternalDaemonEvent>, reconnection_loop_tx: Option<mpsc::Sender<()>>, - management_interface_broadcaster: management_interface::EventBroadcaster, + event_listener: L, settings: Settings, account_history: account_history::AccountHistory, wg_key_proxy: WireguardKeyProxy<HttpHandle>, @@ -199,7 +213,7 @@ pub struct Daemon { version: String, } -impl Daemon { +impl Daemon<ManagementInterfaceEventBroadcaster> { pub fn start( log_dir: Option<PathBuf>, resource_dir: PathBuf, @@ -209,6 +223,87 @@ impl Daemon { if rpc_uniqueness_check::is_another_instance_running() { return Err(Error::DaemonIsAlreadyRunning); } + let (tx, rx) = mpsc::channel(); + let management_interface_broadcaster = Self::start_management_interface(tx.clone())?; + + Self::start_internal( + tx, + rx, + management_interface_broadcaster, + log_dir, + resource_dir, + cache_dir, + version, + ) + } + + // Starts the management interface and spawns a thread that will process it. + // Returns a handle that allows notifying all subscribers on events. + fn start_management_interface( + event_tx: mpsc::Sender<InternalDaemonEvent>, + ) -> Result<ManagementInterfaceEventBroadcaster> { + let multiplex_event_tx = IntoSender::from(event_tx.clone()); + let server = Self::start_management_interface_server(multiplex_event_tx)?; + let event_broadcaster = server.event_broadcaster(); + Self::spawn_management_interface_wait_thread(server, event_tx); + Ok(event_broadcaster) + } + + fn start_management_interface_server( + event_tx: IntoSender<ManagementCommand, InternalDaemonEvent>, + ) -> Result<ManagementInterfaceServer> { + let server = + ManagementInterfaceServer::start(event_tx).map_err(Error::StartManagementInterface)?; + info!("Management interface listening on {}", server.socket_path()); + + Ok(server) + } + + fn spawn_management_interface_wait_thread( + server: ManagementInterfaceServer, + exit_tx: mpsc::Sender<InternalDaemonEvent>, + ) { + thread::spawn(move || { + server.wait(); + info!("Management interface shut down"); + let _ = exit_tx.send(InternalDaemonEvent::ManagementInterfaceExited); + }); + } +} + +impl<L> Daemon<L> +where + L: EventListener + Clone + Send + 'static, +{ + pub fn start_with_event_listener( + event_listener: L, + log_dir: Option<PathBuf>, + resource_dir: PathBuf, + cache_dir: PathBuf, + version: String, + ) -> Result<Self> { + let (tx, rx) = mpsc::channel(); + + Self::start_internal( + tx, + rx, + event_listener, + log_dir, + resource_dir, + cache_dir, + version, + ) + } + + fn start_internal( + internal_event_tx: mpsc::Sender<InternalDaemonEvent>, + internal_event_rx: mpsc::Receiver<InternalDaemonEvent>, + event_listener: L, + log_dir: Option<PathBuf>, + resource_dir: PathBuf, + cache_dir: PathBuf, + version: String, + ) -> Result<Self> { let ca_path = resource_dir.join(mullvad_paths::resources::API_CA_FILENAME); let mut rpc_manager = mullvad_rpc::MullvadRpcFactory::with_cache_dir(&cache_dir, &ca_path); @@ -225,15 +320,11 @@ impl Daemon { let rpc_handle = rpc_handle.map_err(Error::InitRpcClient)?; let https_handle = https_handle.map_err(Error::InitHttpsClient)?; - let (internal_event_tx, internal_event_rx) = mpsc::channel(); - - let management_interface_broadcaster = - Self::start_management_interface(internal_event_tx.clone())?; - let relay_list_broadcaster = management_interface_broadcaster.clone(); - + let relay_list_listener = event_listener.clone(); let on_relay_list_update = move |relay_list: &RelayList| { - relay_list_broadcaster.notify_relay_list(relay_list.clone()); + relay_list_listener.notify_relay_list(relay_list.clone()); }; + let relay_selector = relays::RelaySelector::new( rpc_handle.clone(), on_relay_list_update, @@ -269,7 +360,7 @@ impl Daemon { rx: internal_event_rx, tx: internal_event_tx, reconnection_loop_tx: None, - management_interface_broadcaster, + event_listener, settings, account_history, wg_key_proxy: WireguardKeyProxy::new(rpc_handle.clone()), @@ -283,39 +374,6 @@ impl Daemon { }) } - // Starts the management interface and spawns a thread that will process it. - // Returns a handle that allows notifying all subscribers on events. - fn start_management_interface( - event_tx: mpsc::Sender<InternalDaemonEvent>, - ) -> Result<management_interface::EventBroadcaster> { - let multiplex_event_tx = IntoSender::from(event_tx.clone()); - let server = Self::start_management_interface_server(multiplex_event_tx)?; - let event_broadcaster = server.event_broadcaster(); - Self::spawn_management_interface_wait_thread(server, event_tx); - Ok(event_broadcaster) - } - - fn start_management_interface_server( - event_tx: IntoSender<ManagementCommand, InternalDaemonEvent>, - ) -> Result<ManagementInterfaceServer> { - let server = - ManagementInterfaceServer::start(event_tx).map_err(Error::StartManagementInterface)?; - info!("Management interface listening on {}", server.socket_path()); - - Ok(server) - } - - fn spawn_management_interface_wait_thread( - server: ManagementInterfaceServer, - exit_tx: mpsc::Sender<InternalDaemonEvent>, - ) { - thread::spawn(move || { - server.wait(); - info!("Management interface shut down"); - let _ = exit_tx.send(InternalDaemonEvent::ManagementInterfaceExited); - }); - } - /// Retrieve a channel for sending daemon commands. pub fn command_sender(&self) -> DaemonCommandSender { DaemonCommandSender::new(self.tx.clone()) @@ -334,7 +392,6 @@ impl Daemon { break; } } - self.management_interface_broadcaster.close(); Ok(()) } @@ -373,8 +430,7 @@ impl Daemon { } self.tunnel_state = tunnel_state.clone(); - self.management_interface_broadcaster - .notify_new_state(tunnel_state); + self.event_listener.notify_new_state(tunnel_state); } fn handle_generate_tunnel_parameters( @@ -625,8 +681,7 @@ impl Daemon { Ok(account_changed) => { Self::oneshot_send(tx, (), "set_account response"); if account_changed { - self.management_interface_broadcaster - .notify_settings(self.settings.clone()); + self.event_listener.notify_settings(self.settings.clone()); match account_token { Some(token) => { if let Err(e) = self.account_history.bump_history(&token) { @@ -693,8 +748,7 @@ impl Daemon { Ok(settings_changed) => { Self::oneshot_send(tx, (), "update_relay_settings response"); if settings_changed { - self.management_interface_broadcaster - .notify_settings(self.settings.clone()); + self.event_listener.notify_settings(self.settings.clone()); info!("Initiating tunnel restart because the relay settings changed"); self.reconnect_tunnel(); } @@ -709,8 +763,7 @@ impl Daemon { Ok(settings_changed) => { Self::oneshot_send(tx, (), "set_allow_lan response"); if settings_changed { - self.management_interface_broadcaster - .notify_settings(self.settings.clone()); + self.event_listener.notify_settings(self.settings.clone()); self.send_tunnel_command(TunnelCommand::AllowLan(allow_lan)); } } @@ -730,8 +783,7 @@ impl Daemon { Ok(settings_changed) => { Self::oneshot_send(tx, (), "set_block_when_disconnected response"); if settings_changed { - self.management_interface_broadcaster - .notify_settings(self.settings.clone()); + self.event_listener.notify_settings(self.settings.clone()); self.send_tunnel_command(TunnelCommand::BlockWhenDisconnected( block_when_disconnected, )); @@ -747,8 +799,7 @@ impl Daemon { Ok(settings_changed) => { Self::oneshot_send(tx, (), "set auto-connect response"); if settings_changed { - self.management_interface_broadcaster - .notify_settings(self.settings.clone()); + self.event_listener.notify_settings(self.settings.clone()); } } Err(e) => error!("{}", e.display_chain_with_msg("Unable to save settings")), @@ -761,8 +812,7 @@ impl Daemon { Ok(settings_changed) => { Self::oneshot_send(tx, (), "set_openvpn_mssfix response"); if settings_changed { - self.management_interface_broadcaster - .notify_settings(self.settings.clone()); + self.event_listener.notify_settings(self.settings.clone()); info!("Initiating tunnel restart because the OpenVPN mssfix setting changed"); self.reconnect_tunnel(); } @@ -786,8 +836,7 @@ impl Daemon { (Ok(proxy_changed), Ok(constraints_changed)) => { Self::oneshot_send(tx, Ok(()), "set_openvpn_proxy response"); if proxy_changed || constraints_changed { - self.management_interface_broadcaster - .notify_settings(self.settings.clone()); + self.event_listener.notify_settings(self.settings.clone()); info!("Initiating tunnel restart because the OpenVPN proxy setting changed"); self.reconnect_tunnel(); } @@ -828,8 +877,7 @@ impl Daemon { Ok(settings_changed) => { Self::oneshot_send(tx, (), "set_enable_ipv6 response"); if settings_changed { - self.management_interface_broadcaster - .notify_settings(self.settings.clone()); + self.event_listener.notify_settings(self.settings.clone()); info!("Initiating tunnel restart because the enable IPv6 setting changed"); self.reconnect_tunnel(); } @@ -844,8 +892,7 @@ impl Daemon { Ok(settings_changed) => { Self::oneshot_send(tx, (), "set_wireguard_mtu response"); if settings_changed { - self.management_interface_broadcaster - .notify_settings(self.settings.clone()); + self.event_listener.notify_settings(self.settings.clone()); info!("Initiating tunnel restart because the WireGuard MTU setting changed"); self.reconnect_tunnel(); } diff --git a/mullvad-daemon/src/management_interface.rs b/mullvad-daemon/src/management_interface.rs index c2ebf4ceb2..9d212abef8 100644 --- a/mullvad-daemon/src/management_interface.rs +++ b/mullvad-daemon/src/management_interface.rs @@ -1,3 +1,4 @@ +use crate::EventListener; use jsonrpc_core::{ futures::{ future, @@ -257,10 +258,10 @@ impl ManagementInterfaceServer { self.server.path() } - pub fn event_broadcaster(&self) -> EventBroadcaster { - EventBroadcaster { + pub fn event_broadcaster(&self) -> ManagementInterfaceEventBroadcaster { + ManagementInterfaceEventBroadcaster { subscriptions: self.subscriptions.clone(), - close_handle: self.server.close_handle(), + close_handle: Some(self.server.close_handle()), } } @@ -273,35 +274,32 @@ impl ManagementInterfaceServer { /// A handle that allows broadcasting messages to all subscribers of the management interface. #[derive(Clone)] -pub struct EventBroadcaster { +pub struct ManagementInterfaceEventBroadcaster { subscriptions: Arc<RwLock<HashMap<SubscriptionId, pubsub::Sink<DaemonEvent>>>>, - close_handle: talpid_ipc::CloseHandle, + close_handle: Option<talpid_ipc::CloseHandle>, } -impl EventBroadcaster { - /// Notifies that the management interface should be closed - pub fn close(self) { - self.close_handle.close(); - } - +impl EventListener for ManagementInterfaceEventBroadcaster { /// Sends a new state update to all `new_state` subscribers of the management interface. - pub fn notify_new_state(&self, new_state: TunnelStateTransition) { + fn notify_new_state(&self, new_state: TunnelStateTransition) { log::debug!("Broadcasting new state: {:?}", new_state); self.notify(DaemonEvent::StateTransition(new_state)); } /// Sends settings to all `settings` subscribers of the management interface. - pub fn notify_settings(&self, settings: Settings) { + fn notify_settings(&self, settings: Settings) { log::debug!("Broadcasting new settings"); self.notify(DaemonEvent::Settings(settings)); } /// Sends settings to all `settings` subscribers of the management interface. - pub fn notify_relay_list(&self, relay_list: RelayList) { + fn notify_relay_list(&self, relay_list: RelayList) { log::debug!("Broadcasting new relay list"); self.notify(DaemonEvent::RelayList(relay_list)); } +} +impl ManagementInterfaceEventBroadcaster { fn notify(&self, value: DaemonEvent) { let subscriptions = self.subscriptions.read().unwrap(); for sink in subscriptions.values() { @@ -310,6 +308,14 @@ impl EventBroadcaster { } } +impl Drop for ManagementInterfaceEventBroadcaster { + fn drop(&mut self) { + if let Some(close_handle) = self.close_handle.take() { + close_handle.close(); + } + } +} + struct ManagementInterface<T: From<ManagementCommand> + 'static + Send> { subscriptions: Arc<RwLock<HashMap<SubscriptionId, pubsub::Sink<DaemonEvent>>>>, tx: Mutex<IntoSender<ManagementCommand, T>>, |
