From 10f49f0d75f038b3fe240e868300f6728414b456 Mon Sep 17 00:00:00 2001 From: Louis Dureuil Date: Wed, 6 Nov 2024 17:50:12 +0100 Subject: [PATCH] Post processing of the merge --- crates/meilitool/src/main.rs | 2 +- crates/milli/src/update/new/channel.rs | 30 +- crates/milli/src/update/new/document.rs | 4 +- .../milli/src/update/new/extract/documents.rs | 145 ++- .../new/extract/faceted/extract_facets.rs | 2 +- crates/milli/src/update/new/extract/mod.rs | 4 +- .../extract/searchable/extract_word_docids.rs | 45 +- .../src/update/new/extract/vectors/mod.rs | 32 +- crates/milli/src/update/new/indexer/mod.rs | 405 +++++++-- .../milli/src/update/new/vector_document.rs | 14 +- crates/milli/src/vector/error.rs | 2 +- milli/src/update/new/indexer/mod.rs | 823 ------------------ 12 files changed, 512 insertions(+), 996 deletions(-) delete mode 100644 milli/src/update/new/indexer/mod.rs diff --git a/crates/meilitool/src/main.rs b/crates/meilitool/src/main.rs index 978824356..f84cea98d 100644 --- a/crates/meilitool/src/main.rs +++ b/crates/meilitool/src/main.rs @@ -264,7 +264,7 @@ fn export_a_dump( format!("While iterating on content file {:?}", content_file_uuid) })? { dump_content_file - .push_document(&obkv_to_object(&doc, &documents_batch_index)?)?; + .push_document(&obkv_to_object(doc, &documents_batch_index)?)?; } dump_content_file.flush()?; count += 1; diff --git a/crates/milli/src/update/new/channel.rs b/crates/milli/src/update/new/channel.rs index dee82e6d9..a4896ee3f 100644 --- a/crates/milli/src/update/new/channel.rs +++ b/crates/milli/src/update/new/channel.rs @@ -2,16 +2,13 @@ use std::marker::PhantomData; use std::sync::atomic::{AtomicUsize, Ordering}; use crossbeam_channel::{IntoIter, Receiver, SendError, Sender}; -use grenad::Merger; use hashbrown::HashMap; use heed::types::Bytes; use roaring::RoaringBitmap; use super::extract::FacetKind; use super::StdResult; -use crate::index::main_key::DOCUMENTS_IDS_KEY; use crate::update::new::KvReaderFieldId; -use crate::update::MergeDeladdCboRoaringBitmaps; use crate::vector::Embedding; use crate::{DocumentId, Index}; @@ -41,14 +38,6 @@ impl KeyValueEntry { data.extend_from_slice(value); KeyValueEntry { key_length: key.len(), data: data.into_boxed_slice() } } - - pub fn from_small_key_bitmap(key: &[u8], bitmap: RoaringBitmap) -> Self { - let mut data = Vec::with_capacity(key.len() + bitmap.serialized_size()); - data.extend_from_slice(key); - bitmap.serialize_into(&mut data).unwrap(); - KeyValueEntry { key_length: key.len(), data: data.into_boxed_slice() } - } - pub fn key(&self) -> &[u8] { &self.data[..self.key_length] } @@ -113,7 +102,6 @@ pub enum Database { ExternalDocumentsIds, ExactWordDocids, FidWordCountDocids, - Main, WordDocids, WordFidDocids, WordPairProximityDocids, @@ -131,7 +119,6 @@ impl Database { Database::Documents => index.documents.remap_types(), Database::ExternalDocumentsIds => index.external_documents_ids.remap_types(), Database::ExactWordDocids => index.exact_word_docids.remap_types(), - Database::Main => index.main.remap_types(), Database::WordDocids => index.word_docids.remap_types(), Database::WordFidDocids => index.word_fid_docids.remap_types(), Database::WordPositionDocids => index.word_position_docids.remap_types(), @@ -217,12 +204,15 @@ impl ExtractorSender { DocumentsSender(self) } - pub fn send_documents_ids(&self, documents_ids: RoaringBitmap) -> StdResult<(), SendError<()>> { - let entry = EntryOperation::Write(KeyValueEntry::from_small_key_bitmap( - DOCUMENTS_IDS_KEY.as_bytes(), - documents_ids, - )); - match 
self.send_db_operation(DbOperation { database: Database::Main, entry }) {
+    pub fn embeddings(&self) -> EmbeddingSender<'_> {
+        EmbeddingSender(&self.sender)
+    }
+
+    fn send_delete_vector(&self, docid: DocumentId) -> StdResult<(), SendError<()>> {
+        match self
+            .sender
+            .send(WriterOperation::ArroyOperation(ArroyOperation::DeleteVectors { docid }))
+        {
             Ok(()) => Ok(()),
             Err(SendError(_)) => Err(SendError(())),
         }
@@ -381,6 +371,8 @@ impl DocumentsSender<'_> {
             Err(SendError(_)) => Err(SendError(())),
         }?;
 
+        self.0.send_delete_vector(docid)?;
+
         let entry = EntryOperation::Delete(KeyEntry::from_key(external_id.as_bytes()));
         match self
             .0
diff --git a/crates/milli/src/update/new/document.rs b/crates/milli/src/update/new/document.rs
index 068268c4e..14e4f72e5 100644
--- a/crates/milli/src/update/new/document.rs
+++ b/crates/milli/src/update/new/document.rs
@@ -286,11 +286,11 @@ where
 ///
 /// - If the document contains a top-level field that is not present in `fields_ids_map`.
 ///
-pub fn write_to_obkv<'s, 'a, 'map>(
+pub fn write_to_obkv<'s, 'a, 'map, 'buffer>(
     document: &'s impl Document<'s>,
     vector_document: Option<&'s impl VectorDocument<'s>>,
     fields_ids_map: &'a mut GlobalFieldsIdsMap<'map>,
-    mut document_buffer: &'a mut Vec<u8>,
+    mut document_buffer: &'a mut bumpalo::collections::Vec<'buffer, u8>,
 ) -> Result<&'a KvReaderFieldId>
 where
     's: 'a,
diff --git a/crates/milli/src/update/new/extract/documents.rs b/crates/milli/src/update/new/extract/documents.rs
index 21fe4d518..79e1a2462 100644
--- a/crates/milli/src/update/new/extract/documents.rs
+++ b/crates/milli/src/update/new/extract/documents.rs
@@ -1,73 +1,140 @@
 use std::cell::RefCell;
 
 use bumpalo::Bump;
+use hashbrown::HashMap;
 
 use super::DelAddRoaringBitmap;
 use crate::update::new::channel::DocumentsSender;
-use crate::update::new::document::write_to_obkv;
+use crate::update::new::document::{write_to_obkv, Document as _};
 use crate::update::new::indexer::document_changes::{
     DocumentChangeContext, Extractor, FullySend, RefCellExt as _,
 };
 use crate::update::new::DocumentChange;
+use crate::vector::EmbeddingConfigs;
 use crate::Result;
-
 pub struct DocumentsExtractor<'a> {
-    documents_sender: &'a DocumentsSender<'a>,
+    document_sender: &'a DocumentsSender<'a>,
+    embedders: &'a EmbeddingConfigs,
 }
 
 impl<'a> DocumentsExtractor<'a> {
-    pub fn new(documents_sender: &'a DocumentsSender<'a>) -> Self {
-        Self { documents_sender }
+    pub fn new(document_sender: &'a DocumentsSender<'a>, embedders: &'a EmbeddingConfigs) -> Self {
+        Self { document_sender, embedders }
     }
 }
 
+#[derive(Default)]
+pub struct DocumentExtractorData {
+    pub docids_delta: DelAddRoaringBitmap,
+    pub field_distribution_delta: HashMap<String, i64>,
+}
+
 impl<'a, 'extractor> Extractor<'extractor> for DocumentsExtractor<'a> {
-    type Data = FullySend<RefCell<DelAddRoaringBitmap>>;
+    type Data = FullySend<RefCell<DocumentExtractorData>>;
 
     fn init_data(&self, _extractor_alloc: &'extractor Bump) -> Result<Self::Data> {
-        Ok(FullySend(RefCell::new(DelAddRoaringBitmap::empty())))
+        Ok(FullySend(Default::default()))
     }
 
-    fn process(
+    fn process<'doc>(
         &self,
-        change: DocumentChange,
+        changes: impl Iterator<Item = Result<DocumentChange<'doc>>>,
         context: &DocumentChangeContext<Self::Data>,
     ) -> Result<()> {
-        let mut document_buffer = Vec::new();
-        let mut delta_documents_ids = context.data.0.borrow_mut_or_yield();
+        let mut document_buffer = bumpalo::collections::Vec::new_in(&context.doc_alloc);
+        let mut document_extractor_data = context.data.0.borrow_mut_or_yield();
 
-        let new_fields_ids_map = context.new_fields_ids_map.borrow_or_yield();
-        let new_fields_ids_map = &*new_fields_ids_map;
-        let new_fields_ids_map = 
new_fields_ids_map.local_map(); + let mut new_fields_ids_map = context.new_fields_ids_map.borrow_mut_or_yield(); - let external_docid = change.external_docid().to_owned(); + for change in changes { + let change = change?; + let external_docid = change.external_docid().to_owned(); - // document but we need to create a function that collects and compresses documents. - match change { - DocumentChange::Deletion(deletion) => { - let docid = deletion.docid(); - self.documents_sender.delete(docid, external_docid).unwrap(); - delta_documents_ids.insert_del_u32(docid); - } - /// TODO: change NONE by SOME(vector) when implemented - DocumentChange::Update(update) => { - let docid = update.docid(); - let content = - update.new(&context.txn, context.index, &context.db_fields_ids_map)?; - let content = - write_to_obkv(&content, None, new_fields_ids_map, &mut document_buffer)?; - self.documents_sender.uncompressed(docid, external_docid, content).unwrap(); - } - DocumentChange::Insertion(insertion) => { - let docid = insertion.docid(); - let content = insertion.new(); - let content = - write_to_obkv(&content, None, new_fields_ids_map, &mut document_buffer)?; - self.documents_sender.uncompressed(docid, external_docid, content).unwrap(); - delta_documents_ids.insert_add_u32(docid); - // extracted_dictionary_sender.send(self, dictionary: &[u8]); + // document but we need to create a function that collects and compresses documents. + match change { + DocumentChange::Deletion(deletion) => { + let docid = deletion.docid(); + let content = deletion.current( + &context.txn, + context.index, + &context.db_fields_ids_map, + )?; + for res in content.iter_top_level_fields() { + let (f, _) = res?; + let entry = document_extractor_data + .field_distribution_delta + .entry_ref(f) + .or_default(); + *entry -= 1; + } + document_extractor_data.docids_delta.insert_del_u32(docid); + self.document_sender.delete(docid, external_docid).unwrap(); + } + DocumentChange::Update(update) => { + let docid = update.docid(); + let content = + update.current(&context.txn, context.index, &context.db_fields_ids_map)?; + for res in content.iter_top_level_fields() { + let (f, _) = res?; + let entry = document_extractor_data + .field_distribution_delta + .entry_ref(f) + .or_default(); + *entry -= 1; + } + let content = update.updated(); + for res in content.iter_top_level_fields() { + let (f, _) = res?; + let entry = document_extractor_data + .field_distribution_delta + .entry_ref(f) + .or_default(); + *entry += 1; + } + + let content = + update.merged(&context.txn, context.index, &context.db_fields_ids_map)?; + let vector_content = update.merged_vectors( + &context.txn, + context.index, + &context.db_fields_ids_map, + &context.doc_alloc, + self.embedders, + )?; + let content = write_to_obkv( + &content, + vector_content.as_ref(), + &mut new_fields_ids_map, + &mut document_buffer, + )?; + self.document_sender.uncompressed(docid, external_docid, content).unwrap(); + } + DocumentChange::Insertion(insertion) => { + let docid = insertion.docid(); + let content = insertion.inserted(); + for res in content.iter_top_level_fields() { + let (f, _) = res?; + let entry = document_extractor_data + .field_distribution_delta + .entry_ref(f) + .or_default(); + *entry += 1; + } + let inserted_vectors = + insertion.inserted_vectors(&context.doc_alloc, self.embedders)?; + let content = write_to_obkv( + &content, + inserted_vectors.as_ref(), + &mut new_fields_ids_map, + &mut document_buffer, + )?; + document_extractor_data.docids_delta.insert_add_u32(docid); + 
self.document_sender.uncompressed(docid, external_docid, content).unwrap();
+                        // extracted_dictionary_sender.send(self, dictionary: &[u8]);
+                    }
                 }
             }
+
         Ok(())
     }
 }
diff --git a/crates/milli/src/update/new/extract/faceted/extract_facets.rs b/crates/milli/src/update/new/extract/faceted/extract_facets.rs
index 108e4d422..1aaae1cb8 100644
--- a/crates/milli/src/update/new/extract/faceted/extract_facets.rs
+++ b/crates/milli/src/update/new/extract/faceted/extract_facets.rs
@@ -228,7 +228,7 @@ impl DocidsExtractor for FacetedDocidsExtractor {
         grenad_parameters: GrenadParameters,
         document_changes: &DC,
         indexing_context: IndexingContext<'fid, 'indexer, 'index, MSP, SP>,
-        extractor_allocs: &mut ThreadLocal<FullySend<Bump>>,
+        extractor_allocs: &'extractor mut ThreadLocal<FullySend<Bump>>,
         finished_steps: u16,
         total_steps: u16,
         step_name: &'static str,
diff --git a/crates/milli/src/update/new/extract/mod.rs b/crates/milli/src/update/new/extract/mod.rs
index 7f6b72c93..af6a29d07 100644
--- a/crates/milli/src/update/new/extract/mod.rs
+++ b/crates/milli/src/update/new/extract/mod.rs
@@ -14,7 +14,7 @@ pub use vectors::EmbeddingExtractor;
 use super::indexer::document_changes::{
     DocumentChanges, FullySend, IndexingContext, Progress, ThreadLocal,
 };
-use crate::update::{GrenadParameters, MergeDeladdCboRoaringBitmaps};
+use crate::update::GrenadParameters;
 use crate::Result;
 
 pub trait DocidsExtractor {
@@ -26,7 +26,7 @@
         finished_steps: u16,
         total_steps: u16,
         step_name: &'static str,
-    ) -> Result<Merger<File, MergeDeladdCboRoaringBitmaps>>
+    ) -> Result<Vec<BalancedCaches<'extractor>>>
     where
         MSP: Fn() -> bool + Sync,
         SP: Fn(Progress) + Sync;
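The document extractor and the docids extractors above all accumulate deletions and additions as a pair of bitmaps that only gets applied to index-wide state once extraction is done. A stand-alone sketch of that del/add pattern, with illustrative names rather than milli's actual types (requires the roaring crate):

    use roaring::RoaringBitmap;

    // Sketch of the `DelAddRoaringBitmap` idea: one bitmap of deleted docids
    // and one of added docids, merged across threads and applied at the end.
    #[derive(Default)]
    struct DelAdd {
        del: RoaringBitmap,
        add: RoaringBitmap,
    }

    impl DelAdd {
        fn insert_del_u32(&mut self, docid: u32) {
            self.del.insert(docid);
        }

        fn insert_add_u32(&mut self, docid: u32) {
            self.add.insert(docid);
        }

        // Merge another thread's delta into this one, as done when draining
        // the per-thread datastore.
        fn merge(&mut self, other: DelAdd) {
            self.del |= other.del;
            self.add |= other.add;
        }

        // Apply the delta to the full set of document ids: remove deletions,
        // then add insertions.
        fn apply_to(&self, documents_ids: &mut RoaringBitmap) {
            *documents_ids -= &self.del;
            *documents_ids |= &self.add;
        }
    }

    fn main() {
        let mut documents_ids: RoaringBitmap = (0..4).collect();
        let mut delta = DelAdd::default();
        delta.insert_del_u32(1);
        let mut other = DelAdd::default();
        other.insert_add_u32(7);
        delta.merge(other);
        delta.apply_to(&mut documents_ids);
        assert!(!documents_ids.contains(1) && documents_ids.contains(7));
    }

In this sketch, deletions are applied before additions, so a docid present in both bitmaps comes out as present.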
diff --git a/crates/milli/src/update/new/extract/searchable/extract_word_docids.rs b/crates/milli/src/update/new/extract/searchable/extract_word_docids.rs
index 23bca784f..cadea7251 100644
--- a/crates/milli/src/update/new/extract/searchable/extract_word_docids.rs
+++ b/crates/milli/src/update/new/extract/searchable/extract_word_docids.rs
@@ -11,8 +11,8 @@
 use super::tokenize_document::{tokenizer_builder, DocumentTokenizer};
 use crate::update::new::extract::cache::BalancedCaches;
 use crate::update::new::extract::perm_json_p::contained_in;
 use crate::update::new::indexer::document_changes::{
-    for_each_document_change, DocumentChangeContext, DocumentChanges, Extractor, FullySend,
-    IndexingContext, MostlySend, RefCellExt, ThreadLocal,
+    extract, DocumentChangeContext, DocumentChanges, Extractor, FullySend, IndexingContext,
+    MostlySend, Progress, RefCellExt, ThreadLocal,
 };
 use crate::update::new::DocumentChange;
 use crate::update::GrenadParameters;
@@ -218,24 +218,44 @@ impl<'a, 'extractor> Extractor<'extractor> for WordDocidsExtractorData<'a> {
         ))))
     }
 
-    fn process(
+    fn process<'doc>(
         &self,
-        change: DocumentChange,
+        changes: impl Iterator<Item = Result<DocumentChange<'doc>>>,
         context: &DocumentChangeContext<Self::Data>,
     ) -> Result<()> {
-        WordDocidsExtractors::extract_document_change(context, self.tokenizer, change)
+        for change in changes {
+            let change = change?;
+            WordDocidsExtractors::extract_document_change(context, self.tokenizer, change)?;
+        }
+        Ok(())
     }
 }
 
 pub struct WordDocidsExtractors;
 
 impl WordDocidsExtractors {
-    pub fn run_extraction<'pl, 'fid, 'indexer, 'index, 'extractor, DC: DocumentChanges<'pl>>(
+    pub fn run_extraction<
+        'pl,
+        'fid,
+        'indexer,
+        'index,
+        'extractor,
+        DC: DocumentChanges<'pl>,
+        MSP,
+        SP,
+    >(
         grenad_parameters: GrenadParameters,
         document_changes: &DC,
-        indexing_context: IndexingContext<'fid, 'indexer, 'index>,
+        indexing_context: IndexingContext<'fid, 'indexer, 'index, MSP, SP>,
         extractor_allocs: &'extractor mut ThreadLocal<FullySend<Bump>>,
-    ) -> Result<WordDocidsCaches<'extractor>> {
+        finished_steps: u16,
+        total_steps: u16,
+        step_name: &'static str,
+    ) -> Result<WordDocidsCaches<'extractor>>
+    where
+        MSP: Fn() -> bool + Sync,
+        SP: Fn(Progress) + Sync,
+    {
         let index = indexing_context.index;
         let rtxn = index.read_txn()?;
 
             buckets: rayon::current_num_threads(),
         };
 
-        for_each_document_change(
+        extract(
             document_changes,
             &extractor,
             indexing_context,
             extractor_allocs,
             &datastore,
+            finished_steps,
+            total_steps,
+            step_name,
         )?;
         }
 
@@ -358,7 +381,7 @@
                 )
             };
             document_tokenizer.tokenize_document(
-                inner.new(rtxn, index, context.db_fields_ids_map)?,
+                inner.merged(rtxn, index, context.db_fields_ids_map)?,
                 new_fields_ids_map,
                 &mut token_fn,
             )?;
@@ -375,7 +398,7 @@
                 )
             };
             document_tokenizer.tokenize_document(
-                inner.new(),
+                inner.inserted(),
                 new_fields_ids_map,
                 &mut token_fn,
             )?;
diff --git a/crates/milli/src/update/new/extract/vectors/mod.rs b/crates/milli/src/update/new/extract/vectors/mod.rs
index 70bd4d42d..a5cf915e4 100644
--- a/crates/milli/src/update/new/extract/vectors/mod.rs
+++ b/crates/milli/src/update/new/extract/vectors/mod.rs
@@ -8,7 +8,7 @@
 use super::cache::DelAddRoaringBitmap;
 use crate::error::FaultSource;
 use crate::prompt::Prompt;
 use crate::update::new::channel::EmbeddingSender;
-use crate::update::new::indexer::document_changes::{Extractor, FullySend};
+use crate::update::new::indexer::document_changes::{Extractor, MostlySend};
 use crate::update::new::vector_document::VectorDocument;
 use crate::update::new::DocumentChange;
 use crate::vector::error::{
@@ -36,15 +36,17 @@ impl<'a> EmbeddingExtractor<'a> {
     }
 }
 
-impl<'a, 'extractor> Extractor<'extractor> for EmbeddingExtractor<'a> {
-    type Data = FullySend<RefCell<HashMap<String, DelAddRoaringBitmap>>>;
+pub struct EmbeddingExtractorData<'extractor>(
+    pub HashMap<String, DelAddRoaringBitmap, hashbrown::DefaultHashBuilder, &'extractor Bump>,
+);
 
-    fn init_data<'doc>(
-        &'doc self,
-        _extractor_alloc: raw_collections::alloc::RefBump<'extractor>,
-    ) -> crate::Result<Self::Data> {
-        /// TODO: use the extractor_alloc in the hashbrown once you merge the branch where it is no longer a RefBump
-        Ok(FullySend(Default::default()))
+unsafe impl MostlySend for EmbeddingExtractorData<'_> {}
+
+impl<'a, 'extractor> Extractor<'extractor> for EmbeddingExtractor<'a> {
+    type Data = RefCell<EmbeddingExtractorData<'extractor>>;
+
+    fn init_data<'doc>(&'doc self, extractor_alloc: &'extractor Bump) -> crate::Result<Self::Data> {
+        Ok(RefCell::new(EmbeddingExtractorData(HashMap::new_in(extractor_alloc))))
     }
 
     fn process<'doc>(
@@ -72,7 +74,7 @@
                         embedder_id,
                         embedder_name,
                         prompt,
-                        &context.data.0,
+                        context.data,
                         &self.possible_embedding_mistakes,
                         self.threads,
                         self.sender,
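`EmbeddingExtractorData` above moves the per-thread accumulator into the extractor arena: the hashbrown map is created with `HashMap::new_in` over the `&'extractor Bump`, so the whole datastore is freed at once when the arena is dropped. A minimal sketch of that pattern, assuming bumpalo with its allocator-api2 feature plus hashbrown, with illustrative names rather than milli's real types:

    use bumpalo::Bump;
    use hashbrown::HashMap;

    // Per-thread accumulator whose map and keys both live in the arena.
    struct ExtractorData<'extractor> {
        alloc: &'extractor Bump,
        counts: HashMap<&'extractor str, i64, hashbrown::DefaultHashBuilder, &'extractor Bump>,
    }

    impl<'extractor> ExtractorData<'extractor> {
        fn init_data(alloc: &'extractor Bump) -> Self {
            Self { alloc, counts: HashMap::new_in(alloc) }
        }

        fn record(&mut self, field: &str, delta: i64) {
            // Copy the key into the arena so it outlives the caller's borrow.
            let field: &'extractor str = self.alloc.alloc_str(field);
            *self.counts.entry(field).or_insert(0) += delta;
        }
    }

    fn main() {
        let extractor_alloc = Bump::new();
        let mut data = ExtractorData::init_data(&extractor_alloc);
        data.record("title", 1);
        data.record("title", -1);
        assert_eq!(data.counts["title"], 0);
    }

Tying the map's allocator to the arena is what forces the `'extractor` lifetime to appear on the data type, which is why `Chunks` below also grows an `'extractor` parameter.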
@@ -252,7 +254,7 @@
 // Currently this is the case as:
 // 1. BVec are inside of the bumpalo
 // 2. All other fields are either trivial (u8) or references.
-struct Chunks<'a> {
+struct Chunks<'a, 'extractor> {
     texts: BVec<'a, &'a str>,
     ids: BVec<'a, DocumentId>,
 
     embedder_name: &'a str,
     prompt: &'a Prompt,
     possible_embedding_mistakes: &'a PossibleEmbeddingMistakes,
-    user_provided: &'a RefCell<HashMap<String, DelAddRoaringBitmap>>,
+    user_provided: &'a RefCell<EmbeddingExtractorData<'extractor>>,
     threads: &'a ThreadPoolNoAbort,
     sender: &'a EmbeddingSender<'a>,
 }
 
-impl<'a> Chunks<'a> {
+impl<'a, 'extractor> Chunks<'a, 'extractor> {
     #[allow(clippy::too_many_arguments)]
     pub fn new(
         embedder: &'a Embedder,
         embedder_id: u8,
         embedder_name: &'a str,
         prompt: &'a Prompt,
-        user_provided: &'a RefCell<HashMap<String, DelAddRoaringBitmap>>,
+        user_provided: &'a RefCell<EmbeddingExtractorData<'extractor>>,
         possible_embedding_mistakes: &'a PossibleEmbeddingMistakes,
         threads: &'a ThreadPoolNoAbort,
         sender: &'a EmbeddingSender<'a>,
@@ -417,7 +419,7 @@
     fn set_regenerate(&self, docid: DocumentId, regenerate: bool) {
         let mut user_provided = self.user_provided.borrow_mut();
-        let user_provided = user_provided.entry_ref(self.embedder_name).or_default();
+        let user_provided = user_provided.0.entry_ref(self.embedder_name).or_default();
         if regenerate {
             // regenerate == !user_provided
             user_provided.del.get_or_insert(Default::default()).insert(docid);
diff --git a/crates/milli/src/update/new/indexer/mod.rs b/crates/milli/src/update/new/indexer/mod.rs
index 430313fbd..7688f29da 100644
--- a/crates/milli/src/update/new/indexer/mod.rs
+++ b/crates/milli/src/update/new/indexer/mod.rs
@@ -1,17 +1,17 @@
 use std::cmp::Ordering;
-use std::sync::RwLock;
+use std::sync::{OnceLock, RwLock};
 use std::thread::{self, Builder};
 
 use big_s::S;
-use document_changes::{
-    for_each_document_change, DocumentChanges, FullySend, IndexingContext, ThreadLocal,
-};
+use document_changes::{extract, DocumentChanges, IndexingContext, Progress, ThreadLocal};
 pub use document_deletion::DocumentDeletion;
 pub use document_operation::DocumentOperation;
+use hashbrown::HashMap;
 use heed::types::{Bytes, DecodeIgnore, Str};
 use heed::{RoTxn, RwTxn};
 use itertools::{merge_join_by, EitherOrBoth};
 pub use partial_dump::PartialDump;
+use rand::SeedableRng as _;
 use rayon::ThreadPool;
 use time::OffsetDateTime;
 pub use update_by_function::UpdateByFunction;
@@ -19,37 +19,100 @@ pub use update_by_function::UpdateByFunction;
 use super::channel::*;
 use super::extract::*;
 use super::facet_search_builder::FacetSearchBuilder;
-use super::merger::{FacetDatabases, FacetFieldIdsDelta};
-use super::word_fst_builder::PrefixDelta;
+use super::merger::FacetFieldIdsDelta;
+use super::word_fst_builder::{PrefixData, PrefixDelta, WordFstBuilder};
 use super::words_prefix_docids::{
     compute_word_prefix_docids, compute_word_prefix_fid_docids, compute_word_prefix_position_docids,
 };
 use super::{StdResult, TopLevelMap};
 use crate::documents::{PrimaryKey, DEFAULT_PRIMARY_KEY};
 use crate::facet::FacetType;
+use crate::fields_ids_map::metadata::{FieldIdMapWithMetadata, MetadataBuilder};
 use crate::index::main_key::{WORDS_FST_KEY, WORDS_PREFIXES_FST_KEY};
 use crate::proximity::ProximityPrecision;
 use crate::update::del_add::DelAdd;
-use crate::update::new::word_fst_builder::{PrefixData, WordFstBuilder};
+use crate::update::new::extract::EmbeddingExtractor;
 use crate::update::new::words_prefix_docids::compute_exact_word_prefix_docids;
-use crate::update::new::{merge_and_send_docids, merge_and_send_facet_docids};
+use crate::update::new::{merge_and_send_docids, merge_and_send_facet_docids, FacetDatabases};
 use crate::update::settings::InnerIndexSettings;
 use crate::update::{FacetsUpdateBulk, GrenadParameters};
-use crate::{FieldsIdsMap, GlobalFieldsIdsMap, Index, Result, UserError};
+use crate::vector::{ArroyWrapper, EmbeddingConfigs, Embeddings};
+use crate::{
+    FieldsIdsMap, GlobalFieldsIdsMap, Index, InternalError, Result, ThreadPoolNoAbort,
+    ThreadPoolNoAbortBuilder, UserError,
+};
 
-pub mod de;
+pub(crate) mod de;
 pub mod document_changes;
 mod document_deletion;
 mod document_operation;
 mod partial_dump;
 mod update_by_function;
 
+mod steps {
+    pub const STEPS: &[&str] = &[
+        "extracting documents",
+        "extracting facets",
+        "extracting words",
+        "extracting word proximity",
+        "extracting embeddings",
+        "writing to database",
+        "post-processing facets",
+        "post-processing words",
+        "finalizing",
+    ];
+
+    const fn step(step: u16) -> (u16, &'static str) {
+        (step, STEPS[step as usize])
+    }
+
+    pub const fn total_steps() -> u16 {
+        STEPS.len() as u16
+    }
+
+    pub const fn extract_documents() -> (u16, &'static str) {
+        step(0)
+    }
+
+    pub const fn extract_facets() -> (u16, &'static str) {
+        step(1)
+    }
+
+    pub const fn extract_words() -> (u16, &'static str) {
+        step(2)
+    }
+
+    pub const fn extract_word_proximity() -> (u16, &'static str) {
+        step(3)
+    }
+
+    pub const fn extract_embeddings() -> (u16, &'static str) {
+        step(4)
+    }
+
+    pub const fn write_db() -> (u16, &'static str) {
+        step(5)
+    }
+
+    pub const fn post_processing_facets() -> (u16, &'static str) {
+        step(6)
+    }
+
+    pub const fn post_processing_words() -> (u16, &'static str) {
+        step(7)
+    }
+
+    pub const fn finalizing() -> (u16, &'static str) {
+        step(8)
+    }
+}
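The steps table above gives each indexing phase a stable index into a single list, and the (finished_steps, total_steps, step_name) triple is what the extractors pass down to the progress callback. A compact model of that flow; Progress here is a stand-in struct, and the finished_total_documents type is an assumption of the sketch:

    // Stand-in for milli's Progress payload.
    struct Progress {
        finished_steps: u16,
        total_steps: u16,
        step_name: &'static str,
        finished_total_documents: Option<(u32, u32)>,
    }

    const STEPS: &[&str] = &["extracting documents", "writing to database", "finalizing"];

    // Pair a step index with its display name, as the steps module does.
    const fn step(step: u16) -> (u16, &'static str) {
        (step, STEPS[step as usize])
    }

    fn main() {
        // The `send_progress` callback is supplied by the caller of `index`.
        let send_progress = |progress: Progress| {
            println!(
                "[{}/{}] {}",
                progress.finished_steps + 1,
                progress.total_steps,
                progress.step_name
            );
        };

        let total_steps = STEPS.len() as u16;
        for index in 0..total_steps {
            let (finished_steps, step_name) = step(index);
            send_progress(Progress { finished_steps, total_steps, step_name, finished_total_documents: None });
        }
    }

Keeping the names and indices in one table means a reordered pipeline only requires editing STEPS and the accessor functions, not every call site.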
 /// This is the main function of this crate.
 ///
 /// Give it the output of the [`Indexer::document_changes`] method and it will execute it in the [`rayon::ThreadPool`].
 ///
 /// TODO return stats
-pub fn index<'pl, 'indexer, 'index, DC>(
+#[allow(clippy::too_many_arguments)] // clippy: 😝
+pub fn index<'pl, 'indexer, 'index, DC, MSP, SP>(
     wtxn: &mut RwTxn,
     index: &'index Index,
     db_fields_ids_map: &'indexer FieldsIdsMap,
@@ -57,15 +120,23 @@
     new_primary_key: Option<PrimaryKey<'pl>>,
     pool: &ThreadPool,
     document_changes: &DC,
+    embedders: EmbeddingConfigs,
+    must_stop_processing: &'indexer MSP,
+    send_progress: &'indexer SP,
 ) -> Result<()>
 where
     DC: DocumentChanges<'pl>,
+    MSP: Fn() -> bool + Sync,
+    SP: Fn(Progress) + Sync,
 {
-    // TODO find a better channel limit
     let (extractor_sender, writer_receiver) = extractor_writer_channel(10_000);
+
+    let metadata_builder = MetadataBuilder::from_index(index, wtxn)?;
+
+    let new_fields_ids_map = FieldIdMapWithMetadata::new(new_fields_ids_map, metadata_builder);
+
     let new_fields_ids_map = RwLock::new(new_fields_ids_map);
-    let global_fields_ids_map = GlobalFieldsIdsMap::new(&new_fields_ids_map);
     let fields_ids_map_store = ThreadLocal::with_capacity(pool.current_num_threads());
     let mut extractor_allocs = ThreadLocal::with_capacity(pool.current_num_threads());
     let doc_allocs = ThreadLocal::with_capacity(pool.current_num_threads());
@@ -76,46 +147,69 @@
         new_fields_ids_map: &new_fields_ids_map,
         doc_allocs: &doc_allocs,
         fields_ids_map_store: &fields_ids_map_store,
+        must_stop_processing,
+        send_progress,
     };
 
-    thread::scope(|s| -> crate::Result<_> {
+    let total_steps = steps::total_steps();
+
+    let mut field_distribution = index.field_distribution(wtxn)?;
+    let mut document_ids = index.documents_ids(wtxn)?;
+
+    thread::scope(|s| -> Result<()> {
         let indexer_span = tracing::Span::current();
+        let embedders = &embedders;
+        // prevent moving the field_distribution and document_ids in the inner closure...
+ let field_distribution = &mut field_distribution; + let document_ids = &mut document_ids; // TODO manage the errors correctly let extractor_handle = Builder::new().name(S("indexer-extractors")).spawn_scoped(s, move || { pool.in_place_scope(|_s| { - let span = tracing::trace_span!(target: "indexing::documents", parent: &indexer_span, "extract"); - let _entered = span.enter(); + let span = tracing::trace_span!(target: "indexing::documents", parent: &indexer_span, "extract"); + let _entered = span.enter(); - // document but we need to create a function that collects and compresses documents. - let rtxn = index.read_txn().unwrap(); - let document_sender = extractor_sender.documents(); - let document_extractor = DocumentsExtractor::new(&document_sender); - let datastore = ThreadLocal::with_capacity(pool.current_num_threads()); - for_each_document_change(document_changes, &document_extractor, indexing_context, &mut extractor_allocs, &datastore)?; + let rtxn = index.read_txn()?; - let mut documents_ids = index.documents_ids(&rtxn)?; - let delta_documents_ids = datastore.into_iter().map(|FullySend(d)| d.into_inner()).reduce(DelAddRoaringBitmap::merge).unwrap_or_default(); - delta_documents_ids.apply_to(&mut documents_ids); - extractor_sender.send_documents_ids(documents_ids).unwrap(); + // document but we need to create a function that collects and compresses documents. + let document_sender = extractor_sender.documents(); + let document_extractor = DocumentsExtractor::new(&document_sender, embedders); + let datastore = ThreadLocal::with_capacity(pool.current_num_threads()); + let (finished_steps, step_name) = steps::extract_documents(); + extract(document_changes, &document_extractor, indexing_context, &mut extractor_allocs, &datastore, finished_steps, total_steps, step_name)?; - // document_sender.finish().unwrap(); - const TEN_GIB: usize = 10 * 1024 * 1024 * 1024; - let current_num_threads = rayon::current_num_threads(); - let max_memory = TEN_GIB / current_num_threads; - eprintln!("A maximum of {max_memory} bytes will be used for each of the {current_num_threads} threads"); - let grenad_parameters = GrenadParameters { - max_memory: Some(max_memory), - ..GrenadParameters::default() - }; + for document_extractor_data in datastore { + let document_extractor_data = document_extractor_data.0.into_inner(); + for (field, delta) in document_extractor_data.field_distribution_delta { + let current = field_distribution.entry(field).or_default(); + // adding the delta should never cause a negative result, as we are removing fields that previously existed. 
+                            *current = current.saturating_add_signed(delta);
+                        }
+                        document_extractor_data.docids_delta.apply_to(document_ids);
+                    }
+
+                field_distribution.retain(|_, v| *v != 0);
+
+                const TEN_GIB: usize = 10 * 1024 * 1024 * 1024;
+                let current_num_threads = rayon::current_num_threads();
+                let max_memory = TEN_GIB / current_num_threads;
+                eprintln!("A maximum of {max_memory} bytes will be used for each of the {current_num_threads} threads");
+
+                let grenad_parameters = GrenadParameters {
+                    max_memory: Some(max_memory),
+                    ..GrenadParameters::default()
+                };
+
+                let facet_field_ids_delta;
 
                 {
                     let span = tracing::trace_span!(target: "indexing::documents::extract", "faceted");
                     let _entered = span.enter();
+
+                    let (finished_steps, step_name) = steps::extract_facets();
+
                     facet_field_ids_delta = merge_and_send_facet_docids(
-                        FacetedDocidsExtractor::run_extraction(grenad_parameters, document_changes, indexing_context, &mut extractor_allocs)?,
+                        FacetedDocidsExtractor::run_extraction(grenad_parameters, document_changes, indexing_context, &mut extractor_allocs, finished_steps, total_steps, step_name)?,
                         FacetDatabases::new(index),
                         index,
                         extractor_sender.facet_docids(),
                     )?;
                 }
 
                 {
                     let span = tracing::trace_span!(target: "indexing::documents::extract", "word_docids");
                     let _entered = span.enter();
+                    let (finished_steps, step_name) = steps::extract_words();
 
                     let WordDocidsCaches {
                         word_docids,
                         word_fid_docids,
                         exact_word_docids,
                         word_position_docids,
                         fid_word_count_docids,
-                    } = WordDocidsExtractors::run_extraction(grenad_parameters, document_changes, indexing_context, &mut extractor_allocs)?;
+                    } = WordDocidsExtractors::run_extraction(grenad_parameters, document_changes, indexing_context, &mut extractor_allocs, finished_steps, total_steps, step_name)?;
 
                     // TODO Word Docids Merger
                     // extractor_sender.send_searchable::<WordDocids>(word_docids).unwrap();
@@ -206,7 +301,10 @@
             if proximity_precision == ProximityPrecision::ByWord {
                 let span = tracing::trace_span!(target: "indexing::documents::extract", "word_pair_proximity_docids");
                 let _entered = span.enter();
-                let caches = <WordPairProximityDocidsExtractor as DocidsExtractor>::run_extraction(grenad_parameters, document_changes, indexing_context, &mut extractor_allocs)?;
+
+                let (finished_steps, step_name) = steps::extract_word_proximity();
+
+                let caches = <WordPairProximityDocidsExtractor as DocidsExtractor>::run_extraction(grenad_parameters, document_changes, indexing_context, &mut extractor_allocs, finished_steps, total_steps, step_name)?;
                 merge_and_send_docids(
                     caches,
                     index.word_pair_proximity_docids.remap_types(),
                     index,
                 )?;
             }
 
-            {
-                let span = tracing::trace_span!(target: "indexing::documents::extract", "FINISH");
-                let _entered = span.enter();
-            }
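Before the vectors pass below, the field-distribution deltas collected by the document extractor have already been folded in: each signed delta is applied with saturating_add_signed, and fields whose count reaches zero are then pruned. A self-contained sketch of that bookkeeping; milli's FieldDistribution is a BTreeMap<String, u64>, while the helper name here is invented:

    use std::collections::BTreeMap;

    // Apply per-field deltas to the distribution, then drop empty fields.
    fn apply_deltas(
        field_distribution: &mut BTreeMap<String, u64>,
        deltas: impl IntoIterator<Item = (String, i64)>,
    ) {
        for (field, delta) in deltas {
            let current = field_distribution.entry(field).or_default();
            // Saturates at zero, so a spurious over-deletion cannot wrap.
            *current = current.saturating_add_signed(delta);
        }
        // Keep only fields that still appear in at least one document.
        field_distribution.retain(|_, v| *v != 0);
    }

    fn main() {
        let mut dist = BTreeMap::from([("title".to_string(), 2), ("tags".to_string(), 1)]);
        apply_deltas(&mut dist, [("tags".to_string(), -1), ("genre".to_string(), 3)]);
        assert_eq!(dist.get("title"), Some(&2));
        assert_eq!(dist.get("genre"), Some(&3));
        assert!(dist.get("tags").is_none());
    }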
+            'vectors: {
+                let span = tracing::trace_span!(target: "indexing::documents::extract", "vectors");
+                let _entered = span.enter();
 
-            // TODO THIS IS TOO MUCH
-            // - [ ] Extract fieldid docid facet number
-            // - [ ] Extract fieldid docid facet string
-            // - [ ] Extract facetid string fst
-            // - [ ] Extract facetid normalized string strings
+                let index_embeddings = index.embedding_configs(&rtxn)?;
+                if index_embeddings.is_empty() {
+                    break 'vectors;
+                }
 
-            // TODO Inverted Indexes again
-            // - [x] Extract fieldid facet isempty docids
-            // - [x] Extract fieldid facet isnull docids
-            // - [x] Extract fieldid facet exists docids
+                let embedding_sender = extractor_sender.embeddings();
+                let extractor = EmbeddingExtractor::new(embedders, &embedding_sender, field_distribution, request_threads());
+                let datastore = ThreadLocal::with_capacity(pool.current_num_threads());
+                let (finished_steps, step_name) = steps::extract_embeddings();
 
-            // TODO This is the normal system
-            // - [x] Extract fieldid facet number docids
-            // - [x] Extract fieldid facet string docids
+                extract(document_changes, &extractor, indexing_context, &mut extractor_allocs, &datastore, finished_steps, total_steps, step_name)?;
 
-            // TODO use None when needed
-            Result::Ok(facet_field_ids_delta)
-        })
+                let mut user_provided = HashMap::new();
+                for data in datastore {
+                    let data = data.into_inner().0;
+                    for (embedder, deladd) in data.into_iter() {
+                        let user_provided = user_provided.entry(embedder).or_insert(Default::default());
+                        if let Some(del) = deladd.del {
+                            *user_provided -= del;
+                        }
+                        if let Some(add) = deladd.add {
+                            *user_provided |= add;
+                        }
+                    }
+                }
+
+                embedding_sender.finish(user_provided).unwrap();
+            }
+
+            {
+                let span = tracing::trace_span!(target: "indexing::documents::extract", "FINISH");
+                let _entered = span.enter();
+                let (finished_steps, step_name) = steps::write_db();
+                (indexing_context.send_progress)(Progress { finished_steps, total_steps, step_name, finished_total_documents: None });
+            }
+
+            // TODO THIS IS TOO MUCH
+            // - [ ] Extract fieldid docid facet number
+            // - [ ] Extract fieldid docid facet string
+            // - [ ] Extract facetid string fst
+            // - [ ] Extract facetid normalized string strings
+
+            // TODO Inverted Indexes again
+            // - [x] Extract fieldid facet isempty docids
+            // - [x] Extract fieldid facet isnull docids
+            // - [x] Extract fieldid facet exists docids
+
+            // TODO This is the normal system
+            // - [x] Extract fieldid facet number docids
+            // - [x] Extract fieldid facet string docids
+
+            Result::Ok(facet_field_ids_delta)
+        })
     })?;
 
+    let global_fields_ids_map = GlobalFieldsIdsMap::new(&new_fields_ids_map);
+
+    let indexer_span = tracing::Span::current();
+
+    let vector_arroy = index.vector_arroy;
+    let mut rng = rand::rngs::StdRng::seed_from_u64(42);
+    let indexer_span = tracing::Span::current();
+    let arroy_writers: Result<HashMap<_, _>> = embedders
+        .inner_as_ref()
+        .iter()
+        .map(|(embedder_name, (embedder, _, was_quantized))| {
+            let embedder_index = index.embedder_category_id.get(wtxn, embedder_name)?.ok_or(
+                InternalError::DatabaseMissingEntry {
+                    db_name: "embedder_category_id",
+                    key: None,
+                },
+            )?;
+
+            let dimensions = embedder.dimensions();
+            let writer = ArroyWrapper::new(vector_arroy, embedder_index, *was_quantized);
+
+            Ok((
+                embedder_index,
+                (embedder_name.as_str(), embedder.as_ref(), writer, dimensions),
+            ))
+        })
+        .collect();
+
+    let mut arroy_writers = arroy_writers?;
     for operation in writer_receiver {
-        let database = operation.database(index);
-        match operation.entry() {
-            EntryOperation::Delete(e) => {
-                if !database.delete(wtxn, e.entry())? {
-                    unreachable!("We tried to delete an unknown key")
+        match operation {
+            WriterOperation::DbOperation(db_operation) => {
+                let database = db_operation.database(index);
+                match db_operation.entry() {
+                    EntryOperation::Delete(e) => {
+                        if !database.delete(wtxn, e.entry())? 
{ + unreachable!("We tried to delete an unknown key") + } + } + EntryOperation::Write(e) => database.put(wtxn, e.key(), e.value())?, } } - EntryOperation::Write(e) => database.put(wtxn, e.key(), e.value())?, + WriterOperation::ArroyOperation(arroy_operation) => match arroy_operation { + ArroyOperation::DeleteVectors { docid } => { + for (_embedder_index, (_embedder_name, _embedder, writer, dimensions)) in + &mut arroy_writers + { + let dimensions = *dimensions; + writer.del_items(wtxn, dimensions, docid)?; + } + } + ArroyOperation::SetVectors { + docid, + embedder_id, + embeddings: raw_embeddings, + } => { + let (_, _, writer, dimensions) = + arroy_writers.get(&embedder_id).expect("requested a missing embedder"); + // TODO: switch to Embeddings + let mut embeddings = Embeddings::new(*dimensions); + for embedding in raw_embeddings { + embeddings.append(embedding).unwrap(); + } + + writer.del_items(wtxn, *dimensions, docid)?; + writer.add_items(wtxn, docid, &embeddings)?; + } + ArroyOperation::SetVector { docid, embedder_id, embedding } => { + let (_, _, writer, dimensions) = + arroy_writers.get(&embedder_id).expect("requested a missing embedder"); + writer.del_items(wtxn, *dimensions, docid)?; + writer.add_item(wtxn, docid, &embedding)?; + } + ArroyOperation::Finish { mut user_provided } => { + let span = tracing::trace_span!(target: "indexing::vectors", parent: &indexer_span, "build"); + let _entered = span.enter(); + for (_embedder_index, (_embedder_name, _embedder, writer, dimensions)) in + &mut arroy_writers + { + let dimensions = *dimensions; + writer.build_and_quantize( + wtxn, + &mut rng, + dimensions, + false, + &indexing_context.must_stop_processing, + )?; + } + + let mut configs = index.embedding_configs(wtxn)?; + + for config in &mut configs { + if let Some(user_provided) = user_provided.remove(&config.name) { + config.user_provided = user_provided; + } + } + + index.put_embedding_configs(wtxn, configs)?; + } + }, } } - /// TODO handle the panicking threads let facet_field_ids_delta = extractor_handle.join().unwrap()?; + let (finished_steps, step_name) = steps::post_processing_facets(); + (indexing_context.send_progress)(Progress { + finished_steps, + total_steps, + step_name, + finished_total_documents: None, + }); + + compute_facet_level_database(index, wtxn, facet_field_ids_delta)?; + + compute_facet_search_database(index, wtxn, global_fields_ids_map)?; + + let (finished_steps, step_name) = steps::post_processing_words(); + (indexing_context.send_progress)(Progress { + finished_steps, + total_steps, + step_name, + finished_total_documents: None, + }); + if let Some(prefix_delta) = compute_word_fst(index, wtxn)? 
{
             compute_prefix_database(index, wtxn, prefix_delta)?;
         }
 
-        compute_facet_search_database(index, wtxn, global_fields_ids_map)?;
+        let (finished_steps, step_name) = steps::finalizing();
+        (indexing_context.send_progress)(Progress {
+            finished_steps,
+            total_steps,
+            step_name,
+            finished_total_documents: None,
+        });
 
-        compute_facet_level_database(index, wtxn, facet_field_ids_delta)?;
-
-        Result::Ok(())
+        Ok(()) as Result<_>
     })?;
 
     // required to into_inner the new_fields_ids_map
     drop(fields_ids_map_store);
 
-    let fields_ids_map = new_fields_ids_map.into_inner().unwrap();
-    index.put_fields_ids_map(wtxn, &fields_ids_map)?;
+    let new_fields_ids_map = new_fields_ids_map.into_inner().unwrap();
+    index.put_fields_ids_map(wtxn, new_fields_ids_map.as_fields_ids_map())?;
 
     if let Some(new_primary_key) = new_primary_key {
         index.put_primary_key(wtxn, new_primary_key.name())?;
@@ -280,7 +528,8 @@
     // used to update the localized and weighted maps while sharing the update code with the settings pipeline.
     let mut inner_index_settings = InnerIndexSettings::from_index(index, wtxn)?;
     inner_index_settings.recompute_facets(wtxn, index)?;
     inner_index_settings.recompute_searchables(wtxn, index)?;
-
+    index.put_field_distribution(wtxn, &field_distribution)?;
+    index.put_documents_ids(wtxn, &document_ids)?;
     index.set_updated_at(wtxn, &OffsetDateTime::now_utc())?;
 
     Ok(())
@@ -517,3 +766,15 @@ pub fn retrieve_or_guess_primary_key<'a>(
         Err(err) => Ok(Err(err)),
     }
 }
+
+fn request_threads() -> &'static ThreadPoolNoAbort {
+    static REQUEST_THREADS: OnceLock<ThreadPoolNoAbort> = OnceLock::new();
+
+    REQUEST_THREADS.get_or_init(|| {
+        ThreadPoolNoAbortBuilder::new()
+            .num_threads(crate::vector::REQUEST_PARALLELISM)
+            .thread_name(|index| format!("embedding-request-{index}"))
+            .build()
+            .unwrap()
+    })
+}
diff --git a/crates/milli/src/update/new/vector_document.rs b/crates/milli/src/update/new/vector_document.rs
index 6796134db..dc73c5268 100644
--- a/crates/milli/src/update/new/vector_document.rs
+++ b/crates/milli/src/update/new/vector_document.rs
@@ -14,7 +14,7 @@
 use crate::index::IndexEmbeddingConfig;
 use crate::vector::parsed_vectors::{
     RawVectors, VectorOrArrayOfVectors, RESERVED_VECTORS_FIELD_NAME,
 };
-use crate::vector::{Embedding, EmbeddingConfigs};
+use crate::vector::{ArroyWrapper, Embedding, EmbeddingConfigs};
 use crate::{DocumentId, Index, InternalError, Result, UserError};
 
 #[derive(Serialize)]
@@ -117,16 +117,10 @@ impl<'t> VectorDocumentFromDb<'t> {
         embedder_id: u8,
         config: &IndexEmbeddingConfig,
     ) -> Result<VectorEntry<'t>> {
-        let readers = self.index.arroy_readers(self.rtxn, embedder_id, config.config.quantized());
-        let mut vectors = Vec::new();
-        for reader in readers {
-            let reader = reader?;
-            let Some(vector) = reader.item_vector(self.rtxn, self.docid)? 
else { - break; - }; + let reader = + ArroyWrapper::new(self.index.vector_arroy, embedder_id, config.config.quantized()); + let vectors = reader.item_vectors(self.rtxn, self.docid)?; - vectors.push(vector); - } Ok(VectorEntry { has_configured_embedder: true, embeddings: Some(Embeddings::FromDb(vectors)), diff --git a/crates/milli/src/vector/error.rs b/crates/milli/src/vector/error.rs index d5e0697d6..41765f6ab 100644 --- a/crates/milli/src/vector/error.rs +++ b/crates/milli/src/vector/error.rs @@ -454,7 +454,7 @@ impl UnusedVectorsDistribution { } pub struct UnusedVectorsDistributionBump<'doc>( - hashbrown::HashMap<&'doc str, u64, hashbrown::hash_map::DefaultHashBuilder, &'doc Bump>, + hashbrown::HashMap<&'doc str, u64, hashbrown::DefaultHashBuilder, &'doc Bump>, ); impl<'doc> UnusedVectorsDistributionBump<'doc> { diff --git a/milli/src/update/new/indexer/mod.rs b/milli/src/update/new/indexer/mod.rs deleted file mode 100644 index 3bee9904f..000000000 --- a/milli/src/update/new/indexer/mod.rs +++ /dev/null @@ -1,823 +0,0 @@ -use std::cell::RefCell; -use std::sync::{OnceLock, RwLock}; -use std::thread::{self, Builder}; - -use big_s::S; -use bumpalo::Bump; -use document_changes::{ - extract, DocumentChangeContext, DocumentChanges, Extractor, FullySend, IndexingContext, - Progress, RefCellExt, ThreadLocal, -}; -pub use document_deletion::DocumentDeletion; -pub use document_operation::DocumentOperation; -use hashbrown::HashMap; -use heed::{RoTxn, RwTxn}; -use itertools::{EitherOrBoth, Itertools}; -pub use partial_dump::PartialDump; -use rand::SeedableRng as _; -use rayon::ThreadPool; -use time::OffsetDateTime; -pub use update_by_function::UpdateByFunction; - -use super::channel::*; -use super::document::{write_to_obkv, Document}; -use super::document_change::DocumentChange; -use super::extract::*; -use super::merger::{merge_grenad_entries, FacetFieldIdsDelta}; -use super::word_fst_builder::PrefixDelta; -use super::words_prefix_docids::{ - compute_word_prefix_docids, compute_word_prefix_fid_docids, compute_word_prefix_position_docids, -}; -use super::{StdResult, TopLevelMap}; -use crate::documents::{PrimaryKey, DEFAULT_PRIMARY_KEY}; -use crate::facet::FacetType; -use crate::fields_ids_map::metadata::{FieldIdMapWithMetadata, MetadataBuilder}; -use crate::proximity::ProximityPrecision; -use crate::update::new::channel::ExtractorSender; -use crate::update::new::extract::EmbeddingExtractor; -use crate::update::new::words_prefix_docids::compute_exact_word_prefix_docids; -use crate::update::settings::InnerIndexSettings; -use crate::update::{FacetsUpdateBulk, GrenadParameters}; -use crate::vector::{ArroyWrapper, EmbeddingConfigs}; -use crate::{ - FieldsIdsMap, GlobalFieldsIdsMap, Index, InternalError, Result, ThreadPoolNoAbort, - ThreadPoolNoAbortBuilder, UserError, -}; - -pub(crate) mod de; -pub mod document_changes; -mod document_deletion; -mod document_operation; -mod partial_dump; -mod update_by_function; - -struct DocumentExtractor<'a> { - document_sender: &'a DocumentSender<'a>, - embedders: &'a EmbeddingConfigs, -} - -impl<'a, 'extractor> Extractor<'extractor> for DocumentExtractor<'a> { - type Data = FullySend>>; - - fn init_data( - &self, - _extractor_alloc: raw_collections::alloc::RefBump<'extractor>, - ) -> Result { - Ok(FullySend(Default::default())) - } - - fn process<'doc>( - &self, - changes: impl Iterator>>, - context: &DocumentChangeContext, - ) -> Result<()> { - let mut document_buffer = Vec::new(); - let mut field_distribution_delta = context.data.0.borrow_mut_or_yield(); - - let mut 
new_fields_ids_map = context.new_fields_ids_map.borrow_mut_or_yield(); - - for change in changes { - let change = change?; - let external_docid = change.external_docid().to_owned(); - - // document but we need to create a function that collects and compresses documents. - match change { - DocumentChange::Deletion(deletion) => { - let docid = deletion.docid(); - let content = deletion.current( - &context.txn, - context.index, - &context.db_fields_ids_map, - )?; - for res in content.iter_top_level_fields() { - let (f, _) = res?; - let entry = field_distribution_delta.entry_ref(f).or_default(); - *entry -= 1; - } - self.document_sender.delete(docid, external_docid).unwrap(); - } - DocumentChange::Update(update) => { - let docid = update.docid(); - let content = - update.current(&context.txn, context.index, &context.db_fields_ids_map)?; - for res in content.iter_top_level_fields() { - let (f, _) = res?; - let entry = field_distribution_delta.entry_ref(f).or_default(); - *entry -= 1; - } - let content = update.updated(); - for res in content.iter_top_level_fields() { - let (f, _) = res?; - let entry = field_distribution_delta.entry_ref(f).or_default(); - *entry += 1; - } - - let content = - update.merged(&context.txn, context.index, &context.db_fields_ids_map)?; - let vector_content = update.merged_vectors( - &context.txn, - context.index, - &context.db_fields_ids_map, - &context.doc_alloc, - self.embedders, - )?; - let content = write_to_obkv( - &content, - vector_content.as_ref(), - &mut new_fields_ids_map, - &mut document_buffer, - )?; - self.document_sender.insert(docid, external_docid, content.boxed()).unwrap(); - } - DocumentChange::Insertion(insertion) => { - let docid = insertion.docid(); - let content = insertion.inserted(); - for res in content.iter_top_level_fields() { - let (f, _) = res?; - let entry = field_distribution_delta.entry_ref(f).or_default(); - *entry += 1; - } - let inserted_vectors = - insertion.inserted_vectors(&context.doc_alloc, self.embedders)?; - let content = write_to_obkv( - &content, - inserted_vectors.as_ref(), - &mut new_fields_ids_map, - &mut document_buffer, - )?; - self.document_sender.insert(docid, external_docid, content.boxed()).unwrap(); - // extracted_dictionary_sender.send(self, dictionary: &[u8]); - } - } - } - - Ok(()) - } -} - -mod steps { - pub const STEPS: &[&str] = &[ - "extracting documents", - "extracting facets", - "extracting words", - "extracting word proximity", - "extracting embeddings", - "writing to database", - "post-processing facets", - "post-processing words", - "finalizing", - ]; - - const fn step(step: u16) -> (u16, &'static str) { - (step, STEPS[step as usize]) - } - - pub const fn total_steps() -> u16 { - STEPS.len() as u16 - } - - pub const fn extract_documents() -> (u16, &'static str) { - step(0) - } - - pub const fn extract_facets() -> (u16, &'static str) { - step(1) - } - - pub const fn extract_words() -> (u16, &'static str) { - step(2) - } - - pub const fn extract_word_proximity() -> (u16, &'static str) { - step(3) - } - - pub const fn extract_embeddings() -> (u16, &'static str) { - step(4) - } - - pub const fn write_db() -> (u16, &'static str) { - step(5) - } - - pub const fn post_processing_facets() -> (u16, &'static str) { - step(6) - } - pub const fn post_processing_words() -> (u16, &'static str) { - step(7) - } - - pub const fn finalizing() -> (u16, &'static str) { - step(8) - } -} - -/// This is the main function of this crate. 
-/// -/// Give it the output of the [`Indexer::document_changes`] method and it will execute it in the [`rayon::ThreadPool`]. -/// -/// TODO return stats -#[allow(clippy::too_many_arguments)] // clippy: 😝 -pub fn index<'pl, 'indexer, 'index, DC, MSP, SP>( - wtxn: &mut RwTxn, - index: &'index Index, - db_fields_ids_map: &'indexer FieldsIdsMap, - new_fields_ids_map: FieldsIdsMap, - new_primary_key: Option>, - pool: &ThreadPool, - document_changes: &DC, - embedders: EmbeddingConfigs, - must_stop_processing: &'indexer MSP, - send_progress: &'indexer SP, -) -> Result<()> -where - DC: DocumentChanges<'pl>, - MSP: Fn() -> bool + Sync, - SP: Fn(Progress) + Sync, -{ - let (merger_sender, writer_receiver) = merger_writer_channel(10_000); - // This channel acts as a rendezvous point to ensure that we are one task ahead - let (extractor_sender, merger_receiver) = extractors_merger_channels(4); - - let metadata_builder = MetadataBuilder::from_index(index, wtxn)?; - - let new_fields_ids_map = FieldIdMapWithMetadata::new(new_fields_ids_map, metadata_builder); - - let new_fields_ids_map = RwLock::new(new_fields_ids_map); - - let fields_ids_map_store = ThreadLocal::with_capacity(pool.current_num_threads()); - let mut extractor_allocs = ThreadLocal::with_capacity(pool.current_num_threads()); - let doc_allocs = ThreadLocal::with_capacity(pool.current_num_threads()); - - let indexing_context = IndexingContext { - index, - db_fields_ids_map, - new_fields_ids_map: &new_fields_ids_map, - doc_allocs: &doc_allocs, - fields_ids_map_store: &fields_ids_map_store, - must_stop_processing, - send_progress, - }; - - let total_steps = steps::total_steps(); - - let mut field_distribution = index.field_distribution(wtxn)?; - - thread::scope(|s| -> Result<()> { - let indexer_span = tracing::Span::current(); - let embedders = &embedders; - // prevent moving the field_distribution in the inner closure... - let field_distribution = &mut field_distribution; - // TODO manage the errors correctly - let handle = Builder::new().name(S("indexer-extractors")).spawn_scoped(s, move || { - pool.in_place_scope(|_s| { - let span = tracing::trace_span!(target: "indexing::documents", parent: &indexer_span, "extract"); - let _entered = span.enter(); - - // document but we need to create a function that collects and compresses documents. - let document_sender = extractor_sender.document_sender(); - let document_extractor = DocumentExtractor { document_sender: &document_sender, embedders }; - let datastore = ThreadLocal::with_capacity(pool.current_num_threads()); - let (finished_steps, step_name) = steps::extract_documents(); - extract(document_changes, &document_extractor, indexing_context, &mut extractor_allocs, &datastore, finished_steps, total_steps, step_name)?; - - for field_distribution_delta in datastore { - let field_distribution_delta = field_distribution_delta.0.into_inner(); - for (field, delta) in field_distribution_delta { - let current = field_distribution.entry(field).or_default(); - // adding the delta should never cause a negative result, as we are removing fields that previously existed. 
- *current = current.saturating_add_signed(delta); - } - } - - field_distribution.retain(|_, v| *v == 0); - - document_sender.finish().unwrap(); - - const TEN_GIB: usize = 10 * 1024 * 1024 * 1024; - let max_memory = TEN_GIB / dbg!(rayon::current_num_threads()); - let grenad_parameters = GrenadParameters { - max_memory: Some(max_memory), - ..GrenadParameters::default() - }; - - { - let span = tracing::trace_span!(target: "indexing::documents::extract", "faceted"); - let _entered = span.enter(); - let (finished_steps, step_name) = steps::extract_facets(); - extract_and_send_docids::< - _, - FacetedDocidsExtractor, - FacetDocids, - _, - _ - >( - grenad_parameters, - document_changes, - indexing_context, - &mut extractor_allocs, - &extractor_sender, - finished_steps, - total_steps, - step_name - )?; - } - - { - let span = tracing::trace_span!(target: "indexing::documents::extract", "word_docids"); - let _entered = span.enter(); - let (finished_steps, step_name) = steps::extract_words(); - - let WordDocidsMergers { - word_fid_docids, - word_docids, - exact_word_docids, - word_position_docids, - fid_word_count_docids, - } = WordDocidsExtractors::run_extraction(grenad_parameters, document_changes, indexing_context, &mut extractor_allocs, finished_steps, total_steps, step_name)?; - extractor_sender.send_searchable::(word_docids).unwrap(); - extractor_sender.send_searchable::(word_fid_docids).unwrap(); - extractor_sender.send_searchable::(exact_word_docids).unwrap(); - extractor_sender.send_searchable::(word_position_docids).unwrap(); - extractor_sender.send_searchable::(fid_word_count_docids).unwrap(); - } - - // run the proximity extraction only if the precision is by word - // this works only if the settings didn't change during this transaction. - let rtxn = index.read_txn().unwrap(); - let proximity_precision = index.proximity_precision(&rtxn)?.unwrap_or_default(); - if proximity_precision == ProximityPrecision::ByWord { - let span = tracing::trace_span!(target: "indexing::documents::extract", "word_pair_proximity_docids"); - let _entered = span.enter(); - let (finished_steps, step_name) = steps::extract_word_proximity(); - - - extract_and_send_docids::< - _, - WordPairProximityDocidsExtractor, - WordPairProximityDocids, - _, - _ - >( - grenad_parameters, - document_changes, - indexing_context, - &mut extractor_allocs, - &extractor_sender, - finished_steps, - total_steps, - step_name, - )?; - } - - 'vectors: { - let span = tracing::trace_span!(target: "indexing::documents::extract", "vectors"); - let _entered = span.enter(); - - let index_embeddings = index.embedding_configs(&rtxn)?; - if index_embeddings.is_empty() { - break 'vectors; - } - /// FIXME: need access to `merger_sender` - let embedding_sender = todo!(); - let extractor = EmbeddingExtractor::new(embedders, &embedding_sender, &field_distribution, request_threads()); - let datastore = ThreadLocal::with_capacity(pool.current_num_threads()); - let (finished_steps, step_name) = steps::extract_embeddings(); - - - extract(document_changes, &extractor, indexing_context, &mut extractor_allocs, &datastore, finished_steps, total_steps, step_name)?; - - - let mut user_provided = HashMap::new(); - for data in datastore { - let data = data.0.into_inner(); - for (embedder, deladd) in data.into_iter() { - let user_provided = user_provided.entry(embedder).or_insert(Default::default()); - if let Some(del) = deladd.del { - *user_provided -= del; - } - if let Some(add) = deladd.add { - *user_provided |= add; - } - } - } - - 
embedding_sender.finish(user_provided).unwrap(); - } - - { - let span = tracing::trace_span!(target: "indexing::documents::extract", "FINISH"); - let _entered = span.enter(); - let (finished_steps, step_name) = steps::write_db(); - (indexing_context.send_progress)(Progress { finished_steps, total_steps, step_name, finished_total_documents: None }); - } - - // TODO THIS IS TOO MUCH - // - [ ] Extract fieldid docid facet number - // - [ ] Extract fieldid docid facet string - // - [ ] Extract facetid string fst - // - [ ] Extract facetid normalized string strings - - // TODO Inverted Indexes again - // - [x] Extract fieldid facet isempty docids - // - [x] Extract fieldid facet isnull docids - // - [x] Extract fieldid facet exists docids - - // TODO This is the normal system - // - [x] Extract fieldid facet number docids - // - [x] Extract fieldid facet string docids - - Ok(()) as Result<_> - }) - })?; - - let global_fields_ids_map = GlobalFieldsIdsMap::new(&new_fields_ids_map); - - let indexer_span = tracing::Span::current(); - // TODO manage the errors correctly - let merger_thread = Builder::new().name(S("indexer-merger")).spawn_scoped(s, move || { - let span = - tracing::trace_span!(target: "indexing::documents", parent: &indexer_span, "merge"); - let _entered = span.enter(); - let rtxn = index.read_txn().unwrap(); - merge_grenad_entries( - merger_receiver, - merger_sender, - &rtxn, - index, - global_fields_ids_map, - ) - })?; - - let vector_arroy = index.vector_arroy; - let mut rng = rand::rngs::StdRng::seed_from_u64(42); - let indexer_span = tracing::Span::current(); - let arroy_writers: Result> = embedders - .inner_as_ref() - .iter() - .map(|(embedder_name, (embedder, _, was_quantized))| { - let embedder_index = index.embedder_category_id.get(wtxn, embedder_name)?.ok_or( - InternalError::DatabaseMissingEntry { - db_name: "embedder_category_id", - key: None, - }, - )?; - - let dimensions = embedder.dimensions(); - - let writers: Vec<_> = crate::vector::arroy_db_range_for_embedder(embedder_index) - .map(|k| ArroyWrapper::new(vector_arroy, k, *was_quantized)) - .collect(); - - Ok(( - embedder_index, - (embedder_name.as_str(), embedder.as_ref(), writers, dimensions), - )) - }) - .collect(); - - let mut arroy_writers = arroy_writers?; - for operation in writer_receiver { - match operation { - WriterOperation::DbOperation(db_operation) => { - let database = db_operation.database(index); - match db_operation.entry() { - EntryOperation::Delete(e) => { - if !database.delete(wtxn, e.entry())? { - unreachable!("We tried to delete an unknown key") - } - } - EntryOperation::Write(e) => database.put(wtxn, e.key(), e.value())?, - } - } - WriterOperation::ArroyOperation(arroy_operation) => match arroy_operation { - ArroyOperation::DeleteVectors { docid } => { - for (_embedder_index, (_embedder_name, _embedder, writers, dimensions)) in - &mut arroy_writers - { - let dimensions = *dimensions; - for writer in writers { - // Uses invariant: vectors are packed in the first writers. - if !writer.del_item(wtxn, dimensions, docid)? 
{ - break; - } - } - } - } - ArroyOperation::SetVectors { docid, embedder_id, embeddings } => { - let (_, _, writers, dimensions) = - arroy_writers.get(&embedder_id).expect("requested a missing embedder"); - for res in writers.iter().zip_longest(&embeddings) { - match res { - EitherOrBoth::Both(writer, embedding) => { - writer.add_item(wtxn, *dimensions, docid, embedding)?; - } - EitherOrBoth::Left(writer) => { - let deleted = writer.del_item(wtxn, *dimensions, docid)?; - if !deleted { - break; - } - } - EitherOrBoth::Right(_embedding) => { - let external_document_id = index - .external_id_of(wtxn, std::iter::once(docid))? - .into_iter() - .next() - .unwrap()?; - return Err(UserError::TooManyVectors( - external_document_id, - embeddings.len(), - ) - .into()); - } - } - } - } - ArroyOperation::SetVector { docid, embedder_id, embedding } => { - let (_, _, writers, dimensions) = - arroy_writers.get(&embedder_id).expect("requested a missing embedder"); - for res in writers.iter().zip_longest(std::iter::once(&embedding)) { - match res { - EitherOrBoth::Both(writer, embedding) => { - writer.add_item(wtxn, *dimensions, docid, embedding)?; - } - EitherOrBoth::Left(writer) => { - let deleted = writer.del_item(wtxn, *dimensions, docid)?; - if !deleted { - break; - } - } - EitherOrBoth::Right(_embedding) => { - unreachable!("1 vs 256 vectors") - } - } - } - } - ArroyOperation::Finish { mut user_provided } => { - let span = tracing::trace_span!(target: "indexing::vectors", parent: &indexer_span, "build"); - let _entered = span.enter(); - for (_embedder_index, (_embedder_name, _embedder, writers, dimensions)) in - &mut arroy_writers - { - let dimensions = *dimensions; - for writer in writers { - if writer.need_build(wtxn, dimensions)? { - writer.build(wtxn, &mut rng, dimensions)?; - } else if writer.is_empty(wtxn, dimensions)? 
{
-                                    break;
-                                }
-                            }
-                        }
-
-                        let mut configs = index.embedding_configs(wtxn)?;
-
-                        for config in &mut configs {
-                            if let Some(user_provided) = user_provided.remove(&config.name) {
-                                config.user_provided = user_provided;
-                            }
-                        }
-
-                        index.put_embedding_configs(wtxn, configs)?;
-                    }
-                },
-            }
-        }
-
-        // TODO handle the panicking threads
-        handle.join().unwrap()?;
-        let merger_result = merger_thread.join().unwrap()?;
-        let (finished_steps, step_name) = steps::post_processing_facets();
-        (indexing_context.send_progress)(Progress {
-            finished_steps,
-            total_steps,
-            step_name,
-            finished_total_documents: None,
-        });
-
-        if let Some(facet_field_ids_delta) = merger_result.facet_field_ids_delta {
-            compute_facet_level_database(index, wtxn, facet_field_ids_delta)?;
-        }
-
-        let (finished_steps, step_name) = steps::post_processing_words();
-        (indexing_context.send_progress)(Progress {
-            finished_steps,
-            total_steps,
-            step_name,
-            finished_total_documents: None,
-        });
-
-        if let Some(prefix_delta) = merger_result.prefix_delta {
-            compute_prefix_database(index, wtxn, prefix_delta)?;
-        }
-
-        let (finished_steps, step_name) = steps::finalizing();
-        (indexing_context.send_progress)(Progress {
-            finished_steps,
-            total_steps,
-            step_name,
-            finished_total_documents: None,
-        });
-
-        Ok(()) as Result<_>
-    })?;
-
-    // Required so that `new_fields_ids_map` can be `into_inner`ed below.
-    drop(fields_ids_map_store);
-
-    let new_fields_ids_map = new_fields_ids_map.into_inner().unwrap();
-    index.put_fields_ids_map(wtxn, new_fields_ids_map.as_fields_ids_map())?;
-
-    if let Some(new_primary_key) = new_primary_key {
-        index.put_primary_key(wtxn, new_primary_key.name())?;
-    }
-
-    // Used to update the localized and weighted maps while sharing the update code with the settings pipeline.
-    let mut inner_index_settings = InnerIndexSettings::from_index(index, wtxn)?;
-    inner_index_settings.recompute_facets(wtxn, index)?;
-    inner_index_settings.recompute_searchables(wtxn, index)?;
-    index.put_field_distribution(wtxn, &field_distribution)?;
-    index.set_updated_at(wtxn, &OffsetDateTime::now_utc())?;
-
-    Ok(())
-}
-
-#[tracing::instrument(level = "trace", skip_all, target = "indexing::prefix")]
-fn compute_prefix_database(
-    index: &Index,
-    wtxn: &mut RwTxn,
-    prefix_delta: PrefixDelta,
-) -> Result<()> {
-    tracing::debug!("prefix_delta: {:?}", &prefix_delta);
-    let PrefixDelta { modified, deleted } = prefix_delta;
-    // Compute word prefix docids
-    compute_word_prefix_docids(wtxn, index, &modified, &deleted)?;
-    // Compute exact word prefix docids
-    compute_exact_word_prefix_docids(wtxn, index, &modified, &deleted)?;
-    // Compute word prefix fid docids
-    compute_word_prefix_fid_docids(wtxn, index, &modified, &deleted)?;
-    // Compute word prefix position docids
-    compute_word_prefix_position_docids(wtxn, index, &modified, &deleted)
-}
-
-#[tracing::instrument(level = "trace", skip_all, target = "indexing::facet_field_ids")]
-fn compute_facet_level_database(
-    index: &Index,
-    wtxn: &mut RwTxn,
-    facet_field_ids_delta: FacetFieldIdsDelta,
-) -> Result<()> {
-    tracing::debug!("facet_field_ids_delta: {:?}", &facet_field_ids_delta);
-    if let Some(modified_facet_string_ids) = facet_field_ids_delta.modified_facet_string_ids() {
-        let span = tracing::trace_span!(target: "indexing::facet_field_ids", "string");
-        let _entered = span.enter();
-        FacetsUpdateBulk::new_not_updating_level_0(
-            index,
-            modified_facet_string_ids,
-            FacetType::String,
-        )
-        .execute(wtxn)?;
-    }
-    if let Some(modified_facet_number_ids) = facet_field_ids_delta.modified_facet_number_ids() {
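-        // Same bulk treatment for numbers: rebuild the upper facet levels while
-        // leaving the already-written level 0 untouched.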
-        let span = tracing::trace_span!(target: "indexing::facet_field_ids", "number");
-        let _entered = span.enter();
-        FacetsUpdateBulk::new_not_updating_level_0(
-            index,
-            modified_facet_number_ids,
-            FacetType::Number,
-        )
-        .execute(wtxn)?;
-    }
-
-    Ok(())
-}
-
-/// TODO: GrenadParameters::default() should be removed in favor of a passed parameter
-/// TODO: manage the errors correctly
-/// TODO: we must have a single trait that also gives the extractor type
-#[allow(clippy::too_many_arguments)]
-fn extract_and_send_docids<
-    'pl,
-    'fid,
-    'indexer,
-    'index,
-    DC: DocumentChanges<'pl>,
-    E: DocidsExtractor,
-    D: MergerOperationType,
-    MSP,
-    SP,
->(
-    grenad_parameters: GrenadParameters,
-    document_changes: &DC,
-    indexing_context: IndexingContext<'fid, 'indexer, 'index, MSP, SP>,
-    extractor_allocs: &mut ThreadLocal<FullySend<Bump>>,
-    sender: &ExtractorSender,
-    finished_steps: u16,
-    total_steps: u16,
-    step_name: &'static str,
-) -> Result<()>
-where
-    MSP: Fn() -> bool + Sync,
-    SP: Fn(Progress) + Sync,
-{
-    let merger = E::run_extraction(
-        grenad_parameters,
-        document_changes,
-        indexing_context,
-        extractor_allocs,
-        finished_steps,
-        total_steps,
-        step_name,
-    )?;
-    sender.send_searchable::<D>(merger).unwrap();
-    Ok(())
-}
-
-/// Returns the primary key that has already been set for this index or the
-/// one we will guess by searching for the first key whose name ends with "id",
-/// and whether the primary key changed.
-/// TODO move this elsewhere
-pub fn retrieve_or_guess_primary_key<'a>(
-    rtxn: &'a RoTxn<'a>,
-    index: &Index,
-    new_fields_ids_map: &mut FieldsIdsMap,
-    primary_key_from_op: Option<&'a str>,
-    first_document: Option<&'a TopLevelMap<'a>>,
-) -> Result<StdResult<(PrimaryKey<'a>, bool), UserError>> {
-    // make sure that we have a declared primary key, either fetching it from the index or attempting to guess it.
-
-    // do we have an existing declared primary key?
-    let (primary_key, has_changed) = if let Some(primary_key_from_db) = index.primary_key(rtxn)? {
-        // did we request a primary key in the operation?
-        match primary_key_from_op {
-            // we did, and it is different from the DB one
-            Some(primary_key_from_op) if primary_key_from_op != primary_key_from_db => {
-                // is the index empty?
-                if index.number_of_documents(rtxn)? == 0 {
-                    // change primary key
-                    (primary_key_from_op, true)
-                } else {
-                    return Ok(Err(UserError::PrimaryKeyCannotBeChanged(
-                        primary_key_from_db.to_string(),
-                    )));
-                }
-            }
-            _ => (primary_key_from_db, false),
-        }
-    } else {
-        // no primary key in the DB => let's set one
-        // did we request a primary key in the operation?
-        let primary_key = if let Some(primary_key_from_op) = primary_key_from_op {
-            // set primary key from operation
-            primary_key_from_op
-        } else {
-            // guess primary key
-            let first_document = match first_document {
-                Some(document) => document,
-                // previous indexer when no pk is set + we send an empty payload => index_primary_key_no_candidate_found
-                None => return Ok(Err(UserError::NoPrimaryKeyCandidateFound)),
-            };
-
-            let mut guesses: Vec<&str> = first_document
-                .keys()
-                .map(AsRef::as_ref)
-                .filter(|name| name.to_lowercase().ends_with(DEFAULT_PRIMARY_KEY))
-                .collect();
-
-            // sort the keys in lexicographical order, so that fields are always in the same order.
-            guesses.sort_unstable();
-
-            match guesses.as_slice() {
-                [] => return Ok(Err(UserError::NoPrimaryKeyCandidateFound)),
-                [name] => {
-                    tracing::info!("Primary key was not specified in index. Inferred to '{name}'");
-                    *name
-                }
-                multiple => {
-                    return Ok(Err(UserError::MultiplePrimaryKeyCandidatesFound {
-                        candidates: multiple
-                            .iter()
-                            .map(|candidate| candidate.to_string())
-                            .collect(),
-                    }))
-                }
-            }
-        };
-        (primary_key, true)
-    };
-
-    match PrimaryKey::new_or_insert(primary_key, new_fields_ids_map) {
-        Ok(primary_key) => Ok(Ok((primary_key, has_changed))),
-        Err(err) => Ok(Err(err)),
-    }
-}
-
-fn request_threads() -> &'static ThreadPoolNoAbort {
-    static REQUEST_THREADS: OnceLock<ThreadPoolNoAbort> = OnceLock::new();
-
-    REQUEST_THREADS.get_or_init(|| {
-        ThreadPoolNoAbortBuilder::new()
-            .num_threads(crate::vector::REQUEST_PARALLELISM)
-            .thread_name(|index| format!("embedding-request-{index}"))
-            .build()
-            .unwrap()
-    })
-}