diff --git a/milli/src/update/new/append_only_linked_list.rs b/milli/src/update/new/append_only_linked_list.rs new file mode 100644 index 000000000..274d3eea4 --- /dev/null +++ b/milli/src/update/new/append_only_linked_list.rs @@ -0,0 +1,158 @@ +use std::sync::atomic::AtomicPtr; +use std::{fmt, mem}; + +/// An append-only linked-list that returns a mutable references to the pushed items. +pub struct AppendOnlyLinkedList { + head: AtomicPtr>, +} + +struct Node { + item: T, + next: AtomicPtr>, +} + +impl AppendOnlyLinkedList { + /// Creates an empty list. + pub fn new() -> AppendOnlyLinkedList { + AppendOnlyLinkedList { head: AtomicPtr::default() } + } + + /// Pushes the item at the front of the linked-list and returns a unique and mutable reference to it. + #[allow(clippy::mut_from_ref)] // the mut ref is derived from T and unique each time + pub fn push(&self, item: T) -> &mut T { + use std::sync::atomic::Ordering::{Relaxed, SeqCst}; + + let node = Box::leak(Box::new(Node { item, next: AtomicPtr::default() })); + let mut head = self.head.load(SeqCst); + + loop { + std::hint::spin_loop(); + node.next = AtomicPtr::new(head); + match self.head.compare_exchange_weak(head, node, SeqCst, Relaxed) { + Ok(_) => break, + Err(new) => head = new, + } + } + + &mut node.item + } +} + +impl Default for AppendOnlyLinkedList { + fn default() -> Self { + Self::new() + } +} + +impl Drop for AppendOnlyLinkedList { + fn drop(&mut self) { + // Let's use the drop implementation of the IntoIter struct + IntoIter(mem::take(&mut self.head)); + } +} + +impl fmt::Debug for AppendOnlyLinkedList { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("AppendOnlyLinkedList").finish() + } +} + +impl IntoIterator for AppendOnlyLinkedList { + type Item = T; + type IntoIter = IntoIter; + + fn into_iter(mut self) -> Self::IntoIter { + IntoIter(mem::take(&mut self.head)) + } +} + +pub struct IntoIter(AtomicPtr>); + +impl Iterator for IntoIter { + type Item = T; + + fn next(&mut self) -> Option { + let ptr = *self.0.get_mut(); + if ptr.is_null() { + None + } else { + let node = unsafe { Box::from_raw(ptr) }; + // Let's set the next node to read to be the next of this one + self.0 = node.next; + Some(node.item) + } + } +} + +impl Drop for IntoIter { + fn drop(&mut self) { + let mut ptr = *self.0.get_mut(); + while !ptr.is_null() { + let mut node = unsafe { Box::from_raw(ptr) }; + // Let's set the next node to read to be the next of this one + ptr = *node.next.get_mut(); + } + } +} + +#[test] +fn test_parallel_pushing() { + use std::sync::Arc; + let v = Arc::new(AppendOnlyLinkedList::::new()); + let mut threads = Vec::new(); + const N: u64 = 100; + for thread_num in 0..N { + let v = v.clone(); + threads.push(std::thread::spawn(move || { + let which1 = v.push(thread_num); + let which2 = v.push(thread_num); + assert_eq!(*which1, thread_num); + assert_eq!(*which2, thread_num); + })); + } + for t in threads { + t.join().unwrap(); + } + let v = Arc::into_inner(v).unwrap().into_iter().collect::>(); + for thread_num in (0..N).rev() { + assert_eq!(2, v.iter().copied().filter(|&x| x == thread_num).count()); + } +} + +#[test] +fn test_into_vec() { + struct SafeToDrop(bool); + + impl Drop for SafeToDrop { + fn drop(&mut self) { + assert!(self.0); + } + } + + let v = AppendOnlyLinkedList::new(); + + for _ in 0..50 { + v.push(SafeToDrop(false)); + } + + let mut v = v.into_iter().collect::>(); + assert_eq!(v.len(), 50); + + for i in v.iter_mut() { + i.0 = true; + } +} + +#[test] +fn test_push_then_index_mut() { + let v = AppendOnlyLinkedList::::new(); + let mut w = Vec::new(); + for i in 0..1024 { + *v.push(i) += 1; + w.push(i + 1); + } + + let mut v = v.into_iter().collect::>(); + v.reverse(); + assert_eq!(v, w); +} diff --git a/milli/src/update/new/append_only_vec.rs b/milli/src/update/new/append_only_vec.rs deleted file mode 100644 index d4a30c1b1..000000000 --- a/milli/src/update/new/append_only_vec.rs +++ /dev/null @@ -1,327 +0,0 @@ -// Code taken from -// and modified in order to get a ref mut instead of the index of newly inserted items. - -//! AppendOnlyVec -//! -//! This is a pretty simple type, which is a vector that you can push into and -//! receive a reference to the item you just inserted. The data structure never -//! moves an element once allocated, so you can push to the vec even while holding -//! mutable references to elements that have already been pushed. -//! -//! ### Scaling -//! -//! 1. Accessing an element is O(1), but slightly more expensive than for a -//! standard `Vec`. -//! -//! 2. Pushing a new element amortizes to O(1), but may require allocation of a -//! new chunk. -//! -//! ### Example -//! -//! ``` -//! use append_only_vec::AppendOnlyVec; -//! -//! static V: AppendOnlyVec = AppendOnlyVec::::new(); -//! let mut threads = Vec::new(); -//! for thread_num in 0..10 { -//! threads.push(std::thread::spawn(move || { -//! for n in 0..100 { -//! let s = format!("thread {} says {}", thread_num, n); -//! let which = V.push(s.clone()); -//! assert_eq!(&which, &s); -//! } -//! })); -//! } -//! -//! for t in threads { -//! t.join(); -//! } -//! -//! assert_eq!(V.len(), 1000); -//! ``` - -use std::cell::UnsafeCell; -use std::fmt::Debug; -use std::ptr; -use std::sync::atomic::{AtomicUsize, Ordering}; - -pub struct AppendOnlyVec { - count: AtomicUsize, - _reserved: AtomicUsize, - data: [UnsafeCell<*mut T>; BITS_USED - 1 - 3], -} - -unsafe impl Send for AppendOnlyVec {} -unsafe impl Sync for AppendOnlyVec {} - -const BITS: usize = std::mem::size_of::() * 8; - -#[cfg(target_arch = "x86_64")] -const BITS_USED: usize = 48; -#[cfg(all(not(target_arch = "x86_64"), target_pointer_width = "64"))] -const BITS_USED: usize = 64; -#[cfg(target_pointer_width = "32")] -const BITS_USED: usize = 32; - -// This takes an index into a vec, and determines which data array will hold it -// (the first return value), and what the index will be into that data array -// (second return value) -// -// The ith data array holds 1< (u32, usize) { - let i = i + 8; - let bin = BITS as u32 - 1 - i.leading_zeros(); - let bin = bin - 3; - let offset = i - bin_size(bin); - (bin, offset) -} - -const fn bin_size(array: u32) -> usize { - (1 << 3) << array -} - -#[test] -fn test_indices() { - for i in 0..32 { - println!("{:3}: {} {}", i, indices(i).0, indices(i).1); - } - let mut array = 0; - let mut offset = 0; - let mut index = 0; - while index < 1000 { - index += 1; - offset += 1; - if offset >= bin_size(array) { - offset = 0; - array += 1; - } - assert_eq!(indices(index), (array, offset)); - } -} - -impl AppendOnlyVec { - const EMPTY: UnsafeCell<*mut T> = UnsafeCell::new(ptr::null_mut()); - - /// Allocate a new empty array. - pub const fn new() -> Self { - AppendOnlyVec { - count: AtomicUsize::new(0), - _reserved: AtomicUsize::new(0), - data: [Self::EMPTY; BITS_USED - 1 - 3], - } - } - - /// Find the length of the array. - #[inline] - pub fn len(&self) -> usize { - self.count.load(Ordering::Acquire) - } - - fn layout(array: u32) -> std::alloc::Layout { - std::alloc::Layout::array::(bin_size(array)).unwrap() - } - - /// Append an element to the array and get a mutable ref to it. - /// - /// This is notable in that it doesn't require a `&mut self`, because it - /// does appropriate atomic synchronization. - pub fn push(&self, val: T) -> &mut T { - let idx = self._reserved.fetch_add(1, Ordering::Relaxed); - let (array, offset) = indices(idx); - let ptr = if self.len() < 1 + idx - offset { - // We are working on a new array, which may not have been allocated... - if offset == 0 { - // It is our job to allocate the array! The size of the array - // is determined in the self.layout method, which needs to be - // consistent with the indices function. - let layout = Self::layout(array); - let ptr = unsafe { std::alloc::alloc(layout) } as *mut T; - unsafe { - *self.data[array as usize].get() = ptr; - } - ptr - } else { - // We need to wait for the array to be allocated. - while self.len() < 1 + idx - offset { - std::hint::spin_loop(); - } - // The Ordering::Acquire semantics of self.len() ensures that - // this pointer read will get the non-null pointer allocated - // above. - unsafe { *self.data[array as usize].get() } - } - } else { - // The Ordering::Acquire semantics of self.len() ensures that - // this pointer read will get the non-null pointer allocated - // above. - unsafe { *self.data[array as usize].get() } - }; - - // The contents of this offset are guaranteed to be unused (so far) - // because we got the idx from our fetch_add above, and ptr is - // guaranteed to be valid because of the loop we used above, which used - // self.len() which has Ordering::Acquire semantics. - unsafe { (ptr.add(offset)).write(val) }; - - // Now we need to increase the size of the vec, so it can get read. We - // use Release upon success, to ensure that the value which we wrote is - // visible to any thread that has confirmed that the count is big enough - // to read that element. In case of failure, we can be relaxed, since - // we don't do anything with the result other than try again. - while self - .count - .compare_exchange(idx, idx + 1, Ordering::Release, Ordering::Relaxed) - .is_err() - { - // This means that someone else *started* pushing before we started, - // but hasn't yet finished. We have to wait for them to finish - // pushing before we can update the count. Note that using a - // spinloop here isn't really ideal, but except when allocating a - // new array, the window between reserving space and using it is - // pretty small, so contention will hopefully be rare, and having a - // context switch during that interval will hopefully be vanishingly - // unlikely. - std::hint::spin_loop(); - } - - unsafe { &mut *ptr } - } - - /// Convert into a standard `Vec`. - pub fn into_vec(self) -> Vec { - let mut vec = Vec::with_capacity(self.len()); - - for idx in 0..self.len() { - let (array, offset) = indices(idx); - // We use a Relaxed load of the pointer, because the loop above (which - // ends before `self.len()`) should ensure that the data we want is - // already visible, since it Acquired `self.count` which synchronizes - // with the write in `self.push`. - let ptr = unsafe { *self.data[array as usize].get() }; - - // Copy the element value. The copy remaining in the array must not - // be used again (i.e. make sure we do not drop it) - let value = unsafe { ptr.add(offset).read() }; - - vec.push(value); - } - - // Prevent dropping the copied-out values by marking the count as 0 before - // our own drop is run - self.count.store(0, Ordering::Relaxed); - - vec - } -} - -impl Default for AppendOnlyVec { - fn default() -> Self { - Self::new() - } -} - -impl Debug for AppendOnlyVec { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("AppendOnlyVec").field("len", &self.len()).finish() - } -} - -impl Drop for AppendOnlyVec { - fn drop(&mut self) { - // First we'll drop all the `T` in a slightly sloppy way. FIXME this - // could be optimized to avoid reloading the `ptr`. - for idx in 0..self.len() { - let (array, offset) = indices(idx); - // We use a Relaxed load of the pointer, because the loop above (which - // ends before `self.len()`) should ensure that the data we want is - // already visible, since it Acquired `self.count` which synchronizes - // with the write in `self.push`. - let ptr = unsafe { *self.data[array as usize].get() }; - unsafe { - ptr::drop_in_place(ptr.add(offset)); - } - } - // Now we will free all the arrays. - for array in 0..self.data.len() as u32 { - // This load is relaxed because no other thread can have a reference - // to Self because we have a &mut self. - let ptr = unsafe { *self.data[array as usize].get() }; - if !ptr.is_null() { - let layout = Self::layout(array); - unsafe { std::alloc::dealloc(ptr as *mut u8, layout) }; - } else { - break; - } - } - } -} - -impl IntoIterator for AppendOnlyVec { - type Item = T; - type IntoIter = std::vec::IntoIter; - - fn into_iter(self) -> Self::IntoIter { - self.into_vec().into_iter() - } -} - -#[test] -fn test_parallel_pushing() { - use std::sync::Arc; - let v = Arc::new(AppendOnlyVec::::new()); - let mut threads = Vec::new(); - const N: u64 = 100; - for thread_num in 0..N { - let v = v.clone(); - threads.push(std::thread::spawn(move || { - let which1 = v.push(thread_num); - let which2 = v.push(thread_num); - assert_eq!(*which1, thread_num); - assert_eq!(*which2, thread_num); - })); - } - for t in threads { - t.join().unwrap(); - } - let v = Arc::into_inner(v).unwrap().into_vec(); - for thread_num in 0..N { - assert_eq!(2, v.iter().copied().filter(|&x| x == thread_num).count()); - } -} - -#[test] -fn test_into_vec() { - struct SafeToDrop(bool); - - impl Drop for SafeToDrop { - fn drop(&mut self) { - assert!(self.0); - } - } - - let v = AppendOnlyVec::new(); - - for _ in 0..50 { - v.push(SafeToDrop(false)); - } - - let mut v = v.into_vec(); - assert_eq!(v.len(), 50); - - for i in v.iter_mut() { - i.0 = true; - } -} - -#[test] -fn test_push_then_index_mut() { - let v = AppendOnlyVec::::new(); - for i in 0..1024 { - *v.push(i) += 1; - } - - let v = v.into_vec(); - for i in 0..1024 { - assert_eq!(v[i], 2 * i); - } -} diff --git a/milli/src/update/new/extract/faceted/extract_facets.rs b/milli/src/update/new/extract/faceted/extract_facets.rs index 8ffec68f3..f4ad50bfe 100644 --- a/milli/src/update/new/extract/faceted/extract_facets.rs +++ b/milli/src/update/new/extract/faceted/extract_facets.rs @@ -12,7 +12,7 @@ use super::super::cache::CboCachedSorter; use super::facet_document::extract_document_facets; use super::FacetKind; use crate::facet::value_encoding::f64_into_bytes; -use crate::update::new::append_only_vec::AppendOnlyVec; +use crate::update::new::append_only_linked_list::AppendOnlyLinkedList; use crate::update::new::extract::DocidsExtractor; use crate::update::new::parallel_iterator_ext::ParallelIteratorExt; use crate::update::new::DocumentChange; @@ -210,7 +210,7 @@ impl DocidsExtractor for FacetedDocidsExtractor { let attributes_to_extract = Self::attributes_to_extract(&rtxn, index)?; let attributes_to_extract: Vec<_> = attributes_to_extract.iter().map(|s| s.as_ref()).collect(); - let caches = AppendOnlyVec::new(); + let caches = AppendOnlyLinkedList::new(); { let span = diff --git a/milli/src/update/new/extract/searchable/extract_word_docids.rs b/milli/src/update/new/extract/searchable/extract_word_docids.rs index f4346ba52..702b8f4e9 100644 --- a/milli/src/update/new/extract/searchable/extract_word_docids.rs +++ b/milli/src/update/new/extract/searchable/extract_word_docids.rs @@ -9,7 +9,7 @@ use rayon::iter::IntoParallelIterator; use super::tokenize_document::{tokenizer_builder, DocumentTokenizer}; use super::SearchableExtractor; -use crate::update::new::append_only_vec::AppendOnlyVec; +use crate::update::new::append_only_linked_list::AppendOnlyLinkedList; use crate::update::new::extract::cache::CboCachedSorter; use crate::update::new::extract::perm_json_p::contained_in; use crate::update::new::parallel_iterator_ext::ParallelIteratorExt; @@ -341,7 +341,7 @@ impl WordDocidsExtractors { max_positions_per_attributes: MAX_POSITION_PER_ATTRIBUTE, }; - let caches = AppendOnlyVec::new(); + let caches = AppendOnlyLinkedList::new(); { let span = diff --git a/milli/src/update/new/extract/searchable/mod.rs b/milli/src/update/new/extract/searchable/mod.rs index b3fa646b9..ba1d53f54 100644 --- a/milli/src/update/new/extract/searchable/mod.rs +++ b/milli/src/update/new/extract/searchable/mod.rs @@ -14,7 +14,7 @@ use tokenize_document::{tokenizer_builder, DocumentTokenizer}; use super::cache::CboCachedSorter; use super::DocidsExtractor; -use crate::update::new::append_only_vec::AppendOnlyVec; +use crate::update::new::append_only_linked_list::AppendOnlyLinkedList; use crate::update::new::parallel_iterator_ext::ParallelIteratorExt; use crate::update::new::DocumentChange; use crate::update::{create_sorter, GrenadParameters, MergeDeladdCboRoaringBitmaps}; @@ -58,7 +58,7 @@ pub trait SearchableExtractor { localized_attributes_rules: &localized_attributes_rules, max_positions_per_attributes: MAX_POSITION_PER_ATTRIBUTE, }; - let caches = AppendOnlyVec::new(); + let caches = AppendOnlyLinkedList::new(); { let span = diff --git a/milli/src/update/new/mod.rs b/milli/src/update/new/mod.rs index 264241caa..862dd4dac 100644 --- a/milli/src/update/new/mod.rs +++ b/milli/src/update/new/mod.rs @@ -4,7 +4,7 @@ pub use top_level_map::{CowStr, TopLevelMap}; use super::del_add::DelAdd; use crate::FieldId; -mod append_only_vec; +mod append_only_linked_list; mod channel; mod document_change; mod extract;