diff --git a/milli/src/update/new/channel.rs b/milli/src/update/new/channel.rs index 4123e568c..6780be72e 100644 --- a/milli/src/update/new/channel.rs +++ b/milli/src/update/new/channel.rs @@ -1,6 +1,7 @@ +use core::slice::SlicePattern; use std::fs::File; -use crossbeam_channel::{Receiver, RecvError, SendError, Sender}; +use crossbeam_channel::{IntoIter, Receiver, SendError, Sender}; use heed::types::Bytes; use super::indexer::KvReaderFieldId; @@ -8,20 +9,9 @@ use super::StdResult; use crate::{DocumentId, Index}; /// The capacity of the channel is currently in number of messages. -pub fn merger_writer_channels(cap: usize) -> MergerWriterChannels { +pub fn merger_writer_channels(cap: usize) -> (MergerSender, WriterReceiver) { let (sender, receiver) = crossbeam_channel::bounded(cap); - - MergerWriterChannels { - writer_receiver: WriterReceiver(receiver), - merger_sender: MergerSender(sender.clone()), - document_sender: DocumentSender(sender), - } -} - -pub struct MergerWriterChannels { - pub writer_receiver: WriterReceiver, - pub merger_sender: MergerSender, - pub document_sender: DocumentSender, + (MergerSender(sender), WriterReceiver(receiver)) } /// The capacity of the channel is currently in number of messages. @@ -53,8 +43,12 @@ impl KeyValueEntry { KeyValueEntry { key_length: key.len(), data: data.into_boxed_slice() } } - pub fn entry(&self) -> (&[u8], &[u8]) { - self.data.split_at(self.key_length) + pub fn key(&self) -> &[u8] { + &self.data.as_slice()[..self.key_length] + } + + pub fn value(&self) -> &[u8] { + &self.data.as_slice()[self.key_length..] } } @@ -72,7 +66,7 @@ impl KeyEntry { } } -enum EntryOperation { +pub enum EntryOperation { Delete(KeyEntry), Write(KeyValueEntry), } @@ -91,9 +85,12 @@ impl DocumentEntry { DocumentEntry { docid, content } } - pub fn entry(&self) -> ([u8; 4], &[u8]) { - let docid = self.docid.to_be_bytes(); - (docid, &self.content) + pub fn key(&self) -> [u8; 4] { + self.docid.to_be_bytes() + } + + pub fn content(&self) -> &[u8] { + &self.content } } @@ -113,9 +110,12 @@ impl WriterOperation { pub struct WriterReceiver(Receiver); -impl WriterReceiver { - pub fn recv(&self) -> StdResult { - self.0.recv() +impl IntoIterator for WriterReceiver { + type Item = WriterOperation; + type IntoIter = IntoIter; + + fn into_iter(self) -> Self::IntoIter { + self.0.into_iter() } } @@ -167,7 +167,7 @@ pub struct MergerReceiver(Receiver); impl IntoIterator for MergerReceiver { type Item = MergerOperation; - type IntoIter = crossbeam_channel::IntoIter; + type IntoIter = IntoIter; fn into_iter(self) -> Self::IntoIter { self.0.into_iter() diff --git a/milli/src/update/new/mod.rs b/milli/src/update/new/mod.rs index 24e9c95db..da76bdfee 100644 --- a/milli/src/update/new/mod.rs +++ b/milli/src/update/new/mod.rs @@ -15,15 +15,21 @@ mod indexer { use std::io::Cursor; use std::os::unix::fs::MetadataExt; use std::sync::Arc; + use std::thread; + use big_s::S; use heed::types::Bytes; - use heed::RoTxn; + use heed::{RoTxn, RwTxn}; use memmap2::Mmap; use rayon::iter::{IntoParallelIterator, ParallelBridge, ParallelIterator}; use roaring::RoaringBitmap; use serde_json::Value; - use super::channel::{MergerReceiver, MergerSender}; + use super::channel::{ + extractors_merger_channels, merger_writer_channels, EntryOperation, + ExtractorsMergerChannels, MergerReceiver, MergerSender, MergerWriterChannels, + WriterOperation, + }; use super::document_change::{Deletion, DocumentChange, Insertion, Update}; use super::items_pool::ItemsPool; use super::merge; @@ -363,6 +369,44 @@ mod indexer { pub struct UpdateByFunctionIndexer; + /// TODO return stats + /// TODO take the rayon ThreadPool + pub fn index(wtxn: &mut RwTxn, index: &Index, document_changes: PI) -> Result<()> + where + PI: IntoParallelIterator> + Send, + PI::Iter: Clone, + { + let (merger_sender, writer_receiver) = merger_writer_channels(100); + let ExtractorsMergerChannels { merger_receiver, deladd_cbo_roaring_bitmap_sender } = + extractors_merger_channels(100); + + thread::scope(|s| { + thread::Builder::new().name(S("indexer-extractors")).spawn_scoped(s, || { + document_changes.into_par_iter().for_each(|_dc| ()); + }); + + // TODO manage the errors correctly + thread::Builder::new().name(S("indexer-merger")).spawn_scoped(s, || { + let rtxn = index.read_txn().unwrap(); + merge_grenad_entries(merger_receiver, merger_sender, &rtxn, index).unwrap() + }); + + // TODO Split this code into another function + for operation in writer_receiver { + let database = operation.database(index); + match operation { + WriterOperation::WordDocids(operation) => match operation { + EntryOperation::Delete(e) => database.delete(wtxn, e.entry()).map(drop)?, + EntryOperation::Write(e) => database.put(wtxn, e.key(), e.value())?, + }, + WriterOperation::Document(e) => database.put(wtxn, &e.key(), e.content())?, + } + } + + Ok(()) + }) + } + enum Operation { Write(RoaringBitmap), Delete,