diff --git a/config.toml b/config.toml index c47989f56..bbd70a63f 100644 --- a/config.toml +++ b/config.toml @@ -129,3 +129,6 @@ experimental_enable_metrics = false # Experimental RAM reduction during indexing, do not use in production, see: experimental_reduce_indexing_memory_usage = false + +# Experimentally reduces the maximum number of tasks that will be processed at once, see: +# experimental_max_number_of_batched_tasks = 100 diff --git a/index-scheduler/src/batch.rs b/index-scheduler/src/batch.rs index 9089acb69..94a8b3f07 100644 --- a/index-scheduler/src/batch.rs +++ b/index-scheduler/src/batch.rs @@ -584,7 +584,9 @@ impl IndexScheduler { let index_tasks = self.index_tasks(rtxn, index_name)? & enqueued; // If autobatching is disabled we only take one task at a time. - let tasks_limit = if self.autobatching_enabled { usize::MAX } else { 1 }; + // Otherwise, we take only a maximum of tasks to create batches. + let tasks_limit = + if self.autobatching_enabled { self.max_number_of_batched_tasks } else { 1 }; let enqueued = index_tasks .into_iter() diff --git a/index-scheduler/src/insta_snapshot.rs b/index-scheduler/src/insta_snapshot.rs index 885a66f49..bd8fa5148 100644 --- a/index-scheduler/src/insta_snapshot.rs +++ b/index-scheduler/src/insta_snapshot.rs @@ -30,6 +30,7 @@ pub fn snapshot_index_scheduler(scheduler: &IndexScheduler) -> String { index_mapper, features: _, max_number_of_tasks: _, + max_number_of_batched_tasks: _, puffin_frame: _, wake_up: _, dumps_path: _, diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs index 446db8eae..a1b6497d9 100644 --- a/index-scheduler/src/lib.rs +++ b/index-scheduler/src/lib.rs @@ -258,6 +258,9 @@ pub struct IndexSchedulerOptions { /// The maximum number of tasks stored in the task queue before starting /// to auto schedule task deletions. pub max_number_of_tasks: usize, + /// If the autobatcher is allowed to automatically batch tasks + /// it will only batch this defined number of tasks at once. 
+ pub max_number_of_batched_tasks: usize, /// The experimental features enabled for this instance. pub instance_features: InstanceTogglableFeatures, } @@ -316,6 +319,9 @@ pub struct IndexScheduler { /// the finished tasks automatically. pub(crate) max_number_of_tasks: usize, + /// The maximum number of tasks that will be batched together. + pub(crate) max_number_of_batched_tasks: usize, + /// A frame to output the indexation profiling files to disk. pub(crate) puffin_frame: Arc<puffin::GlobalFrameView>, @@ -373,6 +379,7 @@ impl IndexScheduler { wake_up: self.wake_up.clone(), autobatching_enabled: self.autobatching_enabled, max_number_of_tasks: self.max_number_of_tasks, + max_number_of_batched_tasks: self.max_number_of_batched_tasks, puffin_frame: self.puffin_frame.clone(), snapshots_path: self.snapshots_path.clone(), dumps_path: self.dumps_path.clone(), @@ -471,6 +478,7 @@ impl IndexScheduler { puffin_frame: Arc::new(puffin::GlobalFrameView::default()), autobatching_enabled: options.autobatching_enabled, max_number_of_tasks: options.max_number_of_tasks, + max_number_of_batched_tasks: options.max_number_of_batched_tasks, dumps_path: options.dumps_path, snapshots_path: options.snapshots_path, auth_path: options.auth_path, @@ -1638,6 +1646,7 @@ mod tests { indexer_config, autobatching_enabled: true, max_number_of_tasks: 1_000_000, + max_number_of_batched_tasks: usize::MAX, instance_features: Default::default(), }; configuration(&mut options); diff --git a/meilisearch/src/analytics/segment_analytics.rs b/meilisearch/src/analytics/segment_analytics.rs index 2f0014ab7..f75516731 100644 --- a/meilisearch/src/analytics/segment_analytics.rs +++ b/meilisearch/src/analytics/segment_analytics.rs @@ -251,6 +251,7 @@ struct Infos { env: String, experimental_enable_metrics: bool, experimental_reduce_indexing_memory_usage: bool, + experimental_max_number_of_batched_tasks: usize, db_path: bool, import_dump: bool, dump_dir: bool, @@ -285,6 +286,7 @@ impl From<Opt> for Infos { db_path, 
experimental_enable_metrics, experimental_reduce_indexing_memory_usage, + experimental_max_number_of_batched_tasks, http_addr, master_key: _, env, @@ -340,6 +342,7 @@ impl From<Opt> for Infos { ignore_snapshot_if_db_exists, http_addr: http_addr != default_http_addr(), http_payload_size_limit, + experimental_max_number_of_batched_tasks, log_level: log_level.to_string(), max_indexing_memory, max_indexing_threads, diff --git a/meilisearch/src/lib.rs b/meilisearch/src/lib.rs index 0dba77e08..e0f488eea 100644 --- a/meilisearch/src/lib.rs +++ b/meilisearch/src/lib.rs @@ -234,6 +234,7 @@ fn open_or_create_database_unchecked( indexer_config: (&opt.indexer_options).try_into()?, autobatching_enabled: true, max_number_of_tasks: 1_000_000, + max_number_of_batched_tasks: opt.experimental_max_number_of_batched_tasks, index_growth_amount: byte_unit::Byte::from_str("10GiB").unwrap().get_bytes() as usize, index_count: DEFAULT_INDEX_COUNT, instance_features, diff --git a/meilisearch/src/option.rs b/meilisearch/src/option.rs index b8489c3e3..1ed20f5b5 100644 --- a/meilisearch/src/option.rs +++ b/meilisearch/src/option.rs @@ -51,6 +51,8 @@ const MEILI_LOG_LEVEL: &str = "MEILI_LOG_LEVEL"; const MEILI_EXPERIMENTAL_ENABLE_METRICS: &str = "MEILI_EXPERIMENTAL_ENABLE_METRICS"; const MEILI_EXPERIMENTAL_REDUCE_INDEXING_MEMORY_USAGE: &str = "MEILI_EXPERIMENTAL_REDUCE_INDEXING_MEMORY_USAGE"; +const MEILI_EXPERIMENTAL_MAX_NUMBER_OF_BATCHED_TASKS: &str = + "MEILI_EXPERIMENTAL_MAX_NUMBER_OF_BATCHED_TASKS"; const DEFAULT_CONFIG_FILE_PATH: &str = "./config.toml"; const DEFAULT_DB_PATH: &str = "./data.ms"; @@ -301,6 +303,11 @@ pub struct Opt { #[serde(default)] pub experimental_reduce_indexing_memory_usage: bool, + /// Experimentally reduces the maximum number of tasks that will be processed at once, see: + #[clap(long, env = MEILI_EXPERIMENTAL_MAX_NUMBER_OF_BATCHED_TASKS, default_value_t = default_limit_batched_tasks())] + #[serde(default = "default_limit_batched_tasks")] + pub 
experimental_max_number_of_batched_tasks: usize, + #[serde(flatten)] #[clap(flatten)] pub indexer_options: IndexerOpts, @@ -371,6 +378,7 @@ impl Opt { max_index_size: _, max_task_db_size: _, http_payload_size_limit, + experimental_max_number_of_batched_tasks, ssl_cert_path, ssl_key_path, ssl_auth_path, @@ -392,8 +400,8 @@ impl Opt { config_file_path: _, #[cfg(feature = "analytics")] no_analytics, - experimental_enable_metrics: enable_metrics_route, - experimental_reduce_indexing_memory_usage: reduce_indexing_memory_usage, + experimental_enable_metrics, + experimental_reduce_indexing_memory_usage, } = self; export_to_env_if_not_present(MEILI_DB_PATH, db_path); export_to_env_if_not_present(MEILI_HTTP_ADDR, http_addr); @@ -409,6 +417,10 @@ impl Opt { MEILI_HTTP_PAYLOAD_SIZE_LIMIT, http_payload_size_limit.to_string(), ); + export_to_env_if_not_present( + MEILI_EXPERIMENTAL_MAX_NUMBER_OF_BATCHED_TASKS, + experimental_max_number_of_batched_tasks.to_string(), + ); if let Some(ssl_cert_path) = ssl_cert_path { export_to_env_if_not_present(MEILI_SSL_CERT_PATH, ssl_cert_path); } @@ -433,11 +445,11 @@ impl Opt { export_to_env_if_not_present(MEILI_LOG_LEVEL, log_level.to_string()); export_to_env_if_not_present( MEILI_EXPERIMENTAL_ENABLE_METRICS, - enable_metrics_route.to_string(), + experimental_enable_metrics.to_string(), ); export_to_env_if_not_present( MEILI_EXPERIMENTAL_REDUCE_INDEXING_MEMORY_USAGE, - reduce_indexing_memory_usage.to_string(), + experimental_reduce_indexing_memory_usage.to_string(), ); indexer_options.export_to_env(); } @@ -727,6 +739,10 @@ fn default_http_payload_size_limit() -> Byte { Byte::from_str(DEFAULT_HTTP_PAYLOAD_SIZE_LIMIT).unwrap() } +fn default_limit_batched_tasks() -> usize { + usize::MAX +} + fn default_snapshot_dir() -> PathBuf { PathBuf::from(DEFAULT_SNAPSHOT_DIR) }