diff options
| -rw-r--r-- | Cargo.lock | 1 | ||||
| -rw-r--r-- | mullvad-relay-selector/src/relay_selector/helpers.rs | 13 | ||||
| -rw-r--r-- | talpid-types/src/net/obfuscation.rs | 29 | ||||
| -rw-r--r-- | tunnel-obfuscation/Cargo.toml | 1 | ||||
| -rw-r--r-- | tunnel-obfuscation/src/multiplexer.rs | 242 |
5 files changed, 184 insertions, 102 deletions
diff --git a/Cargo.lock b/Cargo.lock index 42fa93f816..55b56b6873 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6416,6 +6416,7 @@ version = "0.0.0" dependencies = [ "async-trait", "criterion", + "futures", "log", "mullvad-masque-proxy", "nix 0.30.1", diff --git a/mullvad-relay-selector/src/relay_selector/helpers.rs b/mullvad-relay-selector/src/relay_selector/helpers.rs index ce527e7b0a..f80942d0f5 100644 --- a/mullvad-relay-selector/src/relay_selector/helpers.rs +++ b/mullvad-relay-selector/src/relay_selector/helpers.rs @@ -85,6 +85,13 @@ pub fn pick_random_relay_weighted<'a, RelayType>( } } +/// Create a multiplexer obfuscator config +/// +/// # Arguments +/// * `udp2tcp_ports` - Available ports for UDP2TCP obfuscation +/// * `shadowsocks_ports` - Available port ranges for Shadowsocks obfuscation +/// * `obfuscator_relay` - The relay that will host the obfuscation services +/// * `endpoint` - Selected endpoint #[cfg(feature = "staggered-obfuscation")] pub fn get_multiplexer_obfuscator( udp2tcp_ports: &[u16], @@ -261,8 +268,8 @@ fn get_shadowsocks_obfuscator_inner<R: RangeBounds<u16> + Iterator<Item = u16> + } /// Return `desired_port` if it is specified and included in `port_ranges`. -/// If `desired_port` isn't specified, a random port from the ranges is returned. -/// If `desired_port` is specified but not in range, an error is returned. +/// If `desired_port` isn't specified, return a random port from the ranges. +/// If `desired_port` is specified but not in range, return an error. pub fn desired_or_random_port_from_range<R: RangeBounds<u16> + Iterator<Item = u16> + Clone>( port_ranges: &[R], desired_port: Constraint<u16>, @@ -286,7 +293,7 @@ fn port_if_in_range<R: RangeBounds<u16>>(port_ranges: &[R], port: u16) -> Result .ok_or(Error::NoMatchingPort) } -/// Selects a random port number from a list of provided port ranges. +/// Select a random port number from a list of provided port ranges. /// /// # Parameters /// - `port_ranges`: A slice of port numbers. diff --git a/talpid-types/src/net/obfuscation.rs b/talpid-types/src/net/obfuscation.rs index 6c14a6e59c..26e095844b 100644 --- a/talpid-types/src/net/obfuscation.rs +++ b/talpid-types/src/net/obfuscation.rs @@ -3,11 +3,19 @@ use std::net::SocketAddr; use super::{Endpoint, TransportProtocol}; +/// Available obfuscation configuration types. #[derive(Clone, Eq, PartialEq, Deserialize, Serialize, Debug)] pub enum Obfuscators { + /// A single obfuscation method Single(ObfuscatorConfig), + /// Try multiple obfuscation methods (using `multiplexer` obfuscation). + /// + /// They are tested in the following order: `direct`, `config.0`, then + /// the remaining configs in `configs.1` in order. Multiplexer { + /// Optional direct connection (no obfuscation) to try along with `configs`. direct: Option<SocketAddr>, + /// Obfuscation configurations to try. configs: (ObfuscatorConfig, Vec<ObfuscatorConfig>), }, } @@ -31,8 +39,17 @@ pub enum ObfuscatorConfig { } impl Obfuscators { - /// Return a [Obfuscators::Multiplexer]. If `obfuscators` contains zero values, - /// this returns `None`. + /// Create a multiplexer obfuscator configuration. + /// + /// See [Obfuscators::Multiplexer] for more details. + /// + /// # Arguments + /// * `direct` - Optional direct connection endpoint (no obfuscation) + /// * `obfuscators` - List of obfuscation methods to try, at least one. + /// + /// # Returns + /// * `Some(Obfuscators::Multiplexer)` if at least one obfuscation method is provided + /// * `None` if the obfuscators list is empty pub fn multiplexer( direct: Option<SocketAddr>, obfuscators: &[ObfuscatorConfig], @@ -46,7 +63,11 @@ impl Obfuscators { }) } - /// Return all potential obfuscation endpoints + /// Return all potential endpoints that this obfuscation configuration might connect to. + /// + /// For single obfuscators, return one endpoint. For `Obfuscators::Multiplexer`, return + /// all possible endpoints (direct + all obfuscated methods) that the multiplexer + /// might use, with duplicates removed. pub fn endpoints(&self) -> Vec<Endpoint> { match self { Obfuscators::Single(config) => vec![config.endpoint()], @@ -74,7 +95,7 @@ impl Obfuscators { } impl ObfuscatorConfig { - /// Return obfuscation endpoint + /// Return obfuscation endpoint, i.e. the first remote hop that will be connected to pub fn endpoint(&self) -> Endpoint { match self { ObfuscatorConfig::Udp2Tcp { endpoint } => Endpoint { diff --git a/tunnel-obfuscation/Cargo.toml b/tunnel-obfuscation/Cargo.toml index e84486e1cf..8af17fe898 100644 --- a/tunnel-obfuscation/Cargo.toml +++ b/tunnel-obfuscation/Cargo.toml @@ -11,6 +11,7 @@ rust-version.workspace = true workspace = true [dependencies] +futures = { workspace = true } log = { workspace = true } async-trait = "0.1" thiserror = { workspace = true } diff --git a/tunnel-obfuscation/src/multiplexer.rs b/tunnel-obfuscation/src/multiplexer.rs index 5ae303189b..e7a70fa059 100644 --- a/tunnel-obfuscation/src/multiplexer.rs +++ b/tunnel-obfuscation/src/multiplexer.rs @@ -1,3 +1,23 @@ +//! # Multiplexer Obfuscation +//! +//! This obfuscation module attempts to establish a connection through multiple obfuscation methods +//! simultaneously. It acts as a UDP proxy that forwards WireGuard traffic through other +//! obfuscation transports (UDP2TCP, Shadowsocks, QUIC, etc.) +//! and automatically selects the first one that successfully establishes a connection. +//! +//! ## How it works +//! +//! 1. **Initial Setup**: The multiplexer creates a local UDP socket that WireGuard connects to +//! 2. **Transport Spawning**: It progressively spawns different obfuscation transports at timed intervals +//! 3. **Traffic Fanout**: All incoming WireGuard packets are fanned out to all active transports +//! 4. **First Response Wins**: The first transport to receive a response from the server is selected +//! 5. **Connection Establishment**: Once a transport is selected, the multiplexer switches to a +//! direct forwarding mode between WireGuard and the selected transport +//! +//! ## Transport Types +//! +//! See the [Transport] enum. + use std::{ collections::{BTreeMap, VecDeque}, io, @@ -7,25 +27,48 @@ use std::{ }; use async_trait::async_trait; -use tokio::{net::UdpSocket, task::JoinHandle, time::Instant}; +use tokio::net::UdpSocket; use tokio_util::task::AbortOnDropHandle; use crate::socket::create_remote_socket; const MAX_DATAGRAM_SIZE: usize = u16::MAX as usize; +/// An obfuscator that manages multiple other obfuscators and automatically +/// selects the first one that successfully establishes a connection. +/// +/// The multiplexer operates in two phases: +/// 1. **Discovery Phase**: Spawn transports progressively and fan out traffic to all of them +/// 2. **Connected Phase**: Once a transport responds, switch to forwarding to that transport only pub struct Multiplexer { + /// Local UDP socket that WireGuard connects to client_socket: Arc<UdpSocket>, + /// Address of the client socket that WireGuard should connect to + client_socket_addr: SocketAddr, + /// IPv4 socket for communicating with obfuscation proxies proxy_socket_v4: Arc<UdpSocket>, + /// IPv6 socket for communicating with obfuscation proxies proxy_socket_v6: Arc<UdpSocket>, - client_socket_addr: SocketAddr, + /// Map of currently active transport endpoints and their configurations running_endpoints: BTreeMap<SocketAddr, Transport>, + /// Queue of transports to spawn (in priority order) transports: VecDeque<Transport>, + /// Buffer of initial packets received from WireGuard to replay to new transports initial_packets_to_send: Vec<Vec<u8>>, - tasks: Vec<JoinHandle<()>>, + /// Handles to spawned obfuscation tasks + tasks: Vec<AbortOnDropHandle<()>>, + /// Address of WG endpoint socket + wg_addr: Option<SocketAddr>, } impl Multiplexer { + /// Create a new multiplexer with the specified transports (obfuscators) and settings. + /// + /// # Arguments + /// * `settings` - Configuration containing the list of transports to try and network settings + /// + /// # Returns + /// A new multiplexer instance ready to start obfuscation discovery pub async fn new(settings: &Settings) -> crate::Result<Self> { let client_socket = UdpSocket::bind((Ipv4Addr::LOCALHOST, 0)) .await @@ -50,13 +93,14 @@ impl Multiplexer { Ok(Self { client_socket: Arc::new(client_socket), + client_socket_addr, proxy_socket_v4: Arc::new(proxy_socket_v4), proxy_socket_v6: Arc::new(proxy_socket_v6), - client_socket_addr, running_endpoints: BTreeMap::new(), transports: VecDeque::from(settings.transports.clone()), tasks: vec![], initial_packets_to_send: vec![], + wg_addr: None, }) } @@ -68,6 +112,13 @@ impl Multiplexer { } } + /// Start the multiplexer in discovery mode. + /// + /// Run the main event loop: + /// 1. Receive packets from WireGuard and fan them out to all active transports + /// 2. Receive responses from obfuscation proxies + /// 3. Spawn new transports at timed intervals + /// 4. Switch to connected mode when the first transport responds successfully async fn start(mut self) -> io::Result<()> { log::debug!("Running multiplexer obfuscation"); @@ -75,51 +126,27 @@ impl Multiplexer { let mut obfs_recv_v4_buf = vec![0u8; MAX_DATAGRAM_SIZE]; let mut obfs_recv_v6_buf = vec![0u8; MAX_DATAGRAM_SIZE]; - let delay = tokio::time::interval_at(Instant::now(), Duration::from_secs(1)); - tokio::pin!(delay); - - // Address of WG endpoint socket - let mut wg_addr = None; + let mut delay = tokio::time::interval(Duration::from_secs(1)); - // Helper to fan out a packet to all currently running endpoints + /// Helper to fan out a packet to all currently running endpoints async fn send_to_all<'a>( endpoints: &BTreeMap<SocketAddr, Transport>, get_socket: impl Fn(SocketAddr) -> &'a Arc<UdpSocket>, packet: &[u8], ) { - for addr in endpoints.keys() { - let udp = get_socket(*addr); - log::info!("Sending received packet to proxy {addr}"); - if let Err(err) = udp.send_to(packet, addr).await { - log::error!("Failed to send received packet to proxy {addr}: {err}"); - } else { - log::info!("Successfully sent traffic to obfuscator {addr}"); - } - } - } - - // Handler for packets received from any proxy. - // This returns true if we forwarded received bytes from an obfuscator - // back to wireguard - async fn process_obfuscator_recv( - wg_addr: SocketAddr, - client_socket: &UdpSocket, - running_endpoints: &BTreeMap<SocketAddr, Transport>, - obfuscator_addr: SocketAddr, - received: &[u8], - ) -> bool { - if let Some(transport_config) = running_endpoints.get(&obfuscator_addr) { - log::debug!( - "Selecting {:?} as valid transport configuration via {obfuscator_addr}", - transport_config - ); - } else { - log::trace!("Ignoring data from unexpected address {obfuscator_addr}"); - return false; + let mut futs = vec![]; + for &addr in endpoints.keys() { + let udp = get_socket(addr); + futs.push(async move { + log::info!("Sending received packet to proxy {addr}"); + if let Err(err) = udp.send_to(packet, addr).await { + log::error!("Failed to send received packet to proxy {addr}: {err}"); + } else { + log::info!("Successfully sent traffic to obfuscator {addr}"); + } + }); } - - let _ = client_socket.send_to(received, wg_addr).await; - true + futures::future::join_all(futs).await; } loop { @@ -128,7 +155,12 @@ impl Multiplexer { socket_recv = self.client_socket.recv_from(&mut wg_recv_buf) => { match socket_recv { Ok((bytes_received, from_addr)) => { - wg_addr = Some(from_addr); + if let Some(prev_addr) = self.wg_addr && prev_addr != from_addr { + log::debug!( + "WireGuard endpoint address changed from {prev_addr} to {from_addr}" + ); + } + self.wg_addr = Some(from_addr); let pkt = &wg_recv_buf[..bytes_received]; self.initial_packets_to_send.push(pkt.to_vec()); @@ -148,44 +180,12 @@ impl Multiplexer { // From any IPv4 proxy obfuscator_recv = self.proxy_socket_v4.recv_from(&mut obfs_recv_v4_buf) => { - match obfuscator_recv { - Ok((n, obfuscator_addr)) => { - if let Some(wg_addr) = wg_addr && process_obfuscator_recv( - wg_addr, - &self.client_socket, - &self.running_endpoints, - obfuscator_addr, - &obfs_recv_v4_buf[..n] - ).await { - return self.run_connected(wg_addr, obfuscator_addr).await; - } - }, - Err(err) => { - log::error!("Failed to receive traffic from obfuscators: {err}"); - return Err(err); - } - } + self.process_obfuscator_recv(obfuscator_recv.map(|(n, addr)| (&obfs_recv_v4_buf[..n], addr))).await?; }, // From any IPv6 proxy obfuscator_recv = self.proxy_socket_v6.recv_from(&mut obfs_recv_v6_buf) => { - match obfuscator_recv { - Ok((n, obfuscator_addr)) => { - if let Some(wg_addr) = wg_addr && process_obfuscator_recv( - wg_addr, - &self.client_socket, - &self.running_endpoints, - obfuscator_addr, - &obfs_recv_v6_buf[..n] - ).await { - return self.run_connected(wg_addr, obfuscator_addr).await; - } - }, - Err(err) => { - log::error!("Failed to receive traffic from obfuscators: {err}"); - return Err(err); - } - } + self.process_obfuscator_recv(obfuscator_recv.map(|(n, addr)| (&obfs_recv_v6_buf[..n], addr))).await?; }, // Spawning the next transport @@ -199,8 +199,52 @@ impl Multiplexer { } } + /// Handler for packets received from any proxy. + /// + /// If received bytes were forwarded from an obfuscator back to wireguard, this indicates that + /// a handshake response was received (hopefully) and that we should switch to connected mode. + /// + /// If a packet was received, this continues running until `run_connected` returns. + async fn process_obfuscator_recv( + &self, + obfuscator_recv: io::Result<(&[u8], SocketAddr)>, + ) -> io::Result<()> { + match obfuscator_recv { + Ok((received, obfuscator_addr)) => { + let Some(transport_config) = self.running_endpoints.get(&obfuscator_addr) else { + log::trace!("Ignoring data from unexpected address {obfuscator_addr}"); + return Ok(()); + }; + let Some(wg_addr) = self.wg_addr else { + log::trace!( + "Received data from {obfuscator_addr} before receiving any data from WireGuard" + ); + return Ok(()); + }; + log::debug!( + "Selecting {:?} as valid transport configuration via {obfuscator_addr}", + transport_config + ); + let _ = self.client_socket.send_to(received, wg_addr).await; + self.run_connected(wg_addr, obfuscator_addr).await + } + Err(err) => { + log::error!("Failed to receive traffic from obfuscators: {err}"); + Err(err) + } + } + } + + /// Switch to connected mode after a transport has been successfully selected. + /// + /// In this mode, the multiplexer acts as a simple UDP proxy between WireGuard + /// and the selected obfuscation transport. + /// + /// # Arguments + /// * `local_address` - Address of the local WireGuard instance + /// * `proxy_address` - Address of the selected obfuscation proxy async fn run_connected( - self, + &self, local_address: SocketAddr, proxy_address: SocketAddr, ) -> io::Result<()> { @@ -219,7 +263,7 @@ impl Multiplexer { let tx_client_socket = self.client_socket.clone(); let tx_proxy_socket = proxy_socket.clone(); - let tx_task: JoinHandle<io::Result<()>> = tokio::spawn(async move { + let tx_task = tokio::spawn(async move { loop { let n = tx_client_socket.recv(&mut wg_recv_buf).await?; tx_proxy_socket @@ -228,11 +272,12 @@ impl Multiplexer { } }); let mut tx_task = AbortOnDropHandle::new(tx_task); + let client_socket = self.client_socket.clone(); - let rx_task: JoinHandle<io::Result<()>> = tokio::spawn(async move { + let rx_task = tokio::spawn(async move { loop { let (n, _src) = proxy_socket.recv_from(&mut obfuscator_recv_buf).await?; - self.client_socket.send(&obfuscator_recv_buf[..n]).await?; + client_socket.send(&obfuscator_recv_buf[..n]).await?; } }); let mut rx_task = AbortOnDropHandle::new(rx_task); @@ -244,6 +289,13 @@ impl Multiplexer { } } + /// Spawn a new obfuscation transport and add it to the active set. + /// + /// For direct transports, simply register the endpoint. For obfuscated + /// transports, start the obfuscation process in a background task. + /// + /// # Arguments + /// * `transport` - The obfuscation type to spawn async fn spawn_new_transport(&mut self, transport: Transport) -> crate::Result<()> { let endpoint = match transport.clone() { Transport::Direct(addr) => { @@ -256,10 +308,11 @@ impl Multiplexer { let endpoint = obfuscator.endpoint(); self.running_endpoints .insert(endpoint, Transport::Obfuscated(obfuscator_settings)); - self.tasks.push(tokio::spawn(async move { - log::info!("Spawning new obfuscator"); - let _ = obfuscator.run().await; - })); + self.tasks + .push(AbortOnDropHandle::new(tokio::spawn(async move { + log::info!("Spawning new obfuscator"); + let _ = obfuscator.run().await; + }))); Ok(endpoint) } }?; @@ -279,25 +332,24 @@ impl Multiplexer { } } -impl Drop for Multiplexer { - fn drop(&mut self) { - for task in &self.tasks { - task.abort(); - } - } -} - +/// Configuration settings for multiplexer obfuscation #[derive(Debug, Clone)] pub struct Settings { - /// Transports to use, ordered by highest to lowest priority + /// List of transports to try, ordered by priority (highest to lowest). + /// Spawn these transports progressively and select + /// the first one that successfully establishes a connection. pub transports: Vec<Transport>, + /// Linux-specific firewall mark for outgoing connections #[cfg(target_os = "linux")] pub fwmark: Option<u32>, } +/// Represents a transport method that the multiplexer can use. #[derive(Clone, Debug)] pub enum Transport { + /// Direct UDP forwarding without any obfuscation Direct(SocketAddr), + /// An obfuscated transport (UDP2TCP, Shadowsocks, QUIC, etc.) Obfuscated(crate::Settings), } @@ -321,6 +373,6 @@ impl crate::Obfuscator for Multiplexer { #[cfg(target_os = "android")] fn remote_socket_fd(&self) -> std::os::unix::io::RawFd { - unimplemented!("note that we must punch a hole for all obfuscators") + unimplemented!("must return the socket fd of every obfuscator here") } } |
