Stop reindexing already indexed documents

Tamo 2022-07-05 19:15:16 +02:00 committed by Irevoire
parent e1bc610d27
commit f156d7dd3b
No known key found for this signature in database
GPG Key ID: 7A6A970C96104F1B


@@ -200,24 +200,26 @@ impl<'a, 'i> Transform<'a, 'i> {
     let mut original_docid = None;
-    let docid = match self.new_external_documents_ids_builder.entry(external_id.into()) {
-        Entry::Occupied(entry) => *entry.get() as u32,
-        Entry::Vacant(entry) => {
-            // If the document was already in the db we mark it as a replaced document.
-            // It'll be deleted later. We keep its original docid to insert it in the grenad.
-            if let Some(docid) = external_documents_ids.get(entry.key()) {
-                self.replaced_documents_ids.insert(docid);
-                original_docid = Some(docid);
-            }
-            let docid = self
-                .available_documents_ids
-                .next()
-                .ok_or(UserError::DocumentLimitReached)?;
-            entry.insert(docid as u64);
-            docid
-        }
-    };
+    let docid =
+        match self.new_external_documents_ids_builder.entry(external_id.clone().into()) {
+            Entry::Occupied(entry) => *entry.get() as u32,
+            Entry::Vacant(entry) => {
+                // If the document was already in the db we mark it as a replaced document.
+                // It'll be deleted later. We keep its original docid to insert it in the grenad.
+                if let Some(docid) = external_documents_ids.get(entry.key()) {
+                    self.replaced_documents_ids.insert(docid);
+                    original_docid = Some(docid);
+                }
+                let docid = self
+                    .available_documents_ids
+                    .next()
+                    .ok_or(UserError::DocumentLimitReached)?;
+                entry.insert(docid as u64);
+                docid
+            }
+        };
+    let mut skip_insertion = false;
     if let Some(original_docid) = original_docid {
         let original_key = BEU32::new(original_docid);
         let base_obkv = self
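
The hunk above resolves every incoming external document id to an internal docid through the entry API of `new_external_documents_ids_builder`: an id already seen in the batch reuses its docid, while a new one either marks the stored document it replaces (keeping its docid in `original_docid`) or draws a fresh docid from `available_documents_ids`. As a minimal standalone sketch of that entry-based allocation pattern (all names and types below are illustrative, not the milli ones):

use std::collections::{hash_map::Entry, HashMap};

// Hypothetical allocator mirroring the lookup above: a known external id
// maps to its existing internal docid, an unknown one gets the next free id.
fn resolve_docid(
    mapping: &mut HashMap<String, u32>,
    next_free: &mut u32,
    external_id: &str,
) -> u32 {
    match mapping.entry(external_id.to_owned()) {
        // Already seen: reuse the same internal docid.
        Entry::Occupied(entry) => *entry.get(),
        // First occurrence: allocate a fresh docid and remember it.
        Entry::Vacant(entry) => {
            let docid = *next_free;
            *next_free += 1;
            entry.insert(docid);
            docid
        }
    }
}

fn main() {
    let mut mapping = HashMap::new();
    let mut next_free = 0;
    assert_eq!(resolve_docid(&mut mapping, &mut next_free, "doc-a"), 0);
    assert_eq!(resolve_docid(&mut mapping, &mut next_free, "doc-b"), 1);
    // Re-submitting "doc-a" reuses docid 0 instead of allocating a new one.
    assert_eq!(resolve_docid(&mut mapping, &mut next_free, "doc-a"), 0);
}

The entry API performs the lookup and the insertion with a single hash of the key, which matters in a loop that runs once per document.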
@@ -230,24 +232,39 @@ impl<'a, 'i> Transform<'a, 'i> {
             key: None,
         })?;
-        // we associate the base document with the new key, everything will get merged later.
-        self.original_sorter.insert(&docid.to_be_bytes(), base_obkv)?;
-        match self.flatten_from_fields_ids_map(KvReader::new(&base_obkv))? {
-            Some(buffer) => self.flattened_sorter.insert(docid.to_be_bytes(), &buffer)?,
-            None => self.flattened_sorter.insert(docid.to_be_bytes(), base_obkv)?,
-        }
+        // we check if the two documents are exactly equal. If it's the case we can skip this document entirely
+        if base_obkv == obkv_buffer {
+            // we're not replacing anything
+            self.replaced_documents_ids.remove(original_docid);
+            // and we need to put back the original id as it was before
+            self.new_external_documents_ids_builder.remove(&*external_id);
+            skip_insertion = true;
+        } else {
+            // we associate the base document with the new key, everything will get merged later.
+            self.original_sorter.insert(&docid.to_be_bytes(), base_obkv)?;
+            match self.flatten_from_fields_ids_map(KvReader::new(&base_obkv))? {
+                Some(buffer) => {
+                    self.flattened_sorter.insert(docid.to_be_bytes(), &buffer)?
+                }
+                None => self.flattened_sorter.insert(docid.to_be_bytes(), base_obkv)?,
+            }
+        }
     } else {
         self.new_documents_ids.insert(docid);
     }
-    // We use the extracted/generated user id as the key for this document.
-    self.original_sorter.insert(&docid.to_be_bytes(), &obkv_buffer)?;
-    documents_count += 1;
-    match self.flatten_from_fields_ids_map(KvReader::new(&obkv_buffer))? {
-        Some(buffer) => self.flattened_sorter.insert(docid.to_be_bytes(), &buffer)?,
-        None => self.flattened_sorter.insert(docid.to_be_bytes(), &obkv_buffer)?,
-    }
+    if !skip_insertion {
+        // We use the extracted/generated user id as the key for this document.
+        self.original_sorter.insert(&docid.to_be_bytes(), obkv_buffer.clone())?;
+        match self.flatten_from_fields_ids_map(KvReader::new(&obkv_buffer))? {
+            Some(buffer) => self.flattened_sorter.insert(docid.to_be_bytes(), &buffer)?,
+            None => {
+                self.flattened_sorter.insert(docid.to_be_bytes(), obkv_buffer.clone())?
+            }
+        }
+    }
+    documents_count += 1;
     progress_callback(UpdateIndexingStep::RemapDocumentAddition {
         documents_seen: documents_count,
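
This hunk is the optimization named in the commit title: when the stored document (`base_obkv`) and the incoming one (`obkv_buffer`) are byte-for-byte identical, the document is unmarked as replaced, its external id entry is removed again, and `skip_insertion` keeps it out of both sorters, so nothing downstream reindexes it. A sketch of the same skip-if-unchanged idea against a toy key-value store (the types are assumptions for illustration, not milli's):

use std::collections::{HashMap, HashSet};

// Toy index: stored documents are opaque byte blobs keyed by internal docid.
struct TinyIndex {
    documents: HashMap<u32, Vec<u8>>,
    replaced_ids: HashSet<u32>,
}

impl TinyIndex {
    // Upsert `new_doc` under `docid`, but skip all work when the stored
    // bytes are identical; this plays the same role as `skip_insertion` above.
    fn upsert(&mut self, docid: u32, new_doc: Vec<u8>) -> bool {
        if self.documents.get(&docid).map(Vec::as_slice) == Some(new_doc.as_slice()) {
            // Identical payload: undo any "replaced" bookkeeping and bail out.
            self.replaced_ids.remove(&docid);
            return false;
        }
        if self.documents.contains_key(&docid) {
            self.replaced_ids.insert(docid);
        }
        self.documents.insert(docid, new_doc);
        true
    }
}

Comparing the two serialized buffers is a single memcmp, which is cheap next to re-running the whole extraction pipeline for an unchanged document.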
@@ -394,6 +411,11 @@ impl<'a, 'i> Transform<'a, 'i> {
     rtxn: &RoTxn,
     field_distribution: &mut FieldDistribution,
 ) -> Result<()> {
+    println!(
+        "The following documents are going to be deleted from the field distribution: {:?}",
+        self.replaced_documents_ids
+    );
     for deleted_docid in self.replaced_documents_ids.iter() {
         let obkv = self.index.documents.get(rtxn, &BEU32::new(deleted_docid))?.ok_or(
             InternalError::DatabaseMissingEntry { db_name: db_name::DOCUMENTS, key: None },
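
The `println!` added here traces the loop that rebalances `field_distribution`: every replaced docid is loaded from the documents database so that the old version's fields can be subtracted from the per-field document counts before the replacement is indexed. A rough sketch of that decrement step, assuming a map-shaped distribution (the real `FieldDistribution` bookkeeping may differ in detail):

use std::collections::BTreeMap;

// Assumed shape: field name -> number of documents containing that field.
type FieldDistribution = BTreeMap<String, u64>;

// Subtract one replaced document's fields from the distribution,
// dropping any entry whose count reaches zero.
fn remove_from_distribution(distribution: &mut FieldDistribution, fields: &[&str]) {
    for &field in fields {
        let now_empty = match distribution.get_mut(field) {
            Some(count) => {
                *count = count.saturating_sub(1);
                *count == 0
            }
            None => false,
        };
        if now_empty {
            distribution.remove(field);
        }
    }
}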
@@ -461,6 +483,7 @@ impl<'a, 'i> Transform<'a, 'i> {
     let mut documents_count = 0;
     while let Some((key, val)) = iter.next()? {
+        println!("Reading a document");
         // send a callback to show at which step we are
         documents_count += 1;
         progress_callback(UpdateIndexingStep::ComputeIdsAndMergeDocuments {