Fix a bug around deleting all the vectors of a doc

This commit is contained in:
Clément Renault 2024-11-28 15:15:06 +01:00
parent cc4bd54669
commit 096a28656e
No known key found for this signature in database
GPG Key ID: F250A4C4E3AE5F5F
3 changed files with 23 additions and 53 deletions

View File

@ -146,15 +146,13 @@ pub struct LargeVectors {
pub docid: DocumentId, pub docid: DocumentId,
/// The embedder id in which to insert the large embedding. /// The embedder id in which to insert the large embedding.
pub embedder_id: u8, pub embedder_id: u8,
/// The dimensions of the embeddings in this payload.
pub dimensions: u16,
/// The large embedding that must be written. /// The large embedding that must be written.
pub embeddings: Mmap, pub embeddings: Mmap,
} }
impl LargeVectors { impl LargeVectors {
pub fn read_embeddings(&self) -> impl Iterator<Item = &[f32]> { pub fn read_embeddings(&self, dimensions: usize) -> impl Iterator<Item = &[f32]> {
self.embeddings.chunks_exact(self.dimensions as usize).map(bytemuck::cast_slice) self.embeddings.chunks_exact(dimensions).map(bytemuck::cast_slice)
} }
} }
@ -241,15 +239,18 @@ impl ArroySetVector {
&self, &self,
frame: &FrameGrantR<'_>, frame: &FrameGrantR<'_>,
vec: &'v mut Vec<f32>, vec: &'v mut Vec<f32>,
) -> &'v [f32] { ) -> Option<&'v [f32]> {
vec.clear(); vec.clear();
let skip = EntryHeader::variant_size() + mem::size_of::<Self>(); let skip = EntryHeader::variant_size() + mem::size_of::<Self>();
let bytes = &frame[skip..]; let bytes = &frame[skip..];
if bytes.is_empty() {
return None;
}
bytes.chunks_exact(mem::size_of::<f32>()).for_each(|bytes| { bytes.chunks_exact(mem::size_of::<f32>()).for_each(|bytes| {
let f = bytes.try_into().map(f32::from_ne_bytes).unwrap(); let f = bytes.try_into().map(f32::from_ne_bytes).unwrap();
vec.push(f); vec.push(f);
}); });
&vec[..] Some(&vec[..])
} }
} }
@ -259,9 +260,8 @@ impl ArroySetVector {
/// non-aligned [f32] each with dimensions f32s. /// non-aligned [f32] each with dimensions f32s.
pub struct ArroySetVectors { pub struct ArroySetVectors {
pub docid: DocumentId, pub docid: DocumentId,
pub dimensions: u16,
pub embedder_id: u8, pub embedder_id: u8,
_padding: u8, _padding: [u8; 3],
} }
impl ArroySetVectors { impl ArroySetVectors {
@ -270,30 +270,6 @@ impl ArroySetVectors {
&frame[skip..] &frame[skip..]
} }
// /// The number of embeddings in this payload.
// pub fn embedding_count(&self, frame: &FrameGrantR<'_>) -> usize {
// let bytes = Self::remaining_bytes(frame);
// bytes.len().checked_div(self.dimensions as usize).unwrap()
// }
/// Read the embedding at `index` or `None` if out of bounds.
pub fn read_embedding_into_vec<'v>(
&self,
frame: &FrameGrantR<'_>,
index: usize,
vec: &'v mut Vec<f32>,
) -> Option<&'v [f32]> {
vec.clear();
let bytes = Self::remaining_bytes(frame);
let embedding_size = self.dimensions as usize * mem::size_of::<f32>();
let embedding_bytes = bytes.chunks_exact(embedding_size).nth(index)?;
embedding_bytes.chunks_exact(mem::size_of::<f32>()).for_each(|bytes| {
let f = bytes.try_into().map(f32::from_ne_bytes).unwrap();
vec.push(f);
});
Some(&vec[..])
}
/// Read all the embeddings and write them into an aligned `f32` Vec. /// Read all the embeddings and write them into an aligned `f32` Vec.
pub fn read_all_embeddings_into_vec<'v>( pub fn read_all_embeddings_into_vec<'v>(
&self, &self,
@ -607,18 +583,14 @@ impl<'b> ExtractorBbqueueSender<'b> {
let refcell = self.producers.get().unwrap(); let refcell = self.producers.get().unwrap();
let mut producer = refcell.0.borrow_mut_or_yield(); let mut producer = refcell.0.borrow_mut_or_yield();
// If there are no vectors, we set the dimensions
// to zero so that no extra space is allocated at all
let dimensions = match embeddings.first() { let dimensions = match embeddings.first() {
Some(embedding) => embedding.len(), Some(embedding) => embedding.len(),
None => return Ok(()), None => 0,
};
let arroy_set_vector = ArroySetVectors {
docid,
dimensions: dimensions.try_into().unwrap(),
embedder_id,
_padding: 0,
}; };
let arroy_set_vector = ArroySetVectors { docid, embedder_id, _padding: [0; 3] };
let payload_header = EntryHeader::ArroySetVectors(arroy_set_vector); let payload_header = EntryHeader::ArroySetVectors(arroy_set_vector);
let total_length = EntryHeader::total_set_vectors_size(embeddings.len(), dimensions); let total_length = EntryHeader::total_set_vectors_size(embeddings.len(), dimensions);
if total_length > capacity { if total_length > capacity {
@ -632,13 +604,7 @@ impl<'b> ExtractorBbqueueSender<'b> {
value_file.sync_all()?; value_file.sync_all()?;
let embeddings = unsafe { Mmap::map(&value_file)? }; let embeddings = unsafe { Mmap::map(&value_file)? };
let large_vectors = LargeVectors { let large_vectors = LargeVectors { docid, embedder_id, embeddings };
docid,
embedder_id,
dimensions: dimensions.try_into().unwrap(),
embeddings,
};
self.sender.send(ReceiverAction::LargeVectors(large_vectors)).unwrap(); self.sender.send(ReceiverAction::LargeVectors(large_vectors)).unwrap();
return Ok(()); return Ok(());
@ -657,9 +623,11 @@ impl<'b> ExtractorBbqueueSender<'b> {
let (header_bytes, remaining) = grant.split_at_mut(header_size); let (header_bytes, remaining) = grant.split_at_mut(header_size);
payload_header.serialize_into(header_bytes); payload_header.serialize_into(header_bytes);
let output_iter = remaining.chunks_exact_mut(dimensions * mem::size_of::<f32>()); if dimensions != 0 {
for (embedding, output) in embeddings.iter().zip(output_iter) { let output_iter = remaining.chunks_exact_mut(dimensions * mem::size_of::<f32>());
output.copy_from_slice(bytemuck::cast_slice(embedding)); for (embedding, output) in embeddings.iter().zip(output_iter) {
output.copy_from_slice(bytemuck::cast_slice(embedding));
}
} }
// We could commit only the used memory. // We could commit only the used memory.

View File

@ -443,7 +443,7 @@ where
let (_, _, writer, dimensions) = let (_, _, writer, dimensions) =
arroy_writers.get(&embedder_id).expect("requested a missing embedder"); arroy_writers.get(&embedder_id).expect("requested a missing embedder");
let mut embeddings = Embeddings::new(*dimensions); let mut embeddings = Embeddings::new(*dimensions);
for embedding in large_vectors.read_embeddings() { for embedding in large_vectors.read_embeddings(*dimensions) {
embeddings.push(embedding.to_vec()).unwrap(); embeddings.push(embedding.to_vec()).unwrap();
} }
writer.del_items(wtxn, *dimensions, docid)?; writer.del_items(wtxn, *dimensions, docid)?;
@ -597,11 +597,12 @@ fn write_from_bbqueue(
EntryHeader::ArroySetVector(asv) => { EntryHeader::ArroySetVector(asv) => {
let ArroySetVector { docid, embedder_id, .. } = asv; let ArroySetVector { docid, embedder_id, .. } = asv;
let frame = frame_with_header.frame(); let frame = frame_with_header.frame();
let embedding = asv.read_embedding_into_vec(frame, aligned_embedding);
let (_, _, writer, dimensions) = let (_, _, writer, dimensions) =
arroy_writers.get(&embedder_id).expect("requested a missing embedder"); arroy_writers.get(&embedder_id).expect("requested a missing embedder");
writer.del_items(wtxn, *dimensions, docid)?; writer.del_items(wtxn, *dimensions, docid)?;
writer.add_item(wtxn, docid, embedding)?; if let Some(embedding) = asv.read_embedding_into_vec(frame, aligned_embedding) {
writer.add_item(wtxn, docid, embedding)?;
}
} }
EntryHeader::ArroySetVectors(asvs) => { EntryHeader::ArroySetVectors(asvs) => {
let ArroySetVectors { docid, embedder_id, .. } = asvs; let ArroySetVectors { docid, embedder_id, .. } = asvs;

View File

@ -5,6 +5,7 @@ pub trait RefCellExt<T: ?Sized> {
&self, &self,
) -> std::result::Result<RefMut<'_, T>, std::cell::BorrowMutError>; ) -> std::result::Result<RefMut<'_, T>, std::cell::BorrowMutError>;
#[track_caller]
fn borrow_mut_or_yield(&self) -> RefMut<'_, T> { fn borrow_mut_or_yield(&self) -> RefMut<'_, T> {
self.try_borrow_mut_or_yield().unwrap() self.try_borrow_mut_or_yield().unwrap()
} }