From 91d8198d17e6801a24a7b18a24c16651001aec7e Mon Sep 17 00:00:00 2001 From: mpostma Date: Wed, 30 Dec 2020 18:43:50 +0100 Subject: [PATCH 1/4] return documents number on addition --- src/update/index_documents/mod.rs | 15 ++++++++++++--- src/update/mod.rs | 2 +- 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/src/update/index_documents/mod.rs b/src/update/index_documents/mod.rs index 90bc5ef3d..2c5d34092 100644 --- a/src/update/index_documents/mod.rs +++ b/src/update/index_documents/mod.rs @@ -12,8 +12,9 @@ use grenad::{Writer, Sorter, Merger, Reader, FileFuse, CompressionType}; use heed::types::ByteSlice; use log::{debug, info, error}; use memmap::Mmap; -use rayon::prelude::*; use rayon::ThreadPool; +use rayon::prelude::*; +use serde::{Serialize, Deserialize}; use crate::index::Index; use crate::update::{Facets, UpdateIndexingStep}; @@ -32,6 +33,11 @@ mod merge_function; mod store; mod transform; +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct DocumentAdditionResult { + nb_documents: usize, +} + #[derive(Debug, Copy, Clone)] pub enum WriteMethod { Append, @@ -253,7 +259,7 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> { self.autogenerate_docids = false; } - pub fn execute(self, reader: R, progress_callback: F) -> anyhow::Result<()> + pub fn execute(self, reader: R, progress_callback: F) -> anyhow::Result where R: io::Read, F: Fn(UpdateIndexingStep) + Sync, @@ -279,9 +285,12 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> { UpdateFormat::JsonStream => transform.output_from_json_stream(reader, &progress_callback)?, }; + let nb_documents = output.documents_count; + info!("Update transformed in {:.02?}", before_transform.elapsed()); - self.execute_raw(output, progress_callback) + self.execute_raw(output, progress_callback)?; + Ok(DocumentAdditionResult { nb_documents }) } pub fn execute_raw(self, output: TransformOutput, progress_callback: F) -> anyhow::Result<()> diff --git a/src/update/mod.rs b/src/update/mod.rs index 
407d9f498..2cd532c83 100644 --- a/src/update/mod.rs +++ b/src/update/mod.rs @@ -10,7 +10,7 @@ mod update_step; pub use self::available_documents_ids::AvailableDocumentsIds; pub use self::clear_documents::ClearDocuments; pub use self::delete_documents::DeleteDocuments; -pub use self::index_documents::{IndexDocuments, IndexDocumentsMethod, UpdateFormat}; +pub use self::index_documents::{IndexDocuments, IndexDocumentsMethod, UpdateFormat, DocumentAdditionResult}; pub use self::facets::Facets; pub use self::settings::Settings; pub use self::update_builder::UpdateBuilder; From d487791b0312d965ac822d5868d5274e88ba7368 Mon Sep 17 00:00:00 2001 From: mpostma Date: Tue, 22 Dec 2020 18:17:35 +0100 Subject: [PATCH 2/4] derive serde for method and format This is nicer when working with UpdateMeta struct --- src/update/index_documents/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/update/index_documents/mod.rs b/src/update/index_documents/mod.rs index 2c5d34092..7fab93343 100644 --- a/src/update/index_documents/mod.rs +++ b/src/update/index_documents/mod.rs @@ -181,7 +181,7 @@ pub fn write_into_lmdb_database( Ok(()) } -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] #[non_exhaustive] pub enum IndexDocumentsMethod { /// Replace the previous document with the new one, @@ -193,7 +193,7 @@ pub enum IndexDocumentsMethod { UpdateDocuments, } -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] #[non_exhaustive] pub enum UpdateFormat { /// The given update is a real **comma seperated** CSV with headers on the first line. From 3b604326874691518a2165f9e3d18dd790d6d982 Mon Sep 17 00:00:00 2001 From: mpostma Date: Tue, 22 Dec 2020 16:21:07 +0100 Subject: [PATCH 3/4] Use update_id in UpdateBuilder Add the `update_id` to the updates. 
The rationale is the following: - It allows for better traceability of the update events, thus improved debugging and logging. - The engine is now aware of what it has already processed, and can return it if asked. It may not make sense now, but in the future, the update store may not work the same way, and this information about the state of the engine will be desirable (distributed environment). --- http-ui/src/main.rs | 6 +- src/search/facet/facet_condition.rs | 12 ++-- src/update/clear_documents.rs | 10 +++- src/update/delete_documents.rs | 5 +- src/update/facets.rs | 8 ++- src/update/index_documents/mod.rs | 87 ++++++++++++++++------------- src/update/settings.rs | 78 ++++++++++++++------------ src/update/update_builder.rs | 20 +++---- 8 files changed, 127 insertions(+), 99 deletions(-) diff --git a/http-ui/src/main.rs b/http-ui/src/main.rs index 5c61d3e75..53c2cb460 100644 --- a/http-ui/src/main.rs +++ b/http-ui/src/main.rs @@ -322,7 +322,7 @@ async fn main() -> anyhow::Result<()> { // the type hint is necessary: https://github.com/rust-lang/rust/issues/32600 move |update_id, meta, content:&_| { // We prepare the update by using the update builder. 
- let mut update_builder = UpdateBuilder::new(); + let mut update_builder = UpdateBuilder::new(update_id); if let Some(max_nb_chunks) = indexer_opt_cloned.max_nb_chunks { update_builder.max_nb_chunks(max_nb_chunks); } @@ -363,7 +363,7 @@ async fn main() -> anyhow::Result<()> { otherwise => panic!("invalid encoding format {:?}", otherwise), }; - let result = builder.execute(reader, |indexing_step| { + let result = builder.execute(reader, |indexing_step, update_id| { let (current, total) = match indexing_step { TransformFromUserIntoGenericFormat { documents_seen } => (documents_seen, None), ComputeIdsAndMergeDocuments { documents_seen, total_documents } => (documents_seen, Some(total_documents)), @@ -430,7 +430,7 @@ async fn main() -> anyhow::Result<()> { } } - let result = builder.execute(|indexing_step| { + let result = builder.execute(|indexing_step, update_id| { let (current, total) = match indexing_step { TransformFromUserIntoGenericFormat { documents_seen } => (documents_seen, None), ComputeIdsAndMergeDocuments { documents_seen, total_documents } => (documents_seen, Some(total_documents)), diff --git a/src/search/facet/facet_condition.rs b/src/search/facet/facet_condition.rs index 578344d37..2f0444dce 100644 --- a/src/search/facet/facet_condition.rs +++ b/src/search/facet/facet_condition.rs @@ -625,9 +625,9 @@ mod tests { // Set the faceted fields to be the channel. let mut wtxn = index.write_txn().unwrap(); - let mut builder = Settings::new(&mut wtxn, &index); + let mut builder = Settings::new(&mut wtxn, &index, 0); builder.set_faceted_fields(hashmap!{ "channel".into() => "string".into() }); - builder.execute(|_| ()).unwrap(); + builder.execute(|_, _| ()).unwrap(); wtxn.commit().unwrap(); // Test that the facet condition is correctly generated. @@ -654,9 +654,9 @@ mod tests { // Set the faceted fields to be the channel. 
let mut wtxn = index.write_txn().unwrap(); - let mut builder = Settings::new(&mut wtxn, &index); + let mut builder = Settings::new(&mut wtxn, &index, 0); builder.set_faceted_fields(hashmap!{ "timestamp".into() => "integer".into() }); - builder.execute(|_| ()).unwrap(); + builder.execute(|_, _| ()).unwrap(); wtxn.commit().unwrap(); // Test that the facet condition is correctly generated. @@ -682,13 +682,13 @@ mod tests { // Set the faceted fields to be the channel. let mut wtxn = index.write_txn().unwrap(); - let mut builder = Settings::new(&mut wtxn, &index); + let mut builder = Settings::new(&mut wtxn, &index, 0); builder.set_searchable_fields(vec!["channel".into(), "timestamp".into()]); // to keep the fields order builder.set_faceted_fields(hashmap!{ "channel".into() => "string".into(), "timestamp".into() => "integer".into(), }); - builder.execute(|_| ()).unwrap(); + builder.execute(|_, _| ()).unwrap(); wtxn.commit().unwrap(); // Test that the facet condition is correctly generated. diff --git a/src/update/clear_documents.rs b/src/update/clear_documents.rs index ac359ba0d..a84596901 100644 --- a/src/update/clear_documents.rs +++ b/src/update/clear_documents.rs @@ -4,11 +4,17 @@ use crate::{ExternalDocumentsIds, Index}; pub struct ClearDocuments<'t, 'u, 'i> { wtxn: &'t mut heed::RwTxn<'i, 'u>, index: &'i Index, + _update_id: u64, } impl<'t, 'u, 'i> ClearDocuments<'t, 'u, 'i> { - pub fn new(wtxn: &'t mut heed::RwTxn<'i, 'u>, index: &'i Index) -> ClearDocuments<'t, 'u, 'i> { - ClearDocuments { wtxn, index } + pub fn new( + wtxn: &'t mut heed::RwTxn<'i, 'u>, + index: &'i Index, + update_id: u64 + ) -> ClearDocuments<'t, 'u, 'i> { + + ClearDocuments { wtxn, index, _update_id: update_id } } pub fn execute(self) -> anyhow::Result { diff --git a/src/update/delete_documents.rs b/src/update/delete_documents.rs index 6dc4b1cfa..2b67535c9 100644 --- a/src/update/delete_documents.rs +++ b/src/update/delete_documents.rs @@ -12,12 +12,14 @@ pub struct DeleteDocuments<'t, 'u, 
'i> { index: &'i Index, external_documents_ids: ExternalDocumentsIds<'static>, documents_ids: RoaringBitmap, + update_id: u64, } impl<'t, 'u, 'i> DeleteDocuments<'t, 'u, 'i> { pub fn new( wtxn: &'t mut heed::RwTxn<'i, 'u>, index: &'i Index, + update_id: u64, ) -> anyhow::Result> { let external_documents_ids = index @@ -29,6 +31,7 @@ impl<'t, 'u, 'i> DeleteDocuments<'t, 'u, 'i> { index, external_documents_ids, documents_ids: RoaringBitmap::new(), + update_id, }) } @@ -64,7 +67,7 @@ impl<'t, 'u, 'i> DeleteDocuments<'t, 'u, 'i> { // We can execute a ClearDocuments operation when the number of documents // to delete is exactly the number of documents in the database. if current_documents_ids_len == self.documents_ids.len() { - return ClearDocuments::new(self.wtxn, self.index).execute(); + return ClearDocuments::new(self.wtxn, self.index, self.update_id).execute(); } let fields_ids_map = self.index.fields_ids_map(self.wtxn)?; diff --git a/src/update/facets.rs b/src/update/facets.rs index a268cbeaf..522a4d350 100644 --- a/src/update/facets.rs +++ b/src/update/facets.rs @@ -24,10 +24,15 @@ pub struct Facets<'t, 'u, 'i> { pub(crate) chunk_fusing_shrink_size: Option, level_group_size: NonZeroUsize, min_level_size: NonZeroUsize, + _update_id: u64, } impl<'t, 'u, 'i> Facets<'t, 'u, 'i> { - pub fn new(wtxn: &'t mut heed::RwTxn<'i, 'u>, index: &'i Index) -> Facets<'t, 'u, 'i> { + pub fn new( + wtxn: &'t mut heed::RwTxn<'i, 'u>, + index: &'i Index, + update_id: u64, + ) -> Facets<'t, 'u, 'i> { Facets { wtxn, index, @@ -36,6 +41,7 @@ impl<'t, 'u, 'i> Facets<'t, 'u, 'i> { chunk_fusing_shrink_size: None, level_group_size: NonZeroUsize::new(4).unwrap(), min_level_size: NonZeroUsize::new(5).unwrap(), + _update_id: update_id, } } diff --git a/src/update/index_documents/mod.rs b/src/update/index_documents/mod.rs index 7fab93343..e38c640a0 100644 --- a/src/update/index_documents/mod.rs +++ b/src/update/index_documents/mod.rs @@ -220,10 +220,15 @@ pub struct IndexDocuments<'t, 'u, 'i, 
'a> { update_method: IndexDocumentsMethod, update_format: UpdateFormat, autogenerate_docids: bool, + update_id: u64, } impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> { - pub fn new(wtxn: &'t mut heed::RwTxn<'i, 'u>, index: &'i Index) -> IndexDocuments<'t, 'u, 'i, 'a> { + pub fn new( + wtxn: &'t mut heed::RwTxn<'i, 'u>, + index: &'i Index, + update_id: u64, + ) -> IndexDocuments<'t, 'u, 'i, 'a> { IndexDocuments { wtxn, index, @@ -240,6 +245,7 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> { update_method: IndexDocumentsMethod::ReplaceDocuments, update_format: UpdateFormat::Json, autogenerate_docids: true, + update_id, } } @@ -262,9 +268,11 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> { pub fn execute(self, reader: R, progress_callback: F) -> anyhow::Result where R: io::Read, - F: Fn(UpdateIndexingStep) + Sync, + F: Fn(UpdateIndexingStep, u64) + Sync, { let before_transform = Instant::now(); + let update_id = self.update_id; + let progress_callback = |step| progress_callback(step, update_id); let transform = Transform { rtxn: &self.wtxn, @@ -321,6 +329,7 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> { chunk_compression_level: self.chunk_compression_level, chunk_fusing_shrink_size: self.chunk_fusing_shrink_size, thread_pool: self.thread_pool, + update_id: self.update_id, }; let mut deletion_builder = update_builder.delete_documents(self.wtxn, self.index)?; debug!("documents to delete {:?}", replaced_documents_ids); @@ -616,7 +625,7 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> { }); } - let mut builder = Facets::new(self.wtxn, self.index); + let mut builder = Facets::new(self.wtxn, self.index, self.update_id); builder.chunk_compression_type = self.chunk_compression_type; builder.chunk_compression_level = self.chunk_compression_level; builder.chunk_fusing_shrink_size = self.chunk_fusing_shrink_size; @@ -651,9 +660,9 @@ mod tests { // First we send 3 documents with ids from 1 to 3. 
let mut wtxn = index.write_txn().unwrap(); let content = &b"id,name\n1,kevin\n2,kevina\n3,benoit\n"[..]; - let mut builder = IndexDocuments::new(&mut wtxn, &index); + let mut builder = IndexDocuments::new(&mut wtxn, &index, 0); builder.update_format(UpdateFormat::Csv); - builder.execute(content, |_| ()).unwrap(); + builder.execute(content, |_, _| ()).unwrap(); wtxn.commit().unwrap(); // Check that there is 3 documents now. @@ -665,9 +674,9 @@ mod tests { // Second we send 1 document with id 1, to erase the previous ones. let mut wtxn = index.write_txn().unwrap(); let content = &b"id,name\n1,updated kevin\n"[..]; - let mut builder = IndexDocuments::new(&mut wtxn, &index); + let mut builder = IndexDocuments::new(&mut wtxn, &index, 1); builder.update_format(UpdateFormat::Csv); - builder.execute(content, |_| ()).unwrap(); + builder.execute(content, |_, _| ()).unwrap(); wtxn.commit().unwrap(); // Check that there is **always** 3 documents. @@ -679,9 +688,9 @@ mod tests { // Third we send 3 documents again to replace the existing ones. let mut wtxn = index.write_txn().unwrap(); let content = &b"id,name\n1,updated second kevin\n2,updated kevina\n3,updated benoit\n"[..]; - let mut builder = IndexDocuments::new(&mut wtxn, &index); + let mut builder = IndexDocuments::new(&mut wtxn, &index, 2); builder.update_format(UpdateFormat::Csv); - builder.execute(content, |_| ()).unwrap(); + builder.execute(content, |_, _| ()).unwrap(); wtxn.commit().unwrap(); // Check that there is **always** 3 documents. @@ -702,10 +711,10 @@ mod tests { // change the index method to merge documents. 
let mut wtxn = index.write_txn().unwrap(); let content = &b"id,name\n1,kevin\n1,kevina\n1,benoit\n"[..]; - let mut builder = IndexDocuments::new(&mut wtxn, &index); + let mut builder = IndexDocuments::new(&mut wtxn, &index, 0); builder.update_format(UpdateFormat::Csv); builder.index_documents_method(IndexDocumentsMethod::UpdateDocuments); - builder.execute(content, |_| ()).unwrap(); + builder.execute(content, |_, _| ()).unwrap(); wtxn.commit().unwrap(); // Check that there is only 1 document now. @@ -729,10 +738,10 @@ mod tests { // Second we send 1 document with id 1, to force it to be merged with the previous one. let mut wtxn = index.write_txn().unwrap(); let content = &b"id,age\n1,25\n"[..]; - let mut builder = IndexDocuments::new(&mut wtxn, &index); + let mut builder = IndexDocuments::new(&mut wtxn, &index, 1); builder.update_format(UpdateFormat::Csv); builder.index_documents_method(IndexDocumentsMethod::UpdateDocuments); - builder.execute(content, |_| ()).unwrap(); + builder.execute(content, |_, _| ()).unwrap(); wtxn.commit().unwrap(); // Check that there is **always** 1 document. @@ -765,10 +774,10 @@ mod tests { // First we send 3 documents with ids from 1 to 3. let mut wtxn = index.write_txn().unwrap(); let content = &b"name\nkevin\nkevina\nbenoit\n"[..]; - let mut builder = IndexDocuments::new(&mut wtxn, &index); + let mut builder = IndexDocuments::new(&mut wtxn, &index, 0); builder.disable_autogenerate_docids(); builder.update_format(UpdateFormat::Csv); - assert!(builder.execute(content, |_| ()).is_err()); + assert!(builder.execute(content, |_, _| ()).is_err()); wtxn.commit().unwrap(); // Check that there is no document. 
@@ -792,10 +801,10 @@ mod tests { { "name": "kevin" }, { "name": "benoit" } ]"#[..]; - let mut builder = IndexDocuments::new(&mut wtxn, &index); + let mut builder = IndexDocuments::new(&mut wtxn, &index, 0); builder.disable_autogenerate_docids(); builder.update_format(UpdateFormat::Json); - assert!(builder.execute(content, |_| ()).is_err()); + assert!(builder.execute(content, |_, _| ()).is_err()); wtxn.commit().unwrap(); // Check that there is no document. @@ -815,9 +824,9 @@ mod tests { // First we send 3 documents with ids from 1 to 3. let mut wtxn = index.write_txn().unwrap(); let content = &b"name\nkevin\nkevina\nbenoit\n"[..]; - let mut builder = IndexDocuments::new(&mut wtxn, &index); + let mut builder = IndexDocuments::new(&mut wtxn, &index, 0); builder.update_format(UpdateFormat::Csv); - builder.execute(content, |_| ()).unwrap(); + builder.execute(content, |_, _| ()).unwrap(); wtxn.commit().unwrap(); // Check that there is 3 documents now. @@ -833,9 +842,9 @@ mod tests { // Second we send 1 document with the generated uuid, to erase the previous ones. let mut wtxn = index.write_txn().unwrap(); let content = format!("id,name\n{},updated kevin", kevin_uuid); - let mut builder = IndexDocuments::new(&mut wtxn, &index); + let mut builder = IndexDocuments::new(&mut wtxn, &index, 1); builder.update_format(UpdateFormat::Csv); - builder.execute(content.as_bytes(), |_| ()).unwrap(); + builder.execute(content.as_bytes(), |_, _| ()).unwrap(); wtxn.commit().unwrap(); // Check that there is **always** 3 documents. @@ -868,9 +877,9 @@ mod tests { // First we send 3 documents with ids from 1 to 3. 
let mut wtxn = index.write_txn().unwrap(); let content = &b"id,name\n1,kevin\n2,kevina\n3,benoit\n"[..]; - let mut builder = IndexDocuments::new(&mut wtxn, &index); + let mut builder = IndexDocuments::new(&mut wtxn, &index, 0); builder.update_format(UpdateFormat::Csv); - builder.execute(content, |_| ()).unwrap(); + builder.execute(content, |_, _| ()).unwrap(); wtxn.commit().unwrap(); // Check that there is 3 documents now. @@ -882,9 +891,9 @@ mod tests { // Second we send 1 document without specifying the id. let mut wtxn = index.write_txn().unwrap(); let content = &b"name\nnew kevin"[..]; - let mut builder = IndexDocuments::new(&mut wtxn, &index); + let mut builder = IndexDocuments::new(&mut wtxn, &index, 1); builder.update_format(UpdateFormat::Csv); - builder.execute(content, |_| ()).unwrap(); + builder.execute(content, |_, _| ()).unwrap(); wtxn.commit().unwrap(); // Check that there is 4 documents now. @@ -904,9 +913,9 @@ mod tests { // First we send 0 documents and only headers. let mut wtxn = index.write_txn().unwrap(); let content = &b"id,name\n"[..]; - let mut builder = IndexDocuments::new(&mut wtxn, &index); + let mut builder = IndexDocuments::new(&mut wtxn, &index, 0); builder.update_format(UpdateFormat::Csv); - builder.execute(content, |_| ()).unwrap(); + builder.execute(content, |_, _| ()).unwrap(); wtxn.commit().unwrap(); // Check that there is no documents. @@ -930,9 +939,9 @@ mod tests { { "name": "kevina", "id": 21 }, { "name": "benoit" } ]"#[..]; - let mut builder = IndexDocuments::new(&mut wtxn, &index); + let mut builder = IndexDocuments::new(&mut wtxn, &index, 0); builder.update_format(UpdateFormat::Json); - builder.execute(content, |_| ()).unwrap(); + builder.execute(content, |_, _| ()).unwrap(); wtxn.commit().unwrap(); // Check that there is 3 documents now. @@ -952,9 +961,9 @@ mod tests { // First we send 0 documents. 
let mut wtxn = index.write_txn().unwrap(); let content = &b"[]"[..]; - let mut builder = IndexDocuments::new(&mut wtxn, &index); + let mut builder = IndexDocuments::new(&mut wtxn, &index, 0); builder.update_format(UpdateFormat::Json); - builder.execute(content, |_| ()).unwrap(); + builder.execute(content, |_, _| ()).unwrap(); wtxn.commit().unwrap(); // Check that there is no documents. @@ -978,9 +987,9 @@ mod tests { { "name": "kevina", "id": 21 } { "name": "benoit" } "#[..]; - let mut builder = IndexDocuments::new(&mut wtxn, &index); + let mut builder = IndexDocuments::new(&mut wtxn, &index, 0); builder.update_format(UpdateFormat::JsonStream); - builder.execute(content, |_| ()).unwrap(); + builder.execute(content, |_, _| ()).unwrap(); wtxn.commit().unwrap(); // Check that there is 3 documents now. @@ -1001,18 +1010,18 @@ mod tests { let mut wtxn = index.write_txn().unwrap(); // There is a space in the document id. let content = &b"id,name\nbrume bleue,kevin\n"[..]; - let mut builder = IndexDocuments::new(&mut wtxn, &index); + let mut builder = IndexDocuments::new(&mut wtxn, &index, 0); builder.update_format(UpdateFormat::Csv); - assert!(builder.execute(content, |_| ()).is_err()); + assert!(builder.execute(content, |_, _| ()).is_err()); wtxn.commit().unwrap(); // First we send 1 document with a valid id. let mut wtxn = index.write_txn().unwrap(); // There is a space in the document id. let content = &b"id,name\n32,kevin\n"[..]; - let mut builder = IndexDocuments::new(&mut wtxn, &index); + let mut builder = IndexDocuments::new(&mut wtxn, &index, 1); builder.update_format(UpdateFormat::Csv); - builder.execute(content, |_| ()).unwrap(); + builder.execute(content, |_, _| ()).unwrap(); wtxn.commit().unwrap(); // Check that there is 1 document now. 
@@ -1036,9 +1045,9 @@ mod tests { { "id": 1, "name": "kevina", "array": ["I", "am", "fine"] }, { "id": 2, "name": "benoit", "array_of_object": [{ "wow": "amazing" }] } ]"#[..]; - let mut builder = IndexDocuments::new(&mut wtxn, &index); + let mut builder = IndexDocuments::new(&mut wtxn, &index, 0); builder.update_format(UpdateFormat::Json); - builder.execute(content, |_| ()).unwrap(); + builder.execute(content, |_, _| ()).unwrap(); wtxn.commit().unwrap(); // Check that there is 1 documents now. diff --git a/src/update/settings.rs b/src/update/settings.rs index dbe951761..17a9da1eb 100644 --- a/src/update/settings.rs +++ b/src/update/settings.rs @@ -23,6 +23,7 @@ pub struct Settings<'a, 't, 'u, 'i> { pub(crate) chunk_compression_level: Option, pub(crate) chunk_fusing_shrink_size: Option, pub(crate) thread_pool: Option<&'a ThreadPool>, + update_id: u64, // If a struct field is set to `None` it means that it hasn't been set by the user, // however if it is `Some(None)` it means that the user forced a reset of the setting. 
@@ -33,7 +34,11 @@ pub struct Settings<'a, 't, 'u, 'i> { } impl<'a, 't, 'u, 'i> Settings<'a, 't, 'u, 'i> { - pub fn new(wtxn: &'t mut heed::RwTxn<'i, 'u>, index: &'i Index) -> Settings<'a, 't, 'u, 'i> { + pub fn new( + wtxn: &'t mut heed::RwTxn<'i, 'u>, + index: &'i Index, + update_id: u64, + ) -> Settings<'a, 't, 'u, 'i> { Settings { wtxn, index, @@ -49,6 +54,7 @@ impl<'a, 't, 'u, 'i> Settings<'a, 't, 'u, 'i> { displayed_fields: None, faceted_fields: None, criteria: None, + update_id, } } @@ -86,9 +92,11 @@ impl<'a, 't, 'u, 'i> Settings<'a, 't, 'u, 'i> { fn reindex(&mut self, cb: &F, old_fields_ids_map: FieldsIdsMap) -> anyhow::Result<()> where - F: Fn(UpdateIndexingStep) + Sync, + F: Fn(UpdateIndexingStep, u64) + Sync { let fields_ids_map = self.index.fields_ids_map(self.wtxn)?; + let update_id = self.update_id; + let cb = |step| cb(step, update_id); // if the settings are set before any document update, we don't need to do anything, and // will set the primary key during the first document addition. if self.index.number_of_documents(&self.wtxn)? == 0 { @@ -118,11 +126,11 @@ impl<'a, 't, 'u, 'i> Settings<'a, 't, 'u, 'i> { fields_ids_map.clone())?; // We clear the full database (words-fst, documents ids and documents content). - ClearDocuments::new(self.wtxn, self.index).execute()?; + ClearDocuments::new(self.wtxn, self.index, self.update_id).execute()?; // We index the generated `TransformOutput` which must contain // all the documents with fields in the newly defined searchable order. 
- let mut indexing_builder = IndexDocuments::new(self.wtxn, self.index); + let mut indexing_builder = IndexDocuments::new(self.wtxn, self.index, self.update_id); indexing_builder.log_every_n = self.log_every_n; indexing_builder.max_nb_chunks = self.max_nb_chunks; indexing_builder.max_memory = self.max_memory; @@ -239,7 +247,7 @@ impl<'a, 't, 'u, 'i> Settings<'a, 't, 'u, 'i> { pub fn execute(mut self, progress_callback: F) -> anyhow::Result<()> where - F: Fn(UpdateIndexingStep) + Sync + F: Fn(UpdateIndexingStep, u64) + Sync { let old_fields_ids_map = self.index.fields_ids_map(&self.wtxn)?; self.update_displayed()?; @@ -276,16 +284,16 @@ mod tests { // First we send 3 documents with ids from 1 to 3. let mut wtxn = index.write_txn().unwrap(); let content = &b"name,age\nkevin,23\nkevina,21\nbenoit,34\n"[..]; - let mut builder = IndexDocuments::new(&mut wtxn, &index); + let mut builder = IndexDocuments::new(&mut wtxn, &index, 0); builder.update_format(UpdateFormat::Csv); - builder.execute(content, |_| ()).unwrap(); + builder.execute(content, |_, _| ()).unwrap(); wtxn.commit().unwrap(); // We change the searchable fields to be the "name" field only. let mut wtxn = index.write_txn().unwrap(); - let mut builder = Settings::new(&mut wtxn, &index); + let mut builder = Settings::new(&mut wtxn, &index, 1); builder.set_searchable_fields(vec!["name".into()]); - builder.execute(|_| ()).unwrap(); + builder.execute(|_, _| ()).unwrap(); wtxn.commit().unwrap(); // Check that the searchable field is correctly set to "name" only. @@ -305,9 +313,9 @@ mod tests { // We change the searchable fields to be the "name" field only. let mut wtxn = index.write_txn().unwrap(); - let mut builder = Settings::new(&mut wtxn, &index); + let mut builder = Settings::new(&mut wtxn, &index, 2); builder.reset_searchable_fields(); - builder.execute(|_| ()).unwrap(); + builder.execute(|_, _| ()).unwrap(); wtxn.commit().unwrap(); // Check that the searchable field have been reset and documents are found now. 
@@ -331,18 +339,18 @@ mod tests { // First we send 3 documents with ids from 1 to 3. let mut wtxn = index.write_txn().unwrap(); let content = &b"name,age\nkevin,23\nkevina,21\nbenoit,34\n"[..]; - let mut builder = IndexDocuments::new(&mut wtxn, &index); + let mut builder = IndexDocuments::new(&mut wtxn, &index, 0); builder.update_format(UpdateFormat::Csv); - builder.execute(content, |_| ()).unwrap(); + builder.execute(content, |_, _| ()).unwrap(); wtxn.commit().unwrap(); // In the same transaction we change the displayed fields to be only the "age". // We also change the searchable fields to be the "name" field only. let mut wtxn = index.write_txn().unwrap(); - let mut builder = Settings::new(&mut wtxn, &index); + let mut builder = Settings::new(&mut wtxn, &index, 1); builder.set_displayed_fields(vec!["age".into()]); builder.set_searchable_fields(vec!["name".into()]); - builder.execute(|_| ()).unwrap(); + builder.execute(|_, _| ()).unwrap(); wtxn.commit().unwrap(); // Check that the displayed fields are correctly set to `None` (default value). @@ -353,9 +361,9 @@ mod tests { // We change the searchable fields to be the "name" field only. let mut wtxn = index.write_txn().unwrap(); - let mut builder = Settings::new(&mut wtxn, &index); + let mut builder = Settings::new(&mut wtxn, &index, 2); builder.reset_searchable_fields(); - builder.execute(|_| ()).unwrap(); + builder.execute(|_, _| ()).unwrap(); wtxn.commit().unwrap(); // Check that the displayed fields always contains only the "age" field. @@ -375,9 +383,9 @@ mod tests { // First we send 3 documents with ids from 1 to 3. 
let mut wtxn = index.write_txn().unwrap(); let content = &b"name,age\nkevin,23\nkevina,21\nbenoit,34\n"[..]; - let mut builder = IndexDocuments::new(&mut wtxn, &index); + let mut builder = IndexDocuments::new(&mut wtxn, &index, 0); builder.update_format(UpdateFormat::Csv); - builder.execute(content, |_| ()).unwrap(); + builder.execute(content, |_, _| ()).unwrap(); wtxn.commit().unwrap(); // Check that the displayed fields are correctly set to `None` (default value). @@ -397,14 +405,14 @@ mod tests { // First we send 3 documents with ids from 1 to 3. let mut wtxn = index.write_txn().unwrap(); let content = &b"name,age\nkevin,23\nkevina,21\nbenoit,34\n"[..]; - let mut builder = IndexDocuments::new(&mut wtxn, &index); + let mut builder = IndexDocuments::new(&mut wtxn, &index, 0); builder.update_format(UpdateFormat::Csv); - builder.execute(content, |_| ()).unwrap(); + builder.execute(content, |_, _| ()).unwrap(); // In the same transaction we change the displayed fields to be only the age. - let mut builder = Settings::new(&mut wtxn, &index); + let mut builder = Settings::new(&mut wtxn, &index, 0); builder.set_displayed_fields(vec!["age".into()]); - builder.execute(|_| ()).unwrap(); + builder.execute(|_, _| ()).unwrap(); wtxn.commit().unwrap(); // Check that the displayed fields are correctly set to only the "age" field. @@ -415,9 +423,9 @@ mod tests { // We reset the fields ids to become `None`, the default value. let mut wtxn = index.write_txn().unwrap(); - let mut builder = Settings::new(&mut wtxn, &index); + let mut builder = Settings::new(&mut wtxn, &index, 0); builder.reset_displayed_fields(); - builder.execute(|_| ()).unwrap(); + builder.execute(|_, _| ()).unwrap(); wtxn.commit().unwrap(); // Check that the displayed fields are correctly set to `None` (default value). @@ -436,15 +444,15 @@ mod tests { // Set the faceted fields to be the age. 
let mut wtxn = index.write_txn().unwrap(); - let mut builder = Settings::new(&mut wtxn, &index); + let mut builder = Settings::new(&mut wtxn, &index, 0); builder.set_faceted_fields(hashmap!{ "age".into() => "integer".into() }); - builder.execute(|_| ()).unwrap(); + builder.execute(|_, _| ()).unwrap(); // Then index some documents. let content = &b"name,age\nkevin,23\nkevina,21\nbenoit,34\n"[..]; - let mut builder = IndexDocuments::new(&mut wtxn, &index); + let mut builder = IndexDocuments::new(&mut wtxn, &index, 1); builder.update_format(UpdateFormat::Csv); - builder.execute(content, |_| ()).unwrap(); + builder.execute(content, |_, _| ()).unwrap(); wtxn.commit().unwrap(); // Check that the displayed fields are correctly set. @@ -459,9 +467,9 @@ mod tests { // Index a little more documents with new and current facets values. let mut wtxn = index.write_txn().unwrap(); let content = &b"name,age\nkevin2,23\nkevina2,21\nbenoit2,35\n"[..]; - let mut builder = IndexDocuments::new(&mut wtxn, &index); + let mut builder = IndexDocuments::new(&mut wtxn, &index, 2); builder.update_format(UpdateFormat::Csv); - builder.execute(content, |_| ()).unwrap(); + builder.execute(content, |_, _| ()).unwrap(); wtxn.commit().unwrap(); let rtxn = index.read_txn().unwrap(); @@ -480,14 +488,14 @@ mod tests { // Set all the settings except searchable let mut wtxn = index.write_txn().unwrap(); - let mut builder = Settings::new(&mut wtxn, &index); + let mut builder = Settings::new(&mut wtxn, &index, 0); builder.set_displayed_fields(vec!["hello".to_string()]); builder.set_faceted_fields(hashmap!{ "age".into() => "integer".into(), "toto".into() => "integer".into(), }); builder.set_criteria(vec!["asc(toto)".to_string()]); - builder.execute(|_| ()).unwrap(); + builder.execute(|_, _| ()).unwrap(); wtxn.commit().unwrap(); // check the output @@ -500,9 +508,9 @@ mod tests { // We set toto and age as searchable to force reordering of the fields let mut wtxn = index.write_txn().unwrap(); - let mut 
builder = Settings::new(&mut wtxn, &index); + let mut builder = Settings::new(&mut wtxn, &index, 1); builder.set_searchable_fields(vec!["toto".to_string(), "age".to_string()]); - builder.execute(|_| ()).unwrap(); + builder.execute(|_, _| ()).unwrap(); wtxn.commit().unwrap(); let rtxn = index.read_txn().unwrap(); diff --git a/src/update/update_builder.rs b/src/update/update_builder.rs index 43e3c28ed..8d6eb034d 100644 --- a/src/update/update_builder.rs +++ b/src/update/update_builder.rs @@ -13,10 +13,11 @@ pub struct UpdateBuilder<'a> { pub(crate) chunk_compression_level: Option, pub(crate) chunk_fusing_shrink_size: Option, pub(crate) thread_pool: Option<&'a ThreadPool>, + pub(crate) update_id: u64, } impl<'a> UpdateBuilder<'a> { - pub fn new() -> UpdateBuilder<'a> { + pub fn new(update_id: u64) -> UpdateBuilder<'a> { UpdateBuilder { log_every_n: None, max_nb_chunks: None, @@ -26,6 +27,7 @@ impl<'a> UpdateBuilder<'a> { chunk_compression_level: None, chunk_fusing_shrink_size: None, thread_pool: None, + update_id, } } @@ -67,7 +69,7 @@ impl<'a> UpdateBuilder<'a> { index: &'i Index, ) -> ClearDocuments<'t, 'u, 'i> { - ClearDocuments::new(wtxn, index) + ClearDocuments::new(wtxn, index, self.update_id) } pub fn delete_documents<'t, 'u, 'i>( @@ -76,7 +78,7 @@ impl<'a> UpdateBuilder<'a> { index: &'i Index, ) -> anyhow::Result> { - DeleteDocuments::new(wtxn, index) + DeleteDocuments::new(wtxn, index, self.update_id) } pub fn index_documents<'t, 'u, 'i>( @@ -85,7 +87,7 @@ impl<'a> UpdateBuilder<'a> { index: &'i Index, ) -> IndexDocuments<'t, 'u, 'i, 'a> { - let mut builder = IndexDocuments::new(wtxn, index); + let mut builder = IndexDocuments::new(wtxn, index, self.update_id); builder.log_every_n = self.log_every_n; builder.max_nb_chunks = self.max_nb_chunks; @@ -105,7 +107,7 @@ impl<'a> UpdateBuilder<'a> { index: &'i Index, ) -> Settings<'a, 't, 'u, 'i> { - let mut builder = Settings::new(wtxn, index); + let mut builder = Settings::new(wtxn, index, self.update_id); 
builder.log_every_n = self.log_every_n; builder.max_nb_chunks = self.max_nb_chunks; @@ -125,7 +127,7 @@ impl<'a> UpdateBuilder<'a> { index: &'i Index, ) -> Facets<'t, 'u, 'i> { - let mut builder = Facets::new(wtxn, index); + let mut builder = Facets::new(wtxn, index, self.update_id); builder.chunk_compression_type = self.chunk_compression_type; builder.chunk_compression_level = self.chunk_compression_level; @@ -134,9 +136,3 @@ impl<'a> UpdateBuilder<'a> { builder } } - -impl Default for UpdateBuilder<'_> { - fn default() -> Self { - Self::new() - } -} From 8f43698a6015a534f21fc5176fb4ae812a39e701 Mon Sep 17 00:00:00 2001 From: mpostma Date: Mon, 1 Feb 2021 14:38:04 +0100 Subject: [PATCH 4/4] fix httpui --- http-ui/src/main.rs | 2 +- src/search/facet/facet_condition.rs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/http-ui/src/main.rs b/http-ui/src/main.rs index 53c2cb460..a29eb8895 100644 --- a/http-ui/src/main.rs +++ b/http-ui/src/main.rs @@ -382,7 +382,7 @@ async fn main() -> anyhow::Result<()> { }); match result { - Ok(()) => wtxn.commit().map_err(Into::into), + Ok(_) => wtxn.commit().map_err(Into::into), Err(e) => Err(e.into()) } }, diff --git a/src/search/facet/facet_condition.rs b/src/search/facet/facet_condition.rs index 2f0444dce..42c2327a9 100644 --- a/src/search/facet/facet_condition.rs +++ b/src/search/facet/facet_condition.rs @@ -732,13 +732,13 @@ mod tests { // Set the faceted fields to be the channel. let mut wtxn = index.write_txn().unwrap(); - let mut builder = Settings::new(&mut wtxn, &index); + let mut builder = Settings::new(&mut wtxn, &index, 0); builder.set_searchable_fields(vec!["channel".into(), "timestamp".into()]); // to keep the fields order builder.set_faceted_fields(hashmap!{ "channel".into() => "string".into(), "timestamp".into() => "integer".into(), }); - builder.execute(|_| ()).unwrap(); + builder.execute(|_, _| ()).unwrap(); wtxn.commit().unwrap(); // Test that the facet condition is correctly generated.