summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorDavid Lönnhager <david.l@mullvad.net>2025-05-09 19:36:18 +0200
committerDavid Lönnhager <david.l@mullvad.net>2025-05-09 19:36:18 +0200
commit7b03ad31f612918cf1d37d17fcc22229d39850da (patch)
tree6e81582380c4b7d9ef820c23209a24b242428043
parentbae82a2925c7b0bd6dcef23009ef9e1d0bacda9c (diff)
downloadmullvadvpn-masque-bench.tar.xz
mullvadvpn-masque-bench.zip
udp testmasque-bench
-rw-r--r--mullvad-masque-proxy/src/udp.rs105
-rw-r--r--mullvad-masque-proxy/tests/proxy.rs141
2 files changed, 222 insertions, 24 deletions
diff --git a/mullvad-masque-proxy/src/udp.rs b/mullvad-masque-proxy/src/udp.rs
index f553977da6..d9b3d5dc30 100644
--- a/mullvad-masque-proxy/src/udp.rs
+++ b/mullvad-masque-proxy/src/udp.rs
@@ -1,7 +1,7 @@
-use std::{net::SocketAddr, sync::{atomic::AtomicUsize, Arc, Mutex}};
+use std::{io::Write, net::SocketAddr, sync::{atomic::AtomicUsize, Arc, }, time::Duration};
-use bytes::BytesMut;
-use tokio::{io, net::UdpSocket};
+use bytes::{Buf, BufMut, BytesMut};
+use tokio::{io, net::UdpSocket, sync::Notify};
pub trait UdpSocketTrait: Send + Sync + 'static {
@@ -26,38 +26,101 @@ impl UdpSocketTrait for UdpSocket {
}
}
-// TODO
-/*
-pub fn fake_udp_pair() -> (FakeUdpSender, FakeUdpReceiver) {
- todo!()
+pub fn fake_udp_pair() -> (FakeUdp, FakeUdp) {
+ let inner1: Arc<tokio::sync::Mutex<FakeInner>> = Arc::default();
+ let inner2: Arc<tokio::sync::Mutex<FakeInner>> = Arc::default();
+ let udp1 = FakeUdp {
+ me: inner1.clone(),
+ other: inner2.clone(),
+ };
+ let udp2 = FakeUdp {
+ me: inner2,
+ other: inner1,
+ };
+ (udp1, udp2)
}
-pub struct FakeUdpSender(FakeInner);
-
-pub struct FakeUdpReceiver(FakeInner);
+pub struct FakeUdp {
+ me: Arc<tokio::sync::Mutex<FakeInner>>,
+ other: Arc<tokio::sync::Mutex<FakeInner>>,
+}
struct FakeInner {
- rx: Vec<u8>,
+ data: BytesMut,
+ // FIXME
+ addr: SocketAddr,
+ signal: Arc<Notify>,
+}
+
+impl Default for FakeInner {
+ fn default() -> Self {
+ Self {
+ data: BytesMut::with_capacity(8 * 1024 * 1024),
+ addr: "0.0.0.0:0".parse().unwrap(),
+ signal: Arc::default(),
+ }
+ }
}
-impl UdpSocketTrait for FakeUdpSender {
+impl UdpSocketTrait for FakeUdp {
async fn send_to(&self, buf: &[u8], addr: SocketAddr) -> io::Result<usize> {
- UdpSocket::send_to(self, buf, addr).await
+ let mut other = self.other.lock().await;
+ let n_write = other.data.remaining_mut().min(buf.len());
+
+ other.data.put_slice(&buf[..n_write]);
+ other.addr = addr;
+
+ other.signal.notify_one();
+
+ Ok(n_write)
}
async fn recv_buf_from(&self, buf: &mut BytesMut) -> io::Result<(usize, SocketAddr)> {
- UdpSocket::recv_buf_from(self, buf).await
+ loop {
+ let mut me_ = self.me.lock().await;
+ if me_.data.len() != 0 {
+ let n_read = me_.data.len().min(buf.remaining_mut());
+
+ buf.put(me_.data.split_to(n_read));
+
+ return Ok((n_read, me_.addr));
+ }
+ let signal = me_.signal.clone();
+ drop(me_);
+
+ signal.notified().await;
+ }
}
}
-impl UdpSocketTrait for FakeUdpReceiver {
- async fn send_to(&self, buf: &[u8], addr: SocketAddr) -> io::Result<usize> {
- UdpSocket::send_to(self, buf, addr).await
- }
+#[cfg(test)]
+mod test {
+ use super::*;
- async fn recv_buf_from(&self, buf: &mut BytesMut) -> io::Result<(usize, SocketAddr)> {
- UdpSocket::recv_buf_from(self, buf).await
+ #[tokio::test]
+ async fn test_fake_udp() {
+ const SENDMSG: &[u8] = b"hello there!";
+ let expected_addr: SocketAddr = "1.2.3.4:1234".parse().unwrap();
+
+ let (udp1, udp2) = fake_udp_pair();
+
+ udp1.send_to(SENDMSG, expected_addr).await.unwrap();
+
+ let mut buf = BytesMut::new();
+ let (nbytes, addr) = udp2.recv_buf_from(&mut buf).await.unwrap();
+
+ assert_eq!(nbytes, SENDMSG.len());
+ assert_eq!(buf.chunk(), SENDMSG);
+ assert_eq!(addr, expected_addr);
+
+ udp2.send_to(SENDMSG, expected_addr).await.unwrap();
+
+ let mut buf = BytesMut::new();
+ let (nbytes, addr) = udp1.recv_buf_from(&mut buf).await.unwrap();
+
+ assert_eq!(nbytes, SENDMSG.len());
+ assert_eq!(buf.chunk(), SENDMSG);
+ assert_eq!(addr, expected_addr);
}
}
- */
diff --git a/mullvad-masque-proxy/tests/proxy.rs b/mullvad-masque-proxy/tests/proxy.rs
index 25a95bbbcf..140eb92404 100644
--- a/mullvad-masque-proxy/tests/proxy.rs
+++ b/mullvad-masque-proxy/tests/proxy.rs
@@ -8,6 +8,9 @@ use anyhow::Context;
use bytes::BytesMut;
use mullvad_masque_proxy::server::AllowedIps;
use mullvad_masque_proxy::server::ServerParams;
+use mullvad_masque_proxy::udp::fake_udp_pair;
+use mullvad_masque_proxy::udp::FakeUdp;
+use mullvad_masque_proxy::udp::UdpSocketTrait;
use mullvad_masque_proxy::MIN_IPV4_MTU;
use rand::RngCore;
use tokio::fs;
@@ -16,17 +19,83 @@ use mullvad_masque_proxy::client;
use mullvad_masque_proxy::server;
use tokio::net::UdpSocket;
use tokio::time::timeout;
+use tokio::time::Instant;
+
+#[tokio::test]
+async fn test_perf() -> anyhow::Result<()> {
+ let mut data = vec![0u8; 1400];
+ rand::thread_rng().fill_bytes(&mut data);
+
+ let began = Instant::now();
+
+ for _ in 0..100 {
+ const MTU: u16 = 1700;
+ //let (client, server) = setup_masque_fake(MTU).await?;
+ let (client, server) = setup_masque(MTU).await?;
+
+ // Proxy client -> destination
+ let mut rx_buf = BytesMut::with_capacity(data.len());
+ client.send(&data).await?;
+ let (_, proxy_addr) = server
+ .recv_buf_from(&mut rx_buf)
+ .await
+ .context("Expected to receive message")?;
+ assert_eq!(&*rx_buf, data, "Expected to receive message from client");
+
+ // Destination -> proxy client
+ let mut rx_buf = BytesMut::with_capacity(data.len());
+ server.send_to(&data, proxy_addr).await?;
+ client
+ .recv_buf_from(&mut rx_buf)
+ .await
+ .context("Expected to receive message")?;
+ assert_eq!(&*rx_buf, data, "Expected to receive message from server");
+ }
+
+ eprintln!("real E2E: {}", began.elapsed().as_millis());
+
+ let dst = "123.123.123.123:123".parse().unwrap();
+ let began = Instant::now();
+ for _ in 0..100 {
+ const MTU: u16 = 1700;
+ let (client, server) = setup_masque_fake(MTU).await?;
+
+ // Proxy client -> destination
+ let mut rx_buf = BytesMut::with_capacity(data.len());
+ client.send_to(&data, dst).await?;
+ let (_, proxy_addr) = server
+ .recv_buf_from(&mut rx_buf)
+ .await
+ .context("Expected to receive message")?;
+ assert_eq!(&*rx_buf, data, "Expected to receive message from client");
+
+ // Destination -> proxy client
+ let mut rx_buf = BytesMut::with_capacity(data.len());
+ server.send_to(&data, proxy_addr).await?;
+ client
+ .recv_buf_from(&mut rx_buf)
+ .await
+ .context("Expected to receive message")?;
+ assert_eq!(&*rx_buf, data, "Expected to receive message from server");
+ }
+
+ eprintln!("fake E2E: {}", began.elapsed().as_millis());
+
+ panic!();
+
+ Ok(())
+}
/// Set up a MASQUE proxy and test that it can be used to communicate with some UDP destination
#[tokio::test]
async fn test_server_and_client_forwarding() -> anyhow::Result<()> {
timeout(Duration::from_secs(1), async {
const MTU: u16 = 1700;
- let (client, server) = setup_masque(MTU).await?;
+ let (client, server) = setup_masque_fake(MTU).await?;
// Proxy client -> destination
let mut rx_buf = BytesMut::with_capacity(128);
- client.send(b"abc").await?;
+ client.send_to(b"abc", "123.123.123.123:123".parse().unwrap()).await?;
let (_, proxy_addr) = server
.recv_buf_from(&mut rx_buf)
.await
@@ -37,7 +106,7 @@ async fn test_server_and_client_forwarding() -> anyhow::Result<()> {
let mut rx_buf = BytesMut::with_capacity(128);
server.send_to(b"def", proxy_addr).await?;
client
- .recv_buf(&mut rx_buf)
+ .recv_buf_from(&mut rx_buf)
.await
.context("Expected to receive message")?;
assert_eq!(&*rx_buf, b"def", "Expected to receive message from server");
@@ -202,6 +271,72 @@ async fn setup_masque(mtu: u16) -> anyhow::Result<(UdpSocket, UdpSocket)> {
Ok((proxy_client, destination_udp_server))
}
+/// Set up a client and server connected by a MASQUE proxy.
+/// This returns a UDP socket that is connected to the local MASQUE client,
+/// and a UDP socket that represents the other endpoint.
+/// Note that the server socket (second returned value) is not connected,
+/// so `recv_from` must be used.
+async fn setup_masque_fake(mtu: u16) -> anyhow::Result<(FakeUdp, UdpSocket)> {
+ const HOST: &str = "test.test";
+
+ // FIXME: simplify shared setup
+
+ let any_localhost_addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
+
+ let (proxy_client, local_socket) = fake_udp_pair();
+
+ // TODO: fake this end too (ie target a fake socket instead of target_addr)
+ let destination_udp_server = UdpSocket::bind(any_localhost_addr).await?;
+ let target_udp_addr = destination_udp_server
+ .local_addr()
+ .context("Retrieve dest UDP server addr")?;
+
+ // Set up MASQUE server
+ let server_tls_config = load_server_test_cert().await?;
+
+ let params = ServerParams::builder()
+ .allowed_hosts(AllowedIps::default())
+ .mtu(mtu)
+ .auth_header(Some("Bearer test".to_owned()))
+ .build();
+
+ let server = server::Server::bind(any_localhost_addr, Arc::new(server_tls_config), params)
+ .context("Failed to start MASQUE server")?;
+
+ let masque_server_addr = server.local_addr()?;
+
+ tokio::spawn(async move {
+ if let Err(err) = server.run().await {
+ eprintln!("server.run() failed: {err}");
+ }
+ });
+
+ // Set up MASQUE client
+
+ let client_config = client::ClientConfig::builder()
+ .client_socket(local_socket)
+ .local_addr(any_localhost_addr)
+ .server_addr(masque_server_addr)
+ .server_host(HOST.to_owned())
+ .target_addr(target_udp_addr)
+ .mtu(mtu)
+ .idle_timeout(Some(Duration::from_secs(10)))
+ .auth_header(Some("Bearer test".to_owned()))
+ .build();
+
+ let client = client::Client::connect(client_config)
+ .await
+ .context("Failed to start MASQUE client")?;
+
+ tokio::spawn(async move {
+ if let Err(err) = client.run().await {
+ eprintln!("client.run() failed: {err}");
+ }
+ });
+
+ Ok((proxy_client, destination_udp_server))
+}
+
async fn load_server_test_cert() -> anyhow::Result<rustls::ServerConfig> {
let key = fs::read("tests/test.key").await.context("Read test key")?;
let key = rustls_pemfile::private_key(&mut &*key)?.context("Invalid test key")?;