2982: Adapt task queries to account for special index swap rules r=irevoire a=loiclec

# Pull Request

## Related issue
Fixes https://github.com/meilisearch/meilisearch/issues/2970 

## What does this PR do?
- Replace the `get_tasks` method with a `get_tasks_from_authorized_indexes` which returns the list of tasks matched by the query **from the point of view of the user**. That is, it takes into consideration the list of authorised indexes as well as the special case of `IndexSwap` which should not be returned if an index_uid is specified or if any of its associated indexes are not authorised.
- Adapt the code in other places following this change
- Add some tests
- Also the method `get_task_ids_from_authorized_indexes` now takes a read transaction as argument. This is because we want to make sure that the implementation of `get_tasks_from_authorized_indexes` only uses one read transaction. Otherwise, we could (1) get a list of task ids matching the query, then (2) one of these task ids is deleted by a taskDeletion task, and finally (3) we try to get the `Task`s associated with each returned task ids, and get a `CorruptedTaskQueue` error.



Co-authored-by: Loïc Lecrenier <loic.lecrenier@me.com>
This commit is contained in:
bors[bot] 2022-10-27 14:28:04 +00:00 committed by GitHub
commit d16ea755d8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 457 additions and 133 deletions

View File

@ -41,7 +41,7 @@ use uuid::Uuid;
use crate::autobatcher::{self, BatchKind}; use crate::autobatcher::{self, BatchKind};
use crate::utils::{self, swap_index_uid_in_task}; use crate::utils::{self, swap_index_uid_in_task};
use crate::{Error, IndexScheduler, Query, Result, TaskId}; use crate::{Error, IndexScheduler, Result, TaskId};
/// Represents a combination of tasks that can all be processed at the same time. /// Represents a combination of tasks that can all be processed at the same time.
/// ///
@ -854,12 +854,10 @@ impl IndexScheduler {
return Err(Error::IndexNotFound(rhs.to_owned())); return Err(Error::IndexNotFound(rhs.to_owned()));
} }
// 2. Get the task set for index = name. // 2. Get the task set for index = name that appeared before the index swap task
let mut index_lhs_task_ids = let mut index_lhs_task_ids = self.index_tasks(wtxn, lhs)?;
self.get_task_ids(&Query::default().with_index(lhs.to_owned()))?;
index_lhs_task_ids.remove_range(task_id..); index_lhs_task_ids.remove_range(task_id..);
let mut index_rhs_task_ids = let mut index_rhs_task_ids = self.index_tasks(wtxn, rhs)?;
self.get_task_ids(&Query::default().with_index(rhs.to_owned()))?;
index_rhs_task_ids.remove_range(task_id..); index_rhs_task_ids.remove_range(task_id..);
// 3. before_name -> new_name in the task's KindWithContent // 3. before_name -> new_name in the task's KindWithContent

View File

@ -41,7 +41,7 @@ pub use error::Error;
use file_store::FileStore; use file_store::FileStore;
use meilisearch_types::error::ResponseError; use meilisearch_types::error::ResponseError;
use meilisearch_types::heed::types::{OwnedType, SerdeBincode, SerdeJson, Str}; use meilisearch_types::heed::types::{OwnedType, SerdeBincode, SerdeJson, Str};
use meilisearch_types::heed::{self, Database, Env}; use meilisearch_types::heed::{self, Database, Env, RoTxn};
use meilisearch_types::milli; use meilisearch_types::milli;
use meilisearch_types::milli::documents::DocumentsBatchBuilder; use meilisearch_types::milli::documents::DocumentsBatchBuilder;
use meilisearch_types::milli::update::IndexerConfig; use meilisearch_types::milli::update::IndexerConfig;
@ -100,7 +100,7 @@ pub struct Query {
} }
impl Query { impl Query {
/// Return `true` iff every field of the query is set to `None`, such that the query /// Return `true` if every field of the query is set to `None`, such that the query
/// matches all tasks. /// matches all tasks.
pub fn is_empty(&self) -> bool { pub fn is_empty(&self) -> bool {
matches!( matches!(
@ -393,6 +393,10 @@ impl IndexScheduler {
Ok(this) Ok(this)
} }
pub fn read_txn(&self) -> Result<RoTxn> {
self.env.read_txn().map_err(|e| e.into())
}
/// Start the run loop for the given index scheduler. /// Start the run loop for the given index scheduler.
/// ///
/// This function will execute in a different thread and must be called /// This function will execute in a different thread and must be called
@ -442,14 +446,12 @@ impl IndexScheduler {
self.index_mapper.indexes(&rtxn) self.index_mapper.indexes(&rtxn)
} }
/// Return the task ids matched by the given query. /// Return the task ids matched by the given query from the index scheduler's point of view.
pub fn get_task_ids(&self, query: &Query) -> Result<RoaringBitmap> { pub(crate) fn get_task_ids(&self, rtxn: &RoTxn, query: &Query) -> Result<RoaringBitmap> {
let rtxn = self.env.read_txn()?;
let ProcessingTasks { started_at: started_at_processing, processing: processing_tasks } = let ProcessingTasks { started_at: started_at_processing, processing: processing_tasks } =
self.processing_tasks.read().unwrap().clone(); self.processing_tasks.read().unwrap().clone();
let mut tasks = self.all_task_ids(&rtxn)?; let mut tasks = self.all_task_ids(rtxn)?;
if let Some(from) = &query.from { if let Some(from) = &query.from {
tasks.remove_range(from.saturating_add(1)..); tasks.remove_range(from.saturating_add(1)..);
@ -463,7 +465,7 @@ impl IndexScheduler {
Status::Processing => { Status::Processing => {
status_tasks |= &processing_tasks; status_tasks |= &processing_tasks;
} }
status => status_tasks |= &self.get_status(&rtxn, *status)?, status => status_tasks |= &self.get_status(rtxn, *status)?,
}; };
} }
if !status.contains(&Status::Processing) { if !status.contains(&Status::Processing) {
@ -480,7 +482,7 @@ impl IndexScheduler {
if let Some(kind) = &query.kind { if let Some(kind) = &query.kind {
let mut kind_tasks = RoaringBitmap::new(); let mut kind_tasks = RoaringBitmap::new();
for kind in kind { for kind in kind {
kind_tasks |= self.get_kind(&rtxn, *kind)?; kind_tasks |= self.get_kind(rtxn, *kind)?;
} }
tasks &= &kind_tasks; tasks &= &kind_tasks;
} }
@ -488,7 +490,7 @@ impl IndexScheduler {
if let Some(index) = &query.index_uid { if let Some(index) = &query.index_uid {
let mut index_tasks = RoaringBitmap::new(); let mut index_tasks = RoaringBitmap::new();
for index in index { for index in index {
index_tasks |= self.index_tasks(&rtxn, index)?; index_tasks |= self.index_tasks(rtxn, index)?;
} }
tasks &= &index_tasks; tasks &= &index_tasks;
} }
@ -529,7 +531,7 @@ impl IndexScheduler {
}; };
keep_tasks_within_datetimes( keep_tasks_within_datetimes(
&rtxn, rtxn,
&mut filtered_non_processing_tasks, &mut filtered_non_processing_tasks,
self.started_at, self.started_at,
query.after_started_at, query.after_started_at,
@ -539,7 +541,7 @@ impl IndexScheduler {
}; };
keep_tasks_within_datetimes( keep_tasks_within_datetimes(
&rtxn, rtxn,
&mut tasks, &mut tasks,
self.enqueued_at, self.enqueued_at,
query.after_enqueued_at, query.after_enqueued_at,
@ -547,7 +549,7 @@ impl IndexScheduler {
)?; )?;
keep_tasks_within_datetimes( keep_tasks_within_datetimes(
&rtxn, rtxn,
&mut tasks, &mut tasks,
self.finished_at, self.finished_at,
query.after_finished_at, query.after_finished_at,
@ -561,10 +563,70 @@ impl IndexScheduler {
Ok(tasks) Ok(tasks)
} }
/// Returns the tasks matched by the given query. /// Return true iff there is at least one task associated with this index
pub fn get_tasks(&self, query: Query) -> Result<Vec<Task>> { /// that is processing.
let tasks = self.get_task_ids(&query)?; pub fn is_index_processing(&self, index: &str) -> Result<bool> {
let rtxn = self.env.read_txn()?; let rtxn = self.env.read_txn()?;
let processing_tasks = self.processing_tasks.read().unwrap().processing.clone();
let index_tasks = self.index_tasks(&rtxn, index)?;
let nbr_index_processing_tasks = processing_tasks.intersection_len(&index_tasks);
Ok(nbr_index_processing_tasks > 0)
}
/// Return the task ids matching the query from the user's point of view.
///
/// There are two differences between an internal query and a query executed by
/// the user.
///
/// 1. IndexSwap tasks are not publicly associated with any index, but they are associated
/// with many indexes internally.
/// 2. The user may not have the rights to access the tasks (internally) associated with all indexes.
pub fn get_task_ids_from_authorized_indexes(
&self,
rtxn: &RoTxn,
query: &Query,
authorized_indexes: &Option<Vec<String>>,
) -> Result<RoaringBitmap> {
let mut tasks = self.get_task_ids(rtxn, query)?;
// If the query contains a list of index_uid, then we must exclude IndexSwap tasks
// from the result (because it is not publicly associated with any index)
if query.index_uid.is_some() {
tasks -= self.get_kind(rtxn, Kind::IndexSwap)?
}
// Any task that is internally associated with a non-authorized index
// must be discarded.
if let Some(authorized_indexes) = authorized_indexes {
let all_indexes_iter = self.index_tasks.iter(rtxn)?;
for result in all_indexes_iter {
let (index, index_tasks) = result?;
if !authorized_indexes.contains(&index.to_owned()) {
tasks -= index_tasks;
}
}
}
Ok(tasks)
}
/// Return the tasks matching the query from the user's point of view.
///
/// There are two differences between an internal query and a query executed by
/// the user.
///
/// 1. IndexSwap tasks are not publicly associated with any index, but they are associated
/// with many indexes internally.
/// 2. The user may not have the rights to access the tasks (internally) associated with all indexes.
pub fn get_tasks_from_authorized_indexes(
&self,
query: Query,
authorized_indexes: Option<Vec<String>>,
) -> Result<Vec<Task>> {
let rtxn = self.env.read_txn()?;
let tasks =
self.get_task_ids_from_authorized_indexes(&rtxn, &query, &authorized_indexes)?;
let tasks = self.get_existing_tasks( let tasks = self.get_existing_tasks(
&rtxn, &rtxn,
@ -1187,12 +1249,7 @@ mod tests {
handle.wait_till(Breakpoint::AfterProcessing); handle.wait_till(Breakpoint::AfterProcessing);
index_scheduler.assert_internally_consistent(); index_scheduler.assert_internally_consistent();
let mut tasks = index_scheduler.get_tasks(Query::default()).unwrap(); snapshot!(snapshot_index_scheduler(&index_scheduler), name: "all_tasks_processed");
tasks.reverse();
assert_eq!(tasks.len(), 3);
assert_eq!(tasks[0].status, Status::Succeeded);
assert_eq!(tasks[1].status, Status::Succeeded);
assert_eq!(tasks[2].status, Status::Succeeded);
} }
#[test] #[test]
@ -1231,13 +1288,7 @@ mod tests {
handle.wait_till(Breakpoint::AfterProcessing); handle.wait_till(Breakpoint::AfterProcessing);
index_scheduler.assert_internally_consistent(); index_scheduler.assert_internally_consistent();
let mut tasks = index_scheduler.get_tasks(Query::default()).unwrap(); snapshot!(snapshot_index_scheduler(&index_scheduler), name: "all_tasks_processed");
tasks.reverse();
assert_eq!(tasks.len(), 4);
assert_eq!(tasks[0].status, Status::Succeeded);
assert_eq!(tasks[1].status, Status::Succeeded);
assert_eq!(tasks[2].status, Status::Succeeded);
assert_eq!(tasks[3].status, Status::Succeeded);
} }
#[test] #[test]
@ -1493,15 +1544,7 @@ mod tests {
index_scheduler.assert_internally_consistent(); index_scheduler.assert_internally_consistent();
} }
let mut tasks = index_scheduler.get_tasks(Query::default()).unwrap(); snapshot!(snapshot_index_scheduler(&index_scheduler), name: "all_tasks_processed");
tasks.reverse();
assert_eq!(tasks.len(), 6);
assert_eq!(tasks[0].status, Status::Succeeded);
assert_eq!(tasks[1].status, Status::Succeeded);
assert_eq!(tasks[2].status, Status::Succeeded);
assert_eq!(tasks[3].status, Status::Succeeded);
assert_eq!(tasks[4].status, Status::Succeeded);
assert_eq!(tasks[5].status, Status::Succeeded);
} }
#[test] #[test]
@ -2054,37 +2097,45 @@ mod tests {
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "finished"); snapshot!(snapshot_index_scheduler(&index_scheduler), name: "finished");
let rtxn = index_scheduler.env.read_txn().unwrap();
let query = Query { limit: Some(0), ..Default::default() }; let query = Query { limit: Some(0), ..Default::default() };
let tasks = index_scheduler.get_task_ids(&query).unwrap(); let tasks =
index_scheduler.get_task_ids_from_authorized_indexes(&rtxn, &query, &None).unwrap();
snapshot!(snapshot_bitmap(&tasks), @"[]"); snapshot!(snapshot_bitmap(&tasks), @"[]");
let query = Query { limit: Some(1), ..Default::default() }; let query = Query { limit: Some(1), ..Default::default() };
let tasks = index_scheduler.get_task_ids(&query).unwrap(); let tasks =
index_scheduler.get_task_ids_from_authorized_indexes(&rtxn, &query, &None).unwrap();
snapshot!(snapshot_bitmap(&tasks), @"[2,]"); snapshot!(snapshot_bitmap(&tasks), @"[2,]");
let query = Query { limit: Some(2), ..Default::default() }; let query = Query { limit: Some(2), ..Default::default() };
let tasks = index_scheduler.get_task_ids(&query).unwrap(); let tasks =
index_scheduler.get_task_ids_from_authorized_indexes(&rtxn, &query, &None).unwrap();
snapshot!(snapshot_bitmap(&tasks), @"[1,2,]"); snapshot!(snapshot_bitmap(&tasks), @"[1,2,]");
let query = Query { from: Some(1), ..Default::default() }; let query = Query { from: Some(1), ..Default::default() };
let tasks = index_scheduler.get_task_ids(&query).unwrap(); let tasks =
index_scheduler.get_task_ids_from_authorized_indexes(&rtxn, &query, &None).unwrap();
snapshot!(snapshot_bitmap(&tasks), @"[0,1,]"); snapshot!(snapshot_bitmap(&tasks), @"[0,1,]");
let query = Query { from: Some(2), ..Default::default() }; let query = Query { from: Some(2), ..Default::default() };
let tasks = index_scheduler.get_task_ids(&query).unwrap(); let tasks =
index_scheduler.get_task_ids_from_authorized_indexes(&rtxn, &query, &None).unwrap();
snapshot!(snapshot_bitmap(&tasks), @"[0,1,2,]"); snapshot!(snapshot_bitmap(&tasks), @"[0,1,2,]");
let query = Query { from: Some(1), limit: Some(1), ..Default::default() }; let query = Query { from: Some(1), limit: Some(1), ..Default::default() };
let tasks = index_scheduler.get_task_ids(&query).unwrap(); let tasks =
index_scheduler.get_task_ids_from_authorized_indexes(&rtxn, &query, &None).unwrap();
snapshot!(snapshot_bitmap(&tasks), @"[1,]"); snapshot!(snapshot_bitmap(&tasks), @"[1,]");
let query = Query { from: Some(1), limit: Some(2), ..Default::default() }; let query = Query { from: Some(1), limit: Some(2), ..Default::default() };
let tasks = index_scheduler.get_task_ids(&query).unwrap(); let tasks =
index_scheduler.get_task_ids_from_authorized_indexes(&rtxn, &query, &None).unwrap();
snapshot!(snapshot_bitmap(&tasks), @"[0,1,]"); snapshot!(snapshot_bitmap(&tasks), @"[0,1,]");
} }
#[test] #[test]
fn query_processing_tasks() { fn query_tasks_simple() {
let start_time = OffsetDateTime::now_utc(); let start_time = OffsetDateTime::now_utc();
let (index_scheduler, handle) = let (index_scheduler, handle) =
@ -2101,19 +2152,24 @@ mod tests {
handle.wait_till(Breakpoint::BatchCreated); handle.wait_till(Breakpoint::BatchCreated);
let rtxn = index_scheduler.env.read_txn().unwrap();
let query = Query { status: Some(vec![Status::Processing]), ..Default::default() }; let query = Query { status: Some(vec![Status::Processing]), ..Default::default() };
let tasks = index_scheduler.get_task_ids(&query).unwrap(); let tasks =
index_scheduler.get_task_ids_from_authorized_indexes(&rtxn, &query, &None).unwrap();
snapshot!(snapshot_bitmap(&tasks), @"[0,]"); // only the processing tasks in the first tick snapshot!(snapshot_bitmap(&tasks), @"[0,]"); // only the processing tasks in the first tick
let query = Query { status: Some(vec![Status::Enqueued]), ..Default::default() }; let query = Query { status: Some(vec![Status::Enqueued]), ..Default::default() };
let tasks = index_scheduler.get_task_ids(&query).unwrap(); let tasks =
index_scheduler.get_task_ids_from_authorized_indexes(&rtxn, &query, &None).unwrap();
snapshot!(snapshot_bitmap(&tasks), @"[1,2,]"); // only the enqueued tasks in the first tick snapshot!(snapshot_bitmap(&tasks), @"[1,2,]"); // only the enqueued tasks in the first tick
let query = Query { let query = Query {
status: Some(vec![Status::Enqueued, Status::Processing]), status: Some(vec![Status::Enqueued, Status::Processing]),
..Default::default() ..Default::default()
}; };
let tasks = index_scheduler.get_task_ids(&query).unwrap(); let tasks =
index_scheduler.get_task_ids_from_authorized_indexes(&rtxn, &query, &None).unwrap();
snapshot!(snapshot_bitmap(&tasks), @"[0,1,2,]"); // both enqueued and processing tasks in the first tick snapshot!(snapshot_bitmap(&tasks), @"[0,1,2,]"); // both enqueued and processing tasks in the first tick
let query = Query { let query = Query {
@ -2121,7 +2177,8 @@ mod tests {
after_started_at: Some(start_time), after_started_at: Some(start_time),
..Default::default() ..Default::default()
}; };
let tasks = index_scheduler.get_task_ids(&query).unwrap(); let tasks =
index_scheduler.get_task_ids_from_authorized_indexes(&rtxn, &query, &None).unwrap();
// both enqueued and processing tasks in the first tick, but limited to those with a started_at // both enqueued and processing tasks in the first tick, but limited to those with a started_at
// that comes after the start of the test, which should excludes the enqueued tasks // that comes after the start of the test, which should excludes the enqueued tasks
snapshot!(snapshot_bitmap(&tasks), @"[0,]"); snapshot!(snapshot_bitmap(&tasks), @"[0,]");
@ -2131,7 +2188,8 @@ mod tests {
before_started_at: Some(start_time), before_started_at: Some(start_time),
..Default::default() ..Default::default()
}; };
let tasks = index_scheduler.get_task_ids(&query).unwrap(); let tasks =
index_scheduler.get_task_ids_from_authorized_indexes(&rtxn, &query, &None).unwrap();
// both enqueued and processing tasks in the first tick, but limited to those with a started_at // both enqueued and processing tasks in the first tick, but limited to those with a started_at
// that comes before the start of the test, which should excludes all of them // that comes before the start of the test, which should excludes all of them
snapshot!(snapshot_bitmap(&tasks), @"[]"); snapshot!(snapshot_bitmap(&tasks), @"[]");
@ -2142,7 +2200,8 @@ mod tests {
before_started_at: Some(start_time + Duration::minutes(1)), before_started_at: Some(start_time + Duration::minutes(1)),
..Default::default() ..Default::default()
}; };
let tasks = index_scheduler.get_task_ids(&query).unwrap(); let tasks =
index_scheduler.get_task_ids_from_authorized_indexes(&rtxn, &query, &None).unwrap();
// both enqueued and processing tasks in the first tick, but limited to those with a started_at // both enqueued and processing tasks in the first tick, but limited to those with a started_at
// that comes after the start of the test and before one minute after the start of the test, // that comes after the start of the test and before one minute after the start of the test,
// which should exclude the enqueued tasks and include the only processing task // which should exclude the enqueued tasks and include the only processing task
@ -2150,6 +2209,8 @@ mod tests {
handle.wait_till(Breakpoint::BatchCreated); handle.wait_till(Breakpoint::BatchCreated);
let rtxn = index_scheduler.env.read_txn().unwrap();
let second_start_time = OffsetDateTime::now_utc(); let second_start_time = OffsetDateTime::now_utc();
let query = Query { let query = Query {
@ -2158,7 +2219,8 @@ mod tests {
before_started_at: Some(start_time + Duration::minutes(1)), before_started_at: Some(start_time + Duration::minutes(1)),
..Default::default() ..Default::default()
}; };
let tasks = index_scheduler.get_task_ids(&query).unwrap(); let tasks =
index_scheduler.get_task_ids_from_authorized_indexes(&rtxn, &query, &None).unwrap();
// both succeeded and processing tasks in the first tick, but limited to those with a started_at // both succeeded and processing tasks in the first tick, but limited to those with a started_at
// that comes after the start of the test and before one minute after the start of the test, // that comes after the start of the test and before one minute after the start of the test,
// which should include all tasks // which should include all tasks
@ -2169,7 +2231,8 @@ mod tests {
before_started_at: Some(start_time), before_started_at: Some(start_time),
..Default::default() ..Default::default()
}; };
let tasks = index_scheduler.get_task_ids(&query).unwrap(); let tasks =
index_scheduler.get_task_ids_from_authorized_indexes(&rtxn, &query, &None).unwrap();
// both succeeded and processing tasks in the first tick, but limited to those with a started_at // both succeeded and processing tasks in the first tick, but limited to those with a started_at
// that comes before the start of the test, which should exclude all tasks // that comes before the start of the test, which should exclude all tasks
snapshot!(snapshot_bitmap(&tasks), @"[]"); snapshot!(snapshot_bitmap(&tasks), @"[]");
@ -2180,7 +2243,8 @@ mod tests {
before_started_at: Some(second_start_time + Duration::minutes(1)), before_started_at: Some(second_start_time + Duration::minutes(1)),
..Default::default() ..Default::default()
}; };
let tasks = index_scheduler.get_task_ids(&query).unwrap(); let tasks =
index_scheduler.get_task_ids_from_authorized_indexes(&rtxn, &query, &None).unwrap();
// both succeeded and processing tasks in the first tick, but limited to those with a started_at // both succeeded and processing tasks in the first tick, but limited to those with a started_at
// that comes after the start of the second part of the test and before one minute after the // that comes after the start of the second part of the test and before one minute after the
// second start of the test, which should exclude all tasks // second start of the test, which should exclude all tasks
@ -2188,7 +2252,11 @@ mod tests {
// now we make one more batch, the started_at field of the new tasks will be past `second_start_time` // now we make one more batch, the started_at field of the new tasks will be past `second_start_time`
handle.wait_till(Breakpoint::BatchCreated); handle.wait_till(Breakpoint::BatchCreated);
let tasks = index_scheduler.get_task_ids(&query).unwrap();
let rtxn = index_scheduler.env.read_txn().unwrap();
let tasks =
index_scheduler.get_task_ids_from_authorized_indexes(&rtxn, &query, &None).unwrap();
// we run the same query to verify that, and indeed find that the last task is matched // we run the same query to verify that, and indeed find that the last task is matched
snapshot!(snapshot_bitmap(&tasks), @"[2,]"); snapshot!(snapshot_bitmap(&tasks), @"[2,]");
@ -2198,15 +2266,19 @@ mod tests {
before_started_at: Some(second_start_time + Duration::minutes(1)), before_started_at: Some(second_start_time + Duration::minutes(1)),
..Default::default() ..Default::default()
}; };
let tasks = index_scheduler.get_task_ids(&query).unwrap(); let tasks =
index_scheduler.get_task_ids_from_authorized_indexes(&rtxn, &query, &None).unwrap();
// enqueued, succeeded, or processing tasks started after the second part of the test, should // enqueued, succeeded, or processing tasks started after the second part of the test, should
// again only return the last task // again only return the last task
snapshot!(snapshot_bitmap(&tasks), @"[2,]"); snapshot!(snapshot_bitmap(&tasks), @"[2,]");
handle.wait_till(Breakpoint::AfterProcessing); handle.wait_till(Breakpoint::AfterProcessing);
let rtxn = index_scheduler.read_txn().unwrap();
// now the last task should have failed // now the last task should have failed
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "end"); snapshot!(snapshot_index_scheduler(&index_scheduler), name: "end");
let tasks = index_scheduler.get_task_ids(&query).unwrap(); let tasks =
index_scheduler.get_task_ids_from_authorized_indexes(&rtxn, &query, &None).unwrap();
// so running the last query should return nothing // so running the last query should return nothing
snapshot!(snapshot_bitmap(&tasks), @"[]"); snapshot!(snapshot_bitmap(&tasks), @"[]");
@ -2216,7 +2288,8 @@ mod tests {
before_started_at: Some(second_start_time + Duration::minutes(1)), before_started_at: Some(second_start_time + Duration::minutes(1)),
..Default::default() ..Default::default()
}; };
let tasks = index_scheduler.get_task_ids(&query).unwrap(); let tasks =
index_scheduler.get_task_ids_from_authorized_indexes(&rtxn, &query, &None).unwrap();
// but the same query on failed tasks should return the last task // but the same query on failed tasks should return the last task
snapshot!(snapshot_bitmap(&tasks), @"[2,]"); snapshot!(snapshot_bitmap(&tasks), @"[2,]");
@ -2226,7 +2299,8 @@ mod tests {
before_started_at: Some(second_start_time + Duration::minutes(1)), before_started_at: Some(second_start_time + Duration::minutes(1)),
..Default::default() ..Default::default()
}; };
let tasks = index_scheduler.get_task_ids(&query).unwrap(); let tasks =
index_scheduler.get_task_ids_from_authorized_indexes(&rtxn, &query, &None).unwrap();
// but the same query on failed tasks should return the last task // but the same query on failed tasks should return the last task
snapshot!(snapshot_bitmap(&tasks), @"[2,]"); snapshot!(snapshot_bitmap(&tasks), @"[2,]");
@ -2237,7 +2311,8 @@ mod tests {
before_started_at: Some(second_start_time + Duration::minutes(1)), before_started_at: Some(second_start_time + Duration::minutes(1)),
..Default::default() ..Default::default()
}; };
let tasks = index_scheduler.get_task_ids(&query).unwrap(); let tasks =
index_scheduler.get_task_ids_from_authorized_indexes(&rtxn, &query, &None).unwrap();
// same query but with an invalid uid // same query but with an invalid uid
snapshot!(snapshot_bitmap(&tasks), @"[]"); snapshot!(snapshot_bitmap(&tasks), @"[]");
@ -2248,11 +2323,77 @@ mod tests {
before_started_at: Some(second_start_time + Duration::minutes(1)), before_started_at: Some(second_start_time + Duration::minutes(1)),
..Default::default() ..Default::default()
}; };
let tasks = index_scheduler.get_task_ids(&query).unwrap(); let tasks =
index_scheduler.get_task_ids_from_authorized_indexes(&rtxn, &query, &None).unwrap();
// same query but with a valid uid // same query but with a valid uid
snapshot!(snapshot_bitmap(&tasks), @"[2,]"); snapshot!(snapshot_bitmap(&tasks), @"[2,]");
} }
#[test]
fn query_tasks_special_rules() {
let (index_scheduler, handle) =
IndexScheduler::test(true, vec![(3, FailureLocation::InsideProcessBatch)]);
let kind = index_creation_task("catto", "mouse");
let _task = index_scheduler.register(kind).unwrap();
let kind = index_creation_task("doggo", "sheep");
let _task = index_scheduler.register(kind).unwrap();
let kind = KindWithContent::IndexSwap {
swaps: vec![IndexSwap { indexes: ("catto".to_owned(), "doggo".to_owned()) }],
};
let _task = index_scheduler.register(kind).unwrap();
let kind = KindWithContent::IndexSwap {
swaps: vec![IndexSwap { indexes: ("catto".to_owned(), "whalo".to_owned()) }],
};
let _task = index_scheduler.register(kind).unwrap();
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "start");
handle.wait_till(Breakpoint::BatchCreated);
let rtxn = index_scheduler.env.read_txn().unwrap();
let query = Query { index_uid: Some(vec!["catto".to_owned()]), ..Default::default() };
let tasks =
index_scheduler.get_task_ids_from_authorized_indexes(&rtxn, &query, &None).unwrap();
// only the first task associated with catto is returned, the indexSwap tasks are excluded!
snapshot!(snapshot_bitmap(&tasks), @"[0,]");
let query = Query { index_uid: Some(vec!["catto".to_owned()]), ..Default::default() };
let tasks = index_scheduler
.get_task_ids_from_authorized_indexes(&rtxn, &query, &Some(vec!["doggo".to_owned()]))
.unwrap();
// we have asked for only the tasks associated with catto, but are only authorized to retrieve the tasks
// associated with doggo -> empty result
snapshot!(snapshot_bitmap(&tasks), @"[]");
let query = Query::default();
let tasks = index_scheduler
.get_task_ids_from_authorized_indexes(&rtxn, &query, &Some(vec!["doggo".to_owned()]))
.unwrap();
// we asked for all the tasks, but we are only authorized to retrieve the doggo tasks
// -> only the index creation of doggo should be returned
snapshot!(snapshot_bitmap(&tasks), @"[1,]");
let query = Query::default();
let tasks = index_scheduler
.get_task_ids_from_authorized_indexes(
&rtxn,
&query,
&Some(vec!["catto".to_owned(), "doggo".to_owned()]),
)
.unwrap();
// we asked for all the tasks, but we are only authorized to retrieve the doggo and catto tasks
// -> all tasks except the swap of catto with whalo are returned
snapshot!(snapshot_bitmap(&tasks), @"[0,1,2,]");
let query = Query::default();
let tasks =
index_scheduler.get_task_ids_from_authorized_indexes(&rtxn, &query, &None).unwrap();
// we asked for all the tasks with all index authorized -> all tasks returned
snapshot!(snapshot_bitmap(&tasks), @"[0,1,2,3,]");
}
#[test] #[test]
fn fail_in_create_batch_for_index_creation() { fn fail_in_create_batch_for_index_creation() {
let (index_scheduler, handle) = let (index_scheduler, handle) =

View File

@ -0,0 +1,59 @@
---
source: index-scheduler/src/lib.rs
---
### Autobatching Enabled = true
### Processing Tasks:
[]
----------------------------------------------------------------------
### All Tasks:
0 {uid: 0, status: succeeded, details: { primary_key: None }, kind: IndexCreation { index_uid: "doggos", primary_key: None }}
1 {uid: 1, status: succeeded, details: { primary_key: None }, kind: IndexCreation { index_uid: "cattos", primary_key: None }}
2 {uid: 2, status: succeeded, details: { primary_key: None }, kind: IndexCreation { index_uid: "girafos", primary_key: None }}
3 {uid: 3, status: succeeded, details: { deleted_documents: Some(0) }, kind: DocumentClear { index_uid: "doggos" }}
4 {uid: 4, status: succeeded, details: { deleted_documents: Some(0) }, kind: DocumentClear { index_uid: "cattos" }}
5 {uid: 5, status: succeeded, details: { deleted_documents: Some(0) }, kind: DocumentClear { index_uid: "girafos" }}
----------------------------------------------------------------------
### Status:
enqueued []
succeeded [0,1,2,3,4,5,]
----------------------------------------------------------------------
### Kind:
"documentDeletion" [3,4,5,]
"indexCreation" [0,1,2,]
----------------------------------------------------------------------
### Index Tasks:
cattos [1,4,]
doggos [0,3,]
girafos [2,5,]
----------------------------------------------------------------------
### Index Mapper:
["cattos", "doggos", "girafos"]
----------------------------------------------------------------------
### Enqueued At:
[timestamp] [0,]
[timestamp] [1,]
[timestamp] [2,]
[timestamp] [3,]
[timestamp] [4,]
[timestamp] [5,]
----------------------------------------------------------------------
### Started At:
[timestamp] [0,]
[timestamp] [1,]
[timestamp] [2,]
[timestamp] [3,]
[timestamp] [4,]
[timestamp] [5,]
----------------------------------------------------------------------
### Finished At:
[timestamp] [0,]
[timestamp] [1,]
[timestamp] [2,]
[timestamp] [3,]
[timestamp] [4,]
[timestamp] [5,]
----------------------------------------------------------------------
### File Store:
----------------------------------------------------------------------

View File

@ -0,0 +1,46 @@
---
source: index-scheduler/src/lib.rs
---
### Autobatching Enabled = true
### Processing Tasks:
[]
----------------------------------------------------------------------
### All Tasks:
0 {uid: 0, status: succeeded, details: { primary_key: None }, kind: IndexCreation { index_uid: "doggos", primary_key: None }}
1 {uid: 1, status: succeeded, details: { primary_key: None }, kind: IndexCreation { index_uid: "cattos", primary_key: None }}
2 {uid: 2, status: succeeded, details: { deleted_documents: Some(0) }, kind: IndexDeletion { index_uid: "doggos" }}
----------------------------------------------------------------------
### Status:
enqueued []
succeeded [0,1,2,]
----------------------------------------------------------------------
### Kind:
"indexCreation" [0,1,]
"indexDeletion" [2,]
----------------------------------------------------------------------
### Index Tasks:
cattos [1,]
doggos [0,2,]
----------------------------------------------------------------------
### Index Mapper:
["cattos"]
----------------------------------------------------------------------
### Enqueued At:
[timestamp] [0,]
[timestamp] [1,]
[timestamp] [2,]
----------------------------------------------------------------------
### Started At:
[timestamp] [0,]
[timestamp] [1,]
[timestamp] [2,]
----------------------------------------------------------------------
### Finished At:
[timestamp] [0,]
[timestamp] [1,]
[timestamp] [2,]
----------------------------------------------------------------------
### File Store:
----------------------------------------------------------------------

View File

@ -0,0 +1,49 @@
---
source: index-scheduler/src/lib.rs
---
### Autobatching Enabled = false
### Processing Tasks:
[]
----------------------------------------------------------------------
### All Tasks:
0 {uid: 0, status: succeeded, details: { primary_key: None }, kind: IndexCreation { index_uid: "doggos", primary_key: None }}
1 {uid: 1, status: succeeded, details: { deleted_documents: Some(0) }, kind: DocumentClear { index_uid: "doggos" }}
2 {uid: 2, status: succeeded, details: { deleted_documents: Some(0) }, kind: DocumentClear { index_uid: "doggos" }}
3 {uid: 3, status: succeeded, details: { deleted_documents: Some(0) }, kind: DocumentClear { index_uid: "doggos" }}
----------------------------------------------------------------------
### Status:
enqueued []
succeeded [0,1,2,3,]
----------------------------------------------------------------------
### Kind:
"documentDeletion" [1,2,3,]
"indexCreation" [0,]
----------------------------------------------------------------------
### Index Tasks:
doggos [0,1,2,3,]
----------------------------------------------------------------------
### Index Mapper:
["doggos"]
----------------------------------------------------------------------
### Enqueued At:
[timestamp] [0,]
[timestamp] [1,]
[timestamp] [2,]
[timestamp] [3,]
----------------------------------------------------------------------
### Started At:
[timestamp] [0,]
[timestamp] [1,]
[timestamp] [2,]
[timestamp] [3,]
----------------------------------------------------------------------
### Finished At:
[timestamp] [0,]
[timestamp] [1,]
[timestamp] [2,]
[timestamp] [3,]
----------------------------------------------------------------------
### File Store:
----------------------------------------------------------------------

View File

@ -0,0 +1,42 @@
---
source: index-scheduler/src/lib.rs
---
### Autobatching Enabled = true
### Processing Tasks:
[]
----------------------------------------------------------------------
### All Tasks:
0 {uid: 0, status: enqueued, details: { primary_key: Some("mouse") }, kind: IndexCreation { index_uid: "catto", primary_key: Some("mouse") }}
1 {uid: 1, status: enqueued, details: { primary_key: Some("sheep") }, kind: IndexCreation { index_uid: "doggo", primary_key: Some("sheep") }}
2 {uid: 2, status: enqueued, details: { swaps: [IndexSwap { indexes: ("catto", "doggo") }] }, kind: IndexSwap { swaps: [IndexSwap { indexes: ("catto", "doggo") }] }}
3 {uid: 3, status: enqueued, details: { swaps: [IndexSwap { indexes: ("catto", "whalo") }] }, kind: IndexSwap { swaps: [IndexSwap { indexes: ("catto", "whalo") }] }}
----------------------------------------------------------------------
### Status:
enqueued [0,1,2,3,]
----------------------------------------------------------------------
### Kind:
"indexCreation" [0,1,]
"indexSwap" [2,3,]
----------------------------------------------------------------------
### Index Tasks:
catto [0,2,3,]
doggo [1,2,]
whalo [3,]
----------------------------------------------------------------------
### Index Mapper:
[]
----------------------------------------------------------------------
### Enqueued At:
[timestamp] [0,]
[timestamp] [1,]
[timestamp] [2,]
[timestamp] [3,]
----------------------------------------------------------------------
### Started At:
----------------------------------------------------------------------
### Finished At:
----------------------------------------------------------------------
### File Store:
----------------------------------------------------------------------

View File

@ -215,6 +215,27 @@ impl SearchRules {
} }
} }
} }
/// Return the list of indexes such that `self.is_index_authorized(index) == true`,
/// or `None` if all indexes satisfy this condition.
pub fn authorized_indexes(&self) -> Option<Vec<String>> {
match self {
SearchRules::Set(set) => {
if set.contains("*") {
None
} else {
Some(set.iter().cloned().collect())
}
}
SearchRules::Map(map) => {
if map.contains_key("*") {
None
} else {
Some(map.keys().cloned().collect())
}
}
}
}
} }
impl IntoIterator for SearchRules { impl IntoIterator for SearchRules {

View File

@ -1,11 +1,11 @@
use actix_web::web::Data; use actix_web::web::Data;
use actix_web::{web, HttpRequest, HttpResponse}; use actix_web::{web, HttpRequest, HttpResponse};
use index_scheduler::{IndexScheduler, Query}; use index_scheduler::IndexScheduler;
use log::debug; use log::debug;
use meilisearch_types::error::ResponseError; use meilisearch_types::error::ResponseError;
use meilisearch_types::index_uid::IndexUid; use meilisearch_types::index_uid::IndexUid;
use meilisearch_types::milli::{self, FieldDistribution, Index}; use meilisearch_types::milli::{self, FieldDistribution, Index};
use meilisearch_types::tasks::{KindWithContent, Status}; use meilisearch_types::tasks::KindWithContent;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use serde_json::json; use serde_json::json;
use time::OffsetDateTime; use time::OffsetDateTime;
@ -202,14 +202,7 @@ impl IndexStats {
index_uid: String, index_uid: String,
) -> Result<Self, ResponseError> { ) -> Result<Self, ResponseError> {
// we check if there is currently a task processing associated with this index. // we check if there is currently a task processing associated with this index.
let processing_task = index_scheduler.get_tasks(Query { let is_processing = index_scheduler.is_index_processing(&index_uid)?;
status: Some(vec![Status::Processing]),
index_uid: Some(vec![index_uid.clone()]),
limit: Some(1),
..Query::default()
})?;
let is_processing = !processing_task.is_empty();
let index = index_scheduler.index(&index_uid)?; let index = index_scheduler.index(&index_uid)?;
let rtxn = index.read_txn()?; let rtxn = index.read_txn()?;
Ok(IndexStats { Ok(IndexStats {

View File

@ -270,11 +270,10 @@ pub fn create_all_stats(
let mut last_task: Option<OffsetDateTime> = None; let mut last_task: Option<OffsetDateTime> = None;
let mut indexes = BTreeMap::new(); let mut indexes = BTreeMap::new();
let mut database_size = 0; let mut database_size = 0;
let processing_task = index_scheduler.get_tasks(Query { let processing_task = index_scheduler.get_tasks_from_authorized_indexes(
status: Some(vec![Status::Processing]), Query { status: Some(vec![Status::Processing]), limit: Some(1), ..Query::default() },
limit: Some(1), search_rules.authorized_indexes(),
..Query::default() )?;
})?;
let processing_index = processing_task.first().and_then(|task| task.index_uid()); let processing_index = processing_task.first().and_then(|task| task.index_uid());
for (name, index) in index_scheduler.indexes()? { for (name, index) in index_scheduler.indexes()? {
if !search_rules.is_index_authorized(&name) { if !search_rules.is_index_authorized(&name) {

View File

@ -291,8 +291,11 @@ async fn cancel_tasks(
return Err(index_scheduler::Error::TaskCancelationWithEmptyQuery.into()); return Err(index_scheduler::Error::TaskCancelationWithEmptyQuery.into());
} }
let filtered_query = filter_out_inaccessible_indexes_from_query(&index_scheduler, &query); let tasks = index_scheduler.get_task_ids_from_authorized_indexes(
let tasks = index_scheduler.get_task_ids(&filtered_query)?; &index_scheduler.read_txn()?,
&query,
&index_scheduler.filters().search_rules.authorized_indexes(),
)?;
let task_cancelation = let task_cancelation =
KindWithContent::TaskCancelation { query: req.query_string().to_string(), tasks }; KindWithContent::TaskCancelation { query: req.query_string().to_string(), tasks };
@ -348,8 +351,11 @@ async fn delete_tasks(
return Err(index_scheduler::Error::TaskDeletionWithEmptyQuery.into()); return Err(index_scheduler::Error::TaskDeletionWithEmptyQuery.into());
} }
let filtered_query = filter_out_inaccessible_indexes_from_query(&index_scheduler, &query); let tasks = index_scheduler.get_task_ids_from_authorized_indexes(
let tasks = index_scheduler.get_task_ids(&filtered_query)?; &index_scheduler.read_txn()?,
&query,
&index_scheduler.filters().search_rules.authorized_indexes(),
)?;
let task_deletion = let task_deletion =
KindWithContent::TaskDeletion { query: req.query_string().to_string(), tasks }; KindWithContent::TaskDeletion { query: req.query_string().to_string(), tasks };
@ -425,10 +431,15 @@ async fn get_tasks(
before_finished_at, before_finished_at,
after_finished_at, after_finished_at,
}; };
let query = filter_out_inaccessible_indexes_from_query(&index_scheduler, &query);
let mut tasks_results: Vec<TaskView> = let mut tasks_results: Vec<TaskView> = index_scheduler
index_scheduler.get_tasks(query)?.into_iter().map(|t| TaskView::from_task(&t)).collect(); .get_tasks_from_authorized_indexes(
query,
index_scheduler.filters().search_rules.authorized_indexes(),
)?
.into_iter()
.map(|t| TaskView::from_task(&t))
.collect();
// If we were able to fetch the number +1 tasks we asked // If we were able to fetch the number +1 tasks we asked
// it means that there is more to come. // it means that there is more to come.
@ -454,17 +465,15 @@ async fn get_task(
analytics.publish("Tasks Seen".to_string(), json!({ "per_task_uid": true }), Some(&req)); analytics.publish("Tasks Seen".to_string(), json!({ "per_task_uid": true }), Some(&req));
let search_rules = &index_scheduler.filters().search_rules; let query = index_scheduler::Query { uid: Some(vec![task_id]), ..Query::default() };
let mut filters = index_scheduler::Query::default();
if !search_rules.is_index_authorized("*") {
for (index, _policy) in search_rules.clone() {
filters = filters.with_index(index);
}
}
filters.uid = Some(vec![task_id]); if let Some(task) = index_scheduler
.get_tasks_from_authorized_indexes(
if let Some(task) = index_scheduler.get_tasks(filters)?.first() { query,
index_scheduler.filters().search_rules.authorized_indexes(),
)?
.first()
{
let task_view = TaskView::from_task(task); let task_view = TaskView::from_task(task);
Ok(HttpResponse::Ok().json(task_view)) Ok(HttpResponse::Ok().json(task_view))
} else { } else {
@ -472,39 +481,6 @@ async fn get_task(
} }
} }
fn filter_out_inaccessible_indexes_from_query<const ACTION: u8>(
index_scheduler: &GuardedData<ActionPolicy<ACTION>, Data<IndexScheduler>>,
query: &Query,
) -> Query {
let mut query = query.clone();
// First remove all indexes from the query, we will add them back later
let indexes = query.index_uid.take();
let search_rules = &index_scheduler.filters().search_rules;
// We filter on potential indexes and make sure that the search filter
// restrictions are also applied.
match indexes {
Some(indexes) => {
for name in indexes.iter() {
if search_rules.is_index_authorized(name) {
query = query.with_index(name.to_string());
}
}
}
None => {
if !search_rules.is_index_authorized("*") {
for (index, _policy) in search_rules.clone() {
query = query.with_index(index.to_string());
}
}
}
};
query
}
pub(crate) mod date_deserializer { pub(crate) mod date_deserializer {
use time::format_description::well_known::Rfc3339; use time::format_description::well_known::Rfc3339;
use time::macros::format_description; use time::macros::format_description;