Add SP (send-progress callback) to into_changes

This commit is contained in:
Louis Dureuil 2024-11-20 15:06:49 +01:00
parent 567bd4538b
commit 867138f166
No known key found for this signature in database
2 changed files with 24 additions and 5 deletions

View File

@ -9,10 +9,11 @@ use serde_json::value::RawValue;
use serde_json::Deserializer; use serde_json::Deserializer;
use super::super::document_change::DocumentChange; use super::super::document_change::DocumentChange;
use super::document_changes::{DocumentChangeContext, DocumentChanges}; use super::document_changes::{DocumentChangeContext, DocumentChanges, Progress};
use super::retrieve_or_guess_primary_key; use super::retrieve_or_guess_primary_key;
use crate::documents::PrimaryKey; use crate::documents::PrimaryKey;
use crate::update::new::document::Versions; use crate::update::new::document::Versions;
use crate::update::new::steps::Step;
use crate::update::new::thread_local::MostlySend; use crate::update::new::thread_local::MostlySend;
use crate::update::new::{Deletion, Insertion, Update}; use crate::update::new::{Deletion, Insertion, Update};
use crate::update::{AvailableIds, IndexDocumentsMethod}; use crate::update::{AvailableIds, IndexDocumentsMethod};
@ -41,7 +42,8 @@ impl<'pl> DocumentOperation<'pl> {
self.operations.push(Payload::Deletion(to_delete)) self.operations.push(Payload::Deletion(to_delete))
} }
pub fn into_changes<MSP>( #[allow(clippy::too_many_arguments)]
pub fn into_changes<MSP, SP>(
self, self,
indexer: &'pl Bump, indexer: &'pl Bump,
index: &Index, index: &Index,
@ -49,9 +51,11 @@ impl<'pl> DocumentOperation<'pl> {
primary_key_from_op: Option<&'pl str>, primary_key_from_op: Option<&'pl str>,
new_fields_ids_map: &mut FieldsIdsMap, new_fields_ids_map: &mut FieldsIdsMap,
must_stop_processing: &MSP, must_stop_processing: &MSP,
send_progress: &SP,
) -> Result<(DocumentOperationChanges<'pl>, Vec<PayloadStats>, Option<PrimaryKey<'pl>>)> ) -> Result<(DocumentOperationChanges<'pl>, Vec<PayloadStats>, Option<PrimaryKey<'pl>>)>
where where
MSP: Fn() -> bool, MSP: Fn() -> bool,
SP: Fn(Progress),
{ {
let Self { operations, method } = self; let Self { operations, method } = self;
@ -61,10 +65,18 @@ impl<'pl> DocumentOperation<'pl> {
let mut docids_version_offsets = hashbrown::HashMap::new(); let mut docids_version_offsets = hashbrown::HashMap::new();
let mut primary_key = None; let mut primary_key = None;
for operation in operations { let payload_count = operations.len();
if (must_stop_processing)() {
for (payload_index, operation) in operations.into_iter().enumerate() {
if must_stop_processing() {
return Err(InternalError::AbortedIndexation.into()); return Err(InternalError::AbortedIndexation.into());
} }
send_progress(Progress::from_step_documents(
Step::PreparingPayloads,
payload_index as u32,
payload_count as u32,
));
let mut bytes = 0; let mut bytes = 0;
let result = match operation { let result = match operation {
Payload::Addition(payload) => extract_addition_payload_changes( Payload::Addition(payload) => extract_addition_payload_changes(
@ -102,10 +114,15 @@ impl<'pl> DocumentOperation<'pl> {
Err(Error::UserError(user_error)) => Some(user_error), Err(Error::UserError(user_error)) => Some(user_error),
Err(e) => return Err(e), Err(e) => return Err(e),
}; };
operations_stats.push(PayloadStats { document_count, bytes, error }); operations_stats.push(PayloadStats { document_count, bytes, error });
} }
send_progress(Progress::from_step_documents(
Step::PreparingPayloads,
payload_count as u32,
payload_count as u32,
));
// TODO We must drain the HashMap into a Vec because rayon::hash_map::IntoIter: !Clone // TODO We must drain the HashMap into a Vec because rayon::hash_map::IntoIter: !Clone
let mut docids_version_offsets: bumpalo::collections::vec::Vec<_> = let mut docids_version_offsets: bumpalo::collections::vec::Vec<_> =
docids_version_offsets.drain().collect_in(indexer); docids_version_offsets.drain().collect_in(indexer);

View File

@ -3,6 +3,7 @@ use enum_iterator::Sequence;
#[derive(Debug, Clone, Copy, PartialEq, Eq, Sequence)] #[derive(Debug, Clone, Copy, PartialEq, Eq, Sequence)]
#[repr(u16)] #[repr(u16)]
pub enum Step { pub enum Step {
PreparingPayloads,
ExtractingDocuments, ExtractingDocuments,
ExtractingFacets, ExtractingFacets,
ExtractingWords, ExtractingWords,
@ -20,6 +21,7 @@ pub enum Step {
impl Step { impl Step {
pub fn name(&self) -> &'static str { pub fn name(&self) -> &'static str {
match self { match self {
Step::PreparingPayloads => "preparing update file",
Step::ExtractingDocuments => "extracting documents", Step::ExtractingDocuments => "extracting documents",
Step::ExtractingFacets => "extracting facets", Step::ExtractingFacets => "extracting facets",
Step::ExtractingWords => "extracting words", Step::ExtractingWords => "extracting words",