mirror of
https://github.com/meilisearch/meilisearch.git
synced 2025-01-19 01:18:31 +08:00
Fix (hopefully) queries that include processing tasks
This commit is contained in:
parent
c562e380a0
commit
44b8c8b713
@ -34,8 +34,9 @@ pub use error::Error;
|
||||
use meilisearch_types::milli::documents::DocumentsBatchBuilder;
|
||||
use meilisearch_types::tasks::{Kind, KindWithContent, Status, Task};
|
||||
|
||||
use utils::keep_tasks_within_datetimes;
|
||||
use utils::{keep_tasks_within_datetimes, map_bound};
|
||||
|
||||
use std::ops::{Bound, RangeBounds};
|
||||
use std::path::PathBuf;
|
||||
use std::sync::atomic::AtomicBool;
|
||||
use std::sync::atomic::Ordering::Relaxed;
|
||||
@ -404,27 +405,41 @@ impl IndexScheduler {
|
||||
pub fn get_task_ids(&self, query: &Query) -> Result<RoaringBitmap> {
|
||||
let rtxn = self.env.read_txn()?;
|
||||
|
||||
// This is the list of all the tasks.
|
||||
let ProcessingTasks { started_at: started_at_processing, processing: tasks_processing } =
|
||||
self.processing_tasks.read().unwrap().clone();
|
||||
|
||||
let mut tasks = self.all_task_ids(&rtxn)?;
|
||||
|
||||
if let Some(uids) = &query.uid {
|
||||
tasks &= RoaringBitmap::from_iter(uids);
|
||||
}
|
||||
|
||||
if let Some(status) = &query.status {
|
||||
let mut include_processing_tasks = false;
|
||||
let mut status_tasks = RoaringBitmap::new();
|
||||
for status in status {
|
||||
status_tasks |= self.get_status(&rtxn, *status)?;
|
||||
match status {
|
||||
// special case for Processing tasks
|
||||
Status::Processing => {
|
||||
include_processing_tasks = true;
|
||||
status_tasks |= &tasks_processing;
|
||||
}
|
||||
status => status_tasks |= &self.get_status(&rtxn, *status)?,
|
||||
};
|
||||
}
|
||||
if !include_processing_tasks {
|
||||
tasks -= &tasks_processing;
|
||||
}
|
||||
tasks &= status_tasks;
|
||||
}
|
||||
|
||||
if let Some(uids) = &query.uid {
|
||||
let uids = RoaringBitmap::from_iter(uids);
|
||||
tasks &= &uids;
|
||||
}
|
||||
|
||||
if let Some(kind) = &query.kind {
|
||||
let mut kind_tasks = RoaringBitmap::new();
|
||||
for kind in kind {
|
||||
kind_tasks |= self.get_kind(&rtxn, *kind)?;
|
||||
}
|
||||
tasks &= kind_tasks;
|
||||
tasks &= &kind_tasks;
|
||||
}
|
||||
|
||||
if let Some(index) = &query.index_uid {
|
||||
@ -432,9 +447,51 @@ impl IndexScheduler {
|
||||
for index in index {
|
||||
index_tasks |= self.index_tasks(&rtxn, index)?;
|
||||
}
|
||||
tasks &= index_tasks;
|
||||
tasks &= &index_tasks;
|
||||
}
|
||||
|
||||
// For the started_at filter, we need to treat the part of the tasks that are processing from the part of the
|
||||
// tasks that are not processing. The non-processing ones are filtered normally while the processing ones
|
||||
// are entirely removed unless the in-memory startedAt variable falls within the date filter.
|
||||
// Once we have filtered the two subsets, we put them back together and assign it back to `tasks`.
|
||||
tasks = {
|
||||
let (mut filtered_non_processing_tasks, mut filtered_processing_tasks) =
|
||||
(&tasks - &tasks_processing, &tasks & &tasks_processing);
|
||||
|
||||
// special case for Processing tasks
|
||||
// in a loop because I want to break early if both query.after_started_at and query.before_started_at are None
|
||||
// it doesn't actually loop
|
||||
'block: loop {
|
||||
let bounds = match (query.after_started_at, query.before_started_at) {
|
||||
(None, None) => break 'block,
|
||||
(None, Some(before)) => (Bound::Unbounded, Bound::Excluded(before)),
|
||||
(Some(after), None) => (Bound::Excluded(after), Bound::Unbounded),
|
||||
(Some(after), Some(before)) => {
|
||||
(Bound::Excluded(after), Bound::Excluded(before))
|
||||
}
|
||||
};
|
||||
let start = map_bound(bounds.0, |b| b.unix_timestamp_nanos());
|
||||
let end = map_bound(bounds.1, |b| b.unix_timestamp_nanos());
|
||||
let is_within_dates = RangeBounds::contains(
|
||||
&(start, end),
|
||||
&started_at_processing.unix_timestamp_nanos(),
|
||||
);
|
||||
if !is_within_dates {
|
||||
filtered_processing_tasks.clear();
|
||||
}
|
||||
break 'block;
|
||||
}
|
||||
|
||||
keep_tasks_within_datetimes(
|
||||
&rtxn,
|
||||
&mut filtered_non_processing_tasks,
|
||||
self.started_at,
|
||||
query.after_started_at,
|
||||
query.before_started_at,
|
||||
)?;
|
||||
filtered_non_processing_tasks | filtered_processing_tasks
|
||||
};
|
||||
|
||||
keep_tasks_within_datetimes(
|
||||
&rtxn,
|
||||
&mut tasks,
|
||||
@ -443,14 +500,6 @@ impl IndexScheduler {
|
||||
query.before_enqueued_at,
|
||||
)?;
|
||||
|
||||
keep_tasks_within_datetimes(
|
||||
&rtxn,
|
||||
&mut tasks,
|
||||
self.started_at,
|
||||
query.after_started_at,
|
||||
query.before_started_at,
|
||||
)?;
|
||||
|
||||
keep_tasks_within_datetimes(
|
||||
&rtxn,
|
||||
&mut tasks,
|
||||
@ -831,6 +880,7 @@ mod tests {
|
||||
use meili_snap::snapshot;
|
||||
use meilisearch_types::milli::update::IndexDocumentsMethod::ReplaceDocuments;
|
||||
use tempfile::TempDir;
|
||||
use time::Duration;
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::snapshot::{snapshot_bitmap, snapshot_index_scheduler};
|
||||
@ -1463,16 +1513,172 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn query_processing_tasks() {
|
||||
let start_time = OffsetDateTime::now_utc();
|
||||
|
||||
let (index_scheduler, handle) =
|
||||
IndexScheduler::test(true, vec![(1, FailureLocation::InsideCreateBatch)]);
|
||||
IndexScheduler::test(true, vec![(3, FailureLocation::InsideProcessBatch)]);
|
||||
|
||||
let kind = index_creation_task("catto", "mouse");
|
||||
let _task = index_scheduler.register(kind).unwrap();
|
||||
let kind = index_creation_task("doggo", "sheep");
|
||||
let _task = index_scheduler.register(kind).unwrap();
|
||||
let kind = index_creation_task("whalo", "fish");
|
||||
let _task = index_scheduler.register(kind).unwrap();
|
||||
|
||||
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "start");
|
||||
|
||||
handle.wait_till(Breakpoint::BatchCreated);
|
||||
|
||||
let query = Query { status: Some(vec![Status::Processing]), ..Default::default() };
|
||||
let processing_tasks = index_scheduler.get_task_ids(&query).unwrap();
|
||||
snapshot!(snapshot_bitmap(&processing_tasks), @"[0,]");
|
||||
let tasks = index_scheduler.get_task_ids(&query).unwrap();
|
||||
snapshot!(snapshot_bitmap(&tasks), @"[0,]"); // only the processing tasks in the first tick
|
||||
|
||||
let query = Query { status: Some(vec![Status::Enqueued]), ..Default::default() };
|
||||
let tasks = index_scheduler.get_task_ids(&query).unwrap();
|
||||
snapshot!(snapshot_bitmap(&tasks), @"[1,2,]"); // only the enqueued tasks in the first tick
|
||||
|
||||
let query = Query {
|
||||
status: Some(vec![Status::Enqueued, Status::Processing]),
|
||||
..Default::default()
|
||||
};
|
||||
let tasks = index_scheduler.get_task_ids(&query).unwrap();
|
||||
snapshot!(snapshot_bitmap(&tasks), @"[0,1,2,]"); // both enqueued and processing tasks in the first tick
|
||||
|
||||
let query = Query {
|
||||
status: Some(vec![Status::Enqueued, Status::Processing]),
|
||||
after_started_at: Some(start_time),
|
||||
..Default::default()
|
||||
};
|
||||
let tasks = index_scheduler.get_task_ids(&query).unwrap();
|
||||
// both enqueued and processing tasks in the first tick, but limited to those with a started_at
|
||||
// that comes after the start of the test, which should excludes the enqueued tasks
|
||||
snapshot!(snapshot_bitmap(&tasks), @"[0,]");
|
||||
|
||||
let query = Query {
|
||||
status: Some(vec![Status::Enqueued, Status::Processing]),
|
||||
before_started_at: Some(start_time),
|
||||
..Default::default()
|
||||
};
|
||||
let tasks = index_scheduler.get_task_ids(&query).unwrap();
|
||||
// both enqueued and processing tasks in the first tick, but limited to those with a started_at
|
||||
// that comes before the start of the test, which should excludes all of them
|
||||
snapshot!(snapshot_bitmap(&tasks), @"[]");
|
||||
|
||||
let query = Query {
|
||||
status: Some(vec![Status::Enqueued, Status::Processing]),
|
||||
after_started_at: Some(start_time),
|
||||
before_started_at: Some(start_time + Duration::minutes(1)),
|
||||
..Default::default()
|
||||
};
|
||||
let tasks = index_scheduler.get_task_ids(&query).unwrap();
|
||||
// both enqueued and processing tasks in the first tick, but limited to those with a started_at
|
||||
// that comes after the start of the test and before one minute after the start of the test,
|
||||
// which should exclude the enqueued tasks and include the only processing task
|
||||
snapshot!(snapshot_bitmap(&tasks), @"[0,]");
|
||||
|
||||
handle.wait_till(Breakpoint::BatchCreated);
|
||||
|
||||
let second_start_time = OffsetDateTime::now_utc();
|
||||
|
||||
let query = Query {
|
||||
status: Some(vec![Status::Succeeded, Status::Processing]),
|
||||
after_started_at: Some(start_time),
|
||||
before_started_at: Some(start_time + Duration::minutes(1)),
|
||||
..Default::default()
|
||||
};
|
||||
let tasks = index_scheduler.get_task_ids(&query).unwrap();
|
||||
// both succeeded and processing tasks in the first tick, but limited to those with a started_at
|
||||
// that comes after the start of the test and before one minute after the start of the test,
|
||||
// which should include all tasks
|
||||
snapshot!(snapshot_bitmap(&tasks), @"[0,1,]");
|
||||
|
||||
let query = Query {
|
||||
status: Some(vec![Status::Succeeded, Status::Processing]),
|
||||
before_started_at: Some(start_time),
|
||||
..Default::default()
|
||||
};
|
||||
let tasks = index_scheduler.get_task_ids(&query).unwrap();
|
||||
// both succeeded and processing tasks in the first tick, but limited to those with a started_at
|
||||
// that comes before the start of the test, which should exclude all tasks
|
||||
snapshot!(snapshot_bitmap(&tasks), @"[]");
|
||||
|
||||
let query = Query {
|
||||
status: Some(vec![Status::Enqueued, Status::Succeeded, Status::Processing]),
|
||||
after_started_at: Some(second_start_time),
|
||||
before_started_at: Some(second_start_time + Duration::minutes(1)),
|
||||
..Default::default()
|
||||
};
|
||||
let tasks = index_scheduler.get_task_ids(&query).unwrap();
|
||||
// both succeeded and processing tasks in the first tick, but limited to those with a started_at
|
||||
// that comes after the start of the second part of the test and before one minute after the
|
||||
// second start of the test, which should exclude all tasks
|
||||
snapshot!(snapshot_bitmap(&tasks), @"[]");
|
||||
|
||||
// now we make one more batch, the started_at field of the new tasks will be past `second_start_time`
|
||||
handle.wait_till(Breakpoint::BatchCreated);
|
||||
let tasks = index_scheduler.get_task_ids(&query).unwrap();
|
||||
// we run the same query to verify that, and indeed find that the last task is matched
|
||||
snapshot!(snapshot_bitmap(&tasks), @"[2,]");
|
||||
|
||||
let query = Query {
|
||||
status: Some(vec![Status::Enqueued, Status::Succeeded, Status::Processing]),
|
||||
after_started_at: Some(second_start_time),
|
||||
before_started_at: Some(second_start_time + Duration::minutes(1)),
|
||||
..Default::default()
|
||||
};
|
||||
let tasks = index_scheduler.get_task_ids(&query).unwrap();
|
||||
// enqueued, succeeded, or processing tasks started after the second part of the test, should
|
||||
// again only return the last task
|
||||
snapshot!(snapshot_bitmap(&tasks), @"[2,]");
|
||||
|
||||
handle.wait_till(Breakpoint::AfterProcessing);
|
||||
// now the last task should have failed
|
||||
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "end");
|
||||
let tasks = index_scheduler.get_task_ids(&query).unwrap();
|
||||
// so running the last query should return nothing
|
||||
snapshot!(snapshot_bitmap(&tasks), @"[]");
|
||||
|
||||
let query = Query {
|
||||
status: Some(vec![Status::Failed]),
|
||||
after_started_at: Some(second_start_time),
|
||||
before_started_at: Some(second_start_time + Duration::minutes(1)),
|
||||
..Default::default()
|
||||
};
|
||||
let tasks = index_scheduler.get_task_ids(&query).unwrap();
|
||||
// but the same query on failed tasks should return the last task
|
||||
snapshot!(snapshot_bitmap(&tasks), @"[2,]");
|
||||
|
||||
let query = Query {
|
||||
status: Some(vec![Status::Failed]),
|
||||
after_started_at: Some(second_start_time),
|
||||
before_started_at: Some(second_start_time + Duration::minutes(1)),
|
||||
..Default::default()
|
||||
};
|
||||
let tasks = index_scheduler.get_task_ids(&query).unwrap();
|
||||
// but the same query on failed tasks should return the last task
|
||||
snapshot!(snapshot_bitmap(&tasks), @"[2,]");
|
||||
|
||||
let query = Query {
|
||||
status: Some(vec![Status::Failed]),
|
||||
uid: Some(vec![1]),
|
||||
after_started_at: Some(second_start_time),
|
||||
before_started_at: Some(second_start_time + Duration::minutes(1)),
|
||||
..Default::default()
|
||||
};
|
||||
let tasks = index_scheduler.get_task_ids(&query).unwrap();
|
||||
// same query but with an invalid uid
|
||||
snapshot!(snapshot_bitmap(&tasks), @"[]");
|
||||
|
||||
let query = Query {
|
||||
status: Some(vec![Status::Failed]),
|
||||
uid: Some(vec![2]),
|
||||
after_started_at: Some(second_start_time),
|
||||
before_started_at: Some(second_start_time + Duration::minutes(1)),
|
||||
..Default::default()
|
||||
};
|
||||
let tasks = index_scheduler.get_task_ids(&query).unwrap();
|
||||
// same query but with a valid uid
|
||||
snapshot!(snapshot_bitmap(&tasks), @"[2,]");
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
@ -0,0 +1,47 @@
|
||||
---
|
||||
source: index-scheduler/src/lib.rs
|
||||
---
|
||||
### Autobatching Enabled = true
|
||||
### Processing Tasks:
|
||||
[]
|
||||
----------------------------------------------------------------------
|
||||
### All Tasks:
|
||||
0 {uid: 0, status: succeeded, details: { primary_key: Some("mouse") }, kind: IndexCreation { index_uid: "catto", primary_key: Some("mouse") }}
|
||||
1 {uid: 1, status: succeeded, details: { primary_key: Some("sheep") }, kind: IndexCreation { index_uid: "doggo", primary_key: Some("sheep") }}
|
||||
2 {uid: 2, status: failed, error: ResponseError { code: 200, message: "Corrupted task queue.", error_code: "internal", error_type: "internal", error_link: "https://docs.meilisearch.com/errors#internal" }, details: { primary_key: Some("fish") }, kind: IndexCreation { index_uid: "whalo", primary_key: Some("fish") }}
|
||||
----------------------------------------------------------------------
|
||||
### Status:
|
||||
enqueued []
|
||||
succeeded [0,1,]
|
||||
failed [2,]
|
||||
----------------------------------------------------------------------
|
||||
### Kind:
|
||||
"indexCreation" [0,1,2,]
|
||||
----------------------------------------------------------------------
|
||||
### Index Tasks:
|
||||
catto [0,]
|
||||
doggo [1,]
|
||||
whalo [2,]
|
||||
----------------------------------------------------------------------
|
||||
### Index Mapper:
|
||||
["catto", "doggo"]
|
||||
----------------------------------------------------------------------
|
||||
### Enqueued At:
|
||||
[timestamp] [0,]
|
||||
[timestamp] [1,]
|
||||
[timestamp] [2,]
|
||||
----------------------------------------------------------------------
|
||||
### Started At:
|
||||
[timestamp] [0,]
|
||||
[timestamp] [1,]
|
||||
[timestamp] [2,]
|
||||
----------------------------------------------------------------------
|
||||
### Finished At:
|
||||
[timestamp] [0,]
|
||||
[timestamp] [1,]
|
||||
[timestamp] [2,]
|
||||
----------------------------------------------------------------------
|
||||
### File Store:
|
||||
|
||||
----------------------------------------------------------------------
|
||||
|
@ -0,0 +1,39 @@
|
||||
---
|
||||
source: index-scheduler/src/lib.rs
|
||||
---
|
||||
### Autobatching Enabled = true
|
||||
### Processing Tasks:
|
||||
[]
|
||||
----------------------------------------------------------------------
|
||||
### All Tasks:
|
||||
0 {uid: 0, status: enqueued, details: { primary_key: Some("mouse") }, kind: IndexCreation { index_uid: "catto", primary_key: Some("mouse") }}
|
||||
1 {uid: 1, status: enqueued, details: { primary_key: Some("sheep") }, kind: IndexCreation { index_uid: "doggo", primary_key: Some("sheep") }}
|
||||
2 {uid: 2, status: enqueued, details: { primary_key: Some("fish") }, kind: IndexCreation { index_uid: "whalo", primary_key: Some("fish") }}
|
||||
----------------------------------------------------------------------
|
||||
### Status:
|
||||
enqueued [0,1,2,]
|
||||
----------------------------------------------------------------------
|
||||
### Kind:
|
||||
"indexCreation" [0,1,2,]
|
||||
----------------------------------------------------------------------
|
||||
### Index Tasks:
|
||||
catto [0,]
|
||||
doggo [1,]
|
||||
whalo [2,]
|
||||
----------------------------------------------------------------------
|
||||
### Index Mapper:
|
||||
[]
|
||||
----------------------------------------------------------------------
|
||||
### Enqueued At:
|
||||
[timestamp] [0,]
|
||||
[timestamp] [1,]
|
||||
[timestamp] [2,]
|
||||
----------------------------------------------------------------------
|
||||
### Started At:
|
||||
----------------------------------------------------------------------
|
||||
### Finished At:
|
||||
----------------------------------------------------------------------
|
||||
### File Store:
|
||||
|
||||
----------------------------------------------------------------------
|
||||
|
@ -236,7 +236,7 @@ pub(crate) fn keep_tasks_within_datetimes(
|
||||
}
|
||||
|
||||
// TODO: remove when Bound::map ( https://github.com/rust-lang/rust/issues/86026 ) is available on stable
|
||||
fn map_bound<T, U>(bound: Bound<T>, map: impl FnOnce(T) -> U) -> Bound<U> {
|
||||
pub(crate) fn map_bound<T, U>(bound: Bound<T>, map: impl FnOnce(T) -> U) -> Bound<U> {
|
||||
match bound {
|
||||
Bound::Included(x) => Bound::Included(map(x)),
|
||||
Bound::Excluded(x) => Bound::Excluded(map(x)),
|
||||
|
Loading…
Reference in New Issue
Block a user