Mirror of https://github.com/meilisearch/meilisearch.git
Synced 2024-11-22 18:17:39 +08:00

Compare commits: ec06879d28 ... 8004bac179 (14 commits)

Commits (SHA1):

8004bac179
7fd98d74be
b817251a83
d21623605f
6c4327dcb3
193c4b89e8
69d5472814
e3ddf9ef9a
b5bea0cf56
2df4eeda74
5ce561f62c
d941b9c2f4
46a7f87ac8
87299ff34c
.github/workflows/flaky-tests.yml (vendored, 8 changes)

@@ -21,10 +21,10 @@ jobs:
 - name: Install cargo-flaky
   run: cargo install cargo-flaky
 - name: Run cargo flaky in the dumps
-  run: cd crates/dump; cargo flaky -i 100 --release
+  run: cd dump; cargo flaky -i 100 --release
 - name: Run cargo flaky in the index-scheduler
-  run: cd crates/index-scheduler; cargo flaky -i 100 --release
+  run: cd index-scheduler; cargo flaky -i 100 --release
 - name: Run cargo flaky in the auth
-  run: cd crates/meilisearch-auth; cargo flaky -i 100 --release
+  run: cd meilisearch-auth; cargo flaky -i 100 --release
 - name: Run cargo flaky in meilisearch
-  run: cd crates/meilisearch; cargo flaky -i 100 --release
+  run: cd meilisearch; cargo flaky -i 100 --release
@@ -450,7 +450,7 @@ pub(crate) mod test {
 // tasks
 let tasks = dump.tasks().unwrap().collect::<Result<Vec<_>>>().unwrap();
 let (tasks, update_files): (Vec<_>, Vec<_>) = tasks.into_iter().unzip();
-meili_snap::snapshot_hash!(meili_snap::json_string!(tasks), @"4b03e23e740b27bfb9d2a1faffe512e2");
+meili_snap::snapshot_hash!(meili_snap::json_string!(tasks), @"41f91d3a94911b2735ec41b07540df5c");
 assert_eq!(update_files.len(), 22);
 assert!(update_files[0].is_none()); // the dump creation
 assert!(update_files[1].is_some()); // the enqueued document addition

@@ -222,7 +222,7 @@ pub(crate) mod test {
 // tasks
 let tasks = dump.tasks().unwrap().collect::<Result<Vec<_>>>().unwrap();
 let (tasks, update_files): (Vec<_>, Vec<_>) = tasks.into_iter().unzip();
-meili_snap::snapshot_hash!(meili_snap::json_string!(tasks), @"2b8a72d6bc6ba79980491966437daaf9");
+meili_snap::snapshot_hash!(meili_snap::json_string!(tasks), @"278f63325ef06ca04d01df98d8207b94");
 assert_eq!(update_files.len(), 10);
 assert!(update_files[0].is_none()); // the dump creation
 assert!(update_files[1].is_none());

@@ -345,7 +345,7 @@ pub(crate) mod test {
 // tasks
 let tasks = dump.tasks().unwrap().collect::<Result<Vec<_>>>().unwrap();
 let (tasks, update_files): (Vec<_>, Vec<_>) = tasks.into_iter().unzip();
-meili_snap::snapshot_hash!(meili_snap::json_string!(tasks), @"3ddf6169b0a3703c5d770971f036fc5d");
+meili_snap::snapshot_hash!(meili_snap::json_string!(tasks), @"d45cd8571703e58ae53c7bd7ce3f5c22");
 assert_eq!(update_files.len(), 2);
 assert!(update_files[0].is_none()); // the dump creation
 assert!(update_files[1].is_none()); // the processed document addition

@@ -391,7 +391,7 @@ pub(crate) mod test {
 // tasks
 let tasks = dump.tasks().unwrap().collect::<Result<Vec<_>>>().unwrap();
 let (tasks, update_files): (Vec<_>, Vec<_>) = tasks.into_iter().unzip();
-meili_snap::snapshot_hash!(meili_snap::json_string!(tasks), @"4b03e23e740b27bfb9d2a1faffe512e2");
+meili_snap::snapshot_hash!(meili_snap::json_string!(tasks), @"41f91d3a94911b2735ec41b07540df5c");
 assert_eq!(update_files.len(), 22);
 assert!(update_files[0].is_none()); // the dump creation
 assert!(update_files[1].is_some()); // the enqueued document addition

@@ -471,7 +471,7 @@ pub(crate) mod test {
 // tasks
 let tasks = dump.tasks().unwrap().collect::<Result<Vec<_>>>().unwrap();
 let (tasks, update_files): (Vec<_>, Vec<_>) = tasks.into_iter().unzip();
-meili_snap::snapshot_hash!(meili_snap::json_string!(tasks), @"c1b06a5ca60d5805483c16c5b3ff61ef");
+meili_snap::snapshot_hash!(meili_snap::json_string!(tasks), @"c2445ddd1785528b80f2ba534d3bd00c");
 assert_eq!(update_files.len(), 10);
 assert!(update_files[0].is_some()); // the enqueued document addition
 assert!(update_files[1..].iter().all(|u| u.is_none())); // everything already processed

@@ -548,7 +548,7 @@ pub(crate) mod test {
 // tasks
 let tasks = dump.tasks().unwrap().collect::<Result<Vec<_>>>().unwrap();
 let (tasks, update_files): (Vec<_>, Vec<_>) = tasks.into_iter().unzip();
-meili_snap::snapshot_hash!(meili_snap::json_string!(tasks), @"0e203b6095f7c68dbdf788321dcc8215");
+meili_snap::snapshot_hash!(meili_snap::json_string!(tasks), @"cd12efd308fe3ed226356a727ab42ed3");
 assert_eq!(update_files.len(), 10);
 assert!(update_files[0].is_some()); // the enqueued document addition
 assert!(update_files[1..].iter().all(|u| u.is_none())); // everything already processed

@@ -641,7 +641,7 @@ pub(crate) mod test {
 // tasks
 let tasks = dump.tasks().unwrap().collect::<Result<Vec<_>>>().unwrap();
 let (tasks, update_files): (Vec<_>, Vec<_>) = tasks.into_iter().unzip();
-meili_snap::snapshot_hash!(meili_snap::json_string!(tasks), @"d216c7f90f538ffbb2a059531d7ac89a");
+meili_snap::snapshot_hash!(meili_snap::json_string!(tasks), @"bc616290adfe7d09a624cf6065ca9069");
 assert_eq!(update_files.len(), 9);
 assert!(update_files[0].is_some()); // the enqueued document addition
 assert!(update_files[1..].iter().all(|u| u.is_none())); // everything already processed

@@ -734,7 +734,7 @@ pub(crate) mod test {
 // tasks
 let tasks = dump.tasks().unwrap().collect::<Result<Vec<_>>>().unwrap();
 let (tasks, update_files): (Vec<_>, Vec<_>) = tasks.into_iter().unzip();
-meili_snap::snapshot_hash!(meili_snap::json_string!(tasks), @"e27999f1112632222cb84f6cffff7c5f");
+meili_snap::snapshot_hash!(meili_snap::json_string!(tasks), @"2db37756d8af1fb7623436b76e8956a6");
 assert_eq!(update_files.len(), 8);
 assert!(update_files[0..].iter().all(|u| u.is_none())); // everything already processed

@@ -810,7 +810,7 @@ pub(crate) mod test {
 // tasks
 let tasks = dump.tasks().unwrap().collect::<Result<Vec<_>>>().unwrap();
 let (tasks, update_files): (Vec<_>, Vec<_>) = tasks.into_iter().unzip();
-meili_snap::snapshot_hash!(meili_snap::json_string!(tasks), @"0155a664b0cf62aae23db5138b6b03d7");
+meili_snap::snapshot_hash!(meili_snap::json_string!(tasks), @"8df6eab075a44b3c1af6b726f9fd9a43");
 assert_eq!(update_files.len(), 9);
 assert!(update_files[..].iter().all(|u| u.is_none())); // no update file in dump v1

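The dump test hunks above all follow the same pattern: collect the dump's task iterator, split the (task, update_file) pairs with Iterator::unzip, hash the tasks, and then assert on the update files. A minimal standalone sketch of that split, using placeholder String types instead of the dump crate's own task and update-file types (illustrative only, not taken from the repository):

fn split_tasks(pairs: Vec<(String, Option<String>)>) -> (Vec<String>, Vec<Option<String>>) {
    // `unzip` turns an iterator of pairs into a pair of collections, mirroring the
    // `let (tasks, update_files): (Vec<_>, Vec<_>) = tasks.into_iter().unzip();` lines above.
    pairs.into_iter().unzip()
}

fn main() {
    let pairs = vec![
        ("dump creation".to_string(), None),
        ("document addition".to_string(), Some("update-file-1".to_string())),
    ];
    let (tasks, update_files) = split_tasks(pairs);
    assert_eq!(tasks.len(), 2);
    assert!(update_files[0].is_none()); // the dump creation carries no update file
    assert!(update_files[1].is_some()); // the enqueued document addition does
}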
@@ -46,7 +46,7 @@ use uuid::Uuid;

 use crate::autobatcher::{self, BatchKind};
 use crate::utils::{self, swap_index_uid_in_task, ProcessingBatch};
-use crate::{Error, IndexScheduler, MustStopProcessing, Result, TaskId};
+use crate::{Error, IndexScheduler, MustStopProcessing, ProcessingTasks, Result, TaskId};

 /// Represents a combination of tasks that can all be processed at the same time.
 ///

@@ -58,6 +58,10 @@ pub(crate) enum Batch {
 TaskCancelation {
 /// The task cancelation itself.
 task: Task,
+/// The date and time at which the previously processing tasks started.
+previous_started_at: OffsetDateTime,
+/// The list of tasks that were processing when this task cancelation appeared.
+previous_processing_tasks: RoaringBitmap,
 },
 TaskDeletions(Vec<Task>),
 SnapshotCreation(Vec<Task>),

@@ -282,7 +286,7 @@ impl IndexScheduler {
 match batch {
 BatchKind::DocumentClear { ids } => Ok(Some(Batch::IndexOperation {
 op: IndexOperation::DocumentClear {
-tasks: self.get_existing_tasks_for_processing_batch(
+tasks: self.get_existing_tasks_with_processing_batch(
 rtxn,
 current_batch,
 ids,

@@ -308,7 +312,7 @@ impl IndexScheduler {
 }
 }
 BatchKind::DocumentOperation { method, operation_ids, .. } => {
-let tasks = self.get_existing_tasks_for_processing_batch(
+let tasks = self.get_existing_tasks_with_processing_batch(
 rtxn,
 current_batch,
 operation_ids,

@@ -359,7 +363,7 @@ impl IndexScheduler {
 }))
 }
 BatchKind::DocumentDeletion { deletion_ids, includes_by_filter: _ } => {
-let tasks = self.get_existing_tasks_for_processing_batch(
+let tasks = self.get_existing_tasks_with_processing_batch(
 rtxn,
 current_batch,
 deletion_ids,

@@ -371,7 +375,7 @@ impl IndexScheduler {
 }))
 }
 BatchKind::Settings { settings_ids, .. } => {
-let tasks = self.get_existing_tasks_for_processing_batch(
+let tasks = self.get_existing_tasks_with_processing_batch(
 rtxn,
 current_batch,
 settings_ids,

@@ -520,7 +524,7 @@ impl IndexScheduler {
 BatchKind::IndexDeletion { ids } => Ok(Some(Batch::IndexDeletion {
 index_uid,
 index_has_been_created: must_create_index,
-tasks: self.get_existing_tasks_for_processing_batch(rtxn, current_batch, ids)?,
+tasks: self.get_existing_tasks_with_processing_batch(rtxn, current_batch, ids)?,
 })),
 BatchKind::IndexSwap { id } => {
 let mut task = self.get_task(rtxn, id)?.ok_or(Error::CorruptedTaskQueue)?;

@@ -552,9 +556,25 @@ impl IndexScheduler {

 // 1. we get the last task to cancel.
 if let Some(task_id) = to_cancel.max() {
+// We retrieve the tasks that were processing before this tasks cancelation started.
+// We must *not* reset the processing tasks before calling this method.
+// Displaying the `batch_id` would make a strange error message since this task cancelation is going to
+// replace the canceled batch. It's better to avoid mentioning it in the error message.
+let ProcessingTasks { batch: previous_batch, processing } =
+&*self.processing_tasks.read().unwrap();
 let mut task = self.get_task(rtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?;
 current_batch.processing(Some(&mut task));
-return Ok(Some((Batch::TaskCancelation { task }, current_batch)));
+return Ok(Some((
+Batch::TaskCancelation {
+task,
+// We should never be in a case where we don't have a previous_batch, but let's not crash if it happens
+previous_started_at: previous_batch
+.as_ref()
+.map_or_else(OffsetDateTime::now_utc, |batch| batch.started_at),
+previous_processing_tasks: processing.clone(),
+},
+current_batch,
+)));
 }

 // 2. we get the next task to delete

@@ -661,7 +681,7 @@ impl IndexScheduler {
 }

 match batch {
-Batch::TaskCancelation { mut task } => {
+Batch::TaskCancelation { mut task, previous_started_at, previous_processing_tasks } => {
 // 1. Retrieve the tasks that matched the query at enqueue-time.
 let matched_tasks =
 if let KindWithContent::TaskCancelation { tasks, query: _ } = &task.kind {
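The @@ -58,6 +58,10 @@ and @@ -552,9 +556,25 @@ hunks above make the TaskCancelation batch carry a snapshot of what was processing when the cancelation was picked up, taken before the processing state is reset. A simplified, dependency-free sketch of that idea; the types and field names below are stand-ins (SystemTime for OffsetDateTime, HashSet for RoaringBitmap), not the scheduler's real ones:

use std::collections::HashSet;
use std::time::SystemTime;

// Stand-in for the scheduler's processing state.
struct ProcessingState {
    started_at: Option<SystemTime>,
    processing: HashSet<u32>,
}

enum Batch {
    TaskCancelation {
        task_id: u32,
        // Captured *before* the processing state is reset, so the cancelation can
        // report when the interrupted tasks started and which ones they were.
        previous_started_at: SystemTime,
        previous_processing_tasks: HashSet<u32>,
    },
}

fn cancelation_batch(task_id: u32, state: &ProcessingState) -> Batch {
    Batch::TaskCancelation {
        task_id,
        // Fall back to "now" if nothing was processing, instead of crashing.
        previous_started_at: state.started_at.unwrap_or_else(SystemTime::now),
        previous_processing_tasks: state.processing.clone(),
    }
}

fn main() {
    let state = ProcessingState { started_at: None, processing: HashSet::from([1, 2, 3]) };
    match cancelation_batch(4, &state) {
        Batch::TaskCancelation { previous_processing_tasks, .. } => {
            assert_eq!(previous_processing_tasks.len(), 3);
        }
    }
}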
@@ -67,7 +67,7 @@ use roaring::RoaringBitmap;
 use synchronoise::SignalEvent;
 use time::format_description::well_known::Rfc3339;
 use time::OffsetDateTime;
-use utils::{filter_out_references_to_newer_tasks, keep_ids_within_datetimes, map_bound};
+use utils::{filter_out_references_to_newer_tasks, keep_tasks_within_datetimes, map_bound};
 use uuid::Uuid;

 use crate::index_mapper::IndexMapper;

@@ -85,8 +85,6 @@ pub struct Query {
 pub limit: Option<u32>,
 /// The minimum [task id](`meilisearch_types::tasks::Task::uid`) to be matched
 pub from: Option<u32>,
-/// The order used to return the tasks. By default the newest tasks are returned first and the boolean is `false`.
-pub reverse: Option<bool>,
 /// The [task ids](`meilisearch_types::tasks::Task::uid`) to be matched
 pub uids: Option<Vec<TaskId>>,
 /// The [batch ids](`meilisearch_types::batches::Batch::uid`) to be matched

@@ -131,7 +129,6 @@ impl Query {
 Query {
 limit: None,
 from: None,
-reverse: None,
 uids: None,
 batch_uids: None,
 statuses: None,

@@ -773,7 +770,6 @@ impl IndexScheduler {
 let Query {
 limit,
 from,
-reverse,
 uids,
 batch_uids,
 statuses,

@@ -791,18 +787,13 @@ impl IndexScheduler {
 let mut tasks = self.all_task_ids(rtxn)?;

 if let Some(from) = from {
-let range = if reverse.unwrap_or_default() {
-u32::MIN..*from
-} else {
-from.saturating_add(1)..u32::MAX
-};
-tasks.remove_range(range);
+tasks.remove_range(from.saturating_add(1)..);
 }

 if let Some(batch_uids) = batch_uids {
 let mut batch_tasks = RoaringBitmap::new();
 for batch_uid in batch_uids {
-if processing_batch.as_ref().map_or(false, |batch| batch.uid == *batch_uid) {
+if Some(*batch_uid) == processing_batch.as_ref().map(|batch| batch.uid) {
 batch_tasks |= &processing_tasks;
 } else {
 batch_tasks |= self.tasks_in_batch(rtxn, *batch_uid)?;

@@ -904,7 +895,7 @@ impl IndexScheduler {
 ),
 };

-keep_ids_within_datetimes(
+keep_tasks_within_datetimes(
 rtxn,
 &mut filtered_non_processing_tasks,
 self.started_at,

@@ -914,7 +905,7 @@ impl IndexScheduler {
 filtered_non_processing_tasks | filtered_processing_tasks
 };

-keep_ids_within_datetimes(
+keep_tasks_within_datetimes(
 rtxn,
 &mut tasks,
 self.enqueued_at,

@@ -922,7 +913,7 @@ impl IndexScheduler {
 *before_enqueued_at,
 )?;

-keep_ids_within_datetimes(
+keep_tasks_within_datetimes(
 rtxn,
 &mut tasks,
 self.finished_at,

@@ -930,12 +921,8 @@ impl IndexScheduler {
 *before_finished_at,
 )?;

-if let Some(limit) = limit {
-tasks = if query.reverse.unwrap_or_default() {
-tasks.into_iter().take(*limit as usize).collect()
-} else {
-tasks.into_iter().rev().take(*limit as usize).collect()
-};
+if let Some(limit) = *limit {
+tasks = tasks.into_iter().rev().take(limit as usize).collect();
 }

 Ok(tasks)
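The @@ -791,18 +787,13 @@ and @@ -930,12 +921,8 @@ hunks above are where the removed `reverse` flag used to decide which side of the id space survives the `from` cut-off and in which order `limit` is applied. A small standalone sketch of that selection logic on a roaring::RoaringBitmap, assuming the roaring crate; the `page` helper is illustrative and not part of the scheduler:

use roaring::RoaringBitmap;

/// Keep only the ids still reachable from `from` in the requested order, then apply `limit`.
/// Mirrors the shape of the removed `reverse` handling shown in the diff above.
fn page(ids: &RoaringBitmap, from: Option<u32>, limit: Option<u32>, reverse: bool) -> Vec<u32> {
    let mut ids = ids.clone();
    if let Some(from) = from {
        if reverse {
            // ascending order: drop everything strictly below `from`
            ids.remove_range(u32::MIN..from);
        } else {
            // default order: newest first, so drop everything strictly above `from`
            ids.remove_range(from.saturating_add(1)..);
        }
    }
    let iter: Box<dyn Iterator<Item = u32>> = if reverse {
        Box::new(ids.into_iter())
    } else {
        Box::new(ids.into_iter().rev())
    };
    iter.take(limit.unwrap_or(u32::MAX) as usize).collect()
}

fn main() {
    let ids: RoaringBitmap = (0..10).collect();
    assert_eq!(page(&ids, Some(7), Some(3), false), vec![7, 6, 5]);
    assert_eq!(page(&ids, Some(7), Some(3), true), vec![7, 8, 9]);
}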
@@ -948,44 +935,22 @@ impl IndexScheduler {
 processing: &ProcessingTasks,
 query: &Query,
 ) -> Result<RoaringBitmap> {
-let Query {
-limit,
-from,
-reverse,
-uids,
-batch_uids,
-statuses,
-types,
-index_uids,
-canceled_by,
-before_enqueued_at,
-after_enqueued_at,
-before_started_at,
-after_started_at,
-before_finished_at,
-after_finished_at,
-} = query;
+dbg!();

 let mut batches = self.all_batch_ids(rtxn)?;
 if let Some(batch_id) = processing.batch.as_ref().map(|batch| batch.uid) {
 batches.insert(batch_id);
 }

-if let Some(from) = from {
-let range = if reverse.unwrap_or_default() {
-u32::MIN..*from
-} else {
-from.saturating_add(1)..u32::MAX
-};
-batches.remove_range(range);
+if let Some(from) = &query.from {
+batches.remove_range(from.saturating_add(1)..);
 }

-if let Some(batch_uids) = &batch_uids {
+if let Some(batch_uids) = &query.batch_uids {
 let batches_uids = RoaringBitmap::from_iter(batch_uids);
 batches &= batches_uids;
 }

-if let Some(status) = &statuses {
+if let Some(status) = &query.statuses {
 let mut status_batches = RoaringBitmap::new();
 for status in status {
 match status {

@@ -1008,7 +973,7 @@ impl IndexScheduler {
 batches &= status_batches;
 }

-if let Some(task_uids) = &uids {
+if let Some(task_uids) = &query.uids {
 let mut batches_by_task_uids = RoaringBitmap::new();
 for task_uid in task_uids {
 if let Some(task) = self.get_task(rtxn, *task_uid)? {

@@ -1021,7 +986,7 @@ impl IndexScheduler {
 }

 // There is no database for this query, we must retrieve the task queried by the client and ensure it's valid
-if let Some(canceled_by) = &canceled_by {
+if let Some(canceled_by) = &query.canceled_by {
 let mut all_canceled_batches = RoaringBitmap::new();
 for cancel_uid in canceled_by {
 if let Some(task) = self.get_task(rtxn, *cancel_uid)? {

@@ -1044,7 +1009,7 @@ impl IndexScheduler {
 }
 }

-if let Some(kind) = &types {
+if let Some(kind) = &query.types {
 let mut kind_batches = RoaringBitmap::new();
 for kind in kind {
 kind_batches |= self.get_batch_kind(rtxn, *kind)?;

@@ -1059,7 +1024,7 @@ impl IndexScheduler {
 batches &= &kind_batches;
 }

-if let Some(index) = &index_uids {
+if let Some(index) = &query.index_uids {
 let mut index_batches = RoaringBitmap::new();
 for index in index {
 index_batches |= self.index_batches(rtxn, index)?;

@@ -1100,52 +1065,48 @@ impl IndexScheduler {
 filtered_processing_batches.clear();
 }
 };
-match (after_started_at, before_started_at) {
+match (query.after_started_at, query.before_started_at) {
 (None, None) => (),
 (None, Some(before)) => {
-clear_filtered_processing_batches(Bound::Unbounded, Bound::Excluded(*before))
+clear_filtered_processing_batches(Bound::Unbounded, Bound::Excluded(before))
 }
 (Some(after), None) => {
-clear_filtered_processing_batches(Bound::Excluded(*after), Bound::Unbounded)
+clear_filtered_processing_batches(Bound::Excluded(after), Bound::Unbounded)
 }
 (Some(after), Some(before)) => clear_filtered_processing_batches(
-Bound::Excluded(*after),
-Bound::Excluded(*before),
+Bound::Excluded(after),
+Bound::Excluded(before),
 ),
 };

-keep_ids_within_datetimes(
+keep_tasks_within_datetimes(
 rtxn,
 &mut filtered_non_processing_batches,
 self.batch_started_at,
-*after_started_at,
-*before_started_at,
+query.after_started_at,
+query.before_started_at,
 )?;
 filtered_non_processing_batches | filtered_processing_batches
 };

-keep_ids_within_datetimes(
+keep_tasks_within_datetimes(
 rtxn,
 &mut batches,
 self.batch_enqueued_at,
-*after_enqueued_at,
-*before_enqueued_at,
+query.after_enqueued_at,
+query.before_enqueued_at,
 )?;

-keep_ids_within_datetimes(
+keep_tasks_within_datetimes(
 rtxn,
 &mut batches,
 self.batch_finished_at,
-*after_finished_at,
-*before_finished_at,
+query.after_finished_at,
+query.before_finished_at,
 )?;

-if let Some(limit) = limit {
-batches = if query.reverse.unwrap_or_default() {
-batches.into_iter().take(*limit as usize).collect()
-} else {
-batches.into_iter().rev().take(*limit as usize).collect()
-};
+if let Some(limit) = query.limit {
+batches = batches.into_iter().rev().take(limit as usize).collect();
 }

 Ok(batches)

@@ -1344,13 +1305,10 @@ impl IndexScheduler {
 let rtxn = self.env.read_txn()?;

 let (tasks, total) = self.get_task_ids_from_authorized_indexes(&rtxn, &query, filters)?;
-let tasks = if query.reverse.unwrap_or_default() {
-Box::new(tasks.into_iter()) as Box<dyn Iterator<Item = u32>>
-} else {
-Box::new(tasks.into_iter().rev()) as Box<dyn Iterator<Item = u32>>
-};
-let tasks =
-self.get_existing_tasks(&rtxn, tasks.take(query.limit.unwrap_or(u32::MAX) as usize))?;
+let tasks = self.get_existing_tasks(
+&rtxn,
+tasks.into_iter().rev().take(query.limit.unwrap_or(u32::MAX) as usize),
+)?;

 let ProcessingTasks { batch, processing } =
 self.processing_tasks.read().map_err(|_| Error::CorruptedTaskQueue)?.clone();

@@ -1399,16 +1357,11 @@ impl IndexScheduler {

 let (batches, total) =
 self.get_batch_ids_from_authorized_indexes(&rtxn, &processing, &query, filters)?;
-let batches = if query.reverse.unwrap_or_default() {
-Box::new(batches.into_iter()) as Box<dyn Iterator<Item = u32>>
-} else {
-Box::new(batches.into_iter().rev()) as Box<dyn Iterator<Item = u32>>
-};

 let batches = self.get_existing_batches(
 &rtxn,
 &processing,
-batches.take(query.limit.unwrap_or(u32::MAX) as usize),
+batches.into_iter().rev().take(query.limit.unwrap_or(u32::MAX) as usize),
 )?;

 Ok((batches, total))

@@ -1588,7 +1541,7 @@ impl IndexScheduler {
 drop(rtxn);

 // 1. store the starting date with the bitmap of processing tasks.
-let mut ids = batch.ids();
+let ids = batch.ids();
 let processed_tasks = ids.len();

 // We reset the must_stop flag to be sure that we don't stop processing tasks

@@ -1625,7 +1578,6 @@ impl IndexScheduler {

 processing_batch.finished();
 let mut wtxn = self.env.write_txn().map_err(Error::HeedTransaction)?;
-let mut canceled = RoaringBitmap::new();

 match res {
 Ok(tasks) => {

@@ -1635,6 +1587,7 @@ impl IndexScheduler {
 let mut success = 0;
 let mut failure = 0;
 let mut canceled_by = None;
+let mut canceled = RoaringBitmap::new();

 #[allow(unused_variables)]
 for (i, mut task) in tasks.into_iter().enumerate() {

@@ -1725,12 +1678,9 @@ impl IndexScheduler {
 }
 }

-self.processing_tasks.write().unwrap().stop_processing();
-// We must re-add the canceled task so they're part of the same batch.
-// processed.processing |= canceled;
-ids |= canceled;
+let processed = self.processing_tasks.write().unwrap().stop_processing();

-self.write_batch(&mut wtxn, processing_batch, &ids)?;
+self.write_batch(&mut wtxn, processing_batch, &processed.processing)?;

 #[cfg(test)]
 self.maybe_fail(tests::FailureLocation::CommittingWtxn)?;

@@ -1760,7 +1710,7 @@ impl IndexScheduler {
 })?;

 // We shouldn't crash the tick function if we can't send data to the webhook.
-let _ = self.notify_webhook(&ids);
+let _ = self.notify_webhook(&processed.processing);

 #[cfg(test)]
 self.breakpoint(Breakpoint::AfterProcessing);

@@ -4194,6 +4144,7 @@ mod tests {
 tasks: [0, 1, 2, 3].into_iter().collect(),
 };
 let task_cancelation = index_scheduler.register(kind, None, false).unwrap();
+println!("HEEERE");
 handle.advance_n_successful_batches(1);

 snapshot!(snapshot_index_scheduler(&index_scheduler), name: "start");

@@ -43,7 +43,7 @@ catto [0,]
 0 {uid: 0, details: {"receivedDocuments":1,"indexedDocuments":0,"matchedTasks":1,"canceledTasks":1,"originalFilter":"test_query"}, stats: {"totalNbTasks":2,"status":{"succeeded":1,"canceled":1},"types":{"documentAdditionOrUpdate":1,"taskCancelation":1},"indexUids":{"catto":1}}, }
 ----------------------------------------------------------------------
 ### Batch to tasks mapping:
-0 [0,1,]
+0 [1,]
 ----------------------------------------------------------------------
 ### Batches Status:
 succeeded [0,]

@@ -67,5 +67,6 @@ catto [0,]
 [timestamp] [0,]
 ----------------------------------------------------------------------
 ### File Store:
+00000000-0000-0000-0000-000000000000

 ----------------------------------------------------------------------

@@ -55,7 +55,7 @@ catto: { number_of_documents: 1, field_distribution: {"id": 1} }
 ----------------------------------------------------------------------
 ### Batch to tasks mapping:
 0 [0,]
-1 [1,2,3,]
+1 [3,]
 ----------------------------------------------------------------------
 ### Batches Status:
 succeeded [0,1,]

@@ -84,5 +84,7 @@ wolfo [1,]
 [timestamp] [1,]
 ----------------------------------------------------------------------
 ### File Store:
+00000000-0000-0000-0000-000000000001
+00000000-0000-0000-0000-000000000002

 ----------------------------------------------------------------------

@@ -42,7 +42,7 @@ canceled [0,]
 0 {uid: 0, details: {"matchedTasks":1,"canceledTasks":1,"originalFilter":"cancel dump"}, stats: {"totalNbTasks":2,"status":{"succeeded":1,"canceled":1},"types":{"taskCancelation":1,"dumpCreation":1},"indexUids":{}}, }
 ----------------------------------------------------------------------
 ### Batch to tasks mapping:
-0 [0,1,]
+0 [1,]
 ----------------------------------------------------------------------
 ### Batches Status:
 succeeded [0,]

@@ -44,7 +44,7 @@ catto: { number_of_documents: 0, field_distribution: {} }
 0 {uid: 0, details: {"receivedDocuments":1,"indexedDocuments":0,"matchedTasks":1,"canceledTasks":1,"originalFilter":"test_query"}, stats: {"totalNbTasks":2,"status":{"succeeded":1,"canceled":1},"types":{"documentAdditionOrUpdate":1,"taskCancelation":1},"indexUids":{"catto":1}}, }
 ----------------------------------------------------------------------
 ### Batch to tasks mapping:
-0 [0,1,]
+0 [1,]
 ----------------------------------------------------------------------
 ### Batches Status:
 succeeded [0,]

@@ -68,5 +68,6 @@ catto [0,]
 [timestamp] [0,]
 ----------------------------------------------------------------------
 ### File Store:
+00000000-0000-0000-0000-000000000000

 ----------------------------------------------------------------------

@@ -54,7 +54,7 @@ catto: { number_of_documents: 0, field_distribution: {} }
 ----------------------------------------------------------------------
 ### Batch to tasks mapping:
 0 [0,]
-1 [1,2,3,]
+1 [3,]
 ----------------------------------------------------------------------
 ### Batches Status:
 succeeded [0,1,]

@@ -54,7 +54,7 @@ catto: { number_of_documents: 0, field_distribution: {} }
 ----------------------------------------------------------------------
 ### Batch to tasks mapping:
 0 [0,]
-1 [1,2,3,]
+1 [3,]
 ----------------------------------------------------------------------
 ### Batches Status:
 succeeded [0,1,]
@@ -211,10 +211,9 @@ impl IndexScheduler {
 Ok(())
 }

-/// Convert an iterator to a `Vec` of tasks and edit the `ProcessingBatch` to add the given tasks.
-///
-/// The tasks MUST exist, or a `CorruptedTaskQueue` error will be thrown.
-pub(crate) fn get_existing_tasks_for_processing_batch(
+/// Convert an iterator to a `Vec` of tasks. The tasks MUST exist or a
+/// `CorruptedTaskQueue` error will be throwed.
+pub(crate) fn get_existing_tasks_with_processing_batch(
 &self,
 rtxn: &RoTxn,
 processing_batch: &mut ProcessingBatch,

@@ -233,7 +232,7 @@ impl IndexScheduler {
 }

 /// Convert an iterator to a `Vec` of tasks. The tasks MUST exist or a
-/// `CorruptedTaskQueue` error will be thrown.
+/// `CorruptedTaskQueue` error will be throwed.
 pub(crate) fn get_existing_tasks(
 &self,
 rtxn: &RoTxn,

@@ -248,7 +247,7 @@ impl IndexScheduler {
 }

 /// Convert an iterator to a `Vec` of batches. The batches MUST exist or a
-/// `CorruptedTaskQueue` error will be thrown.
+/// `CorruptedTaskQueue` error will be throwed.
 pub(crate) fn get_existing_batches(
 &self,
 rtxn: &RoTxn,

@@ -271,10 +270,16 @@ impl IndexScheduler {
 pub(crate) fn update_task(&self, wtxn: &mut RwTxn, task: &Task) -> Result<()> {
 let old_task = self.get_task(wtxn, task.uid)?.ok_or(Error::CorruptedTaskQueue)?;

-debug_assert!(old_task != *task);
+dbg!(&task);

 debug_assert_eq!(old_task.uid, task.uid);
 debug_assert!(old_task.batch_uid.is_none() && task.batch_uid.is_some());

+// TODO: This shouldn't ever happen, we should assert it
+if old_task == *task {
+return Ok(());
+}

 if old_task.status != task.status {
 self.update_status(wtxn, old_task.status, |bitmap| {
 bitmap.remove(task.uid);

@@ -500,9 +505,10 @@ pub(crate) fn remove_task_datetime(
 Ok(())
 }

-pub(crate) fn keep_ids_within_datetimes(
+// TODO: Rename the function since it also applies to batches
+pub(crate) fn keep_tasks_within_datetimes(
 rtxn: &RoTxn,
-ids: &mut RoaringBitmap,
+tasks: &mut RoaringBitmap,
 database: Database<BEI128, CboRoaringBitmapCodec>,
 after: Option<OffsetDateTime>,
 before: Option<OffsetDateTime>,

@@ -513,15 +519,15 @@ pub(crate) fn keep_ids_within_datetimes(
 (Some(after), None) => (Bound::Excluded(*after), Bound::Unbounded),
 (Some(after), Some(before)) => (Bound::Excluded(*after), Bound::Excluded(*before)),
 };
-let mut collected_ids = RoaringBitmap::new();
+let mut collected_task_ids = RoaringBitmap::new();
 let start = map_bound(start, |b| b.unix_timestamp_nanos());
 let end = map_bound(end, |b| b.unix_timestamp_nanos());
 let iter = database.range(rtxn, &(start, end))?;
 for r in iter {
-let (_timestamp, ids) = r?;
-collected_ids |= ids;
+let (_timestamp, task_ids) = r?;
+collected_task_ids |= task_ids;
 }
-*ids &= collected_ids;
+*tasks &= collected_task_ids;
 Ok(())
 }

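The last two hunks above keep only the ids whose timestamp falls inside an optional, exclusive (after, before) window: the per-timestamp bitmaps stored in a range-queryable database are unioned and then intersected with the input bitmap. A simplified in-memory sketch of the same idea, with a BTreeMap standing in for the LMDB database and a HashSet standing in for RoaringBitmap (names and types are illustrative, not the scheduler's):

use std::collections::{BTreeMap, HashSet};
use std::ops::Bound;

// Stand-in for the timestamp-keyed database (unix-nanosecond keys, id sets as values).
type TimestampIndex = BTreeMap<i128, HashSet<u32>>;

fn keep_ids_within_window(
    ids: &mut HashSet<u32>,
    index: &TimestampIndex,
    after: Option<i128>,
    before: Option<i128>,
) {
    // Both bounds are exclusive, like the `Bound::Excluded` pairs in the diff.
    let start = after.map_or(Bound::Unbounded, Bound::Excluded);
    let end = before.map_or(Bound::Unbounded, Bound::Excluded);
    let mut collected: HashSet<u32> = HashSet::new();
    for (_timestamp, bucket) in index.range((start, end)) {
        collected.extend(bucket);
    }
    // Intersect the input with everything seen inside the window.
    ids.retain(|id| collected.contains(id));
}

fn main() {
    let mut index = TimestampIndex::new();
    index.insert(10, HashSet::from([1]));
    index.insert(20, HashSet::from([2]));
    index.insert(30, HashSet::from([3]));
    let mut ids = HashSet::from([1, 2, 3]);
    keep_ids_within_window(&mut ids, &index, Some(10), Some(30)); // strictly between 10 and 30
    assert_eq!(ids, HashSet::from([2]));
}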
@@ -318,7 +318,6 @@ InvalidTaskBeforeStartedAt , InvalidRequest , BAD_REQUEST ;
 InvalidTaskCanceledBy , InvalidRequest , BAD_REQUEST ;
 InvalidTaskFrom , InvalidRequest , BAD_REQUEST ;
 InvalidTaskLimit , InvalidRequest , BAD_REQUEST ;
-InvalidTaskReverse , InvalidRequest , BAD_REQUEST ;
 InvalidTaskStatuses , InvalidRequest , BAD_REQUEST ;
 InvalidTaskTypes , InvalidRequest , BAD_REQUEST ;
 InvalidTaskUids , InvalidRequest , BAD_REQUEST ;
@@ -42,8 +42,6 @@ pub struct TasksFilterQuery {
 pub limit: Param<u32>,
 #[deserr(default, error = DeserrQueryParamError<InvalidTaskFrom>)]
 pub from: Option<Param<TaskId>>,
-#[deserr(default, error = DeserrQueryParamError<InvalidTaskReverse>)]
-pub reverse: Option<Param<bool>>,

 #[deserr(default, error = DeserrQueryParamError<InvalidBatchUids>)]
 pub batch_uids: OptionStarOrList<BatchId>,

@@ -78,7 +76,6 @@ impl TasksFilterQuery {
 Query {
 limit: Some(self.limit.0),
 from: self.from.as_deref().copied(),
-reverse: self.reverse.as_deref().copied(),
 batch_uids: self.batch_uids.merge_star_and_none(),
 statuses: self.statuses.merge_star_and_none(),
 types: self.types.merge_star_and_none(),

@@ -152,7 +149,6 @@ impl TaskDeletionOrCancelationQuery {
 Query {
 limit: None,
 from: None,
-reverse: None,
 batch_uids: self.batch_uids.merge_star_and_none(),
 statuses: self.statuses.merge_star_and_none(),
 types: self.types.merge_star_and_none(),

@@ -713,14 +709,14 @@ mod tests {
 {
 let params = "from=12&limit=15&indexUids=toto,tata-78&statuses=succeeded,enqueued&afterEnqueuedAt=2012-04-23&uids=1,2,3";
 let query = deserr_query_params::<TasksFilterQuery>(params).unwrap();
-snapshot!(format!("{:?}", query), @r###"TasksFilterQuery { limit: Param(15), from: Some(Param(12)), reverse: None, batch_uids: None, uids: List([1, 2, 3]), canceled_by: None, types: None, statuses: List([Succeeded, Enqueued]), index_uids: List([IndexUid("toto"), IndexUid("tata-78")]), after_enqueued_at: Other(2012-04-24 0:00:00.0 +00:00:00), before_enqueued_at: None, after_started_at: None, before_started_at: None, after_finished_at: None, before_finished_at: None }"###);
+snapshot!(format!("{:?}", query), @r###"TasksFilterQuery { limit: Param(15), from: Some(Param(12)), batch_uids: None, uids: List([1, 2, 3]), canceled_by: None, types: None, statuses: List([Succeeded, Enqueued]), index_uids: List([IndexUid("toto"), IndexUid("tata-78")]), after_enqueued_at: Other(2012-04-24 0:00:00.0 +00:00:00), before_enqueued_at: None, after_started_at: None, before_started_at: None, after_finished_at: None, before_finished_at: None }"###);
 }
 {
 // Stars should translate to `None` in the query
 // Verify value of the default limit
 let params = "indexUids=*&statuses=succeeded,*&afterEnqueuedAt=2012-04-23&uids=1,2,3";
 let query = deserr_query_params::<TasksFilterQuery>(params).unwrap();
-snapshot!(format!("{:?}", query), @"TasksFilterQuery { limit: Param(20), from: None, reverse: None, batch_uids: None, uids: List([1, 2, 3]), canceled_by: None, types: None, statuses: Star, index_uids: Star, after_enqueued_at: Other(2012-04-24 0:00:00.0 +00:00:00), before_enqueued_at: None, after_started_at: None, before_started_at: None, after_finished_at: None, before_finished_at: None }");
+snapshot!(format!("{:?}", query), @"TasksFilterQuery { limit: Param(20), from: None, batch_uids: None, uids: List([1, 2, 3]), canceled_by: None, types: None, statuses: Star, index_uids: Star, after_enqueued_at: Other(2012-04-24 0:00:00.0 +00:00:00), before_enqueued_at: None, after_started_at: None, before_started_at: None, after_finished_at: None, before_finished_at: None }");
 }
 {
 // Stars should also translate to `None` in task deletion/cancelation queries
|
|||||||
// select the attributes to retrieve
|
// select the attributes to retrieve
|
||||||
let displayable_names =
|
let displayable_names =
|
||||||
displayable_ids.iter().map(|&fid| field_ids_map.name(fid).expect("Missing field name"));
|
displayable_ids.iter().map(|&fid| field_ids_map.name(fid).expect("Missing field name"));
|
||||||
permissive_json_pointer::map_leaf_values(
|
permissive_json_pointer::map_leaf_values(&mut document, displayable_names, |key, value| {
|
||||||
&mut document,
|
// To get the formatting option of each key we need to see all the rules that applies
|
||||||
displayable_names,
|
// to the value and merge them together. eg. If a user said he wanted to highlight `doggo`
|
||||||
|key, array_indices, value| {
|
// and crop `doggo.name`. `doggo.name` needs to be highlighted + cropped while `doggo.age` is only
|
||||||
// To get the formatting option of each key we need to see all the rules that applies
|
// highlighted.
|
||||||
// to the value and merge them together. eg. If a user said he wanted to highlight `doggo`
|
// Warn: The time to compute the format list scales with the number of fields to format;
|
||||||
// and crop `doggo.name`. `doggo.name` needs to be highlighted + cropped while `doggo.age` is only
|
// cumulated with map_leaf_values that iterates over all the nested fields, it gives a quadratic complexity:
|
||||||
// highlighted.
|
// d*f where d is the total number of fields to display and f is the total number of fields to format.
|
||||||
// Warn: The time to compute the format list scales with the number of fields to format;
|
let format = formatting_fields_options
|
||||||
// cumulated with map_leaf_values that iterates over all the nested fields, it gives a quadratic complexity:
|
.iter()
|
||||||
// d*f where d is the total number of fields to display and f is the total number of fields to format.
|
.filter(|(name, _option)| {
|
||||||
let format = formatting_fields_options
|
milli::is_faceted_by(name, key) || milli::is_faceted_by(key, name)
|
||||||
|
})
|
||||||
|
.map(|(_, option)| **option)
|
||||||
|
.reduce(|acc, option| acc.merge(option));
|
||||||
|
let mut infos = Vec::new();
|
||||||
|
|
||||||
|
// if no locales has been provided, we try to find the locales in the localized_attributes.
|
||||||
|
let locales = locales.or_else(|| {
|
||||||
|
localized_attributes
|
||||||
.iter()
|
.iter()
|
||||||
.filter(|(name, _option)| {
|
.find(|rule| rule.match_str(key))
|
||||||
milli::is_faceted_by(name, key) || milli::is_faceted_by(key, name)
|
.map(LocalizedAttributesRule::locales)
|
||||||
})
|
});
|
||||||
.map(|(_, option)| **option)
|
|
||||||
.reduce(|acc, option| acc.merge(option));
|
|
||||||
let mut infos = Vec::new();
|
|
||||||
|
|
||||||
// if no locales has been provided, we try to find the locales in the localized_attributes.
|
*value = format_value(
|
||||||
let locales = locales.or_else(|| {
|
std::mem::take(value),
|
||||||
localized_attributes
|
builder,
|
||||||
.iter()
|
format,
|
||||||
.find(|rule| rule.match_str(key))
|
&mut infos,
|
||||||
.map(LocalizedAttributesRule::locales)
|
compute_matches,
|
||||||
});
|
locales,
|
||||||
|
);
|
||||||
|
|
||||||
*value = format_value(
|
if let Some(matches) = matches_position.as_mut() {
|
||||||
std::mem::take(value),
|
if !infos.is_empty() {
|
||||||
builder,
|
matches.insert(key.to_owned(), infos);
|
||||||
format,
|
|
||||||
&mut infos,
|
|
||||||
compute_matches,
|
|
||||||
array_indices,
|
|
||||||
locales,
|
|
||||||
);
|
|
||||||
|
|
||||||
if let Some(matches) = matches_position.as_mut() {
|
|
||||||
if !infos.is_empty() {
|
|
||||||
matches.insert(key.to_owned(), infos);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
},
|
}
|
||||||
);
|
});
|
||||||
|
|
||||||
let selectors = formatted_options
|
let selectors = formatted_options
|
||||||
.keys()
|
.keys()
|
||||||
@ -1795,14 +1790,13 @@ fn format_value(
|
|||||||
format_options: Option<FormatOptions>,
|
format_options: Option<FormatOptions>,
|
||||||
infos: &mut Vec<MatchBounds>,
|
infos: &mut Vec<MatchBounds>,
|
||||||
compute_matches: bool,
|
compute_matches: bool,
|
||||||
array_indices: &[usize],
|
|
||||||
locales: Option<&[Language]>,
|
locales: Option<&[Language]>,
|
||||||
) -> Value {
|
) -> Value {
|
||||||
match value {
|
match value {
|
||||||
Value::String(old_string) => {
|
Value::String(old_string) => {
|
||||||
let mut matcher = builder.build(&old_string, locales);
|
let mut matcher = builder.build(&old_string, locales);
|
||||||
if compute_matches {
|
if compute_matches {
|
||||||
let matches = matcher.matches(array_indices);
|
let matches = matcher.matches();
|
||||||
infos.extend_from_slice(&matches[..]);
|
infos.extend_from_slice(&matches[..]);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1814,15 +1808,51 @@ fn format_value(
|
|||||||
None => Value::String(old_string),
|
None => Value::String(old_string),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// `map_leaf_values` makes sure this is only called for leaf fields
|
Value::Array(values) => Value::Array(
|
||||||
Value::Array(_) => unreachable!(),
|
values
|
||||||
Value::Object(_) => unreachable!(),
|
.into_iter()
|
||||||
|
.map(|v| {
|
||||||
|
format_value(
|
||||||
|
v,
|
||||||
|
builder,
|
||||||
|
format_options.map(|format_options| FormatOptions {
|
||||||
|
highlight: format_options.highlight,
|
||||||
|
crop: None,
|
||||||
|
}),
|
||||||
|
infos,
|
||||||
|
compute_matches,
|
||||||
|
locales,
|
||||||
|
)
|
||||||
|
})
|
||||||
|
.collect(),
|
||||||
|
),
|
||||||
|
Value::Object(object) => Value::Object(
|
||||||
|
object
|
||||||
|
.into_iter()
|
||||||
|
.map(|(k, v)| {
|
||||||
|
(
|
||||||
|
k,
|
||||||
|
format_value(
|
||||||
|
v,
|
||||||
|
builder,
|
||||||
|
format_options.map(|format_options| FormatOptions {
|
||||||
|
highlight: format_options.highlight,
|
||||||
|
crop: None,
|
||||||
|
}),
|
||||||
|
infos,
|
||||||
|
compute_matches,
|
||||||
|
locales,
|
||||||
|
),
|
||||||
|
)
|
||||||
|
})
|
||||||
|
.collect(),
|
||||||
|
),
|
||||||
Value::Number(number) => {
|
Value::Number(number) => {
|
||||||
let s = number.to_string();
|
let s = number.to_string();
|
||||||
|
|
||||||
let mut matcher = builder.build(&s, locales);
|
let mut matcher = builder.build(&s, locales);
|
||||||
if compute_matches {
|
if compute_matches {
|
||||||
let matches = matcher.matches(array_indices);
|
let matches = matcher.matches();
|
||||||
infos.extend_from_slice(&matches[..]);
|
infos.extend_from_slice(&matches[..]);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
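The @@ -1814,15 +1808,51 @@ hunk above replaces the unreachable!() arms with explicit recursion into arrays and objects, re-applying the formatter to every nested value while dropping the crop option below the top level. A compact sketch of that recursion pattern over serde_json::Value, assuming the serde_json crate; the highlight-wrapping closure is a placeholder for the real matcher, not Meilisearch's implementation:

use serde_json::{json, Value};

// Placeholder for the real format options: here we only track whether to "highlight".
fn apply_format(value: Value, highlight: bool) -> Value {
    match value {
        Value::String(s) if highlight => Value::String(format!("<em>{s}</em>")),
        // Recurse into every element; nested levels keep the highlight flag
        // but would drop crop-like options, as in the diff above.
        Value::Array(values) => {
            Value::Array(values.into_iter().map(|v| apply_format(v, highlight)).collect())
        }
        Value::Object(object) => Value::Object(
            object.into_iter().map(|(k, v)| (k, apply_format(v, highlight))).collect(),
        ),
        other => other,
    }
}

fn main() {
    let doc = json!({ "doggo": { "names": ["kefir", "echo"] } });
    let formatted = apply_format(doc, true);
    assert_eq!(formatted["doggo"]["names"][0], json!("<em>kefir</em>"));
}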
@@ -114,33 +114,6 @@ async fn batch_bad_from() {
     "#);
 }
 
-#[actix_rt::test]
-async fn bask_bad_reverse() {
-    let server = Server::new_shared();
-
-    let (response, code) = server.batches_filter("reverse=doggo").await;
-    snapshot!(code, @"400 Bad Request");
-    snapshot!(response, @r###"
-    {
-      "message": "Invalid value in parameter `reverse`: could not parse `doggo` as a boolean, expected either `true` or `false`",
-      "code": "invalid_task_reverse",
-      "type": "invalid_request",
-      "link": "https://docs.meilisearch.com/errors#invalid_task_reverse"
-    }
-    "###);
-
-    let (response, code) = server.batches_filter("reverse=*").await;
-    snapshot!(code, @"400 Bad Request");
-    snapshot!(response, @r###"
-    {
-      "message": "Invalid value in parameter `reverse`: could not parse `*` as a boolean, expected either `true` or `false`",
-      "code": "invalid_task_reverse",
-      "type": "invalid_request",
-      "link": "https://docs.meilisearch.com/errors#invalid_task_reverse"
-    }
-    "###);
-}
-
 #[actix_rt::test]
 async fn batch_bad_after_enqueued_at() {
     let server = Server::new_shared();
@@ -49,44 +49,6 @@ async fn list_batches() {
     assert_eq!(response["results"].as_array().unwrap().len(), 2);
 }
 
-#[actix_rt::test]
-async fn list_batches_pagination_and_reverse() {
-    let server = Server::new().await;
-    // First of all we want to create a lot of batches very quickly. The fastest way is to delete a lot of unexisting indexes
-    let mut last_batch = None;
-    for i in 0..10 {
-        let index = server.index(format!("test-{i}"));
-        last_batch = Some(index.create(None).await.0.uid());
-    }
-    server.wait_task(last_batch.unwrap()).await;
-
-    let (response, code) = server.batches_filter("limit=3").await;
-    assert_eq!(code, 200);
-    let results = response["results"].as_array().unwrap();
-    let batch_ids: Vec<_> = results.iter().map(|ret| ret["uid"].as_u64().unwrap()).collect();
-    snapshot!(format!("{batch_ids:?}"), @"[9, 8, 7]");
-
-    let (response, code) = server.batches_filter("limit=3&from=1").await;
-    assert_eq!(code, 200);
-    let results = response["results"].as_array().unwrap();
-    let batch_ids: Vec<_> = results.iter().map(|ret| ret["uid"].as_u64().unwrap()).collect();
-    snapshot!(format!("{batch_ids:?}"), @"[1, 0]");
-
-    // In reversed order
-
-    let (response, code) = server.batches_filter("limit=3&reverse=true").await;
-    assert_eq!(code, 200);
-    let results = response["results"].as_array().unwrap();
-    let batch_ids: Vec<_> = results.iter().map(|ret| ret["uid"].as_u64().unwrap()).collect();
-    snapshot!(format!("{batch_ids:?}"), @"[0, 1, 2]");
-
-    let (response, code) = server.batches_filter("limit=3&from=8&reverse=true").await;
-    assert_eq!(code, 200);
-    let results = response["results"].as_array().unwrap();
-    let batch_ids: Vec<_> = results.iter().map(|ret| ret["uid"].as_u64().unwrap()).collect();
-    snapshot!(format!("{batch_ids:?}"), @"[8, 9]");
-}
-
 #[actix_rt::test]
 async fn list_batches_with_star_filters() {
     let server = Server::new().await;
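For context, the snapshots in the removed test above pin down the semantics of `limit`, `from`, and `reverse`: with ten batches (uids 0 to 9), `limit=3` yields `[9, 8, 7]`, `limit=3&from=1` yields `[1, 0]`, `limit=3&reverse=true` yields `[0, 1, 2]`, and `limit=3&from=8&reverse=true` yields `[8, 9]`. The standalone sketch below merely models those snapshot values over a plain list of uids; it is not the Meilisearch task store, and the `page` helper is a made-up name for illustration.

// Sketch of the pagination semantics encoded by the snapshots above.
fn page(uids: &[u64], limit: usize, from: Option<u64>, reverse: bool) -> Vec<u64> {
    let mut ordered: Vec<u64> = uids.to_vec();
    if reverse {
        // `reverse=true`: walk from the oldest uid upwards.
        ordered.sort_unstable();
    } else {
        // Default: walk from the newest uid downwards.
        ordered.sort_unstable_by(|a, b| b.cmp(a));
    }
    ordered
        .into_iter()
        // `from` names the uid the page starts at, in the chosen direction.
        .skip_while(|uid| match from {
            Some(from) => if reverse { *uid < from } else { *uid > from },
            None => false,
        })
        .take(limit)
        .collect()
}

fn main() {
    let uids: Vec<u64> = (0..10).collect();
    assert_eq!(page(&uids, 3, None, false), vec![9, 8, 7]);
    assert_eq!(page(&uids, 3, Some(1), false), vec![1, 0]);
    assert_eq!(page(&uids, 3, None, true), vec![0, 1, 2]);
    assert_eq!(page(&uids, 3, Some(8), true), vec![8, 9]);
    println!("pagination sketch matches the snapshots");
}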
@@ -224,14 +186,14 @@ async fn get_batch_filter_error() {
 
     let (response, code) = server.batches_filter("lol=pied").await;
     assert_eq!(code, 400, "{}", response);
-    meili_snap::snapshot!(meili_snap::json_string!(response), @r#"
+    meili_snap::snapshot!(meili_snap::json_string!(response), @r###"
     {
-      "message": "Unknown parameter `lol`: expected one of `limit`, `from`, `reverse`, `batchUids`, `uids`, `canceledBy`, `types`, `statuses`, `indexUids`, `afterEnqueuedAt`, `beforeEnqueuedAt`, `afterStartedAt`, `beforeStartedAt`, `afterFinishedAt`, `beforeFinishedAt`",
+      "message": "Unknown parameter `lol`: expected one of `limit`, `from`, `batchUids`, `uids`, `canceledBy`, `types`, `statuses`, `indexUids`, `afterEnqueuedAt`, `beforeEnqueuedAt`, `afterStartedAt`, `beforeStartedAt`, `afterFinishedAt`, `beforeFinishedAt`",
       "code": "bad_request",
       "type": "invalid_request",
       "link": "https://docs.meilisearch.com/errors#bad_request"
     }
-    "#);
+    "###);
 
     let (response, code) = server.batches_filter("uids=pied").await;
     assert_eq!(code, 400, "{}", response);
@@ -208,10 +208,7 @@ async fn format_nested() {
           "doggos.name": [
             {
               "start": 0,
-              "length": 5,
-              "indices": [
-                0
-              ]
+              "length": 5
             }
           ]
         }
@@ -65,6 +65,7 @@ async fn perform_snapshot() {
     let next_task = task.uid() + 1;
     loop {
         let (value, code) = index.get_task(next_task).await;
+        dbg!(&value);
         if code != 404 && value["status"].as_str() == Some("succeeded") {
             break;
         }
@@ -279,55 +279,6 @@ async fn task_bad_from() {
     "###);
 }
 
-#[actix_rt::test]
-async fn task_bad_reverse() {
-    let server = Server::new_shared();
-
-    let (response, code) = server.tasks_filter("reverse=doggo").await;
-    snapshot!(code, @"400 Bad Request");
-    snapshot!(response, @r###"
-    {
-      "message": "Invalid value in parameter `reverse`: could not parse `doggo` as a boolean, expected either `true` or `false`",
-      "code": "invalid_task_reverse",
-      "type": "invalid_request",
-      "link": "https://docs.meilisearch.com/errors#invalid_task_reverse"
-    }
-    "###);
-
-    let (response, code) = server.tasks_filter("reverse=*").await;
-    snapshot!(code, @"400 Bad Request");
-    snapshot!(response, @r###"
-    {
-      "message": "Invalid value in parameter `reverse`: could not parse `*` as a boolean, expected either `true` or `false`",
-      "code": "invalid_task_reverse",
-      "type": "invalid_request",
-      "link": "https://docs.meilisearch.com/errors#invalid_task_reverse"
-    }
-    "###);
-
-    let (response, code) = server.cancel_tasks("reverse=doggo").await;
-    snapshot!(code, @"400 Bad Request");
-    snapshot!(response, @r#"
-    {
-      "message": "Unknown parameter `reverse`: expected one of `uids`, `batchUids`, `canceledBy`, `types`, `statuses`, `indexUids`, `afterEnqueuedAt`, `beforeEnqueuedAt`, `afterStartedAt`, `beforeStartedAt`, `afterFinishedAt`, `beforeFinishedAt`",
-      "code": "bad_request",
-      "type": "invalid_request",
-      "link": "https://docs.meilisearch.com/errors#bad_request"
-    }
-    "#);
-
-    let (response, code) = server.delete_tasks("reverse=doggo").await;
-    snapshot!(code, @"400 Bad Request");
-    snapshot!(response, @r#"
-    {
-      "message": "Unknown parameter `reverse`: expected one of `uids`, `batchUids`, `canceledBy`, `types`, `statuses`, `indexUids`, `afterEnqueuedAt`, `beforeEnqueuedAt`, `afterStartedAt`, `beforeStartedAt`, `afterFinishedAt`, `beforeFinishedAt`",
-      "code": "bad_request",
-      "type": "invalid_request",
-      "link": "https://docs.meilisearch.com/errors#bad_request"
-    }
-    "#);
-}
-
 #[actix_rt::test]
 async fn task_bad_after_enqueued_at() {
     let server = Server::new_shared();
@@ -62,44 +62,6 @@ async fn list_tasks() {
     assert_eq!(response["results"].as_array().unwrap().len(), 2);
 }
 
-#[actix_rt::test]
-async fn list_tasks_pagination_and_reverse() {
-    let server = Server::new().await;
-    // First of all we want to create a lot of tasks very quickly. The fastest way is to delete a lot of unexisting indexes
-    let mut last_task = None;
-    for i in 0..10 {
-        let index = server.index(format!("test-{i}"));
-        last_task = Some(index.create(None).await.0.uid());
-    }
-    server.wait_task(last_task.unwrap()).await;
-
-    let (response, code) = server.tasks_filter("limit=3").await;
-    assert_eq!(code, 200);
-    let results = response["results"].as_array().unwrap();
-    let task_ids: Vec<_> = results.iter().map(|ret| ret["uid"].as_u64().unwrap()).collect();
-    snapshot!(format!("{task_ids:?}"), @"[9, 8, 7]");
-
-    let (response, code) = server.tasks_filter("limit=3&from=1").await;
-    assert_eq!(code, 200);
-    let results = response["results"].as_array().unwrap();
-    let task_ids: Vec<_> = results.iter().map(|ret| ret["uid"].as_u64().unwrap()).collect();
-    snapshot!(format!("{task_ids:?}"), @"[1, 0]");
-
-    // In reversed order
-
-    let (response, code) = server.tasks_filter("limit=3&reverse=true").await;
-    assert_eq!(code, 200);
-    let results = response["results"].as_array().unwrap();
-    let task_ids: Vec<_> = results.iter().map(|ret| ret["uid"].as_u64().unwrap()).collect();
-    snapshot!(format!("{task_ids:?}"), @"[0, 1, 2]");
-
-    let (response, code) = server.tasks_filter("limit=3&from=8&reverse=true").await;
-    assert_eq!(code, 200);
-    let results = response["results"].as_array().unwrap();
-    let task_ids: Vec<_> = results.iter().map(|ret| ret["uid"].as_u64().unwrap()).collect();
-    snapshot!(format!("{task_ids:?}"), @"[8, 9]");
-}
-
 #[actix_rt::test]
 async fn list_tasks_with_star_filters() {
     let server = Server::new().await;
@@ -231,6 +193,131 @@ async fn list_tasks_status_and_type_filtered() {
     assert_eq!(response["results"].as_array().unwrap().len(), 2);
 }
 
+#[actix_rt::test]
+async fn get_task_filter_error() {
+    let server = Server::new().await;
+
+    let (response, code) = server.tasks_filter("lol=pied").await;
+    assert_eq!(code, 400, "{}", response);
+    meili_snap::snapshot!(meili_snap::json_string!(response), @r###"
+    {
+      "message": "Unknown parameter `lol`: expected one of `limit`, `from`, `batchUids`, `uids`, `canceledBy`, `types`, `statuses`, `indexUids`, `afterEnqueuedAt`, `beforeEnqueuedAt`, `afterStartedAt`, `beforeStartedAt`, `afterFinishedAt`, `beforeFinishedAt`",
+      "code": "bad_request",
+      "type": "invalid_request",
+      "link": "https://docs.meilisearch.com/errors#bad_request"
+    }
+    "###);
+
+    let (response, code) = server.tasks_filter("uids=pied").await;
+    assert_eq!(code, 400, "{}", response);
+    meili_snap::snapshot!(meili_snap::json_string!(response), @r###"
+    {
+      "message": "Invalid value in parameter `uids`: could not parse `pied` as a positive integer",
+      "code": "invalid_task_uids",
+      "type": "invalid_request",
+      "link": "https://docs.meilisearch.com/errors#invalid_task_uids"
+    }
+    "###);
+
+    let (response, code) = server.tasks_filter("from=pied").await;
+    assert_eq!(code, 400, "{}", response);
+    meili_snap::snapshot!(meili_snap::json_string!(response), @r###"
+    {
+      "message": "Invalid value in parameter `from`: could not parse `pied` as a positive integer",
+      "code": "invalid_task_from",
+      "type": "invalid_request",
+      "link": "https://docs.meilisearch.com/errors#invalid_task_from"
+    }
+    "###);
+
+    let (response, code) = server.tasks_filter("beforeStartedAt=pied").await;
+    assert_eq!(code, 400, "{}", response);
+    meili_snap::snapshot!(meili_snap::json_string!(response), @r###"
+    {
+      "message": "Invalid value in parameter `beforeStartedAt`: `pied` is an invalid date-time. It should follow the YYYY-MM-DD or RFC 3339 date-time format.",
+      "code": "invalid_task_before_started_at",
+      "type": "invalid_request",
+      "link": "https://docs.meilisearch.com/errors#invalid_task_before_started_at"
+    }
+    "###);
+}
+
+#[actix_rt::test]
+async fn delete_task_filter_error() {
+    let server = Server::new().await;
+
+    let (response, code) = server.delete_tasks("").await;
+    assert_eq!(code, 400, "{}", response);
+    meili_snap::snapshot!(meili_snap::json_string!(response), @r###"
+    {
+      "message": "Query parameters to filter the tasks to delete are missing. Available query parameters are: `uids`, `indexUids`, `statuses`, `types`, `canceledBy`, `beforeEnqueuedAt`, `afterEnqueuedAt`, `beforeStartedAt`, `afterStartedAt`, `beforeFinishedAt`, `afterFinishedAt`.",
+      "code": "missing_task_filters",
+      "type": "invalid_request",
+      "link": "https://docs.meilisearch.com/errors#missing_task_filters"
+    }
+    "###);
+
+    let (response, code) = server.delete_tasks("lol=pied").await;
+    assert_eq!(code, 400, "{}", response);
+    meili_snap::snapshot!(meili_snap::json_string!(response), @r###"
+    {
+      "message": "Unknown parameter `lol`: expected one of `uids`, `batchUids`, `canceledBy`, `types`, `statuses`, `indexUids`, `afterEnqueuedAt`, `beforeEnqueuedAt`, `afterStartedAt`, `beforeStartedAt`, `afterFinishedAt`, `beforeFinishedAt`",
+      "code": "bad_request",
+      "type": "invalid_request",
+      "link": "https://docs.meilisearch.com/errors#bad_request"
+    }
+    "###);
+
+    let (response, code) = server.delete_tasks("uids=pied").await;
+    assert_eq!(code, 400, "{}", response);
+    meili_snap::snapshot!(meili_snap::json_string!(response), @r###"
+    {
+      "message": "Invalid value in parameter `uids`: could not parse `pied` as a positive integer",
+      "code": "invalid_task_uids",
+      "type": "invalid_request",
+      "link": "https://docs.meilisearch.com/errors#invalid_task_uids"
+    }
+    "###);
+}
+
+#[actix_rt::test]
+async fn cancel_task_filter_error() {
+    let server = Server::new().await;
+
+    let (response, code) = server.cancel_tasks("").await;
+    assert_eq!(code, 400, "{}", response);
+    meili_snap::snapshot!(meili_snap::json_string!(response), @r###"
+    {
+      "message": "Query parameters to filter the tasks to cancel are missing. Available query parameters are: `uids`, `indexUids`, `statuses`, `types`, `canceledBy`, `beforeEnqueuedAt`, `afterEnqueuedAt`, `beforeStartedAt`, `afterStartedAt`, `beforeFinishedAt`, `afterFinishedAt`.",
+      "code": "missing_task_filters",
+      "type": "invalid_request",
+      "link": "https://docs.meilisearch.com/errors#missing_task_filters"
+    }
+    "###);
+
+    let (response, code) = server.cancel_tasks("lol=pied").await;
+    assert_eq!(code, 400, "{}", response);
+    meili_snap::snapshot!(meili_snap::json_string!(response), @r###"
+    {
+      "message": "Unknown parameter `lol`: expected one of `uids`, `batchUids`, `canceledBy`, `types`, `statuses`, `indexUids`, `afterEnqueuedAt`, `beforeEnqueuedAt`, `afterStartedAt`, `beforeStartedAt`, `afterFinishedAt`, `beforeFinishedAt`",
+      "code": "bad_request",
+      "type": "invalid_request",
+      "link": "https://docs.meilisearch.com/errors#bad_request"
+    }
+    "###);
+
+    let (response, code) = server.cancel_tasks("uids=pied").await;
+    assert_eq!(code, 400, "{}", response);
+    meili_snap::snapshot!(meili_snap::json_string!(response), @r###"
+    {
+      "message": "Invalid value in parameter `uids`: could not parse `pied` as a positive integer",
+      "code": "invalid_task_uids",
+      "type": "invalid_request",
+      "link": "https://docs.meilisearch.com/errors#invalid_task_uids"
+    }
+    "###);
+}
+
 macro_rules! assert_valid_summarized_task {
     ($response:expr, $task_type:literal, $index:literal) => {{
         assert_eq!($response.as_object().unwrap().len(), 5);
@@ -76,6 +76,13 @@ fn update_index_stats(
 ) -> anyhow::Result<()> {
     let ctx = || format!("while updating index stats for index `{index_uid}`");
 
+    let stats: Option<&str> = index_stats
+        .remap_data_type::<Str>()
+        .get(sched_wtxn, &index_uuid)
+        .with_context(ctx)
+        .with_context(|| "While reading value")?;
+    dbg!(stats);
+
     let stats: Option<v1_9::IndexStats> = index_stats
         .remap_data_type::<SerdeJson<v1_9::IndexStats>>()
         .get(sched_wtxn, &index_uuid)
@@ -105,8 +105,6 @@ impl FormatOptions {
 pub struct MatchBounds {
     pub start: usize,
     pub length: usize,
-    #[serde(skip_serializing_if = "Option::is_none")]
-    pub indices: Option<Vec<usize>>,
 }
 
 /// Structure used to analyze a string, compute words that match,
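As a quick illustration of what the removed `#[serde(skip_serializing_if = "Option::is_none")]` attribute buys in the matches-position payload, the snippet below is a stand-alone reproduction (not the crate's own code, and assuming serde with the derive feature plus serde_json as dependencies) of the two serialized shapes: `{"start":0,"length":5,"indices":[0]}` for a match inside an array versus `{"start":0,"length":5}` otherwise, matching the `format_nested` snapshot change earlier in this diff.

use serde::Serialize;

// Stand-alone copy of the struct as it looks with the optional `indices` field.
#[derive(Serialize)]
struct MatchBounds {
    start: usize,
    length: usize,
    #[serde(skip_serializing_if = "Option::is_none")]
    indices: Option<Vec<usize>>,
}

fn main() {
    let inside_array = MatchBounds { start: 0, length: 5, indices: Some(vec![0]) };
    let plain = MatchBounds { start: 0, length: 5, indices: None };
    // Prints: {"start":0,"length":5,"indices":[0]}
    println!("{}", serde_json::to_string(&inside_array).unwrap());
    // Prints: {"start":0,"length":5} (the key is omitted entirely when None)
    println!("{}", serde_json::to_string(&plain).unwrap());
}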
@@ -222,20 +220,15 @@ impl<'t, 'tokenizer> Matcher<'t, 'tokenizer, '_, '_> {
     }
 
     /// Returns boundaries of the words that match the query.
-    pub fn matches(&mut self, array_indices: &[usize]) -> Vec<MatchBounds> {
+    pub fn matches(&mut self) -> Vec<MatchBounds> {
         match &self.matches {
-            None => self.compute_matches().matches(array_indices),
+            None => self.compute_matches().matches(),
             Some((tokens, matches)) => matches
                 .iter()
                 .map(|m| MatchBounds {
                     start: tokens[m.get_first_token_pos()].byte_start,
                     // TODO: Why is this in chars, while start is in bytes?
                     length: m.char_count,
-                    indices: if array_indices.is_empty() {
-                        None
-                    } else {
-                        Some(array_indices.to_owned())
-                    },
                 })
                 .collect(),
         }
@@ -45,7 +45,7 @@ fn contained_in(selector: &str, key: &str) -> bool {
 /// map_leaf_values(
 ///     value.as_object_mut().unwrap(),
 ///     ["jean.race.name"],
-///     |key, _array_indices, value| match (value, key) {
+///     |key, value| match (value, key) {
 ///         (Value::String(name), "jean.race.name") => *name = "patou".to_string(),
 ///         _ => unreachable!(),
 ///     },
@@ -66,18 +66,17 @@ fn contained_in(selector: &str, key: &str) -> bool {
 pub fn map_leaf_values<'a>(
     value: &mut Map<String, Value>,
     selectors: impl IntoIterator<Item = &'a str>,
-    mut mapper: impl FnMut(&str, &[usize], &mut Value),
+    mut mapper: impl FnMut(&str, &mut Value),
 ) {
     let selectors: Vec<_> = selectors.into_iter().collect();
-    map_leaf_values_in_object(value, &selectors, "", &[], &mut mapper);
+    map_leaf_values_in_object(value, &selectors, "", &mut mapper);
 }
 
 pub fn map_leaf_values_in_object(
     value: &mut Map<String, Value>,
     selectors: &[&str],
     base_key: &str,
-    array_indices: &[usize],
-    mapper: &mut impl FnMut(&str, &[usize], &mut Value),
+    mapper: &mut impl FnMut(&str, &mut Value),
 ) {
     for (key, value) in value.iter_mut() {
         let base_key = if base_key.is_empty() {
@@ -95,12 +94,12 @@ pub fn map_leaf_values_in_object(
         if should_continue {
             match value {
                 Value::Object(object) => {
-                    map_leaf_values_in_object(object, selectors, &base_key, array_indices, mapper)
+                    map_leaf_values_in_object(object, selectors, &base_key, mapper)
                 }
                 Value::Array(array) => {
-                    map_leaf_values_in_array(array, selectors, &base_key, array_indices, mapper)
+                    map_leaf_values_in_array(array, selectors, &base_key, mapper)
                 }
-                value => mapper(&base_key, array_indices, value),
+                value => mapper(&base_key, value),
             }
         }
     }
@@ -110,24 +109,13 @@ pub fn map_leaf_values_in_array(
     values: &mut [Value],
     selectors: &[&str],
     base_key: &str,
-    base_array_indices: &[usize],
-    mapper: &mut impl FnMut(&str, &[usize], &mut Value),
+    mapper: &mut impl FnMut(&str, &mut Value),
 ) {
-    // This avoids allocating twice
-    let mut array_indices = Vec::with_capacity(base_array_indices.len() + 1);
-    array_indices.extend_from_slice(base_array_indices);
-    array_indices.push(0);
-
-    for (i, value) in values.iter_mut().enumerate() {
-        *array_indices.last_mut().unwrap() = i;
+    for value in values.iter_mut() {
         match value {
-            Value::Object(object) => {
-                map_leaf_values_in_object(object, selectors, base_key, &array_indices, mapper)
-            }
-            Value::Array(array) => {
-                map_leaf_values_in_array(array, selectors, base_key, &array_indices, mapper)
-            }
-            value => mapper(base_key, &array_indices, value),
+            Value::Object(object) => map_leaf_values_in_object(object, selectors, base_key, mapper),
+            Value::Array(array) => map_leaf_values_in_array(array, selectors, base_key, mapper),
+            value => mapper(base_key, value),
         }
     }
 }
@@ -755,14 +743,12 @@ mod tests {
             }
         });
 
-        map_leaf_values(
-            value.as_object_mut().unwrap(),
-            ["jean.race.name"],
-            |key, _, value| match (value, key) {
+        map_leaf_values(value.as_object_mut().unwrap(), ["jean.race.name"], |key, value| {
+            match (value, key) {
                 (Value::String(name), "jean.race.name") => *name = S("patou"),
                 _ => unreachable!(),
-            },
-        );
+            }
+        });
 
         assert_eq!(
             value,
@@ -789,7 +775,7 @@ mod tests {
         });
 
         let mut calls = 0;
-        map_leaf_values(value.as_object_mut().unwrap(), ["jean"], |key, _, value| {
+        map_leaf_values(value.as_object_mut().unwrap(), ["jean"], |key, value| {
             calls += 1;
             match (value, key) {
                 (Value::String(name), "jean.race.name") => *name = S("patou"),
@@ -812,52 +798,4 @@ mod tests {
             })
         );
     }
-
-    #[test]
-    fn map_array() {
-        let mut value: Value = json!({
-            "no_array": "peter",
-            "simple": ["foo", "bar"],
-            "nested": [
-                {
-                    "a": [
-                        ["cat", "dog"],
-                        ["fox", "bear"],
-                    ],
-                    "b": "hi",
-                },
-                {
-                    "a": ["green", "blue"],
-                },
-            ],
-        });
-
-        map_leaf_values(
-            value.as_object_mut().unwrap(),
-            ["no_array", "simple", "nested"],
-            |_key, array_indices, value| {
-                *value = format!("{array_indices:?}").into();
-            },
-        );
-
-        assert_eq!(
-            value,
-            json!({
-                "no_array": "[]",
-                "simple": ["[0]", "[1]"],
-                "nested": [
-                    {
-                        "a": [
-                            ["[0, 0, 0]", "[0, 0, 1]"],
-                            ["[0, 1, 0]", "[0, 1, 1]"],
-                        ],
-                        "b": "[0]",
-                    },
-                    {
-                        "a": ["[1, 0]", "[1, 1]"],
-                    },
-                ],
-            })
-        );
-    }
 }