diff options
| author | Markus Pettersson <markus.pettersson@mullvad.net> | 2023-12-05 16:14:18 +0100 |
|---|---|---|
| committer | Markus Pettersson <markus.pettersson@mullvad.net> | 2023-12-07 13:28:40 +0100 |
| commit | 644b49b87428e0f62b278eca6940ae7b015304bc (patch) | |
| tree | 3e71f10e6ad5c3250072d8aafdda7e49b4d20d28 | |
| parent | 0b914a8796c15f25bdb3eb28903bb6a2a60dc12d (diff) | |
| download | mullvadvpn-644b49b87428e0f62b278eca6940ae7b015304bc.tar.xz mullvadvpn-644b49b87428e0f62b278eca6940ae7b015304bc.zip | |
Refactor `start_packet_monitor_for_interface`
Flatten the inner loop of `start_packet_monitor_for_interface` by using
`tokio::select!` instead of composing `futures::select`
| -rw-r--r-- | test/test-manager/src/network_monitor.rs | 66 |
1 files changed, 31 insertions, 35 deletions
diff --git a/test/test-manager/src/network_monitor.rs b/test/test-manager/src/network_monitor.rs index 02c1e24d9e..7f74f0f425 100644 --- a/test/test-manager/src/network_monitor.rs +++ b/test/test-manager/src/network_monitor.rs @@ -4,11 +4,7 @@ use std::{ time::Duration, }; -use futures::{ - channel::oneshot, - future::{select, Either}, - pin_mut, StreamExt, -}; +use futures::{channel::oneshot, pin_mut, StreamExt}; pub use pcap::Direction; use pcap::PacketCodec; use pnet_packet::{ @@ -243,7 +239,7 @@ async fn start_packet_monitor_for_interface( no_frame: monitor_options.no_frame, }) .unwrap(); - let (stop_tx, stop_rx) = oneshot::channel(); + let (stop_tx, mut stop_rx) = oneshot::channel(); let interface = interface.to_owned(); @@ -261,9 +257,7 @@ async fn start_packet_monitor_for_interface( futures::future::pending().await } }; - pin_mut!(timeout); - pin_mut!(stop_rx); let mut is_receiving_tx = Some(is_receiving_tx); @@ -272,40 +266,42 @@ async fn start_packet_monitor_for_interface( let next_packet = poll_fn(|ctx| poll_and_notify(ctx, &mut next_packet_fut, &mut is_receiving_tx)); - match select(select(next_packet, &mut stop_rx), &mut timeout).await { - Either::Left((Either::Left((Some(Ok(packet)), _)), _)) => { - if let Some(packet) = packet { - if !filter_fn(&packet) { - log::debug!( - "{interface} \"{packet:?}\" does not match closure conditions" - ); - monitor_result.discarded_packets = - monitor_result.discarded_packets.saturating_add(1); - } else { - log::debug!("{interface} \"{packet:?}\" matches closure conditions"); + tokio::select! { + _stop = &mut stop_rx => { + log::trace!("stopping packet monitor"); + break Ok(monitor_result); + } + _timeout = &mut timeout => { + log::info!("monitor timed out"); + break Ok(monitor_result); + } + maybe_next_packet = next_packet => { + match maybe_next_packet { + Some(Ok(packet))=> { + if let Some(packet) = packet { + if !filter_fn(&packet) { + log::debug!("{interface} \"{packet:?}\" does not match closure conditions"); + monitor_result.discarded_packets = + monitor_result.discarded_packets.saturating_add(1); + } else { + log::debug!("{interface} \"{packet:?}\" matches closure conditions"); - let should_continue = should_continue_fn(&packet); + let should_continue = should_continue_fn(&packet); - monitor_result.packets.push(packet); + monitor_result.packets.push(packet); - if !should_continue { - break Ok(monitor_result); + if !should_continue { + break Ok(monitor_result); + } + } } } + _ => { + log::error!("lost packet stream"); + break Err(MonitorUnexpectedlyStopped(())); + } } } - Either::Left((Either::Left(_), _)) => { - log::error!("lost packet stream"); - break Err(MonitorUnexpectedlyStopped(())); - } - Either::Left((Either::Right(_), _)) => { - log::trace!("stopping packet monitor"); - break Ok(monitor_result); - } - Either::Right(_) => { - log::info!("monitor timed out"); - break Ok(monitor_result); - } } } }); |
