From 867138f1661ff3d2924b0320fe49d34e577401f2 Mon Sep 17 00:00:00 2001 From: Louis Dureuil Date: Wed, 20 Nov 2024 15:06:49 +0100 Subject: [PATCH] Add SP to into_changes --- .../update/new/indexer/document_operation.rs | 27 +++++++++++++++---- crates/milli/src/update/new/steps.rs | 2 ++ 2 files changed, 24 insertions(+), 5 deletions(-) diff --git a/crates/milli/src/update/new/indexer/document_operation.rs b/crates/milli/src/update/new/indexer/document_operation.rs index e5148258d..76baf1580 100644 --- a/crates/milli/src/update/new/indexer/document_operation.rs +++ b/crates/milli/src/update/new/indexer/document_operation.rs @@ -9,10 +9,11 @@ use serde_json::value::RawValue; use serde_json::Deserializer; use super::super::document_change::DocumentChange; -use super::document_changes::{DocumentChangeContext, DocumentChanges}; +use super::document_changes::{DocumentChangeContext, DocumentChanges, Progress}; use super::retrieve_or_guess_primary_key; use crate::documents::PrimaryKey; use crate::update::new::document::Versions; +use crate::update::new::steps::Step; use crate::update::new::thread_local::MostlySend; use crate::update::new::{Deletion, Insertion, Update}; use crate::update::{AvailableIds, IndexDocumentsMethod}; @@ -41,7 +42,8 @@ impl<'pl> DocumentOperation<'pl> { self.operations.push(Payload::Deletion(to_delete)) } - pub fn into_changes( + #[allow(clippy::too_many_arguments)] + pub fn into_changes( self, indexer: &'pl Bump, index: &Index, @@ -49,9 +51,11 @@ impl<'pl> DocumentOperation<'pl> { primary_key_from_op: Option<&'pl str>, new_fields_ids_map: &mut FieldsIdsMap, must_stop_processing: &MSP, + send_progress: &SP, ) -> Result<(DocumentOperationChanges<'pl>, Vec, Option>)> where MSP: Fn() -> bool, + SP: Fn(Progress), { let Self { operations, method } = self; @@ -61,10 +65,18 @@ impl<'pl> DocumentOperation<'pl> { let mut docids_version_offsets = hashbrown::HashMap::new(); let mut primary_key = None; - for operation in operations { - if (must_stop_processing)() { + let payload_count = operations.len(); + + for (payload_index, operation) in operations.into_iter().enumerate() { + if must_stop_processing() { return Err(InternalError::AbortedIndexation.into()); } + send_progress(Progress::from_step_documents( + Step::PreparingPayloads, + payload_index as u32, + payload_count as u32, + )); + let mut bytes = 0; let result = match operation { Payload::Addition(payload) => extract_addition_payload_changes( @@ -102,10 +114,15 @@ impl<'pl> DocumentOperation<'pl> { Err(Error::UserError(user_error)) => Some(user_error), Err(e) => return Err(e), }; - operations_stats.push(PayloadStats { document_count, bytes, error }); } + send_progress(Progress::from_step_documents( + Step::PreparingPayloads, + payload_count as u32, + payload_count as u32, + )); + // TODO We must drain the HashMap into a Vec because rayon::hash_map::IntoIter: !Clone let mut docids_version_offsets: bumpalo::collections::vec::Vec<_> = docids_version_offsets.drain().collect_in(indexer); diff --git a/crates/milli/src/update/new/steps.rs b/crates/milli/src/update/new/steps.rs index 60a0c872b..7c2441933 100644 --- a/crates/milli/src/update/new/steps.rs +++ b/crates/milli/src/update/new/steps.rs @@ -3,6 +3,7 @@ use enum_iterator::Sequence; #[derive(Debug, Clone, Copy, PartialEq, Eq, Sequence)] #[repr(u16)] pub enum Step { + PreparingPayloads, ExtractingDocuments, ExtractingFacets, ExtractingWords, @@ -20,6 +21,7 @@ pub enum Step { impl Step { pub fn name(&self) -> &'static str { match self { + Step::PreparingPayloads => "preparing update file", Step::ExtractingDocuments => "extracting documents", Step::ExtractingFacets => "extracting facets", Step::ExtractingWords => "extracting words",