diff options
| -rw-r--r-- | Cargo.lock | 1 | ||||
| -rw-r--r-- | mullvad-masque-proxy/Cargo.toml | 4 | ||||
| -rw-r--r-- | mullvad-masque-proxy/src/client/mod.rs | 90 |
3 files changed, 88 insertions, 7 deletions
diff --git a/Cargo.lock b/Cargo.lock index 8810d1959a..e474ecfc88 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3110,6 +3110,7 @@ dependencies = [ "thiserror 2.0.9", "tokio", "typed-builder 0.21.0", + "windows-sys 0.52.0", ] [[package]] diff --git a/mullvad-masque-proxy/Cargo.toml b/mullvad-masque-proxy/Cargo.toml index 6f54734d1d..cc7085d2b7 100644 --- a/mullvad-masque-proxy/Cargo.toml +++ b/mullvad-masque-proxy/Cargo.toml @@ -24,6 +24,10 @@ log = { workspace = true } socket2 = { workspace = true } typed-builder = "0.21.0" +[target.'cfg(windows)'.dependencies.windows-sys] +workspace = true +features = ["Win32", "Win32_Foundation", "Win32_Networking", "Win32_Networking_WinSock"] + [dev-dependencies] env_logger = { workspace = true } tokio = { workspace = true, features = ["fs", "macros", "io-util", "rt-multi-thread"] } diff --git a/mullvad-masque-proxy/src/client/mod.rs b/mullvad-masque-proxy/src/client/mod.rs index 32983ccdef..c1a39d33c2 100644 --- a/mullvad-masque-proxy/src/client/mod.rs +++ b/mullvad-masque-proxy/src/client/mod.rs @@ -6,6 +6,7 @@ use std::{ future, io, net::{Ipv4Addr, SocketAddr}, path::Path, + ptr, str::FromStr as _, sync::{Arc, LazyLock}, time::Duration, @@ -16,6 +17,7 @@ use tokio::{ sync::{broadcast, mpsc}, }; use typed_builder::TypedBuilder; +use windows_sys::Win32::Networking::WinSock::{self, CMSGHDR, WSAMSG}; use h3::{client, ext::Protocol, proto::varint::VarInt, quic::StreamId}; use h3_datagram::{datagram::Datagram, datagram_traits::HandleDatagramsExt}; @@ -555,22 +557,96 @@ async fn client_socket_tx_task( let payload = response.into_payload(); let original_payload_len = payload.len(); - let send = async |payload: &[u8]| -> Result<()> { - client_socket - .send_to(payload, return_addr) - .await - .map_err(Error::ClientWrite)?; + let send = async |destination: SocketAddr, payload: &[u8]| -> Result<()> { + // TODO: send multiple datagrams with WinSock::WSASendMsg + // TODO: enable GSO + // TODO: set socket opt UDP_SEND_MSG_SIZE + + use std::os::windows::io::AsRawSocket; + use windows_sys::Win32::Networking::WinSock::{WSABUF, WSASendMsg}; + + let daddr: socket2::SockAddr = destination.into(); + + let mut data = WSABUF { + buf: payload.as_ptr() as *mut _, + len: payload.len() as _, + }; + + // FIXME + let mut cmsg_buf = vec![]; + + let cmsg_hdr = CMSGHDR { + cmsg_len: std::mem::size_of::<CMSGHDR>() + size_of::<u32>(), + cmsg_level: WinSock::IPPROTO_UDP, + cmsg_type: WinSock::UDP_SEND_MSG_SIZE, + }; + + //cmsg_buf.extend_from_slice(cmsg_hdr); + cmsg_buf.extend_from_slice(unsafe { + std::mem::transmute::<_, &[u8; std::mem::size_of::<CMSGHDR>()]>(&cmsg_hdr) + }); + // TODO: segment_size + const GSO_SEGMENT_SIZE: u32 = 1500; + //let segment_size: u32 = GSO_SEGMENT_SIZE; + let segment_size: u32 = payload.len() as u32; + cmsg_buf.extend_from_slice(&segment_size.to_ne_bytes()); + + let ctrl = WSABUF { + buf: cmsg_buf.as_ptr() as *mut _, + len: cmsg_buf.len() as _, + }; + + let raw_socket = client_socket.as_raw_socket(); + + let wsa_msg = WSAMSG { + name: daddr.as_ptr() as *mut _, + namelen: daddr.len(), + lpBuffers: &mut data, + Control: ctrl, + dwBufferCount: 1, + dwFlags: 0, + }; + + // TODO: PKTINFO? + // TODO: WinSock::IP_ECN? + // TODO: WinSock::IPV6_ECN? + + let mut bytes_sent = 0; + let rc = unsafe { + WSASendMsg( + raw_socket as usize, + &wsa_msg, + 0, + &mut bytes_sent, + ptr::null_mut(), + None, + ) + }; + + log::debug!("WSASendMsg returned: {rc}"); + log::debug!("BYTES SENT: {bytes_sent}"); + + /*client_socket + .send_to(payload, return_addr) + .await + .map_err(Error::ClientWrite)?;*/ Ok(()) }; match fragments.handle_incoming_packet(payload) { Ok(DefragReceived::Nonfragmented(payload)) => { stats.rx(payload.len(), false); - send(payload.chunk()).await?; + + // TODO: coalesce multiple packets + + send(return_addr, payload.chunk()).await?; } Ok(DefragReceived::Reassembled(reassembled_payload)) => { stats.rx(original_payload_len, true); - send(reassembled_payload.chunk()).await?; + + // TODO: coalesce multiple packets + + send(return_addr, reassembled_payload.chunk()).await?; } Ok(DefragReceived::Fragment) => stats.rx(original_payload_len, true), Err(_) => (), |
