Merge #5276

5276: Fix the stuck indexation due to the internal BBQueue capacity r=curquiza a=Kerollmops

Fixes https://github.com/meilisearch/meilisearch/issues/5277.

Reduce the maximum reserve grant in the BBQueue so we are never stuck.

Co-authored-by: Kerollmops <clement@meilisearch.com>
Co-authored-by: Louis Dureuil <louis@meilisearch.com>
Co-authored-by: Clément Renault <clement@meilisearch.com>
Commit e20b91210d
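To see why the old bound could wedge the indexer, here is a minimal, self-contained sketch of the arithmetic (not Meilisearch code; the 1000-byte capacity and 700-byte entry are illustrative). A framed BBQueue grant must be contiguous, so once the ring has wrapped, only about half of the buffer is ever guaranteed to become available again; a reservation that passes the old `capacity - 9` check can therefore spin forever, which is the stuck indexation reported in #5277.

```rust
// Illustrative only: why the old bound could deadlock and what the new one allows.
const MAX_FRAME_HEADER_SIZE: usize = 9; // a bbqueue frame header takes up to 9 bytes

/// Largest payload a producer may request in a single framed grant (new rule).
fn max_grant(buffer_capacity: usize) -> usize {
    // A grant must be contiguous; after the ring wraps, only about half of the
    // buffer is guaranteed to become contiguous again, header included.
    buffer_capacity / 2 - MAX_FRAME_HEADER_SIZE
}

fn main() {
    let capacity = 1_000; // illustrative ring size in bytes
    let entry = 700; // illustrative entry size in bytes

    // Old check: `entry <= capacity - 9` passes...
    assert!(entry <= capacity - MAX_FRAME_HEADER_SIZE);
    // ...yet a wrapped ring may never offer 700 contiguous bytes again,
    // so the producer spins forever waiting for an impossible grant.

    // New check: the same entry is rejected up front and takes the
    // large-entry fallback instead of blocking the writer loop.
    assert!(entry > max_grant(capacity));
    assert_eq!(max_grant(capacity), 491);
}
```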
@@ -44,9 +44,9 @@ impl OfflineUpgrade {
         }

         const FIRST_SUPPORTED_UPGRADE_FROM_VERSION: &str = "1.9.0";
-        const LAST_SUPPORTED_UPGRADE_FROM_VERSION: &str = "1.12.5";
+        const LAST_SUPPORTED_UPGRADE_FROM_VERSION: &str = "1.12.7";
         const FIRST_SUPPORTED_UPGRADE_TO_VERSION: &str = "1.10.0";
-        const LAST_SUPPORTED_UPGRADE_TO_VERSION: &str = "1.12.5";
+        const LAST_SUPPORTED_UPGRADE_TO_VERSION: &str = "1.12.7";

         let upgrade_list = [
             (
@@ -73,7 +73,7 @@ impl OfflineUpgrade {
             ("1", "10", _) => 1,
             ("1", "11", _) => 2,
             ("1", "12", "0" | "1" | "2") => 3,
-            ("1", "12", "3" | "4" | "5") => no_upgrade,
+            ("1", "12", "3" | "4" | "5" | "6" | "7") => no_upgrade,
             _ => {
                 bail!("Unsupported current version {current_major}.{current_minor}.{current_patch}. Can only upgrade from versions in range [{}-{}]",
                     FIRST_SUPPORTED_UPGRADE_FROM_VERSION,
@@ -87,7 +87,7 @@ impl OfflineUpgrade {
             ("1", "10", _) => 0,
             ("1", "11", _) => 1,
             ("1", "12", "0" | "1" | "2") => 2,
-            ("1", "12", "3" | "4" | "5") => 3,
+            ("1", "12", "3" | "4" | "5" | "6" | "7") => 3,
             (major, _, _) if major.starts_with('v') => {
                 bail!("Target version must not starts with a `v`. Instead of writing `v1.9.0` write `1.9.0` for example.")
             }
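The two hunks above only widen the recognised patch versions. As a rough sketch of how that dispatch behaves (simplified; the `1.9` arm and the `Result`/`Option` shapes are inferred from the surrounding code, not copied from meilitool):

```rust
/// Simplified sketch of the dispatch above: map the current (major, minor, patch)
/// onto an index in the upgrade list, `None` meaning "already up to date".
fn upgrade_start_index(major: &str, minor: &str, patch: &str) -> Result<Option<usize>, String> {
    Ok(match (major, minor, patch) {
        ("1", "9", _) => Some(0), // inferred from FIRST_SUPPORTED_UPGRADE_FROM_VERSION
        ("1", "10", _) => Some(1),
        ("1", "11", _) => Some(2),
        ("1", "12", "0" | "1" | "2") => Some(3),
        // 1.12.6 and 1.12.7 are now recognised: no migration step is needed.
        ("1", "12", "3" | "4" | "5" | "6" | "7") => None,
        (major, minor, patch) => {
            return Err(format!("unsupported current version {major}.{minor}.{patch}"))
        }
    })
}

fn main() {
    assert_eq!(upgrade_start_index("1", "12", "7"), Ok(None));
    assert_eq!(upgrade_start_index("1", "11", "3"), Ok(Some(2)));
    assert!(upgrade_start_index("1", "12", "8").is_err());
}
```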
@@ -27,6 +27,12 @@ use crate::update::new::KvReaderFieldId;
 use crate::vector::Embedding;
 use crate::{CboRoaringBitmapCodec, DocumentId, Error, Index, InternalError};

+/// Note that the FrameProducer requires up to 9 bytes to
+/// encode the length, the max grant has been computed accordingly.
+///
+/// <https://docs.rs/bbqueue/latest/bbqueue/framed/index.html#frame-header>
+const MAX_FRAME_HEADER_SIZE: usize = 9;
+
 /// Creates a tuple of senders/receiver to be used by
 /// the extractors and the writer loop.
 ///
@@ -53,8 +59,9 @@ pub fn extractor_writer_bbqueue(
     bbbuffers.resize_with(current_num_threads, || BBBuffer::new(bbbuffer_capacity));

     let capacity = bbbuffers.first().unwrap().capacity();
-    // Read the field description to understand this
-    let capacity = capacity.checked_sub(9).unwrap();
+    // 1. Due to fragmentation in the bbbuffer, we can only accept up to half the capacity in a single message.
+    // 2. Read the documentation for `MAX_FRAME_HEADER_SIZE` for more information about why it is here.
+    let max_grant = capacity.saturating_div(2).checked_sub(MAX_FRAME_HEADER_SIZE).unwrap();

     let producers = ThreadLocal::with_capacity(bbbuffers.len());
     let consumers = rayon::broadcast(|bi| {
@@ -65,7 +72,7 @@ pub fn extractor_writer_bbqueue(
     });

     let (sender, receiver) = flume::bounded(channel_capacity);
-    let sender = ExtractorBbqueueSender { sender, producers, capacity };
+    let sender = ExtractorBbqueueSender { sender, producers, max_grant };
     let receiver = WriterBbqueueReceiver {
         receiver,
         look_at_consumer: (0..consumers.len()).cycle(),
@@ -81,13 +88,10 @@ pub struct ExtractorBbqueueSender<'a> {
     /// A memory buffer, one by thread, is used to serialize
     /// the entries directly in this shared, lock-free space.
     producers: ThreadLocal<FullySend<RefCell<FrameProducer<'a>>>>,
-    /// The capacity of this frame producer, will never be able to store more than that.
-    ///
-    /// Note that the FrameProducer requires up to 9 bytes to encode the length,
-    /// the capacity has been shrunk accordingly.
-    ///
-    /// <https://docs.rs/bbqueue/latest/bbqueue/framed/index.html#frame-header>
-    capacity: usize,
+    /// The maximum frame grant that a producer can reserve.
+    /// It will never be able to store more than that as the
+    /// buffer cannot split data into two parts.
+    max_grant: usize,
 }

 pub struct WriterBbqueueReceiver<'a> {
@@ -443,14 +447,14 @@ impl<'b> ExtractorBbqueueSender<'b> {
     }

     fn delete_vector(&self, docid: DocumentId) -> crate::Result<()> {
-        let capacity = self.capacity;
+        let max_grant = self.max_grant;
         let refcell = self.producers.get().unwrap();
         let mut producer = refcell.0.borrow_mut_or_yield();

         let payload_header = EntryHeader::ArroyDeleteVector(ArroyDeleteVector { docid });
         let total_length = EntryHeader::total_delete_vector_size();
-        if total_length > capacity {
-            panic!("The entry is larger ({total_length} bytes) than the BBQueue capacity ({capacity} bytes)");
+        if total_length > max_grant {
+            panic!("The entry is larger ({total_length} bytes) than the BBQueue max grant ({max_grant} bytes)");
         }

         // Spin loop to have a frame the size we requested.
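The guard added here matters because of the spin loop that follows it. A hedged sketch of that pattern, written against a hypothetical `try_reserve` closure rather than the real `FrameProducer` API:

```rust
/// Hedged sketch of the spin-loop pattern: the size check must happen *before*
/// spinning, otherwise an over-sized request can never be granted and the
/// extractor thread livelocks. `try_reserve` stands in for the real
/// FrameProducer grant call; it is not an actual bbqueue API.
fn spin_until_reserved<F>(mut try_reserve: F, len: usize, max_grant: usize)
where
    F: FnMut(usize) -> bool,
{
    assert!(len <= max_grant, "entry of {len} bytes exceeds the max grant of {max_grant} bytes");
    while !try_reserve(len) {
        // A frame is still in flight; give the writer loop time to drain it.
        std::hint::spin_loop();
    }
}

fn main() {
    // Toy reservation that succeeds once 64 bytes have been "drained".
    let mut free = 0usize;
    spin_until_reserved(
        |len| {
            free += 16;
            free >= len
        },
        64,
        491,
    );
}
```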
@@ -468,7 +472,7 @@ impl<'b> ExtractorBbqueueSender<'b> {
         embedder_id: u8,
         embeddings: &[Vec<f32>],
     ) -> crate::Result<()> {
-        let capacity = self.capacity;
+        let max_grant = self.max_grant;
         let refcell = self.producers.get().unwrap();
         let mut producer = refcell.0.borrow_mut_or_yield();

@@ -479,7 +483,7 @@ impl<'b> ExtractorBbqueueSender<'b> {
         let arroy_set_vector = ArroySetVectors { docid, embedder_id, _padding: [0; 3] };
         let payload_header = EntryHeader::ArroySetVectors(arroy_set_vector);
         let total_length = EntryHeader::total_set_vectors_size(embeddings.len(), dimensions);
-        if total_length > capacity {
+        if total_length > max_grant {
             let mut value_file = tempfile::tempfile().map(BufWriter::new)?;
             for embedding in embeddings {
                 let mut embedding_bytes = bytemuck::cast_slice(embedding);
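When an entry exceeds the max grant, this path spills the payload to an anonymous temporary file instead of panicking. A simplified sketch of that fallback (only `tempfile::tempfile()`, `BufWriter`, and `bytemuck::cast_slice` are real APIs from the same crates the diff uses; the function name and the channel hand-off are illustrative):

```rust
use std::io::{self, BufWriter, Write};

/// Simplified sketch of the fallback above: payloads larger than the max grant
/// are buffered into an anonymous temp file rather than into the ring, and the
/// resulting file handle is what gets handed to the writer loop.
fn send_embeddings(embeddings: &[Vec<f32>], max_grant: usize) -> io::Result<()> {
    let payload_len: usize = embeddings.iter().map(|e| e.len() * 4).sum(); // 4 bytes per f32
    if payload_len > max_grant {
        let mut value_file = tempfile::tempfile().map(BufWriter::new)?;
        for embedding in embeddings {
            value_file.write_all(bytemuck::cast_slice(embedding))?;
        }
        value_file.flush()?;
        // Here the file (not the bytes) would be sent through the flume channel.
        return Ok(());
    }
    // Small enough: the payload would be serialized directly into a framed grant.
    Ok(())
}
```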
@@ -540,14 +544,14 @@ impl<'b> ExtractorBbqueueSender<'b> {
     where
         F: FnOnce(&mut [u8], &mut [u8]) -> crate::Result<()>,
     {
-        let capacity = self.capacity;
+        let max_grant = self.max_grant;
         let refcell = self.producers.get().unwrap();
         let mut producer = refcell.0.borrow_mut_or_yield();

         let operation = DbOperation { database, key_length: Some(key_length) };
         let payload_header = EntryHeader::DbOperation(operation);
         let total_length = EntryHeader::total_key_value_size(key_length, value_length);
-        if total_length > capacity {
+        if total_length > max_grant {
             let mut key_buffer = vec![0; key_length.get() as usize].into_boxed_slice();
             let value_file = tempfile::tempfile()?;
             value_file.set_len(value_length.try_into().unwrap())?;
@@ -601,7 +605,7 @@ impl<'b> ExtractorBbqueueSender<'b> {
     where
         F: FnOnce(&mut [u8]) -> crate::Result<()>,
     {
-        let capacity = self.capacity;
+        let max_grant = self.max_grant;
         let refcell = self.producers.get().unwrap();
         let mut producer = refcell.0.borrow_mut_or_yield();

@@ -610,8 +614,8 @@ impl<'b> ExtractorBbqueueSender<'b> {
         let operation = DbOperation { database, key_length: None };
         let payload_header = EntryHeader::DbOperation(operation);
         let total_length = EntryHeader::total_key_size(key_length);
-        if total_length > capacity {
-            panic!("The entry is larger ({total_length} bytes) than the BBQueue capacity ({capacity} bytes)");
+        if total_length > max_grant {
+            panic!("The entry is larger ({total_length} bytes) than the BBQueue max grant ({max_grant} bytes)");
         }

         // Spin loop to have a frame the size we requested.
@@ -93,17 +93,25 @@ where
         ..grenad_parameters
     };

-    // We compute and remove the allocated BBQueues buffers capacity from the indexing memory.
-    let minimum_capacity = 50 * 1024 * 1024 * pool.current_num_threads(); // 50 MiB
+    // 5% percent of the allocated memory for the extractors, or min 100MiB
+    // 5% percent of the allocated memory for the bbqueues, or min 50MiB
+    //
+    // Minimum capacity for bbqueues
+    let minimum_total_bbbuffer_capacity = 50 * 1024 * 1024 * pool.current_num_threads(); // 50 MiB
+    let minimum_total_extractors_capacity = minimum_total_bbbuffer_capacity * 2;
+
     let (grenad_parameters, total_bbbuffer_capacity) = grenad_parameters.max_memory.map_or(
-        (grenad_parameters, 2 * minimum_capacity), // 100 MiB by thread by default
+        (
+            GrenadParameters {
+                max_memory: Some(minimum_total_extractors_capacity),
+                ..grenad_parameters
+            },
+            minimum_total_bbbuffer_capacity,
+        ), // 100 MiB by thread by default
         |max_memory| {
-            // 2% of the indexing memory
-            let total_bbbuffer_capacity = (max_memory / 100 / 2).max(minimum_capacity);
+            let total_bbbuffer_capacity = max_memory.max(minimum_total_bbbuffer_capacity);
             let new_grenad_parameters = GrenadParameters {
-                max_memory: Some(
-                    max_memory.saturating_sub(total_bbbuffer_capacity).max(100 * 1024 * 1024),
-                ),
+                max_memory: Some(max_memory.max(minimum_total_extractors_capacity)),
                 ..grenad_parameters
             };
             (new_grenad_parameters, total_bbbuffer_capacity)
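Summarising the new budgeting in the last hunk as a standalone helper (a sketch; the function name and tuple shape are illustrative, while the 50 MiB per-thread floor and the doubling for extractors come from the diff):

```rust
/// Sketch of the budgeting above: BBQueue buffers get at least 50 MiB per
/// thread, extractors at least twice that, and an explicit `max_memory`
/// only ever raises those floors. Returns (extractor budget, total bbqueue capacity).
fn split_indexing_memory(max_memory: Option<usize>, threads: usize) -> (usize, usize) {
    let minimum_total_bbbuffer_capacity = 50 * 1024 * 1024 * threads; // 50 MiB per thread
    let minimum_total_extractors_capacity = minimum_total_bbbuffer_capacity * 2;

    match max_memory {
        // No explicit limit: fall back to the minimums.
        None => (minimum_total_extractors_capacity, minimum_total_bbbuffer_capacity),
        // An explicit limit is clamped to the minimums, never below them.
        Some(max_memory) => (
            max_memory.max(minimum_total_extractors_capacity),
            max_memory.max(minimum_total_bbbuffer_capacity),
        ),
    }
}

fn main() {
    // Default budget on an 8-thread pool: 800 MiB for extractors, 400 MiB of BBQueues.
    assert_eq!(split_indexing_memory(None, 8), (800 << 20, 400 << 20));
}
```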