summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--mullvad-masque-proxy/src/client/mod.rs23
-rw-r--r--mullvad-masque-proxy/src/fragment.rs30
-rw-r--r--mullvad-masque-proxy/src/server/mod.rs8
-rw-r--r--talpid-wireguard/src/ephemeral.rs30
-rw-r--r--talpid-wireguard/src/lib.rs21
-rw-r--r--talpid-wireguard/src/obfuscation.rs34
-rw-r--r--tunnel-obfuscation/src/quic.rs4
7 files changed, 103 insertions, 47 deletions
diff --git a/mullvad-masque-proxy/src/client/mod.rs b/mullvad-masque-proxy/src/client/mod.rs
index 6163c63d25..e9d19fabf0 100644
--- a/mullvad-masque-proxy/src/client/mod.rs
+++ b/mullvad-masque-proxy/src/client/mod.rs
@@ -28,7 +28,7 @@ use quinn::{
use crate::{
MASQUE_WELL_KNOWN_PATH, MAX_INFLIGHT_PACKETS, MIN_IPV4_MTU, MIN_IPV6_MTU, QUIC_HEADER_SIZE,
compute_udp_payload_size,
- fragment::{self, Fragments},
+ fragment::{self, DefragReceived, Fragments},
stats::Stats,
};
@@ -553,14 +553,27 @@ async fn client_socket_tx_task(
continue;
}
let payload = response.into_payload();
+ let original_payload_len = payload.len();
- if let Ok(Some(payload)) = fragments.handle_incoming_packet(payload) {
- stats.rx(payload.len(), false /* TODO */);
-
+ let send = async |payload: &[u8]| -> Result<()> {
client_socket
- .send_to(payload.chunk(), return_addr)
+ .send_to(payload, return_addr)
.await
.map_err(Error::ClientWrite)?;
+ Ok(())
+ };
+
+ match fragments.handle_incoming_packet(payload) {
+ Ok(DefragReceived::Nonfragmented(payload)) => {
+ stats.rx(payload.len(), false);
+ send(payload.chunk()).await?;
+ }
+ Ok(DefragReceived::Reassembled(reassembled_payload)) => {
+ stats.rx(original_payload_len, true);
+ send(reassembled_payload.chunk()).await?;
+ }
+ Ok(DefragReceived::Fragment) => stats.rx(original_payload_len, true),
+ Err(_) => (),
}
}
diff --git a/mullvad-masque-proxy/src/fragment.rs b/mullvad-masque-proxy/src/fragment.rs
index f6e648af9e..534fb8bff5 100644
--- a/mullvad-masque-proxy/src/fragment.rs
+++ b/mullvad-masque-proxy/src/fragment.rs
@@ -50,15 +50,24 @@ impl Default for Fragments {
}
}
+pub enum DefragReceived {
+ /// Received a whole packet without fragmentation
+ Nonfragmented(Bytes),
+ /// Received a fragment but was unable to reassemble the packet
+ Fragment,
+ /// Received reassembled packet
+ Reassembled(Bytes),
+}
+
impl Fragments {
// TODO: Let caller provide output buffer.
pub fn handle_incoming_packet(
&mut self,
mut payload: Bytes,
- ) -> Result<Option<Bytes>, DefragError> {
+ ) -> Result<DefragReceived, DefragError> {
match VarInt::decode(&mut payload) {
Ok(crate::HTTP_MASQUE_DATAGRAM_CONTEXT_ID) => {
- return Ok(Some(payload));
+ return Ok(DefragReceived::Nonfragmented(payload));
}
Ok(crate::HTTP_MASQUE_FRAGMENTED_DATAGRAM_CONTEXT_ID) => {}
unexpected_context_id => {
@@ -95,7 +104,11 @@ impl Fragments {
let fragments = self.fragment_map.entry(id).or_default();
fragments.push(fragment);
- Ok(self.try_reassemble(id, fragment_count))
+ let reassembled = self.try_reassemble(id, fragment_count)
+ .map(DefragReceived::Reassembled)
+ // TODO: This may also occur if a packet is discarded
+ .unwrap_or(DefragReceived::Fragment);
+ Ok(reassembled)
}
// TODO: Let caller provide output buffer.
@@ -199,7 +212,7 @@ mod test {
fragment_buf.shuffle(&mut thread_rng());
for fragment in fragment_buf {
- if let Some(reconstructed_packet) =
+ if let DefragReceived::Reassembled(reconstructed_packet) =
fragments.handle_incoming_packet(fragment).unwrap()
{
assert_eq!(payload.as_slice(), reconstructed_packet.as_ref());
@@ -237,7 +250,10 @@ mod test {
let packet = fragments
.handle_incoming_packet(fragment_buf.pop().unwrap())
.unwrap();
- assert!(packet.is_none(), "haven't sent all fragments yet");
+ assert!(
+ matches!(packet, DefragReceived::Fragment),
+ "haven't sent all fragments yet"
+ );
// then send a bunch of fragments to fill the queue
let mut bad_payload = Bytes::from([0u8; 2].to_vec());
@@ -253,11 +269,11 @@ mod test {
let packet = fragments
.handle_incoming_packet(incomplete_fragment.clone())
.unwrap();
- assert!(packet.is_none());
+ assert!(matches!(packet, DefragReceived::Fragment));
}
for fragment in fragment_buf {
- if let Some(reconstructed_packet) =
+ if let DefragReceived::Reassembled(reconstructed_packet) =
fragments.handle_incoming_packet(fragment).unwrap()
{
assert_eq!(payload.as_slice(), reconstructed_packet.as_ref());
diff --git a/mullvad-masque-proxy/src/server/mod.rs b/mullvad-masque-proxy/src/server/mod.rs
index f8f224d485..3788ca7c80 100644
--- a/mullvad-masque-proxy/src/server/mod.rs
+++ b/mullvad-masque-proxy/src/server/mod.rs
@@ -22,7 +22,7 @@ use typed_builder::TypedBuilder;
use crate::{
MASQUE_WELL_KNOWN_PATH, MAX_INFLIGHT_PACKETS, MIN_IPV4_MTU, MIN_IPV6_MTU, QUIC_HEADER_SIZE,
compute_udp_payload_size,
- fragment::{self, Fragments},
+ fragment::{self, DefragReceived, Fragments},
};
#[derive(Debug, thiserror::Error)]
@@ -311,8 +311,10 @@ async fn proxy_tx_task(udp_socket: impl AsRef<UdpSocket>, mut client_rx: mpsc::R
let quic_payload = quic_datagram.into_payload();
let packet = match fragments.handle_incoming_packet(quic_payload) {
- Ok(Some(packet)) => packet,
- Ok(None) => continue,
+ Ok(DefragReceived::Reassembled(packet) | DefragReceived::Nonfragmented(packet)) => {
+ packet
+ }
+ Ok(DefragReceived::Fragment) => continue,
Err(err) => {
log::trace!("Failed to reassemble incoming packet: {err}");
continue;
diff --git a/talpid-wireguard/src/ephemeral.rs b/talpid-wireguard/src/ephemeral.rs
index 199cb0ebd6..501de4fdb6 100644
--- a/talpid-wireguard/src/ephemeral.rs
+++ b/talpid-wireguard/src/ephemeral.rs
@@ -27,6 +27,7 @@ pub async fn config_ephemeral_peers(
tunnel: &Arc<AsyncMutex<Option<TunnelType>>>,
config: &mut Config,
retry_attempt: u32,
+ obfuscation_mtu: u16,
obfuscator: Arc<AsyncMutex<Option<ObfuscatorHandle>>>,
close_obfs_sender: sync_mpsc::Sender<CloseMsg>,
) -> std::result::Result<(), CloseMsg> {
@@ -42,8 +43,15 @@ pub async fn config_ephemeral_peers(
log::trace!("Temporarily lowering tunnel MTU before ephemeral peer config");
try_set_ipv4_mtu(&iface_name, talpid_tunnel::MIN_IPV4_MTU);
- config_ephemeral_peers_inner(tunnel, config, retry_attempt, obfuscator, close_obfs_sender)
- .await?;
+ config_ephemeral_peers_inner(
+ tunnel,
+ config,
+ retry_attempt,
+ obfuscation_mtu,
+ obfuscator,
+ close_obfs_sender,
+ )
+ .await?;
log::trace!("Resetting tunnel MTU");
try_set_ipv4_mtu(&iface_name, config.mtu);
@@ -71,6 +79,7 @@ pub async fn config_ephemeral_peers(
tunnel: &Arc<AsyncMutex<Option<TunnelType>>>,
config: &mut Config,
retry_attempt: u32,
+ obfuscation_mtu: u16,
obfuscator: Arc<AsyncMutex<Option<ObfuscatorHandle>>>,
close_obfs_sender: sync_mpsc::Sender<CloseMsg>,
#[cfg(target_os = "android")] tun_provider: Arc<Mutex<TunProvider>>,
@@ -79,6 +88,7 @@ pub async fn config_ephemeral_peers(
tunnel,
config,
retry_attempt,
+ obfuscation_mtu,
obfuscator,
close_obfs_sender,
#[cfg(target_os = "android")]
@@ -91,6 +101,7 @@ async fn config_ephemeral_peers_inner(
tunnel: &Arc<AsyncMutex<Option<TunnelType>>>,
config: &mut Config,
retry_attempt: u32,
+ obfuscation_mtu: u16,
obfuscator: Arc<AsyncMutex<Option<ObfuscatorHandle>>>,
close_obfs_sender: sync_mpsc::Sender<CloseMsg>,
#[cfg(target_os = "android")] tun_provider: Arc<Mutex<TunProvider>>,
@@ -125,6 +136,7 @@ async fn config_ephemeral_peers_inner(
let entry_config = reconfigure_tunnel(
tunnel,
entry_tun_config,
+ obfuscation_mtu,
obfuscator.clone(),
close_obfs_sender,
#[cfg(target_os = "android")]
@@ -156,6 +168,7 @@ async fn config_ephemeral_peers_inner(
*config = reconfigure_tunnel(
tunnel,
config.clone(),
+ obfuscation_mtu,
obfuscator,
close_obfs_sender,
#[cfg(target_os = "android")]
@@ -187,6 +200,7 @@ async fn config_ephemeral_peers_inner(
async fn reconfigure_tunnel(
tunnel: &Arc<AsyncMutex<Option<TunnelType>>>,
mut config: Config,
+ obfuscation_mtu: u16,
obfuscator: Arc<AsyncMutex<Option<ObfuscatorHandle>>>,
close_obfs_sender: sync_mpsc::Sender<CloseMsg>,
tun_provider: &Arc<Mutex<TunProvider>>,
@@ -196,6 +210,7 @@ async fn reconfigure_tunnel(
obfuscator_handle.abort();
*obfs_guard = super::obfuscation::apply_obfuscation_config(
&mut config,
+ obfuscation_mtu,
close_obfs_sender,
#[cfg(target_os = "android")]
tun_provider.clone(),
@@ -224,15 +239,20 @@ async fn reconfigure_tunnel(
async fn reconfigure_tunnel(
tunnel: &Arc<AsyncMutex<Option<TunnelType>>>,
mut config: Config,
+ obfuscation_mtu: u16,
obfuscator: Arc<AsyncMutex<Option<ObfuscatorHandle>>>,
close_obfs_sender: sync_mpsc::Sender<CloseMsg>,
) -> Result<Config, CloseMsg> {
let mut obfs_guard = obfuscator.lock().await;
if let Some(obfuscator_handle) = obfs_guard.take() {
obfuscator_handle.abort();
- *obfs_guard = super::obfuscation::apply_obfuscation_config(&mut config, close_obfs_sender)
- .await
- .map_err(CloseMsg::ObfuscatorFailed)?;
+ *obfs_guard = super::obfuscation::apply_obfuscation_config(
+ &mut config,
+ obfuscation_mtu,
+ close_obfs_sender,
+ )
+ .await
+ .map_err(CloseMsg::ObfuscatorFailed)?;
}
{
diff --git a/talpid-wireguard/src/lib.rs b/talpid-wireguard/src/lib.rs
index 5e265e4dd5..dc6964385a 100644
--- a/talpid-wireguard/src/lib.rs
+++ b/talpid-wireguard/src/lib.rs
@@ -168,19 +168,24 @@ impl WireguardMonitor {
let endpoint_addrs = [params.get_next_hop_endpoint().address.ip()];
let (close_obfs_sender, close_obfs_listener) = sync_mpsc::channel();
+ // Adjust MTU unless overridden by user
+ if params.options.mtu.is_none() {
+ config.mtu = clamp_mtu(params, config.mtu);
+ }
// Start obfuscation server and patch the WireGuard config to point the endpoint to it.
+ let obfuscation_mtu = config.mtu;
let obfuscator = args
.runtime
.block_on(obfuscation::apply_obfuscation_config(
&mut config,
+ obfuscation_mtu,
close_obfs_sender.clone(),
))?;
- // Don't adjust MTU if overridden by user
- if params.options.mtu.is_none() {
- if let Some(obfuscator) = obfuscator.as_ref() {
- config.mtu = config.mtu.saturating_sub(obfuscator.packet_overhead());
- }
- config.mtu = clamp_mtu(params, config.mtu);
+ // Adjust MTU again for obfuscation packet overhead
+ if params.options.mtu.is_none()
+ && let Some(obfuscator) = obfuscator.as_ref()
+ {
+ config.mtu = config.mtu.saturating_sub(obfuscator.packet_overhead());
}
// NOTE: We force userspace WireGuard while boringtun is enabled to more easily test
@@ -279,6 +284,7 @@ impl WireguardMonitor {
&tunnel,
&mut config,
args.retry_attempt,
+ obfuscation_mtu,
obfuscator.clone(),
ephemeral_obfs_sender,
)
@@ -414,10 +420,12 @@ impl WireguardMonitor {
Config::from_parameters(params, desired_mtu).map_err(Error::WireguardConfigError)?;
let (close_obfs_sender, close_obfs_listener) = sync_mpsc::channel();
// Start obfuscation server and patch the WireGuard config to point the endpoint to it.
+ let obfuscation_mtu = config.mtu;
let obfuscator = args
.runtime
.block_on(obfuscation::apply_obfuscation_config(
&mut config,
+ obfuscation_mtu,
close_obfs_sender.clone(),
args.tun_provider.clone(),
))?;
@@ -520,6 +528,7 @@ impl WireguardMonitor {
&tunnel,
&mut config,
args.retry_attempt,
+ obfuscation_mtu,
obfuscator.clone(),
ephemeral_obfs_sender,
args.tun_provider,
diff --git a/talpid-wireguard/src/obfuscation.rs b/talpid-wireguard/src/obfuscation.rs
index c7bb69dd34..3fd4d411ea 100644
--- a/talpid-wireguard/src/obfuscation.rs
+++ b/talpid-wireguard/src/obfuscation.rs
@@ -8,7 +8,6 @@ use std::{
net::{Ipv4Addr, Ipv6Addr, SocketAddr},
sync::mpsc as sync_mpsc,
};
-use talpid_tunnel::WIREGUARD_HEADER_SIZE;
#[cfg(target_os = "android")]
use talpid_tunnel::tun_provider::TunProvider;
use talpid_types::{ErrorExt, net::obfuscation::ObfuscatorConfig};
@@ -19,8 +18,13 @@ use tunnel_obfuscation::{
/// Begin running obfuscation machine, if configured. This function will patch `config`'s endpoint
/// to point to an endpoint on localhost
+///
+/// # Arguments
+///
+/// * obfuscation_mtu - "MTU" including obfuscation overhead
pub async fn apply_obfuscation_config(
config: &mut Config,
+ obfuscation_mtu: u16,
close_msg_sender: sync_mpsc::Sender<CloseMsg>,
#[cfg(target_os = "android")] tun_provider: Arc<Mutex<TunProvider>>,
) -> Result<Option<ObfuscatorHandle>> {
@@ -28,24 +32,12 @@ pub async fn apply_obfuscation_config(
return Ok(None);
};
- let settings = {
- let settings = settings_from_config(
- obfuscator_config,
- #[cfg(target_os = "linux")]
- config.fwmark,
- );
-
- // Adjust MTU for QUIC obfuscator.
- match settings {
- ObfuscationSettings::Quic(quic) => {
- // Account for multihop
- // FIXME: Pass proper mtu as an argument / through config?
- let quic = quic.mtu(config.mtu - 2 * WIREGUARD_HEADER_SIZE);
- ObfuscationSettings::Quic(quic)
- }
- settings => settings,
- }
- };
+ let settings = settings_from_config(
+ obfuscator_config,
+ obfuscation_mtu,
+ #[cfg(target_os = "linux")]
+ config.fwmark,
+ );
log::trace!("Obfuscation settings: {settings:?}");
@@ -90,6 +82,7 @@ fn patch_endpoint(config: &mut Config, endpoint: SocketAddr) {
fn settings_from_config(
config: &ObfuscatorConfig,
+ mtu: u16,
#[cfg(target_os = "linux")] fwmark: Option<u32>,
) -> ObfuscationSettings {
match config {
@@ -121,7 +114,8 @@ fn settings_from_config(
hostname.to_owned(),
auth_token.parse().unwrap(),
wireguard_endpoint,
- );
+ )
+ .mtu(mtu);
#[cfg(target_os = "linux")]
if let Some(fwmark) = fwmark {
return ObfuscationSettings::Quic(settings.fwmark(fwmark));
diff --git a/tunnel-obfuscation/src/quic.rs b/tunnel-obfuscation/src/quic.rs
index a3f3164066..44dcfe1f8e 100644
--- a/tunnel-obfuscation/src/quic.rs
+++ b/tunnel-obfuscation/src/quic.rs
@@ -208,7 +208,9 @@ impl Obfuscator for Quic {
}
fn packet_overhead(&self) -> u16 {
- 0 // FIXME
+ // TODO: 95 = IPv6 (40) + UDP (8) + QUIC (<= 41) + stream ID (1) + fragment header (5)
+ // The above would prevent mullvad-masque-proxy-level fragmentation
+ 0
}
#[cfg(target_os = "android")]