mirror of
https://github.com/meilisearch/meilisearch.git
synced 2024-11-23 02:27:40 +08:00
Merge #4707
4707: Only spawn thread pool once r=irevoire a=dureuill # Pull Request ## Related issue Fixes #4692 ## What does this PR do? - There was a rayon thread pool of 40 threads that would be spawned multiple times per indexing operation. - Perhaps due to the sheer number of spawned threads, or to a leak in rayon thread pools, the system was unable to reclaim all the spawned threads at a sufficient rate. - As a result, the stack for the threads would accumulate and consume virtual memory, and eventually physical memory too. - Fortunately, the pool can actually be created once and then always reused. This PR performs this change. Co-authored-by: Louis Dureuil <louis@meilisearch.com>
This commit is contained in:
commit
7bd1b7ac43
@ -152,6 +152,7 @@ impl Settings<Unchecked> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, Deserialize)]
|
#[derive(Debug, Clone, Deserialize)]
|
||||||
|
#[allow(dead_code)] // otherwise rustc complains that the fields go unused
|
||||||
#[cfg_attr(test, derive(serde::Serialize))]
|
#[cfg_attr(test, derive(serde::Serialize))]
|
||||||
#[serde(deny_unknown_fields)]
|
#[serde(deny_unknown_fields)]
|
||||||
#[serde(rename_all = "camelCase")]
|
#[serde(rename_all = "camelCase")]
|
||||||
|
@ -182,6 +182,7 @@ impl Settings<Unchecked> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[allow(dead_code)] // otherwise rustc complains that the fields go unused
|
||||||
#[derive(Debug, Clone, Deserialize)]
|
#[derive(Debug, Clone, Deserialize)]
|
||||||
#[cfg_attr(test, derive(serde::Serialize))]
|
#[cfg_attr(test, derive(serde::Serialize))]
|
||||||
#[serde(deny_unknown_fields)]
|
#[serde(deny_unknown_fields)]
|
||||||
|
@ -200,6 +200,7 @@ impl std::ops::Deref for IndexUid {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[allow(dead_code)] // otherwise rustc complains that the fields go unused
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
#[cfg_attr(test, derive(serde::Serialize))]
|
#[cfg_attr(test, derive(serde::Serialize))]
|
||||||
#[cfg_attr(test, serde(rename_all = "camelCase"))]
|
#[cfg_attr(test, serde(rename_all = "camelCase"))]
|
||||||
|
@ -74,10 +74,10 @@ csv = "1.3.0"
|
|||||||
candle-core = { version = "0.4.1" }
|
candle-core = { version = "0.4.1" }
|
||||||
candle-transformers = { version = "0.4.1" }
|
candle-transformers = { version = "0.4.1" }
|
||||||
candle-nn = { version = "0.4.1" }
|
candle-nn = { version = "0.4.1" }
|
||||||
tokenizers = { git = "https://github.com/huggingface/tokenizers.git", tag = "v0.15.2", version = "0.15.2", default_features = false, features = [
|
tokenizers = { git = "https://github.com/huggingface/tokenizers.git", tag = "v0.15.2", version = "0.15.2", default-features = false, features = [
|
||||||
"onig",
|
"onig",
|
||||||
] }
|
] }
|
||||||
hf-hub = { git = "https://github.com/dureuill/hf-hub.git", branch = "rust_tls", default_features = false, features = [
|
hf-hub = { git = "https://github.com/dureuill/hf-hub.git", branch = "rust_tls", default-features = false, features = [
|
||||||
"online",
|
"online",
|
||||||
] }
|
] }
|
||||||
tiktoken-rs = "0.5.8"
|
tiktoken-rs = "0.5.8"
|
||||||
|
@ -22,7 +22,7 @@ pub enum SearchEvents {
|
|||||||
RankingRuleStartIteration { ranking_rule_idx: usize, universe_len: u64 },
|
RankingRuleStartIteration { ranking_rule_idx: usize, universe_len: u64 },
|
||||||
RankingRuleNextBucket { ranking_rule_idx: usize, universe_len: u64, bucket_len: u64 },
|
RankingRuleNextBucket { ranking_rule_idx: usize, universe_len: u64, bucket_len: u64 },
|
||||||
RankingRuleSkipBucket { ranking_rule_idx: usize, bucket_len: u64 },
|
RankingRuleSkipBucket { ranking_rule_idx: usize, bucket_len: u64 },
|
||||||
RankingRuleEndIteration { ranking_rule_idx: usize, universe_len: u64 },
|
RankingRuleEndIteration { ranking_rule_idx: usize },
|
||||||
ExtendResults { new: Vec<u32> },
|
ExtendResults { new: Vec<u32> },
|
||||||
ProximityGraph { graph: RankingRuleGraph<ProximityGraph> },
|
ProximityGraph { graph: RankingRuleGraph<ProximityGraph> },
|
||||||
ProximityPaths { paths: Vec<Vec<Interned<ProximityCondition>>> },
|
ProximityPaths { paths: Vec<Vec<Interned<ProximityCondition>>> },
|
||||||
@ -123,12 +123,9 @@ impl SearchLogger<QueryGraph> for VisualSearchLogger {
|
|||||||
&mut self,
|
&mut self,
|
||||||
ranking_rule_idx: usize,
|
ranking_rule_idx: usize,
|
||||||
_ranking_rule: &dyn RankingRule<QueryGraph>,
|
_ranking_rule: &dyn RankingRule<QueryGraph>,
|
||||||
universe: &RoaringBitmap,
|
_universe: &RoaringBitmap,
|
||||||
) {
|
) {
|
||||||
self.events.push(SearchEvents::RankingRuleEndIteration {
|
self.events.push(SearchEvents::RankingRuleEndIteration { ranking_rule_idx });
|
||||||
ranking_rule_idx,
|
|
||||||
universe_len: universe.len(),
|
|
||||||
});
|
|
||||||
self.location.pop();
|
self.location.pop();
|
||||||
}
|
}
|
||||||
fn add_to_results(&mut self, docids: &[u32]) {
|
fn add_to_results(&mut self, docids: &[u32]) {
|
||||||
@ -326,7 +323,7 @@ impl<'ctx> DetailedLoggerFinish<'ctx> {
|
|||||||
assert!(ranking_rule_idx == self.rr_action_counter.len() - 1);
|
assert!(ranking_rule_idx == self.rr_action_counter.len() - 1);
|
||||||
self.write_skip_bucket(bucket_len)?;
|
self.write_skip_bucket(bucket_len)?;
|
||||||
}
|
}
|
||||||
SearchEvents::RankingRuleEndIteration { ranking_rule_idx, universe_len: _ } => {
|
SearchEvents::RankingRuleEndIteration { ranking_rule_idx } => {
|
||||||
assert!(ranking_rule_idx == self.rr_action_counter.len() - 1);
|
assert!(ranking_rule_idx == self.rr_action_counter.len() - 1);
|
||||||
self.write_end_iteration()?;
|
self.write_end_iteration()?;
|
||||||
}
|
}
|
||||||
|
@ -11,7 +11,7 @@ mod extract_word_position_docids;
|
|||||||
|
|
||||||
use std::fs::File;
|
use std::fs::File;
|
||||||
use std::io::BufReader;
|
use std::io::BufReader;
|
||||||
use std::sync::Arc;
|
use std::sync::{Arc, OnceLock};
|
||||||
|
|
||||||
use crossbeam_channel::Sender;
|
use crossbeam_channel::Sender;
|
||||||
use rayon::prelude::*;
|
use rayon::prelude::*;
|
||||||
@ -31,7 +31,7 @@ use self::extract_word_position_docids::extract_word_position_docids;
|
|||||||
use super::helpers::{as_cloneable_grenad, CursorClonableMmap, GrenadParameters};
|
use super::helpers::{as_cloneable_grenad, CursorClonableMmap, GrenadParameters};
|
||||||
use super::{helpers, TypedChunk};
|
use super::{helpers, TypedChunk};
|
||||||
use crate::update::settings::InnerIndexSettingsDiff;
|
use crate::update::settings::InnerIndexSettingsDiff;
|
||||||
use crate::{FieldId, Result, ThreadPoolNoAbortBuilder};
|
use crate::{FieldId, Result, ThreadPoolNoAbort, ThreadPoolNoAbortBuilder};
|
||||||
|
|
||||||
/// Extract data for each databases from obkv documents in parallel.
|
/// Extract data for each databases from obkv documents in parallel.
|
||||||
/// Send data in grenad file over provided Sender.
|
/// Send data in grenad file over provided Sender.
|
||||||
@ -213,6 +213,18 @@ fn run_extraction_task<FE, FS, M>(
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn request_threads() -> &'static ThreadPoolNoAbort {
|
||||||
|
static REQUEST_THREADS: OnceLock<ThreadPoolNoAbort> = OnceLock::new();
|
||||||
|
|
||||||
|
REQUEST_THREADS.get_or_init(|| {
|
||||||
|
ThreadPoolNoAbortBuilder::new()
|
||||||
|
.num_threads(crate::vector::REQUEST_PARALLELISM)
|
||||||
|
.thread_name(|index| format!("embedding-request-{index}"))
|
||||||
|
.build()
|
||||||
|
.unwrap()
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
/// Extract chunked data and send it into lmdb_writer_sx sender:
|
/// Extract chunked data and send it into lmdb_writer_sx sender:
|
||||||
/// - documents
|
/// - documents
|
||||||
fn send_original_documents_data(
|
fn send_original_documents_data(
|
||||||
@ -227,11 +239,6 @@ fn send_original_documents_data(
|
|||||||
let documents_chunk_cloned = original_documents_chunk.clone();
|
let documents_chunk_cloned = original_documents_chunk.clone();
|
||||||
let lmdb_writer_sx_cloned = lmdb_writer_sx.clone();
|
let lmdb_writer_sx_cloned = lmdb_writer_sx.clone();
|
||||||
|
|
||||||
let request_threads = ThreadPoolNoAbortBuilder::new()
|
|
||||||
.num_threads(crate::vector::REQUEST_PARALLELISM)
|
|
||||||
.thread_name(|index| format!("embedding-request-{index}"))
|
|
||||||
.build()?;
|
|
||||||
|
|
||||||
if settings_diff.reindex_vectors() || !settings_diff.settings_update_only() {
|
if settings_diff.reindex_vectors() || !settings_diff.settings_update_only() {
|
||||||
let settings_diff = settings_diff.clone();
|
let settings_diff = settings_diff.clone();
|
||||||
rayon::spawn(move || {
|
rayon::spawn(move || {
|
||||||
@ -249,7 +256,7 @@ fn send_original_documents_data(
|
|||||||
prompts,
|
prompts,
|
||||||
indexer,
|
indexer,
|
||||||
embedder.clone(),
|
embedder.clone(),
|
||||||
&request_threads,
|
request_threads(),
|
||||||
) {
|
) {
|
||||||
Ok(results) => Some(results),
|
Ok(results) => Some(results),
|
||||||
Err(error) => {
|
Err(error) => {
|
||||||
|
@ -48,7 +48,6 @@ pub struct Transform<'a, 'i> {
|
|||||||
fields_ids_map: FieldsIdsMap,
|
fields_ids_map: FieldsIdsMap,
|
||||||
|
|
||||||
indexer_settings: &'a IndexerConfig,
|
indexer_settings: &'a IndexerConfig,
|
||||||
pub autogenerate_docids: bool,
|
|
||||||
pub index_documents_method: IndexDocumentsMethod,
|
pub index_documents_method: IndexDocumentsMethod,
|
||||||
available_documents_ids: AvailableDocumentsIds,
|
available_documents_ids: AvailableDocumentsIds,
|
||||||
|
|
||||||
@ -102,7 +101,7 @@ impl<'a, 'i> Transform<'a, 'i> {
|
|||||||
index: &'i Index,
|
index: &'i Index,
|
||||||
indexer_settings: &'a IndexerConfig,
|
indexer_settings: &'a IndexerConfig,
|
||||||
index_documents_method: IndexDocumentsMethod,
|
index_documents_method: IndexDocumentsMethod,
|
||||||
autogenerate_docids: bool,
|
_autogenerate_docids: bool,
|
||||||
) -> Result<Self> {
|
) -> Result<Self> {
|
||||||
// We must choose the appropriate merge function for when two or more documents
|
// We must choose the appropriate merge function for when two or more documents
|
||||||
// with the same user id must be merged or fully replaced in the same batch.
|
// with the same user id must be merged or fully replaced in the same batch.
|
||||||
@ -136,7 +135,6 @@ impl<'a, 'i> Transform<'a, 'i> {
|
|||||||
index,
|
index,
|
||||||
fields_ids_map: index.fields_ids_map(wtxn)?,
|
fields_ids_map: index.fields_ids_map(wtxn)?,
|
||||||
indexer_settings,
|
indexer_settings,
|
||||||
autogenerate_docids,
|
|
||||||
available_documents_ids: AvailableDocumentsIds::from_documents_ids(&documents_ids),
|
available_documents_ids: AvailableDocumentsIds::from_documents_ids(&documents_ids),
|
||||||
original_sorter,
|
original_sorter,
|
||||||
flattened_sorter,
|
flattened_sorter,
|
||||||
|
@ -21,7 +21,7 @@ reqwest = { version = "0.11.23", features = [
|
|||||||
"stream",
|
"stream",
|
||||||
"json",
|
"json",
|
||||||
"rustls-tls",
|
"rustls-tls",
|
||||||
], default_features = false }
|
], default-features = false }
|
||||||
serde = { version = "1.0.195", features = ["derive"] }
|
serde = { version = "1.0.195", features = ["derive"] }
|
||||||
serde_json = "1.0.111"
|
serde_json = "1.0.111"
|
||||||
sha2 = "0.10.8"
|
sha2 = "0.10.8"
|
||||||
|
Loading…
Reference in New Issue
Block a user