diff options
| author | Joakim Hulthe <joakim.hulthe@mullvad.net> | 2025-04-16 18:31:47 +0200 |
|---|---|---|
| committer | Joakim Hulthe <joakim.hulthe@mullvad.net> | 2025-04-16 18:31:47 +0200 |
| commit | d8800a37f24cd175f01fcfa52dea9a116307f7c4 (patch) | |
| tree | d4ae917509c5f5ee545b31301c0edf9b81a2f1a3 | |
| parent | 737ab3a8202ce02924d7b855970d3dd22fc42a26 (diff) | |
| download | mullvadvpn-d8800a37f24cd175f01fcfa52dea9a116307f7c4.tar.xz mullvadvpn-d8800a37f24cd175f01fcfa52dea9a116307f7c4.zip | |
Split masque-proxy connection_task into twomasque-backpressure
| -rw-r--r-- | mullvad-masque-proxy/src/server/mod.rs | 85 |
1 files changed, 45 insertions, 40 deletions
diff --git a/mullvad-masque-proxy/src/server/mod.rs b/mullvad-masque-proxy/src/server/mod.rs index 21d497bbc4..4ad4b64c8d 100644 --- a/mullvad-masque-proxy/src/server/mod.rs +++ b/mullvad-masque-proxy/src/server/mod.rs @@ -183,13 +183,10 @@ impl Server { let (client_tx, client_rx) = mpsc::channel(MAX_INFLIGHT_PACKETS); let (send_tx, send_rx) = mpsc::channel(MAX_INFLIGHT_PACKETS); - let mut connection_task = task::spawn(connection_task( - stream_id, - connection, - quinn_conn.clone(), - send_rx, - client_tx, - )); + let mut connection_rx_task = + task::spawn(connection_rx_task(stream_id, connection, client_tx)); + let mut connection_tx_task = + task::spawn(connection_tx_task(stream_id, quinn_conn.clone(), send_rx)); let mut proxy_rx_task = task::spawn(proxy_rx_task( stream_id, quinn_conn, @@ -201,59 +198,67 @@ impl Server { let mut proxy_tx_task = task::spawn(proxy_tx_task(udp_socket, client_rx)); select! { - _ = &mut connection_task => {} + _ = &mut connection_rx_task => {} + _ = &mut connection_tx_task => {} _ = &mut proxy_rx_task => {} _ = &mut proxy_tx_task => {} } - connection_task.abort(); + connection_rx_task.abort(); + connection_tx_task.abort(); proxy_rx_task.abort(); proxy_tx_task.abort(); // TODO: stream.finish()? } } +async fn connection_tx_task( + stream_id: StreamId, + quinn_conn: quinn::Connection, + mut send_rx: mpsc::Receiver<Bytes>, +) -> anyhow::Result<()> { + loop { + let outgoing_packet = send_rx.recv().await; + let Some(outgoing_packet) = outgoing_packet else { + break; // sender is gone + }; + + let mut buf = BytesMut::new(); + (VarInt::from(stream_id) / 4).encode(&mut buf); + buf.put(outgoing_packet); + quinn_conn + .send_datagram_wait(buf.freeze()) + .await + .context("Error sending QUIC datagram to client")?; + } + + Ok(()) +} /// Forward packets from `send_rx` to `connection`, and from `connection` to `client_tx`. -async fn connection_task( +async fn connection_rx_task( stream_id: StreamId, mut connection: Connection<h3_quinn::Connection, Bytes>, - quinn_conn: quinn::Connection, - mut send_rx: mpsc::Receiver<Bytes>, client_tx: mpsc::Sender<Datagram>, ) -> anyhow::Result<()> { loop { // TODO: split into two tasks - tokio::select! { - outgoing_packet = send_rx.recv() => { - let Some(outgoing_packet) = outgoing_packet else { - break; // sender is gone - }; - - let mut buf = BytesMut::new(); - (VarInt::from(stream_id) / 4).encode(&mut buf); - buf.put(outgoing_packet); - quinn_conn - .send_datagram_wait(buf.freeze()) - .await - .context("Error sending QUIC datagram to client")?; - } - incoming_packet = connection.read_datagram() => match incoming_packet { - Ok(Some(received_packet)) => { - ensure!( - received_packet.stream_id() == stream_id, - "Received unexpected stream ID from client", - ); + let incoming_packet = connection.read_datagram().await; + match incoming_packet { + Ok(Some(received_packet)) => { + ensure!( + received_packet.stream_id() == stream_id, + "Received unexpected stream ID from client", + ); - if client_tx.send(received_packet).await.is_err() { - break; // receiver is gone - } - } - Ok(None) => break, // EOF - Err(err) => { - return Err(err).context("Error reading QUIC datagram from client"); + if client_tx.send(received_packet).await.is_err() { + break; // receiver is gone } - }, + } + Ok(None) => break, // EOF + Err(err) => { + return Err(err).context("Error reading QUIC datagram from client"); + } } } |
