From 22cf0559fe4511168b331e8e8ec1fa9020ba9274 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Lecrenier?= Date: Wed, 19 Oct 2022 12:59:12 +0200 Subject: [PATCH] Implement task date filters before/after enqueued/started/finished at --- Cargo.lock | 1 + index-scheduler/src/batch.rs | 8 + index-scheduler/src/lib.rs | 96 ++++-- index-scheduler/src/snapshot.rs | 33 +- .../snapshots/lib.rs/document_addition/1.snap | 7 + .../snapshots/lib.rs/document_addition/2.snap | 7 + .../snapshots/lib.rs/document_addition/3.snap | 9 + .../1.snap | 9 + .../src/snapshots/lib.rs/register/1.snap | 11 + .../initial_tasks_enqueued.snap | 8 + .../initial_tasks_processed.snap | 12 +- .../task_deletion_processed.snap | 13 + .../initial_tasks_enqueued.snap | 8 + .../initial_tasks_processed.snap | 12 +- .../task_deletion_processed.snap | 10 + .../initial_tasks_enqueued.snap | 9 + .../task_deletion_done.snap | 12 + .../task_deletion_enqueued.snap | 10 + .../task_deletion_processing.snap | 10 + index-scheduler/src/utils.rs | 103 ++++++- meili-snap/src/lib.rs | 27 +- meilisearch-http/Cargo.toml | 1 + meilisearch-http/src/routes/tasks.rs | 287 +++++++++++++++--- 23 files changed, 619 insertions(+), 84 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c4484a735..35ebb4d0c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2287,6 +2287,7 @@ dependencies = [ "log", "manifest-dir-macros", "maplit", + "meili-snap", "meilisearch-auth", "meilisearch-types", "mimalloc", diff --git a/index-scheduler/src/batch.rs b/index-scheduler/src/batch.rs index a253c7fc8..607557a1c 100644 --- a/index-scheduler/src/batch.rs +++ b/index-scheduler/src/batch.rs @@ -2,6 +2,7 @@ use std::collections::HashSet; use std::fs::File; use std::io::BufWriter; +use crate::utils; use crate::{autobatcher::BatchKind, Error, IndexScheduler, Result, TaskId}; use dump::IndexMetadata; @@ -1015,6 +1016,13 @@ impl IndexScheduler { // we can only delete succeeded, failed, and canceled tasks. 
// In each of those cases, the persisted data is supposed to // have been deleted already. + utils::remove_task_datetime(wtxn, self.enqueued_at, task.enqueued_at, task.uid)?; + if let Some(started_at) = task.started_at { + utils::remove_task_datetime(wtxn, self.started_at, started_at, task.uid)?; + } + if let Some(finished_at) = task.finished_at { + utils::remove_task_datetime(wtxn, self.finished_at, finished_at, task.uid)?; + } } for index in affected_indexes { diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs index eac477a3c..28b207bb8 100644 --- a/index-scheduler/src/lib.rs +++ b/index-scheduler/src/lib.rs @@ -11,6 +11,10 @@ pub type TaskId = u32; use dump::{KindDump, TaskDump, UpdateFile}; pub use error::Error; +use meilisearch_types::milli::documents::DocumentsBatchBuilder; +use meilisearch_types::tasks::{Kind, KindWithContent, Status, Task}; +use serde::Serialize; +use utils::keep_tasks_within_datetimes; use std::path::PathBuf; use std::sync::atomic::{AtomicBool, Ordering::Relaxed}; @@ -20,21 +24,20 @@ use file_store::FileStore; use meilisearch_types::error::ResponseError; use meilisearch_types::milli; use roaring::RoaringBitmap; -use serde::{Deserialize, Serialize}; use synchronoise::SignalEvent; use time::OffsetDateTime; use uuid::Uuid; use meilisearch_types::heed::types::{OwnedType, SerdeBincode, SerdeJson, Str}; use meilisearch_types::heed::{self, Database, Env}; -use meilisearch_types::milli::documents::DocumentsBatchBuilder; use meilisearch_types::milli::update::IndexerConfig; -use meilisearch_types::milli::{Index, RoaringBitmapCodec, BEU32}; -use meilisearch_types::tasks::{Kind, KindWithContent, Status, Task}; +use meilisearch_types::milli::{CboRoaringBitmapCodec, Index, RoaringBitmapCodec, BEU32}; use crate::index_mapper::IndexMapper; -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +type BEI128 = meilisearch_types::heed::zerocopy::I128; + +#[derive(Default, Debug, Clone, PartialEq, Eq, Serialize)] #[serde(rename_all 
= "camelCase")] pub struct Query { pub limit: Option, @@ -44,19 +47,19 @@ pub struct Query { pub kind: Option>, pub index_uid: Option>, pub uid: Option>, -} -impl Default for Query { - fn default() -> Self { - Self { - limit: None, - from: None, - status: None, - kind: None, - index_uid: None, - uid: None, - } - } + #[serde(serialize_with = "time::serde::rfc3339::option::serialize")] + pub before_enqueued_at: Option, + #[serde(serialize_with = "time::serde::rfc3339::option::serialize")] + pub after_enqueued_at: Option, + #[serde(serialize_with = "time::serde::rfc3339::option::serialize")] + pub before_started_at: Option, + #[serde(serialize_with = "time::serde::rfc3339::option::serialize")] + pub after_started_at: Option, + #[serde(serialize_with = "time::serde::rfc3339::option::serialize")] + pub before_finished_at: Option, + #[serde(serialize_with = "time::serde::rfc3339::option::serialize")] + pub after_finished_at: Option, } impl Query { @@ -71,7 +74,13 @@ impl Query { status: None, kind: None, index_uid: None, - uid: None + uid: None, + before_enqueued_at: None, + after_enqueued_at: None, + before_started_at: None, + after_started_at: None, + before_finished_at: None, + after_finished_at: None, } ) } @@ -177,6 +186,9 @@ mod db_name { pub const STATUS: &str = "status"; pub const KIND: &str = "kind"; pub const INDEX_TASKS: &str = "index-tasks"; + pub const ENQUEUED_AT: &str = "enqueued-at"; + pub const STARTED_AT: &str = "started-at"; + pub const FINISHED_AT: &str = "finished-at"; } /// This module is responsible for two things; @@ -202,6 +214,20 @@ pub struct IndexScheduler { /// Store the tasks associated to an index. pub(crate) index_tasks: Database, + /// Store the task ids of tasks which were enqueued at a specific date + /// + /// Note that since we store the date with nanosecond-level precision, it would be + /// reasonable to assume that there is only one task per key. 
However, it is not a + /// theoretical certainty, and we might want to make it possible to enqueue multiple + /// tasks at a time in the future. + pub(crate) enqueued_at: Database, CboRoaringBitmapCodec>, + + /// Store the task ids of finished tasks which started being processed at a specific date + pub(crate) started_at: Database, CboRoaringBitmapCodec>, + + /// Store the task ids of tasks which finished at a specific date + pub(crate) finished_at: Database, CboRoaringBitmapCodec>, + /// In charge of creating, opening, storing and returning indexes. pub(crate) index_mapper: IndexMapper, @@ -247,7 +273,7 @@ impl IndexScheduler { std::fs::create_dir_all(&dumps_path)?; let mut options = heed::EnvOpenOptions::new(); - options.max_dbs(6); + options.max_dbs(9); let env = options.open(tasks_path)?; let file_store = FileStore::new(&update_file_path)?; @@ -261,6 +287,9 @@ impl IndexScheduler { status: env.create_database(Some(db_name::STATUS))?, kind: env.create_database(Some(db_name::KIND))?, index_tasks: env.create_database(Some(db_name::INDEX_TASKS))?, + enqueued_at: env.create_database(Some(db_name::ENQUEUED_AT))?, + started_at: env.create_database(Some(db_name::STARTED_AT))?, + finished_at: env.create_database(Some(db_name::FINISHED_AT))?, index_mapper: IndexMapper::new(&env, indexes_path, index_size, indexer_config)?, env, // we want to start the loop right away in case meilisearch was ctrl+Ced while processing things @@ -287,6 +316,9 @@ impl IndexScheduler { status: self.status, kind: self.kind, index_tasks: self.index_tasks, + enqueued_at: self.enqueued_at, + started_at: self.started_at, + finished_at: self.finished_at, index_mapper: self.index_mapper.clone(), wake_up: self.wake_up.clone(), autobatching_enabled: self.autobatching_enabled, @@ -359,6 +391,30 @@ impl IndexScheduler { } tasks &= index_tasks; } + keep_tasks_within_datetimes( + &rtxn, + &mut tasks, + self.enqueued_at, + query.after_enqueued_at, + query.before_enqueued_at, + )?; + + 
keep_tasks_within_datetimes( + &rtxn, + &mut tasks, + self.started_at, + query.after_started_at, + query.before_started_at, + )?; + + keep_tasks_within_datetimes( + &rtxn, + &mut tasks, + self.finished_at, + query.after_finished_at, + query.before_finished_at, + )?; + rtxn.commit().unwrap(); Ok(tasks) } @@ -438,6 +494,8 @@ impl IndexScheduler { (bitmap.insert(task.uid)); })?; + utils::insert_task_datetime(&mut wtxn, self.enqueued_at, task.enqueued_at, task.uid)?; + if let Err(e) = wtxn.commit() { self.delete_persisted_task_data(&task)?; return Err(e.into()); diff --git a/index-scheduler/src/snapshot.rs b/index-scheduler/src/snapshot.rs index 44f8faa36..d10a5c331 100644 --- a/index-scheduler/src/snapshot.rs +++ b/index-scheduler/src/snapshot.rs @@ -1,4 +1,4 @@ -use meilisearch_types::milli::{RoaringBitmapCodec, BEU32}; +use meilisearch_types::milli::{CboRoaringBitmapCodec, RoaringBitmapCodec, BEU32}; use meilisearch_types::tasks::Details; use meilisearch_types::{ heed::{ @@ -9,12 +9,13 @@ use meilisearch_types::{ }; use roaring::RoaringBitmap; +use crate::BEI128; use crate::{index_mapper::IndexMapper, IndexScheduler, Kind, Status}; pub fn snapshot_index_scheduler(scheduler: &IndexScheduler) -> String { let IndexScheduler { autobatching_enabled, - must_stop_processing, + must_stop_processing: _, processing_tasks, file_store, env, @@ -22,6 +23,9 @@ pub fn snapshot_index_scheduler(scheduler: &IndexScheduler) -> String { status, kind, index_tasks, + enqueued_at, + started_at, + finished_at, index_mapper, wake_up: _, dumps_path: _, @@ -60,6 +64,18 @@ pub fn snapshot_index_scheduler(scheduler: &IndexScheduler) -> String { snap.push_str(&snapshot_index_mapper(&rtxn, index_mapper)); snap.push_str("\n----------------------------------------------------------------------\n"); + snap.push_str("### Enqueued At:\n"); + snap.push_str(&snapshot_date_db(&rtxn, *enqueued_at)); + snap.push_str("----------------------------------------------------------------------\n"); + + 
snap.push_str("### Started At:\n"); + snap.push_str(&snapshot_date_db(&rtxn, *started_at)); + snap.push_str("----------------------------------------------------------------------\n"); + + snap.push_str("### Finished At:\n"); + snap.push_str(&snapshot_date_db(&rtxn, *finished_at)); + snap.push_str("----------------------------------------------------------------------\n"); + snap.push_str("### File Store:\n"); snap.push_str(&snapshot_file_store(file_store)); snap.push_str("\n----------------------------------------------------------------------\n"); @@ -97,6 +113,19 @@ fn snapshot_all_tasks(rtxn: &RoTxn, db: Database, SerdeJson, CboRoaringBitmapCodec>, +) -> String { + let mut snap = String::new(); + let mut iter = db.iter(rtxn).unwrap(); + while let Some(next) = iter.next() { + let (_timestamp, task_ids) = next.unwrap(); + snap.push_str(&format!("[timestamp] {}\n", snapshot_bitmap(&task_ids))); + } + snap +} + fn snapshot_task(task: &Task) -> String { let mut snap = String::new(); let Task { diff --git a/index-scheduler/src/snapshots/lib.rs/document_addition/1.snap b/index-scheduler/src/snapshots/lib.rs/document_addition/1.snap index 29fda8278..f1276bdfa 100644 --- a/index-scheduler/src/snapshots/lib.rs/document_addition/1.snap +++ b/index-scheduler/src/snapshots/lib.rs/document_addition/1.snap @@ -20,6 +20,13 @@ doggos [0,] ### Index Mapper: [] ---------------------------------------------------------------------- +### Enqueued At: +[timestamp] [0,] +---------------------------------------------------------------------- +### Started At: +---------------------------------------------------------------------- +### Finished At: +---------------------------------------------------------------------- ### File Store: 00000000-0000-0000-0000-000000000000 diff --git a/index-scheduler/src/snapshots/lib.rs/document_addition/2.snap b/index-scheduler/src/snapshots/lib.rs/document_addition/2.snap index ff9798905..f0ee39fdd 100644 --- 
a/index-scheduler/src/snapshots/lib.rs/document_addition/2.snap +++ b/index-scheduler/src/snapshots/lib.rs/document_addition/2.snap @@ -20,6 +20,13 @@ doggos [0,] ### Index Mapper: [] ---------------------------------------------------------------------- +### Enqueued At: +[timestamp] [0,] +---------------------------------------------------------------------- +### Started At: +---------------------------------------------------------------------- +### Finished At: +---------------------------------------------------------------------- ### File Store: 00000000-0000-0000-0000-000000000000 diff --git a/index-scheduler/src/snapshots/lib.rs/document_addition/3.snap b/index-scheduler/src/snapshots/lib.rs/document_addition/3.snap index 4c7739942..d01fa5327 100644 --- a/index-scheduler/src/snapshots/lib.rs/document_addition/3.snap +++ b/index-scheduler/src/snapshots/lib.rs/document_addition/3.snap @@ -21,6 +21,15 @@ doggos [0,] ### Index Mapper: ["doggos"] ---------------------------------------------------------------------- +### Enqueued At: +[timestamp] [0,] +---------------------------------------------------------------------- +### Started At: +[timestamp] [0,] +---------------------------------------------------------------------- +### Finished At: +[timestamp] [0,] +---------------------------------------------------------------------- ### File Store: ---------------------------------------------------------------------- diff --git a/index-scheduler/src/snapshots/lib.rs/insert_task_while_another_task_is_processing/1.snap b/index-scheduler/src/snapshots/lib.rs/insert_task_while_another_task_is_processing/1.snap index 572cb0596..4410dbcad 100644 --- a/index-scheduler/src/snapshots/lib.rs/insert_task_while_another_task_is_processing/1.snap +++ b/index-scheduler/src/snapshots/lib.rs/insert_task_while_another_task_is_processing/1.snap @@ -23,6 +23,15 @@ doggos [2,] ### Index Mapper: [] ---------------------------------------------------------------------- +### Enqueued 
At: +[timestamp] [0,] +[timestamp] [1,] +[timestamp] [2,] +---------------------------------------------------------------------- +### Started At: +---------------------------------------------------------------------- +### Finished At: +---------------------------------------------------------------------- ### File Store: ---------------------------------------------------------------------- diff --git a/index-scheduler/src/snapshots/lib.rs/register/1.snap b/index-scheduler/src/snapshots/lib.rs/register/1.snap index b86acf496..894a440d6 100644 --- a/index-scheduler/src/snapshots/lib.rs/register/1.snap +++ b/index-scheduler/src/snapshots/lib.rs/register/1.snap @@ -27,6 +27,17 @@ doggo [4,] ### Index Mapper: [] ---------------------------------------------------------------------- +### Enqueued At: +[timestamp] [0,] +[timestamp] [1,] +[timestamp] [2,] +[timestamp] [3,] +[timestamp] [4,] +---------------------------------------------------------------------- +### Started At: +---------------------------------------------------------------------- +### Finished At: +---------------------------------------------------------------------- ### File Store: ---------------------------------------------------------------------- diff --git a/index-scheduler/src/snapshots/lib.rs/task_deletion_delete_same_task_twice/initial_tasks_enqueued.snap b/index-scheduler/src/snapshots/lib.rs/task_deletion_delete_same_task_twice/initial_tasks_enqueued.snap index fd2bc806d..858dd0230 100644 --- a/index-scheduler/src/snapshots/lib.rs/task_deletion_delete_same_task_twice/initial_tasks_enqueued.snap +++ b/index-scheduler/src/snapshots/lib.rs/task_deletion_delete_same_task_twice/initial_tasks_enqueued.snap @@ -22,6 +22,14 @@ doggo [1,] ### Index Mapper: [] ---------------------------------------------------------------------- +### Enqueued At: +[timestamp] [0,] +[timestamp] [1,] +---------------------------------------------------------------------- +### Started At: 
+---------------------------------------------------------------------- +### Finished At: +---------------------------------------------------------------------- ### File Store: 00000000-0000-0000-0000-000000000000 00000000-0000-0000-0000-000000000001 diff --git a/index-scheduler/src/snapshots/lib.rs/task_deletion_delete_same_task_twice/initial_tasks_processed.snap b/index-scheduler/src/snapshots/lib.rs/task_deletion_delete_same_task_twice/initial_tasks_processed.snap index 167d387da..84699ab64 100644 --- a/index-scheduler/src/snapshots/lib.rs/task_deletion_delete_same_task_twice/initial_tasks_processed.snap +++ b/index-scheduler/src/snapshots/lib.rs/task_deletion_delete_same_task_twice/initial_tasks_processed.snap @@ -7,7 +7,7 @@ source: index-scheduler/src/lib.rs ---------------------------------------------------------------------- ### All Tasks: 0 {uid: 0, status: succeeded, details: { received_documents: 1, indexed_documents: Some(1) }, kind: DocumentImport { index_uid: "catto", primary_key: None, method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000000, documents_count: 1, allow_index_creation: true }} -1 {uid: 1, status: enqueued, details: { received_documents: 1, indexed_documents: None }, kind: DocumentImport { index_uid: "doggo", primary_key: Some("bone"), method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000001, documents_count: 1, allow_index_creation: true }} +1 {uid: 1, status: enqueued, details: { received_documents: 1, indexed_documents: Some(0) }, kind: DocumentImport { index_uid: "doggo", primary_key: Some("bone"), method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000001, documents_count: 1, allow_index_creation: true }} ---------------------------------------------------------------------- ### Status: enqueued [1,] @@ -23,6 +23,16 @@ doggo [1,] ### Index Mapper: ["catto"] ---------------------------------------------------------------------- +### Enqueued At: +[timestamp] [0,] 
+[timestamp] [1,] +---------------------------------------------------------------------- +### Started At: +[timestamp] [0,] +---------------------------------------------------------------------- +### Finished At: +[timestamp] [0,] +---------------------------------------------------------------------- ### File Store: 00000000-0000-0000-0000-000000000001 diff --git a/index-scheduler/src/snapshots/lib.rs/task_deletion_delete_same_task_twice/task_deletion_processed.snap b/index-scheduler/src/snapshots/lib.rs/task_deletion_delete_same_task_twice/task_deletion_processed.snap index b7b4c4b97..740edb2ed 100644 --- a/index-scheduler/src/snapshots/lib.rs/task_deletion_delete_same_task_twice/task_deletion_processed.snap +++ b/index-scheduler/src/snapshots/lib.rs/task_deletion_delete_same_task_twice/task_deletion_processed.snap @@ -25,6 +25,19 @@ doggo [1,] ### Index Mapper: ["catto"] ---------------------------------------------------------------------- +### Enqueued At: +[timestamp] [1,] +[timestamp] [2,] +[timestamp] [3,] +---------------------------------------------------------------------- +### Started At: +[timestamp] [2,] +[timestamp] [3,] +---------------------------------------------------------------------- +### Finished At: +[timestamp] [2,] +[timestamp] [3,] +---------------------------------------------------------------------- ### File Store: 00000000-0000-0000-0000-000000000001 diff --git a/index-scheduler/src/snapshots/lib.rs/task_deletion_deleteable/initial_tasks_enqueued.snap b/index-scheduler/src/snapshots/lib.rs/task_deletion_deleteable/initial_tasks_enqueued.snap index fd2bc806d..858dd0230 100644 --- a/index-scheduler/src/snapshots/lib.rs/task_deletion_deleteable/initial_tasks_enqueued.snap +++ b/index-scheduler/src/snapshots/lib.rs/task_deletion_deleteable/initial_tasks_enqueued.snap @@ -22,6 +22,14 @@ doggo [1,] ### Index Mapper: [] ---------------------------------------------------------------------- +### Enqueued At: +[timestamp] [0,] +[timestamp] 
[1,] +---------------------------------------------------------------------- +### Started At: +---------------------------------------------------------------------- +### Finished At: +---------------------------------------------------------------------- ### File Store: 00000000-0000-0000-0000-000000000000 00000000-0000-0000-0000-000000000001 diff --git a/index-scheduler/src/snapshots/lib.rs/task_deletion_deleteable/initial_tasks_processed.snap b/index-scheduler/src/snapshots/lib.rs/task_deletion_deleteable/initial_tasks_processed.snap index 167d387da..84699ab64 100644 --- a/index-scheduler/src/snapshots/lib.rs/task_deletion_deleteable/initial_tasks_processed.snap +++ b/index-scheduler/src/snapshots/lib.rs/task_deletion_deleteable/initial_tasks_processed.snap @@ -7,7 +7,7 @@ source: index-scheduler/src/lib.rs ---------------------------------------------------------------------- ### All Tasks: 0 {uid: 0, status: succeeded, details: { received_documents: 1, indexed_documents: Some(1) }, kind: DocumentImport { index_uid: "catto", primary_key: None, method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000000, documents_count: 1, allow_index_creation: true }} -1 {uid: 1, status: enqueued, details: { received_documents: 1, indexed_documents: None }, kind: DocumentImport { index_uid: "doggo", primary_key: Some("bone"), method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000001, documents_count: 1, allow_index_creation: true }} +1 {uid: 1, status: enqueued, details: { received_documents: 1, indexed_documents: Some(0) }, kind: DocumentImport { index_uid: "doggo", primary_key: Some("bone"), method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000001, documents_count: 1, allow_index_creation: true }} ---------------------------------------------------------------------- ### Status: enqueued [1,] @@ -23,6 +23,16 @@ doggo [1,] ### Index Mapper: ["catto"] ---------------------------------------------------------------------- 
+### Enqueued At: +[timestamp] [0,] +[timestamp] [1,] +---------------------------------------------------------------------- +### Started At: +[timestamp] [0,] +---------------------------------------------------------------------- +### Finished At: +[timestamp] [0,] +---------------------------------------------------------------------- ### File Store: 00000000-0000-0000-0000-000000000001 diff --git a/index-scheduler/src/snapshots/lib.rs/task_deletion_deleteable/task_deletion_processed.snap b/index-scheduler/src/snapshots/lib.rs/task_deletion_deleteable/task_deletion_processed.snap index 6d3e2f9ce..8378a940a 100644 --- a/index-scheduler/src/snapshots/lib.rs/task_deletion_deleteable/task_deletion_processed.snap +++ b/index-scheduler/src/snapshots/lib.rs/task_deletion_deleteable/task_deletion_processed.snap @@ -24,6 +24,16 @@ doggo [1,] ### Index Mapper: ["catto"] ---------------------------------------------------------------------- +### Enqueued At: +[timestamp] [1,] +[timestamp] [2,] +---------------------------------------------------------------------- +### Started At: +[timestamp] [2,] +---------------------------------------------------------------------- +### Finished At: +[timestamp] [2,] +---------------------------------------------------------------------- ### File Store: 00000000-0000-0000-0000-000000000001 diff --git a/index-scheduler/src/snapshots/lib.rs/task_deletion_undeleteable/initial_tasks_enqueued.snap b/index-scheduler/src/snapshots/lib.rs/task_deletion_undeleteable/initial_tasks_enqueued.snap index a6ae53c78..b9542ae05 100644 --- a/index-scheduler/src/snapshots/lib.rs/task_deletion_undeleteable/initial_tasks_enqueued.snap +++ b/index-scheduler/src/snapshots/lib.rs/task_deletion_undeleteable/initial_tasks_enqueued.snap @@ -24,6 +24,15 @@ doggo [2,] ### Index Mapper: [] ---------------------------------------------------------------------- +### Enqueued At: +[timestamp] [0,] +[timestamp] [1,] +[timestamp] [2,] 
+---------------------------------------------------------------------- +### Started At: +---------------------------------------------------------------------- +### Finished At: +---------------------------------------------------------------------- ### File Store: ---------------------------------------------------------------------- diff --git a/index-scheduler/src/snapshots/lib.rs/task_deletion_undeleteable/task_deletion_done.snap b/index-scheduler/src/snapshots/lib.rs/task_deletion_undeleteable/task_deletion_done.snap index 4e833c022..8fd797849 100644 --- a/index-scheduler/src/snapshots/lib.rs/task_deletion_undeleteable/task_deletion_done.snap +++ b/index-scheduler/src/snapshots/lib.rs/task_deletion_undeleteable/task_deletion_done.snap @@ -27,6 +27,18 @@ doggo [2,] ### Index Mapper: [] ---------------------------------------------------------------------- +### Enqueued At: +[timestamp] [0,] +[timestamp] [1,] +[timestamp] [2,] +[timestamp] [3,] +---------------------------------------------------------------------- +### Started At: +[timestamp] [3,] +---------------------------------------------------------------------- +### Finished At: +[timestamp] [3,] +---------------------------------------------------------------------- ### File Store: ---------------------------------------------------------------------- diff --git a/index-scheduler/src/snapshots/lib.rs/task_deletion_undeleteable/task_deletion_enqueued.snap b/index-scheduler/src/snapshots/lib.rs/task_deletion_undeleteable/task_deletion_enqueued.snap index c0e353710..a071b65a8 100644 --- a/index-scheduler/src/snapshots/lib.rs/task_deletion_undeleteable/task_deletion_enqueued.snap +++ b/index-scheduler/src/snapshots/lib.rs/task_deletion_undeleteable/task_deletion_enqueued.snap @@ -26,6 +26,16 @@ doggo [2,] ### Index Mapper: [] ---------------------------------------------------------------------- +### Enqueued At: +[timestamp] [0,] +[timestamp] [1,] +[timestamp] [2,] +[timestamp] [3,] 
+---------------------------------------------------------------------- +### Started At: +---------------------------------------------------------------------- +### Finished At: +---------------------------------------------------------------------- ### File Store: ---------------------------------------------------------------------- diff --git a/index-scheduler/src/snapshots/lib.rs/task_deletion_undeleteable/task_deletion_processing.snap b/index-scheduler/src/snapshots/lib.rs/task_deletion_undeleteable/task_deletion_processing.snap index e4f0be8c4..03e598a42 100644 --- a/index-scheduler/src/snapshots/lib.rs/task_deletion_undeleteable/task_deletion_processing.snap +++ b/index-scheduler/src/snapshots/lib.rs/task_deletion_undeleteable/task_deletion_processing.snap @@ -26,6 +26,16 @@ doggo [2,] ### Index Mapper: [] ---------------------------------------------------------------------- +### Enqueued At: +[timestamp] [0,] +[timestamp] [1,] +[timestamp] [2,] +[timestamp] [3,] +---------------------------------------------------------------------- +### Started At: +---------------------------------------------------------------------- +### Finished At: +---------------------------------------------------------------------- ### File Store: ---------------------------------------------------------------------- diff --git a/index-scheduler/src/utils.rs b/index-scheduler/src/utils.rs index c0dad9e0d..efb9dc7b4 100644 --- a/index-scheduler/src/utils.rs +++ b/index-scheduler/src/utils.rs @@ -1,10 +1,15 @@ //! Utility functions on the DBs. Mainly getter and setters. 
-use meilisearch_types::heed::{types::DecodeIgnore, RoTxn, RwTxn}; -use meilisearch_types::milli::BEU32; -use roaring::{MultiOps, RoaringBitmap}; +use std::ops::Bound; -use crate::{Error, IndexScheduler, Result, Task, TaskId}; +use meilisearch_types::heed::types::OwnedType; +use meilisearch_types::heed::Database; +use meilisearch_types::heed::{types::DecodeIgnore, RoTxn, RwTxn}; +use meilisearch_types::milli::{CboRoaringBitmapCodec, BEU32}; +use roaring::{MultiOps, RoaringBitmap}; +use time::OffsetDateTime; + +use crate::{Error, IndexScheduler, Result, Task, TaskId, BEI128}; use meilisearch_types::tasks::{Kind, Status}; impl IndexScheduler { @@ -75,6 +80,26 @@ impl IndexScheduler { })?; } + if old_task.enqueued_at != task.enqueued_at { + unreachable!("Cannot update a task's enqueued_at time"); + } + if old_task.started_at != task.started_at { + if old_task.started_at.is_some() { + unreachable!("Cannot update a task's started_at time"); + } + if let Some(started_at) = task.started_at { + insert_task_datetime(wtxn, self.started_at, started_at, task.uid)?; + } + } + if old_task.finished_at != task.finished_at { + if old_task.finished_at.is_some() { + unreachable!("Cannot update a task's finished_at time"); + } + if let Some(finished_at) = task.finished_at { + insert_task_datetime(wtxn, self.finished_at, finished_at, task.uid)?; + } + } + self.all_tasks.put(wtxn, &BEU32::new(task.uid), task)?; Ok(()) } @@ -158,3 +183,73 @@ impl IndexScheduler { Ok(()) } } + +pub(crate) fn insert_task_datetime( + wtxn: &mut RwTxn, + database: Database, CboRoaringBitmapCodec>, + + time: OffsetDateTime, + task_id: TaskId, +) -> Result<()> { + let timestamp = BEI128::new(time.unix_timestamp_nanos()); + let mut task_ids = if let Some(existing) = database.get(&wtxn, ×tamp)? 
{ + existing + } else { + RoaringBitmap::new() + }; + task_ids.insert(task_id); + database.put(wtxn, &timestamp, &task_ids)?; + Ok(()) +} +pub(crate) fn remove_task_datetime( + wtxn: &mut RwTxn, + database: Database, CboRoaringBitmapCodec>, + + time: OffsetDateTime, + task_id: TaskId, +) -> Result<()> { + let timestamp = BEI128::new(time.unix_timestamp_nanos()); + if let Some(mut existing) = database.get(&wtxn, &timestamp)? { + existing.remove(task_id); + if existing.is_empty() { + database.delete(wtxn, &timestamp)?; + } else { + database.put(wtxn, &timestamp, &existing)?; + } + } + + Ok(()) +} +pub(crate) fn keep_tasks_within_datetimes( + rtxn: &RoTxn, + tasks: &mut RoaringBitmap, + database: Database, CboRoaringBitmapCodec>, + after: Option, + before: Option, +) -> Result<()> { + let (start, end) = match (&after, &before) { + (None, None) => return Ok(()), + (None, Some(before)) => (Bound::Unbounded, Bound::Excluded(*before)), + (Some(after), None) => (Bound::Excluded(*after), Bound::Unbounded), + (Some(after), Some(before)) => (Bound::Excluded(*after), Bound::Excluded(*before)), + }; + let mut collected_task_ids = RoaringBitmap::new(); + let start = map_bound(start, |b| BEI128::new(b.unix_timestamp_nanos())); + let end = map_bound(end, |b| BEI128::new(b.unix_timestamp_nanos())); + let iter = database.range(&rtxn, &(start, end))?; + for r in iter { + let (_timestamp, task_ids) = r?; + collected_task_ids |= task_ids; + } + *tasks &= collected_task_ids; + Ok(()) +} + +// TODO: remove when Bound::map ( https://github.com/rust-lang/rust/issues/86026 ) is available on stable +fn map_bound(bound: Bound, map: impl FnOnce(T) -> U) -> Bound { + match bound { + Bound::Included(x) => Bound::Included(map(x)), + Bound::Excluded(x) => Bound::Excluded(map(x)), + Bound::Unbounded => Bound::Unbounded, + } +} diff --git a/meili-snap/src/lib.rs b/meili-snap/src/lib.rs index 8477abb24..8991d1640 100644 --- a/meili-snap/src/lib.rs +++ 
b/meili-snap/src/lib.rs @@ -4,6 +4,8 @@ use std::path::PathBuf; use std::sync::Mutex; use std::{collections::HashMap, path::Path}; +pub use insta; + static SNAPSHOT_NAMES: Lazy>> = Lazy::new(|| Mutex::default()); /// Return the md5 hash of the given string @@ -81,8 +83,8 @@ macro_rules! snapshot_hash { settings.bind(|| { let snap = format!("{}", $value); let hash_snap = $crate::hash_snapshot(&snap); - insta::assert_snapshot!(hash_snap, @$inline); - insta::assert_snapshot!(format!("{}.full", snap_name), snap); + meili_snap::insta::assert_snapshot!(hash_snap, @$inline); + meili_snap::insta::assert_snapshot!(format!("{}.full", snap_name), snap); }); }; ($value:expr, name: $name:expr, @$inline:literal) => { @@ -91,8 +93,8 @@ macro_rules! snapshot_hash { settings.bind(|| { let snap = format!("{}", $value); let hash_snap = $crate::hash_snapshot(&snap); - insta::assert_snapshot!(hash_snap, @$inline); - insta::assert_snapshot!(format!("{}.full", snap_name), snap); + meili_snap::insta::assert_snapshot!(hash_snap, @$inline); + meili_snap::insta::assert_snapshot!(format!("{}.full", snap_name), snap); }); }; } @@ -132,7 +134,7 @@ macro_rules! snapshot { let (settings, snap_name) = $crate::default_snapshot_settings_for_test(Some(&snap_name)); settings.bind(|| { let snap = format!("{}", $value); - insta::assert_snapshot!(format!("{}", snap_name), snap); + meili_snap::insta::assert_snapshot!(format!("{}", snap_name), snap); }); }; ($value:expr, @$inline:literal) => { @@ -141,21 +143,21 @@ macro_rules! 
snapshot { let (settings, _) = $crate::default_snapshot_settings_for_test(Some("_dummy_argument")); settings.bind(|| { let snap = format!("{}", $value); - insta::assert_snapshot!(snap, @$inline); + meili_snap::insta::assert_snapshot!(snap, @$inline); }); }; ($value:expr) => { let (settings, snap_name) = $crate::default_snapshot_settings_for_test(None); settings.bind(|| { let snap = format!("{}", $value); - insta::assert_snapshot!(format!("{}", snap_name), snap); + meili_snap::insta::assert_snapshot!(format!("{}", snap_name), snap); }); }; } #[cfg(test)] mod tests { - + use crate as meili_snap; #[test] fn snap() { snapshot_hash!(10, @"d3d9446802a44259755d38e6d163e820"); @@ -180,6 +182,7 @@ mod tests { // Currently the name of this module is not part of the snapshot path // It does not bother me, but maybe it is worth changing later on. mod snap { + use crate as meili_snap; #[test] fn some_test() { snapshot_hash!(10, @"d3d9446802a44259755d38e6d163e820"); @@ -214,15 +217,15 @@ mod tests { macro_rules! 
json_string { ($value:expr, {$($k:expr => $v:expr),*$(,)?}) => { { - let (_, snap) = insta::_prepare_snapshot_for_redaction!($value, {$($k => $v),*}, Json, File); + let (_, snap) = meili_snap::insta::_prepare_snapshot_for_redaction!($value, {$($k => $v),*}, Json, File); snap } }; ($value:expr) => {{ - let value = insta::_macro_support::serialize_value( + let value = meili_snap::insta::_macro_support::serialize_value( &$value, - insta::_macro_support::SerializationFormat::Json, - insta::_macro_support::SnapshotLocation::File + meili_snap::insta::_macro_support::SerializationFormat::Json, + meili_snap::insta::_macro_support::SnapshotLocation::File ); value }}; diff --git a/meilisearch-http/Cargo.toml b/meilisearch-http/Cargo.toml index 54ee2b6a9..1068dd100 100644 --- a/meilisearch-http/Cargo.toml +++ b/meilisearch-http/Cargo.toml @@ -94,6 +94,7 @@ brotli = "3.3.4" manifest-dir-macros = "0.1.16" maplit = "1.0.2" urlencoding = "2.1.2" +meili-snap = {path = "../meili-snap"} yaup = "0.2.1" temp-env = "0.3.1" diff --git a/meilisearch-http/src/routes/tasks.rs b/meilisearch-http/src/routes/tasks.rs index 2803ac056..84eae8a1a 100644 --- a/meilisearch-http/src/routes/tasks.rs +++ b/meilisearch-http/src/routes/tasks.rs @@ -180,26 +180,79 @@ impl From
for DetailsView { } } +#[derive(Serialize, Deserialize, Debug)] +#[serde(rename_all = "camelCase", deny_unknown_fields)] +pub struct TaskDateQuery { + #[serde( + default, + skip_serializing_if = "Option::is_none", + serialize_with = "time::serde::rfc3339::option::serialize", + deserialize_with = "rfc3339_date_or_datetime::deserialize" + )] + after_enqueued_at: Option, + #[serde( + default, + skip_serializing_if = "Option::is_none", + serialize_with = "time::serde::rfc3339::option::serialize", + deserialize_with = "rfc3339_date_or_datetime::deserialize" + )] + before_enqueued_at: Option, + #[serde( + default, + skip_serializing_if = "Option::is_none", + serialize_with = "time::serde::rfc3339::option::serialize", + deserialize_with = "rfc3339_date_or_datetime::deserialize" + )] + after_started_at: Option, + #[serde( + default, + skip_serializing_if = "Option::is_none", + serialize_with = "time::serde::rfc3339::option::serialize", + deserialize_with = "rfc3339_date_or_datetime::deserialize" + )] + before_started_at: Option, + + #[serde( + default, + skip_serializing_if = "Option::is_none", + serialize_with = "time::serde::rfc3339::option::serialize", + deserialize_with = "rfc3339_date_or_datetime::deserialize" + )] + after_finished_at: Option, + #[serde( + default, + skip_serializing_if = "Option::is_none", + serialize_with = "time::serde::rfc3339::option::serialize", + deserialize_with = "rfc3339_date_or_datetime::deserialize" + )] + before_finished_at: Option, +} + #[derive(Deserialize, Debug)] #[serde(rename_all = "camelCase", deny_unknown_fields)] pub struct TasksFilterQuery { #[serde(rename = "type")] - type_: Option>>, + kind: Option>>, + uid: Option>, status: Option>>, - index_uid: Option>>, + index_uid: Option>>, #[serde(default = "DEFAULT_LIMIT")] limit: u32, from: Option, + #[serde(flatten)] + dates: TaskDateQuery, } #[derive(Deserialize, Debug)] #[serde(rename_all = "camelCase", deny_unknown_fields)] pub struct TaskDeletionQuery { #[serde(rename = "type")] 
- type_: Option>, + kind: Option>, uid: Option>, status: Option>, index_uid: Option>, + #[serde(flatten)] + dates: TaskDateQuery, } #[derive(Deserialize, Debug)] @@ -210,6 +263,8 @@ pub struct TaskCancelationQuery { uid: Option>, status: Option>, index_uid: Option>, + #[serde(flatten)] + dates: TaskDateQuery, } async fn cancel_tasks( @@ -222,6 +277,15 @@ async fn cancel_tasks( uid, status, index_uid, + dates: + TaskDateQuery { + after_enqueued_at, + before_enqueued_at, + after_started_at, + before_started_at, + after_finished_at, + before_finished_at, + }, } = params.into_inner(); let kind: Option> = type_.map(|x| x.into_iter().collect()); @@ -237,6 +301,12 @@ async fn cancel_tasks( kind, index_uid, uid, + before_enqueued_at, + after_enqueued_at, + before_started_at, + after_started_at, + before_finished_at, + after_finished_at, }; if query.is_empty() { @@ -262,10 +332,19 @@ async fn delete_tasks( params: web::Query, ) -> Result { let TaskDeletionQuery { - type_, + kind: type_, uid, status, index_uid, + dates: + TaskDateQuery { + after_enqueued_at, + before_enqueued_at, + after_started_at, + before_started_at, + after_finished_at, + before_finished_at, + }, } = params.into_inner(); let kind: Option> = type_.map(|x| x.into_iter().collect()); @@ -281,6 +360,12 @@ async fn delete_tasks( kind, index_uid, uid, + after_enqueued_at, + before_enqueued_at, + after_started_at, + before_started_at, + after_finished_at, + before_finished_at, }; if query.is_empty() { @@ -307,18 +392,27 @@ async fn get_tasks( analytics: web::Data, ) -> Result { let TasksFilterQuery { - type_, + kind, + uid, status, index_uid, limit, from, + dates: + TaskDateQuery { + after_enqueued_at, + before_enqueued_at, + after_started_at, + before_started_at, + after_finished_at, + before_finished_at, + }, } = params.into_inner(); - let search_rules = &index_scheduler.filters().search_rules; - // We first transform a potential indexUid=* into a "not specified indexUid filter" // for every one of the 
filters: type, status, and indexUid. - let type_: Option> = type_.and_then(fold_star_or); + let type_: Option> = kind.and_then(fold_star_or); + let uid: Option> = uid.map(|x| x.into_iter().collect()); let status: Option> = status.and_then(fold_star_or); let index_uid: Option> = index_uid.and_then(fold_star_or); @@ -332,47 +426,27 @@ async fn get_tasks( Some(&req), ); - // TODO: Lo: use `filter_out_inaccessible_indexes_from_query` here - let mut filters = index_scheduler::Query::default(); - - // Then we filter on potential indexes and make sure that the search filter - // restrictions are also applied. - match index_uid { - Some(indexes) => { - for name in indexes { - if search_rules.is_index_authorized(&name) { - filters = filters.with_index(name.to_string()); - } - } - } - None => { - if !search_rules.is_index_authorized("*") { - for (index, _policy) in search_rules.clone() { - filters = filters.with_index(index.to_string()); - } - } - } - }; - - if let Some(kinds) = type_ { - for kind in kinds { - filters = filters.with_kind(kind); - } - } - - if let Some(statuses) = status { - for status in statuses { - filters = filters.with_status(status); - } - } - - filters.from = from; // We +1 just to know if there is more after this "page" or not. let limit = limit.saturating_add(1); - filters.limit = Some(limit); + + let query = index_scheduler::Query { + limit: Some(limit), + from, + status, + kind: type_, + index_uid, + uid, + before_enqueued_at, + after_enqueued_at, + before_started_at, + after_started_at, + before_finished_at, + after_finished_at, + }; + let query = filter_out_inaccessible_indexes_from_query(&index_scheduler, &query); let mut tasks_results: Vec = index_scheduler - .get_tasks(filters)? + .get_tasks(query)? 
.into_iter() .map(|t| TaskView::from_task(&t)) .collect(); @@ -462,3 +536,126 @@ fn filter_out_inaccessible_indexes_from_query( query } + +/// Deserialize a datetime optional string using rfc3339, assuming midnight and UTC+0 if not specified +pub mod rfc3339_date_or_datetime { + #[allow(clippy::wildcard_imports)] + use super::*; + use serde::Deserializer; + use time::format_description::well_known::iso8601::{Config, EncodedConfig}; + use time::format_description::well_known::{Iso8601, Rfc3339}; + use time::{Date, PrimitiveDateTime, Time}; + const SERDE_CONFIG: EncodedConfig = Config::DEFAULT.set_year_is_six_digits(true).encode(); + + /// Deserialize an [`Option`] from its ISO 8601 representation. + pub fn deserialize<'a, D: Deserializer<'a>>( + deserializer: D, + ) -> Result, D::Error> { + deserializer.deserialize_option(Visitor) + } + struct Visitor; + + #[derive(Debug)] + struct DeserializeError; + + impl<'a> serde::de::Visitor<'a> for Visitor { + type Value = Option; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + formatter.write_str("an rfc3339- or iso8601-formatted datetime") + } + fn visit_str(self, value: &str) -> Result, E> { + let datetime = OffsetDateTime::parse(value, &Rfc3339) + .or_else(|_e| OffsetDateTime::parse(value, &Iso8601::)) + .or_else(|_e| { + PrimitiveDateTime::parse(value, &Iso8601::) + .map(|x| x.assume_utc()) + }) + .or_else(|_e| { + Date::parse(value, &Iso8601::) + .map(|date| date.with_time(Time::MIDNIGHT).assume_utc()) + }) + .map_err(|_e| { + serde::de::Error::custom( + "could not parse an rfc3339- or iso8601-formatted date", + ) + })?; + + Ok(Some(datetime)) + } + fn visit_some>( + self, + deserializer: D, + ) -> Result, D::Error> { + deserializer.deserialize_str(Visitor) + } + + fn visit_none(self) -> Result, E> { + Ok(None) + } + + fn visit_unit(self) -> Result { + Ok(None) + } + } +} + +#[cfg(test)] +mod tests { + use crate::routes::tasks::TaskDeletionQuery; + use meili_snap::snapshot; + + 
#[test] + fn deserialize_task_deletion_query_datetime() { + { + let json = r#" { "afterEnqueuedAt": "2021" } "#; + let err = serde_json::from_str::(json).unwrap_err(); + snapshot!(format!("{err}"), @"could not parse an rfc3339- or iso8601-formatted date at line 1 column 30"); + } + { + let json = r#" { "afterEnqueuedAt": "2021-12" } "#; + let err = serde_json::from_str::(json).unwrap_err(); + snapshot!(format!("{err}"), @"could not parse an rfc3339- or iso8601-formatted date at line 1 column 33"); + } + { + let json = r#" { "afterEnqueuedAt": "2021-12-03" } "#; + let query = serde_json::from_str::(json).unwrap(); + snapshot!(format!("{:?}", query.dates.after_enqueued_at.unwrap()), @"2021-12-03 0:00:00.0 +00:00:00"); + } + { + let json = r#" { "afterEnqueuedAt": "2021-12-03T23" } "#; + let err = serde_json::from_str::(json).unwrap_err(); + snapshot!(format!("{err}"), @"could not parse an rfc3339- or iso8601-formatted date at line 1 column 39"); + } + { + let json = r#" { "afterEnqueuedAt": "2021-12-03T23:45" } "#; + let query = serde_json::from_str::(json).unwrap(); + snapshot!(format!("{:?}", query.dates.after_enqueued_at.unwrap()), @"2021-12-03 23:45:00.0 +00:00:00"); + } + { + let json = r#" { "afterEnqueuedAt": "2021-12-03T23:45:23" } "#; + let query = serde_json::from_str::(json).unwrap(); + snapshot!(format!("{:?}", query.dates.after_enqueued_at.unwrap()), @"2021-12-03 23:45:23.0 +00:00:00"); + } + { + let json = r#" { "afterEnqueuedAt": "2021-12-03T23:45:23 +01:00" } "#; + let err = serde_json::from_str::(json).unwrap_err(); + snapshot!(format!("{err}"), @"could not parse an rfc3339- or iso8601-formatted date at line 1 column 52"); + } + { + let json = r#" { "afterEnqueuedAt": "2021-12-03T23:45:23+01:00" } "#; + let query = serde_json::from_str::(json).unwrap(); + snapshot!(format!("{:?}", query.dates.after_enqueued_at.unwrap()), @"2021-12-03 23:45:23.0 +01:00:00"); + } + { + let json = r#" { "afterEnqueuedAt": "1997-11-12T09:55:06.000000000-06:00" } "#; + 
let query = serde_json::from_str::(json).unwrap(); + snapshot!(format!("{:?}", query.dates.after_enqueued_at.unwrap()), @"1997-11-12 9:55:06.0 -06:00:00"); + } + { + let json = r#" { "afterEnqueuedAt": "1997-11-12T09:55:06.000000000Z" } "#; + let query = serde_json::from_str::(json).unwrap(); + snapshot!(format!("{:?}", query.dates.after_enqueued_at.unwrap()), @"1997-11-12 9:55:06.0 +00:00:00"); + } + } +}