From a2d0c73b411844b49f37b6f9a9cbc02f26765c5e Mon Sep 17 00:00:00 2001 From: Louis Dureuil Date: Fri, 10 Nov 2023 10:50:19 +0100 Subject: [PATCH] Save the currently updating index so that the search can access it at all times --- index-scheduler/src/batch.rs | 6 ++++++ index-scheduler/src/insta_snapshot.rs | 1 + index-scheduler/src/lib.rs | 14 ++++++++++++++ 3 files changed, 21 insertions(+) diff --git a/index-scheduler/src/batch.rs b/index-scheduler/src/batch.rs index 3e2cc4281..264afadae 100644 --- a/index-scheduler/src/batch.rs +++ b/index-scheduler/src/batch.rs @@ -923,6 +923,9 @@ impl IndexScheduler { self.index_mapper.index(&rtxn, &index_uid)? }; + // the index operation can take a long time, so save this handle to make it available to the search for the duration of the tick + *self.currently_updating_index.write().unwrap() = Some((index_uid.clone(), index.clone())); + let mut index_wtxn = index.write_txn()?; let tasks = self.apply_index_operation(&mut index_wtxn, &index, op)?; index_wtxn.commit()?; @@ -959,6 +962,9 @@ impl IndexScheduler { Batch::IndexUpdate { index_uid, primary_key, mut task } => { let rtxn = self.env.read_txn()?; let index = self.index_mapper.index(&rtxn, &index_uid)?; + // the index update can take a long time, so save this handle to make it available to the search for the duration of the tick + *self.currently_updating_index.write().unwrap() = Some((index_uid.clone(), index.clone())); + if let Some(primary_key) = primary_key.clone() { let mut index_wtxn = index.write_txn()?; diff --git a/index-scheduler/src/insta_snapshot.rs b/index-scheduler/src/insta_snapshot.rs index f820ce99d..6096bad38 100644 --- a/index-scheduler/src/insta_snapshot.rs +++ b/index-scheduler/src/insta_snapshot.rs @@ -39,6 +39,7 @@ pub fn snapshot_index_scheduler(scheduler: &IndexScheduler) -> String { test_breakpoint_sdr: _, planned_failures: _, run_loop_iteration: _, + currently_updating_index: _, } = scheduler; let rtxn = env.read_txn().unwrap(); diff --git 
a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs index 3c61880bb..0194bdb9d 100644 --- a/index-scheduler/src/lib.rs +++ b/index-scheduler/src/lib.rs @@ -331,6 +331,10 @@ pub struct IndexScheduler { /// The path to the version file of Meilisearch. pub(crate) version_file_path: PathBuf, + /// A few types of long running batches of tasks that act on a single index set this field + /// so that a handle to the index is available from other threads (search) in an optimized manner. + currently_updating_index: Arc<RwLock<Option<(String, Index)>>>, + // ================= test // The next entry is dedicated to the tests. /// Provide a way to set a breakpoint in multiple part of the scheduler. @@ -374,6 +378,7 @@ impl IndexScheduler { dumps_path: self.dumps_path.clone(), auth_path: self.auth_path.clone(), version_file_path: self.version_file_path.clone(), + currently_updating_index: self.currently_updating_index.clone(), #[cfg(test)] test_breakpoint_sdr: self.test_breakpoint_sdr.clone(), #[cfg(test)] @@ -470,6 +475,7 @@ impl IndexScheduler { snapshots_path: options.snapshots_path, auth_path: options.auth_path, version_file_path: options.version_file_path, + currently_updating_index: Arc::new(RwLock::new(None)), #[cfg(test)] test_breakpoint_sdr, @@ -652,6 +658,11 @@ impl IndexScheduler { /// If you need to fetch information from or perform an action on all indexes, /// see the `try_for_each_index` function. pub fn index(&self, name: &str) -> Result<Index> { + if let Some((current_name, current_index)) = self.currently_updating_index.read().unwrap().as_ref() { + if current_name == name { + return Ok(current_index.clone()) + } + } let rtxn = self.env.read_txn()?; self.index_mapper.index(&rtxn, name) } @@ -1133,6 +1144,9 @@ impl IndexScheduler { handle.join().unwrap_or(Err(Error::ProcessBatchPanicked)) }; + // Reset the currently updating index to relinquish the index handle + *self.currently_updating_index.write().unwrap() = None; + #[cfg(test)] self.maybe_fail(tests::FailureLocation::AcquiringWtxn)?;