summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--mullvad-masque-proxy/src/server/mod.rs51
1 files changed, 35 insertions, 16 deletions
diff --git a/mullvad-masque-proxy/src/server/mod.rs b/mullvad-masque-proxy/src/server/mod.rs
index af1cb3e1d4..6ce22423ce 100644
--- a/mullvad-masque-proxy/src/server/mod.rs
+++ b/mullvad-masque-proxy/src/server/mod.rs
@@ -7,7 +7,7 @@ use std::{
};
use anyhow::{anyhow, ensure, Context};
-use bytes::{Bytes, BytesMut};
+use bytes::{BufMut, Bytes, BytesMut};
use h3::{
proto::varint::VarInt,
quic::{BidiStream, StreamId},
@@ -233,8 +233,13 @@ impl Server {
let (client_tx, client_rx) = mpsc::channel(MAX_INFLIGHT_PACKETS);
let (send_tx, send_rx) = mpsc::channel(MAX_INFLIGHT_PACKETS);
- let mut connection_task =
- task::spawn(connection_task(stream_id, http_conn, send_rx, client_tx));
+ let mut connection_task = task::spawn(connection_task(
+ stream_id,
+ http_conn,
+ quic_conn.clone(),
+ send_rx,
+ client_tx,
+ ));
let mut proxy_rx_task = task::spawn(proxy_rx_task(
stream_id,
quic_conn,
@@ -263,21 +268,14 @@ impl Server {
async fn connection_task(
stream_id: StreamId,
mut connection: Connection<h3_quinn::Connection, Bytes>,
+ quinn_conn: quinn::Connection,
mut send_rx: mpsc::Receiver<Bytes>,
client_tx: mpsc::Sender<Datagram>,
) -> anyhow::Result<()> {
- loop {
- tokio::select! {
- outgoing_packet = send_rx.recv() => {
- let Some(outgoing_packet) = outgoing_packet else {
- break; // sender is gone
- };
-
- // TODO: is this blocking?
- connection.send_datagram(stream_id, outgoing_packet)
- .context("Error sending QUIC datagram to client")?;
- }
- incoming_packet = connection.read_datagram() => match incoming_packet {
+ let mut incoming_handler = tokio::spawn(async move {
+ loop {
+ let incoming_packet = connection.read_datagram().await;
+ match incoming_packet {
Ok(Some(received_packet)) => {
ensure!(
received_packet.stream_id() == stream_id,
@@ -292,7 +290,28 @@ async fn connection_task(
Err(err) => {
return Err(err).context("Error reading QUIC datagram from client");
}
- },
+ }
+ }
+ Ok(())
+ });
+
+ loop {
+ tokio::select! {
+ outgoing_packet = send_rx.recv() => {
+ let Some(outgoing_packet) = outgoing_packet else {
+ break; // sender is gone
+ };
+
+ // add stream id
+ let mut buf = BytesMut::new();
+ (VarInt::from(stream_id) / 4).encode(&mut buf);
+ buf.put(outgoing_packet);
+
+ quinn_conn.send_datagram(buf.freeze())
+ .context("Error sending QUIC datagram to client")?;
+ }
+ Ok(incoming_packet_result) = &mut incoming_handler => return incoming_packet_result,
+ else => break,
}
}