diff options
| -rw-r--r-- | CHANGELOG.md | 1 | ||||
| -rw-r--r-- | mullvad-daemon/src/main.rs | 28 | ||||
| -rw-r--r-- | mullvad-daemon/src/relays.rs | 318 |
3 files changed, 223 insertions, 124 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md index 4255080524..0da782c87e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -39,6 +39,7 @@ Line wrap the file at 100 chars. Th - App now uses statically linked OpenSSL on all platforms. - Add OpenVPN logs at the top of the problem report instead of middle, to aid support work. - Lower per log size limit in the problem report to 128 kiB. +- Relay list is now updated periodically automatically, not only when the daemon starts. #### Windows - Rename tunnel interface to "Mullvad". diff --git a/mullvad-daemon/src/main.rs b/mullvad-daemon/src/main.rs index 61c9f84f02..97df24e7d4 100644 --- a/mullvad-daemon/src/main.rs +++ b/mullvad-daemon/src/main.rs @@ -72,7 +72,7 @@ use mullvad_types::version::{AppVersion, AppVersionInfo}; use std::io; use std::net::IpAddr; -use std::path::{Path, PathBuf}; +use std::path::PathBuf; use std::sync::{mpsc, Arc, Mutex}; use std::thread; use std::time::{Duration, Instant}; @@ -122,8 +122,6 @@ error_chain!{ } static MIN_TUNNEL_ALIVE_TIME: Duration = Duration::from_millis(1000); -static MAX_RELAY_CACHE_AGE: Duration = Duration::from_secs(3600); -static RELAY_CACHE_UPDATE_TIMEOUT: Duration = Duration::from_millis(3000); const DAEMON_LOG_FILENAME: &str = "daemon.log"; const OPENVPN_LOG_FILENAME: &str = "openvpn.log"; @@ -238,7 +236,7 @@ impl Daemon { let https_handle = https_handle.chain_err(|| "Unable to create am.i.mullvad client")?; let relay_selector = - Self::create_relay_selector(rpc_handle.clone(), &resource_dir, &cache_dir); + relays::RelaySelector::new(rpc_handle.clone(), &resource_dir, &cache_dir); let (tx, rx) = mpsc::channel(); let management_interface_broadcaster = @@ -272,22 +270,6 @@ impl Daemon { }) } - fn create_relay_selector( - rpc_handle: mullvad_rpc::HttpHandle, - resource_dir: &Path, - cache_dir: &Path, - ) -> relays::RelaySelector { - let mut relay_selector = relays::RelaySelector::new(rpc_handle, &resource_dir, cache_dir); - if let Ok(elapsed) = relay_selector.get_last_updated().elapsed() { - if elapsed > MAX_RELAY_CACHE_AGE { - if let Err(e) = relay_selector.update(RELAY_CACHE_UPDATE_TIMEOUT) { - error!("Unable to update relay cache: {}", e.display_chain()); - } - } - } - relay_selector - } - // Starts the management interface and spawns a thread that will process it. // Returns a handle that allows notifying all subscribers on events. fn start_management_interface( @@ -465,11 +447,7 @@ impl Daemon { } fn on_get_relay_locations(&mut self, tx: OneshotSender<RelayList>) { - Self::oneshot_send( - tx, - self.relay_selector.get_locations().clone(), - "relay locations", - ); + Self::oneshot_send(tx, self.relay_selector.get_locations(), "relay locations"); } diff --git a/mullvad-daemon/src/relays.rs b/mullvad-daemon/src/relays.rs index ef0455ff52..ca0cf4bd05 100644 --- a/mullvad-daemon/src/relays.rs +++ b/mullvad-daemon/src/relays.rs @@ -14,16 +14,20 @@ use serde_json; use talpid_types::net::{TransportProtocol, TunnelEndpoint, TunnelEndpointData}; use std::fs::File; -use std::io; use std::net::IpAddr; use std::path::{Path, PathBuf}; +use std::sync::{mpsc, Arc, Mutex, MutexGuard}; use std::time::{self, Duration, SystemTime}; +use std::{io, thread}; use rand::distributions::{IndependentSample, Range}; use rand::{self, Rng, ThreadRng}; use tokio_timer::{TimeoutError, Timer}; const RELAYS_FILENAME: &str = "relays.json"; +const DOWNLOAD_TIMEOUT: Duration = Duration::from_secs(15); +const UPDATE_INTERVAL: Duration = Duration::from_secs(60 * 60); +const MAX_CACHE_AGE: Duration = Duration::from_secs(60 * 60 * 24); error_chain! { errors { @@ -41,13 +45,85 @@ impl<F> From<TimeoutError<F>> for Error { } } -pub struct RelaySelector { +struct ParsedRelays { + last_updated: SystemTime, locations: RelayList, relays: Vec<Relay>, - last_updated: SystemTime, +} + +impl ParsedRelays { + pub fn empty() -> Self { + ParsedRelays { + last_updated: time::UNIX_EPOCH, + locations: RelayList::empty(), + relays: Vec::new(), + } + } + + pub fn from_relay_list(mut relay_list: RelayList, last_updated: SystemTime) -> Self { + let mut relays = Vec::new(); + for country in &mut relay_list.countries { + let country_name = country.name.clone(); + let country_code = country.code.clone(); + for city in &mut country.cities { + city.has_active_relays = !city.relays.is_empty(); + let city_name = city.name.clone(); + let city_code = city.code.clone(); + let latitude = city.latitude; + let longitude = city.longitude; + relays.extend(city.relays.drain(..).map(|mut relay| { + relay.location = Some(Location { + country: country_name.clone(), + country_code: country_code.clone(), + city: city_name.clone(), + city_code: city_code.clone(), + latitude, + longitude, + }); + relay + })); + } + } + ParsedRelays { + last_updated, + locations: relay_list, + relays, + } + } + + pub fn from_file<P: AsRef<Path>>(path: P) -> Result<Self> { + debug!("Reading relays from {}", path.as_ref().display()); + let (last_modified, file) = + Self::read_file(path.as_ref()).chain_err(|| ErrorKind::RelayCacheError)?; + let relay_list = serde_json::from_reader(io::BufReader::new(file)) + .chain_err(|| ErrorKind::SerializationError)?; + + Ok(Self::from_relay_list(relay_list, last_modified)) + } + + fn read_file(path: &Path) -> io::Result<(SystemTime, File)> { + let file = File::open(path)?; + let last_modified = file.metadata()?.modified()?; + Ok((last_modified, file)) + } + + pub fn last_updated(&self) -> SystemTime { + self.last_updated + } + + pub fn locations(&self) -> &RelayList { + &self.locations + } + + pub fn relays(&self) -> &Vec<Relay> { + &self.relays + } +} + +pub struct RelaySelector { + parsed_relays: Arc<Mutex<ParsedRelays>>, rng: ThreadRng, - rpc_client: RelayListProxy<HttpHandle>, - cache_path: PathBuf, + _updater: RelayListUpdaterHandle, } impl RelaySelector { @@ -55,40 +131,37 @@ impl RelaySelector { /// to refresh the relay list from the internet. pub fn new(rpc_handle: HttpHandle, resource_dir: &Path, cache_dir: &Path) -> Self { let cache_path = cache_dir.join(RELAYS_FILENAME); - let (last_updated, relay_list) = match Self::read_cached_relays(&cache_path, resource_dir) { - Ok(value) => value, - Err(error) => { - let error = error.chain_err(|| "Unable to load cached relays"); - error!("{}", error.display_chain()); - (time::UNIX_EPOCH, RelayList::empty()) - } - }; - let (locations, relays) = Self::process_relay_list(relay_list); + let unsynchronized_parsed_relays = Self::read_cached_relays(&cache_path, resource_dir) + .unwrap_or_else(|error| { + let chained_error = error.chain_err(|| "Unable to load cached relays"); + error!("{}", chained_error.display_chain()); + ParsedRelays::empty() + }); info!( "Initialized with {} cached relays from {}", - relays.len(), - DateTime::<Local>::from(last_updated).format(::logging::DATE_TIME_FORMAT_STR) + unsynchronized_parsed_relays.relays().len(), + DateTime::<Local>::from(unsynchronized_parsed_relays.last_updated()) + .format(::logging::DATE_TIME_FORMAT_STR) ); + let parsed_relays = Arc::new(Mutex::new(unsynchronized_parsed_relays)); + let updater = RelayListUpdater::spawn(rpc_handle, cache_path, parsed_relays.clone()); RelaySelector { - locations, - relays, - last_updated, + parsed_relays, rng: rand::thread_rng(), - rpc_client: RelayListProxy::new(rpc_handle), - cache_path, + _updater: updater, } } /// Returns all countries and cities. The cities in the object returned does not have any /// relays in them. - pub fn get_locations(&mut self) -> &RelayList { - &self.locations + pub fn get_locations(&mut self) -> RelayList { + self.lock_parsed_relays().locations().clone() } - /// Returns the time when the relay list backing this selector was last fetched from the - /// internet. - pub fn get_last_updated(&self) -> SystemTime { - self.last_updated + fn lock_parsed_relays(&self) -> MutexGuard<ParsedRelays> { + self.parsed_relays + .lock() + .expect("Relay updater thread crashed while it held a lock to the list of relays") } /// Returns a random relay and relay endpoint matching the given constraints and with @@ -139,7 +212,8 @@ impl RelaySelector { constraints: &RelayConstraints, ) -> Option<(Relay, TunnelEndpoint)> { let matching_relays: Vec<Relay> = self - .relays + .lock_parsed_relays() + .relays() .iter() .filter_map(|relay| Self::matching_relay(relay, constraints)) .collect(); @@ -253,51 +327,125 @@ impl RelaySelector { .map(|openvpn_endpoint| TunnelEndpointData::OpenVpn(openvpn_endpoint)) } - /// Downloads the latest relay list and caches it. This operation is blocking. - pub fn update(&mut self, timeout: Duration) -> Result<()> { + /// Try to read the relays, first from cache and if that fails from the `resource_dir`. + fn read_cached_relays(cache_path: &Path, resource_dir: &Path) -> Result<ParsedRelays> { + match ParsedRelays::from_file(cache_path) { + Ok(value) => Ok(value), + Err(read_cache_error) => match ParsedRelays::from_file( + resource_dir.join(RELAYS_FILENAME), + ) { + Ok(value) => Ok(value), + Err(read_resource_error) => Err(read_cache_error.chain_err(|| read_resource_error)), + }, + } + } +} + +type RelayListUpdaterHandle = mpsc::Sender<()>; + +struct RelayListUpdater { + rpc_client: RelayListProxy<HttpHandle>, + cache_path: PathBuf, + parsed_relays: Arc<Mutex<ParsedRelays>>, + close_handle: mpsc::Receiver<()>, +} + +impl RelayListUpdater { + pub fn spawn( + rpc_handle: HttpHandle, + cache_path: PathBuf, + parsed_relays: Arc<Mutex<ParsedRelays>>, + ) -> RelayListUpdaterHandle { + let (tx, rx) = mpsc::channel(); + + thread::spawn(move || Self::new(rpc_handle, cache_path, parsed_relays, rx).run()); + + tx + } + + fn new( + rpc_handle: HttpHandle, + cache_path: PathBuf, + parsed_relays: Arc<Mutex<ParsedRelays>>, + close_handle: mpsc::Receiver<()>, + ) -> Self { + let rpc_client = RelayListProxy::new(rpc_handle); + + RelayListUpdater { + rpc_client, + cache_path, + parsed_relays, + close_handle, + } + } + + fn run(&mut self) { + debug!("Starting relay list updater thread"); + while self.wait_for_next_iteration() { + trace!("Relay list updater iteration"); + if self.should_update() { + match self + .update() + .chain_err(|| "Failed to update list of relays") + { + Ok(()) => info!("Updated list of relays"), + Err(error) => error!("{}", error), + } + } + } + debug!("Relay list updater thread has finished"); + } + + fn wait_for_next_iteration(&mut self) -> bool { + use self::mpsc::RecvTimeoutError::*; + + match self.close_handle.recv_timeout(UPDATE_INTERVAL) { + Ok(()) => true, + Err(Timeout) => true, + Err(Disconnected) => false, + } + } + + fn should_update(&mut self) -> bool { + match SystemTime::now().duration_since(self.lock_parsed_relays().last_updated()) { + Ok(duration) => duration > MAX_CACHE_AGE, + Err(_) => false, + } + } + + fn update(&mut self) -> Result<()> { + let new_relay_list = self + .download_relay_list() + .chain_err(|| "Failed to download relay list")?; + + if let Err(error) = self.cache_relays(&new_relay_list) { + let chained_error = error.chain_err(|| "Failed to update relay cache on disk"); + error!("{}", chained_error.display_chain()); + } + + let new_parsed_relays = ParsedRelays::from_relay_list(new_relay_list, SystemTime::now()); + info!( + "Downloaded relay inventory has {} relays", + new_parsed_relays.relays().len() + ); + + *self.lock_parsed_relays() = new_parsed_relays; + + Ok(()) + } + + fn download_relay_list(&mut self) -> Result<RelayList> { info!("Downloading list of relays..."); + let download_future = self .rpc_client .relay_list() .map_err(|e| Error::with_chain(e, ErrorKind::DownloadError)); - let relay_list = Timer::default().timeout(download_future, timeout).wait()?; - if let Err(e) = self.cache_relays(&relay_list) { - error!("Unable to save relays to cache: {}", e.display_chain()); - } - let (locations, relays) = Self::process_relay_list(relay_list); - info!("Downloaded relay inventory has {} relays", relays.len()); - self.locations = locations; - self.relays = relays; - self.last_updated = SystemTime::now(); - Ok(()) - } + let relay_list = Timer::default() + .timeout(download_future, DOWNLOAD_TIMEOUT) + .wait()?; - // Extracts all relays from their corresponding cities and return them as a separate vector. - fn process_relay_list(mut relay_list: RelayList) -> (RelayList, Vec<Relay>) { - let mut relays = Vec::new(); - for country in &mut relay_list.countries { - let country_name = country.name.clone(); - let country_code = country.code.clone(); - for city in &mut country.cities { - city.has_active_relays = !city.relays.is_empty(); - let city_name = city.name.clone(); - let city_code = city.code.clone(); - let latitude = city.latitude; - let longitude = city.longitude; - relays.extend(city.relays.drain(..).map(|mut relay| { - relay.location = Some(Location { - country: country_name.clone(), - country_code: country_code.clone(), - city: city_name.clone(), - city_code: city_code.clone(), - latitude, - longitude, - }); - relay - })); - } - } - (relay_list, relays) + Ok(relay_list) } /// Write a `RelayList` to the cache file. @@ -308,37 +456,9 @@ impl RelaySelector { .chain_err(|| ErrorKind::SerializationError) } - /// Try to read the relays, first from cache and if that fails from the `resource_dir`. - fn read_cached_relays( - cache_path: &Path, - resource_dir: &Path, - ) -> Result<(SystemTime, RelayList)> { - match Self::read_relays(cache_path) { - Ok(value) => Ok(value), - Err(read_cache_error) => match Self::read_relays(resource_dir.join(RELAYS_FILENAME)) { - Ok(value) => Ok(value), - Err(read_resource_error) => Err(read_cache_error.chain_err(|| read_resource_error)), - }, - } - } - - /// Read and deserialize a `RelayList` from a given path. - /// Returns the file modification time and the relays. - fn read_relays<P: AsRef<Path>>(path: P) -> Result<(SystemTime, RelayList)> { - debug!( - "Trying to read relays cache from {}", - path.as_ref().display() - ); - let (last_modified, file) = - Self::read_file(path.as_ref()).chain_err(|| ErrorKind::RelayCacheError)?; - let relay_list = serde_json::from_reader(io::BufReader::new(file)) - .chain_err(|| ErrorKind::SerializationError)?; - Ok((last_modified, relay_list)) - } - - fn read_file(path: &Path) -> io::Result<(SystemTime, File)> { - let file = File::open(path)?; - let last_modified = file.metadata()?.modified()?; - Ok((last_modified, file)) + fn lock_parsed_relays(&self) -> MutexGuard<ParsedRelays> { + self.parsed_relays + .lock() + .expect("A thread crashed while it held a lock to the list of relays") } } |
