first ugly step

This commit is contained in:
Tamo 2024-09-23 15:15:26 +02:00
parent afa3ae0cbd
commit 6ba4baecbf
3 changed files with 203 additions and 100 deletions

View File

@ -4,7 +4,7 @@ use ordered_float::OrderedFloat;
use roaring::RoaringBitmap; use roaring::RoaringBitmap;
use crate::score_details::{self, ScoreDetails}; use crate::score_details::{self, ScoreDetails};
use crate::vector::Embedder; use crate::vector::{ArroyWrapper, Embedder};
use crate::{filtered_universe, DocumentId, Filter, Index, Result, SearchResult}; use crate::{filtered_universe, DocumentId, Filter, Index, Result, SearchResult};
pub struct Similar<'a> { pub struct Similar<'a> {
@ -71,23 +71,13 @@ impl<'a> Similar<'a> {
.get(self.rtxn, &self.embedder_name)? .get(self.rtxn, &self.embedder_name)?
.ok_or_else(|| crate::UserError::InvalidEmbedder(self.embedder_name.to_owned()))?; .ok_or_else(|| crate::UserError::InvalidEmbedder(self.embedder_name.to_owned()))?;
let mut results = Vec::new(); let reader = ArroyWrapper::new(self.index.vector_arroy, embedder_index, self.quantized);
let results = reader.nns_by_item(
for reader in self.index.arroy_readers(self.rtxn, embedder_index, self.quantized) {
let nns_by_item = reader?.nns_by_item(
self.rtxn, self.rtxn,
self.id, self.id,
self.limit + self.offset + 1, self.limit + self.offset + 1,
Some(&universe), Some(&universe),
)?; )?;
if let Some(mut nns_by_item) = nns_by_item {
results.append(&mut nns_by_item);
} else {
break;
}
}
results.sort_unstable_by_key(|(_, distance)| OrderedFloat(*distance));
let mut documents_ids = Vec::with_capacity(self.limit); let mut documents_ids = Vec::with_capacity(self.limit);
let mut document_scores = Vec::with_capacity(self.limit); let mut document_scores = Vec::with_capacity(self.limit);

View File

@ -680,7 +680,7 @@ pub(crate) fn write_typed_chunk_into_index(
let mut iter = merger.into_stream_merger_iter()?; let mut iter = merger.into_stream_merger_iter()?;
while let Some((key, _)) = iter.next()? { while let Some((key, _)) = iter.next()? {
let docid = key.try_into().map(DocumentId::from_be_bytes).unwrap(); let docid = key.try_into().map(DocumentId::from_be_bytes).unwrap();
writer.del_item(wtxn, expected_dimension, docid)?; writer.del_item_raw(wtxn, expected_dimension, docid)?;
} }
// add generated embeddings // add generated embeddings
@ -708,7 +708,7 @@ pub(crate) fn write_typed_chunk_into_index(
embeddings.embedding_count(), embeddings.embedding_count(),
))); )));
} }
writer.add_items(wtxn, expected_dimension, docid, embeddings)?; writer.add_items(wtxn, docid, &embeddings)?;
} }
// perform the manual diff // perform the manual diff
@ -723,51 +723,14 @@ pub(crate) fn write_typed_chunk_into_index(
if let Some(value) = vector_deladd_obkv.get(DelAdd::Deletion) { if let Some(value) = vector_deladd_obkv.get(DelAdd::Deletion) {
let vector: Vec<f32> = pod_collect_to_vec(value); let vector: Vec<f32> = pod_collect_to_vec(value);
let mut deleted_index = None; writer.del_item(wtxn, docid, &vector)?;
for (index, writer) in writers.iter().enumerate() {
let Some(candidate) = writer.item_vector(wtxn, docid)? else {
// uses invariant: vectors are packed in the first writers.
break;
};
if candidate == vector {
writer.del_item(wtxn, expected_dimension, docid)?;
deleted_index = Some(index);
}
}
// 🥲 enforce invariant: vectors are packed in the first writers.
if let Some(deleted_index) = deleted_index {
let mut last_index_with_a_vector = None;
for (index, writer) in writers.iter().enumerate().skip(deleted_index) {
let Some(candidate) = writer.item_vector(wtxn, docid)? else {
break;
};
last_index_with_a_vector = Some((index, candidate));
}
if let Some((last_index, vector)) = last_index_with_a_vector {
// unwrap: computed the index from the list of writers
let writer = writers.get(last_index).unwrap();
writer.del_item(wtxn, expected_dimension, docid)?;
writers.get(deleted_index).unwrap().add_item(
wtxn,
expected_dimension,
docid,
&vector,
)?;
}
}
} }
if let Some(value) = vector_deladd_obkv.get(DelAdd::Addition) { if let Some(value) = vector_deladd_obkv.get(DelAdd::Addition) {
let vector = pod_collect_to_vec(value); let vector = pod_collect_to_vec(value);
// overflow was detected during vector extraction. // overflow was detected during vector extraction.
for writer in &writers { writer.add_item(wtxn, docid, &vector)?;
if !writer.contains_item(wtxn, expected_dimension, docid)? {
writer.add_item(wtxn, expected_dimension, docid, &vector)?;
break;
}
}
} }
} }

View File

@ -97,50 +97,166 @@ impl ArroyWrapper {
Ok(()) Ok(())
} }
/// Stores every embedding of `embeddings` for `item_id`, writing the i-th
/// embedding into the i-th arroy sub-database of this embedder.
///
/// Chooses the quantized or angular database depending on `self.quantized`.
/// The dimension is taken from the embeddings themselves.
pub fn add_items(
    &self,
    wtxn: &mut RwTxn,
    item_id: arroy::ItemId,
    embeddings: &Embeddings<f32>,
) -> Result<(), arroy::Error> {
    let dimension = embeddings.dimension();
    // Pair each embedding with its target sub-database index.
    let db_indexes = arroy_db_range_for_embedder(self.index);
    for (db_index, embedding) in db_indexes.zip(embeddings.iter()) {
        if self.quantized {
            let writer = arroy::Writer::new(self.quantized_db(), db_index, dimension);
            writer.add_item(wtxn, item_id, embedding)?;
        } else {
            let writer = arroy::Writer::new(self.angular_db(), db_index, dimension);
            writer.add_item(wtxn, item_id, embedding)?;
        }
    }
    Ok(())
}
pub fn add_item( pub fn add_item(
&self, &self,
wtxn: &mut RwTxn, wtxn: &mut RwTxn,
dimension: usize,
item_id: arroy::ItemId, item_id: arroy::ItemId,
vector: &[f32], vector: &[f32],
) -> Result<(), arroy::Error> { ) -> Result<(), arroy::Error> {
let dimension = vector.len();
for index in arroy_db_range_for_embedder(self.index) {
if self.quantized { if self.quantized {
arroy::Writer::new(self.quantized_db(), self.index, dimension) let writer = arroy::Writer::new(self.quantized_db(), index, dimension);
.add_item(wtxn, item_id, vector) if !writer.contains_item(wtxn, item_id)? {
writer.add_item(wtxn, item_id, &vector)?;
break;
}
} else { } else {
arroy::Writer::new(self.angular_db(), self.index, dimension) arroy::Writer::new(self.angular_db(), index, dimension)
.add_item(wtxn, item_id, vector) .add_item(wtxn, item_id, vector)?
} }
} }
pub fn del_item( Ok(())
}
pub fn del_item_raw(
&self, &self,
wtxn: &mut RwTxn, wtxn: &mut RwTxn,
dimension: usize, dimension: usize,
item_id: arroy::ItemId, item_id: arroy::ItemId,
) -> Result<bool, arroy::Error> { ) -> Result<bool, arroy::Error> {
for index in arroy_db_range_for_embedder(self.index) {
if self.quantized { if self.quantized {
arroy::Writer::new(self.quantized_db(), self.index, dimension).del_item(wtxn, item_id) let writer = arroy::Writer::new(self.quantized_db(), index, dimension);
} else { if writer.del_item(wtxn, item_id)? {
arroy::Writer::new(self.angular_db(), self.index, dimension).del_item(wtxn, item_id) return Ok(true);
} }
} else {
let writer = arroy::Writer::new(self.angular_db(), index, dimension);
if writer.del_item(wtxn, item_id)? {
return Ok(true);
}
}
}
Ok(false)
}
/// Deletes the vector equal to `vector` stored for `itemid` in one of this
/// embedder's arroy sub-databases, then restores the packing invariant
/// (vectors of an item occupy a contiguous prefix of the sub-databases).
///
/// Returns `Ok(true)` iff a matching vector was found and deleted.
///
/// NOTE(review): equality of `f32` slices is used to locate the vector —
/// presumably callers pass back exactly the bytes previously stored; confirm
/// no re-normalization happens in between.
pub fn del_item(
&self,
wtxn: &mut RwTxn,
itemid: arroy::ItemId,
vector: &[f32],
) -> Result<bool, arroy::Error> {
let dimension = vector.len();
let mut deleted_index = None;
// First pass: scan sub-databases in order, delete the one whose stored
// vector for `itemid` equals `vector`, and remember its index.
for index in arroy_db_range_for_embedder(self.index) {
if self.quantized {
let writer = arroy::Writer::new(self.quantized_db(), index, dimension);
let Some(candidate) = writer.item_vector(wtxn, itemid)? else {
// uses invariant: vectors are packed in the first writers.
break;
};
if candidate == vector {
writer.del_item(wtxn, itemid)?;
deleted_index = Some(index);
}
} else {
let writer = arroy::Writer::new(self.angular_db(), index, dimension);
let Some(candidate) = writer.item_vector(wtxn, itemid)? else {
// uses invariant: vectors are packed in the first writers.
break;
};
if candidate == vector {
writer.del_item(wtxn, itemid)?;
deleted_index = Some(index);
}
}
}
// 🥲 enforce invariant: vectors are packed in the first writers.
// Second pass: find the LAST sub-database (at or after the freed slot)
// that still holds a vector for `itemid`...
if let Some(deleted_index) = deleted_index {
let mut last_index_with_a_vector = None;
for index in arroy_db_range_for_embedder(self.index).skip(deleted_index as usize) {
if self.quantized {
let writer = arroy::Writer::new(self.quantized_db(), index, dimension);
let Some(candidate) = writer.item_vector(wtxn, itemid)? else {
break;
};
last_index_with_a_vector = Some((index, candidate));
} else {
let writer = arroy::Writer::new(self.angular_db(), index, dimension);
let Some(candidate) = writer.item_vector(wtxn, itemid)? else {
break;
};
last_index_with_a_vector = Some((index, candidate));
}
}
// ...and move that trailing vector into the slot freed by the deletion,
// so the occupied sub-databases form a contiguous prefix again.
if let Some((last_index, vector)) = last_index_with_a_vector {
if self.quantized {
// unwrap: computed the index from the list of writers
let writer = arroy::Writer::new(self.quantized_db(), last_index, dimension);
writer.del_item(wtxn, itemid)?;
let writer = arroy::Writer::new(self.quantized_db(), deleted_index, dimension);
writer.add_item(wtxn, itemid, &vector)?;
} else {
// unwrap: computed the index from the list of writers
let writer = arroy::Writer::new(self.angular_db(), last_index, dimension);
writer.del_item(wtxn, itemid)?;
let writer = arroy::Writer::new(self.angular_db(), deleted_index, dimension);
writer.add_item(wtxn, itemid, &vector)?;
}
}
}
Ok(deleted_index.is_some())
} }
pub fn clear(&self, wtxn: &mut RwTxn, dimension: usize) -> Result<(), arroy::Error> { pub fn clear(&self, wtxn: &mut RwTxn, dimension: usize) -> Result<(), arroy::Error> {
for index in arroy_db_range_for_embedder(self.index) {
if self.quantized { if self.quantized {
arroy::Writer::new(self.quantized_db(), self.index, dimension).clear(wtxn) arroy::Writer::new(self.quantized_db(), index, dimension).clear(wtxn)?;
} else { } else {
arroy::Writer::new(self.angular_db(), self.index, dimension).clear(wtxn) arroy::Writer::new(self.angular_db(), index, dimension).clear(wtxn)?;
} }
} }
Ok(())
}
pub fn is_empty(&self, rtxn: &RoTxn, dimension: usize) -> Result<bool, arroy::Error> { pub fn is_empty(&self, rtxn: &RoTxn, dimension: usize) -> Result<bool, arroy::Error> {
if self.quantized { for index in arroy_db_range_for_embedder(self.index) {
arroy::Writer::new(self.quantized_db(), self.index, dimension).is_empty(rtxn) let empty = if self.quantized {
arroy::Writer::new(self.quantized_db(), index, dimension).is_empty(rtxn)?
} else { } else {
arroy::Writer::new(self.angular_db(), self.index, dimension).is_empty(rtxn) arroy::Writer::new(self.angular_db(), index, dimension).is_empty(rtxn)?
};
if !empty {
return Ok(false);
} }
} }
Ok(true)
}
pub fn contains_item( pub fn contains_item(
&self, &self,
@ -148,12 +264,19 @@ impl ArroyWrapper {
dimension: usize, dimension: usize,
item: arroy::ItemId, item: arroy::ItemId,
) -> Result<bool, arroy::Error> { ) -> Result<bool, arroy::Error> {
if self.quantized { for index in arroy_db_range_for_embedder(self.index) {
arroy::Writer::new(self.quantized_db(), self.index, dimension).contains_item(rtxn, item) let contains = if self.quantized {
arroy::Writer::new(self.quantized_db(), index, dimension)
.contains_item(rtxn, item)?
} else { } else {
arroy::Writer::new(self.angular_db(), self.index, dimension).contains_item(rtxn, item) arroy::Writer::new(self.angular_db(), index, dimension).contains_item(rtxn, item)?
};
if contains {
return Ok(contains);
} }
} }
Ok(false)
}
pub fn nns_by_item( pub fn nns_by_item(
&self, &self,
@ -161,15 +284,27 @@ impl ArroyWrapper {
item: ItemId, item: ItemId,
limit: usize, limit: usize,
filter: Option<&RoaringBitmap>, filter: Option<&RoaringBitmap>,
) -> Result<Option<Vec<(ItemId, f32)>>, arroy::Error> { ) -> Result<Vec<(ItemId, f32)>, arroy::Error> {
if self.quantized { let mut results = Vec::new();
arroy::Reader::open(rtxn, self.index, self.quantized_db())?
.nns_by_item(rtxn, item, limit, None, None, filter) for index in arroy_db_range_for_embedder(self.index) {
let ret = if self.quantized {
arroy::Reader::open(rtxn, index, self.quantized_db())?
.nns_by_item(rtxn, item, limit, None, None, filter)?
} else { } else {
arroy::Reader::open(rtxn, self.index, self.angular_db())? arroy::Reader::open(rtxn, index, self.angular_db())?
.nns_by_item(rtxn, item, limit, None, None, filter) .nns_by_item(rtxn, item, limit, None, None, filter)?
};
if let Some(mut ret) = ret {
results.append(&mut ret);
} else {
break;
} }
} }
results.sort_unstable_by_key(|(_, distance)| OrderedFloat(*distance));
Ok(results)
}
pub fn nns_by_vector( pub fn nns_by_vector(
&self, &self,
@ -178,22 +313,37 @@ impl ArroyWrapper {
limit: usize, limit: usize,
filter: Option<&RoaringBitmap>, filter: Option<&RoaringBitmap>,
) -> Result<Vec<(ItemId, f32)>, arroy::Error> { ) -> Result<Vec<(ItemId, f32)>, arroy::Error> {
if self.quantized { let mut results = Vec::new();
arroy::Reader::open(txn, self.index, self.quantized_db())?
.nns_by_vector(txn, item, limit, None, None, filter) for index in arroy_db_range_for_embedder(self.index) {
let mut ret = if self.quantized {
arroy::Reader::open(txn, index, self.quantized_db())?
.nns_by_vector(txn, item, limit, None, None, filter)?
} else { } else {
arroy::Reader::open(txn, self.index, self.angular_db())? arroy::Reader::open(txn, index, self.angular_db())?
.nns_by_vector(txn, item, limit, None, None, filter) .nns_by_vector(txn, item, limit, None, None, filter)?
};
results.append(&mut ret);
} }
results.sort_unstable_by_key(|(_, distance)| OrderedFloat(*distance));
Ok(results)
} }
pub fn item_vector(&self, rtxn: &RoTxn, docid: u32) -> Result<Option<Vec<f32>>, arroy::Error> { pub fn item_vector(&self, rtxn: &RoTxn, docid: u32) -> Result<Option<Vec<f32>>, arroy::Error> {
if self.quantized { for index in arroy_db_range_for_embedder(self.index) {
arroy::Reader::open(rtxn, self.index, self.quantized_db())?.item_vector(rtxn, docid) let ret = if self.quantized {
arroy::Reader::open(rtxn, index, self.quantized_db())?.item_vector(rtxn, docid)?
} else { } else {
arroy::Reader::open(rtxn, self.index, self.angular_db())?.item_vector(rtxn, docid) arroy::Reader::open(rtxn, index, self.angular_db())?.item_vector(rtxn, docid)?
};
if ret.is_some() {
return Ok(ret);
} }
} }
Ok(None)
}
fn angular_db(&self) -> arroy::Database<Angular> { fn angular_db(&self) -> arroy::Database<Angular> {
self.database.remap_data_type() self.database.remap_data_type()