diff --git a/crates/milli/src/index.rs b/crates/milli/src/index.rs index e65d96bb4..d77b6b1da 100644 --- a/crates/milli/src/index.rs +++ b/crates/milli/src/index.rs @@ -1752,7 +1752,7 @@ pub(crate) mod tests { let embedders = EmbeddingConfigs::default(); /// TODO: fetch configs from the index let mut indexer = - indexer::DocumentOperation::new(IndexDocumentsMethod::ReplaceDocuments); + indexer::DocumentOperation::new(self.index_documents_config.update_method); indexer.add_documents(&documents).unwrap(); let indexer_alloc = Bump::new(); @@ -1832,7 +1832,7 @@ pub(crate) mod tests { let embedders = EmbeddingConfigs::default(); let mut indexer = - indexer::DocumentOperation::new(IndexDocumentsMethod::ReplaceDocuments); + indexer::DocumentOperation::new(self.index_documents_config.update_method); let external_document_ids: Vec<_> = external_document_ids.iter().map(AsRef::as_ref).collect(); indexer.delete_documents(external_document_ids.as_slice()); @@ -2429,7 +2429,16 @@ pub(crate) mod tests { // And adding lots of documents afterwards instead of just one. // These extra subtests don't add much, but it's better than nothing. - index.add_documents(documents!([{ "primary_key": 38 }, { "primary_key": 39 }, { "primary_key": 41 }, { "primary_key": 40 }, { "primary_key": 41 }, { "primary_key": 42 }])).unwrap(); + index + .add_documents(documents!([ + { "primary_key": 38 }, + { "primary_key": 39 }, + { "primary_key": 41 }, + { "primary_key": 40 }, + { "primary_key": 41 }, + { "primary_key": 42 }, + ])) + .unwrap(); db_snap!(index, documents_ids, @"[0, 1, 2, 3, 4, 5, ]"); db_snap!(index, external_documents_ids, 7, @r###" diff --git a/crates/milli/src/update/new/indexer/document_operation.rs b/crates/milli/src/update/new/indexer/document_operation.rs index 5dd897d34..b9f81f28d 100644 --- a/crates/milli/src/update/new/indexer/document_operation.rs +++ b/crates/milli/src/update/new/indexer/document_operation.rs @@ -57,7 +57,9 @@ impl<'pl> DocumentOperation<'pl> { let mut primary_key = None; for operation in operations { - let (bytes, document_count, result) = match operation { + let mut bytes = 0; + let mut document_count = 0; + let result = match operation { Payload::Addition(payload) => extract_addition_payload_changes( indexer, index, @@ -66,6 +68,8 @@ impl<'pl> DocumentOperation<'pl> { &mut primary_key, new_fields_ids_map, &mut available_docids, + &mut bytes, + &mut document_count, &docids_version_offsets, method, payload, @@ -74,6 +78,7 @@ impl<'pl> DocumentOperation<'pl> { index, rtxn, &mut available_docids, + &mut document_count, &docids_version_offsets, method, to_delete, @@ -122,67 +127,38 @@ fn extract_addition_payload_changes<'r, 'pl: 'r>( primary_key: &mut Option>, new_fields_ids_map: &mut FieldsIdsMap, available_docids: &mut AvailableIds, + bytes: &mut u64, + number_of_documents: &mut u64, main_docids_version_offsets: &hashbrown::HashMap<&'pl str, PayloadOperations<'pl>>, method: MergeMethod, payload: &'pl [u8], -) -> (u64, u64, Result>>) { +) -> Result>> { let mut new_docids_version_offsets = hashbrown::HashMap::<&str, PayloadOperations<'pl>>::new(); /// TODO manage the error let mut previous_offset = 0; let mut iter = Deserializer::from_slice(payload).into_iter::<&RawValue>(); - loop { - let optdoc = match iter.next().transpose() { - Ok(optdoc) => optdoc, - Err(e) => { - return ( - payload.len() as u64, - new_docids_version_offsets.len() as u64, - Err(InternalError::SerdeJson(e).into()), - ) - } - }; + while let Some(doc) = iter.next().transpose().map_err(InternalError::SerdeJson)? { + *bytes = previous_offset as u64; + *number_of_documents = new_docids_version_offsets.len() as u64; // Only guess the primary key if it is the first document let retrieved_primary_key = if previous_offset == 0 { - let optdoc = match optdoc { - Some(doc) => match RawMap::from_raw_value(doc, indexer) { - Ok(docmap) => Some(docmap), - Err(error) => { - return ( - payload.len() as u64, - new_docids_version_offsets.len() as u64, - Err(Error::UserError(UserError::SerdeJson(error))), - ) - } - }, - None => None, - }; + let doc = + RawMap::from_raw_value(doc, indexer).map(Some).map_err(UserError::SerdeJson)?; let result = retrieve_or_guess_primary_key( rtxn, index, new_fields_ids_map, primary_key_from_op, - optdoc, + doc, ); let (pk, _has_been_changed) = match result { Ok(Ok(pk)) => pk, - Ok(Err(user_error)) => { - return ( - payload.len() as u64, - new_docids_version_offsets.len() as u64, - Err(Error::UserError(user_error)), - ) - } - Err(error) => { - return ( - payload.len() as u64, - new_docids_version_offsets.len() as u64, - Err(error), - ) - } + Ok(Err(user_error)) => return Err(Error::UserError(user_error)), + Err(error) => return Err(error), }; primary_key.get_or_insert(pk) @@ -190,20 +166,13 @@ fn extract_addition_payload_changes<'r, 'pl: 'r>( primary_key.as_ref().unwrap() }; - let doc = match optdoc { - Some(doc) => doc, - None => break, - }; - let external_id = match retrieved_primary_key.extract_fields_and_docid( doc, new_fields_ids_map, indexer, ) { Ok(edi) => edi, - Err(e) => { - return (payload.len() as u64, new_docids_version_offsets.len() as u64, Err(e)) - } + Err(e) => return Err(e), }; let external_id = external_id.to_de(); @@ -212,40 +181,38 @@ fn extract_addition_payload_changes<'r, 'pl: 'r>( match main_docids_version_offsets.get(external_id) { None => { - let (docid, is_new) = match index.external_documents_ids().get(rtxn, external_id) { - Ok(Some(docid)) => (docid, false), - Ok(None) => ( - match available_docids.next() { - Some(docid) => docid, - None => { - return ( - payload.len() as u64, - new_docids_version_offsets.len() as u64, - Err(UserError::DocumentLimitReached.into()), - ) - } - }, - true, - ), - Err(e) => { - return ( - payload.len() as u64, - new_docids_version_offsets.len() as u64, - Err(e.into()), - ) - } - }; - - match new_docids_version_offsets.entry(external_id) { - Entry::Occupied(mut entry) => entry.get_mut().push_addition(document_offset), - Entry::Vacant(entry) => { - entry.insert(PayloadOperations::new_addition( - method, - docid, - is_new, - document_offset, - )); - } + match index.external_documents_ids().get(rtxn, external_id) { + Ok(Some(docid)) => match new_docids_version_offsets.entry(external_id) { + Entry::Occupied(mut entry) => { + entry.get_mut().push_addition(document_offset) + } + Entry::Vacant(entry) => { + entry.insert(PayloadOperations::new_addition( + method, + docid, + false, // is new + document_offset, + )); + } + }, + Ok(None) => match new_docids_version_offsets.entry(external_id) { + Entry::Occupied(mut entry) => { + entry.get_mut().push_addition(document_offset) + } + Entry::Vacant(entry) => { + let docid = match available_docids.next() { + Some(docid) => docid, + None => return Err(UserError::DocumentLimitReached.into()), + }; + entry.insert(PayloadOperations::new_addition( + method, + docid, + true, // is new + document_offset, + )); + } + }, + Err(e) => return Err(e.into()), } } Some(payload_operations) => match new_docids_version_offsets.entry(external_id) { @@ -264,46 +231,49 @@ fn extract_addition_payload_changes<'r, 'pl: 'r>( previous_offset = iter.byte_offset(); } - (payload.len() as u64, new_docids_version_offsets.len() as u64, Ok(new_docids_version_offsets)) + Ok(new_docids_version_offsets) } fn extract_deletion_payload_changes<'s, 'pl: 's>( index: &Index, rtxn: &RoTxn, available_docids: &mut AvailableIds, + number_of_documents: &mut u64, main_docids_version_offsets: &hashbrown::HashMap<&'s str, PayloadOperations<'pl>>, method: MergeMethod, to_delete: &'pl [&'pl str], -) -> (u64, u64, Result>>) { +) -> Result>> { let mut new_docids_version_offsets = hashbrown::HashMap::<&str, PayloadOperations<'pl>>::new(); - let mut document_count = 0; for external_id in to_delete { match main_docids_version_offsets.get(external_id) { None => { - let (docid, is_new) = match index.external_documents_ids().get(rtxn, external_id) { - Ok(Some(docid)) => (docid, false), - Ok(None) => ( - match available_docids.next() { - Some(docid) => docid, - None => { - return ( - 0, - new_docids_version_offsets.len() as u64, - Err(UserError::DocumentLimitReached.into()), - ) + match index.external_documents_ids().get(rtxn, external_id) { + Ok(Some(docid)) => { + match new_docids_version_offsets.entry(external_id) { + Entry::Occupied(mut entry) => entry.get_mut().push_deletion(), + Entry::Vacant(entry) => { + entry.insert(PayloadOperations::new_deletion( + method, docid, false, // is new + )); } - }, - true, - ), - Err(e) => return (0, new_docids_version_offsets.len() as u64, Err(e.into())), - }; - - match new_docids_version_offsets.entry(external_id) { - Entry::Occupied(mut entry) => entry.get_mut().push_deletion(), - Entry::Vacant(entry) => { - entry.insert(PayloadOperations::new_deletion(method, docid, is_new)); + } } + Ok(None) => { + let docid = match available_docids.next() { + Some(docid) => docid, + None => return Err(UserError::DocumentLimitReached.into()), + }; + match new_docids_version_offsets.entry(external_id) { + Entry::Occupied(mut entry) => entry.get_mut().push_deletion(), + Entry::Vacant(entry) => { + entry.insert(PayloadOperations::new_deletion( + method, docid, true, // is new + )); + } + } + } + Err(e) => return Err(e.into()), } } Some(payload_operations) => match new_docids_version_offsets.entry(external_id) { @@ -317,10 +287,10 @@ fn extract_deletion_payload_changes<'s, 'pl: 's>( } }, } - document_count += 1; + *number_of_documents += 1; } - (0, document_count, Ok(new_docids_version_offsets)) + Ok(new_docids_version_offsets) } fn merge_version_offsets<'s, 'pl>(