diff --git a/milli/src/update/new/append_only_vec.rs b/milli/src/update/new/append_only_vec.rs index fe05dd782..d4a30c1b1 100644 --- a/milli/src/update/new/append_only_vec.rs +++ b/milli/src/update/new/append_only_vec.rs @@ -99,12 +99,6 @@ fn test_indices() { } } -impl Default for AppendOnlyVec { - fn default() -> Self { - Self::new() - } -} - impl AppendOnlyVec { const EMPTY: UnsafeCell<*mut T> = UnsafeCell::new(ptr::null_mut()); @@ -220,6 +214,12 @@ impl AppendOnlyVec { } } +impl Default for AppendOnlyVec { + fn default() -> Self { + Self::new() + } +} + impl Debug for AppendOnlyVec { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("AppendOnlyVec").field("len", &self.len()).finish() diff --git a/milli/src/update/new/extract/faceted/extract_facets.rs b/milli/src/update/new/extract/faceted/extract_facets.rs index 40f561b97..ef983c4e6 100644 --- a/milli/src/update/new/extract/faceted/extract_facets.rs +++ b/milli/src/update/new/extract/faceted/extract_facets.rs @@ -12,6 +12,7 @@ use super::super::cache::CboCachedSorter; use super::facet_document::extract_document_facets; use super::FacetKind; use crate::facet::value_encoding::f64_into_bytes; +use crate::update::new::append_only_vec::AppendOnlyVec; use crate::update::new::extract::DocidsExtractor; use crate::update::new::items_pool::ParallelIteratorExt; use crate::update::new::{DocumentChange, ItemsPool}; @@ -209,45 +210,40 @@ impl DocidsExtractor for FacetedDocidsExtractor { let attributes_to_extract = Self::attributes_to_extract(&rtxn, index)?; let attributes_to_extract: Vec<_> = attributes_to_extract.iter().map(|s| s.as_ref()).collect(); - - let context_pool = ItemsPool::new(|| { - Ok(( - fields_ids_map.clone(), - Vec::new(), - CboCachedSorter::new( - // TODO use a better value - 100.try_into().unwrap(), - create_sorter( - grenad::SortAlgorithm::Stable, - MergeDeladdCboRoaringBitmaps, - indexer.chunk_compression_type, - indexer.chunk_compression_level, - indexer.max_nb_chunks, - max_memory, - ), - ), - )) - }); + let caches = AppendOnlyVec::new(); { let span = tracing::trace_span!(target: "indexing::documents::extract", "docids_extraction"); let _entered = span.enter(); document_changes.into_par_iter().try_arc_for_each_try_init( - || index.read_txn().map_err(Error::from), - |rtxn, document_change| { - context_pool.with(|(fields_ids_map, buffer, cached_sorter)| { - Self::extract_document_change( - rtxn, - index, - buffer, - fields_ids_map, - &attributes_to_extract, - cached_sorter, - document_change?, - ) - .map_err(Arc::new) - }) + || { + let rtxn = index.read_txn().map_err(Error::from)?; + let cache = caches.push(CboCachedSorter::new( + // TODO use a better value + 100.try_into().unwrap(), + create_sorter( + grenad::SortAlgorithm::Stable, + MergeDeladdCboRoaringBitmaps, + indexer.chunk_compression_type, + indexer.chunk_compression_level, + indexer.max_nb_chunks, + max_memory, + ), + )); + Ok((rtxn, fields_ids_map.clone(), Vec::new(), cache)) + }, + |(rtxn, fields_ids_map, buffer, cached_sorter), document_change| { + Self::extract_document_change( + rtxn, + index, + buffer, + fields_ids_map, + &attributes_to_extract, + cached_sorter, + document_change?, + ) + .map_err(Arc::new) }, )?; } @@ -257,14 +253,15 @@ impl DocidsExtractor for FacetedDocidsExtractor { tracing::trace_span!(target: "indexing::documents::extract", "merger_building"); let _entered = span.enter(); - let readers: Vec<_> = context_pool - .into_items() + let readers: Vec<_> = caches + .into_iter() .par_bridge() - .map(|(_tokenizer, _fields_ids_map, cached_sorter)| { + .map(|cached_sorter| { let sorter = cached_sorter.into_sorter()?; sorter.into_reader_cursors() }) .collect(); + for reader in readers { builder.extend(reader?); } diff --git a/milli/src/update/new/extract/searchable/extract_word_docids.rs b/milli/src/update/new/extract/searchable/extract_word_docids.rs index f59f5a03d..a19ac3891 100644 --- a/milli/src/update/new/extract/searchable/extract_word_docids.rs +++ b/milli/src/update/new/extract/searchable/extract_word_docids.rs @@ -8,6 +8,8 @@ use heed::RoTxn; use rayon::iter::IntoParallelIterator; use super::tokenize_document::{tokenizer_builder, DocumentTokenizer}; +use super::SearchableExtractor; +use crate::update::new::append_only_vec::AppendOnlyVec; use crate::update::new::extract::cache::CboCachedSorter; use crate::update::new::extract::perm_json_p::contained_in; use crate::update::new::items_pool::ParallelIteratorExt; @@ -339,37 +341,33 @@ impl WordDocidsExtractors { max_positions_per_attributes: MAX_POSITION_PER_ATTRIBUTE, }; - let context_pool = ItemsPool::new(|| { - Ok(( - &document_tokenizer, - fields_ids_map.clone(), - WordDocidsCachedSorters::new( - indexer, - max_memory, - // TODO use a better value - 200_000.try_into().unwrap(), - ), - )) - }); + let caches = AppendOnlyVec::new(); { let span = tracing::trace_span!(target: "indexing::documents::extract", "docids_extraction"); let _entered = span.enter(); document_changes.into_par_iter().try_arc_for_each_try_init( - || index.read_txn().map_err(Error::from), - |rtxn, document_change| { - context_pool.with(|(document_tokenizer, fields_ids_map, cached_sorter)| { - Self::extract_document_change( - rtxn, - index, - document_tokenizer, - fields_ids_map, - cached_sorter, - document_change?, - ) - .map_err(Arc::new) - }) + || { + let rtxn = index.read_txn().map_err(Error::from)?; + let cache = caches.push(WordDocidsCachedSorters::new( + indexer, + max_memory, + // TODO use a better value + 200_000.try_into().unwrap(), + )); + Ok((rtxn, &document_tokenizer, fields_ids_map.clone(), cache)) + }, + |(rtxn, document_tokenizer, fields_ids_map, cached_sorter), document_change| { + Self::extract_document_change( + rtxn, + index, + document_tokenizer, + fields_ids_map, + cached_sorter, + document_change?, + ) + .map_err(Arc::new) }, )?; } @@ -379,7 +377,7 @@ impl WordDocidsExtractors { tracing::trace_span!(target: "indexing::documents::extract", "merger_building"); let _entered = span.enter(); let mut builder = WordDocidsMergerBuilders::new(); - for (_tokenizer, _fields_ids_map, cache) in context_pool.into_items() { + for cache in caches.into_iter() { builder.add_sorters(cache)?; } diff --git a/milli/src/update/new/extract/searchable/mod.rs b/milli/src/update/new/extract/searchable/mod.rs index b6cda3a87..f09f573e0 100644 --- a/milli/src/update/new/extract/searchable/mod.rs +++ b/milli/src/update/new/extract/searchable/mod.rs @@ -14,6 +14,7 @@ use tokenize_document::{tokenizer_builder, DocumentTokenizer}; use super::cache::CboCachedSorter; use super::DocidsExtractor; +use crate::update::new::append_only_vec::AppendOnlyVec; use crate::update::new::items_pool::ParallelIteratorExt; use crate::update::new::{DocumentChange, ItemsPool}; use crate::update::{create_sorter, GrenadParameters, MergeDeladdCboRoaringBitmaps}; @@ -57,44 +58,39 @@ pub trait SearchableExtractor { localized_attributes_rules: &localized_attributes_rules, max_positions_per_attributes: MAX_POSITION_PER_ATTRIBUTE, }; - - let context_pool = ItemsPool::new(|| { - Ok(( - &document_tokenizer, - fields_ids_map.clone(), - CboCachedSorter::new( - // TODO use a better value - 1_000_000.try_into().unwrap(), - create_sorter( - grenad::SortAlgorithm::Stable, - MergeDeladdCboRoaringBitmaps, - indexer.chunk_compression_type, - indexer.chunk_compression_level, - indexer.max_nb_chunks, - max_memory, - ), - ), - )) - }); + let caches = AppendOnlyVec::new(); { let span = tracing::trace_span!(target: "indexing::documents::extract", "docids_extraction"); let _entered = span.enter(); document_changes.into_par_iter().try_arc_for_each_try_init( - || index.read_txn().map_err(Error::from), - |rtxn, document_change| { - context_pool.with(|(document_tokenizer, fields_ids_map, cached_sorter)| { - Self::extract_document_change( - rtxn, - index, - document_tokenizer, - fields_ids_map, - cached_sorter, - document_change?, - ) - .map_err(Arc::new) - }) + || { + let rtxn = index.read_txn().map_err(Error::from)?; + let cache = caches.push(CboCachedSorter::new( + // TODO use a better value + 1_000_000.try_into().unwrap(), + create_sorter( + grenad::SortAlgorithm::Stable, + MergeDeladdCboRoaringBitmaps, + indexer.chunk_compression_type, + indexer.chunk_compression_level, + indexer.max_nb_chunks, + max_memory, + ), + )); + Ok((rtxn, &document_tokenizer, fields_ids_map.clone(), cache)) + }, + |(rtxn, document_tokenizer, fields_ids_map, cached_sorter), document_change| { + Self::extract_document_change( + rtxn, + index, + document_tokenizer, + fields_ids_map, + cached_sorter, + document_change?, + ) + .map_err(Arc::new) }, )?; } @@ -104,14 +100,15 @@ pub trait SearchableExtractor { tracing::trace_span!(target: "indexing::documents::extract", "merger_building"); let _entered = span.enter(); - let readers: Vec<_> = context_pool - .into_items() + let readers: Vec<_> = caches + .into_iter() .par_bridge() - .map(|(_tokenizer, _fields_ids_map, cached_sorter)| { + .map(|cached_sorter| { let sorter = cached_sorter.into_sorter()?; sorter.into_reader_cursors() }) .collect(); + for reader in readers { builder.extend(reader?); }