fixes a lot of small issues; the test about cancellation is still failing

Tamo 2024-11-18 16:53:55 +01:00
parent 2df4eeda74
commit b5bea0cf56
No known key found for this signature in database
GPG Key ID: 20CD8020AFA88D69
13 changed files with 821 additions and 197 deletions

View File

@@ -24,7 +24,6 @@ use std::fs::{self, File};
 use std::io::BufWriter;

 use dump::IndexMetadata;
-use meilisearch_types::batches::BatchId;
 use meilisearch_types::error::Code;
 use meilisearch_types::heed::{RoTxn, RwTxn};
 use meilisearch_types::milli::documents::{obkv_to_object, DocumentsBatchReader};
@@ -45,7 +44,7 @@ use time::OffsetDateTime;
 use uuid::Uuid;

 use crate::autobatcher::{self, BatchKind};
-use crate::utils::{self, swap_index_uid_in_task};
+use crate::utils::{self, swap_index_uid_in_task, ProcessingBatch};
 use crate::{Error, IndexScheduler, MustStopProcessing, ProcessingTasks, Result, TaskId};

 /// Represents a combination of tasks that can all be processed at the same time.
@@ -280,22 +279,24 @@ impl IndexScheduler {
         rtxn: &RoTxn,
         index_uid: String,
         batch: BatchKind,
-        batch_id: BatchId,
+        current_batch: &mut ProcessingBatch,
         must_create_index: bool,
     ) -> Result<Option<Batch>> {
         match batch {
             BatchKind::DocumentClear { ids } => Ok(Some(Batch::IndexOperation {
                 op: IndexOperation::DocumentClear {
-                    tasks: self.get_existing_tasks_with_batch_id(rtxn, batch_id, ids)?,
+                    tasks: self.get_existing_tasks_with_processing_batch(
+                        rtxn,
+                        current_batch,
+                        ids,
+                    )?,
                     index_uid,
                 },
                 must_create_index,
             })),
             BatchKind::DocumentEdition { id } => {
-                let task = self
-                    .get_task(rtxn, id)?
-                    .ok_or(Error::CorruptedTaskQueue)?
-                    .with_batch_id(batch_id);
+                let mut task = self.get_task(rtxn, id)?.ok_or(Error::CorruptedTaskQueue)?;
+                current_batch.processing(Some(&mut task));
                 match &task.kind {
                     KindWithContent::DocumentEdition { index_uid, .. } => {
                         Ok(Some(Batch::IndexOperation {
@@ -310,7 +311,11 @@ impl IndexScheduler {
                 }
             }
             BatchKind::DocumentOperation { method, operation_ids, .. } => {
-                let tasks = self.get_existing_tasks_with_batch_id(rtxn, batch_id, operation_ids)?;
+                let tasks = self.get_existing_tasks_with_processing_batch(
+                    rtxn,
+                    current_batch,
+                    operation_ids,
+                )?;
                 let primary_key = tasks
                     .iter()
                     .find_map(|task| match task.kind {
@@ -357,7 +362,11 @@ impl IndexScheduler {
                 }))
             }
             BatchKind::DocumentDeletion { deletion_ids, includes_by_filter: _ } => {
-                let tasks = self.get_existing_tasks_with_batch_id(rtxn, batch_id, deletion_ids)?;
+                let tasks = self.get_existing_tasks_with_processing_batch(
+                    rtxn,
+                    current_batch,
+                    deletion_ids,
+                )?;

                 Ok(Some(Batch::IndexOperation {
                     op: IndexOperation::DocumentDeletion { index_uid, tasks },
@@ -365,7 +374,11 @@ impl IndexScheduler {
                 }))
             }
             BatchKind::Settings { settings_ids, .. } => {
-                let tasks = self.get_existing_tasks_with_batch_id(rtxn, batch_id, settings_ids)?;
+                let tasks = self.get_existing_tasks_with_processing_batch(
+                    rtxn,
+                    current_batch,
+                    settings_ids,
+                )?;

                 let mut settings = Vec::new();
                 for task in &tasks {
@@ -388,7 +401,7 @@ impl IndexScheduler {
                     rtxn,
                     index_uid,
                     BatchKind::Settings { settings_ids, allow_index_creation },
-                    batch_id,
+                    current_batch,
                     must_create_index,
                 )?
                 .unwrap()
@@ -404,7 +417,7 @@ impl IndexScheduler {
                     rtxn,
                     index_uid,
                     BatchKind::DocumentClear { ids: other },
-                    batch_id,
+                    current_batch,
                     must_create_index,
                 )?
                 .unwrap()
@@ -437,7 +450,7 @@ impl IndexScheduler {
                     rtxn,
                     index_uid.clone(),
                     BatchKind::Settings { settings_ids, allow_index_creation },
-                    batch_id,
+                    current_batch,
                     must_create_index,
                 )?;
@@ -450,7 +463,7 @@ impl IndexScheduler {
                         primary_key,
                         operation_ids,
                     },
-                    batch_id,
+                    current_batch,
                     must_create_index,
                 )?;
@@ -488,10 +501,8 @@ impl IndexScheduler {
                 }
             }
             BatchKind::IndexCreation { id } => {
-                let task = self
-                    .get_task(rtxn, id)?
-                    .ok_or(Error::CorruptedTaskQueue)?
-                    .with_batch_id(batch_id);
+                let mut task = self.get_task(rtxn, id)?.ok_or(Error::CorruptedTaskQueue)?;
+                current_batch.processing(Some(&mut task));
                 let (index_uid, primary_key) = match &task.kind {
                     KindWithContent::IndexCreation { index_uid, primary_key } => {
                         (index_uid.clone(), primary_key.clone())
@@ -501,10 +512,8 @@ impl IndexScheduler {
                 Ok(Some(Batch::IndexCreation { index_uid, primary_key, task }))
             }
             BatchKind::IndexUpdate { id } => {
-                let task = self
-                    .get_task(rtxn, id)?
-                    .ok_or(Error::CorruptedTaskQueue)?
-                    .with_batch_id(batch_id);
+                let mut task = self.get_task(rtxn, id)?.ok_or(Error::CorruptedTaskQueue)?;
+                current_batch.processing(Some(&mut task));
                 let primary_key = match &task.kind {
                     KindWithContent::IndexUpdate { primary_key, .. } => primary_key.clone(),
                     _ => unreachable!(),
@@ -514,13 +523,11 @@ impl IndexScheduler {
             BatchKind::IndexDeletion { ids } => Ok(Some(Batch::IndexDeletion {
                 index_uid,
                 index_has_been_created: must_create_index,
-                tasks: self.get_existing_tasks_with_batch_id(rtxn, batch_id, ids)?,
+                tasks: self.get_existing_tasks_with_processing_batch(rtxn, current_batch, ids)?,
             })),
             BatchKind::IndexSwap { id } => {
-                let task = self
-                    .get_task(rtxn, id)?
-                    .ok_or(Error::CorruptedTaskQueue)?
-                    .with_batch_id(batch_id);
+                let mut task = self.get_task(rtxn, id)?.ok_or(Error::CorruptedTaskQueue)?;
+                current_batch.processing(Some(&mut task));
                 Ok(Some(Batch::IndexSwap { task }))
             }
         }
@@ -533,11 +540,16 @@ impl IndexScheduler {
     /// 4. We get the *next* dump to process.
     /// 5. We get the *next* tasks to process for a specific index.
     #[tracing::instrument(level = "trace", skip(self, rtxn), target = "indexing::scheduler")]
-    pub(crate) fn create_next_batch(&self, rtxn: &RoTxn) -> Result<Option<(Batch, BatchId)>> {
+    pub(crate) fn create_next_batch(
+        &self,
+        rtxn: &RoTxn,
+    ) -> Result<Option<(Batch, ProcessingBatch)>> {
         #[cfg(test)]
         self.maybe_fail(crate::tests::FailureLocation::InsideCreateBatch)?;

         let batch_id = self.next_batch_id(rtxn)?;
+        let mut current_batch = ProcessingBatch::new(batch_id);

         let enqueued = &self.get_status(rtxn, Status::Enqueued)?;
         let to_cancel = self.get_kind(rtxn, Kind::TaskCancelation)? & enqueued;
@@ -547,63 +559,51 @@ impl IndexScheduler {
             // We must *not* reset the processing tasks before calling this method.
             // Displaying the `batch_id` would make a strange error message since this task cancelation is going to
             // replace the canceled batch. It's better to avoid mentioning it in the error message.
-            let ProcessingTasks { started_at, batch_id: _, processing } =
+            let ProcessingTasks { batch: previous_batch, processing } =
                 &*self.processing_tasks.read().unwrap();
+            let mut task = self.get_task(rtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?;
+            current_batch.processing(Some(&mut task));
             return Ok(Some((
                 Batch::TaskCancelation {
-                    task: self
-                        .get_task(rtxn, task_id)?
-                        .ok_or(Error::CorruptedTaskQueue)?
-                        .with_batch_id(batch_id),
-                    previous_started_at: *started_at,
+                    task,
+                    // We should never be in a case where we don't have a previous_batch, but let's not crash if it happens
+                    previous_started_at: previous_batch
+                        .as_ref()
+                        .map_or_else(OffsetDateTime::now_utc, |batch| batch.started_at),
                     previous_processing_tasks: processing.clone(),
                 },
-                batch_id,
+                current_batch,
             )));
         }

         // 2. we get the next task to delete
         let to_delete = self.get_kind(rtxn, Kind::TaskDeletion)? & enqueued;
         if !to_delete.is_empty() {
-            let tasks = self
-                .get_existing_tasks(rtxn, to_delete)?
-                .into_iter()
-                .map(|task| task.with_batch_id(batch_id))
-                .collect();
-            return Ok(Some((Batch::TaskDeletions(tasks), batch_id)));
+            let mut tasks = self.get_existing_tasks(rtxn, to_delete)?;
+            current_batch.processing(&mut tasks);
+            return Ok(Some((Batch::TaskDeletions(tasks), current_batch)));
         }

         // 3. we batch the snapshot.
         let to_snapshot = self.get_kind(rtxn, Kind::SnapshotCreation)? & enqueued;
         if !to_snapshot.is_empty() {
-            return Ok(Some((
-                Batch::SnapshotCreation(
-                    self.get_existing_tasks(rtxn, to_snapshot)?
-                        .into_iter()
-                        .map(|task| task.with_batch_id(batch_id))
-                        .collect(),
-                ),
-                batch_id,
-            )));
+            let mut tasks = self.get_existing_tasks(rtxn, to_snapshot)?;
+            current_batch.processing(&mut tasks);
+            return Ok(Some((Batch::SnapshotCreation(tasks), current_batch)));
         }

         // 4. we batch the dumps.
         let to_dump = self.get_kind(rtxn, Kind::DumpCreation)? & enqueued;
         if let Some(to_dump) = to_dump.min() {
-            return Ok(Some((
-                Batch::Dump(
-                    self.get_task(rtxn, to_dump)?
-                        .ok_or(Error::CorruptedTaskQueue)?
-                        .with_batch_id(batch_id),
-                ),
-                batch_id,
-            )));
+            let mut task = self.get_task(rtxn, to_dump)?.ok_or(Error::CorruptedTaskQueue)?;
+            current_batch.processing(Some(&mut task));
+            return Ok(Some((Batch::Dump(task), current_batch)));
        }

         // 5. We make a batch from the unprioritised tasks. Start by taking the next enqueued task.
         let task_id = if let Some(task_id) = enqueued.min() { task_id } else { return Ok(None) };
-        let task =
-            self.get_task(rtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?.with_batch_id(batch_id);
+        let mut task = self.get_task(rtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?;
+        current_batch.processing(Some(&mut task));

         // If the task is not associated with any index, verify that it is an index swap and
         // create the batch directly. Otherwise, get the index name associated with the task
@@ -613,7 +613,7 @@ impl IndexScheduler {
             index_name
         } else {
             assert!(matches!(&task.kind, KindWithContent::IndexSwap { swaps } if swaps.is_empty()));
-            return Ok(Some((Batch::IndexSwap { task }, batch_id)));
+            return Ok(Some((Batch::IndexSwap { task }, current_batch)));
         };

         let index_already_exists = self.index_mapper.exists(rtxn, index_name)?;
@@ -649,10 +649,10 @@ impl IndexScheduler {
                 rtxn,
                 index_name.to_string(),
                 batchkind,
-                batch_id,
+                &mut current_batch,
                 create_index,
             )?
-            .map(|batch| (batch, batch_id)));
+            .map(|batch| (batch, current_batch)));
         }

         // If we found no tasks then we were notified for something that got autobatched
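Taken together, these hunks change the contract of `create_next_batch`: the caller now receives the in-flight `ProcessingBatch` next to the `Batch`, and every branch registers its tasks on that batch via `processing(...)` instead of stamping a bare `batch_id`. A minimal sketch of the new calling convention, paraphrasing the `tick()` call site shown later in this commit (the function name `schedule_one` is illustrative, and the items used are crate-private):

// Sketch only: uses the crate-private items visible in this diff.
fn schedule_one(scheduler: &IndexScheduler, rtxn: &RoTxn) -> Result<()> {
    if let Some((batch, processing_batch)) = scheduler.create_next_batch(rtxn)? {
        let ids = batch.ids();
        scheduler
            .processing_tasks
            .write()
            .unwrap()
            // cloned so later mutations of `processing_batch` don't alter the shared view
            .start_processing(processing_batch.clone(), ids);
    }
    Ok(())
}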

View File

@@ -68,7 +68,10 @@ pub fn snapshot_index_scheduler(scheduler: &IndexScheduler) -> String {
     let processing = processing_tasks.read().unwrap().clone();

     snap.push_str(&format!("### Autobatching Enabled = {autobatching_enabled}\n"));
-    snap.push_str(&format!("### Processing batch {:?}:\n", processing.batch_id));
+    snap.push_str(&format!(
+        "### Processing batch {:?}:\n",
+        processing.batch.map(|batch| batch.uid)
+    ));
     snap.push_str(&snapshot_bitmap(&processing.processing));
     snap.push_str("\n----------------------------------------------------------------------\n");

View File

@@ -71,7 +71,7 @@ use utils::{filter_out_references_to_newer_tasks, keep_tasks_within_datetimes, m
 use uuid::Uuid;

 use crate::index_mapper::IndexMapper;
-use crate::utils::{check_index_swap_validity, clamp_to_page_size, CachedBatch};
+use crate::utils::{check_index_swap_validity, clamp_to_page_size, ProcessingBatch};

 pub(crate) type BEI128 = I128<BE>;
@@ -161,10 +161,7 @@ impl Query {
 #[derive(Debug, Clone)]
 struct ProcessingTasks {
-    /// The date and time at which the indexation started.
-    started_at: OffsetDateTime,
-    /// The id of the batch processing
-    batch_id: Option<BatchId>,
+    batch: Option<ProcessingBatch>,
     /// The list of tasks ids that are currently running.
     processing: RoaringBitmap,
 }
@@ -172,30 +169,19 @@ struct ProcessingTasks {
 impl ProcessingTasks {
     /// Creates an empty `ProcessingAt` struct.
     fn new() -> ProcessingTasks {
-        ProcessingTasks {
-            started_at: OffsetDateTime::now_utc(),
-            batch_id: None,
-            processing: RoaringBitmap::new(),
-        }
+        ProcessingTasks { batch: None, processing: RoaringBitmap::new() }
     }

     /// Stores the currently processing tasks, and the date time at which it started.
-    fn start_processing(
-        &mut self,
-        started_at: OffsetDateTime,
-        batch_id: BatchId,
-        processing: RoaringBitmap,
-    ) {
-        self.started_at = started_at;
-        self.batch_id = Some(batch_id);
+    fn start_processing(&mut self, processing_batch: ProcessingBatch, processing: RoaringBitmap) {
+        self.batch = Some(processing_batch);
         self.processing = processing;
     }

     /// Set the processing tasks to an empty list
     fn stop_processing(&mut self) -> Self {
         Self {
-            started_at: self.started_at,
-            batch_id: self.batch_id.take(),
+            batch: std::mem::take(&mut self.batch),
             processing: std::mem::take(&mut self.processing),
         }
     }
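With this refactor, `ProcessingTasks` is a small state machine: empty at rest, holding exactly one `ProcessingBatch` while a batch runs, and drained atomically when it stops. A rough illustration of the lifecycle, assuming the definitions above (these are crate-private, so this only compiles inside the crate):

// Illustration of the simplified state machine, not code from this commit.
let mut state = ProcessingTasks::new();
assert!(state.batch.is_none());

// A batch with uid 0 starts running tasks 1 and 2.
state.start_processing(ProcessingBatch::new(0), RoaringBitmap::from_iter([1u32, 2]));
assert_eq!(state.batch.as_ref().map(|batch| batch.uid), Some(0));

// Stopping hands back the old state and leaves the struct empty.
let stopped = state.stop_processing();
assert!(state.batch.is_none() && state.processing.is_empty());
assert_eq!(stopped.processing.len(), 2);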
@@ -785,11 +771,8 @@ impl IndexScheduler {
     /// Return the task ids matched by the given query from the index scheduler's point of view.
     pub(crate) fn get_task_ids(&self, rtxn: &RoTxn, query: &Query) -> Result<RoaringBitmap> {
-        let ProcessingTasks {
-            started_at: started_at_processing,
-            processing: processing_tasks,
-            batch_id: current_batch_processing,
-        } = self.processing_tasks.read().unwrap().clone();
+        let ProcessingTasks { batch: processing_batch, processing: processing_tasks } =
+            self.processing_tasks.read().unwrap().clone();
         let Query {
             limit,
             from,
@@ -816,7 +799,7 @@ impl IndexScheduler {
         if let Some(batch_uids) = batch_uids {
             let mut batch_tasks = RoaringBitmap::new();
             for batch_uid in batch_uids {
-                if Some(*batch_uid) == current_batch_processing {
+                if Some(*batch_uid) == processing_batch.as_ref().map(|batch| batch.uid) {
                     batch_tasks |= &processing_tasks;
                 } else {
                     batch_tasks |= self.tasks_in_batch(rtxn, *batch_uid)?;
@@ -890,13 +873,15 @@ impl IndexScheduler {
             // special case for Processing tasks
             // A closure that clears the filtered_processing_tasks if their started_at date falls outside the given bounds
-            let mut clear_filtered_processing_tasks =
+            let clear_filtered_processing_tasks =
                 |start: Bound<OffsetDateTime>, end: Bound<OffsetDateTime>| {
                     let start = map_bound(start, |b| b.unix_timestamp_nanos());
                     let end = map_bound(end, |b| b.unix_timestamp_nanos());
                     let is_within_dates = RangeBounds::contains(
                         &(start, end),
-                        &started_at_processing.unix_timestamp_nanos(),
+                        &processing_batch
+                            .map_or_else(OffsetDateTime::now_utc, |batch| batch.started_at)
+                            .unix_timestamp_nanos(),
                     );
                     if !is_within_dates {
                         filtered_processing_tasks.clear();
@@ -950,42 +935,59 @@ impl IndexScheduler {
     }

     /// Return the batch ids matched by the given query from the index scheduler's point of view.
-    pub(crate) fn get_batch_ids(&self, rtxn: &RoTxn, query: &Query) -> Result<RoaringBitmap> {
-        let ProcessingTasks {
-            started_at: started_at_processing,
-            processing: processing_batches,
-            batch_id,
-        } = self.processing_tasks.read().unwrap().clone();
+    pub(crate) fn get_batch_ids(
+        &self,
+        rtxn: &RoTxn,
+        processing: &ProcessingTasks,
+        query: &Query,
+    ) -> Result<RoaringBitmap> {
         let mut batches = self.all_batch_ids(rtxn)?;
+        if let Some(batch_id) = processing.batch.as_ref().map(|batch| batch.uid) {
+            batches.insert(batch_id);
+        }

         if let Some(from) = &query.from {
             batches.remove_range(from.saturating_add(1)..);
         }

+        if let Some(batch_uids) = &query.batch_uids {
+            let batches_uids = RoaringBitmap::from_iter(batch_uids);
+            batches &= batches_uids;
+        }
+
         if let Some(status) = &query.statuses {
             let mut status_batches = RoaringBitmap::new();
             for status in status {
+                // TODO: We can't retrieve anything around processing batches so we can get rid of a lot of code here
                 match status {
                     // special case for Processing batches
                     Status::Processing => {
-                        if let Some(batch_id) = batch_id {
+                        if let Some(batch_id) = processing.batch.as_ref().map(|batch| batch.uid) {
                             status_batches.insert(batch_id);
                         }
                     }
+                    // Enqueued tasks are not stored in batches
+                    Status::Enqueued => (),
                     status => status_batches |= &self.get_batch_status(rtxn, *status)?,
                 };
             }
             if !status.contains(&Status::Processing) {
-                batches -= &processing_batches;
+                if let Some(ref batch) = processing.batch {
+                    batches.remove(batch.uid);
+                }
             }
             batches &= status_batches;
         }

-        if let Some(uids) = &query.uids {
-            let uids = RoaringBitmap::from_iter(uids);
-            batches &= &uids;
+        if let Some(task_uids) = &query.uids {
+            let mut batches_by_task_uids = RoaringBitmap::new();
+            for task_uid in task_uids {
+                if let Some(task) = self.get_task(rtxn, *task_uid)? {
+                    if let Some(batch_uid) = task.batch_uid {
+                        batches_by_task_uids.insert(batch_uid);
+                    }
+                }
+            }
+            batches &= batches_by_task_uids;
         }

         if let Some(canceled_by) = &query.canceled_by {
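Note the semantic shift in the `uids` filter above: task uids are now resolved to batch uids through each task's `batch_uid`, instead of being intersected with the batch uids directly. For example, under the new code (assuming `Query` implements `Default`, as the tests below suggest):

// Asking for task uid 7 now yields the batch that *contains* task 7,
// not the batch whose own uid happens to be 7.
let query = Query { uids: Some(vec![7]), ..Default::default() };
let batches = scheduler.get_batch_ids(&rtxn, &processing, &query)?;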
@@ -1009,6 +1011,13 @@ impl IndexScheduler {
             let mut kind_batches = RoaringBitmap::new();
             for kind in kind {
                 kind_batches |= self.get_batch_kind(rtxn, *kind)?;
+                if let Some(uid) = processing
+                    .batch
+                    .as_ref()
+                    .and_then(|batch| batch.kinds.contains(kind).then_some(batch.uid))
+                {
+                    kind_batches.insert(uid);
+                }
             }
             batches &= &kind_batches;
         }
@@ -1017,6 +1026,13 @@ impl IndexScheduler {
             let mut index_batches = RoaringBitmap::new();
             for index in index {
                 index_batches |= self.index_batches(rtxn, index)?;
+                if let Some(uid) = processing
+                    .batch
+                    .as_ref()
+                    .and_then(|batch| batch.indexes.contains(index).then_some(batch.uid))
+                {
+                    index_batches.insert(uid);
+                }
             }
             batches &= &index_batches;
         }
@@ -1027,7 +1043,7 @@ impl IndexScheduler {
         // Once we have filtered the two subsets, we put them back together and assign it back to `batches`.
         batches = {
             let (mut filtered_non_processing_batches, mut filtered_processing_batches) =
-                (&batches - &processing_batches, &batches & &processing_batches);
+                (&batches - &processing.processing, &batches & &processing.processing);

             // special case for Processing batches
             // A closure that clears the filtered_processing_batches if their started_at date falls outside the given bounds
@@ -1037,7 +1053,11 @@ impl IndexScheduler {
                 let end = map_bound(end, |b| b.unix_timestamp_nanos());
                 let is_within_dates = RangeBounds::contains(
                     &(start, end),
-                    &started_at_processing.unix_timestamp_nanos(),
+                    &processing
+                        .batch
+                        .as_ref()
+                        .map_or_else(OffsetDateTime::now_utc, |batch| batch.started_at)
+                        .unix_timestamp_nanos(),
                 );
                 if !is_within_dates {
                     filtered_processing_batches.clear();
@@ -1207,32 +1227,50 @@ impl IndexScheduler {
         query: &Query,
         filters: &meilisearch_auth::AuthFilter,
     ) -> Result<(RoaringBitmap, u64)> {
+        let processing = self.processing_tasks.read().unwrap().clone();

         // compute all batches matching the filter by ignoring the limits, to find the number of batches matching
         // the filter.
         // As this causes us to compute the filter twice it is slightly inefficient, but doing it this way spares
         // us from modifying the underlying implementation, and the performance remains sufficient.
         // Should this change, we would modify `get_batch_ids` to directly return the number of matching batches.
-        let total_batches = self.get_batch_ids(rtxn, &query.clone().without_limits())?;
-        let mut batches = self.get_batch_ids(rtxn, query)?;
+        let total_batches =
+            self.get_batch_ids(rtxn, &processing, &query.clone().without_limits())?;
+        let mut batches = self.get_batch_ids(rtxn, &processing, query)?;

         // If the query contains a list of index uid or there is a finite list of authorized indexes,
-        // then we must exclude all the kinds that aren't associated to one and only one index.
+        // then we must exclude all the batches that only contains tasks associated to multiple indexes.
+        // This works because we don't autobatch tasks associated to multiple indexes with tasks associated
+        // to a single index. e.g: IndexSwap cannot be batched with IndexCreation.
         if query.index_uids.is_some() || !filters.all_indexes_authorized() {
             for kind in enum_iterator::all::<Kind>().filter(|kind| !kind.related_to_one_index()) {
                 batches -= self.get_kind(rtxn, kind)?;
+                if let Some(batch) = processing.batch.as_ref() {
+                    if batch.kinds.contains(&kind) {
+                        batches.remove(batch.uid);
+                    }
+                }
             }
         }

         // Any task that is internally associated with a non-authorized index
         // must be discarded.
+        // This works because currently batches cannot contains tasks from multiple indexes at the same time.
         if !filters.all_indexes_authorized() {
-            let all_indexes_iter = self.index_tasks.iter(rtxn)?;
+            let all_indexes_iter = self.batch_index_tasks.iter(rtxn)?;
             for result in all_indexes_iter {
                 let (index, index_tasks) = result?;
                 if !filters.is_index_authorized(index) {
                     batches -= index_tasks;
                 }
             }
+            if let Some(batch) = processing.batch.as_ref() {
+                for index in &batch.indexes {
+                    if !filters.is_index_authorized(index) {
+                        batches.remove(batch.uid);
+                    }
+                }
+            }
         }

         Ok((batches, total_batches.len()))
@@ -1260,20 +1298,22 @@ impl IndexScheduler {
             tasks.into_iter().rev().take(query.limit.unwrap_or(u32::MAX) as usize),
         )?;

-        let ProcessingTasks { started_at, batch_id, processing } =
+        let ProcessingTasks { batch, processing } =
             self.processing_tasks.read().map_err(|_| Error::CorruptedTaskQueue)?.clone();

         let ret = tasks.into_iter();
-        if processing.is_empty() {
+        if processing.is_empty() || batch.is_none() {
             Ok((ret.collect(), total))
         } else {
+            // Safe because we ensured there was a batch in the previous branch
+            let batch = batch.unwrap();
             Ok((
                 ret.map(|task| {
                     if processing.contains(task.uid) {
                         Task {
                             status: Status::Processing,
-                            batch_uid: batch_id,
-                            started_at: Some(started_at),
+                            batch_uid: Some(batch.uid),
+                            started_at: Some(batch.started_at),
                             ..task
                         }
                     } else {
@@ -1302,32 +1342,14 @@ impl IndexScheduler {
     ) -> Result<(Vec<Batch>, u64)> {
         let rtxn = self.env.read_txn()?;

-        let (tasks, total) = self.get_batch_ids_from_authorized_indexes(&rtxn, &query, filters)?;
-        let tasks = self.get_existing_batches(
+        let (batches, total) =
+            self.get_batch_ids_from_authorized_indexes(&rtxn, &query, filters)?;
+        let batches = self.get_existing_batches(
             &rtxn,
-            tasks.into_iter().rev().take(query.limit.unwrap_or(u32::MAX) as usize),
+            batches.into_iter().rev().take(query.limit.unwrap_or(u32::MAX) as usize),
         )?;

-        let ProcessingTasks { started_at, batch_id, processing } =
-            self.processing_tasks.read().map_err(|_| Error::CorruptedTaskQueue)?.clone();
-
-        let ret = tasks.into_iter();
-        if processing.is_empty() {
-            Ok((ret.collect(), total))
-        } else {
-            // TODO: We must re-insert the current processing batch somewhere in some way
-            Ok((
-                ret.map(|batch| {
-                    if processing.contains(batch.uid) {
-                        Batch { started_at, ..batch }
-                    } else {
-                        batch
-                    }
-                })
-                .collect(),
-                total,
-            ))
-        }
+        Ok((batches, total))
     }

     /// Register a new task in the scheduler.
@@ -1495,7 +1517,7 @@ impl IndexScheduler {
         }

         let rtxn = self.env.read_txn().map_err(Error::HeedTransaction)?;
-        let (batch, batch_id) =
+        let (batch, mut processing_batch) =
             match self.create_next_batch(&rtxn).map_err(|e| Error::CreateBatch(Box::new(e)))? {
                 Some(batch) => batch,
                 None => return Ok(TickOutcome::WaitForSignal),
@@ -1506,11 +1528,14 @@ impl IndexScheduler {
         // 1. store the starting date with the bitmap of processing tasks.
         let ids = batch.ids();
         let processed_tasks = ids.len();
-        let started_at = OffsetDateTime::now_utc();

         // We reset the must_stop flag to be sure that we don't stop processing tasks
         self.must_stop_processing.reset();
-        self.processing_tasks.write().unwrap().start_processing(started_at, batch_id, ids.clone());
+        self.processing_tasks
+            .write()
+            .unwrap()
+            // We can clone the processing batch here because we don't want its modification to affect the view of the processing batches
+            .start_processing(processing_batch.clone(), ids.clone());

         #[cfg(test)]
         self.breakpoint(Breakpoint::BatchCreated);
@@ -1534,7 +1559,6 @@ impl IndexScheduler {
         let mut wtxn = self.env.write_txn().map_err(Error::HeedTransaction)?;

         let finished_at = OffsetDateTime::now_utc();
-        let mut current_batch = CachedBatch::new(batch_id, started_at, finished_at);
         match res {
             Ok(tasks) => {
                 #[cfg(test)]
@@ -1545,7 +1569,7 @@ impl IndexScheduler {
                 #[allow(unused_variables)]
                 for (i, mut task) in tasks.into_iter().enumerate() {
-                    task.started_at = Some(started_at);
+                    task.started_at = Some(processing_batch.started_at);
                     task.finished_at = Some(finished_at);

                     #[cfg(test)]
@@ -1562,7 +1586,7 @@ impl IndexScheduler {
                     self.update_task(&mut wtxn, &task)
                         .map_err(|e| Error::TaskDatabaseUpdate(Box::new(e)))?;
-                    current_batch.update(&task);
+                    processing_batch.update(&task);
                 }
                 tracing::info!("A batch of tasks was successfully completed with {success} successful tasks and {failure} failed tasks.");
             }
@@ -1610,9 +1634,9 @@ impl IndexScheduler {
                     let mut task = self
                         .get_task(&wtxn, id)
                         .map_err(|e| Error::TaskDatabaseUpdate(Box::new(e)))?
-                        .ok_or(Error::CorruptedTaskQueue)?
-                        .with_batch_id(batch_id);
-                    task.started_at = Some(started_at);
+                        .ok_or(Error::CorruptedTaskQueue)?;
+                    task.batch_uid = Some(processing_batch.uid);
+                    task.started_at = Some(processing_batch.started_at);
                     task.finished_at = Some(finished_at);
                     task.status = Status::Failed;
                     task.error = Some(error.clone());
@@ -1625,14 +1649,14 @@ impl IndexScheduler {
                     self.update_task(&mut wtxn, &task)
                         .map_err(|e| Error::TaskDatabaseUpdate(Box::new(e)))?;
-                    current_batch.update(&task);
+                    processing_batch.update(&task);
                 }
             }
         }

         let processed = self.processing_tasks.write().unwrap().stop_processing();
-        self.write_batch(&mut wtxn, current_batch, &processed.processing)?;
+        self.write_batch(&mut wtxn, processing_batch, &processed.processing, finished_at)?;

         #[cfg(test)]
         self.maybe_fail(tests::FailureLocation::CommittingWtxn)?;
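The call site above also implies that `write_batch` gained a `finished_at` parameter, since `ProcessingBatch` no longer carries a finish time. Its signature presumably now reads roughly as follows (a hypothetical reconstruction from the call site; the real definition lives elsewhere in this commit):

// Hypothetical reconstruction, inferred from the call above.
impl IndexScheduler {
    pub(crate) fn write_batch(
        &self,
        wtxn: &mut RwTxn,
        batch: ProcessingBatch,
        tasks: &RoaringBitmap,
        finished_at: OffsetDateTime,
    ) -> Result<()> {
        // body elided; see the full commit
        unimplemented!()
    }
}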
@@ -4214,7 +4238,7 @@ mod tests {
         let (batches, _) = index_scheduler
             .get_batch_ids_from_authorized_indexes(&rtxn, &query, &AuthFilter::default())
             .unwrap();
-        snapshot!(snapshot_bitmap(&batches), @"[1,2,]"); // only the enqueued batches in the first tick
+        snapshot!(snapshot_bitmap(&batches), @"[]"); // The batches don't contains any enqueued tasks

         let query = Query {
             statuses: Some(vec![Status::Enqueued, Status::Processing]),
@@ -4223,7 +4247,7 @@ mod tests {
         let (batches, _) = index_scheduler
             .get_batch_ids_from_authorized_indexes(&rtxn, &query, &AuthFilter::default())
             .unwrap();
-        snapshot!(snapshot_bitmap(&batches), @"[0,1,2,]"); // both enqueued and processing tasks in the first tick
+        snapshot!(snapshot_bitmap(&batches), @"[0,]"); // both enqueued and processing tasks in the first tick

         let query = Query {
             statuses: Some(vec![Status::Enqueued, Status::Processing]),
@@ -4454,6 +4478,19 @@ mod tests {
         // associated with doggo -> empty result
         snapshot!(snapshot_bitmap(&batches), @"[]");

+        drop(rtxn);
+        // We're going to advance and process all the batches for the next query to actually hit the db
+        handle.advance_till([
+            InsideProcessBatch,
+            InsideProcessBatch,
+            ProcessBatchSucceeded,
+            AfterProcessing,
+        ]);
+        handle.advance_one_successful_batch();
+        handle.advance_n_failed_batches(2);
+        snapshot!(snapshot_index_scheduler(&index_scheduler), name: "after-processing-everything");
+        let rtxn = index_scheduler.env.read_txn().unwrap();
+
         let query = Query::default();
         let (batches, _) = index_scheduler
             .get_batch_ids_from_authorized_indexes(

View File

@@ -0,0 +1,90 @@
---
source: crates/index-scheduler/src/lib.rs
---
### Autobatching Enabled = true
### Processing batch None:
[]
----------------------------------------------------------------------
### All Tasks:
0 {uid: 0, batch_uid: 0, status: succeeded, details: { primary_key: Some("bone") }, kind: IndexCreation { index_uid: "doggo", primary_key: Some("bone") }}
1 {uid: 1, batch_uid: 1, status: succeeded, details: { primary_key: Some("plankton") }, kind: IndexCreation { index_uid: "whalo", primary_key: Some("plankton") }}
2 {uid: 2, batch_uid: 2, status: succeeded, details: { primary_key: Some("his_own_vomit") }, kind: IndexCreation { index_uid: "catto", primary_key: Some("his_own_vomit") }}
----------------------------------------------------------------------
### Status:
enqueued []
succeeded [0,1,2,]
----------------------------------------------------------------------
### Kind:
"indexCreation" [0,1,2,]
----------------------------------------------------------------------
### Index Tasks:
catto [2,]
doggo [0,]
whalo [1,]
----------------------------------------------------------------------
### Index Mapper:
catto: { number_of_documents: 0, field_distribution: {} }
doggo: { number_of_documents: 0, field_distribution: {} }
whalo: { number_of_documents: 0, field_distribution: {} }
----------------------------------------------------------------------
### Canceled By:
----------------------------------------------------------------------
### Enqueued At:
[timestamp] [0,]
[timestamp] [1,]
[timestamp] [2,]
----------------------------------------------------------------------
### Started At:
[timestamp] [0,]
[timestamp] [1,]
[timestamp] [2,]
----------------------------------------------------------------------
### Finished At:
[timestamp] [0,]
[timestamp] [1,]
[timestamp] [2,]
----------------------------------------------------------------------
### All Batches:
0 {uid: 0, }
1 {uid: 1, }
2 {uid: 2, }
----------------------------------------------------------------------
### Batch to tasks mapping:
0 [0,]
1 [1,]
2 [2,]
----------------------------------------------------------------------
### Batches Status:
succeeded [0,1,2,]
----------------------------------------------------------------------
### Batches Kind:
"indexCreation" [0,1,2,]
----------------------------------------------------------------------
### Batches Index Tasks:
catto [2,]
doggo [0,]
whalo [1,]
----------------------------------------------------------------------
### Batches Canceled By:
----------------------------------------------------------------------
### Batches Enqueued At:
[timestamp] [0,]
[timestamp] [1,]
[timestamp] [2,]
----------------------------------------------------------------------
### Batches Started At:
[timestamp] [0,]
[timestamp] [1,]
[timestamp] [2,]
----------------------------------------------------------------------
### Batches Finished At:
[timestamp] [0,]
[timestamp] [1,]
[timestamp] [2,]
----------------------------------------------------------------------
### File Store:
----------------------------------------------------------------------

View File

@@ -0,0 +1,54 @@
---
source: crates/index-scheduler/src/lib.rs
---
### Autobatching Enabled = true
### Processing batch None:
[]
----------------------------------------------------------------------
### All Tasks:
0 {uid: 0, status: enqueued, details: { primary_key: Some("bone") }, kind: IndexCreation { index_uid: "doggo", primary_key: Some("bone") }}
----------------------------------------------------------------------
### Status:
enqueued [0,]
----------------------------------------------------------------------
### Kind:
"indexCreation" [0,]
----------------------------------------------------------------------
### Index Tasks:
doggo [0,]
----------------------------------------------------------------------
### Index Mapper:
----------------------------------------------------------------------
### Canceled By:
----------------------------------------------------------------------
### Enqueued At:
[timestamp] [0,]
----------------------------------------------------------------------
### Started At:
----------------------------------------------------------------------
### Finished At:
----------------------------------------------------------------------
### All Batches:
----------------------------------------------------------------------
### Batch to tasks mapping:
----------------------------------------------------------------------
### Batches Status:
----------------------------------------------------------------------
### Batches Kind:
----------------------------------------------------------------------
### Batches Index Tasks:
----------------------------------------------------------------------
### Batches Canceled By:
----------------------------------------------------------------------
### Batches Enqueued At:
----------------------------------------------------------------------
### Batches Started At:
----------------------------------------------------------------------
### Batches Finished At:
----------------------------------------------------------------------
### File Store:
----------------------------------------------------------------------

View File

@@ -0,0 +1,57 @@
---
source: crates/index-scheduler/src/lib.rs
---
### Autobatching Enabled = true
### Processing batch None:
[]
----------------------------------------------------------------------
### All Tasks:
0 {uid: 0, status: enqueued, details: { primary_key: Some("bone") }, kind: IndexCreation { index_uid: "doggo", primary_key: Some("bone") }}
1 {uid: 1, status: enqueued, details: { primary_key: Some("plankton") }, kind: IndexCreation { index_uid: "whalo", primary_key: Some("plankton") }}
----------------------------------------------------------------------
### Status:
enqueued [0,1,]
----------------------------------------------------------------------
### Kind:
"indexCreation" [0,1,]
----------------------------------------------------------------------
### Index Tasks:
doggo [0,]
whalo [1,]
----------------------------------------------------------------------
### Index Mapper:
----------------------------------------------------------------------
### Canceled By:
----------------------------------------------------------------------
### Enqueued At:
[timestamp] [0,]
[timestamp] [1,]
----------------------------------------------------------------------
### Started At:
----------------------------------------------------------------------
### Finished At:
----------------------------------------------------------------------
### All Batches:
----------------------------------------------------------------------
### Batch to tasks mapping:
----------------------------------------------------------------------
### Batches Status:
----------------------------------------------------------------------
### Batches Kind:
----------------------------------------------------------------------
### Batches Index Tasks:
----------------------------------------------------------------------
### Batches Canceled By:
----------------------------------------------------------------------
### Batches Enqueued At:
----------------------------------------------------------------------
### Batches Started At:
----------------------------------------------------------------------
### Batches Finished At:
----------------------------------------------------------------------
### File Store:
----------------------------------------------------------------------

View File

@@ -0,0 +1,60 @@
---
source: crates/index-scheduler/src/lib.rs
---
### Autobatching Enabled = true
### Processing batch None:
[]
----------------------------------------------------------------------
### All Tasks:
0 {uid: 0, status: enqueued, details: { primary_key: Some("bone") }, kind: IndexCreation { index_uid: "doggo", primary_key: Some("bone") }}
1 {uid: 1, status: enqueued, details: { primary_key: Some("plankton") }, kind: IndexCreation { index_uid: "whalo", primary_key: Some("plankton") }}
2 {uid: 2, status: enqueued, details: { primary_key: Some("his_own_vomit") }, kind: IndexCreation { index_uid: "catto", primary_key: Some("his_own_vomit") }}
----------------------------------------------------------------------
### Status:
enqueued [0,1,2,]
----------------------------------------------------------------------
### Kind:
"indexCreation" [0,1,2,]
----------------------------------------------------------------------
### Index Tasks:
catto [2,]
doggo [0,]
whalo [1,]
----------------------------------------------------------------------
### Index Mapper:
----------------------------------------------------------------------
### Canceled By:
----------------------------------------------------------------------
### Enqueued At:
[timestamp] [0,]
[timestamp] [1,]
[timestamp] [2,]
----------------------------------------------------------------------
### Started At:
----------------------------------------------------------------------
### Finished At:
----------------------------------------------------------------------
### All Batches:
----------------------------------------------------------------------
### Batch to tasks mapping:
----------------------------------------------------------------------
### Batches Status:
----------------------------------------------------------------------
### Batches Kind:
----------------------------------------------------------------------
### Batches Index Tasks:
----------------------------------------------------------------------
### Batches Canceled By:
----------------------------------------------------------------------
### Batches Enqueued At:
----------------------------------------------------------------------
### Batches Started At:
----------------------------------------------------------------------
### Batches Finished At:
----------------------------------------------------------------------
### File Store:
----------------------------------------------------------------------

View File

@@ -0,0 +1,91 @@
---
source: crates/index-scheduler/src/lib.rs
---
### Autobatching Enabled = true
### Processing batch None:
[]
----------------------------------------------------------------------
### All Tasks:
0 {uid: 0, batch_uid: 0, status: succeeded, details: { primary_key: Some("mouse") }, kind: IndexCreation { index_uid: "catto", primary_key: Some("mouse") }}
1 {uid: 1, batch_uid: 1, status: succeeded, details: { primary_key: Some("sheep") }, kind: IndexCreation { index_uid: "doggo", primary_key: Some("sheep") }}
2 {uid: 2, batch_uid: 2, status: failed, error: ResponseError { code: 200, message: "Planned failure for tests.", error_code: "internal", error_type: "internal", error_link: "https://docs.meilisearch.com/errors#internal" }, details: { primary_key: Some("fish") }, kind: IndexCreation { index_uid: "whalo", primary_key: Some("fish") }}
----------------------------------------------------------------------
### Status:
enqueued []
succeeded [0,1,]
failed [2,]
----------------------------------------------------------------------
### Kind:
"indexCreation" [0,1,2,]
----------------------------------------------------------------------
### Index Tasks:
catto [0,]
doggo [1,]
whalo [2,]
----------------------------------------------------------------------
### Index Mapper:
catto: { number_of_documents: 0, field_distribution: {} }
doggo: { number_of_documents: 0, field_distribution: {} }
----------------------------------------------------------------------
### Canceled By:
----------------------------------------------------------------------
### Enqueued At:
[timestamp] [0,]
[timestamp] [1,]
[timestamp] [2,]
----------------------------------------------------------------------
### Started At:
[timestamp] [0,]
[timestamp] [1,]
[timestamp] [2,]
----------------------------------------------------------------------
### Finished At:
[timestamp] [0,]
[timestamp] [1,]
[timestamp] [2,]
----------------------------------------------------------------------
### All Batches:
0 {uid: 0, }
1 {uid: 1, }
2 {uid: 2, }
----------------------------------------------------------------------
### Batch to tasks mapping:
0 [0,]
1 [1,]
2 [2,]
----------------------------------------------------------------------
### Batches Status:
succeeded [0,1,]
failed [2,]
----------------------------------------------------------------------
### Batches Kind:
"indexCreation" [0,1,2,]
----------------------------------------------------------------------
### Batches Index Tasks:
catto [0,]
doggo [1,]
whalo [2,]
----------------------------------------------------------------------
### Batches Canceled By:
----------------------------------------------------------------------
### Batches Enqueued At:
[timestamp] [0,]
[timestamp] [1,]
[timestamp] [2,]
----------------------------------------------------------------------
### Batches Started At:
[timestamp] [0,]
[timestamp] [1,]
[timestamp] [2,]
----------------------------------------------------------------------
### Batches Finished At:
[timestamp] [0,]
[timestamp] [1,]
[timestamp] [2,]
----------------------------------------------------------------------
### File Store:
----------------------------------------------------------------------

View File

@@ -0,0 +1,60 @@
---
source: crates/index-scheduler/src/lib.rs
---
### Autobatching Enabled = true
### Processing batch None:
[]
----------------------------------------------------------------------
### All Tasks:
0 {uid: 0, status: enqueued, details: { primary_key: Some("mouse") }, kind: IndexCreation { index_uid: "catto", primary_key: Some("mouse") }}
1 {uid: 1, status: enqueued, details: { primary_key: Some("sheep") }, kind: IndexCreation { index_uid: "doggo", primary_key: Some("sheep") }}
2 {uid: 2, status: enqueued, details: { primary_key: Some("fish") }, kind: IndexCreation { index_uid: "whalo", primary_key: Some("fish") }}
----------------------------------------------------------------------
### Status:
enqueued [0,1,2,]
----------------------------------------------------------------------
### Kind:
"indexCreation" [0,1,2,]
----------------------------------------------------------------------
### Index Tasks:
catto [0,]
doggo [1,]
whalo [2,]
----------------------------------------------------------------------
### Index Mapper:
----------------------------------------------------------------------
### Canceled By:
----------------------------------------------------------------------
### Enqueued At:
[timestamp] [0,]
[timestamp] [1,]
[timestamp] [2,]
----------------------------------------------------------------------
### Started At:
----------------------------------------------------------------------
### Finished At:
----------------------------------------------------------------------
### All Batches:
----------------------------------------------------------------------
### Batch to tasks mapping:
----------------------------------------------------------------------
### Batches Status:
----------------------------------------------------------------------
### Batches Kind:
----------------------------------------------------------------------
### Batches Index Tasks:
----------------------------------------------------------------------
### Batches Canceled By:
----------------------------------------------------------------------
### Batches Enqueued At:
----------------------------------------------------------------------
### Batches Started At:
----------------------------------------------------------------------
### Batches Finished At:
----------------------------------------------------------------------
### File Store:
----------------------------------------------------------------------

View File

@@ -0,0 +1,102 @@
---
source: crates/index-scheduler/src/lib.rs
---
### Autobatching Enabled = true
### Processing batch None:
[]
----------------------------------------------------------------------
### All Tasks:
0 {uid: 0, batch_uid: 0, status: succeeded, details: { primary_key: Some("mouse") }, kind: IndexCreation { index_uid: "catto", primary_key: Some("mouse") }}
1 {uid: 1, batch_uid: 1, status: succeeded, details: { primary_key: Some("sheep") }, kind: IndexCreation { index_uid: "doggo", primary_key: Some("sheep") }}
2 {uid: 2, batch_uid: 2, status: failed, error: ResponseError { code: 200, message: "Planned failure for tests.", error_code: "internal", error_type: "internal", error_link: "https://docs.meilisearch.com/errors#internal" }, details: { swaps: [IndexSwap { indexes: ("catto", "doggo") }] }, kind: IndexSwap { swaps: [IndexSwap { indexes: ("catto", "doggo") }] }}
3 {uid: 3, batch_uid: 3, status: failed, error: ResponseError { code: 200, message: "Index `whalo` not found.", error_code: "index_not_found", error_type: "invalid_request", error_link: "https://docs.meilisearch.com/errors#index_not_found" }, details: { swaps: [IndexSwap { indexes: ("catto", "whalo") }] }, kind: IndexSwap { swaps: [IndexSwap { indexes: ("catto", "whalo") }] }}
----------------------------------------------------------------------
### Status:
enqueued []
succeeded [0,1,]
failed [2,3,]
----------------------------------------------------------------------
### Kind:
"indexCreation" [0,1,]
"indexSwap" [2,3,]
----------------------------------------------------------------------
### Index Tasks:
catto [0,2,3,]
doggo [1,2,]
whalo [3,]
----------------------------------------------------------------------
### Index Mapper:
catto: { number_of_documents: 0, field_distribution: {} }
doggo: { number_of_documents: 0, field_distribution: {} }
----------------------------------------------------------------------
### Canceled By:
----------------------------------------------------------------------
### Enqueued At:
[timestamp] [0,]
[timestamp] [1,]
[timestamp] [2,]
[timestamp] [3,]
----------------------------------------------------------------------
### Started At:
[timestamp] [0,]
[timestamp] [1,]
[timestamp] [2,]
[timestamp] [3,]
----------------------------------------------------------------------
### Finished At:
[timestamp] [0,]
[timestamp] [1,]
[timestamp] [2,]
[timestamp] [3,]
----------------------------------------------------------------------
### All Batches:
0 {uid: 0, }
1 {uid: 1, }
2 {uid: 2, }
3 {uid: 3, }
----------------------------------------------------------------------
### Batch to tasks mapping:
0 [0,]
1 [1,]
2 [2,]
3 [3,]
----------------------------------------------------------------------
### Batches Status:
succeeded [0,1,]
failed [2,3,]
----------------------------------------------------------------------
### Batches Kind:
"indexCreation" [0,1,]
"indexSwap" [2,3,]
----------------------------------------------------------------------
### Batches Index Tasks:
catto [0,2,3,]
doggo [1,2,]
whalo [3,]
----------------------------------------------------------------------
### Batches Canceled By:
----------------------------------------------------------------------
### Batches Enqueued At:
[timestamp] [0,]
[timestamp] [1,]
[timestamp] [2,]
[timestamp] [3,]
----------------------------------------------------------------------
### Batches Started At:
[timestamp] [0,]
[timestamp] [1,]
[timestamp] [2,]
[timestamp] [3,]
----------------------------------------------------------------------
### Batches Finished At:
[timestamp] [0,]
[timestamp] [1,]
[timestamp] [2,]
[timestamp] [3,]
----------------------------------------------------------------------
### File Store:
----------------------------------------------------------------------

View File

@@ -0,0 +1,63 @@
---
source: crates/index-scheduler/src/lib.rs
---
### Autobatching Enabled = true
### Processing batch None:
[]
----------------------------------------------------------------------
### All Tasks:
0 {uid: 0, status: enqueued, details: { primary_key: Some("mouse") }, kind: IndexCreation { index_uid: "catto", primary_key: Some("mouse") }}
1 {uid: 1, status: enqueued, details: { primary_key: Some("sheep") }, kind: IndexCreation { index_uid: "doggo", primary_key: Some("sheep") }}
2 {uid: 2, status: enqueued, details: { swaps: [IndexSwap { indexes: ("catto", "doggo") }] }, kind: IndexSwap { swaps: [IndexSwap { indexes: ("catto", "doggo") }] }}
3 {uid: 3, status: enqueued, details: { swaps: [IndexSwap { indexes: ("catto", "whalo") }] }, kind: IndexSwap { swaps: [IndexSwap { indexes: ("catto", "whalo") }] }}
----------------------------------------------------------------------
### Status:
enqueued [0,1,2,3,]
----------------------------------------------------------------------
### Kind:
"indexCreation" [0,1,]
"indexSwap" [2,3,]
----------------------------------------------------------------------
### Index Tasks:
catto [0,2,3,]
doggo [1,2,]
whalo [3,]
----------------------------------------------------------------------
### Index Mapper:
----------------------------------------------------------------------
### Canceled By:
----------------------------------------------------------------------
### Enqueued At:
[timestamp] [0,]
[timestamp] [1,]
[timestamp] [2,]
[timestamp] [3,]
----------------------------------------------------------------------
### Started At:
----------------------------------------------------------------------
### Finished At:
----------------------------------------------------------------------
### All Batches:
----------------------------------------------------------------------
### Batch to tasks mapping:
----------------------------------------------------------------------
### Batches Status:
----------------------------------------------------------------------
### Batches Kind:
----------------------------------------------------------------------
### Batches Index Tasks:
----------------------------------------------------------------------
### Batches Canceled By:
----------------------------------------------------------------------
### Batches Enqueued At:
----------------------------------------------------------------------
### Batches Started At:
----------------------------------------------------------------------
### Batches Finished At:
----------------------------------------------------------------------
### File Store:
----------------------------------------------------------------------
View File
@ -11,52 +11,65 @@ use meilisearch_types::tasks::{Details, IndexSwap, Kind, KindWithContent, Status
 use roaring::{MultiOps, RoaringBitmap};
 use time::OffsetDateTime;
 
-use crate::{Error, IndexScheduler, ProcessingTasks, Result, Task, TaskId, BEI128};
+use crate::{Error, IndexScheduler, Result, Task, TaskId, BEI128};
 
 /// This structure contains all the information required to write a batch in the database without reading the tasks.
 /// It'll stay in RAM so it must be small.
-pub(crate) struct CachedBatch {
-    uid: BatchId,
-    statuses: HashSet<Status>,
-    kinds: HashSet<Kind>,
-    indexes: HashSet<String>,
-    canceled_by: HashSet<TaskId>,
-    oldest_enqueued_at: Option<OffsetDateTime>,
-    earliest_enqueued_at: Option<OffsetDateTime>,
-    started_at: OffsetDateTime,
-    finished_at: OffsetDateTime,
+#[derive(Debug, Clone)]
+pub(crate) struct ProcessingBatch {
+    pub uid: BatchId,
+    pub statuses: HashSet<Status>,
+    pub kinds: HashSet<Kind>,
+    pub indexes: HashSet<String>,
+    pub canceled_by: HashSet<TaskId>,
+    pub oldest_enqueued_at: Option<OffsetDateTime>,
+    pub earliest_enqueued_at: Option<OffsetDateTime>,
+    pub started_at: OffsetDateTime,
 }
 
-impl CachedBatch {
-    pub fn new(uid: BatchId, started_at: OffsetDateTime, finished_at: OffsetDateTime) -> Self {
+impl ProcessingBatch {
+    pub fn new(uid: BatchId) -> Self {
+        // At the beginning, all the tasks are processing
+        let mut statuses = HashSet::default();
+        statuses.insert(Status::Processing);
+
         Self {
             uid,
-            statuses: HashSet::default(),
+            statuses,
             kinds: HashSet::default(),
             indexes: HashSet::default(),
             canceled_by: HashSet::default(),
             oldest_enqueued_at: None,
             earliest_enqueued_at: None,
-            started_at,
-            finished_at,
+            started_at: OffsetDateTime::now_utc(),
         }
     }
 
+    /// Remove the Processing status and update the real statuses of the tasks.
     pub fn update(&mut self, task: &Task) {
+        self.statuses.clear();
         self.statuses.insert(task.status);
-        self.kinds.insert(task.kind.as_kind());
-        self.indexes.extend(task.indexes().iter().map(|s| s.to_string()));
-        if let Some(canceled_by) = task.canceled_by {
-            self.canceled_by.insert(canceled_by);
-        }
-        self.oldest_enqueued_at =
-            Some(self.oldest_enqueued_at.map_or(task.enqueued_at, |oldest_enqueued_at| {
-                task.enqueued_at.min(oldest_enqueued_at)
-            }));
-        self.earliest_enqueued_at =
-            Some(self.earliest_enqueued_at.map_or(task.enqueued_at, |earliest_enqueued_at| {
-                task.enqueued_at.max(earliest_enqueued_at)
-            }));
+    }
+
+    /// Update itself with the content of the task and update the batch id in the task.
+    pub fn processing<'a>(&mut self, tasks: impl IntoIterator<Item = &'a mut Task>) {
+        for task in tasks.into_iter() {
+            task.batch_uid = Some(self.uid);
+            // We don't store the statuses since they're all enqueued.
+            self.kinds.insert(task.kind.as_kind());
+            self.indexes.extend(task.indexes().iter().map(|s| s.to_string()));
+            if let Some(canceled_by) = task.canceled_by {
+                self.canceled_by.insert(canceled_by);
+            }
+            self.oldest_enqueued_at =
+                Some(self.oldest_enqueued_at.map_or(task.enqueued_at, |oldest_enqueued_at| {
+                    task.enqueued_at.min(oldest_enqueued_at)
+                }));
+            self.earliest_enqueued_at =
+                Some(self.earliest_enqueued_at.map_or(task.enqueued_at, |earliest_enqueued_at| {
+                    task.enqueued_at.max(earliest_enqueued_at)
+                }));
+        }
     }
 }
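Note: the lifecycle implied by the new type is to create the batch, stamp the enqueued tasks as processing, then record each task's real status once it finishes. A minimal usage sketch, assuming a `batch_id` and a `tasks: Vec<Task>` already in hand (illustrative only, not code from this commit):

    // Hypothetical sketch of the intended ProcessingBatch lifecycle.
    let mut current_batch = ProcessingBatch::new(batch_id);

    // Stamps every task with batch_uid and collects kinds, indexes,
    // canceled_by and the enqueued-at bounds; statuses stay Processing.
    current_batch.processing(&mut tasks);

    // ... the batch runs ...

    // Once a task has finished, record its real status (update() first
    // drops the transient Processing status).
    current_batch.update(&task);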
@ -97,17 +110,14 @@ impl IndexScheduler {
     pub(crate) fn write_batch(
         &self,
         wtxn: &mut RwTxn,
-        batch: CachedBatch,
+        batch: ProcessingBatch,
         tasks: &RoaringBitmap,
+        finished_at: OffsetDateTime,
     ) -> Result<()> {
         self.all_batches.put(
             wtxn,
             &batch.uid,
-            &Batch {
-                uid: batch.uid,
-                started_at: batch.started_at,
-                finished_at: Some(batch.finished_at),
-            },
+            &Batch { uid: batch.uid, started_at: batch.started_at, finished_at: Some(finished_at) },
         )?;
 
         self.batch_to_tasks_mapping.put(wtxn, &batch.uid, tasks)?;
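Since `finished_at` no longer lives on the batch (a batch that is still processing has no finish time yet), the caller of `write_batch` now supplies it at write time. A hedged sketch of what a call site would look like (the call site itself is not part of this hunk, and the variable names are illustrative):

    // Hypothetical call site; names are assumptions.
    let finished_at = OffsetDateTime::now_utc();
    self.write_batch(&mut wtxn, current_batch, &processed_tasks, finished_at)?;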
@ -135,25 +145,27 @@ impl IndexScheduler {
             insert_task_datetime(wtxn, self.batch_enqueued_at, enqueued_at, batch.uid)?;
         }
         insert_task_datetime(wtxn, self.batch_started_at, batch.started_at, batch.uid)?;
-        insert_task_datetime(wtxn, self.batch_finished_at, batch.finished_at, batch.uid)?;
+        insert_task_datetime(wtxn, self.batch_finished_at, finished_at, batch.uid)?;
 
         Ok(())
     }
 
     /// Convert an iterator to a `Vec` of tasks. The tasks MUST exist or a
     /// `CorruptedTaskQueue` error will be throwed.
-    pub(crate) fn get_existing_tasks_with_batch_id(
+    pub(crate) fn get_existing_tasks_with_processing_batch(
         &self,
         rtxn: &RoTxn,
-        batch_id: BatchId,
+        processing_batch: &mut ProcessingBatch,
         tasks: impl IntoIterator<Item = TaskId>,
     ) -> Result<Vec<Task>> {
         tasks
             .into_iter()
             .map(|task_id| {
-                self.get_task(rtxn, task_id)
-                    .and_then(|task| task.ok_or(Error::CorruptedTaskQueue))
-                    .map(|task| task.with_batch_id(batch_id))
+                let mut task = self
+                    .get_task(rtxn, task_id)
+                    .and_then(|task| task.ok_or(Error::CorruptedTaskQueue));
+                processing_batch.processing(&mut task);
+                task
             })
             .collect::<Result<_>>()
     }
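One detail worth spelling out: `processing` takes `impl IntoIterator<Item = &'a mut Task>`, which is why it accepts `Some(&mut task)` for a single task as well as `&mut task` where `task` is a `Result<Task>` (as above), since `Option` and `&mut Result` both yield at most one item. A self-contained illustration of that pattern (simplified types, not from the commit):

    // Accepts anything that can yield mutable references: an Option,
    // a Result, or a whole collection.
    fn bump_all<'a>(items: impl IntoIterator<Item = &'a mut u32>) {
        for item in items {
            *item += 1;
        }
    }

    fn main() {
        let mut one = 1u32;
        bump_all(Some(&mut one)); // Option yields zero or one item

        let mut maybe: Result<u32, ()> = Ok(2);
        bump_all(&mut maybe); // &mut Result yields &mut T only when Ok

        let mut many = vec![3, 4];
        bump_all(&mut many); // &mut Vec yields &mut T for every element
    }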
View File
@ -62,11 +62,6 @@ impl Task {
         }
     }
 
-    pub fn with_batch_id(mut self, batch_id: TaskId) -> Self {
-        self.batch_uid = Some(batch_id);
-        self
-    }
-
     /// Return the list of indexes updated by this tasks.
     pub fn indexes(&self) -> Vec<&str> {
         self.kind.indexes()
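With `batch_uid` now stamped inside `ProcessingBatch::processing` (see `task.batch_uid = Some(self.uid)` above), the `with_batch_id` builder has no remaining caller, so its removal here is consistent with the rest of the commit.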