summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorMarkus Pettersson <markus.pettersson@mullvad.net>2023-12-05 16:14:18 +0100
committerMarkus Pettersson <markus.pettersson@mullvad.net>2023-12-07 13:28:40 +0100
commit644b49b87428e0f62b278eca6940ae7b015304bc (patch)
tree3e71f10e6ad5c3250072d8aafdda7e49b4d20d28
parent0b914a8796c15f25bdb3eb28903bb6a2a60dc12d (diff)
downloadmullvadvpn-644b49b87428e0f62b278eca6940ae7b015304bc.tar.xz
mullvadvpn-644b49b87428e0f62b278eca6940ae7b015304bc.zip
Refactor `start_packet_monitor_for_interface`
Flatten the inner loop of `start_packet_monitor_for_interface` by using `tokio::select!` instead of composing `futures::select`
-rw-r--r--test/test-manager/src/network_monitor.rs66
1 files changed, 31 insertions, 35 deletions
diff --git a/test/test-manager/src/network_monitor.rs b/test/test-manager/src/network_monitor.rs
index 02c1e24d9e..7f74f0f425 100644
--- a/test/test-manager/src/network_monitor.rs
+++ b/test/test-manager/src/network_monitor.rs
@@ -4,11 +4,7 @@ use std::{
time::Duration,
};
-use futures::{
- channel::oneshot,
- future::{select, Either},
- pin_mut, StreamExt,
-};
+use futures::{channel::oneshot, pin_mut, StreamExt};
pub use pcap::Direction;
use pcap::PacketCodec;
use pnet_packet::{
@@ -243,7 +239,7 @@ async fn start_packet_monitor_for_interface(
no_frame: monitor_options.no_frame,
})
.unwrap();
- let (stop_tx, stop_rx) = oneshot::channel();
+ let (stop_tx, mut stop_rx) = oneshot::channel();
let interface = interface.to_owned();
@@ -261,9 +257,7 @@ async fn start_packet_monitor_for_interface(
futures::future::pending().await
}
};
-
pin_mut!(timeout);
- pin_mut!(stop_rx);
let mut is_receiving_tx = Some(is_receiving_tx);
@@ -272,40 +266,42 @@ async fn start_packet_monitor_for_interface(
let next_packet =
poll_fn(|ctx| poll_and_notify(ctx, &mut next_packet_fut, &mut is_receiving_tx));
- match select(select(next_packet, &mut stop_rx), &mut timeout).await {
- Either::Left((Either::Left((Some(Ok(packet)), _)), _)) => {
- if let Some(packet) = packet {
- if !filter_fn(&packet) {
- log::debug!(
- "{interface} \"{packet:?}\" does not match closure conditions"
- );
- monitor_result.discarded_packets =
- monitor_result.discarded_packets.saturating_add(1);
- } else {
- log::debug!("{interface} \"{packet:?}\" matches closure conditions");
+ tokio::select! {
+ _stop = &mut stop_rx => {
+ log::trace!("stopping packet monitor");
+ break Ok(monitor_result);
+ }
+ _timeout = &mut timeout => {
+ log::info!("monitor timed out");
+ break Ok(monitor_result);
+ }
+ maybe_next_packet = next_packet => {
+ match maybe_next_packet {
+ Some(Ok(packet))=> {
+ if let Some(packet) = packet {
+ if !filter_fn(&packet) {
+ log::debug!("{interface} \"{packet:?}\" does not match closure conditions");
+ monitor_result.discarded_packets =
+ monitor_result.discarded_packets.saturating_add(1);
+ } else {
+ log::debug!("{interface} \"{packet:?}\" matches closure conditions");
- let should_continue = should_continue_fn(&packet);
+ let should_continue = should_continue_fn(&packet);
- monitor_result.packets.push(packet);
+ monitor_result.packets.push(packet);
- if !should_continue {
- break Ok(monitor_result);
+ if !should_continue {
+ break Ok(monitor_result);
+ }
+ }
}
}
+ _ => {
+ log::error!("lost packet stream");
+ break Err(MonitorUnexpectedlyStopped(()));
+ }
}
}
- Either::Left((Either::Left(_), _)) => {
- log::error!("lost packet stream");
- break Err(MonitorUnexpectedlyStopped(()));
- }
- Either::Left((Either::Right(_), _)) => {
- log::trace!("stopping packet monitor");
- break Ok(monitor_result);
- }
- Either::Right(_) => {
- log::info!("monitor timed out");
- break Ok(monitor_result);
- }
}
}
});