diff --git a/milli/src/search/similar.rs b/milli/src/search/similar.rs index 0cb8d723d..e408c94b1 100644 --- a/milli/src/search/similar.rs +++ b/milli/src/search/similar.rs @@ -4,7 +4,7 @@ use ordered_float::OrderedFloat; use roaring::RoaringBitmap; use crate::score_details::{self, ScoreDetails}; -use crate::vector::Embedder; +use crate::vector::{ArroyWrapper, Embedder}; use crate::{filtered_universe, DocumentId, Filter, Index, Result, SearchResult}; pub struct Similar<'a> { @@ -71,23 +71,13 @@ impl<'a> Similar<'a> { .get(self.rtxn, &self.embedder_name)? .ok_or_else(|| crate::UserError::InvalidEmbedder(self.embedder_name.to_owned()))?; - let mut results = Vec::new(); - - for reader in self.index.arroy_readers(self.rtxn, embedder_index, self.quantized) { - let nns_by_item = reader?.nns_by_item( - self.rtxn, - self.id, - self.limit + self.offset + 1, - Some(&universe), - )?; - if let Some(mut nns_by_item) = nns_by_item { - results.append(&mut nns_by_item); - } else { - break; - } - } - - results.sort_unstable_by_key(|(_, distance)| OrderedFloat(*distance)); + let reader = ArroyWrapper::new(self.index.vector_arroy, embedder_index, self.quantized); + let results = reader.nns_by_item( + self.rtxn, + self.id, + self.limit + self.offset + 1, + Some(&universe), + )?; let mut documents_ids = Vec::with_capacity(self.limit); let mut document_scores = Vec::with_capacity(self.limit); diff --git a/milli/src/update/index_documents/typed_chunk.rs b/milli/src/update/index_documents/typed_chunk.rs index e340137e2..e118420d8 100644 --- a/milli/src/update/index_documents/typed_chunk.rs +++ b/milli/src/update/index_documents/typed_chunk.rs @@ -680,7 +680,7 @@ pub(crate) fn write_typed_chunk_into_index( let mut iter = merger.into_stream_merger_iter()?; while let Some((key, _)) = iter.next()? { let docid = key.try_into().map(DocumentId::from_be_bytes).unwrap(); - writer.del_item(wtxn, expected_dimension, docid)?; + writer.del_item_raw(wtxn, expected_dimension, docid)?; } // add generated embeddings @@ -708,7 +708,7 @@ pub(crate) fn write_typed_chunk_into_index( embeddings.embedding_count(), ))); } - writer.add_items(wtxn, expected_dimension, docid, embeddings)?; + writer.add_items(wtxn, docid, &embeddings)?; } // perform the manual diff @@ -723,51 +723,14 @@ pub(crate) fn write_typed_chunk_into_index( if let Some(value) = vector_deladd_obkv.get(DelAdd::Deletion) { let vector: Vec = pod_collect_to_vec(value); - let mut deleted_index = None; - for (index, writer) in writers.iter().enumerate() { - let Some(candidate) = writer.item_vector(wtxn, docid)? else { - // uses invariant: vectors are packed in the first writers. - break; - }; - if candidate == vector { - writer.del_item(wtxn, expected_dimension, docid)?; - deleted_index = Some(index); - } - } - - // 🥲 enforce invariant: vectors are packed in the first writers. - if let Some(deleted_index) = deleted_index { - let mut last_index_with_a_vector = None; - for (index, writer) in writers.iter().enumerate().skip(deleted_index) { - let Some(candidate) = writer.item_vector(wtxn, docid)? else { - break; - }; - last_index_with_a_vector = Some((index, candidate)); - } - if let Some((last_index, vector)) = last_index_with_a_vector { - // unwrap: computed the index from the list of writers - let writer = writers.get(last_index).unwrap(); - writer.del_item(wtxn, expected_dimension, docid)?; - writers.get(deleted_index).unwrap().add_item( - wtxn, - expected_dimension, - docid, - &vector, - )?; - } - } + writer.del_item(wtxn, docid, &vector)?; } if let Some(value) = vector_deladd_obkv.get(DelAdd::Addition) { let vector = pod_collect_to_vec(value); // overflow was detected during vector extraction. - for writer in &writers { - if !writer.contains_item(wtxn, expected_dimension, docid)? { - writer.add_item(wtxn, expected_dimension, docid, &vector)?; - break; - } - } + writer.add_item(wtxn, docid, &vector)?; } } diff --git a/milli/src/vector/mod.rs b/milli/src/vector/mod.rs index 644826dcd..54765cfef 100644 --- a/milli/src/vector/mod.rs +++ b/milli/src/vector/mod.rs @@ -97,49 +97,165 @@ impl ArroyWrapper { Ok(()) } + pub fn add_items( + &self, + wtxn: &mut RwTxn, + item_id: arroy::ItemId, + embeddings: &Embeddings, + ) -> Result<(), arroy::Error> { + let dimension = embeddings.dimension(); + for (index, vector) in arroy_db_range_for_embedder(self.index).zip(embeddings.iter()) { + if self.quantized { + arroy::Writer::new(self.quantized_db(), index, dimension) + .add_item(wtxn, item_id, vector)? + } else { + arroy::Writer::new(self.angular_db(), index, dimension) + .add_item(wtxn, item_id, vector)? + } + } + Ok(()) + } + pub fn add_item( &self, wtxn: &mut RwTxn, - dimension: usize, item_id: arroy::ItemId, vector: &[f32], ) -> Result<(), arroy::Error> { - if self.quantized { - arroy::Writer::new(self.quantized_db(), self.index, dimension) - .add_item(wtxn, item_id, vector) - } else { - arroy::Writer::new(self.angular_db(), self.index, dimension) - .add_item(wtxn, item_id, vector) + let dimension = vector.len(); + + for index in arroy_db_range_for_embedder(self.index) { + if self.quantized { + let writer = arroy::Writer::new(self.quantized_db(), index, dimension); + if !writer.contains_item(wtxn, item_id)? { + writer.add_item(wtxn, item_id, &vector)?; + break; + } + } else { + arroy::Writer::new(self.angular_db(), index, dimension) + .add_item(wtxn, item_id, vector)? + } } + + Ok(()) } - pub fn del_item( + pub fn del_item_raw( &self, wtxn: &mut RwTxn, dimension: usize, item_id: arroy::ItemId, ) -> Result { - if self.quantized { - arroy::Writer::new(self.quantized_db(), self.index, dimension).del_item(wtxn, item_id) - } else { - arroy::Writer::new(self.angular_db(), self.index, dimension).del_item(wtxn, item_id) + for index in arroy_db_range_for_embedder(self.index) { + if self.quantized { + let writer = arroy::Writer::new(self.quantized_db(), index, dimension); + if writer.del_item(wtxn, item_id)? { + return Ok(true); + } + } else { + let writer = arroy::Writer::new(self.angular_db(), index, dimension); + if writer.del_item(wtxn, item_id)? { + return Ok(true); + } + } } + + Ok(false) + } + + pub fn del_item( + &self, + wtxn: &mut RwTxn, + itemid: arroy::ItemId, + vector: &[f32], + ) -> Result { + let dimension = vector.len(); + let mut deleted_index = None; + + for index in arroy_db_range_for_embedder(self.index) { + if self.quantized { + let writer = arroy::Writer::new(self.quantized_db(), index, dimension); + let Some(candidate) = writer.item_vector(wtxn, itemid)? else { + // uses invariant: vectors are packed in the first writers. + break; + }; + if candidate == vector { + writer.del_item(wtxn, itemid)?; + deleted_index = Some(index); + } + } else { + let writer = arroy::Writer::new(self.angular_db(), index, dimension); + let Some(candidate) = writer.item_vector(wtxn, itemid)? else { + // uses invariant: vectors are packed in the first writers. + break; + }; + if candidate == vector { + writer.del_item(wtxn, itemid)?; + deleted_index = Some(index); + } + } + } + + // 🥲 enforce invariant: vectors are packed in the first writers. + if let Some(deleted_index) = deleted_index { + let mut last_index_with_a_vector = None; + for index in arroy_db_range_for_embedder(self.index).skip(deleted_index as usize) { + if self.quantized { + let writer = arroy::Writer::new(self.quantized_db(), index, dimension); + let Some(candidate) = writer.item_vector(wtxn, itemid)? else { + break; + }; + last_index_with_a_vector = Some((index, candidate)); + } else { + let writer = arroy::Writer::new(self.angular_db(), index, dimension); + let Some(candidate) = writer.item_vector(wtxn, itemid)? else { + break; + }; + last_index_with_a_vector = Some((index, candidate)); + } + } + if let Some((last_index, vector)) = last_index_with_a_vector { + if self.quantized { + // unwrap: computed the index from the list of writers + let writer = arroy::Writer::new(self.quantized_db(), last_index, dimension); + writer.del_item(wtxn, itemid)?; + let writer = arroy::Writer::new(self.quantized_db(), deleted_index, dimension); + writer.add_item(wtxn, itemid, &vector)?; + } else { + // unwrap: computed the index from the list of writers + let writer = arroy::Writer::new(self.angular_db(), last_index, dimension); + writer.del_item(wtxn, itemid)?; + let writer = arroy::Writer::new(self.angular_db(), deleted_index, dimension); + writer.add_item(wtxn, itemid, &vector)?; + } + } + } + Ok(deleted_index.is_some()) } pub fn clear(&self, wtxn: &mut RwTxn, dimension: usize) -> Result<(), arroy::Error> { - if self.quantized { - arroy::Writer::new(self.quantized_db(), self.index, dimension).clear(wtxn) - } else { - arroy::Writer::new(self.angular_db(), self.index, dimension).clear(wtxn) + for index in arroy_db_range_for_embedder(self.index) { + if self.quantized { + arroy::Writer::new(self.quantized_db(), index, dimension).clear(wtxn)?; + } else { + arroy::Writer::new(self.angular_db(), index, dimension).clear(wtxn)?; + } } + Ok(()) } pub fn is_empty(&self, rtxn: &RoTxn, dimension: usize) -> Result { - if self.quantized { - arroy::Writer::new(self.quantized_db(), self.index, dimension).is_empty(rtxn) - } else { - arroy::Writer::new(self.angular_db(), self.index, dimension).is_empty(rtxn) + for index in arroy_db_range_for_embedder(self.index) { + let empty = if self.quantized { + arroy::Writer::new(self.quantized_db(), index, dimension).is_empty(rtxn)? + } else { + arroy::Writer::new(self.angular_db(), index, dimension).is_empty(rtxn)? + }; + if !empty { + return Ok(false); + } } + Ok(true) } pub fn contains_item( @@ -148,11 +264,18 @@ impl ArroyWrapper { dimension: usize, item: arroy::ItemId, ) -> Result { - if self.quantized { - arroy::Writer::new(self.quantized_db(), self.index, dimension).contains_item(rtxn, item) - } else { - arroy::Writer::new(self.angular_db(), self.index, dimension).contains_item(rtxn, item) + for index in arroy_db_range_for_embedder(self.index) { + let contains = if self.quantized { + arroy::Writer::new(self.quantized_db(), index, dimension) + .contains_item(rtxn, item)? + } else { + arroy::Writer::new(self.angular_db(), index, dimension).contains_item(rtxn, item)? + }; + if contains { + return Ok(contains); + } } + Ok(false) } pub fn nns_by_item( @@ -161,14 +284,26 @@ impl ArroyWrapper { item: ItemId, limit: usize, filter: Option<&RoaringBitmap>, - ) -> Result>, arroy::Error> { - if self.quantized { - arroy::Reader::open(rtxn, self.index, self.quantized_db())? - .nns_by_item(rtxn, item, limit, None, None, filter) - } else { - arroy::Reader::open(rtxn, self.index, self.angular_db())? - .nns_by_item(rtxn, item, limit, None, None, filter) + ) -> Result, arroy::Error> { + let mut results = Vec::new(); + + for index in arroy_db_range_for_embedder(self.index) { + let ret = if self.quantized { + arroy::Reader::open(rtxn, index, self.quantized_db())? + .nns_by_item(rtxn, item, limit, None, None, filter)? + } else { + arroy::Reader::open(rtxn, index, self.angular_db())? + .nns_by_item(rtxn, item, limit, None, None, filter)? + }; + if let Some(mut ret) = ret { + results.append(&mut ret); + } else { + break; + } } + results.sort_unstable_by_key(|(_, distance)| OrderedFloat(*distance)); + + Ok(results) } pub fn nns_by_vector( @@ -178,21 +313,36 @@ impl ArroyWrapper { limit: usize, filter: Option<&RoaringBitmap>, ) -> Result, arroy::Error> { - if self.quantized { - arroy::Reader::open(txn, self.index, self.quantized_db())? - .nns_by_vector(txn, item, limit, None, None, filter) - } else { - arroy::Reader::open(txn, self.index, self.angular_db())? - .nns_by_vector(txn, item, limit, None, None, filter) + let mut results = Vec::new(); + + for index in arroy_db_range_for_embedder(self.index) { + let mut ret = if self.quantized { + arroy::Reader::open(txn, index, self.quantized_db())? + .nns_by_vector(txn, item, limit, None, None, filter)? + } else { + arroy::Reader::open(txn, index, self.angular_db())? + .nns_by_vector(txn, item, limit, None, None, filter)? + }; + results.append(&mut ret); } + + results.sort_unstable_by_key(|(_, distance)| OrderedFloat(*distance)); + + Ok(results) } pub fn item_vector(&self, rtxn: &RoTxn, docid: u32) -> Result>, arroy::Error> { - if self.quantized { - arroy::Reader::open(rtxn, self.index, self.quantized_db())?.item_vector(rtxn, docid) - } else { - arroy::Reader::open(rtxn, self.index, self.angular_db())?.item_vector(rtxn, docid) + for index in arroy_db_range_for_embedder(self.index) { + let ret = if self.quantized { + arroy::Reader::open(rtxn, index, self.quantized_db())?.item_vector(rtxn, docid)? + } else { + arroy::Reader::open(rtxn, index, self.angular_db())?.item_vector(rtxn, docid)? + }; + if ret.is_some() { + return Ok(ret); + } } + Ok(None) } fn angular_db(&self) -> arroy::Database {