Make the batched tasks size limit effectively work

This commit is contained in:
Clément Renault 2025-01-09 11:59:35 +01:00
parent 8650ee66c1
commit d0bdff7b7b
No known key found for this signature in database
GPG Key ID: F250A4C4E3AE5F5F
7 changed files with 34 additions and 15 deletions

View File

@ -115,6 +115,9 @@ pub struct IndexSchedulerOptions {
/// If the autobatcher is allowed to automatically batch tasks /// If the autobatcher is allowed to automatically batch tasks
/// it will only batch this defined number of tasks at once. /// it will only batch this defined number of tasks at once.
pub max_number_of_batched_tasks: usize, pub max_number_of_batched_tasks: usize,
/// If the autobatcher is allowed to automatically batch tasks
/// it will only batch this defined maximum size (in bytes) of tasks at once.
pub batched_tasks_size_limit: u64,
/// The experimental features enabled for this instance. /// The experimental features enabled for this instance.
pub instance_features: InstanceTogglableFeatures, pub instance_features: InstanceTogglableFeatures,
} }

View File

@ -497,17 +497,26 @@ impl IndexScheduler {
1 1
}; };
let enqueued = index_tasks let mut enqueued = Vec::new();
.into_iter() let mut total_size: u64 = 0;
.take(tasks_limit) for task_id in index_tasks.into_iter().take(tasks_limit) {
.map(|task_id| { let task = self
self.queue .queue
.tasks .tasks
.get_task(rtxn, task_id) .get_task(rtxn, task_id)
.and_then(|task| task.ok_or(Error::CorruptedTaskQueue)) .and_then(|task| task.ok_or(Error::CorruptedTaskQueue))?;
.map(|task| (task.uid, task.kind))
}) if let Some(uuid) = task.content_uuid() {
.collect::<Result<Vec<_>>>()?; let content_size = self.queue.file_store.compute_size(uuid)?;
total_size = total_size.saturating_add(content_size);
}
if total_size > self.scheduler.batched_tasks_size_limit && !enqueued.is_empty() {
break;
}
enqueued.push((task.uid, task.kind));
}
if let Some((batchkind, create_index)) = if let Some((batchkind, create_index)) =
autobatcher::autobatch(enqueued, index_already_exists, primary_key.as_deref()) autobatcher::autobatch(enqueued, index_already_exists, primary_key.as_deref())

View File

@ -60,6 +60,9 @@ pub struct Scheduler {
/// The maximum number of tasks that will be batched together. /// The maximum number of tasks that will be batched together.
pub(crate) max_number_of_batched_tasks: usize, pub(crate) max_number_of_batched_tasks: usize,
/// The maximum size, in bytes, of tasks in a batch.
pub(crate) batched_tasks_size_limit: u64,
/// The path used to create the dumps. /// The path used to create the dumps.
pub(crate) dumps_path: PathBuf, pub(crate) dumps_path: PathBuf,
@ -80,6 +83,7 @@ impl Scheduler {
wake_up: self.wake_up.clone(), wake_up: self.wake_up.clone(),
autobatching_enabled: self.autobatching_enabled, autobatching_enabled: self.autobatching_enabled,
max_number_of_batched_tasks: self.max_number_of_batched_tasks, max_number_of_batched_tasks: self.max_number_of_batched_tasks,
batched_tasks_size_limit: self.batched_tasks_size_limit,
dumps_path: self.dumps_path.clone(), dumps_path: self.dumps_path.clone(),
snapshots_path: self.snapshots_path.clone(), snapshots_path: self.snapshots_path.clone(),
auth_path: self.auth_path.clone(), auth_path: self.auth_path.clone(),
@ -94,6 +98,7 @@ impl Scheduler {
wake_up: Arc::new(SignalEvent::auto(true)), wake_up: Arc::new(SignalEvent::auto(true)),
autobatching_enabled: options.autobatching_enabled, autobatching_enabled: options.autobatching_enabled,
max_number_of_batched_tasks: options.max_number_of_batched_tasks, max_number_of_batched_tasks: options.max_number_of_batched_tasks,
batched_tasks_size_limit: options.batched_tasks_size_limit,
dumps_path: options.dumps_path.clone(), dumps_path: options.dumps_path.clone(),
snapshots_path: options.snapshots_path.clone(), snapshots_path: options.snapshots_path.clone(),
auth_path: options.auth_path.clone(), auth_path: options.auth_path.clone(),

View File

@ -107,6 +107,7 @@ impl IndexScheduler {
cleanup_enabled: true, cleanup_enabled: true,
max_number_of_tasks: 1_000_000, max_number_of_tasks: 1_000_000,
max_number_of_batched_tasks: usize::MAX, max_number_of_batched_tasks: usize::MAX,
batched_tasks_size_limit: u64::MAX,
instance_features: Default::default(), instance_features: Default::default(),
}; };
configuration(&mut options); configuration(&mut options);

View File

@ -194,7 +194,7 @@ struct Infos {
experimental_enable_logs_route: bool, experimental_enable_logs_route: bool,
experimental_reduce_indexing_memory_usage: bool, experimental_reduce_indexing_memory_usage: bool,
experimental_max_number_of_batched_tasks: usize, experimental_max_number_of_batched_tasks: usize,
experimental_limit_batched_tasks_total_size: usize, experimental_limit_batched_tasks_total_size: u64,
gpu_enabled: bool, gpu_enabled: bool,
db_path: bool, db_path: bool,
import_dump: bool, import_dump: bool,

View File

@ -312,6 +312,7 @@ fn open_or_create_database_unchecked(
cleanup_enabled: !opt.experimental_replication_parameters, cleanup_enabled: !opt.experimental_replication_parameters,
max_number_of_tasks: 1_000_000, max_number_of_tasks: 1_000_000,
max_number_of_batched_tasks: opt.experimental_max_number_of_batched_tasks, max_number_of_batched_tasks: opt.experimental_max_number_of_batched_tasks,
batched_tasks_size_limit: opt.experimental_limit_batched_tasks_total_size,
index_growth_amount: byte_unit::Byte::from_str("10GiB").unwrap().as_u64() as usize, index_growth_amount: byte_unit::Byte::from_str("10GiB").unwrap().as_u64() as usize,
index_count: DEFAULT_INDEX_COUNT, index_count: DEFAULT_INDEX_COUNT,
instance_features, instance_features,

View File

@ -436,7 +436,7 @@ pub struct Opt {
/// see: <https://github.com/orgs/meilisearch/discussions/801> /// see: <https://github.com/orgs/meilisearch/discussions/801>
#[clap(long, env = MEILI_EXPERIMENTAL_LIMIT_BATCHED_TASKS_TOTAL_SIZE, default_value_t = default_limit_batched_tasks_total_size())] #[clap(long, env = MEILI_EXPERIMENTAL_LIMIT_BATCHED_TASKS_TOTAL_SIZE, default_value_t = default_limit_batched_tasks_total_size())]
#[serde(default = "default_limit_batched_tasks_total_size")] #[serde(default = "default_limit_batched_tasks_total_size")]
pub experimental_limit_batched_tasks_total_size: usize, pub experimental_limit_batched_tasks_total_size: u64,
#[serde(flatten)] #[serde(flatten)]
#[clap(flatten)] #[clap(flatten)]
@ -931,8 +931,8 @@ fn default_limit_batched_tasks() -> usize {
usize::MAX usize::MAX
} }
fn default_limit_batched_tasks_total_size() -> usize { fn default_limit_batched_tasks_total_size() -> u64 {
usize::MAX u64::MAX
} }
fn default_snapshot_dir() -> PathBuf { fn default_snapshot_dir() -> PathBuf {