From 295f496e8a5b75ba27cb6f921f43eaea7a484fb9 Mon Sep 17 00:00:00 2001 From: Marin Postma Date: Wed, 12 May 2021 16:21:37 +0200 Subject: [PATCH] atomic index dump load --- meilisearch-http/src/index/updates.rs | 69 +++++++++++-------- .../src/index_controller/dump_actor/v2.rs | 19 +++-- 2 files changed, 54 insertions(+), 34 deletions(-) diff --git a/meilisearch-http/src/index/updates.rs b/meilisearch-http/src/index/updates.rs index 0762c8550..b9c772ee2 100644 --- a/meilisearch-http/src/index/updates.rs +++ b/meilisearch-http/src/index/updates.rs @@ -132,17 +132,30 @@ impl Index { content: Option, update_builder: UpdateBuilder, primary_key: Option<&str>, + ) -> anyhow::Result { + let mut txn = self.write_txn()?; + let result = self.update_documents_txn(&mut txn, format, method, content, update_builder, primary_key)?; + txn.commit()?; + Ok(result) + } + + pub fn update_documents_txn<'a, 'b>( + &'a self, + txn: &mut heed::RwTxn<'a, 'b>, + format: UpdateFormat, + method: IndexDocumentsMethod, + content: Option, + update_builder: UpdateBuilder, + primary_key: Option<&str>, ) -> anyhow::Result { info!("performing document addition"); - // We must use the write transaction of the update here. - let mut wtxn = self.write_txn()?; // Set the primary key if not set already, ignore if already set. - if let (None, Some(ref primary_key)) = (self.primary_key(&wtxn)?, primary_key) { - self.put_primary_key(&mut wtxn, primary_key)?; + if let (None, Some(ref primary_key)) = (self.primary_key(txn)?, primary_key) { + self.put_primary_key(txn, primary_key)?; } - let mut builder = update_builder.index_documents(&mut wtxn, self); + let mut builder = update_builder.index_documents(txn, self); builder.update_format(format); builder.index_documents_method(method); @@ -150,19 +163,15 @@ impl Index { |indexing_step, update_id| info!("update {}: {:?}", update_id, indexing_step); let gzipped = false; - let result = match content { - Some(content) if gzipped => builder.execute(GzDecoder::new(content), indexing_callback), - Some(content) => builder.execute(content, indexing_callback), - None => builder.execute(std::io::empty(), indexing_callback), + let addition = match content { + Some(content) if gzipped => builder.execute(GzDecoder::new(content), indexing_callback)?, + Some(content) => builder.execute(content, indexing_callback)?, + None => builder.execute(std::io::empty(), indexing_callback)?, }; - info!("document addition done: {:?}", result); + info!("document addition done: {:?}", addition); - result.and_then(|addition_result| { - wtxn.commit() - .and(Ok(UpdateResult::DocumentsAddition(addition_result))) - .map_err(Into::into) - }) + Ok(UpdateResult::DocumentsAddition(addition)) } pub fn clear_documents(&self, update_builder: UpdateBuilder) -> anyhow::Result { @@ -179,14 +188,14 @@ impl Index { } } - pub fn update_settings( - &self, + pub fn update_settings_txn<'a, 'b>( + &'a self, + txn: &mut heed::RwTxn<'a, 'b>, settings: &Settings, update_builder: UpdateBuilder, ) -> anyhow::Result { // We must use the write transaction of the update here. - let mut wtxn = self.write_txn()?; - let mut builder = update_builder.settings(&mut wtxn, self); + let mut builder = update_builder.settings(txn, self); if let Some(ref names) = settings.searchable_attributes { match names { @@ -228,16 +237,20 @@ impl Index { } } - let result = builder - .execute(|indexing_step, update_id| info!("update {}: {:?}", update_id, indexing_step)); + builder.execute(|indexing_step, update_id| info!("update {}: {:?}", update_id, indexing_step))?; - match result { - Ok(()) => wtxn - .commit() - .and(Ok(UpdateResult::Other)) - .map_err(Into::into), - Err(e) => Err(e), - } + Ok(UpdateResult::Other) + } + + pub fn update_settings( + &self, + settings: &Settings, + update_builder: UpdateBuilder, + ) -> anyhow::Result { + let mut txn = self.write_txn()?; + let result = self.update_settings_txn(&mut txn, settings, update_builder)?; + txn.commit()?; + Ok(result) } pub fn delete_documents( diff --git a/meilisearch-http/src/index_controller/dump_actor/v2.rs b/meilisearch-http/src/index_controller/dump_actor/v2.rs index 4f39f88bf..eeda78e8a 100644 --- a/meilisearch-http/src/index_controller/dump_actor/v2.rs +++ b/meilisearch-http/src/index_controller/dump_actor/v2.rs @@ -1,7 +1,7 @@ use heed::EnvOpenOptions; use log::info; use uuid::Uuid; -use crate::index_controller::{UpdateStatus, update_actor::UpdateStore}; +use crate::{index::Unchecked, index_controller::{UpdateStatus, update_actor::UpdateStore}}; use std::io::BufRead; use milli::{update::{IndexDocumentsMethod, UpdateBuilder, UpdateFormat}}; use crate::index::{Checked, Index}; @@ -13,9 +13,11 @@ fn import_settings(dir_path: &Path) -> anyhow::Result> { let path = dir_path.join("settings.json"); let file = File::open(path)?; let reader = std::io::BufReader::new(file); - let metadata = serde_json::from_reader(reader)?; + let metadata: Settings = serde_json::from_reader(reader)?; - Ok(metadata) + println!("Meta: {:?}", metadata); + + Ok(metadata.check()) } pub fn import_index(size: usize, uuid: Uuid, dump_path: &Path, db_path: &Path, primary_key: Option<&str>) -> anyhow::Result<()> { @@ -26,11 +28,13 @@ pub fn import_index(size: usize, uuid: Uuid, dump_path: &Path, db_path: &Path, p let index = milli::Index::new(options, index_path)?; let index = Index(Arc::new(index)); + let mut txn = index.write_txn()?; + info!("importing the settings..."); // extract `settings.json` file and import content let settings = import_settings(&dump_path)?; let update_builder = UpdateBuilder::new(0); - index.update_settings(&settings, update_builder)?; + index.update_settings_txn(&mut txn, &settings, update_builder)?; // import the documents in the index let update_builder = UpdateBuilder::new(1); @@ -41,13 +45,16 @@ pub fn import_index(size: usize, uuid: Uuid, dump_path: &Path, db_path: &Path, p // TODO: TAMO: currently we ignore any error caused by the importation of the documents because // if there is no documents nor primary key it'll throw an anyhow error, but we must remove // this before the merge on main - let _ = index.update_documents( + index.update_documents_txn( + &mut txn, UpdateFormat::JsonStream, IndexDocumentsMethod::ReplaceDocuments, Some(reader), update_builder, primary_key, - ); + )?; + + txn.commit()?; // the last step: we extract the original milli::Index and close it Arc::try_unwrap(index.0)