diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs index a208214d8..28ec79e14 100644 --- a/index-scheduler/src/lib.rs +++ b/index-scheduler/src/lib.rs @@ -34,8 +34,9 @@ pub use error::Error; use meilisearch_types::milli::documents::DocumentsBatchBuilder; use meilisearch_types::tasks::{Kind, KindWithContent, Status, Task}; -use utils::keep_tasks_within_datetimes; +use utils::{keep_tasks_within_datetimes, map_bound}; +use std::ops::{Bound, RangeBounds}; use std::path::PathBuf; use std::sync::atomic::AtomicBool; use std::sync::atomic::Ordering::Relaxed; @@ -404,27 +405,41 @@ impl IndexScheduler { pub fn get_task_ids(&self, query: &Query) -> Result { let rtxn = self.env.read_txn()?; - // This is the list of all the tasks. + let ProcessingTasks { started_at: started_at_processing, processing: tasks_processing } = + self.processing_tasks.read().unwrap().clone(); + let mut tasks = self.all_task_ids(&rtxn)?; - if let Some(uids) = &query.uid { - tasks &= RoaringBitmap::from_iter(uids); - } - if let Some(status) = &query.status { + let mut include_processing_tasks = false; let mut status_tasks = RoaringBitmap::new(); for status in status { - status_tasks |= self.get_status(&rtxn, *status)?; + match status { + // special case for Processing tasks + Status::Processing => { + include_processing_tasks = true; + status_tasks |= &tasks_processing; + } + status => status_tasks |= &self.get_status(&rtxn, *status)?, + }; + } + if !include_processing_tasks { + tasks -= &tasks_processing; } tasks &= status_tasks; } + if let Some(uids) = &query.uid { + let uids = RoaringBitmap::from_iter(uids); + tasks &= &uids; + } + if let Some(kind) = &query.kind { let mut kind_tasks = RoaringBitmap::new(); for kind in kind { kind_tasks |= self.get_kind(&rtxn, *kind)?; } - tasks &= kind_tasks; + tasks &= &kind_tasks; } if let Some(index) = &query.index_uid { @@ -432,9 +447,51 @@ impl IndexScheduler { for index in index { index_tasks |= self.index_tasks(&rtxn, index)?; } - tasks &= index_tasks; + tasks &= &index_tasks; } + // For the started_at filter, we need to treat the part of the tasks that are processing from the part of the + // tasks that are not processing. The non-processing ones are filtered normally while the processing ones + // are entirely removed unless the in-memory startedAt variable falls within the date filter. + // Once we have filtered the two subsets, we put them back together and assign it back to `tasks`. + tasks = { + let (mut filtered_non_processing_tasks, mut filtered_processing_tasks) = + (&tasks - &tasks_processing, &tasks & &tasks_processing); + + // special case for Processing tasks + // in a loop because I want to break early if both query.after_started_at and query.before_started_at are None + // it doesn't actually loop + 'block: loop { + let bounds = match (query.after_started_at, query.before_started_at) { + (None, None) => break 'block, + (None, Some(before)) => (Bound::Unbounded, Bound::Excluded(before)), + (Some(after), None) => (Bound::Excluded(after), Bound::Unbounded), + (Some(after), Some(before)) => { + (Bound::Excluded(after), Bound::Excluded(before)) + } + }; + let start = map_bound(bounds.0, |b| b.unix_timestamp_nanos()); + let end = map_bound(bounds.1, |b| b.unix_timestamp_nanos()); + let is_within_dates = RangeBounds::contains( + &(start, end), + &started_at_processing.unix_timestamp_nanos(), + ); + if !is_within_dates { + filtered_processing_tasks.clear(); + } + break 'block; + } + + keep_tasks_within_datetimes( + &rtxn, + &mut filtered_non_processing_tasks, + self.started_at, + query.after_started_at, + query.before_started_at, + )?; + filtered_non_processing_tasks | filtered_processing_tasks + }; + keep_tasks_within_datetimes( &rtxn, &mut tasks, @@ -443,14 +500,6 @@ impl IndexScheduler { query.before_enqueued_at, )?; - keep_tasks_within_datetimes( - &rtxn, - &mut tasks, - self.started_at, - query.after_started_at, - query.before_started_at, - )?; - keep_tasks_within_datetimes( &rtxn, &mut tasks, @@ -831,6 +880,7 @@ mod tests { use meili_snap::snapshot; use meilisearch_types::milli::update::IndexDocumentsMethod::ReplaceDocuments; use tempfile::TempDir; + use time::Duration; use uuid::Uuid; use crate::snapshot::{snapshot_bitmap, snapshot_index_scheduler}; @@ -1463,16 +1513,172 @@ mod tests { #[test] fn query_processing_tasks() { + let start_time = OffsetDateTime::now_utc(); + let (index_scheduler, handle) = - IndexScheduler::test(true, vec![(1, FailureLocation::InsideCreateBatch)]); + IndexScheduler::test(true, vec![(3, FailureLocation::InsideProcessBatch)]); let kind = index_creation_task("catto", "mouse"); let _task = index_scheduler.register(kind).unwrap(); + let kind = index_creation_task("doggo", "sheep"); + let _task = index_scheduler.register(kind).unwrap(); + let kind = index_creation_task("whalo", "fish"); + let _task = index_scheduler.register(kind).unwrap(); + + snapshot!(snapshot_index_scheduler(&index_scheduler), name: "start"); handle.wait_till(Breakpoint::BatchCreated); + let query = Query { status: Some(vec![Status::Processing]), ..Default::default() }; - let processing_tasks = index_scheduler.get_task_ids(&query).unwrap(); - snapshot!(snapshot_bitmap(&processing_tasks), @"[0,]"); + let tasks = index_scheduler.get_task_ids(&query).unwrap(); + snapshot!(snapshot_bitmap(&tasks), @"[0,]"); // only the processing tasks in the first tick + + let query = Query { status: Some(vec![Status::Enqueued]), ..Default::default() }; + let tasks = index_scheduler.get_task_ids(&query).unwrap(); + snapshot!(snapshot_bitmap(&tasks), @"[1,2,]"); // only the enqueued tasks in the first tick + + let query = Query { + status: Some(vec![Status::Enqueued, Status::Processing]), + ..Default::default() + }; + let tasks = index_scheduler.get_task_ids(&query).unwrap(); + snapshot!(snapshot_bitmap(&tasks), @"[0,1,2,]"); // both enqueued and processing tasks in the first tick + + let query = Query { + status: Some(vec![Status::Enqueued, Status::Processing]), + after_started_at: Some(start_time), + ..Default::default() + }; + let tasks = index_scheduler.get_task_ids(&query).unwrap(); + // both enqueued and processing tasks in the first tick, but limited to those with a started_at + // that comes after the start of the test, which should excludes the enqueued tasks + snapshot!(snapshot_bitmap(&tasks), @"[0,]"); + + let query = Query { + status: Some(vec![Status::Enqueued, Status::Processing]), + before_started_at: Some(start_time), + ..Default::default() + }; + let tasks = index_scheduler.get_task_ids(&query).unwrap(); + // both enqueued and processing tasks in the first tick, but limited to those with a started_at + // that comes before the start of the test, which should excludes all of them + snapshot!(snapshot_bitmap(&tasks), @"[]"); + + let query = Query { + status: Some(vec![Status::Enqueued, Status::Processing]), + after_started_at: Some(start_time), + before_started_at: Some(start_time + Duration::minutes(1)), + ..Default::default() + }; + let tasks = index_scheduler.get_task_ids(&query).unwrap(); + // both enqueued and processing tasks in the first tick, but limited to those with a started_at + // that comes after the start of the test and before one minute after the start of the test, + // which should exclude the enqueued tasks and include the only processing task + snapshot!(snapshot_bitmap(&tasks), @"[0,]"); + + handle.wait_till(Breakpoint::BatchCreated); + + let second_start_time = OffsetDateTime::now_utc(); + + let query = Query { + status: Some(vec![Status::Succeeded, Status::Processing]), + after_started_at: Some(start_time), + before_started_at: Some(start_time + Duration::minutes(1)), + ..Default::default() + }; + let tasks = index_scheduler.get_task_ids(&query).unwrap(); + // both succeeded and processing tasks in the first tick, but limited to those with a started_at + // that comes after the start of the test and before one minute after the start of the test, + // which should include all tasks + snapshot!(snapshot_bitmap(&tasks), @"[0,1,]"); + + let query = Query { + status: Some(vec![Status::Succeeded, Status::Processing]), + before_started_at: Some(start_time), + ..Default::default() + }; + let tasks = index_scheduler.get_task_ids(&query).unwrap(); + // both succeeded and processing tasks in the first tick, but limited to those with a started_at + // that comes before the start of the test, which should exclude all tasks + snapshot!(snapshot_bitmap(&tasks), @"[]"); + + let query = Query { + status: Some(vec![Status::Enqueued, Status::Succeeded, Status::Processing]), + after_started_at: Some(second_start_time), + before_started_at: Some(second_start_time + Duration::minutes(1)), + ..Default::default() + }; + let tasks = index_scheduler.get_task_ids(&query).unwrap(); + // both succeeded and processing tasks in the first tick, but limited to those with a started_at + // that comes after the start of the second part of the test and before one minute after the + // second start of the test, which should exclude all tasks + snapshot!(snapshot_bitmap(&tasks), @"[]"); + + // now we make one more batch, the started_at field of the new tasks will be past `second_start_time` + handle.wait_till(Breakpoint::BatchCreated); + let tasks = index_scheduler.get_task_ids(&query).unwrap(); + // we run the same query to verify that, and indeed find that the last task is matched + snapshot!(snapshot_bitmap(&tasks), @"[2,]"); + + let query = Query { + status: Some(vec![Status::Enqueued, Status::Succeeded, Status::Processing]), + after_started_at: Some(second_start_time), + before_started_at: Some(second_start_time + Duration::minutes(1)), + ..Default::default() + }; + let tasks = index_scheduler.get_task_ids(&query).unwrap(); + // enqueued, succeeded, or processing tasks started after the second part of the test, should + // again only return the last task + snapshot!(snapshot_bitmap(&tasks), @"[2,]"); + + handle.wait_till(Breakpoint::AfterProcessing); + // now the last task should have failed + snapshot!(snapshot_index_scheduler(&index_scheduler), name: "end"); + let tasks = index_scheduler.get_task_ids(&query).unwrap(); + // so running the last query should return nothing + snapshot!(snapshot_bitmap(&tasks), @"[]"); + + let query = Query { + status: Some(vec![Status::Failed]), + after_started_at: Some(second_start_time), + before_started_at: Some(second_start_time + Duration::minutes(1)), + ..Default::default() + }; + let tasks = index_scheduler.get_task_ids(&query).unwrap(); + // but the same query on failed tasks should return the last task + snapshot!(snapshot_bitmap(&tasks), @"[2,]"); + + let query = Query { + status: Some(vec![Status::Failed]), + after_started_at: Some(second_start_time), + before_started_at: Some(second_start_time + Duration::minutes(1)), + ..Default::default() + }; + let tasks = index_scheduler.get_task_ids(&query).unwrap(); + // but the same query on failed tasks should return the last task + snapshot!(snapshot_bitmap(&tasks), @"[2,]"); + + let query = Query { + status: Some(vec![Status::Failed]), + uid: Some(vec![1]), + after_started_at: Some(second_start_time), + before_started_at: Some(second_start_time + Duration::minutes(1)), + ..Default::default() + }; + let tasks = index_scheduler.get_task_ids(&query).unwrap(); + // same query but with an invalid uid + snapshot!(snapshot_bitmap(&tasks), @"[]"); + + let query = Query { + status: Some(vec![Status::Failed]), + uid: Some(vec![2]), + after_started_at: Some(second_start_time), + before_started_at: Some(second_start_time + Duration::minutes(1)), + ..Default::default() + }; + let tasks = index_scheduler.get_task_ids(&query).unwrap(); + // same query but with a valid uid + snapshot!(snapshot_bitmap(&tasks), @"[2,]"); } #[test] diff --git a/index-scheduler/src/snapshots/lib.rs/query_processing_tasks/end.snap b/index-scheduler/src/snapshots/lib.rs/query_processing_tasks/end.snap new file mode 100644 index 000000000..6b7ec2a2a --- /dev/null +++ b/index-scheduler/src/snapshots/lib.rs/query_processing_tasks/end.snap @@ -0,0 +1,47 @@ +--- +source: index-scheduler/src/lib.rs +--- +### Autobatching Enabled = true +### Processing Tasks: +[] +---------------------------------------------------------------------- +### All Tasks: +0 {uid: 0, status: succeeded, details: { primary_key: Some("mouse") }, kind: IndexCreation { index_uid: "catto", primary_key: Some("mouse") }} +1 {uid: 1, status: succeeded, details: { primary_key: Some("sheep") }, kind: IndexCreation { index_uid: "doggo", primary_key: Some("sheep") }} +2 {uid: 2, status: failed, error: ResponseError { code: 200, message: "Corrupted task queue.", error_code: "internal", error_type: "internal", error_link: "https://docs.meilisearch.com/errors#internal" }, details: { primary_key: Some("fish") }, kind: IndexCreation { index_uid: "whalo", primary_key: Some("fish") }} +---------------------------------------------------------------------- +### Status: +enqueued [] +succeeded [0,1,] +failed [2,] +---------------------------------------------------------------------- +### Kind: +"indexCreation" [0,1,2,] +---------------------------------------------------------------------- +### Index Tasks: +catto [0,] +doggo [1,] +whalo [2,] +---------------------------------------------------------------------- +### Index Mapper: +["catto", "doggo"] +---------------------------------------------------------------------- +### Enqueued At: +[timestamp] [0,] +[timestamp] [1,] +[timestamp] [2,] +---------------------------------------------------------------------- +### Started At: +[timestamp] [0,] +[timestamp] [1,] +[timestamp] [2,] +---------------------------------------------------------------------- +### Finished At: +[timestamp] [0,] +[timestamp] [1,] +[timestamp] [2,] +---------------------------------------------------------------------- +### File Store: + +---------------------------------------------------------------------- + diff --git a/index-scheduler/src/snapshots/lib.rs/query_processing_tasks/start.snap b/index-scheduler/src/snapshots/lib.rs/query_processing_tasks/start.snap new file mode 100644 index 000000000..60c8de558 --- /dev/null +++ b/index-scheduler/src/snapshots/lib.rs/query_processing_tasks/start.snap @@ -0,0 +1,39 @@ +--- +source: index-scheduler/src/lib.rs +--- +### Autobatching Enabled = true +### Processing Tasks: +[] +---------------------------------------------------------------------- +### All Tasks: +0 {uid: 0, status: enqueued, details: { primary_key: Some("mouse") }, kind: IndexCreation { index_uid: "catto", primary_key: Some("mouse") }} +1 {uid: 1, status: enqueued, details: { primary_key: Some("sheep") }, kind: IndexCreation { index_uid: "doggo", primary_key: Some("sheep") }} +2 {uid: 2, status: enqueued, details: { primary_key: Some("fish") }, kind: IndexCreation { index_uid: "whalo", primary_key: Some("fish") }} +---------------------------------------------------------------------- +### Status: +enqueued [0,1,2,] +---------------------------------------------------------------------- +### Kind: +"indexCreation" [0,1,2,] +---------------------------------------------------------------------- +### Index Tasks: +catto [0,] +doggo [1,] +whalo [2,] +---------------------------------------------------------------------- +### Index Mapper: +[] +---------------------------------------------------------------------- +### Enqueued At: +[timestamp] [0,] +[timestamp] [1,] +[timestamp] [2,] +---------------------------------------------------------------------- +### Started At: +---------------------------------------------------------------------- +### Finished At: +---------------------------------------------------------------------- +### File Store: + +---------------------------------------------------------------------- + diff --git a/index-scheduler/src/utils.rs b/index-scheduler/src/utils.rs index ee24fc593..e14da4d53 100644 --- a/index-scheduler/src/utils.rs +++ b/index-scheduler/src/utils.rs @@ -236,7 +236,7 @@ pub(crate) fn keep_tasks_within_datetimes( } // TODO: remove when Bound::map ( https://github.com/rust-lang/rust/issues/86026 ) is available on stable -fn map_bound(bound: Bound, map: impl FnOnce(T) -> U) -> Bound { +pub(crate) fn map_bound(bound: Bound, map: impl FnOnce(T) -> U) -> Bound { match bound { Bound::Included(x) => Bound::Included(map(x)), Bound::Excluded(x) => Bound::Excluded(map(x)),