Change the original document sorter key from the internal docid to a concatenation of the internal and the external docid

This commit is contained in:
ManyTheFish 2023-10-31 16:46:16 +01:00
parent 44e9033b3a
commit 12323d610e
2 changed files with 69 additions and 49 deletions

View File

@ -1387,6 +1387,8 @@ mod tests {
index.add_documents(documents!({ "a" : { "b" : { "c" : 1 }}})).unwrap(); index.add_documents(documents!({ "a" : { "b" : { "c" : 1 }}})).unwrap();
let rtxn = index.read_txn().unwrap(); let rtxn = index.read_txn().unwrap();
let all_documents_count = index.all_documents(&rtxn).unwrap().count();
assert_eq!(all_documents_count, 1);
let external_documents_ids = index.external_documents_ids(); let external_documents_ids = index.external_documents_ids();
assert!(external_documents_ids.get(&rtxn, "1").unwrap().is_some()); assert!(external_documents_ids.get(&rtxn, "1").unwrap().is_some());
} }

View File

@ -174,7 +174,8 @@ impl<'a, 'i> Transform<'a, 'i> {
self.fields_ids_map.insert(&primary_key).ok_or(UserError::AttributeLimitReached)?; self.fields_ids_map.insert(&primary_key).ok_or(UserError::AttributeLimitReached)?;
let mut obkv_buffer = Vec::new(); let mut obkv_buffer = Vec::new();
let mut document_sorter_buffer = Vec::new(); let mut document_sorter_value_buffer = Vec::new();
let mut document_sorter_key_buffer = Vec::new();
let mut documents_count = 0; let mut documents_count = 0;
let mut docid_buffer: Vec<u8> = Vec::new(); let mut docid_buffer: Vec<u8> = Vec::new();
let mut field_buffer: Vec<(u16, Cow<[u8]>)> = Vec::new(); let mut field_buffer: Vec<(u16, Cow<[u8]>)> = Vec::new();
@ -268,57 +269,64 @@ impl<'a, 'i> Transform<'a, 'i> {
// we associate the base document with the new key, everything will get merged later. // we associate the base document with the new key, everything will get merged later.
let keep_original_version = let keep_original_version =
self.index_documents_method == IndexDocumentsMethod::UpdateDocuments; self.index_documents_method == IndexDocumentsMethod::UpdateDocuments;
document_sorter_buffer.clear(); document_sorter_key_buffer.clear();
document_sorter_buffer.push(Operation::Addition as u8); document_sorter_key_buffer.extend_from_slice(&docid.to_be_bytes());
document_sorter_key_buffer.extend_from_slice(external_id.as_bytes());
document_sorter_value_buffer.clear();
document_sorter_value_buffer.push(Operation::Addition as u8);
into_del_add_obkv( into_del_add_obkv(
KvReaderU16::new(base_obkv), KvReaderU16::new(base_obkv),
true, true,
keep_original_version, keep_original_version,
&mut document_sorter_buffer, &mut document_sorter_value_buffer,
)?; )?;
self.original_sorter.insert(docid.to_be_bytes(), &document_sorter_buffer)?; self.original_sorter.insert(&document_sorter_key_buffer, &document_sorter_buffer)?;
let base_obkv = KvReader::new(base_obkv); let base_obkv = KvReader::new(base_obkv);
if let Some(flattened_obkv) = self.flatten_from_fields_ids_map(base_obkv)? { if let Some(flattened_obkv) = self.flatten_from_fields_ids_map(base_obkv)? {
// we recreate our buffer with the flattened documents // we recreate our buffer with the flattened documents
document_sorter_buffer.clear(); document_sorter_value_buffer.clear();
document_sorter_buffer.push(Operation::Addition as u8); document_sorter_value_buffer.push(Operation::Addition as u8);
into_del_add_obkv( into_del_add_obkv(
KvReaderU16::new(&flattened_obkv), KvReaderU16::new(&flattened_obkv),
true, true,
keep_original_version, keep_original_version,
&mut document_sorter_buffer, &mut document_sorter_value_buffer,
)?; )?;
} }
self.flattened_sorter.insert(docid.to_be_bytes(), &document_sorter_buffer)?; self.flattened_sorter.insert(docid.to_be_bytes(), &document_sorter_value_buffer)?;
} }
} }
if !skip_insertion { if !skip_insertion {
self.new_documents_ids.insert(docid); self.new_documents_ids.insert(docid);
document_sorter_buffer.clear(); document_sorter_key_buffer.clear();
document_sorter_buffer.push(Operation::Addition as u8); document_sorter_key_buffer.extend_from_slice(&docid.to_be_bytes());
document_sorter_key_buffer.extend_from_slice(external_id.as_bytes());
document_sorter_value_buffer.clear();
document_sorter_value_buffer.push(Operation::Addition as u8);
into_del_add_obkv( into_del_add_obkv(
KvReaderU16::new(&obkv_buffer), KvReaderU16::new(&obkv_buffer),
false, false,
true, true,
&mut document_sorter_buffer, &mut document_sorter_value_buffer,
)?; )?;
// We use the extracted/generated user id as the key for this document. // We use the extracted/generated user id as the key for this document.
self.original_sorter.insert(docid.to_be_bytes(), &document_sorter_buffer)?; self.original_sorter
.insert(&document_sorter_key_buffer, &document_sorter_value_buffer)?;
let flattened_obkv = KvReader::new(&obkv_buffer); let flattened_obkv = KvReader::new(&obkv_buffer);
if let Some(obkv) = self.flatten_from_fields_ids_map(flattened_obkv)? { if let Some(obkv) = self.flatten_from_fields_ids_map(flattened_obkv)? {
document_sorter_buffer.clear(); document_sorter_value_buffer.clear();
document_sorter_buffer.push(Operation::Addition as u8); document_sorter_value_buffer.push(Operation::Addition as u8);
into_del_add_obkv( into_del_add_obkv(
KvReaderU16::new(&obkv), KvReaderU16::new(&obkv),
false, false,
true, true,
&mut document_sorter_buffer, &mut document_sorter_value_buffer,
)? )?
} }
self.flattened_sorter.insert(docid.to_be_bytes(), &document_sorter_buffer)?; self.flattened_sorter.insert(docid.to_be_bytes(), &document_sorter_value_buffer)?;
} }
documents_count += 1; documents_count += 1;
@ -372,37 +380,42 @@ impl<'a, 'i> Transform<'a, 'i> {
let external_documents_ids = self.index.external_documents_ids(); let external_documents_ids = self.index.external_documents_ids();
let mut documents_deleted = 0; let mut documents_deleted = 0;
let mut document_sorter_buffer = Vec::new(); let mut document_sorter_value_buffer = Vec::new();
let mut document_sorter_key_buffer = Vec::new();
for to_remove in to_remove { for to_remove in to_remove {
if should_abort() { if should_abort() {
return Err(Error::InternalError(InternalError::AbortedIndexation)); return Err(Error::InternalError(InternalError::AbortedIndexation));
} }
// Check if the document has been added in the current indexing process. // Check if the document has been added in the current indexing process.
let deleted_from_current = match self let deleted_from_current =
.new_external_documents_ids_builder match self.new_external_documents_ids_builder.entry((*to_remove).into()) {
.entry((*to_remove).into()) // if the document was added in a previous iteration of the transform we make it as deleted in the sorters.
{ HEntry::Occupied(entry) => {
// if the document was added in a previous iteration of the transform we make it as deleted in the sorters. let docid = *entry.get() as u32;
HEntry::Occupied(entry) => { // Key is the concatenation of the internal docid and the external one.
let doc_id = *entry.get() as u32; document_sorter_key_buffer.clear();
document_sorter_buffer.clear(); document_sorter_key_buffer.extend_from_slice(&docid.to_be_bytes());
document_sorter_buffer.push(Operation::Deletion as u8); document_sorter_key_buffer.extend_from_slice(to_remove.as_bytes());
obkv::KvWriterU16::new(&mut document_sorter_buffer).finish().unwrap(); document_sorter_value_buffer.clear();
self.original_sorter.insert(doc_id.to_be_bytes(), &document_sorter_buffer)?; document_sorter_value_buffer.push(Operation::Deletion as u8);
self.flattened_sorter.insert(doc_id.to_be_bytes(), &document_sorter_buffer)?; obkv::KvWriterU16::new(&mut document_sorter_value_buffer).finish().unwrap();
self.original_sorter
.insert(&document_sorter_key_buffer, &document_sorter_value_buffer)?;
self.flattened_sorter
.insert(docid.to_be_bytes(), &document_sorter_value_buffer)?;
// we must NOT update the list of replaced_documents_ids // we must NOT update the list of replaced_documents_ids
// Either: // Either:
// 1. It's already in it and there is nothing to do // 1. It's already in it and there is nothing to do
// 2. It wasn't in it because the document was created by a previous batch and since // 2. It wasn't in it because the document was created by a previous batch and since
// we're removing it there is nothing to do. // we're removing it there is nothing to do.
self.new_documents_ids.remove(doc_id); self.new_documents_ids.remove(docid);
entry.remove_entry(); entry.remove_entry();
true true
} }
HEntry::Vacant(_) => false, HEntry::Vacant(_) => false,
}; };
// If the document was already in the db we mark it as a `to_delete` document. // If the document was already in the db we mark it as a `to_delete` document.
// Then we push the document in sorters in deletion mode. // Then we push the document in sorters in deletion mode.
@ -422,31 +435,36 @@ impl<'a, 'i> Transform<'a, 'i> {
key: None, key: None,
})?; })?;
// Key is the concatenation of the internal docid and the external one.
document_sorter_key_buffer.clear();
document_sorter_key_buffer.extend_from_slice(&docid.to_be_bytes());
document_sorter_key_buffer.extend_from_slice(to_remove.as_bytes());
// push it as to delete in the original_sorter // push it as to delete in the original_sorter
document_sorter_buffer.clear(); document_sorter_value_buffer.clear();
document_sorter_buffer.push(Operation::Deletion as u8); document_sorter_value_buffer.push(Operation::Deletion as u8);
into_del_add_obkv( into_del_add_obkv(
KvReaderU16::new(base_obkv), KvReaderU16::new(base_obkv),
true, true,
false, false,
&mut document_sorter_buffer, &mut document_sorter_value_buffer,
)?; )?;
self.original_sorter.insert(docid.to_be_bytes(), &document_sorter_buffer)?; self.original_sorter
.insert(&document_sorter_key_buffer, &document_sorter_value_buffer)?;
// flatten it and push it as to delete in the flattened_sorter // flatten it and push it as to delete in the flattened_sorter
let flattened_obkv = KvReader::new(base_obkv); let flattened_obkv = KvReader::new(base_obkv);
if let Some(obkv) = self.flatten_from_fields_ids_map(flattened_obkv)? { if let Some(obkv) = self.flatten_from_fields_ids_map(flattened_obkv)? {
// we recreate our buffer with the flattened documents // we recreate our buffer with the flattened documents
document_sorter_buffer.clear(); document_sorter_value_buffer.clear();
document_sorter_buffer.push(Operation::Deletion as u8); document_sorter_value_buffer.push(Operation::Deletion as u8);
into_del_add_obkv( into_del_add_obkv(
KvReaderU16::new(&obkv), KvReaderU16::new(&obkv),
true, true,
false, false,
&mut document_sorter_buffer, &mut document_sorter_value_buffer,
)?; )?;
} }
self.flattened_sorter.insert(docid.to_be_bytes(), &document_sorter_buffer)?; self.flattened_sorter.insert(docid.to_be_bytes(), &document_sorter_value_buffer)?;
true true
} }