summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorJoakim Hulthe <joakim@hulthe.net>2024-03-04 14:54:20 +0100
committerJoakim Hulthe <joakim@hulthe.net>2024-03-20 16:50:15 +0100
commit8aee9e291eca2ce8dfa58c3c05b20e1207a8efb8 (patch)
treea8266601517d6ead93681f2d1db512e4fff5ebfd
parent7e62d03a4366fb8eaabb13dec354bb0237cd0d08 (diff)
downloadmullvadvpn-8aee9e291eca2ce8dfa58c3c05b20e1207a8efb8.tar.xz
mullvadvpn-8aee9e291eca2ce8dfa58c3c05b20e1207a8efb8.zip
Refactor forward_messages to use select macro
-rw-r--r--test/test-rpc/src/transport.rs57
1 files changed, 29 insertions, 28 deletions
diff --git a/test/test-rpc/src/transport.rs b/test/test-rpc/src/transport.rs
index b8086b4145..f5f4617026 100644
--- a/test/test-rpc/src/transport.rs
+++ b/test/test-rpc/src/transport.rs
@@ -1,5 +1,5 @@
use bytes::{Buf, BufMut, Bytes, BytesMut};
-use futures::{channel::mpsc, SinkExt, StreamExt};
+use futures::{channel::mpsc, FutureExt, SinkExt, StreamExt};
use serde::{de::DeserializeOwned, Serialize};
use std::{
fmt::Write,
@@ -256,13 +256,12 @@ async fn forward_messages<
let mut mullvad_daemon_forwarder = LengthDelimitedCodec::new().framed(mullvad_daemon_forwarder);
loop {
- match futures::future::select(
- futures::future::select(serial_stream.next(), handshaker.1.next()),
- futures::future::select(runner_forwarder.next(), mullvad_daemon_forwarder.next()),
- )
- .await
- {
- futures::future::Either::Left((futures::future::Either::Left((Some(frame), _)), _)) => {
+ futures::select! {
+ frame = serial_stream.next().fuse() => {
+ let Some(frame) = frame else {
+ break Ok(());
+ };
+
let frame = frame.map_err(ForwardError::SerialConnection)?;
//
@@ -294,7 +293,12 @@ async fn forward_messages<
}
}
}
- futures::future::Either::Left((futures::future::Either::Right((Some(()), _)), _)) => {
+
+ handshake = handshaker.1.next().fuse() => {
+ if handshake.is_none() {
+ break Ok(());
+ }
+
log::trace!("shake: send");
// Ping the other end
@@ -303,10 +307,12 @@ async fn forward_messages<
.await
.map_err(ForwardError::HandshakeError)?;
}
- futures::future::Either::Right((
- futures::future::Either::Left((Some(message), _)),
- _,
- )) => {
+
+ message = runner_forwarder.next().fuse() => {
+ let Some(message) = message else {
+ break Ok(());
+ };
+
let message = message.map_err(ForwardError::TestRunnerChannel)?;
//
@@ -321,10 +327,16 @@ async fn forward_messages<
.await
.map_err(ForwardError::SerialConnection)?;
}
- futures::future::Either::Right((
- futures::future::Either::Right((Some(data), _)),
- _,
- )) => {
+
+ data = mullvad_daemon_forwarder.next().fuse() => {
+ let Some(data) = data else {
+ //
+ // Force management interface socket to close
+ //
+ let _ = serial_stream.send(Frame::DaemonRpc(Bytes::new())).await;
+ break Ok(());
+ };
+
let data = data.map_err(ForwardError::DaemonChannel)?;
//
@@ -336,17 +348,6 @@ async fn forward_messages<
.await
.map_err(ForwardError::SerialConnection)?;
}
- futures::future::Either::Right((futures::future::Either::Right((None, _)), _)) => {
- //
- // Force management interface socket to close
- //
- let _ = serial_stream.send(Frame::DaemonRpc(Bytes::new())).await;
-
- break Ok(());
- }
- _ => {
- break Ok(());
- }
}
}
}