summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorDavid Lönnhager <david.l@mullvad.net>2025-09-18 11:39:17 +0200
committerDavid Lönnhager <david.l@mullvad.net>2025-09-18 17:20:44 +0200
commit72ec0d4db8fc0bd9fc468c06dd102c15e9cba501 (patch)
tree4e46bc3322fe413d253bdeb7013105d9ad975df5
parent9ddc3c1ddf699427228c11474f36f946406b5aa9 (diff)
downloadmullvadvpn-72ec0d4db8fc0bd9fc468c06dd102c15e9cba501.tar.xz
mullvadvpn-72ec0d4db8fc0bd9fc468c06dd102c15e9cba501.zip
Add documentation for multiplexer obfuscation
-rw-r--r--Cargo.lock1
-rw-r--r--mullvad-relay-selector/src/relay_selector/helpers.rs13
-rw-r--r--talpid-types/src/net/obfuscation.rs29
-rw-r--r--tunnel-obfuscation/Cargo.toml1
-rw-r--r--tunnel-obfuscation/src/multiplexer.rs242
5 files changed, 184 insertions, 102 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 42fa93f816..55b56b6873 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -6416,6 +6416,7 @@ version = "0.0.0"
dependencies = [
"async-trait",
"criterion",
+ "futures",
"log",
"mullvad-masque-proxy",
"nix 0.30.1",
diff --git a/mullvad-relay-selector/src/relay_selector/helpers.rs b/mullvad-relay-selector/src/relay_selector/helpers.rs
index ce527e7b0a..f80942d0f5 100644
--- a/mullvad-relay-selector/src/relay_selector/helpers.rs
+++ b/mullvad-relay-selector/src/relay_selector/helpers.rs
@@ -85,6 +85,13 @@ pub fn pick_random_relay_weighted<'a, RelayType>(
}
}
+/// Create a multiplexer obfuscator config
+///
+/// # Arguments
+/// * `udp2tcp_ports` - Available ports for UDP2TCP obfuscation
+/// * `shadowsocks_ports` - Available port ranges for Shadowsocks obfuscation
+/// * `obfuscator_relay` - The relay that will host the obfuscation services
+/// * `endpoint` - Selected endpoint
#[cfg(feature = "staggered-obfuscation")]
pub fn get_multiplexer_obfuscator(
udp2tcp_ports: &[u16],
@@ -261,8 +268,8 @@ fn get_shadowsocks_obfuscator_inner<R: RangeBounds<u16> + Iterator<Item = u16> +
}
/// Return `desired_port` if it is specified and included in `port_ranges`.
-/// If `desired_port` isn't specified, a random port from the ranges is returned.
-/// If `desired_port` is specified but not in range, an error is returned.
+/// If `desired_port` isn't specified, return a random port from the ranges.
+/// If `desired_port` is specified but not in range, return an error.
pub fn desired_or_random_port_from_range<R: RangeBounds<u16> + Iterator<Item = u16> + Clone>(
port_ranges: &[R],
desired_port: Constraint<u16>,
@@ -286,7 +293,7 @@ fn port_if_in_range<R: RangeBounds<u16>>(port_ranges: &[R], port: u16) -> Result
.ok_or(Error::NoMatchingPort)
}
-/// Selects a random port number from a list of provided port ranges.
+/// Select a random port number from a list of provided port ranges.
///
/// # Parameters
/// - `port_ranges`: A slice of port numbers.
diff --git a/talpid-types/src/net/obfuscation.rs b/talpid-types/src/net/obfuscation.rs
index 6c14a6e59c..26e095844b 100644
--- a/talpid-types/src/net/obfuscation.rs
+++ b/talpid-types/src/net/obfuscation.rs
@@ -3,11 +3,19 @@ use std::net::SocketAddr;
use super::{Endpoint, TransportProtocol};
+/// Available obfuscation configuration types.
#[derive(Clone, Eq, PartialEq, Deserialize, Serialize, Debug)]
pub enum Obfuscators {
+ /// A single obfuscation method
Single(ObfuscatorConfig),
+ /// Try multiple obfuscation methods (using `multiplexer` obfuscation).
+ ///
+ /// They are tested in the following order: `direct`, `config.0`, then
+ /// the remaining configs in `configs.1` in order.
Multiplexer {
+ /// Optional direct connection (no obfuscation) to try along with `configs`.
direct: Option<SocketAddr>,
+ /// Obfuscation configurations to try.
configs: (ObfuscatorConfig, Vec<ObfuscatorConfig>),
},
}
@@ -31,8 +39,17 @@ pub enum ObfuscatorConfig {
}
impl Obfuscators {
- /// Return a [Obfuscators::Multiplexer]. If `obfuscators` contains zero values,
- /// this returns `None`.
+ /// Create a multiplexer obfuscator configuration.
+ ///
+ /// See [Obfuscators::Multiplexer] for more details.
+ ///
+ /// # Arguments
+ /// * `direct` - Optional direct connection endpoint (no obfuscation)
+ /// * `obfuscators` - List of obfuscation methods to try, at least one.
+ ///
+ /// # Returns
+ /// * `Some(Obfuscators::Multiplexer)` if at least one obfuscation method is provided
+ /// * `None` if the obfuscators list is empty
pub fn multiplexer(
direct: Option<SocketAddr>,
obfuscators: &[ObfuscatorConfig],
@@ -46,7 +63,11 @@ impl Obfuscators {
})
}
- /// Return all potential obfuscation endpoints
+ /// Return all potential endpoints that this obfuscation configuration might connect to.
+ ///
+ /// For single obfuscators, return one endpoint. For `Obfuscators::Multiplexer`, return
+ /// all possible endpoints (direct + all obfuscated methods) that the multiplexer
+ /// might use, with duplicates removed.
pub fn endpoints(&self) -> Vec<Endpoint> {
match self {
Obfuscators::Single(config) => vec![config.endpoint()],
@@ -74,7 +95,7 @@ impl Obfuscators {
}
impl ObfuscatorConfig {
- /// Return obfuscation endpoint
+ /// Return obfuscation endpoint, i.e. the first remote hop that will be connected to
pub fn endpoint(&self) -> Endpoint {
match self {
ObfuscatorConfig::Udp2Tcp { endpoint } => Endpoint {
diff --git a/tunnel-obfuscation/Cargo.toml b/tunnel-obfuscation/Cargo.toml
index e84486e1cf..8af17fe898 100644
--- a/tunnel-obfuscation/Cargo.toml
+++ b/tunnel-obfuscation/Cargo.toml
@@ -11,6 +11,7 @@ rust-version.workspace = true
workspace = true
[dependencies]
+futures = { workspace = true }
log = { workspace = true }
async-trait = "0.1"
thiserror = { workspace = true }
diff --git a/tunnel-obfuscation/src/multiplexer.rs b/tunnel-obfuscation/src/multiplexer.rs
index 5ae303189b..e7a70fa059 100644
--- a/tunnel-obfuscation/src/multiplexer.rs
+++ b/tunnel-obfuscation/src/multiplexer.rs
@@ -1,3 +1,23 @@
+//! # Multiplexer Obfuscation
+//!
+//! This obfuscation module attempts to establish a connection through multiple obfuscation methods
+//! simultaneously. It acts as a UDP proxy that forwards WireGuard traffic through other
+//! obfuscation transports (UDP2TCP, Shadowsocks, QUIC, etc.)
+//! and automatically selects the first one that successfully establishes a connection.
+//!
+//! ## How it works
+//!
+//! 1. **Initial Setup**: The multiplexer creates a local UDP socket that WireGuard connects to
+//! 2. **Transport Spawning**: It progressively spawns different obfuscation transports at timed intervals
+//! 3. **Traffic Fanout**: All incoming WireGuard packets are fanned out to all active transports
+//! 4. **First Response Wins**: The first transport to receive a response from the server is selected
+//! 5. **Connection Establishment**: Once a transport is selected, the multiplexer switches to a
+//! direct forwarding mode between WireGuard and the selected transport
+//!
+//! ## Transport Types
+//!
+//! See the [Transport] enum.
+
use std::{
collections::{BTreeMap, VecDeque},
io,
@@ -7,25 +27,48 @@ use std::{
};
use async_trait::async_trait;
-use tokio::{net::UdpSocket, task::JoinHandle, time::Instant};
+use tokio::net::UdpSocket;
use tokio_util::task::AbortOnDropHandle;
use crate::socket::create_remote_socket;
const MAX_DATAGRAM_SIZE: usize = u16::MAX as usize;
+/// An obfuscator that manages multiple other obfuscators and automatically
+/// selects the first one that successfully establishes a connection.
+///
+/// The multiplexer operates in two phases:
+/// 1. **Discovery Phase**: Spawn transports progressively and fan out traffic to all of them
+/// 2. **Connected Phase**: Once a transport responds, switch to forwarding to that transport only
pub struct Multiplexer {
+ /// Local UDP socket that WireGuard connects to
client_socket: Arc<UdpSocket>,
+ /// Address of the client socket that WireGuard should connect to
+ client_socket_addr: SocketAddr,
+ /// IPv4 socket for communicating with obfuscation proxies
proxy_socket_v4: Arc<UdpSocket>,
+ /// IPv6 socket for communicating with obfuscation proxies
proxy_socket_v6: Arc<UdpSocket>,
- client_socket_addr: SocketAddr,
+ /// Map of currently active transport endpoints and their configurations
running_endpoints: BTreeMap<SocketAddr, Transport>,
+ /// Queue of transports to spawn (in priority order)
transports: VecDeque<Transport>,
+ /// Buffer of initial packets received from WireGuard to replay to new transports
initial_packets_to_send: Vec<Vec<u8>>,
- tasks: Vec<JoinHandle<()>>,
+ /// Handles to spawned obfuscation tasks
+ tasks: Vec<AbortOnDropHandle<()>>,
+ /// Address of WG endpoint socket
+ wg_addr: Option<SocketAddr>,
}
impl Multiplexer {
+ /// Create a new multiplexer with the specified transports (obfuscators) and settings.
+ ///
+ /// # Arguments
+ /// * `settings` - Configuration containing the list of transports to try and network settings
+ ///
+ /// # Returns
+ /// A new multiplexer instance ready to start obfuscation discovery
pub async fn new(settings: &Settings) -> crate::Result<Self> {
let client_socket = UdpSocket::bind((Ipv4Addr::LOCALHOST, 0))
.await
@@ -50,13 +93,14 @@ impl Multiplexer {
Ok(Self {
client_socket: Arc::new(client_socket),
+ client_socket_addr,
proxy_socket_v4: Arc::new(proxy_socket_v4),
proxy_socket_v6: Arc::new(proxy_socket_v6),
- client_socket_addr,
running_endpoints: BTreeMap::new(),
transports: VecDeque::from(settings.transports.clone()),
tasks: vec![],
initial_packets_to_send: vec![],
+ wg_addr: None,
})
}
@@ -68,6 +112,13 @@ impl Multiplexer {
}
}
+ /// Start the multiplexer in discovery mode.
+ ///
+ /// Run the main event loop:
+ /// 1. Receive packets from WireGuard and fan them out to all active transports
+ /// 2. Receive responses from obfuscation proxies
+ /// 3. Spawn new transports at timed intervals
+ /// 4. Switch to connected mode when the first transport responds successfully
async fn start(mut self) -> io::Result<()> {
log::debug!("Running multiplexer obfuscation");
@@ -75,51 +126,27 @@ impl Multiplexer {
let mut obfs_recv_v4_buf = vec![0u8; MAX_DATAGRAM_SIZE];
let mut obfs_recv_v6_buf = vec![0u8; MAX_DATAGRAM_SIZE];
- let delay = tokio::time::interval_at(Instant::now(), Duration::from_secs(1));
- tokio::pin!(delay);
-
- // Address of WG endpoint socket
- let mut wg_addr = None;
+ let mut delay = tokio::time::interval(Duration::from_secs(1));
- // Helper to fan out a packet to all currently running endpoints
+ /// Helper to fan out a packet to all currently running endpoints
async fn send_to_all<'a>(
endpoints: &BTreeMap<SocketAddr, Transport>,
get_socket: impl Fn(SocketAddr) -> &'a Arc<UdpSocket>,
packet: &[u8],
) {
- for addr in endpoints.keys() {
- let udp = get_socket(*addr);
- log::info!("Sending received packet to proxy {addr}");
- if let Err(err) = udp.send_to(packet, addr).await {
- log::error!("Failed to send received packet to proxy {addr}: {err}");
- } else {
- log::info!("Successfully sent traffic to obfuscator {addr}");
- }
- }
- }
-
- // Handler for packets received from any proxy.
- // This returns true if we forwarded received bytes from an obfuscator
- // back to wireguard
- async fn process_obfuscator_recv(
- wg_addr: SocketAddr,
- client_socket: &UdpSocket,
- running_endpoints: &BTreeMap<SocketAddr, Transport>,
- obfuscator_addr: SocketAddr,
- received: &[u8],
- ) -> bool {
- if let Some(transport_config) = running_endpoints.get(&obfuscator_addr) {
- log::debug!(
- "Selecting {:?} as valid transport configuration via {obfuscator_addr}",
- transport_config
- );
- } else {
- log::trace!("Ignoring data from unexpected address {obfuscator_addr}");
- return false;
+ let mut futs = vec![];
+ for &addr in endpoints.keys() {
+ let udp = get_socket(addr);
+ futs.push(async move {
+ log::info!("Sending received packet to proxy {addr}");
+ if let Err(err) = udp.send_to(packet, addr).await {
+ log::error!("Failed to send received packet to proxy {addr}: {err}");
+ } else {
+ log::info!("Successfully sent traffic to obfuscator {addr}");
+ }
+ });
}
-
- let _ = client_socket.send_to(received, wg_addr).await;
- true
+ futures::future::join_all(futs).await;
}
loop {
@@ -128,7 +155,12 @@ impl Multiplexer {
socket_recv = self.client_socket.recv_from(&mut wg_recv_buf) => {
match socket_recv {
Ok((bytes_received, from_addr)) => {
- wg_addr = Some(from_addr);
+ if let Some(prev_addr) = self.wg_addr && prev_addr != from_addr {
+ log::debug!(
+ "WireGuard endpoint address changed from {prev_addr} to {from_addr}"
+ );
+ }
+ self.wg_addr = Some(from_addr);
let pkt = &wg_recv_buf[..bytes_received];
self.initial_packets_to_send.push(pkt.to_vec());
@@ -148,44 +180,12 @@ impl Multiplexer {
// From any IPv4 proxy
obfuscator_recv = self.proxy_socket_v4.recv_from(&mut obfs_recv_v4_buf) => {
- match obfuscator_recv {
- Ok((n, obfuscator_addr)) => {
- if let Some(wg_addr) = wg_addr && process_obfuscator_recv(
- wg_addr,
- &self.client_socket,
- &self.running_endpoints,
- obfuscator_addr,
- &obfs_recv_v4_buf[..n]
- ).await {
- return self.run_connected(wg_addr, obfuscator_addr).await;
- }
- },
- Err(err) => {
- log::error!("Failed to receive traffic from obfuscators: {err}");
- return Err(err);
- }
- }
+ self.process_obfuscator_recv(obfuscator_recv.map(|(n, addr)| (&obfs_recv_v4_buf[..n], addr))).await?;
},
// From any IPv6 proxy
obfuscator_recv = self.proxy_socket_v6.recv_from(&mut obfs_recv_v6_buf) => {
- match obfuscator_recv {
- Ok((n, obfuscator_addr)) => {
- if let Some(wg_addr) = wg_addr && process_obfuscator_recv(
- wg_addr,
- &self.client_socket,
- &self.running_endpoints,
- obfuscator_addr,
- &obfs_recv_v6_buf[..n]
- ).await {
- return self.run_connected(wg_addr, obfuscator_addr).await;
- }
- },
- Err(err) => {
- log::error!("Failed to receive traffic from obfuscators: {err}");
- return Err(err);
- }
- }
+ self.process_obfuscator_recv(obfuscator_recv.map(|(n, addr)| (&obfs_recv_v6_buf[..n], addr))).await?;
},
// Spawning the next transport
@@ -199,8 +199,52 @@ impl Multiplexer {
}
}
+ /// Handler for packets received from any proxy.
+ ///
+ /// If received bytes were forwarded from an obfuscator back to wireguard, this indicates that
+ /// a handshake response was received (hopefully) and that we should switch to connected mode.
+ ///
+ /// If a packet was received, this continues running until `run_connected` returns.
+ async fn process_obfuscator_recv(
+ &self,
+ obfuscator_recv: io::Result<(&[u8], SocketAddr)>,
+ ) -> io::Result<()> {
+ match obfuscator_recv {
+ Ok((received, obfuscator_addr)) => {
+ let Some(transport_config) = self.running_endpoints.get(&obfuscator_addr) else {
+ log::trace!("Ignoring data from unexpected address {obfuscator_addr}");
+ return Ok(());
+ };
+ let Some(wg_addr) = self.wg_addr else {
+ log::trace!(
+ "Received data from {obfuscator_addr} before receiving any data from WireGuard"
+ );
+ return Ok(());
+ };
+ log::debug!(
+ "Selecting {:?} as valid transport configuration via {obfuscator_addr}",
+ transport_config
+ );
+ let _ = self.client_socket.send_to(received, wg_addr).await;
+ self.run_connected(wg_addr, obfuscator_addr).await
+ }
+ Err(err) => {
+ log::error!("Failed to receive traffic from obfuscators: {err}");
+ Err(err)
+ }
+ }
+ }
+
+ /// Switch to connected mode after a transport has been successfully selected.
+ ///
+ /// In this mode, the multiplexer acts as a simple UDP proxy between WireGuard
+ /// and the selected obfuscation transport.
+ ///
+ /// # Arguments
+ /// * `local_address` - Address of the local WireGuard instance
+ /// * `proxy_address` - Address of the selected obfuscation proxy
async fn run_connected(
- self,
+ &self,
local_address: SocketAddr,
proxy_address: SocketAddr,
) -> io::Result<()> {
@@ -219,7 +263,7 @@ impl Multiplexer {
let tx_client_socket = self.client_socket.clone();
let tx_proxy_socket = proxy_socket.clone();
- let tx_task: JoinHandle<io::Result<()>> = tokio::spawn(async move {
+ let tx_task = tokio::spawn(async move {
loop {
let n = tx_client_socket.recv(&mut wg_recv_buf).await?;
tx_proxy_socket
@@ -228,11 +272,12 @@ impl Multiplexer {
}
});
let mut tx_task = AbortOnDropHandle::new(tx_task);
+ let client_socket = self.client_socket.clone();
- let rx_task: JoinHandle<io::Result<()>> = tokio::spawn(async move {
+ let rx_task = tokio::spawn(async move {
loop {
let (n, _src) = proxy_socket.recv_from(&mut obfuscator_recv_buf).await?;
- self.client_socket.send(&obfuscator_recv_buf[..n]).await?;
+ client_socket.send(&obfuscator_recv_buf[..n]).await?;
}
});
let mut rx_task = AbortOnDropHandle::new(rx_task);
@@ -244,6 +289,13 @@ impl Multiplexer {
}
}
+ /// Spawn a new obfuscation transport and add it to the active set.
+ ///
+ /// For direct transports, simply register the endpoint. For obfuscated
+ /// transports, start the obfuscation process in a background task.
+ ///
+ /// # Arguments
+ /// * `transport` - The obfuscation type to spawn
async fn spawn_new_transport(&mut self, transport: Transport) -> crate::Result<()> {
let endpoint = match transport.clone() {
Transport::Direct(addr) => {
@@ -256,10 +308,11 @@ impl Multiplexer {
let endpoint = obfuscator.endpoint();
self.running_endpoints
.insert(endpoint, Transport::Obfuscated(obfuscator_settings));
- self.tasks.push(tokio::spawn(async move {
- log::info!("Spawning new obfuscator");
- let _ = obfuscator.run().await;
- }));
+ self.tasks
+ .push(AbortOnDropHandle::new(tokio::spawn(async move {
+ log::info!("Spawning new obfuscator");
+ let _ = obfuscator.run().await;
+ })));
Ok(endpoint)
}
}?;
@@ -279,25 +332,24 @@ impl Multiplexer {
}
}
-impl Drop for Multiplexer {
- fn drop(&mut self) {
- for task in &self.tasks {
- task.abort();
- }
- }
-}
-
+/// Configuration settings for multiplexer obfuscation
#[derive(Debug, Clone)]
pub struct Settings {
- /// Transports to use, ordered by highest to lowest priority
+ /// List of transports to try, ordered by priority (highest to lowest).
+ /// Spawn these transports progressively and select
+ /// the first one that successfully establishes a connection.
pub transports: Vec<Transport>,
+ /// Linux-specific firewall mark for outgoing connections
#[cfg(target_os = "linux")]
pub fwmark: Option<u32>,
}
+/// Represents a transport method that the multiplexer can use.
#[derive(Clone, Debug)]
pub enum Transport {
+ /// Direct UDP forwarding without any obfuscation
Direct(SocketAddr),
+ /// An obfuscated transport (UDP2TCP, Shadowsocks, QUIC, etc.)
Obfuscated(crate::Settings),
}
@@ -321,6 +373,6 @@ impl crate::Obfuscator for Multiplexer {
#[cfg(target_os = "android")]
fn remote_socket_fd(&self) -> std::os::unix::io::RawFd {
- unimplemented!("note that we must punch a hole for all obfuscators")
+ unimplemented!("must return the socket fd of every obfuscator here")
}
}