From c11b7e5c0f6d6e2e6abe26006b72ea934397d52a Mon Sep 17 00:00:00 2001 From: ManyTheFish Date: Mon, 7 Oct 2024 15:58:16 +0200 Subject: [PATCH 1/3] Reduce number of cache created by using thread_local --- Cargo.lock | 5 ++- milli/Cargo.toml | 1 + .../new/extract/faceted/extract_facets.rs | 36 ++++++++++------- .../extract/searchable/extract_word_docids.rs | 27 ++++++++----- .../src/update/new/extract/searchable/mod.rs | 40 ++++++++++++------- 5 files changed, 68 insertions(+), 41 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 06bd9c234..335445956 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3598,6 +3598,7 @@ dependencies = [ "smartstring", "tempfile", "thiserror", + "thread_local", "tiktoken-rs", "time", "tokenizers", @@ -5332,9 +5333,9 @@ dependencies = [ [[package]] name = "thread_local" -version = "1.1.7" +version = "1.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3fdd6f064ccff2d6567adcb3873ca630700f00b5ad3f060c25b5dcfd9a4ce152" +checksum = "8b9ef9bad013ada3808854ceac7b46812a6465ba368859a37e2100283d2d719c" dependencies = [ "cfg-if", "once_cell", diff --git a/milli/Cargo.toml b/milli/Cargo.toml index bae3dd64b..72f3daa4e 100644 --- a/milli/Cargo.toml +++ b/milli/Cargo.toml @@ -89,6 +89,7 @@ ureq = { version = "2.10.0", features = ["json"] } url = "2.5.2" rayon-par-bridge = "0.1.0" hashbrown = "0.14.5" +thread_local = "1.1.8" [dev-dependencies] mimalloc = { version = "0.1.43", default-features = false } diff --git a/milli/src/update/new/extract/faceted/extract_facets.rs b/milli/src/update/new/extract/faceted/extract_facets.rs index f4ad50bfe..8e8b71676 100644 --- a/milli/src/update/new/extract/faceted/extract_facets.rs +++ b/milli/src/update/new/extract/faceted/extract_facets.rs @@ -1,3 +1,4 @@ +use std::cell::RefCell; use std::collections::HashSet; use std::fmt::Debug; use std::fs::File; @@ -7,6 +8,7 @@ use grenad::{MergeFunction, Merger}; use heed::RoTxn; use rayon::iter::{IntoParallelIterator, ParallelBridge, ParallelIterator}; use serde_json::Value; +use thread_local::ThreadLocal; use super::super::cache::CboCachedSorter; use super::facet_document::extract_document_facets; @@ -216,24 +218,28 @@ impl DocidsExtractor for FacetedDocidsExtractor { let span = tracing::trace_span!(target: "indexing::documents::extract", "docids_extraction"); let _entered = span.enter(); + let local = ThreadLocal::new(); document_changes.into_par_iter().try_arc_for_each_try_init( || { - let rtxn = index.read_txn().map_err(Error::from)?; - let cache = caches.push(CboCachedSorter::new( - // TODO use a better value - 100.try_into().unwrap(), - create_sorter( - grenad::SortAlgorithm::Stable, - MergeDeladdCboRoaringBitmaps, - indexer.chunk_compression_type, - indexer.chunk_compression_level, - indexer.max_nb_chunks, - max_memory, - ), - )); - Ok((rtxn, fields_ids_map.clone(), Vec::new(), cache)) + local.get_or_try(|| { + let rtxn = index.read_txn().map_err(Error::from)?; + let cache = caches.push(CboCachedSorter::new( + /// TODO use a better value + 100.try_into().unwrap(), + create_sorter( + grenad::SortAlgorithm::Stable, + MergeDeladdCboRoaringBitmaps, + indexer.chunk_compression_type, + indexer.chunk_compression_level, + indexer.max_nb_chunks, + max_memory, + ), + )); + Ok((rtxn, RefCell::new((fields_ids_map.clone(), Vec::new(), cache)))) + }) }, - |(rtxn, fields_ids_map, buffer, cached_sorter), document_change| { + |(rtxn, rc), document_change| { + let (fields_ids_map, buffer, cached_sorter) = &mut *rc.borrow_mut(); Self::extract_document_change( rtxn, index, diff --git a/milli/src/update/new/extract/searchable/extract_word_docids.rs b/milli/src/update/new/extract/searchable/extract_word_docids.rs index 702b8f4e9..df8409618 100644 --- a/milli/src/update/new/extract/searchable/extract_word_docids.rs +++ b/milli/src/update/new/extract/searchable/extract_word_docids.rs @@ -1,3 +1,4 @@ +use std::cell::RefCell; use std::collections::HashMap; use std::fs::File; use std::num::NonZero; @@ -6,6 +7,7 @@ use std::sync::Arc; use grenad::{Merger, MergerBuilder}; use heed::RoTxn; use rayon::iter::IntoParallelIterator; +use thread_local::ThreadLocal; use super::tokenize_document::{tokenizer_builder, DocumentTokenizer}; use super::SearchableExtractor; @@ -347,18 +349,23 @@ impl WordDocidsExtractors { let span = tracing::trace_span!(target: "indexing::documents::extract", "docids_extraction"); let _entered = span.enter(); + let local = ThreadLocal::new(); document_changes.into_par_iter().try_arc_for_each_try_init( || { - let rtxn = index.read_txn().map_err(Error::from)?; - let cache = caches.push(WordDocidsCachedSorters::new( - indexer, - max_memory, - // TODO use a better value - 200_000.try_into().unwrap(), - )); - Ok((rtxn, &document_tokenizer, fields_ids_map.clone(), cache)) + local.get_or_try(|| { + let rtxn = index.read_txn().map_err(Error::from)?; + let fields_ids_map = fields_ids_map.clone(); + let cache = caches.push(WordDocidsCachedSorters::new( + indexer, + max_memory, + // TODO use a better value + 200_000.try_into().unwrap(), + )); + Ok((rtxn, &document_tokenizer, RefCell::new((fields_ids_map, cache)))) + }) }, - |(rtxn, document_tokenizer, fields_ids_map, cached_sorter), document_change| { + |(rtxn, document_tokenizer, rc), document_change| { + let (fields_ids_map, cached_sorter) = &mut *rc.borrow_mut(); Self::extract_document_change( rtxn, index, @@ -377,7 +384,9 @@ impl WordDocidsExtractors { tracing::trace_span!(target: "indexing::documents::extract", "merger_building"); let _entered = span.enter(); let mut builder = WordDocidsMergerBuilders::new(); + let mut count = 0; for cache in caches.into_iter() { + count += 1; builder.add_sorters(cache)?; } diff --git a/milli/src/update/new/extract/searchable/mod.rs b/milli/src/update/new/extract/searchable/mod.rs index ba1d53f54..272bff4d3 100644 --- a/milli/src/update/new/extract/searchable/mod.rs +++ b/milli/src/update/new/extract/searchable/mod.rs @@ -2,6 +2,7 @@ mod extract_word_docids; mod extract_word_pair_proximity_docids; mod tokenize_document; +use std::cell::RefCell; use std::fs::File; use std::sync::Arc; @@ -10,6 +11,7 @@ pub use extract_word_pair_proximity_docids::WordPairProximityDocidsExtractor; use grenad::Merger; use heed::RoTxn; use rayon::iter::{IntoParallelIterator, ParallelBridge, ParallelIterator}; +use thread_local::ThreadLocal; use tokenize_document::{tokenizer_builder, DocumentTokenizer}; use super::cache::CboCachedSorter; @@ -64,24 +66,32 @@ pub trait SearchableExtractor { let span = tracing::trace_span!(target: "indexing::documents::extract", "docids_extraction"); let _entered = span.enter(); + let local = ThreadLocal::new(); document_changes.into_par_iter().try_arc_for_each_try_init( || { - let rtxn = index.read_txn().map_err(Error::from)?; - let cache = caches.push(CboCachedSorter::new( - // TODO use a better value - 1_000_000.try_into().unwrap(), - create_sorter( - grenad::SortAlgorithm::Stable, - MergeDeladdCboRoaringBitmaps, - indexer.chunk_compression_type, - indexer.chunk_compression_level, - indexer.max_nb_chunks, - max_memory, - ), - )); - Ok((rtxn, &document_tokenizer, fields_ids_map.clone(), cache)) + local.get_or_try(|| { + let rtxn = index.read_txn().map_err(Error::from)?; + let cache = caches.push(CboCachedSorter::new( + /// TODO use a better value + 1_000_000.try_into().unwrap(), + create_sorter( + grenad::SortAlgorithm::Stable, + MergeDeladdCboRoaringBitmaps, + indexer.chunk_compression_type, + indexer.chunk_compression_level, + indexer.max_nb_chunks, + max_memory, + ), + )); + Ok(( + rtxn, + &document_tokenizer, + RefCell::new((fields_ids_map.clone(), cache)), + )) + }) }, - |(rtxn, document_tokenizer, fields_ids_map, cached_sorter), document_change| { + |(rtxn, document_tokenizer, rc), document_change| { + let (fields_ids_map, cached_sorter) = &mut *rc.borrow_mut(); Self::extract_document_change( rtxn, index, From 83c09d0db0f418ad84c69c9a6ae582452708cdc4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Mon, 7 Oct 2024 16:38:45 +0200 Subject: [PATCH 2/3] Remove the now, useless AppendOnlyVec library --- .../src/update/new/append_only_linked_list.rs | 158 ------------------ .../new/extract/faceted/extract_facets.rs | 15 +- .../extract/searchable/extract_word_docids.rs | 15 +- .../src/update/new/extract/searchable/mod.rs | 16 +- milli/src/update/new/mod.rs | 1 - 5 files changed, 21 insertions(+), 184 deletions(-) delete mode 100644 milli/src/update/new/append_only_linked_list.rs diff --git a/milli/src/update/new/append_only_linked_list.rs b/milli/src/update/new/append_only_linked_list.rs deleted file mode 100644 index 274d3eea4..000000000 --- a/milli/src/update/new/append_only_linked_list.rs +++ /dev/null @@ -1,158 +0,0 @@ -use std::sync::atomic::AtomicPtr; -use std::{fmt, mem}; - -/// An append-only linked-list that returns a mutable references to the pushed items. -pub struct AppendOnlyLinkedList { - head: AtomicPtr>, -} - -struct Node { - item: T, - next: AtomicPtr>, -} - -impl AppendOnlyLinkedList { - /// Creates an empty list. - pub fn new() -> AppendOnlyLinkedList { - AppendOnlyLinkedList { head: AtomicPtr::default() } - } - - /// Pushes the item at the front of the linked-list and returns a unique and mutable reference to it. - #[allow(clippy::mut_from_ref)] // the mut ref is derived from T and unique each time - pub fn push(&self, item: T) -> &mut T { - use std::sync::atomic::Ordering::{Relaxed, SeqCst}; - - let node = Box::leak(Box::new(Node { item, next: AtomicPtr::default() })); - let mut head = self.head.load(SeqCst); - - loop { - std::hint::spin_loop(); - node.next = AtomicPtr::new(head); - match self.head.compare_exchange_weak(head, node, SeqCst, Relaxed) { - Ok(_) => break, - Err(new) => head = new, - } - } - - &mut node.item - } -} - -impl Default for AppendOnlyLinkedList { - fn default() -> Self { - Self::new() - } -} - -impl Drop for AppendOnlyLinkedList { - fn drop(&mut self) { - // Let's use the drop implementation of the IntoIter struct - IntoIter(mem::take(&mut self.head)); - } -} - -impl fmt::Debug for AppendOnlyLinkedList { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("AppendOnlyLinkedList").finish() - } -} - -impl IntoIterator for AppendOnlyLinkedList { - type Item = T; - type IntoIter = IntoIter; - - fn into_iter(mut self) -> Self::IntoIter { - IntoIter(mem::take(&mut self.head)) - } -} - -pub struct IntoIter(AtomicPtr>); - -impl Iterator for IntoIter { - type Item = T; - - fn next(&mut self) -> Option { - let ptr = *self.0.get_mut(); - if ptr.is_null() { - None - } else { - let node = unsafe { Box::from_raw(ptr) }; - // Let's set the next node to read to be the next of this one - self.0 = node.next; - Some(node.item) - } - } -} - -impl Drop for IntoIter { - fn drop(&mut self) { - let mut ptr = *self.0.get_mut(); - while !ptr.is_null() { - let mut node = unsafe { Box::from_raw(ptr) }; - // Let's set the next node to read to be the next of this one - ptr = *node.next.get_mut(); - } - } -} - -#[test] -fn test_parallel_pushing() { - use std::sync::Arc; - let v = Arc::new(AppendOnlyLinkedList::::new()); - let mut threads = Vec::new(); - const N: u64 = 100; - for thread_num in 0..N { - let v = v.clone(); - threads.push(std::thread::spawn(move || { - let which1 = v.push(thread_num); - let which2 = v.push(thread_num); - assert_eq!(*which1, thread_num); - assert_eq!(*which2, thread_num); - })); - } - for t in threads { - t.join().unwrap(); - } - let v = Arc::into_inner(v).unwrap().into_iter().collect::>(); - for thread_num in (0..N).rev() { - assert_eq!(2, v.iter().copied().filter(|&x| x == thread_num).count()); - } -} - -#[test] -fn test_into_vec() { - struct SafeToDrop(bool); - - impl Drop for SafeToDrop { - fn drop(&mut self) { - assert!(self.0); - } - } - - let v = AppendOnlyLinkedList::new(); - - for _ in 0..50 { - v.push(SafeToDrop(false)); - } - - let mut v = v.into_iter().collect::>(); - assert_eq!(v.len(), 50); - - for i in v.iter_mut() { - i.0 = true; - } -} - -#[test] -fn test_push_then_index_mut() { - let v = AppendOnlyLinkedList::::new(); - let mut w = Vec::new(); - for i in 0..1024 { - *v.push(i) += 1; - w.push(i + 1); - } - - let mut v = v.into_iter().collect::>(); - v.reverse(); - assert_eq!(v, w); -} diff --git a/milli/src/update/new/extract/faceted/extract_facets.rs b/milli/src/update/new/extract/faceted/extract_facets.rs index 8e8b71676..e6c3b02e6 100644 --- a/milli/src/update/new/extract/faceted/extract_facets.rs +++ b/milli/src/update/new/extract/faceted/extract_facets.rs @@ -14,7 +14,6 @@ use super::super::cache::CboCachedSorter; use super::facet_document::extract_document_facets; use super::FacetKind; use crate::facet::value_encoding::f64_into_bytes; -use crate::update::new::append_only_linked_list::AppendOnlyLinkedList; use crate::update::new::extract::DocidsExtractor; use crate::update::new::parallel_iterator_ext::ParallelIteratorExt; use crate::update::new::DocumentChange; @@ -212,18 +211,17 @@ impl DocidsExtractor for FacetedDocidsExtractor { let attributes_to_extract = Self::attributes_to_extract(&rtxn, index)?; let attributes_to_extract: Vec<_> = attributes_to_extract.iter().map(|s| s.as_ref()).collect(); - let caches = AppendOnlyLinkedList::new(); + let thread_local = ThreadLocal::new(); { let span = tracing::trace_span!(target: "indexing::documents::extract", "docids_extraction"); let _entered = span.enter(); - let local = ThreadLocal::new(); document_changes.into_par_iter().try_arc_for_each_try_init( || { - local.get_or_try(|| { + thread_local.get_or_try(|| { let rtxn = index.read_txn().map_err(Error::from)?; - let cache = caches.push(CboCachedSorter::new( + let cache = CboCachedSorter::new( /// TODO use a better value 100.try_into().unwrap(), create_sorter( @@ -234,7 +232,7 @@ impl DocidsExtractor for FacetedDocidsExtractor { indexer.max_nb_chunks, max_memory, ), - )); + ); Ok((rtxn, RefCell::new((fields_ids_map.clone(), Vec::new(), cache)))) }) }, @@ -259,10 +257,11 @@ impl DocidsExtractor for FacetedDocidsExtractor { tracing::trace_span!(target: "indexing::documents::extract", "merger_building"); let _entered = span.enter(); - let readers: Vec<_> = caches + let readers: Vec<_> = thread_local .into_iter() .par_bridge() - .map(|cached_sorter| { + .map(|(_, rc)| { + let (_, _, cached_sorter) = rc.into_inner(); let sorter = cached_sorter.into_sorter()?; sorter.into_reader_cursors() }) diff --git a/milli/src/update/new/extract/searchable/extract_word_docids.rs b/milli/src/update/new/extract/searchable/extract_word_docids.rs index df8409618..6da793276 100644 --- a/milli/src/update/new/extract/searchable/extract_word_docids.rs +++ b/milli/src/update/new/extract/searchable/extract_word_docids.rs @@ -11,7 +11,6 @@ use thread_local::ThreadLocal; use super::tokenize_document::{tokenizer_builder, DocumentTokenizer}; use super::SearchableExtractor; -use crate::update::new::append_only_linked_list::AppendOnlyLinkedList; use crate::update::new::extract::cache::CboCachedSorter; use crate::update::new::extract::perm_json_p::contained_in; use crate::update::new::parallel_iterator_ext::ParallelIteratorExt; @@ -343,24 +342,23 @@ impl WordDocidsExtractors { max_positions_per_attributes: MAX_POSITION_PER_ATTRIBUTE, }; - let caches = AppendOnlyLinkedList::new(); + let thread_local = ThreadLocal::new(); { let span = tracing::trace_span!(target: "indexing::documents::extract", "docids_extraction"); let _entered = span.enter(); - let local = ThreadLocal::new(); document_changes.into_par_iter().try_arc_for_each_try_init( || { - local.get_or_try(|| { + thread_local.get_or_try(|| { let rtxn = index.read_txn().map_err(Error::from)?; let fields_ids_map = fields_ids_map.clone(); - let cache = caches.push(WordDocidsCachedSorters::new( + let cache = WordDocidsCachedSorters::new( indexer, max_memory, // TODO use a better value 200_000.try_into().unwrap(), - )); + ); Ok((rtxn, &document_tokenizer, RefCell::new((fields_ids_map, cache)))) }) }, @@ -384,9 +382,8 @@ impl WordDocidsExtractors { tracing::trace_span!(target: "indexing::documents::extract", "merger_building"); let _entered = span.enter(); let mut builder = WordDocidsMergerBuilders::new(); - let mut count = 0; - for cache in caches.into_iter() { - count += 1; + for (_, _, rc) in thread_local.into_iter() { + let (_, cache) = rc.into_inner(); builder.add_sorters(cache)?; } diff --git a/milli/src/update/new/extract/searchable/mod.rs b/milli/src/update/new/extract/searchable/mod.rs index 272bff4d3..25f1eda14 100644 --- a/milli/src/update/new/extract/searchable/mod.rs +++ b/milli/src/update/new/extract/searchable/mod.rs @@ -16,7 +16,6 @@ use tokenize_document::{tokenizer_builder, DocumentTokenizer}; use super::cache::CboCachedSorter; use super::DocidsExtractor; -use crate::update::new::append_only_linked_list::AppendOnlyLinkedList; use crate::update::new::parallel_iterator_ext::ParallelIteratorExt; use crate::update::new::DocumentChange; use crate::update::{create_sorter, GrenadParameters, MergeDeladdCboRoaringBitmaps}; @@ -60,18 +59,18 @@ pub trait SearchableExtractor { localized_attributes_rules: &localized_attributes_rules, max_positions_per_attributes: MAX_POSITION_PER_ATTRIBUTE, }; - let caches = AppendOnlyLinkedList::new(); + + let thread_local = ThreadLocal::new(); { let span = tracing::trace_span!(target: "indexing::documents::extract", "docids_extraction"); let _entered = span.enter(); - let local = ThreadLocal::new(); document_changes.into_par_iter().try_arc_for_each_try_init( || { - local.get_or_try(|| { + thread_local.get_or_try(|| { let rtxn = index.read_txn().map_err(Error::from)?; - let cache = caches.push(CboCachedSorter::new( + let cache = CboCachedSorter::new( /// TODO use a better value 1_000_000.try_into().unwrap(), create_sorter( @@ -82,7 +81,7 @@ pub trait SearchableExtractor { indexer.max_nb_chunks, max_memory, ), - )); + ); Ok(( rtxn, &document_tokenizer, @@ -110,10 +109,11 @@ pub trait SearchableExtractor { tracing::trace_span!(target: "indexing::documents::extract", "merger_building"); let _entered = span.enter(); - let readers: Vec<_> = caches + let readers: Vec<_> = thread_local .into_iter() .par_bridge() - .map(|cached_sorter| { + .map(|(_, _, rc)| { + let (_, cached_sorter) = rc.into_inner(); let sorter = cached_sorter.into_sorter()?; sorter.into_reader_cursors() }) diff --git a/milli/src/update/new/mod.rs b/milli/src/update/new/mod.rs index 862dd4dac..4a83529dc 100644 --- a/milli/src/update/new/mod.rs +++ b/milli/src/update/new/mod.rs @@ -4,7 +4,6 @@ pub use top_level_map::{CowStr, TopLevelMap}; use super::del_add::DelAdd; use crate::FieldId; -mod append_only_linked_list; mod channel; mod document_change; mod extract; From eb09dfed04e377bf44cafe132458004af8222158 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Mon, 7 Oct 2024 16:41:17 +0200 Subject: [PATCH 3/3] Avoid reallocation with the ThreadLocal pool --- milli/src/update/new/extract/faceted/extract_facets.rs | 2 +- milli/src/update/new/extract/searchable/extract_word_docids.rs | 2 +- milli/src/update/new/extract/searchable/mod.rs | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/milli/src/update/new/extract/faceted/extract_facets.rs b/milli/src/update/new/extract/faceted/extract_facets.rs index e6c3b02e6..8ca9a8b20 100644 --- a/milli/src/update/new/extract/faceted/extract_facets.rs +++ b/milli/src/update/new/extract/faceted/extract_facets.rs @@ -211,7 +211,7 @@ impl DocidsExtractor for FacetedDocidsExtractor { let attributes_to_extract = Self::attributes_to_extract(&rtxn, index)?; let attributes_to_extract: Vec<_> = attributes_to_extract.iter().map(|s| s.as_ref()).collect(); - let thread_local = ThreadLocal::new(); + let thread_local = ThreadLocal::with_capacity(rayon::current_num_threads()); { let span = diff --git a/milli/src/update/new/extract/searchable/extract_word_docids.rs b/milli/src/update/new/extract/searchable/extract_word_docids.rs index 6da793276..dde969614 100644 --- a/milli/src/update/new/extract/searchable/extract_word_docids.rs +++ b/milli/src/update/new/extract/searchable/extract_word_docids.rs @@ -342,7 +342,7 @@ impl WordDocidsExtractors { max_positions_per_attributes: MAX_POSITION_PER_ATTRIBUTE, }; - let thread_local = ThreadLocal::new(); + let thread_local = ThreadLocal::with_capacity(rayon::current_num_threads()); { let span = diff --git a/milli/src/update/new/extract/searchable/mod.rs b/milli/src/update/new/extract/searchable/mod.rs index 25f1eda14..a261efda3 100644 --- a/milli/src/update/new/extract/searchable/mod.rs +++ b/milli/src/update/new/extract/searchable/mod.rs @@ -60,7 +60,7 @@ pub trait SearchableExtractor { max_positions_per_attributes: MAX_POSITION_PER_ATTRIBUTE, }; - let thread_local = ThreadLocal::new(); + let thread_local = ThreadLocal::with_capacity(rayon::current_num_threads()); { let span =