diff --git a/crates/index-scheduler/src/index_mapper/index_map.rs b/crates/index-scheduler/src/index_mapper/index_map.rs index 480dafa7c..ad89ffbf9 100644 --- a/crates/index-scheduler/src/index_mapper/index_map.rs +++ b/crates/index-scheduler/src/index_mapper/index_map.rs @@ -1,5 +1,7 @@ use std::collections::BTreeMap; +use std::env::VarError; use std::path::Path; +use std::str::FromStr; use std::time::Duration; use meilisearch_types::heed::{EnvClosingEvent, EnvFlags, EnvOpenOptions}; @@ -302,7 +304,18 @@ fn create_or_open_index( ) -> Result { let mut options = EnvOpenOptions::new(); options.map_size(clamp_to_page_size(map_size)); - options.max_readers(1024); + + // You can find more details about this experimental + // environment variable on the following GitHub discussion: + // + let max_readers = match std::env::var("MEILI_EXPERIMENTAL_INDEX_MAX_READERS") { + Ok(value) => u32::from_str(&value).unwrap(), + Err(VarError::NotPresent) => 1024, + Err(VarError::NotUnicode(value)) => panic!( + "Invalid unicode for the `MEILI_EXPERIMENTAL_INDEX_MAX_READERS` env var: {value:?}" + ), + }; + options.max_readers(max_readers); if enable_mdb_writemap { unsafe { options.flags(EnvFlags::WRITE_MAP) }; } diff --git a/crates/milli/src/thread_pool_no_abort.rs b/crates/milli/src/thread_pool_no_abort.rs index 14e5b0491..b57050a63 100644 --- a/crates/milli/src/thread_pool_no_abort.rs +++ b/crates/milli/src/thread_pool_no_abort.rs @@ -1,4 +1,4 @@ -use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::Arc; use rayon::{ThreadPool, ThreadPoolBuilder}; @@ -9,6 +9,8 @@ use thiserror::Error; #[derive(Debug)] pub struct ThreadPoolNoAbort { thread_pool: ThreadPool, + /// The number of active operations. + active_operations: AtomicUsize, /// Set to true if the thread pool catched a panic. pool_catched_panic: Arc, } @@ -19,7 +21,9 @@ impl ThreadPoolNoAbort { OP: FnOnce() -> R + Send, R: Send, { + self.active_operations.fetch_add(1, Ordering::Relaxed); let output = self.thread_pool.install(op); + self.active_operations.fetch_sub(1, Ordering::Relaxed); // While reseting the pool panic catcher we return an error if we catched one. if self.pool_catched_panic.swap(false, Ordering::SeqCst) { Err(PanicCatched) @@ -31,6 +35,11 @@ impl ThreadPoolNoAbort { pub fn current_num_threads(&self) -> usize { self.thread_pool.current_num_threads() } + + /// The number of active operations. + pub fn active_operations(&self) -> usize { + self.active_operations.load(Ordering::Relaxed) + } } #[derive(Error, Debug)] @@ -64,6 +73,10 @@ impl ThreadPoolNoAbortBuilder { let catched_panic = pool_catched_panic.clone(); move |_result| catched_panic.store(true, Ordering::SeqCst) }); - Ok(ThreadPoolNoAbort { thread_pool: self.0.build()?, pool_catched_panic }) + Ok(ThreadPoolNoAbort { + thread_pool: self.0.build()?, + active_operations: AtomicUsize::new(0), + pool_catched_panic, + }) } } diff --git a/crates/milli/src/vector/ollama.rs b/crates/milli/src/vector/ollama.rs index 7ee775cbf..863a6c39d 100644 --- a/crates/milli/src/vector/ollama.rs +++ b/crates/milli/src/vector/ollama.rs @@ -5,7 +5,7 @@ use rayon::slice::ParallelSlice as _; use super::error::{EmbedError, EmbedErrorKind, NewEmbedderError, NewEmbedderErrorKind}; use super::rest::{Embedder as RestEmbedder, EmbedderOptions as RestEmbedderOptions}; -use super::DistributionShift; +use super::{DistributionShift, REQUEST_PARALLELISM}; use crate::error::FaultSource; use crate::vector::Embedding; use crate::ThreadPoolNoAbort; @@ -98,14 +98,20 @@ impl Embedder { text_chunks: Vec>, threads: &ThreadPoolNoAbort, ) -> Result>, EmbedError> { - threads - .install(move || { - text_chunks.into_par_iter().map(move |chunk| self.embed(&chunk, None)).collect() - }) - .map_err(|error| EmbedError { - kind: EmbedErrorKind::PanicInThreadPool(error), - fault: FaultSource::Bug, - })? + // This condition helps reduce the number of active rayon jobs + // so that we avoid consuming all the LMDB rtxns and avoid stack overflows. + if threads.active_operations() >= REQUEST_PARALLELISM { + text_chunks.into_iter().map(move |chunk| self.embed(&chunk, None)).collect() + } else { + threads + .install(move || { + text_chunks.into_par_iter().map(move |chunk| self.embed(&chunk, None)).collect() + }) + .map_err(|error| EmbedError { + kind: EmbedErrorKind::PanicInThreadPool(error), + fault: FaultSource::Bug, + })? + } } pub(crate) fn embed_chunks_ref( @@ -113,20 +119,32 @@ impl Embedder { texts: &[&str], threads: &ThreadPoolNoAbort, ) -> Result>, EmbedError> { - threads - .install(move || { - let embeddings: Result>, _> = texts - .par_chunks(self.prompt_count_in_chunk_hint()) - .map(move |chunk| self.embed(chunk, None)) - .collect(); + // This condition helps reduce the number of active rayon jobs + // so that we avoid consuming all the LMDB rtxns and avoid stack overflows. + if threads.active_operations() >= REQUEST_PARALLELISM { + let embeddings: Result>, _> = texts + .chunks(self.prompt_count_in_chunk_hint()) + .map(move |chunk| self.embed(chunk, None)) + .collect(); - let embeddings = embeddings?; - Ok(embeddings.into_iter().flatten().collect()) - }) - .map_err(|error| EmbedError { - kind: EmbedErrorKind::PanicInThreadPool(error), - fault: FaultSource::Bug, - })? + let embeddings = embeddings?; + Ok(embeddings.into_iter().flatten().collect()) + } else { + threads + .install(move || { + let embeddings: Result>, _> = texts + .par_chunks(self.prompt_count_in_chunk_hint()) + .map(move |chunk| self.embed(chunk, None)) + .collect(); + + let embeddings = embeddings?; + Ok(embeddings.into_iter().flatten().collect()) + }) + .map_err(|error| EmbedError { + kind: EmbedErrorKind::PanicInThreadPool(error), + fault: FaultSource::Bug, + })? + } } pub fn chunk_count_hint(&self) -> usize { diff --git a/crates/milli/src/vector/openai.rs b/crates/milli/src/vector/openai.rs index 7262bfef8..681fab142 100644 --- a/crates/milli/src/vector/openai.rs +++ b/crates/milli/src/vector/openai.rs @@ -6,7 +6,7 @@ use rayon::slice::ParallelSlice as _; use super::error::{EmbedError, NewEmbedderError}; use super::rest::{Embedder as RestEmbedder, EmbedderOptions as RestEmbedderOptions}; -use super::DistributionShift; +use super::{DistributionShift, REQUEST_PARALLELISM}; use crate::error::FaultSource; use crate::vector::error::EmbedErrorKind; use crate::vector::Embedding; @@ -255,14 +255,20 @@ impl Embedder { text_chunks: Vec>, threads: &ThreadPoolNoAbort, ) -> Result>, EmbedError> { - threads - .install(move || { - text_chunks.into_par_iter().map(move |chunk| self.embed(&chunk, None)).collect() - }) - .map_err(|error| EmbedError { - kind: EmbedErrorKind::PanicInThreadPool(error), - fault: FaultSource::Bug, - })? + // This condition helps reduce the number of active rayon jobs + // so that we avoid consuming all the LMDB rtxns and avoid stack overflows. + if threads.active_operations() >= REQUEST_PARALLELISM { + text_chunks.into_iter().map(move |chunk| self.embed(&chunk, None)).collect() + } else { + threads + .install(move || { + text_chunks.into_par_iter().map(move |chunk| self.embed(&chunk, None)).collect() + }) + .map_err(|error| EmbedError { + kind: EmbedErrorKind::PanicInThreadPool(error), + fault: FaultSource::Bug, + })? + } } pub(crate) fn embed_chunks_ref( @@ -270,20 +276,31 @@ impl Embedder { texts: &[&str], threads: &ThreadPoolNoAbort, ) -> Result>, EmbedError> { - threads - .install(move || { - let embeddings: Result>, _> = texts - .par_chunks(self.prompt_count_in_chunk_hint()) - .map(move |chunk| self.embed(chunk, None)) - .collect(); + // This condition helps reduce the number of active rayon jobs + // so that we avoid consuming all the LMDB rtxns and avoid stack overflows. + if threads.active_operations() >= REQUEST_PARALLELISM { + let embeddings: Result>, _> = texts + .chunks(self.prompt_count_in_chunk_hint()) + .map(move |chunk| self.embed(chunk, None)) + .collect(); + let embeddings = embeddings?; + Ok(embeddings.into_iter().flatten().collect()) + } else { + threads + .install(move || { + let embeddings: Result>, _> = texts + .par_chunks(self.prompt_count_in_chunk_hint()) + .map(move |chunk| self.embed(chunk, None)) + .collect(); - let embeddings = embeddings?; - Ok(embeddings.into_iter().flatten().collect()) - }) - .map_err(|error| EmbedError { - kind: EmbedErrorKind::PanicInThreadPool(error), - fault: FaultSource::Bug, - })? + let embeddings = embeddings?; + Ok(embeddings.into_iter().flatten().collect()) + }) + .map_err(|error| EmbedError { + kind: EmbedErrorKind::PanicInThreadPool(error), + fault: FaultSource::Bug, + })? + } } pub fn chunk_count_hint(&self) -> usize { diff --git a/crates/milli/src/vector/rest.rs b/crates/milli/src/vector/rest.rs index 98be311d4..5c2d6fe7a 100644 --- a/crates/milli/src/vector/rest.rs +++ b/crates/milli/src/vector/rest.rs @@ -188,14 +188,20 @@ impl Embedder { text_chunks: Vec>, threads: &ThreadPoolNoAbort, ) -> Result>, EmbedError> { - threads - .install(move || { - text_chunks.into_par_iter().map(move |chunk| self.embed(chunk, None)).collect() - }) - .map_err(|error| EmbedError { - kind: EmbedErrorKind::PanicInThreadPool(error), - fault: FaultSource::Bug, - })? + // This condition helps reduce the number of active rayon jobs + // so that we avoid consuming all the LMDB rtxns and avoid stack overflows. + if threads.active_operations() >= REQUEST_PARALLELISM { + text_chunks.into_iter().map(move |chunk| self.embed(chunk, None)).collect() + } else { + threads + .install(move || { + text_chunks.into_par_iter().map(move |chunk| self.embed(chunk, None)).collect() + }) + .map_err(|error| EmbedError { + kind: EmbedErrorKind::PanicInThreadPool(error), + fault: FaultSource::Bug, + })? + } } pub(crate) fn embed_chunks_ref( @@ -203,20 +209,32 @@ impl Embedder { texts: &[&str], threads: &ThreadPoolNoAbort, ) -> Result, EmbedError> { - threads - .install(move || { - let embeddings: Result>, _> = texts - .par_chunks(self.prompt_count_in_chunk_hint()) - .map(move |chunk| self.embed_ref(chunk, None)) - .collect(); + // This condition helps reduce the number of active rayon jobs + // so that we avoid consuming all the LMDB rtxns and avoid stack overflows. + if threads.active_operations() >= REQUEST_PARALLELISM { + let embeddings: Result>, _> = texts + .chunks(self.prompt_count_in_chunk_hint()) + .map(move |chunk| self.embed_ref(chunk, None)) + .collect(); - let embeddings = embeddings?; - Ok(embeddings.into_iter().flatten().collect()) - }) - .map_err(|error| EmbedError { - kind: EmbedErrorKind::PanicInThreadPool(error), - fault: FaultSource::Bug, - })? + let embeddings = embeddings?; + Ok(embeddings.into_iter().flatten().collect()) + } else { + threads + .install(move || { + let embeddings: Result>, _> = texts + .par_chunks(self.prompt_count_in_chunk_hint()) + .map(move |chunk| self.embed_ref(chunk, None)) + .collect(); + + let embeddings = embeddings?; + Ok(embeddings.into_iter().flatten().collect()) + }) + .map_err(|error| EmbedError { + kind: EmbedErrorKind::PanicInThreadPool(error), + fault: FaultSource::Bug, + })? + } } pub fn chunk_count_hint(&self) -> usize {