diff --git a/milli/src/update/index_documents/transform.rs b/milli/src/update/index_documents/transform.rs index c34b7876a..f58ffebf0 100644 --- a/milli/src/update/index_documents/transform.rs +++ b/milli/src/update/index_documents/transform.rs @@ -1,7 +1,7 @@ use std::borrow::Cow; use std::collections::btree_map::Entry as BEntry; use std::collections::hash_map::Entry as HEntry; -use std::collections::{HashMap, HashSet}; +use std::collections::{BTreeMap, HashMap, HashSet}; use std::fs::File; use std::io::{Read, Seek}; @@ -27,6 +27,7 @@ use crate::update::del_add::{ use crate::update::index_documents::GrenadParameters; use crate::update::settings::{InnerIndexSettings, InnerIndexSettingsDiff}; use crate::update::{AvailableDocumentsIds, UpdateIndexingStep}; +use crate::vector::settings::{EmbedderAction, WriteBackToDocuments}; use crate::{ is_faceted_by, FieldDistribution, FieldId, FieldIdMapMissingEntry, FieldsIdsMap, Index, Result, }; @@ -808,13 +809,13 @@ impl<'a, 'i> Transform<'a, 'i> { let mut new_inner_settings = old_inner_settings.clone(); new_inner_settings.fields_ids_map = fields_ids_map; - let embedding_configs_updated = false; + let embedding_config_updates = Default::default(); let settings_update_only = false; let settings_diff = InnerIndexSettingsDiff::new( old_inner_settings, new_inner_settings, primary_key_id, - embedding_configs_updated, + embedding_config_updates, settings_update_only, ); @@ -835,10 +836,13 @@ impl<'a, 'i> Transform<'a, 'i> { /// Rebind the field_ids of the provided document to their values /// based on the field_ids_maps difference between the old and the new settings, /// then fill the provided buffers with delta documents using KvWritterDelAdd. + #[allow(clippy::too_many_arguments)] // need the vectors + fid, feel free to create a struct xo xo fn rebind_existing_document( old_obkv: KvReader, settings_diff: &InnerIndexSettingsDiff, modified_faceted_fields: &HashSet, + mut injected_vectors: serde_json::Map, + old_vectors_fid: Option, original_obkv_buffer: Option<&mut Vec>, flattened_obkv_buffer: Option<&mut Vec>, ) -> Result<()> { @@ -863,7 +867,36 @@ impl<'a, 'i> Transform<'a, 'i> { let mut operations = HashMap::new(); let mut obkv_writer = KvWriter::<_, FieldId>::memory(); - for (id, val) in old_obkv.iter() { + 'write_fid: for (id, val) in old_obkv.iter() { + if !injected_vectors.is_empty() { + 'inject_vectors: { + let Some(vectors_fid) = old_vectors_fid else { break 'inject_vectors }; + + if id != vectors_fid { + break 'inject_vectors; + } + + let existing_vectors: std::result::Result< + serde_json::Map, + serde_json::Error, + > = serde_json::from_slice(val); + + let mut existing_vectors = match existing_vectors { + Ok(existing_vectors) => existing_vectors, + Err(error) => { + tracing::error!(%error, "Unexpected `_vectors` field that is not a map. Treating as an empty map"); + Default::default() + } + }; + + existing_vectors.append(&mut injected_vectors); + + operations.insert(id, DelAddOperation::DeletionAndAddition); + obkv_writer.insert(id, serde_json::to_vec(&existing_vectors).unwrap())?; + continue 'write_fid; + } + } + if is_primary_key(id) || necessary_faceted_field(id) || reindex_vectors { operations.insert(id, DelAddOperation::DeletionAndAddition); obkv_writer.insert(id, val)?; @@ -937,6 +970,35 @@ impl<'a, 'i> Transform<'a, 'i> { None }; + let readers: Result< + BTreeMap<&str, (Vec>, &RoaringBitmap)>, + > = settings_diff + .embedding_config_updates + .iter() + .filter_map(|(name, action)| { + if let EmbedderAction::WriteBackToDocuments(WriteBackToDocuments { + embedder_id, + user_provided, + }) = action + { + let readers: Result> = + self.index.arroy_readers(wtxn, *embedder_id).collect(); + match readers { + Ok(readers) => Some(Ok((name.as_str(), (readers, user_provided)))), + Err(error) => Some(Err(error)), + } + } else { + None + } + }) + .collect(); + let readers = readers?; + + let old_vectors_fid = settings_diff + .old + .fields_ids_map + .id(crate::vector::parsed_vectors::RESERVED_VECTORS_FIELD_NAME); + // We initialize the sorter with the user indexing settings. let mut flattened_sorter = if settings_diff.reindex_searchable() || settings_diff.reindex_facets() { @@ -963,10 +1025,41 @@ impl<'a, 'i> Transform<'a, 'i> { InternalError::DatabaseMissingEntry { db_name: db_name::DOCUMENTS, key: None }, )?; + let injected_vectors: std::result::Result< + serde_json::Map, + arroy::Error, + > = readers + .iter() + .filter_map(|(name, (readers, user_provided))| { + if !user_provided.contains(docid) { + return None; + } + let mut vectors = Vec::new(); + for reader in readers { + let Some(vector) = reader.item_vector(wtxn, docid).transpose() else { + break; + }; + + match vector { + Ok(vector) => vectors.push(vector), + Err(error) => return Some(Err(error)), + } + } + if vectors.is_empty() { + return None; + } + Some(Ok((name.to_string(), serde_json::to_value(vectors).unwrap()))) + }) + .collect(); + + let injected_vectors = injected_vectors?; + Self::rebind_existing_document( old_obkv, &settings_diff, &modified_faceted_fields, + injected_vectors, + old_vectors_fid, Some(&mut original_obkv_buffer).filter(|_| original_sorter.is_some()), Some(&mut flattened_obkv_buffer).filter(|_| flattened_sorter.is_some()), )?; @@ -983,6 +1076,23 @@ impl<'a, 'i> Transform<'a, 'i> { } } + let mut writers = Vec::new(); + + // delete all vectors from the embedders that need removal + for (_, (readers, _)) in readers { + for reader in readers { + let dimensions = reader.dimensions(); + let arroy_index = reader.index(); + drop(reader); + let writer = arroy::Writer::new(self.index.vector_arroy, arroy_index, dimensions); + writers.push(writer); + } + } + + for writer in writers { + writer.clear(wtxn)?; + } + let grenad_params = GrenadParameters { chunk_compression_type: self.indexer_settings.chunk_compression_type, chunk_compression_level: self.indexer_settings.chunk_compression_level,