2024-09-02 19:39:48 +02:00
|
|
|
use std::fs::File;
|
2024-09-03 12:01:01 +02:00
|
|
|
use std::sync::RwLock;
|
2024-09-02 15:21:00 +02:00
|
|
|
use std::thread::{self, Builder};
|
2024-09-02 10:42:19 +02:00
|
|
|
|
|
|
|
use big_s::S;
|
2024-09-02 14:42:27 +02:00
|
|
|
pub use document_deletion::DocumentDeletion;
|
|
|
|
pub use document_operation::DocumentOperation;
|
2024-09-02 19:39:48 +02:00
|
|
|
use heed::{RoTxn, RwTxn};
|
2024-09-02 14:42:27 +02:00
|
|
|
pub use partial_dump::PartialDump;
|
2024-09-02 10:42:19 +02:00
|
|
|
use rayon::iter::{IntoParallelIterator, ParallelIterator};
|
|
|
|
use rayon::ThreadPool;
|
2024-09-02 14:42:27 +02:00
|
|
|
pub use update_by_function::UpdateByFunction;
|
2024-09-02 10:42:19 +02:00
|
|
|
|
|
|
|
use super::channel::{
|
2024-09-02 15:10:21 +02:00
|
|
|
extractors_merger_channels, merger_writer_channel, EntryOperation, ExtractorsMergerChannels,
|
2024-09-02 10:42:19 +02:00
|
|
|
WriterOperation,
|
|
|
|
};
|
|
|
|
use super::document_change::DocumentChange;
|
2024-09-03 11:02:39 +02:00
|
|
|
use super::extract::{SearchableExtractor, WordDocidsExtractor};
|
2024-09-02 10:42:19 +02:00
|
|
|
use super::merger::merge_grenad_entries;
|
2024-09-02 19:39:48 +02:00
|
|
|
use super::StdResult;
|
|
|
|
use crate::documents::{
|
|
|
|
obkv_to_object, DocumentsBatchCursor, DocumentsBatchIndex, PrimaryKey, DEFAULT_PRIMARY_KEY,
|
|
|
|
};
|
2024-09-03 11:02:39 +02:00
|
|
|
use crate::update::GrenadParameters;
|
2024-09-03 12:01:01 +02:00
|
|
|
use crate::{FieldsIdsMap, GlobalFieldsIdsMap, Index, Result, UserError};
|
2024-09-02 10:42:19 +02:00
|
|
|
|
|
|
|
mod document_deletion;
|
|
|
|
mod document_operation;
|
|
|
|
mod partial_dump;
|
|
|
|
mod update_by_function;
|
|
|
|
|
2024-09-02 15:10:21 +02:00
|
|
|
pub trait DocumentChanges<'p> {
|
2024-09-02 10:42:19 +02:00
|
|
|
type Parameter: 'p;
|
|
|
|
|
|
|
|
fn document_changes(
|
|
|
|
self,
|
2024-09-03 12:01:01 +02:00
|
|
|
fields_ids_map: &mut FieldsIdsMap,
|
2024-09-02 10:42:19 +02:00
|
|
|
param: Self::Parameter,
|
2024-09-02 19:39:48 +02:00
|
|
|
) -> Result<impl ParallelIterator<Item = Result<DocumentChange>> + Clone + 'p>;
|
2024-09-02 10:42:19 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
/// This is the main function of this crate.
|
|
|
|
///
|
|
|
|
/// Give it the output of the [`Indexer::document_changes`] method and it will execute it in the [`rayon::ThreadPool`].
|
|
|
|
///
|
|
|
|
/// TODO return stats
|
|
|
|
pub fn index<PI>(
|
|
|
|
wtxn: &mut RwTxn,
|
|
|
|
index: &Index,
|
2024-09-03 12:01:01 +02:00
|
|
|
fields_ids_map: FieldsIdsMap,
|
2024-09-02 10:42:19 +02:00
|
|
|
pool: &ThreadPool,
|
2024-09-03 11:02:39 +02:00
|
|
|
document_changes: PI,
|
2024-09-02 10:42:19 +02:00
|
|
|
) -> Result<()>
|
|
|
|
where
|
2024-09-02 15:21:00 +02:00
|
|
|
PI: IntoParallelIterator<Item = Result<DocumentChange>> + Send,
|
2024-09-02 10:42:19 +02:00
|
|
|
PI::Iter: Clone,
|
|
|
|
{
|
2024-09-02 15:10:21 +02:00
|
|
|
let (merger_sender, writer_receiver) = merger_writer_channel(100);
|
2024-09-02 10:42:19 +02:00
|
|
|
let ExtractorsMergerChannels { merger_receiver, deladd_cbo_roaring_bitmap_sender } =
|
|
|
|
extractors_merger_channels(100);
|
|
|
|
|
2024-09-03 12:01:01 +02:00
|
|
|
let fields_ids_map_lock = RwLock::new(fields_ids_map);
|
|
|
|
let global_fields_ids_map = GlobalFieldsIdsMap::new(&fields_ids_map_lock);
|
|
|
|
|
2024-09-02 10:42:19 +02:00
|
|
|
thread::scope(|s| {
|
|
|
|
// TODO manage the errors correctly
|
2024-09-02 15:21:00 +02:00
|
|
|
let handle = Builder::new().name(S("indexer-extractors")).spawn_scoped(s, || {
|
|
|
|
pool.in_place_scope(|_s| {
|
2024-09-03 11:02:39 +02:00
|
|
|
let document_changes = document_changes.into_par_iter();
|
2024-09-02 15:21:00 +02:00
|
|
|
// word docids
|
2024-09-03 11:02:39 +02:00
|
|
|
let merger = WordDocidsExtractor::run_extraction(
|
|
|
|
index,
|
2024-09-03 12:01:01 +02:00
|
|
|
&global_fields_ids_map,
|
2024-09-03 11:02:39 +02:00
|
|
|
/// TODO: GrenadParameters::default() should be removed in favor a passed parameter
|
|
|
|
GrenadParameters::default(),
|
|
|
|
document_changes.clone(),
|
|
|
|
)?;
|
|
|
|
|
|
|
|
/// TODO: manage the errors correctly
|
|
|
|
deladd_cbo_roaring_bitmap_sender.word_docids(merger).unwrap();
|
2024-09-02 15:10:21 +02:00
|
|
|
|
2024-09-02 15:21:00 +02:00
|
|
|
Ok(()) as Result<_>
|
|
|
|
})
|
|
|
|
})?;
|
2024-09-02 10:42:19 +02:00
|
|
|
|
|
|
|
// TODO manage the errors correctly
|
2024-09-02 15:21:00 +02:00
|
|
|
let handle2 = Builder::new().name(S("indexer-merger")).spawn_scoped(s, || {
|
2024-09-02 10:42:19 +02:00
|
|
|
let rtxn = index.read_txn().unwrap();
|
2024-09-02 15:10:21 +02:00
|
|
|
merge_grenad_entries(merger_receiver, merger_sender, &rtxn, index)
|
2024-09-02 10:42:19 +02:00
|
|
|
})?;
|
|
|
|
|
|
|
|
// TODO Split this code into another function
|
|
|
|
for operation in writer_receiver {
|
|
|
|
let database = operation.database(index);
|
|
|
|
match operation {
|
|
|
|
WriterOperation::WordDocids(operation) => match operation {
|
|
|
|
EntryOperation::Delete(e) => database.delete(wtxn, e.entry()).map(drop)?,
|
|
|
|
EntryOperation::Write(e) => database.put(wtxn, e.key(), e.value())?,
|
|
|
|
},
|
|
|
|
WriterOperation::Document(e) => database.put(wtxn, &e.key(), e.content())?,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2024-09-02 15:10:21 +02:00
|
|
|
/// TODO handle the panicking threads
|
|
|
|
handle.join().unwrap()?;
|
|
|
|
handle2.join().unwrap()?;
|
|
|
|
|
2024-09-03 12:01:01 +02:00
|
|
|
Ok(()) as Result<_>
|
|
|
|
})?;
|
|
|
|
|
|
|
|
let fields_ids_map = fields_ids_map_lock.into_inner().unwrap();
|
|
|
|
index.put_fields_ids_map(wtxn, &fields_ids_map)?;
|
|
|
|
|
|
|
|
Ok(())
|
2024-09-02 10:42:19 +02:00
|
|
|
}
|
2024-09-02 19:39:48 +02:00
|
|
|
|
|
|
|
/// TODO move this elsewhere
|
|
|
|
pub fn guess_primary_key<'a>(
|
|
|
|
rtxn: &'a RoTxn<'a>,
|
|
|
|
index: &Index,
|
|
|
|
mut cursor: DocumentsBatchCursor<File>,
|
|
|
|
documents_batch_index: &'a DocumentsBatchIndex,
|
|
|
|
) -> Result<StdResult<PrimaryKey<'a>, UserError>> {
|
|
|
|
// The primary key *field id* that has already been set for this index or the one
|
|
|
|
// we will guess by searching for the first key that contains "id" as a substring.
|
|
|
|
match index.primary_key(rtxn)? {
|
|
|
|
Some(primary_key) => match PrimaryKey::new(primary_key, documents_batch_index) {
|
|
|
|
Some(primary_key) => Ok(Ok(primary_key)),
|
|
|
|
None => match cursor.next_document()? {
|
|
|
|
Some(first_document) => Ok(Err(UserError::MissingDocumentId {
|
|
|
|
primary_key: primary_key.to_string(),
|
|
|
|
document: obkv_to_object(first_document, documents_batch_index)?,
|
|
|
|
})),
|
|
|
|
None => unreachable!("Called with reader.is_empty()"),
|
|
|
|
},
|
|
|
|
},
|
|
|
|
None => {
|
|
|
|
let mut guesses: Vec<(u16, &str)> = documents_batch_index
|
|
|
|
.iter()
|
|
|
|
.filter(|(_, name)| name.to_lowercase().ends_with(DEFAULT_PRIMARY_KEY))
|
|
|
|
.map(|(field_id, name)| (*field_id, name.as_str()))
|
|
|
|
.collect();
|
|
|
|
|
|
|
|
// sort the keys in a deterministic, obvious way, so that fields are always in the same order.
|
|
|
|
guesses.sort_by(|(_, left_name), (_, right_name)| {
|
|
|
|
// shortest name first
|
|
|
|
left_name.len().cmp(&right_name.len()).then_with(
|
|
|
|
// then alphabetical order
|
|
|
|
|| left_name.cmp(right_name),
|
|
|
|
)
|
|
|
|
});
|
|
|
|
|
|
|
|
match guesses.as_slice() {
|
|
|
|
[] => Ok(Err(UserError::NoPrimaryKeyCandidateFound)),
|
|
|
|
[(field_id, name)] => {
|
|
|
|
tracing::info!("Primary key was not specified in index. Inferred to '{name}'");
|
|
|
|
Ok(Ok(PrimaryKey::Flat { name, field_id: *field_id }))
|
|
|
|
}
|
|
|
|
multiple => Ok(Err(UserError::MultiplePrimaryKeyCandidatesFound {
|
|
|
|
candidates: multiple
|
|
|
|
.iter()
|
|
|
|
.map(|(_, candidate)| candidate.to_string())
|
|
|
|
.collect(),
|
|
|
|
})),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|