From f68906f5dc3c62cf7774978ebfa87fae79fd13e9 Mon Sep 17 00:00:00 2001 From: Kerollmops Date: Thu, 29 Sep 2022 15:49:54 +0200 Subject: [PATCH] Merge both DocumentAddition/Update into one DocumentImport variant --- index-scheduler/src/autobatcher.rs | 226 ++++++++++++------------- index-scheduler/src/batch.rs | 187 ++++---------------- index-scheduler/src/index_scheduler.rs | 22 +-- index-scheduler/src/task.rs | 29 ++-- 4 files changed, 167 insertions(+), 297 deletions(-) diff --git a/index-scheduler/src/autobatcher.rs b/index-scheduler/src/autobatcher.rs index 2d537c2f6..b804e9caa 100644 --- a/index-scheduler/src/autobatcher.rs +++ b/index-scheduler/src/autobatcher.rs @@ -1,3 +1,4 @@ +use milli::update::IndexDocumentsMethod::{self, ReplaceDocuments, UpdateDocuments}; use std::ops::ControlFlow; use crate::{task::Kind, TaskId}; @@ -7,11 +8,9 @@ pub enum BatchKind { DocumentClear { ids: Vec, }, - DocumentAddition { - addition_ids: Vec, - }, - DocumentUpdate { - update_ids: Vec, + DocumentImport { + method: IndexDocumentsMethod, + import_ids: Vec, }, DocumentDeletion { deletion_ids: Vec, @@ -20,13 +19,10 @@ pub enum BatchKind { other: Vec, settings_ids: Vec, }, - SettingsAndDocumentAddition { + SettingsAndDocumentImport { settings_ids: Vec, - addition_ids: Vec, - }, - SettingsAndDocumentUpdate { - settings_ids: Vec, - update_ids: Vec, + method: IndexDocumentsMethod, + import_ids: Vec, }, Settings { settings_ids: Vec, @@ -59,14 +55,16 @@ impl BatchKind { Kind::IndexSwap => (BatchKind::IndexSwap { id: task_id }, true), Kind::DocumentClear => (BatchKind::DocumentClear { ids: vec![task_id] }, false), Kind::DocumentAddition => ( - BatchKind::DocumentAddition { - addition_ids: vec![task_id], + BatchKind::DocumentImport { + method: ReplaceDocuments, + import_ids: vec![task_id], }, false, ), Kind::DocumentUpdate => ( - BatchKind::DocumentUpdate { - update_ids: vec![task_id], + BatchKind::DocumentImport { + method: UpdateDocuments, + import_ids: vec![task_id], }, false, ), @@ -98,11 +96,9 @@ impl BatchKind { // The index deletion can batch with everything but must stop after ( BatchKind::DocumentClear { mut ids } - | BatchKind::DocumentAddition { - addition_ids: mut ids, - } - | BatchKind::DocumentUpdate { - update_ids: mut ids, + | BatchKind::DocumentImport { + method: _, + import_ids: mut ids, } | BatchKind::DocumentDeletion { deletion_ids: mut ids, @@ -120,12 +116,9 @@ impl BatchKind { settings_ids: mut ids, mut other, } - | BatchKind::SettingsAndDocumentAddition { - addition_ids: mut ids, - settings_ids: mut other, - } - | BatchKind::SettingsAndDocumentUpdate { - update_ids: mut ids, + | BatchKind::SettingsAndDocumentImport { + import_ids: mut ids, + method: _, settings_ids: mut other, }, Kind::IndexDeletion, @@ -147,11 +140,9 @@ impl BatchKind { Kind::DocumentAddition | Kind::DocumentUpdate | Kind::Settings, ) => ControlFlow::Break(this), ( - BatchKind::DocumentAddition { - addition_ids: mut ids, - } - | BatchKind::DocumentUpdate { - update_ids: mut ids, + BatchKind::DocumentImport { + method: _, + import_ids: mut ids, }, Kind::DocumentClear, ) => { @@ -160,30 +151,43 @@ impl BatchKind { } // we can autobatch the same kind of document additions / updates - (BatchKind::DocumentAddition { mut addition_ids }, Kind::DocumentAddition) => { - addition_ids.push(id); - ControlFlow::Continue(BatchKind::DocumentAddition { addition_ids }) + ( + BatchKind::DocumentImport { + method: ReplaceDocuments, + mut import_ids, + }, + Kind::DocumentAddition, + ) => { + import_ids.push(id); + ControlFlow::Continue(BatchKind::DocumentImport { + method: ReplaceDocuments, + import_ids, + }) } - (BatchKind::DocumentUpdate { mut update_ids }, Kind::DocumentUpdate) => { - update_ids.push(id); - ControlFlow::Continue(BatchKind::DocumentUpdate { update_ids }) + ( + BatchKind::DocumentImport { + method: UpdateDocuments, + mut import_ids, + }, + Kind::DocumentUpdate, + ) => { + import_ids.push(id); + ControlFlow::Continue(BatchKind::DocumentImport { + method: UpdateDocuments, + import_ids, + }) } // but we can't autobatch documents if it's not the same kind // this match branch MUST be AFTER the previous one ( - this @ BatchKind::DocumentAddition { .. } | this @ BatchKind::DocumentUpdate { .. }, + this @ BatchKind::DocumentImport { .. }, Kind::DocumentDeletion | Kind::DocumentAddition | Kind::DocumentUpdate, ) => ControlFlow::Break(this), - (BatchKind::DocumentAddition { addition_ids }, Kind::Settings) => { - ControlFlow::Continue(BatchKind::SettingsAndDocumentAddition { + (BatchKind::DocumentImport { method, import_ids }, Kind::Settings) => { + ControlFlow::Continue(BatchKind::SettingsAndDocumentImport { settings_ids: vec![id], - addition_ids, - }) - } - (BatchKind::DocumentUpdate { update_ids }, Kind::Settings) => { - ControlFlow::Continue(BatchKind::SettingsAndDocumentUpdate { - settings_ids: vec![id], - update_ids, + method, + import_ids, }) } @@ -260,18 +264,14 @@ impl BatchKind { }) } ( - BatchKind::SettingsAndDocumentAddition { + BatchKind::SettingsAndDocumentImport { settings_ids, - addition_ids: mut other, - } - | BatchKind::SettingsAndDocumentUpdate { - settings_ids, - update_ids: mut other, + method: _, + import_ids: mut other, }, Kind::DocumentClear, ) => { other.push(id); - ControlFlow::Continue(BatchKind::ClearAndSettings { settings_ids, other, @@ -280,62 +280,54 @@ impl BatchKind { // we can batch the settings with a kind of document operation with the same kind of document operation ( - BatchKind::SettingsAndDocumentAddition { - mut addition_ids, + BatchKind::SettingsAndDocumentImport { settings_ids, + method: ReplaceDocuments, + mut import_ids, }, Kind::DocumentAddition, ) => { - addition_ids.push(id); - ControlFlow::Continue(BatchKind::SettingsAndDocumentAddition { - addition_ids, + import_ids.push(id); + ControlFlow::Continue(BatchKind::SettingsAndDocumentImport { settings_ids, + method: ReplaceDocuments, + import_ids, }) } ( - BatchKind::SettingsAndDocumentUpdate { - mut update_ids, + BatchKind::SettingsAndDocumentImport { settings_ids, + method: UpdateDocuments, + mut import_ids, }, Kind::DocumentUpdate, ) => { - update_ids.push(id); - ControlFlow::Continue(BatchKind::SettingsAndDocumentUpdate { - update_ids, + import_ids.push(id); + ControlFlow::Continue(BatchKind::SettingsAndDocumentImport { settings_ids, + method: UpdateDocuments, + import_ids, }) } // But we can't batch a settings and a doc op with another doc op // this MUST be AFTER the two previous branch ( - this @ BatchKind::SettingsAndDocumentAddition { .. } - | this @ BatchKind::SettingsAndDocumentUpdate { .. }, + this @ BatchKind::SettingsAndDocumentImport { .. }, Kind::DocumentDeletion | Kind::DocumentAddition | Kind::DocumentUpdate, ) => ControlFlow::Break(this), ( - BatchKind::SettingsAndDocumentAddition { + BatchKind::SettingsAndDocumentImport { mut settings_ids, - addition_ids, + method, + import_ids, }, Kind::Settings, ) => { settings_ids.push(id); - ControlFlow::Continue(BatchKind::SettingsAndDocumentAddition { + ControlFlow::Continue(BatchKind::SettingsAndDocumentImport { settings_ids, - addition_ids, - }) - } - ( - BatchKind::SettingsAndDocumentUpdate { - mut settings_ids, - update_ids, - }, - Kind::Settings, - ) => { - settings_ids.push(id); - ControlFlow::Continue(BatchKind::SettingsAndDocumentUpdate { - settings_ids, - update_ids, + method, + import_ids, }) } (_, Kind::CancelTask | Kind::DumpExport | Kind::Snapshot) => unreachable!(), @@ -391,11 +383,11 @@ mod tests { #[test] fn autobatch_simple_operation_together() { // we can autobatch one or multiple DocumentAddition together - assert_smol_debug_snapshot!(autobatch_from([DocumentAddition]), @"Some(DocumentAddition { addition_ids: [0] })"); - assert_smol_debug_snapshot!(autobatch_from([DocumentAddition, DocumentAddition, DocumentAddition]), @"Some(DocumentAddition { addition_ids: [0, 1, 2] })"); + assert_smol_debug_snapshot!(autobatch_from([DocumentAddition]), @"Some(DocumentImport { method: ReplaceDocuments, import_ids: [0] })"); + assert_smol_debug_snapshot!(autobatch_from([DocumentAddition, DocumentAddition, DocumentAddition]), @"Some(DocumentImport { method: ReplaceDocuments, import_ids: [0, 1, 2] })"); // we can autobatch one or multiple DocumentUpdate together - assert_smol_debug_snapshot!(autobatch_from([DocumentUpdate]), @"Some(DocumentUpdate { update_ids: [0] })"); - assert_smol_debug_snapshot!(autobatch_from([DocumentUpdate, DocumentUpdate, DocumentUpdate]), @"Some(DocumentUpdate { update_ids: [0, 1, 2] })"); + assert_smol_debug_snapshot!(autobatch_from([DocumentUpdate]), @"Some(DocumentImport { method: UpdateDocuments, import_ids: [0] })"); + assert_smol_debug_snapshot!(autobatch_from([DocumentUpdate, DocumentUpdate, DocumentUpdate]), @"Some(DocumentImport { method: UpdateDocuments, import_ids: [0, 1, 2] })"); // we can autobatch one or multiple DocumentDeletion together assert_smol_debug_snapshot!(autobatch_from([DocumentDeletion]), @"Some(DocumentDeletion { deletion_ids: [0] })"); assert_smol_debug_snapshot!(autobatch_from([DocumentDeletion, DocumentDeletion, DocumentDeletion]), @"Some(DocumentDeletion { deletion_ids: [0, 1, 2] })"); @@ -407,57 +399,57 @@ mod tests { #[test] fn simple_document_operation_dont_autobatch_with_other() { // addition, updates and deletion can't batch together - assert_smol_debug_snapshot!(autobatch_from([DocumentAddition, DocumentUpdate]), @"Some(DocumentAddition { addition_ids: [0] })"); - assert_smol_debug_snapshot!(autobatch_from([DocumentAddition, DocumentDeletion]), @"Some(DocumentAddition { addition_ids: [0] })"); - assert_smol_debug_snapshot!(autobatch_from([DocumentUpdate, DocumentAddition]), @"Some(DocumentUpdate { update_ids: [0] })"); - assert_smol_debug_snapshot!(autobatch_from([DocumentUpdate, DocumentDeletion]), @"Some(DocumentUpdate { update_ids: [0] })"); + assert_smol_debug_snapshot!(autobatch_from([DocumentAddition, DocumentUpdate]), @"Some(DocumentImport { method: ReplaceDocuments, import_ids: [0] })"); + assert_smol_debug_snapshot!(autobatch_from([DocumentAddition, DocumentDeletion]), @"Some(DocumentImport { method: ReplaceDocuments, import_ids: [0] })"); + assert_smol_debug_snapshot!(autobatch_from([DocumentUpdate, DocumentAddition]), @"Some(DocumentImport { method: UpdateDocuments, import_ids: [0] })"); + assert_smol_debug_snapshot!(autobatch_from([DocumentUpdate, DocumentDeletion]), @"Some(DocumentImport { method: UpdateDocuments, import_ids: [0] })"); assert_smol_debug_snapshot!(autobatch_from([DocumentDeletion, DocumentAddition]), @"Some(DocumentDeletion { deletion_ids: [0] })"); assert_smol_debug_snapshot!(autobatch_from([DocumentDeletion, DocumentUpdate]), @"Some(DocumentDeletion { deletion_ids: [0] })"); - assert_smol_debug_snapshot!(autobatch_from([DocumentAddition, IndexCreation]), @"Some(DocumentAddition { addition_ids: [0] })"); - assert_smol_debug_snapshot!(autobatch_from([DocumentUpdate, IndexCreation]), @"Some(DocumentUpdate { update_ids: [0] })"); + assert_smol_debug_snapshot!(autobatch_from([DocumentAddition, IndexCreation]), @"Some(DocumentImport { method: ReplaceDocuments, import_ids: [0] })"); + assert_smol_debug_snapshot!(autobatch_from([DocumentUpdate, IndexCreation]), @"Some(DocumentImport { method: UpdateDocuments, import_ids: [0] })"); assert_smol_debug_snapshot!(autobatch_from([DocumentDeletion, IndexCreation]), @"Some(DocumentDeletion { deletion_ids: [0] })"); - assert_smol_debug_snapshot!(autobatch_from([DocumentAddition, IndexUpdate]), @"Some(DocumentAddition { addition_ids: [0] })"); - assert_smol_debug_snapshot!(autobatch_from([DocumentUpdate, IndexUpdate]), @"Some(DocumentUpdate { update_ids: [0] })"); + assert_smol_debug_snapshot!(autobatch_from([DocumentAddition, IndexUpdate]), @"Some(DocumentImport { method: ReplaceDocuments, import_ids: [0] })"); + assert_smol_debug_snapshot!(autobatch_from([DocumentUpdate, IndexUpdate]), @"Some(DocumentImport { method: UpdateDocuments, import_ids: [0] })"); assert_smol_debug_snapshot!(autobatch_from([DocumentDeletion, IndexUpdate]), @"Some(DocumentDeletion { deletion_ids: [0] })"); - assert_smol_debug_snapshot!(autobatch_from([DocumentAddition, IndexRename]), @"Some(DocumentAddition { addition_ids: [0] })"); - assert_smol_debug_snapshot!(autobatch_from([DocumentUpdate, IndexRename]), @"Some(DocumentUpdate { update_ids: [0] })"); + assert_smol_debug_snapshot!(autobatch_from([DocumentAddition, IndexRename]), @"Some(DocumentImport { method: ReplaceDocuments, import_ids: [0] })"); + assert_smol_debug_snapshot!(autobatch_from([DocumentUpdate, IndexRename]), @"Some(DocumentImport { method: UpdateDocuments, import_ids: [0] })"); assert_smol_debug_snapshot!(autobatch_from([DocumentDeletion, IndexRename]), @"Some(DocumentDeletion { deletion_ids: [0] })"); - assert_smol_debug_snapshot!(autobatch_from([DocumentAddition, IndexSwap]), @"Some(DocumentAddition { addition_ids: [0] })"); - assert_smol_debug_snapshot!(autobatch_from([DocumentUpdate, IndexSwap]), @"Some(DocumentUpdate { update_ids: [0] })"); + assert_smol_debug_snapshot!(autobatch_from([DocumentAddition, IndexSwap]), @"Some(DocumentImport { method: ReplaceDocuments, import_ids: [0] })"); + assert_smol_debug_snapshot!(autobatch_from([DocumentUpdate, IndexSwap]), @"Some(DocumentImport { method: UpdateDocuments, import_ids: [0] })"); assert_smol_debug_snapshot!(autobatch_from([DocumentDeletion, IndexSwap]), @"Some(DocumentDeletion { deletion_ids: [0] })"); } #[test] fn document_addition_batch_with_settings() { // simple case - assert_smol_debug_snapshot!(autobatch_from([DocumentAddition, Settings]), @"Some(SettingsAndDocumentAddition { settings_ids: [1], addition_ids: [0] })"); - assert_smol_debug_snapshot!(autobatch_from([DocumentUpdate, Settings]), @"Some(SettingsAndDocumentUpdate { settings_ids: [1], update_ids: [0] })"); + assert_smol_debug_snapshot!(autobatch_from([DocumentAddition, Settings]), @"Some(SettingsAndDocumentImport { settings_ids: [1], method: ReplaceDocuments, import_ids: [0] })"); + assert_smol_debug_snapshot!(autobatch_from([DocumentUpdate, Settings]), @"Some(SettingsAndDocumentImport { settings_ids: [1], method: UpdateDocuments, import_ids: [0] })"); // multiple settings and doc addition - assert_smol_debug_snapshot!(autobatch_from([DocumentAddition, DocumentAddition, Settings, Settings]), @"Some(SettingsAndDocumentAddition { settings_ids: [2, 3], addition_ids: [0, 1] })"); - assert_smol_debug_snapshot!(autobatch_from([DocumentAddition, DocumentAddition, Settings, Settings]), @"Some(SettingsAndDocumentAddition { settings_ids: [2, 3], addition_ids: [0, 1] })"); + assert_smol_debug_snapshot!(autobatch_from([DocumentAddition, DocumentAddition, Settings, Settings]), @"Some(SettingsAndDocumentImport { settings_ids: [2, 3], method: ReplaceDocuments, import_ids: [0, 1] })"); + assert_smol_debug_snapshot!(autobatch_from([DocumentAddition, DocumentAddition, Settings, Settings]), @"Some(SettingsAndDocumentImport { settings_ids: [2, 3], method: ReplaceDocuments, import_ids: [0, 1] })"); // addition and setting unordered - assert_smol_debug_snapshot!(autobatch_from([DocumentAddition, Settings, DocumentAddition, Settings]), @"Some(SettingsAndDocumentAddition { settings_ids: [1, 3], addition_ids: [0, 2] })"); - assert_smol_debug_snapshot!(autobatch_from([DocumentUpdate, Settings, DocumentUpdate, Settings]), @"Some(SettingsAndDocumentUpdate { settings_ids: [1, 3], update_ids: [0, 2] })"); + assert_smol_debug_snapshot!(autobatch_from([DocumentAddition, Settings, DocumentAddition, Settings]), @"Some(SettingsAndDocumentImport { settings_ids: [1, 3], method: ReplaceDocuments, import_ids: [0, 2] })"); + assert_smol_debug_snapshot!(autobatch_from([DocumentUpdate, Settings, DocumentUpdate, Settings]), @"Some(SettingsAndDocumentImport { settings_ids: [1, 3], method: UpdateDocuments, import_ids: [0, 2] })"); // We ensure this kind of batch doesn't batch with forbidden operations - assert_smol_debug_snapshot!(autobatch_from([DocumentAddition, Settings, DocumentUpdate]), @"Some(SettingsAndDocumentAddition { settings_ids: [1], addition_ids: [0] })"); - assert_smol_debug_snapshot!(autobatch_from([DocumentUpdate, Settings, DocumentAddition]), @"Some(SettingsAndDocumentUpdate { settings_ids: [1], update_ids: [0] })"); - assert_smol_debug_snapshot!(autobatch_from([DocumentAddition, Settings, DocumentDeletion]), @"Some(SettingsAndDocumentAddition { settings_ids: [1], addition_ids: [0] })"); - assert_smol_debug_snapshot!(autobatch_from([DocumentUpdate, Settings, DocumentDeletion]), @"Some(SettingsAndDocumentUpdate { settings_ids: [1], update_ids: [0] })"); - assert_smol_debug_snapshot!(autobatch_from([DocumentAddition, Settings, IndexCreation]), @"Some(SettingsAndDocumentAddition { settings_ids: [1], addition_ids: [0] })"); - assert_smol_debug_snapshot!(autobatch_from([DocumentUpdate, Settings, IndexCreation]), @"Some(SettingsAndDocumentUpdate { settings_ids: [1], update_ids: [0] })"); - assert_smol_debug_snapshot!(autobatch_from([DocumentAddition, Settings, IndexUpdate]), @"Some(SettingsAndDocumentAddition { settings_ids: [1], addition_ids: [0] })"); - assert_smol_debug_snapshot!(autobatch_from([DocumentUpdate, Settings, IndexUpdate]), @"Some(SettingsAndDocumentUpdate { settings_ids: [1], update_ids: [0] })"); - assert_smol_debug_snapshot!(autobatch_from([DocumentAddition, Settings, IndexRename]), @"Some(SettingsAndDocumentAddition { settings_ids: [1], addition_ids: [0] })"); - assert_smol_debug_snapshot!(autobatch_from([DocumentUpdate, Settings, IndexRename]), @"Some(SettingsAndDocumentUpdate { settings_ids: [1], update_ids: [0] })"); - assert_smol_debug_snapshot!(autobatch_from([DocumentAddition, Settings, IndexSwap]), @"Some(SettingsAndDocumentAddition { settings_ids: [1], addition_ids: [0] })"); - assert_smol_debug_snapshot!(autobatch_from([DocumentUpdate, Settings, IndexSwap]), @"Some(SettingsAndDocumentUpdate { settings_ids: [1], update_ids: [0] })"); + assert_smol_debug_snapshot!(autobatch_from([DocumentAddition, Settings, DocumentUpdate]), @"Some(SettingsAndDocumentImport { settings_ids: [1], method: ReplaceDocuments, import_ids: [0] })"); + assert_smol_debug_snapshot!(autobatch_from([DocumentUpdate, Settings, DocumentAddition]), @"Some(SettingsAndDocumentImport { settings_ids: [1], method: UpdateDocuments, import_ids: [0] })"); + assert_smol_debug_snapshot!(autobatch_from([DocumentAddition, Settings, DocumentDeletion]), @"Some(SettingsAndDocumentImport { settings_ids: [1], method: ReplaceDocuments, import_ids: [0] })"); + assert_smol_debug_snapshot!(autobatch_from([DocumentUpdate, Settings, DocumentDeletion]), @"Some(SettingsAndDocumentImport { settings_ids: [1], method: UpdateDocuments, import_ids: [0] })"); + assert_smol_debug_snapshot!(autobatch_from([DocumentAddition, Settings, IndexCreation]), @"Some(SettingsAndDocumentImport { settings_ids: [1], method: ReplaceDocuments, import_ids: [0] })"); + assert_smol_debug_snapshot!(autobatch_from([DocumentUpdate, Settings, IndexCreation]), @"Some(SettingsAndDocumentImport { settings_ids: [1], method: UpdateDocuments, import_ids: [0] })"); + assert_smol_debug_snapshot!(autobatch_from([DocumentAddition, Settings, IndexUpdate]), @"Some(SettingsAndDocumentImport { settings_ids: [1], method: ReplaceDocuments, import_ids: [0] })"); + assert_smol_debug_snapshot!(autobatch_from([DocumentUpdate, Settings, IndexUpdate]), @"Some(SettingsAndDocumentImport { settings_ids: [1], method: UpdateDocuments, import_ids: [0] })"); + assert_smol_debug_snapshot!(autobatch_from([DocumentAddition, Settings, IndexRename]), @"Some(SettingsAndDocumentImport { settings_ids: [1], method: ReplaceDocuments, import_ids: [0] })"); + assert_smol_debug_snapshot!(autobatch_from([DocumentUpdate, Settings, IndexRename]), @"Some(SettingsAndDocumentImport { settings_ids: [1], method: UpdateDocuments, import_ids: [0] })"); + assert_smol_debug_snapshot!(autobatch_from([DocumentAddition, Settings, IndexSwap]), @"Some(SettingsAndDocumentImport { settings_ids: [1], method: ReplaceDocuments, import_ids: [0] })"); + assert_smol_debug_snapshot!(autobatch_from([DocumentUpdate, Settings, IndexSwap]), @"Some(SettingsAndDocumentImport { settings_ids: [1], method: UpdateDocuments, import_ids: [0] })"); } #[test] diff --git a/index-scheduler/src/batch.rs b/index-scheduler/src/batch.rs index 031289fc5..3d4567c1f 100644 --- a/index-scheduler/src/batch.rs +++ b/index-scheduler/src/batch.rs @@ -12,15 +12,10 @@ pub(crate) enum Batch { Cancel(Task), Snapshot(Vec), Dump(Vec), - DocumentAddition { - index_uid: String, - primary_key: Option, - content_files: Vec, - tasks: Vec, - }, - DocumentUpdate { + DocumentImport { index_uid: String, primary_key: Option, + method: IndexDocumentsMethod, content_files: Vec, tasks: Vec, }, @@ -47,23 +42,13 @@ pub(crate) enum Batch { settings: Vec<(bool, Settings)>, settings_tasks: Vec, }, - SettingsAndDocumentAddition { + SettingsAndDocumentImport { index_uid: String, primary_key: Option, + method: IndexDocumentsMethod, content_files: Vec, - document_addition_tasks: Vec, - - // TODO what's that boolean, does it mean that it removes things or what? - settings: Vec<(bool, Settings)>, - settings_tasks: Vec, - }, - SettingsAndDocumentUpdate { - index_uid: String, - - primary_key: Option, - content_files: Vec, - document_update_tasks: Vec, + document_import_tasks: Vec, // TODO what's that boolean, does it mean that it removes things or what? settings: Vec<(bool, Settings)>, @@ -93,14 +78,13 @@ impl Batch { | Batch::IndexUpdate { task, .. } => vec![task.uid], Batch::Snapshot(tasks) | Batch::Dump(tasks) - | Batch::DocumentAddition { tasks, .. } - | Batch::DocumentUpdate { tasks, .. } + | Batch::DocumentImport { tasks, .. } | Batch::DocumentDeletion { tasks, .. } | Batch::Settings { tasks, .. } | Batch::DocumentClear { tasks, .. } | Batch::IndexDeletion { tasks, .. } => tasks.iter().map(|task| task.uid).collect(), - Batch::SettingsAndDocumentAddition { - document_addition_tasks: tasks, + Batch::SettingsAndDocumentImport { + document_import_tasks: tasks, settings_tasks: other, .. } @@ -108,11 +92,6 @@ impl Batch { cleared_tasks: tasks, settings_tasks: other, .. - } - | Batch::SettingsAndDocumentUpdate { - document_update_tasks: tasks, - settings_tasks: other, - .. } => tasks.iter().chain(other).map(|task| task.uid).collect(), } } @@ -130,44 +109,24 @@ impl IndexScheduler { tasks: self.get_existing_tasks(rtxn, ids)?, index_uid, })), - BatchKind::DocumentAddition { addition_ids } => { - let tasks = self.get_existing_tasks(rtxn, addition_ids)?; + BatchKind::DocumentImport { method, import_ids } => { + let tasks = self.get_existing_tasks(rtxn, import_ids)?; let primary_key = match &tasks[0].kind { - KindWithContent::DocumentAddition { primary_key, .. } => primary_key.clone(), + KindWithContent::DocumentImport { primary_key, .. } => primary_key.clone(), _ => unreachable!(), }; let content_files = tasks .iter() .map(|task| match task.kind { - KindWithContent::DocumentAddition { content_file, .. } => content_file, + KindWithContent::DocumentImport { content_file, .. } => content_file, _ => unreachable!(), }) .collect(); - Ok(Some(Batch::DocumentAddition { - index_uid, - primary_key, - content_files, - tasks, - })) - } - BatchKind::DocumentUpdate { update_ids } => { - let tasks = self.get_existing_tasks(rtxn, update_ids)?; - let primary_key = match &tasks[0].kind { - KindWithContent::DocumentUpdate { primary_key, .. } => primary_key.clone(), - _ => unreachable!(), - }; - let content_files = tasks - .iter() - .map(|task| match task.kind { - KindWithContent::DocumentUpdate { content_file, .. } => content_file, - _ => unreachable!(), - }) - .collect(); - - Ok(Some(Batch::DocumentUpdate { + Ok(Some(Batch::DocumentImport { index_uid, primary_key, + method, content_files, tasks, })) @@ -246,51 +205,10 @@ impl IndexScheduler { settings_tasks, })) } - BatchKind::SettingsAndDocumentAddition { - addition_ids, - settings_ids, - } => { - let (index_uid, settings, settings_tasks) = match self - .create_next_batch_index(rtxn, index_uid, BatchKind::Settings { settings_ids })? - .unwrap() - { - Batch::Settings { - index_uid, - settings, - tasks, - } => (index_uid, settings, tasks), - _ => unreachable!(), - }; - - let (index_uid, primary_key, content_files, document_addition_tasks) = match self - .create_next_batch_index( - rtxn, - index_uid, - BatchKind::DocumentAddition { addition_ids }, - )? - .unwrap() - { - Batch::DocumentAddition { - index_uid, - primary_key, - content_files, - tasks, - } => (index_uid, primary_key, content_files, tasks), - _ => unreachable!(), - }; - - Ok(Some(Batch::SettingsAndDocumentAddition { - index_uid, - primary_key, - content_files, - document_addition_tasks, - settings, - settings_tasks, - })) - } - BatchKind::SettingsAndDocumentUpdate { - update_ids, + BatchKind::SettingsAndDocumentImport { settings_ids, + method, + import_ids, } => { let settings = self.create_next_batch_index( rtxn, @@ -298,18 +216,18 @@ impl IndexScheduler { BatchKind::Settings { settings_ids }, )?; - let document_update = self.create_next_batch_index( + let document_import = self.create_next_batch_index( rtxn, index_uid.clone(), - BatchKind::DocumentUpdate { update_ids }, + BatchKind::DocumentImport { method, import_ids }, )?; - match (document_update, settings) { + match (document_import, settings) { ( - Some(Batch::DocumentUpdate { + Some(Batch::DocumentImport { primary_key, content_files, - tasks: document_update_tasks, + tasks: document_import_tasks, .. }), Some(Batch::Settings { @@ -317,11 +235,12 @@ impl IndexScheduler { tasks: settings_tasks, .. }), - ) => Ok(Some(Batch::SettingsAndDocumentUpdate { + ) => Ok(Some(Batch::SettingsAndDocumentImport { index_uid, primary_key, + method, content_files, - document_update_tasks, + document_import_tasks, settings, settings_tasks, })), @@ -453,10 +372,10 @@ impl IndexScheduler { Ok(tasks) } - // TODO we should merge both document import with a method field - Batch::DocumentAddition { + Batch::DocumentImport { index_uid, primary_key, + method, content_files, mut tasks, } => { @@ -467,7 +386,7 @@ impl IndexScheduler { wtxn.commit()?; let ret = index.update_documents( - IndexDocumentsMethod::ReplaceDocuments, + method, primary_key, self.file_store.clone(), content_files, @@ -490,53 +409,17 @@ impl IndexScheduler { Ok(tasks) } - Batch::SettingsAndDocumentAddition { + Batch::SettingsAndDocumentImport { index_uid, primary_key, + method, content_files, - document_addition_tasks, + document_import_tasks, settings: _, settings_tasks: _, } => { todo!(); } - // TODO we should merge both document import with a method field - Batch::DocumentUpdate { - index_uid, - primary_key, - content_files, - mut tasks, - } => { - // we NEED a write transaction for the index creation. - // To avoid blocking the whole process we're going to commit asap. - let mut wtxn = self.env.write_txn()?; - let index = self.index_mapper.create_index(&mut wtxn, &index_uid)?; - wtxn.commit()?; - - let ret = index.update_documents( - IndexDocumentsMethod::UpdateDocuments, - primary_key, - self.file_store.clone(), - content_files, - )?; - - for (task, ret) in tasks.iter_mut().zip(ret) { - match ret { - Ok(DocumentAdditionResult { - indexed_documents, - number_of_documents, - }) => { - task.details = Some(Details::DocumentAddition { - received_documents: number_of_documents, - indexed_documents, - }); - } - Err(error) => task.error = Some(error.into()), - } - } - - Ok(tasks) - } Batch::DocumentDeletion { index_uid, documents, @@ -628,14 +511,6 @@ impl IndexScheduler { tasks.append(&mut settings_tasks); Ok(tasks) } - Batch::SettingsAndDocumentUpdate { - index_uid, - primary_key, - content_files, - document_update_tasks, - settings, - settings_tasks, - } => todo!(), Batch::IndexCreation { index_uid, primary_key, diff --git a/index-scheduler/src/index_scheduler.rs b/index-scheduler/src/index_scheduler.rs index cc0d99791..308a00fa5 100644 --- a/index-scheduler/src/index_scheduler.rs +++ b/index-scheduler/src/index_scheduler.rs @@ -84,10 +84,7 @@ impl Query { } pub fn with_limit(self, limit: u32) -> Self { - Self { - limit, - ..self - } + Self { limit, ..self } } } @@ -435,6 +432,7 @@ impl IndexScheduler { mod tests { use big_s::S; use insta::*; + use milli::update::IndexDocumentsMethod::{self, ReplaceDocuments, UpdateDocuments}; use tempfile::TempDir; use uuid::Uuid; @@ -501,24 +499,27 @@ mod tests { index_uid: S("catto"), primary_key: Some(S("mouse")), }, - KindWithContent::DocumentAddition { + KindWithContent::DocumentImport { index_uid: S("catto"), primary_key: None, + method: ReplaceDocuments, content_file: Uuid::new_v4(), documents_count: 12, allow_index_creation: true, }, KindWithContent::CancelTask { tasks: vec![0, 1] }, - KindWithContent::DocumentAddition { + KindWithContent::DocumentImport { index_uid: S("catto"), primary_key: None, + method: ReplaceDocuments, content_file: Uuid::new_v4(), documents_count: 50, allow_index_creation: true, }, - KindWithContent::DocumentAddition { + KindWithContent::DocumentImport { index_uid: S("doggo"), primary_key: Some(S("bone")), + method: ReplaceDocuments, content_file: Uuid::new_v4(), documents_count: 5000, allow_index_creation: true, @@ -603,9 +604,10 @@ mod tests { let documents_count = document_formats::read_json(content.as_bytes(), file.as_file_mut()).unwrap(); index_scheduler - .register(KindWithContent::DocumentAddition { + .register(KindWithContent::DocumentImport { index_uid: S("doggos"), primary_key: Some(S("id")), + method: ReplaceDocuments, content_file: uuid, documents_count, allow_index_creation: true, @@ -633,7 +635,7 @@ mod tests { // Once the task has started being batched it should be marked as processing let task = index_scheduler.get_tasks(Query::default()).unwrap(); - assert_json_snapshot!(task, + assert_json_snapshot!(task, { "[].enqueuedAt" => "date", "[].startedAt" => "date", "[].finishedAt" => "date", "[].duration" => "duration" } ,@r###" [ @@ -650,7 +652,7 @@ mod tests { handle.wait_till(Breakpoint::AfterProcessing); let task = index_scheduler.get_tasks(Query::default()).unwrap(); - assert_json_snapshot!(task, + assert_json_snapshot!(task, { "[].enqueuedAt" => "date", "[].startedAt" => "date", "[].finishedAt" => "date", "[].duration" => "duration" } ,@r###" [ diff --git a/index-scheduler/src/task.rs b/index-scheduler/src/task.rs index 894a214ac..6552c2ce0 100644 --- a/index-scheduler/src/task.rs +++ b/index-scheduler/src/task.rs @@ -1,6 +1,7 @@ use anyhow::Result; use index::{Settings, Unchecked}; use meilisearch_types::error::ResponseError; +use milli::update::IndexDocumentsMethod; use serde::{Deserialize, Serialize, Serializer}; use std::{fmt::Write, path::PathBuf, str::FromStr}; @@ -125,16 +126,10 @@ impl FromStr for Status { #[derive(Debug, PartialEq, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub enum KindWithContent { - DocumentAddition { - index_uid: String, - primary_key: Option, - content_file: Uuid, - documents_count: usize, - allow_index_creation: bool, - }, - DocumentUpdate { + DocumentImport { index_uid: String, primary_key: Option, + method: IndexDocumentsMethod, content_file: Uuid, documents_count: usize, allow_index_creation: bool, @@ -183,8 +178,15 @@ pub enum KindWithContent { impl KindWithContent { pub fn as_kind(&self) -> Kind { match self { - KindWithContent::DocumentAddition { .. } => Kind::DocumentAddition, - KindWithContent::DocumentUpdate { .. } => Kind::DocumentUpdate, + KindWithContent::DocumentImport { + method: IndexDocumentsMethod::ReplaceDocuments, + .. + } => Kind::DocumentAddition, + KindWithContent::DocumentImport { + method: IndexDocumentsMethod::UpdateDocuments, + .. + } => Kind::DocumentUpdate, + KindWithContent::DocumentImport { .. } => unreachable!(), KindWithContent::DocumentDeletion { .. } => Kind::DocumentDeletion, KindWithContent::DocumentClear { .. } => Kind::DocumentClear, KindWithContent::Settings { .. } => Kind::Settings, @@ -203,7 +205,7 @@ impl KindWithContent { use KindWithContent::*; match self { - DocumentAddition { .. } | DocumentUpdate { .. } => { + DocumentImport { .. } => { // TODO: TAMO: persist the file // content_file.persist(); Ok(()) @@ -226,7 +228,7 @@ impl KindWithContent { use KindWithContent::*; match self { - DocumentAddition { .. } | DocumentUpdate { .. } => { + DocumentImport { .. } => { // TODO: TAMO: delete the file // content_file.delete(); Ok(()) @@ -250,8 +252,7 @@ impl KindWithContent { match self { DumpExport { .. } | Snapshot | CancelTask { .. } => None, - DocumentAddition { index_uid, .. } - | DocumentUpdate { index_uid, .. } + DocumentImport { index_uid, .. } | DocumentDeletion { index_uid, .. } | DocumentClear { index_uid } | Settings { index_uid, .. }