diff --git a/index-scheduler/src/batch.rs b/index-scheduler/src/batch.rs index 83c97f28a..4bafe8494 100644 --- a/index-scheduler/src/batch.rs +++ b/index-scheduler/src/batch.rs @@ -6,7 +6,7 @@ use std::sync::atomic::Ordering::Relaxed; use crate::{autobatcher::BatchKind, Error, IndexScheduler, Result, TaskId}; use dump::IndexMetadata; -use log::{debug, info}; +use log::{debug, error, info}; use meilisearch_types::milli::documents::obkv_to_object; use meilisearch_types::milli::update::IndexDocumentsConfig; @@ -468,7 +468,7 @@ impl IndexScheduler { }; let mut wtxn = self.env.write_txn()?; - let canceled_tasks_count = + let canceled_tasks_content_uuids = self.cancel_matched_tasks(&mut wtxn, task.uid, matched_tasks)?; task.status = Status::Succeeded; @@ -478,12 +478,29 @@ impl IndexScheduler { canceled_tasks, original_query: _, }) => { - *canceled_tasks = Some(canceled_tasks_count); + *canceled_tasks = Some(canceled_tasks_content_uuids.len() as u64); } _ => unreachable!(), } - wtxn.commit()?; + // We must only remove the content files if the transaction is successfuly committed + // and if errors occurs when we are deleting files we must do our best to delete + // everything. We do not return the encountered errors when deleting the content + // files as it is not a breaking operation and we can safely continue our job. + match wtxn.commit() { + Ok(()) => { + for content_uuid in canceled_tasks_content_uuids { + if let Err(error) = self.delete_update_file(content_uuid) { + error!( + "We failed deleting the content file indentified as {}: {}", + content_uuid, error + ) + } + } + } + Err(e) => return Err(e.into()), + } + Ok(vec![task]) } Batch::TaskDeletion(mut task) => { @@ -1022,13 +1039,13 @@ impl IndexScheduler { /// Cancel each given task from all the databases (if it is cancelable). /// - /// Return the number of tasks that were actually canceled. + /// Returns the content files that the transaction owner must delete if the commit is successful. fn cancel_matched_tasks( &self, wtxn: &mut RwTxn, cancel_task_id: TaskId, matched_tasks: &RoaringBitmap, - ) -> Result { + ) -> Result> { let now = OffsetDateTime::now_utc(); // 1. Remove from this list the tasks that we are not allowed to cancel @@ -1038,14 +1055,17 @@ impl IndexScheduler { let tasks_to_cancel = cancelable_tasks & matched_tasks; // 2. We now have a list of tasks to cancel, cancel them + let mut content_files_to_delete = Vec::new(); for mut task in self.get_existing_tasks(wtxn, tasks_to_cancel.iter())? { - self.delete_persisted_task_data(&task)?; + if let Some(uuid) = task.content_uuid() { + content_files_to_delete.push(*uuid); + } task.status = Status::Canceled; task.canceled_by = Some(cancel_task_id); task.finished_at = Some(now); self.update_task(wtxn, &task)?; } - Ok(tasks_to_cancel.len()) + Ok(content_files_to_delete) } }