From 739b9f550502f6e46983e7922f839c12a07c419e Mon Sep 17 00:00:00 2001 From: Kerollmops Date: Wed, 2 Nov 2022 15:38:07 +0100 Subject: [PATCH] Use the content of the ProcessingTasks in the tasks cancelation system --- index-scheduler/src/batch.rs | 68 +++++++++++++++---- index-scheduler/src/lib.rs | 20 ++++-- .../cancel_mix_of_tasks/cancel_processed.snap | 1 + .../cancel_processed.snap | 1 + 4 files changed, 69 insertions(+), 21 deletions(-) diff --git a/index-scheduler/src/batch.rs b/index-scheduler/src/batch.rs index 01ad50d54..31c338bb5 100644 --- a/index-scheduler/src/batch.rs +++ b/index-scheduler/src/batch.rs @@ -41,7 +41,7 @@ use uuid::Uuid; use crate::autobatcher::{self, BatchKind}; use crate::utils::{self, swap_index_uid_in_task}; -use crate::{Error, IndexScheduler, Result, TaskId}; +use crate::{Error, IndexScheduler, ProcessingTasks, Result, TaskId}; /// Represents a combination of tasks that can all be processed at the same time. /// @@ -50,15 +50,39 @@ use crate::{Error, IndexScheduler, Result, TaskId}; /// be processed. #[derive(Debug)] pub(crate) enum Batch { - TaskCancelation(Task), + TaskCancelation { + /// The task cancelation itself. + task: Task, + /// The date and time at which the previously processing tasks started. + previous_started_at: OffsetDateTime, + /// The list of tasks that were processing when this task cancelation appeared. + previous_processing_tasks: RoaringBitmap, + }, TaskDeletion(Task), SnapshotCreation(Vec), Dump(Task), - IndexOperation { op: IndexOperation, must_create_index: bool }, - IndexCreation { index_uid: String, primary_key: Option, task: Task }, - IndexUpdate { index_uid: String, primary_key: Option, task: Task }, - IndexDeletion { index_uid: String, tasks: Vec, index_has_been_created: bool }, - IndexSwap { task: Task }, + IndexOperation { + op: IndexOperation, + must_create_index: bool, + }, + IndexCreation { + index_uid: String, + primary_key: Option, + task: Task, + }, + IndexUpdate { + index_uid: String, + primary_key: Option, + task: Task, + }, + IndexDeletion { + index_uid: String, + tasks: Vec, + index_has_been_created: bool, + }, + IndexSwap { + task: Task, + }, } /// A [batch](Batch) that combines multiple tasks operating on an index. @@ -115,7 +139,7 @@ impl Batch { /// Return the task ids associated with this batch. pub fn ids(&self) -> Vec { match self { - Batch::TaskCancelation(task) + Batch::TaskCancelation { task, .. } | Batch::TaskDeletion(task) | Batch::Dump(task) | Batch::IndexCreation { task, .. } @@ -394,9 +418,15 @@ impl IndexScheduler { // 1. we get the last task to cancel. if let Some(task_id) = to_cancel.max() { - return Ok(Some(Batch::TaskCancelation( - self.get_task(rtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?, - ))); + // We retrieve the tasks that were processing before this tasks cancelation started. + // We must *not* reset the processing tasks before calling this method. + let ProcessingTasks { started_at, processing } = + &*self.processing_tasks.read().unwrap(); + return Ok(Some(Batch::TaskCancelation { + task: self.get_task(rtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?, + previous_started_at: *started_at, + previous_processing_tasks: processing.clone(), + })); } // 2. we get the next task to delete @@ -482,7 +512,7 @@ impl IndexScheduler { self.breakpoint(crate::Breakpoint::InsideProcessBatch); } match batch { - Batch::TaskCancelation(mut task) => { + Batch::TaskCancelation { mut task, previous_started_at, previous_processing_tasks } => { // 1. Retrieve the tasks that matched the query at enqueue-time. let matched_tasks = if let KindWithContent::TaskCancelation { tasks, query: _ } = &task.kind { @@ -492,8 +522,13 @@ impl IndexScheduler { }; let mut wtxn = self.env.write_txn()?; - let canceled_tasks_content_uuids = - self.cancel_matched_tasks(&mut wtxn, task.uid, matched_tasks)?; + let canceled_tasks_content_uuids = self.cancel_matched_tasks( + &mut wtxn, + task.uid, + matched_tasks, + previous_started_at, + &previous_processing_tasks, + )?; task.status = Status::Succeeded; match &mut task.details { @@ -1199,6 +1234,8 @@ impl IndexScheduler { wtxn: &mut RwTxn, cancel_task_id: TaskId, matched_tasks: &RoaringBitmap, + previous_started_at: OffsetDateTime, + previous_processing_tasks: &RoaringBitmap, ) -> Result> { let now = OffsetDateTime::now_utc(); @@ -1214,6 +1251,9 @@ impl IndexScheduler { if let Some(uuid) = task.content_uuid() { content_files_to_delete.push(uuid); } + if previous_processing_tasks.contains(task.uid) { + task.started_at = Some(previous_started_at); + } task.status = Status::Canceled; task.canceled_by = Some(cancel_task_id); task.finished_at = Some(now); diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs index a25f74a69..2d782355c 100644 --- a/index-scheduler/src/lib.rs +++ b/index-scheduler/src/lib.rs @@ -151,13 +151,12 @@ impl ProcessingTasks { self.processing = processing; } - /// Set the processing tasks to an empty list. - fn stop_processing_at(&mut self, stopped_at: OffsetDateTime) { - self.started_at = stopped_at; + /// Set the processing tasks to an empty list + fn stop_processing(&mut self) { self.processing = RoaringBitmap::new(); } - /// Returns `true` if there, at least, is one task that is currently processing we must stop. + /// Returns `true` if there, at least, is one task that is currently processing that we must stop. fn must_cancel_processing_tasks(&self, canceled_tasks: &RoaringBitmap) -> bool { !self.processing.is_disjoint(canceled_tasks) } @@ -449,8 +448,9 @@ impl IndexScheduler { /// Return the task ids matched by the given query from the index scheduler's point of view. pub(crate) fn get_task_ids(&self, rtxn: &RoTxn, query: &Query) -> Result { - let ProcessingTasks { started_at: started_at_processing, processing: processing_tasks } = - self.processing_tasks.read().unwrap().clone(); + let ProcessingTasks { + started_at: started_at_processing, processing: processing_tasks, .. + } = self.processing_tasks.read().unwrap().clone(); let mut tasks = self.all_task_ids(rtxn)?; @@ -947,6 +947,12 @@ impl IndexScheduler { #[cfg(test)] self.breakpoint(Breakpoint::AbortedIndexation); wtxn.abort().map_err(Error::HeedTransaction)?; + + // We make sure that we don't call `stop_processing` on the `processing_tasks`, + // this is because we want to let the next tick call `create_next_batch` and keep + // the `started_at` date times and `processings` of the current processing tasks. + // This date time is used by the task cancelation to store the right `started_at` + // date in the task on disk. return Ok(0); } // In case of a failure we must get back and patch all the tasks with the error. @@ -976,7 +982,7 @@ impl IndexScheduler { } } - self.processing_tasks.write().unwrap().stop_processing_at(finished_at); + self.processing_tasks.write().unwrap().stop_processing(); #[cfg(test)] self.maybe_fail(tests::FailureLocation::CommittingWtxn)?; diff --git a/index-scheduler/src/snapshots/lib.rs/cancel_mix_of_tasks/cancel_processed.snap b/index-scheduler/src/snapshots/lib.rs/cancel_mix_of_tasks/cancel_processed.snap index e398ab205..675ba268d 100644 --- a/index-scheduler/src/snapshots/lib.rs/cancel_mix_of_tasks/cancel_processed.snap +++ b/index-scheduler/src/snapshots/lib.rs/cancel_mix_of_tasks/cancel_processed.snap @@ -36,6 +36,7 @@ wolfo [2,] ---------------------------------------------------------------------- ### Started At: [timestamp] [0,] +[timestamp] [1,] [timestamp] [3,] ---------------------------------------------------------------------- ### Finished At: diff --git a/index-scheduler/src/snapshots/lib.rs/cancel_processing_task/cancel_processed.snap b/index-scheduler/src/snapshots/lib.rs/cancel_processing_task/cancel_processed.snap index f0706934b..0eb9838c7 100644 --- a/index-scheduler/src/snapshots/lib.rs/cancel_processing_task/cancel_processed.snap +++ b/index-scheduler/src/snapshots/lib.rs/cancel_processing_task/cancel_processed.snap @@ -29,6 +29,7 @@ catto [0,] [timestamp] [1,] ---------------------------------------------------------------------- ### Started At: +[timestamp] [0,] [timestamp] [1,] ---------------------------------------------------------------------- ### Finished At: