diff --git a/crates/index-scheduler/src/error.rs b/crates/index-scheduler/src/error.rs index 3bd378fd6..434764677 100644 --- a/crates/index-scheduler/src/error.rs +++ b/crates/index-scheduler/src/error.rs @@ -79,7 +79,9 @@ pub enum Error { )] InvalidTaskDate { field: DateField, date: String }, #[error("Task uid `{task_uid}` is invalid. It should only contain numeric characters.")] - InvalidTaskUids { task_uid: String }, + InvalidTaskUid { task_uid: String }, + #[error("Batch uid `{batch_uid}` is invalid. It should only contain numeric characters.")] + InvalidBatchUid { batch_uid: String }, #[error( "Task status `{status}` is invalid. Available task statuses are {}.", enum_iterator::all::() @@ -172,7 +174,8 @@ impl Error { | Error::SwapIndexesNotFound(_) | Error::CorruptedDump | Error::InvalidTaskDate { .. } - | Error::InvalidTaskUids { .. } + | Error::InvalidTaskUid { .. } + | Error::InvalidBatchUid { .. } | Error::InvalidTaskStatuses { .. } | Error::InvalidTaskTypes { .. } | Error::InvalidTaskCanceledBy { .. } @@ -216,7 +219,8 @@ impl ErrorCode for Error { Error::SwapIndexNotFound(_) => Code::IndexNotFound, Error::SwapIndexesNotFound(_) => Code::IndexNotFound, Error::InvalidTaskDate { field, .. } => (*field).into(), - Error::InvalidTaskUids { .. } => Code::InvalidTaskUids, + Error::InvalidTaskUid { .. } => Code::InvalidTaskUids, + Error::InvalidBatchUid { .. } => Code::InvalidBatchUids, Error::InvalidTaskStatuses { .. } => Code::InvalidTaskStatuses, Error::InvalidTaskTypes { .. } => Code::InvalidTaskTypes, Error::InvalidTaskCanceledBy { .. } => Code::InvalidTaskCanceledBy, diff --git a/crates/index-scheduler/src/insta_snapshot.rs b/crates/index-scheduler/src/insta_snapshot.rs index 94b9ca859..f2b9df555 100644 --- a/crates/index-scheduler/src/insta_snapshot.rs +++ b/crates/index-scheduler/src/insta_snapshot.rs @@ -25,6 +25,7 @@ pub fn snapshot_index_scheduler(scheduler: &IndexScheduler) -> String { env, all_tasks, all_batches, + // task reverse index status, kind, index_tasks, @@ -32,6 +33,16 @@ pub fn snapshot_index_scheduler(scheduler: &IndexScheduler) -> String { enqueued_at, started_at, finished_at, + + // batch reverse index + batch_status, + batch_kind, + batch_index_tasks, + batch_canceled_by, + batch_enqueued_at, + batch_started_at, + batch_finished_at, + index_mapper, features: _, max_number_of_tasks: _, diff --git a/crates/index-scheduler/src/lib.rs b/crates/index-scheduler/src/lib.rs index 38031c62a..1ae12a860 100644 --- a/crates/index-scheduler/src/lib.rs +++ b/crates/index-scheduler/src/lib.rs @@ -228,6 +228,14 @@ mod db_name { pub const ENQUEUED_AT: &str = "enqueued-at"; pub const STARTED_AT: &str = "started-at"; pub const FINISHED_AT: &str = "finished-at"; + + pub const BATCH_STATUS: &str = "batch-status"; + pub const BATCH_KIND: &str = "batch-kind"; + pub const BATCH_INDEX_TASKS: &str = "batch-index-tasks"; + pub const BATCH_CANCELED_BY: &str = "batch-canceled_by"; + pub const BATCH_ENQUEUED_AT: &str = "batch-enqueued-at"; + pub const BATCH_STARTED_AT: &str = "batch-started-at"; + pub const BATCH_FINISHED_AT: &str = "batch-finished-at"; } #[cfg(test)] @@ -315,6 +323,25 @@ pub struct IndexScheduler { // Contains all the batches accessible by their Id. pub(crate) all_batches: Database>, + /// All the batches containing a task matching the selected status. + pub(crate) batch_status: Database, RoaringBitmapCodec>, + /// All the batches ids grouped by the kind of their task. + pub(crate) batch_kind: Database, RoaringBitmapCodec>, + /// Store the batches associated to an index. + pub(crate) batch_index_tasks: Database, + + /// Store the batches containing a task canceled by a task uid + pub(crate) batch_canceled_by: Database, + + /// Store the batches containing tasks which were enqueued at a specific date + pub(crate) batch_enqueued_at: Database, + + /// Store the batches containing finished tasks started at a specific date + pub(crate) batch_started_at: Database, + + /// Store the batches containing tasks finished at a specific date + pub(crate) batch_finished_at: Database, + /// All the tasks ids grouped by their status. // TODO we should not be able to serialize a `Status::Processing` in this database. pub(crate) status: Database, RoaringBitmapCodec>, @@ -404,6 +431,8 @@ impl IndexScheduler { file_store: self.file_store.clone(), all_tasks: self.all_tasks, all_batches: self.all_batches, + + // Tasks reverse index status: self.status, kind: self.kind, index_tasks: self.index_tasks, @@ -411,6 +440,16 @@ impl IndexScheduler { enqueued_at: self.enqueued_at, started_at: self.started_at, finished_at: self.finished_at, + + // Batches reverse index + batch_status: self.batch_status, + batch_kind: self.batch_kind, + batch_index_tasks: self.batch_index_tasks, + batch_canceled_by: self.batch_canceled_by, + batch_enqueued_at: self.batch_enqueued_at, + batch_started_at: self.batch_started_at, + batch_finished_at: self.batch_finished_at, + index_mapper: self.index_mapper.clone(), wake_up: self.wake_up.clone(), autobatching_enabled: self.autobatching_enabled, @@ -470,7 +509,7 @@ impl IndexScheduler { let env = unsafe { heed::EnvOpenOptions::new() - .max_dbs(12) + .max_dbs(19) .map_size(budget.task_db_size) .open(options.tasks_path) }?; @@ -489,6 +528,14 @@ impl IndexScheduler { let enqueued_at = env.create_database(&mut wtxn, Some(db_name::ENQUEUED_AT))?; let started_at = env.create_database(&mut wtxn, Some(db_name::STARTED_AT))?; let finished_at = env.create_database(&mut wtxn, Some(db_name::FINISHED_AT))?; + + let batch_status = env.create_database(&mut wtxn, Some(db_name::STATUS))?; + let batch_kind = env.create_database(&mut wtxn, Some(db_name::KIND))?; + let batch_index_tasks = env.create_database(&mut wtxn, Some(db_name::INDEX_TASKS))?; + let batch_canceled_by = env.create_database(&mut wtxn, Some(db_name::CANCELED_BY))?; + let batch_enqueued_at = env.create_database(&mut wtxn, Some(db_name::ENQUEUED_AT))?; + let batch_started_at = env.create_database(&mut wtxn, Some(db_name::STARTED_AT))?; + let batch_finished_at = env.create_database(&mut wtxn, Some(db_name::FINISHED_AT))?; wtxn.commit()?; // allow unreachable_code to get rids of the warning in the case of a test build. @@ -498,6 +545,7 @@ impl IndexScheduler { file_store, all_tasks, all_batches, + // Task reverse indexes status, kind, index_tasks, @@ -505,6 +553,16 @@ impl IndexScheduler { enqueued_at, started_at, finished_at, + + // Batch reverse indexes + batch_status, + batch_kind, + batch_index_tasks, + batch_canceled_by, + batch_enqueued_at, + batch_started_at, + batch_finished_at, + index_mapper: IndexMapper::new( &env, options.indexes_path, @@ -955,6 +1013,52 @@ impl IndexScheduler { Ok((tasks, total_tasks.len())) } + /// Return the batch ids matching the query along with the total number of batches + /// by ignoring the from and limit parameters from the user's point of view. + /// + /// There are two differences between an internal query and a query executed by + /// the user. + /// + /// 1. IndexSwap tasks are not publicly associated with any index, but they are associated + /// with many indexes internally. + /// 2. The user may not have the rights to access the tasks (internally) associated with all indexes. + pub fn get_task_ids_from_authorized_indexes( + &self, + rtxn: &RoTxn, + query: &Query, + filters: &meilisearch_auth::AuthFilter, + ) -> Result<(RoaringBitmap, u64)> { + // compute all batches matching the filter by ignoring the limits, to find the number of batches matching + // the filter. + // As this causes us to compute the filter twice it is slightly inefficient, but doing it this way spares + // us from modifying the underlying implementation, and the performance remains sufficient. + // Should this change, we would modify `get_batch_ids` to directly return the number of matching batches. + let total_batches = self.get_batch_ids(rtxn, &query.clone().without_limits())?; + let mut batches = self.get_batch_ids(rtxn, query)?; + + // If the query contains a list of index uid or there is a finite list of authorized indexes, + // then we must exclude all the kinds that aren't associated to one and only one index. + if query.index_uids.is_some() || !filters.all_indexes_authorized() { + for kind in enum_iterator::all::().filter(|kind| !kind.related_to_one_index()) { + batches -= self.get_kind(rtxn, kind)?; + } + } + + // Any task that is internally associated with a non-authorized index + // must be discarded. + if !filters.all_indexes_authorized() { + let all_indexes_iter = self.index_tasks.iter(rtxn)?; + for result in all_indexes_iter { + let (index, index_tasks) = result?; + if !filters.is_index_authorized(index) { + tasks -= index_tasks; + } + } + } + + Ok((tasks, total_tasks.len())) + } + /// Return the tasks matching the query from the user's point of view along /// with the total number of tasks matching the query, ignoring from and limit. /// @@ -1003,6 +1107,54 @@ impl IndexScheduler { } } + /// Return the batches matching the query from the user's point of view along + /// with the total number of batches matching the query, ignoring from and limit. + /// + /// There are two differences between an internal query and a query executed by + /// the user. + /// + /// 1. IndexSwap tasks are not publicly associated with any index, but they are associated + /// with many indexes internally. + /// 2. The user may not have the rights to access the tasks (internally) associated with all indexes. + pub fn get_batches_from_authorized_indexes( + &self, + query: Query, + filters: &meilisearch_auth::AuthFilter, + ) -> Result<(Vec, u64)> { + let rtxn = self.env.read_txn()?; + + let (tasks, total) = self.get_batch_ids_from_authorized_indexes(&rtxn, &query, filters)?; + let tasks = self.get_existing_batches( + &rtxn, + tasks.into_iter().rev().take(query.limit.unwrap_or(u32::MAX) as usize), + )?; + + let ProcessingTasks { started_at, batch_id, processing } = + self.processing_tasks.read().map_err(|_| Error::CorruptedTaskQueue)?.clone(); + + let ret = tasks.into_iter(); + if processing.is_empty() { + Ok((ret.collect(), total)) + } else { + Ok(( + ret.map(|task| { + if processing.contains(task.uid) { + Task { + status: Status::Processing, + batch_uid: batch_id, + started_at: Some(started_at), + ..task + } + } else { + task + } + }) + .collect(), + total, + )) + } + } + /// Register a new task in the scheduler. /// /// If it fails and data was associated with the task, it tries to delete the associated data. diff --git a/crates/meilisearch-types/src/error.rs b/crates/meilisearch-types/src/error.rs index 514ed18c3..452590be0 100644 --- a/crates/meilisearch-types/src/error.rs +++ b/crates/meilisearch-types/src/error.rs @@ -321,6 +321,7 @@ InvalidTaskLimit , InvalidRequest , BAD_REQUEST ; InvalidTaskStatuses , InvalidRequest , BAD_REQUEST ; InvalidTaskTypes , InvalidRequest , BAD_REQUEST ; InvalidTaskUids , InvalidRequest , BAD_REQUEST ; +InvalidBatchUids , InvalidRequest , BAD_REQUEST ; IoError , System , UNPROCESSABLE_ENTITY; FeatureNotEnabled , InvalidRequest , BAD_REQUEST ; MalformedPayload , InvalidRequest , BAD_REQUEST ; diff --git a/crates/meilisearch/src/routes/batches.rs b/crates/meilisearch/src/routes/batches.rs new file mode 100644 index 000000000..f8e626a17 --- /dev/null +++ b/crates/meilisearch/src/routes/batches.rs @@ -0,0 +1,48 @@ +use actix_web::{ + web::{self, Data}, + HttpResponse, +}; +use index_scheduler::{IndexScheduler, Query}; +use meilisearch_types::{ + batches::BatchId, error::ResponseError, keys::actions, task_view::TaskView, +}; + +use crate::extractors::{authentication::GuardedData, sequential_extractor::SeqHandler}; + +use super::ActionPolicy; + +pub fn configure(cfg: &mut web::ServiceConfig) { + cfg + // .service( + // web::resource("") + // .route(web::get().to(SeqHandler(get_tasks))) + // ) + .service(web::resource("/{batch_id}").route(web::get().to(SeqHandler(get_batch)))); +} + +async fn get_task( + index_scheduler: GuardedData, Data>, + batch_uid: web::Path, +) -> Result { + let batch_uid_string = batch_uid.into_inner(); + + let batch_uid: BatchId = match batch_uid_string.parse() { + Ok(id) => id, + Err(_e) => { + return Err( + index_scheduler::Error::InvalidBatchUid { batch_uid: batch_uid_string }.into() + ) + } + }; + + let query = index_scheduler::Query { uids: Some(vec![batch_uid]), ..Query::default() }; + let filters = index_scheduler.filters(); + let (tasks, _) = index_scheduler.get_tasks_from_authorized_indexes(query, filters)?; + + if let Some(task) = tasks.first() { + let task_view = TaskView::from_task(task); + Ok(HttpResponse::Ok().json(task_view)) + } else { + Err(index_scheduler::Error::TaskNotFound(batch_uid).into()) + } +} diff --git a/crates/meilisearch/src/routes/mod.rs b/crates/meilisearch/src/routes/mod.rs index b7260ea08..f5c512ab9 100644 --- a/crates/meilisearch/src/routes/mod.rs +++ b/crates/meilisearch/src/routes/mod.rs @@ -19,6 +19,7 @@ use crate::Opt; const PAGINATION_DEFAULT_LIMIT: usize = 20; mod api_key; +pub mod batches; mod dump; pub mod features; pub mod indexes; diff --git a/crates/meilisearch/src/routes/tasks.rs b/crates/meilisearch/src/routes/tasks.rs index 95959d6d5..db07ad59c 100644 --- a/crates/meilisearch/src/routes/tasks.rs +++ b/crates/meilisearch/src/routes/tasks.rs @@ -17,15 +17,13 @@ use time::macros::format_description; use time::{Date, Duration, OffsetDateTime, Time}; use tokio::task; -use super::{get_task_id, is_dry_run, SummarizedTaskView}; +use super::{get_task_id, is_dry_run, SummarizedTaskView, PAGINATION_DEFAULT_LIMIT}; use crate::analytics::{Aggregate, AggregateMethod, Analytics}; use crate::extractors::authentication::policies::*; use crate::extractors::authentication::GuardedData; use crate::extractors::sequential_extractor::SeqHandler; use crate::{aggregate_methods, Opt}; -const DEFAULT_LIMIT: u32 = 20; - pub fn configure(cfg: &mut web::ServiceConfig) { cfg.service( web::resource("") @@ -35,10 +33,11 @@ pub fn configure(cfg: &mut web::ServiceConfig) { .service(web::resource("/cancel").route(web::post().to(SeqHandler(cancel_tasks)))) .service(web::resource("/{task_id}").route(web::get().to(SeqHandler(get_task)))); } + #[derive(Debug, Deserr)] #[deserr(error = DeserrQueryParamError, rename_all = camelCase, deny_unknown_fields)] pub struct TasksFilterQuery { - #[deserr(default = Param(DEFAULT_LIMIT), error = DeserrQueryParamError)] + #[deserr(default = Param(PAGINATION_DEFAULT_LIMIT as u32), error = DeserrQueryParamError)] pub limit: Param, #[deserr(default, error = DeserrQueryParamError)] pub from: Option>, @@ -359,7 +358,7 @@ async fn get_task( let task_uid: TaskId = match task_uid_string.parse() { Ok(id) => id, Err(_e) => { - return Err(index_scheduler::Error::InvalidTaskUids { task_uid: task_uid_string }.into()) + return Err(index_scheduler::Error::InvalidTaskUid { task_uid: task_uid_string }.into()) } };