diff --git a/crates/milli/src/update/new/channel.rs b/crates/milli/src/update/new/channel.rs index 38f436837..102a27336 100644 --- a/crates/milli/src/update/new/channel.rs +++ b/crates/milli/src/update/new/channel.rs @@ -100,7 +100,6 @@ pub enum ReceiverAction { /// Wake up, you have frames to read for the BBQueue buffers. WakeUp, LargeEntry(LargeEntry), - LargeVector(LargeVector), LargeVectors(LargeVectors), } @@ -120,24 +119,6 @@ pub struct LargeEntry { pub value: Mmap, } -/// When an embedding is larger than the available -/// BBQueue space it arrives here. -#[derive(Debug)] -pub struct LargeVector { - /// The document id associated to the large embedding. - pub docid: DocumentId, - /// The embedder id in which to insert the large embedding. - pub embedder_id: u8, - /// The large embedding that must be written. - pub embedding: Mmap, -} - -impl LargeVector { - pub fn read_embedding(&self) -> &[f32] { - bytemuck::cast_slice(&self.embedding) - } -} - /// When embeddings are larger than the available /// BBQueue space it arrives here. #[derive(Debug)] @@ -225,35 +206,6 @@ pub struct ArroyDeleteVector { pub docid: DocumentId, } -#[derive(Debug, Clone, Copy, NoUninit, CheckedBitPattern)] -#[repr(C)] -/// The embedding is the remaining space and represents a non-aligned [f32]. -pub struct ArroySetVector { - pub docid: DocumentId, - pub embedder_id: u8, - _padding: [u8; 3], -} - -impl ArroySetVector { - pub fn read_embedding_into_vec<'v>( - &self, - frame: &FrameGrantR<'_>, - vec: &'v mut Vec, - ) -> Option<&'v [f32]> { - vec.clear(); - let skip = EntryHeader::variant_size() + mem::size_of::(); - let bytes = &frame[skip..]; - if bytes.is_empty() { - return None; - } - bytes.chunks_exact(mem::size_of::()).for_each(|bytes| { - let f = bytes.try_into().map(f32::from_ne_bytes).unwrap(); - vec.push(f); - }); - Some(&vec[..]) - } -} - #[derive(Debug, Clone, Copy, NoUninit, CheckedBitPattern)] #[repr(C)] /// The embeddings are in the remaining space and represents @@ -290,7 +242,6 @@ impl ArroySetVectors { pub enum EntryHeader { DbOperation(DbOperation), ArroyDeleteVector(ArroyDeleteVector), - ArroySetVector(ArroySetVector), ArroySetVectors(ArroySetVectors), } @@ -303,8 +254,7 @@ impl EntryHeader { match self { EntryHeader::DbOperation(_) => 0, EntryHeader::ArroyDeleteVector(_) => 1, - EntryHeader::ArroySetVector(_) => 2, - EntryHeader::ArroySetVectors(_) => 3, + EntryHeader::ArroySetVectors(_) => 2, } } @@ -323,11 +273,6 @@ impl EntryHeader { Self::variant_size() + mem::size_of::() } - /// The `dimensions` corresponds to the number of `f32` in the embedding. - fn total_set_vector_size(dimensions: usize) -> usize { - Self::variant_size() + mem::size_of::() + dimensions * mem::size_of::() - } - /// The `dimensions` corresponds to the number of `f32` in the embedding. fn total_set_vectors_size(count: usize, dimensions: usize) -> usize { let embedding_size = dimensions * mem::size_of::(); @@ -338,7 +283,6 @@ impl EntryHeader { let payload_size = match self { EntryHeader::DbOperation(op) => mem::size_of_val(op), EntryHeader::ArroyDeleteVector(adv) => mem::size_of_val(adv), - EntryHeader::ArroySetVector(asv) => mem::size_of_val(asv), EntryHeader::ArroySetVectors(asvs) => mem::size_of_val(asvs), }; Self::variant_size() + payload_size @@ -358,11 +302,6 @@ impl EntryHeader { EntryHeader::ArroyDeleteVector(header) } 2 => { - let header_bytes = &remaining[..mem::size_of::()]; - let header = checked::pod_read_unaligned(header_bytes); - EntryHeader::ArroySetVector(header) - } - 3 => { let header_bytes = &remaining[..mem::size_of::()]; let header = checked::pod_read_unaligned(header_bytes); EntryHeader::ArroySetVectors(header) @@ -376,7 +315,6 @@ impl EntryHeader { let payload_bytes = match self { EntryHeader::DbOperation(op) => bytemuck::bytes_of(op), EntryHeader::ArroyDeleteVector(adv) => bytemuck::bytes_of(adv), - EntryHeader::ArroySetVector(asv) => bytemuck::bytes_of(asv), EntryHeader::ArroySetVectors(asvs) => bytemuck::bytes_of(asvs), }; *first = self.variant_id(); @@ -520,59 +458,6 @@ impl<'b> ExtractorBbqueueSender<'b> { Ok(()) } - fn set_vector( - &self, - docid: DocumentId, - embedder_id: u8, - embedding: &[f32], - ) -> crate::Result<()> { - let capacity = self.capacity; - let refcell = self.producers.get().unwrap(); - let mut producer = refcell.0.borrow_mut_or_yield(); - - let arroy_set_vector = ArroySetVector { docid, embedder_id, _padding: [0; 3] }; - let payload_header = EntryHeader::ArroySetVector(arroy_set_vector); - let total_length = EntryHeader::total_set_vector_size(embedding.len()); - if total_length > capacity { - let mut embedding_bytes = bytemuck::cast_slice(embedding); - let mut value_file = tempfile::tempfile().map(BufWriter::new)?; - io::copy(&mut embedding_bytes, &mut value_file)?; - let value_file = value_file.into_inner().map_err(|ie| ie.into_error())?; - value_file.sync_all()?; - let embedding = unsafe { Mmap::map(&value_file)? }; - - let large_vector = LargeVector { docid, embedder_id, embedding }; - self.sender.send(ReceiverAction::LargeVector(large_vector)).unwrap(); - - return Ok(()); - } - - // Spin loop to have a frame the size we requested. - let mut grant = loop { - match producer.grant(total_length) { - Ok(grant) => break grant, - Err(bbqueue::Error::InsufficientSize) => continue, - Err(e) => unreachable!("{e:?}"), - } - }; - - let header_size = payload_header.header_size(); - let (header_bytes, remaining) = grant.split_at_mut(header_size); - payload_header.serialize_into(header_bytes); - remaining.copy_from_slice(bytemuck::cast_slice(embedding)); - - // We could commit only the used memory. - grant.commit(total_length); - - // We only send a wake up message when the channel is empty - // so that we don't fill the channel with too many WakeUps. - if self.sender.is_empty() { - self.sender.send(ReceiverAction::WakeUp).unwrap(); - } - - Ok(()) - } - fn set_vectors( &self, docid: u32, @@ -583,12 +468,9 @@ impl<'b> ExtractorBbqueueSender<'b> { let refcell = self.producers.get().unwrap(); let mut producer = refcell.0.borrow_mut_or_yield(); - // If there are no vector we specify the dimensions + // If there are no vectors we specify the dimensions // to zero to allocate no extra space at all - let dimensions = match embeddings.first() { - Some(embedding) => embedding.len(), - None => 0, - }; + let dimensions = embeddings.first().map_or(0, |emb| emb.len()); let arroy_set_vector = ArroySetVectors { docid, embedder_id, _padding: [0; 3] }; let payload_header = EntryHeader::ArroySetVectors(arroy_set_vector); @@ -954,7 +836,7 @@ impl EmbeddingSender<'_, '_> { embedder_id: u8, embedding: Embedding, ) -> crate::Result<()> { - self.0.set_vector(docid, embedder_id, &embedding[..]) + self.0.set_vectors(docid, embedder_id, &[embedding]) } } diff --git a/crates/milli/src/update/new/indexer/mod.rs b/crates/milli/src/update/new/indexer/mod.rs index 07cb9d69e..9a6b40efb 100644 --- a/crates/milli/src/update/new/indexer/mod.rs +++ b/crates/milli/src/update/new/indexer/mod.rs @@ -16,7 +16,6 @@ use rand::SeedableRng as _; use raw_collections::RawMap; use time::OffsetDateTime; pub use update_by_function::UpdateByFunction; -use {LargeEntry, LargeVector}; use super::channel::*; use super::extract::*; @@ -430,14 +429,6 @@ where })); } } - ReceiverAction::LargeVector(large_vector) => { - let embedding = large_vector.read_embedding(); - let LargeVector { docid, embedder_id, .. } = large_vector; - let (_, _, writer, dimensions) = - arroy_writers.get(&embedder_id).expect("requested a missing embedder"); - writer.del_items(wtxn, *dimensions, docid)?; - writer.add_item(wtxn, docid, embedding)?; - } ReceiverAction::LargeVectors(large_vectors) => { let LargeVectors { docid, embedder_id, .. } = large_vectors; let (_, _, writer, dimensions) = @@ -594,16 +585,6 @@ fn write_from_bbqueue( writer.del_items(wtxn, dimensions, docid)?; } } - EntryHeader::ArroySetVector(asv) => { - let ArroySetVector { docid, embedder_id, .. } = asv; - let frame = frame_with_header.frame(); - let (_, _, writer, dimensions) = - arroy_writers.get(&embedder_id).expect("requested a missing embedder"); - writer.del_items(wtxn, *dimensions, docid)?; - if let Some(embedding) = asv.read_embedding_into_vec(frame, aligned_embedding) { - writer.add_item(wtxn, docid, embedding)?; - } - } EntryHeader::ArroySetVectors(asvs) => { let ArroySetVectors { docid, embedder_id, .. } = asvs; let frame = frame_with_header.frame();