Reasonable changes

Louis Dureuil 2025-02-26 22:16:31 +01:00
parent de2fedaa9d
commit a1f60c61e8
6 changed files with 127 additions and 115 deletions

View File

@@ -1,8 +1,7 @@
use bumpalo::collections::CollectIn;
use bumpalo::Bump;
use rayon::iter::IndexedParallelIterator;
use rayon::slice::ParallelSlice as _;
use roaring::RoaringBitmap;
use scoped_thread_pool::PartitionChunks;
use super::document_changes::{DocumentChangeContext, DocumentChanges};
use crate::documents::PrimaryKey;
@@ -28,31 +27,28 @@ impl DocumentDeletion {
self,
indexer_alloc: &'indexer Bump,
primary_key: PrimaryKey<'indexer>,
thread_pool: &scoped_thread_pool::ThreadPool<crate::Error>,
chunk_size: usize,
) -> DocumentDeletionChanges<'indexer> {
let to_delete: bumpalo::collections::Vec<_> =
self.to_delete.into_iter().collect_in(indexer_alloc);
let to_delete = to_delete.into_bump_slice();
let to_delete = PartitionChunks::new(to_delete, chunk_size, thread_pool.thread_count());
DocumentDeletionChanges { to_delete, primary_key }
}
}
pub struct DocumentDeletionChanges<'indexer> {
to_delete: &'indexer [DocumentId],
to_delete: scoped_thread_pool::PartitionChunks<'indexer, DocumentId>,
primary_key: PrimaryKey<'indexer>,
}
impl<'pl> DocumentChanges<'pl> for DocumentDeletionChanges<'pl> {
type Item = DocumentId;
fn iter(
&self,
chunk_size: usize,
) -> impl IndexedParallelIterator<Item = impl AsRef<[Self::Item]>> {
self.to_delete.par_chunks(chunk_size)
}
fn item_to_document_change<
'doc, // lifetime of a single `process` call
T: MostlySend,
@@ -78,7 +74,11 @@ impl<'pl> DocumentChanges<'pl> for DocumentDeletionChanges<'pl> {
}
fn len(&self) -> usize {
self.to_delete.len()
self.to_delete.slice().len()
}
fn items(&self, thread_index: usize, task_index: usize) -> Option<&[Self::Item]> {
self.to_delete.partition(thread_index, task_index)
}
}
@@ -86,6 +86,7 @@ impl<'pl> DocumentChanges<'pl> for DocumentDeletionChanges<'pl> {
mod test {
use std::cell::RefCell;
use std::marker::PhantomData;
use std::num::NonZeroUsize;
use std::sync::RwLock;
use bumpalo::Bump;
@@ -135,6 +136,9 @@ mod test {
}
}
let mut thread_pool =
scoped_thread_pool::ThreadPool::new(NonZeroUsize::new(1).unwrap(), "test".into());
let mut deletions = DocumentDeletion::new();
deletions.delete_documents_by_docids(Vec::<u32>::new().into_iter().collect());
let indexer = Bump::new();
@@ -173,6 +177,7 @@ mod test {
let datastore = ThreadLocal::new();
extract(
&mut thread_pool,
&changes,
&deletion_tracker,
context,
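
The rayon `par_chunks` iteration is replaced here by `scoped_thread_pool::PartitionChunks`, which pre-splits the backing slice so that each worker fetches its chunks by `(thread_index, task_index)` instead of pulling from a shared parallel iterator. The crate is internal, so the following is only a minimal sketch of the assumed semantics (fixed-size chunks dealt round-robin to threads); the helper name and the exact chunk layout are illustrative, not the real implementation.

// Minimal sketch of a PartitionChunks-like helper (assumed semantics only).
struct PartitionChunksSketch<'a, T> {
    slice: &'a [T],
    chunk_size: usize,
    thread_count: usize,
}

impl<'a, T> PartitionChunksSketch<'a, T> {
    fn new(slice: &'a [T], chunk_size: usize, thread_count: usize) -> Self {
        Self { slice, chunk_size, thread_count }
    }

    // The whole backing slice, which is what `len()` is computed from above.
    fn slice(&self) -> &'a [T] {
        self.slice
    }

    // Chunk handed to `thread_index` for its `task_index`-th task; `None`
    // once that thread has no more work.
    fn partition(&self, thread_index: usize, task_index: usize) -> Option<&'a [T]> {
        let chunk_index = task_index * self.thread_count + thread_index;
        self.slice.chunks(self.chunk_size).nth(chunk_index)
    }
}

fn main() {
    let ids: Vec<u32> = (0..10).collect();
    let parts = PartitionChunksSketch::new(&ids, 3, 2);
    assert_eq!(parts.partition(0, 0), Some(&ids[0..3]));  // thread 0, task 0
    assert_eq!(parts.partition(1, 0), Some(&ids[3..6]));  // thread 1, task 0
    assert_eq!(parts.partition(0, 1), Some(&ids[6..9]));  // thread 0, task 1
    assert_eq!(parts.partition(1, 1), Some(&ids[9..10])); // thread 1, task 1
    assert_eq!(parts.partition(0, 2), None);              // thread 0 is done
}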

View File

@@ -6,8 +6,8 @@ use bumparaw_collections::RawMap;
use hashbrown::hash_map::Entry;
use heed::RoTxn;
use memmap2::Mmap;
use rayon::slice::ParallelSlice;
use rustc_hash::FxBuildHasher;
use scoped_thread_pool::PartitionChunks;
use serde_json::value::RawValue;
use serde_json::Deserializer;
@@ -57,6 +57,8 @@ impl<'pl> DocumentOperation<'pl> {
new_fields_ids_map: &mut FieldsIdsMap,
must_stop_processing: &MSP,
progress: Progress,
thread_pool: &scoped_thread_pool::ThreadPool<crate::Error>,
chunk_size: usize,
) -> Result<(DocumentOperationChanges<'pl>, Vec<PayloadStats>, Option<PrimaryKey<'pl>>)>
where
MSP: Fn() -> bool,
@@ -130,6 +132,8 @@ impl<'pl> DocumentOperation<'pl> {
docids_version_offsets.sort_unstable_by_key(|(_, po)| method.sort_key(&po.operations));
let docids_version_offsets = docids_version_offsets.into_bump_slice();
let docids_version_offsets =
PartitionChunks::new(docids_version_offsets, chunk_size, thread_pool.thread_count());
Ok((DocumentOperationChanges { docids_version_offsets }, operations_stats, primary_key))
}
}
@@ -353,13 +357,6 @@ fn merge_version_offsets<'s, 'pl>(
impl<'pl> DocumentChanges<'pl> for DocumentOperationChanges<'pl> {
type Item = (&'pl str, PayloadOperations<'pl>);
fn iter(
&self,
chunk_size: usize,
) -> impl rayon::prelude::IndexedParallelIterator<Item = impl AsRef<[Self::Item]>> {
self.docids_version_offsets.par_chunks(chunk_size)
}
fn item_to_document_change<'doc, T: MostlySend + 'doc>(
&'doc self,
context: &'doc DocumentChangeContext<T>,
@@ -379,12 +376,16 @@ impl<'pl> DocumentChanges<'pl> for DocumentOperationChanges<'pl> {
}
fn len(&self) -> usize {
self.docids_version_offsets.len()
self.docids_version_offsets.slice().len()
}
fn items(&self, thread_index: usize, task_index: usize) -> Option<&[Self::Item]> {
self.docids_version_offsets.partition(thread_index, task_index)
}
}
pub struct DocumentOperationChanges<'pl> {
docids_version_offsets: &'pl [(&'pl str, PayloadOperations<'pl>)],
docids_version_offsets: PartitionChunks<'pl, (&'pl str, PayloadOperations<'pl>)>,
}
pub enum Payload<'pl> {
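
With `iter()` gone from the `DocumentChanges` trait, callers no longer drive a rayon iterator; each worker thread instead walks its own partitions through `items(thread_index, task_index)`. The loop below is a hedged sketch of that per-thread drain pattern, inferred from the `Option<&[Self::Item]>` return type; the function names and the round-robin chunk lookup are assumptions, not the actual extractor code.

// Assumed worker loop: keep asking for the next task's chunk until None.
fn drain_thread<'a, T>(
    thread_index: usize,
    items: impl Fn(usize, usize) -> Option<&'a [T]>,
    mut process: impl FnMut(&'a T),
) {
    let mut task_index = 0;
    while let Some(chunk) = items(thread_index, task_index) {
        chunk.iter().for_each(&mut process);
        task_index += 1;
    }
}

fn main() {
    // Hypothetical data: 2 threads draining chunks of 2 over 5 items.
    let data: Vec<u32> = vec![10, 20, 30, 40, 50];
    let chunks: Vec<&[u32]> = data.chunks(2).collect();
    let thread_count = 2;
    let mut seen = Vec::new();
    for thread_index in 0..thread_count {
        drain_thread(
            thread_index,
            |t, task| chunks.get(task * thread_count + t).copied(),
            |v| seen.push(*v),
        );
    }
    seen.sort();
    assert_eq!(seen, vec![10, 20, 30, 40, 50]);
}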

View File

@@ -104,9 +104,8 @@ where
);
});
let (extractor_sender, writer_receiver) = pool
.install(|| extractor_writer_bbqueue(&mut bbbuffers, total_bbbuffer_capacity, 1000))
.unwrap();
let (extractor_sender, writer_receiver) =
extractor_writer_bbqueue(thread_pool, &mut bbbuffers, total_bbbuffer_capacity, 1000);
let metadata_builder = MetadataBuilder::from_index(index, wtxn)?;
let new_fields_ids_map = FieldIdMapWithMetadata::new(new_fields_ids_map, metadata_builder);
@@ -139,21 +138,19 @@ where
let document_ids = &mut document_ids;
let extractor_handle =
Builder::new().name(S("indexer-extractors")).spawn_scoped(s, move || {
pool.install(move || {
extract::extract_all(
document_changes,
indexing_context,
indexer_span,
extractor_sender,
embedders,
&mut extractor_allocs,
finished_extraction,
field_distribution,
index_embeddings,
document_ids,
)
})
.unwrap()
extract::extract_all(
thread_pool,
document_changes,
indexing_context,
indexer_span,
extractor_sender,
embedders,
&mut extractor_allocs,
finished_extraction,
field_distribution,
index_embeddings,
document_ids,
)
})?;
let global_fields_ids_map = GlobalFieldsIdsMap::new(&new_fields_ids_map);
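
The removed `pool.install(..)` wrappers existed because rayon parallel iterators run on whichever pool is currently installed, so the extraction work had to be pinned to the dedicated indexing pool; once the scoped thread pool is passed as an explicit argument (as `extract_all` and `extractor_writer_bbqueue` now receive it), that ambient installation step is unnecessary. Below is a small standalone illustration of the rayon idiom, with an arbitrary sum standing in for the extraction work.

use rayon::prelude::*;
use rayon::ThreadPoolBuilder;

fn main() {
    // Dedicated pool, comparable to the old indexing pool.
    let pool = ThreadPoolBuilder::new().num_threads(2).build().unwrap();
    let values: Vec<u64> = (1..=100).collect();

    // Old idiom: `install` makes `pool` the current pool, so the parallel
    // iterator inside runs on its threads rather than on the global pool.
    let sum: u64 = pool.install(|| values.par_iter().sum());
    assert_eq!(sum, 5050);

    // New idiom (sketched): the callee receives the pool handle explicitly
    // and decides itself how to schedule work, without ambient state.
    fn sum_on(pool: &rayon::ThreadPool, values: &[u64]) -> u64 {
        pool.install(|| values.par_iter().sum())
    }
    assert_eq!(sum_on(&pool, &values), 5050);
}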

View File

@@ -3,6 +3,7 @@ use std::ops::DerefMut;
use bumparaw_collections::RawMap;
use rayon::iter::IndexedParallelIterator;
use rustc_hash::FxBuildHasher;
use scoped_thread_pool::ThreadPool;
use serde_json::value::RawValue;
use super::document_changes::{DocumentChangeContext, DocumentChanges};
@@ -14,45 +15,34 @@ use crate::update::new::thread_local::MostlySend;
use crate::update::new::{DocumentChange, Insertion};
use crate::{Error, InternalError, Result, UserError};
pub struct PartialDump<I> {
iter: I,
}
pub struct PartialDump;
impl<I> PartialDump<I> {
pub fn new_from_jsonlines(iter: I) -> Self {
PartialDump { iter }
impl PartialDump {
pub fn new_from_jsonlines() -> Self {
PartialDump
}
pub fn into_changes<'index>(
self,
concurrent_available_ids: &'index ConcurrentAvailableIds,
primary_key: &'index PrimaryKey,
) -> PartialDumpChanges<'index, I> {
thread_pool: &ThreadPool<crate::Error>,
chunk_size: usize,
) -> PartialDumpChanges<'index> {
// Note for future self:
// - We recommend sending chunks of documents in this `PartialDumpIndexer`, so we need to create a custom take_while_size method (that doesn't drop items).
PartialDumpChanges { iter: self.iter, concurrent_available_ids, primary_key }
PartialDumpChanges { concurrent_available_ids, primary_key }
}
}
pub struct PartialDumpChanges<'doc, I> {
iter: I,
pub struct PartialDumpChanges<'doc> {
concurrent_available_ids: &'doc ConcurrentAvailableIds,
primary_key: &'doc PrimaryKey<'doc>,
}
impl<'index, Iter> DocumentChanges<'index> for PartialDumpChanges<'index, Iter>
where
Iter: IndexedParallelIterator<Item = Box<RawValue>> + Clone + Sync + 'index,
{
impl<'index> DocumentChanges<'index> for PartialDumpChanges<'index> {
type Item = Box<RawValue>;
fn iter(
&self,
chunk_size: usize,
) -> impl IndexedParallelIterator<Item = impl AsRef<[Self::Item]>> {
self.iter.clone().chunks(chunk_size)
}
fn item_to_document_change<'doc, T: MostlySend + 'doc>(
&'doc self,
context: &'doc DocumentChangeContext<T>,
@@ -85,6 +75,10 @@ where
}
fn len(&self) -> usize {
self.iter.len()
unimplemented!()
}
fn items(&self, thread_index: usize, task_index: usize) -> Option<&[Self::Item]> {
unimplemented!()
}
}

View File

@@ -1,9 +1,10 @@
use bumpalo::collections::CollectIn;
use bumpalo::Bump;
use bumparaw_collections::RawMap;
use rayon::iter::IndexedParallelIterator;
use rayon::slice::ParallelSlice as _;
use rhai::{Dynamic, Engine, OptimizationLevel, Scope, AST};
use roaring::RoaringBitmap;
use rustc_hash::FxBuildHasher;
use scoped_thread_pool::{PartitionChunks, ThreadPool};
use super::document_changes::DocumentChangeContext;
use super::DocumentChanges;
@@ -22,14 +23,12 @@ pub struct UpdateByFunction {
code: String,
}
pub struct UpdateByFunctionChanges<'doc> {
primary_key: &'doc PrimaryKey<'doc>,
pub struct UpdateByFunctionChanges<'index> {
primary_key: &'index PrimaryKey<'index>,
engine: Engine,
ast: AST,
context: Option<Dynamic>,
// It is sad that the RoaringBitmap doesn't
// implement IndexedParallelIterator
documents: Vec<u32>,
documents: PartitionChunks<'index, u32>,
}
impl UpdateByFunction {
@@ -40,6 +39,9 @@ impl UpdateByFunction {
pub fn into_changes<'index>(
self,
primary_key: &'index PrimaryKey,
allocator: &'index Bump,
thread_pool: &ThreadPool<crate::Error>,
chunk_size: usize,
) -> Result<UpdateByFunctionChanges<'index>> {
let Self { documents, context, code } = self;
@@ -64,26 +66,19 @@ impl UpdateByFunction {
None => None,
};
Ok(UpdateByFunctionChanges {
primary_key,
engine,
ast,
context,
documents: documents.into_iter().collect(),
})
let documents: bumpalo::collections::Vec<'_, _> =
documents.into_iter().collect_in(allocator);
let documents = documents.into_bump_slice();
let documents = PartitionChunks::new(documents, chunk_size, thread_pool.thread_count());
Ok(UpdateByFunctionChanges { primary_key, engine, ast, context, documents })
}
}
impl<'index> DocumentChanges<'index> for UpdateByFunctionChanges<'index> {
type Item = u32;
fn iter(
&self,
chunk_size: usize,
) -> impl IndexedParallelIterator<Item = impl AsRef<[Self::Item]>> {
self.documents.as_slice().par_chunks(chunk_size)
}
fn item_to_document_change<'doc, T: MostlySend + 'doc>(
&self,
context: &'doc DocumentChangeContext<T>,
@@ -185,7 +180,11 @@ impl<'index> DocumentChanges<'index> for UpdateByFunctionChanges<'index> {
}
fn len(&self) -> usize {
self.documents.len()
self.documents.slice().len()
}
fn items(&self, thread_index: usize, task_index: usize) -> Option<&[Self::Item]> {
self.documents.partition(thread_index, task_index)
}
}
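
The documents to update now go through the same collect-into-bump-then-partition path as the other change types. For reference, here is a small standalone example of the bumpalo pattern used above; `collect_in` and `into_bump_slice` are the real bumpalo collections APIs, while the docid values are made up.

use bumpalo::collections::CollectIn;
use bumpalo::Bump;

fn main() {
    let arena = Bump::new();

    // Collect an iterator straight into the bump arena, avoiding a heap Vec.
    let docids: bumpalo::collections::Vec<'_, u32> =
        (0u32..8).filter(|n| n % 2 == 0).collect_in(&arena);

    // Freeze it as a plain slice tied to the arena's lifetime; this is the
    // shape handed to PartitionChunks::new(slice, chunk_size, thread_count).
    let slice: &[u32] = docids.into_bump_slice();
    assert_eq!(slice, &[0, 2, 4, 6]);
}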

View File

@@ -1,10 +1,10 @@
use std::cell::RefCell;
use std::sync::Mutex;
use hashbrown::HashMap;
use heed::types::Bytes;
use heed::{Database, RoTxn};
use memmap2::Mmap;
use rayon::iter::{IntoParallelIterator, ParallelIterator};
use roaring::RoaringBitmap;
use super::channel::*;
@@ -64,6 +64,7 @@ where
#[tracing::instrument(level = "trace", skip_all, target = "indexing::merge")]
pub fn merge_and_send_docids<'extractor, MSP, D>(
thread_pool: &mut scoped_thread_pool::ThreadPool<crate::Error>,
mut caches: Vec<BalancedCaches<'extractor>>,
database: Database<Bytes, Bytes>,
index: &Index,
@@ -74,7 +75,10 @@
MSP: Fn() -> bool + Sync,
D: DatabaseType + Sync,
{
transpose_and_freeze_caches(&mut caches)?.into_par_iter().try_for_each(|frozen| {
let frozen_caches = Mutex::new(transpose_and_freeze_caches(&mut caches)?);
match thread_pool.broadcast(|thread_index| {
let frozen = std::mem::take(frozen_caches.lock().unwrap().get_mut(thread_index).unwrap());
let rtxn = index.read_txn()?;
if must_stop_processing() {
return Err(InternalError::AbortedIndexation.into());
@@ -92,12 +96,17 @@ where
}
Operation::Ignore => Ok(()),
}
})
})
});
Ok(())
}) {
Ok(()) => Ok(()),
Err(errors) => Err(crate::Error::from_scoped_thread_pool_errors(thread_pool, errors)),
}
}
#[tracing::instrument(level = "trace", skip_all, target = "indexing::merge")]
pub fn merge_and_send_facet_docids<'extractor>(
thread_pool: &mut scoped_thread_pool::ThreadPool<crate::Error>,
mut caches: Vec<BalancedCaches<'extractor>>,
database: FacetDatabases,
index: &Index,
@@ -108,35 +117,42 @@ pub fn merge_and_send_facet_docids<'extractor>(
let max_number_count = (index.facet_id_f64_docids.len(rtxn)? / 500) as usize;
let max_string_count = max_string_count.clamp(1000, 100_000);
let max_number_count = max_number_count.clamp(1000, 100_000);
transpose_and_freeze_caches(&mut caches)?
.into_par_iter()
.map(|frozen| {
let mut facet_field_ids_delta =
FacetFieldIdsDelta::new(max_string_count, max_number_count);
let rtxn = index.read_txn()?;
merge_caches_sorted(frozen, |key, DelAddRoaringBitmap { del, add }| {
let current = database.get_cbo_roaring_bytes_value(&rtxn, key)?;
match merge_cbo_bitmaps(current, del, add)? {
Operation::Write(bitmap) => {
facet_field_ids_delta.register_from_key(key);
docids_sender.write(key, &bitmap)?;
Ok(())
}
Operation::Delete => {
facet_field_ids_delta.register_from_key(key);
docids_sender.delete(key)?;
Ok(())
}
Operation::Ignore => Ok(()),
}
})?;
let transposed_frozen_caches = Mutex::new(transpose_and_freeze_caches(&mut caches)?);
let output = Mutex::new(FacetFieldIdsDelta::new(max_string_count, max_number_count));
thread_pool.broadcast(|thread_index| {
// TODO: we can probably spare the mutex here since it is guaranteed that each thread will access its own cell of the vec
let frozen =
std::mem::take(transposed_frozen_caches.lock().unwrap().get_mut(thread_index).unwrap());
Ok(facet_field_ids_delta)
})
.reduce(
|| Ok(FacetFieldIdsDelta::new(max_string_count, max_number_count)),
|lhs, rhs| Ok(lhs?.merge(rhs?)),
)
let mut facet_field_ids_delta = FacetFieldIdsDelta::new(max_string_count, max_number_count);
let rtxn = index.read_txn()?;
merge_caches_sorted(frozen, |key, DelAddRoaringBitmap { del, add }| {
let current = database.get_cbo_roaring_bytes_value(&rtxn, key)?;
match merge_cbo_bitmaps(current, del, add)? {
Operation::Write(bitmap) => {
facet_field_ids_delta.register_from_key(key);
docids_sender.write(key, &bitmap)?;
Ok(())
}
Operation::Delete => {
facet_field_ids_delta.register_from_key(key);
docids_sender.delete(key)?;
Ok(())
}
Operation::Ignore => Ok(()),
}
})?;
{
let mut common = output.lock().unwrap();
*common = std::mem::replace(
&mut *common,
FacetFieldIdsDelta::new(max_string_count, max_number_count),
)
.merge(facet_field_ids_delta);
}
Ok(())
});
Ok(output.into_inner().unwrap())
}
pub struct FacetDatabases<'a> {
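
The rayon `reduce` over facet deltas is replaced by a broadcast in which every thread takes its own frozen cache out of a `Mutex<Vec<_>>` and merges its local delta into a shared, `Mutex`-protected accumulator. The following is a minimal standalone sketch of that fan-out-and-merge shape, using `std::thread::scope` in place of the internal `scoped_thread_pool::ThreadPool::broadcast` and a plain sum in place of `FacetFieldIdsDelta::merge`.

use std::sync::Mutex;
use std::thread;

fn main() {
    let thread_count: usize = 4;

    // One pre-built input per thread, like the transposed frozen caches.
    let per_thread_inputs: Vec<Vec<u64>> = (0..thread_count)
        .map(|t| (0..10).map(|i| (t * 10 + i) as u64).collect())
        .collect();
    let inputs = Mutex::new(per_thread_inputs);
    let output = Mutex::new(0u64);

    // std::thread::scope stands in for thread_pool.broadcast(|thread_index| ..).
    thread::scope(|s| {
        for thread_index in 0..thread_count {
            let inputs = &inputs;
            let output = &output;
            s.spawn(move || {
                // Take this thread's slot, mirroring std::mem::take on the
                // frozen caches vector.
                let mine = std::mem::take(&mut inputs.lock().unwrap()[thread_index]);
                // Build the thread-local result (stand-in for the local
                // FacetFieldIdsDelta), then fold it into the shared one.
                let local: u64 = mine.iter().sum();
                *output.lock().unwrap() += local;
            });
        }
    });

    assert_eq!(output.into_inner().unwrap(), (0u64..40).sum());
}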