finish the arroywrapper

Tamo 2024-09-23 18:56:15 +02:00
parent 6ba4baecbf
commit 1e4d4e69c4
5 changed files with 155 additions and 161 deletions

View File

@@ -1610,24 +1610,6 @@ impl Index {
             .unwrap_or_default())
     }
 
-    pub fn arroy_readers<'a>(
-        &'a self,
-        rtxn: &'a RoTxn<'a>,
-        embedder_id: u8,
-        quantized: bool,
-    ) -> impl Iterator<Item = Result<ArroyWrapper>> + 'a {
-        crate::vector::arroy_db_range_for_embedder(embedder_id).map_while(move |k| {
-            let reader = ArroyWrapper::new(self.vector_arroy, k, quantized);
-            // Here we don't care about the dimensions, but we want to know if we can read
-            // in the database or if its metadata are missing because there is no document with that many vectors.
-            match reader.dimensions(rtxn) {
-                Ok(_) => Some(Ok(reader)),
-                Err(arroy::Error::MissingMetadata(_)) => None,
-                Err(e) => Some(Err(e.into())),
-            }
-        })
-    }
-
     pub(crate) fn put_search_cutoff(&self, wtxn: &mut RwTxn<'_>, cutoff: u64) -> heed::Result<()> {
         self.main.remap_types::<Str, BEU64>().put(wtxn, main_key::SEARCH_CUTOFF, &cutoff)
     }
@@ -1649,14 +1631,9 @@ impl Index {
         let embedding_configs = self.embedding_configs(rtxn)?;
         for config in embedding_configs {
             let embedder_id = self.embedder_category_id.get(rtxn, &config.name)?.unwrap();
-            let embeddings = self
-                .arroy_readers(rtxn, embedder_id, config.config.quantized())
-                .map_while(|reader| {
-                    reader
-                        .and_then(|r| r.item_vector(rtxn, docid).map_err(|e| e.into()))
-                        .transpose()
-                })
-                .collect::<Result<Vec<_>>>()?;
+            let reader =
+                ArroyWrapper::new(self.vector_arroy, embedder_id, config.config.quantized());
+            let embeddings = reader.item_vectors(rtxn, docid)?;
             res.insert(config.name.to_owned(), embeddings);
         }
         Ok(res)
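Restated as plain code, the new right-hand side of this hunk is the whole caller-side story: one ArroyWrapper per embedder, with item_vectors doing the collecting that the removed map_while/collect chain used to do. A sketch only; res, rtxn and docid come from the surrounding method, which is not shown here:

    for config in embedding_configs {
        let embedder_id = self.embedder_category_id.get(rtxn, &config.name)?.unwrap();
        // One wrapper per embedder; it fans out over that embedder's arroy databases itself.
        let reader = ArroyWrapper::new(self.vector_arroy, embedder_id, config.config.quantized());
        // Every vector stored for this document under this embedder, already collected.
        let embeddings = reader.item_vectors(rtxn, docid)?;
        res.insert(config.name.to_owned(), embeddings);
    }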

View File

@@ -1,11 +1,10 @@
 use std::iter::FromIterator;
 
-use ordered_float::OrderedFloat;
 use roaring::RoaringBitmap;
 
 use super::ranking_rules::{RankingRule, RankingRuleOutput, RankingRuleQueryTrait};
 use crate::score_details::{self, ScoreDetails};
-use crate::vector::{DistributionShift, Embedder};
+use crate::vector::{ArroyWrapper, DistributionShift, Embedder};
 use crate::{DocumentId, Result, SearchContext, SearchLogger};
 
 pub struct VectorSort<Q: RankingRuleQueryTrait> {
@@ -53,14 +52,9 @@ impl<Q: RankingRuleQueryTrait> VectorSort<Q> {
         vector_candidates: &RoaringBitmap,
     ) -> Result<()> {
         let target = &self.target;
-        let mut results = Vec::new();
 
-        for reader in ctx.index.arroy_readers(ctx.txn, self.embedder_index, self.quantized) {
-            let nns_by_vector =
-                reader?.nns_by_vector(ctx.txn, target, self.limit, Some(vector_candidates))?;
-            results.extend(nns_by_vector.into_iter());
-        }
-        results.sort_unstable_by_key(|(_, distance)| OrderedFloat(*distance));
+        let reader = ArroyWrapper::new(ctx.index.vector_arroy, self.embedder_index, self.quantized);
+        let results = reader.nns_by_vector(ctx.txn, target, self.limit, Some(vector_candidates))?;
         self.cached_sorted_docids = results.into_iter();
 
         Ok(())
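The dropped OrderedFloat import and the missing sort are not an oversight: the wrapper now gathers and orders the results from all of its internal readers itself, as the ArroyWrapper hunks further down show. A hedged sketch of the call that remains, with every name taken from the hunk above:

    // `target` is the query embedding, `vector_candidates` the documents allowed to match.
    let reader = ArroyWrapper::new(ctx.index.vector_arroy, self.embedder_index, self.quantized);
    // Up to `limit` (docid, distance) pairs, restricted to the candidate set.
    let results = reader.nns_by_vector(ctx.txn, target, self.limit, Some(vector_candidates))?;
    self.cached_sorted_docids = results.into_iter();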

View File

@@ -1,6 +1,5 @@
 use std::sync::Arc;
 
-use ordered_float::OrderedFloat;
 use roaring::RoaringBitmap;
 
 use crate::score_details::{self, ScoreDetails};

View File

@@ -990,27 +990,24 @@ impl<'a, 'i> Transform<'a, 'i> {
             None
         };
 
-        let readers: Result<BTreeMap<&str, (Vec<ArroyWrapper>, &RoaringBitmap)>> = settings_diff
+        let readers: BTreeMap<&str, (ArroyWrapper, &RoaringBitmap)> = settings_diff
             .embedding_config_updates
             .iter()
             .filter_map(|(name, action)| {
                 if let Some(WriteBackToDocuments { embedder_id, user_provided }) =
                     action.write_back()
                 {
-                    let readers: Result<Vec<_>> = self
-                        .index
-                        .arroy_readers(wtxn, *embedder_id, action.was_quantized)
-                        .collect();
-                    match readers {
-                        Ok(readers) => Some(Ok((name.as_str(), (readers, user_provided)))),
-                        Err(error) => Some(Err(error)),
-                    }
+                    let reader = ArroyWrapper::new(
+                        self.index.vector_arroy,
+                        *embedder_id,
+                        action.was_quantized,
+                    );
+                    Some((name.as_str(), (reader, user_provided)))
                 } else {
                     None
                 }
             })
             .collect();
-        let readers = readers?;
 
         let old_vectors_fid = settings_diff
             .old
@@ -1048,34 +1045,24 @@ impl<'a, 'i> Transform<'a, 'i> {
                     arroy::Error,
                 > = readers
                     .iter()
-                    .filter_map(|(name, (readers, user_provided))| {
+                    .filter_map(|(name, (reader, user_provided))| {
                         if !user_provided.contains(docid) {
                             return None;
                         }
-                        let mut vectors = Vec::new();
-                        for reader in readers {
-                            let Some(vector) = reader.item_vector(wtxn, docid).transpose() else {
-                                break;
-                            };
-                            match vector {
-                                Ok(vector) => vectors.push(vector),
-                                Err(error) => return Some(Err(error)),
-                            }
-                        }
-                        if vectors.is_empty() {
-                            return None;
-                        }
-                        Some(Ok((
+                        match reader.item_vectors(wtxn, docid) {
+                            Ok(vectors) if vectors.is_empty() => None,
+                            Ok(vectors) => Some(Ok((
                                 name.to_string(),
                                 serde_json::to_value(ExplicitVectors {
-                                embeddings: Some(VectorOrArrayOfVectors::from_array_of_vectors(
-                                    vectors,
-                                )),
+                                    embeddings: Some(
+                                        VectorOrArrayOfVectors::from_array_of_vectors(vectors),
+                                    ),
                                     regenerate: false,
                                 })
                                 .unwrap(),
-                        )))
+                            ))),
+                            Err(e) => Some(Err(e)),
+                        }
                     })
                     .collect();
@@ -1104,12 +1091,10 @@ impl<'a, 'i> Transform<'a, 'i> {
         }
 
         // delete all vectors from the embedders that need removal
-        for (_, (readers, _)) in readers {
-            for reader in readers {
-                let dimensions = reader.dimensions(wtxn)?;
-                reader.clear(wtxn, dimensions)?;
-            }
+        for (_, (reader, _)) in readers {
+            let dimensions = reader.dimensions(wtxn)?;
+            reader.clear(wtxn, dimensions)?;
         }
 
         let grenad_params = GrenadParameters {
             chunk_compression_type: self.indexer_settings.chunk_compression_type,

View File

@@ -45,6 +45,20 @@ impl ArroyWrapper {
         self.index
     }
 
+    fn readers<'a, D: arroy::Distance>(
+        &'a self,
+        rtxn: &'a RoTxn<'a>,
+        db: arroy::Database<D>,
+    ) -> impl Iterator<Item = Result<arroy::Reader<D>, arroy::Error>> + 'a {
+        arroy_db_range_for_embedder(self.index).map_while(move |index| {
+            match arroy::Reader::open(rtxn, index, db) {
+                Ok(reader) => Some(Ok(reader)),
+                Err(arroy::Error::MissingMetadata(_)) => None,
+                Err(e) => Some(Err(e)),
+            }
+        })
+    }
+
     pub fn dimensions(&self, rtxn: &RoTxn) -> Result<usize, arroy::Error> {
         let first_id = arroy_db_range_for_embedder(self.index).next().unwrap();
         if self.quantized {
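This readers() helper is what keeps every multi-vector operation below short: an embedder owns a contiguous range of arroy indexes (arroy_db_range_for_embedder), vectors are packed into the first writers of that range, and the iterator stops at the first index whose metadata is missing. A hypothetical extra method, written only to show how the helper and the quantized/angular split used throughout this file are meant to be combined; it is not part of the commit:

    /// Hypothetical: does any of this embedder's arroy indexes store a vector for the item?
    pub fn contains_vector_for(&self, rtxn: &RoTxn, item_id: arroy::ItemId) -> Result<bool, arroy::Error> {
        if self.quantized {
            self._contains_vector_for(rtxn, self.quantized_db(), item_id)
        } else {
            self._contains_vector_for(rtxn, self.angular_db(), item_id)
        }
    }

    fn _contains_vector_for<D: arroy::Distance>(
        &self,
        rtxn: &RoTxn,
        db: arroy::Database<D>,
        item_id: arroy::ItemId,
    ) -> Result<bool, arroy::Error> {
        for reader in self.readers(rtxn, db) {
            // readers() stops at the first unbuilt index, so any hit can be reported immediately.
            if reader?.item_vector(rtxn, item_id)?.is_some() {
                return Ok(true);
            }
        }
        Ok(false)
    }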
@@ -97,6 +111,7 @@ impl ArroyWrapper {
         Ok(())
     }
 
+    /// Overwrite all the embeddings associated to the index and item id.
     pub fn add_items(
         &self,
         wtxn: &mut RwTxn,
@@ -116,30 +131,41 @@ impl ArroyWrapper {
         Ok(())
     }
 
+    /// Add one document int for this index where we can find an empty spot.
     pub fn add_item(
         &self,
         wtxn: &mut RwTxn,
         item_id: arroy::ItemId,
         vector: &[f32],
+    ) -> Result<(), arroy::Error> {
+        if self.quantized {
+            self._add_item(wtxn, self.quantized_db(), item_id, vector)
+        } else {
+            self._add_item(wtxn, self.angular_db(), item_id, vector)
+        }
+    }
+
+    fn _add_item<D: arroy::Distance>(
+        &self,
+        wtxn: &mut RwTxn,
+        db: arroy::Database<D>,
+        item_id: arroy::ItemId,
+        vector: &[f32],
     ) -> Result<(), arroy::Error> {
         let dimension = vector.len();
 
         for index in arroy_db_range_for_embedder(self.index) {
-            if self.quantized {
-                let writer = arroy::Writer::new(self.quantized_db(), index, dimension);
-                if !writer.contains_item(wtxn, item_id)? {
-                    writer.add_item(wtxn, item_id, &vector)?;
-                    break;
-                }
-            } else {
-                arroy::Writer::new(self.angular_db(), index, dimension)
-                    .add_item(wtxn, item_id, vector)?
-            }
+            let writer = arroy::Writer::new(db, index, dimension);
+            if !writer.contains_item(wtxn, item_id)? {
+                writer.add_item(wtxn, item_id, vector)?;
+                break;
+            }
         }
         Ok(())
     }
 
+    /// Delete an item from the index. It **does not** take care of fixing the hole
+    /// made after deleting the item.
     pub fn del_item_raw(
         &self,
         wtxn: &mut RwTxn,
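Unlike add_items above, add_item searches the embedder's writers for a free slot, which is what makes several vectors per document possible in the first place. A hedged usage sketch; the wrapper, the transactions and the embeddings are assumptions, and the arroy build/commit steps are elided:

    // Two calls with the same item id land in two consecutive arroy writers:
    // the first writer already contains item 0, so the second call moves on to the next one.
    wrapper.add_item(&mut wtxn, 0, &first_embedding)?;
    wrapper.add_item(&mut wtxn, 0, &second_embedding)?;

    // item_vectors (end of this file) then walks the readers in order and returns both.
    let both = wrapper.item_vectors(&rtxn, 0)?;
    assert_eq!(both.len(), 2);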
@@ -163,71 +189,58 @@ impl ArroyWrapper {
         Ok(false)
     }
 
+    /// Delete one item.
     pub fn del_item(
         &self,
         wtxn: &mut RwTxn,
-        itemid: arroy::ItemId,
+        item_id: arroy::ItemId,
+        vector: &[f32],
+    ) -> Result<bool, arroy::Error> {
+        if self.quantized {
+            self._del_item(wtxn, self.quantized_db(), item_id, vector)
+        } else {
+            self._del_item(wtxn, self.angular_db(), item_id, vector)
+        }
+    }
+
+    fn _del_item<D: arroy::Distance>(
+        &self,
+        wtxn: &mut RwTxn,
+        db: arroy::Database<D>,
+        item_id: arroy::ItemId,
         vector: &[f32],
     ) -> Result<bool, arroy::Error> {
         let dimension = vector.len();
         let mut deleted_index = None;
 
         for index in arroy_db_range_for_embedder(self.index) {
-            if self.quantized {
-                let writer = arroy::Writer::new(self.quantized_db(), index, dimension);
-                let Some(candidate) = writer.item_vector(wtxn, itemid)? else {
-                    // uses invariant: vectors are packed in the first writers.
-                    break;
-                };
-                if candidate == vector {
-                    writer.del_item(wtxn, itemid)?;
-                    deleted_index = Some(index);
-                }
-            } else {
-                let writer = arroy::Writer::new(self.angular_db(), index, dimension);
-                let Some(candidate) = writer.item_vector(wtxn, itemid)? else {
-                    // uses invariant: vectors are packed in the first writers.
-                    break;
-                };
-                if candidate == vector {
-                    writer.del_item(wtxn, itemid)?;
-                    deleted_index = Some(index);
-                }
-            }
+            let writer = arroy::Writer::new(db, index, dimension);
+            let Some(candidate) = writer.item_vector(wtxn, item_id)? else {
+                // uses invariant: vectors are packed in the first writers.
+                break;
+            };
+            if candidate == vector {
+                writer.del_item(wtxn, item_id)?;
+                deleted_index = Some(index);
+            }
         }
 
         // 🥲 enforce invariant: vectors are packed in the first writers.
         if let Some(deleted_index) = deleted_index {
             let mut last_index_with_a_vector = None;
             for index in arroy_db_range_for_embedder(self.index).skip(deleted_index as usize) {
-                if self.quantized {
-                    let writer = arroy::Writer::new(self.quantized_db(), index, dimension);
-                    let Some(candidate) = writer.item_vector(wtxn, itemid)? else {
-                        break;
-                    };
-                    last_index_with_a_vector = Some((index, candidate));
-                } else {
-                    let writer = arroy::Writer::new(self.angular_db(), index, dimension);
-                    let Some(candidate) = writer.item_vector(wtxn, itemid)? else {
-                        break;
-                    };
-                    last_index_with_a_vector = Some((index, candidate));
-                }
+                let writer = arroy::Writer::new(db, index, dimension);
+                let Some(candidate) = writer.item_vector(wtxn, item_id)? else {
+                    break;
+                };
+                last_index_with_a_vector = Some((index, candidate));
             }
             if let Some((last_index, vector)) = last_index_with_a_vector {
-                if self.quantized {
-                    // unwrap: computed the index from the list of writers
-                    let writer = arroy::Writer::new(self.quantized_db(), last_index, dimension);
-                    writer.del_item(wtxn, itemid)?;
-                    let writer = arroy::Writer::new(self.quantized_db(), deleted_index, dimension);
-                    writer.add_item(wtxn, itemid, &vector)?;
-                } else {
-                    // unwrap: computed the index from the list of writers
-                    let writer = arroy::Writer::new(self.angular_db(), last_index, dimension);
-                    writer.del_item(wtxn, itemid)?;
-                    let writer = arroy::Writer::new(self.angular_db(), deleted_index, dimension);
-                    writer.add_item(wtxn, itemid, &vector)?;
-                }
+                // unwrap: computed the index from the list of writers
+                let writer = arroy::Writer::new(db, last_index, dimension);
+                writer.del_item(wtxn, item_id)?;
+                let writer = arroy::Writer::new(db, deleted_index, dimension);
+                writer.add_item(wtxn, item_id, &vector)?;
             }
         }
         Ok(deleted_index.is_some())
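The second half of _del_item exists only to preserve the invariant named in its comments: after a vector is removed from the middle of the embedder's range, the last vector found past the freed slot is deleted there and re-added at the freed index, so readers can keep stopping at the first empty writer. The boolean it returns simply reports whether a matching vector was found. Continuing the hedged sketch from the add_item note above, same assumed names:

    // del_item only removes a vector equal to the one passed in, and says whether it found one.
    assert!(wrapper.del_item(&mut wtxn, 0, &second_embedding)?);
    // That vector is gone now, so a second attempt reports false.
    assert!(!wrapper.del_item(&mut wtxn, 0, &second_embedding)?);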
@@ -284,17 +297,26 @@ impl ArroyWrapper {
         item: ItemId,
         limit: usize,
         filter: Option<&RoaringBitmap>,
+    ) -> Result<Vec<(ItemId, f32)>, arroy::Error> {
+        if self.quantized {
+            self._nns_by_item(rtxn, self.quantized_db(), item, limit, filter)
+        } else {
+            self._nns_by_item(rtxn, self.angular_db(), item, limit, filter)
+        }
+    }
+
+    fn _nns_by_item<D: arroy::Distance>(
+        &self,
+        rtxn: &RoTxn,
+        db: arroy::Database<D>,
+        item: ItemId,
+        limit: usize,
+        filter: Option<&RoaringBitmap>,
     ) -> Result<Vec<(ItemId, f32)>, arroy::Error> {
         let mut results = Vec::new();
 
-        for index in arroy_db_range_for_embedder(self.index) {
-            let ret = if self.quantized {
-                arroy::Reader::open(rtxn, index, self.quantized_db())?
-                    .nns_by_item(rtxn, item, limit, None, None, filter)?
-            } else {
-                arroy::Reader::open(rtxn, index, self.angular_db())?
-                    .nns_by_item(rtxn, item, limit, None, None, filter)?
-            };
+        for reader in self.readers(rtxn, db) {
+            let ret = reader?.nns_by_item(rtxn, item, limit, None, None, filter)?;
             if let Some(mut ret) = ret {
                 results.append(&mut ret);
             } else {
@@ -302,27 +324,35 @@ impl ArroyWrapper {
             }
         }
         results.sort_unstable_by_key(|(_, distance)| OrderedFloat(*distance));
         Ok(results)
     }
 
     pub fn nns_by_vector(
         &self,
-        txn: &RoTxn,
-        item: &[f32],
+        rtxn: &RoTxn,
+        vector: &[f32],
+        limit: usize,
+        filter: Option<&RoaringBitmap>,
+    ) -> Result<Vec<(ItemId, f32)>, arroy::Error> {
+        if self.quantized {
+            self._nns_by_vector(rtxn, self.quantized_db(), vector, limit, filter)
+        } else {
+            self._nns_by_vector(rtxn, self.angular_db(), vector, limit, filter)
+        }
+    }
+
+    fn _nns_by_vector<D: arroy::Distance>(
+        &self,
+        rtxn: &RoTxn,
+        db: arroy::Database<D>,
+        vector: &[f32],
         limit: usize,
         filter: Option<&RoaringBitmap>,
     ) -> Result<Vec<(ItemId, f32)>, arroy::Error> {
         let mut results = Vec::new();
 
-        for index in arroy_db_range_for_embedder(self.index) {
-            let mut ret = if self.quantized {
-                arroy::Reader::open(txn, index, self.quantized_db())?
-                    .nns_by_vector(txn, item, limit, None, None, filter)?
-            } else {
-                arroy::Reader::open(txn, index, self.angular_db())?
-                    .nns_by_vector(txn, item, limit, None, None, filter)?
-            };
+        for reader in self.readers(rtxn, db) {
+            let mut ret = reader?.nns_by_vector(rtxn, vector, limit, None, None, filter)?;
             results.append(&mut ret);
         }
@@ -331,18 +361,27 @@ impl ArroyWrapper {
         Ok(results)
     }
 
-    pub fn item_vector(&self, rtxn: &RoTxn, docid: u32) -> Result<Option<Vec<f32>>, arroy::Error> {
-        for index in arroy_db_range_for_embedder(self.index) {
-            let ret = if self.quantized {
-                arroy::Reader::open(rtxn, index, self.quantized_db())?.item_vector(rtxn, docid)?
-            } else {
-                arroy::Reader::open(rtxn, index, self.angular_db())?.item_vector(rtxn, docid)?
-            };
-            if ret.is_some() {
-                return Ok(ret);
-            }
-        }
-        Ok(None)
+    pub fn item_vectors(&self, rtxn: &RoTxn, item_id: u32) -> Result<Vec<Vec<f32>>, arroy::Error> {
+        let mut vectors = Vec::new();
+
+        if self.quantized {
+            for reader in self.readers(rtxn, self.quantized_db()) {
+                if let Some(vec) = reader?.item_vector(rtxn, item_id)? {
+                    vectors.push(vec);
+                } else {
+                    break;
+                }
+            }
+        } else {
+            for reader in self.readers(rtxn, self.angular_db()) {
+                if let Some(vec) = reader?.item_vector(rtxn, item_id)? {
+                    vectors.push(vec);
+                } else {
+                    break;
+                }
+            }
+        }
+        Ok(vectors)
     }
 
     fn angular_db(&self) -> arroy::Database<Angular> {