mirror of
https://github.com/meilisearch/meilisearch.git
synced 2025-01-30 23:13:09 +08:00
Use a ControlFlow in the autobatcher function
This commit is contained in:
parent
f1b1cfdbcc
commit
87212cfd20
@ -1,5 +1,5 @@
|
|||||||
use milli::update::IndexDocumentsMethod::{self, ReplaceDocuments, UpdateDocuments};
|
use milli::update::IndexDocumentsMethod::{self, ReplaceDocuments, UpdateDocuments};
|
||||||
use std::ops::ControlFlow;
|
use std::ops::ControlFlow::{self, Break, Continue};
|
||||||
|
|
||||||
use crate::{task::Kind, TaskId};
|
use crate::{task::Kind, TaskId};
|
||||||
|
|
||||||
@ -42,52 +42,37 @@ pub enum BatchKind {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl BatchKind {
|
impl BatchKind {
|
||||||
/// return true if you must stop right there.
|
/// Returns a `ControlFlow::Break` if you must stop right now.
|
||||||
pub fn new(task_id: TaskId, kind: Kind) -> (Self, bool) {
|
pub fn new(task_id: TaskId, kind: Kind) -> ControlFlow<BatchKind, BatchKind> {
|
||||||
match kind {
|
match kind {
|
||||||
Kind::IndexCreation => (BatchKind::IndexCreation { id: task_id }, true),
|
Kind::IndexCreation => Break(BatchKind::IndexCreation { id: task_id }),
|
||||||
Kind::IndexDeletion => (BatchKind::IndexDeletion { ids: vec![task_id] }, true),
|
Kind::IndexDeletion => Break(BatchKind::IndexDeletion { ids: vec![task_id] }),
|
||||||
Kind::IndexUpdate => (BatchKind::IndexUpdate { id: task_id }, true),
|
Kind::IndexUpdate => Break(BatchKind::IndexUpdate { id: task_id }),
|
||||||
Kind::IndexSwap => (BatchKind::IndexSwap { id: task_id }, true),
|
Kind::IndexSwap => Break(BatchKind::IndexSwap { id: task_id }),
|
||||||
Kind::DocumentClear => (BatchKind::DocumentClear { ids: vec![task_id] }, false),
|
Kind::DocumentClear => Continue(BatchKind::DocumentClear { ids: vec![task_id] }),
|
||||||
Kind::DocumentAddition => (
|
Kind::DocumentAddition => Continue(BatchKind::DocumentImport {
|
||||||
BatchKind::DocumentImport {
|
method: ReplaceDocuments,
|
||||||
method: ReplaceDocuments,
|
import_ids: vec![task_id],
|
||||||
import_ids: vec![task_id],
|
}),
|
||||||
},
|
Kind::DocumentUpdate => Continue(BatchKind::DocumentImport {
|
||||||
false,
|
method: UpdateDocuments,
|
||||||
),
|
import_ids: vec![task_id],
|
||||||
Kind::DocumentUpdate => (
|
}),
|
||||||
BatchKind::DocumentImport {
|
Kind::DocumentDeletion => Continue(BatchKind::DocumentDeletion {
|
||||||
method: UpdateDocuments,
|
deletion_ids: vec![task_id],
|
||||||
import_ids: vec![task_id],
|
}),
|
||||||
},
|
Kind::Settings => Continue(BatchKind::Settings {
|
||||||
false,
|
settings_ids: vec![task_id],
|
||||||
),
|
}),
|
||||||
Kind::DocumentDeletion => (
|
|
||||||
BatchKind::DocumentDeletion {
|
|
||||||
deletion_ids: vec![task_id],
|
|
||||||
},
|
|
||||||
false,
|
|
||||||
),
|
|
||||||
Kind::Settings => (
|
|
||||||
BatchKind::Settings {
|
|
||||||
settings_ids: vec![task_id],
|
|
||||||
},
|
|
||||||
false,
|
|
||||||
),
|
|
||||||
|
|
||||||
Kind::DumpExport | Kind::Snapshot | Kind::CancelTask => unreachable!(),
|
Kind::DumpExport | Kind::Snapshot | Kind::CancelTask => unreachable!(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Return true if you must stop.
|
/// Returns a `ControlFlow::Break` if you must stop right now.
|
||||||
fn accumulate(self, id: TaskId, kind: Kind) -> ControlFlow<Self, Self> {
|
fn accumulate(self, id: TaskId, kind: Kind) -> ControlFlow<BatchKind, BatchKind> {
|
||||||
match (self, kind) {
|
match (self, kind) {
|
||||||
// We don't batch any of these operations
|
// We don't batch any of these operations
|
||||||
(this, Kind::IndexCreation | Kind::IndexUpdate | Kind::IndexSwap) => {
|
(this, Kind::IndexCreation | Kind::IndexUpdate | Kind::IndexSwap) => Break(this),
|
||||||
ControlFlow::Break(this)
|
|
||||||
}
|
|
||||||
// The index deletion can batch with everything but must stop after
|
// The index deletion can batch with everything but must stop after
|
||||||
(
|
(
|
||||||
BatchKind::DocumentClear { mut ids }
|
BatchKind::DocumentClear { mut ids }
|
||||||
@ -104,7 +89,7 @@ impl BatchKind {
|
|||||||
Kind::IndexDeletion,
|
Kind::IndexDeletion,
|
||||||
) => {
|
) => {
|
||||||
ids.push(id);
|
ids.push(id);
|
||||||
ControlFlow::Break(BatchKind::IndexDeletion { ids })
|
Break(BatchKind::IndexDeletion { ids })
|
||||||
}
|
}
|
||||||
(
|
(
|
||||||
BatchKind::ClearAndSettings {
|
BatchKind::ClearAndSettings {
|
||||||
@ -120,7 +105,7 @@ impl BatchKind {
|
|||||||
) => {
|
) => {
|
||||||
ids.push(id);
|
ids.push(id);
|
||||||
ids.append(&mut other);
|
ids.append(&mut other);
|
||||||
ControlFlow::Break(BatchKind::IndexDeletion { ids })
|
Break(BatchKind::IndexDeletion { ids })
|
||||||
}
|
}
|
||||||
|
|
||||||
(
|
(
|
||||||
@ -128,12 +113,12 @@ impl BatchKind {
|
|||||||
Kind::DocumentClear | Kind::DocumentDeletion,
|
Kind::DocumentClear | Kind::DocumentDeletion,
|
||||||
) => {
|
) => {
|
||||||
ids.push(id);
|
ids.push(id);
|
||||||
ControlFlow::Continue(BatchKind::DocumentClear { ids })
|
Continue(BatchKind::DocumentClear { ids })
|
||||||
}
|
}
|
||||||
(
|
(
|
||||||
this @ BatchKind::DocumentClear { .. },
|
this @ BatchKind::DocumentClear { .. },
|
||||||
Kind::DocumentAddition | Kind::DocumentUpdate | Kind::Settings,
|
Kind::DocumentAddition | Kind::DocumentUpdate | Kind::Settings,
|
||||||
) => ControlFlow::Break(this),
|
) => Break(this),
|
||||||
(
|
(
|
||||||
BatchKind::DocumentImport {
|
BatchKind::DocumentImport {
|
||||||
method: _,
|
method: _,
|
||||||
@ -142,7 +127,7 @@ impl BatchKind {
|
|||||||
Kind::DocumentClear,
|
Kind::DocumentClear,
|
||||||
) => {
|
) => {
|
||||||
ids.push(id);
|
ids.push(id);
|
||||||
ControlFlow::Continue(BatchKind::DocumentClear { ids })
|
Continue(BatchKind::DocumentClear { ids })
|
||||||
}
|
}
|
||||||
|
|
||||||
// we can autobatch the same kind of document additions / updates
|
// we can autobatch the same kind of document additions / updates
|
||||||
@ -154,7 +139,7 @@ impl BatchKind {
|
|||||||
Kind::DocumentAddition,
|
Kind::DocumentAddition,
|
||||||
) => {
|
) => {
|
||||||
import_ids.push(id);
|
import_ids.push(id);
|
||||||
ControlFlow::Continue(BatchKind::DocumentImport {
|
Continue(BatchKind::DocumentImport {
|
||||||
method: ReplaceDocuments,
|
method: ReplaceDocuments,
|
||||||
import_ids,
|
import_ids,
|
||||||
})
|
})
|
||||||
@ -167,7 +152,7 @@ impl BatchKind {
|
|||||||
Kind::DocumentUpdate,
|
Kind::DocumentUpdate,
|
||||||
) => {
|
) => {
|
||||||
import_ids.push(id);
|
import_ids.push(id);
|
||||||
ControlFlow::Continue(BatchKind::DocumentImport {
|
Continue(BatchKind::DocumentImport {
|
||||||
method: UpdateDocuments,
|
method: UpdateDocuments,
|
||||||
import_ids,
|
import_ids,
|
||||||
})
|
})
|
||||||
@ -177,9 +162,9 @@ impl BatchKind {
|
|||||||
(
|
(
|
||||||
this @ BatchKind::DocumentImport { .. },
|
this @ BatchKind::DocumentImport { .. },
|
||||||
Kind::DocumentDeletion | Kind::DocumentAddition | Kind::DocumentUpdate,
|
Kind::DocumentDeletion | Kind::DocumentAddition | Kind::DocumentUpdate,
|
||||||
) => ControlFlow::Break(this),
|
) => Break(this),
|
||||||
(BatchKind::DocumentImport { method, import_ids }, Kind::Settings) => {
|
(BatchKind::DocumentImport { method, import_ids }, Kind::Settings) => {
|
||||||
ControlFlow::Continue(BatchKind::SettingsAndDocumentImport {
|
Continue(BatchKind::SettingsAndDocumentImport {
|
||||||
settings_ids: vec![id],
|
settings_ids: vec![id],
|
||||||
method,
|
method,
|
||||||
import_ids,
|
import_ids,
|
||||||
@ -188,20 +173,20 @@ impl BatchKind {
|
|||||||
|
|
||||||
(BatchKind::DocumentDeletion { mut deletion_ids }, Kind::DocumentClear) => {
|
(BatchKind::DocumentDeletion { mut deletion_ids }, Kind::DocumentClear) => {
|
||||||
deletion_ids.push(id);
|
deletion_ids.push(id);
|
||||||
ControlFlow::Continue(BatchKind::DocumentClear { ids: deletion_ids })
|
Continue(BatchKind::DocumentClear { ids: deletion_ids })
|
||||||
}
|
}
|
||||||
(
|
(
|
||||||
this @ BatchKind::DocumentDeletion { .. },
|
this @ BatchKind::DocumentDeletion { .. },
|
||||||
Kind::DocumentAddition | Kind::DocumentUpdate,
|
Kind::DocumentAddition | Kind::DocumentUpdate,
|
||||||
) => ControlFlow::Break(this),
|
) => Break(this),
|
||||||
(BatchKind::DocumentDeletion { mut deletion_ids }, Kind::DocumentDeletion) => {
|
(BatchKind::DocumentDeletion { mut deletion_ids }, Kind::DocumentDeletion) => {
|
||||||
deletion_ids.push(id);
|
deletion_ids.push(id);
|
||||||
ControlFlow::Continue(BatchKind::DocumentDeletion { deletion_ids })
|
Continue(BatchKind::DocumentDeletion { deletion_ids })
|
||||||
}
|
}
|
||||||
(this @ BatchKind::DocumentDeletion { .. }, Kind::Settings) => ControlFlow::Break(this),
|
(this @ BatchKind::DocumentDeletion { .. }, Kind::Settings) => Break(this),
|
||||||
|
|
||||||
(BatchKind::Settings { settings_ids }, Kind::DocumentClear) => {
|
(BatchKind::Settings { settings_ids }, Kind::DocumentClear) => {
|
||||||
ControlFlow::Continue(BatchKind::ClearAndSettings {
|
Continue(BatchKind::ClearAndSettings {
|
||||||
settings_ids: settings_ids,
|
settings_ids: settings_ids,
|
||||||
other: vec![id],
|
other: vec![id],
|
||||||
})
|
})
|
||||||
@ -209,10 +194,10 @@ impl BatchKind {
|
|||||||
(
|
(
|
||||||
this @ BatchKind::Settings { .. },
|
this @ BatchKind::Settings { .. },
|
||||||
Kind::DocumentAddition | Kind::DocumentUpdate | Kind::DocumentDeletion,
|
Kind::DocumentAddition | Kind::DocumentUpdate | Kind::DocumentDeletion,
|
||||||
) => ControlFlow::Break(this),
|
) => Break(this),
|
||||||
(BatchKind::Settings { mut settings_ids }, Kind::Settings) => {
|
(BatchKind::Settings { mut settings_ids }, Kind::Settings) => {
|
||||||
settings_ids.push(id);
|
settings_ids.push(id);
|
||||||
ControlFlow::Continue(BatchKind::Settings { settings_ids })
|
Continue(BatchKind::Settings { settings_ids })
|
||||||
}
|
}
|
||||||
|
|
||||||
(
|
(
|
||||||
@ -223,7 +208,7 @@ impl BatchKind {
|
|||||||
Kind::DocumentClear,
|
Kind::DocumentClear,
|
||||||
) => {
|
) => {
|
||||||
other.push(id);
|
other.push(id);
|
||||||
ControlFlow::Continue(BatchKind::ClearAndSettings {
|
Continue(BatchKind::ClearAndSettings {
|
||||||
other,
|
other,
|
||||||
settings_ids,
|
settings_ids,
|
||||||
})
|
})
|
||||||
@ -231,7 +216,7 @@ impl BatchKind {
|
|||||||
(
|
(
|
||||||
this @ BatchKind::ClearAndSettings { .. },
|
this @ BatchKind::ClearAndSettings { .. },
|
||||||
Kind::DocumentAddition | Kind::DocumentUpdate,
|
Kind::DocumentAddition | Kind::DocumentUpdate,
|
||||||
) => ControlFlow::Break(this),
|
) => Break(this),
|
||||||
(
|
(
|
||||||
BatchKind::ClearAndSettings {
|
BatchKind::ClearAndSettings {
|
||||||
mut other,
|
mut other,
|
||||||
@ -240,7 +225,7 @@ impl BatchKind {
|
|||||||
Kind::DocumentDeletion,
|
Kind::DocumentDeletion,
|
||||||
) => {
|
) => {
|
||||||
other.push(id);
|
other.push(id);
|
||||||
ControlFlow::Continue(BatchKind::ClearAndSettings {
|
Continue(BatchKind::ClearAndSettings {
|
||||||
other,
|
other,
|
||||||
settings_ids,
|
settings_ids,
|
||||||
})
|
})
|
||||||
@ -253,7 +238,7 @@ impl BatchKind {
|
|||||||
Kind::Settings,
|
Kind::Settings,
|
||||||
) => {
|
) => {
|
||||||
settings_ids.push(id);
|
settings_ids.push(id);
|
||||||
ControlFlow::Continue(BatchKind::ClearAndSettings {
|
Continue(BatchKind::ClearAndSettings {
|
||||||
other,
|
other,
|
||||||
settings_ids,
|
settings_ids,
|
||||||
})
|
})
|
||||||
@ -267,7 +252,7 @@ impl BatchKind {
|
|||||||
Kind::DocumentClear,
|
Kind::DocumentClear,
|
||||||
) => {
|
) => {
|
||||||
other.push(id);
|
other.push(id);
|
||||||
ControlFlow::Continue(BatchKind::ClearAndSettings {
|
Continue(BatchKind::ClearAndSettings {
|
||||||
settings_ids,
|
settings_ids,
|
||||||
other,
|
other,
|
||||||
})
|
})
|
||||||
@ -283,7 +268,7 @@ impl BatchKind {
|
|||||||
Kind::DocumentAddition,
|
Kind::DocumentAddition,
|
||||||
) => {
|
) => {
|
||||||
import_ids.push(id);
|
import_ids.push(id);
|
||||||
ControlFlow::Continue(BatchKind::SettingsAndDocumentImport {
|
Continue(BatchKind::SettingsAndDocumentImport {
|
||||||
settings_ids,
|
settings_ids,
|
||||||
method: ReplaceDocuments,
|
method: ReplaceDocuments,
|
||||||
import_ids,
|
import_ids,
|
||||||
@ -298,7 +283,7 @@ impl BatchKind {
|
|||||||
Kind::DocumentUpdate,
|
Kind::DocumentUpdate,
|
||||||
) => {
|
) => {
|
||||||
import_ids.push(id);
|
import_ids.push(id);
|
||||||
ControlFlow::Continue(BatchKind::SettingsAndDocumentImport {
|
Continue(BatchKind::SettingsAndDocumentImport {
|
||||||
settings_ids,
|
settings_ids,
|
||||||
method: UpdateDocuments,
|
method: UpdateDocuments,
|
||||||
import_ids,
|
import_ids,
|
||||||
@ -309,7 +294,7 @@ impl BatchKind {
|
|||||||
(
|
(
|
||||||
this @ BatchKind::SettingsAndDocumentImport { .. },
|
this @ BatchKind::SettingsAndDocumentImport { .. },
|
||||||
Kind::DocumentDeletion | Kind::DocumentAddition | Kind::DocumentUpdate,
|
Kind::DocumentDeletion | Kind::DocumentAddition | Kind::DocumentUpdate,
|
||||||
) => ControlFlow::Break(this),
|
) => Break(this),
|
||||||
(
|
(
|
||||||
BatchKind::SettingsAndDocumentImport {
|
BatchKind::SettingsAndDocumentImport {
|
||||||
mut settings_ids,
|
mut settings_ids,
|
||||||
@ -319,7 +304,7 @@ impl BatchKind {
|
|||||||
Kind::Settings,
|
Kind::Settings,
|
||||||
) => {
|
) => {
|
||||||
settings_ids.push(id);
|
settings_ids.push(id);
|
||||||
ControlFlow::Continue(BatchKind::SettingsAndDocumentImport {
|
Continue(BatchKind::SettingsAndDocumentImport {
|
||||||
settings_ids,
|
settings_ids,
|
||||||
method,
|
method,
|
||||||
import_ids,
|
import_ids,
|
||||||
@ -342,15 +327,15 @@ impl BatchKind {
|
|||||||
pub fn autobatch(enqueued: Vec<(TaskId, Kind)>) -> Option<BatchKind> {
|
pub fn autobatch(enqueued: Vec<(TaskId, Kind)>) -> Option<BatchKind> {
|
||||||
let mut enqueued = enqueued.into_iter();
|
let mut enqueued = enqueued.into_iter();
|
||||||
let (id, kind) = enqueued.next()?;
|
let (id, kind) = enqueued.next()?;
|
||||||
let (mut acc, is_finished) = BatchKind::new(id, kind);
|
let mut acc = match BatchKind::new(id, kind) {
|
||||||
if is_finished {
|
Continue(acc) => acc,
|
||||||
return Some(acc);
|
Break(acc) => return Some(acc),
|
||||||
}
|
};
|
||||||
|
|
||||||
for (id, kind) in enqueued {
|
for (id, kind) in enqueued {
|
||||||
acc = match acc.accumulate(id, kind) {
|
acc = match acc.accumulate(id, kind) {
|
||||||
ControlFlow::Continue(acc) => acc,
|
Continue(acc) => acc,
|
||||||
ControlFlow::Break(acc) => return Some(acc),
|
Break(acc) => return Some(acc),
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user