From 0a8cb471dff621c963d9d86d18b6d027c893ad1c Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Cl=C3=A9ment=20Renault?=
Date: Mon, 30 Sep 2024 23:50:43 +0200
Subject: [PATCH] Introduce the AppendOnlyVec struct for parallel computing

---
 milli/src/update/new/append_only_vec.rs | 327 ++++++++++++++++++++++++
 milli/src/update/new/mod.rs             |   1 +
 2 files changed, 328 insertions(+)
 create mode 100644 milli/src/update/new/append_only_vec.rs

diff --git a/milli/src/update/new/append_only_vec.rs b/milli/src/update/new/append_only_vec.rs
new file mode 100644
index 000000000..fe05dd782
--- /dev/null
+++ b/milli/src/update/new/append_only_vec.rs
@@ -0,0 +1,327 @@
+// Code taken from the `append-only-vec` crate
+// and modified in order to get a ref mut instead of the index of newly inserted items.
+
+//! AppendOnlyVec
+//!
+//! This is a pretty simple type: a vector that you can push into and that
+//! hands back a reference to the item you just inserted. The data structure
+//! never moves an element once it is allocated, so you can push to the vec
+//! even while holding mutable references to elements that have already been
+//! pushed.
+//!
+//! ### Scaling
+//!
+//! 1. Accessing an element is O(1), but slightly more expensive than for a
+//!    standard `Vec`.
+//!
+//! 2. Pushing a new element amortizes to O(1), but may require allocation of a
+//!    new chunk.
+//!
+//! ### Example
+//!
+//! ```
+//! use append_only_vec::AppendOnlyVec;
+//!
+//! static V: AppendOnlyVec<String> = AppendOnlyVec::<String>::new();
+//! let mut threads = Vec::new();
+//! for thread_num in 0..10 {
+//!     threads.push(std::thread::spawn(move || {
+//!         for n in 0..100 {
+//!             let s = format!("thread {} says {}", thread_num, n);
+//!             let which = V.push(s.clone());
+//!             assert_eq!(&*which, &s);
+//!         }
+//!     }));
+//! }
+//!
+//! for t in threads {
+//!     t.join().unwrap();
+//! }
+//!
+//! assert_eq!(V.len(), 1000);
+//! ```
+
+use std::cell::UnsafeCell;
+use std::fmt::Debug;
+use std::ptr;
+use std::sync::atomic::{AtomicUsize, Ordering};
+
+pub struct AppendOnlyVec<T> {
+    count: AtomicUsize,
+    _reserved: AtomicUsize,
+    data: [UnsafeCell<*mut T>; BITS_USED - 1 - 3],
+}
+
+unsafe impl<T: Send> Send for AppendOnlyVec<T> {}
+unsafe impl<T: Send + Sync> Sync for AppendOnlyVec<T> {}
+
+const BITS: usize = std::mem::size_of::<usize>() * 8;
+
+#[cfg(target_arch = "x86_64")]
+const BITS_USED: usize = 48;
+#[cfg(all(not(target_arch = "x86_64"), target_pointer_width = "64"))]
+const BITS_USED: usize = 64;
+#[cfg(target_pointer_width = "32")]
+const BITS_USED: usize = 32;
+
+// This takes an index into a vec, and determines which data array will hold it
+// (the first return value), and what the index will be into that data array
+// (second return value).
+//
+// The ith data array holds 1 << (i + 3) elements.
+const fn indices(i: usize) -> (u32, usize) {
+    let i = i + 8;
+    let bin = BITS as u32 - 1 - i.leading_zeros();
+    let bin = bin - 3;
+    let offset = i - bin_size(bin);
+    (bin, offset)
+}
+
+const fn bin_size(array: u32) -> usize {
+    (1 << 3) << array
+}
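+
+// A few concrete values of the mapping above, written down as a test (this
+// test is an addition for illustration and is not part of the upstream
+// crate): the first bin holds 8 elements and each subsequent bin doubles in
+// size, so global index 8 is the first slot of bin 1 and index 24 is the
+// first slot of bin 2.
+#[test]
+fn test_indices_examples() {
+    assert_eq!(bin_size(0), 8);
+    assert_eq!(bin_size(1), 16);
+    assert_eq!(bin_size(2), 32);
+    assert_eq!(indices(0), (0, 0)); // first slot of bin 0
+    assert_eq!(indices(7), (0, 7)); // last slot of bin 0
+    assert_eq!(indices(8), (1, 0)); // first slot of bin 1
+    assert_eq!(indices(23), (1, 15)); // last slot of bin 1
+    assert_eq!(indices(24), (2, 0)); // first slot of bin 2
+}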
+
+#[test]
+fn test_indices() {
+    for i in 0..32 {
+        println!("{:3}: {} {}", i, indices(i).0, indices(i).1);
+    }
+    let mut array = 0;
+    let mut offset = 0;
+    let mut index = 0;
+    while index < 1000 {
+        index += 1;
+        offset += 1;
+        if offset >= bin_size(array) {
+            offset = 0;
+            array += 1;
+        }
+        assert_eq!(indices(index), (array, offset));
+    }
+}
+
+impl<T> Default for AppendOnlyVec<T> {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl<T> AppendOnlyVec<T> {
+    const EMPTY: UnsafeCell<*mut T> = UnsafeCell::new(ptr::null_mut());
+
+    /// Allocate a new empty array.
+    pub const fn new() -> Self {
+        AppendOnlyVec {
+            count: AtomicUsize::new(0),
+            _reserved: AtomicUsize::new(0),
+            data: [Self::EMPTY; BITS_USED - 1 - 3],
+        }
+    }
+
+    /// Find the length of the array.
+    #[inline]
+    pub fn len(&self) -> usize {
+        self.count.load(Ordering::Acquire)
+    }
+
+    fn layout(array: u32) -> std::alloc::Layout {
+        std::alloc::Layout::array::<T>(bin_size(array)).unwrap()
+    }
+
+    /// Append an element to the array and get a mutable ref to it.
+    ///
+    /// This is notable in that it doesn't require a `&mut self`, because it
+    /// does appropriate atomic synchronization.
+    pub fn push(&self, val: T) -> &mut T {
+        let idx = self._reserved.fetch_add(1, Ordering::Relaxed);
+        let (array, offset) = indices(idx);
+        let ptr = if self.len() < 1 + idx - offset {
+            // We are working on a new array, which may not have been allocated...
+            if offset == 0 {
+                // It is our job to allocate the array! The size of the array
+                // is determined in the self.layout method, which needs to be
+                // consistent with the indices function.
+                let layout = Self::layout(array);
+                let ptr = unsafe { std::alloc::alloc(layout) } as *mut T;
+                unsafe {
+                    *self.data[array as usize].get() = ptr;
+                }
+                ptr
+            } else {
+                // We need to wait for the array to be allocated.
+                while self.len() < 1 + idx - offset {
+                    std::hint::spin_loop();
+                }
+                // The Ordering::Acquire semantics of self.len() ensures that
+                // this pointer read will get the non-null pointer allocated
+                // above.
+                unsafe { *self.data[array as usize].get() }
+            }
+        } else {
+            // The Ordering::Acquire semantics of self.len() ensures that
+            // this pointer read will get the non-null pointer allocated
+            // above.
+            unsafe { *self.data[array as usize].get() }
+        };
+
+        // The contents of this offset are guaranteed to be unused (so far)
+        // because we got the idx from our fetch_add above, and ptr is
+        // guaranteed to be valid because of the loop we used above, which used
+        // self.len() which has Ordering::Acquire semantics.
+        unsafe { (ptr.add(offset)).write(val) };
+
+        // Now we need to increase the size of the vec, so it can get read. We
+        // use Release upon success, to ensure that the value which we wrote is
+        // visible to any thread that has confirmed that the count is big enough
+        // to read that element. In case of failure, we can be relaxed, since
+        // we don't do anything with the result other than try again.
+        while self
+            .count
+            .compare_exchange(idx, idx + 1, Ordering::Release, Ordering::Relaxed)
+            .is_err()
+        {
+            // This means that someone else *started* pushing before we started,
+            // but hasn't yet finished. We have to wait for them to finish
+            // pushing before we can update the count. Note that using a
+            // spinloop here isn't really ideal, but except when allocating a
+            // new array, the window between reserving space and using it is
+            // pretty small, so contention will hopefully be rare, and having a
+            // context switch during that interval will hopefully be vanishingly
+            // unlikely.
+            std::hint::spin_loop();
+        }
+
+        // Return a reference to the slot we just wrote, not to the start of
+        // the chunk.
+        unsafe { &mut *ptr.add(offset) }
+    }
+
+    /// Convert into a standard `Vec`.
+    pub fn into_vec(self) -> Vec<T> {
+        let mut vec = Vec::with_capacity(self.len());
+
+        for idx in 0..self.len() {
+            let (array, offset) = indices(idx);
+            // We use a Relaxed load of the pointer, because the `self.len()`
+            // call bounding this loop used Acquire, which synchronizes with
+            // the Release write of `self.count` in `self.push` and thus makes
+            // the data we want visible.
+            let ptr = unsafe { *self.data[array as usize].get() };
+
+            // Copy the element value. The copy remaining in the array must not
+            // be used again (i.e. make sure we do not drop it).
+            let value = unsafe { ptr.add(offset).read() };
+
+            vec.push(value);
+        }
+
+        // Prevent dropping the copied-out values by marking the count as 0
+        // before our own drop is run.
+        self.count.store(0, Ordering::Relaxed);
+
+        vec
+    }
+}
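+
+// A minimal usage sketch, added for illustration (this test is not part of
+// the upstream crate): because chunks are never reallocated or moved, the
+// `&mut T` returned by `push` stays valid while later pushes happen, even
+// pushes that allocate brand new chunks.
+#[test]
+fn test_ref_stays_valid_across_pushes() {
+    let v = AppendOnlyVec::<u64>::new();
+    let first = v.push(1);
+    // Push enough elements to force the allocation of several new chunks.
+    for i in 0..100 {
+        v.push(i);
+    }
+    // `first` still points at the original element.
+    *first += 1;
+    assert_eq!(*first, 2);
+    assert_eq!(v.into_vec()[0], 2);
+}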
+
+impl<T> Debug for AppendOnlyVec<T> {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("AppendOnlyVec").field("len", &self.len()).finish()
+    }
+}
+
+impl<T> Drop for AppendOnlyVec<T> {
+    fn drop(&mut self) {
+        // First we'll drop all the `T` in a slightly sloppy way. FIXME this
+        // could be optimized to avoid reloading the `ptr`.
+        for idx in 0..self.len() {
+            let (array, offset) = indices(idx);
+            // We use a Relaxed load of the pointer, because the `self.len()`
+            // call bounding this loop used Acquire, which synchronizes with
+            // the Release write of `self.count` in `self.push` and thus makes
+            // the data we want visible.
+            let ptr = unsafe { *self.data[array as usize].get() };
+            unsafe {
+                ptr::drop_in_place(ptr.add(offset));
+            }
+        }
+        // Now we will free all the arrays.
+        for array in 0..self.data.len() as u32 {
+            // This load is relaxed because no other thread can have a reference
+            // to Self because we have a &mut self.
+            let ptr = unsafe { *self.data[array as usize].get() };
+            if !ptr.is_null() {
+                let layout = Self::layout(array);
+                unsafe { std::alloc::dealloc(ptr as *mut u8, layout) };
+            } else {
+                break;
+            }
+        }
+    }
+}
+
+impl<T> IntoIterator for AppendOnlyVec<T> {
+    type Item = T;
+    type IntoIter = std::vec::IntoIter<T>;
+
+    fn into_iter(self) -> Self::IntoIter {
+        self.into_vec().into_iter()
+    }
+}
+
+#[test]
+fn test_parallel_pushing() {
+    use std::sync::Arc;
+    let v = Arc::new(AppendOnlyVec::<u64>::new());
+    let mut threads = Vec::new();
+    const N: u64 = 100;
+    for thread_num in 0..N {
+        let v = v.clone();
+        threads.push(std::thread::spawn(move || {
+            let which1 = v.push(thread_num);
+            let which2 = v.push(thread_num);
+            assert_eq!(*which1, thread_num);
+            assert_eq!(*which2, thread_num);
+        }));
+    }
+    for t in threads {
+        t.join().unwrap();
+    }
+    let v = Arc::into_inner(v).unwrap().into_vec();
+    for thread_num in 0..N {
+        assert_eq!(2, v.iter().copied().filter(|&x| x == thread_num).count());
+    }
+}
+
+#[test]
+fn test_into_vec() {
+    struct SafeToDrop(bool);
+
+    impl Drop for SafeToDrop {
+        fn drop(&mut self) {
+            assert!(self.0);
+        }
+    }
+
+    let v = AppendOnlyVec::new();
+
+    for _ in 0..50 {
+        v.push(SafeToDrop(false));
+    }
+
+    let mut v = v.into_vec();
+    assert_eq!(v.len(), 50);
+
+    for i in v.iter_mut() {
+        i.0 = true;
+    }
+}
+
+#[test]
+fn test_push_then_index_mut() {
+    let v = AppendOnlyVec::<usize>::new();
+    for i in 0..1024 {
+        // Add `i` to the freshly pushed `i`, so each slot ends up at `2 * i`.
+        *v.push(i) += i;
+    }
+
+    let v = v.into_vec();
+    for i in 0..1024 {
+        assert_eq!(v[i], 2 * i);
+    }
+}
diff --git a/milli/src/update/new/mod.rs b/milli/src/update/new/mod.rs
index 98b60378f..6a48e0407 100644
--- a/milli/src/update/new/mod.rs
+++ b/milli/src/update/new/mod.rs
@@ -5,6 +5,7 @@ pub use top_level_map::{CowStr, TopLevelMap};
 use super::del_add::DelAdd;
 use crate::FieldId;
 
+mod append_only_vec;
 mod channel;
 mod document_change;
 mod extract;