starts importing the real tasks

This commit is contained in:
Tamo 2022-09-13 22:38:43 +02:00 committed by Clément Renault
parent bbb50d1b96
commit 00f13f45b6
No known key found for this signature in database
GPG Key ID: 92ADA4E935E71FA4
3 changed files with 158 additions and 137 deletions

View File

@ -3,7 +3,7 @@ use std::ops::ControlFlow;
use crate::{task::Kind, TaskId};
pub enum BatchKind {
ClearAll {
DocumentClear {
ids: Vec<TaskId>,
},
DocumentAddition {
@ -12,7 +12,7 @@ pub enum BatchKind {
DocumentDeletion {
deletion_ids: Vec<TaskId>,
},
ClearAllAndSettings {
ClearAndSettings {
other: Vec<TaskId>,
settings_ids: Vec<TaskId>,
},
@ -23,16 +23,19 @@ pub enum BatchKind {
Settings {
settings_ids: Vec<TaskId>,
},
DeleteIndex {
IndexDeletion {
ids: Vec<TaskId>,
},
CreateIndex {
IndexCreation {
id: TaskId,
},
SwapIndex {
IndexUpdate {
id: TaskId,
},
RenameIndex {
IndexRename {
id: TaskId,
},
IndexSwap {
id: TaskId,
},
}
@ -41,12 +44,13 @@ impl BatchKind {
/// return true if you must stop right there.
pub fn new(task_id: TaskId, kind: Kind) -> (Self, bool) {
match kind {
Kind::CreateIndex => (BatchKind::CreateIndex { id: task_id }, true),
Kind::DeleteIndex => (BatchKind::DeleteIndex { ids: vec![task_id] }, true),
Kind::RenameIndex => (BatchKind::RenameIndex { id: task_id }, true),
Kind::SwapIndex => (BatchKind::SwapIndex { id: task_id }, true),
Kind::ClearAllDocuments => (BatchKind::ClearAll { ids: vec![task_id] }, false),
Kind::DocumentAddition => (
Kind::IndexCreation => (BatchKind::IndexCreation { id: task_id }, true),
Kind::IndexDeletion => (BatchKind::IndexDeletion { ids: vec![task_id] }, true),
Kind::IndexUpdate => (BatchKind::IndexUpdate { id: task_id }, true),
Kind::IndexRename => (BatchKind::IndexRename { id: task_id }, true),
Kind::IndexSwap => (BatchKind::IndexSwap { id: task_id }, true),
Kind::DocumentClear => (BatchKind::DocumentClear { ids: vec![task_id] }, false),
Kind::DocumentAdditionOrUpdate => (
BatchKind::DocumentAddition {
addition_ids: vec![task_id],
},
@ -73,12 +77,13 @@ impl BatchKind {
fn accumulate(mut self, id: TaskId, kind: Kind) -> ControlFlow<Self, Self> {
match (self, kind) {
// We don't batch any of these operations
(this, Kind::CreateIndex | Kind::RenameIndex | Kind::SwapIndex) => {
ControlFlow::Break(this)
}
(
this,
Kind::IndexCreation | Kind::IndexRename | Kind::IndexUpdate | Kind::IndexSwap,
) => ControlFlow::Break(this),
// The index deletion can batch with everything but must stop after
(
BatchKind::ClearAll { mut ids }
BatchKind::DocumentClear { mut ids }
| BatchKind::DocumentAddition {
addition_ids: mut ids,
}
@ -88,13 +93,13 @@ impl BatchKind {
| BatchKind::Settings {
settings_ids: mut ids,
},
Kind::DeleteIndex,
Kind::IndexDeletion,
) => {
ids.push(id);
ControlFlow::Break(BatchKind::DeleteIndex { ids })
ControlFlow::Break(BatchKind::IndexDeletion { ids })
}
(
BatchKind::ClearAllAndSettings {
BatchKind::ClearAndSettings {
settings_ids: mut ids,
mut other,
}
@ -102,26 +107,30 @@ impl BatchKind {
addition_ids: mut ids,
settings_ids: mut other,
},
Kind::DeleteIndex,
Kind::IndexDeletion,
) => {
ids.push(id);
ids.append(&mut other);
ControlFlow::Break(BatchKind::DeleteIndex { ids })
ControlFlow::Break(BatchKind::IndexDeletion { ids })
}
(BatchKind::ClearAll { mut ids }, Kind::ClearAllDocuments | Kind::DocumentDeletion) => {
(
BatchKind::DocumentClear { mut ids },
Kind::DocumentClear | Kind::DocumentDeletion,
) => {
ids.push(id);
ControlFlow::Continue(BatchKind::ClearAll { ids })
ControlFlow::Continue(BatchKind::DocumentClear { ids })
}
(this @ BatchKind::ClearAll { .. }, Kind::DocumentAddition | Kind::Settings) => {
ControlFlow::Break(this)
}
(BatchKind::DocumentAddition { mut addition_ids }, Kind::ClearAllDocuments) => {
(
this @ BatchKind::DocumentClear { .. },
Kind::DocumentAdditionOrUpdate | Kind::Settings,
) => ControlFlow::Break(this),
(BatchKind::DocumentAddition { mut addition_ids }, Kind::DocumentClear) => {
addition_ids.push(id);
ControlFlow::Continue(BatchKind::ClearAll { ids: addition_ids })
ControlFlow::Continue(BatchKind::DocumentClear { ids: addition_ids })
}
(BatchKind::DocumentAddition { mut addition_ids }, Kind::DocumentAddition) => {
(BatchKind::DocumentAddition { mut addition_ids }, Kind::DocumentAdditionOrUpdate) => {
addition_ids.push(id);
ControlFlow::Continue(BatchKind::DocumentAddition { addition_ids })
}
@ -135,11 +144,11 @@ impl BatchKind {
})
}
(BatchKind::DocumentDeletion { mut deletion_ids }, Kind::ClearAllDocuments) => {
(BatchKind::DocumentDeletion { mut deletion_ids }, Kind::DocumentClear) => {
deletion_ids.push(id);
ControlFlow::Continue(BatchKind::ClearAll { ids: deletion_ids })
ControlFlow::Continue(BatchKind::DocumentClear { ids: deletion_ids })
}
(this @ BatchKind::DocumentDeletion { .. }, Kind::DocumentAddition) => {
(this @ BatchKind::DocumentDeletion { .. }, Kind::DocumentAdditionOrUpdate) => {
ControlFlow::Break(this)
}
(BatchKind::DocumentDeletion { mut deletion_ids }, Kind::DocumentDeletion) => {
@ -148,13 +157,15 @@ impl BatchKind {
}
(this @ BatchKind::DocumentDeletion { .. }, Kind::Settings) => ControlFlow::Break(this),
(BatchKind::Settings { settings_ids }, Kind::ClearAllDocuments) => {
ControlFlow::Continue(BatchKind::ClearAllAndSettings {
(BatchKind::Settings { settings_ids }, Kind::DocumentClear) => {
ControlFlow::Continue(BatchKind::ClearAndSettings {
settings_ids: settings_ids.clone(),
other: vec![id],
})
}
(this @ BatchKind::Settings { .. }, Kind::DocumentAddition) => ControlFlow::Break(this),
(this @ BatchKind::Settings { .. }, Kind::DocumentAdditionOrUpdate) => {
ControlFlow::Break(this)
}
(this @ BatchKind::Settings { .. }, Kind::DocumentDeletion) => ControlFlow::Break(this),
(BatchKind::Settings { mut settings_ids }, Kind::Settings) => {
settings_ids.push(id);
@ -162,43 +173,43 @@ impl BatchKind {
}
(
BatchKind::ClearAllAndSettings {
BatchKind::ClearAndSettings {
mut other,
settings_ids,
},
Kind::ClearAllDocuments,
Kind::DocumentClear,
) => {
other.push(id);
ControlFlow::Continue(BatchKind::ClearAllAndSettings {
ControlFlow::Continue(BatchKind::ClearAndSettings {
other,
settings_ids,
})
}
(this @ BatchKind::ClearAllAndSettings { .. }, Kind::DocumentAddition) => {
(this @ BatchKind::ClearAndSettings { .. }, Kind::DocumentAdditionOrUpdate) => {
ControlFlow::Break(this)
}
(
BatchKind::ClearAllAndSettings {
BatchKind::ClearAndSettings {
mut other,
settings_ids,
},
Kind::DocumentDeletion,
) => {
other.push(id);
ControlFlow::Continue(BatchKind::ClearAllAndSettings {
ControlFlow::Continue(BatchKind::ClearAndSettings {
other,
settings_ids,
})
}
(
BatchKind::ClearAllAndSettings {
BatchKind::ClearAndSettings {
mut settings_ids,
other,
},
Kind::Settings,
) => {
settings_ids.push(id);
ControlFlow::Continue(BatchKind::ClearAllAndSettings {
ControlFlow::Continue(BatchKind::ClearAndSettings {
other,
settings_ids,
})
@ -208,11 +219,11 @@ impl BatchKind {
settings_ids,
mut addition_ids,
},
Kind::ClearAllDocuments,
Kind::DocumentClear,
) => {
addition_ids.push(id);
ControlFlow::Continue(BatchKind::ClearAllAndSettings {
ControlFlow::Continue(BatchKind::ClearAndSettings {
settings_ids,
other: addition_ids,
})
@ -222,7 +233,7 @@ impl BatchKind {
mut addition_ids,
settings_ids,
},
Kind::DocumentAddition,
Kind::DocumentAdditionOrUpdate,
) => {
addition_ids.push(id);
ControlFlow::Continue(BatchKind::SettingsAndDocumentAddition {
@ -248,10 +259,11 @@ impl BatchKind {
}
(_, Kind::CancelTask | Kind::DumpExport | Kind::Snapshot) => unreachable!(),
(
BatchKind::CreateIndex { .. }
| BatchKind::DeleteIndex { .. }
| BatchKind::SwapIndex { .. }
| BatchKind::RenameIndex { .. },
BatchKind::IndexCreation { .. }
| BatchKind::IndexDeletion { .. }
| BatchKind::IndexUpdate { .. }
| BatchKind::IndexRename { .. }
| BatchKind::IndexSwap { .. },
_,
) => {
unreachable!()

View File

@ -3,6 +3,7 @@ use crate::{
task::{KindWithContent, Status},
Error, IndexScheduler, Result, TaskId,
};
use index::{Settings, Unchecked};
use milli::{
heed::{RoTxn, RwTxn},
update::IndexDocumentsMethod,
@ -29,7 +30,7 @@ pub(crate) enum Batch {
content_files: Vec<Uuid>,
document_addition_tasks: Vec<Task>,
settings: Vec<String>,
settings: Vec<Settings<Unchecked>>,
settings_tasks: Vec<Task>,
},
}
@ -42,10 +43,10 @@ impl IndexScheduler {
batch: BatchKind,
) -> Result<Option<Batch>> {
match batch {
BatchKind::ClearAll { ids } => todo!(),
BatchKind::DocumentClear { ids } => todo!(),
BatchKind::DocumentAddition { addition_ids } => todo!(),
BatchKind::DocumentDeletion { deletion_ids } => todo!(),
BatchKind::ClearAllAndSettings {
BatchKind::ClearAndSettings {
other,
settings_ids,
} => todo!(),
@ -73,13 +74,17 @@ impl IndexScheduler {
.collect::<Result<Vec<_>>>()?;
let primary_key = match &document_addition_tasks[0].kind {
KindWithContent::DocumentAddition { primary_key, .. } => primary_key.clone(),
KindWithContent::DocumentAdditionOrUpdate { primary_key, .. } => {
primary_key.clone()
}
_ => unreachable!(),
};
let content_files = document_addition_tasks
.iter()
.map(|task| match task.kind {
KindWithContent::DocumentAddition { content_file, .. } => content_file,
KindWithContent::DocumentAdditionOrUpdate { content_file, .. } => {
content_file
}
_ => unreachable!(),
})
.collect();
@ -87,7 +92,7 @@ impl IndexScheduler {
let settings = settings_tasks
.iter()
.map(|task| match &task.kind {
KindWithContent::Settings { new_settings, .. } => new_settings.to_string(),
KindWithContent::Settings { new_settings, .. } => new_settings.clone(),
_ => unreachable!(),
})
.collect();
@ -102,10 +107,11 @@ impl IndexScheduler {
}))
}
BatchKind::Settings { settings_ids } => todo!(),
BatchKind::DeleteIndex { ids } => todo!(),
BatchKind::CreateIndex { id } => todo!(),
BatchKind::SwapIndex { id } => todo!(),
BatchKind::RenameIndex { id } => todo!(),
BatchKind::IndexCreation { id } => todo!(),
BatchKind::IndexDeletion { ids } => todo!(),
BatchKind::IndexUpdate { id } => todo!(),
BatchKind::IndexSwap { id } => todo!(),
BatchKind::IndexRename { id } => todo!(),
}
}

View File

@ -1,4 +1,6 @@
use anyhow::Result;
use index::{Settings, Unchecked};
use milli::update::IndexDocumentsMethod;
use serde::{Deserialize, Serialize};
use std::path::PathBuf;
use time::OffsetDateTime;
@ -51,67 +53,73 @@ impl Task {
}
}
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum KindWithContent {
DumpExport {
output: PathBuf,
},
Snapshot,
DocumentAddition {
index_name: String,
DocumentAdditionOrUpdate {
index_uid: String,
merge_strategy: IndexDocumentsMethod,
primary_key: Option<String>,
content_file: Uuid,
documents_count: usize,
allow_index_creation: bool,
},
DocumentDeletion {
index_name: String,
index_uid: String,
documents_ids: Vec<String>,
},
ClearAllDocuments {
index_name: String,
DocumentClear {
index_uid: String,
},
Settings {
index_name: String,
// TODO: TAMO: fix the type
new_settings: String,
index_uid: String,
new_settings: Settings<Unchecked>,
is_deletion: bool,
allow_index_creation: bool,
},
RenameIndex {
index_name: String,
new_name: String,
IndexDeletion {
index_uid: String,
},
CreateIndex {
index_name: String,
IndexCreation {
index_uid: String,
primary_key: Option<String>,
},
DeleteIndex {
index_name: String,
IndexUpdate {
index_uid: String,
primary_key: Option<String>,
},
SwapIndex {
IndexRename {
index_uid: String,
new_name: String,
},
IndexSwap {
lhs: String,
rhs: String,
},
CancelTask {
tasks: Vec<TaskId>,
},
DumpExport {
output: PathBuf,
},
Snapshot,
}
impl KindWithContent {
pub fn as_kind(&self) -> Kind {
match self {
KindWithContent::DumpExport { .. } => Kind::DumpExport,
KindWithContent::DocumentAddition { .. } => Kind::DocumentAddition,
KindWithContent::DocumentAdditionOrUpdate { .. } => Kind::DocumentAdditionOrUpdate,
KindWithContent::DocumentDeletion { .. } => Kind::DocumentDeletion,
KindWithContent::ClearAllDocuments { .. } => Kind::ClearAllDocuments,
KindWithContent::RenameIndex { .. } => Kind::RenameIndex,
KindWithContent::CreateIndex { .. } => Kind::CreateIndex,
KindWithContent::DeleteIndex { .. } => Kind::DeleteIndex,
KindWithContent::SwapIndex { .. } => Kind::SwapIndex,
KindWithContent::DocumentClear { .. } => Kind::DocumentClear,
KindWithContent::Settings { .. } => Kind::Settings,
KindWithContent::IndexCreation { .. } => Kind::IndexCreation,
KindWithContent::IndexDeletion { .. } => Kind::IndexDeletion,
KindWithContent::IndexUpdate { .. } => Kind::IndexUpdate,
KindWithContent::IndexRename { .. } => Kind::IndexRename,
KindWithContent::IndexSwap { .. } => Kind::IndexSwap,
KindWithContent::CancelTask { .. } => Kind::CancelTask,
KindWithContent::DumpExport { .. } => Kind::DumpExport,
KindWithContent::Snapshot => Kind::Snapshot,
KindWithContent::Settings {
index_name,
new_settings,
} => todo!(),
}
}
@ -119,26 +127,22 @@ impl KindWithContent {
use KindWithContent::*;
match self {
DocumentAddition {
index_name: _,
content_file: _,
primary_key,
} => {
DocumentAdditionOrUpdate { .. } => {
// TODO: TAMO: persist the file
// content_file.persist();
Ok(())
}
// There is nothing to persist for all these tasks
DumpExport { .. }
DocumentDeletion { .. }
| DocumentClear { .. }
| Settings { .. }
| DocumentDeletion { .. }
| ClearAllDocuments { .. }
| RenameIndex { .. }
| CreateIndex { .. }
| DeleteIndex { .. }
| SwapIndex { .. }
| IndexCreation { .. }
| IndexDeletion { .. }
| IndexUpdate { .. }
| IndexRename { .. }
| IndexSwap { .. }
| CancelTask { .. }
| Snapshot => Ok(()),
| DumpExport { .. }
| Snapshot => Ok(()), // There is nothing to persist for all these tasks
}
}
@ -146,26 +150,23 @@ impl KindWithContent {
use KindWithContent::*;
match self {
DocumentAddition {
index_name: _,
content_file: _,
primary_key,
} => {
DocumentAdditionOrUpdate { .. } => {
// TODO: TAMO: delete the file
// content_file.delete();
Ok(())
}
// There is no data associated with all these tasks
DumpExport { .. }
| Settings { .. }
DocumentAdditionOrUpdate { .. }
| IndexCreation { .. }
| DocumentDeletion { .. }
| ClearAllDocuments { .. }
| RenameIndex { .. }
| CreateIndex { .. }
| DeleteIndex { .. }
| SwapIndex { .. }
| DocumentClear { .. }
| Settings { .. }
| IndexDeletion { .. }
| IndexUpdate { .. }
| IndexRename { .. }
| IndexSwap { .. }
| CancelTask { .. }
| Snapshot => Ok(()),
| DumpExport { .. }
| Snapshot => Ok(()), // There is no data associated with all these tasks
}
}
@ -174,17 +175,18 @@ impl KindWithContent {
match self {
DumpExport { .. } | Snapshot | CancelTask { .. } => None,
DocumentAddition { index_name, .. }
| DocumentDeletion { index_name, .. }
| ClearAllDocuments { index_name }
| CreateIndex { index_name, .. }
| DeleteIndex { index_name } => Some(vec![index_name]),
RenameIndex {
index_name: lhs,
DocumentAdditionOrUpdate { index_uid, .. }
| DocumentDeletion { index_uid, .. }
| DocumentClear { index_uid }
| Settings { index_uid, .. }
| IndexCreation { index_uid, .. }
| IndexUpdate { index_uid, .. }
| IndexDeletion { index_uid } => Some(vec![index_uid]),
IndexRename {
index_uid: lhs,
new_name: rhs,
}
| SwapIndex { lhs, rhs } => Some(vec![lhs, rhs]),
Settings { index_name, .. } => Some(vec![index_name]),
| IndexSwap { lhs, rhs } => Some(vec![lhs, rhs]),
}
}
}
@ -192,15 +194,16 @@ impl KindWithContent {
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum Kind {
CancelTask,
ClearAllDocuments,
CreateIndex,
DeleteIndex,
DocumentAddition,
DocumentAdditionOrUpdate,
DocumentDeletion,
DumpExport,
RenameIndex,
DocumentClear,
Settings,
IndexCreation,
IndexDeletion,
IndexUpdate,
IndexRename,
IndexSwap,
CancelTask,
DumpExport,
Snapshot,
SwapIndex,
}