diff options
| author | David Lönnhager <david.l@mullvad.net> | 2020-08-31 11:47:24 +0200 |
|---|---|---|
| committer | David Lönnhager <david.l@mullvad.net> | 2020-09-01 14:18:35 +0200 |
| commit | def635a4a8f4a015ecc3ff0f24c02f8919a8929c (patch) | |
| tree | f0c8f80baed0d21d65a3120da318e8d1e65c62ad | |
| parent | efec1159152e8da388ca489c97a55e20d6027ed1 (diff) | |
| download | mullvadvpn-def635a4a8f4a015ecc3ff0f24c02f8919a8929c.tar.xz mullvadvpn-def635a4a8f4a015ecc3ff0f24c02f8919a8929c.zip | |
Asynchronously write account history to file
| -rw-r--r-- | mullvad-daemon/src/account_history.rs | 84 | ||||
| -rw-r--r-- | mullvad-daemon/src/lib.rs | 1 |
2 files changed, 56 insertions, 29 deletions
diff --git a/mullvad-daemon/src/account_history.rs b/mullvad-daemon/src/account_history.rs index 084ea271ee..8cceecda42 100644 --- a/mullvad-daemon/src/account_history.rs +++ b/mullvad-daemon/src/account_history.rs @@ -6,6 +6,7 @@ use std::{ future::Future, io::{self, Seek, Write}, path::Path, + sync::{Arc, Mutex}, }; use talpid_types::ErrorExt; @@ -22,6 +23,9 @@ pub enum Error { #[error(display = "Unable to write account history file")] Write(#[error(source)] io::Error), + + #[error(display = "Write task panicked or was cancelled")] + WriteCancelled(#[error(source)] tokio::task::JoinError), } static ACCOUNT_HISTORY_FILE: &str = "account-history.json"; @@ -29,19 +33,19 @@ static ACCOUNT_HISTORY_LIMIT: usize = 3; /// A trivial MRU cache of account data pub struct AccountHistory { - file: io::BufWriter<fs::File>, - accounts: VecDeque<AccountEntry>, + file: Arc<Mutex<io::BufWriter<fs::File>>>, + accounts: Arc<Mutex<VecDeque<AccountEntry>>>, rpc_handle: MullvadRestHandle, } impl AccountHistory { - pub fn new( + pub async fn new( cache_dir: &Path, settings_dir: &Path, rpc_handle: MullvadRestHandle, ) -> Result<AccountHistory> { - Self::migrate_from_old_file_location(cache_dir, settings_dir); + Self::migrate_from_old_file_location(cache_dir, settings_dir).await; let mut options = fs::OpenOptions::new(); #[cfg(unix)] @@ -83,30 +87,32 @@ impl AccountHistory { }; let file = io::BufWriter::new(reader.into_inner()); let mut history = AccountHistory { - file, - accounts, + file: Arc::new(Mutex::new(file)), + accounts: Arc::new(Mutex::new(accounts)), rpc_handle, }; - if let Err(e) = history.save_to_disk() { + if let Err(e) = history.save_to_disk().await { log::error!("Failed to save account cache after opening it: {}", e); } Ok(history) } - fn migrate_from_old_file_location(old_dir: &Path, new_dir: &Path) { + async fn migrate_from_old_file_location(old_dir: &Path, new_dir: &Path) { + use tokio::fs; + let old_path = old_dir.join(ACCOUNT_HISTORY_FILE); let new_path = new_dir.join(ACCOUNT_HISTORY_FILE); if !old_path.exists() || new_path.exists() || new_path == old_path { return; } - if let Err(error) = fs::copy(&old_path, &new_path) { + if let Err(error) = fs::copy(&old_path, &new_path).await { log::error!( "{}", error.display_chain_with_msg("Failed to migrate account history file location") ); } else { - let _ = fs::remove_file(old_path); + let _ = fs::remove_file(old_path).await; } } @@ -126,6 +132,8 @@ impl AccountHistory { pub async fn get(&mut self, account: &AccountToken) -> Result<Option<AccountEntry>> { let (idx, entry) = match self .accounts + .lock() + .unwrap() .iter() .enumerate() .find(|(_idx, entry)| &entry.account == account) @@ -174,24 +182,26 @@ impl AccountHistory { /// Always inserts a new entry at the start of the list pub async fn insert(&mut self, new_entry: AccountEntry) -> Result<()> { - self.accounts - .retain(|entry| entry.account != new_entry.account); - - self.accounts.push_front(new_entry); + let mut accounts = self.accounts.lock().unwrap(); + accounts.retain(|entry| entry.account != new_entry.account); + accounts.push_front(new_entry); - if self.accounts.len() > ACCOUNT_HISTORY_LIMIT { - let last_entry = self.accounts.pop_back().unwrap(); + if accounts.len() > ACCOUNT_HISTORY_LIMIT { + let last_entry = accounts.pop_back().unwrap(); if let Some(wg_data) = last_entry.wireguard { tokio::spawn(self.create_remove_wg_key_rpc(&last_entry.account, &wg_data)); } } - self.save_to_disk() + std::mem::drop(accounts); + self.save_to_disk().await } /// Retrieve account history. pub fn get_account_history(&self) -> Vec<AccountToken> { self.accounts + .lock() + .unwrap() .iter() .map(|entry| entry.account.clone()) .collect() @@ -209,8 +219,8 @@ impl AccountHistory { tokio::spawn(self.create_remove_wg_key_rpc(account, &wg_data)); } - let _ = self.accounts.pop_front(); - self.save_to_disk() + let _ = self.accounts.lock().unwrap().pop_front(); + self.save_to_disk().await } /// Remove account history @@ -221,6 +231,8 @@ impl AccountHistory { let removal: Vec<_> = self .accounts + .lock() + .unwrap() .drain(0..) .filter_map(move |entry| { let account = entry.account.clone(); @@ -239,18 +251,32 @@ impl AccountHistory { futures::future::join_all(removal).await; - self.accounts = VecDeque::new(); - self.save_to_disk() + self.accounts = Arc::new(Mutex::new(VecDeque::new())); + self.save_to_disk().await + } + + async fn save_to_disk(&mut self) -> Result<()> { + let file = self.file.clone(); + let accounts = self.accounts.clone(); + + tokio::task::spawn_blocking(move || { + let mut file = file.lock().unwrap(); + let accounts = accounts.lock().unwrap(); + Self::save_to_disk_inner(&mut *file, &*accounts) + }) + .await + .map_err(Error::WriteCancelled)? } - fn save_to_disk(&mut self) -> Result<()> { - self.file.get_mut().set_len(0).map_err(Error::Write)?; - self.file - .seek(io::SeekFrom::Start(0)) - .map_err(Error::Write)?; - serde_json::to_writer_pretty(&mut self.file, &self.accounts).map_err(Error::Serialize)?; - self.file.flush().map_err(Error::Write)?; - self.file.get_mut().sync_all().map_err(Error::Write) + fn save_to_disk_inner( + mut file: &mut io::BufWriter<fs::File>, + accounts: &VecDeque<AccountEntry>, + ) -> Result<()> { + file.get_mut().set_len(0).map_err(Error::Write)?; + file.seek(io::SeekFrom::Start(0)).map_err(Error::Write)?; + serde_json::to_writer_pretty(&mut file, accounts).map_err(Error::Serialize)?; + file.flush().map_err(Error::Write)?; + file.get_mut().sync_all().map_err(Error::Write) } } diff --git a/mullvad-daemon/src/lib.rs b/mullvad-daemon/src/lib.rs index cd2d506c2b..10ed53171f 100644 --- a/mullvad-daemon/src/lib.rs +++ b/mullvad-daemon/src/lib.rs @@ -532,6 +532,7 @@ where tokio::spawn(version_updater.run()); let account_history = account_history::AccountHistory::new(&cache_dir, &settings_dir, rpc_handle.clone()) + .await .map_err(Error::LoadAccountHistory)?; // Restore the tunnel to a previous state |
