From a529bf160c08c6ca267005f6f3b0ad1e3e5ae623 Mon Sep 17 00:00:00 2001 From: Louis Dureuil Date: Wed, 15 Feb 2023 12:30:46 +0100 Subject: [PATCH] Compute budget --- index-scheduler/src/lib.rs | 84 ++++++++++++++++++++++++++++++++++++-- 1 file changed, 80 insertions(+), 4 deletions(-) diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs index 24a25a2aa..fbf172b20 100644 --- a/index-scheduler/src/lib.rs +++ b/index-scheduler/src/lib.rs @@ -32,7 +32,7 @@ pub type Result = std::result::Result; pub type TaskId = u32; use std::ops::{Bound, RangeBounds}; -use std::path::PathBuf; +use std::path::{Path, PathBuf}; use std::sync::atomic::AtomicBool; use std::sync::atomic::Ordering::Relaxed; use std::sync::{Arc, RwLock}; @@ -365,9 +365,16 @@ impl IndexScheduler { std::fs::create_dir_all(&options.indexes_path)?; std::fs::create_dir_all(&options.dumps_path)?; + let task_db_size = clamp_to_page_size(options.task_db_size); + let budget = IndexBudget { + map_size: options.index_base_map_size, + index_count: options.index_count, + task_db_size, + }; + let env = heed::EnvOpenOptions::new() .max_dbs(10) - .map_size(clamp_to_page_size(options.task_db_size)) + .map_size(budget.task_db_size) .open(options.tasks_path)?; let file_store = FileStore::new(&options.update_file_path)?; @@ -387,9 +394,9 @@ impl IndexScheduler { index_mapper: IndexMapper::new( &env, options.indexes_path, - options.index_base_map_size, + budget.map_size, options.index_growth_amount, - options.index_count, + budget.index_count, options.indexer_config, )?, env, @@ -413,6 +420,65 @@ impl IndexScheduler { Ok(this) } + fn index_budget( + tasks_path: &Path, + base_map_size: usize, + mut task_db_size: usize, + max_index_count: usize, + ) -> IndexBudget { + let budget = utils::dichotomic_search(base_map_size, |map_size| { + Self::is_good_heed(tasks_path, map_size) + }); + + log::debug!("memmap budget: {budget}B"); + let mut budget = budget / 2; + if task_db_size > (budget / 2) { + task_db_size = clamp_to_page_size(budget * 2 / 5); + log::debug!( + "Decreasing max size of task DB to {task_db_size}B due to constrained memory space" + ); + } + budget -= task_db_size; + + // won't be mutated again + let budget = budget; + let task_db_size = task_db_size; + + log::debug!("index budget: {budget}B"); + let mut index_count = budget / base_map_size; + if index_count < 2 { + // take a bit less than half than the budget to make sure we can always afford to open an index + let map_size = (budget * 2) / 5; + // single index of max budget + log::debug!("1 index of {map_size}B can be opened simultaneously."); + return IndexBudget { map_size, index_count: 1, task_db_size }; + } + // give us some space for an additional index when the cache is already full + // decrement is OK because index_count >= 2. + index_count -= 1; + if index_count > max_index_count { + index_count = max_index_count; + } + log::debug!("Up to {index_count} indexes of {base_map_size}B opened simultaneously."); + IndexBudget { map_size: base_map_size, index_count, task_db_size } + } + + fn is_good_heed(tasks_path: &Path, map_size: usize) -> bool { + if let Ok(env) = + heed::EnvOpenOptions::new().map_size(clamp_to_page_size(map_size)).open(tasks_path) + { + env.prepare_for_closing().wait(); + true + } else { + // We're treating all errors equally here, not only allocation errors. + // This means there's a possiblity for the budget to lower due to errors different from allocation errors. + // For persistent errors, this is OK as long as the task db is then reopened normally without ignoring the error this time. + // For transient errors, this could lead to an instance with too low a budget. + // However transient errors are: 1) less likely than persistent errors 2) likely to cause other issues down the line anyway. + false + } + } + pub fn read_txn(&self) -> Result { self.env.read_txn().map_err(|e| e.into()) } @@ -1116,6 +1182,16 @@ pub enum TickOutcome { WaitForSignal, } +/// How many indexes we can afford to have open simultaneously. +struct IndexBudget { + /// Map size of an index. + map_size: usize, + /// Maximum number of simultaneously opened indexes. + index_count: usize, + /// For very constrained systems we might need to reduce the base task_db_size so we can accept at least one index. + task_db_size: usize, +} + #[cfg(test)] mod tests { use std::io::{BufWriter, Seek, Write};