diff options
| author | Joakim Hulthe <joakim.hulthe@mullvad.net> | 2025-04-07 14:15:31 +0200 |
|---|---|---|
| committer | Joakim Hulthe <joakim.hulthe@mullvad.net> | 2025-04-09 18:02:55 +0200 |
| commit | f0547062d88f468d304bd5bec5937f1e0786cee3 (patch) | |
| tree | a1e4f52234ca20754ece617fb4cc374d14c54362 | |
| parent | 6af30eecaca4cfa8a9629894caa7efdc49bb24a6 (diff) | |
| download | mullvadvpn-f0547062d88f468d304bd5bec5937f1e0786cee3.tar.xz mullvadvpn-f0547062d88f468d304bd5bec5937f1e0786cee3.zip | |
Include fragmented bytes in stats
| -rw-r--r-- | mullvad-masque-proxy/src/client/mod.rs | 31 | ||||
| -rw-r--r-- | mullvad-masque-proxy/src/lib.rs | 1 | ||||
| -rw-r--r-- | mullvad-masque-proxy/src/stats.rs | 46 |
3 files changed, 58 insertions, 20 deletions
diff --git a/mullvad-masque-proxy/src/client/mod.rs b/mullvad-masque-proxy/src/client/mod.rs index 67694b9ec1..424a26b8d9 100644 --- a/mullvad-masque-proxy/src/client/mod.rs +++ b/mullvad-masque-proxy/src/client/mod.rs @@ -4,10 +4,7 @@ use std::{ fs, future, io, net::{Ipv4Addr, SocketAddr}, path::Path, - sync::{ - atomic::{AtomicUsize, Ordering}, - Arc, LazyLock, - }, + sync::{Arc, LazyLock}, time::{Duration, Instant}, }; use tokio::{net::UdpSocket, time::interval}; @@ -17,7 +14,10 @@ use h3_datagram::datagram_traits::HandleDatagramsExt; use http::{header, uri::Scheme, Response, StatusCode}; use quinn::{crypto::rustls::QuicClientConfig, ClientConfig, Endpoint, TransportConfig}; -use crate::fragment::{self, Fragments}; +use crate::{ + fragment::{self, Fragments}, + stats::Stats, +}; const MAX_HEADER_SIZE: u64 = 8192; @@ -37,19 +37,8 @@ pub struct Client { fragments: Fragments, /// Maximum packet size maximum_packet_size: u16, - stats: Arc<Stats>, -} - -#[derive(Debug, Default)] -struct Stats { - rx_bytes: AtomicUsize, - tx_bytes: AtomicUsize, -} -impl Drop for Stats { - fn drop(&mut self) { - println!("stats: {:?}", self); - } + stats: Arc<Stats>, } pub type Result<T> = std::result::Result<T, Error>; @@ -235,8 +224,6 @@ impl Client { let (bytes_received, recv_addr) = client_read.map_err(Error::ClientRead)?; return_addr = recv_addr; - self.stats.tx_bytes.fetch_add(bytes_received, Ordering::Relaxed); - /*if prev_stats.elapsed() >= Duration::from_secs(3) { prev_stats = Instant::now(); println!("stats: {:?}", self.stats); @@ -244,6 +231,7 @@ impl Client { let mut send_buf = client_read_buf.split().freeze(); if send_buf.len() < (Into::<usize>::into(self.maximum_packet_size) - 100usize) { + self.stats.tx(bytes_received, false); self.connection .send_datagram(stream_id, send_buf) .map_err(Error::SendDatagram)?; @@ -257,6 +245,7 @@ impl Client { &mut send_buf, fragment_id) ? { + self.stats.tx(fragment.len(), true); self.connection.send_datagram(stream_id, fragment).map_err(Error::SendDatagram)?; } fragment_id = fragment_id.wrapping_add(1); @@ -274,13 +263,15 @@ impl Client { } let payload = response.into_payload(); - self.stats.rx_bytes.fetch_add(payload.len(), Ordering::Relaxed); /*if prev_stats.elapsed() >= Duration::from_secs(3) { prev_stats = Instant::now(); println!("stats: {:?}", self.stats); }*/ + let fragment_len = payload.len(); if let Ok(Some(payload)) = self.fragments.handle_incoming_packet(payload) { + self.stats.rx(payload.len(), fragment_len != payload.len()); + self.client_socket .send_to(payload.chunk(), return_addr) .await diff --git a/mullvad-masque-proxy/src/lib.rs b/mullvad-masque-proxy/src/lib.rs index bb973d3a80..d4a4e47812 100644 --- a/mullvad-masque-proxy/src/lib.rs +++ b/mullvad-masque-proxy/src/lib.rs @@ -3,6 +3,7 @@ use h3::proto::varint::VarInt; pub mod client; mod fragment; pub mod server; +mod stats; const PACKET_BUFFER_SIZE: usize = 1700; pub const HTTP_MASQUE_DATAGRAM_CONTEXT_ID: VarInt = VarInt::from_u32(0); diff --git a/mullvad-masque-proxy/src/stats.rs b/mullvad-masque-proxy/src/stats.rs new file mode 100644 index 0000000000..412ddcc9bd --- /dev/null +++ b/mullvad-masque-proxy/src/stats.rs @@ -0,0 +1,46 @@ +use std::sync::atomic::{AtomicUsize, Ordering}; + +#[derive(Debug, Default)] +pub struct Stats { + rx_packets: AtomicUsize, + tx_packets: AtomicUsize, + + rx_bytes: AtomicUsize, + tx_bytes: AtomicUsize, + + fragmented_tx_bytes: AtomicUsize, + fragmented_rx_bytes: AtomicUsize, + + fragmented_tx_packets: AtomicUsize, + fragmented_rx_packets: AtomicUsize, +} + +const ORD: Ordering = Ordering::Relaxed; + +impl Drop for Stats { + fn drop(&mut self) { + println!("stats: {:?}", self); + } +} + +impl Stats { + pub fn tx(&self, packet_len: usize, is_fragment: bool) { + self.tx_packets.fetch_add(1, ORD); + self.tx_bytes.fetch_add(packet_len, ORD); + + if is_fragment { + self.fragmented_tx_packets.fetch_add(1, ORD); + self.fragmented_tx_bytes.fetch_add(packet_len, ORD); + } + } + + pub fn rx(&self, packet_len: usize, is_fragment: bool) { + self.rx_packets.fetch_add(1, ORD); + self.rx_bytes.fetch_add(packet_len, ORD); + + if is_fragment { + self.fragmented_rx_packets.fetch_add(1, ORD); + self.fragmented_rx_bytes.fetch_add(packet_len, ORD); + } + } +} |
