mirror of
https://github.com/meilisearch/meilisearch.git
synced 2025-01-18 08:48:32 +08:00
Reimplement task queries to account for special index swap rules
This commit is contained in:
parent
b44cc62320
commit
7b93ba40bd
@ -41,7 +41,7 @@ use uuid::Uuid;
|
|||||||
|
|
||||||
use crate::autobatcher::{self, BatchKind};
|
use crate::autobatcher::{self, BatchKind};
|
||||||
use crate::utils::{self, swap_index_uid_in_task};
|
use crate::utils::{self, swap_index_uid_in_task};
|
||||||
use crate::{Error, IndexScheduler, Query, Result, TaskId};
|
use crate::{Error, IndexScheduler, Result, TaskId};
|
||||||
|
|
||||||
/// Represents a combination of tasks that can all be processed at the same time.
|
/// Represents a combination of tasks that can all be processed at the same time.
|
||||||
///
|
///
|
||||||
@ -854,12 +854,10 @@ impl IndexScheduler {
|
|||||||
return Err(Error::IndexNotFound(rhs.to_owned()));
|
return Err(Error::IndexNotFound(rhs.to_owned()));
|
||||||
}
|
}
|
||||||
|
|
||||||
// 2. Get the task set for index = name.
|
// 2. Get the task set for index = name that appeared before the index swap task
|
||||||
let mut index_lhs_task_ids =
|
let mut index_lhs_task_ids = self.index_tasks(&wtxn, lhs)?;
|
||||||
self.get_task_ids(&Query::default().with_index(lhs.to_owned()))?;
|
|
||||||
index_lhs_task_ids.remove_range(task_id..);
|
index_lhs_task_ids.remove_range(task_id..);
|
||||||
let mut index_rhs_task_ids =
|
let mut index_rhs_task_ids = self.index_tasks(&wtxn, rhs)?;
|
||||||
self.get_task_ids(&Query::default().with_index(rhs.to_owned()))?;
|
|
||||||
index_rhs_task_ids.remove_range(task_id..);
|
index_rhs_task_ids.remove_range(task_id..);
|
||||||
|
|
||||||
// 3. before_name -> new_name in the task's KindWithContent
|
// 3. before_name -> new_name in the task's KindWithContent
|
||||||
|
@ -41,7 +41,7 @@ pub use error::Error;
|
|||||||
use file_store::FileStore;
|
use file_store::FileStore;
|
||||||
use meilisearch_types::error::ResponseError;
|
use meilisearch_types::error::ResponseError;
|
||||||
use meilisearch_types::heed::types::{OwnedType, SerdeBincode, SerdeJson, Str};
|
use meilisearch_types::heed::types::{OwnedType, SerdeBincode, SerdeJson, Str};
|
||||||
use meilisearch_types::heed::{self, Database, Env};
|
use meilisearch_types::heed::{self, Database, Env, RoTxn};
|
||||||
use meilisearch_types::milli;
|
use meilisearch_types::milli;
|
||||||
use meilisearch_types::milli::documents::DocumentsBatchBuilder;
|
use meilisearch_types::milli::documents::DocumentsBatchBuilder;
|
||||||
use meilisearch_types::milli::update::IndexerConfig;
|
use meilisearch_types::milli::update::IndexerConfig;
|
||||||
@ -393,6 +393,10 @@ impl IndexScheduler {
|
|||||||
Ok(this)
|
Ok(this)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn read_txn(&self) -> Result<RoTxn> {
|
||||||
|
self.env.read_txn().map_err(|e| e.into())
|
||||||
|
}
|
||||||
|
|
||||||
/// Start the run loop for the given index scheduler.
|
/// Start the run loop for the given index scheduler.
|
||||||
///
|
///
|
||||||
/// This function will execute in a different thread and must be called
|
/// This function will execute in a different thread and must be called
|
||||||
@ -442,10 +446,8 @@ impl IndexScheduler {
|
|||||||
self.index_mapper.indexes(&rtxn)
|
self.index_mapper.indexes(&rtxn)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Return the task ids matched by the given query.
|
/// Return the task ids matched by the given query from the index scheduler's point of view.
|
||||||
pub fn get_task_ids(&self, query: &Query) -> Result<RoaringBitmap> {
|
pub(crate) fn get_task_ids(&self, rtxn: &RoTxn, query: &Query) -> Result<RoaringBitmap> {
|
||||||
let rtxn = self.env.read_txn()?;
|
|
||||||
|
|
||||||
let ProcessingTasks { started_at: started_at_processing, processing: processing_tasks } =
|
let ProcessingTasks { started_at: started_at_processing, processing: processing_tasks } =
|
||||||
self.processing_tasks.read().unwrap().clone();
|
self.processing_tasks.read().unwrap().clone();
|
||||||
|
|
||||||
@ -561,10 +563,72 @@ impl IndexScheduler {
|
|||||||
Ok(tasks)
|
Ok(tasks)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns the tasks matched by the given query.
|
/// Return true iff there is at least one task associated with this index
|
||||||
pub fn get_tasks(&self, query: Query) -> Result<Vec<Task>> {
|
/// that is processing.
|
||||||
let tasks = self.get_task_ids(&query)?;
|
pub fn is_index_processing(&self, index: &str) -> Result<bool> {
|
||||||
let rtxn = self.env.read_txn()?;
|
let rtxn = self.env.read_txn()?;
|
||||||
|
let processing_tasks = self.processing_tasks.read().unwrap().processing.clone();
|
||||||
|
let index_tasks = self.index_tasks(&rtxn, index)?;
|
||||||
|
|
||||||
|
let nbr_index_processing_tasks = processing_tasks.intersection_len(&index_tasks);
|
||||||
|
|
||||||
|
Ok(nbr_index_processing_tasks > 0)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Return the task ids matching the query from the user's point of view.
|
||||||
|
///
|
||||||
|
/// There are two differences between an internal query and a query executed by
|
||||||
|
/// the user.
|
||||||
|
///
|
||||||
|
/// 1. IndexSwap tasks are not publicly associated with any index, but they are associated
|
||||||
|
/// with many indexes internally.
|
||||||
|
/// 2. The user may not have the rights to access the tasks (internally) associated wuth all indexes.
|
||||||
|
pub fn get_task_ids_from_authorized_indexes(
|
||||||
|
&self,
|
||||||
|
rtxn: &RoTxn,
|
||||||
|
query: &Query,
|
||||||
|
authorized_indexes: &Option<Vec<String>>,
|
||||||
|
) -> Result<RoaringBitmap> {
|
||||||
|
let mut tasks = self.get_task_ids(rtxn, &query)?;
|
||||||
|
|
||||||
|
// If the query contains a list of index_uid, then we must exclude IndexSwap tasks
|
||||||
|
// from the result (because it is not publicly associated with any index)
|
||||||
|
if query.index_uid.is_some() {
|
||||||
|
tasks -= self.get_kind(rtxn, Kind::IndexSwap)?
|
||||||
|
}
|
||||||
|
|
||||||
|
// Any task that is internally associated with a non-authorized index
|
||||||
|
// must be discarded.
|
||||||
|
if let Some(authorized_indexes) = authorized_indexes {
|
||||||
|
let all_indexes_iter = self.index_tasks.iter(rtxn)?;
|
||||||
|
for iter_el in all_indexes_iter {
|
||||||
|
let (index, index_tasks) = iter_el?;
|
||||||
|
if !authorized_indexes.contains(&index.to_owned()) {
|
||||||
|
tasks -= index_tasks;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(tasks)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Return the tasks matching the query from the user's point of view.
|
||||||
|
///
|
||||||
|
/// There are two differences between an internal query and a query executed by
|
||||||
|
/// the user.
|
||||||
|
///
|
||||||
|
/// 1. IndexSwap tasks are not publicly associated with any index, but they are associated
|
||||||
|
/// with many indexes internally.
|
||||||
|
/// 2. The user may not have the rights to access the tasks (internally) associated wuth all indexes.
|
||||||
|
pub fn get_tasks_from_authorized_indexes(
|
||||||
|
&self,
|
||||||
|
query: Query,
|
||||||
|
authorized_indexes: Option<Vec<String>>,
|
||||||
|
) -> Result<Vec<Task>> {
|
||||||
|
let rtxn = self.env.read_txn()?;
|
||||||
|
|
||||||
|
let tasks =
|
||||||
|
self.get_task_ids_from_authorized_indexes(&rtxn, &query, &authorized_indexes)?;
|
||||||
|
|
||||||
let tasks = self.get_existing_tasks(
|
let tasks = self.get_existing_tasks(
|
||||||
&rtxn,
|
&rtxn,
|
||||||
@ -1187,12 +1251,7 @@ mod tests {
|
|||||||
handle.wait_till(Breakpoint::AfterProcessing);
|
handle.wait_till(Breakpoint::AfterProcessing);
|
||||||
index_scheduler.assert_internally_consistent();
|
index_scheduler.assert_internally_consistent();
|
||||||
|
|
||||||
let mut tasks = index_scheduler.get_tasks(Query::default()).unwrap();
|
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "all_tasks_processed");
|
||||||
tasks.reverse();
|
|
||||||
assert_eq!(tasks.len(), 3);
|
|
||||||
assert_eq!(tasks[0].status, Status::Succeeded);
|
|
||||||
assert_eq!(tasks[1].status, Status::Succeeded);
|
|
||||||
assert_eq!(tasks[2].status, Status::Succeeded);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
@ -1231,13 +1290,7 @@ mod tests {
|
|||||||
handle.wait_till(Breakpoint::AfterProcessing);
|
handle.wait_till(Breakpoint::AfterProcessing);
|
||||||
index_scheduler.assert_internally_consistent();
|
index_scheduler.assert_internally_consistent();
|
||||||
|
|
||||||
let mut tasks = index_scheduler.get_tasks(Query::default()).unwrap();
|
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "all_tasks_processed");
|
||||||
tasks.reverse();
|
|
||||||
assert_eq!(tasks.len(), 4);
|
|
||||||
assert_eq!(tasks[0].status, Status::Succeeded);
|
|
||||||
assert_eq!(tasks[1].status, Status::Succeeded);
|
|
||||||
assert_eq!(tasks[2].status, Status::Succeeded);
|
|
||||||
assert_eq!(tasks[3].status, Status::Succeeded);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
@ -1493,15 +1546,7 @@ mod tests {
|
|||||||
index_scheduler.assert_internally_consistent();
|
index_scheduler.assert_internally_consistent();
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut tasks = index_scheduler.get_tasks(Query::default()).unwrap();
|
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "all_tasks_processed");
|
||||||
tasks.reverse();
|
|
||||||
assert_eq!(tasks.len(), 6);
|
|
||||||
assert_eq!(tasks[0].status, Status::Succeeded);
|
|
||||||
assert_eq!(tasks[1].status, Status::Succeeded);
|
|
||||||
assert_eq!(tasks[2].status, Status::Succeeded);
|
|
||||||
assert_eq!(tasks[3].status, Status::Succeeded);
|
|
||||||
assert_eq!(tasks[4].status, Status::Succeeded);
|
|
||||||
assert_eq!(tasks[5].status, Status::Succeeded);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
@ -2054,37 +2099,45 @@ mod tests {
|
|||||||
|
|
||||||
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "finished");
|
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "finished");
|
||||||
|
|
||||||
|
let rtxn = index_scheduler.env.read_txn().unwrap();
|
||||||
let query = Query { limit: Some(0), ..Default::default() };
|
let query = Query { limit: Some(0), ..Default::default() };
|
||||||
let tasks = index_scheduler.get_task_ids(&query).unwrap();
|
let tasks =
|
||||||
|
index_scheduler.get_task_ids_from_authorized_indexes(&rtxn, &query, &None).unwrap();
|
||||||
snapshot!(snapshot_bitmap(&tasks), @"[]");
|
snapshot!(snapshot_bitmap(&tasks), @"[]");
|
||||||
|
|
||||||
let query = Query { limit: Some(1), ..Default::default() };
|
let query = Query { limit: Some(1), ..Default::default() };
|
||||||
let tasks = index_scheduler.get_task_ids(&query).unwrap();
|
let tasks =
|
||||||
|
index_scheduler.get_task_ids_from_authorized_indexes(&rtxn, &query, &None).unwrap();
|
||||||
snapshot!(snapshot_bitmap(&tasks), @"[2,]");
|
snapshot!(snapshot_bitmap(&tasks), @"[2,]");
|
||||||
|
|
||||||
let query = Query { limit: Some(2), ..Default::default() };
|
let query = Query { limit: Some(2), ..Default::default() };
|
||||||
let tasks = index_scheduler.get_task_ids(&query).unwrap();
|
let tasks =
|
||||||
|
index_scheduler.get_task_ids_from_authorized_indexes(&rtxn, &query, &None).unwrap();
|
||||||
snapshot!(snapshot_bitmap(&tasks), @"[1,2,]");
|
snapshot!(snapshot_bitmap(&tasks), @"[1,2,]");
|
||||||
|
|
||||||
let query = Query { from: Some(1), ..Default::default() };
|
let query = Query { from: Some(1), ..Default::default() };
|
||||||
let tasks = index_scheduler.get_task_ids(&query).unwrap();
|
let tasks =
|
||||||
|
index_scheduler.get_task_ids_from_authorized_indexes(&rtxn, &query, &None).unwrap();
|
||||||
snapshot!(snapshot_bitmap(&tasks), @"[0,1,]");
|
snapshot!(snapshot_bitmap(&tasks), @"[0,1,]");
|
||||||
|
|
||||||
let query = Query { from: Some(2), ..Default::default() };
|
let query = Query { from: Some(2), ..Default::default() };
|
||||||
let tasks = index_scheduler.get_task_ids(&query).unwrap();
|
let tasks =
|
||||||
|
index_scheduler.get_task_ids_from_authorized_indexes(&rtxn, &query, &None).unwrap();
|
||||||
snapshot!(snapshot_bitmap(&tasks), @"[0,1,2,]");
|
snapshot!(snapshot_bitmap(&tasks), @"[0,1,2,]");
|
||||||
|
|
||||||
let query = Query { from: Some(1), limit: Some(1), ..Default::default() };
|
let query = Query { from: Some(1), limit: Some(1), ..Default::default() };
|
||||||
let tasks = index_scheduler.get_task_ids(&query).unwrap();
|
let tasks =
|
||||||
|
index_scheduler.get_task_ids_from_authorized_indexes(&rtxn, &query, &None).unwrap();
|
||||||
snapshot!(snapshot_bitmap(&tasks), @"[1,]");
|
snapshot!(snapshot_bitmap(&tasks), @"[1,]");
|
||||||
|
|
||||||
let query = Query { from: Some(1), limit: Some(2), ..Default::default() };
|
let query = Query { from: Some(1), limit: Some(2), ..Default::default() };
|
||||||
let tasks = index_scheduler.get_task_ids(&query).unwrap();
|
let tasks =
|
||||||
|
index_scheduler.get_task_ids_from_authorized_indexes(&rtxn, &query, &None).unwrap();
|
||||||
snapshot!(snapshot_bitmap(&tasks), @"[0,1,]");
|
snapshot!(snapshot_bitmap(&tasks), @"[0,1,]");
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn query_processing_tasks() {
|
fn query_tasks_simple() {
|
||||||
let start_time = OffsetDateTime::now_utc();
|
let start_time = OffsetDateTime::now_utc();
|
||||||
|
|
||||||
let (index_scheduler, handle) =
|
let (index_scheduler, handle) =
|
||||||
@ -2101,19 +2154,24 @@ mod tests {
|
|||||||
|
|
||||||
handle.wait_till(Breakpoint::BatchCreated);
|
handle.wait_till(Breakpoint::BatchCreated);
|
||||||
|
|
||||||
|
let rtxn = index_scheduler.env.read_txn().unwrap();
|
||||||
|
|
||||||
let query = Query { status: Some(vec![Status::Processing]), ..Default::default() };
|
let query = Query { status: Some(vec![Status::Processing]), ..Default::default() };
|
||||||
let tasks = index_scheduler.get_task_ids(&query).unwrap();
|
let tasks =
|
||||||
|
index_scheduler.get_task_ids_from_authorized_indexes(&rtxn, &query, &None).unwrap();
|
||||||
snapshot!(snapshot_bitmap(&tasks), @"[0,]"); // only the processing tasks in the first tick
|
snapshot!(snapshot_bitmap(&tasks), @"[0,]"); // only the processing tasks in the first tick
|
||||||
|
|
||||||
let query = Query { status: Some(vec![Status::Enqueued]), ..Default::default() };
|
let query = Query { status: Some(vec![Status::Enqueued]), ..Default::default() };
|
||||||
let tasks = index_scheduler.get_task_ids(&query).unwrap();
|
let tasks =
|
||||||
|
index_scheduler.get_task_ids_from_authorized_indexes(&rtxn, &query, &None).unwrap();
|
||||||
snapshot!(snapshot_bitmap(&tasks), @"[1,2,]"); // only the enqueued tasks in the first tick
|
snapshot!(snapshot_bitmap(&tasks), @"[1,2,]"); // only the enqueued tasks in the first tick
|
||||||
|
|
||||||
let query = Query {
|
let query = Query {
|
||||||
status: Some(vec![Status::Enqueued, Status::Processing]),
|
status: Some(vec![Status::Enqueued, Status::Processing]),
|
||||||
..Default::default()
|
..Default::default()
|
||||||
};
|
};
|
||||||
let tasks = index_scheduler.get_task_ids(&query).unwrap();
|
let tasks =
|
||||||
|
index_scheduler.get_task_ids_from_authorized_indexes(&rtxn, &query, &None).unwrap();
|
||||||
snapshot!(snapshot_bitmap(&tasks), @"[0,1,2,]"); // both enqueued and processing tasks in the first tick
|
snapshot!(snapshot_bitmap(&tasks), @"[0,1,2,]"); // both enqueued and processing tasks in the first tick
|
||||||
|
|
||||||
let query = Query {
|
let query = Query {
|
||||||
@ -2121,7 +2179,8 @@ mod tests {
|
|||||||
after_started_at: Some(start_time),
|
after_started_at: Some(start_time),
|
||||||
..Default::default()
|
..Default::default()
|
||||||
};
|
};
|
||||||
let tasks = index_scheduler.get_task_ids(&query).unwrap();
|
let tasks =
|
||||||
|
index_scheduler.get_task_ids_from_authorized_indexes(&rtxn, &query, &None).unwrap();
|
||||||
// both enqueued and processing tasks in the first tick, but limited to those with a started_at
|
// both enqueued and processing tasks in the first tick, but limited to those with a started_at
|
||||||
// that comes after the start of the test, which should excludes the enqueued tasks
|
// that comes after the start of the test, which should excludes the enqueued tasks
|
||||||
snapshot!(snapshot_bitmap(&tasks), @"[0,]");
|
snapshot!(snapshot_bitmap(&tasks), @"[0,]");
|
||||||
@ -2131,7 +2190,8 @@ mod tests {
|
|||||||
before_started_at: Some(start_time),
|
before_started_at: Some(start_time),
|
||||||
..Default::default()
|
..Default::default()
|
||||||
};
|
};
|
||||||
let tasks = index_scheduler.get_task_ids(&query).unwrap();
|
let tasks =
|
||||||
|
index_scheduler.get_task_ids_from_authorized_indexes(&rtxn, &query, &None).unwrap();
|
||||||
// both enqueued and processing tasks in the first tick, but limited to those with a started_at
|
// both enqueued and processing tasks in the first tick, but limited to those with a started_at
|
||||||
// that comes before the start of the test, which should excludes all of them
|
// that comes before the start of the test, which should excludes all of them
|
||||||
snapshot!(snapshot_bitmap(&tasks), @"[]");
|
snapshot!(snapshot_bitmap(&tasks), @"[]");
|
||||||
@ -2142,7 +2202,8 @@ mod tests {
|
|||||||
before_started_at: Some(start_time + Duration::minutes(1)),
|
before_started_at: Some(start_time + Duration::minutes(1)),
|
||||||
..Default::default()
|
..Default::default()
|
||||||
};
|
};
|
||||||
let tasks = index_scheduler.get_task_ids(&query).unwrap();
|
let tasks =
|
||||||
|
index_scheduler.get_task_ids_from_authorized_indexes(&rtxn, &query, &None).unwrap();
|
||||||
// both enqueued and processing tasks in the first tick, but limited to those with a started_at
|
// both enqueued and processing tasks in the first tick, but limited to those with a started_at
|
||||||
// that comes after the start of the test and before one minute after the start of the test,
|
// that comes after the start of the test and before one minute after the start of the test,
|
||||||
// which should exclude the enqueued tasks and include the only processing task
|
// which should exclude the enqueued tasks and include the only processing task
|
||||||
@ -2150,6 +2211,8 @@ mod tests {
|
|||||||
|
|
||||||
handle.wait_till(Breakpoint::BatchCreated);
|
handle.wait_till(Breakpoint::BatchCreated);
|
||||||
|
|
||||||
|
let rtxn = index_scheduler.env.read_txn().unwrap();
|
||||||
|
|
||||||
let second_start_time = OffsetDateTime::now_utc();
|
let second_start_time = OffsetDateTime::now_utc();
|
||||||
|
|
||||||
let query = Query {
|
let query = Query {
|
||||||
@ -2158,7 +2221,8 @@ mod tests {
|
|||||||
before_started_at: Some(start_time + Duration::minutes(1)),
|
before_started_at: Some(start_time + Duration::minutes(1)),
|
||||||
..Default::default()
|
..Default::default()
|
||||||
};
|
};
|
||||||
let tasks = index_scheduler.get_task_ids(&query).unwrap();
|
let tasks =
|
||||||
|
index_scheduler.get_task_ids_from_authorized_indexes(&rtxn, &query, &None).unwrap();
|
||||||
// both succeeded and processing tasks in the first tick, but limited to those with a started_at
|
// both succeeded and processing tasks in the first tick, but limited to those with a started_at
|
||||||
// that comes after the start of the test and before one minute after the start of the test,
|
// that comes after the start of the test and before one minute after the start of the test,
|
||||||
// which should include all tasks
|
// which should include all tasks
|
||||||
@ -2169,7 +2233,8 @@ mod tests {
|
|||||||
before_started_at: Some(start_time),
|
before_started_at: Some(start_time),
|
||||||
..Default::default()
|
..Default::default()
|
||||||
};
|
};
|
||||||
let tasks = index_scheduler.get_task_ids(&query).unwrap();
|
let tasks =
|
||||||
|
index_scheduler.get_task_ids_from_authorized_indexes(&rtxn, &query, &None).unwrap();
|
||||||
// both succeeded and processing tasks in the first tick, but limited to those with a started_at
|
// both succeeded and processing tasks in the first tick, but limited to those with a started_at
|
||||||
// that comes before the start of the test, which should exclude all tasks
|
// that comes before the start of the test, which should exclude all tasks
|
||||||
snapshot!(snapshot_bitmap(&tasks), @"[]");
|
snapshot!(snapshot_bitmap(&tasks), @"[]");
|
||||||
@ -2180,7 +2245,8 @@ mod tests {
|
|||||||
before_started_at: Some(second_start_time + Duration::minutes(1)),
|
before_started_at: Some(second_start_time + Duration::minutes(1)),
|
||||||
..Default::default()
|
..Default::default()
|
||||||
};
|
};
|
||||||
let tasks = index_scheduler.get_task_ids(&query).unwrap();
|
let tasks =
|
||||||
|
index_scheduler.get_task_ids_from_authorized_indexes(&rtxn, &query, &None).unwrap();
|
||||||
// both succeeded and processing tasks in the first tick, but limited to those with a started_at
|
// both succeeded and processing tasks in the first tick, but limited to those with a started_at
|
||||||
// that comes after the start of the second part of the test and before one minute after the
|
// that comes after the start of the second part of the test and before one minute after the
|
||||||
// second start of the test, which should exclude all tasks
|
// second start of the test, which should exclude all tasks
|
||||||
@ -2188,7 +2254,11 @@ mod tests {
|
|||||||
|
|
||||||
// now we make one more batch, the started_at field of the new tasks will be past `second_start_time`
|
// now we make one more batch, the started_at field of the new tasks will be past `second_start_time`
|
||||||
handle.wait_till(Breakpoint::BatchCreated);
|
handle.wait_till(Breakpoint::BatchCreated);
|
||||||
let tasks = index_scheduler.get_task_ids(&query).unwrap();
|
|
||||||
|
let rtxn = index_scheduler.env.read_txn().unwrap();
|
||||||
|
|
||||||
|
let tasks =
|
||||||
|
index_scheduler.get_task_ids_from_authorized_indexes(&rtxn, &query, &None).unwrap();
|
||||||
// we run the same query to verify that, and indeed find that the last task is matched
|
// we run the same query to verify that, and indeed find that the last task is matched
|
||||||
snapshot!(snapshot_bitmap(&tasks), @"[2,]");
|
snapshot!(snapshot_bitmap(&tasks), @"[2,]");
|
||||||
|
|
||||||
@ -2198,15 +2268,19 @@ mod tests {
|
|||||||
before_started_at: Some(second_start_time + Duration::minutes(1)),
|
before_started_at: Some(second_start_time + Duration::minutes(1)),
|
||||||
..Default::default()
|
..Default::default()
|
||||||
};
|
};
|
||||||
let tasks = index_scheduler.get_task_ids(&query).unwrap();
|
let tasks =
|
||||||
|
index_scheduler.get_task_ids_from_authorized_indexes(&rtxn, &query, &None).unwrap();
|
||||||
// enqueued, succeeded, or processing tasks started after the second part of the test, should
|
// enqueued, succeeded, or processing tasks started after the second part of the test, should
|
||||||
// again only return the last task
|
// again only return the last task
|
||||||
snapshot!(snapshot_bitmap(&tasks), @"[2,]");
|
snapshot!(snapshot_bitmap(&tasks), @"[2,]");
|
||||||
|
|
||||||
handle.wait_till(Breakpoint::AfterProcessing);
|
handle.wait_till(Breakpoint::AfterProcessing);
|
||||||
|
let rtxn = index_scheduler.read_txn().unwrap();
|
||||||
|
|
||||||
// now the last task should have failed
|
// now the last task should have failed
|
||||||
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "end");
|
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "end");
|
||||||
let tasks = index_scheduler.get_task_ids(&query).unwrap();
|
let tasks =
|
||||||
|
index_scheduler.get_task_ids_from_authorized_indexes(&rtxn, &query, &None).unwrap();
|
||||||
// so running the last query should return nothing
|
// so running the last query should return nothing
|
||||||
snapshot!(snapshot_bitmap(&tasks), @"[]");
|
snapshot!(snapshot_bitmap(&tasks), @"[]");
|
||||||
|
|
||||||
@ -2216,7 +2290,8 @@ mod tests {
|
|||||||
before_started_at: Some(second_start_time + Duration::minutes(1)),
|
before_started_at: Some(second_start_time + Duration::minutes(1)),
|
||||||
..Default::default()
|
..Default::default()
|
||||||
};
|
};
|
||||||
let tasks = index_scheduler.get_task_ids(&query).unwrap();
|
let tasks =
|
||||||
|
index_scheduler.get_task_ids_from_authorized_indexes(&rtxn, &query, &None).unwrap();
|
||||||
// but the same query on failed tasks should return the last task
|
// but the same query on failed tasks should return the last task
|
||||||
snapshot!(snapshot_bitmap(&tasks), @"[2,]");
|
snapshot!(snapshot_bitmap(&tasks), @"[2,]");
|
||||||
|
|
||||||
@ -2226,7 +2301,8 @@ mod tests {
|
|||||||
before_started_at: Some(second_start_time + Duration::minutes(1)),
|
before_started_at: Some(second_start_time + Duration::minutes(1)),
|
||||||
..Default::default()
|
..Default::default()
|
||||||
};
|
};
|
||||||
let tasks = index_scheduler.get_task_ids(&query).unwrap();
|
let tasks =
|
||||||
|
index_scheduler.get_task_ids_from_authorized_indexes(&rtxn, &query, &None).unwrap();
|
||||||
// but the same query on failed tasks should return the last task
|
// but the same query on failed tasks should return the last task
|
||||||
snapshot!(snapshot_bitmap(&tasks), @"[2,]");
|
snapshot!(snapshot_bitmap(&tasks), @"[2,]");
|
||||||
|
|
||||||
@ -2237,7 +2313,8 @@ mod tests {
|
|||||||
before_started_at: Some(second_start_time + Duration::minutes(1)),
|
before_started_at: Some(second_start_time + Duration::minutes(1)),
|
||||||
..Default::default()
|
..Default::default()
|
||||||
};
|
};
|
||||||
let tasks = index_scheduler.get_task_ids(&query).unwrap();
|
let tasks =
|
||||||
|
index_scheduler.get_task_ids_from_authorized_indexes(&rtxn, &query, &None).unwrap();
|
||||||
// same query but with an invalid uid
|
// same query but with an invalid uid
|
||||||
snapshot!(snapshot_bitmap(&tasks), @"[]");
|
snapshot!(snapshot_bitmap(&tasks), @"[]");
|
||||||
|
|
||||||
@ -2248,11 +2325,77 @@ mod tests {
|
|||||||
before_started_at: Some(second_start_time + Duration::minutes(1)),
|
before_started_at: Some(second_start_time + Duration::minutes(1)),
|
||||||
..Default::default()
|
..Default::default()
|
||||||
};
|
};
|
||||||
let tasks = index_scheduler.get_task_ids(&query).unwrap();
|
let tasks =
|
||||||
|
index_scheduler.get_task_ids_from_authorized_indexes(&rtxn, &query, &None).unwrap();
|
||||||
// same query but with a valid uid
|
// same query but with a valid uid
|
||||||
snapshot!(snapshot_bitmap(&tasks), @"[2,]");
|
snapshot!(snapshot_bitmap(&tasks), @"[2,]");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn query_tasks_special_rules() {
|
||||||
|
let (index_scheduler, handle) =
|
||||||
|
IndexScheduler::test(true, vec![(3, FailureLocation::InsideProcessBatch)]);
|
||||||
|
|
||||||
|
let kind = index_creation_task("catto", "mouse");
|
||||||
|
let _task = index_scheduler.register(kind).unwrap();
|
||||||
|
let kind = index_creation_task("doggo", "sheep");
|
||||||
|
let _task = index_scheduler.register(kind).unwrap();
|
||||||
|
let kind = KindWithContent::IndexSwap {
|
||||||
|
swaps: vec![IndexSwap { indexes: ("catto".to_owned(), "doggo".to_owned()) }],
|
||||||
|
};
|
||||||
|
let _task = index_scheduler.register(kind).unwrap();
|
||||||
|
let kind = KindWithContent::IndexSwap {
|
||||||
|
swaps: vec![IndexSwap { indexes: ("catto".to_owned(), "whalo".to_owned()) }],
|
||||||
|
};
|
||||||
|
let _task = index_scheduler.register(kind).unwrap();
|
||||||
|
|
||||||
|
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "start");
|
||||||
|
|
||||||
|
handle.wait_till(Breakpoint::BatchCreated);
|
||||||
|
|
||||||
|
let rtxn = index_scheduler.env.read_txn().unwrap();
|
||||||
|
|
||||||
|
let query = Query { index_uid: Some(vec!["catto".to_owned()]), ..Default::default() };
|
||||||
|
let tasks =
|
||||||
|
index_scheduler.get_task_ids_from_authorized_indexes(&rtxn, &query, &None).unwrap();
|
||||||
|
// only the first task associated with catto is returned, the indexSwap tasks are excluded!
|
||||||
|
snapshot!(snapshot_bitmap(&tasks), @"[0,]");
|
||||||
|
|
||||||
|
let query = Query { index_uid: Some(vec!["catto".to_owned()]), ..Default::default() };
|
||||||
|
let tasks = index_scheduler
|
||||||
|
.get_task_ids_from_authorized_indexes(&rtxn, &query, &Some(vec!["doggo".to_owned()]))
|
||||||
|
.unwrap();
|
||||||
|
// we have asked for only the tasks associated with catto, but are only authorized to retrieve the tasks
|
||||||
|
// associated with doggo -> empty result
|
||||||
|
snapshot!(snapshot_bitmap(&tasks), @"[]");
|
||||||
|
|
||||||
|
let query = Query::default();
|
||||||
|
let tasks = index_scheduler
|
||||||
|
.get_task_ids_from_authorized_indexes(&rtxn, &query, &Some(vec!["doggo".to_owned()]))
|
||||||
|
.unwrap();
|
||||||
|
// we asked for all the tasks, but we are only authorized to retrieve the doggo tasks
|
||||||
|
// -> only the index creation of doggo should be returned
|
||||||
|
snapshot!(snapshot_bitmap(&tasks), @"[1,]");
|
||||||
|
|
||||||
|
let query = Query::default();
|
||||||
|
let tasks = index_scheduler
|
||||||
|
.get_task_ids_from_authorized_indexes(
|
||||||
|
&rtxn,
|
||||||
|
&query,
|
||||||
|
&Some(vec!["catto".to_owned(), "doggo".to_owned()]),
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
// we asked for all the tasks, but we are only authorized to retrieve the doggo and catto tasks
|
||||||
|
// -> all tasks except the swap of catto with whalo are returned
|
||||||
|
snapshot!(snapshot_bitmap(&tasks), @"[0,1,2,]");
|
||||||
|
|
||||||
|
let query = Query::default();
|
||||||
|
let tasks =
|
||||||
|
index_scheduler.get_task_ids_from_authorized_indexes(&rtxn, &query, &None).unwrap();
|
||||||
|
// we asked for all the tasks with all index authorized -> all tasks returned
|
||||||
|
snapshot!(snapshot_bitmap(&tasks), @"[0,1,2,3,]");
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn fail_in_create_batch_for_index_creation() {
|
fn fail_in_create_batch_for_index_creation() {
|
||||||
let (index_scheduler, handle) =
|
let (index_scheduler, handle) =
|
||||||
|
@ -0,0 +1,59 @@
|
|||||||
|
---
|
||||||
|
source: index-scheduler/src/lib.rs
|
||||||
|
---
|
||||||
|
### Autobatching Enabled = true
|
||||||
|
### Processing Tasks:
|
||||||
|
[]
|
||||||
|
----------------------------------------------------------------------
|
||||||
|
### All Tasks:
|
||||||
|
0 {uid: 0, status: succeeded, details: { primary_key: None }, kind: IndexCreation { index_uid: "doggos", primary_key: None }}
|
||||||
|
1 {uid: 1, status: succeeded, details: { primary_key: None }, kind: IndexCreation { index_uid: "cattos", primary_key: None }}
|
||||||
|
2 {uid: 2, status: succeeded, details: { primary_key: None }, kind: IndexCreation { index_uid: "girafos", primary_key: None }}
|
||||||
|
3 {uid: 3, status: succeeded, details: { deleted_documents: Some(0) }, kind: DocumentClear { index_uid: "doggos" }}
|
||||||
|
4 {uid: 4, status: succeeded, details: { deleted_documents: Some(0) }, kind: DocumentClear { index_uid: "cattos" }}
|
||||||
|
5 {uid: 5, status: succeeded, details: { deleted_documents: Some(0) }, kind: DocumentClear { index_uid: "girafos" }}
|
||||||
|
----------------------------------------------------------------------
|
||||||
|
### Status:
|
||||||
|
enqueued []
|
||||||
|
succeeded [0,1,2,3,4,5,]
|
||||||
|
----------------------------------------------------------------------
|
||||||
|
### Kind:
|
||||||
|
"documentDeletion" [3,4,5,]
|
||||||
|
"indexCreation" [0,1,2,]
|
||||||
|
----------------------------------------------------------------------
|
||||||
|
### Index Tasks:
|
||||||
|
cattos [1,4,]
|
||||||
|
doggos [0,3,]
|
||||||
|
girafos [2,5,]
|
||||||
|
----------------------------------------------------------------------
|
||||||
|
### Index Mapper:
|
||||||
|
["cattos", "doggos", "girafos"]
|
||||||
|
----------------------------------------------------------------------
|
||||||
|
### Enqueued At:
|
||||||
|
[timestamp] [0,]
|
||||||
|
[timestamp] [1,]
|
||||||
|
[timestamp] [2,]
|
||||||
|
[timestamp] [3,]
|
||||||
|
[timestamp] [4,]
|
||||||
|
[timestamp] [5,]
|
||||||
|
----------------------------------------------------------------------
|
||||||
|
### Started At:
|
||||||
|
[timestamp] [0,]
|
||||||
|
[timestamp] [1,]
|
||||||
|
[timestamp] [2,]
|
||||||
|
[timestamp] [3,]
|
||||||
|
[timestamp] [4,]
|
||||||
|
[timestamp] [5,]
|
||||||
|
----------------------------------------------------------------------
|
||||||
|
### Finished At:
|
||||||
|
[timestamp] [0,]
|
||||||
|
[timestamp] [1,]
|
||||||
|
[timestamp] [2,]
|
||||||
|
[timestamp] [3,]
|
||||||
|
[timestamp] [4,]
|
||||||
|
[timestamp] [5,]
|
||||||
|
----------------------------------------------------------------------
|
||||||
|
### File Store:
|
||||||
|
|
||||||
|
----------------------------------------------------------------------
|
||||||
|
|
@ -0,0 +1,46 @@
|
|||||||
|
---
|
||||||
|
source: index-scheduler/src/lib.rs
|
||||||
|
---
|
||||||
|
### Autobatching Enabled = true
|
||||||
|
### Processing Tasks:
|
||||||
|
[]
|
||||||
|
----------------------------------------------------------------------
|
||||||
|
### All Tasks:
|
||||||
|
0 {uid: 0, status: succeeded, details: { primary_key: None }, kind: IndexCreation { index_uid: "doggos", primary_key: None }}
|
||||||
|
1 {uid: 1, status: succeeded, details: { primary_key: None }, kind: IndexCreation { index_uid: "cattos", primary_key: None }}
|
||||||
|
2 {uid: 2, status: succeeded, details: { deleted_documents: Some(0) }, kind: IndexDeletion { index_uid: "doggos" }}
|
||||||
|
----------------------------------------------------------------------
|
||||||
|
### Status:
|
||||||
|
enqueued []
|
||||||
|
succeeded [0,1,2,]
|
||||||
|
----------------------------------------------------------------------
|
||||||
|
### Kind:
|
||||||
|
"indexCreation" [0,1,]
|
||||||
|
"indexDeletion" [2,]
|
||||||
|
----------------------------------------------------------------------
|
||||||
|
### Index Tasks:
|
||||||
|
cattos [1,]
|
||||||
|
doggos [0,2,]
|
||||||
|
----------------------------------------------------------------------
|
||||||
|
### Index Mapper:
|
||||||
|
["cattos"]
|
||||||
|
----------------------------------------------------------------------
|
||||||
|
### Enqueued At:
|
||||||
|
[timestamp] [0,]
|
||||||
|
[timestamp] [1,]
|
||||||
|
[timestamp] [2,]
|
||||||
|
----------------------------------------------------------------------
|
||||||
|
### Started At:
|
||||||
|
[timestamp] [0,]
|
||||||
|
[timestamp] [1,]
|
||||||
|
[timestamp] [2,]
|
||||||
|
----------------------------------------------------------------------
|
||||||
|
### Finished At:
|
||||||
|
[timestamp] [0,]
|
||||||
|
[timestamp] [1,]
|
||||||
|
[timestamp] [2,]
|
||||||
|
----------------------------------------------------------------------
|
||||||
|
### File Store:
|
||||||
|
|
||||||
|
----------------------------------------------------------------------
|
||||||
|
|
@ -0,0 +1,49 @@
|
|||||||
|
---
|
||||||
|
source: index-scheduler/src/lib.rs
|
||||||
|
---
|
||||||
|
### Autobatching Enabled = false
|
||||||
|
### Processing Tasks:
|
||||||
|
[]
|
||||||
|
----------------------------------------------------------------------
|
||||||
|
### All Tasks:
|
||||||
|
0 {uid: 0, status: succeeded, details: { primary_key: None }, kind: IndexCreation { index_uid: "doggos", primary_key: None }}
|
||||||
|
1 {uid: 1, status: succeeded, details: { deleted_documents: Some(0) }, kind: DocumentClear { index_uid: "doggos" }}
|
||||||
|
2 {uid: 2, status: succeeded, details: { deleted_documents: Some(0) }, kind: DocumentClear { index_uid: "doggos" }}
|
||||||
|
3 {uid: 3, status: succeeded, details: { deleted_documents: Some(0) }, kind: DocumentClear { index_uid: "doggos" }}
|
||||||
|
----------------------------------------------------------------------
|
||||||
|
### Status:
|
||||||
|
enqueued []
|
||||||
|
succeeded [0,1,2,3,]
|
||||||
|
----------------------------------------------------------------------
|
||||||
|
### Kind:
|
||||||
|
"documentDeletion" [1,2,3,]
|
||||||
|
"indexCreation" [0,]
|
||||||
|
----------------------------------------------------------------------
|
||||||
|
### Index Tasks:
|
||||||
|
doggos [0,1,2,3,]
|
||||||
|
----------------------------------------------------------------------
|
||||||
|
### Index Mapper:
|
||||||
|
["doggos"]
|
||||||
|
----------------------------------------------------------------------
|
||||||
|
### Enqueued At:
|
||||||
|
[timestamp] [0,]
|
||||||
|
[timestamp] [1,]
|
||||||
|
[timestamp] [2,]
|
||||||
|
[timestamp] [3,]
|
||||||
|
----------------------------------------------------------------------
|
||||||
|
### Started At:
|
||||||
|
[timestamp] [0,]
|
||||||
|
[timestamp] [1,]
|
||||||
|
[timestamp] [2,]
|
||||||
|
[timestamp] [3,]
|
||||||
|
----------------------------------------------------------------------
|
||||||
|
### Finished At:
|
||||||
|
[timestamp] [0,]
|
||||||
|
[timestamp] [1,]
|
||||||
|
[timestamp] [2,]
|
||||||
|
[timestamp] [3,]
|
||||||
|
----------------------------------------------------------------------
|
||||||
|
### File Store:
|
||||||
|
|
||||||
|
----------------------------------------------------------------------
|
||||||
|
|
@ -0,0 +1,42 @@
|
|||||||
|
---
|
||||||
|
source: index-scheduler/src/lib.rs
|
||||||
|
---
|
||||||
|
### Autobatching Enabled = true
|
||||||
|
### Processing Tasks:
|
||||||
|
[]
|
||||||
|
----------------------------------------------------------------------
|
||||||
|
### All Tasks:
|
||||||
|
0 {uid: 0, status: enqueued, details: { primary_key: Some("mouse") }, kind: IndexCreation { index_uid: "catto", primary_key: Some("mouse") }}
|
||||||
|
1 {uid: 1, status: enqueued, details: { primary_key: Some("sheep") }, kind: IndexCreation { index_uid: "doggo", primary_key: Some("sheep") }}
|
||||||
|
2 {uid: 2, status: enqueued, details: { swaps: [IndexSwap { indexes: ("catto", "doggo") }] }, kind: IndexSwap { swaps: [IndexSwap { indexes: ("catto", "doggo") }] }}
|
||||||
|
3 {uid: 3, status: enqueued, details: { swaps: [IndexSwap { indexes: ("catto", "whalo") }] }, kind: IndexSwap { swaps: [IndexSwap { indexes: ("catto", "whalo") }] }}
|
||||||
|
----------------------------------------------------------------------
|
||||||
|
### Status:
|
||||||
|
enqueued [0,1,2,3,]
|
||||||
|
----------------------------------------------------------------------
|
||||||
|
### Kind:
|
||||||
|
"indexCreation" [0,1,]
|
||||||
|
"indexSwap" [2,3,]
|
||||||
|
----------------------------------------------------------------------
|
||||||
|
### Index Tasks:
|
||||||
|
catto [0,2,3,]
|
||||||
|
doggo [1,2,]
|
||||||
|
whalo [3,]
|
||||||
|
----------------------------------------------------------------------
|
||||||
|
### Index Mapper:
|
||||||
|
[]
|
||||||
|
----------------------------------------------------------------------
|
||||||
|
### Enqueued At:
|
||||||
|
[timestamp] [0,]
|
||||||
|
[timestamp] [1,]
|
||||||
|
[timestamp] [2,]
|
||||||
|
[timestamp] [3,]
|
||||||
|
----------------------------------------------------------------------
|
||||||
|
### Started At:
|
||||||
|
----------------------------------------------------------------------
|
||||||
|
### Finished At:
|
||||||
|
----------------------------------------------------------------------
|
||||||
|
### File Store:
|
||||||
|
|
||||||
|
----------------------------------------------------------------------
|
||||||
|
|
@ -215,6 +215,27 @@ impl SearchRules {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Return the list of indexes such that `self.is_index_authorized(index) == true`,
|
||||||
|
/// or `None` if all indexes satisfy this condition.
|
||||||
|
pub fn authorized_indexes(&self) -> Option<Vec<String>> {
|
||||||
|
match self {
|
||||||
|
SearchRules::Set(set) => {
|
||||||
|
if set.contains("*") {
|
||||||
|
None
|
||||||
|
} else {
|
||||||
|
Some(set.iter().cloned().collect())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
SearchRules::Map(map) => {
|
||||||
|
if map.contains_key("*") {
|
||||||
|
None
|
||||||
|
} else {
|
||||||
|
Some(map.keys().cloned().collect())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl IntoIterator for SearchRules {
|
impl IntoIterator for SearchRules {
|
||||||
|
@ -1,11 +1,11 @@
|
|||||||
use actix_web::web::Data;
|
use actix_web::web::Data;
|
||||||
use actix_web::{web, HttpRequest, HttpResponse};
|
use actix_web::{web, HttpRequest, HttpResponse};
|
||||||
use index_scheduler::{IndexScheduler, Query};
|
use index_scheduler::IndexScheduler;
|
||||||
use log::debug;
|
use log::debug;
|
||||||
use meilisearch_types::error::ResponseError;
|
use meilisearch_types::error::ResponseError;
|
||||||
use meilisearch_types::index_uid::IndexUid;
|
use meilisearch_types::index_uid::IndexUid;
|
||||||
use meilisearch_types::milli::{self, FieldDistribution, Index};
|
use meilisearch_types::milli::{self, FieldDistribution, Index};
|
||||||
use meilisearch_types::tasks::{KindWithContent, Status};
|
use meilisearch_types::tasks::KindWithContent;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use serde_json::json;
|
use serde_json::json;
|
||||||
use time::OffsetDateTime;
|
use time::OffsetDateTime;
|
||||||
@ -202,14 +202,7 @@ impl IndexStats {
|
|||||||
index_uid: String,
|
index_uid: String,
|
||||||
) -> Result<Self, ResponseError> {
|
) -> Result<Self, ResponseError> {
|
||||||
// we check if there is currently a task processing associated with this index.
|
// we check if there is currently a task processing associated with this index.
|
||||||
let processing_task = index_scheduler.get_tasks(Query {
|
let is_processing = index_scheduler.is_index_processing(&index_uid)?;
|
||||||
status: Some(vec![Status::Processing]),
|
|
||||||
index_uid: Some(vec![index_uid.clone()]),
|
|
||||||
limit: Some(1),
|
|
||||||
..Query::default()
|
|
||||||
})?;
|
|
||||||
let is_processing = !processing_task.is_empty();
|
|
||||||
|
|
||||||
let index = index_scheduler.index(&index_uid)?;
|
let index = index_scheduler.index(&index_uid)?;
|
||||||
let rtxn = index.read_txn()?;
|
let rtxn = index.read_txn()?;
|
||||||
Ok(IndexStats {
|
Ok(IndexStats {
|
||||||
|
@ -270,11 +270,10 @@ pub fn create_all_stats(
|
|||||||
let mut last_task: Option<OffsetDateTime> = None;
|
let mut last_task: Option<OffsetDateTime> = None;
|
||||||
let mut indexes = BTreeMap::new();
|
let mut indexes = BTreeMap::new();
|
||||||
let mut database_size = 0;
|
let mut database_size = 0;
|
||||||
let processing_task = index_scheduler.get_tasks(Query {
|
let processing_task = index_scheduler.get_tasks_from_authorized_indexes(
|
||||||
status: Some(vec![Status::Processing]),
|
Query { status: Some(vec![Status::Processing]), limit: Some(1), ..Query::default() },
|
||||||
limit: Some(1),
|
search_rules.authorized_indexes(),
|
||||||
..Query::default()
|
)?;
|
||||||
})?;
|
|
||||||
let processing_index = processing_task.first().and_then(|task| task.index_uid());
|
let processing_index = processing_task.first().and_then(|task| task.index_uid());
|
||||||
for (name, index) in index_scheduler.indexes()? {
|
for (name, index) in index_scheduler.indexes()? {
|
||||||
if !search_rules.is_index_authorized(&name) {
|
if !search_rules.is_index_authorized(&name) {
|
||||||
|
@ -291,8 +291,11 @@ async fn cancel_tasks(
|
|||||||
return Err(index_scheduler::Error::TaskCancelationWithEmptyQuery.into());
|
return Err(index_scheduler::Error::TaskCancelationWithEmptyQuery.into());
|
||||||
}
|
}
|
||||||
|
|
||||||
let filtered_query = filter_out_inaccessible_indexes_from_query(&index_scheduler, &query);
|
let tasks = index_scheduler.get_task_ids_from_authorized_indexes(
|
||||||
let tasks = index_scheduler.get_task_ids(&filtered_query)?;
|
&index_scheduler.read_txn()?,
|
||||||
|
&query,
|
||||||
|
&index_scheduler.filters().search_rules.authorized_indexes(),
|
||||||
|
)?;
|
||||||
let task_cancelation =
|
let task_cancelation =
|
||||||
KindWithContent::TaskCancelation { query: req.query_string().to_string(), tasks };
|
KindWithContent::TaskCancelation { query: req.query_string().to_string(), tasks };
|
||||||
|
|
||||||
@ -348,8 +351,11 @@ async fn delete_tasks(
|
|||||||
return Err(index_scheduler::Error::TaskDeletionWithEmptyQuery.into());
|
return Err(index_scheduler::Error::TaskDeletionWithEmptyQuery.into());
|
||||||
}
|
}
|
||||||
|
|
||||||
let filtered_query = filter_out_inaccessible_indexes_from_query(&index_scheduler, &query);
|
let tasks = index_scheduler.get_task_ids_from_authorized_indexes(
|
||||||
let tasks = index_scheduler.get_task_ids(&filtered_query)?;
|
&index_scheduler.read_txn()?,
|
||||||
|
&query,
|
||||||
|
&index_scheduler.filters().search_rules.authorized_indexes(),
|
||||||
|
)?;
|
||||||
let task_deletion =
|
let task_deletion =
|
||||||
KindWithContent::TaskDeletion { query: req.query_string().to_string(), tasks };
|
KindWithContent::TaskDeletion { query: req.query_string().to_string(), tasks };
|
||||||
|
|
||||||
@ -425,10 +431,15 @@ async fn get_tasks(
|
|||||||
before_finished_at,
|
before_finished_at,
|
||||||
after_finished_at,
|
after_finished_at,
|
||||||
};
|
};
|
||||||
let query = filter_out_inaccessible_indexes_from_query(&index_scheduler, &query);
|
|
||||||
|
|
||||||
let mut tasks_results: Vec<TaskView> =
|
let mut tasks_results: Vec<TaskView> = index_scheduler
|
||||||
index_scheduler.get_tasks(query)?.into_iter().map(|t| TaskView::from_task(&t)).collect();
|
.get_tasks_from_authorized_indexes(
|
||||||
|
query,
|
||||||
|
index_scheduler.filters().search_rules.authorized_indexes(),
|
||||||
|
)?
|
||||||
|
.into_iter()
|
||||||
|
.map(|t| TaskView::from_task(&t))
|
||||||
|
.collect();
|
||||||
|
|
||||||
// If we were able to fetch the number +1 tasks we asked
|
// If we were able to fetch the number +1 tasks we asked
|
||||||
// it means that there is more to come.
|
// it means that there is more to come.
|
||||||
@ -454,17 +465,16 @@ async fn get_task(
|
|||||||
|
|
||||||
analytics.publish("Tasks Seen".to_string(), json!({ "per_task_uid": true }), Some(&req));
|
analytics.publish("Tasks Seen".to_string(), json!({ "per_task_uid": true }), Some(&req));
|
||||||
|
|
||||||
let search_rules = &index_scheduler.filters().search_rules;
|
let mut query = index_scheduler::Query::default();
|
||||||
let mut filters = index_scheduler::Query::default();
|
query.uid = Some(vec![task_id]);
|
||||||
if !search_rules.is_index_authorized("*") {
|
|
||||||
for (index, _policy) in search_rules.clone() {
|
|
||||||
filters = filters.with_index(index);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
filters.uid = Some(vec![task_id]);
|
if let Some(task) = index_scheduler
|
||||||
|
.get_tasks_from_authorized_indexes(
|
||||||
if let Some(task) = index_scheduler.get_tasks(filters)?.first() {
|
query,
|
||||||
|
index_scheduler.filters().search_rules.authorized_indexes(),
|
||||||
|
)?
|
||||||
|
.first()
|
||||||
|
{
|
||||||
let task_view = TaskView::from_task(task);
|
let task_view = TaskView::from_task(task);
|
||||||
Ok(HttpResponse::Ok().json(task_view))
|
Ok(HttpResponse::Ok().json(task_view))
|
||||||
} else {
|
} else {
|
||||||
@ -472,39 +482,6 @@ async fn get_task(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn filter_out_inaccessible_indexes_from_query<const ACTION: u8>(
|
|
||||||
index_scheduler: &GuardedData<ActionPolicy<ACTION>, Data<IndexScheduler>>,
|
|
||||||
query: &Query,
|
|
||||||
) -> Query {
|
|
||||||
let mut query = query.clone();
|
|
||||||
|
|
||||||
// First remove all indexes from the query, we will add them back later
|
|
||||||
let indexes = query.index_uid.take();
|
|
||||||
|
|
||||||
let search_rules = &index_scheduler.filters().search_rules;
|
|
||||||
|
|
||||||
// We filter on potential indexes and make sure that the search filter
|
|
||||||
// restrictions are also applied.
|
|
||||||
match indexes {
|
|
||||||
Some(indexes) => {
|
|
||||||
for name in indexes.iter() {
|
|
||||||
if search_rules.is_index_authorized(name) {
|
|
||||||
query = query.with_index(name.to_string());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
None => {
|
|
||||||
if !search_rules.is_index_authorized("*") {
|
|
||||||
for (index, _policy) in search_rules.clone() {
|
|
||||||
query = query.with_index(index.to_string());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
query
|
|
||||||
}
|
|
||||||
|
|
||||||
pub(crate) mod date_deserializer {
|
pub(crate) mod date_deserializer {
|
||||||
use time::format_description::well_known::Rfc3339;
|
use time::format_description::well_known::Rfc3339;
|
||||||
use time::macros::format_description;
|
use time::macros::format_description;
|
||||||
|
Loading…
Reference in New Issue
Block a user