From 91cdd502f8c616ca3d63b951ee18a5d7ffb20157 Mon Sep 17 00:00:00 2001
From: Tamo
Date: Thu, 22 Feb 2024 14:56:22 +0100
Subject: [PATCH 1/2] When processing tasks, make the update file deletion
 atomic

---
 Cargo.lock                 | 10 +++++----
 file-store/Cargo.toml      |  1 +
 file-store/src/lib.rs      | 16 +++++++++++---
 index-scheduler/Cargo.toml |  1 +
 index-scheduler/src/lib.rs | 44 ++++++++++++++++++++++++++++++--------
 5 files changed, 56 insertions(+), 16 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 684b9e5b5..971ab602a 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1728,6 +1728,7 @@ dependencies = [
  "faux",
  "tempfile",
  "thiserror",
+ "tracing",
  "uuid",
 ]
 
@@ -2393,6 +2394,7 @@ dependencies = [
  "meilisearch-types",
  "page_size 0.5.0",
  "puffin",
+ "rayon",
  "roaring",
  "serde",
  "serde_json",
@@ -4077,9 +4079,9 @@ dependencies = [
 
 [[package]]
 name = "rayon"
-version = "1.8.0"
+version = "1.8.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9c27db03db7734835b3f53954b534c91069375ce6ccaa2e065441e07d9b6cdb1"
+checksum = "fa7237101a77a10773db45d62004a272517633fbcc3df19d96455ede1122e051"
 dependencies = [
  "either",
  "rayon-core",
@@ -4098,9 +4100,9 @@ dependencies = [
 
 [[package]]
 name = "rayon-core"
-version = "1.12.0"
+version = "1.12.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5ce3fb6ad83f861aac485e76e1985cd109d9a3713802152be56c3b1f0e0658ed"
+checksum = "1465873a3dfdaa8ae7cb14b4383657caab0b3e8a0aa9ae8e04b044854c8dfce2"
 dependencies = [
  "crossbeam-deque",
  "crossbeam-utils",
diff --git a/file-store/Cargo.toml b/file-store/Cargo.toml
index 1b1b0cff5..5fae1aab4 100644
--- a/file-store/Cargo.toml
+++ b/file-store/Cargo.toml
@@ -13,6 +13,7 @@ license.workspace = true
 [dependencies]
 tempfile = "3.9.0"
 thiserror = "1.0.56"
+tracing = "0.1.40"
 uuid = { version = "1.6.1", features = ["serde", "v4"] }
 
 [dev-dependencies]
diff --git a/file-store/src/lib.rs b/file-store/src/lib.rs
index 75db9bb5f..0f2d348ca 100644
--- a/file-store/src/lib.rs
+++ b/file-store/src/lib.rs
@@ -75,7 +75,13 @@ impl FileStore {
 
     /// Returns the file corresponding to the requested uuid.
     pub fn get_update(&self, uuid: Uuid) -> Result<StdFile> {
         let path = self.get_update_path(uuid);
-        let file = StdFile::open(path)?;
+        let file = match StdFile::open(path) {
+            Ok(file) => file,
+            Err(e) => {
+                tracing::error!("Can't access update file {uuid}: {e}");
+                return Err(e.into());
+            }
+        };
         Ok(file)
     }
 
@@ -110,8 +116,12 @@ impl FileStore {
 
     pub fn delete(&self, uuid: Uuid) -> Result<()> {
         let path = self.path.join(uuid.to_string());
-        std::fs::remove_file(path)?;
-        Ok(())
+        if let Err(e) = std::fs::remove_file(path) {
+            tracing::error!("Can't delete file {uuid}: {e}");
+            Err(e.into())
+        } else {
+            Ok(())
+        }
     }
 
     /// List the Uuids of the files in the FileStore
diff --git a/index-scheduler/Cargo.toml b/index-scheduler/Cargo.toml
index 890312854..c758f1114 100644
--- a/index-scheduler/Cargo.toml
+++ b/index-scheduler/Cargo.toml
@@ -23,6 +23,7 @@ meilisearch-auth = { path = "../meilisearch-auth" }
 meilisearch-types = { path = "../meilisearch-types" }
 page_size = "0.5.0"
 puffin = { version = "0.16.0", features = ["serialization"] }
+rayon = "1.8.1"
 roaring = { version = "0.10.2", features = ["serde"] }
 serde = { version = "1.0.195", features = ["derive"] }
 serde_json = { version = "1.0.111", features = ["preserve_order"] }
diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs
index 7514a2a68..535b5a36e 100644
--- a/index-scheduler/src/lib.rs
+++ b/index-scheduler/src/lib.rs
@@ -60,6 +60,7 @@ use meilisearch_types::milli::{self, CboRoaringBitmapCodec, Index, RoaringBitmap
 use meilisearch_types::task_view::TaskView;
 use meilisearch_types::tasks::{Kind, KindWithContent, Status, Task};
 use puffin::FrameView;
+use rayon::prelude::{IntoParallelIterator, ParallelIterator};
 use roaring::RoaringBitmap;
 use synchronoise::SignalEvent;
 use time::format_description::well_known::Rfc3339;
@@ -1175,6 +1176,9 @@ impl IndexScheduler {
                 #[cfg(test)]
                 self.breakpoint(Breakpoint::ProcessBatchSucceeded);
 
+                let mut success = 0;
+                let mut failure = 0;
+
                 #[allow(unused_variables)]
                 for (i, mut task) in tasks.into_iter().enumerate() {
                     task.started_at = Some(started_at);
@@ -1187,13 +1191,15 @@ impl IndexScheduler {
                         },
                     )?;
 
+                    match task.error {
+                        Some(_) => failure += 1,
+                        None => success += 1,
+                    }
+
                     self.update_task(&mut wtxn, &task)
                         .map_err(|e| Error::TaskDatabaseUpdate(Box::new(e)))?;
-                    if let Err(e) = self.delete_persisted_task_data(&task) {
-                        tracing::error!("Failure to delete the content files associated with task {}. Error: {e}", task.uid);
-                    }
                 }
-                tracing::info!("A batch of tasks was successfully completed.");
+                tracing::info!("A batch of tasks was successfully completed with {success} successful tasks and {failure} failed tasks.");
             }
             // If we have an abortion error we must stop the tick here and re-schedule tasks.
             Err(Error::Milli(milli::Error::InternalError(
@@ -1204,6 +1210,7 @@ impl IndexScheduler {
                 self.breakpoint(Breakpoint::AbortedIndexation);
                 wtxn.abort();
 
+                tracing::info!("A batch of tasks was aborted.");
                 // We make sure that we don't call `stop_processing` on the `processing_tasks`,
                 // this is because we want to let the next tick call `create_next_batch` and keep
                 // the `started_at` date times and `processings` of the current processing tasks.
@@ -1225,6 +1232,8 @@ impl IndexScheduler {
                 self.index_mapper.resize_index(&wtxn, &index_uid)?;
                 wtxn.abort();
 
+                tracing::info!("The max database size was reached. Resizing the index.");
+
                 return Ok(TickOutcome::TickAgain(0));
             }
             // In case of a failure we must get back and patch all the tasks with the error.
@@ -1232,9 +1241,9 @@ impl IndexScheduler {
                 #[cfg(test)]
                 self.breakpoint(Breakpoint::ProcessBatchFailed);
                 let error: ResponseError = err.into();
-                for id in ids {
+                for id in ids.iter() {
                     let mut task = self
-                        .get_task(&wtxn, id)
+                        .get_task(&wtxn, *id)
                         .map_err(|e| Error::TaskDatabaseUpdate(Box::new(e)))?
                         .ok_or(Error::CorruptedTaskQueue)?;
                     task.started_at = Some(started_at);
@@ -1246,9 +1255,8 @@ impl IndexScheduler {
                     #[cfg(test)]
                     self.maybe_fail(tests::FailureLocation::UpdatingTaskAfterProcessBatchFailure)?;
 
-                    if let Err(e) = self.delete_persisted_task_data(&task) {
-                        tracing::error!("Failure to delete the content files associated with task {}. Error: {e}", task.uid);
-                    }
+                    tracing::info!("Batch failed {}", error);
+
                     self.update_task(&mut wtxn, &task)
                         .map_err(|e| Error::TaskDatabaseUpdate(Box::new(e)))?;
                 }
@@ -1262,6 +1270,24 @@ impl IndexScheduler {
 
         wtxn.commit().map_err(Error::HeedTransaction)?;
 
+        // Once the tasks are committed, we should delete all the associated update files ASAP to avoid leaking files in case of a restart.
+        tracing::debug!("Deleting the update files");
+
+        ids.into_par_iter().try_for_each(|id| -> Result<()> {
+            let rtxn = self.read_txn()?;
+            let task = self
+                .get_task(&rtxn, id)
+                .map_err(|e| Error::TaskDatabaseUpdate(Box::new(e)))?
+                .ok_or(Error::CorruptedTaskQueue)?;
+            if let Err(e) = self.delete_persisted_task_data(&task) {
+                tracing::error!(
+                    "Failure to delete the content files associated with task {}. Error: {e}",
+                    task.uid
+                );
+            }
+            Ok(())
+        })?;
+
         // We shouldn't crash the tick function if we can't send data to the webhook.
         let _ = self.notify_webhook(&processed);
 

From 066a7a3cde2695e7bd8c5d7460e1b500125d9647 Mon Sep 17 00:00:00 2001
From: Tamo
Date: Mon, 26 Feb 2024 10:43:04 +0100
Subject: [PATCH 2/2] takes only one read transaction per thread

---
 index-scheduler/src/batch.rs | 22 +++++++++++++-------
 index-scheduler/src/lib.rs   | 41 ++++++++++++++++++++-------------------
 2 files changed, 37 insertions(+), 26 deletions(-)

diff --git a/index-scheduler/src/batch.rs b/index-scheduler/src/batch.rs
index 8e2eb26a0..b7e31c136 100644
--- a/index-scheduler/src/batch.rs
+++ b/index-scheduler/src/batch.rs
@@ -142,22 +142,28 @@ pub(crate) enum IndexOperation {
 
 impl Batch {
     /// Return the task ids associated with this batch.
-    pub fn ids(&self) -> Vec<TaskId> {
+    pub fn ids(&self) -> RoaringBitmap {
         match self {
             Batch::TaskCancelation { task, .. }
             | Batch::Dump(task)
             | Batch::IndexCreation { task, .. }
-            | Batch::IndexUpdate { task, .. } => vec![task.uid],
+            | Batch::IndexUpdate { task, .. } => {
+                RoaringBitmap::from_sorted_iter(std::iter::once(task.uid)).unwrap()
+            }
             Batch::SnapshotCreation(tasks)
             | Batch::TaskDeletions(tasks)
-            | Batch::IndexDeletion { tasks, .. } => tasks.iter().map(|task| task.uid).collect(),
+            | Batch::IndexDeletion { tasks, .. } => {
+                RoaringBitmap::from_iter(tasks.iter().map(|task| task.uid))
+            }
             Batch::IndexOperation { op, .. } => match op {
                 IndexOperation::DocumentOperation { tasks, .. }
                 | IndexOperation::Settings { tasks, .. }
                 | IndexOperation::DocumentClear { tasks, .. } => {
-                    tasks.iter().map(|task| task.uid).collect()
+                    RoaringBitmap::from_iter(tasks.iter().map(|task| task.uid))
+                }
+                IndexOperation::IndexDocumentDeletionByFilter { task, .. } => {
+                    RoaringBitmap::from_sorted_iter(std::iter::once(task.uid)).unwrap()
                 }
-                IndexOperation::IndexDocumentDeletionByFilter { task, .. } => vec![task.uid],
                 IndexOperation::SettingsAndDocumentOperation {
                     document_import_tasks: tasks,
                     settings_tasks: other,
@@ -167,9 +173,11 @@ impl Batch {
                     cleared_tasks: tasks,
                     settings_tasks: other,
                     ..
-                } => tasks.iter().chain(other).map(|task| task.uid).collect(),
+                } => RoaringBitmap::from_iter(tasks.iter().chain(other).map(|task| task.uid)),
             },
-            Batch::IndexSwap { task } => vec![task.uid],
+            Batch::IndexSwap { task } => {
+                RoaringBitmap::from_sorted_iter(std::iter::once(task.uid)).unwrap()
+            }
         }
     }
 
diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs
index 535b5a36e..38a999ad7 100644
--- a/index-scheduler/src/lib.rs
+++ b/index-scheduler/src/lib.rs
@@ -37,8 +37,8 @@ use std::fs::File;
 use std::io::{self, BufReader, Read};
 use std::ops::{Bound, RangeBounds};
 use std::path::{Path, PathBuf};
-use std::sync::atomic::AtomicBool;
-use std::sync::atomic::Ordering::Relaxed;
+use std::sync::atomic::Ordering::{self, Relaxed};
+use std::sync::atomic::{AtomicBool, AtomicU32};
 use std::sync::{Arc, RwLock};
 use std::time::Duration;
 
@@ -60,6 +60,7 @@ use meilisearch_types::milli::{self, CboRoaringBitmapCodec, Index, RoaringBitmap
 use meilisearch_types::task_view::TaskView;
 use meilisearch_types::tasks::{Kind, KindWithContent, Status, Task};
 use puffin::FrameView;
+use rayon::current_num_threads;
 use rayon::prelude::{IntoParallelIterator, ParallelIterator};
 use roaring::RoaringBitmap;
 use synchronoise::SignalEvent;
@@ -1139,15 +1140,13 @@ impl IndexScheduler {
         drop(rtxn);
 
         // 1. store the starting date with the bitmap of processing tasks.
-        let mut ids = batch.ids();
-        ids.sort_unstable();
+        let ids = batch.ids();
         let processed_tasks = ids.len();
-        let processing_tasks = RoaringBitmap::from_sorted_iter(ids.iter().copied()).unwrap();
         let started_at = OffsetDateTime::now_utc();
 
         // We reset the must_stop flag to be sure that we don't stop processing tasks
         self.must_stop_processing.reset();
-        self.processing_tasks.write().unwrap().start_processing_at(started_at, processing_tasks);
+        self.processing_tasks.write().unwrap().start_processing_at(started_at, ids.clone());
 
         #[cfg(test)]
         self.breakpoint(Breakpoint::BatchCreated);
@@ -1243,7 +1242,7 @@ impl IndexScheduler {
                 let error: ResponseError = err.into();
                 for id in ids.iter() {
                     let mut task = self
-                        .get_task(&wtxn, *id)
+                        .get_task(&wtxn, id)
                         .map_err(|e| Error::TaskDatabaseUpdate(Box::new(e)))?
                         .ok_or(Error::CorruptedTaskQueue)?;
                     task.started_at = Some(started_at);
@@ -1273,17 +1272,21 @@ impl IndexScheduler {
         // Once the tasks are committed, we should delete all the associated update files ASAP to avoid leaking files in case of a restart.
         tracing::debug!("Deleting the update files");
 
-        ids.into_par_iter().try_for_each(|id| -> Result<()> {
+        // We take one read transaction **per thread**. Then, every thread pulls new ids out of the roaring bitmap with the help of a shared atomic index into the bitmap.
+        let idx = AtomicU32::new(0);
+        (0..current_num_threads()).into_par_iter().try_for_each(|_| -> Result<()> {
             let rtxn = self.read_txn()?;
-            let task = self
-                .get_task(&rtxn, id)
-                .map_err(|e| Error::TaskDatabaseUpdate(Box::new(e)))?
-                .ok_or(Error::CorruptedTaskQueue)?;
-            if let Err(e) = self.delete_persisted_task_data(&task) {
-                tracing::error!(
-                    "Failure to delete the content files associated with task {}. Error: {e}",
-                    task.uid
-                );
+            while let Some(id) = ids.select(idx.fetch_add(1, Ordering::Relaxed)) {
+                let task = self
+                    .get_task(&rtxn, id)
+                    .map_err(|e| Error::TaskDatabaseUpdate(Box::new(e)))?
+                    .ok_or(Error::CorruptedTaskQueue)?;
+                if let Err(e) = self.delete_persisted_task_data(&task) {
+                    tracing::error!(
+                        "Failure to delete the content files associated with task {}. Error: {e}",
+                        task.uid
+                    );
+                }
             }
             Ok(())
         })?;
@@ -1696,7 +1699,7 @@ pub enum TickOutcome {
     /// The scheduler should immediately attempt another `tick`.
     ///
-    /// The `usize` field contains the number of processed tasks.
-    TickAgain(usize),
+    /// The `u64` field contains the number of processed tasks.
+    TickAgain(u64),
     /// The scheduler should wait for an external signal before attempting another `tick`.
     WaitForSignal,
 }
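
The core of the first commit is an ordering change: update files are now deleted only after `wtxn.commit()` succeeds, so a failed commit can no longer leave tasks enqueued in the database while their update files are already gone. Below is a minimal, self-contained sketch of that ordering; the `FileStore`, `commit_tasks`, and temp directory are illustrative stand-ins, not the real scheduler API:

```rust
use std::fs::{self, File};
use std::io;
use std::path::PathBuf;

/// Illustrative stand-in for file-store's update-file directory.
struct FileStore {
    dir: PathBuf,
}

impl FileStore {
    fn delete(&self, uid: u32) -> io::Result<()> {
        fs::remove_file(self.dir.join(uid.to_string()))
    }
}

/// Stand-in for persisting the tasks' final state (`wtxn.commit()` in the scheduler).
fn commit_tasks(uids: &[u32]) {
    println!("committed tasks {uids:?}");
}

fn main() -> io::Result<()> {
    let dir = std::env::temp_dir().join("update-files-demo");
    fs::create_dir_all(&dir)?;
    let store = FileStore { dir: dir.clone() };

    // One fake update file per task.
    let uids = [1u32, 2, 3];
    for uid in uids {
        File::create(dir.join(uid.to_string()))?;
    }

    // 1. Make the new task state durable first.
    commit_tasks(&uids);

    // 2. Only then delete the update files: a crash between the two steps
    //    leaks a file at worst, instead of losing a file that a still-enqueued
    //    task would need again after a restart.
    for uid in uids {
        if let Err(e) = store.delete(uid) {
            eprintln!("Can't delete update file of task {uid}: {e}");
        }
    }
    Ok(())
}
```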
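The second commit's deletion loop spreads the committed task ids over rayon's thread pool while opening only one read transaction per thread: each worker claims the next rank in the `RoaringBitmap` through a shared `AtomicU32` and resolves it with `select`. Here is the pattern in isolation, runnable against the `rayon` and `roaring` crates, with the per-thread transaction and the actual file deletion reduced to comments:

```rust
use std::sync::atomic::{AtomicU32, Ordering};

use rayon::current_num_threads;
use rayon::prelude::*;
use roaring::RoaringBitmap;

fn main() {
    // Ids of the tasks whose update files must be deleted.
    let ids: RoaringBitmap = (0u32..10_000).collect();

    // Shared cursor: `fetch_add` hands every call a unique rank, so each id
    // is claimed by exactly one worker, and `select` returns `None` once the
    // bitmap is exhausted, ending that worker's loop.
    let cursor = AtomicU32::new(0);

    (0..current_num_threads()).into_par_iter().for_each(|_| {
        // In the scheduler, each worker opens its single read transaction here.
        while let Some(id) = ids.select(cursor.fetch_add(1, Ordering::Relaxed)) {
            // ... look up the task and delete its update file ...
            let _ = id;
        }
    });
}
```

`Ordering::Relaxed` is sufficient because the atomicity of `fetch_add` alone guarantees unique ranks; no other memory is synchronized through the counter.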
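One detail worth noting in the new `Batch::ids`: with roaring 0.10, `RoaringBitmap::from_sorted_iter` returns a `Result` because it requires strictly ascending input, which a single `task.uid` satisfies trivially (hence the `unwrap()`s), while the multi-task arms go through `FromIterator`, which accepts any order. A small sketch of the difference:

```rust
use roaring::RoaringBitmap;

fn main() {
    // A one-element iterator is always sorted, so this `unwrap` cannot fail.
    let single = RoaringBitmap::from_sorted_iter(std::iter::once(42u32)).unwrap();
    assert!(single.contains(42));

    // Task ids with no ordering guarantee go through `FromIterator` instead.
    let many: RoaringBitmap = [7u32, 3, 9].into_iter().collect();
    assert_eq!(many.len(), 3);
}
```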