summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--mullvad-daemon/src/lib.rs45
1 files changed, 24 insertions, 21 deletions
diff --git a/mullvad-daemon/src/lib.rs b/mullvad-daemon/src/lib.rs
index 93c02fff69..0eb825315f 100644
--- a/mullvad-daemon/src/lib.rs
+++ b/mullvad-daemon/src/lib.rs
@@ -22,7 +22,7 @@ mod version_check;
use futures::{
channel::{mpsc, oneshot},
future::{abortable, AbortHandle, Future},
- StreamExt,
+ SinkExt, StreamExt,
};
use log::{debug, error, info, warn};
use mullvad_rpc::AccountsProxy;
@@ -48,7 +48,7 @@ use std::{
io,
marker::PhantomData,
mem,
- net::{IpAddr, SocketAddr},
+ net::IpAddr,
path::PathBuf,
sync::{mpsc as sync_mpsc, Arc, Weak},
time::Duration,
@@ -260,8 +260,6 @@ pub(crate) enum InternalDaemonEvent {
),
/// The background job fetching new `AppVersionInfo`s got a new info object.
NewAppVersionInfo(AppVersionInfo),
- /// A new API endpoint is being used
- NewApiAddress(SocketAddr, oneshot::Sender<()>),
}
impl From<TunnelStateTransition> for InternalDaemonEvent {
@@ -493,7 +491,8 @@ where
oneshot::channel();
let (internal_event_tx, internal_event_rx) = command_channel.destructure();
- let address_change_tx = std::sync::Mutex::new(internal_event_tx.clone());
+ let (address_change_tx, mut address_change_rx) = mpsc::channel(0);
+ let address_change_tx = std::sync::Mutex::new(address_change_tx);
let address_change_runtime = tokio::runtime::Handle::current();
let mut rpc_runtime = mullvad_rpc::MullvadRpcRuntime::with_cache(
@@ -504,16 +503,15 @@ where
move |address| {
let (result_tx, result_rx) = oneshot::channel();
- let tx = address_change_tx.lock().unwrap();
- if tx
- .send(InternalDaemonEvent::NewApiAddress(address, result_tx))
- .is_err()
- {
- log::error!("Failed to send API address daemon event");
- return Err(());
- }
-
- address_change_runtime.block_on(result_rx).map_err(|_| ())
+ let mut tx = address_change_tx.lock().unwrap().clone();
+ address_change_runtime.block_on(async move {
+ let tunnel_command = TunnelCommand::AllowEndpoint(
+ Endpoint::from_socket_address(address, TransportProtocol::Tcp),
+ result_tx,
+ );
+ let _ = tx.send(tunnel_command).await;
+ result_rx.await.map_err(|_| ())
+ })
},
)
.await
@@ -618,6 +616,17 @@ where
.await
.map_err(Error::TunnelError)?;
+ let tsm_api_address_change_tx = Arc::downgrade(&tunnel_command_tx);
+ tokio::spawn(async move {
+ while let Some(address_change) = address_change_rx.next().await {
+ if let Some(tx) = tsm_api_address_change_tx.upgrade() {
+ let _ = tx.unbounded_send(address_change);
+ } else {
+ return;
+ }
+ }
+ });
+
let wireguard_key_manager =
wireguard::KeyManager::new(internal_event_tx.clone(), rpc_handle.clone());
@@ -774,12 +783,6 @@ where
NewAppVersionInfo(app_version_info) => {
self.handle_new_app_version_info(app_version_info)
}
- NewApiAddress(address, tx) => {
- self.send_tunnel_command(TunnelCommand::AllowEndpoint(
- Endpoint::from_socket_address(address, TransportProtocol::Tcp),
- tx,
- ));
- }
}
}