now that the task cancelation shares their started at with all the tasks of their batch we don't need the trick of retrieving the previous batch anymore

This commit is contained in:
Tamo 2024-11-20 10:42:29 +01:00
parent b24a34830d
commit bdb51a85fe
No known key found for this signature in database
GPG Key ID: 20CD8020AFA88D69

View File

@ -46,7 +46,7 @@ use uuid::Uuid;
use crate::autobatcher::{self, BatchKind}; use crate::autobatcher::{self, BatchKind};
use crate::utils::{self, swap_index_uid_in_task, ProcessingBatch}; use crate::utils::{self, swap_index_uid_in_task, ProcessingBatch};
use crate::{Error, IndexScheduler, MustStopProcessing, ProcessingTasks, Result, TaskId}; use crate::{Error, IndexScheduler, MustStopProcessing, Result, TaskId};
/// Represents a combination of tasks that can all be processed at the same time. /// Represents a combination of tasks that can all be processed at the same time.
/// ///
@ -58,10 +58,6 @@ pub(crate) enum Batch {
TaskCancelation { TaskCancelation {
/// The task cancelation itself. /// The task cancelation itself.
task: Task, task: Task,
/// The date and time at which the previously processing tasks started.
previous_started_at: OffsetDateTime,
/// The list of tasks that were processing when this task cancelation appeared.
previous_processing_tasks: RoaringBitmap,
}, },
TaskDeletions(Vec<Task>), TaskDeletions(Vec<Task>),
SnapshotCreation(Vec<Task>), SnapshotCreation(Vec<Task>),
@ -556,25 +552,9 @@ impl IndexScheduler {
// 1. we get the last task to cancel. // 1. we get the last task to cancel.
if let Some(task_id) = to_cancel.max() { if let Some(task_id) = to_cancel.max() {
// We retrieve the tasks that were processing before this tasks cancelation started.
// We must *not* reset the processing tasks before calling this method.
// Displaying the `batch_id` would make a strange error message since this task cancelation is going to
// replace the canceled batch. It's better to avoid mentioning it in the error message.
let ProcessingTasks { batch: previous_batch, processing } =
&*self.processing_tasks.read().unwrap();
let mut task = self.get_task(rtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?; let mut task = self.get_task(rtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?;
current_batch.processing(Some(&mut task)); current_batch.processing(Some(&mut task));
return Ok(Some(( return Ok(Some((Batch::TaskCancelation { task }, current_batch)));
Batch::TaskCancelation {
task,
// We should never be in a case where we don't have a previous_batch, but let's not crash if it happens
previous_started_at: previous_batch
.as_ref()
.map_or_else(OffsetDateTime::now_utc, |batch| batch.started_at),
previous_processing_tasks: processing.clone(),
},
current_batch,
)));
} }
// 2. we get the next task to delete // 2. we get the next task to delete
@ -681,7 +661,7 @@ impl IndexScheduler {
} }
match batch { match batch {
Batch::TaskCancelation { mut task, previous_started_at, previous_processing_tasks } => { Batch::TaskCancelation { mut task } => {
// 1. Retrieve the tasks that matched the query at enqueue-time. // 1. Retrieve the tasks that matched the query at enqueue-time.
let matched_tasks = let matched_tasks =
if let KindWithContent::TaskCancelation { tasks, query: _ } = &task.kind { if let KindWithContent::TaskCancelation { tasks, query: _ } = &task.kind {