diff options
| author | David Lönnhager <david.l@mullvad.net> | 2025-08-15 11:47:58 +0200 |
|---|---|---|
| committer | David Lönnhager <david.l@mullvad.net> | 2025-08-18 18:11:09 +0200 |
| commit | b0807e3ab9e423eb38a3b21ead3cca5b72a0ce1d (patch) | |
| tree | 4288d670630c4368a171c08370be13db262b8183 | |
| parent | 081f1e74b9dab7feb134cac439d873c38b4927ce (diff) | |
| download | mullvadvpn-b0807e3ab9e423eb38a3b21ead3cca5b72a0ce1d.tar.xz mullvadvpn-b0807e3ab9e423eb38a3b21ead3cca5b72a0ce1d.zip | |
Fix rx fragment stats in mullvad-masque-proxy
| -rw-r--r-- | mullvad-masque-proxy/src/client/mod.rs | 23 | ||||
| -rw-r--r-- | mullvad-masque-proxy/src/fragment.rs | 30 | ||||
| -rw-r--r-- | mullvad-masque-proxy/src/server/mod.rs | 8 |
3 files changed, 46 insertions, 15 deletions
diff --git a/mullvad-masque-proxy/src/client/mod.rs b/mullvad-masque-proxy/src/client/mod.rs index 6163c63d25..e9d19fabf0 100644 --- a/mullvad-masque-proxy/src/client/mod.rs +++ b/mullvad-masque-proxy/src/client/mod.rs @@ -28,7 +28,7 @@ use quinn::{ use crate::{ MASQUE_WELL_KNOWN_PATH, MAX_INFLIGHT_PACKETS, MIN_IPV4_MTU, MIN_IPV6_MTU, QUIC_HEADER_SIZE, compute_udp_payload_size, - fragment::{self, Fragments}, + fragment::{self, DefragReceived, Fragments}, stats::Stats, }; @@ -553,14 +553,27 @@ async fn client_socket_tx_task( continue; } let payload = response.into_payload(); + let original_payload_len = payload.len(); - if let Ok(Some(payload)) = fragments.handle_incoming_packet(payload) { - stats.rx(payload.len(), false /* TODO */); - + let send = async |payload: &[u8]| -> Result<()> { client_socket - .send_to(payload.chunk(), return_addr) + .send_to(payload, return_addr) .await .map_err(Error::ClientWrite)?; + Ok(()) + }; + + match fragments.handle_incoming_packet(payload) { + Ok(DefragReceived::Nonfragmented(payload)) => { + stats.rx(payload.len(), false); + send(payload.chunk()).await?; + } + Ok(DefragReceived::Reassembled(reassembled_payload)) => { + stats.rx(original_payload_len, true); + send(reassembled_payload.chunk()).await?; + } + Ok(DefragReceived::Fragment) => stats.rx(original_payload_len, true), + Err(_) => (), } } diff --git a/mullvad-masque-proxy/src/fragment.rs b/mullvad-masque-proxy/src/fragment.rs index f6e648af9e..534fb8bff5 100644 --- a/mullvad-masque-proxy/src/fragment.rs +++ b/mullvad-masque-proxy/src/fragment.rs @@ -50,15 +50,24 @@ impl Default for Fragments { } } +pub enum DefragReceived { + /// Received a whole packet without fragmentation + Nonfragmented(Bytes), + /// Received a fragment but was unable to reassemble the packet + Fragment, + /// Received reassembled packet + Reassembled(Bytes), +} + impl Fragments { // TODO: Let caller provide output buffer. pub fn handle_incoming_packet( &mut self, mut payload: Bytes, - ) -> Result<Option<Bytes>, DefragError> { + ) -> Result<DefragReceived, DefragError> { match VarInt::decode(&mut payload) { Ok(crate::HTTP_MASQUE_DATAGRAM_CONTEXT_ID) => { - return Ok(Some(payload)); + return Ok(DefragReceived::Nonfragmented(payload)); } Ok(crate::HTTP_MASQUE_FRAGMENTED_DATAGRAM_CONTEXT_ID) => {} unexpected_context_id => { @@ -95,7 +104,11 @@ impl Fragments { let fragments = self.fragment_map.entry(id).or_default(); fragments.push(fragment); - Ok(self.try_reassemble(id, fragment_count)) + let reassembled = self.try_reassemble(id, fragment_count) + .map(DefragReceived::Reassembled) + // TODO: This may also occur if a packet is discarded + .unwrap_or(DefragReceived::Fragment); + Ok(reassembled) } // TODO: Let caller provide output buffer. @@ -199,7 +212,7 @@ mod test { fragment_buf.shuffle(&mut thread_rng()); for fragment in fragment_buf { - if let Some(reconstructed_packet) = + if let DefragReceived::Reassembled(reconstructed_packet) = fragments.handle_incoming_packet(fragment).unwrap() { assert_eq!(payload.as_slice(), reconstructed_packet.as_ref()); @@ -237,7 +250,10 @@ mod test { let packet = fragments .handle_incoming_packet(fragment_buf.pop().unwrap()) .unwrap(); - assert!(packet.is_none(), "haven't sent all fragments yet"); + assert!( + matches!(packet, DefragReceived::Fragment), + "haven't sent all fragments yet" + ); // then send a bunch of fragments to fill the queue let mut bad_payload = Bytes::from([0u8; 2].to_vec()); @@ -253,11 +269,11 @@ mod test { let packet = fragments .handle_incoming_packet(incomplete_fragment.clone()) .unwrap(); - assert!(packet.is_none()); + assert!(matches!(packet, DefragReceived::Fragment)); } for fragment in fragment_buf { - if let Some(reconstructed_packet) = + if let DefragReceived::Reassembled(reconstructed_packet) = fragments.handle_incoming_packet(fragment).unwrap() { assert_eq!(payload.as_slice(), reconstructed_packet.as_ref()); diff --git a/mullvad-masque-proxy/src/server/mod.rs b/mullvad-masque-proxy/src/server/mod.rs index f8f224d485..3788ca7c80 100644 --- a/mullvad-masque-proxy/src/server/mod.rs +++ b/mullvad-masque-proxy/src/server/mod.rs @@ -22,7 +22,7 @@ use typed_builder::TypedBuilder; use crate::{ MASQUE_WELL_KNOWN_PATH, MAX_INFLIGHT_PACKETS, MIN_IPV4_MTU, MIN_IPV6_MTU, QUIC_HEADER_SIZE, compute_udp_payload_size, - fragment::{self, Fragments}, + fragment::{self, DefragReceived, Fragments}, }; #[derive(Debug, thiserror::Error)] @@ -311,8 +311,10 @@ async fn proxy_tx_task(udp_socket: impl AsRef<UdpSocket>, mut client_rx: mpsc::R let quic_payload = quic_datagram.into_payload(); let packet = match fragments.handle_incoming_packet(quic_payload) { - Ok(Some(packet)) => packet, - Ok(None) => continue, + Ok(DefragReceived::Reassembled(packet) | DefragReceived::Nonfragmented(packet)) => { + packet + } + Ok(DefragReceived::Fragment) => continue, Err(err) => { log::trace!("Failed to reassemble incoming packet: {err}"); continue; |
