From 0fc446c62f07ce4e5802a2affc39abdcd6a0ef1a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Wed, 1 Nov 2023 10:07:03 +0100 Subject: [PATCH 1/5] Add more timing logs to the Transform --- milli/src/update/index_documents/transform.rs | 130 ++++++------------ 1 file changed, 44 insertions(+), 86 deletions(-) diff --git a/milli/src/update/index_documents/transform.rs b/milli/src/update/index_documents/transform.rs index 840bade2e..23b5c78c1 100644 --- a/milli/src/update/index_documents/transform.rs +++ b/milli/src/update/index_documents/transform.rs @@ -150,6 +150,7 @@ impl<'a, 'i> Transform<'a, 'i> { }) } + #[logging_timer::time] pub fn read_documents( &mut self, reader: EnrichedDocumentsBatchReader, @@ -162,6 +163,8 @@ impl<'a, 'i> Transform<'a, 'i> { FP: Fn(UpdateIndexingStep) + Sync, FA: Fn() -> bool + Sync, { + puffin::profile_function!(); + let (mut cursor, fields_index) = reader.into_cursor_and_fields_index(); let external_documents_ids = self.index.external_documents_ids(); let mapping = create_fields_mapping(&mut self.fields_ids_map, &fields_index)?; @@ -212,13 +215,12 @@ impl<'a, 'i> Transform<'a, 'i> { field_buffer_cache.sort_unstable_by(|(f1, _), (f2, _)| f1.cmp(f2)); // Build the new obkv document. - let mut writer = obkv::KvWriter::new(&mut obkv_buffer); + let mut writer = KvWriter::new(&mut obkv_buffer); for (k, v) in field_buffer_cache.iter() { writer.insert(*k, v)?; } let mut original_docid = None; - let docid = match self.new_external_documents_ids_builder.entry((*external_id).into()) { HEntry::Occupied(entry) => *entry.get() as u32, HEntry::Vacant(entry) => { @@ -275,24 +277,19 @@ impl<'a, 'i> Transform<'a, 'i> { &mut document_sorter_buffer, )?; self.original_sorter.insert(docid.to_be_bytes(), &document_sorter_buffer)?; - match self.flatten_from_fields_ids_map(KvReader::new(base_obkv))? { - Some(flattened_obkv) => { - // we recreate our buffer with the flattened documents - document_sorter_buffer.clear(); - document_sorter_buffer.push(Operation::Addition as u8); - into_del_add_obkv( - KvReaderU16::new(&flattened_obkv), - true, - keep_original_version, - &mut document_sorter_buffer, - )?; - self.flattened_sorter - .insert(docid.to_be_bytes(), &document_sorter_buffer)? - } - None => self - .flattened_sorter - .insert(docid.to_be_bytes(), &document_sorter_buffer)?, + let base_obkv = KvReader::new(base_obkv); + if let Some(flattened_obkv) = self.flatten_from_fields_ids_map(base_obkv)? { + // we recreate our buffer with the flattened documents + document_sorter_buffer.clear(); + document_sorter_buffer.push(Operation::Addition as u8); + into_del_add_obkv( + KvReaderU16::new(&flattened_obkv), + true, + keep_original_version, + &mut document_sorter_buffer, + )?; } + self.flattened_sorter.insert(docid.to_be_bytes(), &document_sorter_buffer)?; } } @@ -310,23 +307,18 @@ impl<'a, 'i> Transform<'a, 'i> { // We use the extracted/generated user id as the key for this document. self.original_sorter.insert(docid.to_be_bytes(), &document_sorter_buffer)?; - match self.flatten_from_fields_ids_map(KvReader::new(&obkv_buffer))? { - Some(flattened_obkv) => { - document_sorter_buffer.clear(); - document_sorter_buffer.push(Operation::Addition as u8); - into_del_add_obkv( - KvReaderU16::new(&flattened_obkv), - false, - true, - &mut document_sorter_buffer, - )?; - self.flattened_sorter - .insert(docid.to_be_bytes(), &document_sorter_buffer)? - } - None => self - .flattened_sorter - .insert(docid.to_be_bytes(), &document_sorter_buffer)?, + let flattened_obkv = KvReader::new(&obkv_buffer); + if let Some(obkv) = self.flatten_from_fields_ids_map(flattened_obkv)? { + document_sorter_buffer.clear(); + document_sorter_buffer.push(Operation::Addition as u8); + into_del_add_obkv( + KvReaderU16::new(&obkv), + false, + true, + &mut document_sorter_buffer, + )? } + self.flattened_sorter.insert(docid.to_be_bytes(), &document_sorter_buffer)?; } documents_count += 1; @@ -361,6 +353,7 @@ impl<'a, 'i> Transform<'a, 'i> { /// - If the document to remove was inserted by the `read_documents` method before but was NOT present in the db, /// it's added into the grenad to ensure we don't insert it + removed from the list of new documents ids. /// - If the document to remove was not present in either the db or the transform we do nothing. + #[logging_timer::time] pub fn remove_documents( &mut self, mut to_remove: Vec, @@ -370,6 +363,8 @@ impl<'a, 'i> Transform<'a, 'i> { where FA: Fn() -> bool + Sync, { + puffin::profile_function!(); + // there may be duplicates in the documents to remove. to_remove.sort_unstable(); to_remove.dedup(); @@ -439,24 +434,19 @@ impl<'a, 'i> Transform<'a, 'i> { self.original_sorter.insert(docid.to_be_bytes(), &document_sorter_buffer)?; // flatten it and push it as to delete in the flattened_sorter - match self.flatten_from_fields_ids_map(KvReader::new(base_obkv))? { - Some(flattened_obkv) => { - // we recreate our buffer with the flattened documents - document_sorter_buffer.clear(); - document_sorter_buffer.push(Operation::Deletion as u8); - into_del_add_obkv( - KvReaderU16::new(&flattened_obkv), - true, - false, - &mut document_sorter_buffer, - )?; - self.flattened_sorter - .insert(docid.to_be_bytes(), &document_sorter_buffer)? - } - None => self - .flattened_sorter - .insert(docid.to_be_bytes(), &document_sorter_buffer)?, + let flattened_obkv = KvReader::new(base_obkv); + if let Some(obkv) = self.flatten_from_fields_ids_map(flattened_obkv)? { + // we recreate our buffer with the flattened documents + document_sorter_buffer.clear(); + document_sorter_buffer.push(Operation::Deletion as u8); + into_del_add_obkv( + KvReaderU16::new(&obkv), + true, + false, + &mut document_sorter_buffer, + )?; } + self.flattened_sorter.insert(docid.to_be_bytes(), &document_sorter_buffer)?; true } @@ -591,42 +581,10 @@ impl<'a, 'i> Transform<'a, 'i> { Ok(()) } - fn remove_deleted_documents_from_field_distribution( - &self, - rtxn: &RoTxn, - field_distribution: &mut FieldDistribution, - ) -> Result<()> { - for deleted_docid in self.replaced_documents_ids.iter() { - let obkv = self.index.documents.get(rtxn, &BEU32::new(deleted_docid))?.ok_or( - InternalError::DatabaseMissingEntry { db_name: db_name::DOCUMENTS, key: None }, - )?; - - for (key, _) in obkv.iter() { - let name = - self.fields_ids_map.name(key).ok_or(FieldIdMapMissingEntry::FieldId { - field_id: key, - process: "Computing field distribution in transform.", - })?; - // We checked that the document was in the db earlier. If we can't find it it means - // there is an inconsistency between the field distribution and the field id map. - let field = - field_distribution.get_mut(name).ok_or(FieldIdMapMissingEntry::FieldId { - field_id: key, - process: "Accessing field distribution in transform.", - })?; - *field -= 1; - if *field == 0 { - // since we were able to get the field right before it's safe to unwrap here - field_distribution.remove(name).unwrap(); - } - } - } - Ok(()) - } - /// Generate the `TransformOutput` based on the given sorter that can be generated from any /// format like CSV, JSON or JSON stream. This sorter must contain a key that is the document /// id for the user side and the value must be an obkv where keys are valid fields ids. + #[logging_timer::time] pub(crate) fn output_from_sorter( self, wtxn: &mut heed::RwTxn, @@ -816,7 +774,7 @@ impl<'a, 'i> Transform<'a, 'i> { let (docid, obkv) = result?; obkv_buffer.clear(); - let mut obkv_writer = obkv::KvWriter::<_, FieldId>::new(&mut obkv_buffer); + let mut obkv_writer = KvWriter::<_, FieldId>::new(&mut obkv_buffer); // We iterate over the new `FieldsIdsMap` ids in order and construct the new obkv. for (id, name) in new_fields_ids_map.iter() { From c71b1d33ae5de96ae013e4695b13bc16263b4c3a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Wed, 1 Nov 2023 10:39:16 +0100 Subject: [PATCH 2/5] Sort entries using rayon in the transform sorters --- Cargo.lock | 5 +- milli/Cargo.toml | 3 +- milli/src/update/index_documents/transform.rs | 51 +++++++++++++------ 3 files changed, 40 insertions(+), 19 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2ab2f706a..957dffbe4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1664,11 +1664,12 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" [[package]] name = "grenad" version = "0.4.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5232b2d157b7bf63d7abe1b12177039e58db2f29e377517c0cdee1578cca4c93" +source = "git+https://github.com/meilisearch/grenad?branch=parallel-sorter#eafb6ae795af6078e087edf77e7cd31a26238707" dependencies = [ "bytemuck", "byteorder", + "crossbeam-channel", + "rayon", "tempfile", ] diff --git a/milli/Cargo.toml b/milli/Cargo.toml index 68bc2d2b5..da259c65d 100644 --- a/milli/Cargo.toml +++ b/milli/Cargo.toml @@ -26,7 +26,8 @@ flatten-serde-json = { path = "../flatten-serde-json" } fst = "0.4.7" fxhash = "0.2.1" geoutils = "0.5.1" -grenad = { version = "0.4.4", default-features = false, features = [ +grenad = { git = "https://github.com/meilisearch/grenad", branch = "parallel-sorter", default-features = false, features = [ + "rayon", "tempfile", ] } heed = { git = "https://github.com/meilisearch/heed", tag = "v0.12.7", default-features = false, features = [ diff --git a/milli/src/update/index_documents/transform.rs b/milli/src/update/index_documents/transform.rs index 23b5c78c1..8d1750c49 100644 --- a/milli/src/update/index_documents/transform.rs +++ b/milli/src/update/index_documents/transform.rs @@ -114,24 +114,43 @@ impl<'a, 'i> Transform<'a, 'i> { }; // We initialize the sorter with the user indexing settings. - let original_sorter = create_sorter( - grenad::SortAlgorithm::Stable, - merge_function, - indexer_settings.chunk_compression_type, - indexer_settings.chunk_compression_level, - indexer_settings.max_nb_chunks, - indexer_settings.max_memory.map(|mem| mem / 2), - ); + let original_sorter = { + let mut builder = grenad::Sorter::builder(merge_function); + builder.chunk_compression_type(indexer_settings.chunk_compression_type); + if let Some(level) = indexer_settings.chunk_compression_level { + builder.chunk_compression_level(level); + } + if let Some(nb_chunks) = indexer_settings.max_nb_chunks { + builder.max_nb_chunks(nb_chunks); + } + if let Some(memory) = indexer_settings.max_memory.map(|mem| mem / 2) { + builder.dump_threshold(memory); + builder.allow_realloc(false); + } + builder.sort_algorithm(grenad::SortAlgorithm::Stable); + builder.sort_in_parallel(true); + builder.build() + }; // We initialize the sorter with the user indexing settings. - let flattened_sorter = create_sorter( - grenad::SortAlgorithm::Stable, - merge_function, - indexer_settings.chunk_compression_type, - indexer_settings.chunk_compression_level, - indexer_settings.max_nb_chunks, - indexer_settings.max_memory.map(|mem| mem / 2), - ); + let flattened_sorter = { + let mut builder = grenad::Sorter::builder(merge_function); + builder.chunk_compression_type(indexer_settings.chunk_compression_type); + if let Some(level) = indexer_settings.chunk_compression_level { + builder.chunk_compression_level(level); + } + if let Some(nb_chunks) = indexer_settings.max_nb_chunks { + builder.max_nb_chunks(nb_chunks); + } + if let Some(memory) = indexer_settings.max_memory.map(|mem| mem / 2) { + builder.dump_threshold(memory); + builder.allow_realloc(false); + } + builder.sort_algorithm(grenad::SortAlgorithm::Stable); + builder.sort_in_parallel(true); + builder.build() + }; + let documents_ids = index.documents_ids(wtxn)?; Ok(Transform { From e507ef593267795b4a88fde05477e58fb6948724 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Wed, 1 Nov 2023 11:06:58 +0100 Subject: [PATCH 3/5] Slow the logging down --- index-scheduler/src/batch.rs | 8 ++++---- meilisearch/src/lib.rs | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/index-scheduler/src/batch.rs b/index-scheduler/src/batch.rs index c273d8ebb..ebdba0a8c 100644 --- a/index-scheduler/src/batch.rs +++ b/index-scheduler/src/batch.rs @@ -24,7 +24,7 @@ use std::fs::{self, File}; use std::io::BufWriter; use dump::IndexMetadata; -use log::{debug, error, info}; +use log::{debug, error, info, trace}; use meilisearch_types::error::Code; use meilisearch_types::heed::{RoTxn, RwTxn}; use meilisearch_types::milli::documents::{obkv_to_object, DocumentsBatchReader}; @@ -1190,7 +1190,7 @@ impl IndexScheduler { index, indexer_config, config, - |indexing_step| debug!("update: {:?}", indexing_step), + |indexing_step| trace!("update: {:?}", indexing_step), || must_stop_processing.get(), )?; @@ -1268,7 +1268,7 @@ impl IndexScheduler { milli::update::Settings::new(index_wtxn, index, indexer_config); builder.reset_primary_key(); builder.execute( - |indexing_step| debug!("update: {:?}", indexing_step), + |indexing_step| trace!("update: {:?}", indexing_step), || must_stop_processing.clone().get(), )?; } @@ -1288,7 +1288,7 @@ impl IndexScheduler { index, indexer_config, config, - |indexing_step| debug!("update: {:?}", indexing_step), + |indexing_step| trace!("update: {:?}", indexing_step), || must_stop_processing.get(), )?; diff --git a/meilisearch/src/lib.rs b/meilisearch/src/lib.rs index 603d8ff86..16c08c6c2 100644 --- a/meilisearch/src/lib.rs +++ b/meilisearch/src/lib.rs @@ -362,7 +362,7 @@ fn import_dump( update_method: IndexDocumentsMethod::ReplaceDocuments, ..Default::default() }, - |indexing_step| log::debug!("update: {:?}", indexing_step), + |indexing_step| log::trace!("update: {:?}", indexing_step), || false, )?; From b10c060bf7e5c99d4789096d0cf15d4aa9e4fa24 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Wed, 1 Nov 2023 13:55:18 +0100 Subject: [PATCH 4/5] Cleanup TOML --- Cargo.lock | 6 +++--- milli/Cargo.toml | 5 ++--- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 957dffbe4..91fdc13be 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1663,12 +1663,12 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" [[package]] name = "grenad" -version = "0.4.4" -source = "git+https://github.com/meilisearch/grenad?branch=parallel-sorter#eafb6ae795af6078e087edf77e7cd31a26238707" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a007932af5475ebb5c63bef8812bb1c36f317983bb4ca663e9d6dd58d6a0f8c" dependencies = [ "bytemuck", "byteorder", - "crossbeam-channel", "rayon", "tempfile", ] diff --git a/milli/Cargo.toml b/milli/Cargo.toml index da259c65d..9cef4795b 100644 --- a/milli/Cargo.toml +++ b/milli/Cargo.toml @@ -26,9 +26,8 @@ flatten-serde-json = { path = "../flatten-serde-json" } fst = "0.4.7" fxhash = "0.2.1" geoutils = "0.5.1" -grenad = { git = "https://github.com/meilisearch/grenad", branch = "parallel-sorter", default-features = false, features = [ - "rayon", - "tempfile", +grenad = { version = "0.4.5", default-features = false, features = [ + "rayon", "tempfile" ] } heed = { git = "https://github.com/meilisearch/heed", tag = "v0.12.7", default-features = false, features = [ "lmdb", "read-txn-no-tls" From 4d864f0702578e6540207c1472992fab06d63b15 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Thu, 2 Nov 2023 14:47:43 +0100 Subject: [PATCH 5/5] Always sort internal Sorter entries in parallel --- .../index_documents/helpers/grenad_helpers.rs | 1 + milli/src/update/index_documents/transform.rs | 51 ++++++------------- 2 files changed, 17 insertions(+), 35 deletions(-) diff --git a/milli/src/update/index_documents/helpers/grenad_helpers.rs b/milli/src/update/index_documents/helpers/grenad_helpers.rs index cc0ccb609..03a3d6f5f 100644 --- a/milli/src/update/index_documents/helpers/grenad_helpers.rs +++ b/milli/src/update/index_documents/helpers/grenad_helpers.rs @@ -47,6 +47,7 @@ pub fn create_sorter( builder.allow_realloc(false); } builder.sort_algorithm(sort_algorithm); + builder.sort_in_parallel(true); builder.build() } diff --git a/milli/src/update/index_documents/transform.rs b/milli/src/update/index_documents/transform.rs index 8d1750c49..23b5c78c1 100644 --- a/milli/src/update/index_documents/transform.rs +++ b/milli/src/update/index_documents/transform.rs @@ -114,43 +114,24 @@ impl<'a, 'i> Transform<'a, 'i> { }; // We initialize the sorter with the user indexing settings. - let original_sorter = { - let mut builder = grenad::Sorter::builder(merge_function); - builder.chunk_compression_type(indexer_settings.chunk_compression_type); - if let Some(level) = indexer_settings.chunk_compression_level { - builder.chunk_compression_level(level); - } - if let Some(nb_chunks) = indexer_settings.max_nb_chunks { - builder.max_nb_chunks(nb_chunks); - } - if let Some(memory) = indexer_settings.max_memory.map(|mem| mem / 2) { - builder.dump_threshold(memory); - builder.allow_realloc(false); - } - builder.sort_algorithm(grenad::SortAlgorithm::Stable); - builder.sort_in_parallel(true); - builder.build() - }; + let original_sorter = create_sorter( + grenad::SortAlgorithm::Stable, + merge_function, + indexer_settings.chunk_compression_type, + indexer_settings.chunk_compression_level, + indexer_settings.max_nb_chunks, + indexer_settings.max_memory.map(|mem| mem / 2), + ); // We initialize the sorter with the user indexing settings. - let flattened_sorter = { - let mut builder = grenad::Sorter::builder(merge_function); - builder.chunk_compression_type(indexer_settings.chunk_compression_type); - if let Some(level) = indexer_settings.chunk_compression_level { - builder.chunk_compression_level(level); - } - if let Some(nb_chunks) = indexer_settings.max_nb_chunks { - builder.max_nb_chunks(nb_chunks); - } - if let Some(memory) = indexer_settings.max_memory.map(|mem| mem / 2) { - builder.dump_threshold(memory); - builder.allow_realloc(false); - } - builder.sort_algorithm(grenad::SortAlgorithm::Stable); - builder.sort_in_parallel(true); - builder.build() - }; - + let flattened_sorter = create_sorter( + grenad::SortAlgorithm::Stable, + merge_function, + indexer_settings.chunk_compression_type, + indexer_settings.chunk_compression_level, + indexer_settings.max_nb_chunks, + indexer_settings.max_memory.map(|mem| mem / 2), + ); let documents_ids = index.documents_ids(wtxn)?; Ok(Transform {