From 9d5cc88cd5d99ec937478e7b0182bee7a8f398ef Mon Sep 17 00:00:00 2001
From: Kerollmops
Date: Tue, 31 May 2022 11:56:51 +0200
Subject: [PATCH 1/8] Implement the seek-based tasks list pagination

---
 meilisearch-http/src/routes/tasks.rs | 33 +++++++++++++++++++++++-----
 meilisearch-http/src/task.rs         | 12 ++++------
 2 files changed, 32 insertions(+), 13 deletions(-)

diff --git a/meilisearch-http/src/routes/tasks.rs b/meilisearch-http/src/routes/tasks.rs
index ca4824517..821142399 100644
--- a/meilisearch-http/src/routes/tasks.rs
+++ b/meilisearch-http/src/routes/tasks.rs
@@ -26,6 +26,8 @@ pub struct TaskFilterQuery {
     type_: Option<CS<StarOr<TaskType>>>,
     status: Option<CS<StarOr<TaskStatus>>>,
     index_uid: Option<CS<StarOr<IndexUid>>>,
+    limit: Option<usize>, // TODO must not return an error when deser fails
+    after: Option<TaskId>, // TODO must not return an error when deser fails
 }
 
 #[rustfmt::skip]
@@ -68,11 +70,13 @@ async fn get_tasks(
         type_,
         status,
         index_uid,
+        limit,
+        after,
     } = params.into_inner();
 
     let search_rules = &meilisearch.filters().search_rules;
 
-    // We first tranform a potential indexUid=* into a "not specified indexUid filter"
+    // We first transform a potential indexUid=* into a "not specified indexUid filter"
     // for every one of the filters: type, status, and indexUid.
     let type_ = type_.map(CS::into_inner).and_then(fold_star_or);
     let status = status.map(CS::into_inner).and_then(fold_star_or);
@@ -128,13 +132,32 @@ async fn get_tasks(
         indexes_filters
     };
 
-    let tasks: TaskListView = meilisearch
-        .list_tasks(filters, None, None)
+    // We +1 just to know if there is more after this "page" or not.
+    let limit = limit.unwrap_or(DEFAULT_LIMIT).saturating_add(1);
+    // We -1 here because we need an offset and we must exclude the `after` one.
+    let offset = after.map(|n| n.saturating_sub(1));
+
+    let mut tasks_results = meilisearch
+        .list_tasks(filters, Some(limit), offset)
         .await?
         .into_iter()
         .map(TaskView::from)
-        .collect::<Vec<_>>()
-        .into();
+        .collect::<Vec<_>>();
+
+    // If we were able to fetch the number +1 tasks we asked
+    // it means that there is more to come.
+ let after = if tasks_results.len() == limit { + tasks_results.pop(); + tasks_results.last().map(|t| t.uid) + } else { + None + }; + + let tasks = TaskListView { + results: tasks_results, + limit: limit.saturating_sub(1), + after, + }; Ok(HttpResponse::Ok().json(tasks)) } diff --git a/meilisearch-http/src/task.rs b/meilisearch-http/src/task.rs index 56eeabfc8..8eec71a4e 100644 --- a/meilisearch-http/src/task.rs +++ b/meilisearch-http/src/task.rs @@ -180,7 +180,7 @@ fn serialize_duration( #[derive(Debug, Serialize)] #[serde(rename_all = "camelCase")] pub struct TaskView { - uid: TaskId, + pub uid: TaskId, index_uid: Option, status: TaskStatus, #[serde(rename = "type")] @@ -369,13 +369,9 @@ impl From for TaskView { #[derive(Debug, Serialize)] pub struct TaskListView { - results: Vec, -} - -impl From> for TaskListView { - fn from(results: Vec) -> Self { - Self { results } - } + pub results: Vec, + pub limit: usize, + pub after: Option, } #[derive(Debug, Serialize)] From 004c8b6be36780954ad2e2cc541e852e4a8c9e13 Mon Sep 17 00:00:00 2001 From: Kerollmops Date: Tue, 31 May 2022 16:58:08 +0200 Subject: [PATCH 2/8] Add the new limit and after fields in the dump tests --- meilisearch-http/tests/dumps/mod.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/meilisearch-http/tests/dumps/mod.rs b/meilisearch-http/tests/dumps/mod.rs index 426e4f941..2f8938d28 100644 --- a/meilisearch-http/tests/dumps/mod.rs +++ b/meilisearch-http/tests/dumps/mod.rs @@ -68,7 +68,7 @@ async fn import_dump_v2_movie_raw() { assert_eq!(code, 200); assert_eq!( tasks, - json!({ "results": [{"uid": 0, "indexUid": "indexUID", "status": "succeeded", "type": "documentAdditionOrUpdate", "details": { "receivedDocuments": 0, "indexedDocuments": 31944 }, "duration": "PT41.751156S", "enqueuedAt": "2021-09-08T08:30:30.550282Z", "startedAt": "2021-09-08T08:30:30.553012Z", "finishedAt": "2021-09-08T08:31:12.304168Z"}]}) + json!({ "results": [{"uid": 0, "indexUid": "indexUID", "status": "succeeded", "type": "documentAdditionOrUpdate", "details": { "receivedDocuments": 0, "indexedDocuments": 31944 }, "duration": "PT41.751156S", "enqueuedAt": "2021-09-08T08:30:30.550282Z", "startedAt": "2021-09-08T08:30:30.553012Z", "finishedAt": "2021-09-08T08:31:12.304168Z" }], "limit": 20, "after": null }) ); // finally we're just going to check that we can still get a few documents by id @@ -132,7 +132,7 @@ async fn import_dump_v2_movie_with_settings() { assert_eq!(code, 200); assert_eq!( tasks, - json!({ "results": [{ "uid": 1, "indexUid": "indexUID", "status": "succeeded", "type": "settingsUpdate", "details": { "displayedAttributes": ["title", "genres", "overview", "poster", "release_date"], "searchableAttributes": ["title", "overview"], "filterableAttributes": ["genres"], "stopWords": ["of", "the"] }, "duration": "PT37.488777S", "enqueuedAt": "2021-09-08T08:24:02.323444Z", "startedAt": "2021-09-08T08:24:02.324145Z", "finishedAt": "2021-09-08T08:24:39.812922Z" }, { "uid": 0, "indexUid": "indexUID", "status": "succeeded", "type": "documentAdditionOrUpdate", "details": { "receivedDocuments": 0, "indexedDocuments": 31944 }, "duration": "PT39.941318S", "enqueuedAt": "2021-09-08T08:21:14.742672Z", "startedAt": "2021-09-08T08:21:14.750166Z", "finishedAt": "2021-09-08T08:21:54.691484Z" }]}) + json!({ "results": [{ "uid": 1, "indexUid": "indexUID", "status": "succeeded", "type": "settingsUpdate", "details": { "displayedAttributes": ["title", "genres", "overview", "poster", "release_date"], "searchableAttributes": ["title", 
"overview"], "filterableAttributes": ["genres"], "stopWords": ["of", "the"] }, "duration": "PT37.488777S", "enqueuedAt": "2021-09-08T08:24:02.323444Z", "startedAt": "2021-09-08T08:24:02.324145Z", "finishedAt": "2021-09-08T08:24:39.812922Z" }, { "uid": 0, "indexUid": "indexUID", "status": "succeeded", "type": "documentAdditionOrUpdate", "details": { "receivedDocuments": 0, "indexedDocuments": 31944 }, "duration": "PT39.941318S", "enqueuedAt": "2021-09-08T08:21:14.742672Z", "startedAt": "2021-09-08T08:21:14.750166Z", "finishedAt": "2021-09-08T08:21:54.691484Z" }], "limit": 20, "after": null }) ); // finally we're just going to check that we can still get a few documents by id @@ -264,7 +264,7 @@ async fn import_dump_v3_movie_raw() { assert_eq!(code, 200); assert_eq!( tasks, - json!({ "results": [{"uid": 0, "indexUid": "indexUID", "status": "succeeded", "type": "documentAdditionOrUpdate", "details": { "receivedDocuments": 0, "indexedDocuments": 31944 }, "duration": "PT41.751156S", "enqueuedAt": "2021-09-08T08:30:30.550282Z", "startedAt": "2021-09-08T08:30:30.553012Z", "finishedAt": "2021-09-08T08:31:12.304168Z"}]}) + json!({ "results": [{"uid": 0, "indexUid": "indexUID", "status": "succeeded", "type": "documentAdditionOrUpdate", "details": { "receivedDocuments": 0, "indexedDocuments": 31944 }, "duration": "PT41.751156S", "enqueuedAt": "2021-09-08T08:30:30.550282Z", "startedAt": "2021-09-08T08:30:30.553012Z", "finishedAt": "2021-09-08T08:31:12.304168Z" }], "limit": 20, "after": null }) ); // finally we're just going to check that we can still get a few documents by id @@ -328,7 +328,7 @@ async fn import_dump_v3_movie_with_settings() { assert_eq!(code, 200); assert_eq!( tasks, - json!({ "results": [{ "uid": 1, "indexUid": "indexUID", "status": "succeeded", "type": "settingsUpdate", "details": { "displayedAttributes": ["title", "genres", "overview", "poster", "release_date"], "searchableAttributes": ["title", "overview"], "filterableAttributes": ["genres"], "stopWords": ["of", "the"] }, "duration": "PT37.488777S", "enqueuedAt": "2021-09-08T08:24:02.323444Z", "startedAt": "2021-09-08T08:24:02.324145Z", "finishedAt": "2021-09-08T08:24:39.812922Z" }, { "uid": 0, "indexUid": "indexUID", "status": "succeeded", "type": "documentAdditionOrUpdate", "details": { "receivedDocuments": 0, "indexedDocuments": 31944 }, "duration": "PT39.941318S", "enqueuedAt": "2021-09-08T08:21:14.742672Z", "startedAt": "2021-09-08T08:21:14.750166Z", "finishedAt": "2021-09-08T08:21:54.691484Z" }]}) + json!({ "results": [{ "uid": 1, "indexUid": "indexUID", "status": "succeeded", "type": "settingsUpdate", "details": { "displayedAttributes": ["title", "genres", "overview", "poster", "release_date"], "searchableAttributes": ["title", "overview"], "filterableAttributes": ["genres"], "stopWords": ["of", "the"] }, "duration": "PT37.488777S", "enqueuedAt": "2021-09-08T08:24:02.323444Z", "startedAt": "2021-09-08T08:24:02.324145Z", "finishedAt": "2021-09-08T08:24:39.812922Z" }, { "uid": 0, "indexUid": "indexUID", "status": "succeeded", "type": "documentAdditionOrUpdate", "details": { "receivedDocuments": 0, "indexedDocuments": 31944 }, "duration": "PT39.941318S", "enqueuedAt": "2021-09-08T08:21:14.742672Z", "startedAt": "2021-09-08T08:21:14.750166Z", "finishedAt": "2021-09-08T08:21:54.691484Z" }], "limit": 20, "after": null }) ); // finally we're just going to check that we can["results"] still get a few documents by id @@ -460,7 +460,7 @@ async fn import_dump_v4_movie_raw() { assert_eq!(code, 200); assert_eq!( tasks, - json!({ 
"results": [{"uid": 0, "indexUid": "indexUID", "status": "succeeded", "type": "documentAdditionOrUpdate", "details": { "receivedDocuments": 0, "indexedDocuments": 31944 }, "duration": "PT41.751156S", "enqueuedAt": "2021-09-08T08:30:30.550282Z", "startedAt": "2021-09-08T08:30:30.553012Z", "finishedAt": "2021-09-08T08:31:12.304168Z"}]}) + json!({ "results": [{"uid": 0, "indexUid": "indexUID", "status": "succeeded", "type": "documentAdditionOrUpdate", "details": { "receivedDocuments": 0, "indexedDocuments": 31944 }, "duration": "PT41.751156S", "enqueuedAt": "2021-09-08T08:30:30.550282Z", "startedAt": "2021-09-08T08:30:30.553012Z", "finishedAt": "2021-09-08T08:31:12.304168Z" }], "limit" : 20, "after": null }) ); // finally we're just going to check that we can still get a few documents by id @@ -524,7 +524,7 @@ async fn import_dump_v4_movie_with_settings() { assert_eq!(code, 200); assert_eq!( tasks, - json!({ "results": [{ "uid": 1, "indexUid": "indexUID", "status": "succeeded", "type": "settingsUpdate", "details": { "displayedAttributes": ["title", "genres", "overview", "poster", "release_date"], "searchableAttributes": ["title", "overview"], "filterableAttributes": ["genres"], "stopWords": ["of", "the"] }, "duration": "PT37.488777S", "enqueuedAt": "2021-09-08T08:24:02.323444Z", "startedAt": "2021-09-08T08:24:02.324145Z", "finishedAt": "2021-09-08T08:24:39.812922Z" }, { "uid": 0, "indexUid": "indexUID", "status": "succeeded", "type": "documentAdditionOrUpdate", "details": { "receivedDocuments": 0, "indexedDocuments": 31944 }, "duration": "PT39.941318S", "enqueuedAt": "2021-09-08T08:21:14.742672Z", "startedAt": "2021-09-08T08:21:14.750166Z", "finishedAt": "2021-09-08T08:21:54.691484Z" }]}) + json!({ "results": [{ "uid": 1, "indexUid": "indexUID", "status": "succeeded", "type": "settingsUpdate", "details": { "displayedAttributes": ["title", "genres", "overview", "poster", "release_date"], "searchableAttributes": ["title", "overview"], "filterableAttributes": ["genres"], "stopWords": ["of", "the"] }, "duration": "PT37.488777S", "enqueuedAt": "2021-09-08T08:24:02.323444Z", "startedAt": "2021-09-08T08:24:02.324145Z", "finishedAt": "2021-09-08T08:24:39.812922Z" }, { "uid": 0, "indexUid": "indexUID", "status": "succeeded", "type": "documentAdditionOrUpdate", "details": { "receivedDocuments": 0, "indexedDocuments": 31944 }, "duration": "PT39.941318S", "enqueuedAt": "2021-09-08T08:21:14.742672Z", "startedAt": "2021-09-08T08:21:14.750166Z", "finishedAt": "2021-09-08T08:21:54.691484Z" }], "limit": 20, "after": null }) ); // finally we're just going to check that we can still get a few documents by id From 461b91fd13a5575d574fb3612a74ba948d993cb3 Mon Sep 17 00:00:00 2001 From: Kerollmops Date: Wed, 1 Jun 2022 10:20:33 +0200 Subject: [PATCH 3/8] Introduce the fetch_unfinished_tasks function to fetch tasks --- meilisearch-lib/src/tasks/scheduler.rs | 8 +--- meilisearch-lib/src/tasks/task_store/mod.rs | 18 ++++++++ meilisearch-lib/src/tasks/task_store/store.rs | 43 ++++++++++++++++--- 3 files changed, 56 insertions(+), 13 deletions(-) diff --git a/meilisearch-lib/src/tasks/scheduler.rs b/meilisearch-lib/src/tasks/scheduler.rs index 19265a911..dddb6dff9 100644 --- a/meilisearch-lib/src/tasks/scheduler.rs +++ b/meilisearch-lib/src/tasks/scheduler.rs @@ -342,14 +342,8 @@ impl Scheduler { } async fn fetch_pending_tasks(&mut self) -> Result<()> { - // We must NEVER re-enqueue an already processed task! It's content uuid would point to an unexisting file. 
- // - // TODO(marin): This may create some latency when the first batch lazy loads the pending updates. - let mut filter = TaskFilter::default(); - filter.filter_fn(|task| !task.is_finished()); - self.store - .list_tasks(Some(self.next_fetched_task_id), Some(filter), None) + .fetch_unfinished_tasks(Some(self.next_fetched_task_id)) .await? .into_iter() // The tasks arrive in reverse order, and we need to insert them in order. diff --git a/meilisearch-lib/src/tasks/task_store/mod.rs b/meilisearch-lib/src/tasks/task_store/mod.rs index 3645717e6..835f378e5 100644 --- a/meilisearch-lib/src/tasks/task_store/mod.rs +++ b/meilisearch-lib/src/tasks/task_store/mod.rs @@ -186,6 +186,17 @@ impl TaskStore { Ok(tasks) } + pub async fn fetch_unfinished_tasks(&self, offset: Option) -> Result> { + let store = self.store.clone(); + + tokio::task::spawn_blocking(move || { + let txn = store.rtxn()?; + let tasks = store.fetch_unfinished_tasks(&txn, offset)?; + Ok(tasks) + }) + .await? + } + pub async fn list_tasks( &self, offset: Option, @@ -325,6 +336,13 @@ pub mod test { } } + pub async fn fetch_unfinished_tasks(&self, from: Option) -> Result> { + match self { + Self::Real(s) => s.fetch_unfinished_tasks(from).await, + Self::Mock(m) => unsafe { m.get("fetch_unfinished_tasks").call(from) }, + } + } + pub async fn list_tasks( &self, from: Option, diff --git a/meilisearch-lib/src/tasks/task_store/store.rs b/meilisearch-lib/src/tasks/task_store/store.rs index 75ece0ae8..a109935ab 100644 --- a/meilisearch-lib/src/tasks/task_store/store.rs +++ b/meilisearch-lib/src/tasks/task_store/store.rs @@ -121,9 +121,27 @@ impl Store { Ok(task) } - pub fn list_tasks<'a>( + /// Returns the unfinished tasks starting from the given taskId in ascending order. + pub fn fetch_unfinished_tasks(&self, txn: &RoTxn, from: Option) -> Result> { + // We must NEVER re-enqueue an already processed task! It's content uuid would point to an unexisting file. + // + // TODO(marin): This may create some latency when the first batch lazy loads the pending updates. + let from = from.unwrap_or_default(); + + let result: StdResult, milli::heed::Error> = self + .tasks + .range(txn, &(BEU64::new(from)..))? + .map(|r| r.map(|(_, t)| t)) + .filter(|result| result.as_ref().map_or(true, |t| !t.is_finished())) + .collect(); + + result.map_err(Into::into) + } + + /// Returns all the tasks starting from the given taskId and going in descending order. + pub fn list_tasks( &self, - txn: &'a RoTxn, + txn: &RoTxn, from: Option, filter: Option, limit: Option, @@ -132,6 +150,7 @@ impl Store { let range = from..limit .map(|limit| (limit as u64).saturating_add(from)) .unwrap_or(u64::MAX); + let iter: Box>> = match filter { Some( ref filter @ TaskFilter { @@ -152,7 +171,7 @@ impl Store { ), }; - let apply_fitler = |task: &StdResult<_, milli::heed::Error>| match task { + let apply_filter = |task: &StdResult<_, milli::heed::Error>| match task { Ok(ref t) => filter .as_ref() .and_then(|filter| filter.filter_fn.as_ref()) @@ -160,9 +179,10 @@ impl Store { .unwrap_or(true), Err(_) => true, }; + // Collect 'limit' task if it exists or all of them. 
         let tasks = iter
-            .filter(apply_fitler)
+            .filter(apply_filter)
             .take(limit.unwrap_or(usize::MAX))
             .try_fold::<_, _, StdResult<_, milli::heed::Error>>(Vec::new(), |mut v, task| {
                 v.push(task?);
                 Ok(v)
             })?;
@@ -305,9 +325,20 @@ pub mod test {
         }
     }
 
-    pub fn list_tasks<'a>(
+    pub fn fetch_unfinished_tasks(
         &self,
-        txn: &'a RoTxn,
+        txn: &RoTxn,
+        from: Option<TaskId>,
+    ) -> Result<Vec<Task>> {
+        match self {
+            MockStore::Real(index) => index.fetch_unfinished_tasks(txn, from),
+            MockStore::Fake(_) => todo!(),
+        }
+    }
+
+    pub fn list_tasks(
+        &self,
+        txn: &RoTxn,
         from: Option<TaskId>,
         filter: Option<TaskFilter>,
         limit: Option<usize>,

From c11d21879ac843c3ed705e778ac8bfa52cc61100 Mon Sep 17 00:00:00 2001
From: Kerollmops
Date: Wed, 1 Jun 2022 12:04:01 +0200
Subject: [PATCH 4/8] Introduce tasks limit and after to the tasks route

---
 Cargo.lock                                    |   1 +
 meilisearch-http/src/routes/tasks.rs          |  25 +-
 meilisearch-lib/Cargo.toml                    |   1 +
 meilisearch-lib/src/tasks/batch.rs            |   2 +-
 meilisearch-lib/src/tasks/task.rs             |   2 +-
 meilisearch-lib/src/tasks/task_store/mod.rs   |   6 +-
 meilisearch-lib/src/tasks/task_store/store.rs | 225 +++++-------------
 7 files changed, 95 insertions(+), 167 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 39eb78987..bbf557c2e 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2114,6 +2114,7 @@ dependencies = [
  "rayon",
  "regex",
  "reqwest",
+ "roaring",
  "rustls",
  "serde",
  "serde_json",
diff --git a/meilisearch-http/src/routes/tasks.rs b/meilisearch-http/src/routes/tasks.rs
index 821142399..a82ca835f 100644
--- a/meilisearch-http/src/routes/tasks.rs
+++ b/meilisearch-http/src/routes/tasks.rs
@@ -14,6 +14,8 @@ use crate::task::{TaskListView, TaskStatus, TaskType, TaskView};
 
 use super::{fold_star_or, StarOr};
 
+const DEFAULT_LIMIT: fn() -> usize = || 20;
+
 pub fn configure(cfg: &mut web::ServiceConfig) {
     cfg.service(web::resource("").route(web::get().to(SeqHandler(get_tasks))))
         .service(web::resource("/{task_id}").route(web::get().to(SeqHandler(get_task))));
@@ -26,8 +28,9 @@ pub struct TaskFilterQuery {
     type_: Option<CS<StarOr<TaskType>>>,
     status: Option<CS<StarOr<TaskStatus>>>,
     index_uid: Option<CS<StarOr<IndexUid>>>,
-    limit: Option<usize>, // TODO must not return an error when deser fails
-    after: Option<TaskId>, // TODO must not return an error when deser fails
+    #[serde(default = "DEFAULT_LIMIT")]
+    limit: usize,
+    after: Option<TaskId>,
 }
 
 #[rustfmt::skip]
@@ -132,10 +135,22 @@ async fn get_tasks(
         indexes_filters
     };
 
+    let offset = match after {
+        Some(0) => {
+            let tasks = TaskListView {
+                results: vec![],
+                limit,
+                after: None,
+            };
+            return Ok(HttpResponse::Ok().json(tasks));
+        }
+        // We -1 here because we need an offset and we must exclude the `after` one.
+        Some(n) => Some(n - 1),
+        None => None,
+    };
+
     // We +1 just to know if there is more after this "page" or not.
-    let limit = limit.unwrap_or(DEFAULT_LIMIT).saturating_add(1);
-    // We -1 here because we need an offset and we must exclude the `after` one.
- let offset = after.map(|n| n.saturating_sub(1)); + let limit = limit.saturating_add(1); let mut tasks_results = meilisearch .list_tasks(filters, Some(limit), offset) diff --git a/meilisearch-lib/Cargo.toml b/meilisearch-lib/Cargo.toml index 85ae49f64..020862cea 100644 --- a/meilisearch-lib/Cargo.toml +++ b/meilisearch-lib/Cargo.toml @@ -41,6 +41,7 @@ rand = "0.8.5" rayon = "1.5.1" regex = "1.5.5" reqwest = { version = "0.11.9", features = ["json", "rustls-tls"], default-features = false, optional = true } +roaring = "0.9.0" rustls = "0.20.4" serde = { version = "1.0.136", features = ["derive"] } serde_json = { version = "1.0.79", features = ["preserve_order"] } diff --git a/meilisearch-lib/src/tasks/batch.rs b/meilisearch-lib/src/tasks/batch.rs index d5116f750..7173ecd33 100644 --- a/meilisearch-lib/src/tasks/batch.rs +++ b/meilisearch-lib/src/tasks/batch.rs @@ -4,7 +4,7 @@ use crate::snapshot::SnapshotJob; use super::task::{Task, TaskEvent}; -pub type BatchId = u64; +pub type BatchId = u32; #[derive(Debug)] pub enum BatchContent { diff --git a/meilisearch-lib/src/tasks/task.rs b/meilisearch-lib/src/tasks/task.rs index 97eb11467..3b94cd991 100644 --- a/meilisearch-lib/src/tasks/task.rs +++ b/meilisearch-lib/src/tasks/task.rs @@ -10,7 +10,7 @@ use crate::{ index_resolver::IndexUid, }; -pub type TaskId = u64; +pub type TaskId = u32; #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] #[cfg_attr(test, derive(proptest_derive::Arbitrary))] diff --git a/meilisearch-lib/src/tasks/task_store/mod.rs b/meilisearch-lib/src/tasks/task_store/mod.rs index 835f378e5..6c7584683 100644 --- a/meilisearch-lib/src/tasks/task_store/mod.rs +++ b/meilisearch-lib/src/tasks/task_store/mod.rs @@ -41,6 +41,10 @@ impl TaskFilter { } } + fn filtered_indexes(&self) -> Option<&HashSet> { + self.indexes.as_ref() + } + /// Adds an index to the filter, so the filter must match this index. 
pub fn filter_index(&mut self, index: String) { self.indexes @@ -396,7 +400,7 @@ pub mod test { let mut runner = TestRunner::new(Config::default()); runner - .run(&(0..100u64).prop_map(gen_task), |task| { + .run(&(0..100u32).prop_map(gen_task), |task| { let mut txn = store.wtxn().unwrap(); let previous_id = store.next_task_id(&mut txn).unwrap(); diff --git a/meilisearch-lib/src/tasks/task_store/store.rs b/meilisearch-lib/src/tasks/task_store/store.rs index a109935ab..5b17da716 100644 --- a/meilisearch-lib/src/tasks/task_store/store.rs +++ b/meilisearch-lib/src/tasks/task_store/store.rs @@ -1,62 +1,30 @@ #[allow(clippy::upper_case_acronyms)] -type BEU64 = milli::heed::zerocopy::U64; -const UID_TASK_IDS: &str = "uid_task_id"; +type BEU32 = milli::heed::zerocopy::U32; + +const INDEX_UIDS_TASK_IDS: &str = "index-uids-task-ids"; const TASKS: &str = "tasks"; -use std::borrow::Cow; -use std::collections::BinaryHeap; -use std::convert::TryInto; -use std::mem::size_of; -use std::ops::Range; +use std::collections::HashSet; +use std::ops::Bound::{Excluded, Unbounded}; use std::result::Result as StdResult; use std::sync::Arc; -use milli::heed::types::{ByteSlice, OwnedType, SerdeJson, Unit}; -use milli::heed::{BytesDecode, BytesEncode, Database, Env, RoTxn, RwTxn}; +use milli::heed::types::{OwnedType, SerdeJson, Str}; +use milli::heed::{Database, Env, RoTxn, RwTxn}; +use milli::heed_codec::RoaringBitmapCodec; +use roaring::RoaringBitmap; use crate::tasks::task::{Task, TaskId}; use super::super::Result; - use super::TaskFilter; -enum IndexUidTaskIdCodec {} - -impl<'a> BytesEncode<'a> for IndexUidTaskIdCodec { - type EItem = (&'a str, TaskId); - - fn bytes_encode((s, id): &'a Self::EItem) -> Option> { - let size = s.len() + std::mem::size_of::() + 1; - if size > 512 { - return None; - } - let mut b = Vec::with_capacity(size); - b.extend_from_slice(s.as_bytes()); - // null terminate the string - b.push(0); - b.extend_from_slice(&id.to_be_bytes()); - Some(Cow::Owned(b)) - } -} - -impl<'a> BytesDecode<'a> for IndexUidTaskIdCodec { - type DItem = (&'a str, TaskId); - - fn bytes_decode(bytes: &'a [u8]) -> Option { - let len = bytes.len(); - let s_end = len.checked_sub(size_of::())?.checked_sub(1)?; - let str_bytes = &bytes[..s_end]; - let str = std::str::from_utf8(str_bytes).ok()?; - let id = TaskId::from_be_bytes(bytes[(len - size_of::())..].try_into().ok()?); - Some((str, id)) - } -} - pub struct Store { env: Arc, - uids_task_ids: Database, - tasks: Database, SerdeJson>, + /// Maps an index uid to the set of tasks ids associated to it. + index_uid_task_ids: Database, + tasks: Database, SerdeJson>, } impl Drop for Store { @@ -74,12 +42,12 @@ impl Store { /// You want to patch all un-finished tasks and put them in your pending /// queue with the `reset_and_return_unfinished_update` method. 
pub fn new(env: Arc) -> Result { - let uids_task_ids = env.create_database(Some(UID_TASK_IDS))?; + let index_uid_task_ids = env.create_database(Some(INDEX_UIDS_TASK_IDS))?; let tasks = env.create_database(Some(TASKS))?; Ok(Self { env, - uids_task_ids, + index_uid_task_ids, tasks, }) } @@ -107,17 +75,24 @@ impl Store { } pub fn put(&self, txn: &mut RwTxn, task: &Task) -> Result<()> { - self.tasks.put(txn, &BEU64::new(task.id), task)?; + self.tasks.put(txn, &BEU32::new(task.id), task)?; // only add the task to the indexes index if it has an index_uid - if let Some(ref index_uid) = task.index_uid { - self.uids_task_ids.put(txn, &(index_uid, task.id), &())?; + if let Some(index_uid) = &task.index_uid { + let mut tasks_set = self + .index_uid_task_ids + .get(txn, index_uid)? + .unwrap_or_default(); + + tasks_set.insert(task.id); + + self.index_uid_task_ids.put(txn, index_uid, &tasks_set)?; } Ok(()) } pub fn get(&self, txn: &RoTxn, id: TaskId) -> Result> { - let task = self.tasks.get(txn, &BEU64::new(id))?; + let task = self.tasks.get(txn, &BEU32::new(id))?; Ok(task) } @@ -130,7 +105,7 @@ impl Store { let result: StdResult, milli::heed::Error> = self .tasks - .range(txn, &(BEU64::new(from)..))? + .range(txn, &(BEU32::new(from)..))? .map(|r| r.map(|(_, t)| t)) .filter(|result| result.as_ref().map_or(true, |t| !t.is_finished())) .collect(); @@ -146,102 +121,58 @@ impl Store { filter: Option, limit: Option, ) -> Result> { - let from = from.unwrap_or_default(); - let range = from..limit - .map(|limit| (limit as u64).saturating_add(from)) - .unwrap_or(u64::MAX); - - let iter: Box>> = match filter { - Some( - ref filter @ TaskFilter { - indexes: Some(_), .. - }, - ) => { - let iter = self - .compute_candidates(txn, filter, range)? - .into_iter() - .filter_map(|id| self.tasks.get(txn, &BEU64::new(id)).transpose()); - - Box::new(iter) - } - _ => Box::new( - self.tasks - .rev_range(txn, &(BEU64::new(range.start)..BEU64::new(range.end)))? - .map(|r| r.map(|(_, t)| t)), - ), + let from = match from { + Some(from) => from, + None => self.tasks.last(txn)?.map_or(0, |(id, _)| id.get()), }; - let apply_filter = |task: &StdResult<_, milli::heed::Error>| match task { - Ok(ref t) => filter + let filter_fn = |task: &Task| { + filter .as_ref() - .and_then(|filter| filter.filter_fn.as_ref()) - .map(|f| f(t)) - .unwrap_or(true), - Err(_) => true, + .and_then(|f| f.filter_fn.as_ref()) + .map_or(true, |f| f(task)) }; - // Collect 'limit' task if it exists or all of them. - let tasks = iter - .filter(apply_filter) - .take(limit.unwrap_or(usize::MAX)) - .try_fold::<_, _, StdResult<_, milli::heed::Error>>(Vec::new(), |mut v, task| { - v.push(task?); - Ok(v) - })?; + let result: Result> = match filter.as_ref().and_then(|f| f.filtered_indexes()) { + Some(indexes) => self + .compute_candidates(txn, indexes, from)? + .filter(|result| result.as_ref().map_or(true, filter_fn)) + .take(limit.unwrap_or(usize::MAX)) + .collect(), + None => self + .tasks + .rev_range(txn, &(..=BEU32::new(from)))? 
+ .map(|r| r.map(|(_, t)| t).map_err(Into::into)) + .filter(|result| result.as_ref().map_or(true, filter_fn)) + .take(limit.unwrap_or(usize::MAX)) + .collect(), + }; - Ok(tasks) + result.map_err(Into::into) } - fn compute_candidates( - &self, - txn: &milli::heed::RoTxn, - filter: &TaskFilter, - range: Range, - ) -> Result> { - let mut candidates = BinaryHeap::new(); - if let Some(ref indexes) = filter.indexes { - for index in indexes { - // We need to prefix search the null terminated string to make sure that we only - // get exact matches for the index, and not other uids that would share the same - // prefix, i.e test and test1. - let mut index_uid = index.as_bytes().to_vec(); - index_uid.push(0); + fn compute_candidates<'a>( + &'a self, + txn: &'a RoTxn, + indexes: &HashSet, + from: TaskId, + ) -> Result> + 'a> { + let mut candidates = RoaringBitmap::new(); - self.uids_task_ids - .remap_key_type::() - .rev_prefix_iter(txn, &index_uid)? - .map(|entry| -> StdResult<_, milli::heed::Error> { - let (key, _) = entry?; - let (_, id) = IndexUidTaskIdCodec::bytes_decode(key) - .ok_or(milli::heed::Error::Decoding)?; - Ok(id) - }) - .skip_while(|entry| { - entry - .as_ref() - .ok() - // we skip all elements till we enter in the range - .map(|key| !range.contains(key)) - // if we encounter an error we returns true to collect it later - .unwrap_or(true) - }) - .take_while(|entry| { - entry - .as_ref() - .ok() - // as soon as we are out of the range we exit - .map(|key| range.contains(key)) - // if we encounter an error we returns true to collect it later - .unwrap_or(true) - }) - .try_for_each::<_, StdResult<(), milli::heed::Error>>(|id| { - candidates.push(id?); - Ok(()) - })?; + for index_uid in indexes { + if let Some(tasks_set) = self.index_uid_task_ids.get(txn, index_uid)? { + candidates |= tasks_set; } } - Ok(candidates) + candidates.remove_range((Excluded(from), Unbounded)); + + let iter = candidates + .into_iter() + .rev() + .filter_map(|id| self.get(txn, id).transpose()); + + Ok(iter) } } @@ -250,8 +181,6 @@ pub mod test { use itertools::Itertools; use milli::heed::EnvOpenOptions; use nelson::Mocker; - use proptest::collection::vec; - use proptest::prelude::*; use tempfile::TempDir; use crate::index_resolver::IndexUid; @@ -460,26 +389,4 @@ pub mod test { "test" ); } - - proptest! 
{ - #[test] - fn encode_decode_roundtrip(index_uid in any::(), task_id in 0..TaskId::MAX) { - let value = (index_uid.as_ref(), task_id); - let bytes = IndexUidTaskIdCodec::bytes_encode(&value).unwrap(); - let (index, id) = IndexUidTaskIdCodec::bytes_decode(bytes.as_ref()).unwrap(); - assert_eq!(&*index_uid, index); - assert_eq!(task_id, id); - } - - #[test] - fn encode_doesnt_crash(index_uid in "\\PC*", task_id in 0..TaskId::MAX) { - let value = (index_uid.as_ref(), task_id); - IndexUidTaskIdCodec::bytes_encode(&value); - } - - #[test] - fn decode_doesnt_crash(bytes in vec(any::(), 0..1000)) { - IndexUidTaskIdCodec::bytes_decode(&bytes); - } - } } From d80e8b64afcca8a7cba6f7ce86c6758064fabdd4 Mon Sep 17 00:00:00 2001 From: Kerollmops Date: Wed, 1 Jun 2022 15:30:39 +0200 Subject: [PATCH 5/8] Align the tasks route API to the new spec --- meilisearch-http/src/routes/tasks.rs | 30 +++++++++------------------- meilisearch-http/src/task.rs | 3 ++- 2 files changed, 11 insertions(+), 22 deletions(-) diff --git a/meilisearch-http/src/routes/tasks.rs b/meilisearch-http/src/routes/tasks.rs index a82ca835f..49554858d 100644 --- a/meilisearch-http/src/routes/tasks.rs +++ b/meilisearch-http/src/routes/tasks.rs @@ -30,7 +30,7 @@ pub struct TaskFilterQuery { index_uid: Option>>, #[serde(default = "DEFAULT_LIMIT")] limit: usize, - after: Option, + from: Option, } #[rustfmt::skip] @@ -74,7 +74,7 @@ async fn get_tasks( status, index_uid, limit, - after, + from, } = params.into_inner(); let search_rules = &meilisearch.filters().search_rules; @@ -135,25 +135,11 @@ async fn get_tasks( indexes_filters }; - let offset = match after { - Some(0) => { - let tasks = TaskListView { - results: vec![], - limit, - after: None, - }; - return Ok(HttpResponse::Ok().json(tasks)); - } - // We -1 here because we need an offset and we must exclude the `after` one. - Some(n) => Some(n - 1), - None => None, - }; - // We +1 just to know if there is more after this "page" or not. let limit = limit.saturating_add(1); let mut tasks_results = meilisearch - .list_tasks(filters, Some(limit), offset) + .list_tasks(filters, Some(limit), from) .await? .into_iter() .map(TaskView::from) @@ -161,17 +147,19 @@ async fn get_tasks( // If we were able to fetch the number +1 tasks we asked // it means that there is more to come. 
- let after = if tasks_results.len() == limit { - tasks_results.pop(); - tasks_results.last().map(|t| t.uid) + let next = if tasks_results.len() == limit { + tasks_results.pop().map(|t| t.uid) } else { None }; + let from = tasks_results.first().map(|t| t.uid); + let tasks = TaskListView { results: tasks_results, limit: limit.saturating_sub(1), - after, + from, + next, }; Ok(HttpResponse::Ok().json(tasks)) diff --git a/meilisearch-http/src/task.rs b/meilisearch-http/src/task.rs index 8eec71a4e..f8ba941d8 100644 --- a/meilisearch-http/src/task.rs +++ b/meilisearch-http/src/task.rs @@ -371,7 +371,8 @@ impl From for TaskView { pub struct TaskListView { pub results: Vec, pub limit: usize, - pub after: Option, + pub from: Option, + pub next: Option, } #[derive(Debug, Serialize)] From 0656df3a6d9c71822d21205e93061ab5aaf9f697 Mon Sep 17 00:00:00 2001 From: Kerollmops Date: Wed, 1 Jun 2022 17:14:13 +0200 Subject: [PATCH 6/8] Fix the dumps tests --- meilisearch-http/tests/dumps/mod.rs | 24 ++++++------------------ 1 file changed, 6 insertions(+), 18 deletions(-) diff --git a/meilisearch-http/tests/dumps/mod.rs b/meilisearch-http/tests/dumps/mod.rs index 2f8938d28..c26b0e06e 100644 --- a/meilisearch-http/tests/dumps/mod.rs +++ b/meilisearch-http/tests/dumps/mod.rs @@ -68,7 +68,7 @@ async fn import_dump_v2_movie_raw() { assert_eq!(code, 200); assert_eq!( tasks, - json!({ "results": [{"uid": 0, "indexUid": "indexUID", "status": "succeeded", "type": "documentAdditionOrUpdate", "details": { "receivedDocuments": 0, "indexedDocuments": 31944 }, "duration": "PT41.751156S", "enqueuedAt": "2021-09-08T08:30:30.550282Z", "startedAt": "2021-09-08T08:30:30.553012Z", "finishedAt": "2021-09-08T08:31:12.304168Z" }], "limit": 20, "after": null }) + json!({ "results": [{"uid": 0, "indexUid": "indexUID", "status": "succeeded", "type": "documentAdditionOrUpdate", "details": { "receivedDocuments": 0, "indexedDocuments": 31944 }, "duration": "PT41.751156S", "enqueuedAt": "2021-09-08T08:30:30.550282Z", "startedAt": "2021-09-08T08:30:30.553012Z", "finishedAt": "2021-09-08T08:31:12.304168Z" }], "limit": 20, "from": 0, "next": null }) ); // finally we're just going to check that we can still get a few documents by id @@ -132,7 +132,7 @@ async fn import_dump_v2_movie_with_settings() { assert_eq!(code, 200); assert_eq!( tasks, - json!({ "results": [{ "uid": 1, "indexUid": "indexUID", "status": "succeeded", "type": "settingsUpdate", "details": { "displayedAttributes": ["title", "genres", "overview", "poster", "release_date"], "searchableAttributes": ["title", "overview"], "filterableAttributes": ["genres"], "stopWords": ["of", "the"] }, "duration": "PT37.488777S", "enqueuedAt": "2021-09-08T08:24:02.323444Z", "startedAt": "2021-09-08T08:24:02.324145Z", "finishedAt": "2021-09-08T08:24:39.812922Z" }, { "uid": 0, "indexUid": "indexUID", "status": "succeeded", "type": "documentAdditionOrUpdate", "details": { "receivedDocuments": 0, "indexedDocuments": 31944 }, "duration": "PT39.941318S", "enqueuedAt": "2021-09-08T08:21:14.742672Z", "startedAt": "2021-09-08T08:21:14.750166Z", "finishedAt": "2021-09-08T08:21:54.691484Z" }], "limit": 20, "after": null }) + json!({ "results": [{ "uid": 1, "indexUid": "indexUID", "status": "succeeded", "type": "settingsUpdate", "details": { "displayedAttributes": ["title", "genres", "overview", "poster", "release_date"], "searchableAttributes": ["title", "overview"], "filterableAttributes": ["genres"], "stopWords": ["of", "the"] }, "duration": "PT37.488777S", "enqueuedAt": "2021-09-08T08:24:02.323444Z", 
"startedAt": "2021-09-08T08:24:02.324145Z", "finishedAt": "2021-09-08T08:24:39.812922Z" }, { "uid": 0, "indexUid": "indexUID", "status": "succeeded", "type": "documentAdditionOrUpdate", "details": { "receivedDocuments": 0, "indexedDocuments": 31944 }, "duration": "PT39.941318S", "enqueuedAt": "2021-09-08T08:21:14.742672Z", "startedAt": "2021-09-08T08:21:14.750166Z", "finishedAt": "2021-09-08T08:21:54.691484Z" }], "limit": 20, "from": 1, "next": null }) ); // finally we're just going to check that we can still get a few documents by id @@ -198,10 +198,6 @@ async fn import_dump_v2_rubygems_with_settings() { tasks["results"][0], json!({"uid": 92, "indexUid": "rubygems", "status": "succeeded", "type": "documentAdditionOrUpdate", "details": {"receivedDocuments": 0, "indexedDocuments": 1042}, "duration": "PT14.034672S", "enqueuedAt": "2021-09-08T08:40:31.390775Z", "startedAt": "2021-09-08T08:51:39.060642Z", "finishedAt": "2021-09-08T08:51:53.095314Z"}) ); - assert_eq!( - tasks["results"][92], - json!({"uid": 0, "indexUid": "rubygems", "status": "succeeded", "type": "settingsUpdate", "details": {"displayedAttributes": ["name", "summary", "description", "version", "total_downloads"], "searchableAttributes": ["name", "summary"], "filterableAttributes": ["version"], "rankingRules": ["typo", "words", "desc(fame)", "proximity", "attribute", "exactness", "desc(total_downloads)"]}, "duration": "PT0.008886S", "enqueuedAt": "2021-09-08T08:40:28.660188Z", "startedAt": "2021-09-08T08:40:28.660766Z", "finishedAt": "2021-09-08T08:40:28.669652Z"}) - ); // finally we're just going to check that we can still get a few documents by id let (document, code) = index.get_document(188040, None).await; @@ -264,7 +260,7 @@ async fn import_dump_v3_movie_raw() { assert_eq!(code, 200); assert_eq!( tasks, - json!({ "results": [{"uid": 0, "indexUid": "indexUID", "status": "succeeded", "type": "documentAdditionOrUpdate", "details": { "receivedDocuments": 0, "indexedDocuments": 31944 }, "duration": "PT41.751156S", "enqueuedAt": "2021-09-08T08:30:30.550282Z", "startedAt": "2021-09-08T08:30:30.553012Z", "finishedAt": "2021-09-08T08:31:12.304168Z" }], "limit": 20, "after": null }) + json!({ "results": [{"uid": 0, "indexUid": "indexUID", "status": "succeeded", "type": "documentAdditionOrUpdate", "details": { "receivedDocuments": 0, "indexedDocuments": 31944 }, "duration": "PT41.751156S", "enqueuedAt": "2021-09-08T08:30:30.550282Z", "startedAt": "2021-09-08T08:30:30.553012Z", "finishedAt": "2021-09-08T08:31:12.304168Z" }], "limit": 20, "from": 0, "next": null }) ); // finally we're just going to check that we can still get a few documents by id @@ -328,7 +324,7 @@ async fn import_dump_v3_movie_with_settings() { assert_eq!(code, 200); assert_eq!( tasks, - json!({ "results": [{ "uid": 1, "indexUid": "indexUID", "status": "succeeded", "type": "settingsUpdate", "details": { "displayedAttributes": ["title", "genres", "overview", "poster", "release_date"], "searchableAttributes": ["title", "overview"], "filterableAttributes": ["genres"], "stopWords": ["of", "the"] }, "duration": "PT37.488777S", "enqueuedAt": "2021-09-08T08:24:02.323444Z", "startedAt": "2021-09-08T08:24:02.324145Z", "finishedAt": "2021-09-08T08:24:39.812922Z" }, { "uid": 0, "indexUid": "indexUID", "status": "succeeded", "type": "documentAdditionOrUpdate", "details": { "receivedDocuments": 0, "indexedDocuments": 31944 }, "duration": "PT39.941318S", "enqueuedAt": "2021-09-08T08:21:14.742672Z", "startedAt": "2021-09-08T08:21:14.750166Z", "finishedAt": 
"2021-09-08T08:21:54.691484Z" }], "limit": 20, "after": null }) + json!({ "results": [{ "uid": 1, "indexUid": "indexUID", "status": "succeeded", "type": "settingsUpdate", "details": { "displayedAttributes": ["title", "genres", "overview", "poster", "release_date"], "searchableAttributes": ["title", "overview"], "filterableAttributes": ["genres"], "stopWords": ["of", "the"] }, "duration": "PT37.488777S", "enqueuedAt": "2021-09-08T08:24:02.323444Z", "startedAt": "2021-09-08T08:24:02.324145Z", "finishedAt": "2021-09-08T08:24:39.812922Z" }, { "uid": 0, "indexUid": "indexUID", "status": "succeeded", "type": "documentAdditionOrUpdate", "details": { "receivedDocuments": 0, "indexedDocuments": 31944 }, "duration": "PT39.941318S", "enqueuedAt": "2021-09-08T08:21:14.742672Z", "startedAt": "2021-09-08T08:21:14.750166Z", "finishedAt": "2021-09-08T08:21:54.691484Z" }], "limit": 20, "from": 1, "next": null }) ); // finally we're just going to check that we can["results"] still get a few documents by id @@ -394,10 +390,6 @@ async fn import_dump_v3_rubygems_with_settings() { tasks["results"][0], json!({"uid": 92, "indexUid": "rubygems", "status": "succeeded", "type": "documentAdditionOrUpdate", "details": {"receivedDocuments": 0, "indexedDocuments": 1042}, "duration": "PT14.034672S", "enqueuedAt": "2021-09-08T08:40:31.390775Z", "startedAt": "2021-09-08T08:51:39.060642Z", "finishedAt": "2021-09-08T08:51:53.095314Z"}) ); - assert_eq!( - tasks["results"][92], - json!({"uid": 0, "indexUid": "rubygems", "status": "succeeded", "type": "settingsUpdate", "details": {"displayedAttributes": ["name", "summary", "description", "version", "total_downloads"], "searchableAttributes": ["name", "summary"], "filterableAttributes": ["version"], "rankingRules": ["typo", "words", "desc(fame)", "proximity", "attribute", "exactness", "desc(total_downloads)"]}, "duration": "PT0.008886S", "enqueuedAt": "2021-09-08T08:40:28.660188Z", "startedAt": "2021-09-08T08:40:28.660766Z", "finishedAt": "2021-09-08T08:40:28.669652Z"}) - ); // finally we're just going to check that we can still get a few documents by id let (document, code) = index.get_document(188040, None).await; @@ -460,7 +452,7 @@ async fn import_dump_v4_movie_raw() { assert_eq!(code, 200); assert_eq!( tasks, - json!({ "results": [{"uid": 0, "indexUid": "indexUID", "status": "succeeded", "type": "documentAdditionOrUpdate", "details": { "receivedDocuments": 0, "indexedDocuments": 31944 }, "duration": "PT41.751156S", "enqueuedAt": "2021-09-08T08:30:30.550282Z", "startedAt": "2021-09-08T08:30:30.553012Z", "finishedAt": "2021-09-08T08:31:12.304168Z" }], "limit" : 20, "after": null }) + json!({ "results": [{"uid": 0, "indexUid": "indexUID", "status": "succeeded", "type": "documentAdditionOrUpdate", "details": { "receivedDocuments": 0, "indexedDocuments": 31944 }, "duration": "PT41.751156S", "enqueuedAt": "2021-09-08T08:30:30.550282Z", "startedAt": "2021-09-08T08:30:30.553012Z", "finishedAt": "2021-09-08T08:31:12.304168Z" }], "limit" : 20, "from": 0, "next": null }) ); // finally we're just going to check that we can still get a few documents by id @@ -524,7 +516,7 @@ async fn import_dump_v4_movie_with_settings() { assert_eq!(code, 200); assert_eq!( tasks, - json!({ "results": [{ "uid": 1, "indexUid": "indexUID", "status": "succeeded", "type": "settingsUpdate", "details": { "displayedAttributes": ["title", "genres", "overview", "poster", "release_date"], "searchableAttributes": ["title", "overview"], "filterableAttributes": ["genres"], "stopWords": ["of", "the"] }, "duration": 
"PT37.488777S", "enqueuedAt": "2021-09-08T08:24:02.323444Z", "startedAt": "2021-09-08T08:24:02.324145Z", "finishedAt": "2021-09-08T08:24:39.812922Z" }, { "uid": 0, "indexUid": "indexUID", "status": "succeeded", "type": "documentAdditionOrUpdate", "details": { "receivedDocuments": 0, "indexedDocuments": 31944 }, "duration": "PT39.941318S", "enqueuedAt": "2021-09-08T08:21:14.742672Z", "startedAt": "2021-09-08T08:21:14.750166Z", "finishedAt": "2021-09-08T08:21:54.691484Z" }], "limit": 20, "after": null }) + json!({ "results": [{ "uid": 1, "indexUid": "indexUID", "status": "succeeded", "type": "settingsUpdate", "details": { "displayedAttributes": ["title", "genres", "overview", "poster", "release_date"], "searchableAttributes": ["title", "overview"], "filterableAttributes": ["genres"], "stopWords": ["of", "the"] }, "duration": "PT37.488777S", "enqueuedAt": "2021-09-08T08:24:02.323444Z", "startedAt": "2021-09-08T08:24:02.324145Z", "finishedAt": "2021-09-08T08:24:39.812922Z" }, { "uid": 0, "indexUid": "indexUID", "status": "succeeded", "type": "documentAdditionOrUpdate", "details": { "receivedDocuments": 0, "indexedDocuments": 31944 }, "duration": "PT39.941318S", "enqueuedAt": "2021-09-08T08:21:14.742672Z", "startedAt": "2021-09-08T08:21:14.750166Z", "finishedAt": "2021-09-08T08:21:54.691484Z" }], "limit": 20, "from": 1, "next": null }) ); // finally we're just going to check that we can still get a few documents by id @@ -590,10 +582,6 @@ async fn import_dump_v4_rubygems_with_settings() { tasks["results"][0], json!({ "uid": 92, "indexUid": "rubygems", "status": "succeeded", "type": "documentAdditionOrUpdate", "details": {"receivedDocuments": 0, "indexedDocuments": 1042}, "duration": "PT14.034672S", "enqueuedAt": "2021-09-08T08:40:31.390775Z", "startedAt": "2021-09-08T08:51:39.060642Z", "finishedAt": "2021-09-08T08:51:53.095314Z"}) ); - assert_eq!( - tasks["results"][92], - json!({ "uid": 0, "indexUid": "rubygems", "status": "succeeded", "type": "settingsUpdate", "details": {"displayedAttributes": ["name", "summary", "description", "version", "total_downloads"], "searchableAttributes": ["name", "summary"], "filterableAttributes": ["version"], "rankingRules": ["typo", "words", "desc(fame)", "proximity", "attribute", "exactness", "desc(total_downloads)"]}, "duration": "PT0.008886S", "enqueuedAt": "2021-09-08T08:40:28.660188Z", "startedAt": "2021-09-08T08:40:28.660766Z", "finishedAt": "2021-09-08T08:40:28.669652Z"}) - ); // finally we're just going to check that we can still get a few documents by id let (document, code) = index.get_document(188040, None).await; From df721b2e9eef60064b55622b47bd9af459d3c11c Mon Sep 17 00:00:00 2001 From: Kerollmops Date: Wed, 1 Jun 2022 17:16:15 +0200 Subject: [PATCH 7/8] Scheduler must not reverse the order of the fetched tasks --- meilisearch-lib/src/tasks/scheduler.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/meilisearch-lib/src/tasks/scheduler.rs b/meilisearch-lib/src/tasks/scheduler.rs index dddb6dff9..36534f358 100644 --- a/meilisearch-lib/src/tasks/scheduler.rs +++ b/meilisearch-lib/src/tasks/scheduler.rs @@ -346,8 +346,6 @@ impl Scheduler { .fetch_unfinished_tasks(Some(self.next_fetched_task_id)) .await? .into_iter() - // The tasks arrive in reverse order, and we need to insert them in order. 
- .rev() .for_each(|t| { self.next_fetched_task_id = t.id + 1; self.register_task(t); From dfce9ba4683727b27998865cca6205dbed921478 Mon Sep 17 00:00:00 2001 From: Kerollmops Date: Thu, 2 Jun 2022 11:26:12 +0200 Subject: [PATCH 8/8] Apply suggestions --- meilisearch-http/src/routes/tasks.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/meilisearch-http/src/routes/tasks.rs b/meilisearch-http/src/routes/tasks.rs index 49554858d..2f62615fd 100644 --- a/meilisearch-http/src/routes/tasks.rs +++ b/meilisearch-http/src/routes/tasks.rs @@ -138,12 +138,12 @@ async fn get_tasks( // We +1 just to know if there is more after this "page" or not. let limit = limit.saturating_add(1); - let mut tasks_results = meilisearch + let mut tasks_results: Vec<_> = meilisearch .list_tasks(filters, Some(limit), from) .await? .into_iter() .map(TaskView::from) - .collect::>(); + .collect(); // If we were able to fetch the number +1 tasks we asked // it means that there is more to come.