diff --git a/milli/src/update/new/append_only_linked_list.rs b/milli/src/update/new/append_only_linked_list.rs deleted file mode 100644 index 274d3eea4..000000000 --- a/milli/src/update/new/append_only_linked_list.rs +++ /dev/null @@ -1,158 +0,0 @@ -use std::sync::atomic::AtomicPtr; -use std::{fmt, mem}; - -/// An append-only linked-list that returns a mutable references to the pushed items. -pub struct AppendOnlyLinkedList { - head: AtomicPtr>, -} - -struct Node { - item: T, - next: AtomicPtr>, -} - -impl AppendOnlyLinkedList { - /// Creates an empty list. - pub fn new() -> AppendOnlyLinkedList { - AppendOnlyLinkedList { head: AtomicPtr::default() } - } - - /// Pushes the item at the front of the linked-list and returns a unique and mutable reference to it. - #[allow(clippy::mut_from_ref)] // the mut ref is derived from T and unique each time - pub fn push(&self, item: T) -> &mut T { - use std::sync::atomic::Ordering::{Relaxed, SeqCst}; - - let node = Box::leak(Box::new(Node { item, next: AtomicPtr::default() })); - let mut head = self.head.load(SeqCst); - - loop { - std::hint::spin_loop(); - node.next = AtomicPtr::new(head); - match self.head.compare_exchange_weak(head, node, SeqCst, Relaxed) { - Ok(_) => break, - Err(new) => head = new, - } - } - - &mut node.item - } -} - -impl Default for AppendOnlyLinkedList { - fn default() -> Self { - Self::new() - } -} - -impl Drop for AppendOnlyLinkedList { - fn drop(&mut self) { - // Let's use the drop implementation of the IntoIter struct - IntoIter(mem::take(&mut self.head)); - } -} - -impl fmt::Debug for AppendOnlyLinkedList { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("AppendOnlyLinkedList").finish() - } -} - -impl IntoIterator for AppendOnlyLinkedList { - type Item = T; - type IntoIter = IntoIter; - - fn into_iter(mut self) -> Self::IntoIter { - IntoIter(mem::take(&mut self.head)) - } -} - -pub struct IntoIter(AtomicPtr>); - -impl Iterator for IntoIter { - type Item = T; - - fn next(&mut self) -> Option { - let ptr = *self.0.get_mut(); - if ptr.is_null() { - None - } else { - let node = unsafe { Box::from_raw(ptr) }; - // Let's set the next node to read to be the next of this one - self.0 = node.next; - Some(node.item) - } - } -} - -impl Drop for IntoIter { - fn drop(&mut self) { - let mut ptr = *self.0.get_mut(); - while !ptr.is_null() { - let mut node = unsafe { Box::from_raw(ptr) }; - // Let's set the next node to read to be the next of this one - ptr = *node.next.get_mut(); - } - } -} - -#[test] -fn test_parallel_pushing() { - use std::sync::Arc; - let v = Arc::new(AppendOnlyLinkedList::::new()); - let mut threads = Vec::new(); - const N: u64 = 100; - for thread_num in 0..N { - let v = v.clone(); - threads.push(std::thread::spawn(move || { - let which1 = v.push(thread_num); - let which2 = v.push(thread_num); - assert_eq!(*which1, thread_num); - assert_eq!(*which2, thread_num); - })); - } - for t in threads { - t.join().unwrap(); - } - let v = Arc::into_inner(v).unwrap().into_iter().collect::>(); - for thread_num in (0..N).rev() { - assert_eq!(2, v.iter().copied().filter(|&x| x == thread_num).count()); - } -} - -#[test] -fn test_into_vec() { - struct SafeToDrop(bool); - - impl Drop for SafeToDrop { - fn drop(&mut self) { - assert!(self.0); - } - } - - let v = AppendOnlyLinkedList::new(); - - for _ in 0..50 { - v.push(SafeToDrop(false)); - } - - let mut v = v.into_iter().collect::>(); - assert_eq!(v.len(), 50); - - for i in v.iter_mut() { - i.0 = true; - } -} - -#[test] -fn test_push_then_index_mut() { - let v = AppendOnlyLinkedList::::new(); - let mut w = Vec::new(); - for i in 0..1024 { - *v.push(i) += 1; - w.push(i + 1); - } - - let mut v = v.into_iter().collect::>(); - v.reverse(); - assert_eq!(v, w); -} diff --git a/milli/src/update/new/extract/faceted/extract_facets.rs b/milli/src/update/new/extract/faceted/extract_facets.rs index 8e8b71676..e6c3b02e6 100644 --- a/milli/src/update/new/extract/faceted/extract_facets.rs +++ b/milli/src/update/new/extract/faceted/extract_facets.rs @@ -14,7 +14,6 @@ use super::super::cache::CboCachedSorter; use super::facet_document::extract_document_facets; use super::FacetKind; use crate::facet::value_encoding::f64_into_bytes; -use crate::update::new::append_only_linked_list::AppendOnlyLinkedList; use crate::update::new::extract::DocidsExtractor; use crate::update::new::parallel_iterator_ext::ParallelIteratorExt; use crate::update::new::DocumentChange; @@ -212,18 +211,17 @@ impl DocidsExtractor for FacetedDocidsExtractor { let attributes_to_extract = Self::attributes_to_extract(&rtxn, index)?; let attributes_to_extract: Vec<_> = attributes_to_extract.iter().map(|s| s.as_ref()).collect(); - let caches = AppendOnlyLinkedList::new(); + let thread_local = ThreadLocal::new(); { let span = tracing::trace_span!(target: "indexing::documents::extract", "docids_extraction"); let _entered = span.enter(); - let local = ThreadLocal::new(); document_changes.into_par_iter().try_arc_for_each_try_init( || { - local.get_or_try(|| { + thread_local.get_or_try(|| { let rtxn = index.read_txn().map_err(Error::from)?; - let cache = caches.push(CboCachedSorter::new( + let cache = CboCachedSorter::new( /// TODO use a better value 100.try_into().unwrap(), create_sorter( @@ -234,7 +232,7 @@ impl DocidsExtractor for FacetedDocidsExtractor { indexer.max_nb_chunks, max_memory, ), - )); + ); Ok((rtxn, RefCell::new((fields_ids_map.clone(), Vec::new(), cache)))) }) }, @@ -259,10 +257,11 @@ impl DocidsExtractor for FacetedDocidsExtractor { tracing::trace_span!(target: "indexing::documents::extract", "merger_building"); let _entered = span.enter(); - let readers: Vec<_> = caches + let readers: Vec<_> = thread_local .into_iter() .par_bridge() - .map(|cached_sorter| { + .map(|(_, rc)| { + let (_, _, cached_sorter) = rc.into_inner(); let sorter = cached_sorter.into_sorter()?; sorter.into_reader_cursors() }) diff --git a/milli/src/update/new/extract/searchable/extract_word_docids.rs b/milli/src/update/new/extract/searchable/extract_word_docids.rs index df8409618..6da793276 100644 --- a/milli/src/update/new/extract/searchable/extract_word_docids.rs +++ b/milli/src/update/new/extract/searchable/extract_word_docids.rs @@ -11,7 +11,6 @@ use thread_local::ThreadLocal; use super::tokenize_document::{tokenizer_builder, DocumentTokenizer}; use super::SearchableExtractor; -use crate::update::new::append_only_linked_list::AppendOnlyLinkedList; use crate::update::new::extract::cache::CboCachedSorter; use crate::update::new::extract::perm_json_p::contained_in; use crate::update::new::parallel_iterator_ext::ParallelIteratorExt; @@ -343,24 +342,23 @@ impl WordDocidsExtractors { max_positions_per_attributes: MAX_POSITION_PER_ATTRIBUTE, }; - let caches = AppendOnlyLinkedList::new(); + let thread_local = ThreadLocal::new(); { let span = tracing::trace_span!(target: "indexing::documents::extract", "docids_extraction"); let _entered = span.enter(); - let local = ThreadLocal::new(); document_changes.into_par_iter().try_arc_for_each_try_init( || { - local.get_or_try(|| { + thread_local.get_or_try(|| { let rtxn = index.read_txn().map_err(Error::from)?; let fields_ids_map = fields_ids_map.clone(); - let cache = caches.push(WordDocidsCachedSorters::new( + let cache = WordDocidsCachedSorters::new( indexer, max_memory, // TODO use a better value 200_000.try_into().unwrap(), - )); + ); Ok((rtxn, &document_tokenizer, RefCell::new((fields_ids_map, cache)))) }) }, @@ -384,9 +382,8 @@ impl WordDocidsExtractors { tracing::trace_span!(target: "indexing::documents::extract", "merger_building"); let _entered = span.enter(); let mut builder = WordDocidsMergerBuilders::new(); - let mut count = 0; - for cache in caches.into_iter() { - count += 1; + for (_, _, rc) in thread_local.into_iter() { + let (_, cache) = rc.into_inner(); builder.add_sorters(cache)?; } diff --git a/milli/src/update/new/extract/searchable/mod.rs b/milli/src/update/new/extract/searchable/mod.rs index 272bff4d3..25f1eda14 100644 --- a/milli/src/update/new/extract/searchable/mod.rs +++ b/milli/src/update/new/extract/searchable/mod.rs @@ -16,7 +16,6 @@ use tokenize_document::{tokenizer_builder, DocumentTokenizer}; use super::cache::CboCachedSorter; use super::DocidsExtractor; -use crate::update::new::append_only_linked_list::AppendOnlyLinkedList; use crate::update::new::parallel_iterator_ext::ParallelIteratorExt; use crate::update::new::DocumentChange; use crate::update::{create_sorter, GrenadParameters, MergeDeladdCboRoaringBitmaps}; @@ -60,18 +59,18 @@ pub trait SearchableExtractor { localized_attributes_rules: &localized_attributes_rules, max_positions_per_attributes: MAX_POSITION_PER_ATTRIBUTE, }; - let caches = AppendOnlyLinkedList::new(); + + let thread_local = ThreadLocal::new(); { let span = tracing::trace_span!(target: "indexing::documents::extract", "docids_extraction"); let _entered = span.enter(); - let local = ThreadLocal::new(); document_changes.into_par_iter().try_arc_for_each_try_init( || { - local.get_or_try(|| { + thread_local.get_or_try(|| { let rtxn = index.read_txn().map_err(Error::from)?; - let cache = caches.push(CboCachedSorter::new( + let cache = CboCachedSorter::new( /// TODO use a better value 1_000_000.try_into().unwrap(), create_sorter( @@ -82,7 +81,7 @@ pub trait SearchableExtractor { indexer.max_nb_chunks, max_memory, ), - )); + ); Ok(( rtxn, &document_tokenizer, @@ -110,10 +109,11 @@ pub trait SearchableExtractor { tracing::trace_span!(target: "indexing::documents::extract", "merger_building"); let _entered = span.enter(); - let readers: Vec<_> = caches + let readers: Vec<_> = thread_local .into_iter() .par_bridge() - .map(|cached_sorter| { + .map(|(_, _, rc)| { + let (_, cached_sorter) = rc.into_inner(); let sorter = cached_sorter.into_sorter()?; sorter.into_reader_cursors() }) diff --git a/milli/src/update/new/mod.rs b/milli/src/update/new/mod.rs index 862dd4dac..4a83529dc 100644 --- a/milli/src/update/new/mod.rs +++ b/milli/src/update/new/mod.rs @@ -4,7 +4,6 @@ pub use top_level_map::{CowStr, TopLevelMap}; use super::del_add::DelAdd; use crate::FieldId; -mod append_only_linked_list; mod channel; mod document_change; mod extract;