summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorEmīls <emils@mullvad.net>2020-07-17 10:48:53 +0100
committerEmīls <emils@mullvad.net>2020-07-17 10:48:53 +0100
commit77e129e184c783473acad21c29901288eff10df1 (patch)
tree15201dd7e4ce4bbc52333dcc5b7171fbcbec4434
parent1cb6a6e6aeb555d253142ad78fda57d4a95934af (diff)
parentb699a2f958cdaa41e1e7fd32d8749276162ad3b7 (diff)
downloadmullvadvpn-77e129e184c783473acad21c29901288eff10df1.tar.xz
mullvadvpn-77e129e184c783473acad21c29901288eff10df1.zip
Merge branch 'fix-conncheck'
-rw-r--r--CHANGELOG.md2
-rw-r--r--talpid-core/src/ping_monitor/mod.rs16
-rw-r--r--talpid-core/src/ping_monitor/unix.rs27
-rw-r--r--talpid-core/src/ping_monitor/win.rs15
-rw-r--r--talpid-core/src/tunnel/wireguard/connectivity_check.rs266
5 files changed, 293 insertions, 33 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md
index f6ddf617a0..ace301a14c 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -43,6 +43,8 @@ Line wrap the file at 100 chars. Th
- Upgrade Wintun from 0.7 to 0.8.1.
### Fixed
+- Fix connectivity monitor for WireGuard not disconnecting from a relay when connectivity is lost.
+
#### Windows
- Fix window flickering by disabling window animations.
diff --git a/talpid-core/src/ping_monitor/mod.rs b/talpid-core/src/ping_monitor/mod.rs
index bc148a5892..c5e2a256e5 100644
--- a/talpid-core/src/ping_monitor/mod.rs
+++ b/talpid-core/src/ping_monitor/mod.rs
@@ -7,4 +7,18 @@ mod imp;
#[path = "win.rs"]
mod imp;
-pub use imp::{Error, Pinger};
+pub use imp::Error;
+
+pub trait Pinger: Send {
+ /// Sends an ICMP packet
+ fn send_icmp(&mut self) -> Result<(), Error>;
+ /// Clears all resources used by the pinger.
+ fn reset(&mut self) {}
+}
+
+pub fn new_pinger(
+ addr: std::net::Ipv4Addr,
+ interface_name: String,
+) -> Result<Box<dyn Pinger>, Error> {
+ Ok(Box::new(imp::Pinger::new(addr, interface_name)?))
+}
diff --git a/talpid-core/src/ping_monitor/unix.rs b/talpid-core/src/ping_monitor/unix.rs
index 7e0953289f..4207f0db04 100644
--- a/talpid-core/src/ping_monitor/unix.rs
+++ b/talpid-core/src/ping_monitor/unix.rs
@@ -25,8 +25,21 @@ impl Pinger {
})
}
+
+ fn try_deplete_process_list(&mut self) {
+ self.processes.retain(|child| {
+ match child.try_wait() {
+ // child has terminated, doesn't have to be retained
+ Ok(Some(_)) => false,
+ _ => true,
+ }
+ });
+ }
+}
+
+impl super::Pinger for Pinger {
// Send an ICMP packet without waiting for a reply
- pub fn send_icmp(&mut self) -> Result<(), Error> {
+ fn send_icmp(&mut self) -> Result<(), Error> {
self.try_deplete_process_list();
let cmd = ping_cmd(self.addr, 1, &self.interface_name);
@@ -35,7 +48,7 @@ impl Pinger {
Ok(())
}
- pub fn reset(&mut self) {
+ fn reset(&mut self) {
let processes = std::mem::replace(&mut self.processes, vec![]);
for proc in processes {
if proc
@@ -49,16 +62,6 @@ impl Pinger {
}
}
}
-
- fn try_deplete_process_list(&mut self) {
- self.processes.retain(|child| {
- match child.try_wait() {
- // child has terminated, doesn't have to be retained
- Ok(Some(_)) => false,
- _ => true,
- }
- });
- }
}
impl Drop for Pinger {
diff --git a/talpid-core/src/ping_monitor/win.rs b/talpid-core/src/ping_monitor/win.rs
index c2fd16dd9d..e2564b3943 100644
--- a/talpid-core/src/ping_monitor/win.rs
+++ b/talpid-core/src/ping_monitor/win.rs
@@ -60,13 +60,6 @@ impl Pinger {
})
}
- pub fn send_icmp(&mut self) -> Result<()> {
- let dest = SocketAddr::new(IpAddr::from(self.addr), 0);
- let request = self.next_ping_request();
- self.send_ping_request(&request, dest)
- }
-
-
fn send_ping_request(
&mut self,
request: &EchoRequestPacket<'static>,
@@ -117,3 +110,11 @@ impl Pinger {
seq
}
}
+
+impl super::Pinger for Pinger {
+ fn send_icmp(&mut self) -> Result<()> {
+ let dest = SocketAddr::new(IpAddr::from(self.addr), 0);
+ let request = self.next_ping_request();
+ self.send_ping_request(&request, dest)
+ }
+}
diff --git a/talpid-core/src/tunnel/wireguard/connectivity_check.rs b/talpid-core/src/tunnel/wireguard/connectivity_check.rs
index 7426e674d7..111edd772a 100644
--- a/talpid-core/src/tunnel/wireguard/connectivity_check.rs
+++ b/talpid-core/src/tunnel/wireguard/connectivity_check.rs
@@ -1,4 +1,7 @@
-use crate::{ping_monitor::Pinger, tunnel::wireguard::stats::Stats};
+use crate::{
+ ping_monitor::{new_pinger, Pinger},
+ tunnel::wireguard::stats::Stats,
+};
use std::{
net::Ipv4Addr,
sync::{mpsc, Mutex, Weak},
@@ -66,7 +69,7 @@ pub struct ConnectivityMonitor {
conn_state: ConnState,
initial_ping_timestamp: Option<Instant>,
num_pings_sent: u32,
- pinger: Pinger,
+ pinger: Box<dyn Pinger>,
close_receiver: mpsc::Receiver<()>,
}
@@ -78,7 +81,7 @@ impl ConnectivityMonitor {
tunnel_handle: Weak<Mutex<Option<Box<dyn Tunnel>>>>,
close_receiver: mpsc::Receiver<()>,
) -> Result<Self, Error> {
- let pinger = Pinger::new(addr, interface).map_err(Error::PingError)?;
+ let pinger = new_pinger(addr, interface).map_err(Error::PingError)?;
let now = Instant::now();
@@ -101,7 +104,7 @@ impl ConnectivityMonitor {
let start = Instant::now();
while start.elapsed() < PING_TIMEOUT {
- if self.check_connectivity()? {
+ if self.check_connectivity(Instant::now())? {
return Ok(true);
}
if self.should_shut_down(DELAY_ON_INITIAL_SETUP) {
@@ -129,7 +132,10 @@ impl ConnectivityMonitor {
let mut current_iteration = Instant::now();
let time_slept = current_iteration - last_iteration;
if time_slept < (iter_delay * 2) {
- self.check_connectivity()?;
+ if !self.check_connectivity(Instant::now())? {
+ return Ok(());
+ }
+
let end = Instant::now();
if end - current_iteration > Duration::from_secs(1) {
current_iteration = end;
@@ -146,8 +152,7 @@ impl ConnectivityMonitor {
}
/// Returns true if connection is established
- fn check_connectivity(&mut self) -> Result<bool, Error> {
- let now = Instant::now();
+ fn check_connectivity(&mut self, now: Instant) -> Result<bool, Error> {
match self.get_stats() {
None => Ok(false),
Some(new_stats) => {
@@ -158,7 +163,7 @@ impl ConnectivityMonitor {
return Ok(true);
}
- self.maybe_send_ping()?;
+ self.maybe_send_ping(now)?;
Ok(!self.ping_timed_out() && self.conn_state.connected())
}
}
@@ -175,7 +180,7 @@ impl ConnectivityMonitor {
.map(|tunnel| tunnel.get_tunnel_stats().map_err(Error::ConfigReadError))
}
- fn maybe_send_ping(&mut self) -> Result<(), Error> {
+ fn maybe_send_ping(&mut self, now: Instant) -> Result<(), Error> {
// Only send out a ping if we haven't received a byte in a while or no traffic has flowed
// in the last 2 minutes, but if a ping already has been sent out, only send one out every
// 3 seconds.
@@ -189,7 +194,7 @@ impl ConnectivityMonitor {
{
self.pinger.send_icmp().map_err(Error::PingError)?;
if self.initial_ping_timestamp.is_none() {
- self.initial_ping_timestamp = Some(Instant::now());
+ self.initial_ping_timestamp = Some(now);
}
self.num_pings_sent += 1;
}
@@ -206,7 +211,6 @@ impl ConnectivityMonitor {
fn reset_pinger(&mut self) {
self.initial_ping_timestamp = None;
self.num_pings_sent = 0;
- #[cfg(unix)]
self.pinger.reset();
}
}
@@ -338,8 +342,15 @@ impl ConnState {
#[cfg(test)]
mod test {
- use super::{ConnState, Stats, BYTES_RX_TIMEOUT, TRAFFIC_TIMEOUT};
- use std::time::{Duration, Instant};
+ use super::*;
+ use crate::tunnel::wireguard::{stats, TunnelError};
+ use std::{
+ sync::{
+ atomic::{AtomicBool, Ordering},
+ Arc, Mutex,
+ },
+ time::{Duration, Instant},
+ };
/// Test if a newly created ConnState won't have timed out or consider itself connected
#[test]
@@ -436,4 +447,233 @@ mod test {
assert!(conn_state.rx_timed_out());
assert!(!conn_state.traffic_timed_out());
}
+
+ #[derive(Default)]
+ struct MockPinger {
+ on_send_ping: Option<Box<dyn FnMut() + Send>>,
+ }
+
+ impl Pinger for MockPinger {
+ fn send_icmp(&mut self) -> Result<(), crate::ping_monitor::Error> {
+ if let Some(callback) = self.on_send_ping.as_mut() {
+ (callback)();
+ }
+ Ok(())
+ }
+ }
+
+ struct MockTunnel {
+ on_get_stats: Box<dyn Fn() -> Result<stats::Stats, TunnelError> + Send>,
+ }
+
+ impl MockTunnel {
+ fn new<F: Fn() -> Result<stats::Stats, TunnelError> + Send + 'static>(f: F) -> Self {
+ Self {
+ on_get_stats: Box::new(f),
+ }
+ }
+
+ fn always_incrementing() -> Self {
+ let traffic = Mutex::new(stats::Stats {
+ tx_bytes: 0,
+ rx_bytes: 0,
+ });
+ Self {
+ on_get_stats: Box::new(move || {
+ let mut traffic = traffic.lock().unwrap();
+ traffic.tx_bytes += 1;
+ traffic.rx_bytes += 1;
+
+ Ok(*traffic)
+ }),
+ }
+ }
+
+ fn never_incrementing() -> Self {
+ Self {
+ on_get_stats: Box::new(|| {
+ Ok(stats::Stats {
+ tx_bytes: 0,
+ rx_bytes: 0,
+ })
+ }),
+ }
+ }
+
+ fn into_locked(
+ self,
+ ) -> (
+ Arc<Mutex<Option<Box<dyn Tunnel>>>>,
+ Weak<Mutex<Option<Box<dyn Tunnel>>>>,
+ ) {
+ let dyn_tunnel: Box<dyn Tunnel> = Box::new(self);
+ let arc = Arc::new(Mutex::new(Some(dyn_tunnel)));
+ let weak_ref = Arc::downgrade(&arc);
+ (arc, weak_ref)
+ }
+ }
+
+ impl Tunnel for MockTunnel {
+ fn get_interface_name(&self) -> &str {
+ "mock-tunnel"
+ }
+
+ fn stop(self: Box<Self>) -> Result<(), TunnelError> {
+ Ok(())
+ }
+
+ fn get_tunnel_stats(&self) -> Result<stats::Stats, TunnelError> {
+ (self.on_get_stats)()
+ }
+ }
+
+ fn mock_monitor(
+ now: Instant,
+ pinger: Box<dyn Pinger>,
+ tunnel_handle: Weak<Mutex<Option<Box<dyn Tunnel>>>>,
+ close_receiver: mpsc::Receiver<()>,
+ ) -> ConnectivityMonitor {
+ ConnectivityMonitor {
+ conn_state: ConnState::new(now, Default::default()),
+ initial_ping_timestamp: None,
+ num_pings_sent: 0,
+ pinger,
+ close_receiver,
+ tunnel_handle,
+ }
+ }
+
+ fn connected_state(timestamp: Instant) -> ConnState {
+ ConnState::Connected {
+ rx_timestamp: timestamp,
+ tx_timestamp: timestamp,
+ stats: stats::Stats {
+ tx_bytes: 0,
+ rx_bytes: 0,
+ },
+ }
+ }
+
+
+ #[test]
+ /// Verify that `check_connectivity()` returns `false` if the tunnel is connected and traffic is
+ /// not flowing after `BYTES_RX_TIMEOUT` and `PING_TIMEOUT`.
+ fn test_ping_times_out() {
+ let (_tunnel_anchor, tunnel) = MockTunnel::never_incrementing().into_locked();
+ let (_tx, rx) = mpsc::channel();
+ let pinger = MockPinger::default();
+ let now = Instant::now();
+ let start = now - (BYTES_RX_TIMEOUT + PING_TIMEOUT + Duration::from_secs(10));
+ let mut monitor = mock_monitor(start, Box::new(pinger), tunnel, rx);
+
+ // Mock the state - connectivity has been established
+ monitor.conn_state = connected_state(start);
+ // A ping was sent to verify connectivity
+ monitor.maybe_send_ping(start).unwrap();
+ assert!(!monitor.check_connectivity(now).unwrap())
+ }
+
+ #[test]
+ /// Verify that `check_connectivity()` returns `true` if the tunnel is connected and traffic is
+ /// flowing constantly.
+ fn test_no_connection_on_start() {
+ let (_tunnel_anchor, tunnel) = MockTunnel::never_incrementing().into_locked();
+ let (_tx, rx) = mpsc::channel();
+ let pinger = MockPinger::default();
+ let now = Instant::now();
+ let start = now - Duration::from_secs(1);
+ let mut monitor = mock_monitor(start, Box::new(pinger), tunnel, rx);
+
+ assert!(!monitor.check_connectivity(now).unwrap())
+ }
+
+ #[test]
+ /// Verify that `check_connectivity()` returns `true` if the tunnel is connected and traffic is
+ /// flowing constantly.
+ fn test_connection_works() {
+ let (_tunnel_anchor, tunnel) = MockTunnel::always_incrementing().into_locked();
+ let (_tx, rx) = mpsc::channel();
+ let pinger = MockPinger::default();
+ let now = Instant::now();
+ let start = now - Duration::from_secs(1);
+ let mut monitor = mock_monitor(start, Box::new(pinger), tunnel, rx);
+
+ // Mock the state - connectivity has been established
+ monitor.conn_state = connected_state(start);
+
+ assert!(monitor.check_connectivity(now).unwrap())
+ }
+
+ #[test]
+ /// Verify that the connectivity monitor doesn't fail if the tunnel constantly sends traffic,
+ /// and it shuts down properly.
+ fn test_wait_loop() {
+ let (result_tx, result_rx) = mpsc::channel();
+ let (_tunnel_anchor, tunnel) = MockTunnel::always_incrementing().into_locked();
+ let pinger = MockPinger::default();
+ let (stop_tx, stop_rx) = mpsc::channel();
+ std::thread::spawn(move || {
+ let now = Instant::now();
+ let start = now - Duration::from_secs(1);
+ let mut monitor = mock_monitor(start, Box::new(pinger), tunnel, stop_rx);
+
+ let start_result = monitor.establish_connectivity();
+ result_tx.send(start_result).unwrap();
+
+ let result = monitor.run().map(|_| true);
+ result_tx.send(result).unwrap();
+ });
+
+ std::thread::sleep(Duration::from_secs(1));
+ assert_eq!(true, result_rx.try_recv().unwrap().unwrap());
+ stop_tx.send(()).unwrap();
+ std::thread::sleep(Duration::from_secs(1));
+ assert!(result_rx.try_recv().unwrap().is_ok());
+ }
+
+ #[test]
+ /// Verify that the connectivity monitor detects the tunnel timing out after no longer than
+ /// `BYTES_RX_TIMEOUT` and `PING_TIMEOUT` combined.
+ fn test_wait_loop_timeout() {
+ let should_stop = Arc::new(AtomicBool::new(false));
+ let should_stop_inner = should_stop.clone();
+
+ let tunnel_stats = Mutex::new(stats::Stats {
+ rx_bytes: 0,
+ tx_bytes: 0,
+ });
+
+ let pinger = MockPinger::default();
+ let (_tunnel_anchor, tunnel) = MockTunnel::new(move || {
+ let mut tunnel_stats = tunnel_stats.lock().unwrap();
+ if !should_stop_inner.load(Ordering::SeqCst) {
+ tunnel_stats.rx_bytes += 1;
+ }
+ tunnel_stats.tx_bytes += 1;
+ Ok(tunnel_stats.clone())
+ })
+ .into_locked();
+
+ let (result_tx, result_rx) = mpsc::channel();
+
+ let (_stop_tx, stop_rx) = mpsc::channel();
+ std::thread::spawn(move || {
+ let now = Instant::now();
+ let start = now - Duration::from_secs(1);
+ let mut monitor = mock_monitor(start, Box::new(pinger), tunnel, stop_rx);
+ let start_result = monitor.establish_connectivity();
+ result_tx.send(start_result).unwrap();
+ let end_result = monitor.run().map(|_| true);
+ result_tx.send(end_result).expect("Failed to send result");
+ });
+ assert!(result_rx
+ .recv_timeout(Duration::from_secs(1))
+ .unwrap()
+ .unwrap());
+ should_stop.store(true, Ordering::SeqCst);
+ assert!(result_rx
+ .recv_timeout(BYTES_RX_TIMEOUT + PING_TIMEOUT + Duration::from_secs(2))
+ .unwrap()
+ .is_ok());
+ }
}