diff --git a/crates/benchmarks/benches/utils.rs b/crates/benchmarks/benches/utils.rs index defcbbfbb..5900db481 100644 --- a/crates/benchmarks/benches/utils.rs +++ b/crates/benchmarks/benches/utils.rs @@ -2,9 +2,8 @@ use std::fs::{create_dir_all, remove_dir_all, File}; use std::io::{self, BufReader, BufWriter, Read}; -use std::num::ParseFloatError; use std::path::Path; -use std::str::FromStr; +use std::str::FromStr as _; use anyhow::Context; use bumpalo::Bump; diff --git a/crates/meilisearch/src/lib.rs b/crates/meilisearch/src/lib.rs index 2f7c35305..633ad2776 100644 --- a/crates/meilisearch/src/lib.rs +++ b/crates/meilisearch/src/lib.rs @@ -448,12 +448,11 @@ fn import_dump( let builder = builder.with_embedders(embedders); - todo!("please plug the dump load of main"); - // let (builder, user_result) = builder.add_documents(reader)?; - // let user_result = user_result?; - // tracing::info!(documents_found = user_result, "{} documents found.", user_result); - // builder.execute()?; - // wtxn.commit()?; + let (builder, user_result) = builder.add_documents(reader)?; + let user_result = user_result?; + tracing::info!(documents_found = user_result, "{} documents found.", user_result); + builder.execute()?; + wtxn.commit()?; tracing::info!("All documents successfully imported."); } diff --git a/crates/milli/src/index.rs b/crates/milli/src/index.rs index a7601050d..e600739a9 100644 --- a/crates/milli/src/index.rs +++ b/crates/milli/src/index.rs @@ -1907,7 +1907,7 @@ pub(crate) mod tests { { "id": 2, "name": "bob", "age": 20 }, { "id": 2, "name": "bob", "age": 20 }, ]); - indexer.add_documents(&payload); + indexer.add_documents(&payload).unwrap(); let indexer_alloc = Bump::new(); let (document_changes, _operation_stats, primary_key) = indexer diff --git a/crates/milli/src/update/facet/bulk.rs b/crates/milli/src/update/facet/bulk.rs index d3f74170f..1ab8740ed 100644 --- a/crates/milli/src/update/facet/bulk.rs +++ b/crates/milli/src/update/facet/bulk.rs @@ -493,7 +493,7 @@ mod tests { } let documents = mmap_from_objects(documents); - index.add_documents(documents); + index.add_documents(documents).unwrap(); db_snap!(index, facet_id_f64_docids, "initial", @"c34f499261f3510d862fa0283bbe843a"); } diff --git a/crates/milli/src/update/index_documents/mod.rs b/crates/milli/src/update/index_documents/mod.rs index bea26db83..1bcb9d182 100644 --- a/crates/milli/src/update/index_documents/mod.rs +++ b/crates/milli/src/update/index_documents/mod.rs @@ -5,11 +5,13 @@ mod transform; mod typed_chunk; use std::collections::HashSet; +use std::io::{Read, Seek}; use std::iter; use std::num::NonZeroU32; use std::sync::Arc; use crossbeam_channel::{Receiver, Sender}; +use enrich::enrich_documents_batch; use grenad::{Merger, MergerBuilder}; use hashbrown::HashMap; use heed::types::Str; @@ -24,7 +26,8 @@ use typed_chunk::{write_typed_chunk_into_index, ChunkAccumulator, TypedChunk}; pub use self::enrich::{extract_finite_float_from_value, DocumentId}; pub use self::helpers::*; pub use self::transform::{Transform, TransformOutput}; -use crate::documents::obkv_to_object; +use super::new::StdResult; +use crate::documents::{obkv_to_object, DocumentsBatchReader}; use crate::error::{Error, InternalError}; use crate::thread_pool_no_abort::ThreadPoolNoAbortBuilder; pub use crate::update::index_documents::helpers::CursorClonableMmap; @@ -32,7 +35,7 @@ use crate::update::{ IndexerConfig, UpdateIndexingStep, WordPrefixDocids, WordPrefixIntegerDocids, WordsPrefixesFst, }; use crate::vector::{ArroyWrapper, EmbeddingConfigs}; -use 
crate::{CboRoaringBitmapCodec, Index, Result}; +use crate::{CboRoaringBitmapCodec, Index, Result, UserError}; static MERGED_DATABASE_COUNT: usize = 7; static PREFIX_DATABASE_COUNT: usize = 4; @@ -122,11 +125,76 @@ where }) } + /// Adds a batch of documents to the current builder. + /// + /// Since the documents are progressively added to the writer, a failure will only + /// return an error and not the `IndexDocuments` struct, as it is invalid to use it afterward. + /// + /// Returns the number of documents added to the builder. + #[tracing::instrument(level = "trace", skip_all, target = "indexing::documents")] + pub fn add_documents<R: Read + Seek>( + mut self, + reader: DocumentsBatchReader<R>, + ) -> Result<(Self, StdResult<u64, UserError>)> { + // Early return when there is no document to add + if reader.is_empty() { + return Ok((self, Ok(0))); + } + + // We check for user errors in this validator and if there is one, we can return + // the `IndexDocuments` struct as it is still valid to send more documents into it. + // However, if there is an internal error we throw it away! + let enriched_documents_reader = match enrich_documents_batch( + self.wtxn, + self.index, + self.config.autogenerate_docids, + reader, + )? { + Ok(reader) => reader, + Err(user_error) => return Ok((self, Err(user_error))), + }; + + let indexed_documents = + self.transform.as_mut().expect("Invalid document addition state").read_documents( + enriched_documents_reader, + self.wtxn, + &self.progress, + &self.should_abort, + )? as u64; + + self.added_documents += indexed_documents; + + Ok((self, Ok(indexed_documents))) + } + pub fn with_embedders(mut self, embedders: EmbeddingConfigs) -> Self { self.embedders = embedders; self } + #[tracing::instrument( + level = "trace", + skip_all, + target = "indexing::documents", + name = "index_documents" + )] + pub fn execute(mut self) -> Result<DocumentAdditionResult> { + if self.added_documents == 0 && self.deleted_documents == 0 { + let number_of_documents = self.index.number_of_documents(self.wtxn)?; + return Ok(DocumentAdditionResult { indexed_documents: 0, number_of_documents }); + } + let output = self + .transform + .take() + .expect("Invalid document addition state") + .output_from_sorter(self.wtxn, &self.progress)?; + + let indexed_documents = output.documents_count as u64; + let number_of_documents = self.execute_raw(output)?; + + Ok(DocumentAdditionResult { indexed_documents, number_of_documents }) + } + /// Returns the total number of documents in the index after the update.
#[tracing::instrument( level = "trace", @@ -678,7 +746,7 @@ mod tests { use maplit::hashset; use super::*; - use crate::documents::{documents_batch_reader_from_objects, mmap_from_objects}; + use crate::documents::mmap_from_objects; use crate::index::tests::TempIndex; use crate::index::IndexEmbeddingConfig; use crate::search::TermsMatchingStrategy; @@ -2119,16 +2187,7 @@ mod tests { index.index_documents_config.update_method = IndexDocumentsMethod::UpdateDocuments; let mut wtxn = index.write_txn().unwrap(); - let local_pool; let indexer_config = &index.indexer_config; - let pool = match &indexer_config.thread_pool { - Some(pool) => pool, - None => { - local_pool = ThreadPoolNoAbortBuilder::new().build().unwrap(); - &local_pool - } - }; - let rtxn = index.inner.read_txn().unwrap(); let db_fields_ids_map = index.inner.fields_ids_map(&rtxn).unwrap(); let mut new_fields_ids_map = db_fields_ids_map.clone(); @@ -2142,27 +2201,24 @@ mod tests { let indexer_alloc = Bump::new(); let embedders = EmbeddingConfigs::default(); let mut indexer = indexer::DocumentOperation::new(IndexDocumentsMethod::ReplaceDocuments); - indexer.add_documents(&documents); + indexer.add_documents(&documents).unwrap(); indexer.delete_documents(&["2"]); let (document_changes, _operation_stats, primary_key) = indexer .into_changes(&indexer_alloc, &index.inner, &rtxn, None, &mut new_fields_ids_map) .unwrap(); - pool.install(|| { - indexer::index( - &mut wtxn, - &index.inner, - indexer_config.grenad_parameters(), - &db_fields_ids_map, - new_fields_ids_map, - primary_key, - &document_changes, - embedders, - &|| false, - &|_| (), - ) - }) - .unwrap() + indexer::index( + &mut wtxn, + &index.inner, + indexer_config.grenad_parameters(), + &db_fields_ids_map, + new_fields_ids_map, + primary_key, + &document_changes, + embedders, + &|| false, + &|_| (), + ) .unwrap(); wtxn.commit().unwrap(); @@ -2178,16 +2234,7 @@ mod tests { index.index_documents_config.update_method = IndexDocumentsMethod::UpdateDocuments; let mut wtxn = index.write_txn().unwrap(); - let local_pool; let indexer_config = &index.indexer_config; - let pool = match &indexer_config.thread_pool { - Some(pool) => pool, - None => { - local_pool = ThreadPoolNoAbortBuilder::new().build().unwrap(); - &local_pool - } - }; - let rtxn = index.inner.read_txn().unwrap(); let db_fields_ids_map = index.inner.fields_ids_map(&rtxn).unwrap(); let mut new_fields_ids_map = db_fields_ids_map.clone(); @@ -2213,21 +2260,18 @@ mod tests { .into_changes(&indexer_alloc, &index.inner, &rtxn, None, &mut new_fields_ids_map) .unwrap(); - pool.install(|| { - indexer::index( - &mut wtxn, - &index.inner, - indexer_config.grenad_parameters(), - &db_fields_ids_map, - new_fields_ids_map, - primary_key, - &document_changes, - embedders, - &|| false, - &|_| (), - ) - }) - .unwrap() + indexer::index( + &mut wtxn, + &index.inner, + indexer_config.grenad_parameters(), + &db_fields_ids_map, + new_fields_ids_map, + primary_key, + &document_changes, + embedders, + &|| false, + &|_| (), + ) .unwrap(); wtxn.commit().unwrap(); @@ -2238,19 +2282,10 @@ mod tests { #[test] fn add_document_and_in_another_transform_update_and_delete_documents() { - let mut index = TempIndex::new(); + let index = TempIndex::new(); let mut wtxn = index.write_txn().unwrap(); - let local_pool; let indexer_config = &index.indexer_config; - let pool = match &indexer_config.thread_pool { - Some(pool) => pool, - None => { - local_pool = ThreadPoolNoAbortBuilder::new().build().unwrap(); - &local_pool - } - }; - let rtxn = 
index.inner.read_txn().unwrap(); let db_fields_ids_map = index.inner.fields_ids_map(&rtxn).unwrap(); let mut new_fields_ids_map = db_fields_ids_map.clone(); @@ -2263,27 +2298,24 @@ mod tests { let indexer_alloc = Bump::new(); let embedders = EmbeddingConfigs::default(); let mut indexer = indexer::DocumentOperation::new(IndexDocumentsMethod::UpdateDocuments); - indexer.add_documents(&documents); + indexer.add_documents(&documents).unwrap(); let (document_changes, _operation_stats, primary_key) = indexer .into_changes(&indexer_alloc, &index.inner, &rtxn, None, &mut new_fields_ids_map) .unwrap(); - pool.install(|| { - indexer::index( - &mut wtxn, - &index.inner, - indexer_config.grenad_parameters(), - &db_fields_ids_map, - new_fields_ids_map, - primary_key, - &document_changes, - embedders, - &|| false, - &|_| (), - ) - }) - .unwrap() + indexer::index( + &mut wtxn, + &index.inner, + indexer_config.grenad_parameters(), + &db_fields_ids_map, + new_fields_ids_map, + primary_key, + &document_changes, + embedders, + &|| false, + &|_| (), + ) .unwrap(); wtxn.commit().unwrap(); @@ -2296,16 +2328,7 @@ mod tests { // A first batch of documents has been inserted let mut wtxn = index.write_txn().unwrap(); - let local_pool; let indexer_config = &index.indexer_config; - let pool = match &indexer_config.thread_pool { - Some(pool) => pool, - None => { - local_pool = ThreadPoolNoAbortBuilder::new().build().unwrap(); - &local_pool - } - }; - let rtxn = index.inner.read_txn().unwrap(); let db_fields_ids_map = index.inner.fields_ids_map(&rtxn).unwrap(); let mut new_fields_ids_map = db_fields_ids_map.clone(); @@ -2317,28 +2340,25 @@ mod tests { let indexer_alloc = Bump::new(); let embedders = EmbeddingConfigs::default(); let mut indexer = indexer::DocumentOperation::new(IndexDocumentsMethod::UpdateDocuments); - indexer.add_documents(&documents); + indexer.add_documents(&documents).unwrap(); indexer.delete_documents(&["1", "2"]); let (document_changes, _operation_stats, primary_key) = indexer .into_changes(&indexer_alloc, &index.inner, &rtxn, None, &mut new_fields_ids_map) .unwrap(); - pool.install(|| { - indexer::index( - &mut wtxn, - &index.inner, - indexer_config.grenad_parameters(), - &db_fields_ids_map, - new_fields_ids_map, - primary_key, - &document_changes, - embedders, - &|| false, - &|_| (), - ) - }) - .unwrap() + indexer::index( + &mut wtxn, + &index.inner, + indexer_config.grenad_parameters(), + &db_fields_ids_map, + new_fields_ids_map, + primary_key, + &document_changes, + embedders, + &|| false, + &|_| (), + ) .unwrap(); wtxn.commit().unwrap(); @@ -2349,19 +2369,10 @@ mod tests { #[test] fn delete_document_and_then_add_documents_in_the_same_transform() { - let mut index = TempIndex::new(); + let index = TempIndex::new(); let mut wtxn = index.write_txn().unwrap(); - let local_pool; let indexer_config = &index.indexer_config; - let pool = match &indexer_config.thread_pool { - Some(pool) => pool, - None => { - local_pool = ThreadPoolNoAbortBuilder::new().build().unwrap(); - &local_pool - } - }; - let rtxn = index.inner.read_txn().unwrap(); let db_fields_ids_map = index.inner.fields_ids_map(&rtxn).unwrap(); let mut new_fields_ids_map = db_fields_ids_map.clone(); @@ -2375,27 +2386,24 @@ mod tests { { "id": 2, "doggo": { "name": "jean", "age": 20 } }, { "id": 3, "name": "bob", "age": 25 }, ]); - indexer.add_documents(&documents); + indexer.add_documents(&documents).unwrap(); let (document_changes, _operation_stats, primary_key) = indexer .into_changes(&indexer_alloc, &index.inner, &rtxn, None, &mut 
new_fields_ids_map) .unwrap(); - pool.install(|| { - indexer::index( - &mut wtxn, - &index.inner, - indexer_config.grenad_parameters(), - &db_fields_ids_map, - new_fields_ids_map, - primary_key, - &document_changes, - embedders, - &|| false, - &|_| (), - ) - }) - .unwrap() + indexer::index( + &mut wtxn, + &index.inner, + indexer_config.grenad_parameters(), + &db_fields_ids_map, + new_fields_ids_map, + primary_key, + &document_changes, + embedders, + &|| false, + &|_| (), + ) .unwrap(); wtxn.commit().unwrap(); @@ -2407,19 +2415,10 @@ mod tests { #[test] fn delete_the_same_document_multiple_time() { - let mut index = TempIndex::new(); + let index = TempIndex::new(); let mut wtxn = index.write_txn().unwrap(); - let local_pool; let indexer_config = &index.indexer_config; - let pool = match &indexer_config.thread_pool { - Some(pool) => pool, - None => { - local_pool = ThreadPoolNoAbortBuilder::new().build().unwrap(); - &local_pool - } - }; - let rtxn = index.inner.read_txn().unwrap(); let db_fields_ids_map = index.inner.fields_ids_map(&rtxn).unwrap(); let mut new_fields_ids_map = db_fields_ids_map.clone(); @@ -2435,7 +2434,7 @@ mod tests { { "id": 2, "doggo": { "name": "jean", "age": 20 } }, { "id": 3, "name": "bob", "age": 25 }, ]); - indexer.add_documents(&documents); + indexer.add_documents(&documents).unwrap(); indexer.delete_documents(&["1", "2", "1", "2"]); @@ -2443,21 +2442,18 @@ mod tests { .into_changes(&indexer_alloc, &index.inner, &rtxn, None, &mut new_fields_ids_map) .unwrap(); - pool.install(|| { - indexer::index( - &mut wtxn, - &index.inner, - indexer_config.grenad_parameters(), - &db_fields_ids_map, - new_fields_ids_map, - primary_key, - &document_changes, - embedders, - &|| false, - &|_| (), - ) - }) - .unwrap() + indexer::index( + &mut wtxn, + &index.inner, + indexer_config.grenad_parameters(), + &db_fields_ids_map, + new_fields_ids_map, + primary_key, + &document_changes, + embedders, + &|| false, + &|_| (), + ) .unwrap(); wtxn.commit().unwrap(); @@ -2468,19 +2464,10 @@ mod tests { #[test] fn add_document_and_in_another_transform_delete_the_document_then_add_it_again() { - let mut index = TempIndex::new(); + let index = TempIndex::new(); let mut wtxn = index.write_txn().unwrap(); - let local_pool; let indexer_config = &index.indexer_config; - let pool = match &indexer_config.thread_pool { - Some(pool) => pool, - None => { - local_pool = ThreadPoolNoAbortBuilder::new().build().unwrap(); - &local_pool - } - }; - let rtxn = index.inner.read_txn().unwrap(); let db_fields_ids_map = index.inner.fields_ids_map(&rtxn).unwrap(); let mut new_fields_ids_map = db_fields_ids_map.clone(); @@ -2492,27 +2479,24 @@ mod tests { let documents = documents!([ { "id": 1, "doggo": "kevin" }, ]); - indexer.add_documents(&documents); + indexer.add_documents(&documents).unwrap(); let (document_changes, _operation_stats, primary_key) = indexer .into_changes(&indexer_alloc, &index.inner, &rtxn, None, &mut new_fields_ids_map) .unwrap(); - pool.install(|| { - indexer::index( - &mut wtxn, - &index.inner, - indexer_config.grenad_parameters(), - &db_fields_ids_map, - new_fields_ids_map, - primary_key, - &document_changes, - embedders, - &|| false, - &|_| (), - ) - }) - .unwrap() + indexer::index( + &mut wtxn, + &index.inner, + indexer_config.grenad_parameters(), + &db_fields_ids_map, + new_fields_ids_map, + primary_key, + &document_changes, + embedders, + &|| false, + &|_| (), + ) .unwrap(); wtxn.commit().unwrap(); @@ -2523,7 +2507,7 @@ mod tests { // A first batch of documents has been inserted let mut wtxn 
= index.write_txn().unwrap(); - + let indexer_config = &index.indexer_config; let rtxn = index.inner.read_txn().unwrap(); let db_fields_ids_map = index.inner.fields_ids_map(&rtxn).unwrap(); let mut new_fields_ids_map = db_fields_ids_map.clone(); @@ -2538,27 +2522,24 @@ mod tests { { "id": 1, "catto": "jorts" }, ]); - indexer.add_documents(&documents); + indexer.add_documents(&documents).unwrap(); let (document_changes, _operation_stats, primary_key) = indexer .into_changes(&indexer_alloc, &index.inner, &rtxn, None, &mut new_fields_ids_map) .unwrap(); - pool.install(|| { - indexer::index( - &mut wtxn, - &index.inner, - indexer_config.grenad_parameters(), - &db_fields_ids_map, - new_fields_ids_map, - primary_key, - &document_changes, - embedders, - &|| false, - &|_| (), - ) - }) - .unwrap() + indexer::index( + &mut wtxn, + &index.inner, + indexer_config.grenad_parameters(), + &db_fields_ids_map, + new_fields_ids_map, + primary_key, + &document_changes, + embedders, + &|| false, + &|_| (), + ) .unwrap(); wtxn.commit().unwrap(); @@ -2711,17 +2692,7 @@ mod tests { println!("--- ENTERING BATCH 1"); let mut wtxn = index.write_txn().unwrap(); - - let local_pool; let indexer_config = &index.indexer_config; - let pool = match &indexer_config.thread_pool { - Some(pool) => pool, - None => { - local_pool = ThreadPoolNoAbortBuilder::new().build().unwrap(); - &local_pool - } - }; - let rtxn = index.inner.read_txn().unwrap(); let db_fields_ids_map = index.inner.fields_ids_map(&rtxn).unwrap(); let mut new_fields_ids_map = db_fields_ids_map.clone(); @@ -2742,21 +2713,18 @@ mod tests { .into_changes(&indexer_alloc, &index.inner, &rtxn, None, &mut new_fields_ids_map) .unwrap(); - pool.install(|| { - indexer::index( - &mut wtxn, - &index.inner, - indexer_config.grenad_parameters(), - &db_fields_ids_map, - new_fields_ids_map, - primary_key, - &document_changes, - embedders, - &|| false, - &|_| (), - ) - }) - .unwrap() + indexer::index( + &mut wtxn, + &index.inner, + indexer_config.grenad_parameters(), + &db_fields_ids_map, + new_fields_ids_map, + primary_key, + &document_changes, + embedders, + &|| false, + &|_| (), + ) .unwrap(); wtxn.commit().unwrap(); @@ -2775,7 +2743,7 @@ mod tests { println!("--- ENTERING BATCH 2"); let mut wtxn = index.write_txn().unwrap(); - + let indexer_config = &index.indexer_config; let rtxn = index.inner.read_txn().unwrap(); let db_fields_ids_map = index.inner.fields_ids_map(&rtxn).unwrap(); let mut new_fields_ids_map = db_fields_ids_map.clone(); @@ -2795,21 +2763,18 @@ mod tests { .into_changes(&indexer_alloc, &index.inner, &rtxn, None, &mut new_fields_ids_map) .unwrap(); - pool.install(|| { - indexer::index( - &mut wtxn, - &index.inner, - indexer_config.grenad_parameters(), - &db_fields_ids_map, - new_fields_ids_map, - primary_key, - &document_changes, - embedders, - &|| false, - &|_| (), - ) - }) - .unwrap() + indexer::index( + &mut wtxn, + &index.inner, + indexer_config.grenad_parameters(), + &db_fields_ids_map, + new_fields_ids_map, + primary_key, + &document_changes, + embedders, + &|| false, + &|_| (), + ) .unwrap(); wtxn.commit().unwrap(); @@ -2827,7 +2792,7 @@ mod tests { println!("--- ENTERING BATCH 3"); let mut wtxn = index.write_txn().unwrap(); - + let indexer_config = &index.indexer_config; let rtxn = index.inner.read_txn().unwrap(); let db_fields_ids_map = index.inner.fields_ids_map(&rtxn).unwrap(); let mut new_fields_ids_map = db_fields_ids_map.clone(); @@ -2845,21 +2810,18 @@ mod tests { .into_changes(&indexer_alloc, &index.inner, &rtxn, None, &mut 
new_fields_ids_map) .unwrap(); - pool.install(|| { - indexer::index( - &mut wtxn, - &index.inner, - indexer_config.grenad_parameters(), - &db_fields_ids_map, - new_fields_ids_map, - primary_key, - &document_changes, - embedders, - &|| false, - &|_| (), - ) - }) - .unwrap() + indexer::index( + &mut wtxn, + &index.inner, + indexer_config.grenad_parameters(), + &db_fields_ids_map, + new_fields_ids_map, + primary_key, + &document_changes, + embedders, + &|| false, + &|_| (), + ) .unwrap(); wtxn.commit().unwrap(); @@ -2913,8 +2875,7 @@ mod tests { .unwrap(); wtxn.commit().unwrap(); - let mut wtxn = index.write_txn().unwrap(); - // delete those documents, ids are synchronous therefore 0, 1, and 2. + let mut wtxn = index.write_txn().unwrap(); // delete those documents, ids are synchronous therefore 0, 1, and 2. index.delete_documents_using_wtxn(&mut wtxn, vec![S("0"), S("1"), S("2")]).unwrap(); wtxn.commit().unwrap(); @@ -2951,7 +2912,6 @@ mod tests { wtxn.commit().unwrap(); let mut wtxn = index.write_txn().unwrap(); - // Delete not all of the documents but some of them. index.delete_documents_using_wtxn(&mut wtxn, vec![S("0"), S("1")]).unwrap(); @@ -3287,7 +3247,6 @@ mod tests { let index = TempIndex::new(); let mut wtxn = index.write_txn().unwrap(); - index .update_settings_using_wtxn(&mut wtxn, |settings| { settings.set_primary_key(S("docid")); diff --git a/crates/milli/src/update/index_documents/transform.rs b/crates/milli/src/update/index_documents/transform.rs index a83a99ab8..1b041afcc 100644 --- a/crates/milli/src/update/index_documents/transform.rs +++ b/crates/milli/src/update/index_documents/transform.rs @@ -1,11 +1,14 @@ use std::borrow::Cow; +use std::collections::btree_map::Entry as BEntry; +use std::collections::hash_map::Entry as HEntry; use std::collections::{BTreeMap, HashMap, HashSet}; use std::fs::File; +use std::io::{Read, Seek}; use either::Either; use fxhash::FxHashMap; use itertools::Itertools; -use obkv::{KvReader, KvWriter}; +use obkv::{KvReader, KvReaderU16, KvWriter}; use roaring::RoaringBitmap; use serde_json::Value; use smartstring::SmartString; @@ -14,16 +17,17 @@ use super::helpers::{ create_sorter, sorter_into_reader, EitherObkvMerge, ObkvsKeepLastAdditionMergeDeletions, ObkvsMergeAdditionsAndDeletions, }; -use super::{IndexDocumentsMethod, IndexerConfig, KeepFirst}; -use crate::documents::DocumentsBatchIndex; +use super::{create_writer, IndexDocumentsMethod, IndexerConfig, KeepFirst}; +use crate::documents::{DocumentsBatchIndex, EnrichedDocument, EnrichedDocumentsBatchReader}; use crate::error::{Error, InternalError, UserError}; use crate::index::{db_name, main_key}; use crate::update::del_add::{ - into_del_add_obkv, into_del_add_obkv_conditional_operation, DelAddOperation, + into_del_add_obkv, into_del_add_obkv_conditional_operation, DelAdd, DelAddOperation, + KvReaderDelAdd, }; use crate::update::index_documents::GrenadParameters; -use crate::update::settings::InnerIndexSettingsDiff; -use crate::update::AvailableIds; +use crate::update::settings::{InnerIndexSettings, InnerIndexSettingsDiff}; +use crate::update::{AvailableIds, UpdateIndexingStep}; use crate::vector::parsed_vectors::{ExplicitVectors, VectorOrArrayOfVectors}; use crate::vector::settings::WriteBackToDocuments; use crate::vector::ArroyWrapper; @@ -48,8 +52,23 @@ pub struct TransformOutput { /// containing all those documents. 
pub struct Transform<'a, 'i> { pub index: &'i Index, + fields_ids_map: FieldsIdsMap, + indexer_settings: &'a IndexerConfig, pub index_documents_method: IndexDocumentsMethod, + available_documents_ids: AvailableIds, + + // Both grenad sorters follow the same format: + // key | value + // u32 | 1 byte for the Operation byte, the rest is the obkv of the document stored + original_sorter: grenad::Sorter<EitherObkvMerge>, + flattened_sorter: grenad::Sorter<EitherObkvMerge>, + + replaced_documents_ids: RoaringBitmap, + new_documents_ids: RoaringBitmap, + // To increase the cache locality and decrease the heap usage we use compact smartstring. + new_external_documents_ids_builder: FxHashMap<SmartString<smartstring::Compact>, u64>, + documents_count: usize, } /// This enum is specific to the grenad sorter stored in the transform. @@ -60,6 +79,29 @@ pub enum Operation { Deletion, } +/// Create a mapping between the field ids found in the document batch and the ones that were +/// already present in the index. +/// +/// If new fields are present in the addition, they are added to the index field ids map. +fn create_fields_mapping( + index_field_map: &mut FieldsIdsMap, + batch_field_map: &DocumentsBatchIndex, +) -> Result<HashMap<FieldId, FieldId>> { + batch_field_map + .iter() + // we sort by id here to ensure a deterministic mapping of the fields that preserves + // the original ordering. + .sorted_by_key(|(&id, _)| id) + .map(|(field, name)| match index_field_map.id(name) { + Some(id) => Ok((*field, id)), + None => index_field_map + .insert(name) + .ok_or(Error::UserError(UserError::AttributeLimitReached)) + .map(|id| (*field, id)), + }) + .collect() +} + impl<'a, 'i> Transform<'a, 'i> { pub fn new( wtxn: &mut heed::RwTxn<'_>, @@ -100,7 +142,224 @@ impl<'a, 'i> Transform<'a, 'i> { ); let documents_ids = index.documents_ids(wtxn)?; - Ok(Transform { index, indexer_settings, index_documents_method }) + Ok(Transform { + index, + fields_ids_map: index.fields_ids_map(wtxn)?, + indexer_settings, + available_documents_ids: AvailableIds::new(&documents_ids), + original_sorter, + flattened_sorter, + index_documents_method, + replaced_documents_ids: RoaringBitmap::new(), + new_documents_ids: RoaringBitmap::new(), + new_external_documents_ids_builder: FxHashMap::default(), + documents_count: 0, + }) + } + + #[tracing::instrument(level = "trace", skip_all, target = "indexing::documents")] + pub fn read_documents<R, FP, FA>( + &mut self, + reader: EnrichedDocumentsBatchReader<R>, + wtxn: &mut heed::RwTxn<'_>, + progress_callback: FP, + should_abort: FA, + ) -> Result<usize> + where + R: Read + Seek, + FP: Fn(UpdateIndexingStep) + Sync, + FA: Fn() -> bool + Sync, + { + let (mut cursor, fields_index) = reader.into_cursor_and_fields_index(); + let external_documents_ids = self.index.external_documents_ids(); + let mapping = create_fields_mapping(&mut self.fields_ids_map, &fields_index)?; + + let primary_key = cursor.primary_key().to_string(); + let primary_key_id = + self.fields_ids_map.insert(&primary_key).ok_or(UserError::AttributeLimitReached)?; + + let mut obkv_buffer = Vec::new(); + let mut document_sorter_value_buffer = Vec::new(); + let mut document_sorter_key_buffer = Vec::new(); + let mut documents_count = 0; + let mut docid_buffer: Vec<u8> = Vec::new(); + let mut field_buffer: Vec<(u16, Cow<'_, [u8]>)> = Vec::new(); + while let Some(enriched_document) = cursor.next_enriched_document()?
{ + let EnrichedDocument { document, document_id } = enriched_document; + + if should_abort() { + return Err(Error::InternalError(InternalError::AbortedIndexation)); + } + + // drop_and_reuse is called instead of .clear() to communicate to the compiler that field_buffer + // does not keep references from the cursor between loop iterations + let mut field_buffer_cache = drop_and_reuse(field_buffer); + if self.indexer_settings.log_every_n.map_or(false, |len| documents_count % len == 0) { + progress_callback(UpdateIndexingStep::RemapDocumentAddition { + documents_seen: documents_count, + }); + } + + // When the document id has been auto-generated by the `enrich_documents_batch` + // we must insert this document id into the remapped document. + let external_id = document_id.value(); + if document_id.is_generated() { + serde_json::to_writer(&mut docid_buffer, external_id) + .map_err(InternalError::SerdeJson)?; + field_buffer_cache.push((primary_key_id, Cow::from(&docid_buffer))); + } + + for (k, v) in document.iter() { + let mapped_id = + *mapping.get(&k).ok_or(InternalError::FieldIdMappingMissingEntry { key: k })?; + field_buffer_cache.push((mapped_id, Cow::from(v))); + } + + // Insertion in an obkv needs to be done with keys ordered. For now they are ordered + // according to the document addition key order, so we sort it according to the + // fieldids map keys order. + field_buffer_cache.sort_unstable_by(|(f1, _), (f2, _)| f1.cmp(f2)); + + // Build the new obkv document. + let mut writer = KvWriter::new(&mut obkv_buffer); + for (k, v) in field_buffer_cache.iter() { + writer.insert(*k, v)?; + } + + let mut original_docid = None; + let docid = match self.new_external_documents_ids_builder.entry((*external_id).into()) { + HEntry::Occupied(entry) => *entry.get() as u32, + HEntry::Vacant(entry) => { + let docid = match external_documents_ids.get(wtxn, entry.key())? { + Some(docid) => { + // If it was already in the list of replaced documents it means it was deleted + // by the remove_document method. We should start as if it never existed. + if self.replaced_documents_ids.insert(docid) { + original_docid = Some(docid); + } + + docid + } + None => self + .available_documents_ids + .next() + .ok_or(UserError::DocumentLimitReached)?, + }; + entry.insert(docid as u64); + docid + } + }; + + let mut skip_insertion = false; + if let Some(original_docid) = original_docid { + let original_key = original_docid; + let base_obkv = self + .index + .documents + .remap_data_type::<heed::types::Bytes>() + .get(wtxn, &original_key)? + .ok_or(InternalError::DatabaseMissingEntry { + db_name: db_name::DOCUMENTS, + key: None, + })?; + + // we check if the two documents are exactly equal. If it's the case we can skip this document entirely + if base_obkv == obkv_buffer { + // we're not replacing anything + self.replaced_documents_ids.remove(original_docid); + // and we need to put back the original id as it was before + self.new_external_documents_ids_builder.remove(external_id); + skip_insertion = true; + } else { + // we associate the base document with the new key, everything will get merged later.
+ let deladd_operation = match self.index_documents_method { + IndexDocumentsMethod::UpdateDocuments => { + DelAddOperation::DeletionAndAddition + } + IndexDocumentsMethod::ReplaceDocuments => DelAddOperation::Deletion, + }; + document_sorter_key_buffer.clear(); + document_sorter_key_buffer.extend_from_slice(&docid.to_be_bytes()); + document_sorter_key_buffer.extend_from_slice(external_id.as_bytes()); + document_sorter_value_buffer.clear(); + document_sorter_value_buffer.push(Operation::Addition as u8); + into_del_add_obkv( + KvReaderU16::from_slice(base_obkv), + deladd_operation, + &mut document_sorter_value_buffer, + )?; + self.original_sorter + .insert(&document_sorter_key_buffer, &document_sorter_value_buffer)?; + let base_obkv = KvReader::from_slice(base_obkv); + if let Some(flattened_obkv) = + Self::flatten_from_fields_ids_map(base_obkv, &mut self.fields_ids_map)? + { + // we recreate our buffer with the flattened documents + document_sorter_value_buffer.clear(); + document_sorter_value_buffer.push(Operation::Addition as u8); + into_del_add_obkv( + KvReaderU16::from_slice(&flattened_obkv), + deladd_operation, + &mut document_sorter_value_buffer, + )?; + } + self.flattened_sorter + .insert(docid.to_be_bytes(), &document_sorter_value_buffer)?; + } + } + + if !skip_insertion { + self.new_documents_ids.insert(docid); + + document_sorter_key_buffer.clear(); + document_sorter_key_buffer.extend_from_slice(&docid.to_be_bytes()); + document_sorter_key_buffer.extend_from_slice(external_id.as_bytes()); + document_sorter_value_buffer.clear(); + document_sorter_value_buffer.push(Operation::Addition as u8); + into_del_add_obkv( + KvReaderU16::from_slice(&obkv_buffer), + DelAddOperation::Addition, + &mut document_sorter_value_buffer, + )?; + // We use the extracted/generated user id as the key for this document. + self.original_sorter + .insert(&document_sorter_key_buffer, &document_sorter_value_buffer)?; + + let flattened_obkv = KvReader::from_slice(&obkv_buffer); + if let Some(obkv) = + Self::flatten_from_fields_ids_map(flattened_obkv, &mut self.fields_ids_map)? + { + document_sorter_value_buffer.clear(); + document_sorter_value_buffer.push(Operation::Addition as u8); + into_del_add_obkv( + KvReaderU16::from_slice(&obkv), + DelAddOperation::Addition, + &mut document_sorter_value_buffer, + )? + } + self.flattened_sorter.insert(docid.to_be_bytes(), &document_sorter_value_buffer)?; + } + documents_count += 1; + + progress_callback(UpdateIndexingStep::RemapDocumentAddition { + documents_seen: documents_count, + }); + + field_buffer = drop_and_reuse(field_buffer_cache); + docid_buffer.clear(); + obkv_buffer.clear(); + } + + progress_callback(UpdateIndexingStep::RemapDocumentAddition { + documents_seen: documents_count, + }); + + self.index.put_fields_ids_map(wtxn, &self.fields_ids_map)?; + self.index.put_primary_key(wtxn, &primary_key)?; + self.documents_count += documents_count; + // Now that we have a valid sorter that contains the user id and the obkv we + // give it to the last transforming function which returns the TransformOutput. + Ok(documents_count) } // Flatten a document from the fields ids map contained in self and insert the new @@ -230,6 +489,167 @@ impl<'a, 'i> Transform<'a, 'i> { Ok(()) } + /// Generate the `TransformOutput` based on the given sorter that can be generated from any + /// format like CSV, JSON or JSON stream. This sorter must contain a key that is the document + /// id for the user side and the value must be an obkv where keys are valid fields ids. 
+ #[tracing::instrument(level = "trace", skip_all, target = "indexing::transform")] + pub(crate) fn output_from_sorter<F>( + self, + wtxn: &mut heed::RwTxn<'_>, + progress_callback: F, + ) -> Result<TransformOutput> + where + F: Fn(UpdateIndexingStep) + Sync, + { + let primary_key = self + .index + .primary_key(wtxn)? + .ok_or(Error::InternalError(InternalError::DatabaseMissingEntry { + db_name: db_name::MAIN, + key: Some(main_key::PRIMARY_KEY_KEY), + }))? + .to_string(); + + // We create a final writer to write the new documents in order from the sorter. + let mut writer = create_writer( + self.indexer_settings.chunk_compression_type, + self.indexer_settings.chunk_compression_level, + tempfile::tempfile()?, + ); + + // To compute the field distribution we need to: + // 1. Remove all the deleted documents from the field distribution + // 2. Add all the new documents to the field distribution + let mut field_distribution = self.index.field_distribution(wtxn)?; + + // Here we are going to do the document count + field distribution + `write_into_stream_writer` + let mut iter = self.original_sorter.into_stream_merger_iter()?; + // used only for the callback + let mut documents_count = 0; + + while let Some((key, val)) = iter.next()? { + // skip first byte corresponding to the operation type (Deletion or Addition). + let val = &val[1..]; + + // send a callback to show at which step we are + documents_count += 1; + progress_callback(UpdateIndexingStep::ComputeIdsAndMergeDocuments { + documents_seen: documents_count, + total_documents: self.documents_count, + }); + + for (key, value) in KvReader::from_slice(val) { + let reader = KvReaderDelAdd::from_slice(value); + match (reader.get(DelAdd::Deletion), reader.get(DelAdd::Addition)) { + (None, None) => (), + (None, Some(_)) => { + // New field + let name = self.fields_ids_map.name(key).ok_or( + FieldIdMapMissingEntry::FieldId { + field_id: key, + process: "Computing field distribution in transform.", + }, + )?; + *field_distribution.entry(name.to_string()).or_insert(0) += 1; + } + (Some(_), None) => { + // Field removed + let name = self.fields_ids_map.name(key).ok_or( + FieldIdMapMissingEntry::FieldId { + field_id: key, + process: "Computing field distribution in transform.", + }, + )?; + match field_distribution.entry(name.to_string()) { + BEntry::Vacant(_) => { /* Bug? trying to remove a non-existing field */ + } + BEntry::Occupied(mut entry) => { + // attempt to remove one + match entry.get_mut().checked_sub(1) { + Some(0) => { + entry.remove(); + } + Some(new_val) => { + *entry.get_mut() = new_val; + } + None => { + unreachable!("Attempting to remove a field that wasn't in the field distribution") + } + } + } + } + } + (Some(_), Some(_)) => { + // Value change, no field distribution change + } + } + } + writer.insert(key, val)?; + } + + let mut original_documents = writer.into_inner()?; + // We then extract the file and reset the seek to be able to read it again. + original_documents.rewind()?; + + // We create a final writer to write the new documents in order from the sorter. + let mut writer = create_writer( + self.indexer_settings.chunk_compression_type, + self.indexer_settings.chunk_compression_level, + tempfile::tempfile()?, + ); + + // Once we have written all the documents into the final sorter, we write the flattened documents + // into this writer. + // We get rid of the `Operation` byte and skip the deleted documents as well. + let mut iter = self.flattened_sorter.into_stream_merger_iter()?; + while let Some((key, val)) = iter.next()?
{ + // skip first byte corresponding to the operation type (Deletion or Addition). + let val = &val[1..]; + writer.insert(key, val)?; + } + let mut flattened_documents = writer.into_inner()?; + flattened_documents.rewind()?; + + let mut new_external_documents_ids_builder: Vec<_> = + self.new_external_documents_ids_builder.into_iter().collect(); + + new_external_documents_ids_builder + .sort_unstable_by(|(left, _), (right, _)| left.cmp(right)); + let mut fst_new_external_documents_ids_builder = fst::MapBuilder::memory(); + new_external_documents_ids_builder.into_iter().try_for_each(|(key, value)| { + fst_new_external_documents_ids_builder.insert(key, value) + })?; + + let old_inner_settings = InnerIndexSettings::from_index(self.index, wtxn)?; + let fields_ids_map = self.fields_ids_map; + let primary_key_id = self.index.primary_key(wtxn)?.and_then(|name| fields_ids_map.id(name)); + let mut new_inner_settings = old_inner_settings.clone(); + new_inner_settings.fields_ids_map = fields_ids_map; + + let embedding_config_updates = Default::default(); + let settings_update_only = false; + let settings_diff = InnerIndexSettingsDiff::new( + old_inner_settings, + new_inner_settings, + primary_key_id, + embedding_config_updates, + settings_update_only, + ); + + Ok(TransformOutput { + primary_key, + settings_diff, + field_distribution, + documents_count: self.documents_count, + original_documents: Some( + original_documents.into_inner().map_err(|err| err.into_error())?, + ), + flattened_documents: Some( + flattened_documents.into_inner().map_err(|err| err.into_error())?, + ), + }) + } + /// Rebind the field_ids of the provided document to their values /// based on the field_ids_maps difference between the old and the new settings, /// then fill the provided buffers with delta documents using KvWritterDelAdd. 
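Editor's note: the new `Transform` fields above describe the grenad entry layout in a comment (a `u32` docid key and a value made of one `Operation` byte followed by the obkv bytes). The standalone sketch below only illustrates that byte layout with plain buffers; all names and the fake payload are hypothetical and are not milli code.

```rust
// Illustrative layout of a sorter entry as described by the Transform comments:
// key = big-endian internal docid (+ external id for the original sorter),
// value = one Operation tag byte followed by the obkv payload.

#[allow(dead_code)]
#[derive(Clone, Copy)]
#[repr(u8)]
enum Operation {
    Addition = 0,
    Deletion = 1,
}

fn sorter_entry(docid: u32, external_id: &str, obkv_payload: &[u8]) -> (Vec<u8>, Vec<u8>) {
    // Key: big-endian docid so entries sort numerically, then the external id.
    let mut key = Vec::with_capacity(4 + external_id.len());
    key.extend_from_slice(&docid.to_be_bytes());
    key.extend_from_slice(external_id.as_bytes());

    // Value: one leading byte tagging the operation, then the (del/add) obkv bytes.
    let mut value = Vec::with_capacity(1 + obkv_payload.len());
    value.push(Operation::Addition as u8);
    value.extend_from_slice(obkv_payload);

    (key, value)
}

fn main() {
    let (key, value) = sorter_entry(42, "doc-42", b"fake-obkv-bytes");
    assert_eq!(&key[..4], &42u32.to_be_bytes());
    assert_eq!(value[0], Operation::Addition as u8);
    println!("key = {key:?}, value = {value:?}");
}
```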
diff --git a/crates/milli/tests/search/facet_distribution.rs b/crates/milli/tests/search/facet_distribution.rs index 4347e310a..c665109db 100644 --- a/crates/milli/tests/search/facet_distribution.rs +++ b/crates/milli/tests/search/facet_distribution.rs @@ -1,17 +1,13 @@ -use std::io::Cursor; - use big_s::S; use bumpalo::Bump; use heed::EnvOpenOptions; use maplit::hashset; -use milli::documents::{mmap_from_objects, DocumentsBatchBuilder, DocumentsBatchReader}; +use milli::documents::mmap_from_objects; use milli::update::new::indexer; -use milli::update::{ - IndexDocuments, IndexDocumentsConfig, IndexDocumentsMethod, IndexerConfig, Settings, -}; +use milli::update::{IndexDocumentsMethod, IndexerConfig, Settings}; use milli::vector::EmbeddingConfigs; use milli::{FacetDistribution, Index, Object, OrderBy}; -use serde_json::{from_value, json, Deserializer}; +use serde_json::{from_value, json}; #[test] fn test_facet_distribution_with_no_facet_values() { diff --git a/crates/milli/tests/search/query_criteria.rs b/crates/milli/tests/search/query_criteria.rs index 18c8b0bac..9b54c13b9 100644 --- a/crates/milli/tests/search/query_criteria.rs +++ b/crates/milli/tests/search/query_criteria.rs @@ -1,16 +1,12 @@ use std::cmp::Reverse; -use std::io::Cursor; use big_s::S; use bumpalo::Bump; use heed::EnvOpenOptions; use itertools::Itertools; use maplit::hashset; -use milli::documents::{DocumentsBatchBuilder, DocumentsBatchReader}; use milli::update::new::indexer; -use milli::update::{ - IndexDocuments, IndexDocumentsConfig, IndexDocumentsMethod, IndexerConfig, Settings, -}; +use milli::update::{IndexDocumentsMethod, IndexerConfig, Settings}; use milli::vector::EmbeddingConfigs; use milli::{AscDesc, Criterion, Index, Member, Search, SearchResult, TermsMatchingStrategy}; use rand::Rng; diff --git a/crates/milli/tests/search/typo_tolerance.rs b/crates/milli/tests/search/typo_tolerance.rs index 535c1c3d7..581aa3c6f 100644 --- a/crates/milli/tests/search/typo_tolerance.rs +++ b/crates/milli/tests/search/typo_tolerance.rs @@ -4,7 +4,7 @@ use bumpalo::Bump; use heed::EnvOpenOptions; use milli::documents::mmap_from_objects; use milli::update::new::indexer; -use milli::update::{IndexDocumentsConfig, IndexDocumentsMethod, IndexerConfig, Settings}; +use milli::update::{IndexDocumentsMethod, IndexerConfig, Settings}; use milli::vector::EmbeddingConfigs; use milli::{Criterion, Index, Object, Search, TermsMatchingStrategy}; use serde_json::from_value;
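Editor's note: as a closing reference, here is a hedged sketch of how the restored builder API is driven end to end, mirroring the `import_dump` hunk at the top of this diff (`with_embedders`, `add_documents`, `execute`, commit). Only those calls are taken from the diff; the `IndexDocuments::new` argument list, the default configs, and the surrounding setup are assumptions for illustration, not part of this change.

```rust
use milli::documents::DocumentsBatchReader;
use milli::update::{IndexDocuments, IndexDocumentsConfig, IndexerConfig};
use milli::vector::EmbeddingConfigs;
use milli::Index;

// Sketch only: index one documents batch through the restored builder, as import_dump does.
fn index_batch(index: &Index, reader: DocumentsBatchReader<std::fs::File>) -> anyhow::Result<u64> {
    let mut wtxn = index.write_txn()?;
    let indexer_config = IndexerConfig::default();
    let config = IndexDocumentsConfig::default();

    // Assumed constructor shape; the diff only shows the builder being used, not built.
    let builder = IndexDocuments::new(
        &mut wtxn,
        index,
        &indexer_config,
        config,
        |_step| (), // progress callback
        || false,   // should_abort
    )?;
    let builder = builder.with_embedders(EmbeddingConfigs::default());

    // These calls are the ones plugged back into `import_dump` by this diff.
    let (builder, user_result) = builder.add_documents(reader)?;
    let documents_found = user_result?;
    builder.execute()?;
    wtxn.commit()?;
    Ok(documents_found)
}
```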