summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorDavid Lönnhager <david.l@mullvad.net>2020-08-31 11:47:24 +0200
committerDavid Lönnhager <david.l@mullvad.net>2020-09-01 14:18:35 +0200
commitdef635a4a8f4a015ecc3ff0f24c02f8919a8929c (patch)
treef0c8f80baed0d21d65a3120da318e8d1e65c62ad
parentefec1159152e8da388ca489c97a55e20d6027ed1 (diff)
downloadmullvadvpn-def635a4a8f4a015ecc3ff0f24c02f8919a8929c.tar.xz
mullvadvpn-def635a4a8f4a015ecc3ff0f24c02f8919a8929c.zip
Asynchronously write account history to file
-rw-r--r--mullvad-daemon/src/account_history.rs84
-rw-r--r--mullvad-daemon/src/lib.rs1
2 files changed, 56 insertions, 29 deletions
diff --git a/mullvad-daemon/src/account_history.rs b/mullvad-daemon/src/account_history.rs
index 084ea271ee..8cceecda42 100644
--- a/mullvad-daemon/src/account_history.rs
+++ b/mullvad-daemon/src/account_history.rs
@@ -6,6 +6,7 @@ use std::{
future::Future,
io::{self, Seek, Write},
path::Path,
+ sync::{Arc, Mutex},
};
use talpid_types::ErrorExt;
@@ -22,6 +23,9 @@ pub enum Error {
#[error(display = "Unable to write account history file")]
Write(#[error(source)] io::Error),
+
+ #[error(display = "Write task panicked or was cancelled")]
+ WriteCancelled(#[error(source)] tokio::task::JoinError),
}
static ACCOUNT_HISTORY_FILE: &str = "account-history.json";
@@ -29,19 +33,19 @@ static ACCOUNT_HISTORY_LIMIT: usize = 3;
/// A trivial MRU cache of account data
pub struct AccountHistory {
- file: io::BufWriter<fs::File>,
- accounts: VecDeque<AccountEntry>,
+ file: Arc<Mutex<io::BufWriter<fs::File>>>,
+ accounts: Arc<Mutex<VecDeque<AccountEntry>>>,
rpc_handle: MullvadRestHandle,
}
impl AccountHistory {
- pub fn new(
+ pub async fn new(
cache_dir: &Path,
settings_dir: &Path,
rpc_handle: MullvadRestHandle,
) -> Result<AccountHistory> {
- Self::migrate_from_old_file_location(cache_dir, settings_dir);
+ Self::migrate_from_old_file_location(cache_dir, settings_dir).await;
let mut options = fs::OpenOptions::new();
#[cfg(unix)]
@@ -83,30 +87,32 @@ impl AccountHistory {
};
let file = io::BufWriter::new(reader.into_inner());
let mut history = AccountHistory {
- file,
- accounts,
+ file: Arc::new(Mutex::new(file)),
+ accounts: Arc::new(Mutex::new(accounts)),
rpc_handle,
};
- if let Err(e) = history.save_to_disk() {
+ if let Err(e) = history.save_to_disk().await {
log::error!("Failed to save account cache after opening it: {}", e);
}
Ok(history)
}
- fn migrate_from_old_file_location(old_dir: &Path, new_dir: &Path) {
+ async fn migrate_from_old_file_location(old_dir: &Path, new_dir: &Path) {
+ use tokio::fs;
+
let old_path = old_dir.join(ACCOUNT_HISTORY_FILE);
let new_path = new_dir.join(ACCOUNT_HISTORY_FILE);
if !old_path.exists() || new_path.exists() || new_path == old_path {
return;
}
- if let Err(error) = fs::copy(&old_path, &new_path) {
+ if let Err(error) = fs::copy(&old_path, &new_path).await {
log::error!(
"{}",
error.display_chain_with_msg("Failed to migrate account history file location")
);
} else {
- let _ = fs::remove_file(old_path);
+ let _ = fs::remove_file(old_path).await;
}
}
@@ -126,6 +132,8 @@ impl AccountHistory {
pub async fn get(&mut self, account: &AccountToken) -> Result<Option<AccountEntry>> {
let (idx, entry) = match self
.accounts
+ .lock()
+ .unwrap()
.iter()
.enumerate()
.find(|(_idx, entry)| &entry.account == account)
@@ -174,24 +182,26 @@ impl AccountHistory {
/// Always inserts a new entry at the start of the list
pub async fn insert(&mut self, new_entry: AccountEntry) -> Result<()> {
- self.accounts
- .retain(|entry| entry.account != new_entry.account);
-
- self.accounts.push_front(new_entry);
+ let mut accounts = self.accounts.lock().unwrap();
+ accounts.retain(|entry| entry.account != new_entry.account);
+ accounts.push_front(new_entry);
- if self.accounts.len() > ACCOUNT_HISTORY_LIMIT {
- let last_entry = self.accounts.pop_back().unwrap();
+ if accounts.len() > ACCOUNT_HISTORY_LIMIT {
+ let last_entry = accounts.pop_back().unwrap();
if let Some(wg_data) = last_entry.wireguard {
tokio::spawn(self.create_remove_wg_key_rpc(&last_entry.account, &wg_data));
}
}
- self.save_to_disk()
+ std::mem::drop(accounts);
+ self.save_to_disk().await
}
/// Retrieve account history.
pub fn get_account_history(&self) -> Vec<AccountToken> {
self.accounts
+ .lock()
+ .unwrap()
.iter()
.map(|entry| entry.account.clone())
.collect()
@@ -209,8 +219,8 @@ impl AccountHistory {
tokio::spawn(self.create_remove_wg_key_rpc(account, &wg_data));
}
- let _ = self.accounts.pop_front();
- self.save_to_disk()
+ let _ = self.accounts.lock().unwrap().pop_front();
+ self.save_to_disk().await
}
/// Remove account history
@@ -221,6 +231,8 @@ impl AccountHistory {
let removal: Vec<_> = self
.accounts
+ .lock()
+ .unwrap()
.drain(0..)
.filter_map(move |entry| {
let account = entry.account.clone();
@@ -239,18 +251,32 @@ impl AccountHistory {
futures::future::join_all(removal).await;
- self.accounts = VecDeque::new();
- self.save_to_disk()
+ self.accounts = Arc::new(Mutex::new(VecDeque::new()));
+ self.save_to_disk().await
+ }
+
+ async fn save_to_disk(&mut self) -> Result<()> {
+ let file = self.file.clone();
+ let accounts = self.accounts.clone();
+
+ tokio::task::spawn_blocking(move || {
+ let mut file = file.lock().unwrap();
+ let accounts = accounts.lock().unwrap();
+ Self::save_to_disk_inner(&mut *file, &*accounts)
+ })
+ .await
+ .map_err(Error::WriteCancelled)?
}
- fn save_to_disk(&mut self) -> Result<()> {
- self.file.get_mut().set_len(0).map_err(Error::Write)?;
- self.file
- .seek(io::SeekFrom::Start(0))
- .map_err(Error::Write)?;
- serde_json::to_writer_pretty(&mut self.file, &self.accounts).map_err(Error::Serialize)?;
- self.file.flush().map_err(Error::Write)?;
- self.file.get_mut().sync_all().map_err(Error::Write)
+ fn save_to_disk_inner(
+ mut file: &mut io::BufWriter<fs::File>,
+ accounts: &VecDeque<AccountEntry>,
+ ) -> Result<()> {
+ file.get_mut().set_len(0).map_err(Error::Write)?;
+ file.seek(io::SeekFrom::Start(0)).map_err(Error::Write)?;
+ serde_json::to_writer_pretty(&mut file, accounts).map_err(Error::Serialize)?;
+ file.flush().map_err(Error::Write)?;
+ file.get_mut().sync_all().map_err(Error::Write)
}
}
diff --git a/mullvad-daemon/src/lib.rs b/mullvad-daemon/src/lib.rs
index cd2d506c2b..10ed53171f 100644
--- a/mullvad-daemon/src/lib.rs
+++ b/mullvad-daemon/src/lib.rs
@@ -532,6 +532,7 @@ where
tokio::spawn(version_updater.run());
let account_history =
account_history::AccountHistory::new(&cache_dir, &settings_dir, rpc_handle.clone())
+ .await
.map_err(Error::LoadAccountHistory)?;
// Restore the tunnel to a previous state