Introduce tasks limit and after to the tasks route

This commit is contained in:
Kerollmops 2022-06-01 12:04:01 +02:00
parent 461b91fd13
commit c11d21879a
No known key found for this signature in database
GPG Key ID: 92ADA4E935E71FA4
7 changed files with 95 additions and 167 deletions

1
Cargo.lock generated
View File

@ -2114,6 +2114,7 @@ dependencies = [
"rayon",
"regex",
"reqwest",
"roaring",
"rustls",
"serde",
"serde_json",

View File

@ -14,6 +14,8 @@ use crate::task::{TaskListView, TaskStatus, TaskType, TaskView};
use super::{fold_star_or, StarOr};
const DEFAULT_LIMIT: fn() -> usize = || 20;
pub fn configure(cfg: &mut web::ServiceConfig) {
cfg.service(web::resource("").route(web::get().to(SeqHandler(get_tasks))))
.service(web::resource("/{task_id}").route(web::get().to(SeqHandler(get_task))));
@ -26,8 +28,9 @@ pub struct TaskFilterQuery {
type_: Option<CS<StarOr<TaskType>>>,
status: Option<CS<StarOr<TaskStatus>>>,
index_uid: Option<CS<StarOr<IndexUid>>>,
limit: Option<usize>, // TODO must not return an error when deser fail
after: Option<TaskId>, // TODO must not return an error when deser fail
#[serde(default = "DEFAULT_LIMIT")]
limit: usize,
after: Option<TaskId>,
}
#[rustfmt::skip]
@ -132,10 +135,22 @@ async fn get_tasks(
indexes_filters
};
// We +1 just to know if there is more after this "page" or not.
let limit = limit.unwrap_or(DEFAULT_LIMIT).saturating_add(1);
let offset = match after {
Some(0) => {
let tasks = TaskListView {
results: vec![],
limit,
after: None,
};
return Ok(HttpResponse::Ok().json(tasks));
}
// We -1 here because we need an offset and we must exclude the `after` one.
let offset = after.map(|n| n.saturating_sub(1));
Some(n) => Some(n - 1),
None => None,
};
// We +1 just to know if there is more after this "page" or not.
let limit = limit.saturating_add(1);
let mut tasks_results = meilisearch
.list_tasks(filters, Some(limit), offset)

View File

@ -41,6 +41,7 @@ rand = "0.8.5"
rayon = "1.5.1"
regex = "1.5.5"
reqwest = { version = "0.11.9", features = ["json", "rustls-tls"], default-features = false, optional = true }
roaring = "0.9.0"
rustls = "0.20.4"
serde = { version = "1.0.136", features = ["derive"] }
serde_json = { version = "1.0.79", features = ["preserve_order"] }

View File

@ -4,7 +4,7 @@ use crate::snapshot::SnapshotJob;
use super::task::{Task, TaskEvent};
pub type BatchId = u64;
pub type BatchId = u32;
#[derive(Debug)]
pub enum BatchContent {

View File

@ -10,7 +10,7 @@ use crate::{
index_resolver::IndexUid,
};
pub type TaskId = u64;
pub type TaskId = u32;
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
#[cfg_attr(test, derive(proptest_derive::Arbitrary))]

View File

@ -41,6 +41,10 @@ impl TaskFilter {
}
}
fn filtered_indexes(&self) -> Option<&HashSet<String>> {
self.indexes.as_ref()
}
/// Adds an index to the filter, so the filter must match this index.
pub fn filter_index(&mut self, index: String) {
self.indexes
@ -396,7 +400,7 @@ pub mod test {
let mut runner = TestRunner::new(Config::default());
runner
.run(&(0..100u64).prop_map(gen_task), |task| {
.run(&(0..100u32).prop_map(gen_task), |task| {
let mut txn = store.wtxn().unwrap();
let previous_id = store.next_task_id(&mut txn).unwrap();

View File

@ -1,62 +1,30 @@
#[allow(clippy::upper_case_acronyms)]
type BEU64 = milli::heed::zerocopy::U64<milli::heed::byteorder::BE>;
const UID_TASK_IDS: &str = "uid_task_id";
type BEU32 = milli::heed::zerocopy::U32<milli::heed::byteorder::BE>;
const INDEX_UIDS_TASK_IDS: &str = "index-uids-task-ids";
const TASKS: &str = "tasks";
use std::borrow::Cow;
use std::collections::BinaryHeap;
use std::convert::TryInto;
use std::mem::size_of;
use std::ops::Range;
use std::collections::HashSet;
use std::ops::Bound::{Excluded, Unbounded};
use std::result::Result as StdResult;
use std::sync::Arc;
use milli::heed::types::{ByteSlice, OwnedType, SerdeJson, Unit};
use milli::heed::{BytesDecode, BytesEncode, Database, Env, RoTxn, RwTxn};
use milli::heed::types::{OwnedType, SerdeJson, Str};
use milli::heed::{Database, Env, RoTxn, RwTxn};
use milli::heed_codec::RoaringBitmapCodec;
use roaring::RoaringBitmap;
use crate::tasks::task::{Task, TaskId};
use super::super::Result;
use super::TaskFilter;
enum IndexUidTaskIdCodec {}
impl<'a> BytesEncode<'a> for IndexUidTaskIdCodec {
type EItem = (&'a str, TaskId);
fn bytes_encode((s, id): &'a Self::EItem) -> Option<Cow<'a, [u8]>> {
let size = s.len() + std::mem::size_of::<TaskId>() + 1;
if size > 512 {
return None;
}
let mut b = Vec::with_capacity(size);
b.extend_from_slice(s.as_bytes());
// null terminate the string
b.push(0);
b.extend_from_slice(&id.to_be_bytes());
Some(Cow::Owned(b))
}
}
impl<'a> BytesDecode<'a> for IndexUidTaskIdCodec {
type DItem = (&'a str, TaskId);
fn bytes_decode(bytes: &'a [u8]) -> Option<Self::DItem> {
let len = bytes.len();
let s_end = len.checked_sub(size_of::<TaskId>())?.checked_sub(1)?;
let str_bytes = &bytes[..s_end];
let str = std::str::from_utf8(str_bytes).ok()?;
let id = TaskId::from_be_bytes(bytes[(len - size_of::<TaskId>())..].try_into().ok()?);
Some((str, id))
}
}
pub struct Store {
env: Arc<Env>,
uids_task_ids: Database<IndexUidTaskIdCodec, Unit>,
tasks: Database<OwnedType<BEU64>, SerdeJson<Task>>,
/// Maps an index uid to the set of tasks ids associated to it.
index_uid_task_ids: Database<Str, RoaringBitmapCodec>,
tasks: Database<OwnedType<BEU32>, SerdeJson<Task>>,
}
impl Drop for Store {
@ -74,12 +42,12 @@ impl Store {
/// You want to patch all un-finished tasks and put them in your pending
/// queue with the `reset_and_return_unfinished_update` method.
pub fn new(env: Arc<milli::heed::Env>) -> Result<Self> {
let uids_task_ids = env.create_database(Some(UID_TASK_IDS))?;
let index_uid_task_ids = env.create_database(Some(INDEX_UIDS_TASK_IDS))?;
let tasks = env.create_database(Some(TASKS))?;
Ok(Self {
env,
uids_task_ids,
index_uid_task_ids,
tasks,
})
}
@ -107,17 +75,24 @@ impl Store {
}
pub fn put(&self, txn: &mut RwTxn, task: &Task) -> Result<()> {
self.tasks.put(txn, &BEU64::new(task.id), task)?;
self.tasks.put(txn, &BEU32::new(task.id), task)?;
// only add the task to the indexes index if it has an index_uid
if let Some(ref index_uid) = task.index_uid {
self.uids_task_ids.put(txn, &(index_uid, task.id), &())?;
if let Some(index_uid) = &task.index_uid {
let mut tasks_set = self
.index_uid_task_ids
.get(txn, index_uid)?
.unwrap_or_default();
tasks_set.insert(task.id);
self.index_uid_task_ids.put(txn, index_uid, &tasks_set)?;
}
Ok(())
}
pub fn get(&self, txn: &RoTxn, id: TaskId) -> Result<Option<Task>> {
let task = self.tasks.get(txn, &BEU64::new(id))?;
let task = self.tasks.get(txn, &BEU32::new(id))?;
Ok(task)
}
@ -130,7 +105,7 @@ impl Store {
let result: StdResult<Vec<_>, milli::heed::Error> = self
.tasks
.range(txn, &(BEU64::new(from)..))?
.range(txn, &(BEU32::new(from)..))?
.map(|r| r.map(|(_, t)| t))
.filter(|result| result.as_ref().map_or(true, |t| !t.is_finished()))
.collect();
@ -146,102 +121,58 @@ impl Store {
filter: Option<TaskFilter>,
limit: Option<usize>,
) -> Result<Vec<Task>> {
let from = from.unwrap_or_default();
let range = from..limit
.map(|limit| (limit as u64).saturating_add(from))
.unwrap_or(u64::MAX);
let iter: Box<dyn Iterator<Item = StdResult<_, milli::heed::Error>>> = match filter {
Some(
ref filter @ TaskFilter {
indexes: Some(_), ..
},
) => {
let iter = self
.compute_candidates(txn, filter, range)?
.into_iter()
.filter_map(|id| self.tasks.get(txn, &BEU64::new(id)).transpose());
Box::new(iter)
}
_ => Box::new(
self.tasks
.rev_range(txn, &(BEU64::new(range.start)..BEU64::new(range.end)))?
.map(|r| r.map(|(_, t)| t)),
),
let from = match from {
Some(from) => from,
None => self.tasks.last(txn)?.map_or(0, |(id, _)| id.get()),
};
let apply_filter = |task: &StdResult<_, milli::heed::Error>| match task {
Ok(ref t) => filter
let filter_fn = |task: &Task| {
filter
.as_ref()
.and_then(|filter| filter.filter_fn.as_ref())
.map(|f| f(t))
.unwrap_or(true),
Err(_) => true,
.and_then(|f| f.filter_fn.as_ref())
.map_or(true, |f| f(task))
};
// Collect 'limit' task if it exists or all of them.
let tasks = iter
.filter(apply_filter)
let result: Result<Vec<_>> = match filter.as_ref().and_then(|f| f.filtered_indexes()) {
Some(indexes) => self
.compute_candidates(txn, indexes, from)?
.filter(|result| result.as_ref().map_or(true, filter_fn))
.take(limit.unwrap_or(usize::MAX))
.try_fold::<_, _, StdResult<_, milli::heed::Error>>(Vec::new(), |mut v, task| {
v.push(task?);
Ok(v)
})?;
.collect(),
None => self
.tasks
.rev_range(txn, &(..=BEU32::new(from)))?
.map(|r| r.map(|(_, t)| t).map_err(Into::into))
.filter(|result| result.as_ref().map_or(true, filter_fn))
.take(limit.unwrap_or(usize::MAX))
.collect(),
};
Ok(tasks)
result.map_err(Into::into)
}
fn compute_candidates(
&self,
txn: &milli::heed::RoTxn,
filter: &TaskFilter,
range: Range<TaskId>,
) -> Result<BinaryHeap<TaskId>> {
let mut candidates = BinaryHeap::new();
if let Some(ref indexes) = filter.indexes {
for index in indexes {
// We need to prefix search the null terminated string to make sure that we only
// get exact matches for the index, and not other uids that would share the same
// prefix, i.e test and test1.
let mut index_uid = index.as_bytes().to_vec();
index_uid.push(0);
fn compute_candidates<'a>(
&'a self,
txn: &'a RoTxn,
indexes: &HashSet<String>,
from: TaskId,
) -> Result<impl Iterator<Item = Result<Task>> + 'a> {
let mut candidates = RoaringBitmap::new();
self.uids_task_ids
.remap_key_type::<ByteSlice>()
.rev_prefix_iter(txn, &index_uid)?
.map(|entry| -> StdResult<_, milli::heed::Error> {
let (key, _) = entry?;
let (_, id) = IndexUidTaskIdCodec::bytes_decode(key)
.ok_or(milli::heed::Error::Decoding)?;
Ok(id)
})
.skip_while(|entry| {
entry
.as_ref()
.ok()
// we skip all elements till we enter in the range
.map(|key| !range.contains(key))
// if we encounter an error we returns true to collect it later
.unwrap_or(true)
})
.take_while(|entry| {
entry
.as_ref()
.ok()
// as soon as we are out of the range we exit
.map(|key| range.contains(key))
// if we encounter an error we returns true to collect it later
.unwrap_or(true)
})
.try_for_each::<_, StdResult<(), milli::heed::Error>>(|id| {
candidates.push(id?);
Ok(())
})?;
for index_uid in indexes {
if let Some(tasks_set) = self.index_uid_task_ids.get(txn, index_uid)? {
candidates |= tasks_set;
}
}
Ok(candidates)
candidates.remove_range((Excluded(from), Unbounded));
let iter = candidates
.into_iter()
.rev()
.filter_map(|id| self.get(txn, id).transpose());
Ok(iter)
}
}
@ -250,8 +181,6 @@ pub mod test {
use itertools::Itertools;
use milli::heed::EnvOpenOptions;
use nelson::Mocker;
use proptest::collection::vec;
use proptest::prelude::*;
use tempfile::TempDir;
use crate::index_resolver::IndexUid;
@ -460,26 +389,4 @@ pub mod test {
"test"
);
}
proptest! {
#[test]
fn encode_decode_roundtrip(index_uid in any::<IndexUid>(), task_id in 0..TaskId::MAX) {
let value = (index_uid.as_ref(), task_id);
let bytes = IndexUidTaskIdCodec::bytes_encode(&value).unwrap();
let (index, id) = IndexUidTaskIdCodec::bytes_decode(bytes.as_ref()).unwrap();
assert_eq!(&*index_uid, index);
assert_eq!(task_id, id);
}
#[test]
fn encode_doesnt_crash(index_uid in "\\PC*", task_id in 0..TaskId::MAX) {
let value = (index_uid.as_ref(), task_id);
IndexUidTaskIdCodec::bytes_encode(&value);
}
#[test]
fn decode_doesnt_crash(bytes in vec(any::<u8>(), 0..1000)) {
IndexUidTaskIdCodec::bytes_decode(&bytes);
}
}
}