diff --git a/index-scheduler/src/index_mapper/index_map.rs b/index-scheduler/src/index_mapper/index_map.rs index a24213558..5cdadcd3b 100644 --- a/index-scheduler/src/index_mapper/index_map.rs +++ b/index-scheduler/src/index_mapper/index_map.rs @@ -295,6 +295,11 @@ impl IndexMap { "Attempt to finish deletion of an index that was being closed" ); } + + /// Returns the indexes that were opened by the `IndexMap`. + pub fn clear(&mut self) -> Vec { + self.available.clear().into_iter().map(|(_, (_, index))| index).collect() + } } /// Create or open an index in the specified path. diff --git a/index-scheduler/src/index_mapper/mod.rs b/index-scheduler/src/index_mapper/mod.rs index 7850966cf..00bf023b1 100644 --- a/index-scheduler/src/index_mapper/mod.rs +++ b/index-scheduler/src/index_mapper/mod.rs @@ -428,6 +428,11 @@ impl IndexMapper { Ok(()) } + /// Returns the indexes that were opened by the `IndexMapper`. + pub fn clear(&mut self) -> Vec { + self.index_map.write().unwrap().clear() + } + /// The stats of an index. /// /// If available in the cache, they are directly returned. diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs index adbc06763..c5734dcfb 100644 --- a/index-scheduler/src/lib.rs +++ b/index-scheduler/src/lib.rs @@ -294,92 +294,13 @@ impl IndexScheduler { #[cfg(test)] test_breakpoint_sdr: crossbeam::channel::Sender<(Breakpoint, bool)>, #[cfg(test)] planned_failures: Vec<(usize, tests::FailureLocation)>, ) -> Result { - std::fs::create_dir_all(&options.tasks_path)?; - std::fs::create_dir_all(&options.update_file_path)?; - std::fs::create_dir_all(&options.indexes_path)?; - std::fs::create_dir_all(&options.dumps_path)?; - - if cfg!(windows) && options.enable_mdb_writemap { - // programmer error if this happens: in normal use passing the option on Windows is an error in main - panic!("Windows doesn't support the MDB_WRITEMAP LMDB option"); - } - - let task_db_size = clamp_to_page_size(options.task_db_size); - let budget = if options.indexer_config.skip_index_budget { - IndexBudget { - map_size: options.index_base_map_size, - index_count: options.index_count, - task_db_size, - } - } else { - Self::index_budget( - &options.tasks_path, - options.index_base_map_size, - task_db_size, - options.index_count, - ) - }; - - let env = heed::EnvOpenOptions::new() - .max_dbs(11) - .map_size(budget.task_db_size) - .open(options.tasks_path)?; - - let features = features::FeatureData::new(&env, options.instance_features)?; - - let file_store = FileStore::new(&options.update_file_path)?; - - let mut wtxn = env.write_txn()?; - let all_tasks = env.create_database(&mut wtxn, Some(db_name::ALL_TASKS))?; - let status = env.create_database(&mut wtxn, Some(db_name::STATUS))?; - let kind = env.create_database(&mut wtxn, Some(db_name::KIND))?; - let index_tasks = env.create_database(&mut wtxn, Some(db_name::INDEX_TASKS))?; - let canceled_by = env.create_database(&mut wtxn, Some(db_name::CANCELED_BY))?; - let enqueued_at = env.create_database(&mut wtxn, Some(db_name::ENQUEUED_AT))?; - let started_at = env.create_database(&mut wtxn, Some(db_name::STARTED_AT))?; - let finished_at = env.create_database(&mut wtxn, Some(db_name::FINISHED_AT))?; - wtxn.commit()?; - - // allow unreachable_code to get rids of the warning in the case of a test build. - let inner = IndexSchedulerInner { - must_stop_processing: MustStopProcessing::default(), - processing_tasks: Arc::new(RwLock::new(ProcessingTasks::new())), - file_store, - all_tasks, - status, - kind, - index_tasks, - canceled_by, - enqueued_at, - started_at, - finished_at, - index_mapper: IndexMapper::new( - &env, - options.indexes_path, - budget.map_size, - options.index_growth_amount, - budget.index_count, - options.enable_mdb_writemap, - options.indexer_config, - )?, - env, - // we want to start the loop right away in case meilisearch was ctrl+Ced while processing things - wake_up: Arc::new(SignalEvent::auto(true)), - autobatching_enabled: options.autobatching_enabled, - max_number_of_tasks: options.max_number_of_tasks, - dumps_path: options.dumps_path, - snapshots_path: options.snapshots_path, - auth_path: options.auth_path, - version_file_path: options.version_file_path, - zookeeper: options.zookeeper, + let inner = IndexSchedulerInner::new( + options, #[cfg(test)] test_breakpoint_sdr, #[cfg(test)] planned_failures, - #[cfg(test)] - run_loop_iteration: Arc::new(RwLock::new(0)), - features, - }; + )?; // initialize the directories we need to process batches. if let Some(zookeeper) = &inner.zookeeper { @@ -428,75 +349,6 @@ impl IndexScheduler { Ok(()) } - fn index_budget( - tasks_path: &Path, - base_map_size: usize, - mut task_db_size: usize, - max_index_count: usize, - ) -> IndexBudget { - #[cfg(windows)] - const DEFAULT_BUDGET: usize = 6 * 1024 * 1024 * 1024 * 1024; // 6 TiB, 1 index - #[cfg(not(windows))] - const DEFAULT_BUDGET: usize = 80 * 1024 * 1024 * 1024 * 1024; // 80 TiB, 18 indexes - - let budget = if Self::is_good_heed(tasks_path, DEFAULT_BUDGET) { - DEFAULT_BUDGET - } else { - log::debug!("determining budget with dichotomic search"); - utils::dichotomic_search(DEFAULT_BUDGET / 2, |map_size| { - Self::is_good_heed(tasks_path, map_size) - }) - }; - - log::debug!("memmap budget: {budget}B"); - let mut budget = budget / 2; - if task_db_size > (budget / 2) { - task_db_size = clamp_to_page_size(budget * 2 / 5); - log::debug!( - "Decreasing max size of task DB to {task_db_size}B due to constrained memory space" - ); - } - budget -= task_db_size; - - // won't be mutated again - let budget = budget; - let task_db_size = task_db_size; - - log::debug!("index budget: {budget}B"); - let mut index_count = budget / base_map_size; - if index_count < 2 { - // take a bit less than half than the budget to make sure we can always afford to open an index - let map_size = (budget * 2) / 5; - // single index of max budget - log::debug!("1 index of {map_size}B can be opened simultaneously."); - return IndexBudget { map_size, index_count: 1, task_db_size }; - } - // give us some space for an additional index when the cache is already full - // decrement is OK because index_count >= 2. - index_count -= 1; - if index_count > max_index_count { - index_count = max_index_count; - } - log::debug!("Up to {index_count} indexes of {base_map_size}B opened simultaneously."); - IndexBudget { map_size: base_map_size, index_count, task_db_size } - } - - fn is_good_heed(tasks_path: &Path, map_size: usize) -> bool { - if let Ok(env) = - heed::EnvOpenOptions::new().map_size(clamp_to_page_size(map_size)).open(tasks_path) - { - env.prepare_for_closing().wait(); - true - } else { - // We're treating all errors equally here, not only allocation errors. - // This means there's a possiblity for the budget to lower due to errors different from allocation errors. - // For persistent errors, this is OK as long as the task db is then reopened normally without ignoring the error this time. - // For transient errors, this could lead to an instance with too low a budget. - // However transient errors are: 1) less likely than persistent errors 2) likely to cause other issues down the line anyway. - false - } - } - /// Start the run loop for the given index scheduler. /// /// This function will execute in a different thread and must be called @@ -534,8 +386,6 @@ impl IndexScheduler { zookeeper .add_watch("/snapshots", AddWatchMode::PersistentRecursive, move |event| { if !latchc.has_leadership() { - let inner = this.inner(); - let WatchedEvent { event_type, path, keeper_state: _ } = event; match event_type { WatchedEventType::NodeCreated => { @@ -553,24 +403,32 @@ impl IndexScheduler { snapshot_id )); + let inner = this.inner(); + // 1. TODO: Ensure the snapshot version file is the same as our version. // 2. Download all the databases + let base_path = { + let mut path = inner.env.path().to_path_buf(); + assert!(path.pop()); + path + }; + let tasks_file = tempfile::NamedTempFile::new_in(inner.env.path()).unwrap(); log::info!("Downloading the index scheduler database."); let tasks_snapshot = snapshot_dir.join("tasks.mdb"); - std::fs::copy(tasks_snapshot, tasks_file).unwrap(); + std::fs::copy(&tasks_snapshot, tasks_file).unwrap(); log::info!("Downloading the indexes databases"); let indexes_files = tempfile::TempDir::new_in(&inner.index_mapper.base_path) .unwrap(); - let mut indexes = Vec::new(); - let dst = snapshot_dir.join("indexes"); - for result in std::fs::read_dir(&dst).unwrap() { + let mut indexes = Vec::new(); + let src = snapshot_dir.join("indexes"); + for result in std::fs::read_dir(&src).unwrap() { let entry = result.unwrap(); let uuid = entry .file_name() @@ -580,7 +438,7 @@ impl IndexScheduler { .to_string(); log::info!("\tDownloading the index {}", uuid.to_string()); std::fs::copy( - dst.join(&uuid), + src.join(&uuid), indexes_files.path().join(&uuid), ) .unwrap(); @@ -588,7 +446,28 @@ impl IndexScheduler { } // 3. Lock the index-mapper and close all the env - // TODO: continue here + drop(inner); + let mut lock = this.inner.write(); + let mut raw_inner = lock.take().unwrap(); + + // Wait for others to finish what they started + let indexes = raw_inner.index_mapper.clear(); + let mut pfcs: Vec<_> = indexes + .into_iter() + .map(|index| index.prepare_for_closing()) + .collect(); + pfcs.push(raw_inner.env.prepare_for_closing()); + pfcs.into_iter().for_each(|pfc| pfc.wait()); + + // Let's replace all the folders/files. + std::fs::rename(&tasks_snapshot, base_path.join("tasks")) + .unwrap(); + std::fs::rename(indexes_files, base_path.join("indexes")) + .unwrap(); + + // let inner = IndexSchedulerInner::new(); + + *lock = Some(todo!()); // run.env.close(); @@ -1126,6 +1005,168 @@ pub struct IndexSchedulerInner { } impl IndexSchedulerInner { + fn new( + options: IndexSchedulerOptions, + #[cfg(test)] test_breakpoint_sdr: crossbeam::channel::Sender<(Breakpoint, bool)>, + #[cfg(test)] planned_failures: Vec<(usize, tests::FailureLocation)>, + ) -> Result { + std::fs::create_dir_all(&options.tasks_path)?; + std::fs::create_dir_all(&options.update_file_path)?; + std::fs::create_dir_all(&options.indexes_path)?; + std::fs::create_dir_all(&options.dumps_path)?; + + if cfg!(windows) && options.enable_mdb_writemap { + // programmer error if this happens: in normal use passing the option on Windows is an error in main + panic!("Windows doesn't support the MDB_WRITEMAP LMDB option"); + } + + let task_db_size = clamp_to_page_size(options.task_db_size); + let budget = if options.indexer_config.skip_index_budget { + IndexBudget { + map_size: options.index_base_map_size, + index_count: options.index_count, + task_db_size, + } + } else { + Self::index_budget( + &options.tasks_path, + options.index_base_map_size, + task_db_size, + options.index_count, + ) + }; + + let env = heed::EnvOpenOptions::new() + .max_dbs(11) + .map_size(budget.task_db_size) + .open(options.tasks_path)?; + + let features = features::FeatureData::new(&env, options.instance_features)?; + + let file_store = FileStore::new(&options.update_file_path)?; + + let mut wtxn = env.write_txn()?; + let all_tasks = env.create_database(&mut wtxn, Some(db_name::ALL_TASKS))?; + let status = env.create_database(&mut wtxn, Some(db_name::STATUS))?; + let kind = env.create_database(&mut wtxn, Some(db_name::KIND))?; + let index_tasks = env.create_database(&mut wtxn, Some(db_name::INDEX_TASKS))?; + let canceled_by = env.create_database(&mut wtxn, Some(db_name::CANCELED_BY))?; + let enqueued_at = env.create_database(&mut wtxn, Some(db_name::ENQUEUED_AT))?; + let started_at = env.create_database(&mut wtxn, Some(db_name::STARTED_AT))?; + let finished_at = env.create_database(&mut wtxn, Some(db_name::FINISHED_AT))?; + wtxn.commit()?; + + // allow unreachable_code to get rids of the warning in the case of a test build. + Ok(IndexSchedulerInner { + must_stop_processing: MustStopProcessing::default(), + processing_tasks: Arc::new(RwLock::new(ProcessingTasks::new())), + file_store, + all_tasks, + status, + kind, + index_tasks, + canceled_by, + enqueued_at, + started_at, + finished_at, + index_mapper: IndexMapper::new( + &env, + options.indexes_path, + budget.map_size, + options.index_growth_amount, + budget.index_count, + options.enable_mdb_writemap, + options.indexer_config, + )?, + env, + // we want to start the loop right away in case meilisearch was ctrl+Ced while processing things + wake_up: Arc::new(SignalEvent::auto(true)), + autobatching_enabled: options.autobatching_enabled, + max_number_of_tasks: options.max_number_of_tasks, + dumps_path: options.dumps_path, + snapshots_path: options.snapshots_path, + auth_path: options.auth_path, + version_file_path: options.version_file_path, + zookeeper: options.zookeeper, + #[cfg(test)] + test_breakpoint_sdr, + #[cfg(test)] + planned_failures, + #[cfg(test)] + run_loop_iteration: Arc::new(RwLock::new(0)), + features, + }) + } + + fn index_budget( + tasks_path: &Path, + base_map_size: usize, + mut task_db_size: usize, + max_index_count: usize, + ) -> IndexBudget { + #[cfg(windows)] + const DEFAULT_BUDGET: usize = 6 * 1024 * 1024 * 1024 * 1024; // 6 TiB, 1 index + #[cfg(not(windows))] + const DEFAULT_BUDGET: usize = 80 * 1024 * 1024 * 1024 * 1024; // 80 TiB, 18 indexes + + let budget = if Self::is_good_heed(tasks_path, DEFAULT_BUDGET) { + DEFAULT_BUDGET + } else { + log::debug!("determining budget with dichotomic search"); + utils::dichotomic_search(DEFAULT_BUDGET / 2, |map_size| { + Self::is_good_heed(tasks_path, map_size) + }) + }; + + log::debug!("memmap budget: {budget}B"); + let mut budget = budget / 2; + if task_db_size > (budget / 2) { + task_db_size = clamp_to_page_size(budget * 2 / 5); + log::debug!( + "Decreasing max size of task DB to {task_db_size}B due to constrained memory space" + ); + } + budget -= task_db_size; + + // won't be mutated again + let budget = budget; + let task_db_size = task_db_size; + + log::debug!("index budget: {budget}B"); + let mut index_count = budget / base_map_size; + if index_count < 2 { + // take a bit less than half than the budget to make sure we can always afford to open an index + let map_size = (budget * 2) / 5; + // single index of max budget + log::debug!("1 index of {map_size}B can be opened simultaneously."); + return IndexBudget { map_size, index_count: 1, task_db_size }; + } + // give us some space for an additional index when the cache is already full + // decrement is OK because index_count >= 2. + index_count -= 1; + if index_count > max_index_count { + index_count = max_index_count; + } + log::debug!("Up to {index_count} indexes of {base_map_size}B opened simultaneously."); + IndexBudget { map_size: base_map_size, index_count, task_db_size } + } + + fn is_good_heed(tasks_path: &Path, map_size: usize) -> bool { + if let Ok(env) = + heed::EnvOpenOptions::new().map_size(clamp_to_page_size(map_size)).open(tasks_path) + { + env.prepare_for_closing().wait(); + true + } else { + // We're treating all errors equally here, not only allocation errors. + // This means there's a possiblity for the budget to lower due to errors different from allocation errors. + // For persistent errors, this is OK as long as the task db is then reopened normally without ignoring the error this time. + // For transient errors, this could lead to an instance with too low a budget. + // However transient errors are: 1) less likely than persistent errors 2) likely to cause other issues down the line anyway. + false + } + } + pub fn read_txn(&self) -> Result { self.env.read_txn().map_err(|e| e.into()) } diff --git a/index-scheduler/src/lru.rs b/index-scheduler/src/lru.rs index 370ff5fe1..0ea321d4b 100644 --- a/index-scheduler/src/lru.rs +++ b/index-scheduler/src/lru.rs @@ -1,5 +1,6 @@ //! Thread-safe `Vec`-backend LRU cache using [`std::sync::atomic::AtomicU64`] for synchronization. +use std::mem; use std::sync::atomic::{AtomicU64, Ordering}; /// Thread-safe `Vec`-backend LRU cache @@ -190,6 +191,11 @@ where } None } + + /// Returns the generation associated to the key and values of the `LruMap`. + pub fn clear(&mut self) -> Vec<(AtomicU64, (K, V))> { + mem::take(&mut self.0.data) + } } /// The result of an insertion in a LRU map.