diff --git a/index-scheduler/src/batch.rs b/index-scheduler/src/batch.rs index 39460052e..83c97f28a 100644 --- a/index-scheduler/src/batch.rs +++ b/index-scheduler/src/batch.rs @@ -680,10 +680,10 @@ impl IndexScheduler { self.index_mapper.indexer_config(), ); builder.set_primary_key(primary_key); - let must_stop = self.processing_tasks.read().unwrap().must_stop.clone(); + let must_stop_processing = self.must_stop_processing.clone(); builder.execute( |indexing_step| debug!("update: {:?}", indexing_step), - || must_stop.load(Relaxed), + || must_stop_processing.get(), )?; index_wtxn.commit()?; } @@ -762,7 +762,7 @@ impl IndexScheduler { content_files, mut tasks, } => { - let must_stop = self.processing_tasks.read().unwrap().must_stop.clone(); + let must_stop_processing = self.must_stop_processing.clone(); let indexer_config = self.index_mapper.indexer_config(); // TODO use the code from the IndexCreate operation if let Some(primary_key) = primary_key { @@ -772,7 +772,7 @@ impl IndexScheduler { builder.set_primary_key(primary_key); builder.execute( |indexing_step| debug!("update: {:?}", indexing_step), - || must_stop.clone().load(Relaxed), + || must_stop_processing.clone().get(), )?; } } @@ -788,7 +788,7 @@ impl IndexScheduler { indexer_config, config, |indexing_step| debug!("update: {:?}", indexing_step), - || must_stop.load(Relaxed), + || must_stop_processing.get(), )?; let mut results = Vec::new(); @@ -882,10 +882,10 @@ impl IndexScheduler { let mut builder = milli::update::Settings::new(index_wtxn, index, indexer_config); apply_settings_to_builder(&checked_settings, &mut builder); - let must_stop = self.processing_tasks.read().unwrap().must_stop.clone(); + let must_stop_processing = self.must_stop_processing.clone(); builder.execute( |indexing_step| debug!("update: {:?}", indexing_step), - || must_stop.load(Relaxed), + || must_stop_processing.get(), )?; task.status = Status::Succeeded; diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs index 9b52c9ce9..2d8c354e5 100644 --- a/index-scheduler/src/lib.rs +++ b/index-scheduler/src/lib.rs @@ -125,31 +125,41 @@ struct ProcessingTasks { started_at: OffsetDateTime, /// The list of tasks ids that are currently running. processing: RoaringBitmap, - /// A boolean that can be set to true to stop the currently processing tasks. - must_stop: Arc, } impl ProcessingTasks { - /// Stores the currently processing tasks, the date time at which it started - /// and resets the _must stop_ flag. + /// Stores the currently processing tasks, and the date time at which it started. fn start_processing_at(&mut self, started_at: OffsetDateTime, processing: RoaringBitmap) { self.started_at = started_at; self.processing = processing; - self.must_stop.store(false, Relaxed); } - /// Resets the processing tasks to an empty list. + /// Set the processing tasks to an empty list. fn stop_processing_at(&mut self, stopped_at: OffsetDateTime) { self.started_at = stopped_at; self.processing = RoaringBitmap::new(); } - /// Forces the currently processing tasks to stop running if necessary. - fn cancel_processing_tasks(&self, canceled_tasks: &RoaringBitmap) { - // If there, at least, is one task that is currently processing we must stop. - if !self.processing.is_disjoint(canceled_tasks) { - self.must_stop.store(true, Relaxed); - } + /// Returns `true` if there, at least, is one task that is currently processing we must stop. + fn must_cancel_processing_tasks(&self, canceled_tasks: &RoaringBitmap) -> bool { + !self.processing.is_disjoint(canceled_tasks) + } +} + +#[derive(Default, Clone, Debug)] +struct MustStopProcessing(Arc); + +impl MustStopProcessing { + fn get(&self) -> bool { + self.0.load(Relaxed) + } + + fn must_stop(&self) { + self.0.store(true, Relaxed); + } + + fn reset(&self) { + self.0.store(false, Relaxed); } } @@ -168,6 +178,8 @@ pub struct IndexScheduler { /// The LMDB environment which the DBs are associated with. pub(crate) env: Env, + /// A boolean that can be set to true to stop the currently processing tasks. + pub(crate) must_stop_processing: MustStopProcessing, pub(crate) processing_tasks: Arc>, pub(crate) file_store: FileStore, @@ -233,12 +245,12 @@ impl IndexScheduler { let processing_tasks = ProcessingTasks { started_at: OffsetDateTime::now_utc(), processing: RoaringBitmap::new(), - must_stop: Arc::new(AtomicBool::new(false)), }; let file_store = FileStore::new(&update_file_path)?; // allow unreachable_code to get rids of the warning in the case of a test build. let this = Self { + must_stop_processing: MustStopProcessing::default(), processing_tasks: Arc::new(RwLock::new(processing_tasks)), file_store, all_tasks: env.create_database(Some(db_name::ALL_TASKS))?, @@ -263,6 +275,7 @@ impl IndexScheduler { /// This function will execute in a different thread and must be called only once. fn run(&self) { let run = Self { + must_stop_processing: MustStopProcessing::default(), processing_tasks: self.processing_tasks.clone(), file_store: self.file_store.clone(), env: self.env.clone(), @@ -433,10 +446,14 @@ impl IndexScheduler { // we inform the processing tasks to stop (if necessary). if let KindWithContent::TaskCancelation { tasks, .. } = kind { let tasks_to_cancel = RoaringBitmap::from_iter(tasks); - self.processing_tasks + if self + .processing_tasks .read() .unwrap() - .cancel_processing_tasks(&tasks_to_cancel); + .must_cancel_processing_tasks(&tasks_to_cancel) + { + self.must_stop_processing.must_stop(); + } } // notify the scheduler loop to execute a new tick @@ -612,6 +629,9 @@ impl IndexScheduler { let processed_tasks = ids.len(); let processing_tasks = RoaringBitmap::from_sorted_iter(ids.iter().copied()).unwrap(); let started_at = OffsetDateTime::now_utc(); + + // We reset the must_stop flag to be sure that we don't stop processing tasks + self.must_stop_processing.reset(); self.processing_tasks .write() .unwrap() diff --git a/index-scheduler/src/snapshot.rs b/index-scheduler/src/snapshot.rs index bdc37856f..44f8faa36 100644 --- a/index-scheduler/src/snapshot.rs +++ b/index-scheduler/src/snapshot.rs @@ -14,6 +14,7 @@ use crate::{index_mapper::IndexMapper, IndexScheduler, Kind, Status}; pub fn snapshot_index_scheduler(scheduler: &IndexScheduler) -> String { let IndexScheduler { autobatching_enabled, + must_stop_processing, processing_tasks, file_store, env,