diff options
| author | Janito Vaqueiro Ferreira Filho <janito@mullvad.net> | 2018-07-09 17:37:26 -0300 |
|---|---|---|
| committer | Janito Vaqueiro Ferreira Filho <janito@mullvad.net> | 2018-07-13 08:40:16 -0300 |
| commit | b91191d05e569e1aaccb7a734a57f642e6c865a0 (patch) | |
| tree | bb0ca28ece0ad6376ac4fb4a338356de41bb0301 | |
| parent | 24db388d3c98b9e7cda81930fabbbd3cb4cec3e9 (diff) | |
| download | mullvadvpn-b91191d05e569e1aaccb7a734a57f642e6c865a0.tar.xz mullvadvpn-b91191d05e569e1aaccb7a734a57f642e6c865a0.zip | |
Periodically update the relay cache
| -rw-r--r-- | mullvad-daemon/src/main.rs | 22 | ||||
| -rw-r--r-- | mullvad-daemon/src/relays.rs | 173 |
2 files changed, 134 insertions, 61 deletions
diff --git a/mullvad-daemon/src/main.rs b/mullvad-daemon/src/main.rs index efc2da7632..97df24e7d4 100644 --- a/mullvad-daemon/src/main.rs +++ b/mullvad-daemon/src/main.rs @@ -72,7 +72,7 @@ use mullvad_types::version::{AppVersion, AppVersionInfo}; use std::io; use std::net::IpAddr; -use std::path::{Path, PathBuf}; +use std::path::PathBuf; use std::sync::{mpsc, Arc, Mutex}; use std::thread; use std::time::{Duration, Instant}; @@ -122,8 +122,6 @@ error_chain!{ } static MIN_TUNNEL_ALIVE_TIME: Duration = Duration::from_millis(1000); -static MAX_RELAY_CACHE_AGE: Duration = Duration::from_secs(3600); -static RELAY_CACHE_UPDATE_TIMEOUT: Duration = Duration::from_millis(3000); const DAEMON_LOG_FILENAME: &str = "daemon.log"; const OPENVPN_LOG_FILENAME: &str = "openvpn.log"; @@ -238,7 +236,7 @@ impl Daemon { let https_handle = https_handle.chain_err(|| "Unable to create am.i.mullvad client")?; let relay_selector = - Self::create_relay_selector(rpc_handle.clone(), &resource_dir, &cache_dir); + relays::RelaySelector::new(rpc_handle.clone(), &resource_dir, &cache_dir); let (tx, rx) = mpsc::channel(); let management_interface_broadcaster = @@ -272,22 +270,6 @@ impl Daemon { }) } - fn create_relay_selector( - rpc_handle: mullvad_rpc::HttpHandle, - resource_dir: &Path, - cache_dir: &Path, - ) -> relays::RelaySelector { - let mut relay_selector = relays::RelaySelector::new(rpc_handle, &resource_dir, cache_dir); - if let Ok(elapsed) = relay_selector.get_last_updated().elapsed() { - if elapsed > MAX_RELAY_CACHE_AGE { - if let Err(e) = relay_selector.update(RELAY_CACHE_UPDATE_TIMEOUT) { - error!("Unable to update relay cache: {}", e.display_chain()); - } - } - } - relay_selector - } - // Starts the management interface and spawns a thread that will process it. // Returns a handle that allows notifying all subscribers on events. fn start_management_interface( diff --git a/mullvad-daemon/src/relays.rs b/mullvad-daemon/src/relays.rs index 4bae7eac7b..fb60f985ba 100644 --- a/mullvad-daemon/src/relays.rs +++ b/mullvad-daemon/src/relays.rs @@ -14,17 +14,20 @@ use serde_json; use talpid_types::net::{TransportProtocol, TunnelEndpoint, TunnelEndpointData}; use std::fs::File; -use std::io; use std::net::IpAddr; use std::path::{Path, PathBuf}; -use std::sync::{Arc, Mutex, MutexGuard}; +use std::sync::{mpsc, Arc, Mutex, MutexGuard}; use std::time::{self, Duration, SystemTime}; +use std::{io, thread}; use rand::distributions::{IndependentSample, Range}; use rand::{self, Rng, ThreadRng}; use tokio_timer::{TimeoutError, Timer}; const RELAYS_FILENAME: &str = "relays.json"; +const DOWNLOAD_TIMEOUT: Duration = Duration::from_secs(3); +const UPDATE_INTERVAL: Duration = Duration::from_secs(60 * 60); +const MAX_CACHE_AGE: Duration = Duration::from_secs(60 * 60 * 24); error_chain! { errors { @@ -120,8 +123,7 @@ impl ParsedRelays { pub struct RelaySelector { parsed_relays: Arc<Mutex<ParsedRelays>>, rng: ThreadRng, - rpc_client: RelayListProxy<HttpHandle>, - cache_path: PathBuf, + _updater: RelayListUpdaterHandle, } impl RelaySelector { @@ -129,23 +131,24 @@ impl RelaySelector { /// to refresh the relay list from the internet. pub fn new(rpc_handle: HttpHandle, resource_dir: &Path, cache_dir: &Path) -> Self { let cache_path = cache_dir.join(RELAYS_FILENAME); - let parsed_relays = - Self::read_cached_relays(&cache_path, resource_dir).unwrap_or_else(|error| { + let unsynchronized_parsed_relays = Self::read_cached_relays(&cache_path, resource_dir) + .unwrap_or_else(|error| { let chained_error = error.chain_err(|| "Unable to load cached relays"); error!("{}", chained_error.display_chain()); ParsedRelays::empty() }); info!( "Initialized with {} cached relays from {}", - parsed_relays.relays().len(), - DateTime::<Local>::from(parsed_relays.last_updated()) + unsynchronized_parsed_relays.relays().len(), + DateTime::<Local>::from(unsynchronized_parsed_relays.last_updated()) .format(::logging::DATE_TIME_FORMAT_STR) ); + let parsed_relays = Arc::new(Mutex::new(unsynchronized_parsed_relays)); + let updater = RelayListUpdater::spawn(rpc_handle, cache_path, parsed_relays.clone()); RelaySelector { - parsed_relays: Arc::new(Mutex::new(parsed_relays)), + parsed_relays, rng: rand::thread_rng(), - rpc_client: RelayListProxy::new(rpc_handle), - cache_path, + _updater: updater, } } @@ -161,12 +164,6 @@ impl RelaySelector { .expect("Relay updater thread crashed while it held a lock to the list of relays") } - /// Returns the time when the relay list backing this selector was last fetched from the - /// internet. - pub fn get_last_updated(&self) -> SystemTime { - self.lock_parsed_relays().last_updated() - } - /// Returns a random relay and relay endpoint matching the given constraints and with /// preferences applied. pub fn get_tunnel_endpoint( @@ -330,26 +327,127 @@ impl RelaySelector { .map(|openvpn_endpoint| TunnelEndpointData::OpenVpn(openvpn_endpoint)) } - /// Downloads the latest relay list and caches it. This operation is blocking. - pub fn update(&mut self, timeout: Duration) -> Result<()> { - info!("Downloading list of relays..."); - let download_future = self - .rpc_client - .relay_list() - .map_err(|e| Error::with_chain(e, ErrorKind::DownloadError)); - let relay_list = Timer::default().timeout(download_future, timeout).wait()?; - if let Err(e) = self.cache_relays(&relay_list) { - error!("Unable to save relays to cache: {}", e.display_chain()); + /// Try to read the relays, first from cache and if that fails from the `resource_dir`. + fn read_cached_relays(cache_path: &Path, resource_dir: &Path) -> Result<ParsedRelays> { + match ParsedRelays::from_file(cache_path) { + Ok(value) => Ok(value), + Err(read_cache_error) => match ParsedRelays::from_file( + resource_dir.join(RELAYS_FILENAME), + ) { + Ok(value) => Ok(value), + Err(read_resource_error) => Err(read_cache_error.chain_err(|| read_resource_error)), + }, } - let parsed_relays = ParsedRelays::from_relay_list(relay_list, SystemTime::now()); + } +} + +type RelayListUpdaterHandle = mpsc::Sender<()>; + +struct RelayListUpdater { + rpc_client: RelayListProxy<HttpHandle>, + cache_path: PathBuf, + parsed_relays: Arc<Mutex<ParsedRelays>>, + close_handle: mpsc::Receiver<()>, +} + +impl RelayListUpdater { + pub fn spawn( + rpc_handle: HttpHandle, + cache_path: PathBuf, + parsed_relays: Arc<Mutex<ParsedRelays>>, + ) -> RelayListUpdaterHandle { + let (tx, rx) = mpsc::channel(); + + thread::spawn(move || Self::new(rpc_handle, cache_path, parsed_relays, rx).run()); + + tx + } + + fn new( + rpc_handle: HttpHandle, + cache_path: PathBuf, + parsed_relays: Arc<Mutex<ParsedRelays>>, + close_handle: mpsc::Receiver<()>, + ) -> Self { + let rpc_client = RelayListProxy::new(rpc_handle); + + RelayListUpdater { + rpc_client, + cache_path, + parsed_relays, + close_handle, + } + } + + fn run(&mut self) { + debug!("Starting relay list updater thread"); + while self.wait_for_next_iteration() { + trace!("Relay list updater iteration"); + if self.should_update() { + match self + .update() + .chain_err(|| "Failed to update list of relays") + { + Ok(()) => info!("Updated list of relays"), + Err(error) => error!("{}", error), + } + } + } + debug!("Relay list updater thread has finished"); + } + + fn wait_for_next_iteration(&mut self) -> bool { + use self::mpsc::RecvTimeoutError::*; + + match self.close_handle.recv_timeout(UPDATE_INTERVAL) { + Ok(()) => true, + Err(Timeout) => true, + Err(Disconnected) => false, + } + } + + fn should_update(&mut self) -> bool { + match SystemTime::now().duration_since(self.lock_parsed_relays().last_updated()) { + Ok(duration) => duration > MAX_CACHE_AGE, + Err(_) => false, + } + } + + fn update(&mut self) -> Result<()> { + let new_relay_list = self + .download_relay_list() + .chain_err(|| "Failed to download relay list")?; + + if let Err(error) = self.cache_relays(&new_relay_list) { + let chained_error = error.chain_err(|| "Failed to update relay cache on disk"); + error!("{}", chained_error.display_chain()); + } + + let new_parsed_relays = ParsedRelays::from_relay_list(new_relay_list, SystemTime::now()); info!( "Downloaded relay inventory has {} relays", - parsed_relays.relays().len() + new_parsed_relays.relays().len() ); - *self.lock_parsed_relays() = parsed_relays; + + *self.lock_parsed_relays() = new_parsed_relays; + Ok(()) } + fn download_relay_list(&mut self) -> Result<RelayList> { + info!("Downloading list of relays..."); + + let download_future = self + .rpc_client + .relay_list() + .map_err(|e| Error::with_chain(e, ErrorKind::DownloadError)); + let relay_list = Timer::default() + .timeout(download_future, DOWNLOAD_TIMEOUT) + .wait()?; + + Ok(relay_list) + } + /// Write a `RelayList` to the cache file. fn cache_relays(&self, relays: &RelayList) -> Result<()> { debug!("Writing relays cache to {}", self.cache_path.display()); @@ -358,16 +456,9 @@ impl RelaySelector { .chain_err(|| ErrorKind::SerializationError) } - /// Try to read the relays, first from cache and if that fails from the `resource_dir`. - fn read_cached_relays(cache_path: &Path, resource_dir: &Path) -> Result<ParsedRelays> { - match ParsedRelays::from_file(cache_path) { - Ok(value) => Ok(value), - Err(read_cache_error) => match ParsedRelays::from_file( - resource_dir.join(RELAYS_FILENAME), - ) { - Ok(value) => Ok(value), - Err(read_resource_error) => Err(read_cache_error.chain_err(|| read_resource_error)), - }, - } + fn lock_parsed_relays(&self) -> MutexGuard<ParsedRelays> { + self.parsed_relays + .lock() + .expect("A thread crashed while it held a lock to the list of relays") } } |
