From b625d31c7d411d57bc49f0d5aad76f2f90f809ee Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Cl=C3=A9ment=20Renault?=
Date: Fri, 30 Aug 2024 15:07:21 +0200
Subject: [PATCH] Introduce the PartialDumpIndexer indexer that generates
 document ids in parallel

---
 milli/src/update/new/mod.rs | 108 +++++++++++++++++++++++-------------
 1 file changed, 68 insertions(+), 40 deletions(-)

diff --git a/milli/src/update/new/mod.rs b/milli/src/update/new/mod.rs
index 02b61dde4..3d9800657 100644
--- a/milli/src/update/new/mod.rs
+++ b/milli/src/update/new/mod.rs
@@ -21,6 +21,7 @@ mod indexer {
     use heed::types::Bytes;
     use heed::{RoTxn, RwTxn};
     use memmap2::Mmap;
+    use obkv::KvWriter;
     use rayon::iter::{IntoParallelIterator, ParallelBridge, ParallelIterator};
     use rayon::ThreadPool;
     use roaring::RoaringBitmap;
@@ -35,14 +36,13 @@ mod indexer {
     use crate::documents::{
         obkv_to_object, DocumentIdExtractionError, DocumentsBatchReader, PrimaryKey,
     };
+    use crate::update::concurrent_available_ids::ConcurrentAvailableIds;
     use crate::update::del_add::DelAdd;
     use crate::update::new::channel::MergerOperation;
-    use crate::update::{
-        AvailableDocumentsIds, IndexDocumentsMethod, MergeDeladdCboRoaringBitmaps,
-    };
+    use crate::update::{AvailableIds, IndexDocumentsMethod, MergeDeladdCboRoaringBitmaps};
     use crate::{
-        CboRoaringBitmapCodec, DocumentId, Error, FieldId, FieldsIdsMap, Index, InternalError,
-        Result, UserError,
+        all_obkv_to_json, obkv_to_json, CboRoaringBitmapCodec, DocumentId, Error, FieldId,
+        FieldsIdsMap, Index, InternalError, Object, Result, UserError,
     };
 
     pub type KvReaderFieldId = obkv::KvReader<FieldId>;
@@ -108,7 +108,7 @@
             primary_key: &'a PrimaryKey<'a>,
         ) -> Result<impl ParallelIterator<Item = Result<Option<DocumentChange>>> + 'a> {
             let documents_ids = index.documents_ids(rtxn)?;
-            let mut available_docids = AvailableDocumentsIds::from_documents_ids(&documents_ids);
+            let mut available_docids = AvailableIds::new(&documents_ids);
             let mut docids_version_offsets = HashMap::<String, _>::new();
 
             for operation in self.operations {
@@ -127,7 +127,7 @@
                     let mut offset: u32 = 0;
                     while let Some(document) = batch_cursor.next_document()? {
                         let external_document_id =
-                            match primary_key.document_id(&document, &batch_index)? {
+                            match primary_key.document_id(document, &batch_index)? {
                                 Ok(document_id) => Ok(document_id),
                                 Err(DocumentIdExtractionError::InvalidDocumentId(
                                     user_error,
@@ -135,13 +135,13 @@
                                 Err(DocumentIdExtractionError::MissingDocumentId) => {
                                     Err(UserError::MissingDocumentId {
                                         primary_key: primary_key.name().to_string(),
-                                        document: obkv_to_object(&document, &batch_index)?,
+                                        document: obkv_to_object(document, &batch_index)?,
                                     })
                                 }
                                 Err(DocumentIdExtractionError::TooManyDocumentIds(_)) => {
                                     Err(UserError::TooManyDocumentIds {
                                         primary_key: primary_key.name().to_string(),
-                                        document: obkv_to_object(&document, &batch_index)?,
+                                        document: obkv_to_object(document, &batch_index)?,
                                     })
                                 }
                             }?;
@@ -163,7 +163,7 @@
                         };
 
                         docids_version_offsets.insert(
-                            external_document_id.into(),
+                            external_document_id,
                             (docid, vec![document_operation]),
                         );
                     }
@@ -275,43 +275,71 @@
         }
     }
 
-    pub struct DumpIndexer;
+    pub struct PartialDumpIndexer<I> {
+        iter: I,
+    }
 
-    impl DumpIndexer {
-        pub fn new() -> Self {
-            todo!()
+    impl<I> PartialDumpIndexer<I>
+    where
+        I: IntoIterator<Item = Object>,
+        I::IntoIter: Send,
+        I::Item: Send,
+    {
+        pub fn new_from_jsonlines(iter: I) -> Self {
+            PartialDumpIndexer { iter }
         }
 
-        pub fn document_changes_from_json_iter<I>(
+        /// Note for future self:
+        /// - the field ids map must already be valid, so you have to generate it beforehand.
+        /// - We should probably expose another method that generates the fields ids map from an iterator of JSON objects.
+        /// - We recommend sending chunks of documents to this `PartialDumpIndexer`; we therefore need to create a custom take_while_size method (one that doesn't drop items).
+        pub fn document_changes<'a>(
             self,
-            iter: I,
-            index: &Index,
-        ) -> impl ParallelIterator
+            fields_ids_map: &'a FieldsIdsMap,
+            concurrent_available_ids: &'a ConcurrentAvailableIds,
+            primary_key: &'a PrimaryKey<'a>,
+        ) -> impl ParallelIterator<Item = Result<Option<DocumentChange>>> + 'a
         where
-            I: IntoIterator,
+            // I don't like this; it will not fit in the future trait easily
+            I::IntoIter: 'a,
         {
-            // let items = Arc::new(ItemsPool::new(|| {
-            //     let rtxn = index.read_txn()?;
-            //     let fields = index.fields_ids_map(&rtxn)?;
-            //     let primary_key =
-            //         index.primary_key(&rtxn)?.ok_or(InternalError::DatabaseMissingEntry {
-            //             db_name: db_name::MAIN,
-            //             key: Some(main_key::PRIMARY_KEY_KEY),
-            //         })?;
-            //     let primary_key = PrimaryKey::new(primary_key, &fields).ok_or_else(|| {
-            //         InternalError::FieldIdMapMissingEntry(
-            //             crate::FieldIdMapMissingEntry::FieldName {
-            //                 field_name: primary_key.to_owned(),
-            //                 process: "external_id_of",
-            //             },
-            //         )
-            //     })?;
-            //     Ok(DeleteDocumentExternalDocumentIdGetter { rtxn, fields, primary_key })
-            //         as crate::Result<_>
-            // }));
+            self.iter.into_iter().par_bridge().map(|object| {
+                let docid = match concurrent_available_ids.next() {
+                    Some(id) => id,
+                    None => return Err(Error::UserError(UserError::DocumentLimitReached)),
+                };
 
-            todo!();
-            vec![].into_par_iter()
+                let mut writer = KvWriterFieldId::memory();
+                object.iter().for_each(|(key, value)| {
+                    let key = fields_ids_map.id(key).unwrap();
+                    // TODO: better error management
+                    let value = serde_json::to_vec(&value).unwrap();
+                    writer.insert(key, value).unwrap();
+                });
+
+                let document = writer.into_boxed();
+                let external_docid = match primary_key.document_id(&document, fields_ids_map)? {
+                    Ok(document_id) => Ok(document_id),
+                    Err(DocumentIdExtractionError::InvalidDocumentId(user_error)) => {
+                        Err(user_error)
+                    }
+                    Err(DocumentIdExtractionError::MissingDocumentId) => {
+                        Err(UserError::MissingDocumentId {
+                            primary_key: primary_key.name().to_string(),
+                            document: all_obkv_to_json(&document, fields_ids_map)?,
+                        })
+                    }
+                    Err(DocumentIdExtractionError::TooManyDocumentIds(_)) => {
+                        Err(UserError::TooManyDocumentIds {
+                            primary_key: primary_key.name().to_string(),
+                            document: all_obkv_to_json(&document, fields_ids_map)?,
+                        })
+                    }
+                }?;
+
+                let insertion = Insertion::create(docid, external_docid, document);
+                Ok(Some(DocumentChange::Insertion(insertion)))
+            })
         }
     }
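
Reviewer note, outside the diff: a minimal sketch of how the new indexer might be driven, written as if it lived next to the `indexer` module so the milli types are in scope. Only `new_from_jsonlines` and `document_changes` come from the patch above; the `ConcurrentAvailableIds::new(&RoaringBitmap)` constructor and the surrounding function are assumptions made for illustration, mirroring `AvailableIds::new`.

    use rayon::iter::ParallelIterator;
    use roaring::RoaringBitmap;

    // Illustrative only: feed an in-memory Vec of JSON objects to PartialDumpIndexer
    // and collect the resulting insertions. The fields ids map and primary key must
    // have been prepared beforehand, as the doc comment on document_changes requires.
    fn index_partial_dump(
        fields_ids_map: &FieldsIdsMap,
        primary_key: &PrimaryKey<'_>,
        documents: Vec<Object>,
    ) -> Result<Vec<DocumentChange>> {
        // No docid is used yet, so every id is available to the parallel workers.
        let used_ids = RoaringBitmap::new();
        let available_ids = ConcurrentAvailableIds::new(&used_ids); // assumed constructor

        PartialDumpIndexer::new_from_jsonlines(documents)
            .document_changes(fields_ids_map, &available_ids, primary_key)
            // Each item is a Result<Option<DocumentChange>>; keep errors, drop the Nones.
            .filter_map(|result| result.transpose())
            .collect()
    }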
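
The third bullet of the doc comment calls for a custom take_while_size helper that never drops items; no such helper exists in this patch. A hypothetical sketch of that chunking idea (the name, signature, and size estimate are all invented here) could look like:

    // Hypothetical sketch: group an iterator of documents into chunks whose estimated
    // JSON size stays around `max_bytes`. The item that crosses the limit closes the
    // current chunk instead of being dropped, so every chunk can be handed to its own
    // PartialDumpIndexer without losing documents.
    fn chunks_by_size<I>(iter: I, max_bytes: usize) -> impl Iterator<Item = Vec<Object>>
    where
        I: IntoIterator<Item = Object>,
    {
        let mut iter = iter.into_iter();
        std::iter::from_fn(move || {
            let mut chunk = Vec::new();
            let mut bytes = 0;
            for object in iter.by_ref() {
                // Estimating the size by serializing is wasteful but keeps the sketch short.
                bytes += serde_json::to_vec(&object).map(|buf| buf.len()).unwrap_or(0);
                chunk.push(object);
                if bytes >= max_bytes {
                    break;
                }
            }
            if chunk.is_empty() {
                None
            } else {
                Some(chunk)
            }
        })
    }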