diff --git a/milli/src/update/new/append_only_vec.rs b/milli/src/update/new/append_only_vec.rs new file mode 100644 index 000000000..d4a30c1b1 --- /dev/null +++ b/milli/src/update/new/append_only_vec.rs @@ -0,0 +1,327 @@ +// Code taken from +// and modified in order to get a ref mut instead of the index of newly inserted items. + +//! AppendOnlyVec +//! +//! This is a pretty simple type, which is a vector that you can push into and +//! receive a reference to the item you just inserted. The data structure never +//! moves an element once allocated, so you can push to the vec even while holding +//! mutable references to elements that have already been pushed. +//! +//! ### Scaling +//! +//! 1. Accessing an element is O(1), but slightly more expensive than for a +//! standard `Vec`. +//! +//! 2. Pushing a new element amortizes to O(1), but may require allocation of a +//! new chunk. +//! +//! ### Example +//! +//! ``` +//! use append_only_vec::AppendOnlyVec; +//! +//! static V: AppendOnlyVec = AppendOnlyVec::::new(); +//! let mut threads = Vec::new(); +//! for thread_num in 0..10 { +//! threads.push(std::thread::spawn(move || { +//! for n in 0..100 { +//! let s = format!("thread {} says {}", thread_num, n); +//! let which = V.push(s.clone()); +//! assert_eq!(&which, &s); +//! } +//! })); +//! } +//! +//! for t in threads { +//! t.join(); +//! } +//! +//! assert_eq!(V.len(), 1000); +//! 
``` + +use std::cell::UnsafeCell; +use std::fmt::Debug; +use std::ptr; +use std::sync::atomic::{AtomicUsize, Ordering}; + +pub struct AppendOnlyVec { + count: AtomicUsize, + _reserved: AtomicUsize, + data: [UnsafeCell<*mut T>; BITS_USED - 1 - 3], +} + +unsafe impl Send for AppendOnlyVec {} +unsafe impl Sync for AppendOnlyVec {} + +const BITS: usize = std::mem::size_of::() * 8; + +#[cfg(target_arch = "x86_64")] +const BITS_USED: usize = 48; +#[cfg(all(not(target_arch = "x86_64"), target_pointer_width = "64"))] +const BITS_USED: usize = 64; +#[cfg(target_pointer_width = "32")] +const BITS_USED: usize = 32; + +// This takes an index into a vec, and determines which data array will hold it +// (the first return value), and what the index will be into that data array +// (second return value) +// +// The ith data array holds 1< (u32, usize) { + let i = i + 8; + let bin = BITS as u32 - 1 - i.leading_zeros(); + let bin = bin - 3; + let offset = i - bin_size(bin); + (bin, offset) +} + +const fn bin_size(array: u32) -> usize { + (1 << 3) << array +} + +#[test] +fn test_indices() { + for i in 0..32 { + println!("{:3}: {} {}", i, indices(i).0, indices(i).1); + } + let mut array = 0; + let mut offset = 0; + let mut index = 0; + while index < 1000 { + index += 1; + offset += 1; + if offset >= bin_size(array) { + offset = 0; + array += 1; + } + assert_eq!(indices(index), (array, offset)); + } +} + +impl AppendOnlyVec { + const EMPTY: UnsafeCell<*mut T> = UnsafeCell::new(ptr::null_mut()); + + /// Allocate a new empty array. + pub const fn new() -> Self { + AppendOnlyVec { + count: AtomicUsize::new(0), + _reserved: AtomicUsize::new(0), + data: [Self::EMPTY; BITS_USED - 1 - 3], + } + } + + /// Find the length of the array. + #[inline] + pub fn len(&self) -> usize { + self.count.load(Ordering::Acquire) + } + + fn layout(array: u32) -> std::alloc::Layout { + std::alloc::Layout::array::(bin_size(array)).unwrap() + } + + /// Append an element to the array and get a mutable ref to it. 
+ /// + /// This is notable in that it doesn't require a `&mut self`, because it + /// does appropriate atomic synchronization. + pub fn push(&self, val: T) -> &mut T { + let idx = self._reserved.fetch_add(1, Ordering::Relaxed); + let (array, offset) = indices(idx); + let ptr = if self.len() < 1 + idx - offset { + // We are working on a new array, which may not have been allocated... + if offset == 0 { + // It is our job to allocate the array! The size of the array + // is determined in the self.layout method, which needs to be + // consistent with the indices function. + let layout = Self::layout(array); + let ptr = unsafe { std::alloc::alloc(layout) } as *mut T; + unsafe { + *self.data[array as usize].get() = ptr; + } + ptr + } else { + // We need to wait for the array to be allocated. + while self.len() < 1 + idx - offset { + std::hint::spin_loop(); + } + // The Ordering::Acquire semantics of self.len() ensures that + // this pointer read will get the non-null pointer allocated + // above. + unsafe { *self.data[array as usize].get() } + } + } else { + // The Ordering::Acquire semantics of self.len() ensures that + // this pointer read will get the non-null pointer allocated + // above. + unsafe { *self.data[array as usize].get() } + }; + + // The contents of this offset are guaranteed to be unused (so far) + // because we got the idx from our fetch_add above, and ptr is + // guaranteed to be valid because of the loop we used above, which used + // self.len() which has Ordering::Acquire semantics. + unsafe { (ptr.add(offset)).write(val) }; + + // Now we need to increase the size of the vec, so it can get read. We + // use Release upon success, to ensure that the value which we wrote is + // visible to any thread that has confirmed that the count is big enough + // to read that element. In case of failure, we can be relaxed, since + // we don't do anything with the result other than try again. 
+ while self + .count + .compare_exchange(idx, idx + 1, Ordering::Release, Ordering::Relaxed) + .is_err() + { + // This means that someone else *started* pushing before we started, + // but hasn't yet finished. We have to wait for them to finish + // pushing before we can update the count. Note that using a + // spinloop here isn't really ideal, but except when allocating a + // new array, the window between reserving space and using it is + // pretty small, so contention will hopefully be rare, and having a + // context switch during that interval will hopefully be vanishingly + // unlikely. + std::hint::spin_loop(); + } + + // `ptr` is the base of the whole bin, so it must be offset to hand back + // the element written above rather than the first element of the bin. + // SAFETY: slot `offset` was initialised by the `write` earlier in this + // function and lies inside the allocation that `ptr` points to. + unsafe { &mut *ptr.add(offset) } + } + + /// Convert into a standard `Vec`. + pub fn into_vec(self) -> Vec { + let mut vec = Vec::with_capacity(self.len()); + + for idx in 0..self.len() { + let (array, offset) = indices(idx); + // We use a Relaxed load of the pointer, because the loop above (which + // ends before `self.len()`) should ensure that the data we want is + // already visible, since it Acquired `self.count` which synchronizes + // with the write in `self.push`. + let ptr = unsafe { *self.data[array as usize].get() }; + + // Copy the element value. The copy remaining in the array must not + // be used again (i.e. make sure we do not drop it) + let value = unsafe { ptr.add(offset).read() }; + + vec.push(value); + } + + // Prevent dropping the copied-out values by marking the count as 0 before + // our own drop is run + self.count.store(0, Ordering::Relaxed); + + vec + } +} + +impl Default for AppendOnlyVec { + fn default() -> Self { + Self::new() + } +} + +impl Debug for AppendOnlyVec { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("AppendOnlyVec").field("len", &self.len()).finish() + } +} + +impl Drop for AppendOnlyVec { + fn drop(&mut self) { + // First we'll drop all the `T` in a slightly sloppy way. FIXME this + // could be optimized to avoid reloading the `ptr`.
+ for idx in 0..self.len() { + let (array, offset) = indices(idx); + // We use a Relaxed load of the pointer, because the loop above (which + // ends before `self.len()`) should ensure that the data we want is + // already visible, since it Acquired `self.count` which synchronizes + // with the write in `self.push`. + let ptr = unsafe { *self.data[array as usize].get() }; + unsafe { + ptr::drop_in_place(ptr.add(offset)); + } + } + // Now we will free all the arrays. + for array in 0..self.data.len() as u32 { + // This load is relaxed because no other thread can have a reference + // to Self because we have a &mut self. + let ptr = unsafe { *self.data[array as usize].get() }; + if !ptr.is_null() { + let layout = Self::layout(array); + unsafe { std::alloc::dealloc(ptr as *mut u8, layout) }; + } else { + break; + } + } + } +} + +impl IntoIterator for AppendOnlyVec { + type Item = T; + type IntoIter = std::vec::IntoIter; + + fn into_iter(self) -> Self::IntoIter { + self.into_vec().into_iter() + } +} + +#[test] +fn test_parallel_pushing() { + use std::sync::Arc; + let v = Arc::new(AppendOnlyVec::::new()); + let mut threads = Vec::new(); + const N: u64 = 100; + for thread_num in 0..N { + let v = v.clone(); + threads.push(std::thread::spawn(move || { + let which1 = v.push(thread_num); + let which2 = v.push(thread_num); + assert_eq!(*which1, thread_num); + assert_eq!(*which2, thread_num); + })); + } + for t in threads { + t.join().unwrap(); + } + let v = Arc::into_inner(v).unwrap().into_vec(); + for thread_num in 0..N { + assert_eq!(2, v.iter().copied().filter(|&x| x == thread_num).count()); + } +} + +#[test] +fn test_into_vec() { + struct SafeToDrop(bool); + + impl Drop for SafeToDrop { + fn drop(&mut self) { + assert!(self.0); + } + } + + let v = AppendOnlyVec::new(); + + for _ in 0..50 { + v.push(SafeToDrop(false)); + } + + let mut v = v.into_vec(); + assert_eq!(v.len(), 50); + + for i in v.iter_mut() { + i.0 = true; + } +} + +#[test] +fn test_push_then_index_mut() { + 
let v = AppendOnlyVec::::new(); + for i in 0..1024 { + *v.push(i) += 1; + } + + let v = v.into_vec(); + for i in 0..1024 { + assert_eq!(v[i], 2 * i); + } +} diff --git a/milli/src/update/new/extract/faceted/extract_facets.rs b/milli/src/update/new/extract/faceted/extract_facets.rs index 41bce2215..e4e6f7010 100644 --- a/milli/src/update/new/extract/faceted/extract_facets.rs +++ b/milli/src/update/new/extract/faceted/extract_facets.rs @@ -1,6 +1,7 @@ use std::collections::HashSet; use std::fmt::Debug; use std::fs::File; +use std::sync::Arc; use grenad::{MergeFunction, Merger}; use heed::RoTxn; @@ -11,10 +12,14 @@ use super::super::cache::CboCachedSorter; use super::facet_document::extract_document_facets; use super::FacetKind; use crate::facet::value_encoding::f64_into_bytes; +use crate::update::new::append_only_vec::AppendOnlyVec; use crate::update::new::extract::DocidsExtractor; -use crate::update::new::{DocumentChange, ItemsPool}; +use crate::update::new::parallel_iterator_ext::ParallelIteratorExt; +use crate::update::new::DocumentChange; use crate::update::{create_sorter, GrenadParameters, MergeDeladdCboRoaringBitmaps}; -use crate::{DocumentId, FieldId, GlobalFieldsIdsMap, Index, Result, MAX_FACET_VALUE_LENGTH}; +use crate::{ + DocumentId, Error, FieldId, GlobalFieldsIdsMap, Index, Result, MAX_FACET_VALUE_LENGTH, +}; pub struct FacetedDocidsExtractor; impl FacetedDocidsExtractor { @@ -195,7 +200,9 @@ impl DocidsExtractor for FacetedDocidsExtractor { index: &Index, fields_ids_map: &GlobalFieldsIdsMap, indexer: GrenadParameters, - document_changes: impl IntoParallelIterator>, + document_changes: impl IntoParallelIterator< + Item = std::result::Result>, + >, ) -> Result> { let max_memory = indexer.max_memory_by_thread(); @@ -203,35 +210,32 @@ impl DocidsExtractor for FacetedDocidsExtractor { let attributes_to_extract = Self::attributes_to_extract(&rtxn, index)?; let attributes_to_extract: Vec<_> = attributes_to_extract.iter().map(|s| s.as_ref()).collect(); - - let 
context_pool = ItemsPool::new(|| { - Ok(( - index.read_txn()?, - fields_ids_map.clone(), - Vec::new(), - CboCachedSorter::new( - // TODO use a better value - 100.try_into().unwrap(), - create_sorter( - grenad::SortAlgorithm::Stable, - MergeDeladdCboRoaringBitmaps, - indexer.chunk_compression_type, - indexer.chunk_compression_level, - indexer.max_nb_chunks, - max_memory, - ), - ), - )) - }); + let caches = AppendOnlyVec::new(); { let span = tracing::trace_span!(target: "indexing::documents::extract", "docids_extraction"); let _entered = span.enter(); - document_changes.into_par_iter().try_for_each(|document_change| { - context_pool.with(|(rtxn, fields_ids_map, buffer, cached_sorter)| { + document_changes.into_par_iter().try_arc_for_each_try_init( + || { + let rtxn = index.read_txn().map_err(Error::from)?; + let cache = caches.push(CboCachedSorter::new( + // TODO use a better value + 100.try_into().unwrap(), + create_sorter( + grenad::SortAlgorithm::Stable, + MergeDeladdCboRoaringBitmaps, + indexer.chunk_compression_type, + indexer.chunk_compression_level, + indexer.max_nb_chunks, + max_memory, + ), + )); + Ok((rtxn, fields_ids_map.clone(), Vec::new(), cache)) + }, + |(rtxn, fields_ids_map, buffer, cached_sorter), document_change| { Self::extract_document_change( - &*rtxn, + rtxn, index, buffer, fields_ids_map, @@ -239,8 +243,9 @@ impl DocidsExtractor for FacetedDocidsExtractor { cached_sorter, document_change?, ) - }) - })?; + .map_err(Arc::new) + }, + )?; } { let mut builder = grenad::MergerBuilder::new(MergeDeladdCboRoaringBitmaps); @@ -248,14 +253,15 @@ impl DocidsExtractor for FacetedDocidsExtractor { tracing::trace_span!(target: "indexing::documents::extract", "merger_building"); let _entered = span.enter(); - let readers: Vec<_> = context_pool - .into_items() + let readers: Vec<_> = caches + .into_iter() .par_bridge() - .map(|(_rtxn, _tokenizer, _fields_ids_map, cached_sorter)| { + .map(|cached_sorter| { let sorter = cached_sorter.into_sorter()?; 
sorter.into_reader_cursors() }) .collect(); + for reader in readers { builder.extend(reader?); } diff --git a/milli/src/update/new/extract/mod.rs b/milli/src/update/new/extract/mod.rs index 6e60a4063..c12634563 100644 --- a/milli/src/update/new/extract/mod.rs +++ b/milli/src/update/new/extract/mod.rs @@ -4,6 +4,7 @@ mod lru; mod searchable; use std::fs::File; +use std::sync::Arc; pub use faceted::*; use grenad::Merger; @@ -12,14 +13,16 @@ pub use searchable::*; use super::DocumentChange; use crate::update::{GrenadParameters, MergeDeladdCboRoaringBitmaps}; -use crate::{GlobalFieldsIdsMap, Index, Result}; +use crate::{Error, GlobalFieldsIdsMap, Index, Result}; pub trait DocidsExtractor { fn run_extraction( index: &Index, fields_ids_map: &GlobalFieldsIdsMap, indexer: GrenadParameters, - document_changes: impl IntoParallelIterator>, + document_changes: impl IntoParallelIterator< + Item = std::result::Result>, + >, ) -> Result>; } diff --git a/milli/src/update/new/extract/searchable/extract_word_docids.rs b/milli/src/update/new/extract/searchable/extract_word_docids.rs index caab170a4..f4346ba52 100644 --- a/milli/src/update/new/extract/searchable/extract_word_docids.rs +++ b/milli/src/update/new/extract/searchable/extract_word_docids.rs @@ -1,18 +1,22 @@ use std::collections::HashMap; use std::fs::File; use std::num::NonZero; +use std::sync::Arc; use grenad::{Merger, MergerBuilder}; use heed::RoTxn; -use rayon::iter::{IntoParallelIterator, ParallelIterator}; +use rayon::iter::IntoParallelIterator; use super::tokenize_document::{tokenizer_builder, DocumentTokenizer}; +use super::SearchableExtractor; +use crate::update::new::append_only_vec::AppendOnlyVec; use crate::update::new::extract::cache::CboCachedSorter; use crate::update::new::extract::perm_json_p::contained_in; -use crate::update::new::{DocumentChange, ItemsPool}; +use crate::update::new::parallel_iterator_ext::ParallelIteratorExt; +use crate::update::new::DocumentChange; use crate::update::{create_sorter, 
GrenadParameters, MergeDeladdCboRoaringBitmaps}; use crate::{ - bucketed_position, DocumentId, FieldId, GlobalFieldsIdsMap, Index, Result, + bucketed_position, DocumentId, Error, FieldId, GlobalFieldsIdsMap, Index, Result, MAX_POSITION_PER_ATTRIBUTE, }; @@ -303,7 +307,9 @@ impl WordDocidsExtractors { index: &Index, fields_ids_map: &GlobalFieldsIdsMap, indexer: GrenadParameters, - document_changes: impl IntoParallelIterator>, + document_changes: impl IntoParallelIterator< + Item = std::result::Result>, + >, ) -> Result { let max_memory = indexer.max_memory_by_thread(); @@ -335,36 +341,35 @@ impl WordDocidsExtractors { max_positions_per_attributes: MAX_POSITION_PER_ATTRIBUTE, }; - let context_pool = ItemsPool::new(|| { - Ok(( - index.read_txn()?, - &document_tokenizer, - fields_ids_map.clone(), - WordDocidsCachedSorters::new( - indexer, - max_memory, - // TODO use a better value - 200_000.try_into().unwrap(), - ), - )) - }); + let caches = AppendOnlyVec::new(); { let span = tracing::trace_span!(target: "indexing::documents::extract", "docids_extraction"); let _entered = span.enter(); - document_changes.into_par_iter().try_for_each(|document_change| { - context_pool.with(|(rtxn, document_tokenizer, fields_ids_map, cached_sorter)| { + document_changes.into_par_iter().try_arc_for_each_try_init( + || { + let rtxn = index.read_txn().map_err(Error::from)?; + let cache = caches.push(WordDocidsCachedSorters::new( + indexer, + max_memory, + // TODO use a better value + 200_000.try_into().unwrap(), + )); + Ok((rtxn, &document_tokenizer, fields_ids_map.clone(), cache)) + }, + |(rtxn, document_tokenizer, fields_ids_map, cached_sorter), document_change| { Self::extract_document_change( - &*rtxn, + rtxn, index, document_tokenizer, fields_ids_map, cached_sorter, document_change?, ) - }) - })?; + .map_err(Arc::new) + }, + )?; } { @@ -372,7 +377,7 @@ impl WordDocidsExtractors { tracing::trace_span!(target: "indexing::documents::extract", "merger_building"); let _entered = 
span.enter(); let mut builder = WordDocidsMergerBuilders::new(); - for (_rtxn, _tokenizer, _fields_ids_map, cache) in context_pool.into_items() { + for cache in caches.into_iter() { builder.add_sorters(cache)?; } diff --git a/milli/src/update/new/extract/searchable/mod.rs b/milli/src/update/new/extract/searchable/mod.rs index c79bd4766..b3fa646b9 100644 --- a/milli/src/update/new/extract/searchable/mod.rs +++ b/milli/src/update/new/extract/searchable/mod.rs @@ -3,6 +3,7 @@ mod extract_word_pair_proximity_docids; mod tokenize_document; use std::fs::File; +use std::sync::Arc; pub use extract_word_docids::{WordDocidsExtractors, WordDocidsMergers}; pub use extract_word_pair_proximity_docids::WordPairProximityDocidsExtractor; @@ -13,16 +14,20 @@ use tokenize_document::{tokenizer_builder, DocumentTokenizer}; use super::cache::CboCachedSorter; use super::DocidsExtractor; -use crate::update::new::{DocumentChange, ItemsPool}; +use crate::update::new::append_only_vec::AppendOnlyVec; +use crate::update::new::parallel_iterator_ext::ParallelIteratorExt; +use crate::update::new::DocumentChange; use crate::update::{create_sorter, GrenadParameters, MergeDeladdCboRoaringBitmaps}; -use crate::{GlobalFieldsIdsMap, Index, Result, MAX_POSITION_PER_ATTRIBUTE}; +use crate::{Error, GlobalFieldsIdsMap, Index, Result, MAX_POSITION_PER_ATTRIBUTE}; pub trait SearchableExtractor { fn run_extraction( index: &Index, fields_ids_map: &GlobalFieldsIdsMap, indexer: GrenadParameters, - document_changes: impl IntoParallelIterator>, + document_changes: impl IntoParallelIterator< + Item = std::result::Result>, + >, ) -> Result> { let max_memory = indexer.max_memory_by_thread(); @@ -53,43 +58,41 @@ pub trait SearchableExtractor { localized_attributes_rules: &localized_attributes_rules, max_positions_per_attributes: MAX_POSITION_PER_ATTRIBUTE, }; - - let context_pool = ItemsPool::new(|| { - Ok(( - index.read_txn()?, - &document_tokenizer, - fields_ids_map.clone(), - CboCachedSorter::new( - // TODO use a 
better value - 1_000_000.try_into().unwrap(), - create_sorter( - grenad::SortAlgorithm::Stable, - MergeDeladdCboRoaringBitmaps, - indexer.chunk_compression_type, - indexer.chunk_compression_level, - indexer.max_nb_chunks, - max_memory, - ), - ), - )) - }); + let caches = AppendOnlyVec::new(); { let span = tracing::trace_span!(target: "indexing::documents::extract", "docids_extraction"); let _entered = span.enter(); - document_changes.into_par_iter().try_for_each(|document_change| { - context_pool.with(|(rtxn, document_tokenizer, fields_ids_map, cached_sorter)| { + document_changes.into_par_iter().try_arc_for_each_try_init( + || { + let rtxn = index.read_txn().map_err(Error::from)?; + let cache = caches.push(CboCachedSorter::new( + // TODO use a better value + 1_000_000.try_into().unwrap(), + create_sorter( + grenad::SortAlgorithm::Stable, + MergeDeladdCboRoaringBitmaps, + indexer.chunk_compression_type, + indexer.chunk_compression_level, + indexer.max_nb_chunks, + max_memory, + ), + )); + Ok((rtxn, &document_tokenizer, fields_ids_map.clone(), cache)) + }, + |(rtxn, document_tokenizer, fields_ids_map, cached_sorter), document_change| { Self::extract_document_change( - &*rtxn, + rtxn, index, document_tokenizer, fields_ids_map, cached_sorter, document_change?, ) - }) - })?; + .map_err(Arc::new) + }, + )?; } { let mut builder = grenad::MergerBuilder::new(MergeDeladdCboRoaringBitmaps); @@ -97,14 +100,15 @@ pub trait SearchableExtractor { tracing::trace_span!(target: "indexing::documents::extract", "merger_building"); let _entered = span.enter(); - let readers: Vec<_> = context_pool - .into_items() + let readers: Vec<_> = caches + .into_iter() .par_bridge() - .map(|(_rtxn, _tokenizer, _fields_ids_map, cached_sorter)| { + .map(|cached_sorter| { let sorter = cached_sorter.into_sorter()?; sorter.into_reader_cursors() }) .collect(); + for reader in readers { builder.extend(reader?); } @@ -132,7 +136,9 @@ impl DocidsExtractor for T { index: &Index, fields_ids_map: 
&GlobalFieldsIdsMap, indexer: GrenadParameters, - document_changes: impl IntoParallelIterator>, + document_changes: impl IntoParallelIterator< + Item = std::result::Result>, + >, ) -> Result> { Self::run_extraction(index, fields_ids_map, indexer, document_changes) } diff --git a/milli/src/update/new/indexer/document_deletion.rs b/milli/src/update/new/indexer/document_deletion.rs index bad72d3b2..400b51af6 100644 --- a/milli/src/update/new/indexer/document_deletion.rs +++ b/milli/src/update/new/indexer/document_deletion.rs @@ -1,11 +1,12 @@ use std::sync::Arc; -use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator}; +use rayon::iter::{IndexedParallelIterator, IntoParallelIterator}; use roaring::RoaringBitmap; use super::DocumentChanges; -use crate::update::new::{Deletion, DocumentChange, ItemsPool}; -use crate::{FieldsIdsMap, Index, Result}; +use crate::update::new::parallel_iterator_ext::ParallelIteratorExt as _; +use crate::update::new::{Deletion, DocumentChange}; +use crate::{Error, FieldsIdsMap, Index, Result}; pub struct DocumentDeletion { pub to_delete: RoaringBitmap, @@ -28,15 +29,19 @@ impl<'p> DocumentChanges<'p> for DocumentDeletion { self, _fields_ids_map: &mut FieldsIdsMap, param: Self::Parameter, - ) -> Result> + Clone + 'p> { + ) -> Result< + impl IndexedParallelIterator>> + + Clone + + 'p, + > { let index = param; - let items = Arc::new(ItemsPool::new(|| index.read_txn().map_err(crate::Error::from))); let to_delete: Vec<_> = self.to_delete.into_iter().collect(); - Ok(to_delete.into_par_iter().map_with(items, |items, docid| { - items.with(|rtxn| { + Ok(to_delete.into_par_iter().try_map_try_init( + || index.read_txn().map_err(crate::Error::from), + |rtxn, docid| { let current = index.document(rtxn, docid)?; Ok(DocumentChange::Deletion(Deletion::create(docid, current.boxed()))) - }) - })) + }, + )) } } diff --git a/milli/src/update/new/indexer/document_operation.rs b/milli/src/update/new/indexer/document_operation.rs index 
572ea8528..f9e1bb8f3 100644 --- a/milli/src/update/new/indexer/document_operation.rs +++ b/milli/src/update/new/indexer/document_operation.rs @@ -5,14 +5,14 @@ use std::sync::Arc; use heed::types::Bytes; use heed::RoTxn; use memmap2::Mmap; -use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator}; +use rayon::iter::{IndexedParallelIterator, IntoParallelIterator}; use IndexDocumentsMethod as Idm; use super::super::document_change::DocumentChange; -use super::super::items_pool::ItemsPool; use super::super::{CowStr, TopLevelMap}; use super::DocumentChanges; use crate::documents::{DocumentIdExtractionError, PrimaryKey}; +use crate::update::new::parallel_iterator_ext::ParallelIteratorExt as _; use crate::update::new::{Deletion, Insertion, KvReaderFieldId, KvWriterFieldId, Update}; use crate::update::{AvailableIds, IndexDocumentsMethod}; use crate::{DocumentId, Error, FieldsIdsMap, Index, Result, UserError}; @@ -73,7 +73,11 @@ impl<'p, 'pl: 'p> DocumentChanges<'p> for DocumentOperation<'pl> { self, fields_ids_map: &mut FieldsIdsMap, param: Self::Parameter, - ) -> Result> + Clone + 'p> { + ) -> Result< + impl IndexedParallelIterator>> + + Clone + + 'p, + > { let (index, rtxn, primary_key) = param; let documents_ids = index.documents_ids(rtxn)?; @@ -199,24 +203,22 @@ impl<'p, 'pl: 'p> DocumentChanges<'p> for DocumentOperation<'pl> { // And finally sort them docids_version_offsets.sort_unstable_by_key(|(_, (_, docops))| sort_function_key(docops)); - Ok(docids_version_offsets.into_par_iter().map_with( - Arc::new(ItemsPool::new(|| index.read_txn().map_err(crate::Error::from))), - move |context_pool, (external_docid, (internal_docid, operations))| { - context_pool.with(|rtxn| { - let document_merge_function = match self.index_documents_method { - Idm::ReplaceDocuments => MergeDocumentForReplacement::merge, - Idm::UpdateDocuments => MergeDocumentForUpdates::merge, - }; + Ok(docids_version_offsets.into_par_iter().try_map_try_init( + || 
index.read_txn().map_err(Error::from), + move |rtxn, (external_docid, (internal_docid, operations))| { + let document_merge_function = match self.index_documents_method { + Idm::ReplaceDocuments => MergeDocumentForReplacement::merge, + Idm::UpdateDocuments => MergeDocumentForUpdates::merge, + }; - document_merge_function( - rtxn, - index, - &fields_ids_map, - internal_docid, - external_docid.to_string(), // TODO do not clone - &operations, - ) - }) + document_merge_function( + rtxn, + index, + &fields_ids_map, + internal_docid, + external_docid.to_string(), // TODO do not clone + &operations, + ) }, )) } diff --git a/milli/src/update/new/indexer/mod.rs b/milli/src/update/new/indexer/mod.rs index e30333b3a..28165c3a8 100644 --- a/milli/src/update/new/indexer/mod.rs +++ b/milli/src/update/new/indexer/mod.rs @@ -1,4 +1,4 @@ -use std::sync::RwLock; +use std::sync::{Arc, RwLock}; use std::thread::{self, Builder}; use big_s::S; @@ -22,8 +22,9 @@ use super::{StdResult, TopLevelMap}; use crate::documents::{PrimaryKey, DEFAULT_PRIMARY_KEY}; use crate::update::new::channel::ExtractorSender; use crate::update::settings::InnerIndexSettings; +use crate::update::new::parallel_iterator_ext::ParallelIteratorExt; use crate::update::GrenadParameters; -use crate::{FieldsIdsMap, GlobalFieldsIdsMap, Index, Result, UserError}; +use crate::{Error, FieldsIdsMap, GlobalFieldsIdsMap, Index, Result, UserError}; mod document_deletion; mod document_operation; @@ -37,7 +38,11 @@ pub trait DocumentChanges<'p> { self, fields_ids_map: &mut FieldsIdsMap, param: Self::Parameter, - ) -> Result> + Clone + 'p>; + ) -> Result< + impl IndexedParallelIterator>> + + Clone + + 'p, + >; } /// This is the main function of this crate. 
@@ -53,7 +58,9 @@ pub fn index( document_changes: PI, ) -> Result<()> where - PI: IndexedParallelIterator> + Send + Clone, + PI: IndexedParallelIterator>> + + Send + + Clone, { let (merger_sender, writer_receiver) = merger_writer_channel(10_000); // This channel acts as a rendezvous point to ensure that we are one task ahead @@ -74,7 +81,8 @@ where // document but we need to create a function that collects and compresses documents. let document_sender = extractor_sender.document_sender(); - document_changes.clone().into_par_iter().try_for_each(|result| { + document_changes.clone().into_par_iter().try_arc_for_each::<_, Error>( + |result| { match result? { DocumentChange::Deletion(deletion) => { let docid = deletion.docid(); @@ -92,7 +100,7 @@ where // extracted_dictionary_sender.send(self, dictionary: &[u8]); } } - Ok(()) as Result<_> + Ok(()) })?; document_sender.finish().unwrap(); @@ -242,7 +250,7 @@ fn extract_and_send_docids( index: &Index, fields_ids_map: &GlobalFieldsIdsMap, indexer: GrenadParameters, - document_changes: impl IntoParallelIterator>, + document_changes: impl IntoParallelIterator>>, sender: &ExtractorSender, ) -> Result<()> { let merger = E::run_extraction(index, fields_ids_map, indexer, document_changes)?; diff --git a/milli/src/update/new/indexer/partial_dump.rs b/milli/src/update/new/indexer/partial_dump.rs index 43a89c46c..325e13cc4 100644 --- a/milli/src/update/new/indexer/partial_dump.rs +++ b/milli/src/update/new/indexer/partial_dump.rs @@ -1,8 +1,11 @@ +use std::sync::Arc; + use rayon::iter::IndexedParallelIterator; use super::DocumentChanges; use crate::documents::{DocumentIdExtractionError, PrimaryKey}; use crate::update::concurrent_available_ids::ConcurrentAvailableIds; +use crate::update::new::parallel_iterator_ext::ParallelIteratorExt; use crate::update::new::{DocumentChange, Insertion, KvWriterFieldId}; use crate::{all_obkv_to_json, Error, FieldsIdsMap, Object, Result, UserError}; @@ -30,44 +33,53 @@ where self, _fields_ids_map: 
&mut FieldsIdsMap, param: Self::Parameter, - ) -> Result> + Clone + 'p> { + ) -> Result< + impl IndexedParallelIterator>> + + Clone + + 'p, + > { let (fields_ids_map, concurrent_available_ids, primary_key) = param; - Ok(self.iter.map(|object| { - let docid = match concurrent_available_ids.next() { - Some(id) => id, - None => return Err(Error::UserError(UserError::DocumentLimitReached)), - }; + Ok(self.iter.try_map_try_init( + || Ok(()), + |_, object| { + let docid = match concurrent_available_ids.next() { + Some(id) => id, + None => return Err(Error::UserError(UserError::DocumentLimitReached)), + }; - let mut writer = KvWriterFieldId::memory(); - object.iter().for_each(|(key, value)| { - let key = fields_ids_map.id(key).unwrap(); - /// TODO better error management - let value = serde_json::to_vec(&value).unwrap(); - /// TODO it is not ordered - writer.insert(key, value).unwrap(); - }); + let mut writer = KvWriterFieldId::memory(); + object.iter().for_each(|(key, value)| { + let key = fields_ids_map.id(key).unwrap(); + /// TODO better error management + let value = serde_json::to_vec(&value).unwrap(); + /// TODO it is not ordered + writer.insert(key, value).unwrap(); + }); - let document = writer.into_boxed(); - let external_docid = match primary_key.document_id(&document, fields_ids_map)? { - Ok(document_id) => Ok(document_id), - Err(DocumentIdExtractionError::InvalidDocumentId(user_error)) => Err(user_error), - Err(DocumentIdExtractionError::MissingDocumentId) => { - Err(UserError::MissingDocumentId { - primary_key: primary_key.name().to_string(), - document: all_obkv_to_json(&document, fields_ids_map)?, - }) - } - Err(DocumentIdExtractionError::TooManyDocumentIds(_)) => { - Err(UserError::TooManyDocumentIds { - primary_key: primary_key.name().to_string(), - document: all_obkv_to_json(&document, fields_ids_map)?, - }) - } - }?; + let document = writer.into_boxed(); + let external_docid = match primary_key.document_id(&document, fields_ids_map)? 
{ + Ok(document_id) => Ok(document_id), + Err(DocumentIdExtractionError::InvalidDocumentId(user_error)) => { + Err(user_error) + } + Err(DocumentIdExtractionError::MissingDocumentId) => { + Err(UserError::MissingDocumentId { + primary_key: primary_key.name().to_string(), + document: all_obkv_to_json(&document, fields_ids_map)?, + }) + } + Err(DocumentIdExtractionError::TooManyDocumentIds(_)) => { + Err(UserError::TooManyDocumentIds { + primary_key: primary_key.name().to_string(), + document: all_obkv_to_json(&document, fields_ids_map)?, + }) + } + }?; - let insertion = Insertion::create(docid, document); - Ok(DocumentChange::Insertion(insertion)) - })) + let insertion = Insertion::create(docid, document); + Ok(DocumentChange::Insertion(insertion)) + }, + )) } } diff --git a/milli/src/update/new/indexer/update_by_function.rs b/milli/src/update/new/indexer/update_by_function.rs index d4c0f837b..d6d532433 100644 --- a/milli/src/update/new/indexer/update_by_function.rs +++ b/milli/src/update/new/indexer/update_by_function.rs @@ -1,8 +1,10 @@ +use std::sync::Arc; + use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator}; use super::DocumentChanges; use crate::update::new::DocumentChange; -use crate::{FieldsIdsMap, Result}; +use crate::{Error, FieldsIdsMap, Result}; pub struct UpdateByFunction; @@ -13,7 +15,11 @@ impl<'p> DocumentChanges<'p> for UpdateByFunction { self, _fields_ids_map: &mut FieldsIdsMap, _param: Self::Parameter, - ) -> Result> + Clone + 'p> { + ) -> Result< + impl IndexedParallelIterator>> + + Clone + + 'p, + > { Ok((0..100).into_par_iter().map(|_| todo!())) } } diff --git a/milli/src/update/new/items_pool.rs b/milli/src/update/new/items_pool.rs deleted file mode 100644 index e90ce97db..000000000 --- a/milli/src/update/new/items_pool.rs +++ /dev/null @@ -1,54 +0,0 @@ -use crossbeam_channel::{Receiver, Sender, TryRecvError}; - -/// A pool of items that can be pull and generated on demand. 
-pub struct ItemsPool -where - F: Fn() -> Result, -{ - init: F, - sender: Sender, - receiver: Receiver, -} - -impl ItemsPool -where - F: Fn() -> Result, -{ - /// Create a new unbounded items pool with the specified function - /// to generate items when needed. - /// - /// The `init` function will be invoked whenever a call to `with` requires new items. - pub fn new(init: F) -> Self { - let (sender, receiver) = crossbeam_channel::unbounded(); - ItemsPool { init, sender, receiver } - } - - /// Consumes the pool to retrieve all remaining items. - /// - /// This method is useful for cleaning up and managing the items once they are no longer needed. - pub fn into_items(self) -> crossbeam_channel::IntoIter { - self.receiver.into_iter() - } - - /// Allows running a function on an item from the pool, - /// potentially generating a new item if the pool is empty. - pub fn with(&self, f: G) -> Result - where - G: FnOnce(&mut T) -> Result, - { - let mut item = match self.receiver.try_recv() { - Ok(item) => item, - Err(TryRecvError::Empty) => (self.init)()?, - Err(TryRecvError::Disconnected) => unreachable!(), - }; - - // Run the user's closure with the retrieved item - let result = f(&mut item); - - if let Err(e) = self.sender.send(item) { - unreachable!("error when sending into channel {e}"); - } - - result - } -} diff --git a/milli/src/update/new/mod.rs b/milli/src/update/new/mod.rs index 98b60378f..264241caa 100644 --- a/milli/src/update/new/mod.rs +++ b/milli/src/update/new/mod.rs @@ -1,16 +1,16 @@ pub use document_change::{Deletion, DocumentChange, Insertion, Update}; -pub use items_pool::ItemsPool; pub use top_level_map::{CowStr, TopLevelMap}; use super::del_add::DelAdd; use crate::FieldId; +mod append_only_vec; mod channel; mod document_change; mod extract; pub mod indexer; -mod items_pool; mod merger; +mod parallel_iterator_ext; mod top_level_map; mod word_fst_builder; mod words_prefix_docids; diff --git a/milli/src/update/new/parallel_iterator_ext.rs 
b/milli/src/update/new/parallel_iterator_ext.rs
new file mode 100644
index 000000000..043457cfd
--- /dev/null
+++ b/milli/src/update/new/parallel_iterator_ext.rs
@@ -0,0 +1,78 @@
+use std::sync::Arc;
+
+use rayon::iter::{MapInit, ParallelIterator};
+
+/// Extension methods for rayon's [`ParallelIterator`] that handle fallible
+/// initialization and share errors between threads through an [`Arc`].
+pub trait ParallelIteratorExt: ParallelIterator {
+    /// Maps items based on the init function.
+    ///
+    /// The init function is run only as necessary, which is basically once per thread.
+    /// When `init` fails, its error is wrapped in an [`Arc`] and a clone of it is
+    /// yielded for every item mapped with that failed state; `map_op` is not called.
+    /// Errors returned by `map_op` itself are wrapped in a fresh [`Arc`].
+    fn try_map_try_init<F, INIT, T, E, R>(
+        self,
+        init: INIT,
+        map_op: F,
+    ) -> MapInit<
+        Self,
+        impl Fn() -> Result<T, Arc<E>> + Sync + Send + Clone,
+        impl Fn(&mut Result<T, Arc<E>>, Self::Item) -> Result<R, Arc<E>> + Sync + Send + Clone,
+    >
+    where
+        E: Send + Sync,
+        F: Fn(&mut T, Self::Item) -> Result<R, E> + Sync + Send + Clone,
+        INIT: Fn() -> Result<T, E> + Sync + Send + Clone,
+        R: Send,
+    {
+        self.map_init(
+            move || init().map_err(Arc::new),
+            move |state, item| match state {
+                Ok(t) => map_op(t, item).map_err(Arc::new),
+                Err(err) => Err(err.clone()),
+            },
+        )
+    }
+
+    /// Runs a closure on all the items and returns an owned error.
+    ///
+    /// The init function is run only as necessary, which is basically once per thread.
+    /// When `init` fails, `op` is not called for that state and the init error is reported instead.
+    ///
+    /// # Panics
+    ///
+    /// Panics if another `Arc` still points to the returned error.
+    fn try_arc_for_each_try_init<F, INIT, T, E>(self, init: INIT, op: F) -> Result<(), E>
+    where
+        E: Send + Sync,
+        F: Fn(&mut T, Self::Item) -> Result<(), Arc<E>> + Sync + Send + Clone,
+        INIT: Fn() -> Result<T, E> + Sync + Send + Clone,
+    {
+        self.try_for_each_init(
+            move || init().map_err(Arc::new),
+            move |state, item| match state {
+                Ok(t) => op(t, item),
+                Err(err) => Err(err.clone()),
+            },
+        )
+        .map_err(|err| Arc::into_inner(err).expect("the error must be only owned by us"))
+    }
+
+    /// Runs a closure on all the items and returns an owned error.
+    ///
+    /// # Panics
+    ///
+    /// Panics if another `Arc` still points to the returned error.
+    fn try_arc_for_each<F, E>(self, op: F) -> Result<(), E>
+    where
+        E: Send + Sync,
+        F: Fn(Self::Item) -> Result<(), Arc<E>> + Sync + Send + Clone,
+    {
+        self.try_for_each(op)
+            .map_err(|err| Arc::into_inner(err).expect("the error must be only owned by us"))
+    }
+}
+
+/// Makes the extension methods available on every [`ParallelIterator`].
+impl<T: ParallelIterator> ParallelIteratorExt for T {}