summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--.github/workflows/git-commit-message-style.yml2
-rw-r--r--talpid-core/src/split_tunnel/macos/tun.rs29
2 files changed, 18 insertions, 13 deletions
diff --git a/.github/workflows/git-commit-message-style.yml b/.github/workflows/git-commit-message-style.yml
index effedd75ed..0d53849f50 100644
--- a/.github/workflows/git-commit-message-style.yml
+++ b/.github/workflows/git-commit-message-style.yml
@@ -32,4 +32,4 @@ jobs:
# This action defaults to 50 char subjects, but 72 is fine.
max-subject-line-length: '72'
# The action's wordlist is a bit short. Add more accepted verbs
- additional-verbs: 'tidy, wrap, obfuscate'
+ additional-verbs: 'tidy, wrap, obfuscate, bias'
diff --git a/talpid-core/src/split_tunnel/macos/tun.rs b/talpid-core/src/split_tunnel/macos/tun.rs
index 3bf828f167..7236f81ed4 100644
--- a/talpid-core/src/split_tunnel/macos/tun.rs
+++ b/talpid-core/src/split_tunnel/macos/tun.rs
@@ -240,6 +240,7 @@ fn redirect_packets_for_pktap_stream(
let st_utun_name = st_tun_device.get_ref().name().to_owned();
let (abort_tx, abort_rx) = broadcast::channel(1);
+ let abort_read_rx = abort_tx.subscribe();
let ingress_task: tokio::task::JoinHandle<tun::AsyncDevice> = tokio::spawn(run_ingress_task(
st_tun_device,
@@ -247,6 +248,7 @@ fn redirect_packets_for_pktap_stream(
read_buffer_size,
vpn_interface.clone(),
abort_rx,
+ abort_read_rx,
));
let egress_abort_rx = abort_tx.subscribe();
@@ -284,6 +286,9 @@ fn open_default_bpf(
default_dev
.set_see_sent(false)
.map_err(Error::ConfigDefaultBpf)?;
+ default_dev
+ .set_nonblocking(true)
+ .map_err(Error::ConfigDefaultBpf)?;
// Split the default device BPF handle into a read and write half
let (default_read, default_write) = default_dev.split().map_err(Error::ConfigDefaultBpf)?;
@@ -300,6 +305,7 @@ async fn run_ingress_task(
read_buffer_size: usize,
vpn_interface: Option<VpnInterface>,
mut abort_rx: broadcast::Receiver<()>,
+ mut abort_read_rx: broadcast::Receiver<()>,
) -> tun::AsyncDevice {
let mut read_buffer = vec![0u8; read_buffer_size];
log::trace!("Default BPF reader buffer size: {:?}", read_buffer.len());
@@ -309,22 +315,20 @@ async fn run_ingress_task(
let (mut tun_reader, mut tun_writer) = tokio::io::split(st_tun_device);
- let mut abort_read_rx = abort_rx.resubscribe();
-
// Swallow all data written to the tun by reading from it
// Do this to prevent the read buffer from filling up and preventing writes
let mut garbage: Vec<u8> = vec![0u8; 8 * 1024 * 1024];
let dummy_read = tokio::spawn(async move {
loop {
tokio::select! {
+ biased; Ok(()) | Err(_) = abort_read_rx.recv() => {
+ break;
+ }
result = tun_reader.read(&mut garbage) => {
if result.is_err() {
break;
}
}
- Ok(()) | Err(_) = abort_read_rx.recv() => {
- break;
- }
}
}
tun_reader
@@ -333,6 +337,9 @@ async fn run_ingress_task(
// Write data incoming on the default interface to the ST utun
let tun_writer = loop {
tokio::select! {
+ biased; Ok(()) | Err(_) = abort_rx.recv() => {
+ break tun_writer;
+ }
result = default_read.read(&mut read_buffer) => {
let Ok(read_n) = result else {
break tun_writer;
@@ -344,9 +351,6 @@ async fn run_ingress_task(
handle_incoming_data(&mut tun_writer, payload, vpn_v4, vpn_v6).await;
}
}
- Ok(()) | Err(_) = abort_rx.recv() => {
- break tun_writer;
- }
}
};
@@ -381,6 +385,10 @@ async fn run_egress_task(
loop {
tokio::select! {
+ biased; Ok(()) | Err(_) = abort_rx.recv() => {
+ log::debug!("stopping packet processing");
+ break Ok(EgressResult { pktap_stream, classify });
+ }
packet = pktap_stream.next() => {
let mut packet = packet.ok_or_else(|| {
log::debug!("packet stream closed");
@@ -395,10 +403,6 @@ async fn run_egress_task(
classify_and_send(&classify, &mut packet, &default_interface, &mut default_write, vpn_device)
}
- Ok(()) | Err(_) = abort_rx.recv() => {
- log::debug!("stopping packet processing");
- break Ok(EgressResult { pktap_stream, classify });
- }
}
}
}
@@ -411,6 +415,7 @@ fn open_vpn_bpf(vpn_interface: &VpnInterface) -> Result<bpf::Bpf, Error> {
.map_err(Error::ConfigVpnBpf)?;
vpn_dev.set_immediate(true).map_err(Error::ConfigVpnBpf)?;
vpn_dev.set_see_sent(false).map_err(Error::ConfigVpnBpf)?;
+ vpn_dev.set_nonblocking(true).map_err(Error::ConfigVpnBpf)?;
Ok(vpn_dev)
}