From a439fa3e1adab396074bd6387f16b081c50499ef Mon Sep 17 00:00:00 2001 From: Tamo Date: Mon, 2 Dec 2024 12:02:16 +0100 Subject: [PATCH] While spamming the batches route we could see a processing batch becoming missing and then finished, this commit ensures the batches goes from processing to finished directly --- crates/index-scheduler/src/lib.rs | 9 +++++---- crates/meilisearch/tests/batches/mod.rs | 2 +- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/crates/index-scheduler/src/lib.rs b/crates/index-scheduler/src/lib.rs index cef24c1ea..f2510f1f9 100644 --- a/crates/index-scheduler/src/lib.rs +++ b/crates/index-scheduler/src/lib.rs @@ -1738,11 +1738,8 @@ impl IndexScheduler { } } - self.processing_tasks.write().unwrap().stop_processing(); // We must re-add the canceled task so they're part of the same batch. - // processed.processing |= canceled; ids |= canceled; - self.write_batch(&mut wtxn, processing_batch, &ids)?; #[cfg(test)] @@ -1750,8 +1747,12 @@ impl IndexScheduler { wtxn.commit().map_err(Error::HeedTransaction)?; + // We should stop processing AFTER everything is processed and written to disk otherwise, a batch (which only lives in RAM) may appear in the processing task + // and then become « not found » for some time until the commit everything is written and the final commit is made. + self.processing_tasks.write().unwrap().stop_processing(); + // Once the tasks are committed, we should delete all the update files associated ASAP to avoid leaking files in case of a restart - tracing::debug!("Deleting the update files"); + // tracing::debug!("Deleting the update files"); //We take one read transaction **per thread**. Then, every thread is going to pull out new IDs from the roaring bitmap with the help of an atomic shared index into the bitmap let idx = AtomicU32::new(0); diff --git a/crates/meilisearch/tests/batches/mod.rs b/crates/meilisearch/tests/batches/mod.rs index 799aa3df7..9c869c140 100644 --- a/crates/meilisearch/tests/batches/mod.rs +++ b/crates/meilisearch/tests/batches/mod.rs @@ -224,7 +224,7 @@ async fn list_batches_status_and_type_filtered() { } #[actix_rt::test] -async fn get_batch_filter_error() { +async fn list_batch_filter_error() { let server = Server::new().await; let (response, code) = server.batches_filter("lol=pied").await;