mirror of
https://github.com/meilisearch/meilisearch.git
synced 2025-01-19 01:18:31 +08:00
refactor indexer mod
This commit is contained in:
parent
56fd4ee9bd
commit
de7f8c4406
@ -28,7 +28,7 @@ use crate::{DocumentId, FieldId, Index, Result, MAX_FACET_VALUE_LENGTH};
|
|||||||
pub struct FacetedExtractorData<'a, 'b> {
|
pub struct FacetedExtractorData<'a, 'b> {
|
||||||
attributes_to_extract: &'a [&'a str],
|
attributes_to_extract: &'a [&'a str],
|
||||||
sender: &'a FieldIdDocidFacetSender<'a, 'b>,
|
sender: &'a FieldIdDocidFacetSender<'a, 'b>,
|
||||||
grenad_parameters: GrenadParameters,
|
grenad_parameters: &'a GrenadParameters,
|
||||||
buckets: usize,
|
buckets: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -374,7 +374,6 @@ fn truncate_str(s: &str) -> &str {
|
|||||||
impl FacetedDocidsExtractor {
|
impl FacetedDocidsExtractor {
|
||||||
#[tracing::instrument(level = "trace", skip_all, target = "indexing::extract::faceted")]
|
#[tracing::instrument(level = "trace", skip_all, target = "indexing::extract::faceted")]
|
||||||
pub fn run_extraction<'pl, 'fid, 'indexer, 'index, 'extractor, DC: DocumentChanges<'pl>, MSP>(
|
pub fn run_extraction<'pl, 'fid, 'indexer, 'index, 'extractor, DC: DocumentChanges<'pl>, MSP>(
|
||||||
grenad_parameters: GrenadParameters,
|
|
||||||
document_changes: &DC,
|
document_changes: &DC,
|
||||||
indexing_context: IndexingContext<'fid, 'indexer, 'index, MSP>,
|
indexing_context: IndexingContext<'fid, 'indexer, 'index, MSP>,
|
||||||
extractor_allocs: &'extractor mut ThreadLocal<FullySend<Bump>>,
|
extractor_allocs: &'extractor mut ThreadLocal<FullySend<Bump>>,
|
||||||
@ -398,7 +397,7 @@ impl FacetedDocidsExtractor {
|
|||||||
|
|
||||||
let extractor = FacetedExtractorData {
|
let extractor = FacetedExtractorData {
|
||||||
attributes_to_extract: &attributes_to_extract,
|
attributes_to_extract: &attributes_to_extract,
|
||||||
grenad_parameters,
|
grenad_parameters: indexing_context.grenad_parameters,
|
||||||
buckets: rayon::current_num_threads(),
|
buckets: rayon::current_num_threads(),
|
||||||
sender,
|
sender,
|
||||||
};
|
};
|
||||||
|
@ -18,12 +18,10 @@ pub use vectors::EmbeddingExtractor;
|
|||||||
use super::indexer::document_changes::{DocumentChanges, IndexingContext};
|
use super::indexer::document_changes::{DocumentChanges, IndexingContext};
|
||||||
use super::steps::IndexingStep;
|
use super::steps::IndexingStep;
|
||||||
use super::thread_local::{FullySend, ThreadLocal};
|
use super::thread_local::{FullySend, ThreadLocal};
|
||||||
use crate::update::GrenadParameters;
|
|
||||||
use crate::Result;
|
use crate::Result;
|
||||||
|
|
||||||
pub trait DocidsExtractor {
|
pub trait DocidsExtractor {
|
||||||
fn run_extraction<'pl, 'fid, 'indexer, 'index, 'extractor, DC: DocumentChanges<'pl>, MSP>(
|
fn run_extraction<'pl, 'fid, 'indexer, 'index, 'extractor, DC: DocumentChanges<'pl>, MSP>(
|
||||||
grenad_parameters: GrenadParameters,
|
|
||||||
document_changes: &DC,
|
document_changes: &DC,
|
||||||
indexing_context: IndexingContext<'fid, 'indexer, 'index, MSP>,
|
indexing_context: IndexingContext<'fid, 'indexer, 'index, MSP>,
|
||||||
extractor_allocs: &'extractor mut ThreadLocal<FullySend<Bump>>,
|
extractor_allocs: &'extractor mut ThreadLocal<FullySend<Bump>>,
|
||||||
|
@ -208,7 +208,7 @@ impl<'extractor> WordDocidsCaches<'extractor> {
|
|||||||
|
|
||||||
pub struct WordDocidsExtractorData<'a> {
|
pub struct WordDocidsExtractorData<'a> {
|
||||||
tokenizer: &'a DocumentTokenizer<'a>,
|
tokenizer: &'a DocumentTokenizer<'a>,
|
||||||
grenad_parameters: GrenadParameters,
|
grenad_parameters: &'a GrenadParameters,
|
||||||
buckets: usize,
|
buckets: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -240,7 +240,6 @@ pub struct WordDocidsExtractors;
|
|||||||
|
|
||||||
impl WordDocidsExtractors {
|
impl WordDocidsExtractors {
|
||||||
pub fn run_extraction<'pl, 'fid, 'indexer, 'index, 'extractor, DC: DocumentChanges<'pl>, MSP>(
|
pub fn run_extraction<'pl, 'fid, 'indexer, 'index, 'extractor, DC: DocumentChanges<'pl>, MSP>(
|
||||||
grenad_parameters: GrenadParameters,
|
|
||||||
document_changes: &DC,
|
document_changes: &DC,
|
||||||
indexing_context: IndexingContext<'fid, 'indexer, 'index, MSP>,
|
indexing_context: IndexingContext<'fid, 'indexer, 'index, MSP>,
|
||||||
extractor_allocs: &'extractor mut ThreadLocal<FullySend<Bump>>,
|
extractor_allocs: &'extractor mut ThreadLocal<FullySend<Bump>>,
|
||||||
@ -288,7 +287,7 @@ impl WordDocidsExtractors {
|
|||||||
|
|
||||||
let extractor = WordDocidsExtractorData {
|
let extractor = WordDocidsExtractorData {
|
||||||
tokenizer: &document_tokenizer,
|
tokenizer: &document_tokenizer,
|
||||||
grenad_parameters,
|
grenad_parameters: indexing_context.grenad_parameters,
|
||||||
buckets: rayon::current_num_threads(),
|
buckets: rayon::current_num_threads(),
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -24,7 +24,7 @@ use crate::{Index, Result, MAX_POSITION_PER_ATTRIBUTE};
|
|||||||
|
|
||||||
pub struct SearchableExtractorData<'a, EX: SearchableExtractor> {
|
pub struct SearchableExtractorData<'a, EX: SearchableExtractor> {
|
||||||
tokenizer: &'a DocumentTokenizer<'a>,
|
tokenizer: &'a DocumentTokenizer<'a>,
|
||||||
grenad_parameters: GrenadParameters,
|
grenad_parameters: &'a GrenadParameters,
|
||||||
buckets: usize,
|
buckets: usize,
|
||||||
_ex: PhantomData<EX>,
|
_ex: PhantomData<EX>,
|
||||||
}
|
}
|
||||||
@ -57,7 +57,6 @@ impl<'a, 'extractor, EX: SearchableExtractor + Sync> Extractor<'extractor>
|
|||||||
|
|
||||||
pub trait SearchableExtractor: Sized + Sync {
|
pub trait SearchableExtractor: Sized + Sync {
|
||||||
fn run_extraction<'pl, 'fid, 'indexer, 'index, 'extractor, DC: DocumentChanges<'pl>, MSP>(
|
fn run_extraction<'pl, 'fid, 'indexer, 'index, 'extractor, DC: DocumentChanges<'pl>, MSP>(
|
||||||
grenad_parameters: GrenadParameters,
|
|
||||||
document_changes: &DC,
|
document_changes: &DC,
|
||||||
indexing_context: IndexingContext<'fid, 'indexer, 'index, MSP>,
|
indexing_context: IndexingContext<'fid, 'indexer, 'index, MSP>,
|
||||||
extractor_allocs: &'extractor mut ThreadLocal<FullySend<Bump>>,
|
extractor_allocs: &'extractor mut ThreadLocal<FullySend<Bump>>,
|
||||||
@ -96,7 +95,7 @@ pub trait SearchableExtractor: Sized + Sync {
|
|||||||
|
|
||||||
let extractor_data: SearchableExtractorData<Self> = SearchableExtractorData {
|
let extractor_data: SearchableExtractorData<Self> = SearchableExtractorData {
|
||||||
tokenizer: &document_tokenizer,
|
tokenizer: &document_tokenizer,
|
||||||
grenad_parameters,
|
grenad_parameters: indexing_context.grenad_parameters,
|
||||||
buckets: rayon::current_num_threads(),
|
buckets: rayon::current_num_threads(),
|
||||||
_ex: PhantomData,
|
_ex: PhantomData,
|
||||||
};
|
};
|
||||||
@ -134,7 +133,6 @@ pub trait SearchableExtractor: Sized + Sync {
|
|||||||
|
|
||||||
impl<T: SearchableExtractor> DocidsExtractor for T {
|
impl<T: SearchableExtractor> DocidsExtractor for T {
|
||||||
fn run_extraction<'pl, 'fid, 'indexer, 'index, 'extractor, DC: DocumentChanges<'pl>, MSP>(
|
fn run_extraction<'pl, 'fid, 'indexer, 'index, 'extractor, DC: DocumentChanges<'pl>, MSP>(
|
||||||
grenad_parameters: GrenadParameters,
|
|
||||||
document_changes: &DC,
|
document_changes: &DC,
|
||||||
indexing_context: IndexingContext<'fid, 'indexer, 'index, MSP>,
|
indexing_context: IndexingContext<'fid, 'indexer, 'index, MSP>,
|
||||||
extractor_allocs: &'extractor mut ThreadLocal<FullySend<Bump>>,
|
extractor_allocs: &'extractor mut ThreadLocal<FullySend<Bump>>,
|
||||||
@ -143,12 +141,6 @@ impl<T: SearchableExtractor> DocidsExtractor for T {
|
|||||||
where
|
where
|
||||||
MSP: Fn() -> bool + Sync,
|
MSP: Fn() -> bool + Sync,
|
||||||
{
|
{
|
||||||
Self::run_extraction(
|
Self::run_extraction(document_changes, indexing_context, extractor_allocs, step)
|
||||||
grenad_parameters,
|
|
||||||
document_changes,
|
|
||||||
indexing_context,
|
|
||||||
extractor_allocs,
|
|
||||||
step,
|
|
||||||
)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
187
crates/milli/src/update/new/indexer/compute.rs
Normal file
187
crates/milli/src/update/new/indexer/compute.rs
Normal file
@ -0,0 +1,187 @@
|
|||||||
|
use std::cmp::Ordering;
|
||||||
|
|
||||||
|
use heed::types::{Bytes, DecodeIgnore, Str};
|
||||||
|
use heed::RwTxn;
|
||||||
|
use itertools::{merge_join_by, EitherOrBoth};
|
||||||
|
|
||||||
|
use super::document_changes::IndexingContext;
|
||||||
|
use crate::facet::FacetType;
|
||||||
|
use crate::index::main_key::{WORDS_FST_KEY, WORDS_PREFIXES_FST_KEY};
|
||||||
|
use crate::update::del_add::DelAdd;
|
||||||
|
use crate::update::new::facet_search_builder::FacetSearchBuilder;
|
||||||
|
use crate::update::new::steps::IndexingStep;
|
||||||
|
use crate::update::new::word_fst_builder::{PrefixData, PrefixDelta, WordFstBuilder};
|
||||||
|
use crate::update::new::words_prefix_docids::{
|
||||||
|
compute_exact_word_prefix_docids, compute_word_prefix_docids, compute_word_prefix_fid_docids,
|
||||||
|
compute_word_prefix_position_docids,
|
||||||
|
};
|
||||||
|
use crate::update::new::FacetFieldIdsDelta;
|
||||||
|
use crate::update::{FacetsUpdateBulk, GrenadParameters};
|
||||||
|
use crate::{GlobalFieldsIdsMap, Index, Result};
|
||||||
|
|
||||||
|
pub(super) fn postprocess<MSP>(
|
||||||
|
indexing_context: IndexingContext<MSP>,
|
||||||
|
wtxn: &mut RwTxn<'_>,
|
||||||
|
global_fields_ids_map: GlobalFieldsIdsMap<'_>,
|
||||||
|
facet_field_ids_delta: FacetFieldIdsDelta,
|
||||||
|
) -> Result<()>
|
||||||
|
where
|
||||||
|
MSP: Fn() -> bool + Sync,
|
||||||
|
{
|
||||||
|
let index = indexing_context.index;
|
||||||
|
indexing_context.progress.update_progress(IndexingStep::PostProcessingFacets);
|
||||||
|
if index.facet_search(wtxn)? {
|
||||||
|
compute_facet_search_database(index, wtxn, global_fields_ids_map)?;
|
||||||
|
}
|
||||||
|
compute_facet_level_database(index, wtxn, facet_field_ids_delta)?;
|
||||||
|
indexing_context.progress.update_progress(IndexingStep::PostProcessingWords);
|
||||||
|
if let Some(prefix_delta) = compute_word_fst(index, wtxn)? {
|
||||||
|
compute_prefix_database(index, wtxn, prefix_delta, indexing_context.grenad_parameters)?;
|
||||||
|
};
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tracing::instrument(level = "trace", skip_all, target = "indexing::prefix")]
|
||||||
|
fn compute_prefix_database(
|
||||||
|
index: &Index,
|
||||||
|
wtxn: &mut RwTxn,
|
||||||
|
prefix_delta: PrefixDelta,
|
||||||
|
grenad_parameters: &GrenadParameters,
|
||||||
|
) -> Result<()> {
|
||||||
|
let PrefixDelta { modified, deleted } = prefix_delta;
|
||||||
|
// Compute word prefix docids
|
||||||
|
compute_word_prefix_docids(wtxn, index, &modified, &deleted, grenad_parameters)?;
|
||||||
|
// Compute exact word prefix docids
|
||||||
|
compute_exact_word_prefix_docids(wtxn, index, &modified, &deleted, grenad_parameters)?;
|
||||||
|
// Compute word prefix fid docids
|
||||||
|
compute_word_prefix_fid_docids(wtxn, index, &modified, &deleted, grenad_parameters)?;
|
||||||
|
// Compute word prefix position docids
|
||||||
|
compute_word_prefix_position_docids(wtxn, index, &modified, &deleted, grenad_parameters)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tracing::instrument(level = "trace", skip_all, target = "indexing")]
|
||||||
|
fn compute_word_fst(index: &Index, wtxn: &mut RwTxn) -> Result<Option<PrefixDelta>> {
|
||||||
|
let rtxn = index.read_txn()?;
|
||||||
|
let words_fst = index.words_fst(&rtxn)?;
|
||||||
|
let mut word_fst_builder = WordFstBuilder::new(&words_fst)?;
|
||||||
|
let prefix_settings = index.prefix_settings(&rtxn)?;
|
||||||
|
word_fst_builder.with_prefix_settings(prefix_settings);
|
||||||
|
|
||||||
|
let previous_words = index.word_docids.iter(&rtxn)?.remap_data_type::<Bytes>();
|
||||||
|
let current_words = index.word_docids.iter(wtxn)?.remap_data_type::<Bytes>();
|
||||||
|
for eob in merge_join_by(previous_words, current_words, |lhs, rhs| match (lhs, rhs) {
|
||||||
|
(Ok((l, _)), Ok((r, _))) => l.cmp(r),
|
||||||
|
(Err(_), _) | (_, Err(_)) => Ordering::Equal,
|
||||||
|
}) {
|
||||||
|
match eob {
|
||||||
|
EitherOrBoth::Both(lhs, rhs) => {
|
||||||
|
let (word, lhs_bytes) = lhs?;
|
||||||
|
let (_, rhs_bytes) = rhs?;
|
||||||
|
if lhs_bytes != rhs_bytes {
|
||||||
|
word_fst_builder.register_word(DelAdd::Addition, word.as_ref())?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
EitherOrBoth::Left(result) => {
|
||||||
|
let (word, _) = result?;
|
||||||
|
word_fst_builder.register_word(DelAdd::Deletion, word.as_ref())?;
|
||||||
|
}
|
||||||
|
EitherOrBoth::Right(result) => {
|
||||||
|
let (word, _) = result?;
|
||||||
|
word_fst_builder.register_word(DelAdd::Addition, word.as_ref())?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let (word_fst_mmap, prefix_data) = word_fst_builder.build(index, &rtxn)?;
|
||||||
|
index.main.remap_types::<Str, Bytes>().put(wtxn, WORDS_FST_KEY, &word_fst_mmap)?;
|
||||||
|
if let Some(PrefixData { prefixes_fst_mmap, prefix_delta }) = prefix_data {
|
||||||
|
index.main.remap_types::<Str, Bytes>().put(
|
||||||
|
wtxn,
|
||||||
|
WORDS_PREFIXES_FST_KEY,
|
||||||
|
&prefixes_fst_mmap,
|
||||||
|
)?;
|
||||||
|
Ok(Some(prefix_delta))
|
||||||
|
} else {
|
||||||
|
Ok(None)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tracing::instrument(level = "trace", skip_all, target = "indexing::facet_search")]
|
||||||
|
fn compute_facet_search_database(
|
||||||
|
index: &Index,
|
||||||
|
wtxn: &mut RwTxn,
|
||||||
|
global_fields_ids_map: GlobalFieldsIdsMap,
|
||||||
|
) -> Result<()> {
|
||||||
|
let rtxn = index.read_txn()?;
|
||||||
|
let localized_attributes_rules = index.localized_attributes_rules(&rtxn)?;
|
||||||
|
let mut facet_search_builder = FacetSearchBuilder::new(
|
||||||
|
global_fields_ids_map,
|
||||||
|
localized_attributes_rules.unwrap_or_default(),
|
||||||
|
);
|
||||||
|
|
||||||
|
let previous_facet_id_string_docids = index
|
||||||
|
.facet_id_string_docids
|
||||||
|
.iter(&rtxn)?
|
||||||
|
.remap_data_type::<DecodeIgnore>()
|
||||||
|
.filter(|r| r.as_ref().map_or(true, |(k, _)| k.level == 0));
|
||||||
|
let current_facet_id_string_docids = index
|
||||||
|
.facet_id_string_docids
|
||||||
|
.iter(wtxn)?
|
||||||
|
.remap_data_type::<DecodeIgnore>()
|
||||||
|
.filter(|r| r.as_ref().map_or(true, |(k, _)| k.level == 0));
|
||||||
|
for eob in merge_join_by(
|
||||||
|
previous_facet_id_string_docids,
|
||||||
|
current_facet_id_string_docids,
|
||||||
|
|lhs, rhs| match (lhs, rhs) {
|
||||||
|
(Ok((l, _)), Ok((r, _))) => l.cmp(r),
|
||||||
|
(Err(_), _) | (_, Err(_)) => Ordering::Equal,
|
||||||
|
},
|
||||||
|
) {
|
||||||
|
match eob {
|
||||||
|
EitherOrBoth::Both(lhs, rhs) => {
|
||||||
|
let (_, _) = lhs?;
|
||||||
|
let (_, _) = rhs?;
|
||||||
|
}
|
||||||
|
EitherOrBoth::Left(result) => {
|
||||||
|
let (key, _) = result?;
|
||||||
|
facet_search_builder.register_from_key(DelAdd::Deletion, key)?;
|
||||||
|
}
|
||||||
|
EitherOrBoth::Right(result) => {
|
||||||
|
let (key, _) = result?;
|
||||||
|
facet_search_builder.register_from_key(DelAdd::Addition, key)?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
facet_search_builder.merge_and_write(index, wtxn, &rtxn)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tracing::instrument(level = "trace", skip_all, target = "indexing::facet_field_ids")]
|
||||||
|
fn compute_facet_level_database(
|
||||||
|
index: &Index,
|
||||||
|
wtxn: &mut RwTxn,
|
||||||
|
facet_field_ids_delta: FacetFieldIdsDelta,
|
||||||
|
) -> Result<()> {
|
||||||
|
if let Some(modified_facet_string_ids) = facet_field_ids_delta.modified_facet_string_ids() {
|
||||||
|
let span = tracing::trace_span!(target: "indexing::facet_field_ids", "string");
|
||||||
|
let _entered = span.enter();
|
||||||
|
FacetsUpdateBulk::new_not_updating_level_0(
|
||||||
|
index,
|
||||||
|
modified_facet_string_ids,
|
||||||
|
FacetType::String,
|
||||||
|
)
|
||||||
|
.execute(wtxn)?;
|
||||||
|
}
|
||||||
|
if let Some(modified_facet_number_ids) = facet_field_ids_delta.modified_facet_number_ids() {
|
||||||
|
let span = tracing::trace_span!(target: "indexing::facet_field_ids", "number");
|
||||||
|
let _entered = span.enter();
|
||||||
|
FacetsUpdateBulk::new_not_updating_level_0(
|
||||||
|
index,
|
||||||
|
modified_facet_number_ids,
|
||||||
|
FacetType::Number,
|
||||||
|
)
|
||||||
|
.execute(wtxn)?;
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
@ -12,6 +12,7 @@ use crate::progress::{AtomicDocumentStep, Progress};
|
|||||||
use crate::update::new::parallel_iterator_ext::ParallelIteratorExt as _;
|
use crate::update::new::parallel_iterator_ext::ParallelIteratorExt as _;
|
||||||
use crate::update::new::steps::IndexingStep;
|
use crate::update::new::steps::IndexingStep;
|
||||||
use crate::update::new::thread_local::{FullySend, MostlySend, ThreadLocal};
|
use crate::update::new::thread_local::{FullySend, MostlySend, ThreadLocal};
|
||||||
|
use crate::update::GrenadParameters;
|
||||||
use crate::{FieldsIdsMap, GlobalFieldsIdsMap, Index, InternalError, Result};
|
use crate::{FieldsIdsMap, GlobalFieldsIdsMap, Index, InternalError, Result};
|
||||||
|
|
||||||
pub struct DocumentChangeContext<
|
pub struct DocumentChangeContext<
|
||||||
@ -145,6 +146,7 @@ pub struct IndexingContext<
|
|||||||
pub fields_ids_map_store: &'indexer ThreadLocal<FullySend<RefCell<GlobalFieldsIdsMap<'fid>>>>,
|
pub fields_ids_map_store: &'indexer ThreadLocal<FullySend<RefCell<GlobalFieldsIdsMap<'fid>>>>,
|
||||||
pub must_stop_processing: &'indexer MSP,
|
pub must_stop_processing: &'indexer MSP,
|
||||||
pub progress: &'indexer Progress,
|
pub progress: &'indexer Progress,
|
||||||
|
pub grenad_parameters: &'indexer GrenadParameters,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<
|
impl<
|
||||||
@ -207,6 +209,7 @@ pub fn extract<
|
|||||||
fields_ids_map_store,
|
fields_ids_map_store,
|
||||||
must_stop_processing,
|
must_stop_processing,
|
||||||
progress,
|
progress,
|
||||||
|
grenad_parameters: _,
|
||||||
}: IndexingContext<'fid, 'indexer, 'index, MSP>,
|
}: IndexingContext<'fid, 'indexer, 'index, MSP>,
|
||||||
extractor_allocs: &'extractor mut ThreadLocal<FullySend<Bump>>,
|
extractor_allocs: &'extractor mut ThreadLocal<FullySend<Bump>>,
|
||||||
datastore: &'data ThreadLocal<EX::Data>,
|
datastore: &'data ThreadLocal<EX::Data>,
|
||||||
|
@ -166,6 +166,7 @@ mod test {
|
|||||||
fields_ids_map_store: &fields_ids_map_store,
|
fields_ids_map_store: &fields_ids_map_store,
|
||||||
must_stop_processing: &(|| false),
|
must_stop_processing: &(|| false),
|
||||||
progress: &Progress::default(),
|
progress: &Progress::default(),
|
||||||
|
grenad_parameters: &Default::default(),
|
||||||
};
|
};
|
||||||
|
|
||||||
for _ in 0..3 {
|
for _ in 0..3 {
|
||||||
|
@ -13,7 +13,7 @@ use serde_json::Deserializer;
|
|||||||
|
|
||||||
use super::super::document_change::DocumentChange;
|
use super::super::document_change::DocumentChange;
|
||||||
use super::document_changes::{DocumentChangeContext, DocumentChanges};
|
use super::document_changes::{DocumentChangeContext, DocumentChanges};
|
||||||
use super::retrieve_or_guess_primary_key;
|
use super::guess_primary_key::retrieve_or_guess_primary_key;
|
||||||
use crate::documents::PrimaryKey;
|
use crate::documents::PrimaryKey;
|
||||||
use crate::progress::{AtomicPayloadStep, Progress};
|
use crate::progress::{AtomicPayloadStep, Progress};
|
||||||
use crate::update::new::document::Versions;
|
use crate::update::new::document::Versions;
|
||||||
|
309
crates/milli/src/update/new/indexer/extract.rs
Normal file
309
crates/milli/src/update/new/indexer/extract.rs
Normal file
@ -0,0 +1,309 @@
|
|||||||
|
use std::collections::BTreeMap;
|
||||||
|
use std::sync::atomic::AtomicBool;
|
||||||
|
use std::sync::OnceLock;
|
||||||
|
|
||||||
|
use bumpalo::Bump;
|
||||||
|
use roaring::RoaringBitmap;
|
||||||
|
use tracing::Span;
|
||||||
|
|
||||||
|
use super::super::channel::*;
|
||||||
|
use super::super::extract::*;
|
||||||
|
use super::super::steps::IndexingStep;
|
||||||
|
use super::super::thread_local::{FullySend, ThreadLocal};
|
||||||
|
use super::super::FacetFieldIdsDelta;
|
||||||
|
use super::document_changes::{extract, DocumentChanges, IndexingContext};
|
||||||
|
use crate::index::IndexEmbeddingConfig;
|
||||||
|
use crate::proximity::ProximityPrecision;
|
||||||
|
use crate::update::new::extract::EmbeddingExtractor;
|
||||||
|
use crate::update::new::merger::merge_and_send_rtree;
|
||||||
|
use crate::update::new::{merge_and_send_docids, merge_and_send_facet_docids, FacetDatabases};
|
||||||
|
use crate::vector::EmbeddingConfigs;
|
||||||
|
use crate::{Result, ThreadPoolNoAbort, ThreadPoolNoAbortBuilder};
|
||||||
|
|
||||||
|
#[allow(clippy::too_many_arguments)]
|
||||||
|
pub(super) fn extract_all<'pl, 'extractor, DC, MSP>(
|
||||||
|
document_changes: &DC,
|
||||||
|
indexing_context: IndexingContext<MSP>,
|
||||||
|
indexer_span: Span,
|
||||||
|
extractor_sender: ExtractorBbqueueSender,
|
||||||
|
embedders: &EmbeddingConfigs,
|
||||||
|
extractor_allocs: &'extractor mut ThreadLocal<FullySend<Bump>>,
|
||||||
|
finished_extraction: &AtomicBool,
|
||||||
|
field_distribution: &mut BTreeMap<String, u64>,
|
||||||
|
mut index_embeddings: Vec<IndexEmbeddingConfig>,
|
||||||
|
document_ids: &mut RoaringBitmap,
|
||||||
|
) -> Result<(FacetFieldIdsDelta, Vec<IndexEmbeddingConfig>)>
|
||||||
|
where
|
||||||
|
DC: DocumentChanges<'pl>,
|
||||||
|
MSP: Fn() -> bool + Sync,
|
||||||
|
{
|
||||||
|
let span =
|
||||||
|
tracing::trace_span!(target: "indexing::documents", parent: &indexer_span, "extract");
|
||||||
|
let _entered = span.enter();
|
||||||
|
|
||||||
|
let index = indexing_context.index;
|
||||||
|
let rtxn = index.read_txn()?;
|
||||||
|
|
||||||
|
// document but we need to create a function that collects and compresses documents.
|
||||||
|
let document_sender = extractor_sender.documents();
|
||||||
|
let document_extractor = DocumentsExtractor::new(document_sender, embedders);
|
||||||
|
let datastore = ThreadLocal::with_capacity(rayon::current_num_threads());
|
||||||
|
{
|
||||||
|
let span = tracing::trace_span!(target: "indexing::documents::extract", parent: &indexer_span, "documents");
|
||||||
|
let _entered = span.enter();
|
||||||
|
extract(
|
||||||
|
document_changes,
|
||||||
|
&document_extractor,
|
||||||
|
indexing_context,
|
||||||
|
extractor_allocs,
|
||||||
|
&datastore,
|
||||||
|
IndexingStep::ExtractingDocuments,
|
||||||
|
)?;
|
||||||
|
}
|
||||||
|
{
|
||||||
|
let span = tracing::trace_span!(target: "indexing::documents::merge", parent: &indexer_span, "documents");
|
||||||
|
let _entered = span.enter();
|
||||||
|
for document_extractor_data in datastore {
|
||||||
|
let document_extractor_data = document_extractor_data.0.into_inner();
|
||||||
|
for (field, delta) in document_extractor_data.field_distribution_delta {
|
||||||
|
let current = field_distribution.entry(field).or_default();
|
||||||
|
// adding the delta should never cause a negative result, as we are removing fields that previously existed.
|
||||||
|
*current = current.saturating_add_signed(delta);
|
||||||
|
}
|
||||||
|
document_extractor_data.docids_delta.apply_to(document_ids);
|
||||||
|
}
|
||||||
|
|
||||||
|
field_distribution.retain(|_, v| *v != 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
let facet_field_ids_delta;
|
||||||
|
|
||||||
|
{
|
||||||
|
let caches = {
|
||||||
|
let span = tracing::trace_span!(target: "indexing::documents::extract", parent: &indexer_span, "faceted");
|
||||||
|
let _entered = span.enter();
|
||||||
|
|
||||||
|
FacetedDocidsExtractor::run_extraction(
|
||||||
|
document_changes,
|
||||||
|
indexing_context,
|
||||||
|
extractor_allocs,
|
||||||
|
&extractor_sender.field_id_docid_facet_sender(),
|
||||||
|
IndexingStep::ExtractingFacets,
|
||||||
|
)?
|
||||||
|
};
|
||||||
|
|
||||||
|
{
|
||||||
|
let span = tracing::trace_span!(target: "indexing::documents::merge", parent: &indexer_span, "faceted");
|
||||||
|
let _entered = span.enter();
|
||||||
|
|
||||||
|
facet_field_ids_delta = merge_and_send_facet_docids(
|
||||||
|
caches,
|
||||||
|
FacetDatabases::new(index),
|
||||||
|
index,
|
||||||
|
extractor_sender.facet_docids(),
|
||||||
|
)?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
let WordDocidsCaches {
|
||||||
|
word_docids,
|
||||||
|
word_fid_docids,
|
||||||
|
exact_word_docids,
|
||||||
|
word_position_docids,
|
||||||
|
fid_word_count_docids,
|
||||||
|
} = {
|
||||||
|
let span = tracing::trace_span!(target: "indexing::documents::extract", "word_docids");
|
||||||
|
let _entered = span.enter();
|
||||||
|
|
||||||
|
WordDocidsExtractors::run_extraction(
|
||||||
|
document_changes,
|
||||||
|
indexing_context,
|
||||||
|
extractor_allocs,
|
||||||
|
IndexingStep::ExtractingWords,
|
||||||
|
)?
|
||||||
|
};
|
||||||
|
|
||||||
|
{
|
||||||
|
let span = tracing::trace_span!(target: "indexing::documents::merge", "word_docids");
|
||||||
|
let _entered = span.enter();
|
||||||
|
merge_and_send_docids(
|
||||||
|
word_docids,
|
||||||
|
index.word_docids.remap_types(),
|
||||||
|
index,
|
||||||
|
extractor_sender.docids::<WordDocids>(),
|
||||||
|
&indexing_context.must_stop_processing,
|
||||||
|
)?;
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
let span =
|
||||||
|
tracing::trace_span!(target: "indexing::documents::merge", "word_fid_docids");
|
||||||
|
let _entered = span.enter();
|
||||||
|
merge_and_send_docids(
|
||||||
|
word_fid_docids,
|
||||||
|
index.word_fid_docids.remap_types(),
|
||||||
|
index,
|
||||||
|
extractor_sender.docids::<WordFidDocids>(),
|
||||||
|
&indexing_context.must_stop_processing,
|
||||||
|
)?;
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
let span =
|
||||||
|
tracing::trace_span!(target: "indexing::documents::merge", "exact_word_docids");
|
||||||
|
let _entered = span.enter();
|
||||||
|
merge_and_send_docids(
|
||||||
|
exact_word_docids,
|
||||||
|
index.exact_word_docids.remap_types(),
|
||||||
|
index,
|
||||||
|
extractor_sender.docids::<ExactWordDocids>(),
|
||||||
|
&indexing_context.must_stop_processing,
|
||||||
|
)?;
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
let span =
|
||||||
|
tracing::trace_span!(target: "indexing::documents::merge", "word_position_docids");
|
||||||
|
let _entered = span.enter();
|
||||||
|
merge_and_send_docids(
|
||||||
|
word_position_docids,
|
||||||
|
index.word_position_docids.remap_types(),
|
||||||
|
index,
|
||||||
|
extractor_sender.docids::<WordPositionDocids>(),
|
||||||
|
&indexing_context.must_stop_processing,
|
||||||
|
)?;
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
let span =
|
||||||
|
tracing::trace_span!(target: "indexing::documents::merge", "fid_word_count_docids");
|
||||||
|
let _entered = span.enter();
|
||||||
|
merge_and_send_docids(
|
||||||
|
fid_word_count_docids,
|
||||||
|
index.field_id_word_count_docids.remap_types(),
|
||||||
|
index,
|
||||||
|
extractor_sender.docids::<FidWordCountDocids>(),
|
||||||
|
&indexing_context.must_stop_processing,
|
||||||
|
)?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// run the proximity extraction only if the precision is by word
|
||||||
|
// this works only if the settings didn't change during this transaction.
|
||||||
|
let proximity_precision = index.proximity_precision(&rtxn)?.unwrap_or_default();
|
||||||
|
if proximity_precision == ProximityPrecision::ByWord {
|
||||||
|
let caches = {
|
||||||
|
let span = tracing::trace_span!(target: "indexing::documents::extract", "word_pair_proximity_docids");
|
||||||
|
let _entered = span.enter();
|
||||||
|
|
||||||
|
<WordPairProximityDocidsExtractor as DocidsExtractor>::run_extraction(
|
||||||
|
document_changes,
|
||||||
|
indexing_context,
|
||||||
|
extractor_allocs,
|
||||||
|
IndexingStep::ExtractingWordProximity,
|
||||||
|
)?
|
||||||
|
};
|
||||||
|
|
||||||
|
{
|
||||||
|
let span = tracing::trace_span!(target: "indexing::documents::merge", "word_pair_proximity_docids");
|
||||||
|
let _entered = span.enter();
|
||||||
|
|
||||||
|
merge_and_send_docids(
|
||||||
|
caches,
|
||||||
|
index.word_pair_proximity_docids.remap_types(),
|
||||||
|
index,
|
||||||
|
extractor_sender.docids::<WordPairProximityDocids>(),
|
||||||
|
&indexing_context.must_stop_processing,
|
||||||
|
)?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
'vectors: {
|
||||||
|
if index_embeddings.is_empty() {
|
||||||
|
break 'vectors;
|
||||||
|
}
|
||||||
|
|
||||||
|
let embedding_sender = extractor_sender.embeddings();
|
||||||
|
let extractor = EmbeddingExtractor::new(
|
||||||
|
embedders,
|
||||||
|
embedding_sender,
|
||||||
|
field_distribution,
|
||||||
|
request_threads(),
|
||||||
|
);
|
||||||
|
let mut datastore = ThreadLocal::with_capacity(rayon::current_num_threads());
|
||||||
|
{
|
||||||
|
let span = tracing::trace_span!(target: "indexing::documents::extract", "vectors");
|
||||||
|
let _entered = span.enter();
|
||||||
|
|
||||||
|
extract(
|
||||||
|
document_changes,
|
||||||
|
&extractor,
|
||||||
|
indexing_context,
|
||||||
|
extractor_allocs,
|
||||||
|
&datastore,
|
||||||
|
IndexingStep::ExtractingEmbeddings,
|
||||||
|
)?;
|
||||||
|
}
|
||||||
|
{
|
||||||
|
let span = tracing::trace_span!(target: "indexing::documents::merge", "vectors");
|
||||||
|
let _entered = span.enter();
|
||||||
|
|
||||||
|
for config in &mut index_embeddings {
|
||||||
|
'data: for data in datastore.iter_mut() {
|
||||||
|
let data = &mut data.get_mut().0;
|
||||||
|
let Some(deladd) = data.remove(&config.name) else {
|
||||||
|
continue 'data;
|
||||||
|
};
|
||||||
|
deladd.apply_to(&mut config.user_provided);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
'geo: {
|
||||||
|
let Some(extractor) = GeoExtractor::new(&rtxn, index, *indexing_context.grenad_parameters)?
|
||||||
|
else {
|
||||||
|
break 'geo;
|
||||||
|
};
|
||||||
|
let datastore = ThreadLocal::with_capacity(rayon::current_num_threads());
|
||||||
|
|
||||||
|
{
|
||||||
|
let span = tracing::trace_span!(target: "indexing::documents::extract", "geo");
|
||||||
|
let _entered = span.enter();
|
||||||
|
|
||||||
|
extract(
|
||||||
|
document_changes,
|
||||||
|
&extractor,
|
||||||
|
indexing_context,
|
||||||
|
extractor_allocs,
|
||||||
|
&datastore,
|
||||||
|
IndexingStep::WritingGeoPoints,
|
||||||
|
)?;
|
||||||
|
}
|
||||||
|
|
||||||
|
merge_and_send_rtree(
|
||||||
|
datastore,
|
||||||
|
&rtxn,
|
||||||
|
index,
|
||||||
|
extractor_sender.geo(),
|
||||||
|
&indexing_context.must_stop_processing,
|
||||||
|
)?;
|
||||||
|
}
|
||||||
|
indexing_context.progress.update_progress(IndexingStep::WritingToDatabase);
|
||||||
|
finished_extraction.store(true, std::sync::atomic::Ordering::Relaxed);
|
||||||
|
|
||||||
|
Result::Ok((facet_field_ids_delta, index_embeddings))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn request_threads() -> &'static ThreadPoolNoAbort {
|
||||||
|
static REQUEST_THREADS: OnceLock<ThreadPoolNoAbort> = OnceLock::new();
|
||||||
|
|
||||||
|
REQUEST_THREADS.get_or_init(|| {
|
||||||
|
ThreadPoolNoAbortBuilder::new()
|
||||||
|
.num_threads(crate::vector::REQUEST_PARALLELISM)
|
||||||
|
.thread_name(|index| format!("embedding-request-{index}"))
|
||||||
|
.build()
|
||||||
|
.unwrap()
|
||||||
|
})
|
||||||
|
}
|
85
crates/milli/src/update/new/indexer/guess_primary_key.rs
Normal file
85
crates/milli/src/update/new/indexer/guess_primary_key.rs
Normal file
@ -0,0 +1,85 @@
|
|||||||
|
use bumparaw_collections::RawMap;
|
||||||
|
use heed::RoTxn;
|
||||||
|
use rustc_hash::FxBuildHasher;
|
||||||
|
|
||||||
|
use crate::documents::{PrimaryKey, DEFAULT_PRIMARY_KEY};
|
||||||
|
use crate::update::new::StdResult;
|
||||||
|
use crate::{FieldsIdsMap, Index, Result, UserError};
|
||||||
|
|
||||||
|
/// Returns the primary key that has already been set for this index or the
|
||||||
|
/// one we will guess by searching for the first key that contains "id" as a substring,
|
||||||
|
/// and whether the primary key changed
|
||||||
|
pub fn retrieve_or_guess_primary_key<'a>(
|
||||||
|
rtxn: &'a RoTxn<'a>,
|
||||||
|
index: &Index,
|
||||||
|
new_fields_ids_map: &mut FieldsIdsMap,
|
||||||
|
primary_key_from_op: Option<&'a str>,
|
||||||
|
first_document: Option<RawMap<'a, FxBuildHasher>>,
|
||||||
|
) -> Result<StdResult<(PrimaryKey<'a>, bool), UserError>> {
|
||||||
|
// make sure that we have a declared primary key, either fetching it from the index or attempting to guess it.
|
||||||
|
|
||||||
|
// do we have an existing declared primary key?
|
||||||
|
let (primary_key, has_changed) = if let Some(primary_key_from_db) = index.primary_key(rtxn)? {
|
||||||
|
// did we request a primary key in the operation?
|
||||||
|
match primary_key_from_op {
|
||||||
|
// we did, and it is different from the DB one
|
||||||
|
Some(primary_key_from_op) if primary_key_from_op != primary_key_from_db => {
|
||||||
|
return Ok(Err(UserError::PrimaryKeyCannotBeChanged(
|
||||||
|
primary_key_from_db.to_string(),
|
||||||
|
)));
|
||||||
|
}
|
||||||
|
_ => (primary_key_from_db, false),
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// no primary key in the DB => let's set one
|
||||||
|
// did we request a primary key in the operation?
|
||||||
|
let primary_key = if let Some(primary_key_from_op) = primary_key_from_op {
|
||||||
|
// set primary key from operation
|
||||||
|
primary_key_from_op
|
||||||
|
} else {
|
||||||
|
// guess primary key
|
||||||
|
let first_document = match first_document {
|
||||||
|
Some(document) => document,
|
||||||
|
// previous indexer when no pk is set + we send an empty payload => index_primary_key_no_candidate_found
|
||||||
|
None => return Ok(Err(UserError::NoPrimaryKeyCandidateFound)),
|
||||||
|
};
|
||||||
|
|
||||||
|
let guesses: Result<Vec<&str>> = first_document
|
||||||
|
.keys()
|
||||||
|
.filter_map(|name| {
|
||||||
|
let Some(_) = new_fields_ids_map.insert(name) else {
|
||||||
|
return Some(Err(UserError::AttributeLimitReached.into()));
|
||||||
|
};
|
||||||
|
name.to_lowercase().ends_with(DEFAULT_PRIMARY_KEY).then_some(Ok(name))
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
let mut guesses = guesses?;
|
||||||
|
|
||||||
|
// sort the keys in lexicographical order, so that fields are always in the same order.
|
||||||
|
guesses.sort_unstable();
|
||||||
|
|
||||||
|
match guesses.as_slice() {
|
||||||
|
[] => return Ok(Err(UserError::NoPrimaryKeyCandidateFound)),
|
||||||
|
[name] => {
|
||||||
|
tracing::info!("Primary key was not specified in index. Inferred to '{name}'");
|
||||||
|
*name
|
||||||
|
}
|
||||||
|
multiple => {
|
||||||
|
return Ok(Err(UserError::MultiplePrimaryKeyCandidatesFound {
|
||||||
|
candidates: multiple
|
||||||
|
.iter()
|
||||||
|
.map(|candidate| candidate.to_string())
|
||||||
|
.collect(),
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
(primary_key, true)
|
||||||
|
};
|
||||||
|
|
||||||
|
match PrimaryKey::new_or_insert(primary_key, new_fields_ids_map) {
|
||||||
|
Ok(primary_key) => Ok(Ok((primary_key, has_changed))),
|
||||||
|
Err(err) => Ok(Err(err)),
|
||||||
|
}
|
||||||
|
}
|
@ -1,59 +1,37 @@
|
|||||||
use std::cmp::Ordering;
|
|
||||||
use std::sync::atomic::AtomicBool;
|
use std::sync::atomic::AtomicBool;
|
||||||
use std::sync::{OnceLock, RwLock};
|
use std::sync::RwLock;
|
||||||
use std::thread::{self, Builder};
|
use std::thread::{self, Builder};
|
||||||
|
|
||||||
use big_s::S;
|
use big_s::S;
|
||||||
use bumparaw_collections::RawMap;
|
use document_changes::{DocumentChanges, IndexingContext};
|
||||||
use document_changes::{extract, DocumentChanges, IndexingContext};
|
|
||||||
pub use document_deletion::DocumentDeletion;
|
pub use document_deletion::DocumentDeletion;
|
||||||
pub use document_operation::{DocumentOperation, PayloadStats};
|
pub use document_operation::{DocumentOperation, PayloadStats};
|
||||||
use hashbrown::HashMap;
|
use hashbrown::HashMap;
|
||||||
use heed::types::{Bytes, DecodeIgnore, Str};
|
use heed::RwTxn;
|
||||||
use heed::{RoTxn, RwTxn};
|
|
||||||
use itertools::{merge_join_by, EitherOrBoth};
|
|
||||||
pub use partial_dump::PartialDump;
|
pub use partial_dump::PartialDump;
|
||||||
use rand::SeedableRng as _;
|
|
||||||
use rustc_hash::FxBuildHasher;
|
|
||||||
use time::OffsetDateTime;
|
|
||||||
pub use update_by_function::UpdateByFunction;
|
pub use update_by_function::UpdateByFunction;
|
||||||
|
use write::{build_vectors, update_index, write_to_db};
|
||||||
|
|
||||||
use super::channel::*;
|
use super::channel::*;
|
||||||
use super::extract::*;
|
|
||||||
use super::facet_search_builder::FacetSearchBuilder;
|
|
||||||
use super::merger::FacetFieldIdsDelta;
|
|
||||||
use super::steps::IndexingStep;
|
use super::steps::IndexingStep;
|
||||||
use super::thread_local::ThreadLocal;
|
use super::thread_local::ThreadLocal;
|
||||||
use super::word_fst_builder::{PrefixData, PrefixDelta, WordFstBuilder};
|
use crate::documents::PrimaryKey;
|
||||||
use super::words_prefix_docids::{
|
|
||||||
compute_word_prefix_docids, compute_word_prefix_fid_docids, compute_word_prefix_position_docids,
|
|
||||||
};
|
|
||||||
use super::StdResult;
|
|
||||||
use crate::documents::{PrimaryKey, DEFAULT_PRIMARY_KEY};
|
|
||||||
use crate::facet::FacetType;
|
|
||||||
use crate::fields_ids_map::metadata::{FieldIdMapWithMetadata, MetadataBuilder};
|
use crate::fields_ids_map::metadata::{FieldIdMapWithMetadata, MetadataBuilder};
|
||||||
use crate::index::main_key::{WORDS_FST_KEY, WORDS_PREFIXES_FST_KEY};
|
|
||||||
use crate::progress::Progress;
|
use crate::progress::Progress;
|
||||||
use crate::proximity::ProximityPrecision;
|
use crate::update::GrenadParameters;
|
||||||
use crate::update::del_add::DelAdd;
|
use crate::vector::{ArroyWrapper, EmbeddingConfigs};
|
||||||
use crate::update::new::extract::EmbeddingExtractor;
|
use crate::{FieldsIdsMap, GlobalFieldsIdsMap, Index, InternalError, Result, ThreadPoolNoAbort};
|
||||||
use crate::update::new::merger::merge_and_send_rtree;
|
|
||||||
use crate::update::new::words_prefix_docids::compute_exact_word_prefix_docids;
|
|
||||||
use crate::update::new::{merge_and_send_docids, merge_and_send_facet_docids, FacetDatabases};
|
|
||||||
use crate::update::settings::InnerIndexSettings;
|
|
||||||
use crate::update::{FacetsUpdateBulk, GrenadParameters};
|
|
||||||
use crate::vector::{ArroyWrapper, EmbeddingConfigs, Embeddings};
|
|
||||||
use crate::{
|
|
||||||
Error, FieldsIdsMap, GlobalFieldsIdsMap, Index, InternalError, Result, ThreadPoolNoAbort,
|
|
||||||
ThreadPoolNoAbortBuilder, UserError,
|
|
||||||
};
|
|
||||||
|
|
||||||
|
mod compute;
|
||||||
pub(crate) mod de;
|
pub(crate) mod de;
|
||||||
pub mod document_changes;
|
pub mod document_changes;
|
||||||
mod document_deletion;
|
mod document_deletion;
|
||||||
mod document_operation;
|
mod document_operation;
|
||||||
|
mod extract;
|
||||||
|
mod guess_primary_key;
|
||||||
mod partial_dump;
|
mod partial_dump;
|
||||||
mod update_by_function;
|
mod update_by_function;
|
||||||
|
mod write;
|
||||||
|
|
||||||
/// This is the main function of this crate.
|
/// This is the main function of this crate.
|
||||||
///
|
///
|
||||||
@ -107,7 +85,7 @@ where
|
|||||||
},
|
},
|
||||||
);
|
);
|
||||||
|
|
||||||
let (extractor_sender, mut writer_receiver) = pool
|
let (extractor_sender, writer_receiver) = pool
|
||||||
.install(|| extractor_writer_bbqueue(&mut bbbuffers, total_bbbuffer_capacity, 1000))
|
.install(|| extractor_writer_bbqueue(&mut bbbuffers, total_bbbuffer_capacity, 1000))
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
@ -126,9 +104,10 @@ where
|
|||||||
fields_ids_map_store: &fields_ids_map_store,
|
fields_ids_map_store: &fields_ids_map_store,
|
||||||
must_stop_processing,
|
must_stop_processing,
|
||||||
progress,
|
progress,
|
||||||
|
grenad_parameters: &grenad_parameters,
|
||||||
};
|
};
|
||||||
|
|
||||||
let mut index_embeddings = index.embedding_configs(wtxn)?;
|
let index_embeddings = index.embedding_configs(wtxn)?;
|
||||||
let mut field_distribution = index.field_distribution(wtxn)?;
|
let mut field_distribution = index.field_distribution(wtxn)?;
|
||||||
let mut document_ids = index.documents_ids(wtxn)?;
|
let mut document_ids = index.documents_ids(wtxn)?;
|
||||||
|
|
||||||
@ -139,261 +118,28 @@ where
|
|||||||
// prevent moving the field_distribution and document_ids in the inner closure...
|
// prevent moving the field_distribution and document_ids in the inner closure...
|
||||||
let field_distribution = &mut field_distribution;
|
let field_distribution = &mut field_distribution;
|
||||||
let document_ids = &mut document_ids;
|
let document_ids = &mut document_ids;
|
||||||
let extractor_handle = Builder::new().name(S("indexer-extractors")).spawn_scoped(s, move || {
|
let extractor_handle =
|
||||||
pool.install(move || {
|
Builder::new().name(S("indexer-extractors")).spawn_scoped(s, move || {
|
||||||
let span = tracing::trace_span!(target: "indexing::documents", parent: &indexer_span, "extract");
|
pool.install(move || {
|
||||||
let _entered = span.enter();
|
extract::extract_all(
|
||||||
|
|
||||||
let rtxn = index.read_txn()?;
|
|
||||||
|
|
||||||
// document but we need to create a function that collects and compresses documents.
|
|
||||||
let document_sender = extractor_sender.documents();
|
|
||||||
let document_extractor = DocumentsExtractor::new(document_sender, embedders);
|
|
||||||
let datastore = ThreadLocal::with_capacity(rayon::current_num_threads());
|
|
||||||
{
|
|
||||||
let span = tracing::trace_span!(target: "indexing::documents::extract", parent: &indexer_span, "documents");
|
|
||||||
let _entered = span.enter();
|
|
||||||
extract(
|
|
||||||
document_changes,
|
document_changes,
|
||||||
&document_extractor,
|
|
||||||
indexing_context,
|
indexing_context,
|
||||||
|
indexer_span,
|
||||||
|
extractor_sender,
|
||||||
|
embedders,
|
||||||
&mut extractor_allocs,
|
&mut extractor_allocs,
|
||||||
&datastore,
|
finished_extraction,
|
||||||
IndexingStep::ExtractingDocuments,
|
field_distribution,
|
||||||
)?;
|
index_embeddings,
|
||||||
}
|
document_ids,
|
||||||
{
|
)
|
||||||
let span = tracing::trace_span!(target: "indexing::documents::merge", parent: &indexer_span, "documents");
|
})
|
||||||
let _entered = span.enter();
|
.unwrap()
|
||||||
for document_extractor_data in datastore {
|
})?;
|
||||||
let document_extractor_data = document_extractor_data.0.into_inner();
|
|
||||||
for (field, delta) in document_extractor_data.field_distribution_delta {
|
|
||||||
let current = field_distribution.entry(field).or_default();
|
|
||||||
// adding the delta should never cause a negative result, as we are removing fields that previously existed.
|
|
||||||
*current = current.saturating_add_signed(delta);
|
|
||||||
}
|
|
||||||
document_extractor_data.docids_delta.apply_to(document_ids);
|
|
||||||
}
|
|
||||||
|
|
||||||
field_distribution.retain(|_, v| *v != 0);
|
|
||||||
}
|
|
||||||
|
|
||||||
let facet_field_ids_delta;
|
|
||||||
|
|
||||||
{
|
|
||||||
let caches = {
|
|
||||||
let span = tracing::trace_span!(target: "indexing::documents::extract", parent: &indexer_span, "faceted");
|
|
||||||
let _entered = span.enter();
|
|
||||||
|
|
||||||
FacetedDocidsExtractor::run_extraction(
|
|
||||||
grenad_parameters,
|
|
||||||
document_changes,
|
|
||||||
indexing_context,
|
|
||||||
&mut extractor_allocs,
|
|
||||||
&extractor_sender.field_id_docid_facet_sender(),
|
|
||||||
IndexingStep::ExtractingFacets
|
|
||||||
)?
|
|
||||||
};
|
|
||||||
|
|
||||||
{
|
|
||||||
let span = tracing::trace_span!(target: "indexing::documents::merge", parent: &indexer_span, "faceted");
|
|
||||||
let _entered = span.enter();
|
|
||||||
|
|
||||||
facet_field_ids_delta = merge_and_send_facet_docids(
|
|
||||||
caches,
|
|
||||||
FacetDatabases::new(index),
|
|
||||||
index,
|
|
||||||
extractor_sender.facet_docids(),
|
|
||||||
)?;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
{
|
|
||||||
let WordDocidsCaches {
|
|
||||||
word_docids,
|
|
||||||
word_fid_docids,
|
|
||||||
exact_word_docids,
|
|
||||||
word_position_docids,
|
|
||||||
fid_word_count_docids,
|
|
||||||
} = {
|
|
||||||
let span = tracing::trace_span!(target: "indexing::documents::extract", "word_docids");
|
|
||||||
let _entered = span.enter();
|
|
||||||
|
|
||||||
WordDocidsExtractors::run_extraction(
|
|
||||||
grenad_parameters,
|
|
||||||
document_changes,
|
|
||||||
indexing_context,
|
|
||||||
&mut extractor_allocs,
|
|
||||||
IndexingStep::ExtractingWords
|
|
||||||
)?
|
|
||||||
};
|
|
||||||
|
|
||||||
{
|
|
||||||
let span = tracing::trace_span!(target: "indexing::documents::merge", "word_docids");
|
|
||||||
let _entered = span.enter();
|
|
||||||
merge_and_send_docids(
|
|
||||||
word_docids,
|
|
||||||
index.word_docids.remap_types(),
|
|
||||||
index,
|
|
||||||
extractor_sender.docids::<WordDocids>(),
|
|
||||||
&indexing_context.must_stop_processing,
|
|
||||||
)?;
|
|
||||||
}
|
|
||||||
|
|
||||||
{
|
|
||||||
let span = tracing::trace_span!(target: "indexing::documents::merge", "word_fid_docids");
|
|
||||||
let _entered = span.enter();
|
|
||||||
merge_and_send_docids(
|
|
||||||
word_fid_docids,
|
|
||||||
index.word_fid_docids.remap_types(),
|
|
||||||
index,
|
|
||||||
extractor_sender.docids::<WordFidDocids>(),
|
|
||||||
&indexing_context.must_stop_processing,
|
|
||||||
)?;
|
|
||||||
}
|
|
||||||
|
|
||||||
{
|
|
||||||
let span = tracing::trace_span!(target: "indexing::documents::merge", "exact_word_docids");
|
|
||||||
let _entered = span.enter();
|
|
||||||
merge_and_send_docids(
|
|
||||||
exact_word_docids,
|
|
||||||
index.exact_word_docids.remap_types(),
|
|
||||||
index,
|
|
||||||
extractor_sender.docids::<ExactWordDocids>(),
|
|
||||||
&indexing_context.must_stop_processing,
|
|
||||||
)?;
|
|
||||||
}
|
|
||||||
|
|
||||||
{
|
|
||||||
let span = tracing::trace_span!(target: "indexing::documents::merge", "word_position_docids");
|
|
||||||
let _entered = span.enter();
|
|
||||||
merge_and_send_docids(
|
|
||||||
word_position_docids,
|
|
||||||
index.word_position_docids.remap_types(),
|
|
||||||
index,
|
|
||||||
extractor_sender.docids::<WordPositionDocids>(),
|
|
||||||
&indexing_context.must_stop_processing,
|
|
||||||
)?;
|
|
||||||
}
|
|
||||||
|
|
||||||
{
|
|
||||||
let span = tracing::trace_span!(target: "indexing::documents::merge", "fid_word_count_docids");
|
|
||||||
let _entered = span.enter();
|
|
||||||
merge_and_send_docids(
|
|
||||||
fid_word_count_docids,
|
|
||||||
index.field_id_word_count_docids.remap_types(),
|
|
||||||
index,
|
|
||||||
extractor_sender.docids::<FidWordCountDocids>(),
|
|
||||||
&indexing_context.must_stop_processing,
|
|
||||||
)?;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// run the proximity extraction only if the precision is by word
|
|
||||||
// this works only if the settings didn't change during this transaction.
|
|
||||||
let proximity_precision = index.proximity_precision(&rtxn)?.unwrap_or_default();
|
|
||||||
if proximity_precision == ProximityPrecision::ByWord {
|
|
||||||
let caches = {
|
|
||||||
let span = tracing::trace_span!(target: "indexing::documents::extract", "word_pair_proximity_docids");
|
|
||||||
let _entered = span.enter();
|
|
||||||
|
|
||||||
<WordPairProximityDocidsExtractor as DocidsExtractor>::run_extraction(
|
|
||||||
grenad_parameters,
|
|
||||||
document_changes,
|
|
||||||
indexing_context,
|
|
||||||
&mut extractor_allocs,
|
|
||||||
IndexingStep::ExtractingWordProximity,
|
|
||||||
)?
|
|
||||||
};
|
|
||||||
|
|
||||||
{
|
|
||||||
let span = tracing::trace_span!(target: "indexing::documents::merge", "word_pair_proximity_docids");
|
|
||||||
let _entered = span.enter();
|
|
||||||
|
|
||||||
merge_and_send_docids(
|
|
||||||
caches,
|
|
||||||
index.word_pair_proximity_docids.remap_types(),
|
|
||||||
index,
|
|
||||||
extractor_sender.docids::<WordPairProximityDocids>(),
|
|
||||||
&indexing_context.must_stop_processing,
|
|
||||||
)?;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
'vectors: {
|
|
||||||
if index_embeddings.is_empty() {
|
|
||||||
break 'vectors;
|
|
||||||
}
|
|
||||||
|
|
||||||
let embedding_sender = extractor_sender.embeddings();
|
|
||||||
let extractor = EmbeddingExtractor::new(embedders, embedding_sender, field_distribution, request_threads());
|
|
||||||
let mut datastore = ThreadLocal::with_capacity(rayon::current_num_threads());
|
|
||||||
{
|
|
||||||
let span = tracing::trace_span!(target: "indexing::documents::extract", "vectors");
|
|
||||||
let _entered = span.enter();
|
|
||||||
|
|
||||||
extract(
|
|
||||||
document_changes,
|
|
||||||
&extractor,
|
|
||||||
indexing_context,
|
|
||||||
&mut extractor_allocs,
|
|
||||||
&datastore,
|
|
||||||
IndexingStep::ExtractingEmbeddings,
|
|
||||||
)?;
|
|
||||||
}
|
|
||||||
{
|
|
||||||
let span = tracing::trace_span!(target: "indexing::documents::merge", "vectors");
|
|
||||||
let _entered = span.enter();
|
|
||||||
|
|
||||||
for config in &mut index_embeddings {
|
|
||||||
'data: for data in datastore.iter_mut() {
|
|
||||||
let data = &mut data.get_mut().0;
|
|
||||||
let Some(deladd) = data.remove(&config.name) else { continue 'data; };
|
|
||||||
deladd.apply_to(&mut config.user_provided);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
'geo: {
|
|
||||||
let Some(extractor) = GeoExtractor::new(&rtxn, index, grenad_parameters)? else {
|
|
||||||
break 'geo;
|
|
||||||
};
|
|
||||||
let datastore = ThreadLocal::with_capacity(rayon::current_num_threads());
|
|
||||||
|
|
||||||
{
|
|
||||||
let span = tracing::trace_span!(target: "indexing::documents::extract", "geo");
|
|
||||||
let _entered = span.enter();
|
|
||||||
|
|
||||||
extract(
|
|
||||||
document_changes,
|
|
||||||
&extractor,
|
|
||||||
indexing_context,
|
|
||||||
&mut extractor_allocs,
|
|
||||||
&datastore,
|
|
||||||
IndexingStep::WritingGeoPoints
|
|
||||||
)?;
|
|
||||||
}
|
|
||||||
|
|
||||||
merge_and_send_rtree(
|
|
||||||
datastore,
|
|
||||||
&rtxn,
|
|
||||||
index,
|
|
||||||
extractor_sender.geo(),
|
|
||||||
&indexing_context.must_stop_processing,
|
|
||||||
)?;
|
|
||||||
}
|
|
||||||
indexing_context.progress.update_progress(IndexingStep::WritingToDatabase);
|
|
||||||
finished_extraction.store(true, std::sync::atomic::Ordering::Relaxed);
|
|
||||||
|
|
||||||
Result::Ok((facet_field_ids_delta, index_embeddings))
|
|
||||||
}).unwrap()
|
|
||||||
})?;
|
|
||||||
|
|
||||||
let global_fields_ids_map = GlobalFieldsIdsMap::new(&new_fields_ids_map);
|
let global_fields_ids_map = GlobalFieldsIdsMap::new(&new_fields_ids_map);
|
||||||
|
|
||||||
let vector_arroy = index.vector_arroy;
|
let vector_arroy = index.vector_arroy;
|
||||||
let indexer_span = tracing::Span::current();
|
|
||||||
let arroy_writers: Result<HashMap<_, _>> = embedders
|
let arroy_writers: Result<HashMap<_, _>> = embedders
|
||||||
.inner_as_ref()
|
.inner_as_ref()
|
||||||
.iter()
|
.iter()
|
||||||
@ -415,114 +161,25 @@ where
|
|||||||
})
|
})
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
// Used by by the ArroySetVector to copy the embedding into an
|
|
||||||
// aligned memory area, required by arroy to accept a new vector.
|
|
||||||
let mut aligned_embedding = Vec::new();
|
|
||||||
let mut arroy_writers = arroy_writers?;
|
let mut arroy_writers = arroy_writers?;
|
||||||
|
|
||||||
{
|
write_to_db(writer_receiver, finished_extraction, index, wtxn, &arroy_writers)?;
|
||||||
let span = tracing::trace_span!(target: "indexing::write_db", "all");
|
|
||||||
let _entered = span.enter();
|
|
||||||
|
|
||||||
let span = tracing::trace_span!(target: "indexing::write_db", "post_merge");
|
|
||||||
let mut _entered_post_merge = None;
|
|
||||||
|
|
||||||
while let Some(action) = writer_receiver.recv_action() {
|
|
||||||
if _entered_post_merge.is_none()
|
|
||||||
&& finished_extraction.load(std::sync::atomic::Ordering::Relaxed)
|
|
||||||
{
|
|
||||||
_entered_post_merge = Some(span.enter());
|
|
||||||
}
|
|
||||||
|
|
||||||
match action {
|
|
||||||
ReceiverAction::WakeUp => (),
|
|
||||||
ReceiverAction::LargeEntry(LargeEntry { database, key, value }) => {
|
|
||||||
let database_name = database.database_name();
|
|
||||||
let database = database.database(index);
|
|
||||||
if let Err(error) = database.put(wtxn, &key, &value) {
|
|
||||||
return Err(Error::InternalError(InternalError::StorePut {
|
|
||||||
database_name,
|
|
||||||
key: bstr::BString::from(&key[..]),
|
|
||||||
value_length: value.len(),
|
|
||||||
error,
|
|
||||||
}));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
ReceiverAction::LargeVectors(large_vectors) => {
|
|
||||||
let LargeVectors { docid, embedder_id, .. } = large_vectors;
|
|
||||||
let (_, _, writer, dimensions) =
|
|
||||||
arroy_writers.get(&embedder_id).expect("requested a missing embedder");
|
|
||||||
let mut embeddings = Embeddings::new(*dimensions);
|
|
||||||
for embedding in large_vectors.read_embeddings(*dimensions) {
|
|
||||||
embeddings.push(embedding.to_vec()).unwrap();
|
|
||||||
}
|
|
||||||
writer.del_items(wtxn, *dimensions, docid)?;
|
|
||||||
writer.add_items(wtxn, docid, &embeddings)?;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Every time the is a message in the channel we search
|
|
||||||
// for new entries in the BBQueue buffers.
|
|
||||||
write_from_bbqueue(
|
|
||||||
&mut writer_receiver,
|
|
||||||
index,
|
|
||||||
wtxn,
|
|
||||||
&arroy_writers,
|
|
||||||
&mut aligned_embedding,
|
|
||||||
)?;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Once the extractor/writer channel is closed
|
|
||||||
// we must process the remaining BBQueue messages.
|
|
||||||
write_from_bbqueue(
|
|
||||||
&mut writer_receiver,
|
|
||||||
index,
|
|
||||||
wtxn,
|
|
||||||
&arroy_writers,
|
|
||||||
&mut aligned_embedding,
|
|
||||||
)?;
|
|
||||||
}
|
|
||||||
|
|
||||||
indexing_context.progress.update_progress(IndexingStep::WaitingForExtractors);
|
indexing_context.progress.update_progress(IndexingStep::WaitingForExtractors);
|
||||||
|
|
||||||
let (facet_field_ids_delta, index_embeddings) = extractor_handle.join().unwrap()?;
|
let (facet_field_ids_delta, index_embeddings) = extractor_handle.join().unwrap()?;
|
||||||
|
|
||||||
'vectors: {
|
indexing_context.progress.update_progress(IndexingStep::WritingEmbeddingsToDatabase);
|
||||||
let span =
|
|
||||||
tracing::trace_span!(target: "indexing::vectors", parent: &indexer_span, "build");
|
|
||||||
let _entered = span.enter();
|
|
||||||
|
|
||||||
if index_embeddings.is_empty() {
|
build_vectors(
|
||||||
break 'vectors;
|
index,
|
||||||
}
|
wtxn,
|
||||||
|
index_embeddings,
|
||||||
|
&mut arroy_writers,
|
||||||
|
&indexing_context.must_stop_processing,
|
||||||
|
)?;
|
||||||
|
|
||||||
indexing_context.progress.update_progress(IndexingStep::WritingEmbeddingsToDatabase);
|
compute::postprocess(indexing_context, wtxn, global_fields_ids_map, facet_field_ids_delta)?;
|
||||||
let mut rng = rand::rngs::StdRng::seed_from_u64(42);
|
|
||||||
for (_index, (_embedder_name, _embedder, writer, dimensions)) in &mut arroy_writers {
|
|
||||||
let dimensions = *dimensions;
|
|
||||||
writer.build_and_quantize(
|
|
||||||
wtxn,
|
|
||||||
&mut rng,
|
|
||||||
dimensions,
|
|
||||||
false,
|
|
||||||
&indexing_context.must_stop_processing,
|
|
||||||
)?;
|
|
||||||
}
|
|
||||||
|
|
||||||
index.put_embedding_configs(wtxn, index_embeddings)?;
|
|
||||||
}
|
|
||||||
|
|
||||||
indexing_context.progress.update_progress(IndexingStep::PostProcessingFacets);
|
|
||||||
if index.facet_search(wtxn)? {
|
|
||||||
compute_facet_search_database(index, wtxn, global_fields_ids_map)?;
|
|
||||||
}
|
|
||||||
|
|
||||||
compute_facet_level_database(index, wtxn, facet_field_ids_delta)?;
|
|
||||||
|
|
||||||
indexing_context.progress.update_progress(IndexingStep::PostProcessingWords);
|
|
||||||
if let Some(prefix_delta) = compute_word_fst(index, wtxn)? {
|
|
||||||
compute_prefix_database(index, wtxn, prefix_delta, grenad_parameters)?;
|
|
||||||
}
|
|
||||||
|
|
||||||
indexing_context.progress.update_progress(IndexingStep::Finalizing);
|
indexing_context.progress.update_progress(IndexingStep::Finalizing);
|
||||||
|
|
||||||
@ -533,321 +190,15 @@ where
|
|||||||
drop(fields_ids_map_store);
|
drop(fields_ids_map_store);
|
||||||
|
|
||||||
let new_fields_ids_map = new_fields_ids_map.into_inner().unwrap();
|
let new_fields_ids_map = new_fields_ids_map.into_inner().unwrap();
|
||||||
index.put_fields_ids_map(wtxn, new_fields_ids_map.as_fields_ids_map())?;
|
update_index(
|
||||||
|
index,
|
||||||
if let Some(new_primary_key) = new_primary_key {
|
wtxn,
|
||||||
index.put_primary_key(wtxn, new_primary_key.name())?;
|
new_fields_ids_map,
|
||||||
}
|
new_primary_key,
|
||||||
|
embedders,
|
||||||
// used to update the localized and weighted maps while sharing the update code with the settings pipeline.
|
field_distribution,
|
||||||
let mut inner_index_settings = InnerIndexSettings::from_index(index, wtxn, Some(embedders))?;
|
document_ids,
|
||||||
inner_index_settings.recompute_facets(wtxn, index)?;
|
)?;
|
||||||
inner_index_settings.recompute_searchables(wtxn, index)?;
|
|
||||||
index.put_field_distribution(wtxn, &field_distribution)?;
|
|
||||||
index.put_documents_ids(wtxn, &document_ids)?;
|
|
||||||
index.set_updated_at(wtxn, &OffsetDateTime::now_utc())?;
|
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// A function dedicated to manage all the available BBQueue frames.
|
|
||||||
///
|
|
||||||
/// It reads all the available frames, do the corresponding database operations
|
|
||||||
/// and stops when no frame are available.
|
|
||||||
fn write_from_bbqueue(
|
|
||||||
writer_receiver: &mut WriterBbqueueReceiver<'_>,
|
|
||||||
index: &Index,
|
|
||||||
wtxn: &mut RwTxn<'_>,
|
|
||||||
arroy_writers: &HashMap<u8, (&str, &crate::vector::Embedder, ArroyWrapper, usize)>,
|
|
||||||
aligned_embedding: &mut Vec<f32>,
|
|
||||||
) -> crate::Result<()> {
|
|
||||||
while let Some(frame_with_header) = writer_receiver.recv_frame() {
|
|
||||||
match frame_with_header.header() {
|
|
||||||
EntryHeader::DbOperation(operation) => {
|
|
||||||
let database_name = operation.database.database_name();
|
|
||||||
let database = operation.database.database(index);
|
|
||||||
let frame = frame_with_header.frame();
|
|
||||||
match operation.key_value(frame) {
|
|
||||||
(key, Some(value)) => {
|
|
||||||
if let Err(error) = database.put(wtxn, key, value) {
|
|
||||||
return Err(Error::InternalError(InternalError::StorePut {
|
|
||||||
database_name,
|
|
||||||
key: key.into(),
|
|
||||||
value_length: value.len(),
|
|
||||||
error,
|
|
||||||
}));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
(key, None) => match database.delete(wtxn, key) {
|
|
||||||
Ok(false) => {
|
|
||||||
unreachable!("We tried to delete an unknown key: {key:?}")
|
|
||||||
}
|
|
||||||
Ok(_) => (),
|
|
||||||
Err(error) => {
|
|
||||||
return Err(Error::InternalError(InternalError::StoreDeletion {
|
|
||||||
database_name,
|
|
||||||
key: key.into(),
|
|
||||||
error,
|
|
||||||
}));
|
|
||||||
}
|
|
||||||
},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
EntryHeader::ArroyDeleteVector(ArroyDeleteVector { docid }) => {
|
|
||||||
for (_index, (_name, _embedder, writer, dimensions)) in arroy_writers {
|
|
||||||
let dimensions = *dimensions;
|
|
||||||
writer.del_items(wtxn, dimensions, docid)?;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
EntryHeader::ArroySetVectors(asvs) => {
|
|
||||||
let ArroySetVectors { docid, embedder_id, .. } = asvs;
|
|
||||||
let frame = frame_with_header.frame();
|
|
||||||
let (_, _, writer, dimensions) =
|
|
||||||
arroy_writers.get(&embedder_id).expect("requested a missing embedder");
|
|
||||||
let mut embeddings = Embeddings::new(*dimensions);
|
|
||||||
let all_embeddings = asvs.read_all_embeddings_into_vec(frame, aligned_embedding);
|
|
||||||
embeddings.append(all_embeddings.to_vec()).unwrap();
|
|
||||||
writer.del_items(wtxn, *dimensions, docid)?;
|
|
||||||
writer.add_items(wtxn, docid, &embeddings)?;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tracing::instrument(level = "trace", skip_all, target = "indexing::prefix")]
|
|
||||||
fn compute_prefix_database(
|
|
||||||
index: &Index,
|
|
||||||
wtxn: &mut RwTxn,
|
|
||||||
prefix_delta: PrefixDelta,
|
|
||||||
grenad_parameters: GrenadParameters,
|
|
||||||
) -> Result<()> {
|
|
||||||
let PrefixDelta { modified, deleted } = prefix_delta;
|
|
||||||
// Compute word prefix docids
|
|
||||||
compute_word_prefix_docids(wtxn, index, &modified, &deleted, grenad_parameters)?;
|
|
||||||
// Compute exact word prefix docids
|
|
||||||
compute_exact_word_prefix_docids(wtxn, index, &modified, &deleted, grenad_parameters)?;
|
|
||||||
// Compute word prefix fid docids
|
|
||||||
compute_word_prefix_fid_docids(wtxn, index, &modified, &deleted, grenad_parameters)?;
|
|
||||||
// Compute word prefix position docids
|
|
||||||
compute_word_prefix_position_docids(wtxn, index, &modified, &deleted, grenad_parameters)
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tracing::instrument(level = "trace", skip_all, target = "indexing")]
|
|
||||||
fn compute_word_fst(index: &Index, wtxn: &mut RwTxn) -> Result<Option<PrefixDelta>> {
|
|
||||||
let rtxn = index.read_txn()?;
|
|
||||||
let words_fst = index.words_fst(&rtxn)?;
|
|
||||||
let mut word_fst_builder = WordFstBuilder::new(&words_fst)?;
|
|
||||||
let prefix_settings = index.prefix_settings(&rtxn)?;
|
|
||||||
word_fst_builder.with_prefix_settings(prefix_settings);
|
|
||||||
|
|
||||||
let previous_words = index.word_docids.iter(&rtxn)?.remap_data_type::<Bytes>();
|
|
||||||
let current_words = index.word_docids.iter(wtxn)?.remap_data_type::<Bytes>();
|
|
||||||
for eob in merge_join_by(previous_words, current_words, |lhs, rhs| match (lhs, rhs) {
|
|
||||||
(Ok((l, _)), Ok((r, _))) => l.cmp(r),
|
|
||||||
(Err(_), _) | (_, Err(_)) => Ordering::Equal,
|
|
||||||
}) {
|
|
||||||
match eob {
|
|
||||||
EitherOrBoth::Both(lhs, rhs) => {
|
|
||||||
let (word, lhs_bytes) = lhs?;
|
|
||||||
let (_, rhs_bytes) = rhs?;
|
|
||||||
if lhs_bytes != rhs_bytes {
|
|
||||||
word_fst_builder.register_word(DelAdd::Addition, word.as_ref())?;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
EitherOrBoth::Left(result) => {
|
|
||||||
let (word, _) = result?;
|
|
||||||
word_fst_builder.register_word(DelAdd::Deletion, word.as_ref())?;
|
|
||||||
}
|
|
||||||
EitherOrBoth::Right(result) => {
|
|
||||||
let (word, _) = result?;
|
|
||||||
word_fst_builder.register_word(DelAdd::Addition, word.as_ref())?;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
let (word_fst_mmap, prefix_data) = word_fst_builder.build(index, &rtxn)?;
|
|
||||||
index.main.remap_types::<Str, Bytes>().put(wtxn, WORDS_FST_KEY, &word_fst_mmap)?;
|
|
||||||
if let Some(PrefixData { prefixes_fst_mmap, prefix_delta }) = prefix_data {
|
|
||||||
index.main.remap_types::<Str, Bytes>().put(
|
|
||||||
wtxn,
|
|
||||||
WORDS_PREFIXES_FST_KEY,
|
|
||||||
&prefixes_fst_mmap,
|
|
||||||
)?;
|
|
||||||
Ok(Some(prefix_delta))
|
|
||||||
} else {
|
|
||||||
Ok(None)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tracing::instrument(level = "trace", skip_all, target = "indexing::facet_search")]
|
|
||||||
fn compute_facet_search_database(
|
|
||||||
index: &Index,
|
|
||||||
wtxn: &mut RwTxn,
|
|
||||||
global_fields_ids_map: GlobalFieldsIdsMap,
|
|
||||||
) -> Result<()> {
|
|
||||||
let rtxn = index.read_txn()?;
|
|
||||||
let localized_attributes_rules = index.localized_attributes_rules(&rtxn)?;
|
|
||||||
let mut facet_search_builder = FacetSearchBuilder::new(
|
|
||||||
global_fields_ids_map,
|
|
||||||
localized_attributes_rules.unwrap_or_default(),
|
|
||||||
);
|
|
||||||
|
|
||||||
let previous_facet_id_string_docids = index
|
|
||||||
.facet_id_string_docids
|
|
||||||
.iter(&rtxn)?
|
|
||||||
.remap_data_type::<DecodeIgnore>()
|
|
||||||
.filter(|r| r.as_ref().map_or(true, |(k, _)| k.level == 0));
|
|
||||||
let current_facet_id_string_docids = index
|
|
||||||
.facet_id_string_docids
|
|
||||||
.iter(wtxn)?
|
|
||||||
.remap_data_type::<DecodeIgnore>()
|
|
||||||
.filter(|r| r.as_ref().map_or(true, |(k, _)| k.level == 0));
|
|
||||||
for eob in merge_join_by(
|
|
||||||
previous_facet_id_string_docids,
|
|
||||||
current_facet_id_string_docids,
|
|
||||||
|lhs, rhs| match (lhs, rhs) {
|
|
||||||
(Ok((l, _)), Ok((r, _))) => l.cmp(r),
|
|
||||||
(Err(_), _) | (_, Err(_)) => Ordering::Equal,
|
|
||||||
},
|
|
||||||
) {
|
|
||||||
match eob {
|
|
||||||
EitherOrBoth::Both(lhs, rhs) => {
|
|
||||||
let (_, _) = lhs?;
|
|
||||||
let (_, _) = rhs?;
|
|
||||||
}
|
|
||||||
EitherOrBoth::Left(result) => {
|
|
||||||
let (key, _) = result?;
|
|
||||||
facet_search_builder.register_from_key(DelAdd::Deletion, key)?;
|
|
||||||
}
|
|
||||||
EitherOrBoth::Right(result) => {
|
|
||||||
let (key, _) = result?;
|
|
||||||
facet_search_builder.register_from_key(DelAdd::Addition, key)?;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
facet_search_builder.merge_and_write(index, wtxn, &rtxn)
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tracing::instrument(level = "trace", skip_all, target = "indexing::facet_field_ids")]
|
|
||||||
fn compute_facet_level_database(
|
|
||||||
index: &Index,
|
|
||||||
wtxn: &mut RwTxn,
|
|
||||||
facet_field_ids_delta: FacetFieldIdsDelta,
|
|
||||||
) -> Result<()> {
|
|
||||||
if let Some(modified_facet_string_ids) = facet_field_ids_delta.modified_facet_string_ids() {
|
|
||||||
let span = tracing::trace_span!(target: "indexing::facet_field_ids", "string");
|
|
||||||
let _entered = span.enter();
|
|
||||||
FacetsUpdateBulk::new_not_updating_level_0(
|
|
||||||
index,
|
|
||||||
modified_facet_string_ids,
|
|
||||||
FacetType::String,
|
|
||||||
)
|
|
||||||
.execute(wtxn)?;
|
|
||||||
}
|
|
||||||
if let Some(modified_facet_number_ids) = facet_field_ids_delta.modified_facet_number_ids() {
|
|
||||||
let span = tracing::trace_span!(target: "indexing::facet_field_ids", "number");
|
|
||||||
let _entered = span.enter();
|
|
||||||
FacetsUpdateBulk::new_not_updating_level_0(
|
|
||||||
index,
|
|
||||||
modified_facet_number_ids,
|
|
||||||
FacetType::Number,
|
|
||||||
)
|
|
||||||
.execute(wtxn)?;
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Returns the primary key that has already been set for this index or the
|
|
||||||
/// one we will guess by searching for the first key that contains "id" as a substring,
|
|
||||||
/// and whether the primary key changed
|
|
||||||
/// TODO move this elsewhere
|
|
||||||
pub fn retrieve_or_guess_primary_key<'a>(
|
|
||||||
rtxn: &'a RoTxn<'a>,
|
|
||||||
index: &Index,
|
|
||||||
new_fields_ids_map: &mut FieldsIdsMap,
|
|
||||||
primary_key_from_op: Option<&'a str>,
|
|
||||||
first_document: Option<RawMap<'a, FxBuildHasher>>,
|
|
||||||
) -> Result<StdResult<(PrimaryKey<'a>, bool), UserError>> {
|
|
||||||
// make sure that we have a declared primary key, either fetching it from the index or attempting to guess it.
|
|
||||||
|
|
||||||
// do we have an existing declared primary key?
|
|
||||||
let (primary_key, has_changed) = if let Some(primary_key_from_db) = index.primary_key(rtxn)? {
|
|
||||||
// did we request a primary key in the operation?
|
|
||||||
match primary_key_from_op {
|
|
||||||
// we did, and it is different from the DB one
|
|
||||||
Some(primary_key_from_op) if primary_key_from_op != primary_key_from_db => {
|
|
||||||
return Ok(Err(UserError::PrimaryKeyCannotBeChanged(
|
|
||||||
primary_key_from_db.to_string(),
|
|
||||||
)));
|
|
||||||
}
|
|
||||||
_ => (primary_key_from_db, false),
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// no primary key in the DB => let's set one
|
|
||||||
// did we request a primary key in the operation?
|
|
||||||
let primary_key = if let Some(primary_key_from_op) = primary_key_from_op {
|
|
||||||
// set primary key from operation
|
|
||||||
primary_key_from_op
|
|
||||||
} else {
|
|
||||||
// guess primary key
|
|
||||||
let first_document = match first_document {
|
|
||||||
Some(document) => document,
|
|
||||||
// previous indexer when no pk is set + we send an empty payload => index_primary_key_no_candidate_found
|
|
||||||
None => return Ok(Err(UserError::NoPrimaryKeyCandidateFound)),
|
|
||||||
};
|
|
||||||
|
|
||||||
let guesses: Result<Vec<&str>> = first_document
|
|
||||||
.keys()
|
|
||||||
.filter_map(|name| {
|
|
||||||
let Some(_) = new_fields_ids_map.insert(name) else {
|
|
||||||
return Some(Err(UserError::AttributeLimitReached.into()));
|
|
||||||
};
|
|
||||||
name.to_lowercase().ends_with(DEFAULT_PRIMARY_KEY).then_some(Ok(name))
|
|
||||||
})
|
|
||||||
.collect();
|
|
||||||
|
|
||||||
let mut guesses = guesses?;
|
|
||||||
|
|
||||||
// sort the keys in lexicographical order, so that fields are always in the same order.
|
|
||||||
guesses.sort_unstable();
|
|
||||||
|
|
||||||
match guesses.as_slice() {
|
|
||||||
[] => return Ok(Err(UserError::NoPrimaryKeyCandidateFound)),
|
|
||||||
[name] => {
|
|
||||||
tracing::info!("Primary key was not specified in index. Inferred to '{name}'");
|
|
||||||
*name
|
|
||||||
}
|
|
||||||
multiple => {
|
|
||||||
return Ok(Err(UserError::MultiplePrimaryKeyCandidatesFound {
|
|
||||||
candidates: multiple
|
|
||||||
.iter()
|
|
||||||
.map(|candidate| candidate.to_string())
|
|
||||||
.collect(),
|
|
||||||
}))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
(primary_key, true)
|
|
||||||
};
|
|
||||||
|
|
||||||
match PrimaryKey::new_or_insert(primary_key, new_fields_ids_map) {
|
|
||||||
Ok(primary_key) => Ok(Ok((primary_key, has_changed))),
|
|
||||||
Err(err) => Ok(Err(err)),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn request_threads() -> &'static ThreadPoolNoAbort {
|
|
||||||
static REQUEST_THREADS: OnceLock<ThreadPoolNoAbort> = OnceLock::new();
|
|
||||||
|
|
||||||
REQUEST_THREADS.get_or_init(|| {
|
|
||||||
ThreadPoolNoAbortBuilder::new()
|
|
||||||
.num_threads(crate::vector::REQUEST_PARALLELISM)
|
|
||||||
.thread_name(|index| format!("embedding-request-{index}"))
|
|
||||||
.build()
|
|
||||||
.unwrap()
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
189
crates/milli/src/update/new/indexer/write.rs
Normal file
189
crates/milli/src/update/new/indexer/write.rs
Normal file
@ -0,0 +1,189 @@
|
|||||||
|
use std::sync::atomic::AtomicBool;
|
||||||
|
|
||||||
|
use hashbrown::HashMap;
|
||||||
|
use heed::RwTxn;
|
||||||
|
use rand::SeedableRng as _;
|
||||||
|
use time::OffsetDateTime;
|
||||||
|
|
||||||
|
use super::super::channel::*;
|
||||||
|
use crate::documents::PrimaryKey;
|
||||||
|
use crate::fields_ids_map::metadata::FieldIdMapWithMetadata;
|
||||||
|
use crate::index::IndexEmbeddingConfig;
|
||||||
|
use crate::update::settings::InnerIndexSettings;
|
||||||
|
use crate::vector::{ArroyWrapper, Embedder, EmbeddingConfigs, Embeddings};
|
||||||
|
use crate::{Error, Index, InternalError, Result};
|
||||||
|
|
||||||
|
pub(super) fn write_to_db(
|
||||||
|
mut writer_receiver: WriterBbqueueReceiver<'_>,
|
||||||
|
finished_extraction: &AtomicBool,
|
||||||
|
index: &Index,
|
||||||
|
wtxn: &mut RwTxn<'_>,
|
||||||
|
arroy_writers: &HashMap<u8, (&str, &Embedder, ArroyWrapper, usize)>,
|
||||||
|
) -> Result<()> {
|
||||||
|
// Used by by the ArroySetVector to copy the embedding into an
|
||||||
|
// aligned memory area, required by arroy to accept a new vector.
|
||||||
|
let mut aligned_embedding = Vec::new();
|
||||||
|
let span = tracing::trace_span!(target: "indexing::write_db", "all");
|
||||||
|
let _entered = span.enter();
|
||||||
|
let span = tracing::trace_span!(target: "indexing::write_db", "post_merge");
|
||||||
|
let mut _entered_post_merge = None;
|
||||||
|
while let Some(action) = writer_receiver.recv_action() {
|
||||||
|
if _entered_post_merge.is_none()
|
||||||
|
&& finished_extraction.load(std::sync::atomic::Ordering::Relaxed)
|
||||||
|
{
|
||||||
|
_entered_post_merge = Some(span.enter());
|
||||||
|
}
|
||||||
|
|
||||||
|
match action {
|
||||||
|
ReceiverAction::WakeUp => (),
|
||||||
|
ReceiverAction::LargeEntry(LargeEntry { database, key, value }) => {
|
||||||
|
let database_name = database.database_name();
|
||||||
|
let database = database.database(index);
|
||||||
|
if let Err(error) = database.put(wtxn, &key, &value) {
|
||||||
|
return Err(Error::InternalError(InternalError::StorePut {
|
||||||
|
database_name,
|
||||||
|
key: bstr::BString::from(&key[..]),
|
||||||
|
value_length: value.len(),
|
||||||
|
error,
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
ReceiverAction::LargeVectors(large_vectors) => {
|
||||||
|
let LargeVectors { docid, embedder_id, .. } = large_vectors;
|
||||||
|
let (_, _, writer, dimensions) =
|
||||||
|
arroy_writers.get(&embedder_id).expect("requested a missing embedder");
|
||||||
|
let mut embeddings = Embeddings::new(*dimensions);
|
||||||
|
for embedding in large_vectors.read_embeddings(*dimensions) {
|
||||||
|
embeddings.push(embedding.to_vec()).unwrap();
|
||||||
|
}
|
||||||
|
writer.del_items(wtxn, *dimensions, docid)?;
|
||||||
|
writer.add_items(wtxn, docid, &embeddings)?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Every time the is a message in the channel we search
|
||||||
|
// for new entries in the BBQueue buffers.
|
||||||
|
write_from_bbqueue(
|
||||||
|
&mut writer_receiver,
|
||||||
|
index,
|
||||||
|
wtxn,
|
||||||
|
arroy_writers,
|
||||||
|
&mut aligned_embedding,
|
||||||
|
)?;
|
||||||
|
}
|
||||||
|
write_from_bbqueue(&mut writer_receiver, index, wtxn, arroy_writers, &mut aligned_embedding)?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tracing::instrument(level = "trace", skip_all, target = "indexing::vectors")]
|
||||||
|
pub(super) fn build_vectors<MSP>(
|
||||||
|
index: &Index,
|
||||||
|
wtxn: &mut RwTxn<'_>,
|
||||||
|
index_embeddings: Vec<IndexEmbeddingConfig>,
|
||||||
|
arroy_writers: &mut HashMap<u8, (&str, &Embedder, ArroyWrapper, usize)>,
|
||||||
|
must_stop_processing: &MSP,
|
||||||
|
) -> Result<()>
|
||||||
|
where
|
||||||
|
MSP: Fn() -> bool + Sync + Send,
|
||||||
|
{
|
||||||
|
if index_embeddings.is_empty() {
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut rng = rand::rngs::StdRng::seed_from_u64(42);
|
||||||
|
for (_index, (_embedder_name, _embedder, writer, dimensions)) in arroy_writers {
|
||||||
|
let dimensions = *dimensions;
|
||||||
|
writer.build_and_quantize(wtxn, &mut rng, dimensions, false, must_stop_processing)?;
|
||||||
|
}
|
||||||
|
|
||||||
|
index.put_embedding_configs(wtxn, index_embeddings)?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(super) fn update_index(
|
||||||
|
index: &Index,
|
||||||
|
wtxn: &mut RwTxn<'_>,
|
||||||
|
new_fields_ids_map: FieldIdMapWithMetadata,
|
||||||
|
new_primary_key: Option<PrimaryKey<'_>>,
|
||||||
|
embedders: EmbeddingConfigs,
|
||||||
|
field_distribution: std::collections::BTreeMap<String, u64>,
|
||||||
|
document_ids: roaring::RoaringBitmap,
|
||||||
|
) -> Result<()> {
|
||||||
|
index.put_fields_ids_map(wtxn, new_fields_ids_map.as_fields_ids_map())?;
|
||||||
|
if let Some(new_primary_key) = new_primary_key {
|
||||||
|
index.put_primary_key(wtxn, new_primary_key.name())?;
|
||||||
|
}
|
||||||
|
let mut inner_index_settings = InnerIndexSettings::from_index(index, wtxn, Some(embedders))?;
|
||||||
|
inner_index_settings.recompute_facets(wtxn, index)?;
|
||||||
|
inner_index_settings.recompute_searchables(wtxn, index)?;
|
||||||
|
index.put_field_distribution(wtxn, &field_distribution)?;
|
||||||
|
index.put_documents_ids(wtxn, &document_ids)?;
|
||||||
|
index.set_updated_at(wtxn, &OffsetDateTime::now_utc())?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A function dedicated to manage all the available BBQueue frames.
|
||||||
|
///
|
||||||
|
/// It reads all the available frames, do the corresponding database operations
|
||||||
|
/// and stops when no frame are available.
|
||||||
|
pub fn write_from_bbqueue(
|
||||||
|
writer_receiver: &mut WriterBbqueueReceiver<'_>,
|
||||||
|
index: &Index,
|
||||||
|
wtxn: &mut RwTxn<'_>,
|
||||||
|
arroy_writers: &HashMap<u8, (&str, &crate::vector::Embedder, ArroyWrapper, usize)>,
|
||||||
|
aligned_embedding: &mut Vec<f32>,
|
||||||
|
) -> crate::Result<()> {
|
||||||
|
while let Some(frame_with_header) = writer_receiver.recv_frame() {
|
||||||
|
match frame_with_header.header() {
|
||||||
|
EntryHeader::DbOperation(operation) => {
|
||||||
|
let database_name = operation.database.database_name();
|
||||||
|
let database = operation.database.database(index);
|
||||||
|
let frame = frame_with_header.frame();
|
||||||
|
match operation.key_value(frame) {
|
||||||
|
(key, Some(value)) => {
|
||||||
|
if let Err(error) = database.put(wtxn, key, value) {
|
||||||
|
return Err(Error::InternalError(InternalError::StorePut {
|
||||||
|
database_name,
|
||||||
|
key: key.into(),
|
||||||
|
value_length: value.len(),
|
||||||
|
error,
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
(key, None) => match database.delete(wtxn, key) {
|
||||||
|
Ok(false) => {
|
||||||
|
unreachable!("We tried to delete an unknown key: {key:?}")
|
||||||
|
}
|
||||||
|
Ok(_) => (),
|
||||||
|
Err(error) => {
|
||||||
|
return Err(Error::InternalError(InternalError::StoreDeletion {
|
||||||
|
database_name,
|
||||||
|
key: key.into(),
|
||||||
|
error,
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
EntryHeader::ArroyDeleteVector(ArroyDeleteVector { docid }) => {
|
||||||
|
for (_index, (_name, _embedder, writer, dimensions)) in arroy_writers {
|
||||||
|
let dimensions = *dimensions;
|
||||||
|
writer.del_items(wtxn, dimensions, docid)?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
EntryHeader::ArroySetVectors(asvs) => {
|
||||||
|
let ArroySetVectors { docid, embedder_id, .. } = asvs;
|
||||||
|
let frame = frame_with_header.frame();
|
||||||
|
let (_, _, writer, dimensions) =
|
||||||
|
arroy_writers.get(&embedder_id).expect("requested a missing embedder");
|
||||||
|
let mut embeddings = Embeddings::new(*dimensions);
|
||||||
|
let all_embeddings = asvs.read_all_embeddings_into_vec(frame, aligned_embedding);
|
||||||
|
embeddings.append(all_embeddings.to_vec()).unwrap();
|
||||||
|
writer.del_items(wtxn, *dimensions, docid)?;
|
||||||
|
writer.add_items(wtxn, docid, &embeddings)?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
@ -25,7 +25,7 @@ impl WordPrefixDocids {
|
|||||||
fn new(
|
fn new(
|
||||||
database: Database<Bytes, CboRoaringBitmapCodec>,
|
database: Database<Bytes, CboRoaringBitmapCodec>,
|
||||||
prefix_database: Database<Bytes, CboRoaringBitmapCodec>,
|
prefix_database: Database<Bytes, CboRoaringBitmapCodec>,
|
||||||
grenad_parameters: GrenadParameters,
|
grenad_parameters: &GrenadParameters,
|
||||||
) -> WordPrefixDocids {
|
) -> WordPrefixDocids {
|
||||||
WordPrefixDocids {
|
WordPrefixDocids {
|
||||||
database,
|
database,
|
||||||
@ -161,7 +161,7 @@ impl WordPrefixIntegerDocids {
|
|||||||
fn new(
|
fn new(
|
||||||
database: Database<Bytes, CboRoaringBitmapCodec>,
|
database: Database<Bytes, CboRoaringBitmapCodec>,
|
||||||
prefix_database: Database<Bytes, CboRoaringBitmapCodec>,
|
prefix_database: Database<Bytes, CboRoaringBitmapCodec>,
|
||||||
grenad_parameters: GrenadParameters,
|
grenad_parameters: &GrenadParameters,
|
||||||
) -> WordPrefixIntegerDocids {
|
) -> WordPrefixIntegerDocids {
|
||||||
WordPrefixIntegerDocids {
|
WordPrefixIntegerDocids {
|
||||||
database,
|
database,
|
||||||
@ -311,7 +311,7 @@ pub fn compute_word_prefix_docids(
|
|||||||
index: &Index,
|
index: &Index,
|
||||||
prefix_to_compute: &BTreeSet<Prefix>,
|
prefix_to_compute: &BTreeSet<Prefix>,
|
||||||
prefix_to_delete: &BTreeSet<Prefix>,
|
prefix_to_delete: &BTreeSet<Prefix>,
|
||||||
grenad_parameters: GrenadParameters,
|
grenad_parameters: &GrenadParameters,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
WordPrefixDocids::new(
|
WordPrefixDocids::new(
|
||||||
index.word_docids.remap_key_type(),
|
index.word_docids.remap_key_type(),
|
||||||
@ -327,7 +327,7 @@ pub fn compute_exact_word_prefix_docids(
|
|||||||
index: &Index,
|
index: &Index,
|
||||||
prefix_to_compute: &BTreeSet<Prefix>,
|
prefix_to_compute: &BTreeSet<Prefix>,
|
||||||
prefix_to_delete: &BTreeSet<Prefix>,
|
prefix_to_delete: &BTreeSet<Prefix>,
|
||||||
grenad_parameters: GrenadParameters,
|
grenad_parameters: &GrenadParameters,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
WordPrefixDocids::new(
|
WordPrefixDocids::new(
|
||||||
index.exact_word_docids.remap_key_type(),
|
index.exact_word_docids.remap_key_type(),
|
||||||
@ -343,7 +343,7 @@ pub fn compute_word_prefix_fid_docids(
|
|||||||
index: &Index,
|
index: &Index,
|
||||||
prefix_to_compute: &BTreeSet<Prefix>,
|
prefix_to_compute: &BTreeSet<Prefix>,
|
||||||
prefix_to_delete: &BTreeSet<Prefix>,
|
prefix_to_delete: &BTreeSet<Prefix>,
|
||||||
grenad_parameters: GrenadParameters,
|
grenad_parameters: &GrenadParameters,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
WordPrefixIntegerDocids::new(
|
WordPrefixIntegerDocids::new(
|
||||||
index.word_fid_docids.remap_key_type(),
|
index.word_fid_docids.remap_key_type(),
|
||||||
@ -359,7 +359,7 @@ pub fn compute_word_prefix_position_docids(
|
|||||||
index: &Index,
|
index: &Index,
|
||||||
prefix_to_compute: &BTreeSet<Prefix>,
|
prefix_to_compute: &BTreeSet<Prefix>,
|
||||||
prefix_to_delete: &BTreeSet<Prefix>,
|
prefix_to_delete: &BTreeSet<Prefix>,
|
||||||
grenad_parameters: GrenadParameters,
|
grenad_parameters: &GrenadParameters,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
WordPrefixIntegerDocids::new(
|
WordPrefixIntegerDocids::new(
|
||||||
index.word_position_docids.remap_key_type(),
|
index.word_position_docids.remap_key_type(),
|
||||||
|
Loading…
Reference in New Issue
Block a user