summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorSebastian Holmin <sebastian.holmin@mullvad.net>2025-09-02 15:38:51 +0200
committerSebastian Holmin <sebastian.holmin@mullvad.net>2025-09-02 15:38:51 +0200
commit498b66124cfc18c6b5cea22c23db8d767cd245f8 (patch)
tree213bf7704fb4b741873269c97c5fe5be0e54ba9b
parent83726134468eeeb07cf7269df3fee8267716b37b (diff)
parent6f2e64da12861cc02512f75b35084209bf8655de (diff)
downloadmullvadvpn-498b66124cfc18c6b5cea22c23db8d767cd245f8.tar.xz
mullvadvpn-498b66124cfc18c6b5cea22c23db8d767cd245f8.zip
Merge branch 'masque-benchmarks'
-rw-r--r--Cargo.lock182
-rw-r--r--mullvad-masque-proxy/Cargo.toml8
-rw-r--r--mullvad-masque-proxy/benches/fragmentation.rs112
-rw-r--r--mullvad-masque-proxy/src/client/mod.rs6
-rw-r--r--mullvad-masque-proxy/src/fragment.rs94
-rw-r--r--mullvad-masque-proxy/src/lib.rs4
6 files changed, 359 insertions, 47 deletions
diff --git a/Cargo.lock b/Cargo.lock
index c6b6a79e26..2285a260a2 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -101,6 +101,12 @@ dependencies = [
]
[[package]]
+name = "anes"
+version = "0.1.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4b46cbb362ab8752921c97e041f5e366ee6297bd428a31275b9fcf1e380f7299"
+
+[[package]]
name = "anstream"
version = "0.6.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -572,6 +578,12 @@ dependencies = [
]
[[package]]
+name = "cast"
+version = "0.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5"
+
+[[package]]
name = "cbindgen"
version = "0.28.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -662,6 +674,33 @@ dependencies = [
]
[[package]]
+name = "ciborium"
+version = "0.2.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "42e69ffd6f0917f5c029256a24d0161db17cea3997d185db0d35926308770f0e"
+dependencies = [
+ "ciborium-io",
+ "ciborium-ll",
+ "serde",
+]
+
+[[package]]
+name = "ciborium-io"
+version = "0.2.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "05afea1e0a06c9be33d539b876f1ce3692f4afea2cb41f740e7743225ed1c757"
+
+[[package]]
+name = "ciborium-ll"
+version = "0.2.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "57663b653d948a338bfb3eeba9bb2fd5fcfaecb9e199e87e1eda4d9e8b240fd9"
+dependencies = [
+ "ciborium-io",
+ "half",
+]
+
+[[package]]
name = "cipher"
version = "0.4.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -850,6 +889,39 @@ dependencies = [
]
[[package]]
+name = "criterion"
+version = "0.7.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e1c047a62b0cc3e145fa84415a3191f628e980b194c2755aa12300a4e6cbd928"
+dependencies = [
+ "anes",
+ "cast",
+ "ciborium",
+ "clap",
+ "criterion-plot",
+ "itertools 0.13.0",
+ "num-traits",
+ "oorandom",
+ "plotters",
+ "rayon",
+ "regex",
+ "serde",
+ "serde_json",
+ "tinytemplate",
+ "walkdir",
+]
+
+[[package]]
+name = "criterion-plot"
+version = "0.6.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9b1bcc0dc7dfae599d84ad0b1a55f80cde8af3725da8313b528da95ef783e338"
+dependencies = [
+ "cast",
+ "itertools 0.13.0",
+]
+
+[[package]]
name = "crossbeam-channel"
version = "0.5.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -859,12 +931,37 @@ dependencies = [
]
[[package]]
+name = "crossbeam-deque"
+version = "0.8.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9dd111b7b7f7d55b72c0a6ae361660ee5853c9af73f70c3c2ef6858b950e2e51"
+dependencies = [
+ "crossbeam-epoch",
+ "crossbeam-utils",
+]
+
+[[package]]
+name = "crossbeam-epoch"
+version = "0.9.18"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e"
+dependencies = [
+ "crossbeam-utils",
+]
+
+[[package]]
name = "crossbeam-utils"
version = "0.8.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "248e3bacc7dc6baa3b21e405ee045c3047101a49145e7e9eca583ab4c2ca5345"
[[package]]
+name = "crunchy"
+version = "0.2.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "460fbee9c2c2f33933d720630a6a0bac33ba7053db5344fac858d4b8952d77d5"
+
+[[package]]
name = "crypto-bigint"
version = "0.5.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1709,6 +1806,16 @@ dependencies = [
]
[[package]]
+name = "half"
+version = "2.6.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "459196ed295495a68f7d7fe1d84f6c4b7ff0e21fe3017b2f283c6fac3ad803c9"
+dependencies = [
+ "cfg-if",
+ "crunchy",
+]
+
+[[package]]
name = "hashbrown"
version = "0.12.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -2419,6 +2526,15 @@ dependencies = [
[[package]]
name = "itertools"
+version = "0.13.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "413ee7dfc52ee1a4949ceeb7dbc8a33f2d6c088194d9f922fb8318faf1f01186"
+dependencies = [
+ "either",
+]
+
+[[package]]
+name = "itertools"
version = "0.14.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2b192c782037fadd9cfa75548310488aabdbf3d2da73885b31bd0abd03351285"
@@ -3141,6 +3257,7 @@ dependencies = [
"anyhow",
"bytes",
"clap",
+ "criterion",
"env_logger 0.11.7",
"h3",
"h3-datagram",
@@ -3152,6 +3269,7 @@ dependencies = [
"rustls 0.23.18",
"rustls-pemfile 2.1.3",
"socket2 0.5.8",
+ "talpid-tunnel",
"thiserror 2.0.9",
"tokio",
"typed-builder 0.21.0",
@@ -3785,6 +3903,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "945462a4b81e43c4e3ba96bd7b49d834c6f61198356aa858733bc4acf3cbe62e"
[[package]]
+name = "oorandom"
+version = "11.1.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d6790f58c7ff633d8771f42965289203411a5e5c68388703c06e14f24770b41e"
+
+[[package]]
name = "opaque-debug"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -4097,6 +4221,34 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d231b230927b5e4ad203db57bbcbee2802f6bce620b1e4a9024a07d94e2907ec"
[[package]]
+name = "plotters"
+version = "0.3.7"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5aeb6f403d7a4911efb1e33402027fc44f29b5bf6def3effcc22d7bb75f2b747"
+dependencies = [
+ "num-traits",
+ "plotters-backend",
+ "plotters-svg",
+ "wasm-bindgen",
+ "web-sys",
+]
+
+[[package]]
+name = "plotters-backend"
+version = "0.3.7"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "df42e13c12958a16b3f7f4386b9ab1f3e7933914ecea48da7139435263a4172a"
+
+[[package]]
+name = "plotters-svg"
+version = "0.3.7"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "51bae2ac328883f7acdfea3d66a7c35751187f870bc81f94563733a154d7a670"
+dependencies = [
+ "plotters-backend",
+]
+
+[[package]]
name = "pnet_base"
version = "0.34.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -4601,6 +4753,26 @@ dependencies = [
]
[[package]]
+name = "rayon"
+version = "1.11.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "368f01d005bf8fd9b1206fb6fa653e6c4a81ceb1466406b81792d87c5677a58f"
+dependencies = [
+ "either",
+ "rayon-core",
+]
+
+[[package]]
+name = "rayon-core"
+version = "1.13.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "22e18b0f0062d30d4230b2e85ff77fdfe4326feb054b9783a3460d8435c8ab91"
+dependencies = [
+ "crossbeam-deque",
+ "crossbeam-utils",
+]
+
+[[package]]
name = "redox_syscall"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -5813,6 +5985,16 @@ dependencies = [
]
[[package]]
+name = "tinytemplate"
+version = "1.2.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "be4d6b5f19ff7664e8c98d03e2139cb510db9b0a60b55f8e8709b689d939b6bc"
+dependencies = [
+ "serde",
+ "serde_json",
+]
+
+[[package]]
name = "tinyvec"
version = "1.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
diff --git a/mullvad-masque-proxy/Cargo.toml b/mullvad-masque-proxy/Cargo.toml
index 6f54734d1d..98fc75f0e4 100644
--- a/mullvad-masque-proxy/Cargo.toml
+++ b/mullvad-masque-proxy/Cargo.toml
@@ -29,6 +29,14 @@ env_logger = { workspace = true }
tokio = { workspace = true, features = ["fs", "macros", "io-util", "rt-multi-thread"] }
clap = { workspace = true }
rand = "0.8.5"
+criterion = { version = "0.7.0", features = ["html_reports"] }
+talpid-tunnel = { path = "../talpid-tunnel" }
+
[lints]
workspace = true
+
+[[bench]]
+name = "fragmentation"
+harness = false
+
diff --git a/mullvad-masque-proxy/benches/fragmentation.rs b/mullvad-masque-proxy/benches/fragmentation.rs
new file mode 100644
index 0000000000..6b4793dd5a
--- /dev/null
+++ b/mullvad-masque-proxy/benches/fragmentation.rs
@@ -0,0 +1,112 @@
+use bytes::Bytes;
+use criterion::{BatchSize, BenchmarkId, Criterion, criterion_group, criterion_main};
+use mullvad_masque_proxy::{
+ FRAGMENT_HEADER_SIZE_FRAGMENTED,
+ fragment::{FRAGMENT_BUFFER_CAP, Fragments, fragment_packet},
+};
+use rand::{SeedableRng, rngs::StdRng, seq::SliceRandom};
+use talpid_tunnel::IPV4_HEADER_SIZE;
+
+const MAX_PAYLOAD_SIZE: u16 = 1280 - FRAGMENT_HEADER_SIZE_FRAGMENTED - IPV4_HEADER_SIZE;
+
+fn assemble_fragment_ordered(c: &mut Criterion) {
+ let mut group = c.benchmark_group("fragmentation_reconstruction");
+
+ for (n_packets, payload_len) in [(10, 30000u16), (100, 1500)] {
+ let mut fragment_buf = Vec::with_capacity(FRAGMENT_BUFFER_CAP);
+ for i in 0..n_packets {
+ let packet_id = i;
+ let mut payload = Bytes::from(vec![i as u8; payload_len as usize]);
+
+ fragment_buf.extend(
+ &mut fragment_packet(
+ MAX_PAYLOAD_SIZE + FRAGMENT_HEADER_SIZE_FRAGMENTED,
+ &mut payload,
+ packet_id,
+ )
+ .unwrap(),
+ );
+ }
+ let n_fragments = fragment_buf.len();
+ assert!(
+ n_fragments <= FRAGMENT_BUFFER_CAP,
+ "Too many fragments generated"
+ );
+ group.throughput(criterion::Throughput::Bytes(
+ (n_packets * payload_len) as u64,
+ ));
+
+ group.bench_with_input(
+ BenchmarkId::new(
+ "assemble_fragment_ordered",
+ format!("{n_packets}pkts_{payload_len}B_{n_fragments}frags"),
+ ),
+ &fragment_buf,
+ |b, fragment_buf| {
+ b.iter_batched(
+ || (fragment_buf.clone(), Fragments::default()),
+ |(f, mut fragments)| {
+ for frag in f {
+ fragments.handle_incoming_packet(frag).unwrap();
+ }
+ },
+ BatchSize::SmallInput,
+ )
+ },
+ );
+ }
+ group.finish();
+}
+
+fn assemble_fragment_random(c: &mut Criterion) {
+ let mut group = c.benchmark_group("fragmentation_reconstruction");
+
+ for (n_packets, payload_len) in [(10, 30000u16), (100, 1500)] {
+ let mut fragment_buf = Vec::with_capacity(FRAGMENT_BUFFER_CAP);
+ for i in 0..n_packets {
+ let packet_id = i;
+ let mut payload = Bytes::from(vec![i as u8; payload_len as usize]);
+
+ fragment_buf.extend(
+ &mut fragment_packet(
+ MAX_PAYLOAD_SIZE + FRAGMENT_HEADER_SIZE_FRAGMENTED,
+ &mut payload,
+ packet_id,
+ )
+ .unwrap(),
+ );
+ }
+ let n_fragments = fragment_buf.len();
+ assert!(
+ n_fragments <= FRAGMENT_BUFFER_CAP,
+ "Too many fragments generated"
+ );
+ group.throughput(criterion::Throughput::Bytes(
+ (n_packets * payload_len) as u64,
+ ));
+ let mut rng = StdRng::seed_from_u64(42);
+ fragment_buf.shuffle(&mut rng);
+
+ group.bench_with_input(
+ BenchmarkId::new(
+ "assemble_fragment_random",
+ format!("{n_packets}pkts_{payload_len}B_{n_fragments}frags"),
+ ),
+ &fragment_buf,
+ |b, fragment_buf| {
+ b.iter_batched(
+ || (fragment_buf.clone(), Fragments::default()),
+ |(f, mut fragments)| {
+ for frag in f {
+ fragments.handle_incoming_packet(frag).unwrap();
+ }
+ },
+ BatchSize::SmallInput,
+ )
+ },
+ );
+ }
+ group.finish();
+}
+criterion_group!(benches, assemble_fragment_ordered, assemble_fragment_random);
+criterion_main!(benches);
diff --git a/mullvad-masque-proxy/src/client/mod.rs b/mullvad-masque-proxy/src/client/mod.rs
index 32983ccdef..8ab79ec715 100644
--- a/mullvad-masque-proxy/src/client/mod.rs
+++ b/mullvad-masque-proxy/src/client/mod.rs
@@ -573,7 +573,11 @@ async fn client_socket_tx_task(
send(reassembled_payload.chunk()).await?;
}
Ok(DefragReceived::Fragment) => stats.rx(original_payload_len, true),
- Err(_) => (),
+ Err(e) => {
+ use log::Level::*;
+ let level = if cfg!(debug_assertions) { Debug } else { Trace };
+ log::log!(level, "Packet reassembly failed: {e}");
+ }
}
}
diff --git a/mullvad-masque-proxy/src/fragment.rs b/mullvad-masque-proxy/src/fragment.rs
index 5cb05d132c..d4cfca5496 100644
--- a/mullvad-masque-proxy/src/fragment.rs
+++ b/mullvad-masque-proxy/src/fragment.rs
@@ -1,4 +1,4 @@
-use std::collections::{BTreeMap, VecDeque};
+use std::collections::{BTreeMap, VecDeque, btree_map};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use h3::proto::varint::VarInt;
@@ -10,7 +10,7 @@ const FRAGMENT_INDEX_START: u8 = 1;
/// The maximum number of unassembled fragments that we buffer.
// 255 is the theoretical maximum number of fragments for a single packet.
-const FRAGMENT_BUFFER_CAP: usize = 255;
+pub const FRAGMENT_BUFFER_CAP: usize = 255;
pub struct Fragments {
/// FIFO queue of fragment indices. Used to mitigate floods of unordered packet fragments.
@@ -22,6 +22,8 @@ pub struct Fragments {
/// Map of fragmented packets.
///
/// If fragments are arriving in order, this should never hold more than one set of fragments.
+ ///
+ /// INVARIANT: The `Vec` is sorted by `Fragment::index`
// TODO: would a hashmap be faster?
fragment_map: BTreeMap<u16, Vec<Fragment>>,
}
@@ -34,6 +36,12 @@ pub enum DefragError {
#[error("Payload is too small")]
PayloadTooSmall,
+
+ #[error("Too few fragments in fragmented packet")]
+ TooFewFragments,
+
+ #[error("Received a fragment twice")]
+ DuplicateFragment,
}
// When a packet is larger than u16::MAX, it can't be fragmented.
@@ -84,6 +92,10 @@ impl Fragments {
let fragment_count = payload
.try_get_u8()
.map_err(|_| DefragError::PayloadTooSmall)?;
+ if fragment_count < 2 {
+ // Packets with only one fragment should be sent as non-fragmented packets.
+ return Err(DefragError::TooFewFragments);
+ }
let fragment = Fragment { index, payload };
// ensure that the fifo has capacity before pushing the new fragment id
@@ -101,49 +113,42 @@ impl Fragments {
"fragment_index_fifo must never grow",
);
- let fragments = self.fragment_map.entry(id).or_default();
- fragments.push(fragment);
+ let entry = self.fragment_map.entry(id);
- let reassembled = self.try_reassemble(id, fragment_count)
- .map(DefragReceived::Reassembled)
- // TODO: This may also occur if a packet is discarded
- .unwrap_or(DefragReceived::Fragment);
- Ok(reassembled)
- }
+ let mut entry = match entry {
+ btree_map::Entry::Occupied(occupied) => occupied,
- // TODO: Let caller provide output buffer.
- fn try_reassemble(&mut self, id: u16, fragment_count: u8) -> Option<Bytes> {
- // establish that there are enough fragments to reconstruct the whole packet
- let fragments = &self.fragment_map[&id];
- if fragments.len() != fragment_count.into() {
- return None;
- }
+ // if this is the first received fragment, don't bother trying to reassemble
+ btree_map::Entry::Vacant(vacant) => {
+ let mut fragment_list = Vec::with_capacity(2); // two fragments should be the norm
+ fragment_list.push(fragment);
+ vacant.insert(fragment_list);
+ return Ok(DefragReceived::Fragment);
+ }
+ };
- // looks like a valid fragment set. pop it from the map.
- let mut fragments = self.fragment_map.remove(&id).expect("fragment must exist");
+ let fragments = entry.get_mut();
- fragments.sort_unstable_by_key(|f| f.index);
+ // insert the fragment such that the list is sorted
+ match fragments.binary_search_by_key(&fragment.index, |f| f.index) {
+ Err(insert_here) => fragments.insert(insert_here, fragment),
+ Ok(_) => return Err(DefragError::DuplicateFragment),
+ };
- // assert that fragments are in the correct order
- // TODO: is this excessively paranoid?
- let fragments_missing = (FRAGMENT_INDEX_START..)
- .zip(&fragments)
- .any(|(expected_index, fragment)| fragment.index != expected_index);
- if fragments_missing {
- if cfg!(debug_assertions) {
- log::debug!("Discarding unordered fragment set");
- }
- return None;
+ // establish that there are enough fragments to reconstruct the whole packet
+ if fragments.len() != fragment_count.into() {
+ return Ok(DefragReceived::Fragment);
}
+ let fragments = entry.remove();
+
// smush the fragments together
let mut payload = BytesMut::with_capacity(fragments.iter().map(|f| f.payload.len()).sum());
for fragment in fragments {
payload.extend_from_slice(&fragment.payload);
}
- let payload = payload.freeze();
- Some(payload)
+ Ok(DefragReceived::Reassembled(payload.freeze()))
}
}
@@ -203,9 +208,9 @@ mod test {
fn test_fragment_reconstruction() {
let mut fragments = Fragments::default();
- 'outer: for packet_id in 1..255u16 {
+ let max_payload_size = 50;
+ 'outer: for packet_id in max_payload_size..255u16 {
let payload = (0..packet_id as u8).collect::<Vec<u8>>();
- let max_payload_size = 50;
let mut payload_clone = Bytes::from(payload.clone());
let mut fragment_buf = fragment_packet(max_payload_size, &mut payload_clone, packet_id)
@@ -268,8 +273,8 @@ mod test {
let fragment_survives_flood = |number_of_bad_fragments| {
let mut fragments = Fragments::default();
- let packet_id = 123;
- let bad_packet_id = 321;
+ let packet_id = 1;
+ let mut bad_packet_ids = 2..0xffff;
let payload = (0..255).collect::<Vec<u8>>();
let max_payload_size = 50;
@@ -292,15 +297,16 @@ mod test {
// then send a bunch of fragments to fill the queue
let mut bad_payload = Bytes::from([0u8; 2].to_vec());
- let incomplete_fragment = fragment_packet(
- 1 + FRAGMENT_HEADER_SIZE_FRAGMENTED,
- &mut bad_payload,
- bad_packet_id,
- )
- .unwrap()
- .next()
- .unwrap();
for _ in fragment_buf.len()..number_of_bad_fragments {
+ let incomplete_fragment = fragment_packet(
+ 1 + FRAGMENT_HEADER_SIZE_FRAGMENTED,
+ &mut bad_payload,
+ bad_packet_ids.next().unwrap(),
+ )
+ .unwrap()
+ .next()
+ .unwrap();
+
let packet = fragments
.handle_incoming_packet(incomplete_fragment.clone())
.unwrap();
diff --git a/mullvad-masque-proxy/src/lib.rs b/mullvad-masque-proxy/src/lib.rs
index 7d22dc11d7..37f8353acd 100644
--- a/mullvad-masque-proxy/src/lib.rs
+++ b/mullvad-masque-proxy/src/lib.rs
@@ -2,7 +2,7 @@ use h3::proto::varint::VarInt;
use std::net::SocketAddr;
pub mod client;
-mod fragment;
+pub mod fragment;
pub mod server;
mod stats;
@@ -19,7 +19,7 @@ const PACKET_BUFFER_SIZE: usize = (u16::MAX - UDP_HEADER_SIZE + 1) as usize;
const MAX_INFLIGHT_PACKETS: usize = 100;
/// Fragment headers size for fragmented packets
-const FRAGMENT_HEADER_SIZE_FRAGMENTED: u16 = 5;
+pub const FRAGMENT_HEADER_SIZE_FRAGMENTED: u16 = 5;
/// UDP header overhead
const UDP_HEADER_SIZE: u16 = 8;