From 6254c7cee15748b3908eafab4220e2fb938da87d Mon Sep 17 00:00:00 2001 From: Louis Dureuil Date: Wed, 19 Jun 2024 13:57:44 +0200 Subject: [PATCH] Only spawn the pool once --- .../src/update/index_documents/extract/mod.rs | 23 +++++++++++++------ 1 file changed, 16 insertions(+), 7 deletions(-) diff --git a/milli/src/update/index_documents/extract/mod.rs b/milli/src/update/index_documents/extract/mod.rs index 237e19b2a..6935717dc 100644 --- a/milli/src/update/index_documents/extract/mod.rs +++ b/milli/src/update/index_documents/extract/mod.rs @@ -11,7 +11,7 @@ mod extract_word_position_docids; use std::fs::File; use std::io::BufReader; -use std::sync::Arc; +use std::sync::{Arc, OnceLock}; use crossbeam_channel::Sender; use rayon::prelude::*; @@ -31,7 +31,7 @@ use self::extract_word_position_docids::extract_word_position_docids; use super::helpers::{as_cloneable_grenad, CursorClonableMmap, GrenadParameters}; use super::{helpers, TypedChunk}; use crate::update::settings::InnerIndexSettingsDiff; -use crate::{FieldId, Result, ThreadPoolNoAbortBuilder}; +use crate::{FieldId, Result, ThreadPoolNoAbort, ThreadPoolNoAbortBuilder}; /// Extract data for each databases from obkv documents in parallel. /// Send data in grenad file over provided Sender. 
@@ -213,6 +213,18 @@ fn run_extraction_task( }) } +fn request_threads() -> &'static ThreadPoolNoAbort { + static REQUEST_THREADS: OnceLock<ThreadPoolNoAbort> = OnceLock::new(); + + REQUEST_THREADS.get_or_init(|| { + ThreadPoolNoAbortBuilder::new() + .num_threads(crate::vector::REQUEST_PARALLELISM) + .thread_name(|index| format!("embedding-request-{index}")) + .build() + .unwrap() + }) +} + /// Extract chunked data and send it into lmdb_writer_sx sender: /// - documents fn send_original_documents_data( @@ -227,10 +239,7 @@ fn send_original_documents_data( let documents_chunk_cloned = original_documents_chunk.clone(); let lmdb_writer_sx_cloned = lmdb_writer_sx.clone(); - let request_threads = ThreadPoolNoAbortBuilder::new() - .num_threads(crate::vector::REQUEST_PARALLELISM) - .thread_name(|index| format!("embedding-request-{index}")) - .build()?; + let request_threads = request_threads(); if settings_diff.reindex_vectors() || !settings_diff.settings_update_only() { let settings_diff = settings_diff.clone(); @@ -249,7 +258,7 @@ fn send_original_documents_data( prompts, indexer, embedder.clone(), - &request_threads, + request_threads, ) { Ok(results) => Some(results), Err(error) => {