Compare commits


1 Commit

Author          SHA1          Message                             Date
Louis Dureuil   cb44e630a6    Merge 40dd25d6b2 into 94fb55bb6f    2024-11-13 21:10:07 +00:00
13 changed files with 304 additions and 343 deletions

View File

@@ -39,7 +39,7 @@ use meilisearch_types::milli::update::{IndexDocumentsMethod, Settings as MilliSe
 use meilisearch_types::milli::vector::parsed_vectors::{
     ExplicitVectors, VectorOrArrayOfVectors, RESERVED_VECTORS_FIELD_NAME,
 };
-use meilisearch_types::milli::{self, Filter, ThreadPoolNoAbort, ThreadPoolNoAbortBuilder};
+use meilisearch_types::milli::{self, Filter};
 use meilisearch_types::settings::{apply_settings_to_builder, Settings, Unchecked};
 use meilisearch_types::tasks::{Details, IndexSwap, Kind, KindWithContent, Status, Task};
 use meilisearch_types::{compression, Index, VERSION_FILE_NAME};
@@ -1277,6 +1277,7 @@ impl IndexScheduler {
     operations,
     mut tasks,
 } => {
+    let indexer_config = self.index_mapper.indexer_config();
     // TODO: at some point, for better efficiency we might want to reuse the bumpalo for successive batches.
     // this is made difficult by the fact we're doing private clones of the index scheduler and sending it
     // to a fresh thread.
@@ -1385,16 +1386,10 @@ impl IndexScheduler {
 }

 if tasks.iter().any(|res| res.error.is_none()) {
-    let local_pool;
-    let pool = match &self.index_mapper.indexer_config().thread_pool {
-        Some(pool) => pool,
-        None => {
-            local_pool = ThreadPoolNoAbortBuilder::new().build().unwrap();
-            &local_pool
-        }
-    };
-
-    // TODO we want to multithread this
+    /// TODO create a pool if needed
+    // let pool = indexer_config.thread_pool.unwrap();
+    let pool = rayon::ThreadPoolBuilder::new().build().unwrap();
+
     let document_changes = indexer.into_changes(
         &indexer_alloc,
         index,
@@ -1403,20 +1398,18 @@ impl IndexScheduler {
         &mut new_fields_ids_map,
     )?;

-    pool.install(|| {
-        indexer::index(
-            index_wtxn,
-            index,
-            &db_fields_ids_map,
-            new_fields_ids_map,
-            primary_key_has_been_set.then_some(primary_key),
-            &document_changes,
-            embedders,
-            &|| must_stop_processing.get(),
-            &send_progress,
-        )
-    })
-    .unwrap()?;
+    indexer::index(
+        index_wtxn,
+        index,
+        &db_fields_ids_map,
+        new_fields_ids_map,
+        primary_key_has_been_set.then_some(primary_key),
+        &pool,
+        &document_changes,
+        embedders,
+        &|| must_stop_processing.get(),
+        &send_progress,
+    )?;

     // tracing::info!(indexing_result = ?addition, processed_in = ?started_processing_at.elapsed(), "document indexing done");
 }
@@ -1496,37 +1489,27 @@ impl IndexScheduler {
 let result_count = Ok((candidates.len(), candidates.len())) as Result<_>;

 if task.error.is_none() {
-    let local_pool;
-    let pool = match &self.index_mapper.indexer_config().thread_pool {
-        Some(pool) => pool,
-        None => {
-            local_pool = ThreadPoolNoAbortBuilder::new().build().unwrap();
-            &local_pool
-        }
-    };
-
-    pool.install(|| {
-        let indexer =
-            UpdateByFunction::new(candidates, context.clone(), code.clone());
-        let document_changes = indexer.into_changes(&primary_key)?;
-        let embedders = index.embedding_configs(index_wtxn)?;
-        let embedders = self.embedders(embedders)?;
-
-        indexer::index(
-            index_wtxn,
-            index,
-            &db_fields_ids_map,
-            new_fields_ids_map,
-            None, // cannot change primary key in DocumentEdition
-            &document_changes,
-            embedders,
-            &|| must_stop_processing.get(),
-            &send_progress,
-        )?;
-
-        Result::Ok(())
-    })
-    .unwrap()?;
+    /// TODO create a pool if needed
+    // let pool = indexer_config.thread_pool.unwrap();
+    let pool = rayon::ThreadPoolBuilder::new().build().unwrap();
+
+    let indexer = UpdateByFunction::new(candidates, context.clone(), code.clone());
+    let document_changes = indexer.into_changes(&primary_key)?;
+    let embedders = index.embedding_configs(index_wtxn)?;
+    let embedders = self.embedders(embedders)?;
+
+    indexer::index(
+        index_wtxn,
+        index,
+        &db_fields_ids_map,
+        new_fields_ids_map,
+        None, // cannot change primary key in DocumentEdition
+        &pool,
+        &document_changes,
+        embedders,
+        &|| must_stop_processing.get(),
+        &send_progress,
+    )?;

     // tracing::info!(indexing_result = ?addition, processed_in = ?started_processing_at.elapsed(), "document indexing done");
 }
@@ -1646,14 +1629,9 @@ impl IndexScheduler {
     .map_err(milli::Error::from)?;

 if !tasks.iter().all(|res| res.error.is_some()) {
-    let local_pool;
-    let pool = match &self.index_mapper.indexer_config().thread_pool {
-        Some(pool) => pool,
-        None => {
-            local_pool = ThreadPoolNoAbortBuilder::new().build().unwrap();
-            &local_pool
-        }
-    };
+    /// TODO create a pool if needed
+    // let pool = indexer_config.thread_pool.unwrap();
+    let pool = rayon::ThreadPoolBuilder::new().build().unwrap();

     let mut indexer = indexer::DocumentDeletion::new();
     indexer.delete_documents_by_docids(to_delete);
@@ -1661,20 +1639,18 @@ impl IndexScheduler {
     let embedders = index.embedding_configs(index_wtxn)?;
     let embedders = self.embedders(embedders)?;

-    pool.install(|| {
-        indexer::index(
-            index_wtxn,
-            index,
-            &db_fields_ids_map,
-            new_fields_ids_map,
-            None, // document deletion never changes primary key
-            &document_changes,
-            embedders,
-            &|| must_stop_processing.get(),
-            &send_progress,
-        )
-    })
-    .unwrap()?;
+    indexer::index(
+        index_wtxn,
+        index,
+        &db_fields_ids_map,
+        new_fields_ids_map,
+        None, // document deletion never changes primary key
+        &pool,
+        &document_changes,
+        embedders,
+        &|| must_stop_processing.get(),
+        &send_progress,
+    )?;

     // tracing::info!(indexing_result = ?addition, processed_in = ?started_processing_at.elapsed(), "document indexing done");
 }
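
Taken together, the scheduler-side hunks above drop the optional ThreadPoolNoAbort lookup and instead build a dedicated Rayon pool that is handed to indexer::index by reference. A minimal, self-contained sketch of that calling pattern, assuming only the rayon crate; run_indexing and its arguments are illustrative stand-ins, not the real indexer::index signature:

use rayon::{ThreadPool, ThreadPoolBuilder};

// Illustrative stand-in for an indexing routine that receives the pool it may
// parallelize on, instead of creating or discovering one internally.
fn run_indexing(pool: &ThreadPool, documents: &[&str]) -> Result<usize, Box<dyn std::error::Error>> {
    // Any Rayon work triggered inside `install` runs on `pool`, not the global pool.
    let indexed = pool.install(|| documents.len());
    Ok(indexed)
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Mirrors `rayon::ThreadPoolBuilder::new().build().unwrap()` in the hunks above,
    // with an explicit thread count for the example.
    let pool = ThreadPoolBuilder::new().num_threads(4).build()?;
    let docs = ["doc-1", "doc-2", "doc-3"];
    let indexed = run_indexing(&pool, &docs)?;
    println!("indexed {indexed} documents on {} threads", pool.current_num_threads());
    Ok(())
}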

View File

@@ -3,9 +3,6 @@ mod r#match;
 mod matching_words;
 mod simple_token_kind;

-use std::borrow::Cow;
-use std::cmp::{max, min};
-
 use charabia::{Language, SeparatorKind, Token, Tokenizer};
 use either::Either;
 pub use matching_words::MatchingWords;
@@ -13,6 +10,10 @@ use matching_words::{MatchType, PartialMatch};
 use r#match::{Match, MatchPosition};
 use serde::Serialize;
 use simple_token_kind::SimpleTokenKind;
+use std::{
+    borrow::Cow,
+    cmp::{max, min},
+};

 const DEFAULT_CROP_MARKER: &str = "…";
 const DEFAULT_HIGHLIGHT_PREFIX: &str = "<em>";

View File

@@ -366,14 +366,14 @@ pub struct FieldIdDocidFacetSender<'a>(&'a ExtractorSender);
 impl FieldIdDocidFacetSender<'_> {
     pub fn write_facet_string(&self, key: &[u8], value: &[u8]) -> StdResult<(), SendError<()>> {
         debug_assert!(FieldDocIdFacetStringCodec::bytes_decode(key).is_ok());
-        let entry = EntryOperation::Write(KeyValueEntry::from_small_key_value(key, value));
+        let entry = EntryOperation::Write(KeyValueEntry::from_small_key_value(&key, &value));
         self.0
             .send_db_operation(DbOperation { database: Database::FieldIdDocidFacetStrings, entry })
     }

     pub fn write_facet_f64(&self, key: &[u8]) -> StdResult<(), SendError<()>> {
         debug_assert!(FieldDocIdFacetF64Codec::bytes_decode(key).is_ok());
-        let entry = EntryOperation::Write(KeyValueEntry::from_small_key_value(key, &[]));
+        let entry = EntryOperation::Write(KeyValueEntry::from_small_key_value(&key, &[]));
         self.0.send_db_operation(DbOperation { database: Database::FieldIdDocidFacetF64s, entry })
     }

View File

@@ -58,8 +58,7 @@ impl<'a, 'extractor> Extractor<'extractor> for DocumentsExtractor<'a> {
         context.index,
         &context.db_fields_ids_map,
     )?;
-    let geo_iter =
-        content.geo_field().transpose().map(|res| res.map(|rv| ("_geo", rv)));
+    let geo_iter = content.geo_field().transpose().map(|res| res.map(|rv| ("_geo", rv)));
     for res in content.iter_top_level_fields().chain(geo_iter) {
         let (f, _) = res?;
         let entry = document_extractor_data
@@ -75,8 +74,7 @@ impl<'a, 'extractor> Extractor<'extractor> for DocumentsExtractor<'a> {
     let docid = update.docid();
     let content =
         update.current(&context.rtxn, context.index, &context.db_fields_ids_map)?;
-    let geo_iter =
-        content.geo_field().transpose().map(|res| res.map(|rv| ("_geo", rv)));
+    let geo_iter = content.geo_field().transpose().map(|res| res.map(|rv| ("_geo", rv)));
     for res in content.iter_top_level_fields().chain(geo_iter) {
         let (f, _) = res?;
         let entry = document_extractor_data
@@ -86,8 +84,7 @@ impl<'a, 'extractor> Extractor<'extractor> for DocumentsExtractor<'a> {
         *entry -= 1;
     }
     let content = update.updated();
-    let geo_iter =
-        content.geo_field().transpose().map(|res| res.map(|rv| ("_geo", rv)));
+    let geo_iter = content.geo_field().transpose().map(|res| res.map(|rv| ("_geo", rv)));
     for res in content.iter_top_level_fields().chain(geo_iter) {
         let (f, _) = res?;
         let entry = document_extractor_data
@@ -117,8 +114,7 @@ impl<'a, 'extractor> Extractor<'extractor> for DocumentsExtractor<'a> {
 DocumentChange::Insertion(insertion) => {
     let docid = insertion.docid();
     let content = insertion.inserted();
-    let geo_iter =
-        content.geo_field().transpose().map(|res| res.map(|rv| ("_geo", rv)));
+    let geo_iter = content.geo_field().transpose().map(|res| res.map(|rv| ("_geo", rv)));
     for res in content.iter_top_level_fields().chain(geo_iter) {
         let (f, _) = res?;
         let entry = document_extractor_data

View File

@@ -3,6 +3,7 @@ use std::collections::HashMap;
 use charabia::{SeparatorKind, Token, TokenKind, Tokenizer, TokenizerBuilder};
 use serde_json::Value;

+use crate::proximity::MAX_DISTANCE;
 use crate::update::new::document::Document;
 use crate::update::new::extract::perm_json_p::{
     seek_leaf_values_in_array, seek_leaf_values_in_object, select_field,
@@ -12,9 +13,6 @@ use crate::{
     MAX_WORD_LENGTH,
 };

-// todo: should be crate::proximity::MAX_DISTANCE but it has been forgotten
-const MAX_DISTANCE: u32 = 8;
-
 pub struct DocumentTokenizer<'a> {
     pub tokenizer: &'a Tokenizer<'a>,
     pub attribute_to_extract: Option<&'a [&'a str]>,
@@ -253,22 +251,22 @@ mod test {
     ]: "doggo",
     [
         2,
-        8,
+        MAX_DISTANCE,
     ]: "doggo",
     [
         2,
         16,
     ]: "catto",
     [
-        5,
+        3,
         0,
     ]: "10",
     [
-        6,
+        4,
         0,
     ]: "pesti",
     [
-        7,
+        5,
         0,
     ]: "23",
 }

View File

@@ -6,9 +6,11 @@ use grenad::Sorter;
 use heed::types::{Bytes, SerdeJson};
 use heed::{BytesDecode, BytesEncode, RoTxn, RwTxn};

+use super::extract::FacetKind;
 use super::fst_merger_builder::FstMergerBuilder;
 use super::KvReaderDelAdd;
-use crate::heed_codec::facet::FacetGroupKey;
+use crate::heed_codec::facet::{FacetGroupKey, FacetGroupKeyCodec};
+use crate::heed_codec::StrRefCodec;
 use crate::update::del_add::{DelAdd, KvWriterDelAdd};
 use crate::update::{create_sorter, MergeDeladdBtreesetString};
 use crate::{

View File

@@ -1,12 +1,10 @@
-use std::fs::File;
-use std::io::BufWriter;
+use std::{fs::File, io::BufWriter};

 use fst::{Set, SetBuilder, Streamer};
 use memmap2::Mmap;
 use tempfile::tempfile;

-use crate::update::del_add::DelAdd;
-use crate::{InternalError, Result};
+use crate::{update::del_add::DelAdd, InternalError, Result};

 pub struct FstMergerBuilder<'a> {
     stream: Option<fst::set::Stream<'a>>,

View File

@@ -132,7 +132,7 @@ mod test {
 }

 let mut deletions = DocumentDeletion::new();
-deletions.delete_documents_by_docids(Vec::<u32>::new().into_iter().collect());
+deletions.delete_documents_by_docids(vec![0, 2, 42].into_iter().collect());

 let indexer = Bump::new();
 let index = TempIndex::new();

View File

@@ -356,11 +356,11 @@ impl MergeChanges for MergeDocumentForUpdates {
 let has_deletion = last_deletion.is_some();

 if operations.is_empty() {
-    return if is_new {
-        Ok(None)
-    } else {
+    return if !is_new {
         let deletion = Deletion::create(docid, external_docid);
         Ok(Some(DocumentChange::Deletion(deletion)))
+    } else {
+        Ok(None)
     };
 }

View File

@@ -13,6 +13,7 @@ use itertools::{merge_join_by, EitherOrBoth};
 pub use partial_dump::PartialDump;
 use rand::SeedableRng as _;
 use raw_collections::RawMap;
+use rayon::ThreadPool;
 use time::OffsetDateTime;
 pub use update_by_function::UpdateByFunction;
@@ -135,6 +136,7 @@ pub fn index<'pl, 'indexer, 'index, DC, MSP, SP>(
     db_fields_ids_map: &'indexer FieldsIdsMap,
     new_fields_ids_map: FieldsIdsMap,
     new_primary_key: Option<PrimaryKey<'pl>>,
+    pool: &ThreadPool,
     document_changes: &DC,
     embedders: EmbeddingConfigs,
     must_stop_processing: &'indexer MSP,
@@ -150,9 +152,9 @@ where
     let metadata_builder = MetadataBuilder::from_index(index, wtxn)?;
     let new_fields_ids_map = FieldIdMapWithMetadata::new(new_fields_ids_map, metadata_builder);
     let new_fields_ids_map = RwLock::new(new_fields_ids_map);
-    let fields_ids_map_store = ThreadLocal::with_capacity(rayon::current_num_threads());
-    let mut extractor_allocs = ThreadLocal::with_capacity(rayon::current_num_threads());
-    let doc_allocs = ThreadLocal::with_capacity(rayon::current_num_threads());
+    let fields_ids_map_store = ThreadLocal::with_capacity(pool.current_num_threads());
+    let mut extractor_allocs = ThreadLocal::with_capacity(pool.current_num_threads());
+    let doc_allocs = ThreadLocal::with_capacity(pool.current_num_threads());

     let indexing_context = IndexingContext {
         index,
@@ -177,260 +179,248 @@ where
let document_ids = &mut document_ids; let document_ids = &mut document_ids;
// TODO manage the errors correctly // TODO manage the errors correctly
let extractor_handle = Builder::new().name(S("indexer-extractors")).spawn_scoped(s, move || { let extractor_handle = Builder::new().name(S("indexer-extractors")).spawn_scoped(s, move || {
let span = tracing::trace_span!(target: "indexing::documents", parent: &indexer_span, "extract"); let result = pool.in_place_scope(|_s| {
let _entered = span.enter(); let span = tracing::trace_span!(target: "indexing::documents", parent: &indexer_span, "extract");
let rtxn = index.read_txn()?;
// document but we need to create a function that collects and compresses documents.
let document_sender = extractor_sender.documents();
let document_extractor = DocumentsExtractor::new(&document_sender, embedders);
let datastore = ThreadLocal::with_capacity(rayon::current_num_threads());
let (finished_steps, step_name) = steps::extract_documents();
extract(document_changes,
&document_extractor,
indexing_context,
&mut extractor_allocs,
&datastore,
finished_steps,
total_steps,
step_name,
)?;
for document_extractor_data in datastore {
let document_extractor_data = document_extractor_data.0.into_inner();
for (field, delta) in document_extractor_data.field_distribution_delta {
let current = field_distribution.entry(field).or_default();
// adding the delta should never cause a negative result, as we are removing fields that previously existed.
*current = current.saturating_add_signed(delta);
}
document_extractor_data.docids_delta.apply_to(document_ids);
}
field_distribution.retain(|_, v| *v != 0);
const TEN_GIB: usize = 10 * 1024 * 1024 * 1024;
let current_num_threads = rayon::current_num_threads();
let max_memory = TEN_GIB / current_num_threads;
eprintln!("A maximum of {max_memory} bytes will be used for each of the {current_num_threads} threads");
let grenad_parameters = GrenadParameters {
max_memory: Some(max_memory),
..GrenadParameters::default()
};
let facet_field_ids_delta;
{
let span = tracing::trace_span!(target: "indexing::documents::extract", "faceted");
let _entered = span.enter(); let _entered = span.enter();
let (finished_steps, step_name) = steps::extract_facets(); let rtxn = index.read_txn()?;
facet_field_ids_delta = merge_and_send_facet_docids( // document but we need to create a function that collects and compresses documents.
FacetedDocidsExtractor::run_extraction(grenad_parameters, let document_sender = extractor_sender.documents();
let document_extractor = DocumentsExtractor::new(&document_sender, embedders);
let datastore = ThreadLocal::with_capacity(pool.current_num_threads());
let (finished_steps, step_name) = steps::extract_documents();
extract(document_changes, &document_extractor, indexing_context, &mut extractor_allocs, &datastore, finished_steps, total_steps, step_name)?;
for document_extractor_data in datastore {
let document_extractor_data = document_extractor_data.0.into_inner();
for (field, delta) in document_extractor_data.field_distribution_delta {
let current = field_distribution.entry(field).or_default();
// adding the delta should never cause a negative result, as we are removing fields that previously existed.
*current = current.saturating_add_signed(delta);
}
document_extractor_data.docids_delta.apply_to(document_ids);
}
field_distribution.retain(|_, v| *v != 0);
const TEN_GIB: usize = 10 * 1024 * 1024 * 1024;
let current_num_threads = rayon::current_num_threads();
let max_memory = TEN_GIB / current_num_threads;
eprintln!("A maximum of {max_memory} bytes will be used for each of the {current_num_threads} threads");
let grenad_parameters = GrenadParameters {
max_memory: Some(max_memory),
..GrenadParameters::default()
};
let facet_field_ids_delta;
{
let span = tracing::trace_span!(target: "indexing::documents::extract", "faceted");
let _entered = span.enter();
let (finished_steps, step_name) = steps::extract_facets();
facet_field_ids_delta = merge_and_send_facet_docids(
FacetedDocidsExtractor::run_extraction(grenad_parameters, document_changes, indexing_context, &mut extractor_allocs, &extractor_sender.field_id_docid_facet_sender(), finished_steps, total_steps, step_name)?,
FacetDatabases::new(index),
index,
extractor_sender.facet_docids(),
)?;
}
{
let span = tracing::trace_span!(target: "indexing::documents::extract", "word_docids");
let _entered = span.enter();
let (finished_steps, step_name) = steps::extract_words();
let WordDocidsCaches {
word_docids,
word_fid_docids,
exact_word_docids,
word_position_docids,
fid_word_count_docids,
} = WordDocidsExtractors::run_extraction(
grenad_parameters,
document_changes, document_changes,
indexing_context, indexing_context,
&mut extractor_allocs, &mut extractor_allocs,
&extractor_sender.field_id_docid_facet_sender(),
finished_steps, finished_steps,
total_steps, total_steps,
step_name, step_name,
)?,
FacetDatabases::new(index),
index,
extractor_sender.facet_docids(),
)?;
}
{
let span = tracing::trace_span!(target: "indexing::documents::extract", "word_docids");
let _entered = span.enter();
let (finished_steps, step_name) = steps::extract_words();
let WordDocidsCaches {
word_docids,
word_fid_docids,
exact_word_docids,
word_position_docids,
fid_word_count_docids,
} = WordDocidsExtractors::run_extraction(
grenad_parameters,
document_changes,
indexing_context,
&mut extractor_allocs,
finished_steps,
total_steps,
step_name,
)?;
// TODO Word Docids Merger
// extractor_sender.send_searchable::<WordDocids>(word_docids).unwrap();
{
let span = tracing::trace_span!(target: "indexing::documents::merge", "word_docids");
let _entered = span.enter();
merge_and_send_docids(
word_docids,
index.word_docids.remap_types(),
index,
extractor_sender.docids::<WordDocids>(),
&indexing_context.must_stop_processing,
)?; )?;
}
// Word Fid Docids Merging // TODO Word Docids Merger
// extractor_sender.send_searchable::<WordFidDocids>(word_fid_docids).unwrap(); // extractor_sender.send_searchable::<WordDocids>(word_docids).unwrap();
{ {
let span = tracing::trace_span!(target: "indexing::documents::merge", "word_fid_docids"); let span = tracing::trace_span!(target: "indexing::documents::merge", "word_docids");
let _entered = span.enter(); let _entered = span.enter();
merge_and_send_docids( merge_and_send_docids(
word_fid_docids, word_docids,
index.word_fid_docids.remap_types(), index.word_docids.remap_types(),
index, index,
extractor_sender.docids::<WordFidDocids>(), extractor_sender.docids::<WordDocids>(),
&indexing_context.must_stop_processing, &indexing_context.must_stop_processing,
)?; )?;
} }
// Exact Word Docids Merging // Word Fid Docids Merging
// extractor_sender.send_searchable::<ExactWordDocids>(exact_word_docids).unwrap(); // extractor_sender.send_searchable::<WordFidDocids>(word_fid_docids).unwrap();
{ {
let span = tracing::trace_span!(target: "indexing::documents::merge", "exact_word_docids"); let span = tracing::trace_span!(target: "indexing::documents::merge", "word_fid_docids");
let _entered = span.enter(); let _entered = span.enter();
merge_and_send_docids( merge_and_send_docids(
exact_word_docids, word_fid_docids,
index.exact_word_docids.remap_types(), index.word_fid_docids.remap_types(),
index, index,
extractor_sender.docids::<ExactWordDocids>(), extractor_sender.docids::<WordFidDocids>(),
&indexing_context.must_stop_processing, &indexing_context.must_stop_processing,
)?; )?;
} }
// Word Position Docids Merging // Exact Word Docids Merging
// extractor_sender.send_searchable::<WordPositionDocids>(word_position_docids).unwrap(); // extractor_sender.send_searchable::<ExactWordDocids>(exact_word_docids).unwrap();
{ {
let span = tracing::trace_span!(target: "indexing::documents::merge", "word_position_docids"); let span = tracing::trace_span!(target: "indexing::documents::merge", "exact_word_docids");
let _entered = span.enter(); let _entered = span.enter();
merge_and_send_docids( merge_and_send_docids(
word_position_docids, exact_word_docids,
index.word_position_docids.remap_types(), index.exact_word_docids.remap_types(),
index, index,
extractor_sender.docids::<WordPositionDocids>(), extractor_sender.docids::<ExactWordDocids>(),
&indexing_context.must_stop_processing, &indexing_context.must_stop_processing,
)?; )?;
} }
// Fid Word Count Docids Merging // Word Position Docids Merging
// extractor_sender.send_searchable::<FidWordCountDocids>(fid_word_count_docids).unwrap(); // extractor_sender.send_searchable::<WordPositionDocids>(word_position_docids).unwrap();
{ {
let span = tracing::trace_span!(target: "indexing::documents::merge", "fid_word_count_docids"); let span = tracing::trace_span!(target: "indexing::documents::merge", "word_position_docids");
let _entered = span.enter(); let _entered = span.enter();
merge_and_send_docids( merge_and_send_docids(
fid_word_count_docids, word_position_docids,
index.field_id_word_count_docids.remap_types(), index.word_position_docids.remap_types(),
index, index,
extractor_sender.docids::<FidWordCountDocids>(), extractor_sender.docids::<WordPositionDocids>(),
&indexing_context.must_stop_processing, &indexing_context.must_stop_processing,
)?; )?;
} }
}
// run the proximity extraction only if the precision is by word // Fid Word Count Docids Merging
// this works only if the settings didn't change during this transaction. // extractor_sender.send_searchable::<FidWordCountDocids>(fid_word_count_docids).unwrap();
let proximity_precision = index.proximity_precision(&rtxn)?.unwrap_or_default(); {
if proximity_precision == ProximityPrecision::ByWord { let span = tracing::trace_span!(target: "indexing::documents::merge", "fid_word_count_docids");
let span = tracing::trace_span!(target: "indexing::documents::extract", "word_pair_proximity_docids"); let _entered = span.enter();
let _entered = span.enter(); merge_and_send_docids(
fid_word_count_docids,
let (finished_steps, step_name) = steps::extract_word_proximity(); index.field_id_word_count_docids.remap_types(),
index,
let caches = <WordPairProximityDocidsExtractor as DocidsExtractor>::run_extraction(grenad_parameters, extractor_sender.docids::<FidWordCountDocids>(),
document_changes, &indexing_context.must_stop_processing,
indexing_context, )?;
&mut extractor_allocs,
finished_steps,
total_steps,
step_name,
)?;
merge_and_send_docids(
caches,
index.word_pair_proximity_docids.remap_types(),
index,
extractor_sender.docids::<WordPairProximityDocids>(),
&indexing_context.must_stop_processing,
)?;
}
'vectors: {
let span = tracing::trace_span!(target: "indexing::documents::extract", "vectors");
let _entered = span.enter();
let mut index_embeddings = index.embedding_configs(&rtxn)?;
if index_embeddings.is_empty() {
break 'vectors;
}
let embedding_sender = extractor_sender.embeddings();
let extractor = EmbeddingExtractor::new(embedders, &embedding_sender, field_distribution, request_threads());
let mut datastore = ThreadLocal::with_capacity(rayon::current_num_threads());
let (finished_steps, step_name) = steps::extract_embeddings();
extract(document_changes, &extractor, indexing_context, &mut extractor_allocs, &datastore, finished_steps, total_steps, step_name)?;
for config in &mut index_embeddings {
'data: for data in datastore.iter_mut() {
let data = &mut data.get_mut().0;
let Some(deladd) = data.remove(&config.name) else { continue 'data; };
deladd.apply_to(&mut config.user_provided);
} }
} }
embedding_sender.finish(index_embeddings).unwrap(); // run the proximity extraction only if the precision is by word
} // this works only if the settings didn't change during this transaction.
let proximity_precision = index.proximity_precision(&rtxn)?.unwrap_or_default();
if proximity_precision == ProximityPrecision::ByWord {
let span = tracing::trace_span!(target: "indexing::documents::extract", "word_pair_proximity_docids");
let _entered = span.enter();
'geo: { let (finished_steps, step_name) = steps::extract_word_proximity();
let span = tracing::trace_span!(target: "indexing::documents::extract", "geo");
let _entered = span.enter();
// let geo_sender = extractor_sender.geo_points(); let caches = <WordPairProximityDocidsExtractor as DocidsExtractor>::run_extraction(grenad_parameters,
let Some(extractor) = GeoExtractor::new(&rtxn, index, grenad_parameters)? else { document_changes,
break 'geo; indexing_context,
}; &mut extractor_allocs,
let datastore = ThreadLocal::with_capacity(rayon::current_num_threads()); finished_steps,
let (finished_steps, step_name) = steps::extract_geo_points(); total_steps,
extract(document_changes, step_name,
&extractor, )?;
indexing_context,
&mut extractor_allocs,
&datastore,
finished_steps,
total_steps,
step_name,
)?;
merge_and_send_rtree( merge_and_send_docids(
datastore, caches,
&rtxn, index.word_pair_proximity_docids.remap_types(),
index, index,
extractor_sender.geo(), extractor_sender.docids::<WordPairProximityDocids>(),
&indexing_context.must_stop_processing, &indexing_context.must_stop_processing,
)?; )?;
} }
// TODO THIS IS TOO MUCH 'vectors: {
// - [ ] Extract fieldid docid facet number let span = tracing::trace_span!(target: "indexing::documents::extract", "vectors");
// - [ ] Extract fieldid docid facet string let _entered = span.enter();
// - [ ] Extract facetid string fst
// - [ ] Extract facetid normalized string strings
// TODO Inverted Indexes again let mut index_embeddings = index.embedding_configs(&rtxn)?;
// - [x] Extract fieldid facet isempty docids if index_embeddings.is_empty() {
// - [x] Extract fieldid facet isnull docids break 'vectors;
// - [x] Extract fieldid facet exists docids }
// TODO This is the normal system let embedding_sender = extractor_sender.embeddings();
// - [x] Extract fieldid facet number docids let extractor = EmbeddingExtractor::new(embedders, &embedding_sender, field_distribution, request_threads());
// - [x] Extract fieldid facet string docids let mut datastore = ThreadLocal::with_capacity(pool.current_num_threads());
let (finished_steps, step_name) = steps::extract_embeddings();
extract(document_changes, &extractor, indexing_context, &mut extractor_allocs, &datastore, finished_steps, total_steps, step_name)?;
for config in &mut index_embeddings {
'data: for data in datastore.iter_mut() {
let data = &mut data.get_mut().0;
let Some(deladd) = data.remove(&config.name) else { continue 'data; };
deladd.apply_to(&mut config.user_provided);
}
}
embedding_sender.finish(index_embeddings).unwrap();
}
'geo: {
let span = tracing::trace_span!(target: "indexing::documents::extract", "geo");
let _entered = span.enter();
// let geo_sender = extractor_sender.geo_points();
let Some(extractor) = GeoExtractor::new(&rtxn, index, grenad_parameters)? else {
break 'geo;
};
let datastore = ThreadLocal::with_capacity(pool.current_num_threads());
let (finished_steps, step_name) = steps::extract_geo_points();
extract(document_changes,
&extractor,
indexing_context,
&mut extractor_allocs,
&datastore,
finished_steps,
total_steps,
step_name,
)?;
merge_and_send_rtree(
datastore,
&rtxn,
index,
extractor_sender.geo(),
&indexing_context.must_stop_processing,
)?;
}
// TODO THIS IS TOO MUCH
// - [ ] Extract fieldid docid facet number
// - [ ] Extract fieldid docid facet string
// - [ ] Extract facetid string fst
// - [ ] Extract facetid normalized string strings
// TODO Inverted Indexes again
// - [x] Extract fieldid facet isempty docids
// - [x] Extract fieldid facet isnull docids
// - [x] Extract fieldid facet exists docids
// TODO This is the normal system
// - [x] Extract fieldid facet number docids
// - [x] Extract fieldid facet string docids
Result::Ok(facet_field_ids_delta)
});
{ {
let span = tracing::trace_span!(target: "indexing::documents::extract", "FINISH"); let span = tracing::trace_span!(target: "indexing::documents::extract", "FINISH");
@@ -439,7 +429,7 @@ where
         (indexing_context.send_progress)(Progress { finished_steps, total_steps, step_name, finished_total_documents: None });
     }

-    Result::Ok(facet_field_ids_delta)
+    result
 })?;

 let global_fields_ids_map = GlobalFieldsIdsMap::new(&new_fields_ids_map);

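For reference, the large hunk above moves the extractor thread's body inside pool.in_place_scope and sizes the ThreadLocal stores with pool.current_num_threads(). A rough, self-contained sketch of that Rayon pattern in isolation, assuming only the rayon crate; the per-document task is a placeholder, not the actual extraction code:

use std::sync::mpsc;

fn main() -> Result<(), rayon::ThreadPoolBuildError> {
    let pool = rayon::ThreadPoolBuilder::new().num_threads(4).build()?;
    let (tx, rx) = mpsc::channel();

    // The closure itself runs on the current thread, but everything spawned on the
    // scope is executed by `pool`'s workers rather than the global Rayon pool.
    pool.in_place_scope(|scope| {
        for doc_id in 0u32..8 {
            let tx = tx.clone();
            scope.spawn(move |_| {
                // Placeholder for per-document extraction work.
                tx.send(doc_id * 2).expect("receiver alive");
            });
        }
    });
    drop(tx);

    let mut extracted: Vec<u32> = rx.iter().collect();
    extracted.sort_unstable();
    assert_eq!(extracted.len(), 8);
    println!("pool has {} threads", pool.current_num_threads());
    Ok(())
}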
View File

@@ -1,4 +1,5 @@
 use std::cell::RefCell;
+use std::io;

 use hashbrown::HashSet;
 use heed::types::Bytes;

View File

@@ -286,7 +286,7 @@ impl<'doc> MergedVectorDocument<'doc> {
     ) -> Result<Option<Self>> {
         let db = VectorDocumentFromDb::new(docid, index, rtxn, db_fields_ids_map, doc_alloc)?;
         let new_doc =
-            VectorDocumentFromVersions::new(external_document_id, versions, doc_alloc, embedders)?;
+            VectorDocumentFromVersions::new(&external_document_id, versions, doc_alloc, embedders)?;
         Ok(if db.is_none() && new_doc.is_none() { None } else { Some(Self { new_doc, db }) })
     }

View File

@@ -1,14 +1,13 @@
-use std::collections::HashSet;
 use std::io::BufWriter;

 use fst::{Set, SetBuilder, Streamer};
 use memmap2::Mmap;
+use std::collections::HashSet;
 use tempfile::tempfile;

+use crate::{index::PrefixSettings, update::del_add::DelAdd, InternalError, Prefix, Result};
+
 use super::fst_merger_builder::FstMergerBuilder;
-use crate::index::PrefixSettings;
-use crate::update::del_add::DelAdd;
-use crate::{InternalError, Prefix, Result};

 pub struct WordFstBuilder<'a> {
     word_fst_builder: FstMergerBuilder<'a>,