autobatch the task deletions

This commit is contained in:
Tamo 2024-01-11 14:44:29 +01:00
parent 5204c0b60b
commit b4d7d80ad9
No known key found for this signature in database
GPG Key ID: 20CD8020AFA88D69
3 changed files with 46 additions and 36 deletions

View File

@ -60,7 +60,7 @@ pub(crate) enum Batch {
/// The list of tasks that were processing when this task cancelation appeared. /// The list of tasks that were processing when this task cancelation appeared.
previous_processing_tasks: RoaringBitmap, previous_processing_tasks: RoaringBitmap,
}, },
TaskDeletion(Task), TaskDeletions(Vec<Task>),
SnapshotCreation(Vec<Task>), SnapshotCreation(Vec<Task>),
Dump(Task), Dump(Task),
IndexOperation { IndexOperation {
@ -146,13 +146,12 @@ impl Batch {
pub fn ids(&self) -> Vec<TaskId> { pub fn ids(&self) -> Vec<TaskId> {
match self { match self {
Batch::TaskCancelation { task, .. } Batch::TaskCancelation { task, .. }
| Batch::TaskDeletion(task)
| Batch::Dump(task) | Batch::Dump(task)
| Batch::IndexCreation { task, .. } | Batch::IndexCreation { task, .. }
| Batch::IndexUpdate { task, .. } => vec![task.uid], | Batch::IndexUpdate { task, .. } => vec![task.uid],
Batch::SnapshotCreation(tasks) | Batch::IndexDeletion { tasks, .. } => { Batch::SnapshotCreation(tasks)
tasks.iter().map(|task| task.uid).collect() | Batch::TaskDeletions(tasks)
} | Batch::IndexDeletion { tasks, .. } => tasks.iter().map(|task| task.uid).collect(),
Batch::IndexOperation { op, .. } => match op { Batch::IndexOperation { op, .. } => match op {
IndexOperation::DocumentOperation { tasks, .. } IndexOperation::DocumentOperation { tasks, .. }
| IndexOperation::Settings { tasks, .. } | IndexOperation::Settings { tasks, .. }
@ -180,7 +179,7 @@ impl Batch {
use Batch::*; use Batch::*;
match self { match self {
TaskCancelation { .. } TaskCancelation { .. }
| TaskDeletion(_) | TaskDeletions(_)
| SnapshotCreation(_) | SnapshotCreation(_)
| Dump(_) | Dump(_)
| IndexSwap { .. } => None, | IndexSwap { .. } => None,
@ -199,7 +198,7 @@ impl fmt::Display for Batch {
let tasks = self.ids(); let tasks = self.ids();
match self { match self {
Batch::TaskCancelation { .. } => f.write_str("TaskCancelation")?, Batch::TaskCancelation { .. } => f.write_str("TaskCancelation")?,
Batch::TaskDeletion(_) => f.write_str("TaskDeletion")?, Batch::TaskDeletions(_) => f.write_str("TaskDeletion")?,
Batch::SnapshotCreation(_) => f.write_str("SnapshotCreation")?, Batch::SnapshotCreation(_) => f.write_str("SnapshotCreation")?,
Batch::Dump(_) => f.write_str("Dump")?, Batch::Dump(_) => f.write_str("Dump")?,
Batch::IndexOperation { op, .. } => write!(f, "{op}")?, Batch::IndexOperation { op, .. } => write!(f, "{op}")?,
@ -539,9 +538,9 @@ impl IndexScheduler {
// 2. we get the next task to delete // 2. we get the next task to delete
let to_delete = self.get_kind(rtxn, Kind::TaskDeletion)? & enqueued; let to_delete = self.get_kind(rtxn, Kind::TaskDeletion)? & enqueued;
if let Some(task_id) = to_delete.min() { if !to_delete.is_empty() {
let task = self.get_task(rtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?; let tasks = self.get_existing_tasks(rtxn, to_delete)?;
return Ok(Some(Batch::TaskDeletion(task))); return Ok(Some(Batch::TaskDeletions(tasks)));
} }
// 3. we batch the snapshot. // 3. we batch the snapshot.
@ -681,19 +680,31 @@ impl IndexScheduler {
Ok(vec![task]) Ok(vec![task])
} }
Batch::TaskDeletion(mut task) => { Batch::TaskDeletions(mut tasks) => {
// 1. Retrieve the tasks that matched the query at enqueue-time. // 1. Retrieve the tasks that matched the query at enqueue-time.
let matched_tasks = let mut matched_tasks = RoaringBitmap::new();
for task in tasks.iter() {
if let KindWithContent::TaskDeletion { tasks, query: _ } = &task.kind { if let KindWithContent::TaskDeletion { tasks, query: _ } = &task.kind {
tasks matched_tasks |= tasks;
} else { } else {
unreachable!() unreachable!()
}
}
let mut wtxn = self.env.write_txn()?;
let mut deleted_tasks = self.delete_matched_tasks(&mut wtxn, &matched_tasks)?;
wtxn.commit()?;
for task in tasks.iter_mut() {
task.status = Status::Succeeded;
let KindWithContent::TaskDeletion { tasks, query: _ } = &task.kind else {
unreachable!()
}; };
let mut wtxn = self.env.write_txn()?; let deleted_tasks_count = deleted_tasks.intersection_len(tasks);
let deleted_tasks_count = self.delete_matched_tasks(&mut wtxn, matched_tasks)?; deleted_tasks -= tasks;
task.status = Status::Succeeded;
match &mut task.details { match &mut task.details {
Some(Details::TaskDeletion { Some(Details::TaskDeletion {
matched_tasks: _, matched_tasks: _,
@ -704,8 +715,8 @@ impl IndexScheduler {
} }
_ => unreachable!(), _ => unreachable!(),
} }
wtxn.commit()?; }
Ok(vec![task]) Ok(tasks)
} }
Batch::SnapshotCreation(mut tasks) => { Batch::SnapshotCreation(mut tasks) => {
fs::create_dir_all(&self.snapshots_path)?; fs::create_dir_all(&self.snapshots_path)?;
@ -1438,7 +1449,11 @@ impl IndexScheduler {
/// Delete each given task from all the databases (if it is deleteable). /// Delete each given task from all the databases (if it is deleteable).
/// ///
/// Return the number of tasks that were actually deleted. /// Return the number of tasks that were actually deleted.
fn delete_matched_tasks(&self, wtxn: &mut RwTxn, matched_tasks: &RoaringBitmap) -> Result<u64> { fn delete_matched_tasks(
&self,
wtxn: &mut RwTxn,
matched_tasks: &RoaringBitmap,
) -> Result<RoaringBitmap> {
// 1. Remove from this list the tasks that we are not allowed to delete // 1. Remove from this list the tasks that we are not allowed to delete
let enqueued_tasks = self.get_status(wtxn, Status::Enqueued)?; let enqueued_tasks = self.get_status(wtxn, Status::Enqueued)?;
let processing_tasks = &self.processing_tasks.read().unwrap().processing.clone(); let processing_tasks = &self.processing_tasks.read().unwrap().processing.clone();
@ -1503,7 +1518,7 @@ impl IndexScheduler {
} }
} }
Ok(to_delete_tasks.len()) Ok(to_delete_tasks)
} }
/// Cancel each given task from all the databases (if it is cancelable). /// Cancel each given task from all the databases (if it is cancelable).

View File

@ -2158,10 +2158,7 @@ mod tests {
.unwrap(); .unwrap();
index_scheduler.assert_internally_consistent(); index_scheduler.assert_internally_consistent();
} }
for _ in 0..2 {
handle.advance_one_successful_batch(); handle.advance_one_successful_batch();
index_scheduler.assert_internally_consistent();
}
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "task_deletion_processed"); snapshot!(snapshot_index_scheduler(&index_scheduler), name: "task_deletion_processed");
} }

View File

@ -34,12 +34,10 @@ catto: { number_of_documents: 1, field_distribution: {"id": 1} }
[timestamp] [3,] [timestamp] [3,]
---------------------------------------------------------------------- ----------------------------------------------------------------------
### Started At: ### Started At:
[timestamp] [2,] [timestamp] [2,3,]
[timestamp] [3,]
---------------------------------------------------------------------- ----------------------------------------------------------------------
### Finished At: ### Finished At:
[timestamp] [2,] [timestamp] [2,3,]
[timestamp] [3,]
---------------------------------------------------------------------- ----------------------------------------------------------------------
### File Store: ### File Store:
00000000-0000-0000-0000-000000000001 00000000-0000-0000-0000-000000000001