starts importing the real tasks

This commit is contained in:
Tamo 2022-09-13 22:38:43 +02:00 committed by Clément Renault
parent 5cc8f96237
commit b7c5b71a53
No known key found for this signature in database
GPG Key ID: 92ADA4E935E71FA4
3 changed files with 158 additions and 137 deletions

View File

@ -3,7 +3,7 @@ use std::ops::ControlFlow;
use crate::{task::Kind, TaskId}; use crate::{task::Kind, TaskId};
pub enum BatchKind { pub enum BatchKind {
ClearAll { DocumentClear {
ids: Vec<TaskId>, ids: Vec<TaskId>,
}, },
DocumentAddition { DocumentAddition {
@ -12,7 +12,7 @@ pub enum BatchKind {
DocumentDeletion { DocumentDeletion {
deletion_ids: Vec<TaskId>, deletion_ids: Vec<TaskId>,
}, },
ClearAllAndSettings { ClearAndSettings {
other: Vec<TaskId>, other: Vec<TaskId>,
settings_ids: Vec<TaskId>, settings_ids: Vec<TaskId>,
}, },
@ -23,16 +23,19 @@ pub enum BatchKind {
Settings { Settings {
settings_ids: Vec<TaskId>, settings_ids: Vec<TaskId>,
}, },
DeleteIndex { IndexDeletion {
ids: Vec<TaskId>, ids: Vec<TaskId>,
}, },
CreateIndex { IndexCreation {
id: TaskId, id: TaskId,
}, },
SwapIndex { IndexUpdate {
id: TaskId, id: TaskId,
}, },
RenameIndex { IndexRename {
id: TaskId,
},
IndexSwap {
id: TaskId, id: TaskId,
}, },
} }
@ -41,12 +44,13 @@ impl BatchKind {
/// return true if you must stop right there. /// return true if you must stop right there.
pub fn new(task_id: TaskId, kind: Kind) -> (Self, bool) { pub fn new(task_id: TaskId, kind: Kind) -> (Self, bool) {
match kind { match kind {
Kind::CreateIndex => (BatchKind::CreateIndex { id: task_id }, true), Kind::IndexCreation => (BatchKind::IndexCreation { id: task_id }, true),
Kind::DeleteIndex => (BatchKind::DeleteIndex { ids: vec![task_id] }, true), Kind::IndexDeletion => (BatchKind::IndexDeletion { ids: vec![task_id] }, true),
Kind::RenameIndex => (BatchKind::RenameIndex { id: task_id }, true), Kind::IndexUpdate => (BatchKind::IndexUpdate { id: task_id }, true),
Kind::SwapIndex => (BatchKind::SwapIndex { id: task_id }, true), Kind::IndexRename => (BatchKind::IndexRename { id: task_id }, true),
Kind::ClearAllDocuments => (BatchKind::ClearAll { ids: vec![task_id] }, false), Kind::IndexSwap => (BatchKind::IndexSwap { id: task_id }, true),
Kind::DocumentAddition => ( Kind::DocumentClear => (BatchKind::DocumentClear { ids: vec![task_id] }, false),
Kind::DocumentAdditionOrUpdate => (
BatchKind::DocumentAddition { BatchKind::DocumentAddition {
addition_ids: vec![task_id], addition_ids: vec![task_id],
}, },
@ -73,12 +77,13 @@ impl BatchKind {
fn accumulate(mut self, id: TaskId, kind: Kind) -> ControlFlow<Self, Self> { fn accumulate(mut self, id: TaskId, kind: Kind) -> ControlFlow<Self, Self> {
match (self, kind) { match (self, kind) {
// We don't batch any of these operations // We don't batch any of these operations
(this, Kind::CreateIndex | Kind::RenameIndex | Kind::SwapIndex) => { (
ControlFlow::Break(this) this,
} Kind::IndexCreation | Kind::IndexRename | Kind::IndexUpdate | Kind::IndexSwap,
) => ControlFlow::Break(this),
// The index deletion can batch with everything but must stop after // The index deletion can batch with everything but must stop after
( (
BatchKind::ClearAll { mut ids } BatchKind::DocumentClear { mut ids }
| BatchKind::DocumentAddition { | BatchKind::DocumentAddition {
addition_ids: mut ids, addition_ids: mut ids,
} }
@ -88,13 +93,13 @@ impl BatchKind {
| BatchKind::Settings { | BatchKind::Settings {
settings_ids: mut ids, settings_ids: mut ids,
}, },
Kind::DeleteIndex, Kind::IndexDeletion,
) => { ) => {
ids.push(id); ids.push(id);
ControlFlow::Break(BatchKind::DeleteIndex { ids }) ControlFlow::Break(BatchKind::IndexDeletion { ids })
} }
( (
BatchKind::ClearAllAndSettings { BatchKind::ClearAndSettings {
settings_ids: mut ids, settings_ids: mut ids,
mut other, mut other,
} }
@ -102,26 +107,30 @@ impl BatchKind {
addition_ids: mut ids, addition_ids: mut ids,
settings_ids: mut other, settings_ids: mut other,
}, },
Kind::DeleteIndex, Kind::IndexDeletion,
) => { ) => {
ids.push(id); ids.push(id);
ids.append(&mut other); ids.append(&mut other);
ControlFlow::Break(BatchKind::DeleteIndex { ids }) ControlFlow::Break(BatchKind::IndexDeletion { ids })
} }
(BatchKind::ClearAll { mut ids }, Kind::ClearAllDocuments | Kind::DocumentDeletion) => { (
BatchKind::DocumentClear { mut ids },
Kind::DocumentClear | Kind::DocumentDeletion,
) => {
ids.push(id); ids.push(id);
ControlFlow::Continue(BatchKind::ClearAll { ids }) ControlFlow::Continue(BatchKind::DocumentClear { ids })
} }
(this @ BatchKind::ClearAll { .. }, Kind::DocumentAddition | Kind::Settings) => { (
ControlFlow::Break(this) this @ BatchKind::DocumentClear { .. },
} Kind::DocumentAdditionOrUpdate | Kind::Settings,
(BatchKind::DocumentAddition { mut addition_ids }, Kind::ClearAllDocuments) => { ) => ControlFlow::Break(this),
(BatchKind::DocumentAddition { mut addition_ids }, Kind::DocumentClear) => {
addition_ids.push(id); addition_ids.push(id);
ControlFlow::Continue(BatchKind::ClearAll { ids: addition_ids }) ControlFlow::Continue(BatchKind::DocumentClear { ids: addition_ids })
} }
(BatchKind::DocumentAddition { mut addition_ids }, Kind::DocumentAddition) => { (BatchKind::DocumentAddition { mut addition_ids }, Kind::DocumentAdditionOrUpdate) => {
addition_ids.push(id); addition_ids.push(id);
ControlFlow::Continue(BatchKind::DocumentAddition { addition_ids }) ControlFlow::Continue(BatchKind::DocumentAddition { addition_ids })
} }
@ -135,11 +144,11 @@ impl BatchKind {
}) })
} }
(BatchKind::DocumentDeletion { mut deletion_ids }, Kind::ClearAllDocuments) => { (BatchKind::DocumentDeletion { mut deletion_ids }, Kind::DocumentClear) => {
deletion_ids.push(id); deletion_ids.push(id);
ControlFlow::Continue(BatchKind::ClearAll { ids: deletion_ids }) ControlFlow::Continue(BatchKind::DocumentClear { ids: deletion_ids })
} }
(this @ BatchKind::DocumentDeletion { .. }, Kind::DocumentAddition) => { (this @ BatchKind::DocumentDeletion { .. }, Kind::DocumentAdditionOrUpdate) => {
ControlFlow::Break(this) ControlFlow::Break(this)
} }
(BatchKind::DocumentDeletion { mut deletion_ids }, Kind::DocumentDeletion) => { (BatchKind::DocumentDeletion { mut deletion_ids }, Kind::DocumentDeletion) => {
@ -148,13 +157,15 @@ impl BatchKind {
} }
(this @ BatchKind::DocumentDeletion { .. }, Kind::Settings) => ControlFlow::Break(this), (this @ BatchKind::DocumentDeletion { .. }, Kind::Settings) => ControlFlow::Break(this),
(BatchKind::Settings { settings_ids }, Kind::ClearAllDocuments) => { (BatchKind::Settings { settings_ids }, Kind::DocumentClear) => {
ControlFlow::Continue(BatchKind::ClearAllAndSettings { ControlFlow::Continue(BatchKind::ClearAndSettings {
settings_ids: settings_ids.clone(), settings_ids: settings_ids.clone(),
other: vec![id], other: vec![id],
}) })
} }
(this @ BatchKind::Settings { .. }, Kind::DocumentAddition) => ControlFlow::Break(this), (this @ BatchKind::Settings { .. }, Kind::DocumentAdditionOrUpdate) => {
ControlFlow::Break(this)
}
(this @ BatchKind::Settings { .. }, Kind::DocumentDeletion) => ControlFlow::Break(this), (this @ BatchKind::Settings { .. }, Kind::DocumentDeletion) => ControlFlow::Break(this),
(BatchKind::Settings { mut settings_ids }, Kind::Settings) => { (BatchKind::Settings { mut settings_ids }, Kind::Settings) => {
settings_ids.push(id); settings_ids.push(id);
@ -162,43 +173,43 @@ impl BatchKind {
} }
( (
BatchKind::ClearAllAndSettings { BatchKind::ClearAndSettings {
mut other, mut other,
settings_ids, settings_ids,
}, },
Kind::ClearAllDocuments, Kind::DocumentClear,
) => { ) => {
other.push(id); other.push(id);
ControlFlow::Continue(BatchKind::ClearAllAndSettings { ControlFlow::Continue(BatchKind::ClearAndSettings {
other, other,
settings_ids, settings_ids,
}) })
} }
(this @ BatchKind::ClearAllAndSettings { .. }, Kind::DocumentAddition) => { (this @ BatchKind::ClearAndSettings { .. }, Kind::DocumentAdditionOrUpdate) => {
ControlFlow::Break(this) ControlFlow::Break(this)
} }
( (
BatchKind::ClearAllAndSettings { BatchKind::ClearAndSettings {
mut other, mut other,
settings_ids, settings_ids,
}, },
Kind::DocumentDeletion, Kind::DocumentDeletion,
) => { ) => {
other.push(id); other.push(id);
ControlFlow::Continue(BatchKind::ClearAllAndSettings { ControlFlow::Continue(BatchKind::ClearAndSettings {
other, other,
settings_ids, settings_ids,
}) })
} }
( (
BatchKind::ClearAllAndSettings { BatchKind::ClearAndSettings {
mut settings_ids, mut settings_ids,
other, other,
}, },
Kind::Settings, Kind::Settings,
) => { ) => {
settings_ids.push(id); settings_ids.push(id);
ControlFlow::Continue(BatchKind::ClearAllAndSettings { ControlFlow::Continue(BatchKind::ClearAndSettings {
other, other,
settings_ids, settings_ids,
}) })
@ -208,11 +219,11 @@ impl BatchKind {
settings_ids, settings_ids,
mut addition_ids, mut addition_ids,
}, },
Kind::ClearAllDocuments, Kind::DocumentClear,
) => { ) => {
addition_ids.push(id); addition_ids.push(id);
ControlFlow::Continue(BatchKind::ClearAllAndSettings { ControlFlow::Continue(BatchKind::ClearAndSettings {
settings_ids, settings_ids,
other: addition_ids, other: addition_ids,
}) })
@ -222,7 +233,7 @@ impl BatchKind {
mut addition_ids, mut addition_ids,
settings_ids, settings_ids,
}, },
Kind::DocumentAddition, Kind::DocumentAdditionOrUpdate,
) => { ) => {
addition_ids.push(id); addition_ids.push(id);
ControlFlow::Continue(BatchKind::SettingsAndDocumentAddition { ControlFlow::Continue(BatchKind::SettingsAndDocumentAddition {
@ -248,10 +259,11 @@ impl BatchKind {
} }
(_, Kind::CancelTask | Kind::DumpExport | Kind::Snapshot) => unreachable!(), (_, Kind::CancelTask | Kind::DumpExport | Kind::Snapshot) => unreachable!(),
( (
BatchKind::CreateIndex { .. } BatchKind::IndexCreation { .. }
| BatchKind::DeleteIndex { .. } | BatchKind::IndexDeletion { .. }
| BatchKind::SwapIndex { .. } | BatchKind::IndexUpdate { .. }
| BatchKind::RenameIndex { .. }, | BatchKind::IndexRename { .. }
| BatchKind::IndexSwap { .. },
_, _,
) => { ) => {
unreachable!() unreachable!()

View File

@ -3,6 +3,7 @@ use crate::{
task::{KindWithContent, Status}, task::{KindWithContent, Status},
Error, IndexScheduler, Result, TaskId, Error, IndexScheduler, Result, TaskId,
}; };
use index::{Settings, Unchecked};
use milli::{ use milli::{
heed::{RoTxn, RwTxn}, heed::{RoTxn, RwTxn},
update::IndexDocumentsMethod, update::IndexDocumentsMethod,
@ -29,7 +30,7 @@ pub(crate) enum Batch {
content_files: Vec<Uuid>, content_files: Vec<Uuid>,
document_addition_tasks: Vec<Task>, document_addition_tasks: Vec<Task>,
settings: Vec<String>, settings: Vec<Settings<Unchecked>>,
settings_tasks: Vec<Task>, settings_tasks: Vec<Task>,
}, },
} }
@ -42,10 +43,10 @@ impl IndexScheduler {
batch: BatchKind, batch: BatchKind,
) -> Result<Option<Batch>> { ) -> Result<Option<Batch>> {
match batch { match batch {
BatchKind::ClearAll { ids } => todo!(), BatchKind::DocumentClear { ids } => todo!(),
BatchKind::DocumentAddition { addition_ids } => todo!(), BatchKind::DocumentAddition { addition_ids } => todo!(),
BatchKind::DocumentDeletion { deletion_ids } => todo!(), BatchKind::DocumentDeletion { deletion_ids } => todo!(),
BatchKind::ClearAllAndSettings { BatchKind::ClearAndSettings {
other, other,
settings_ids, settings_ids,
} => todo!(), } => todo!(),
@ -73,13 +74,17 @@ impl IndexScheduler {
.collect::<Result<Vec<_>>>()?; .collect::<Result<Vec<_>>>()?;
let primary_key = match &document_addition_tasks[0].kind { let primary_key = match &document_addition_tasks[0].kind {
KindWithContent::DocumentAddition { primary_key, .. } => primary_key.clone(), KindWithContent::DocumentAdditionOrUpdate { primary_key, .. } => {
primary_key.clone()
}
_ => unreachable!(), _ => unreachable!(),
}; };
let content_files = document_addition_tasks let content_files = document_addition_tasks
.iter() .iter()
.map(|task| match task.kind { .map(|task| match task.kind {
KindWithContent::DocumentAddition { content_file, .. } => content_file, KindWithContent::DocumentAdditionOrUpdate { content_file, .. } => {
content_file
}
_ => unreachable!(), _ => unreachable!(),
}) })
.collect(); .collect();
@ -87,7 +92,7 @@ impl IndexScheduler {
let settings = settings_tasks let settings = settings_tasks
.iter() .iter()
.map(|task| match &task.kind { .map(|task| match &task.kind {
KindWithContent::Settings { new_settings, .. } => new_settings.to_string(), KindWithContent::Settings { new_settings, .. } => new_settings.clone(),
_ => unreachable!(), _ => unreachable!(),
}) })
.collect(); .collect();
@ -102,10 +107,11 @@ impl IndexScheduler {
})) }))
} }
BatchKind::Settings { settings_ids } => todo!(), BatchKind::Settings { settings_ids } => todo!(),
BatchKind::DeleteIndex { ids } => todo!(), BatchKind::IndexCreation { id } => todo!(),
BatchKind::CreateIndex { id } => todo!(), BatchKind::IndexDeletion { ids } => todo!(),
BatchKind::SwapIndex { id } => todo!(), BatchKind::IndexUpdate { id } => todo!(),
BatchKind::RenameIndex { id } => todo!(), BatchKind::IndexSwap { id } => todo!(),
BatchKind::IndexRename { id } => todo!(),
} }
} }

View File

@ -1,4 +1,6 @@
use anyhow::Result; use anyhow::Result;
use index::{Settings, Unchecked};
use milli::update::IndexDocumentsMethod;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::path::PathBuf; use std::path::PathBuf;
use time::OffsetDateTime; use time::OffsetDateTime;
@ -51,67 +53,73 @@ impl Task {
} }
} }
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
pub enum KindWithContent { pub enum KindWithContent {
DumpExport { DocumentAdditionOrUpdate {
output: PathBuf, index_uid: String,
}, merge_strategy: IndexDocumentsMethod,
Snapshot,
DocumentAddition {
index_name: String,
primary_key: Option<String>, primary_key: Option<String>,
content_file: Uuid, content_file: Uuid,
documents_count: usize,
allow_index_creation: bool,
}, },
DocumentDeletion { DocumentDeletion {
index_name: String, index_uid: String,
documents_ids: Vec<String>, documents_ids: Vec<String>,
}, },
ClearAllDocuments { DocumentClear {
index_name: String, index_uid: String,
}, },
Settings { Settings {
index_name: String, index_uid: String,
// TODO: TAMO: fix the type new_settings: Settings<Unchecked>,
new_settings: String, is_deletion: bool,
allow_index_creation: bool,
}, },
RenameIndex { IndexDeletion {
index_name: String, index_uid: String,
new_name: String,
}, },
CreateIndex { IndexCreation {
index_name: String, index_uid: String,
primary_key: Option<String>, primary_key: Option<String>,
}, },
DeleteIndex { IndexUpdate {
index_name: String, index_uid: String,
primary_key: Option<String>,
}, },
SwapIndex { IndexRename {
index_uid: String,
new_name: String,
},
IndexSwap {
lhs: String, lhs: String,
rhs: String, rhs: String,
}, },
CancelTask { CancelTask {
tasks: Vec<TaskId>, tasks: Vec<TaskId>,
}, },
DumpExport {
output: PathBuf,
},
Snapshot,
} }
impl KindWithContent { impl KindWithContent {
pub fn as_kind(&self) -> Kind { pub fn as_kind(&self) -> Kind {
match self { match self {
KindWithContent::DumpExport { .. } => Kind::DumpExport, KindWithContent::DocumentAdditionOrUpdate { .. } => Kind::DocumentAdditionOrUpdate,
KindWithContent::DocumentAddition { .. } => Kind::DocumentAddition,
KindWithContent::DocumentDeletion { .. } => Kind::DocumentDeletion, KindWithContent::DocumentDeletion { .. } => Kind::DocumentDeletion,
KindWithContent::ClearAllDocuments { .. } => Kind::ClearAllDocuments, KindWithContent::DocumentClear { .. } => Kind::DocumentClear,
KindWithContent::RenameIndex { .. } => Kind::RenameIndex, KindWithContent::Settings { .. } => Kind::Settings,
KindWithContent::CreateIndex { .. } => Kind::CreateIndex, KindWithContent::IndexCreation { .. } => Kind::IndexCreation,
KindWithContent::DeleteIndex { .. } => Kind::DeleteIndex, KindWithContent::IndexDeletion { .. } => Kind::IndexDeletion,
KindWithContent::SwapIndex { .. } => Kind::SwapIndex, KindWithContent::IndexUpdate { .. } => Kind::IndexUpdate,
KindWithContent::IndexRename { .. } => Kind::IndexRename,
KindWithContent::IndexSwap { .. } => Kind::IndexSwap,
KindWithContent::CancelTask { .. } => Kind::CancelTask, KindWithContent::CancelTask { .. } => Kind::CancelTask,
KindWithContent::DumpExport { .. } => Kind::DumpExport,
KindWithContent::Snapshot => Kind::Snapshot, KindWithContent::Snapshot => Kind::Snapshot,
KindWithContent::Settings {
index_name,
new_settings,
} => todo!(),
} }
} }
@ -119,26 +127,22 @@ impl KindWithContent {
use KindWithContent::*; use KindWithContent::*;
match self { match self {
DocumentAddition { DocumentAdditionOrUpdate { .. } => {
index_name: _,
content_file: _,
primary_key,
} => {
// TODO: TAMO: persist the file // TODO: TAMO: persist the file
// content_file.persist(); // content_file.persist();
Ok(()) Ok(())
} }
// There is nothing to persist for all these tasks DocumentDeletion { .. }
DumpExport { .. } | DocumentClear { .. }
| Settings { .. } | Settings { .. }
| DocumentDeletion { .. } | IndexCreation { .. }
| ClearAllDocuments { .. } | IndexDeletion { .. }
| RenameIndex { .. } | IndexUpdate { .. }
| CreateIndex { .. } | IndexRename { .. }
| DeleteIndex { .. } | IndexSwap { .. }
| SwapIndex { .. }
| CancelTask { .. } | CancelTask { .. }
| Snapshot => Ok(()), | DumpExport { .. }
| Snapshot => Ok(()), // There is nothing to persist for all these tasks
} }
} }
@ -146,26 +150,23 @@ impl KindWithContent {
use KindWithContent::*; use KindWithContent::*;
match self { match self {
DocumentAddition { DocumentAdditionOrUpdate { .. } => {
index_name: _,
content_file: _,
primary_key,
} => {
// TODO: TAMO: delete the file // TODO: TAMO: delete the file
// content_file.delete(); // content_file.delete();
Ok(()) Ok(())
} }
// There is no data associated with all these tasks DocumentAdditionOrUpdate { .. }
DumpExport { .. } | IndexCreation { .. }
| Settings { .. }
| DocumentDeletion { .. } | DocumentDeletion { .. }
| ClearAllDocuments { .. } | DocumentClear { .. }
| RenameIndex { .. } | Settings { .. }
| CreateIndex { .. } | IndexDeletion { .. }
| DeleteIndex { .. } | IndexUpdate { .. }
| SwapIndex { .. } | IndexRename { .. }
| IndexSwap { .. }
| CancelTask { .. } | CancelTask { .. }
| Snapshot => Ok(()), | DumpExport { .. }
| Snapshot => Ok(()), // There is no data associated with all these tasks
} }
} }
@ -174,17 +175,18 @@ impl KindWithContent {
match self { match self {
DumpExport { .. } | Snapshot | CancelTask { .. } => None, DumpExport { .. } | Snapshot | CancelTask { .. } => None,
DocumentAddition { index_name, .. } DocumentAdditionOrUpdate { index_uid, .. }
| DocumentDeletion { index_name, .. } | DocumentDeletion { index_uid, .. }
| ClearAllDocuments { index_name } | DocumentClear { index_uid }
| CreateIndex { index_name, .. } | Settings { index_uid, .. }
| DeleteIndex { index_name } => Some(vec![index_name]), | IndexCreation { index_uid, .. }
RenameIndex { | IndexUpdate { index_uid, .. }
index_name: lhs, | IndexDeletion { index_uid } => Some(vec![index_uid]),
IndexRename {
index_uid: lhs,
new_name: rhs, new_name: rhs,
} }
| SwapIndex { lhs, rhs } => Some(vec![lhs, rhs]), | IndexSwap { lhs, rhs } => Some(vec![lhs, rhs]),
Settings { index_name, .. } => Some(vec![index_name]),
} }
} }
} }
@ -192,15 +194,16 @@ impl KindWithContent {
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
pub enum Kind { pub enum Kind {
CancelTask, DocumentAdditionOrUpdate,
ClearAllDocuments,
CreateIndex,
DeleteIndex,
DocumentAddition,
DocumentDeletion, DocumentDeletion,
DumpExport, DocumentClear,
RenameIndex,
Settings, Settings,
IndexCreation,
IndexDeletion,
IndexUpdate,
IndexRename,
IndexSwap,
CancelTask,
DumpExport,
Snapshot, Snapshot,
SwapIndex,
} }