summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorJoakim Hulthe <joakim.hulthe@mullvad.net>2025-04-16 18:31:47 +0200
committerJoakim Hulthe <joakim.hulthe@mullvad.net>2025-04-16 18:31:47 +0200
commitd8800a37f24cd175f01fcfa52dea9a116307f7c4 (patch)
treed4ae917509c5f5ee545b31301c0edf9b81a2f1a3
parent737ab3a8202ce02924d7b855970d3dd22fc42a26 (diff)
downloadmullvadvpn-d8800a37f24cd175f01fcfa52dea9a116307f7c4.tar.xz
mullvadvpn-d8800a37f24cd175f01fcfa52dea9a116307f7c4.zip
Split masque-proxy connection_task into twomasque-backpressure
-rw-r--r--mullvad-masque-proxy/src/server/mod.rs85
1 files changed, 45 insertions, 40 deletions
diff --git a/mullvad-masque-proxy/src/server/mod.rs b/mullvad-masque-proxy/src/server/mod.rs
index 21d497bbc4..4ad4b64c8d 100644
--- a/mullvad-masque-proxy/src/server/mod.rs
+++ b/mullvad-masque-proxy/src/server/mod.rs
@@ -183,13 +183,10 @@ impl Server {
let (client_tx, client_rx) = mpsc::channel(MAX_INFLIGHT_PACKETS);
let (send_tx, send_rx) = mpsc::channel(MAX_INFLIGHT_PACKETS);
- let mut connection_task = task::spawn(connection_task(
- stream_id,
- connection,
- quinn_conn.clone(),
- send_rx,
- client_tx,
- ));
+ let mut connection_rx_task =
+ task::spawn(connection_rx_task(stream_id, connection, client_tx));
+ let mut connection_tx_task =
+ task::spawn(connection_tx_task(stream_id, quinn_conn.clone(), send_rx));
let mut proxy_rx_task = task::spawn(proxy_rx_task(
stream_id,
quinn_conn,
@@ -201,59 +198,67 @@ impl Server {
let mut proxy_tx_task = task::spawn(proxy_tx_task(udp_socket, client_rx));
select! {
- _ = &mut connection_task => {}
+ _ = &mut connection_rx_task => {}
+ _ = &mut connection_tx_task => {}
_ = &mut proxy_rx_task => {}
_ = &mut proxy_tx_task => {}
}
- connection_task.abort();
+ connection_rx_task.abort();
+ connection_tx_task.abort();
proxy_rx_task.abort();
proxy_tx_task.abort();
// TODO: stream.finish()?
}
}
+async fn connection_tx_task(
+ stream_id: StreamId,
+ quinn_conn: quinn::Connection,
+ mut send_rx: mpsc::Receiver<Bytes>,
+) -> anyhow::Result<()> {
+ loop {
+ let outgoing_packet = send_rx.recv().await;
+ let Some(outgoing_packet) = outgoing_packet else {
+ break; // sender is gone
+ };
+
+ let mut buf = BytesMut::new();
+ (VarInt::from(stream_id) / 4).encode(&mut buf);
+ buf.put(outgoing_packet);
+ quinn_conn
+ .send_datagram_wait(buf.freeze())
+ .await
+ .context("Error sending QUIC datagram to client")?;
+ }
+
+ Ok(())
+}
/// Forward packets from `send_rx` to `connection`, and from `connection` to `client_tx`.
-async fn connection_task(
+async fn connection_rx_task(
stream_id: StreamId,
mut connection: Connection<h3_quinn::Connection, Bytes>,
- quinn_conn: quinn::Connection,
- mut send_rx: mpsc::Receiver<Bytes>,
client_tx: mpsc::Sender<Datagram>,
) -> anyhow::Result<()> {
loop {
// TODO: split into two tasks
- tokio::select! {
- outgoing_packet = send_rx.recv() => {
- let Some(outgoing_packet) = outgoing_packet else {
- break; // sender is gone
- };
-
- let mut buf = BytesMut::new();
- (VarInt::from(stream_id) / 4).encode(&mut buf);
- buf.put(outgoing_packet);
- quinn_conn
- .send_datagram_wait(buf.freeze())
- .await
- .context("Error sending QUIC datagram to client")?;
- }
- incoming_packet = connection.read_datagram() => match incoming_packet {
- Ok(Some(received_packet)) => {
- ensure!(
- received_packet.stream_id() == stream_id,
- "Received unexpected stream ID from client",
- );
+ let incoming_packet = connection.read_datagram().await;
+ match incoming_packet {
+ Ok(Some(received_packet)) => {
+ ensure!(
+ received_packet.stream_id() == stream_id,
+ "Received unexpected stream ID from client",
+ );
- if client_tx.send(received_packet).await.is_err() {
- break; // receiver is gone
- }
- }
- Ok(None) => break, // EOF
- Err(err) => {
- return Err(err).context("Error reading QUIC datagram from client");
+ if client_tx.send(received_packet).await.is_err() {
+ break; // receiver is gone
}
- },
+ }
+ Ok(None) => break, // EOF
+ Err(err) => {
+ return Err(err).context("Error reading QUIC datagram from client");
+ }
}
}