move the autobatcher logic to another file

This commit is contained in:
Irevoire 2022-09-09 12:16:19 +02:00 committed by Clément Renault
parent f638774764
commit 448f44f631
No known key found for this signature in database
GPG Key ID: 92ADA4E935E71FA4
3 changed files with 289 additions and 302 deletions

View File

@ -0,0 +1,231 @@
use crate::{task::Kind, TaskId};
pub enum BatchKind {
ClearAll {
ids: Vec<TaskId>,
},
DocumentAddition {
addition_ids: Vec<TaskId>,
},
DocumentDeletion {
deletion_ids: Vec<TaskId>,
},
ClearAllAndSettings {
other: Vec<TaskId>,
settings_ids: Vec<TaskId>,
},
SettingsAndDocumentAddition {
settings_ids: Vec<TaskId>,
addition_ids: Vec<TaskId>,
},
Settings {
settings_ids: Vec<TaskId>,
},
DeleteIndex {
ids: Vec<TaskId>,
},
CreateIndex {
id: TaskId,
},
SwapIndex {
id: TaskId,
},
RenameIndex {
id: TaskId,
},
}
impl BatchKind {
/// return true if you must stop right there.
pub fn new(task_id: TaskId, kind: Kind) -> (Self, bool) {
match kind {
Kind::CreateIndex => (BatchKind::CreateIndex { id: task_id }, true),
Kind::DeleteIndex => (BatchKind::DeleteIndex { ids: vec![task_id] }, true),
Kind::RenameIndex => (BatchKind::RenameIndex { id: task_id }, true),
Kind::SwapIndex => (BatchKind::SwapIndex { id: task_id }, true),
Kind::ClearAllDocuments => (BatchKind::ClearAll { ids: vec![task_id] }, false),
Kind::DocumentAddition => (
BatchKind::DocumentAddition {
addition_ids: vec![task_id],
},
false,
),
Kind::DocumentDeletion => (
BatchKind::DocumentDeletion {
deletion_ids: vec![task_id],
},
false,
),
Kind::Settings => (
BatchKind::Settings {
settings_ids: vec![task_id],
},
false,
),
Kind::DumpExport | Kind::Snapshot | Kind::CancelTask => unreachable!(),
}
}
/// Return true if you must stop.
fn accumulate(&mut self, id: TaskId, kind: Kind) -> bool {
match (self, kind) {
// must handle the deleteIndex
(_, Kind::CreateIndex | Kind::RenameIndex | Kind::SwapIndex) => true,
(BatchKind::ClearAll { ids }, Kind::ClearAllDocuments | Kind::DocumentDeletion) => {
ids.push(id);
false
}
(BatchKind::ClearAll { .. }, Kind::DocumentAddition | Kind::Settings) => true,
(BatchKind::DocumentAddition { addition_ids }, Kind::ClearAllDocuments) => {
addition_ids.push(id);
*self = BatchKind::ClearAll {
ids: addition_ids.clone(),
};
false
}
(BatchKind::DocumentAddition { addition_ids }, Kind::DocumentAddition) => {
addition_ids.push(id);
false
}
(BatchKind::DocumentAddition { .. }, Kind::DocumentDeletion) => true,
(BatchKind::DocumentAddition { addition_ids }, Kind::Settings) => {
*self = BatchKind::SettingsAndDocumentAddition {
settings_ids: vec![id],
addition_ids: addition_ids.clone(),
};
false
}
(BatchKind::DocumentDeletion { deletion_ids }, Kind::ClearAllDocuments) => {
deletion_ids.push(id);
*self = BatchKind::ClearAll {
ids: deletion_ids.clone(),
};
false
}
(BatchKind::DocumentDeletion { .. }, Kind::DocumentAddition) => true,
(BatchKind::DocumentDeletion { deletion_ids }, Kind::DocumentDeletion) => {
deletion_ids.push(id);
false
}
(BatchKind::DocumentDeletion { .. }, Kind::Settings) => true,
(BatchKind::Settings { settings_ids }, Kind::ClearAllDocuments) => {
*self = BatchKind::ClearAllAndSettings {
settings_ids: settings_ids.clone(),
other: vec![id],
};
false
}
(BatchKind::Settings { .. }, Kind::DocumentAddition) => true,
(BatchKind::Settings { .. }, Kind::DocumentDeletion) => true,
(BatchKind::Settings { settings_ids }, Kind::Settings) => {
settings_ids.push(id);
false
}
(
BatchKind::ClearAllAndSettings {
other,
settings_ids,
},
Kind::ClearAllDocuments,
) => {
other.push(id);
false
}
(BatchKind::ClearAllAndSettings { .. }, Kind::DocumentAddition) => true,
(
BatchKind::ClearAllAndSettings {
other,
settings_ids,
},
Kind::DocumentDeletion,
) => {
other.push(id);
false
}
(
BatchKind::ClearAllAndSettings {
settings_ids,
other,
},
Kind::Settings,
) => {
settings_ids.push(id);
false
}
(
BatchKind::SettingsAndDocumentAddition {
settings_ids,
addition_ids,
},
Kind::ClearAllDocuments,
) => {
addition_ids.push(id);
*self = BatchKind::ClearAllAndSettings {
settings_ids: settings_ids.clone(),
other: addition_ids.clone(),
};
false
}
(
BatchKind::SettingsAndDocumentAddition {
settings_ids,
addition_ids,
},
Kind::DocumentAddition,
) => {
addition_ids.push(id);
false
}
(
BatchKind::SettingsAndDocumentAddition {
settings_ids,
addition_ids,
},
Kind::DocumentDeletion,
) => true,
(
BatchKind::SettingsAndDocumentAddition {
settings_ids,
addition_ids,
},
Kind::Settings,
) => {
settings_ids.push(id);
false
}
(_, Kind::CancelTask | Kind::DumpExport | Kind::Snapshot) => unreachable!(),
(
BatchKind::CreateIndex { .. }
| BatchKind::DeleteIndex { .. }
| BatchKind::SwapIndex { .. }
| BatchKind::RenameIndex { .. },
_,
) => {
unreachable!()
}
}
}
}
pub fn autobatch(enqueued: Vec<(TaskId, Kind)>) -> Option<BatchKind> {
let mut enqueued = enqueued.into_iter();
let (id, kind) = enqueued.next()?;
let (mut acc, is_finished) = BatchKind::new(id, kind);
if is_finished {
return Some(acc);
}
for (id, kind) in enqueued {
if acc.accumulate(id, kind) {
break;
}
}
Some(acc)
}

View File

@ -1,7 +1,4 @@
use crate::{ use crate::{autobatcher::BatchKind, task::Status, Error, IndexScheduler, Result, TaskId};
task::{KindWithContent, Status},
Error, IndexScheduler, Result, TaskId,
};
use milli::{heed::RoTxn, update::IndexDocumentsMethod}; use milli::{heed::RoTxn, update::IndexDocumentsMethod};
use uuid::Uuid; use uuid::Uuid;
@ -14,47 +11,6 @@ pub(crate) enum Batch {
IndexSpecific { index_uid: String, kind: BatchKind }, IndexSpecific { index_uid: String, kind: BatchKind },
} }
impl IndexScheduler {
/*
pub(crate) fn process_batch(&self, wtxn: &mut RwTxn, batch: Batch) -> Result<Vec<Task>> {
match batch {
Batch::DocumentAddition {
tasks,
primary_key,
content_files,
index_uid,
} => {
let index = self.create_index(wtxn, &index_uid)?;
let ret = index.update_documents(
IndexDocumentsMethod::UpdateDocuments,
primary_key,
self.file_store,
content_files,
)?;
assert_eq!(ret.len(), tasks.len(), "Update documents must return the same number of `Result` than the number of tasks.");
Ok(tasks
.into_iter()
.zip(ret)
.map(|(mut task, res)| match res {
Ok(info) => {
task.status = Status::Succeeded;
task.info = Some(info.to_string());
}
Err(error) => {
task.status = Status::Failed;
task.error = Some(error.to_string());
}
})
.collect())
}
_ => unreachable!(),
}
}
*/
}
impl IndexScheduler { impl IndexScheduler {
/// Create the next batch to be processed; /// Create the next batch to be processed;
/// 1. We get the *last* task to cancel. /// 1. We get the *last* task to cancel.
@ -109,12 +65,12 @@ impl IndexScheduler {
}) })
.collect::<Result<Vec<_>>>()?; .collect::<Result<Vec<_>>>()?;
return Ok( return Ok(crate::autobatcher::autobatch(enqueued).map(|batch_kind| {
autobatcher(enqueued).map(|batch_kind| Batch::IndexSpecific { Batch::IndexSpecific {
index_uid: index_name.to_string(), index_uid: index_name.to_string(),
kind: batch_kind, kind: batch_kind,
}), }
); }));
} }
// If we found no tasks then we were notified for something that got autobatched // If we found no tasks then we were notified for something that got autobatched
@ -122,33 +78,60 @@ impl IndexScheduler {
Ok(None) Ok(None)
} }
/// Batch all the consecutive tasks coming next that shares the same `Kind` pub(crate) fn process_batch(&self, wtxn: &mut RwTxn, batch: Batch) -> Result<Vec<Task>> {
/// for a specific index. There *MUST* be at least ONE task of this kind. match batch {
fn batch_contiguous_kind(&self, rtxn: &RoTxn, index: &str, kind: Kind) -> Result<Batch> { Batch::IndexSpecific { index_uid, kind } => {
let enqueued = &self.get_status(rtxn, Status::Enqueued)?; let index = create_index();
match kind {
BatchKind::ClearAll { ids } => todo!(),
BatchKind::DocumentAddition { addition_ids } => {
let index = self.create_index(wtxn, &index_uid)?;
let ret = index.update_documents(
IndexDocumentsMethod::UpdateDocuments,
None, // TODO primary key
self.file_store,
content_files,
)?;
// [1, 2, 4, 5] assert_eq!(ret.len(), tasks.len(), "Update documents must return the same number of `Result` than the number of tasks.");
let index_tasks = self.get_index(rtxn, &index)? & enqueued;
// [1, 2, 5]
let tasks_kind = &index_tasks & self.get_kind(rtxn, kind)?;
// [4]
let not_kind = &index_tasks - &tasks_kind;
// [1, 2] Ok(tasks
let mut to_process = tasks_kind.clone(); .into_iter()
if let Some(max) = not_kind.max() { .zip(ret)
// it's safe to unwrap since we already ensured there .map(|(mut task, res)| match res {
// was AT LEAST one task with the document addition tasks_kind. Ok(info) => {
to_process.remove_range(tasks_kind.min().unwrap()..max); task.status = Status::Succeeded;
task.info = Some(info.to_string());
}
Err(error) => {
task.status = Status::Failed;
task.error = Some(error.to_string());
} }
Ok(Batch::Contiguous {
tasks: self.get_existing_tasks(rtxn, to_process)?,
kind,
}) })
.collect())
}
BatchKind::DocumentDeletion { deletion_ids } => todo!(),
BatchKind::ClearAllAndSettings {
other,
settings_ids,
} => todo!(),
BatchKind::SettingsAndDocumentAddition {
settings_ids,
addition_ids,
} => todo!(),
BatchKind::Settings { settings_ids } => todo!(),
BatchKind::DeleteIndex { ids } => todo!(),
BatchKind::CreateIndex { id } => todo!(),
BatchKind::SwapIndex { id } => todo!(),
BatchKind::RenameIndex { id } => todo!(),
}
}
_ => unreachable!(),
}
} }
} }
/*
impl Batch { impl Batch {
pub fn task_ids(&self) -> impl IntoIterator<Item = TaskId> + '_ { pub fn task_ids(&self) -> impl IntoIterator<Item = TaskId> + '_ {
match self { match self {
@ -162,233 +145,4 @@ impl Batch {
} }
} }
} }
*/
pub(crate) enum BatchKind {
ClearAll {
ids: Vec<TaskId>,
},
DocumentAddition {
addition_ids: Vec<TaskId>,
},
DocumentDeletion {
deletion_ids: Vec<TaskId>,
},
ClearAllAndSettings {
other: Vec<TaskId>,
settings_ids: Vec<TaskId>,
},
SettingsAndDocumentAddition {
settings_ids: Vec<TaskId>,
addition_ids: Vec<TaskId>,
},
Settings {
settings_ids: Vec<TaskId>,
},
DeleteIndex {
ids: Vec<TaskId>,
},
CreateIndex {
id: TaskId,
},
SwapIndex {
id: TaskId,
},
RenameIndex {
id: TaskId,
},
}
impl BatchKind {
/// return true if you must stop right there.
pub fn new(task_id: TaskId, kind: Kind) -> (Self, bool) {
match kind {
Kind::CreateIndex => (BatchKind::CreateIndex { id: task_id }, true),
Kind::DeleteIndex => (BatchKind::DeleteIndex { ids: vec![task_id] }, true),
Kind::RenameIndex => (BatchKind::RenameIndex { id: task_id }, true),
Kind::SwapIndex => (BatchKind::SwapIndex { id: task_id }, true),
Kind::ClearAllDocuments => (BatchKind::ClearAll { ids: vec![task_id] }, false),
Kind::DocumentAddition => (
BatchKind::DocumentAddition {
addition_ids: vec![task_id],
},
false,
),
Kind::DocumentDeletion => (
BatchKind::DocumentDeletion {
deletion_ids: vec![task_id],
},
false,
),
Kind::Settings => (
BatchKind::Settings {
settings_ids: vec![task_id],
},
false,
),
Kind::DumpExport | Kind::Snapshot | Kind::CancelTask => unreachable!(),
}
}
/// Return true if you must stop.
fn accumulate(&mut self, id: TaskId, kind: Kind) -> bool {
match (self, kind) {
// must handle the deleteIndex
(_, Kind::CreateIndex | Kind::RenameIndex | Kind::SwapIndex) => true,
(BatchKind::ClearAll { ids }, Kind::ClearAllDocuments | Kind::DocumentDeletion) => {
ids.push(id);
false
}
(BatchKind::ClearAll { .. }, Kind::DocumentAddition | Kind::Settings) => true,
(BatchKind::DocumentAddition { addition_ids }, Kind::ClearAllDocuments) => {
addition_ids.push(id);
*self = BatchKind::ClearAll {
ids: addition_ids.clone(),
};
false
}
(BatchKind::DocumentAddition { addition_ids }, Kind::DocumentAddition) => {
addition_ids.push(id);
false
}
(BatchKind::DocumentAddition { .. }, Kind::DocumentDeletion) => true,
(BatchKind::DocumentAddition { addition_ids }, Kind::Settings) => {
*self = BatchKind::SettingsAndDocumentAddition {
settings_ids: vec![id],
addition_ids: addition_ids.clone(),
};
false
}
(BatchKind::DocumentDeletion { deletion_ids }, Kind::ClearAllDocuments) => {
deletion_ids.push(id);
*self = BatchKind::ClearAll {
ids: deletion_ids.clone(),
};
false
}
(BatchKind::DocumentDeletion { .. }, Kind::DocumentAddition) => true,
(BatchKind::DocumentDeletion { deletion_ids }, Kind::DocumentDeletion) => {
deletion_ids.push(id);
false
}
(BatchKind::DocumentDeletion { .. }, Kind::Settings) => true,
(BatchKind::Settings { settings_ids }, Kind::ClearAllDocuments) => {
*self = BatchKind::ClearAllAndSettings {
settings_ids: settings_ids.clone(),
other: vec![id],
};
false
}
(BatchKind::Settings { .. }, Kind::DocumentAddition) => true,
(BatchKind::Settings { .. }, Kind::DocumentDeletion) => true,
(BatchKind::Settings { settings_ids }, Kind::Settings) => {
settings_ids.push(id);
false
}
(
BatchKind::ClearAllAndSettings {
other,
settings_ids,
},
Kind::ClearAllDocuments,
) => {
other.push(id);
false
}
(BatchKind::ClearAllAndSettings { .. }, Kind::DocumentAddition) => true,
(
BatchKind::ClearAllAndSettings {
other,
settings_ids,
},
Kind::DocumentDeletion,
) => {
other.push(id);
false
}
(
BatchKind::ClearAllAndSettings {
settings_ids,
other,
},
Kind::Settings,
) => {
settings_ids.push(id);
false
}
(
BatchKind::SettingsAndDocumentAddition {
settings_ids,
addition_ids,
},
Kind::ClearAllDocuments,
) => {
addition_ids.push(id);
*self = BatchKind::ClearAllAndSettings {
settings_ids: settings_ids.clone(),
other: addition_ids.clone(),
};
false
}
(
BatchKind::SettingsAndDocumentAddition {
settings_ids,
addition_ids,
},
Kind::DocumentAddition,
) => {
addition_ids.push(id);
false
}
(
BatchKind::SettingsAndDocumentAddition {
settings_ids,
addition_ids,
},
Kind::DocumentDeletion,
) => true,
(
BatchKind::SettingsAndDocumentAddition {
settings_ids,
addition_ids,
},
Kind::Settings,
) => {
settings_ids.push(id);
false
}
(_, Kind::CancelTask | Kind::DumpExport | Kind::Snapshot) => unreachable!(),
(
BatchKind::CreateIndex { .. }
| BatchKind::DeleteIndex { .. }
| BatchKind::SwapIndex { .. }
| BatchKind::RenameIndex { .. },
_,
) => {
unreachable!()
}
}
}
}
pub fn autobatcher(enqueued: Vec<(TaskId, Kind)>) -> Option<BatchKind> {
let mut enqueued = enqueued.into_iter();
let (id, kind) = enqueued.next()?;
let (mut acc, is_finished) = BatchKind::new(id, kind);
if is_finished {
return Some(acc);
}
for (id, kind) in enqueued {
if acc.accumulate(id, kind) {
break;
}
}
Some(acc)
}

View File

@ -1,3 +1,4 @@
mod autobatcher;
mod batch; mod batch;
mod document_formats; mod document_formats;
pub mod error; pub mod error;
@ -219,7 +220,7 @@ impl IndexScheduler {
continue; continue;
} }
}; };
let mut batch = match self.get_next_batch(&wtxn) { let mut batch = match self.create_next_batch(&wtxn) {
Ok(batch) => batch, Ok(batch) => batch,
Err(e) => { Err(e) => {
log::error!("{}", e); log::error!("{}", e);
@ -227,7 +228,7 @@ impl IndexScheduler {
} }
}; };
let res = self.process_batch(&mut wtxn, &mut batch); let res = self.process_batch(&mut wtxn, batch);
// TODO: TAMO: do this later // TODO: TAMO: do this later
// must delete the file on disk // must delete the file on disk
@ -245,6 +246,7 @@ impl IndexScheduler {
} }
} }
#[cfg(truc)]
fn process_batch(&self, wtxn: &mut RwTxn, batch: &mut Batch) -> Result<()> { fn process_batch(&self, wtxn: &mut RwTxn, batch: &mut Batch) -> Result<()> {
match batch { match batch {
Batch::One(task) => match &task.kind { Batch::One(task) => match &task.kind {