diff --git a/http-ui/src/main.rs b/http-ui/src/main.rs index 5c61d3e75..a29eb8895 100644 --- a/http-ui/src/main.rs +++ b/http-ui/src/main.rs @@ -322,7 +322,7 @@ async fn main() -> anyhow::Result<()> { // the type hint is necessary: https://github.com/rust-lang/rust/issues/32600 move |update_id, meta, content:&_| { // We prepare the update by using the update builder. - let mut update_builder = UpdateBuilder::new(); + let mut update_builder = UpdateBuilder::new(update_id); if let Some(max_nb_chunks) = indexer_opt_cloned.max_nb_chunks { update_builder.max_nb_chunks(max_nb_chunks); } @@ -363,7 +363,7 @@ async fn main() -> anyhow::Result<()> { otherwise => panic!("invalid encoding format {:?}", otherwise), }; - let result = builder.execute(reader, |indexing_step| { + let result = builder.execute(reader, |indexing_step, update_id| { let (current, total) = match indexing_step { TransformFromUserIntoGenericFormat { documents_seen } => (documents_seen, None), ComputeIdsAndMergeDocuments { documents_seen, total_documents } => (documents_seen, Some(total_documents)), @@ -382,7 +382,7 @@ async fn main() -> anyhow::Result<()> { }); match result { - Ok(()) => wtxn.commit().map_err(Into::into), + Ok(_) => wtxn.commit().map_err(Into::into), Err(e) => Err(e.into()) } }, @@ -430,7 +430,7 @@ async fn main() -> anyhow::Result<()> { } } - let result = builder.execute(|indexing_step| { + let result = builder.execute(|indexing_step, update_id| { let (current, total) = match indexing_step { TransformFromUserIntoGenericFormat { documents_seen } => (documents_seen, None), ComputeIdsAndMergeDocuments { documents_seen, total_documents } => (documents_seen, Some(total_documents)), diff --git a/src/search/facet/facet_condition.rs b/src/search/facet/facet_condition.rs index 578344d37..42c2327a9 100644 --- a/src/search/facet/facet_condition.rs +++ b/src/search/facet/facet_condition.rs @@ -625,9 +625,9 @@ mod tests { // Set the faceted fields to be the channel. let mut wtxn = index.write_txn().unwrap(); - let mut builder = Settings::new(&mut wtxn, &index); + let mut builder = Settings::new(&mut wtxn, &index, 0); builder.set_faceted_fields(hashmap!{ "channel".into() => "string".into() }); - builder.execute(|_| ()).unwrap(); + builder.execute(|_, _| ()).unwrap(); wtxn.commit().unwrap(); // Test that the facet condition is correctly generated. @@ -654,9 +654,9 @@ mod tests { // Set the faceted fields to be the channel. let mut wtxn = index.write_txn().unwrap(); - let mut builder = Settings::new(&mut wtxn, &index); + let mut builder = Settings::new(&mut wtxn, &index, 0); builder.set_faceted_fields(hashmap!{ "timestamp".into() => "integer".into() }); - builder.execute(|_| ()).unwrap(); + builder.execute(|_, _| ()).unwrap(); wtxn.commit().unwrap(); // Test that the facet condition is correctly generated. @@ -682,13 +682,13 @@ mod tests { // Set the faceted fields to be the channel. let mut wtxn = index.write_txn().unwrap(); - let mut builder = Settings::new(&mut wtxn, &index); + let mut builder = Settings::new(&mut wtxn, &index, 0); builder.set_searchable_fields(vec!["channel".into(), "timestamp".into()]); // to keep the fields order builder.set_faceted_fields(hashmap!{ "channel".into() => "string".into(), "timestamp".into() => "integer".into(), }); - builder.execute(|_| ()).unwrap(); + builder.execute(|_, _| ()).unwrap(); wtxn.commit().unwrap(); // Test that the facet condition is correctly generated. @@ -732,13 +732,13 @@ mod tests { // Set the faceted fields to be the channel. let mut wtxn = index.write_txn().unwrap(); - let mut builder = Settings::new(&mut wtxn, &index); + let mut builder = Settings::new(&mut wtxn, &index, 0); builder.set_searchable_fields(vec!["channel".into(), "timestamp".into()]); // to keep the fields order builder.set_faceted_fields(hashmap!{ "channel".into() => "string".into(), "timestamp".into() => "integer".into(), }); - builder.execute(|_| ()).unwrap(); + builder.execute(|_, _| ()).unwrap(); wtxn.commit().unwrap(); // Test that the facet condition is correctly generated. diff --git a/src/update/clear_documents.rs b/src/update/clear_documents.rs index ac359ba0d..a84596901 100644 --- a/src/update/clear_documents.rs +++ b/src/update/clear_documents.rs @@ -4,11 +4,17 @@ use crate::{ExternalDocumentsIds, Index}; pub struct ClearDocuments<'t, 'u, 'i> { wtxn: &'t mut heed::RwTxn<'i, 'u>, index: &'i Index, + _update_id: u64, } impl<'t, 'u, 'i> ClearDocuments<'t, 'u, 'i> { - pub fn new(wtxn: &'t mut heed::RwTxn<'i, 'u>, index: &'i Index) -> ClearDocuments<'t, 'u, 'i> { - ClearDocuments { wtxn, index } + pub fn new( + wtxn: &'t mut heed::RwTxn<'i, 'u>, + index: &'i Index, + update_id: u64 + ) -> ClearDocuments<'t, 'u, 'i> { + + ClearDocuments { wtxn, index, _update_id: update_id } } pub fn execute(self) -> anyhow::Result { diff --git a/src/update/delete_documents.rs b/src/update/delete_documents.rs index 6dc4b1cfa..2b67535c9 100644 --- a/src/update/delete_documents.rs +++ b/src/update/delete_documents.rs @@ -12,12 +12,14 @@ pub struct DeleteDocuments<'t, 'u, 'i> { index: &'i Index, external_documents_ids: ExternalDocumentsIds<'static>, documents_ids: RoaringBitmap, + update_id: u64, } impl<'t, 'u, 'i> DeleteDocuments<'t, 'u, 'i> { pub fn new( wtxn: &'t mut heed::RwTxn<'i, 'u>, index: &'i Index, + update_id: u64, ) -> anyhow::Result> { let external_documents_ids = index @@ -29,6 +31,7 @@ impl<'t, 'u, 'i> DeleteDocuments<'t, 'u, 'i> { index, external_documents_ids, documents_ids: RoaringBitmap::new(), + update_id, }) } @@ -64,7 +67,7 @@ impl<'t, 'u, 'i> DeleteDocuments<'t, 'u, 'i> { // We can execute a ClearDocuments operation when the number of documents // to delete is exactly the number of documents in the database. if current_documents_ids_len == self.documents_ids.len() { - return ClearDocuments::new(self.wtxn, self.index).execute(); + return ClearDocuments::new(self.wtxn, self.index, self.update_id).execute(); } let fields_ids_map = self.index.fields_ids_map(self.wtxn)?; diff --git a/src/update/facets.rs b/src/update/facets.rs index a268cbeaf..522a4d350 100644 --- a/src/update/facets.rs +++ b/src/update/facets.rs @@ -24,10 +24,15 @@ pub struct Facets<'t, 'u, 'i> { pub(crate) chunk_fusing_shrink_size: Option, level_group_size: NonZeroUsize, min_level_size: NonZeroUsize, + _update_id: u64, } impl<'t, 'u, 'i> Facets<'t, 'u, 'i> { - pub fn new(wtxn: &'t mut heed::RwTxn<'i, 'u>, index: &'i Index) -> Facets<'t, 'u, 'i> { + pub fn new( + wtxn: &'t mut heed::RwTxn<'i, 'u>, + index: &'i Index, + update_id: u64, + ) -> Facets<'t, 'u, 'i> { Facets { wtxn, index, @@ -36,6 +41,7 @@ impl<'t, 'u, 'i> Facets<'t, 'u, 'i> { chunk_fusing_shrink_size: None, level_group_size: NonZeroUsize::new(4).unwrap(), min_level_size: NonZeroUsize::new(5).unwrap(), + _update_id: update_id, } } diff --git a/src/update/index_documents/mod.rs b/src/update/index_documents/mod.rs index 90bc5ef3d..e38c640a0 100644 --- a/src/update/index_documents/mod.rs +++ b/src/update/index_documents/mod.rs @@ -12,8 +12,9 @@ use grenad::{Writer, Sorter, Merger, Reader, FileFuse, CompressionType}; use heed::types::ByteSlice; use log::{debug, info, error}; use memmap::Mmap; -use rayon::prelude::*; use rayon::ThreadPool; +use rayon::prelude::*; +use serde::{Serialize, Deserialize}; use crate::index::Index; use crate::update::{Facets, UpdateIndexingStep}; @@ -32,6 +33,11 @@ mod merge_function; mod store; mod transform; +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct DocumentAdditionResult { + nb_documents: usize, +} + #[derive(Debug, Copy, Clone)] pub enum WriteMethod { Append, @@ -175,7 +181,7 @@ pub fn write_into_lmdb_database( Ok(()) } -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] #[non_exhaustive] pub enum IndexDocumentsMethod { /// Replace the previous document with the new one, @@ -187,7 +193,7 @@ pub enum IndexDocumentsMethod { UpdateDocuments, } -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] #[non_exhaustive] pub enum UpdateFormat { /// The given update is a real **comma seperated** CSV with headers on the first line. @@ -214,10 +220,15 @@ pub struct IndexDocuments<'t, 'u, 'i, 'a> { update_method: IndexDocumentsMethod, update_format: UpdateFormat, autogenerate_docids: bool, + update_id: u64, } impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> { - pub fn new(wtxn: &'t mut heed::RwTxn<'i, 'u>, index: &'i Index) -> IndexDocuments<'t, 'u, 'i, 'a> { + pub fn new( + wtxn: &'t mut heed::RwTxn<'i, 'u>, + index: &'i Index, + update_id: u64, + ) -> IndexDocuments<'t, 'u, 'i, 'a> { IndexDocuments { wtxn, index, @@ -234,6 +245,7 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> { update_method: IndexDocumentsMethod::ReplaceDocuments, update_format: UpdateFormat::Json, autogenerate_docids: true, + update_id, } } @@ -253,12 +265,14 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> { self.autogenerate_docids = false; } - pub fn execute(self, reader: R, progress_callback: F) -> anyhow::Result<()> + pub fn execute(self, reader: R, progress_callback: F) -> anyhow::Result where R: io::Read, - F: Fn(UpdateIndexingStep) + Sync, + F: Fn(UpdateIndexingStep, u64) + Sync, { let before_transform = Instant::now(); + let update_id = self.update_id; + let progress_callback = |step| progress_callback(step, update_id); let transform = Transform { rtxn: &self.wtxn, @@ -279,9 +293,12 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> { UpdateFormat::JsonStream => transform.output_from_json_stream(reader, &progress_callback)?, }; + let nb_documents = output.documents_count; + info!("Update transformed in {:.02?}", before_transform.elapsed()); - self.execute_raw(output, progress_callback) + self.execute_raw(output, progress_callback)?; + Ok(DocumentAdditionResult { nb_documents }) } pub fn execute_raw(self, output: TransformOutput, progress_callback: F) -> anyhow::Result<()> @@ -312,6 +329,7 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> { chunk_compression_level: self.chunk_compression_level, chunk_fusing_shrink_size: self.chunk_fusing_shrink_size, thread_pool: self.thread_pool, + update_id: self.update_id, }; let mut deletion_builder = update_builder.delete_documents(self.wtxn, self.index)?; debug!("documents to delete {:?}", replaced_documents_ids); @@ -607,7 +625,7 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> { }); } - let mut builder = Facets::new(self.wtxn, self.index); + let mut builder = Facets::new(self.wtxn, self.index, self.update_id); builder.chunk_compression_type = self.chunk_compression_type; builder.chunk_compression_level = self.chunk_compression_level; builder.chunk_fusing_shrink_size = self.chunk_fusing_shrink_size; @@ -642,9 +660,9 @@ mod tests { // First we send 3 documents with ids from 1 to 3. let mut wtxn = index.write_txn().unwrap(); let content = &b"id,name\n1,kevin\n2,kevina\n3,benoit\n"[..]; - let mut builder = IndexDocuments::new(&mut wtxn, &index); + let mut builder = IndexDocuments::new(&mut wtxn, &index, 0); builder.update_format(UpdateFormat::Csv); - builder.execute(content, |_| ()).unwrap(); + builder.execute(content, |_, _| ()).unwrap(); wtxn.commit().unwrap(); // Check that there is 3 documents now. @@ -656,9 +674,9 @@ mod tests { // Second we send 1 document with id 1, to erase the previous ones. let mut wtxn = index.write_txn().unwrap(); let content = &b"id,name\n1,updated kevin\n"[..]; - let mut builder = IndexDocuments::new(&mut wtxn, &index); + let mut builder = IndexDocuments::new(&mut wtxn, &index, 1); builder.update_format(UpdateFormat::Csv); - builder.execute(content, |_| ()).unwrap(); + builder.execute(content, |_, _| ()).unwrap(); wtxn.commit().unwrap(); // Check that there is **always** 3 documents. @@ -670,9 +688,9 @@ mod tests { // Third we send 3 documents again to replace the existing ones. let mut wtxn = index.write_txn().unwrap(); let content = &b"id,name\n1,updated second kevin\n2,updated kevina\n3,updated benoit\n"[..]; - let mut builder = IndexDocuments::new(&mut wtxn, &index); + let mut builder = IndexDocuments::new(&mut wtxn, &index, 2); builder.update_format(UpdateFormat::Csv); - builder.execute(content, |_| ()).unwrap(); + builder.execute(content, |_, _| ()).unwrap(); wtxn.commit().unwrap(); // Check that there is **always** 3 documents. @@ -693,10 +711,10 @@ mod tests { // change the index method to merge documents. let mut wtxn = index.write_txn().unwrap(); let content = &b"id,name\n1,kevin\n1,kevina\n1,benoit\n"[..]; - let mut builder = IndexDocuments::new(&mut wtxn, &index); + let mut builder = IndexDocuments::new(&mut wtxn, &index, 0); builder.update_format(UpdateFormat::Csv); builder.index_documents_method(IndexDocumentsMethod::UpdateDocuments); - builder.execute(content, |_| ()).unwrap(); + builder.execute(content, |_, _| ()).unwrap(); wtxn.commit().unwrap(); // Check that there is only 1 document now. @@ -720,10 +738,10 @@ mod tests { // Second we send 1 document with id 1, to force it to be merged with the previous one. let mut wtxn = index.write_txn().unwrap(); let content = &b"id,age\n1,25\n"[..]; - let mut builder = IndexDocuments::new(&mut wtxn, &index); + let mut builder = IndexDocuments::new(&mut wtxn, &index, 1); builder.update_format(UpdateFormat::Csv); builder.index_documents_method(IndexDocumentsMethod::UpdateDocuments); - builder.execute(content, |_| ()).unwrap(); + builder.execute(content, |_, _| ()).unwrap(); wtxn.commit().unwrap(); // Check that there is **always** 1 document. @@ -756,10 +774,10 @@ mod tests { // First we send 3 documents with ids from 1 to 3. let mut wtxn = index.write_txn().unwrap(); let content = &b"name\nkevin\nkevina\nbenoit\n"[..]; - let mut builder = IndexDocuments::new(&mut wtxn, &index); + let mut builder = IndexDocuments::new(&mut wtxn, &index, 0); builder.disable_autogenerate_docids(); builder.update_format(UpdateFormat::Csv); - assert!(builder.execute(content, |_| ()).is_err()); + assert!(builder.execute(content, |_, _| ()).is_err()); wtxn.commit().unwrap(); // Check that there is no document. @@ -783,10 +801,10 @@ mod tests { { "name": "kevin" }, { "name": "benoit" } ]"#[..]; - let mut builder = IndexDocuments::new(&mut wtxn, &index); + let mut builder = IndexDocuments::new(&mut wtxn, &index, 0); builder.disable_autogenerate_docids(); builder.update_format(UpdateFormat::Json); - assert!(builder.execute(content, |_| ()).is_err()); + assert!(builder.execute(content, |_, _| ()).is_err()); wtxn.commit().unwrap(); // Check that there is no document. @@ -806,9 +824,9 @@ mod tests { // First we send 3 documents with ids from 1 to 3. let mut wtxn = index.write_txn().unwrap(); let content = &b"name\nkevin\nkevina\nbenoit\n"[..]; - let mut builder = IndexDocuments::new(&mut wtxn, &index); + let mut builder = IndexDocuments::new(&mut wtxn, &index, 0); builder.update_format(UpdateFormat::Csv); - builder.execute(content, |_| ()).unwrap(); + builder.execute(content, |_, _| ()).unwrap(); wtxn.commit().unwrap(); // Check that there is 3 documents now. @@ -824,9 +842,9 @@ mod tests { // Second we send 1 document with the generated uuid, to erase the previous ones. let mut wtxn = index.write_txn().unwrap(); let content = format!("id,name\n{},updated kevin", kevin_uuid); - let mut builder = IndexDocuments::new(&mut wtxn, &index); + let mut builder = IndexDocuments::new(&mut wtxn, &index, 1); builder.update_format(UpdateFormat::Csv); - builder.execute(content.as_bytes(), |_| ()).unwrap(); + builder.execute(content.as_bytes(), |_, _| ()).unwrap(); wtxn.commit().unwrap(); // Check that there is **always** 3 documents. @@ -859,9 +877,9 @@ mod tests { // First we send 3 documents with ids from 1 to 3. let mut wtxn = index.write_txn().unwrap(); let content = &b"id,name\n1,kevin\n2,kevina\n3,benoit\n"[..]; - let mut builder = IndexDocuments::new(&mut wtxn, &index); + let mut builder = IndexDocuments::new(&mut wtxn, &index, 0); builder.update_format(UpdateFormat::Csv); - builder.execute(content, |_| ()).unwrap(); + builder.execute(content, |_, _| ()).unwrap(); wtxn.commit().unwrap(); // Check that there is 3 documents now. @@ -873,9 +891,9 @@ mod tests { // Second we send 1 document without specifying the id. let mut wtxn = index.write_txn().unwrap(); let content = &b"name\nnew kevin"[..]; - let mut builder = IndexDocuments::new(&mut wtxn, &index); + let mut builder = IndexDocuments::new(&mut wtxn, &index, 1); builder.update_format(UpdateFormat::Csv); - builder.execute(content, |_| ()).unwrap(); + builder.execute(content, |_, _| ()).unwrap(); wtxn.commit().unwrap(); // Check that there is 4 documents now. @@ -895,9 +913,9 @@ mod tests { // First we send 0 documents and only headers. let mut wtxn = index.write_txn().unwrap(); let content = &b"id,name\n"[..]; - let mut builder = IndexDocuments::new(&mut wtxn, &index); + let mut builder = IndexDocuments::new(&mut wtxn, &index, 0); builder.update_format(UpdateFormat::Csv); - builder.execute(content, |_| ()).unwrap(); + builder.execute(content, |_, _| ()).unwrap(); wtxn.commit().unwrap(); // Check that there is no documents. @@ -921,9 +939,9 @@ mod tests { { "name": "kevina", "id": 21 }, { "name": "benoit" } ]"#[..]; - let mut builder = IndexDocuments::new(&mut wtxn, &index); + let mut builder = IndexDocuments::new(&mut wtxn, &index, 0); builder.update_format(UpdateFormat::Json); - builder.execute(content, |_| ()).unwrap(); + builder.execute(content, |_, _| ()).unwrap(); wtxn.commit().unwrap(); // Check that there is 3 documents now. @@ -943,9 +961,9 @@ mod tests { // First we send 0 documents. let mut wtxn = index.write_txn().unwrap(); let content = &b"[]"[..]; - let mut builder = IndexDocuments::new(&mut wtxn, &index); + let mut builder = IndexDocuments::new(&mut wtxn, &index, 0); builder.update_format(UpdateFormat::Json); - builder.execute(content, |_| ()).unwrap(); + builder.execute(content, |_, _| ()).unwrap(); wtxn.commit().unwrap(); // Check that there is no documents. @@ -969,9 +987,9 @@ mod tests { { "name": "kevina", "id": 21 } { "name": "benoit" } "#[..]; - let mut builder = IndexDocuments::new(&mut wtxn, &index); + let mut builder = IndexDocuments::new(&mut wtxn, &index, 0); builder.update_format(UpdateFormat::JsonStream); - builder.execute(content, |_| ()).unwrap(); + builder.execute(content, |_, _| ()).unwrap(); wtxn.commit().unwrap(); // Check that there is 3 documents now. @@ -992,18 +1010,18 @@ mod tests { let mut wtxn = index.write_txn().unwrap(); // There is a space in the document id. let content = &b"id,name\nbrume bleue,kevin\n"[..]; - let mut builder = IndexDocuments::new(&mut wtxn, &index); + let mut builder = IndexDocuments::new(&mut wtxn, &index, 0); builder.update_format(UpdateFormat::Csv); - assert!(builder.execute(content, |_| ()).is_err()); + assert!(builder.execute(content, |_, _| ()).is_err()); wtxn.commit().unwrap(); // First we send 1 document with a valid id. let mut wtxn = index.write_txn().unwrap(); // There is a space in the document id. let content = &b"id,name\n32,kevin\n"[..]; - let mut builder = IndexDocuments::new(&mut wtxn, &index); + let mut builder = IndexDocuments::new(&mut wtxn, &index, 1); builder.update_format(UpdateFormat::Csv); - builder.execute(content, |_| ()).unwrap(); + builder.execute(content, |_, _| ()).unwrap(); wtxn.commit().unwrap(); // Check that there is 1 document now. @@ -1027,9 +1045,9 @@ mod tests { { "id": 1, "name": "kevina", "array": ["I", "am", "fine"] }, { "id": 2, "name": "benoit", "array_of_object": [{ "wow": "amazing" }] } ]"#[..]; - let mut builder = IndexDocuments::new(&mut wtxn, &index); + let mut builder = IndexDocuments::new(&mut wtxn, &index, 0); builder.update_format(UpdateFormat::Json); - builder.execute(content, |_| ()).unwrap(); + builder.execute(content, |_, _| ()).unwrap(); wtxn.commit().unwrap(); // Check that there is 1 documents now. diff --git a/src/update/mod.rs b/src/update/mod.rs index 407d9f498..2cd532c83 100644 --- a/src/update/mod.rs +++ b/src/update/mod.rs @@ -10,7 +10,7 @@ mod update_step; pub use self::available_documents_ids::AvailableDocumentsIds; pub use self::clear_documents::ClearDocuments; pub use self::delete_documents::DeleteDocuments; -pub use self::index_documents::{IndexDocuments, IndexDocumentsMethod, UpdateFormat}; +pub use self::index_documents::{IndexDocuments, IndexDocumentsMethod, UpdateFormat, DocumentAdditionResult}; pub use self::facets::Facets; pub use self::settings::Settings; pub use self::update_builder::UpdateBuilder; diff --git a/src/update/settings.rs b/src/update/settings.rs index dbe951761..17a9da1eb 100644 --- a/src/update/settings.rs +++ b/src/update/settings.rs @@ -23,6 +23,7 @@ pub struct Settings<'a, 't, 'u, 'i> { pub(crate) chunk_compression_level: Option, pub(crate) chunk_fusing_shrink_size: Option, pub(crate) thread_pool: Option<&'a ThreadPool>, + update_id: u64, // If a struct field is set to `None` it means that it hasn't been set by the user, // however if it is `Some(None)` it means that the user forced a reset of the setting. @@ -33,7 +34,11 @@ pub struct Settings<'a, 't, 'u, 'i> { } impl<'a, 't, 'u, 'i> Settings<'a, 't, 'u, 'i> { - pub fn new(wtxn: &'t mut heed::RwTxn<'i, 'u>, index: &'i Index) -> Settings<'a, 't, 'u, 'i> { + pub fn new( + wtxn: &'t mut heed::RwTxn<'i, 'u>, + index: &'i Index, + update_id: u64, + ) -> Settings<'a, 't, 'u, 'i> { Settings { wtxn, index, @@ -49,6 +54,7 @@ impl<'a, 't, 'u, 'i> Settings<'a, 't, 'u, 'i> { displayed_fields: None, faceted_fields: None, criteria: None, + update_id, } } @@ -86,9 +92,11 @@ impl<'a, 't, 'u, 'i> Settings<'a, 't, 'u, 'i> { fn reindex(&mut self, cb: &F, old_fields_ids_map: FieldsIdsMap) -> anyhow::Result<()> where - F: Fn(UpdateIndexingStep) + Sync, + F: Fn(UpdateIndexingStep, u64) + Sync { let fields_ids_map = self.index.fields_ids_map(self.wtxn)?; + let update_id = self.update_id; + let cb = |step| cb(step, update_id); // if the settings are set before any document update, we don't need to do anything, and // will set the primary key during the first document addition. if self.index.number_of_documents(&self.wtxn)? == 0 { @@ -118,11 +126,11 @@ impl<'a, 't, 'u, 'i> Settings<'a, 't, 'u, 'i> { fields_ids_map.clone())?; // We clear the full database (words-fst, documents ids and documents content). - ClearDocuments::new(self.wtxn, self.index).execute()?; + ClearDocuments::new(self.wtxn, self.index, self.update_id).execute()?; // We index the generated `TransformOutput` which must contain // all the documents with fields in the newly defined searchable order. - let mut indexing_builder = IndexDocuments::new(self.wtxn, self.index); + let mut indexing_builder = IndexDocuments::new(self.wtxn, self.index, self.update_id); indexing_builder.log_every_n = self.log_every_n; indexing_builder.max_nb_chunks = self.max_nb_chunks; indexing_builder.max_memory = self.max_memory; @@ -239,7 +247,7 @@ impl<'a, 't, 'u, 'i> Settings<'a, 't, 'u, 'i> { pub fn execute(mut self, progress_callback: F) -> anyhow::Result<()> where - F: Fn(UpdateIndexingStep) + Sync + F: Fn(UpdateIndexingStep, u64) + Sync { let old_fields_ids_map = self.index.fields_ids_map(&self.wtxn)?; self.update_displayed()?; @@ -276,16 +284,16 @@ mod tests { // First we send 3 documents with ids from 1 to 3. let mut wtxn = index.write_txn().unwrap(); let content = &b"name,age\nkevin,23\nkevina,21\nbenoit,34\n"[..]; - let mut builder = IndexDocuments::new(&mut wtxn, &index); + let mut builder = IndexDocuments::new(&mut wtxn, &index, 0); builder.update_format(UpdateFormat::Csv); - builder.execute(content, |_| ()).unwrap(); + builder.execute(content, |_, _| ()).unwrap(); wtxn.commit().unwrap(); // We change the searchable fields to be the "name" field only. let mut wtxn = index.write_txn().unwrap(); - let mut builder = Settings::new(&mut wtxn, &index); + let mut builder = Settings::new(&mut wtxn, &index, 1); builder.set_searchable_fields(vec!["name".into()]); - builder.execute(|_| ()).unwrap(); + builder.execute(|_, _| ()).unwrap(); wtxn.commit().unwrap(); // Check that the searchable field is correctly set to "name" only. @@ -305,9 +313,9 @@ mod tests { // We change the searchable fields to be the "name" field only. let mut wtxn = index.write_txn().unwrap(); - let mut builder = Settings::new(&mut wtxn, &index); + let mut builder = Settings::new(&mut wtxn, &index, 2); builder.reset_searchable_fields(); - builder.execute(|_| ()).unwrap(); + builder.execute(|_, _| ()).unwrap(); wtxn.commit().unwrap(); // Check that the searchable field have been reset and documents are found now. @@ -331,18 +339,18 @@ mod tests { // First we send 3 documents with ids from 1 to 3. let mut wtxn = index.write_txn().unwrap(); let content = &b"name,age\nkevin,23\nkevina,21\nbenoit,34\n"[..]; - let mut builder = IndexDocuments::new(&mut wtxn, &index); + let mut builder = IndexDocuments::new(&mut wtxn, &index, 0); builder.update_format(UpdateFormat::Csv); - builder.execute(content, |_| ()).unwrap(); + builder.execute(content, |_, _| ()).unwrap(); wtxn.commit().unwrap(); // In the same transaction we change the displayed fields to be only the "age". // We also change the searchable fields to be the "name" field only. let mut wtxn = index.write_txn().unwrap(); - let mut builder = Settings::new(&mut wtxn, &index); + let mut builder = Settings::new(&mut wtxn, &index, 1); builder.set_displayed_fields(vec!["age".into()]); builder.set_searchable_fields(vec!["name".into()]); - builder.execute(|_| ()).unwrap(); + builder.execute(|_, _| ()).unwrap(); wtxn.commit().unwrap(); // Check that the displayed fields are correctly set to `None` (default value). @@ -353,9 +361,9 @@ mod tests { // We change the searchable fields to be the "name" field only. let mut wtxn = index.write_txn().unwrap(); - let mut builder = Settings::new(&mut wtxn, &index); + let mut builder = Settings::new(&mut wtxn, &index, 2); builder.reset_searchable_fields(); - builder.execute(|_| ()).unwrap(); + builder.execute(|_, _| ()).unwrap(); wtxn.commit().unwrap(); // Check that the displayed fields always contains only the "age" field. @@ -375,9 +383,9 @@ mod tests { // First we send 3 documents with ids from 1 to 3. let mut wtxn = index.write_txn().unwrap(); let content = &b"name,age\nkevin,23\nkevina,21\nbenoit,34\n"[..]; - let mut builder = IndexDocuments::new(&mut wtxn, &index); + let mut builder = IndexDocuments::new(&mut wtxn, &index, 0); builder.update_format(UpdateFormat::Csv); - builder.execute(content, |_| ()).unwrap(); + builder.execute(content, |_, _| ()).unwrap(); wtxn.commit().unwrap(); // Check that the displayed fields are correctly set to `None` (default value). @@ -397,14 +405,14 @@ mod tests { // First we send 3 documents with ids from 1 to 3. let mut wtxn = index.write_txn().unwrap(); let content = &b"name,age\nkevin,23\nkevina,21\nbenoit,34\n"[..]; - let mut builder = IndexDocuments::new(&mut wtxn, &index); + let mut builder = IndexDocuments::new(&mut wtxn, &index, 0); builder.update_format(UpdateFormat::Csv); - builder.execute(content, |_| ()).unwrap(); + builder.execute(content, |_, _| ()).unwrap(); // In the same transaction we change the displayed fields to be only the age. - let mut builder = Settings::new(&mut wtxn, &index); + let mut builder = Settings::new(&mut wtxn, &index, 0); builder.set_displayed_fields(vec!["age".into()]); - builder.execute(|_| ()).unwrap(); + builder.execute(|_, _| ()).unwrap(); wtxn.commit().unwrap(); // Check that the displayed fields are correctly set to only the "age" field. @@ -415,9 +423,9 @@ mod tests { // We reset the fields ids to become `None`, the default value. let mut wtxn = index.write_txn().unwrap(); - let mut builder = Settings::new(&mut wtxn, &index); + let mut builder = Settings::new(&mut wtxn, &index, 0); builder.reset_displayed_fields(); - builder.execute(|_| ()).unwrap(); + builder.execute(|_, _| ()).unwrap(); wtxn.commit().unwrap(); // Check that the displayed fields are correctly set to `None` (default value). @@ -436,15 +444,15 @@ mod tests { // Set the faceted fields to be the age. let mut wtxn = index.write_txn().unwrap(); - let mut builder = Settings::new(&mut wtxn, &index); + let mut builder = Settings::new(&mut wtxn, &index, 0); builder.set_faceted_fields(hashmap!{ "age".into() => "integer".into() }); - builder.execute(|_| ()).unwrap(); + builder.execute(|_, _| ()).unwrap(); // Then index some documents. let content = &b"name,age\nkevin,23\nkevina,21\nbenoit,34\n"[..]; - let mut builder = IndexDocuments::new(&mut wtxn, &index); + let mut builder = IndexDocuments::new(&mut wtxn, &index, 1); builder.update_format(UpdateFormat::Csv); - builder.execute(content, |_| ()).unwrap(); + builder.execute(content, |_, _| ()).unwrap(); wtxn.commit().unwrap(); // Check that the displayed fields are correctly set. @@ -459,9 +467,9 @@ mod tests { // Index a little more documents with new and current facets values. let mut wtxn = index.write_txn().unwrap(); let content = &b"name,age\nkevin2,23\nkevina2,21\nbenoit2,35\n"[..]; - let mut builder = IndexDocuments::new(&mut wtxn, &index); + let mut builder = IndexDocuments::new(&mut wtxn, &index, 2); builder.update_format(UpdateFormat::Csv); - builder.execute(content, |_| ()).unwrap(); + builder.execute(content, |_, _| ()).unwrap(); wtxn.commit().unwrap(); let rtxn = index.read_txn().unwrap(); @@ -480,14 +488,14 @@ mod tests { // Set all the settings except searchable let mut wtxn = index.write_txn().unwrap(); - let mut builder = Settings::new(&mut wtxn, &index); + let mut builder = Settings::new(&mut wtxn, &index, 0); builder.set_displayed_fields(vec!["hello".to_string()]); builder.set_faceted_fields(hashmap!{ "age".into() => "integer".into(), "toto".into() => "integer".into(), }); builder.set_criteria(vec!["asc(toto)".to_string()]); - builder.execute(|_| ()).unwrap(); + builder.execute(|_, _| ()).unwrap(); wtxn.commit().unwrap(); // check the output @@ -500,9 +508,9 @@ mod tests { // We set toto and age as searchable to force reordering of the fields let mut wtxn = index.write_txn().unwrap(); - let mut builder = Settings::new(&mut wtxn, &index); + let mut builder = Settings::new(&mut wtxn, &index, 1); builder.set_searchable_fields(vec!["toto".to_string(), "age".to_string()]); - builder.execute(|_| ()).unwrap(); + builder.execute(|_, _| ()).unwrap(); wtxn.commit().unwrap(); let rtxn = index.read_txn().unwrap(); diff --git a/src/update/update_builder.rs b/src/update/update_builder.rs index 43e3c28ed..8d6eb034d 100644 --- a/src/update/update_builder.rs +++ b/src/update/update_builder.rs @@ -13,10 +13,11 @@ pub struct UpdateBuilder<'a> { pub(crate) chunk_compression_level: Option, pub(crate) chunk_fusing_shrink_size: Option, pub(crate) thread_pool: Option<&'a ThreadPool>, + pub(crate) update_id: u64, } impl<'a> UpdateBuilder<'a> { - pub fn new() -> UpdateBuilder<'a> { + pub fn new(update_id: u64) -> UpdateBuilder<'a> { UpdateBuilder { log_every_n: None, max_nb_chunks: None, @@ -26,6 +27,7 @@ impl<'a> UpdateBuilder<'a> { chunk_compression_level: None, chunk_fusing_shrink_size: None, thread_pool: None, + update_id, } } @@ -67,7 +69,7 @@ impl<'a> UpdateBuilder<'a> { index: &'i Index, ) -> ClearDocuments<'t, 'u, 'i> { - ClearDocuments::new(wtxn, index) + ClearDocuments::new(wtxn, index, self.update_id) } pub fn delete_documents<'t, 'u, 'i>( @@ -76,7 +78,7 @@ impl<'a> UpdateBuilder<'a> { index: &'i Index, ) -> anyhow::Result> { - DeleteDocuments::new(wtxn, index) + DeleteDocuments::new(wtxn, index, self.update_id) } pub fn index_documents<'t, 'u, 'i>( @@ -85,7 +87,7 @@ impl<'a> UpdateBuilder<'a> { index: &'i Index, ) -> IndexDocuments<'t, 'u, 'i, 'a> { - let mut builder = IndexDocuments::new(wtxn, index); + let mut builder = IndexDocuments::new(wtxn, index, self.update_id); builder.log_every_n = self.log_every_n; builder.max_nb_chunks = self.max_nb_chunks; @@ -105,7 +107,7 @@ impl<'a> UpdateBuilder<'a> { index: &'i Index, ) -> Settings<'a, 't, 'u, 'i> { - let mut builder = Settings::new(wtxn, index); + let mut builder = Settings::new(wtxn, index, self.update_id); builder.log_every_n = self.log_every_n; builder.max_nb_chunks = self.max_nb_chunks; @@ -125,7 +127,7 @@ impl<'a> UpdateBuilder<'a> { index: &'i Index, ) -> Facets<'t, 'u, 'i> { - let mut builder = Facets::new(wtxn, index); + let mut builder = Facets::new(wtxn, index, self.update_id); builder.chunk_compression_type = self.chunk_compression_type; builder.chunk_compression_level = self.chunk_compression_level; @@ -134,9 +136,3 @@ impl<'a> UpdateBuilder<'a> { builder } } - -impl Default for UpdateBuilder<'_> { - fn default() -> Self { - Self::new() - } -}