Add index scheduler tests for task cancelation

This commit is contained in:
Loïc Lecrenier 2022-10-25 11:42:14 +02:00 committed by Tamo
parent 10716b9786
commit 513544c1f3
13 changed files with 511 additions and 6 deletions

View File

@ -475,10 +475,11 @@ impl IndexScheduler {
/// [`finished_at`](meilisearch_types::tasks::Task::finished_at) and [`started_at`](meilisearch_types::tasks::Task::started_at).
pub(crate) fn process_batch(&self, batch: Batch) -> Result<Vec<Task>> {
#[cfg(test)]
self.maybe_fail(crate::tests::FailureLocation::InsideProcessBatch)?;
#[cfg(test)]
self.maybe_fail(crate::tests::FailureLocation::PanicInsideProcessBatch)?;
{
self.maybe_fail(crate::tests::FailureLocation::InsideProcessBatch)?;
self.maybe_fail(crate::tests::FailureLocation::PanicInsideProcessBatch)?;
self.breakpoint(crate::Breakpoint::InsideProcessBatch);
}
match batch {
Batch::TaskCancelation(mut task) => {
// 1. Retrieve the tasks that matched the query at enqueue-time.

View File

@ -298,6 +298,7 @@ pub enum Breakpoint {
AbortedIndexation,
ProcessBatchSucceeded,
ProcessBatchFailed,
InsideProcessBatch,
}
impl IndexScheduler {
@ -1545,6 +1546,136 @@ mod tests {
snapshot!(snapshot_index_scheduler(&index_scheduler));
}
#[test]
fn cancel_enqueued_task() {
let (index_scheduler, handle) = IndexScheduler::test(true, vec![]);
let (file0, documents_count0) = sample_documents(&index_scheduler, 0, 0);
file0.persist().unwrap();
let to_enqueue = [
replace_document_import_task("catto", None, 0, documents_count0),
KindWithContent::TaskCancelation {
query: "test_query".to_owned(),
tasks: RoaringBitmap::from_iter([0]),
},
];
for task in to_enqueue {
let _ = index_scheduler.register(task).unwrap();
index_scheduler.assert_internally_consistent();
}
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "initial_tasks_enqueued");
handle.wait_till(Breakpoint::AfterProcessing);
index_scheduler.assert_internally_consistent();
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "cancel_processed");
}
#[test]
fn cancel_succeeded_task() {
let (index_scheduler, handle) = IndexScheduler::test(true, vec![]);
let (file0, documents_count0) = sample_documents(&index_scheduler, 0, 0);
file0.persist().unwrap();
let _ = index_scheduler
.register(replace_document_import_task("catto", None, 0, documents_count0))
.unwrap();
index_scheduler.assert_internally_consistent();
handle.wait_till(Breakpoint::AfterProcessing);
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "initial_task_processed");
index_scheduler
.register(KindWithContent::TaskCancelation {
query: "test_query".to_owned(),
tasks: RoaringBitmap::from_iter([0]),
})
.unwrap();
handle.wait_till(Breakpoint::AfterProcessing);
index_scheduler.assert_internally_consistent();
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "cancel_processed");
}
#[test]
fn cancel_processing_task() {
let (index_scheduler, handle) = IndexScheduler::test(true, vec![]);
let (file0, documents_count0) = sample_documents(&index_scheduler, 0, 0);
file0.persist().unwrap();
let _ = index_scheduler
.register(replace_document_import_task("catto", None, 0, documents_count0))
.unwrap();
index_scheduler.assert_internally_consistent();
handle.wait_till(Breakpoint::InsideProcessBatch);
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "initial_task_processing");
index_scheduler
.register(KindWithContent::TaskCancelation {
query: "test_query".to_owned(),
tasks: RoaringBitmap::from_iter([0]),
})
.unwrap();
index_scheduler.assert_internally_consistent();
// Now we check that we can reach the AbortedIndexation error handling
handle.wait_till(Breakpoint::AbortedIndexation);
index_scheduler.assert_internally_consistent();
handle.wait_till(Breakpoint::AfterProcessing);
index_scheduler.assert_internally_consistent();
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "cancel_processed");
}
#[test]
fn cancel_mix_of_tasks() {
let (index_scheduler, handle) = IndexScheduler::test(true, vec![]);
let (file0, documents_count0) = sample_documents(&index_scheduler, 0, 0);
file0.persist().unwrap();
let (file1, documents_count1) = sample_documents(&index_scheduler, 1, 1);
file1.persist().unwrap();
let (file2, documents_count2) = sample_documents(&index_scheduler, 2, 2);
file2.persist().unwrap();
let to_enqueue = [
replace_document_import_task("catto", None, 0, documents_count0),
replace_document_import_task("beavero", None, 1, documents_count1),
replace_document_import_task("wolfo", None, 2, documents_count2),
];
for task in to_enqueue {
let _ = index_scheduler.register(task).unwrap();
index_scheduler.assert_internally_consistent();
}
handle.wait_till(Breakpoint::AfterProcessing);
index_scheduler.assert_internally_consistent();
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "first_task_processed");
handle.wait_till(Breakpoint::InsideProcessBatch);
index_scheduler
.register(KindWithContent::TaskCancelation {
query: "test_query".to_owned(),
tasks: RoaringBitmap::from_iter([0, 1, 2]),
})
.unwrap();
index_scheduler.assert_internally_consistent();
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "processing_second_task_cancel_enqueued");
handle.wait_till(Breakpoint::AbortedIndexation);
index_scheduler.assert_internally_consistent();
handle.wait_till(Breakpoint::AfterProcessing);
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "cancel_processed");
}
#[macro_export]
macro_rules! debug_snapshot {
($value:expr, @$snapshot:literal) => {{

View File

@ -129,7 +129,7 @@ pub fn snapshot_task(task: &Task) -> String {
started_at: _,
finished_at: _,
error,
canceled_by: _,
canceled_by,
details,
status,
kind,
@ -137,6 +137,9 @@ pub fn snapshot_task(task: &Task) -> String {
snap.push('{');
snap.push_str(&format!("uid: {uid}, "));
snap.push_str(&format!("status: {status}, "));
if let Some(canceled_by) = canceled_by {
snap.push_str(&format!("canceled_by: {canceled_by}, "));
}
if let Some(error) = error {
snap.push_str(&format!("error: {error:?}, "));
}

View File

@ -0,0 +1,41 @@
---
source: index-scheduler/src/lib.rs
---
### Autobatching Enabled = true
### Processing Tasks:
[]
----------------------------------------------------------------------
### All Tasks:
0 {uid: 0, status: canceled, canceled_by: 1, details: { received_documents: 1, indexed_documents: None }, kind: DocumentAdditionOrUpdate { index_uid: "catto", primary_key: None, method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000000, documents_count: 1, allow_index_creation: true }}
1 {uid: 1, status: succeeded, details: { matched_tasks: 1, canceled_tasks: Some(1), original_query: "test_query" }, kind: TaskCancelation { query: "test_query", tasks: RoaringBitmap<[0]> }}
----------------------------------------------------------------------
### Status:
enqueued []
succeeded [1,]
canceled [0,]
----------------------------------------------------------------------
### Kind:
"documentAdditionOrUpdate" [0,]
"taskCancelation" [1,]
----------------------------------------------------------------------
### Index Tasks:
catto [0,]
----------------------------------------------------------------------
### Index Mapper:
[]
----------------------------------------------------------------------
### Enqueued At:
[timestamp] [0,]
[timestamp] [1,]
----------------------------------------------------------------------
### Started At:
[timestamp] [1,]
----------------------------------------------------------------------
### Finished At:
[timestamp] [0,]
[timestamp] [1,]
----------------------------------------------------------------------
### File Store:
----------------------------------------------------------------------

View File

@ -0,0 +1,37 @@
---
source: index-scheduler/src/lib.rs
---
### Autobatching Enabled = true
### Processing Tasks:
[]
----------------------------------------------------------------------
### All Tasks:
0 {uid: 0, status: enqueued, details: { received_documents: 1, indexed_documents: None }, kind: DocumentAdditionOrUpdate { index_uid: "catto", primary_key: None, method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000000, documents_count: 1, allow_index_creation: true }}
1 {uid: 1, status: enqueued, details: { matched_tasks: 1, canceled_tasks: None, original_query: "test_query" }, kind: TaskCancelation { query: "test_query", tasks: RoaringBitmap<[0]> }}
----------------------------------------------------------------------
### Status:
enqueued [0,1,]
----------------------------------------------------------------------
### Kind:
"documentAdditionOrUpdate" [0,]
"taskCancelation" [1,]
----------------------------------------------------------------------
### Index Tasks:
catto [0,]
----------------------------------------------------------------------
### Index Mapper:
[]
----------------------------------------------------------------------
### Enqueued At:
[timestamp] [0,]
[timestamp] [1,]
----------------------------------------------------------------------
### Started At:
----------------------------------------------------------------------
### Finished At:
----------------------------------------------------------------------
### File Store:
00000000-0000-0000-0000-000000000000
----------------------------------------------------------------------

View File

@ -0,0 +1,49 @@
---
source: index-scheduler/src/lib.rs
---
### Autobatching Enabled = true
### Processing Tasks:
[]
----------------------------------------------------------------------
### All Tasks:
0 {uid: 0, status: succeeded, details: { received_documents: 1, indexed_documents: Some(1) }, kind: DocumentAdditionOrUpdate { index_uid: "catto", primary_key: None, method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000000, documents_count: 1, allow_index_creation: true }}
1 {uid: 1, status: canceled, canceled_by: 3, details: { received_documents: 1, indexed_documents: None }, kind: DocumentAdditionOrUpdate { index_uid: "beavero", primary_key: None, method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000001, documents_count: 1, allow_index_creation: true }}
2 {uid: 2, status: canceled, canceled_by: 3, details: { received_documents: 1, indexed_documents: None }, kind: DocumentAdditionOrUpdate { index_uid: "wolfo", primary_key: None, method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000002, documents_count: 1, allow_index_creation: true }}
3 {uid: 3, status: succeeded, details: { matched_tasks: 3, canceled_tasks: Some(2), original_query: "test_query" }, kind: TaskCancelation { query: "test_query", tasks: RoaringBitmap<[0, 1, 2]> }}
----------------------------------------------------------------------
### Status:
enqueued []
succeeded [0,3,]
canceled [1,2,]
----------------------------------------------------------------------
### Kind:
"documentAdditionOrUpdate" [0,1,2,]
"taskCancelation" [3,]
----------------------------------------------------------------------
### Index Tasks:
beavero [1,]
catto [0,]
wolfo [2,]
----------------------------------------------------------------------
### Index Mapper:
["beavero", "catto"]
----------------------------------------------------------------------
### Enqueued At:
[timestamp] [0,]
[timestamp] [1,]
[timestamp] [2,]
[timestamp] [3,]
----------------------------------------------------------------------
### Started At:
[timestamp] [0,]
[timestamp] [3,]
----------------------------------------------------------------------
### Finished At:
[timestamp] [0,]
[timestamp] [2,]
[timestamp] [3,]
----------------------------------------------------------------------
### File Store:
----------------------------------------------------------------------

View File

@ -0,0 +1,44 @@
---
source: index-scheduler/src/lib.rs
---
### Autobatching Enabled = true
### Processing Tasks:
[]
----------------------------------------------------------------------
### All Tasks:
0 {uid: 0, status: succeeded, details: { received_documents: 1, indexed_documents: Some(1) }, kind: DocumentAdditionOrUpdate { index_uid: "catto", primary_key: None, method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000000, documents_count: 1, allow_index_creation: true }}
1 {uid: 1, status: enqueued, details: { received_documents: 1, indexed_documents: None }, kind: DocumentAdditionOrUpdate { index_uid: "beavero", primary_key: None, method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000001, documents_count: 1, allow_index_creation: true }}
2 {uid: 2, status: enqueued, details: { received_documents: 1, indexed_documents: None }, kind: DocumentAdditionOrUpdate { index_uid: "wolfo", primary_key: None, method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000002, documents_count: 1, allow_index_creation: true }}
----------------------------------------------------------------------
### Status:
enqueued [1,2,]
succeeded [0,]
----------------------------------------------------------------------
### Kind:
"documentAdditionOrUpdate" [0,1,2,]
----------------------------------------------------------------------
### Index Tasks:
beavero [1,]
catto [0,]
wolfo [2,]
----------------------------------------------------------------------
### Index Mapper:
["catto"]
----------------------------------------------------------------------
### Enqueued At:
[timestamp] [0,]
[timestamp] [1,]
[timestamp] [2,]
----------------------------------------------------------------------
### Started At:
[timestamp] [0,]
----------------------------------------------------------------------
### Finished At:
[timestamp] [0,]
----------------------------------------------------------------------
### File Store:
00000000-0000-0000-0000-000000000001
00000000-0000-0000-0000-000000000002
----------------------------------------------------------------------

View File

@ -0,0 +1,47 @@
---
source: index-scheduler/src/lib.rs
---
### Autobatching Enabled = true
### Processing Tasks:
[1,]
----------------------------------------------------------------------
### All Tasks:
0 {uid: 0, status: succeeded, details: { received_documents: 1, indexed_documents: Some(1) }, kind: DocumentAdditionOrUpdate { index_uid: "catto", primary_key: None, method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000000, documents_count: 1, allow_index_creation: true }}
1 {uid: 1, status: enqueued, details: { received_documents: 1, indexed_documents: None }, kind: DocumentAdditionOrUpdate { index_uid: "beavero", primary_key: None, method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000001, documents_count: 1, allow_index_creation: true }}
2 {uid: 2, status: enqueued, details: { received_documents: 1, indexed_documents: None }, kind: DocumentAdditionOrUpdate { index_uid: "wolfo", primary_key: None, method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000002, documents_count: 1, allow_index_creation: true }}
3 {uid: 3, status: enqueued, details: { matched_tasks: 3, canceled_tasks: None, original_query: "test_query" }, kind: TaskCancelation { query: "test_query", tasks: RoaringBitmap<[0, 1, 2]> }}
----------------------------------------------------------------------
### Status:
enqueued [1,2,3,]
succeeded [0,]
----------------------------------------------------------------------
### Kind:
"documentAdditionOrUpdate" [0,1,2,]
"taskCancelation" [3,]
----------------------------------------------------------------------
### Index Tasks:
beavero [1,]
catto [0,]
wolfo [2,]
----------------------------------------------------------------------
### Index Mapper:
["catto"]
----------------------------------------------------------------------
### Enqueued At:
[timestamp] [0,]
[timestamp] [1,]
[timestamp] [2,]
[timestamp] [3,]
----------------------------------------------------------------------
### Started At:
[timestamp] [0,]
----------------------------------------------------------------------
### Finished At:
[timestamp] [0,]
----------------------------------------------------------------------
### File Store:
00000000-0000-0000-0000-000000000001
00000000-0000-0000-0000-000000000002
----------------------------------------------------------------------

View File

@ -0,0 +1,41 @@
---
source: index-scheduler/src/lib.rs
---
### Autobatching Enabled = true
### Processing Tasks:
[]
----------------------------------------------------------------------
### All Tasks:
0 {uid: 0, status: canceled, canceled_by: 1, details: { received_documents: 1, indexed_documents: None }, kind: DocumentAdditionOrUpdate { index_uid: "catto", primary_key: None, method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000000, documents_count: 1, allow_index_creation: true }}
1 {uid: 1, status: succeeded, details: { matched_tasks: 1, canceled_tasks: Some(1), original_query: "test_query" }, kind: TaskCancelation { query: "test_query", tasks: RoaringBitmap<[0]> }}
----------------------------------------------------------------------
### Status:
enqueued []
succeeded [1,]
canceled [0,]
----------------------------------------------------------------------
### Kind:
"documentAdditionOrUpdate" [0,]
"taskCancelation" [1,]
----------------------------------------------------------------------
### Index Tasks:
catto [0,]
----------------------------------------------------------------------
### Index Mapper:
["catto"]
----------------------------------------------------------------------
### Enqueued At:
[timestamp] [0,]
[timestamp] [1,]
----------------------------------------------------------------------
### Started At:
[timestamp] [1,]
----------------------------------------------------------------------
### Finished At:
[timestamp] [0,]
[timestamp] [1,]
----------------------------------------------------------------------
### File Store:
----------------------------------------------------------------------

View File

@ -0,0 +1,34 @@
---
source: index-scheduler/src/lib.rs
---
### Autobatching Enabled = true
### Processing Tasks:
[0,]
----------------------------------------------------------------------
### All Tasks:
0 {uid: 0, status: enqueued, details: { received_documents: 1, indexed_documents: None }, kind: DocumentAdditionOrUpdate { index_uid: "catto", primary_key: None, method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000000, documents_count: 1, allow_index_creation: true }}
----------------------------------------------------------------------
### Status:
enqueued [0,]
----------------------------------------------------------------------
### Kind:
"documentAdditionOrUpdate" [0,]
----------------------------------------------------------------------
### Index Tasks:
catto [0,]
----------------------------------------------------------------------
### Index Mapper:
[]
----------------------------------------------------------------------
### Enqueued At:
[timestamp] [0,]
----------------------------------------------------------------------
### Started At:
----------------------------------------------------------------------
### Finished At:
----------------------------------------------------------------------
### File Store:
00000000-0000-0000-0000-000000000000
----------------------------------------------------------------------

View File

@ -0,0 +1,41 @@
---
source: index-scheduler/src/lib.rs
---
### Autobatching Enabled = true
### Processing Tasks:
[]
----------------------------------------------------------------------
### All Tasks:
0 {uid: 0, status: succeeded, details: { received_documents: 1, indexed_documents: Some(1) }, kind: DocumentAdditionOrUpdate { index_uid: "catto", primary_key: None, method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000000, documents_count: 1, allow_index_creation: true }}
1 {uid: 1, status: succeeded, details: { matched_tasks: 1, canceled_tasks: Some(0), original_query: "test_query" }, kind: TaskCancelation { query: "test_query", tasks: RoaringBitmap<[0]> }}
----------------------------------------------------------------------
### Status:
enqueued []
succeeded [0,1,]
----------------------------------------------------------------------
### Kind:
"documentAdditionOrUpdate" [0,]
"taskCancelation" [1,]
----------------------------------------------------------------------
### Index Tasks:
catto [0,]
----------------------------------------------------------------------
### Index Mapper:
["catto"]
----------------------------------------------------------------------
### Enqueued At:
[timestamp] [0,]
[timestamp] [1,]
----------------------------------------------------------------------
### Started At:
[timestamp] [0,]
[timestamp] [1,]
----------------------------------------------------------------------
### Finished At:
[timestamp] [0,]
[timestamp] [1,]
----------------------------------------------------------------------
### File Store:
----------------------------------------------------------------------

View File

@ -0,0 +1,36 @@
---
source: index-scheduler/src/lib.rs
---
### Autobatching Enabled = true
### Processing Tasks:
[]
----------------------------------------------------------------------
### All Tasks:
0 {uid: 0, status: succeeded, details: { received_documents: 1, indexed_documents: Some(1) }, kind: DocumentAdditionOrUpdate { index_uid: "catto", primary_key: None, method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000000, documents_count: 1, allow_index_creation: true }}
----------------------------------------------------------------------
### Status:
enqueued []
succeeded [0,]
----------------------------------------------------------------------
### Kind:
"documentAdditionOrUpdate" [0,]
----------------------------------------------------------------------
### Index Tasks:
catto [0,]
----------------------------------------------------------------------
### Index Mapper:
["catto"]
----------------------------------------------------------------------
### Enqueued At:
[timestamp] [0,]
----------------------------------------------------------------------
### Started At:
[timestamp] [0,]
----------------------------------------------------------------------
### Finished At:
[timestamp] [0,]
----------------------------------------------------------------------
### File Store:
----------------------------------------------------------------------

View File

@ -331,7 +331,7 @@ impl IndexScheduler {
}
if let Some(canceled_by) = canceled_by {
let db_canceled_tasks = self.get_status(&rtxn, Status::Canceled).unwrap();
assert!(db_canceled_tasks.contains(canceled_by));
assert!(db_canceled_tasks.contains(uid));
let db_canceling_task = self.get_task(&rtxn, canceled_by).unwrap().unwrap();
assert_eq!(db_canceling_task.status, Status::Succeeded);
match db_canceling_task.kind {