diff options
| author | Emīls <emils@mullvad.net> | 2020-07-17 10:48:53 +0100 |
|---|---|---|
| committer | Emīls <emils@mullvad.net> | 2020-07-17 10:48:53 +0100 |
| commit | 77e129e184c783473acad21c29901288eff10df1 (patch) | |
| tree | 15201dd7e4ce4bbc52333dcc5b7171fbcbec4434 | |
| parent | 1cb6a6e6aeb555d253142ad78fda57d4a95934af (diff) | |
| parent | b699a2f958cdaa41e1e7fd32d8749276162ad3b7 (diff) | |
| download | mullvadvpn-77e129e184c783473acad21c29901288eff10df1.tar.xz mullvadvpn-77e129e184c783473acad21c29901288eff10df1.zip | |
Merge branch 'fix-conncheck'
| -rw-r--r-- | CHANGELOG.md | 2 | ||||
| -rw-r--r-- | talpid-core/src/ping_monitor/mod.rs | 16 | ||||
| -rw-r--r-- | talpid-core/src/ping_monitor/unix.rs | 27 | ||||
| -rw-r--r-- | talpid-core/src/ping_monitor/win.rs | 15 | ||||
| -rw-r--r-- | talpid-core/src/tunnel/wireguard/connectivity_check.rs | 266 |
5 files changed, 293 insertions, 33 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md index f6ddf617a0..ace301a14c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -43,6 +43,8 @@ Line wrap the file at 100 chars. Th - Upgrade Wintun from 0.7 to 0.8.1. ### Fixed +- Fix connectivity monitor for WireGuard not disconnecting from a relay when connectivity is lost. + #### Windows - Fix window flickering by disabling window animations. diff --git a/talpid-core/src/ping_monitor/mod.rs b/talpid-core/src/ping_monitor/mod.rs index bc148a5892..c5e2a256e5 100644 --- a/talpid-core/src/ping_monitor/mod.rs +++ b/talpid-core/src/ping_monitor/mod.rs @@ -7,4 +7,18 @@ mod imp; #[path = "win.rs"] mod imp; -pub use imp::{Error, Pinger}; +pub use imp::Error; + +pub trait Pinger: Send { + /// Sends an ICMP packet + fn send_icmp(&mut self) -> Result<(), Error>; + /// Clears all resources used by the pinger. + fn reset(&mut self) {} +} + +pub fn new_pinger( + addr: std::net::Ipv4Addr, + interface_name: String, +) -> Result<Box<dyn Pinger>, Error> { + Ok(Box::new(imp::Pinger::new(addr, interface_name)?)) +} diff --git a/talpid-core/src/ping_monitor/unix.rs b/talpid-core/src/ping_monitor/unix.rs index 7e0953289f..4207f0db04 100644 --- a/talpid-core/src/ping_monitor/unix.rs +++ b/talpid-core/src/ping_monitor/unix.rs @@ -25,8 +25,21 @@ impl Pinger { }) } + + fn try_deplete_process_list(&mut self) { + self.processes.retain(|child| { + match child.try_wait() { + // child has terminated, doesn't have to be retained + Ok(Some(_)) => false, + _ => true, + } + }); + } +} + +impl super::Pinger for Pinger { // Send an ICMP packet without waiting for a reply - pub fn send_icmp(&mut self) -> Result<(), Error> { + fn send_icmp(&mut self) -> Result<(), Error> { self.try_deplete_process_list(); let cmd = ping_cmd(self.addr, 1, &self.interface_name); @@ -35,7 +48,7 @@ impl Pinger { Ok(()) } - pub fn reset(&mut self) { + fn reset(&mut self) { let processes = std::mem::replace(&mut self.processes, vec![]); for proc in processes { if proc @@ -49,16 +62,6 @@ impl Pinger { } } } - - fn try_deplete_process_list(&mut self) { - self.processes.retain(|child| { - match child.try_wait() { - // child has terminated, doesn't have to be retained - Ok(Some(_)) => false, - _ => true, - } - }); - } } impl Drop for Pinger { diff --git a/talpid-core/src/ping_monitor/win.rs b/talpid-core/src/ping_monitor/win.rs index c2fd16dd9d..e2564b3943 100644 --- a/talpid-core/src/ping_monitor/win.rs +++ b/talpid-core/src/ping_monitor/win.rs @@ -60,13 +60,6 @@ impl Pinger { }) } - pub fn send_icmp(&mut self) -> Result<()> { - let dest = SocketAddr::new(IpAddr::from(self.addr), 0); - let request = self.next_ping_request(); - self.send_ping_request(&request, dest) - } - - fn send_ping_request( &mut self, request: &EchoRequestPacket<'static>, @@ -117,3 +110,11 @@ impl Pinger { seq } } + +impl super::Pinger for Pinger { + fn send_icmp(&mut self) -> Result<()> { + let dest = SocketAddr::new(IpAddr::from(self.addr), 0); + let request = self.next_ping_request(); + self.send_ping_request(&request, dest) + } +} diff --git a/talpid-core/src/tunnel/wireguard/connectivity_check.rs b/talpid-core/src/tunnel/wireguard/connectivity_check.rs index 7426e674d7..111edd772a 100644 --- a/talpid-core/src/tunnel/wireguard/connectivity_check.rs +++ b/talpid-core/src/tunnel/wireguard/connectivity_check.rs @@ -1,4 +1,7 @@ -use crate::{ping_monitor::Pinger, tunnel::wireguard::stats::Stats}; +use crate::{ + ping_monitor::{new_pinger, Pinger}, + tunnel::wireguard::stats::Stats, +}; use std::{ net::Ipv4Addr, sync::{mpsc, Mutex, Weak}, @@ -66,7 +69,7 @@ pub struct ConnectivityMonitor { conn_state: ConnState, initial_ping_timestamp: Option<Instant>, num_pings_sent: u32, - pinger: Pinger, + pinger: Box<dyn Pinger>, close_receiver: mpsc::Receiver<()>, } @@ -78,7 +81,7 @@ impl ConnectivityMonitor { tunnel_handle: Weak<Mutex<Option<Box<dyn Tunnel>>>>, close_receiver: mpsc::Receiver<()>, ) -> Result<Self, Error> { - let pinger = Pinger::new(addr, interface).map_err(Error::PingError)?; + let pinger = new_pinger(addr, interface).map_err(Error::PingError)?; let now = Instant::now(); @@ -101,7 +104,7 @@ impl ConnectivityMonitor { let start = Instant::now(); while start.elapsed() < PING_TIMEOUT { - if self.check_connectivity()? { + if self.check_connectivity(Instant::now())? { return Ok(true); } if self.should_shut_down(DELAY_ON_INITIAL_SETUP) { @@ -129,7 +132,10 @@ impl ConnectivityMonitor { let mut current_iteration = Instant::now(); let time_slept = current_iteration - last_iteration; if time_slept < (iter_delay * 2) { - self.check_connectivity()?; + if !self.check_connectivity(Instant::now())? { + return Ok(()); + } + let end = Instant::now(); if end - current_iteration > Duration::from_secs(1) { current_iteration = end; @@ -146,8 +152,7 @@ impl ConnectivityMonitor { } /// Returns true if connection is established - fn check_connectivity(&mut self) -> Result<bool, Error> { - let now = Instant::now(); + fn check_connectivity(&mut self, now: Instant) -> Result<bool, Error> { match self.get_stats() { None => Ok(false), Some(new_stats) => { @@ -158,7 +163,7 @@ impl ConnectivityMonitor { return Ok(true); } - self.maybe_send_ping()?; + self.maybe_send_ping(now)?; Ok(!self.ping_timed_out() && self.conn_state.connected()) } } @@ -175,7 +180,7 @@ impl ConnectivityMonitor { .map(|tunnel| tunnel.get_tunnel_stats().map_err(Error::ConfigReadError)) } - fn maybe_send_ping(&mut self) -> Result<(), Error> { + fn maybe_send_ping(&mut self, now: Instant) -> Result<(), Error> { // Only send out a ping if we haven't received a byte in a while or no traffic has flowed // in the last 2 minutes, but if a ping already has been sent out, only send one out every // 3 seconds. @@ -189,7 +194,7 @@ impl ConnectivityMonitor { { self.pinger.send_icmp().map_err(Error::PingError)?; if self.initial_ping_timestamp.is_none() { - self.initial_ping_timestamp = Some(Instant::now()); + self.initial_ping_timestamp = Some(now); } self.num_pings_sent += 1; } @@ -206,7 +211,6 @@ impl ConnectivityMonitor { fn reset_pinger(&mut self) { self.initial_ping_timestamp = None; self.num_pings_sent = 0; - #[cfg(unix)] self.pinger.reset(); } } @@ -338,8 +342,15 @@ impl ConnState { #[cfg(test)] mod test { - use super::{ConnState, Stats, BYTES_RX_TIMEOUT, TRAFFIC_TIMEOUT}; - use std::time::{Duration, Instant}; + use super::*; + use crate::tunnel::wireguard::{stats, TunnelError}; + use std::{ + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, Mutex, + }, + time::{Duration, Instant}, + }; /// Test if a newly created ConnState won't have timed out or consider itself connected #[test] @@ -436,4 +447,233 @@ mod test { assert!(conn_state.rx_timed_out()); assert!(!conn_state.traffic_timed_out()); } + + #[derive(Default)] + struct MockPinger { + on_send_ping: Option<Box<dyn FnMut() + Send>>, + } + + impl Pinger for MockPinger { + fn send_icmp(&mut self) -> Result<(), crate::ping_monitor::Error> { + if let Some(callback) = self.on_send_ping.as_mut() { + (callback)(); + } + Ok(()) + } + } + + struct MockTunnel { + on_get_stats: Box<dyn Fn() -> Result<stats::Stats, TunnelError> + Send>, + } + + impl MockTunnel { + fn new<F: Fn() -> Result<stats::Stats, TunnelError> + Send + 'static>(f: F) -> Self { + Self { + on_get_stats: Box::new(f), + } + } + + fn always_incrementing() -> Self { + let traffic = Mutex::new(stats::Stats { + tx_bytes: 0, + rx_bytes: 0, + }); + Self { + on_get_stats: Box::new(move || { + let mut traffic = traffic.lock().unwrap(); + traffic.tx_bytes += 1; + traffic.rx_bytes += 1; + + Ok(*traffic) + }), + } + } + + fn never_incrementing() -> Self { + Self { + on_get_stats: Box::new(|| { + Ok(stats::Stats { + tx_bytes: 0, + rx_bytes: 0, + }) + }), + } + } + + fn into_locked( + self, + ) -> ( + Arc<Mutex<Option<Box<dyn Tunnel>>>>, + Weak<Mutex<Option<Box<dyn Tunnel>>>>, + ) { + let dyn_tunnel: Box<dyn Tunnel> = Box::new(self); + let arc = Arc::new(Mutex::new(Some(dyn_tunnel))); + let weak_ref = Arc::downgrade(&arc); + (arc, weak_ref) + } + } + + impl Tunnel for MockTunnel { + fn get_interface_name(&self) -> &str { + "mock-tunnel" + } + + fn stop(self: Box<Self>) -> Result<(), TunnelError> { + Ok(()) + } + + fn get_tunnel_stats(&self) -> Result<stats::Stats, TunnelError> { + (self.on_get_stats)() + } + } + + fn mock_monitor( + now: Instant, + pinger: Box<dyn Pinger>, + tunnel_handle: Weak<Mutex<Option<Box<dyn Tunnel>>>>, + close_receiver: mpsc::Receiver<()>, + ) -> ConnectivityMonitor { + ConnectivityMonitor { + conn_state: ConnState::new(now, Default::default()), + initial_ping_timestamp: None, + num_pings_sent: 0, + pinger, + close_receiver, + tunnel_handle, + } + } + + fn connected_state(timestamp: Instant) -> ConnState { + ConnState::Connected { + rx_timestamp: timestamp, + tx_timestamp: timestamp, + stats: stats::Stats { + tx_bytes: 0, + rx_bytes: 0, + }, + } + } + + + #[test] + /// Verify that `check_connectivity()` returns `false` if the tunnel is connected and traffic is + /// not flowing after `BYTES_RX_TIMEOUT` and `PING_TIMEOUT`. + fn test_ping_times_out() { + let (_tunnel_anchor, tunnel) = MockTunnel::never_incrementing().into_locked(); + let (_tx, rx) = mpsc::channel(); + let pinger = MockPinger::default(); + let now = Instant::now(); + let start = now - (BYTES_RX_TIMEOUT + PING_TIMEOUT + Duration::from_secs(10)); + let mut monitor = mock_monitor(start, Box::new(pinger), tunnel, rx); + + // Mock the state - connectivity has been established + monitor.conn_state = connected_state(start); + // A ping was sent to verify connectivity + monitor.maybe_send_ping(start).unwrap(); + assert!(!monitor.check_connectivity(now).unwrap()) + } + + #[test] + /// Verify that `check_connectivity()` returns `true` if the tunnel is connected and traffic is + /// flowing constantly. + fn test_no_connection_on_start() { + let (_tunnel_anchor, tunnel) = MockTunnel::never_incrementing().into_locked(); + let (_tx, rx) = mpsc::channel(); + let pinger = MockPinger::default(); + let now = Instant::now(); + let start = now - Duration::from_secs(1); + let mut monitor = mock_monitor(start, Box::new(pinger), tunnel, rx); + + assert!(!monitor.check_connectivity(now).unwrap()) + } + + #[test] + /// Verify that `check_connectivity()` returns `true` if the tunnel is connected and traffic is + /// flowing constantly. + fn test_connection_works() { + let (_tunnel_anchor, tunnel) = MockTunnel::always_incrementing().into_locked(); + let (_tx, rx) = mpsc::channel(); + let pinger = MockPinger::default(); + let now = Instant::now(); + let start = now - Duration::from_secs(1); + let mut monitor = mock_monitor(start, Box::new(pinger), tunnel, rx); + + // Mock the state - connectivity has been established + monitor.conn_state = connected_state(start); + + assert!(monitor.check_connectivity(now).unwrap()) + } + + #[test] + /// Verify that the connectivity monitor doesn't fail if the tunnel constantly sends traffic, + /// and it shuts down properly. + fn test_wait_loop() { + let (result_tx, result_rx) = mpsc::channel(); + let (_tunnel_anchor, tunnel) = MockTunnel::always_incrementing().into_locked(); + let pinger = MockPinger::default(); + let (stop_tx, stop_rx) = mpsc::channel(); + std::thread::spawn(move || { + let now = Instant::now(); + let start = now - Duration::from_secs(1); + let mut monitor = mock_monitor(start, Box::new(pinger), tunnel, stop_rx); + + let start_result = monitor.establish_connectivity(); + result_tx.send(start_result).unwrap(); + + let result = monitor.run().map(|_| true); + result_tx.send(result).unwrap(); + }); + + std::thread::sleep(Duration::from_secs(1)); + assert_eq!(true, result_rx.try_recv().unwrap().unwrap()); + stop_tx.send(()).unwrap(); + std::thread::sleep(Duration::from_secs(1)); + assert!(result_rx.try_recv().unwrap().is_ok()); + } + + #[test] + /// Verify that the connectivity monitor detects the tunnel timing out after no longer than + /// `BYTES_RX_TIMEOUT` and `PING_TIMEOUT` combined. + fn test_wait_loop_timeout() { + let should_stop = Arc::new(AtomicBool::new(false)); + let should_stop_inner = should_stop.clone(); + + let tunnel_stats = Mutex::new(stats::Stats { + rx_bytes: 0, + tx_bytes: 0, + }); + + let pinger = MockPinger::default(); + let (_tunnel_anchor, tunnel) = MockTunnel::new(move || { + let mut tunnel_stats = tunnel_stats.lock().unwrap(); + if !should_stop_inner.load(Ordering::SeqCst) { + tunnel_stats.rx_bytes += 1; + } + tunnel_stats.tx_bytes += 1; + Ok(tunnel_stats.clone()) + }) + .into_locked(); + + let (result_tx, result_rx) = mpsc::channel(); + + let (_stop_tx, stop_rx) = mpsc::channel(); + std::thread::spawn(move || { + let now = Instant::now(); + let start = now - Duration::from_secs(1); + let mut monitor = mock_monitor(start, Box::new(pinger), tunnel, stop_rx); + let start_result = monitor.establish_connectivity(); + result_tx.send(start_result).unwrap(); + let end_result = monitor.run().map(|_| true); + result_tx.send(end_result).expect("Failed to send result"); + }); + assert!(result_rx + .recv_timeout(Duration::from_secs(1)) + .unwrap() + .unwrap()); + should_stop.store(true, Ordering::SeqCst); + assert!(result_rx + .recv_timeout(BYTES_RX_TIMEOUT + PING_TIMEOUT + Duration::from_secs(2)) + .unwrap() + .is_ok()); + } } |
