diff options
| -rw-r--r-- | mullvad-masque-proxy/src/client/mod.rs | 23 | ||||
| -rw-r--r-- | mullvad-masque-proxy/src/fragment.rs | 30 | ||||
| -rw-r--r-- | mullvad-masque-proxy/src/server/mod.rs | 8 | ||||
| -rw-r--r-- | talpid-wireguard/src/ephemeral.rs | 30 | ||||
| -rw-r--r-- | talpid-wireguard/src/lib.rs | 21 | ||||
| -rw-r--r-- | talpid-wireguard/src/obfuscation.rs | 34 | ||||
| -rw-r--r-- | tunnel-obfuscation/src/quic.rs | 4 |
7 files changed, 103 insertions, 47 deletions
diff --git a/mullvad-masque-proxy/src/client/mod.rs b/mullvad-masque-proxy/src/client/mod.rs index 6163c63d25..e9d19fabf0 100644 --- a/mullvad-masque-proxy/src/client/mod.rs +++ b/mullvad-masque-proxy/src/client/mod.rs @@ -28,7 +28,7 @@ use quinn::{ use crate::{ MASQUE_WELL_KNOWN_PATH, MAX_INFLIGHT_PACKETS, MIN_IPV4_MTU, MIN_IPV6_MTU, QUIC_HEADER_SIZE, compute_udp_payload_size, - fragment::{self, Fragments}, + fragment::{self, DefragReceived, Fragments}, stats::Stats, }; @@ -553,14 +553,27 @@ async fn client_socket_tx_task( continue; } let payload = response.into_payload(); + let original_payload_len = payload.len(); - if let Ok(Some(payload)) = fragments.handle_incoming_packet(payload) { - stats.rx(payload.len(), false /* TODO */); - + let send = async |payload: &[u8]| -> Result<()> { client_socket - .send_to(payload.chunk(), return_addr) + .send_to(payload, return_addr) .await .map_err(Error::ClientWrite)?; + Ok(()) + }; + + match fragments.handle_incoming_packet(payload) { + Ok(DefragReceived::Nonfragmented(payload)) => { + stats.rx(payload.len(), false); + send(payload.chunk()).await?; + } + Ok(DefragReceived::Reassembled(reassembled_payload)) => { + stats.rx(original_payload_len, true); + send(reassembled_payload.chunk()).await?; + } + Ok(DefragReceived::Fragment) => stats.rx(original_payload_len, true), + Err(_) => (), } } diff --git a/mullvad-masque-proxy/src/fragment.rs b/mullvad-masque-proxy/src/fragment.rs index f6e648af9e..534fb8bff5 100644 --- a/mullvad-masque-proxy/src/fragment.rs +++ b/mullvad-masque-proxy/src/fragment.rs @@ -50,15 +50,24 @@ impl Default for Fragments { } } +pub enum DefragReceived { + /// Received a whole packet without fragmentation + Nonfragmented(Bytes), + /// Received a fragment but was unable to reassemble the packet + Fragment, + /// Received reassembled packet + Reassembled(Bytes), +} + impl Fragments { // TODO: Let caller provide output buffer. pub fn handle_incoming_packet( &mut self, mut payload: Bytes, - ) -> Result<Option<Bytes>, DefragError> { + ) -> Result<DefragReceived, DefragError> { match VarInt::decode(&mut payload) { Ok(crate::HTTP_MASQUE_DATAGRAM_CONTEXT_ID) => { - return Ok(Some(payload)); + return Ok(DefragReceived::Nonfragmented(payload)); } Ok(crate::HTTP_MASQUE_FRAGMENTED_DATAGRAM_CONTEXT_ID) => {} unexpected_context_id => { @@ -95,7 +104,11 @@ impl Fragments { let fragments = self.fragment_map.entry(id).or_default(); fragments.push(fragment); - Ok(self.try_reassemble(id, fragment_count)) + let reassembled = self.try_reassemble(id, fragment_count) + .map(DefragReceived::Reassembled) + // TODO: This may also occur if a packet is discarded + .unwrap_or(DefragReceived::Fragment); + Ok(reassembled) } // TODO: Let caller provide output buffer. @@ -199,7 +212,7 @@ mod test { fragment_buf.shuffle(&mut thread_rng()); for fragment in fragment_buf { - if let Some(reconstructed_packet) = + if let DefragReceived::Reassembled(reconstructed_packet) = fragments.handle_incoming_packet(fragment).unwrap() { assert_eq!(payload.as_slice(), reconstructed_packet.as_ref()); @@ -237,7 +250,10 @@ mod test { let packet = fragments .handle_incoming_packet(fragment_buf.pop().unwrap()) .unwrap(); - assert!(packet.is_none(), "haven't sent all fragments yet"); + assert!( + matches!(packet, DefragReceived::Fragment), + "haven't sent all fragments yet" + ); // then send a bunch of fragments to fill the queue let mut bad_payload = Bytes::from([0u8; 2].to_vec()); @@ -253,11 +269,11 @@ mod test { let packet = fragments .handle_incoming_packet(incomplete_fragment.clone()) .unwrap(); - assert!(packet.is_none()); + assert!(matches!(packet, DefragReceived::Fragment)); } for fragment in fragment_buf { - if let Some(reconstructed_packet) = + if let DefragReceived::Reassembled(reconstructed_packet) = fragments.handle_incoming_packet(fragment).unwrap() { assert_eq!(payload.as_slice(), reconstructed_packet.as_ref()); diff --git a/mullvad-masque-proxy/src/server/mod.rs b/mullvad-masque-proxy/src/server/mod.rs index f8f224d485..3788ca7c80 100644 --- a/mullvad-masque-proxy/src/server/mod.rs +++ b/mullvad-masque-proxy/src/server/mod.rs @@ -22,7 +22,7 @@ use typed_builder::TypedBuilder; use crate::{ MASQUE_WELL_KNOWN_PATH, MAX_INFLIGHT_PACKETS, MIN_IPV4_MTU, MIN_IPV6_MTU, QUIC_HEADER_SIZE, compute_udp_payload_size, - fragment::{self, Fragments}, + fragment::{self, DefragReceived, Fragments}, }; #[derive(Debug, thiserror::Error)] @@ -311,8 +311,10 @@ async fn proxy_tx_task(udp_socket: impl AsRef<UdpSocket>, mut client_rx: mpsc::R let quic_payload = quic_datagram.into_payload(); let packet = match fragments.handle_incoming_packet(quic_payload) { - Ok(Some(packet)) => packet, - Ok(None) => continue, + Ok(DefragReceived::Reassembled(packet) | DefragReceived::Nonfragmented(packet)) => { + packet + } + Ok(DefragReceived::Fragment) => continue, Err(err) => { log::trace!("Failed to reassemble incoming packet: {err}"); continue; diff --git a/talpid-wireguard/src/ephemeral.rs b/talpid-wireguard/src/ephemeral.rs index 199cb0ebd6..501de4fdb6 100644 --- a/talpid-wireguard/src/ephemeral.rs +++ b/talpid-wireguard/src/ephemeral.rs @@ -27,6 +27,7 @@ pub async fn config_ephemeral_peers( tunnel: &Arc<AsyncMutex<Option<TunnelType>>>, config: &mut Config, retry_attempt: u32, + obfuscation_mtu: u16, obfuscator: Arc<AsyncMutex<Option<ObfuscatorHandle>>>, close_obfs_sender: sync_mpsc::Sender<CloseMsg>, ) -> std::result::Result<(), CloseMsg> { @@ -42,8 +43,15 @@ pub async fn config_ephemeral_peers( log::trace!("Temporarily lowering tunnel MTU before ephemeral peer config"); try_set_ipv4_mtu(&iface_name, talpid_tunnel::MIN_IPV4_MTU); - config_ephemeral_peers_inner(tunnel, config, retry_attempt, obfuscator, close_obfs_sender) - .await?; + config_ephemeral_peers_inner( + tunnel, + config, + retry_attempt, + obfuscation_mtu, + obfuscator, + close_obfs_sender, + ) + .await?; log::trace!("Resetting tunnel MTU"); try_set_ipv4_mtu(&iface_name, config.mtu); @@ -71,6 +79,7 @@ pub async fn config_ephemeral_peers( tunnel: &Arc<AsyncMutex<Option<TunnelType>>>, config: &mut Config, retry_attempt: u32, + obfuscation_mtu: u16, obfuscator: Arc<AsyncMutex<Option<ObfuscatorHandle>>>, close_obfs_sender: sync_mpsc::Sender<CloseMsg>, #[cfg(target_os = "android")] tun_provider: Arc<Mutex<TunProvider>>, @@ -79,6 +88,7 @@ pub async fn config_ephemeral_peers( tunnel, config, retry_attempt, + obfuscation_mtu, obfuscator, close_obfs_sender, #[cfg(target_os = "android")] @@ -91,6 +101,7 @@ async fn config_ephemeral_peers_inner( tunnel: &Arc<AsyncMutex<Option<TunnelType>>>, config: &mut Config, retry_attempt: u32, + obfuscation_mtu: u16, obfuscator: Arc<AsyncMutex<Option<ObfuscatorHandle>>>, close_obfs_sender: sync_mpsc::Sender<CloseMsg>, #[cfg(target_os = "android")] tun_provider: Arc<Mutex<TunProvider>>, @@ -125,6 +136,7 @@ async fn config_ephemeral_peers_inner( let entry_config = reconfigure_tunnel( tunnel, entry_tun_config, + obfuscation_mtu, obfuscator.clone(), close_obfs_sender, #[cfg(target_os = "android")] @@ -156,6 +168,7 @@ async fn config_ephemeral_peers_inner( *config = reconfigure_tunnel( tunnel, config.clone(), + obfuscation_mtu, obfuscator, close_obfs_sender, #[cfg(target_os = "android")] @@ -187,6 +200,7 @@ async fn config_ephemeral_peers_inner( async fn reconfigure_tunnel( tunnel: &Arc<AsyncMutex<Option<TunnelType>>>, mut config: Config, + obfuscation_mtu: u16, obfuscator: Arc<AsyncMutex<Option<ObfuscatorHandle>>>, close_obfs_sender: sync_mpsc::Sender<CloseMsg>, tun_provider: &Arc<Mutex<TunProvider>>, @@ -196,6 +210,7 @@ async fn reconfigure_tunnel( obfuscator_handle.abort(); *obfs_guard = super::obfuscation::apply_obfuscation_config( &mut config, + obfuscation_mtu, close_obfs_sender, #[cfg(target_os = "android")] tun_provider.clone(), @@ -224,15 +239,20 @@ async fn reconfigure_tunnel( async fn reconfigure_tunnel( tunnel: &Arc<AsyncMutex<Option<TunnelType>>>, mut config: Config, + obfuscation_mtu: u16, obfuscator: Arc<AsyncMutex<Option<ObfuscatorHandle>>>, close_obfs_sender: sync_mpsc::Sender<CloseMsg>, ) -> Result<Config, CloseMsg> { let mut obfs_guard = obfuscator.lock().await; if let Some(obfuscator_handle) = obfs_guard.take() { obfuscator_handle.abort(); - *obfs_guard = super::obfuscation::apply_obfuscation_config(&mut config, close_obfs_sender) - .await - .map_err(CloseMsg::ObfuscatorFailed)?; + *obfs_guard = super::obfuscation::apply_obfuscation_config( + &mut config, + obfuscation_mtu, + close_obfs_sender, + ) + .await + .map_err(CloseMsg::ObfuscatorFailed)?; } { diff --git a/talpid-wireguard/src/lib.rs b/talpid-wireguard/src/lib.rs index 5e265e4dd5..dc6964385a 100644 --- a/talpid-wireguard/src/lib.rs +++ b/talpid-wireguard/src/lib.rs @@ -168,19 +168,24 @@ impl WireguardMonitor { let endpoint_addrs = [params.get_next_hop_endpoint().address.ip()]; let (close_obfs_sender, close_obfs_listener) = sync_mpsc::channel(); + // Adjust MTU unless overridden by user + if params.options.mtu.is_none() { + config.mtu = clamp_mtu(params, config.mtu); + } // Start obfuscation server and patch the WireGuard config to point the endpoint to it. + let obfuscation_mtu = config.mtu; let obfuscator = args .runtime .block_on(obfuscation::apply_obfuscation_config( &mut config, + obfuscation_mtu, close_obfs_sender.clone(), ))?; - // Don't adjust MTU if overridden by user - if params.options.mtu.is_none() { - if let Some(obfuscator) = obfuscator.as_ref() { - config.mtu = config.mtu.saturating_sub(obfuscator.packet_overhead()); - } - config.mtu = clamp_mtu(params, config.mtu); + // Adjust MTU again for obfuscation packet overhead + if params.options.mtu.is_none() + && let Some(obfuscator) = obfuscator.as_ref() + { + config.mtu = config.mtu.saturating_sub(obfuscator.packet_overhead()); } // NOTE: We force userspace WireGuard while boringtun is enabled to more easily test @@ -279,6 +284,7 @@ impl WireguardMonitor { &tunnel, &mut config, args.retry_attempt, + obfuscation_mtu, obfuscator.clone(), ephemeral_obfs_sender, ) @@ -414,10 +420,12 @@ impl WireguardMonitor { Config::from_parameters(params, desired_mtu).map_err(Error::WireguardConfigError)?; let (close_obfs_sender, close_obfs_listener) = sync_mpsc::channel(); // Start obfuscation server and patch the WireGuard config to point the endpoint to it. + let obfuscation_mtu = config.mtu; let obfuscator = args .runtime .block_on(obfuscation::apply_obfuscation_config( &mut config, + obfuscation_mtu, close_obfs_sender.clone(), args.tun_provider.clone(), ))?; @@ -520,6 +528,7 @@ impl WireguardMonitor { &tunnel, &mut config, args.retry_attempt, + obfuscation_mtu, obfuscator.clone(), ephemeral_obfs_sender, args.tun_provider, diff --git a/talpid-wireguard/src/obfuscation.rs b/talpid-wireguard/src/obfuscation.rs index c7bb69dd34..3fd4d411ea 100644 --- a/talpid-wireguard/src/obfuscation.rs +++ b/talpid-wireguard/src/obfuscation.rs @@ -8,7 +8,6 @@ use std::{ net::{Ipv4Addr, Ipv6Addr, SocketAddr}, sync::mpsc as sync_mpsc, }; -use talpid_tunnel::WIREGUARD_HEADER_SIZE; #[cfg(target_os = "android")] use talpid_tunnel::tun_provider::TunProvider; use talpid_types::{ErrorExt, net::obfuscation::ObfuscatorConfig}; @@ -19,8 +18,13 @@ use tunnel_obfuscation::{ /// Begin running obfuscation machine, if configured. This function will patch `config`'s endpoint /// to point to an endpoint on localhost +/// +/// # Arguments +/// +/// * obfuscation_mtu - "MTU" including obfuscation overhead pub async fn apply_obfuscation_config( config: &mut Config, + obfuscation_mtu: u16, close_msg_sender: sync_mpsc::Sender<CloseMsg>, #[cfg(target_os = "android")] tun_provider: Arc<Mutex<TunProvider>>, ) -> Result<Option<ObfuscatorHandle>> { @@ -28,24 +32,12 @@ pub async fn apply_obfuscation_config( return Ok(None); }; - let settings = { - let settings = settings_from_config( - obfuscator_config, - #[cfg(target_os = "linux")] - config.fwmark, - ); - - // Adjust MTU for QUIC obfuscator. - match settings { - ObfuscationSettings::Quic(quic) => { - // Account for multihop - // FIXME: Pass proper mtu as an argument / through config? - let quic = quic.mtu(config.mtu - 2 * WIREGUARD_HEADER_SIZE); - ObfuscationSettings::Quic(quic) - } - settings => settings, - } - }; + let settings = settings_from_config( + obfuscator_config, + obfuscation_mtu, + #[cfg(target_os = "linux")] + config.fwmark, + ); log::trace!("Obfuscation settings: {settings:?}"); @@ -90,6 +82,7 @@ fn patch_endpoint(config: &mut Config, endpoint: SocketAddr) { fn settings_from_config( config: &ObfuscatorConfig, + mtu: u16, #[cfg(target_os = "linux")] fwmark: Option<u32>, ) -> ObfuscationSettings { match config { @@ -121,7 +114,8 @@ fn settings_from_config( hostname.to_owned(), auth_token.parse().unwrap(), wireguard_endpoint, - ); + ) + .mtu(mtu); #[cfg(target_os = "linux")] if let Some(fwmark) = fwmark { return ObfuscationSettings::Quic(settings.fwmark(fwmark)); diff --git a/tunnel-obfuscation/src/quic.rs b/tunnel-obfuscation/src/quic.rs index a3f3164066..44dcfe1f8e 100644 --- a/tunnel-obfuscation/src/quic.rs +++ b/tunnel-obfuscation/src/quic.rs @@ -208,7 +208,9 @@ impl Obfuscator for Quic { } fn packet_overhead(&self) -> u16 { - 0 // FIXME + // TODO: 95 = IPv6 (40) + UDP (8) + QUIC (<= 41) + stream ID (1) + fragment header (5) + // The above would prevent mullvad-masque-proxy-level fragmentation + 0 } #[cfg(target_os = "android")] |
