diff --git a/milli/src/update/new/extract/faceted/extract_facets.rs b/milli/src/update/new/extract/faceted/extract_facets.rs index 14cc28da4..82f80c7b5 100644 --- a/milli/src/update/new/extract/faceted/extract_facets.rs +++ b/milli/src/update/new/extract/faceted/extract_facets.rs @@ -17,7 +17,7 @@ use crate::facet::value_encoding::f64_into_bytes; use crate::update::new::extract::DocidsExtractor; use crate::update::new::indexer::document_changes::{ for_each_document_change, DocumentChangeContext, DocumentChanges, Extractor, FullySend, - IndexingContext, ThreadLocal, + IndexingContext, RefCellExt, ThreadLocal, }; use crate::update::new::DocumentChange; use crate::update::{create_sorter, GrenadParameters, MergeDeladdCboRoaringBitmaps}; @@ -71,8 +71,8 @@ impl FacetedDocidsExtractor { ) -> Result<()> { let index = &context.index; let rtxn = &context.txn; - let mut new_fields_ids_map = context.new_fields_ids_map.borrow_mut(); - let mut cached_sorter = context.data.0.borrow_mut(); + let mut new_fields_ids_map = context.new_fields_ids_map.borrow_mut_or_yield(); + let mut cached_sorter = context.data.0.borrow_mut_or_yield(); match document_change { DocumentChange::Deletion(inner) => extract_document_facets( attributes_to_extract, diff --git a/milli/src/update/new/extract/searchable/extract_word_docids.rs b/milli/src/update/new/extract/searchable/extract_word_docids.rs index fd74cc8ce..5d70408bb 100644 --- a/milli/src/update/new/extract/searchable/extract_word_docids.rs +++ b/milli/src/update/new/extract/searchable/extract_word_docids.rs @@ -13,7 +13,7 @@ use crate::update::new::extract::cache::CboCachedSorter; use crate::update::new::extract::perm_json_p::contained_in; use crate::update::new::indexer::document_changes::{ for_each_document_change, DocumentChangeContext, DocumentChanges, Extractor, FullySend, - IndexingContext, ThreadLocal, + IndexingContext, RefCellExt, ThreadLocal, }; use crate::update::new::DocumentChange; use crate::update::{create_sorter, 
GrenadParameters, MergeDeladdCboRoaringBitmaps}; @@ -411,9 +411,9 @@ impl WordDocidsExtractors { ) -> Result<()> { let index = &context.index; let rtxn = &context.txn; - let mut cached_sorter = context.data.0.borrow_mut(); + let mut cached_sorter = context.data.0.borrow_mut_or_yield(); let cached_sorter = cached_sorter.deref_mut(); - let mut new_fields_ids_map = context.new_fields_ids_map.borrow_mut(); + let mut new_fields_ids_map = context.new_fields_ids_map.borrow_mut_or_yield(); let new_fields_ids_map = new_fields_ids_map.deref_mut(); let exact_attributes = index.exact_attributes(rtxn)?; diff --git a/milli/src/update/new/extract/searchable/extract_word_pair_proximity_docids.rs b/milli/src/update/new/extract/searchable/extract_word_pair_proximity_docids.rs index 86ede5b14..53e6515a9 100644 --- a/milli/src/update/new/extract/searchable/extract_word_pair_proximity_docids.rs +++ b/milli/src/update/new/extract/searchable/extract_word_pair_proximity_docids.rs @@ -9,7 +9,7 @@ use super::SearchableExtractor; use crate::proximity::{index_proximity, MAX_DISTANCE}; use crate::update::new::document::Document; use crate::update::new::extract::cache::CboCachedSorter; -use crate::update::new::indexer::document_changes::{DocumentChangeContext, FullySend}; +use crate::update::new::indexer::document_changes::{DocumentChangeContext, FullySend, RefCellExt}; use crate::update::new::DocumentChange; use crate::update::MergeDeladdCboRoaringBitmaps; use crate::{FieldId, GlobalFieldsIdsMap, Index, Result}; @@ -45,10 +45,10 @@ impl SearchableExtractor for WordPairProximityDocidsExtractor { let mut del_word_pair_proximity = bumpalo::collections::Vec::new_in(doc_alloc); let mut add_word_pair_proximity = bumpalo::collections::Vec::new_in(doc_alloc); - let mut new_fields_ids_map = context.new_fields_ids_map.borrow_mut(); + let mut new_fields_ids_map = context.new_fields_ids_map.borrow_mut_or_yield(); let new_fields_ids_map = &mut *new_fields_ids_map; - let mut cached_sorter = 
context.data.0.borrow_mut();
+        let mut cached_sorter = context.data.0.borrow_mut_or_yield();
         let cached_sorter = &mut *cached_sorter;
 
         // is a vecdequeue, and will be smol, so can stay on the heap for now
diff --git a/milli/src/update/new/indexer/document_changes.rs b/milli/src/update/new/indexer/document_changes.rs
index 18c7cdf02..5d9e7b3ba 100644
--- a/milli/src/update/new/indexer/document_changes.rs
+++ b/milli/src/update/new/indexer/document_changes.rs
@@ -1,4 +1,4 @@
-use std::cell::{Cell, RefCell};
+use std::cell::{Cell, Ref, RefCell, RefMut};
 use std::sync::{Arc, RwLock};
 
 use bumpalo::Bump;
@@ -10,6 +10,80 @@ use super::super::document_change::DocumentChange;
 use crate::update::new::parallel_iterator_ext::ParallelIteratorExt as _;
 use crate::{FieldsIdsMap, GlobalFieldsIdsMap, Index, Result};
 
+/// Extension methods for [`RefCell`] that yield to local rayon tasks when a borrow fails.
+///
+/// The provided `borrow_or_yield` and `borrow_mut_or_yield` methods unwrap the result of
+/// their `try_*` counterparts, so they panic when the borrow ultimately fails.
+pub trait RefCellExt<T: ?Sized> {
+    /// Tries to immutably borrow the value, returning the borrow error on failure.
+    fn try_borrow_or_yield(&self) -> std::result::Result<Ref<'_, T>, std::cell::BorrowError>;
+
+    /// Tries to mutably borrow the value, returning the borrow error on failure.
+    fn try_borrow_mut_or_yield(
+        &self,
+    ) -> std::result::Result<RefMut<'_, T>, std::cell::BorrowMutError>;
+
+    /// Immutably borrows the value through [`Self::try_borrow_or_yield`].
+    ///
+    /// # Panics
+    ///
+    /// Panics if [`Self::try_borrow_or_yield`] returns an error.
+    fn borrow_or_yield(&self) -> Ref<'_, T> {
+        self.try_borrow_or_yield().unwrap()
+    }
+
+    /// Mutably borrows the value through [`Self::try_borrow_mut_or_yield`].
+    ///
+    /// # Panics
+    ///
+    /// Panics if [`Self::try_borrow_mut_or_yield`] returns an error.
+    fn borrow_mut_or_yield(&self) -> RefMut<'_, T> {
+        self.try_borrow_mut_or_yield().unwrap()
+    }
+}
+
+impl<T: ?Sized> RefCellExt<T> for RefCell<T> {
+    /// Retries `try_borrow`, calling `rayon::yield_local` after each failed attempt.
+    ///
+    /// # Errors
+    ///
+    /// Returns the error of the last failed attempt as soon as `rayon::yield_local`
+    /// returns anything other than `Some(rayon::Yield::Executed)`.
+    fn try_borrow_or_yield(&self) -> std::result::Result<Ref<'_, T>, std::cell::BorrowError> {
+        loop {
+            match self.try_borrow() {
+                Ok(borrow) => break Ok(borrow),
+                Err(error) => match rayon::yield_local() {
+                    // A local task ran and may have released the conflicting borrow: retry.
+                    Some(rayon::Yield::Executed) => continue,
+                    _ => return Err(error),
+                },
+            }
+        }
+    }
+
+    /// Retries `try_borrow_mut`, calling `rayon::yield_local` after each failed attempt.
+    ///
+    /// # Errors
+    ///
+    /// Returns the error of the last failed attempt as soon as `rayon::yield_local`
+    /// returns anything other than `Some(rayon::Yield::Executed)`.
+    fn try_borrow_mut_or_yield(
+        &self,
+    ) -> std::result::Result<RefMut<'_, T>, std::cell::BorrowMutError> {
+        loop {
+            match self.try_borrow_mut() {
+                Ok(borrow) => break Ok(borrow),
+                Err(error) => match rayon::yield_local() {
+                    // A local task ran and may have released the conflicting borrow: retry.
+                    Some(rayon::Yield::Executed) => continue,
+                    _ => return Err(error),
+                },
+            }
+        }
+    }
+}
+
 /// A trait for types that are **not** [`Send`] only because they would then allow concurrent access to a type that is not [`Sync`].
 ///
 /// The primary example of such a type is `&T`, with `T: !Sync`.
@@ -245,7 +319,7 @@ impl<
         let fields_ids_map = &fields_ids_map.0;
         let extractor_alloc = extractor_allocs.get_or_default();
 
-        let extractor_alloc = RefBump::new(extractor_alloc.0.borrow());
+        let extractor_alloc = RefBump::new(extractor_alloc.0.borrow_or_yield());
 
         let data = datastore.get_or_try(|| init_data(RefBump::clone(&extractor_alloc)))?;
 
diff --git a/milli/src/update/new/indexer/mod.rs b/milli/src/update/new/indexer/mod.rs
index 5f0face8b..29ff2685e 100644
--- a/milli/src/update/new/indexer/mod.rs
+++ b/milli/src/update/new/indexer/mod.rs
@@ -5,7 +5,8 @@ use std::thread::{self, Builder};
 use big_s::S;
 use bumpalo::Bump;
 use document_changes::{
-    for_each_document_change, DocumentChanges, Extractor, FullySend, IndexingContext, ThreadLocal,
+    for_each_document_change, DocumentChanges, Extractor, FullySend, IndexingContext, RefCellExt,
+    ThreadLocal,
 };
 pub use document_deletion::DocumentDeletion;
 pub use document_operation::DocumentOperation;
@@ -62,7 +63,7 @@ impl<'a, 'extractor> Extractor<'extractor> for DocumentExtractor<'a> {
     ) -> Result<()> {
         let mut document_buffer = Vec::new();
 
-        let new_fields_ids_map = context.new_fields_ids_map.borrow();
+        let new_fields_ids_map = context.new_fields_ids_map.borrow_or_yield();
         let new_fields_ids_map = &*new_fields_ids_map;
         let new_fields_ids_map = new_fields_ids_map.local_map();
 
diff --git a/milli/src/update/new/indexer/partial_dump.rs b/milli/src/update/new/indexer/partial_dump.rs
index 3b528d5e8..10fc95a03 100644
--- a/milli/src/update/new/indexer/partial_dump.rs
+++ b/milli/src/update/new/indexer/partial_dump.rs
@@ -5,7 +5,7 @@ use serde::Deserializer;
 use serde_json::value::RawValue;
 
 use super::de::FieldAndDocidExtractor;
-use super::document_changes::{DocumentChangeContext, DocumentChanges, MostlySend};
+use super::document_changes::{DocumentChangeContext, DocumentChanges, MostlySend, RefCellExt};
 use crate::documents::{DocumentIdExtractionError,
PrimaryKey}; use crate::update::concurrent_available_ids::ConcurrentAvailableIds; use crate::update::new::document::DocumentFromVersions; @@ -63,7 +63,7 @@ where None => return Err(Error::UserError(UserError::DocumentLimitReached)), }; - let mut fields_ids_map = context.new_fields_ids_map.borrow_mut(); + let mut fields_ids_map = context.new_fields_ids_map.borrow_mut_or_yield(); let fields_ids_map = fields_ids_map.deref_mut(); let document = doc_alloc.alloc_str(document.get()); diff --git a/milli/src/update/new/indexer/update_by_function.rs b/milli/src/update/new/indexer/update_by_function.rs index 6f2914577..826f918a4 100644 --- a/milli/src/update/new/indexer/update_by_function.rs +++ b/milli/src/update/new/indexer/update_by_function.rs @@ -5,7 +5,7 @@ use rayon::iter::{IndexedParallelIterator, IntoParallelRefIterator, ParallelIter use rhai::{Dynamic, Engine, OptimizationLevel, Scope, AST}; use roaring::RoaringBitmap; -use super::document_changes::{DocumentChangeContext, MostlySend}; +use super::document_changes::{DocumentChangeContext, MostlySend, RefCellExt}; use super::DocumentChanges; use crate::documents::Error::InvalidDocumentFormat; use crate::documents::PrimaryKey; @@ -142,7 +142,7 @@ impl<'index> DocumentChanges<'index> for UpdateByFunctionChanges<'index> { // Future: Use a custom function rhai function to track changes. // if json_document != rhaimap_to_object(new_rhai_document) { - let mut global_fields_ids_map = new_fields_ids_map.borrow_mut(); + let mut global_fields_ids_map = new_fields_ids_map.borrow_mut_or_yield(); let new_document_id = self .primary_key .extract_fields_and_docid(