Add borrow_mut_or_yield extension method

This commit is contained in:
Louis Dureuil 2024-10-16 17:36:41 +02:00
parent c75de1f391
commit 0647f75e6b
No known key found for this signature in database
7 changed files with 61 additions and 17 deletions

View File

@ -17,7 +17,7 @@ use crate::facet::value_encoding::f64_into_bytes;
use crate::update::new::extract::DocidsExtractor;
use crate::update::new::indexer::document_changes::{
for_each_document_change, DocumentChangeContext, DocumentChanges, Extractor, FullySend,
IndexingContext, ThreadLocal,
IndexingContext, RefCellExt, ThreadLocal,
};
use crate::update::new::DocumentChange;
use crate::update::{create_sorter, GrenadParameters, MergeDeladdCboRoaringBitmaps};
@ -71,8 +71,8 @@ impl FacetedDocidsExtractor {
) -> Result<()> {
let index = &context.index;
let rtxn = &context.txn;
let mut new_fields_ids_map = context.new_fields_ids_map.borrow_mut();
let mut cached_sorter = context.data.0.borrow_mut();
let mut new_fields_ids_map = context.new_fields_ids_map.borrow_mut_or_yield();
let mut cached_sorter = context.data.0.borrow_mut_or_yield();
match document_change {
DocumentChange::Deletion(inner) => extract_document_facets(
attributes_to_extract,

View File

@ -13,7 +13,7 @@ use crate::update::new::extract::cache::CboCachedSorter;
use crate::update::new::extract::perm_json_p::contained_in;
use crate::update::new::indexer::document_changes::{
for_each_document_change, DocumentChangeContext, DocumentChanges, Extractor, FullySend,
IndexingContext, ThreadLocal,
IndexingContext, RefCellExt, ThreadLocal,
};
use crate::update::new::DocumentChange;
use crate::update::{create_sorter, GrenadParameters, MergeDeladdCboRoaringBitmaps};
@ -411,9 +411,9 @@ impl WordDocidsExtractors {
) -> Result<()> {
let index = &context.index;
let rtxn = &context.txn;
let mut cached_sorter = context.data.0.borrow_mut();
let mut cached_sorter = context.data.0.borrow_mut_or_yield();
let cached_sorter = cached_sorter.deref_mut();
let mut new_fields_ids_map = context.new_fields_ids_map.borrow_mut();
let mut new_fields_ids_map = context.new_fields_ids_map.borrow_mut_or_yield();
let new_fields_ids_map = new_fields_ids_map.deref_mut();
let exact_attributes = index.exact_attributes(rtxn)?;

View File

@ -9,7 +9,7 @@ use super::SearchableExtractor;
use crate::proximity::{index_proximity, MAX_DISTANCE};
use crate::update::new::document::Document;
use crate::update::new::extract::cache::CboCachedSorter;
use crate::update::new::indexer::document_changes::{DocumentChangeContext, FullySend};
use crate::update::new::indexer::document_changes::{DocumentChangeContext, FullySend, RefCellExt};
use crate::update::new::DocumentChange;
use crate::update::MergeDeladdCboRoaringBitmaps;
use crate::{FieldId, GlobalFieldsIdsMap, Index, Result};
@ -45,10 +45,10 @@ impl SearchableExtractor for WordPairProximityDocidsExtractor {
let mut del_word_pair_proximity = bumpalo::collections::Vec::new_in(doc_alloc);
let mut add_word_pair_proximity = bumpalo::collections::Vec::new_in(doc_alloc);
let mut new_fields_ids_map = context.new_fields_ids_map.borrow_mut();
let mut new_fields_ids_map = context.new_fields_ids_map.borrow_mut_or_yield();
let new_fields_ids_map = &mut *new_fields_ids_map;
let mut cached_sorter = context.data.0.borrow_mut();
let mut cached_sorter = context.data.0.borrow_mut_or_yield();
let cached_sorter = &mut *cached_sorter;
// is a vecdequeue, and will be smol, so can stay on the heap for now

View File

@ -1,4 +1,4 @@
use std::cell::{Cell, RefCell};
use std::cell::{Cell, Ref, RefCell, RefMut};
use std::sync::{Arc, RwLock};
use bumpalo::Bump;
@ -10,6 +10,49 @@ use super::super::document_change::DocumentChange;
use crate::update::new::parallel_iterator_ext::ParallelIteratorExt as _;
use crate::{FieldsIdsMap, GlobalFieldsIdsMap, Index, Result};
/// Borrowing helpers for [`RefCell`]-like cells that may yield before giving up on a
/// conflicting borrow.
///
/// The implementation for [`RefCell`] in this module retries the borrow for as long as
/// `rayon::yield_local` reports that it executed another task, and reports the borrow
/// error otherwise.
pub trait RefCellExt<T: ?Sized> {
    /// Tries to immutably borrow the wrapped value, yielding on conflict.
    ///
    /// # Errors
    ///
    /// Returns the [`std::cell::BorrowError`] of the failed attempt when the value is
    /// still mutably borrowed and the implementation stops retrying.
    fn try_borrow_or_yield(&self) -> std::result::Result<Ref<'_, T>, std::cell::BorrowError>;

    /// Tries to mutably borrow the wrapped value, yielding on conflict.
    ///
    /// # Errors
    ///
    /// Returns the [`std::cell::BorrowMutError`] of the failed attempt when the value is
    /// still borrowed and the implementation stops retrying.
    fn try_borrow_mut_or_yield(
        &self,
    ) -> std::result::Result<RefMut<'_, T>, std::cell::BorrowMutError>;

    /// Immutably borrows the wrapped value, yielding on conflict.
    ///
    /// # Panics
    ///
    /// Panics if [`RefCellExt::try_borrow_or_yield`] returns an error. The panic is
    /// attributed to the caller, like the one raised by [`RefCell::borrow`].
    #[track_caller]
    fn borrow_or_yield(&self) -> Ref<'_, T> {
        self.try_borrow_or_yield()
            .expect("value is already mutably borrowed and yielding did not release it")
    }

    /// Mutably borrows the wrapped value, yielding on conflict.
    ///
    /// # Panics
    ///
    /// Panics if [`RefCellExt::try_borrow_mut_or_yield`] returns an error. The panic is
    /// attributed to the caller, like the one raised by [`RefCell::borrow_mut`].
    #[track_caller]
    fn borrow_mut_or_yield(&self) -> RefMut<'_, T> {
        self.try_borrow_mut_or_yield()
            .expect("value is already borrowed and yielding did not release it")
    }
}
impl<T: ?Sized> RefCellExt<T> for RefCell<T> {
    /// Retries `try_borrow` for as long as `rayon::yield_local` reports that it executed
    /// a task; any other yield outcome surfaces the error of the last failed attempt.
    fn try_borrow_or_yield(&self) -> std::result::Result<Ref<'_, T>, std::cell::BorrowError> {
        loop {
            match self.try_borrow() {
                Ok(guard) => return Ok(guard),
                Err(conflict) => {
                    // Only retry when the yield actually ran some other work.
                    if !matches!(rayon::yield_local(), Some(rayon::Yield::Executed)) {
                        return Err(conflict);
                    }
                }
            }
        }
    }

    /// Retries `try_borrow_mut` for as long as `rayon::yield_local` reports that it
    /// executed a task; any other yield outcome surfaces the error of the last failed
    /// attempt.
    fn try_borrow_mut_or_yield(
        &self,
    ) -> std::result::Result<RefMut<'_, T>, std::cell::BorrowMutError> {
        loop {
            match self.try_borrow_mut() {
                Ok(guard) => return Ok(guard),
                Err(conflict) => {
                    // Only retry when the yield actually ran some other work.
                    if !matches!(rayon::yield_local(), Some(rayon::Yield::Executed)) {
                        return Err(conflict);
                    }
                }
            }
        }
    }
}
/// A trait for types that are **not** [`Send`] only because they would then allow concurrent access to a type that is not [`Sync`].
///
/// The primary example of such a type is `&T`, with `T: !Sync`.
@ -245,7 +288,7 @@ impl<
let fields_ids_map = &fields_ids_map.0;
let extractor_alloc = extractor_allocs.get_or_default();
let extractor_alloc = RefBump::new(extractor_alloc.0.borrow());
let extractor_alloc = RefBump::new(extractor_alloc.0.borrow_or_yield());
let data = datastore.get_or_try(|| init_data(RefBump::clone(&extractor_alloc)))?;

View File

@ -5,7 +5,8 @@ use std::thread::{self, Builder};
use big_s::S;
use bumpalo::Bump;
use document_changes::{
for_each_document_change, DocumentChanges, Extractor, FullySend, IndexingContext, ThreadLocal,
for_each_document_change, DocumentChanges, Extractor, FullySend, IndexingContext, RefCellExt,
ThreadLocal,
};
pub use document_deletion::DocumentDeletion;
pub use document_operation::DocumentOperation;
@ -62,7 +63,7 @@ impl<'a, 'extractor> Extractor<'extractor> for DocumentExtractor<'a> {
) -> Result<()> {
let mut document_buffer = Vec::new();
let new_fields_ids_map = context.new_fields_ids_map.borrow();
let new_fields_ids_map = context.new_fields_ids_map.borrow_or_yield();
let new_fields_ids_map = &*new_fields_ids_map;
let new_fields_ids_map = new_fields_ids_map.local_map();

View File

@ -5,7 +5,7 @@ use serde::Deserializer;
use serde_json::value::RawValue;
use super::de::FieldAndDocidExtractor;
use super::document_changes::{DocumentChangeContext, DocumentChanges, MostlySend};
use super::document_changes::{DocumentChangeContext, DocumentChanges, MostlySend, RefCellExt};
use crate::documents::{DocumentIdExtractionError, PrimaryKey};
use crate::update::concurrent_available_ids::ConcurrentAvailableIds;
use crate::update::new::document::DocumentFromVersions;
@ -63,7 +63,7 @@ where
None => return Err(Error::UserError(UserError::DocumentLimitReached)),
};
let mut fields_ids_map = context.new_fields_ids_map.borrow_mut();
let mut fields_ids_map = context.new_fields_ids_map.borrow_mut_or_yield();
let fields_ids_map = fields_ids_map.deref_mut();
let document = doc_alloc.alloc_str(document.get());

View File

@ -5,7 +5,7 @@ use rayon::iter::{IndexedParallelIterator, IntoParallelRefIterator, ParallelIter
use rhai::{Dynamic, Engine, OptimizationLevel, Scope, AST};
use roaring::RoaringBitmap;
use super::document_changes::{DocumentChangeContext, MostlySend};
use super::document_changes::{DocumentChangeContext, MostlySend, RefCellExt};
use super::DocumentChanges;
use crate::documents::Error::InvalidDocumentFormat;
use crate::documents::PrimaryKey;
@ -142,7 +142,7 @@ impl<'index> DocumentChanges<'index> for UpdateByFunctionChanges<'index> {
// Future: Use a custom function rhai function to track changes.
// <https://docs.rs/rhai/latest/rhai/struct.Engine.html#method.register_indexer_set>
if json_document != rhaimap_to_object(new_rhai_document) {
let mut global_fields_ids_map = new_fields_ids_map.borrow_mut();
let mut global_fields_ids_map = new_fields_ids_map.borrow_mut_or_yield();
let new_document_id = self
.primary_key
.extract_fields_and_docid(