summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorJanito Vaqueiro Ferreira Filho <janito@mullvad.net>2018-07-09 17:37:26 -0300
committerJanito Vaqueiro Ferreira Filho <janito@mullvad.net>2018-07-13 08:40:16 -0300
commitb91191d05e569e1aaccb7a734a57f642e6c865a0 (patch)
treebb0ca28ece0ad6376ac4fb4a338356de41bb0301
parent24db388d3c98b9e7cda81930fabbbd3cb4cec3e9 (diff)
downloadmullvadvpn-b91191d05e569e1aaccb7a734a57f642e6c865a0.tar.xz
mullvadvpn-b91191d05e569e1aaccb7a734a57f642e6c865a0.zip
Periodically update the relay cache
-rw-r--r--mullvad-daemon/src/main.rs22
-rw-r--r--mullvad-daemon/src/relays.rs173
2 files changed, 134 insertions, 61 deletions
diff --git a/mullvad-daemon/src/main.rs b/mullvad-daemon/src/main.rs
index efc2da7632..97df24e7d4 100644
--- a/mullvad-daemon/src/main.rs
+++ b/mullvad-daemon/src/main.rs
@@ -72,7 +72,7 @@ use mullvad_types::version::{AppVersion, AppVersionInfo};
use std::io;
use std::net::IpAddr;
-use std::path::{Path, PathBuf};
+use std::path::PathBuf;
use std::sync::{mpsc, Arc, Mutex};
use std::thread;
use std::time::{Duration, Instant};
@@ -122,8 +122,6 @@ error_chain!{
}
static MIN_TUNNEL_ALIVE_TIME: Duration = Duration::from_millis(1000);
-static MAX_RELAY_CACHE_AGE: Duration = Duration::from_secs(3600);
-static RELAY_CACHE_UPDATE_TIMEOUT: Duration = Duration::from_millis(3000);
const DAEMON_LOG_FILENAME: &str = "daemon.log";
const OPENVPN_LOG_FILENAME: &str = "openvpn.log";
@@ -238,7 +236,7 @@ impl Daemon {
let https_handle = https_handle.chain_err(|| "Unable to create am.i.mullvad client")?;
let relay_selector =
- Self::create_relay_selector(rpc_handle.clone(), &resource_dir, &cache_dir);
+ relays::RelaySelector::new(rpc_handle.clone(), &resource_dir, &cache_dir);
let (tx, rx) = mpsc::channel();
let management_interface_broadcaster =
@@ -272,22 +270,6 @@ impl Daemon {
})
}
- fn create_relay_selector(
- rpc_handle: mullvad_rpc::HttpHandle,
- resource_dir: &Path,
- cache_dir: &Path,
- ) -> relays::RelaySelector {
- let mut relay_selector = relays::RelaySelector::new(rpc_handle, &resource_dir, cache_dir);
- if let Ok(elapsed) = relay_selector.get_last_updated().elapsed() {
- if elapsed > MAX_RELAY_CACHE_AGE {
- if let Err(e) = relay_selector.update(RELAY_CACHE_UPDATE_TIMEOUT) {
- error!("Unable to update relay cache: {}", e.display_chain());
- }
- }
- }
- relay_selector
- }
-
// Starts the management interface and spawns a thread that will process it.
// Returns a handle that allows notifying all subscribers on events.
fn start_management_interface(
diff --git a/mullvad-daemon/src/relays.rs b/mullvad-daemon/src/relays.rs
index 4bae7eac7b..fb60f985ba 100644
--- a/mullvad-daemon/src/relays.rs
+++ b/mullvad-daemon/src/relays.rs
@@ -14,17 +14,20 @@ use serde_json;
use talpid_types::net::{TransportProtocol, TunnelEndpoint, TunnelEndpointData};
use std::fs::File;
-use std::io;
use std::net::IpAddr;
use std::path::{Path, PathBuf};
-use std::sync::{Arc, Mutex, MutexGuard};
+use std::sync::{mpsc, Arc, Mutex, MutexGuard};
use std::time::{self, Duration, SystemTime};
+use std::{io, thread};
use rand::distributions::{IndependentSample, Range};
use rand::{self, Rng, ThreadRng};
use tokio_timer::{TimeoutError, Timer};
const RELAYS_FILENAME: &str = "relays.json";
+const DOWNLOAD_TIMEOUT: Duration = Duration::from_secs(3);
+const UPDATE_INTERVAL: Duration = Duration::from_secs(60 * 60);
+const MAX_CACHE_AGE: Duration = Duration::from_secs(60 * 60 * 24);
error_chain! {
errors {
@@ -120,8 +123,7 @@ impl ParsedRelays {
pub struct RelaySelector {
parsed_relays: Arc<Mutex<ParsedRelays>>,
rng: ThreadRng,
- rpc_client: RelayListProxy<HttpHandle>,
- cache_path: PathBuf,
+ _updater: RelayListUpdaterHandle,
}
impl RelaySelector {
@@ -129,23 +131,24 @@ impl RelaySelector {
/// to refresh the relay list from the internet.
pub fn new(rpc_handle: HttpHandle, resource_dir: &Path, cache_dir: &Path) -> Self {
let cache_path = cache_dir.join(RELAYS_FILENAME);
- let parsed_relays =
- Self::read_cached_relays(&cache_path, resource_dir).unwrap_or_else(|error| {
+ let unsynchronized_parsed_relays = Self::read_cached_relays(&cache_path, resource_dir)
+ .unwrap_or_else(|error| {
let chained_error = error.chain_err(|| "Unable to load cached relays");
error!("{}", chained_error.display_chain());
ParsedRelays::empty()
});
info!(
"Initialized with {} cached relays from {}",
- parsed_relays.relays().len(),
- DateTime::<Local>::from(parsed_relays.last_updated())
+ unsynchronized_parsed_relays.relays().len(),
+ DateTime::<Local>::from(unsynchronized_parsed_relays.last_updated())
.format(::logging::DATE_TIME_FORMAT_STR)
);
+ let parsed_relays = Arc::new(Mutex::new(unsynchronized_parsed_relays));
+ let updater = RelayListUpdater::spawn(rpc_handle, cache_path, parsed_relays.clone());
RelaySelector {
- parsed_relays: Arc::new(Mutex::new(parsed_relays)),
+ parsed_relays,
rng: rand::thread_rng(),
- rpc_client: RelayListProxy::new(rpc_handle),
- cache_path,
+ _updater: updater,
}
}
@@ -161,12 +164,6 @@ impl RelaySelector {
.expect("Relay updater thread crashed while it held a lock to the list of relays")
}
- /// Returns the time when the relay list backing this selector was last fetched from the
- /// internet.
- pub fn get_last_updated(&self) -> SystemTime {
- self.lock_parsed_relays().last_updated()
- }
-
/// Returns a random relay and relay endpoint matching the given constraints and with
/// preferences applied.
pub fn get_tunnel_endpoint(
@@ -330,26 +327,127 @@ impl RelaySelector {
.map(|openvpn_endpoint| TunnelEndpointData::OpenVpn(openvpn_endpoint))
}
- /// Downloads the latest relay list and caches it. This operation is blocking.
- pub fn update(&mut self, timeout: Duration) -> Result<()> {
- info!("Downloading list of relays...");
- let download_future = self
- .rpc_client
- .relay_list()
- .map_err(|e| Error::with_chain(e, ErrorKind::DownloadError));
- let relay_list = Timer::default().timeout(download_future, timeout).wait()?;
- if let Err(e) = self.cache_relays(&relay_list) {
- error!("Unable to save relays to cache: {}", e.display_chain());
+ /// Try to read the relays, first from cache and if that fails from the `resource_dir`.
+ fn read_cached_relays(cache_path: &Path, resource_dir: &Path) -> Result<ParsedRelays> {
+ match ParsedRelays::from_file(cache_path) {
+ Ok(value) => Ok(value),
+ Err(read_cache_error) => match ParsedRelays::from_file(
+ resource_dir.join(RELAYS_FILENAME),
+ ) {
+ Ok(value) => Ok(value),
+ Err(read_resource_error) => Err(read_cache_error.chain_err(|| read_resource_error)),
+ },
}
- let parsed_relays = ParsedRelays::from_relay_list(relay_list, SystemTime::now());
+ }
+}
+
+type RelayListUpdaterHandle = mpsc::Sender<()>;
+
+struct RelayListUpdater {
+ rpc_client: RelayListProxy<HttpHandle>,
+ cache_path: PathBuf,
+ parsed_relays: Arc<Mutex<ParsedRelays>>,
+ close_handle: mpsc::Receiver<()>,
+}
+
+impl RelayListUpdater {
+ pub fn spawn(
+ rpc_handle: HttpHandle,
+ cache_path: PathBuf,
+ parsed_relays: Arc<Mutex<ParsedRelays>>,
+ ) -> RelayListUpdaterHandle {
+ let (tx, rx) = mpsc::channel();
+
+ thread::spawn(move || Self::new(rpc_handle, cache_path, parsed_relays, rx).run());
+
+ tx
+ }
+
+ fn new(
+ rpc_handle: HttpHandle,
+ cache_path: PathBuf,
+ parsed_relays: Arc<Mutex<ParsedRelays>>,
+ close_handle: mpsc::Receiver<()>,
+ ) -> Self {
+ let rpc_client = RelayListProxy::new(rpc_handle);
+
+ RelayListUpdater {
+ rpc_client,
+ cache_path,
+ parsed_relays,
+ close_handle,
+ }
+ }
+
+ fn run(&mut self) {
+ debug!("Starting relay list updater thread");
+ while self.wait_for_next_iteration() {
+ trace!("Relay list updater iteration");
+ if self.should_update() {
+ match self
+ .update()
+ .chain_err(|| "Failed to update list of relays")
+ {
+ Ok(()) => info!("Updated list of relays"),
+ Err(error) => error!("{}", error),
+ }
+ }
+ }
+ debug!("Relay list updater thread has finished");
+ }
+
+ fn wait_for_next_iteration(&mut self) -> bool {
+ use self::mpsc::RecvTimeoutError::*;
+
+ match self.close_handle.recv_timeout(UPDATE_INTERVAL) {
+ Ok(()) => true,
+ Err(Timeout) => true,
+ Err(Disconnected) => false,
+ }
+ }
+
+ fn should_update(&mut self) -> bool {
+ match SystemTime::now().duration_since(self.lock_parsed_relays().last_updated()) {
+ Ok(duration) => duration > MAX_CACHE_AGE,
+ Err(_) => false,
+ }
+ }
+
+ fn update(&mut self) -> Result<()> {
+ let new_relay_list = self
+ .download_relay_list()
+ .chain_err(|| "Failed to download relay list")?;
+
+ if let Err(error) = self.cache_relays(&new_relay_list) {
+ let chained_error = error.chain_err(|| "Failed to update relay cache on disk");
+ error!("{}", chained_error.display_chain());
+ }
+
+ let new_parsed_relays = ParsedRelays::from_relay_list(new_relay_list, SystemTime::now());
info!(
"Downloaded relay inventory has {} relays",
- parsed_relays.relays().len()
+ new_parsed_relays.relays().len()
);
- *self.lock_parsed_relays() = parsed_relays;
+
+ *self.lock_parsed_relays() = new_parsed_relays;
+
Ok(())
}
+ fn download_relay_list(&mut self) -> Result<RelayList> {
+ info!("Downloading list of relays...");
+
+ let download_future = self
+ .rpc_client
+ .relay_list()
+ .map_err(|e| Error::with_chain(e, ErrorKind::DownloadError));
+ let relay_list = Timer::default()
+ .timeout(download_future, DOWNLOAD_TIMEOUT)
+ .wait()?;
+
+ Ok(relay_list)
+ }
+
/// Write a `RelayList` to the cache file.
fn cache_relays(&self, relays: &RelayList) -> Result<()> {
debug!("Writing relays cache to {}", self.cache_path.display());
@@ -358,16 +456,9 @@ impl RelaySelector {
.chain_err(|| ErrorKind::SerializationError)
}
- /// Try to read the relays, first from cache and if that fails from the `resource_dir`.
- fn read_cached_relays(cache_path: &Path, resource_dir: &Path) -> Result<ParsedRelays> {
- match ParsedRelays::from_file(cache_path) {
- Ok(value) => Ok(value),
- Err(read_cache_error) => match ParsedRelays::from_file(
- resource_dir.join(RELAYS_FILENAME),
- ) {
- Ok(value) => Ok(value),
- Err(read_resource_error) => Err(read_cache_error.chain_err(|| read_resource_error)),
- },
- }
+ fn lock_parsed_relays(&self) -> MutexGuard<ParsedRelays> {
+ self.parsed_relays
+ .lock()
+ .expect("A thread crashed while it held a lock to the list of relays")
}
}