From b5bea0cf56ea030cb8f4bce6bdfdf933323bd9fb Mon Sep 17 00:00:00 2001 From: Tamo Date: Mon, 18 Nov 2024 16:53:55 +0100 Subject: [PATCH] fixes a lot of small issue, the test about the cancellation is still failing --- crates/index-scheduler/src/batch.rs | 128 +++++------ crates/index-scheduler/src/insta_snapshot.rs | 5 +- crates/index-scheduler/src/lib.rs | 211 ++++++++++-------- .../processed_all_tasks.snap | 90 ++++++++ .../registered_the_first_task.snap | 54 +++++ .../registered_the_second_task.snap | 57 +++++ .../registered_the_third_task.snap | 60 +++++ .../lib.rs/query_batches_simple/end.snap | 91 ++++++++ .../lib.rs/query_batches_simple/start.snap | 60 +++++ .../after-processing-everything.snap | 102 +++++++++ .../query_batches_special_rules/start.snap | 63 ++++++ crates/index-scheduler/src/utils.rs | 92 ++++---- crates/meilisearch-types/src/tasks.rs | 5 - 13 files changed, 821 insertions(+), 197 deletions(-) create mode 100644 crates/index-scheduler/src/snapshots/lib.rs/query_batches_from_and_limit/processed_all_tasks.snap create mode 100644 crates/index-scheduler/src/snapshots/lib.rs/query_batches_from_and_limit/registered_the_first_task.snap create mode 100644 crates/index-scheduler/src/snapshots/lib.rs/query_batches_from_and_limit/registered_the_second_task.snap create mode 100644 crates/index-scheduler/src/snapshots/lib.rs/query_batches_from_and_limit/registered_the_third_task.snap create mode 100644 crates/index-scheduler/src/snapshots/lib.rs/query_batches_simple/end.snap create mode 100644 crates/index-scheduler/src/snapshots/lib.rs/query_batches_simple/start.snap create mode 100644 crates/index-scheduler/src/snapshots/lib.rs/query_batches_special_rules/after-processing-everything.snap create mode 100644 crates/index-scheduler/src/snapshots/lib.rs/query_batches_special_rules/start.snap diff --git a/crates/index-scheduler/src/batch.rs b/crates/index-scheduler/src/batch.rs index d64e2657a..0130e9bfe 100644 --- a/crates/index-scheduler/src/batch.rs +++ b/crates/index-scheduler/src/batch.rs @@ -24,7 +24,6 @@ use std::fs::{self, File}; use std::io::BufWriter; use dump::IndexMetadata; -use meilisearch_types::batches::BatchId; use meilisearch_types::error::Code; use meilisearch_types::heed::{RoTxn, RwTxn}; use meilisearch_types::milli::documents::{obkv_to_object, DocumentsBatchReader}; @@ -45,7 +44,7 @@ use time::OffsetDateTime; use uuid::Uuid; use crate::autobatcher::{self, BatchKind}; -use crate::utils::{self, swap_index_uid_in_task}; +use crate::utils::{self, swap_index_uid_in_task, ProcessingBatch}; use crate::{Error, IndexScheduler, MustStopProcessing, ProcessingTasks, Result, TaskId}; /// Represents a combination of tasks that can all be processed at the same time. @@ -280,22 +279,24 @@ impl IndexScheduler { rtxn: &RoTxn, index_uid: String, batch: BatchKind, - batch_id: BatchId, + current_batch: &mut ProcessingBatch, must_create_index: bool, ) -> Result> { match batch { BatchKind::DocumentClear { ids } => Ok(Some(Batch::IndexOperation { op: IndexOperation::DocumentClear { - tasks: self.get_existing_tasks_with_batch_id(rtxn, batch_id, ids)?, + tasks: self.get_existing_tasks_with_processing_batch( + rtxn, + current_batch, + ids, + )?, index_uid, }, must_create_index, })), BatchKind::DocumentEdition { id } => { - let task = self - .get_task(rtxn, id)? - .ok_or(Error::CorruptedTaskQueue)? - .with_batch_id(batch_id); + let mut task = self.get_task(rtxn, id)?.ok_or(Error::CorruptedTaskQueue)?; + current_batch.processing(Some(&mut task)); match &task.kind { KindWithContent::DocumentEdition { index_uid, .. } => { Ok(Some(Batch::IndexOperation { @@ -310,7 +311,11 @@ impl IndexScheduler { } } BatchKind::DocumentOperation { method, operation_ids, .. } => { - let tasks = self.get_existing_tasks_with_batch_id(rtxn, batch_id, operation_ids)?; + let tasks = self.get_existing_tasks_with_processing_batch( + rtxn, + current_batch, + operation_ids, + )?; let primary_key = tasks .iter() .find_map(|task| match task.kind { @@ -357,7 +362,11 @@ impl IndexScheduler { })) } BatchKind::DocumentDeletion { deletion_ids, includes_by_filter: _ } => { - let tasks = self.get_existing_tasks_with_batch_id(rtxn, batch_id, deletion_ids)?; + let tasks = self.get_existing_tasks_with_processing_batch( + rtxn, + current_batch, + deletion_ids, + )?; Ok(Some(Batch::IndexOperation { op: IndexOperation::DocumentDeletion { index_uid, tasks }, @@ -365,7 +374,11 @@ impl IndexScheduler { })) } BatchKind::Settings { settings_ids, .. } => { - let tasks = self.get_existing_tasks_with_batch_id(rtxn, batch_id, settings_ids)?; + let tasks = self.get_existing_tasks_with_processing_batch( + rtxn, + current_batch, + settings_ids, + )?; let mut settings = Vec::new(); for task in &tasks { @@ -388,7 +401,7 @@ impl IndexScheduler { rtxn, index_uid, BatchKind::Settings { settings_ids, allow_index_creation }, - batch_id, + current_batch, must_create_index, )? .unwrap() @@ -404,7 +417,7 @@ impl IndexScheduler { rtxn, index_uid, BatchKind::DocumentClear { ids: other }, - batch_id, + current_batch, must_create_index, )? .unwrap() @@ -437,7 +450,7 @@ impl IndexScheduler { rtxn, index_uid.clone(), BatchKind::Settings { settings_ids, allow_index_creation }, - batch_id, + current_batch, must_create_index, )?; @@ -450,7 +463,7 @@ impl IndexScheduler { primary_key, operation_ids, }, - batch_id, + current_batch, must_create_index, )?; @@ -488,10 +501,8 @@ impl IndexScheduler { } } BatchKind::IndexCreation { id } => { - let task = self - .get_task(rtxn, id)? - .ok_or(Error::CorruptedTaskQueue)? - .with_batch_id(batch_id); + let mut task = self.get_task(rtxn, id)?.ok_or(Error::CorruptedTaskQueue)?; + current_batch.processing(Some(&mut task)); let (index_uid, primary_key) = match &task.kind { KindWithContent::IndexCreation { index_uid, primary_key } => { (index_uid.clone(), primary_key.clone()) @@ -501,10 +512,8 @@ impl IndexScheduler { Ok(Some(Batch::IndexCreation { index_uid, primary_key, task })) } BatchKind::IndexUpdate { id } => { - let task = self - .get_task(rtxn, id)? - .ok_or(Error::CorruptedTaskQueue)? - .with_batch_id(batch_id); + let mut task = self.get_task(rtxn, id)?.ok_or(Error::CorruptedTaskQueue)?; + current_batch.processing(Some(&mut task)); let primary_key = match &task.kind { KindWithContent::IndexUpdate { primary_key, .. } => primary_key.clone(), _ => unreachable!(), @@ -514,13 +523,11 @@ impl IndexScheduler { BatchKind::IndexDeletion { ids } => Ok(Some(Batch::IndexDeletion { index_uid, index_has_been_created: must_create_index, - tasks: self.get_existing_tasks_with_batch_id(rtxn, batch_id, ids)?, + tasks: self.get_existing_tasks_with_processing_batch(rtxn, current_batch, ids)?, })), BatchKind::IndexSwap { id } => { - let task = self - .get_task(rtxn, id)? - .ok_or(Error::CorruptedTaskQueue)? - .with_batch_id(batch_id); + let mut task = self.get_task(rtxn, id)?.ok_or(Error::CorruptedTaskQueue)?; + current_batch.processing(Some(&mut task)); Ok(Some(Batch::IndexSwap { task })) } } @@ -533,11 +540,16 @@ impl IndexScheduler { /// 4. We get the *next* dump to process. /// 5. We get the *next* tasks to process for a specific index. #[tracing::instrument(level = "trace", skip(self, rtxn), target = "indexing::scheduler")] - pub(crate) fn create_next_batch(&self, rtxn: &RoTxn) -> Result> { + pub(crate) fn create_next_batch( + &self, + rtxn: &RoTxn, + ) -> Result> { #[cfg(test)] self.maybe_fail(crate::tests::FailureLocation::InsideCreateBatch)?; let batch_id = self.next_batch_id(rtxn)?; + let mut current_batch = ProcessingBatch::new(batch_id); + let enqueued = &self.get_status(rtxn, Status::Enqueued)?; let to_cancel = self.get_kind(rtxn, Kind::TaskCancelation)? & enqueued; @@ -547,63 +559,51 @@ impl IndexScheduler { // We must *not* reset the processing tasks before calling this method. // Displaying the `batch_id` would make a strange error message since this task cancelation is going to // replace the canceled batch. It's better to avoid mentioning it in the error message. - let ProcessingTasks { started_at, batch_id: _, processing } = + let ProcessingTasks { batch: previous_batch, processing } = &*self.processing_tasks.read().unwrap(); + let mut task = self.get_task(rtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?; + current_batch.processing(Some(&mut task)); return Ok(Some(( Batch::TaskCancelation { - task: self - .get_task(rtxn, task_id)? - .ok_or(Error::CorruptedTaskQueue)? - .with_batch_id(batch_id), - previous_started_at: *started_at, + task, + // We should never be in a case where we don't have a previous_batch, but let's not crash if it happens + previous_started_at: previous_batch + .as_ref() + .map_or_else(OffsetDateTime::now_utc, |batch| batch.started_at), previous_processing_tasks: processing.clone(), }, - batch_id, + current_batch, ))); } // 2. we get the next task to delete let to_delete = self.get_kind(rtxn, Kind::TaskDeletion)? & enqueued; if !to_delete.is_empty() { - let tasks = self - .get_existing_tasks(rtxn, to_delete)? - .into_iter() - .map(|task| task.with_batch_id(batch_id)) - .collect(); - return Ok(Some((Batch::TaskDeletions(tasks), batch_id))); + let mut tasks = self.get_existing_tasks(rtxn, to_delete)?; + current_batch.processing(&mut tasks); + return Ok(Some((Batch::TaskDeletions(tasks), current_batch))); } // 3. we batch the snapshot. let to_snapshot = self.get_kind(rtxn, Kind::SnapshotCreation)? & enqueued; if !to_snapshot.is_empty() { - return Ok(Some(( - Batch::SnapshotCreation( - self.get_existing_tasks(rtxn, to_snapshot)? - .into_iter() - .map(|task| task.with_batch_id(batch_id)) - .collect(), - ), - batch_id, - ))); + let mut tasks = self.get_existing_tasks(rtxn, to_snapshot)?; + current_batch.processing(&mut tasks); + return Ok(Some((Batch::SnapshotCreation(tasks), current_batch))); } // 4. we batch the dumps. let to_dump = self.get_kind(rtxn, Kind::DumpCreation)? & enqueued; if let Some(to_dump) = to_dump.min() { - return Ok(Some(( - Batch::Dump( - self.get_task(rtxn, to_dump)? - .ok_or(Error::CorruptedTaskQueue)? - .with_batch_id(batch_id), - ), - batch_id, - ))); + let mut task = self.get_task(rtxn, to_dump)?.ok_or(Error::CorruptedTaskQueue)?; + current_batch.processing(Some(&mut task)); + return Ok(Some((Batch::Dump(task), current_batch))); } // 5. We make a batch from the unprioritised tasks. Start by taking the next enqueued task. let task_id = if let Some(task_id) = enqueued.min() { task_id } else { return Ok(None) }; - let task = - self.get_task(rtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?.with_batch_id(batch_id); + let mut task = self.get_task(rtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?; + current_batch.processing(Some(&mut task)); // If the task is not associated with any index, verify that it is an index swap and // create the batch directly. Otherwise, get the index name associated with the task @@ -613,7 +613,7 @@ impl IndexScheduler { index_name } else { assert!(matches!(&task.kind, KindWithContent::IndexSwap { swaps } if swaps.is_empty())); - return Ok(Some((Batch::IndexSwap { task }, batch_id))); + return Ok(Some((Batch::IndexSwap { task }, current_batch))); }; let index_already_exists = self.index_mapper.exists(rtxn, index_name)?; @@ -649,10 +649,10 @@ impl IndexScheduler { rtxn, index_name.to_string(), batchkind, - batch_id, + &mut current_batch, create_index, )? - .map(|batch| (batch, batch_id))); + .map(|batch| (batch, current_batch))); } // If we found no tasks then we were notified for something that got autobatched diff --git a/crates/index-scheduler/src/insta_snapshot.rs b/crates/index-scheduler/src/insta_snapshot.rs index 2ca12e1ee..da0d90baa 100644 --- a/crates/index-scheduler/src/insta_snapshot.rs +++ b/crates/index-scheduler/src/insta_snapshot.rs @@ -68,7 +68,10 @@ pub fn snapshot_index_scheduler(scheduler: &IndexScheduler) -> String { let processing = processing_tasks.read().unwrap().clone(); snap.push_str(&format!("### Autobatching Enabled = {autobatching_enabled}\n")); - snap.push_str(&format!("### Processing batch {:?}:\n", processing.batch_id)); + snap.push_str(&format!( + "### Processing batch {:?}:\n", + processing.batch.map(|batch| batch.uid) + )); snap.push_str(&snapshot_bitmap(&processing.processing)); snap.push_str("\n----------------------------------------------------------------------\n"); diff --git a/crates/index-scheduler/src/lib.rs b/crates/index-scheduler/src/lib.rs index 54403b8b8..05b6722e8 100644 --- a/crates/index-scheduler/src/lib.rs +++ b/crates/index-scheduler/src/lib.rs @@ -71,7 +71,7 @@ use utils::{filter_out_references_to_newer_tasks, keep_tasks_within_datetimes, m use uuid::Uuid; use crate::index_mapper::IndexMapper; -use crate::utils::{check_index_swap_validity, clamp_to_page_size, CachedBatch}; +use crate::utils::{check_index_swap_validity, clamp_to_page_size, ProcessingBatch}; pub(crate) type BEI128 = I128; @@ -161,10 +161,7 @@ impl Query { #[derive(Debug, Clone)] struct ProcessingTasks { - /// The date and time at which the indexation started. - started_at: OffsetDateTime, - /// The id of the batch processing - batch_id: Option, + batch: Option, /// The list of tasks ids that are currently running. processing: RoaringBitmap, } @@ -172,30 +169,19 @@ struct ProcessingTasks { impl ProcessingTasks { /// Creates an empty `ProcessingAt` struct. fn new() -> ProcessingTasks { - ProcessingTasks { - started_at: OffsetDateTime::now_utc(), - batch_id: None, - processing: RoaringBitmap::new(), - } + ProcessingTasks { batch: None, processing: RoaringBitmap::new() } } /// Stores the currently processing tasks, and the date time at which it started. - fn start_processing( - &mut self, - started_at: OffsetDateTime, - batch_id: BatchId, - processing: RoaringBitmap, - ) { - self.started_at = started_at; - self.batch_id = Some(batch_id); + fn start_processing(&mut self, processing_batch: ProcessingBatch, processing: RoaringBitmap) { + self.batch = Some(processing_batch); self.processing = processing; } /// Set the processing tasks to an empty list fn stop_processing(&mut self) -> Self { Self { - started_at: self.started_at, - batch_id: self.batch_id.take(), + batch: std::mem::take(&mut self.batch), processing: std::mem::take(&mut self.processing), } } @@ -785,11 +771,8 @@ impl IndexScheduler { /// Return the task ids matched by the given query from the index scheduler's point of view. pub(crate) fn get_task_ids(&self, rtxn: &RoTxn, query: &Query) -> Result { - let ProcessingTasks { - started_at: started_at_processing, - processing: processing_tasks, - batch_id: current_batch_processing, - } = self.processing_tasks.read().unwrap().clone(); + let ProcessingTasks { batch: processing_batch, processing: processing_tasks } = + self.processing_tasks.read().unwrap().clone(); let Query { limit, from, @@ -816,7 +799,7 @@ impl IndexScheduler { if let Some(batch_uids) = batch_uids { let mut batch_tasks = RoaringBitmap::new(); for batch_uid in batch_uids { - if Some(*batch_uid) == current_batch_processing { + if Some(*batch_uid) == processing_batch.as_ref().map(|batch| batch.uid) { batch_tasks |= &processing_tasks; } else { batch_tasks |= self.tasks_in_batch(rtxn, *batch_uid)?; @@ -890,13 +873,15 @@ impl IndexScheduler { // special case for Processing tasks // A closure that clears the filtered_processing_tasks if their started_at date falls outside the given bounds - let mut clear_filtered_processing_tasks = + let clear_filtered_processing_tasks = |start: Bound, end: Bound| { let start = map_bound(start, |b| b.unix_timestamp_nanos()); let end = map_bound(end, |b| b.unix_timestamp_nanos()); let is_within_dates = RangeBounds::contains( &(start, end), - &started_at_processing.unix_timestamp_nanos(), + &processing_batch + .map_or_else(OffsetDateTime::now_utc, |batch| batch.started_at) + .unix_timestamp_nanos(), ); if !is_within_dates { filtered_processing_tasks.clear(); @@ -950,42 +935,59 @@ impl IndexScheduler { } /// Return the batch ids matched by the given query from the index scheduler's point of view. - pub(crate) fn get_batch_ids(&self, rtxn: &RoTxn, query: &Query) -> Result { - let ProcessingTasks { - started_at: started_at_processing, - processing: processing_batches, - batch_id, - } = self.processing_tasks.read().unwrap().clone(); - + pub(crate) fn get_batch_ids( + &self, + rtxn: &RoTxn, + processing: &ProcessingTasks, + query: &Query, + ) -> Result { let mut batches = self.all_batch_ids(rtxn)?; + if let Some(batch_id) = processing.batch.as_ref().map(|batch| batch.uid) { + batches.insert(batch_id); + } if let Some(from) = &query.from { batches.remove_range(from.saturating_add(1)..); } + if let Some(batch_uids) = &query.batch_uids { + let batches_uids = RoaringBitmap::from_iter(batch_uids); + batches &= batches_uids; + } + if let Some(status) = &query.statuses { let mut status_batches = RoaringBitmap::new(); for status in status { - // TODO: We can't retrieve anything around processing batches so we can get rid of a lot of code here match status { // special case for Processing batches Status::Processing => { - if let Some(batch_id) = batch_id { + if let Some(batch_id) = processing.batch.as_ref().map(|batch| batch.uid) { status_batches.insert(batch_id); } } + // Enqueued tasks are not stored in batches + Status::Enqueued => (), status => status_batches |= &self.get_batch_status(rtxn, *status)?, }; } if !status.contains(&Status::Processing) { - batches -= &processing_batches; + if let Some(ref batch) = processing.batch { + batches.remove(batch.uid); + } } batches &= status_batches; } - if let Some(uids) = &query.uids { - let uids = RoaringBitmap::from_iter(uids); - batches &= &uids; + if let Some(task_uids) = &query.uids { + let mut batches_by_task_uids = RoaringBitmap::new(); + for task_uid in task_uids { + if let Some(task) = self.get_task(rtxn, *task_uid)? { + if let Some(batch_uid) = task.batch_uid { + batches_by_task_uids.insert(batch_uid); + } + } + } + batches &= batches_by_task_uids; } if let Some(canceled_by) = &query.canceled_by { @@ -1009,6 +1011,13 @@ impl IndexScheduler { let mut kind_batches = RoaringBitmap::new(); for kind in kind { kind_batches |= self.get_batch_kind(rtxn, *kind)?; + if let Some(uid) = processing + .batch + .as_ref() + .and_then(|batch| batch.kinds.contains(kind).then_some(batch.uid)) + { + kind_batches.insert(uid); + } } batches &= &kind_batches; } @@ -1017,6 +1026,13 @@ impl IndexScheduler { let mut index_batches = RoaringBitmap::new(); for index in index { index_batches |= self.index_batches(rtxn, index)?; + if let Some(uid) = processing + .batch + .as_ref() + .and_then(|batch| batch.indexes.contains(index).then_some(batch.uid)) + { + index_batches.insert(uid); + } } batches &= &index_batches; } @@ -1027,7 +1043,7 @@ impl IndexScheduler { // Once we have filtered the two subsets, we put them back together and assign it back to `batches`. batches = { let (mut filtered_non_processing_batches, mut filtered_processing_batches) = - (&batches - &processing_batches, &batches & &processing_batches); + (&batches - &processing.processing, &batches & &processing.processing); // special case for Processing batches // A closure that clears the filtered_processing_batches if their started_at date falls outside the given bounds @@ -1037,7 +1053,11 @@ impl IndexScheduler { let end = map_bound(end, |b| b.unix_timestamp_nanos()); let is_within_dates = RangeBounds::contains( &(start, end), - &started_at_processing.unix_timestamp_nanos(), + &processing + .batch + .as_ref() + .map_or_else(OffsetDateTime::now_utc, |batch| batch.started_at) + .unix_timestamp_nanos(), ); if !is_within_dates { filtered_processing_batches.clear(); @@ -1207,32 +1227,50 @@ impl IndexScheduler { query: &Query, filters: &meilisearch_auth::AuthFilter, ) -> Result<(RoaringBitmap, u64)> { + let processing = self.processing_tasks.read().unwrap().clone(); + // compute all batches matching the filter by ignoring the limits, to find the number of batches matching // the filter. // As this causes us to compute the filter twice it is slightly inefficient, but doing it this way spares // us from modifying the underlying implementation, and the performance remains sufficient. // Should this change, we would modify `get_batch_ids` to directly return the number of matching batches. - let total_batches = self.get_batch_ids(rtxn, &query.clone().without_limits())?; - let mut batches = self.get_batch_ids(rtxn, query)?; + let total_batches = + self.get_batch_ids(rtxn, &processing, &query.clone().without_limits())?; + let mut batches = self.get_batch_ids(rtxn, &processing, query)?; // If the query contains a list of index uid or there is a finite list of authorized indexes, - // then we must exclude all the kinds that aren't associated to one and only one index. + // then we must exclude all the batches that only contains tasks associated to multiple indexes. + // This works because we don't autobatch tasks associated to multiple indexes with tasks associated + // to a single index. e.g: IndexSwap cannot be batched with IndexCreation. if query.index_uids.is_some() || !filters.all_indexes_authorized() { for kind in enum_iterator::all::().filter(|kind| !kind.related_to_one_index()) { batches -= self.get_kind(rtxn, kind)?; + if let Some(batch) = processing.batch.as_ref() { + if batch.kinds.contains(&kind) { + batches.remove(batch.uid); + } + } } } // Any task that is internally associated with a non-authorized index // must be discarded. + // This works because currently batches cannot contains tasks from multiple indexes at the same time. if !filters.all_indexes_authorized() { - let all_indexes_iter = self.index_tasks.iter(rtxn)?; + let all_indexes_iter = self.batch_index_tasks.iter(rtxn)?; for result in all_indexes_iter { let (index, index_tasks) = result?; if !filters.is_index_authorized(index) { batches -= index_tasks; } } + if let Some(batch) = processing.batch.as_ref() { + for index in &batch.indexes { + if !filters.is_index_authorized(index) { + batches.remove(batch.uid); + } + } + } } Ok((batches, total_batches.len())) @@ -1260,20 +1298,22 @@ impl IndexScheduler { tasks.into_iter().rev().take(query.limit.unwrap_or(u32::MAX) as usize), )?; - let ProcessingTasks { started_at, batch_id, processing } = + let ProcessingTasks { batch, processing } = self.processing_tasks.read().map_err(|_| Error::CorruptedTaskQueue)?.clone(); let ret = tasks.into_iter(); - if processing.is_empty() { + if processing.is_empty() || batch.is_none() { Ok((ret.collect(), total)) } else { + // Safe because we ensured there was a batch in the previous branch + let batch = batch.unwrap(); Ok(( ret.map(|task| { if processing.contains(task.uid) { Task { status: Status::Processing, - batch_uid: batch_id, - started_at: Some(started_at), + batch_uid: Some(batch.uid), + started_at: Some(batch.started_at), ..task } } else { @@ -1302,32 +1342,14 @@ impl IndexScheduler { ) -> Result<(Vec, u64)> { let rtxn = self.env.read_txn()?; - let (tasks, total) = self.get_batch_ids_from_authorized_indexes(&rtxn, &query, filters)?; - let tasks = self.get_existing_batches( + let (batches, total) = + self.get_batch_ids_from_authorized_indexes(&rtxn, &query, filters)?; + let batches = self.get_existing_batches( &rtxn, - tasks.into_iter().rev().take(query.limit.unwrap_or(u32::MAX) as usize), + batches.into_iter().rev().take(query.limit.unwrap_or(u32::MAX) as usize), )?; - let ProcessingTasks { started_at, batch_id, processing } = - self.processing_tasks.read().map_err(|_| Error::CorruptedTaskQueue)?.clone(); - - let ret = tasks.into_iter(); - if processing.is_empty() { - Ok((ret.collect(), total)) - } else { - // TODO: We must re-insert the current processing batch somewhere in some way - Ok(( - ret.map(|batch| { - if processing.contains(batch.uid) { - Batch { started_at, ..batch } - } else { - batch - } - }) - .collect(), - total, - )) - } + Ok((batches, total)) } /// Register a new task in the scheduler. @@ -1495,7 +1517,7 @@ impl IndexScheduler { } let rtxn = self.env.read_txn().map_err(Error::HeedTransaction)?; - let (batch, batch_id) = + let (batch, mut processing_batch) = match self.create_next_batch(&rtxn).map_err(|e| Error::CreateBatch(Box::new(e)))? { Some(batch) => batch, None => return Ok(TickOutcome::WaitForSignal), @@ -1506,11 +1528,14 @@ impl IndexScheduler { // 1. store the starting date with the bitmap of processing tasks. let ids = batch.ids(); let processed_tasks = ids.len(); - let started_at = OffsetDateTime::now_utc(); // We reset the must_stop flag to be sure that we don't stop processing tasks self.must_stop_processing.reset(); - self.processing_tasks.write().unwrap().start_processing(started_at, batch_id, ids.clone()); + self.processing_tasks + .write() + .unwrap() + // We can clone the processing batch here because we don't want its modification to affect the view of the processing batches + .start_processing(processing_batch.clone(), ids.clone()); #[cfg(test)] self.breakpoint(Breakpoint::BatchCreated); @@ -1534,7 +1559,6 @@ impl IndexScheduler { let mut wtxn = self.env.write_txn().map_err(Error::HeedTransaction)?; let finished_at = OffsetDateTime::now_utc(); - let mut current_batch = CachedBatch::new(batch_id, started_at, finished_at); match res { Ok(tasks) => { #[cfg(test)] @@ -1545,7 +1569,7 @@ impl IndexScheduler { #[allow(unused_variables)] for (i, mut task) in tasks.into_iter().enumerate() { - task.started_at = Some(started_at); + task.started_at = Some(processing_batch.started_at); task.finished_at = Some(finished_at); #[cfg(test)] @@ -1562,7 +1586,7 @@ impl IndexScheduler { self.update_task(&mut wtxn, &task) .map_err(|e| Error::TaskDatabaseUpdate(Box::new(e)))?; - current_batch.update(&task); + processing_batch.update(&task); } tracing::info!("A batch of tasks was successfully completed with {success} successful tasks and {failure} failed tasks."); } @@ -1610,9 +1634,9 @@ impl IndexScheduler { let mut task = self .get_task(&wtxn, id) .map_err(|e| Error::TaskDatabaseUpdate(Box::new(e)))? - .ok_or(Error::CorruptedTaskQueue)? - .with_batch_id(batch_id); - task.started_at = Some(started_at); + .ok_or(Error::CorruptedTaskQueue)?; + task.batch_uid = Some(processing_batch.uid); + task.started_at = Some(processing_batch.started_at); task.finished_at = Some(finished_at); task.status = Status::Failed; task.error = Some(error.clone()); @@ -1625,14 +1649,14 @@ impl IndexScheduler { self.update_task(&mut wtxn, &task) .map_err(|e| Error::TaskDatabaseUpdate(Box::new(e)))?; - current_batch.update(&task); + processing_batch.update(&task); } } } let processed = self.processing_tasks.write().unwrap().stop_processing(); - self.write_batch(&mut wtxn, current_batch, &processed.processing)?; + self.write_batch(&mut wtxn, processing_batch, &processed.processing, finished_at)?; #[cfg(test)] self.maybe_fail(tests::FailureLocation::CommittingWtxn)?; @@ -4214,7 +4238,7 @@ mod tests { let (batches, _) = index_scheduler .get_batch_ids_from_authorized_indexes(&rtxn, &query, &AuthFilter::default()) .unwrap(); - snapshot!(snapshot_bitmap(&batches), @"[1,2,]"); // only the enqueued batches in the first tick + snapshot!(snapshot_bitmap(&batches), @"[]"); // The batches don't contains any enqueued tasks let query = Query { statuses: Some(vec![Status::Enqueued, Status::Processing]), @@ -4223,7 +4247,7 @@ mod tests { let (batches, _) = index_scheduler .get_batch_ids_from_authorized_indexes(&rtxn, &query, &AuthFilter::default()) .unwrap(); - snapshot!(snapshot_bitmap(&batches), @"[0,1,2,]"); // both enqueued and processing tasks in the first tick + snapshot!(snapshot_bitmap(&batches), @"[0,]"); // both enqueued and processing tasks in the first tick let query = Query { statuses: Some(vec![Status::Enqueued, Status::Processing]), @@ -4454,6 +4478,19 @@ mod tests { // associated with doggo -> empty result snapshot!(snapshot_bitmap(&batches), @"[]"); + drop(rtxn); + // We're going to advance and process all the batches for the next query to actually hit the db + handle.advance_till([ + InsideProcessBatch, + InsideProcessBatch, + ProcessBatchSucceeded, + AfterProcessing, + ]); + handle.advance_one_successful_batch(); + handle.advance_n_failed_batches(2); + snapshot!(snapshot_index_scheduler(&index_scheduler), name: "after-processing-everything"); + let rtxn = index_scheduler.env.read_txn().unwrap(); + let query = Query::default(); let (batches, _) = index_scheduler .get_batch_ids_from_authorized_indexes( diff --git a/crates/index-scheduler/src/snapshots/lib.rs/query_batches_from_and_limit/processed_all_tasks.snap b/crates/index-scheduler/src/snapshots/lib.rs/query_batches_from_and_limit/processed_all_tasks.snap new file mode 100644 index 000000000..d22441b0b --- /dev/null +++ b/crates/index-scheduler/src/snapshots/lib.rs/query_batches_from_and_limit/processed_all_tasks.snap @@ -0,0 +1,90 @@ +--- +source: crates/index-scheduler/src/lib.rs +--- +### Autobatching Enabled = true +### Processing batch None: +[] +---------------------------------------------------------------------- +### All Tasks: +0 {uid: 0, batch_uid: 0, status: succeeded, details: { primary_key: Some("bone") }, kind: IndexCreation { index_uid: "doggo", primary_key: Some("bone") }} +1 {uid: 1, batch_uid: 1, status: succeeded, details: { primary_key: Some("plankton") }, kind: IndexCreation { index_uid: "whalo", primary_key: Some("plankton") }} +2 {uid: 2, batch_uid: 2, status: succeeded, details: { primary_key: Some("his_own_vomit") }, kind: IndexCreation { index_uid: "catto", primary_key: Some("his_own_vomit") }} +---------------------------------------------------------------------- +### Status: +enqueued [] +succeeded [0,1,2,] +---------------------------------------------------------------------- +### Kind: +"indexCreation" [0,1,2,] +---------------------------------------------------------------------- +### Index Tasks: +catto [2,] +doggo [0,] +whalo [1,] +---------------------------------------------------------------------- +### Index Mapper: +catto: { number_of_documents: 0, field_distribution: {} } +doggo: { number_of_documents: 0, field_distribution: {} } +whalo: { number_of_documents: 0, field_distribution: {} } + +---------------------------------------------------------------------- +### Canceled By: + +---------------------------------------------------------------------- +### Enqueued At: +[timestamp] [0,] +[timestamp] [1,] +[timestamp] [2,] +---------------------------------------------------------------------- +### Started At: +[timestamp] [0,] +[timestamp] [1,] +[timestamp] [2,] +---------------------------------------------------------------------- +### Finished At: +[timestamp] [0,] +[timestamp] [1,] +[timestamp] [2,] +---------------------------------------------------------------------- +### All Batches: +0 {uid: 0, } +1 {uid: 1, } +2 {uid: 2, } +---------------------------------------------------------------------- +### Batch to tasks mapping: +0 [0,] +1 [1,] +2 [2,] +---------------------------------------------------------------------- +### Batches Status: +succeeded [0,1,2,] +---------------------------------------------------------------------- +### Batches Kind: +"indexCreation" [0,1,2,] +---------------------------------------------------------------------- +### Batches Index Tasks: +catto [2,] +doggo [0,] +whalo [1,] +---------------------------------------------------------------------- +### Batches Canceled By: + +---------------------------------------------------------------------- +### Batches Enqueued At: +[timestamp] [0,] +[timestamp] [1,] +[timestamp] [2,] +---------------------------------------------------------------------- +### Batches Started At: +[timestamp] [0,] +[timestamp] [1,] +[timestamp] [2,] +---------------------------------------------------------------------- +### Batches Finished At: +[timestamp] [0,] +[timestamp] [1,] +[timestamp] [2,] +---------------------------------------------------------------------- +### File Store: + +---------------------------------------------------------------------- diff --git a/crates/index-scheduler/src/snapshots/lib.rs/query_batches_from_and_limit/registered_the_first_task.snap b/crates/index-scheduler/src/snapshots/lib.rs/query_batches_from_and_limit/registered_the_first_task.snap new file mode 100644 index 000000000..c6bf88bb0 --- /dev/null +++ b/crates/index-scheduler/src/snapshots/lib.rs/query_batches_from_and_limit/registered_the_first_task.snap @@ -0,0 +1,54 @@ +--- +source: crates/index-scheduler/src/lib.rs +--- +### Autobatching Enabled = true +### Processing batch None: +[] +---------------------------------------------------------------------- +### All Tasks: +0 {uid: 0, status: enqueued, details: { primary_key: Some("bone") }, kind: IndexCreation { index_uid: "doggo", primary_key: Some("bone") }} +---------------------------------------------------------------------- +### Status: +enqueued [0,] +---------------------------------------------------------------------- +### Kind: +"indexCreation" [0,] +---------------------------------------------------------------------- +### Index Tasks: +doggo [0,] +---------------------------------------------------------------------- +### Index Mapper: + +---------------------------------------------------------------------- +### Canceled By: + +---------------------------------------------------------------------- +### Enqueued At: +[timestamp] [0,] +---------------------------------------------------------------------- +### Started At: +---------------------------------------------------------------------- +### Finished At: +---------------------------------------------------------------------- +### All Batches: +---------------------------------------------------------------------- +### Batch to tasks mapping: +---------------------------------------------------------------------- +### Batches Status: +---------------------------------------------------------------------- +### Batches Kind: +---------------------------------------------------------------------- +### Batches Index Tasks: +---------------------------------------------------------------------- +### Batches Canceled By: + +---------------------------------------------------------------------- +### Batches Enqueued At: +---------------------------------------------------------------------- +### Batches Started At: +---------------------------------------------------------------------- +### Batches Finished At: +---------------------------------------------------------------------- +### File Store: + +---------------------------------------------------------------------- diff --git a/crates/index-scheduler/src/snapshots/lib.rs/query_batches_from_and_limit/registered_the_second_task.snap b/crates/index-scheduler/src/snapshots/lib.rs/query_batches_from_and_limit/registered_the_second_task.snap new file mode 100644 index 000000000..cae2c4806 --- /dev/null +++ b/crates/index-scheduler/src/snapshots/lib.rs/query_batches_from_and_limit/registered_the_second_task.snap @@ -0,0 +1,57 @@ +--- +source: crates/index-scheduler/src/lib.rs +--- +### Autobatching Enabled = true +### Processing batch None: +[] +---------------------------------------------------------------------- +### All Tasks: +0 {uid: 0, status: enqueued, details: { primary_key: Some("bone") }, kind: IndexCreation { index_uid: "doggo", primary_key: Some("bone") }} +1 {uid: 1, status: enqueued, details: { primary_key: Some("plankton") }, kind: IndexCreation { index_uid: "whalo", primary_key: Some("plankton") }} +---------------------------------------------------------------------- +### Status: +enqueued [0,1,] +---------------------------------------------------------------------- +### Kind: +"indexCreation" [0,1,] +---------------------------------------------------------------------- +### Index Tasks: +doggo [0,] +whalo [1,] +---------------------------------------------------------------------- +### Index Mapper: + +---------------------------------------------------------------------- +### Canceled By: + +---------------------------------------------------------------------- +### Enqueued At: +[timestamp] [0,] +[timestamp] [1,] +---------------------------------------------------------------------- +### Started At: +---------------------------------------------------------------------- +### Finished At: +---------------------------------------------------------------------- +### All Batches: +---------------------------------------------------------------------- +### Batch to tasks mapping: +---------------------------------------------------------------------- +### Batches Status: +---------------------------------------------------------------------- +### Batches Kind: +---------------------------------------------------------------------- +### Batches Index Tasks: +---------------------------------------------------------------------- +### Batches Canceled By: + +---------------------------------------------------------------------- +### Batches Enqueued At: +---------------------------------------------------------------------- +### Batches Started At: +---------------------------------------------------------------------- +### Batches Finished At: +---------------------------------------------------------------------- +### File Store: + +---------------------------------------------------------------------- diff --git a/crates/index-scheduler/src/snapshots/lib.rs/query_batches_from_and_limit/registered_the_third_task.snap b/crates/index-scheduler/src/snapshots/lib.rs/query_batches_from_and_limit/registered_the_third_task.snap new file mode 100644 index 000000000..2346e5e68 --- /dev/null +++ b/crates/index-scheduler/src/snapshots/lib.rs/query_batches_from_and_limit/registered_the_third_task.snap @@ -0,0 +1,60 @@ +--- +source: crates/index-scheduler/src/lib.rs +--- +### Autobatching Enabled = true +### Processing batch None: +[] +---------------------------------------------------------------------- +### All Tasks: +0 {uid: 0, status: enqueued, details: { primary_key: Some("bone") }, kind: IndexCreation { index_uid: "doggo", primary_key: Some("bone") }} +1 {uid: 1, status: enqueued, details: { primary_key: Some("plankton") }, kind: IndexCreation { index_uid: "whalo", primary_key: Some("plankton") }} +2 {uid: 2, status: enqueued, details: { primary_key: Some("his_own_vomit") }, kind: IndexCreation { index_uid: "catto", primary_key: Some("his_own_vomit") }} +---------------------------------------------------------------------- +### Status: +enqueued [0,1,2,] +---------------------------------------------------------------------- +### Kind: +"indexCreation" [0,1,2,] +---------------------------------------------------------------------- +### Index Tasks: +catto [2,] +doggo [0,] +whalo [1,] +---------------------------------------------------------------------- +### Index Mapper: + +---------------------------------------------------------------------- +### Canceled By: + +---------------------------------------------------------------------- +### Enqueued At: +[timestamp] [0,] +[timestamp] [1,] +[timestamp] [2,] +---------------------------------------------------------------------- +### Started At: +---------------------------------------------------------------------- +### Finished At: +---------------------------------------------------------------------- +### All Batches: +---------------------------------------------------------------------- +### Batch to tasks mapping: +---------------------------------------------------------------------- +### Batches Status: +---------------------------------------------------------------------- +### Batches Kind: +---------------------------------------------------------------------- +### Batches Index Tasks: +---------------------------------------------------------------------- +### Batches Canceled By: + +---------------------------------------------------------------------- +### Batches Enqueued At: +---------------------------------------------------------------------- +### Batches Started At: +---------------------------------------------------------------------- +### Batches Finished At: +---------------------------------------------------------------------- +### File Store: + +---------------------------------------------------------------------- diff --git a/crates/index-scheduler/src/snapshots/lib.rs/query_batches_simple/end.snap b/crates/index-scheduler/src/snapshots/lib.rs/query_batches_simple/end.snap new file mode 100644 index 000000000..b87a06b2b --- /dev/null +++ b/crates/index-scheduler/src/snapshots/lib.rs/query_batches_simple/end.snap @@ -0,0 +1,91 @@ +--- +source: crates/index-scheduler/src/lib.rs +--- +### Autobatching Enabled = true +### Processing batch None: +[] +---------------------------------------------------------------------- +### All Tasks: +0 {uid: 0, batch_uid: 0, status: succeeded, details: { primary_key: Some("mouse") }, kind: IndexCreation { index_uid: "catto", primary_key: Some("mouse") }} +1 {uid: 1, batch_uid: 1, status: succeeded, details: { primary_key: Some("sheep") }, kind: IndexCreation { index_uid: "doggo", primary_key: Some("sheep") }} +2 {uid: 2, batch_uid: 2, status: failed, error: ResponseError { code: 200, message: "Planned failure for tests.", error_code: "internal", error_type: "internal", error_link: "https://docs.meilisearch.com/errors#internal" }, details: { primary_key: Some("fish") }, kind: IndexCreation { index_uid: "whalo", primary_key: Some("fish") }} +---------------------------------------------------------------------- +### Status: +enqueued [] +succeeded [0,1,] +failed [2,] +---------------------------------------------------------------------- +### Kind: +"indexCreation" [0,1,2,] +---------------------------------------------------------------------- +### Index Tasks: +catto [0,] +doggo [1,] +whalo [2,] +---------------------------------------------------------------------- +### Index Mapper: +catto: { number_of_documents: 0, field_distribution: {} } +doggo: { number_of_documents: 0, field_distribution: {} } + +---------------------------------------------------------------------- +### Canceled By: + +---------------------------------------------------------------------- +### Enqueued At: +[timestamp] [0,] +[timestamp] [1,] +[timestamp] [2,] +---------------------------------------------------------------------- +### Started At: +[timestamp] [0,] +[timestamp] [1,] +[timestamp] [2,] +---------------------------------------------------------------------- +### Finished At: +[timestamp] [0,] +[timestamp] [1,] +[timestamp] [2,] +---------------------------------------------------------------------- +### All Batches: +0 {uid: 0, } +1 {uid: 1, } +2 {uid: 2, } +---------------------------------------------------------------------- +### Batch to tasks mapping: +0 [0,] +1 [1,] +2 [2,] +---------------------------------------------------------------------- +### Batches Status: +succeeded [0,1,] +failed [2,] +---------------------------------------------------------------------- +### Batches Kind: +"indexCreation" [0,1,2,] +---------------------------------------------------------------------- +### Batches Index Tasks: +catto [0,] +doggo [1,] +whalo [2,] +---------------------------------------------------------------------- +### Batches Canceled By: + +---------------------------------------------------------------------- +### Batches Enqueued At: +[timestamp] [0,] +[timestamp] [1,] +[timestamp] [2,] +---------------------------------------------------------------------- +### Batches Started At: +[timestamp] [0,] +[timestamp] [1,] +[timestamp] [2,] +---------------------------------------------------------------------- +### Batches Finished At: +[timestamp] [0,] +[timestamp] [1,] +[timestamp] [2,] +---------------------------------------------------------------------- +### File Store: + +---------------------------------------------------------------------- diff --git a/crates/index-scheduler/src/snapshots/lib.rs/query_batches_simple/start.snap b/crates/index-scheduler/src/snapshots/lib.rs/query_batches_simple/start.snap new file mode 100644 index 000000000..7428155e9 --- /dev/null +++ b/crates/index-scheduler/src/snapshots/lib.rs/query_batches_simple/start.snap @@ -0,0 +1,60 @@ +--- +source: crates/index-scheduler/src/lib.rs +--- +### Autobatching Enabled = true +### Processing batch None: +[] +---------------------------------------------------------------------- +### All Tasks: +0 {uid: 0, status: enqueued, details: { primary_key: Some("mouse") }, kind: IndexCreation { index_uid: "catto", primary_key: Some("mouse") }} +1 {uid: 1, status: enqueued, details: { primary_key: Some("sheep") }, kind: IndexCreation { index_uid: "doggo", primary_key: Some("sheep") }} +2 {uid: 2, status: enqueued, details: { primary_key: Some("fish") }, kind: IndexCreation { index_uid: "whalo", primary_key: Some("fish") }} +---------------------------------------------------------------------- +### Status: +enqueued [0,1,2,] +---------------------------------------------------------------------- +### Kind: +"indexCreation" [0,1,2,] +---------------------------------------------------------------------- +### Index Tasks: +catto [0,] +doggo [1,] +whalo [2,] +---------------------------------------------------------------------- +### Index Mapper: + +---------------------------------------------------------------------- +### Canceled By: + +---------------------------------------------------------------------- +### Enqueued At: +[timestamp] [0,] +[timestamp] [1,] +[timestamp] [2,] +---------------------------------------------------------------------- +### Started At: +---------------------------------------------------------------------- +### Finished At: +---------------------------------------------------------------------- +### All Batches: +---------------------------------------------------------------------- +### Batch to tasks mapping: +---------------------------------------------------------------------- +### Batches Status: +---------------------------------------------------------------------- +### Batches Kind: +---------------------------------------------------------------------- +### Batches Index Tasks: +---------------------------------------------------------------------- +### Batches Canceled By: + +---------------------------------------------------------------------- +### Batches Enqueued At: +---------------------------------------------------------------------- +### Batches Started At: +---------------------------------------------------------------------- +### Batches Finished At: +---------------------------------------------------------------------- +### File Store: + +---------------------------------------------------------------------- diff --git a/crates/index-scheduler/src/snapshots/lib.rs/query_batches_special_rules/after-processing-everything.snap b/crates/index-scheduler/src/snapshots/lib.rs/query_batches_special_rules/after-processing-everything.snap new file mode 100644 index 000000000..625017e91 --- /dev/null +++ b/crates/index-scheduler/src/snapshots/lib.rs/query_batches_special_rules/after-processing-everything.snap @@ -0,0 +1,102 @@ +--- +source: crates/index-scheduler/src/lib.rs +--- +### Autobatching Enabled = true +### Processing batch None: +[] +---------------------------------------------------------------------- +### All Tasks: +0 {uid: 0, batch_uid: 0, status: succeeded, details: { primary_key: Some("mouse") }, kind: IndexCreation { index_uid: "catto", primary_key: Some("mouse") }} +1 {uid: 1, batch_uid: 1, status: succeeded, details: { primary_key: Some("sheep") }, kind: IndexCreation { index_uid: "doggo", primary_key: Some("sheep") }} +2 {uid: 2, batch_uid: 2, status: failed, error: ResponseError { code: 200, message: "Planned failure for tests.", error_code: "internal", error_type: "internal", error_link: "https://docs.meilisearch.com/errors#internal" }, details: { swaps: [IndexSwap { indexes: ("catto", "doggo") }] }, kind: IndexSwap { swaps: [IndexSwap { indexes: ("catto", "doggo") }] }} +3 {uid: 3, batch_uid: 3, status: failed, error: ResponseError { code: 200, message: "Index `whalo` not found.", error_code: "index_not_found", error_type: "invalid_request", error_link: "https://docs.meilisearch.com/errors#index_not_found" }, details: { swaps: [IndexSwap { indexes: ("catto", "whalo") }] }, kind: IndexSwap { swaps: [IndexSwap { indexes: ("catto", "whalo") }] }} +---------------------------------------------------------------------- +### Status: +enqueued [] +succeeded [0,1,] +failed [2,3,] +---------------------------------------------------------------------- +### Kind: +"indexCreation" [0,1,] +"indexSwap" [2,3,] +---------------------------------------------------------------------- +### Index Tasks: +catto [0,2,3,] +doggo [1,2,] +whalo [3,] +---------------------------------------------------------------------- +### Index Mapper: +catto: { number_of_documents: 0, field_distribution: {} } +doggo: { number_of_documents: 0, field_distribution: {} } + +---------------------------------------------------------------------- +### Canceled By: + +---------------------------------------------------------------------- +### Enqueued At: +[timestamp] [0,] +[timestamp] [1,] +[timestamp] [2,] +[timestamp] [3,] +---------------------------------------------------------------------- +### Started At: +[timestamp] [0,] +[timestamp] [1,] +[timestamp] [2,] +[timestamp] [3,] +---------------------------------------------------------------------- +### Finished At: +[timestamp] [0,] +[timestamp] [1,] +[timestamp] [2,] +[timestamp] [3,] +---------------------------------------------------------------------- +### All Batches: +0 {uid: 0, } +1 {uid: 1, } +2 {uid: 2, } +3 {uid: 3, } +---------------------------------------------------------------------- +### Batch to tasks mapping: +0 [0,] +1 [1,] +2 [2,] +3 [3,] +---------------------------------------------------------------------- +### Batches Status: +succeeded [0,1,] +failed [2,3,] +---------------------------------------------------------------------- +### Batches Kind: +"indexCreation" [0,1,] +"indexSwap" [2,3,] +---------------------------------------------------------------------- +### Batches Index Tasks: +catto [0,2,3,] +doggo [1,2,] +whalo [3,] +---------------------------------------------------------------------- +### Batches Canceled By: + +---------------------------------------------------------------------- +### Batches Enqueued At: +[timestamp] [0,] +[timestamp] [1,] +[timestamp] [2,] +[timestamp] [3,] +---------------------------------------------------------------------- +### Batches Started At: +[timestamp] [0,] +[timestamp] [1,] +[timestamp] [2,] +[timestamp] [3,] +---------------------------------------------------------------------- +### Batches Finished At: +[timestamp] [0,] +[timestamp] [1,] +[timestamp] [2,] +[timestamp] [3,] +---------------------------------------------------------------------- +### File Store: + +---------------------------------------------------------------------- diff --git a/crates/index-scheduler/src/snapshots/lib.rs/query_batches_special_rules/start.snap b/crates/index-scheduler/src/snapshots/lib.rs/query_batches_special_rules/start.snap new file mode 100644 index 000000000..f5a544f18 --- /dev/null +++ b/crates/index-scheduler/src/snapshots/lib.rs/query_batches_special_rules/start.snap @@ -0,0 +1,63 @@ +--- +source: crates/index-scheduler/src/lib.rs +--- +### Autobatching Enabled = true +### Processing batch None: +[] +---------------------------------------------------------------------- +### All Tasks: +0 {uid: 0, status: enqueued, details: { primary_key: Some("mouse") }, kind: IndexCreation { index_uid: "catto", primary_key: Some("mouse") }} +1 {uid: 1, status: enqueued, details: { primary_key: Some("sheep") }, kind: IndexCreation { index_uid: "doggo", primary_key: Some("sheep") }} +2 {uid: 2, status: enqueued, details: { swaps: [IndexSwap { indexes: ("catto", "doggo") }] }, kind: IndexSwap { swaps: [IndexSwap { indexes: ("catto", "doggo") }] }} +3 {uid: 3, status: enqueued, details: { swaps: [IndexSwap { indexes: ("catto", "whalo") }] }, kind: IndexSwap { swaps: [IndexSwap { indexes: ("catto", "whalo") }] }} +---------------------------------------------------------------------- +### Status: +enqueued [0,1,2,3,] +---------------------------------------------------------------------- +### Kind: +"indexCreation" [0,1,] +"indexSwap" [2,3,] +---------------------------------------------------------------------- +### Index Tasks: +catto [0,2,3,] +doggo [1,2,] +whalo [3,] +---------------------------------------------------------------------- +### Index Mapper: + +---------------------------------------------------------------------- +### Canceled By: + +---------------------------------------------------------------------- +### Enqueued At: +[timestamp] [0,] +[timestamp] [1,] +[timestamp] [2,] +[timestamp] [3,] +---------------------------------------------------------------------- +### Started At: +---------------------------------------------------------------------- +### Finished At: +---------------------------------------------------------------------- +### All Batches: +---------------------------------------------------------------------- +### Batch to tasks mapping: +---------------------------------------------------------------------- +### Batches Status: +---------------------------------------------------------------------- +### Batches Kind: +---------------------------------------------------------------------- +### Batches Index Tasks: +---------------------------------------------------------------------- +### Batches Canceled By: + +---------------------------------------------------------------------- +### Batches Enqueued At: +---------------------------------------------------------------------- +### Batches Started At: +---------------------------------------------------------------------- +### Batches Finished At: +---------------------------------------------------------------------- +### File Store: + +---------------------------------------------------------------------- diff --git a/crates/index-scheduler/src/utils.rs b/crates/index-scheduler/src/utils.rs index d557d3e2e..8ffef39f3 100644 --- a/crates/index-scheduler/src/utils.rs +++ b/crates/index-scheduler/src/utils.rs @@ -11,52 +11,65 @@ use meilisearch_types::tasks::{Details, IndexSwap, Kind, KindWithContent, Status use roaring::{MultiOps, RoaringBitmap}; use time::OffsetDateTime; -use crate::{Error, IndexScheduler, ProcessingTasks, Result, Task, TaskId, BEI128}; +use crate::{Error, IndexScheduler, Result, Task, TaskId, BEI128}; /// This structure contains all the information required to write a batch in the database without reading the tasks. /// It'll stay in RAM so it must be small. -pub(crate) struct CachedBatch { - uid: BatchId, - statuses: HashSet, - kinds: HashSet, - indexes: HashSet, - canceled_by: HashSet, - oldest_enqueued_at: Option, - earliest_enqueued_at: Option, - started_at: OffsetDateTime, - finished_at: OffsetDateTime, +#[derive(Debug, Clone)] +pub(crate) struct ProcessingBatch { + pub uid: BatchId, + pub statuses: HashSet, + pub kinds: HashSet, + pub indexes: HashSet, + pub canceled_by: HashSet, + pub oldest_enqueued_at: Option, + pub earliest_enqueued_at: Option, + pub started_at: OffsetDateTime, } -impl CachedBatch { - pub fn new(uid: BatchId, started_at: OffsetDateTime, finished_at: OffsetDateTime) -> Self { +impl ProcessingBatch { + pub fn new(uid: BatchId) -> Self { + // At the beginning, all the tasks are processing + let mut statuses = HashSet::default(); + statuses.insert(Status::Processing); + Self { uid, - statuses: HashSet::default(), + statuses, kinds: HashSet::default(), indexes: HashSet::default(), canceled_by: HashSet::default(), oldest_enqueued_at: None, earliest_enqueued_at: None, - started_at, - finished_at, + started_at: OffsetDateTime::now_utc(), } } + /// Remove the Processing status and update the real statuses of the tasks. pub fn update(&mut self, task: &Task) { + self.statuses.clear(); self.statuses.insert(task.status); - self.kinds.insert(task.kind.as_kind()); - self.indexes.extend(task.indexes().iter().map(|s| s.to_string())); - if let Some(canceled_by) = task.canceled_by { - self.canceled_by.insert(canceled_by); + } + + /// Update itself with the content of the task and update the batch id in the task. + pub fn processing<'a>(&mut self, tasks: impl IntoIterator) { + for task in tasks.into_iter() { + task.batch_uid = Some(self.uid); + // We don't store the statuses since they're all enqueued. + self.kinds.insert(task.kind.as_kind()); + self.indexes.extend(task.indexes().iter().map(|s| s.to_string())); + if let Some(canceled_by) = task.canceled_by { + self.canceled_by.insert(canceled_by); + } + self.oldest_enqueued_at = + Some(self.oldest_enqueued_at.map_or(task.enqueued_at, |oldest_enqueued_at| { + task.enqueued_at.min(oldest_enqueued_at) + })); + self.earliest_enqueued_at = + Some(self.earliest_enqueued_at.map_or(task.enqueued_at, |earliest_enqueued_at| { + task.enqueued_at.max(earliest_enqueued_at) + })); } - self.oldest_enqueued_at = - Some(self.oldest_enqueued_at.map_or(task.enqueued_at, |oldest_enqueued_at| { - task.enqueued_at.min(oldest_enqueued_at) - })); - self.earliest_enqueued_at = - Some(self.earliest_enqueued_at.map_or(task.enqueued_at, |earliest_enqueued_at| { - task.enqueued_at.max(earliest_enqueued_at) - })); } } @@ -97,17 +110,14 @@ impl IndexScheduler { pub(crate) fn write_batch( &self, wtxn: &mut RwTxn, - batch: CachedBatch, + batch: ProcessingBatch, tasks: &RoaringBitmap, + finished_at: OffsetDateTime, ) -> Result<()> { self.all_batches.put( wtxn, &batch.uid, - &Batch { - uid: batch.uid, - started_at: batch.started_at, - finished_at: Some(batch.finished_at), - }, + &Batch { uid: batch.uid, started_at: batch.started_at, finished_at: Some(finished_at) }, )?; self.batch_to_tasks_mapping.put(wtxn, &batch.uid, tasks)?; @@ -135,25 +145,27 @@ impl IndexScheduler { insert_task_datetime(wtxn, self.batch_enqueued_at, enqueued_at, batch.uid)?; } insert_task_datetime(wtxn, self.batch_started_at, batch.started_at, batch.uid)?; - insert_task_datetime(wtxn, self.batch_finished_at, batch.finished_at, batch.uid)?; + insert_task_datetime(wtxn, self.batch_finished_at, finished_at, batch.uid)?; Ok(()) } /// Convert an iterator to a `Vec` of tasks. The tasks MUST exist or a /// `CorruptedTaskQueue` error will be throwed. - pub(crate) fn get_existing_tasks_with_batch_id( + pub(crate) fn get_existing_tasks_with_processing_batch( &self, rtxn: &RoTxn, - batch_id: BatchId, + processing_batch: &mut ProcessingBatch, tasks: impl IntoIterator, ) -> Result> { tasks .into_iter() .map(|task_id| { - self.get_task(rtxn, task_id) - .and_then(|task| task.ok_or(Error::CorruptedTaskQueue)) - .map(|task| task.with_batch_id(batch_id)) + let mut task = self + .get_task(rtxn, task_id) + .and_then(|task| task.ok_or(Error::CorruptedTaskQueue)); + processing_batch.processing(&mut task); + task }) .collect::>() } diff --git a/crates/meilisearch-types/src/tasks.rs b/crates/meilisearch-types/src/tasks.rs index 3485962d1..f5875225a 100644 --- a/crates/meilisearch-types/src/tasks.rs +++ b/crates/meilisearch-types/src/tasks.rs @@ -62,11 +62,6 @@ impl Task { } } - pub fn with_batch_id(mut self, batch_id: TaskId) -> Self { - self.batch_uid = Some(batch_id); - self - } - /// Return the list of indexes updated by this tasks. pub fn indexes(&self) -> Vec<&str> { self.kind.indexes()