Compare commits

..

27 Commits

Author SHA1 Message Date
Tamo
ec06879d28
apply review changes 2024-11-20 14:40:36 +01:00
Tamo
83d1f858c1
Update crates/index-scheduler/src/lib.rs
Co-authored-by: Clément Renault <clement@meilisearch.com>
2024-11-20 14:36:05 +01:00
Tamo
a7ac590e9e
implements the reverse query parameter for the batches 2024-11-20 13:29:52 +01:00
Tamo
8ad68dd708
stop leaking the update files of the canceled tasks 2024-11-20 13:17:54 +01:00
Tamo
7e379b3d14
remove useless prints 2024-11-20 12:27:12 +01:00
Tamo
56eacd221f
update the tests after the rebase 2024-11-20 10:54:38 +01:00
Tamo
bdb51a85fe
now that the task cancelation shares their started at with all the tasks of their batch we don't need the trick of retrieving the previous batch anymore 2024-11-20 10:51:07 +01:00
Tamo
b24a34830d
fix the dump test -> the only change is that we now have a null batch_uid in all the tasks 2024-11-20 10:51:06 +01:00
Tamo
e145d71a62
implements the two last TODOs 2024-11-20 10:51:06 +01:00
Tamo
d9a4e69990
push a missing snapshot 2024-11-20 10:51:06 +01:00
Tamo
b906e3ed70
improve the way we access the mutex 2024-11-20 10:51:06 +01:00
Tamo
4abcd9c04e
add some stats on the batches 2024-11-20 10:51:06 +01:00
Tamo
229fa0f902
implements the batch details 2024-11-20 10:51:06 +01:00
Tamo
5d10c2312b
remove unused file 2024-11-20 10:51:06 +01:00
Tamo
f1d38581e5
add the front end tests on the batches routes 2024-11-20 10:51:06 +01:00
Tamo
62646af7b9
implements the automatic batch deletion 2024-11-20 10:51:06 +01:00
Tamo
1fcb9526f5
fix the task cancelation 2024-11-20 10:51:06 +01:00
Tamo
15eefa4fcc
fixes a lot of small issue, the test about the cancellation is still failing 2024-11-20 10:51:05 +01:00
Tamo
ad9763ffcd
copy multiple task query tests to batches. Currently, they fails 2024-11-20 10:49:25 +01:00
Tamo
d489f5635f
add the mapping between the task and batches 2024-11-20 10:49:23 +01:00
Tamo
a1251c3c83
Implements the get all batches route with filters working 2024-11-20 10:42:55 +01:00
Tamo
6062914654
add the batch_id to the tasks 2024-11-20 10:42:54 +01:00
Lukas Kalbertodt
057fcb3993
Add indices field to _matchesPosition to specify where in an array a match comes from (#5005)
Some checks are pending
Indexing bench (push) / Run and upload benchmarks (push) Waiting to run
Benchmarks of indexing (push) / Run and upload benchmarks (push) Waiting to run
Benchmarks of search for geo (push) / Run and upload benchmarks (push) Waiting to run
Benchmarks of search for songs (push) / Run and upload benchmarks (push) Waiting to run
Benchmarks of search for Wikipedia articles (push) / Run and upload benchmarks (push) Waiting to run
Run the indexing fuzzer / Setup the action (push) Successful in 1h4m31s
* Remove unreachable code

* Add `indices` field to `MatchBounds`

For matches inside arrays, this field holds the indices of the array
elements that matched. For example, searching for `cat` inside
`{ "a": ["dog", "cat", "fox"] }` would return `indices: [1]`. For nested
arrays, this contains multiple indices, starting with the one for the
top-most array. For matches in fields without arrays, `indices` is not
serialized (does not exist) to save space.
2024-11-20 01:00:43 +01:00
meili-bors[bot]
c1d8ee2a8d
Merge #5048
Some checks failed
Test suite / Tests almost all features (push) Has been skipped
Test suite / Test disabled tokenization (push) Has been skipped
Test suite / Tests on ubuntu-20.04 (push) Failing after 14s
Test suite / Run tests in debug (push) Failing after 24s
Test suite / Tests on ${{ matrix.os }} (windows-2022) (push) Failing after 57s
Test suite / Run Rustfmt (push) Successful in 1m36s
Test suite / Run Clippy (push) Successful in 6m8s
Indexing bench (push) / Run and upload benchmarks (push) Waiting to run
Benchmarks of indexing (push) / Run and upload benchmarks (push) Waiting to run
Benchmarks of search for geo (push) / Run and upload benchmarks (push) Waiting to run
Benchmarks of search for songs (push) / Run and upload benchmarks (push) Waiting to run
Benchmarks of search for Wikipedia articles (push) / Run and upload benchmarks (push) Waiting to run
Run the indexing fuzzer / Setup the action (push) Successful in 1h4m23s
Test suite / Tests on ${{ matrix.os }} (macos-13) (push) Has been cancelled
5048: Reverse the order of the task queue r=Kerollmops a=irevoire

# Pull Request

## Related issue
Fixes https://github.com/meilisearch/meilisearch/issues/5047

## What does this PR do?
- Provide a new parameter to reverse the order of the task queue
- Add tests
- Remove some unrelated tests that were duplicated in tests/tasks/mod.rs and tests/tasks/error.rs


Co-authored-by: Tamo <tamo@meilisearch.com>
2024-11-18 16:24:12 +00:00
meili-bors[bot]
94fb55bb6f
Merge #5049
Some checks failed
Test suite / Tests on ${{ matrix.os }} (macos-13) (push) Waiting to run
Test suite / Tests on ubuntu-20.04 (push) Failing after 59s
Test suite / Tests almost all features (push) Has been skipped
Test suite / Test disabled tokenization (push) Has been skipped
Test suite / Run tests in debug (push) Failing after 13s
Test suite / Tests on ${{ matrix.os }} (windows-2022) (push) Failing after 7m4s
Test suite / Run Clippy (push) Successful in 10m58s
Test suite / Run Rustfmt (push) Successful in 2m34s
Run the indexing fuzzer / Setup the action (push) Successful in 1h5m58s
Indexing bench (push) / Run and upload benchmarks (push) Has been cancelled
Benchmarks of indexing (push) / Run and upload benchmarks (push) Has been cancelled
Benchmarks of search for geo (push) / Run and upload benchmarks (push) Has been cancelled
Benchmarks of search for songs (push) / Run and upload benchmarks (push) Has been cancelled
Benchmarks of search for Wikipedia articles (push) / Run and upload benchmarks (push) Has been cancelled
5049: Fix the path used in the flaky tests CI r=irevoire a=Kerollmops

This PR fixes [the flaky tests CI](https://github.com/meilisearch/meilisearch/actions/runs/11741717787) path used.

Co-authored-by: Clément Renault <clement@meilisearch.com>
2024-11-13 10:26:50 +00:00
Clément Renault
009709eace
Fix the path used in the flaky tests CI 2024-11-13 09:52:10 +01:00
Tamo
2eb1801e85 reverse the order of the task queue 2024-11-07 19:19:44 +01:00
24 changed files with 434 additions and 349 deletions

View File

@ -21,10 +21,10 @@ jobs:
- name: Install cargo-flaky - name: Install cargo-flaky
run: cargo install cargo-flaky run: cargo install cargo-flaky
- name: Run cargo flaky in the dumps - name: Run cargo flaky in the dumps
run: cd dump; cargo flaky -i 100 --release run: cd crates/dump; cargo flaky -i 100 --release
- name: Run cargo flaky in the index-scheduler - name: Run cargo flaky in the index-scheduler
run: cd index-scheduler; cargo flaky -i 100 --release run: cd crates/index-scheduler; cargo flaky -i 100 --release
- name: Run cargo flaky in the auth - name: Run cargo flaky in the auth
run: cd meilisearch-auth; cargo flaky -i 100 --release run: cd crates/meilisearch-auth; cargo flaky -i 100 --release
- name: Run cargo flaky in meilisearch - name: Run cargo flaky in meilisearch
run: cd meilisearch; cargo flaky -i 100 --release run: cd crates/meilisearch; cargo flaky -i 100 --release

View File

@ -450,7 +450,7 @@ pub(crate) mod test {
// tasks // tasks
let tasks = dump.tasks().unwrap().collect::<Result<Vec<_>>>().unwrap(); let tasks = dump.tasks().unwrap().collect::<Result<Vec<_>>>().unwrap();
let (tasks, update_files): (Vec<_>, Vec<_>) = tasks.into_iter().unzip(); let (tasks, update_files): (Vec<_>, Vec<_>) = tasks.into_iter().unzip();
meili_snap::snapshot_hash!(meili_snap::json_string!(tasks), @"41f91d3a94911b2735ec41b07540df5c"); meili_snap::snapshot_hash!(meili_snap::json_string!(tasks), @"4b03e23e740b27bfb9d2a1faffe512e2");
assert_eq!(update_files.len(), 22); assert_eq!(update_files.len(), 22);
assert!(update_files[0].is_none()); // the dump creation assert!(update_files[0].is_none()); // the dump creation
assert!(update_files[1].is_some()); // the enqueued document addition assert!(update_files[1].is_some()); // the enqueued document addition

View File

@ -222,7 +222,7 @@ pub(crate) mod test {
// tasks // tasks
let tasks = dump.tasks().unwrap().collect::<Result<Vec<_>>>().unwrap(); let tasks = dump.tasks().unwrap().collect::<Result<Vec<_>>>().unwrap();
let (tasks, update_files): (Vec<_>, Vec<_>) = tasks.into_iter().unzip(); let (tasks, update_files): (Vec<_>, Vec<_>) = tasks.into_iter().unzip();
meili_snap::snapshot_hash!(meili_snap::json_string!(tasks), @"278f63325ef06ca04d01df98d8207b94"); meili_snap::snapshot_hash!(meili_snap::json_string!(tasks), @"2b8a72d6bc6ba79980491966437daaf9");
assert_eq!(update_files.len(), 10); assert_eq!(update_files.len(), 10);
assert!(update_files[0].is_none()); // the dump creation assert!(update_files[0].is_none()); // the dump creation
assert!(update_files[1].is_none()); assert!(update_files[1].is_none());
@ -345,7 +345,7 @@ pub(crate) mod test {
// tasks // tasks
let tasks = dump.tasks().unwrap().collect::<Result<Vec<_>>>().unwrap(); let tasks = dump.tasks().unwrap().collect::<Result<Vec<_>>>().unwrap();
let (tasks, update_files): (Vec<_>, Vec<_>) = tasks.into_iter().unzip(); let (tasks, update_files): (Vec<_>, Vec<_>) = tasks.into_iter().unzip();
meili_snap::snapshot_hash!(meili_snap::json_string!(tasks), @"d45cd8571703e58ae53c7bd7ce3f5c22"); meili_snap::snapshot_hash!(meili_snap::json_string!(tasks), @"3ddf6169b0a3703c5d770971f036fc5d");
assert_eq!(update_files.len(), 2); assert_eq!(update_files.len(), 2);
assert!(update_files[0].is_none()); // the dump creation assert!(update_files[0].is_none()); // the dump creation
assert!(update_files[1].is_none()); // the processed document addition assert!(update_files[1].is_none()); // the processed document addition
@ -391,7 +391,7 @@ pub(crate) mod test {
// tasks // tasks
let tasks = dump.tasks().unwrap().collect::<Result<Vec<_>>>().unwrap(); let tasks = dump.tasks().unwrap().collect::<Result<Vec<_>>>().unwrap();
let (tasks, update_files): (Vec<_>, Vec<_>) = tasks.into_iter().unzip(); let (tasks, update_files): (Vec<_>, Vec<_>) = tasks.into_iter().unzip();
meili_snap::snapshot_hash!(meili_snap::json_string!(tasks), @"41f91d3a94911b2735ec41b07540df5c"); meili_snap::snapshot_hash!(meili_snap::json_string!(tasks), @"4b03e23e740b27bfb9d2a1faffe512e2");
assert_eq!(update_files.len(), 22); assert_eq!(update_files.len(), 22);
assert!(update_files[0].is_none()); // the dump creation assert!(update_files[0].is_none()); // the dump creation
assert!(update_files[1].is_some()); // the enqueued document addition assert!(update_files[1].is_some()); // the enqueued document addition
@ -471,7 +471,7 @@ pub(crate) mod test {
// tasks // tasks
let tasks = dump.tasks().unwrap().collect::<Result<Vec<_>>>().unwrap(); let tasks = dump.tasks().unwrap().collect::<Result<Vec<_>>>().unwrap();
let (tasks, update_files): (Vec<_>, Vec<_>) = tasks.into_iter().unzip(); let (tasks, update_files): (Vec<_>, Vec<_>) = tasks.into_iter().unzip();
meili_snap::snapshot_hash!(meili_snap::json_string!(tasks), @"c2445ddd1785528b80f2ba534d3bd00c"); meili_snap::snapshot_hash!(meili_snap::json_string!(tasks), @"c1b06a5ca60d5805483c16c5b3ff61ef");
assert_eq!(update_files.len(), 10); assert_eq!(update_files.len(), 10);
assert!(update_files[0].is_some()); // the enqueued document addition assert!(update_files[0].is_some()); // the enqueued document addition
assert!(update_files[1..].iter().all(|u| u.is_none())); // everything already processed assert!(update_files[1..].iter().all(|u| u.is_none())); // everything already processed
@ -548,7 +548,7 @@ pub(crate) mod test {
// tasks // tasks
let tasks = dump.tasks().unwrap().collect::<Result<Vec<_>>>().unwrap(); let tasks = dump.tasks().unwrap().collect::<Result<Vec<_>>>().unwrap();
let (tasks, update_files): (Vec<_>, Vec<_>) = tasks.into_iter().unzip(); let (tasks, update_files): (Vec<_>, Vec<_>) = tasks.into_iter().unzip();
meili_snap::snapshot_hash!(meili_snap::json_string!(tasks), @"cd12efd308fe3ed226356a727ab42ed3"); meili_snap::snapshot_hash!(meili_snap::json_string!(tasks), @"0e203b6095f7c68dbdf788321dcc8215");
assert_eq!(update_files.len(), 10); assert_eq!(update_files.len(), 10);
assert!(update_files[0].is_some()); // the enqueued document addition assert!(update_files[0].is_some()); // the enqueued document addition
assert!(update_files[1..].iter().all(|u| u.is_none())); // everything already processed assert!(update_files[1..].iter().all(|u| u.is_none())); // everything already processed
@ -641,7 +641,7 @@ pub(crate) mod test {
// tasks // tasks
let tasks = dump.tasks().unwrap().collect::<Result<Vec<_>>>().unwrap(); let tasks = dump.tasks().unwrap().collect::<Result<Vec<_>>>().unwrap();
let (tasks, update_files): (Vec<_>, Vec<_>) = tasks.into_iter().unzip(); let (tasks, update_files): (Vec<_>, Vec<_>) = tasks.into_iter().unzip();
meili_snap::snapshot_hash!(meili_snap::json_string!(tasks), @"bc616290adfe7d09a624cf6065ca9069"); meili_snap::snapshot_hash!(meili_snap::json_string!(tasks), @"d216c7f90f538ffbb2a059531d7ac89a");
assert_eq!(update_files.len(), 9); assert_eq!(update_files.len(), 9);
assert!(update_files[0].is_some()); // the enqueued document addition assert!(update_files[0].is_some()); // the enqueued document addition
assert!(update_files[1..].iter().all(|u| u.is_none())); // everything already processed assert!(update_files[1..].iter().all(|u| u.is_none())); // everything already processed
@ -734,7 +734,7 @@ pub(crate) mod test {
// tasks // tasks
let tasks = dump.tasks().unwrap().collect::<Result<Vec<_>>>().unwrap(); let tasks = dump.tasks().unwrap().collect::<Result<Vec<_>>>().unwrap();
let (tasks, update_files): (Vec<_>, Vec<_>) = tasks.into_iter().unzip(); let (tasks, update_files): (Vec<_>, Vec<_>) = tasks.into_iter().unzip();
meili_snap::snapshot_hash!(meili_snap::json_string!(tasks), @"2db37756d8af1fb7623436b76e8956a6"); meili_snap::snapshot_hash!(meili_snap::json_string!(tasks), @"e27999f1112632222cb84f6cffff7c5f");
assert_eq!(update_files.len(), 8); assert_eq!(update_files.len(), 8);
assert!(update_files[0..].iter().all(|u| u.is_none())); // everything already processed assert!(update_files[0..].iter().all(|u| u.is_none())); // everything already processed
@ -810,7 +810,7 @@ pub(crate) mod test {
// tasks // tasks
let tasks = dump.tasks().unwrap().collect::<Result<Vec<_>>>().unwrap(); let tasks = dump.tasks().unwrap().collect::<Result<Vec<_>>>().unwrap();
let (tasks, update_files): (Vec<_>, Vec<_>) = tasks.into_iter().unzip(); let (tasks, update_files): (Vec<_>, Vec<_>) = tasks.into_iter().unzip();
meili_snap::snapshot_hash!(meili_snap::json_string!(tasks), @"8df6eab075a44b3c1af6b726f9fd9a43"); meili_snap::snapshot_hash!(meili_snap::json_string!(tasks), @"0155a664b0cf62aae23db5138b6b03d7");
assert_eq!(update_files.len(), 9); assert_eq!(update_files.len(), 9);
assert!(update_files[..].iter().all(|u| u.is_none())); // no update file in dump v1 assert!(update_files[..].iter().all(|u| u.is_none())); // no update file in dump v1

View File

@ -46,7 +46,7 @@ use uuid::Uuid;
use crate::autobatcher::{self, BatchKind}; use crate::autobatcher::{self, BatchKind};
use crate::utils::{self, swap_index_uid_in_task, ProcessingBatch}; use crate::utils::{self, swap_index_uid_in_task, ProcessingBatch};
use crate::{Error, IndexScheduler, MustStopProcessing, ProcessingTasks, Result, TaskId}; use crate::{Error, IndexScheduler, MustStopProcessing, Result, TaskId};
/// Represents a combination of tasks that can all be processed at the same time. /// Represents a combination of tasks that can all be processed at the same time.
/// ///
@ -58,10 +58,6 @@ pub(crate) enum Batch {
TaskCancelation { TaskCancelation {
/// The task cancelation itself. /// The task cancelation itself.
task: Task, task: Task,
/// The date and time at which the previously processing tasks started.
previous_started_at: OffsetDateTime,
/// The list of tasks that were processing when this task cancelation appeared.
previous_processing_tasks: RoaringBitmap,
}, },
TaskDeletions(Vec<Task>), TaskDeletions(Vec<Task>),
SnapshotCreation(Vec<Task>), SnapshotCreation(Vec<Task>),
@ -286,7 +282,7 @@ impl IndexScheduler {
match batch { match batch {
BatchKind::DocumentClear { ids } => Ok(Some(Batch::IndexOperation { BatchKind::DocumentClear { ids } => Ok(Some(Batch::IndexOperation {
op: IndexOperation::DocumentClear { op: IndexOperation::DocumentClear {
tasks: self.get_existing_tasks_with_processing_batch( tasks: self.get_existing_tasks_for_processing_batch(
rtxn, rtxn,
current_batch, current_batch,
ids, ids,
@ -312,7 +308,7 @@ impl IndexScheduler {
} }
} }
BatchKind::DocumentOperation { method, operation_ids, .. } => { BatchKind::DocumentOperation { method, operation_ids, .. } => {
let tasks = self.get_existing_tasks_with_processing_batch( let tasks = self.get_existing_tasks_for_processing_batch(
rtxn, rtxn,
current_batch, current_batch,
operation_ids, operation_ids,
@ -363,7 +359,7 @@ impl IndexScheduler {
})) }))
} }
BatchKind::DocumentDeletion { deletion_ids, includes_by_filter: _ } => { BatchKind::DocumentDeletion { deletion_ids, includes_by_filter: _ } => {
let tasks = self.get_existing_tasks_with_processing_batch( let tasks = self.get_existing_tasks_for_processing_batch(
rtxn, rtxn,
current_batch, current_batch,
deletion_ids, deletion_ids,
@ -375,7 +371,7 @@ impl IndexScheduler {
})) }))
} }
BatchKind::Settings { settings_ids, .. } => { BatchKind::Settings { settings_ids, .. } => {
let tasks = self.get_existing_tasks_with_processing_batch( let tasks = self.get_existing_tasks_for_processing_batch(
rtxn, rtxn,
current_batch, current_batch,
settings_ids, settings_ids,
@ -524,7 +520,7 @@ impl IndexScheduler {
BatchKind::IndexDeletion { ids } => Ok(Some(Batch::IndexDeletion { BatchKind::IndexDeletion { ids } => Ok(Some(Batch::IndexDeletion {
index_uid, index_uid,
index_has_been_created: must_create_index, index_has_been_created: must_create_index,
tasks: self.get_existing_tasks_with_processing_batch(rtxn, current_batch, ids)?, tasks: self.get_existing_tasks_for_processing_batch(rtxn, current_batch, ids)?,
})), })),
BatchKind::IndexSwap { id } => { BatchKind::IndexSwap { id } => {
let mut task = self.get_task(rtxn, id)?.ok_or(Error::CorruptedTaskQueue)?; let mut task = self.get_task(rtxn, id)?.ok_or(Error::CorruptedTaskQueue)?;
@ -556,25 +552,9 @@ impl IndexScheduler {
// 1. we get the last task to cancel. // 1. we get the last task to cancel.
if let Some(task_id) = to_cancel.max() { if let Some(task_id) = to_cancel.max() {
// We retrieve the tasks that were processing before this tasks cancelation started.
// We must *not* reset the processing tasks before calling this method.
// Displaying the `batch_id` would make a strange error message since this task cancelation is going to
// replace the canceled batch. It's better to avoid mentioning it in the error message.
let ProcessingTasks { batch: previous_batch, processing } =
&*self.processing_tasks.read().unwrap();
let mut task = self.get_task(rtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?; let mut task = self.get_task(rtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?;
current_batch.processing(Some(&mut task)); current_batch.processing(Some(&mut task));
return Ok(Some(( return Ok(Some((Batch::TaskCancelation { task }, current_batch)));
Batch::TaskCancelation {
task,
// We should never be in a case where we don't have a previous_batch, but let's not crash if it happens
previous_started_at: previous_batch
.as_ref()
.map_or_else(OffsetDateTime::now_utc, |batch| batch.started_at),
previous_processing_tasks: processing.clone(),
},
current_batch,
)));
} }
// 2. we get the next task to delete // 2. we get the next task to delete
@ -681,7 +661,7 @@ impl IndexScheduler {
} }
match batch { match batch {
Batch::TaskCancelation { mut task, previous_started_at, previous_processing_tasks } => { Batch::TaskCancelation { mut task } => {
// 1. Retrieve the tasks that matched the query at enqueue-time. // 1. Retrieve the tasks that matched the query at enqueue-time.
let matched_tasks = let matched_tasks =
if let KindWithContent::TaskCancelation { tasks, query: _ } = &task.kind { if let KindWithContent::TaskCancelation { tasks, query: _ } = &task.kind {

View File

@ -67,7 +67,7 @@ use roaring::RoaringBitmap;
use synchronoise::SignalEvent; use synchronoise::SignalEvent;
use time::format_description::well_known::Rfc3339; use time::format_description::well_known::Rfc3339;
use time::OffsetDateTime; use time::OffsetDateTime;
use utils::{filter_out_references_to_newer_tasks, keep_tasks_within_datetimes, map_bound}; use utils::{filter_out_references_to_newer_tasks, keep_ids_within_datetimes, map_bound};
use uuid::Uuid; use uuid::Uuid;
use crate::index_mapper::IndexMapper; use crate::index_mapper::IndexMapper;
@ -85,6 +85,8 @@ pub struct Query {
pub limit: Option<u32>, pub limit: Option<u32>,
/// The minimum [task id](`meilisearch_types::tasks::Task::uid`) to be matched /// The minimum [task id](`meilisearch_types::tasks::Task::uid`) to be matched
pub from: Option<u32>, pub from: Option<u32>,
/// The order used to return the tasks. By default the newest tasks are returned first and the boolean is `false`.
pub reverse: Option<bool>,
/// The [task ids](`meilisearch_types::tasks::Task::uid`) to be matched /// The [task ids](`meilisearch_types::tasks::Task::uid`) to be matched
pub uids: Option<Vec<TaskId>>, pub uids: Option<Vec<TaskId>>,
/// The [batch ids](`meilisearch_types::batches::Batch::uid`) to be matched /// The [batch ids](`meilisearch_types::batches::Batch::uid`) to be matched
@ -129,6 +131,7 @@ impl Query {
Query { Query {
limit: None, limit: None,
from: None, from: None,
reverse: None,
uids: None, uids: None,
batch_uids: None, batch_uids: None,
statuses: None, statuses: None,
@ -770,6 +773,7 @@ impl IndexScheduler {
let Query { let Query {
limit, limit,
from, from,
reverse,
uids, uids,
batch_uids, batch_uids,
statuses, statuses,
@ -787,13 +791,18 @@ impl IndexScheduler {
let mut tasks = self.all_task_ids(rtxn)?; let mut tasks = self.all_task_ids(rtxn)?;
if let Some(from) = from { if let Some(from) = from {
tasks.remove_range(from.saturating_add(1)..); let range = if reverse.unwrap_or_default() {
u32::MIN..*from
} else {
from.saturating_add(1)..u32::MAX
};
tasks.remove_range(range);
} }
if let Some(batch_uids) = batch_uids { if let Some(batch_uids) = batch_uids {
let mut batch_tasks = RoaringBitmap::new(); let mut batch_tasks = RoaringBitmap::new();
for batch_uid in batch_uids { for batch_uid in batch_uids {
if Some(*batch_uid) == processing_batch.as_ref().map(|batch| batch.uid) { if processing_batch.as_ref().map_or(false, |batch| batch.uid == *batch_uid) {
batch_tasks |= &processing_tasks; batch_tasks |= &processing_tasks;
} else { } else {
batch_tasks |= self.tasks_in_batch(rtxn, *batch_uid)?; batch_tasks |= self.tasks_in_batch(rtxn, *batch_uid)?;
@ -895,7 +904,7 @@ impl IndexScheduler {
), ),
}; };
keep_tasks_within_datetimes( keep_ids_within_datetimes(
rtxn, rtxn,
&mut filtered_non_processing_tasks, &mut filtered_non_processing_tasks,
self.started_at, self.started_at,
@ -905,7 +914,7 @@ impl IndexScheduler {
filtered_non_processing_tasks | filtered_processing_tasks filtered_non_processing_tasks | filtered_processing_tasks
}; };
keep_tasks_within_datetimes( keep_ids_within_datetimes(
rtxn, rtxn,
&mut tasks, &mut tasks,
self.enqueued_at, self.enqueued_at,
@ -913,7 +922,7 @@ impl IndexScheduler {
*before_enqueued_at, *before_enqueued_at,
)?; )?;
keep_tasks_within_datetimes( keep_ids_within_datetimes(
rtxn, rtxn,
&mut tasks, &mut tasks,
self.finished_at, self.finished_at,
@ -921,8 +930,12 @@ impl IndexScheduler {
*before_finished_at, *before_finished_at,
)?; )?;
if let Some(limit) = *limit { if let Some(limit) = limit {
tasks = tasks.into_iter().rev().take(limit as usize).collect(); tasks = if query.reverse.unwrap_or_default() {
tasks.into_iter().take(*limit as usize).collect()
} else {
tasks.into_iter().rev().take(*limit as usize).collect()
};
} }
Ok(tasks) Ok(tasks)
@ -935,22 +948,44 @@ impl IndexScheduler {
processing: &ProcessingTasks, processing: &ProcessingTasks,
query: &Query, query: &Query,
) -> Result<RoaringBitmap> { ) -> Result<RoaringBitmap> {
dbg!(); let Query {
limit,
from,
reverse,
uids,
batch_uids,
statuses,
types,
index_uids,
canceled_by,
before_enqueued_at,
after_enqueued_at,
before_started_at,
after_started_at,
before_finished_at,
after_finished_at,
} = query;
let mut batches = self.all_batch_ids(rtxn)?; let mut batches = self.all_batch_ids(rtxn)?;
if let Some(batch_id) = processing.batch.as_ref().map(|batch| batch.uid) { if let Some(batch_id) = processing.batch.as_ref().map(|batch| batch.uid) {
batches.insert(batch_id); batches.insert(batch_id);
} }
if let Some(from) = &query.from { if let Some(from) = from {
batches.remove_range(from.saturating_add(1)..); let range = if reverse.unwrap_or_default() {
u32::MIN..*from
} else {
from.saturating_add(1)..u32::MAX
};
batches.remove_range(range);
} }
if let Some(batch_uids) = &query.batch_uids { if let Some(batch_uids) = &batch_uids {
let batches_uids = RoaringBitmap::from_iter(batch_uids); let batches_uids = RoaringBitmap::from_iter(batch_uids);
batches &= batches_uids; batches &= batches_uids;
} }
if let Some(status) = &query.statuses { if let Some(status) = &statuses {
let mut status_batches = RoaringBitmap::new(); let mut status_batches = RoaringBitmap::new();
for status in status { for status in status {
match status { match status {
@ -973,7 +1008,7 @@ impl IndexScheduler {
batches &= status_batches; batches &= status_batches;
} }
if let Some(task_uids) = &query.uids { if let Some(task_uids) = &uids {
let mut batches_by_task_uids = RoaringBitmap::new(); let mut batches_by_task_uids = RoaringBitmap::new();
for task_uid in task_uids { for task_uid in task_uids {
if let Some(task) = self.get_task(rtxn, *task_uid)? { if let Some(task) = self.get_task(rtxn, *task_uid)? {
@ -986,7 +1021,7 @@ impl IndexScheduler {
} }
// There is no database for this query, we must retrieve the task queried by the client and ensure it's valid // There is no database for this query, we must retrieve the task queried by the client and ensure it's valid
if let Some(canceled_by) = &query.canceled_by { if let Some(canceled_by) = &canceled_by {
let mut all_canceled_batches = RoaringBitmap::new(); let mut all_canceled_batches = RoaringBitmap::new();
for cancel_uid in canceled_by { for cancel_uid in canceled_by {
if let Some(task) = self.get_task(rtxn, *cancel_uid)? { if let Some(task) = self.get_task(rtxn, *cancel_uid)? {
@ -1009,7 +1044,7 @@ impl IndexScheduler {
} }
} }
if let Some(kind) = &query.types { if let Some(kind) = &types {
let mut kind_batches = RoaringBitmap::new(); let mut kind_batches = RoaringBitmap::new();
for kind in kind { for kind in kind {
kind_batches |= self.get_batch_kind(rtxn, *kind)?; kind_batches |= self.get_batch_kind(rtxn, *kind)?;
@ -1024,7 +1059,7 @@ impl IndexScheduler {
batches &= &kind_batches; batches &= &kind_batches;
} }
if let Some(index) = &query.index_uids { if let Some(index) = &index_uids {
let mut index_batches = RoaringBitmap::new(); let mut index_batches = RoaringBitmap::new();
for index in index { for index in index {
index_batches |= self.index_batches(rtxn, index)?; index_batches |= self.index_batches(rtxn, index)?;
@ -1065,48 +1100,52 @@ impl IndexScheduler {
filtered_processing_batches.clear(); filtered_processing_batches.clear();
} }
}; };
match (query.after_started_at, query.before_started_at) { match (after_started_at, before_started_at) {
(None, None) => (), (None, None) => (),
(None, Some(before)) => { (None, Some(before)) => {
clear_filtered_processing_batches(Bound::Unbounded, Bound::Excluded(before)) clear_filtered_processing_batches(Bound::Unbounded, Bound::Excluded(*before))
} }
(Some(after), None) => { (Some(after), None) => {
clear_filtered_processing_batches(Bound::Excluded(after), Bound::Unbounded) clear_filtered_processing_batches(Bound::Excluded(*after), Bound::Unbounded)
} }
(Some(after), Some(before)) => clear_filtered_processing_batches( (Some(after), Some(before)) => clear_filtered_processing_batches(
Bound::Excluded(after), Bound::Excluded(*after),
Bound::Excluded(before), Bound::Excluded(*before),
), ),
}; };
keep_tasks_within_datetimes( keep_ids_within_datetimes(
rtxn, rtxn,
&mut filtered_non_processing_batches, &mut filtered_non_processing_batches,
self.batch_started_at, self.batch_started_at,
query.after_started_at, *after_started_at,
query.before_started_at, *before_started_at,
)?; )?;
filtered_non_processing_batches | filtered_processing_batches filtered_non_processing_batches | filtered_processing_batches
}; };
keep_tasks_within_datetimes( keep_ids_within_datetimes(
rtxn, rtxn,
&mut batches, &mut batches,
self.batch_enqueued_at, self.batch_enqueued_at,
query.after_enqueued_at, *after_enqueued_at,
query.before_enqueued_at, *before_enqueued_at,
)?; )?;
keep_tasks_within_datetimes( keep_ids_within_datetimes(
rtxn, rtxn,
&mut batches, &mut batches,
self.batch_finished_at, self.batch_finished_at,
query.after_finished_at, *after_finished_at,
query.before_finished_at, *before_finished_at,
)?; )?;
if let Some(limit) = query.limit { if let Some(limit) = limit {
batches = batches.into_iter().rev().take(limit as usize).collect(); batches = if query.reverse.unwrap_or_default() {
batches.into_iter().take(*limit as usize).collect()
} else {
batches.into_iter().rev().take(*limit as usize).collect()
};
} }
Ok(batches) Ok(batches)
@ -1305,10 +1344,13 @@ impl IndexScheduler {
let rtxn = self.env.read_txn()?; let rtxn = self.env.read_txn()?;
let (tasks, total) = self.get_task_ids_from_authorized_indexes(&rtxn, &query, filters)?; let (tasks, total) = self.get_task_ids_from_authorized_indexes(&rtxn, &query, filters)?;
let tasks = self.get_existing_tasks( let tasks = if query.reverse.unwrap_or_default() {
&rtxn, Box::new(tasks.into_iter()) as Box<dyn Iterator<Item = u32>>
tasks.into_iter().rev().take(query.limit.unwrap_or(u32::MAX) as usize), } else {
)?; Box::new(tasks.into_iter().rev()) as Box<dyn Iterator<Item = u32>>
};
let tasks =
self.get_existing_tasks(&rtxn, tasks.take(query.limit.unwrap_or(u32::MAX) as usize))?;
let ProcessingTasks { batch, processing } = let ProcessingTasks { batch, processing } =
self.processing_tasks.read().map_err(|_| Error::CorruptedTaskQueue)?.clone(); self.processing_tasks.read().map_err(|_| Error::CorruptedTaskQueue)?.clone();
@ -1357,11 +1399,16 @@ impl IndexScheduler {
let (batches, total) = let (batches, total) =
self.get_batch_ids_from_authorized_indexes(&rtxn, &processing, &query, filters)?; self.get_batch_ids_from_authorized_indexes(&rtxn, &processing, &query, filters)?;
let batches = if query.reverse.unwrap_or_default() {
Box::new(batches.into_iter()) as Box<dyn Iterator<Item = u32>>
} else {
Box::new(batches.into_iter().rev()) as Box<dyn Iterator<Item = u32>>
};
let batches = self.get_existing_batches( let batches = self.get_existing_batches(
&rtxn, &rtxn,
&processing, &processing,
batches.into_iter().rev().take(query.limit.unwrap_or(u32::MAX) as usize), batches.take(query.limit.unwrap_or(u32::MAX) as usize),
)?; )?;
Ok((batches, total)) Ok((batches, total))
@ -1541,7 +1588,7 @@ impl IndexScheduler {
drop(rtxn); drop(rtxn);
// 1. store the starting date with the bitmap of processing tasks. // 1. store the starting date with the bitmap of processing tasks.
let ids = batch.ids(); let mut ids = batch.ids();
let processed_tasks = ids.len(); let processed_tasks = ids.len();
// We reset the must_stop flag to be sure that we don't stop processing tasks // We reset the must_stop flag to be sure that we don't stop processing tasks
@ -1578,6 +1625,7 @@ impl IndexScheduler {
processing_batch.finished(); processing_batch.finished();
let mut wtxn = self.env.write_txn().map_err(Error::HeedTransaction)?; let mut wtxn = self.env.write_txn().map_err(Error::HeedTransaction)?;
let mut canceled = RoaringBitmap::new();
match res { match res {
Ok(tasks) => { Ok(tasks) => {
@ -1587,7 +1635,6 @@ impl IndexScheduler {
let mut success = 0; let mut success = 0;
let mut failure = 0; let mut failure = 0;
let mut canceled_by = None; let mut canceled_by = None;
let mut canceled = RoaringBitmap::new();
#[allow(unused_variables)] #[allow(unused_variables)]
for (i, mut task) in tasks.into_iter().enumerate() { for (i, mut task) in tasks.into_iter().enumerate() {
@ -1678,9 +1725,12 @@ impl IndexScheduler {
} }
} }
let processed = self.processing_tasks.write().unwrap().stop_processing(); self.processing_tasks.write().unwrap().stop_processing();
// We must re-add the canceled task so they're part of the same batch.
// processed.processing |= canceled;
ids |= canceled;
self.write_batch(&mut wtxn, processing_batch, &processed.processing)?; self.write_batch(&mut wtxn, processing_batch, &ids)?;
#[cfg(test)] #[cfg(test)]
self.maybe_fail(tests::FailureLocation::CommittingWtxn)?; self.maybe_fail(tests::FailureLocation::CommittingWtxn)?;
@ -1710,7 +1760,7 @@ impl IndexScheduler {
})?; })?;
// We shouldn't crash the tick function if we can't send data to the webhook. // We shouldn't crash the tick function if we can't send data to the webhook.
let _ = self.notify_webhook(&processed.processing); let _ = self.notify_webhook(&ids);
#[cfg(test)] #[cfg(test)]
self.breakpoint(Breakpoint::AfterProcessing); self.breakpoint(Breakpoint::AfterProcessing);
@ -4144,7 +4194,6 @@ mod tests {
tasks: [0, 1, 2, 3].into_iter().collect(), tasks: [0, 1, 2, 3].into_iter().collect(),
}; };
let task_cancelation = index_scheduler.register(kind, None, false).unwrap(); let task_cancelation = index_scheduler.register(kind, None, false).unwrap();
println!("HEEERE");
handle.advance_n_successful_batches(1); handle.advance_n_successful_batches(1);
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "start"); snapshot!(snapshot_index_scheduler(&index_scheduler), name: "start");

View File

@ -43,7 +43,7 @@ catto [0,]
0 {uid: 0, details: {"receivedDocuments":1,"indexedDocuments":0,"matchedTasks":1,"canceledTasks":1,"originalFilter":"test_query"}, stats: {"totalNbTasks":2,"status":{"succeeded":1,"canceled":1},"types":{"documentAdditionOrUpdate":1,"taskCancelation":1},"indexUids":{"catto":1}}, } 0 {uid: 0, details: {"receivedDocuments":1,"indexedDocuments":0,"matchedTasks":1,"canceledTasks":1,"originalFilter":"test_query"}, stats: {"totalNbTasks":2,"status":{"succeeded":1,"canceled":1},"types":{"documentAdditionOrUpdate":1,"taskCancelation":1},"indexUids":{"catto":1}}, }
---------------------------------------------------------------------- ----------------------------------------------------------------------
### Batch to tasks mapping: ### Batch to tasks mapping:
0 [1,] 0 [0,1,]
---------------------------------------------------------------------- ----------------------------------------------------------------------
### Batches Status: ### Batches Status:
succeeded [0,] succeeded [0,]
@ -67,6 +67,5 @@ catto [0,]
[timestamp] [0,] [timestamp] [0,]
---------------------------------------------------------------------- ----------------------------------------------------------------------
### File Store: ### File Store:
00000000-0000-0000-0000-000000000000
---------------------------------------------------------------------- ----------------------------------------------------------------------

View File

@ -55,7 +55,7 @@ catto: { number_of_documents: 1, field_distribution: {"id": 1} }
---------------------------------------------------------------------- ----------------------------------------------------------------------
### Batch to tasks mapping: ### Batch to tasks mapping:
0 [0,] 0 [0,]
1 [3,] 1 [1,2,3,]
---------------------------------------------------------------------- ----------------------------------------------------------------------
### Batches Status: ### Batches Status:
succeeded [0,1,] succeeded [0,1,]
@ -84,7 +84,5 @@ wolfo [1,]
[timestamp] [1,] [timestamp] [1,]
---------------------------------------------------------------------- ----------------------------------------------------------------------
### File Store: ### File Store:
00000000-0000-0000-0000-000000000001
00000000-0000-0000-0000-000000000002
---------------------------------------------------------------------- ----------------------------------------------------------------------

View File

@ -42,7 +42,7 @@ canceled [0,]
0 {uid: 0, details: {"matchedTasks":1,"canceledTasks":1,"originalFilter":"cancel dump"}, stats: {"totalNbTasks":2,"status":{"succeeded":1,"canceled":1},"types":{"taskCancelation":1,"dumpCreation":1},"indexUids":{}}, } 0 {uid: 0, details: {"matchedTasks":1,"canceledTasks":1,"originalFilter":"cancel dump"}, stats: {"totalNbTasks":2,"status":{"succeeded":1,"canceled":1},"types":{"taskCancelation":1,"dumpCreation":1},"indexUids":{}}, }
---------------------------------------------------------------------- ----------------------------------------------------------------------
### Batch to tasks mapping: ### Batch to tasks mapping:
0 [1,] 0 [0,1,]
---------------------------------------------------------------------- ----------------------------------------------------------------------
### Batches Status: ### Batches Status:
succeeded [0,] succeeded [0,]

View File

@ -44,7 +44,7 @@ catto: { number_of_documents: 0, field_distribution: {} }
0 {uid: 0, details: {"receivedDocuments":1,"indexedDocuments":0,"matchedTasks":1,"canceledTasks":1,"originalFilter":"test_query"}, stats: {"totalNbTasks":2,"status":{"succeeded":1,"canceled":1},"types":{"documentAdditionOrUpdate":1,"taskCancelation":1},"indexUids":{"catto":1}}, } 0 {uid: 0, details: {"receivedDocuments":1,"indexedDocuments":0,"matchedTasks":1,"canceledTasks":1,"originalFilter":"test_query"}, stats: {"totalNbTasks":2,"status":{"succeeded":1,"canceled":1},"types":{"documentAdditionOrUpdate":1,"taskCancelation":1},"indexUids":{"catto":1}}, }
---------------------------------------------------------------------- ----------------------------------------------------------------------
### Batch to tasks mapping: ### Batch to tasks mapping:
0 [1,] 0 [0,1,]
---------------------------------------------------------------------- ----------------------------------------------------------------------
### Batches Status: ### Batches Status:
succeeded [0,] succeeded [0,]
@ -68,6 +68,5 @@ catto [0,]
[timestamp] [0,] [timestamp] [0,]
---------------------------------------------------------------------- ----------------------------------------------------------------------
### File Store: ### File Store:
00000000-0000-0000-0000-000000000000
---------------------------------------------------------------------- ----------------------------------------------------------------------

View File

@ -54,7 +54,7 @@ catto: { number_of_documents: 0, field_distribution: {} }
---------------------------------------------------------------------- ----------------------------------------------------------------------
### Batch to tasks mapping: ### Batch to tasks mapping:
0 [0,] 0 [0,]
1 [3,] 1 [1,2,3,]
---------------------------------------------------------------------- ----------------------------------------------------------------------
### Batches Status: ### Batches Status:
succeeded [0,1,] succeeded [0,1,]

View File

@ -54,7 +54,7 @@ catto: { number_of_documents: 0, field_distribution: {} }
---------------------------------------------------------------------- ----------------------------------------------------------------------
### Batch to tasks mapping: ### Batch to tasks mapping:
0 [0,] 0 [0,]
1 [3,] 1 [1,2,3,]
---------------------------------------------------------------------- ----------------------------------------------------------------------
### Batches Status: ### Batches Status:
succeeded [0,1,] succeeded [0,1,]

View File

@ -211,9 +211,10 @@ impl IndexScheduler {
Ok(()) Ok(())
} }
/// Convert an iterator to a `Vec` of tasks. The tasks MUST exist or a /// Convert an iterator to a `Vec` of tasks and edit the `ProcessingBatch` to add the given tasks.
/// `CorruptedTaskQueue` error will be throwed. ///
pub(crate) fn get_existing_tasks_with_processing_batch( /// The tasks MUST exist, or a `CorruptedTaskQueue` error will be thrown.
pub(crate) fn get_existing_tasks_for_processing_batch(
&self, &self,
rtxn: &RoTxn, rtxn: &RoTxn,
processing_batch: &mut ProcessingBatch, processing_batch: &mut ProcessingBatch,
@ -232,7 +233,7 @@ impl IndexScheduler {
} }
/// Convert an iterator to a `Vec` of tasks. The tasks MUST exist or a /// Convert an iterator to a `Vec` of tasks. The tasks MUST exist or a
/// `CorruptedTaskQueue` error will be throwed. /// `CorruptedTaskQueue` error will be thrown.
pub(crate) fn get_existing_tasks( pub(crate) fn get_existing_tasks(
&self, &self,
rtxn: &RoTxn, rtxn: &RoTxn,
@ -247,7 +248,7 @@ impl IndexScheduler {
} }
/// Convert an iterator to a `Vec` of batches. The batches MUST exist or a /// Convert an iterator to a `Vec` of batches. The batches MUST exist or a
/// `CorruptedTaskQueue` error will be throwed. /// `CorruptedTaskQueue` error will be thrown.
pub(crate) fn get_existing_batches( pub(crate) fn get_existing_batches(
&self, &self,
rtxn: &RoTxn, rtxn: &RoTxn,
@ -270,16 +271,10 @@ impl IndexScheduler {
pub(crate) fn update_task(&self, wtxn: &mut RwTxn, task: &Task) -> Result<()> { pub(crate) fn update_task(&self, wtxn: &mut RwTxn, task: &Task) -> Result<()> {
let old_task = self.get_task(wtxn, task.uid)?.ok_or(Error::CorruptedTaskQueue)?; let old_task = self.get_task(wtxn, task.uid)?.ok_or(Error::CorruptedTaskQueue)?;
dbg!(&task); debug_assert!(old_task != *task);
debug_assert_eq!(old_task.uid, task.uid); debug_assert_eq!(old_task.uid, task.uid);
debug_assert!(old_task.batch_uid.is_none() && task.batch_uid.is_some()); debug_assert!(old_task.batch_uid.is_none() && task.batch_uid.is_some());
// TODO: This shouldn't ever happen, we should assert it
if old_task == *task {
return Ok(());
}
if old_task.status != task.status { if old_task.status != task.status {
self.update_status(wtxn, old_task.status, |bitmap| { self.update_status(wtxn, old_task.status, |bitmap| {
bitmap.remove(task.uid); bitmap.remove(task.uid);
@ -505,10 +500,9 @@ pub(crate) fn remove_task_datetime(
Ok(()) Ok(())
} }
// TODO: Rename the function since it also applies to batches pub(crate) fn keep_ids_within_datetimes(
pub(crate) fn keep_tasks_within_datetimes(
rtxn: &RoTxn, rtxn: &RoTxn,
tasks: &mut RoaringBitmap, ids: &mut RoaringBitmap,
database: Database<BEI128, CboRoaringBitmapCodec>, database: Database<BEI128, CboRoaringBitmapCodec>,
after: Option<OffsetDateTime>, after: Option<OffsetDateTime>,
before: Option<OffsetDateTime>, before: Option<OffsetDateTime>,
@ -519,15 +513,15 @@ pub(crate) fn keep_tasks_within_datetimes(
(Some(after), None) => (Bound::Excluded(*after), Bound::Unbounded), (Some(after), None) => (Bound::Excluded(*after), Bound::Unbounded),
(Some(after), Some(before)) => (Bound::Excluded(*after), Bound::Excluded(*before)), (Some(after), Some(before)) => (Bound::Excluded(*after), Bound::Excluded(*before)),
}; };
let mut collected_task_ids = RoaringBitmap::new(); let mut collected_ids = RoaringBitmap::new();
let start = map_bound(start, |b| b.unix_timestamp_nanos()); let start = map_bound(start, |b| b.unix_timestamp_nanos());
let end = map_bound(end, |b| b.unix_timestamp_nanos()); let end = map_bound(end, |b| b.unix_timestamp_nanos());
let iter = database.range(rtxn, &(start, end))?; let iter = database.range(rtxn, &(start, end))?;
for r in iter { for r in iter {
let (_timestamp, task_ids) = r?; let (_timestamp, ids) = r?;
collected_task_ids |= task_ids; collected_ids |= ids;
} }
*tasks &= collected_task_ids; *ids &= collected_ids;
Ok(()) Ok(())
} }

View File

@ -318,6 +318,7 @@ InvalidTaskBeforeStartedAt , InvalidRequest , BAD_REQUEST ;
InvalidTaskCanceledBy , InvalidRequest , BAD_REQUEST ; InvalidTaskCanceledBy , InvalidRequest , BAD_REQUEST ;
InvalidTaskFrom , InvalidRequest , BAD_REQUEST ; InvalidTaskFrom , InvalidRequest , BAD_REQUEST ;
InvalidTaskLimit , InvalidRequest , BAD_REQUEST ; InvalidTaskLimit , InvalidRequest , BAD_REQUEST ;
InvalidTaskReverse , InvalidRequest , BAD_REQUEST ;
InvalidTaskStatuses , InvalidRequest , BAD_REQUEST ; InvalidTaskStatuses , InvalidRequest , BAD_REQUEST ;
InvalidTaskTypes , InvalidRequest , BAD_REQUEST ; InvalidTaskTypes , InvalidRequest , BAD_REQUEST ;
InvalidTaskUids , InvalidRequest , BAD_REQUEST ; InvalidTaskUids , InvalidRequest , BAD_REQUEST ;

View File

@ -42,6 +42,8 @@ pub struct TasksFilterQuery {
pub limit: Param<u32>, pub limit: Param<u32>,
#[deserr(default, error = DeserrQueryParamError<InvalidTaskFrom>)] #[deserr(default, error = DeserrQueryParamError<InvalidTaskFrom>)]
pub from: Option<Param<TaskId>>, pub from: Option<Param<TaskId>>,
#[deserr(default, error = DeserrQueryParamError<InvalidTaskReverse>)]
pub reverse: Option<Param<bool>>,
#[deserr(default, error = DeserrQueryParamError<InvalidBatchUids>)] #[deserr(default, error = DeserrQueryParamError<InvalidBatchUids>)]
pub batch_uids: OptionStarOrList<BatchId>, pub batch_uids: OptionStarOrList<BatchId>,
@ -76,6 +78,7 @@ impl TasksFilterQuery {
Query { Query {
limit: Some(self.limit.0), limit: Some(self.limit.0),
from: self.from.as_deref().copied(), from: self.from.as_deref().copied(),
reverse: self.reverse.as_deref().copied(),
batch_uids: self.batch_uids.merge_star_and_none(), batch_uids: self.batch_uids.merge_star_and_none(),
statuses: self.statuses.merge_star_and_none(), statuses: self.statuses.merge_star_and_none(),
types: self.types.merge_star_and_none(), types: self.types.merge_star_and_none(),
@ -149,6 +152,7 @@ impl TaskDeletionOrCancelationQuery {
Query { Query {
limit: None, limit: None,
from: None, from: None,
reverse: None,
batch_uids: self.batch_uids.merge_star_and_none(), batch_uids: self.batch_uids.merge_star_and_none(),
statuses: self.statuses.merge_star_and_none(), statuses: self.statuses.merge_star_and_none(),
types: self.types.merge_star_and_none(), types: self.types.merge_star_and_none(),
@ -709,14 +713,14 @@ mod tests {
{ {
let params = "from=12&limit=15&indexUids=toto,tata-78&statuses=succeeded,enqueued&afterEnqueuedAt=2012-04-23&uids=1,2,3"; let params = "from=12&limit=15&indexUids=toto,tata-78&statuses=succeeded,enqueued&afterEnqueuedAt=2012-04-23&uids=1,2,3";
let query = deserr_query_params::<TasksFilterQuery>(params).unwrap(); let query = deserr_query_params::<TasksFilterQuery>(params).unwrap();
snapshot!(format!("{:?}", query), @r###"TasksFilterQuery { limit: Param(15), from: Some(Param(12)), batch_uids: None, uids: List([1, 2, 3]), canceled_by: None, types: None, statuses: List([Succeeded, Enqueued]), index_uids: List([IndexUid("toto"), IndexUid("tata-78")]), after_enqueued_at: Other(2012-04-24 0:00:00.0 +00:00:00), before_enqueued_at: None, after_started_at: None, before_started_at: None, after_finished_at: None, before_finished_at: None }"###); snapshot!(format!("{:?}", query), @r###"TasksFilterQuery { limit: Param(15), from: Some(Param(12)), reverse: None, batch_uids: None, uids: List([1, 2, 3]), canceled_by: None, types: None, statuses: List([Succeeded, Enqueued]), index_uids: List([IndexUid("toto"), IndexUid("tata-78")]), after_enqueued_at: Other(2012-04-24 0:00:00.0 +00:00:00), before_enqueued_at: None, after_started_at: None, before_started_at: None, after_finished_at: None, before_finished_at: None }"###);
} }
{ {
// Stars should translate to `None` in the query // Stars should translate to `None` in the query
// Verify value of the default limit // Verify value of the default limit
let params = "indexUids=*&statuses=succeeded,*&afterEnqueuedAt=2012-04-23&uids=1,2,3"; let params = "indexUids=*&statuses=succeeded,*&afterEnqueuedAt=2012-04-23&uids=1,2,3";
let query = deserr_query_params::<TasksFilterQuery>(params).unwrap(); let query = deserr_query_params::<TasksFilterQuery>(params).unwrap();
snapshot!(format!("{:?}", query), @"TasksFilterQuery { limit: Param(20), from: None, batch_uids: None, uids: List([1, 2, 3]), canceled_by: None, types: None, statuses: Star, index_uids: Star, after_enqueued_at: Other(2012-04-24 0:00:00.0 +00:00:00), before_enqueued_at: None, after_started_at: None, before_started_at: None, after_finished_at: None, before_finished_at: None }"); snapshot!(format!("{:?}", query), @"TasksFilterQuery { limit: Param(20), from: None, reverse: None, batch_uids: None, uids: List([1, 2, 3]), canceled_by: None, types: None, statuses: Star, index_uids: Star, after_enqueued_at: Other(2012-04-24 0:00:00.0 +00:00:00), before_enqueued_at: None, after_started_at: None, before_started_at: None, after_finished_at: None, before_finished_at: None }");
} }
{ {
// Stars should also translate to `None` in task deletion/cancelation queries // Stars should also translate to `None` in task deletion/cancelation queries

View File

@ -1733,7 +1733,10 @@ fn format_fields(
// select the attributes to retrieve // select the attributes to retrieve
let displayable_names = let displayable_names =
displayable_ids.iter().map(|&fid| field_ids_map.name(fid).expect("Missing field name")); displayable_ids.iter().map(|&fid| field_ids_map.name(fid).expect("Missing field name"));
permissive_json_pointer::map_leaf_values(&mut document, displayable_names, |key, value| { permissive_json_pointer::map_leaf_values(
&mut document,
displayable_names,
|key, array_indices, value| {
// To get the formatting option of each key we need to see all the rules that applies // To get the formatting option of each key we need to see all the rules that applies
// to the value and merge them together. eg. If a user said he wanted to highlight `doggo` // to the value and merge them together. eg. If a user said he wanted to highlight `doggo`
// and crop `doggo.name`. `doggo.name` needs to be highlighted + cropped while `doggo.age` is only // and crop `doggo.name`. `doggo.name` needs to be highlighted + cropped while `doggo.age` is only
@ -1764,6 +1767,7 @@ fn format_fields(
format, format,
&mut infos, &mut infos,
compute_matches, compute_matches,
array_indices,
locales, locales,
); );
@ -1772,7 +1776,8 @@ fn format_fields(
matches.insert(key.to_owned(), infos); matches.insert(key.to_owned(), infos);
} }
} }
}); },
);
let selectors = formatted_options let selectors = formatted_options
.keys() .keys()
@ -1790,13 +1795,14 @@ fn format_value(
format_options: Option<FormatOptions>, format_options: Option<FormatOptions>,
infos: &mut Vec<MatchBounds>, infos: &mut Vec<MatchBounds>,
compute_matches: bool, compute_matches: bool,
array_indices: &[usize],
locales: Option<&[Language]>, locales: Option<&[Language]>,
) -> Value { ) -> Value {
match value { match value {
Value::String(old_string) => { Value::String(old_string) => {
let mut matcher = builder.build(&old_string, locales); let mut matcher = builder.build(&old_string, locales);
if compute_matches { if compute_matches {
let matches = matcher.matches(); let matches = matcher.matches(array_indices);
infos.extend_from_slice(&matches[..]); infos.extend_from_slice(&matches[..]);
} }
@ -1808,51 +1814,15 @@ fn format_value(
None => Value::String(old_string), None => Value::String(old_string),
} }
} }
Value::Array(values) => Value::Array( // `map_leaf_values` makes sure this is only called for leaf fields
values Value::Array(_) => unreachable!(),
.into_iter() Value::Object(_) => unreachable!(),
.map(|v| {
format_value(
v,
builder,
format_options.map(|format_options| FormatOptions {
highlight: format_options.highlight,
crop: None,
}),
infos,
compute_matches,
locales,
)
})
.collect(),
),
Value::Object(object) => Value::Object(
object
.into_iter()
.map(|(k, v)| {
(
k,
format_value(
v,
builder,
format_options.map(|format_options| FormatOptions {
highlight: format_options.highlight,
crop: None,
}),
infos,
compute_matches,
locales,
),
)
})
.collect(),
),
Value::Number(number) => { Value::Number(number) => {
let s = number.to_string(); let s = number.to_string();
let mut matcher = builder.build(&s, locales); let mut matcher = builder.build(&s, locales);
if compute_matches { if compute_matches {
let matches = matcher.matches(); let matches = matcher.matches(array_indices);
infos.extend_from_slice(&matches[..]); infos.extend_from_slice(&matches[..]);
} }

View File

@ -114,6 +114,33 @@ async fn batch_bad_from() {
"#); "#);
} }
#[actix_rt::test]
async fn bask_bad_reverse() {
let server = Server::new_shared();
let (response, code) = server.batches_filter("reverse=doggo").await;
snapshot!(code, @"400 Bad Request");
snapshot!(response, @r###"
{
"message": "Invalid value in parameter `reverse`: could not parse `doggo` as a boolean, expected either `true` or `false`",
"code": "invalid_task_reverse",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#invalid_task_reverse"
}
"###);
let (response, code) = server.batches_filter("reverse=*").await;
snapshot!(code, @"400 Bad Request");
snapshot!(response, @r###"
{
"message": "Invalid value in parameter `reverse`: could not parse `*` as a boolean, expected either `true` or `false`",
"code": "invalid_task_reverse",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#invalid_task_reverse"
}
"###);
}
#[actix_rt::test] #[actix_rt::test]
async fn batch_bad_after_enqueued_at() { async fn batch_bad_after_enqueued_at() {
let server = Server::new_shared(); let server = Server::new_shared();

View File

@ -49,6 +49,44 @@ async fn list_batches() {
assert_eq!(response["results"].as_array().unwrap().len(), 2); assert_eq!(response["results"].as_array().unwrap().len(), 2);
} }
#[actix_rt::test]
async fn list_batches_pagination_and_reverse() {
let server = Server::new().await;
// First of all we want to create a lot of batches very quickly. The fastest way is to delete a lot of unexisting indexes
let mut last_batch = None;
for i in 0..10 {
let index = server.index(format!("test-{i}"));
last_batch = Some(index.create(None).await.0.uid());
}
server.wait_task(last_batch.unwrap()).await;
let (response, code) = server.batches_filter("limit=3").await;
assert_eq!(code, 200);
let results = response["results"].as_array().unwrap();
let batch_ids: Vec<_> = results.iter().map(|ret| ret["uid"].as_u64().unwrap()).collect();
snapshot!(format!("{batch_ids:?}"), @"[9, 8, 7]");
let (response, code) = server.batches_filter("limit=3&from=1").await;
assert_eq!(code, 200);
let results = response["results"].as_array().unwrap();
let batch_ids: Vec<_> = results.iter().map(|ret| ret["uid"].as_u64().unwrap()).collect();
snapshot!(format!("{batch_ids:?}"), @"[1, 0]");
// In reversed order
let (response, code) = server.batches_filter("limit=3&reverse=true").await;
assert_eq!(code, 200);
let results = response["results"].as_array().unwrap();
let batch_ids: Vec<_> = results.iter().map(|ret| ret["uid"].as_u64().unwrap()).collect();
snapshot!(format!("{batch_ids:?}"), @"[0, 1, 2]");
let (response, code) = server.batches_filter("limit=3&from=8&reverse=true").await;
assert_eq!(code, 200);
let results = response["results"].as_array().unwrap();
let batch_ids: Vec<_> = results.iter().map(|ret| ret["uid"].as_u64().unwrap()).collect();
snapshot!(format!("{batch_ids:?}"), @"[8, 9]");
}
#[actix_rt::test] #[actix_rt::test]
async fn list_batches_with_star_filters() { async fn list_batches_with_star_filters() {
let server = Server::new().await; let server = Server::new().await;
@ -186,14 +224,14 @@ async fn get_batch_filter_error() {
let (response, code) = server.batches_filter("lol=pied").await; let (response, code) = server.batches_filter("lol=pied").await;
assert_eq!(code, 400, "{}", response); assert_eq!(code, 400, "{}", response);
meili_snap::snapshot!(meili_snap::json_string!(response), @r###" meili_snap::snapshot!(meili_snap::json_string!(response), @r#"
{ {
"message": "Unknown parameter `lol`: expected one of `limit`, `from`, `batchUids`, `uids`, `canceledBy`, `types`, `statuses`, `indexUids`, `afterEnqueuedAt`, `beforeEnqueuedAt`, `afterStartedAt`, `beforeStartedAt`, `afterFinishedAt`, `beforeFinishedAt`", "message": "Unknown parameter `lol`: expected one of `limit`, `from`, `reverse`, `batchUids`, `uids`, `canceledBy`, `types`, `statuses`, `indexUids`, `afterEnqueuedAt`, `beforeEnqueuedAt`, `afterStartedAt`, `beforeStartedAt`, `afterFinishedAt`, `beforeFinishedAt`",
"code": "bad_request", "code": "bad_request",
"type": "invalid_request", "type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#bad_request" "link": "https://docs.meilisearch.com/errors#bad_request"
} }
"###); "#);
let (response, code) = server.batches_filter("uids=pied").await; let (response, code) = server.batches_filter("uids=pied").await;
assert_eq!(code, 400, "{}", response); assert_eq!(code, 400, "{}", response);

View File

@ -208,7 +208,10 @@ async fn format_nested() {
"doggos.name": [ "doggos.name": [
{ {
"start": 0, "start": 0,
"length": 5 "length": 5,
"indices": [
0
]
} }
] ]
} }

View File

@ -65,7 +65,6 @@ async fn perform_snapshot() {
let next_task = task.uid() + 1; let next_task = task.uid() + 1;
loop { loop {
let (value, code) = index.get_task(next_task).await; let (value, code) = index.get_task(next_task).await;
dbg!(&value);
if code != 404 && value["status"].as_str() == Some("succeeded") { if code != 404 && value["status"].as_str() == Some("succeeded") {
break; break;
} }

View File

@ -279,6 +279,55 @@ async fn task_bad_from() {
"###); "###);
} }
#[actix_rt::test]
async fn task_bad_reverse() {
let server = Server::new_shared();
let (response, code) = server.tasks_filter("reverse=doggo").await;
snapshot!(code, @"400 Bad Request");
snapshot!(response, @r###"
{
"message": "Invalid value in parameter `reverse`: could not parse `doggo` as a boolean, expected either `true` or `false`",
"code": "invalid_task_reverse",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#invalid_task_reverse"
}
"###);
let (response, code) = server.tasks_filter("reverse=*").await;
snapshot!(code, @"400 Bad Request");
snapshot!(response, @r###"
{
"message": "Invalid value in parameter `reverse`: could not parse `*` as a boolean, expected either `true` or `false`",
"code": "invalid_task_reverse",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#invalid_task_reverse"
}
"###);
let (response, code) = server.cancel_tasks("reverse=doggo").await;
snapshot!(code, @"400 Bad Request");
snapshot!(response, @r#"
{
"message": "Unknown parameter `reverse`: expected one of `uids`, `batchUids`, `canceledBy`, `types`, `statuses`, `indexUids`, `afterEnqueuedAt`, `beforeEnqueuedAt`, `afterStartedAt`, `beforeStartedAt`, `afterFinishedAt`, `beforeFinishedAt`",
"code": "bad_request",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#bad_request"
}
"#);
let (response, code) = server.delete_tasks("reverse=doggo").await;
snapshot!(code, @"400 Bad Request");
snapshot!(response, @r#"
{
"message": "Unknown parameter `reverse`: expected one of `uids`, `batchUids`, `canceledBy`, `types`, `statuses`, `indexUids`, `afterEnqueuedAt`, `beforeEnqueuedAt`, `afterStartedAt`, `beforeStartedAt`, `afterFinishedAt`, `beforeFinishedAt`",
"code": "bad_request",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#bad_request"
}
"#);
}
#[actix_rt::test] #[actix_rt::test]
async fn task_bad_after_enqueued_at() { async fn task_bad_after_enqueued_at() {
let server = Server::new_shared(); let server = Server::new_shared();

View File

@ -62,6 +62,44 @@ async fn list_tasks() {
assert_eq!(response["results"].as_array().unwrap().len(), 2); assert_eq!(response["results"].as_array().unwrap().len(), 2);
} }
#[actix_rt::test]
async fn list_tasks_pagination_and_reverse() {
let server = Server::new().await;
// First of all we want to create a lot of tasks very quickly. The fastest way is to delete a lot of unexisting indexes
let mut last_task = None;
for i in 0..10 {
let index = server.index(format!("test-{i}"));
last_task = Some(index.create(None).await.0.uid());
}
server.wait_task(last_task.unwrap()).await;
let (response, code) = server.tasks_filter("limit=3").await;
assert_eq!(code, 200);
let results = response["results"].as_array().unwrap();
let task_ids: Vec<_> = results.iter().map(|ret| ret["uid"].as_u64().unwrap()).collect();
snapshot!(format!("{task_ids:?}"), @"[9, 8, 7]");
let (response, code) = server.tasks_filter("limit=3&from=1").await;
assert_eq!(code, 200);
let results = response["results"].as_array().unwrap();
let task_ids: Vec<_> = results.iter().map(|ret| ret["uid"].as_u64().unwrap()).collect();
snapshot!(format!("{task_ids:?}"), @"[1, 0]");
// In reversed order
let (response, code) = server.tasks_filter("limit=3&reverse=true").await;
assert_eq!(code, 200);
let results = response["results"].as_array().unwrap();
let task_ids: Vec<_> = results.iter().map(|ret| ret["uid"].as_u64().unwrap()).collect();
snapshot!(format!("{task_ids:?}"), @"[0, 1, 2]");
let (response, code) = server.tasks_filter("limit=3&from=8&reverse=true").await;
assert_eq!(code, 200);
let results = response["results"].as_array().unwrap();
let task_ids: Vec<_> = results.iter().map(|ret| ret["uid"].as_u64().unwrap()).collect();
snapshot!(format!("{task_ids:?}"), @"[8, 9]");
}
#[actix_rt::test] #[actix_rt::test]
async fn list_tasks_with_star_filters() { async fn list_tasks_with_star_filters() {
let server = Server::new().await; let server = Server::new().await;
@ -193,131 +231,6 @@ async fn list_tasks_status_and_type_filtered() {
assert_eq!(response["results"].as_array().unwrap().len(), 2); assert_eq!(response["results"].as_array().unwrap().len(), 2);
} }
#[actix_rt::test]
async fn get_task_filter_error() {
let server = Server::new().await;
let (response, code) = server.tasks_filter("lol=pied").await;
assert_eq!(code, 400, "{}", response);
meili_snap::snapshot!(meili_snap::json_string!(response), @r###"
{
"message": "Unknown parameter `lol`: expected one of `limit`, `from`, `batchUids`, `uids`, `canceledBy`, `types`, `statuses`, `indexUids`, `afterEnqueuedAt`, `beforeEnqueuedAt`, `afterStartedAt`, `beforeStartedAt`, `afterFinishedAt`, `beforeFinishedAt`",
"code": "bad_request",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#bad_request"
}
"###);
let (response, code) = server.tasks_filter("uids=pied").await;
assert_eq!(code, 400, "{}", response);
meili_snap::snapshot!(meili_snap::json_string!(response), @r###"
{
"message": "Invalid value in parameter `uids`: could not parse `pied` as a positive integer",
"code": "invalid_task_uids",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#invalid_task_uids"
}
"###);
let (response, code) = server.tasks_filter("from=pied").await;
assert_eq!(code, 400, "{}", response);
meili_snap::snapshot!(meili_snap::json_string!(response), @r###"
{
"message": "Invalid value in parameter `from`: could not parse `pied` as a positive integer",
"code": "invalid_task_from",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#invalid_task_from"
}
"###);
let (response, code) = server.tasks_filter("beforeStartedAt=pied").await;
assert_eq!(code, 400, "{}", response);
meili_snap::snapshot!(meili_snap::json_string!(response), @r###"
{
"message": "Invalid value in parameter `beforeStartedAt`: `pied` is an invalid date-time. It should follow the YYYY-MM-DD or RFC 3339 date-time format.",
"code": "invalid_task_before_started_at",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#invalid_task_before_started_at"
}
"###);
}
#[actix_rt::test]
async fn delete_task_filter_error() {
let server = Server::new().await;
let (response, code) = server.delete_tasks("").await;
assert_eq!(code, 400, "{}", response);
meili_snap::snapshot!(meili_snap::json_string!(response), @r###"
{
"message": "Query parameters to filter the tasks to delete are missing. Available query parameters are: `uids`, `indexUids`, `statuses`, `types`, `canceledBy`, `beforeEnqueuedAt`, `afterEnqueuedAt`, `beforeStartedAt`, `afterStartedAt`, `beforeFinishedAt`, `afterFinishedAt`.",
"code": "missing_task_filters",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#missing_task_filters"
}
"###);
let (response, code) = server.delete_tasks("lol=pied").await;
assert_eq!(code, 400, "{}", response);
meili_snap::snapshot!(meili_snap::json_string!(response), @r###"
{
"message": "Unknown parameter `lol`: expected one of `uids`, `batchUids`, `canceledBy`, `types`, `statuses`, `indexUids`, `afterEnqueuedAt`, `beforeEnqueuedAt`, `afterStartedAt`, `beforeStartedAt`, `afterFinishedAt`, `beforeFinishedAt`",
"code": "bad_request",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#bad_request"
}
"###);
let (response, code) = server.delete_tasks("uids=pied").await;
assert_eq!(code, 400, "{}", response);
meili_snap::snapshot!(meili_snap::json_string!(response), @r###"
{
"message": "Invalid value in parameter `uids`: could not parse `pied` as a positive integer",
"code": "invalid_task_uids",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#invalid_task_uids"
}
"###);
}
#[actix_rt::test]
async fn cancel_task_filter_error() {
let server = Server::new().await;
let (response, code) = server.cancel_tasks("").await;
assert_eq!(code, 400, "{}", response);
meili_snap::snapshot!(meili_snap::json_string!(response), @r###"
{
"message": "Query parameters to filter the tasks to cancel are missing. Available query parameters are: `uids`, `indexUids`, `statuses`, `types`, `canceledBy`, `beforeEnqueuedAt`, `afterEnqueuedAt`, `beforeStartedAt`, `afterStartedAt`, `beforeFinishedAt`, `afterFinishedAt`.",
"code": "missing_task_filters",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#missing_task_filters"
}
"###);
let (response, code) = server.cancel_tasks("lol=pied").await;
assert_eq!(code, 400, "{}", response);
meili_snap::snapshot!(meili_snap::json_string!(response), @r###"
{
"message": "Unknown parameter `lol`: expected one of `uids`, `batchUids`, `canceledBy`, `types`, `statuses`, `indexUids`, `afterEnqueuedAt`, `beforeEnqueuedAt`, `afterStartedAt`, `beforeStartedAt`, `afterFinishedAt`, `beforeFinishedAt`",
"code": "bad_request",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#bad_request"
}
"###);
let (response, code) = server.cancel_tasks("uids=pied").await;
assert_eq!(code, 400, "{}", response);
meili_snap::snapshot!(meili_snap::json_string!(response), @r###"
{
"message": "Invalid value in parameter `uids`: could not parse `pied` as a positive integer",
"code": "invalid_task_uids",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#invalid_task_uids"
}
"###);
}
macro_rules! assert_valid_summarized_task { macro_rules! assert_valid_summarized_task {
($response:expr, $task_type:literal, $index:literal) => {{ ($response:expr, $task_type:literal, $index:literal) => {{
assert_eq!($response.as_object().unwrap().len(), 5); assert_eq!($response.as_object().unwrap().len(), 5);

View File

@ -76,13 +76,6 @@ fn update_index_stats(
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let ctx = || format!("while updating index stats for index `{index_uid}`"); let ctx = || format!("while updating index stats for index `{index_uid}`");
let stats: Option<&str> = index_stats
.remap_data_type::<Str>()
.get(sched_wtxn, &index_uuid)
.with_context(ctx)
.with_context(|| "While reading value")?;
dbg!(stats);
let stats: Option<v1_9::IndexStats> = index_stats let stats: Option<v1_9::IndexStats> = index_stats
.remap_data_type::<SerdeJson<v1_9::IndexStats>>() .remap_data_type::<SerdeJson<v1_9::IndexStats>>()
.get(sched_wtxn, &index_uuid) .get(sched_wtxn, &index_uuid)

View File

@ -105,6 +105,8 @@ impl FormatOptions {
pub struct MatchBounds { pub struct MatchBounds {
pub start: usize, pub start: usize,
pub length: usize, pub length: usize,
#[serde(skip_serializing_if = "Option::is_none")]
pub indices: Option<Vec<usize>>,
} }
/// Structure used to analyze a string, compute words that match, /// Structure used to analyze a string, compute words that match,
@ -220,15 +222,20 @@ impl<'t, 'tokenizer> Matcher<'t, 'tokenizer, '_, '_> {
} }
/// Returns boundaries of the words that match the query. /// Returns boundaries of the words that match the query.
pub fn matches(&mut self) -> Vec<MatchBounds> { pub fn matches(&mut self, array_indices: &[usize]) -> Vec<MatchBounds> {
match &self.matches { match &self.matches {
None => self.compute_matches().matches(), None => self.compute_matches().matches(array_indices),
Some((tokens, matches)) => matches Some((tokens, matches)) => matches
.iter() .iter()
.map(|m| MatchBounds { .map(|m| MatchBounds {
start: tokens[m.get_first_token_pos()].byte_start, start: tokens[m.get_first_token_pos()].byte_start,
// TODO: Why is this in chars, while start is in bytes? // TODO: Why is this in chars, while start is in bytes?
length: m.char_count, length: m.char_count,
indices: if array_indices.is_empty() {
None
} else {
Some(array_indices.to_owned())
},
}) })
.collect(), .collect(),
} }

View File

@ -45,7 +45,7 @@ fn contained_in(selector: &str, key: &str) -> bool {
/// map_leaf_values( /// map_leaf_values(
/// value.as_object_mut().unwrap(), /// value.as_object_mut().unwrap(),
/// ["jean.race.name"], /// ["jean.race.name"],
/// |key, value| match (value, key) { /// |key, _array_indices, value| match (value, key) {
/// (Value::String(name), "jean.race.name") => *name = "patou".to_string(), /// (Value::String(name), "jean.race.name") => *name = "patou".to_string(),
/// _ => unreachable!(), /// _ => unreachable!(),
/// }, /// },
@ -66,17 +66,18 @@ fn contained_in(selector: &str, key: &str) -> bool {
pub fn map_leaf_values<'a>( pub fn map_leaf_values<'a>(
value: &mut Map<String, Value>, value: &mut Map<String, Value>,
selectors: impl IntoIterator<Item = &'a str>, selectors: impl IntoIterator<Item = &'a str>,
mut mapper: impl FnMut(&str, &mut Value), mut mapper: impl FnMut(&str, &[usize], &mut Value),
) { ) {
let selectors: Vec<_> = selectors.into_iter().collect(); let selectors: Vec<_> = selectors.into_iter().collect();
map_leaf_values_in_object(value, &selectors, "", &mut mapper); map_leaf_values_in_object(value, &selectors, "", &[], &mut mapper);
} }
pub fn map_leaf_values_in_object( pub fn map_leaf_values_in_object(
value: &mut Map<String, Value>, value: &mut Map<String, Value>,
selectors: &[&str], selectors: &[&str],
base_key: &str, base_key: &str,
mapper: &mut impl FnMut(&str, &mut Value), array_indices: &[usize],
mapper: &mut impl FnMut(&str, &[usize], &mut Value),
) { ) {
for (key, value) in value.iter_mut() { for (key, value) in value.iter_mut() {
let base_key = if base_key.is_empty() { let base_key = if base_key.is_empty() {
@ -94,12 +95,12 @@ pub fn map_leaf_values_in_object(
if should_continue { if should_continue {
match value { match value {
Value::Object(object) => { Value::Object(object) => {
map_leaf_values_in_object(object, selectors, &base_key, mapper) map_leaf_values_in_object(object, selectors, &base_key, array_indices, mapper)
} }
Value::Array(array) => { Value::Array(array) => {
map_leaf_values_in_array(array, selectors, &base_key, mapper) map_leaf_values_in_array(array, selectors, &base_key, array_indices, mapper)
} }
value => mapper(&base_key, value), value => mapper(&base_key, array_indices, value),
} }
} }
} }
@ -109,13 +110,24 @@ pub fn map_leaf_values_in_array(
values: &mut [Value], values: &mut [Value],
selectors: &[&str], selectors: &[&str],
base_key: &str, base_key: &str,
mapper: &mut impl FnMut(&str, &mut Value), base_array_indices: &[usize],
mapper: &mut impl FnMut(&str, &[usize], &mut Value),
) { ) {
for value in values.iter_mut() { // This avoids allocating twice
let mut array_indices = Vec::with_capacity(base_array_indices.len() + 1);
array_indices.extend_from_slice(base_array_indices);
array_indices.push(0);
for (i, value) in values.iter_mut().enumerate() {
*array_indices.last_mut().unwrap() = i;
match value { match value {
Value::Object(object) => map_leaf_values_in_object(object, selectors, base_key, mapper), Value::Object(object) => {
Value::Array(array) => map_leaf_values_in_array(array, selectors, base_key, mapper), map_leaf_values_in_object(object, selectors, base_key, &array_indices, mapper)
value => mapper(base_key, value), }
Value::Array(array) => {
map_leaf_values_in_array(array, selectors, base_key, &array_indices, mapper)
}
value => mapper(base_key, &array_indices, value),
} }
} }
} }
@ -743,12 +755,14 @@ mod tests {
} }
}); });
map_leaf_values(value.as_object_mut().unwrap(), ["jean.race.name"], |key, value| { map_leaf_values(
match (value, key) { value.as_object_mut().unwrap(),
["jean.race.name"],
|key, _, value| match (value, key) {
(Value::String(name), "jean.race.name") => *name = S("patou"), (Value::String(name), "jean.race.name") => *name = S("patou"),
_ => unreachable!(), _ => unreachable!(),
} },
}); );
assert_eq!( assert_eq!(
value, value,
@ -775,7 +789,7 @@ mod tests {
}); });
let mut calls = 0; let mut calls = 0;
map_leaf_values(value.as_object_mut().unwrap(), ["jean"], |key, value| { map_leaf_values(value.as_object_mut().unwrap(), ["jean"], |key, _, value| {
calls += 1; calls += 1;
match (value, key) { match (value, key) {
(Value::String(name), "jean.race.name") => *name = S("patou"), (Value::String(name), "jean.race.name") => *name = S("patou"),
@ -798,4 +812,52 @@ mod tests {
}) })
); );
} }
#[test]
fn map_array() {
let mut value: Value = json!({
"no_array": "peter",
"simple": ["foo", "bar"],
"nested": [
{
"a": [
["cat", "dog"],
["fox", "bear"],
],
"b": "hi",
},
{
"a": ["green", "blue"],
},
],
});
map_leaf_values(
value.as_object_mut().unwrap(),
["no_array", "simple", "nested"],
|_key, array_indices, value| {
*value = format!("{array_indices:?}").into();
},
);
assert_eq!(
value,
json!({
"no_array": "[]",
"simple": ["[0]", "[1]"],
"nested": [
{
"a": [
["[0, 0, 0]", "[0, 0, 1]"],
["[0, 1, 0]", "[0, 1, 1]"],
],
"b": "[0]",
},
{
"a": ["[1, 0]", "[1, 1]"],
},
],
})
);
}
} }