diff --git a/milli/src/update/new/extract/searchable/extract_word_docids.rs b/milli/src/update/new/extract/searchable/extract_word_docids.rs index caab170a4..828219b41 100644 --- a/milli/src/update/new/extract/searchable/extract_word_docids.rs +++ b/milli/src/update/new/extract/searchable/extract_word_docids.rs @@ -1,6 +1,7 @@ use std::collections::HashMap; use std::fs::File; use std::num::NonZero; +use std::sync::Arc; use grenad::{Merger, MergerBuilder}; use heed::RoTxn; @@ -12,7 +13,7 @@ use crate::update::new::extract::perm_json_p::contained_in; use crate::update::new::{DocumentChange, ItemsPool}; use crate::update::{create_sorter, GrenadParameters, MergeDeladdCboRoaringBitmaps}; use crate::{ - bucketed_position, DocumentId, FieldId, GlobalFieldsIdsMap, Index, Result, + bucketed_position, DocumentId, Error, FieldId, GlobalFieldsIdsMap, Index, Result, MAX_POSITION_PER_ATTRIBUTE, }; @@ -303,7 +304,9 @@ impl WordDocidsExtractors { index: &Index, fields_ids_map: &GlobalFieldsIdsMap, indexer: GrenadParameters, - document_changes: impl IntoParallelIterator>, + document_changes: impl IntoParallelIterator< + Item = std::result::Result>, + >, ) -> Result { let max_memory = indexer.max_memory_by_thread(); diff --git a/milli/src/update/new/indexer/partial_dump.rs b/milli/src/update/new/indexer/partial_dump.rs index 02c9d68fc..aa01f6547 100644 --- a/milli/src/update/new/indexer/partial_dump.rs +++ b/milli/src/update/new/indexer/partial_dump.rs @@ -1,8 +1,11 @@ -use rayon::iter::IndexedParallelIterator; +use std::sync::Arc; + +use rayon::iter::{IndexedParallelIterator, ParallelBridge, ParallelIterator}; use super::DocumentChanges; use crate::documents::{DocumentIdExtractionError, PrimaryKey}; use crate::update::concurrent_available_ids::ConcurrentAvailableIds; +use crate::update::new::items_pool::ParallelIteratorExt; use crate::update::new::{DocumentChange, Insertion, KvWriterFieldId}; use crate::{all_obkv_to_json, Error, FieldsIdsMap, Object, Result, UserError}; @@ -37,41 
+40,46 @@ where > { let (fields_ids_map, concurrent_available_ids, primary_key) = param; - Ok(self.iter.map(|object| { - let docid = match concurrent_available_ids.next() { - Some(id) => id, - None => return Err(Error::UserError(UserError::DocumentLimitReached)), - }; + Ok(self.iter.try_map_try_init( + || Ok(()), + |_, object| { + let docid = match concurrent_available_ids.next() { + Some(id) => id, + None => return Err(Error::UserError(UserError::DocumentLimitReached)), + }; - let mut writer = KvWriterFieldId::memory(); - object.iter().for_each(|(key, value)| { - let key = fields_ids_map.id(key).unwrap(); - /// TODO better error management - let value = serde_json::to_vec(&value).unwrap(); - /// TODO it is not ordered - writer.insert(key, value).unwrap(); - }); + let mut writer = KvWriterFieldId::memory(); + object.iter().for_each(|(key, value)| { + let key = fields_ids_map.id(key).unwrap(); + /// TODO better error management + let value = serde_json::to_vec(&value).unwrap(); + /// TODO it is not ordered + writer.insert(key, value).unwrap(); + }); - let document = writer.into_boxed(); - let external_docid = match primary_key.document_id(&document, fields_ids_map)? { - Ok(document_id) => Ok(document_id), - Err(DocumentIdExtractionError::InvalidDocumentId(user_error)) => Err(user_error), - Err(DocumentIdExtractionError::MissingDocumentId) => { - Err(UserError::MissingDocumentId { - primary_key: primary_key.name().to_string(), - document: all_obkv_to_json(&document, fields_ids_map)?, - }) - } - Err(DocumentIdExtractionError::TooManyDocumentIds(_)) => { - Err(UserError::TooManyDocumentIds { - primary_key: primary_key.name().to_string(), - document: all_obkv_to_json(&document, fields_ids_map)?, - }) - } - }?; + let document = writer.into_boxed(); + let external_docid = match primary_key.document_id(&document, fields_ids_map)? 
{ + Ok(document_id) => Ok(document_id), + Err(DocumentIdExtractionError::InvalidDocumentId(user_error)) => { + Err(user_error) + } + Err(DocumentIdExtractionError::MissingDocumentId) => { + Err(UserError::MissingDocumentId { + primary_key: primary_key.name().to_string(), + document: all_obkv_to_json(&document, fields_ids_map)?, + }) + } + Err(DocumentIdExtractionError::TooManyDocumentIds(_)) => { + Err(UserError::TooManyDocumentIds { + primary_key: primary_key.name().to_string(), + document: all_obkv_to_json(&document, fields_ids_map)?, + }) + } + }?; - let insertion = Insertion::create(docid, document); - Ok(DocumentChange::Insertion(insertion)) - })) + let insertion = Insertion::create(docid, document); + Ok(DocumentChange::Insertion(insertion)) + }, + )) } } diff --git a/milli/src/update/new/items_pool.rs b/milli/src/update/new/items_pool.rs index 92a6d5e64..01a2cf933 100644 --- a/milli/src/update/new/items_pool.rs +++ b/milli/src/update/new/items_pool.rs @@ -4,7 +4,9 @@ use crossbeam_channel::{Receiver, Sender, TryRecvError}; use rayon::iter::{MapInit, ParallelIterator}; pub trait ParallelIteratorExt: ParallelIterator { - /// A method on a parallel iterator to map + /// Maps items based on the init function. + /// + /// The init function is run only as necessary, which is basically once per thread. fn try_map_try_init( self, init: INIT,