diff options
| author | Janito Vaqueiro Ferreira Filho <janito@mullvad.net> | 2020-02-21 17:42:10 +0000 |
|---|---|---|
| committer | Janito Vaqueiro Ferreira Filho <janito@mullvad.net> | 2020-02-24 12:45:10 +0000 |
| commit | a71c3dd858f85df41437d028f3f29580b7c5149f (patch) | |
| tree | 1e32164cadb95edc4544d5e7ad735c01011f035f | |
| parent | 5143a0f6324e388128930b70382b05f9d4026bcb (diff) | |
| download | mullvadvpn-a71c3dd858f85df41437d028f3f29580b7c5149f.tar.xz mullvadvpn-a71c3dd858f85df41437d028f3f29580b7c5149f.zip | |
Use `DaemonEventSender` inside `Daemon`
| -rw-r--r-- | mullvad-daemon/src/lib.rs | 51 |
1 files changed, 27 insertions, 24 deletions
diff --git a/mullvad-daemon/src/lib.rs b/mullvad-daemon/src/lib.rs index b27030a645..b40264f9a8 100644 --- a/mullvad-daemon/src/lib.rs +++ b/mullvad-daemon/src/lib.rs @@ -397,7 +397,7 @@ pub struct Daemon<L: EventListener = ManagementInterfaceEventBroadcaster> { target_state: TargetState, state: DaemonExecutionState, rx: Wait<UnboundedReceiver<InternalDaemonEvent>>, - tx: UnboundedSender<InternalDaemonEvent>, + tx: DaemonEventSender, reconnection_loop_tx: Option<mpsc::Sender<()>>, event_listener: L, settings: Settings, @@ -426,10 +426,13 @@ impl Daemon<ManagementInterfaceEventBroadcaster> { return Err(Error::DaemonIsAlreadyRunning); } let (tx, rx) = futures::sync::mpsc::unbounded(); - let management_interface_broadcaster = Self::start_management_interface(tx.clone())?; + let command_sender = DaemonCommandSender::new(tx.clone()); + let event_sender = DaemonEventSender::new(tx); + let management_interface_broadcaster = + Self::start_management_interface(command_sender, event_sender.clone())?; Self::start_internal( - tx, + event_sender, rx, management_interface_broadcaster, log_dir, @@ -443,12 +446,12 @@ impl Daemon<ManagementInterfaceEventBroadcaster> { // Starts the management interface and spawns a thread that will process it. // Returns a handle that allows notifying all subscribers on events. fn start_management_interface( - event_tx: UnboundedSender<InternalDaemonEvent>, + command_sender: DaemonCommandSender, + event_sender: DaemonEventSender, ) -> Result<ManagementInterfaceEventBroadcaster, Error> { - let command_sender = DaemonCommandSender::new(event_tx.clone()); let server = Self::start_management_interface_server(command_sender)?; let event_broadcaster = server.event_broadcaster(); - Self::spawn_management_interface_wait_thread(server, event_tx); + Self::spawn_management_interface_wait_thread(server, event_sender); Ok(event_broadcaster) } @@ -464,12 +467,12 @@ impl Daemon<ManagementInterfaceEventBroadcaster> { fn spawn_management_interface_wait_thread( server: ManagementInterfaceServer, - exit_tx: UnboundedSender<InternalDaemonEvent>, + exit_tx: DaemonEventSender, ) { thread::spawn(move || { server.wait(); info!("Management interface shut down"); - let _ = exit_tx.unbounded_send(InternalDaemonEvent::ManagementInterfaceExited); + let _ = exit_tx.send(InternalDaemonEvent::ManagementInterfaceExited); }); } } @@ -487,9 +490,10 @@ where ) -> Result<(Self, DaemonCommandSender), Error> { let (tx, rx) = futures::sync::mpsc::unbounded(); let command_sender = DaemonCommandSender::new(tx.clone()); + let event_sender = DaemonEventSender::new(tx); let daemon = Self::start_internal( - tx, + event_sender, rx, event_listener, log_dir, @@ -503,7 +507,7 @@ where } fn start_internal( - internal_event_tx: UnboundedSender<InternalDaemonEvent>, + internal_event_tx: DaemonEventSender, internal_event_rx: UnboundedReceiver<InternalDaemonEvent>, event_listener: L, log_dir: Option<PathBuf>, @@ -543,7 +547,7 @@ where let version_check_future = version_check::VersionUpdater::new( rpc_handle.clone(), cache_dir.clone(), - DaemonEventSender::new(internal_event_tx.clone()).to_specialized_sender(), + internal_event_tx.to_specialized_sender(), app_version_info.clone(), ); tokio_remote.spawn(|_| version_check_future); @@ -567,14 +571,14 @@ where log_dir, resource_dir, cache_dir, - IntoSender::from(internal_event_tx.clone()), + internal_event_tx.to_specialized_sender(), #[cfg(target_os = "android")] android_context, ) .map_err(Error::TunnelError)?; let wireguard_key_manager = wireguard::KeyManager::new( - DaemonEventSender::new(internal_event_tx.clone()), + internal_event_tx.clone(), rpc_handle.clone(), tokio_remote.clone(), ); @@ -896,7 +900,7 @@ where } fn schedule_reconnect(&mut self, delay: Duration) { - let tunnel_command_tx = self.tx.clone(); + let tunnel_command_tx = self.tx.to_specialized_sender(); let (tx, rx) = mpsc::channel(); self.reconnection_loop_tx = Some(tx); @@ -906,8 +910,9 @@ where if let Err(mpsc::RecvTimeoutError::Timeout) = rx.recv_timeout(delay) { debug!("Attempting to reconnect"); - let _ = tunnel_command_tx.unbounded_send(InternalDaemonEvent::Command( - DaemonCommand::SetTargetState(result_tx, TargetState::Secured), + let _ = tunnel_command_tx.send(DaemonCommand::SetTargetState( + result_tx, + TargetState::Secured, )); } }); @@ -1147,10 +1152,8 @@ where .then(move |result| -> Result<(), ()> { match result { Ok(account_token) => { - let _ = daemon_tx.unbounded_send(InternalDaemonEvent::NewAccountEvent( - account_token, - tx, - )); + let _ = + daemon_tx.send(InternalDaemonEvent::NewAccountEvent(account_token, tx)); } Err(err) => { let _ = tx.send(Err(err)); @@ -1780,17 +1783,17 @@ where } pub struct DaemonShutdownHandle { - tx: UnboundedSender<InternalDaemonEvent>, + tx: DaemonEventSender, } impl DaemonShutdownHandle { pub fn shutdown(&self) { - let _ = self.tx.unbounded_send(InternalDaemonEvent::TriggerShutdown); + let _ = self.tx.send(InternalDaemonEvent::TriggerShutdown); } } struct MullvadTunnelParametersGenerator { - tx: UnboundedSender<InternalDaemonEvent>, + tx: DaemonEventSender, } impl TunnelParametersGenerator for MullvadTunnelParametersGenerator { @@ -1801,7 +1804,7 @@ impl TunnelParametersGenerator for MullvadTunnelParametersGenerator { let (response_tx, response_rx) = mpsc::channel(); if self .tx - .unbounded_send(InternalDaemonEvent::GenerateTunnelParameters( + .send(InternalDaemonEvent::GenerateTunnelParameters( response_tx, retry_attempt, )) |
