Compare commits

..

No commits in common. "c782c09208f3d250ff237ab3537a10e52ad52945" and "04c38220cac3ea13557fa18d30aaf6a6dd47d52a" have entirely different histories.

12 changed files with 188 additions and 111 deletions

1
Cargo.lock generated
View File

@ -3619,7 +3619,6 @@ dependencies = [
"csv", "csv",
"deserr", "deserr",
"either", "either",
"enum-iterator",
"filter-parser", "filter-parser",
"flatten-serde-json", "flatten-serde-json",
"fst", "fst",

View File

@ -101,7 +101,6 @@ thread_local = "1.1.8"
allocator-api2 = "0.2.18" allocator-api2 = "0.2.18"
rustc-hash = "2.0.0" rustc-hash = "2.0.0"
uell = "0.1.0" uell = "0.1.0"
enum-iterator = "2.1.0"
[dev-dependencies] [dev-dependencies]
mimalloc = { version = "0.1.43", default-features = false } mimalloc = { version = "0.1.43", default-features = false }

View File

@ -18,7 +18,6 @@ use crate::update::new::indexer::document_changes::{
extract, DocumentChangeContext, DocumentChanges, Extractor, IndexingContext, Progress, extract, DocumentChangeContext, DocumentChanges, Extractor, IndexingContext, Progress,
}; };
use crate::update::new::ref_cell_ext::RefCellExt as _; use crate::update::new::ref_cell_ext::RefCellExt as _;
use crate::update::new::steps::Step;
use crate::update::new::thread_local::{FullySend, ThreadLocal}; use crate::update::new::thread_local::{FullySend, ThreadLocal};
use crate::update::new::DocumentChange; use crate::update::new::DocumentChange;
use crate::update::GrenadParameters; use crate::update::GrenadParameters;
@ -338,6 +337,7 @@ fn truncate_str(s: &str) -> &str {
} }
impl FacetedDocidsExtractor { impl FacetedDocidsExtractor {
#[allow(clippy::too_many_arguments)]
#[tracing::instrument(level = "trace", skip_all, target = "indexing::extract::faceted")] #[tracing::instrument(level = "trace", skip_all, target = "indexing::extract::faceted")]
pub fn run_extraction< pub fn run_extraction<
'pl, 'pl,
@ -354,7 +354,9 @@ impl FacetedDocidsExtractor {
indexing_context: IndexingContext<'fid, 'indexer, 'index, MSP, SP>, indexing_context: IndexingContext<'fid, 'indexer, 'index, MSP, SP>,
extractor_allocs: &'extractor mut ThreadLocal<FullySend<Bump>>, extractor_allocs: &'extractor mut ThreadLocal<FullySend<Bump>>,
sender: &FieldIdDocidFacetSender, sender: &FieldIdDocidFacetSender,
step: Step, finished_steps: u16,
total_steps: u16,
step_name: &'static str,
) -> Result<Vec<BalancedCaches<'extractor>>> ) -> Result<Vec<BalancedCaches<'extractor>>>
where where
MSP: Fn() -> bool + Sync, MSP: Fn() -> bool + Sync,
@ -384,7 +386,9 @@ impl FacetedDocidsExtractor {
indexing_context, indexing_context,
extractor_allocs, extractor_allocs,
&datastore, &datastore,
step, finished_steps,
total_steps,
step_name,
)?; )?;
} }

View File

@ -14,7 +14,6 @@ pub use searchable::*;
pub use vectors::EmbeddingExtractor; pub use vectors::EmbeddingExtractor;
use super::indexer::document_changes::{DocumentChanges, IndexingContext, Progress}; use super::indexer::document_changes::{DocumentChanges, IndexingContext, Progress};
use super::steps::Step;
use super::thread_local::{FullySend, ThreadLocal}; use super::thread_local::{FullySend, ThreadLocal};
use crate::update::GrenadParameters; use crate::update::GrenadParameters;
use crate::Result; use crate::Result;
@ -25,7 +24,9 @@ pub trait DocidsExtractor {
document_changes: &DC, document_changes: &DC,
indexing_context: IndexingContext<'fid, 'indexer, 'index, MSP, SP>, indexing_context: IndexingContext<'fid, 'indexer, 'index, MSP, SP>,
extractor_allocs: &'extractor mut ThreadLocal<FullySend<Bump>>, extractor_allocs: &'extractor mut ThreadLocal<FullySend<Bump>>,
step: Step, finished_steps: u16,
total_steps: u16,
step_name: &'static str,
) -> Result<Vec<BalancedCaches<'extractor>>> ) -> Result<Vec<BalancedCaches<'extractor>>>
where where
MSP: Fn() -> bool + Sync, MSP: Fn() -> bool + Sync,

View File

@ -14,7 +14,6 @@ use crate::update::new::indexer::document_changes::{
extract, DocumentChangeContext, DocumentChanges, Extractor, IndexingContext, Progress, extract, DocumentChangeContext, DocumentChanges, Extractor, IndexingContext, Progress,
}; };
use crate::update::new::ref_cell_ext::RefCellExt as _; use crate::update::new::ref_cell_ext::RefCellExt as _;
use crate::update::new::steps::Step;
use crate::update::new::thread_local::{FullySend, MostlySend, ThreadLocal}; use crate::update::new::thread_local::{FullySend, MostlySend, ThreadLocal};
use crate::update::new::DocumentChange; use crate::update::new::DocumentChange;
use crate::update::GrenadParameters; use crate::update::GrenadParameters;
@ -250,7 +249,9 @@ impl WordDocidsExtractors {
document_changes: &DC, document_changes: &DC,
indexing_context: IndexingContext<'fid, 'indexer, 'index, MSP, SP>, indexing_context: IndexingContext<'fid, 'indexer, 'index, MSP, SP>,
extractor_allocs: &'extractor mut ThreadLocal<FullySend<Bump>>, extractor_allocs: &'extractor mut ThreadLocal<FullySend<Bump>>,
step: Step, finished_steps: u16,
total_steps: u16,
step_name: &'static str,
) -> Result<WordDocidsCaches<'extractor>> ) -> Result<WordDocidsCaches<'extractor>>
where where
MSP: Fn() -> bool + Sync, MSP: Fn() -> bool + Sync,
@ -305,7 +306,9 @@ impl WordDocidsExtractors {
indexing_context, indexing_context,
extractor_allocs, extractor_allocs,
&datastore, &datastore,
step, finished_steps,
total_steps,
step_name,
)?; )?;
} }

View File

@ -16,7 +16,6 @@ use super::DocidsExtractor;
use crate::update::new::indexer::document_changes::{ use crate::update::new::indexer::document_changes::{
extract, DocumentChangeContext, DocumentChanges, Extractor, IndexingContext, Progress, extract, DocumentChangeContext, DocumentChanges, Extractor, IndexingContext, Progress,
}; };
use crate::update::new::steps::Step;
use crate::update::new::thread_local::{FullySend, ThreadLocal}; use crate::update::new::thread_local::{FullySend, ThreadLocal};
use crate::update::new::DocumentChange; use crate::update::new::DocumentChange;
use crate::update::GrenadParameters; use crate::update::GrenadParameters;
@ -61,7 +60,9 @@ pub trait SearchableExtractor: Sized + Sync {
document_changes: &DC, document_changes: &DC,
indexing_context: IndexingContext<'fid, 'indexer, 'index, MSP, SP>, indexing_context: IndexingContext<'fid, 'indexer, 'index, MSP, SP>,
extractor_allocs: &'extractor mut ThreadLocal<FullySend<Bump>>, extractor_allocs: &'extractor mut ThreadLocal<FullySend<Bump>>,
step: Step, finished_steps: u16,
total_steps: u16,
step_name: &'static str,
) -> Result<Vec<BalancedCaches<'extractor>>> ) -> Result<Vec<BalancedCaches<'extractor>>>
where where
MSP: Fn() -> bool + Sync, MSP: Fn() -> bool + Sync,
@ -114,7 +115,9 @@ pub trait SearchableExtractor: Sized + Sync {
indexing_context, indexing_context,
extractor_allocs, extractor_allocs,
&datastore, &datastore,
step, finished_steps,
total_steps,
step_name,
)?; )?;
} }
@ -139,7 +142,9 @@ impl<T: SearchableExtractor> DocidsExtractor for T {
document_changes: &DC, document_changes: &DC,
indexing_context: IndexingContext<'fid, 'indexer, 'index, MSP, SP>, indexing_context: IndexingContext<'fid, 'indexer, 'index, MSP, SP>,
extractor_allocs: &'extractor mut ThreadLocal<FullySend<Bump>>, extractor_allocs: &'extractor mut ThreadLocal<FullySend<Bump>>,
step: Step, finished_steps: u16,
total_steps: u16,
step_name: &'static str,
) -> Result<Vec<BalancedCaches<'extractor>>> ) -> Result<Vec<BalancedCaches<'extractor>>>
where where
MSP: Fn() -> bool + Sync, MSP: Fn() -> bool + Sync,
@ -150,7 +155,9 @@ impl<T: SearchableExtractor> DocidsExtractor for T {
document_changes, document_changes,
indexing_context, indexing_context,
extractor_allocs, extractor_allocs,
step, finished_steps,
total_steps,
step_name,
) )
} }
} }

View File

@ -1,4 +1,3 @@
use std::collections::hash_map::Entry;
use std::collections::{BTreeSet, HashMap}; use std::collections::{BTreeSet, HashMap};
use charabia::normalizer::NormalizerOption; use charabia::normalizer::NormalizerOption;
@ -84,9 +83,9 @@ impl<'indexer> FacetSearchBuilder<'indexer> {
} }
fn locales(&mut self, field_id: FieldId) -> Option<&[Language]> { fn locales(&mut self, field_id: FieldId) -> Option<&[Language]> {
if let Entry::Vacant(e) = self.localized_field_ids.entry(field_id) { if !self.localized_field_ids.contains_key(&field_id) {
let Some(field_name) = self.global_fields_ids_map.name(field_id) else { let Some(field_name) = self.global_fields_ids_map.name(field_id) else {
unreachable!("Field id {field_id} not found in the global fields ids map"); unreachable!("Field id {} not found in the global fields ids map", field_id);
}; };
let locales = self let locales = self
@ -95,7 +94,7 @@ impl<'indexer> FacetSearchBuilder<'indexer> {
.find(|rule| rule.match_str(field_name)) .find(|rule| rule.match_str(field_name))
.map(|rule| rule.locales.clone()); .map(|rule| rule.locales.clone());
e.insert(locales); self.localized_field_ids.insert(field_id, locales);
} }
self.localized_field_ids.get(&field_id).unwrap().as_deref() self.localized_field_ids.get(&field_id).unwrap().as_deref()

View File

@ -8,7 +8,6 @@ use rayon::iter::IndexedParallelIterator;
use super::super::document_change::DocumentChange; use super::super::document_change::DocumentChange;
use crate::fields_ids_map::metadata::FieldIdMapWithMetadata; use crate::fields_ids_map::metadata::FieldIdMapWithMetadata;
use crate::update::new::parallel_iterator_ext::ParallelIteratorExt as _; use crate::update::new::parallel_iterator_ext::ParallelIteratorExt as _;
use crate::update::new::steps::Step;
use crate::update::new::thread_local::{FullySend, MostlySend, ThreadLocal}; use crate::update::new::thread_local::{FullySend, MostlySend, ThreadLocal};
use crate::{FieldsIdsMap, GlobalFieldsIdsMap, Index, InternalError, Result}; use crate::{FieldsIdsMap, GlobalFieldsIdsMap, Index, InternalError, Result};
@ -192,6 +191,7 @@ where
const CHUNK_SIZE: usize = 100; const CHUNK_SIZE: usize = 100;
#[allow(clippy::too_many_arguments)]
pub fn extract< pub fn extract<
'pl, // covariant lifetime of the underlying payload 'pl, // covariant lifetime of the underlying payload
'extractor, // invariant lifetime of extractor_alloc 'extractor, // invariant lifetime of extractor_alloc
@ -217,7 +217,9 @@ pub fn extract<
}: IndexingContext<'fid, 'indexer, 'index, MSP, SP>, }: IndexingContext<'fid, 'indexer, 'index, MSP, SP>,
extractor_allocs: &'extractor mut ThreadLocal<FullySend<Bump>>, extractor_allocs: &'extractor mut ThreadLocal<FullySend<Bump>>,
datastore: &'data ThreadLocal<EX::Data>, datastore: &'data ThreadLocal<EX::Data>,
step: Step, finished_steps: u16,
total_steps: u16,
step_name: &'static str,
) -> Result<()> ) -> Result<()>
where where
EX: Extractor<'extractor>, EX: Extractor<'extractor>,
@ -231,7 +233,7 @@ where
extractor_alloc.0.reset(); extractor_alloc.0.reset();
} }
let total_documents = document_changes.len() as u32; let total_documents = document_changes.len();
let pi = document_changes.iter(CHUNK_SIZE); let pi = document_changes.iter(CHUNK_SIZE);
pi.enumerate().try_arc_for_each_try_init( pi.enumerate().try_arc_for_each_try_init(
@ -251,13 +253,14 @@ where
if (must_stop_processing)() { if (must_stop_processing)() {
return Err(Arc::new(InternalError::AbortedIndexation.into())); return Err(Arc::new(InternalError::AbortedIndexation.into()));
} }
let finished_documents = (finished_documents * CHUNK_SIZE) as u32; let finished_documents = finished_documents * CHUNK_SIZE;
(send_progress)(Progress::from_step_documents( (send_progress)(Progress {
step, finished_steps,
finished_documents, total_steps,
total_documents, step_name,
)); finished_total_documents: Some((finished_documents as u32, total_documents as u32)),
});
// Clean up and reuse the document-specific allocator // Clean up and reuse the document-specific allocator
context.doc_alloc.reset(); context.doc_alloc.reset();
@ -276,7 +279,12 @@ where
}, },
)?; )?;
(send_progress)(Progress::from_step_documents(step, total_documents, total_documents)); (send_progress)(Progress {
finished_steps,
total_steps,
step_name,
finished_total_documents: Some((total_documents as u32, total_documents as u32)),
});
Ok(()) Ok(())
} }
@ -287,20 +295,3 @@ pub struct Progress {
pub step_name: &'static str, pub step_name: &'static str,
pub finished_total_documents: Option<(u32, u32)>, pub finished_total_documents: Option<(u32, u32)>,
} }
impl Progress {
    /// Builds a progress report for `step` without any document counters.
    ///
    /// The step index, the total number of steps and the step name are all
    /// read from `step`; `finished_total_documents` is left as `None`.
    pub fn from_step(step: Step) -> Self {
        let finished_steps = step.finished_steps();
        let total_steps = Step::total_steps();
        let step_name = step.name();
        Self { finished_steps, total_steps, step_name, finished_total_documents: None }
    }

    /// Builds a progress report for `step` that also carries the
    /// `(finished_documents, total_documents)` pair.
    pub fn from_step_documents(step: Step, finished_documents: u32, total_documents: u32) -> Self {
        // Start from the counter-less report and fill in the document pair.
        let mut progress = Self::from_step(step);
        progress.finished_total_documents = Some((finished_documents, total_documents));
        progress
    }
}

View File

@ -96,7 +96,6 @@ mod test {
extract, DocumentChangeContext, Extractor, IndexingContext, extract, DocumentChangeContext, Extractor, IndexingContext,
}; };
use crate::update::new::indexer::DocumentDeletion; use crate::update::new::indexer::DocumentDeletion;
use crate::update::new::steps::Step;
use crate::update::new::thread_local::{MostlySend, ThreadLocal}; use crate::update::new::thread_local::{MostlySend, ThreadLocal};
use crate::update::new::DocumentChange; use crate::update::new::DocumentChange;
use crate::DocumentId; use crate::DocumentId;
@ -176,7 +175,9 @@ mod test {
context, context,
&mut extractor_allocs, &mut extractor_allocs,
&datastore, &datastore,
Step::ExtractingDocuments, 0,
1,
"test",
) )
.unwrap(); .unwrap();

View File

@ -20,7 +20,6 @@ use super::channel::*;
use super::extract::*; use super::extract::*;
use super::facet_search_builder::FacetSearchBuilder; use super::facet_search_builder::FacetSearchBuilder;
use super::merger::FacetFieldIdsDelta; use super::merger::FacetFieldIdsDelta;
use super::steps::Step;
use super::thread_local::ThreadLocal; use super::thread_local::ThreadLocal;
use super::word_fst_builder::{PrefixData, PrefixDelta, WordFstBuilder}; use super::word_fst_builder::{PrefixData, PrefixDelta, WordFstBuilder};
use super::words_prefix_docids::{ use super::words_prefix_docids::{
@ -52,6 +51,79 @@ mod document_operation;
mod partial_dump; mod partial_dump;
mod update_by_function; mod update_by_function;
/// Table of the progress steps and one accessor per step.
///
/// Every accessor returns `(index, name)` where `index` is the position of the
/// step inside [`steps::STEPS`] and `name` is the string stored at that position.
mod steps {
    /// Step names, in reporting order. The position of a name in this slice
    /// is the index returned by the matching accessor below.
    pub const STEPS: &[&str] = &[
        "extracting documents",
        "extracting facets",
        "extracting words",
        "extracting word proximity",
        "extracting embeddings",
        "writing geo points",
        "writing to database",
        "writing embeddings to database",
        "waiting for extractors",
        "post-processing facets",
        "post-processing words",
        "finalizing",
    ];

    /// Number of entries in [`STEPS`].
    pub const fn total_steps() -> u16 {
        STEPS.len() as u16
    }

    /// Step 0: "extracting documents".
    pub const fn extract_documents() -> (u16, &'static str) {
        (0, STEPS[0])
    }

    /// Step 1: "extracting facets".
    pub const fn extract_facets() -> (u16, &'static str) {
        (1, STEPS[1])
    }

    /// Step 2: "extracting words".
    pub const fn extract_words() -> (u16, &'static str) {
        (2, STEPS[2])
    }

    /// Step 3: "extracting word proximity".
    pub const fn extract_word_proximity() -> (u16, &'static str) {
        (3, STEPS[3])
    }

    /// Step 4: "extracting embeddings".
    pub const fn extract_embeddings() -> (u16, &'static str) {
        (4, STEPS[4])
    }

    /// Step 5. Note that the reported name is "writing geo points", not
    /// "extracting geo points".
    pub const fn extract_geo_points() -> (u16, &'static str) {
        (5, STEPS[5])
    }

    /// Step 6: "writing to database".
    pub const fn write_db() -> (u16, &'static str) {
        (6, STEPS[6])
    }

    /// Step 7: "writing embeddings to database".
    pub const fn write_embedding_db() -> (u16, &'static str) {
        (7, STEPS[7])
    }

    /// Step 8: "waiting for extractors".
    pub const fn waiting_extractors() -> (u16, &'static str) {
        (8, STEPS[8])
    }

    /// Step 9: "post-processing facets".
    pub const fn post_processing_facets() -> (u16, &'static str) {
        (9, STEPS[9])
    }

    /// Step 10: "post-processing words".
    pub const fn post_processing_words() -> (u16, &'static str) {
        (10, STEPS[10])
    }

    /// Step 11: "finalizing".
    pub const fn finalizing() -> (u16, &'static str) {
        (11, STEPS[11])
    }
}
/// This is the main function of this crate. /// This is the main function of this crate.
/// ///
/// Give it the output of the [`Indexer::document_changes`] method and it will execute it in the [`rayon::ThreadPool`]. /// Give it the output of the [`Indexer::document_changes`] method and it will execute it in the [`rayon::ThreadPool`].
@ -94,6 +166,8 @@ where
send_progress, send_progress,
}; };
let total_steps = steps::total_steps();
let mut field_distribution = index.field_distribution(wtxn)?; let mut field_distribution = index.field_distribution(wtxn)?;
let mut document_ids = index.documents_ids(wtxn)?; let mut document_ids = index.documents_ids(wtxn)?;
@ -114,13 +188,15 @@ where
let document_sender = extractor_sender.documents(); let document_sender = extractor_sender.documents();
let document_extractor = DocumentsExtractor::new(&document_sender, embedders); let document_extractor = DocumentsExtractor::new(&document_sender, embedders);
let datastore = ThreadLocal::with_capacity(rayon::current_num_threads()); let datastore = ThreadLocal::with_capacity(rayon::current_num_threads());
let (finished_steps, step_name) = steps::extract_documents();
extract(document_changes, extract(document_changes,
&document_extractor, &document_extractor,
indexing_context, indexing_context,
&mut extractor_allocs, &mut extractor_allocs,
&datastore, &datastore,
Step::ExtractingDocuments, finished_steps,
total_steps,
step_name,
)?; )?;
for document_extractor_data in datastore { for document_extractor_data in datastore {
@ -141,6 +217,8 @@ where
let span = tracing::trace_span!(target: "indexing::documents::extract", "faceted"); let span = tracing::trace_span!(target: "indexing::documents::extract", "faceted");
let _entered = span.enter(); let _entered = span.enter();
let (finished_steps, step_name) = steps::extract_facets();
facet_field_ids_delta = merge_and_send_facet_docids( facet_field_ids_delta = merge_and_send_facet_docids(
FacetedDocidsExtractor::run_extraction( FacetedDocidsExtractor::run_extraction(
grenad_parameters, grenad_parameters,
@ -148,7 +226,9 @@ where
indexing_context, indexing_context,
&mut extractor_allocs, &mut extractor_allocs,
&extractor_sender.field_id_docid_facet_sender(), &extractor_sender.field_id_docid_facet_sender(),
Step::ExtractingFacets finished_steps,
total_steps,
step_name,
)?, )?,
FacetDatabases::new(index), FacetDatabases::new(index),
index, index,
@ -159,7 +239,7 @@ where
{ {
let span = tracing::trace_span!(target: "indexing::documents::extract", "word_docids"); let span = tracing::trace_span!(target: "indexing::documents::extract", "word_docids");
let _entered = span.enter(); let _entered = span.enter();
let (finished_steps, step_name) = steps::extract_words();
let WordDocidsCaches { let WordDocidsCaches {
word_docids, word_docids,
@ -172,7 +252,9 @@ where
document_changes, document_changes,
indexing_context, indexing_context,
&mut extractor_allocs, &mut extractor_allocs,
Step::ExtractingWords finished_steps,
total_steps,
step_name,
)?; )?;
// TODO Word Docids Merger // TODO Word Docids Merger
@ -253,13 +335,16 @@ where
let span = tracing::trace_span!(target: "indexing::documents::extract", "word_pair_proximity_docids"); let span = tracing::trace_span!(target: "indexing::documents::extract", "word_pair_proximity_docids");
let _entered = span.enter(); let _entered = span.enter();
let (finished_steps, step_name) = steps::extract_word_proximity();
let caches = <WordPairProximityDocidsExtractor as DocidsExtractor>::run_extraction( let caches = <WordPairProximityDocidsExtractor as DocidsExtractor>::run_extraction(
grenad_parameters, grenad_parameters,
document_changes, document_changes,
indexing_context, indexing_context,
&mut extractor_allocs, &mut extractor_allocs,
Step::ExtractingWordProximity, finished_steps,
total_steps,
step_name,
)?; )?;
merge_and_send_docids( merge_and_send_docids(
@ -283,7 +368,8 @@ where
let embedding_sender = extractor_sender.embeddings(); let embedding_sender = extractor_sender.embeddings();
let extractor = EmbeddingExtractor::new(embedders, &embedding_sender, field_distribution, request_threads()); let extractor = EmbeddingExtractor::new(embedders, &embedding_sender, field_distribution, request_threads());
let mut datastore = ThreadLocal::with_capacity(rayon::current_num_threads()); let mut datastore = ThreadLocal::with_capacity(rayon::current_num_threads());
extract(document_changes, &extractor, indexing_context, &mut extractor_allocs, &datastore, Step::ExtractingEmbeddings)?; let (finished_steps, step_name) = steps::extract_embeddings();
extract(document_changes, &extractor, indexing_context, &mut extractor_allocs, &datastore, finished_steps, total_steps, step_name)?;
for config in &mut index_embeddings { for config in &mut index_embeddings {
'data: for data in datastore.iter_mut() { 'data: for data in datastore.iter_mut() {
@ -305,13 +391,16 @@ where
break 'geo; break 'geo;
}; };
let datastore = ThreadLocal::with_capacity(rayon::current_num_threads()); let datastore = ThreadLocal::with_capacity(rayon::current_num_threads());
let (finished_steps, step_name) = steps::extract_geo_points();
extract( extract(
document_changes, document_changes,
&extractor, &extractor,
indexing_context, indexing_context,
&mut extractor_allocs, &mut extractor_allocs,
&datastore, &datastore,
Step::WritingGeoPoints finished_steps,
total_steps,
step_name,
)?; )?;
merge_and_send_rtree( merge_and_send_rtree(
@ -341,7 +430,8 @@ where
{ {
let span = tracing::trace_span!(target: "indexing::documents::extract", "FINISH"); let span = tracing::trace_span!(target: "indexing::documents::extract", "FINISH");
let _entered = span.enter(); let _entered = span.enter();
(indexing_context.send_progress)(Progress::from_step(Step::WritingToDatabase)); let (finished_steps, step_name) = steps::write_db();
(indexing_context.send_progress)(Progress { finished_steps, total_steps, step_name, finished_total_documents: None });
} }
Result::Ok(facet_field_ids_delta) Result::Ok(facet_field_ids_delta)
@ -422,9 +512,13 @@ where
let span = tracing::trace_span!(target: "indexing::vectors", parent: &indexer_span, "build"); let span = tracing::trace_span!(target: "indexing::vectors", parent: &indexer_span, "build");
let _entered = span.enter(); let _entered = span.enter();
(indexing_context.send_progress)(Progress::from_step( let (finished_steps, step_name) = steps::write_embedding_db();
Step::WritingEmbeddingsToDatabase, (indexing_context.send_progress)(Progress {
)); finished_steps,
total_steps,
step_name,
finished_total_documents: None,
});
for (_embedder_index, (_embedder_name, _embedder, writer, dimensions)) in for (_embedder_index, (_embedder_name, _embedder, writer, dimensions)) in
&mut arroy_writers &mut arroy_writers
@ -445,21 +539,46 @@ where
} }
} }
(indexing_context.send_progress)(Progress::from_step(Step::WaitingForExtractors)); let (finished_steps, step_name) = steps::waiting_extractors();
(indexing_context.send_progress)(Progress {
finished_steps,
total_steps,
step_name,
finished_total_documents: None,
});
let facet_field_ids_delta = extractor_handle.join().unwrap()?; let facet_field_ids_delta = extractor_handle.join().unwrap()?;
(indexing_context.send_progress)(Progress::from_step(Step::PostProcessingFacets)); let (finished_steps, step_name) = steps::post_processing_facets();
(indexing_context.send_progress)(Progress {
finished_steps,
total_steps,
step_name,
finished_total_documents: None,
});
compute_facet_search_database(index, wtxn, global_fields_ids_map)?; compute_facet_search_database(index, wtxn, global_fields_ids_map)?;
compute_facet_level_database(index, wtxn, facet_field_ids_delta)?; compute_facet_level_database(index, wtxn, facet_field_ids_delta)?;
(indexing_context.send_progress)(Progress::from_step(Step::PostProcessingWords)); let (finished_steps, step_name) = steps::post_processing_words();
(indexing_context.send_progress)(Progress {
finished_steps,
total_steps,
step_name,
finished_total_documents: None,
});
if let Some(prefix_delta) = compute_word_fst(index, wtxn)? { if let Some(prefix_delta) = compute_word_fst(index, wtxn)? {
compute_prefix_database(index, wtxn, prefix_delta)?; compute_prefix_database(index, wtxn, prefix_delta)?;
} }
(indexing_context.send_progress)(Progress::from_step(Step::Finalizing));
let (finished_steps, step_name) = steps::finalizing();
(indexing_context.send_progress)(Progress {
finished_steps,
total_steps,
step_name,
finished_total_documents: None,
});
Ok(()) as Result<_> Ok(()) as Result<_>
})?; })?;

View File

@ -17,7 +17,6 @@ pub mod indexer;
mod merger; mod merger;
mod parallel_iterator_ext; mod parallel_iterator_ext;
mod ref_cell_ext; mod ref_cell_ext;
pub(crate) mod steps;
pub(crate) mod thread_local; pub(crate) mod thread_local;
mod top_level_map; mod top_level_map;
pub mod vector_document; pub mod vector_document;

View File

@ -1,45 +0,0 @@
use enum_iterator::Sequence;
/// Steps reported through `Progress`, declared in reporting order.
///
/// No explicit discriminants are given, so with `#[repr(u16)]` each variant's
/// value is its zero-based position in this list; `finished_steps` returns
/// that value. Reordering or inserting variants therefore changes the
/// reported step numbers.
// `Sequence` is derived because `total_steps` reads `Self::CARDINALITY`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Sequence)]
#[repr(u16)]
pub enum Step {
ExtractingDocuments,
ExtractingFacets,
ExtractingWords,
ExtractingWordProximity,
ExtractingEmbeddings,
WritingGeoPoints,
WritingToDatabase,
WritingEmbeddingsToDatabase,
WaitingForExtractors,
PostProcessingFacets,
PostProcessingWords,
Finalizing,
}
impl Step {
pub fn name(&self) -> &'static str {
match self {
Step::ExtractingDocuments => "extracting documents",
Step::ExtractingFacets => "extracting facets",
Step::ExtractingWords => "extracting words",
Step::ExtractingWordProximity => "extracting word proximity",
Step::ExtractingEmbeddings => "extracting embeddings",
Step::WritingGeoPoints => "writing geo points",
Step::WritingToDatabase => "writing to database",
Step::WritingEmbeddingsToDatabase => "writing embeddings to database",
Step::WaitingForExtractors => "waiting for extractors",
Step::PostProcessingFacets => "post-processing facets",
Step::PostProcessingWords => "post-processing words",
Step::Finalizing => "finalizing",
}
}
pub fn finished_steps(self) -> u16 {
self as u16
}
pub const fn total_steps() -> u16 {
Self::CARDINALITY as u16
}
}