Use the content of the ProcessingTasks in the tasks cancelation system

This commit is contained in:
Kerollmops 2022-11-02 15:38:07 +01:00 committed by Clément Renault
parent 2254bbf3bd
commit 739b9f5505
No known key found for this signature in database
GPG Key ID: 92ADA4E935E71FA4
4 changed files with 69 additions and 21 deletions

View File

@ -41,7 +41,7 @@ use uuid::Uuid;
use crate::autobatcher::{self, BatchKind}; use crate::autobatcher::{self, BatchKind};
use crate::utils::{self, swap_index_uid_in_task}; use crate::utils::{self, swap_index_uid_in_task};
use crate::{Error, IndexScheduler, Result, TaskId}; use crate::{Error, IndexScheduler, ProcessingTasks, Result, TaskId};
/// Represents a combination of tasks that can all be processed at the same time. /// Represents a combination of tasks that can all be processed at the same time.
/// ///
@ -50,15 +50,39 @@ use crate::{Error, IndexScheduler, Result, TaskId};
/// be processed. /// be processed.
#[derive(Debug)] #[derive(Debug)]
pub(crate) enum Batch { pub(crate) enum Batch {
TaskCancelation(Task), TaskCancelation {
/// The task cancelation itself.
task: Task,
/// The date and time at which the previously processing tasks started.
previous_started_at: OffsetDateTime,
/// The list of tasks that were processing when this task cancelation appeared.
previous_processing_tasks: RoaringBitmap,
},
TaskDeletion(Task), TaskDeletion(Task),
SnapshotCreation(Vec<Task>), SnapshotCreation(Vec<Task>),
Dump(Task), Dump(Task),
IndexOperation { op: IndexOperation, must_create_index: bool }, IndexOperation {
IndexCreation { index_uid: String, primary_key: Option<String>, task: Task }, op: IndexOperation,
IndexUpdate { index_uid: String, primary_key: Option<String>, task: Task }, must_create_index: bool,
IndexDeletion { index_uid: String, tasks: Vec<Task>, index_has_been_created: bool }, },
IndexSwap { task: Task }, IndexCreation {
index_uid: String,
primary_key: Option<String>,
task: Task,
},
IndexUpdate {
index_uid: String,
primary_key: Option<String>,
task: Task,
},
IndexDeletion {
index_uid: String,
tasks: Vec<Task>,
index_has_been_created: bool,
},
IndexSwap {
task: Task,
},
} }
/// A [batch](Batch) that combines multiple tasks operating on an index. /// A [batch](Batch) that combines multiple tasks operating on an index.
@ -115,7 +139,7 @@ impl Batch {
/// Return the task ids associated with this batch. /// Return the task ids associated with this batch.
pub fn ids(&self) -> Vec<TaskId> { pub fn ids(&self) -> Vec<TaskId> {
match self { match self {
Batch::TaskCancelation(task) Batch::TaskCancelation { task, .. }
| Batch::TaskDeletion(task) | Batch::TaskDeletion(task)
| Batch::Dump(task) | Batch::Dump(task)
| Batch::IndexCreation { task, .. } | Batch::IndexCreation { task, .. }
@ -394,9 +418,15 @@ impl IndexScheduler {
// 1. we get the last task to cancel. // 1. we get the last task to cancel.
if let Some(task_id) = to_cancel.max() { if let Some(task_id) = to_cancel.max() {
return Ok(Some(Batch::TaskCancelation( // We retrieve the tasks that were processing before this task cancelation started.
self.get_task(rtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?, // We must *not* reset the processing tasks before calling this method.
))); let ProcessingTasks { started_at, processing } =
&*self.processing_tasks.read().unwrap();
return Ok(Some(Batch::TaskCancelation {
task: self.get_task(rtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?,
previous_started_at: *started_at,
previous_processing_tasks: processing.clone(),
}));
} }
// 2. we get the next task to delete // 2. we get the next task to delete
@ -482,7 +512,7 @@ impl IndexScheduler {
self.breakpoint(crate::Breakpoint::InsideProcessBatch); self.breakpoint(crate::Breakpoint::InsideProcessBatch);
} }
match batch { match batch {
Batch::TaskCancelation(mut task) => { Batch::TaskCancelation { mut task, previous_started_at, previous_processing_tasks } => {
// 1. Retrieve the tasks that matched the query at enqueue-time. // 1. Retrieve the tasks that matched the query at enqueue-time.
let matched_tasks = let matched_tasks =
if let KindWithContent::TaskCancelation { tasks, query: _ } = &task.kind { if let KindWithContent::TaskCancelation { tasks, query: _ } = &task.kind {
@ -492,8 +522,13 @@ impl IndexScheduler {
}; };
let mut wtxn = self.env.write_txn()?; let mut wtxn = self.env.write_txn()?;
let canceled_tasks_content_uuids = let canceled_tasks_content_uuids = self.cancel_matched_tasks(
self.cancel_matched_tasks(&mut wtxn, task.uid, matched_tasks)?; &mut wtxn,
task.uid,
matched_tasks,
previous_started_at,
&previous_processing_tasks,
)?;
task.status = Status::Succeeded; task.status = Status::Succeeded;
match &mut task.details { match &mut task.details {
@ -1199,6 +1234,8 @@ impl IndexScheduler {
wtxn: &mut RwTxn, wtxn: &mut RwTxn,
cancel_task_id: TaskId, cancel_task_id: TaskId,
matched_tasks: &RoaringBitmap, matched_tasks: &RoaringBitmap,
previous_started_at: OffsetDateTime,
previous_processing_tasks: &RoaringBitmap,
) -> Result<Vec<Uuid>> { ) -> Result<Vec<Uuid>> {
let now = OffsetDateTime::now_utc(); let now = OffsetDateTime::now_utc();
@ -1214,6 +1251,9 @@ impl IndexScheduler {
if let Some(uuid) = task.content_uuid() { if let Some(uuid) = task.content_uuid() {
content_files_to_delete.push(uuid); content_files_to_delete.push(uuid);
} }
if previous_processing_tasks.contains(task.uid) {
task.started_at = Some(previous_started_at);
}
task.status = Status::Canceled; task.status = Status::Canceled;
task.canceled_by = Some(cancel_task_id); task.canceled_by = Some(cancel_task_id);
task.finished_at = Some(now); task.finished_at = Some(now);

View File

@ -151,13 +151,12 @@ impl ProcessingTasks {
self.processing = processing; self.processing = processing;
} }
/// Set the processing tasks to an empty list. /// Set the processing tasks to an empty list
fn stop_processing_at(&mut self, stopped_at: OffsetDateTime) { fn stop_processing(&mut self) {
self.started_at = stopped_at;
self.processing = RoaringBitmap::new(); self.processing = RoaringBitmap::new();
} }
/// Returns `true` if there, at least, is one task that is currently processing we must stop. /// Returns `true` if there, at least, is one task that is currently processing that we must stop.
fn must_cancel_processing_tasks(&self, canceled_tasks: &RoaringBitmap) -> bool { fn must_cancel_processing_tasks(&self, canceled_tasks: &RoaringBitmap) -> bool {
!self.processing.is_disjoint(canceled_tasks) !self.processing.is_disjoint(canceled_tasks)
} }
@ -449,8 +448,9 @@ impl IndexScheduler {
/// Return the task ids matched by the given query from the index scheduler's point of view. /// Return the task ids matched by the given query from the index scheduler's point of view.
pub(crate) fn get_task_ids(&self, rtxn: &RoTxn, query: &Query) -> Result<RoaringBitmap> { pub(crate) fn get_task_ids(&self, rtxn: &RoTxn, query: &Query) -> Result<RoaringBitmap> {
let ProcessingTasks { started_at: started_at_processing, processing: processing_tasks } = let ProcessingTasks {
self.processing_tasks.read().unwrap().clone(); started_at: started_at_processing, processing: processing_tasks, ..
} = self.processing_tasks.read().unwrap().clone();
let mut tasks = self.all_task_ids(rtxn)?; let mut tasks = self.all_task_ids(rtxn)?;
@ -947,6 +947,12 @@ impl IndexScheduler {
#[cfg(test)] #[cfg(test)]
self.breakpoint(Breakpoint::AbortedIndexation); self.breakpoint(Breakpoint::AbortedIndexation);
wtxn.abort().map_err(Error::HeedTransaction)?; wtxn.abort().map_err(Error::HeedTransaction)?;
// We make sure that we don't call `stop_processing` on the `processing_tasks`,
// this is because we want to let the next tick call `create_next_batch` and keep
// the `started_at` date times and `processings` of the current processing tasks.
// This date time is used by the task cancelation to store the right `started_at`
// date in the task on disk.
return Ok(0); return Ok(0);
} }
// In case of a failure we must get back and patch all the tasks with the error. // In case of a failure we must get back and patch all the tasks with the error.
@ -976,7 +982,7 @@ impl IndexScheduler {
} }
} }
self.processing_tasks.write().unwrap().stop_processing_at(finished_at); self.processing_tasks.write().unwrap().stop_processing();
#[cfg(test)] #[cfg(test)]
self.maybe_fail(tests::FailureLocation::CommittingWtxn)?; self.maybe_fail(tests::FailureLocation::CommittingWtxn)?;

View File

@ -36,6 +36,7 @@ wolfo [2,]
---------------------------------------------------------------------- ----------------------------------------------------------------------
### Started At: ### Started At:
[timestamp] [0,] [timestamp] [0,]
[timestamp] [1,]
[timestamp] [3,] [timestamp] [3,]
---------------------------------------------------------------------- ----------------------------------------------------------------------
### Finished At: ### Finished At:

View File

@ -29,6 +29,7 @@ catto [0,]
[timestamp] [1,] [timestamp] [1,]
---------------------------------------------------------------------- ----------------------------------------------------------------------
### Started At: ### Started At:
[timestamp] [0,]
[timestamp] [1,] [timestamp] [1,]
---------------------------------------------------------------------- ----------------------------------------------------------------------
### Finished At: ### Finished At: