Merge branch 'fix-append-only-vec' into indexer-edition-2024

This commit is contained in:
Clément Renault 2024-10-08 10:32:45 +02:00
commit 2230674c0a
No known key found for this signature in database
GPG Key ID: F250A4C4E3AE5F5F
7 changed files with 75 additions and 211 deletions

5
Cargo.lock generated
View File

@ -3598,6 +3598,7 @@ dependencies = [
"smartstring", "smartstring",
"tempfile", "tempfile",
"thiserror", "thiserror",
"thread_local",
"tiktoken-rs", "tiktoken-rs",
"time", "time",
"tokenizers", "tokenizers",
@ -5332,9 +5333,9 @@ dependencies = [
[[package]] [[package]]
name = "thread_local" name = "thread_local"
version = "1.1.7" version = "1.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3fdd6f064ccff2d6567adcb3873ca630700f00b5ad3f060c25b5dcfd9a4ce152" checksum = "8b9ef9bad013ada3808854ceac7b46812a6465ba368859a37e2100283d2d719c"
dependencies = [ dependencies = [
"cfg-if", "cfg-if",
"once_cell", "once_cell",

View File

@ -89,6 +89,7 @@ ureq = { version = "2.10.0", features = ["json"] }
url = "2.5.2" url = "2.5.2"
rayon-par-bridge = "0.1.0" rayon-par-bridge = "0.1.0"
hashbrown = "0.14.5" hashbrown = "0.14.5"
thread_local = "1.1.8"
[dev-dependencies] [dev-dependencies]
mimalloc = { version = "0.1.43", default-features = false } mimalloc = { version = "0.1.43", default-features = false }

View File

@ -1,158 +0,0 @@
use std::sync::atomic::AtomicPtr;
use std::{fmt, mem};
/// An append-only linked-list that returns a mutable references to the pushed items.
pub struct AppendOnlyLinkedList<T> {
head: AtomicPtr<Node<T>>,
}
struct Node<T> {
item: T,
next: AtomicPtr<Node<T>>,
}
impl<T> AppendOnlyLinkedList<T> {
/// Creates an empty list.
pub fn new() -> AppendOnlyLinkedList<T> {
AppendOnlyLinkedList { head: AtomicPtr::default() }
}
/// Pushes the item at the front of the linked-list and returns a unique and mutable reference to it.
#[allow(clippy::mut_from_ref)] // the mut ref is derived from T and unique each time
pub fn push(&self, item: T) -> &mut T {
use std::sync::atomic::Ordering::{Relaxed, SeqCst};
let node = Box::leak(Box::new(Node { item, next: AtomicPtr::default() }));
let mut head = self.head.load(SeqCst);
loop {
std::hint::spin_loop();
node.next = AtomicPtr::new(head);
match self.head.compare_exchange_weak(head, node, SeqCst, Relaxed) {
Ok(_) => break,
Err(new) => head = new,
}
}
&mut node.item
}
}
impl<T> Default for AppendOnlyLinkedList<T> {
fn default() -> Self {
Self::new()
}
}
impl<T> Drop for AppendOnlyLinkedList<T> {
fn drop(&mut self) {
// Let's use the drop implementation of the IntoIter struct
IntoIter(mem::take(&mut self.head));
}
}
impl<T> fmt::Debug for AppendOnlyLinkedList<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("AppendOnlyLinkedList").finish()
}
}
impl<T> IntoIterator for AppendOnlyLinkedList<T> {
type Item = T;
type IntoIter = IntoIter<T>;
fn into_iter(mut self) -> Self::IntoIter {
IntoIter(mem::take(&mut self.head))
}
}
pub struct IntoIter<T>(AtomicPtr<Node<T>>);
impl<T> Iterator for IntoIter<T> {
type Item = T;
fn next(&mut self) -> Option<Self::Item> {
let ptr = *self.0.get_mut();
if ptr.is_null() {
None
} else {
let node = unsafe { Box::from_raw(ptr) };
// Let's set the next node to read to be the next of this one
self.0 = node.next;
Some(node.item)
}
}
}
impl<T> Drop for IntoIter<T> {
fn drop(&mut self) {
let mut ptr = *self.0.get_mut();
while !ptr.is_null() {
let mut node = unsafe { Box::from_raw(ptr) };
// Let's set the next node to read to be the next of this one
ptr = *node.next.get_mut();
}
}
}
#[test]
fn test_parallel_pushing() {
use std::sync::Arc;
let v = Arc::new(AppendOnlyLinkedList::<u64>::new());
let mut threads = Vec::new();
const N: u64 = 100;
for thread_num in 0..N {
let v = v.clone();
threads.push(std::thread::spawn(move || {
let which1 = v.push(thread_num);
let which2 = v.push(thread_num);
assert_eq!(*which1, thread_num);
assert_eq!(*which2, thread_num);
}));
}
for t in threads {
t.join().unwrap();
}
let v = Arc::into_inner(v).unwrap().into_iter().collect::<Vec<_>>();
for thread_num in (0..N).rev() {
assert_eq!(2, v.iter().copied().filter(|&x| x == thread_num).count());
}
}
#[test]
fn test_into_vec() {
struct SafeToDrop(bool);
impl Drop for SafeToDrop {
fn drop(&mut self) {
assert!(self.0);
}
}
let v = AppendOnlyLinkedList::new();
for _ in 0..50 {
v.push(SafeToDrop(false));
}
let mut v = v.into_iter().collect::<Vec<_>>();
assert_eq!(v.len(), 50);
for i in v.iter_mut() {
i.0 = true;
}
}
#[test]
fn test_push_then_index_mut() {
let v = AppendOnlyLinkedList::<usize>::new();
let mut w = Vec::new();
for i in 0..1024 {
*v.push(i) += 1;
w.push(i + 1);
}
let mut v = v.into_iter().collect::<Vec<_>>();
v.reverse();
assert_eq!(v, w);
}

View File

@ -1,3 +1,4 @@
use std::cell::RefCell;
use std::collections::HashSet; use std::collections::HashSet;
use std::fmt::Debug; use std::fmt::Debug;
use std::fs::File; use std::fs::File;
@ -7,12 +8,12 @@ use grenad::{MergeFunction, Merger};
use heed::RoTxn; use heed::RoTxn;
use rayon::iter::{IntoParallelIterator, ParallelBridge, ParallelIterator}; use rayon::iter::{IntoParallelIterator, ParallelBridge, ParallelIterator};
use serde_json::Value; use serde_json::Value;
use thread_local::ThreadLocal;
use super::super::cache::CboCachedSorter; use super::super::cache::CboCachedSorter;
use super::facet_document::extract_document_facets; use super::facet_document::extract_document_facets;
use super::FacetKind; use super::FacetKind;
use crate::facet::value_encoding::f64_into_bytes; use crate::facet::value_encoding::f64_into_bytes;
use crate::update::new::append_only_linked_list::AppendOnlyLinkedList;
use crate::update::new::extract::DocidsExtractor; use crate::update::new::extract::DocidsExtractor;
use crate::update::new::parallel_iterator_ext::ParallelIteratorExt; use crate::update::new::parallel_iterator_ext::ParallelIteratorExt;
use crate::update::new::DocumentChange; use crate::update::new::DocumentChange;
@ -210,7 +211,7 @@ impl DocidsExtractor for FacetedDocidsExtractor {
let attributes_to_extract = Self::attributes_to_extract(&rtxn, index)?; let attributes_to_extract = Self::attributes_to_extract(&rtxn, index)?;
let attributes_to_extract: Vec<_> = let attributes_to_extract: Vec<_> =
attributes_to_extract.iter().map(|s| s.as_ref()).collect(); attributes_to_extract.iter().map(|s| s.as_ref()).collect();
let caches = AppendOnlyLinkedList::new(); let thread_local = ThreadLocal::with_capacity(rayon::current_num_threads());
{ {
let span = let span =
@ -218,9 +219,10 @@ impl DocidsExtractor for FacetedDocidsExtractor {
let _entered = span.enter(); let _entered = span.enter();
document_changes.into_par_iter().try_arc_for_each_try_init( document_changes.into_par_iter().try_arc_for_each_try_init(
|| { || {
thread_local.get_or_try(|| {
let rtxn = index.read_txn().map_err(Error::from)?; let rtxn = index.read_txn().map_err(Error::from)?;
let cache = caches.push(CboCachedSorter::new( let cache = CboCachedSorter::new(
// TODO use a better value /// TODO use a better value
100.try_into().unwrap(), 100.try_into().unwrap(),
create_sorter( create_sorter(
grenad::SortAlgorithm::Stable, grenad::SortAlgorithm::Stable,
@ -230,10 +232,12 @@ impl DocidsExtractor for FacetedDocidsExtractor {
indexer.max_nb_chunks, indexer.max_nb_chunks,
max_memory, max_memory,
), ),
)); );
Ok((rtxn, fields_ids_map.clone(), Vec::new(), cache)) Ok((rtxn, RefCell::new((fields_ids_map.clone(), Vec::new(), cache))))
})
}, },
|(rtxn, fields_ids_map, buffer, cached_sorter), document_change| { |(rtxn, rc), document_change| {
let (fields_ids_map, buffer, cached_sorter) = &mut *rc.borrow_mut();
Self::extract_document_change( Self::extract_document_change(
rtxn, rtxn,
index, index,
@ -253,10 +257,11 @@ impl DocidsExtractor for FacetedDocidsExtractor {
tracing::trace_span!(target: "indexing::documents::extract", "merger_building"); tracing::trace_span!(target: "indexing::documents::extract", "merger_building");
let _entered = span.enter(); let _entered = span.enter();
let readers: Vec<_> = caches let readers: Vec<_> = thread_local
.into_iter() .into_iter()
.par_bridge() .par_bridge()
.map(|cached_sorter| { .map(|(_, rc)| {
let (_, _, cached_sorter) = rc.into_inner();
let sorter = cached_sorter.into_sorter()?; let sorter = cached_sorter.into_sorter()?;
sorter.into_reader_cursors() sorter.into_reader_cursors()
}) })

View File

@ -1,3 +1,4 @@
use std::cell::RefCell;
use std::collections::HashMap; use std::collections::HashMap;
use std::fs::File; use std::fs::File;
use std::num::NonZero; use std::num::NonZero;
@ -6,10 +7,10 @@ use std::sync::Arc;
use grenad::{Merger, MergerBuilder}; use grenad::{Merger, MergerBuilder};
use heed::RoTxn; use heed::RoTxn;
use rayon::iter::IntoParallelIterator; use rayon::iter::IntoParallelIterator;
use thread_local::ThreadLocal;
use super::tokenize_document::{tokenizer_builder, DocumentTokenizer}; use super::tokenize_document::{tokenizer_builder, DocumentTokenizer};
use super::SearchableExtractor; use super::SearchableExtractor;
use crate::update::new::append_only_linked_list::AppendOnlyLinkedList;
use crate::update::new::extract::cache::CboCachedSorter; use crate::update::new::extract::cache::CboCachedSorter;
use crate::update::new::extract::perm_json_p::contained_in; use crate::update::new::extract::perm_json_p::contained_in;
use crate::update::new::parallel_iterator_ext::ParallelIteratorExt; use crate::update::new::parallel_iterator_ext::ParallelIteratorExt;
@ -341,7 +342,7 @@ impl WordDocidsExtractors {
max_positions_per_attributes: MAX_POSITION_PER_ATTRIBUTE, max_positions_per_attributes: MAX_POSITION_PER_ATTRIBUTE,
}; };
let caches = AppendOnlyLinkedList::new(); let thread_local = ThreadLocal::with_capacity(rayon::current_num_threads());
{ {
let span = let span =
@ -349,16 +350,20 @@ impl WordDocidsExtractors {
let _entered = span.enter(); let _entered = span.enter();
document_changes.into_par_iter().try_arc_for_each_try_init( document_changes.into_par_iter().try_arc_for_each_try_init(
|| { || {
thread_local.get_or_try(|| {
let rtxn = index.read_txn().map_err(Error::from)?; let rtxn = index.read_txn().map_err(Error::from)?;
let cache = caches.push(WordDocidsCachedSorters::new( let fields_ids_map = fields_ids_map.clone();
let cache = WordDocidsCachedSorters::new(
indexer, indexer,
max_memory, max_memory,
// TODO use a better value // TODO use a better value
200_000.try_into().unwrap(), 200_000.try_into().unwrap(),
)); );
Ok((rtxn, &document_tokenizer, fields_ids_map.clone(), cache)) Ok((rtxn, &document_tokenizer, RefCell::new((fields_ids_map, cache))))
})
}, },
|(rtxn, document_tokenizer, fields_ids_map, cached_sorter), document_change| { |(rtxn, document_tokenizer, rc), document_change| {
let (fields_ids_map, cached_sorter) = &mut *rc.borrow_mut();
Self::extract_document_change( Self::extract_document_change(
rtxn, rtxn,
index, index,
@ -377,7 +382,8 @@ impl WordDocidsExtractors {
tracing::trace_span!(target: "indexing::documents::extract", "merger_building"); tracing::trace_span!(target: "indexing::documents::extract", "merger_building");
let _entered = span.enter(); let _entered = span.enter();
let mut builder = WordDocidsMergerBuilders::new(); let mut builder = WordDocidsMergerBuilders::new();
for cache in caches.into_iter() { for (_, _, rc) in thread_local.into_iter() {
let (_, cache) = rc.into_inner();
builder.add_sorters(cache)?; builder.add_sorters(cache)?;
} }

View File

@ -2,6 +2,7 @@ mod extract_word_docids;
mod extract_word_pair_proximity_docids; mod extract_word_pair_proximity_docids;
mod tokenize_document; mod tokenize_document;
use std::cell::RefCell;
use std::fs::File; use std::fs::File;
use std::sync::Arc; use std::sync::Arc;
@ -10,11 +11,11 @@ pub use extract_word_pair_proximity_docids::WordPairProximityDocidsExtractor;
use grenad::Merger; use grenad::Merger;
use heed::RoTxn; use heed::RoTxn;
use rayon::iter::{IntoParallelIterator, ParallelBridge, ParallelIterator}; use rayon::iter::{IntoParallelIterator, ParallelBridge, ParallelIterator};
use thread_local::ThreadLocal;
use tokenize_document::{tokenizer_builder, DocumentTokenizer}; use tokenize_document::{tokenizer_builder, DocumentTokenizer};
use super::cache::CboCachedSorter; use super::cache::CboCachedSorter;
use super::DocidsExtractor; use super::DocidsExtractor;
use crate::update::new::append_only_linked_list::AppendOnlyLinkedList;
use crate::update::new::parallel_iterator_ext::ParallelIteratorExt; use crate::update::new::parallel_iterator_ext::ParallelIteratorExt;
use crate::update::new::DocumentChange; use crate::update::new::DocumentChange;
use crate::update::{create_sorter, GrenadParameters, MergeDeladdCboRoaringBitmaps}; use crate::update::{create_sorter, GrenadParameters, MergeDeladdCboRoaringBitmaps};
@ -58,7 +59,8 @@ pub trait SearchableExtractor {
localized_attributes_rules: &localized_attributes_rules, localized_attributes_rules: &localized_attributes_rules,
max_positions_per_attributes: MAX_POSITION_PER_ATTRIBUTE, max_positions_per_attributes: MAX_POSITION_PER_ATTRIBUTE,
}; };
let caches = AppendOnlyLinkedList::new();
let thread_local = ThreadLocal::with_capacity(rayon::current_num_threads());
{ {
let span = let span =
@ -66,9 +68,10 @@ pub trait SearchableExtractor {
let _entered = span.enter(); let _entered = span.enter();
document_changes.into_par_iter().try_arc_for_each_try_init( document_changes.into_par_iter().try_arc_for_each_try_init(
|| { || {
thread_local.get_or_try(|| {
let rtxn = index.read_txn().map_err(Error::from)?; let rtxn = index.read_txn().map_err(Error::from)?;
let cache = caches.push(CboCachedSorter::new( let cache = CboCachedSorter::new(
// TODO use a better value /// TODO use a better value
1_000_000.try_into().unwrap(), 1_000_000.try_into().unwrap(),
create_sorter( create_sorter(
grenad::SortAlgorithm::Stable, grenad::SortAlgorithm::Stable,
@ -78,10 +81,16 @@ pub trait SearchableExtractor {
indexer.max_nb_chunks, indexer.max_nb_chunks,
max_memory, max_memory,
), ),
)); );
Ok((rtxn, &document_tokenizer, fields_ids_map.clone(), cache)) Ok((
rtxn,
&document_tokenizer,
RefCell::new((fields_ids_map.clone(), cache)),
))
})
}, },
|(rtxn, document_tokenizer, fields_ids_map, cached_sorter), document_change| { |(rtxn, document_tokenizer, rc), document_change| {
let (fields_ids_map, cached_sorter) = &mut *rc.borrow_mut();
Self::extract_document_change( Self::extract_document_change(
rtxn, rtxn,
index, index,
@ -100,10 +109,11 @@ pub trait SearchableExtractor {
tracing::trace_span!(target: "indexing::documents::extract", "merger_building"); tracing::trace_span!(target: "indexing::documents::extract", "merger_building");
let _entered = span.enter(); let _entered = span.enter();
let readers: Vec<_> = caches let readers: Vec<_> = thread_local
.into_iter() .into_iter()
.par_bridge() .par_bridge()
.map(|cached_sorter| { .map(|(_, _, rc)| {
let (_, cached_sorter) = rc.into_inner();
let sorter = cached_sorter.into_sorter()?; let sorter = cached_sorter.into_sorter()?;
sorter.into_reader_cursors() sorter.into_reader_cursors()
}) })

View File

@ -4,7 +4,6 @@ pub use top_level_map::{CowStr, TopLevelMap};
use super::del_add::DelAdd; use super::del_add::DelAdd;
use crate::FieldId; use crate::FieldId;
mod append_only_linked_list;
mod channel; mod channel;
mod document_change; mod document_change;
mod extract; mod extract;