Only spawn the pool once

This commit is contained in:
Louis Dureuil 2024-06-19 16:25:33 +02:00
parent e580d6b98f
commit a04041c8f2
No known key found for this signature in database

View File

@ -11,7 +11,7 @@ mod extract_word_position_docids;
use std::fs::File; use std::fs::File;
use std::io::BufReader; use std::io::BufReader;
use std::sync::Arc; use std::sync::{Arc, OnceLock};
use crossbeam_channel::Sender; use crossbeam_channel::Sender;
use rayon::prelude::*; use rayon::prelude::*;
@ -32,7 +32,7 @@ use super::helpers::{as_cloneable_grenad, CursorClonableMmap, GrenadParameters};
use super::{helpers, TypedChunk}; use super::{helpers, TypedChunk};
use crate::index::IndexEmbeddingConfig; use crate::index::IndexEmbeddingConfig;
use crate::update::settings::InnerIndexSettingsDiff; use crate::update::settings::InnerIndexSettingsDiff;
use crate::{FieldId, Result, ThreadPoolNoAbortBuilder}; use crate::{FieldId, Result, ThreadPoolNoAbort, ThreadPoolNoAbortBuilder};
/// Extract data for each databases from obkv documents in parallel. /// Extract data for each databases from obkv documents in parallel.
/// Send data in grenad file over provided Sender. /// Send data in grenad file over provided Sender.
@ -207,6 +207,18 @@ fn run_extraction_task<FE, FS, M>(
}) })
} }
fn request_threads() -> &'static ThreadPoolNoAbort {
static REQUEST_THREADS: OnceLock<ThreadPoolNoAbort> = OnceLock::new();
REQUEST_THREADS.get_or_init(|| {
ThreadPoolNoAbortBuilder::new()
.num_threads(crate::vector::REQUEST_PARALLELISM)
.thread_name(|index| format!("embedding-request-{index}"))
.build()
.unwrap()
})
}
/// Extract chunked data and send it into lmdb_writer_sx sender: /// Extract chunked data and send it into lmdb_writer_sx sender:
/// - documents /// - documents
fn send_original_documents_data( fn send_original_documents_data(
@ -219,11 +231,6 @@ fn send_original_documents_data(
let original_documents_chunk = let original_documents_chunk =
original_documents_chunk.and_then(|c| unsafe { as_cloneable_grenad(&c) })?; original_documents_chunk.and_then(|c| unsafe { as_cloneable_grenad(&c) })?;
let request_threads = ThreadPoolNoAbortBuilder::new()
.num_threads(crate::vector::REQUEST_PARALLELISM)
.thread_name(|index| format!("embedding-request-{index}"))
.build()?;
let index_vectors = (settings_diff.reindex_vectors() || !settings_diff.settings_update_only()) let index_vectors = (settings_diff.reindex_vectors() || !settings_diff.settings_update_only())
// no point in indexing vectors without embedders // no point in indexing vectors without embedders
&& (!settings_diff.new.embedding_configs.inner_as_ref().is_empty()); && (!settings_diff.new.embedding_configs.inner_as_ref().is_empty());
@ -256,7 +263,7 @@ fn send_original_documents_data(
prompts, prompts,
indexer, indexer,
embedder.clone(), embedder.clone(),
&request_threads, request_threads(),
) { ) {
Ok(results) => Some(results), Ok(results) => Some(results),
Err(error) => { Err(error) => {