From 3cf1352ae121250bd0bee402b77f20811acedb7a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Tue, 19 Nov 2024 10:45:27 +0100 Subject: [PATCH] Fix the benchmark tests --- Cargo.lock | 28 +- crates/benchmarks/Cargo.toml | 3 + crates/benchmarks/benches/indexing.rs | 1119 ++++++++++++----- crates/benchmarks/benches/utils.rs | 155 ++- .../update/new/indexer/document_deletion.rs | 6 +- .../update/new/indexer/document_operation.rs | 6 + 6 files changed, 919 insertions(+), 398 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c04b4f48a..3965ff9b4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -494,11 +494,13 @@ name = "benchmarks" version = "1.11.0" dependencies = [ "anyhow", + "bumpalo", "bytes", "convert_case 0.6.0", "criterion", "csv", "flate2", + "memmap2", "milli", "mimalloc", "rand", @@ -506,6 +508,7 @@ dependencies = [ "reqwest", "roaring", "serde_json", + "tempfile", ] [[package]] @@ -1860,9 +1863,9 @@ dependencies = [ [[package]] name = "fastrand" -version = "2.1.0" +version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9fc0510504f03c51ada170672ac806f1f105a88aa97a5281117e1ddc3368e51a" +checksum = "486f806e73c5707928240ddc295403b1b93c96a02038563881c4a2fd84b81ac4" [[package]] name = "file-store" @@ -2869,9 +2872,9 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.155" +version = "0.2.164" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "97b3888a4aecf77e811145cadf6eef5901f4782c53886191b2f693f24761847c" +checksum = "433bfe06b8c75da9b2e3fbea6e5329ff87748f0b144ef75306e674c3f6f7c13f" [[package]] name = "libgit2-sys" @@ -3255,9 +3258,9 @@ checksum = "0717cef1bc8b636c6e1c1bbdefc09e6322da8a9321966e8928ef80d20f7f770f" [[package]] name = "linux-raw-sys" -version = "0.4.12" +version = "0.4.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c4cd1a83af159aa67994778be9070f0ae1bd732942279cabb14f86f986a21456" +checksum = "78b3ae25bc7c8c38cec158d1f2757ee79e9b3740fbc7ccf0e59e4b08d793fa89" [[package]] name = "liquid" @@ -3591,9 +3594,9 @@ checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3" [[package]] name = "memmap2" -version = "0.9.4" +version = "0.9.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fe751422e4a8caa417e13c3ea66452215d7d63e19e604f4980461212f3ae1322" +checksum = "fd3f7eed9d3848f8b98834af67102b720745c4ec028fcd0aa0239277e7de374f" dependencies = [ "libc", "stable_deref_trait", @@ -4801,9 +4804,9 @@ dependencies = [ [[package]] name = "rustix" -version = "0.38.31" +version = "0.38.41" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6ea3e1a662af26cd7a3ba09c0297a31af215563ecf42817c98df621387f4e949" +checksum = "d7f649912bc1495e167a6edee79151c84b1bad49748cb4f1f1167f459f6224f6" dependencies = [ "bitflags 2.6.0", "errno", @@ -5372,12 +5375,13 @@ dependencies = [ [[package]] name = "tempfile" -version = "3.10.1" +version = "3.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "85b77fafb263dd9d05cbeac119526425676db3784113aa9295c88498cbf8bff1" +checksum = "28cce251fcbc87fac86a866eeb0d6c2d536fc16d06f184bb61aeae11aa4cee0c" dependencies = [ "cfg-if", "fastrand", + "once_cell", "rustix", "windows-sys 0.52.0", ] diff --git a/crates/benchmarks/Cargo.toml b/crates/benchmarks/Cargo.toml index 7e2ff995d..eec30ea3f 100644 --- a/crates/benchmarks/Cargo.toml +++ b/crates/benchmarks/Cargo.toml @@ -12,10 +12,13 @@ license.workspace = true [dependencies] anyhow = 
"1.0.86" +bumpalo = "3.16.0" csv = "1.3.0" +memmap2 = "0.9.5" milli = { path = "../milli" } mimalloc = { version = "0.1.43", default-features = false } serde_json = { version = "1.0.120", features = ["preserve_order"] } +tempfile = "3.14.0" [dev-dependencies] criterion = { version = "0.5.1", features = ["html_reports"] } diff --git a/crates/benchmarks/benches/indexing.rs b/crates/benchmarks/benches/indexing.rs index 0c19b89cf..7f7ae4a74 100644 --- a/crates/benchmarks/benches/indexing.rs +++ b/crates/benchmarks/benches/indexing.rs @@ -4,9 +4,13 @@ mod utils; use std::fs::{create_dir_all, remove_dir_all}; use std::path::Path; +use bumpalo::Bump; use criterion::{criterion_group, criterion_main, Criterion}; +use milli::documents::PrimaryKey; use milli::heed::{EnvOpenOptions, RwTxn}; -use milli::update::{IndexDocuments, IndexDocumentsConfig, IndexerConfig, Settings}; +use milli::update::new::indexer; +use milli::update::{IndexDocumentsMethod, IndexerConfig, Settings}; +use milli::vector::EmbeddingConfigs; use milli::Index; use rand::seq::SliceRandom; use rand_chacha::rand_core::SeedableRng; @@ -127,23 +131,37 @@ fn indexing_songs_default(c: &mut Criterion) { }, move |index| { let config = IndexerConfig::default(); - let indexing_config = IndexDocumentsConfig::default(); let mut wtxn = index.write_txn().unwrap(); - let builder = IndexDocuments::new( + let rtxn = index.read_txn().unwrap(); + let db_fields_ids_map = index.fields_ids_map(&rtxn).unwrap(); + let mut new_fields_ids_map = db_fields_ids_map.clone(); + + let mut indexer = + indexer::DocumentOperation::new(IndexDocumentsMethod::ReplaceDocuments); + let documents = utils::documents_from(datasets_paths::SMOL_SONGS, "csv"); + indexer.add_documents(&documents).unwrap(); + + let indexer_alloc = Bump::new(); + let (document_changes, _operation_stats, primary_key) = indexer + .into_changes(&indexer_alloc, &index, &rtxn, None, &mut new_fields_ids_map) + .unwrap(); + + indexer::index( &mut wtxn, &index, - &config, - indexing_config, - |_| (), - || false, + config.grenad_parameters(), + &db_fields_ids_map, + new_fields_ids_map, + primary_key, + &document_changes, + EmbeddingConfigs::default(), + &|| false, + &|_| (), ) .unwrap(); - let documents = utils::documents_from(datasets_paths::SMOL_SONGS, "csv"); - let (builder, user_error) = builder.add_documents(documents).unwrap(); - user_error.unwrap(); - builder.execute().unwrap(); wtxn.commit().unwrap(); + drop(rtxn); index.prepare_for_closing().wait(); }, @@ -171,45 +189,73 @@ fn reindexing_songs_default(c: &mut Criterion) { ); let config = IndexerConfig::default(); - let indexing_config = IndexDocumentsConfig::default(); let mut wtxn = index.write_txn().unwrap(); - let builder = IndexDocuments::new( + let rtxn = index.read_txn().unwrap(); + let db_fields_ids_map = index.fields_ids_map(&rtxn).unwrap(); + let mut new_fields_ids_map = db_fields_ids_map.clone(); + + let mut indexer = + indexer::DocumentOperation::new(IndexDocumentsMethod::ReplaceDocuments); + let documents = utils::documents_from(datasets_paths::SMOL_SONGS, "csv"); + indexer.add_documents(&documents).unwrap(); + + let indexer_alloc = Bump::new(); + let (document_changes, _operation_stats, primary_key) = indexer + .into_changes(&indexer_alloc, &index, &rtxn, None, &mut new_fields_ids_map) + .unwrap(); + + indexer::index( &mut wtxn, &index, - &config, - indexing_config, - |_| (), - || false, + config.grenad_parameters(), + &db_fields_ids_map, + new_fields_ids_map, + primary_key, + &document_changes, + EmbeddingConfigs::default(), + &|| 
false, + &|_| (), ) .unwrap(); - let documents = utils::documents_from(datasets_paths::SMOL_SONGS, "csv"); - let (builder, user_error) = builder.add_documents(documents).unwrap(); - user_error.unwrap(); - builder.execute().unwrap(); wtxn.commit().unwrap(); + drop(rtxn); index }, move |index| { let config = IndexerConfig::default(); - let indexing_config = IndexDocumentsConfig::default(); let mut wtxn = index.write_txn().unwrap(); - let builder = IndexDocuments::new( + let rtxn = index.read_txn().unwrap(); + let db_fields_ids_map = index.fields_ids_map(&rtxn).unwrap(); + let mut new_fields_ids_map = db_fields_ids_map.clone(); + + let mut indexer = + indexer::DocumentOperation::new(IndexDocumentsMethod::ReplaceDocuments); + let documents = utils::documents_from(datasets_paths::SMOL_SONGS, "csv"); + indexer.add_documents(&documents).unwrap(); + + let indexer_alloc = Bump::new(); + let (document_changes, _operation_stats, primary_key) = indexer + .into_changes(&indexer_alloc, &index, &rtxn, None, &mut new_fields_ids_map) + .unwrap(); + + indexer::index( &mut wtxn, &index, - &config, - indexing_config, - |_| (), - || false, + config.grenad_parameters(), + &db_fields_ids_map, + new_fields_ids_map, + primary_key, + &document_changes, + EmbeddingConfigs::default(), + &|| false, + &|_| (), ) .unwrap(); - let documents = utils::documents_from(datasets_paths::SMOL_SONGS, "csv"); - let (builder, user_error) = builder.add_documents(documents).unwrap(); - user_error.unwrap(); - builder.execute().unwrap(); wtxn.commit().unwrap(); + drop(rtxn); index.prepare_for_closing().wait(); }, @@ -240,21 +286,36 @@ fn deleting_songs_in_batches_default(c: &mut Criterion) { // as we don't care about the time it takes. let config = IndexerConfig::default(); let mut wtxn = index.write_txn().unwrap(); - let indexing_config = IndexDocumentsConfig::default(); - let builder = IndexDocuments::new( + let rtxn = index.read_txn().unwrap(); + let db_fields_ids_map = index.fields_ids_map(&rtxn).unwrap(); + let mut new_fields_ids_map = db_fields_ids_map.clone(); + + let mut indexer = + indexer::DocumentOperation::new(IndexDocumentsMethod::ReplaceDocuments); + let documents = utils::documents_from(datasets_paths::SMOL_SONGS, "csv"); + indexer.add_documents(&documents).unwrap(); + + let indexer_alloc = Bump::new(); + let (document_changes, _operation_stats, primary_key) = indexer + .into_changes(&indexer_alloc, &index, &rtxn, None, &mut new_fields_ids_map) + .unwrap(); + + indexer::index( &mut wtxn, &index, - &config, - indexing_config, - |_| (), - || false, + config.grenad_parameters(), + &db_fields_ids_map, + new_fields_ids_map, + primary_key, + &document_changes, + EmbeddingConfigs::default(), + &|| false, + &|_| (), ) .unwrap(); - let documents = utils::documents_from(datasets_paths::SMOL_SONGS, "csv"); - let (builder, user_error) = builder.add_documents(documents).unwrap(); - user_error.unwrap(); - builder.execute().unwrap(); + wtxn.commit().unwrap(); + drop(rtxn); let count = 1250; let batch_size = 250; @@ -293,59 +354,104 @@ fn indexing_songs_in_three_batches_default(c: &mut Criterion) { // as we don't care about the time it takes. 
let config = IndexerConfig::default(); let mut wtxn = index.write_txn().unwrap(); - let indexing_config = IndexDocumentsConfig::default(); - let builder = IndexDocuments::new( + let rtxn = index.read_txn().unwrap(); + let db_fields_ids_map = index.fields_ids_map(&rtxn).unwrap(); + let mut new_fields_ids_map = db_fields_ids_map.clone(); + + let mut indexer = + indexer::DocumentOperation::new(IndexDocumentsMethod::ReplaceDocuments); + let documents = utils::documents_from(datasets_paths::SMOL_SONGS_1_2, "csv"); + indexer.add_documents(&documents).unwrap(); + + let indexer_alloc = Bump::new(); + let (document_changes, _operation_stats, primary_key) = indexer + .into_changes(&indexer_alloc, &index, &rtxn, None, &mut new_fields_ids_map) + .unwrap(); + + indexer::index( &mut wtxn, &index, - &config, - indexing_config, - |_| (), - || false, + config.grenad_parameters(), + &db_fields_ids_map, + new_fields_ids_map, + primary_key, + &document_changes, + EmbeddingConfigs::default(), + &|| false, + &|_| (), ) .unwrap(); - let documents = utils::documents_from(datasets_paths::SMOL_SONGS_1_2, "csv"); - let (builder, user_error) = builder.add_documents(documents).unwrap(); - user_error.unwrap(); - builder.execute().unwrap(); wtxn.commit().unwrap(); + drop(rtxn); index }, move |index| { let config = IndexerConfig::default(); - let indexing_config = IndexDocumentsConfig::default(); let mut wtxn = index.write_txn().unwrap(); - let builder = IndexDocuments::new( - &mut wtxn, - &index, - &config, - indexing_config, - |_| (), - || false, - ) - .unwrap(); - let documents = utils::documents_from(datasets_paths::SMOL_SONGS_3_4, "csv"); - let (builder, user_error) = builder.add_documents(documents).unwrap(); - user_error.unwrap(); - builder.execute().unwrap(); + let rtxn = index.read_txn().unwrap(); + let db_fields_ids_map = index.fields_ids_map(&rtxn).unwrap(); + let mut new_fields_ids_map = db_fields_ids_map.clone(); - let indexing_config = IndexDocumentsConfig::default(); - let builder = IndexDocuments::new( + let mut indexer = + indexer::DocumentOperation::new(IndexDocumentsMethod::ReplaceDocuments); + let documents = utils::documents_from(datasets_paths::SMOL_SONGS_3_4, "csv"); + indexer.add_documents(&documents).unwrap(); + + let indexer_alloc = Bump::new(); + let (document_changes, _operation_stats, primary_key) = indexer + .into_changes(&indexer_alloc, &index, &rtxn, None, &mut new_fields_ids_map) + .unwrap(); + + indexer::index( &mut wtxn, &index, - &config, - indexing_config, - |_| (), - || false, + config.grenad_parameters(), + &db_fields_ids_map, + new_fields_ids_map, + primary_key, + &document_changes, + EmbeddingConfigs::default(), + &|| false, + &|_| (), ) .unwrap(); - let documents = utils::documents_from(datasets_paths::SMOL_SONGS_4_4, "csv"); - let (builder, user_error) = builder.add_documents(documents).unwrap(); - user_error.unwrap(); - builder.execute().unwrap(); wtxn.commit().unwrap(); + drop(rtxn); + + let mut wtxn = index.write_txn().unwrap(); + let rtxn = index.read_txn().unwrap(); + let db_fields_ids_map = index.fields_ids_map(&rtxn).unwrap(); + let mut new_fields_ids_map = db_fields_ids_map.clone(); + + let mut indexer = + indexer::DocumentOperation::new(IndexDocumentsMethod::ReplaceDocuments); + let documents = utils::documents_from(datasets_paths::SMOL_SONGS_4_4, "csv"); + indexer.add_documents(&documents).unwrap(); + + let indexer_alloc = Bump::new(); + let (document_changes, _operation_stats, primary_key) = indexer + .into_changes(&indexer_alloc, &index, &rtxn, None, &mut 
new_fields_ids_map) + .unwrap(); + + indexer::index( + &mut wtxn, + &index, + config.grenad_parameters(), + &db_fields_ids_map, + new_fields_ids_map, + primary_key, + &document_changes, + EmbeddingConfigs::default(), + &|| false, + &|_| (), + ) + .unwrap(); + + wtxn.commit().unwrap(); + drop(rtxn); index.prepare_for_closing().wait(); }, @@ -373,24 +479,38 @@ fn indexing_songs_without_faceted_numbers(c: &mut Criterion) { }, move |index| { let config = IndexerConfig::default(); - let indexing_config = IndexDocumentsConfig::default(); let mut wtxn = index.write_txn().unwrap(); - let builder = IndexDocuments::new( + let rtxn = index.read_txn().unwrap(); + let db_fields_ids_map = index.fields_ids_map(&rtxn).unwrap(); + let mut new_fields_ids_map = db_fields_ids_map.clone(); + + let mut indexer = + indexer::DocumentOperation::new(IndexDocumentsMethod::ReplaceDocuments); + let documents = utils::documents_from(datasets_paths::SMOL_SONGS, "csv"); + + indexer.add_documents(&documents).unwrap(); + + let indexer_alloc = Bump::new(); + let (document_changes, _operation_stats, primary_key) = indexer + .into_changes(&indexer_alloc, &index, &rtxn, None, &mut new_fields_ids_map) + .unwrap(); + + indexer::index( &mut wtxn, &index, - &config, - indexing_config, - |_| (), - || false, + config.grenad_parameters(), + &db_fields_ids_map, + new_fields_ids_map, + primary_key, + &document_changes, + EmbeddingConfigs::default(), + &|| false, + &|_| (), ) .unwrap(); - let documents = utils::documents_from(datasets_paths::SMOL_SONGS, "csv"); - - let (builder, user_error) = builder.add_documents(documents).unwrap(); - user_error.unwrap(); - builder.execute().unwrap(); wtxn.commit().unwrap(); + drop(rtxn); index.prepare_for_closing().wait(); }, @@ -418,23 +538,37 @@ fn indexing_songs_without_faceted_fields(c: &mut Criterion) { }, move |index| { let config = IndexerConfig::default(); - let indexing_config = IndexDocumentsConfig::default(); let mut wtxn = index.write_txn().unwrap(); - let builder = IndexDocuments::new( + let rtxn = index.read_txn().unwrap(); + let db_fields_ids_map = index.fields_ids_map(&rtxn).unwrap(); + let mut new_fields_ids_map = db_fields_ids_map.clone(); + + let mut indexer = + indexer::DocumentOperation::new(IndexDocumentsMethod::ReplaceDocuments); + let documents = utils::documents_from(datasets_paths::SMOL_SONGS, "csv"); + indexer.add_documents(&documents).unwrap(); + + let indexer_alloc = Bump::new(); + let (document_changes, _operation_stats, primary_key) = indexer + .into_changes(&indexer_alloc, &index, &rtxn, None, &mut new_fields_ids_map) + .unwrap(); + + indexer::index( &mut wtxn, &index, - &config, - indexing_config, - |_| (), - || false, + config.grenad_parameters(), + &db_fields_ids_map, + new_fields_ids_map, + primary_key, + &document_changes, + EmbeddingConfigs::default(), + &|| false, + &|_| (), ) .unwrap(); - let documents = utils::documents_from(datasets_paths::SMOL_SONGS, "csv"); - let (builder, user_error) = builder.add_documents(documents).unwrap(); - user_error.unwrap(); - builder.execute().unwrap(); wtxn.commit().unwrap(); + drop(rtxn); index.prepare_for_closing().wait(); }, @@ -462,24 +596,37 @@ fn indexing_wiki(c: &mut Criterion) { }, move |index| { let config = IndexerConfig::default(); - let indexing_config = - IndexDocumentsConfig { autogenerate_docids: true, ..Default::default() }; let mut wtxn = index.write_txn().unwrap(); - let builder = IndexDocuments::new( + let rtxn = index.read_txn().unwrap(); + let db_fields_ids_map = index.fields_ids_map(&rtxn).unwrap(); + let 
mut new_fields_ids_map = db_fields_ids_map.clone(); + + let mut indexer = + indexer::DocumentOperation::new(IndexDocumentsMethod::ReplaceDocuments); + let documents = utils::documents_from(datasets_paths::SMOL_WIKI_ARTICLES, "csv"); + indexer.add_documents(&documents).unwrap(); + + let indexer_alloc = Bump::new(); + let (document_changes, _operation_stats, primary_key) = indexer + .into_changes(&indexer_alloc, &index, &rtxn, None, &mut new_fields_ids_map) + .unwrap(); + + indexer::index( &mut wtxn, &index, - &config, - indexing_config, - |_| (), - || false, + config.grenad_parameters(), + &db_fields_ids_map, + new_fields_ids_map, + primary_key, + &document_changes, + EmbeddingConfigs::default(), + &|| false, + &|_| (), ) .unwrap(); - let documents = utils::documents_from(datasets_paths::SMOL_WIKI_ARTICLES, "csv"); - let (builder, user_error) = builder.add_documents(documents).unwrap(); - user_error.unwrap(); - builder.execute().unwrap(); wtxn.commit().unwrap(); + drop(rtxn); index.prepare_for_closing().wait(); }, @@ -506,47 +653,73 @@ fn reindexing_wiki(c: &mut Criterion) { ); let config = IndexerConfig::default(); - let indexing_config = - IndexDocumentsConfig { autogenerate_docids: true, ..Default::default() }; let mut wtxn = index.write_txn().unwrap(); - let builder = IndexDocuments::new( + let rtxn = index.read_txn().unwrap(); + let db_fields_ids_map = index.fields_ids_map(&rtxn).unwrap(); + let mut new_fields_ids_map = db_fields_ids_map.clone(); + + let mut indexer = + indexer::DocumentOperation::new(IndexDocumentsMethod::ReplaceDocuments); + let documents = utils::documents_from(datasets_paths::SMOL_WIKI_ARTICLES, "csv"); + indexer.add_documents(&documents).unwrap(); + + let indexer_alloc = Bump::new(); + let (document_changes, _operation_stats, primary_key) = indexer + .into_changes(&indexer_alloc, &index, &rtxn, None, &mut new_fields_ids_map) + .unwrap(); + + indexer::index( &mut wtxn, &index, - &config, - indexing_config, - |_| (), - || false, + config.grenad_parameters(), + &db_fields_ids_map, + new_fields_ids_map, + primary_key, + &document_changes, + EmbeddingConfigs::default(), + &|| false, + &|_| (), ) .unwrap(); - let documents = utils::documents_from(datasets_paths::SMOL_WIKI_ARTICLES, "csv"); - let (builder, user_error) = builder.add_documents(documents).unwrap(); - user_error.unwrap(); - builder.execute().unwrap(); wtxn.commit().unwrap(); + drop(rtxn); index }, move |index| { let config = IndexerConfig::default(); - let indexing_config = - IndexDocumentsConfig { autogenerate_docids: true, ..Default::default() }; let mut wtxn = index.write_txn().unwrap(); - let builder = IndexDocuments::new( + let rtxn = index.read_txn().unwrap(); + let db_fields_ids_map = index.fields_ids_map(&rtxn).unwrap(); + let mut new_fields_ids_map = db_fields_ids_map.clone(); + + let mut indexer = + indexer::DocumentOperation::new(IndexDocumentsMethod::ReplaceDocuments); + let documents = utils::documents_from(datasets_paths::SMOL_WIKI_ARTICLES, "csv"); + indexer.add_documents(&documents).unwrap(); + + let indexer_alloc = Bump::new(); + let (document_changes, _operation_stats, primary_key) = indexer + .into_changes(&indexer_alloc, &index, &rtxn, None, &mut new_fields_ids_map) + .unwrap(); + + indexer::index( &mut wtxn, &index, - &config, - indexing_config, - |_| (), - || false, + config.grenad_parameters(), + &db_fields_ids_map, + new_fields_ids_map, + primary_key, + &document_changes, + EmbeddingConfigs::default(), + &|| false, + &|_| (), ) .unwrap(); - let documents = 
utils::documents_from(datasets_paths::SMOL_WIKI_ARTICLES, "csv"); - let (builder, user_error) = builder.add_documents(documents).unwrap(); - user_error.unwrap(); - builder.execute().unwrap(); wtxn.commit().unwrap(); + drop(rtxn); index.prepare_for_closing().wait(); }, @@ -576,22 +749,36 @@ fn deleting_wiki_in_batches_default(c: &mut Criterion) { // as we don't care about the time it takes. let config = IndexerConfig::default(); let mut wtxn = index.write_txn().unwrap(); - let indexing_config = - IndexDocumentsConfig { autogenerate_docids: true, ..Default::default() }; - let builder = IndexDocuments::new( + let rtxn = index.read_txn().unwrap(); + let db_fields_ids_map = index.fields_ids_map(&rtxn).unwrap(); + let mut new_fields_ids_map = db_fields_ids_map.clone(); + + let mut indexer = + indexer::DocumentOperation::new(IndexDocumentsMethod::ReplaceDocuments); + let documents = utils::documents_from(datasets_paths::SMOL_WIKI_ARTICLES, "csv"); + indexer.add_documents(&documents).unwrap(); + + let indexer_alloc = Bump::new(); + let (document_changes, _operation_stats, primary_key) = indexer + .into_changes(&indexer_alloc, &index, &rtxn, None, &mut new_fields_ids_map) + .unwrap(); + + indexer::index( &mut wtxn, &index, - &config, - indexing_config, - |_| (), - || false, + config.grenad_parameters(), + &db_fields_ids_map, + new_fields_ids_map, + primary_key, + &document_changes, + EmbeddingConfigs::default(), + &|| false, + &|_| (), ) .unwrap(); - let documents = utils::documents_from(datasets_paths::SMOL_WIKI_ARTICLES, "csv"); - let (builder, user_error) = builder.add_documents(documents).unwrap(); - user_error.unwrap(); - builder.execute().unwrap(); + wtxn.commit().unwrap(); + drop(rtxn); let count = 1250; let batch_size = 250; @@ -625,72 +812,111 @@ fn indexing_wiki_in_three_batches(c: &mut Criterion) { &sortable_fields, ); - let mut wtxn = index.write_txn().unwrap(); - // We index only one half of the dataset in the setup part // as we don't care about the time it takes. 
let config = IndexerConfig::default(); - let indexing_config = - IndexDocumentsConfig { autogenerate_docids: true, ..Default::default() }; - let builder = IndexDocuments::new( - &mut wtxn, - &index, - &config, - indexing_config, - |_| (), - || false, - ) - .unwrap(); + let mut wtxn = index.write_txn().unwrap(); + let rtxn = index.read_txn().unwrap(); + let db_fields_ids_map = index.fields_ids_map(&rtxn).unwrap(); + let mut new_fields_ids_map = db_fields_ids_map.clone(); + + let mut indexer = + indexer::DocumentOperation::new(IndexDocumentsMethod::ReplaceDocuments); let documents = utils::documents_from(datasets_paths::SMOL_WIKI_ARTICLES_1_2, "csv"); - let (builder, user_error) = builder.add_documents(documents).unwrap(); - user_error.unwrap(); - builder.execute().unwrap(); + indexer.add_documents(&documents).unwrap(); + + let indexer_alloc = Bump::new(); + let (document_changes, _operation_stats, primary_key) = indexer + .into_changes(&indexer_alloc, &index, &rtxn, None, &mut new_fields_ids_map) + .unwrap(); + + indexer::index( + &mut wtxn, + &index, + config.grenad_parameters(), + &db_fields_ids_map, + new_fields_ids_map, + primary_key, + &document_changes, + EmbeddingConfigs::default(), + &|| false, + &|_| (), + ) + .unwrap(); wtxn.commit().unwrap(); + drop(rtxn); index }, move |index| { let config = IndexerConfig::default(); - let indexing_config = - IndexDocumentsConfig { autogenerate_docids: true, ..Default::default() }; let mut wtxn = index.write_txn().unwrap(); - let builder = IndexDocuments::new( - &mut wtxn, - &index, - &config, - indexing_config, - |_| (), - || false, - ) - .unwrap(); + let rtxn = index.read_txn().unwrap(); + let db_fields_ids_map = index.fields_ids_map(&rtxn).unwrap(); + let mut new_fields_ids_map = db_fields_ids_map.clone(); + let mut indexer = + indexer::DocumentOperation::new(IndexDocumentsMethod::ReplaceDocuments); let documents = utils::documents_from(datasets_paths::SMOL_WIKI_ARTICLES_3_4, "csv"); - let (builder, user_error) = builder.add_documents(documents).unwrap(); - user_error.unwrap(); - builder.execute().unwrap(); + indexer.add_documents(&documents).unwrap(); - let indexing_config = - IndexDocumentsConfig { autogenerate_docids: true, ..Default::default() }; - let builder = IndexDocuments::new( + let indexer_alloc = Bump::new(); + let (document_changes, _operation_stats, primary_key) = indexer + .into_changes(&indexer_alloc, &index, &rtxn, None, &mut new_fields_ids_map) + .unwrap(); + + indexer::index( &mut wtxn, &index, - &config, - indexing_config, - |_| (), - || false, + config.grenad_parameters(), + &db_fields_ids_map, + new_fields_ids_map, + primary_key, + &document_changes, + EmbeddingConfigs::default(), + &|| false, + &|_| (), ) .unwrap(); + wtxn.commit().unwrap(); + drop(rtxn); + + let mut wtxn = index.write_txn().unwrap(); + let rtxn = index.read_txn().unwrap(); + let db_fields_ids_map = index.fields_ids_map(&rtxn).unwrap(); + let mut new_fields_ids_map = db_fields_ids_map.clone(); + + let mut indexer = + indexer::DocumentOperation::new(IndexDocumentsMethod::ReplaceDocuments); let documents = utils::documents_from(datasets_paths::SMOL_WIKI_ARTICLES_4_4, "csv"); - let (builder, user_error) = builder.add_documents(documents).unwrap(); - user_error.unwrap(); - builder.execute().unwrap(); + indexer.add_documents(&documents).unwrap(); + + let indexer_alloc = Bump::new(); + let (document_changes, _operation_stats, primary_key) = indexer + .into_changes(&indexer_alloc, &index, &rtxn, None, &mut new_fields_ids_map) + .unwrap(); + + indexer::index( + 
&mut wtxn, + &index, + config.grenad_parameters(), + &db_fields_ids_map, + new_fields_ids_map, + primary_key, + &document_changes, + EmbeddingConfigs::default(), + &|| false, + &|_| (), + ) + .unwrap(); wtxn.commit().unwrap(); + drop(rtxn); index.prepare_for_closing().wait(); }, @@ -718,23 +944,37 @@ fn indexing_movies_default(c: &mut Criterion) { }, move |index| { let config = IndexerConfig::default(); - let indexing_config = IndexDocumentsConfig::default(); let mut wtxn = index.write_txn().unwrap(); - let builder = IndexDocuments::new( + let rtxn = index.read_txn().unwrap(); + let db_fields_ids_map = index.fields_ids_map(&rtxn).unwrap(); + let mut new_fields_ids_map = db_fields_ids_map.clone(); + + let mut indexer = + indexer::DocumentOperation::new(IndexDocumentsMethod::ReplaceDocuments); + let documents = utils::documents_from(datasets_paths::MOVIES, "json"); + indexer.add_documents(&documents).unwrap(); + + let indexer_alloc = Bump::new(); + let (document_changes, _operation_stats, primary_key) = indexer + .into_changes(&indexer_alloc, &index, &rtxn, None, &mut new_fields_ids_map) + .unwrap(); + + indexer::index( &mut wtxn, &index, - &config, - indexing_config, - |_| (), - || false, + config.grenad_parameters(), + &db_fields_ids_map, + new_fields_ids_map, + primary_key, + &document_changes, + EmbeddingConfigs::default(), + &|| false, + &|_| (), ) .unwrap(); - let documents = utils::documents_from(datasets_paths::MOVIES, "json"); - let (builder, user_error) = builder.add_documents(documents).unwrap(); - user_error.unwrap(); - builder.execute().unwrap(); wtxn.commit().unwrap(); + drop(rtxn); index.prepare_for_closing().wait(); }, @@ -761,45 +1001,73 @@ fn reindexing_movies_default(c: &mut Criterion) { ); let config = IndexerConfig::default(); - let indexing_config = IndexDocumentsConfig::default(); let mut wtxn = index.write_txn().unwrap(); - let builder = IndexDocuments::new( + let rtxn = index.read_txn().unwrap(); + let db_fields_ids_map = index.fields_ids_map(&rtxn).unwrap(); + let mut new_fields_ids_map = db_fields_ids_map.clone(); + + let mut indexer = + indexer::DocumentOperation::new(IndexDocumentsMethod::ReplaceDocuments); + let documents = utils::documents_from(datasets_paths::MOVIES, "json"); + indexer.add_documents(&documents).unwrap(); + + let indexer_alloc = Bump::new(); + let (document_changes, _operation_stats, primary_key) = indexer + .into_changes(&indexer_alloc, &index, &rtxn, None, &mut new_fields_ids_map) + .unwrap(); + + indexer::index( &mut wtxn, &index, - &config, - indexing_config, - |_| (), - || false, + config.grenad_parameters(), + &db_fields_ids_map, + new_fields_ids_map, + primary_key, + &document_changes, + EmbeddingConfigs::default(), + &|| false, + &|_| (), ) .unwrap(); - let documents = utils::documents_from(datasets_paths::MOVIES, "json"); - let (builder, user_error) = builder.add_documents(documents).unwrap(); - user_error.unwrap(); - builder.execute().unwrap(); wtxn.commit().unwrap(); + drop(rtxn); index }, move |index| { let config = IndexerConfig::default(); - let indexing_config = IndexDocumentsConfig::default(); let mut wtxn = index.write_txn().unwrap(); - let builder = IndexDocuments::new( + let rtxn = index.read_txn().unwrap(); + let db_fields_ids_map = index.fields_ids_map(&rtxn).unwrap(); + let mut new_fields_ids_map = db_fields_ids_map.clone(); + + let mut indexer = + indexer::DocumentOperation::new(IndexDocumentsMethod::ReplaceDocuments); + let documents = utils::documents_from(datasets_paths::MOVIES, "json"); + 
indexer.add_documents(&documents).unwrap(); + + let indexer_alloc = Bump::new(); + let (document_changes, _operation_stats, primary_key) = indexer + .into_changes(&indexer_alloc, &index, &rtxn, None, &mut new_fields_ids_map) + .unwrap(); + + indexer::index( &mut wtxn, &index, - &config, - indexing_config, - |_| (), - || false, + config.grenad_parameters(), + &db_fields_ids_map, + new_fields_ids_map, + primary_key, + &document_changes, + EmbeddingConfigs::default(), + &|| false, + &|_| (), ) .unwrap(); - let documents = utils::documents_from(datasets_paths::MOVIES, "json"); - let (builder, user_error) = builder.add_documents(documents).unwrap(); - user_error.unwrap(); - builder.execute().unwrap(); wtxn.commit().unwrap(); + drop(rtxn); index.prepare_for_closing().wait(); }, @@ -829,21 +1097,36 @@ fn deleting_movies_in_batches_default(c: &mut Criterion) { // as we don't care about the time it takes. let config = IndexerConfig::default(); let mut wtxn = index.write_txn().unwrap(); - let indexing_config = IndexDocumentsConfig::default(); - let builder = IndexDocuments::new( + let rtxn = index.read_txn().unwrap(); + let db_fields_ids_map = index.fields_ids_map(&rtxn).unwrap(); + let mut new_fields_ids_map = db_fields_ids_map.clone(); + + let mut indexer = + indexer::DocumentOperation::new(IndexDocumentsMethod::ReplaceDocuments); + let documents = utils::documents_from(datasets_paths::MOVIES, "json"); + indexer.add_documents(&documents).unwrap(); + + let indexer_alloc = Bump::new(); + let (document_changes, _operation_stats, primary_key) = indexer + .into_changes(&indexer_alloc, &index, &rtxn, None, &mut new_fields_ids_map) + .unwrap(); + + indexer::index( &mut wtxn, &index, - &config, - indexing_config, - |_| (), - || false, + config.grenad_parameters(), + &db_fields_ids_map, + new_fields_ids_map, + primary_key, + &document_changes, + EmbeddingConfigs::default(), + &|| false, + &|_| (), ) .unwrap(); - let documents = utils::documents_from(datasets_paths::MOVIES, "json"); - let (builder, user_error) = builder.add_documents(documents).unwrap(); - user_error.unwrap(); - builder.execute().unwrap(); + wtxn.commit().unwrap(); + drop(rtxn); let count = 1250; let batch_size = 250; @@ -860,21 +1143,38 @@ fn deleting_movies_in_batches_default(c: &mut Criterion) { } fn delete_documents_from_ids(index: Index, document_ids_to_delete: Vec) { - let mut wtxn = index.write_txn().unwrap(); - - let indexer_config = IndexerConfig::default(); + let config = IndexerConfig::default(); for ids in document_ids_to_delete { - let config = IndexDocumentsConfig::default(); + let mut wtxn = index.write_txn().unwrap(); + let rtxn = index.read_txn().unwrap(); + let db_fields_ids_map = index.fields_ids_map(&rtxn).unwrap(); + let new_fields_ids_map = db_fields_ids_map.clone(); + let primary_key = index.primary_key(&rtxn).unwrap().unwrap(); + let primary_key = PrimaryKey::new(primary_key, &db_fields_ids_map).unwrap(); - let mut builder = - IndexDocuments::new(&mut wtxn, &index, &indexer_config, config, |_| (), || false) - .unwrap(); - (builder, _) = builder.remove_documents_from_db_no_batch(&ids).unwrap(); - builder.execute().unwrap(); + let mut indexer = indexer::DocumentDeletion::new(); + indexer.delete_documents_by_docids(ids); + + let indexer_alloc = Bump::new(); + let document_changes = indexer.into_changes(&indexer_alloc, primary_key); + + indexer::index( + &mut wtxn, + &index, + config.grenad_parameters(), + &db_fields_ids_map, + new_fields_ids_map, + Some(primary_key), + &document_changes, + EmbeddingConfigs::default(), + 
&|| false, + &|_| (), + ) + .unwrap(); + + wtxn.commit().unwrap(); } - wtxn.commit().unwrap(); - index.prepare_for_closing().wait(); } @@ -896,66 +1196,108 @@ fn indexing_movies_in_three_batches(c: &mut Criterion) { &sortable_fields, ); - let mut wtxn = index.write_txn().unwrap(); // We index only one half of the dataset in the setup part // as we don't care about the time it takes. let config = IndexerConfig::default(); - let indexing_config = IndexDocumentsConfig::default(); - let builder = IndexDocuments::new( + let mut wtxn = index.write_txn().unwrap(); + let rtxn = index.read_txn().unwrap(); + let db_fields_ids_map = index.fields_ids_map(&rtxn).unwrap(); + let mut new_fields_ids_map = db_fields_ids_map.clone(); + + let mut indexer = + indexer::DocumentOperation::new(IndexDocumentsMethod::ReplaceDocuments); + let documents = utils::documents_from(datasets_paths::MOVIES_1_2, "json"); + indexer.add_documents(&documents).unwrap(); + + let indexer_alloc = Bump::new(); + let (document_changes, _operation_stats, primary_key) = indexer + .into_changes(&indexer_alloc, &index, &rtxn, None, &mut new_fields_ids_map) + .unwrap(); + + indexer::index( &mut wtxn, &index, - &config, - indexing_config, - |_| (), - || false, + config.grenad_parameters(), + &db_fields_ids_map, + new_fields_ids_map, + primary_key, + &document_changes, + EmbeddingConfigs::default(), + &|| false, + &|_| (), ) .unwrap(); - let documents = utils::documents_from(datasets_paths::MOVIES_1_2, "json"); - let (builder, user_error) = builder.add_documents(documents).unwrap(); - user_error.unwrap(); - builder.execute().unwrap(); - wtxn.commit().unwrap(); + drop(rtxn); index }, move |index| { let config = IndexerConfig::default(); - let indexing_config = IndexDocumentsConfig::default(); let mut wtxn = index.write_txn().unwrap(); - let builder = IndexDocuments::new( - &mut wtxn, - &index, - &config, - indexing_config, - |_| (), - || false, - ) - .unwrap(); + let rtxn = index.read_txn().unwrap(); + let db_fields_ids_map = index.fields_ids_map(&rtxn).unwrap(); + let mut new_fields_ids_map = db_fields_ids_map.clone(); + let mut indexer = + indexer::DocumentOperation::new(IndexDocumentsMethod::ReplaceDocuments); let documents = utils::documents_from(datasets_paths::MOVIES_3_4, "json"); - let (builder, user_error) = builder.add_documents(documents).unwrap(); - user_error.unwrap(); - builder.execute().unwrap(); + indexer.add_documents(&documents).unwrap(); - let indexing_config = IndexDocumentsConfig::default(); - let builder = IndexDocuments::new( + let indexer_alloc = Bump::new(); + let (document_changes, _operation_stats, primary_key) = indexer + .into_changes(&indexer_alloc, &index, &rtxn, None, &mut new_fields_ids_map) + .unwrap(); + + indexer::index( &mut wtxn, &index, - &config, - indexing_config, - |_| (), - || false, + config.grenad_parameters(), + &db_fields_ids_map, + new_fields_ids_map, + primary_key, + &document_changes, + EmbeddingConfigs::default(), + &|| false, + &|_| (), ) .unwrap(); - let documents = utils::documents_from(datasets_paths::MOVIES_4_4, "json"); - let (builder, user_error) = builder.add_documents(documents).unwrap(); - user_error.unwrap(); - builder.execute().unwrap(); - wtxn.commit().unwrap(); + drop(rtxn); + + let mut wtxn = index.write_txn().unwrap(); + let rtxn = index.read_txn().unwrap(); + let db_fields_ids_map = index.fields_ids_map(&rtxn).unwrap(); + let mut new_fields_ids_map = db_fields_ids_map.clone(); + + let mut indexer = + indexer::DocumentOperation::new(IndexDocumentsMethod::ReplaceDocuments); + 
let documents = utils::documents_from(datasets_paths::MOVIES_4_4, "json"); + indexer.add_documents(&documents).unwrap(); + + let indexer_alloc = Bump::new(); + let (document_changes, _operation_stats, primary_key) = indexer + .into_changes(&indexer_alloc, &index, &rtxn, None, &mut new_fields_ids_map) + .unwrap(); + + indexer::index( + &mut wtxn, + &index, + config.grenad_parameters(), + &db_fields_ids_map, + new_fields_ids_map, + primary_key, + &document_changes, + EmbeddingConfigs::default(), + &|| false, + &|_| (), + ) + .unwrap(); + + wtxn.commit().unwrap(); + drop(rtxn); index.prepare_for_closing().wait(); }, @@ -1006,23 +1348,37 @@ fn indexing_nested_movies_default(c: &mut Criterion) { }, move |index| { let config = IndexerConfig::default(); - let indexing_config = IndexDocumentsConfig::default(); let mut wtxn = index.write_txn().unwrap(); - let builder = IndexDocuments::new( + let rtxn = index.read_txn().unwrap(); + let db_fields_ids_map = index.fields_ids_map(&rtxn).unwrap(); + let mut new_fields_ids_map = db_fields_ids_map.clone(); + + let mut indexer = + indexer::DocumentOperation::new(IndexDocumentsMethod::ReplaceDocuments); + let documents = utils::documents_from(datasets_paths::NESTED_MOVIES, "json"); + indexer.add_documents(&documents).unwrap(); + + let indexer_alloc = Bump::new(); + let (document_changes, _operation_stats, primary_key) = indexer + .into_changes(&indexer_alloc, &index, &rtxn, None, &mut new_fields_ids_map) + .unwrap(); + + indexer::index( &mut wtxn, &index, - &config, - indexing_config, - |_| (), - || false, + config.grenad_parameters(), + &db_fields_ids_map, + new_fields_ids_map, + primary_key, + &document_changes, + EmbeddingConfigs::default(), + &|| false, + &|_| (), ) .unwrap(); - let documents = utils::documents_from(datasets_paths::NESTED_MOVIES, "json"); - let (builder, user_error) = builder.add_documents(documents).unwrap(); - user_error.unwrap(); - builder.execute().unwrap(); wtxn.commit().unwrap(); + drop(rtxn); index.prepare_for_closing().wait(); }, @@ -1075,21 +1431,36 @@ fn deleting_nested_movies_in_batches_default(c: &mut Criterion) { // as we don't care about the time it takes. 
let config = IndexerConfig::default(); let mut wtxn = index.write_txn().unwrap(); - let indexing_config = IndexDocumentsConfig::default(); - let builder = IndexDocuments::new( + let rtxn = index.read_txn().unwrap(); + let db_fields_ids_map = index.fields_ids_map(&rtxn).unwrap(); + let mut new_fields_ids_map = db_fields_ids_map.clone(); + + let mut indexer = + indexer::DocumentOperation::new(IndexDocumentsMethod::ReplaceDocuments); + let documents = utils::documents_from(datasets_paths::NESTED_MOVIES, "json"); + indexer.add_documents(&documents).unwrap(); + + let indexer_alloc = Bump::new(); + let (document_changes, _operation_stats, primary_key) = indexer + .into_changes(&indexer_alloc, &index, &rtxn, None, &mut new_fields_ids_map) + .unwrap(); + + indexer::index( &mut wtxn, &index, - &config, - indexing_config, - |_| (), - || false, + config.grenad_parameters(), + &db_fields_ids_map, + new_fields_ids_map, + primary_key, + &document_changes, + EmbeddingConfigs::default(), + &|| false, + &|_| (), ) .unwrap(); - let documents = utils::documents_from(datasets_paths::NESTED_MOVIES, "json"); - let (builder, user_error) = builder.add_documents(documents).unwrap(); - user_error.unwrap(); - builder.execute().unwrap(); + wtxn.commit().unwrap(); + drop(rtxn); let count = 1250; let batch_size = 250; @@ -1133,23 +1504,37 @@ fn indexing_nested_movies_without_faceted_fields(c: &mut Criterion) { }, move |index| { let config = IndexerConfig::default(); - let indexing_config = IndexDocumentsConfig::default(); let mut wtxn = index.write_txn().unwrap(); - let builder = IndexDocuments::new( + let rtxn = index.read_txn().unwrap(); + let db_fields_ids_map = index.fields_ids_map(&rtxn).unwrap(); + let mut new_fields_ids_map = db_fields_ids_map.clone(); + + let mut indexer = + indexer::DocumentOperation::new(IndexDocumentsMethod::ReplaceDocuments); + let documents = utils::documents_from(datasets_paths::NESTED_MOVIES, "json"); + indexer.add_documents(&documents).unwrap(); + + let indexer_alloc = Bump::new(); + let (document_changes, _operation_stats, primary_key) = indexer + .into_changes(&indexer_alloc, &index, &rtxn, None, &mut new_fields_ids_map) + .unwrap(); + + indexer::index( &mut wtxn, &index, - &config, - indexing_config, - |_| (), - || false, + config.grenad_parameters(), + &db_fields_ids_map, + new_fields_ids_map, + primary_key, + &document_changes, + EmbeddingConfigs::default(), + &|| false, + &|_| (), ) .unwrap(); - let documents = utils::documents_from(datasets_paths::NESTED_MOVIES, "json"); - let (builder, user_error) = builder.add_documents(documents).unwrap(); - user_error.unwrap(); - builder.execute().unwrap(); wtxn.commit().unwrap(); + drop(rtxn); index.prepare_for_closing().wait(); }, @@ -1177,24 +1562,37 @@ fn indexing_geo(c: &mut Criterion) { }, move |index| { let config = IndexerConfig::default(); - let indexing_config = IndexDocumentsConfig::default(); let mut wtxn = index.write_txn().unwrap(); - let builder = IndexDocuments::new( + let rtxn = index.read_txn().unwrap(); + let db_fields_ids_map = index.fields_ids_map(&rtxn).unwrap(); + let mut new_fields_ids_map = db_fields_ids_map.clone(); + + let mut indexer = + indexer::DocumentOperation::new(IndexDocumentsMethod::ReplaceDocuments); + let documents = utils::documents_from(datasets_paths::SMOL_ALL_COUNTRIES, "jsonl"); + indexer.add_documents(&documents).unwrap(); + + let indexer_alloc = Bump::new(); + let (document_changes, _operation_stats, primary_key) = indexer + .into_changes(&indexer_alloc, &index, &rtxn, None, &mut 
new_fields_ids_map) + .unwrap(); + + indexer::index( &mut wtxn, &index, - &config, - indexing_config, - |_| (), - || false, + config.grenad_parameters(), + &db_fields_ids_map, + new_fields_ids_map, + primary_key, + &document_changes, + EmbeddingConfigs::default(), + &|| false, + &|_| (), ) .unwrap(); - let documents = utils::documents_from(datasets_paths::SMOL_ALL_COUNTRIES, "jsonl"); - let (builder, user_error) = builder.add_documents(documents).unwrap(); - user_error.unwrap(); - builder.execute().unwrap(); - wtxn.commit().unwrap(); + drop(rtxn); index.prepare_for_closing().wait(); }, @@ -1221,47 +1619,73 @@ fn reindexing_geo(c: &mut Criterion) { ); let config = IndexerConfig::default(); - let indexing_config = IndexDocumentsConfig::default(); let mut wtxn = index.write_txn().unwrap(); - let builder = IndexDocuments::new( + let rtxn = index.read_txn().unwrap(); + let db_fields_ids_map = index.fields_ids_map(&rtxn).unwrap(); + let mut new_fields_ids_map = db_fields_ids_map.clone(); + + let mut indexer = + indexer::DocumentOperation::new(IndexDocumentsMethod::ReplaceDocuments); + let documents = utils::documents_from(datasets_paths::SMOL_ALL_COUNTRIES, "jsonl"); + indexer.add_documents(&documents).unwrap(); + + let indexer_alloc = Bump::new(); + let (document_changes, _operation_stats, primary_key) = indexer + .into_changes(&indexer_alloc, &index, &rtxn, None, &mut new_fields_ids_map) + .unwrap(); + + indexer::index( &mut wtxn, &index, - &config, - indexing_config, - |_| (), - || false, + config.grenad_parameters(), + &db_fields_ids_map, + new_fields_ids_map, + primary_key, + &document_changes, + EmbeddingConfigs::default(), + &|| false, + &|_| (), ) .unwrap(); - let documents = utils::documents_from(datasets_paths::SMOL_ALL_COUNTRIES, "jsonl"); - let (builder, user_error) = builder.add_documents(documents).unwrap(); - user_error.unwrap(); - builder.execute().unwrap(); - wtxn.commit().unwrap(); + drop(rtxn); index }, move |index| { let config = IndexerConfig::default(); - let indexing_config = IndexDocumentsConfig::default(); let mut wtxn = index.write_txn().unwrap(); - let builder = IndexDocuments::new( + let rtxn = index.read_txn().unwrap(); + let db_fields_ids_map = index.fields_ids_map(&rtxn).unwrap(); + let mut new_fields_ids_map = db_fields_ids_map.clone(); + + let mut indexer = + indexer::DocumentOperation::new(IndexDocumentsMethod::ReplaceDocuments); + let documents = utils::documents_from(datasets_paths::SMOL_ALL_COUNTRIES, "jsonl"); + indexer.add_documents(&documents).unwrap(); + + let indexer_alloc = Bump::new(); + let (document_changes, _operation_stats, primary_key) = indexer + .into_changes(&indexer_alloc, &index, &rtxn, None, &mut new_fields_ids_map) + .unwrap(); + + indexer::index( &mut wtxn, &index, - &config, - indexing_config, - |_| (), - || false, + config.grenad_parameters(), + &db_fields_ids_map, + new_fields_ids_map, + primary_key, + &document_changes, + EmbeddingConfigs::default(), + &|| false, + &|_| (), ) .unwrap(); - let documents = utils::documents_from(datasets_paths::SMOL_ALL_COUNTRIES, "jsonl"); - let (builder, user_error) = builder.add_documents(documents).unwrap(); - user_error.unwrap(); - builder.execute().unwrap(); - wtxn.commit().unwrap(); + drop(rtxn); index.prepare_for_closing().wait(); }, @@ -1291,21 +1715,36 @@ fn deleting_geo_in_batches_default(c: &mut Criterion) { // as we don't care about the time it takes. 
let config = IndexerConfig::default(); let mut wtxn = index.write_txn().unwrap(); - let indexing_config = IndexDocumentsConfig::default(); - let builder = IndexDocuments::new( + let rtxn = index.read_txn().unwrap(); + let db_fields_ids_map = index.fields_ids_map(&rtxn).unwrap(); + let mut new_fields_ids_map = db_fields_ids_map.clone(); + + let mut indexer = + indexer::DocumentOperation::new(IndexDocumentsMethod::ReplaceDocuments); + let documents = utils::documents_from(datasets_paths::SMOL_ALL_COUNTRIES, "jsonl"); + indexer.add_documents(&documents).unwrap(); + + let indexer_alloc = Bump::new(); + let (document_changes, _operation_stats, primary_key) = indexer + .into_changes(&indexer_alloc, &index, &rtxn, None, &mut new_fields_ids_map) + .unwrap(); + + indexer::index( &mut wtxn, &index, - &config, - indexing_config, - |_| (), - || false, + config.grenad_parameters(), + &db_fields_ids_map, + new_fields_ids_map, + primary_key, + &document_changes, + EmbeddingConfigs::default(), + &|| false, + &|_| (), ) .unwrap(); - let documents = utils::documents_from(datasets_paths::SMOL_ALL_COUNTRIES, "jsonl"); - let (builder, user_error) = builder.add_documents(documents).unwrap(); - user_error.unwrap(); - builder.execute().unwrap(); + wtxn.commit().unwrap(); + drop(rtxn); let count = 1250; let batch_size = 250; diff --git a/crates/benchmarks/benches/utils.rs b/crates/benchmarks/benches/utils.rs index 01bc454a3..defcbbfbb 100644 --- a/crates/benchmarks/benches/utils.rs +++ b/crates/benchmarks/benches/utils.rs @@ -1,17 +1,19 @@ #![allow(dead_code)] use std::fs::{create_dir_all, remove_dir_all, File}; -use std::io::{self, BufRead, BufReader, Cursor, Read, Seek}; +use std::io::{self, BufReader, BufWriter, Read}; use std::num::ParseFloatError; use std::path::Path; use std::str::FromStr; +use anyhow::Context; +use bumpalo::Bump; use criterion::BenchmarkId; -use milli::documents::{DocumentsBatchBuilder, DocumentsBatchReader}; +use memmap2::Mmap; use milli::heed::EnvOpenOptions; -use milli::update::{ - IndexDocuments, IndexDocumentsConfig, IndexDocumentsMethod, IndexerConfig, Settings, -}; +use milli::update::new::indexer; +use milli::update::{IndexDocumentsMethod, IndexerConfig, Settings}; +use milli::vector::EmbeddingConfigs; use milli::{Criterion, Filter, Index, Object, TermsMatchingStrategy}; use serde_json::Value; @@ -92,18 +94,34 @@ pub fn base_setup(conf: &Conf) -> Index { let config = IndexerConfig::default(); let mut wtxn = index.write_txn().unwrap(); - let indexing_config = IndexDocumentsConfig { - autogenerate_docids: conf.primary_key.is_none(), - update_method: IndexDocumentsMethod::ReplaceDocuments, - ..Default::default() - }; - let builder = - IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| (), || false).unwrap(); + let rtxn = index.read_txn().unwrap(); + let db_fields_ids_map = index.fields_ids_map(&rtxn).unwrap(); + let mut new_fields_ids_map = db_fields_ids_map.clone(); + let documents = documents_from(conf.dataset, conf.dataset_format); - let (builder, user_error) = builder.add_documents(documents).unwrap(); - user_error.unwrap(); - builder.execute().unwrap(); + let mut indexer = indexer::DocumentOperation::new(IndexDocumentsMethod::ReplaceDocuments); + indexer.add_documents(&documents).unwrap(); + + let indexer_alloc = Bump::new(); + let (document_changes, _operation_stats, primary_key) = + indexer.into_changes(&indexer_alloc, &index, &rtxn, None, &mut new_fields_ids_map).unwrap(); + + indexer::index( + &mut wtxn, + &index, + config.grenad_parameters(), + 
&db_fields_ids_map, + new_fields_ids_map, + primary_key, + &document_changes, + EmbeddingConfigs::default(), + &|| false, + &|_| (), + ) + .unwrap(); + wtxn.commit().unwrap(); + drop(rtxn); index } @@ -141,48 +159,95 @@ pub fn run_benches(c: &mut criterion::Criterion, confs: &[Conf]) { } pub fn documents_from(filename: &str, filetype: &str) -> Mmap { - let reader = File::open(filename) - .unwrap_or_else(|_| panic!("could not find the dataset in: {}", filename)); - let reader = BufReader::new(reader); - let documents = match filetype { - "csv" => documents_from_csv(reader).unwrap(), - "json" => documents_from_json(reader).unwrap(), - "jsonl" => documents_from_jsonl(reader).unwrap(), - otherwise => panic!("invalid update format {:?}", otherwise), - }; - DocumentsBatchReader::from_reader(Cursor::new(documents)).unwrap() + let file = File::open(filename) + .unwrap_or_else(|_| panic!("could not find the dataset in: {filename}")); + match filetype { + "csv" => documents_from_csv(file).unwrap(), + "json" => documents_from_json(file).unwrap(), + "jsonl" => documents_from_jsonl(file).unwrap(), + otherwise => panic!("invalid update format {otherwise:?}"), + } } -fn documents_from_jsonl(reader: impl BufRead) -> anyhow::Result> { - let mut documents = DocumentsBatchBuilder::new(Vec::new()); +fn documents_from_jsonl(file: File) -> anyhow::Result { + unsafe { Mmap::map(&file).map_err(Into::into) } +} - for result in serde_json::Deserializer::from_reader(reader).into_iter::() { - let object = result?; - documents.append_json_object(&object)?; +fn documents_from_json(file: File) -> anyhow::Result { + let reader = BufReader::new(file); + let documents: Vec = serde_json::from_reader(reader)?; + let mut output = tempfile::tempfile().map(BufWriter::new)?; + + for document in documents { + serde_json::to_writer(&mut output, &document)?; } - documents.into_inner().map_err(Into::into) + let file = output.into_inner()?; + unsafe { Mmap::map(&file).map_err(Into::into) } } -fn documents_from_json(reader: impl BufRead) -> anyhow::Result> { - let mut documents = DocumentsBatchBuilder::new(Vec::new()); +fn documents_from_csv(file: File) -> anyhow::Result { + let output = tempfile::tempfile()?; + let mut output = BufWriter::new(output); + let mut reader = csv::ReaderBuilder::new().from_reader(file); - documents.append_json_array(reader)?; + let headers = reader.headers().context("while retrieving headers")?.clone(); + let typed_fields: Vec<_> = headers.iter().map(parse_csv_header).collect(); + let mut object: serde_json::Map<_, _> = + typed_fields.iter().map(|(k, _)| (k.to_string(), Value::Null)).collect(); - documents.into_inner().map_err(Into::into) -} + let mut line = 0; + let mut record = csv::StringRecord::new(); + while reader.read_record(&mut record).context("while reading a record")? { + // We increment here and not at the end of the loop + // to take the header offset into account. 
+ line += 1; -fn documents_from_csv(reader: impl BufRead) -> anyhow::Result> { - let csv = csv::Reader::from_reader(reader); + // Reset the document values + object.iter_mut().for_each(|(_, v)| *v = Value::Null); - let mut documents = DocumentsBatchBuilder::new(Vec::new()); - documents.append_csv(csv)?; + for (i, (name, atype)) in typed_fields.iter().enumerate() { + let value = &record[i]; + let trimmed_value = value.trim(); + let value = match atype { + AllowedType::Number if trimmed_value.is_empty() => Value::Null, + AllowedType::Number => { + match trimmed_value.parse::() { + Ok(integer) => Value::from(integer), + Err(_) => match trimmed_value.parse::() { + Ok(float) => Value::from(float), + Err(error) => { + anyhow::bail!("document format error on line {line}: {error}. For value: {value}") + } + }, + } + } + AllowedType::Boolean if trimmed_value.is_empty() => Value::Null, + AllowedType::Boolean => match trimmed_value.parse::() { + Ok(bool) => Value::from(bool), + Err(error) => { + anyhow::bail!( + "document format error on line {line}: {error}. For value: {value}" + ) + } + }, + AllowedType::String if value.is_empty() => Value::Null, + AllowedType::String => Value::from(value), + }; - documents.into_inner().map_err(Into::into) + *object.get_mut(name).expect("encountered an unknown field") = value; + } + + serde_json::to_writer(&mut output, &object).context("while writing to disk")?; + } + + let output = output.into_inner()?; + unsafe { Mmap::map(&output).map_err(Into::into) } } enum AllowedType { String, + Boolean, Number, } @@ -191,8 +256,9 @@ fn parse_csv_header(header: &str) -> (String, AllowedType) { match header.rsplit_once(':') { Some((field_name, field_type)) => match field_type { "string" => (field_name.to_string(), AllowedType::String), + "boolean" => (field_name.to_string(), AllowedType::Boolean), "number" => (field_name.to_string(), AllowedType::Number), - // we may return an error in this case. + // if the pattern isn't recognized, we keep the whole field. 
_otherwise => (header.to_string(), AllowedType::String), }, None => (header.to_string(), AllowedType::String), @@ -230,10 +296,13 @@ impl Iterator for CSVDocumentDeserializer { for ((field_name, field_type), value) in self.headers.iter().zip(csv_document.into_iter()) { - let parsed_value: Result = match field_type { + let parsed_value: anyhow::Result = match field_type { AllowedType::Number => { value.parse::().map(Value::from).map_err(Into::into) } + AllowedType::Boolean => { + value.parse::().map(Value::from).map_err(Into::into) + } AllowedType::String => Ok(Value::String(value.to_string())), }; diff --git a/crates/milli/src/update/new/indexer/document_deletion.rs b/crates/milli/src/update/new/indexer/document_deletion.rs index fe3f08583..518786e6f 100644 --- a/crates/milli/src/update/new/indexer/document_deletion.rs +++ b/crates/milli/src/update/new/indexer/document_deletion.rs @@ -12,7 +12,7 @@ use crate::{DocumentId, Result}; #[derive(Default)] pub struct DocumentDeletion { - pub to_delete: RoaringBitmap, + to_delete: RoaringBitmap, } impl DocumentDeletion { @@ -26,11 +26,11 @@ impl DocumentDeletion { pub fn into_changes<'indexer>( self, - indexer: &'indexer Bump, + indexer_alloc: &'indexer Bump, primary_key: PrimaryKey<'indexer>, ) -> DocumentDeletionChanges<'indexer> { let to_delete: bumpalo::collections::Vec<_> = - self.to_delete.into_iter().collect_in(indexer); + self.to_delete.into_iter().collect_in(indexer_alloc); let to_delete = to_delete.into_bump_slice(); diff --git a/crates/milli/src/update/new/indexer/document_operation.rs b/crates/milli/src/update/new/indexer/document_operation.rs index 71d410ea6..5dd897d34 100644 --- a/crates/milli/src/update/new/indexer/document_operation.rs +++ b/crates/milli/src/update/new/indexer/document_operation.rs @@ -107,6 +107,12 @@ impl<'pl> DocumentOperation<'pl> { } } +impl Default for DocumentOperation<'_> { + fn default() -> Self { + DocumentOperation::new(IndexDocumentsMethod::default()) + } +} + #[allow(clippy::too_many_arguments)] fn extract_addition_payload_changes<'r, 'pl: 'r>( indexer: &'pl Bump,
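
Every benchmark body in this patch follows the same replacement pattern: the removed `IndexDocuments` builder is swapped for a `DocumentOperation` whose changes are bump-allocated and handed to `indexer::index`. Below is a condensed sketch of that flow taken from the diff above; the `index_one_payload` wrapper and its signature are illustrative only and not part of the patch.

use bumpalo::Bump;
use memmap2::Mmap;
use milli::update::new::indexer;
use milli::update::{IndexDocumentsMethod, IndexerConfig};
use milli::vector::EmbeddingConfigs;
use milli::Index;

/// Replace-index one mmapped payload (the bytes returned by `utils::documents_from`)
/// into `index`, mirroring the reworked benchmark bodies.
fn index_one_payload(index: &Index, documents: &Mmap) {
    let config = IndexerConfig::default();
    let mut wtxn = index.write_txn().unwrap();
    let rtxn = index.read_txn().unwrap();

    // Read the current fields-ids map once and clone it so the indexer can
    // register any new fields it finds in the payload.
    let db_fields_ids_map = index.fields_ids_map(&rtxn).unwrap();
    let mut new_fields_ids_map = db_fields_ids_map.clone();

    // Queue the raw payload on a DocumentOperation, then turn it into
    // bump-allocated document changes plus the resolved primary key.
    let mut indexer = indexer::DocumentOperation::new(IndexDocumentsMethod::ReplaceDocuments);
    indexer.add_documents(documents).unwrap();

    let indexer_alloc = Bump::new();
    let (document_changes, _operation_stats, primary_key) = indexer
        .into_changes(&indexer_alloc, index, &rtxn, None, &mut new_fields_ids_map)
        .unwrap();

    // Run the indexing pass; the two trailing closures are the
    // "must stop" and progress callbacks.
    indexer::index(
        &mut wtxn,
        index,
        config.grenad_parameters(),
        &db_fields_ids_map,
        new_fields_ids_map,
        primary_key,
        &document_changes,
        EmbeddingConfigs::default(),
        &|| false,
        &|_| (),
    )
    .unwrap();

    wtxn.commit().unwrap();
    // The read transaction is dropped only after the commit, matching the
    // order used throughout the diff.
    drop(rtxn);
}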
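
Batched deletion goes through the same `indexer::index` entry point, but with `indexer::DocumentDeletion` and an explicitly resolved primary key, as in the reworked `delete_documents_from_ids` helper. A sketch under the same assumptions; the `delete_one_batch` wrapper name is illustrative.

use bumpalo::Bump;
use milli::documents::PrimaryKey;
use milli::update::new::indexer;
use milli::update::IndexerConfig;
use milli::vector::EmbeddingConfigs;
use milli::Index;
use roaring::RoaringBitmap;

/// Delete one batch of document ids, following `delete_documents_from_ids`.
fn delete_one_batch(index: &Index, ids: RoaringBitmap) {
    let config = IndexerConfig::default();
    let mut wtxn = index.write_txn().unwrap();
    let rtxn = index.read_txn().unwrap();

    let db_fields_ids_map = index.fields_ids_map(&rtxn).unwrap();
    let new_fields_ids_map = db_fields_ids_map.clone();

    // Deletions need an explicit primary key, resolved from the database.
    let primary_key = index.primary_key(&rtxn).unwrap().unwrap();
    let primary_key = PrimaryKey::new(primary_key, &db_fields_ids_map).unwrap();

    let mut indexer = indexer::DocumentDeletion::new();
    indexer.delete_documents_by_docids(ids);

    let indexer_alloc = Bump::new();
    let document_changes = indexer.into_changes(&indexer_alloc, primary_key);

    // `primary_key` is passed to both calls, exactly as in the patch.
    indexer::index(
        &mut wtxn,
        index,
        config.grenad_parameters(),
        &db_fields_ids_map,
        new_fields_ids_map,
        Some(primary_key),
        &document_changes,
        EmbeddingConfigs::default(),
        &|| false,
        &|_| (),
    )
    .unwrap();

    wtxn.commit().unwrap();
}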
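
On the utils side, `documents_from` now returns a `Mmap` (the JSONL file mapped directly, or a temporary file built from the JSON/CSV input) instead of a `DocumentsBatchReader`. The rewritten `documents_from_csv` keeps the typed-header convention and adds `boolean` support: with a header row such as `id:number,title:string,watched:boolean` (field names here are only an example), the record `1,Dune,true` comes out as `{"id": 1, "title": "Dune", "watched": true}`. An empty typed cell is emitted as `null`, numbers are parsed as `i64` before falling back to `f64`, and a header without a recognized `:type` suffix keeps its full name and is treated as a string.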