mirror of
https://github.com/meilisearch/meilisearch.git
synced 2024-11-23 02:27:40 +08:00
Make the transform struct return diff-based documents obkvs
This commit is contained in:
parent
e8f8730467
commit
096d7705c7
60
milli/src/update/del_add.rs
Normal file
60
milli/src/update/del_add.rs
Normal file
@ -0,0 +1,60 @@
|
|||||||
|
use obkv::Key;
|
||||||
|
|
||||||
|
pub type KvWriterDelAdd<W> = obkv::KvWriter<W, DelAdd>;
|
||||||
|
pub type KvReaderDelAdd<'a> = obkv::KvReader<'a, DelAdd>;
|
||||||
|
|
||||||
|
/// DelAdd defines the new value to add in the database and old value to delete from the database.
|
||||||
|
///
|
||||||
|
/// Its used in an OBKV to be serialized in grenad files.
|
||||||
|
#[repr(u8)]
|
||||||
|
#[derive(Clone, Copy, PartialOrd, PartialEq, Debug)]
|
||||||
|
pub enum DelAdd {
|
||||||
|
Deletion = 0,
|
||||||
|
Addition = 1,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Key for DelAdd {
|
||||||
|
const BYTES_SIZE: usize = std::mem::size_of::<DelAdd>();
|
||||||
|
type BYTES = [u8; Self::BYTES_SIZE];
|
||||||
|
|
||||||
|
fn to_be_bytes(&self) -> Self::BYTES {
|
||||||
|
u8::to_be_bytes(*self as u8)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn from_be_bytes(array: Self::BYTES) -> Self {
|
||||||
|
match u8::from_be_bytes(array) {
|
||||||
|
0 => Self::Deletion,
|
||||||
|
1 => Self::Addition,
|
||||||
|
otherwise => unreachable!("DelAdd has only 2 variants, unknown variant: {}", otherwise),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Creates a Kv<K, Kv<DelAdd, value>> from Kv<K, value>
|
||||||
|
///
|
||||||
|
/// if deletion is `true`, the value will be inserted behind a DelAdd::Deletion key.
|
||||||
|
/// if addition is `true`, the value will be inserted behind a DelAdd::Addition key.
|
||||||
|
/// if both deletion and addition are `true, the value will be inserted in both keys.
|
||||||
|
pub fn into_del_add_obkv<K: obkv::Key + PartialOrd>(
|
||||||
|
reader: obkv::KvReader<K>,
|
||||||
|
deletion: bool,
|
||||||
|
addition: bool,
|
||||||
|
buffer: &mut Vec<u8>,
|
||||||
|
) -> Result<(), std::io::Error> {
|
||||||
|
let mut writer = obkv::KvWriter::new(buffer);
|
||||||
|
let mut value_buffer = Vec::new();
|
||||||
|
for (key, value) in reader.iter() {
|
||||||
|
value_buffer.clear();
|
||||||
|
let mut value_writer = KvWriterDelAdd::new(&mut value_buffer);
|
||||||
|
if deletion {
|
||||||
|
value_writer.insert(DelAdd::Deletion, value)?;
|
||||||
|
}
|
||||||
|
if addition {
|
||||||
|
value_writer.insert(DelAdd::Addition, value)?;
|
||||||
|
}
|
||||||
|
value_writer.finish()?;
|
||||||
|
writer.insert(key, &value_buffer)?;
|
||||||
|
}
|
||||||
|
|
||||||
|
writer.finish()
|
||||||
|
}
|
@ -6,6 +6,7 @@ use std::result::Result as StdResult;
|
|||||||
use roaring::RoaringBitmap;
|
use roaring::RoaringBitmap;
|
||||||
|
|
||||||
use crate::heed_codec::CboRoaringBitmapCodec;
|
use crate::heed_codec::CboRoaringBitmapCodec;
|
||||||
|
use crate::update::del_add::{DelAdd, KvReaderDelAdd, KvWriterDelAdd};
|
||||||
use crate::update::index_documents::transform::Operation;
|
use crate::update::index_documents::transform::Operation;
|
||||||
use crate::Result;
|
use crate::Result;
|
||||||
|
|
||||||
@ -76,55 +77,118 @@ pub fn keep_latest_obkv<'a>(_key: &[u8], obkvs: &[Cow<'a, [u8]>]) -> Result<Cow<
|
|||||||
Ok(obkvs.last().unwrap().clone())
|
Ok(obkvs.last().unwrap().clone())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn merge_two_obkvs(base: obkv::KvReaderU16, update: obkv::KvReaderU16, buffer: &mut Vec<u8>) {
|
pub fn merge_two_del_add_obkvs(
|
||||||
|
base: obkv::KvReaderU16,
|
||||||
|
update: obkv::KvReaderU16,
|
||||||
|
merge_additions: bool,
|
||||||
|
buffer: &mut Vec<u8>,
|
||||||
|
) {
|
||||||
use itertools::merge_join_by;
|
use itertools::merge_join_by;
|
||||||
use itertools::EitherOrBoth::{Both, Left, Right};
|
use itertools::EitherOrBoth::{Both, Left, Right};
|
||||||
|
|
||||||
buffer.clear();
|
buffer.clear();
|
||||||
|
|
||||||
let mut writer = obkv::KvWriter::new(buffer);
|
let mut writer = obkv::KvWriter::new(buffer);
|
||||||
|
let mut value_buffer = Vec::new();
|
||||||
for eob in merge_join_by(base.iter(), update.iter(), |(b, _), (u, _)| b.cmp(u)) {
|
for eob in merge_join_by(base.iter(), update.iter(), |(b, _), (u, _)| b.cmp(u)) {
|
||||||
match eob {
|
match eob {
|
||||||
Both(_, (k, v)) | Left((k, v)) | Right((k, v)) => writer.insert(k, v).unwrap(),
|
Left((k, v)) => {
|
||||||
|
if merge_additions {
|
||||||
|
writer.insert(k, v).unwrap()
|
||||||
|
} else {
|
||||||
|
// If merge_additions is false, recreate an obkv keeping the deletions only.
|
||||||
|
value_buffer.clear();
|
||||||
|
let mut value_writer = KvWriterDelAdd::new(&mut value_buffer);
|
||||||
|
let base_reader = KvReaderDelAdd::new(v);
|
||||||
|
|
||||||
|
if let Some(deletion) = base_reader.get(DelAdd::Deletion) {
|
||||||
|
value_writer.insert(DelAdd::Deletion, deletion).unwrap();
|
||||||
|
value_writer.finish().unwrap();
|
||||||
|
writer.insert(k, &value_buffer).unwrap()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Right((k, v)) => writer.insert(k, v).unwrap(),
|
||||||
|
Both((k, base), (_, update)) => {
|
||||||
|
// merge deletions and additions.
|
||||||
|
value_buffer.clear();
|
||||||
|
let mut value_writer = KvWriterDelAdd::new(&mut value_buffer);
|
||||||
|
let base_reader = KvReaderDelAdd::new(base);
|
||||||
|
let update_reader = KvReaderDelAdd::new(update);
|
||||||
|
|
||||||
|
// keep newest deletion.
|
||||||
|
if let Some(deletion) =
|
||||||
|
update_reader.get(DelAdd::Deletion).or(base_reader.get(DelAdd::Deletion))
|
||||||
|
{
|
||||||
|
value_writer.insert(DelAdd::Deletion, deletion).unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
// keep base addition only if merge_additions is true.
|
||||||
|
let base_addition =
|
||||||
|
merge_additions.then(|| base_reader.get(DelAdd::Addition)).flatten();
|
||||||
|
// keep newest addition.
|
||||||
|
if let Some(addition) = update_reader.get(DelAdd::Addition).or(base_addition) {
|
||||||
|
value_writer.insert(DelAdd::Addition, addition).unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
value_writer.finish().unwrap();
|
||||||
|
writer.insert(k, &value_buffer).unwrap()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
writer.finish().unwrap();
|
writer.finish().unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Merge all the obks in the order we see them.
|
/// Merge all the obkvs from the newest to the oldest.
|
||||||
pub fn merge_obkvs_and_operations<'a>(
|
fn inner_merge_del_add_obkvs<'a>(
|
||||||
|
obkvs: &[Cow<'a, [u8]>],
|
||||||
|
merge_additions: bool,
|
||||||
|
) -> Result<Cow<'a, [u8]>> {
|
||||||
|
// pop the newest operation from the list.
|
||||||
|
let (newest, obkvs) = obkvs.split_last().unwrap();
|
||||||
|
// keep the operation type for the returned value.
|
||||||
|
let newest_operation_type = newest[0];
|
||||||
|
|
||||||
|
// treat the newest obkv as the starting point of the merge.
|
||||||
|
let mut acc_operation_type = newest_operation_type;
|
||||||
|
let mut acc = newest[1..].to_vec();
|
||||||
|
let mut buffer = Vec::new();
|
||||||
|
// reverse iter from the most recent to the oldest.
|
||||||
|
for current in obkvs.into_iter().rev() {
|
||||||
|
// if in the previous iteration there was a complete deletion,
|
||||||
|
// stop the merge process.
|
||||||
|
if acc_operation_type == Operation::Deletion as u8 {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
let newest = obkv::KvReader::new(&acc);
|
||||||
|
let oldest = obkv::KvReader::new(¤t[1..]);
|
||||||
|
merge_two_del_add_obkvs(oldest, newest, merge_additions, &mut buffer);
|
||||||
|
|
||||||
|
// we want the result of the merge into our accumulator.
|
||||||
|
std::mem::swap(&mut acc, &mut buffer);
|
||||||
|
acc_operation_type = current[0];
|
||||||
|
}
|
||||||
|
|
||||||
|
acc.insert(0, newest_operation_type);
|
||||||
|
Ok(Cow::from(acc))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Merge all the obkvs from the newest to the oldest.
|
||||||
|
pub fn obkvs_merge_additions_and_deletions<'a>(
|
||||||
_key: &[u8],
|
_key: &[u8],
|
||||||
obkvs: &[Cow<'a, [u8]>],
|
obkvs: &[Cow<'a, [u8]>],
|
||||||
) -> Result<Cow<'a, [u8]>> {
|
) -> Result<Cow<'a, [u8]>> {
|
||||||
// [add, add, delete, add, add]
|
inner_merge_del_add_obkvs(obkvs, true)
|
||||||
// we can ignore everything that happened before the last delete.
|
}
|
||||||
let starting_position =
|
|
||||||
obkvs.iter().rposition(|obkv| obkv[0] == Operation::Deletion as u8).unwrap_or(0);
|
|
||||||
|
|
||||||
// [add, add, delete]
|
/// Merge all the obkvs deletions from the newest to the oldest and keep only the newest additions.
|
||||||
// if the last operation was a deletion then we simply return the deletion
|
pub fn obkvs_keep_last_addition_merge_deletions<'a>(
|
||||||
if starting_position == obkvs.len() - 1 && obkvs.last().unwrap()[0] == Operation::Deletion as u8
|
_key: &[u8],
|
||||||
{
|
obkvs: &[Cow<'a, [u8]>],
|
||||||
return Ok(obkvs[obkvs.len() - 1].clone());
|
) -> Result<Cow<'a, [u8]>> {
|
||||||
}
|
inner_merge_del_add_obkvs(obkvs, false)
|
||||||
let mut buffer = Vec::new();
|
|
||||||
|
|
||||||
// (add, add, delete) [add, add]
|
|
||||||
// in the other case, no deletion will be encountered during the merge
|
|
||||||
let mut ret =
|
|
||||||
obkvs[starting_position..].iter().cloned().fold(Vec::new(), |mut acc, current| {
|
|
||||||
let first = obkv::KvReader::new(&acc);
|
|
||||||
let second = obkv::KvReader::new(¤t[1..]);
|
|
||||||
merge_two_obkvs(first, second, &mut buffer);
|
|
||||||
|
|
||||||
// we want the result of the merge into our accumulator
|
|
||||||
std::mem::swap(&mut acc, &mut buffer);
|
|
||||||
acc
|
|
||||||
});
|
|
||||||
|
|
||||||
ret.insert(0, Operation::Addition as u8);
|
|
||||||
Ok(Cow::from(ret))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn merge_cbo_roaring_bitmaps<'a>(
|
pub fn merge_cbo_roaring_bitmaps<'a>(
|
||||||
|
@ -14,8 +14,8 @@ pub use grenad_helpers::{
|
|||||||
};
|
};
|
||||||
pub use merge_functions::{
|
pub use merge_functions::{
|
||||||
concat_u32s_array, keep_first, keep_latest_obkv, merge_btreeset_string,
|
concat_u32s_array, keep_first, keep_latest_obkv, merge_btreeset_string,
|
||||||
merge_cbo_roaring_bitmaps, merge_obkvs_and_operations, merge_roaring_bitmaps, merge_two_obkvs,
|
merge_cbo_roaring_bitmaps, merge_roaring_bitmaps, obkvs_keep_last_addition_merge_deletions,
|
||||||
serialize_roaring_bitmap, MergeFn,
|
obkvs_merge_additions_and_deletions, serialize_roaring_bitmap, MergeFn,
|
||||||
};
|
};
|
||||||
|
|
||||||
use crate::MAX_WORD_LENGTH;
|
use crate::MAX_WORD_LENGTH;
|
||||||
|
@ -7,18 +7,20 @@ use std::io::{Read, Seek};
|
|||||||
use fxhash::FxHashMap;
|
use fxhash::FxHashMap;
|
||||||
use heed::RoTxn;
|
use heed::RoTxn;
|
||||||
use itertools::Itertools;
|
use itertools::Itertools;
|
||||||
use obkv::{KvReader, KvWriter};
|
use obkv::{KvReader, KvReaderU16, KvWriter};
|
||||||
use roaring::RoaringBitmap;
|
use roaring::RoaringBitmap;
|
||||||
use serde_json::Value;
|
use serde_json::Value;
|
||||||
use smartstring::SmartString;
|
use smartstring::SmartString;
|
||||||
|
|
||||||
use super::helpers::{
|
use super::helpers::{
|
||||||
create_sorter, create_writer, keep_latest_obkv, merge_obkvs_and_operations, MergeFn,
|
create_sorter, create_writer, obkvs_keep_last_addition_merge_deletions,
|
||||||
|
obkvs_merge_additions_and_deletions, MergeFn,
|
||||||
};
|
};
|
||||||
use super::{IndexDocumentsMethod, IndexerConfig};
|
use super::{IndexDocumentsMethod, IndexerConfig};
|
||||||
use crate::documents::{DocumentsBatchIndex, EnrichedDocument, EnrichedDocumentsBatchReader};
|
use crate::documents::{DocumentsBatchIndex, EnrichedDocument, EnrichedDocumentsBatchReader};
|
||||||
use crate::error::{Error, InternalError, UserError};
|
use crate::error::{Error, InternalError, UserError};
|
||||||
use crate::index::{db_name, main_key};
|
use crate::index::{db_name, main_key};
|
||||||
|
use crate::update::del_add::into_del_add_obkv;
|
||||||
use crate::update::{AvailableDocumentsIds, ClearDocuments, UpdateIndexingStep};
|
use crate::update::{AvailableDocumentsIds, ClearDocuments, UpdateIndexingStep};
|
||||||
use crate::{
|
use crate::{
|
||||||
FieldDistribution, FieldId, FieldIdMapMissingEntry, FieldsIdsMap, Index, Result, BEU32,
|
FieldDistribution, FieldId, FieldIdMapMissingEntry, FieldsIdsMap, Index, Result, BEU32,
|
||||||
@ -106,8 +108,8 @@ impl<'a, 'i> Transform<'a, 'i> {
|
|||||||
// We must choose the appropriate merge function for when two or more documents
|
// We must choose the appropriate merge function for when two or more documents
|
||||||
// with the same user id must be merged or fully replaced in the same batch.
|
// with the same user id must be merged or fully replaced in the same batch.
|
||||||
let merge_function = match index_documents_method {
|
let merge_function = match index_documents_method {
|
||||||
IndexDocumentsMethod::ReplaceDocuments => keep_latest_obkv,
|
IndexDocumentsMethod::ReplaceDocuments => obkvs_keep_last_addition_merge_deletions,
|
||||||
IndexDocumentsMethod::UpdateDocuments => merge_obkvs_and_operations,
|
IndexDocumentsMethod::UpdateDocuments => obkvs_merge_additions_and_deletions,
|
||||||
};
|
};
|
||||||
|
|
||||||
// We initialize the sorter with the user indexing settings.
|
// We initialize the sorter with the user indexing settings.
|
||||||
@ -223,19 +225,21 @@ impl<'a, 'i> Transform<'a, 'i> {
|
|||||||
let docid = match self.new_external_documents_ids_builder.entry((*external_id).into()) {
|
let docid = match self.new_external_documents_ids_builder.entry((*external_id).into()) {
|
||||||
Entry::Occupied(entry) => *entry.get() as u32,
|
Entry::Occupied(entry) => *entry.get() as u32,
|
||||||
Entry::Vacant(entry) => {
|
Entry::Vacant(entry) => {
|
||||||
// If the document was already in the db we mark it as a replaced document.
|
let docid = match external_documents_ids.get(entry.key()) {
|
||||||
// It'll be deleted later.
|
Some(docid) => {
|
||||||
if let Some(docid) = external_documents_ids.get(entry.key()) {
|
// If it was already in the list of replaced documents it means it was deleted
|
||||||
// If it was already in the list of replaced documents it means it was deleted
|
// by the remove_document method. We should starts as if it never existed.
|
||||||
// by the remove_document method. We should starts as if it never existed.
|
if self.replaced_documents_ids.insert(docid) {
|
||||||
if self.replaced_documents_ids.insert(docid) {
|
original_docid = Some(docid);
|
||||||
original_docid = Some(docid);
|
}
|
||||||
|
|
||||||
|
docid
|
||||||
}
|
}
|
||||||
}
|
None => self
|
||||||
let docid = self
|
.available_documents_ids
|
||||||
.available_documents_ids
|
.next()
|
||||||
.next()
|
.ok_or(UserError::DocumentLimitReached)?,
|
||||||
.ok_or(UserError::DocumentLimitReached)?;
|
};
|
||||||
entry.insert(docid as u64);
|
entry.insert(docid as u64);
|
||||||
docid
|
docid
|
||||||
}
|
}
|
||||||
@ -263,16 +267,28 @@ impl<'a, 'i> Transform<'a, 'i> {
|
|||||||
skip_insertion = true;
|
skip_insertion = true;
|
||||||
} else {
|
} else {
|
||||||
// we associate the base document with the new key, everything will get merged later.
|
// we associate the base document with the new key, everything will get merged later.
|
||||||
|
let keep_original_version =
|
||||||
|
self.index_documents_method == IndexDocumentsMethod::UpdateDocuments;
|
||||||
document_sorter_buffer.clear();
|
document_sorter_buffer.clear();
|
||||||
document_sorter_buffer.push(Operation::Addition as u8);
|
document_sorter_buffer.push(Operation::Addition as u8);
|
||||||
document_sorter_buffer.extend_from_slice(base_obkv);
|
into_del_add_obkv(
|
||||||
|
KvReaderU16::new(base_obkv),
|
||||||
|
true,
|
||||||
|
keep_original_version,
|
||||||
|
&mut document_sorter_buffer,
|
||||||
|
)?;
|
||||||
self.original_sorter.insert(docid.to_be_bytes(), &document_sorter_buffer)?;
|
self.original_sorter.insert(docid.to_be_bytes(), &document_sorter_buffer)?;
|
||||||
match self.flatten_from_fields_ids_map(KvReader::new(base_obkv))? {
|
match self.flatten_from_fields_ids_map(KvReader::new(base_obkv))? {
|
||||||
Some(flattened_obkv) => {
|
Some(flattened_obkv) => {
|
||||||
// we recreate our buffer with the flattened documents
|
// we recreate our buffer with the flattened documents
|
||||||
document_sorter_buffer.clear();
|
document_sorter_buffer.clear();
|
||||||
document_sorter_buffer.push(Operation::Addition as u8);
|
document_sorter_buffer.push(Operation::Addition as u8);
|
||||||
document_sorter_buffer.extend_from_slice(&flattened_obkv);
|
into_del_add_obkv(
|
||||||
|
KvReaderU16::new(&flattened_obkv),
|
||||||
|
true,
|
||||||
|
keep_original_version,
|
||||||
|
&mut document_sorter_buffer,
|
||||||
|
)?;
|
||||||
self.flattened_sorter
|
self.flattened_sorter
|
||||||
.insert(docid.to_be_bytes(), &document_sorter_buffer)?
|
.insert(docid.to_be_bytes(), &document_sorter_buffer)?
|
||||||
}
|
}
|
||||||
@ -288,7 +304,12 @@ impl<'a, 'i> Transform<'a, 'i> {
|
|||||||
|
|
||||||
document_sorter_buffer.clear();
|
document_sorter_buffer.clear();
|
||||||
document_sorter_buffer.push(Operation::Addition as u8);
|
document_sorter_buffer.push(Operation::Addition as u8);
|
||||||
document_sorter_buffer.extend_from_slice(&obkv_buffer);
|
into_del_add_obkv(
|
||||||
|
KvReaderU16::new(&obkv_buffer),
|
||||||
|
false,
|
||||||
|
true,
|
||||||
|
&mut document_sorter_buffer,
|
||||||
|
)?;
|
||||||
// We use the extracted/generated user id as the key for this document.
|
// We use the extracted/generated user id as the key for this document.
|
||||||
self.original_sorter.insert(docid.to_be_bytes(), &document_sorter_buffer)?;
|
self.original_sorter.insert(docid.to_be_bytes(), &document_sorter_buffer)?;
|
||||||
|
|
||||||
@ -296,7 +317,12 @@ impl<'a, 'i> Transform<'a, 'i> {
|
|||||||
Some(flattened_obkv) => {
|
Some(flattened_obkv) => {
|
||||||
document_sorter_buffer.clear();
|
document_sorter_buffer.clear();
|
||||||
document_sorter_buffer.push(Operation::Addition as u8);
|
document_sorter_buffer.push(Operation::Addition as u8);
|
||||||
document_sorter_buffer.extend_from_slice(&flattened_obkv);
|
into_del_add_obkv(
|
||||||
|
KvReaderU16::new(&flattened_obkv),
|
||||||
|
false,
|
||||||
|
true,
|
||||||
|
&mut document_sorter_buffer,
|
||||||
|
)?;
|
||||||
self.flattened_sorter
|
self.flattened_sorter
|
||||||
.insert(docid.to_be_bytes(), &document_sorter_buffer)?
|
.insert(docid.to_be_bytes(), &document_sorter_buffer)?
|
||||||
}
|
}
|
||||||
@ -354,19 +380,25 @@ impl<'a, 'i> Transform<'a, 'i> {
|
|||||||
let external_documents_ids = self.index.external_documents_ids(wtxn)?;
|
let external_documents_ids = self.index.external_documents_ids(wtxn)?;
|
||||||
|
|
||||||
let mut documents_deleted = 0;
|
let mut documents_deleted = 0;
|
||||||
|
let mut document_sorter_buffer = Vec::new();
|
||||||
for to_remove in to_remove {
|
for to_remove in to_remove {
|
||||||
if should_abort() {
|
if should_abort() {
|
||||||
return Err(Error::InternalError(InternalError::AbortedIndexation));
|
return Err(Error::InternalError(InternalError::AbortedIndexation));
|
||||||
}
|
}
|
||||||
|
|
||||||
match self.new_external_documents_ids_builder.entry((*to_remove).into()) {
|
// Check if the document has been added in the current indexing process.
|
||||||
|
let deleted_from_current = match self
|
||||||
|
.new_external_documents_ids_builder
|
||||||
|
.entry((*to_remove).into())
|
||||||
|
{
|
||||||
// if the document was added in a previous iteration of the transform we make it as deleted in the sorters.
|
// if the document was added in a previous iteration of the transform we make it as deleted in the sorters.
|
||||||
Entry::Occupied(entry) => {
|
Entry::Occupied(entry) => {
|
||||||
let doc_id = *entry.get() as u32;
|
let doc_id = *entry.get() as u32;
|
||||||
self.original_sorter
|
document_sorter_buffer.clear();
|
||||||
.insert(doc_id.to_be_bytes(), [Operation::Deletion as u8])?;
|
document_sorter_buffer.push(Operation::Deletion as u8);
|
||||||
self.flattened_sorter
|
obkv::KvWriterU16::new(&mut document_sorter_buffer).finish().unwrap();
|
||||||
.insert(doc_id.to_be_bytes(), [Operation::Deletion as u8])?;
|
self.original_sorter.insert(doc_id.to_be_bytes(), &document_sorter_buffer)?;
|
||||||
|
self.flattened_sorter.insert(doc_id.to_be_bytes(), &document_sorter_buffer)?;
|
||||||
|
|
||||||
// we must NOT update the list of replaced_documents_ids
|
// we must NOT update the list of replaced_documents_ids
|
||||||
// Either:
|
// Either:
|
||||||
@ -375,21 +407,69 @@ impl<'a, 'i> Transform<'a, 'i> {
|
|||||||
// we're removing it there is nothing to do.
|
// we're removing it there is nothing to do.
|
||||||
self.new_documents_ids.remove(doc_id);
|
self.new_documents_ids.remove(doc_id);
|
||||||
entry.remove_entry();
|
entry.remove_entry();
|
||||||
|
true
|
||||||
}
|
}
|
||||||
Entry::Vacant(entry) => {
|
Entry::Vacant(_) => false,
|
||||||
// If the document was already in the db we mark it as a `to_delete` document.
|
|
||||||
// It'll be deleted later. We don't need to push anything to the sorters.
|
|
||||||
if let Some(docid) = external_documents_ids.get(entry.key()) {
|
|
||||||
self.replaced_documents_ids.insert(docid);
|
|
||||||
} else {
|
|
||||||
// if the document is nowehere to be found, there is nothing to do and we must NOT
|
|
||||||
// increment the count of documents_deleted
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
};
|
||||||
|
|
||||||
documents_deleted += 1;
|
// If the document was already in the db we mark it as a `to_delete` document.
|
||||||
|
// Then we push the document in sorters in deletion mode.
|
||||||
|
let deleted_from_db = match external_documents_ids.get(&to_remove) {
|
||||||
|
Some(docid) => {
|
||||||
|
self.replaced_documents_ids.insert(docid);
|
||||||
|
|
||||||
|
// fetch the obkv document
|
||||||
|
let original_key = BEU32::new(docid);
|
||||||
|
let base_obkv = self
|
||||||
|
.index
|
||||||
|
.documents
|
||||||
|
.remap_data_type::<heed::types::ByteSlice>()
|
||||||
|
.get(wtxn, &original_key)?
|
||||||
|
.ok_or(InternalError::DatabaseMissingEntry {
|
||||||
|
db_name: db_name::DOCUMENTS,
|
||||||
|
key: None,
|
||||||
|
})?;
|
||||||
|
|
||||||
|
// push it as to delete in the original_sorter
|
||||||
|
document_sorter_buffer.clear();
|
||||||
|
document_sorter_buffer.push(Operation::Deletion as u8);
|
||||||
|
into_del_add_obkv(
|
||||||
|
KvReaderU16::new(base_obkv),
|
||||||
|
true,
|
||||||
|
false,
|
||||||
|
&mut document_sorter_buffer,
|
||||||
|
)?;
|
||||||
|
self.original_sorter.insert(docid.to_be_bytes(), &document_sorter_buffer)?;
|
||||||
|
|
||||||
|
// flatten it and push it as to delete in the flattened_sorter
|
||||||
|
match self.flatten_from_fields_ids_map(KvReader::new(base_obkv))? {
|
||||||
|
Some(flattened_obkv) => {
|
||||||
|
// we recreate our buffer with the flattened documents
|
||||||
|
document_sorter_buffer.clear();
|
||||||
|
document_sorter_buffer.push(Operation::Deletion as u8);
|
||||||
|
into_del_add_obkv(
|
||||||
|
KvReaderU16::new(&flattened_obkv),
|
||||||
|
true,
|
||||||
|
false,
|
||||||
|
&mut document_sorter_buffer,
|
||||||
|
)?;
|
||||||
|
self.flattened_sorter
|
||||||
|
.insert(docid.to_be_bytes(), &document_sorter_buffer)?
|
||||||
|
}
|
||||||
|
None => self
|
||||||
|
.flattened_sorter
|
||||||
|
.insert(docid.to_be_bytes(), &document_sorter_buffer)?,
|
||||||
|
}
|
||||||
|
|
||||||
|
true
|
||||||
|
}
|
||||||
|
None => false,
|
||||||
|
};
|
||||||
|
|
||||||
|
// increase counter only if the document existed somewhere before.
|
||||||
|
if deleted_from_current || deleted_from_db {
|
||||||
|
documents_deleted += 1;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(documents_deleted)
|
Ok(documents_deleted)
|
||||||
@ -589,9 +669,7 @@ impl<'a, 'i> Transform<'a, 'i> {
|
|||||||
let mut documents_count = 0;
|
let mut documents_count = 0;
|
||||||
|
|
||||||
while let Some((key, val)) = iter.next()? {
|
while let Some((key, val)) = iter.next()? {
|
||||||
if val[0] == Operation::Deletion as u8 {
|
// skip first byte corresponding to the operation type (Deletion or Addition).
|
||||||
continue;
|
|
||||||
}
|
|
||||||
let val = &val[1..];
|
let val = &val[1..];
|
||||||
|
|
||||||
// send a callback to show at which step we are
|
// send a callback to show at which step we are
|
||||||
@ -631,9 +709,7 @@ impl<'a, 'i> Transform<'a, 'i> {
|
|||||||
// We get rids of the `Operation` byte and skip the deleted documents as well.
|
// We get rids of the `Operation` byte and skip the deleted documents as well.
|
||||||
let mut iter = self.flattened_sorter.into_stream_merger_iter()?;
|
let mut iter = self.flattened_sorter.into_stream_merger_iter()?;
|
||||||
while let Some((key, val)) = iter.next()? {
|
while let Some((key, val)) = iter.next()? {
|
||||||
if val[0] == Operation::Deletion as u8 {
|
// skip first byte corresponding to the operation type (Deletion or Addition).
|
||||||
continue;
|
|
||||||
}
|
|
||||||
let val = &val[1..];
|
let val = &val[1..];
|
||||||
writer.insert(key, val)?;
|
writer.insert(key, val)?;
|
||||||
}
|
}
|
||||||
@ -711,6 +787,7 @@ impl<'a, 'i> Transform<'a, 'i> {
|
|||||||
);
|
);
|
||||||
|
|
||||||
let mut obkv_buffer = Vec::new();
|
let mut obkv_buffer = Vec::new();
|
||||||
|
let mut document_sorter_buffer = Vec::new();
|
||||||
for result in self.index.all_documents(wtxn)? {
|
for result in self.index.all_documents(wtxn)? {
|
||||||
let (docid, obkv) = result?;
|
let (docid, obkv) = result?;
|
||||||
|
|
||||||
@ -725,7 +802,9 @@ impl<'a, 'i> Transform<'a, 'i> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
let buffer = obkv_writer.into_inner()?;
|
let buffer = obkv_writer.into_inner()?;
|
||||||
original_writer.insert(docid.to_be_bytes(), &buffer)?;
|
document_sorter_buffer.clear();
|
||||||
|
into_del_add_obkv(KvReaderU16::new(buffer), true, true, &mut document_sorter_buffer)?;
|
||||||
|
original_writer.insert(docid.to_be_bytes(), &document_sorter_buffer)?;
|
||||||
|
|
||||||
// Once we have the document. We're going to flatten it
|
// Once we have the document. We're going to flatten it
|
||||||
// and insert it in the flattened sorter.
|
// and insert it in the flattened sorter.
|
||||||
@ -760,7 +839,9 @@ impl<'a, 'i> Transform<'a, 'i> {
|
|||||||
let value = serde_json::to_vec(&value).map_err(InternalError::SerdeJson)?;
|
let value = serde_json::to_vec(&value).map_err(InternalError::SerdeJson)?;
|
||||||
writer.insert(fid, &value)?;
|
writer.insert(fid, &value)?;
|
||||||
}
|
}
|
||||||
flattened_writer.insert(docid.to_be_bytes(), &buffer)?;
|
document_sorter_buffer.clear();
|
||||||
|
into_del_add_obkv(KvReaderU16::new(&buffer), true, true, &mut document_sorter_buffer)?;
|
||||||
|
flattened_writer.insert(docid.to_be_bytes(), &document_sorter_buffer)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Once we have written all the documents, we extract
|
// Once we have written all the documents, we extract
|
||||||
@ -824,38 +905,86 @@ mod test {
|
|||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn merge_obkvs() {
|
fn merge_obkvs() {
|
||||||
let mut doc_0 = Vec::new();
|
let mut additive_doc_0 = Vec::new();
|
||||||
let mut kv_writer = KvWriter::new(&mut doc_0);
|
let mut deletive_doc_0 = Vec::new();
|
||||||
|
let mut del_add_doc_0 = Vec::new();
|
||||||
|
let mut kv_writer = KvWriter::memory();
|
||||||
kv_writer.insert(0_u8, [0]).unwrap();
|
kv_writer.insert(0_u8, [0]).unwrap();
|
||||||
kv_writer.finish().unwrap();
|
let buffer = kv_writer.into_inner().unwrap();
|
||||||
doc_0.insert(0, Operation::Addition as u8);
|
into_del_add_obkv(KvReaderU16::new(&buffer), false, true, &mut additive_doc_0).unwrap();
|
||||||
|
additive_doc_0.insert(0, Operation::Addition as u8);
|
||||||
|
into_del_add_obkv(KvReaderU16::new(&buffer), true, false, &mut deletive_doc_0).unwrap();
|
||||||
|
deletive_doc_0.insert(0, Operation::Deletion as u8);
|
||||||
|
into_del_add_obkv(KvReaderU16::new(&buffer), true, true, &mut del_add_doc_0).unwrap();
|
||||||
|
del_add_doc_0.insert(0, Operation::Addition as u8);
|
||||||
|
|
||||||
let ret = merge_obkvs_and_operations(&[], &[Cow::from(doc_0.as_slice())]).unwrap();
|
let mut additive_doc_1 = Vec::new();
|
||||||
assert_eq!(*ret, doc_0);
|
let mut kv_writer = KvWriter::memory();
|
||||||
|
kv_writer.insert(1_u8, [1]).unwrap();
|
||||||
|
let buffer = kv_writer.into_inner().unwrap();
|
||||||
|
into_del_add_obkv(KvReaderU16::new(&buffer), false, true, &mut additive_doc_1).unwrap();
|
||||||
|
additive_doc_1.insert(0, Operation::Addition as u8);
|
||||||
|
|
||||||
let ret = merge_obkvs_and_operations(
|
let mut additive_doc_0_1 = Vec::new();
|
||||||
|
let mut kv_writer = KvWriter::memory();
|
||||||
|
kv_writer.insert(0_u8, [0]).unwrap();
|
||||||
|
kv_writer.insert(1_u8, [1]).unwrap();
|
||||||
|
let buffer = kv_writer.into_inner().unwrap();
|
||||||
|
into_del_add_obkv(KvReaderU16::new(&buffer), false, true, &mut additive_doc_0_1).unwrap();
|
||||||
|
additive_doc_0_1.insert(0, Operation::Addition as u8);
|
||||||
|
|
||||||
|
let ret = obkvs_merge_additions_and_deletions(&[], &[Cow::from(additive_doc_0.as_slice())])
|
||||||
|
.unwrap();
|
||||||
|
assert_eq!(*ret, additive_doc_0);
|
||||||
|
|
||||||
|
let ret = obkvs_merge_additions_and_deletions(
|
||||||
&[],
|
&[],
|
||||||
&[Cow::from([Operation::Deletion as u8].as_slice()), Cow::from(doc_0.as_slice())],
|
&[Cow::from(deletive_doc_0.as_slice()), Cow::from(additive_doc_0.as_slice())],
|
||||||
)
|
)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
assert_eq!(*ret, doc_0);
|
assert_eq!(*ret, del_add_doc_0);
|
||||||
|
|
||||||
let ret = merge_obkvs_and_operations(
|
let ret = obkvs_merge_additions_and_deletions(
|
||||||
&[],
|
&[],
|
||||||
&[Cow::from(doc_0.as_slice()), Cow::from([Operation::Deletion as u8].as_slice())],
|
&[Cow::from(additive_doc_0.as_slice()), Cow::from(deletive_doc_0.as_slice())],
|
||||||
)
|
)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
assert_eq!(*ret, [Operation::Deletion as u8]);
|
assert_eq!(*ret, deletive_doc_0);
|
||||||
|
|
||||||
let ret = merge_obkvs_and_operations(
|
let ret = obkvs_merge_additions_and_deletions(
|
||||||
&[],
|
&[],
|
||||||
&[
|
&[
|
||||||
Cow::from([Operation::Addition as u8, 1].as_slice()),
|
Cow::from(additive_doc_1.as_slice()),
|
||||||
Cow::from([Operation::Deletion as u8].as_slice()),
|
Cow::from(deletive_doc_0.as_slice()),
|
||||||
Cow::from(doc_0.as_slice()),
|
Cow::from(additive_doc_0.as_slice()),
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
assert_eq!(*ret, doc_0);
|
assert_eq!(*ret, del_add_doc_0);
|
||||||
|
|
||||||
|
let ret = obkvs_merge_additions_and_deletions(
|
||||||
|
&[],
|
||||||
|
&[Cow::from(additive_doc_1.as_slice()), Cow::from(additive_doc_0.as_slice())],
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
assert_eq!(*ret, additive_doc_0_1);
|
||||||
|
|
||||||
|
let ret = obkvs_keep_last_addition_merge_deletions(
|
||||||
|
&[],
|
||||||
|
&[Cow::from(additive_doc_1.as_slice()), Cow::from(additive_doc_0.as_slice())],
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
assert_eq!(*ret, additive_doc_0);
|
||||||
|
|
||||||
|
let ret = obkvs_keep_last_addition_merge_deletions(
|
||||||
|
&[],
|
||||||
|
&[
|
||||||
|
Cow::from(deletive_doc_0.as_slice()),
|
||||||
|
Cow::from(additive_doc_1.as_slice()),
|
||||||
|
Cow::from(additive_doc_0.as_slice()),
|
||||||
|
],
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
assert_eq!(*ret, del_add_doc_0);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -21,6 +21,7 @@ pub use self::words_prefixes_fst::WordsPrefixesFst;
|
|||||||
|
|
||||||
mod available_documents_ids;
|
mod available_documents_ids;
|
||||||
mod clear_documents;
|
mod clear_documents;
|
||||||
|
pub(crate) mod del_add;
|
||||||
mod delete_documents;
|
mod delete_documents;
|
||||||
pub(crate) mod facet;
|
pub(crate) mod facet;
|
||||||
mod index_documents;
|
mod index_documents;
|
||||||
|
Loading…
Reference in New Issue
Block a user