summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorJanito Vaqueiro Ferreira Filho <janito@mullvad.net>2020-02-21 17:42:10 +0000
committerJanito Vaqueiro Ferreira Filho <janito@mullvad.net>2020-02-24 12:45:10 +0000
commita71c3dd858f85df41437d028f3f29580b7c5149f (patch)
tree1e32164cadb95edc4544d5e7ad735c01011f035f
parent5143a0f6324e388128930b70382b05f9d4026bcb (diff)
downloadmullvadvpn-a71c3dd858f85df41437d028f3f29580b7c5149f.tar.xz
mullvadvpn-a71c3dd858f85df41437d028f3f29580b7c5149f.zip
Use `DaemonEventSender` inside `Daemon`
-rw-r--r--mullvad-daemon/src/lib.rs51
1 files changed, 27 insertions, 24 deletions
diff --git a/mullvad-daemon/src/lib.rs b/mullvad-daemon/src/lib.rs
index b27030a645..b40264f9a8 100644
--- a/mullvad-daemon/src/lib.rs
+++ b/mullvad-daemon/src/lib.rs
@@ -397,7 +397,7 @@ pub struct Daemon<L: EventListener = ManagementInterfaceEventBroadcaster> {
target_state: TargetState,
state: DaemonExecutionState,
rx: Wait<UnboundedReceiver<InternalDaemonEvent>>,
- tx: UnboundedSender<InternalDaemonEvent>,
+ tx: DaemonEventSender,
reconnection_loop_tx: Option<mpsc::Sender<()>>,
event_listener: L,
settings: Settings,
@@ -426,10 +426,13 @@ impl Daemon<ManagementInterfaceEventBroadcaster> {
return Err(Error::DaemonIsAlreadyRunning);
}
let (tx, rx) = futures::sync::mpsc::unbounded();
- let management_interface_broadcaster = Self::start_management_interface(tx.clone())?;
+ let command_sender = DaemonCommandSender::new(tx.clone());
+ let event_sender = DaemonEventSender::new(tx);
+ let management_interface_broadcaster =
+ Self::start_management_interface(command_sender, event_sender.clone())?;
Self::start_internal(
- tx,
+ event_sender,
rx,
management_interface_broadcaster,
log_dir,
@@ -443,12 +446,12 @@ impl Daemon<ManagementInterfaceEventBroadcaster> {
// Starts the management interface and spawns a thread that will process it.
// Returns a handle that allows notifying all subscribers on events.
fn start_management_interface(
- event_tx: UnboundedSender<InternalDaemonEvent>,
+ command_sender: DaemonCommandSender,
+ event_sender: DaemonEventSender,
) -> Result<ManagementInterfaceEventBroadcaster, Error> {
- let command_sender = DaemonCommandSender::new(event_tx.clone());
let server = Self::start_management_interface_server(command_sender)?;
let event_broadcaster = server.event_broadcaster();
- Self::spawn_management_interface_wait_thread(server, event_tx);
+ Self::spawn_management_interface_wait_thread(server, event_sender);
Ok(event_broadcaster)
}
@@ -464,12 +467,12 @@ impl Daemon<ManagementInterfaceEventBroadcaster> {
fn spawn_management_interface_wait_thread(
server: ManagementInterfaceServer,
- exit_tx: UnboundedSender<InternalDaemonEvent>,
+ exit_tx: DaemonEventSender,
) {
thread::spawn(move || {
server.wait();
info!("Management interface shut down");
- let _ = exit_tx.unbounded_send(InternalDaemonEvent::ManagementInterfaceExited);
+ let _ = exit_tx.send(InternalDaemonEvent::ManagementInterfaceExited);
});
}
}
@@ -487,9 +490,10 @@ where
) -> Result<(Self, DaemonCommandSender), Error> {
let (tx, rx) = futures::sync::mpsc::unbounded();
let command_sender = DaemonCommandSender::new(tx.clone());
+ let event_sender = DaemonEventSender::new(tx);
let daemon = Self::start_internal(
- tx,
+ event_sender,
rx,
event_listener,
log_dir,
@@ -503,7 +507,7 @@ where
}
fn start_internal(
- internal_event_tx: UnboundedSender<InternalDaemonEvent>,
+ internal_event_tx: DaemonEventSender,
internal_event_rx: UnboundedReceiver<InternalDaemonEvent>,
event_listener: L,
log_dir: Option<PathBuf>,
@@ -543,7 +547,7 @@ where
let version_check_future = version_check::VersionUpdater::new(
rpc_handle.clone(),
cache_dir.clone(),
- DaemonEventSender::new(internal_event_tx.clone()).to_specialized_sender(),
+ internal_event_tx.to_specialized_sender(),
app_version_info.clone(),
);
tokio_remote.spawn(|_| version_check_future);
@@ -567,14 +571,14 @@ where
log_dir,
resource_dir,
cache_dir,
- IntoSender::from(internal_event_tx.clone()),
+ internal_event_tx.to_specialized_sender(),
#[cfg(target_os = "android")]
android_context,
)
.map_err(Error::TunnelError)?;
let wireguard_key_manager = wireguard::KeyManager::new(
- DaemonEventSender::new(internal_event_tx.clone()),
+ internal_event_tx.clone(),
rpc_handle.clone(),
tokio_remote.clone(),
);
@@ -896,7 +900,7 @@ where
}
fn schedule_reconnect(&mut self, delay: Duration) {
- let tunnel_command_tx = self.tx.clone();
+ let tunnel_command_tx = self.tx.to_specialized_sender();
let (tx, rx) = mpsc::channel();
self.reconnection_loop_tx = Some(tx);
@@ -906,8 +910,9 @@ where
if let Err(mpsc::RecvTimeoutError::Timeout) = rx.recv_timeout(delay) {
debug!("Attempting to reconnect");
- let _ = tunnel_command_tx.unbounded_send(InternalDaemonEvent::Command(
- DaemonCommand::SetTargetState(result_tx, TargetState::Secured),
+ let _ = tunnel_command_tx.send(DaemonCommand::SetTargetState(
+ result_tx,
+ TargetState::Secured,
));
}
});
@@ -1147,10 +1152,8 @@ where
.then(move |result| -> Result<(), ()> {
match result {
Ok(account_token) => {
- let _ = daemon_tx.unbounded_send(InternalDaemonEvent::NewAccountEvent(
- account_token,
- tx,
- ));
+ let _ =
+ daemon_tx.send(InternalDaemonEvent::NewAccountEvent(account_token, tx));
}
Err(err) => {
let _ = tx.send(Err(err));
@@ -1780,17 +1783,17 @@ where
}
pub struct DaemonShutdownHandle {
- tx: UnboundedSender<InternalDaemonEvent>,
+ tx: DaemonEventSender,
}
impl DaemonShutdownHandle {
pub fn shutdown(&self) {
- let _ = self.tx.unbounded_send(InternalDaemonEvent::TriggerShutdown);
+ let _ = self.tx.send(InternalDaemonEvent::TriggerShutdown);
}
}
struct MullvadTunnelParametersGenerator {
- tx: UnboundedSender<InternalDaemonEvent>,
+ tx: DaemonEventSender,
}
impl TunnelParametersGenerator for MullvadTunnelParametersGenerator {
@@ -1801,7 +1804,7 @@ impl TunnelParametersGenerator for MullvadTunnelParametersGenerator {
let (response_tx, response_rx) = mpsc::channel();
if self
.tx
- .unbounded_send(InternalDaemonEvent::GenerateTunnelParameters(
+ .send(InternalDaemonEvent::GenerateTunnelParameters(
response_tx,
retry_attempt,
))