From 461b91fd13a5575d574fb3612a74ba948d993cb3 Mon Sep 17 00:00:00 2001 From: Kerollmops Date: Wed, 1 Jun 2022 10:20:33 +0200 Subject: [PATCH] Introduce the fetch_unfinished_tasks function to fetch tasks --- meilisearch-lib/src/tasks/scheduler.rs | 8 +--- meilisearch-lib/src/tasks/task_store/mod.rs | 18 ++++++++ meilisearch-lib/src/tasks/task_store/store.rs | 43 ++++++++++++++++--- 3 files changed, 56 insertions(+), 13 deletions(-) diff --git a/meilisearch-lib/src/tasks/scheduler.rs b/meilisearch-lib/src/tasks/scheduler.rs index 19265a911..dddb6dff9 100644 --- a/meilisearch-lib/src/tasks/scheduler.rs +++ b/meilisearch-lib/src/tasks/scheduler.rs @@ -342,14 +342,8 @@ impl Scheduler { } async fn fetch_pending_tasks(&mut self) -> Result<()> { - // We must NEVER re-enqueue an already processed task! It's content uuid would point to an unexisting file. - // - // TODO(marin): This may create some latency when the first batch lazy loads the pending updates. - let mut filter = TaskFilter::default(); - filter.filter_fn(|task| !task.is_finished()); - self.store - .list_tasks(Some(self.next_fetched_task_id), Some(filter), None) + .fetch_unfinished_tasks(Some(self.next_fetched_task_id)) .await? .into_iter() // The tasks arrive in reverse order, and we need to insert them in order. diff --git a/meilisearch-lib/src/tasks/task_store/mod.rs b/meilisearch-lib/src/tasks/task_store/mod.rs index 3645717e6..835f378e5 100644 --- a/meilisearch-lib/src/tasks/task_store/mod.rs +++ b/meilisearch-lib/src/tasks/task_store/mod.rs @@ -186,6 +186,17 @@ impl TaskStore { Ok(tasks) } + pub async fn fetch_unfinished_tasks(&self, offset: Option) -> Result> { + let store = self.store.clone(); + + tokio::task::spawn_blocking(move || { + let txn = store.rtxn()?; + let tasks = store.fetch_unfinished_tasks(&txn, offset)?; + Ok(tasks) + }) + .await? + } + pub async fn list_tasks( &self, offset: Option, @@ -325,6 +336,13 @@ pub mod test { } } + pub async fn fetch_unfinished_tasks(&self, from: Option) -> Result> { + match self { + Self::Real(s) => s.fetch_unfinished_tasks(from).await, + Self::Mock(m) => unsafe { m.get("fetch_unfinished_tasks").call(from) }, + } + } + pub async fn list_tasks( &self, from: Option, diff --git a/meilisearch-lib/src/tasks/task_store/store.rs b/meilisearch-lib/src/tasks/task_store/store.rs index 75ece0ae8..a109935ab 100644 --- a/meilisearch-lib/src/tasks/task_store/store.rs +++ b/meilisearch-lib/src/tasks/task_store/store.rs @@ -121,9 +121,27 @@ impl Store { Ok(task) } - pub fn list_tasks<'a>( + /// Returns the unfinished tasks starting from the given taskId in ascending order. + pub fn fetch_unfinished_tasks(&self, txn: &RoTxn, from: Option) -> Result> { + // We must NEVER re-enqueue an already processed task! It's content uuid would point to an unexisting file. + // + // TODO(marin): This may create some latency when the first batch lazy loads the pending updates. + let from = from.unwrap_or_default(); + + let result: StdResult, milli::heed::Error> = self + .tasks + .range(txn, &(BEU64::new(from)..))? + .map(|r| r.map(|(_, t)| t)) + .filter(|result| result.as_ref().map_or(true, |t| !t.is_finished())) + .collect(); + + result.map_err(Into::into) + } + + /// Returns all the tasks starting from the given taskId and going in descending order. + pub fn list_tasks( &self, - txn: &'a RoTxn, + txn: &RoTxn, from: Option, filter: Option, limit: Option, @@ -132,6 +150,7 @@ impl Store { let range = from..limit .map(|limit| (limit as u64).saturating_add(from)) .unwrap_or(u64::MAX); + let iter: Box>> = match filter { Some( ref filter @ TaskFilter { @@ -152,7 +171,7 @@ impl Store { ), }; - let apply_fitler = |task: &StdResult<_, milli::heed::Error>| match task { + let apply_filter = |task: &StdResult<_, milli::heed::Error>| match task { Ok(ref t) => filter .as_ref() .and_then(|filter| filter.filter_fn.as_ref()) @@ -160,9 +179,10 @@ impl Store { .unwrap_or(true), Err(_) => true, }; + // Collect 'limit' task if it exists or all of them. let tasks = iter - .filter(apply_fitler) + .filter(apply_filter) .take(limit.unwrap_or(usize::MAX)) .try_fold::<_, _, StdResult<_, milli::heed::Error>>(Vec::new(), |mut v, task| { v.push(task?); @@ -305,9 +325,20 @@ pub mod test { } } - pub fn list_tasks<'a>( + pub fn fetch_unfinished_tasks( &self, - txn: &'a RoTxn, + txn: &RoTxn, + from: Option, + ) -> Result> { + match self { + MockStore::Real(index) => index.fetch_unfinished_tasks(txn, from), + MockStore::Fake(_) => todo!(), + } + } + + pub fn list_tasks( + &self, + txn: &RoTxn, from: Option, filter: Option, limit: Option,