diff --git a/milli/src/update/new/extract/faceted/extract_facets.rs b/milli/src/update/new/extract/faceted/extract_facets.rs
index 9f3ed18d8..9fae1839e 100644
--- a/milli/src/update/new/extract/faceted/extract_facets.rs
+++ b/milli/src/update/new/extract/faceted/extract_facets.rs
@@ -54,12 +54,20 @@ impl<'extractor> Extractor<'extractor> for FacetedExtractorData<'extractor> {
         ))))
     }
 
-    fn process(
+    fn process<'doc>(
         &self,
-        change: DocumentChange,
-        context: &crate::update::new::indexer::document_changes::DocumentChangeContext<Self::Data>,
+        changes: impl Iterator<Item = Result<DocumentChange<'doc>>>,
+        context: &DocumentChangeContext<Self::Data>,
     ) -> Result<()> {
-        FacetedDocidsExtractor::extract_document_change(context, self.attributes_to_extract, change)
+        for change in changes {
+            let change = change?;
+            FacetedDocidsExtractor::extract_document_change(
+                context,
+                self.attributes_to_extract,
+                change,
+            )?
+        }
+        Ok(())
     }
 }
diff --git a/milli/src/update/new/extract/searchable/extract_word_docids.rs b/milli/src/update/new/extract/searchable/extract_word_docids.rs
index c76ab49d0..5eb9692d6 100644
--- a/milli/src/update/new/extract/searchable/extract_word_docids.rs
+++ b/milli/src/update/new/extract/searchable/extract_word_docids.rs
@@ -325,12 +325,16 @@ impl<'extractor> Extractor<'extractor> for WordDocidsExtractorData<'extractor> {
         ))))
     }
 
-    fn process(
+    fn process<'doc>(
         &self,
-        change: DocumentChange,
-        context: &crate::update::new::indexer::document_changes::DocumentChangeContext<Self::Data>,
+        changes: impl Iterator<Item = Result<DocumentChange<'doc>>>,
+        context: &DocumentChangeContext<Self::Data>,
     ) -> Result<()> {
-        WordDocidsExtractors::extract_document_change(context, self.tokenizer, change)
+        for change in changes {
+            let change = change?;
+            WordDocidsExtractors::extract_document_change(context, self.tokenizer, change)?;
+        }
+        Ok(())
     }
 }
diff --git a/milli/src/update/new/extract/searchable/mod.rs b/milli/src/update/new/extract/searchable/mod.rs
index 8934ee892..dc429b1ba 100644
--- a/milli/src/update/new/extract/searchable/mod.rs
+++ b/milli/src/update/new/extract/searchable/mod.rs
@@ -55,12 +55,16 @@ impl<'extractor, EX: SearchableExtractor + Sync> Extractor<'extractor>
         ))))
     }
 
-    fn process(
+    fn process<'doc>(
         &self,
-        change: DocumentChange,
-        context: &crate::update::new::indexer::document_changes::DocumentChangeContext<Self::Data>,
+        changes: impl Iterator<Item = Result<DocumentChange<'doc>>>,
+        context: &DocumentChangeContext<Self::Data>,
     ) -> Result<()> {
-        EX::extract_document_change(context, self.tokenizer, change)
+        for change in changes {
+            let change = change?;
+            EX::extract_document_change(context, self.tokenizer, change)?;
+        }
+        Ok(())
     }
 }
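All three extractor impls above now share one shape: `process` receives a whole chunk as a lazy iterator of fallible `DocumentChange`s and drains it in place, bailing out on the first error. As an aside (not part of the diff), here is a self-contained sketch of that contract, with made-up stand-ins for the milli types:

```rust
// Minimal analogue of the new `process` contract. `DocumentChange` and the
// String error are stand-ins; the real milli types carry lifetimes and context.
#[derive(Debug)]
enum DocumentChange {
    Deletion(u32),
    Insertion(u32),
}

fn process(changes: impl Iterator<Item = Result<DocumentChange, String>>) -> Result<(), String> {
    for change in changes {
        let change = change?; // each item is fallible: the first error aborts the chunk
        match change {
            DocumentChange::Deletion(docid) => println!("delete {docid}"),
            DocumentChange::Insertion(docid) => println!("insert {docid}"),
        }
    }
    Ok(())
}

fn main() -> Result<(), String> {
    let chunk = vec![Ok(DocumentChange::Insertion(1)), Ok(DocumentChange::Deletion(2))];
    process(chunk.into_iter())
}
```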
diff --git a/milli/src/update/new/indexer/document_changes.rs b/milli/src/update/new/indexer/document_changes.rs
index 423ddbdcc..91c65a6d1 100644
--- a/milli/src/update/new/indexer/document_changes.rs
+++ b/milli/src/update/new/indexer/document_changes.rs
@@ -323,7 +323,7 @@ pub trait Extractor<'extractor>: Sync {
 
     fn process<'doc>(
         &'doc self,
-        change: DocumentChange<'doc>,
+        changes: impl Iterator<Item = Result<DocumentChange<'doc>>>,
         context: &'doc DocumentChangeContext<Self::Data>,
     ) -> Result<()>;
 }
@@ -332,13 +332,13 @@ pub trait DocumentChanges<'pl // lifetime of the underlying payload
 >: Sync {
     type Item: Send;
 
-    fn iter(&self) -> impl IndexedParallelIterator<Item = Self::Item>;
+    fn iter(&self, chunk_size: usize) -> impl IndexedParallelIterator<Item = impl AsRef<[Self::Item]>>;
 
     fn item_to_document_change<'doc, // lifetime of a single `process` call
     T: MostlySend>(
         &'doc self,
         context: &'doc DocumentChangeContext<T>,
-        item: Self::Item,
+        item: &'doc Self::Item,
     ) -> Result<Option<DocumentChange<'doc>>>
     where 'pl: 'doc // the payload must survive the process calls
     ;
 }
@@ -356,6 +356,8 @@ pub struct IndexingContext<
     pub fields_ids_map_store: &'indexer ThreadLocal<FullySend<RefCell<GlobalFieldsIdsMap<'fid>>>>,
 }
 
+const CHUNK_SIZE: usize = 100;
+
 pub fn for_each_document_change<
     'pl, // covariant lifetime of the underlying payload
     'extractor, // invariant lifetime of extractor_alloc
@@ -386,7 +388,7 @@ where
         extractor_alloc.0.get_mut().reset();
     }
 
-    let pi = document_changes.iter();
+    let pi = document_changes.iter(CHUNK_SIZE);
     pi.try_arc_for_each_try_init(
         || {
             DocumentChangeContext::new(
@@ -400,17 +402,16 @@ where
                 move |index_alloc| extractor.init_data(index_alloc),
             )
         },
-        |context, item| {
+        |context, items| {
             // Clean up and reuse the document-specific allocator
             context.doc_alloc.reset();
 
-            let Some(change) =
-                document_changes.item_to_document_change(context, item).map_err(Arc::new)?
-            else {
-                return Ok(());
-            };
+            let items = items.as_ref();
+            let changes = items.iter().filter_map(|item| {
+                document_changes.item_to_document_change(context, item).transpose()
+            });
 
-            let res = extractor.process(change, context).map_err(Arc::new);
+            let res = extractor.process(changes, context).map_err(Arc::new);
 
             // send back the doc_alloc in the pool
             context.doc_allocs.get_or_default().0.set(std::mem::take(&mut context.doc_alloc));
diff --git a/milli/src/update/new/indexer/document_deletion.rs b/milli/src/update/new/indexer/document_deletion.rs
index a9628f419..bbd2b11ac 100644
--- a/milli/src/update/new/indexer/document_deletion.rs
+++ b/milli/src/update/new/indexer/document_deletion.rs
@@ -1,6 +1,7 @@
 use bumpalo::collections::CollectIn;
 use bumpalo::Bump;
-use rayon::iter::{IntoParallelIterator, ParallelIterator as _};
+use rayon::iter::IndexedParallelIterator;
+use rayon::slice::ParallelSlice as _;
 use roaring::RoaringBitmap;
 
 use super::document_changes::{DocumentChangeContext, DocumentChanges, MostlySend};
@@ -44,8 +45,11 @@ pub struct DocumentDeletionChanges<'indexer> {
 impl<'pl> DocumentChanges<'pl> for DocumentDeletionChanges<'pl> {
     type Item = DocumentId;
 
-    fn iter(&self) -> impl rayon::prelude::IndexedParallelIterator<Item = Self::Item> {
-        self.to_delete.into_par_iter().copied()
+    fn iter(
+        &self,
+        chunk_size: usize,
+    ) -> impl IndexedParallelIterator<Item = impl AsRef<[Self::Item]>> {
+        self.to_delete.par_chunks(chunk_size)
     }
 
     fn item_to_document_change<
@@ -54,12 +58,12 @@ impl<'pl> DocumentChanges<'pl> for DocumentDeletionChanges<'pl> {
     >(
         &'doc self,
         context: &'doc DocumentChangeContext<T>,
-        docid: Self::Item,
+        docid: &'doc Self::Item,
     ) -> Result<Option<DocumentChange<'doc>>>
     where
         'pl: 'doc, // the payload must survive the process calls
     {
-        let current = context.index.document(&context.txn, docid)?;
+        let current = context.index.document(&context.txn, *docid)?;
 
         let external_document_id = self.primary_key.extract_docid_from_db(
             current,
@@ -69,7 +73,7 @@ impl<'pl> DocumentChanges<'pl> for DocumentDeletionChanges<'pl> {
 
         let external_document_id = external_document_id.to_bump(&context.doc_alloc);
 
-        Ok(Some(DocumentChange::Deletion(Deletion::create(docid, external_document_id))))
+        Ok(Some(DocumentChange::Deletion(Deletion::create(*docid, external_document_id))))
     }
 }
 
@@ -118,12 +122,15 @@ mod test {
             Ok(DeletionWithData { deleted })
         }
 
-        fn process(
+        fn process<'doc>(
             &self,
-            change: DocumentChange,
+            changes: impl Iterator<Item = Result<DocumentChange<'doc>>>,
             context: &DocumentChangeContext<Self::Data>,
         ) -> crate::Result<()> {
-            context.data.deleted.borrow_mut().insert(change.docid());
+            for change in changes {
+                let change = change?;
+                context.data.deleted.borrow_mut().insert(change.docid());
+            }
             Ok(())
         }
     }
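Two details in `for_each_document_change` above are worth spelling out. First, `context.doc_alloc.reset()` now runs once per chunk of `CHUNK_SIZE = 100` documents instead of once per document, amortizing the per-task bookkeeping. Second, `item_to_document_change` returns `Result<Option<DocumentChange>>` (an item may legitimately produce no change), and `transpose()` flips that into `Option<Result<_>>` so `filter_map` can drop the `Ok(None)` case while still forwarding errors into `process`. A std-only illustration of that flip:

```rust
fn main() {
    // Stand-ins for Result<Option<DocumentChange>>: a change, "nothing to do", an error.
    let items: Vec<Result<Option<u32>, String>> =
        vec![Ok(Some(1)), Ok(None), Err("boom".into()), Ok(Some(4))];

    // transpose(): Result<Option<T>, E> -> Option<Result<T, E>>.
    // filter_map then silently drops the Ok(None) items.
    let changes: Vec<Result<u32, String>> =
        items.into_iter().filter_map(|res| res.transpose()).collect();

    assert_eq!(changes, vec![Ok(1), Err("boom".into()), Ok(4)]);
}
```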
diff --git a/milli/src/update/new/indexer/document_operation.rs b/milli/src/update/new/indexer/document_operation.rs
index 57ec46a41..ee4517e20 100644
--- a/milli/src/update/new/indexer/document_operation.rs
+++ b/milli/src/update/new/indexer/document_operation.rs
@@ -3,6 +3,7 @@ use bumpalo::Bump;
 use heed::RoTxn;
 use memmap2::Mmap;
 use rayon::iter::IntoParallelIterator;
+use rayon::slice::ParallelSlice;
 use serde_json::value::RawValue;
 use IndexDocumentsMethod as Idm;
 
@@ -209,16 +210,19 @@ impl<'pl> DocumentOperation<'pl> {
 }
 
 impl<'pl> DocumentChanges<'pl> for DocumentOperationChanges<'pl> {
-    type Item = &'pl (&'pl str, ((u32, bool), &'pl [InnerDocOp<'pl>]));
+    type Item = (&'pl str, ((u32, bool), &'pl [InnerDocOp<'pl>]));
 
-    fn iter(&self) -> impl rayon::prelude::IndexedParallelIterator<Item = Self::Item> {
-        self.docids_version_offsets.into_par_iter()
+    fn iter(
+        &self,
+        chunk_size: usize,
+    ) -> impl rayon::prelude::IndexedParallelIterator<Item = impl AsRef<[Self::Item]>> {
+        self.docids_version_offsets.par_chunks(chunk_size)
     }
 
     fn item_to_document_change<'doc, T: MostlySend + 'doc>(
         &'doc self,
         context: &'doc DocumentChangeContext<T>,
-        item: Self::Item,
+        item: &'doc Self::Item,
     ) -> Result<Option<DocumentChange<'doc>>>
     where
         'pl: 'doc,
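The `type Item` change above follows directly from the chunking: `par_chunks` hands each task a `&[Self::Item]` borrowed from `docids_version_offsets`, and iterating a slice yields references, so the outer `&'pl` moves off the associated type and `item_to_document_change` now takes `&'doc Self::Item`. A small sketch of that borrowing shape (plain data, rayon 1.x, not from the diff):

```rust
use rayon::iter::ParallelIterator;
use rayon::slice::ParallelSlice;

fn main() {
    let offsets: Vec<(u32, bool)> = vec![(0, true), (1, false), (2, true)];

    // Each task receives a zero-copy `&[(u32, bool)]` (the last chunk may be
    // shorter), and iterating it yields `&(u32, bool)`: items arrive by reference.
    offsets.par_chunks(2).for_each(|chunk| {
        for &(docid, deleted) in chunk {
            println!("docid={docid} deleted={deleted}");
        }
    });
}
```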
diff --git a/milli/src/update/new/indexer/mod.rs b/milli/src/update/new/indexer/mod.rs
index 29ff2685e..d4e6ca6a6 100644
--- a/milli/src/update/new/indexer/mod.rs
+++ b/milli/src/update/new/indexer/mod.rs
@@ -5,8 +5,8 @@ use std::thread::{self, Builder};
 use big_s::S;
 use bumpalo::Bump;
 use document_changes::{
-    for_each_document_change, DocumentChanges, Extractor, FullySend, IndexingContext, RefCellExt,
-    ThreadLocal,
+    for_each_document_change, DocumentChangeContext, DocumentChanges, Extractor, FullySend,
+    IndexingContext, RefCellExt, ThreadLocal,
 };
 pub use document_deletion::DocumentDeletion;
 pub use document_operation::DocumentOperation;
@@ -33,7 +33,7 @@ use crate::update::new::channel::ExtractorSender;
 use crate::update::new::words_prefix_docids::compute_exact_word_prefix_docids;
 use crate::update::settings::InnerIndexSettings;
 use crate::update::{FacetsUpdateBulk, GrenadParameters};
-use crate::{Error, FieldsIdsMap, GlobalFieldsIdsMap, Index, Result, UserError};
+use crate::{FieldsIdsMap, GlobalFieldsIdsMap, Index, Result, UserError};
 
 pub(crate) mod de;
 pub mod document_changes;
@@ -56,10 +56,10 @@ impl<'a, 'extractor> Extractor<'extractor> for DocumentExtractor<'a> {
         Ok(FullySend(()))
     }
 
-    fn process(
+    fn process<'doc>(
         &self,
-        change: DocumentChange,
-        context: &document_changes::DocumentChangeContext<Self::Data>,
+        changes: impl Iterator<Item = Result<DocumentChange<'doc>>>,
+        context: &DocumentChangeContext<Self::Data>,
     ) -> Result<()> {
         let mut document_buffer = Vec::new();
 
@@ -67,32 +67,36 @@ impl<'a, 'extractor> Extractor<'extractor> for DocumentExtractor<'a> {
         let new_fields_ids_map = &*new_fields_ids_map;
         let new_fields_ids_map = new_fields_ids_map.local_map();
 
-        let external_docid = change.external_docid().to_owned();
+        for change in changes {
+            let change = change?;
+            let external_docid = change.external_docid().to_owned();
 
-        // document but we need to create a function that collects and compresses documents.
-        match change {
-            DocumentChange::Deletion(deletion) => {
-                let docid = deletion.docid();
-                self.document_sender.delete(docid, external_docid).unwrap();
-            }
-            /// TODO: change NONE by SOME(vector) when implemented
-            DocumentChange::Update(update) => {
-                let docid = update.docid();
-                let content =
-                    update.new(&context.txn, context.index, &context.db_fields_ids_map)?;
-                let content =
-                    write_to_obkv(&content, None, new_fields_ids_map, &mut document_buffer)?;
-                self.document_sender.insert(docid, external_docid, content.boxed()).unwrap();
-            }
-            DocumentChange::Insertion(insertion) => {
-                let docid = insertion.docid();
-                let content = insertion.new();
-                let content =
-                    write_to_obkv(&content, None, new_fields_ids_map, &mut document_buffer)?;
-                self.document_sender.insert(docid, external_docid, content.boxed()).unwrap();
-                // extracted_dictionary_sender.send(self, dictionary: &[u8]);
+            // document but we need to create a function that collects and compresses documents.
+            match change {
+                DocumentChange::Deletion(deletion) => {
+                    let docid = deletion.docid();
+                    self.document_sender.delete(docid, external_docid).unwrap();
+                }
+                /// TODO: change NONE by SOME(vector) when implemented
+                DocumentChange::Update(update) => {
+                    let docid = update.docid();
+                    let content =
+                        update.new(&context.txn, context.index, &context.db_fields_ids_map)?;
+                    let content =
+                        write_to_obkv(&content, None, new_fields_ids_map, &mut document_buffer)?;
+                    self.document_sender.insert(docid, external_docid, content.boxed()).unwrap();
+                }
+                DocumentChange::Insertion(insertion) => {
+                    let docid = insertion.docid();
+                    let content = insertion.new();
+                    let content =
+                        write_to_obkv(&content, None, new_fields_ids_map, &mut document_buffer)?;
+                    self.document_sender.insert(docid, external_docid, content.boxed()).unwrap();
+                    // extracted_dictionary_sender.send(self, dictionary: &[u8]);
+                }
             }
         }
+
         Ok(())
     }
 }
diff --git a/milli/src/update/new/indexer/partial_dump.rs b/milli/src/update/new/indexer/partial_dump.rs
index 10fc95a03..470dbc9d5 100644
--- a/milli/src/update/new/indexer/partial_dump.rs
+++ b/milli/src/update/new/indexer/partial_dump.rs
@@ -45,14 +45,17 @@ where
 {
     type Item = Box<RawValue>;
 
-    fn iter(&self) -> impl IndexedParallelIterator<Item = Self::Item> {
-        self.iter.clone()
+    fn iter(
+        &self,
+        chunk_size: usize,
+    ) -> impl IndexedParallelIterator<Item = impl AsRef<[Self::Item]>> {
+        self.iter.clone().chunks(chunk_size)
     }
 
     fn item_to_document_change<'doc, T: MostlySend + 'doc>(
         &'doc self,
         context: &'doc DocumentChangeContext<T>,
-        document: Self::Item,
+        document: &'doc Self::Item,
     ) -> Result<Option<DocumentChange<'doc>>>
     where
         'index: 'doc,
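`PartialDump` is the odd one out: its source is already a parallel iterator rather than a slice, so it uses `IndexedParallelIterator::chunks`, which collects each chunk into an owned `Vec<Item>`, while the slice-backed sources get zero-copy `&[Item]` chunks from `par_chunks`. Both shapes satisfy the `impl AsRef<[Self::Item]>` bound on `iter`. An illustrative comparison (not from the diff):

```rust
use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator};
use rayon::slice::ParallelSlice;

fn main() {
    let data: Vec<u32> = (0..10).collect();

    // Slice-backed source: borrows its chunks, Item = &[u32].
    let borrowed: Vec<Vec<u32>> = data.par_chunks(4).map(|chunk| chunk.to_vec()).collect();

    // Iterator-backed source (the partial-dump case): `chunks` must allocate a
    // Vec<u32> per chunk because there is no backing slice to borrow from.
    let owned: Vec<Vec<u32>> = (0..10u32).into_par_iter().chunks(4).collect();

    assert_eq!(borrowed, owned); // same grouping, different ownership
}
```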
diff --git a/milli/src/update/new/indexer/update_by_function.rs b/milli/src/update/new/indexer/update_by_function.rs
index 826f918a4..4c65edcc3 100644
--- a/milli/src/update/new/indexer/update_by_function.rs
+++ b/milli/src/update/new/indexer/update_by_function.rs
@@ -1,7 +1,6 @@
-use std::collections::BTreeMap;
-
 use raw_collections::RawMap;
-use rayon::iter::{IndexedParallelIterator, IntoParallelRefIterator, ParallelIterator};
+use rayon::iter::IndexedParallelIterator;
+use rayon::slice::ParallelSlice as _;
 use rhai::{Dynamic, Engine, OptimizationLevel, Scope, AST};
 use roaring::RoaringBitmap;
 
@@ -12,8 +11,8 @@ use crate::documents::PrimaryKey;
 use crate::error::{FieldIdMapMissingEntry, InternalError};
 use crate::update::new::document::DocumentFromVersions;
 use crate::update::new::document_change::Versions;
-use crate::update::new::{Deletion, DocumentChange, KvReaderFieldId, KvWriterFieldId, Update};
-use crate::{all_obkv_to_json, Error, FieldsIdsMap, GlobalFieldsIdsMap, Object, Result, UserError};
+use crate::update::new::{Deletion, DocumentChange, KvReaderFieldId, Update};
+use crate::{all_obkv_to_json, Error, FieldsIdsMap, Object, Result, UserError};
 
 pub struct UpdateByFunction {
     documents: RoaringBitmap,
@@ -76,14 +75,17 @@ impl UpdateByFunction {
 impl<'index> DocumentChanges<'index> for UpdateByFunctionChanges<'index> {
     type Item = u32;
 
-    fn iter(&self) -> impl IndexedParallelIterator<Item = Self::Item> {
-        self.documents.par_iter().copied()
+    fn iter(
+        &self,
+        chunk_size: usize,
+    ) -> impl IndexedParallelIterator<Item = impl AsRef<[Self::Item]>> {
+        self.documents.as_slice().par_chunks(chunk_size)
     }
 
     fn item_to_document_change<'doc, T: MostlySend + 'doc>(
         &self,
         context: &'doc DocumentChangeContext<T>,
-        docid: Self::Item,
+        docid: &'doc Self::Item,
     ) -> Result<Option<DocumentChange<'doc>>>
     where
         'index: 'doc,
@@ -97,6 +99,8 @@ impl<'index> DocumentChanges<'index> for UpdateByFunctionChanges<'index> {
             ..
         } = context;
 
+        let docid = *docid;
+
         // safety: Both documents *must* exists in the database as
         // their IDs comes from the list of documents ids.
         let document = index.document(txn, docid)?;
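What makes the two chunk representations interchangeable is the `impl AsRef<[Self::Item]>` return type on `iter`: both `&[T]` and `Vec<T>` implement `AsRef<[T]>`, and the driver only ever calls `.as_ref()` before iterating. A std-only sketch of that bound:

```rust
// Vec<T> and &[T] both satisfy AsRef<[T]>, so a single consumer handles
// whichever chunk type a `DocumentChanges` implementation produces.
fn consume(chunk: impl AsRef<[u32]>) -> u32 {
    chunk.as_ref().iter().sum()
}

fn main() {
    let owned: Vec<u32> = vec![1, 2, 3]; // what `chunks()` yields
    let borrowed: &[u32] = &[1, 2, 3];   // what `par_chunks()` yields
    assert_eq!(consume(owned), consume(borrowed));
}
```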