Fix the new config file with the index scheduler

This commit is contained in:
Clément Renault 2022-10-20 16:49:19 +02:00
parent 72ec4ce96b
commit 61edcd585a
No known key found for this signature in database
GPG Key ID: 92ADA4E935E71FA4
3 changed files with 58 additions and 15 deletions

View File

@ -5,7 +5,6 @@ use std::sync::Arc;
use actix_web::http::KeepAlive; use actix_web::http::KeepAlive;
use actix_web::web::Data; use actix_web::web::Data;
use actix_web::HttpServer; use actix_web::HttpServer;
use clap::Parser;
use index_scheduler::IndexScheduler; use index_scheduler::IndexScheduler;
use meilisearch_auth::AuthController; use meilisearch_auth::AuthController;
use meilisearch_http::analytics::Analytics; use meilisearch_http::analytics::Analytics;

View File

@ -4,6 +4,8 @@ use std::num::ParseIntError;
use std::ops::Deref; use std::ops::Deref;
use std::path::PathBuf; use std::path::PathBuf;
use std::str::FromStr; use std::str::FromStr;
use std::ffi::OsStr;
use std::env::VarError;
use std::sync::Arc; use std::sync::Arc;
use std::{env, fmt, fs}; use std::{env, fmt, fs};
@ -65,6 +67,11 @@ const DEFAULT_SNAPSHOT_INTERVAL_SEC: u64 = 86400;
const DEFAULT_DUMPS_DIR: &str = "dumps/"; const DEFAULT_DUMPS_DIR: &str = "dumps/";
const DEFAULT_LOG_LEVEL: &str = "INFO"; const DEFAULT_LOG_LEVEL: &str = "INFO";
const MEILI_MAX_INDEXING_MEMORY: &str = "MEILI_MAX_INDEXING_MEMORY";
const MEILI_MAX_INDEXING_THREADS: &str = "MEILI_MAX_INDEXING_THREADS";
const DISABLE_AUTO_BATCHING: &str = "DISABLE_AUTO_BATCHING";
const DEFAULT_LOG_EVERY_N: usize = 100000;
#[derive(Debug, Clone, Parser, Serialize, Deserialize)] #[derive(Debug, Clone, Parser, Serialize, Deserialize)]
#[clap(version, next_display_order = None)] #[clap(version, next_display_order = None)]
#[serde(rename_all = "snake_case", deny_unknown_fields)] #[serde(rename_all = "snake_case", deny_unknown_fields)]
@ -435,27 +442,26 @@ impl Opt {
} }
} }
#[derive(Debug, Clone, Parser, Serialize)] #[derive(Debug, Clone, Parser, Deserialize, Serialize)]
pub struct IndexerOpts { pub struct IndexerOpts {
/// The amount of documents to skip before printing /// The amount of documents to skip before printing
/// a log regarding the indexing advancement. /// a log regarding the indexing advancement.
#[serde(skip)] #[serde(skip_serializing, default = "default_log_every_n")]
#[clap(long, default_value = "100000", hide = true)] // 100k #[clap(long, default_value_t = default_log_every_n(), hide = true)] // 100k
pub log_every_n: usize, pub log_every_n: usize,
/// Grenad max number of chunks in bytes. /// Grenad max number of chunks in bytes.
#[serde(skip)] #[serde(skip_serializing)]
#[clap(long, hide = true)] #[clap(long, hide = true)]
pub max_nb_chunks: Option<usize>, pub max_nb_chunks: Option<usize>,
/// The maximum amount of memory the indexer will use. It defaults to 2/3 /// The maximum amount of memory the indexer will use.
/// of the available memory. It is recommended to use something like 80%-90%
/// of the available memory, no more.
/// ///
/// In case the engine is unable to retrieve the available memory the engine will /// In case the engine is unable to retrieve the available memory the engine will
/// try to use the memory it needs but without real limit, this can lead to /// try to use the memory it needs but without real limit, this can lead to
/// Out-Of-Memory issues and it is recommended to specify the amount of memory to use. /// Out-Of-Memory issues and it is recommended to specify the amount of memory to use.
#[clap(long, env = "MEILI_MAX_INDEXING_MEMORY", default_value_t)] #[clap(long, env = MEILI_MAX_INDEXING_MEMORY, default_value_t)]
#[serde(default)]
pub max_indexing_memory: MaxMemory, pub max_indexing_memory: MaxMemory,
/// The maximum number of threads the indexer will use. /// The maximum number of threads the indexer will use.
@ -463,18 +469,52 @@ pub struct IndexerOpts {
/// it will use the maximum number of available cores. /// it will use the maximum number of available cores.
/// ///
/// It defaults to half of the available threads. /// It defaults to half of the available threads.
#[clap(long, env = "MEILI_MAX_INDEXING_THREADS", default_value_t)] #[clap(long, env = MEILI_MAX_INDEXING_THREADS, default_value_t)]
#[serde(default)]
pub max_indexing_threads: MaxThreads, pub max_indexing_threads: MaxThreads,
} }
#[derive(Debug, Clone, Parser, Default, Serialize)] impl IndexerOpts {
/// Exports the values to their corresponding env vars if they are not set.
pub fn export_to_env(self) {
let IndexerOpts {
max_indexing_memory,
max_indexing_threads,
log_every_n: _,
max_nb_chunks: _,
} = self;
if let Some(max_indexing_memory) = max_indexing_memory.0 {
export_to_env_if_not_present(
MEILI_MAX_INDEXING_MEMORY,
max_indexing_memory.to_string(),
);
}
export_to_env_if_not_present(
MEILI_MAX_INDEXING_THREADS,
max_indexing_threads.0.to_string(),
);
}
}
#[derive(Debug, Clone, Parser, Default, Deserialize, Serialize)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub struct SchedulerConfig { pub struct SchedulerConfig {
/// The engine will disable task auto-batching, /// The engine will disable task auto-batching,
/// and will sequentially compute each task one by one. /// and will sequentially compute each task one by one.
#[clap(long, env = "DISABLE_AUTO_BATCHING")] #[clap(long, env = DISABLE_AUTO_BATCHING)]
#[serde(default)]
pub disable_auto_batching: bool, pub disable_auto_batching: bool,
} }
impl SchedulerConfig {
pub fn export_to_env(self) {
let SchedulerConfig {
disable_auto_batching,
} = self;
export_to_env_if_not_present(DISABLE_AUTO_BATCHING, disable_auto_batching.to_string());
}
}
impl TryFrom<&IndexerOpts> for IndexerConfig { impl TryFrom<&IndexerOpts> for IndexerConfig {
type Error = anyhow::Error; type Error = anyhow::Error;
@ -506,7 +546,7 @@ impl Default for IndexerOpts {
} }
/// A type used to detect the max memory available and use 2/3 of it. /// A type used to detect the max memory available and use 2/3 of it.
#[derive(Debug, Clone, Copy, Serialize)] #[derive(Debug, Clone, Copy, Deserialize, Serialize)]
pub struct MaxMemory(Option<Byte>); pub struct MaxMemory(Option<Byte>);
impl FromStr for MaxMemory { impl FromStr for MaxMemory {
@ -562,7 +602,7 @@ fn total_memory_bytes() -> Option<u64> {
} }
} }
#[derive(Debug, Clone, Copy, Serialize)] #[derive(Debug, Clone, Copy, Deserialize, Serialize)]
pub struct MaxThreads(usize); pub struct MaxThreads(usize);
impl FromStr for MaxThreads { impl FromStr for MaxThreads {
@ -697,6 +737,10 @@ fn default_log_level() -> String {
DEFAULT_LOG_LEVEL.to_string() DEFAULT_LOG_LEVEL.to_string()
} }
fn default_log_every_n() -> usize {
DEFAULT_LOG_EVERY_N
}
#[cfg(test)] #[cfg(test)]
mod test { mod test {

View File

@ -96,7 +96,7 @@ pub async fn create_index(
) -> Result<HttpResponse, ResponseError> { ) -> Result<HttpResponse, ResponseError> {
let IndexCreateRequest { primary_key, uid } = body.into_inner(); let IndexCreateRequest { primary_key, uid } = body.into_inner();
let allow_index_creation = meilisearch.filters().search_rules.is_index_authorized(&uid); let allow_index_creation = index_scheduler.filters().search_rules.is_index_authorized(&uid);
if allow_index_creation { if allow_index_creation {
analytics.publish( analytics.publish(
"Index Created".to_string(), "Index Created".to_string(),