mirror of
https://github.com/meilisearch/meilisearch.git
synced 2024-11-23 02:27:40 +08:00
Merge #4316
4316: Autobatch the task deletions r=curquiza a=irevoire

# Pull Request

## Related issue
Fix part of https://github.com/meilisearch/meilisearch-support/issues/69
Fix #4315

## What does this PR do?
- Autobatch the task deletions

Co-authored-by: Tamo <tamo@meilisearch.com>
This commit is contained in:
commit
1ccde9bf0b
@ -60,7 +60,7 @@ pub(crate) enum Batch {
|
|||||||
/// The list of tasks that were processing when this task cancelation appeared.
|
/// The list of tasks that were processing when this task cancelation appeared.
|
||||||
previous_processing_tasks: RoaringBitmap,
|
previous_processing_tasks: RoaringBitmap,
|
||||||
},
|
},
|
||||||
TaskDeletion(Task),
|
TaskDeletions(Vec<Task>),
|
||||||
SnapshotCreation(Vec<Task>),
|
SnapshotCreation(Vec<Task>),
|
||||||
Dump(Task),
|
Dump(Task),
|
||||||
IndexOperation {
|
IndexOperation {
|
||||||
@ -146,13 +146,12 @@ impl Batch {
|
|||||||
pub fn ids(&self) -> Vec<TaskId> {
|
pub fn ids(&self) -> Vec<TaskId> {
|
||||||
match self {
|
match self {
|
||||||
Batch::TaskCancelation { task, .. }
|
Batch::TaskCancelation { task, .. }
|
||||||
| Batch::TaskDeletion(task)
|
|
||||||
| Batch::Dump(task)
|
| Batch::Dump(task)
|
||||||
| Batch::IndexCreation { task, .. }
|
| Batch::IndexCreation { task, .. }
|
||||||
| Batch::IndexUpdate { task, .. } => vec![task.uid],
|
| Batch::IndexUpdate { task, .. } => vec![task.uid],
|
||||||
Batch::SnapshotCreation(tasks) | Batch::IndexDeletion { tasks, .. } => {
|
Batch::SnapshotCreation(tasks)
|
||||||
tasks.iter().map(|task| task.uid).collect()
|
| Batch::TaskDeletions(tasks)
|
||||||
}
|
| Batch::IndexDeletion { tasks, .. } => tasks.iter().map(|task| task.uid).collect(),
|
||||||
Batch::IndexOperation { op, .. } => match op {
|
Batch::IndexOperation { op, .. } => match op {
|
||||||
IndexOperation::DocumentOperation { tasks, .. }
|
IndexOperation::DocumentOperation { tasks, .. }
|
||||||
| IndexOperation::Settings { tasks, .. }
|
| IndexOperation::Settings { tasks, .. }
|
||||||
@ -180,7 +179,7 @@ impl Batch {
|
|||||||
use Batch::*;
|
use Batch::*;
|
||||||
match self {
|
match self {
|
||||||
TaskCancelation { .. }
|
TaskCancelation { .. }
|
||||||
| TaskDeletion(_)
|
| TaskDeletions(_)
|
||||||
| SnapshotCreation(_)
|
| SnapshotCreation(_)
|
||||||
| Dump(_)
|
| Dump(_)
|
||||||
| IndexSwap { .. } => None,
|
| IndexSwap { .. } => None,
|
||||||
@ -199,7 +198,7 @@ impl fmt::Display for Batch {
|
|||||||
let tasks = self.ids();
|
let tasks = self.ids();
|
||||||
match self {
|
match self {
|
||||||
Batch::TaskCancelation { .. } => f.write_str("TaskCancelation")?,
|
Batch::TaskCancelation { .. } => f.write_str("TaskCancelation")?,
|
||||||
Batch::TaskDeletion(_) => f.write_str("TaskDeletion")?,
|
Batch::TaskDeletions(_) => f.write_str("TaskDeletion")?,
|
||||||
Batch::SnapshotCreation(_) => f.write_str("SnapshotCreation")?,
|
Batch::SnapshotCreation(_) => f.write_str("SnapshotCreation")?,
|
||||||
Batch::Dump(_) => f.write_str("Dump")?,
|
Batch::Dump(_) => f.write_str("Dump")?,
|
||||||
Batch::IndexOperation { op, .. } => write!(f, "{op}")?,
|
Batch::IndexOperation { op, .. } => write!(f, "{op}")?,
|
||||||
@ -539,9 +538,9 @@ impl IndexScheduler {
|
|||||||
|
|
||||||
// 2. we get the next task to delete
|
// 2. we get the next task to delete
|
||||||
let to_delete = self.get_kind(rtxn, Kind::TaskDeletion)? & enqueued;
|
let to_delete = self.get_kind(rtxn, Kind::TaskDeletion)? & enqueued;
|
||||||
if let Some(task_id) = to_delete.min() {
|
if !to_delete.is_empty() {
|
||||||
let task = self.get_task(rtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?;
|
let tasks = self.get_existing_tasks(rtxn, to_delete)?;
|
||||||
return Ok(Some(Batch::TaskDeletion(task)));
|
return Ok(Some(Batch::TaskDeletions(tasks)));
|
||||||
}
|
}
|
||||||
|
|
||||||
// 3. we batch the snapshot.
|
// 3. we batch the snapshot.
|
||||||
@ -681,31 +680,43 @@ impl IndexScheduler {
|
|||||||
|
|
||||||
Ok(vec![task])
|
Ok(vec![task])
|
||||||
}
|
}
|
||||||
Batch::TaskDeletion(mut task) => {
|
Batch::TaskDeletions(mut tasks) => {
|
||||||
// 1. Retrieve the tasks that matched the query at enqueue-time.
|
// 1. Retrieve the tasks that matched the query at enqueue-time.
|
||||||
let matched_tasks =
|
let mut matched_tasks = RoaringBitmap::new();
|
||||||
|
|
||||||
|
for task in tasks.iter() {
|
||||||
if let KindWithContent::TaskDeletion { tasks, query: _ } = &task.kind {
|
if let KindWithContent::TaskDeletion { tasks, query: _ } = &task.kind {
|
||||||
tasks
|
matched_tasks |= tasks;
|
||||||
} else {
|
} else {
|
||||||
unreachable!()
|
unreachable!()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut wtxn = self.env.write_txn()?;
|
||||||
|
let mut deleted_tasks = self.delete_matched_tasks(&mut wtxn, &matched_tasks)?;
|
||||||
|
wtxn.commit()?;
|
||||||
|
|
||||||
|
for task in tasks.iter_mut() {
|
||||||
|
task.status = Status::Succeeded;
|
||||||
|
let KindWithContent::TaskDeletion { tasks, query: _ } = &task.kind else {
|
||||||
|
unreachable!()
|
||||||
};
|
};
|
||||||
|
|
||||||
let mut wtxn = self.env.write_txn()?;
|
let deleted_tasks_count = deleted_tasks.intersection_len(tasks);
|
||||||
let deleted_tasks_count = self.delete_matched_tasks(&mut wtxn, matched_tasks)?;
|
deleted_tasks -= tasks;
|
||||||
|
|
||||||
task.status = Status::Succeeded;
|
match &mut task.details {
|
||||||
match &mut task.details {
|
Some(Details::TaskDeletion {
|
||||||
Some(Details::TaskDeletion {
|
matched_tasks: _,
|
||||||
matched_tasks: _,
|
deleted_tasks,
|
||||||
deleted_tasks,
|
original_filter: _,
|
||||||
original_filter: _,
|
}) => {
|
||||||
}) => {
|
*deleted_tasks = Some(deleted_tasks_count);
|
||||||
*deleted_tasks = Some(deleted_tasks_count);
|
}
|
||||||
|
_ => unreachable!(),
|
||||||
}
|
}
|
||||||
_ => unreachable!(),
|
|
||||||
}
|
}
|
||||||
wtxn.commit()?;
|
Ok(tasks)
|
||||||
Ok(vec![task])
|
|
||||||
}
|
}
|
||||||
Batch::SnapshotCreation(mut tasks) => {
|
Batch::SnapshotCreation(mut tasks) => {
|
||||||
fs::create_dir_all(&self.snapshots_path)?;
|
fs::create_dir_all(&self.snapshots_path)?;
|
||||||
@ -1435,7 +1446,11 @@ impl IndexScheduler {
|
|||||||
/// Delete each given task from all the databases (if it is deleteable).
|
/// Delete each given task from all the databases (if it is deleteable).
|
||||||
///
|
///
|
||||||
/// Return the number of tasks that were actually deleted.
|
/// Return the set of tasks that were actually deleted.
|
||||||
fn delete_matched_tasks(&self, wtxn: &mut RwTxn, matched_tasks: &RoaringBitmap) -> Result<u64> {
|
fn delete_matched_tasks(
|
||||||
|
&self,
|
||||||
|
wtxn: &mut RwTxn,
|
||||||
|
matched_tasks: &RoaringBitmap,
|
||||||
|
) -> Result<RoaringBitmap> {
|
||||||
// 1. Remove from this list the tasks that we are not allowed to delete
|
// 1. Remove from this list the tasks that we are not allowed to delete
|
||||||
let enqueued_tasks = self.get_status(wtxn, Status::Enqueued)?;
|
let enqueued_tasks = self.get_status(wtxn, Status::Enqueued)?;
|
||||||
let processing_tasks = &self.processing_tasks.read().unwrap().processing.clone();
|
let processing_tasks = &self.processing_tasks.read().unwrap().processing.clone();
|
||||||
@ -1500,7 +1515,7 @@ impl IndexScheduler {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(to_delete_tasks.len())
|
Ok(to_delete_tasks)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Cancel each given task from all the databases (if it is cancelable).
|
/// Cancel each given task from all the databases (if it is cancelable).
|
||||||
|
@ -2244,10 +2244,7 @@ mod tests {
|
|||||||
.unwrap();
|
.unwrap();
|
||||||
index_scheduler.assert_internally_consistent();
|
index_scheduler.assert_internally_consistent();
|
||||||
}
|
}
|
||||||
for _ in 0..2 {
|
handle.advance_one_successful_batch();
|
||||||
handle.advance_one_successful_batch();
|
|
||||||
index_scheduler.assert_internally_consistent();
|
|
||||||
}
|
|
||||||
|
|
||||||
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "task_deletion_processed");
|
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "task_deletion_processed");
|
||||||
}
|
}
|
||||||
|
@ -34,12 +34,10 @@ catto: { number_of_documents: 1, field_distribution: {"id": 1} }
|
|||||||
[timestamp] [3,]
|
[timestamp] [3,]
|
||||||
----------------------------------------------------------------------
|
----------------------------------------------------------------------
|
||||||
### Started At:
|
### Started At:
|
||||||
[timestamp] [2,]
|
[timestamp] [2,3,]
|
||||||
[timestamp] [3,]
|
|
||||||
----------------------------------------------------------------------
|
----------------------------------------------------------------------
|
||||||
### Finished At:
|
### Finished At:
|
||||||
[timestamp] [2,]
|
[timestamp] [2,3,]
|
||||||
[timestamp] [3,]
|
|
||||||
----------------------------------------------------------------------
|
----------------------------------------------------------------------
|
||||||
### File Store:
|
### File Store:
|
||||||
00000000-0000-0000-0000-000000000001
|
00000000-0000-0000-0000-000000000001
|
||||||
|
Loading…
Reference in New Issue
Block a user