diff options
| -rw-r--r-- | mullvad-masque-proxy/src/server/mod.rs | 51 |
1 files changed, 35 insertions, 16 deletions
diff --git a/mullvad-masque-proxy/src/server/mod.rs b/mullvad-masque-proxy/src/server/mod.rs index af1cb3e1d4..6ce22423ce 100644 --- a/mullvad-masque-proxy/src/server/mod.rs +++ b/mullvad-masque-proxy/src/server/mod.rs @@ -7,7 +7,7 @@ use std::{ }; use anyhow::{anyhow, ensure, Context}; -use bytes::{Bytes, BytesMut}; +use bytes::{BufMut, Bytes, BytesMut}; use h3::{ proto::varint::VarInt, quic::{BidiStream, StreamId}, @@ -233,8 +233,13 @@ impl Server { let (client_tx, client_rx) = mpsc::channel(MAX_INFLIGHT_PACKETS); let (send_tx, send_rx) = mpsc::channel(MAX_INFLIGHT_PACKETS); - let mut connection_task = - task::spawn(connection_task(stream_id, http_conn, send_rx, client_tx)); + let mut connection_task = task::spawn(connection_task( + stream_id, + http_conn, + quic_conn.clone(), + send_rx, + client_tx, + )); let mut proxy_rx_task = task::spawn(proxy_rx_task( stream_id, quic_conn, @@ -263,21 +268,14 @@ impl Server { async fn connection_task( stream_id: StreamId, mut connection: Connection<h3_quinn::Connection, Bytes>, + quinn_conn: quinn::Connection, mut send_rx: mpsc::Receiver<Bytes>, client_tx: mpsc::Sender<Datagram>, ) -> anyhow::Result<()> { - loop { - tokio::select! { - outgoing_packet = send_rx.recv() => { - let Some(outgoing_packet) = outgoing_packet else { - break; // sender is gone - }; - - // TODO: is this blocking? - connection.send_datagram(stream_id, outgoing_packet) - .context("Error sending QUIC datagram to client")?; - } - incoming_packet = connection.read_datagram() => match incoming_packet { + let mut incoming_handler = tokio::spawn(async move { + loop { + let incoming_packet = connection.read_datagram().await; + match incoming_packet { Ok(Some(received_packet)) => { ensure!( received_packet.stream_id() == stream_id, @@ -292,7 +290,28 @@ async fn connection_task( Err(err) => { return Err(err).context("Error reading QUIC datagram from client"); } - }, + } + } + Ok(()) + }); + + loop { + tokio::select! { + outgoing_packet = send_rx.recv() => { + let Some(outgoing_packet) = outgoing_packet else { + break; // sender is gone + }; + + // add stream id + let mut buf = BytesMut::new(); + (VarInt::from(stream_id) / 4).encode(&mut buf); + buf.put(outgoing_packet); + + quinn_conn.send_datagram(buf.freeze()) + .context("Error sending QUIC datagram to client")?; + } + Ok(incoming_packet_result) = &mut incoming_handler => return incoming_packet_result, + else => break, } } |
