From 670aff5553b0ce70bcca05335e26489e9a90e9f6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Mon, 18 Nov 2024 11:59:18 +0100 Subject: [PATCH] Remove useless Transform methods --- .../milli/src/update/index_documents/mod.rs | 608 +----------------- .../src/update/index_documents/transform.rs | 573 +---------------- 2 files changed, 11 insertions(+), 1170 deletions(-) diff --git a/crates/milli/src/update/index_documents/mod.rs b/crates/milli/src/update/index_documents/mod.rs index befde896d..8ac183241 100644 --- a/crates/milli/src/update/index_documents/mod.rs +++ b/crates/milli/src/update/index_documents/mod.rs @@ -5,27 +5,16 @@ mod parallel; mod transform; mod typed_chunk; -use std::collections::{HashMap, HashSet}; -use std::io::{Read, Seek}; -use std::iter; +use std::collections::HashSet; use std::num::NonZeroU32; -use std::result::Result as StdResult; -use std::sync::Arc; -use crossbeam_channel::{Receiver, Sender}; -use grenad::{Merger, MergerBuilder}; +use grenad::Merger; use heed::types::Str; use heed::Database; -use rand::SeedableRng; -use rayon::iter::{ParallelBridge, ParallelIterator}; -use rhai::{Dynamic, Engine, OptimizationLevel, Scope}; -use roaring::RoaringBitmap; use serde::{Deserialize, Serialize}; use slice_group_by::GroupBy; -use tracing::debug; -use typed_chunk::{write_typed_chunk_into_index, ChunkAccumulator, TypedChunk}; +use typed_chunk::TypedChunk; -use self::enrich::enrich_documents_batch; pub use self::enrich::{extract_finite_float_from_value, DocumentId}; pub use self::helpers::*; pub use self::transform::{Transform, TransformOutput}; @@ -128,602 +117,11 @@ where }) } - /// Adds a batch of documents to the current builder. - /// - /// Since the documents are progressively added to the writer, a failure will cause only - /// return an error and not the `IndexDocuments` struct as it is invalid to use it afterward. - /// - /// Returns the number of documents added to the builder. - #[tracing::instrument(level = "trace", skip_all, target = "indexing::documents")] - pub fn add_documents( - mut self, - reader: DocumentsBatchReader, - ) -> Result<(Self, StdResult)> { - // Early return when there is no document to add - if reader.is_empty() { - return Ok((self, Ok(0))); - } - - // We check for user errors in this validator and if there is one, we can return - // the `IndexDocument` struct as it is valid to send more documents into it. - // However, if there is an internal error we throw it away! - let enriched_documents_reader = match enrich_documents_batch( - self.wtxn, - self.index, - self.config.autogenerate_docids, - reader, - )? { - Ok(reader) => reader, - Err(user_error) => return Ok((self, Err(user_error))), - }; - - let indexed_documents = - self.transform.as_mut().expect("Invalid document addition state").read_documents( - enriched_documents_reader, - self.wtxn, - &self.progress, - &self.should_abort, - )? 
as u64; - - self.added_documents += indexed_documents; - - Ok((self, Ok(indexed_documents))) - } - - #[tracing::instrument(level = "trace", skip_all, target = "indexing::documents")] - pub fn edit_documents( - self, - documents: &RoaringBitmap, - context: Option, - code: &str, - ) -> Result<(Self, StdResult<(u64, u64), UserError>)> { - // Early return when there is no document to edit - if documents.is_empty() { - return Ok((self, Ok((0, 0)))); - } - - fn rhaimap_to_object(map: rhai::Map) -> Object { - let mut output = Object::new(); - for (key, value) in map { - let value = serde_json::to_value(&value).unwrap(); - output.insert(key.into(), value); - } - output - } - - // Setup the security and limits of the Engine - let mut engine = Engine::new(); - engine.set_optimization_level(OptimizationLevel::Full); - engine.set_max_call_levels(1000); - // It is an arbitrary value. We need to let users define this in the settings. - engine.set_max_operations(1_000_000); - engine.set_max_variables(1000); - engine.set_max_functions(30); - engine.set_max_expr_depths(100, 1000); - engine.set_max_string_size(1024 * 1024 * 1024); // 1 GiB - engine.set_max_array_size(10_000); - engine.set_max_map_size(10_000); - - let ast = engine.compile(code).map_err(UserError::DocumentEditionCompilationError)?; - let fields_ids_map = self.index.fields_ids_map(self.wtxn)?; - let primary_key = self.index.primary_key(self.wtxn)?.unwrap(); - let mut documents_batch_builder = tempfile::tempfile().map(DocumentsBatchBuilder::new)?; - let mut documents_to_remove = RoaringBitmap::new(); - - let context: Option = match context { - Some(context) => { - Some(serde_json::from_value(context.into()).map_err(InternalError::SerdeJson)?) - } - None => None, - }; - - enum DocumentEdition { - Deleted(crate::DocumentId), - Edited(Object), - Nothing, - } - - let immutable_obkvs = ImmutableObkvs::new( - self.wtxn, - self.index.documents, - fields_ids_map.clone(), - documents.clone(), - )?; - - let processing = documents.into_iter().par_bridge().map(|docid| { - // safety: Both documents *must* exists in the database as - // their IDs comes from the list of documents ids. - let rhai_document = immutable_obkvs.rhai_map(docid)?.unwrap(); - let json_document = immutable_obkvs.json_map(docid)?.unwrap(); - let document_id = &json_document[primary_key]; - - let mut scope = Scope::new(); - if let Some(context) = context.as_ref().cloned() { - scope.push_constant_dynamic("context", context.clone()); - } - scope.push("doc", rhai_document); - // That's were the magic happens. We run the user script - // which edits "doc" scope variable reprensenting the document - // and ignore the output and even the type of it, i.e., Dynamic. - let _ = engine - .eval_ast_with_scope::(&mut scope, &ast) - .map_err(UserError::DocumentEditionRuntimeError)?; - - match scope.remove::("doc") { - // If the "doc" variable has set to (), we effectively delete the document. - Some(doc) if doc.is_unit() => Ok(DocumentEdition::Deleted(docid)), - None => unreachable!("missing doc variable from the Rhai scope"), - Some(document) => match document.try_cast() { - Some(document) => { - let new_document = rhaimap_to_object(document); - // Note: This condition is not perfect. Sometimes it detect changes - // like with floating points numbers and consider updating - // the document even if nothing actually changed. 
- if json_document != new_document { - if Some(document_id) != new_document.get(primary_key) { - Err(Error::UserError( - UserError::DocumentEditionCannotModifyPrimaryKey, - )) - } else { - Ok(DocumentEdition::Edited(new_document)) - } - } else { - Ok(DocumentEdition::Nothing) - } - } - None => Err(Error::UserError(UserError::DocumentEditionDocumentMustBeObject)), - }, - } - }); - - rayon_par_bridge::par_bridge(100, processing, |iterator| { - for result in iterator { - if (self.should_abort)() { - return Err(Error::InternalError(InternalError::AbortedIndexation)); - } - - match result? { - DocumentEdition::Deleted(docid) => { - documents_to_remove.insert(docid); - } - DocumentEdition::Edited(new_document) => { - documents_batch_builder.append_json_object(&new_document)?; - } - DocumentEdition::Nothing => (), - } - } - - Ok(()) - })?; - - let file = documents_batch_builder.into_inner()?; - let reader = DocumentsBatchReader::from_reader(file)?; - - let (this, removed) = self.remove_documents_from_db_no_batch(&documents_to_remove)?; - let (this, result) = this.add_documents(reader)?; - - Ok((this, result.map(|added| (removed, added)))) - } - pub fn with_embedders(mut self, embedders: EmbeddingConfigs) -> Self { self.embedders = embedders; self } - /// Remove a batch of documents from the current builder. - /// - /// Returns the number of documents deleted from the builder. - #[tracing::instrument(level = "trace", skip_all, target = "indexing::documents")] - pub fn remove_documents( - mut self, - to_delete: Vec, - ) -> Result<(Self, StdResult)> { - // Early return when there is no document to add - if to_delete.is_empty() { - // Maintains Invariant: remove documents actually always returns Ok for the inner result - return Ok((self, Ok(0))); - } - - let deleted_documents = self - .transform - .as_mut() - .expect("Invalid document deletion state") - .remove_documents(to_delete, self.wtxn, &self.should_abort)? - as u64; - - self.deleted_documents += deleted_documents; - - // Maintains Invariant: remove documents actually always returns Ok for the inner result - Ok((self, Ok(deleted_documents))) - } - - /// Removes documents from db using their internal document ids. - /// - /// # Warning - /// - /// This function is dangerous and will only work correctly if: - /// - /// - All the passed ids currently exist in the database - /// - No batching using the standards `remove_documents` and `add_documents` took place - /// - /// TODO: make it impossible to call `remove_documents` or `add_documents` on an instance that calls this function. - #[tracing::instrument(level = "trace", skip_all, target = "indexing::details")] - pub fn remove_documents_from_db_no_batch( - mut self, - to_delete: &RoaringBitmap, - ) -> Result<(Self, u64)> { - // Early return when there is no document to add - if to_delete.is_empty() { - return Ok((self, 0)); - } - - let deleted_documents = self - .transform - .as_mut() - .expect("Invalid document deletion state") - .remove_documents_from_db_no_batch(to_delete, self.wtxn, &self.should_abort)? 
- as u64; - - self.deleted_documents += deleted_documents; - - Ok((self, deleted_documents)) - } - - #[tracing::instrument( - level = "trace" - skip_all, - target = "indexing::documents", - name = "index_documents" - )] - pub fn execute(mut self) -> Result { - if self.added_documents == 0 && self.deleted_documents == 0 { - let number_of_documents = self.index.number_of_documents(self.wtxn)?; - return Ok(DocumentAdditionResult { indexed_documents: 0, number_of_documents }); - } - let output = self - .transform - .take() - .expect("Invalid document addition state") - .output_from_sorter(self.wtxn, &self.progress)?; - - let indexed_documents = output.documents_count as u64; - let number_of_documents = self.execute_raw(output)?; - - Ok(DocumentAdditionResult { indexed_documents, number_of_documents }) - } - - /// Returns the total number of documents in the index after the update. - #[tracing::instrument( - level = "trace", - skip_all, - target = "indexing::details", - name = "index_documents_raw" - )] - pub fn execute_raw(self, output: TransformOutput) -> Result - where - FP: Fn(UpdateIndexingStep) + Sync, - FA: Fn() -> bool + Sync, - { - let TransformOutput { - primary_key, - mut settings_diff, - field_distribution, - documents_count, - original_documents, - flattened_documents, - } = output; - - // update the internal facet and searchable list, - // because they might have changed due to the nested documents flattening. - settings_diff.new.recompute_facets(self.wtxn, self.index)?; - settings_diff.new.recompute_searchables(self.wtxn, self.index)?; - - let settings_diff = Arc::new(settings_diff); - let embedders_configs = Arc::new(self.index.embedding_configs(self.wtxn)?); - - let possible_embedding_mistakes = - crate::vector::error::PossibleEmbeddingMistakes::new(&field_distribution); - - let backup_pool; - let pool = match self.indexer_config.thread_pool { - Some(ref pool) => pool, - None => { - // We initialize a backup pool with the default - // settings if none have already been set. - #[allow(unused_mut)] - let mut pool_builder = ThreadPoolNoAbortBuilder::new(); - - #[cfg(test)] - { - pool_builder = pool_builder.num_threads(1); - } - - backup_pool = pool_builder.build()?; - &backup_pool - } - }; - - // create LMDB writer channel - let (lmdb_writer_sx, lmdb_writer_rx): ( - Sender>, - Receiver>, - ) = crossbeam_channel::unbounded(); - - // get the primary key field id - let primary_key_id = settings_diff.new.fields_ids_map.id(&primary_key).unwrap(); - - let pool_params = GrenadParameters { - chunk_compression_type: self.indexer_config.chunk_compression_type, - chunk_compression_level: self.indexer_config.chunk_compression_level, - max_memory: self.indexer_config.max_memory, - max_nb_chunks: self.indexer_config.max_nb_chunks, // default value, may be chosen. - }; - let documents_chunk_size = match self.indexer_config.documents_chunk_size { - Some(chunk_size) => chunk_size, - None => { - let default_chunk_size = 1024 * 1024 * 4; // 4MiB - let min_chunk_size = 1024 * 512; // 512KiB - - // compute the chunk size from the number of available threads and the inputed data size. 
- let total_size = match flattened_documents.as_ref() { - Some(flattened_documents) => flattened_documents.metadata().map(|m| m.len()), - None => Ok(default_chunk_size as u64), - }; - let current_num_threads = pool.current_num_threads(); - // if we have more than 2 thread, create a number of chunk equal to 3/4 threads count - let chunk_count = if current_num_threads > 2 { - (current_num_threads * 3 / 4).max(2) - } else { - current_num_threads - }; - total_size - .map_or(default_chunk_size, |size| (size as usize) / chunk_count) - .max(min_chunk_size) - } - }; - - let original_documents = match original_documents { - Some(original_documents) => Some(grenad::Reader::new(original_documents)?), - None => None, - }; - let flattened_documents = match flattened_documents { - Some(flattened_documents) => Some(grenad::Reader::new(flattened_documents)?), - None => None, - }; - - let max_positions_per_attributes = self.indexer_config.max_positions_per_attributes; - - let mut final_documents_ids = RoaringBitmap::new(); - let mut databases_seen = 0; - let mut word_position_docids = None; - let mut word_fid_docids = None; - let mut word_docids = None; - let mut exact_word_docids = None; - let mut chunk_accumulator = ChunkAccumulator::default(); - let mut dimension = HashMap::new(); - - let current_span = tracing::Span::current(); - - // Run extraction pipeline in parallel. - pool.install(|| { - let settings_diff_cloned = settings_diff.clone(); - rayon::spawn(move || { - let child_span = tracing::trace_span!(target: "indexing::details", parent: ¤t_span, "extract_and_send_grenad_chunks"); - let _enter = child_span.enter(); - - // split obkv file into several chunks - let original_chunk_iter = match original_documents { - Some(original_documents) => { - grenad_obkv_into_chunks(original_documents,pool_params,documents_chunk_size).map(either::Left) - }, - None => Ok(either::Right(iter::empty())), - }; - - // split obkv file into several chunks - let flattened_chunk_iter = match flattened_documents { - Some(flattened_documents) => { - grenad_obkv_into_chunks(flattened_documents, pool_params, documents_chunk_size).map(either::Left) - }, - None => Ok(either::Right(iter::empty())), - }; - - let result = original_chunk_iter.and_then(|original_chunk| { - let flattened_chunk = flattened_chunk_iter?; - // extract all databases from the chunked obkv douments - extract::data_from_obkv_documents( - original_chunk, - flattened_chunk, - pool_params, - lmdb_writer_sx.clone(), - primary_key_id, - embedders_configs.clone(), - settings_diff_cloned, - max_positions_per_attributes, - Arc::new(possible_embedding_mistakes) - ) - }); - - if let Err(e) = result { - let _ = lmdb_writer_sx.send(Err(e)); - } - - // needs to be dropped to avoid channel waiting lock. 
- drop(lmdb_writer_sx); - }); - - (self.progress)(UpdateIndexingStep::MergeDataIntoFinalDatabase { - databases_seen, - total_databases: TOTAL_POSTING_DATABASE_COUNT, - }); - - loop { - if (self.should_abort)() { - return Err(Error::InternalError(InternalError::AbortedIndexation)); - } - - match lmdb_writer_rx.clone().recv_timeout(std::time::Duration::from_millis(500)) { - Err(status) => { - if let Some(typed_chunks) = chunk_accumulator.pop_longest() { - let (docids, is_merged_database) = - write_typed_chunk_into_index(self.wtxn, self.index, &settings_diff, typed_chunks)?; - if !docids.is_empty() { - final_documents_ids |= docids; - let documents_seen_count = final_documents_ids.len(); - (self.progress)(UpdateIndexingStep::IndexDocuments { - documents_seen: documents_seen_count as usize, - total_documents: documents_count, - }); - debug!(documents = documents_seen_count, total = documents_count, "Seen"); - } - if is_merged_database { - databases_seen += 1; - (self.progress)(UpdateIndexingStep::MergeDataIntoFinalDatabase { - databases_seen, - total_databases: TOTAL_POSTING_DATABASE_COUNT, - }); - } - // If no more chunk remains in the chunk accumulator and the channel is disconected, break. - } else if status == crossbeam_channel::RecvTimeoutError::Disconnected { - break; - } else { - rayon::yield_now(); - } - } - Ok(result) => { - let typed_chunk = match result? { - TypedChunk::WordDocids { - word_docids_reader, - exact_word_docids_reader, - word_fid_docids_reader, - } => { - let cloneable_chunk = - unsafe { as_cloneable_grenad(&word_docids_reader)? }; - let word_docids = word_docids.get_or_insert_with(|| { - MergerBuilder::new(MergeDeladdCboRoaringBitmaps) - }); - word_docids.push(cloneable_chunk.into_cursor()?); - let cloneable_chunk = - unsafe { as_cloneable_grenad(&exact_word_docids_reader)? }; - let exact_word_docids = - exact_word_docids.get_or_insert_with(|| { - MergerBuilder::new( - MergeDeladdCboRoaringBitmaps, - ) - }); - exact_word_docids.push(cloneable_chunk.into_cursor()?); - let cloneable_chunk = - unsafe { as_cloneable_grenad(&word_fid_docids_reader)? }; - let word_fid_docids = word_fid_docids.get_or_insert_with(|| { - MergerBuilder::new(MergeDeladdCboRoaringBitmaps) - }); - word_fid_docids.push(cloneable_chunk.into_cursor()?); - TypedChunk::WordDocids { - word_docids_reader, - exact_word_docids_reader, - word_fid_docids_reader, - } - } - TypedChunk::WordPositionDocids(chunk) => { - let cloneable_chunk = unsafe { as_cloneable_grenad(&chunk)? 
}; - let word_position_docids = - word_position_docids.get_or_insert_with(|| { - MergerBuilder::new( - MergeDeladdCboRoaringBitmaps, - ) - }); - word_position_docids.push(cloneable_chunk.into_cursor()?); - TypedChunk::WordPositionDocids(chunk) - } - TypedChunk::VectorPoints { - expected_dimension, - remove_vectors, - embeddings, - manual_vectors, - embedder_name, - add_to_user_provided, - remove_from_user_provided, - } => { - dimension.insert(embedder_name.clone(), expected_dimension); - TypedChunk::VectorPoints { - remove_vectors, - embeddings, - expected_dimension, - manual_vectors, - embedder_name, - add_to_user_provided, - remove_from_user_provided, - } - } - otherwise => otherwise, - }; - - chunk_accumulator.insert(typed_chunk); - } - } - } - - Ok(()) - }).map_err(InternalError::from)??; - - // We write the field distribution into the main database - self.index.put_field_distribution(self.wtxn, &field_distribution)?; - - // We write the primary key field id into the main database - self.index.put_primary_key(self.wtxn, &primary_key)?; - let number_of_documents = self.index.number_of_documents(self.wtxn)?; - let mut rng = rand::rngs::StdRng::seed_from_u64(42); - - // If an embedder wasn't used in the typedchunk but must be binary quantized - // we should insert it in `dimension` - for (name, action) in settings_diff.embedding_config_updates.iter() { - if action.is_being_quantized && !dimension.contains_key(name.as_str()) { - let index = self.index.embedder_category_id.get(self.wtxn, name)?.ok_or( - InternalError::DatabaseMissingEntry { - db_name: "embedder_category_id", - key: None, - }, - )?; - let reader = - ArroyWrapper::new(self.index.vector_arroy, index, action.was_quantized); - let dim = reader.dimensions(self.wtxn)?; - dimension.insert(name.to_string(), dim); - } - } - - for (embedder_name, dimension) in dimension { - let wtxn = &mut *self.wtxn; - let vector_arroy = self.index.vector_arroy; - let cancel = &self.should_abort; - - let embedder_index = self.index.embedder_category_id.get(wtxn, &embedder_name)?.ok_or( - InternalError::DatabaseMissingEntry { db_name: "embedder_category_id", key: None }, - )?; - let embedder_config = settings_diff.embedding_config_updates.get(&embedder_name); - let was_quantized = settings_diff - .old - .embedding_configs - .get(&embedder_name) - .map_or(false, |conf| conf.2); - let is_quantizing = embedder_config.map_or(false, |action| action.is_being_quantized); - - pool.install(|| { - let mut writer = ArroyWrapper::new(vector_arroy, embedder_index, was_quantized); - writer.build_and_quantize(wtxn, &mut rng, dimension, is_quantizing, cancel)?; - Result::Ok(()) - }) - .map_err(InternalError::from)??; - } - - self.execute_prefix_databases( - word_docids.map(MergerBuilder::build), - exact_word_docids.map(MergerBuilder::build), - word_position_docids.map(MergerBuilder::build), - word_fid_docids.map(MergerBuilder::build), - )?; - - Ok(number_of_documents) - } - #[tracing::instrument( level = "trace", skip_all, diff --git a/crates/milli/src/update/index_documents/transform.rs b/crates/milli/src/update/index_documents/transform.rs index 7239e8bff..7f156f348 100644 --- a/crates/milli/src/update/index_documents/transform.rs +++ b/crates/milli/src/update/index_documents/transform.rs @@ -1,33 +1,29 @@ use std::borrow::Cow; -use std::collections::btree_map::Entry as BEntry; -use std::collections::hash_map::Entry as HEntry; use std::collections::{BTreeMap, HashMap, HashSet}; use std::fs::File; -use std::io::{Read, Seek}; use either::Either; use 
fxhash::FxHashMap; use itertools::Itertools; -use obkv::{KvReader, KvReaderU16, KvWriter}; +use obkv::{KvReader, KvWriter}; use roaring::RoaringBitmap; use serde_json::Value; use smartstring::SmartString; use super::helpers::{ - create_sorter, create_writer, sorter_into_reader, EitherObkvMerge, - ObkvsKeepLastAdditionMergeDeletions, ObkvsMergeAdditionsAndDeletions, + create_sorter, sorter_into_reader, EitherObkvMerge, ObkvsKeepLastAdditionMergeDeletions, + ObkvsMergeAdditionsAndDeletions, }; use super::{IndexDocumentsMethod, IndexerConfig, KeepFirst}; -use crate::documents::{DocumentsBatchIndex, EnrichedDocument, EnrichedDocumentsBatchReader}; +use crate::documents::DocumentsBatchIndex; use crate::error::{Error, InternalError, UserError}; use crate::index::{db_name, main_key}; use crate::update::del_add::{ - into_del_add_obkv, into_del_add_obkv_conditional_operation, DelAdd, DelAddOperation, - KvReaderDelAdd, + into_del_add_obkv, into_del_add_obkv_conditional_operation, DelAddOperation, }; use crate::update::index_documents::GrenadParameters; -use crate::update::settings::{InnerIndexSettings, InnerIndexSettingsDiff}; -use crate::update::{AvailableIds, UpdateIndexingStep}; +use crate::update::settings::InnerIndexSettingsDiff; +use crate::update::AvailableIds; use crate::vector::parsed_vectors::{ExplicitVectors, VectorOrArrayOfVectors}; use crate::vector::settings::WriteBackToDocuments; use crate::vector::ArroyWrapper; @@ -157,399 +153,6 @@ impl<'a, 'i> Transform<'a, 'i> { }) } - #[tracing::instrument(level = "trace", skip_all, target = "indexing::documents")] - pub fn read_documents( - &mut self, - reader: EnrichedDocumentsBatchReader, - wtxn: &mut heed::RwTxn<'_>, - progress_callback: FP, - should_abort: FA, - ) -> Result - where - R: Read + Seek, - FP: Fn(UpdateIndexingStep) + Sync, - FA: Fn() -> bool + Sync, - { - let (mut cursor, fields_index) = reader.into_cursor_and_fields_index(); - let external_documents_ids = self.index.external_documents_ids(); - let mapping = create_fields_mapping(&mut self.fields_ids_map, &fields_index)?; - - let primary_key = cursor.primary_key().to_string(); - let primary_key_id = - self.fields_ids_map.insert(&primary_key).ok_or(UserError::AttributeLimitReached)?; - - let mut obkv_buffer = Vec::new(); - let mut document_sorter_value_buffer = Vec::new(); - let mut document_sorter_key_buffer = Vec::new(); - let mut documents_count = 0; - let mut docid_buffer: Vec = Vec::new(); - let mut field_buffer: Vec<(u16, Cow<'_, [u8]>)> = Vec::new(); - while let Some(enriched_document) = cursor.next_enriched_document()? { - let EnrichedDocument { document, document_id } = enriched_document; - - if should_abort() { - return Err(Error::InternalError(InternalError::AbortedIndexation)); - } - - // drop_and_reuse is called instead of .clear() to communicate to the compiler that field_buffer - // does not keep references from the cursor between loop iterations - let mut field_buffer_cache = drop_and_reuse(field_buffer); - if self.indexer_settings.log_every_n.map_or(false, |len| documents_count % len == 0) { - progress_callback(UpdateIndexingStep::RemapDocumentAddition { - documents_seen: documents_count, - }); - } - - // When the document id has been auto-generated by the `enrich_documents_batch` - // we must insert this document id into the remaped document. 
- let external_id = document_id.value(); - if document_id.is_generated() { - serde_json::to_writer(&mut docid_buffer, external_id) - .map_err(InternalError::SerdeJson)?; - field_buffer_cache.push((primary_key_id, Cow::from(&docid_buffer))); - } - - for (k, v) in document.iter() { - let mapped_id = - *mapping.get(&k).ok_or(InternalError::FieldIdMappingMissingEntry { key: k })?; - field_buffer_cache.push((mapped_id, Cow::from(v))); - } - - // Insertion in a obkv need to be done with keys ordered. For now they are ordered - // according to the document addition key order, so we sort it according to the - // fieldids map keys order. - field_buffer_cache.sort_unstable_by(|(f1, _), (f2, _)| f1.cmp(f2)); - - // Build the new obkv document. - let mut writer = KvWriter::new(&mut obkv_buffer); - for (k, v) in field_buffer_cache.iter() { - writer.insert(*k, v)?; - } - - let mut original_docid = None; - let docid = match self.new_external_documents_ids_builder.entry((*external_id).into()) { - HEntry::Occupied(entry) => *entry.get() as u32, - HEntry::Vacant(entry) => { - let docid = match external_documents_ids.get(wtxn, entry.key())? { - Some(docid) => { - // If it was already in the list of replaced documents it means it was deleted - // by the remove_document method. We should starts as if it never existed. - if self.replaced_documents_ids.insert(docid) { - original_docid = Some(docid); - } - - docid - } - None => self - .available_documents_ids - .next() - .ok_or(UserError::DocumentLimitReached)?, - }; - entry.insert(docid as u64); - docid - } - }; - - let mut skip_insertion = false; - if let Some(original_docid) = original_docid { - let original_key = original_docid; - let base_obkv = self - .index - .documents - .remap_data_type::() - .get(wtxn, &original_key)? - .ok_or(InternalError::DatabaseMissingEntry { - db_name: db_name::DOCUMENTS, - key: None, - })?; - - // we check if the two documents are exactly equal. If it's the case we can skip this document entirely - if base_obkv == obkv_buffer { - // we're not replacing anything - self.replaced_documents_ids.remove(original_docid); - // and we need to put back the original id as it was before - self.new_external_documents_ids_builder.remove(external_id); - skip_insertion = true; - } else { - // we associate the base document with the new key, everything will get merged later. - let deladd_operation = match self.index_documents_method { - IndexDocumentsMethod::UpdateDocuments => { - DelAddOperation::DeletionAndAddition - } - IndexDocumentsMethod::ReplaceDocuments => DelAddOperation::Deletion, - }; - document_sorter_key_buffer.clear(); - document_sorter_key_buffer.extend_from_slice(&docid.to_be_bytes()); - document_sorter_key_buffer.extend_from_slice(external_id.as_bytes()); - document_sorter_value_buffer.clear(); - document_sorter_value_buffer.push(Operation::Addition as u8); - into_del_add_obkv( - KvReaderU16::from_slice(base_obkv), - deladd_operation, - &mut document_sorter_value_buffer, - )?; - self.original_sorter - .insert(&document_sorter_key_buffer, &document_sorter_value_buffer)?; - let base_obkv = KvReader::from_slice(base_obkv); - if let Some(flattened_obkv) = - Self::flatten_from_fields_ids_map(base_obkv, &mut self.fields_ids_map)? 
- { - // we recreate our buffer with the flattened documents - document_sorter_value_buffer.clear(); - document_sorter_value_buffer.push(Operation::Addition as u8); - into_del_add_obkv( - KvReaderU16::from_slice(&flattened_obkv), - deladd_operation, - &mut document_sorter_value_buffer, - )?; - } - self.flattened_sorter - .insert(docid.to_be_bytes(), &document_sorter_value_buffer)?; - } - } - - if !skip_insertion { - self.new_documents_ids.insert(docid); - - document_sorter_key_buffer.clear(); - document_sorter_key_buffer.extend_from_slice(&docid.to_be_bytes()); - document_sorter_key_buffer.extend_from_slice(external_id.as_bytes()); - document_sorter_value_buffer.clear(); - document_sorter_value_buffer.push(Operation::Addition as u8); - into_del_add_obkv( - KvReaderU16::from_slice(&obkv_buffer), - DelAddOperation::Addition, - &mut document_sorter_value_buffer, - )?; - // We use the extracted/generated user id as the key for this document. - self.original_sorter - .insert(&document_sorter_key_buffer, &document_sorter_value_buffer)?; - - let flattened_obkv = KvReader::from_slice(&obkv_buffer); - if let Some(obkv) = - Self::flatten_from_fields_ids_map(flattened_obkv, &mut self.fields_ids_map)? - { - document_sorter_value_buffer.clear(); - document_sorter_value_buffer.push(Operation::Addition as u8); - into_del_add_obkv( - KvReaderU16::from_slice(&obkv), - DelAddOperation::Addition, - &mut document_sorter_value_buffer, - )? - } - self.flattened_sorter.insert(docid.to_be_bytes(), &document_sorter_value_buffer)?; - } - documents_count += 1; - - progress_callback(UpdateIndexingStep::RemapDocumentAddition { - documents_seen: documents_count, - }); - - field_buffer = drop_and_reuse(field_buffer_cache); - docid_buffer.clear(); - obkv_buffer.clear(); - } - - progress_callback(UpdateIndexingStep::RemapDocumentAddition { - documents_seen: documents_count, - }); - - self.index.put_fields_ids_map(wtxn, &self.fields_ids_map)?; - self.index.put_primary_key(wtxn, &primary_key)?; - self.documents_count += documents_count; - // Now that we have a valid sorter that contains the user id and the obkv we - // give it to the last transforming function which returns the TransformOutput. - Ok(documents_count) - } - - /// The counter part of `read_documents` that removes documents either from the transform or the database. - /// It can be called before, after or in between two calls of the `read_documents`. - /// - /// It needs to update all the internal datastructure in the transform. - /// - If the document is coming from the database -> it's marked as a to_delete document - /// - If the document to remove was inserted by the `read_documents` method before AND was present in the db, - /// it's marked as `to_delete` + added into the grenad to ensure we don't reinsert it. - /// - If the document to remove was inserted by the `read_documents` method before but was NOT present in the db, - /// it's added into the grenad to ensure we don't insert it + removed from the list of new documents ids. - /// - If the document to remove was not present in either the db or the transform we do nothing. - #[tracing::instrument(level = "trace", skip_all, target = "indexing::documents")] - pub fn remove_documents( - &mut self, - mut to_remove: Vec, - wtxn: &mut heed::RwTxn<'_>, - should_abort: FA, - ) -> Result - where - FA: Fn() -> bool + Sync, - { - // there may be duplicates in the documents to remove. 
- to_remove.sort_unstable(); - to_remove.dedup(); - - let external_documents_ids = self.index.external_documents_ids(); - - let mut documents_deleted = 0; - let mut document_sorter_value_buffer = Vec::new(); - let mut document_sorter_key_buffer = Vec::new(); - for to_remove in to_remove { - if should_abort() { - return Err(Error::InternalError(InternalError::AbortedIndexation)); - } - - // Check if the document has been added in the current indexing process. - let deleted_from_current = - match self.new_external_documents_ids_builder.entry((*to_remove).into()) { - // if the document was added in a previous iteration of the transform we make it as deleted in the sorters. - HEntry::Occupied(entry) => { - let docid = *entry.get() as u32; - // Key is the concatenation of the internal docid and the external one. - document_sorter_key_buffer.clear(); - document_sorter_key_buffer.extend_from_slice(&docid.to_be_bytes()); - document_sorter_key_buffer.extend_from_slice(to_remove.as_bytes()); - document_sorter_value_buffer.clear(); - document_sorter_value_buffer.push(Operation::Deletion as u8); - obkv::KvWriterU16::new(&mut document_sorter_value_buffer).finish().unwrap(); - self.original_sorter - .insert(&document_sorter_key_buffer, &document_sorter_value_buffer)?; - self.flattened_sorter - .insert(docid.to_be_bytes(), &document_sorter_value_buffer)?; - - // we must NOT update the list of replaced_documents_ids - // Either: - // 1. It's already in it and there is nothing to do - // 2. It wasn't in it because the document was created by a previous batch and since - // we're removing it there is nothing to do. - self.new_documents_ids.remove(docid); - entry.remove_entry(); - true - } - HEntry::Vacant(_) => false, - }; - - // If the document was already in the db we mark it as a `to_delete` document. - // Then we push the document in sorters in deletion mode. - let deleted_from_db = match external_documents_ids.get(wtxn, &to_remove)? { - Some(docid) => { - self.remove_document_from_db( - docid, - to_remove, - wtxn, - &mut document_sorter_key_buffer, - &mut document_sorter_value_buffer, - )?; - true - } - None => false, - }; - - // increase counter only if the document existed somewhere before. - if deleted_from_current || deleted_from_db { - documents_deleted += 1; - } - } - - Ok(documents_deleted) - } - - /// Removes documents from db using their internal document ids. - /// - /// # Warning - /// - /// This function is dangerous and will only work correctly if: - /// - /// - All the passed ids currently exist in the database - /// - No batching using the standards `remove_documents` and `add_documents` took place - /// - /// TODO: make it impossible to call `remove_documents` or `add_documents` on an instance that calls this function. 
- #[tracing::instrument(level = "trace", skip_all, target = "indexing::details")] - pub fn remove_documents_from_db_no_batch( - &mut self, - to_remove: &RoaringBitmap, - wtxn: &mut heed::RwTxn<'_>, - should_abort: FA, - ) -> Result - where - FA: Fn() -> bool + Sync, - { - let mut documents_deleted = 0; - let mut document_sorter_value_buffer = Vec::new(); - let mut document_sorter_key_buffer = Vec::new(); - let external_ids = self.index.external_id_of(wtxn, to_remove.iter())?; - - for (internal_docid, external_docid) in to_remove.iter().zip(external_ids) { - let external_docid = external_docid?; - if should_abort() { - return Err(Error::InternalError(InternalError::AbortedIndexation)); - } - self.remove_document_from_db( - internal_docid, - external_docid, - wtxn, - &mut document_sorter_key_buffer, - &mut document_sorter_value_buffer, - )?; - - documents_deleted += 1; - } - - Ok(documents_deleted) - } - - fn remove_document_from_db( - &mut self, - internal_docid: u32, - external_docid: String, - txn: &heed::RoTxn<'_>, - document_sorter_key_buffer: &mut Vec, - document_sorter_value_buffer: &mut Vec, - ) -> Result<()> { - self.replaced_documents_ids.insert(internal_docid); - - // fetch the obkv document - let original_key = internal_docid; - let base_obkv = self - .index - .documents - .remap_data_type::() - .get(txn, &original_key)? - .ok_or(InternalError::DatabaseMissingEntry { - db_name: db_name::DOCUMENTS, - key: None, - })?; - - // Key is the concatenation of the internal docid and the external one. - document_sorter_key_buffer.clear(); - document_sorter_key_buffer.extend_from_slice(&internal_docid.to_be_bytes()); - document_sorter_key_buffer.extend_from_slice(external_docid.as_bytes()); - // push it as to delete in the original_sorter - document_sorter_value_buffer.clear(); - document_sorter_value_buffer.push(Operation::Deletion as u8); - into_del_add_obkv( - KvReaderU16::from_slice(base_obkv), - DelAddOperation::Deletion, - document_sorter_value_buffer, - )?; - self.original_sorter.insert(&document_sorter_key_buffer, &document_sorter_value_buffer)?; - - // flatten it and push it as to delete in the flattened_sorter - let flattened_obkv = KvReader::from_slice(base_obkv); - if let Some(obkv) = - Self::flatten_from_fields_ids_map(flattened_obkv, &mut self.fields_ids_map)? - { - // we recreate our buffer with the flattened documents - document_sorter_value_buffer.clear(); - document_sorter_value_buffer.push(Operation::Deletion as u8); - into_del_add_obkv( - KvReaderU16::from_slice(&obkv), - DelAddOperation::Deletion, - document_sorter_value_buffer, - )?; - } - self.flattened_sorter - .insert(internal_docid.to_be_bytes(), &document_sorter_value_buffer)?; - Ok(()) - } - // Flatten a document from the fields ids map contained in self and insert the new // created fields. Returns `None` if the document doesn't need to be flattened. #[tracing::instrument( @@ -677,167 +280,6 @@ impl<'a, 'i> Transform<'a, 'i> { Ok(()) } - /// Generate the `TransformOutput` based on the given sorter that can be generated from any - /// format like CSV, JSON or JSON stream. This sorter must contain a key that is the document - /// id for the user side and the value must be an obkv where keys are valid fields ids. - #[tracing::instrument(level = "trace", skip_all, target = "indexing::transform")] - pub(crate) fn output_from_sorter( - self, - wtxn: &mut heed::RwTxn<'_>, - progress_callback: F, - ) -> Result - where - F: Fn(UpdateIndexingStep) + Sync, - { - let primary_key = self - .index - .primary_key(wtxn)? 
- .ok_or(Error::InternalError(InternalError::DatabaseMissingEntry { - db_name: db_name::MAIN, - key: Some(main_key::PRIMARY_KEY_KEY), - }))? - .to_string(); - - // We create a final writer to write the new documents in order from the sorter. - let mut writer = create_writer( - self.indexer_settings.chunk_compression_type, - self.indexer_settings.chunk_compression_level, - tempfile::tempfile()?, - ); - - // To compute the field distribution we need to; - // 1. Remove all the deleted documents from the field distribution - // 2. Add all the new documents to the field distribution - let mut field_distribution = self.index.field_distribution(wtxn)?; - - // Here we are going to do the document count + field distribution + `write_into_stream_writer` - let mut iter = self.original_sorter.into_stream_merger_iter()?; - // used only for the callback - let mut documents_count = 0; - - while let Some((key, val)) = iter.next()? { - // skip first byte corresponding to the operation type (Deletion or Addition). - let val = &val[1..]; - - // send a callback to show at which step we are - documents_count += 1; - progress_callback(UpdateIndexingStep::ComputeIdsAndMergeDocuments { - documents_seen: documents_count, - total_documents: self.documents_count, - }); - - for (key, value) in KvReader::from_slice(val) { - let reader = KvReaderDelAdd::from_slice(value); - match (reader.get(DelAdd::Deletion), reader.get(DelAdd::Addition)) { - (None, None) => (), - (None, Some(_)) => { - // New field - let name = self.fields_ids_map.name(key).ok_or( - FieldIdMapMissingEntry::FieldId { - field_id: key, - process: "Computing field distribution in transform.", - }, - )?; - *field_distribution.entry(name.to_string()).or_insert(0) += 1; - } - (Some(_), None) => { - // Field removed - let name = self.fields_ids_map.name(key).ok_or( - FieldIdMapMissingEntry::FieldId { - field_id: key, - process: "Computing field distribution in transform.", - }, - )?; - match field_distribution.entry(name.to_string()) { - BEntry::Vacant(_) => { /* Bug? trying to remove a non-existing field */ - } - BEntry::Occupied(mut entry) => { - // attempt to remove one - match entry.get_mut().checked_sub(1) { - Some(0) => { - entry.remove(); - } - Some(new_val) => { - *entry.get_mut() = new_val; - } - None => { - unreachable!("Attempting to remove a field that wasn't in the field distribution") - } - } - } - } - } - (Some(_), Some(_)) => { - // Value change, no field distribution change - } - } - } - writer.insert(key, val)?; - } - - let mut original_documents = writer.into_inner()?; - // We then extract the file and reset the seek to be able to read it again. - original_documents.rewind()?; - - // We create a final writer to write the new documents in order from the sorter. - let mut writer = create_writer( - self.indexer_settings.chunk_compression_type, - self.indexer_settings.chunk_compression_level, - tempfile::tempfile()?, - ); - - // Once we have written all the documents into the final sorter, we write the nested documents - // into this writer. - // We get rids of the `Operation` byte and skip the deleted documents as well. - let mut iter = self.flattened_sorter.into_stream_merger_iter()?; - while let Some((key, val)) = iter.next()? { - // skip first byte corresponding to the operation type (Deletion or Addition). 
- let val = &val[1..]; - writer.insert(key, val)?; - } - let mut flattened_documents = writer.into_inner()?; - flattened_documents.rewind()?; - - let mut new_external_documents_ids_builder: Vec<_> = - self.new_external_documents_ids_builder.into_iter().collect(); - - new_external_documents_ids_builder - .sort_unstable_by(|(left, _), (right, _)| left.cmp(right)); - let mut fst_new_external_documents_ids_builder = fst::MapBuilder::memory(); - new_external_documents_ids_builder.into_iter().try_for_each(|(key, value)| { - fst_new_external_documents_ids_builder.insert(key, value) - })?; - - let old_inner_settings = InnerIndexSettings::from_index(self.index, wtxn)?; - let fields_ids_map = self.fields_ids_map; - let primary_key_id = self.index.primary_key(wtxn)?.and_then(|name| fields_ids_map.id(name)); - let mut new_inner_settings = old_inner_settings.clone(); - new_inner_settings.fields_ids_map = fields_ids_map; - - let embedding_config_updates = Default::default(); - let settings_update_only = false; - let settings_diff = InnerIndexSettingsDiff::new( - old_inner_settings, - new_inner_settings, - primary_key_id, - embedding_config_updates, - settings_update_only, - ); - - Ok(TransformOutput { - primary_key, - settings_diff, - field_distribution, - documents_count: self.documents_count, - original_documents: Some( - original_documents.into_inner().map_err(|err| err.into_error())?, - ), - flattened_documents: Some( - flattened_documents.into_inner().map_err(|err| err.into_error())?, - ), - }) - } - /// Rebind the field_ids of the provided document to their values /// based on the field_ids_maps difference between the old and the new settings, /// then fill the provided buffers with delta documents using KvWritterDelAdd. @@ -1145,6 +587,7 @@ fn drop_and_reuse(mut vec: Vec) -> Vec { #[cfg(test)] mod test { use grenad::MergeFunction; + use obkv::KvReaderU16; use super::*;
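
Note: the removed `output_from_sorter` rebuilds the external-document-ids FST by first sorting the `(external id, internal docid)` pairs collected in `new_external_documents_ids_builder`, because `fst::MapBuilder` only accepts keys in lexicographic order. A minimal standalone sketch of that pattern, assuming the `fst` crate and an illustrative `HashMap<String, u64>` in place of milli's internal builder type:

    use std::collections::HashMap;

    use fst::{Map, MapBuilder};

    fn build_external_ids_fst(new_ids: HashMap<String, u64>) -> Result<Map<Vec<u8>>, fst::Error> {
        // `MapBuilder` requires keys in lexicographic order, hence the sort
        // before insertion, mirroring what the removed method does with
        // `new_external_documents_ids_builder`.
        let mut pairs: Vec<(String, u64)> = new_ids.into_iter().collect();
        pairs.sort_unstable_by(|(left, _), (right, _)| left.cmp(right));

        let mut builder = MapBuilder::memory();
        for (external_id, internal_docid) in pairs {
            builder.insert(external_id, internal_docid)?;
        }
        Ok(builder.into_map())
    }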