diff options
| author | Emīls <emils@mullvad.net> | 2021-01-20 15:46:20 +0000 |
|---|---|---|
| committer | Emīls <emils@mullvad.net> | 2021-01-21 13:22:24 +0000 |
| commit | f9bb73db4fec61fc339d2d1edce7f17f689a532e (patch) | |
| tree | 96b46e20f378c96fefe41d1178edb50e68f79ad4 | |
| parent | 4f5df3a06de750dd6ad35cdebbfa21dc46fd6bf7 (diff) | |
| download | mullvadvpn-f9bb73db4fec61fc339d2d1edce7f17f689a532e.tar.xz mullvadvpn-f9bb73db4fec61fc339d2d1edce7f17f689a532e.zip | |
Send API address to TSM directly
| -rw-r--r-- | mullvad-daemon/src/lib.rs | 45 |
1 files changed, 24 insertions, 21 deletions
diff --git a/mullvad-daemon/src/lib.rs b/mullvad-daemon/src/lib.rs index 93c02fff69..0eb825315f 100644 --- a/mullvad-daemon/src/lib.rs +++ b/mullvad-daemon/src/lib.rs @@ -22,7 +22,7 @@ mod version_check; use futures::{ channel::{mpsc, oneshot}, future::{abortable, AbortHandle, Future}, - StreamExt, + SinkExt, StreamExt, }; use log::{debug, error, info, warn}; use mullvad_rpc::AccountsProxy; @@ -48,7 +48,7 @@ use std::{ io, marker::PhantomData, mem, - net::{IpAddr, SocketAddr}, + net::IpAddr, path::PathBuf, sync::{mpsc as sync_mpsc, Arc, Weak}, time::Duration, @@ -260,8 +260,6 @@ pub(crate) enum InternalDaemonEvent { ), /// The background job fetching new `AppVersionInfo`s got a new info object. NewAppVersionInfo(AppVersionInfo), - /// A new API endpoint is being used - NewApiAddress(SocketAddr, oneshot::Sender<()>), } impl From<TunnelStateTransition> for InternalDaemonEvent { @@ -493,7 +491,8 @@ where oneshot::channel(); let (internal_event_tx, internal_event_rx) = command_channel.destructure(); - let address_change_tx = std::sync::Mutex::new(internal_event_tx.clone()); + let (address_change_tx, mut address_change_rx) = mpsc::channel(0); + let address_change_tx = std::sync::Mutex::new(address_change_tx); let address_change_runtime = tokio::runtime::Handle::current(); let mut rpc_runtime = mullvad_rpc::MullvadRpcRuntime::with_cache( @@ -504,16 +503,15 @@ where move |address| { let (result_tx, result_rx) = oneshot::channel(); - let tx = address_change_tx.lock().unwrap(); - if tx - .send(InternalDaemonEvent::NewApiAddress(address, result_tx)) - .is_err() - { - log::error!("Failed to send API address daemon event"); - return Err(()); - } - - address_change_runtime.block_on(result_rx).map_err(|_| ()) + let mut tx = address_change_tx.lock().unwrap().clone(); + address_change_runtime.block_on(async move { + let tunnel_command = TunnelCommand::AllowEndpoint( + Endpoint::from_socket_address(address, TransportProtocol::Tcp), + result_tx, + ); + let _ = tx.send(tunnel_command).await; + result_rx.await.map_err(|_| ()) + }) }, ) .await @@ -618,6 +616,17 @@ where .await .map_err(Error::TunnelError)?; + let tsm_api_address_change_tx = Arc::downgrade(&tunnel_command_tx); + tokio::spawn(async move { + while let Some(address_change) = address_change_rx.next().await { + if let Some(tx) = tsm_api_address_change_tx.upgrade() { + let _ = tx.unbounded_send(address_change); + } else { + return; + } + } + }); + let wireguard_key_manager = wireguard::KeyManager::new(internal_event_tx.clone(), rpc_handle.clone()); @@ -774,12 +783,6 @@ where NewAppVersionInfo(app_version_info) => { self.handle_new_app_version_info(app_version_info) } - NewApiAddress(address, tx) => { - self.send_tunnel_command(TunnelCommand::AllowEndpoint( - Endpoint::from_socket_address(address, TransportProtocol::Tcp), - tx, - )); - } } } |
