Expose the --max-number-of-batched-tasks argument

This commit is contained in:
Clément Renault 2023-12-11 16:08:39 +01:00
parent 0fbc1511d7
commit 7e259cb0d2
No known key found for this signature in database
GPG Key ID: F250A4C4E3AE5F5F
6 changed files with 36 additions and 5 deletions

View File

@ -584,7 +584,9 @@ impl IndexScheduler {
let index_tasks = self.index_tasks(rtxn, index_name)? & enqueued; let index_tasks = self.index_tasks(rtxn, index_name)? & enqueued;
// If autobatching is disabled we only take one task at a time. // If autobatching is disabled we only take one task at a time.
let tasks_limit = if self.autobatching_enabled { usize::MAX } else { 1 }; // Otherwise, we take at most `max_number_of_batched_tasks` tasks to build a batch.
let tasks_limit =
if self.autobatching_enabled { self.max_number_of_batched_tasks } else { 1 };
let enqueued = index_tasks let enqueued = index_tasks
.into_iter() .into_iter()

View File

@ -30,6 +30,7 @@ pub fn snapshot_index_scheduler(scheduler: &IndexScheduler) -> String {
index_mapper, index_mapper,
features: _, features: _,
max_number_of_tasks: _, max_number_of_tasks: _,
max_number_of_batched_tasks: _,
puffin_frame: _, puffin_frame: _,
wake_up: _, wake_up: _,
dumps_path: _, dumps_path: _,

View File

@ -258,6 +258,9 @@ pub struct IndexSchedulerOptions {
/// The maximum number of tasks stored in the task queue before starting /// The maximum number of tasks stored in the task queue before starting
/// to auto schedule task deletions. /// to auto schedule task deletions.
pub max_number_of_tasks: usize, pub max_number_of_tasks: usize,
/// When autobatching is enabled, the autobatcher batches
/// at most this number of tasks at once.
pub max_number_of_batched_tasks: usize,
/// The experimental features enabled for this instance. /// The experimental features enabled for this instance.
pub instance_features: InstanceTogglableFeatures, pub instance_features: InstanceTogglableFeatures,
} }
@ -316,6 +319,9 @@ pub struct IndexScheduler {
/// the finished tasks automatically. /// the finished tasks automatically.
pub(crate) max_number_of_tasks: usize, pub(crate) max_number_of_tasks: usize,
/// The maximum number of tasks that will be batched together.
pub(crate) max_number_of_batched_tasks: usize,
/// A frame to output the indexation profiling files to disk. /// A frame to output the indexation profiling files to disk.
pub(crate) puffin_frame: Arc<puffin::GlobalFrameView>, pub(crate) puffin_frame: Arc<puffin::GlobalFrameView>,
@ -373,6 +379,7 @@ impl IndexScheduler {
wake_up: self.wake_up.clone(), wake_up: self.wake_up.clone(),
autobatching_enabled: self.autobatching_enabled, autobatching_enabled: self.autobatching_enabled,
max_number_of_tasks: self.max_number_of_tasks, max_number_of_tasks: self.max_number_of_tasks,
max_number_of_batched_tasks: self.max_number_of_batched_tasks,
puffin_frame: self.puffin_frame.clone(), puffin_frame: self.puffin_frame.clone(),
snapshots_path: self.snapshots_path.clone(), snapshots_path: self.snapshots_path.clone(),
dumps_path: self.dumps_path.clone(), dumps_path: self.dumps_path.clone(),
@ -471,6 +478,7 @@ impl IndexScheduler {
puffin_frame: Arc::new(puffin::GlobalFrameView::default()), puffin_frame: Arc::new(puffin::GlobalFrameView::default()),
autobatching_enabled: options.autobatching_enabled, autobatching_enabled: options.autobatching_enabled,
max_number_of_tasks: options.max_number_of_tasks, max_number_of_tasks: options.max_number_of_tasks,
max_number_of_batched_tasks: options.max_number_of_batched_tasks,
dumps_path: options.dumps_path, dumps_path: options.dumps_path,
snapshots_path: options.snapshots_path, snapshots_path: options.snapshots_path,
auth_path: options.auth_path, auth_path: options.auth_path,
@ -1638,6 +1646,7 @@ mod tests {
indexer_config, indexer_config,
autobatching_enabled: true, autobatching_enabled: true,
max_number_of_tasks: 1_000_000, max_number_of_tasks: 1_000_000,
max_number_of_batched_tasks: usize::MAX,
instance_features: Default::default(), instance_features: Default::default(),
}; };
configuration(&mut options); configuration(&mut options);

View File

@ -263,6 +263,7 @@ struct Infos {
ignore_snapshot_if_db_exists: bool, ignore_snapshot_if_db_exists: bool,
http_addr: bool, http_addr: bool,
http_payload_size_limit: Byte, http_payload_size_limit: Byte,
max_number_of_batched_tasks: usize,
log_level: String, log_level: String,
max_indexing_memory: MaxMemory, max_indexing_memory: MaxMemory,
max_indexing_threads: MaxThreads, max_indexing_threads: MaxThreads,
@ -291,6 +292,7 @@ impl From<Opt> for Infos {
max_index_size: _, max_index_size: _,
max_task_db_size: _, max_task_db_size: _,
http_payload_size_limit, http_payload_size_limit,
max_number_of_batched_tasks,
ssl_cert_path, ssl_cert_path,
ssl_key_path, ssl_key_path,
ssl_auth_path, ssl_auth_path,
@ -340,6 +342,7 @@ impl From<Opt> for Infos {
ignore_snapshot_if_db_exists, ignore_snapshot_if_db_exists,
http_addr: http_addr != default_http_addr(), http_addr: http_addr != default_http_addr(),
http_payload_size_limit, http_payload_size_limit,
max_number_of_batched_tasks,
log_level: log_level.to_string(), log_level: log_level.to_string(),
max_indexing_memory, max_indexing_memory,
max_indexing_threads, max_indexing_threads,

View File

@ -234,6 +234,7 @@ fn open_or_create_database_unchecked(
indexer_config: (&opt.indexer_options).try_into()?, indexer_config: (&opt.indexer_options).try_into()?,
autobatching_enabled: true, autobatching_enabled: true,
max_number_of_tasks: 1_000_000, max_number_of_tasks: 1_000_000,
max_number_of_batched_tasks: opt.max_number_of_batched_tasks,
index_growth_amount: byte_unit::Byte::from_str("10GiB").unwrap().get_bytes() as usize, index_growth_amount: byte_unit::Byte::from_str("10GiB").unwrap().get_bytes() as usize,
index_count: DEFAULT_INDEX_COUNT, index_count: DEFAULT_INDEX_COUNT,
instance_features, instance_features,

View File

@ -30,6 +30,7 @@ const MEILI_MASTER_KEY: &str = "MEILI_MASTER_KEY";
const MEILI_ENV: &str = "MEILI_ENV"; const MEILI_ENV: &str = "MEILI_ENV";
#[cfg(feature = "analytics")] #[cfg(feature = "analytics")]
const MEILI_NO_ANALYTICS: &str = "MEILI_NO_ANALYTICS"; const MEILI_NO_ANALYTICS: &str = "MEILI_NO_ANALYTICS";
const MEILI_MAX_NUMBER_OF_BATCHED_TASKS: &str = "MEILI_MAX_NUMBER_OF_BATCHED_TASKS";
const MEILI_HTTP_PAYLOAD_SIZE_LIMIT: &str = "MEILI_HTTP_PAYLOAD_SIZE_LIMIT"; const MEILI_HTTP_PAYLOAD_SIZE_LIMIT: &str = "MEILI_HTTP_PAYLOAD_SIZE_LIMIT";
const MEILI_SSL_CERT_PATH: &str = "MEILI_SSL_CERT_PATH"; const MEILI_SSL_CERT_PATH: &str = "MEILI_SSL_CERT_PATH";
const MEILI_SSL_KEY_PATH: &str = "MEILI_SSL_KEY_PATH"; const MEILI_SSL_KEY_PATH: &str = "MEILI_SSL_KEY_PATH";
@ -175,6 +176,11 @@ pub struct Opt {
#[serde(skip, default = "default_max_task_db_size")] #[serde(skip, default = "default_max_task_db_size")]
pub max_task_db_size: Byte, pub max_task_db_size: Byte,
/// Defines the maximum number of tasks that will be processed at once.
#[clap(long, env = MEILI_MAX_NUMBER_OF_BATCHED_TASKS, default_value_t = default_limit_batched_tasks())]
#[serde(default = "default_limit_batched_tasks")]
pub max_number_of_batched_tasks: usize,
/// Sets the maximum size of accepted payloads. Value must be given in bytes or explicitly stating a /// Sets the maximum size of accepted payloads. Value must be given in bytes or explicitly stating a
/// base unit (for instance: 107374182400, '107.7Gb', or '107374 Mb'). /// base unit (for instance: 107374182400, '107.7Gb', or '107374 Mb').
#[clap(long, env = MEILI_HTTP_PAYLOAD_SIZE_LIMIT, default_value_t = default_http_payload_size_limit())] #[clap(long, env = MEILI_HTTP_PAYLOAD_SIZE_LIMIT, default_value_t = default_http_payload_size_limit())]
@ -371,6 +377,7 @@ impl Opt {
max_index_size: _, max_index_size: _,
max_task_db_size: _, max_task_db_size: _,
http_payload_size_limit, http_payload_size_limit,
max_number_of_batched_tasks,
ssl_cert_path, ssl_cert_path,
ssl_key_path, ssl_key_path,
ssl_auth_path, ssl_auth_path,
@ -392,8 +399,8 @@ impl Opt {
config_file_path: _, config_file_path: _,
#[cfg(feature = "analytics")] #[cfg(feature = "analytics")]
no_analytics, no_analytics,
experimental_enable_metrics: enable_metrics_route, experimental_enable_metrics,
experimental_reduce_indexing_memory_usage: reduce_indexing_memory_usage, experimental_reduce_indexing_memory_usage,
} = self; } = self;
export_to_env_if_not_present(MEILI_DB_PATH, db_path); export_to_env_if_not_present(MEILI_DB_PATH, db_path);
export_to_env_if_not_present(MEILI_HTTP_ADDR, http_addr); export_to_env_if_not_present(MEILI_HTTP_ADDR, http_addr);
@ -409,6 +416,10 @@ impl Opt {
MEILI_HTTP_PAYLOAD_SIZE_LIMIT, MEILI_HTTP_PAYLOAD_SIZE_LIMIT,
http_payload_size_limit.to_string(), http_payload_size_limit.to_string(),
); );
export_to_env_if_not_present(
MEILI_MAX_NUMBER_OF_BATCHED_TASKS,
max_number_of_batched_tasks.to_string(),
);
if let Some(ssl_cert_path) = ssl_cert_path { if let Some(ssl_cert_path) = ssl_cert_path {
export_to_env_if_not_present(MEILI_SSL_CERT_PATH, ssl_cert_path); export_to_env_if_not_present(MEILI_SSL_CERT_PATH, ssl_cert_path);
} }
@ -433,11 +444,11 @@ impl Opt {
export_to_env_if_not_present(MEILI_LOG_LEVEL, log_level.to_string()); export_to_env_if_not_present(MEILI_LOG_LEVEL, log_level.to_string());
export_to_env_if_not_present( export_to_env_if_not_present(
MEILI_EXPERIMENTAL_ENABLE_METRICS, MEILI_EXPERIMENTAL_ENABLE_METRICS,
enable_metrics_route.to_string(), experimental_enable_metrics.to_string(),
); );
export_to_env_if_not_present( export_to_env_if_not_present(
MEILI_EXPERIMENTAL_REDUCE_INDEXING_MEMORY_USAGE, MEILI_EXPERIMENTAL_REDUCE_INDEXING_MEMORY_USAGE,
reduce_indexing_memory_usage.to_string(), experimental_reduce_indexing_memory_usage.to_string(),
); );
indexer_options.export_to_env(); indexer_options.export_to_env();
} }
@ -727,6 +738,10 @@ fn default_http_payload_size_limit() -> Byte {
Byte::from_str(DEFAULT_HTTP_PAYLOAD_SIZE_LIMIT).unwrap() Byte::from_str(DEFAULT_HTTP_PAYLOAD_SIZE_LIMIT).unwrap()
} }
/// Default value of the `max_number_of_batched_tasks` option.
///
/// Returns `usize::MAX`, the largest value the option's `usize` type can hold.
fn default_limit_batched_tasks() -> usize {
    usize::MAX
}
fn default_snapshot_dir() -> PathBuf { fn default_snapshot_dir() -> PathBuf {
PathBuf::from(DEFAULT_SNAPSHOT_DIR) PathBuf::from(DEFAULT_SNAPSHOT_DIR)
} }