diff --git a/Cargo.lock b/Cargo.lock index 06bd9c234..335445956 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3598,6 +3598,7 @@ dependencies = [ "smartstring", "tempfile", "thiserror", + "thread_local", "tiktoken-rs", "time", "tokenizers", @@ -5332,9 +5333,9 @@ dependencies = [ [[package]] name = "thread_local" -version = "1.1.7" +version = "1.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3fdd6f064ccff2d6567adcb3873ca630700f00b5ad3f060c25b5dcfd9a4ce152" +checksum = "8b9ef9bad013ada3808854ceac7b46812a6465ba368859a37e2100283d2d719c" dependencies = [ "cfg-if", "once_cell", diff --git a/milli/Cargo.toml b/milli/Cargo.toml index bae3dd64b..72f3daa4e 100644 --- a/milli/Cargo.toml +++ b/milli/Cargo.toml @@ -89,6 +89,7 @@ ureq = { version = "2.10.0", features = ["json"] } url = "2.5.2" rayon-par-bridge = "0.1.0" hashbrown = "0.14.5" +thread_local = "1.1.8" [dev-dependencies] mimalloc = { version = "0.1.43", default-features = false } diff --git a/milli/src/update/new/extract/faceted/extract_facets.rs b/milli/src/update/new/extract/faceted/extract_facets.rs index f4ad50bfe..8e8b71676 100644 --- a/milli/src/update/new/extract/faceted/extract_facets.rs +++ b/milli/src/update/new/extract/faceted/extract_facets.rs @@ -1,3 +1,4 @@ +use std::cell::RefCell; use std::collections::HashSet; use std::fmt::Debug; use std::fs::File; @@ -7,6 +8,7 @@ use grenad::{MergeFunction, Merger}; use heed::RoTxn; use rayon::iter::{IntoParallelIterator, ParallelBridge, ParallelIterator}; use serde_json::Value; +use thread_local::ThreadLocal; use super::super::cache::CboCachedSorter; use super::facet_document::extract_document_facets; @@ -216,24 +218,28 @@ impl DocidsExtractor for FacetedDocidsExtractor { let span = tracing::trace_span!(target: "indexing::documents::extract", "docids_extraction"); let _entered = span.enter(); + let local = ThreadLocal::new(); document_changes.into_par_iter().try_arc_for_each_try_init( || { - let rtxn = index.read_txn().map_err(Error::from)?; - let cache = caches.push(CboCachedSorter::new( - // TODO use a better value - 100.try_into().unwrap(), - create_sorter( - grenad::SortAlgorithm::Stable, - MergeDeladdCboRoaringBitmaps, - indexer.chunk_compression_type, - indexer.chunk_compression_level, - indexer.max_nb_chunks, - max_memory, - ), - )); - Ok((rtxn, fields_ids_map.clone(), Vec::new(), cache)) + local.get_or_try(|| { + let rtxn = index.read_txn().map_err(Error::from)?; + let cache = caches.push(CboCachedSorter::new( + /// TODO use a better value + 100.try_into().unwrap(), + create_sorter( + grenad::SortAlgorithm::Stable, + MergeDeladdCboRoaringBitmaps, + indexer.chunk_compression_type, + indexer.chunk_compression_level, + indexer.max_nb_chunks, + max_memory, + ), + )); + Ok((rtxn, RefCell::new((fields_ids_map.clone(), Vec::new(), cache)))) + }) }, - |(rtxn, fields_ids_map, buffer, cached_sorter), document_change| { + |(rtxn, rc), document_change| { + let (fields_ids_map, buffer, cached_sorter) = &mut *rc.borrow_mut(); Self::extract_document_change( rtxn, index, diff --git a/milli/src/update/new/extract/searchable/extract_word_docids.rs b/milli/src/update/new/extract/searchable/extract_word_docids.rs index 702b8f4e9..df8409618 100644 --- a/milli/src/update/new/extract/searchable/extract_word_docids.rs +++ b/milli/src/update/new/extract/searchable/extract_word_docids.rs @@ -1,3 +1,4 @@ +use std::cell::RefCell; use std::collections::HashMap; use std::fs::File; use std::num::NonZero; @@ -6,6 +7,7 @@ use std::sync::Arc; use grenad::{Merger, MergerBuilder}; use heed::RoTxn; use rayon::iter::IntoParallelIterator; +use thread_local::ThreadLocal; use super::tokenize_document::{tokenizer_builder, DocumentTokenizer}; use super::SearchableExtractor; @@ -347,18 +349,23 @@ impl WordDocidsExtractors { let span = tracing::trace_span!(target: "indexing::documents::extract", "docids_extraction"); let _entered = span.enter(); + let local = ThreadLocal::new(); document_changes.into_par_iter().try_arc_for_each_try_init( || { - let rtxn = index.read_txn().map_err(Error::from)?; - let cache = caches.push(WordDocidsCachedSorters::new( - indexer, - max_memory, - // TODO use a better value - 200_000.try_into().unwrap(), - )); - Ok((rtxn, &document_tokenizer, fields_ids_map.clone(), cache)) + local.get_or_try(|| { + let rtxn = index.read_txn().map_err(Error::from)?; + let fields_ids_map = fields_ids_map.clone(); + let cache = caches.push(WordDocidsCachedSorters::new( + indexer, + max_memory, + // TODO use a better value + 200_000.try_into().unwrap(), + )); + Ok((rtxn, &document_tokenizer, RefCell::new((fields_ids_map, cache)))) + }) }, - |(rtxn, document_tokenizer, fields_ids_map, cached_sorter), document_change| { + |(rtxn, document_tokenizer, rc), document_change| { + let (fields_ids_map, cached_sorter) = &mut *rc.borrow_mut(); Self::extract_document_change( rtxn, index, @@ -377,7 +384,9 @@ impl WordDocidsExtractors { tracing::trace_span!(target: "indexing::documents::extract", "merger_building"); let _entered = span.enter(); let mut builder = WordDocidsMergerBuilders::new(); + let mut count = 0; for cache in caches.into_iter() { + count += 1; builder.add_sorters(cache)?; } diff --git a/milli/src/update/new/extract/searchable/mod.rs b/milli/src/update/new/extract/searchable/mod.rs index ba1d53f54..272bff4d3 100644 --- a/milli/src/update/new/extract/searchable/mod.rs +++ b/milli/src/update/new/extract/searchable/mod.rs @@ -2,6 +2,7 @@ mod extract_word_docids; mod extract_word_pair_proximity_docids; mod tokenize_document; +use std::cell::RefCell; use std::fs::File; use std::sync::Arc; @@ -10,6 +11,7 @@ pub use extract_word_pair_proximity_docids::WordPairProximityDocidsExtractor; use grenad::Merger; use heed::RoTxn; use rayon::iter::{IntoParallelIterator, ParallelBridge, ParallelIterator}; +use thread_local::ThreadLocal; use tokenize_document::{tokenizer_builder, DocumentTokenizer}; use super::cache::CboCachedSorter; @@ -64,24 +66,32 @@ pub trait SearchableExtractor { let span = tracing::trace_span!(target: "indexing::documents::extract", "docids_extraction"); let _entered = span.enter(); + let local = ThreadLocal::new(); document_changes.into_par_iter().try_arc_for_each_try_init( || { - let rtxn = index.read_txn().map_err(Error::from)?; - let cache = caches.push(CboCachedSorter::new( - // TODO use a better value - 1_000_000.try_into().unwrap(), - create_sorter( - grenad::SortAlgorithm::Stable, - MergeDeladdCboRoaringBitmaps, - indexer.chunk_compression_type, - indexer.chunk_compression_level, - indexer.max_nb_chunks, - max_memory, - ), - )); - Ok((rtxn, &document_tokenizer, fields_ids_map.clone(), cache)) + local.get_or_try(|| { + let rtxn = index.read_txn().map_err(Error::from)?; + let cache = caches.push(CboCachedSorter::new( + /// TODO use a better value + 1_000_000.try_into().unwrap(), + create_sorter( + grenad::SortAlgorithm::Stable, + MergeDeladdCboRoaringBitmaps, + indexer.chunk_compression_type, + indexer.chunk_compression_level, + indexer.max_nb_chunks, + max_memory, + ), + )); + Ok(( + rtxn, + &document_tokenizer, + RefCell::new((fields_ids_map.clone(), cache)), + )) + }) }, - |(rtxn, document_tokenizer, fields_ids_map, cached_sorter), document_change| { + |(rtxn, document_tokenizer, rc), document_change| { + let (fields_ids_map, cached_sorter) = &mut *rc.borrow_mut(); Self::extract_document_change( rtxn, index,