diff options
| author | David Lönnhager <david.l@mullvad.net> | 2025-05-09 19:36:18 +0200 |
|---|---|---|
| committer | David Lönnhager <david.l@mullvad.net> | 2025-05-09 19:36:18 +0200 |
| commit | 7b03ad31f612918cf1d37d17fcc22229d39850da (patch) | |
| tree | 6e81582380c4b7d9ef820c23209a24b242428043 | |
| parent | bae82a2925c7b0bd6dcef23009ef9e1d0bacda9c (diff) | |
| download | mullvadvpn-masque-bench.tar.xz mullvadvpn-masque-bench.zip | |
udp testmasque-bench
| -rw-r--r-- | mullvad-masque-proxy/src/udp.rs | 105 | ||||
| -rw-r--r-- | mullvad-masque-proxy/tests/proxy.rs | 141 |
2 files changed, 222 insertions, 24 deletions
diff --git a/mullvad-masque-proxy/src/udp.rs b/mullvad-masque-proxy/src/udp.rs index f553977da6..d9b3d5dc30 100644 --- a/mullvad-masque-proxy/src/udp.rs +++ b/mullvad-masque-proxy/src/udp.rs @@ -1,7 +1,7 @@ -use std::{net::SocketAddr, sync::{atomic::AtomicUsize, Arc, Mutex}}; +use std::{io::Write, net::SocketAddr, sync::{atomic::AtomicUsize, Arc, }, time::Duration}; -use bytes::BytesMut; -use tokio::{io, net::UdpSocket}; +use bytes::{Buf, BufMut, BytesMut}; +use tokio::{io, net::UdpSocket, sync::Notify}; pub trait UdpSocketTrait: Send + Sync + 'static { @@ -26,38 +26,101 @@ impl UdpSocketTrait for UdpSocket { } } -// TODO -/* -pub fn fake_udp_pair() -> (FakeUdpSender, FakeUdpReceiver) { - todo!() +pub fn fake_udp_pair() -> (FakeUdp, FakeUdp) { + let inner1: Arc<tokio::sync::Mutex<FakeInner>> = Arc::default(); + let inner2: Arc<tokio::sync::Mutex<FakeInner>> = Arc::default(); + let udp1 = FakeUdp { + me: inner1.clone(), + other: inner2.clone(), + }; + let udp2 = FakeUdp { + me: inner2, + other: inner1, + }; + (udp1, udp2) } -pub struct FakeUdpSender(FakeInner); - -pub struct FakeUdpReceiver(FakeInner); +pub struct FakeUdp { + me: Arc<tokio::sync::Mutex<FakeInner>>, + other: Arc<tokio::sync::Mutex<FakeInner>>, +} struct FakeInner { - rx: Vec<u8>, + data: BytesMut, + // FIXME + addr: SocketAddr, + signal: Arc<Notify>, +} + +impl Default for FakeInner { + fn default() -> Self { + Self { + data: BytesMut::with_capacity(8 * 1024 * 1024), + addr: "0.0.0.0:0".parse().unwrap(), + signal: Arc::default(), + } + } } -impl UdpSocketTrait for FakeUdpSender { +impl UdpSocketTrait for FakeUdp { async fn send_to(&self, buf: &[u8], addr: SocketAddr) -> io::Result<usize> { - UdpSocket::send_to(self, buf, addr).await + let mut other = self.other.lock().await; + let n_write = other.data.remaining_mut().min(buf.len()); + + other.data.put_slice(&buf[..n_write]); + other.addr = addr; + + other.signal.notify_one(); + + Ok(n_write) } async fn recv_buf_from(&self, buf: &mut BytesMut) -> io::Result<(usize, SocketAddr)> { - UdpSocket::recv_buf_from(self, buf).await + loop { + let mut me_ = self.me.lock().await; + if me_.data.len() != 0 { + let n_read = me_.data.len().min(buf.remaining_mut()); + + buf.put(me_.data.split_to(n_read)); + + return Ok((n_read, me_.addr)); + } + let signal = me_.signal.clone(); + drop(me_); + + signal.notified().await; + } } } -impl UdpSocketTrait for FakeUdpReceiver { - async fn send_to(&self, buf: &[u8], addr: SocketAddr) -> io::Result<usize> { - UdpSocket::send_to(self, buf, addr).await - } +#[cfg(test)] +mod test { + use super::*; - async fn recv_buf_from(&self, buf: &mut BytesMut) -> io::Result<(usize, SocketAddr)> { - UdpSocket::recv_buf_from(self, buf).await + #[tokio::test] + async fn test_fake_udp() { + const SENDMSG: &[u8] = b"hello there!"; + let expected_addr: SocketAddr = "1.2.3.4:1234".parse().unwrap(); + + let (udp1, udp2) = fake_udp_pair(); + + udp1.send_to(SENDMSG, expected_addr).await.unwrap(); + + let mut buf = BytesMut::new(); + let (nbytes, addr) = udp2.recv_buf_from(&mut buf).await.unwrap(); + + assert_eq!(nbytes, SENDMSG.len()); + assert_eq!(buf.chunk(), SENDMSG); + assert_eq!(addr, expected_addr); + + udp2.send_to(SENDMSG, expected_addr).await.unwrap(); + + let mut buf = BytesMut::new(); + let (nbytes, addr) = udp1.recv_buf_from(&mut buf).await.unwrap(); + + assert_eq!(nbytes, SENDMSG.len()); + assert_eq!(buf.chunk(), SENDMSG); + assert_eq!(addr, expected_addr); } } - */ diff --git a/mullvad-masque-proxy/tests/proxy.rs b/mullvad-masque-proxy/tests/proxy.rs index 25a95bbbcf..140eb92404 100644 --- a/mullvad-masque-proxy/tests/proxy.rs +++ b/mullvad-masque-proxy/tests/proxy.rs @@ -8,6 +8,9 @@ use anyhow::Context; use bytes::BytesMut; use mullvad_masque_proxy::server::AllowedIps; use mullvad_masque_proxy::server::ServerParams; +use mullvad_masque_proxy::udp::fake_udp_pair; +use mullvad_masque_proxy::udp::FakeUdp; +use mullvad_masque_proxy::udp::UdpSocketTrait; use mullvad_masque_proxy::MIN_IPV4_MTU; use rand::RngCore; use tokio::fs; @@ -16,17 +19,83 @@ use mullvad_masque_proxy::client; use mullvad_masque_proxy::server; use tokio::net::UdpSocket; use tokio::time::timeout; +use tokio::time::Instant; + +#[tokio::test] +async fn test_perf() -> anyhow::Result<()> { + let mut data = vec![0u8; 1400]; + rand::thread_rng().fill_bytes(&mut data); + + let began = Instant::now(); + + for _ in 0..100 { + const MTU: u16 = 1700; + //let (client, server) = setup_masque_fake(MTU).await?; + let (client, server) = setup_masque(MTU).await?; + + // Proxy client -> destination + let mut rx_buf = BytesMut::with_capacity(data.len()); + client.send(&data).await?; + let (_, proxy_addr) = server + .recv_buf_from(&mut rx_buf) + .await + .context("Expected to receive message")?; + assert_eq!(&*rx_buf, data, "Expected to receive message from client"); + + // Destination -> proxy client + let mut rx_buf = BytesMut::with_capacity(data.len()); + server.send_to(&data, proxy_addr).await?; + client + .recv_buf_from(&mut rx_buf) + .await + .context("Expected to receive message")?; + assert_eq!(&*rx_buf, data, "Expected to receive message from server"); + } + + eprintln!("real E2E: {}", began.elapsed().as_millis()); + + let dst = "123.123.123.123:123".parse().unwrap(); + let began = Instant::now(); + for _ in 0..100 { + const MTU: u16 = 1700; + let (client, server) = setup_masque_fake(MTU).await?; + + // Proxy client -> destination + let mut rx_buf = BytesMut::with_capacity(data.len()); + client.send_to(&data, dst).await?; + let (_, proxy_addr) = server + .recv_buf_from(&mut rx_buf) + .await + .context("Expected to receive message")?; + assert_eq!(&*rx_buf, data, "Expected to receive message from client"); + + // Destination -> proxy client + let mut rx_buf = BytesMut::with_capacity(data.len()); + server.send_to(&data, proxy_addr).await?; + client + .recv_buf_from(&mut rx_buf) + .await + .context("Expected to receive message")?; + assert_eq!(&*rx_buf, data, "Expected to receive message from server"); + } + + eprintln!("fake E2E: {}", began.elapsed().as_millis()); + + panic!(); + + Ok(()) +} /// Set up a MASQUE proxy and test that it can be used to communicate with some UDP destination #[tokio::test] async fn test_server_and_client_forwarding() -> anyhow::Result<()> { timeout(Duration::from_secs(1), async { const MTU: u16 = 1700; - let (client, server) = setup_masque(MTU).await?; + let (client, server) = setup_masque_fake(MTU).await?; // Proxy client -> destination let mut rx_buf = BytesMut::with_capacity(128); - client.send(b"abc").await?; + client.send_to(b"abc", "123.123.123.123:123".parse().unwrap()).await?; let (_, proxy_addr) = server .recv_buf_from(&mut rx_buf) .await @@ -37,7 +106,7 @@ async fn test_server_and_client_forwarding() -> anyhow::Result<()> { let mut rx_buf = BytesMut::with_capacity(128); server.send_to(b"def", proxy_addr).await?; client - .recv_buf(&mut rx_buf) + .recv_buf_from(&mut rx_buf) .await .context("Expected to receive message")?; assert_eq!(&*rx_buf, b"def", "Expected to receive message from server"); @@ -202,6 +271,72 @@ async fn setup_masque(mtu: u16) -> anyhow::Result<(UdpSocket, UdpSocket)> { Ok((proxy_client, destination_udp_server)) } +/// Set up a client and server connected by a MASQUE proxy. +/// This returns a UDP socket that is connected to the local MASQUE client, +/// and a UDP socket that represents the other endpoint. +/// Note that the server socket (second returned value) is not connected, +/// so `recv_from` must be used. +async fn setup_masque_fake(mtu: u16) -> anyhow::Result<(FakeUdp, UdpSocket)> { + const HOST: &str = "test.test"; + + // FIXME: simplify shared setup + + let any_localhost_addr: SocketAddr = "127.0.0.1:0".parse().unwrap(); + + let (proxy_client, local_socket) = fake_udp_pair(); + + // TODO: fake this end too (ie target a fake socket instead of target_addr) + let destination_udp_server = UdpSocket::bind(any_localhost_addr).await?; + let target_udp_addr = destination_udp_server + .local_addr() + .context("Retrieve dest UDP server addr")?; + + // Set up MASQUE server + let server_tls_config = load_server_test_cert().await?; + + let params = ServerParams::builder() + .allowed_hosts(AllowedIps::default()) + .mtu(mtu) + .auth_header(Some("Bearer test".to_owned())) + .build(); + + let server = server::Server::bind(any_localhost_addr, Arc::new(server_tls_config), params) + .context("Failed to start MASQUE server")?; + + let masque_server_addr = server.local_addr()?; + + tokio::spawn(async move { + if let Err(err) = server.run().await { + eprintln!("server.run() failed: {err}"); + } + }); + + // Set up MASQUE client + + let client_config = client::ClientConfig::builder() + .client_socket(local_socket) + .local_addr(any_localhost_addr) + .server_addr(masque_server_addr) + .server_host(HOST.to_owned()) + .target_addr(target_udp_addr) + .mtu(mtu) + .idle_timeout(Some(Duration::from_secs(10))) + .auth_header(Some("Bearer test".to_owned())) + .build(); + + let client = client::Client::connect(client_config) + .await + .context("Failed to start MASQUE client")?; + + tokio::spawn(async move { + if let Err(err) = client.run().await { + eprintln!("client.run() failed: {err}"); + } + }); + + Ok((proxy_client, destination_udp_server)) +} + async fn load_server_test_cert() -> anyhow::Result<rustls::ServerConfig> { let key = fs::read("tests/test.key").await.context("Read test key")?; let key = rustls_pemfile::private_key(&mut &*key)?.context("Invalid test key")?; |
