diff --git a/meilisearch-http/src/lib.rs b/meilisearch-http/src/lib.rs
index 9df66071e..2169bdb37 100644
--- a/meilisearch-http/src/lib.rs
+++ b/meilisearch-http/src/lib.rs
@@ -29,9 +29,9 @@ pub static AUTOBATCHING_ENABLED: AtomicBool = AtomicBool::new(false);
 pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result<MeiliSearch> {
     let mut meilisearch = MeiliSearch::builder();
 
-    // enable autobatching?
+    // disable autobatching?
     AUTOBATCHING_ENABLED.store(
-        opt.scheduler_options.enable_auto_batching,
+        !opt.scheduler_options.disable_auto_batching,
         std::sync::atomic::Ordering::Relaxed,
     );
 
diff --git a/meilisearch-lib/src/options.rs b/meilisearch-lib/src/options.rs
index c71f1cba6..ea810b9b7 100644
--- a/meilisearch-lib/src/options.rs
+++ b/meilisearch-lib/src/options.rs
@@ -41,27 +41,10 @@ pub struct IndexerOpts {
 #[derive(Debug, Clone, Parser, Default, Serialize)]
 pub struct SchedulerConfig {
-    /// enable the autobatching experimental feature
-    #[clap(long, hide = true)]
-    pub enable_auto_batching: bool,
-
-    // The maximum number of updates of the same type that can be batched together.
-    // If unspecified, this is unlimited. A value of 0 is interpreted as 1.
-    #[clap(long, requires = "enable-auto-batching", hide = true)]
-    pub max_batch_size: Option<usize>,
-
-    // The maximum number of documents in a document batch. Since batches must contain at least one
-    // update for the scheduler to make progress, the number of documents in a batch will be at
-    // least the number of documents of its first update.
-    #[clap(long, requires = "enable-auto-batching", hide = true)]
-    pub max_documents_per_batch: Option<usize>,
-
-    /// Debounce duration in seconds
-    ///
-    /// When a new task is enqueued, the scheduler waits for `debounce_duration_sec` seconds for new updates before
-    /// starting to process a batch of updates.
-    #[clap(long, requires = "enable-auto-batching", hide = true)]
-    pub debounce_duration_sec: Option<u64>,
+    /// The engine will disable task auto-batching,
+    /// and will sequentially compute each task one by one.
+    #[clap(long, env = "DISABLE_AUTO_BATCHING")]
+    pub disable_auto_batching: bool,
 }
 
 impl TryFrom<&IndexerOpts> for IndexerConfig {
diff --git a/meilisearch-lib/src/tasks/scheduler.rs b/meilisearch-lib/src/tasks/scheduler.rs
index 9c181b86b..a709f566e 100644
--- a/meilisearch-lib/src/tasks/scheduler.rs
+++ b/meilisearch-lib/src/tasks/scheduler.rs
@@ -3,7 +3,6 @@ use std::collections::{hash_map::Entry, BinaryHeap, HashMap, VecDeque};
 use std::ops::{Deref, DerefMut};
 use std::slice;
 use std::sync::Arc;
-use std::time::Duration;
 
 use atomic_refcell::AtomicRefCell;
 use milli::update::IndexDocumentsMethod;
@@ -248,17 +247,10 @@ impl Scheduler {
     pub fn new(
         store: TaskStore,
         performers: Vec<Arc<dyn BatchHandler + Send + Sync + 'static>>,
-        mut config: SchedulerConfig,
+        config: SchedulerConfig,
     ) -> Result<Arc<RwLock<Self>>> {
         let (notifier, rcv) = watch::channel(());
 
-        let debounce_time = config.debounce_duration_sec;
-
-        // Disable autobatching
-        if !config.enable_auto_batching {
-            config.max_batch_size = Some(1);
-        }
-
         let this = Self {
             snapshots: VecDeque::new(),
             tasks: TaskQueue::default(),
@@ -275,12 +267,7 @@ impl Scheduler {
 
         let this = Arc::new(RwLock::new(this));
 
-        let update_loop = UpdateLoop::new(
-            this.clone(),
-            performers,
-            debounce_time.filter(|&v| v > 0).map(Duration::from_secs),
-            rcv,
-        );
+        let update_loop = UpdateLoop::new(this.clone(), performers, rcv);
 
         tokio::task::spawn_local(update_loop.run());
 
@@ -497,27 +484,17 @@ fn make_batch(tasks: &mut TaskQueue, config: &SchedulerConfig) -> Processing {
                     match list.peek() {
                        Some(pending) if pending.kind == kind => {
                            // We always need to process at least one task for the scheduler to make progress.
-                            if task_list.len() >= config.max_batch_size.unwrap_or(usize::MAX).max(1)
-                            {
+                            if config.disable_auto_batching && !task_list.is_empty() {
                                break;
                            }
 
                            let pending = list.pop().unwrap();
                            task_list.push(pending.id);
 
-                            // We add the number of documents to the count if we are scheduling document additions and
-                            // stop adding if we already have enough.
-                            //
-                            // We check that bound only after adding the current task to the batch, so that a batch contains at least one task.
+                            // We add the number of documents to the count if we are scheduling document additions.
                            match pending.kind {
                                TaskType::DocumentUpdate { number }
                                | TaskType::DocumentAddition { number } => {
                                    doc_count += number;
-
-                                    if doc_count
-                                        >= config.max_documents_per_batch.unwrap_or(usize::MAX)
-                                    {
-                                        break;
-                                    }
                                }
                                _ => (),
                            }
diff --git a/meilisearch-lib/src/tasks/update_loop.rs b/meilisearch-lib/src/tasks/update_loop.rs
index b99eb54b5..b6e43e319 100644
--- a/meilisearch-lib/src/tasks/update_loop.rs
+++ b/meilisearch-lib/src/tasks/update_loop.rs
@@ -1,9 +1,7 @@
 use std::sync::Arc;
-use std::time::Duration;
 
 use time::OffsetDateTime;
 use tokio::sync::{watch, RwLock};
-use tokio::time::interval_at;
 
 use super::batch::Batch;
 use super::error::Result;
@@ -17,20 +15,17 @@ pub struct UpdateLoop {
     performers: Vec<Arc<dyn BatchHandler + Send + Sync + 'static>>,
     notifier: Option<watch::Receiver<()>>,
-    debounce_duration: Option<Duration>,
 }
 
 impl UpdateLoop {
     pub fn new(
         scheduler: Arc<RwLock<Scheduler>>,
         performers: Vec<Arc<dyn BatchHandler + Send + Sync + 'static>>,
-        debuf_duration: Option<Duration>,
         notifier: watch::Receiver<()>,
     ) -> Self {
         Self {
             scheduler,
             performers,
-            debounce_duration: debuf_duration,
             notifier: Some(notifier),
         }
     }
@@ -43,11 +38,6 @@ impl UpdateLoop {
                 break;
             }
 
-            if let Some(t) = self.debounce_duration {
-                let mut interval = interval_at(tokio::time::Instant::now() + t, t);
-                interval.tick().await;
-            };
-
             if let Err(e) = self.process_next_batch().await {
                 log::error!("an error occurred while processing an update batch: {}", e);
             }
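
Taken together, the only scheduling knob left after this change is `disable_auto_batching`: `make_batch` always takes at least one task so the scheduler makes progress, keeps draining consecutive tasks of the same kind while the flag is off, and stops after a single task when it is on. The following is a minimal, self-contained sketch of that rule; the `Task` and `TaskKind` types and the simplified `make_batch` signature are illustrative stand-ins, not Meilisearch's real ones.

use std::collections::VecDeque;

// Illustrative stand-ins for the scheduler's task types.
#[derive(Clone, Copy, PartialEq, Eq)]
enum TaskKind {
    DocumentAddition,
    IndexDeletion,
}

struct Task {
    id: u64,
    kind: TaskKind,
}

/// Pop the next batch off the queue: a single task when auto-batching is
/// disabled, otherwise every consecutive task of the same kind.
fn make_batch(queue: &mut VecDeque<Task>, disable_auto_batching: bool) -> Vec<Task> {
    let mut batch = Vec::new();
    while let Some(next) = queue.front() {
        // We always need to process at least one task for the scheduler to make progress.
        if disable_auto_batching && !batch.is_empty() {
            break;
        }
        // Only consecutive tasks of the same kind are grouped together.
        if batch.first().map_or(false, |first: &Task| first.kind != next.kind) {
            break;
        }
        batch.push(queue.pop_front().unwrap());
    }
    batch
}

fn main() {
    let tasks = || {
        VecDeque::from(vec![
            Task { id: 0, kind: TaskKind::DocumentAddition },
            Task { id: 1, kind: TaskKind::DocumentAddition },
            Task { id: 2, kind: TaskKind::IndexDeletion },
        ])
    };

    // Auto-batching on (the new default): the two additions are grouped,
    // the deletion waits for the next batch.
    let ids: Vec<u64> = make_batch(&mut tasks(), false).iter().map(|t| t.id).collect();
    assert_eq!(ids, vec![0, 1]);

    // --disable-auto-batching: strictly one task per batch.
    assert_eq!(make_batch(&mut tasks(), true).len(), 1);
}

The same shape shows up in the real `make_batch` above: the break on `config.disable_auto_batching` happens before a task is popped, which is why a batch always ends up with at least one task.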