WIP using try_map_try_init

This commit is contained in:
Clément Renault 2024-09-26 18:59:28 +02:00
parent 3843240940
commit 31de5c747e
No known key found for this signature in database
GPG Key ID: F250A4C4E3AE5F5F
8 changed files with 55 additions and 29 deletions

View File

@ -1,6 +1,7 @@
use std::collections::HashSet;
use std::fmt::Debug;
use std::fs::File;
use std::sync::Arc;
use grenad::{MergeFunction, Merger};
use heed::RoTxn;
@ -14,7 +15,9 @@ use crate::facet::value_encoding::f64_into_bytes;
use crate::update::new::extract::DocidsExtractor;
use crate::update::new::{DocumentChange, ItemsPool};
use crate::update::{create_sorter, GrenadParameters, MergeDeladdCboRoaringBitmaps};
use crate::{DocumentId, FieldId, GlobalFieldsIdsMap, Index, Result, MAX_FACET_VALUE_LENGTH};
use crate::{
DocumentId, Error, FieldId, GlobalFieldsIdsMap, Index, Result, MAX_FACET_VALUE_LENGTH,
};
pub struct FacetedDocidsExtractor;
impl FacetedDocidsExtractor {
@ -195,7 +198,9 @@ impl DocidsExtractor for FacetedDocidsExtractor {
index: &Index,
fields_ids_map: &GlobalFieldsIdsMap,
indexer: GrenadParameters,
document_changes: impl IntoParallelIterator<Item = Result<DocumentChange>>,
document_changes: impl IntoParallelIterator<
Item = std::result::Result<DocumentChange, Arc<Error>>,
>,
) -> Result<Merger<File, MergeDeladdCboRoaringBitmaps>> {
let max_memory = indexer.max_memory_by_thread();

View File

@ -4,6 +4,7 @@ mod lru;
mod searchable;
use std::fs::File;
use std::sync::Arc;
pub use faceted::*;
use grenad::Merger;
@ -12,14 +13,16 @@ pub use searchable::*;
use super::DocumentChange;
use crate::update::{GrenadParameters, MergeDeladdCboRoaringBitmaps};
use crate::{GlobalFieldsIdsMap, Index, Result};
use crate::{Error, GlobalFieldsIdsMap, Index, Result};
pub trait DocidsExtractor {
fn run_extraction(
index: &Index,
fields_ids_map: &GlobalFieldsIdsMap,
indexer: GrenadParameters,
document_changes: impl IntoParallelIterator<Item = Result<DocumentChange>>,
document_changes: impl IntoParallelIterator<
Item = std::result::Result<DocumentChange, Arc<Error>>,
>,
) -> Result<Merger<File, MergeDeladdCboRoaringBitmaps>>;
}

View File

@ -3,6 +3,7 @@ mod extract_word_pair_proximity_docids;
mod tokenize_document;
use std::fs::File;
use std::sync::Arc;
pub use extract_word_docids::{WordDocidsExtractors, WordDocidsMergers};
pub use extract_word_pair_proximity_docids::WordPairProximityDocidsExtractor;
@ -15,14 +16,16 @@ use super::cache::CboCachedSorter;
use super::DocidsExtractor;
use crate::update::new::{DocumentChange, ItemsPool};
use crate::update::{create_sorter, GrenadParameters, MergeDeladdCboRoaringBitmaps};
use crate::{GlobalFieldsIdsMap, Index, Result, MAX_POSITION_PER_ATTRIBUTE};
use crate::{Error, GlobalFieldsIdsMap, Index, Result, MAX_POSITION_PER_ATTRIBUTE};
pub trait SearchableExtractor {
fn run_extraction(
index: &Index,
fields_ids_map: &GlobalFieldsIdsMap,
indexer: GrenadParameters,
document_changes: impl IntoParallelIterator<Item = Result<DocumentChange>>,
document_changes: impl IntoParallelIterator<
Item = std::result::Result<DocumentChange, Arc<Error>>,
>,
) -> Result<Merger<File, MergeDeladdCboRoaringBitmaps>> {
let max_memory = indexer.max_memory_by_thread();
@ -132,7 +135,9 @@ impl<T: SearchableExtractor> DocidsExtractor for T {
index: &Index,
fields_ids_map: &GlobalFieldsIdsMap,
indexer: GrenadParameters,
document_changes: impl IntoParallelIterator<Item = Result<DocumentChange>>,
document_changes: impl IntoParallelIterator<
Item = std::result::Result<DocumentChange, Arc<Error>>,
>,
) -> Result<Merger<File, MergeDeladdCboRoaringBitmaps>> {
Self::run_extraction(index, fields_ids_map, indexer, document_changes)
}

View File

@ -1,11 +1,12 @@
use std::sync::Arc;
use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator};
use rayon::iter::{IndexedParallelIterator, IntoParallelIterator};
use roaring::RoaringBitmap;
use super::DocumentChanges;
use crate::update::new::{Deletion, DocumentChange, ItemsPool};
use crate::{FieldsIdsMap, Index, Result};
use crate::update::new::items_pool::ParallelIteratorExt as _;
use crate::update::new::{Deletion, DocumentChange};
use crate::{Error, FieldsIdsMap, Index, Result};
pub struct DocumentDeletion {
pub to_delete: RoaringBitmap,
@ -28,15 +29,19 @@ impl<'p> DocumentChanges<'p> for DocumentDeletion {
self,
_fields_ids_map: &mut FieldsIdsMap,
param: Self::Parameter,
) -> Result<impl IndexedParallelIterator<Item = Result<DocumentChange>> + Clone + 'p> {
) -> Result<
impl IndexedParallelIterator<Item = std::result::Result<DocumentChange, Arc<Error>>>
+ Clone
+ 'p,
> {
let index = param;
let items = Arc::new(ItemsPool::new(|| index.read_txn().map_err(crate::Error::from)));
let to_delete: Vec<_> = self.to_delete.into_iter().collect();
Ok(to_delete.into_par_iter().map_with(items, |items, docid| {
items.with(|rtxn| {
Ok(to_delete.into_par_iter().try_map_try_init(
|| index.read_txn().map_err(crate::Error::from),
|rtxn, docid| {
let current = index.document(rtxn, docid)?;
Ok(DocumentChange::Deletion(Deletion::create(docid, current.boxed())))
})
}))
},
))
}
}

View File

@ -75,9 +75,8 @@ impl<'p, 'pl: 'p> DocumentChanges<'p> for DocumentOperation<'pl> {
fields_ids_map: &mut FieldsIdsMap,
param: Self::Parameter,
) -> Result<
impl IndexedParallelIterator<
Item = std::result::Result<DocumentChange, Option<crate::Error>>,
> + Clone
impl IndexedParallelIterator<Item = std::result::Result<DocumentChange, Arc<Error>>>
+ Clone
+ 'p,
> {
let (index, rtxn, primary_key) = param;
@ -206,7 +205,7 @@ impl<'p, 'pl: 'p> DocumentChanges<'p> for DocumentOperation<'pl> {
docids_version_offsets.sort_unstable_by_key(|(_, (_, docops))| sort_function_key(docops));
Ok(docids_version_offsets.into_par_iter().try_map_try_init(
|| index.read_txn().map_err(crate::Error::from),
|| index.read_txn().map_err(Error::from),
move |rtxn, (external_docid, (internal_docid, operations))| {
let document_merge_function = match self.index_documents_method {
Idm::ReplaceDocuments => MergeDocumentForReplacement::merge,

View File

@ -1,4 +1,4 @@
use std::sync::RwLock;
use std::sync::{Arc, RwLock};
use std::thread::{self, Builder};
use big_s::S;
@ -38,9 +38,8 @@ pub trait DocumentChanges<'p> {
fields_ids_map: &mut FieldsIdsMap,
param: Self::Parameter,
) -> Result<
impl IndexedParallelIterator<
Item = std::result::Result<DocumentChange, Option<crate::Error>>,
> + Clone
impl IndexedParallelIterator<Item = std::result::Result<DocumentChange, Arc<Error>>>
+ Clone
+ 'p,
>;
}
@ -58,7 +57,7 @@ pub fn index<PI>(
document_changes: PI,
) -> Result<()>
where
PI: IndexedParallelIterator<Item = std::result::Result<DocumentChange, Option<Error>>>
PI: IndexedParallelIterator<Item = std::result::Result<DocumentChange, Arc<Error>>>
+ Send
+ Clone,
{
@ -249,7 +248,7 @@ fn extract_and_send_docids<E: DocidsExtractor, D: MergerOperationType>(
index: &Index,
fields_ids_map: &GlobalFieldsIdsMap,
indexer: GrenadParameters,
document_changes: impl IntoParallelIterator<Item = Result<DocumentChange>>,
document_changes: impl IntoParallelIterator<Item = std::result::Result<DocumentChange, Arc<Error>>>,
sender: &ExtractorSender,
) -> Result<()> {
let merger = E::run_extraction(index, fields_ids_map, indexer, document_changes)?;

View File

@ -30,7 +30,11 @@ where
self,
_fields_ids_map: &mut FieldsIdsMap,
param: Self::Parameter,
) -> Result<impl IndexedParallelIterator<Item = Result<DocumentChange>> + Clone + 'p> {
) -> Result<
impl IndexedParallelIterator<Item = std::result::Result<DocumentChange, Arc<Error>>>
+ Clone
+ 'p,
> {
let (fields_ids_map, concurrent_available_ids, primary_key) = param;
Ok(self.iter.map(|object| {

View File

@ -1,8 +1,10 @@
use std::sync::Arc;
use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator};
use super::DocumentChanges;
use crate::update::new::DocumentChange;
use crate::{FieldsIdsMap, Result};
use crate::{Error, FieldsIdsMap, Result};
pub struct UpdateByFunction;
@ -13,7 +15,11 @@ impl<'p> DocumentChanges<'p> for UpdateByFunction {
self,
_fields_ids_map: &mut FieldsIdsMap,
_param: Self::Parameter,
) -> Result<impl IndexedParallelIterator<Item = Result<DocumentChange>> + Clone + 'p> {
) -> Result<
impl IndexedParallelIterator<Item = std::result::Result<DocumentChange, Arc<Error>>>
+ Clone
+ 'p,
> {
Ok((0..100).into_par_iter().map(|_| todo!()))
}
}