summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--CHANGELOG.md1
-rw-r--r--mullvad-daemon/src/main.rs28
-rw-r--r--mullvad-daemon/src/relays.rs318
3 files changed, 223 insertions, 124 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 4255080524..0da782c87e 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -39,6 +39,7 @@ Line wrap the file at 100 chars. Th
- App now uses statically linked OpenSSL on all platforms.
- Add OpenVPN logs at the top of the problem report instead of middle, to aid support work.
- Lower per log size limit in the problem report to 128 kiB.
+- Relay list is now updated periodically automatically, not only when the daemon starts.
#### Windows
- Rename tunnel interface to "Mullvad".
diff --git a/mullvad-daemon/src/main.rs b/mullvad-daemon/src/main.rs
index 61c9f84f02..97df24e7d4 100644
--- a/mullvad-daemon/src/main.rs
+++ b/mullvad-daemon/src/main.rs
@@ -72,7 +72,7 @@ use mullvad_types::version::{AppVersion, AppVersionInfo};
use std::io;
use std::net::IpAddr;
-use std::path::{Path, PathBuf};
+use std::path::PathBuf;
use std::sync::{mpsc, Arc, Mutex};
use std::thread;
use std::time::{Duration, Instant};
@@ -122,8 +122,6 @@ error_chain!{
}
static MIN_TUNNEL_ALIVE_TIME: Duration = Duration::from_millis(1000);
-static MAX_RELAY_CACHE_AGE: Duration = Duration::from_secs(3600);
-static RELAY_CACHE_UPDATE_TIMEOUT: Duration = Duration::from_millis(3000);
const DAEMON_LOG_FILENAME: &str = "daemon.log";
const OPENVPN_LOG_FILENAME: &str = "openvpn.log";
@@ -238,7 +236,7 @@ impl Daemon {
let https_handle = https_handle.chain_err(|| "Unable to create am.i.mullvad client")?;
let relay_selector =
- Self::create_relay_selector(rpc_handle.clone(), &resource_dir, &cache_dir);
+ relays::RelaySelector::new(rpc_handle.clone(), &resource_dir, &cache_dir);
let (tx, rx) = mpsc::channel();
let management_interface_broadcaster =
@@ -272,22 +270,6 @@ impl Daemon {
})
}
- fn create_relay_selector(
- rpc_handle: mullvad_rpc::HttpHandle,
- resource_dir: &Path,
- cache_dir: &Path,
- ) -> relays::RelaySelector {
- let mut relay_selector = relays::RelaySelector::new(rpc_handle, &resource_dir, cache_dir);
- if let Ok(elapsed) = relay_selector.get_last_updated().elapsed() {
- if elapsed > MAX_RELAY_CACHE_AGE {
- if let Err(e) = relay_selector.update(RELAY_CACHE_UPDATE_TIMEOUT) {
- error!("Unable to update relay cache: {}", e.display_chain());
- }
- }
- }
- relay_selector
- }
-
// Starts the management interface and spawns a thread that will process it.
// Returns a handle that allows notifying all subscribers on events.
fn start_management_interface(
@@ -465,11 +447,7 @@ impl Daemon {
}
fn on_get_relay_locations(&mut self, tx: OneshotSender<RelayList>) {
- Self::oneshot_send(
- tx,
- self.relay_selector.get_locations().clone(),
- "relay locations",
- );
+ Self::oneshot_send(tx, self.relay_selector.get_locations(), "relay locations");
}
diff --git a/mullvad-daemon/src/relays.rs b/mullvad-daemon/src/relays.rs
index ef0455ff52..ca0cf4bd05 100644
--- a/mullvad-daemon/src/relays.rs
+++ b/mullvad-daemon/src/relays.rs
@@ -14,16 +14,20 @@ use serde_json;
use talpid_types::net::{TransportProtocol, TunnelEndpoint, TunnelEndpointData};
use std::fs::File;
-use std::io;
use std::net::IpAddr;
use std::path::{Path, PathBuf};
+use std::sync::{mpsc, Arc, Mutex, MutexGuard};
use std::time::{self, Duration, SystemTime};
+use std::{io, thread};
use rand::distributions::{IndependentSample, Range};
use rand::{self, Rng, ThreadRng};
use tokio_timer::{TimeoutError, Timer};
const RELAYS_FILENAME: &str = "relays.json";
+const DOWNLOAD_TIMEOUT: Duration = Duration::from_secs(15);
+const UPDATE_INTERVAL: Duration = Duration::from_secs(60 * 60);
+const MAX_CACHE_AGE: Duration = Duration::from_secs(60 * 60 * 24);
error_chain! {
errors {
@@ -41,13 +45,85 @@ impl<F> From<TimeoutError<F>> for Error {
}
}
-pub struct RelaySelector {
+struct ParsedRelays {
+ last_updated: SystemTime,
locations: RelayList,
relays: Vec<Relay>,
- last_updated: SystemTime,
+}
+
+impl ParsedRelays {
+ pub fn empty() -> Self {
+ ParsedRelays {
+ last_updated: time::UNIX_EPOCH,
+ locations: RelayList::empty(),
+ relays: Vec::new(),
+ }
+ }
+
+ pub fn from_relay_list(mut relay_list: RelayList, last_updated: SystemTime) -> Self {
+ let mut relays = Vec::new();
+ for country in &mut relay_list.countries {
+ let country_name = country.name.clone();
+ let country_code = country.code.clone();
+ for city in &mut country.cities {
+ city.has_active_relays = !city.relays.is_empty();
+ let city_name = city.name.clone();
+ let city_code = city.code.clone();
+ let latitude = city.latitude;
+ let longitude = city.longitude;
+ relays.extend(city.relays.drain(..).map(|mut relay| {
+ relay.location = Some(Location {
+ country: country_name.clone(),
+ country_code: country_code.clone(),
+ city: city_name.clone(),
+ city_code: city_code.clone(),
+ latitude,
+ longitude,
+ });
+ relay
+ }));
+ }
+ }
+ ParsedRelays {
+ last_updated,
+ locations: relay_list,
+ relays,
+ }
+ }
+
+ pub fn from_file<P: AsRef<Path>>(path: P) -> Result<Self> {
+ debug!("Reading relays from {}", path.as_ref().display());
+ let (last_modified, file) =
+ Self::read_file(path.as_ref()).chain_err(|| ErrorKind::RelayCacheError)?;
+ let relay_list = serde_json::from_reader(io::BufReader::new(file))
+ .chain_err(|| ErrorKind::SerializationError)?;
+
+ Ok(Self::from_relay_list(relay_list, last_modified))
+ }
+
+ fn read_file(path: &Path) -> io::Result<(SystemTime, File)> {
+ let file = File::open(path)?;
+ let last_modified = file.metadata()?.modified()?;
+ Ok((last_modified, file))
+ }
+
+ pub fn last_updated(&self) -> SystemTime {
+ self.last_updated
+ }
+
+ pub fn locations(&self) -> &RelayList {
+ &self.locations
+ }
+
+ pub fn relays(&self) -> &Vec<Relay> {
+ &self.relays
+ }
+}
+
+pub struct RelaySelector {
+ parsed_relays: Arc<Mutex<ParsedRelays>>,
rng: ThreadRng,
- rpc_client: RelayListProxy<HttpHandle>,
- cache_path: PathBuf,
+ _updater: RelayListUpdaterHandle,
}
impl RelaySelector {
@@ -55,40 +131,37 @@ impl RelaySelector {
/// to refresh the relay list from the internet.
pub fn new(rpc_handle: HttpHandle, resource_dir: &Path, cache_dir: &Path) -> Self {
let cache_path = cache_dir.join(RELAYS_FILENAME);
- let (last_updated, relay_list) = match Self::read_cached_relays(&cache_path, resource_dir) {
- Ok(value) => value,
- Err(error) => {
- let error = error.chain_err(|| "Unable to load cached relays");
- error!("{}", error.display_chain());
- (time::UNIX_EPOCH, RelayList::empty())
- }
- };
- let (locations, relays) = Self::process_relay_list(relay_list);
+ let unsynchronized_parsed_relays = Self::read_cached_relays(&cache_path, resource_dir)
+ .unwrap_or_else(|error| {
+ let chained_error = error.chain_err(|| "Unable to load cached relays");
+ error!("{}", chained_error.display_chain());
+ ParsedRelays::empty()
+ });
info!(
"Initialized with {} cached relays from {}",
- relays.len(),
- DateTime::<Local>::from(last_updated).format(::logging::DATE_TIME_FORMAT_STR)
+ unsynchronized_parsed_relays.relays().len(),
+ DateTime::<Local>::from(unsynchronized_parsed_relays.last_updated())
+ .format(::logging::DATE_TIME_FORMAT_STR)
);
+ let parsed_relays = Arc::new(Mutex::new(unsynchronized_parsed_relays));
+ let updater = RelayListUpdater::spawn(rpc_handle, cache_path, parsed_relays.clone());
RelaySelector {
- locations,
- relays,
- last_updated,
+ parsed_relays,
rng: rand::thread_rng(),
- rpc_client: RelayListProxy::new(rpc_handle),
- cache_path,
+ _updater: updater,
}
}
/// Returns all countries and cities. The cities in the object returned does not have any
/// relays in them.
- pub fn get_locations(&mut self) -> &RelayList {
- &self.locations
+ pub fn get_locations(&mut self) -> RelayList {
+ self.lock_parsed_relays().locations().clone()
}
- /// Returns the time when the relay list backing this selector was last fetched from the
- /// internet.
- pub fn get_last_updated(&self) -> SystemTime {
- self.last_updated
+ fn lock_parsed_relays(&self) -> MutexGuard<ParsedRelays> {
+ self.parsed_relays
+ .lock()
+ .expect("Relay updater thread crashed while it held a lock to the list of relays")
}
/// Returns a random relay and relay endpoint matching the given constraints and with
@@ -139,7 +212,8 @@ impl RelaySelector {
constraints: &RelayConstraints,
) -> Option<(Relay, TunnelEndpoint)> {
let matching_relays: Vec<Relay> = self
- .relays
+ .lock_parsed_relays()
+ .relays()
.iter()
.filter_map(|relay| Self::matching_relay(relay, constraints))
.collect();
@@ -253,51 +327,125 @@ impl RelaySelector {
.map(|openvpn_endpoint| TunnelEndpointData::OpenVpn(openvpn_endpoint))
}
- /// Downloads the latest relay list and caches it. This operation is blocking.
- pub fn update(&mut self, timeout: Duration) -> Result<()> {
+ /// Try to read the relays, first from cache and if that fails from the `resource_dir`.
+ fn read_cached_relays(cache_path: &Path, resource_dir: &Path) -> Result<ParsedRelays> {
+ match ParsedRelays::from_file(cache_path) {
+ Ok(value) => Ok(value),
+ Err(read_cache_error) => match ParsedRelays::from_file(
+ resource_dir.join(RELAYS_FILENAME),
+ ) {
+ Ok(value) => Ok(value),
+ Err(read_resource_error) => Err(read_cache_error.chain_err(|| read_resource_error)),
+ },
+ }
+ }
+}
+
+type RelayListUpdaterHandle = mpsc::Sender<()>;
+
+struct RelayListUpdater {
+ rpc_client: RelayListProxy<HttpHandle>,
+ cache_path: PathBuf,
+ parsed_relays: Arc<Mutex<ParsedRelays>>,
+ close_handle: mpsc::Receiver<()>,
+}
+
+impl RelayListUpdater {
+ pub fn spawn(
+ rpc_handle: HttpHandle,
+ cache_path: PathBuf,
+ parsed_relays: Arc<Mutex<ParsedRelays>>,
+ ) -> RelayListUpdaterHandle {
+ let (tx, rx) = mpsc::channel();
+
+ thread::spawn(move || Self::new(rpc_handle, cache_path, parsed_relays, rx).run());
+
+ tx
+ }
+
+ fn new(
+ rpc_handle: HttpHandle,
+ cache_path: PathBuf,
+ parsed_relays: Arc<Mutex<ParsedRelays>>,
+ close_handle: mpsc::Receiver<()>,
+ ) -> Self {
+ let rpc_client = RelayListProxy::new(rpc_handle);
+
+ RelayListUpdater {
+ rpc_client,
+ cache_path,
+ parsed_relays,
+ close_handle,
+ }
+ }
+
+ fn run(&mut self) {
+ debug!("Starting relay list updater thread");
+ while self.wait_for_next_iteration() {
+ trace!("Relay list updater iteration");
+ if self.should_update() {
+ match self
+ .update()
+ .chain_err(|| "Failed to update list of relays")
+ {
+ Ok(()) => info!("Updated list of relays"),
+ Err(error) => error!("{}", error),
+ }
+ }
+ }
+ debug!("Relay list updater thread has finished");
+ }
+
+ fn wait_for_next_iteration(&mut self) -> bool {
+ use self::mpsc::RecvTimeoutError::*;
+
+ match self.close_handle.recv_timeout(UPDATE_INTERVAL) {
+ Ok(()) => true,
+ Err(Timeout) => true,
+ Err(Disconnected) => false,
+ }
+ }
+
+ fn should_update(&mut self) -> bool {
+ match SystemTime::now().duration_since(self.lock_parsed_relays().last_updated()) {
+ Ok(duration) => duration > MAX_CACHE_AGE,
+ Err(_) => false,
+ }
+ }
+
+ fn update(&mut self) -> Result<()> {
+ let new_relay_list = self
+ .download_relay_list()
+ .chain_err(|| "Failed to download relay list")?;
+
+ if let Err(error) = self.cache_relays(&new_relay_list) {
+ let chained_error = error.chain_err(|| "Failed to update relay cache on disk");
+ error!("{}", chained_error.display_chain());
+ }
+
+ let new_parsed_relays = ParsedRelays::from_relay_list(new_relay_list, SystemTime::now());
+ info!(
+ "Downloaded relay inventory has {} relays",
+ new_parsed_relays.relays().len()
+ );
+
+ *self.lock_parsed_relays() = new_parsed_relays;
+
+ Ok(())
+ }
+
+ fn download_relay_list(&mut self) -> Result<RelayList> {
info!("Downloading list of relays...");
+
let download_future = self
.rpc_client
.relay_list()
.map_err(|e| Error::with_chain(e, ErrorKind::DownloadError));
- let relay_list = Timer::default().timeout(download_future, timeout).wait()?;
- if let Err(e) = self.cache_relays(&relay_list) {
- error!("Unable to save relays to cache: {}", e.display_chain());
- }
- let (locations, relays) = Self::process_relay_list(relay_list);
- info!("Downloaded relay inventory has {} relays", relays.len());
- self.locations = locations;
- self.relays = relays;
- self.last_updated = SystemTime::now();
- Ok(())
- }
+ let relay_list = Timer::default()
+ .timeout(download_future, DOWNLOAD_TIMEOUT)
+ .wait()?;
- // Extracts all relays from their corresponding cities and return them as a separate vector.
- fn process_relay_list(mut relay_list: RelayList) -> (RelayList, Vec<Relay>) {
- let mut relays = Vec::new();
- for country in &mut relay_list.countries {
- let country_name = country.name.clone();
- let country_code = country.code.clone();
- for city in &mut country.cities {
- city.has_active_relays = !city.relays.is_empty();
- let city_name = city.name.clone();
- let city_code = city.code.clone();
- let latitude = city.latitude;
- let longitude = city.longitude;
- relays.extend(city.relays.drain(..).map(|mut relay| {
- relay.location = Some(Location {
- country: country_name.clone(),
- country_code: country_code.clone(),
- city: city_name.clone(),
- city_code: city_code.clone(),
- latitude,
- longitude,
- });
- relay
- }));
- }
- }
- (relay_list, relays)
+ Ok(relay_list)
}
/// Write a `RelayList` to the cache file.
@@ -308,37 +456,9 @@ impl RelaySelector {
.chain_err(|| ErrorKind::SerializationError)
}
- /// Try to read the relays, first from cache and if that fails from the `resource_dir`.
- fn read_cached_relays(
- cache_path: &Path,
- resource_dir: &Path,
- ) -> Result<(SystemTime, RelayList)> {
- match Self::read_relays(cache_path) {
- Ok(value) => Ok(value),
- Err(read_cache_error) => match Self::read_relays(resource_dir.join(RELAYS_FILENAME)) {
- Ok(value) => Ok(value),
- Err(read_resource_error) => Err(read_cache_error.chain_err(|| read_resource_error)),
- },
- }
- }
-
- /// Read and deserialize a `RelayList` from a given path.
- /// Returns the file modification time and the relays.
- fn read_relays<P: AsRef<Path>>(path: P) -> Result<(SystemTime, RelayList)> {
- debug!(
- "Trying to read relays cache from {}",
- path.as_ref().display()
- );
- let (last_modified, file) =
- Self::read_file(path.as_ref()).chain_err(|| ErrorKind::RelayCacheError)?;
- let relay_list = serde_json::from_reader(io::BufReader::new(file))
- .chain_err(|| ErrorKind::SerializationError)?;
- Ok((last_modified, relay_list))
- }
-
- fn read_file(path: &Path) -> io::Result<(SystemTime, File)> {
- let file = File::open(path)?;
- let last_modified = file.metadata()?.modified()?;
- Ok((last_modified, file))
+ fn lock_parsed_relays(&self) -> MutexGuard<ParsedRelays> {
+ self.parsed_relays
+ .lock()
+ .expect("A thread crashed while it held a lock to the list of relays")
}
}