Handle the autobatching of deletion and addition in the scheduler

Tamo 2023-02-08 20:53:19 +01:00
parent 67dda0678f
commit 860c993ef7
No known key found for this signature in database
GPG Key ID: 20CD8020AFA88D69
8 changed files with 409 additions and 71 deletions
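The gist of the change: the scheduler's addition-only `DocumentImport` batches become `DocumentOperation` batches that carry an ordered list of additions and deletions, replayed in registration order against a single index-update builder, so a deletion no longer forces a batch boundary. Below is a minimal, self-contained sketch of that replay idea; `apply_batch` and the map-backed index are illustrative stand-ins, not the scheduler's real API.

// A minimal sketch (not the scheduler's real types): additions and deletions
// are kept in one ordered list and replayed in registration order against a
// single in-memory stand-in for the index.
use std::collections::BTreeMap;

enum DocumentOperation {
    Add(Vec<(String, String)>), // (document id, payload) pairs to upsert
    Delete(Vec<String>),        // document ids to remove
}

fn apply_batch(index: &mut BTreeMap<String, String>, operations: Vec<DocumentOperation>) {
    for operation in operations {
        match operation {
            DocumentOperation::Add(docs) => {
                for (id, payload) in docs {
                    index.insert(id, payload);
                }
            }
            DocumentOperation::Delete(ids) => {
                for id in ids {
                    index.remove(&id);
                }
            }
        }
    }
}

fn main() {
    // Same scenario as the new `document_addition_and_document_deletion` test:
    // one addition of three documents and one deletion of two of them,
    // processed as a single batch.
    let mut index = BTreeMap::new();
    apply_batch(
        &mut index,
        vec![
            DocumentOperation::Add(vec![
                ("1".to_string(), r#"{ "doggo": "jean bob" }"#.to_string()),
                ("2".to_string(), r#"{ "catto": "jorts" }"#.to_string()),
                ("3".to_string(), r#"{ "doggo": "bork" }"#.to_string()),
            ]),
            DocumentOperation::Delete(vec!["1".to_string(), "2".to_string()]),
        ],
    );
    assert_eq!(index.len(), 1); // only document 3 survives, matching the snapshot
}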

View File

@@ -86,15 +86,27 @@ pub(crate) enum Batch {
     },
 }

+#[derive(Debug)]
+pub(crate) enum DocumentOperation {
+    Add(Uuid),
+    Delete(Vec<String>),
+}
+
+#[derive(Debug)]
+pub(crate) enum DocumentOperationResult {
+    Add(DocumentAdditionResult),
+    Delete(DocumentDeletionResult),
+}
+
 /// A [batch](Batch) that combines multiple tasks operating on an index.
 #[derive(Debug)]
 pub(crate) enum IndexOperation {
-    DocumentImport {
+    DocumentOperation {
         index_uid: String,
         primary_key: Option<String>,
         method: IndexDocumentsMethod,
         documents_counts: Vec<u64>,
-        content_files: Vec<Uuid>,
+        operations: Vec<DocumentOperation>,
         tasks: Vec<Task>,
     },
     DocumentDeletion {
@@ -121,13 +133,13 @@ pub(crate) enum IndexOperation {
         settings: Vec<(bool, Settings<Unchecked>)>,
         settings_tasks: Vec<Task>,
     },
-    SettingsAndDocumentImport {
+    SettingsAndDocumentOperation {
         index_uid: String,
         primary_key: Option<String>,
         method: IndexDocumentsMethod,
         documents_counts: Vec<u64>,
-        content_files: Vec<Uuid>,
+        operations: Vec<DocumentOperation>,
         document_import_tasks: Vec<Task>,
         // The boolean indicates if it's a settings deletion or creation.
@@ -149,13 +161,13 @@ impl Batch {
                 tasks.iter().map(|task| task.uid).collect()
             }
             Batch::IndexOperation { op, .. } => match op {
-                IndexOperation::DocumentImport { tasks, .. }
+                IndexOperation::DocumentOperation { tasks, .. }
                 | IndexOperation::DocumentDeletion { tasks, .. }
                 | IndexOperation::Settings { tasks, .. }
                 | IndexOperation::DocumentClear { tasks, .. } => {
                     tasks.iter().map(|task| task.uid).collect()
                 }
-                IndexOperation::SettingsAndDocumentImport {
+                IndexOperation::SettingsAndDocumentOperation {
                     document_import_tasks: tasks,
                     settings_tasks: other,
                     ..
@@ -174,12 +186,12 @@ impl Batch {
 impl IndexOperation {
     pub fn index_uid(&self) -> &str {
         match self {
-            IndexOperation::DocumentImport { index_uid, .. }
+            IndexOperation::DocumentOperation { index_uid, .. }
             | IndexOperation::DocumentDeletion { index_uid, .. }
             | IndexOperation::DocumentClear { index_uid, .. }
             | IndexOperation::Settings { index_uid, .. }
             | IndexOperation::DocumentClearAndSetting { index_uid, .. }
-            | IndexOperation::SettingsAndDocumentImport { index_uid, .. } => index_uid,
+            | IndexOperation::SettingsAndDocumentOperation { index_uid, .. } => index_uid,
         }
     }
 }
@@ -206,17 +218,22 @@ impl IndexScheduler {
                 },
                 must_create_index,
             })),
-            BatchKind::DocumentOperation { method, operation_ids: import_ids, .. } => {
-                let tasks = self.get_existing_tasks(rtxn, import_ids)?;
-                let primary_key = match &tasks[0].kind {
-                    KindWithContent::DocumentAdditionOrUpdate { primary_key, .. } => {
-                        primary_key.clone()
-                    }
-                    _ => unreachable!(),
-                };
+            BatchKind::DocumentOperation { method, operation_ids, .. } => {
+                let tasks = self.get_existing_tasks(rtxn, operation_ids)?;
+                let primary_key = tasks
+                    .iter()
+                    .find_map(|task| match task.kind {
+                        KindWithContent::DocumentAdditionOrUpdate { ref primary_key, .. } => {
+                            // we want to stop on the first document addition
+                            Some(primary_key.clone())
+                        }
+                        KindWithContent::DocumentDeletion { .. } => None,
+                        _ => unreachable!(),
+                    })
+                    .flatten();

                 let mut documents_counts = Vec::new();
-                let mut content_files = Vec::new();
+                let mut operations = Vec::new();

                 for task in tasks.iter() {
                     match task.kind {
@@ -226,19 +243,23 @@ impl IndexScheduler {
                             ..
                         } => {
                             documents_counts.push(documents_count);
-                            content_files.push(content_file);
+                            operations.push(DocumentOperation::Add(content_file));
+                        }
+                        KindWithContent::DocumentDeletion { ref documents_ids, .. } => {
+                            documents_counts.push(documents_ids.len() as u64);
+                            operations.push(DocumentOperation::Delete(documents_ids.clone()));
                         }
                         _ => unreachable!(),
                     }
                 }

                 Ok(Some(Batch::IndexOperation {
-                    op: IndexOperation::DocumentImport {
+                    op: IndexOperation::DocumentOperation {
                         index_uid,
                         primary_key,
                         method,
                         documents_counts,
-                        content_files,
+                        operations,
                         tasks,
                     },
                     must_create_index,
@@ -327,7 +348,7 @@ impl IndexScheduler {
                 method,
                 allow_index_creation,
                 primary_key,
-                operation_ids: import_ids,
+                operation_ids,
             } => {
                 let settings = self.create_next_batch_index(
                     rtxn,
@@ -343,7 +364,7 @@
                         method,
                         allow_index_creation,
                         primary_key,
-                        operation_ids: import_ids,
+                        operation_ids,
                     },
                     must_create_index,
                 )?;
@@ -352,10 +373,10 @@
                 (
                     Some(Batch::IndexOperation {
                         op:
-                            IndexOperation::DocumentImport {
+                            IndexOperation::DocumentOperation {
                                 primary_key,
                                 documents_counts,
-                                content_files,
+                                operations,
                                 tasks: document_import_tasks,
                                 ..
                             },
@@ -366,12 +387,12 @@
                         ..
                     }),
                 ) => Ok(Some(Batch::IndexOperation {
-                    op: IndexOperation::SettingsAndDocumentImport {
+                    op: IndexOperation::SettingsAndDocumentOperation {
                         index_uid,
                         primary_key,
                         method,
                         documents_counts,
-                        content_files,
+                        operations,
                         document_import_tasks,
                         settings,
                         settings_tasks,
@@ -987,12 +1008,12 @@ impl IndexScheduler {
                 Ok(tasks)
             }
-            IndexOperation::DocumentImport {
+            IndexOperation::DocumentOperation {
                 index_uid: _,
                 primary_key,
                 method,
                 documents_counts,
-                content_files,
+                operations,
                 mut tasks,
             } => {
                 let mut primary_key_has_been_set = false;
@@ -1037,26 +1058,68 @@
                     || must_stop_processing.get(),
                 )?;

-                let mut results = Vec::new();
-                for content_uuid in content_files.into_iter() {
-                    let content_file = self.file_store.get_update(content_uuid)?;
-                    let reader = DocumentsBatchReader::from_reader(content_file)
-                        .map_err(milli::Error::from)?;
-                    let (new_builder, user_result) = builder.add_documents(reader)?;
-                    builder = new_builder;
-
-                    let user_result = match user_result {
-                        Ok(count) => Ok(DocumentAdditionResult {
-                            indexed_documents: count,
-                            number_of_documents: count,
-                        }),
-                        Err(e) => Err(milli::Error::from(e)),
-                    };
-
-                    results.push(user_result);
+                for (operation, task) in operations.into_iter().zip(tasks.iter_mut()) {
+                    match operation {
+                        DocumentOperation::Add(content_uuid) => {
+                            let content_file = self.file_store.get_update(content_uuid)?;
+                            let reader = DocumentsBatchReader::from_reader(content_file)
+                                .map_err(milli::Error::from)?;
+                            let (new_builder, user_result) = builder.add_documents(reader)?;
+                            builder = new_builder;
+
+                            let Some(Details::DocumentAdditionOrUpdate { received_documents, .. }) = task.details
+                            // In the case of a `documentAdditionOrUpdate` the details MUST be set
+                            else { unreachable!(); };
+                            // TODO: this is wrong, we should use the value stored in the Details.
+
+                            match user_result {
+                                Ok(count) => {
+                                    task.status = Status::Succeeded;
+                                    task.details = Some(Details::DocumentAdditionOrUpdate {
+                                        received_documents,
+                                        indexed_documents: Some(count),
+                                    })
+                                }
+                                Err(e) => {
+                                    task.status = Status::Failed;
+                                    task.details = Some(Details::DocumentAdditionOrUpdate {
+                                        received_documents,
+                                        indexed_documents: Some(0),
+                                    });
+                                    task.error = Some(milli::Error::from(e).into());
+                                }
+                            }
+                        }
+                        DocumentOperation::Delete(document_ids) => {
+                            let (new_builder, user_result) =
+                                builder.remove_documents(document_ids)?;
+                            builder = new_builder;
+
+                            let Some(Details::DocumentDeletion { provided_ids, .. }) = task.details
+                            // In the case of a `documentAdditionOrUpdate` the details MUST be set
+                            else { unreachable!(); };
+
+                            match user_result {
+                                Ok(count) => {
+                                    task.status = Status::Succeeded;
+                                    task.details = Some(Details::DocumentDeletion {
+                                        provided_ids,
+                                        deleted_documents: Some(count),
+                                    });
+                                }
+                                Err(e) => {
+                                    task.status = Status::Failed;
+                                    task.details = Some(Details::DocumentDeletion {
+                                        provided_ids,
+                                        deleted_documents: Some(0),
+                                    });
+                                    task.error = Some(milli::Error::from(e).into());
+                                }
+                            }
+                        }
+                    }
                 }

-                if results.iter().any(|res| res.is_ok()) {
+                if !tasks.iter().all(|res| res.error.is_some()) {
                     let addition = builder.execute()?;
                     info!("document addition done: {:?}", addition);
                 } else if primary_key_has_been_set {
@@ -1071,29 +1134,6 @@ impl IndexScheduler {
                     )?;
                 }

-                for (task, (ret, count)) in
-                    tasks.iter_mut().zip(results.into_iter().zip(documents_counts))
-                {
-                    match ret {
-                        Ok(DocumentAdditionResult { indexed_documents, number_of_documents }) => {
-                            task.status = Status::Succeeded;
-                            task.details = Some(Details::DocumentAdditionOrUpdate {
-                                received_documents: number_of_documents,
-                                indexed_documents: Some(indexed_documents),
-                            });
-                        }
-                        Err(error) => {
-                            task.status = Status::Failed;
-                            task.details = Some(Details::DocumentAdditionOrUpdate {
-                                received_documents: count,
-                                // if there was an error we indexed 0 documents.
-                                indexed_documents: Some(0),
-                            });
-                            task.error = Some(error.into())
-                        }
-                    }
-                }
-
                 Ok(tasks)
             }
             IndexOperation::DocumentDeletion { index_uid: _, documents, mut tasks } => {
@@ -1136,12 +1176,12 @@
                 Ok(tasks)
             }
-            IndexOperation::SettingsAndDocumentImport {
+            IndexOperation::SettingsAndDocumentOperation {
                 index_uid,
                 primary_key,
                 method,
                 documents_counts,
-                content_files,
+                operations,
                 document_import_tasks,
                 settings,
                 settings_tasks,
@@ -1159,12 +1199,12 @@
                 let mut import_tasks = self.apply_index_operation(
                     index_wtxn,
                     index,
-                    IndexOperation::DocumentImport {
+                    IndexOperation::DocumentOperation {
                         index_uid,
                         primary_key,
                         method,
                         documents_counts,
-                        content_files,
+                        operations,
                         tasks: document_import_tasks,
                     },
                 )?;
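A note on the primary-key lookup in this file: because a batch can now start with a deletion, the primary key can no longer be read off `tasks[0]`; the new code scans for the first document addition with `find_map` and collapses the resulting `Option<Option<String>>` with `flatten`. Below is a self-contained sketch of the idiom, with `TaskKind` as a simplified stand-in for `KindWithContent`.

// Sketch of the find_map + flatten idiom used in create_next_batch_index:
// walk the batched tasks in order, stop at the first document addition, and
// take its (optional) primary key; deletions are skipped.
enum TaskKind {
    Addition { primary_key: Option<String> },
    Deletion,
}

fn batch_primary_key(tasks: &[TaskKind]) -> Option<String> {
    tasks
        .iter()
        .find_map(|task| match task {
            // we want to stop on the first document addition
            TaskKind::Addition { primary_key } => Some(primary_key.clone()),
            TaskKind::Deletion => None,
        })
        // find_map yields Option<Option<String>>; flatten collapses the case
        // where the first addition itself carries no primary key.
        .flatten()
}

fn main() {
    let tasks = vec![
        TaskKind::Deletion,
        TaskKind::Addition { primary_key: Some("id".to_string()) },
        TaskKind::Addition { primary_key: None },
    ];
    assert_eq!(batch_primary_key(&tasks), Some("id".to_string()));
}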

View File

@@ -1679,6 +1679,100 @@ mod tests {
         snapshot!(snapshot_index_scheduler(&index_scheduler), name: "both_task_succeeded");
     }

+    #[test]
+    fn document_addition_and_document_deletion() {
+        let (index_scheduler, mut handle) = IndexScheduler::test(true, vec![]);
+
+        let content = r#"[
+            { "id": 1, "doggo": "jean bob" },
+            { "id": 2, "catto": "jorts" },
+            { "id": 3, "doggo": "bork" }
+        ]"#;
+
+        let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(0).unwrap();
+        let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap();
+        file.persist().unwrap();
+        index_scheduler
+            .register(KindWithContent::DocumentAdditionOrUpdate {
+                index_uid: S("doggos"),
+                primary_key: Some(S("id")),
+                method: ReplaceDocuments,
+                content_file: uuid,
+                documents_count,
+                allow_index_creation: true,
+            })
+            .unwrap();
+        snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_first_task");
+
+        index_scheduler
+            .register(KindWithContent::DocumentDeletion {
+                index_uid: S("doggos"),
+                documents_ids: vec![S("1"), S("2")],
+            })
+            .unwrap();
+        snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_second_task");
+
+        handle.advance_one_successful_batch(); // The addition AND deletion should've been batched together
+        snapshot!(snapshot_index_scheduler(&index_scheduler), name: "after_processing_the_batch");
+
+        let index = index_scheduler.index("doggos").unwrap();
+        let rtxn = index.read_txn().unwrap();
+        let field_ids_map = index.fields_ids_map(&rtxn).unwrap();
+        let field_ids = field_ids_map.ids().collect::<Vec<_>>();
+        let documents = index
+            .all_documents(&rtxn)
+            .unwrap()
+            .map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap())
+            .collect::<Vec<_>>();
+        snapshot!(serde_json::to_string_pretty(&documents).unwrap(), name: "documents");
+    }
+
+    #[test]
+    fn document_deletion_and_document_addition() {
+        let (index_scheduler, mut handle) = IndexScheduler::test(true, vec![]);
+
+        index_scheduler
+            .register(KindWithContent::DocumentDeletion {
+                index_uid: S("doggos"),
+                documents_ids: vec![S("1"), S("2")],
+            })
+            .unwrap();
+        snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_first_task");
+
+        let content = r#"[
+            { "id": 1, "doggo": "jean bob" },
+            { "id": 2, "catto": "jorts" },
+            { "id": 3, "doggo": "bork" }
+        ]"#;
+
+        let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(0).unwrap();
+        let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap();
+        file.persist().unwrap();
+        index_scheduler
+            .register(KindWithContent::DocumentAdditionOrUpdate {
+                index_uid: S("doggos"),
+                primary_key: Some(S("id")),
+                method: ReplaceDocuments,
+                content_file: uuid,
+                documents_count,
+                allow_index_creation: true,
+            })
+            .unwrap();
+        snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_second_task");
+
+        handle.advance_one_successful_batch(); // The deletion AND addition should've been batched together
+        snapshot!(snapshot_index_scheduler(&index_scheduler), name: "after_processing_the_batch");
+
+        let index = index_scheduler.index("doggos").unwrap();
+        let rtxn = index.read_txn().unwrap();
+        let field_ids_map = index.fields_ids_map(&rtxn).unwrap();
+        let field_ids = field_ids_map.ids().collect::<Vec<_>>();
+        let documents = index
+            .all_documents(&rtxn)
+            .unwrap()
+            .map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap())
+            .collect::<Vec<_>>();
+        snapshot!(serde_json::to_string_pretty(&documents).unwrap(), name: "documents");
+    }
+
     #[test]
     fn do_not_batch_task_of_different_indexes() {
         let (index_scheduler, mut handle) = IndexScheduler::test(true, vec![]);
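Two details of the new apply loop deserve a closer look. First, each task's pre-filled `details` are unpacked with a `let ... else`, diverging with `unreachable!()` when the expected variant is absent. Second, the builder is still executed unless every task in the batch failed: `!tasks.iter().all(|res| res.error.is_some())` replaces the old `results.iter().any(|res| res.is_ok())`, since per-operation outcomes are now recorded directly on the tasks. Below is a cut-down sketch of the `let ... else` unpacking; `Details` here is a two-variant stand-in, and the sketch returns early where the scheduler would panic.

// Sketch of the let-else pattern used to unpack a task's details:
// destructure the expected Details variant or diverge.
#[derive(Debug)]
enum Details {
    DocumentAdditionOrUpdate { received_documents: u64, indexed_documents: Option<u64> },
    DocumentDeletion { provided_ids: usize, deleted_documents: Option<u64> },
}

fn record_addition_result(details: &mut Option<Details>, indexed: u64) {
    // In the scheduler an addition task's details MUST already be set, so its
    // else branch is `unreachable!()`; this sketch returns early instead.
    let Some(Details::DocumentAdditionOrUpdate { received_documents, .. }) = *details else {
        return;
    };
    *details = Some(Details::DocumentAdditionOrUpdate {
        received_documents,
        indexed_documents: Some(indexed),
    });
}

fn main() {
    let mut details = Some(Details::DocumentAdditionOrUpdate {
        received_documents: 3,
        indexed_documents: None,
    });
    record_addition_result(&mut details, 3);
    println!("{details:?}"); // indexed_documents is now Some(3)

    let deletion = Details::DocumentDeletion { provided_ids: 2, deleted_documents: Some(2) };
    println!("{deletion:?}");
}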

View File

@@ -0,0 +1,42 @@
---
source: index-scheduler/src/lib.rs
---
### Autobatching Enabled = true
### Processing Tasks:
[]
----------------------------------------------------------------------
### All Tasks:
0 {uid: 0, status: succeeded, details: { received_documents: 3, indexed_documents: Some(3) }, kind: DocumentAdditionOrUpdate { index_uid: "doggos", primary_key: Some("id"), method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000000, documents_count: 3, allow_index_creation: true }}
1 {uid: 1, status: succeeded, details: { received_document_ids: 2, deleted_documents: Some(2) }, kind: DocumentDeletion { index_uid: "doggos", documents_ids: ["1", "2"] }}
----------------------------------------------------------------------
### Status:
enqueued []
succeeded [0,1,]
----------------------------------------------------------------------
### Kind:
"documentAdditionOrUpdate" [0,]
"documentDeletion" [1,]
----------------------------------------------------------------------
### Index Tasks:
doggos [0,1,]
----------------------------------------------------------------------
### Index Mapper:
["doggos"]
----------------------------------------------------------------------
### Canceled By:
----------------------------------------------------------------------
### Enqueued At:
[timestamp] [0,]
[timestamp] [1,]
----------------------------------------------------------------------
### Started At:
[timestamp] [0,1,]
----------------------------------------------------------------------
### Finished At:
[timestamp] [0,1,]
----------------------------------------------------------------------
### File Store:
----------------------------------------------------------------------

View File

@@ -0,0 +1,9 @@
---
source: index-scheduler/src/lib.rs
---
[
{
"id": 3,
"doggo": "bork"
}
]

View File

@@ -0,0 +1,37 @@
---
source: index-scheduler/src/lib.rs
---
### Autobatching Enabled = true
### Processing Tasks:
[]
----------------------------------------------------------------------
### All Tasks:
0 {uid: 0, status: enqueued, details: { received_documents: 3, indexed_documents: None }, kind: DocumentAdditionOrUpdate { index_uid: "doggos", primary_key: Some("id"), method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000000, documents_count: 3, allow_index_creation: true }}
----------------------------------------------------------------------
### Status:
enqueued [0,]
----------------------------------------------------------------------
### Kind:
"documentAdditionOrUpdate" [0,]
----------------------------------------------------------------------
### Index Tasks:
doggos [0,]
----------------------------------------------------------------------
### Index Mapper:
[]
----------------------------------------------------------------------
### Canceled By:
----------------------------------------------------------------------
### Enqueued At:
[timestamp] [0,]
----------------------------------------------------------------------
### Started At:
----------------------------------------------------------------------
### Finished At:
----------------------------------------------------------------------
### File Store:
00000000-0000-0000-0000-000000000000
----------------------------------------------------------------------

View File

@@ -0,0 +1,40 @@
---
source: index-scheduler/src/lib.rs
---
### Autobatching Enabled = true
### Processing Tasks:
[]
----------------------------------------------------------------------
### All Tasks:
0 {uid: 0, status: enqueued, details: { received_documents: 3, indexed_documents: None }, kind: DocumentAdditionOrUpdate { index_uid: "doggos", primary_key: Some("id"), method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000000, documents_count: 3, allow_index_creation: true }}
1 {uid: 1, status: enqueued, details: { received_document_ids: 2, deleted_documents: None }, kind: DocumentDeletion { index_uid: "doggos", documents_ids: ["1", "2"] }}
----------------------------------------------------------------------
### Status:
enqueued [0,1,]
----------------------------------------------------------------------
### Kind:
"documentAdditionOrUpdate" [0,]
"documentDeletion" [1,]
----------------------------------------------------------------------
### Index Tasks:
doggos [0,1,]
----------------------------------------------------------------------
### Index Mapper:
[]
----------------------------------------------------------------------
### Canceled By:
----------------------------------------------------------------------
### Enqueued At:
[timestamp] [0,]
[timestamp] [1,]
----------------------------------------------------------------------
### Started At:
----------------------------------------------------------------------
### Finished At:
----------------------------------------------------------------------
### File Store:
00000000-0000-0000-0000-000000000000
----------------------------------------------------------------------

View File

@@ -0,0 +1,36 @@
---
source: index-scheduler/src/lib.rs
---
### Autobatching Enabled = true
### Processing Tasks:
[]
----------------------------------------------------------------------
### All Tasks:
0 {uid: 0, status: enqueued, details: { received_document_ids: 2, deleted_documents: None }, kind: DocumentDeletion { index_uid: "doggos", documents_ids: ["1", "2"] }}
----------------------------------------------------------------------
### Status:
enqueued [0,]
----------------------------------------------------------------------
### Kind:
"documentDeletion" [0,]
----------------------------------------------------------------------
### Index Tasks:
doggos [0,]
----------------------------------------------------------------------
### Index Mapper:
[]
----------------------------------------------------------------------
### Canceled By:
----------------------------------------------------------------------
### Enqueued At:
[timestamp] [0,]
----------------------------------------------------------------------
### Started At:
----------------------------------------------------------------------
### Finished At:
----------------------------------------------------------------------
### File Store:
----------------------------------------------------------------------

View File

@@ -0,0 +1,40 @@
---
source: index-scheduler/src/lib.rs
---
### Autobatching Enabled = true
### Processing Tasks:
[]
----------------------------------------------------------------------
### All Tasks:
0 {uid: 0, status: enqueued, details: { received_document_ids: 2, deleted_documents: None }, kind: DocumentDeletion { index_uid: "doggos", documents_ids: ["1", "2"] }}
1 {uid: 1, status: enqueued, details: { received_documents: 3, indexed_documents: None }, kind: DocumentAdditionOrUpdate { index_uid: "doggos", primary_key: Some("id"), method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000000, documents_count: 3, allow_index_creation: true }}
----------------------------------------------------------------------
### Status:
enqueued [0,1,]
----------------------------------------------------------------------
### Kind:
"documentAdditionOrUpdate" [1,]
"documentDeletion" [0,]
----------------------------------------------------------------------
### Index Tasks:
doggos [0,1,]
----------------------------------------------------------------------
### Index Mapper:
[]
----------------------------------------------------------------------
### Canceled By:
----------------------------------------------------------------------
### Enqueued At:
[timestamp] [0,]
[timestamp] [1,]
----------------------------------------------------------------------
### Started At:
----------------------------------------------------------------------
### Finished At:
----------------------------------------------------------------------
### File Store:
00000000-0000-0000-0000-000000000000
----------------------------------------------------------------------