mirror of
https://github.com/meilisearch/meilisearch.git
synced 2024-11-28 16:15:42 +08:00
Move step to a dedicated mod and replace it with an enum
This commit is contained in:
parent
75943a5a9b
commit
c782c09208
1
Cargo.lock
generated
1
Cargo.lock
generated
@ -3619,6 +3619,7 @@ dependencies = [
|
|||||||
"csv",
|
"csv",
|
||||||
"deserr",
|
"deserr",
|
||||||
"either",
|
"either",
|
||||||
|
"enum-iterator",
|
||||||
"filter-parser",
|
"filter-parser",
|
||||||
"flatten-serde-json",
|
"flatten-serde-json",
|
||||||
"fst",
|
"fst",
|
||||||
|
@ -101,6 +101,7 @@ thread_local = "1.1.8"
|
|||||||
allocator-api2 = "0.2.18"
|
allocator-api2 = "0.2.18"
|
||||||
rustc-hash = "2.0.0"
|
rustc-hash = "2.0.0"
|
||||||
uell = "0.1.0"
|
uell = "0.1.0"
|
||||||
|
enum-iterator = "2.1.0"
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
mimalloc = { version = "0.1.43", default-features = false }
|
mimalloc = { version = "0.1.43", default-features = false }
|
||||||
|
@ -18,6 +18,7 @@ use crate::update::new::indexer::document_changes::{
|
|||||||
extract, DocumentChangeContext, DocumentChanges, Extractor, IndexingContext, Progress,
|
extract, DocumentChangeContext, DocumentChanges, Extractor, IndexingContext, Progress,
|
||||||
};
|
};
|
||||||
use crate::update::new::ref_cell_ext::RefCellExt as _;
|
use crate::update::new::ref_cell_ext::RefCellExt as _;
|
||||||
|
use crate::update::new::steps::Step;
|
||||||
use crate::update::new::thread_local::{FullySend, ThreadLocal};
|
use crate::update::new::thread_local::{FullySend, ThreadLocal};
|
||||||
use crate::update::new::DocumentChange;
|
use crate::update::new::DocumentChange;
|
||||||
use crate::update::GrenadParameters;
|
use crate::update::GrenadParameters;
|
||||||
@ -337,7 +338,6 @@ fn truncate_str(s: &str) -> &str {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl FacetedDocidsExtractor {
|
impl FacetedDocidsExtractor {
|
||||||
#[allow(clippy::too_many_arguments)]
|
|
||||||
#[tracing::instrument(level = "trace", skip_all, target = "indexing::extract::faceted")]
|
#[tracing::instrument(level = "trace", skip_all, target = "indexing::extract::faceted")]
|
||||||
pub fn run_extraction<
|
pub fn run_extraction<
|
||||||
'pl,
|
'pl,
|
||||||
@ -354,9 +354,7 @@ impl FacetedDocidsExtractor {
|
|||||||
indexing_context: IndexingContext<'fid, 'indexer, 'index, MSP, SP>,
|
indexing_context: IndexingContext<'fid, 'indexer, 'index, MSP, SP>,
|
||||||
extractor_allocs: &'extractor mut ThreadLocal<FullySend<Bump>>,
|
extractor_allocs: &'extractor mut ThreadLocal<FullySend<Bump>>,
|
||||||
sender: &FieldIdDocidFacetSender,
|
sender: &FieldIdDocidFacetSender,
|
||||||
finished_steps: u16,
|
step: Step,
|
||||||
total_steps: u16,
|
|
||||||
step_name: &'static str,
|
|
||||||
) -> Result<Vec<BalancedCaches<'extractor>>>
|
) -> Result<Vec<BalancedCaches<'extractor>>>
|
||||||
where
|
where
|
||||||
MSP: Fn() -> bool + Sync,
|
MSP: Fn() -> bool + Sync,
|
||||||
@ -386,9 +384,7 @@ impl FacetedDocidsExtractor {
|
|||||||
indexing_context,
|
indexing_context,
|
||||||
extractor_allocs,
|
extractor_allocs,
|
||||||
&datastore,
|
&datastore,
|
||||||
finished_steps,
|
step,
|
||||||
total_steps,
|
|
||||||
step_name,
|
|
||||||
)?;
|
)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -14,6 +14,7 @@ pub use searchable::*;
|
|||||||
pub use vectors::EmbeddingExtractor;
|
pub use vectors::EmbeddingExtractor;
|
||||||
|
|
||||||
use super::indexer::document_changes::{DocumentChanges, IndexingContext, Progress};
|
use super::indexer::document_changes::{DocumentChanges, IndexingContext, Progress};
|
||||||
|
use super::steps::Step;
|
||||||
use super::thread_local::{FullySend, ThreadLocal};
|
use super::thread_local::{FullySend, ThreadLocal};
|
||||||
use crate::update::GrenadParameters;
|
use crate::update::GrenadParameters;
|
||||||
use crate::Result;
|
use crate::Result;
|
||||||
@ -24,9 +25,7 @@ pub trait DocidsExtractor {
|
|||||||
document_changes: &DC,
|
document_changes: &DC,
|
||||||
indexing_context: IndexingContext<'fid, 'indexer, 'index, MSP, SP>,
|
indexing_context: IndexingContext<'fid, 'indexer, 'index, MSP, SP>,
|
||||||
extractor_allocs: &'extractor mut ThreadLocal<FullySend<Bump>>,
|
extractor_allocs: &'extractor mut ThreadLocal<FullySend<Bump>>,
|
||||||
finished_steps: u16,
|
step: Step,
|
||||||
total_steps: u16,
|
|
||||||
step_name: &'static str,
|
|
||||||
) -> Result<Vec<BalancedCaches<'extractor>>>
|
) -> Result<Vec<BalancedCaches<'extractor>>>
|
||||||
where
|
where
|
||||||
MSP: Fn() -> bool + Sync,
|
MSP: Fn() -> bool + Sync,
|
||||||
|
@ -14,6 +14,7 @@ use crate::update::new::indexer::document_changes::{
|
|||||||
extract, DocumentChangeContext, DocumentChanges, Extractor, IndexingContext, Progress,
|
extract, DocumentChangeContext, DocumentChanges, Extractor, IndexingContext, Progress,
|
||||||
};
|
};
|
||||||
use crate::update::new::ref_cell_ext::RefCellExt as _;
|
use crate::update::new::ref_cell_ext::RefCellExt as _;
|
||||||
|
use crate::update::new::steps::Step;
|
||||||
use crate::update::new::thread_local::{FullySend, MostlySend, ThreadLocal};
|
use crate::update::new::thread_local::{FullySend, MostlySend, ThreadLocal};
|
||||||
use crate::update::new::DocumentChange;
|
use crate::update::new::DocumentChange;
|
||||||
use crate::update::GrenadParameters;
|
use crate::update::GrenadParameters;
|
||||||
@ -249,9 +250,7 @@ impl WordDocidsExtractors {
|
|||||||
document_changes: &DC,
|
document_changes: &DC,
|
||||||
indexing_context: IndexingContext<'fid, 'indexer, 'index, MSP, SP>,
|
indexing_context: IndexingContext<'fid, 'indexer, 'index, MSP, SP>,
|
||||||
extractor_allocs: &'extractor mut ThreadLocal<FullySend<Bump>>,
|
extractor_allocs: &'extractor mut ThreadLocal<FullySend<Bump>>,
|
||||||
finished_steps: u16,
|
step: Step,
|
||||||
total_steps: u16,
|
|
||||||
step_name: &'static str,
|
|
||||||
) -> Result<WordDocidsCaches<'extractor>>
|
) -> Result<WordDocidsCaches<'extractor>>
|
||||||
where
|
where
|
||||||
MSP: Fn() -> bool + Sync,
|
MSP: Fn() -> bool + Sync,
|
||||||
@ -306,9 +305,7 @@ impl WordDocidsExtractors {
|
|||||||
indexing_context,
|
indexing_context,
|
||||||
extractor_allocs,
|
extractor_allocs,
|
||||||
&datastore,
|
&datastore,
|
||||||
finished_steps,
|
step,
|
||||||
total_steps,
|
|
||||||
step_name,
|
|
||||||
)?;
|
)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -16,6 +16,7 @@ use super::DocidsExtractor;
|
|||||||
use crate::update::new::indexer::document_changes::{
|
use crate::update::new::indexer::document_changes::{
|
||||||
extract, DocumentChangeContext, DocumentChanges, Extractor, IndexingContext, Progress,
|
extract, DocumentChangeContext, DocumentChanges, Extractor, IndexingContext, Progress,
|
||||||
};
|
};
|
||||||
|
use crate::update::new::steps::Step;
|
||||||
use crate::update::new::thread_local::{FullySend, ThreadLocal};
|
use crate::update::new::thread_local::{FullySend, ThreadLocal};
|
||||||
use crate::update::new::DocumentChange;
|
use crate::update::new::DocumentChange;
|
||||||
use crate::update::GrenadParameters;
|
use crate::update::GrenadParameters;
|
||||||
@ -60,9 +61,7 @@ pub trait SearchableExtractor: Sized + Sync {
|
|||||||
document_changes: &DC,
|
document_changes: &DC,
|
||||||
indexing_context: IndexingContext<'fid, 'indexer, 'index, MSP, SP>,
|
indexing_context: IndexingContext<'fid, 'indexer, 'index, MSP, SP>,
|
||||||
extractor_allocs: &'extractor mut ThreadLocal<FullySend<Bump>>,
|
extractor_allocs: &'extractor mut ThreadLocal<FullySend<Bump>>,
|
||||||
finished_steps: u16,
|
step: Step,
|
||||||
total_steps: u16,
|
|
||||||
step_name: &'static str,
|
|
||||||
) -> Result<Vec<BalancedCaches<'extractor>>>
|
) -> Result<Vec<BalancedCaches<'extractor>>>
|
||||||
where
|
where
|
||||||
MSP: Fn() -> bool + Sync,
|
MSP: Fn() -> bool + Sync,
|
||||||
@ -115,9 +114,7 @@ pub trait SearchableExtractor: Sized + Sync {
|
|||||||
indexing_context,
|
indexing_context,
|
||||||
extractor_allocs,
|
extractor_allocs,
|
||||||
&datastore,
|
&datastore,
|
||||||
finished_steps,
|
step,
|
||||||
total_steps,
|
|
||||||
step_name,
|
|
||||||
)?;
|
)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -142,9 +139,7 @@ impl<T: SearchableExtractor> DocidsExtractor for T {
|
|||||||
document_changes: &DC,
|
document_changes: &DC,
|
||||||
indexing_context: IndexingContext<'fid, 'indexer, 'index, MSP, SP>,
|
indexing_context: IndexingContext<'fid, 'indexer, 'index, MSP, SP>,
|
||||||
extractor_allocs: &'extractor mut ThreadLocal<FullySend<Bump>>,
|
extractor_allocs: &'extractor mut ThreadLocal<FullySend<Bump>>,
|
||||||
finished_steps: u16,
|
step: Step,
|
||||||
total_steps: u16,
|
|
||||||
step_name: &'static str,
|
|
||||||
) -> Result<Vec<BalancedCaches<'extractor>>>
|
) -> Result<Vec<BalancedCaches<'extractor>>>
|
||||||
where
|
where
|
||||||
MSP: Fn() -> bool + Sync,
|
MSP: Fn() -> bool + Sync,
|
||||||
@ -155,9 +150,7 @@ impl<T: SearchableExtractor> DocidsExtractor for T {
|
|||||||
document_changes,
|
document_changes,
|
||||||
indexing_context,
|
indexing_context,
|
||||||
extractor_allocs,
|
extractor_allocs,
|
||||||
finished_steps,
|
step,
|
||||||
total_steps,
|
|
||||||
step_name,
|
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,3 +1,4 @@
|
|||||||
|
use std::collections::hash_map::Entry;
|
||||||
use std::collections::{BTreeSet, HashMap};
|
use std::collections::{BTreeSet, HashMap};
|
||||||
|
|
||||||
use charabia::normalizer::NormalizerOption;
|
use charabia::normalizer::NormalizerOption;
|
||||||
@ -83,9 +84,9 @@ impl<'indexer> FacetSearchBuilder<'indexer> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn locales(&mut self, field_id: FieldId) -> Option<&[Language]> {
|
fn locales(&mut self, field_id: FieldId) -> Option<&[Language]> {
|
||||||
if !self.localized_field_ids.contains_key(&field_id) {
|
if let Entry::Vacant(e) = self.localized_field_ids.entry(field_id) {
|
||||||
let Some(field_name) = self.global_fields_ids_map.name(field_id) else {
|
let Some(field_name) = self.global_fields_ids_map.name(field_id) else {
|
||||||
unreachable!("Field id {} not found in the global fields ids map", field_id);
|
unreachable!("Field id {field_id} not found in the global fields ids map");
|
||||||
};
|
};
|
||||||
|
|
||||||
let locales = self
|
let locales = self
|
||||||
@ -94,7 +95,7 @@ impl<'indexer> FacetSearchBuilder<'indexer> {
|
|||||||
.find(|rule| rule.match_str(field_name))
|
.find(|rule| rule.match_str(field_name))
|
||||||
.map(|rule| rule.locales.clone());
|
.map(|rule| rule.locales.clone());
|
||||||
|
|
||||||
self.localized_field_ids.insert(field_id, locales);
|
e.insert(locales);
|
||||||
}
|
}
|
||||||
|
|
||||||
self.localized_field_ids.get(&field_id).unwrap().as_deref()
|
self.localized_field_ids.get(&field_id).unwrap().as_deref()
|
||||||
|
@ -8,6 +8,7 @@ use rayon::iter::IndexedParallelIterator;
|
|||||||
use super::super::document_change::DocumentChange;
|
use super::super::document_change::DocumentChange;
|
||||||
use crate::fields_ids_map::metadata::FieldIdMapWithMetadata;
|
use crate::fields_ids_map::metadata::FieldIdMapWithMetadata;
|
||||||
use crate::update::new::parallel_iterator_ext::ParallelIteratorExt as _;
|
use crate::update::new::parallel_iterator_ext::ParallelIteratorExt as _;
|
||||||
|
use crate::update::new::steps::Step;
|
||||||
use crate::update::new::thread_local::{FullySend, MostlySend, ThreadLocal};
|
use crate::update::new::thread_local::{FullySend, MostlySend, ThreadLocal};
|
||||||
use crate::{FieldsIdsMap, GlobalFieldsIdsMap, Index, InternalError, Result};
|
use crate::{FieldsIdsMap, GlobalFieldsIdsMap, Index, InternalError, Result};
|
||||||
|
|
||||||
@ -191,7 +192,6 @@ where
|
|||||||
|
|
||||||
const CHUNK_SIZE: usize = 100;
|
const CHUNK_SIZE: usize = 100;
|
||||||
|
|
||||||
#[allow(clippy::too_many_arguments)]
|
|
||||||
pub fn extract<
|
pub fn extract<
|
||||||
'pl, // covariant lifetime of the underlying payload
|
'pl, // covariant lifetime of the underlying payload
|
||||||
'extractor, // invariant lifetime of extractor_alloc
|
'extractor, // invariant lifetime of extractor_alloc
|
||||||
@ -217,9 +217,7 @@ pub fn extract<
|
|||||||
}: IndexingContext<'fid, 'indexer, 'index, MSP, SP>,
|
}: IndexingContext<'fid, 'indexer, 'index, MSP, SP>,
|
||||||
extractor_allocs: &'extractor mut ThreadLocal<FullySend<Bump>>,
|
extractor_allocs: &'extractor mut ThreadLocal<FullySend<Bump>>,
|
||||||
datastore: &'data ThreadLocal<EX::Data>,
|
datastore: &'data ThreadLocal<EX::Data>,
|
||||||
finished_steps: u16,
|
step: Step,
|
||||||
total_steps: u16,
|
|
||||||
step_name: &'static str,
|
|
||||||
) -> Result<()>
|
) -> Result<()>
|
||||||
where
|
where
|
||||||
EX: Extractor<'extractor>,
|
EX: Extractor<'extractor>,
|
||||||
@ -233,7 +231,7 @@ where
|
|||||||
extractor_alloc.0.reset();
|
extractor_alloc.0.reset();
|
||||||
}
|
}
|
||||||
|
|
||||||
let total_documents = document_changes.len();
|
let total_documents = document_changes.len() as u32;
|
||||||
|
|
||||||
let pi = document_changes.iter(CHUNK_SIZE);
|
let pi = document_changes.iter(CHUNK_SIZE);
|
||||||
pi.enumerate().try_arc_for_each_try_init(
|
pi.enumerate().try_arc_for_each_try_init(
|
||||||
@ -253,14 +251,13 @@ where
|
|||||||
if (must_stop_processing)() {
|
if (must_stop_processing)() {
|
||||||
return Err(Arc::new(InternalError::AbortedIndexation.into()));
|
return Err(Arc::new(InternalError::AbortedIndexation.into()));
|
||||||
}
|
}
|
||||||
let finished_documents = finished_documents * CHUNK_SIZE;
|
let finished_documents = (finished_documents * CHUNK_SIZE) as u32;
|
||||||
|
|
||||||
(send_progress)(Progress {
|
(send_progress)(Progress::from_step_documents(
|
||||||
finished_steps,
|
step,
|
||||||
total_steps,
|
finished_documents,
|
||||||
step_name,
|
total_documents,
|
||||||
finished_total_documents: Some((finished_documents as u32, total_documents as u32)),
|
));
|
||||||
});
|
|
||||||
|
|
||||||
// Clean up and reuse the document-specific allocator
|
// Clean up and reuse the document-specific allocator
|
||||||
context.doc_alloc.reset();
|
context.doc_alloc.reset();
|
||||||
@ -279,12 +276,7 @@ where
|
|||||||
},
|
},
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
(send_progress)(Progress {
|
(send_progress)(Progress::from_step_documents(step, total_documents, total_documents));
|
||||||
finished_steps,
|
|
||||||
total_steps,
|
|
||||||
step_name,
|
|
||||||
finished_total_documents: Some((total_documents as u32, total_documents as u32)),
|
|
||||||
});
|
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
@ -295,3 +287,20 @@ pub struct Progress {
|
|||||||
pub step_name: &'static str,
|
pub step_name: &'static str,
|
||||||
pub finished_total_documents: Option<(u32, u32)>,
|
pub finished_total_documents: Option<(u32, u32)>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl Progress {
|
||||||
|
pub fn from_step(step: Step) -> Self {
|
||||||
|
Self {
|
||||||
|
finished_steps: step.finished_steps(),
|
||||||
|
total_steps: Step::total_steps(),
|
||||||
|
step_name: step.name(),
|
||||||
|
finished_total_documents: None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pub fn from_step_documents(step: Step, finished_documents: u32, total_documents: u32) -> Self {
|
||||||
|
Self {
|
||||||
|
finished_total_documents: Some((finished_documents, total_documents)),
|
||||||
|
..Progress::from_step(step)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -96,6 +96,7 @@ mod test {
|
|||||||
extract, DocumentChangeContext, Extractor, IndexingContext,
|
extract, DocumentChangeContext, Extractor, IndexingContext,
|
||||||
};
|
};
|
||||||
use crate::update::new::indexer::DocumentDeletion;
|
use crate::update::new::indexer::DocumentDeletion;
|
||||||
|
use crate::update::new::steps::Step;
|
||||||
use crate::update::new::thread_local::{MostlySend, ThreadLocal};
|
use crate::update::new::thread_local::{MostlySend, ThreadLocal};
|
||||||
use crate::update::new::DocumentChange;
|
use crate::update::new::DocumentChange;
|
||||||
use crate::DocumentId;
|
use crate::DocumentId;
|
||||||
@ -175,9 +176,7 @@ mod test {
|
|||||||
context,
|
context,
|
||||||
&mut extractor_allocs,
|
&mut extractor_allocs,
|
||||||
&datastore,
|
&datastore,
|
||||||
0,
|
Step::ExtractingDocuments,
|
||||||
1,
|
|
||||||
"test",
|
|
||||||
)
|
)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
|
@ -20,6 +20,7 @@ use super::channel::*;
|
|||||||
use super::extract::*;
|
use super::extract::*;
|
||||||
use super::facet_search_builder::FacetSearchBuilder;
|
use super::facet_search_builder::FacetSearchBuilder;
|
||||||
use super::merger::FacetFieldIdsDelta;
|
use super::merger::FacetFieldIdsDelta;
|
||||||
|
use super::steps::Step;
|
||||||
use super::thread_local::ThreadLocal;
|
use super::thread_local::ThreadLocal;
|
||||||
use super::word_fst_builder::{PrefixData, PrefixDelta, WordFstBuilder};
|
use super::word_fst_builder::{PrefixData, PrefixDelta, WordFstBuilder};
|
||||||
use super::words_prefix_docids::{
|
use super::words_prefix_docids::{
|
||||||
@ -51,80 +52,6 @@ mod document_operation;
|
|||||||
mod partial_dump;
|
mod partial_dump;
|
||||||
mod update_by_function;
|
mod update_by_function;
|
||||||
|
|
||||||
mod steps {
|
|
||||||
pub const STEPS: &[&str] = &[
|
|
||||||
"extracting documents",
|
|
||||||
"extracting facets",
|
|
||||||
"extracting words",
|
|
||||||
"extracting word proximity",
|
|
||||||
"extracting embeddings",
|
|
||||||
"writing geo points",
|
|
||||||
"writing to database",
|
|
||||||
"writing embeddings to database",
|
|
||||||
"waiting for extractors",
|
|
||||||
"post-processing facets",
|
|
||||||
"post-processing words",
|
|
||||||
"finalizing",
|
|
||||||
];
|
|
||||||
|
|
||||||
const fn step(step: u16) -> (u16, &'static str) {
|
|
||||||
/// TODO: convert to an enum_iterator enum of steps
|
|
||||||
(step, STEPS[step as usize])
|
|
||||||
}
|
|
||||||
|
|
||||||
pub const fn total_steps() -> u16 {
|
|
||||||
STEPS.len() as u16
|
|
||||||
}
|
|
||||||
|
|
||||||
pub const fn extract_documents() -> (u16, &'static str) {
|
|
||||||
step(0)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub const fn extract_facets() -> (u16, &'static str) {
|
|
||||||
step(1)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub const fn extract_words() -> (u16, &'static str) {
|
|
||||||
step(2)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub const fn extract_word_proximity() -> (u16, &'static str) {
|
|
||||||
step(3)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub const fn extract_embeddings() -> (u16, &'static str) {
|
|
||||||
step(4)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub const fn extract_geo_points() -> (u16, &'static str) {
|
|
||||||
step(5)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub const fn write_db() -> (u16, &'static str) {
|
|
||||||
step(6)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub const fn write_embedding_db() -> (u16, &'static str) {
|
|
||||||
step(7)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub const fn waiting_extractors() -> (u16, &'static str) {
|
|
||||||
step(8)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub const fn post_processing_facets() -> (u16, &'static str) {
|
|
||||||
step(9)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub const fn post_processing_words() -> (u16, &'static str) {
|
|
||||||
step(10)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub const fn finalizing() -> (u16, &'static str) {
|
|
||||||
step(11)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// This is the main function of this crate.
|
/// This is the main function of this crate.
|
||||||
///
|
///
|
||||||
/// Give it the output of the [`Indexer::document_changes`] method and it will execute it in the [`rayon::ThreadPool`].
|
/// Give it the output of the [`Indexer::document_changes`] method and it will execute it in the [`rayon::ThreadPool`].
|
||||||
@ -167,8 +94,6 @@ where
|
|||||||
send_progress,
|
send_progress,
|
||||||
};
|
};
|
||||||
|
|
||||||
let total_steps = steps::total_steps();
|
|
||||||
|
|
||||||
let mut field_distribution = index.field_distribution(wtxn)?;
|
let mut field_distribution = index.field_distribution(wtxn)?;
|
||||||
let mut document_ids = index.documents_ids(wtxn)?;
|
let mut document_ids = index.documents_ids(wtxn)?;
|
||||||
|
|
||||||
@ -189,15 +114,13 @@ where
|
|||||||
let document_sender = extractor_sender.documents();
|
let document_sender = extractor_sender.documents();
|
||||||
let document_extractor = DocumentsExtractor::new(&document_sender, embedders);
|
let document_extractor = DocumentsExtractor::new(&document_sender, embedders);
|
||||||
let datastore = ThreadLocal::with_capacity(rayon::current_num_threads());
|
let datastore = ThreadLocal::with_capacity(rayon::current_num_threads());
|
||||||
let (finished_steps, step_name) = steps::extract_documents();
|
|
||||||
extract(document_changes,
|
extract(document_changes,
|
||||||
&document_extractor,
|
&document_extractor,
|
||||||
indexing_context,
|
indexing_context,
|
||||||
&mut extractor_allocs,
|
&mut extractor_allocs,
|
||||||
&datastore,
|
&datastore,
|
||||||
finished_steps,
|
Step::ExtractingDocuments,
|
||||||
total_steps,
|
|
||||||
step_name,
|
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
for document_extractor_data in datastore {
|
for document_extractor_data in datastore {
|
||||||
@ -218,8 +141,6 @@ where
|
|||||||
let span = tracing::trace_span!(target: "indexing::documents::extract", "faceted");
|
let span = tracing::trace_span!(target: "indexing::documents::extract", "faceted");
|
||||||
let _entered = span.enter();
|
let _entered = span.enter();
|
||||||
|
|
||||||
let (finished_steps, step_name) = steps::extract_facets();
|
|
||||||
|
|
||||||
facet_field_ids_delta = merge_and_send_facet_docids(
|
facet_field_ids_delta = merge_and_send_facet_docids(
|
||||||
FacetedDocidsExtractor::run_extraction(
|
FacetedDocidsExtractor::run_extraction(
|
||||||
grenad_parameters,
|
grenad_parameters,
|
||||||
@ -227,9 +148,7 @@ where
|
|||||||
indexing_context,
|
indexing_context,
|
||||||
&mut extractor_allocs,
|
&mut extractor_allocs,
|
||||||
&extractor_sender.field_id_docid_facet_sender(),
|
&extractor_sender.field_id_docid_facet_sender(),
|
||||||
finished_steps,
|
Step::ExtractingFacets
|
||||||
total_steps,
|
|
||||||
step_name,
|
|
||||||
)?,
|
)?,
|
||||||
FacetDatabases::new(index),
|
FacetDatabases::new(index),
|
||||||
index,
|
index,
|
||||||
@ -240,7 +159,7 @@ where
|
|||||||
{
|
{
|
||||||
let span = tracing::trace_span!(target: "indexing::documents::extract", "word_docids");
|
let span = tracing::trace_span!(target: "indexing::documents::extract", "word_docids");
|
||||||
let _entered = span.enter();
|
let _entered = span.enter();
|
||||||
let (finished_steps, step_name) = steps::extract_words();
|
|
||||||
|
|
||||||
let WordDocidsCaches {
|
let WordDocidsCaches {
|
||||||
word_docids,
|
word_docids,
|
||||||
@ -253,9 +172,7 @@ where
|
|||||||
document_changes,
|
document_changes,
|
||||||
indexing_context,
|
indexing_context,
|
||||||
&mut extractor_allocs,
|
&mut extractor_allocs,
|
||||||
finished_steps,
|
Step::ExtractingWords
|
||||||
total_steps,
|
|
||||||
step_name,
|
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
// TODO Word Docids Merger
|
// TODO Word Docids Merger
|
||||||
@ -336,16 +253,13 @@ where
|
|||||||
let span = tracing::trace_span!(target: "indexing::documents::extract", "word_pair_proximity_docids");
|
let span = tracing::trace_span!(target: "indexing::documents::extract", "word_pair_proximity_docids");
|
||||||
let _entered = span.enter();
|
let _entered = span.enter();
|
||||||
|
|
||||||
let (finished_steps, step_name) = steps::extract_word_proximity();
|
|
||||||
|
|
||||||
let caches = <WordPairProximityDocidsExtractor as DocidsExtractor>::run_extraction(
|
let caches = <WordPairProximityDocidsExtractor as DocidsExtractor>::run_extraction(
|
||||||
grenad_parameters,
|
grenad_parameters,
|
||||||
document_changes,
|
document_changes,
|
||||||
indexing_context,
|
indexing_context,
|
||||||
&mut extractor_allocs,
|
&mut extractor_allocs,
|
||||||
finished_steps,
|
Step::ExtractingWordProximity,
|
||||||
total_steps,
|
|
||||||
step_name,
|
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
merge_and_send_docids(
|
merge_and_send_docids(
|
||||||
@ -369,8 +283,7 @@ where
|
|||||||
let embedding_sender = extractor_sender.embeddings();
|
let embedding_sender = extractor_sender.embeddings();
|
||||||
let extractor = EmbeddingExtractor::new(embedders, &embedding_sender, field_distribution, request_threads());
|
let extractor = EmbeddingExtractor::new(embedders, &embedding_sender, field_distribution, request_threads());
|
||||||
let mut datastore = ThreadLocal::with_capacity(rayon::current_num_threads());
|
let mut datastore = ThreadLocal::with_capacity(rayon::current_num_threads());
|
||||||
let (finished_steps, step_name) = steps::extract_embeddings();
|
extract(document_changes, &extractor, indexing_context, &mut extractor_allocs, &datastore, Step::ExtractingEmbeddings)?;
|
||||||
extract(document_changes, &extractor, indexing_context, &mut extractor_allocs, &datastore, finished_steps, total_steps, step_name)?;
|
|
||||||
|
|
||||||
for config in &mut index_embeddings {
|
for config in &mut index_embeddings {
|
||||||
'data: for data in datastore.iter_mut() {
|
'data: for data in datastore.iter_mut() {
|
||||||
@ -392,16 +305,13 @@ where
|
|||||||
break 'geo;
|
break 'geo;
|
||||||
};
|
};
|
||||||
let datastore = ThreadLocal::with_capacity(rayon::current_num_threads());
|
let datastore = ThreadLocal::with_capacity(rayon::current_num_threads());
|
||||||
let (finished_steps, step_name) = steps::extract_geo_points();
|
|
||||||
extract(
|
extract(
|
||||||
document_changes,
|
document_changes,
|
||||||
&extractor,
|
&extractor,
|
||||||
indexing_context,
|
indexing_context,
|
||||||
&mut extractor_allocs,
|
&mut extractor_allocs,
|
||||||
&datastore,
|
&datastore,
|
||||||
finished_steps,
|
Step::WritingGeoPoints
|
||||||
total_steps,
|
|
||||||
step_name,
|
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
merge_and_send_rtree(
|
merge_and_send_rtree(
|
||||||
@ -431,8 +341,7 @@ where
|
|||||||
{
|
{
|
||||||
let span = tracing::trace_span!(target: "indexing::documents::extract", "FINISH");
|
let span = tracing::trace_span!(target: "indexing::documents::extract", "FINISH");
|
||||||
let _entered = span.enter();
|
let _entered = span.enter();
|
||||||
let (finished_steps, step_name) = steps::write_db();
|
(indexing_context.send_progress)(Progress::from_step(Step::WritingToDatabase));
|
||||||
(indexing_context.send_progress)(Progress { finished_steps, total_steps, step_name, finished_total_documents: None });
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Result::Ok(facet_field_ids_delta)
|
Result::Ok(facet_field_ids_delta)
|
||||||
@ -513,13 +422,9 @@ where
|
|||||||
let span = tracing::trace_span!(target: "indexing::vectors", parent: &indexer_span, "build");
|
let span = tracing::trace_span!(target: "indexing::vectors", parent: &indexer_span, "build");
|
||||||
let _entered = span.enter();
|
let _entered = span.enter();
|
||||||
|
|
||||||
let (finished_steps, step_name) = steps::write_embedding_db();
|
(indexing_context.send_progress)(Progress::from_step(
|
||||||
(indexing_context.send_progress)(Progress {
|
Step::WritingEmbeddingsToDatabase,
|
||||||
finished_steps,
|
));
|
||||||
total_steps,
|
|
||||||
step_name,
|
|
||||||
finished_total_documents: None,
|
|
||||||
});
|
|
||||||
|
|
||||||
for (_embedder_index, (_embedder_name, _embedder, writer, dimensions)) in
|
for (_embedder_index, (_embedder_name, _embedder, writer, dimensions)) in
|
||||||
&mut arroy_writers
|
&mut arroy_writers
|
||||||
@ -540,46 +445,21 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let (finished_steps, step_name) = steps::waiting_extractors();
|
(indexing_context.send_progress)(Progress::from_step(Step::WaitingForExtractors));
|
||||||
(indexing_context.send_progress)(Progress {
|
|
||||||
finished_steps,
|
|
||||||
total_steps,
|
|
||||||
step_name,
|
|
||||||
finished_total_documents: None,
|
|
||||||
});
|
|
||||||
|
|
||||||
let facet_field_ids_delta = extractor_handle.join().unwrap()?;
|
let facet_field_ids_delta = extractor_handle.join().unwrap()?;
|
||||||
|
|
||||||
let (finished_steps, step_name) = steps::post_processing_facets();
|
(indexing_context.send_progress)(Progress::from_step(Step::PostProcessingFacets));
|
||||||
(indexing_context.send_progress)(Progress {
|
|
||||||
finished_steps,
|
|
||||||
total_steps,
|
|
||||||
step_name,
|
|
||||||
finished_total_documents: None,
|
|
||||||
});
|
|
||||||
|
|
||||||
compute_facet_search_database(index, wtxn, global_fields_ids_map)?;
|
compute_facet_search_database(index, wtxn, global_fields_ids_map)?;
|
||||||
compute_facet_level_database(index, wtxn, facet_field_ids_delta)?;
|
compute_facet_level_database(index, wtxn, facet_field_ids_delta)?;
|
||||||
|
|
||||||
let (finished_steps, step_name) = steps::post_processing_words();
|
(indexing_context.send_progress)(Progress::from_step(Step::PostProcessingWords));
|
||||||
(indexing_context.send_progress)(Progress {
|
|
||||||
finished_steps,
|
|
||||||
total_steps,
|
|
||||||
step_name,
|
|
||||||
finished_total_documents: None,
|
|
||||||
});
|
|
||||||
|
|
||||||
if let Some(prefix_delta) = compute_word_fst(index, wtxn)? {
|
if let Some(prefix_delta) = compute_word_fst(index, wtxn)? {
|
||||||
compute_prefix_database(index, wtxn, prefix_delta)?;
|
compute_prefix_database(index, wtxn, prefix_delta)?;
|
||||||
}
|
}
|
||||||
|
(indexing_context.send_progress)(Progress::from_step(Step::Finalizing));
|
||||||
let (finished_steps, step_name) = steps::finalizing();
|
|
||||||
(indexing_context.send_progress)(Progress {
|
|
||||||
finished_steps,
|
|
||||||
total_steps,
|
|
||||||
step_name,
|
|
||||||
finished_total_documents: None,
|
|
||||||
});
|
|
||||||
|
|
||||||
Ok(()) as Result<_>
|
Ok(()) as Result<_>
|
||||||
})?;
|
})?;
|
||||||
|
@ -17,6 +17,7 @@ pub mod indexer;
|
|||||||
mod merger;
|
mod merger;
|
||||||
mod parallel_iterator_ext;
|
mod parallel_iterator_ext;
|
||||||
mod ref_cell_ext;
|
mod ref_cell_ext;
|
||||||
|
pub(crate) mod steps;
|
||||||
pub(crate) mod thread_local;
|
pub(crate) mod thread_local;
|
||||||
mod top_level_map;
|
mod top_level_map;
|
||||||
pub mod vector_document;
|
pub mod vector_document;
|
||||||
|
45
crates/milli/src/update/new/steps.rs
Normal file
45
crates/milli/src/update/new/steps.rs
Normal file
@ -0,0 +1,45 @@
|
|||||||
|
use enum_iterator::Sequence;
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Copy, PartialEq, Eq, Sequence)]
|
||||||
|
#[repr(u16)]
|
||||||
|
pub enum Step {
|
||||||
|
ExtractingDocuments,
|
||||||
|
ExtractingFacets,
|
||||||
|
ExtractingWords,
|
||||||
|
ExtractingWordProximity,
|
||||||
|
ExtractingEmbeddings,
|
||||||
|
WritingGeoPoints,
|
||||||
|
WritingToDatabase,
|
||||||
|
WritingEmbeddingsToDatabase,
|
||||||
|
WaitingForExtractors,
|
||||||
|
PostProcessingFacets,
|
||||||
|
PostProcessingWords,
|
||||||
|
Finalizing,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Step {
|
||||||
|
pub fn name(&self) -> &'static str {
|
||||||
|
match self {
|
||||||
|
Step::ExtractingDocuments => "extracting documents",
|
||||||
|
Step::ExtractingFacets => "extracting facets",
|
||||||
|
Step::ExtractingWords => "extracting words",
|
||||||
|
Step::ExtractingWordProximity => "extracting word proximity",
|
||||||
|
Step::ExtractingEmbeddings => "extracting embeddings",
|
||||||
|
Step::WritingGeoPoints => "writing geo points",
|
||||||
|
Step::WritingToDatabase => "writing to database",
|
||||||
|
Step::WritingEmbeddingsToDatabase => "writing embeddings to database",
|
||||||
|
Step::WaitingForExtractors => "waiting for extractors",
|
||||||
|
Step::PostProcessingFacets => "post-processing facets",
|
||||||
|
Step::PostProcessingWords => "post-processing words",
|
||||||
|
Step::Finalizing => "finalizing",
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn finished_steps(self) -> u16 {
|
||||||
|
self as u16
|
||||||
|
}
|
||||||
|
|
||||||
|
pub const fn total_steps() -> u16 {
|
||||||
|
Self::CARDINALITY as u16
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user