2024-09-02 15:21:00 +02:00
|
|
|
use std::thread::{self, Builder};
|
2024-09-02 10:42:19 +02:00
|
|
|
|
|
|
|
use big_s::S;
|
2024-09-02 14:42:27 +02:00
|
|
|
pub use document_deletion::DocumentDeletion;
|
|
|
|
pub use document_operation::DocumentOperation;
|
2024-09-02 10:42:19 +02:00
|
|
|
use heed::RwTxn;
|
2024-09-02 14:42:27 +02:00
|
|
|
pub use partial_dump::PartialDump;
|
2024-09-02 10:42:19 +02:00
|
|
|
use rayon::iter::{IntoParallelIterator, ParallelIterator};
|
|
|
|
use rayon::ThreadPool;
|
2024-09-02 14:42:27 +02:00
|
|
|
pub use update_by_function::UpdateByFunction;
|
2024-09-02 10:42:19 +02:00
|
|
|
|
|
|
|
use super::channel::{
|
2024-09-02 15:10:21 +02:00
|
|
|
extractors_merger_channels, merger_writer_channel, EntryOperation, ExtractorsMergerChannels,
|
2024-09-02 10:42:19 +02:00
|
|
|
WriterOperation,
|
|
|
|
};
|
|
|
|
use super::document_change::DocumentChange;
|
|
|
|
use super::merger::merge_grenad_entries;
|
|
|
|
use crate::{Index, Result};
|
|
|
|
|
|
|
|
mod document_deletion;
|
|
|
|
mod document_operation;
|
|
|
|
mod partial_dump;
|
|
|
|
mod update_by_function;
|
|
|
|
|
2024-09-02 15:10:21 +02:00
|
|
|
pub trait DocumentChanges<'p> {
|
2024-09-02 10:42:19 +02:00
|
|
|
type Parameter: 'p;
|
|
|
|
|
|
|
|
fn document_changes(
|
|
|
|
self,
|
|
|
|
param: Self::Parameter,
|
2024-09-02 15:21:00 +02:00
|
|
|
) -> Result<impl ParallelIterator<Item = Result<DocumentChange>> + 'p>;
|
2024-09-02 10:42:19 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
/// This is the main function of this crate.
|
|
|
|
///
|
|
|
|
/// Give it the output of the [`Indexer::document_changes`] method and it will execute it in the [`rayon::ThreadPool`].
|
|
|
|
///
|
|
|
|
/// TODO return stats
|
|
|
|
pub fn index<PI>(
|
|
|
|
wtxn: &mut RwTxn,
|
|
|
|
index: &Index,
|
|
|
|
pool: &ThreadPool,
|
|
|
|
document_changes: PI,
|
|
|
|
) -> Result<()>
|
|
|
|
where
|
2024-09-02 15:21:00 +02:00
|
|
|
PI: IntoParallelIterator<Item = Result<DocumentChange>> + Send,
|
2024-09-02 10:42:19 +02:00
|
|
|
PI::Iter: Clone,
|
|
|
|
{
|
2024-09-02 15:10:21 +02:00
|
|
|
let (merger_sender, writer_receiver) = merger_writer_channel(100);
|
2024-09-02 10:42:19 +02:00
|
|
|
let ExtractorsMergerChannels { merger_receiver, deladd_cbo_roaring_bitmap_sender } =
|
|
|
|
extractors_merger_channels(100);
|
|
|
|
|
|
|
|
thread::scope(|s| {
|
|
|
|
// TODO manage the errors correctly
|
2024-09-02 15:21:00 +02:00
|
|
|
let handle = Builder::new().name(S("indexer-extractors")).spawn_scoped(s, || {
|
|
|
|
pool.in_place_scope(|_s| {
|
|
|
|
// word docids
|
|
|
|
// document_changes.into_par_iter().try_for_each(|_dc| Ok(()) as Result<_>)
|
|
|
|
// let grenads = extractor_function(document_changes)?;
|
|
|
|
// deladd_cbo_roaring_bitmap_sender.word_docids(grenads)?;
|
2024-09-02 15:10:21 +02:00
|
|
|
|
2024-09-02 15:21:00 +02:00
|
|
|
Ok(()) as Result<_>
|
|
|
|
})
|
|
|
|
})?;
|
2024-09-02 10:42:19 +02:00
|
|
|
|
|
|
|
// TODO manage the errors correctly
|
2024-09-02 15:21:00 +02:00
|
|
|
let handle2 = Builder::new().name(S("indexer-merger")).spawn_scoped(s, || {
|
2024-09-02 10:42:19 +02:00
|
|
|
let rtxn = index.read_txn().unwrap();
|
2024-09-02 15:10:21 +02:00
|
|
|
merge_grenad_entries(merger_receiver, merger_sender, &rtxn, index)
|
2024-09-02 10:42:19 +02:00
|
|
|
})?;
|
|
|
|
|
|
|
|
// TODO Split this code into another function
|
|
|
|
for operation in writer_receiver {
|
|
|
|
let database = operation.database(index);
|
|
|
|
match operation {
|
|
|
|
WriterOperation::WordDocids(operation) => match operation {
|
|
|
|
EntryOperation::Delete(e) => database.delete(wtxn, e.entry()).map(drop)?,
|
|
|
|
EntryOperation::Write(e) => database.put(wtxn, e.key(), e.value())?,
|
|
|
|
},
|
|
|
|
WriterOperation::Document(e) => database.put(wtxn, &e.key(), e.content())?,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2024-09-02 15:10:21 +02:00
|
|
|
/// TODO handle the panicking threads
|
|
|
|
handle.join().unwrap()?;
|
|
|
|
handle2.join().unwrap()?;
|
|
|
|
|
2024-09-02 10:42:19 +02:00
|
|
|
Ok(())
|
|
|
|
})
|
|
|
|
}
|