mirror of
https://github.com/meilisearch/meilisearch.git
synced 2024-11-25 19:45:05 +08:00
Move MostlySend, ThreadLocal, FullySend to their own commit
This commit is contained in:
parent
5f93651cef
commit
04c38220ca
@ -79,7 +79,7 @@ use roaring::RoaringBitmap;
|
|||||||
use rustc_hash::FxBuildHasher;
|
use rustc_hash::FxBuildHasher;
|
||||||
|
|
||||||
use crate::update::del_add::{DelAdd, KvWriterDelAdd};
|
use crate::update::del_add::{DelAdd, KvWriterDelAdd};
|
||||||
use crate::update::new::indexer::document_changes::MostlySend;
|
use crate::update::new::thread_local::MostlySend;
|
||||||
use crate::update::new::KvReaderDelAdd;
|
use crate::update::new::KvReaderDelAdd;
|
||||||
use crate::update::MergeDeladdCboRoaringBitmaps;
|
use crate::update::MergeDeladdCboRoaringBitmaps;
|
||||||
use crate::{CboRoaringBitmapCodec, Result};
|
use crate::{CboRoaringBitmapCodec, Result};
|
||||||
|
@ -6,8 +6,9 @@ use hashbrown::HashMap;
|
|||||||
use super::DelAddRoaringBitmap;
|
use super::DelAddRoaringBitmap;
|
||||||
use crate::update::new::channel::DocumentsSender;
|
use crate::update::new::channel::DocumentsSender;
|
||||||
use crate::update::new::document::{write_to_obkv, Document as _};
|
use crate::update::new::document::{write_to_obkv, Document as _};
|
||||||
use crate::update::new::indexer::document_changes::{DocumentChangeContext, Extractor, FullySend};
|
use crate::update::new::indexer::document_changes::{DocumentChangeContext, Extractor};
|
||||||
use crate::update::new::ref_cell_ext::RefCellExt as _;
|
use crate::update::new::ref_cell_ext::RefCellExt as _;
|
||||||
|
use crate::update::new::thread_local::FullySend;
|
||||||
use crate::update::new::DocumentChange;
|
use crate::update::new::DocumentChange;
|
||||||
use crate::vector::EmbeddingConfigs;
|
use crate::vector::EmbeddingConfigs;
|
||||||
use crate::Result;
|
use crate::Result;
|
||||||
|
@ -15,10 +15,10 @@ use crate::heed_codec::facet::OrderedF64Codec;
|
|||||||
use crate::update::del_add::DelAdd;
|
use crate::update::del_add::DelAdd;
|
||||||
use crate::update::new::channel::FieldIdDocidFacetSender;
|
use crate::update::new::channel::FieldIdDocidFacetSender;
|
||||||
use crate::update::new::indexer::document_changes::{
|
use crate::update::new::indexer::document_changes::{
|
||||||
extract, DocumentChangeContext, DocumentChanges, Extractor, FullySend, IndexingContext,
|
extract, DocumentChangeContext, DocumentChanges, Extractor, IndexingContext, Progress,
|
||||||
Progress, ThreadLocal,
|
|
||||||
};
|
};
|
||||||
use crate::update::new::ref_cell_ext::RefCellExt as _;
|
use crate::update::new::ref_cell_ext::RefCellExt as _;
|
||||||
|
use crate::update::new::thread_local::{FullySend, ThreadLocal};
|
||||||
use crate::update::new::DocumentChange;
|
use crate::update::new::DocumentChange;
|
||||||
use crate::update::GrenadParameters;
|
use crate::update::GrenadParameters;
|
||||||
use crate::{DocumentId, FieldId, Index, Result, MAX_FACET_VALUE_LENGTH};
|
use crate::{DocumentId, FieldId, Index, Result, MAX_FACET_VALUE_LENGTH};
|
||||||
|
@ -11,8 +11,9 @@ use serde_json::Value;
|
|||||||
|
|
||||||
use crate::error::GeoError;
|
use crate::error::GeoError;
|
||||||
use crate::update::new::document::Document;
|
use crate::update::new::document::Document;
|
||||||
use crate::update::new::indexer::document_changes::{DocumentChangeContext, Extractor, MostlySend};
|
use crate::update::new::indexer::document_changes::{DocumentChangeContext, Extractor};
|
||||||
use crate::update::new::ref_cell_ext::RefCellExt as _;
|
use crate::update::new::ref_cell_ext::RefCellExt as _;
|
||||||
|
use crate::update::new::thread_local::MostlySend;
|
||||||
use crate::update::new::DocumentChange;
|
use crate::update::new::DocumentChange;
|
||||||
use crate::update::GrenadParameters;
|
use crate::update::GrenadParameters;
|
||||||
use crate::{lat_lng_to_xyz, DocumentId, GeoPoint, Index, InternalError, Result};
|
use crate::{lat_lng_to_xyz, DocumentId, GeoPoint, Index, InternalError, Result};
|
||||||
|
@ -13,9 +13,8 @@ pub use geo::*;
|
|||||||
pub use searchable::*;
|
pub use searchable::*;
|
||||||
pub use vectors::EmbeddingExtractor;
|
pub use vectors::EmbeddingExtractor;
|
||||||
|
|
||||||
use super::indexer::document_changes::{
|
use super::indexer::document_changes::{DocumentChanges, IndexingContext, Progress};
|
||||||
DocumentChanges, FullySend, IndexingContext, Progress, ThreadLocal,
|
use super::thread_local::{FullySend, ThreadLocal};
|
||||||
};
|
|
||||||
use crate::update::GrenadParameters;
|
use crate::update::GrenadParameters;
|
||||||
use crate::Result;
|
use crate::Result;
|
||||||
|
|
||||||
|
@ -11,10 +11,10 @@ use super::tokenize_document::{tokenizer_builder, DocumentTokenizer};
|
|||||||
use crate::update::new::extract::cache::BalancedCaches;
|
use crate::update::new::extract::cache::BalancedCaches;
|
||||||
use crate::update::new::extract::perm_json_p::contained_in;
|
use crate::update::new::extract::perm_json_p::contained_in;
|
||||||
use crate::update::new::indexer::document_changes::{
|
use crate::update::new::indexer::document_changes::{
|
||||||
extract, DocumentChangeContext, DocumentChanges, Extractor, FullySend, IndexingContext,
|
extract, DocumentChangeContext, DocumentChanges, Extractor, IndexingContext, Progress,
|
||||||
MostlySend, Progress, ThreadLocal,
|
|
||||||
};
|
};
|
||||||
use crate::update::new::ref_cell_ext::RefCellExt as _;
|
use crate::update::new::ref_cell_ext::RefCellExt as _;
|
||||||
|
use crate::update::new::thread_local::{FullySend, MostlySend, ThreadLocal};
|
||||||
use crate::update::new::DocumentChange;
|
use crate::update::new::DocumentChange;
|
||||||
use crate::update::GrenadParameters;
|
use crate::update::GrenadParameters;
|
||||||
use crate::{bucketed_position, DocumentId, FieldId, Index, Result, MAX_POSITION_PER_ATTRIBUTE};
|
use crate::{bucketed_position, DocumentId, FieldId, Index, Result, MAX_POSITION_PER_ATTRIBUTE};
|
||||||
|
@ -14,9 +14,9 @@ use tokenize_document::{tokenizer_builder, DocumentTokenizer};
|
|||||||
use super::cache::BalancedCaches;
|
use super::cache::BalancedCaches;
|
||||||
use super::DocidsExtractor;
|
use super::DocidsExtractor;
|
||||||
use crate::update::new::indexer::document_changes::{
|
use crate::update::new::indexer::document_changes::{
|
||||||
extract, DocumentChangeContext, DocumentChanges, Extractor, FullySend, IndexingContext,
|
extract, DocumentChangeContext, DocumentChanges, Extractor, IndexingContext, Progress,
|
||||||
Progress, ThreadLocal,
|
|
||||||
};
|
};
|
||||||
|
use crate::update::new::thread_local::{FullySend, ThreadLocal};
|
||||||
use crate::update::new::DocumentChange;
|
use crate::update::new::DocumentChange;
|
||||||
use crate::update::GrenadParameters;
|
use crate::update::GrenadParameters;
|
||||||
use crate::{Index, Result, MAX_POSITION_PER_ATTRIBUTE};
|
use crate::{Index, Result, MAX_POSITION_PER_ATTRIBUTE};
|
||||||
|
@ -8,7 +8,8 @@ use super::cache::DelAddRoaringBitmap;
|
|||||||
use crate::error::FaultSource;
|
use crate::error::FaultSource;
|
||||||
use crate::prompt::Prompt;
|
use crate::prompt::Prompt;
|
||||||
use crate::update::new::channel::EmbeddingSender;
|
use crate::update::new::channel::EmbeddingSender;
|
||||||
use crate::update::new::indexer::document_changes::{DocumentChangeContext, Extractor, MostlySend};
|
use crate::update::new::indexer::document_changes::{DocumentChangeContext, Extractor};
|
||||||
|
use crate::update::new::thread_local::MostlySend;
|
||||||
use crate::update::new::vector_document::VectorDocument;
|
use crate::update::new::vector_document::VectorDocument;
|
||||||
use crate::update::new::DocumentChange;
|
use crate::update::new::DocumentChange;
|
||||||
use crate::vector::error::{
|
use crate::vector::error::{
|
||||||
|
@ -8,182 +8,9 @@ use rayon::iter::IndexedParallelIterator;
|
|||||||
use super::super::document_change::DocumentChange;
|
use super::super::document_change::DocumentChange;
|
||||||
use crate::fields_ids_map::metadata::FieldIdMapWithMetadata;
|
use crate::fields_ids_map::metadata::FieldIdMapWithMetadata;
|
||||||
use crate::update::new::parallel_iterator_ext::ParallelIteratorExt as _;
|
use crate::update::new::parallel_iterator_ext::ParallelIteratorExt as _;
|
||||||
|
use crate::update::new::thread_local::{FullySend, MostlySend, ThreadLocal};
|
||||||
use crate::{FieldsIdsMap, GlobalFieldsIdsMap, Index, InternalError, Result};
|
use crate::{FieldsIdsMap, GlobalFieldsIdsMap, Index, InternalError, Result};
|
||||||
|
|
||||||
/// A trait for types that are **not** [`Send`] only because they would then allow concurrent access to a type that is not [`Sync`].
|
|
||||||
///
|
|
||||||
/// The primary example of such a type is `&T`, with `T: !Sync`.
|
|
||||||
///
|
|
||||||
/// In the authors' understanding, a type can be `!Send` for two distinct reasons:
|
|
||||||
///
|
|
||||||
/// 1. Because it contains data that *genuinely* cannot be moved between threads, such as thread-local data.
|
|
||||||
/// 2. Because sending the type would allow concurrent access to a `!Sync` type, which is undefined behavior.
|
|
||||||
///
|
|
||||||
/// `MostlySend` exists to be used in bounds where you need a type whose data is **not** *attached* to a thread
|
|
||||||
/// because you might access it from a different thread, but where you will never access the type **concurrently** from
|
|
||||||
/// multiple threads.
|
|
||||||
///
|
|
||||||
/// Like [`Send`], `MostlySend` assumes properties on types that cannot be verified by the compiler, which is why implementing
|
|
||||||
/// this trait is unsafe.
|
|
||||||
///
|
|
||||||
/// # Safety
|
|
||||||
///
|
|
||||||
/// Implementers of this trait promises that the following properties hold on the implementing type:
|
|
||||||
///
|
|
||||||
/// 1. Its data can be accessed from any thread and will be the same regardless of the thread accessing it.
|
|
||||||
/// 2. Any operation that can be performed on the type does not depend on the thread that executes it.
|
|
||||||
///
|
|
||||||
/// As these properties are subtle and are not generally tracked by the Rust type system, great care should be taken before
|
|
||||||
/// implementing `MostlySend` on a type, especially a foreign type.
|
|
||||||
///
|
|
||||||
/// - An example of a type that verifies (1) and (2) is [`std::rc::Rc`] (when `T` is `Send` and `Sync`).
|
|
||||||
/// - An example of a type that doesn't verify (1) is thread-local data.
|
|
||||||
/// - An example of a type that doesn't verify (2) is [`std::sync::MutexGuard`]: a lot of mutex implementations require that
|
|
||||||
/// a lock is returned to the operating system on the same thread that initially locked the mutex, failing to uphold this
|
|
||||||
/// invariant will cause Undefined Behavior
|
|
||||||
/// (see last § in [the nomicon](https://doc.rust-lang.org/nomicon/send-and-sync.html)).
|
|
||||||
///
|
|
||||||
/// It is **always safe** to implement this trait on a type that is `Send`, but no placeholder impl is provided due to limitations in
|
|
||||||
/// coherency. Use the [`FullySend`] wrapper in this situation.
|
|
||||||
pub unsafe trait MostlySend {}
|
|
||||||
|
|
||||||
#[derive(Debug, Clone, Copy, Default, Eq, PartialEq, Ord, PartialOrd, Hash)]
|
|
||||||
pub struct FullySend<T>(pub T);
|
|
||||||
|
|
||||||
// SAFETY: a type **fully** send is always mostly send as well.
|
|
||||||
unsafe impl<T> MostlySend for FullySend<T> where T: Send {}
|
|
||||||
|
|
||||||
unsafe impl<T> MostlySend for RefCell<T> where T: MostlySend {}
|
|
||||||
|
|
||||||
unsafe impl<T> MostlySend for Option<T> where T: MostlySend {}
|
|
||||||
|
|
||||||
impl<T> FullySend<T> {
|
|
||||||
pub fn into(self) -> T {
|
|
||||||
self.0
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T> From<T> for FullySend<T> {
|
|
||||||
fn from(value: T) -> Self {
|
|
||||||
Self(value)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
|
|
||||||
struct MostlySendWrapper<T>(T);
|
|
||||||
|
|
||||||
impl<T: MostlySend> MostlySendWrapper<T> {
|
|
||||||
/// # Safety
|
|
||||||
///
|
|
||||||
/// - (P1) Users of this type will never access the type concurrently from multiple threads without synchronization
|
|
||||||
unsafe fn new(t: T) -> Self {
|
|
||||||
Self(t)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn as_ref(&self) -> &T {
|
|
||||||
&self.0
|
|
||||||
}
|
|
||||||
|
|
||||||
fn as_mut(&mut self) -> &mut T {
|
|
||||||
&mut self.0
|
|
||||||
}
|
|
||||||
|
|
||||||
fn into_inner(self) -> T {
|
|
||||||
self.0
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// # Safety
|
|
||||||
///
|
|
||||||
/// 1. `T` is [`MostlySend`], so by its safety contract it can be accessed by any thread and all of its operations are available
|
|
||||||
/// from any thread.
|
|
||||||
/// 2. (P1) of `MostlySendWrapper::new` forces the user to never access the value from multiple threads concurrently.
|
|
||||||
unsafe impl<T: MostlySend> Send for MostlySendWrapper<T> {}
|
|
||||||
|
|
||||||
/// A wrapper around [`thread_local::ThreadLocal`] that accepts [`MostlySend`] `T`s.
|
|
||||||
#[derive(Default)]
|
|
||||||
pub struct ThreadLocal<T: MostlySend> {
|
|
||||||
inner: thread_local::ThreadLocal<MostlySendWrapper<T>>,
|
|
||||||
// FIXME: this should be necessary
|
|
||||||
//_no_send: PhantomData<*mut ()>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T: MostlySend> ThreadLocal<T> {
|
|
||||||
pub fn new() -> Self {
|
|
||||||
Self { inner: thread_local::ThreadLocal::new() }
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn with_capacity(capacity: usize) -> Self {
|
|
||||||
Self { inner: thread_local::ThreadLocal::with_capacity(capacity) }
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn clear(&mut self) {
|
|
||||||
self.inner.clear()
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn get(&self) -> Option<&T> {
|
|
||||||
self.inner.get().map(|t| t.as_ref())
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn get_or<F>(&self, create: F) -> &T
|
|
||||||
where
|
|
||||||
F: FnOnce() -> T,
|
|
||||||
{
|
|
||||||
/// TODO: move ThreadLocal, MostlySend, FullySend to a dedicated file
|
|
||||||
self.inner.get_or(|| unsafe { MostlySendWrapper::new(create()) }).as_ref()
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn get_or_try<F, E>(&self, create: F) -> std::result::Result<&T, E>
|
|
||||||
where
|
|
||||||
F: FnOnce() -> std::result::Result<T, E>,
|
|
||||||
{
|
|
||||||
self.inner
|
|
||||||
.get_or_try(|| unsafe { Ok(MostlySendWrapper::new(create()?)) })
|
|
||||||
.map(MostlySendWrapper::as_ref)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn get_or_default(&self) -> &T
|
|
||||||
where
|
|
||||||
T: Default,
|
|
||||||
{
|
|
||||||
self.inner.get_or_default().as_ref()
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn iter_mut(&mut self) -> IterMut<T> {
|
|
||||||
IterMut(self.inner.iter_mut())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T: MostlySend> IntoIterator for ThreadLocal<T> {
|
|
||||||
type Item = T;
|
|
||||||
|
|
||||||
type IntoIter = IntoIter<T>;
|
|
||||||
|
|
||||||
fn into_iter(self) -> Self::IntoIter {
|
|
||||||
IntoIter(self.inner.into_iter())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct IterMut<'a, T: MostlySend>(thread_local::IterMut<'a, MostlySendWrapper<T>>);
|
|
||||||
|
|
||||||
impl<'a, T: MostlySend> Iterator for IterMut<'a, T> {
|
|
||||||
type Item = &'a mut T;
|
|
||||||
|
|
||||||
fn next(&mut self) -> Option<Self::Item> {
|
|
||||||
self.0.next().map(|t| t.as_mut())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct IntoIter<T: MostlySend>(thread_local::IntoIter<MostlySendWrapper<T>>);
|
|
||||||
|
|
||||||
impl<T: MostlySend> Iterator for IntoIter<T> {
|
|
||||||
type Item = T;
|
|
||||||
|
|
||||||
fn next(&mut self) -> Option<Self::Item> {
|
|
||||||
self.0.next().map(|t| t.into_inner())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct DocumentChangeContext<
|
pub struct DocumentChangeContext<
|
||||||
'doc, // covariant lifetime of a single `process` call
|
'doc, // covariant lifetime of a single `process` call
|
||||||
'extractor: 'doc, // invariant lifetime of the extractor_allocs
|
'extractor: 'doc, // invariant lifetime of the extractor_allocs
|
||||||
|
@ -4,8 +4,9 @@ use rayon::iter::IndexedParallelIterator;
|
|||||||
use rayon::slice::ParallelSlice as _;
|
use rayon::slice::ParallelSlice as _;
|
||||||
use roaring::RoaringBitmap;
|
use roaring::RoaringBitmap;
|
||||||
|
|
||||||
use super::document_changes::{DocumentChangeContext, DocumentChanges, MostlySend};
|
use super::document_changes::{DocumentChangeContext, DocumentChanges};
|
||||||
use crate::documents::PrimaryKey;
|
use crate::documents::PrimaryKey;
|
||||||
|
use crate::update::new::thread_local::MostlySend;
|
||||||
use crate::update::new::{Deletion, DocumentChange};
|
use crate::update::new::{Deletion, DocumentChange};
|
||||||
use crate::{DocumentId, Result};
|
use crate::{DocumentId, Result};
|
||||||
|
|
||||||
@ -92,9 +93,10 @@ mod test {
|
|||||||
use crate::fields_ids_map::metadata::{FieldIdMapWithMetadata, MetadataBuilder};
|
use crate::fields_ids_map::metadata::{FieldIdMapWithMetadata, MetadataBuilder};
|
||||||
use crate::index::tests::TempIndex;
|
use crate::index::tests::TempIndex;
|
||||||
use crate::update::new::indexer::document_changes::{
|
use crate::update::new::indexer::document_changes::{
|
||||||
extract, DocumentChangeContext, Extractor, IndexingContext, MostlySend, ThreadLocal,
|
extract, DocumentChangeContext, Extractor, IndexingContext,
|
||||||
};
|
};
|
||||||
use crate::update::new::indexer::DocumentDeletion;
|
use crate::update::new::indexer::DocumentDeletion;
|
||||||
|
use crate::update::new::thread_local::{MostlySend, ThreadLocal};
|
||||||
use crate::update::new::DocumentChange;
|
use crate::update::new::DocumentChange;
|
||||||
use crate::DocumentId;
|
use crate::DocumentId;
|
||||||
|
|
||||||
|
@ -9,10 +9,11 @@ use serde_json::value::RawValue;
|
|||||||
use serde_json::Deserializer;
|
use serde_json::Deserializer;
|
||||||
|
|
||||||
use super::super::document_change::DocumentChange;
|
use super::super::document_change::DocumentChange;
|
||||||
use super::document_changes::{DocumentChangeContext, DocumentChanges, MostlySend};
|
use super::document_changes::{DocumentChangeContext, DocumentChanges};
|
||||||
use super::retrieve_or_guess_primary_key;
|
use super::retrieve_or_guess_primary_key;
|
||||||
use crate::documents::PrimaryKey;
|
use crate::documents::PrimaryKey;
|
||||||
use crate::update::new::document::Versions;
|
use crate::update::new::document::Versions;
|
||||||
|
use crate::update::new::thread_local::MostlySend;
|
||||||
use crate::update::new::{Deletion, Insertion, Update};
|
use crate::update::new::{Deletion, Insertion, Update};
|
||||||
use crate::update::{AvailableIds, IndexDocumentsMethod};
|
use crate::update::{AvailableIds, IndexDocumentsMethod};
|
||||||
use crate::{DocumentId, Error, FieldsIdsMap, Index, InternalError, Result, UserError};
|
use crate::{DocumentId, Error, FieldsIdsMap, Index, InternalError, Result, UserError};
|
||||||
|
@ -3,7 +3,7 @@ use std::sync::{OnceLock, RwLock};
|
|||||||
use std::thread::{self, Builder};
|
use std::thread::{self, Builder};
|
||||||
|
|
||||||
use big_s::S;
|
use big_s::S;
|
||||||
use document_changes::{extract, DocumentChanges, IndexingContext, Progress, ThreadLocal};
|
use document_changes::{extract, DocumentChanges, IndexingContext, Progress};
|
||||||
pub use document_deletion::DocumentDeletion;
|
pub use document_deletion::DocumentDeletion;
|
||||||
pub use document_operation::{DocumentOperation, PayloadStats};
|
pub use document_operation::{DocumentOperation, PayloadStats};
|
||||||
use hashbrown::HashMap;
|
use hashbrown::HashMap;
|
||||||
@ -20,6 +20,7 @@ use super::channel::*;
|
|||||||
use super::extract::*;
|
use super::extract::*;
|
||||||
use super::facet_search_builder::FacetSearchBuilder;
|
use super::facet_search_builder::FacetSearchBuilder;
|
||||||
use super::merger::FacetFieldIdsDelta;
|
use super::merger::FacetFieldIdsDelta;
|
||||||
|
use super::thread_local::ThreadLocal;
|
||||||
use super::word_fst_builder::{PrefixData, PrefixDelta, WordFstBuilder};
|
use super::word_fst_builder::{PrefixData, PrefixDelta, WordFstBuilder};
|
||||||
use super::words_prefix_docids::{
|
use super::words_prefix_docids::{
|
||||||
compute_word_prefix_docids, compute_word_prefix_fid_docids, compute_word_prefix_position_docids,
|
compute_word_prefix_docids, compute_word_prefix_fid_docids, compute_word_prefix_position_docids,
|
||||||
|
@ -3,11 +3,12 @@ use std::ops::DerefMut;
|
|||||||
use rayon::iter::IndexedParallelIterator;
|
use rayon::iter::IndexedParallelIterator;
|
||||||
use serde_json::value::RawValue;
|
use serde_json::value::RawValue;
|
||||||
|
|
||||||
use super::document_changes::{DocumentChangeContext, DocumentChanges, MostlySend};
|
use super::document_changes::{DocumentChangeContext, DocumentChanges};
|
||||||
use crate::documents::PrimaryKey;
|
use crate::documents::PrimaryKey;
|
||||||
use crate::update::concurrent_available_ids::ConcurrentAvailableIds;
|
use crate::update::concurrent_available_ids::ConcurrentAvailableIds;
|
||||||
use crate::update::new::document::Versions;
|
use crate::update::new::document::Versions;
|
||||||
use crate::update::new::ref_cell_ext::RefCellExt as _;
|
use crate::update::new::ref_cell_ext::RefCellExt as _;
|
||||||
|
use crate::update::new::thread_local::MostlySend;
|
||||||
use crate::update::new::{DocumentChange, Insertion};
|
use crate::update::new::{DocumentChange, Insertion};
|
||||||
use crate::{Error, InternalError, Result, UserError};
|
use crate::{Error, InternalError, Result, UserError};
|
||||||
|
|
||||||
|
@ -4,13 +4,14 @@ use rayon::slice::ParallelSlice as _;
|
|||||||
use rhai::{Dynamic, Engine, OptimizationLevel, Scope, AST};
|
use rhai::{Dynamic, Engine, OptimizationLevel, Scope, AST};
|
||||||
use roaring::RoaringBitmap;
|
use roaring::RoaringBitmap;
|
||||||
|
|
||||||
use super::document_changes::{DocumentChangeContext, MostlySend};
|
use super::document_changes::DocumentChangeContext;
|
||||||
use super::DocumentChanges;
|
use super::DocumentChanges;
|
||||||
use crate::documents::Error::InvalidDocumentFormat;
|
use crate::documents::Error::InvalidDocumentFormat;
|
||||||
use crate::documents::PrimaryKey;
|
use crate::documents::PrimaryKey;
|
||||||
use crate::error::{FieldIdMapMissingEntry, InternalError};
|
use crate::error::{FieldIdMapMissingEntry, InternalError};
|
||||||
use crate::update::new::document::Versions;
|
use crate::update::new::document::Versions;
|
||||||
use crate::update::new::ref_cell_ext::RefCellExt as _;
|
use crate::update::new::ref_cell_ext::RefCellExt as _;
|
||||||
|
use crate::update::new::thread_local::MostlySend;
|
||||||
use crate::update::new::{Deletion, DocumentChange, KvReaderFieldId, Update};
|
use crate::update::new::{Deletion, DocumentChange, KvReaderFieldId, Update};
|
||||||
use crate::{all_obkv_to_json, Error, FieldsIdsMap, Object, Result, UserError};
|
use crate::{all_obkv_to_json, Error, FieldsIdsMap, Object, Result, UserError};
|
||||||
|
|
||||||
|
@ -17,6 +17,7 @@ pub mod indexer;
|
|||||||
mod merger;
|
mod merger;
|
||||||
mod parallel_iterator_ext;
|
mod parallel_iterator_ext;
|
||||||
mod ref_cell_ext;
|
mod ref_cell_ext;
|
||||||
|
pub(crate) mod thread_local;
|
||||||
mod top_level_map;
|
mod top_level_map;
|
||||||
pub mod vector_document;
|
pub mod vector_document;
|
||||||
mod word_fst_builder;
|
mod word_fst_builder;
|
||||||
|
174
crates/milli/src/update/new/thread_local.rs
Normal file
174
crates/milli/src/update/new/thread_local.rs
Normal file
@ -0,0 +1,174 @@
|
|||||||
|
use std::cell::RefCell;
|
||||||
|
|
||||||
|
/// A trait for types that are **not** [`Send`] only because they would then allow concurrent access to a type that is not [`Sync`].
|
||||||
|
///
|
||||||
|
/// The primary example of such a type is `&T`, with `T: !Sync`.
|
||||||
|
///
|
||||||
|
/// In the authors' understanding, a type can be `!Send` for two distinct reasons:
|
||||||
|
///
|
||||||
|
/// 1. Because it contains data that *genuinely* cannot be moved between threads, such as thread-local data.
|
||||||
|
/// 2. Because sending the type would allow concurrent access to a `!Sync` type, which is undefined behavior.
|
||||||
|
///
|
||||||
|
/// `MostlySend` exists to be used in bounds where you need a type whose data is **not** *attached* to a thread
|
||||||
|
/// because you might access it from a different thread, but where you will never access the type **concurrently** from
|
||||||
|
/// multiple threads.
|
||||||
|
///
|
||||||
|
/// Like [`Send`], `MostlySend` assumes properties on types that cannot be verified by the compiler, which is why implementing
|
||||||
|
/// this trait is unsafe.
|
||||||
|
///
|
||||||
|
/// # Safety
|
||||||
|
///
|
||||||
|
/// Implementers of this trait promises that the following properties hold on the implementing type:
|
||||||
|
///
|
||||||
|
/// 1. Its data can be accessed from any thread and will be the same regardless of the thread accessing it.
|
||||||
|
/// 2. Any operation that can be performed on the type does not depend on the thread that executes it.
|
||||||
|
///
|
||||||
|
/// As these properties are subtle and are not generally tracked by the Rust type system, great care should be taken before
|
||||||
|
/// implementing `MostlySend` on a type, especially a foreign type.
|
||||||
|
///
|
||||||
|
/// - An example of a type that verifies (1) and (2) is [`std::rc::Rc`] (when `T` is `Send` and `Sync`).
|
||||||
|
/// - An example of a type that doesn't verify (1) is thread-local data.
|
||||||
|
/// - An example of a type that doesn't verify (2) is [`std::sync::MutexGuard`]: a lot of mutex implementations require that
|
||||||
|
/// a lock is returned to the operating system on the same thread that initially locked the mutex, failing to uphold this
|
||||||
|
/// invariant will cause Undefined Behavior
|
||||||
|
/// (see last § in [the nomicon](https://doc.rust-lang.org/nomicon/send-and-sync.html)).
|
||||||
|
///
|
||||||
|
/// It is **always safe** to implement this trait on a type that is `Send`, but no placeholder impl is provided due to limitations in
|
||||||
|
/// coherency. Use the [`FullySend`] wrapper in this situation.
|
||||||
|
pub unsafe trait MostlySend {}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Copy, Default, Eq, PartialEq, Ord, PartialOrd, Hash)]
|
||||||
|
pub struct FullySend<T>(pub T);
|
||||||
|
|
||||||
|
// SAFETY: a type **fully** send is always mostly send as well.
|
||||||
|
unsafe impl<T> MostlySend for FullySend<T> where T: Send {}
|
||||||
|
|
||||||
|
unsafe impl<T> MostlySend for RefCell<T> where T: MostlySend {}
|
||||||
|
|
||||||
|
unsafe impl<T> MostlySend for Option<T> where T: MostlySend {}
|
||||||
|
|
||||||
|
impl<T> FullySend<T> {
|
||||||
|
pub fn into(self) -> T {
|
||||||
|
self.0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> From<T> for FullySend<T> {
|
||||||
|
fn from(value: T) -> Self {
|
||||||
|
Self(value)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
|
||||||
|
struct MostlySendWrapper<T>(T);
|
||||||
|
|
||||||
|
impl<T: MostlySend> MostlySendWrapper<T> {
|
||||||
|
/// # Safety
|
||||||
|
///
|
||||||
|
/// - (P1) Users of this type will never access the type concurrently from multiple threads without synchronization
|
||||||
|
unsafe fn new(t: T) -> Self {
|
||||||
|
Self(t)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn as_ref(&self) -> &T {
|
||||||
|
&self.0
|
||||||
|
}
|
||||||
|
|
||||||
|
fn as_mut(&mut self) -> &mut T {
|
||||||
|
&mut self.0
|
||||||
|
}
|
||||||
|
|
||||||
|
fn into_inner(self) -> T {
|
||||||
|
self.0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// # Safety
|
||||||
|
///
|
||||||
|
/// 1. `T` is [`MostlySend`], so by its safety contract it can be accessed by any thread and all of its operations are available
|
||||||
|
/// from any thread.
|
||||||
|
/// 2. (P1) of `MostlySendWrapper::new` forces the user to never access the value from multiple threads concurrently.
|
||||||
|
unsafe impl<T: MostlySend> Send for MostlySendWrapper<T> {}
|
||||||
|
|
||||||
|
/// A wrapper around [`thread_local::ThreadLocal`] that accepts [`MostlySend`] `T`s.
|
||||||
|
#[derive(Default)]
|
||||||
|
pub struct ThreadLocal<T: MostlySend> {
|
||||||
|
inner: thread_local::ThreadLocal<MostlySendWrapper<T>>,
|
||||||
|
// FIXME: this should be necessary
|
||||||
|
//_no_send: PhantomData<*mut ()>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T: MostlySend> ThreadLocal<T> {
|
||||||
|
pub fn new() -> Self {
|
||||||
|
Self { inner: thread_local::ThreadLocal::new() }
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn with_capacity(capacity: usize) -> Self {
|
||||||
|
Self { inner: thread_local::ThreadLocal::with_capacity(capacity) }
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn clear(&mut self) {
|
||||||
|
self.inner.clear()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get(&self) -> Option<&T> {
|
||||||
|
self.inner.get().map(|t| t.as_ref())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get_or<F>(&self, create: F) -> &T
|
||||||
|
where
|
||||||
|
F: FnOnce() -> T,
|
||||||
|
{
|
||||||
|
self.inner.get_or(|| unsafe { MostlySendWrapper::new(create()) }).as_ref()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get_or_try<F, E>(&self, create: F) -> std::result::Result<&T, E>
|
||||||
|
where
|
||||||
|
F: FnOnce() -> std::result::Result<T, E>,
|
||||||
|
{
|
||||||
|
self.inner
|
||||||
|
.get_or_try(|| unsafe { Ok(MostlySendWrapper::new(create()?)) })
|
||||||
|
.map(MostlySendWrapper::as_ref)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get_or_default(&self) -> &T
|
||||||
|
where
|
||||||
|
T: Default,
|
||||||
|
{
|
||||||
|
self.inner.get_or_default().as_ref()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn iter_mut(&mut self) -> IterMut<T> {
|
||||||
|
IterMut(self.inner.iter_mut())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T: MostlySend> IntoIterator for ThreadLocal<T> {
|
||||||
|
type Item = T;
|
||||||
|
|
||||||
|
type IntoIter = IntoIter<T>;
|
||||||
|
|
||||||
|
fn into_iter(self) -> Self::IntoIter {
|
||||||
|
IntoIter(self.inner.into_iter())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct IterMut<'a, T: MostlySend>(thread_local::IterMut<'a, MostlySendWrapper<T>>);
|
||||||
|
|
||||||
|
impl<'a, T: MostlySend> Iterator for IterMut<'a, T> {
|
||||||
|
type Item = &'a mut T;
|
||||||
|
|
||||||
|
fn next(&mut self) -> Option<Self::Item> {
|
||||||
|
self.0.next().map(|t| t.as_mut())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct IntoIter<T: MostlySend>(thread_local::IntoIter<MostlySendWrapper<T>>);
|
||||||
|
|
||||||
|
impl<T: MostlySend> Iterator for IntoIter<T> {
|
||||||
|
type Item = T;
|
||||||
|
|
||||||
|
fn next(&mut self) -> Option<Self::Item> {
|
||||||
|
self.0.next().map(|t| t.into_inner())
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user