Merge #5279
Some checks failed
Test suite / Tests on ubuntu-20.04 (push) Failing after 0s
Test suite / Tests almost all features (push) Has been skipped
Test suite / Test disabled tokenization (push) Has been skipped
Test suite / Run tests in debug (push) Failing after 2s
Test suite / Tests on windows-2022 (push) Failing after 22s
Test suite / Run Rustfmt (push) Successful in 2m14s
Test suite / Run Clippy (push) Successful in 5m21s
Run the indexing fuzzer / Setup the action (push) Successful in 1h4m54s
Test suite / Tests on macos-13 (push) Has been cancelled
Indexing bench (push) / Run and upload benchmarks (push) Has been cancelled
Benchmarks of indexing (push) / Run and upload benchmarks (push) Has been cancelled
Benchmarks of search for geo (push) / Run and upload benchmarks (push) Has been cancelled
Benchmarks of search for songs (push) / Run and upload benchmarks (push) Has been cancelled
Benchmarks of search for Wikipedia articles (push) / Run and upload benchmarks (push) Has been cancelled

5279: Bring back changes from v1.12.7 into main r=dureuill a=Kerollmops

This PR brings the changes from v1.12.7 back into main.

Co-authored-by: Kerollmops <clement@meilisearch.com>
Co-authored-by: Louis Dureuil <louis@meilisearch.com>
Co-authored-by: Clément Renault <clement@meilisearch.com>
meili-bors[bot] 2025-01-24 11:48:46 +00:00 committed by GitHub
commit 022205af90
3 changed files with 45 additions and 33 deletions

View File

@@ -44,9 +44,9 @@ impl OfflineUpgrade {
         }

         const FIRST_SUPPORTED_UPGRADE_FROM_VERSION: &str = "1.9.0";
-        const LAST_SUPPORTED_UPGRADE_FROM_VERSION: &str = "1.12.5";
+        const LAST_SUPPORTED_UPGRADE_FROM_VERSION: &str = "1.12.7";
         const FIRST_SUPPORTED_UPGRADE_TO_VERSION: &str = "1.10.0";
-        const LAST_SUPPORTED_UPGRADE_TO_VERSION: &str = "1.12.5";
+        const LAST_SUPPORTED_UPGRADE_TO_VERSION: &str = "1.12.7";

         let upgrade_list = [
             (
@@ -69,7 +69,7 @@ impl OfflineUpgrade {
             (1, 10, _) => 1,
             (1, 11, _) => 2,
             (1, 12, 0..=2) => 3,
-            (1, 12, 3..=5) => no_upgrade,
+            (1, 12, 3..=7) => no_upgrade,
             _ => {
                 bail!("Unsupported current version {current_major}.{current_minor}.{current_patch}. Can only upgrade from versions in range [{}-{}]",
                     FIRST_SUPPORTED_UPGRADE_FROM_VERSION,
@@ -82,8 +82,8 @@ impl OfflineUpgrade {
         let ends_at = match (target_major, target_minor, target_patch) {
             (1, 10, _) => 0,
             (1, 11, _) => 1,
-            (1, 12, x) if x == 0 || x == 1 || x == 2 => 2,
-            (1, 12, 3..=5) => 3,
+            (1, 12, 0..=2) => 2,
+            (1, 12, 3..=7) => 3,
             _ => {
                 bail!("Unsupported target version {target_major}.{target_minor}.{target_patch}. Can only upgrade to versions in range [{}-{}]",
                     FIRST_SUPPORTED_UPGRADE_TO_VERSION,
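
As a note for reviewers, the bumped match arms rely on Rust range patterns. A minimal, hypothetical sketch of that matching idea (the function name, signature, and `Option` return are illustrative, not taken from the PR):

```rust
// Hypothetical sketch of the range-pattern matching used above: a
// (major, minor, patch) triple selects where in the upgrade list to start,
// and patches 1.12.3 through 1.12.7 now require no upgrade at all.
fn upgrade_start(version: (u32, u32, u32), no_upgrade: usize) -> Option<usize> {
    match version {
        (1, 10, _) => Some(1),
        (1, 11, _) => Some(2),
        (1, 12, 0..=2) => Some(3),
        (1, 12, 3..=7) => Some(no_upgrade),
        _ => None, // the real code bails with an explicit error message here
    }
}
```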

View File

@@ -27,6 +27,12 @@ use crate::update::new::KvReaderFieldId;
 use crate::vector::Embedding;
 use crate::{CboRoaringBitmapCodec, DocumentId, Error, Index, InternalError};

+/// Note that the FrameProducer requires up to 9 bytes to
+/// encode the length, the max grant has been computed accordingly.
+///
+/// <https://docs.rs/bbqueue/latest/bbqueue/framed/index.html#frame-header>
+const MAX_FRAME_HEADER_SIZE: usize = 9;
+
 /// Creates a tuple of senders/receiver to be used by
 /// the extractors and the writer loop.
 ///
@@ -53,8 +59,9 @@ pub fn extractor_writer_bbqueue(
     bbbuffers.resize_with(current_num_threads, || BBBuffer::new(bbbuffer_capacity));
     let capacity = bbbuffers.first().unwrap().capacity();
-    // Read the field description to understand this
-    let capacity = capacity.checked_sub(9).unwrap();
+    // 1. Due to fragmentation in the bbbuffer, we can only accept up to half the capacity in a single message.
+    // 2. Read the documentation for `MAX_FRAME_HEADER_SIZE` for more information about why it is here.
+    let max_grant = capacity.saturating_div(2).checked_sub(MAX_FRAME_HEADER_SIZE).unwrap();

     let producers = ThreadLocal::with_capacity(bbbuffers.len());

     let consumers = rayon::broadcast(|bi| {
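
To make the new arithmetic concrete, a small standalone sketch of the grant computation (the 50 MiB capacity is just an example figure, not taken from this hunk):

```rust
// Standalone sketch of the max-grant arithmetic above (example numbers only).
const MAX_FRAME_HEADER_SIZE: usize = 9;

fn max_grant(capacity: usize) -> usize {
    // Due to fragmentation, only up to half the buffer capacity can be
    // granted for a single message, and the frame header (up to 9 bytes)
    // must fit inside that grant as well.
    capacity / 2 - MAX_FRAME_HEADER_SIZE
}

fn main() {
    let capacity = 50 * 1024 * 1024; // e.g. a 50 MiB buffer for one thread
    assert_eq!(max_grant(capacity), 26_214_391);
}
```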
@@ -65,7 +72,7 @@ pub fn extractor_writer_bbqueue(
     });

     let (sender, receiver) = flume::bounded(channel_capacity);
-    let sender = ExtractorBbqueueSender { sender, producers, capacity };
+    let sender = ExtractorBbqueueSender { sender, producers, max_grant };
     let receiver = WriterBbqueueReceiver {
         receiver,
         look_at_consumer: (0..consumers.len()).cycle(),
@@ -81,13 +88,10 @@ pub struct ExtractorBbqueueSender<'a> {
     /// A memory buffer, one by thread, is used to serialize
     /// the entries directly in this shared, lock-free space.
     producers: ThreadLocal<FullySend<RefCell<FrameProducer<'a>>>>,
-    /// The capacity of this frame producer, will never be able to store more than that.
-    ///
-    /// Note that the FrameProducer requires up to 9 bytes to encode the length,
-    /// the capacity has been shrunk accordingly.
-    ///
-    /// <https://docs.rs/bbqueue/latest/bbqueue/framed/index.html#frame-header>
-    capacity: usize,
+    /// The maximum frame grant that a producer can reserve.
+    /// It will never be able to store more than that as the
+    /// buffer cannot split data into two parts.
+    max_grant: usize,
 }

 pub struct WriterBbqueueReceiver<'a> {
@@ -443,14 +447,14 @@ impl<'b> ExtractorBbqueueSender<'b> {
     }

     fn delete_vector(&self, docid: DocumentId) -> crate::Result<()> {
-        let capacity = self.capacity;
+        let max_grant = self.max_grant;
         let refcell = self.producers.get().unwrap();
         let mut producer = refcell.0.borrow_mut_or_yield();

         let payload_header = EntryHeader::ArroyDeleteVector(ArroyDeleteVector { docid });
         let total_length = EntryHeader::total_delete_vector_size();
-        if total_length > capacity {
-            panic!("The entry is larger ({total_length} bytes) than the BBQueue capacity ({capacity} bytes)");
+        if total_length > max_grant {
+            panic!("The entry is larger ({total_length} bytes) than the BBQueue max grant ({max_grant} bytes)");
         }

         // Spin loop to have a frame the size we requested.
@@ -468,7 +472,7 @@ impl<'b> ExtractorBbqueueSender<'b> {
         embedder_id: u8,
         embeddings: &[Vec<f32>],
     ) -> crate::Result<()> {
-        let capacity = self.capacity;
+        let max_grant = self.max_grant;
         let refcell = self.producers.get().unwrap();
         let mut producer = refcell.0.borrow_mut_or_yield();
@@ -479,7 +483,7 @@ impl<'b> ExtractorBbqueueSender<'b> {
         let arroy_set_vector = ArroySetVectors { docid, embedder_id, _padding: [0; 3] };
         let payload_header = EntryHeader::ArroySetVectors(arroy_set_vector);
         let total_length = EntryHeader::total_set_vectors_size(embeddings.len(), dimensions);
-        if total_length > capacity {
+        if total_length > max_grant {
             let mut value_file = tempfile::tempfile().map(BufWriter::new)?;
             for embedding in embeddings {
                 let mut embedding_bytes = bytemuck::cast_slice(embedding);
@@ -540,14 +544,14 @@ impl<'b> ExtractorBbqueueSender<'b> {
     where
         F: FnOnce(&mut [u8], &mut [u8]) -> crate::Result<()>,
     {
-        let capacity = self.capacity;
+        let max_grant = self.max_grant;
         let refcell = self.producers.get().unwrap();
         let mut producer = refcell.0.borrow_mut_or_yield();

         let operation = DbOperation { database, key_length: Some(key_length) };
         let payload_header = EntryHeader::DbOperation(operation);
         let total_length = EntryHeader::total_key_value_size(key_length, value_length);
-        if total_length > capacity {
+        if total_length > max_grant {
             let mut key_buffer = vec![0; key_length.get() as usize].into_boxed_slice();
             let value_file = tempfile::tempfile()?;
             value_file.set_len(value_length.try_into().unwrap())?;
@@ -601,7 +605,7 @@ impl<'b> ExtractorBbqueueSender<'b> {
     where
         F: FnOnce(&mut [u8]) -> crate::Result<()>,
     {
-        let capacity = self.capacity;
+        let max_grant = self.max_grant;
         let refcell = self.producers.get().unwrap();
         let mut producer = refcell.0.borrow_mut_or_yield();
@@ -610,8 +614,8 @@ impl<'b> ExtractorBbqueueSender<'b> {
         let operation = DbOperation { database, key_length: None };
         let payload_header = EntryHeader::DbOperation(operation);
         let total_length = EntryHeader::total_key_size(key_length);
-        if total_length > capacity {
-            panic!("The entry is larger ({total_length} bytes) than the BBQueue capacity ({capacity} bytes)");
+        if total_length > max_grant {
+            panic!("The entry is larger ({total_length} bytes) than the BBQueue max grant ({max_grant} bytes)");
         }

         // Spin loop to have a frame the size we requested.

View File

@@ -68,17 +68,25 @@ where
         ..grenad_parameters
     };

-    // We compute and remove the allocated BBQueues buffers capacity from the indexing memory.
-    let minimum_capacity = 50 * 1024 * 1024 * pool.current_num_threads(); // 50 MiB
+    // 5% percent of the allocated memory for the extractors, or min 100MiB
+    // 5% percent of the allocated memory for the bbqueues, or min 50MiB
+    //
+    // Minimum capacity for bbqueues
+    let minimum_total_bbbuffer_capacity = 50 * 1024 * 1024 * pool.current_num_threads(); // 50 MiB
+    let minimum_total_extractors_capacity = minimum_total_bbbuffer_capacity * 2;
+
     let (grenad_parameters, total_bbbuffer_capacity) = grenad_parameters.max_memory.map_or(
-        (grenad_parameters, 2 * minimum_capacity), // 100 MiB by thread by default
+        (
+            GrenadParameters {
+                max_memory: Some(minimum_total_extractors_capacity),
+                ..grenad_parameters
+            },
+            minimum_total_bbbuffer_capacity,
+        ), // 100 MiB by thread by default
         |max_memory| {
-            // 2% of the indexing memory
-            let total_bbbuffer_capacity = (max_memory / 100 / 2).max(minimum_capacity);
+            let total_bbbuffer_capacity = max_memory.max(minimum_total_bbbuffer_capacity);
             let new_grenad_parameters = GrenadParameters {
-                max_memory: Some(
-                    max_memory.saturating_sub(total_bbbuffer_capacity).max(100 * 1024 * 1024),
-                ),
+                max_memory: Some(max_memory.max(minimum_total_extractors_capacity)),
                 ..grenad_parameters
             };
             (new_grenad_parameters, total_bbbuffer_capacity)
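
For readers skimming this hunk, a hypothetical standalone sketch of the budgeting rule it introduces (the helper and its names are illustrative, not from the PR): the bbqueues get at least 50 MiB per thread, the extractors at least twice that, and a configured memory limit is only ever raised to those minimums, never reduced.

```rust
// Illustrative sketch (hypothetical helper): returns
// (extractor_memory_budget, total_bbqueue_capacity) in bytes.
fn memory_budgets(max_memory: Option<usize>, num_threads: usize) -> (usize, usize) {
    let minimum_bbqueue_capacity = 50 * 1024 * 1024 * num_threads; // 50 MiB per thread
    let minimum_extractors_capacity = minimum_bbqueue_capacity * 2; // 100 MiB per thread

    match max_memory {
        // No limit configured: fall back to the per-thread minimums.
        None => (minimum_extractors_capacity, minimum_bbqueue_capacity),
        // A limit is configured: never go below the minimums.
        Some(limit) => (
            limit.max(minimum_extractors_capacity),
            limit.max(minimum_bbqueue_capacity),
        ),
    }
}
```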