From 54d07a8da3854a99263c6c74096d09fd139d5f20 Mon Sep 17 00:00:00 2001 From: Louis Dureuil Date: Mon, 30 Oct 2023 14:47:51 +0100 Subject: [PATCH] Update field distribution taking into account both deletions and additions --- milli/src/update/index_documents/transform.rs | 65 ++++++++++++++----- 1 file changed, 50 insertions(+), 15 deletions(-) diff --git a/milli/src/update/index_documents/transform.rs b/milli/src/update/index_documents/transform.rs index 98079e07b..05940822a 100644 --- a/milli/src/update/index_documents/transform.rs +++ b/milli/src/update/index_documents/transform.rs @@ -1,5 +1,6 @@ use std::borrow::Cow; -use std::collections::hash_map::Entry; +use std::collections::btree_map::Entry as BEntry; +use std::collections::hash_map::Entry as HEntry; use std::collections::{HashMap, HashSet}; use std::fs::File; use std::io::{Read, Seek}; @@ -20,7 +21,7 @@ use super::{IndexDocumentsMethod, IndexerConfig}; use crate::documents::{DocumentsBatchIndex, EnrichedDocument, EnrichedDocumentsBatchReader}; use crate::error::{Error, InternalError, UserError}; use crate::index::{db_name, main_key}; -use crate::update::del_add::into_del_add_obkv; +use crate::update::del_add::{into_del_add_obkv, DelAdd, KvReaderDelAdd}; use crate::update::{AvailableDocumentsIds, ClearDocuments, UpdateIndexingStep}; use crate::{ FieldDistribution, FieldId, FieldIdMapMissingEntry, FieldsIdsMap, Index, Result, BEU32, @@ -219,8 +220,8 @@ impl<'a, 'i> Transform<'a, 'i> { let mut original_docid = None; let docid = match self.new_external_documents_ids_builder.entry((*external_id).into()) { - Entry::Occupied(entry) => *entry.get() as u32, - Entry::Vacant(entry) => { + HEntry::Occupied(entry) => *entry.get() as u32, + HEntry::Vacant(entry) => { let docid = match external_documents_ids.get(wtxn, entry.key())? { Some(docid) => { // If it was already in the list of replaced documents it means it was deleted @@ -388,7 +389,7 @@ impl<'a, 'i> Transform<'a, 'i> { .entry((*to_remove).into()) { // if the document was added in a previous iteration of the transform we make it as deleted in the sorters. - Entry::Occupied(entry) => { + HEntry::Occupied(entry) => { let doc_id = *entry.get() as u32; document_sorter_buffer.clear(); document_sorter_buffer.push(Operation::Deletion as u8); @@ -405,7 +406,7 @@ impl<'a, 'i> Transform<'a, 'i> { entry.remove_entry(); true } - Entry::Vacant(_) => false, + HEntry::Vacant(_) => false, }; // If the document was already in the db we mark it as a `to_delete` document. @@ -657,8 +658,6 @@ impl<'a, 'i> Transform<'a, 'i> { // 2. Add all the new documents to the field distribution let mut field_distribution = self.index.field_distribution(wtxn)?; - self.remove_deleted_documents_from_field_distribution(wtxn, &mut field_distribution)?; - // Here we are going to do the document count + field distribution + `write_into_stream_writer` let mut iter = self.original_sorter.into_stream_merger_iter()?; // used only for the callback @@ -678,13 +677,49 @@ impl<'a, 'i> Transform<'a, 'i> { // We increment all the field of the current document in the field distribution. let obkv = KvReader::new(val); - for (key, _) in obkv.iter() { - let name = - self.fields_ids_map.name(key).ok_or(FieldIdMapMissingEntry::FieldId { - field_id: key, - process: "Computing field distribution in transform.", - })?; - *field_distribution.entry(name.to_string()).or_insert(0) += 1; + for (key, value) in obkv.iter() { + let reader = KvReaderDelAdd::new(value); + match (reader.get(DelAdd::Deletion), reader.get(DelAdd::Addition)) { + (None, None) => {} + (None, Some(_)) => { + // New field + let name = self.fields_ids_map.name(key).ok_or( + FieldIdMapMissingEntry::FieldId { + field_id: key, + process: "Computing field distribution in transform.", + }, + )?; + *field_distribution.entry(name.to_string()).or_insert(0) += 1; + } + (Some(_), None) => { + // Field removed + let name = self.fields_ids_map.name(key).ok_or( + FieldIdMapMissingEntry::FieldId { + field_id: key, + process: "Computing field distribution in transform.", + }, + )?; + match field_distribution.entry(name.to_string()) { + BEntry::Vacant(_) => { /* Bug? trying to remove a non-existing field */ + } + BEntry::Occupied(mut entry) => { + // attempt to remove one + match entry.get_mut().checked_sub(1) { + Some(new_val) => { + *entry.get_mut() = new_val; + } + None => { + // was 0, remove field from distribution + entry.remove(); + } + } + } + } + } + (Some(_), Some(_)) => { + // Value change, no field distribution change + } + } } writer.insert(key, val)?; }