diff --git a/Cargo.lock b/Cargo.lock index e78372421..c04b4f48a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2041,7 +2041,9 @@ name = "fuzzers" version = "1.11.0" dependencies = [ "arbitrary", + "bumpalo", "clap", + "either", "fastrand", "milli", "serde", diff --git a/crates/benchmarks/benches/utils.rs b/crates/benchmarks/benches/utils.rs index b848560ad..01bc454a3 100644 --- a/crates/benchmarks/benches/utils.rs +++ b/crates/benchmarks/benches/utils.rs @@ -140,7 +140,7 @@ pub fn run_benches(c: &mut criterion::Criterion, confs: &[Conf]) { } } -pub fn documents_from(filename: &str, filetype: &str) -> DocumentsBatchReader<impl BufRead + Seek> { +pub fn documents_from(filename: &str, filetype: &str) -> Mmap { let reader = File::open(filename) .unwrap_or_else(|_| panic!("could not find the dataset in: {}", filename)); let reader = BufReader::new(reader); diff --git a/crates/fuzzers/Cargo.toml b/crates/fuzzers/Cargo.toml index 6ebd37906..86a0f779d 100644 --- a/crates/fuzzers/Cargo.toml +++ b/crates/fuzzers/Cargo.toml @@ -12,7 +12,9 @@ license.workspace = true [dependencies] arbitrary = { version = "1.3.2", features = ["derive"] } +bumpalo = "3.16.0" clap = { version = "4.5.9", features = ["derive"] } +either = "1.13.0" fastrand = "2.1.0" milli = { path = "../milli" } serde = { version = "1.0.204", features = ["derive"] } diff --git a/crates/fuzzers/src/bin/fuzz-indexing.rs b/crates/fuzzers/src/bin/fuzz-indexing.rs index baf705709..3c4aea576 100644 --- a/crates/fuzzers/src/bin/fuzz-indexing.rs +++ b/crates/fuzzers/src/bin/fuzz-indexing.rs @@ -4,11 +4,17 @@ use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::time::Duration; use arbitrary::{Arbitrary, Unstructured}; +use bumpalo::Bump; use clap::Parser; +use either::Either; use fuzzers::Operation; +use milli::documents::mmap_from_objects; use milli::heed::EnvOpenOptions; -use milli::update::{IndexDocuments, IndexDocumentsConfig, IndexerConfig}; +use milli::update::new::indexer; +use milli::update::{IndexDocumentsMethod, IndexerConfig}; +use milli::vector::EmbeddingConfigs; use milli::Index; +use serde_json::Value; use tempfile::TempDir; #[derive(Debug, Arbitrary)] @@ -58,7 +64,6 @@ fn main() { }; let index = Index::new(options, tempdir.path()).unwrap(); let indexer_config = IndexerConfig::default(); - let index_documents_config = IndexDocumentsConfig::default(); std::thread::scope(|s| { loop { @@ -75,38 +80,69 @@ fn main() { let handle = s.spawn(|| { let mut wtxn = index.write_txn().unwrap(); + let rtxn = index.read_txn().unwrap(); for batch in batches { - let mut builder = IndexDocuments::new( - &mut wtxn, - &index, - &indexer_config, - index_documents_config.clone(), - |_| (), - || false, - ) - .unwrap(); + let db_fields_ids_map = index.fields_ids_map(&rtxn).unwrap(); + let mut new_fields_ids_map = db_fields_ids_map.clone(); + let indexer_alloc = Bump::new(); + let embedders = EmbeddingConfigs::default(); + let mut indexer = indexer::DocumentOperation::new( + IndexDocumentsMethod::ReplaceDocuments, + ); + + let mut operations = Vec::new(); for op in batch.0 { match op { Operation::AddDoc(doc) => { - let documents = - milli::documents::objects_from_json_value(doc.to_d()); - let documents = - milli::documents::documents_batch_reader_from_objects( - documents, - ); - let (b, _added) = builder.add_documents(documents).unwrap(); - builder = b; + let object = match doc.to_d() { + Value::Object(object) => object, + _ => unreachable!(), + }; + let documents = mmap_from_objects(vec![object]); + operations.push(Either::Left(documents)); }
Operation::DeleteDoc(id) => { - let (b, _removed) = - builder.remove_documents(vec![id.to_s()]).unwrap(); - builder = b; + let id = indexer_alloc.alloc_str(&id.to_s()); + let ids = indexer_alloc.alloc_slice_copy(&[&*id]); + operations.push(Either::Right(ids)); } } } - builder.execute().unwrap(); + + for op in &operations { + match op { + Either::Left(documents) => { + indexer.add_documents(documents).unwrap() + } + Either::Right(ids) => indexer.delete_documents(ids), + } + } + + let (document_changes, _operation_stats, primary_key) = indexer + .into_changes( + &indexer_alloc, + &index, + &rtxn, + None, + &mut new_fields_ids_map, + ) + .unwrap(); + + indexer::index( + &mut wtxn, + &index, + indexer_config.grenad_parameters(), + &db_fields_ids_map, + new_fields_ids_map, + primary_key, + &document_changes, + embedders, + &|| false, + &|_| (), + ) + .unwrap(); // after executing a batch we check if the database is corrupted let res = index.search(&wtxn).execute().unwrap(); diff --git a/crates/meilisearch/src/lib.rs b/crates/meilisearch/src/lib.rs index 633ad2776..2f7c35305 100644 --- a/crates/meilisearch/src/lib.rs +++ b/crates/meilisearch/src/lib.rs @@ -448,11 +448,12 @@ fn import_dump( let builder = builder.with_embedders(embedders); - let (builder, user_result) = builder.add_documents(reader)?; - let user_result = user_result?; - tracing::info!(documents_found = user_result, "{} documents found.", user_result); - builder.execute()?; - wtxn.commit()?; + todo!("please plug the dump load of main"); + // let (builder, user_result) = builder.add_documents(reader)?; + // let user_result = user_result?; + // tracing::info!(documents_found = user_result, "{} documents found.", user_result); + // builder.execute()?; + // wtxn.commit()?; tracing::info!("All documents successfully imported."); } diff --git a/crates/milli/examples/index.rs b/crates/milli/examples/index.rs deleted file mode 100644 index 781440b56..000000000 --- a/crates/milli/examples/index.rs +++ /dev/null @@ -1,114 +0,0 @@ -use std::error::Error; -use std::fs::File; -use std::io::{BufRead, BufReader, Cursor, Seek}; -use std::path::Path; - -use heed::EnvOpenOptions; -use milli::documents::{DocumentsBatchBuilder, DocumentsBatchReader}; -use milli::update::{IndexDocuments, IndexDocumentsConfig, IndexerConfig, Settings}; -use milli::{Index, Object}; - -fn usage(error: &str, program_name: &str) -> String { - format!( - "{}. 
Usage: {} <PATH-TO-INDEX> <PATH-TO-DATASET> [searchable_fields] [filterable_fields]", - error, program_name - ) -} - -fn main() -> Result<(), Box<dyn Error>> { - let mut args = std::env::args(); - let program_name = args.next().expect("No program name"); - let index_path = - args.next().unwrap_or_else(|| panic!("{}", usage("Missing path to index.", &program_name))); - let dataset_path = args - .next() - .unwrap_or_else(|| panic!("{}", usage("Missing path to source dataset.", &program_name))); - // let primary_key = args.next().unwrap_or_else(|| "id".into()); - // "title overview" - let searchable_fields: Vec<String> = args - .next() - .map(|arg| arg.split_whitespace().map(ToString::to_string).collect()) - .unwrap_or_default(); - - println!("{searchable_fields:?}"); - // "release_date genres" - let filterable_fields: Vec<String> = args - .next() - .map(|arg| arg.split_whitespace().map(ToString::to_string).collect()) - .unwrap_or_default(); - - let mut options = EnvOpenOptions::new(); - options.map_size(100 * 1024 * 1024 * 1024); // 100 GB - - std::fs::create_dir_all(&index_path).unwrap(); - let index = Index::new(options, index_path).unwrap(); - let mut wtxn = index.write_txn().unwrap(); - - let config = IndexerConfig::default(); - let mut builder = Settings::new(&mut wtxn, &index, &config); - // builder.set_primary_key(primary_key); - let searchable_fields = searchable_fields.iter().map(|s| s.to_string()).collect(); - builder.set_searchable_fields(searchable_fields); - let filterable_fields = filterable_fields.iter().map(|s| s.to_string()).collect(); - builder.set_filterable_fields(filterable_fields); - - builder.execute(|_| (), || false).unwrap(); - - let config = IndexerConfig::default(); - let indexing_config = IndexDocumentsConfig::default(); - - let builder = - IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| (), || false).unwrap(); - - let documents = documents_from( - &dataset_path, - Path::new(&dataset_path).extension().unwrap_or_default().to_str().unwrap_or_default(), - ); - let (builder, user_error) = builder.add_documents(documents).unwrap(); - user_error.unwrap(); - builder.execute().unwrap(); - wtxn.commit().unwrap(); - - index.prepare_for_closing().wait(); - Ok(()) -} -fn documents_from(filename: &str, filetype: &str) -> DocumentsBatchReader<impl BufRead + Seek> { - let reader = File::open(filename) - .unwrap_or_else(|_| panic!("could not find the dataset in: {}", filename)); - let reader = BufReader::new(reader); - let documents = match filetype { - "csv" => documents_from_csv(reader).unwrap(), - "json" => documents_from_json(reader).unwrap(), - "jsonl" => documents_from_jsonl(reader).unwrap(), - otherwise => panic!("invalid update format {:?}", otherwise), - }; - DocumentsBatchReader::from_reader(Cursor::new(documents)).unwrap() -} - -fn documents_from_jsonl(reader: impl BufRead) -> milli::Result<Vec<u8>> { - let mut documents = DocumentsBatchBuilder::new(Vec::new()); - - for result in serde_json::Deserializer::from_reader(reader).into_iter::<Object>() { - let object = result.unwrap(); - documents.append_json_object(&object)?; - } - - documents.into_inner().map_err(Into::into) -} - -fn documents_from_json(reader: impl BufRead) -> milli::Result<Vec<u8>> { - let mut documents = DocumentsBatchBuilder::new(Vec::new()); - - documents.append_json_array(reader)?; - - documents.into_inner().map_err(Into::into) -} - -fn documents_from_csv(reader: impl BufRead) -> milli::Result<Vec<u8>> { - let csv = csv::Reader::from_reader(reader); - - let mut documents = DocumentsBatchBuilder::new(Vec::new()); - documents.append_csv(csv)?; - - documents.into_inner().map_err(Into::into) -}
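Note: the example deleted above (and `examples/search.rs` deleted below) exercised the old `IndexDocuments` builder. The replacement pattern, used throughout the rest of this diff, is the new `indexer::DocumentOperation` pipeline. A minimal sketch of that flow, under stated assumptions: the function name `index_ndjson` and the mmapped NDJSON `payload` argument are illustrative placeholders, and the rayon `pool.install` wiring shown in the test code is omitted.

    fn index_ndjson(index: &milli::Index, payload: &memmap2::Mmap) -> milli::Result<()> {
        use bumpalo::Bump;
        use milli::update::new::indexer;
        use milli::update::{IndexDocumentsMethod, IndexerConfig};
        use milli::vector::EmbeddingConfigs;

        let mut wtxn = index.write_txn()?;
        let rtxn = index.read_txn()?;
        // The indexer works on a copy of the fields-ids map and returns the new one.
        let db_fields_ids_map = index.fields_ids_map(&rtxn)?;
        let mut new_fields_ids_map = db_fields_ids_map.clone();

        let indexer_config = IndexerConfig::default();
        let embedders = EmbeddingConfigs::default();
        let indexer_alloc = Bump::new();

        // Queue additions (and/or deletions), then turn them into document changes.
        let mut indexer = indexer::DocumentOperation::new(IndexDocumentsMethod::ReplaceDocuments);
        indexer.add_documents(payload)?;
        let (document_changes, _operation_stats, primary_key) =
            indexer.into_changes(&indexer_alloc, index, &rtxn, None, &mut new_fields_ids_map)?;

        // Apply the changes; the two trailing closures are the should-abort
        // and progress callbacks.
        indexer::index(
            &mut wtxn,
            index,
            indexer_config.grenad_parameters(),
            &db_fields_ids_map,
            new_fields_ids_map,
            primary_key,
            &document_changes,
            embedders,
            &|| false,
            &|_| (),
        )?;
        wtxn.commit()?;
        Ok(())
    }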
diff --git a/crates/milli/examples/search.rs b/crates/milli/examples/search.rs deleted file mode 100644 index bb374f629..000000000 --- a/crates/milli/examples/search.rs +++ /dev/null @@ -1,124 +0,0 @@ -use std::error::Error; -use std::io::stdin; -use std::path::Path; -use std::time::Instant; - -use heed::EnvOpenOptions; -use milli::{ - execute_search, filtered_universe, DefaultSearchLogger, GeoSortStrategy, Index, SearchContext, - SearchLogger, TermsMatchingStrategy, TimeBudget, -}; - -#[global_allocator] -static ALLOC: mimalloc::MiMalloc = mimalloc::MiMalloc; - -fn main() -> Result<(), Box<dyn Error>> { - let mut args = std::env::args(); - let program_name = args.next().expect("No program name"); - let dataset = args.next().unwrap_or_else(|| { - panic!( - "Missing path to index. Usage: {} <PATH-TO-INDEX> [<logger-dir>] [print-documents]", - program_name - ) - }); - let detailed_logger_dir = args.next(); - let print_documents: bool = - if let Some(arg) = args.next() { arg == "print-documents" } else { false }; - - let mut options = EnvOpenOptions::new(); - options.map_size(100 * 1024 * 1024 * 1024); // 100 GB - - let index = Index::new(options, dataset)?; - let txn = index.read_txn()?; - let mut query = String::new(); - while stdin().read_line(&mut query)? > 0 { - for _ in 0..2 { - let mut default_logger = DefaultSearchLogger; - // FIXME: consider resetting the state of the logger between search executions as otherwise panics are possible. - // Worked around here by recreating the logger on each iteration of the loop - let mut detailed_logger = detailed_logger_dir - .as_ref() - .map(|logger_dir| (milli::VisualSearchLogger::default(), logger_dir)); - let logger: &mut dyn SearchLogger<_> = - if let Some((detailed_logger, _)) = detailed_logger.as_mut() { - detailed_logger - } else { - &mut default_logger - }; - - let start = Instant::now(); - - let mut ctx = SearchContext::new(&index, &txn)?; - let universe = filtered_universe(ctx.index, ctx.txn, &None)?; - - let docs = execute_search( - &mut ctx, - (!query.trim().is_empty()).then(|| query.trim()), - TermsMatchingStrategy::Last, - milli::score_details::ScoringStrategy::Skip, - false, - universe, - &None, - &None, - GeoSortStrategy::default(), - 0, - 20, - None, - &mut DefaultSearchLogger, - logger, - TimeBudget::max(), - None, - None, - )?; - if let Some((logger, dir)) = detailed_logger { - logger.finish(&mut ctx, Path::new(dir))?; - } - let elapsed = start.elapsed(); - println!("new: {}us, docids: {:?}", elapsed.as_micros(), docs.documents_ids); - if print_documents { - let documents = index - .documents(&txn, docs.documents_ids.iter().copied()) - .unwrap() - .into_iter() - .map(|(id, obkv)| { - let mut object = serde_json::Map::default(); - for (fid, fid_name) in index.fields_ids_map(&txn).unwrap().iter() { - let value = obkv.get(fid).unwrap(); - let value: serde_json::Value = serde_json::from_slice(value).unwrap(); - object.insert(fid_name.to_owned(), value); - } - (id, serde_json::to_string_pretty(&object).unwrap()) - }) - .collect::<Vec<_>>(); - - for (id, document) in documents { - println!("{id}:"); - println!("{document}"); - } - - let documents = index - .documents(&txn, docs.documents_ids.iter().copied()) - .unwrap() - .into_iter() - .map(|(id, obkv)| { - let mut object = serde_json::Map::default(); - for (fid, fid_name) in index.fields_ids_map(&txn).unwrap().iter() { - let value = obkv.get(fid).unwrap(); - let value: serde_json::Value = serde_json::from_slice(value).unwrap(); - object.insert(fid_name.to_owned(), value); - } - (id, serde_json::to_string_pretty(&object).unwrap())
- }) - .collect::<Vec<_>>(); - println!("{}us: {:?}", elapsed.as_micros(), docs.documents_ids); - for (id, document) in documents { - println!("{id}:"); - println!("{document}"); - } - } - } - query.clear(); - } - - Ok(()) -} diff --git a/crates/milli/examples/settings.rs b/crates/milli/examples/settings.rs deleted file mode 100644 index c7f4780cb..000000000 --- a/crates/milli/examples/settings.rs +++ /dev/null @@ -1,33 +0,0 @@ -// use big_s::S; -use heed::EnvOpenOptions; -// use maplit::hashset; -use milli::{ - update::{IndexerConfig, Settings}, - Criterion, Index, -}; - -fn main() { - let mut options = EnvOpenOptions::new(); - options.map_size(100 * 1024 * 1024 * 1024); // 100 GB - - let index = Index::new(options, "data_movies.ms").unwrap(); - let mut wtxn = index.write_txn().unwrap(); - - let config = IndexerConfig::default(); - let mut builder = Settings::new(&mut wtxn, &index, &config); - - // builder.set_min_word_len_one_typo(5); - // builder.set_min_word_len_two_typos(7); - // builder.set_sortable_fields(hashset! { S("release_date") }); - builder.set_criteria(vec![ - Criterion::Words, - Criterion::Typo, - Criterion::Proximity, - Criterion::Attribute, - Criterion::Sort, - Criterion::Exactness, - ]); - - builder.execute(|_| (), || false).unwrap(); - wtxn.commit().unwrap(); -} diff --git a/crates/milli/src/documents/mod.rs b/crates/milli/src/documents/mod.rs index 001e2293a..64ad5dbab 100644 --- a/crates/milli/src/documents/mod.rs +++ b/crates/milli/src/documents/mod.rs @@ -150,11 +150,24 @@ pub fn objects_from_json_value(json: serde_json::Value) -> Vec<Object> { macro_rules! documents { ($data:tt) => {{ let documents = serde_json::json!($data); - let documents = $crate::documents::objects_from_json_value(documents); - $crate::documents::documents_batch_reader_from_objects(documents) + let mut file = tempfile::tempfile().unwrap(); + for document in documents.as_array().unwrap() { + serde_json::to_writer(&mut file, &document).unwrap(); + } + file.sync_all().unwrap(); + unsafe { memmap2::Mmap::map(&file).unwrap() } }}; } +pub fn mmap_from_objects(objects: impl IntoIterator<Item = Object>) -> memmap2::Mmap { + let mut writer = tempfile::tempfile().map(std::io::BufWriter::new).unwrap(); + for object in objects { + serde_json::to_writer(&mut writer, &object).unwrap(); + } + let file = writer.into_inner().unwrap(); + unsafe { memmap2::Mmap::map(&file).unwrap() } +} + pub fn documents_batch_reader_from_objects( objects: impl IntoIterator<Item = Object>, ) -> DocumentsBatchReader<std::io::Cursor<Vec<u8>>> { @@ -224,20 +237,6 @@ mod test { assert!(documents.next_document().unwrap().is_none()); } - #[test] - fn test_nested() { - let docs_reader = documents!([{ - "hello": { - "toto": ["hello"] - } - }]); - - let (mut cursor, _) = docs_reader.into_cursor_and_fields_index(); - let doc = cursor.next_document().unwrap().unwrap(); - let nested: Value = serde_json::from_slice(doc.get(0).unwrap()).unwrap(); - assert_eq!(nested, json!({ "toto": ["hello"] })); - } - #[test] fn out_of_order_json_fields() { let _documents = documents!([ diff --git a/crates/milli/src/index.rs b/crates/milli/src/index.rs index 08a8e36f8..7c6b6e9f2 100644 --- a/crates/milli/src/index.rs +++ b/crates/milli/src/index.rs @@ -1680,19 +1680,23 @@ pub(crate) mod tests { use std::ops::Deref; use big_s::S; + use bumpalo::Bump; use heed::{EnvOpenOptions, RwTxn}; use maplit::{btreemap, hashset}; + use memmap2::Mmap; use tempfile::TempDir; - use crate::documents::DocumentsBatchReader; use crate::error::{Error, InternalError}; use crate::index::{DEFAULT_MIN_WORD_LEN_ONE_TYPO, 
DEFAULT_MIN_WORD_LEN_TWO_TYPOS}; + use crate::update::new::indexer; use crate::update::{ - self, IndexDocuments, IndexDocumentsConfig, IndexDocumentsMethod, IndexerConfig, Setting, - Settings, + self, IndexDocumentsConfig, IndexDocumentsMethod, IndexerConfig, Setting, Settings, }; use crate::vector::settings::{EmbedderSource, EmbeddingSettings}; - use crate::{db_snap, obkv_to_json, Filter, Index, Search, SearchResult}; + use crate::vector::EmbeddingConfigs; + use crate::{ + db_snap, obkv_to_json, Filter, Index, Search, SearchResult, ThreadPoolNoAbortBuilder, + }; pub(crate) struct TempIndex { pub inner: Index, @@ -1725,35 +1729,60 @@ pub fn new() -> Self { Self::new_with_map_size(4096 * 2000) } - pub fn add_documents_using_wtxn<'t, R>( + + pub fn add_documents_using_wtxn<'t>( &'t self, wtxn: &mut RwTxn<'t>, - documents: DocumentsBatchReader<R>, - ) -> Result<(), crate::error::Error> - where - R: std::io::Read + std::io::Seek, - { - let builder = IndexDocuments::new( - wtxn, - self, - &self.indexer_config, - self.index_documents_config.clone(), - |_| (), - || false, - ) - .unwrap(); - let (builder, user_error) = builder.add_documents(documents).unwrap(); - user_error?; - builder.execute()?; + documents: Mmap, + ) -> Result<(), crate::error::Error> { + let local_pool; + let indexer_config = &self.indexer_config; + let pool = match &indexer_config.thread_pool { + Some(pool) => pool, + None => { + local_pool = ThreadPoolNoAbortBuilder::new().build().unwrap(); + &local_pool + } + }; + + let rtxn = self.inner.read_txn()?; + let db_fields_ids_map = self.inner.fields_ids_map(&rtxn)?; + let mut new_fields_ids_map = db_fields_ids_map.clone(); + + let embedders = EmbeddingConfigs::default(); + let mut indexer = + indexer::DocumentOperation::new(IndexDocumentsMethod::ReplaceDocuments); + indexer.add_documents(&documents).unwrap(); + + let indexer_alloc = Bump::new(); + let (document_changes, _operation_stats, primary_key) = indexer.into_changes( + &indexer_alloc, + &self.inner, + &rtxn, + None, + &mut new_fields_ids_map, + )?; + + pool.install(|| { + indexer::index( + wtxn, + &self.inner, + indexer_config.grenad_parameters(), + &db_fields_ids_map, + new_fields_ids_map, + primary_key, + &document_changes, + embedders, + &|| false, + &|_| (), + ) + }) + .unwrap()?; + Ok(()) } - pub fn add_documents<R>( - &self, - documents: DocumentsBatchReader<R>, - ) -> Result<(), crate::error::Error> - where - R: std::io::Read + std::io::Seek, - { + + pub fn add_documents(&self, documents: Mmap) -> Result<(), crate::error::Error> { let mut wtxn = self.write_txn().unwrap(); self.add_documents_using_wtxn(&mut wtxn, documents)?; wtxn.commit().unwrap(); @@ -1769,6 +1798,7 @@ wtxn.commit().unwrap(); Ok(()) } + pub fn update_settings_using_wtxn<'t>( &'t self, wtxn: &mut RwTxn<'t>, @@ -1784,19 +1814,54 @@ &'t self, wtxn: &mut RwTxn<'t>, external_document_ids: Vec<String>, - ) { - let builder = IndexDocuments::new( - wtxn, - self, - &self.indexer_config, - self.index_documents_config.clone(), - |_| (), - || false, - ) - .unwrap(); - let (builder, user_error) = builder.remove_documents(external_document_ids).unwrap(); - user_error.unwrap(); - builder.execute().unwrap(); + ) -> Result<(), crate::error::Error> { + let local_pool; + let indexer_config = &self.indexer_config; + let pool = match &indexer_config.thread_pool { + Some(pool) => pool, + None => { + local_pool = ThreadPoolNoAbortBuilder::new().build().unwrap(); + &local_pool + } + }; + + let rtxn = 
self.inner.read_txn()?; + let db_fields_ids_map = self.inner.fields_ids_map(&rtxn)?; + let mut new_fields_ids_map = db_fields_ids_map.clone(); + + let embedders = EmbeddingConfigs::default(); + let mut indexer = + indexer::DocumentOperation::new(IndexDocumentsMethod::ReplaceDocuments); + let external_document_ids: Vec<_> = + external_document_ids.iter().map(AsRef::as_ref).collect(); + indexer.delete_documents(external_document_ids.as_slice()); + + let indexer_alloc = Bump::new(); + let (document_changes, _operation_stats, primary_key) = indexer.into_changes( + &indexer_alloc, + &self.inner, + &rtxn, + None, + &mut new_fields_ids_map, + )?; + + pool.install(|| { + indexer::index( + wtxn, + &self.inner, + indexer_config.grenad_parameters(), + &db_fields_ids_map, + new_fields_ids_map, + primary_key, + &document_changes, + embedders, + &|| false, + &|_| (), + ) + }) + .unwrap()?; + + Ok(()) } pub fn delete_documents(&self, external_document_ids: Vec<String>) { @@ -1819,29 +1884,55 @@ let index = TempIndex::new(); let mut wtxn = index.inner.write_txn().unwrap(); let should_abort = AtomicBool::new(false); - let builder = IndexDocuments::new( - &mut wtxn, - &index.inner, - &index.indexer_config, - index.index_documents_config.clone(), - |_| (), - || should_abort.load(Relaxed), - ) - .unwrap(); - let (builder, user_error) = builder - .add_documents(documents!([ - { "id": 1, "name": "kevin" }, - { "id": 2, "name": "bob", "age": 20 }, - { "id": 2, "name": "bob", "age": 20 }, - ])) + let local_pool; + let indexer_config = &index.indexer_config; + let pool = match &indexer_config.thread_pool { + Some(pool) => pool, + None => { + local_pool = ThreadPoolNoAbortBuilder::new().build().unwrap(); + &local_pool + } + }; + + let rtxn = index.inner.read_txn().unwrap(); + let db_fields_ids_map = index.inner.fields_ids_map(&rtxn).unwrap(); + let mut new_fields_ids_map = db_fields_ids_map.clone(); + + let embedders = EmbeddingConfigs::default(); + let mut indexer = indexer::DocumentOperation::new(IndexDocumentsMethod::ReplaceDocuments); + let payload = documents!([ + { "id": 1, "name": "kevin" }, + { "id": 2, "name": "bob", "age": 20 }, + { "id": 2, "name": "bob", "age": 20 }, + ]); + indexer.add_documents(&payload); + + let indexer_alloc = Bump::new(); + let (document_changes, _operation_stats, primary_key) = indexer + .into_changes(&indexer_alloc, &index.inner, &rtxn, None, &mut new_fields_ids_map) .unwrap(); - user_error.unwrap(); should_abort.store(true, Relaxed); - let err = builder.execute().unwrap_err(); + + let err = pool + .install(|| { + indexer::index( + &mut wtxn, + &index.inner, + indexer_config.grenad_parameters(), + &db_fields_ids_map, + new_fields_ids_map, + primary_key, + &document_changes, + embedders, + &|| should_abort.load(Relaxed), + &|_| (), + ) + }) + .unwrap() + .unwrap_err(); assert!(matches!(err, Error::InternalError(InternalError::AbortedIndexation))); } diff --git a/crates/milli/src/search/facet/facet_distribution.rs b/crates/milli/src/search/facet/facet_distribution.rs index a63bb634b..9f363528c 100644 --- a/crates/milli/src/search/facet/facet_distribution.rs +++ b/crates/milli/src/search/facet/facet_distribution.rs @@ -407,7 +407,7 @@ mod tests { use big_s::S; use maplit::hashset; - use crate::documents::documents_batch_reader_from_objects; + use crate::documents::mmap_from_objects; use crate::index::tests::TempIndex; use crate::{milli_snap, FacetDistribution, OrderBy}; @@ -508,8 +508,7 @@ documents.push(document); } - let documents = 
documents_batch_reader_from_objects(documents); - + let documents = mmap_from_objects(documents); index.add_documents(documents).unwrap(); let txn = index.read_txn().unwrap(); @@ -594,8 +593,7 @@ documents.push(document); } - let documents = documents_batch_reader_from_objects(documents); - + let documents = mmap_from_objects(documents); index.add_documents(documents).unwrap(); let txn = index.read_txn().unwrap(); @@ -654,8 +652,7 @@ documents.push(document); } - let documents = documents_batch_reader_from_objects(documents); - + let documents = mmap_from_objects(documents); index.add_documents(documents).unwrap(); let txn = index.read_txn().unwrap(); @@ -706,8 +703,7 @@ documents.push(document); } - let documents = documents_batch_reader_from_objects(documents); - + let documents = mmap_from_objects(documents); index.add_documents(documents).unwrap(); let txn = index.read_txn().unwrap(); @@ -758,8 +754,7 @@ documents.push(document); } - let documents = documents_batch_reader_from_objects(documents); - + let documents = mmap_from_objects(documents); index.add_documents(documents).unwrap(); let txn = index.read_txn().unwrap(); @@ -814,8 +809,7 @@ documents.push(document); } - let documents = documents_batch_reader_from_objects(documents); - + let documents = mmap_from_objects(documents); index.add_documents(documents).unwrap(); let txn = index.read_txn().unwrap(); diff --git a/crates/milli/src/search/new/tests/integration.rs b/crates/milli/src/search/new/tests/integration.rs index e2ea4580e..25d63ed6e 100644 --- a/crates/milli/src/search/new/tests/integration.rs +++ b/crates/milli/src/search/new/tests/integration.rs @@ -1,11 +1,16 @@ -use std::io::Cursor; +use std::io::Write; use big_s::S; +use bumpalo::Bump; use heed::EnvOpenOptions; use maplit::{btreemap, hashset}; use crate::documents::{DocumentsBatchBuilder, DocumentsBatchReader}; -use crate::update::{IndexDocuments, IndexDocumentsConfig, IndexerConfig, Settings}; +use crate::update::new::indexer; +use crate::update::{ IndexDocuments, IndexDocumentsConfig, IndexDocumentsMethod, IndexerConfig, Settings, }; +use crate::vector::EmbeddingConfigs; use crate::{db_snap, Criterion, Index, Object}; pub const CONTENT: &str = include_str!("../../../../tests/assets/test_set.ndjson"); @@ -16,6 +21,7 @@ pub fn setup_search_index_with_criteria(criteria: &[Criterion]) -> Index { let index = Index::new(options, &path).unwrap(); let mut wtxn = index.write_txn().unwrap(); + let rtxn = index.read_txn().unwrap(); let config = IndexerConfig::default(); let mut builder = Settings::new(&mut wtxn, &index, &config); @@ -43,27 +49,41 @@ // index documents let config = IndexerConfig { max_memory: Some(10 * 1024 * 1024), ..Default::default() }; - let indexing_config = IndexDocumentsConfig::default(); - let builder = - IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| (), || false).unwrap(); - let mut documents_builder = DocumentsBatchBuilder::new(Vec::new()); - let reader = Cursor::new(CONTENT.as_bytes()); + let db_fields_ids_map = index.fields_ids_map(&rtxn).unwrap(); + let mut new_fields_ids_map = db_fields_ids_map.clone(); - for result in serde_json::Deserializer::from_reader(reader).into_iter::<Object>() { - let object = result.unwrap(); - documents_builder.append_json_object(&object).unwrap(); - } + let embedders = EmbeddingConfigs::default(); + let mut indexer = 
indexer::DocumentOperation::new(IndexDocumentsMethod::ReplaceDocuments); - let vector = documents_builder.into_inner().unwrap(); + let mut file = tempfile::tempfile().unwrap(); + file.write_all(CONTENT.as_bytes()).unwrap(); + file.sync_all().unwrap(); + let payload = unsafe { memmap2::Mmap::map(&file).unwrap() }; // index documents - let content = DocumentsBatchReader::from_reader(Cursor::new(vector)).unwrap(); - let (builder, user_error) = builder.add_documents(content).unwrap(); - user_error.unwrap(); - builder.execute().unwrap(); + indexer.add_documents(&payload).unwrap(); + + let indexer_alloc = Bump::new(); + let (document_changes, _operation_stats, primary_key) = + indexer.into_changes(&indexer_alloc, &index, &rtxn, None, &mut new_fields_ids_map).unwrap(); + + indexer::index( + &mut wtxn, + &index, + config.grenad_parameters(), + &db_fields_ids_map, + new_fields_ids_map, + primary_key, + &document_changes, + embedders, + &|| false, + &|_| (), + ) + .unwrap(); wtxn.commit().unwrap(); + drop(rtxn); index } diff --git a/crates/milli/src/update/facet/bulk.rs b/crates/milli/src/update/facet/bulk.rs index 19dfc310b..378b6b6da 100644 --- a/crates/milli/src/update/facet/bulk.rs +++ b/crates/milli/src/update/facet/bulk.rs @@ -369,7 +369,7 @@ mod tests { use big_s::S; use maplit::hashset; use roaring::RoaringBitmap; - use crate::documents::documents_batch_reader_from_objects; + use crate::documents::{documents_batch_reader_from_objects, mmap_from_objects}; use crate::heed_codec::facet::OrderedF64Codec; use crate::heed_codec::StrRefCodec; use crate::index::tests::TempIndex; @@ -492,8 +492,8 @@ ); } - let documents = documents_batch_reader_from_objects(documents); - index.add_documents(documents).unwrap(); + let documents = mmap_from_objects(documents); + index.add_documents(documents).unwrap(); db_snap!(index, facet_id_f64_docids, "initial", @"c34f499261f3510d862fa0283bbe843a"); } diff --git a/crates/milli/src/update/index_documents/mod.rs b/crates/milli/src/update/index_documents/mod.rs index 8ac183241..bea26db83 100644 --- a/crates/milli/src/update/index_documents/mod.rs +++ b/crates/milli/src/update/index_documents/mod.rs @@ -1,33 +1,38 @@ mod enrich; mod extract; mod helpers; -mod parallel; mod transform; mod typed_chunk; use std::collections::HashSet; +use std::iter; use std::num::NonZeroU32; +use std::sync::Arc; -use grenad::Merger; +use crossbeam_channel::{Receiver, Sender}; +use grenad::{Merger, MergerBuilder}; +use hashbrown::HashMap; use heed::types::Str; use heed::Database; +use rand::SeedableRng as _; +use roaring::RoaringBitmap; use serde::{Deserialize, Serialize}; use slice_group_by::GroupBy; -use typed_chunk::TypedChunk; +use tracing::debug; +use typed_chunk::{write_typed_chunk_into_index, ChunkAccumulator, TypedChunk}; pub use self::enrich::{extract_finite_float_from_value, DocumentId}; pub use self::helpers::*; pub use self::transform::{Transform, TransformOutput}; -use crate::documents::{obkv_to_object, DocumentsBatchBuilder, DocumentsBatchReader}; -use crate::error::{Error, InternalError, UserError}; +use crate::documents::obkv_to_object; +use crate::error::{Error, InternalError}; use crate::thread_pool_no_abort::ThreadPoolNoAbortBuilder; pub use crate::update::index_documents::helpers::CursorClonableMmap; -use crate::update::index_documents::parallel::ImmutableObkvs; use crate::update::{ IndexerConfig, UpdateIndexingStep, WordPrefixDocids, WordPrefixIntegerDocids, WordsPrefixesFst, }; use crate::vector::{ArroyWrapper, EmbeddingConfigs}; -use crate::{CboRoaringBitmapCodec, Index, 
Object, Result}; +use crate::{CboRoaringBitmapCodec, Index, Result}; static MERGED_DATABASE_COUNT: usize = 7; static PREFIX_DATABASE_COUNT: usize = 4; @@ -122,6 +127,338 @@ self } + /// Returns the total number of documents in the index after the update. + #[tracing::instrument( + level = "trace", + skip_all, + target = "indexing::details", + name = "index_documents_raw" + )] + pub fn execute_raw(self, output: TransformOutput) -> Result<u64> + where + FP: Fn(UpdateIndexingStep) + Sync, + FA: Fn() -> bool + Sync, + { + let TransformOutput { + primary_key, + mut settings_diff, + field_distribution, + documents_count, + original_documents, + flattened_documents, + } = output; + + // update the internal facet and searchable list, + // because they might have changed due to the nested documents flattening. + settings_diff.new.recompute_facets(self.wtxn, self.index)?; + settings_diff.new.recompute_searchables(self.wtxn, self.index)?; + + let settings_diff = Arc::new(settings_diff); + let embedders_configs = Arc::new(self.index.embedding_configs(self.wtxn)?); + + let possible_embedding_mistakes = + crate::vector::error::PossibleEmbeddingMistakes::new(&field_distribution); + + let backup_pool; + let pool = match self.indexer_config.thread_pool { + Some(ref pool) => pool, + None => { + // We initialize a backup pool with the default + // settings if none have already been set. + #[allow(unused_mut)] + let mut pool_builder = ThreadPoolNoAbortBuilder::new(); + + #[cfg(test)] + { + pool_builder = pool_builder.num_threads(1); + } + + backup_pool = pool_builder.build()?; + &backup_pool + } + }; + + // create LMDB writer channel + let (lmdb_writer_sx, lmdb_writer_rx): ( + Sender<Result<TypedChunk>>, + Receiver<Result<TypedChunk>>, + ) = crossbeam_channel::unbounded(); + + // get the primary key field id + let primary_key_id = settings_diff.new.fields_ids_map.id(&primary_key).unwrap(); + + let pool_params = GrenadParameters { + chunk_compression_type: self.indexer_config.chunk_compression_type, + chunk_compression_level: self.indexer_config.chunk_compression_level, + max_memory: self.indexer_config.max_memory, + max_nb_chunks: self.indexer_config.max_nb_chunks, // default value, may be chosen. + }; + let documents_chunk_size = match self.indexer_config.documents_chunk_size { + Some(chunk_size) => chunk_size, + None => { + let default_chunk_size = 1024 * 1024 * 4; // 4MiB + let min_chunk_size = 1024 * 512; // 512KiB + + // compute the chunk size from the number of available threads and the input data size. 
+ let total_size = match flattened_documents.as_ref() { + Some(flattened_documents) => flattened_documents.metadata().map(|m| m.len()), + None => Ok(default_chunk_size as u64), + }; + let current_num_threads = pool.current_num_threads(); + // if we have more than 2 threads, create a number of chunks equal to 3/4 of the thread count + let chunk_count = if current_num_threads > 2 { + (current_num_threads * 3 / 4).max(2) + } else { + current_num_threads + }; + total_size + .map_or(default_chunk_size, |size| (size as usize) / chunk_count) + .max(min_chunk_size) + } + }; + + let original_documents = match original_documents { + Some(original_documents) => Some(grenad::Reader::new(original_documents)?), + None => None, + }; + let flattened_documents = match flattened_documents { + Some(flattened_documents) => Some(grenad::Reader::new(flattened_documents)?), + None => None, + }; + + let max_positions_per_attributes = self.indexer_config.max_positions_per_attributes; + + let mut final_documents_ids = RoaringBitmap::new(); + let mut databases_seen = 0; + let mut word_position_docids = None; + let mut word_fid_docids = None; + let mut word_docids = None; + let mut exact_word_docids = None; + let mut chunk_accumulator = ChunkAccumulator::default(); + let mut dimension = HashMap::new(); + + let current_span = tracing::Span::current(); + + // Run extraction pipeline in parallel. + pool.install(|| { + let settings_diff_cloned = settings_diff.clone(); + rayon::spawn(move || { + let child_span = tracing::trace_span!(target: "indexing::details", parent: &current_span, "extract_and_send_grenad_chunks"); + let _enter = child_span.enter(); + + // split obkv file into several chunks + let original_chunk_iter = match original_documents { + Some(original_documents) => { + grenad_obkv_into_chunks(original_documents, pool_params, documents_chunk_size).map(either::Left) + }, + None => Ok(either::Right(iter::empty())), + }; + + // split obkv file into several chunks + let flattened_chunk_iter = match flattened_documents { + Some(flattened_documents) => { + grenad_obkv_into_chunks(flattened_documents, pool_params, documents_chunk_size).map(either::Left) + }, + None => Ok(either::Right(iter::empty())), + }; + + let result = original_chunk_iter.and_then(|original_chunk| { + let flattened_chunk = flattened_chunk_iter?; + // extract all databases from the chunked obkv documents + extract::data_from_obkv_documents( + original_chunk, + flattened_chunk, + pool_params, + lmdb_writer_sx.clone(), + primary_key_id, + embedders_configs.clone(), + settings_diff_cloned, + max_positions_per_attributes, + Arc::new(possible_embedding_mistakes) + ) + }); + + if let Err(e) = result { + let _ = lmdb_writer_sx.send(Err(e)); + } + + // the sender needs to be dropped so the receiver loop can detect that the channel is disconnected. 
+ drop(lmdb_writer_sx); }); + + (self.progress)(UpdateIndexingStep::MergeDataIntoFinalDatabase { + databases_seen, + total_databases: TOTAL_POSTING_DATABASE_COUNT, + }); + + loop { + if (self.should_abort)() { + return Err(Error::InternalError(InternalError::AbortedIndexation)); + } + + match lmdb_writer_rx.clone().recv_timeout(std::time::Duration::from_millis(500)) { + Err(status) => { + if let Some(typed_chunks) = chunk_accumulator.pop_longest() { + let (docids, is_merged_database) = + write_typed_chunk_into_index(self.wtxn, self.index, &settings_diff, typed_chunks)?; + if !docids.is_empty() { + final_documents_ids |= docids; + let documents_seen_count = final_documents_ids.len(); + (self.progress)(UpdateIndexingStep::IndexDocuments { + documents_seen: documents_seen_count as usize, + total_documents: documents_count, + }); + debug!(documents = documents_seen_count, total = documents_count, "Seen"); + } + if is_merged_database { + databases_seen += 1; + (self.progress)(UpdateIndexingStep::MergeDataIntoFinalDatabase { + databases_seen, + total_databases: TOTAL_POSTING_DATABASE_COUNT, + }); + } + // If no more chunks remain in the chunk accumulator and the channel is disconnected, break. + } else if status == crossbeam_channel::RecvTimeoutError::Disconnected { + break; + } else { + rayon::yield_now(); + } + } + Ok(result) => { + let typed_chunk = match result? { + TypedChunk::WordDocids { + word_docids_reader, + exact_word_docids_reader, + word_fid_docids_reader, + } => { + let cloneable_chunk = + unsafe { as_cloneable_grenad(&word_docids_reader)? }; + let word_docids = word_docids.get_or_insert_with(|| { + MergerBuilder::new(MergeDeladdCboRoaringBitmaps) + }); + word_docids.push(cloneable_chunk.into_cursor()?); + let cloneable_chunk = + unsafe { as_cloneable_grenad(&exact_word_docids_reader)? }; + let exact_word_docids = + exact_word_docids.get_or_insert_with(|| { + MergerBuilder::new( + MergeDeladdCboRoaringBitmaps, + ) + }); + exact_word_docids.push(cloneable_chunk.into_cursor()?); + let cloneable_chunk = + unsafe { as_cloneable_grenad(&word_fid_docids_reader)? }; + let word_fid_docids = word_fid_docids.get_or_insert_with(|| { + MergerBuilder::new(MergeDeladdCboRoaringBitmaps) + }); + word_fid_docids.push(cloneable_chunk.into_cursor()?); + TypedChunk::WordDocids { + word_docids_reader, + exact_word_docids_reader, + word_fid_docids_reader, + } + } + TypedChunk::WordPositionDocids(chunk) => { + let cloneable_chunk = unsafe { as_cloneable_grenad(&chunk)? 
}; + let word_position_docids = + word_position_docids.get_or_insert_with(|| { + MergerBuilder::new( + MergeDeladdCboRoaringBitmaps, + ) + }); + word_position_docids.push(cloneable_chunk.into_cursor()?); + TypedChunk::WordPositionDocids(chunk) + } + TypedChunk::VectorPoints { + expected_dimension, + remove_vectors, + embeddings, + manual_vectors, + embedder_name, + add_to_user_provided, + remove_from_user_provided, + } => { + dimension.insert(embedder_name.clone(), expected_dimension); + TypedChunk::VectorPoints { + remove_vectors, + embeddings, + expected_dimension, + manual_vectors, + embedder_name, + add_to_user_provided, + remove_from_user_provided, + } + } + otherwise => otherwise, + }; + + chunk_accumulator.insert(typed_chunk); + } + } + } + + Ok(()) + }).map_err(InternalError::from)??; + + // We write the field distribution into the main database + self.index.put_field_distribution(self.wtxn, &field_distribution)?; + + // We write the primary key field id into the main database + self.index.put_primary_key(self.wtxn, &primary_key)?; + let number_of_documents = self.index.number_of_documents(self.wtxn)?; + let mut rng = rand::rngs::StdRng::seed_from_u64(42); + + // If an embedder wasn't used in the typedchunk but must be binary quantized + // we should insert it in `dimension` + for (name, action) in settings_diff.embedding_config_updates.iter() { + if action.is_being_quantized && !dimension.contains_key(name.as_str()) { + let index = self.index.embedder_category_id.get(self.wtxn, name)?.ok_or( + InternalError::DatabaseMissingEntry { + db_name: "embedder_category_id", + key: None, + }, + )?; + let reader = + ArroyWrapper::new(self.index.vector_arroy, index, action.was_quantized); + let dim = reader.dimensions(self.wtxn)?; + dimension.insert(name.to_string(), dim); + } + } + + for (embedder_name, dimension) in dimension { + let wtxn = &mut *self.wtxn; + let vector_arroy = self.index.vector_arroy; + let cancel = &self.should_abort; + + let embedder_index = self.index.embedder_category_id.get(wtxn, &embedder_name)?.ok_or( + InternalError::DatabaseMissingEntry { db_name: "embedder_category_id", key: None }, + )?; + let embedder_config = settings_diff.embedding_config_updates.get(&embedder_name); + let was_quantized = settings_diff + .old + .embedding_configs + .get(&embedder_name) + .map_or(false, |conf| conf.2); + let is_quantizing = embedder_config.map_or(false, |action| action.is_being_quantized); + + pool.install(|| { + let mut writer = ArroyWrapper::new(vector_arroy, embedder_index, was_quantized); + writer.build_and_quantize(wtxn, &mut rng, dimension, is_quantizing, cancel)?; + Result::Ok(()) + }) + .map_err(InternalError::from)??; + } + + self.execute_prefix_databases( + word_docids.map(MergerBuilder::build), + exact_word_docids.map(MergerBuilder::build), + word_position_docids.map(MergerBuilder::build), + word_fid_docids.map(MergerBuilder::build), + )?; + + Ok(number_of_documents) + } + #[tracing::instrument( level = "trace", skip_all, @@ -335,17 +672,19 @@ mod tests { use std::collections::BTreeMap; use big_s::S; + use bumpalo::Bump; use fst::IntoStreamer; use heed::RwTxn; use maplit::hashset; use super::*; - use crate::documents::documents_batch_reader_from_objects; + use crate::documents::{documents_batch_reader_from_objects, mmap_from_objects}; use crate::index::tests::TempIndex; use crate::index::IndexEmbeddingConfig; use crate::search::TermsMatchingStrategy; + use crate::update::new::indexer; use crate::update::Setting; - use crate::{db_snap, Filter, Search}; + use 
crate::{db_snap, Filter, Search, UserError}; #[test] fn simple_document_replacement() { @@ -787,7 +1126,7 @@ big_object.insert(key, serde_json::Value::from("I am a text!")); } - let documents = documents_batch_reader_from_objects([big_object]); + let documents = mmap_from_objects([big_object]); index.add_documents(documents).unwrap(); } @@ -1280,7 +1619,7 @@ serde_json::Value::Object(object) => Some(object), _ => None, }); - documents_batch_reader_from_objects(documents_iter) + mmap_from_objects(documents_iter) }; // Index those 200 long documents index.add_documents(content).unwrap(); @@ -1326,9 +1665,9 @@ index.add_documents(doc1).unwrap(); index.add_documents(doc2).unwrap(); - let wtxn = index.read_txn().unwrap(); + let rtxn = index.read_txn().unwrap(); - let map = index.external_documents_ids().to_hash_map(&wtxn).unwrap(); + let map = index.external_documents_ids().to_hash_map(&rtxn).unwrap(); let ids = map.values().collect::<HashSet<_>>(); assert_eq!(ids.len(), map.len()); @@ -1609,10 +1948,23 @@ "title": "something", }]}; - index.add_documents(doc1).unwrap(); - index.add_documents(doc2).unwrap_err(); - index.add_documents(doc3).unwrap_err(); - index.add_documents(doc4).unwrap_err(); + let rtxn = index.inner.read_txn().unwrap(); + let db_fields_ids_map = index.inner.fields_ids_map(&rtxn).unwrap(); + let mut new_fields_ids_map = db_fields_ids_map.clone(); + + let mut indexer = indexer::DocumentOperation::new(IndexDocumentsMethod::ReplaceDocuments); + indexer.add_documents(&doc1).unwrap(); + indexer.add_documents(&doc2).unwrap(); + indexer.add_documents(&doc3).unwrap(); + indexer.add_documents(&doc4).unwrap(); + + let indexer_alloc = Bump::new(); + let (_document_changes, operation_stats, _primary_key) = indexer + .into_changes(&indexer_alloc, &index.inner, &rtxn, None, &mut new_fields_ids_map) + .unwrap(); + + assert_eq!(operation_stats.iter().filter(|ps| ps.error.is_none()).count(), 1); + assert_eq!(operation_stats.iter().filter(|ps| ps.error.is_some()).count(), 3); } #[test] @@ -1767,34 +2119,51 @@ index.index_documents_config.update_method = IndexDocumentsMethod::UpdateDocuments; let mut wtxn = index.write_txn().unwrap(); - let builder = IndexDocuments::new( - &mut wtxn, - &index, - &index.indexer_config, - index.index_documents_config.clone(), - |_| (), - || false, - ) - .unwrap(); + let local_pool; + let indexer_config = &index.indexer_config; + let pool = match &indexer_config.thread_pool { + Some(pool) => pool, + None => { + local_pool = ThreadPoolNoAbortBuilder::new().build().unwrap(); + &local_pool + } + }; + + let rtxn = index.inner.read_txn().unwrap(); + let db_fields_ids_map = index.inner.fields_ids_map(&rtxn).unwrap(); + let mut new_fields_ids_map = db_fields_ids_map.clone(); let documents = documents!([ { "id": 1, "doggo": "kevin" }, { "id": 2, "doggo": { "name": "bob", "age": 20 } }, { "id": 3, "name": "jean", "age": 25 }, ]); - let (builder, added) = builder.add_documents(documents).unwrap(); - insta::assert_snapshot!(added.unwrap(), @"3"); - let (builder, removed) = builder.remove_documents(vec![S("2")]).unwrap(); - insta::assert_snapshot!(removed.unwrap(), @"1"); + let indexer_alloc = Bump::new(); + let embedders = EmbeddingConfigs::default(); + let mut indexer = indexer::DocumentOperation::new(IndexDocumentsMethod::ReplaceDocuments); + indexer.add_documents(&documents); + indexer.delete_documents(&["2"]); + let (document_changes, _operation_stats, primary_key) = indexer + .into_changes(&indexer_alloc, &index.inner, 
&rtxn, None, &mut new_fields_ids_map) + .unwrap(); - let addition = builder.execute().unwrap(); - insta::assert_debug_snapshot!(addition, @r###" - DocumentAdditionResult { - indexed_documents: 3, - number_of_documents: 2, - } - "###); + pool.install(|| { + indexer::index( + &mut wtxn, + &index.inner, + indexer_config.grenad_parameters(), + &db_fields_ids_map, + new_fields_ids_map, + primary_key, + &document_changes, + embedders, + &|| false, + &|_| (), + ) + }) + .unwrap() + .unwrap(); wtxn.commit().unwrap(); db_snap!(index, documents, @r###" @@ -1809,41 +2178,57 @@ mod tests { index.index_documents_config.update_method = IndexDocumentsMethod::UpdateDocuments; let mut wtxn = index.write_txn().unwrap(); - let builder = IndexDocuments::new( - &mut wtxn, - &index, - &index.indexer_config, - index.index_documents_config.clone(), - |_| (), - || false, - ) - .unwrap(); + let local_pool; + let indexer_config = &index.indexer_config; + let pool = match &indexer_config.thread_pool { + Some(pool) => pool, + None => { + local_pool = ThreadPoolNoAbortBuilder::new().build().unwrap(); + &local_pool + } + }; + + let rtxn = index.inner.read_txn().unwrap(); + let db_fields_ids_map = index.inner.fields_ids_map(&rtxn).unwrap(); + let mut new_fields_ids_map = db_fields_ids_map.clone(); let documents = documents!([ { "id": 1, "doggo": "kevin" }, { "id": 2, "doggo": { "name": "bob", "age": 20 } }, { "id": 3, "name": "jean", "age": 25 }, ]); - let (builder, added) = builder.add_documents(documents).unwrap(); - insta::assert_snapshot!(added.unwrap(), @"3"); + let mut indexer = indexer::DocumentOperation::new(IndexDocumentsMethod::UpdateDocuments); + indexer.add_documents(&documents).unwrap(); let documents = documents!([ { "id": 2, "catto": "jorts" }, { "id": 3, "legs": 4 }, ]); - let (builder, added) = builder.add_documents(documents).unwrap(); - insta::assert_snapshot!(added.unwrap(), @"2"); + indexer.add_documents(&documents).unwrap(); + indexer.delete_documents(&["1", "2"]); - let (builder, removed) = builder.remove_documents(vec![S("1"), S("2")]).unwrap(); - insta::assert_snapshot!(removed.unwrap(), @"2"); + let indexer_alloc = Bump::new(); + let embedders = EmbeddingConfigs::default(); + let (document_changes, _operation_stats, primary_key) = indexer + .into_changes(&indexer_alloc, &index.inner, &rtxn, None, &mut new_fields_ids_map) + .unwrap(); - let addition = builder.execute().unwrap(); - insta::assert_debug_snapshot!(addition, @r###" - DocumentAdditionResult { - indexed_documents: 5, - number_of_documents: 1, - } - "###); + pool.install(|| { + indexer::index( + &mut wtxn, + &index.inner, + indexer_config.grenad_parameters(), + &db_fields_ids_map, + new_fields_ids_map, + primary_key, + &document_changes, + embedders, + &|| false, + &|_| (), + ) + }) + .unwrap() + .unwrap(); wtxn.commit().unwrap(); db_snap!(index, documents, @r###" @@ -1854,34 +2239,52 @@ mod tests { #[test] fn add_document_and_in_another_transform_update_and_delete_documents() { let mut index = TempIndex::new(); - index.index_documents_config.update_method = IndexDocumentsMethod::UpdateDocuments; let mut wtxn = index.write_txn().unwrap(); - let builder = IndexDocuments::new( - &mut wtxn, - &index, - &index.indexer_config, - index.index_documents_config.clone(), - |_| (), - || false, - ) - .unwrap(); + let local_pool; + let indexer_config = &index.indexer_config; + let pool = match &indexer_config.thread_pool { + Some(pool) => pool, + None => { + local_pool = ThreadPoolNoAbortBuilder::new().build().unwrap(); + &local_pool + } + }; + + let 
rtxn = index.inner.read_txn().unwrap(); + let db_fields_ids_map = index.inner.fields_ids_map(&rtxn).unwrap(); + let mut new_fields_ids_map = db_fields_ids_map.clone(); let documents = documents!([ { "id": 1, "doggo": "kevin" }, { "id": 2, "doggo": { "name": "bob", "age": 20 } }, { "id": 3, "name": "jean", "age": 25 }, ]); - let (builder, added) = builder.add_documents(documents).unwrap(); - insta::assert_snapshot!(added.unwrap(), @"3"); + let indexer_alloc = Bump::new(); + let embedders = EmbeddingConfigs::default(); + let mut indexer = indexer::DocumentOperation::new(IndexDocumentsMethod::UpdateDocuments); + indexer.add_documents(&documents); - let addition = builder.execute().unwrap(); - insta::assert_debug_snapshot!(addition, @r###" - DocumentAdditionResult { - indexed_documents: 3, - number_of_documents: 3, - } - "###); + let (document_changes, _operation_stats, primary_key) = indexer + .into_changes(&indexer_alloc, &index.inner, &rtxn, None, &mut new_fields_ids_map) + .unwrap(); + + pool.install(|| { + indexer::index( + &mut wtxn, + &index.inner, + indexer_config.grenad_parameters(), + &db_fields_ids_map, + new_fields_ids_map, + primary_key, + &document_changes, + embedders, + &|| false, + &|_| (), + ) + }) + .unwrap() + .unwrap(); wtxn.commit().unwrap(); db_snap!(index, documents, @r###" @@ -1893,33 +2296,50 @@ mod tests { // A first batch of documents has been inserted let mut wtxn = index.write_txn().unwrap(); - let builder = IndexDocuments::new( - &mut wtxn, - &index, - &index.indexer_config, - index.index_documents_config.clone(), - |_| (), - || false, - ) - .unwrap(); + let local_pool; + let indexer_config = &index.indexer_config; + let pool = match &indexer_config.thread_pool { + Some(pool) => pool, + None => { + local_pool = ThreadPoolNoAbortBuilder::new().build().unwrap(); + &local_pool + } + }; + + let rtxn = index.inner.read_txn().unwrap(); + let db_fields_ids_map = index.inner.fields_ids_map(&rtxn).unwrap(); + let mut new_fields_ids_map = db_fields_ids_map.clone(); let documents = documents!([ { "id": 2, "catto": "jorts" }, { "id": 3, "legs": 4 }, ]); - let (builder, added) = builder.add_documents(documents).unwrap(); - insta::assert_snapshot!(added.unwrap(), @"2"); + let indexer_alloc = Bump::new(); + let embedders = EmbeddingConfigs::default(); + let mut indexer = indexer::DocumentOperation::new(IndexDocumentsMethod::UpdateDocuments); + indexer.add_documents(&documents); + indexer.delete_documents(&["1", "2"]); - let (builder, removed) = builder.remove_documents(vec![S("1"), S("2")]).unwrap(); - insta::assert_snapshot!(removed.unwrap(), @"2"); + let (document_changes, _operation_stats, primary_key) = indexer + .into_changes(&indexer_alloc, &index.inner, &rtxn, None, &mut new_fields_ids_map) + .unwrap(); - let addition = builder.execute().unwrap(); - insta::assert_debug_snapshot!(addition, @r###" - DocumentAdditionResult { - indexed_documents: 2, - number_of_documents: 1, - } - "###); + pool.install(|| { + indexer::index( + &mut wtxn, + &index.inner, + indexer_config.grenad_parameters(), + &db_fields_ids_map, + new_fields_ids_map, + primary_key, + &document_changes, + embedders, + &|| false, + &|_| (), + ) + }) + .unwrap() + .unwrap(); wtxn.commit().unwrap(); db_snap!(index, documents, @r###" @@ -1930,36 +2350,53 @@ mod tests { #[test] fn delete_document_and_then_add_documents_in_the_same_transform() { let mut index = TempIndex::new(); - index.index_documents_config.update_method = IndexDocumentsMethod::UpdateDocuments; let mut wtxn = index.write_txn().unwrap(); - let 
builder = IndexDocuments::new( - &mut wtxn, - &index, - &index.indexer_config, - index.index_documents_config.clone(), - |_| (), - || false, - ) - .unwrap(); + let local_pool; + let indexer_config = &index.indexer_config; + let pool = match &indexer_config.thread_pool { + Some(pool) => pool, + None => { + local_pool = ThreadPoolNoAbortBuilder::new().build().unwrap(); + &local_pool + } + }; - let (builder, removed) = builder.remove_documents(vec![S("1"), S("2")]).unwrap(); - insta::assert_snapshot!(removed.unwrap(), @"0"); + let rtxn = index.inner.read_txn().unwrap(); + let db_fields_ids_map = index.inner.fields_ids_map(&rtxn).unwrap(); + let mut new_fields_ids_map = db_fields_ids_map.clone(); + + let indexer_alloc = Bump::new(); + let embedders = EmbeddingConfigs::default(); + let mut indexer = indexer::DocumentOperation::new(IndexDocumentsMethod::UpdateDocuments); + indexer.delete_documents(&["1", "2"]); let documents = documents!([ { "id": 2, "doggo": { "name": "jean", "age": 20 } }, { "id": 3, "name": "bob", "age": 25 }, ]); - let (builder, added) = builder.add_documents(documents).unwrap(); - insta::assert_snapshot!(added.unwrap(), @"2"); + indexer.add_documents(&documents); - let addition = builder.execute().unwrap(); - insta::assert_debug_snapshot!(addition, @r###" - DocumentAdditionResult { - indexed_documents: 2, - number_of_documents: 2, - } - "###); + let (document_changes, _operation_stats, primary_key) = indexer + .into_changes(&indexer_alloc, &index.inner, &rtxn, None, &mut new_fields_ids_map) + .unwrap(); + + pool.install(|| { + indexer::index( + &mut wtxn, + &index.inner, + indexer_config.grenad_parameters(), + &db_fields_ids_map, + new_fields_ids_map, + primary_key, + &document_changes, + embedders, + &|| false, + &|_| (), + ) + }) + .unwrap() + .unwrap(); wtxn.commit().unwrap(); db_snap!(index, documents, @r###" @@ -1971,42 +2408,57 @@ mod tests { #[test] fn delete_the_same_document_multiple_time() { let mut index = TempIndex::new(); - index.index_documents_config.update_method = IndexDocumentsMethod::UpdateDocuments; let mut wtxn = index.write_txn().unwrap(); - let builder = IndexDocuments::new( - &mut wtxn, - &index, - &index.indexer_config, - index.index_documents_config.clone(), - |_| (), - || false, - ) - .unwrap(); + let local_pool; + let indexer_config = &index.indexer_config; + let pool = match &indexer_config.thread_pool { + Some(pool) => pool, + None => { + local_pool = ThreadPoolNoAbortBuilder::new().build().unwrap(); + &local_pool + } + }; - let (builder, removed) = - builder.remove_documents(vec![S("1"), S("2"), S("1"), S("2")]).unwrap(); - insta::assert_snapshot!(removed.unwrap(), @"0"); + let rtxn = index.inner.read_txn().unwrap(); + let db_fields_ids_map = index.inner.fields_ids_map(&rtxn).unwrap(); + let mut new_fields_ids_map = db_fields_ids_map.clone(); + + let indexer_alloc = Bump::new(); + let embedders = EmbeddingConfigs::default(); + let mut indexer = indexer::DocumentOperation::new(IndexDocumentsMethod::UpdateDocuments); + + indexer.delete_documents(&["1", "2", "1", "2"]); let documents = documents!([ { "id": 1, "doggo": "kevin" }, { "id": 2, "doggo": { "name": "jean", "age": 20 } }, { "id": 3, "name": "bob", "age": 25 }, ]); - let (builder, added) = builder.add_documents(documents).unwrap(); - insta::assert_snapshot!(added.unwrap(), @"3"); + indexer.add_documents(&documents); - let (builder, removed) = - builder.remove_documents(vec![S("1"), S("2"), S("1"), S("2")]).unwrap(); - insta::assert_snapshot!(removed.unwrap(), @"2"); + 
indexer.delete_documents(&["1", "2", "1", "2"]); - let addition = builder.execute().unwrap(); - insta::assert_debug_snapshot!(addition, @r###" - DocumentAdditionResult { - indexed_documents: 3, - number_of_documents: 1, - } - "###); + let (document_changes, _operation_stats, primary_key) = indexer + .into_changes(&indexer_alloc, &index.inner, &rtxn, None, &mut new_fields_ids_map) + .unwrap(); + + pool.install(|| { + indexer::index( + &mut wtxn, + &index.inner, + indexer_config.grenad_parameters(), + &db_fields_ids_map, + new_fields_ids_map, + primary_key, + &document_changes, + embedders, + &|| false, + &|_| (), + ) + }) + .unwrap() + .unwrap(); wtxn.commit().unwrap(); db_snap!(index, documents, @r###" @@ -2017,32 +2469,51 @@ mod tests { #[test] fn add_document_and_in_another_transform_delete_the_document_then_add_it_again() { let mut index = TempIndex::new(); - index.index_documents_config.update_method = IndexDocumentsMethod::UpdateDocuments; let mut wtxn = index.write_txn().unwrap(); - let builder = IndexDocuments::new( - &mut wtxn, - &index, - &index.indexer_config, - index.index_documents_config.clone(), - |_| (), - || false, - ) - .unwrap(); + let local_pool; + let indexer_config = &index.indexer_config; + let pool = match &indexer_config.thread_pool { + Some(pool) => pool, + None => { + local_pool = ThreadPoolNoAbortBuilder::new().build().unwrap(); + &local_pool + } + }; + + let rtxn = index.inner.read_txn().unwrap(); + let db_fields_ids_map = index.inner.fields_ids_map(&rtxn).unwrap(); + let mut new_fields_ids_map = db_fields_ids_map.clone(); + + let indexer_alloc = Bump::new(); + let embedders = EmbeddingConfigs::default(); + let mut indexer = indexer::DocumentOperation::new(IndexDocumentsMethod::UpdateDocuments); let documents = documents!([ { "id": 1, "doggo": "kevin" }, ]); - let (builder, added) = builder.add_documents(documents).unwrap(); - insta::assert_snapshot!(added.unwrap(), @"1"); + indexer.add_documents(&documents); - let addition = builder.execute().unwrap(); - insta::assert_debug_snapshot!(addition, @r###" - DocumentAdditionResult { - indexed_documents: 1, - number_of_documents: 1, - } - "###); + let (document_changes, _operation_stats, primary_key) = indexer + .into_changes(&indexer_alloc, &index.inner, &rtxn, None, &mut new_fields_ids_map) + .unwrap(); + + pool.install(|| { + indexer::index( + &mut wtxn, + &index.inner, + indexer_config.grenad_parameters(), + &db_fields_ids_map, + new_fields_ids_map, + primary_key, + &document_changes, + embedders, + &|| false, + &|_| (), + ) + }) + .unwrap() + .unwrap(); wtxn.commit().unwrap(); db_snap!(index, documents, @r###" @@ -2052,32 +2523,43 @@ mod tests { // A first batch of documents has been inserted let mut wtxn = index.write_txn().unwrap(); - let builder = IndexDocuments::new( - &mut wtxn, - &index, - &index.indexer_config, - index.index_documents_config.clone(), - |_| (), - || false, - ) - .unwrap(); - let (builder, removed) = builder.remove_documents(vec![S("1")]).unwrap(); - insta::assert_snapshot!(removed.unwrap(), @"1"); + let rtxn = index.inner.read_txn().unwrap(); + let db_fields_ids_map = index.inner.fields_ids_map(&rtxn).unwrap(); + let mut new_fields_ids_map = db_fields_ids_map.clone(); + + let indexer_alloc = Bump::new(); + let embedders = EmbeddingConfigs::default(); + let mut indexer = indexer::DocumentOperation::new(IndexDocumentsMethod::ReplaceDocuments); + + indexer.delete_documents(&["1"]); let documents = documents!([ { "id": 1, "catto": "jorts" }, ]); - let (builder, added) = 
builder.add_documents(documents).unwrap(); - insta::assert_snapshot!(added.unwrap(), @"1"); - let addition = builder.execute().unwrap(); - insta::assert_debug_snapshot!(addition, @r###" - DocumentAdditionResult { - indexed_documents: 1, - number_of_documents: 1, - } - "###); + indexer.add_documents(&documents); + + let (document_changes, _operation_stats, primary_key) = indexer + .into_changes(&indexer_alloc, &index.inner, &rtxn, None, &mut new_fields_ids_map) + .unwrap(); + + pool.install(|| { + indexer::index( + &mut wtxn, + &index.inner, + indexer_config.grenad_parameters(), + &db_fields_ids_map, + new_fields_ids_map, + primary_key, + &document_changes, + embedders, + &|| false, + &|_| (), + ) + }) + .unwrap() + .unwrap(); wtxn.commit().unwrap(); db_snap!(index, documents, @r###" @@ -2230,32 +2712,52 @@ mod tests { let mut wtxn = index.write_txn().unwrap(); - let builder = IndexDocuments::new( - &mut wtxn, - &index, - &index.indexer_config, - index.index_documents_config.clone(), - |_| (), - || false, - ) - .unwrap(); + let local_pool; + let indexer_config = &index.indexer_config; + let pool = match &indexer_config.thread_pool { + Some(pool) => pool, + None => { + local_pool = ThreadPoolNoAbortBuilder::new().build().unwrap(); + &local_pool + } + }; + + let rtxn = index.inner.read_txn().unwrap(); + let db_fields_ids_map = index.inner.fields_ids_map(&rtxn).unwrap(); + let mut new_fields_ids_map = db_fields_ids_map.clone(); + + let indexer_alloc = Bump::new(); + let embedders = EmbeddingConfigs::default(); + let mut indexer = indexer::DocumentOperation::new(IndexDocumentsMethod::ReplaceDocuments); // OP let documents = documents!([ { "id": 1, "doggo": "bernese" }, ]); - let (builder, added) = builder.add_documents(documents).unwrap(); - insta::assert_snapshot!(added.unwrap(), @"1"); + indexer.add_documents(&documents).unwrap(); // FINISHING - let addition = builder.execute().unwrap(); - insta::assert_debug_snapshot!(addition, @r###" - DocumentAdditionResult { - indexed_documents: 1, - number_of_documents: 1, - } - "###); + let (document_changes, _operation_stats, primary_key) = indexer + .into_changes(&indexer_alloc, &index.inner, &rtxn, None, &mut new_fields_ids_map) + .unwrap(); + + pool.install(|| { + indexer::index( + &mut wtxn, + &index.inner, + indexer_config.grenad_parameters(), + &db_fields_ids_map, + new_fields_ids_map, + primary_key, + &document_changes, + embedders, + &|| false, + &|_| (), + ) + }) + .unwrap() + .unwrap(); wtxn.commit().unwrap(); db_snap!(index, documents, @r###" @@ -2274,32 +2776,41 @@ mod tests { let mut wtxn = index.write_txn().unwrap(); - let builder = IndexDocuments::new( - &mut wtxn, - &index, - &index.indexer_config, - index.index_documents_config.clone(), - |_| (), - || false, - ) - .unwrap(); + let rtxn = index.inner.read_txn().unwrap(); + let db_fields_ids_map = index.inner.fields_ids_map(&rtxn).unwrap(); + let mut new_fields_ids_map = db_fields_ids_map.clone(); - let (builder, removed) = builder.remove_documents(vec![S("1")]).unwrap(); - insta::assert_snapshot!(removed.unwrap(), @"1"); + let indexer_alloc = Bump::new(); + let embedders = EmbeddingConfigs::default(); + let mut indexer = indexer::DocumentOperation::new(IndexDocumentsMethod::ReplaceDocuments); + + indexer.delete_documents(&["1"]); let documents = documents!([ { "id": 0, "catto": "jorts" }, ]); - let (builder, added) = builder.add_documents(documents).unwrap(); - insta::assert_snapshot!(added.unwrap(), @"1"); + indexer.add_documents(&documents).unwrap(); - let addition = 
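A small idiom recurring in these hunks: `local_pool` is declared without an initializer so that, when `indexer_config.thread_pool` is `None`, the freshly built fallback pool outlives the `match` arm that created it and can still be borrowed as `pool`. In isolation (the `ThreadPoolNoAbortBuilder` import path is assumed; the hunks rely on imports outside the diff context):

use milli::update::IndexerConfig;
use milli::ThreadPoolNoAbortBuilder; // assumed re-export path

let indexer_config = IndexerConfig::default();

let local_pool; // deliberately uninitialized: extends the fallback pool's lifetime
let pool = match &indexer_config.thread_pool {
    Some(pool) => pool,
    None => {
        local_pool = ThreadPoolNoAbortBuilder::new().build().unwrap();
        &local_pool
    }
};

// `install` runs the closure inside the rayon pool, so the indexer's parallel work
// lands there. In the tests above, `pool.install(...).unwrap().unwrap()` peels
// `install`'s own Result first, then `indexer::index`'s.
pool.install(|| { /* indexer::index(...) */ }).unwrap();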
builder.execute().unwrap(); - insta::assert_debug_snapshot!(addition, @r###" - DocumentAdditionResult { - indexed_documents: 1, - number_of_documents: 1, - } - "###); + let (document_changes, _operation_stats, primary_key) = indexer + .into_changes(&indexer_alloc, &index.inner, &rtxn, None, &mut new_fields_ids_map) + .unwrap(); + + pool.install(|| { + indexer::index( + &mut wtxn, + &index.inner, + indexer_config.grenad_parameters(), + &db_fields_ids_map, + new_fields_ids_map, + primary_key, + &document_changes, + embedders, + &|| false, + &|_| (), + ) + }) + .unwrap() + .unwrap(); wtxn.commit().unwrap(); db_snap!(index, documents, @r###" @@ -2317,29 +2828,39 @@ mod tests { let mut wtxn = index.write_txn().unwrap(); - let builder = IndexDocuments::new( - &mut wtxn, - &index, - &index.indexer_config, - index.index_documents_config.clone(), - |_| (), - || false, - ) - .unwrap(); + let rtxn = index.inner.read_txn().unwrap(); + let db_fields_ids_map = index.inner.fields_ids_map(&rtxn).unwrap(); + let mut new_fields_ids_map = db_fields_ids_map.clone(); + + let indexer_alloc = Bump::new(); + let embedders = EmbeddingConfigs::default(); + let mut indexer = indexer::DocumentOperation::new(IndexDocumentsMethod::ReplaceDocuments); let documents = documents!([ { "id": 1, "catto": "jorts" }, ]); - let (builder, added) = builder.add_documents(documents).unwrap(); - insta::assert_snapshot!(added.unwrap(), @"1"); + indexer.add_documents(&documents).unwrap(); - let addition = builder.execute().unwrap(); - insta::assert_debug_snapshot!(addition, @r###" - DocumentAdditionResult { - indexed_documents: 1, - number_of_documents: 2, - } - "###); + let (document_changes, _operation_stats, primary_key) = indexer + .into_changes(&indexer_alloc, &index.inner, &rtxn, None, &mut new_fields_ids_map) + .unwrap(); + + pool.install(|| { + indexer::index( + &mut wtxn, + &index.inner, + indexer_config.grenad_parameters(), + &db_fields_ids_map, + new_fields_ids_map, + primary_key, + &document_changes, + embedders, + &|| false, + &|_| (), + ) + }) + .unwrap() + .unwrap(); wtxn.commit().unwrap(); db_snap!(index, documents, @r###" @@ -2365,10 +2886,12 @@ mod tests { .collect(); // Delete some documents. - index.delete_documents_using_wtxn( - wtxn, - external_ids.iter().map(ToString::to_string).collect(), - ); + index + .delete_documents_using_wtxn( + wtxn, + external_ids.iter().map(ToString::to_string).collect(), + ) + .unwrap(); ids_to_delete } @@ -2388,10 +2911,11 @@ mod tests { ]), ) .unwrap(); + wtxn.commit().unwrap(); + let mut wtxn = index.write_txn().unwrap(); // delete those documents, ids are synchronous therefore 0, 1, and 2. - index.delete_documents_using_wtxn(&mut wtxn, vec![S("0"), S("1"), S("2")]); - + index.delete_documents_using_wtxn(&mut wtxn, vec![S("0"), S("1"), S("2")]).unwrap(); wtxn.commit().unwrap(); // All these snapshots should be empty since the database was cleared @@ -2429,7 +2953,7 @@ mod tests { let mut wtxn = index.write_txn().unwrap(); // Delete not all of the documents but some of them. - index.delete_documents_using_wtxn(&mut wtxn, vec![S("0"), S("1")]); + index.delete_documents_using_wtxn(&mut wtxn, vec![S("0"), S("1")]).unwrap(); wtxn.commit().unwrap(); @@ -2443,14 +2967,15 @@ mod tests { let index = TempIndex::new(); let mut wtxn = index.write_txn().unwrap(); - index .update_settings_using_wtxn(&mut wtxn, |settings| { settings.set_primary_key(S("docid")); settings.set_filterable_fields(hashset! 
{ S("label"), S("label2") }); }) .unwrap(); + wtxn.commit().unwrap(); + let mut wtxn = index.write_txn().unwrap(); index .add_documents_using_wtxn( &mut wtxn, @@ -2482,6 +3007,9 @@ mod tests { ) .unwrap(); + wtxn.commit().unwrap(); + + let mut wtxn = index.write_txn().unwrap(); delete_documents(&mut wtxn, &index, &["1_4", "1_70", "1_72"]); // Placeholder search with filter @@ -2489,8 +3017,6 @@ mod tests { let results = index.search(&wtxn).filter(filter).execute().unwrap(); assert!(results.documents_ids.is_empty()); - wtxn.commit().unwrap(); - db_snap!(index, word_docids); db_snap!(index, facet_id_f64_docids); db_snap!(index, word_pair_proximity_docids); @@ -2626,7 +3152,9 @@ mod tests { settings.set_sortable_fields(hashset!(S("_geo"))); }) .unwrap(); + wtxn.commit().unwrap(); + let mut wtxn = index.write_txn().unwrap(); index.add_documents_using_wtxn(&mut wtxn, documents!([ { "id": "1", "city": "Lille", "_geo": { "lat": 50.6299, "lng": 3.0569 } }, { "id": "2", "city": "Mons-en-Barœul", "_geo": { "lat": 50.6415, "lng": 3.1106 } }, @@ -2649,7 +3177,9 @@ mod tests { { "id": "19", "city": "Compiègne", "_geo": { "lat": 49.4449, "lng": 2.7913 } }, { "id": "20", "city": "Paris", "_geo": { "lat": 48.9021, "lng": 2.3708 } } ])).unwrap(); + wtxn.commit().unwrap(); + let mut wtxn = index.write_txn().unwrap(); let external_ids_to_delete = ["5", "6", "7", "12", "17", "19"]; let deleted_internal_ids = delete_documents(&mut wtxn, &index, &external_ids_to_delete); diff --git a/crates/milli/src/update/index_documents/parallel.rs b/crates/milli/src/update/index_documents/parallel.rs deleted file mode 100644 index 2f6bf9caf..000000000 --- a/crates/milli/src/update/index_documents/parallel.rs +++ /dev/null @@ -1,86 +0,0 @@ -use heed::types::Bytes; -use heed::{Database, RoTxn}; -use obkv::KvReaderU16; -use roaring::RoaringBitmap; - -use crate::{all_obkv_to_json, DocumentId, FieldsIdsMap, Object, ObkvCodec, Result, BEU32}; - -pub struct ImmutableObkvs<'t> { - ids: RoaringBitmap, - fields_ids_map: FieldsIdsMap, - slices: Vec<&'t [u8]>, -} - -impl<'t> ImmutableObkvs<'t> { - /// Creates the structure by fetching all the OBKVs - /// and keeping the transaction making the pointers valid. - pub fn new( - rtxn: &'t RoTxn, - documents_database: Database, - fields_ids_map: FieldsIdsMap, - subset: RoaringBitmap, - ) -> heed::Result { - let mut slices = Vec::new(); - let documents_database = documents_database.remap_data_type::(); - for docid in &subset { - let slice = documents_database.get(rtxn, &docid)?.unwrap(); - slices.push(slice); - } - - Ok(ImmutableObkvs { ids: subset, fields_ids_map, slices }) - } - - /// Returns the OBKVs identified by the given ID. - pub fn obkv(&self, docid: DocumentId) -> heed::Result> { - match self - .ids - .rank(docid) - .checked_sub(1) - .and_then(|offset| self.slices.get(offset as usize)) - { - Some(&bytes) => Ok(Some(bytes.into())), - None => Ok(None), - } - } - - /// Returns the owned rhai::Map identified by the given ID. 
- pub fn rhai_map(&self, docid: DocumentId) -> Result> { - let obkv = match self.obkv(docid) { - Ok(Some(obkv)) => obkv, - Ok(None) => return Ok(None), - Err(e) => return Err(e.into()), - }; - - let all_keys = obkv.iter().map(|(k, _v)| k).collect::>(); - let map: Result = all_keys - .iter() - .copied() - .flat_map(|id| obkv.get(id).map(|value| (id, value))) - .map(|(id, value)| { - let name = self.fields_ids_map.name(id).ok_or( - crate::error::FieldIdMapMissingEntry::FieldId { - field_id: id, - process: "all_obkv_to_rhaimap", - }, - )?; - let value = serde_json::from_slice(value) - .map_err(crate::error::InternalError::SerdeJson)?; - Ok((name.into(), value)) - }) - .collect(); - - map.map(Some) - } - - pub fn json_map(&self, docid: DocumentId) -> Result> { - let obkv = match self.obkv(docid) { - Ok(Some(obkv)) => obkv, - Ok(None) => return Ok(None), - Err(e) => return Err(e.into()), - }; - - all_obkv_to_json(obkv, &self.fields_ids_map).map(Some) - } -} - -unsafe impl Sync for ImmutableObkvs<'_> {} diff --git a/crates/milli/src/update/index_documents/transform.rs b/crates/milli/src/update/index_documents/transform.rs index 7f156f348..a83a99ab8 100644 --- a/crates/milli/src/update/index_documents/transform.rs +++ b/crates/milli/src/update/index_documents/transform.rs @@ -48,23 +48,8 @@ pub struct TransformOutput { /// containing all those documents. pub struct Transform<'a, 'i> { pub index: &'i Index, - fields_ids_map: FieldsIdsMap, - indexer_settings: &'a IndexerConfig, pub index_documents_method: IndexDocumentsMethod, - available_documents_ids: AvailableIds, - - // Both grenad follows the same format: - // key | value - // u32 | 1 byte for the Operation byte, the rest is the obkv of the document stored - original_sorter: grenad::Sorter, - flattened_sorter: grenad::Sorter, - - replaced_documents_ids: RoaringBitmap, - new_documents_ids: RoaringBitmap, - // To increase the cache locality and decrease the heap usage we use compact smartstring. - new_external_documents_ids_builder: FxHashMap, u64>, - documents_count: usize, } /// This enum is specific to the grenad sorter stored in the transform. @@ -75,29 +60,6 @@ pub enum Operation { Deletion, } -/// Create a mapping between the field ids found in the document batch and the one that were -/// already present in the index. -/// -/// If new fields are present in the addition, they are added to the index field ids map. -fn create_fields_mapping( - index_field_map: &mut FieldsIdsMap, - batch_field_map: &DocumentsBatchIndex, -) -> Result> { - batch_field_map - .iter() - // we sort by id here to ensure a deterministic mapping of the fields, that preserves - // the original ordering. 
- .sorted_by_key(|(&id, _)| id) - .map(|(field, name)| match index_field_map.id(name) { - Some(id) => Ok((*field, id)), - None => index_field_map - .insert(name) - .ok_or(Error::UserError(UserError::AttributeLimitReached)) - .map(|id| (*field, id)), - }) - .collect() -} - impl<'a, 'i> Transform<'a, 'i> { pub fn new( wtxn: &mut heed::RwTxn<'_>, @@ -138,19 +100,7 @@ impl<'a, 'i> Transform<'a, 'i> { ); let documents_ids = index.documents_ids(wtxn)?; - Ok(Transform { - index, - fields_ids_map: index.fields_ids_map(wtxn)?, - indexer_settings, - available_documents_ids: AvailableIds::new(&documents_ids), - original_sorter, - flattened_sorter, - index_documents_method, - replaced_documents_ids: RoaringBitmap::new(), - new_documents_ids: RoaringBitmap::new(), - new_external_documents_ids_builder: FxHashMap::default(), - documents_count: 0, - }) + Ok(Transform { index, indexer_settings, index_documents_method }) } // Flatten a document from the fields ids map contained in self and insert the new diff --git a/crates/milli/tests/search/facet_distribution.rs b/crates/milli/tests/search/facet_distribution.rs index 03405531d..4347e310a 100644 --- a/crates/milli/tests/search/facet_distribution.rs +++ b/crates/milli/tests/search/facet_distribution.rs @@ -1,12 +1,17 @@ use std::io::Cursor; use big_s::S; +use bumpalo::Bump; use heed::EnvOpenOptions; use maplit::hashset; -use milli::documents::{DocumentsBatchBuilder, DocumentsBatchReader}; -use milli::update::{IndexDocuments, IndexDocumentsConfig, IndexerConfig, Settings}; +use milli::documents::{mmap_from_objects, DocumentsBatchBuilder, DocumentsBatchReader}; +use milli::update::new::indexer; +use milli::update::{ + IndexDocuments, IndexDocumentsConfig, IndexDocumentsMethod, IndexerConfig, Settings, +}; +use milli::vector::EmbeddingConfigs; use milli::{FacetDistribution, Index, Object, OrderBy}; -use serde_json::Deserializer; +use serde_json::{from_value, json, Deserializer}; #[test] fn test_facet_distribution_with_no_facet_values() { @@ -27,37 +32,41 @@ fn test_facet_distribution_with_no_facet_values() { // index documents let config = IndexerConfig { max_memory: Some(10 * 1024 * 1024), ..Default::default() }; - let indexing_config = IndexDocumentsConfig { autogenerate_docids: true, ..Default::default() }; + let rtxn = index.read_txn().unwrap(); + let db_fields_ids_map = index.fields_ids_map(&rtxn).unwrap(); + let mut new_fields_ids_map = db_fields_ids_map.clone(); - let builder = - IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| (), || false).unwrap(); - let mut documents_builder = DocumentsBatchBuilder::new(Vec::new()); - let reader = Cursor::new( - r#"{ - "id": 123, - "title": "What a week, hu...", - "genres": [], - "tags": ["blue"] - } - { - "id": 345, - "title": "I am the pig!", - "tags": ["red"] - }"#, - ); + let embedders = EmbeddingConfigs::default(); + let mut indexer = indexer::DocumentOperation::new(IndexDocumentsMethod::ReplaceDocuments); - for result in Deserializer::from_reader(reader).into_iter::() { - let object = result.unwrap(); - documents_builder.append_json_object(&object).unwrap(); - } - - let vector = documents_builder.into_inner().unwrap(); + let doc1: Object = from_value( + json!({ "id": 123, "title": "What a week, hu...", "genres": [], "tags": ["blue"] }), + ) + .unwrap(); + let doc2: Object = + from_value(json!({ "id": 345, "title": "I am the pig!", "tags": ["red"] })).unwrap(); + let documents = mmap_from_objects(vec![doc1, doc2]); // index documents - let content = 
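The test above also shows the new payload construction: instead of appending JSON objects to a `DocumentsBatchBuilder` and re-reading them through `DocumentsBatchReader`, documents are built as `milli::Object` values and turned into a memory-mapped payload in one call. Reduced to its core, with the calls exactly as they appear in this patch:

use milli::documents::mmap_from_objects;
use milli::Object;
use serde_json::{from_value, json};

let doc1: Object = from_value(json!({ "id": 123, "tags": ["blue"] })).unwrap();
let doc2: Object = from_value(json!({ "id": 345, "tags": ["red"] })).unwrap();

// Yields a memmap2::Mmap that DocumentOperation::add_documents accepts directly:
let documents = mmap_from_objects(vec![doc1, doc2]);
// indexer.add_documents(&documents).unwrap();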
DocumentsBatchReader::from_reader(Cursor::new(vector)).unwrap(); - let (builder, user_error) = builder.add_documents(content).unwrap(); - user_error.unwrap(); - builder.execute().unwrap(); + indexer.add_documents(&documents).unwrap(); + + let indexer_alloc = Bump::new(); + let (document_changes, _operation_stats, primary_key) = + indexer.into_changes(&indexer_alloc, &index, &rtxn, None, &mut new_fields_ids_map).unwrap(); + + indexer::index( + &mut wtxn, + &index, + config.grenad_parameters(), + &db_fields_ids_map, + new_fields_ids_map, + primary_key, + &document_changes, + embedders, + &|| false, + &|_| (), + ) + .unwrap(); wtxn.commit().unwrap(); diff --git a/crates/milli/tests/search/mod.rs b/crates/milli/tests/search/mod.rs index 310780e03..cfe7867e0 100644 --- a/crates/milli/tests/search/mod.rs +++ b/crates/milli/tests/search/mod.rs @@ -1,14 +1,16 @@ use std::cmp::Reverse; use std::collections::HashSet; -use std::io::Cursor; +use std::io::Write; use big_s::S; +use bumpalo::Bump; use either::{Either, Left, Right}; use heed::EnvOpenOptions; use maplit::{btreemap, hashset}; -use milli::documents::{DocumentsBatchBuilder, DocumentsBatchReader}; -use milli::update::{IndexDocuments, IndexDocumentsConfig, IndexerConfig, Settings}; -use milli::{AscDesc, Criterion, DocumentId, Index, Member, Object, TermsMatchingStrategy}; +use milli::update::new::indexer; +use milli::update::{IndexDocumentsMethod, IndexerConfig, Settings}; +use milli::vector::EmbeddingConfigs; +use milli::{AscDesc, Criterion, DocumentId, Index, Member, TermsMatchingStrategy}; use serde::{Deserialize, Deserializer}; use slice_group_by::GroupBy; @@ -34,6 +36,7 @@ pub fn setup_search_index_with_criteria(criteria: &[Criterion]) -> Index { let index = Index::new(options, &path).unwrap(); let mut wtxn = index.write_txn().unwrap(); + let rtxn = index.read_txn().unwrap(); let config = IndexerConfig::default(); let mut builder = Settings::new(&mut wtxn, &index, &config); @@ -61,27 +64,41 @@ pub fn setup_search_index_with_criteria(criteria: &[Criterion]) -> Index { // index documents let config = IndexerConfig { max_memory: Some(10 * 1024 * 1024), ..Default::default() }; - let indexing_config = IndexDocumentsConfig::default(); + let db_fields_ids_map = index.fields_ids_map(&rtxn).unwrap(); + let mut new_fields_ids_map = db_fields_ids_map.clone(); - let builder = - IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| (), || false).unwrap(); - let mut documents_builder = DocumentsBatchBuilder::new(Vec::new()); - let reader = Cursor::new(CONTENT.as_bytes()); + let embedders = EmbeddingConfigs::default(); + let mut indexer = indexer::DocumentOperation::new(IndexDocumentsMethod::ReplaceDocuments); - for result in serde_json::Deserializer::from_reader(reader).into_iter::() { - let object = result.unwrap(); - documents_builder.append_json_object(&object).unwrap(); - } + let mut file = tempfile::tempfile().unwrap(); + file.write_all(CONTENT.as_bytes()).unwrap(); + file.sync_all().unwrap(); - let vector = documents_builder.into_inner().unwrap(); + let payload = unsafe { memmap2::Mmap::map(&file).unwrap() }; // index documents - let content = DocumentsBatchReader::from_reader(Cursor::new(vector)).unwrap(); - let (builder, user_error) = builder.add_documents(content).unwrap(); - user_error.unwrap(); - builder.execute().unwrap(); + indexer.add_documents(&payload).unwrap(); + + let indexer_alloc = Bump::new(); + let (document_changes, _operation_stats, primary_key) = + indexer.into_changes(&indexer_alloc, &index, &rtxn, None, &mut 
new_fields_ids_map).unwrap(); + + indexer::index( + &mut wtxn, + &index, + config.grenad_parameters(), + &db_fields_ids_map, + new_fields_ids_map, + primary_key, + &document_changes, + embedders, + &|| false, + &|_| (), + ) + .unwrap(); wtxn.commit().unwrap(); + drop(rtxn); index } diff --git a/crates/milli/tests/search/query_criteria.rs b/crates/milli/tests/search/query_criteria.rs index 65d403097..18c8b0bac 100644 --- a/crates/milli/tests/search/query_criteria.rs +++ b/crates/milli/tests/search/query_criteria.rs @@ -2,11 +2,16 @@ use std::cmp::Reverse; use std::io::Cursor; use big_s::S; +use bumpalo::Bump; use heed::EnvOpenOptions; use itertools::Itertools; use maplit::hashset; use milli::documents::{DocumentsBatchBuilder, DocumentsBatchReader}; -use milli::update::{IndexDocuments, IndexDocumentsConfig, IndexerConfig, Settings}; +use milli::update::new::indexer; +use milli::update::{ + IndexDocuments, IndexDocumentsConfig, IndexDocumentsMethod, IndexerConfig, Settings, +}; +use milli::vector::EmbeddingConfigs; use milli::{AscDesc, Criterion, Index, Member, Search, SearchResult, TermsMatchingStrategy}; use rand::Rng; use Criterion::*; @@ -275,14 +280,20 @@ fn criteria_ascdesc() { }); builder.execute(|_| (), || false).unwrap(); + wtxn.commit().unwrap(); + let mut wtxn = index.write_txn().unwrap(); + let rtxn = index.read_txn().unwrap(); + // index documents let config = IndexerConfig { max_memory: Some(10 * 1024 * 1024), ..Default::default() }; - let indexing_config = IndexDocumentsConfig { autogenerate_docids: true, ..Default::default() }; - let builder = - IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| (), || false).unwrap(); + let indexer_alloc = Bump::new(); + let db_fields_ids_map = index.fields_ids_map(&rtxn).unwrap(); + let mut new_fields_ids_map = db_fields_ids_map.clone(); - let mut batch_builder = DocumentsBatchBuilder::new(Vec::new()); + let embedders = EmbeddingConfigs::default(); + let mut indexer = indexer::DocumentOperation::new(IndexDocumentsMethod::ReplaceDocuments); + let mut file = tempfile::tempfile().unwrap(); (0..ASC_DESC_CANDIDATES_THRESHOLD + 1).for_each(|_| { let mut rng = rand::thread_rng(); @@ -304,15 +315,29 @@ fn criteria_ascdesc() { _ => panic!(), }; - batch_builder.append_json_object(&object).unwrap(); + serde_json::to_writer(&mut file, &object).unwrap(); }); - let vector = batch_builder.into_inner().unwrap(); + file.sync_all().unwrap(); - let reader = DocumentsBatchReader::from_reader(Cursor::new(vector)).unwrap(); - let (builder, user_error) = builder.add_documents(reader).unwrap(); - user_error.unwrap(); - builder.execute().unwrap(); + let payload = unsafe { memmap2::Mmap::map(&file).unwrap() }; + indexer.add_documents(&payload).unwrap(); + let (document_changes, _operation_stats, primary_key) = + indexer.into_changes(&indexer_alloc, &index, &rtxn, None, &mut new_fields_ids_map).unwrap(); + + indexer::index( + &mut wtxn, + &index, + config.grenad_parameters(), + &db_fields_ids_map, + new_fields_ids_map, + primary_key, + &document_changes, + embedders, + &|| false, + &|_| (), + ) + .unwrap(); wtxn.commit().unwrap(); diff --git a/crates/milli/tests/search/typo_tolerance.rs b/crates/milli/tests/search/typo_tolerance.rs index 863d2758a..535c1c3d7 100644 --- a/crates/milli/tests/search/typo_tolerance.rs +++ b/crates/milli/tests/search/typo_tolerance.rs @@ -1,10 +1,15 @@ use std::collections::BTreeSet; +use bumpalo::Bump; use heed::EnvOpenOptions; -use milli::update::{IndexDocuments, IndexDocumentsConfig, IndexerConfig, Settings}; -use 
milli::{Criterion, Index, Search, TermsMatchingStrategy}; -use serde_json::json; +use milli::documents::mmap_from_objects; +use milli::update::new::indexer; +use milli::update::{IndexDocumentsConfig, IndexDocumentsMethod, IndexerConfig, Settings}; +use milli::vector::EmbeddingConfigs; +use milli::{Criterion, Index, Object, Search, TermsMatchingStrategy}; +use serde_json::from_value; use tempfile::tempdir; +use ureq::json; use Criterion::*; #[test] @@ -106,34 +111,40 @@ fn test_typo_disabled_on_word() { options.map_size(4096 * 100); let index = Index::new(options, tmp.path()).unwrap(); - let mut builder = milli::documents::DocumentsBatchBuilder::new(Vec::new()); - let doc1 = json!({ - "id": 1usize, - "data": "zealand", - }); + let doc1: Object = from_value(json!({ "id": 1usize, "data": "zealand" })).unwrap(); + let doc2: Object = from_value(json!({ "id": 2usize, "data": "zearand" })).unwrap(); + let documents = mmap_from_objects(vec![doc1, doc2]); - let doc2 = json!({ - "id": 2usize, - "data": "zearand", - }); - - builder.append_json_object(doc1.as_object().unwrap()).unwrap(); - builder.append_json_object(doc2.as_object().unwrap()).unwrap(); - let vector = builder.into_inner().unwrap(); - - let documents = - milli::documents::DocumentsBatchReader::from_reader(std::io::Cursor::new(vector)).unwrap(); - - let mut txn = index.write_txn().unwrap(); + let mut wtxn = index.write_txn().unwrap(); + let rtxn = index.read_txn().unwrap(); let config = IndexerConfig::default(); - let indexing_config = IndexDocumentsConfig::default(); - let builder = - IndexDocuments::new(&mut txn, &index, &config, indexing_config, |_| (), || false).unwrap(); - let (builder, user_error) = builder.add_documents(documents).unwrap(); - user_error.unwrap(); - builder.execute().unwrap(); - txn.commit().unwrap(); + let db_fields_ids_map = index.fields_ids_map(&rtxn).unwrap(); + let mut new_fields_ids_map = db_fields_ids_map.clone(); + let embedders = EmbeddingConfigs::default(); + let mut indexer = indexer::DocumentOperation::new(IndexDocumentsMethod::ReplaceDocuments); + + indexer.add_documents(&documents).unwrap(); + + let indexer_alloc = Bump::new(); + let (document_changes, _operation_stats, primary_key) = + indexer.into_changes(&indexer_alloc, &index, &rtxn, None, &mut new_fields_ids_map).unwrap(); + + indexer::index( + &mut wtxn, + &index, + config.grenad_parameters(), + &db_fields_ids_map, + new_fields_ids_map, + primary_key, + &document_changes, + embedders, + &|| false, + &|_| (), + ) + .unwrap(); + + wtxn.commit().unwrap(); // basic typo search with default typo settings {
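When a test generates more documents than it wants to hold in memory, `criteria_ascdesc` above uses the second payload route: stream each object into a temporary file, then memory-map the file and feed the map to `add_documents`. As used above (the `objects` iterator is assumed for the sketch):

// Stream documents to disk, then hand the indexer a read-only mapping of the file.
let mut file = tempfile::tempfile().unwrap();
for object in objects {
    serde_json::to_writer(&mut file, &object).unwrap();
}
file.sync_all().unwrap(); // flush before mapping

// Safety: the tempfile is private to this process and is not written to afterwards.
let payload = unsafe { memmap2::Mmap::map(&file).unwrap() };
indexer.add_documents(&payload).unwrap();

One import in typo_tolerance.rs is worth a second look: `use ureq::json;` pulls the `json!` macro from ureq (which appears to re-export serde_json's macro), where `serde_json::json` — already used alongside `from_value` in the same file — would be the conventional source.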