Only spawn the pool once

This commit is contained in:
Louis Dureuil 2024-06-19 13:57:44 +02:00
parent 6c6c4732a1
commit 6254c7cee1
No known key found for this signature in database

View File

@ -11,7 +11,7 @@ mod extract_word_position_docids;
use std::fs::File; use std::fs::File;
use std::io::BufReader; use std::io::BufReader;
use std::sync::Arc; use std::sync::{Arc, OnceLock};
use crossbeam_channel::Sender; use crossbeam_channel::Sender;
use rayon::prelude::*; use rayon::prelude::*;
@ -31,7 +31,7 @@ use self::extract_word_position_docids::extract_word_position_docids;
use super::helpers::{as_cloneable_grenad, CursorClonableMmap, GrenadParameters}; use super::helpers::{as_cloneable_grenad, CursorClonableMmap, GrenadParameters};
use super::{helpers, TypedChunk}; use super::{helpers, TypedChunk};
use crate::update::settings::InnerIndexSettingsDiff; use crate::update::settings::InnerIndexSettingsDiff;
use crate::{FieldId, Result, ThreadPoolNoAbortBuilder}; use crate::{FieldId, Result, ThreadPoolNoAbort, ThreadPoolNoAbortBuilder};
/// Extract data for each databases from obkv documents in parallel. /// Extract data for each databases from obkv documents in parallel.
/// Send data in grenad file over provided Sender. /// Send data in grenad file over provided Sender.
@ -213,6 +213,18 @@ fn run_extraction_task<FE, FS, M>(
}) })
} }
fn request_threads() -> &'static ThreadPoolNoAbort {
static REQUEST_THREADS: OnceLock<ThreadPoolNoAbort> = OnceLock::new();
REQUEST_THREADS.get_or_init(|| {
ThreadPoolNoAbortBuilder::new()
.num_threads(crate::vector::REQUEST_PARALLELISM)
.thread_name(|index| format!("embedding-request-{index}"))
.build()
.unwrap()
})
}
/// Extract chunked data and send it into lmdb_writer_sx sender: /// Extract chunked data and send it into lmdb_writer_sx sender:
/// - documents /// - documents
fn send_original_documents_data( fn send_original_documents_data(
@ -227,10 +239,7 @@ fn send_original_documents_data(
let documents_chunk_cloned = original_documents_chunk.clone(); let documents_chunk_cloned = original_documents_chunk.clone();
let lmdb_writer_sx_cloned = lmdb_writer_sx.clone(); let lmdb_writer_sx_cloned = lmdb_writer_sx.clone();
let request_threads = ThreadPoolNoAbortBuilder::new() let request_threads = request_threads();
.num_threads(crate::vector::REQUEST_PARALLELISM)
.thread_name(|index| format!("embedding-request-{index}"))
.build()?;
if settings_diff.reindex_vectors() || !settings_diff.settings_update_only() { if settings_diff.reindex_vectors() || !settings_diff.settings_update_only() {
let settings_diff = settings_diff.clone(); let settings_diff = settings_diff.clone();
@ -249,7 +258,7 @@ fn send_original_documents_data(
prompts, prompts,
indexer, indexer,
embedder.clone(), embedder.clone(),
&request_threads, request_threads,
) { ) {
Ok(results) => Some(results), Ok(results) => Some(results),
Err(error) => { Err(error) => {