From a2d0c73b411844b49f37b6f9a9cbc02f26765c5e Mon Sep 17 00:00:00 2001 From: Louis Dureuil Date: Fri, 10 Nov 2023 10:50:19 +0100 Subject: [PATCH 1/3] Save the currently updating index so that the search can access it at all times --- index-scheduler/src/batch.rs | 6 ++++++ index-scheduler/src/insta_snapshot.rs | 1 + index-scheduler/src/lib.rs | 14 ++++++++++++++ 3 files changed, 21 insertions(+) diff --git a/index-scheduler/src/batch.rs b/index-scheduler/src/batch.rs index 3e2cc4281..264afadae 100644 --- a/index-scheduler/src/batch.rs +++ b/index-scheduler/src/batch.rs @@ -923,6 +923,9 @@ impl IndexScheduler { self.index_mapper.index(&rtxn, &index_uid)? }; + // the index operation can take a long time, so save this handle to make it available tothe search for the duration of the tick + *self.currently_updating_index.write().unwrap() = Some((index_uid.clone(), index.clone())); + let mut index_wtxn = index.write_txn()?; let tasks = self.apply_index_operation(&mut index_wtxn, &index, op)?; index_wtxn.commit()?; @@ -959,6 +962,9 @@ impl IndexScheduler { Batch::IndexUpdate { index_uid, primary_key, mut task } => { let rtxn = self.env.read_txn()?; let index = self.index_mapper.index(&rtxn, &index_uid)?; + // the index update can take a long time, so save this handle to make it available tothe search for the duration of the tick + *self.currently_updating_index.write().unwrap() = Some((index_uid.clone(), index.clone())); + if let Some(primary_key) = primary_key.clone() { let mut index_wtxn = index.write_txn()?; diff --git a/index-scheduler/src/insta_snapshot.rs b/index-scheduler/src/insta_snapshot.rs index f820ce99d..6096bad38 100644 --- a/index-scheduler/src/insta_snapshot.rs +++ b/index-scheduler/src/insta_snapshot.rs @@ -39,6 +39,7 @@ pub fn snapshot_index_scheduler(scheduler: &IndexScheduler) -> String { test_breakpoint_sdr: _, planned_failures: _, run_loop_iteration: _, + currently_updating_index: _, } = scheduler; let rtxn = env.read_txn().unwrap(); diff --git 
a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs index 3c61880bb..0194bdb9d 100644 --- a/index-scheduler/src/lib.rs +++ b/index-scheduler/src/lib.rs @@ -331,6 +331,10 @@ pub struct IndexScheduler { /// The path to the version file of Meilisearch. pub(crate) version_file_path: PathBuf, + /// A few types of long running batches of tasks that act on a single index set this field + /// so that a handle to the index is available from other threads (search) in an optimized manner. + currently_updating_index: Arc<RwLock<Option<(String, Index)>>>, + // ================= test // The next entry is dedicated to the tests. /// Provide a way to set a breakpoint in multiple part of the scheduler. @@ -374,6 +378,7 @@ impl IndexScheduler { dumps_path: self.dumps_path.clone(), auth_path: self.auth_path.clone(), version_file_path: self.version_file_path.clone(), + currently_updating_index: self.currently_updating_index.clone(), #[cfg(test)] test_breakpoint_sdr: self.test_breakpoint_sdr.clone(), #[cfg(test)] @@ -470,6 +475,7 @@ impl IndexScheduler { snapshots_path: options.snapshots_path, auth_path: options.auth_path, version_file_path: options.version_file_path, + currently_updating_index: Arc::new(RwLock::new(None)), #[cfg(test)] test_breakpoint_sdr, @@ -652,6 +658,11 @@ impl IndexScheduler { /// If you need to fetch information from or perform an action on all indexes, /// see the `try_for_each_index` function. 
pub fn index(&self, name: &str) -> Result<Index> { + if let Some((current_name, current_index)) = self.currently_updating_index.read().unwrap().as_ref() { + if current_name == name { + return Ok(current_index.clone()) + } + } let rtxn = self.env.read_txn()?; self.index_mapper.index(&rtxn, name) } @@ -1133,6 +1144,9 @@ impl IndexScheduler { handle.join().unwrap_or(Err(Error::ProcessBatchPanicked)) }; + // Reset the currently updating index to relinquish the index handle + *self.currently_updating_index.write().unwrap() = None; + #[cfg(test)] self.maybe_fail(tests::FailureLocation::AcquiringWtxn)?; From 492fc086f0a37fffb7d3db34880820c46115434f Mon Sep 17 00:00:00 2001 From: Louis Dureuil Date: Sun, 12 Nov 2023 21:53:11 +0100 Subject: [PATCH 2/3] cargo fmt --- index-scheduler/src/batch.rs | 7 ++++--- index-scheduler/src/lib.rs | 6 ++++-- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/index-scheduler/src/batch.rs b/index-scheduler/src/batch.rs index 264afadae..96cb85562 100644 --- a/index-scheduler/src/batch.rs +++ b/index-scheduler/src/batch.rs @@ -924,7 +924,8 @@ impl IndexScheduler { }; // the index operation can take a long time, so save this handle to make it available tothe search for the duration of the tick - *self.currently_updating_index.write().unwrap() = Some((index_uid.clone(), index.clone())); + *self.currently_updating_index.write().unwrap() = + Some((index_uid.clone(), index.clone())); let mut index_wtxn = index.write_txn()?; let tasks = self.apply_index_operation(&mut index_wtxn, &index, op)?; @@ -963,8 +964,8 @@ impl IndexScheduler { let rtxn = self.env.read_txn()?; let index = self.index_mapper.index(&rtxn, &index_uid)?; // the index update can take a long time, so save this handle to make it available tothe search for the duration of the tick - *self.currently_updating_index.write().unwrap() = Some((index_uid.clone(), index.clone())); - + *self.currently_updating_index.write().unwrap() = + Some((index_uid.clone(), index.clone())); if let 
Some(primary_key) = primary_key.clone() { let mut index_wtxn = index.write_txn()?; diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs index 0194bdb9d..95902aa15 100644 --- a/index-scheduler/src/lib.rs +++ b/index-scheduler/src/lib.rs @@ -658,9 +658,11 @@ impl IndexScheduler { /// If you need to fetch information from or perform an action on all indexes, /// see the `try_for_each_index` function. pub fn index(&self, name: &str) -> Result<Index> { - if let Some((current_name, current_index)) = self.currently_updating_index.read().unwrap().as_ref() { + if let Some((current_name, current_index)) = + self.currently_updating_index.read().unwrap().as_ref() + { if current_name == name { - return Ok(current_index.clone()) + return Ok(current_index.clone()); } } let rtxn = self.env.read_txn()?; From a2d6dc857112ff10bc9f72f770d36f0a745ccd3e Mon Sep 17 00:00:00 2001 From: Louis Dureuil Date: Mon, 13 Nov 2023 10:44:36 +0100 Subject: [PATCH 3/3] Fix typo, remove caching for the change of index --- index-scheduler/src/batch.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/index-scheduler/src/batch.rs b/index-scheduler/src/batch.rs index 96cb85562..aa93cda2a 100644 --- a/index-scheduler/src/batch.rs +++ b/index-scheduler/src/batch.rs @@ -923,7 +923,7 @@ impl IndexScheduler { self.index_mapper.index(&rtxn, &index_uid)? 
}; - // the index operation can take a long time, so save this handle to make it available tothe search for the duration of the tick + // the index operation can take a long time, so save this handle to make it available to the search for the duration of the tick *self.currently_updating_index.write().unwrap() = Some((index_uid.clone(), index.clone())); @@ -963,9 +963,6 @@ impl IndexScheduler { Batch::IndexUpdate { index_uid, primary_key, mut task } => { let rtxn = self.env.read_txn()?; let index = self.index_mapper.index(&rtxn, &index_uid)?; - // the index update can take a long time, so save this handle to make it available tothe search for the duration of the tick - *self.currently_updating_index.write().unwrap() = - Some((index_uid.clone(), index.clone())); if let Some(primary_key) = primary_key.clone() { let mut index_wtxn = index.write_txn()?;