diff --git a/index-scheduler/src/batch.rs b/index-scheduler/src/batch.rs index 97dc7a2bd..01b0ddc1e 100644 --- a/index-scheduler/src/batch.rs +++ b/index-scheduler/src/batch.rs @@ -60,7 +60,7 @@ pub(crate) enum Batch { /// The list of tasks that were processing when this task cancelation appeared. previous_processing_tasks: RoaringBitmap, }, - TaskDeletion(Task), + TaskDeletions(Vec), SnapshotCreation(Vec), Dump(Task), IndexOperation { @@ -146,13 +146,12 @@ impl Batch { pub fn ids(&self) -> Vec { match self { Batch::TaskCancelation { task, .. } - | Batch::TaskDeletion(task) | Batch::Dump(task) | Batch::IndexCreation { task, .. } | Batch::IndexUpdate { task, .. } => vec![task.uid], - Batch::SnapshotCreation(tasks) | Batch::IndexDeletion { tasks, .. } => { - tasks.iter().map(|task| task.uid).collect() - } + Batch::SnapshotCreation(tasks) + | Batch::TaskDeletions(tasks) + | Batch::IndexDeletion { tasks, .. } => tasks.iter().map(|task| task.uid).collect(), Batch::IndexOperation { op, .. } => match op { IndexOperation::DocumentOperation { tasks, .. } | IndexOperation::Settings { tasks, .. } @@ -180,7 +179,7 @@ impl Batch { use Batch::*; match self { TaskCancelation { .. } - | TaskDeletion(_) + | TaskDeletions(_) | SnapshotCreation(_) | Dump(_) | IndexSwap { .. } => None, @@ -199,7 +198,7 @@ impl fmt::Display for Batch { let tasks = self.ids(); match self { Batch::TaskCancelation { .. } => f.write_str("TaskCancelation")?, - Batch::TaskDeletion(_) => f.write_str("TaskDeletion")?, + Batch::TaskDeletions(_) => f.write_str("TaskDeletion")?, Batch::SnapshotCreation(_) => f.write_str("SnapshotCreation")?, Batch::Dump(_) => f.write_str("Dump")?, Batch::IndexOperation { op, .. } => write!(f, "{op}")?, @@ -539,9 +538,9 @@ impl IndexScheduler { // 2. we get the next task to delete let to_delete = self.get_kind(rtxn, Kind::TaskDeletion)? & enqueued; - if let Some(task_id) = to_delete.min() { - let task = self.get_task(rtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?; - return Ok(Some(Batch::TaskDeletion(task))); + if !to_delete.is_empty() { + let tasks = self.get_existing_tasks(rtxn, to_delete)?; + return Ok(Some(Batch::TaskDeletions(tasks))); } // 3. we batch the snapshot. @@ -681,31 +680,43 @@ impl IndexScheduler { Ok(vec![task]) } - Batch::TaskDeletion(mut task) => { + Batch::TaskDeletions(mut tasks) => { // 1. Retrieve the tasks that matched the query at enqueue-time. - let matched_tasks = + let mut matched_tasks = RoaringBitmap::new(); + + for task in tasks.iter() { if let KindWithContent::TaskDeletion { tasks, query: _ } = &task.kind { - tasks + matched_tasks |= tasks; } else { unreachable!() + } + } + + let mut wtxn = self.env.write_txn()?; + let mut deleted_tasks = self.delete_matched_tasks(&mut wtxn, &matched_tasks)?; + wtxn.commit()?; + + for task in tasks.iter_mut() { + task.status = Status::Succeeded; + let KindWithContent::TaskDeletion { tasks, query: _ } = &task.kind else { + unreachable!() }; - let mut wtxn = self.env.write_txn()?; - let deleted_tasks_count = self.delete_matched_tasks(&mut wtxn, matched_tasks)?; + let deleted_tasks_count = deleted_tasks.intersection_len(tasks); + deleted_tasks -= tasks; - task.status = Status::Succeeded; - match &mut task.details { - Some(Details::TaskDeletion { - matched_tasks: _, - deleted_tasks, - original_filter: _, - }) => { - *deleted_tasks = Some(deleted_tasks_count); + match &mut task.details { + Some(Details::TaskDeletion { + matched_tasks: _, + deleted_tasks, + original_filter: _, + }) => { + *deleted_tasks = Some(deleted_tasks_count); + } + _ => unreachable!(), } - _ => unreachable!(), } - wtxn.commit()?; - Ok(vec![task]) + Ok(tasks) } Batch::SnapshotCreation(mut tasks) => { fs::create_dir_all(&self.snapshots_path)?; @@ -1435,7 +1446,11 @@ impl IndexScheduler { /// Delete each given task from all the databases (if it is deleteable). /// /// Return the number of tasks that were actually deleted. - fn delete_matched_tasks(&self, wtxn: &mut RwTxn, matched_tasks: &RoaringBitmap) -> Result { + fn delete_matched_tasks( + &self, + wtxn: &mut RwTxn, + matched_tasks: &RoaringBitmap, + ) -> Result { // 1. Remove from this list the tasks that we are not allowed to delete let enqueued_tasks = self.get_status(wtxn, Status::Enqueued)?; let processing_tasks = &self.processing_tasks.read().unwrap().processing.clone(); @@ -1500,7 +1515,7 @@ impl IndexScheduler { } } - Ok(to_delete_tasks.len()) + Ok(to_delete_tasks) } /// Cancel each given task from all the databases (if it is cancelable). diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs index a5b0cb5b0..946a2a33e 100644 --- a/index-scheduler/src/lib.rs +++ b/index-scheduler/src/lib.rs @@ -2244,10 +2244,7 @@ mod tests { .unwrap(); index_scheduler.assert_internally_consistent(); } - for _ in 0..2 { - handle.advance_one_successful_batch(); - index_scheduler.assert_internally_consistent(); - } + handle.advance_one_successful_batch(); snapshot!(snapshot_index_scheduler(&index_scheduler), name: "task_deletion_processed"); } diff --git a/index-scheduler/src/snapshots/lib.rs/task_deletion_delete_same_task_twice/task_deletion_processed.snap b/index-scheduler/src/snapshots/lib.rs/task_deletion_delete_same_task_twice/task_deletion_processed.snap index c47a7a95f..83cfcdf07 100644 --- a/index-scheduler/src/snapshots/lib.rs/task_deletion_delete_same_task_twice/task_deletion_processed.snap +++ b/index-scheduler/src/snapshots/lib.rs/task_deletion_delete_same_task_twice/task_deletion_processed.snap @@ -34,12 +34,10 @@ catto: { number_of_documents: 1, field_distribution: {"id": 1} } [timestamp] [3,] ---------------------------------------------------------------------- ### Started At: -[timestamp] [2,] -[timestamp] [3,] +[timestamp] [2,3,] ---------------------------------------------------------------------- ### Finished At: -[timestamp] [2,] -[timestamp] [3,] +[timestamp] [2,3,] ---------------------------------------------------------------------- ### File Store: 00000000-0000-0000-0000-000000000001