summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorDavid Lönnhager <david.l@mullvad.net>2025-08-26 15:56:25 +0200
committerDavid Lönnhager <david.l@mullvad.net>2025-08-26 16:07:22 +0200
commit00a4916279e00c97696927cd6ce3c018a53da9c4 (patch)
tree02b4523183bd024b4e247adfd4cc4d581fdf0974
parent91901a62b1f35a18be5b070fc90326219b1b011e (diff)
downloadmullvadvpn-masque-perf.tar.xz
mullvadvpn-masque-perf.zip
TEMP: Try to send single packet with WSASendMsgmasque-perf
-rw-r--r--Cargo.lock1
-rw-r--r--mullvad-masque-proxy/Cargo.toml4
-rw-r--r--mullvad-masque-proxy/src/client/mod.rs90
3 files changed, 88 insertions, 7 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 8810d1959a..e474ecfc88 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3110,6 +3110,7 @@ dependencies = [
"thiserror 2.0.9",
"tokio",
"typed-builder 0.21.0",
+ "windows-sys 0.52.0",
]
[[package]]
diff --git a/mullvad-masque-proxy/Cargo.toml b/mullvad-masque-proxy/Cargo.toml
index 6f54734d1d..cc7085d2b7 100644
--- a/mullvad-masque-proxy/Cargo.toml
+++ b/mullvad-masque-proxy/Cargo.toml
@@ -24,6 +24,10 @@ log = { workspace = true }
socket2 = { workspace = true }
typed-builder = "0.21.0"
+[target.'cfg(windows)'.dependencies.windows-sys]
+workspace = true
+features = ["Win32", "Win32_Foundation", "Win32_Networking", "Win32_Networking_WinSock"]
+
[dev-dependencies]
env_logger = { workspace = true }
tokio = { workspace = true, features = ["fs", "macros", "io-util", "rt-multi-thread"] }
diff --git a/mullvad-masque-proxy/src/client/mod.rs b/mullvad-masque-proxy/src/client/mod.rs
index 32983ccdef..c1a39d33c2 100644
--- a/mullvad-masque-proxy/src/client/mod.rs
+++ b/mullvad-masque-proxy/src/client/mod.rs
@@ -6,6 +6,7 @@ use std::{
future, io,
net::{Ipv4Addr, SocketAddr},
path::Path,
+ ptr,
str::FromStr as _,
sync::{Arc, LazyLock},
time::Duration,
@@ -16,6 +17,7 @@ use tokio::{
sync::{broadcast, mpsc},
};
use typed_builder::TypedBuilder;
+use windows_sys::Win32::Networking::WinSock::{self, CMSGHDR, WSAMSG};
use h3::{client, ext::Protocol, proto::varint::VarInt, quic::StreamId};
use h3_datagram::{datagram::Datagram, datagram_traits::HandleDatagramsExt};
@@ -555,22 +557,96 @@ async fn client_socket_tx_task(
let payload = response.into_payload();
let original_payload_len = payload.len();
- let send = async |payload: &[u8]| -> Result<()> {
- client_socket
- .send_to(payload, return_addr)
- .await
- .map_err(Error::ClientWrite)?;
+ let send = async |destination: SocketAddr, payload: &[u8]| -> Result<()> {
+ // TODO: send multiple datagrams with WinSock::WSASendMsg
+ // TODO: enable GSO
+ // TODO: set socket opt UDP_SEND_MSG_SIZE
+
+ use std::os::windows::io::AsRawSocket;
+ use windows_sys::Win32::Networking::WinSock::{WSABUF, WSASendMsg};
+
+ let daddr: socket2::SockAddr = destination.into();
+
+ let mut data = WSABUF {
+ buf: payload.as_ptr() as *mut _,
+ len: payload.len() as _,
+ };
+
+ // FIXME
+ let mut cmsg_buf = vec![];
+
+ let cmsg_hdr = CMSGHDR {
+ cmsg_len: std::mem::size_of::<CMSGHDR>() + size_of::<u32>(),
+ cmsg_level: WinSock::IPPROTO_UDP,
+ cmsg_type: WinSock::UDP_SEND_MSG_SIZE,
+ };
+
+ //cmsg_buf.extend_from_slice(cmsg_hdr);
+ cmsg_buf.extend_from_slice(unsafe {
+ std::mem::transmute::<_, &[u8; std::mem::size_of::<CMSGHDR>()]>(&cmsg_hdr)
+ });
+ // TODO: segment_size
+ const GSO_SEGMENT_SIZE: u32 = 1500;
+ //let segment_size: u32 = GSO_SEGMENT_SIZE;
+ let segment_size: u32 = payload.len() as u32;
+ cmsg_buf.extend_from_slice(&segment_size.to_ne_bytes());
+
+ let ctrl = WSABUF {
+ buf: cmsg_buf.as_ptr() as *mut _,
+ len: cmsg_buf.len() as _,
+ };
+
+ let raw_socket = client_socket.as_raw_socket();
+
+ let wsa_msg = WSAMSG {
+ name: daddr.as_ptr() as *mut _,
+ namelen: daddr.len(),
+ lpBuffers: &mut data,
+ Control: ctrl,
+ dwBufferCount: 1,
+ dwFlags: 0,
+ };
+
+ // TODO: PKTINFO?
+ // TODO: WinSock::IP_ECN?
+ // TODO: WinSock::IPV6_ECN?
+
+ let mut bytes_sent = 0;
+ let rc = unsafe {
+ WSASendMsg(
+ raw_socket as usize,
+ &wsa_msg,
+ 0,
+ &mut bytes_sent,
+ ptr::null_mut(),
+ None,
+ )
+ };
+
+ log::debug!("WSASendMsg returned: {rc}");
+ log::debug!("BYTES SENT: {bytes_sent}");
+
+ /*client_socket
+ .send_to(payload, return_addr)
+ .await
+ .map_err(Error::ClientWrite)?;*/
Ok(())
};
match fragments.handle_incoming_packet(payload) {
Ok(DefragReceived::Nonfragmented(payload)) => {
stats.rx(payload.len(), false);
- send(payload.chunk()).await?;
+
+ // TODO: coalesce multiple packets
+
+ send(return_addr, payload.chunk()).await?;
}
Ok(DefragReceived::Reassembled(reassembled_payload)) => {
stats.rx(original_payload_len, true);
- send(reassembled_payload.chunk()).await?;
+
+ // TODO: coalesce multiple packets
+
+ send(return_addr, reassembled_payload.chunk()).await?;
}
Ok(DefragReceived::Fragment) => stats.rx(original_payload_len, true),
Err(_) => (),