From afa3ae0cbd9c7223d4068dd438d043a43d0d4fae Mon Sep 17 00:00:00 2001 From: Tamo Date: Thu, 19 Sep 2024 17:42:52 +0200 Subject: [PATCH] WIP --- milli/src/update/index_documents/mod.rs | 17 ++----- .../src/update/index_documents/typed_chunk.rs | 16 ++---- milli/src/vector/mod.rs | 51 +++++++++++-------- 3 files changed, 38 insertions(+), 46 deletions(-) diff --git a/milli/src/update/index_documents/mod.rs b/milli/src/update/index_documents/mod.rs index 326dd842d..b03ab259a 100644 --- a/milli/src/update/index_documents/mod.rs +++ b/milli/src/update/index_documents/mod.rs @@ -689,9 +689,8 @@ where key: None, }, )?; - let first_id = crate::vector::arroy_db_range_for_embedder(index).next().unwrap(); let reader = - ArroyWrapper::new(self.index.vector_arroy, first_id, action.was_quantized); + ArroyWrapper::new(self.index.vector_arroy, index, action.was_quantized); let dim = reader.dimensions(self.wtxn)?; dimension.insert(name.to_string(), dim); } @@ -713,17 +712,11 @@ where let is_quantizing = embedder_config.map_or(false, |action| action.is_being_quantized); pool.install(|| { - for k in crate::vector::arroy_db_range_for_embedder(embedder_index) { - let mut writer = ArroyWrapper::new(vector_arroy, k, was_quantized); - if is_quantizing { - writer.quantize(wtxn, k, dimension)?; - } - if writer.need_build(wtxn, dimension)? { - writer.build(wtxn, &mut rng, dimension)?; - } else if writer.is_empty(wtxn, dimension)? { - break; - } + let mut writer = ArroyWrapper::new(vector_arroy, embedder_index, was_quantized); + if is_quantizing { + writer.quantize(wtxn, dimension)?; } + writer.build(wtxn, &mut rng, dimension)?; Result::Ok(()) }) .map_err(InternalError::from)??; diff --git a/milli/src/update/index_documents/typed_chunk.rs b/milli/src/update/index_documents/typed_chunk.rs index 97a4bf712..e340137e2 100644 --- a/milli/src/update/index_documents/typed_chunk.rs +++ b/milli/src/update/index_documents/typed_chunk.rs @@ -673,22 +673,14 @@ pub(crate) fn write_typed_chunk_into_index( .get(&embedder_name) .map_or(false, |conf| conf.2); // FIXME: allow customizing distance - let writers: Vec<_> = crate::vector::arroy_db_range_for_embedder(embedder_index) - .map(|k| ArroyWrapper::new(index.vector_arroy, k, binary_quantized)) - .collect(); + let writer = ArroyWrapper::new(index.vector_arroy, embedder_index, binary_quantized); // remove vectors for docids we want them removed let merger = remove_vectors_builder.build(); let mut iter = merger.into_stream_merger_iter()?; while let Some((key, _)) = iter.next()? { let docid = key.try_into().map(DocumentId::from_be_bytes).unwrap(); - - for writer in &writers { - // Uses invariant: vectors are packed in the first writers. - if !writer.del_item(wtxn, expected_dimension, docid)? { - break; - } - } + writer.del_item(wtxn, expected_dimension, docid)?; } // add generated embeddings @@ -716,9 +708,7 @@ pub(crate) fn write_typed_chunk_into_index( embeddings.embedding_count(), ))); } - for (embedding, writer) in embeddings.iter().zip(&writers) { - writer.add_item(wtxn, expected_dimension, docid, embedding)?; - } + writer.add_items(wtxn, expected_dimension, docid, embeddings)?; } // perform the manual diff diff --git a/milli/src/vector/mod.rs b/milli/src/vector/mod.rs index d52e68bbe..644826dcd 100644 --- a/milli/src/vector/mod.rs +++ b/milli/src/vector/mod.rs @@ -32,60 +32,69 @@ pub const REQUEST_PARALLELISM: usize = 40; pub struct ArroyWrapper { quantized: bool, - index: u16, + index: u8, database: arroy::Database, } impl ArroyWrapper { - pub fn new(database: arroy::Database, index: u16, quantized: bool) -> Self { + pub fn new(database: arroy::Database, index: u8, quantized: bool) -> Self { Self { database, index, quantized } } - pub fn index(&self) -> u16 { + pub fn index(&self) -> u8 { self.index } pub fn dimensions(&self, rtxn: &RoTxn) -> Result { + let first_id = arroy_db_range_for_embedder(self.index).next().unwrap(); if self.quantized { - Ok(arroy::Reader::open(rtxn, self.index, self.quantized_db())?.dimensions()) + Ok(arroy::Reader::open(rtxn, first_id, self.quantized_db())?.dimensions()) } else { - Ok(arroy::Reader::open(rtxn, self.index, self.angular_db())?.dimensions()) + Ok(arroy::Reader::open(rtxn, first_id, self.angular_db())?.dimensions()) } } - pub fn quantize( - &mut self, - wtxn: &mut RwTxn, - index: u16, - dimension: usize, - ) -> Result<(), arroy::Error> { + pub fn quantize(&mut self, wtxn: &mut RwTxn, dimension: usize) -> Result<(), arroy::Error> { if !self.quantized { - let writer = arroy::Writer::new(self.angular_db(), index, dimension); - writer.prepare_changing_distance::(wtxn)?; + for index in arroy_db_range_for_embedder(self.index) { + let writer = arroy::Writer::new(self.angular_db(), index, dimension); + writer.prepare_changing_distance::(wtxn)?; + } self.quantized = true; } Ok(()) } + // TODO: We can stop early when we find an empty DB pub fn need_build(&self, rtxn: &RoTxn, dimension: usize) -> Result { - if self.quantized { - arroy::Writer::new(self.quantized_db(), self.index, dimension).need_build(rtxn) - } else { - arroy::Writer::new(self.angular_db(), self.index, dimension).need_build(rtxn) + for index in arroy_db_range_for_embedder(self.index) { + let need_build = if self.quantized { + arroy::Writer::new(self.quantized_db(), index, dimension).need_build(rtxn) + } else { + arroy::Writer::new(self.angular_db(), index, dimension).need_build(rtxn) + }; + if need_build? { + return Ok(true); + } } + Ok(false) } + /// TODO: We should early exit when it doesn't need to be built pub fn build( &self, wtxn: &mut RwTxn, rng: &mut R, dimension: usize, ) -> Result<(), arroy::Error> { - if self.quantized { - arroy::Writer::new(self.quantized_db(), self.index, dimension).build(wtxn, rng, None) - } else { - arroy::Writer::new(self.angular_db(), self.index, dimension).build(wtxn, rng, None) + for index in arroy_db_range_for_embedder(self.index) { + if self.quantized { + arroy::Writer::new(self.quantized_db(), index, dimension).build(wtxn, rng, None)? + } else { + arroy::Writer::new(self.angular_db(), index, dimension).build(wtxn, rng, None)? + } } + Ok(()) } pub fn add_item(