summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--Cargo.lock1
-rw-r--r--talpid-tunnel-config-client/Cargo.toml1
-rw-r--r--talpid-tunnel-config-client/src/lib.rs83
-rw-r--r--talpid-wireguard/src/lib.rs44
4 files changed, 119 insertions, 10 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 90e8b3e0ba..c85c6b6f08 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4495,7 +4495,6 @@ name = "talpid-tunnel-config-client"
version = "0.0.0"
dependencies = [
"classic-mceliece-rust",
- "futures",
"hyper-util",
"libc",
"log",
diff --git a/talpid-tunnel-config-client/Cargo.toml b/talpid-tunnel-config-client/Cargo.toml
index 5cf6e4b522..cb63d93fc9 100644
--- a/talpid-tunnel-config-client/Cargo.toml
+++ b/talpid-tunnel-config-client/Cargo.toml
@@ -14,7 +14,6 @@ workspace = true
log = { workspace = true }
rand = "0.8"
talpid-types = { path = "../talpid-types" }
-futures = { workspace = true }
tonic = { workspace = true }
tower = { workspace = true }
prost = { workspace = true }
diff --git a/talpid-tunnel-config-client/src/lib.rs b/talpid-tunnel-config-client/src/lib.rs
index 7c80d4f3e5..f13832fa60 100644
--- a/talpid-tunnel-config-client/src/lib.rs
+++ b/talpid-tunnel-config-client/src/lib.rs
@@ -107,7 +107,9 @@ pub async fn request_ephemeral_peer(
enable_post_quantum: bool,
enable_daita: bool,
) -> Result<EphemeralPeer, Error> {
+ log::debug!("Connecting to relay config service at {service_address}");
let client = connect_relay_config_client(service_address).await?;
+ log::debug!("Connected to relay config service at {service_address}");
request_ephemeral_peer_with(
client,
@@ -128,6 +130,7 @@ pub async fn request_ephemeral_peer_with(
) -> Result<EphemeralPeer, Error> {
let (pq_request, kem_secrets) = if enable_quantum_resistant {
let (pq_request, kem_secrets) = post_quantum_secrets().await;
+ log::debug!("Generated PQ secrets");
(Some(pq_request), Some(kem_secrets))
} else {
(None, None)
@@ -275,7 +278,7 @@ fn xor_assign(dst: &mut [u8; 32], src: &[u8; 32]) {
/// value has been speficically lowered, to avoid MTU issues. See the `socket` module.
#[cfg(not(target_os = "ios"))]
async fn connect_relay_config_client(ip: Ipv4Addr) -> Result<RelayConfigService, Error> {
- use futures::TryFutureExt;
+ use hyper_util::rt::tokio::TokioIo;
let endpoint = Endpoint::from_static("tcp://0.0.0.0:0");
let addr = SocketAddr::new(IpAddr::V4(ip), CONFIG_SERVICE_PORT);
@@ -283,12 +286,84 @@ async fn connect_relay_config_client(ip: Ipv4Addr) -> Result<RelayConfigService,
let connection = endpoint
.connect_with_connector(service_fn(move |_| async move {
let sock = socket::TcpSocket::new()?;
- sock.connect(addr)
- .map_ok(hyper_util::rt::tokio::TokioIo::new)
- .await
+ let stream = sock.connect(addr).await?;
+ let sniffer = socket_sniffer::SocketSniffer {
+ s: stream,
+ rx_bytes: 0,
+ tx_bytes: 0,
+ start_time: std::time::Instant::now(),
+ };
+ Ok::<_, std::io::Error>(TokioIo::new(sniffer))
}))
.await
.map_err(Error::GrpcConnectError)?;
Ok(RelayConfigService::new(connection))
}
+
+mod socket_sniffer {
+ pub struct SocketSniffer<S> {
+ pub s: S,
+ pub rx_bytes: usize,
+ pub tx_bytes: usize,
+ pub start_time: std::time::Instant,
+ }
+ use std::{
+ io,
+ pin::Pin,
+ task::{Context, Poll},
+ };
+
+ use tokio::io::AsyncWrite;
+
+ use tokio::io::{AsyncRead, ReadBuf};
+
+ impl<S> Drop for SocketSniffer<S> {
+ fn drop(&mut self) {
+ let duration = self.start_time.elapsed();
+ log::debug!(
+ "Tunnel config client connection ended. RX: {} bytes, TX: {} bytes, duration: {} s",
+ self.rx_bytes,
+ self.tx_bytes,
+ duration.as_secs()
+ );
+ }
+ }
+
+ impl<S: AsyncRead + AsyncWrite + Unpin> AsyncRead for SocketSniffer<S> {
+ fn poll_read(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &mut ReadBuf<'_>,
+ ) -> Poll<io::Result<()>> {
+ let initial_data = buf.filled().len();
+ let bytes = std::task::ready!(Pin::new(&mut self.s).poll_read(cx, buf));
+ if bytes.is_ok() {
+ self.rx_bytes += buf.filled().len().saturating_sub(initial_data);
+ }
+ Poll::Ready(bytes)
+ }
+ }
+
+ impl<S: AsyncRead + AsyncWrite + Unpin> AsyncWrite for SocketSniffer<S> {
+ fn poll_write(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &[u8],
+ ) -> Poll<io::Result<usize>> {
+ let bytes = std::task::ready!(Pin::new(&mut self.s).poll_write(cx, buf));
+ if let Ok(bytes) = bytes {
+ self.tx_bytes += bytes;
+ }
+ Poll::Ready(bytes)
+ }
+
+ fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ Pin::new(&mut self.s).poll_flush(cx)
+ }
+
+ fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ Pin::new(&mut self.s).poll_shutdown(cx)
+ }
+ }
+}
diff --git a/talpid-wireguard/src/lib.rs b/talpid-wireguard/src/lib.rs
index 2d282c6315..cb19559dcf 100644
--- a/talpid-wireguard/src/lib.rs
+++ b/talpid-wireguard/src/lib.rs
@@ -268,14 +268,22 @@ impl WireguardMonitor {
let ephemeral_obfs_sender = close_obfs_sender.clone();
if config.quantum_resistant || config.daita {
- ephemeral::config_ephemeral_peers(
+ if let Err(e) = ephemeral::config_ephemeral_peers(
&tunnel,
&mut config,
args.retry_attempt,
obfuscator.clone(),
ephemeral_obfs_sender,
)
- .await?;
+ .await
+ {
+ // We have received a small amount of reports about ephemeral peer nogationation
+ // timing out on Windows for 2024.9-beta1. These verbose data usage logs are
+ // a temporary measure to help us understand the issue. They can be removed
+ // if the issue is resolved.
+ log_tunnel_data_usage(&config, &tunnel).await;
+ return Err(e);
+ }
let metadata = Self::tunnel_metadata(&iface_name, &config);
event_hook
@@ -464,7 +472,7 @@ impl WireguardMonitor {
if should_negotiate_ephemeral_peer {
let ephemeral_obfs_sender = close_obfs_sender.clone();
- ephemeral::config_ephemeral_peers(
+ if let Err(e) = ephemeral::config_ephemeral_peers(
&tunnel,
&mut config,
args.retry_attempt,
@@ -472,7 +480,15 @@ impl WireguardMonitor {
ephemeral_obfs_sender,
args.tun_provider,
)
- .await?;
+ .await
+ {
+ // We have received a small amount of reports about ephemeral peer nogationation
+ // timing out on Windows for 2024.9-beta1. These verbose data usage logs are
+ // a temporary measure to help us understand the issue. They can be removed
+ // if the issue is resolved.
+ log_tunnel_data_usage(&config, &tunnel).await;
+ return Err(e);
+ }
let metadata = Self::tunnel_metadata(&iface_name, &config);
event_hook
@@ -965,6 +981,26 @@ impl WireguardMonitor {
}
}
+async fn log_tunnel_data_usage(config: &Config, tunnel: &Arc<AsyncMutex<Option<TunnelType>>>) {
+ let tunnel = tunnel.lock().await;
+ let Some(tunnel) = &*tunnel else { return };
+ let Ok(tunnel_stats) = tunnel.get_tunnel_stats() else {
+ return;
+ };
+ if let Some(stats) = config
+ .exit_peer
+ .as_ref()
+ .map(|peer| peer.public_key.as_bytes())
+ .and_then(|pubkey| tunnel_stats.get(pubkey))
+ {
+ log::warn!("Exit peer stats: {:?}", stats);
+ };
+ let pubkey = config.entry_peer.public_key.as_bytes();
+ if let Some(stats) = tunnel_stats.get(pubkey) {
+ log::warn!("Entry peer stats: {:?}", stats);
+ }
+}
+
#[derive(Debug)]
enum CloseMsg {
Stop,