summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--tunnel-obfuscation/src/lib.rs1
-rw-r--r--tunnel-obfuscation/src/lwo.rs158
2 files changed, 103 insertions, 56 deletions
diff --git a/tunnel-obfuscation/src/lib.rs b/tunnel-obfuscation/src/lib.rs
index 4ee8f0c8a1..c8819e1e1c 100644
--- a/tunnel-obfuscation/src/lib.rs
+++ b/tunnel-obfuscation/src/lib.rs
@@ -53,6 +53,7 @@ pub enum Error {
#[async_trait]
pub trait Obfuscator: Send {
+ /// NOTE(Android): Make sure to call bypass on the obfuscator socket _before_ invoking run.
async fn run(self: Box<Self>) -> Result<()>;
/// Returns the address of the local socket.
diff --git a/tunnel-obfuscation/src/lwo.rs b/tunnel-obfuscation/src/lwo.rs
index 9fd044458b..12c8b59b8a 100644
--- a/tunnel-obfuscation/src/lwo.rs
+++ b/tunnel-obfuscation/src/lwo.rs
@@ -8,8 +8,8 @@ use std::{
use async_trait::async_trait;
use rand::{RngCore, SeedableRng};
use talpid_types::net::wireguard::PublicKey;
-use tokio::{io, net::UdpSocket};
-use tokio_util::sync::{CancellationToken, DropGuard};
+use tokio::{io, net::UdpSocket, task::JoinHandle};
+use tokio_util::sync::CancellationToken;
use crate::{Obfuscator, socket::create_remote_socket};
@@ -45,11 +45,10 @@ pub struct Settings {
}
pub struct Lwo {
- task: tokio::task::JoinHandle<Result<(), Error>>,
+ client: Client,
local_endpoint: SocketAddr,
#[cfg(target_os = "android")]
wg_endpoint: Arc<UdpSocket>,
- _drop_guard: DropGuard,
}
impl Lwo {
@@ -73,65 +72,107 @@ impl Lwo {
.map_err(Error::GetUdpLocalAddress)
.map_err(crate::Error::CreateLwoObfuscator)?;
- let rx_key = settings.client_public_key.clone();
- let tx_key = settings.server_public_key.clone();
+ #[cfg(target_os = "android")]
+ let wg_endpoint = Arc::clone(&remote_socket);
- let server_addr = settings.server_addr;
+ let client = Client {
+ server_addr: settings.server_addr,
+ rx_key: settings.client_public_key.clone(),
+ tx_key: settings.server_public_key.clone(),
+ remote_socket,
+ client_socket,
+ };
- let token = CancellationToken::new();
- let cancel_token = token.child_token();
- let _drop_guard = token.drop_guard();
+ Ok(Self {
+ local_endpoint,
+ client,
+ #[cfg(target_os = "android")]
+ wg_endpoint,
+ })
+ }
- #[cfg(target_os = "android")]
- let wg_endpoint = remote_socket.clone();
+ async fn run_forwarding(client: Client, cancel_token: CancellationToken) -> Result<(), Error> {
+ let mut running = client.connect().await?;
+ log::trace!("LWO client is running! 🎉");
+ tokio::select! {
+ _ = cancel_token.cancelled() => log::trace!("Stopping LWO obfuscation"),
+ _result = &mut running.send => log::trace!("LWO client closed (send_task)"),
+ _result = &mut running.recv => log::trace!("LWO client closed (recv_task)"),
+ };
- let task = tokio::spawn(async move {
- remote_socket
- .connect(server_addr)
- .await
- .map_err(Error::ConnectRemoteUdp)?;
- log::debug!("Connected to {server_addr}");
+ Ok(())
+ }
+}
- let client_addr = client_socket
- .peek_sender()
- .await
- .map_err(Error::GetUdpLocalAddress)?;
- client_socket
- .connect(client_addr)
- .await
- .map_err(Error::PeekUdpSender)?;
- log::debug!("Client socket connected to {client_addr}");
+struct Client {
+ server_addr: SocketAddr,
+
+ rx_key: PublicKey,
+ tx_key: PublicKey,
+
+ remote_socket: Arc<UdpSocket>,
+ client_socket: Arc<UdpSocket>,
+}
+
+/// Start an LWO client by calling [Client::run].
+struct RunningClient {
+ /// Egress task.
+ send: JoinHandle<()>,
+ /// Ingress task.
+ recv: JoinHandle<()>,
+}
- let rx_socket = client_socket.clone();
- let tx_socket = remote_socket.clone();
- let mut send_task = tokio::spawn(async move {
- run_obfuscation(true, tx_key, rx_socket, tx_socket).await;
- });
+// Auto-abort the send/recv tasks on drop.
+impl Drop for RunningClient {
+ fn drop(&mut self) {
+ self.send.abort();
+ self.recv.abort();
+ }
+}
- let rx_socket = remote_socket.clone();
- let tx_socket = client_socket.clone();
- let mut recv_task = tokio::spawn(async move {
- run_obfuscation(false, rx_key, rx_socket, tx_socket).await;
- });
+impl Client {
+ /// Returns join handles to the send and receive tasks. These need to be aborted when the
+ /// obfuscator is aborted / finished.
+ async fn connect(self) -> Result<RunningClient, Error> {
+ let Client {
+ server_addr,
+ rx_key,
+ tx_key,
+ remote_socket,
+ client_socket,
+ } = self;
- tokio::select! {
- _ = cancel_token.cancelled() => log::debug!("Stopping LWO obfuscation"),
- _result = &mut recv_task => log::debug!("LWO client closed (recv_task)"),
- _result = &mut send_task => log::debug!("LWO client closed (send_task)"),
- };
+ remote_socket
+ .connect(server_addr)
+ .await
+ .map_err(Error::ConnectRemoteUdp)?;
+ log::debug!("Connected to {server_addr}");
- send_task.abort();
- recv_task.abort();
+ let client_addr = client_socket
+ .peek_sender()
+ .await
+ .map_err(Error::GetUdpLocalAddress)?;
+ client_socket
+ .connect(client_addr)
+ .await
+ .map_err(Error::PeekUdpSender)?;
+ log::debug!("Client socket connected to {client_addr}");
- Ok(())
+ let rx_socket = client_socket.clone();
+ let tx_socket = remote_socket.clone();
+ let send_task = tokio::spawn(async move {
+ run_obfuscation(true, tx_key, rx_socket, tx_socket).await;
});
- Ok(Self {
- task,
- local_endpoint,
- #[cfg(target_os = "android")]
- wg_endpoint,
- _drop_guard,
+ let rx_socket = remote_socket.clone();
+ let tx_socket = client_socket.clone();
+ let recv_task = tokio::spawn(async move {
+ run_obfuscation(false, rx_key, rx_socket, tx_socket).await;
+ });
+
+ Ok(RunningClient {
+ send: send_task,
+ recv: recv_task,
})
}
}
@@ -257,11 +298,16 @@ impl Obfuscator for Lwo {
}
async fn run(self: Box<Self>) -> crate::Result<()> {
- match self.task.await {
- Ok(result) => result.map_err(crate::Error::RunLwoObfuscator),
- Err(_err) if _err.is_cancelled() => Ok(()),
- Err(_err) => panic!("server handle panicked"),
- }
+ let token = CancellationToken::new();
+ let child_token = token.child_token();
+ // This will always cancel `child_token` as soon as `run` is finished or aborted.
+ let _drop_guard = token.drop_guard();
+
+ let client = self.client;
+ tokio::spawn(Lwo::run_forwarding(client, child_token))
+ .await
+ .unwrap()
+ .map_err(crate::Error::RunLwoObfuscator)
}
fn packet_overhead(&self) -> u16 {