summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorDavid Lönnhager <david.l@mullvad.net>2025-08-15 11:47:58 +0200
committerDavid Lönnhager <david.l@mullvad.net>2025-08-18 18:11:09 +0200
commitb0807e3ab9e423eb38a3b21ead3cca5b72a0ce1d (patch)
tree4288d670630c4368a171c08370be13db262b8183
parent081f1e74b9dab7feb134cac439d873c38b4927ce (diff)
downloadmullvadvpn-b0807e3ab9e423eb38a3b21ead3cca5b72a0ce1d.tar.xz
mullvadvpn-b0807e3ab9e423eb38a3b21ead3cca5b72a0ce1d.zip
Fix rx fragment stats in mullvad-masque-proxy
-rw-r--r--mullvad-masque-proxy/src/client/mod.rs23
-rw-r--r--mullvad-masque-proxy/src/fragment.rs30
-rw-r--r--mullvad-masque-proxy/src/server/mod.rs8
3 files changed, 46 insertions, 15 deletions
diff --git a/mullvad-masque-proxy/src/client/mod.rs b/mullvad-masque-proxy/src/client/mod.rs
index 6163c63d25..e9d19fabf0 100644
--- a/mullvad-masque-proxy/src/client/mod.rs
+++ b/mullvad-masque-proxy/src/client/mod.rs
@@ -28,7 +28,7 @@ use quinn::{
use crate::{
MASQUE_WELL_KNOWN_PATH, MAX_INFLIGHT_PACKETS, MIN_IPV4_MTU, MIN_IPV6_MTU, QUIC_HEADER_SIZE,
compute_udp_payload_size,
- fragment::{self, Fragments},
+ fragment::{self, DefragReceived, Fragments},
stats::Stats,
};
@@ -553,14 +553,27 @@ async fn client_socket_tx_task(
continue;
}
let payload = response.into_payload();
+ let original_payload_len = payload.len();
- if let Ok(Some(payload)) = fragments.handle_incoming_packet(payload) {
- stats.rx(payload.len(), false /* TODO */);
-
+ let send = async |payload: &[u8]| -> Result<()> {
client_socket
- .send_to(payload.chunk(), return_addr)
+ .send_to(payload, return_addr)
.await
.map_err(Error::ClientWrite)?;
+ Ok(())
+ };
+
+ match fragments.handle_incoming_packet(payload) {
+ Ok(DefragReceived::Nonfragmented(payload)) => {
+ stats.rx(payload.len(), false);
+ send(payload.chunk()).await?;
+ }
+ Ok(DefragReceived::Reassembled(reassembled_payload)) => {
+ stats.rx(original_payload_len, true);
+ send(reassembled_payload.chunk()).await?;
+ }
+ Ok(DefragReceived::Fragment) => stats.rx(original_payload_len, true),
+ Err(_) => (),
}
}
diff --git a/mullvad-masque-proxy/src/fragment.rs b/mullvad-masque-proxy/src/fragment.rs
index f6e648af9e..534fb8bff5 100644
--- a/mullvad-masque-proxy/src/fragment.rs
+++ b/mullvad-masque-proxy/src/fragment.rs
@@ -50,15 +50,24 @@ impl Default for Fragments {
}
}
+pub enum DefragReceived {
+ /// Received a whole packet without fragmentation
+ Nonfragmented(Bytes),
+ /// Received a fragment but was unable to reassemble the packet
+ Fragment,
+ /// Received reassembled packet
+ Reassembled(Bytes),
+}
+
impl Fragments {
// TODO: Let caller provide output buffer.
pub fn handle_incoming_packet(
&mut self,
mut payload: Bytes,
- ) -> Result<Option<Bytes>, DefragError> {
+ ) -> Result<DefragReceived, DefragError> {
match VarInt::decode(&mut payload) {
Ok(crate::HTTP_MASQUE_DATAGRAM_CONTEXT_ID) => {
- return Ok(Some(payload));
+ return Ok(DefragReceived::Nonfragmented(payload));
}
Ok(crate::HTTP_MASQUE_FRAGMENTED_DATAGRAM_CONTEXT_ID) => {}
unexpected_context_id => {
@@ -95,7 +104,11 @@ impl Fragments {
let fragments = self.fragment_map.entry(id).or_default();
fragments.push(fragment);
- Ok(self.try_reassemble(id, fragment_count))
+ let reassembled = self.try_reassemble(id, fragment_count)
+ .map(DefragReceived::Reassembled)
+ // TODO: This may also occur if a packet is discarded
+ .unwrap_or(DefragReceived::Fragment);
+ Ok(reassembled)
}
// TODO: Let caller provide output buffer.
@@ -199,7 +212,7 @@ mod test {
fragment_buf.shuffle(&mut thread_rng());
for fragment in fragment_buf {
- if let Some(reconstructed_packet) =
+ if let DefragReceived::Reassembled(reconstructed_packet) =
fragments.handle_incoming_packet(fragment).unwrap()
{
assert_eq!(payload.as_slice(), reconstructed_packet.as_ref());
@@ -237,7 +250,10 @@ mod test {
let packet = fragments
.handle_incoming_packet(fragment_buf.pop().unwrap())
.unwrap();
- assert!(packet.is_none(), "haven't sent all fragments yet");
+ assert!(
+ matches!(packet, DefragReceived::Fragment),
+ "haven't sent all fragments yet"
+ );
// then send a bunch of fragments to fill the queue
let mut bad_payload = Bytes::from([0u8; 2].to_vec());
@@ -253,11 +269,11 @@ mod test {
let packet = fragments
.handle_incoming_packet(incomplete_fragment.clone())
.unwrap();
- assert!(packet.is_none());
+ assert!(matches!(packet, DefragReceived::Fragment));
}
for fragment in fragment_buf {
- if let Some(reconstructed_packet) =
+ if let DefragReceived::Reassembled(reconstructed_packet) =
fragments.handle_incoming_packet(fragment).unwrap()
{
assert_eq!(payload.as_slice(), reconstructed_packet.as_ref());
diff --git a/mullvad-masque-proxy/src/server/mod.rs b/mullvad-masque-proxy/src/server/mod.rs
index f8f224d485..3788ca7c80 100644
--- a/mullvad-masque-proxy/src/server/mod.rs
+++ b/mullvad-masque-proxy/src/server/mod.rs
@@ -22,7 +22,7 @@ use typed_builder::TypedBuilder;
use crate::{
MASQUE_WELL_KNOWN_PATH, MAX_INFLIGHT_PACKETS, MIN_IPV4_MTU, MIN_IPV6_MTU, QUIC_HEADER_SIZE,
compute_udp_payload_size,
- fragment::{self, Fragments},
+ fragment::{self, DefragReceived, Fragments},
};
#[derive(Debug, thiserror::Error)]
@@ -311,8 +311,10 @@ async fn proxy_tx_task(udp_socket: impl AsRef<UdpSocket>, mut client_rx: mpsc::R
let quic_payload = quic_datagram.into_payload();
let packet = match fragments.handle_incoming_packet(quic_payload) {
- Ok(Some(packet)) => packet,
- Ok(None) => continue,
+ Ok(DefragReceived::Reassembled(packet) | DefragReceived::Nonfragmented(packet)) => {
+ packet
+ }
+ Ok(DefragReceived::Fragment) => continue,
Err(err) => {
log::trace!("Failed to reassemble incoming packet: {err}");
continue;