From 00f13f45b6bf0263befc6a2209ce905b178ba20a Mon Sep 17 00:00:00 2001 From: Tamo Date: Tue, 13 Sep 2022 22:38:43 +0200 Subject: [PATCH] starts importing the real tasks --- index-scheduler/src/autobatcher.rs | 112 +++++++++++--------- index-scheduler/src/batch.rs | 26 +++-- index-scheduler/src/task.rs | 157 +++++++++++++++-------------- 3 files changed, 158 insertions(+), 137 deletions(-) diff --git a/index-scheduler/src/autobatcher.rs b/index-scheduler/src/autobatcher.rs index 3512b1b80..2a85792ac 100644 --- a/index-scheduler/src/autobatcher.rs +++ b/index-scheduler/src/autobatcher.rs @@ -3,7 +3,7 @@ use std::ops::ControlFlow; use crate::{task::Kind, TaskId}; pub enum BatchKind { - ClearAll { + DocumentClear { ids: Vec, }, DocumentAddition { @@ -12,7 +12,7 @@ pub enum BatchKind { DocumentDeletion { deletion_ids: Vec, }, - ClearAllAndSettings { + ClearAndSettings { other: Vec, settings_ids: Vec, }, @@ -23,16 +23,19 @@ pub enum BatchKind { Settings { settings_ids: Vec, }, - DeleteIndex { + IndexDeletion { ids: Vec, }, - CreateIndex { + IndexCreation { id: TaskId, }, - SwapIndex { + IndexUpdate { id: TaskId, }, - RenameIndex { + IndexRename { + id: TaskId, + }, + IndexSwap { id: TaskId, }, } @@ -41,12 +44,13 @@ impl BatchKind { /// return true if you must stop right there. pub fn new(task_id: TaskId, kind: Kind) -> (Self, bool) { match kind { - Kind::CreateIndex => (BatchKind::CreateIndex { id: task_id }, true), - Kind::DeleteIndex => (BatchKind::DeleteIndex { ids: vec![task_id] }, true), - Kind::RenameIndex => (BatchKind::RenameIndex { id: task_id }, true), - Kind::SwapIndex => (BatchKind::SwapIndex { id: task_id }, true), - Kind::ClearAllDocuments => (BatchKind::ClearAll { ids: vec![task_id] }, false), - Kind::DocumentAddition => ( + Kind::IndexCreation => (BatchKind::IndexCreation { id: task_id }, true), + Kind::IndexDeletion => (BatchKind::IndexDeletion { ids: vec![task_id] }, true), + Kind::IndexUpdate => (BatchKind::IndexUpdate { id: task_id }, true), + Kind::IndexRename => (BatchKind::IndexRename { id: task_id }, true), + Kind::IndexSwap => (BatchKind::IndexSwap { id: task_id }, true), + Kind::DocumentClear => (BatchKind::DocumentClear { ids: vec![task_id] }, false), + Kind::DocumentAdditionOrUpdate => ( BatchKind::DocumentAddition { addition_ids: vec![task_id], }, @@ -73,12 +77,13 @@ impl BatchKind { fn accumulate(mut self, id: TaskId, kind: Kind) -> ControlFlow { match (self, kind) { // We don't batch any of these operations - (this, Kind::CreateIndex | Kind::RenameIndex | Kind::SwapIndex) => { - ControlFlow::Break(this) - } + ( + this, + Kind::IndexCreation | Kind::IndexRename | Kind::IndexUpdate | Kind::IndexSwap, + ) => ControlFlow::Break(this), // The index deletion can batch with everything but must stop after ( - BatchKind::ClearAll { mut ids } + BatchKind::DocumentClear { mut ids } | BatchKind::DocumentAddition { addition_ids: mut ids, } @@ -88,13 +93,13 @@ impl BatchKind { | BatchKind::Settings { settings_ids: mut ids, }, - Kind::DeleteIndex, + Kind::IndexDeletion, ) => { ids.push(id); - ControlFlow::Break(BatchKind::DeleteIndex { ids }) + ControlFlow::Break(BatchKind::IndexDeletion { ids }) } ( - BatchKind::ClearAllAndSettings { + BatchKind::ClearAndSettings { settings_ids: mut ids, mut other, } @@ -102,26 +107,30 @@ impl BatchKind { addition_ids: mut ids, settings_ids: mut other, }, - Kind::DeleteIndex, + Kind::IndexDeletion, ) => { ids.push(id); ids.append(&mut other); - ControlFlow::Break(BatchKind::DeleteIndex { ids }) + ControlFlow::Break(BatchKind::IndexDeletion { ids }) } - (BatchKind::ClearAll { mut ids }, Kind::ClearAllDocuments | Kind::DocumentDeletion) => { + ( + BatchKind::DocumentClear { mut ids }, + Kind::DocumentClear | Kind::DocumentDeletion, + ) => { ids.push(id); - ControlFlow::Continue(BatchKind::ClearAll { ids }) + ControlFlow::Continue(BatchKind::DocumentClear { ids }) } - (this @ BatchKind::ClearAll { .. }, Kind::DocumentAddition | Kind::Settings) => { - ControlFlow::Break(this) - } - (BatchKind::DocumentAddition { mut addition_ids }, Kind::ClearAllDocuments) => { + ( + this @ BatchKind::DocumentClear { .. }, + Kind::DocumentAdditionOrUpdate | Kind::Settings, + ) => ControlFlow::Break(this), + (BatchKind::DocumentAddition { mut addition_ids }, Kind::DocumentClear) => { addition_ids.push(id); - ControlFlow::Continue(BatchKind::ClearAll { ids: addition_ids }) + ControlFlow::Continue(BatchKind::DocumentClear { ids: addition_ids }) } - (BatchKind::DocumentAddition { mut addition_ids }, Kind::DocumentAddition) => { + (BatchKind::DocumentAddition { mut addition_ids }, Kind::DocumentAdditionOrUpdate) => { addition_ids.push(id); ControlFlow::Continue(BatchKind::DocumentAddition { addition_ids }) } @@ -135,11 +144,11 @@ impl BatchKind { }) } - (BatchKind::DocumentDeletion { mut deletion_ids }, Kind::ClearAllDocuments) => { + (BatchKind::DocumentDeletion { mut deletion_ids }, Kind::DocumentClear) => { deletion_ids.push(id); - ControlFlow::Continue(BatchKind::ClearAll { ids: deletion_ids }) + ControlFlow::Continue(BatchKind::DocumentClear { ids: deletion_ids }) } - (this @ BatchKind::DocumentDeletion { .. }, Kind::DocumentAddition) => { + (this @ BatchKind::DocumentDeletion { .. }, Kind::DocumentAdditionOrUpdate) => { ControlFlow::Break(this) } (BatchKind::DocumentDeletion { mut deletion_ids }, Kind::DocumentDeletion) => { @@ -148,13 +157,15 @@ impl BatchKind { } (this @ BatchKind::DocumentDeletion { .. }, Kind::Settings) => ControlFlow::Break(this), - (BatchKind::Settings { settings_ids }, Kind::ClearAllDocuments) => { - ControlFlow::Continue(BatchKind::ClearAllAndSettings { + (BatchKind::Settings { settings_ids }, Kind::DocumentClear) => { + ControlFlow::Continue(BatchKind::ClearAndSettings { settings_ids: settings_ids.clone(), other: vec![id], }) } - (this @ BatchKind::Settings { .. }, Kind::DocumentAddition) => ControlFlow::Break(this), + (this @ BatchKind::Settings { .. }, Kind::DocumentAdditionOrUpdate) => { + ControlFlow::Break(this) + } (this @ BatchKind::Settings { .. }, Kind::DocumentDeletion) => ControlFlow::Break(this), (BatchKind::Settings { mut settings_ids }, Kind::Settings) => { settings_ids.push(id); @@ -162,43 +173,43 @@ impl BatchKind { } ( - BatchKind::ClearAllAndSettings { + BatchKind::ClearAndSettings { mut other, settings_ids, }, - Kind::ClearAllDocuments, + Kind::DocumentClear, ) => { other.push(id); - ControlFlow::Continue(BatchKind::ClearAllAndSettings { + ControlFlow::Continue(BatchKind::ClearAndSettings { other, settings_ids, }) } - (this @ BatchKind::ClearAllAndSettings { .. }, Kind::DocumentAddition) => { + (this @ BatchKind::ClearAndSettings { .. }, Kind::DocumentAdditionOrUpdate) => { ControlFlow::Break(this) } ( - BatchKind::ClearAllAndSettings { + BatchKind::ClearAndSettings { mut other, settings_ids, }, Kind::DocumentDeletion, ) => { other.push(id); - ControlFlow::Continue(BatchKind::ClearAllAndSettings { + ControlFlow::Continue(BatchKind::ClearAndSettings { other, settings_ids, }) } ( - BatchKind::ClearAllAndSettings { + BatchKind::ClearAndSettings { mut settings_ids, other, }, Kind::Settings, ) => { settings_ids.push(id); - ControlFlow::Continue(BatchKind::ClearAllAndSettings { + ControlFlow::Continue(BatchKind::ClearAndSettings { other, settings_ids, }) @@ -208,11 +219,11 @@ impl BatchKind { settings_ids, mut addition_ids, }, - Kind::ClearAllDocuments, + Kind::DocumentClear, ) => { addition_ids.push(id); - ControlFlow::Continue(BatchKind::ClearAllAndSettings { + ControlFlow::Continue(BatchKind::ClearAndSettings { settings_ids, other: addition_ids, }) @@ -222,7 +233,7 @@ impl BatchKind { mut addition_ids, settings_ids, }, - Kind::DocumentAddition, + Kind::DocumentAdditionOrUpdate, ) => { addition_ids.push(id); ControlFlow::Continue(BatchKind::SettingsAndDocumentAddition { @@ -248,10 +259,11 @@ impl BatchKind { } (_, Kind::CancelTask | Kind::DumpExport | Kind::Snapshot) => unreachable!(), ( - BatchKind::CreateIndex { .. } - | BatchKind::DeleteIndex { .. } - | BatchKind::SwapIndex { .. } - | BatchKind::RenameIndex { .. }, + BatchKind::IndexCreation { .. } + | BatchKind::IndexDeletion { .. } + | BatchKind::IndexUpdate { .. } + | BatchKind::IndexRename { .. } + | BatchKind::IndexSwap { .. }, _, ) => { unreachable!() diff --git a/index-scheduler/src/batch.rs b/index-scheduler/src/batch.rs index d555bee9d..73bc9a9a9 100644 --- a/index-scheduler/src/batch.rs +++ b/index-scheduler/src/batch.rs @@ -3,6 +3,7 @@ use crate::{ task::{KindWithContent, Status}, Error, IndexScheduler, Result, TaskId, }; +use index::{Settings, Unchecked}; use milli::{ heed::{RoTxn, RwTxn}, update::IndexDocumentsMethod, @@ -29,7 +30,7 @@ pub(crate) enum Batch { content_files: Vec, document_addition_tasks: Vec, - settings: Vec, + settings: Vec>, settings_tasks: Vec, }, } @@ -42,10 +43,10 @@ impl IndexScheduler { batch: BatchKind, ) -> Result> { match batch { - BatchKind::ClearAll { ids } => todo!(), + BatchKind::DocumentClear { ids } => todo!(), BatchKind::DocumentAddition { addition_ids } => todo!(), BatchKind::DocumentDeletion { deletion_ids } => todo!(), - BatchKind::ClearAllAndSettings { + BatchKind::ClearAndSettings { other, settings_ids, } => todo!(), @@ -73,13 +74,17 @@ impl IndexScheduler { .collect::>>()?; let primary_key = match &document_addition_tasks[0].kind { - KindWithContent::DocumentAddition { primary_key, .. } => primary_key.clone(), + KindWithContent::DocumentAdditionOrUpdate { primary_key, .. } => { + primary_key.clone() + } _ => unreachable!(), }; let content_files = document_addition_tasks .iter() .map(|task| match task.kind { - KindWithContent::DocumentAddition { content_file, .. } => content_file, + KindWithContent::DocumentAdditionOrUpdate { content_file, .. } => { + content_file + } _ => unreachable!(), }) .collect(); @@ -87,7 +92,7 @@ impl IndexScheduler { let settings = settings_tasks .iter() .map(|task| match &task.kind { - KindWithContent::Settings { new_settings, .. } => new_settings.to_string(), + KindWithContent::Settings { new_settings, .. } => new_settings.clone(), _ => unreachable!(), }) .collect(); @@ -102,10 +107,11 @@ impl IndexScheduler { })) } BatchKind::Settings { settings_ids } => todo!(), - BatchKind::DeleteIndex { ids } => todo!(), - BatchKind::CreateIndex { id } => todo!(), - BatchKind::SwapIndex { id } => todo!(), - BatchKind::RenameIndex { id } => todo!(), + BatchKind::IndexCreation { id } => todo!(), + BatchKind::IndexDeletion { ids } => todo!(), + BatchKind::IndexUpdate { id } => todo!(), + BatchKind::IndexSwap { id } => todo!(), + BatchKind::IndexRename { id } => todo!(), } } diff --git a/index-scheduler/src/task.rs b/index-scheduler/src/task.rs index 82c8ac46b..40197e81c 100644 --- a/index-scheduler/src/task.rs +++ b/index-scheduler/src/task.rs @@ -1,4 +1,6 @@ use anyhow::Result; +use index::{Settings, Unchecked}; +use milli::update::IndexDocumentsMethod; use serde::{Deserialize, Serialize}; use std::path::PathBuf; use time::OffsetDateTime; @@ -51,67 +53,73 @@ impl Task { } } -#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub enum KindWithContent { - DumpExport { - output: PathBuf, - }, - Snapshot, - DocumentAddition { - index_name: String, + DocumentAdditionOrUpdate { + index_uid: String, + merge_strategy: IndexDocumentsMethod, primary_key: Option, content_file: Uuid, + documents_count: usize, + allow_index_creation: bool, }, DocumentDeletion { - index_name: String, + index_uid: String, documents_ids: Vec, }, - ClearAllDocuments { - index_name: String, + DocumentClear { + index_uid: String, }, Settings { - index_name: String, - // TODO: TAMO: fix the type - new_settings: String, + index_uid: String, + new_settings: Settings, + is_deletion: bool, + allow_index_creation: bool, }, - RenameIndex { - index_name: String, - new_name: String, + IndexDeletion { + index_uid: String, }, - CreateIndex { - index_name: String, + IndexCreation { + index_uid: String, primary_key: Option, }, - DeleteIndex { - index_name: String, + IndexUpdate { + index_uid: String, + primary_key: Option, }, - SwapIndex { + IndexRename { + index_uid: String, + new_name: String, + }, + IndexSwap { lhs: String, rhs: String, }, CancelTask { tasks: Vec, }, + DumpExport { + output: PathBuf, + }, + Snapshot, } impl KindWithContent { pub fn as_kind(&self) -> Kind { match self { - KindWithContent::DumpExport { .. } => Kind::DumpExport, - KindWithContent::DocumentAddition { .. } => Kind::DocumentAddition, + KindWithContent::DocumentAdditionOrUpdate { .. } => Kind::DocumentAdditionOrUpdate, KindWithContent::DocumentDeletion { .. } => Kind::DocumentDeletion, - KindWithContent::ClearAllDocuments { .. } => Kind::ClearAllDocuments, - KindWithContent::RenameIndex { .. } => Kind::RenameIndex, - KindWithContent::CreateIndex { .. } => Kind::CreateIndex, - KindWithContent::DeleteIndex { .. } => Kind::DeleteIndex, - KindWithContent::SwapIndex { .. } => Kind::SwapIndex, + KindWithContent::DocumentClear { .. } => Kind::DocumentClear, + KindWithContent::Settings { .. } => Kind::Settings, + KindWithContent::IndexCreation { .. } => Kind::IndexCreation, + KindWithContent::IndexDeletion { .. } => Kind::IndexDeletion, + KindWithContent::IndexUpdate { .. } => Kind::IndexUpdate, + KindWithContent::IndexRename { .. } => Kind::IndexRename, + KindWithContent::IndexSwap { .. } => Kind::IndexSwap, KindWithContent::CancelTask { .. } => Kind::CancelTask, + KindWithContent::DumpExport { .. } => Kind::DumpExport, KindWithContent::Snapshot => Kind::Snapshot, - KindWithContent::Settings { - index_name, - new_settings, - } => todo!(), } } @@ -119,26 +127,22 @@ impl KindWithContent { use KindWithContent::*; match self { - DocumentAddition { - index_name: _, - content_file: _, - primary_key, - } => { + DocumentAdditionOrUpdate { .. } => { // TODO: TAMO: persist the file // content_file.persist(); Ok(()) } - // There is nothing to persist for all these tasks - DumpExport { .. } + DocumentDeletion { .. } + | DocumentClear { .. } | Settings { .. } - | DocumentDeletion { .. } - | ClearAllDocuments { .. } - | RenameIndex { .. } - | CreateIndex { .. } - | DeleteIndex { .. } - | SwapIndex { .. } + | IndexCreation { .. } + | IndexDeletion { .. } + | IndexUpdate { .. } + | IndexRename { .. } + | IndexSwap { .. } | CancelTask { .. } - | Snapshot => Ok(()), + | DumpExport { .. } + | Snapshot => Ok(()), // There is nothing to persist for all these tasks } } @@ -146,26 +150,23 @@ impl KindWithContent { use KindWithContent::*; match self { - DocumentAddition { - index_name: _, - content_file: _, - primary_key, - } => { + DocumentAdditionOrUpdate { .. } => { // TODO: TAMO: delete the file // content_file.delete(); Ok(()) } - // There is no data associated with all these tasks - DumpExport { .. } - | Settings { .. } + DocumentAdditionOrUpdate { .. } + | IndexCreation { .. } | DocumentDeletion { .. } - | ClearAllDocuments { .. } - | RenameIndex { .. } - | CreateIndex { .. } - | DeleteIndex { .. } - | SwapIndex { .. } + | DocumentClear { .. } + | Settings { .. } + | IndexDeletion { .. } + | IndexUpdate { .. } + | IndexRename { .. } + | IndexSwap { .. } | CancelTask { .. } - | Snapshot => Ok(()), + | DumpExport { .. } + | Snapshot => Ok(()), // There is no data associated with all these tasks } } @@ -174,17 +175,18 @@ impl KindWithContent { match self { DumpExport { .. } | Snapshot | CancelTask { .. } => None, - DocumentAddition { index_name, .. } - | DocumentDeletion { index_name, .. } - | ClearAllDocuments { index_name } - | CreateIndex { index_name, .. } - | DeleteIndex { index_name } => Some(vec![index_name]), - RenameIndex { - index_name: lhs, + DocumentAdditionOrUpdate { index_uid, .. } + | DocumentDeletion { index_uid, .. } + | DocumentClear { index_uid } + | Settings { index_uid, .. } + | IndexCreation { index_uid, .. } + | IndexUpdate { index_uid, .. } + | IndexDeletion { index_uid } => Some(vec![index_uid]), + IndexRename { + index_uid: lhs, new_name: rhs, } - | SwapIndex { lhs, rhs } => Some(vec![lhs, rhs]), - Settings { index_name, .. } => Some(vec![index_name]), + | IndexSwap { lhs, rhs } => Some(vec![lhs, rhs]), } } } @@ -192,15 +194,16 @@ impl KindWithContent { #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub enum Kind { - CancelTask, - ClearAllDocuments, - CreateIndex, - DeleteIndex, - DocumentAddition, + DocumentAdditionOrUpdate, DocumentDeletion, - DumpExport, - RenameIndex, + DocumentClear, Settings, + IndexCreation, + IndexDeletion, + IndexUpdate, + IndexRename, + IndexSwap, + CancelTask, + DumpExport, Snapshot, - SwapIndex, }