From 7f6a54cb12fe1eaf781482e6ff76d71019b89a86 Mon Sep 17 00:00:00 2001 From: mpostma Date: Sat, 20 Mar 2021 17:24:08 +0100 Subject: [PATCH] add lock to prevent snapshot during update --- .../src/index_controller/snapshot.rs | 1 + .../src/index_controller/update_actor.rs | 21 ++++------ .../src/index_controller/update_store.rs | 40 +++++++++++++++++-- 3 files changed, 46 insertions(+), 16 deletions(-) diff --git a/meilisearch-http/src/index_controller/snapshot.rs b/meilisearch-http/src/index_controller/snapshot.rs index 75f6c1f82..8e26fbfac 100644 --- a/meilisearch-http/src/index_controller/snapshot.rs +++ b/meilisearch-http/src/index_controller/snapshot.rs @@ -52,6 +52,7 @@ impl SnapshotService { let uuids = self.uuid_resolver_handle.snapshot(temp_snapshot_path.clone()).await?; for uuid in uuids { self.update_handle.snapshot(uuid, temp_snapshot_path.clone()).await?; + println!("performed snapshot for index {}", uuid); } Ok(()) } diff --git a/meilisearch-http/src/index_controller/update_actor.rs b/meilisearch-http/src/index_controller/update_actor.rs index 6e017dcf5..64eab5221 100644 --- a/meilisearch-http/src/index_controller/update_actor.rs +++ b/meilisearch-http/src/index_controller/update_actor.rs @@ -5,7 +5,6 @@ use std::path::{Path, PathBuf}; use std::sync::Arc; use super::index_actor::IndexActorHandle; -use heed::CompactionOption; use log::info; use oxidized_json_checker::JsonChecker; use thiserror::Error; @@ -257,21 +256,17 @@ where } async fn handle_snapshot(&self, uuid: Uuid, path: PathBuf) -> Result<()> { - use tokio::fs; - - let update_path = path.join("updates"); - fs::create_dir_all(&update_path) - .await - .map_err(|e| UpdateError::Error(e.into()))?; - let index_handle = self.index_handle.clone(); if let Some(update_store) = self.store.get(uuid).await? { - let snapshot_path = update_path.join(format!("update-{}", uuid)); tokio::task::spawn_blocking(move || -> anyhow::Result<()> { - let _txn = update_store.env.write_txn()?; - update_store - .env - .copy_to_path(&snapshot_path, CompactionOption::Enabled)?; + // acquire write lock to prevent further writes during snapshot + // the update lock must be acquired BEFORE the write lock to prevent dead lock + let _lock = update_store.update_lock.lock(); + let mut txn = update_store.env.write_txn()?; + + // create db snapshot + update_store.snapshot(&mut txn, &path, uuid)?; + futures::executor::block_on( async move { index_handle.snapshot(uuid, path).await }, )?; diff --git a/meilisearch-http/src/index_controller/update_store.rs b/meilisearch-http/src/index_controller/update_store.rs index 5280ed94e..587b060af 100644 --- a/meilisearch-http/src/index_controller/update_store.rs +++ b/meilisearch-http/src/index_controller/update_store.rs @@ -1,10 +1,10 @@ -use std::fs::remove_file; +use std::fs::{remove_file, create_dir_all, copy}; use std::path::{Path, PathBuf}; use std::sync::Arc; use heed::types::{DecodeIgnore, OwnedType, SerdeJson}; -use heed::{Database, Env, EnvOpenOptions}; -use parking_lot::RwLock; +use heed::{Database, Env, EnvOpenOptions, CompactionOption}; +use parking_lot::{RwLock, Mutex}; use serde::{Deserialize, Serialize}; use std::fs::File; use tokio::sync::mpsc; @@ -24,6 +24,9 @@ pub struct UpdateStore { aborted_meta: Database, SerdeJson>>, processing: Arc>>>, notification_sender: mpsc::Sender<()>, + /// A lock on the update loop. This is meant to prevent a snapshot to occur while an update is + /// processing, while not preventing writes all together during an update + pub update_lock: Arc>, } pub trait HandleUpdate { @@ -76,6 +79,8 @@ where // Send a first notification to trigger the process. let _ = notification_sender.send(()); + let update_lock = Arc::new(Mutex::new(())); + let update_store = Arc::new(UpdateStore { env, pending, @@ -85,6 +90,7 @@ where notification_sender, failed_meta, processing, + update_lock, }); // We need a weak reference so we can take ownership on the arc later when we @@ -190,6 +196,7 @@ where where U: HandleUpdate, { + let _lock = self.update_lock.lock(); // Create a read transaction to be able to retrieve the pending update in order. let rtxn = self.env.read_txn()?; let first_meta = self.pending_meta.first(&rtxn)?; @@ -371,6 +378,33 @@ where Ok(aborted_updates) } + + pub fn snapshot(&self, txn: &mut heed::RwTxn, path: impl AsRef, uuid: Uuid) -> anyhow::Result<()> { + println!("snapshoting updates in {:?}", path.as_ref()); + let update_path = path.as_ref().join("updates"); + create_dir_all(&update_path)?; + + let snapshot_path = update_path.join(format!("update-{}", uuid)); + // acquire write lock to prevent further writes during snapshot + println!("acquired lock"); + + // create db snapshot + self.env.copy_to_path(&snapshot_path, CompactionOption::Enabled)?; + + let update_files_path = update_path.join("update_files"); + create_dir_all(&update_files_path)?; + + for path in self.pending.iter(&txn)? { + let (_, path) = path?; + let name = path.file_name().unwrap(); + let to = update_files_path.join(name); + copy(path, to)?; + } + + println!("done"); + + Ok(()) + } } //#[cfg(test)]