summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorJoakim Hulthe <joakim.hulthe@mullvad.net>2025-04-07 14:15:31 +0200
committerJoakim Hulthe <joakim.hulthe@mullvad.net>2025-04-09 18:02:55 +0200
commitf0547062d88f468d304bd5bec5937f1e0786cee3 (patch)
treea1e4f52234ca20754ece617fb4cc374d14c54362
parent6af30eecaca4cfa8a9629894caa7efdc49bb24a6 (diff)
downloadmullvadvpn-f0547062d88f468d304bd5bec5937f1e0786cee3.tar.xz
mullvadvpn-f0547062d88f468d304bd5bec5937f1e0786cee3.zip
Include fragmented bytes in stats
-rw-r--r--mullvad-masque-proxy/src/client/mod.rs31
-rw-r--r--mullvad-masque-proxy/src/lib.rs1
-rw-r--r--mullvad-masque-proxy/src/stats.rs46
3 files changed, 58 insertions, 20 deletions
diff --git a/mullvad-masque-proxy/src/client/mod.rs b/mullvad-masque-proxy/src/client/mod.rs
index 67694b9ec1..424a26b8d9 100644
--- a/mullvad-masque-proxy/src/client/mod.rs
+++ b/mullvad-masque-proxy/src/client/mod.rs
@@ -4,10 +4,7 @@ use std::{
fs, future, io,
net::{Ipv4Addr, SocketAddr},
path::Path,
- sync::{
- atomic::{AtomicUsize, Ordering},
- Arc, LazyLock,
- },
+ sync::{Arc, LazyLock},
time::{Duration, Instant},
};
use tokio::{net::UdpSocket, time::interval};
@@ -17,7 +14,10 @@ use h3_datagram::datagram_traits::HandleDatagramsExt;
use http::{header, uri::Scheme, Response, StatusCode};
use quinn::{crypto::rustls::QuicClientConfig, ClientConfig, Endpoint, TransportConfig};
-use crate::fragment::{self, Fragments};
+use crate::{
+ fragment::{self, Fragments},
+ stats::Stats,
+};
const MAX_HEADER_SIZE: u64 = 8192;
@@ -37,19 +37,8 @@ pub struct Client {
fragments: Fragments,
/// Maximum packet size
maximum_packet_size: u16,
- stats: Arc<Stats>,
-}
-
-#[derive(Debug, Default)]
-struct Stats {
- rx_bytes: AtomicUsize,
- tx_bytes: AtomicUsize,
-}
-impl Drop for Stats {
- fn drop(&mut self) {
- println!("stats: {:?}", self);
- }
+ stats: Arc<Stats>,
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -235,8 +224,6 @@ impl Client {
let (bytes_received, recv_addr) = client_read.map_err(Error::ClientRead)?;
return_addr = recv_addr;
- self.stats.tx_bytes.fetch_add(bytes_received, Ordering::Relaxed);
-
/*if prev_stats.elapsed() >= Duration::from_secs(3) {
prev_stats = Instant::now();
println!("stats: {:?}", self.stats);
@@ -244,6 +231,7 @@ impl Client {
let mut send_buf = client_read_buf.split().freeze();
if send_buf.len() < (Into::<usize>::into(self.maximum_packet_size) - 100usize) {
+ self.stats.tx(bytes_received, false);
self.connection
.send_datagram(stream_id, send_buf)
.map_err(Error::SendDatagram)?;
@@ -257,6 +245,7 @@ impl Client {
&mut send_buf,
fragment_id)
? {
+ self.stats.tx(fragment.len(), true);
self.connection.send_datagram(stream_id, fragment).map_err(Error::SendDatagram)?;
}
fragment_id = fragment_id.wrapping_add(1);
@@ -274,13 +263,15 @@ impl Client {
}
let payload = response.into_payload();
- self.stats.rx_bytes.fetch_add(payload.len(), Ordering::Relaxed);
/*if prev_stats.elapsed() >= Duration::from_secs(3) {
prev_stats = Instant::now();
println!("stats: {:?}", self.stats);
}*/
+ let fragment_len = payload.len();
if let Ok(Some(payload)) = self.fragments.handle_incoming_packet(payload) {
+ self.stats.rx(payload.len(), fragment_len != payload.len());
+
self.client_socket
.send_to(payload.chunk(), return_addr)
.await
diff --git a/mullvad-masque-proxy/src/lib.rs b/mullvad-masque-proxy/src/lib.rs
index bb973d3a80..d4a4e47812 100644
--- a/mullvad-masque-proxy/src/lib.rs
+++ b/mullvad-masque-proxy/src/lib.rs
@@ -3,6 +3,7 @@ use h3::proto::varint::VarInt;
pub mod client;
mod fragment;
pub mod server;
+mod stats;
const PACKET_BUFFER_SIZE: usize = 1700;
pub const HTTP_MASQUE_DATAGRAM_CONTEXT_ID: VarInt = VarInt::from_u32(0);
diff --git a/mullvad-masque-proxy/src/stats.rs b/mullvad-masque-proxy/src/stats.rs
new file mode 100644
index 0000000000..412ddcc9bd
--- /dev/null
+++ b/mullvad-masque-proxy/src/stats.rs
@@ -0,0 +1,46 @@
+use std::sync::atomic::{AtomicUsize, Ordering};
+
+#[derive(Debug, Default)]
+pub struct Stats {
+ rx_packets: AtomicUsize,
+ tx_packets: AtomicUsize,
+
+ rx_bytes: AtomicUsize,
+ tx_bytes: AtomicUsize,
+
+ fragmented_tx_bytes: AtomicUsize,
+ fragmented_rx_bytes: AtomicUsize,
+
+ fragmented_tx_packets: AtomicUsize,
+ fragmented_rx_packets: AtomicUsize,
+}
+
+const ORD: Ordering = Ordering::Relaxed;
+
+impl Drop for Stats {
+ fn drop(&mut self) {
+ println!("stats: {:?}", self);
+ }
+}
+
+impl Stats {
+ pub fn tx(&self, packet_len: usize, is_fragment: bool) {
+ self.tx_packets.fetch_add(1, ORD);
+ self.tx_bytes.fetch_add(packet_len, ORD);
+
+ if is_fragment {
+ self.fragmented_tx_packets.fetch_add(1, ORD);
+ self.fragmented_tx_bytes.fetch_add(packet_len, ORD);
+ }
+ }
+
+ pub fn rx(&self, packet_len: usize, is_fragment: bool) {
+ self.rx_packets.fetch_add(1, ORD);
+ self.rx_bytes.fetch_add(packet_len, ORD);
+
+ if is_fragment {
+ self.fragmented_rx_packets.fetch_add(1, ORD);
+ self.fragmented_rx_bytes.fetch_add(packet_len, ORD);
+ }
+ }
+}