Introduce an options struct to create the IndexScheduler

This commit is contained in:
Kerollmops 2022-10-26 11:41:59 +02:00 committed by Tamo
parent 426246b1b4
commit 6167baa9eb

View File

@ -190,6 +190,46 @@ mod db_name {
pub const FINISHED_AT: &str = "finished-at";
}
#[cfg(test)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Breakpoint {
Start,
BatchCreated,
BeforeProcessing,
AfterProcessing,
AbortedIndexation,
ProcessBatchSucceeded,
ProcessBatchFailed,
InsideProcessBatch,
}
#[derive(Debug)]
pub struct IndexSchedulerOptions {
/// The path to the version file of Meilisearch.
version_file_path: PathBuf,
/// The path to the folder containing the auth LMDB env.
auth_path: PathBuf,
/// The path to the folder containing the task databases.
tasks_path: PathBuf,
/// The path to the file store containing the files associated to the tasks.
update_file_path: PathBuf,
/// The path to the folder containing meilisearch's indexes.
indexes_path: PathBuf,
/// The path to the folder containing the snapshots.
snapshots_path: PathBuf,
/// The path to the folder containing the dumps.
dumps_path: PathBuf,
/// The maximum size, in bytes, of each meilisearch index.
task_db_size: usize,
/// The maximum size, in bytes, of the tasks index.
index_size: usize,
/// Configuration used during indexing for each meilisearch index.
indexer_config: IndexerConfig,
/// Set to `true` iff the index scheduler is allowed to automatically
/// batch tasks together, to process multiple tasks at once.
autobatching_enabled: bool,
}
/// Structure which holds meilisearch's indexes and schedules the tasks
/// to be performed on them.
pub struct IndexScheduler {
@ -269,6 +309,7 @@ pub struct IndexScheduler {
/// A counter that is incremented before every call to [`tick`](IndexScheduler::tick)
run_loop_iteration: Arc<RwLock<usize>>,
}
impl IndexScheduler {
fn private_clone(&self) -> IndexScheduler {
IndexScheduler {
@ -300,62 +341,24 @@ impl IndexScheduler {
}
}
#[cfg(test)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Breakpoint {
Start,
BatchCreated,
BeforeProcessing,
AfterProcessing,
AbortedIndexation,
ProcessBatchSucceeded,
ProcessBatchFailed,
InsideProcessBatch,
}
impl IndexScheduler {
// TODO create a struct of options with a documented field for each required option instead
/// Create an index scheduler and start its run loop.
///
/// ## Arguments
/// - `version_file_path`: the path to the version file of Meilisearch
/// - `auth_path`: the path to the folder containing the auth LMDB env
/// - `tasks_path`: the path to the folder containing the task databases
/// - `update_file_path`: the path to the file store containing the files associated to the tasks
/// - `indexes_path`: the path to the folder containing meilisearch's indexes
/// - `snapshots_path`: the path to the folder containing the snapshots
/// - `dumps_path`: the path to the folder containing the dumps
/// - `index_size`: the maximum size, in bytes, of each meilisearch index
/// - `indexer_config`: configuration used during indexing for each meilisearch index
/// - `autobatching_enabled`: `true` iff the index scheduler is allowed to automatically batch tasks
/// together, to process multiple tasks at once.
#[allow(clippy::too_many_arguments)]
pub fn new(
version_file_path: PathBuf,
auth_path: PathBuf,
tasks_path: PathBuf,
update_file_path: PathBuf,
indexes_path: PathBuf,
snapshots_path: PathBuf,
dumps_path: PathBuf,
task_db_size: usize,
index_size: usize,
indexer_config: IndexerConfig,
autobatching_enabled: bool,
options: IndexSchedulerOptions,
#[cfg(test)] test_breakpoint_sdr: crossbeam::channel::Sender<(Breakpoint, bool)>,
#[cfg(test)] planned_failures: Vec<(usize, tests::FailureLocation)>,
) -> Result<Self> {
std::fs::create_dir_all(&tasks_path)?;
std::fs::create_dir_all(&update_file_path)?;
std::fs::create_dir_all(&indexes_path)?;
std::fs::create_dir_all(&dumps_path)?;
std::fs::create_dir_all(&options.tasks_path)?;
std::fs::create_dir_all(&options.update_file_path)?;
std::fs::create_dir_all(&options.indexes_path)?;
std::fs::create_dir_all(&options.dumps_path)?;
let mut options = heed::EnvOpenOptions::new();
options.max_dbs(9);
options.map_size(task_db_size);
let env = options.open(tasks_path)?;
let file_store = FileStore::new(&update_file_path)?;
let env = heed::EnvOpenOptions::new()
.max_dbs(9)
.map_size(options.task_db_size)
.open(options.tasks_path)?;
let file_store = FileStore::new(&options.update_file_path)?;
// allow unreachable_code to get rids of the warning in the case of a test build.
let this = Self {
@ -369,15 +372,20 @@ impl IndexScheduler {
enqueued_at: env.create_database(Some(db_name::ENQUEUED_AT))?,
started_at: env.create_database(Some(db_name::STARTED_AT))?,
finished_at: env.create_database(Some(db_name::FINISHED_AT))?,
index_mapper: IndexMapper::new(&env, indexes_path, index_size, indexer_config)?,
index_mapper: IndexMapper::new(
&env,
options.indexes_path,
options.index_size,
options.indexer_config,
)?,
env,
// we want to start the loop right away in case meilisearch was ctrl+Ced while processing things
wake_up: Arc::new(SignalEvent::auto(true)),
autobatching_enabled,
dumps_path,
snapshots_path,
auth_path,
version_file_path,
autobatching_enabled: options.autobatching_enabled,
dumps_path: options.dumps_path,
snapshots_path: options.snapshots_path,
auth_path: options.auth_path,
version_file_path: options.version_file_path,
#[cfg(test)]
test_breakpoint_sdr,
@ -976,28 +984,27 @@ mod tests {
impl IndexScheduler {
pub fn test(
autobatching: bool,
autobatching_enabled: bool,
planned_failures: Vec<(usize, FailureLocation)>,
) -> (Self, IndexSchedulerHandle) {
let tempdir = TempDir::new().unwrap();
let (sender, receiver) = crossbeam::channel::bounded(0);
let index_scheduler = Self::new(
tempdir.path().join(VERSION_FILE_NAME),
tempdir.path().join("auth"),
tempdir.path().join("db_path"),
tempdir.path().join("file_store"),
tempdir.path().join("indexes"),
tempdir.path().join("snapshots"),
tempdir.path().join("dumps"),
1024 * 1024,
1024 * 1024,
IndexerConfig::default(),
autobatching, // enable autobatching
sender,
planned_failures,
)
.unwrap();
let options = IndexSchedulerOptions {
version_file_path: tempdir.path().join(VERSION_FILE_NAME),
auth_path: tempdir.path().join("auth"),
tasks_path: tempdir.path().join("db_path"),
update_file_path: tempdir.path().join("file_store"),
indexes_path: tempdir.path().join("indexes"),
snapshots_path: tempdir.path().join("snapshots"),
dumps_path: tempdir.path().join("dumps"),
task_db_size: 1024 * 1024, // 1 MiB
index_size: 1024 * 1024, // 1 MiB
indexer_config: IndexerConfig::default(),
autobatching_enabled,
};
let index_scheduler = Self::new(options, sender, planned_failures).unwrap();
let index_scheduler_handle =
IndexSchedulerHandle { _tempdir: tempdir, test_breakpoint_rcv: receiver };