diff --git a/index-scheduler/src/batch.rs b/index-scheduler/src/batch.rs index 20b6c7e1d..7a067933b 100644 --- a/index-scheduler/src/batch.rs +++ b/index-scheduler/src/batch.rs @@ -37,7 +37,7 @@ use roaring::RoaringBitmap; use time::OffsetDateTime; use uuid::Uuid; -use crate::autobatcher::BatchKind; +use crate::autobatcher::{self, BatchKind}; use crate::utils::{self, swap_index_uid_in_task}; use crate::{Error, IndexScheduler, Query, Result, TaskId}; @@ -419,41 +419,47 @@ impl IndexScheduler { ))); } - // 5. We take the next task and try to batch all the tasks associated with this index. - if let Some(task_id) = enqueued.min() { - let task = self.get_task(rtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?; + // 5. We make a batch from the unprioritised tasks. Start by taking the next enqueued task. + let task_id = if let Some(task_id) = enqueued.min() { task_id } else { return Ok(None) }; + let task = self.get_task(rtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?; - // This is safe because all the remaining task are associated with - // AT LEAST one index. We can use the right or left one it doesn't - // matter. - let index_name = task.indexes().unwrap()[0]; - let index_already_exists = self.index_mapper.exists(rtxn, index_name)?; + // If the task is not associated with any index, verify that it is an index swap and + // create the batch directly. Otherwise, get the index name associated with the task + // and use the autobatcher to batch the enqueued tasks associated with it - let index_tasks = self.index_tasks(rtxn, index_name)? & enqueued; + let index_name = if let Some(&index_name) = task.indexes().first() { + index_name + } else { + assert!(matches!(&task.kind, KindWithContent::IndexSwap { swaps } if swaps.is_empty())); + return Ok(Some(Batch::IndexSwap { task })); + }; - // If autobatching is disabled we only take one task at a time. - let tasks_limit = if self.autobatching_enabled { usize::MAX } else { 1 }; + let index_already_exists = self.index_mapper.exists(rtxn, index_name)?; - let enqueued = index_tasks - .into_iter() - .take(tasks_limit) - .map(|task_id| { - self.get_task(rtxn, task_id) - .and_then(|task| task.ok_or(Error::CorruptedTaskQueue)) - .map(|task| (task.uid, task.kind)) - }) - .collect::>>()?; + let index_tasks = self.index_tasks(rtxn, index_name)? & enqueued; - if let Some((batchkind, create_index)) = - crate::autobatcher::autobatch(enqueued, index_already_exists) - { - return self.create_next_batch_index( - rtxn, - index_name.to_string(), - batchkind, - create_index, - ); - } + // If autobatching is disabled we only take one task at a time. + let tasks_limit = if self.autobatching_enabled { usize::MAX } else { 1 }; + + let enqueued = index_tasks + .into_iter() + .take(tasks_limit) + .map(|task_id| { + self.get_task(rtxn, task_id) + .and_then(|task| task.ok_or(Error::CorruptedTaskQueue)) + .map(|task| (task.uid, task.kind)) + }) + .collect::>>()?; + + if let Some((batchkind, create_index)) = + autobatcher::autobatch(enqueued, index_already_exists) + { + return self.create_next_batch_index( + rtxn, + index_name.to_string(), + batchkind, + create_index, + ); } // If we found no tasks then we were notified for something that got autobatched @@ -1042,9 +1048,8 @@ impl IndexScheduler { for task_id in to_delete_tasks.iter() { let task = self.get_task(wtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?; - if let Some(task_indexes) = task.indexes() { - affected_indexes.extend(task_indexes.into_iter().map(|x| x.to_owned())); - } + + affected_indexes.extend(task.indexes().into_iter().map(|x| x.to_owned())); affected_statuses.insert(task.status); affected_kinds.insert(task.kind.as_kind()); // Note: don't delete the persisted task data since diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs index c52813e06..75030fcbb 100644 --- a/index-scheduler/src/lib.rs +++ b/index-scheduler/src/lib.rs @@ -577,12 +577,10 @@ impl IndexScheduler { }; self.all_tasks.append(&mut wtxn, &BEU32::new(task.uid), &task)?; - if let Some(indexes) = task.indexes() { - for index in indexes { - self.update_index(&mut wtxn, index, |bitmap| { - bitmap.insert(task.uid); - })?; - } + for index in task.indexes() { + self.update_index(&mut wtxn, index, |bitmap| { + bitmap.insert(task.uid); + })?; } self.update_status(&mut wtxn, Status::Enqueued, |bitmap| { @@ -709,12 +707,10 @@ impl IndexScheduler { self.all_tasks.put(&mut wtxn, &BEU32::new(task.uid), &task)?; - if let Some(indexes) = task.indexes() { - for index in indexes { - self.update_index(&mut wtxn, index, |bitmap| { - bitmap.insert(task.uid); - })?; - } + for index in task.indexes() { + self.update_index(&mut wtxn, index, |bitmap| { + bitmap.insert(task.uid); + })?; } self.update_status(&mut wtxn, task.status, |bitmap| { @@ -1506,6 +1502,10 @@ mod tests { handle.wait_till(Breakpoint::AfterProcessing); index_scheduler.assert_internally_consistent(); snapshot!(snapshot_index_scheduler(&index_scheduler), name: "second_swap_processed"); + + index_scheduler.register(KindWithContent::IndexSwap { swaps: vec![] }).unwrap(); + handle.wait_till(Breakpoint::AfterProcessing); + snapshot!(snapshot_index_scheduler(&index_scheduler), name: "third_empty_swap_processed"); } #[test] diff --git a/index-scheduler/src/snapshots/lib.rs/swap_indexes/third_empty_swap_processed.snap b/index-scheduler/src/snapshots/lib.rs/swap_indexes/third_empty_swap_processed.snap new file mode 100644 index 000000000..31d0b4f5e --- /dev/null +++ b/index-scheduler/src/snapshots/lib.rs/swap_indexes/third_empty_swap_processed.snap @@ -0,0 +1,64 @@ +--- +source: index-scheduler/src/lib.rs +--- +### Autobatching Enabled = true +### Processing Tasks: +[] +---------------------------------------------------------------------- +### All Tasks: +0 {uid: 0, status: succeeded, details: { primary_key: Some("id") }, kind: IndexCreation { index_uid: "b", primary_key: Some("id") }} +1 {uid: 1, status: succeeded, details: { primary_key: Some("id") }, kind: IndexCreation { index_uid: "c", primary_key: Some("id") }} +2 {uid: 2, status: succeeded, details: { primary_key: Some("id") }, kind: IndexCreation { index_uid: "d", primary_key: Some("id") }} +3 {uid: 3, status: succeeded, details: { primary_key: Some("id") }, kind: IndexCreation { index_uid: "a", primary_key: Some("id") }} +4 {uid: 4, status: succeeded, details: { indexes: [("a", "b"), ("c", "d")] }, kind: IndexSwap { swaps: [("c", "b"), ("a", "d")] }} +5 {uid: 5, status: succeeded, details: { indexes: [("a", "c")] }, kind: IndexSwap { swaps: [("a", "c")] }} +6 {uid: 6, status: succeeded, details: { indexes: [] }, kind: IndexSwap { swaps: [] }} +---------------------------------------------------------------------- +### Status: +enqueued [] +succeeded [0,1,2,3,4,5,6,] +---------------------------------------------------------------------- +### Kind: +"indexCreation" [0,1,2,3,] +"indexSwap" [4,5,6,] +---------------------------------------------------------------------- +### Index Tasks: +a [3,4,5,] +b [0,4,] +c [1,4,5,] +d [2,4,] +---------------------------------------------------------------------- +### Index Mapper: +["a", "b", "c", "d"] +---------------------------------------------------------------------- +### Enqueued At: +[timestamp] [0,] +[timestamp] [1,] +[timestamp] [2,] +[timestamp] [3,] +[timestamp] [4,] +[timestamp] [5,] +[timestamp] [6,] +---------------------------------------------------------------------- +### Started At: +[timestamp] [0,] +[timestamp] [1,] +[timestamp] [2,] +[timestamp] [3,] +[timestamp] [4,] +[timestamp] [5,] +[timestamp] [6,] +---------------------------------------------------------------------- +### Finished At: +[timestamp] [0,] +[timestamp] [1,] +[timestamp] [2,] +[timestamp] [3,] +[timestamp] [4,] +[timestamp] [5,] +[timestamp] [6,] +---------------------------------------------------------------------- +### File Store: + +---------------------------------------------------------------------- + diff --git a/meilisearch-types/src/tasks.rs b/meilisearch-types/src/tasks.rs index 087980a11..2beb25a06 100644 --- a/meilisearch-types/src/tasks.rs +++ b/meilisearch-types/src/tasks.rs @@ -57,7 +57,7 @@ impl Task { } /// Return the list of indexes updated by this tasks. - pub fn indexes(&self) -> Option> { + pub fn indexes(&self) -> Vec<&str> { self.kind.indexes() } @@ -154,25 +154,25 @@ impl KindWithContent { } } - pub fn indexes(&self) -> Option> { + pub fn indexes(&self) -> Vec<&str> { use KindWithContent::*; match self { - DumpCreation { .. } | Snapshot | TaskCancelation { .. } | TaskDeletion { .. } => None, + DumpCreation { .. } | Snapshot | TaskCancelation { .. } | TaskDeletion { .. } => vec![], DocumentAdditionOrUpdate { index_uid, .. } | DocumentDeletion { index_uid, .. } | DocumentClear { index_uid } | SettingsUpdate { index_uid, .. } | IndexCreation { index_uid, .. } | IndexUpdate { index_uid, .. } - | IndexDeletion { index_uid } => Some(vec![index_uid]), + | IndexDeletion { index_uid } => vec![index_uid], IndexSwap { swaps } => { let mut indexes = HashSet::<&str>::default(); for (lhs, rhs) in swaps { indexes.insert(lhs.as_str()); indexes.insert(rhs.as_str()); } - Some(indexes.into_iter().collect()) + indexes.into_iter().collect() } } }