diff options
| author | Joakim Hulthe <joakim@hulthe.net> | 2024-03-04 14:54:20 +0100 |
|---|---|---|
| committer | Joakim Hulthe <joakim@hulthe.net> | 2024-03-20 16:50:15 +0100 |
| commit | 8aee9e291eca2ce8dfa58c3c05b20e1207a8efb8 (patch) | |
| tree | a8266601517d6ead93681f2d1db512e4fff5ebfd /test | |
| parent | 7e62d03a4366fb8eaabb13dec354bb0237cd0d08 (diff) | |
| download | mullvadvpn-8aee9e291eca2ce8dfa58c3c05b20e1207a8efb8.tar.xz mullvadvpn-8aee9e291eca2ce8dfa58c3c05b20e1207a8efb8.zip | |
Refactor forward_messages to use select macro
Diffstat (limited to 'test')
| -rw-r--r-- | test/test-rpc/src/transport.rs | 57 |
1 files changed, 29 insertions, 28 deletions
diff --git a/test/test-rpc/src/transport.rs b/test/test-rpc/src/transport.rs index b8086b4145..f5f4617026 100644 --- a/test/test-rpc/src/transport.rs +++ b/test/test-rpc/src/transport.rs @@ -1,5 +1,5 @@ use bytes::{Buf, BufMut, Bytes, BytesMut}; -use futures::{channel::mpsc, SinkExt, StreamExt}; +use futures::{channel::mpsc, FutureExt, SinkExt, StreamExt}; use serde::{de::DeserializeOwned, Serialize}; use std::{ fmt::Write, @@ -256,13 +256,12 @@ async fn forward_messages< let mut mullvad_daemon_forwarder = LengthDelimitedCodec::new().framed(mullvad_daemon_forwarder); loop { - match futures::future::select( - futures::future::select(serial_stream.next(), handshaker.1.next()), - futures::future::select(runner_forwarder.next(), mullvad_daemon_forwarder.next()), - ) - .await - { - futures::future::Either::Left((futures::future::Either::Left((Some(frame), _)), _)) => { + futures::select! { + frame = serial_stream.next().fuse() => { + let Some(frame) = frame else { + break Ok(()); + }; + let frame = frame.map_err(ForwardError::SerialConnection)?; // @@ -294,7 +293,12 @@ async fn forward_messages< } } } - futures::future::Either::Left((futures::future::Either::Right((Some(()), _)), _)) => { + + handshake = handshaker.1.next().fuse() => { + if handshake.is_none() { + break Ok(()); + } + log::trace!("shake: send"); // Ping the other end @@ -303,10 +307,12 @@ async fn forward_messages< .await .map_err(ForwardError::HandshakeError)?; } - futures::future::Either::Right(( - futures::future::Either::Left((Some(message), _)), - _, - )) => { + + message = runner_forwarder.next().fuse() => { + let Some(message) = message else { + break Ok(()); + }; + let message = message.map_err(ForwardError::TestRunnerChannel)?; // @@ -321,10 +327,16 @@ async fn forward_messages< .await .map_err(ForwardError::SerialConnection)?; } - futures::future::Either::Right(( - futures::future::Either::Right((Some(data), _)), - _, - )) => { + + data = mullvad_daemon_forwarder.next().fuse() => { + let Some(data) = data else { + // + // Force management interface socket to close + // + let _ = serial_stream.send(Frame::DaemonRpc(Bytes::new())).await; + break Ok(()); + }; + let data = data.map_err(ForwardError::DaemonChannel)?; // @@ -336,17 +348,6 @@ async fn forward_messages< .await .map_err(ForwardError::SerialConnection)?; } - futures::future::Either::Right((futures::future::Either::Right((None, _)), _)) => { - // - // Force management interface socket to close - // - let _ = serial_stream.send(Frame::DaemonRpc(Bytes::new())).await; - - break Ok(()); - } - _ => { - break Ok(()); - } } } } |
