WIP making the final snapshot swap

Clément Renault 2023-08-31 15:56:42 +02:00
parent d7233ecdb8
commit 0c68b9ed4c
No known key found for this signature in database
GPG Key ID: F250A4C4E3AE5F5F
4 changed files with 216 additions and 159 deletions

@@ -295,6 +295,11 @@ impl IndexMap {
             "Attempt to finish deletion of an index that was being closed"
         );
     }
+
+    /// Returns the indexes that were opened by the `IndexMap`.
+    pub fn clear(&mut self) -> Vec<Index> {
+        self.available.clear().into_iter().map(|(_, (_, index))| index).collect()
+    }
 }

 /// Create or open an index in the specified path.

@@ -428,6 +428,11 @@ impl IndexMapper {
         Ok(())
     }

+    /// Returns the indexes that were opened by the `IndexMapper`.
+    pub fn clear(&mut self) -> Vec<Index> {
+        self.index_map.write().unwrap().clear()
+    }
+
     /// The stats of an index.
     ///
     /// If available in the cache, they are directly returned.
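
Together with the `LruMap::clear` added at the bottom of this commit, these two `clear` methods let a caller drain every `Index` still cached by the mapper and then wait for the underlying LMDB environments to shut down before touching the files on disk. A minimal sketch of that intended call pattern, written against the crate's own `IndexMapper`/`Index` types (the helper name is illustrative, not part of the commit):

// Hypothetical helper, not part of the diff: drain the mapper and block until
// every index environment is fully closed so the on-disk files can be replaced.
fn close_all_indexes(index_mapper: &mut IndexMapper) {
    // `clear` empties the LRU cache and returns every `Index` that was open.
    let indexes = index_mapper.clear();

    // Ask each environment to close and keep the closing events around.
    let closing_events: Vec<_> =
        indexes.into_iter().map(|index| index.prepare_for_closing()).collect();

    // Only after every event has fired is it safe to rename or delete the files.
    closing_events.into_iter().for_each(|event| event.wait());
}

This is essentially the sequence the zookeeper watcher performs in the snapshot-swap hunk further down, via `raw_inner.index_mapper.clear()` followed by `prepare_for_closing()` and `wait()`.
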

@@ -294,92 +294,13 @@ impl IndexScheduler {
         #[cfg(test)] test_breakpoint_sdr: crossbeam::channel::Sender<(Breakpoint, bool)>,
         #[cfg(test)] planned_failures: Vec<(usize, tests::FailureLocation)>,
     ) -> Result<Self> {
-        std::fs::create_dir_all(&options.tasks_path)?;
-        std::fs::create_dir_all(&options.update_file_path)?;
-        std::fs::create_dir_all(&options.indexes_path)?;
-        std::fs::create_dir_all(&options.dumps_path)?;
-
-        if cfg!(windows) && options.enable_mdb_writemap {
-            // programmer error if this happens: in normal use passing the option on Windows is an error in main
-            panic!("Windows doesn't support the MDB_WRITEMAP LMDB option");
-        }
-
-        let task_db_size = clamp_to_page_size(options.task_db_size);
-        let budget = if options.indexer_config.skip_index_budget {
-            IndexBudget {
-                map_size: options.index_base_map_size,
-                index_count: options.index_count,
-                task_db_size,
-            }
-        } else {
-            Self::index_budget(
-                &options.tasks_path,
-                options.index_base_map_size,
-                task_db_size,
-                options.index_count,
-            )
-        };
-
-        let env = heed::EnvOpenOptions::new()
-            .max_dbs(11)
-            .map_size(budget.task_db_size)
-            .open(options.tasks_path)?;
-
-        let features = features::FeatureData::new(&env, options.instance_features)?;
-
-        let file_store = FileStore::new(&options.update_file_path)?;
-
-        let mut wtxn = env.write_txn()?;
-        let all_tasks = env.create_database(&mut wtxn, Some(db_name::ALL_TASKS))?;
-        let status = env.create_database(&mut wtxn, Some(db_name::STATUS))?;
-        let kind = env.create_database(&mut wtxn, Some(db_name::KIND))?;
-        let index_tasks = env.create_database(&mut wtxn, Some(db_name::INDEX_TASKS))?;
-        let canceled_by = env.create_database(&mut wtxn, Some(db_name::CANCELED_BY))?;
-        let enqueued_at = env.create_database(&mut wtxn, Some(db_name::ENQUEUED_AT))?;
-        let started_at = env.create_database(&mut wtxn, Some(db_name::STARTED_AT))?;
-        let finished_at = env.create_database(&mut wtxn, Some(db_name::FINISHED_AT))?;
-        wtxn.commit()?;
-
-        // allow unreachable_code to get rids of the warning in the case of a test build.
-        let inner = IndexSchedulerInner {
-            must_stop_processing: MustStopProcessing::default(),
-            processing_tasks: Arc::new(RwLock::new(ProcessingTasks::new())),
-            file_store,
-            all_tasks,
-            status,
-            kind,
-            index_tasks,
-            canceled_by,
-            enqueued_at,
-            started_at,
-            finished_at,
-            index_mapper: IndexMapper::new(
-                &env,
-                options.indexes_path,
-                budget.map_size,
-                options.index_growth_amount,
-                budget.index_count,
-                options.enable_mdb_writemap,
-                options.indexer_config,
-            )?,
-            env,
-            // we want to start the loop right away in case meilisearch was ctrl+Ced while processing things
-            wake_up: Arc::new(SignalEvent::auto(true)),
-            autobatching_enabled: options.autobatching_enabled,
-            max_number_of_tasks: options.max_number_of_tasks,
-            dumps_path: options.dumps_path,
-            snapshots_path: options.snapshots_path,
-            auth_path: options.auth_path,
-            version_file_path: options.version_file_path,
-            zookeeper: options.zookeeper,
-            #[cfg(test)]
-            test_breakpoint_sdr,
-            #[cfg(test)]
-            planned_failures,
-            #[cfg(test)]
-            run_loop_iteration: Arc::new(RwLock::new(0)),
-            features,
-        };
+        let inner = IndexSchedulerInner::new(
+            options,
+            #[cfg(test)]
+            test_breakpoint_sdr,
+            #[cfg(test)]
+            planned_failures,
+        )?;

         // initialize the directories we need to process batches.
         if let Some(zookeeper) = &inner.zookeeper {
@@ -428,75 +349,6 @@ impl IndexScheduler {
         Ok(())
     }

-    fn index_budget(
-        tasks_path: &Path,
-        base_map_size: usize,
-        mut task_db_size: usize,
-        max_index_count: usize,
-    ) -> IndexBudget {
-        #[cfg(windows)]
-        const DEFAULT_BUDGET: usize = 6 * 1024 * 1024 * 1024 * 1024; // 6 TiB, 1 index
-        #[cfg(not(windows))]
-        const DEFAULT_BUDGET: usize = 80 * 1024 * 1024 * 1024 * 1024; // 80 TiB, 18 indexes
-
-        let budget = if Self::is_good_heed(tasks_path, DEFAULT_BUDGET) {
-            DEFAULT_BUDGET
-        } else {
-            log::debug!("determining budget with dichotomic search");
-            utils::dichotomic_search(DEFAULT_BUDGET / 2, |map_size| {
-                Self::is_good_heed(tasks_path, map_size)
-            })
-        };
-
-        log::debug!("memmap budget: {budget}B");
-        let mut budget = budget / 2;
-        if task_db_size > (budget / 2) {
-            task_db_size = clamp_to_page_size(budget * 2 / 5);
-            log::debug!(
-                "Decreasing max size of task DB to {task_db_size}B due to constrained memory space"
-            );
-        }
-        budget -= task_db_size;
-
-        // won't be mutated again
-        let budget = budget;
-        let task_db_size = task_db_size;
-
-        log::debug!("index budget: {budget}B");
-        let mut index_count = budget / base_map_size;
-        if index_count < 2 {
-            // take a bit less than half than the budget to make sure we can always afford to open an index
-            let map_size = (budget * 2) / 5;
-            // single index of max budget
-            log::debug!("1 index of {map_size}B can be opened simultaneously.");
-            return IndexBudget { map_size, index_count: 1, task_db_size };
-        }
-
-        // give us some space for an additional index when the cache is already full
-        // decrement is OK because index_count >= 2.
-        index_count -= 1;
-        if index_count > max_index_count {
-            index_count = max_index_count;
-        }
-
-        log::debug!("Up to {index_count} indexes of {base_map_size}B opened simultaneously.");
-        IndexBudget { map_size: base_map_size, index_count, task_db_size }
-    }
-
-    fn is_good_heed(tasks_path: &Path, map_size: usize) -> bool {
-        if let Ok(env) =
-            heed::EnvOpenOptions::new().map_size(clamp_to_page_size(map_size)).open(tasks_path)
-        {
-            env.prepare_for_closing().wait();
-            true
-        } else {
-            // We're treating all errors equally here, not only allocation errors.
-            // This means there's a possiblity for the budget to lower due to errors different from allocation errors.
-            // For persistent errors, this is OK as long as the task db is then reopened normally without ignoring the error this time.
-            // For transient errors, this could lead to an instance with too low a budget.
-            // However transient errors are: 1) less likely than persistent errors 2) likely to cause other issues down the line anyway.
-            false
-        }
-    }
-
     /// Start the run loop for the given index scheduler.
     ///
     /// This function will execute in a different thread and must be called
@@ -534,8 +386,6 @@ impl IndexScheduler {
             zookeeper
                 .add_watch("/snapshots", AddWatchMode::PersistentRecursive, move |event| {
                     if !latchc.has_leadership() {
-                        let inner = this.inner();
-
                        let WatchedEvent { event_type, path, keeper_state: _ } = event;
                        match event_type {
                            WatchedEventType::NodeCreated => {
@@ -553,24 +403,32 @@ impl IndexScheduler {
                                    snapshot_id
                                ));

+                                let inner = this.inner();
+
                                // 1. TODO: Ensure the snapshot version file is the same as our version.
                                // 2. Download all the databases
+                                let base_path = {
+                                    let mut path = inner.env.path().to_path_buf();
+                                    assert!(path.pop());
+                                    path
+                                };
+
                                let tasks_file =
                                    tempfile::NamedTempFile::new_in(inner.env.path()).unwrap();

                                log::info!("Downloading the index scheduler database.");
                                let tasks_snapshot = snapshot_dir.join("tasks.mdb");
-                                std::fs::copy(tasks_snapshot, tasks_file).unwrap();
+                                std::fs::copy(&tasks_snapshot, tasks_file).unwrap();

                                log::info!("Downloading the indexes databases");
                                let indexes_files =
                                    tempfile::TempDir::new_in(&inner.index_mapper.base_path)
                                        .unwrap();
-                                let mut indexes = Vec::new();
-                                let dst = snapshot_dir.join("indexes");
-                                for result in std::fs::read_dir(&dst).unwrap() {
+
+                                let mut indexes = Vec::new();
+                                let src = snapshot_dir.join("indexes");
+                                for result in std::fs::read_dir(&src).unwrap() {
                                    let entry = result.unwrap();
                                    let uuid = entry
                                        .file_name()
@@ -580,7 +438,7 @@ impl IndexScheduler {
                                        .to_string();
                                    log::info!("\tDownloading the index {}", uuid.to_string());
                                    std::fs::copy(
-                                        dst.join(&uuid),
+                                        src.join(&uuid),
                                        indexes_files.path().join(&uuid),
                                    )
                                    .unwrap();
@@ -588,7 +446,28 @@ impl IndexScheduler {
                                }

                                // 3. Lock the index-mapper and close all the env
-                                // TODO: continue here
+                                drop(inner);
+                                let mut lock = this.inner.write();
+                                let mut raw_inner = lock.take().unwrap();
+
+                                // Wait for others to finish what they started
+                                let indexes = raw_inner.index_mapper.clear();
+                                let mut pfcs: Vec<_> = indexes
+                                    .into_iter()
+                                    .map(|index| index.prepare_for_closing())
+                                    .collect();
+                                pfcs.push(raw_inner.env.prepare_for_closing());
+                                pfcs.into_iter().for_each(|pfc| pfc.wait());
+
+                                // Let's replace all the folders/files.
+                                std::fs::rename(&tasks_snapshot, base_path.join("tasks"))
+                                    .unwrap();
+                                std::fs::rename(indexes_files, base_path.join("indexes"))
+                                    .unwrap();
+
+                                // let inner = IndexSchedulerInner::new();
+                                *lock = Some(todo!());

                                // run.env.close();
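
The swap stops at the `todo!()` above: the old inner scheduler is taken out of the lock, every environment is closed, and the snapshot files are renamed into place, but nothing is put back into `this.inner`. The commented-out `// let inner = IndexSchedulerInner::new();` hints at the missing step; a heavily hedged guess at how it could look, assuming the watcher still holds (clones of) the original `IndexSchedulerOptions` and the test-only parameters, which this commit does not show:

// Hypothetical completion of the `todo!()` above, NOT part of this commit.
// Rebuild the inner scheduler on top of the freshly swapped-in files and
// publish it back so calls blocked on `this.inner()` can resume.
let new_inner = IndexSchedulerInner::new(
    options.clone(), // assumes the original options were kept around and are cloneable
    #[cfg(test)]
    test_breakpoint_sdr.clone(),
    #[cfg(test)]
    planned_failures.clone(),
)
.unwrap();
*lock = Some(new_inner);
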
@@ -1126,6 +1005,168 @@ pub struct IndexSchedulerInner {
 }

 impl IndexSchedulerInner {
+    fn new(
+        options: IndexSchedulerOptions,
+        #[cfg(test)] test_breakpoint_sdr: crossbeam::channel::Sender<(Breakpoint, bool)>,
+        #[cfg(test)] planned_failures: Vec<(usize, tests::FailureLocation)>,
+    ) -> Result<Self> {
+        std::fs::create_dir_all(&options.tasks_path)?;
+        std::fs::create_dir_all(&options.update_file_path)?;
+        std::fs::create_dir_all(&options.indexes_path)?;
+        std::fs::create_dir_all(&options.dumps_path)?;
+
+        if cfg!(windows) && options.enable_mdb_writemap {
+            // programmer error if this happens: in normal use passing the option on Windows is an error in main
+            panic!("Windows doesn't support the MDB_WRITEMAP LMDB option");
+        }
+
+        let task_db_size = clamp_to_page_size(options.task_db_size);
+        let budget = if options.indexer_config.skip_index_budget {
+            IndexBudget {
+                map_size: options.index_base_map_size,
+                index_count: options.index_count,
+                task_db_size,
+            }
+        } else {
+            Self::index_budget(
+                &options.tasks_path,
+                options.index_base_map_size,
+                task_db_size,
+                options.index_count,
+            )
+        };
+
+        let env = heed::EnvOpenOptions::new()
+            .max_dbs(11)
+            .map_size(budget.task_db_size)
+            .open(options.tasks_path)?;
+
+        let features = features::FeatureData::new(&env, options.instance_features)?;
+
+        let file_store = FileStore::new(&options.update_file_path)?;
+
+        let mut wtxn = env.write_txn()?;
+        let all_tasks = env.create_database(&mut wtxn, Some(db_name::ALL_TASKS))?;
+        let status = env.create_database(&mut wtxn, Some(db_name::STATUS))?;
+        let kind = env.create_database(&mut wtxn, Some(db_name::KIND))?;
+        let index_tasks = env.create_database(&mut wtxn, Some(db_name::INDEX_TASKS))?;
+        let canceled_by = env.create_database(&mut wtxn, Some(db_name::CANCELED_BY))?;
+        let enqueued_at = env.create_database(&mut wtxn, Some(db_name::ENQUEUED_AT))?;
+        let started_at = env.create_database(&mut wtxn, Some(db_name::STARTED_AT))?;
+        let finished_at = env.create_database(&mut wtxn, Some(db_name::FINISHED_AT))?;
+        wtxn.commit()?;
+
+        // allow unreachable_code to get rids of the warning in the case of a test build.
+        Ok(IndexSchedulerInner {
+            must_stop_processing: MustStopProcessing::default(),
+            processing_tasks: Arc::new(RwLock::new(ProcessingTasks::new())),
+            file_store,
+            all_tasks,
+            status,
+            kind,
+            index_tasks,
+            canceled_by,
+            enqueued_at,
+            started_at,
+            finished_at,
+            index_mapper: IndexMapper::new(
+                &env,
+                options.indexes_path,
+                budget.map_size,
+                options.index_growth_amount,
+                budget.index_count,
+                options.enable_mdb_writemap,
+                options.indexer_config,
+            )?,
+            env,
+            // we want to start the loop right away in case meilisearch was ctrl+Ced while processing things
+            wake_up: Arc::new(SignalEvent::auto(true)),
+            autobatching_enabled: options.autobatching_enabled,
+            max_number_of_tasks: options.max_number_of_tasks,
+            dumps_path: options.dumps_path,
+            snapshots_path: options.snapshots_path,
+            auth_path: options.auth_path,
+            version_file_path: options.version_file_path,
+            zookeeper: options.zookeeper,
+            #[cfg(test)]
+            test_breakpoint_sdr,
+            #[cfg(test)]
+            planned_failures,
+            #[cfg(test)]
+            run_loop_iteration: Arc::new(RwLock::new(0)),
+            features,
+        })
+    }
+
+    fn index_budget(
+        tasks_path: &Path,
+        base_map_size: usize,
+        mut task_db_size: usize,
+        max_index_count: usize,
+    ) -> IndexBudget {
+        #[cfg(windows)]
+        const DEFAULT_BUDGET: usize = 6 * 1024 * 1024 * 1024 * 1024; // 6 TiB, 1 index
+        #[cfg(not(windows))]
+        const DEFAULT_BUDGET: usize = 80 * 1024 * 1024 * 1024 * 1024; // 80 TiB, 18 indexes
+
+        let budget = if Self::is_good_heed(tasks_path, DEFAULT_BUDGET) {
+            DEFAULT_BUDGET
+        } else {
+            log::debug!("determining budget with dichotomic search");
+            utils::dichotomic_search(DEFAULT_BUDGET / 2, |map_size| {
+                Self::is_good_heed(tasks_path, map_size)
+            })
+        };
+
+        log::debug!("memmap budget: {budget}B");
+        let mut budget = budget / 2;
+        if task_db_size > (budget / 2) {
+            task_db_size = clamp_to_page_size(budget * 2 / 5);
+            log::debug!(
+                "Decreasing max size of task DB to {task_db_size}B due to constrained memory space"
+            );
+        }
+        budget -= task_db_size;
+
+        // won't be mutated again
+        let budget = budget;
+        let task_db_size = task_db_size;
+
+        log::debug!("index budget: {budget}B");
+        let mut index_count = budget / base_map_size;
+        if index_count < 2 {
+            // take a bit less than half than the budget to make sure we can always afford to open an index
+            let map_size = (budget * 2) / 5;
+            // single index of max budget
+            log::debug!("1 index of {map_size}B can be opened simultaneously.");
+            return IndexBudget { map_size, index_count: 1, task_db_size };
+        }
+
+        // give us some space for an additional index when the cache is already full
+        // decrement is OK because index_count >= 2.
+        index_count -= 1;
+        if index_count > max_index_count {
+            index_count = max_index_count;
+        }
+
+        log::debug!("Up to {index_count} indexes of {base_map_size}B opened simultaneously.");
+        IndexBudget { map_size: base_map_size, index_count, task_db_size }
+    }
+
+    fn is_good_heed(tasks_path: &Path, map_size: usize) -> bool {
+        if let Ok(env) =
+            heed::EnvOpenOptions::new().map_size(clamp_to_page_size(map_size)).open(tasks_path)
+        {
+            env.prepare_for_closing().wait();
+            true
+        } else {
+            // We're treating all errors equally here, not only allocation errors.
+            // This means there's a possiblity for the budget to lower due to errors different from allocation errors.
+            // For persistent errors, this is OK as long as the task db is then reopened normally without ignoring the error this time.
+            // For transient errors, this could lead to an instance with too low a budget.
+            // However transient errors are: 1) less likely than persistent errors 2) likely to cause other issues down the line anyway.
+            false
+        }
+    }
+
     pub fn read_txn(&self) -> Result<RoTxn> {
         self.env.read_txn().map_err(|e| e.into())
     }
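
For intuition about the `index_budget` arithmetic above, here is a worked run with made-up numbers (a machine where roughly 32 GiB turns out to be mappable, a requested 10 GiB task DB, and a 2 GiB base map size); the figures are illustrative only and the page-size clamping is omitted:

fn main() {
    // Suppose the probe (is_good_heed / dichotomic search) settled on 32 GiB.
    let mappable: usize = 32 * 1024 * 1024 * 1024;
    // Half of what is mappable is kept as the overall budget.
    let mut budget = mappable / 2; // 16 GiB
    // The requested task DB (10 GiB) is more than half the budget, so shrink it.
    let mut task_db_size: usize = 10 * 1024 * 1024 * 1024;
    if task_db_size > budget / 2 {
        task_db_size = budget * 2 / 5; // ~6.4 GiB
    }
    budget -= task_db_size; // ~9.6 GiB left for indexes
    // With a 2 GiB base map size, 4 indexes fit; one is kept back as headroom.
    let base_map_size: usize = 2 * 1024 * 1024 * 1024;
    let index_count = budget / base_map_size - 1; // 3
    println!("task db: {task_db_size} B, index budget: {budget} B, up to {index_count} indexes");
}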

@@ -1,5 +1,6 @@
 //! Thread-safe `Vec`-backend LRU cache using [`std::sync::atomic::AtomicU64`] for synchronization.

+use std::mem;
 use std::sync::atomic::{AtomicU64, Ordering};

 /// Thread-safe `Vec`-backend LRU cache
@@ -190,6 +191,11 @@ where
         }
         None
     }
+
+    /// Returns the generation associated to the key and values of the `LruMap`.
+    pub fn clear(&mut self) -> Vec<(AtomicU64, (K, V))> {
+        mem::take(&mut self.0.data)
+    }
 }

 /// The result of an insertion in a LRU map.
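
`LruMap::clear` relies on `mem::take`, which moves the backing storage out and leaves an empty `Vec` in its place, handing the caller every (generation, key/value) entry without copying. A tiny standalone illustration of the pattern on a plain `Vec` (not the `LruMap` internals):

use std::mem;

fn main() {
    let mut data = vec![(1u64, ("uuid-a", "index-a")), (2u64, ("uuid-b", "index-b"))];
    // Take ownership of the current contents, leaving an empty Vec in place.
    let drained = mem::take(&mut data);
    assert!(data.is_empty());
    assert_eq!(drained.len(), 2);
}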