From 5ab4cdb1f352c8a29f301fd45c16c806a342577e Mon Sep 17 00:00:00 2001 From: Kerollmops Date: Thu, 23 Jan 2025 10:43:28 +0100 Subject: [PATCH 1/4] Reduce the maximum grant possible we can store in the BBQueue --- crates/milli/src/update/new/channel.rs | 43 ++++++++++++++------------ 1 file changed, 23 insertions(+), 20 deletions(-) diff --git a/crates/milli/src/update/new/channel.rs b/crates/milli/src/update/new/channel.rs index 7590c02ac..c362075a5 100644 --- a/crates/milli/src/update/new/channel.rs +++ b/crates/milli/src/update/new/channel.rs @@ -27,6 +27,12 @@ use crate::update::new::KvReaderFieldId; use crate::vector::Embedding; use crate::{CboRoaringBitmapCodec, DocumentId, Error, Index, InternalError}; +/// Note that the FrameProducer requires up to 9 bytes to +/// encode the length, the max grant has been computed accordingly. +/// +/// +const MAX_FRAME_HEADER_SIZE: usize = 9; + /// Creates a tuple of senders/receiver to be used by /// the extractors and the writer loop. /// @@ -53,8 +59,8 @@ pub fn extractor_writer_bbqueue( bbbuffers.resize_with(current_num_threads, || BBBuffer::new(bbbuffer_capacity)); let capacity = bbbuffers.first().unwrap().capacity(); - // Read the field description to understand this - let capacity = capacity.checked_sub(9).unwrap(); + // Read the const description to understand this + let max_grant = capacity.saturating_div(2).checked_sub(MAX_FRAME_HEADER_SIZE).unwrap(); let producers = ThreadLocal::with_capacity(bbbuffers.len()); let consumers = rayon::broadcast(|bi| { @@ -65,7 +71,7 @@ pub fn extractor_writer_bbqueue( }); let (sender, receiver) = flume::bounded(channel_capacity); - let sender = ExtractorBbqueueSender { sender, producers, capacity }; + let sender = ExtractorBbqueueSender { sender, producers, max_grant }; let receiver = WriterBbqueueReceiver { receiver, look_at_consumer: (0..consumers.len()).cycle(), @@ -81,13 +87,10 @@ pub struct ExtractorBbqueueSender<'a> { /// A memory buffer, one by thread, is used to serialize /// the entries directly in this shared, lock-free space. producers: ThreadLocal>>>, - /// The capacity of this frame producer, will never be able to store more than that. - /// - /// Note that the FrameProducer requires up to 9 bytes to encode the length, - /// the capacity has been shrunk accordingly. - /// - /// - capacity: usize, + /// The maximum frame grant that a producer can reserve. + /// It will never be able to store more than that as the + /// buffer cannot split data into two parts. + max_grant: usize, } pub struct WriterBbqueueReceiver<'a> { @@ -443,14 +446,14 @@ impl<'b> ExtractorBbqueueSender<'b> { } fn delete_vector(&self, docid: DocumentId) -> crate::Result<()> { - let capacity = self.capacity; + let max_grant = self.max_grant; let refcell = self.producers.get().unwrap(); let mut producer = refcell.0.borrow_mut_or_yield(); let payload_header = EntryHeader::ArroyDeleteVector(ArroyDeleteVector { docid }); let total_length = EntryHeader::total_delete_vector_size(); - if total_length > capacity { - panic!("The entry is larger ({total_length} bytes) than the BBQueue capacity ({capacity} bytes)"); + if total_length > max_grant { + panic!("The entry is larger ({total_length} bytes) than the BBQueue max grant ({max_grant} bytes)"); } // Spin loop to have a frame the size we requested. @@ -468,7 +471,7 @@ impl<'b> ExtractorBbqueueSender<'b> { embedder_id: u8, embeddings: &[Vec], ) -> crate::Result<()> { - let capacity = self.capacity; + let max_grant = self.max_grant; let refcell = self.producers.get().unwrap(); let mut producer = refcell.0.borrow_mut_or_yield(); @@ -479,7 +482,7 @@ impl<'b> ExtractorBbqueueSender<'b> { let arroy_set_vector = ArroySetVectors { docid, embedder_id, _padding: [0; 3] }; let payload_header = EntryHeader::ArroySetVectors(arroy_set_vector); let total_length = EntryHeader::total_set_vectors_size(embeddings.len(), dimensions); - if total_length > capacity { + if total_length > max_grant { let mut value_file = tempfile::tempfile().map(BufWriter::new)?; for embedding in embeddings { let mut embedding_bytes = bytemuck::cast_slice(embedding); @@ -540,14 +543,14 @@ impl<'b> ExtractorBbqueueSender<'b> { where F: FnOnce(&mut [u8], &mut [u8]) -> crate::Result<()>, { - let capacity = self.capacity; + let max_grant = self.max_grant; let refcell = self.producers.get().unwrap(); let mut producer = refcell.0.borrow_mut_or_yield(); let operation = DbOperation { database, key_length: Some(key_length) }; let payload_header = EntryHeader::DbOperation(operation); let total_length = EntryHeader::total_key_value_size(key_length, value_length); - if total_length > capacity { + if total_length > max_grant { let mut key_buffer = vec![0; key_length.get() as usize].into_boxed_slice(); let value_file = tempfile::tempfile()?; value_file.set_len(value_length.try_into().unwrap())?; @@ -601,7 +604,7 @@ impl<'b> ExtractorBbqueueSender<'b> { where F: FnOnce(&mut [u8]) -> crate::Result<()>, { - let capacity = self.capacity; + let max_grant = self.max_grant; let refcell = self.producers.get().unwrap(); let mut producer = refcell.0.borrow_mut_or_yield(); @@ -610,8 +613,8 @@ impl<'b> ExtractorBbqueueSender<'b> { let operation = DbOperation { database, key_length: None }; let payload_header = EntryHeader::DbOperation(operation); let total_length = EntryHeader::total_key_size(key_length); - if total_length > capacity { - panic!("The entry is larger ({total_length} bytes) than the BBQueue capacity ({capacity} bytes)"); + if total_length > max_grant { + panic!("The entry is larger ({total_length} bytes) than the BBQueue max grant ({max_grant} bytes)"); } // Spin loop to have a frame the size we requested. From f5a4a1c8b26a37694716cd6a44d3c3610d4239b4 Mon Sep 17 00:00:00 2001 From: Louis Dureuil Date: Thu, 23 Jan 2025 10:55:03 +0100 Subject: [PATCH 2/4] Give more RAM to bbqueue. - bbqueue buffers used to have (5% * 2%) / num_threads - they now have 5% / num_threads --- crates/milli/src/update/new/indexer/mod.rs | 24 ++++++++++++++-------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/crates/milli/src/update/new/indexer/mod.rs b/crates/milli/src/update/new/indexer/mod.rs index 1cf83f2d2..b65750030 100644 --- a/crates/milli/src/update/new/indexer/mod.rs +++ b/crates/milli/src/update/new/indexer/mod.rs @@ -68,17 +68,25 @@ where ..grenad_parameters }; - // We compute and remove the allocated BBQueues buffers capacity from the indexing memory. - let minimum_capacity = 50 * 1024 * 1024 * pool.current_num_threads(); // 50 MiB + // 5% percent of the allocated memory for the extractors, or min 100MiB + // 5% percent of the allocated memory for the bbqueues, or min 50MiB + // + // Minimum capacity for bbqueues + let minimum_total_bbbuffer_capacity = 50 * 1024 * 1024 * pool.current_num_threads(); // 50 MiB + let minimum_total_extractors_capacity = minimum_total_bbbuffer_capacity * 2; + let (grenad_parameters, total_bbbuffer_capacity) = grenad_parameters.max_memory.map_or( - (grenad_parameters, 2 * minimum_capacity), // 100 MiB by thread by default + ( + GrenadParameters { + max_memory: Some(minimum_total_extractors_capacity), + ..grenad_parameters + }, + minimum_total_bbbuffer_capacity, + ), // 100 MiB by thread by default |max_memory| { - // 2% of the indexing memory - let total_bbbuffer_capacity = (max_memory / 100 / 2).max(minimum_capacity); + let total_bbbuffer_capacity = max_memory.max(minimum_total_bbbuffer_capacity); let new_grenad_parameters = GrenadParameters { - max_memory: Some( - max_memory.saturating_sub(total_bbbuffer_capacity).max(100 * 1024 * 1024), - ), + max_memory: Some(max_memory.max(minimum_total_extractors_capacity)), ..grenad_parameters }; (new_grenad_parameters, total_bbbuffer_capacity) From 9b579069dff3e6b18b64567cf679c2d80f03e77f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Thu, 23 Jan 2025 11:09:20 +0100 Subject: [PATCH 3/4] Comment the max grant of the bbqueue Co-authored-by: Louis Dureuil --- crates/milli/src/update/new/channel.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/crates/milli/src/update/new/channel.rs b/crates/milli/src/update/new/channel.rs index c362075a5..7e2229950 100644 --- a/crates/milli/src/update/new/channel.rs +++ b/crates/milli/src/update/new/channel.rs @@ -59,7 +59,8 @@ pub fn extractor_writer_bbqueue( bbbuffers.resize_with(current_num_threads, || BBBuffer::new(bbbuffer_capacity)); let capacity = bbbuffers.first().unwrap().capacity(); - // Read the const description to understand this + // 1. Due to fragmentation in the bbbuffer, we can only accept up to half the capacity in a single message. + // 2. Read the documentation for `MAX_FRAME_HEADER_SIZE` for more information about why it is here. let max_grant = capacity.saturating_div(2).checked_sub(MAX_FRAME_HEADER_SIZE).unwrap(); let producers = ThreadLocal::with_capacity(bbbuffers.len()); From 50280bf02b6583539b8ea5277cb027afcd8cdca5 Mon Sep 17 00:00:00 2001 From: Louis Dureuil Date: Thu, 23 Jan 2025 11:11:20 +0100 Subject: [PATCH 4/4] Support offline upgrade up to v1.12.7 --- crates/meilitool/src/upgrade/mod.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/crates/meilitool/src/upgrade/mod.rs b/crates/meilitool/src/upgrade/mod.rs index bfaa6683d..82a57317c 100644 --- a/crates/meilitool/src/upgrade/mod.rs +++ b/crates/meilitool/src/upgrade/mod.rs @@ -44,9 +44,9 @@ impl OfflineUpgrade { } const FIRST_SUPPORTED_UPGRADE_FROM_VERSION: &str = "1.9.0"; - const LAST_SUPPORTED_UPGRADE_FROM_VERSION: &str = "1.12.5"; + const LAST_SUPPORTED_UPGRADE_FROM_VERSION: &str = "1.12.7"; const FIRST_SUPPORTED_UPGRADE_TO_VERSION: &str = "1.10.0"; - const LAST_SUPPORTED_UPGRADE_TO_VERSION: &str = "1.12.5"; + const LAST_SUPPORTED_UPGRADE_TO_VERSION: &str = "1.12.7"; let upgrade_list = [ ( @@ -69,7 +69,7 @@ impl OfflineUpgrade { (1, 10, _) => 1, (1, 11, _) => 2, (1, 12, 0..=2) => 3, - (1, 12, 3..=5) => no_upgrade, + (1, 12, 3..=7) => no_upgrade, _ => { bail!("Unsupported current version {current_major}.{current_minor}.{current_patch}. Can only upgrade from versions in range [{}-{}]", FIRST_SUPPORTED_UPGRADE_FROM_VERSION, @@ -82,8 +82,8 @@ impl OfflineUpgrade { let ends_at = match (target_major, target_minor, target_patch) { (1, 10, _) => 0, (1, 11, _) => 1, - (1, 12, x) if x == 0 || x == 1 || x == 2 => 2, - (1, 12, 3..=5) => 3, + (1, 12, 0..=2) => 2, + (1, 12, 3..=7) => 3, _ => { bail!("Unsupported target version {target_major}.{target_minor}.{target_patch}. Can only upgrade to versions in range [{}-{}]", FIRST_SUPPORTED_UPGRADE_TO_VERSION,