// Review notes for this patch to index-scheduler/src/lib.rs (text before the `diff --git` header is
// not part of any hunk).
//
// NOTE(review): in this copy the hunks' line breaks are flattened, and generic parameter lists look
// stripped (e.g. `Result = std::result::Result;`, `Option,`, `Option>,`, `remap_data_type::()`).
// Confirm against the original commit before applying.
//
// What the added (`+`) lines do, as far as the visible text shows:
// - `Query`: a serde-deserializable filter with camelCase field names. `limit` falls back to
//   `DEFAULT_LIMIT` (a fn pointer returning 20) when absent, and `kind` is read from the key "type".
//   The fields carry no `pub`, so code outside this module presumably obtains a `Query` only by
//   deserializing one; verify against callers.
// - `last_task_id`: takes the last entry of `all_tasks` and maps its key to `k.get() + 1`, or yields
//   `None` when there is no entry. Despite the name, the value is one past the greatest stored key.
// - `next_task_id`: now `last_task_id(..)?.unwrap_or_default()`. The removed (`-`) version returned
//   `k.get()` of the last entry unchanged (or 0), i.e. an id that was already in use.
// - `get_tasks`: opens a read transaction and returns an empty Vec when `last_task_id` is `None`.
//   Otherwise it computes an upper bound (`query.from` clamped to last key + 1, or last key + 1 when
//   `from` is absent) and seeds a bitmap with `0..bound`. For each of `status`, `kind` and
//   `index_uid` that is present, it unions the bitmaps of the listed values and intersects the
//   result into the candidate set. It then passes at most `limit` ids, highest first, to
//   `get_existing_tasks`.
// - NOTE(review): `0..bound` is half-open, so when `from` is given, the task whose id equals `from`
//   is not in the candidate set. Confirm whether `from` is meant to be inclusive.
diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs index 978898732..debee8d59 100644 --- a/index-scheduler/src/lib.rs +++ b/index-scheduler/src/lib.rs @@ -16,16 +16,30 @@ use std::{collections::HashMap, sync::RwLock}; use milli::heed::{Database, Env, EnvOpenOptions, RoTxn, RwTxn}; use milli::{Index, RoaringBitmapCodec, BEU32}; use roaring::RoaringBitmap; +use serde::Deserialize; pub type Result = std::result::Result; pub type TaskId = u32; type IndexName = String; type IndexUuid = String; +const DEFAULT_LIMIT: fn() -> u32 = || 20; + +#[derive(Debug, Clone, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct Query { + #[serde(default = "DEFAULT_LIMIT")] + limit: u32, + from: Option, + status: Option>, + #[serde(rename = "type")] + kind: Option>, + index_uid: Option>, +} + /// This module is responsible for two things; /// 1. Resolve the name of the indexes. /// 2. Schedule the tasks. - #[derive(Clone)] pub struct IndexScheduler { // Keep track of the opened indexes and is used @@ -91,13 +105,54 @@ impl IndexScheduler { Ok(index) } - fn next_task_id(&self, rtxn: &RoTxn) -> Result { + fn last_task_id(&self, rtxn: &RoTxn) -> Result> { Ok(self .all_tasks .remap_data_type::() .last(rtxn)? - .map(|(k, _)| k.get()) - .unwrap_or(0)) + .map(|(k, _)| k.get() + 1)) + } + + fn next_task_id(&self, rtxn: &RoTxn) -> Result { + Ok(self.last_task_id(rtxn)?.unwrap_or_default()) + } + + /// Returns the tasks corresponding to the query. + pub fn get_tasks(&self, query: Query) -> Result> { + let rtxn = self.env.read_txn()?; + let last_task_id = match self.last_task_id(&rtxn)? { + Some(tid) => query.from.map(|from| from.min(tid)).unwrap_or(tid), + None => return Ok(Vec::new()), + }; + + // This is the list of all the tasks. 
+ let mut tasks = RoaringBitmap::from_iter(0..last_task_id); + + if let Some(status) = query.status { + let mut status_tasks = RoaringBitmap::new(); + for status in status { + status_tasks |= self.get_status(&rtxn, status)?; + } + tasks &= status_tasks; + } + + if let Some(kind) = query.kind { + let mut kind_tasks = RoaringBitmap::new(); + for kind in kind { + kind_tasks |= self.get_kind(&rtxn, kind)?; + } + tasks &= kind_tasks; + } + + if let Some(index) = query.index_uid { + let mut index_tasks = RoaringBitmap::new(); + for index in index { + index_tasks |= self.get_index(&rtxn, &index)?; + } + tasks &= index_tasks; + } + + self.get_existing_tasks(&rtxn, tasks.into_iter().rev().take(query.limit as usize)) } /// Register a new task in the scheduler. If it fails and data was associated with the task