diff options
| author | Markus Pettersson <markus.pettersson@mullvad.net> | 2025-10-09 16:08:16 +0200 |
|---|---|---|
| committer | Markus Pettersson <markus.pettersson@mullvad.net> | 2025-10-09 16:08:16 +0200 |
| commit | e1129803344db05e2dfe0e388116350fc7a89ad6 (patch) | |
| tree | deb1c385840f6a42ec13f353f1321c6941bf6b17 | |
| parent | 8fcdcbd9ad474e8505fec8930e1ad6f2ae0238fb (diff) | |
| parent | 87b3bb87a575abc334bcbed25bb8758c5539c5c0 (diff) | |
| download | mullvadvpn-e1129803344db05e2dfe0e388116350fc7a89ad6.tar.xz mullvadvpn-e1129803344db05e2dfe0e388116350fc7a89ad6.zip | |
Merge branch 'fix-lwo-android'
| -rw-r--r-- | tunnel-obfuscation/src/lib.rs | 1 | ||||
| -rw-r--r-- | tunnel-obfuscation/src/lwo.rs | 158 |
2 files changed, 103 insertions, 56 deletions
diff --git a/tunnel-obfuscation/src/lib.rs b/tunnel-obfuscation/src/lib.rs index 4ee8f0c8a1..c8819e1e1c 100644 --- a/tunnel-obfuscation/src/lib.rs +++ b/tunnel-obfuscation/src/lib.rs @@ -53,6 +53,7 @@ pub enum Error { #[async_trait] pub trait Obfuscator: Send { + /// NOTE(Android): Make sure to call bypass on the obfuscator socket _before_ invoking run. async fn run(self: Box<Self>) -> Result<()>; /// Returns the address of the local socket. diff --git a/tunnel-obfuscation/src/lwo.rs b/tunnel-obfuscation/src/lwo.rs index 9fd044458b..12c8b59b8a 100644 --- a/tunnel-obfuscation/src/lwo.rs +++ b/tunnel-obfuscation/src/lwo.rs @@ -8,8 +8,8 @@ use std::{ use async_trait::async_trait; use rand::{RngCore, SeedableRng}; use talpid_types::net::wireguard::PublicKey; -use tokio::{io, net::UdpSocket}; -use tokio_util::sync::{CancellationToken, DropGuard}; +use tokio::{io, net::UdpSocket, task::JoinHandle}; +use tokio_util::sync::CancellationToken; use crate::{Obfuscator, socket::create_remote_socket}; @@ -45,11 +45,10 @@ pub struct Settings { } pub struct Lwo { - task: tokio::task::JoinHandle<Result<(), Error>>, + client: Client, local_endpoint: SocketAddr, #[cfg(target_os = "android")] wg_endpoint: Arc<UdpSocket>, - _drop_guard: DropGuard, } impl Lwo { @@ -73,65 +72,107 @@ impl Lwo { .map_err(Error::GetUdpLocalAddress) .map_err(crate::Error::CreateLwoObfuscator)?; - let rx_key = settings.client_public_key.clone(); - let tx_key = settings.server_public_key.clone(); + #[cfg(target_os = "android")] + let wg_endpoint = Arc::clone(&remote_socket); - let server_addr = settings.server_addr; + let client = Client { + server_addr: settings.server_addr, + rx_key: settings.client_public_key.clone(), + tx_key: settings.server_public_key.clone(), + remote_socket, + client_socket, + }; - let token = CancellationToken::new(); - let cancel_token = token.child_token(); - let _drop_guard = token.drop_guard(); + Ok(Self { + local_endpoint, + client, + #[cfg(target_os = "android")] + wg_endpoint, + }) + } - #[cfg(target_os = "android")] - let wg_endpoint = remote_socket.clone(); + async fn run_forwarding(client: Client, cancel_token: CancellationToken) -> Result<(), Error> { + let mut running = client.connect().await?; + log::trace!("LWO client is running! 🎉"); + tokio::select! { + _ = cancel_token.cancelled() => log::trace!("Stopping LWO obfuscation"), + _result = &mut running.send => log::trace!("LWO client closed (send_task)"), + _result = &mut running.recv => log::trace!("LWO client closed (recv_task)"), + }; - let task = tokio::spawn(async move { - remote_socket - .connect(server_addr) - .await - .map_err(Error::ConnectRemoteUdp)?; - log::debug!("Connected to {server_addr}"); + Ok(()) + } +} - let client_addr = client_socket - .peek_sender() - .await - .map_err(Error::GetUdpLocalAddress)?; - client_socket - .connect(client_addr) - .await - .map_err(Error::PeekUdpSender)?; - log::debug!("Client socket connected to {client_addr}"); +struct Client { + server_addr: SocketAddr, + + rx_key: PublicKey, + tx_key: PublicKey, + + remote_socket: Arc<UdpSocket>, + client_socket: Arc<UdpSocket>, +} + +/// Start an LWO client by calling [Client::run]. +struct RunningClient { + /// Egress task. + send: JoinHandle<()>, + /// Ingress task. + recv: JoinHandle<()>, +} - let rx_socket = client_socket.clone(); - let tx_socket = remote_socket.clone(); - let mut send_task = tokio::spawn(async move { - run_obfuscation(true, tx_key, rx_socket, tx_socket).await; - }); +// Auto-abort the send/recv tasks on drop. +impl Drop for RunningClient { + fn drop(&mut self) { + self.send.abort(); + self.recv.abort(); + } +} - let rx_socket = remote_socket.clone(); - let tx_socket = client_socket.clone(); - let mut recv_task = tokio::spawn(async move { - run_obfuscation(false, rx_key, rx_socket, tx_socket).await; - }); +impl Client { + /// Returns join handles to the send and receive tasks. These need to be aborted when the + /// obfuscator is aborted / finished. + async fn connect(self) -> Result<RunningClient, Error> { + let Client { + server_addr, + rx_key, + tx_key, + remote_socket, + client_socket, + } = self; - tokio::select! { - _ = cancel_token.cancelled() => log::debug!("Stopping LWO obfuscation"), - _result = &mut recv_task => log::debug!("LWO client closed (recv_task)"), - _result = &mut send_task => log::debug!("LWO client closed (send_task)"), - }; + remote_socket + .connect(server_addr) + .await + .map_err(Error::ConnectRemoteUdp)?; + log::debug!("Connected to {server_addr}"); - send_task.abort(); - recv_task.abort(); + let client_addr = client_socket + .peek_sender() + .await + .map_err(Error::GetUdpLocalAddress)?; + client_socket + .connect(client_addr) + .await + .map_err(Error::PeekUdpSender)?; + log::debug!("Client socket connected to {client_addr}"); - Ok(()) + let rx_socket = client_socket.clone(); + let tx_socket = remote_socket.clone(); + let send_task = tokio::spawn(async move { + run_obfuscation(true, tx_key, rx_socket, tx_socket).await; }); - Ok(Self { - task, - local_endpoint, - #[cfg(target_os = "android")] - wg_endpoint, - _drop_guard, + let rx_socket = remote_socket.clone(); + let tx_socket = client_socket.clone(); + let recv_task = tokio::spawn(async move { + run_obfuscation(false, rx_key, rx_socket, tx_socket).await; + }); + + Ok(RunningClient { + send: send_task, + recv: recv_task, }) } } @@ -257,11 +298,16 @@ impl Obfuscator for Lwo { } async fn run(self: Box<Self>) -> crate::Result<()> { - match self.task.await { - Ok(result) => result.map_err(crate::Error::RunLwoObfuscator), - Err(_err) if _err.is_cancelled() => Ok(()), - Err(_err) => panic!("server handle panicked"), - } + let token = CancellationToken::new(); + let child_token = token.child_token(); + // This will always cancel `child_token` as soon as `run` is finished or aborted. + let _drop_guard = token.drop_guard(); + + let client = self.client; + tokio::spawn(Lwo::run_forwarding(client, child_token)) + .await + .unwrap() + .map_err(crate::Error::RunLwoObfuscator) } fn packet_overhead(&self) -> u16 { |
