atomic index dump load

This commit is contained in:
Marin Postma 2021-05-12 16:21:37 +02:00
parent 6d837e3e07
commit 295f496e8a
No known key found for this signature in database
GPG Key ID: D5241F0C0C865F30
2 changed files with 54 additions and 34 deletions

View File

@ -133,16 +133,29 @@ impl Index {
update_builder: UpdateBuilder, update_builder: UpdateBuilder,
primary_key: Option<&str>, primary_key: Option<&str>,
) -> anyhow::Result<UpdateResult> { ) -> anyhow::Result<UpdateResult> {
info!("performing document addition"); let mut txn = self.write_txn()?;
// We must use the write transaction of the update here. let result = self.update_documents_txn(&mut txn, format, method, content, update_builder, primary_key)?;
let mut wtxn = self.write_txn()?; txn.commit()?;
Ok(result)
// Set the primary key if not set already, ignore if already set.
if let (None, Some(ref primary_key)) = (self.primary_key(&wtxn)?, primary_key) {
self.put_primary_key(&mut wtxn, primary_key)?;
} }
let mut builder = update_builder.index_documents(&mut wtxn, self); pub fn update_documents_txn<'a, 'b>(
&'a self,
txn: &mut heed::RwTxn<'a, 'b>,
format: UpdateFormat,
method: IndexDocumentsMethod,
content: Option<impl io::Read>,
update_builder: UpdateBuilder,
primary_key: Option<&str>,
) -> anyhow::Result<UpdateResult> {
info!("performing document addition");
// Set the primary key if not set already, ignore if already set.
if let (None, Some(ref primary_key)) = (self.primary_key(txn)?, primary_key) {
self.put_primary_key(txn, primary_key)?;
}
let mut builder = update_builder.index_documents(txn, self);
builder.update_format(format); builder.update_format(format);
builder.index_documents_method(method); builder.index_documents_method(method);
@ -150,19 +163,15 @@ impl Index {
|indexing_step, update_id| info!("update {}: {:?}", update_id, indexing_step); |indexing_step, update_id| info!("update {}: {:?}", update_id, indexing_step);
let gzipped = false; let gzipped = false;
let result = match content { let addition = match content {
Some(content) if gzipped => builder.execute(GzDecoder::new(content), indexing_callback), Some(content) if gzipped => builder.execute(GzDecoder::new(content), indexing_callback)?,
Some(content) => builder.execute(content, indexing_callback), Some(content) => builder.execute(content, indexing_callback)?,
None => builder.execute(std::io::empty(), indexing_callback), None => builder.execute(std::io::empty(), indexing_callback)?,
}; };
info!("document addition done: {:?}", result); info!("document addition done: {:?}", addition);
result.and_then(|addition_result| { Ok(UpdateResult::DocumentsAddition(addition))
wtxn.commit()
.and(Ok(UpdateResult::DocumentsAddition(addition_result)))
.map_err(Into::into)
})
} }
pub fn clear_documents(&self, update_builder: UpdateBuilder) -> anyhow::Result<UpdateResult> { pub fn clear_documents(&self, update_builder: UpdateBuilder) -> anyhow::Result<UpdateResult> {
@ -179,14 +188,14 @@ impl Index {
} }
} }
pub fn update_settings( pub fn update_settings_txn<'a, 'b>(
&self, &'a self,
txn: &mut heed::RwTxn<'a, 'b>,
settings: &Settings<Checked>, settings: &Settings<Checked>,
update_builder: UpdateBuilder, update_builder: UpdateBuilder,
) -> anyhow::Result<UpdateResult> { ) -> anyhow::Result<UpdateResult> {
// We must use the write transaction of the update here. // We must use the write transaction of the update here.
let mut wtxn = self.write_txn()?; let mut builder = update_builder.settings(txn, self);
let mut builder = update_builder.settings(&mut wtxn, self);
if let Some(ref names) = settings.searchable_attributes { if let Some(ref names) = settings.searchable_attributes {
match names { match names {
@ -228,16 +237,20 @@ impl Index {
} }
} }
let result = builder builder.execute(|indexing_step, update_id| info!("update {}: {:?}", update_id, indexing_step))?;
.execute(|indexing_step, update_id| info!("update {}: {:?}", update_id, indexing_step));
match result { Ok(UpdateResult::Other)
Ok(()) => wtxn
.commit()
.and(Ok(UpdateResult::Other))
.map_err(Into::into),
Err(e) => Err(e),
} }
pub fn update_settings(
&self,
settings: &Settings<Checked>,
update_builder: UpdateBuilder,
) -> anyhow::Result<UpdateResult> {
let mut txn = self.write_txn()?;
let result = self.update_settings_txn(&mut txn, settings, update_builder)?;
txn.commit()?;
Ok(result)
} }
pub fn delete_documents( pub fn delete_documents(

View File

@ -1,7 +1,7 @@
use heed::EnvOpenOptions; use heed::EnvOpenOptions;
use log::info; use log::info;
use uuid::Uuid; use uuid::Uuid;
use crate::index_controller::{UpdateStatus, update_actor::UpdateStore}; use crate::{index::Unchecked, index_controller::{UpdateStatus, update_actor::UpdateStore}};
use std::io::BufRead; use std::io::BufRead;
use milli::{update::{IndexDocumentsMethod, UpdateBuilder, UpdateFormat}}; use milli::{update::{IndexDocumentsMethod, UpdateBuilder, UpdateFormat}};
use crate::index::{Checked, Index}; use crate::index::{Checked, Index};
@ -13,9 +13,11 @@ fn import_settings(dir_path: &Path) -> anyhow::Result<Settings<Checked>> {
let path = dir_path.join("settings.json"); let path = dir_path.join("settings.json");
let file = File::open(path)?; let file = File::open(path)?;
let reader = std::io::BufReader::new(file); let reader = std::io::BufReader::new(file);
let metadata = serde_json::from_reader(reader)?; let metadata: Settings<Unchecked> = serde_json::from_reader(reader)?;
Ok(metadata) println!("Meta: {:?}", metadata);
Ok(metadata.check())
} }
pub fn import_index(size: usize, uuid: Uuid, dump_path: &Path, db_path: &Path, primary_key: Option<&str>) -> anyhow::Result<()> { pub fn import_index(size: usize, uuid: Uuid, dump_path: &Path, db_path: &Path, primary_key: Option<&str>) -> anyhow::Result<()> {
@ -26,11 +28,13 @@ pub fn import_index(size: usize, uuid: Uuid, dump_path: &Path, db_path: &Path, p
let index = milli::Index::new(options, index_path)?; let index = milli::Index::new(options, index_path)?;
let index = Index(Arc::new(index)); let index = Index(Arc::new(index));
let mut txn = index.write_txn()?;
info!("importing the settings..."); info!("importing the settings...");
// extract `settings.json` file and import content // extract `settings.json` file and import content
let settings = import_settings(&dump_path)?; let settings = import_settings(&dump_path)?;
let update_builder = UpdateBuilder::new(0); let update_builder = UpdateBuilder::new(0);
index.update_settings(&settings, update_builder)?; index.update_settings_txn(&mut txn, &settings, update_builder)?;
// import the documents in the index // import the documents in the index
let update_builder = UpdateBuilder::new(1); let update_builder = UpdateBuilder::new(1);
@ -41,13 +45,16 @@ pub fn import_index(size: usize, uuid: Uuid, dump_path: &Path, db_path: &Path, p
// TODO: TAMO: currently we ignore any error caused by the importation of the documents because // TODO: TAMO: currently we ignore any error caused by the importation of the documents because
// if there is no documents nor primary key it'll throw an anyhow error, but we must remove // if there is no documents nor primary key it'll throw an anyhow error, but we must remove
// this before the merge on main // this before the merge on main
let _ = index.update_documents( index.update_documents_txn(
&mut txn,
UpdateFormat::JsonStream, UpdateFormat::JsonStream,
IndexDocumentsMethod::ReplaceDocuments, IndexDocumentsMethod::ReplaceDocuments,
Some(reader), Some(reader),
update_builder, update_builder,
primary_key, primary_key,
); )?;
txn.commit()?;
// the last step: we extract the original milli::Index and close it // the last step: we extract the original milli::Index and close it
Arc::try_unwrap(index.0) Arc::try_unwrap(index.0)