Move currently_updating_index to IndexMapper

This commit is contained in:
Louis Dureuil 2024-01-09 15:37:27 +01:00
parent 5ee1378856
commit 97bb1ff9e2
No known key found for this signature in database
4 changed files with 20 additions and 17 deletions

View File

@ -936,8 +936,8 @@ impl IndexScheduler {
}; };
// the index operation can take a long time, so save this handle to make it available to the search for the duration of the tick // the index operation can take a long time, so save this handle to make it available to the search for the duration of the tick
*self.currently_updating_index.write().unwrap() = self.index_mapper
Some((index_uid.clone(), index.clone())); .set_currently_updating_index(Some((index_uid.clone(), index.clone())));
let mut index_wtxn = index.write_txn()?; let mut index_wtxn = index.write_txn()?;
let tasks = self.apply_index_operation(&mut index_wtxn, &index, op)?; let tasks = self.apply_index_operation(&mut index_wtxn, &index, op)?;

View File

@ -69,6 +69,10 @@ pub struct IndexMapper {
/// Whether we open a meilisearch index with the MDB_WRITEMAP option or not. /// Whether we open a meilisearch index with the MDB_WRITEMAP option or not.
enable_mdb_writemap: bool, enable_mdb_writemap: bool,
pub indexer_config: Arc<IndexerConfig>, pub indexer_config: Arc<IndexerConfig>,
/// A few types of long running batches of tasks that act on a single index set this field
/// so that a handle to the index is available from other threads (search) in an optimized manner.
currently_updating_index: Arc<RwLock<Option<(String, Index)>>>,
} }
/// Whether the index is available for use or is forbidden to be inserted back in the index map /// Whether the index is available for use or is forbidden to be inserted back in the index map
@ -151,6 +155,7 @@ impl IndexMapper {
index_growth_amount, index_growth_amount,
enable_mdb_writemap, enable_mdb_writemap,
indexer_config: Arc::new(indexer_config), indexer_config: Arc::new(indexer_config),
currently_updating_index: Default::default(),
}) })
} }
@ -303,6 +308,14 @@ impl IndexMapper {
/// Return an index, may open it if it wasn't already opened. /// Return an index, may open it if it wasn't already opened.
pub fn index(&self, rtxn: &RoTxn, name: &str) -> Result<Index> { pub fn index(&self, rtxn: &RoTxn, name: &str) -> Result<Index> {
if let Some((current_name, current_index)) =
self.currently_updating_index.read().unwrap().as_ref()
{
if current_name == name {
return Ok(current_index.clone());
}
}
let uuid = self let uuid = self
.index_mapping .index_mapping
.get(rtxn, name)? .get(rtxn, name)?
@ -474,4 +487,8 @@ impl IndexMapper {
pub fn indexer_config(&self) -> &IndexerConfig { pub fn indexer_config(&self) -> &IndexerConfig {
&self.indexer_config &self.indexer_config
} }
pub fn set_currently_updating_index(&self, index: Option<(String, Index)>) {
*self.currently_updating_index.write().unwrap() = index;
}
} }

View File

@ -42,7 +42,6 @@ pub fn snapshot_index_scheduler(scheduler: &IndexScheduler) -> String {
test_breakpoint_sdr: _, test_breakpoint_sdr: _,
planned_failures: _, planned_failures: _,
run_loop_iteration: _, run_loop_iteration: _,
currently_updating_index: _,
embedders: _, embedders: _,
} = scheduler; } = scheduler;

View File

@ -351,10 +351,6 @@ pub struct IndexScheduler {
/// The path to the version file of Meilisearch. /// The path to the version file of Meilisearch.
pub(crate) version_file_path: PathBuf, pub(crate) version_file_path: PathBuf,
/// A few types of long running batches of tasks that act on a single index set this field
/// so that a handle to the index is available from other threads (search) in an optimized manner.
currently_updating_index: Arc<RwLock<Option<(String, Index)>>>,
embedders: Arc<RwLock<HashMap<EmbedderOptions, Arc<Embedder>>>>, embedders: Arc<RwLock<HashMap<EmbedderOptions, Arc<Embedder>>>>,
// ================= test // ================= test
@ -403,7 +399,6 @@ impl IndexScheduler {
version_file_path: self.version_file_path.clone(), version_file_path: self.version_file_path.clone(),
webhook_url: self.webhook_url.clone(), webhook_url: self.webhook_url.clone(),
webhook_authorization_header: self.webhook_authorization_header.clone(), webhook_authorization_header: self.webhook_authorization_header.clone(),
currently_updating_index: self.currently_updating_index.clone(),
embedders: self.embedders.clone(), embedders: self.embedders.clone(),
#[cfg(test)] #[cfg(test)]
test_breakpoint_sdr: self.test_breakpoint_sdr.clone(), test_breakpoint_sdr: self.test_breakpoint_sdr.clone(),
@ -504,7 +499,6 @@ impl IndexScheduler {
version_file_path: options.version_file_path, version_file_path: options.version_file_path,
webhook_url: options.webhook_url, webhook_url: options.webhook_url,
webhook_authorization_header: options.webhook_authorization_header, webhook_authorization_header: options.webhook_authorization_header,
currently_updating_index: Arc::new(RwLock::new(None)),
embedders: Default::default(), embedders: Default::default(),
#[cfg(test)] #[cfg(test)]
@ -688,13 +682,6 @@ impl IndexScheduler {
/// If you need to fetch information from or perform an action on all indexes, /// If you need to fetch information from or perform an action on all indexes,
/// see the `try_for_each_index` function. /// see the `try_for_each_index` function.
pub fn index(&self, name: &str) -> Result<Index> { pub fn index(&self, name: &str) -> Result<Index> {
if let Some((current_name, current_index)) =
self.currently_updating_index.read().unwrap().as_ref()
{
if current_name == name {
return Ok(current_index.clone());
}
}
let rtxn = self.env.read_txn()?; let rtxn = self.env.read_txn()?;
self.index_mapper.index(&rtxn, name) self.index_mapper.index(&rtxn, name)
} }
@ -1175,7 +1162,7 @@ impl IndexScheduler {
}; };
// Reset the currently updating index to relinquish the index handle // Reset the currently updating index to relinquish the index handle
*self.currently_updating_index.write().unwrap() = None; self.index_mapper.set_currently_updating_index(None);
#[cfg(test)] #[cfg(test)]
self.maybe_fail(tests::FailureLocation::AcquiringWtxn)?; self.maybe_fail(tests::FailureLocation::AcquiringWtxn)?;