Update field distribution taking into account both deletions and additions

This commit is contained in:
Louis Dureuil 2023-10-30 14:47:51 +01:00
parent 58690dfb19
commit 54d07a8da3
No known key found for this signature in database

View File

@ -1,5 +1,6 @@
use std::borrow::Cow; use std::borrow::Cow;
use std::collections::hash_map::Entry; use std::collections::btree_map::Entry as BEntry;
use std::collections::hash_map::Entry as HEntry;
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet};
use std::fs::File; use std::fs::File;
use std::io::{Read, Seek}; use std::io::{Read, Seek};
@ -20,7 +21,7 @@ use super::{IndexDocumentsMethod, IndexerConfig};
use crate::documents::{DocumentsBatchIndex, EnrichedDocument, EnrichedDocumentsBatchReader}; use crate::documents::{DocumentsBatchIndex, EnrichedDocument, EnrichedDocumentsBatchReader};
use crate::error::{Error, InternalError, UserError}; use crate::error::{Error, InternalError, UserError};
use crate::index::{db_name, main_key}; use crate::index::{db_name, main_key};
use crate::update::del_add::into_del_add_obkv; use crate::update::del_add::{into_del_add_obkv, DelAdd, KvReaderDelAdd};
use crate::update::{AvailableDocumentsIds, ClearDocuments, UpdateIndexingStep}; use crate::update::{AvailableDocumentsIds, ClearDocuments, UpdateIndexingStep};
use crate::{ use crate::{
FieldDistribution, FieldId, FieldIdMapMissingEntry, FieldsIdsMap, Index, Result, BEU32, FieldDistribution, FieldId, FieldIdMapMissingEntry, FieldsIdsMap, Index, Result, BEU32,
@ -219,8 +220,8 @@ impl<'a, 'i> Transform<'a, 'i> {
let mut original_docid = None; let mut original_docid = None;
let docid = match self.new_external_documents_ids_builder.entry((*external_id).into()) { let docid = match self.new_external_documents_ids_builder.entry((*external_id).into()) {
Entry::Occupied(entry) => *entry.get() as u32, HEntry::Occupied(entry) => *entry.get() as u32,
Entry::Vacant(entry) => { HEntry::Vacant(entry) => {
let docid = match external_documents_ids.get(wtxn, entry.key())? { let docid = match external_documents_ids.get(wtxn, entry.key())? {
Some(docid) => { Some(docid) => {
// If it was already in the list of replaced documents it means it was deleted // If it was already in the list of replaced documents it means it was deleted
@ -388,7 +389,7 @@ impl<'a, 'i> Transform<'a, 'i> {
.entry((*to_remove).into()) .entry((*to_remove).into())
{ {
// if the document was added in a previous iteration of the transform we make it as deleted in the sorters. // if the document was added in a previous iteration of the transform we make it as deleted in the sorters.
Entry::Occupied(entry) => { HEntry::Occupied(entry) => {
let doc_id = *entry.get() as u32; let doc_id = *entry.get() as u32;
document_sorter_buffer.clear(); document_sorter_buffer.clear();
document_sorter_buffer.push(Operation::Deletion as u8); document_sorter_buffer.push(Operation::Deletion as u8);
@ -405,7 +406,7 @@ impl<'a, 'i> Transform<'a, 'i> {
entry.remove_entry(); entry.remove_entry();
true true
} }
Entry::Vacant(_) => false, HEntry::Vacant(_) => false,
}; };
// If the document was already in the db we mark it as a `to_delete` document. // If the document was already in the db we mark it as a `to_delete` document.
@ -657,8 +658,6 @@ impl<'a, 'i> Transform<'a, 'i> {
// 2. Add all the new documents to the field distribution // 2. Add all the new documents to the field distribution
let mut field_distribution = self.index.field_distribution(wtxn)?; let mut field_distribution = self.index.field_distribution(wtxn)?;
self.remove_deleted_documents_from_field_distribution(wtxn, &mut field_distribution)?;
// Here we are going to do the document count + field distribution + `write_into_stream_writer` // Here we are going to do the document count + field distribution + `write_into_stream_writer`
let mut iter = self.original_sorter.into_stream_merger_iter()?; let mut iter = self.original_sorter.into_stream_merger_iter()?;
// used only for the callback // used only for the callback
@ -678,13 +677,49 @@ impl<'a, 'i> Transform<'a, 'i> {
// We increment all the field of the current document in the field distribution. // We increment all the field of the current document in the field distribution.
let obkv = KvReader::new(val); let obkv = KvReader::new(val);
for (key, _) in obkv.iter() { for (key, value) in obkv.iter() {
let name = let reader = KvReaderDelAdd::new(value);
self.fields_ids_map.name(key).ok_or(FieldIdMapMissingEntry::FieldId { match (reader.get(DelAdd::Deletion), reader.get(DelAdd::Addition)) {
field_id: key, (None, None) => {}
process: "Computing field distribution in transform.", (None, Some(_)) => {
})?; // New field
*field_distribution.entry(name.to_string()).or_insert(0) += 1; let name = self.fields_ids_map.name(key).ok_or(
FieldIdMapMissingEntry::FieldId {
field_id: key,
process: "Computing field distribution in transform.",
},
)?;
*field_distribution.entry(name.to_string()).or_insert(0) += 1;
}
(Some(_), None) => {
// Field removed
let name = self.fields_ids_map.name(key).ok_or(
FieldIdMapMissingEntry::FieldId {
field_id: key,
process: "Computing field distribution in transform.",
},
)?;
match field_distribution.entry(name.to_string()) {
BEntry::Vacant(_) => { /* Bug? trying to remove a non-existing field */
}
BEntry::Occupied(mut entry) => {
// attempt to remove one
match entry.get_mut().checked_sub(1) {
Some(new_val) => {
*entry.get_mut() = new_val;
}
None => {
// was 0, remove field from distribution
entry.remove();
}
}
}
}
}
(Some(_), Some(_)) => {
// Value change, no field distribution change
}
}
} }
writer.insert(key, val)?; writer.insert(key, val)?;
} }