mirror of
https://github.com/meilisearch/meilisearch.git
synced 2024-11-23 02:27:40 +08:00
Merge #4710
4710: Only spawn thread pool once (v1.9) r=irevoire a=dureuill # Pull Request See #4707 Co-authored-by: Louis Dureuil <louis@meilisearch.com>
This commit is contained in:
commit
4ae11bfd31
@ -11,7 +11,7 @@ mod extract_word_position_docids;
|
|||||||
|
|
||||||
use std::fs::File;
|
use std::fs::File;
|
||||||
use std::io::BufReader;
|
use std::io::BufReader;
|
||||||
use std::sync::Arc;
|
use std::sync::{Arc, OnceLock};
|
||||||
|
|
||||||
use crossbeam_channel::Sender;
|
use crossbeam_channel::Sender;
|
||||||
use rayon::prelude::*;
|
use rayon::prelude::*;
|
||||||
@ -32,7 +32,7 @@ use super::helpers::{as_cloneable_grenad, CursorClonableMmap, GrenadParameters};
|
|||||||
use super::{helpers, TypedChunk};
|
use super::{helpers, TypedChunk};
|
||||||
use crate::index::IndexEmbeddingConfig;
|
use crate::index::IndexEmbeddingConfig;
|
||||||
use crate::update::settings::InnerIndexSettingsDiff;
|
use crate::update::settings::InnerIndexSettingsDiff;
|
||||||
use crate::{FieldId, Result, ThreadPoolNoAbortBuilder};
|
use crate::{FieldId, Result, ThreadPoolNoAbort, ThreadPoolNoAbortBuilder};
|
||||||
|
|
||||||
/// Extract data for each databases from obkv documents in parallel.
|
/// Extract data for each databases from obkv documents in parallel.
|
||||||
/// Send data in grenad file over provided Sender.
|
/// Send data in grenad file over provided Sender.
|
||||||
@ -207,6 +207,18 @@ fn run_extraction_task<FE, FS, M>(
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn request_threads() -> &'static ThreadPoolNoAbort {
|
||||||
|
static REQUEST_THREADS: OnceLock<ThreadPoolNoAbort> = OnceLock::new();
|
||||||
|
|
||||||
|
REQUEST_THREADS.get_or_init(|| {
|
||||||
|
ThreadPoolNoAbortBuilder::new()
|
||||||
|
.num_threads(crate::vector::REQUEST_PARALLELISM)
|
||||||
|
.thread_name(|index| format!("embedding-request-{index}"))
|
||||||
|
.build()
|
||||||
|
.unwrap()
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
/// Extract chunked data and send it into lmdb_writer_sx sender:
|
/// Extract chunked data and send it into lmdb_writer_sx sender:
|
||||||
/// - documents
|
/// - documents
|
||||||
fn send_original_documents_data(
|
fn send_original_documents_data(
|
||||||
@ -219,11 +231,6 @@ fn send_original_documents_data(
|
|||||||
let original_documents_chunk =
|
let original_documents_chunk =
|
||||||
original_documents_chunk.and_then(|c| unsafe { as_cloneable_grenad(&c) })?;
|
original_documents_chunk.and_then(|c| unsafe { as_cloneable_grenad(&c) })?;
|
||||||
|
|
||||||
let request_threads = ThreadPoolNoAbortBuilder::new()
|
|
||||||
.num_threads(crate::vector::REQUEST_PARALLELISM)
|
|
||||||
.thread_name(|index| format!("embedding-request-{index}"))
|
|
||||||
.build()?;
|
|
||||||
|
|
||||||
let index_vectors = (settings_diff.reindex_vectors() || !settings_diff.settings_update_only())
|
let index_vectors = (settings_diff.reindex_vectors() || !settings_diff.settings_update_only())
|
||||||
// no point in indexing vectors without embedders
|
// no point in indexing vectors without embedders
|
||||||
&& (!settings_diff.new.embedding_configs.inner_as_ref().is_empty());
|
&& (!settings_diff.new.embedding_configs.inner_as_ref().is_empty());
|
||||||
@ -256,7 +263,7 @@ fn send_original_documents_data(
|
|||||||
prompts,
|
prompts,
|
||||||
indexer,
|
indexer,
|
||||||
embedder.clone(),
|
embedder.clone(),
|
||||||
&request_threads,
|
request_threads(),
|
||||||
) {
|
) {
|
||||||
Ok(results) => Some(results),
|
Ok(results) => Some(results),
|
||||||
Err(error) => {
|
Err(error) => {
|
||||||
|
Loading…
Reference in New Issue
Block a user