diff --git a/Cargo.lock b/Cargo.lock index 06bd9c234..335445956 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3598,6 +3598,7 @@ dependencies = [ "smartstring", "tempfile", "thiserror", + "thread_local", "tiktoken-rs", "time", "tokenizers", @@ -5332,9 +5333,9 @@ dependencies = [ [[package]] name = "thread_local" -version = "1.1.7" +version = "1.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3fdd6f064ccff2d6567adcb3873ca630700f00b5ad3f060c25b5dcfd9a4ce152" +checksum = "8b9ef9bad013ada3808854ceac7b46812a6465ba368859a37e2100283d2d719c" dependencies = [ "cfg-if", "once_cell", diff --git a/milli/Cargo.toml b/milli/Cargo.toml index bae3dd64b..72f3daa4e 100644 --- a/milli/Cargo.toml +++ b/milli/Cargo.toml @@ -89,6 +89,7 @@ ureq = { version = "2.10.0", features = ["json"] } url = "2.5.2" rayon-par-bridge = "0.1.0" hashbrown = "0.14.5" +thread_local = "1.1.8" [dev-dependencies] mimalloc = { version = "0.1.43", default-features = false } diff --git a/milli/src/update/new/append_only_linked_list.rs b/milli/src/update/new/append_only_linked_list.rs deleted file mode 100644 index 274d3eea4..000000000 --- a/milli/src/update/new/append_only_linked_list.rs +++ /dev/null @@ -1,158 +0,0 @@ -use std::sync::atomic::AtomicPtr; -use std::{fmt, mem}; - -/// An append-only linked-list that returns a mutable references to the pushed items. -pub struct AppendOnlyLinkedList { - head: AtomicPtr>, -} - -struct Node { - item: T, - next: AtomicPtr>, -} - -impl AppendOnlyLinkedList { - /// Creates an empty list. - pub fn new() -> AppendOnlyLinkedList { - AppendOnlyLinkedList { head: AtomicPtr::default() } - } - - /// Pushes the item at the front of the linked-list and returns a unique and mutable reference to it. - #[allow(clippy::mut_from_ref)] // the mut ref is derived from T and unique each time - pub fn push(&self, item: T) -> &mut T { - use std::sync::atomic::Ordering::{Relaxed, SeqCst}; - - let node = Box::leak(Box::new(Node { item, next: AtomicPtr::default() })); - let mut head = self.head.load(SeqCst); - - loop { - std::hint::spin_loop(); - node.next = AtomicPtr::new(head); - match self.head.compare_exchange_weak(head, node, SeqCst, Relaxed) { - Ok(_) => break, - Err(new) => head = new, - } - } - - &mut node.item - } -} - -impl Default for AppendOnlyLinkedList { - fn default() -> Self { - Self::new() - } -} - -impl Drop for AppendOnlyLinkedList { - fn drop(&mut self) { - // Let's use the drop implementation of the IntoIter struct - IntoIter(mem::take(&mut self.head)); - } -} - -impl fmt::Debug for AppendOnlyLinkedList { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("AppendOnlyLinkedList").finish() - } -} - -impl IntoIterator for AppendOnlyLinkedList { - type Item = T; - type IntoIter = IntoIter; - - fn into_iter(mut self) -> Self::IntoIter { - IntoIter(mem::take(&mut self.head)) - } -} - -pub struct IntoIter(AtomicPtr>); - -impl Iterator for IntoIter { - type Item = T; - - fn next(&mut self) -> Option { - let ptr = *self.0.get_mut(); - if ptr.is_null() { - None - } else { - let node = unsafe { Box::from_raw(ptr) }; - // Let's set the next node to read to be the next of this one - self.0 = node.next; - Some(node.item) - } - } -} - -impl Drop for IntoIter { - fn drop(&mut self) { - let mut ptr = *self.0.get_mut(); - while !ptr.is_null() { - let mut node = unsafe { Box::from_raw(ptr) }; - // Let's set the next node to read to be the next of this one - ptr = *node.next.get_mut(); - } - } -} - -#[test] -fn test_parallel_pushing() { - use std::sync::Arc; - let v = Arc::new(AppendOnlyLinkedList::::new()); - let mut threads = Vec::new(); - const N: u64 = 100; - for thread_num in 0..N { - let v = v.clone(); - threads.push(std::thread::spawn(move || { - let which1 = v.push(thread_num); - let which2 = v.push(thread_num); - assert_eq!(*which1, thread_num); - assert_eq!(*which2, thread_num); - })); - } - for t in threads { - t.join().unwrap(); - } - let v = Arc::into_inner(v).unwrap().into_iter().collect::>(); - for thread_num in (0..N).rev() { - assert_eq!(2, v.iter().copied().filter(|&x| x == thread_num).count()); - } -} - -#[test] -fn test_into_vec() { - struct SafeToDrop(bool); - - impl Drop for SafeToDrop { - fn drop(&mut self) { - assert!(self.0); - } - } - - let v = AppendOnlyLinkedList::new(); - - for _ in 0..50 { - v.push(SafeToDrop(false)); - } - - let mut v = v.into_iter().collect::>(); - assert_eq!(v.len(), 50); - - for i in v.iter_mut() { - i.0 = true; - } -} - -#[test] -fn test_push_then_index_mut() { - let v = AppendOnlyLinkedList::::new(); - let mut w = Vec::new(); - for i in 0..1024 { - *v.push(i) += 1; - w.push(i + 1); - } - - let mut v = v.into_iter().collect::>(); - v.reverse(); - assert_eq!(v, w); -} diff --git a/milli/src/update/new/extract/faceted/extract_facets.rs b/milli/src/update/new/extract/faceted/extract_facets.rs index f4ad50bfe..8ca9a8b20 100644 --- a/milli/src/update/new/extract/faceted/extract_facets.rs +++ b/milli/src/update/new/extract/faceted/extract_facets.rs @@ -1,3 +1,4 @@ +use std::cell::RefCell; use std::collections::HashSet; use std::fmt::Debug; use std::fs::File; @@ -7,12 +8,12 @@ use grenad::{MergeFunction, Merger}; use heed::RoTxn; use rayon::iter::{IntoParallelIterator, ParallelBridge, ParallelIterator}; use serde_json::Value; +use thread_local::ThreadLocal; use super::super::cache::CboCachedSorter; use super::facet_document::extract_document_facets; use super::FacetKind; use crate::facet::value_encoding::f64_into_bytes; -use crate::update::new::append_only_linked_list::AppendOnlyLinkedList; use crate::update::new::extract::DocidsExtractor; use crate::update::new::parallel_iterator_ext::ParallelIteratorExt; use crate::update::new::DocumentChange; @@ -210,7 +211,7 @@ impl DocidsExtractor for FacetedDocidsExtractor { let attributes_to_extract = Self::attributes_to_extract(&rtxn, index)?; let attributes_to_extract: Vec<_> = attributes_to_extract.iter().map(|s| s.as_ref()).collect(); - let caches = AppendOnlyLinkedList::new(); + let thread_local = ThreadLocal::with_capacity(rayon::current_num_threads()); { let span = @@ -218,22 +219,25 @@ impl DocidsExtractor for FacetedDocidsExtractor { let _entered = span.enter(); document_changes.into_par_iter().try_arc_for_each_try_init( || { - let rtxn = index.read_txn().map_err(Error::from)?; - let cache = caches.push(CboCachedSorter::new( - // TODO use a better value - 100.try_into().unwrap(), - create_sorter( - grenad::SortAlgorithm::Stable, - MergeDeladdCboRoaringBitmaps, - indexer.chunk_compression_type, - indexer.chunk_compression_level, - indexer.max_nb_chunks, - max_memory, - ), - )); - Ok((rtxn, fields_ids_map.clone(), Vec::new(), cache)) + thread_local.get_or_try(|| { + let rtxn = index.read_txn().map_err(Error::from)?; + let cache = CboCachedSorter::new( + /// TODO use a better value + 100.try_into().unwrap(), + create_sorter( + grenad::SortAlgorithm::Stable, + MergeDeladdCboRoaringBitmaps, + indexer.chunk_compression_type, + indexer.chunk_compression_level, + indexer.max_nb_chunks, + max_memory, + ), + ); + Ok((rtxn, RefCell::new((fields_ids_map.clone(), Vec::new(), cache)))) + }) }, - |(rtxn, fields_ids_map, buffer, cached_sorter), document_change| { + |(rtxn, rc), document_change| { + let (fields_ids_map, buffer, cached_sorter) = &mut *rc.borrow_mut(); Self::extract_document_change( rtxn, index, @@ -253,10 +257,11 @@ impl DocidsExtractor for FacetedDocidsExtractor { tracing::trace_span!(target: "indexing::documents::extract", "merger_building"); let _entered = span.enter(); - let readers: Vec<_> = caches + let readers: Vec<_> = thread_local .into_iter() .par_bridge() - .map(|cached_sorter| { + .map(|(_, rc)| { + let (_, _, cached_sorter) = rc.into_inner(); let sorter = cached_sorter.into_sorter()?; sorter.into_reader_cursors() }) diff --git a/milli/src/update/new/extract/searchable/extract_word_docids.rs b/milli/src/update/new/extract/searchable/extract_word_docids.rs index 702b8f4e9..dde969614 100644 --- a/milli/src/update/new/extract/searchable/extract_word_docids.rs +++ b/milli/src/update/new/extract/searchable/extract_word_docids.rs @@ -1,3 +1,4 @@ +use std::cell::RefCell; use std::collections::HashMap; use std::fs::File; use std::num::NonZero; @@ -6,10 +7,10 @@ use std::sync::Arc; use grenad::{Merger, MergerBuilder}; use heed::RoTxn; use rayon::iter::IntoParallelIterator; +use thread_local::ThreadLocal; use super::tokenize_document::{tokenizer_builder, DocumentTokenizer}; use super::SearchableExtractor; -use crate::update::new::append_only_linked_list::AppendOnlyLinkedList; use crate::update::new::extract::cache::CboCachedSorter; use crate::update::new::extract::perm_json_p::contained_in; use crate::update::new::parallel_iterator_ext::ParallelIteratorExt; @@ -341,7 +342,7 @@ impl WordDocidsExtractors { max_positions_per_attributes: MAX_POSITION_PER_ATTRIBUTE, }; - let caches = AppendOnlyLinkedList::new(); + let thread_local = ThreadLocal::with_capacity(rayon::current_num_threads()); { let span = @@ -349,16 +350,20 @@ impl WordDocidsExtractors { let _entered = span.enter(); document_changes.into_par_iter().try_arc_for_each_try_init( || { - let rtxn = index.read_txn().map_err(Error::from)?; - let cache = caches.push(WordDocidsCachedSorters::new( - indexer, - max_memory, - // TODO use a better value - 200_000.try_into().unwrap(), - )); - Ok((rtxn, &document_tokenizer, fields_ids_map.clone(), cache)) + thread_local.get_or_try(|| { + let rtxn = index.read_txn().map_err(Error::from)?; + let fields_ids_map = fields_ids_map.clone(); + let cache = WordDocidsCachedSorters::new( + indexer, + max_memory, + // TODO use a better value + 200_000.try_into().unwrap(), + ); + Ok((rtxn, &document_tokenizer, RefCell::new((fields_ids_map, cache)))) + }) }, - |(rtxn, document_tokenizer, fields_ids_map, cached_sorter), document_change| { + |(rtxn, document_tokenizer, rc), document_change| { + let (fields_ids_map, cached_sorter) = &mut *rc.borrow_mut(); Self::extract_document_change( rtxn, index, @@ -377,7 +382,8 @@ impl WordDocidsExtractors { tracing::trace_span!(target: "indexing::documents::extract", "merger_building"); let _entered = span.enter(); let mut builder = WordDocidsMergerBuilders::new(); - for cache in caches.into_iter() { + for (_, _, rc) in thread_local.into_iter() { + let (_, cache) = rc.into_inner(); builder.add_sorters(cache)?; } diff --git a/milli/src/update/new/extract/searchable/mod.rs b/milli/src/update/new/extract/searchable/mod.rs index ba1d53f54..a261efda3 100644 --- a/milli/src/update/new/extract/searchable/mod.rs +++ b/milli/src/update/new/extract/searchable/mod.rs @@ -2,6 +2,7 @@ mod extract_word_docids; mod extract_word_pair_proximity_docids; mod tokenize_document; +use std::cell::RefCell; use std::fs::File; use std::sync::Arc; @@ -10,11 +11,11 @@ pub use extract_word_pair_proximity_docids::WordPairProximityDocidsExtractor; use grenad::Merger; use heed::RoTxn; use rayon::iter::{IntoParallelIterator, ParallelBridge, ParallelIterator}; +use thread_local::ThreadLocal; use tokenize_document::{tokenizer_builder, DocumentTokenizer}; use super::cache::CboCachedSorter; use super::DocidsExtractor; -use crate::update::new::append_only_linked_list::AppendOnlyLinkedList; use crate::update::new::parallel_iterator_ext::ParallelIteratorExt; use crate::update::new::DocumentChange; use crate::update::{create_sorter, GrenadParameters, MergeDeladdCboRoaringBitmaps}; @@ -58,7 +59,8 @@ pub trait SearchableExtractor { localized_attributes_rules: &localized_attributes_rules, max_positions_per_attributes: MAX_POSITION_PER_ATTRIBUTE, }; - let caches = AppendOnlyLinkedList::new(); + + let thread_local = ThreadLocal::with_capacity(rayon::current_num_threads()); { let span = @@ -66,22 +68,29 @@ pub trait SearchableExtractor { let _entered = span.enter(); document_changes.into_par_iter().try_arc_for_each_try_init( || { - let rtxn = index.read_txn().map_err(Error::from)?; - let cache = caches.push(CboCachedSorter::new( - // TODO use a better value - 1_000_000.try_into().unwrap(), - create_sorter( - grenad::SortAlgorithm::Stable, - MergeDeladdCboRoaringBitmaps, - indexer.chunk_compression_type, - indexer.chunk_compression_level, - indexer.max_nb_chunks, - max_memory, - ), - )); - Ok((rtxn, &document_tokenizer, fields_ids_map.clone(), cache)) + thread_local.get_or_try(|| { + let rtxn = index.read_txn().map_err(Error::from)?; + let cache = CboCachedSorter::new( + /// TODO use a better value + 1_000_000.try_into().unwrap(), + create_sorter( + grenad::SortAlgorithm::Stable, + MergeDeladdCboRoaringBitmaps, + indexer.chunk_compression_type, + indexer.chunk_compression_level, + indexer.max_nb_chunks, + max_memory, + ), + ); + Ok(( + rtxn, + &document_tokenizer, + RefCell::new((fields_ids_map.clone(), cache)), + )) + }) }, - |(rtxn, document_tokenizer, fields_ids_map, cached_sorter), document_change| { + |(rtxn, document_tokenizer, rc), document_change| { + let (fields_ids_map, cached_sorter) = &mut *rc.borrow_mut(); Self::extract_document_change( rtxn, index, @@ -100,10 +109,11 @@ pub trait SearchableExtractor { tracing::trace_span!(target: "indexing::documents::extract", "merger_building"); let _entered = span.enter(); - let readers: Vec<_> = caches + let readers: Vec<_> = thread_local .into_iter() .par_bridge() - .map(|cached_sorter| { + .map(|(_, _, rc)| { + let (_, cached_sorter) = rc.into_inner(); let sorter = cached_sorter.into_sorter()?; sorter.into_reader_cursors() }) diff --git a/milli/src/update/new/mod.rs b/milli/src/update/new/mod.rs index 862dd4dac..4a83529dc 100644 --- a/milli/src/update/new/mod.rs +++ b/milli/src/update/new/mod.rs @@ -4,7 +4,6 @@ pub use top_level_map::{CowStr, TopLevelMap}; use super::del_add::DelAdd; use crate::FieldId; -mod append_only_linked_list; mod channel; mod document_change; mod extract;