// Indexer entry point: `index` drives the extraction, merge and write steps for a batch of
// document changes produced by a `DocumentChanges` implementation.
use std::sync::{Arc, RwLock};
use std::thread::{self, Builder};
use big_s::S;
2024-09-02 14:42:27 +02:00
pub use document_deletion::DocumentDeletion;
pub use document_operation::DocumentOperation;
use heed::{RoTxn, RwTxn};
2024-09-02 14:42:27 +02:00
pub use partial_dump::PartialDump;
2024-09-30 16:08:29 +02:00
use rayon::iter::{IndexedParallelIterator, IntoParallelIterator};
use rayon::ThreadPool;
2024-09-02 14:42:27 +02:00
pub use update_by_function::UpdateByFunction;
use super::channel::*;
use super::document_change::DocumentChange;
use super::extract::*;
use super::merger::merge_grenad_entries;
use super::word_fst_builder::PrefixDelta;
use super::words_prefix_docids::{
compute_word_prefix_docids, compute_word_prefix_fid_docids, compute_word_prefix_position_docids,
};
use super::{StdResult, TopLevelMap};
2024-09-12 15:38:31 +02:00
use crate::documents::{PrimaryKey, DEFAULT_PRIMARY_KEY};
use crate::update::new::channel::ExtractorSender;
2024-09-30 11:35:03 +02:00
use crate::update::settings::InnerIndexSettings;
use crate::update::new::items_pool::ParallelIteratorExt;
2024-09-03 11:02:39 +02:00
use crate::update::GrenadParameters;
2024-09-26 17:48:32 +02:00
use crate::{Error, FieldsIdsMap, GlobalFieldsIdsMap, Index, Result, UserError};
mod document_deletion;
mod document_operation;
mod partial_dump;
mod update_by_function;
2024-09-02 15:10:21 +02:00
pub trait DocumentChanges<'p> {
type Parameter: 'p;
fn document_changes(
self,
fields_ids_map: &mut FieldsIdsMap,
param: Self::Parameter,
2024-09-26 17:48:32 +02:00
) -> Result<
2024-09-26 18:59:28 +02:00
impl IndexedParallelIterator<Item = std::result::Result<DocumentChange, Arc<Error>>>
+ Clone
2024-09-26 17:48:32 +02:00
+ 'p,
>;
}
/// This is the main function of this crate.
///
/// Give it the output of the [`Indexer::document_changes`] method and it will execute it in the [`rayon::ThreadPool`].
///
/// TODO return stats
pub fn index<PI>(
wtxn: &mut RwTxn,
index: &Index,
fields_ids_map: FieldsIdsMap,
pool: &ThreadPool,
2024-09-03 11:02:39 +02:00
document_changes: PI,
) -> Result<()>
where
2024-09-26 18:59:28 +02:00
PI: IndexedParallelIterator<Item = std::result::Result<DocumentChange, Arc<Error>>>
2024-09-26 17:48:32 +02:00
+ Send
+ Clone,
{
2024-09-05 15:12:07 +02:00
let (merger_sender, writer_receiver) = merger_writer_channel(10_000);
// This channel acts as a rendezvous point to ensure that we are one task ahead
2024-09-11 10:20:23 +02:00
let (extractor_sender, merger_receiver) = extractors_merger_channels(4);
let fields_ids_map_lock = RwLock::new(fields_ids_map);
let global_fields_ids_map = GlobalFieldsIdsMap::new(&fields_ids_map_lock);
2024-09-12 18:01:02 +02:00
let global_fields_ids_map_clone = global_fields_ids_map.clone();
thread::scope(|s| {
// TODO manage the errors correctly
2024-09-05 17:36:19 +02:00
let current_span = tracing::Span::current();
let handle = Builder::new().name(S("indexer-extractors")).spawn_scoped(s, move || {
pool.in_place_scope(|_s| {
2024-09-05 17:36:19 +02:00
let span = tracing::trace_span!(target: "indexing::documents", parent: &current_span, "extract");
let _entered = span.enter();
let document_changes = document_changes.into_par_iter();
// document but we need to create a function that collects and compresses documents.
2024-09-12 18:01:02 +02:00
let document_sender = extractor_sender.document_sender();
document_changes.clone().into_par_iter().try_for_each_try_init(|| Ok(()) as Result<_>, |_, result| {
match result? {
DocumentChange::Deletion(deletion) => {
let docid = deletion.docid();
2024-09-12 18:01:02 +02:00
document_sender.delete(docid).unwrap();
}
DocumentChange::Update(update) => {
let docid = update.docid();
let content = update.new();
2024-09-12 18:01:02 +02:00
document_sender.insert(docid, content.boxed()).unwrap();
}
DocumentChange::Insertion(insertion) => {
let docid = insertion.docid();
let content = insertion.new();
2024-09-12 18:01:02 +02:00
document_sender.insert(docid, content.boxed()).unwrap();
// extracted_dictionary_sender.send(self, dictionary: &[u8]);
}
}
Ok(()) as std::result::Result<_, Arc<_>>
})?;
2024-09-12 18:01:02 +02:00
document_sender.finish().unwrap();
2024-09-05 15:12:07 +02:00
const TEN_GIB: usize = 10 * 1024 * 1024 * 1024;
let max_memory = TEN_GIB / dbg!(rayon::current_num_threads());
let grenad_parameters = GrenadParameters {
max_memory: Some(max_memory),
..GrenadParameters::default()
};
2024-09-16 09:34:10 +02:00
{
let span = tracing::trace_span!(target: "indexing::documents::extract", "faceted");
let _entered = span.enter();
extract_and_send_docids::<
FacetedDocidsExtractor,
FacetDocids,
>(
index,
&global_fields_ids_map,
grenad_parameters,
document_changes.clone(),
&extractor_sender,
)?;
}
2024-09-05 17:36:19 +02:00
{
let span = tracing::trace_span!(target: "indexing::documents::extract", "word_docids");
let _entered = span.enter();
2024-09-11 10:20:23 +02:00
let WordDocidsMergers {
word_fid_docids,
word_docids,
exact_word_docids,
word_position_docids,
fid_word_count_docids,
} = WordDocidsExtractors::run_extraction(index, &global_fields_ids_map, grenad_parameters, document_changes.clone())?;
extractor_sender.send_searchable::<WordDocids>(word_docids).unwrap();
extractor_sender.send_searchable::<WordFidDocids>(word_fid_docids).unwrap();
extractor_sender.send_searchable::<ExactWordDocids>(exact_word_docids).unwrap();
extractor_sender.send_searchable::<WordPositionDocids>(word_position_docids).unwrap();
extractor_sender.send_searchable::<FidWordCountDocids>(fid_word_count_docids).unwrap();
2024-09-05 17:36:19 +02:00
}
2024-09-05 17:36:19 +02:00
{
let span = tracing::trace_span!(target: "indexing::documents::extract", "word_pair_proximity_docids");
let _entered = span.enter();
extract_and_send_docids::<
WordPairProximityDocidsExtractor,
WordPairProximityDocids,
>(
index,
&global_fields_ids_map,
grenad_parameters,
document_changes.clone(),
&extractor_sender,
)?;
}
{
let span = tracing::trace_span!(target: "indexing::documents::extract", "FINISH");
let _entered = span.enter();
}
// TODO THIS IS TOO MUCH
2024-09-16 09:34:10 +02:00
// - [ ] Extract fieldid docid facet number
// - [ ] Extract fieldid docid facet string
// - [ ] Extract facetid string fst
// - [ ] Extract facetid normalized string strings
// TODO Inverted Indexes again
2024-09-16 09:34:10 +02:00
// - [x] Extract fieldid facet isempty docids
// - [x] Extract fieldid facet isnull docids
// - [x] Extract fieldid facet exists docids
// TODO This is the normal system
2024-09-16 09:34:10 +02:00
// - [x] Extract fieldid facet number docids
// - [x] Extract fieldid facet string docids
Ok(()) as Result<_>
})
})?;
// TODO manage the errors correctly
2024-09-05 17:36:19 +02:00
let current_span = tracing::Span::current();
let merger_thread = Builder::new().name(S("indexer-merger")).spawn_scoped(s, move || {
2024-09-05 17:36:19 +02:00
let span =
tracing::trace_span!(target: "indexing::documents", parent: &current_span, "merge");
let _entered = span.enter();
let rtxn = index.read_txn().unwrap();
2024-09-12 18:01:02 +02:00
merge_grenad_entries(
merger_receiver,
merger_sender,
&rtxn,
index,
global_fields_ids_map_clone,
)
})?;
for operation in writer_receiver {
let database = operation.database(index);
match operation.entry() {
EntryOperation::Delete(e) => {
if !database.delete(wtxn, e.entry())? {
unreachable!("We tried to delete an unknown key")
}
}
EntryOperation::Write(e) => database.put(wtxn, e.key(), e.value())?,
}
}
2024-09-02 15:10:21 +02:00
/// TODO handle the panicking threads
handle.join().unwrap()?;
let merger_result = merger_thread.join().unwrap()?;
if let Some(prefix_delta) = merger_result.prefix_delta {
let span = tracing::trace_span!(target: "indexing", "prefix");
let _entered = span.enter();
let PrefixDelta { modified, deleted } = prefix_delta;
// Compute word prefix docids
compute_word_prefix_docids(wtxn, index, &modified, &deleted)?;
// Compute word prefix fid docids
compute_word_prefix_fid_docids(wtxn, index, &modified, &deleted)?;
// Compute word prefix position docids
compute_word_prefix_position_docids(wtxn, index, &modified, &deleted)?;
}
2024-09-02 15:10:21 +02:00
Ok(()) as Result<_>
})?;
let fields_ids_map = fields_ids_map_lock.into_inner().unwrap();
index.put_fields_ids_map(wtxn, &fields_ids_map)?;
2024-09-30 11:35:03 +02:00
// used to update the localized and weighted maps while sharing the update code with the settings pipeline.
let mut inner_index_settings = InnerIndexSettings::from_index(index, wtxn)?;
inner_index_settings.recompute_facets(wtxn, index)?;
inner_index_settings.recompute_searchables(wtxn, index)?;
Ok(())
}
2024-09-04 12:17:13 +02:00
/// TODO: GrenadParameters::default() should be removed in favor a passed parameter
/// TODO: manage the errors correctly
/// TODO: we must have a single trait that also gives the extractor type
2024-09-16 09:34:10 +02:00
fn extract_and_send_docids<E: DocidsExtractor, D: MergerOperationType>(
2024-09-04 12:17:13 +02:00
index: &Index,
fields_ids_map: &GlobalFieldsIdsMap,
indexer: GrenadParameters,
2024-09-26 18:59:28 +02:00
document_changes: impl IntoParallelIterator<Item = std::result::Result<DocumentChange, Arc<Error>>>,
2024-09-04 12:17:13 +02:00
sender: &ExtractorSender,
) -> Result<()> {
let merger = E::run_extraction(index, fields_ids_map, indexer, document_changes)?;
2024-09-30 16:08:29 +02:00
sender.send_searchable::<D>(merger).unwrap();
Ok(())
2024-09-04 12:17:13 +02:00
}
2024-09-11 15:59:30 +02:00
/// Returns the primary key *field id* that has already been set for this index or the
/// one we will guess by searching for the first key that contains "id" as a substring.
/// TODO move this elsewhere
2024-09-11 15:59:30 +02:00
pub fn retrieve_or_guess_primary_key<'a>(
rtxn: &'a RoTxn<'a>,
index: &Index,
2024-09-11 15:59:30 +02:00
fields_ids_map: &mut FieldsIdsMap,
first_document: Option<&'a TopLevelMap<'_>>,
) -> Result<StdResult<PrimaryKey<'a>, UserError>> {
match index.primary_key(rtxn)? {
2024-09-11 15:59:30 +02:00
Some(primary_key) => match PrimaryKey::new(primary_key, fields_ids_map) {
Some(primary_key) => Ok(Ok(primary_key)),
2024-09-11 15:59:30 +02:00
None => unreachable!("Why is the primary key not in the fidmap?"),
},
None => {
2024-09-11 15:59:30 +02:00
let first_document = match first_document {
Some(document) => document,
None => return Ok(Err(UserError::NoPrimaryKeyCandidateFound)),
};
let mut guesses: Vec<&str> = first_document
.keys()
.map(AsRef::as_ref)
.filter(|name| name.to_lowercase().ends_with(DEFAULT_PRIMARY_KEY))
.collect();
2024-09-11 15:59:30 +02:00
// sort the keys in lexicographical order, so that fields are always in the same order.
guesses.sort_unstable();
match guesses.as_slice() {
[] => Ok(Err(UserError::NoPrimaryKeyCandidateFound)),
2024-09-11 15:59:30 +02:00
[name] => {
tracing::info!("Primary key was not specified in index. Inferred to '{name}'");
2024-09-11 15:59:30 +02:00
match fields_ids_map.insert(name) {
Some(field_id) => Ok(Ok(PrimaryKey::Flat { name, field_id })),
None => Ok(Err(UserError::AttributeLimitReached)),
}
}
multiple => Ok(Err(UserError::MultiplePrimaryKeyCandidatesFound {
2024-09-11 15:59:30 +02:00
candidates: multiple.iter().map(|candidate| candidate.to_string()).collect(),
})),
}
}
}
}