diff --git a/Cargo.lock b/Cargo.lock index 2ab2f706a..91fdc13be 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1663,12 +1663,13 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" [[package]] name = "grenad" -version = "0.4.4" +version = "0.4.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5232b2d157b7bf63d7abe1b12177039e58db2f29e377517c0cdee1578cca4c93" +checksum = "6a007932af5475ebb5c63bef8812bb1c36f317983bb4ca663e9d6dd58d6a0f8c" dependencies = [ "bytemuck", "byteorder", + "rayon", "tempfile", ] diff --git a/index-scheduler/src/batch.rs b/index-scheduler/src/batch.rs index c273d8ebb..ebdba0a8c 100644 --- a/index-scheduler/src/batch.rs +++ b/index-scheduler/src/batch.rs @@ -24,7 +24,7 @@ use std::fs::{self, File}; use std::io::BufWriter; use dump::IndexMetadata; -use log::{debug, error, info}; +use log::{debug, error, info, trace}; use meilisearch_types::error::Code; use meilisearch_types::heed::{RoTxn, RwTxn}; use meilisearch_types::milli::documents::{obkv_to_object, DocumentsBatchReader}; @@ -1190,7 +1190,7 @@ impl IndexScheduler { index, indexer_config, config, - |indexing_step| debug!("update: {:?}", indexing_step), + |indexing_step| trace!("update: {:?}", indexing_step), || must_stop_processing.get(), )?; @@ -1268,7 +1268,7 @@ impl IndexScheduler { milli::update::Settings::new(index_wtxn, index, indexer_config); builder.reset_primary_key(); builder.execute( - |indexing_step| debug!("update: {:?}", indexing_step), + |indexing_step| trace!("update: {:?}", indexing_step), || must_stop_processing.clone().get(), )?; } @@ -1288,7 +1288,7 @@ impl IndexScheduler { index, indexer_config, config, - |indexing_step| debug!("update: {:?}", indexing_step), + |indexing_step| trace!("update: {:?}", indexing_step), || must_stop_processing.get(), )?; diff --git a/meilisearch/src/lib.rs b/meilisearch/src/lib.rs index 603d8ff86..16c08c6c2 100644 --- a/meilisearch/src/lib.rs +++ b/meilisearch/src/lib.rs @@ -362,7 +362,7 @@ fn import_dump( update_method: IndexDocumentsMethod::ReplaceDocuments, ..Default::default() }, - |indexing_step| log::debug!("update: {:?}", indexing_step), + |indexing_step| log::trace!("update: {:?}", indexing_step), || false, )?; diff --git a/milli/Cargo.toml b/milli/Cargo.toml index 68bc2d2b5..9cef4795b 100644 --- a/milli/Cargo.toml +++ b/milli/Cargo.toml @@ -26,8 +26,8 @@ flatten-serde-json = { path = "../flatten-serde-json" } fst = "0.4.7" fxhash = "0.2.1" geoutils = "0.5.1" -grenad = { version = "0.4.4", default-features = false, features = [ - "tempfile", +grenad = { version = "0.4.5", default-features = false, features = [ + "rayon", "tempfile" ] } heed = { git = "https://github.com/meilisearch/heed", tag = "v0.12.7", default-features = false, features = [ "lmdb", "read-txn-no-tls" diff --git a/milli/src/update/index_documents/helpers/grenad_helpers.rs b/milli/src/update/index_documents/helpers/grenad_helpers.rs index cc0ccb609..03a3d6f5f 100644 --- a/milli/src/update/index_documents/helpers/grenad_helpers.rs +++ b/milli/src/update/index_documents/helpers/grenad_helpers.rs @@ -47,6 +47,7 @@ pub fn create_sorter( builder.allow_realloc(false); } builder.sort_algorithm(sort_algorithm); + builder.sort_in_parallel(true); builder.build() } diff --git a/milli/src/update/index_documents/transform.rs b/milli/src/update/index_documents/transform.rs index 840bade2e..23b5c78c1 100644 --- a/milli/src/update/index_documents/transform.rs +++ b/milli/src/update/index_documents/transform.rs @@ -150,6 +150,7 @@ impl<'a, 'i> Transform<'a, 'i> { }) } + #[logging_timer::time] pub fn read_documents( &mut self, reader: EnrichedDocumentsBatchReader, @@ -162,6 +163,8 @@ impl<'a, 'i> Transform<'a, 'i> { FP: Fn(UpdateIndexingStep) + Sync, FA: Fn() -> bool + Sync, { + puffin::profile_function!(); + let (mut cursor, fields_index) = reader.into_cursor_and_fields_index(); let external_documents_ids = self.index.external_documents_ids(); let mapping = create_fields_mapping(&mut self.fields_ids_map, &fields_index)?; @@ -212,13 +215,12 @@ impl<'a, 'i> Transform<'a, 'i> { field_buffer_cache.sort_unstable_by(|(f1, _), (f2, _)| f1.cmp(f2)); // Build the new obkv document. - let mut writer = obkv::KvWriter::new(&mut obkv_buffer); + let mut writer = KvWriter::new(&mut obkv_buffer); for (k, v) in field_buffer_cache.iter() { writer.insert(*k, v)?; } let mut original_docid = None; - let docid = match self.new_external_documents_ids_builder.entry((*external_id).into()) { HEntry::Occupied(entry) => *entry.get() as u32, HEntry::Vacant(entry) => { @@ -275,24 +277,19 @@ impl<'a, 'i> Transform<'a, 'i> { &mut document_sorter_buffer, )?; self.original_sorter.insert(docid.to_be_bytes(), &document_sorter_buffer)?; - match self.flatten_from_fields_ids_map(KvReader::new(base_obkv))? { - Some(flattened_obkv) => { - // we recreate our buffer with the flattened documents - document_sorter_buffer.clear(); - document_sorter_buffer.push(Operation::Addition as u8); - into_del_add_obkv( - KvReaderU16::new(&flattened_obkv), - true, - keep_original_version, - &mut document_sorter_buffer, - )?; - self.flattened_sorter - .insert(docid.to_be_bytes(), &document_sorter_buffer)? - } - None => self - .flattened_sorter - .insert(docid.to_be_bytes(), &document_sorter_buffer)?, + let base_obkv = KvReader::new(base_obkv); + if let Some(flattened_obkv) = self.flatten_from_fields_ids_map(base_obkv)? { + // we recreate our buffer with the flattened documents + document_sorter_buffer.clear(); + document_sorter_buffer.push(Operation::Addition as u8); + into_del_add_obkv( + KvReaderU16::new(&flattened_obkv), + true, + keep_original_version, + &mut document_sorter_buffer, + )?; } + self.flattened_sorter.insert(docid.to_be_bytes(), &document_sorter_buffer)?; } } @@ -310,23 +307,18 @@ impl<'a, 'i> Transform<'a, 'i> { // We use the extracted/generated user id as the key for this document. self.original_sorter.insert(docid.to_be_bytes(), &document_sorter_buffer)?; - match self.flatten_from_fields_ids_map(KvReader::new(&obkv_buffer))? { - Some(flattened_obkv) => { - document_sorter_buffer.clear(); - document_sorter_buffer.push(Operation::Addition as u8); - into_del_add_obkv( - KvReaderU16::new(&flattened_obkv), - false, - true, - &mut document_sorter_buffer, - )?; - self.flattened_sorter - .insert(docid.to_be_bytes(), &document_sorter_buffer)? - } - None => self - .flattened_sorter - .insert(docid.to_be_bytes(), &document_sorter_buffer)?, + let flattened_obkv = KvReader::new(&obkv_buffer); + if let Some(obkv) = self.flatten_from_fields_ids_map(flattened_obkv)? { + document_sorter_buffer.clear(); + document_sorter_buffer.push(Operation::Addition as u8); + into_del_add_obkv( + KvReaderU16::new(&obkv), + false, + true, + &mut document_sorter_buffer, + )? } + self.flattened_sorter.insert(docid.to_be_bytes(), &document_sorter_buffer)?; } documents_count += 1; @@ -361,6 +353,7 @@ impl<'a, 'i> Transform<'a, 'i> { /// - If the document to remove was inserted by the `read_documents` method before but was NOT present in the db, /// it's added into the grenad to ensure we don't insert it + removed from the list of new documents ids. /// - If the document to remove was not present in either the db or the transform we do nothing. + #[logging_timer::time] pub fn remove_documents( &mut self, mut to_remove: Vec, @@ -370,6 +363,8 @@ impl<'a, 'i> Transform<'a, 'i> { where FA: Fn() -> bool + Sync, { + puffin::profile_function!(); + // there may be duplicates in the documents to remove. to_remove.sort_unstable(); to_remove.dedup(); @@ -439,24 +434,19 @@ impl<'a, 'i> Transform<'a, 'i> { self.original_sorter.insert(docid.to_be_bytes(), &document_sorter_buffer)?; // flatten it and push it as to delete in the flattened_sorter - match self.flatten_from_fields_ids_map(KvReader::new(base_obkv))? { - Some(flattened_obkv) => { - // we recreate our buffer with the flattened documents - document_sorter_buffer.clear(); - document_sorter_buffer.push(Operation::Deletion as u8); - into_del_add_obkv( - KvReaderU16::new(&flattened_obkv), - true, - false, - &mut document_sorter_buffer, - )?; - self.flattened_sorter - .insert(docid.to_be_bytes(), &document_sorter_buffer)? - } - None => self - .flattened_sorter - .insert(docid.to_be_bytes(), &document_sorter_buffer)?, + let flattened_obkv = KvReader::new(base_obkv); + if let Some(obkv) = self.flatten_from_fields_ids_map(flattened_obkv)? { + // we recreate our buffer with the flattened documents + document_sorter_buffer.clear(); + document_sorter_buffer.push(Operation::Deletion as u8); + into_del_add_obkv( + KvReaderU16::new(&obkv), + true, + false, + &mut document_sorter_buffer, + )?; } + self.flattened_sorter.insert(docid.to_be_bytes(), &document_sorter_buffer)?; true } @@ -591,42 +581,10 @@ impl<'a, 'i> Transform<'a, 'i> { Ok(()) } - fn remove_deleted_documents_from_field_distribution( - &self, - rtxn: &RoTxn, - field_distribution: &mut FieldDistribution, - ) -> Result<()> { - for deleted_docid in self.replaced_documents_ids.iter() { - let obkv = self.index.documents.get(rtxn, &BEU32::new(deleted_docid))?.ok_or( - InternalError::DatabaseMissingEntry { db_name: db_name::DOCUMENTS, key: None }, - )?; - - for (key, _) in obkv.iter() { - let name = - self.fields_ids_map.name(key).ok_or(FieldIdMapMissingEntry::FieldId { - field_id: key, - process: "Computing field distribution in transform.", - })?; - // We checked that the document was in the db earlier. If we can't find it it means - // there is an inconsistency between the field distribution and the field id map. - let field = - field_distribution.get_mut(name).ok_or(FieldIdMapMissingEntry::FieldId { - field_id: key, - process: "Accessing field distribution in transform.", - })?; - *field -= 1; - if *field == 0 { - // since we were able to get the field right before it's safe to unwrap here - field_distribution.remove(name).unwrap(); - } - } - } - Ok(()) - } - /// Generate the `TransformOutput` based on the given sorter that can be generated from any /// format like CSV, JSON or JSON stream. This sorter must contain a key that is the document /// id for the user side and the value must be an obkv where keys are valid fields ids. + #[logging_timer::time] pub(crate) fn output_from_sorter( self, wtxn: &mut heed::RwTxn, @@ -816,7 +774,7 @@ impl<'a, 'i> Transform<'a, 'i> { let (docid, obkv) = result?; obkv_buffer.clear(); - let mut obkv_writer = obkv::KvWriter::<_, FieldId>::new(&mut obkv_buffer); + let mut obkv_writer = KvWriter::<_, FieldId>::new(&mut obkv_buffer); // We iterate over the new `FieldsIdsMap` ids in order and construct the new obkv. for (id, name) in new_fields_ids_map.iter() {