diff --git a/index-scheduler/src/batch.rs b/index-scheduler/src/batch.rs index f6ed8392c..97dc7a2bd 100644 --- a/index-scheduler/src/batch.rs +++ b/index-scheduler/src/batch.rs @@ -936,8 +936,8 @@ impl IndexScheduler { }; // the index operation can take a long time, so save this handle to make it available to the search for the duration of the tick - *self.currently_updating_index.write().unwrap() = - Some((index_uid.clone(), index.clone())); + self.index_mapper + .set_currently_updating_index(Some((index_uid.clone(), index.clone()))); let mut index_wtxn = index.write_txn()?; let tasks = self.apply_index_operation(&mut index_wtxn, &index, op)?; diff --git a/index-scheduler/src/index_mapper/mod.rs b/index-scheduler/src/index_mapper/mod.rs index 18aed42b0..58ec2bf11 100644 --- a/index-scheduler/src/index_mapper/mod.rs +++ b/index-scheduler/src/index_mapper/mod.rs @@ -69,6 +69,10 @@ pub struct IndexMapper { /// Whether we open a meilisearch index with the MDB_WRITEMAP option or not. enable_mdb_writemap: bool, pub indexer_config: Arc, + + /// A few types of long running batches of tasks that act on a single index set this field + /// so that a handle to the index is available from other threads (search) in an optimized manner. + currently_updating_index: Arc>>, } /// Whether the index is available for use or is forbidden to be inserted back in the index map @@ -151,6 +155,7 @@ impl IndexMapper { index_growth_amount, enable_mdb_writemap, indexer_config: Arc::new(indexer_config), + currently_updating_index: Default::default(), }) } @@ -303,6 +308,14 @@ impl IndexMapper { /// Return an index, may open it if it wasn't already opened. pub fn index(&self, rtxn: &RoTxn, name: &str) -> Result { + if let Some((current_name, current_index)) = + self.currently_updating_index.read().unwrap().as_ref() + { + if current_name == name { + return Ok(current_index.clone()); + } + } + let uuid = self .index_mapping .get(rtxn, name)? @@ -474,4 +487,8 @@ impl IndexMapper { pub fn indexer_config(&self) -> &IndexerConfig { &self.indexer_config } + + pub fn set_currently_updating_index(&self, index: Option<(String, Index)>) { + *self.currently_updating_index.write().unwrap() = index; + } } diff --git a/index-scheduler/src/insta_snapshot.rs b/index-scheduler/src/insta_snapshot.rs index 0adda43ff..42f041578 100644 --- a/index-scheduler/src/insta_snapshot.rs +++ b/index-scheduler/src/insta_snapshot.rs @@ -42,7 +42,6 @@ pub fn snapshot_index_scheduler(scheduler: &IndexScheduler) -> String { test_breakpoint_sdr: _, planned_failures: _, run_loop_iteration: _, - currently_updating_index: _, embedders: _, } = scheduler; diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs index 296f8add1..a5b0cb5b0 100644 --- a/index-scheduler/src/lib.rs +++ b/index-scheduler/src/lib.rs @@ -351,10 +351,6 @@ pub struct IndexScheduler { /// The path to the version file of Meilisearch. pub(crate) version_file_path: PathBuf, - /// A few types of long running batches of tasks that act on a single index set this field - /// so that a handle to the index is available from other threads (search) in an optimized manner. - currently_updating_index: Arc>>, - embedders: Arc>>>, // ================= test @@ -403,7 +399,6 @@ impl IndexScheduler { version_file_path: self.version_file_path.clone(), webhook_url: self.webhook_url.clone(), webhook_authorization_header: self.webhook_authorization_header.clone(), - currently_updating_index: self.currently_updating_index.clone(), embedders: self.embedders.clone(), #[cfg(test)] test_breakpoint_sdr: self.test_breakpoint_sdr.clone(), @@ -504,7 +499,6 @@ impl IndexScheduler { version_file_path: options.version_file_path, webhook_url: options.webhook_url, webhook_authorization_header: options.webhook_authorization_header, - currently_updating_index: Arc::new(RwLock::new(None)), embedders: Default::default(), #[cfg(test)] @@ -688,13 +682,6 @@ impl IndexScheduler { /// If you need to fetch information from or perform an action on all indexes, /// see the `try_for_each_index` function. pub fn index(&self, name: &str) -> Result { - if let Some((current_name, current_index)) = - self.currently_updating_index.read().unwrap().as_ref() - { - if current_name == name { - return Ok(current_index.clone()); - } - } let rtxn = self.env.read_txn()?; self.index_mapper.index(&rtxn, name) } @@ -1175,7 +1162,7 @@ impl IndexScheduler { }; // Reset the currently updating index to relinquish the index handle - *self.currently_updating_index.write().unwrap() = None; + self.index_mapper.set_currently_updating_index(None); #[cfg(test)] self.maybe_fail(tests::FailureLocation::AcquiringWtxn)?;