mirror of
https://github.com/meilisearch/meilisearch.git
synced 2025-01-18 08:48:32 +08:00
Merge #3331
3331: Limit the number of concurrently opened indexes r=dureuill a=dureuill # Pull Request ## Related issue Relevant to #1841, fixes #3382 ## What does this PR do? ### User standpoint - Limit the number of concurrently opened indexes (currently, the number of indexes that can be concurrently opened is computed at startup) - When too many an index is opened, the least recently used one is closed and its virtual memory released. - This allows a user to have an arbitrary number of indexes of an arbitrary size ### Implementation standpoint - Added a LRU cache map in `index-scheduler::lru`. A more complete implementation (eg with helper functions not used here) is available but would better fit a dedicated crate. - Use the LRU cache map in the `IndexScheduler`. To simplify the lifecycle of indexes, they are never removed from the cache when they are in the middle of a resize or delete operation. To achieve this, an intermediate `Vec` stores the UUIDs of the indexes that are in the middle of such an operation. - Upon creating the index scheduler object, compute the total virtual memory that is adressable by using a dichotomic search on the max size of an index. Use this as a base to compute the number of indexes that can be open with 2TiB per index. If the virtual memory address space is lower than 2TiB, then only allow for 1 index of a fraction of that size. Co-authored-by: Louis Dureuil <louis@meilisearch.com>
This commit is contained in:
commit
ca25904c26
@ -788,15 +788,15 @@ impl IndexScheduler {
|
||||
dump_tasks.flush()?;
|
||||
|
||||
// 3. Dump the indexes
|
||||
for (uid, index) in self.index_mapper.indexes(&rtxn)? {
|
||||
self.index_mapper.try_for_each_index(&rtxn, |uid, index| -> Result<()> {
|
||||
let rtxn = index.read_txn()?;
|
||||
let metadata = IndexMetadata {
|
||||
uid: uid.clone(),
|
||||
uid: uid.to_owned(),
|
||||
primary_key: index.primary_key(&rtxn)?.map(String::from),
|
||||
created_at: index.created_at(&rtxn)?,
|
||||
updated_at: index.updated_at(&rtxn)?,
|
||||
};
|
||||
let mut index_dumper = dump.create_index(&uid, &metadata)?;
|
||||
let mut index_dumper = dump.create_index(uid, &metadata)?;
|
||||
|
||||
let fields_ids_map = index.fields_ids_map(&rtxn)?;
|
||||
let all_fields: Vec<_> = fields_ids_map.iter().map(|(id, _)| id).collect();
|
||||
@ -809,9 +809,10 @@ impl IndexScheduler {
|
||||
}
|
||||
|
||||
// 3.2. Dump the settings
|
||||
let settings = meilisearch_types::settings::settings(&index, &rtxn)?;
|
||||
let settings = meilisearch_types::settings::settings(index, &rtxn)?;
|
||||
index_dumper.settings(&settings)?;
|
||||
}
|
||||
Ok(())
|
||||
})?;
|
||||
|
||||
let dump_uid = started_at.format(format_description!(
|
||||
"[year repr:full][month repr:numerical][day padding:zero]-[hour padding:zero][minute padding:zero][second padding:zero][subsecond digits:3]"
|
||||
|
370
index-scheduler/src/index_mapper/index_map.rs
Normal file
370
index-scheduler/src/index_mapper/index_map.rs
Normal file
@ -0,0 +1,370 @@
|
||||
/// the map size to use when we don't succeed in reading it in indexes.
|
||||
const DEFAULT_MAP_SIZE: usize = 10 * 1024 * 1024 * 1024; // 10 GiB
|
||||
|
||||
use std::collections::BTreeMap;
|
||||
use std::path::Path;
|
||||
use std::time::Duration;
|
||||
|
||||
use meilisearch_types::heed::{EnvClosingEvent, EnvOpenOptions};
|
||||
use meilisearch_types::milli::Index;
|
||||
use time::OffsetDateTime;
|
||||
use uuid::Uuid;
|
||||
|
||||
use super::IndexStatus::{self, Available, BeingDeleted, Closing, Missing};
|
||||
use crate::lru::{InsertionOutcome, LruMap};
|
||||
use crate::{clamp_to_page_size, Result};
|
||||
|
||||
/// Keep an internally consistent view of the open indexes in memory.
|
||||
///
|
||||
/// This view is made of an LRU cache that will evict the least frequently used indexes when new indexes are opened.
|
||||
/// Indexes that are being closed (for resizing or due to cache eviction) or deleted cannot be evicted from the cache and
|
||||
/// are stored separately.
|
||||
///
|
||||
/// This view provides operations to change the state of the index as it is known in memory:
|
||||
/// open an index (making it available for queries), close an index (specifying the new size it should be opened with),
|
||||
/// delete an index.
|
||||
///
|
||||
/// External consistency with the other bits of data of an index is provided by the `IndexMapper` parent structure.
|
||||
pub struct IndexMap {
|
||||
/// A LRU map of indexes that are in the open state and available for queries.
|
||||
available: LruMap<Uuid, Index>,
|
||||
/// A map of indexes that are not available for queries, either because they are being deleted
|
||||
/// or because they are being closed.
|
||||
///
|
||||
/// If they are being deleted, the UUID points to `None`.
|
||||
unavailable: BTreeMap<Uuid, Option<ClosingIndex>>,
|
||||
|
||||
/// A monotonically increasing generation number, used to differentiate between multiple successive index closing requests.
|
||||
///
|
||||
/// Because multiple readers could be waiting on an index to close, the following could theoretically happen:
|
||||
///
|
||||
/// 1. Multiple readers wait for the index closing to occur.
|
||||
/// 2. One of them "wins the race", takes the lock and then removes the index that finished closing from the map.
|
||||
/// 3. The index is reopened, but must be closed again (such as being resized again).
|
||||
/// 4. One reader that "lost the race" in (2) wakes up and tries to take the lock and remove the index from the map.
|
||||
///
|
||||
/// In that situation, the index may or may not have finished closing. The `generation` field allows to remember which
|
||||
/// closing request was made, so the reader that "lost the race" has the old generation and will need to wait again for the index
|
||||
/// to close.
|
||||
generation: usize,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct ClosingIndex {
|
||||
uuid: Uuid,
|
||||
closing_event: EnvClosingEvent,
|
||||
map_size: usize,
|
||||
generation: usize,
|
||||
}
|
||||
|
||||
impl ClosingIndex {
|
||||
/// Waits for the index to be definitely closed.
|
||||
///
|
||||
/// To avoid blocking, users should relinquish their locks to the IndexMap before calling this function.
|
||||
///
|
||||
/// After the index is physically closed, the in memory map must still be updated to take this into account.
|
||||
/// To do so, a `ReopenableIndex` is returned, that can be used to either definitely close or definitely open
|
||||
/// the index without waiting anymore.
|
||||
pub fn wait_timeout(self, timeout: Duration) -> Option<ReopenableIndex> {
|
||||
self.closing_event.wait_timeout(timeout).then_some(ReopenableIndex {
|
||||
uuid: self.uuid,
|
||||
map_size: self.map_size,
|
||||
generation: self.generation,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
pub struct ReopenableIndex {
|
||||
uuid: Uuid,
|
||||
map_size: usize,
|
||||
generation: usize,
|
||||
}
|
||||
|
||||
impl ReopenableIndex {
|
||||
/// Attempts to reopen the index, which can result in the index being reopened again or not
|
||||
/// (e.g. if another thread already opened and closed the index again).
|
||||
///
|
||||
/// Use get again on the IndexMap to get the updated status.
|
||||
///
|
||||
/// Fails if the underlying index creation fails.
|
||||
///
|
||||
/// # Status table
|
||||
///
|
||||
/// | Previous Status | New Status |
|
||||
/// |-----------------|----------------------------------------------|
|
||||
/// | Missing | Missing |
|
||||
/// | BeingDeleted | BeingDeleted |
|
||||
/// | Closing | Available or Closing depending on generation |
|
||||
/// | Available | Available |
|
||||
///
|
||||
pub fn reopen(self, map: &mut IndexMap, path: &Path) -> Result<()> {
|
||||
if let Closing(reopen) = map.get(&self.uuid) {
|
||||
if reopen.generation != self.generation {
|
||||
return Ok(());
|
||||
}
|
||||
map.unavailable.remove(&self.uuid);
|
||||
map.create(&self.uuid, path, None, self.map_size)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Attempts to close the index, which may or may not result in the index being closed
|
||||
/// (e.g. if another thread already reopened the index again).
|
||||
///
|
||||
/// Use get again on the IndexMap to get the updated status.
|
||||
///
|
||||
/// # Status table
|
||||
///
|
||||
/// | Previous Status | New Status |
|
||||
/// |-----------------|--------------------------------------------|
|
||||
/// | Missing | Missing |
|
||||
/// | BeingDeleted | BeingDeleted |
|
||||
/// | Closing | Missing or Closing depending on generation |
|
||||
/// | Available | Available |
|
||||
pub fn close(self, map: &mut IndexMap) {
|
||||
if let Closing(reopen) = map.get(&self.uuid) {
|
||||
if reopen.generation != self.generation {
|
||||
return;
|
||||
}
|
||||
map.unavailable.remove(&self.uuid);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl IndexMap {
|
||||
pub fn new(cap: usize) -> IndexMap {
|
||||
Self { unavailable: Default::default(), available: LruMap::new(cap), generation: 0 }
|
||||
}
|
||||
|
||||
/// Gets the current status of an index in the map.
|
||||
///
|
||||
/// If the index is available it can be accessed from the returned status.
|
||||
pub fn get(&self, uuid: &Uuid) -> IndexStatus {
|
||||
self.available
|
||||
.get(uuid)
|
||||
.map(|index| Available(index.clone()))
|
||||
.unwrap_or_else(|| self.get_unavailable(uuid))
|
||||
}
|
||||
|
||||
fn get_unavailable(&self, uuid: &Uuid) -> IndexStatus {
|
||||
match self.unavailable.get(uuid) {
|
||||
Some(Some(reopen)) => Closing(reopen.clone()),
|
||||
Some(None) => BeingDeleted,
|
||||
None => Missing,
|
||||
}
|
||||
}
|
||||
|
||||
/// Attempts to create a new index that wasn't existing before.
|
||||
///
|
||||
/// # Status table
|
||||
///
|
||||
/// | Previous Status | New Status |
|
||||
/// |-----------------|------------|
|
||||
/// | Missing | Available |
|
||||
/// | BeingDeleted | panics |
|
||||
/// | Closing | panics |
|
||||
/// | Available | panics |
|
||||
///
|
||||
pub fn create(
|
||||
&mut self,
|
||||
uuid: &Uuid,
|
||||
path: &Path,
|
||||
date: Option<(OffsetDateTime, OffsetDateTime)>,
|
||||
map_size: usize,
|
||||
) -> Result<Index> {
|
||||
if !matches!(self.get_unavailable(uuid), Missing) {
|
||||
panic!("Attempt to open an index that was unavailable");
|
||||
}
|
||||
let index = create_or_open_index(path, date, map_size)?;
|
||||
match self.available.insert(*uuid, index.clone()) {
|
||||
InsertionOutcome::InsertedNew => (),
|
||||
InsertionOutcome::Evicted(evicted_uuid, evicted_index) => {
|
||||
self.close(evicted_uuid, evicted_index, 0);
|
||||
}
|
||||
InsertionOutcome::Replaced(_) => {
|
||||
panic!("Attempt to open an index that was already opened")
|
||||
}
|
||||
}
|
||||
Ok(index)
|
||||
}
|
||||
|
||||
/// Increases the current generation. See documentation for this field.
|
||||
///
|
||||
/// In the unlikely event that the 2^64 generations would have been exhausted, we simply wrap-around.
|
||||
///
|
||||
/// For this to cause an issue, one should be able to stop a reader in time after it got a `ReopenableIndex` and before it takes the lock
|
||||
/// to remove it from the unavailable map, and keep the reader in this frozen state for 2^64 closing of other indexes.
|
||||
///
|
||||
/// This seems overwhelmingly impossible to achieve in practice.
|
||||
fn next_generation(&mut self) -> usize {
|
||||
self.generation = self.generation.wrapping_add(1);
|
||||
self.generation
|
||||
}
|
||||
|
||||
/// Attempts to close an index.
|
||||
///
|
||||
/// # Status table
|
||||
///
|
||||
/// | Previous Status | New Status |
|
||||
/// |-----------------|---------------|
|
||||
/// | Missing | Missing |
|
||||
/// | BeingDeleted | BeingDeleted |
|
||||
/// | Closing | Closing |
|
||||
/// | Available | Closing |
|
||||
///
|
||||
pub fn close_for_resize(&mut self, uuid: &Uuid, map_size_growth: usize) {
|
||||
let Some(index) = self.available.remove(uuid) else { return; };
|
||||
self.close(*uuid, index, map_size_growth);
|
||||
}
|
||||
|
||||
fn close(&mut self, uuid: Uuid, index: Index, map_size_growth: usize) {
|
||||
let map_size = index.map_size().unwrap_or(DEFAULT_MAP_SIZE) + map_size_growth;
|
||||
let closing_event = index.prepare_for_closing();
|
||||
let generation = self.next_generation();
|
||||
self.unavailable
|
||||
.insert(uuid, Some(ClosingIndex { uuid, closing_event, map_size, generation }));
|
||||
}
|
||||
|
||||
/// Attempts to delete and index.
|
||||
///
|
||||
/// `end_deletion` must be called just after.
|
||||
///
|
||||
/// # Status table
|
||||
///
|
||||
/// | Previous Status | New Status | Return value |
|
||||
/// |-----------------|--------------|-----------------------------|
|
||||
/// | Missing | BeingDeleted | Ok(None) |
|
||||
/// | BeingDeleted | BeingDeleted | Err(None) |
|
||||
/// | Closing | Closing | Err(Some(reopen)) |
|
||||
/// | Available | BeingDeleted | Ok(Some(env_closing_event)) |
|
||||
pub fn start_deletion(
|
||||
&mut self,
|
||||
uuid: &Uuid,
|
||||
) -> std::result::Result<Option<EnvClosingEvent>, Option<ClosingIndex>> {
|
||||
if let Some(index) = self.available.remove(uuid) {
|
||||
self.unavailable.insert(*uuid, None);
|
||||
return Ok(Some(index.prepare_for_closing()));
|
||||
}
|
||||
match self.unavailable.remove(uuid) {
|
||||
Some(Some(reopen)) => Err(Some(reopen)),
|
||||
Some(None) => Err(None),
|
||||
None => Ok(None),
|
||||
}
|
||||
}
|
||||
|
||||
/// Marks that an index deletion finished.
|
||||
///
|
||||
/// Must be used after calling `start_deletion`.
|
||||
///
|
||||
/// # Status table
|
||||
///
|
||||
/// | Previous Status | New Status |
|
||||
/// |-----------------|------------|
|
||||
/// | Missing | Missing |
|
||||
/// | BeingDeleted | Missing |
|
||||
/// | Closing | panics |
|
||||
/// | Available | panics |
|
||||
pub fn end_deletion(&mut self, uuid: &Uuid) {
|
||||
assert!(
|
||||
self.available.get(uuid).is_none(),
|
||||
"Attempt to finish deletion of an index that was not being deleted"
|
||||
);
|
||||
// Do not panic if the index was Missing or BeingDeleted
|
||||
assert!(
|
||||
!matches!(self.unavailable.remove(uuid), Some(Some(_))),
|
||||
"Attempt to finish deletion of an index that was being closed"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
/// Create or open an index in the specified path.
|
||||
/// The path *must* exist or an error will be thrown.
|
||||
fn create_or_open_index(
|
||||
path: &Path,
|
||||
date: Option<(OffsetDateTime, OffsetDateTime)>,
|
||||
map_size: usize,
|
||||
) -> Result<Index> {
|
||||
let mut options = EnvOpenOptions::new();
|
||||
options.map_size(clamp_to_page_size(map_size));
|
||||
options.max_readers(1024);
|
||||
|
||||
if let Some((created, updated)) = date {
|
||||
Ok(Index::new_with_creation_dates(options, path, created, updated)?)
|
||||
} else {
|
||||
Ok(Index::new(options, path)?)
|
||||
}
|
||||
}
|
||||
|
||||
/// Putting the tests of the LRU down there so we have access to the cache's private members
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
|
||||
use meilisearch_types::heed::Env;
|
||||
use meilisearch_types::Index;
|
||||
use uuid::Uuid;
|
||||
|
||||
use super::super::IndexMapper;
|
||||
use crate::tests::IndexSchedulerHandle;
|
||||
use crate::utils::clamp_to_page_size;
|
||||
use crate::IndexScheduler;
|
||||
|
||||
impl IndexMapper {
|
||||
fn test() -> (Self, Env, IndexSchedulerHandle) {
|
||||
let (index_scheduler, handle) = IndexScheduler::test(true, vec![]);
|
||||
(index_scheduler.index_mapper, index_scheduler.env, handle)
|
||||
}
|
||||
}
|
||||
|
||||
fn check_first_unavailable(mapper: &IndexMapper, expected_uuid: Uuid, is_closing: bool) {
|
||||
let index_map = mapper.index_map.read().unwrap();
|
||||
let (uuid, state) = index_map.unavailable.first_key_value().unwrap();
|
||||
assert_eq!(uuid, &expected_uuid);
|
||||
assert_eq!(state.is_some(), is_closing);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn evict_indexes() {
|
||||
let (mapper, env, _handle) = IndexMapper::test();
|
||||
let mut uuids = vec![];
|
||||
// LRU cap + 1
|
||||
for i in 0..(5 + 1) {
|
||||
let index_name = format!("index-{i}");
|
||||
let wtxn = env.write_txn().unwrap();
|
||||
mapper.create_index(wtxn, &index_name, None).unwrap();
|
||||
let txn = env.read_txn().unwrap();
|
||||
uuids.push(mapper.index_mapping.get(&txn, &index_name).unwrap().unwrap());
|
||||
}
|
||||
// index-0 was evicted
|
||||
check_first_unavailable(&mapper, uuids[0], true);
|
||||
|
||||
// get back the evicted index
|
||||
let wtxn = env.write_txn().unwrap();
|
||||
mapper.create_index(wtxn, "index-0", None).unwrap();
|
||||
|
||||
// Least recently used is now index-1
|
||||
check_first_unavailable(&mapper, uuids[1], true);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn resize_index() {
|
||||
let (mapper, env, _handle) = IndexMapper::test();
|
||||
let index = mapper.create_index(env.write_txn().unwrap(), "index", None).unwrap();
|
||||
assert_index_size(index, mapper.index_base_map_size);
|
||||
|
||||
mapper.resize_index(&env.read_txn().unwrap(), "index").unwrap();
|
||||
|
||||
let index = mapper.create_index(env.write_txn().unwrap(), "index", None).unwrap();
|
||||
assert_index_size(index, mapper.index_base_map_size + mapper.index_growth_amount);
|
||||
|
||||
mapper.resize_index(&env.read_txn().unwrap(), "index").unwrap();
|
||||
|
||||
let index = mapper.create_index(env.write_txn().unwrap(), "index", None).unwrap();
|
||||
assert_index_size(index, mapper.index_base_map_size + mapper.index_growth_amount * 2);
|
||||
}
|
||||
|
||||
fn assert_index_size(index: Index, expected: usize) {
|
||||
let expected = clamp_to_page_size(expected);
|
||||
let index_map_size = index.map_size().unwrap();
|
||||
assert_eq!(index_map_size, expected);
|
||||
}
|
||||
}
|
@ -1,21 +1,22 @@
|
||||
use std::collections::hash_map::Entry;
|
||||
use std::collections::HashMap;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::path::PathBuf;
|
||||
use std::sync::{Arc, RwLock};
|
||||
use std::time::Duration;
|
||||
use std::{fs, thread};
|
||||
|
||||
use log::error;
|
||||
use meilisearch_types::heed::types::Str;
|
||||
use meilisearch_types::heed::{Database, Env, EnvOpenOptions, RoTxn, RwTxn};
|
||||
use meilisearch_types::heed::{Database, Env, RoTxn, RwTxn};
|
||||
use meilisearch_types::milli::update::IndexerConfig;
|
||||
use meilisearch_types::milli::Index;
|
||||
use synchronoise::SignalEvent;
|
||||
use time::OffsetDateTime;
|
||||
use uuid::Uuid;
|
||||
|
||||
use self::IndexStatus::{Available, BeingDeleted, BeingResized};
|
||||
use self::index_map::IndexMap;
|
||||
use self::IndexStatus::{Available, BeingDeleted, Closing, Missing};
|
||||
use crate::uuid_codec::UuidCodec;
|
||||
use crate::{clamp_to_page_size, Error, Result};
|
||||
use crate::{Error, Result};
|
||||
|
||||
mod index_map;
|
||||
|
||||
const INDEX_MAPPING: &str = "index-mapping";
|
||||
|
||||
@ -26,17 +27,38 @@ const INDEX_MAPPING: &str = "index-mapping";
|
||||
/// 2. Opening indexes and storing references to these opened indexes
|
||||
/// 3. Accessing indexes through their uuid
|
||||
/// 4. Mapping a user-defined name to each index uuid.
|
||||
///
|
||||
/// # Implementation notes
|
||||
///
|
||||
/// An index exists as 3 bits of data:
|
||||
/// 1. The index data on disk, that can exist in 3 states: Missing, Present, or BeingDeleted.
|
||||
/// 2. The persistent database containing the association between the index' name and its UUID,
|
||||
/// that can exist in 2 states: Missing or Present.
|
||||
/// 3. The state of the index in the in-memory `IndexMap`, that can exist in multiple states:
|
||||
/// - Missing
|
||||
/// - Available
|
||||
/// - Closing (because an index needs resizing or was evicted from the cache)
|
||||
/// - BeingDeleted
|
||||
///
|
||||
/// All of this data should be kept consistent between index operations, which is achieved by the `IndexMapper`
|
||||
/// with the use of the following primitives:
|
||||
/// - A RwLock on the `IndexMap`.
|
||||
/// - Transactions on the association database.
|
||||
/// - ClosingEvent signals emitted when closing an environment.
|
||||
#[derive(Clone)]
|
||||
pub struct IndexMapper {
|
||||
/// Keep track of the opened indexes. Used mainly by the index resolver.
|
||||
index_map: Arc<RwLock<HashMap<Uuid, IndexStatus>>>,
|
||||
index_map: Arc<RwLock<IndexMap>>,
|
||||
|
||||
/// Map an index name with an index uuid currently available on disk.
|
||||
pub(crate) index_mapping: Database<Str, UuidCodec>,
|
||||
|
||||
/// Path to the folder where the LMDB environments of each index are.
|
||||
base_path: PathBuf,
|
||||
index_size: usize,
|
||||
/// The map size an index is opened with on the first time.
|
||||
index_base_map_size: usize,
|
||||
/// The quantity by which the map size of an index is incremented upon reopening, in bytes.
|
||||
index_growth_amount: usize,
|
||||
pub indexer_config: Arc<IndexerConfig>,
|
||||
}
|
||||
|
||||
@ -44,10 +66,12 @@ pub struct IndexMapper {
|
||||
#[allow(clippy::large_enum_variant)]
|
||||
#[derive(Clone)]
|
||||
pub enum IndexStatus {
|
||||
/// Not currently in the index map.
|
||||
Missing,
|
||||
/// Do not insert it back in the index map as it is currently being deleted.
|
||||
BeingDeleted,
|
||||
/// Temporarily do not insert the index in the index map as it is currently being resized.
|
||||
BeingResized(Arc<SignalEvent>),
|
||||
/// Temporarily do not insert the index in the index map as it is currently being resized/evicted from the map.
|
||||
Closing(index_map::ClosingIndex),
|
||||
/// You can use the index without worrying about anything.
|
||||
Available(Index),
|
||||
}
|
||||
@ -56,37 +80,21 @@ impl IndexMapper {
|
||||
pub fn new(
|
||||
env: &Env,
|
||||
base_path: PathBuf,
|
||||
index_size: usize,
|
||||
index_base_map_size: usize,
|
||||
index_growth_amount: usize,
|
||||
index_count: usize,
|
||||
indexer_config: IndexerConfig,
|
||||
) -> Result<Self> {
|
||||
Ok(Self {
|
||||
index_map: Arc::default(),
|
||||
index_map: Arc::new(RwLock::new(IndexMap::new(index_count))),
|
||||
index_mapping: env.create_database(Some(INDEX_MAPPING))?,
|
||||
base_path,
|
||||
index_size,
|
||||
index_base_map_size,
|
||||
index_growth_amount,
|
||||
indexer_config: Arc::new(indexer_config),
|
||||
})
|
||||
}
|
||||
|
||||
/// Create or open an index in the specified path.
|
||||
/// The path *must* exists or an error will be thrown.
|
||||
fn create_or_open_index(
|
||||
&self,
|
||||
path: &Path,
|
||||
date: Option<(OffsetDateTime, OffsetDateTime)>,
|
||||
map_size: usize,
|
||||
) -> Result<Index> {
|
||||
let mut options = EnvOpenOptions::new();
|
||||
options.map_size(clamp_to_page_size(map_size));
|
||||
options.max_readers(1024);
|
||||
|
||||
if let Some((created, updated)) = date {
|
||||
Ok(Index::new_with_creation_dates(options, path, created, updated)?)
|
||||
} else {
|
||||
Ok(Index::new(options, path)?)
|
||||
}
|
||||
}
|
||||
|
||||
/// Get or create the index.
|
||||
pub fn create_index(
|
||||
&self,
|
||||
@ -106,16 +114,17 @@ impl IndexMapper {
|
||||
let index_path = self.base_path.join(uuid.to_string());
|
||||
fs::create_dir_all(&index_path)?;
|
||||
|
||||
let index = self.create_or_open_index(&index_path, date, self.index_size)?;
|
||||
|
||||
wtxn.commit()?;
|
||||
// Error if the UUIDv4 somehow already exists in the map, since it should be fresh.
|
||||
// This is very unlikely to happen in practice.
|
||||
// TODO: it would be better to lazily create the index. But we need an Index::open function for milli.
|
||||
if self.index_map.write().unwrap().insert(uuid, Available(index.clone())).is_some()
|
||||
{
|
||||
panic!("Uuid v4 conflict: index with UUID {uuid} already exists.");
|
||||
}
|
||||
let index = self.index_map.write().unwrap().create(
|
||||
&uuid,
|
||||
&index_path,
|
||||
date,
|
||||
self.index_base_map_size,
|
||||
)?;
|
||||
|
||||
wtxn.commit()?;
|
||||
|
||||
Ok(index)
|
||||
}
|
||||
@ -135,23 +144,42 @@ impl IndexMapper {
|
||||
assert!(self.index_mapping.delete(&mut wtxn, name)?);
|
||||
|
||||
wtxn.commit()?;
|
||||
// We remove the index from the in-memory index map.
|
||||
|
||||
let mut tries = 0;
|
||||
// Attempts to remove the index from the in-memory index map in a loop.
|
||||
//
|
||||
// If the index is currently being closed, we will wait for it to be closed and retry getting it in a subsequent
|
||||
// loop iteration.
|
||||
//
|
||||
// We make 100 attempts before giving up.
|
||||
// This could happen in the following situations:
|
||||
//
|
||||
// 1. There is a bug preventing the index from being correctly closed, or us from detecting this.
|
||||
// 2. A user of the index is keeping it open for more than 600 seconds. This could happen e.g. during a pathological search.
|
||||
// This can not be caused by indexation because deleting an index happens in the scheduler itself, so cannot be concurrent with indexation.
|
||||
//
|
||||
// In these situations, reporting the error through a panic is in order.
|
||||
let closing_event = loop {
|
||||
let mut lock = self.index_map.write().unwrap();
|
||||
let resize_operation = match lock.insert(uuid, BeingDeleted) {
|
||||
Some(Available(index)) => break Some(index.prepare_for_closing()),
|
||||
// The target index is in the middle of a resize operation.
|
||||
// Wait for this operation to complete, then try again.
|
||||
Some(BeingResized(resize_operation)) => resize_operation.clone(),
|
||||
// The index is already being deleted or doesn't exist.
|
||||
// It's OK to remove it from the map again.
|
||||
_ => break None,
|
||||
};
|
||||
|
||||
// Avoiding deadlocks: we need to drop the lock before waiting for the end of the resize, which
|
||||
// will involve operations on the very map we're locking.
|
||||
drop(lock);
|
||||
resize_operation.wait();
|
||||
match lock.start_deletion(&uuid) {
|
||||
Ok(env_closing) => break env_closing,
|
||||
Err(Some(reopen)) => {
|
||||
// drop the lock here so that we don't synchronously wait for the index to close.
|
||||
drop(lock);
|
||||
tries += 1;
|
||||
if tries >= 100 {
|
||||
panic!("Too many attempts to close index {name} prior to deletion.")
|
||||
}
|
||||
let reopen = if let Some(reopen) = reopen.wait_timeout(Duration::from_secs(6)) {
|
||||
reopen
|
||||
} else {
|
||||
continue;
|
||||
};
|
||||
reopen.close(&mut self.index_map.write().unwrap());
|
||||
continue;
|
||||
}
|
||||
Err(None) => return Ok(()),
|
||||
}
|
||||
};
|
||||
|
||||
let index_map = self.index_map.clone();
|
||||
@ -161,7 +189,7 @@ impl IndexMapper {
|
||||
.name(String::from("index_deleter"))
|
||||
.spawn(move || {
|
||||
// We first wait to be sure that the previously opened index is effectively closed.
|
||||
// This can take a lot of time, this is why we do that in a seperate thread.
|
||||
// This can take a lot of time, this is why we do that in a separate thread.
|
||||
if let Some(closing_event) = closing_event {
|
||||
closing_event.wait();
|
||||
}
|
||||
@ -175,7 +203,7 @@ impl IndexMapper {
|
||||
}
|
||||
|
||||
// Finally we remove the entry from the index map.
|
||||
assert!(matches!(index_map.write().unwrap().remove(&uuid), Some(BeingDeleted)));
|
||||
index_map.write().unwrap().end_deletion(&uuid);
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
@ -195,76 +223,15 @@ impl IndexMapper {
|
||||
/// - If the Index corresponding to the passed name is concurrently being deleted/resized or cannot be found in the
|
||||
/// in memory hash map.
|
||||
pub fn resize_index(&self, rtxn: &RoTxn, name: &str) -> Result<()> {
|
||||
// fixme: factor to a function?
|
||||
let uuid = self
|
||||
.index_mapping
|
||||
.get(rtxn, name)?
|
||||
.ok_or_else(|| Error::IndexNotFound(name.to_string()))?;
|
||||
|
||||
// We remove the index from the in-memory index map.
|
||||
let mut lock = self.index_map.write().unwrap();
|
||||
// signal that will be sent when the resize operation completes
|
||||
let resize_operation = Arc::new(SignalEvent::manual(false));
|
||||
let index = match lock.insert(uuid, BeingResized(resize_operation)) {
|
||||
Some(Available(index)) => index,
|
||||
Some(previous_status) => {
|
||||
lock.insert(uuid, previous_status);
|
||||
panic!(
|
||||
"Attempting to resize index {name} that is already being resized or deleted."
|
||||
)
|
||||
}
|
||||
None => {
|
||||
panic!("Could not find the status of index {name} in the in-memory index mapper.")
|
||||
}
|
||||
};
|
||||
self.index_map.write().unwrap().close_for_resize(&uuid, self.index_growth_amount);
|
||||
|
||||
drop(lock);
|
||||
|
||||
let resize_succeeded = (move || {
|
||||
let current_size = index.map_size()?;
|
||||
let new_size = current_size * 2;
|
||||
let closing_event = index.prepare_for_closing();
|
||||
|
||||
log::debug!("Waiting for index {name} to close");
|
||||
|
||||
if !closing_event.wait_timeout(std::time::Duration::from_secs(600)) {
|
||||
// fail after 10 minutes waiting
|
||||
panic!("Could not resize index {name} (unable to close it)");
|
||||
}
|
||||
|
||||
log::info!("Resized index {name} from {current_size} to {new_size} bytes");
|
||||
let index_path = self.base_path.join(uuid.to_string());
|
||||
let index = self.create_or_open_index(&index_path, None, new_size)?;
|
||||
Ok(index)
|
||||
})();
|
||||
|
||||
// Put the map back to a consistent state.
|
||||
// Even if there was an error we don't want to leave the map in an inconsistent state as it would cause
|
||||
// deadlocks.
|
||||
let mut lock = self.index_map.write().unwrap();
|
||||
let (resize_operation, resize_succeeded) = match resize_succeeded {
|
||||
Ok(index) => {
|
||||
// insert the resized index
|
||||
let Some(BeingResized(resize_operation)) = lock.insert(uuid, Available(index)) else {
|
||||
panic!("Index state for index {name} was modified while it was being resized")
|
||||
};
|
||||
|
||||
(resize_operation, Ok(()))
|
||||
}
|
||||
Err(error) => {
|
||||
// there was an error, not much we can do... delete the index from the in-memory map to prevent future errors
|
||||
let Some(BeingResized(resize_operation)) = lock.remove(&uuid) else {
|
||||
panic!("Index state for index {name} was modified while it was being resized")
|
||||
};
|
||||
(resize_operation, Err(error))
|
||||
}
|
||||
};
|
||||
|
||||
// drop the lock before signaling completion so that other threads don't immediately await on the lock after waking up.
|
||||
drop(lock);
|
||||
resize_operation.signal();
|
||||
|
||||
resize_succeeded
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Return an index, may open it if it wasn't already opened.
|
||||
@ -274,47 +241,68 @@ impl IndexMapper {
|
||||
.get(rtxn, name)?
|
||||
.ok_or_else(|| Error::IndexNotFound(name.to_string()))?;
|
||||
|
||||
// we clone here to drop the lock before entering the match
|
||||
let mut tries = 0;
|
||||
// attempts to open the index in a loop.
|
||||
//
|
||||
// If the index is currently being closed, we will wait for it to be closed and retry getting it in a subsequent
|
||||
// loop iteration.
|
||||
//
|
||||
// We make 100 attempts before giving up.
|
||||
// This could happen in the following situations:
|
||||
//
|
||||
// 1. There is a bug preventing the index from being correctly closed, or us from detecting it was.
|
||||
// 2. A user of the index is keeping it open for more than 600 seconds. This could happen e.g. during a long indexation,
|
||||
// a pathological search, and so on.
|
||||
//
|
||||
// In these situations, reporting the error through a panic is in order.
|
||||
let index = loop {
|
||||
let index = self.index_map.read().unwrap().get(&uuid).cloned();
|
||||
tries += 1;
|
||||
if tries > 100 {
|
||||
panic!("Too many spurious wake ups while trying to open the index {name}");
|
||||
}
|
||||
|
||||
// we get the index here to drop the lock before entering the match
|
||||
let index = self.index_map.read().unwrap().get(&uuid);
|
||||
|
||||
match index {
|
||||
Some(Available(index)) => break index,
|
||||
Some(BeingResized(ref resize_operation)) => {
|
||||
Available(index) => break index,
|
||||
Closing(reopen) => {
|
||||
// Avoiding deadlocks: no lock taken while doing this operation.
|
||||
resize_operation.wait();
|
||||
let reopen = if let Some(reopen) = reopen.wait_timeout(Duration::from_secs(6)) {
|
||||
reopen
|
||||
} else {
|
||||
continue;
|
||||
};
|
||||
let index_path = self.base_path.join(uuid.to_string());
|
||||
// take the lock to reopen the environment.
|
||||
reopen.reopen(&mut self.index_map.write().unwrap(), &index_path)?;
|
||||
continue;
|
||||
}
|
||||
Some(BeingDeleted) => return Err(Error::IndexNotFound(name.to_string())),
|
||||
BeingDeleted => return Err(Error::IndexNotFound(name.to_string())),
|
||||
// since we're lazy, it's possible that the index has not been opened yet.
|
||||
None => {
|
||||
Missing => {
|
||||
let mut index_map = self.index_map.write().unwrap();
|
||||
// between the read lock and the write lock it's not impossible
|
||||
// that someone already opened the index (eg if two search happens
|
||||
// that someone already opened the index (eg if two searches happen
|
||||
// at the same time), thus before opening it we check a second time
|
||||
// if it's not already there.
|
||||
// Since there is a good chance it's not already there we can use
|
||||
// the entry method.
|
||||
match index_map.entry(uuid) {
|
||||
Entry::Vacant(entry) => {
|
||||
match index_map.get(&uuid) {
|
||||
Missing => {
|
||||
let index_path = self.base_path.join(uuid.to_string());
|
||||
|
||||
let index =
|
||||
self.create_or_open_index(&index_path, None, self.index_size)?;
|
||||
entry.insert(Available(index.clone()));
|
||||
break index;
|
||||
break index_map.create(
|
||||
&uuid,
|
||||
&index_path,
|
||||
None,
|
||||
self.index_base_map_size,
|
||||
)?;
|
||||
}
|
||||
Entry::Occupied(entry) => match entry.get() {
|
||||
Available(index) => break index.clone(),
|
||||
BeingResized(resize_operation) => {
|
||||
// Avoiding the deadlock: we drop the lock before waiting
|
||||
let resize_operation = resize_operation.clone();
|
||||
drop(index_map);
|
||||
resize_operation.wait();
|
||||
continue;
|
||||
}
|
||||
BeingDeleted => return Err(Error::IndexNotFound(name.to_string())),
|
||||
},
|
||||
Available(index) => break index,
|
||||
Closing(_) => {
|
||||
// the reopening will be handled in the next loop operation
|
||||
continue;
|
||||
}
|
||||
BeingDeleted => return Err(Error::IndexNotFound(name.to_string())),
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -323,18 +311,38 @@ impl IndexMapper {
|
||||
Ok(index)
|
||||
}
|
||||
|
||||
/// Return all indexes, may open them if they weren't already opened.
|
||||
pub fn indexes(&self, rtxn: &RoTxn) -> Result<Vec<(String, Index)>> {
|
||||
/// Attempts `f` for each index that exists in the index mapper.
|
||||
///
|
||||
/// It is preferable to use this function rather than a loop that opens all indexes, as a way to avoid having all indexes opened,
|
||||
/// which is unsupported in general.
|
||||
///
|
||||
/// Since `f` is allowed to return a result, and `Index` is cloneable, it is still possible to wrongly build e.g. a vector of
|
||||
/// all the indexes, but this function makes it harder and so less likely to do accidentally.
|
||||
pub fn try_for_each_index<U, V>(
|
||||
&self,
|
||||
rtxn: &RoTxn,
|
||||
mut f: impl FnMut(&str, &Index) -> Result<U>,
|
||||
) -> Result<V>
|
||||
where
|
||||
V: FromIterator<U>,
|
||||
{
|
||||
self.index_mapping
|
||||
.iter(rtxn)?
|
||||
.map(|ret| {
|
||||
ret.map_err(Error::from).and_then(|(name, _)| {
|
||||
self.index(rtxn, name).map(|index| (name.to_string(), index))
|
||||
})
|
||||
.map(|res| {
|
||||
res.map_err(Error::from)
|
||||
.and_then(|(name, _)| self.index(rtxn, name).and_then(|index| f(name, &index)))
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// Return the name of all indexes without opening them.
|
||||
pub fn index_names(&self, rtxn: &RoTxn) -> Result<Vec<String>> {
|
||||
self.index_mapping
|
||||
.iter(rtxn)?
|
||||
.map(|res| res.map_err(Error::from).map(|(name, _)| name.to_string()))
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// Swap two index names.
|
||||
pub fn swap(&self, wtxn: &mut RwTxn, lhs: &str, rhs: &str) -> Result<()> {
|
||||
let lhs_uuid = self
|
@ -254,6 +254,6 @@ pub fn snapshot_canceled_by(
|
||||
snap
|
||||
}
|
||||
pub fn snapshot_index_mapper(rtxn: &RoTxn, mapper: &IndexMapper) -> String {
|
||||
let names = mapper.indexes(rtxn).unwrap().into_iter().map(|(n, _)| n).collect::<Vec<_>>();
|
||||
let names = mapper.index_names(rtxn).unwrap();
|
||||
format!("{names:?}")
|
||||
}
|
||||
|
@ -24,6 +24,7 @@ pub mod error;
|
||||
mod index_mapper;
|
||||
#[cfg(test)]
|
||||
mod insta_snapshot;
|
||||
mod lru;
|
||||
mod utils;
|
||||
mod uuid_codec;
|
||||
|
||||
@ -31,7 +32,7 @@ pub type Result<T> = std::result::Result<T, Error>;
|
||||
pub type TaskId = u32;
|
||||
|
||||
use std::ops::{Bound, RangeBounds};
|
||||
use std::path::PathBuf;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::sync::atomic::AtomicBool;
|
||||
use std::sync::atomic::Ordering::Relaxed;
|
||||
use std::sync::{Arc, RwLock};
|
||||
@ -229,8 +230,12 @@ pub struct IndexSchedulerOptions {
|
||||
pub dumps_path: PathBuf,
|
||||
/// The maximum size, in bytes, of the task index.
|
||||
pub task_db_size: usize,
|
||||
/// The maximum size, in bytes, of each meilisearch index.
|
||||
pub index_size: usize,
|
||||
/// The size, in bytes, with which a meilisearch index is opened the first time of each meilisearch index.
|
||||
pub index_base_map_size: usize,
|
||||
/// The size, in bytes, by which the map size of an index is increased when it resized due to being full.
|
||||
pub index_growth_amount: usize,
|
||||
/// The number of indexes that can be concurrently opened in memory.
|
||||
pub index_count: usize,
|
||||
/// Configuration used during indexing for each meilisearch index.
|
||||
pub indexer_config: IndexerConfig,
|
||||
/// Set to `true` iff the index scheduler is allowed to automatically
|
||||
@ -360,9 +365,25 @@ impl IndexScheduler {
|
||||
std::fs::create_dir_all(&options.indexes_path)?;
|
||||
std::fs::create_dir_all(&options.dumps_path)?;
|
||||
|
||||
let task_db_size = clamp_to_page_size(options.task_db_size);
|
||||
let budget = if options.indexer_config.skip_index_budget {
|
||||
IndexBudget {
|
||||
map_size: options.index_base_map_size,
|
||||
index_count: options.index_count,
|
||||
task_db_size,
|
||||
}
|
||||
} else {
|
||||
Self::index_budget(
|
||||
&options.tasks_path,
|
||||
options.index_base_map_size,
|
||||
task_db_size,
|
||||
options.index_count,
|
||||
)
|
||||
};
|
||||
|
||||
let env = heed::EnvOpenOptions::new()
|
||||
.max_dbs(10)
|
||||
.map_size(clamp_to_page_size(options.task_db_size))
|
||||
.map_size(budget.task_db_size)
|
||||
.open(options.tasks_path)?;
|
||||
let file_store = FileStore::new(&options.update_file_path)?;
|
||||
|
||||
@ -382,7 +403,9 @@ impl IndexScheduler {
|
||||
index_mapper: IndexMapper::new(
|
||||
&env,
|
||||
options.indexes_path,
|
||||
options.index_size,
|
||||
budget.map_size,
|
||||
options.index_growth_amount,
|
||||
budget.index_count,
|
||||
options.indexer_config,
|
||||
)?,
|
||||
env,
|
||||
@ -406,6 +429,65 @@ impl IndexScheduler {
|
||||
Ok(this)
|
||||
}
|
||||
|
||||
fn index_budget(
|
||||
tasks_path: &Path,
|
||||
base_map_size: usize,
|
||||
mut task_db_size: usize,
|
||||
max_index_count: usize,
|
||||
) -> IndexBudget {
|
||||
let budget = utils::dichotomic_search(base_map_size, |map_size| {
|
||||
Self::is_good_heed(tasks_path, map_size)
|
||||
});
|
||||
|
||||
log::debug!("memmap budget: {budget}B");
|
||||
let mut budget = budget / 2;
|
||||
if task_db_size > (budget / 2) {
|
||||
task_db_size = clamp_to_page_size(budget * 2 / 5);
|
||||
log::debug!(
|
||||
"Decreasing max size of task DB to {task_db_size}B due to constrained memory space"
|
||||
);
|
||||
}
|
||||
budget -= task_db_size;
|
||||
|
||||
// won't be mutated again
|
||||
let budget = budget;
|
||||
let task_db_size = task_db_size;
|
||||
|
||||
log::debug!("index budget: {budget}B");
|
||||
let mut index_count = budget / base_map_size;
|
||||
if index_count < 2 {
|
||||
// take a bit less than half than the budget to make sure we can always afford to open an index
|
||||
let map_size = (budget * 2) / 5;
|
||||
// single index of max budget
|
||||
log::debug!("1 index of {map_size}B can be opened simultaneously.");
|
||||
return IndexBudget { map_size, index_count: 1, task_db_size };
|
||||
}
|
||||
// give us some space for an additional index when the cache is already full
|
||||
// decrement is OK because index_count >= 2.
|
||||
index_count -= 1;
|
||||
if index_count > max_index_count {
|
||||
index_count = max_index_count;
|
||||
}
|
||||
log::debug!("Up to {index_count} indexes of {base_map_size}B opened simultaneously.");
|
||||
IndexBudget { map_size: base_map_size, index_count, task_db_size }
|
||||
}
|
||||
|
||||
fn is_good_heed(tasks_path: &Path, map_size: usize) -> bool {
|
||||
if let Ok(env) =
|
||||
heed::EnvOpenOptions::new().map_size(clamp_to_page_size(map_size)).open(tasks_path)
|
||||
{
|
||||
env.prepare_for_closing().wait();
|
||||
true
|
||||
} else {
|
||||
// We're treating all errors equally here, not only allocation errors.
|
||||
// This means there's a possiblity for the budget to lower due to errors different from allocation errors.
|
||||
// For persistent errors, this is OK as long as the task db is then reopened normally without ignoring the error this time.
|
||||
// For transient errors, this could lead to an instance with too low a budget.
|
||||
// However transient errors are: 1) less likely than persistent errors 2) likely to cause other issues down the line anyway.
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
pub fn read_txn(&self) -> Result<RoTxn> {
|
||||
self.env.read_txn().map_err(|e| e.into())
|
||||
}
|
||||
@ -459,15 +541,42 @@ impl IndexScheduler {
|
||||
///
|
||||
/// * If the index wasn't opened before, the index will be opened.
|
||||
/// * If the index doesn't exist on disk, the `IndexNotFoundError` is thrown.
|
||||
///
|
||||
/// ### Note
|
||||
///
|
||||
/// As an `Index` requires a large swath of the virtual memory address space, correct usage of an `Index` does not
|
||||
/// keep its handle for too long.
|
||||
///
|
||||
/// Some configurations also can't reasonably open multiple indexes at once.
|
||||
/// If you need to fetch information from or perform an action on all indexes,
|
||||
/// see the `try_for_each_index` function.
|
||||
pub fn index(&self, name: &str) -> Result<Index> {
|
||||
let rtxn = self.env.read_txn()?;
|
||||
self.index_mapper.index(&rtxn, name)
|
||||
}
|
||||
|
||||
/// Return and open all the indexes.
|
||||
pub fn indexes(&self) -> Result<Vec<(String, Index)>> {
|
||||
/// Return the name of all indexes without opening them.
|
||||
pub fn index_names(self) -> Result<Vec<String>> {
|
||||
let rtxn = self.env.read_txn()?;
|
||||
self.index_mapper.indexes(&rtxn)
|
||||
self.index_mapper.index_names(&rtxn)
|
||||
}
|
||||
|
||||
/// Attempts `f` for each index that exists known to the index scheduler.
|
||||
///
|
||||
/// It is preferable to use this function rather than a loop that opens all indexes, as a way to avoid having all indexes opened,
|
||||
/// which is unsupported in general.
|
||||
///
|
||||
/// Since `f` is allowed to return a result, and `Index` is cloneable, it is still possible to wrongly build e.g. a vector of
|
||||
/// all the indexes, but this function makes it harder and so less likely to do accidentally.
|
||||
///
|
||||
/// If many indexes exist, this operation can take time to complete (in the order of seconds for a 1000 of indexes) as it needs to open
|
||||
/// all the indexes.
|
||||
pub fn try_for_each_index<U, V>(&self, f: impl FnMut(&str, &Index) -> Result<U>) -> Result<V>
|
||||
where
|
||||
V: FromIterator<U>,
|
||||
{
|
||||
let rtxn = self.env.read_txn()?;
|
||||
self.index_mapper.try_for_each_index(&rtxn, f)
|
||||
}
|
||||
|
||||
/// Return the task ids matched by the given query from the index scheduler's point of view.
|
||||
@ -1109,6 +1218,16 @@ pub enum TickOutcome {
|
||||
WaitForSignal,
|
||||
}
|
||||
|
||||
/// How many indexes we can afford to have open simultaneously.
|
||||
struct IndexBudget {
|
||||
/// Map size of an index.
|
||||
map_size: usize,
|
||||
/// Maximum number of simultaneously opened indexes.
|
||||
index_count: usize,
|
||||
/// For very constrained systems we might need to reduce the base task_db_size so we can accept at least one index.
|
||||
task_db_size: usize,
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::io::{BufWriter, Seek, Write};
|
||||
@ -1154,6 +1273,8 @@ mod tests {
|
||||
let tempdir = TempDir::new().unwrap();
|
||||
let (sender, receiver) = crossbeam::channel::bounded(0);
|
||||
|
||||
let indexer_config = IndexerConfig { skip_index_budget: true, ..Default::default() };
|
||||
|
||||
let options = IndexSchedulerOptions {
|
||||
version_file_path: tempdir.path().join(VERSION_FILE_NAME),
|
||||
auth_path: tempdir.path().join("auth"),
|
||||
@ -1163,8 +1284,10 @@ mod tests {
|
||||
snapshots_path: tempdir.path().join("snapshots"),
|
||||
dumps_path: tempdir.path().join("dumps"),
|
||||
task_db_size: 1000 * 1000, // 1 MB, we don't use MiB on purpose.
|
||||
index_size: 1000 * 1000, // 1 MB, we don't use MiB on purpose.
|
||||
indexer_config: IndexerConfig::default(),
|
||||
index_base_map_size: 1000 * 1000, // 1 MB, we don't use MiB on purpose.
|
||||
index_growth_amount: 1000 * 1000, // 1 MB
|
||||
index_count: 5,
|
||||
indexer_config,
|
||||
autobatching_enabled,
|
||||
};
|
||||
|
||||
|
203
index-scheduler/src/lru.rs
Normal file
203
index-scheduler/src/lru.rs
Normal file
@ -0,0 +1,203 @@
|
||||
//! Thread-safe `Vec`-backend LRU cache using [`std::sync::atomic::AtomicU64`] for synchronization.
|
||||
|
||||
use std::sync::atomic::{AtomicU64, Ordering};
|
||||
|
||||
/// Thread-safe `Vec`-backend LRU cache
|
||||
#[derive(Debug)]
|
||||
pub struct Lru<T> {
|
||||
data: Vec<(AtomicU64, T)>,
|
||||
generation: AtomicU64,
|
||||
cap: usize,
|
||||
}
|
||||
|
||||
impl<T> Lru<T> {
|
||||
/// Creates a new LRU cache with the specified capacity.
|
||||
///
|
||||
/// The capacity is allocated up-front, and will never change through a [`Self::put`] operation.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// - If the capacity is 0.
|
||||
/// - If the capacity exceeds `isize::MAX` bytes.
|
||||
pub fn new(cap: usize) -> Self {
|
||||
assert_ne!(cap, 0, "The capacity of a cache cannot be 0");
|
||||
Self {
|
||||
// Note: since the element of the vector contains an AtomicU64, it is definitely not zero-sized so cap will never be usize::MAX.
|
||||
data: Vec::with_capacity(cap),
|
||||
generation: AtomicU64::new(0),
|
||||
cap,
|
||||
}
|
||||
}
|
||||
|
||||
/// The capacity of this LRU cache, that is the maximum number of elements it can hold before evicting elements from the cache.
|
||||
///
|
||||
/// The cache will contain at most this number of elements at any given time.
|
||||
pub fn capacity(&self) -> usize {
|
||||
self.cap
|
||||
}
|
||||
|
||||
fn next_generation(&self) -> u64 {
|
||||
// Acquire so this "happens-before" any potential store to a data cell (with Release ordering)
|
||||
let generation = self.generation.fetch_add(1, Ordering::Acquire);
|
||||
generation + 1
|
||||
}
|
||||
|
||||
fn next_generation_mut(&mut self) -> u64 {
|
||||
let generation = self.generation.get_mut();
|
||||
*generation += 1;
|
||||
*generation
|
||||
}
|
||||
|
||||
/// Add a value in the cache, evicting an older value if necessary.
|
||||
///
|
||||
/// If a value was evicted from the cache, it is returned.
|
||||
///
|
||||
/// # Complexity
|
||||
///
|
||||
/// - If the cache is full, then linear in the capacity.
|
||||
/// - Otherwise constant.
|
||||
pub fn put(&mut self, value: T) -> Option<T> {
|
||||
// no need for a memory fence: we assume that whichever mechanism provides us synchronization
|
||||
// (very probably, a RwLock) takes care of fencing for us.
|
||||
|
||||
let next_generation = self.next_generation_mut();
|
||||
let evicted = if self.is_full() { self.pop() } else { None };
|
||||
self.data.push((AtomicU64::new(next_generation), value));
|
||||
evicted
|
||||
}
|
||||
|
||||
/// Evict the oldest value from the cache.
|
||||
///
|
||||
/// If the cache is empty, `None` will be returned.
|
||||
///
|
||||
/// # Complexity
|
||||
///
|
||||
/// - Linear in the capacity of the cache.
|
||||
pub fn pop(&mut self) -> Option<T> {
|
||||
// Don't use `Iterator::min_by_key` that provides shared references to its elements,
|
||||
// so that we can get an exclusive one.
|
||||
// This allows to handles the `AtomicU64`s as normal integers without using atomic instructions.
|
||||
let mut min_generation_index = None;
|
||||
for (index, (generation, _)) in self.data.iter_mut().enumerate() {
|
||||
let generation = *generation.get_mut();
|
||||
if let Some((_, min_generation)) = min_generation_index {
|
||||
if min_generation > generation {
|
||||
min_generation_index = Some((index, generation));
|
||||
}
|
||||
} else {
|
||||
min_generation_index = Some((index, generation))
|
||||
}
|
||||
}
|
||||
min_generation_index.map(|(min_index, _)| self.data.swap_remove(min_index).1)
|
||||
}
|
||||
|
||||
/// The current number of elements in the cache.
|
||||
///
|
||||
/// This value is guaranteed to be less than or equal to [`Self::capacity`].
|
||||
pub fn len(&self) -> usize {
|
||||
self.data.len()
|
||||
}
|
||||
|
||||
/// Returns `true` if putting any additional element in the cache would cause the eviction of an element.
|
||||
pub fn is_full(&self) -> bool {
|
||||
self.len() == self.capacity()
|
||||
}
|
||||
}
|
||||
|
||||
pub struct LruMap<K, V>(Lru<(K, V)>);
|
||||
|
||||
impl<K, V> LruMap<K, V>
|
||||
where
|
||||
K: Eq,
|
||||
{
|
||||
/// Creates a new LRU cache map with the specified capacity.
|
||||
///
|
||||
/// The capacity is allocated up-front, and will never change through a [`Self::insert`] operation.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// - If the capacity is 0.
|
||||
/// - If the capacity exceeds `isize::MAX` bytes.
|
||||
pub fn new(cap: usize) -> Self {
|
||||
Self(Lru::new(cap))
|
||||
}
|
||||
|
||||
/// Gets a value in the cache map by its key.
|
||||
///
|
||||
/// If no value matches, `None` will be returned.
|
||||
///
|
||||
/// # Complexity
|
||||
///
|
||||
/// - Linear in the capacity of the cache.
|
||||
pub fn get(&self, key: &K) -> Option<&V> {
|
||||
for (generation, (candidate, value)) in self.0.data.iter() {
|
||||
if key == candidate {
|
||||
generation.store(self.0.next_generation(), Ordering::Release);
|
||||
return Some(value);
|
||||
}
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
/// Gets a value in the cache map by its key.
|
||||
///
|
||||
/// If no value matches, `None` will be returned.
|
||||
///
|
||||
/// # Complexity
|
||||
///
|
||||
/// - Linear in the capacity of the cache.
|
||||
pub fn get_mut(&mut self, key: &K) -> Option<&mut V> {
|
||||
let next_generation = self.0.next_generation_mut();
|
||||
for (generation, (candidate, value)) in self.0.data.iter_mut() {
|
||||
if key == candidate {
|
||||
*generation.get_mut() = next_generation;
|
||||
return Some(value);
|
||||
}
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
/// Inserts a value in the cache map by its key, replacing any existing value and returning any evicted value.
|
||||
///
|
||||
/// # Complexity
|
||||
///
|
||||
/// - Linear in the capacity of the cache.
|
||||
pub fn insert(&mut self, key: K, mut value: V) -> InsertionOutcome<K, V> {
|
||||
match self.get_mut(&key) {
|
||||
Some(old_value) => {
|
||||
std::mem::swap(old_value, &mut value);
|
||||
InsertionOutcome::Replaced(value)
|
||||
}
|
||||
None => match self.0.put((key, value)) {
|
||||
Some((key, value)) => InsertionOutcome::Evicted(key, value),
|
||||
None => InsertionOutcome::InsertedNew,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
/// Removes an element from the cache map by its key, returning its value.
|
||||
///
|
||||
/// Returns `None` if there was no element with this key in the cache.
|
||||
///
|
||||
/// # Complexity
|
||||
///
|
||||
/// - Linear in the capacity of the cache.
|
||||
pub fn remove(&mut self, key: &K) -> Option<V> {
|
||||
for (index, (_, (candidate, _))) in self.0.data.iter_mut().enumerate() {
|
||||
if key == candidate {
|
||||
return Some(self.0.data.swap_remove(index).1 .1);
|
||||
}
|
||||
}
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
/// The result of an insertion in a LRU map.
|
||||
pub enum InsertionOutcome<K, V> {
|
||||
/// The key was not in the cache, the key-value pair has been inserted.
|
||||
InsertedNew,
|
||||
/// The key was not in the cache and an old key-value pair was evicted from the cache to make room for its insertions.
|
||||
Evicted(K, V),
|
||||
/// The key was already in the cache map, its value has been updated.
|
||||
Replaced(V),
|
||||
}
|
@ -538,3 +538,37 @@ impl IndexScheduler {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn dichotomic_search(start_point: usize, mut is_good: impl FnMut(usize) -> bool) -> usize {
|
||||
let mut biggest_good = None;
|
||||
let mut smallest_bad = None;
|
||||
let mut current = start_point;
|
||||
loop {
|
||||
let is_good = is_good(current);
|
||||
|
||||
(biggest_good, smallest_bad, current) = match (biggest_good, smallest_bad, is_good) {
|
||||
(None, None, false) => (None, Some(current), current / 2),
|
||||
(None, None, true) => (Some(current), None, current * 2),
|
||||
(None, Some(smallest_bad), true) => {
|
||||
(Some(current), Some(smallest_bad), (current + smallest_bad) / 2)
|
||||
}
|
||||
(None, Some(_), false) => (None, Some(current), current / 2),
|
||||
(Some(_), None, true) => (Some(current), None, current * 2),
|
||||
(Some(biggest_good), None, false) => {
|
||||
(Some(biggest_good), Some(current), (biggest_good + current) / 2)
|
||||
}
|
||||
(Some(_), Some(smallest_bad), true) => {
|
||||
(Some(current), Some(smallest_bad), (smallest_bad + current) / 2)
|
||||
}
|
||||
(Some(biggest_good), Some(_), false) => {
|
||||
(Some(biggest_good), Some(current), (biggest_good + current) / 2)
|
||||
}
|
||||
};
|
||||
if current == 0 {
|
||||
return current;
|
||||
}
|
||||
if smallest_bad.is_some() && biggest_good.is_some() && biggest_good >= Some(current) {
|
||||
return current;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -292,7 +292,8 @@ impl From<Opt> for Infos {
|
||||
ScheduleSnapshot::Enabled(interval) => Some(interval),
|
||||
};
|
||||
|
||||
let IndexerOpts { max_indexing_memory, max_indexing_threads } = indexer_options;
|
||||
let IndexerOpts { max_indexing_memory, max_indexing_threads, skip_index_budget: _ } =
|
||||
indexer_options;
|
||||
|
||||
// We're going to override every sensible information.
|
||||
// We consider information sensible if it contains a path, an address, or a key.
|
||||
|
@ -45,6 +45,34 @@ use option::ScheduleSnapshot;
|
||||
|
||||
use crate::error::MeilisearchHttpError;
|
||||
|
||||
/// Default number of simultaneously opened indexes.
|
||||
///
|
||||
/// This value is used when dynamic computation of how many indexes can be opened at once was skipped (e.g., in tests).
|
||||
///
|
||||
/// Lower for Windows that dedicates a smaller virtual address space to processes.
|
||||
///
|
||||
/// The value was chosen this way:
|
||||
///
|
||||
/// - Windows provides a small virtual address space of about 10TiB to processes.
|
||||
/// - The chosen value allows for indexes to use the default map size of 2TiB safely.
|
||||
#[cfg(windows)]
|
||||
const DEFAULT_INDEX_COUNT: usize = 4;
|
||||
|
||||
/// Default number of simultaneously opened indexes.
|
||||
///
|
||||
/// This value is used when dynamic computation of how many indexes can be opened at once was skipped (e.g., in tests).
|
||||
///
|
||||
/// The higher, the better for avoiding reopening indexes.
|
||||
///
|
||||
/// The value was chosen this way:
|
||||
///
|
||||
/// - Opening an index consumes a file descriptor.
|
||||
/// - The default on many unices is about 256 file descriptors for a process.
|
||||
/// - 100 is a little bit less than half this value.
|
||||
/// - The chosen value allows for indexes to use the default map size of 2TiB safely.
|
||||
#[cfg(not(windows))]
|
||||
const DEFAULT_INDEX_COUNT: usize = 20;
|
||||
|
||||
/// Check if a db is empty. It does not provide any information on the
|
||||
/// validity of the data in it.
|
||||
/// We consider a database as non empty when it's a non empty directory.
|
||||
@ -206,9 +234,11 @@ fn open_or_create_database_unchecked(
|
||||
snapshots_path: opt.snapshot_dir.clone(),
|
||||
dumps_path: opt.dump_dir.clone(),
|
||||
task_db_size: opt.max_task_db_size.get_bytes() as usize,
|
||||
index_size: opt.max_index_size.get_bytes() as usize,
|
||||
index_base_map_size: opt.max_index_size.get_bytes() as usize,
|
||||
indexer_config: (&opt.indexer_options).try_into()?,
|
||||
autobatching_enabled: true,
|
||||
index_growth_amount: byte_unit::Byte::from_str("10GiB").unwrap().get_bytes() as usize,
|
||||
index_count: DEFAULT_INDEX_COUNT,
|
||||
})?)
|
||||
};
|
||||
|
||||
|
@ -65,11 +65,11 @@ const MEILI_MAX_INDEXING_THREADS: &str = "MEILI_MAX_INDEXING_THREADS";
|
||||
const DEFAULT_LOG_EVERY_N: usize = 100_000;
|
||||
|
||||
// Each environment (index and task-db) is taking space in the virtual address space.
|
||||
//
|
||||
// The size of the virtual address space is limited by the OS. About 100TB for Linux and about 10TB for Windows.
|
||||
// This means that the number of indexes is limited to about 200 for Linux and about 20 for Windows.
|
||||
pub const INDEX_SIZE: u64 = 536_870_912_000; // 500 GiB
|
||||
pub const TASK_DB_SIZE: u64 = 10_737_418_240; // 10 GiB
|
||||
// Ideally, indexes can occupy 2TiB each to avoid having to manually resize them.
|
||||
// The actual size of the virtual address space is computed at startup to determine how many 2TiB indexes can be
|
||||
// opened simultaneously.
|
||||
pub const INDEX_SIZE: u64 = 2 * 1024 * 1024 * 1024 * 1024; // 2 TiB
|
||||
pub const TASK_DB_SIZE: u64 = 10 * 1024 * 1024 * 1024; // 10 GiB
|
||||
|
||||
#[derive(Debug, Default, Clone, Copy, Serialize, Deserialize)]
|
||||
#[serde(rename_all = "UPPERCASE")]
|
||||
@ -494,12 +494,21 @@ pub struct IndexerOpts {
|
||||
#[clap(long, env = MEILI_MAX_INDEXING_THREADS, default_value_t)]
|
||||
#[serde(default)]
|
||||
pub max_indexing_threads: MaxThreads,
|
||||
|
||||
/// Whether or not we want to determine the budget of virtual memory address space we have available dynamically
|
||||
/// (the default), or statically.
|
||||
///
|
||||
/// Determining the budget of virtual memory address space dynamically takes some time on some systems (such as macOS)
|
||||
/// and may make tests non-deterministic, so we want to skip it in tests.
|
||||
#[clap(skip)]
|
||||
#[serde(skip)]
|
||||
pub skip_index_budget: bool,
|
||||
}
|
||||
|
||||
impl IndexerOpts {
|
||||
/// Exports the values to their corresponding env vars if they are not set.
|
||||
pub fn export_to_env(self) {
|
||||
let IndexerOpts { max_indexing_memory, max_indexing_threads } = self;
|
||||
let IndexerOpts { max_indexing_memory, max_indexing_threads, skip_index_budget: _ } = self;
|
||||
if let Some(max_indexing_memory) = max_indexing_memory.0 {
|
||||
export_to_env_if_not_present(
|
||||
MEILI_MAX_INDEXING_MEMORY,
|
||||
@ -527,6 +536,7 @@ impl TryFrom<&IndexerOpts> for IndexerConfig {
|
||||
max_memory: other.max_indexing_memory.map(|b| b.get_bytes() as usize),
|
||||
thread_pool: Some(thread_pool),
|
||||
max_positions_per_attributes: None,
|
||||
skip_index_budget: other.skip_index_budget,
|
||||
..Default::default()
|
||||
})
|
||||
}
|
||||
|
@ -61,6 +61,8 @@ pub struct IndexView {
|
||||
|
||||
impl IndexView {
|
||||
fn new(uid: String, index: &Index) -> Result<IndexView, milli::Error> {
|
||||
// It is important that this function does not keep the Index handle or a clone of it, because
|
||||
// `list_indexes` relies on this property to avoid opening all indexes at once.
|
||||
let rtxn = index.read_txn()?;
|
||||
Ok(IndexView {
|
||||
uid,
|
||||
@ -90,13 +92,15 @@ pub async fn list_indexes(
|
||||
paginate: AwebQueryParameter<ListIndexes, DeserrQueryParamError>,
|
||||
) -> Result<HttpResponse, ResponseError> {
|
||||
let filters = index_scheduler.filters();
|
||||
let indexes: Vec<_> = index_scheduler.indexes()?;
|
||||
let indexes = indexes
|
||||
.into_iter()
|
||||
.filter(|(name, _)| filters.is_index_authorized(name))
|
||||
.map(|(name, index)| IndexView::new(name, &index))
|
||||
.collect::<Result<Vec<_>, _>>()?;
|
||||
|
||||
let indexes: Vec<Option<IndexView>> =
|
||||
index_scheduler.try_for_each_index(|uid, index| -> Result<Option<IndexView>, _> {
|
||||
if !filters.is_index_authorized(uid) {
|
||||
return Ok(None);
|
||||
}
|
||||
Ok(Some(IndexView::new(uid.to_string(), index)?))
|
||||
})?;
|
||||
// Won't cause to open all indexes because IndexView doesn't keep the `Index` opened.
|
||||
let indexes: Vec<IndexView> = indexes.into_iter().flatten().collect();
|
||||
let ret = paginate.as_pagination().auto_paginate_sized(indexes.into_iter());
|
||||
|
||||
debug!("returns: {:?}", ret);
|
||||
|
@ -261,9 +261,9 @@ pub fn create_all_stats(
|
||||
)?;
|
||||
// accumulate the size of each indexes
|
||||
let processing_index = processing_task.first().and_then(|task| task.index_uid());
|
||||
for (name, index) in index_scheduler.indexes()? {
|
||||
if !filters.is_index_authorized(&name) {
|
||||
continue;
|
||||
index_scheduler.try_for_each_index(|name, index| {
|
||||
if !filters.is_index_authorized(name) {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
database_size += index.on_disk_size()?;
|
||||
@ -278,8 +278,9 @@ pub fn create_all_stats(
|
||||
let updated_at = index.updated_at(&rtxn)?;
|
||||
last_task = last_task.map_or(Some(updated_at), |last| Some(last.max(updated_at)));
|
||||
|
||||
indexes.insert(name, stats);
|
||||
}
|
||||
indexes.insert(name.to_string(), stats);
|
||||
Ok(())
|
||||
})?;
|
||||
|
||||
database_size += index_scheduler.size()?;
|
||||
database_size += auth_controller.size()?;
|
||||
|
@ -205,6 +205,7 @@ pub fn default_settings(dir: impl AsRef<Path>) -> Opt {
|
||||
indexer_options: IndexerOpts {
|
||||
// memory has to be unlimited because several meilisearch are running in test context.
|
||||
max_indexing_memory: MaxMemory::unlimited(),
|
||||
skip_index_budget: true,
|
||||
..Parser::parse_from(None as Option<&str>)
|
||||
},
|
||||
#[cfg(feature = "metrics")]
|
||||
|
@ -11,6 +11,7 @@ pub struct IndexerConfig {
|
||||
pub chunk_compression_level: Option<u32>,
|
||||
pub thread_pool: Option<ThreadPool>,
|
||||
pub max_positions_per_attributes: Option<u32>,
|
||||
pub skip_index_budget: bool,
|
||||
}
|
||||
|
||||
impl Default for IndexerConfig {
|
||||
@ -24,6 +25,7 @@ impl Default for IndexerConfig {
|
||||
chunk_compression_level: None,
|
||||
thread_pool: None,
|
||||
max_positions_per_attributes: None,
|
||||
skip_index_budget: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user