diff options
| author | Sebastian Holmin <sebastian.holmin@mullvad.net> | 2025-09-02 15:38:51 +0200 |
|---|---|---|
| committer | Sebastian Holmin <sebastian.holmin@mullvad.net> | 2025-09-02 15:38:51 +0200 |
| commit | 498b66124cfc18c6b5cea22c23db8d767cd245f8 (patch) | |
| tree | 213bf7704fb4b741873269c97c5fe5be0e54ba9b | |
| parent | 83726134468eeeb07cf7269df3fee8267716b37b (diff) | |
| parent | 6f2e64da12861cc02512f75b35084209bf8655de (diff) | |
| download | mullvadvpn-498b66124cfc18c6b5cea22c23db8d767cd245f8.tar.xz mullvadvpn-498b66124cfc18c6b5cea22c23db8d767cd245f8.zip | |
Merge branch 'masque-benchmarks'
| -rw-r--r-- | Cargo.lock | 182 | ||||
| -rw-r--r-- | mullvad-masque-proxy/Cargo.toml | 8 | ||||
| -rw-r--r-- | mullvad-masque-proxy/benches/fragmentation.rs | 112 | ||||
| -rw-r--r-- | mullvad-masque-proxy/src/client/mod.rs | 6 | ||||
| -rw-r--r-- | mullvad-masque-proxy/src/fragment.rs | 94 | ||||
| -rw-r--r-- | mullvad-masque-proxy/src/lib.rs | 4 |
6 files changed, 359 insertions, 47 deletions
diff --git a/Cargo.lock b/Cargo.lock index c6b6a79e26..2285a260a2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -101,6 +101,12 @@ dependencies = [ ] [[package]] +name = "anes" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4b46cbb362ab8752921c97e041f5e366ee6297bd428a31275b9fcf1e380f7299" + +[[package]] name = "anstream" version = "0.6.13" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -572,6 +578,12 @@ dependencies = [ ] [[package]] +name = "cast" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5" + +[[package]] name = "cbindgen" version = "0.28.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -662,6 +674,33 @@ dependencies = [ ] [[package]] +name = "ciborium" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42e69ffd6f0917f5c029256a24d0161db17cea3997d185db0d35926308770f0e" +dependencies = [ + "ciborium-io", + "ciborium-ll", + "serde", +] + +[[package]] +name = "ciborium-io" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05afea1e0a06c9be33d539b876f1ce3692f4afea2cb41f740e7743225ed1c757" + +[[package]] +name = "ciborium-ll" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57663b653d948a338bfb3eeba9bb2fd5fcfaecb9e199e87e1eda4d9e8b240fd9" +dependencies = [ + "ciborium-io", + "half", +] + +[[package]] name = "cipher" version = "0.4.4" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -850,6 +889,39 @@ dependencies = [ ] [[package]] +name = "criterion" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1c047a62b0cc3e145fa84415a3191f628e980b194c2755aa12300a4e6cbd928" +dependencies = [ + "anes", + "cast", + "ciborium", + "clap", + "criterion-plot", + "itertools 0.13.0", + "num-traits", + "oorandom", + "plotters", + "rayon", + "regex", + "serde", + "serde_json", + "tinytemplate", + "walkdir", +] + +[[package]] +name = "criterion-plot" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b1bcc0dc7dfae599d84ad0b1a55f80cde8af3725da8313b528da95ef783e338" +dependencies = [ + "cast", + "itertools 0.13.0", +] + +[[package]] name = "crossbeam-channel" version = "0.5.15" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -859,12 +931,37 @@ dependencies = [ ] [[package]] +name = "crossbeam-deque" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9dd111b7b7f7d55b72c0a6ae361660ee5853c9af73f70c3c2ef6858b950e2e51" +dependencies = [ + "crossbeam-epoch", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-epoch" +version = "0.9.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e" +dependencies = [ + "crossbeam-utils", +] + +[[package]] name = "crossbeam-utils" version = "0.8.19" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "248e3bacc7dc6baa3b21e405ee045c3047101a49145e7e9eca583ab4c2ca5345" [[package]] +name = "crunchy" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "460fbee9c2c2f33933d720630a6a0bac33ba7053db5344fac858d4b8952d77d5" + +[[package]] name = "crypto-bigint" version = "0.5.5" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1709,6 +1806,16 @@ dependencies = [ ] [[package]] +name = "half" +version = "2.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "459196ed295495a68f7d7fe1d84f6c4b7ff0e21fe3017b2f283c6fac3ad803c9" +dependencies = [ + "cfg-if", + "crunchy", +] + +[[package]] name = "hashbrown" version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -2419,6 +2526,15 @@ dependencies = [ [[package]] name = "itertools" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "413ee7dfc52ee1a4949ceeb7dbc8a33f2d6c088194d9f922fb8318faf1f01186" +dependencies = [ + "either", +] + +[[package]] +name = "itertools" version = "0.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2b192c782037fadd9cfa75548310488aabdbf3d2da73885b31bd0abd03351285" @@ -3141,6 +3257,7 @@ dependencies = [ "anyhow", "bytes", "clap", + "criterion", "env_logger 0.11.7", "h3", "h3-datagram", @@ -3152,6 +3269,7 @@ dependencies = [ "rustls 0.23.18", "rustls-pemfile 2.1.3", "socket2 0.5.8", + "talpid-tunnel", "thiserror 2.0.9", "tokio", "typed-builder 0.21.0", @@ -3785,6 +3903,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "945462a4b81e43c4e3ba96bd7b49d834c6f61198356aa858733bc4acf3cbe62e" [[package]] +name = "oorandom" +version = "11.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d6790f58c7ff633d8771f42965289203411a5e5c68388703c06e14f24770b41e" + +[[package]] name = "opaque-debug" version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -4097,6 +4221,34 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d231b230927b5e4ad203db57bbcbee2802f6bce620b1e4a9024a07d94e2907ec" [[package]] +name = "plotters" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5aeb6f403d7a4911efb1e33402027fc44f29b5bf6def3effcc22d7bb75f2b747" +dependencies = [ + "num-traits", + "plotters-backend", + "plotters-svg", + "wasm-bindgen", + "web-sys", +] + +[[package]] +name = "plotters-backend" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df42e13c12958a16b3f7f4386b9ab1f3e7933914ecea48da7139435263a4172a" + +[[package]] +name = "plotters-svg" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51bae2ac328883f7acdfea3d66a7c35751187f870bc81f94563733a154d7a670" +dependencies = [ + "plotters-backend", +] + +[[package]] name = "pnet_base" version = "0.34.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -4601,6 +4753,26 @@ dependencies = [ ] [[package]] +name = "rayon" +version = "1.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "368f01d005bf8fd9b1206fb6fa653e6c4a81ceb1466406b81792d87c5677a58f" +dependencies = [ + "either", + "rayon-core", +] + +[[package]] +name = "rayon-core" +version = "1.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "22e18b0f0062d30d4230b2e85ff77fdfe4326feb054b9783a3460d8435c8ab91" +dependencies = [ + "crossbeam-deque", + "crossbeam-utils", +] + +[[package]] name = "redox_syscall" version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -5813,6 +5985,16 @@ dependencies = [ ] [[package]] +name = "tinytemplate" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be4d6b5f19ff7664e8c98d03e2139cb510db9b0a60b55f8e8709b689d939b6bc" +dependencies = [ + "serde", + "serde_json", +] + +[[package]] name = "tinyvec" version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" diff --git a/mullvad-masque-proxy/Cargo.toml b/mullvad-masque-proxy/Cargo.toml index 6f54734d1d..98fc75f0e4 100644 --- a/mullvad-masque-proxy/Cargo.toml +++ b/mullvad-masque-proxy/Cargo.toml @@ -29,6 +29,14 @@ env_logger = { workspace = true } tokio = { workspace = true, features = ["fs", "macros", "io-util", "rt-multi-thread"] } clap = { workspace = true } rand = "0.8.5" +criterion = { version = "0.7.0", features = ["html_reports"] } +talpid-tunnel = { path = "../talpid-tunnel" } + [lints] workspace = true + +[[bench]] +name = "fragmentation" +harness = false + diff --git a/mullvad-masque-proxy/benches/fragmentation.rs b/mullvad-masque-proxy/benches/fragmentation.rs new file mode 100644 index 0000000000..6b4793dd5a --- /dev/null +++ b/mullvad-masque-proxy/benches/fragmentation.rs @@ -0,0 +1,112 @@ +use bytes::Bytes; +use criterion::{BatchSize, BenchmarkId, Criterion, criterion_group, criterion_main}; +use mullvad_masque_proxy::{ + FRAGMENT_HEADER_SIZE_FRAGMENTED, + fragment::{FRAGMENT_BUFFER_CAP, Fragments, fragment_packet}, +}; +use rand::{SeedableRng, rngs::StdRng, seq::SliceRandom}; +use talpid_tunnel::IPV4_HEADER_SIZE; + +const MAX_PAYLOAD_SIZE: u16 = 1280 - FRAGMENT_HEADER_SIZE_FRAGMENTED - IPV4_HEADER_SIZE; + +fn assemble_fragment_ordered(c: &mut Criterion) { + let mut group = c.benchmark_group("fragmentation_reconstruction"); + + for (n_packets, payload_len) in [(10, 30000u16), (100, 1500)] { + let mut fragment_buf = Vec::with_capacity(FRAGMENT_BUFFER_CAP); + for i in 0..n_packets { + let packet_id = i; + let mut payload = Bytes::from(vec![i as u8; payload_len as usize]); + + fragment_buf.extend( + &mut fragment_packet( + MAX_PAYLOAD_SIZE + FRAGMENT_HEADER_SIZE_FRAGMENTED, + &mut payload, + packet_id, + ) + .unwrap(), + ); + } + let n_fragments = fragment_buf.len(); + assert!( + n_fragments <= FRAGMENT_BUFFER_CAP, + "Too many fragments generated" + ); + group.throughput(criterion::Throughput::Bytes( + (n_packets * payload_len) as u64, + )); + + group.bench_with_input( + BenchmarkId::new( + "assemble_fragment_ordered", + format!("{n_packets}pkts_{payload_len}B_{n_fragments}frags"), + ), + &fragment_buf, + |b, fragment_buf| { + b.iter_batched( + || (fragment_buf.clone(), Fragments::default()), + |(f, mut fragments)| { + for frag in f { + fragments.handle_incoming_packet(frag).unwrap(); + } + }, + BatchSize::SmallInput, + ) + }, + ); + } + group.finish(); +} + +fn assemble_fragment_random(c: &mut Criterion) { + let mut group = c.benchmark_group("fragmentation_reconstruction"); + + for (n_packets, payload_len) in [(10, 30000u16), (100, 1500)] { + let mut fragment_buf = Vec::with_capacity(FRAGMENT_BUFFER_CAP); + for i in 0..n_packets { + let packet_id = i; + let mut payload = Bytes::from(vec![i as u8; payload_len as usize]); + + fragment_buf.extend( + &mut fragment_packet( + MAX_PAYLOAD_SIZE + FRAGMENT_HEADER_SIZE_FRAGMENTED, + &mut payload, + packet_id, + ) + .unwrap(), + ); + } + let n_fragments = fragment_buf.len(); + assert!( + n_fragments <= FRAGMENT_BUFFER_CAP, + "Too many fragments generated" + ); + group.throughput(criterion::Throughput::Bytes( + (n_packets * payload_len) as u64, + )); + let mut rng = StdRng::seed_from_u64(42); + fragment_buf.shuffle(&mut rng); + + group.bench_with_input( + BenchmarkId::new( + "assemble_fragment_random", + format!("{n_packets}pkts_{payload_len}B_{n_fragments}frags"), + ), + &fragment_buf, + |b, fragment_buf| { + b.iter_batched( + || (fragment_buf.clone(), Fragments::default()), + |(f, mut fragments)| { + for frag in f { + fragments.handle_incoming_packet(frag).unwrap(); + } + }, + BatchSize::SmallInput, + ) + }, + ); + } + group.finish(); +} +criterion_group!(benches, assemble_fragment_ordered, assemble_fragment_random); +criterion_main!(benches); diff --git a/mullvad-masque-proxy/src/client/mod.rs b/mullvad-masque-proxy/src/client/mod.rs index 32983ccdef..8ab79ec715 100644 --- a/mullvad-masque-proxy/src/client/mod.rs +++ b/mullvad-masque-proxy/src/client/mod.rs @@ -573,7 +573,11 @@ async fn client_socket_tx_task( send(reassembled_payload.chunk()).await?; } Ok(DefragReceived::Fragment) => stats.rx(original_payload_len, true), - Err(_) => (), + Err(e) => { + use log::Level::*; + let level = if cfg!(debug_assertions) { Debug } else { Trace }; + log::log!(level, "Packet reassembly failed: {e}"); + } } } diff --git a/mullvad-masque-proxy/src/fragment.rs b/mullvad-masque-proxy/src/fragment.rs index 5cb05d132c..d4cfca5496 100644 --- a/mullvad-masque-proxy/src/fragment.rs +++ b/mullvad-masque-proxy/src/fragment.rs @@ -1,4 +1,4 @@ -use std::collections::{BTreeMap, VecDeque}; +use std::collections::{BTreeMap, VecDeque, btree_map}; use bytes::{Buf, BufMut, Bytes, BytesMut}; use h3::proto::varint::VarInt; @@ -10,7 +10,7 @@ const FRAGMENT_INDEX_START: u8 = 1; /// The maximum number of unassembled fragments that we buffer. // 255 is the theoretical maximum number of fragments for a single packet. -const FRAGMENT_BUFFER_CAP: usize = 255; +pub const FRAGMENT_BUFFER_CAP: usize = 255; pub struct Fragments { /// FIFO queue of fragment indices. Used to mitigate floods of unordered packet fragments. @@ -22,6 +22,8 @@ pub struct Fragments { /// Map of fragmented packets. /// /// If fragments are arriving in order, this should never hold more than one set of fragments. + /// + /// INVARIANT: The `Vec` is sorted by `Fragment::index` // TODO: would a hashmap be faster? fragment_map: BTreeMap<u16, Vec<Fragment>>, } @@ -34,6 +36,12 @@ pub enum DefragError { #[error("Payload is too small")] PayloadTooSmall, + + #[error("Too few fragments in fragmented packet")] + TooFewFragments, + + #[error("Received a fragment twice")] + DuplicateFragment, } // When a packet is larger than u16::MAX, it can't be fragmented. @@ -84,6 +92,10 @@ impl Fragments { let fragment_count = payload .try_get_u8() .map_err(|_| DefragError::PayloadTooSmall)?; + if fragment_count < 2 { + // Packets with only one fragment should be sent as non-fragmented packets. + return Err(DefragError::TooFewFragments); + } let fragment = Fragment { index, payload }; // ensure that the fifo has capacity before pushing the new fragment id @@ -101,49 +113,42 @@ impl Fragments { "fragment_index_fifo must never grow", ); - let fragments = self.fragment_map.entry(id).or_default(); - fragments.push(fragment); + let entry = self.fragment_map.entry(id); - let reassembled = self.try_reassemble(id, fragment_count) - .map(DefragReceived::Reassembled) - // TODO: This may also occur if a packet is discarded - .unwrap_or(DefragReceived::Fragment); - Ok(reassembled) - } + let mut entry = match entry { + btree_map::Entry::Occupied(occupied) => occupied, - // TODO: Let caller provide output buffer. - fn try_reassemble(&mut self, id: u16, fragment_count: u8) -> Option<Bytes> { - // establish that there are enough fragments to reconstruct the whole packet - let fragments = &self.fragment_map[&id]; - if fragments.len() != fragment_count.into() { - return None; - } + // if this is the first received fragment, don't bother trying to reassemble + btree_map::Entry::Vacant(vacant) => { + let mut fragment_list = Vec::with_capacity(2); // two fragments should be the norm + fragment_list.push(fragment); + vacant.insert(fragment_list); + return Ok(DefragReceived::Fragment); + } + }; - // looks like a valid fragment set. pop it from the map. - let mut fragments = self.fragment_map.remove(&id).expect("fragment must exist"); + let fragments = entry.get_mut(); - fragments.sort_unstable_by_key(|f| f.index); + // insert the fragment such that the list is sorted + match fragments.binary_search_by_key(&fragment.index, |f| f.index) { + Err(insert_here) => fragments.insert(insert_here, fragment), + Ok(_) => return Err(DefragError::DuplicateFragment), + }; - // assert that fragments are in the correct order - // TODO: is this excessively paranoid? - let fragments_missing = (FRAGMENT_INDEX_START..) - .zip(&fragments) - .any(|(expected_index, fragment)| fragment.index != expected_index); - if fragments_missing { - if cfg!(debug_assertions) { - log::debug!("Discarding unordered fragment set"); - } - return None; + // establish that there are enough fragments to reconstruct the whole packet + if fragments.len() != fragment_count.into() { + return Ok(DefragReceived::Fragment); } + let fragments = entry.remove(); + // smush the fragments together let mut payload = BytesMut::with_capacity(fragments.iter().map(|f| f.payload.len()).sum()); for fragment in fragments { payload.extend_from_slice(&fragment.payload); } - let payload = payload.freeze(); - Some(payload) + Ok(DefragReceived::Reassembled(payload.freeze())) } } @@ -203,9 +208,9 @@ mod test { fn test_fragment_reconstruction() { let mut fragments = Fragments::default(); - 'outer: for packet_id in 1..255u16 { + let max_payload_size = 50; + 'outer: for packet_id in max_payload_size..255u16 { let payload = (0..packet_id as u8).collect::<Vec<u8>>(); - let max_payload_size = 50; let mut payload_clone = Bytes::from(payload.clone()); let mut fragment_buf = fragment_packet(max_payload_size, &mut payload_clone, packet_id) @@ -268,8 +273,8 @@ mod test { let fragment_survives_flood = |number_of_bad_fragments| { let mut fragments = Fragments::default(); - let packet_id = 123; - let bad_packet_id = 321; + let packet_id = 1; + let mut bad_packet_ids = 2..0xffff; let payload = (0..255).collect::<Vec<u8>>(); let max_payload_size = 50; @@ -292,15 +297,16 @@ mod test { // then send a bunch of fragments to fill the queue let mut bad_payload = Bytes::from([0u8; 2].to_vec()); - let incomplete_fragment = fragment_packet( - 1 + FRAGMENT_HEADER_SIZE_FRAGMENTED, - &mut bad_payload, - bad_packet_id, - ) - .unwrap() - .next() - .unwrap(); for _ in fragment_buf.len()..number_of_bad_fragments { + let incomplete_fragment = fragment_packet( + 1 + FRAGMENT_HEADER_SIZE_FRAGMENTED, + &mut bad_payload, + bad_packet_ids.next().unwrap(), + ) + .unwrap() + .next() + .unwrap(); + let packet = fragments .handle_incoming_packet(incomplete_fragment.clone()) .unwrap(); diff --git a/mullvad-masque-proxy/src/lib.rs b/mullvad-masque-proxy/src/lib.rs index 7d22dc11d7..37f8353acd 100644 --- a/mullvad-masque-proxy/src/lib.rs +++ b/mullvad-masque-proxy/src/lib.rs @@ -2,7 +2,7 @@ use h3::proto::varint::VarInt; use std::net::SocketAddr; pub mod client; -mod fragment; +pub mod fragment; pub mod server; mod stats; @@ -19,7 +19,7 @@ const PACKET_BUFFER_SIZE: usize = (u16::MAX - UDP_HEADER_SIZE + 1) as usize; const MAX_INFLIGHT_PACKETS: usize = 100; /// Fragment headers size for fragmented packets -const FRAGMENT_HEADER_SIZE_FRAGMENTED: u16 = 5; +pub const FRAGMENT_HEADER_SIZE_FRAGMENTED: u16 = 5; /// UDP header overhead const UDP_HEADER_SIZE: u16 = 8; |
