mirror of
https://github.com/meilisearch/meilisearch.git
synced 2025-02-07 11:26:16 +08:00
Make the auto-batcher batche replacement with updates
This commit is contained in:
parent
de98656ed1
commit
d018346f18
@ -5,12 +5,8 @@ tasks affecting a single index into a [batch](crate::batch::Batch).
|
|||||||
The main function of the autobatcher is [`next_autobatch`].
|
The main function of the autobatcher is [`next_autobatch`].
|
||||||
*/
|
*/
|
||||||
|
|
||||||
use std::ops::ControlFlow::{self, Break, Continue};
|
|
||||||
|
|
||||||
use meilisearch_types::milli::update::IndexDocumentsMethod::{
|
|
||||||
self, ReplaceDocuments, UpdateDocuments,
|
|
||||||
};
|
|
||||||
use meilisearch_types::tasks::TaskId;
|
use meilisearch_types::tasks::TaskId;
|
||||||
|
use std::ops::ControlFlow::{self, Break, Continue};
|
||||||
|
|
||||||
use crate::KindWithContent;
|
use crate::KindWithContent;
|
||||||
|
|
||||||
@ -19,19 +15,11 @@ use crate::KindWithContent;
|
|||||||
///
|
///
|
||||||
/// Only the non-prioritised tasks that can be grouped in a batch have a corresponding [`AutobatchKind`]
|
/// Only the non-prioritised tasks that can be grouped in a batch have a corresponding [`AutobatchKind`]
|
||||||
enum AutobatchKind {
|
enum AutobatchKind {
|
||||||
DocumentImport {
|
DocumentImport { allow_index_creation: bool, primary_key: Option<String> },
|
||||||
method: IndexDocumentsMethod,
|
|
||||||
allow_index_creation: bool,
|
|
||||||
primary_key: Option<String>,
|
|
||||||
},
|
|
||||||
DocumentEdition,
|
DocumentEdition,
|
||||||
DocumentDeletion {
|
DocumentDeletion { by_filter: bool },
|
||||||
by_filter: bool,
|
|
||||||
},
|
|
||||||
DocumentClear,
|
DocumentClear,
|
||||||
Settings {
|
Settings { allow_index_creation: bool },
|
||||||
allow_index_creation: bool,
|
|
||||||
},
|
|
||||||
IndexCreation,
|
IndexCreation,
|
||||||
IndexDeletion,
|
IndexDeletion,
|
||||||
IndexUpdate,
|
IndexUpdate,
|
||||||
@ -60,11 +48,8 @@ impl From<KindWithContent> for AutobatchKind {
|
|||||||
fn from(kind: KindWithContent) -> Self {
|
fn from(kind: KindWithContent) -> Self {
|
||||||
match kind {
|
match kind {
|
||||||
KindWithContent::DocumentAdditionOrUpdate {
|
KindWithContent::DocumentAdditionOrUpdate {
|
||||||
method,
|
allow_index_creation, primary_key, ..
|
||||||
allow_index_creation,
|
} => AutobatchKind::DocumentImport { allow_index_creation, primary_key },
|
||||||
primary_key,
|
|
||||||
..
|
|
||||||
} => AutobatchKind::DocumentImport { method, allow_index_creation, primary_key },
|
|
||||||
KindWithContent::DocumentEdition { .. } => AutobatchKind::DocumentEdition,
|
KindWithContent::DocumentEdition { .. } => AutobatchKind::DocumentEdition,
|
||||||
KindWithContent::DocumentDeletion { .. } => {
|
KindWithContent::DocumentDeletion { .. } => {
|
||||||
AutobatchKind::DocumentDeletion { by_filter: false }
|
AutobatchKind::DocumentDeletion { by_filter: false }
|
||||||
@ -99,7 +84,6 @@ pub enum BatchKind {
|
|||||||
ids: Vec<TaskId>,
|
ids: Vec<TaskId>,
|
||||||
},
|
},
|
||||||
DocumentOperation {
|
DocumentOperation {
|
||||||
method: IndexDocumentsMethod,
|
|
||||||
allow_index_creation: bool,
|
allow_index_creation: bool,
|
||||||
primary_key: Option<String>,
|
primary_key: Option<String>,
|
||||||
operation_ids: Vec<TaskId>,
|
operation_ids: Vec<TaskId>,
|
||||||
@ -172,12 +156,11 @@ impl BatchKind {
|
|||||||
K::IndexUpdate => (Break(BatchKind::IndexUpdate { id: task_id }), false),
|
K::IndexUpdate => (Break(BatchKind::IndexUpdate { id: task_id }), false),
|
||||||
K::IndexSwap => (Break(BatchKind::IndexSwap { id: task_id }), false),
|
K::IndexSwap => (Break(BatchKind::IndexSwap { id: task_id }), false),
|
||||||
K::DocumentClear => (Continue(BatchKind::DocumentClear { ids: vec![task_id] }), false),
|
K::DocumentClear => (Continue(BatchKind::DocumentClear { ids: vec![task_id] }), false),
|
||||||
K::DocumentImport { method, allow_index_creation, primary_key: pk }
|
K::DocumentImport { allow_index_creation, primary_key: pk }
|
||||||
if primary_key.is_none() || pk.is_none() || primary_key == pk.as_deref() =>
|
if primary_key.is_none() || pk.is_none() || primary_key == pk.as_deref() =>
|
||||||
{
|
{
|
||||||
(
|
(
|
||||||
Continue(BatchKind::DocumentOperation {
|
Continue(BatchKind::DocumentOperation {
|
||||||
method,
|
|
||||||
allow_index_creation,
|
allow_index_creation,
|
||||||
primary_key: pk,
|
primary_key: pk,
|
||||||
operation_ids: vec![task_id],
|
operation_ids: vec![task_id],
|
||||||
@ -186,9 +169,8 @@ impl BatchKind {
|
|||||||
)
|
)
|
||||||
}
|
}
|
||||||
// if the primary key set in the task was different than ours we should stop and make this batch fail asap.
|
// if the primary key set in the task was different than ours we should stop and make this batch fail asap.
|
||||||
K::DocumentImport { method, allow_index_creation, primary_key } => (
|
K::DocumentImport { allow_index_creation, primary_key } => (
|
||||||
Break(BatchKind::DocumentOperation {
|
Break(BatchKind::DocumentOperation {
|
||||||
method,
|
|
||||||
allow_index_creation,
|
allow_index_creation,
|
||||||
primary_key,
|
primary_key,
|
||||||
operation_ids: vec![task_id],
|
operation_ids: vec![task_id],
|
||||||
@ -257,7 +239,7 @@ impl BatchKind {
|
|||||||
(
|
(
|
||||||
BatchKind::DocumentClear { mut ids }
|
BatchKind::DocumentClear { mut ids }
|
||||||
| BatchKind::DocumentDeletion { deletion_ids: mut ids, includes_by_filter: _ }
|
| BatchKind::DocumentDeletion { deletion_ids: mut ids, includes_by_filter: _ }
|
||||||
| BatchKind::DocumentOperation { method: _, allow_index_creation: _, primary_key: _, operation_ids: mut ids }
|
| BatchKind::DocumentOperation { allow_index_creation: _, primary_key: _, operation_ids: mut ids }
|
||||||
| BatchKind::Settings { allow_index_creation: _, settings_ids: mut ids },
|
| BatchKind::Settings { allow_index_creation: _, settings_ids: mut ids },
|
||||||
K::IndexDeletion,
|
K::IndexDeletion,
|
||||||
) => {
|
) => {
|
||||||
@ -285,46 +267,32 @@ impl BatchKind {
|
|||||||
K::DocumentImport { .. } | K::Settings { .. },
|
K::DocumentImport { .. } | K::Settings { .. },
|
||||||
) => Break(this),
|
) => Break(this),
|
||||||
(
|
(
|
||||||
BatchKind::DocumentOperation { method: _, allow_index_creation: _, primary_key: _, mut operation_ids },
|
BatchKind::DocumentOperation { allow_index_creation: _, primary_key: _, mut operation_ids },
|
||||||
K::DocumentClear,
|
K::DocumentClear,
|
||||||
) => {
|
) => {
|
||||||
operation_ids.push(id);
|
operation_ids.push(id);
|
||||||
Continue(BatchKind::DocumentClear { ids: operation_ids })
|
Continue(BatchKind::DocumentClear { ids: operation_ids })
|
||||||
}
|
}
|
||||||
|
|
||||||
// we can autobatch the same kind of document additions / updates
|
// we can autobatch different kind of document operations and mix replacements with updates
|
||||||
(
|
(
|
||||||
BatchKind::DocumentOperation { method: ReplaceDocuments, allow_index_creation, primary_key: _, mut operation_ids },
|
BatchKind::DocumentOperation { allow_index_creation, primary_key: _, mut operation_ids },
|
||||||
K::DocumentImport { method: ReplaceDocuments, primary_key: pk, .. },
|
K::DocumentImport { primary_key: pk, .. },
|
||||||
) => {
|
) => {
|
||||||
operation_ids.push(id);
|
operation_ids.push(id);
|
||||||
Continue(BatchKind::DocumentOperation {
|
Continue(BatchKind::DocumentOperation {
|
||||||
method: ReplaceDocuments,
|
|
||||||
allow_index_creation,
|
allow_index_creation,
|
||||||
operation_ids,
|
operation_ids,
|
||||||
primary_key: pk,
|
primary_key: pk,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
(
|
(
|
||||||
BatchKind::DocumentOperation { method: UpdateDocuments, allow_index_creation, primary_key: _, mut operation_ids },
|
BatchKind::DocumentOperation { allow_index_creation, primary_key, mut operation_ids },
|
||||||
K::DocumentImport { method: UpdateDocuments, primary_key: pk, .. },
|
|
||||||
) => {
|
|
||||||
operation_ids.push(id);
|
|
||||||
Continue(BatchKind::DocumentOperation {
|
|
||||||
method: UpdateDocuments,
|
|
||||||
allow_index_creation,
|
|
||||||
primary_key: pk,
|
|
||||||
operation_ids,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
(
|
|
||||||
BatchKind::DocumentOperation { method, allow_index_creation, primary_key, mut operation_ids },
|
|
||||||
K::DocumentDeletion { by_filter: false },
|
K::DocumentDeletion { by_filter: false },
|
||||||
) => {
|
) => {
|
||||||
operation_ids.push(id);
|
operation_ids.push(id);
|
||||||
|
|
||||||
Continue(BatchKind::DocumentOperation {
|
Continue(BatchKind::DocumentOperation {
|
||||||
method,
|
|
||||||
allow_index_creation,
|
allow_index_creation,
|
||||||
primary_key,
|
primary_key,
|
||||||
operation_ids,
|
operation_ids,
|
||||||
@ -337,13 +305,6 @@ impl BatchKind {
|
|||||||
) => {
|
) => {
|
||||||
Break(this)
|
Break(this)
|
||||||
}
|
}
|
||||||
// but we can't autobatch documents if it's not the same kind
|
|
||||||
// this match branch MUST be AFTER the previous one
|
|
||||||
(
|
|
||||||
this @ BatchKind::DocumentOperation { .. },
|
|
||||||
K::DocumentImport { .. },
|
|
||||||
) => Break(this),
|
|
||||||
|
|
||||||
(
|
(
|
||||||
this @ BatchKind::DocumentOperation { .. },
|
this @ BatchKind::DocumentOperation { .. },
|
||||||
K::Settings { .. },
|
K::Settings { .. },
|
||||||
@ -361,12 +322,11 @@ impl BatchKind {
|
|||||||
// we can autobatch the deletion and import if the index already exists
|
// we can autobatch the deletion and import if the index already exists
|
||||||
(
|
(
|
||||||
BatchKind::DocumentDeletion { mut deletion_ids, includes_by_filter: false },
|
BatchKind::DocumentDeletion { mut deletion_ids, includes_by_filter: false },
|
||||||
K::DocumentImport { method, allow_index_creation, primary_key }
|
K::DocumentImport { allow_index_creation, primary_key }
|
||||||
) if index_already_exists => {
|
) if index_already_exists => {
|
||||||
deletion_ids.push(id);
|
deletion_ids.push(id);
|
||||||
|
|
||||||
Continue(BatchKind::DocumentOperation {
|
Continue(BatchKind::DocumentOperation {
|
||||||
method,
|
|
||||||
allow_index_creation,
|
allow_index_creation,
|
||||||
primary_key,
|
primary_key,
|
||||||
operation_ids: deletion_ids,
|
operation_ids: deletion_ids,
|
||||||
@ -375,12 +335,11 @@ impl BatchKind {
|
|||||||
// we can autobatch the deletion and import if both can't create an index
|
// we can autobatch the deletion and import if both can't create an index
|
||||||
(
|
(
|
||||||
BatchKind::DocumentDeletion { mut deletion_ids, includes_by_filter: false },
|
BatchKind::DocumentDeletion { mut deletion_ids, includes_by_filter: false },
|
||||||
K::DocumentImport { method, allow_index_creation, primary_key }
|
K::DocumentImport { allow_index_creation, primary_key }
|
||||||
) if !allow_index_creation => {
|
) if !allow_index_creation => {
|
||||||
deletion_ids.push(id);
|
deletion_ids.push(id);
|
||||||
|
|
||||||
Continue(BatchKind::DocumentOperation {
|
Continue(BatchKind::DocumentOperation {
|
||||||
method,
|
|
||||||
allow_index_creation,
|
allow_index_creation,
|
||||||
primary_key,
|
primary_key,
|
||||||
operation_ids: deletion_ids,
|
operation_ids: deletion_ids,
|
||||||
|
@ -64,7 +64,6 @@ pub(crate) enum IndexOperation {
|
|||||||
DocumentOperation {
|
DocumentOperation {
|
||||||
index_uid: String,
|
index_uid: String,
|
||||||
primary_key: Option<String>,
|
primary_key: Option<String>,
|
||||||
method: IndexDocumentsMethod,
|
|
||||||
operations: Vec<DocumentOperation>,
|
operations: Vec<DocumentOperation>,
|
||||||
tasks: Vec<Task>,
|
tasks: Vec<Task>,
|
||||||
},
|
},
|
||||||
|
Loading…
x
Reference in New Issue
Block a user