diff --git a/milli/src/update/new/channel.rs b/milli/src/update/new/channel.rs index d5739a75e..acea02316 100644 --- a/milli/src/update/new/channel.rs +++ b/milli/src/update/new/channel.rs @@ -1,4 +1,5 @@ use std::fs::File; +use std::marker::PhantomData; use crossbeam_channel::{IntoIter, Receiver, SendError, Sender}; use grenad::Merger; @@ -17,20 +18,9 @@ pub fn merger_writer_channel(cap: usize) -> (MergerSender, WriterReceiver) { } /// The capacity of the channel is currently in number of messages. -pub fn extractors_merger_channels(cap: usize) -> ExtractorsMergerChannels { +pub fn extractors_merger_channels(cap: usize) -> (ExtractorSender, MergerReceiver) { let (sender, receiver) = crossbeam_channel::bounded(cap); - - ExtractorsMergerChannels { - merger_receiver: MergerReceiver(receiver), - deladd_cbo_roaring_bitmap_sender: DeladdCboRoaringBitmapSender(sender.clone()), - extracted_documents_sender: ExtractedDocumentsSender(sender.clone()), - } -} - -pub struct ExtractorsMergerChannels { - pub merger_receiver: MergerReceiver, - pub deladd_cbo_roaring_bitmap_sender: DeladdCboRoaringBitmapSender, - pub extracted_documents_sender: ExtractedDocumentsSender, + (ExtractorSender(sender), MergerReceiver(receiver)) } pub struct KeyValueEntry { @@ -113,6 +103,7 @@ pub struct WriterOperation { pub enum Database { WordDocids, + WordFidDocids, Documents, Main, } @@ -123,6 +114,7 @@ impl WriterOperation { Database::Main => index.main.remap_types(), Database::Documents => index.documents.remap_types(), Database::WordDocids => index.word_docids.remap_types(), + Database::WordFidDocids => index.word_fid_docids.remap_types(), } } @@ -149,8 +141,12 @@ impl MergerSender { MainSender(&self.0) } - pub fn word_docids(&self) -> WordDocidsSender<'_> { - WordDocidsSender(&self.0) + pub fn word_docids(&self) -> DocidsSender<'_, WordDocids> { + DocidsSender { sender: &self.0, _marker: PhantomData } + } + + pub fn word_fid_docids(&self) -> DocidsSender<'_, WordFidDocids> { + DocidsSender { 
sender: &self.0, _marker: PhantomData } } pub fn documents(&self) -> DocumentsSender<'_> { @@ -190,12 +186,34 @@ impl MainSender<'_> { } } -pub struct WordDocidsSender<'a>(&'a Sender); +pub enum WordDocids {} +pub enum WordFidDocids {} -impl WordDocidsSender<'_> { +pub trait DatabaseType { + fn database() -> Database; +} + +impl DatabaseType for WordDocids { + fn database() -> Database { + Database::WordDocids + } +} + +impl DatabaseType for WordFidDocids { + fn database() -> Database { + Database::WordFidDocids + } +} + +pub struct DocidsSender<'a, D> { + sender: &'a Sender, + _marker: PhantomData, +} + +impl DocidsSender<'_, D> { pub fn write(&self, key: &[u8], value: &[u8]) -> StdResult<(), SendError<()>> { let entry = EntryOperation::Write(KeyValueEntry::from_key_value(key, value)); - match self.0.send(WriterOperation { database: Database::WordDocids, entry }) { + match self.sender.send(WriterOperation { database: D::database(), entry }) { Ok(()) => Ok(()), Err(SendError(_)) => Err(SendError(())), } @@ -203,7 +221,7 @@ impl WordDocidsSender<'_> { pub fn delete(&self, key: &[u8]) -> StdResult<(), SendError<()>> { let entry = EntryOperation::Delete(KeyEntry::from_key(key)); - match self.0.send(WriterOperation { database: Database::WordDocids, entry }) { + match self.sender.send(WriterOperation { database: D::database(), entry }) { Ok(()) => Ok(()), Err(SendError(_)) => Err(SendError(())), } @@ -240,6 +258,7 @@ impl DocumentsSender<'_> { pub enum MergerOperation { WordDocidsMerger(Merger), + WordFidDocidsMerger(Merger), InsertDocument { docid: DocumentId, document: Box }, DeleteDocument { docid: DocumentId }, } @@ -255,27 +274,10 @@ impl IntoIterator for MergerReceiver { } } -#[derive(Clone)] -pub struct DeladdCboRoaringBitmapSender(Sender); +pub struct ExtractorSender(Sender); -impl DeladdCboRoaringBitmapSender { - pub fn word_docids( - &self, - merger: Merger, - ) -> StdResult<(), SendError<()>> { - let operation = MergerOperation::WordDocidsMerger(merger); - 
match self.0.send(operation) { - Ok(()) => Ok(()), - Err(SendError(_)) => Err(SendError(())), - } - } -} - -#[derive(Clone)] -pub struct ExtractedDocumentsSender(Sender); - -impl ExtractedDocumentsSender { - pub fn insert( +impl ExtractorSender { + pub fn document_insert( &self, docid: DocumentId, document: Box, @@ -286,10 +288,32 @@ impl ExtractedDocumentsSender { } } - pub fn delete(&self, docid: DocumentId) -> StdResult<(), SendError<()>> { + pub fn document_delete(&self, docid: DocumentId) -> StdResult<(), SendError<()>> { match self.0.send(MergerOperation::DeleteDocument { docid }) { Ok(()) => Ok(()), Err(SendError(_)) => Err(SendError(())), } } + + pub fn word_docids( + &self, + merger: Merger, + ) -> StdResult<(), SendError<()>> { + let operation = MergerOperation::WordDocidsMerger(merger); + match self.0.send(operation) { + Ok(()) => Ok(()), + Err(SendError(_)) => Err(SendError(())), + } + } + + pub fn word_fid_docids( + &self, + merger: Merger, + ) -> StdResult<(), SendError<()>> { + let operation = MergerOperation::WordFidDocidsMerger(merger); + match self.0.send(operation) { + Ok(()) => Ok(()), + Err(SendError(_)) => Err(SendError(())), + } + } } diff --git a/milli/src/update/new/extract/mod.rs b/milli/src/update/new/extract/mod.rs index 5e6c02c65..1964b88fc 100644 --- a/milli/src/update/new/extract/mod.rs +++ b/milli/src/update/new/extract/mod.rs @@ -1,5 +1,7 @@ mod cache; mod searchable; -pub use searchable::SearchableExtractor; -pub use searchable::WordDocidsExtractor; +pub use searchable::{ + ExactWordDocidsExtractor, SearchableExtractor, WordDocidsExtractor, WordFidDocidsExtractor, + WordPositionDocidsExtractor, +}; diff --git a/milli/src/update/new/indexer/mod.rs b/milli/src/update/new/indexer/mod.rs index 7a9999c28..539b6d602 100644 --- a/milli/src/update/new/indexer/mod.rs +++ b/milli/src/update/new/indexer/mod.rs @@ -11,11 +11,9 @@ use rayon::iter::{IntoParallelIterator, ParallelIterator}; use rayon::ThreadPool; pub use 
update_by_function::UpdateByFunction; -use super::channel::{ - extractors_merger_channels, merger_writer_channel, EntryOperation, ExtractorsMergerChannels, -}; +use super::channel::{extractors_merger_channels, merger_writer_channel, EntryOperation}; use super::document_change::DocumentChange; -use super::extract::{SearchableExtractor, WordDocidsExtractor}; +use super::extract::{SearchableExtractor, WordDocidsExtractor, WordFidDocidsExtractor}; use super::merger::merge_grenad_entries; use super::StdResult; use crate::documents::{ @@ -56,11 +54,8 @@ where PI::Iter: Clone, { let (merger_sender, writer_receiver) = merger_writer_channel(100); - let ExtractorsMergerChannels { - merger_receiver, - deladd_cbo_roaring_bitmap_sender, - extracted_documents_sender, - } = extractors_merger_channels(100); + // This channel acts as a rendezvous point to ensure that we are one task ahead + let (extractor_sender, merger_receiver) = extractors_merger_channels(0); let fields_ids_map_lock = RwLock::new(fields_ids_map); let global_fields_ids_map = GlobalFieldsIdsMap::new(&fields_ids_map_lock); @@ -76,17 +71,19 @@ where match result? 
{ DocumentChange::Deletion(deletion) => { let docid = deletion.docid(); - extracted_documents_sender.delete(docid).unwrap(); + extractor_sender.document_delete(docid).unwrap(); } DocumentChange::Update(update) => { let docid = update.docid(); let content = update.new(); - extracted_documents_sender.insert(docid, content.boxed()).unwrap(); + extractor_sender.document_insert(docid, content.boxed()).unwrap(); } DocumentChange::Insertion(insertion) => { let docid = insertion.docid(); let content = insertion.new(); - extracted_documents_sender.insert(docid, content.boxed()).unwrap(); + extractor_sender.document_insert(docid, content.boxed()).unwrap(); + + // extracted_dictionary_sender.send(self, dictionary: &[u8]); } } Ok(()) as Result<_> @@ -102,7 +99,19 @@ where )?; /// TODO: manage the errors correctly - deladd_cbo_roaring_bitmap_sender.word_docids(merger).unwrap(); + extractor_sender.word_docids(merger).unwrap(); + + // word fid docids + let merger = WordFidDocidsExtractor::run_extraction( + index, + &global_fields_ids_map, + /// TODO: GrenadParameters::default() should be removed in favor of a passed parameter + GrenadParameters::default(), + document_changes.clone(), + )?; + + /// TODO: manage the errors correctly + extractor_sender.word_fid_docids(merger).unwrap(); Ok(()) as Result<_> }) diff --git a/milli/src/update/new/merger.rs b/milli/src/update/new/merger.rs index b21f20b0f..c7f1a4385 100644 --- a/milli/src/update/new/merger.rs +++ b/milli/src/update/new/merger.rs @@ -79,6 +79,32 @@ pub fn merge_grenad_entries( let main_sender = sender.main(); main_sender.write_words_fst(&words_fst_mmap).unwrap(); } + MergerOperation::WordFidDocidsMerger(merger) => { + let word_docids_sender = sender.word_fid_docids(); + let database = index.word_fid_docids.remap_types::(); + + /// TODO manage the error correctly + let mut merger_iter = merger.into_stream_merger_iter().unwrap(); + + // TODO manage the error correctly + while let Some((key, deladd)) = 
merger_iter.next().unwrap() { + let current = database.get(rtxn, key)?; + let deladd: &KvReaderDelAdd = deladd.into(); + let del = deladd.get(DelAdd::Deletion); + let add = deladd.get(DelAdd::Addition); + + match merge_cbo_bitmaps(current, del, add)? { + Operation::Write(bitmap) => { + let value = cbo_bitmap_serialize_into_vec(&bitmap, &mut buffer); + word_docids_sender.write(key, value).unwrap(); + } + Operation::Delete => { + word_docids_sender.delete(key).unwrap(); + } + Operation::Ignore => (), + } + } + } MergerOperation::InsertDocument { docid, document } => { documents_ids.insert(docid); sender.documents().uncompressed(docid, &document).unwrap();