add lock to prevent snapshot during update

This commit is contained in:
mpostma 2021-03-20 17:24:08 +01:00
parent 520f7c09ba
commit 7f6a54cb12
No known key found for this signature in database
GPG Key ID: CBC8A7C1D7A28C3A
3 changed files with 46 additions and 16 deletions

View File

@ -52,6 +52,7 @@ impl<B> SnapshotService<B> {
let uuids = self.uuid_resolver_handle.snapshot(temp_snapshot_path.clone()).await?; let uuids = self.uuid_resolver_handle.snapshot(temp_snapshot_path.clone()).await?;
for uuid in uuids { for uuid in uuids {
self.update_handle.snapshot(uuid, temp_snapshot_path.clone()).await?; self.update_handle.snapshot(uuid, temp_snapshot_path.clone()).await?;
println!("performed snapshot for index {}", uuid);
} }
Ok(()) Ok(())
} }

View File

@ -5,7 +5,6 @@ use std::path::{Path, PathBuf};
use std::sync::Arc; use std::sync::Arc;
use super::index_actor::IndexActorHandle; use super::index_actor::IndexActorHandle;
use heed::CompactionOption;
use log::info; use log::info;
use oxidized_json_checker::JsonChecker; use oxidized_json_checker::JsonChecker;
use thiserror::Error; use thiserror::Error;
@ -257,21 +256,17 @@ where
} }
async fn handle_snapshot(&self, uuid: Uuid, path: PathBuf) -> Result<()> { async fn handle_snapshot(&self, uuid: Uuid, path: PathBuf) -> Result<()> {
use tokio::fs;
let update_path = path.join("updates");
fs::create_dir_all(&update_path)
.await
.map_err(|e| UpdateError::Error(e.into()))?;
let index_handle = self.index_handle.clone(); let index_handle = self.index_handle.clone();
if let Some(update_store) = self.store.get(uuid).await? { if let Some(update_store) = self.store.get(uuid).await? {
let snapshot_path = update_path.join(format!("update-{}", uuid));
tokio::task::spawn_blocking(move || -> anyhow::Result<()> { tokio::task::spawn_blocking(move || -> anyhow::Result<()> {
let _txn = update_store.env.write_txn()?; // acquire write lock to prevent further writes during snapshot
update_store // the update lock must be acquired BEFORE the write lock to prevent dead lock
.env let _lock = update_store.update_lock.lock();
.copy_to_path(&snapshot_path, CompactionOption::Enabled)?; let mut txn = update_store.env.write_txn()?;
// create db snapshot
update_store.snapshot(&mut txn, &path, uuid)?;
futures::executor::block_on( futures::executor::block_on(
async move { index_handle.snapshot(uuid, path).await }, async move { index_handle.snapshot(uuid, path).await },
)?; )?;

View File

@ -1,10 +1,10 @@
use std::fs::remove_file; use std::fs::{remove_file, create_dir_all, copy};
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::sync::Arc; use std::sync::Arc;
use heed::types::{DecodeIgnore, OwnedType, SerdeJson}; use heed::types::{DecodeIgnore, OwnedType, SerdeJson};
use heed::{Database, Env, EnvOpenOptions}; use heed::{Database, Env, EnvOpenOptions, CompactionOption};
use parking_lot::RwLock; use parking_lot::{RwLock, Mutex};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::fs::File; use std::fs::File;
use tokio::sync::mpsc; use tokio::sync::mpsc;
@ -24,6 +24,9 @@ pub struct UpdateStore<M, N, E> {
aborted_meta: Database<OwnedType<BEU64>, SerdeJson<Aborted<M>>>, aborted_meta: Database<OwnedType<BEU64>, SerdeJson<Aborted<M>>>,
processing: Arc<RwLock<Option<Processing<M>>>>, processing: Arc<RwLock<Option<Processing<M>>>>,
notification_sender: mpsc::Sender<()>, notification_sender: mpsc::Sender<()>,
/// A lock on the update loop. This is meant to prevent a snapshot to occur while an update is
/// processing, while not preventing writes all together during an update
pub update_lock: Arc<Mutex<()>>,
} }
pub trait HandleUpdate<M, N, E> { pub trait HandleUpdate<M, N, E> {
@ -76,6 +79,8 @@ where
// Send a first notification to trigger the process. // Send a first notification to trigger the process.
let _ = notification_sender.send(()); let _ = notification_sender.send(());
let update_lock = Arc::new(Mutex::new(()));
let update_store = Arc::new(UpdateStore { let update_store = Arc::new(UpdateStore {
env, env,
pending, pending,
@ -85,6 +90,7 @@ where
notification_sender, notification_sender,
failed_meta, failed_meta,
processing, processing,
update_lock,
}); });
// We need a weak reference so we can take ownership on the arc later when we // We need a weak reference so we can take ownership on the arc later when we
@ -190,6 +196,7 @@ where
where where
U: HandleUpdate<M, N, E>, U: HandleUpdate<M, N, E>,
{ {
let _lock = self.update_lock.lock();
// Create a read transaction to be able to retrieve the pending update in order. // Create a read transaction to be able to retrieve the pending update in order.
let rtxn = self.env.read_txn()?; let rtxn = self.env.read_txn()?;
let first_meta = self.pending_meta.first(&rtxn)?; let first_meta = self.pending_meta.first(&rtxn)?;
@ -371,6 +378,33 @@ where
Ok(aborted_updates) Ok(aborted_updates)
} }
pub fn snapshot(&self, txn: &mut heed::RwTxn, path: impl AsRef<Path>, uuid: Uuid) -> anyhow::Result<()> {
println!("snapshoting updates in {:?}", path.as_ref());
let update_path = path.as_ref().join("updates");
create_dir_all(&update_path)?;
let snapshot_path = update_path.join(format!("update-{}", uuid));
// acquire write lock to prevent further writes during snapshot
println!("acquired lock");
// create db snapshot
self.env.copy_to_path(&snapshot_path, CompactionOption::Enabled)?;
let update_files_path = update_path.join("update_files");
create_dir_all(&update_files_path)?;
for path in self.pending.iter(&txn)? {
let (_, path) = path?;
let name = path.file_name().unwrap();
let to = update_files_path.join(name);
copy(path, to)?;
}
println!("done");
Ok(())
}
} }
//#[cfg(test)] //#[cfg(test)]