From 6ac5b3b136086b8b25b1eb8dc2d6678e39846262 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Cl=C3=A9ment=20Renault?=
Date: Wed, 27 Nov 2024 13:36:30 +0100
Subject: [PATCH] Finish most of the channels types

---
 crates/milli/src/error.rs                  |   9 +-
 crates/milli/src/update/new/channel.rs     | 662 +++++++++++-------
 .../src/update/new/extract/vectors/mod.rs  |   2 +-
 crates/milli/src/update/new/indexer/mod.rs | 132 ++--
 4 files changed, 474 insertions(+), 331 deletions(-)

diff --git a/crates/milli/src/error.rs b/crates/milli/src/error.rs
index 4da57a3e1..800dfa375 100644
--- a/crates/milli/src/error.rs
+++ b/crates/milli/src/error.rs
@@ -62,9 +62,14 @@ pub enum InternalError {
     #[error(transparent)]
     Store(#[from] MdbError),
     #[error("Cannot delete {key:?} from database {database_name}: {error}")]
-    StoreDeletion { database_name: &'static str, key: Vec<u8>, error: heed::Error },
+    StoreDeletion { database_name: &'static str, key: Box<[u8]>, error: heed::Error },
     #[error("Cannot insert {key:?} and value with length {value_length} into database {database_name}: {error}")]
-    StorePut { database_name: &'static str, key: Vec<u8>, value_length: usize, error: heed::Error },
+    StorePut {
+        database_name: &'static str,
+        key: Box<[u8]>,
+        value_length: usize,
+        error: heed::Error,
+    },
     #[error(transparent)]
     Utf8(#[from] str::Utf8Error),
     #[error("An indexation process was explicitly aborted")]

diff --git a/crates/milli/src/update/new/channel.rs b/crates/milli/src/update/new/channel.rs
index cacc7b129..d2681c915 100644
--- a/crates/milli/src/update/new/channel.rs
+++ b/crates/milli/src/update/new/channel.rs
@@ -1,12 +1,11 @@
 use std::cell::RefCell;
 use std::marker::PhantomData;
+use std::mem;
 use std::num::NonZeroU16;
-use std::{mem, slice};
 
 use bbqueue::framed::{FrameGrantR, FrameProducer};
-use bytemuck::{NoUninit, CheckedBitPattern};
-use crossbeam::sync::{Parker, Unparker};
-use crossbeam_channel::{IntoIter, Receiver, SendError};
+use bytemuck::{checked, CheckedBitPattern, NoUninit};
+use crossbeam_channel::SendError;
 use heed::types::Bytes;
 use heed::BytesDecode;
 use memmap2::Mmap;
@@ -17,21 +16,32 @@ use super::ref_cell_ext::RefCellExt;
 use super::thread_local::{FullySend, ThreadLocal};
 use super::StdResult;
 use crate::heed_codec::facet::{FieldDocIdFacetF64Codec, FieldDocIdFacetStringCodec};
+use crate::index::db_name;
 use crate::index::main_key::{GEO_FACETED_DOCUMENTS_IDS_KEY, GEO_RTREE_KEY};
-use crate::index::{db_name, IndexEmbeddingConfig};
 use crate::update::new::KvReaderFieldId;
 use crate::vector::Embedding;
 use crate::{CboRoaringBitmapCodec, DocumentId, Index};
 
-/// Creates a tuple of producer/receivers to be used by
+/// Creates a tuple of senders/receiver to be used by
 /// the extractors and the writer loop.
 ///
+/// The `channel_capacity` parameter defines the number of
+/// too-large-to-fit-in-BBQueue entries that can be sent through
+/// a crossbeam channel. This parameter must stay low to make
+/// sure we do not use too much memory.
+///
+/// Note that the channel is also used to wake up the receiver
+/// when new stuff is available in any BBQueue buffer but we send
+/// a message in this queue only if it is empty to avoid filling
+/// the channel *and* the BBQueue.
+///
 /// # Safety
 ///
-/// Panics if the number of provided bbqueue is not exactly equal
+/// Panics if the number of provided BBQueues is not exactly equal
 /// to the number of available threads in the rayon threadpool.
 pub fn extractor_writer_bbqueue(
     bbbuffers: &[bbqueue::BBBuffer],
+    channel_capacity: usize,
 ) -> (ExtractorBbqueueSender, WriterBbqueueReceiver) {
     assert_eq!(
         bbbuffers.len(),
@@ -40,88 +50,252 @@ pub fn extractor_writer_bbqueue(
     );
 
     let capacity = bbbuffers.first().unwrap().capacity();
-    let parker = Parker::new();
-    let extractors = ThreadLocal::with_capacity(bbbuffers.len());
-    let producers = rayon::broadcast(|bi| {
+    // Read the `capacity` field description to understand this
+    let capacity = capacity.checked_sub(9).unwrap();
+
+    let producers = ThreadLocal::with_capacity(bbbuffers.len());
+    let consumers = rayon::broadcast(|bi| {
         let bbqueue = &bbbuffers[bi.index()];
         let (producer, consumer) = bbqueue.try_split_framed().unwrap();
-        extractors.get_or(|| FullySend(RefCell::new(producer)));
+        producers.get_or(|| FullySend(RefCell::new(producer)));
         consumer
     });
 
-    (
-        ExtractorBbqueueSender {
-            inner: extractors,
-            capacity: capacity.checked_sub(9).unwrap(),
-            unparker: parker.unparker().clone(),
-        },
-        WriterBbqueueReceiver { inner: producers, parker },
-    )
+    let (sender, receiver) = crossbeam_channel::bounded(channel_capacity);
+    let sender = ExtractorBbqueueSender { sender, producers, capacity };
+    let receiver = WriterBbqueueReceiver { receiver, consumers };
+    (sender, receiver)
+}
+
+pub struct ExtractorBbqueueSender<'a> {
+    /// This channel is used to wake up the receiver and
+    /// send large entries that cannot fit in the BBQueue.
+    sender: crossbeam_channel::Sender<ReceiverAction>,
+    /// A memory buffer, one per thread, used to serialize
+    /// the entries directly in this shared, lock-free space.
+    producers: ThreadLocal<FullySend<RefCell<FrameProducer<'a>>>>,
+    /// The capacity of this frame producer; it will never be able to store more than that.
+    ///
+    /// Note that the FrameProducer requires up to 9 bytes to encode the length,
+    /// so the capacity has been shrunk accordingly.
+    ///
+    capacity: usize,
 }
 
 pub struct WriterBbqueueReceiver<'a> {
-    inner: Vec<FrameConsumer<'a>>,
-    /// Used to park when no more work is required
-    parker: Parker,
+    /// Used to wake up when new entries are available either in
+    /// any BBQueue buffer or directly sent through this channel
+    /// (after having been written to disk).
+    receiver: crossbeam_channel::Receiver<ReceiverAction>,
+    /// The BBQueue frames to read when waking up.
+    consumers: Vec<FrameConsumer<'a>>,
+}
+
+/// The action to perform on the receiver/writer side.
+pub enum ReceiverAction {
+    /// Wake up, you have frames to read from the BBQueue buffers.
+    WakeUp,
+    /// An entry that cannot fit in the BBQueue buffers has been
+    /// written to disk, memory-mapped and must be written in the
+    /// database.
+    LargeEntry {
+        /// The database where the entry must be written.
+        database: Database,
+        /// The key of the entry that must be written in the database.
+        key: Box<[u8]>,
+        /// The large value that must be written.
+        ///
+        /// Note: We can probably use a `File` here and
+        /// use `Database::put_reserved` instead of memory-mapping.
+        value: Mmap,
+    },
+}
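The wake-up protocol described in the doc comment above (send a message only when the channel is empty, so neither the channel nor the BBQueue fills up with redundant wake-ups) could look roughly like this on the extractor side. A minimal sketch assuming a bounded crossbeam channel; `wake_up_writer` is an illustrative name, not a function from this patch:

    // Wake the writer only if it may be blocked on `recv()`: a non-empty
    // channel already guarantees the writer will run and drain the BBQueues.
    fn wake_up_writer(sender: &crossbeam_channel::Sender<ReceiverAction>) {
        if sender.is_empty() {
            // A full or disconnected channel can safely be ignored here.
            let _ = sender.try_send(ReceiverAction::WakeUp);
        }
    }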
 
 impl<'a> WriterBbqueueReceiver<'a> {
+    pub fn recv(&mut self) -> Option<ReceiverAction> {
+        self.receiver.recv().ok()
+    }
+
     pub fn read(&mut self) -> Option<FrameWithHeader<'a>> {
-        loop {
-            for consumer in &mut self.inner {
-                // mark the frame as auto release
-                if let Some() = consumer.read()
+        for consumer in &mut self.consumers {
+            if let Some(frame) = consumer.read() {
+                return Some(FrameWithHeader::from(frame));
             }
-            break None;
         }
+        None
     }
 }
 
-struct FrameWithHeader<'a> {
+pub struct FrameWithHeader<'a> {
     header: EntryHeader,
     frame: FrameGrantR<'a>,
 }
 
-#[derive(Debug, Clone, Copy, CheckedBitPattern)]
-#[repr(u8)]
-enum EntryHeader {
-    /// Wether a put of the key/value pair or a delete of the given key.
-    DbOperation {
-        /// The database on which to perform the operation.
-        database: Database,
-        /// The key length in the buffer.
-        ///
-        /// If None it means that the buffer is dedicated
-        /// to the key and it is therefore a deletion operation.
-        key_length: Option<NonZeroU16>,
-    },
-    ArroyDeleteVector {
-        docid: DocumentId,
-    },
-    /// The embedding is the remaining space and represents a non-aligned [f32].
-    ArroySetVector {
-        docid: DocumentId,
-        embedder_id: u8,
-    },
+impl FrameWithHeader<'_> {
+    pub fn header(&self) -> EntryHeader {
+        self.header
+    }
+
+    pub fn frame(&self) -> &FrameGrantR<'_> {
+        &self.frame
+    }
 }
 
-impl EntryHeader {
-    fn delete_key_size(key_length: u16) -> usize {
-        mem::size_of::<Self>() + key_length as usize
-    }
-
-    fn put_key_value_size(key_length: u16, value_length: usize) -> usize {
-        mem::size_of::<Self>() + key_length as usize + value_length
-    }
-
-    fn bytes_of(&self) -> &[u8] {
-        /// TODO do the variant matching ourselves
-        todo!()
+impl<'a> From<FrameGrantR<'a>> for FrameWithHeader<'a> {
+    fn from(mut frame: FrameGrantR<'a>) -> Self {
+        frame.auto_release(true);
+        FrameWithHeader { header: EntryHeader::from_slice(&frame[..]), frame }
     }
 }
 
 #[derive(Debug, Clone, Copy, NoUninit, CheckedBitPattern)]
-#[repr(u32)]
+#[repr(C)]
+/// Whether to put the key/value pair or delete the given key.
+pub struct DbOperation {
+    /// The database on which to perform the operation.
+    pub database: Database,
+    /// The key length in the buffer.
+    ///
+    /// If None it means that the buffer is dedicated
+    /// to the key and it is therefore a deletion operation.
+    pub key_length: Option<NonZeroU16>,
+}
+
+impl DbOperation {
+    pub fn key_value<'a>(&self, frame: &'a FrameGrantR<'_>) -> (&'a [u8], Option<&'a [u8]>) {
+        // TODO replace the return type by an enum Write | Delete
+        let skip = EntryHeader::variant_size() + mem::size_of::<Self>();
+        match self.key_length {
+            Some(key_length) => {
+                let (key, value) = frame[skip..].split_at(key_length.get() as usize);
+                (key, Some(value))
+            }
+            None => (&frame[skip..], None),
+        }
+    }
+}
+
+#[derive(Debug, Clone, Copy, NoUninit, CheckedBitPattern)]
+#[repr(transparent)]
+pub struct ArroyDeleteVector {
+    pub docid: DocumentId,
+}
+
+#[derive(Debug, Clone, Copy, NoUninit, CheckedBitPattern)]
+#[repr(C)]
+/// The embedding is the remaining space and represents a non-aligned `[f32]`.
+pub struct ArroySetVector {
+    pub docid: DocumentId,
+    pub embedder_id: u8,
+    _padding: [u8; 3],
+}
+
+impl ArroySetVector {
+    pub fn read_embedding_into_vec<'v>(
+        &self,
+        frame: &FrameGrantR<'_>,
+        vec: &'v mut Vec<f32>,
+    ) -> &'v [f32] {
+        vec.clear();
+        let skip = EntryHeader::variant_size() + mem::size_of::<Self>();
+        let bytes = &frame[skip..];
+        bytes.chunks_exact(mem::size_of::<f32>()).for_each(|bytes| {
+            let f = bytes.try_into().map(f32::from_ne_bytes).unwrap();
+            vec.push(f);
+        });
+        &vec[..]
+    }
+}
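Because every frame starts with a 1-byte variant tag followed by a `repr(C)` header (9 bytes in total for `ArroySetVector`), the embedding bytes land at an offset that is not a multiple of 4 and cannot be reinterpreted in place with `bytemuck::cast_slice`; they must be copied, as `read_embedding_into_vec` does. A standalone sketch of the same unaligned read (illustrative names, not code from this patch):

    // Decode native-endian f32s from a byte slice of any alignment by
    // copying 4-byte chunks instead of reinterpreting the memory.
    fn read_unaligned_f32s(payload: &[u8]) -> Vec<f32> {
        payload
            .chunks_exact(std::mem::size_of::<f32>())
            .map(|bytes| f32::from_ne_bytes(bytes.try_into().unwrap()))
            .collect()
    }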
+ } +} + +#[derive(Debug, Clone, Copy)] +#[repr(u8)] +pub enum EntryHeader { + DbOperation(DbOperation), + ArroyDeleteVector(ArroyDeleteVector), + ArroySetVector(ArroySetVector), +} + +impl EntryHeader { + const fn variant_size() -> usize { + mem::size_of::() + } + + const fn variant_id(&self) -> u8 { + match self { + EntryHeader::DbOperation(_) => 0, + EntryHeader::ArroyDeleteVector(_) => 1, + EntryHeader::ArroySetVector(_) => 2, + } + } + + const fn total_key_value_size(key_length: NonZeroU16, value_length: usize) -> usize { + Self::variant_size() + + mem::size_of::() + + key_length.get() as usize + + value_length + } + + const fn total_key_size(key_length: NonZeroU16) -> usize { + Self::total_key_value_size(key_length, 0) + } + + const fn total_delete_vector_size() -> usize { + Self::variant_size() + mem::size_of::() + } + + /// The `embedding_length` corresponds to the number of `f32` in the embedding. + fn total_set_vector_size(embedding_length: usize) -> usize { + Self::variant_size() + + mem::size_of::() + + embedding_length * mem::size_of::() + } + + fn header_size(&self) -> usize { + let payload_size = match self { + EntryHeader::DbOperation(op) => mem::size_of_val(op), + EntryHeader::ArroyDeleteVector(adv) => mem::size_of_val(adv), + EntryHeader::ArroySetVector(asv) => mem::size_of_val(asv), + }; + Self::variant_size() + payload_size + } + + fn from_slice(slice: &[u8]) -> EntryHeader { + let (variant_id, remaining) = slice.split_first().unwrap(); + match variant_id { + 0 => { + let header_bytes = &remaining[..mem::size_of::()]; + let header = checked::pod_read_unaligned(header_bytes); + EntryHeader::DbOperation(header) + } + 1 => { + let header_bytes = &remaining[..mem::size_of::()]; + let header = checked::pod_read_unaligned(header_bytes); + EntryHeader::ArroyDeleteVector(header) + } + 2 => { + let header_bytes = &remaining[..mem::size_of::()]; + let header = checked::pod_read_unaligned(header_bytes); + EntryHeader::ArroySetVector(header) + } + id => panic!("invalid variant id: {id}"), + } + } + + fn serialize_into(&self, header_bytes: &mut [u8]) { + let (first, remaining) = header_bytes.split_first_mut().unwrap(); + let payload_bytes = match self { + EntryHeader::DbOperation(op) => bytemuck::bytes_of(op), + EntryHeader::ArroyDeleteVector(adv) => bytemuck::bytes_of(adv), + EntryHeader::ArroySetVector(asv) => bytemuck::bytes_of(asv), + }; + *first = self.variant_id(); + remaining.copy_from_slice(payload_bytes); + } +} + +#[derive(Debug, Clone, Copy, NoUninit, CheckedBitPattern)] +#[repr(u16)] pub enum Database { Main, Documents, @@ -197,20 +371,6 @@ impl From for Database { } } -pub struct ExtractorBbqueueSender<'a> { - inner: ThreadLocal>>>, - /// The capacity of this frame producer, will never be able to store more than that. - /// - /// Note that the FrameProducer requires up to 9 bytes to encode the length, - /// the capacity has been shrinked accordingly. - /// - /// - capacity: usize, - /// Used to wake up the receiver thread, - /// Used everytime we write something in the producer. 
 impl<'b> ExtractorBbqueueSender<'b> {
     pub fn docids<'a, D: DatabaseType>(&'a self) -> WordDocidsSender<'a, 'b, D> {
         WordDocidsSender { sender: self, _marker: PhantomData }
@@ -236,80 +396,171 @@ impl<'b> ExtractorBbqueueSender<'b> {
         GeoSender(&self)
     }
 
-    fn send_delete_vector(&self, docid: DocumentId) -> crate::Result<()> {
-        match self
-            .sender
-            .send(WriterOperation::ArroyOperation(ArroyOperation::DeleteVectors { docid }))
-        {
-            Ok(()) => Ok(()),
-            Err(SendError(_)) => Err(SendError(())),
+    fn delete_vector(&self, docid: DocumentId) -> crate::Result<()> {
+        let capacity = self.capacity;
+        let refcell = self.producers.get().unwrap();
+        let mut producer = refcell.0.borrow_mut_or_yield();
+
+        let payload_header = EntryHeader::ArroyDeleteVector(ArroyDeleteVector { docid });
+        let total_length = EntryHeader::total_delete_vector_size();
+        if total_length > capacity {
+            unreachable!("entry larger than the BBQueue capacity");
         }
+
+        // Spin loop to have a frame the size we requested.
+        let mut grant = loop {
+            match producer.grant(total_length) {
+                Ok(grant) => break grant,
+                Err(bbqueue::Error::InsufficientSize) => continue,
+                Err(e) => unreachable!("{e:?}"),
+            }
+        };
+
+        payload_header.serialize_into(&mut grant);
+
+        // We could commit only the used memory.
+        grant.commit(total_length);
+
+        Ok(())
+    }
+
+    fn set_vector(
+        &self,
+        docid: DocumentId,
+        embedder_id: u8,
+        embedding: &[f32],
+    ) -> crate::Result<()> {
+        let capacity = self.capacity;
+        let refcell = self.producers.get().unwrap();
+        let mut producer = refcell.0.borrow_mut_or_yield();
+
+        let payload_header =
+            EntryHeader::ArroySetVector(ArroySetVector { docid, embedder_id, _padding: [0; 3] });
+        let total_length = EntryHeader::total_set_vector_size(embedding.len());
+        if total_length > capacity {
+            unreachable!("entry larger than the BBQueue capacity");
+        }
+
+        // Spin loop to have a frame the size we requested.
+        let mut grant = loop {
+            match producer.grant(total_length) {
+                Ok(grant) => break grant,
+                Err(bbqueue::Error::InsufficientSize) => continue,
+                Err(e) => unreachable!("{e:?}"),
+            }
+        };
+
+        let header_size = payload_header.header_size();
+        let (header_bytes, remaining) = grant.split_at_mut(header_size);
+        payload_header.serialize_into(header_bytes);
+        remaining.copy_from_slice(bytemuck::cast_slice(embedding));
+
+        // We could commit only the used memory.
+        grant.commit(total_length);
+
+        Ok(())
+    }
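Both send paths above busy-wait on `producer.grant(total_length)` until the writer thread releases enough bytes. If that spinning ever shows up in profiles, a variant that yields to the OS scheduler between attempts would be a drop-in replacement; a sketch under the same bbqueue API, not code from this patch:

    // Same loop, but hint the scheduler instead of burning a core while
    // the writer catches up and frees space in the ring buffer.
    let mut grant = loop {
        match producer.grant(total_length) {
            Ok(grant) => break grant,
            Err(bbqueue::Error::InsufficientSize) => std::thread::yield_now(),
            Err(e) => unreachable!("{e:?}"),
        }
    };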
 
     fn write_key_value(&self, database: Database, key: &[u8], value: &[u8]) -> crate::Result<()> {
+        let key_length = NonZeroU16::new(key.len().try_into().unwrap()).unwrap();
+        self.write_key_value_with(database, key_length, value.len(), |buffer| {
+            let (key_buffer, value_buffer) = buffer.split_at_mut(key.len());
+            key_buffer.copy_from_slice(key);
+            value_buffer.copy_from_slice(value);
+            Ok(())
+        })
+    }
+
+    fn write_key_value_with<F>(
+        &self,
+        database: Database,
+        key_length: NonZeroU16,
+        value_length: usize,
+        key_value_writer: F,
+    ) -> crate::Result<()>
+    where
+        F: FnOnce(&mut [u8]) -> crate::Result<()>,
+    {
         let capacity = self.capacity;
-        let refcell = self.inner.get().unwrap();
+        let refcell = self.producers.get().unwrap();
         let mut producer = refcell.0.borrow_mut_or_yield();
 
-        let key_length = key.len().try_into().unwrap();
-        let value_length = value.len();
-        let total_length = EntryHeader::put_key_value_size(key_length, value_length);
+        let operation = DbOperation { database, key_length: Some(key_length) };
+        let payload_header = EntryHeader::DbOperation(operation);
+        let total_length = EntryHeader::total_key_value_size(key_length, value_length);
         if total_length > capacity {
-            unreachable!("entry larger that the bbqueue capacity");
+            unreachable!("entry larger than the BBQueue capacity");
         }
 
-        let payload_header =
-            EntryHeader::DbOperation { database, key_length: NonZeroU16::new(key_length) };
-
-        loop {
-            let mut grant = match producer.grant(total_length) {
-                Ok(grant) => grant,
+        // Spin loop to have a frame the size we requested.
+        let mut grant = loop {
+            match producer.grant(total_length) {
+                Ok(grant) => break grant,
                 Err(bbqueue::Error::InsufficientSize) => continue,
                 Err(e) => unreachable!("{e:?}"),
-            };
+            }
+        };
 
-            let (header, remaining) = grant.split_at_mut(mem::size_of::<EntryHeader>());
-            header.copy_from_slice(payload_header.bytes_of());
-            let (key_out, value_out) = remaining.split_at_mut(key.len());
-            key_out.copy_from_slice(key);
-            value_out.copy_from_slice(value);
+        let header_size = payload_header.header_size();
+        let (header_bytes, remaining) = grant.split_at_mut(header_size);
+        payload_header.serialize_into(header_bytes);
+        key_value_writer(remaining)?;
 
-            // We could commit only the used memory.
-            grant.commit(total_length);
+        // We could commit only the used memory.
+        grant.commit(total_length);
 
-            break Ok(());
-        }
+        Ok(())
     }
 
     fn delete_entry(&self, database: Database, key: &[u8]) -> crate::Result<()> {
+        let key_length = NonZeroU16::new(key.len().try_into().unwrap()).unwrap();
+        self.delete_entry_with(database, key_length, |buffer| {
+            buffer.copy_from_slice(key);
+            Ok(())
+        })
+    }
+
+    fn delete_entry_with<F>(
+        &self,
+        database: Database,
+        key_length: NonZeroU16,
+        key_writer: F,
+    ) -> crate::Result<()>
+    where
+        F: FnOnce(&mut [u8]) -> crate::Result<()>,
+    {
         let capacity = self.capacity;
-        let refcell = self.inner.get().unwrap();
+        let refcell = self.producers.get().unwrap();
         let mut producer = refcell.0.borrow_mut_or_yield();
 
-        let key_length = key.len().try_into().unwrap();
-        let total_length = EntryHeader::delete_key_size(key_length);
+        // For deletion we do not specify the key length,
+        // it's in the remaining bytes.
+        let operation = DbOperation { database, key_length: None };
+        let payload_header = EntryHeader::DbOperation(operation);
+        let total_length = EntryHeader::total_key_size(key_length);
         if total_length > capacity {
-            unreachable!("entry larger that the bbqueue capacity");
+            unreachable!("entry larger than the BBQueue capacity");
        }
 
-        let payload_header = EntryHeader::DbOperation { database, key_length: None };
-
-        loop {
-            let mut grant = match producer.grant(total_length) {
-                Ok(grant) => grant,
+        // Spin loop to have a frame the size we requested.
+        let mut grant = loop {
+            match producer.grant(total_length) {
+                Ok(grant) => break grant,
                 Err(bbqueue::Error::InsufficientSize) => continue,
                 Err(e) => unreachable!("{e:?}"),
-            };
+            }
+        };
 
-            let (header, remaining) = grant.split_at_mut(mem::size_of::<EntryHeader>());
-            header.copy_from_slice(payload_header.bytes_of());
-            remaining.copy_from_slice(key);
+        let header_size = payload_header.header_size();
+        let (header_bytes, remaining) = grant.split_at_mut(header_size);
+        payload_header.serialize_into(header_bytes);
+        key_writer(remaining)?;
 
-            // We could commit only the used memory.
-            grant.commit(total_length);
+        // We could commit only the used memory.
+        grant.commit(total_length);
 
-            break Ok(());
-        }
+        Ok(())
     }
 }
 
@@ -355,72 +606,18 @@ pub struct WordDocidsSender<'a, 'b, D> {
 
 impl<D: DatabaseType> WordDocidsSender<'_, '_, D> {
     pub fn write(&self, key: &[u8], bitmap: &RoaringBitmap) -> crate::Result<()> {
-        let capacity = self.sender.capacity;
-        let refcell = self.sender.inner.get().unwrap();
-        let mut producer = refcell.0.borrow_mut_or_yield();
-
-        let key_length = key.len().try_into().unwrap();
+        let key_length = NonZeroU16::new(key.len().try_into().unwrap()).unwrap();
         let value_length = CboRoaringBitmapCodec::serialized_size(bitmap);
-
-        let total_length = EntryHeader::put_key_value_size(key_length, value_length);
-        if total_length > capacity {
-            unreachable!("entry larger that the bbqueue capacity");
-        }
-
-        let payload_header = EntryHeader::DbOperation {
-            database: D::DATABASE,
-            key_length: NonZeroU16::new(key_length),
-        };
-
-        loop {
-            let mut grant = match producer.grant(total_length) {
-                Ok(grant) => grant,
-                Err(bbqueue::Error::InsufficientSize) => continue,
-                Err(e) => unreachable!("{e:?}"),
-            };
-
-            let (header, remaining) = grant.split_at_mut(mem::size_of::<EntryHeader>());
-            header.copy_from_slice(payload_header.bytes_of());
-            let (key_out, value_out) = remaining.split_at_mut(key.len());
-            key_out.copy_from_slice(key);
-            CboRoaringBitmapCodec::serialize_into_writer(bitmap, value_out)?;
-
-            // We could commit only the used memory.
-            grant.commit(total_length);
-
-            break Ok(());
-        }
+        self.sender.write_key_value_with(D::DATABASE, key_length, value_length, |buffer| {
+            let (key_buffer, value_buffer) = buffer.split_at_mut(key.len());
+            key_buffer.copy_from_slice(key);
+            CboRoaringBitmapCodec::serialize_into_writer(bitmap, value_buffer)?;
+            Ok(())
+        })
     }
 
     pub fn delete(&self, key: &[u8]) -> crate::Result<()> {
-        let capacity = self.sender.capacity;
-        let refcell = self.sender.inner.get().unwrap();
-        let mut producer = refcell.0.borrow_mut_or_yield();
-
-        let key_length = key.len().try_into().unwrap();
-        let total_length = EntryHeader::delete_key_size(key_length);
-        if total_length > capacity {
-            unreachable!("entry larger that the bbqueue capacity");
-        }
-
-        let payload_header = EntryHeader::DbOperation { database: D::DATABASE, key_length: None };
-
-        loop {
-            let mut grant = match producer.grant(total_length) {
-                Ok(grant) => grant,
-                Err(bbqueue::Error::InsufficientSize) => continue,
-                Err(e) => unreachable!("{e:?}"),
-            };
-
-            let (header, remaining) = grant.split_at_mut(mem::size_of::<EntryHeader>());
-            header.copy_from_slice(payload_header.bytes_of());
-            remaining.copy_from_slice(key);
-
-            // We could commit only the used memory.
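`write_key_value_with` and `delete_entry_with` let callers serialize directly into the reserved BBQueue frame instead of allocating an intermediate buffer first; the closure receives exactly `key_length + value_length` bytes. A hypothetical caller, with illustrative values rather than code from this patch:

    // Reserve the frame and fill it in place; no temporary Vec involved.
    let key = b"example-key";
    let value = b"example-value";
    let key_length = NonZeroU16::new(key.len() as u16).unwrap();
    sender.write_key_value_with(Database::Main, key_length, value.len(), |buffer| {
        let (key_out, value_out) = buffer.split_at_mut(key.len());
        key_out.copy_from_slice(key);
        value_out.copy_from_slice(value);
        Ok(())
    })?;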
-            grant.commit(total_length);
-
-            break Ok(());
-        }
+        self.sender.delete_entry(D::DATABASE, key)
     }
 }
 
@@ -430,13 +627,10 @@ pub struct FacetDocidsSender<'a, 'b> {
 
 impl FacetDocidsSender<'_, '_> {
     pub fn write(&self, key: &[u8], bitmap: &RoaringBitmap) -> crate::Result<()> {
-        let capacity = self.sender.capacity;
-        let refcell = self.sender.inner.get().unwrap();
-        let mut producer = refcell.0.borrow_mut_or_yield();
-
         let (facet_kind, key) = FacetKind::extract_from_key(key);
-        let key_length = key.len().try_into().unwrap();
+        let database = Database::from(facet_kind);
 
+        let key_length = NonZeroU16::new(key.len().try_into().unwrap()).unwrap();
         let value_length = CboRoaringBitmapCodec::serialized_size(bitmap);
         let value_length = match facet_kind {
             // We must take the facet group size into account
@@ -445,26 +639,8 @@ impl FacetDocidsSender<'_, '_> {
             FacetKind::Null | FacetKind::Empty | FacetKind::Exists => value_length,
         };
 
-        let total_length = EntryHeader::put_key_value_size(key_length, value_length);
-        if total_length > capacity {
-            unreachable!("entry larger that the bbqueue capacity");
-        }
-
-        let payload_header = EntryHeader::DbOperation {
-            database: Database::from(facet_kind),
-            key_length: NonZeroU16::new(key_length),
-        };
-
-        loop {
-            let mut grant = match producer.grant(total_length) {
-                Ok(grant) => grant,
-                Err(bbqueue::Error::InsufficientSize) => continue,
-                Err(e) => unreachable!("{e:?}"),
-            };
-
-            let (header, remaining) = grant.split_at_mut(mem::size_of::<EntryHeader>());
-            header.copy_from_slice(payload_header.bytes_of());
-            let (key_out, value_out) = remaining.split_at_mut(key.len());
+        self.sender.write_key_value_with(database, key_length, value_length, |buffer| {
+            let (key_out, value_out) = buffer.split_at_mut(key.len());
             key_out.copy_from_slice(key);
 
             let value_out = match facet_kind {
@@ -477,47 +653,17 @@ impl FacetDocidsSender<'_, '_> {
                 }
                 FacetKind::Null | FacetKind::Empty | FacetKind::Exists => value_out,
             };
+
             CboRoaringBitmapCodec::serialize_into_writer(bitmap, value_out)?;
-
-            // We could commit only the used memory.
-            grant.commit(total_length);
-
-            break Ok(());
-        }
+            Ok(())
+        })
     }
 
     pub fn delete(&self, key: &[u8]) -> crate::Result<()> {
-        let capacity = self.sender.capacity;
-        let refcell = self.sender.inner.get().unwrap();
-        let mut producer = refcell.0.borrow_mut_or_yield();
-
         let (facet_kind, key) = FacetKind::extract_from_key(key);
-        let key_length = key.len().try_into().unwrap();
-
-        let total_length = EntryHeader::delete_key_size(key_length);
-        if total_length > capacity {
-            unreachable!("entry larger that the bbqueue capacity");
-        }
-
-        let payload_header =
-            EntryHeader::DbOperation { database: Database::from(facet_kind), key_length: None };
-
-        loop {
-            let mut grant = match producer.grant(total_length) {
-                Ok(grant) => grant,
-                Err(bbqueue::Error::InsufficientSize) => continue,
-                Err(e) => unreachable!("{e:?}"),
-            };
-
-            let (header, remaining) = grant.split_at_mut(mem::size_of::<EntryHeader>());
-            header.copy_from_slice(payload_header.bytes_of());
-            remaining.copy_from_slice(key);
-
-            // We could commit only the used memory.
-            grant.commit(total_length);
-
-            break Ok(());
-        }
+        let database = Database::from(facet_kind);
+        self.sender.delete_entry(database, key)
     }
 }
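For `Number` and `String` facets the stored value is a facet-group entry, which is why one extra byte is reserved above (`value_length + 1`) before the bitmap bytes. Assuming, as the elided hunk suggests, that this byte is the facet group size (1 at level 0), the value layout inside the closure would be written roughly like this (sketch only):

    // [group_size: u8 = 1][CboRoaringBitmap bytes ...]
    let (first, bitmap_out) = value_out.split_first_mut().unwrap();
    *first = 1; // level-0 facet groups contain a single entry
    CboRoaringBitmapCodec::serialize_into_writer(bitmap, bitmap_out)?;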
 
@@ -565,7 +711,7 @@ impl DocumentsSender<'_, '_> {
 
     pub fn delete(&self, docid: DocumentId, external_id: String) -> crate::Result<()> {
         self.0.delete_entry(Database::Documents, &docid.to_be_bytes())?;
-        self.0.send_delete_vector(docid)?;
+        self.0.delete_vector(docid)?;
         self.0.delete_entry(Database::ExternalDocumentsIds, external_id.as_bytes())
     }
 }
@@ -579,13 +725,10 @@ impl EmbeddingSender<'_, '_> {
         embedder_id: u8,
         embeddings: Vec<Embedding>,
     ) -> crate::Result<()> {
-        self.0
-            .send(WriterOperation::ArroyOperation(ArroyOperation::SetVectors {
-                docid,
-                embedder_id,
-                embeddings,
-            }))
-            .map_err(|_| SendError(()))
+        for embedding in embeddings {
+            self.set_vector(docid, embedder_id, embedding)?;
+        }
+        Ok(())
     }
 
     pub fn set_vector(
@@ -593,21 +736,8 @@ impl EmbeddingSender<'_, '_> {
         docid: DocumentId,
         embedder_id: u8,
         embedding: Embedding,
-    ) -> StdResult<(), SendError<()>> {
-        self.0
-            .send(WriterOperation::ArroyOperation(ArroyOperation::SetVector {
-                docid,
-                embedder_id,
-                embedding,
-            }))
-            .map_err(|_| SendError(()))
-    }
-
-    /// Marks all embedders as "to be built"
-    pub fn finish(self, configs: Vec<IndexEmbeddingConfig>) -> StdResult<(), SendError<()>> {
-        self.0
-            .send(WriterOperation::ArroyOperation(ArroyOperation::Finish { configs }))
-            .map_err(|_| SendError(()))
+    ) -> crate::Result<()> {
+        self.0.set_vector(docid, embedder_id, &embedding[..])
     }
 }

diff --git a/crates/milli/src/update/new/extract/vectors/mod.rs b/crates/milli/src/update/new/extract/vectors/mod.rs
index 52b13f37d..42278d443 100644
--- a/crates/milli/src/update/new/extract/vectors/mod.rs
+++ b/crates/milli/src/update/new/extract/vectors/mod.rs
@@ -76,7 +76,7 @@ impl<'a, 'extractor> Extractor<'extractor> for EmbeddingExtractor<'a> {
             context.data,
             &self.possible_embedding_mistakes,
             self.threads,
-            self.sender,
+            &self.sender,
             &context.doc_alloc,
         ))
     }

diff --git a/crates/milli/src/update/new/indexer/mod.rs b/crates/milli/src/update/new/indexer/mod.rs
index f82f4af37..1fd60b610 100644
--- a/crates/milli/src/update/new/indexer/mod.rs
+++ b/crates/milli/src/update/new/indexer/mod.rs
@@ -40,7 +40,7 @@ use crate::update::new::words_prefix_docids::compute_exact_word_prefix_docids;
 use crate::update::new::{merge_and_send_docids, merge_and_send_facet_docids, FacetDatabases};
 use crate::update::settings::InnerIndexSettings;
 use crate::update::{FacetsUpdateBulk, GrenadParameters};
-use crate::vector::{ArroyWrapper, EmbeddingConfigs, Embeddings};
+use crate::vector::{ArroyWrapper, EmbeddingConfigs};
 use crate::{
     Error, FieldsIdsMap, GlobalFieldsIdsMap, Index, InternalError, Result, ThreadPoolNoAbort,
     ThreadPoolNoAbortBuilder, UserError,
@@ -80,7 +80,7 @@ where
     let bbbuffers: Vec<_> = (0..rayon::current_num_threads())
         .map(|_| bbqueue::BBBuffer::new(100 * 1024 * 1024)) // 100 MiB by thread
         .collect();
-    let (extractor_sender, writer_receiver) = extractor_writer_bbqueue(&bbbuffers);
+    let (extractor_sender, writer_receiver) = extractor_writer_bbqueue(&bbbuffers, 1000);
 
     let finished_extraction = AtomicBool::new(false);
     let metadata_builder = MetadataBuilder::from_index(index, wtxn)?;
@@ -386,7 +386,11 @@
                 })
                 .collect();
 
+            // Used by the ArroySetVector to copy the embedding into an
+            // aligned memory area, required by arroy to accept a new vector.
+            let mut aligned_embedding = Vec::new();
             let mut arroy_writers = arroy_writers?;
+
             {
                 let span = tracing::trace_span!(target: "indexing::write_db", "all");
                 let _entered = span.enter();
@@ -394,81 +398,85 @@
                 let span = tracing::trace_span!(target: "indexing::write_db", "post_merge");
                 let mut _entered_post_merge = None;
 
-                for operation in writer_receiver {
+                while let Some(action) = writer_receiver.recv() {
                     if _entered_post_merge.is_none()
                         && finished_extraction.load(std::sync::atomic::Ordering::Relaxed)
                     {
                         _entered_post_merge = Some(span.enter());
                     }
-                    match operation {
-                        WriterOperation::DbOperation(db_operation) => {
-                            let database = db_operation.database(index);
-                            let database_name = db_operation.database_name();
-                            match db_operation.entry() {
-                                EntryOperation::Delete(e) => match database.delete(wtxn, e.entry()) {
-                                    Ok(false) => unreachable!("We tried to delete an unknown key"),
-                                    Ok(_) => (),
-                                    Err(error) => {
-                                        return Err(Error::InternalError(
-                                            InternalError::StoreDeletion {
-                                                database_name,
-                                                key: e.entry().to_owned(),
-                                                error,
-                                            },
-                                        ));
-                                    }
-                                },
-                                EntryOperation::Write(e) => {
-                                    if let Err(error) = database.put(wtxn, e.key(), e.value()) {
-                                        return Err(Error::InternalError(InternalError::StorePut {
-                                            database_name,
-                                            key: e.key().to_owned(),
-                                            value_length: e.value().len(),
-                                            error,
-                                        }));
-                                    }
-                                }
+
+                    match action {
+                        ReceiverAction::WakeUp => (),
+                        ReceiverAction::LargeEntry { database, key, value } => {
+                            let database_name = database.database_name();
+                            let database = database.database(index);
+                            if let Err(error) = database.put(wtxn, &key, &value) {
+                                return Err(Error::InternalError(InternalError::StorePut {
+                                    database_name,
+                                    key,
+                                    value_length: value.len(),
+                                    error,
+                                }));
                             }
                         }
-                        WriterOperation::ArroyOperation(arroy_operation) => match arroy_operation {
-                            ArroyOperation::DeleteVectors { docid } => {
-                                for (
-                                    _embedder_index,
-                                    (_embedder_name, _embedder, writer, dimensions),
-                                ) in &mut arroy_writers
-                                {
+                    }
+
+                    while let Some(frame_with_header) = writer_receiver.read() {
+                        match frame_with_header.header() {
+                            EntryHeader::DbOperation(operation) => {
+                                let database_name = operation.database.database_name();
+                                let database = operation.database.database(index);
+                                let frame = frame_with_header.frame();
+                                match operation.key_value(frame) {
+                                    (key, Some(value)) => {
+                                        if let Err(error) = database.put(wtxn, key, value) {
+                                            return Err(Error::InternalError(InternalError::StorePut {
+                                                database_name,
+                                                key: key.into(),
+                                                value_length: value.len(),
+                                                error,
+                                            }));
+                                        }
+                                    }
+                                    (key, None) => match database.delete(wtxn, key) {
+                                        Ok(false) => {
+                                            unreachable!("We tried to delete an unknown key: {key:?}")
+                                        }
+                                        Ok(_) => (),
+                                        Err(error) => {
+                                            return Err(Error::InternalError(
+                                                InternalError::StoreDeletion {
+                                                    database_name,
+                                                    key: key.into(),
+                                                    error,
+                                                },
+                                            ));
+                                        }
+                                    },
+                                }
+                            }
+                            EntryHeader::ArroyDeleteVector(ArroyDeleteVector { docid }) => {
+                                for (_index, (_name, _embedder, writer, dimensions)) in &mut arroy_writers {
                                     let dimensions = *dimensions;
                                     writer.del_items(wtxn, dimensions, docid)?;
                                 }
                             }
-                            ArroyOperation::SetVectors {
-                                docid,
-                                embedder_id,
-                                embeddings: raw_embeddings,
-                            } => {
-                                let (_, _, writer, dimensions) = arroy_writers
-                                    .get(&embedder_id)
-                                    .expect("requested a missing embedder");
-
-                                let mut embeddings = Embeddings::new(*dimensions);
-                                for embedding in raw_embeddings {
-                                    embeddings.append(embedding).unwrap();
-                                }
-
-                                writer.del_items(wtxn, *dimensions, docid)?;
-                                writer.add_items(wtxn, docid, &embeddings)?;
+                            EntryHeader::ArroySetVector(asv) => {
+                                let ArroySetVector { docid, embedder_id, .. } = asv;
+                                let frame = frame_with_header.frame();
+                                let embedding =
+                                    asv.read_embedding_into_vec(frame, &mut aligned_embedding);
+                                let (_, _, writer, dimensions) =
+                                    arroy_writers.get(&embedder_id).expect("requested a missing embedder");
+                                writer.del_items(wtxn, *dimensions, docid)?;
+                                writer.add_item(wtxn, docid, embedding)?;
+                            }
                         }
-                        ArroyOperation::SetVector { docid, embedder_id, embedding } => {
-                            let (_, _, writer, dimensions) =
-                                arroy_writers.get(&embedder_id).expect("requested a missing embedder");
-                            writer.del_items(wtxn, *dimensions, docid)?;
-                            writer.add_item(wtxn, docid, &embedding)?;
-                        }
-                        _otherwise => unreachable!(),
-                    },
+                    }
                 }
             }
 
+            todo!("read the BBQueue once the channel is closed");
+
             'vectors: {
                 let span =
                     tracing::trace_span!(target: "indexing::vectors", parent: &indexer_span, "build");
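The `todo!` above marks the missing drain step: once `recv` returns `None`, every sender has been dropped and no new frames can be produced, but frames already committed to the BBQueues may remain unread. One way it could be resolved, reusing the exact frame handling from the loop above (sketch only, not part of this patch):

    // Drain whatever is still committed in the ring buffers.
    while let Some(frame_with_header) = writer_receiver.read() {
        match frame_with_header.header() {
            EntryHeader::DbOperation(_op) => { /* same put/delete handling as above */ }
            EntryHeader::ArroyDeleteVector(_adv) => { /* same del_items handling */ }
            EntryHeader::ArroySetVector(_asv) => { /* same add_item handling */ }
        }
    }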