Tamo 2024-11-13 14:48:58 +01:00
parent 87299ff34c
commit 46a7f87ac8
7 changed files with 225 additions and 9 deletions

View File

@ -79,7 +79,9 @@ pub enum Error {
     )]
     InvalidTaskDate { field: DateField, date: String },
     #[error("Task uid `{task_uid}` is invalid. It should only contain numeric characters.")]
-    InvalidTaskUids { task_uid: String },
+    InvalidTaskUid { task_uid: String },
+    #[error("Batch uid `{batch_uid}` is invalid. It should only contain numeric characters.")]
+    InvalidBatchUid { batch_uid: String },
     #[error(
         "Task status `{status}` is invalid. Available task statuses are {}.",
         enum_iterator::all::<Status>()
@ -172,7 +174,8 @@ impl Error {
             | Error::SwapIndexesNotFound(_)
             | Error::CorruptedDump
             | Error::InvalidTaskDate { .. }
-            | Error::InvalidTaskUids { .. }
+            | Error::InvalidTaskUid { .. }
+            | Error::InvalidBatchUid { .. }
             | Error::InvalidTaskStatuses { .. }
             | Error::InvalidTaskTypes { .. }
             | Error::InvalidTaskCanceledBy { .. }
@ -216,7 +219,8 @@ impl ErrorCode for Error {
             Error::SwapIndexNotFound(_) => Code::IndexNotFound,
             Error::SwapIndexesNotFound(_) => Code::IndexNotFound,
             Error::InvalidTaskDate { field, .. } => (*field).into(),
-            Error::InvalidTaskUids { .. } => Code::InvalidTaskUids,
+            Error::InvalidTaskUid { .. } => Code::InvalidTaskUids,
+            Error::InvalidBatchUid { .. } => Code::InvalidBatchUids,
             Error::InvalidTaskStatuses { .. } => Code::InvalidTaskStatuses,
             Error::InvalidTaskTypes { .. } => Code::InvalidTaskTypes,
             Error::InvalidTaskCanceledBy { .. } => Code::InvalidTaskCanceledBy,
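A minimal sketch (not part of the diff) of how a caller is expected to surface the new variant; the `/batches` route added later in this commit follows the same pattern. It assumes `BatchId` is a numeric id with a `FromStr` impl, like `TaskId`:

    use meilisearch_types::batches::BatchId;

    // Sketch: map a non-numeric batch uid supplied by the user to `InvalidBatchUid`.
    fn parse_batch_uid(raw: &str) -> Result<BatchId, index_scheduler::Error> {
        raw.parse::<BatchId>()
            .map_err(|_| index_scheduler::Error::InvalidBatchUid { batch_uid: raw.to_string() })
    }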

View File

@ -25,6 +25,7 @@ pub fn snapshot_index_scheduler(scheduler: &IndexScheduler) -> String {
         env,
         all_tasks,
         all_batches,
+        // task reverse index
         status,
         kind,
         index_tasks,
@ -32,6 +33,16 @@ pub fn snapshot_index_scheduler(scheduler: &IndexScheduler) -> String {
         enqueued_at,
         started_at,
         finished_at,
+        // batch reverse index
+        batch_status,
+        batch_kind,
+        batch_index_tasks,
+        batch_canceled_by,
+        batch_enqueued_at,
+        batch_started_at,
+        batch_finished_at,
         index_mapper,
         features: _,
         max_number_of_tasks: _,

View File

@ -228,6 +228,14 @@ mod db_name {
     pub const ENQUEUED_AT: &str = "enqueued-at";
     pub const STARTED_AT: &str = "started-at";
     pub const FINISHED_AT: &str = "finished-at";
+    pub const BATCH_STATUS: &str = "batch-status";
+    pub const BATCH_KIND: &str = "batch-kind";
+    pub const BATCH_INDEX_TASKS: &str = "batch-index-tasks";
+    pub const BATCH_CANCELED_BY: &str = "batch-canceled_by";
+    pub const BATCH_ENQUEUED_AT: &str = "batch-enqueued-at";
+    pub const BATCH_STARTED_AT: &str = "batch-started-at";
+    pub const BATCH_FINISHED_AT: &str = "batch-finished-at";
 }
 
 #[cfg(test)]
@ -315,6 +323,25 @@ pub struct IndexScheduler {
     // Contains all the batches accessible by their Id.
     pub(crate) all_batches: Database<BEU32, SerdeJson<Batch>>,
 
+    /// All the batches containing a task matching the selected status.
+    pub(crate) batch_status: Database<SerdeBincode<Status>, RoaringBitmapCodec>,
+    /// All the batch ids grouped by the kinds of their tasks.
+    pub(crate) batch_kind: Database<SerdeBincode<Kind>, RoaringBitmapCodec>,
+    /// Store the batches associated with an index.
+    pub(crate) batch_index_tasks: Database<Str, RoaringBitmapCodec>,
+    /// Store the batches containing a task canceled by the given task uid.
+    pub(crate) batch_canceled_by: Database<BEU32, RoaringBitmapCodec>,
+    /// Store the batches containing tasks which were enqueued at a specific date.
+    pub(crate) batch_enqueued_at: Database<BEI128, CboRoaringBitmapCodec>,
+    /// Store the batches containing finished tasks started at a specific date.
+    pub(crate) batch_started_at: Database<BEI128, CboRoaringBitmapCodec>,
+    /// Store the batches containing tasks finished at a specific date.
+    pub(crate) batch_finished_at: Database<BEI128, CboRoaringBitmapCodec>,
+
     /// All the tasks ids grouped by their status.
     // TODO we should not be able to serialize a `Status::Processing` in this database.
     pub(crate) status: Database<SerdeBincode<Status>, RoaringBitmapCodec>,
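As a rough illustration of how these reverse indexes are meant to be read (a sketch, assuming crate-internal code holding a read transaction on the scheduler's env), each lookup yields a bitmap of batch ids:

    use roaring::RoaringBitmap;

    // Sketch: ids of all batches containing at least one succeeded task.
    let succeeded_batches: RoaringBitmap = index_scheduler
        .batch_status
        .get(&rtxn, &Status::Succeeded)?
        .unwrap_or_default();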
@ -404,6 +431,8 @@ impl IndexScheduler {
             file_store: self.file_store.clone(),
             all_tasks: self.all_tasks,
             all_batches: self.all_batches,
+            // Tasks reverse index
             status: self.status,
             kind: self.kind,
             index_tasks: self.index_tasks,
@ -411,6 +440,16 @@ impl IndexScheduler {
             enqueued_at: self.enqueued_at,
             started_at: self.started_at,
             finished_at: self.finished_at,
+            // Batches reverse index
+            batch_status: self.batch_status,
+            batch_kind: self.batch_kind,
+            batch_index_tasks: self.batch_index_tasks,
+            batch_canceled_by: self.batch_canceled_by,
+            batch_enqueued_at: self.batch_enqueued_at,
+            batch_started_at: self.batch_started_at,
+            batch_finished_at: self.batch_finished_at,
             index_mapper: self.index_mapper.clone(),
             wake_up: self.wake_up.clone(),
             autobatching_enabled: self.autobatching_enabled,
@ -470,7 +509,7 @@ impl IndexScheduler {
         let env = unsafe {
             heed::EnvOpenOptions::new()
-                .max_dbs(12)
+                .max_dbs(19)
                 .map_size(budget.task_db_size)
                 .open(options.tasks_path)
         }?;
@ -489,6 +528,14 @@ impl IndexScheduler {
         let enqueued_at = env.create_database(&mut wtxn, Some(db_name::ENQUEUED_AT))?;
         let started_at = env.create_database(&mut wtxn, Some(db_name::STARTED_AT))?;
         let finished_at = env.create_database(&mut wtxn, Some(db_name::FINISHED_AT))?;
+        let batch_status = env.create_database(&mut wtxn, Some(db_name::BATCH_STATUS))?;
+        let batch_kind = env.create_database(&mut wtxn, Some(db_name::BATCH_KIND))?;
+        let batch_index_tasks = env.create_database(&mut wtxn, Some(db_name::BATCH_INDEX_TASKS))?;
+        let batch_canceled_by = env.create_database(&mut wtxn, Some(db_name::BATCH_CANCELED_BY))?;
+        let batch_enqueued_at = env.create_database(&mut wtxn, Some(db_name::BATCH_ENQUEUED_AT))?;
+        let batch_started_at = env.create_database(&mut wtxn, Some(db_name::BATCH_STARTED_AT))?;
+        let batch_finished_at = env.create_database(&mut wtxn, Some(db_name::BATCH_FINISHED_AT))?;
         wtxn.commit()?;
 
         // allow unreachable_code to get rids of the warning in the case of a test build.
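The new indexes have to be opened under the new `BATCH_*` constants because LMDB, and therefore heed, identifies named databases by name alone: reopening an existing task database name would hand back that same database and mix batch ids into the task reverse indexes. A minimal sketch of the distinction:

    // Sketch: two distinct names yield two distinct databases; opening the same
    // name twice would yield the same underlying database.
    let status = env.create_database::<SerdeBincode<Status>, RoaringBitmapCodec>(&mut wtxn, Some("status"))?;
    let batch_status = env.create_database::<SerdeBincode<Status>, RoaringBitmapCodec>(&mut wtxn, Some("batch-status"))?;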
@ -498,6 +545,7 @@ impl IndexScheduler {
             file_store,
             all_tasks,
             all_batches,
+            // Task reverse indexes
             status,
             kind,
             index_tasks,
@ -505,6 +553,16 @@ impl IndexScheduler {
             enqueued_at,
             started_at,
             finished_at,
+            // Batch reverse indexes
+            batch_status,
+            batch_kind,
+            batch_index_tasks,
+            batch_canceled_by,
+            batch_enqueued_at,
+            batch_started_at,
+            batch_finished_at,
             index_mapper: IndexMapper::new(
                 &env,
                 options.indexes_path,
@ -955,6 +1013,52 @@ impl IndexScheduler {
         Ok((tasks, total_tasks.len()))
     }
 
+    /// Return the batch ids matching the query from the user's point of view, along
+    /// with the total number of batches matching the query, ignoring from and limit.
+    ///
+    /// There are two differences between an internal query and a query executed by
+    /// the user.
+    ///
+    /// 1. IndexSwap tasks are not publicly associated with any index, but they are associated
+    ///    with many indexes internally.
+    /// 2. The user may not have the rights to access the tasks (internally) associated with all indexes.
+    pub fn get_batch_ids_from_authorized_indexes(
+        &self,
+        rtxn: &RoTxn,
+        query: &Query,
+        filters: &meilisearch_auth::AuthFilter,
+    ) -> Result<(RoaringBitmap, u64)> {
+        // Compute all batches matching the filter by ignoring the limits, to find the number
+        // of batches matching the filter.
+        // As this causes us to compute the filter twice it is slightly inefficient, but doing it
+        // this way spares us from modifying the underlying implementation, and the performance
+        // remains sufficient. Should this change, we would modify `get_batch_ids` to directly
+        // return the number of matching batches.
+        let total_batches = self.get_batch_ids(rtxn, &query.clone().without_limits())?;
+        let mut batches = self.get_batch_ids(rtxn, query)?;
+
+        // If the query contains a list of index uids or there is a finite list of authorized
+        // indexes, then we must exclude all the kinds that aren't associated to one and only
+        // one index.
+        if query.index_uids.is_some() || !filters.all_indexes_authorized() {
+            for kind in enum_iterator::all::<Kind>().filter(|kind| !kind.related_to_one_index()) {
+                batches -= self.batch_kind.get(rtxn, &kind)?.unwrap_or_default();
+            }
+        }
+
+        // Any batch containing a task that is internally associated with a non-authorized index
+        // must be discarded.
+        if !filters.all_indexes_authorized() {
+            let all_indexes_iter = self.batch_index_tasks.iter(rtxn)?;
+            for result in all_indexes_iter {
+                let (index, index_batches) = result?;
+                if !filters.is_index_authorized(index) {
+                    batches -= index_batches;
+                }
+            }
+        }
+
+        Ok((batches, total_batches.len()))
+    }
+
     /// Return the tasks matching the query from the user's point of view along
     /// with the total number of tasks matching the query, ignoring from and limit.
     ///
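To make the comment about computing the filter twice concrete, here is a sketch (crate-internal, assuming an open read transaction and an `AuthFilter` in scope) of what the two calls yield for a limited query:

    // Sketch: `batches` holds at most the 10 most recent matching batch ids,
    // while `total` counts every match; the HTTP layer reports the latter as `total`.
    let query = Query { limit: Some(10), ..Query::default() };
    let (batches, total) =
        index_scheduler.get_batch_ids_from_authorized_indexes(&rtxn, &query, filters)?;
    assert!(batches.len() <= 10);
    assert!(total >= batches.len());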
@ -1003,6 +1107,54 @@ impl IndexScheduler {
         }
     }
 
+    /// Return the batches matching the query from the user's point of view along
+    /// with the total number of batches matching the query, ignoring from and limit.
+    ///
+    /// There are two differences between an internal query and a query executed by
+    /// the user.
+    ///
+    /// 1. IndexSwap tasks are not publicly associated with any index, but they are associated
+    ///    with many indexes internally.
+    /// 2. The user may not have the rights to access the tasks (internally) associated with all indexes.
+    pub fn get_batches_from_authorized_indexes(
+        &self,
+        query: Query,
+        filters: &meilisearch_auth::AuthFilter,
+    ) -> Result<(Vec<Batch>, u64)> {
+        let rtxn = self.env.read_txn()?;
+
+        let (batches, total) = self.get_batch_ids_from_authorized_indexes(&rtxn, &query, filters)?;
+        let batches = self.get_existing_batches(
+            &rtxn,
+            batches.into_iter().rev().take(query.limit.unwrap_or(u32::MAX) as usize),
+        )?;
+
+        Ok((batches, total))
+    }
+
     /// Register a new task in the scheduler.
     ///
     /// If it fails and data was associated with the task, it tries to delete the associated data.
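A sketch of how an HTTP handler could consume this new API, for instance the still-commented-out `GET /batches` route added later in this commit; serializing `Batch` directly and the response shape are assumptions, since no dedicated view type is introduced here:

    // Sketch: list the 20 most recent batches visible to the request's API key.
    let query = index_scheduler::Query { limit: Some(20), ..Query::default() };
    let filters = index_scheduler.filters();
    let (batches, total) = index_scheduler.get_batches_from_authorized_indexes(query, filters)?;
    Ok(HttpResponse::Ok().json(serde_json::json!({ "results": batches, "total": total })))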

View File

@ -321,6 +321,7 @@ InvalidTaskLimit , InvalidRequest , BAD_REQUEST ;
 InvalidTaskStatuses            , InvalidRequest       , BAD_REQUEST ;
 InvalidTaskTypes               , InvalidRequest       , BAD_REQUEST ;
 InvalidTaskUids                , InvalidRequest       , BAD_REQUEST ;
+InvalidBatchUids               , InvalidRequest       , BAD_REQUEST ;
 IoError                        , System               , UNPROCESSABLE_ENTITY;
 FeatureNotEnabled              , InvalidRequest       , BAD_REQUEST ;
 MalformedPayload               , InvalidRequest       , BAD_REQUEST ;

View File

@ -0,0 +1,48 @@
use actix_web::{
    web::{self, Data},
    HttpResponse,
};
use index_scheduler::{IndexScheduler, Query};
use meilisearch_types::{
    batches::BatchId, error::ResponseError, keys::actions, task_view::TaskView,
};

use crate::extractors::{authentication::GuardedData, sequential_extractor::SeqHandler};

use super::ActionPolicy;

pub fn configure(cfg: &mut web::ServiceConfig) {
    cfg
        // .service(
        //     web::resource("")
        //         .route(web::get().to(SeqHandler(get_tasks)))
        // )
        .service(web::resource("/{batch_id}").route(web::get().to(SeqHandler(get_batch))));
}

async fn get_batch(
    index_scheduler: GuardedData<ActionPolicy<{ actions::TASKS_GET }>, Data<IndexScheduler>>,
    batch_uid: web::Path<String>,
) -> Result<HttpResponse, ResponseError> {
    let batch_uid_string = batch_uid.into_inner();

    let batch_uid: BatchId = match batch_uid_string.parse() {
        Ok(id) => id,
        Err(_e) => {
            return Err(
                index_scheduler::Error::InvalidBatchUid { batch_uid: batch_uid_string }.into()
            )
        }
    };

    let query = index_scheduler::Query { uids: Some(vec![batch_uid]), ..Query::default() };
    let filters = index_scheduler.filters();
    let (tasks, _) = index_scheduler.get_tasks_from_authorized_indexes(query, filters)?;

    if let Some(task) = tasks.first() {
        let task_view = TaskView::from_task(task);
        Ok(HttpResponse::Ok().json(task_view))
    } else {
        Err(index_scheduler::Error::TaskNotFound(batch_uid).into())
    }
}
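As written, the handler answers a batch uid with the matching task's `TaskView`, which reads like a placeholder. Once the endpoint should return the batch itself, it could lean on the `get_batches_from_authorized_indexes` API added in this commit; the sketch below assumes `Batch` exposes a public `uid` field and simply scans the result, since `Query` has no batch-uid filter yet:

    // Sketch of a batch-returning variant of `get_batch`.
    let filters = index_scheduler.filters();
    let (batches, _total) =
        index_scheduler.get_batches_from_authorized_indexes(Query::default(), filters)?;
    match batches.into_iter().find(|batch| batch.uid == batch_uid) {
        // `Batch` is serializable (it is stored as `SerdeJson<Batch>`), so it can be returned as-is.
        Some(batch) => Ok(HttpResponse::Ok().json(batch)),
        // No `BatchNotFound` error exists yet, so reuse `TaskNotFound` as the current code does.
        None => Err(index_scheduler::Error::TaskNotFound(batch_uid).into()),
    }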

View File

@ -19,6 +19,7 @@ use crate::Opt;
 const PAGINATION_DEFAULT_LIMIT: usize = 20;
 
 mod api_key;
+pub mod batches;
 mod dump;
 pub mod features;
 pub mod indexes;

View File

@ -17,15 +17,13 @@ use time::macros::format_description;
 use time::{Date, Duration, OffsetDateTime, Time};
 use tokio::task;
 
-use super::{get_task_id, is_dry_run, SummarizedTaskView};
+use super::{get_task_id, is_dry_run, SummarizedTaskView, PAGINATION_DEFAULT_LIMIT};
 use crate::analytics::{Aggregate, AggregateMethod, Analytics};
 use crate::extractors::authentication::policies::*;
 use crate::extractors::authentication::GuardedData;
 use crate::extractors::sequential_extractor::SeqHandler;
 use crate::{aggregate_methods, Opt};
 
-const DEFAULT_LIMIT: u32 = 20;
-
 pub fn configure(cfg: &mut web::ServiceConfig) {
     cfg.service(
         web::resource("")
@ -35,10 +33,11 @@ pub fn configure(cfg: &mut web::ServiceConfig) {
.service(web::resource("/cancel").route(web::post().to(SeqHandler(cancel_tasks)))) .service(web::resource("/cancel").route(web::post().to(SeqHandler(cancel_tasks))))
.service(web::resource("/{task_id}").route(web::get().to(SeqHandler(get_task)))); .service(web::resource("/{task_id}").route(web::get().to(SeqHandler(get_task))));
} }
#[derive(Debug, Deserr)] #[derive(Debug, Deserr)]
#[deserr(error = DeserrQueryParamError, rename_all = camelCase, deny_unknown_fields)] #[deserr(error = DeserrQueryParamError, rename_all = camelCase, deny_unknown_fields)]
pub struct TasksFilterQuery { pub struct TasksFilterQuery {
#[deserr(default = Param(DEFAULT_LIMIT), error = DeserrQueryParamError<InvalidTaskLimit>)] #[deserr(default = Param(PAGINATION_DEFAULT_LIMIT as u32), error = DeserrQueryParamError<InvalidTaskLimit>)]
pub limit: Param<u32>, pub limit: Param<u32>,
#[deserr(default, error = DeserrQueryParamError<InvalidTaskFrom>)] #[deserr(default, error = DeserrQueryParamError<InvalidTaskFrom>)]
pub from: Option<Param<TaskId>>, pub from: Option<Param<TaskId>>,
@ -359,7 +358,7 @@ async fn get_task(
     let task_uid: TaskId = match task_uid_string.parse() {
         Ok(id) => id,
         Err(_e) => {
-            return Err(index_scheduler::Error::InvalidTaskUids { task_uid: task_uid_string }.into())
+            return Err(index_scheduler::Error::InvalidTaskUid { task_uid: task_uid_string }.into())
         }
     };