Introduce the indexer::index function that runs the indexing

This commit is contained in:
Clément Renault 2024-08-29 18:27:02 +02:00
parent 45c060831e
commit 27df9e6c73
No known key found for this signature in database
GPG Key ID: F250A4C4E3AE5F5F
2 changed files with 70 additions and 26 deletions

View File

@ -1,6 +1,7 @@
use core::slice::SlicePattern;
use std::fs::File; use std::fs::File;
use crossbeam_channel::{Receiver, RecvError, SendError, Sender}; use crossbeam_channel::{IntoIter, Receiver, SendError, Sender};
use heed::types::Bytes; use heed::types::Bytes;
use super::indexer::KvReaderFieldId; use super::indexer::KvReaderFieldId;
@ -8,20 +9,9 @@ use super::StdResult;
use crate::{DocumentId, Index}; use crate::{DocumentId, Index};
/// The capacity of the channel is currently in number of messages. /// The capacity of the channel is currently in number of messages.
pub fn merger_writer_channels(cap: usize) -> MergerWriterChannels { pub fn merger_writer_channels(cap: usize) -> (MergerSender, WriterReceiver) {
let (sender, receiver) = crossbeam_channel::bounded(cap); let (sender, receiver) = crossbeam_channel::bounded(cap);
(MergerSender(sender), WriterReceiver(receiver))
MergerWriterChannels {
writer_receiver: WriterReceiver(receiver),
merger_sender: MergerSender(sender.clone()),
document_sender: DocumentSender(sender),
}
}
pub struct MergerWriterChannels {
pub writer_receiver: WriterReceiver,
pub merger_sender: MergerSender,
pub document_sender: DocumentSender,
} }
/// The capacity of the channel is currently in number of messages. /// The capacity of the channel is currently in number of messages.
@ -53,8 +43,12 @@ impl KeyValueEntry {
KeyValueEntry { key_length: key.len(), data: data.into_boxed_slice() } KeyValueEntry { key_length: key.len(), data: data.into_boxed_slice() }
} }
pub fn entry(&self) -> (&[u8], &[u8]) { pub fn key(&self) -> &[u8] {
self.data.split_at(self.key_length) &self.data.as_slice()[..self.key_length]
}
pub fn value(&self) -> &[u8] {
&self.data.as_slice()[self.key_length..]
} }
} }
@ -72,7 +66,7 @@ impl KeyEntry {
} }
} }
enum EntryOperation { pub enum EntryOperation {
Delete(KeyEntry), Delete(KeyEntry),
Write(KeyValueEntry), Write(KeyValueEntry),
} }
@ -91,9 +85,12 @@ impl DocumentEntry {
DocumentEntry { docid, content } DocumentEntry { docid, content }
} }
pub fn entry(&self) -> ([u8; 4], &[u8]) { pub fn key(&self) -> [u8; 4] {
let docid = self.docid.to_be_bytes(); self.docid.to_be_bytes()
(docid, &self.content) }
pub fn content(&self) -> &[u8] {
&self.content
} }
} }
@ -113,9 +110,12 @@ impl WriterOperation {
pub struct WriterReceiver(Receiver<WriterOperation>); pub struct WriterReceiver(Receiver<WriterOperation>);
impl WriterReceiver { impl IntoIterator for WriterReceiver {
pub fn recv(&self) -> StdResult<WriterOperation, RecvError> { type Item = WriterOperation;
self.0.recv() type IntoIter = IntoIter<Self::Item>;
fn into_iter(self) -> Self::IntoIter {
self.0.into_iter()
} }
} }
@ -167,7 +167,7 @@ pub struct MergerReceiver(Receiver<MergerOperation>);
impl IntoIterator for MergerReceiver { impl IntoIterator for MergerReceiver {
type Item = MergerOperation; type Item = MergerOperation;
type IntoIter = crossbeam_channel::IntoIter<Self::Item>; type IntoIter = IntoIter<Self::Item>;
fn into_iter(self) -> Self::IntoIter { fn into_iter(self) -> Self::IntoIter {
self.0.into_iter() self.0.into_iter()

View File

@ -15,15 +15,21 @@ mod indexer {
use std::io::Cursor; use std::io::Cursor;
use std::os::unix::fs::MetadataExt; use std::os::unix::fs::MetadataExt;
use std::sync::Arc; use std::sync::Arc;
use std::thread;
use big_s::S;
use heed::types::Bytes; use heed::types::Bytes;
use heed::RoTxn; use heed::{RoTxn, RwTxn};
use memmap2::Mmap; use memmap2::Mmap;
use rayon::iter::{IntoParallelIterator, ParallelBridge, ParallelIterator}; use rayon::iter::{IntoParallelIterator, ParallelBridge, ParallelIterator};
use roaring::RoaringBitmap; use roaring::RoaringBitmap;
use serde_json::Value; use serde_json::Value;
use super::channel::{MergerReceiver, MergerSender}; use super::channel::{
extractors_merger_channels, merger_writer_channels, EntryOperation,
ExtractorsMergerChannels, MergerReceiver, MergerSender, MergerWriterChannels,
WriterOperation,
};
use super::document_change::{Deletion, DocumentChange, Insertion, Update}; use super::document_change::{Deletion, DocumentChange, Insertion, Update};
use super::items_pool::ItemsPool; use super::items_pool::ItemsPool;
use super::merge; use super::merge;
@ -363,6 +369,44 @@ mod indexer {
pub struct UpdateByFunctionIndexer; pub struct UpdateByFunctionIndexer;
/// Runs the full indexing pipeline: extraction, merging, and writing to the index.
///
/// Spawns two scoped threads — one draining `document_changes` in parallel
/// (extraction), one merging grenad entries — while the current thread
/// consumes the writer channel and persists every operation into `index`
/// under the write transaction `wtxn`.
///
/// Returns `Ok(())` on success, or the first database error encountered
/// while applying writer operations.
///
/// TODO return stats
/// TODO take the rayon ThreadPool
pub fn index<PI>(wtxn: &mut RwTxn, index: &Index, document_changes: PI) -> Result<()>
where
    PI: IntoParallelIterator<Item = Result<DocumentChange>> + Send,
    PI::Iter: Clone,
{
    // Channel capacities are in number of messages; they bound how far the
    // producer threads can run ahead of the single writer thread.
    let (merger_sender, writer_receiver) = merger_writer_channels(100);
    let ExtractorsMergerChannels { merger_receiver, deladd_cbo_roaring_bitmap_sender } =
        extractors_merger_channels(100);

    thread::scope(|s| {
        // NOTE(review): the extractor output is not wired to the merger yet —
        // the parallel iterator is only drained. Also, `spawn_scoped` returns
        // an io::Result that was previously discarded, hiding spawn failures;
        // surface them explicitly instead.
        thread::Builder::new()
            .name(S("indexer-extractors"))
            .spawn_scoped(s, || {
                document_changes.into_par_iter().for_each(|_dc| ());
            })
            .expect("failed to spawn the indexer-extractors thread");

        // TODO manage the errors correctly
        thread::Builder::new()
            .name(S("indexer-merger"))
            .spawn_scoped(s, || {
                let rtxn = index.read_txn().unwrap();
                merge_grenad_entries(merger_receiver, merger_sender, &rtxn, index).unwrap()
            })
            .expect("failed to spawn the indexer-merger thread");

        // TODO Split this code into another function
        // Drain the writer channel on the current thread so that every
        // put/delete goes through the single `wtxn` write transaction.
        for operation in writer_receiver {
            let database = operation.database(index);
            match operation {
                WriterOperation::WordDocids(operation) => match operation {
                    EntryOperation::Delete(e) => database.delete(wtxn, e.entry()).map(drop)?,
                    EntryOperation::Write(e) => database.put(wtxn, e.key(), e.value())?,
                },
                WriterOperation::Document(e) => database.put(wtxn, &e.key(), e.content())?,
            }
        }

        Ok(())
    })
}
enum Operation { enum Operation {
Write(RoaringBitmap), Write(RoaringBitmap),
Delete, Delete,