diff --git a/Cargo.lock b/Cargo.lock
index ed6d0c291..fba78b3b6 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -383,7 +383,7 @@ dependencies = [
 [[package]]
 name = "arroy"
 version = "0.1.0"
-source = "git+https://github.com/meilisearch/arroy.git#4b59476f457e5443ff250ea10d40d8b66a692674"
+source = "git+https://github.com/meilisearch/arroy.git#0079af0ec960bc9c51dd66e898a6b5e980cbb083"
 dependencies = [
  "bytemuck",
  "byteorder",
diff --git a/milli/src/error.rs b/milli/src/error.rs
index 032fd63a7..3d07590b0 100644
--- a/milli/src/error.rs
+++ b/milli/src/error.rs
@@ -61,6 +61,8 @@ pub enum InternalError {
     AbortedIndexation,
     #[error("The matching words list contains at least one invalid member.")]
     InvalidMatchingWords,
+    #[error(transparent)]
+    ArroyError(#[from] arroy::Error),
 }
 
 #[derive(Error, Debug)]
@@ -190,6 +192,24 @@ only composed of alphanumeric characters (a-z A-Z 0-9), hyphens (-) and undersco
     InvalidPromptForEmbeddings(String, crate::prompt::error::NewPromptError),
 }
 
+impl From<arroy::Error> for Error {
+    fn from(value: arroy::Error) -> Self {
+        match value {
+            arroy::Error::Heed(heed) => heed.into(),
+            arroy::Error::Io(io) => io.into(),
+            arroy::Error::InvalidVecDimension { expected, received } => {
+                Error::UserError(UserError::InvalidVectorDimensions { expected, found: received })
+            }
+            arroy::Error::DatabaseFull
+            | arroy::Error::InvalidItemAppend
+            | arroy::Error::UnmatchingDistance { .. }
+            | arroy::Error::MissingMetadata => {
+                Error::InternalError(InternalError::ArroyError(value))
+            }
+        }
+    }
+}
+
 #[derive(Error, Debug)]
 pub enum GeoError {
     #[error("The `_geo` field in the document with the id: `{document_id}` is not an object. Was expecting an object with the `_geo.lat` and `_geo.lng` fields but instead got `{value}`.")]
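With this conversion in place, any function returning `crate::Result` can propagate arroy failures with `?`: a dimension mismatch surfaces as a user error, the remaining variants as internal errors. A minimal sketch of the resulting call pattern as it would look inside milli (the embedder index `0` and the `dimensions` argument are illustrative, not taken from the patch):

```rust
// Sketch only: exercises the `From<arroy::Error> for Error` conversion above.
fn add_one_embedding(
    index: &crate::Index,
    wtxn: &mut heed::RwTxn,
    docid: crate::DocumentId,
    embedding: &[f32],
    dimensions: usize,
) -> crate::Result<()> {
    // Both calls return `arroy::Error` on failure; `?` converts it into `crate::Error`,
    // turning `InvalidVecDimension` into `UserError::InvalidVectorDimensions`.
    let writer = arroy::Writer::prepare(wtxn, index.vector_arroy, 0, dimensions)?;
    writer.add_item(wtxn, docid, embedding)?;
    Ok(())
}
```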
diff --git a/milli/src/index.rs b/milli/src/index.rs
index c494f2f2b..c5e190d38 100644
--- a/milli/src/index.rs
+++ b/milli/src/index.rs
@@ -70,7 +70,6 @@ pub mod main_key {
     pub const SORT_FACET_VALUES_BY: &str = "sort-facet-values-by";
     pub const PAGINATION_MAX_TOTAL_HITS: &str = "pagination-max-total-hits";
     pub const PROXIMITY_PRECISION: &str = "proximity-precision";
-    pub const VECTOR_UNAVAILABLE_VECTOR_IDS: &str = "vector-unavailable-vector-ids";
     pub const EMBEDDING_CONFIGS: &str = "embedding_configs";
 }
 
@@ -97,8 +96,6 @@ pub mod db_name {
     pub const FACET_ID_STRING_FST: &str = "facet-id-string-fst";
     pub const FIELD_ID_DOCID_FACET_F64S: &str = "field-id-docid-facet-f64s";
     pub const FIELD_ID_DOCID_FACET_STRINGS: &str = "field-id-docid-facet-strings";
-    pub const VECTOR_ID_DOCID: &str = "vector-id-docids";
-    pub const VECTOR_DOCID_IDS: &str = "vector-docid-ids";
     pub const VECTOR_EMBEDDER_CATEGORY_ID: &str = "vector-embedder-category-id";
     pub const VECTOR_ARROY: &str = "vector-arroy";
     pub const DOCUMENTS: &str = "documents";
@@ -167,16 +164,10 @@ pub struct Index {
     /// Maps the document id, the facet field id and the strings.
     pub field_id_docid_facet_strings: Database,
 
-    /// Maps a vector id to its document id.
-    pub vector_id_docid: Database,
-    /// Maps a doc id to its vector ids.
-    pub docid_vector_ids: Database,
-    /// Maps an embedder name to its id in the arroy store.
-    pub embedder_category_id: Database,
-
+    pub embedder_category_id: Database,
     /// Vector store based on arroy™.
-    pub vector_arroy: arroy::Database,
+    pub vector_arroy: arroy::Database,
 
     /// Maps the document id to the document as an obkv store.
     pub(crate) documents: Database,
@@ -191,7 +182,7 @@ impl Index {
     ) -> Result<Index> {
         use db_name::*;
 
-        options.max_dbs(27);
+        options.max_dbs(25);
         let env = options.open(path)?;
         let mut wtxn = env.write_txn()?;
@@ -232,8 +223,6 @@ impl Index {
         let field_id_docid_facet_strings =
            env.create_database(&mut wtxn, Some(FIELD_ID_DOCID_FACET_STRINGS))?;
         // vector stuff
-        let vector_id_docid = env.create_database(&mut wtxn, Some(VECTOR_ID_DOCID))?;
-        let docid_vector_ids = env.create_database(&mut wtxn, Some(VECTOR_DOCID_IDS))?;
         let embedder_category_id =
            env.create_database(&mut wtxn, Some(VECTOR_EMBEDDER_CATEGORY_ID))?;
         let vector_arroy = env.create_database(&mut wtxn, Some(VECTOR_ARROY))?;
@@ -267,9 +256,7 @@ impl Index {
             facet_id_is_empty_docids,
             field_id_docid_facet_f64s,
             field_id_docid_facet_strings,
-            vector_id_docid,
             vector_arroy,
-            docid_vector_ids,
             embedder_category_id,
             documents,
         })
@@ -1516,30 +1503,6 @@ impl Index {
             .get(rtxn, main_key::EMBEDDING_CONFIGS)?
             .unwrap_or_default())
     }
-
-    pub(crate) fn put_unavailable_vector_ids(
-        &self,
-        wtxn: &mut RwTxn<'_>,
-        unavailable_vector_ids: RoaringBitmap,
-    ) -> heed::Result<()> {
-        self.main.remap_types::<Str, RoaringBitmapCodec>().put(
-            wtxn,
-            main_key::VECTOR_UNAVAILABLE_VECTOR_IDS,
-            &unavailable_vector_ids,
-        )
-    }
-
-    pub(crate) fn delete_unavailable_vector_ids(&self, wtxn: &mut RwTxn<'_>) -> heed::Result<bool> {
-        self.main.remap_key_type::<Str>().delete(wtxn, main_key::VECTOR_UNAVAILABLE_VECTOR_IDS)
-    }
-
-    pub fn unavailable_vector_ids(&self, rtxn: &RoTxn<'_>) -> Result<RoaringBitmap> {
-        Ok(self
-            .main
-            .remap_types::<Str, RoaringBitmapCodec>()
-            .get(rtxn, main_key::VECTOR_UNAVAILABLE_VECTOR_IDS)?
-            .unwrap_or_default())
-    }
 }
 
 #[cfg(test)]
diff --git a/milli/src/search/new/mod.rs b/milli/src/search/new/mod.rs
index 372c89601..ad5c59f99 100644
--- a/milli/src/search/new/mod.rs
+++ b/milli/src/search/new/mod.rs
@@ -262,6 +262,7 @@ fn get_ranking_rules_for_vector<'ctx>(
     ctx: &SearchContext<'ctx>,
     sort_criteria: &Option<Vec<AscDesc>>,
     geo_strategy: geo_sort::Strategy,
+    limit_plus_offset: usize,
     target: &[f32],
 ) -> Result<Vec<BoxRankingRule<'ctx, PlaceholderQuery>>> {
     // query graph search
@@ -283,7 +284,12 @@ fn get_ranking_rules_for_vector<'ctx>(
             | crate::Criterion::Exactness => {
                 if !vector {
                     let vector_candidates = ctx.index.documents_ids(ctx.txn)?;
-                    let vector_sort = VectorSort::new(ctx, target.to_vec(), vector_candidates)?;
+                    let vector_sort = VectorSort::new(
+                        ctx,
+                        target.to_vec(),
+                        vector_candidates,
+                        limit_plus_offset,
+                    )?;
                     ranking_rules.push(Box::new(vector_sort));
                     vector = true;
                 }
@@ -509,7 +515,8 @@ pub fn execute_vector_search(
     /// FIXME: input universe = universe & documents_with_vectors
     // for now if we're computing embeddings for ALL documents, we can assume that this is just universe
-    let ranking_rules = get_ranking_rules_for_vector(ctx, sort_criteria, geo_strategy, vector)?;
+    let ranking_rules =
+        get_ranking_rules_for_vector(ctx, sort_criteria, geo_strategy, from + length, vector)?;
 
     let mut placeholder_search_logger = logger::DefaultSearchLogger;
     let placeholder_search_logger: &mut dyn SearchLogger<PlaceholderQuery> =
         &mut placeholder_search_logger;
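The new `limit_plus_offset` argument exists because the vector ranking rule no longer streams results lazily from a single reader: each refill asks every arroy sub-index for a fixed number of nearest neighbours. A request paginated with offset `from` and limit `length` only ever exposes the first `from + length` ranked hits, which is why that sum is what gets forwarded. A trivial illustration (the helper name is mine, not from the patch):

```rust
/// Illustration only: how many neighbours each refill needs to request so that the
/// page `[from, from + length)` can be served from the cached, distance-sorted results.
fn neighbours_to_fetch(from: usize, length: usize) -> usize {
    from + length
}

fn main() {
    // e.g. third page of 20 hits: the top 60 neighbours are enough to slice the page out of.
    assert_eq!(neighbours_to_fetch(40, 20), 60);
}
```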
diff --git a/milli/src/search/new/vector_sort.rs b/milli/src/search/new/vector_sort.rs
index 59b7a72c2..9bf13c631 100644
--- a/milli/src/search/new/vector_sort.rs
+++ b/milli/src/search/new/vector_sort.rs
@@ -1,48 +1,83 @@
-use std::future::Future;
 use std::iter::FromIterator;
-use std::pin::Pin;
 
-use nolife::DynBoxScope;
+use ordered_float::OrderedFloat;
 use roaring::RoaringBitmap;
 
 use super::ranking_rules::{RankingRule, RankingRuleOutput, RankingRuleQueryTrait};
-use crate::distance::NDotProductPoint;
-use crate::index::Hnsw;
 use crate::score_details::{self, ScoreDetails};
-use crate::{Result, SearchContext, SearchLogger, UserError};
+use crate::{DocumentId, Result, SearchContext, SearchLogger};
 
-pub struct VectorSort<'ctx, Q: RankingRuleQueryTrait> {
+pub struct VectorSort<Q: RankingRuleQueryTrait> {
     query: Option<Q>,
     target: Vec<f32>,
     vector_candidates: RoaringBitmap,
-    reader: arroy::Reader<'ctx, arroy::distances::DotProduct>,
+    cached_sorted_docids: std::vec::IntoIter<(DocumentId, f32, Vec<f32>)>,
     limit: usize,
 }
 
-impl<'ctx, Q: RankingRuleQueryTrait> VectorSort<'ctx, Q> {
+impl<Q: RankingRuleQueryTrait> VectorSort<Q> {
     pub fn new(
-        ctx: &'ctx SearchContext,
+        _ctx: &SearchContext,
         target: Vec<f32>,
         vector_candidates: RoaringBitmap,
         limit: usize,
     ) -> Result<Self> {
-        /// FIXME? what to do in case of missing metadata
-        let reader = arroy::Reader::open(ctx.txn, 0, ctx.index.vector_arroy)?;
+        Ok(Self {
+            query: None,
+            target,
+            vector_candidates,
+            cached_sorted_docids: Default::default(),
+            limit,
+        })
+    }
 
-        let target_clone = target.clone();
+    fn fill_buffer(&mut self, ctx: &mut SearchContext<'_>) -> Result<()> {
+        let readers: std::result::Result<Vec<_>, _> = (0..=u8::MAX)
+            .map_while(|k| {
+                arroy::Reader::open(ctx.txn, k.into(), ctx.index.vector_arroy)
+                    .map(Some)
+                    .or_else(|e| match e {
+                        arroy::Error::MissingMetadata => Ok(None),
+                        e => Err(e),
+                    })
+                    .transpose()
+            })
+            .collect();
 
-        Ok(Self { query: None, target, vector_candidates, reader, limit })
+        let readers = readers?;
+
+        let target = &self.target;
+        let mut results = Vec::new();
+
+        for reader in readers.iter() {
+            let nns_by_vector = reader.nns_by_vector(
+                ctx.txn,
+                &target,
+                self.limit,
+                None,
+                Some(&self.vector_candidates),
+            )?;
+            let vectors: std::result::Result<Vec<_>, _> = nns_by_vector
+                .iter()
+                .map(|(docid, _)| reader.item_vector(ctx.txn, *docid).transpose().unwrap())
+                .collect();
+            let vectors = vectors?;
+            results.extend(nns_by_vector.into_iter().zip(vectors).map(|((x, y), z)| (x, y, z)));
+        }
+        results.sort_unstable_by_key(|(_, distance, _)| OrderedFloat(*distance));
+        self.cached_sorted_docids = results.into_iter();
+        Ok(())
     }
 }
 
-impl<'ctx, Q: RankingRuleQueryTrait> RankingRule<'ctx, Q> for VectorSort<'ctx, Q> {
+impl<'ctx, Q: RankingRuleQueryTrait> RankingRule<'ctx, Q> for VectorSort<Q> {
     fn id(&self) -> String {
         "vector_sort".to_owned()
     }
 
     fn start_iteration(
         &mut self,
-        _ctx: &mut SearchContext<'ctx>,
+        ctx: &mut SearchContext<'ctx>,
         _logger: &mut dyn SearchLogger<Q>,
         universe: &RoaringBitmap,
         query: &Q,
@@ -51,7 +86,7 @@ impl<'ctx, Q: RankingRuleQueryTrait> RankingRule<'ctx, Q> for VectorSort<'ctx, Q
         self.query = Some(query.clone());
         self.vector_candidates &= universe;
-
+        self.fill_buffer(ctx)?;
         Ok(())
     }
 
@@ -75,40 +110,24 @@ impl<'ctx, Q: RankingRuleQueryTrait> RankingRule<'ctx, Q> for VectorSort<'ctx, Q
                 }),
             }));
         }
-        let target = &self.target;
-        let vector_candidates = &self.vector_candidates;
-        let result = self.reader.nns_by_vector(ctx.txn, &target, count, search_k, candidates)
-
-        scope.enter(|it| {
-            for item in it.by_ref() {
-                let item: Item = item;
-                let index = item.pid.into_inner();
-                let docid = ctx.index.vector_id_docid.get(ctx.txn, &index)?.unwrap();
-
-                if vector_candidates.contains(docid) {
-                    return Ok(Some(RankingRuleOutput {
-                        query,
-                        candidates: RoaringBitmap::from_iter([docid]),
-                        score: ScoreDetails::Vector(score_details::Vector {
-                            target_vector: target.clone(),
-                            value_similarity: Some((
-                                item.point.clone().into_inner(),
-                                1.0 - item.distance,
-                            )),
-                        }),
-                    }));
-                }
+        while let Some((docid, distance, vector)) = self.cached_sorted_docids.next() {
+            if self.vector_candidates.contains(docid) {
+                return Ok(Some(RankingRuleOutput {
+                    query,
+                    candidates: RoaringBitmap::from_iter([docid]),
+                    score: ScoreDetails::Vector(score_details::Vector {
+                        target_vector: self.target.clone(),
+                        value_similarity: Some((vector, 1.0 - distance)),
+                    }),
+                }));
             }
-            Ok(Some(RankingRuleOutput {
-                query,
-                candidates: universe.clone(),
-                score: ScoreDetails::Vector(score_details::Vector {
-                    target_vector: target.clone(),
-                    value_similarity: None,
-                }),
-            }))
-        })
+        }
+
+        // if we got out of this loop it means we've exhausted our cache.
+        // we need to refill it and run the function again.
+        self.fill_buffer(ctx)?;
+        self.next_bucket(ctx, _logger, universe)
     }
 
     fn end_iteration(&mut self, _ctx: &mut SearchContext<'ctx>, _logger: &mut dyn SearchLogger<Q>) {
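The ranking rule now buffers its candidates up front: `fill_buffer` opens every arroy sub-index that exists (a document may store one embedding per sub-index), asks each for `limit` nearest neighbours, and merges everything into one list ordered by distance; `next_bucket` then pops one document at a time and refills when the cache runs dry. A standalone sketch of just the merge step, with plain tuples standing in for the reader results (illustrative, not the patch's code):

```rust
use ordered_float::OrderedFloat;

/// Sketch only: merge per-sub-index nearest-neighbour lists into a single list ordered
/// by ascending distance, the same ordering `fill_buffer` applies before caching.
fn merge_by_distance(per_reader: Vec<Vec<(u32, f32)>>) -> Vec<(u32, f32)> {
    let mut all: Vec<(u32, f32)> = per_reader.into_iter().flatten().collect();
    // f32 is not Ord, so OrderedFloat provides the total order used for sorting.
    all.sort_unstable_by_key(|(_, distance)| OrderedFloat(*distance));
    all
}
```

The closest embedding of a document therefore surfaces first, and the similarity reported in the score is `1.0 - distance`, mirroring the computation in `next_bucket`.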
diff --git a/milli/src/update/clear_documents.rs b/milli/src/update/clear_documents.rs
index 3b1a6c5d8..a6c7ff2b1 100644
--- a/milli/src/update/clear_documents.rs
+++ b/milli/src/update/clear_documents.rs
@@ -42,9 +42,7 @@ impl<'t, 'i> ClearDocuments<'t, 'i> {
             facet_id_is_empty_docids,
             field_id_docid_facet_f64s,
             field_id_docid_facet_strings,
-            vector_id_docid,
             vector_arroy,
-            docid_vector_ids,
             embedder_category_id: _,
             documents,
         } = self.index;
@@ -86,8 +84,6 @@ impl<'t, 'i> ClearDocuments<'t, 'i> {
         field_id_docid_facet_strings.clear(self.wtxn)?;
 
         // vector
         vector_arroy.clear(self.wtxn)?;
-        vector_id_docid.clear(self.wtxn)?;
-        docid_vector_ids.clear(self.wtxn)?;
 
         documents.clear(self.wtxn)?;
diff --git a/milli/src/update/index_documents/mod.rs b/milli/src/update/index_documents/mod.rs
index eaac26dd3..472c77111 100644
--- a/milli/src/update/index_documents/mod.rs
+++ b/milli/src/update/index_documents/mod.rs
@@ -418,7 +418,7 @@ where
             }
 
             // needs to be dropped to avoid channel waiting lock.
-            drop(lmdb_writer_sx)
+            drop(lmdb_writer_sx);
         });
 
         let index_is_empty = self.index.number_of_documents(self.wtxn)? == 0;
@@ -435,6 +435,8 @@
         let mut word_docids = None;
         let mut exact_word_docids = None;
 
+        let mut dimension = None;
+
         for result in lmdb_writer_rx {
             if (self.should_abort)() {
                 return Err(Error::InternalError(InternalError::AbortedIndexation));
@@ -464,6 +466,20 @@
                     word_position_docids = Some(cloneable_chunk);
                     TypedChunk::WordPositionDocids(chunk)
                 }
+                TypedChunk::VectorPoints {
+                    expected_dimension,
+                    remove_vectors,
+                    embeddings,
+                    manual_vectors,
+                } => {
+                    dimension = Some(expected_dimension);
+                    TypedChunk::VectorPoints {
+                        remove_vectors,
+                        embeddings,
+                        expected_dimension,
+                        manual_vectors,
+                    }
+                }
                 otherwise => otherwise,
             };
 
@@ -490,9 +506,6 @@
             }
         }
 
-        let writer = arroy::Writer::prepare(self.wtxn, self.index.vector_arroy, 0, 0)?;
-        writer.build(self.wtxn, &mut rand::rngs::StdRng::from_entropy(), None)?;
-
         // We write the field distribution into the main database
         self.index.put_field_distribution(self.wtxn, &field_distribution)?;
 
@@ -500,6 +513,23 @@
         self.index.put_primary_key(self.wtxn, &primary_key)?;
         let number_of_documents = self.index.number_of_documents(self.wtxn)?;
 
+        if let Some(dimension) = dimension {
+            let wtxn = &mut *self.wtxn;
+            let vector_arroy = self.index.vector_arroy;
+            pool.install(|| {
+                /// FIXME: do for each embedder
+                let mut rng = rand::rngs::StdRng::from_entropy();
+                for k in 0..=u8::MAX {
+                    let writer = arroy::Writer::prepare(wtxn, vector_arroy, k.into(), dimension)?;
+                    if writer.is_empty(wtxn)? {
+                        break;
+                    }
+                    writer.build(wtxn, &mut rng, None)?;
+                }
+                Result::Ok(())
+            })?;
+        }
+
         self.execute_prefix_databases(
             word_docids,
             exact_word_docids,
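The `/// FIXME: do for each embedder` above and the `index << 8 + k` note in `typed_chunk.rs` below hint at the intended layout of arroy's u16 index space: one byte for the embedder, one byte for the position of the vector within the document. A sketch of that packing (the helper name is mine; nothing in the patch defines it yet):

```rust
/// Sketch of the index layout hinted at by the FIXMEs: high byte = embedder id,
/// low byte = rank of the vector inside the document (both capped at 256).
fn arroy_index(embedder_id: u8, vector_rank: u8) -> u16 {
    ((embedder_id as u16) << 8) | vector_rank as u16
}

fn main() {
    // embedder 1, third vector of the document -> arroy index 0x0102
    assert_eq!(arroy_index(1, 2), 0x0102);
}
```

For now the build loop only walks indexes `0..=u8::MAX` and stops at the first empty writer, consistent with a single embedder.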
diff --git a/milli/src/update/index_documents/typed_chunk.rs b/milli/src/update/index_documents/typed_chunk.rs
index bc82518ca..82397ed3d 100644
--- a/milli/src/update/index_documents/typed_chunk.rs
+++ b/milli/src/update/index_documents/typed_chunk.rs
@@ -8,9 +8,7 @@
 use charabia::{Language, Script};
 use grenad::MergerBuilder;
 use heed::types::Bytes;
 use heed::{PutFlags, RwTxn};
-use log::error;
 use obkv::{KvReader, KvWriter};
-use ordered_float::OrderedFloat;
 use roaring::RoaringBitmap;
 
 use super::helpers::{
@@ -18,16 +16,12 @@ use super::helpers::{
     valid_lmdb_key, CursorClonableMmap,
 };
 use super::{ClonableMmap, MergeFn};
-use crate::distance::NDotProductPoint;
-use crate::error::UserError;
 use crate::external_documents_ids::{DocumentOperation, DocumentOperationKind};
 use crate::facet::FacetType;
 use crate::index::db_name::DOCUMENTS;
-use crate::index::Hnsw;
 use crate::update::del_add::{deladd_serialize_add_side, DelAdd, KvReaderDelAdd};
 use crate::update::facet::FacetsUpdate;
 use crate::update::index_documents::helpers::{as_cloneable_grenad, try_split_array_at};
-use crate::update::{available_documents_ids, AvailableDocumentsIds};
 use crate::{lat_lng_to_xyz, DocumentId, FieldId, GeoPoint, Index, Result, SerializationError};
 
 pub(crate) enum TypedChunk {
@@ -374,28 +368,28 @@ pub(crate) fn write_typed_chunk_into_index(
                 return Ok((RoaringBitmap::new(), is_merged_database));
             }
 
-            let mut unavailable_vector_ids = index.unavailable_vector_ids(&wtxn)?;
             /// FIXME: allow customizing distance
-            /// FIXME: allow customizing index
-            let writer = arroy::Writer::prepare(wtxn, index.vector_arroy, 0, expected_dimension)?;
+            let writers: std::result::Result<Vec<_>, _> = (0..=u8::MAX)
+                .map(|k| {
+                    /// FIXME: allow customizing index and then do index << 8 + k
+                    arroy::Writer::prepare(wtxn, index.vector_arroy, k.into(), expected_dimension)
+                })
+                .collect();
+            let writers = writers?;
 
             // remove vectors for docids we want them removed
             let mut cursor = remove_vectors.into_cursor()?;
             while let Some((key, _)) = cursor.move_on_next()? {
                 let docid = key.try_into().map(DocumentId::from_be_bytes).unwrap();
-                let Some(to_remove_vector_ids) = index.docid_vector_ids.get(&wtxn, &docid)? else {
-                    continue;
-                };
-                unavailable_vector_ids -= to_remove_vector_ids;
-
-                for item in to_remove_vector_ids {
-                    writer.del_item(wtxn, item)?;
+                for writer in &writers {
+                    // Uses invariant: vectors are packed in the first writers.
+                    if !writer.del_item(wtxn, docid)? {
+                        break;
+                    }
                 }
             }
 
-            let mut available_vector_ids =
-                AvailableDocumentsIds::from_documents_ids(&unavailable_vector_ids);
             // add generated embeddings
             if let Some(embeddings) = embeddings {
                 let mut cursor = embeddings.into_cursor()?;
@@ -408,19 +402,10 @@
                         // code error if we somehow got the wrong dimension
                         .unwrap();
 
-                    let mut new_vector_ids = RoaringBitmap::new();
-                    for embedding in embeddings.iter() {
-                        /// FIXME: error when you get over 9000
-                        let next_vector_id = available_vector_ids.next().unwrap();
-                        unavailable_vector_ids.insert(next_vector_id);
-
-                        new_vector_ids.insert(next_vector_id);
-
-                        index.vector_id_docid.put(wtxn, &next_vector_id, &docid)?;
-
-                        writer.add_item(wtxn, next_vector_id, embedding)?;
+                    /// FIXME: detect overflow
+                    for (embedding, writer) in embeddings.iter().zip(&writers) {
+                        writer.add_item(wtxn, docid, embedding)?;
                     }
-                    index.docid_vector_ids.put(wtxn, &docid, &new_vector_ids)?;
                 }
             }
@@ -433,44 +418,52 @@
                     let vector_deladd_obkv = KvReaderDelAdd::new(value);
                     if let Some(value) = vector_deladd_obkv.get(DelAdd::Deletion) {
-                        let vector = pod_collect_to_vec(value);
-                        let Some(mut docid_vector_ids) = index.docid_vector_ids.get(&wtxn, &docid)?
-                        else {
-                            error!("Unable to delete the vector: {:?}", vector);
-                            continue;
-                        };
-                        for item in docid_vector_ids {
-                            /// FIXME: comparing the vectors by equality is inefficient, and dangerous by perfect equality
-                            let candidate = writer.item_vector(&wtxn, item)?.expect("Inconsistent dbs");
-                            if candidate == vector {
-                                writer.del_item(wtxn, item)?;
-                                unavailable_vector_ids.remove(item);
-                                index.vector_id_docid.delete(wtxn, &item)?;
-                                docid_vector_ids.remove(item);
+                        let vector: Vec<f32> = pod_collect_to_vec(value);
+
+                        let mut deleted_index = None;
+                        for (index, writer) in writers.iter().enumerate() {
+                            let Some(candidate) = writer.item_vector(&wtxn, docid)? else {
+                                // uses invariant: vectors are packed in the first writers.
                                 break;
+                            };
+                            if candidate == vector {
+                                writer.del_item(wtxn, docid)?;
+                                deleted_index = Some(index);
+                            }
+                        }
+
+                        // 🥲 enforce invariant: vectors are packed in the first writers.
+                        if let Some(deleted_index) = deleted_index {
+                            let mut last_index_with_a_vector = None;
+                            for (index, writer) in writers.iter().enumerate().skip(deleted_index) {
+                                let Some(candidate) = writer.item_vector(&wtxn, docid)? else {
+                                    break;
+                                };
+                                last_index_with_a_vector = Some((index, candidate));
+                            }
+                            if let Some((last_index, vector)) = last_index_with_a_vector {
+                                // unwrap: computed the index from the list of writers
+                                let writer = writers.get(last_index).unwrap();
+                                writer.del_item(wtxn, docid)?;
+                                writers.get(deleted_index).unwrap().add_item(wtxn, docid, &vector)?;
+                            }
                         }
-                        index.docid_vector_ids.put(wtxn, &docid, &docid_vector_ids)?;
                     }
 
-                    let mut available_vector_ids =
-                        AvailableDocumentsIds::from_documents_ids(&unavailable_vector_ids);
                     if let Some(value) = vector_deladd_obkv.get(DelAdd::Addition) {
                         let vector = pod_collect_to_vec(value);
-                        let next_vector_id = available_vector_ids.next().unwrap();
-                        writer.add_item(wtxn, next_vector_id, &vector)?;
-                        unavailable_vector_ids.insert(next_vector_id);
-                        index.vector_id_docid.put(wtxn, &next_vector_id, &docid)?;
-                        let mut docid_vector_ids =
-                            index.docid_vector_ids.get(&wtxn, &docid)?.unwrap_or_default();
-                        docid_vector_ids.insert(next_vector_id);
-                        index.docid_vector_ids.put(wtxn, &docid, &docid_vector_ids)?;
+                        /// FIXME: detect overflow
+                        for writer in &writers {
+                            if !writer.contains_item(wtxn, docid)? {
+                                writer.add_item(wtxn, docid, &vector)?;
+                                break;
+                            }
+                        }
                     }
                 }
 
-                log::debug!("There are {} entries in the arroy so far", unavailable_vector_ids.len());
-                index.put_unavailable_vector_ids(wtxn, unavailable_vector_ids)?;
+                log::debug!("There are 🤷‍♀️ entries in the arroy so far");
             }
             TypedChunk::ScriptLanguageDocids(sl_map) => {
                 for (key, (deletion, addition)) in sl_map {
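The repacking dance above exists because lookups like `del_item`, `contains_item` and `item_vector` stop at the first sub-index that has no vector for the document, so a document's vectors must always occupy writers `0..n` without holes. After deleting one of them, the fix is the same idea as `Vec::swap_remove`: pull the vector from the last occupied slot into the hole. A sketch of the invariant on a plain `Vec` (illustrative only, not the patch's code):

```rust
/// Sketch: keep a document's vector slots dense after deleting one of them,
/// mirroring the "vectors are packed in the first writers" invariant.
fn delete_vector_keeping_packing(slots: &mut Vec<Vec<f32>>, target: &[f32]) {
    if let Some(pos) = slots.iter().position(|v| v.as_slice() == target) {
        // The last occupied slot moves into the freed position, so no hole is left behind.
        slots.swap_remove(pos);
    }
}

fn main() {
    let mut slots = vec![vec![0.0, 1.0], vec![2.0, 3.0], vec![4.0, 5.0]];
    delete_vector_keeping_packing(&mut slots, &[0.0, 1.0]);
    assert_eq!(slots, vec![vec![4.0, 5.0], vec![2.0, 3.0]]);
}
```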