summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorEmīls <emils@mullvad.net>2022-01-24 11:55:52 +0000
committerOdd Stranne <odd@mullvad.net>2022-01-31 12:17:47 +0100
commit51e7c8a43e76c401f80cfe3e12ba310780a30368 (patch)
tree22ff7865a4ec3a94c72d2a548a188c52cfea363f
parentebcd2f92402b7894bdfacd7d5db9eb274d982f61 (diff)
downloadmullvadvpn-51e7c8a43e76c401f80cfe3e12ba310780a30368.tar.xz
mullvadvpn-51e7c8a43e76c401f80cfe3e12ba310780a30368.zip
Separate relays.rs into multiple modules
-rw-r--r--mullvad-daemon/src/relays/matcher.rs313
-rw-r--r--mullvad-daemon/src/relays/mod.rs (renamed from mullvad-daemon/src/relays.rs)588
-rw-r--r--mullvad-daemon/src/relays/updater.rs211
-rw-r--r--mullvad-types/src/endpoint.rs8
4 files changed, 570 insertions, 550 deletions
diff --git a/mullvad-daemon/src/relays/matcher.rs b/mullvad-daemon/src/relays/matcher.rs
new file mode 100644
index 0000000000..08924a78e9
--- /dev/null
+++ b/mullvad-daemon/src/relays/matcher.rs
@@ -0,0 +1,313 @@
+use mullvad_types::{
+ endpoint::{MullvadEndpoint, MullvadWireguardEndpoint},
+ relay_constraints::{
+ Constraint, LocationConstraint, Match, OpenVpnConstraints, Providers, RelayConstraints,
+ TransportPort, WireguardConstraints,
+ },
+ relay_list::{Relay, RelayTunnels, WireguardEndpointData},
+};
+use rand::{seq::SliceRandom, Rng};
+use std::net::{IpAddr, SocketAddr};
+use talpid_types::net::{all_of_the_internet, wireguard, IpVersion, TransportProtocol, TunnelType};
+
+#[derive(Clone)]
+pub struct RelayMatcher<T: TunnelMatcher> {
+ pub location: Constraint<LocationConstraint>,
+ pub providers: Constraint<Providers>,
+ pub tunnel: T,
+}
+
+impl From<RelayConstraints> for RelayMatcher<AnyTunnelMatcher> {
+ fn from(constraints: RelayConstraints) -> Self {
+ Self {
+ location: constraints.location,
+ providers: constraints.providers,
+ tunnel: AnyTunnelMatcher {
+ wireguard: constraints.wireguard_constraints.into(),
+ openvpn: constraints.openvpn_constraints,
+ tunnel_type: constraints.tunnel_protocol,
+ },
+ }
+ }
+}
+
+impl RelayMatcher<AnyTunnelMatcher> {
+ pub fn to_wireguard_matcher(self) -> RelayMatcher<WireguardMatcher> {
+ RelayMatcher {
+ tunnel: self.tunnel.wireguard,
+ location: self.location,
+ providers: self.providers,
+ }
+ }
+}
+
+impl RelayMatcher<WireguardMatcher> {
+ pub fn set_peer(&mut self, peer: Relay) {
+ self.tunnel.peer = Some(peer);
+ }
+}
+
+impl<T: TunnelMatcher> RelayMatcher<T> {
+ /// Filter a relay and its endpoints based on constraints.
+ /// Only matching endpoints are included in the returned Relay.
+ pub fn filter_matching_relay(&self, relay: &Relay) -> Option<Relay> {
+ if !self.location.matches(relay) || !self.providers.matches(relay) {
+ return None;
+ }
+
+ self.tunnel.filter_matching_endpoints(relay)
+ }
+
+ pub fn mullvad_endpoint(&self, relay: &Relay) -> Option<MullvadEndpoint> {
+ self.tunnel.mullvad_endpoint(relay)
+ }
+}
+
+/// TunnelMatcher allows to abstract over different tunnel-specific constraints,
+/// as to not have false dependencies on OpenVpn specific constraints when
+/// selecting only WireGuard tunnels.
+pub trait TunnelMatcher: Clone {
+ /// Filter a relay and its endpoints based on constraints.
+ /// Only matching endpoints are included in the returned Relay.
+ fn filter_matching_endpoints(&self, relay: &Relay) -> Option<Relay>;
+ /// Constructs a MullvadEndpoint for a given Relay using extra data from the relay matcher
+ /// itself.
+ fn mullvad_endpoint(&self, relay: &Relay) -> Option<MullvadEndpoint>;
+}
+
+impl TunnelMatcher for OpenVpnMatcher {
+ fn filter_matching_endpoints(&self, relay: &Relay) -> Option<Relay> {
+ let tunnels = relay
+ .tunnels
+ .openvpn
+ .iter()
+ .filter(|endpoint| self.matches(endpoint))
+ .cloned()
+ .collect::<Vec<_>>();
+ if tunnels.is_empty() {
+ return None;
+ }
+ let mut relay = relay.clone();
+ relay.tunnels = RelayTunnels {
+ openvpn: tunnels,
+ wireguard: vec![],
+ };
+ Some(relay)
+ }
+
+ fn mullvad_endpoint(&self, relay: &Relay) -> Option<MullvadEndpoint> {
+ relay
+ .tunnels
+ .openvpn
+ .choose(&mut rand::thread_rng())
+ .cloned()
+ .map(|endpoint| endpoint.into_mullvad_endpoint(relay.ipv4_addr_in.into()))
+ }
+}
+
+pub type OpenVpnMatcher = OpenVpnConstraints;
+
+#[derive(Clone)]
+pub struct AnyTunnelMatcher {
+ wireguard: WireguardMatcher,
+ openvpn: OpenVpnMatcher,
+ /// in the case that a user hasn't specified a tunnel protocol, the relay
+ /// selector might still construct preferred constraints that do select a
+ /// specific tunnel protocol, which is why the tunnel type may be specified
+ /// in the `AnyTunnelMatcher`.
+ tunnel_type: Constraint<TunnelType>,
+}
+
+impl TunnelMatcher for AnyTunnelMatcher {
+ fn filter_matching_endpoints(&self, relay: &Relay) -> Option<Relay> {
+ match self.tunnel_type {
+ Constraint::Any => {
+ let wireguard_relay = self.wireguard.filter_matching_endpoints(relay);
+ let openvpn_relay = self.openvpn.filter_matching_endpoints(relay);
+
+ match (wireguard_relay, openvpn_relay) {
+ (Some(mut matched_relay), Some(openvpn_relay)) => {
+ matched_relay.tunnels.openvpn = openvpn_relay.tunnels.openvpn;
+ Some(matched_relay)
+ }
+ (Some(relay), None) | (None, Some(relay)) => Some(relay),
+ _ => None,
+ }
+ }
+ Constraint::Only(TunnelType::OpenVpn) => self.openvpn.filter_matching_endpoints(relay),
+ Constraint::Only(TunnelType::Wireguard) => {
+ self.wireguard.filter_matching_endpoints(relay)
+ }
+ }
+ }
+
+ fn mullvad_endpoint(&self, relay: &Relay) -> Option<MullvadEndpoint> {
+ #[cfg(not(target_os = "android"))]
+ match self.tunnel_type {
+ Constraint::Any => vec![
+ self.openvpn.mullvad_endpoint(relay),
+ self.wireguard.mullvad_endpoint(relay),
+ ]
+ .into_iter()
+ .filter_map(|relay| relay)
+ .collect::<Vec<_>>()
+ .choose(&mut rand::thread_rng())
+ .cloned(),
+ Constraint::Only(TunnelType::OpenVpn) => self.openvpn.mullvad_endpoint(relay),
+ Constraint::Only(TunnelType::Wireguard) => self.wireguard.mullvad_endpoint(relay),
+ }
+
+ #[cfg(target_os = "android")]
+ self.wireguard.mullvad_endpoint(relay)
+ }
+}
+
+#[derive(Clone)]
+pub struct WireguardMatcher {
+ /// The peer is an already selected peer relay to be used with multihop.
+ /// It's stored here so we can exclude it from further selections being made.
+ pub peer: Option<Relay>,
+ pub port: Constraint<TransportPort>,
+ pub ip_version: Constraint<IpVersion>,
+}
+
+impl WireguardMatcher {
+ fn wg_data_to_endpoint(
+ &self,
+ relay: &Relay,
+ data: WireguardEndpointData,
+ ) -> Option<MullvadEndpoint> {
+ let host = self.get_address_for_wireguard_relay(relay)?;
+ let port = self.get_port_for_wireguard_relay(&data)?;
+ let peer_config = wireguard::PeerConfig {
+ public_key: data.public_key,
+ endpoint: SocketAddr::new(host, port),
+ allowed_ips: all_of_the_internet(),
+ protocol: self
+ .port
+ .map(|port| port.protocol)
+ .unwrap_or(TransportProtocol::Udp),
+ };
+ Some(MullvadEndpoint::Wireguard(MullvadWireguardEndpoint {
+ peer: peer_config,
+ exit_peer: None,
+ ipv4_gateway: data.ipv4_gateway,
+ ipv6_gateway: data.ipv6_gateway,
+ }))
+ }
+
+ fn get_address_for_wireguard_relay(&self, relay: &Relay) -> Option<IpAddr> {
+ match self.ip_version {
+ Constraint::Any | Constraint::Only(IpVersion::V4) => Some(relay.ipv4_addr_in.into()),
+ Constraint::Only(IpVersion::V6) => relay.ipv6_addr_in.map(|addr| addr.into()),
+ }
+ }
+
+ fn get_port_for_wireguard_relay(&self, data: &WireguardEndpointData) -> Option<u16> {
+ match self
+ .port
+ .as_ref()
+ .map(|port| port.port)
+ .unwrap_or(Constraint::Any)
+ {
+ Constraint::Any => {
+ let get_port_amount =
+ |range: &(u16, u16)| -> u64 { (1 + range.1 - range.0) as u64 };
+ let port_amount: u64 = data.port_ranges.iter().map(get_port_amount).sum();
+
+ if port_amount < 1 {
+ return None;
+ }
+
+ let mut port_index = rand::thread_rng().gen_range(0, port_amount);
+
+ for range in data.port_ranges.iter() {
+ let ports_in_range = get_port_amount(range);
+ if port_index < ports_in_range {
+ return Some(port_index as u16 + range.0);
+ }
+ port_index -= ports_in_range;
+ }
+ log::error!("Port selection algorithm is broken!");
+ None
+ }
+ Constraint::Only(port) => {
+ if data
+ .port_ranges
+ .iter()
+ .any(|range| (range.0 <= port && port <= range.1))
+ {
+ Some(port)
+ } else {
+ None
+ }
+ }
+ }
+ }
+}
+
+impl From<WireguardConstraints> for WireguardMatcher {
+ fn from(constraints: WireguardConstraints) -> Self {
+ Self {
+ peer: None,
+ port: constraints.port,
+ ip_version: constraints.ip_version,
+ }
+ }
+}
+
+impl Match<WireguardEndpointData> for WireguardMatcher {
+ fn matches(&self, endpoint: &WireguardEndpointData) -> bool {
+ match self
+ .port
+ .as_ref()
+ .map(|port| port.port)
+ .unwrap_or(Constraint::Any)
+ {
+ Constraint::Any => true,
+ Constraint::Only(port) => endpoint
+ .port_ranges
+ .iter()
+ .any(|range| (port >= range.0 && port <= range.1)),
+ }
+ }
+}
+
+impl TunnelMatcher for WireguardMatcher {
+ fn filter_matching_endpoints(&self, relay: &Relay) -> Option<Relay> {
+ if self
+ .peer
+ .as_ref()
+ .map(|peer_relay| peer_relay.hostname == relay.hostname)
+ .unwrap_or(false)
+ {
+ return None;
+ }
+
+ let tunnels = relay
+ .tunnels
+ .wireguard
+ .iter()
+ .filter(|endpoint| self.matches(*endpoint))
+ .cloned()
+ .collect::<Vec<_>>();
+ if tunnels.is_empty() {
+ return None;
+ }
+ let mut relay = relay.clone();
+ relay.tunnels = RelayTunnels {
+ wireguard: tunnels,
+ openvpn: vec![],
+ };
+ Some(relay)
+ }
+
+ fn mullvad_endpoint(&self, relay: &Relay) -> Option<MullvadEndpoint> {
+ relay
+ .tunnels
+ .wireguard
+ .choose(&mut rand::thread_rng())
+ .cloned()
+ .and_then(|wg_tunnel| self.wg_data_to_endpoint(relay, wg_tunnel))
+ }
+}
diff --git a/mullvad-daemon/src/relays.rs b/mullvad-daemon/src/relays/mod.rs
index ac56bf75fd..1fead05f77 100644
--- a/mullvad-daemon/src/relays.rs
+++ b/mullvad-daemon/src/relays/mod.rs
@@ -2,13 +2,8 @@
//! updated as well.
use chrono::{DateTime, Local};
-use futures::{
- channel::mpsc,
- future::{Fuse, FusedFuture},
- FutureExt, SinkExt, StreamExt,
-};
use ipnetwork::IpNetwork;
-use mullvad_rpc::{availability::ApiAvailabilityHandle, rest::MullvadRestHandle, RelayListProxy};
+use mullvad_rpc::{availability::ApiAvailabilityHandle, rest::MullvadRestHandle};
use mullvad_types::{
endpoint::{MullvadEndpoint, MullvadWireguardEndpoint},
location::Location,
@@ -16,39 +11,35 @@ use mullvad_types::{
BridgeState, Constraint, InternalBridgeConstraints, LocationConstraint, Match,
OpenVpnConstraints, Providers, RelayConstraints, Set, TransportPort, WireguardConstraints,
},
- relay_list::{Relay, RelayList, RelayTunnels, WireguardEndpointData},
+ relay_list::{Relay, RelayList, WireguardEndpointData},
};
use parking_lot::Mutex;
use rand::{self, seq::SliceRandom, Rng};
use std::{
- future::Future,
io,
- net::{IpAddr, SocketAddr},
- path::{Path, PathBuf},
+ net::IpAddr,
+ path::Path,
sync::Arc,
- time::{self, Duration, Instant, SystemTime},
+ time::{self, SystemTime},
};
-use talpid_core::future_retry::{retry_future, ExponentialBackoff, Jittered};
use talpid_types::{
- net::{
- all_of_the_internet, openvpn::ProxySettings, wireguard, IpVersion, TransportProtocol,
- TunnelType,
- },
+ net::{openvpn::ProxySettings, wireguard, IpVersion, TransportProtocol, TunnelType},
ErrorExt,
};
-use tokio::fs::File;
+
+use crate::relays::updater::RelayListUpdater;
+
+use self::{
+ matcher::{RelayMatcher, TunnelMatcher, WireguardMatcher},
+ updater::RelayListUpdaterHandle,
+};
+
+mod matcher;
+mod updater;
const DATE_TIME_FORMAT_STR: &str = "%Y-%m-%d %H:%M:%S%.3f";
const RELAYS_FILENAME: &str = "relays.json";
-/// How often the updater should wake up to check the cache of the in-memory cache of relays.
-/// This check is very cheap. The only reason to not have it very often is because if downloading
-/// constantly fails it will try very often and fill the logs etc.
-const UPDATE_CHECK_INTERVAL: Duration = Duration::from_secs(60 * 15);
-/// How old the cached relays need to be to trigger an update
-const UPDATE_INTERVAL: Duration = Duration::from_secs(60 * 60);
-const EXPONENTIAL_BACKOFF_INITIAL: Duration = Duration::from_secs(16);
-const EXPONENTIAL_BACKOFF_FACTOR: u32 = 8;
const DEFAULT_WIREGUARD_PORT: u16 = 51820;
const WIREGUARD_EXIT_CONSTRAINTS: WireguardMatcher = WireguardMatcher {
peer: None,
@@ -353,12 +344,7 @@ impl RelaySelector {
)
};
- Self::set_entry_peers(
- exit_endpoint
- .get_peer_config()
- .expect("Failed to get peer config from WireGuard endpoint"),
- &mut entry_endpoint,
- );
+ Self::set_entry_peers(&exit_endpoint.unwrap_wireguard().peer, &mut entry_endpoint);
log::info!(
"Selected entry relay {} at {} going through {} at {}",
@@ -367,7 +353,7 @@ impl RelaySelector {
exit_relay.hostname,
exit_endpoint.to_endpoint().address.ip(),
);
- let result = RelaySelectorResult::wireguard_endpoint_with_entry(
+ let result = RelaySelectorResult::wireguard_multihop_endpoint(
exit_relay,
entry_endpoint,
entry_relay,
@@ -544,15 +530,9 @@ impl RelaySelector {
Constraint::Only(TunnelType::Wireguard) => {
relay_constraints.wireguard_constraints =
original_constraints.wireguard_constraints.clone();
- // This ensures that if after the first 2 failed attempts the daemon does not
- // connect, then afterwards 2 of each 4 successive attempts will try to connect
- // on port 53.
- if retry_attempt % 4 > 1 && relay_constraints.wireguard_constraints.port.is_any() {
+ if relay_constraints.wireguard_constraints.port.is_any() {
relay_constraints.wireguard_constraints.port =
- Constraint::Only(TransportPort {
- protocol: TransportProtocol::Udp,
- port: Constraint::Only(53),
- });
+ Self::preferred_wireguard_port(retry_attempt);
}
}
};
@@ -579,23 +559,20 @@ impl RelaySelector {
.relays()
.iter()
.filter(|relay| relay.active)
- .filter_map(|relay| matcher.matching_relay(relay))
+ .filter_map(|relay| matcher.filter_matching_relay(relay))
.collect();
let relay = self
.pick_random_relay(&matching_relays)
.map(|relay| relay.clone())
.ok_or(Error::NoRelay)?;
- let endpoint = matcher.mullvad_endpoint(&relay).ok_or(Error::NoRelay)?;
- match endpoint {
- MullvadEndpoint::Wireguard(endpoint) => Ok((relay, endpoint)),
- _ => {
- unreachable!(
- "Entry endpoints should only ever be WireGuard endpoints, instead got a {:?}",
- endpoint
- );
- }
- }
+ let endpoint = matcher
+ .mullvad_endpoint(&relay)
+ .ok_or(Error::NoRelay)?
+ .unwrap_wireguard()
+ .clone();
+
+ Ok((relay, endpoint))
}
fn set_entry_peers(
@@ -723,6 +700,9 @@ impl RelaySelector {
}
fn preferred_wireguard_port(retry_attempt: u32) -> Constraint<TransportPort> {
+ // This ensures that if after the first 2 failed attempts the daemon does not
+ // connect, then afterwards 2 of each 4 successive attempts will try to connect
+ // on port 53.
let port = match retry_attempt % 4 {
0 | 1 => Constraint::Any,
_ => Constraint::Only(53),
@@ -761,7 +741,7 @@ impl RelaySelector {
.relays()
.iter()
.filter(|relay| relay.active)
- .filter_map(|relay| matcher.matching_relay(relay))
+ .filter_map(|relay| matcher.filter_matching_relay(relay))
.collect();
self.pick_random_relay(&matching_relays)
@@ -810,7 +790,7 @@ impl RelaySelector {
if total_weight == 0 {
relays.choose(&mut rng)
} else {
- // Pick a random number in the range 1 - total_weight. This choses the relay with a
+ // Pick a random number in the range 1..=total_weight. This choses the relay with a
// non-zero weight.
let mut i: u64 = rng.gen_range(1, total_weight + 1);
Some(
@@ -889,7 +869,7 @@ impl RelaySelectorResult {
}
}
- fn wireguard_endpoint_with_entry(
+ fn wireguard_multihop_endpoint(
exit_relay: Relay,
endpoint: MullvadWireguardEndpoint,
entry: Relay,
@@ -902,489 +882,6 @@ impl RelaySelectorResult {
}
}
-#[derive(Clone)]
-pub struct RelayListUpdaterHandle {
- tx: mpsc::Sender<()>,
-}
-
-impl RelayListUpdaterHandle {
- pub async fn update_relay_list(&mut self) -> Result<(), Error> {
- self.tx
- .send(())
- .await
- .map_err(|_| Error::DownloaderShutDown)
- }
-}
-
-struct RelayListUpdater {
- rpc_client: RelayListProxy,
- cache_path: PathBuf,
- parsed_relays: Arc<Mutex<ParsedRelays>>,
- on_update: Box<dyn Fn(&RelayList) + Send + 'static>,
- earliest_next_try: Instant,
- api_availability: ApiAvailabilityHandle,
-}
-
-impl RelayListUpdater {
- pub fn new(
- rpc_handle: MullvadRestHandle,
- cache_path: PathBuf,
- parsed_relays: Arc<Mutex<ParsedRelays>>,
- on_update: Box<dyn Fn(&RelayList) + Send + 'static>,
- api_availability: ApiAvailabilityHandle,
- ) -> RelayListUpdaterHandle {
- let (tx, cmd_rx) = mpsc::channel(1);
- let service = rpc_handle.service();
- let rpc_client = RelayListProxy::new(rpc_handle);
- let updater = RelayListUpdater {
- rpc_client,
- cache_path,
- parsed_relays,
- on_update,
- earliest_next_try: Instant::now() + UPDATE_INTERVAL,
- api_availability,
- };
-
- service.spawn(updater.run(cmd_rx));
-
- RelayListUpdaterHandle { tx }
- }
-
- async fn run(mut self, mut cmd_rx: mpsc::Receiver<()>) {
- let mut check_interval =
- tokio_stream::wrappers::IntervalStream::new(tokio::time::interval_at(
- (Instant::now() + UPDATE_CHECK_INTERVAL).into(),
- UPDATE_CHECK_INTERVAL,
- ))
- .fuse();
- let mut download_future = Box::pin(Fuse::terminated());
- loop {
- futures::select! {
- _check_update = check_interval.next() => {
- if download_future.is_terminated() && self.should_update() {
- let tag = self.parsed_relays.lock().tag().map(|tag| tag.to_string());
- download_future = Box::pin(Self::download_relay_list(self.api_availability.clone(), self.rpc_client.clone(), tag).fuse());
- self.earliest_next_try = Instant::now() + UPDATE_INTERVAL;
- }
- },
-
- new_relay_list = download_future => {
- self.consume_new_relay_list(new_relay_list).await;
-
- },
-
- cmd = cmd_rx.next() => {
- match cmd {
- Some(()) => {
- let tag = self.parsed_relays.lock().tag().map(|tag| tag.to_string());
- download_future = Box::pin(Self::download_relay_list(self.api_availability.clone(), self.rpc_client.clone(), tag).fuse());
- },
- None => {
- log::trace!("Relay list updater shutting down");
- return;
- }
- }
- }
-
- };
- }
- }
-
- async fn consume_new_relay_list(
- &mut self,
- result: Result<Option<RelayList>, mullvad_rpc::Error>,
- ) {
- match result {
- Ok(Some(relay_list)) => {
- if let Err(err) = self.update_cache(relay_list).await {
- log::error!("Failed to update relay list cache: {}", err);
- }
- }
- Ok(None) => log::debug!("Relay list is up-to-date"),
- Err(err) => {
- log::error!(
- "Failed to fetch new relay list: {}. Will retry in {} minutes",
- err,
- self.earliest_next_try
- .saturating_duration_since(Instant::now())
- .as_secs()
- / 60
- );
- }
- }
- }
-
- /// Returns true if the current parsed_relays is older than UPDATE_INTERVAL
- fn should_update(&mut self) -> bool {
- match SystemTime::now().duration_since(self.parsed_relays.lock().last_updated()) {
- Ok(duration) => duration > UPDATE_INTERVAL && self.earliest_next_try <= Instant::now(),
- // If the clock is skewed we have no idea by how much or when the last update
- // actually was, better download again to get in sync and get a `last_updated`
- // timestamp corresponding to the new time.
- Err(_) => true,
- }
- }
-
- fn download_relay_list(
- api_handle: ApiAvailabilityHandle,
- rpc_handle: RelayListProxy,
- tag: Option<String>,
- ) -> impl Future<Output = Result<Option<RelayList>, mullvad_rpc::Error>> + 'static {
- let download_futures = move || {
- let available = api_handle.wait_background();
- let req = rpc_handle.relay_list(tag.clone());
- async move {
- available.await?;
- req.await.map_err(mullvad_rpc::Error::from)
- }
- };
-
- let exponential_backoff =
- ExponentialBackoff::new(EXPONENTIAL_BACKOFF_INITIAL, EXPONENTIAL_BACKOFF_FACTOR)
- .max_delay(UPDATE_INTERVAL * 2);
-
- let download_future = retry_future(
- download_futures,
- |result| result.is_err(),
- Jittered::jitter(exponential_backoff),
- );
- download_future
- }
-
- async fn update_cache(&mut self, new_relay_list: RelayList) -> Result<(), Error> {
- if let Err(error) = Self::cache_relays(&self.cache_path, &new_relay_list).await {
- log::error!(
- "{}",
- error.display_chain_with_msg("Failed to update relay cache on disk")
- );
- }
-
- let new_parsed_relays = ParsedRelays::from_relay_list(new_relay_list, SystemTime::now());
- log::info!(
- "Downloaded relay inventory has {} relays",
- new_parsed_relays.relays().len()
- );
-
- let mut parsed_relays = self.parsed_relays.lock();
- *parsed_relays = new_parsed_relays;
- (self.on_update)(parsed_relays.locations());
- Ok(())
- }
-
- /// Write a `RelayList` to the cache file.
- async fn cache_relays(cache_path: &Path, relays: &RelayList) -> Result<(), Error> {
- log::debug!("Writing relays cache to {}", cache_path.display());
- let mut file = File::create(cache_path)
- .await
- .map_err(Error::OpenRelayCache)?;
- let bytes = serde_json::to_vec_pretty(relays).map_err(Error::Serialize)?;
- let mut slice: &[u8] = bytes.as_slice();
- let _ = tokio::io::copy(&mut slice, &mut file)
- .await
- .map_err(Error::WriteRelayCache)?;
- Ok(())
- }
-}
-
-#[derive(Clone)]
-struct RelayMatcher<T: TunnelMatcher> {
- location: Constraint<LocationConstraint>,
- providers: Constraint<Providers>,
- tunnel: T,
-}
-
-impl From<RelayConstraints> for RelayMatcher<AnyTunnelMatcher> {
- fn from(constraints: RelayConstraints) -> Self {
- Self {
- location: constraints.location,
- providers: constraints.providers,
- tunnel: AnyTunnelMatcher {
- wireguard: constraints.wireguard_constraints.into(),
- openvpn: constraints.openvpn_constraints,
- tunnel_type: constraints.tunnel_protocol,
- },
- }
- }
-}
-
-impl RelayMatcher<AnyTunnelMatcher> {
- fn to_wireguard_matcher(self) -> RelayMatcher<WireguardMatcher> {
- let Self {
- location,
- providers,
- tunnel,
- } = self;
-
- let tunnel = tunnel.wireguard;
- RelayMatcher {
- tunnel,
- location,
- providers,
- }
- }
-}
-
-impl RelayMatcher<WireguardMatcher> {
- pub fn set_peer(&mut self, peer: Relay) {
- self.tunnel.peer = Some(peer);
- }
-}
-
-impl<T: TunnelMatcher> RelayMatcher<T> {
- fn matching_relay(&self, relay: &Relay) -> Option<Relay> {
- if !self.location.matches(relay) || !self.providers.matches(relay) {
- return None;
- }
-
- self.tunnel.matching_relay(relay)
- }
-
- fn mullvad_endpoint(&self, relay: &Relay) -> Option<MullvadEndpoint> {
- self.tunnel.mullvad_endpoint(relay)
- }
-}
-/// TunnelMatcher allows to abstract over different tunnel-specific constraints,
-/// as to not have false dependencies on OpenVpn specific constraints when
-/// selecting only WireGuard tunnels.
-trait TunnelMatcher: Clone {
- /// Check if given relay matches tunnel-specific constraints.
- fn matching_relay(&self, relay: &Relay) -> Option<Relay>;
- /// Constructs a MullvadEndpoint for a given Relay using extra data from the relay matcher
- /// itself.
- fn mullvad_endpoint(&self, relay: &Relay) -> Option<MullvadEndpoint>;
-}
-
-impl TunnelMatcher for OpenVpnConstraints {
- fn matching_relay(&self, relay: &Relay) -> Option<Relay> {
- let tunnels = relay
- .tunnels
- .openvpn
- .iter()
- .filter(|endpoint| self.matches(endpoint))
- .cloned()
- .collect::<Vec<_>>();
- if tunnels.is_empty() {
- return None;
- }
- let mut relay = relay.clone();
- relay.tunnels = RelayTunnels {
- openvpn: tunnels,
- wireguard: vec![],
- };
- Some(relay)
- }
-
- fn mullvad_endpoint(&self, relay: &Relay) -> Option<MullvadEndpoint> {
- relay
- .tunnels
- .openvpn
- .choose(&mut rand::thread_rng())
- .cloned()
- .map(|endpoint| endpoint.into_mullvad_endpoint(relay.ipv4_addr_in.into()))
- }
-}
-
-#[derive(Clone)]
-struct AnyTunnelMatcher {
- wireguard: WireguardMatcher,
- openvpn: OpenVpnConstraints,
- /// in the case that a user hasn't specified a tunnel protocol, the relay
- /// selector might still construct preferred constraints that do select a
- /// specific tunnel protocol, which is why the tunnel type may be specified
- /// in the `AnyTunnelMatcher`.
- tunnel_type: Constraint<TunnelType>,
-}
-
-impl TunnelMatcher for AnyTunnelMatcher {
- fn matching_relay(&self, relay: &Relay) -> Option<Relay> {
- match self.tunnel_type {
- Constraint::Any => {
- let wireguard_relay = self.wireguard.matching_relay(relay);
- let openvpn_relay = self.openvpn.matching_relay(relay);
-
- match (wireguard_relay, openvpn_relay) {
- (Some(mut matched_relay), Some(openvpn_relay)) => {
- matched_relay.tunnels.openvpn = openvpn_relay.tunnels.openvpn;
- Some(matched_relay)
- }
- (Some(relay), None) | (None, Some(relay)) => Some(relay),
- _ => None,
- }
- }
- Constraint::Only(TunnelType::OpenVpn) => self.openvpn.matching_relay(relay),
- Constraint::Only(TunnelType::Wireguard) => self.wireguard.matching_relay(relay),
- }
- }
-
- fn mullvad_endpoint(&self, relay: &Relay) -> Option<MullvadEndpoint> {
- #[cfg(not(target_os = "android"))]
- match self.tunnel_type {
- Constraint::Any => vec![
- self.openvpn.mullvad_endpoint(relay),
- self.wireguard.mullvad_endpoint(relay),
- ]
- .into_iter()
- .filter_map(|relay| relay)
- .collect::<Vec<_>>()
- .choose(&mut rand::thread_rng())
- .cloned(),
- Constraint::Only(TunnelType::OpenVpn) => self.openvpn.mullvad_endpoint(relay),
- Constraint::Only(TunnelType::Wireguard) => self.wireguard.mullvad_endpoint(relay),
- }
-
- #[cfg(target_os = "android")]
- self.wireguard_matcher.mullvad_endpoint(relay)
- }
-}
-
-#[derive(Clone)]
-struct WireguardMatcher {
- peer: Option<Relay>,
- port: Constraint<TransportPort>,
- ip_version: Constraint<IpVersion>,
-}
-
-impl WireguardMatcher {
- fn wg_data_to_endpoint(
- &self,
- relay: &Relay,
- data: WireguardEndpointData,
- ) -> Option<MullvadEndpoint> {
- let host = self.get_address_for_wireguard_relay(relay)?;
- let port = self.get_port_for_wireguard_relay(&data)?;
- let peer_config = wireguard::PeerConfig {
- public_key: data.public_key,
- endpoint: SocketAddr::new(host, port),
- allowed_ips: all_of_the_internet(),
- protocol: self
- .port
- .map(|port| port.protocol)
- .unwrap_or(TransportProtocol::Udp),
- };
- Some(MullvadEndpoint::Wireguard(MullvadWireguardEndpoint {
- peer: peer_config,
- exit_peer: None,
- ipv4_gateway: data.ipv4_gateway,
- ipv6_gateway: data.ipv6_gateway,
- }))
- }
-
- fn get_address_for_wireguard_relay(&self, relay: &Relay) -> Option<IpAddr> {
- match self.ip_version {
- Constraint::Any | Constraint::Only(IpVersion::V4) => Some(relay.ipv4_addr_in.into()),
- Constraint::Only(IpVersion::V6) => relay.ipv6_addr_in.map(|addr| addr.into()),
- }
- }
-
- fn get_port_for_wireguard_relay(&self, data: &WireguardEndpointData) -> Option<u16> {
- match self
- .port
- .as_ref()
- .map(|port| port.port)
- .unwrap_or(Constraint::Any)
- {
- Constraint::Any => {
- let get_port_amount =
- |range: &(u16, u16)| -> u64 { (1 + range.1 - range.0) as u64 };
- let port_amount: u64 = data.port_ranges.iter().map(get_port_amount).sum();
-
- if port_amount < 1 {
- return None;
- }
-
- let mut port_index = rand::thread_rng().gen_range(0, port_amount);
-
- for range in data.port_ranges.iter() {
- let ports_in_range = get_port_amount(range);
- if port_index < ports_in_range {
- return Some(port_index as u16 + range.0);
- }
- port_index -= ports_in_range;
- }
- log::error!("Port selection algorithm is broken!");
- None
- }
- Constraint::Only(port) => {
- if data
- .port_ranges
- .iter()
- .any(|range| (range.0 <= port && port <= range.1))
- {
- Some(port)
- } else {
- None
- }
- }
- }
- }
-}
-
-impl From<WireguardConstraints> for WireguardMatcher {
- fn from(constraints: WireguardConstraints) -> Self {
- Self {
- peer: None,
- port: constraints.port,
- ip_version: constraints.ip_version,
- }
- }
-}
-
-impl Match<WireguardEndpointData> for WireguardMatcher {
- fn matches(&self, endpoint: &WireguardEndpointData) -> bool {
- match self
- .port
- .as_ref()
- .map(|port| port.port)
- .unwrap_or(Constraint::Any)
- {
- Constraint::Any => true,
- Constraint::Only(port) => endpoint
- .port_ranges
- .iter()
- .any(|range| (port >= range.0 && port <= range.1)),
- }
- }
-}
-
-impl TunnelMatcher for WireguardMatcher {
- fn matching_relay(&self, relay: &Relay) -> Option<Relay> {
- if self
- .peer
- .as_ref()
- .map(|peer_relay| peer_relay.hostname == relay.hostname)
- .unwrap_or(false)
- {
- return None;
- }
-
- let tunnels = relay
- .tunnels
- .wireguard
- .iter()
- .filter(|endpoint| self.matches(*endpoint))
- .cloned()
- .collect::<Vec<_>>();
- if tunnels.is_empty() {
- return None;
- }
- let mut relay = relay.clone();
- relay.tunnels = RelayTunnels {
- wireguard: tunnels,
- openvpn: vec![],
- };
- Some(relay)
- }
-
- fn mullvad_endpoint(&self, relay: &Relay) -> Option<MullvadEndpoint> {
- relay
- .tunnels
- .wireguard
- .choose(&mut rand::thread_rng())
- .cloned()
- .and_then(|wg_tunnel| self.wg_data_to_endpoint(relay, wg_tunnel))
- }
-}
-
#[cfg(test)]
mod test {
use super::*;
@@ -1675,16 +1172,13 @@ mod test {
.map_err(|error| error.to_string())?;
assert_eq!(exit_relay.hostname, specific_hostname);
- match endpoint {
- MullvadEndpoint::OpenVpn { .. } => return Err("Expected WireGuard relay".to_string()),
- MullvadEndpoint::Wireguard(endpoint) => {
- assert_eq!(
- exit_relay.ipv4_addr_in,
- endpoint.exit_peer.unwrap().endpoint.ip()
- );
- assert_ne!(exit_relay.ipv4_addr_in, endpoint.peer.endpoint.ip());
- }
- }
+
+ let endpoint = endpoint.unwrap_wireguard();
+ assert_eq!(
+ exit_relay.ipv4_addr_in,
+ endpoint.exit_peer.as_ref().unwrap().endpoint.ip()
+ );
+ assert_ne!(exit_relay.ipv4_addr_in, endpoint.peer.endpoint.ip());
Ok(())
}
diff --git a/mullvad-daemon/src/relays/updater.rs b/mullvad-daemon/src/relays/updater.rs
new file mode 100644
index 0000000000..521eee3cda
--- /dev/null
+++ b/mullvad-daemon/src/relays/updater.rs
@@ -0,0 +1,211 @@
+use super::{Error, ParsedRelays};
+use futures::{
+ channel::mpsc,
+ future::{Fuse, FusedFuture},
+ Future, FutureExt, SinkExt, StreamExt,
+};
+use mullvad_rpc::{availability::ApiAvailabilityHandle, rest::MullvadRestHandle, RelayListProxy};
+use mullvad_types::relay_list::RelayList;
+use parking_lot::Mutex;
+use std::{
+ path::{Path, PathBuf},
+ sync::Arc,
+ time::{Duration, Instant, SystemTime},
+};
+use talpid_core::future_retry::{retry_future, ExponentialBackoff, Jittered};
+use talpid_types::ErrorExt;
+use tokio::fs::File;
+
+/// How often the updater should wake up to check the cache of the in-memory cache of relays.
+/// This check is very cheap. The only reason to not have it very often is because if downloading
+/// constantly fails it will try very often and fill the logs etc.
+const UPDATE_CHECK_INTERVAL: Duration = Duration::from_secs(60 * 15);
+/// How old the cached relays need to be to trigger an update
+const UPDATE_INTERVAL: Duration = Duration::from_secs(60 * 60);
+
+const EXPONENTIAL_BACKOFF_INITIAL: Duration = Duration::from_secs(16);
+const EXPONENTIAL_BACKOFF_FACTOR: u32 = 8;
+
+#[derive(Clone)]
+pub struct RelayListUpdaterHandle {
+ tx: mpsc::Sender<()>,
+}
+
+impl RelayListUpdaterHandle {
+ pub async fn update_relay_list(&mut self) -> Result<(), Error> {
+ self.tx
+ .send(())
+ .await
+ .map_err(|_| Error::DownloaderShutDown)
+ }
+}
+
+pub struct RelayListUpdater {
+ rpc_client: RelayListProxy,
+ cache_path: PathBuf,
+ parsed_relays: Arc<Mutex<ParsedRelays>>,
+ on_update: Box<dyn Fn(&RelayList) + Send + 'static>,
+ earliest_next_try: Instant,
+ api_availability: ApiAvailabilityHandle,
+}
+
+impl RelayListUpdater {
+ pub(super) fn new(
+ rpc_handle: MullvadRestHandle,
+ cache_path: PathBuf,
+ parsed_relays: Arc<Mutex<ParsedRelays>>,
+ on_update: Box<dyn Fn(&RelayList) + Send + 'static>,
+ api_availability: ApiAvailabilityHandle,
+ ) -> RelayListUpdaterHandle {
+ let (tx, cmd_rx) = mpsc::channel(1);
+ let service = rpc_handle.service();
+ let rpc_client = RelayListProxy::new(rpc_handle);
+ let updater = RelayListUpdater {
+ rpc_client,
+ cache_path,
+ parsed_relays,
+ on_update,
+ earliest_next_try: Instant::now() + UPDATE_INTERVAL,
+ api_availability,
+ };
+
+ service.spawn(updater.run(cmd_rx));
+
+ RelayListUpdaterHandle { tx }
+ }
+
+ async fn run(mut self, mut cmd_rx: mpsc::Receiver<()>) {
+ let mut check_interval =
+ tokio_stream::wrappers::IntervalStream::new(tokio::time::interval_at(
+ (Instant::now() + UPDATE_CHECK_INTERVAL).into(),
+ UPDATE_CHECK_INTERVAL,
+ ))
+ .fuse();
+ let mut download_future = Box::pin(Fuse::terminated());
+ loop {
+ futures::select! {
+ _check_update = check_interval.next() => {
+ if download_future.is_terminated() && self.should_update() {
+ let tag = self.parsed_relays.lock().tag().map(|tag| tag.to_string());
+ download_future = Box::pin(Self::download_relay_list(self.api_availability.clone(), self.rpc_client.clone(), tag).fuse());
+ self.earliest_next_try = Instant::now() + UPDATE_INTERVAL;
+ }
+ },
+
+ new_relay_list = download_future => {
+ self.consume_new_relay_list(new_relay_list).await;
+
+ },
+
+ cmd = cmd_rx.next() => {
+ match cmd {
+ Some(()) => {
+ let tag = self.parsed_relays.lock().tag().map(|tag| tag.to_string());
+ download_future = Box::pin(Self::download_relay_list(self.api_availability.clone(), self.rpc_client.clone(), tag).fuse());
+ },
+ None => {
+ log::trace!("Relay list updater shutting down");
+ return;
+ }
+ }
+ }
+
+ };
+ }
+ }
+
+ async fn consume_new_relay_list(
+ &mut self,
+ result: Result<Option<RelayList>, mullvad_rpc::Error>,
+ ) {
+ match result {
+ Ok(Some(relay_list)) => {
+ if let Err(err) = self.update_cache(relay_list).await {
+ log::error!("Failed to update relay list cache: {}", err);
+ }
+ }
+ Ok(None) => log::debug!("Relay list is up-to-date"),
+ Err(err) => {
+ log::error!(
+ "Failed to fetch new relay list: {}. Will retry in {} minutes",
+ err,
+ self.earliest_next_try
+ .saturating_duration_since(Instant::now())
+ .as_secs()
+ / 60
+ );
+ }
+ }
+ }
+
+ /// Returns true if the current parsed_relays is older than UPDATE_INTERVAL
+ fn should_update(&mut self) -> bool {
+ match SystemTime::now().duration_since(self.parsed_relays.lock().last_updated()) {
+ Ok(duration) => duration > UPDATE_INTERVAL && self.earliest_next_try <= Instant::now(),
+ // If the clock is skewed we have no idea by how much or when the last update
+ // actually was, better download again to get in sync and get a `last_updated`
+ // timestamp corresponding to the new time.
+ Err(_) => true,
+ }
+ }
+
+ fn download_relay_list(
+ api_handle: ApiAvailabilityHandle,
+ rpc_handle: RelayListProxy,
+ tag: Option<String>,
+ ) -> impl Future<Output = Result<Option<RelayList>, mullvad_rpc::Error>> + 'static {
+ let download_futures = move || {
+ let available = api_handle.wait_background();
+ let req = rpc_handle.relay_list(tag.clone());
+ async move {
+ available.await?;
+ req.await.map_err(mullvad_rpc::Error::from)
+ }
+ };
+
+ let exponential_backoff =
+ ExponentialBackoff::new(EXPONENTIAL_BACKOFF_INITIAL, EXPONENTIAL_BACKOFF_FACTOR)
+ .max_delay(UPDATE_INTERVAL * 2);
+
+ let download_future = retry_future(
+ download_futures,
+ |result| result.is_err(),
+ Jittered::jitter(exponential_backoff),
+ );
+ download_future
+ }
+
+ async fn update_cache(&mut self, new_relay_list: RelayList) -> Result<(), Error> {
+ if let Err(error) = Self::cache_relays(&self.cache_path, &new_relay_list).await {
+ log::error!(
+ "{}",
+ error.display_chain_with_msg("Failed to update relay cache on disk")
+ );
+ }
+
+ let new_parsed_relays = ParsedRelays::from_relay_list(new_relay_list, SystemTime::now());
+ log::info!(
+ "Downloaded relay inventory has {} relays",
+ new_parsed_relays.relays().len()
+ );
+
+ let mut parsed_relays = self.parsed_relays.lock();
+ *parsed_relays = new_parsed_relays;
+ (self.on_update)(parsed_relays.locations());
+ Ok(())
+ }
+
+ /// Write a `RelayList` to the cache file.
+ async fn cache_relays(cache_path: &Path, relays: &RelayList) -> Result<(), Error> {
+ log::debug!("Writing relays cache to {}", cache_path.display());
+ let mut file = File::create(cache_path)
+ .await
+ .map_err(Error::OpenRelayCache)?;
+ let bytes = serde_json::to_vec_pretty(relays).map_err(Error::Serialize)?;
+ let mut slice: &[u8] = bytes.as_slice();
+ let _ = tokio::io::copy(&mut slice, &mut file)
+ .await
+ .map_err(Error::WriteRelayCache)?;
+ Ok(())
+ }
+}
diff --git a/mullvad-types/src/endpoint.rs b/mullvad-types/src/endpoint.rs
index 11049c0026..bfba11a481 100644
--- a/mullvad-types/src/endpoint.rs
+++ b/mullvad-types/src/endpoint.rs
@@ -36,10 +36,12 @@ impl MullvadEndpoint {
}
}
- pub fn get_peer_config(&self) -> Option<&wireguard::PeerConfig> {
+ pub fn unwrap_wireguard(&self) -> &MullvadWireguardEndpoint {
match self {
- Self::Wireguard(wireguard_endpoint) => Some(&wireguard_endpoint.peer),
- _ => None,
+ Self::Wireguard(endpoint) => endpoint,
+ other => {
+ panic!("Expected WireGuard enum variant but got {:?}", other);
+ }
}
}
}