mirror of
https://github.com/meilisearch/meilisearch.git
synced 2024-12-02 01:55:03 +08:00
Add more task deletion tests
This commit is contained in:
parent
13a72f8757
commit
568199fc0d
@ -71,7 +71,9 @@ impl BatchKind {
|
|||||||
allow_index_creation,
|
allow_index_creation,
|
||||||
settings_ids: vec![task_id],
|
settings_ids: vec![task_id],
|
||||||
}),
|
}),
|
||||||
Kind::DumpExport | Kind::Snapshot | Kind::CancelTask | Kind::DeleteTasks => unreachable!(),
|
Kind::DumpExport | Kind::Snapshot | Kind::CancelTask | Kind::DeleteTasks => {
|
||||||
|
unreachable!()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -456,10 +456,66 @@ mod tests {
|
|||||||
use tempfile::TempDir;
|
use tempfile::TempDir;
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
use crate::{assert_smol_debug_snapshot, snapshot::snapshot_index_scheduler};
|
use crate::snapshot::snapshot_index_scheduler;
|
||||||
|
|
||||||
use super::*;
|
use super::*;
|
||||||
|
|
||||||
|
/// Return a `KindWithContent::IndexCreation` task
|
||||||
|
fn index_creation_task(index: &'static str, primary_key: &'static str) -> KindWithContent {
|
||||||
|
KindWithContent::IndexCreation {
|
||||||
|
index_uid: S(index),
|
||||||
|
primary_key: Some(S(primary_key)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
/// Create a `KindWithContent::DocumentImport` task that imports documents.
|
||||||
|
///
|
||||||
|
/// - `index_uid` is given as parameter
|
||||||
|
/// - `primary_key` is given as parameter
|
||||||
|
/// - `method` is set to `ReplaceDocuments`
|
||||||
|
/// - `content_file` is given as parameter
|
||||||
|
/// - `documents_count` is given as parameter
|
||||||
|
/// - `allow_index_creation` is set to `true`
|
||||||
|
fn replace_document_import_task(
|
||||||
|
index: &'static str,
|
||||||
|
primary_key: Option<&'static str>,
|
||||||
|
content_file_uuid: u128,
|
||||||
|
documents_count: u64,
|
||||||
|
) -> KindWithContent {
|
||||||
|
KindWithContent::DocumentImport {
|
||||||
|
index_uid: S(index),
|
||||||
|
primary_key: primary_key.map(ToOwned::to_owned),
|
||||||
|
method: ReplaceDocuments,
|
||||||
|
content_file: Uuid::from_u128(content_file_uuid),
|
||||||
|
documents_count: documents_count,
|
||||||
|
allow_index_creation: true,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Create an update file with the given file uuid.
|
||||||
|
///
|
||||||
|
/// The update file contains just one simple document whose id is given by `document_id`.
|
||||||
|
///
|
||||||
|
/// The uuid of the file and its documents count is returned.
|
||||||
|
fn sample_documents(
|
||||||
|
index_scheduler: &IndexScheduler,
|
||||||
|
file_uuid: u128,
|
||||||
|
document_id: usize,
|
||||||
|
) -> (File, u64) {
|
||||||
|
let content = format!(
|
||||||
|
r#"
|
||||||
|
{{
|
||||||
|
"id" : "{document_id}"
|
||||||
|
}}"#
|
||||||
|
);
|
||||||
|
|
||||||
|
let (_uuid, mut file) = index_scheduler
|
||||||
|
.create_update_file_with_uuid(file_uuid)
|
||||||
|
.unwrap();
|
||||||
|
let documents_count =
|
||||||
|
document_formats::read_json(content.as_bytes(), file.as_file_mut()).unwrap() as u64;
|
||||||
|
(file, documents_count)
|
||||||
|
}
|
||||||
|
|
||||||
impl IndexScheduler {
|
impl IndexScheduler {
|
||||||
pub fn test(autobatching: bool) -> (Self, IndexSchedulerHandle) {
|
pub fn test(autobatching: bool) -> (Self, IndexSchedulerHandle) {
|
||||||
let tempdir = TempDir::new().unwrap();
|
let tempdir = TempDir::new().unwrap();
|
||||||
@ -496,6 +552,7 @@ mod tests {
|
|||||||
self.test_breakpoint_rcv.iter().find(|b| *b == breakpoint);
|
self.test_breakpoint_rcv.iter().find(|b| *b == breakpoint);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[allow(unused)]
|
||||||
/// Wait until the provided breakpoint is reached.
|
/// Wait until the provided breakpoint is reached.
|
||||||
fn next_breakpoint(&self) -> Breakpoint {
|
fn next_breakpoint(&self) -> Breakpoint {
|
||||||
self.test_breakpoint_rcv.recv().unwrap()
|
self.test_breakpoint_rcv.recv().unwrap()
|
||||||
@ -512,40 +569,16 @@ mod tests {
|
|||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn register() {
|
fn register() {
|
||||||
let (index_scheduler, handle) = IndexScheduler::test();
|
// In this test, the handle doesn't make any progress, we only check that the tasks are registered
|
||||||
|
let (index_scheduler, _handle) = IndexScheduler::test();
|
||||||
|
|
||||||
let kinds = [
|
let kinds = [
|
||||||
KindWithContent::IndexCreation {
|
index_creation_task("catto", "mouse"),
|
||||||
index_uid: S("catto"),
|
replace_document_import_task("catto", None, 0, 12),
|
||||||
primary_key: Some(S("mouse")),
|
|
||||||
},
|
|
||||||
KindWithContent::DocumentImport {
|
|
||||||
index_uid: S("catto"),
|
|
||||||
primary_key: None,
|
|
||||||
method: ReplaceDocuments,
|
|
||||||
content_file: Uuid::from_u128(0),
|
|
||||||
documents_count: 12,
|
|
||||||
allow_index_creation: true,
|
|
||||||
},
|
|
||||||
KindWithContent::CancelTask { tasks: vec![0, 1] },
|
KindWithContent::CancelTask { tasks: vec![0, 1] },
|
||||||
KindWithContent::DocumentImport {
|
replace_document_import_task("catto", None, 1, 50),
|
||||||
index_uid: S("catto"),
|
replace_document_import_task("doggo", Some("bone"), 2, 5000),
|
||||||
primary_key: None,
|
|
||||||
method: ReplaceDocuments,
|
|
||||||
content_file: Uuid::from_u128(1),
|
|
||||||
documents_count: 50,
|
|
||||||
allow_index_creation: true,
|
|
||||||
},
|
|
||||||
KindWithContent::DocumentImport {
|
|
||||||
index_uid: S("doggo"),
|
|
||||||
primary_key: Some(S("bone")),
|
|
||||||
method: ReplaceDocuments,
|
|
||||||
content_file: Uuid::from_u128(2),
|
|
||||||
documents_count: 5000,
|
|
||||||
allow_index_creation: true,
|
|
||||||
},
|
|
||||||
];
|
];
|
||||||
let mut inserted_tasks = Vec::new();
|
|
||||||
for (idx, kind) in kinds.into_iter().enumerate() {
|
for (idx, kind) in kinds.into_iter().enumerate() {
|
||||||
let k = kind.as_kind();
|
let k = kind.as_kind();
|
||||||
let task = index_scheduler.register(kind).unwrap();
|
let task = index_scheduler.register(kind).unwrap();
|
||||||
@ -553,8 +586,6 @@ mod tests {
|
|||||||
assert_eq!(task.uid, idx as u32);
|
assert_eq!(task.uid, idx as u32);
|
||||||
assert_eq!(task.status, Status::Enqueued);
|
assert_eq!(task.status, Status::Enqueued);
|
||||||
assert_eq!(task.kind, k);
|
assert_eq!(task.kind, k);
|
||||||
|
|
||||||
inserted_tasks.push(task);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
assert_snapshot!(snapshot_index_scheduler(&index_scheduler));
|
assert_snapshot!(snapshot_index_scheduler(&index_scheduler));
|
||||||
@ -656,49 +687,85 @@ mod tests {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn task_deletion() {
|
fn task_deletion_undeleteable() {
|
||||||
let (index_scheduler, handle) = IndexScheduler::test();
|
let (index_scheduler, handle) = IndexScheduler::test();
|
||||||
|
|
||||||
let to_enqueue = [
|
let to_enqueue = [
|
||||||
KindWithContent::IndexCreation {
|
index_creation_task("catto", "mouse"),
|
||||||
index_uid: S("catto"),
|
replace_document_import_task("catto", None, 0, 12),
|
||||||
primary_key: Some(S("mouse")),
|
replace_document_import_task("doggo", Some("bone"), 1, 5000),
|
||||||
},
|
|
||||||
KindWithContent::DocumentImport {
|
|
||||||
index_uid: S("catto"),
|
|
||||||
primary_key: None,
|
|
||||||
method: ReplaceDocuments,
|
|
||||||
content_file: Uuid::from_u128(0),
|
|
||||||
documents_count: 12,
|
|
||||||
allow_index_creation: true,
|
|
||||||
},
|
|
||||||
KindWithContent::DocumentImport {
|
|
||||||
index_uid: S("doggo"),
|
|
||||||
primary_key: Some(S("bone")),
|
|
||||||
method: ReplaceDocuments,
|
|
||||||
content_file: Uuid::from_u128(1),
|
|
||||||
documents_count: 5000,
|
|
||||||
allow_index_creation: true,
|
|
||||||
},
|
|
||||||
];
|
];
|
||||||
for task in to_enqueue {
|
for task in to_enqueue {
|
||||||
let _ = index_scheduler.register(task).unwrap();
|
let _ = index_scheduler.register(task).unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// here we have registered all the tasks, but the index scheduler
|
||||||
|
// has not progressed at all
|
||||||
assert_snapshot!(snapshot_index_scheduler(&index_scheduler));
|
assert_snapshot!(snapshot_index_scheduler(&index_scheduler));
|
||||||
|
|
||||||
index_scheduler.register(KindWithContent::DeleteTasks {
|
index_scheduler
|
||||||
query: "test_query".to_owned(),
|
.register(KindWithContent::DeleteTasks {
|
||||||
tasks: vec![0, 1],
|
query: "test_query".to_owned(),
|
||||||
});
|
tasks: vec![0, 1],
|
||||||
|
})
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
// again, no progress made at all, but one more task is registered
|
||||||
assert_snapshot!(snapshot_index_scheduler(&index_scheduler));
|
assert_snapshot!(snapshot_index_scheduler(&index_scheduler));
|
||||||
|
|
||||||
|
// now we create the first batch
|
||||||
handle.wait_till(Breakpoint::BatchCreated);
|
handle.wait_till(Breakpoint::BatchCreated);
|
||||||
|
|
||||||
|
// the task deletion should now be "processing"
|
||||||
assert_snapshot!(snapshot_index_scheduler(&index_scheduler));
|
assert_snapshot!(snapshot_index_scheduler(&index_scheduler));
|
||||||
|
|
||||||
handle.wait_till(Breakpoint::AfterProcessing);
|
handle.wait_till(Breakpoint::AfterProcessing);
|
||||||
|
|
||||||
|
// after the task deletion is processed, no task should actually have been deleted,
|
||||||
|
// because the tasks with ids 0 and 1 were still "enqueued", and thus undeleteable
|
||||||
|
// the "task deletion" task should be marked as "succeeded" and, in its details, the
|
||||||
|
// number of deleted tasks should be 0
|
||||||
|
assert_snapshot!(snapshot_index_scheduler(&index_scheduler));
|
||||||
|
|
||||||
|
handle.dont_block();
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn task_deletion_deleteable() {
|
||||||
|
let (index_scheduler, handle) = IndexScheduler::test();
|
||||||
|
|
||||||
|
let (file0, documents_count0) = sample_documents(&index_scheduler, 0, 0);
|
||||||
|
let (file1, documents_count1) = sample_documents(&index_scheduler, 1, 1);
|
||||||
|
|
||||||
|
let to_enqueue = [
|
||||||
|
replace_document_import_task("catto", None, 0, documents_count0),
|
||||||
|
replace_document_import_task("doggo", Some("bone"), 1, documents_count1),
|
||||||
|
];
|
||||||
|
|
||||||
|
for task in to_enqueue {
|
||||||
|
let _ = index_scheduler.register(task).unwrap();
|
||||||
|
}
|
||||||
|
file0.persist().unwrap();
|
||||||
|
file1.persist().unwrap();
|
||||||
|
|
||||||
|
assert_snapshot!(snapshot_index_scheduler(&index_scheduler));
|
||||||
|
|
||||||
|
handle.wait_till(Breakpoint::AfterProcessing);
|
||||||
|
// first addition of documents should be successful
|
||||||
|
// TODO: currently the result of this operation is incorrect!
|
||||||
|
// only the first task should be successful, because it should not be batched with
|
||||||
|
// the second task, that operates on a different index!
|
||||||
|
assert_snapshot!(snapshot_index_scheduler(&index_scheduler));
|
||||||
|
|
||||||
|
// Now we delete the first task
|
||||||
|
index_scheduler
|
||||||
|
.register(KindWithContent::DeleteTasks {
|
||||||
|
query: "test_query".to_owned(),
|
||||||
|
tasks: vec![0],
|
||||||
|
})
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
handle.wait_till(Breakpoint::AfterProcessing);
|
||||||
assert_snapshot!(snapshot_index_scheduler(&index_scheduler));
|
assert_snapshot!(snapshot_index_scheduler(&index_scheduler));
|
||||||
|
|
||||||
handle.dont_block();
|
handle.dont_block();
|
||||||
|
@ -0,0 +1,26 @@
|
|||||||
|
---
|
||||||
|
source: index-scheduler/src/lib.rs
|
||||||
|
expression: snapshot_index_scheduler(&index_scheduler)
|
||||||
|
---
|
||||||
|
### Processing Tasks:
|
||||||
|
[]
|
||||||
|
----------------------------------------------------------------------
|
||||||
|
### All Tasks:
|
||||||
|
0 {uid: 0, status: succeeded, details: { received_documents: 1, indexed_documents: 1 }, kind: DocumentImport { index_uid: "catto", primary_key: None, method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000000, documents_count: 1, allow_index_creation: true }}
|
||||||
|
1 {uid: 1, status: succeeded, details: { received_documents: 1, indexed_documents: 1 }, kind: DocumentImport { index_uid: "doggo", primary_key: Some("bone"), method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000001, documents_count: 1, allow_index_creation: true }}
|
||||||
|
----------------------------------------------------------------------
|
||||||
|
### Status:
|
||||||
|
enqueued []
|
||||||
|
succeeded [0,1,]
|
||||||
|
----------------------------------------------------------------------
|
||||||
|
### Kind:
|
||||||
|
{"documentImport":{"method":"ReplaceDocuments","allow_index_creation":true}} [0,1,]
|
||||||
|
----------------------------------------------------------------------
|
||||||
|
### Index Tasks:
|
||||||
|
catto [0,]
|
||||||
|
doggo [1,]
|
||||||
|
----------------------------------------------------------------------
|
||||||
|
### Index Mapper:
|
||||||
|
["catto"]
|
||||||
|
----------------------------------------------------------------------
|
||||||
|
|
@ -0,0 +1,27 @@
|
|||||||
|
---
|
||||||
|
source: index-scheduler/src/lib.rs
|
||||||
|
expression: snapshot_index_scheduler(&index_scheduler)
|
||||||
|
---
|
||||||
|
### Processing Tasks:
|
||||||
|
[]
|
||||||
|
----------------------------------------------------------------------
|
||||||
|
### All Tasks:
|
||||||
|
1 {uid: 1, status: succeeded, details: { received_documents: 1, indexed_documents: 1 }, kind: DocumentImport { index_uid: "doggo", primary_key: Some("bone"), method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000001, documents_count: 1, allow_index_creation: true }}
|
||||||
|
2 {uid: 2, status: succeeded, details: { matched_tasks: 1, deleted_tasks: Some(1), original_query: "test_query" }, kind: DeleteTasks { query: "test_query", tasks: [0] }}
|
||||||
|
----------------------------------------------------------------------
|
||||||
|
### Status:
|
||||||
|
enqueued []
|
||||||
|
succeeded [1,2,]
|
||||||
|
----------------------------------------------------------------------
|
||||||
|
### Kind:
|
||||||
|
{"documentImport":{"method":"ReplaceDocuments","allow_index_creation":true}} [1,]
|
||||||
|
"deleteTasks" [2,]
|
||||||
|
----------------------------------------------------------------------
|
||||||
|
### Index Tasks:
|
||||||
|
catto []
|
||||||
|
doggo [1,]
|
||||||
|
----------------------------------------------------------------------
|
||||||
|
### Index Mapper:
|
||||||
|
["catto"]
|
||||||
|
----------------------------------------------------------------------
|
||||||
|
|
@ -0,0 +1,25 @@
|
|||||||
|
---
|
||||||
|
source: index-scheduler/src/lib.rs
|
||||||
|
expression: snapshot_index_scheduler(&index_scheduler)
|
||||||
|
---
|
||||||
|
### Processing Tasks:
|
||||||
|
[]
|
||||||
|
----------------------------------------------------------------------
|
||||||
|
### All Tasks:
|
||||||
|
0 {uid: 0, status: enqueued, details: { received_documents: 1, indexed_documents: 0 }, kind: DocumentImport { index_uid: "catto", primary_key: None, method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000000, documents_count: 1, allow_index_creation: true }}
|
||||||
|
1 {uid: 1, status: enqueued, details: { received_documents: 1, indexed_documents: 0 }, kind: DocumentImport { index_uid: "doggo", primary_key: Some("bone"), method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000001, documents_count: 1, allow_index_creation: true }}
|
||||||
|
----------------------------------------------------------------------
|
||||||
|
### Status:
|
||||||
|
enqueued [0,1,]
|
||||||
|
----------------------------------------------------------------------
|
||||||
|
### Kind:
|
||||||
|
{"documentImport":{"method":"ReplaceDocuments","allow_index_creation":true}} [0,1,]
|
||||||
|
----------------------------------------------------------------------
|
||||||
|
### Index Tasks:
|
||||||
|
catto [0,]
|
||||||
|
doggo [1,]
|
||||||
|
----------------------------------------------------------------------
|
||||||
|
### Index Mapper:
|
||||||
|
[]
|
||||||
|
----------------------------------------------------------------------
|
||||||
|
|
Loading…
Reference in New Issue
Block a user