diff options
| -rw-r--r-- | .github/workflows/git-commit-message-style.yml | 2 | ||||
| -rw-r--r-- | talpid-core/src/split_tunnel/macos/tun.rs | 29 |
2 files changed, 18 insertions, 13 deletions
diff --git a/.github/workflows/git-commit-message-style.yml b/.github/workflows/git-commit-message-style.yml index effedd75ed..0d53849f50 100644 --- a/.github/workflows/git-commit-message-style.yml +++ b/.github/workflows/git-commit-message-style.yml @@ -32,4 +32,4 @@ jobs: # This action defaults to 50 char subjects, but 72 is fine. max-subject-line-length: '72' # The action's wordlist is a bit short. Add more accepted verbs - additional-verbs: 'tidy, wrap, obfuscate' + additional-verbs: 'tidy, wrap, obfuscate, bias' diff --git a/talpid-core/src/split_tunnel/macos/tun.rs b/talpid-core/src/split_tunnel/macos/tun.rs index 3bf828f167..7236f81ed4 100644 --- a/talpid-core/src/split_tunnel/macos/tun.rs +++ b/talpid-core/src/split_tunnel/macos/tun.rs @@ -240,6 +240,7 @@ fn redirect_packets_for_pktap_stream( let st_utun_name = st_tun_device.get_ref().name().to_owned(); let (abort_tx, abort_rx) = broadcast::channel(1); + let abort_read_rx = abort_tx.subscribe(); let ingress_task: tokio::task::JoinHandle<tun::AsyncDevice> = tokio::spawn(run_ingress_task( st_tun_device, @@ -247,6 +248,7 @@ fn redirect_packets_for_pktap_stream( read_buffer_size, vpn_interface.clone(), abort_rx, + abort_read_rx, )); let egress_abort_rx = abort_tx.subscribe(); @@ -284,6 +286,9 @@ fn open_default_bpf( default_dev .set_see_sent(false) .map_err(Error::ConfigDefaultBpf)?; + default_dev + .set_nonblocking(true) + .map_err(Error::ConfigDefaultBpf)?; // Split the default device BPF handle into a read and write half let (default_read, default_write) = default_dev.split().map_err(Error::ConfigDefaultBpf)?; @@ -300,6 +305,7 @@ async fn run_ingress_task( read_buffer_size: usize, vpn_interface: Option<VpnInterface>, mut abort_rx: broadcast::Receiver<()>, + mut abort_read_rx: broadcast::Receiver<()>, ) -> tun::AsyncDevice { let mut read_buffer = vec![0u8; read_buffer_size]; log::trace!("Default BPF reader buffer size: {:?}", read_buffer.len()); @@ -309,22 +315,20 @@ async fn run_ingress_task( let (mut tun_reader, mut tun_writer) = tokio::io::split(st_tun_device); - let mut abort_read_rx = abort_rx.resubscribe(); - // Swallow all data written to the tun by reading from it // Do this to prevent the read buffer from filling up and preventing writes let mut garbage: Vec<u8> = vec![0u8; 8 * 1024 * 1024]; let dummy_read = tokio::spawn(async move { loop { tokio::select! { + biased; Ok(()) | Err(_) = abort_read_rx.recv() => { + break; + } result = tun_reader.read(&mut garbage) => { if result.is_err() { break; } } - Ok(()) | Err(_) = abort_read_rx.recv() => { - break; - } } } tun_reader @@ -333,6 +337,9 @@ async fn run_ingress_task( // Write data incoming on the default interface to the ST utun let tun_writer = loop { tokio::select! { + biased; Ok(()) | Err(_) = abort_rx.recv() => { + break tun_writer; + } result = default_read.read(&mut read_buffer) => { let Ok(read_n) = result else { break tun_writer; @@ -344,9 +351,6 @@ async fn run_ingress_task( handle_incoming_data(&mut tun_writer, payload, vpn_v4, vpn_v6).await; } } - Ok(()) | Err(_) = abort_rx.recv() => { - break tun_writer; - } } }; @@ -381,6 +385,10 @@ async fn run_egress_task( loop { tokio::select! { + biased; Ok(()) | Err(_) = abort_rx.recv() => { + log::debug!("stopping packet processing"); + break Ok(EgressResult { pktap_stream, classify }); + } packet = pktap_stream.next() => { let mut packet = packet.ok_or_else(|| { log::debug!("packet stream closed"); @@ -395,10 +403,6 @@ async fn run_egress_task( classify_and_send(&classify, &mut packet, &default_interface, &mut default_write, vpn_device) } - Ok(()) | Err(_) = abort_rx.recv() => { - log::debug!("stopping packet processing"); - break Ok(EgressResult { pktap_stream, classify }); - } } } } @@ -411,6 +415,7 @@ fn open_vpn_bpf(vpn_interface: &VpnInterface) -> Result<bpf::Bpf, Error> { .map_err(Error::ConfigVpnBpf)?; vpn_dev.set_immediate(true).map_err(Error::ConfigVpnBpf)?; vpn_dev.set_see_sent(false).map_err(Error::ConfigVpnBpf)?; + vpn_dev.set_nonblocking(true).map_err(Error::ConfigVpnBpf)?; Ok(vpn_dev) } |
