Move the obkv merging functions into the merge_function module

This commit is contained in:
Kerollmops 2021-06-09 14:57:03 +02:00
parent ab727e428b
commit 65b1d09d55
No known key found for this signature in database
GPG Key ID: 92ADA4E935E71FA4
2 changed files with 20 additions and 21 deletions

View File

@ -1,12 +1,28 @@
use std::borrow::Cow; use std::borrow::Cow;
use anyhow::bail;
use bstr::ByteSlice as _;
use fst::IntoStreamer; use fst::IntoStreamer;
use roaring::RoaringBitmap; use roaring::RoaringBitmap;
use crate::heed_codec::CboRoaringBitmapCodec; use crate::heed_codec::CboRoaringBitmapCodec;
/// Only the last value associated with an id is kept.
pub fn keep_latest_obkv(_key: &[u8], obkvs: &[Cow<[u8]>]) -> anyhow::Result<Vec<u8>> {
Ok(obkvs.last().unwrap().clone().into_owned())
}
/// Merges all the obkvs in the order we see them.
///
/// The first buffer seeds the accumulator; each following buffer is then
/// combined with the accumulated result through `merge_two_obkvs`, and the
/// output of that call becomes the new accumulator. `_key` is not inspected.
///
/// # Errors
///
/// Returns an error when `obkvs` is empty, rather than panicking.
pub fn merge_obkvs(_key: &[u8], obkvs: &[Cow<[u8]>]) -> anyhow::Result<Vec<u8>> {
    let (first, rest) = obkvs
        .split_first()
        .ok_or_else(|| anyhow::anyhow!("no first value"))?;

    Ok(rest.iter().fold(first.to_vec(), |acc, current| {
        let first = obkv::KvReader::new(&acc);
        let second = obkv::KvReader::new(current);
        // A fresh buffer is needed each round because `acc` is still borrowed
        // by `first` while the merge is being written.
        let mut buffer = Vec::new();
        merge_two_obkvs(first, second, &mut buffer);
        buffer
    }))
}
// Union of multiple FSTs // Union of multiple FSTs
pub fn fst_merge(_key: &[u8], values: &[Cow<[u8]>]) -> anyhow::Result<Vec<u8>> { pub fn fst_merge(_key: &[u8], values: &[Cow<[u8]>]) -> anyhow::Result<Vec<u8>> {
let fsts = values.iter().map(fst::Set::new).collect::<Result<Vec<_>, _>>()?; let fsts = values.iter().map(fst::Set::new).collect::<Result<Vec<_>, _>>()?;

View File

@ -10,8 +10,9 @@ use log::info;
use roaring::RoaringBitmap; use roaring::RoaringBitmap;
use serde_json::{Map, Value}; use serde_json::{Map, Value};
use crate::{Index, BEU32, MergeFn, FieldsIdsMap, ExternalDocumentsIds, FieldId, FieldsDistribution}; use crate::update::index_documents::merge_function::{merge_obkvs, keep_latest_obkv};
use crate::update::{AvailableDocumentsIds, UpdateIndexingStep}; use crate::update::{AvailableDocumentsIds, UpdateIndexingStep};
use crate::{Index, BEU32, MergeFn, FieldsIdsMap, ExternalDocumentsIds, FieldId, FieldsDistribution};
use super::merge_function::merge_two_obkvs; use super::merge_function::merge_two_obkvs;
use super::{create_writer, create_sorter, IndexDocumentsMethod}; use super::{create_writer, create_sorter, IndexDocumentsMethod};
@ -552,24 +553,6 @@ fn compute_primary_key_pair(
} }
} }
/// Returns an owned copy of the final value in `obkvs`, discarding the rest.
///
/// Fails with "no last value" when `obkvs` is empty. `_key` is not inspected.
fn keep_latest_obkv(_key: &[u8], obkvs: &[Cow<[u8]>]) -> anyhow::Result<Vec<u8>> {
    let newest = obkvs.last().context("no last value")?;
    Ok(newest.clone().into_owned())
}
/// Merges every obkv in `obkvs`, in the order they appear.
///
/// The first buffer is the starting point; each later buffer is merged into
/// the running result with `merge_two_obkvs`. Fails with "no first value"
/// when `obkvs` is empty. `_key` is not inspected.
fn merge_obkvs(_key: &[u8], obkvs: &[Cow<[u8]>]) -> anyhow::Result<Vec<u8>> {
    let mut remaining = obkvs.iter();
    let mut merged = remaining
        .next()
        .map(|b| b.clone().into_owned())
        .context("no first value")?;

    for next in remaining {
        let mut output = Vec::new();
        merge_two_obkvs(obkv::KvReader::new(&merged), obkv::KvReader::new(next), &mut output);
        // The freshly written buffer becomes the accumulator for the next round.
        merged = output;
    }

    Ok(merged)
}
fn validate_document_id(document_id: &str) -> Option<&str> { fn validate_document_id(document_id: &str) -> Option<&str> {
let document_id = document_id.trim(); let document_id = document_id.trim();
Some(document_id).filter(|id| { Some(document_id).filter(|id| {