Add progress

Louis Dureuil 2024-11-04 15:10:40 +01:00
parent a77d5ea8c1
commit 3658f57f93
11 changed files with 380 additions and 48 deletions

View File

@@ -22,6 +22,7 @@ use std::ffi::OsStr;
use std::fmt;
use std::fs::{self, File};
use std::io::BufWriter;
use std::sync::atomic::{self, AtomicU16, AtomicU32};
use bumpalo::collections::CollectIn;
use bumpalo::Bump;
@@ -30,6 +31,7 @@ use meilisearch_types::error::Code;
use meilisearch_types::heed::{RoTxn, RwTxn};
use meilisearch_types::milli::documents::{obkv_to_object, DocumentsBatchReader, PrimaryKey};
use meilisearch_types::milli::heed::CompactionOption;
use meilisearch_types::milli::update::new::indexer::document_changes::Progress;
use meilisearch_types::milli::update::new::indexer::{
self, retrieve_or_guess_primary_key, UpdateByFunction,
};
@@ -1221,6 +1223,40 @@ impl IndexScheduler {
) -> Result<Vec<Task>> {
let indexer_alloc = Bump::new();
let last_finished_steps = AtomicU16::new(0);
let last_finished_documents = AtomicU32::new(0);
let send_progress =
|Progress { finished_steps, total_steps, step_name, finished_total_documents }| {
/*
let current = rayon::current_thread_index();
let last_finished_steps =
last_finished_steps.fetch_max(finished_steps, atomic::Ordering::Relaxed);
if last_finished_steps > finished_steps {
return;
}
if let Some((finished_documents, total_documents)) = finished_total_documents {
if last_finished_steps < finished_steps {
last_finished_documents.store(finished_documents, atomic::Ordering::Relaxed);
} else {
let last_finished_documents = last_finished_documents
.fetch_max(finished_documents, atomic::Ordering::Relaxed);
if last_finished_documents > finished_documents {
return;
}
}
tracing::warn!("Progress from {current:?}: {step_name} ({finished_steps}/{total_steps}), document {finished_documents}/{total_documents}")
} else {
tracing::warn!(
"Progress from {current:?}: {step_name} ({finished_steps}/{total_steps})"
)
}
*/
};
match operation {
IndexOperation::DocumentClear { mut tasks, .. } => {
let count = milli::update::ClearDocuments::new(index_wtxn, index).execute()?;
@@ -1377,6 +1413,8 @@ impl IndexScheduler {
&pool,
&document_changes,
embedders,
&|| must_stop_processing.get(),
&send_progress,
)?;
// tracing::info!(indexing_result = ?addition, processed_in = ?started_processing_at.elapsed(), "document indexing done");
@@ -1465,6 +1503,7 @@ impl IndexScheduler {
let document_changes = indexer.into_changes(&primary_key)?;
let embedders = index.embedding_configs(index_wtxn)?;
let embedders = self.embedders(embedders)?;
let must_stop_processing = &self.must_stop_processing;
indexer::index(
index_wtxn,
@@ -1475,6 +1514,8 @@ impl IndexScheduler {
&pool,
&document_changes,
embedders,
&|| must_stop_processing.get(),
&send_progress,
)?;
// tracing::info!(indexing_result = ?addition, processed_in = ?started_processing_at.elapsed(), "document indexing done");
@@ -1604,6 +1645,7 @@ impl IndexScheduler {
let document_changes = indexer.into_changes(&indexer_alloc, primary_key);
let embedders = index.embedding_configs(index_wtxn)?;
let embedders = self.embedders(embedders)?;
let must_stop_processing = &self.must_stop_processing;
indexer::index(
index_wtxn,
@@ -1614,6 +1656,8 @@ impl IndexScheduler {
&pool,
&document_changes,
embedders,
&|| must_stop_processing.get(),
&send_progress,
)?;
// tracing::info!(indexing_result = ?addition, processed_in = ?started_processing_at.elapsed(), "document indexing done");
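The `send_progress` closure above is wired through to `indexer::index`, but its body is commented out for now. For reference, here is a stand-alone sketch of what that deduplication logic does once enabled — it keeps reports monotonic when rayon workers race — assuming the `Progress` struct this commit introduces (the `rayon::current_thread_index()` lookup is omitted):

```rust
use std::sync::atomic::{AtomicU16, AtomicU32, Ordering};

pub struct Progress {
    pub finished_steps: u16,
    pub total_steps: u16,
    pub step_name: &'static str,
    pub finished_total_documents: Option<(u32, u32)>,
}

fn main() {
    let last_finished_steps = AtomicU16::new(0);
    let last_finished_documents = AtomicU32::new(0);

    let send_progress = |progress: Progress| {
        let Progress { finished_steps, total_steps, step_name, finished_total_documents } =
            progress;
        // Ignore reports older than the newest step seen so far.
        let last_steps = last_finished_steps.fetch_max(finished_steps, Ordering::Relaxed);
        if last_steps > finished_steps {
            return;
        }
        match finished_total_documents {
            Some((finished_documents, total_documents)) => {
                if last_steps < finished_steps {
                    // A new step began: reset the per-step document counter.
                    last_finished_documents.store(finished_documents, Ordering::Relaxed);
                } else {
                    // Same step: only let the document counter move forward.
                    let last_docs = last_finished_documents
                        .fetch_max(finished_documents, Ordering::Relaxed);
                    if last_docs > finished_documents {
                        return;
                    }
                }
                println!("{step_name} ({finished_steps}/{total_steps}), document {finished_documents}/{total_documents}");
            }
            None => println!("{step_name} ({finished_steps}/{total_steps})"),
        }
    };

    send_progress(Progress {
        finished_steps: 0,
        total_steps: 9,
        step_name: "extracting documents",
        finished_total_documents: Some((100, 1000)),
    });
}
```

The `fetch_max` calls make stale reports cheap to drop without taking any lock.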

View File

@@ -16,8 +16,8 @@ use super::FacetKind;
use crate::facet::value_encoding::f64_into_bytes;
use crate::update::new::extract::DocidsExtractor;
use crate::update::new::indexer::document_changes::{
for_each_document_change, DocumentChangeContext, DocumentChanges, Extractor, FullySend,
IndexingContext, RefCellExt, ThreadLocal,
extract, DocumentChangeContext, DocumentChanges, Extractor, FullySend, IndexingContext,
Progress, RefCellExt, ThreadLocal,
};
use crate::update::new::DocumentChange;
use crate::update::{create_sorter, GrenadParameters, MergeDeladdCboRoaringBitmaps};
@@ -250,12 +250,19 @@ fn truncate_str(s: &str) -> &str {
impl DocidsExtractor for FacetedDocidsExtractor {
#[tracing::instrument(level = "trace", skip_all, target = "indexing::extract::faceted")]
fn run_extraction<'pl, 'fid, 'indexer, 'index, DC: DocumentChanges<'pl>>(
fn run_extraction<'pl, 'fid, 'indexer, 'index, DC: DocumentChanges<'pl>, MSP, SP>(
grenad_parameters: GrenadParameters,
document_changes: &DC,
indexing_context: IndexingContext<'fid, 'indexer, 'index>,
indexing_context: IndexingContext<'fid, 'indexer, 'index, MSP, SP>,
extractor_allocs: &mut ThreadLocal<FullySend<RefCell<Bump>>>,
) -> Result<Merger<File, MergeDeladdCboRoaringBitmaps>> {
finished_steps: u16,
total_steps: u16,
step_name: &'static str,
) -> Result<Merger<File, MergeDeladdCboRoaringBitmaps>>
where
MSP: Fn() -> bool + Sync,
SP: Fn(Progress) + Sync,
{
let max_memory = grenad_parameters.max_memory_by_thread();
let index = indexing_context.index;
@@ -276,12 +283,15 @@ impl DocidsExtractor for FacetedDocidsExtractor {
grenad_parameters,
max_memory,
};
for_each_document_change(
extract(
document_changes,
&extractor,
indexing_context,
extractor_allocs,
&datastore,
finished_steps,
total_steps,
step_name,
)?;
}
{

View File

@@ -13,17 +13,25 @@ use grenad::Merger;
pub use searchable::*;
pub use vectors::EmbeddingExtractor;
use super::indexer::document_changes::{DocumentChanges, FullySend, IndexingContext, ThreadLocal};
use super::indexer::document_changes::{
DocumentChanges, FullySend, IndexingContext, Progress, ThreadLocal,
};
use crate::update::{GrenadParameters, MergeDeladdCboRoaringBitmaps};
use crate::Result;
pub trait DocidsExtractor {
fn run_extraction<'pl, 'fid, 'indexer, 'index, DC: DocumentChanges<'pl>>(
fn run_extraction<'pl, 'fid, 'indexer, 'index, DC: DocumentChanges<'pl>, MSP, SP>(
grenad_parameters: GrenadParameters,
document_changes: &DC,
indexing_context: IndexingContext<'fid, 'indexer, 'index>,
indexing_context: IndexingContext<'fid, 'indexer, 'index, MSP, SP>,
extractor_allocs: &mut ThreadLocal<FullySend<RefCell<Bump>>>,
) -> Result<Merger<File, MergeDeladdCboRoaringBitmaps>>;
finished_steps: u16,
total_steps: u16,
step_name: &'static str,
) -> Result<Merger<File, MergeDeladdCboRoaringBitmaps>>
where
MSP: Fn() -> bool + Sync,
SP: Fn(Progress) + Sync;
}
/// TODO move in permissive json pointer

View File

@@ -12,8 +12,8 @@ use super::tokenize_document::{tokenizer_builder, DocumentTokenizer};
use crate::update::new::extract::cache::CboCachedSorter;
use crate::update::new::extract::perm_json_p::contained_in;
use crate::update::new::indexer::document_changes::{
for_each_document_change, DocumentChangeContext, DocumentChanges, Extractor, FullySend,
IndexingContext, RefCellExt, ThreadLocal,
extract, DocumentChangeContext, DocumentChanges, Extractor, FullySend, IndexingContext,
Progress, RefCellExt, ThreadLocal,
};
use crate::update::new::DocumentChange;
use crate::update::{create_sorter, GrenadParameters, MergeDeladdCboRoaringBitmaps};
@@ -341,12 +341,19 @@ impl<'extractor> Extractor<'extractor> for WordDocidsExtractorData<'extractor> {
pub struct WordDocidsExtractors;
impl WordDocidsExtractors {
pub fn run_extraction<'pl, 'fid, 'indexer, 'index, DC: DocumentChanges<'pl>>(
pub fn run_extraction<'pl, 'fid, 'indexer, 'index, DC: DocumentChanges<'pl>, MSP, SP>(
grenad_parameters: GrenadParameters,
document_changes: &DC,
indexing_context: IndexingContext<'fid, 'indexer, 'index>,
indexing_context: IndexingContext<'fid, 'indexer, 'index, MSP, SP>,
extractor_allocs: &mut ThreadLocal<FullySend<RefCell<Bump>>>,
) -> Result<WordDocidsMergers> {
finished_steps: u16,
total_steps: u16,
step_name: &'static str,
) -> Result<WordDocidsMergers>
where
MSP: Fn() -> bool + Sync,
SP: Fn(Progress) + Sync,
{
let max_memory = grenad_parameters.max_memory_by_thread();
let index = indexing_context.index;
@@ -391,12 +398,15 @@ impl WordDocidsExtractors {
max_memory,
};
for_each_document_change(
extract(
document_changes,
&extractor,
indexing_context,
extractor_allocs,
&datastore,
finished_steps,
total_steps,
step_name,
)?;
}

View File

@@ -17,8 +17,8 @@ use tokenize_document::{tokenizer_builder, DocumentTokenizer};
use super::cache::CboCachedSorter;
use super::DocidsExtractor;
use crate::update::new::indexer::document_changes::{
for_each_document_change, DocumentChangeContext, DocumentChanges, Extractor, FullySend,
IndexingContext, ThreadLocal,
extract, DocumentChangeContext, DocumentChanges, Extractor, FullySend, IndexingContext,
Progress, ThreadLocal,
};
use crate::update::new::DocumentChange;
use crate::update::{create_sorter, GrenadParameters, MergeDeladdCboRoaringBitmaps};
@@ -69,12 +69,19 @@ impl<'extractor, EX: SearchableExtractor + Sync> Extractor<'extractor>
}
pub trait SearchableExtractor: Sized + Sync {
fn run_extraction<'pl, 'fid, 'indexer, 'index, DC: DocumentChanges<'pl>>(
fn run_extraction<'pl, 'fid, 'indexer, 'index, DC: DocumentChanges<'pl>, MSP, SP>(
grenad_parameters: GrenadParameters,
document_changes: &DC,
indexing_context: IndexingContext<'fid, 'indexer, 'index>,
indexing_context: IndexingContext<'fid, 'indexer, 'index, MSP, SP>,
extractor_allocs: &mut ThreadLocal<FullySend<RefCell<Bump>>>,
) -> Result<Merger<File, MergeDeladdCboRoaringBitmaps>> {
finished_steps: u16,
total_steps: u16,
step_name: &'static str,
) -> Result<Merger<File, MergeDeladdCboRoaringBitmaps>>
where
MSP: Fn() -> bool + Sync,
SP: Fn(Progress) + Sync,
{
let max_memory = grenad_parameters.max_memory_by_thread();
let rtxn = indexing_context.index.read_txn()?;
@@ -118,12 +125,15 @@ pub trait SearchableExtractor: Sized + Sync {
let span =
tracing::trace_span!(target: "indexing::documents::extract", "docids_extraction");
let _entered = span.enter();
for_each_document_change(
extract(
document_changes,
&extractor_data,
indexing_context,
extractor_allocs,
&datastore,
finished_steps,
total_steps,
step_name,
)?;
}
{
@@ -168,17 +178,27 @@ pub trait SearchableExtractor: Sized + Sync {
}
impl<T: SearchableExtractor> DocidsExtractor for T {
fn run_extraction<'pl, 'fid, 'indexer, 'index, DC: DocumentChanges<'pl>>(
fn run_extraction<'pl, 'fid, 'indexer, 'index, DC: DocumentChanges<'pl>, MSP, SP>(
grenad_parameters: GrenadParameters,
document_changes: &DC,
indexing_context: IndexingContext<'fid, 'indexer, 'index>,
indexing_context: IndexingContext<'fid, 'indexer, 'index, MSP, SP>,
extractor_allocs: &mut ThreadLocal<FullySend<RefCell<Bump>>>,
) -> Result<Merger<File, MergeDeladdCboRoaringBitmaps>> {
finished_steps: u16,
total_steps: u16,
step_name: &'static str,
) -> Result<Merger<File, MergeDeladdCboRoaringBitmaps>>
where
MSP: Fn() -> bool + Sync,
SP: Fn(Progress) + Sync,
{
Self::run_extraction(
grenad_parameters,
document_changes,
indexing_context,
extractor_allocs,
finished_steps,
total_steps,
step_name,
)
}
}

View File

@@ -9,7 +9,7 @@ use rayon::iter::IndexedParallelIterator;
use super::super::document_change::DocumentChange;
use crate::fields_ids_map::metadata::FieldIdMapWithMetadata;
use crate::update::new::parallel_iterator_ext::ParallelIteratorExt as _;
use crate::{FieldsIdsMap, GlobalFieldsIdsMap, Index, Result};
use crate::{FieldsIdsMap, GlobalFieldsIdsMap, Index, InternalError, Result};
pub trait RefCellExt<T: ?Sized> {
fn try_borrow_or_yield(&self) -> std::result::Result<Ref<'_, T>, std::cell::BorrowError>;
@@ -335,6 +335,12 @@ pub trait DocumentChanges<'pl // lifetime of the underlying payload
fn iter(&self, chunk_size: usize) -> impl IndexedParallelIterator<Item = impl AsRef<[Self::Item]>>;
fn len(&self) -> usize;
fn is_empty(&self) -> bool {
self.len() == 0
}
fn item_to_document_change<'doc, // lifetime of a single `process` call
T: MostlySend>(
&'doc self,
@@ -344,22 +350,72 @@ pub trait DocumentChanges<'pl // lifetime of the underlying payload
;
}
#[derive(Clone, Copy)]
pub struct IndexingContext<
'fid, // invariant lifetime of fields ids map
'indexer, // covariant lifetime of objects that are borrowed during the entire indexing operation
'index, // covariant lifetime of the index
> {
MSP,
SP,
> where
MSP: Fn() -> bool + Sync,
SP: Fn(Progress) + Sync,
{
pub index: &'index Index,
pub db_fields_ids_map: &'indexer FieldsIdsMap,
pub new_fields_ids_map: &'fid RwLock<FieldIdMapWithMetadata>,
pub doc_allocs: &'indexer ThreadLocal<FullySend<Cell<Bump>>>,
pub fields_ids_map_store: &'indexer ThreadLocal<FullySend<RefCell<GlobalFieldsIdsMap<'fid>>>>,
pub must_stop_processing: &'indexer MSP,
pub send_progress: &'indexer SP,
}
impl<
'fid, // invariant lifetime of fields ids map
'indexer, // covariant lifetime of objects that are borrowed during the entire indexing operation
'index, // covariant lifetime of the index
MSP,
SP,
> Copy
for IndexingContext<
'fid, // invariant lifetime of fields ids map
'indexer, // covariant lifetime of objects that are borrowed during the entire indexing operation
'index, // covariant lifetime of the index
MSP,
SP,
>
where
MSP: Fn() -> bool + Sync,
SP: Fn(Progress) + Sync,
{
}
impl<
'fid, // invariant lifetime of fields ids map
'indexer, // covariant lifetime of objects that are borrowed during the entire indexing operation
'index, // covariant lifetime of the index
MSP,
SP,
> Clone
for IndexingContext<
'fid, // invariant lifetime of fields ids map
'indexer, // covariant lifetime of objects that are borrowed during the entire indexing operation
'index, // covariant lifetime of the index
MSP,
SP,
>
where
MSP: Fn() -> bool + Sync,
SP: Fn(Progress) + Sync,
{
fn clone(&self) -> Self {
*self
}
}
const CHUNK_SIZE: usize = 100;
pub fn for_each_document_change<
#[allow(clippy::too_many_arguments)]
pub fn extract<
'pl, // covariant lifetime of the underlying payload
'extractor, // invariant lifetime of extractor_alloc
'fid, // invariant lifetime of fields ids map
@@ -368,6 +424,8 @@ pub fn for_each_document_change<
'index, // covariant lifetime of the index
EX,
DC: DocumentChanges<'pl>,
MSP,
SP,
>(
document_changes: &DC,
extractor: &EX,
@@ -377,20 +435,29 @@ pub fn for_each_document_change<
new_fields_ids_map,
doc_allocs,
fields_ids_map_store,
}: IndexingContext<'fid, 'indexer, 'index>,
must_stop_processing,
send_progress,
}: IndexingContext<'fid, 'indexer, 'index, MSP, SP>,
extractor_allocs: &'extractor mut ThreadLocal<FullySend<RefCell<Bump>>>,
datastore: &'data ThreadLocal<EX::Data>,
finished_steps: u16,
total_steps: u16,
step_name: &'static str,
) -> Result<()>
where
EX: Extractor<'extractor>,
MSP: Fn() -> bool + Sync,
SP: Fn(Progress) + Sync,
{
// Clean up and reuse the extractor allocs
for extractor_alloc in extractor_allocs.iter_mut() {
extractor_alloc.0.get_mut().reset();
}
let total_documents = document_changes.len();
let pi = document_changes.iter(CHUNK_SIZE);
pi.try_arc_for_each_try_init(
pi.enumerate().try_arc_for_each_try_init(
|| {
DocumentChangeContext::new(
index,
@@ -403,7 +470,19 @@ where
move |index_alloc| extractor.init_data(index_alloc),
)
},
|context, items| {
|context, (finished_documents, items)| {
if (must_stop_processing)() {
return Err(Arc::new(InternalError::AbortedIndexation.into()));
}
let finished_documents = finished_documents * CHUNK_SIZE;
(send_progress)(Progress {
finished_steps,
total_steps,
step_name,
finished_total_documents: Some((finished_documents as u32, total_documents as u32)),
});
// Clean up and reuse the document-specific allocator
context.doc_alloc.reset();
@@ -419,5 +498,21 @@ where
res
},
)
)?;
(send_progress)(Progress {
finished_steps,
total_steps,
step_name,
finished_total_documents: Some((total_documents as u32, total_documents as u32)),
});
Ok(())
}
pub struct Progress {
pub finished_steps: u16,
pub total_steps: u16,
pub step_name: &'static str,
pub finished_total_documents: Option<(u32, u32)>,
}
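A consumer can fold a report into a single figure; a hedged sketch (the helper is illustrative, not part of this commit):

```rust
// `Progress` as introduced above.
pub struct Progress {
    pub finished_steps: u16,
    pub total_steps: u16,
    pub step_name: &'static str,
    pub finished_total_documents: Option<(u32, u32)>,
}

/// Illustrative helper, not part of this commit: fold a report into one
/// percentage, weighting steps equally and refining the current step by
/// its document counter when available.
pub fn overall_percentage(p: &Progress) -> f32 {
    let step_fraction = match p.finished_total_documents {
        Some((finished, total)) if total > 0 => finished as f32 / total as f32,
        _ => 0.0,
    };
    (p.finished_steps as f32 + step_fraction) / p.total_steps as f32 * 100.0
}

fn main() {
    let p = Progress {
        finished_steps: 1,
        total_steps: 9,
        step_name: "extracting facets",
        finished_total_documents: Some((500, 1000)),
    };
    // (1 + 0.5) / 9 ≈ 16.7%
    println!("{}: {:.1}%", p.step_name, overall_percentage(&p));
}
```

Because `extract` reports `finished_documents` as the chunk index times `CHUNK_SIZE` (100), the fraction advances in 100-document increments, with a final report pinning it at `total_documents`.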

View File

@@ -75,6 +75,10 @@ impl<'pl> DocumentChanges<'pl> for DocumentDeletionChanges<'pl> {
Ok(Some(DocumentChange::Deletion(Deletion::create(*docid, external_document_id))))
}
fn len(&self) -> usize {
self.to_delete.len()
}
}
#[cfg(test)]
@@ -89,8 +93,7 @@ mod test {
use crate::fields_ids_map::metadata::{FieldIdMapWithMetadata, MetadataBuilder};
use crate::index::tests::TempIndex;
use crate::update::new::indexer::document_changes::{
for_each_document_change, DocumentChangeContext, Extractor, IndexingContext, MostlySend,
ThreadLocal,
extract, DocumentChangeContext, Extractor, IndexingContext, MostlySend, ThreadLocal,
};
use crate::update::new::indexer::DocumentDeletion;
use crate::update::new::DocumentChange;
@@ -165,17 +168,22 @@ mod test {
new_fields_ids_map: &fields_ids_map,
doc_allocs: &doc_allocs,
fields_ids_map_store: &fields_ids_map_store,
must_stop_processing: &(|| false),
send_progress: &(|_progress| {}),
};
for _ in 0..3 {
let datastore = ThreadLocal::new();
for_each_document_change(
extract(
&changes,
&deletion_tracker,
context,
&mut extractor_allocs,
&datastore,
0,
1,
"test",
)
.unwrap();

View File

@@ -241,6 +241,10 @@ impl<'pl> DocumentChanges<'pl> for DocumentOperationChanges<'pl> {
)?;
Ok(change)
}
fn len(&self) -> usize {
self.docids_version_offsets.len()
}
}
trait MergeChanges {
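As in the other change sources touched by this commit, the type gains a `len` so that `extract` can size its progress reports, and `is_empty` falls out of the trait's default method. In isolation the pattern looks like this (hypothetical `Changes` trait for illustration, not the real one):

```rust
// Hypothetical `Changes` trait, sketching the new `len` requirement and
// the `is_empty` default that comes with it.
trait Changes {
    fn len(&self) -> usize;

    fn is_empty(&self) -> bool {
        self.len() == 0
    }
}

struct Deletions {
    to_delete: Vec<u32>,
}

impl Changes for Deletions {
    fn len(&self) -> usize {
        self.to_delete.len()
    }
}

fn main() {
    let changes = Deletions { to_delete: vec![1, 2, 3] };
    // `extract` can now size its progress reports from `len()`.
    assert_eq!(changes.len(), 3);
    assert!(!changes.is_empty());
}
```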

View File

@@ -5,8 +5,8 @@ use std::thread::{self, Builder};
use big_s::S;
use bumpalo::Bump;
use document_changes::{
for_each_document_change, DocumentChangeContext, DocumentChanges, Extractor, FullySend,
IndexingContext, RefCellExt, ThreadLocal,
extract, DocumentChangeContext, DocumentChanges, Extractor, FullySend, IndexingContext,
Progress, RefCellExt, ThreadLocal,
};
pub use document_deletion::DocumentDeletion;
pub use document_operation::DocumentOperation;
@@ -72,7 +72,7 @@ impl<'a, 'extractor> Extractor<'extractor> for DocumentExtractor<'a> {
context: &DocumentChangeContext<Self::Data>,
) -> Result<()> {
let mut document_buffer = Vec::new();
let mut field_distribution_delta = context.data.0.borrow_mut();
let mut field_distribution_delta = context.data.0.borrow_mut_or_yield();
let mut new_fields_ids_map = context.new_fields_ids_map.borrow_mut_or_yield();
@@ -155,13 +155,70 @@ impl<'a, 'extractor> Extractor<'extractor> for DocumentExtractor<'a> {
}
}
mod steps {
pub const STEPS: &[&str] = &[
"extracting documents",
"extracting facets",
"extracting words",
"extracting word proximity",
"extracting embeddings",
"writing to database",
"post-processing facets",
"post-processing words",
"finalizing",
];
const fn step(step: u16) -> (u16, &'static str) {
(step, STEPS[step as usize])
}
pub const fn total_steps() -> u16 {
STEPS.len() as u16
}
pub const fn extract_documents() -> (u16, &'static str) {
step(0)
}
pub const fn extract_facets() -> (u16, &'static str) {
step(1)
}
pub const fn extract_words() -> (u16, &'static str) {
step(2)
}
pub const fn extract_word_proximity() -> (u16, &'static str) {
step(3)
}
pub const fn extract_embeddings() -> (u16, &'static str) {
step(4)
}
pub const fn write_db() -> (u16, &'static str) {
step(5)
}
pub const fn post_processing_facets() -> (u16, &'static str) {
step(6)
}
pub const fn post_processing_words() -> (u16, &'static str) {
step(7)
}
pub const fn finalizing() -> (u16, &'static str) {
step(8)
}
}
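Each helper pairs an index into `STEPS` with its label, so every call site reports progress the same way. A condensed, runnable version of the pattern (trimmed to two steps for illustration):

```rust
mod steps {
    pub const STEPS: &[&str] = &["extracting documents", "writing to database"];

    const fn step(step: u16) -> (u16, &'static str) {
        (step, STEPS[step as usize])
    }

    pub const fn total_steps() -> u16 {
        STEPS.len() as u16
    }

    pub const fn extract_documents() -> (u16, &'static str) {
        step(0)
    }
}

fn main() {
    // Each helper yields (index, label); total_steps() bounds the index.
    let total_steps = steps::total_steps();
    let (finished_steps, step_name) = steps::extract_documents();
    println!("{step_name} ({}/{})", finished_steps + 1, total_steps);
}
```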
/// This is the main function of this crate.
///
/// Give it the output of the [`Indexer::document_changes`] method and it will execute it in the [`rayon::ThreadPool`].
///
/// TODO return stats
#[allow(clippy::too_many_arguments)] // clippy: 😝
pub fn index<'pl, 'indexer, 'index, DC>(
pub fn index<'pl, 'indexer, 'index, DC, MSP, SP>(
wtxn: &mut RwTxn,
index: &'index Index,
db_fields_ids_map: &'indexer FieldsIdsMap,
@@ -170,9 +227,13 @@ pub fn index<'pl, 'indexer, 'index, DC>(
pool: &ThreadPool,
document_changes: &DC,
embedders: EmbeddingConfigs,
must_stop_processing: &'indexer MSP,
send_progress: &'indexer SP,
) -> Result<()>
where
DC: DocumentChanges<'pl>,
MSP: Fn() -> bool + Sync,
SP: Fn(Progress) + Sync,
{
let (merger_sender, writer_receiver) = merger_writer_channel(10_000);
// This channel acts as a rendezvous point to ensure that we are one task ahead
@@ -194,8 +255,12 @@ where
new_fields_ids_map: &new_fields_ids_map,
doc_allocs: &doc_allocs,
fields_ids_map_store: &fields_ids_map_store,
must_stop_processing,
send_progress,
};
let total_steps = steps::total_steps();
let mut field_distribution = index.field_distribution(wtxn)?;
thread::scope(|s| -> Result<()> {
@@ -213,7 +278,8 @@ where
let document_sender = extractor_sender.document_sender();
let document_extractor = DocumentExtractor { document_sender: &document_sender, embedders };
let datastore = ThreadLocal::with_capacity(pool.current_num_threads());
for_each_document_change(document_changes, &document_extractor, indexing_context, &mut extractor_allocs, &datastore)?;
let (finished_steps, step_name) = steps::extract_documents();
extract(document_changes, &document_extractor, indexing_context, &mut extractor_allocs, &datastore, finished_steps, total_steps, step_name)?;
for field_distribution_delta in datastore {
let field_distribution_delta = field_distribution_delta.0.into_inner();
@@ -238,22 +304,29 @@ where
{
let span = tracing::trace_span!(target: "indexing::documents::extract", "faceted");
let _entered = span.enter();
let (finished_steps, step_name) = steps::extract_facets();
extract_and_send_docids::<
_,
FacetedDocidsExtractor,
FacetDocids,
_,
_
>(
grenad_parameters,
document_changes,
indexing_context,
&mut extractor_allocs,
&extractor_sender,
finished_steps,
total_steps,
step_name
)?;
}
{
let span = tracing::trace_span!(target: "indexing::documents::extract", "word_docids");
let _entered = span.enter();
let (finished_steps, step_name) = steps::extract_words();
let WordDocidsMergers {
word_fid_docids,
@@ -261,7 +334,7 @@ where
exact_word_docids,
word_position_docids,
fid_word_count_docids,
} = WordDocidsExtractors::run_extraction(grenad_parameters, document_changes, indexing_context, &mut extractor_allocs)?;
} = WordDocidsExtractors::run_extraction(grenad_parameters, document_changes, indexing_context, &mut extractor_allocs, finished_steps, total_steps, step_name)?;
extractor_sender.send_searchable::<WordDocids>(word_docids).unwrap();
extractor_sender.send_searchable::<WordFidDocids>(word_fid_docids).unwrap();
extractor_sender.send_searchable::<ExactWordDocids>(exact_word_docids).unwrap();
@@ -276,16 +349,24 @@ where
if proximity_precision == ProximityPrecision::ByWord {
let span = tracing::trace_span!(target: "indexing::documents::extract", "word_pair_proximity_docids");
let _entered = span.enter();
let (finished_steps, step_name) = steps::extract_word_proximity();
extract_and_send_docids::<
_,
WordPairProximityDocidsExtractor,
WordPairProximityDocids,
_,
_
>(
grenad_parameters,
document_changes,
indexing_context,
&mut extractor_allocs,
&extractor_sender,
finished_steps,
total_steps,
step_name,
)?;
}
@@ -301,8 +382,10 @@ where
let embedding_sender = todo!();
let extractor = EmbeddingExtractor::new(embedders, &embedding_sender, &field_distribution, request_threads());
let datastore = ThreadLocal::with_capacity(pool.current_num_threads());
let (finished_steps, step_name) = steps::extract_embeddings();
for_each_document_change(document_changes, &extractor, indexing_context, &mut extractor_allocs, &datastore)?;
extract(document_changes, &extractor, indexing_context, &mut extractor_allocs, &datastore, finished_steps, total_steps, step_name)?;
let mut user_provided = HashMap::new();
@@ -325,6 +408,8 @@ where
{
let span = tracing::trace_span!(target: "indexing::documents::extract", "FINISH");
let _entered = span.enter();
let (finished_steps, step_name) = steps::write_db();
(indexing_context.send_progress)(Progress { finished_steps, total_steps, step_name, finished_total_documents: None });
}
// TODO THIS IS TOO MUCH
@@ -501,15 +586,38 @@ where
/// TODO handle the panicking threads
handle.join().unwrap()?;
let merger_result = merger_thread.join().unwrap()?;
let (finished_steps, step_name) = steps::post_processing_facets();
(indexing_context.send_progress)(Progress {
finished_steps,
total_steps,
step_name,
finished_total_documents: None,
});
if let Some(facet_field_ids_delta) = merger_result.facet_field_ids_delta {
compute_facet_level_database(index, wtxn, facet_field_ids_delta)?;
}
let (finished_steps, step_name) = steps::post_processing_words();
(indexing_context.send_progress)(Progress {
finished_steps,
total_steps,
step_name,
finished_total_documents: None,
});
if let Some(prefix_delta) = merger_result.prefix_delta {
compute_prefix_database(index, wtxn, prefix_delta)?;
}
let (finished_steps, step_name) = steps::finalizing();
(indexing_context.send_progress)(Progress {
finished_steps,
total_steps,
step_name,
finished_total_documents: None,
});
Ok(()) as Result<_>
})?;
@@ -585,6 +693,7 @@ fn compute_facet_level_database(
/// TODO: GrenadParameters::default() should be removed in favor a passed parameter
/// TODO: manage the errors correctly
/// TODO: we must have a single trait that also gives the extractor type
#[allow(clippy::too_many_arguments)]
fn extract_and_send_docids<
'pl,
'fid,
@@ -593,15 +702,31 @@ fn extract_and_send_docids<
DC: DocumentChanges<'pl>,
E: DocidsExtractor,
D: MergerOperationType,
MSP,
SP,
>(
grenad_parameters: GrenadParameters,
document_changes: &DC,
indexing_context: IndexingContext<'fid, 'indexer, 'index>,
indexing_context: IndexingContext<'fid, 'indexer, 'index, MSP, SP>,
extractor_allocs: &mut ThreadLocal<FullySend<RefCell<Bump>>>,
sender: &ExtractorSender,
) -> Result<()> {
let merger =
E::run_extraction(grenad_parameters, document_changes, indexing_context, extractor_allocs)?;
finished_steps: u16,
total_steps: u16,
step_name: &'static str,
) -> Result<()>
where
MSP: Fn() -> bool + Sync,
SP: Fn(Progress) + Sync,
{
let merger = E::run_extraction(
grenad_parameters,
document_changes,
indexing_context,
extractor_allocs,
finished_steps,
total_steps,
step_name,
)?;
sender.send_searchable::<D>(merger).unwrap();
Ok(())
}
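All of the signatures in this file thread the same two bounds: `MSP: Fn() -> bool + Sync` for cooperative cancellation and `SP: Fn(Progress) + Sync` for reporting, both borrowed for the whole run. A self-contained sketch of that contract (`run_indexing` is an illustrative stand-in for `indexer::index`, not the real function):

```rust
use std::sync::atomic::{AtomicBool, Ordering};

pub struct Progress {
    pub finished_steps: u16,
    pub total_steps: u16,
    pub step_name: &'static str,
    pub finished_total_documents: Option<(u32, u32)>,
}

// Illustrative stand-in for `indexer::index`: the two callbacks must be
// `Sync` because rayon workers may invoke them concurrently.
fn run_indexing<MSP, SP>(must_stop_processing: &MSP, send_progress: &SP) -> Result<(), String>
where
    MSP: Fn() -> bool + Sync,
    SP: Fn(Progress) + Sync,
{
    let total_steps = 2;
    for (i, step_name) in ["extracting documents", "writing to database"].into_iter().enumerate() {
        // Cooperative cancellation: checked once per step here, once per
        // chunk in the real `extract` loop.
        if must_stop_processing() {
            return Err("aborted indexation".into());
        }
        send_progress(Progress {
            finished_steps: i as u16,
            total_steps,
            step_name,
            finished_total_documents: None,
        });
    }
    Ok(())
}

fn main() {
    let stop = AtomicBool::new(false);
    run_indexing(&|| stop.load(Ordering::Relaxed), &|p| {
        println!("{} ({}/{})", p.step_name, p.finished_steps + 1, p.total_steps);
    })
    .unwrap();
}
```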

View File

@@ -79,4 +79,8 @@ where
let insertion = Insertion::create(docid, external_document_id, Versions::single(document));
Ok(Some(DocumentChange::Insertion(insertion)))
}
fn len(&self) -> usize {
self.iter.len()
}
}

View File

@@ -9,7 +9,7 @@ use super::DocumentChanges;
use crate::documents::Error::InvalidDocumentFormat;
use crate::documents::PrimaryKey;
use crate::error::{FieldIdMapMissingEntry, InternalError};
use crate::update::new::document::{DocumentFromVersions, Versions};
use crate::update::new::document::Versions;
use crate::update::new::{Deletion, DocumentChange, KvReaderFieldId, Update};
use crate::{all_obkv_to_json, Error, FieldsIdsMap, Object, Result, UserError};
@@ -176,6 +176,10 @@ impl<'index> DocumentChanges<'index> for UpdateByFunctionChanges<'index> {
},
}
}
fn len(&self) -> usize {
self.documents.len()
}
}
fn obkv_to_rhaimap(obkv: &KvReaderFieldId, fields_ids_map: &FieldsIdsMap) -> Result<rhai::Map> {