mirror of
https://github.com/meilisearch/meilisearch.git
synced 2024-11-22 18:17:39 +08:00
Merge #4205
4205: Prevent search hang on the processing index r=Kerollmops a=dureuill Fixes #4206, an issue originally [reported on Discord](https://discord.com/channels/1006923006964154428/1148983671026618579/1148983671026618579) where having parallel search requests on more indexes than the index cache capacity would cause search requests on the currently updating index to hang until the index is done updating. ## Test setup - Create 20 empty indexes by sending settings to them - repeatedly send placeholder search requests to each of the indexes in a loop - Create another index and send a significant batch of documents to index. - Attempt to perform a search request on that last index. - Before this PR, the search request hangs while the index update task is processing - After this PR, the search request respond immediately even while the index update task is processing ## Changes - When getting the handle to an index for some potentially long running batches of tasks, save it in the index scheduler. - Drop the handle from the index-scheduler when the task is done so that we don't leak indexes. - When getting an index from outside the task queue processor, check if there is such an handle matching the requested index. If so, skip the cache entirely and clone the handle. Co-authored-by: Louis Dureuil <louis.dureuil@xinra.net> Co-authored-by: Louis Dureuil <louis@meilisearch.com>
This commit is contained in:
commit
b11f85a635
@ -923,6 +923,10 @@ impl IndexScheduler {
|
|||||||
self.index_mapper.index(&rtxn, &index_uid)?
|
self.index_mapper.index(&rtxn, &index_uid)?
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// the index operation can take a long time, so save this handle to make it available to the search for the duration of the tick
|
||||||
|
*self.currently_updating_index.write().unwrap() =
|
||||||
|
Some((index_uid.clone(), index.clone()));
|
||||||
|
|
||||||
let mut index_wtxn = index.write_txn()?;
|
let mut index_wtxn = index.write_txn()?;
|
||||||
let tasks = self.apply_index_operation(&mut index_wtxn, &index, op)?;
|
let tasks = self.apply_index_operation(&mut index_wtxn, &index, op)?;
|
||||||
index_wtxn.commit()?;
|
index_wtxn.commit()?;
|
||||||
|
@ -39,6 +39,7 @@ pub fn snapshot_index_scheduler(scheduler: &IndexScheduler) -> String {
|
|||||||
test_breakpoint_sdr: _,
|
test_breakpoint_sdr: _,
|
||||||
planned_failures: _,
|
planned_failures: _,
|
||||||
run_loop_iteration: _,
|
run_loop_iteration: _,
|
||||||
|
currently_updating_index: _,
|
||||||
} = scheduler;
|
} = scheduler;
|
||||||
|
|
||||||
let rtxn = env.read_txn().unwrap();
|
let rtxn = env.read_txn().unwrap();
|
||||||
|
@ -331,6 +331,10 @@ pub struct IndexScheduler {
|
|||||||
/// The path to the version file of Meilisearch.
|
/// The path to the version file of Meilisearch.
|
||||||
pub(crate) version_file_path: PathBuf,
|
pub(crate) version_file_path: PathBuf,
|
||||||
|
|
||||||
|
/// A few types of long running batches of tasks that act on a single index set this field
|
||||||
|
/// so that a handle to the index is available from other threads (search) in an optimized manner.
|
||||||
|
currently_updating_index: Arc<RwLock<Option<(String, Index)>>>,
|
||||||
|
|
||||||
// ================= test
|
// ================= test
|
||||||
// The next entry is dedicated to the tests.
|
// The next entry is dedicated to the tests.
|
||||||
/// Provide a way to set a breakpoint in multiple part of the scheduler.
|
/// Provide a way to set a breakpoint in multiple part of the scheduler.
|
||||||
@ -374,6 +378,7 @@ impl IndexScheduler {
|
|||||||
dumps_path: self.dumps_path.clone(),
|
dumps_path: self.dumps_path.clone(),
|
||||||
auth_path: self.auth_path.clone(),
|
auth_path: self.auth_path.clone(),
|
||||||
version_file_path: self.version_file_path.clone(),
|
version_file_path: self.version_file_path.clone(),
|
||||||
|
currently_updating_index: self.currently_updating_index.clone(),
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
test_breakpoint_sdr: self.test_breakpoint_sdr.clone(),
|
test_breakpoint_sdr: self.test_breakpoint_sdr.clone(),
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
@ -470,6 +475,7 @@ impl IndexScheduler {
|
|||||||
snapshots_path: options.snapshots_path,
|
snapshots_path: options.snapshots_path,
|
||||||
auth_path: options.auth_path,
|
auth_path: options.auth_path,
|
||||||
version_file_path: options.version_file_path,
|
version_file_path: options.version_file_path,
|
||||||
|
currently_updating_index: Arc::new(RwLock::new(None)),
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
test_breakpoint_sdr,
|
test_breakpoint_sdr,
|
||||||
@ -652,6 +658,13 @@ impl IndexScheduler {
|
|||||||
/// If you need to fetch information from or perform an action on all indexes,
|
/// If you need to fetch information from or perform an action on all indexes,
|
||||||
/// see the `try_for_each_index` function.
|
/// see the `try_for_each_index` function.
|
||||||
pub fn index(&self, name: &str) -> Result<Index> {
|
pub fn index(&self, name: &str) -> Result<Index> {
|
||||||
|
if let Some((current_name, current_index)) =
|
||||||
|
self.currently_updating_index.read().unwrap().as_ref()
|
||||||
|
{
|
||||||
|
if current_name == name {
|
||||||
|
return Ok(current_index.clone());
|
||||||
|
}
|
||||||
|
}
|
||||||
let rtxn = self.env.read_txn()?;
|
let rtxn = self.env.read_txn()?;
|
||||||
self.index_mapper.index(&rtxn, name)
|
self.index_mapper.index(&rtxn, name)
|
||||||
}
|
}
|
||||||
@ -1133,6 +1146,9 @@ impl IndexScheduler {
|
|||||||
handle.join().unwrap_or(Err(Error::ProcessBatchPanicked))
|
handle.join().unwrap_or(Err(Error::ProcessBatchPanicked))
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// Reset the currently updating index to relinquish the index handle
|
||||||
|
*self.currently_updating_index.write().unwrap() = None;
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
self.maybe_fail(tests::FailureLocation::AcquiringWtxn)?;
|
self.maybe_fail(tests::FailureLocation::AcquiringWtxn)?;
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user