diff options
| author | Emīls <emils@mullvad.net> | 2022-01-24 11:55:52 +0000 |
|---|---|---|
| committer | Odd Stranne <odd@mullvad.net> | 2022-01-31 12:17:47 +0100 |
| commit | 51e7c8a43e76c401f80cfe3e12ba310780a30368 (patch) | |
| tree | 22ff7865a4ec3a94c72d2a548a188c52cfea363f | |
| parent | ebcd2f92402b7894bdfacd7d5db9eb274d982f61 (diff) | |
| download | mullvadvpn-51e7c8a43e76c401f80cfe3e12ba310780a30368.tar.xz mullvadvpn-51e7c8a43e76c401f80cfe3e12ba310780a30368.zip | |
Separate relays.rs into multiple modules
| -rw-r--r-- | mullvad-daemon/src/relays/matcher.rs | 313 | ||||
| -rw-r--r-- | mullvad-daemon/src/relays/mod.rs (renamed from mullvad-daemon/src/relays.rs) | 588 | ||||
| -rw-r--r-- | mullvad-daemon/src/relays/updater.rs | 211 | ||||
| -rw-r--r-- | mullvad-types/src/endpoint.rs | 8 |
4 files changed, 570 insertions, 550 deletions
diff --git a/mullvad-daemon/src/relays/matcher.rs b/mullvad-daemon/src/relays/matcher.rs new file mode 100644 index 0000000000..08924a78e9 --- /dev/null +++ b/mullvad-daemon/src/relays/matcher.rs @@ -0,0 +1,313 @@ +use mullvad_types::{ + endpoint::{MullvadEndpoint, MullvadWireguardEndpoint}, + relay_constraints::{ + Constraint, LocationConstraint, Match, OpenVpnConstraints, Providers, RelayConstraints, + TransportPort, WireguardConstraints, + }, + relay_list::{Relay, RelayTunnels, WireguardEndpointData}, +}; +use rand::{seq::SliceRandom, Rng}; +use std::net::{IpAddr, SocketAddr}; +use talpid_types::net::{all_of_the_internet, wireguard, IpVersion, TransportProtocol, TunnelType}; + +#[derive(Clone)] +pub struct RelayMatcher<T: TunnelMatcher> { + pub location: Constraint<LocationConstraint>, + pub providers: Constraint<Providers>, + pub tunnel: T, +} + +impl From<RelayConstraints> for RelayMatcher<AnyTunnelMatcher> { + fn from(constraints: RelayConstraints) -> Self { + Self { + location: constraints.location, + providers: constraints.providers, + tunnel: AnyTunnelMatcher { + wireguard: constraints.wireguard_constraints.into(), + openvpn: constraints.openvpn_constraints, + tunnel_type: constraints.tunnel_protocol, + }, + } + } +} + +impl RelayMatcher<AnyTunnelMatcher> { + pub fn to_wireguard_matcher(self) -> RelayMatcher<WireguardMatcher> { + RelayMatcher { + tunnel: self.tunnel.wireguard, + location: self.location, + providers: self.providers, + } + } +} + +impl RelayMatcher<WireguardMatcher> { + pub fn set_peer(&mut self, peer: Relay) { + self.tunnel.peer = Some(peer); + } +} + +impl<T: TunnelMatcher> RelayMatcher<T> { + /// Filter a relay and its endpoints based on constraints. + /// Only matching endpoints are included in the returned Relay. + pub fn filter_matching_relay(&self, relay: &Relay) -> Option<Relay> { + if !self.location.matches(relay) || !self.providers.matches(relay) { + return None; + } + + self.tunnel.filter_matching_endpoints(relay) + } + + pub fn mullvad_endpoint(&self, relay: &Relay) -> Option<MullvadEndpoint> { + self.tunnel.mullvad_endpoint(relay) + } +} + +/// TunnelMatcher allows to abstract over different tunnel-specific constraints, +/// as to not have false dependencies on OpenVpn specific constraints when +/// selecting only WireGuard tunnels. +pub trait TunnelMatcher: Clone { + /// Filter a relay and its endpoints based on constraints. + /// Only matching endpoints are included in the returned Relay. + fn filter_matching_endpoints(&self, relay: &Relay) -> Option<Relay>; + /// Constructs a MullvadEndpoint for a given Relay using extra data from the relay matcher + /// itself. + fn mullvad_endpoint(&self, relay: &Relay) -> Option<MullvadEndpoint>; +} + +impl TunnelMatcher for OpenVpnMatcher { + fn filter_matching_endpoints(&self, relay: &Relay) -> Option<Relay> { + let tunnels = relay + .tunnels + .openvpn + .iter() + .filter(|endpoint| self.matches(endpoint)) + .cloned() + .collect::<Vec<_>>(); + if tunnels.is_empty() { + return None; + } + let mut relay = relay.clone(); + relay.tunnels = RelayTunnels { + openvpn: tunnels, + wireguard: vec![], + }; + Some(relay) + } + + fn mullvad_endpoint(&self, relay: &Relay) -> Option<MullvadEndpoint> { + relay + .tunnels + .openvpn + .choose(&mut rand::thread_rng()) + .cloned() + .map(|endpoint| endpoint.into_mullvad_endpoint(relay.ipv4_addr_in.into())) + } +} + +pub type OpenVpnMatcher = OpenVpnConstraints; + +#[derive(Clone)] +pub struct AnyTunnelMatcher { + wireguard: WireguardMatcher, + openvpn: OpenVpnMatcher, + /// in the case that a user hasn't specified a tunnel protocol, the relay + /// selector might still construct preferred constraints that do select a + /// specific tunnel protocol, which is why the tunnel type may be specified + /// in the `AnyTunnelMatcher`. + tunnel_type: Constraint<TunnelType>, +} + +impl TunnelMatcher for AnyTunnelMatcher { + fn filter_matching_endpoints(&self, relay: &Relay) -> Option<Relay> { + match self.tunnel_type { + Constraint::Any => { + let wireguard_relay = self.wireguard.filter_matching_endpoints(relay); + let openvpn_relay = self.openvpn.filter_matching_endpoints(relay); + + match (wireguard_relay, openvpn_relay) { + (Some(mut matched_relay), Some(openvpn_relay)) => { + matched_relay.tunnels.openvpn = openvpn_relay.tunnels.openvpn; + Some(matched_relay) + } + (Some(relay), None) | (None, Some(relay)) => Some(relay), + _ => None, + } + } + Constraint::Only(TunnelType::OpenVpn) => self.openvpn.filter_matching_endpoints(relay), + Constraint::Only(TunnelType::Wireguard) => { + self.wireguard.filter_matching_endpoints(relay) + } + } + } + + fn mullvad_endpoint(&self, relay: &Relay) -> Option<MullvadEndpoint> { + #[cfg(not(target_os = "android"))] + match self.tunnel_type { + Constraint::Any => vec![ + self.openvpn.mullvad_endpoint(relay), + self.wireguard.mullvad_endpoint(relay), + ] + .into_iter() + .filter_map(|relay| relay) + .collect::<Vec<_>>() + .choose(&mut rand::thread_rng()) + .cloned(), + Constraint::Only(TunnelType::OpenVpn) => self.openvpn.mullvad_endpoint(relay), + Constraint::Only(TunnelType::Wireguard) => self.wireguard.mullvad_endpoint(relay), + } + + #[cfg(target_os = "android")] + self.wireguard.mullvad_endpoint(relay) + } +} + +#[derive(Clone)] +pub struct WireguardMatcher { + /// The peer is an already selected peer relay to be used with multihop. + /// It's stored here so we can exclude it from further selections being made. + pub peer: Option<Relay>, + pub port: Constraint<TransportPort>, + pub ip_version: Constraint<IpVersion>, +} + +impl WireguardMatcher { + fn wg_data_to_endpoint( + &self, + relay: &Relay, + data: WireguardEndpointData, + ) -> Option<MullvadEndpoint> { + let host = self.get_address_for_wireguard_relay(relay)?; + let port = self.get_port_for_wireguard_relay(&data)?; + let peer_config = wireguard::PeerConfig { + public_key: data.public_key, + endpoint: SocketAddr::new(host, port), + allowed_ips: all_of_the_internet(), + protocol: self + .port + .map(|port| port.protocol) + .unwrap_or(TransportProtocol::Udp), + }; + Some(MullvadEndpoint::Wireguard(MullvadWireguardEndpoint { + peer: peer_config, + exit_peer: None, + ipv4_gateway: data.ipv4_gateway, + ipv6_gateway: data.ipv6_gateway, + })) + } + + fn get_address_for_wireguard_relay(&self, relay: &Relay) -> Option<IpAddr> { + match self.ip_version { + Constraint::Any | Constraint::Only(IpVersion::V4) => Some(relay.ipv4_addr_in.into()), + Constraint::Only(IpVersion::V6) => relay.ipv6_addr_in.map(|addr| addr.into()), + } + } + + fn get_port_for_wireguard_relay(&self, data: &WireguardEndpointData) -> Option<u16> { + match self + .port + .as_ref() + .map(|port| port.port) + .unwrap_or(Constraint::Any) + { + Constraint::Any => { + let get_port_amount = + |range: &(u16, u16)| -> u64 { (1 + range.1 - range.0) as u64 }; + let port_amount: u64 = data.port_ranges.iter().map(get_port_amount).sum(); + + if port_amount < 1 { + return None; + } + + let mut port_index = rand::thread_rng().gen_range(0, port_amount); + + for range in data.port_ranges.iter() { + let ports_in_range = get_port_amount(range); + if port_index < ports_in_range { + return Some(port_index as u16 + range.0); + } + port_index -= ports_in_range; + } + log::error!("Port selection algorithm is broken!"); + None + } + Constraint::Only(port) => { + if data + .port_ranges + .iter() + .any(|range| (range.0 <= port && port <= range.1)) + { + Some(port) + } else { + None + } + } + } + } +} + +impl From<WireguardConstraints> for WireguardMatcher { + fn from(constraints: WireguardConstraints) -> Self { + Self { + peer: None, + port: constraints.port, + ip_version: constraints.ip_version, + } + } +} + +impl Match<WireguardEndpointData> for WireguardMatcher { + fn matches(&self, endpoint: &WireguardEndpointData) -> bool { + match self + .port + .as_ref() + .map(|port| port.port) + .unwrap_or(Constraint::Any) + { + Constraint::Any => true, + Constraint::Only(port) => endpoint + .port_ranges + .iter() + .any(|range| (port >= range.0 && port <= range.1)), + } + } +} + +impl TunnelMatcher for WireguardMatcher { + fn filter_matching_endpoints(&self, relay: &Relay) -> Option<Relay> { + if self + .peer + .as_ref() + .map(|peer_relay| peer_relay.hostname == relay.hostname) + .unwrap_or(false) + { + return None; + } + + let tunnels = relay + .tunnels + .wireguard + .iter() + .filter(|endpoint| self.matches(*endpoint)) + .cloned() + .collect::<Vec<_>>(); + if tunnels.is_empty() { + return None; + } + let mut relay = relay.clone(); + relay.tunnels = RelayTunnels { + wireguard: tunnels, + openvpn: vec![], + }; + Some(relay) + } + + fn mullvad_endpoint(&self, relay: &Relay) -> Option<MullvadEndpoint> { + relay + .tunnels + .wireguard + .choose(&mut rand::thread_rng()) + .cloned() + .and_then(|wg_tunnel| self.wg_data_to_endpoint(relay, wg_tunnel)) + } +} diff --git a/mullvad-daemon/src/relays.rs b/mullvad-daemon/src/relays/mod.rs index ac56bf75fd..1fead05f77 100644 --- a/mullvad-daemon/src/relays.rs +++ b/mullvad-daemon/src/relays/mod.rs @@ -2,13 +2,8 @@ //! updated as well. use chrono::{DateTime, Local}; -use futures::{ - channel::mpsc, - future::{Fuse, FusedFuture}, - FutureExt, SinkExt, StreamExt, -}; use ipnetwork::IpNetwork; -use mullvad_rpc::{availability::ApiAvailabilityHandle, rest::MullvadRestHandle, RelayListProxy}; +use mullvad_rpc::{availability::ApiAvailabilityHandle, rest::MullvadRestHandle}; use mullvad_types::{ endpoint::{MullvadEndpoint, MullvadWireguardEndpoint}, location::Location, @@ -16,39 +11,35 @@ use mullvad_types::{ BridgeState, Constraint, InternalBridgeConstraints, LocationConstraint, Match, OpenVpnConstraints, Providers, RelayConstraints, Set, TransportPort, WireguardConstraints, }, - relay_list::{Relay, RelayList, RelayTunnels, WireguardEndpointData}, + relay_list::{Relay, RelayList, WireguardEndpointData}, }; use parking_lot::Mutex; use rand::{self, seq::SliceRandom, Rng}; use std::{ - future::Future, io, - net::{IpAddr, SocketAddr}, - path::{Path, PathBuf}, + net::IpAddr, + path::Path, sync::Arc, - time::{self, Duration, Instant, SystemTime}, + time::{self, SystemTime}, }; -use talpid_core::future_retry::{retry_future, ExponentialBackoff, Jittered}; use talpid_types::{ - net::{ - all_of_the_internet, openvpn::ProxySettings, wireguard, IpVersion, TransportProtocol, - TunnelType, - }, + net::{openvpn::ProxySettings, wireguard, IpVersion, TransportProtocol, TunnelType}, ErrorExt, }; -use tokio::fs::File; + +use crate::relays::updater::RelayListUpdater; + +use self::{ + matcher::{RelayMatcher, TunnelMatcher, WireguardMatcher}, + updater::RelayListUpdaterHandle, +}; + +mod matcher; +mod updater; const DATE_TIME_FORMAT_STR: &str = "%Y-%m-%d %H:%M:%S%.3f"; const RELAYS_FILENAME: &str = "relays.json"; -/// How often the updater should wake up to check the cache of the in-memory cache of relays. -/// This check is very cheap. The only reason to not have it very often is because if downloading -/// constantly fails it will try very often and fill the logs etc. -const UPDATE_CHECK_INTERVAL: Duration = Duration::from_secs(60 * 15); -/// How old the cached relays need to be to trigger an update -const UPDATE_INTERVAL: Duration = Duration::from_secs(60 * 60); -const EXPONENTIAL_BACKOFF_INITIAL: Duration = Duration::from_secs(16); -const EXPONENTIAL_BACKOFF_FACTOR: u32 = 8; const DEFAULT_WIREGUARD_PORT: u16 = 51820; const WIREGUARD_EXIT_CONSTRAINTS: WireguardMatcher = WireguardMatcher { peer: None, @@ -353,12 +344,7 @@ impl RelaySelector { ) }; - Self::set_entry_peers( - exit_endpoint - .get_peer_config() - .expect("Failed to get peer config from WireGuard endpoint"), - &mut entry_endpoint, - ); + Self::set_entry_peers(&exit_endpoint.unwrap_wireguard().peer, &mut entry_endpoint); log::info!( "Selected entry relay {} at {} going through {} at {}", @@ -367,7 +353,7 @@ impl RelaySelector { exit_relay.hostname, exit_endpoint.to_endpoint().address.ip(), ); - let result = RelaySelectorResult::wireguard_endpoint_with_entry( + let result = RelaySelectorResult::wireguard_multihop_endpoint( exit_relay, entry_endpoint, entry_relay, @@ -544,15 +530,9 @@ impl RelaySelector { Constraint::Only(TunnelType::Wireguard) => { relay_constraints.wireguard_constraints = original_constraints.wireguard_constraints.clone(); - // This ensures that if after the first 2 failed attempts the daemon does not - // connect, then afterwards 2 of each 4 successive attempts will try to connect - // on port 53. - if retry_attempt % 4 > 1 && relay_constraints.wireguard_constraints.port.is_any() { + if relay_constraints.wireguard_constraints.port.is_any() { relay_constraints.wireguard_constraints.port = - Constraint::Only(TransportPort { - protocol: TransportProtocol::Udp, - port: Constraint::Only(53), - }); + Self::preferred_wireguard_port(retry_attempt); } } }; @@ -579,23 +559,20 @@ impl RelaySelector { .relays() .iter() .filter(|relay| relay.active) - .filter_map(|relay| matcher.matching_relay(relay)) + .filter_map(|relay| matcher.filter_matching_relay(relay)) .collect(); let relay = self .pick_random_relay(&matching_relays) .map(|relay| relay.clone()) .ok_or(Error::NoRelay)?; - let endpoint = matcher.mullvad_endpoint(&relay).ok_or(Error::NoRelay)?; - match endpoint { - MullvadEndpoint::Wireguard(endpoint) => Ok((relay, endpoint)), - _ => { - unreachable!( - "Entry endpoints should only ever be WireGuard endpoints, instead got a {:?}", - endpoint - ); - } - } + let endpoint = matcher + .mullvad_endpoint(&relay) + .ok_or(Error::NoRelay)? + .unwrap_wireguard() + .clone(); + + Ok((relay, endpoint)) } fn set_entry_peers( @@ -723,6 +700,9 @@ impl RelaySelector { } fn preferred_wireguard_port(retry_attempt: u32) -> Constraint<TransportPort> { + // This ensures that if after the first 2 failed attempts the daemon does not + // connect, then afterwards 2 of each 4 successive attempts will try to connect + // on port 53. let port = match retry_attempt % 4 { 0 | 1 => Constraint::Any, _ => Constraint::Only(53), @@ -761,7 +741,7 @@ impl RelaySelector { .relays() .iter() .filter(|relay| relay.active) - .filter_map(|relay| matcher.matching_relay(relay)) + .filter_map(|relay| matcher.filter_matching_relay(relay)) .collect(); self.pick_random_relay(&matching_relays) @@ -810,7 +790,7 @@ impl RelaySelector { if total_weight == 0 { relays.choose(&mut rng) } else { - // Pick a random number in the range 1 - total_weight. This choses the relay with a + // Pick a random number in the range 1..=total_weight. This choses the relay with a // non-zero weight. let mut i: u64 = rng.gen_range(1, total_weight + 1); Some( @@ -889,7 +869,7 @@ impl RelaySelectorResult { } } - fn wireguard_endpoint_with_entry( + fn wireguard_multihop_endpoint( exit_relay: Relay, endpoint: MullvadWireguardEndpoint, entry: Relay, @@ -902,489 +882,6 @@ impl RelaySelectorResult { } } -#[derive(Clone)] -pub struct RelayListUpdaterHandle { - tx: mpsc::Sender<()>, -} - -impl RelayListUpdaterHandle { - pub async fn update_relay_list(&mut self) -> Result<(), Error> { - self.tx - .send(()) - .await - .map_err(|_| Error::DownloaderShutDown) - } -} - -struct RelayListUpdater { - rpc_client: RelayListProxy, - cache_path: PathBuf, - parsed_relays: Arc<Mutex<ParsedRelays>>, - on_update: Box<dyn Fn(&RelayList) + Send + 'static>, - earliest_next_try: Instant, - api_availability: ApiAvailabilityHandle, -} - -impl RelayListUpdater { - pub fn new( - rpc_handle: MullvadRestHandle, - cache_path: PathBuf, - parsed_relays: Arc<Mutex<ParsedRelays>>, - on_update: Box<dyn Fn(&RelayList) + Send + 'static>, - api_availability: ApiAvailabilityHandle, - ) -> RelayListUpdaterHandle { - let (tx, cmd_rx) = mpsc::channel(1); - let service = rpc_handle.service(); - let rpc_client = RelayListProxy::new(rpc_handle); - let updater = RelayListUpdater { - rpc_client, - cache_path, - parsed_relays, - on_update, - earliest_next_try: Instant::now() + UPDATE_INTERVAL, - api_availability, - }; - - service.spawn(updater.run(cmd_rx)); - - RelayListUpdaterHandle { tx } - } - - async fn run(mut self, mut cmd_rx: mpsc::Receiver<()>) { - let mut check_interval = - tokio_stream::wrappers::IntervalStream::new(tokio::time::interval_at( - (Instant::now() + UPDATE_CHECK_INTERVAL).into(), - UPDATE_CHECK_INTERVAL, - )) - .fuse(); - let mut download_future = Box::pin(Fuse::terminated()); - loop { - futures::select! { - _check_update = check_interval.next() => { - if download_future.is_terminated() && self.should_update() { - let tag = self.parsed_relays.lock().tag().map(|tag| tag.to_string()); - download_future = Box::pin(Self::download_relay_list(self.api_availability.clone(), self.rpc_client.clone(), tag).fuse()); - self.earliest_next_try = Instant::now() + UPDATE_INTERVAL; - } - }, - - new_relay_list = download_future => { - self.consume_new_relay_list(new_relay_list).await; - - }, - - cmd = cmd_rx.next() => { - match cmd { - Some(()) => { - let tag = self.parsed_relays.lock().tag().map(|tag| tag.to_string()); - download_future = Box::pin(Self::download_relay_list(self.api_availability.clone(), self.rpc_client.clone(), tag).fuse()); - }, - None => { - log::trace!("Relay list updater shutting down"); - return; - } - } - } - - }; - } - } - - async fn consume_new_relay_list( - &mut self, - result: Result<Option<RelayList>, mullvad_rpc::Error>, - ) { - match result { - Ok(Some(relay_list)) => { - if let Err(err) = self.update_cache(relay_list).await { - log::error!("Failed to update relay list cache: {}", err); - } - } - Ok(None) => log::debug!("Relay list is up-to-date"), - Err(err) => { - log::error!( - "Failed to fetch new relay list: {}. Will retry in {} minutes", - err, - self.earliest_next_try - .saturating_duration_since(Instant::now()) - .as_secs() - / 60 - ); - } - } - } - - /// Returns true if the current parsed_relays is older than UPDATE_INTERVAL - fn should_update(&mut self) -> bool { - match SystemTime::now().duration_since(self.parsed_relays.lock().last_updated()) { - Ok(duration) => duration > UPDATE_INTERVAL && self.earliest_next_try <= Instant::now(), - // If the clock is skewed we have no idea by how much or when the last update - // actually was, better download again to get in sync and get a `last_updated` - // timestamp corresponding to the new time. - Err(_) => true, - } - } - - fn download_relay_list( - api_handle: ApiAvailabilityHandle, - rpc_handle: RelayListProxy, - tag: Option<String>, - ) -> impl Future<Output = Result<Option<RelayList>, mullvad_rpc::Error>> + 'static { - let download_futures = move || { - let available = api_handle.wait_background(); - let req = rpc_handle.relay_list(tag.clone()); - async move { - available.await?; - req.await.map_err(mullvad_rpc::Error::from) - } - }; - - let exponential_backoff = - ExponentialBackoff::new(EXPONENTIAL_BACKOFF_INITIAL, EXPONENTIAL_BACKOFF_FACTOR) - .max_delay(UPDATE_INTERVAL * 2); - - let download_future = retry_future( - download_futures, - |result| result.is_err(), - Jittered::jitter(exponential_backoff), - ); - download_future - } - - async fn update_cache(&mut self, new_relay_list: RelayList) -> Result<(), Error> { - if let Err(error) = Self::cache_relays(&self.cache_path, &new_relay_list).await { - log::error!( - "{}", - error.display_chain_with_msg("Failed to update relay cache on disk") - ); - } - - let new_parsed_relays = ParsedRelays::from_relay_list(new_relay_list, SystemTime::now()); - log::info!( - "Downloaded relay inventory has {} relays", - new_parsed_relays.relays().len() - ); - - let mut parsed_relays = self.parsed_relays.lock(); - *parsed_relays = new_parsed_relays; - (self.on_update)(parsed_relays.locations()); - Ok(()) - } - - /// Write a `RelayList` to the cache file. - async fn cache_relays(cache_path: &Path, relays: &RelayList) -> Result<(), Error> { - log::debug!("Writing relays cache to {}", cache_path.display()); - let mut file = File::create(cache_path) - .await - .map_err(Error::OpenRelayCache)?; - let bytes = serde_json::to_vec_pretty(relays).map_err(Error::Serialize)?; - let mut slice: &[u8] = bytes.as_slice(); - let _ = tokio::io::copy(&mut slice, &mut file) - .await - .map_err(Error::WriteRelayCache)?; - Ok(()) - } -} - -#[derive(Clone)] -struct RelayMatcher<T: TunnelMatcher> { - location: Constraint<LocationConstraint>, - providers: Constraint<Providers>, - tunnel: T, -} - -impl From<RelayConstraints> for RelayMatcher<AnyTunnelMatcher> { - fn from(constraints: RelayConstraints) -> Self { - Self { - location: constraints.location, - providers: constraints.providers, - tunnel: AnyTunnelMatcher { - wireguard: constraints.wireguard_constraints.into(), - openvpn: constraints.openvpn_constraints, - tunnel_type: constraints.tunnel_protocol, - }, - } - } -} - -impl RelayMatcher<AnyTunnelMatcher> { - fn to_wireguard_matcher(self) -> RelayMatcher<WireguardMatcher> { - let Self { - location, - providers, - tunnel, - } = self; - - let tunnel = tunnel.wireguard; - RelayMatcher { - tunnel, - location, - providers, - } - } -} - -impl RelayMatcher<WireguardMatcher> { - pub fn set_peer(&mut self, peer: Relay) { - self.tunnel.peer = Some(peer); - } -} - -impl<T: TunnelMatcher> RelayMatcher<T> { - fn matching_relay(&self, relay: &Relay) -> Option<Relay> { - if !self.location.matches(relay) || !self.providers.matches(relay) { - return None; - } - - self.tunnel.matching_relay(relay) - } - - fn mullvad_endpoint(&self, relay: &Relay) -> Option<MullvadEndpoint> { - self.tunnel.mullvad_endpoint(relay) - } -} -/// TunnelMatcher allows to abstract over different tunnel-specific constraints, -/// as to not have false dependencies on OpenVpn specific constraints when -/// selecting only WireGuard tunnels. -trait TunnelMatcher: Clone { - /// Check if given relay matches tunnel-specific constraints. - fn matching_relay(&self, relay: &Relay) -> Option<Relay>; - /// Constructs a MullvadEndpoint for a given Relay using extra data from the relay matcher - /// itself. - fn mullvad_endpoint(&self, relay: &Relay) -> Option<MullvadEndpoint>; -} - -impl TunnelMatcher for OpenVpnConstraints { - fn matching_relay(&self, relay: &Relay) -> Option<Relay> { - let tunnels = relay - .tunnels - .openvpn - .iter() - .filter(|endpoint| self.matches(endpoint)) - .cloned() - .collect::<Vec<_>>(); - if tunnels.is_empty() { - return None; - } - let mut relay = relay.clone(); - relay.tunnels = RelayTunnels { - openvpn: tunnels, - wireguard: vec![], - }; - Some(relay) - } - - fn mullvad_endpoint(&self, relay: &Relay) -> Option<MullvadEndpoint> { - relay - .tunnels - .openvpn - .choose(&mut rand::thread_rng()) - .cloned() - .map(|endpoint| endpoint.into_mullvad_endpoint(relay.ipv4_addr_in.into())) - } -} - -#[derive(Clone)] -struct AnyTunnelMatcher { - wireguard: WireguardMatcher, - openvpn: OpenVpnConstraints, - /// in the case that a user hasn't specified a tunnel protocol, the relay - /// selector might still construct preferred constraints that do select a - /// specific tunnel protocol, which is why the tunnel type may be specified - /// in the `AnyTunnelMatcher`. - tunnel_type: Constraint<TunnelType>, -} - -impl TunnelMatcher for AnyTunnelMatcher { - fn matching_relay(&self, relay: &Relay) -> Option<Relay> { - match self.tunnel_type { - Constraint::Any => { - let wireguard_relay = self.wireguard.matching_relay(relay); - let openvpn_relay = self.openvpn.matching_relay(relay); - - match (wireguard_relay, openvpn_relay) { - (Some(mut matched_relay), Some(openvpn_relay)) => { - matched_relay.tunnels.openvpn = openvpn_relay.tunnels.openvpn; - Some(matched_relay) - } - (Some(relay), None) | (None, Some(relay)) => Some(relay), - _ => None, - } - } - Constraint::Only(TunnelType::OpenVpn) => self.openvpn.matching_relay(relay), - Constraint::Only(TunnelType::Wireguard) => self.wireguard.matching_relay(relay), - } - } - - fn mullvad_endpoint(&self, relay: &Relay) -> Option<MullvadEndpoint> { - #[cfg(not(target_os = "android"))] - match self.tunnel_type { - Constraint::Any => vec![ - self.openvpn.mullvad_endpoint(relay), - self.wireguard.mullvad_endpoint(relay), - ] - .into_iter() - .filter_map(|relay| relay) - .collect::<Vec<_>>() - .choose(&mut rand::thread_rng()) - .cloned(), - Constraint::Only(TunnelType::OpenVpn) => self.openvpn.mullvad_endpoint(relay), - Constraint::Only(TunnelType::Wireguard) => self.wireguard.mullvad_endpoint(relay), - } - - #[cfg(target_os = "android")] - self.wireguard_matcher.mullvad_endpoint(relay) - } -} - -#[derive(Clone)] -struct WireguardMatcher { - peer: Option<Relay>, - port: Constraint<TransportPort>, - ip_version: Constraint<IpVersion>, -} - -impl WireguardMatcher { - fn wg_data_to_endpoint( - &self, - relay: &Relay, - data: WireguardEndpointData, - ) -> Option<MullvadEndpoint> { - let host = self.get_address_for_wireguard_relay(relay)?; - let port = self.get_port_for_wireguard_relay(&data)?; - let peer_config = wireguard::PeerConfig { - public_key: data.public_key, - endpoint: SocketAddr::new(host, port), - allowed_ips: all_of_the_internet(), - protocol: self - .port - .map(|port| port.protocol) - .unwrap_or(TransportProtocol::Udp), - }; - Some(MullvadEndpoint::Wireguard(MullvadWireguardEndpoint { - peer: peer_config, - exit_peer: None, - ipv4_gateway: data.ipv4_gateway, - ipv6_gateway: data.ipv6_gateway, - })) - } - - fn get_address_for_wireguard_relay(&self, relay: &Relay) -> Option<IpAddr> { - match self.ip_version { - Constraint::Any | Constraint::Only(IpVersion::V4) => Some(relay.ipv4_addr_in.into()), - Constraint::Only(IpVersion::V6) => relay.ipv6_addr_in.map(|addr| addr.into()), - } - } - - fn get_port_for_wireguard_relay(&self, data: &WireguardEndpointData) -> Option<u16> { - match self - .port - .as_ref() - .map(|port| port.port) - .unwrap_or(Constraint::Any) - { - Constraint::Any => { - let get_port_amount = - |range: &(u16, u16)| -> u64 { (1 + range.1 - range.0) as u64 }; - let port_amount: u64 = data.port_ranges.iter().map(get_port_amount).sum(); - - if port_amount < 1 { - return None; - } - - let mut port_index = rand::thread_rng().gen_range(0, port_amount); - - for range in data.port_ranges.iter() { - let ports_in_range = get_port_amount(range); - if port_index < ports_in_range { - return Some(port_index as u16 + range.0); - } - port_index -= ports_in_range; - } - log::error!("Port selection algorithm is broken!"); - None - } - Constraint::Only(port) => { - if data - .port_ranges - .iter() - .any(|range| (range.0 <= port && port <= range.1)) - { - Some(port) - } else { - None - } - } - } - } -} - -impl From<WireguardConstraints> for WireguardMatcher { - fn from(constraints: WireguardConstraints) -> Self { - Self { - peer: None, - port: constraints.port, - ip_version: constraints.ip_version, - } - } -} - -impl Match<WireguardEndpointData> for WireguardMatcher { - fn matches(&self, endpoint: &WireguardEndpointData) -> bool { - match self - .port - .as_ref() - .map(|port| port.port) - .unwrap_or(Constraint::Any) - { - Constraint::Any => true, - Constraint::Only(port) => endpoint - .port_ranges - .iter() - .any(|range| (port >= range.0 && port <= range.1)), - } - } -} - -impl TunnelMatcher for WireguardMatcher { - fn matching_relay(&self, relay: &Relay) -> Option<Relay> { - if self - .peer - .as_ref() - .map(|peer_relay| peer_relay.hostname == relay.hostname) - .unwrap_or(false) - { - return None; - } - - let tunnels = relay - .tunnels - .wireguard - .iter() - .filter(|endpoint| self.matches(*endpoint)) - .cloned() - .collect::<Vec<_>>(); - if tunnels.is_empty() { - return None; - } - let mut relay = relay.clone(); - relay.tunnels = RelayTunnels { - wireguard: tunnels, - openvpn: vec![], - }; - Some(relay) - } - - fn mullvad_endpoint(&self, relay: &Relay) -> Option<MullvadEndpoint> { - relay - .tunnels - .wireguard - .choose(&mut rand::thread_rng()) - .cloned() - .and_then(|wg_tunnel| self.wg_data_to_endpoint(relay, wg_tunnel)) - } -} - #[cfg(test)] mod test { use super::*; @@ -1675,16 +1172,13 @@ mod test { .map_err(|error| error.to_string())?; assert_eq!(exit_relay.hostname, specific_hostname); - match endpoint { - MullvadEndpoint::OpenVpn { .. } => return Err("Expected WireGuard relay".to_string()), - MullvadEndpoint::Wireguard(endpoint) => { - assert_eq!( - exit_relay.ipv4_addr_in, - endpoint.exit_peer.unwrap().endpoint.ip() - ); - assert_ne!(exit_relay.ipv4_addr_in, endpoint.peer.endpoint.ip()); - } - } + + let endpoint = endpoint.unwrap_wireguard(); + assert_eq!( + exit_relay.ipv4_addr_in, + endpoint.exit_peer.as_ref().unwrap().endpoint.ip() + ); + assert_ne!(exit_relay.ipv4_addr_in, endpoint.peer.endpoint.ip()); Ok(()) } diff --git a/mullvad-daemon/src/relays/updater.rs b/mullvad-daemon/src/relays/updater.rs new file mode 100644 index 0000000000..521eee3cda --- /dev/null +++ b/mullvad-daemon/src/relays/updater.rs @@ -0,0 +1,211 @@ +use super::{Error, ParsedRelays}; +use futures::{ + channel::mpsc, + future::{Fuse, FusedFuture}, + Future, FutureExt, SinkExt, StreamExt, +}; +use mullvad_rpc::{availability::ApiAvailabilityHandle, rest::MullvadRestHandle, RelayListProxy}; +use mullvad_types::relay_list::RelayList; +use parking_lot::Mutex; +use std::{ + path::{Path, PathBuf}, + sync::Arc, + time::{Duration, Instant, SystemTime}, +}; +use talpid_core::future_retry::{retry_future, ExponentialBackoff, Jittered}; +use talpid_types::ErrorExt; +use tokio::fs::File; + +/// How often the updater should wake up to check the cache of the in-memory cache of relays. +/// This check is very cheap. The only reason to not have it very often is because if downloading +/// constantly fails it will try very often and fill the logs etc. +const UPDATE_CHECK_INTERVAL: Duration = Duration::from_secs(60 * 15); +/// How old the cached relays need to be to trigger an update +const UPDATE_INTERVAL: Duration = Duration::from_secs(60 * 60); + +const EXPONENTIAL_BACKOFF_INITIAL: Duration = Duration::from_secs(16); +const EXPONENTIAL_BACKOFF_FACTOR: u32 = 8; + +#[derive(Clone)] +pub struct RelayListUpdaterHandle { + tx: mpsc::Sender<()>, +} + +impl RelayListUpdaterHandle { + pub async fn update_relay_list(&mut self) -> Result<(), Error> { + self.tx + .send(()) + .await + .map_err(|_| Error::DownloaderShutDown) + } +} + +pub struct RelayListUpdater { + rpc_client: RelayListProxy, + cache_path: PathBuf, + parsed_relays: Arc<Mutex<ParsedRelays>>, + on_update: Box<dyn Fn(&RelayList) + Send + 'static>, + earliest_next_try: Instant, + api_availability: ApiAvailabilityHandle, +} + +impl RelayListUpdater { + pub(super) fn new( + rpc_handle: MullvadRestHandle, + cache_path: PathBuf, + parsed_relays: Arc<Mutex<ParsedRelays>>, + on_update: Box<dyn Fn(&RelayList) + Send + 'static>, + api_availability: ApiAvailabilityHandle, + ) -> RelayListUpdaterHandle { + let (tx, cmd_rx) = mpsc::channel(1); + let service = rpc_handle.service(); + let rpc_client = RelayListProxy::new(rpc_handle); + let updater = RelayListUpdater { + rpc_client, + cache_path, + parsed_relays, + on_update, + earliest_next_try: Instant::now() + UPDATE_INTERVAL, + api_availability, + }; + + service.spawn(updater.run(cmd_rx)); + + RelayListUpdaterHandle { tx } + } + + async fn run(mut self, mut cmd_rx: mpsc::Receiver<()>) { + let mut check_interval = + tokio_stream::wrappers::IntervalStream::new(tokio::time::interval_at( + (Instant::now() + UPDATE_CHECK_INTERVAL).into(), + UPDATE_CHECK_INTERVAL, + )) + .fuse(); + let mut download_future = Box::pin(Fuse::terminated()); + loop { + futures::select! { + _check_update = check_interval.next() => { + if download_future.is_terminated() && self.should_update() { + let tag = self.parsed_relays.lock().tag().map(|tag| tag.to_string()); + download_future = Box::pin(Self::download_relay_list(self.api_availability.clone(), self.rpc_client.clone(), tag).fuse()); + self.earliest_next_try = Instant::now() + UPDATE_INTERVAL; + } + }, + + new_relay_list = download_future => { + self.consume_new_relay_list(new_relay_list).await; + + }, + + cmd = cmd_rx.next() => { + match cmd { + Some(()) => { + let tag = self.parsed_relays.lock().tag().map(|tag| tag.to_string()); + download_future = Box::pin(Self::download_relay_list(self.api_availability.clone(), self.rpc_client.clone(), tag).fuse()); + }, + None => { + log::trace!("Relay list updater shutting down"); + return; + } + } + } + + }; + } + } + + async fn consume_new_relay_list( + &mut self, + result: Result<Option<RelayList>, mullvad_rpc::Error>, + ) { + match result { + Ok(Some(relay_list)) => { + if let Err(err) = self.update_cache(relay_list).await { + log::error!("Failed to update relay list cache: {}", err); + } + } + Ok(None) => log::debug!("Relay list is up-to-date"), + Err(err) => { + log::error!( + "Failed to fetch new relay list: {}. Will retry in {} minutes", + err, + self.earliest_next_try + .saturating_duration_since(Instant::now()) + .as_secs() + / 60 + ); + } + } + } + + /// Returns true if the current parsed_relays is older than UPDATE_INTERVAL + fn should_update(&mut self) -> bool { + match SystemTime::now().duration_since(self.parsed_relays.lock().last_updated()) { + Ok(duration) => duration > UPDATE_INTERVAL && self.earliest_next_try <= Instant::now(), + // If the clock is skewed we have no idea by how much or when the last update + // actually was, better download again to get in sync and get a `last_updated` + // timestamp corresponding to the new time. + Err(_) => true, + } + } + + fn download_relay_list( + api_handle: ApiAvailabilityHandle, + rpc_handle: RelayListProxy, + tag: Option<String>, + ) -> impl Future<Output = Result<Option<RelayList>, mullvad_rpc::Error>> + 'static { + let download_futures = move || { + let available = api_handle.wait_background(); + let req = rpc_handle.relay_list(tag.clone()); + async move { + available.await?; + req.await.map_err(mullvad_rpc::Error::from) + } + }; + + let exponential_backoff = + ExponentialBackoff::new(EXPONENTIAL_BACKOFF_INITIAL, EXPONENTIAL_BACKOFF_FACTOR) + .max_delay(UPDATE_INTERVAL * 2); + + let download_future = retry_future( + download_futures, + |result| result.is_err(), + Jittered::jitter(exponential_backoff), + ); + download_future + } + + async fn update_cache(&mut self, new_relay_list: RelayList) -> Result<(), Error> { + if let Err(error) = Self::cache_relays(&self.cache_path, &new_relay_list).await { + log::error!( + "{}", + error.display_chain_with_msg("Failed to update relay cache on disk") + ); + } + + let new_parsed_relays = ParsedRelays::from_relay_list(new_relay_list, SystemTime::now()); + log::info!( + "Downloaded relay inventory has {} relays", + new_parsed_relays.relays().len() + ); + + let mut parsed_relays = self.parsed_relays.lock(); + *parsed_relays = new_parsed_relays; + (self.on_update)(parsed_relays.locations()); + Ok(()) + } + + /// Write a `RelayList` to the cache file. + async fn cache_relays(cache_path: &Path, relays: &RelayList) -> Result<(), Error> { + log::debug!("Writing relays cache to {}", cache_path.display()); + let mut file = File::create(cache_path) + .await + .map_err(Error::OpenRelayCache)?; + let bytes = serde_json::to_vec_pretty(relays).map_err(Error::Serialize)?; + let mut slice: &[u8] = bytes.as_slice(); + let _ = tokio::io::copy(&mut slice, &mut file) + .await + .map_err(Error::WriteRelayCache)?; + Ok(()) + } +} diff --git a/mullvad-types/src/endpoint.rs b/mullvad-types/src/endpoint.rs index 11049c0026..bfba11a481 100644 --- a/mullvad-types/src/endpoint.rs +++ b/mullvad-types/src/endpoint.rs @@ -36,10 +36,12 @@ impl MullvadEndpoint { } } - pub fn get_peer_config(&self) -> Option<&wireguard::PeerConfig> { + pub fn unwrap_wireguard(&self) -> &MullvadWireguardEndpoint { match self { - Self::Wireguard(wireguard_endpoint) => Some(&wireguard_endpoint.peer), - _ => None, + Self::Wireguard(endpoint) => endpoint, + other => { + panic!("Expected WireGuard enum variant but got {:?}", other); + } } } } |
