Don't iterate all indexes manually

This commit is contained in:
Louis Dureuil 2023-02-20 16:42:54 +01:00
parent 5822764be9
commit 3db613ff77
No known key found for this signature in database
6 changed files with 80 additions and 27 deletions

View File

@ -788,15 +788,15 @@ impl IndexScheduler {
dump_tasks.flush()?; dump_tasks.flush()?;
// 3. Dump the indexes // 3. Dump the indexes
for (uid, index) in self.index_mapper.indexes(&rtxn)? { self.index_mapper.try_for_each_index(&rtxn, |uid, index| -> Result<()> {
let rtxn = index.read_txn()?; let rtxn = index.read_txn()?;
let metadata = IndexMetadata { let metadata = IndexMetadata {
uid: uid.clone(), uid: uid.to_owned(),
primary_key: index.primary_key(&rtxn)?.map(String::from), primary_key: index.primary_key(&rtxn)?.map(String::from),
created_at: index.created_at(&rtxn)?, created_at: index.created_at(&rtxn)?,
updated_at: index.updated_at(&rtxn)?, updated_at: index.updated_at(&rtxn)?,
}; };
let mut index_dumper = dump.create_index(&uid, &metadata)?; let mut index_dumper = dump.create_index(uid, &metadata)?;
let fields_ids_map = index.fields_ids_map(&rtxn)?; let fields_ids_map = index.fields_ids_map(&rtxn)?;
let all_fields: Vec<_> = fields_ids_map.iter().map(|(id, _)| id).collect(); let all_fields: Vec<_> = fields_ids_map.iter().map(|(id, _)| id).collect();
@ -809,9 +809,10 @@ impl IndexScheduler {
} }
// 3.2. Dump the settings // 3.2. Dump the settings
let settings = meilisearch_types::settings::settings(&index, &rtxn)?; let settings = meilisearch_types::settings::settings(index, &rtxn)?;
index_dumper.settings(&settings)?; index_dumper.settings(&settings)?;
} Ok(())
})?;
let dump_uid = started_at.format(format_description!( let dump_uid = started_at.format(format_description!(
"[year repr:full][month repr:numerical][day padding:zero]-[hour padding:zero][minute padding:zero][second padding:zero][subsecond digits:3]" "[year repr:full][month repr:numerical][day padding:zero]-[hour padding:zero][minute padding:zero][second padding:zero][subsecond digits:3]"

View File

@ -682,18 +682,38 @@ impl IndexMapper {
Ok(index) Ok(index)
} }
/// Return all indexes, may open them if they weren't already opened. /// Attempts `f` for each index that exists in the index mapper.
pub fn indexes(&self, rtxn: &RoTxn) -> Result<Vec<(String, Index)>> { ///
/// It is preferable to use this function rather than a loop that opens all indexes, as a way to avoid having all indexes opened,
/// which is unsupported in general.
///
/// Since `f` is allowed to return a result, and `Index` is cloneable, it is still possible to wrongly build e.g. a vector of
/// all the indexes, but this function makes it harder and so less likely to do accidentally.
pub fn try_for_each_index<U, V>(
&self,
rtxn: &RoTxn,
mut f: impl FnMut(&str, &Index) -> Result<U>,
) -> Result<V>
where
V: FromIterator<U>,
{
self.index_mapping self.index_mapping
.iter(rtxn)? .iter(rtxn)?
.map(|ret| { .map(|res| {
ret.map_err(Error::from).and_then(|(name, _)| { res.map_err(Error::from)
self.index(rtxn, name).map(|index| (name.to_string(), index)) .and_then(|(name, _)| self.index(rtxn, name).and_then(|index| f(name, &index)))
})
}) })
.collect() .collect()
} }
/// Return the name of all indexes without opening them.
pub fn index_names(&self, rtxn: &RoTxn) -> Result<Vec<String>> {
self.index_mapping
.iter(rtxn)?
.map(|res| res.map_err(Error::from).map(|(name, _)| name.to_string()))
.collect()
}
/// Swap two index names. /// Swap two index names.
pub fn swap(&self, wtxn: &mut RwTxn, lhs: &str, rhs: &str) -> Result<()> { pub fn swap(&self, wtxn: &mut RwTxn, lhs: &str, rhs: &str) -> Result<()> {
let lhs_uuid = self let lhs_uuid = self

View File

@ -254,6 +254,6 @@ pub fn snapshot_canceled_by(
snap snap
} }
pub fn snapshot_index_mapper(rtxn: &RoTxn, mapper: &IndexMapper) -> String { pub fn snapshot_index_mapper(rtxn: &RoTxn, mapper: &IndexMapper) -> String {
let names = mapper.indexes(rtxn).unwrap().into_iter().map(|(n, _)| n).collect::<Vec<_>>(); let names = mapper.index_names(rtxn).unwrap();
format!("{names:?}") format!("{names:?}")
} }

View File

@ -541,15 +541,42 @@ impl IndexScheduler {
/// ///
/// * If the index wasn't opened before, the index will be opened. /// * If the index wasn't opened before, the index will be opened.
/// * If the index doesn't exist on disk, the `IndexNotFoundError` is thrown. /// * If the index doesn't exist on disk, the `IndexNotFoundError` is thrown.
///
/// ### Note
///
/// As an `Index` requires a large swath of the virtual memory address space, correct usage of an `Index` does not
/// keep its handle for too long.
///
/// Some configurations also can't reasonably open multiple indexes at once.
/// If you need to fetch information from or perform an action on all indexes,
/// see the `try_for_each_index` function.
pub fn index(&self, name: &str) -> Result<Index> { pub fn index(&self, name: &str) -> Result<Index> {
let rtxn = self.env.read_txn()?; let rtxn = self.env.read_txn()?;
self.index_mapper.index(&rtxn, name) self.index_mapper.index(&rtxn, name)
} }
/// Return and open all the indexes. /// Return the name of all indexes without opening them.
pub fn indexes(&self) -> Result<Vec<(String, Index)>> { pub fn index_names(self) -> Result<Vec<String>> {
let rtxn = self.env.read_txn()?; let rtxn = self.env.read_txn()?;
self.index_mapper.indexes(&rtxn) self.index_mapper.index_names(&rtxn)
}
/// Attempts `f` for each index that exists known to the index scheduler.
///
/// It is preferable to use this function rather than a loop that opens all indexes, as a way to avoid having all indexes opened,
/// which is unsupported in general.
///
/// Since `f` is allowed to return a result, and `Index` is cloneable, it is still possible to wrongly build e.g. a vector of
/// all the indexes, but this function makes it harder and so less likely to do accidentally.
///
/// If many indexes exist, this operation can take time to complete (in the order of seconds for a 1000 of indexes) as it needs to open
/// all the indexes.
pub fn try_for_each_index<U, V>(&self, f: impl FnMut(&str, &Index) -> Result<U>) -> Result<V>
where
V: FromIterator<U>,
{
let rtxn = self.env.read_txn()?;
self.index_mapper.try_for_each_index(&rtxn, f)
} }
/// Return the task ids matched by the given query from the index scheduler's point of view. /// Return the task ids matched by the given query from the index scheduler's point of view.

View File

@ -61,6 +61,8 @@ pub struct IndexView {
impl IndexView { impl IndexView {
fn new(uid: String, index: &Index) -> Result<IndexView, milli::Error> { fn new(uid: String, index: &Index) -> Result<IndexView, milli::Error> {
// It is important that this function does not keep the Index handle or a clone of it, because
// `list_indexes` relies on this property to avoid opening all indexes at once.
let rtxn = index.read_txn()?; let rtxn = index.read_txn()?;
Ok(IndexView { Ok(IndexView {
uid, uid,
@ -90,13 +92,15 @@ pub async fn list_indexes(
paginate: AwebQueryParameter<ListIndexes, DeserrQueryParamError>, paginate: AwebQueryParameter<ListIndexes, DeserrQueryParamError>,
) -> Result<HttpResponse, ResponseError> { ) -> Result<HttpResponse, ResponseError> {
let filters = index_scheduler.filters(); let filters = index_scheduler.filters();
let indexes: Vec<_> = index_scheduler.indexes()?; let indexes: Vec<Option<IndexView>> =
let indexes = indexes index_scheduler.try_for_each_index(|uid, index| -> Result<Option<IndexView>, _> {
.into_iter() if !filters.is_index_authorized(uid) {
.filter(|(name, _)| filters.is_index_authorized(name)) return Ok(None);
.map(|(name, index)| IndexView::new(name, &index)) }
.collect::<Result<Vec<_>, _>>()?; Ok(Some(IndexView::new(uid.to_string(), index)?))
})?;
// Won't cause to open all indexes because IndexView doesn't keep the `Index` opened.
let indexes: Vec<IndexView> = indexes.into_iter().flatten().collect();
let ret = paginate.as_pagination().auto_paginate_sized(indexes.into_iter()); let ret = paginate.as_pagination().auto_paginate_sized(indexes.into_iter());
debug!("returns: {:?}", ret); debug!("returns: {:?}", ret);

View File

@ -261,9 +261,9 @@ pub fn create_all_stats(
)?; )?;
// accumulate the size of each indexes // accumulate the size of each indexes
let processing_index = processing_task.first().and_then(|task| task.index_uid()); let processing_index = processing_task.first().and_then(|task| task.index_uid());
for (name, index) in index_scheduler.indexes()? { index_scheduler.try_for_each_index(|name, index| {
if !filters.is_index_authorized(&name) { if !filters.is_index_authorized(name) {
continue; return Ok(());
} }
database_size += index.on_disk_size()?; database_size += index.on_disk_size()?;
@ -278,8 +278,9 @@ pub fn create_all_stats(
let updated_at = index.updated_at(&rtxn)?; let updated_at = index.updated_at(&rtxn)?;
last_task = last_task.map_or(Some(updated_at), |last| Some(last.max(updated_at))); last_task = last_task.map_or(Some(updated_at), |last| Some(last.max(updated_at)));
indexes.insert(name, stats); indexes.insert(name.to_string(), stats);
} Ok(())
})?;
database_size += index_scheduler.size()?; database_size += index_scheduler.size()?;
database_size += auth_controller.size()?; database_size += auth_controller.size()?;