diff --git a/index-scheduler/src/batch.rs b/index-scheduler/src/batch.rs index 3e2cc4281..ec13201d9 100644 --- a/index-scheduler/src/batch.rs +++ b/index-scheduler/src/batch.rs @@ -825,6 +825,10 @@ impl IndexScheduler { // 2. dump the tasks let mut dump_tasks = dump.create_tasks_queue()?; for ret in self.all_tasks.iter(&rtxn)? { + if self.must_stop_processing.get() { + return Err(Error::AbortedTask); + } + let (_, mut t) = ret?; let status = t.status; let content_file = t.content_uuid(); @@ -845,6 +849,9 @@ impl IndexScheduler { // 2.1. Dump the `content_file` associated with the task if there is one and the task is not finished yet. if let Some(content_file) = content_file { + if self.must_stop_processing.get() { + return Err(Error::AbortedTask); + } if status == Status::Enqueued { let content_file = self.file_store.get_update(content_file)?; @@ -884,6 +891,9 @@ impl IndexScheduler { // 3.1. Dump the documents for ret in index.all_documents(&rtxn)? { + if self.must_stop_processing.get() { + return Err(Error::AbortedTask); + } let (_id, doc) = ret?; let document = milli::obkv_to_json(&all_fields, &fields_ids_map, doc)?; index_dumper.push_document(&document)?; @@ -903,6 +913,9 @@ impl IndexScheduler { "[year repr:full][month repr:numerical][day padding:zero]-[hour padding:zero][minute padding:zero][second padding:zero][subsecond digits:3]" )).unwrap(); + if self.must_stop_processing.get() { + return Err(Error::AbortedTask); + } let path = self.dumps_path.join(format!("{}.dump", dump_uid)); let file = File::create(path)?; dump.persist_to(BufWriter::new(file))?; diff --git a/index-scheduler/src/error.rs b/index-scheduler/src/error.rs index ddc6960f7..bbe526460 100644 --- a/index-scheduler/src/error.rs +++ b/index-scheduler/src/error.rs @@ -108,6 +108,8 @@ pub enum Error { TaskDeletionWithEmptyQuery, #[error("Query parameters to filter the tasks to cancel are missing. Available query parameters are: `uids`, `indexUids`, `statuses`, `types`, `canceledBy`, `beforeEnqueuedAt`, `afterEnqueuedAt`, `beforeStartedAt`, `afterStartedAt`, `beforeFinishedAt`, `afterFinishedAt`.")] TaskCancelationWithEmptyQuery, + #[error("Aborted task")] + AbortedTask, #[error(transparent)] Dump(#[from] dump::Error), @@ -175,6 +177,7 @@ impl Error { | Error::TaskNotFound(_) | Error::TaskDeletionWithEmptyQuery | Error::TaskCancelationWithEmptyQuery + | Error::AbortedTask | Error::Dump(_) | Error::Heed(_) | Error::Milli(_) @@ -236,6 +239,9 @@ impl ErrorCode for Error { Error::TaskDatabaseUpdate(_) => Code::Internal, Error::CreateBatch(_) => Code::Internal, + // This one should never be seen by the end user + Error::AbortedTask => Code::Internal, + #[cfg(test)] Error::PlannedFailure => Code::Internal, } diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs index 43ac2355c..825f97f46 100644 --- a/index-scheduler/src/lib.rs +++ b/index-scheduler/src/lib.rs @@ -1167,7 +1167,8 @@ impl IndexScheduler { // If we have an abortion error we must stop the tick here and re-schedule tasks. Err(Error::Milli(milli::Error::InternalError( milli::InternalError::AbortedIndexation, - ))) => { + ))) + | Err(Error::AbortedTask) => { #[cfg(test)] self.breakpoint(Breakpoint::AbortedIndexation); wtxn.abort().map_err(Error::HeedTransaction)?; @@ -4323,4 +4324,26 @@ mod tests { } "###); } + + #[test] + fn cancel_processing_dump() { + let (index_scheduler, mut handle) = IndexScheduler::test(true, vec![]); + + let dump_creation = KindWithContent::DumpCreation { keys: Vec::new(), instance_uid: None }; + let dump_cancellation = KindWithContent::TaskCancelation { + query: "cancel dump".to_owned(), + tasks: RoaringBitmap::from_iter([0]), + }; + let _ = index_scheduler.register(dump_creation).unwrap(); + snapshot!(snapshot_index_scheduler(&index_scheduler), name: "after_dump_register"); + handle.advance_till([Start, BatchCreated, InsideProcessBatch]); + + let _ = index_scheduler.register(dump_cancellation).unwrap(); + snapshot!(snapshot_index_scheduler(&index_scheduler), name: "cancel_registered"); + + snapshot!(format!("{:?}", handle.advance()), @"AbortedIndexation"); + + handle.advance_one_successful_batch(); + snapshot!(snapshot_index_scheduler(&index_scheduler), name: "cancel_processed"); + } } diff --git a/index-scheduler/src/snapshots/lib.rs/cancel_processing_dump/after_dump_register.snap b/index-scheduler/src/snapshots/lib.rs/cancel_processing_dump/after_dump_register.snap new file mode 100644 index 000000000..ce0343975 --- /dev/null +++ b/index-scheduler/src/snapshots/lib.rs/cancel_processing_dump/after_dump_register.snap @@ -0,0 +1,35 @@ +--- +source: index-scheduler/src/lib.rs +--- +### Autobatching Enabled = true +### Processing Tasks: +[] +---------------------------------------------------------------------- +### All Tasks: +0 {uid: 0, status: enqueued, details: { dump_uid: None }, kind: DumpCreation { keys: [], instance_uid: None }} +---------------------------------------------------------------------- +### Status: +enqueued [0,] +---------------------------------------------------------------------- +### Kind: +"dumpCreation" [0,] +---------------------------------------------------------------------- +### Index Tasks: +---------------------------------------------------------------------- +### Index Mapper: + +---------------------------------------------------------------------- +### Canceled By: + +---------------------------------------------------------------------- +### Enqueued At: +[timestamp] [0,] +---------------------------------------------------------------------- +### Started At: +---------------------------------------------------------------------- +### Finished At: +---------------------------------------------------------------------- +### File Store: + +---------------------------------------------------------------------- + diff --git a/index-scheduler/src/snapshots/lib.rs/cancel_processing_dump/cancel_processed.snap b/index-scheduler/src/snapshots/lib.rs/cancel_processing_dump/cancel_processed.snap new file mode 100644 index 000000000..f3d7b363f --- /dev/null +++ b/index-scheduler/src/snapshots/lib.rs/cancel_processing_dump/cancel_processed.snap @@ -0,0 +1,45 @@ +--- +source: index-scheduler/src/lib.rs +--- +### Autobatching Enabled = true +### Processing Tasks: +[] +---------------------------------------------------------------------- +### All Tasks: +0 {uid: 0, status: canceled, canceled_by: 1, details: { dump_uid: None }, kind: DumpCreation { keys: [], instance_uid: None }} +1 {uid: 1, status: succeeded, details: { matched_tasks: 1, canceled_tasks: Some(0), original_filter: "cancel dump" }, kind: TaskCancelation { query: "cancel dump", tasks: RoaringBitmap<[0]> }} +---------------------------------------------------------------------- +### Status: +enqueued [] +succeeded [1,] +canceled [0,] +---------------------------------------------------------------------- +### Kind: +"taskCancelation" [1,] +"dumpCreation" [0,] +---------------------------------------------------------------------- +### Index Tasks: +---------------------------------------------------------------------- +### Index Mapper: + +---------------------------------------------------------------------- +### Canceled By: +1 [0,] + +---------------------------------------------------------------------- +### Enqueued At: +[timestamp] [0,] +[timestamp] [1,] +---------------------------------------------------------------------- +### Started At: +[timestamp] [0,] +[timestamp] [1,] +---------------------------------------------------------------------- +### Finished At: +[timestamp] [0,] +[timestamp] [1,] +---------------------------------------------------------------------- +### File Store: + +---------------------------------------------------------------------- + diff --git a/index-scheduler/src/snapshots/lib.rs/cancel_processing_dump/cancel_registered.snap b/index-scheduler/src/snapshots/lib.rs/cancel_processing_dump/cancel_registered.snap new file mode 100644 index 000000000..72ae58e00 --- /dev/null +++ b/index-scheduler/src/snapshots/lib.rs/cancel_processing_dump/cancel_registered.snap @@ -0,0 +1,38 @@ +--- +source: index-scheduler/src/lib.rs +--- +### Autobatching Enabled = true +### Processing Tasks: +[0,] +---------------------------------------------------------------------- +### All Tasks: +0 {uid: 0, status: enqueued, details: { dump_uid: None }, kind: DumpCreation { keys: [], instance_uid: None }} +1 {uid: 1, status: enqueued, details: { matched_tasks: 1, canceled_tasks: None, original_filter: "cancel dump" }, kind: TaskCancelation { query: "cancel dump", tasks: RoaringBitmap<[0]> }} +---------------------------------------------------------------------- +### Status: +enqueued [0,1,] +---------------------------------------------------------------------- +### Kind: +"taskCancelation" [1,] +"dumpCreation" [0,] +---------------------------------------------------------------------- +### Index Tasks: +---------------------------------------------------------------------- +### Index Mapper: + +---------------------------------------------------------------------- +### Canceled By: + +---------------------------------------------------------------------- +### Enqueued At: +[timestamp] [0,] +[timestamp] [1,] +---------------------------------------------------------------------- +### Started At: +---------------------------------------------------------------------- +### Finished At: +---------------------------------------------------------------------- +### File Store: + +---------------------------------------------------------------------- +