diff --git a/milli/src/update/new/indexer/document_deletion.rs b/milli/src/update/new/indexer/document_deletion.rs
index c16299e9a..5e43b5816 100644
--- a/milli/src/update/new/indexer/document_deletion.rs
+++ b/milli/src/update/new/indexer/document_deletion.rs
@@ -28,7 +28,7 @@ impl<'p> DocumentChanges<'p> for DocumentDeletion {
     fn document_changes(
         self,
         param: Self::Parameter,
-    ) -> Result<impl ParallelIterator<Item = Result<Option<DocumentChange>>> + 'p> {
+    ) -> Result<impl ParallelIterator<Item = Result<DocumentChange>> + 'p> {
         let (index, fields, primary_key) = param;
         let items = Arc::new(ItemsPool::new(|| index.read_txn().map_err(crate::Error::from)));
         Ok(self.to_delete.into_iter().par_bridge().map_with(items, |items, docid| {
@@ -42,11 +42,11 @@ impl<'p> DocumentChanges<'p> for DocumentDeletion {
                     .into()),
                 }?;
 
-                Ok(Some(DocumentChange::Deletion(Deletion::create(
+                Ok(DocumentChange::Deletion(Deletion::create(
                     docid,
                     external_docid,
                     current.boxed(),
-                ))))
+                )))
             })
         }))
     }
diff --git a/milli/src/update/new/indexer/document_operation.rs b/milli/src/update/new/indexer/document_operation.rs
index 5d9755211..26228c354 100644
--- a/milli/src/update/new/indexer/document_operation.rs
+++ b/milli/src/update/new/indexer/document_operation.rs
@@ -76,7 +76,7 @@ impl<'p> DocumentChanges<'p> for DocumentOperation {
     fn document_changes(
         self,
         param: Self::Parameter,
-    ) -> Result<impl ParallelIterator<Item = Result<Option<DocumentChange>>> + 'p> {
+    ) -> Result<impl ParallelIterator<Item = Result<DocumentChange>> + 'p> {
         let (index, rtxn, fields_ids_map, primary_key) = param;
 
         let documents_ids = index.documents_ids(rtxn)?;
@@ -170,27 +170,85 @@ impl<'p> DocumentChanges<'p> for DocumentOperation {
             }
         }
 
-        Ok(docids_version_offsets.into_par_iter().map_with(
-            Arc::new(ItemsPool::new(|| index.read_txn().map_err(crate::Error::from))),
-            move |context_pool, (external_docid, (internal_docid, operations))| {
-                context_pool.with(|rtxn| {
-                    use IndexDocumentsMethod as Idm;
-                    let document_merge_function = match self.index_documents_method {
-                        Idm::ReplaceDocuments => merge_document_for_replacements,
-                        Idm::UpdateDocuments => merge_document_for_updates,
-                    };
+        Ok(docids_version_offsets
+            .into_par_iter()
+            .map_with(
+                Arc::new(ItemsPool::new(|| index.read_txn().map_err(crate::Error::from))),
+                move |context_pool, (external_docid, (internal_docid, operations))| {
+                    context_pool.with(|rtxn| {
+                        use IndexDocumentsMethod as Idm;
+                        let document_merge_function = match self.index_documents_method {
+                            Idm::ReplaceDocuments => merge_document_for_replacements,
+                            Idm::UpdateDocuments => merge_document_for_updates,
+                        };
 
-                    document_merge_function(
-                        rtxn,
-                        index,
-                        fields_ids_map,
-                        internal_docid,
-                        external_docid,
-                        &operations,
-                    )
-                })
-            },
-        ))
+                        document_merge_function(
+                            rtxn,
+                            index,
+                            fields_ids_map,
+                            internal_docid,
+                            external_docid,
+                            &operations,
+                        )
+                    })
+                },
+            )
+            .filter_map(Result::transpose))
+    }
+}
+
+/// Returns only the most recent version of a document based on the updates from the payloads.
+///
+/// This function is only meant to be used when doing a replacement and not an update.
+fn merge_document_for_replacements(
+    rtxn: &RoTxn,
+    index: &Index,
+    fields_ids_map: &FieldsIdsMap,
+    docid: DocumentId,
+    external_docid: String,
+    operations: &[InnerDocOp],
+) -> Result<Option<DocumentChange>> {
+    let current = index.documents.remap_data_type::<Bytes>().get(rtxn, &docid)?;
+    let current: Option<&KvReaderFieldId> = current.map(Into::into);
+
+    match operations.last() {
+        Some(InnerDocOp::Addition(DocumentOffset { content, offset })) => {
+            let reader = DocumentsBatchReader::from_reader(Cursor::new(content.as_ref()))?;
+            let (mut cursor, batch_index) = reader.into_cursor_and_fields_index();
+            let update = cursor.get(*offset)?.expect("must exists");
+
+            let mut document_entries = Vec::new();
+            update.into_iter().for_each(|(k, v)| {
+                let field_name = batch_index.name(k).unwrap();
+                let id = fields_ids_map.id(field_name).unwrap();
+                document_entries.push((id, v));
+            });
+
+            document_entries.sort_unstable_by_key(|(id, _)| *id);
+
+            let mut writer = KvWriterFieldId::memory();
+            document_entries.into_iter().for_each(|(id, value)| writer.insert(id, value).unwrap());
+            let new = writer.into_boxed();
+
+            match current {
+                Some(current) => {
+                    let update = Update::create(docid, external_docid, current.boxed(), new);
+                    Ok(Some(DocumentChange::Update(update)))
+                }
+                None => {
+                    let insertion = Insertion::create(docid, external_docid, new);
+                    Ok(Some(DocumentChange::Insertion(insertion)))
+                }
+            }
+        }
+        Some(InnerDocOp::Deletion) => match current {
+            Some(current) => {
+                let deletion = Deletion::create(docid, external_docid, current.boxed());
+                Ok(Some(DocumentChange::Deletion(deletion)))
+            }
+            None => Ok(None),
+        },
+        None => Ok(None), // but it's strange
     }
 }
 
@@ -271,58 +329,3 @@ fn merge_document_for_updates(
         }
     }
 }
-
-/// Returns only the most recent version of a document based on the updates from the payloads.
-///
-/// This function is only meant to be used when doing a replacement and not an update.
-fn merge_document_for_replacements(
-    rtxn: &RoTxn,
-    index: &Index,
-    fields_ids_map: &FieldsIdsMap,
-    docid: DocumentId,
-    external_docid: String,
-    operations: &[InnerDocOp],
-) -> Result<Option<DocumentChange>> {
-    let current = index.documents.remap_data_type::<Bytes>().get(rtxn, &docid)?;
-    let current: Option<&KvReaderFieldId> = current.map(Into::into);
-
-    match operations.last() {
-        Some(InnerDocOp::Addition(DocumentOffset { content, offset })) => {
-            let reader = DocumentsBatchReader::from_reader(Cursor::new(content.as_ref()))?;
-            let (mut cursor, batch_index) = reader.into_cursor_and_fields_index();
-            let update = cursor.get(*offset)?.expect("must exists");
-
-            let mut document_entries = Vec::new();
-            update.into_iter().for_each(|(k, v)| {
-                let field_name = batch_index.name(k).unwrap();
-                let id = fields_ids_map.id(field_name).unwrap();
-                document_entries.push((id, v));
-            });
-
-            document_entries.sort_unstable_by_key(|(id, _)| *id);
-
-            let mut writer = KvWriterFieldId::memory();
-            document_entries.into_iter().for_each(|(id, value)| writer.insert(id, value).unwrap());
-            let new = writer.into_boxed();
-
-            match current {
-                Some(current) => {
-                    let update = Update::create(docid, external_docid, current.boxed(), new);
-                    Ok(Some(DocumentChange::Update(update)))
-                }
-                None => {
-                    let insertion = Insertion::create(docid, external_docid, new);
-                    Ok(Some(DocumentChange::Insertion(insertion)))
-                }
-            }
-        }
-        Some(InnerDocOp::Deletion) => match current {
-            Some(current) => {
-                let deletion = Deletion::create(docid, external_docid, current.boxed());
-                Ok(Some(DocumentChange::Deletion(deletion)))
-            }
-            None => Ok(None),
-        },
-        None => Ok(None), // but it's strange
-    }
-}
diff --git a/milli/src/update/new/indexer/mod.rs b/milli/src/update/new/indexer/mod.rs
index 69ccc0451..ba4356288 100644
--- a/milli/src/update/new/indexer/mod.rs
+++ b/milli/src/update/new/indexer/mod.rs
@@ -1,4 +1,4 @@
-use std::thread;
+use std::thread::{self, Builder};
 
 use big_s::S;
 pub use document_deletion::DocumentDeletion;
@@ -28,7 +28,7 @@ pub trait DocumentChanges<'p> {
     fn document_changes(
         self,
         param: Self::Parameter,
-    ) -> Result<impl ParallelIterator<Item = Result<Option<DocumentChange>>> + 'p>;
+    ) -> Result<impl ParallelIterator<Item = Result<DocumentChange>> + 'p>;
 }
 
 /// This is the main function of this crate.
@@ -43,7 +43,7 @@ pub fn index(
     document_changes: PI,
 ) -> Result<()>
 where
-    PI: IntoParallelIterator<Item = Result<Option<DocumentChange>>> + Send,
+    PI: IntoParallelIterator<Item = Result<DocumentChange>> + Send,
     PI::Iter: Clone,
 {
     let (merger_sender, writer_receiver) = merger_writer_channel(100);
@@ -52,20 +52,19 @@ where
 
     thread::scope(|s| {
         // TODO manage the errors correctly
-        let handle =
-            thread::Builder::new().name(S("indexer-extractors")).spawn_scoped(s, || {
-                pool.in_place_scope(|_s| {
-                    // word docids
-                    // document_changes.into_par_iter().try_for_each(|_dc| Ok(()) as Result<_>)
-                    // let grenads = extractor_function(document_changes)?;
-                    // deladd_cbo_roaring_bitmap_sender.word_docids(grenads)?;
+        let handle = Builder::new().name(S("indexer-extractors")).spawn_scoped(s, || {
+            pool.in_place_scope(|_s| {
+                // word docids
+                // document_changes.into_par_iter().try_for_each(|_dc| Ok(()) as Result<_>)
+                // let grenads = extractor_function(document_changes)?;
+                // deladd_cbo_roaring_bitmap_sender.word_docids(grenads)?;
 
-                    Ok(()) as Result<_>
-                })
-            })?;
+                Ok(()) as Result<_>
+            })
+        })?;
 
         // TODO manage the errors correctly
-        let handle2 = thread::Builder::new().name(S("indexer-merger")).spawn_scoped(s, || {
+        let handle2 = Builder::new().name(S("indexer-merger")).spawn_scoped(s, || {
             let rtxn = index.read_txn().unwrap();
             merge_grenad_entries(merger_receiver, merger_sender, &rtxn, index)
         })?;
diff --git a/milli/src/update/new/indexer/partial_dump.rs b/milli/src/update/new/indexer/partial_dump.rs
index 11c9fbd0e..d324322a7 100644
--- a/milli/src/update/new/indexer/partial_dump.rs
+++ b/milli/src/update/new/indexer/partial_dump.rs
@@ -31,7 +31,7 @@ where
     fn document_changes(
         self,
         param: Self::Parameter,
-    ) -> Result<impl ParallelIterator<Item = Result<Option<DocumentChange>>> + 'p> {
+    ) -> Result<impl ParallelIterator<Item = Result<DocumentChange>> + 'p> {
         let (fields_ids_map, concurrent_available_ids, primary_key) = param;
 
         Ok(self.iter.into_iter().par_bridge().map(|object| {
@@ -68,7 +68,7 @@ where
             }?;
 
             let insertion = Insertion::create(docid, external_docid, document);
-            Ok(Some(DocumentChange::Insertion(insertion)))
+            Ok(DocumentChange::Insertion(insertion))
         }))
     }
 }
diff --git a/milli/src/update/new/indexer/update_by_function.rs b/milli/src/update/new/indexer/update_by_function.rs
index 035f95c02..91e1fd4ee 100644
--- a/milli/src/update/new/indexer/update_by_function.rs
+++ b/milli/src/update/new/indexer/update_by_function.rs
@@ -12,7 +12,7 @@ impl<'p> DocumentChanges<'p> for UpdateByFunction {
     fn document_changes(
         self,
         _param: Self::Parameter,
-    ) -> Result<impl ParallelIterator<Item = Result<Option<DocumentChange>>> + 'p> {
+    ) -> Result<impl ParallelIterator<Item = Result<DocumentChange>> + 'p> {
         todo!();
         Ok(vec![].into_par_iter())
     }
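
Note on the pattern used above: `DocumentOperation` is the only implementation whose merge functions can still decide to produce no change, so it keeps returning `Result<Option<DocumentChange>>` internally and flattens the parallel iterator with `.filter_map(Result::transpose)`, letting the other implementations return `Ok(DocumentChange::...)` directly. The snippet below is a minimal, standalone sketch of that flattening step; it uses only rayon and placeholder types (`u32`, `String`) rather than milli's `DocumentChange` and error types.

    // Turn a parallel iterator of `Result<Option<T>>` into one of `Result<T>`
    // by dropping the `Ok(None)` items while keeping values and errors.
    use rayon::prelude::*;

    fn main() {
        let raw: Vec<Result<Option<u32>, String>> = vec![
            Ok(Some(1)),
            Ok(None),
            Err("invalid document".to_string()),
            Ok(Some(3)),
        ];

        // `Result::<Option<T>, E>::transpose` yields `Option<Result<T, E>>`,
        // so `filter_map(Result::transpose)` removes the `Ok(None)` entries.
        let changes: Vec<Result<u32, String>> =
            raw.into_par_iter().filter_map(Result::transpose).collect();

        assert_eq!(changes, vec![Ok(1), Err("invalid document".to_string()), Ok(3)]);
    }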