diff --git a/crates/index-scheduler/src/batch.rs b/crates/index-scheduler/src/batch.rs index 40526622c..b43efd899 100644 --- a/crates/index-scheduler/src/batch.rs +++ b/crates/index-scheduler/src/batch.rs @@ -17,13 +17,14 @@ tasks individually, but should be much faster since we are only performing one indexing operation. */ -use std::collections::{BTreeSet, HashSet}; +use std::collections::{BTreeSet, HashMap, HashSet}; use std::ffi::OsStr; use std::fmt; use std::fs::{self, File}; use std::io::BufWriter; use dump::IndexMetadata; +use meilisearch_types::batches::BatchId; use meilisearch_types::error::Code; use meilisearch_types::heed::{RoTxn, RwTxn}; use meilisearch_types::milli::documents::{obkv_to_object, DocumentsBatchReader}; @@ -1675,6 +1676,8 @@ impl IndexScheduler { let mut affected_statuses = HashSet::new(); let mut affected_kinds = HashSet::new(); let mut affected_canceled_by = RoaringBitmap::new(); + // The tasks that have been removed *per batches*. + let mut affected_batches: HashMap = HashMap::new(); for task_id in to_delete_tasks.iter() { let task = self.get_task(wtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?; @@ -1696,18 +1699,21 @@ impl IndexScheduler { if let Some(canceled_by) = task.canceled_by { affected_canceled_by.insert(canceled_by); } + if let Some(batch_uid) = task.batch_uid { + affected_batches.entry(batch_uid).or_default().insert(task_id); + } } - for index in affected_indexes { - self.update_index(wtxn, &index, |bitmap| *bitmap -= &to_delete_tasks)?; + for index in affected_indexes.iter() { + self.update_index(wtxn, index, |bitmap| *bitmap -= &to_delete_tasks)?; } - for status in affected_statuses { - self.update_status(wtxn, status, |bitmap| *bitmap -= &to_delete_tasks)?; + for status in affected_statuses.iter() { + self.update_status(wtxn, *status, |bitmap| *bitmap -= &to_delete_tasks)?; } - for kind in affected_kinds { - self.update_kind(wtxn, kind, |bitmap| *bitmap -= &to_delete_tasks)?; + for kind in affected_kinds.iter() { + self.update_kind(wtxn, *kind, |bitmap| *bitmap -= &to_delete_tasks)?; } for task in to_delete_tasks.iter() { @@ -1723,6 +1729,48 @@ impl IndexScheduler { } } } + for (batch_id, to_delete_tasks) in affected_batches { + if let Some(mut tasks) = self.batch_to_tasks_mapping.get(wtxn, &batch_id)? { + tasks -= &to_delete_tasks; + // We must remove the batch entirely + if tasks.is_empty() { + self.all_batches.delete(wtxn, &batch_id)?; + self.batch_to_tasks_mapping.delete(wtxn, &batch_id)?; + } + // Anyway, we must remove the batch from all its reverse indexes. + // The only way to do that is to check + + for index in affected_indexes.iter() { + let index_tasks = self.index_tasks(wtxn, index)?; + let remaining_index_tasks = index_tasks & &tasks; + if remaining_index_tasks.is_empty() { + self.update_batch_index(wtxn, index, |bitmap| { + bitmap.remove(batch_id); + })?; + } + } + + for status in affected_statuses.iter() { + let status_tasks = self.get_status(wtxn, *status)?; + let remaining_status_tasks = status_tasks & &tasks; + if remaining_status_tasks.is_empty() { + self.update_batch_status(wtxn, *status, |bitmap| { + bitmap.remove(batch_id); + })?; + } + } + + for kind in affected_kinds.iter() { + let kind_tasks = self.get_kind(wtxn, *kind)?; + let remaining_kind_tasks = kind_tasks & &tasks; + if remaining_kind_tasks.is_empty() { + self.update_batch_kind(wtxn, *kind, |bitmap| { + bitmap.remove(batch_id); + })?; + } + } + } + } Ok(to_delete_tasks) } diff --git a/crates/index-scheduler/src/snapshots/lib.rs/task_deletion_delete_same_task_twice/task_deletion_processed.snap b/crates/index-scheduler/src/snapshots/lib.rs/task_deletion_delete_same_task_twice/task_deletion_processed.snap index 9736e07a7..24844c3f8 100644 --- a/crates/index-scheduler/src/snapshots/lib.rs/task_deletion_delete_same_task_twice/task_deletion_processed.snap +++ b/crates/index-scheduler/src/snapshots/lib.rs/task_deletion_delete_same_task_twice/task_deletion_processed.snap @@ -41,22 +41,19 @@ catto: { number_of_documents: 1, field_distribution: {"id": 1} } [timestamp] [2,3,] ---------------------------------------------------------------------- ### All Batches: -0 {uid: 0, } 1 {uid: 1, } ---------------------------------------------------------------------- ### Batch to tasks mapping: -0 [0,] 1 [2,3,] ---------------------------------------------------------------------- ### Batches Status: -succeeded [0,1,] +succeeded [1,] ---------------------------------------------------------------------- ### Batches Kind: -"documentAdditionOrUpdate" [0,] +"documentAdditionOrUpdate" [] "taskDeletion" [1,] ---------------------------------------------------------------------- ### Batches Index Tasks: -catto [0,] ---------------------------------------------------------------------- ### Batches Enqueued At: [timestamp] [0,] diff --git a/crates/index-scheduler/src/snapshots/lib.rs/task_deletion_deleteable/task_deletion_processed.snap b/crates/index-scheduler/src/snapshots/lib.rs/task_deletion_deleteable/task_deletion_processed.snap index 6985fe1e5..97106df1b 100644 --- a/crates/index-scheduler/src/snapshots/lib.rs/task_deletion_deleteable/task_deletion_processed.snap +++ b/crates/index-scheduler/src/snapshots/lib.rs/task_deletion_deleteable/task_deletion_processed.snap @@ -39,22 +39,19 @@ catto: { number_of_documents: 1, field_distribution: {"id": 1} } [timestamp] [2,] ---------------------------------------------------------------------- ### All Batches: -0 {uid: 0, } 1 {uid: 1, } ---------------------------------------------------------------------- ### Batch to tasks mapping: -0 [0,] 1 [2,] ---------------------------------------------------------------------- ### Batches Status: -succeeded [0,1,] +succeeded [1,] ---------------------------------------------------------------------- ### Batches Kind: -"documentAdditionOrUpdate" [0,] +"documentAdditionOrUpdate" [] "taskDeletion" [1,] ---------------------------------------------------------------------- ### Batches Index Tasks: -catto [0,] ---------------------------------------------------------------------- ### Batches Enqueued At: [timestamp] [0,]