From d8800a37f24cd175f01fcfa52dea9a116307f7c4 Mon Sep 17 00:00:00 2001 From: Joakim Hulthe Date: Wed, 16 Apr 2025 18:31:47 +0200 Subject: Split masque-proxy connection_task into two --- mullvad-masque-proxy/src/server/mod.rs | 87 ++++++++++++++++++---------------- 1 file changed, 46 insertions(+), 41 deletions(-) diff --git a/mullvad-masque-proxy/src/server/mod.rs b/mullvad-masque-proxy/src/server/mod.rs index 21d497bbc4..4ad4b64c8d 100644 --- a/mullvad-masque-proxy/src/server/mod.rs +++ b/mullvad-masque-proxy/src/server/mod.rs @@ -183,13 +183,10 @@ impl Server { let (client_tx, client_rx) = mpsc::channel(MAX_INFLIGHT_PACKETS); let (send_tx, send_rx) = mpsc::channel(MAX_INFLIGHT_PACKETS); - let mut connection_task = task::spawn(connection_task( - stream_id, - connection, - quinn_conn.clone(), - send_rx, - client_tx, - )); + let mut connection_rx_task = + task::spawn(connection_rx_task(stream_id, connection, client_tx)); + let mut connection_tx_task = + task::spawn(connection_tx_task(stream_id, quinn_conn.clone(), send_rx)); let mut proxy_rx_task = task::spawn(proxy_rx_task( stream_id, quinn_conn, @@ -201,59 +198,67 @@ impl Server { let mut proxy_tx_task = task::spawn(proxy_tx_task(udp_socket, client_rx)); select! { - _ = &mut connection_task => {} + _ = &mut connection_rx_task => {} + _ = &mut connection_tx_task => {} _ = &mut proxy_rx_task => {} _ = &mut proxy_tx_task => {} } - connection_task.abort(); + connection_rx_task.abort(); + connection_tx_task.abort(); proxy_rx_task.abort(); proxy_tx_task.abort(); // TODO: stream.finish()? } } +async fn connection_tx_task( + stream_id: StreamId, + quinn_conn: quinn::Connection, + mut send_rx: mpsc::Receiver, +) -> anyhow::Result<()> { + loop { + let outgoing_packet = send_rx.recv().await; + let Some(outgoing_packet) = outgoing_packet else { + break; // sender is gone + }; + + let mut buf = BytesMut::new(); + (VarInt::from(stream_id) / 4).encode(&mut buf); + buf.put(outgoing_packet); + quinn_conn + .send_datagram_wait(buf.freeze()) + .await + .context("Error sending QUIC datagram to client")?; + } + + Ok(()) +} /// Forward packets from `send_rx` to `connection`, and from `connection` to `client_tx`. -async fn connection_task( +async fn connection_rx_task( stream_id: StreamId, mut connection: Connection, - quinn_conn: quinn::Connection, - mut send_rx: mpsc::Receiver, client_tx: mpsc::Sender, ) -> anyhow::Result<()> { loop { // TODO: split into two tasks - tokio::select! { - outgoing_packet = send_rx.recv() => { - let Some(outgoing_packet) = outgoing_packet else { - break; // sender is gone - }; - - let mut buf = BytesMut::new(); - (VarInt::from(stream_id) / 4).encode(&mut buf); - buf.put(outgoing_packet); - quinn_conn - .send_datagram_wait(buf.freeze()) - .await - .context("Error sending QUIC datagram to client")?; - } - incoming_packet = connection.read_datagram() => match incoming_packet { - Ok(Some(received_packet)) => { - ensure!( - received_packet.stream_id() == stream_id, - "Received unexpected stream ID from client", - ); - - if client_tx.send(received_packet).await.is_err() { - break; // receiver is gone - } - } - Ok(None) => break, // EOF - Err(err) => { - return Err(err).context("Error reading QUIC datagram from client"); + let incoming_packet = connection.read_datagram().await; + match incoming_packet { + Ok(Some(received_packet)) => { + ensure!( + received_packet.stream_id() == stream_id, + "Received unexpected stream ID from client", + ); + + if client_tx.send(received_packet).await.is_err() { + break; // receiver is gone } - }, + } + Ok(None) => break, // EOF + Err(err) => { + return Err(err).context("Error reading QUIC datagram from client"); + } } } -- cgit v1.3-3-g829e