Add chunking

Louis Dureuil 2024-10-16 15:44:04 +02:00
parent c1fcb2ebc6
commit cd378e5bd2
9 changed files with 115 additions and 76 deletions

View File

@@ -54,12 +54,20 @@ impl<'extractor> Extractor<'extractor> for FacetedExtractorData<'extractor> {
         ))))
     }
 
-    fn process(
+    fn process<'doc>(
         &self,
-        change: DocumentChange,
-        context: &crate::update::new::indexer::document_changes::DocumentChangeContext<Self::Data>,
+        changes: impl Iterator<Item = Result<DocumentChange<'doc>>>,
+        context: &DocumentChangeContext<Self::Data>,
     ) -> Result<()> {
-        FacetedDocidsExtractor::extract_document_change(context, self.attributes_to_extract, change)
+        for change in changes {
+            let change = change?;
+            FacetedDocidsExtractor::extract_document_change(
+                context,
+                self.attributes_to_extract,
+                change,
+            )?
+        }
+        Ok(())
     }
 }
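
Note: this first hunk establishes the pattern that repeats in every extractor below: `process` no longer receives a single `DocumentChange` but a fallible iterator over a whole chunk of them, so each implementation wraps its former body in a loop that unwraps each item with `?`. A minimal sketch of that idiom, with stand-in types (`Change`, `MyExtractor`, and `extract_one` are illustrative, not names from this commit):

    // Minimal sketch of the recurring `process` migration; the types and
    // names here are stand-ins, not the commit's.
    type Change = u32;

    struct MyExtractor;

    impl MyExtractor {
        // One call per chunk. Each item is itself a Result, because
        // resolving a raw item into a change can fail, so it is unwrapped
        // with `?` before extraction.
        fn process(
            &self,
            changes: impl Iterator<Item = Result<Change, String>>,
        ) -> Result<(), String> {
            for change in changes {
                let change = change?;
                self.extract_one(change)?;
            }
            Ok(())
        }

        fn extract_one(&self, _change: Change) -> Result<(), String> {
            Ok(())
        }
    }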

View File

@@ -325,12 +325,16 @@ impl<'extractor> Extractor<'extractor> for WordDocidsExtractorData<'extractor> {
         ))))
     }
 
-    fn process(
+    fn process<'doc>(
        &self,
-        change: DocumentChange,
-        context: &crate::update::new::indexer::document_changes::DocumentChangeContext<Self::Data>,
+        changes: impl Iterator<Item = Result<DocumentChange<'doc>>>,
+        context: &DocumentChangeContext<Self::Data>,
     ) -> Result<()> {
-        WordDocidsExtractors::extract_document_change(context, self.tokenizer, change)
+        for change in changes {
+            let change = change?;
+            WordDocidsExtractors::extract_document_change(context, self.tokenizer, change)?;
+        }
+        Ok(())
     }
 }

View File

@@ -55,12 +55,16 @@ impl<'extractor, EX: SearchableExtractor + Sync> Extractor<'extractor>
         ))))
     }
 
-    fn process(
+    fn process<'doc>(
         &self,
-        change: DocumentChange,
-        context: &crate::update::new::indexer::document_changes::DocumentChangeContext<Self::Data>,
+        changes: impl Iterator<Item = Result<DocumentChange<'doc>>>,
+        context: &DocumentChangeContext<Self::Data>,
     ) -> Result<()> {
-        EX::extract_document_change(context, self.tokenizer, change)
+        for change in changes {
+            let change = change?;
+            EX::extract_document_change(context, self.tokenizer, change)?;
+        }
+        Ok(())
     }
 }

View File

@@ -323,7 +323,7 @@ pub trait Extractor<'extractor>: Sync {
     fn process<'doc>(
         &'doc self,
-        change: DocumentChange<'doc>,
+        changes: impl Iterator<Item = Result<DocumentChange<'doc>>>,
         context: &'doc DocumentChangeContext<Self::Data>,
     ) -> Result<()>;
 }
@@ -332,13 +332,13 @@ pub trait DocumentChanges<'pl // lifetime of the underlying payload
 >: Sync {
     type Item: Send;
 
-    fn iter(&self) -> impl IndexedParallelIterator<Item = Self::Item>;
+    fn iter(&self, chunk_size: usize) -> impl IndexedParallelIterator<Item = impl AsRef<[Self::Item]>>;
 
     fn item_to_document_change<'doc, // lifetime of a single `process` call
     T: MostlySend>(
         &'doc self,
         context: &'doc DocumentChangeContext<T>,
-        item: Self::Item,
+        item: &'doc Self::Item,
     ) -> Result<Option<DocumentChange<'doc>>> where 'pl: 'doc // the payload must survive the process calls
     ;
 }
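
Taken together, the two trait changes above define the new contract: `iter` hands out chunks typed as `impl AsRef<[Self::Item]>`, so both borrowed slices (what `par_chunks` yields) and owned vectors (what rayon's `chunks` adapter yields) qualify, and `item_to_document_change` now borrows each item from its chunk, hence the `&'doc Self::Item` parameter. A stripped-down, slice-backed sketch of an implementor (`Docids` is illustrative, not a type from the commit):

    // Sketch of the new contract for a slice-backed source; milli-specific
    // types are stripped out.
    use rayon::iter::IndexedParallelIterator;
    use rayon::slice::ParallelSlice as _;

    struct Docids<'pl> {
        to_process: &'pl [u32],
    }

    impl<'pl> Docids<'pl> {
        // `par_chunks` yields `&[u32]`, and `&[T]` implements `AsRef<[T]>`,
        // so slice-backed sources meet the `impl AsRef<[Self::Item]>` bound
        // without any copying.
        fn iter(&self, chunk_size: usize) -> impl IndexedParallelIterator<Item = &'pl [u32]> {
            self.to_process.par_chunks(chunk_size)
        }

        // Items now arrive borrowed from the chunk; a `Copy` item such as a
        // docid is simply dereferenced at the use site, which is why the
        // deletion hunks below gain a `*docid`.
        fn item_to_document_change(&self, docid: &u32) -> Option<u32> {
            Some(*docid)
        }
    }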
@@ -356,6 +356,8 @@ pub struct IndexingContext<
     pub fields_ids_map_store: &'indexer ThreadLocal<FullySend<RefCell<GlobalFieldsIdsMap<'fid>>>>,
 }
 
+const CHUNK_SIZE: usize = 100;
+
 pub fn for_each_document_change<
     'pl, // covariant lifetime of the underlying payload
     'extractor, // invariant lifetime of extractor_alloc
@@ -386,7 +388,7 @@ where
         extractor_alloc.0.get_mut().reset();
     }
 
-    let pi = document_changes.iter();
+    let pi = document_changes.iter(CHUNK_SIZE);
     pi.try_arc_for_each_try_init(
         || {
             DocumentChangeContext::new(
@@ -400,17 +402,16 @@ where
                 move |index_alloc| extractor.init_data(index_alloc),
             )
         },
-        |context, item| {
+        |context, items| {
             // Clean up and reuse the document-specific allocator
            context.doc_alloc.reset();
-            let Some(change) =
-                document_changes.item_to_document_change(context, item).map_err(Arc::new)?
-            else {
-                return Ok(());
-            };
+            let items = items.as_ref();
+            let changes = items.iter().filter_map(|item| {
+                document_changes.item_to_document_change(context, item).transpose()
+            });
 
-            let res = extractor.process(change, context).map_err(Arc::new);
+            let res = extractor.process(changes, context).map_err(Arc::new);
 
             // send back the doc_alloc in the pool
             context.doc_allocs.get_or_default().0.set(std::mem::take(&mut context.doc_alloc));
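
The driver change above replaces the per-item `let ... else` early return with a lazy adapter over the chunk: `item_to_document_change` returns `Result<Option<_>>`, `transpose` flips each value into an `Option<Result<_>>`, and `filter_map` drops the `None`s (items that resolve to no change) while forwarding errors intact for the extractor's `?` to surface. A self-contained sketch of that idiom with toy types:

    // `filter_map` + `transpose`: per-item resolution returns
    // Result<Option<T>>; the chunk becomes an Iterator<Item = Result<T>>
    // that skips `None`s but preserves errors.
    fn resolve(item: &u32) -> Result<Option<String>, String> {
        match *item {
            0 => Ok(None),                    // no change for this item: skipped
            13 => Err("unlucky".to_string()), // error: forwarded to the consumer
            n => Ok(Some(format!("change #{n}"))),
        }
    }

    fn main() -> Result<(), String> {
        let chunk: &[u32] = &[0, 1, 2, 3];
        let changes = chunk.iter().filter_map(|item| resolve(item).transpose());
        for change in changes {
            let change = change?; // extractors unwrap items exactly like this
            println!("{change}");
        }
        Ok(())
    }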

View File

@@ -1,6 +1,7 @@
 use bumpalo::collections::CollectIn;
 use bumpalo::Bump;
-use rayon::iter::{IntoParallelIterator, ParallelIterator as _};
+use rayon::iter::IndexedParallelIterator;
+use rayon::slice::ParallelSlice as _;
 use roaring::RoaringBitmap;
 
 use super::document_changes::{DocumentChangeContext, DocumentChanges, MostlySend};
@@ -44,8 +45,11 @@ pub struct DocumentDeletionChanges<'indexer> {
 impl<'pl> DocumentChanges<'pl> for DocumentDeletionChanges<'pl> {
     type Item = DocumentId;
 
-    fn iter(&self) -> impl rayon::prelude::IndexedParallelIterator<Item = Self::Item> {
-        self.to_delete.into_par_iter().copied()
+    fn iter(
+        &self,
+        chunk_size: usize,
+    ) -> impl IndexedParallelIterator<Item = impl AsRef<[Self::Item]>> {
+        self.to_delete.par_chunks(chunk_size)
     }
 
     fn item_to_document_change<
@@ -54,12 +58,12 @@ impl<'pl> DocumentChanges<'pl> for DocumentDeletionChanges<'pl> {
     >(
         &'doc self,
         context: &'doc DocumentChangeContext<T>,
-        docid: Self::Item,
+        docid: &'doc Self::Item,
     ) -> Result<Option<DocumentChange<'doc>>>
     where
         'pl: 'doc, // the payload must survive the process calls
     {
-        let current = context.index.document(&context.txn, docid)?;
+        let current = context.index.document(&context.txn, *docid)?;
 
         let external_document_id = self.primary_key.extract_docid_from_db(
             current,
@@ -69,7 +73,7 @@ impl<'pl> DocumentChanges<'pl> for DocumentDeletionChanges<'pl> {
 
         let external_document_id = external_document_id.to_bump(&context.doc_alloc);
 
-        Ok(Some(DocumentChange::Deletion(Deletion::create(docid, external_document_id))))
+        Ok(Some(DocumentChange::Deletion(Deletion::create(*docid, external_document_id))))
     }
 }
@@ -118,12 +122,15 @@ mod test {
             Ok(DeletionWithData { deleted })
         }
 
-        fn process(
+        fn process<'doc>(
            &self,
-            change: DocumentChange,
+            changes: impl Iterator<Item = crate::Result<DocumentChange<'doc>>>,
             context: &DocumentChangeContext<Self::Data>,
         ) -> crate::Result<()> {
-            context.data.deleted.borrow_mut().insert(change.docid());
+            for change in changes {
+                let change = change?;
+                context.data.deleted.borrow_mut().insert(change.docid());
+            }
             Ok(())
         }
     }

View File

@@ -3,6 +3,7 @@ use bumpalo::Bump;
 use heed::RoTxn;
 use memmap2::Mmap;
 use rayon::iter::IntoParallelIterator;
+use rayon::slice::ParallelSlice;
 use serde_json::value::RawValue;
 use IndexDocumentsMethod as Idm;
@@ -209,16 +210,19 @@ impl<'pl> DocumentOperation<'pl> {
 }
 
 impl<'pl> DocumentChanges<'pl> for DocumentOperationChanges<'pl> {
-    type Item = &'pl (&'pl str, ((u32, bool), &'pl [InnerDocOp<'pl>]));
+    type Item = (&'pl str, ((u32, bool), &'pl [InnerDocOp<'pl>]));
 
-    fn iter(&self) -> impl rayon::prelude::IndexedParallelIterator<Item = Self::Item> {
-        self.docids_version_offsets.into_par_iter()
+    fn iter(
+        &self,
+        chunk_size: usize,
+    ) -> impl rayon::prelude::IndexedParallelIterator<Item = impl AsRef<[Self::Item]>> {
+        self.docids_version_offsets.par_chunks(chunk_size)
     }
 
     fn item_to_document_change<'doc, T: MostlySend + 'doc>(
         &'doc self,
         context: &'doc DocumentChangeContext<T>,
-        item: Self::Item,
+        item: &'doc Self::Item,
     ) -> Result<Option<DocumentChange<'doc>>>
     where
         'pl: 'doc,

View File

@@ -5,8 +5,8 @@ use std::thread::{self, Builder};
 use big_s::S;
 use bumpalo::Bump;
 use document_changes::{
-    for_each_document_change, DocumentChanges, Extractor, FullySend, IndexingContext, RefCellExt,
-    ThreadLocal,
+    for_each_document_change, DocumentChangeContext, DocumentChanges, Extractor, FullySend,
+    IndexingContext, RefCellExt, ThreadLocal,
 };
 pub use document_deletion::DocumentDeletion;
 pub use document_operation::DocumentOperation;
@@ -33,7 +33,7 @@ use crate::update::new::channel::ExtractorSender;
 use crate::update::new::words_prefix_docids::compute_exact_word_prefix_docids;
 use crate::update::settings::InnerIndexSettings;
 use crate::update::{FacetsUpdateBulk, GrenadParameters};
-use crate::{Error, FieldsIdsMap, GlobalFieldsIdsMap, Index, Result, UserError};
+use crate::{FieldsIdsMap, GlobalFieldsIdsMap, Index, Result, UserError};
 
 pub(crate) mod de;
 pub mod document_changes;
@@ -56,10 +56,10 @@ impl<'a, 'extractor> Extractor<'extractor> for DocumentExtractor<'a> {
         Ok(FullySend(()))
     }
 
-    fn process(
+    fn process<'doc>(
         &self,
-        change: DocumentChange,
-        context: &document_changes::DocumentChangeContext<Self::Data>,
+        changes: impl Iterator<Item = Result<DocumentChange<'doc>>>,
+        context: &DocumentChangeContext<Self::Data>,
     ) -> Result<()> {
         let mut document_buffer = Vec::new();
@@ -67,32 +67,36 @@ impl<'a, 'extractor> Extractor<'extractor> for DocumentExtractor<'a> {
         let new_fields_ids_map = &*new_fields_ids_map;
         let new_fields_ids_map = new_fields_ids_map.local_map();
 
-        let external_docid = change.external_docid().to_owned();
+        for change in changes {
+            let change = change?;
+            let external_docid = change.external_docid().to_owned();
 
-        // document but we need to create a function that collects and compresses documents.
-        match change {
-            DocumentChange::Deletion(deletion) => {
-                let docid = deletion.docid();
-                self.document_sender.delete(docid, external_docid).unwrap();
-            }
-            /// TODO: change NONE by SOME(vector) when implemented
-            DocumentChange::Update(update) => {
-                let docid = update.docid();
-                let content =
-                    update.new(&context.txn, context.index, &context.db_fields_ids_map)?;
-                let content =
-                    write_to_obkv(&content, None, new_fields_ids_map, &mut document_buffer)?;
-                self.document_sender.insert(docid, external_docid, content.boxed()).unwrap();
-            }
-            DocumentChange::Insertion(insertion) => {
-                let docid = insertion.docid();
-                let content = insertion.new();
-                let content =
-                    write_to_obkv(&content, None, new_fields_ids_map, &mut document_buffer)?;
-                self.document_sender.insert(docid, external_docid, content.boxed()).unwrap();
-                // extracted_dictionary_sender.send(self, dictionary: &[u8]);
-            }
-        }
+            // document but we need to create a function that collects and compresses documents.
+            match change {
+                DocumentChange::Deletion(deletion) => {
+                    let docid = deletion.docid();
+                    self.document_sender.delete(docid, external_docid).unwrap();
+                }
+                /// TODO: change NONE by SOME(vector) when implemented
+                DocumentChange::Update(update) => {
+                    let docid = update.docid();
+                    let content =
+                        update.new(&context.txn, context.index, &context.db_fields_ids_map)?;
+                    let content =
+                        write_to_obkv(&content, None, new_fields_ids_map, &mut document_buffer)?;
+                    self.document_sender.insert(docid, external_docid, content.boxed()).unwrap();
+                }
+                DocumentChange::Insertion(insertion) => {
+                    let docid = insertion.docid();
+                    let content = insertion.new();
+                    let content =
+                        write_to_obkv(&content, None, new_fields_ids_map, &mut document_buffer)?;
+                    self.document_sender.insert(docid, external_docid, content.boxed()).unwrap();
+                    // extracted_dictionary_sender.send(self, dictionary: &[u8]);
+                }
+            }
+        }
         Ok(())
     }
 }
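
A side effect of chunking is visible in this hunk: `document_buffer` is created once per `process` call, so it is now reused for every document in the chunk instead of living through a single document. A hedged sketch of that scratch-buffer pattern (`write_doc` stands in for `write_to_obkv` and is illustrative only):

    // Scratch buffer allocated once per chunk and reused per document.
    fn write_doc<'a>(doc: &str, buffer: &'a mut Vec<u8>) -> &'a [u8] {
        buffer.clear(); // keep the capacity, drop the old contents
        buffer.extend_from_slice(doc.as_bytes());
        buffer
    }

    fn process_chunk(docs: &[&str]) {
        let mut document_buffer = Vec::new(); // one allocation per chunk
        for doc in docs {
            let bytes = write_doc(doc, &mut document_buffer);
            println!("sending {} bytes", bytes.len());
        }
    }

    fn main() {
        process_chunk(&["first document", "second document"]);
    }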

View File

@@ -45,14 +45,17 @@ where
 {
     type Item = Box<RawValue>;
 
-    fn iter(&self) -> impl IndexedParallelIterator<Item = Self::Item> {
-        self.iter.clone()
+    fn iter(
+        &self,
+        chunk_size: usize,
+    ) -> impl IndexedParallelIterator<Item = impl AsRef<[Self::Item]>> {
+        self.iter.clone().chunks(chunk_size)
     }
 
     fn item_to_document_change<'doc, T: MostlySend + 'doc>(
         &'doc self,
         context: &'doc DocumentChangeContext<T>,
-        document: Self::Item,
+        document: &'doc Self::Item,
     ) -> Result<Option<DocumentChange<'doc>>>
     where
         'index: 'doc,
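
Unlike the slice-backed sources, the partial dump clones a parallel iterator, so it cannot use `par_chunks`; it goes through rayon's `chunks` adapter instead, which groups any `IndexedParallelIterator` into owned `Vec<T>` chunks, and `Vec<T>` satisfies `AsRef<[T]>` just as well. A small standalone sketch of that adapter:

    // rayon's `chunks` adapter: owned Vec<T> chunks from any
    // IndexedParallelIterator, meeting the same AsRef<[T]> bound as the
    // borrowed slices that `par_chunks` yields.
    use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator};

    fn main() {
        let chunked: Vec<Vec<u32>> = (0..10u32)
            .into_par_iter()
            .chunks(4) // Vec<u32> of at most 4 items each
            .collect();
        for chunk in &chunked {
            let slice: &[u32] = chunk.as_ref();
            println!("{slice:?}");
        }
    }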

View File

@@ -1,7 +1,6 @@
 use std::collections::BTreeMap;
 
 use raw_collections::RawMap;
-use rayon::iter::{IndexedParallelIterator, IntoParallelRefIterator, ParallelIterator};
+use rayon::iter::IndexedParallelIterator;
+use rayon::slice::ParallelSlice as _;
 use rhai::{Dynamic, Engine, OptimizationLevel, Scope, AST};
 use roaring::RoaringBitmap;
@@ -12,8 +11,8 @@ use crate::documents::PrimaryKey;
 use crate::error::{FieldIdMapMissingEntry, InternalError};
 use crate::update::new::document::DocumentFromVersions;
 use crate::update::new::document_change::Versions;
-use crate::update::new::{Deletion, DocumentChange, KvReaderFieldId, KvWriterFieldId, Update};
-use crate::{all_obkv_to_json, Error, FieldsIdsMap, GlobalFieldsIdsMap, Object, Result, UserError};
+use crate::update::new::{Deletion, DocumentChange, KvReaderFieldId, Update};
+use crate::{all_obkv_to_json, Error, FieldsIdsMap, Object, Result, UserError};
 
 pub struct UpdateByFunction {
     documents: RoaringBitmap,
@@ -76,14 +75,17 @@ impl UpdateByFunction {
 impl<'index> DocumentChanges<'index> for UpdateByFunctionChanges<'index> {
     type Item = u32;
 
-    fn iter(&self) -> impl IndexedParallelIterator<Item = Self::Item> {
-        self.documents.par_iter().copied()
+    fn iter(
+        &self,
+        chunk_size: usize,
+    ) -> impl IndexedParallelIterator<Item = impl AsRef<[Self::Item]>> {
+        self.documents.as_slice().par_chunks(chunk_size)
     }
 
     fn item_to_document_change<'doc, T: MostlySend + 'doc>(
         &self,
         context: &'doc DocumentChangeContext<T>,
-        docid: Self::Item,
+        docid: &'doc Self::Item,
     ) -> Result<Option<DocumentChange<'doc>>>
     where
         'index: 'doc,
@@ -97,6 +99,8 @@ impl<'index> DocumentChanges<'index> for UpdateByFunctionChanges<'index> {
             ..
         } = context;
 
+        let docid = *docid;
+
         // safety: Both documents *must* exist in the database, as
         // their IDs come from the list of document ids.
         let document = index.document(txn, docid)?;