This commit is contained in:
Tamo 2024-09-19 17:42:52 +02:00
parent f6483cf15d
commit afa3ae0cbd
3 changed files with 38 additions and 46 deletions

View File

@ -689,9 +689,8 @@ where
key: None, key: None,
}, },
)?; )?;
let first_id = crate::vector::arroy_db_range_for_embedder(index).next().unwrap();
let reader = let reader =
ArroyWrapper::new(self.index.vector_arroy, first_id, action.was_quantized); ArroyWrapper::new(self.index.vector_arroy, index, action.was_quantized);
let dim = reader.dimensions(self.wtxn)?; let dim = reader.dimensions(self.wtxn)?;
dimension.insert(name.to_string(), dim); dimension.insert(name.to_string(), dim);
} }
@ -713,17 +712,11 @@ where
let is_quantizing = embedder_config.map_or(false, |action| action.is_being_quantized); let is_quantizing = embedder_config.map_or(false, |action| action.is_being_quantized);
pool.install(|| { pool.install(|| {
for k in crate::vector::arroy_db_range_for_embedder(embedder_index) { let mut writer = ArroyWrapper::new(vector_arroy, embedder_index, was_quantized);
let mut writer = ArroyWrapper::new(vector_arroy, k, was_quantized); if is_quantizing {
if is_quantizing { writer.quantize(wtxn, dimension)?;
writer.quantize(wtxn, k, dimension)?;
}
if writer.need_build(wtxn, dimension)? {
writer.build(wtxn, &mut rng, dimension)?;
} else if writer.is_empty(wtxn, dimension)? {
break;
}
} }
writer.build(wtxn, &mut rng, dimension)?;
Result::Ok(()) Result::Ok(())
}) })
.map_err(InternalError::from)??; .map_err(InternalError::from)??;

View File

@ -673,22 +673,14 @@ pub(crate) fn write_typed_chunk_into_index(
.get(&embedder_name) .get(&embedder_name)
.map_or(false, |conf| conf.2); .map_or(false, |conf| conf.2);
// FIXME: allow customizing distance // FIXME: allow customizing distance
let writers: Vec<_> = crate::vector::arroy_db_range_for_embedder(embedder_index) let writer = ArroyWrapper::new(index.vector_arroy, embedder_index, binary_quantized);
.map(|k| ArroyWrapper::new(index.vector_arroy, k, binary_quantized))
.collect();
// remove vectors for docids we want them removed // remove vectors for docids we want them removed
let merger = remove_vectors_builder.build(); let merger = remove_vectors_builder.build();
let mut iter = merger.into_stream_merger_iter()?; let mut iter = merger.into_stream_merger_iter()?;
while let Some((key, _)) = iter.next()? { while let Some((key, _)) = iter.next()? {
let docid = key.try_into().map(DocumentId::from_be_bytes).unwrap(); let docid = key.try_into().map(DocumentId::from_be_bytes).unwrap();
writer.del_item(wtxn, expected_dimension, docid)?;
for writer in &writers {
// Uses invariant: vectors are packed in the first writers.
if !writer.del_item(wtxn, expected_dimension, docid)? {
break;
}
}
} }
// add generated embeddings // add generated embeddings
@ -716,9 +708,7 @@ pub(crate) fn write_typed_chunk_into_index(
embeddings.embedding_count(), embeddings.embedding_count(),
))); )));
} }
for (embedding, writer) in embeddings.iter().zip(&writers) { writer.add_items(wtxn, expected_dimension, docid, embeddings)?;
writer.add_item(wtxn, expected_dimension, docid, embedding)?;
}
} }
// perform the manual diff // perform the manual diff

View File

@ -32,60 +32,69 @@ pub const REQUEST_PARALLELISM: usize = 40;
pub struct ArroyWrapper { pub struct ArroyWrapper {
quantized: bool, quantized: bool,
index: u16, index: u8,
database: arroy::Database<Unspecified>, database: arroy::Database<Unspecified>,
} }
impl ArroyWrapper { impl ArroyWrapper {
pub fn new(database: arroy::Database<Unspecified>, index: u16, quantized: bool) -> Self { pub fn new(database: arroy::Database<Unspecified>, index: u8, quantized: bool) -> Self {
Self { database, index, quantized } Self { database, index, quantized }
} }
pub fn index(&self) -> u16 { pub fn index(&self) -> u8 {
self.index self.index
} }
pub fn dimensions(&self, rtxn: &RoTxn) -> Result<usize, arroy::Error> { pub fn dimensions(&self, rtxn: &RoTxn) -> Result<usize, arroy::Error> {
let first_id = arroy_db_range_for_embedder(self.index).next().unwrap();
if self.quantized { if self.quantized {
Ok(arroy::Reader::open(rtxn, self.index, self.quantized_db())?.dimensions()) Ok(arroy::Reader::open(rtxn, first_id, self.quantized_db())?.dimensions())
} else { } else {
Ok(arroy::Reader::open(rtxn, self.index, self.angular_db())?.dimensions()) Ok(arroy::Reader::open(rtxn, first_id, self.angular_db())?.dimensions())
} }
} }
pub fn quantize( pub fn quantize(&mut self, wtxn: &mut RwTxn, dimension: usize) -> Result<(), arroy::Error> {
&mut self,
wtxn: &mut RwTxn,
index: u16,
dimension: usize,
) -> Result<(), arroy::Error> {
if !self.quantized { if !self.quantized {
let writer = arroy::Writer::new(self.angular_db(), index, dimension); for index in arroy_db_range_for_embedder(self.index) {
writer.prepare_changing_distance::<BinaryQuantizedAngular>(wtxn)?; let writer = arroy::Writer::new(self.angular_db(), index, dimension);
writer.prepare_changing_distance::<BinaryQuantizedAngular>(wtxn)?;
}
self.quantized = true; self.quantized = true;
} }
Ok(()) Ok(())
} }
// TODO: We can stop early when we find an empty DB
pub fn need_build(&self, rtxn: &RoTxn, dimension: usize) -> Result<bool, arroy::Error> { pub fn need_build(&self, rtxn: &RoTxn, dimension: usize) -> Result<bool, arroy::Error> {
if self.quantized { for index in arroy_db_range_for_embedder(self.index) {
arroy::Writer::new(self.quantized_db(), self.index, dimension).need_build(rtxn) let need_build = if self.quantized {
} else { arroy::Writer::new(self.quantized_db(), index, dimension).need_build(rtxn)
arroy::Writer::new(self.angular_db(), self.index, dimension).need_build(rtxn) } else {
arroy::Writer::new(self.angular_db(), index, dimension).need_build(rtxn)
};
if need_build? {
return Ok(true);
}
} }
Ok(false)
} }
/// TODO: We should early exit when it doesn't need to be built
pub fn build<R: rand::Rng + rand::SeedableRng>( pub fn build<R: rand::Rng + rand::SeedableRng>(
&self, &self,
wtxn: &mut RwTxn, wtxn: &mut RwTxn,
rng: &mut R, rng: &mut R,
dimension: usize, dimension: usize,
) -> Result<(), arroy::Error> { ) -> Result<(), arroy::Error> {
if self.quantized { for index in arroy_db_range_for_embedder(self.index) {
arroy::Writer::new(self.quantized_db(), self.index, dimension).build(wtxn, rng, None) if self.quantized {
} else { arroy::Writer::new(self.quantized_db(), index, dimension).build(wtxn, rng, None)?
arroy::Writer::new(self.angular_db(), self.index, dimension).build(wtxn, rng, None) } else {
arroy::Writer::new(self.angular_db(), index, dimension).build(wtxn, rng, None)?
}
} }
Ok(())
} }
pub fn add_item( pub fn add_item(