mirror of
https://github.com/meilisearch/meilisearch.git
synced 2024-11-23 18:45:06 +08:00
206 lines
5.9 KiB
Rust
206 lines
5.9 KiB
Rust
use crate::export_to_env_if_not_present;
|
|
|
|
use core::fmt;
|
|
use std::{convert::TryFrom, num::ParseIntError, ops::Deref, str::FromStr};
|
|
|
|
use byte_unit::{Byte, ByteError};
|
|
use clap::Parser;
|
|
use milli::update::IndexerConfig;
|
|
use serde::{Deserialize, Serialize};
|
|
use sysinfo::{RefreshKind, System, SystemExt};
|
|
|
|
const MEILI_MAX_INDEXING_MEMORY: &str = "MEILI_MAX_INDEXING_MEMORY";
|
|
const MEILI_MAX_INDEXING_THREADS: &str = "MEILI_MAX_INDEXING_THREADS";
|
|
const DISABLE_AUTO_BATCHING: &str = "DISABLE_AUTO_BATCHING";
|
|
const DEFAULT_LOG_EVERY_N: usize = 100000;
|
|
|
|
#[derive(Debug, Clone, Parser, Serialize, Deserialize)]
|
|
#[serde(rename_all = "snake_case", deny_unknown_fields)]
|
|
pub struct IndexerOpts {
|
|
/// Sets the amount of documents to skip before printing
|
|
/// a log regarding the indexing advancement.
|
|
#[serde(skip_serializing, default = "default_log_every_n")]
|
|
#[clap(long, default_value_t = default_log_every_n(), hide = true)] // 100k
|
|
pub log_every_n: usize,
|
|
|
|
/// Grenad max number of chunks in bytes.
|
|
#[serde(skip_serializing)]
|
|
#[clap(long, hide = true)]
|
|
pub max_nb_chunks: Option<usize>,
|
|
|
|
/// Sets the maximum amount of RAM Meilisearch can use when indexing. By default, Meilisearch
|
|
/// uses no more than two thirds of available memory.
|
|
#[clap(long, env = MEILI_MAX_INDEXING_MEMORY, default_value_t)]
|
|
#[serde(default)]
|
|
pub max_indexing_memory: MaxMemory,
|
|
|
|
/// Sets the maximum number of threads Meilisearch can use during indexation. By default, the
|
|
/// indexer avoids using more than half of a machine's total processing units. This ensures
|
|
/// Meilisearch is always ready to perform searches, even while you are updating an index.
|
|
#[clap(long, env = MEILI_MAX_INDEXING_THREADS, default_value_t)]
|
|
#[serde(default)]
|
|
pub max_indexing_threads: MaxThreads,
|
|
}
|
|
|
|
#[derive(Debug, Clone, Parser, Default, Serialize, Deserialize)]
|
|
#[serde(rename_all = "snake_case", deny_unknown_fields)]
|
|
pub struct SchedulerConfig {
|
|
/// Deactivates auto-batching when provided.
|
|
#[clap(long, env = DISABLE_AUTO_BATCHING)]
|
|
#[serde(default)]
|
|
pub disable_auto_batching: bool,
|
|
}
|
|
|
|
impl IndexerOpts {
|
|
/// Exports the values to their corresponding env vars if they are not set.
|
|
pub fn export_to_env(self) {
|
|
let IndexerOpts {
|
|
max_indexing_memory,
|
|
max_indexing_threads,
|
|
log_every_n: _,
|
|
max_nb_chunks: _,
|
|
} = self;
|
|
if let Some(max_indexing_memory) = max_indexing_memory.0 {
|
|
export_to_env_if_not_present(
|
|
MEILI_MAX_INDEXING_MEMORY,
|
|
max_indexing_memory.to_string(),
|
|
);
|
|
}
|
|
export_to_env_if_not_present(
|
|
MEILI_MAX_INDEXING_THREADS,
|
|
max_indexing_threads.0.to_string(),
|
|
);
|
|
}
|
|
}
|
|
|
|
impl TryFrom<&IndexerOpts> for IndexerConfig {
|
|
type Error = anyhow::Error;
|
|
|
|
fn try_from(other: &IndexerOpts) -> Result<Self, Self::Error> {
|
|
let thread_pool = rayon::ThreadPoolBuilder::new()
|
|
.num_threads(*other.max_indexing_threads)
|
|
.build()?;
|
|
|
|
Ok(Self {
|
|
log_every_n: Some(other.log_every_n),
|
|
max_nb_chunks: other.max_nb_chunks,
|
|
max_memory: other.max_indexing_memory.map(|b| b.get_bytes() as usize),
|
|
thread_pool: Some(thread_pool),
|
|
max_positions_per_attributes: None,
|
|
..Default::default()
|
|
})
|
|
}
|
|
}
|
|
|
|
impl Default for IndexerOpts {
|
|
fn default() -> Self {
|
|
Self {
|
|
log_every_n: 100_000,
|
|
max_nb_chunks: None,
|
|
max_indexing_memory: MaxMemory::default(),
|
|
max_indexing_threads: MaxThreads::default(),
|
|
}
|
|
}
|
|
}
|
|
|
|
impl SchedulerConfig {
|
|
pub fn export_to_env(self) {
|
|
let SchedulerConfig {
|
|
disable_auto_batching,
|
|
} = self;
|
|
export_to_env_if_not_present(DISABLE_AUTO_BATCHING, disable_auto_batching.to_string());
|
|
}
|
|
}
|
|
|
|
/// A type used to detect the max memory available and use 2/3 of it.
|
|
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
|
|
pub struct MaxMemory(Option<Byte>);
|
|
|
|
impl FromStr for MaxMemory {
|
|
type Err = ByteError;
|
|
|
|
fn from_str(s: &str) -> Result<MaxMemory, ByteError> {
|
|
Byte::from_str(s).map(Some).map(MaxMemory)
|
|
}
|
|
}
|
|
|
|
impl Default for MaxMemory {
|
|
fn default() -> MaxMemory {
|
|
MaxMemory(
|
|
total_memory_bytes()
|
|
.map(|bytes| bytes * 2 / 3)
|
|
.map(Byte::from_bytes),
|
|
)
|
|
}
|
|
}
|
|
|
|
impl fmt::Display for MaxMemory {
|
|
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
|
match self.0 {
|
|
Some(memory) => write!(f, "{}", memory.get_appropriate_unit(true)),
|
|
None => f.write_str("unknown"),
|
|
}
|
|
}
|
|
}
|
|
|
|
impl Deref for MaxMemory {
|
|
type Target = Option<Byte>;
|
|
|
|
fn deref(&self) -> &Self::Target {
|
|
&self.0
|
|
}
|
|
}
|
|
|
|
impl MaxMemory {
|
|
pub fn unlimited() -> Self {
|
|
Self(None)
|
|
}
|
|
}
|
|
|
|
/// Returns the total amount of bytes available or `None` if this system isn't supported.
|
|
fn total_memory_bytes() -> Option<u64> {
|
|
if System::IS_SUPPORTED {
|
|
let memory_kind = RefreshKind::new().with_memory();
|
|
let mut system = System::new_with_specifics(memory_kind);
|
|
system.refresh_memory();
|
|
Some(system.total_memory() * 1024) // KiB into bytes
|
|
} else {
|
|
None
|
|
}
|
|
}
|
|
|
|
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
|
|
pub struct MaxThreads(usize);
|
|
|
|
impl FromStr for MaxThreads {
|
|
type Err = ParseIntError;
|
|
|
|
fn from_str(s: &str) -> Result<Self, Self::Err> {
|
|
usize::from_str(s).map(Self)
|
|
}
|
|
}
|
|
|
|
impl Default for MaxThreads {
|
|
fn default() -> Self {
|
|
MaxThreads(num_cpus::get() / 2)
|
|
}
|
|
}
|
|
|
|
impl fmt::Display for MaxThreads {
|
|
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
|
write!(f, "{}", self.0)
|
|
}
|
|
}
|
|
|
|
impl Deref for MaxThreads {
|
|
type Target = usize;
|
|
|
|
fn deref(&self) -> &Self::Target {
|
|
&self.0
|
|
}
|
|
}
|
|
|
|
fn default_log_every_n() -> usize {
|
|
DEFAULT_LOG_EVERY_N
|
|
}
|