use std::cell::RefCell; use bumpalo::Bump; use hashbrown::HashMap; use super::DelAddRoaringBitmap; use crate::update::new::channel::DocumentsSender; use crate::update::new::document::{write_to_obkv, Document as _}; use crate::update::new::indexer::document_changes::{ DocumentChangeContext, Extractor, FullySend, RefCellExt as _, }; use crate::update::new::DocumentChange; use crate::vector::EmbeddingConfigs; use crate::Result; pub struct DocumentsExtractor<'a> { document_sender: &'a DocumentsSender<'a>, embedders: &'a EmbeddingConfigs, } impl<'a> DocumentsExtractor<'a> { pub fn new(document_sender: &'a DocumentsSender<'a>, embedders: &'a EmbeddingConfigs) -> Self { Self { document_sender, embedders } } } #[derive(Default)] pub struct DocumentExtractorData { pub docids_delta: DelAddRoaringBitmap, pub field_distribution_delta: HashMap, } impl<'a, 'extractor> Extractor<'extractor> for DocumentsExtractor<'a> { type Data = FullySend>; fn init_data(&self, _extractor_alloc: &'extractor Bump) -> Result { Ok(FullySend(Default::default())) } fn process<'doc>( &self, changes: impl Iterator>>, context: &DocumentChangeContext, ) -> Result<()> { let mut document_buffer = bumpalo::collections::Vec::new_in(&context.doc_alloc); let mut document_extractor_data = context.data.0.borrow_mut_or_yield(); let mut new_fields_ids_map = context.new_fields_ids_map.borrow_mut_or_yield(); for change in changes { let change = change?; let external_docid = change.external_docid().to_owned(); // document but we need to create a function that collects and compresses documents. match change { DocumentChange::Deletion(deletion) => { let docid = deletion.docid(); let content = deletion.current( &context.txn, context.index, &context.db_fields_ids_map, )?; for res in content.iter_top_level_fields() { let (f, _) = res?; let entry = document_extractor_data .field_distribution_delta .entry_ref(f) .or_default(); *entry -= 1; } document_extractor_data.docids_delta.insert_del_u32(docid); self.document_sender.delete(docid, external_docid).unwrap(); } DocumentChange::Update(update) => { let docid = update.docid(); let content = update.current(&context.txn, context.index, &context.db_fields_ids_map)?; for res in content.iter_top_level_fields() { let (f, _) = res?; let entry = document_extractor_data .field_distribution_delta .entry_ref(f) .or_default(); *entry -= 1; } let content = update.updated(); for res in content.iter_top_level_fields() { let (f, _) = res?; let entry = document_extractor_data .field_distribution_delta .entry_ref(f) .or_default(); *entry += 1; } let content = update.merged(&context.txn, context.index, &context.db_fields_ids_map)?; let vector_content = update.merged_vectors( &context.txn, context.index, &context.db_fields_ids_map, &context.doc_alloc, self.embedders, )?; let content = write_to_obkv( &content, vector_content.as_ref(), &mut new_fields_ids_map, &mut document_buffer, )?; self.document_sender.uncompressed(docid, external_docid, content).unwrap(); } DocumentChange::Insertion(insertion) => { let docid = insertion.docid(); let content = insertion.inserted(); for res in content.iter_top_level_fields() { let (f, _) = res?; let entry = document_extractor_data .field_distribution_delta .entry_ref(f) .or_default(); *entry += 1; } let inserted_vectors = insertion.inserted_vectors(&context.doc_alloc, self.embedders)?; let content = write_to_obkv( &content, inserted_vectors.as_ref(), &mut new_fields_ids_map, &mut document_buffer, )?; document_extractor_data.docids_delta.insert_add_u32(docid); self.document_sender.uncompressed(docid, external_docid, content).unwrap(); // extracted_dictionary_sender.send(self, dictionary: &[u8]); } } } Ok(()) } }