Use the primary key and external id in the transform

This commit is contained in:
Kerollmops 2022-06-21 11:12:51 +02:00
parent 742543091e
commit 905af2a2e9
No known key found for this signature in database
GPG Key ID: 92ADA4E935E71FA4

View File

@ -9,7 +9,7 @@ use heed::RoTxn;
use itertools::Itertools; use itertools::Itertools;
use obkv::{KvReader, KvWriter}; use obkv::{KvReader, KvWriter};
use roaring::RoaringBitmap; use roaring::RoaringBitmap;
use serde_json::{Map, Value}; use serde_json::Value;
use smartstring::SmartString; use smartstring::SmartString;
use super::helpers::{create_sorter, create_writer, keep_latest_obkv, merge_obkvs, MergeFn}; use super::helpers::{create_sorter, create_writer, keep_latest_obkv, merge_obkvs, MergeFn};
@ -17,15 +17,12 @@ use super::{IndexDocumentsMethod, IndexerConfig};
use crate::documents::{DocumentsBatchIndex, EnrichedDocument, EnrichedDocumentsBatchReader}; use crate::documents::{DocumentsBatchIndex, EnrichedDocument, EnrichedDocumentsBatchReader};
use crate::error::{Error, InternalError, UserError}; use crate::error::{Error, InternalError, UserError};
use crate::index::db_name; use crate::index::db_name;
use crate::update::index_documents::validate_document_id_value;
use crate::update::{AvailableDocumentsIds, UpdateIndexingStep}; use crate::update::{AvailableDocumentsIds, UpdateIndexingStep};
use crate::{ use crate::{
ExternalDocumentsIds, FieldDistribution, FieldId, FieldIdMapMissingEntry, FieldsIdsMap, Index, ExternalDocumentsIds, FieldDistribution, FieldId, FieldIdMapMissingEntry, FieldsIdsMap, Index,
Result, BEU32, Result, BEU32,
}; };
const DEFAULT_PRIMARY_KEY_NAME: &str = "id";
pub struct TransformOutput { pub struct TransformOutput {
pub primary_key: String, pub primary_key: String,
pub fields_ids_map: FieldsIdsMap, pub fields_ids_map: FieldsIdsMap,
@ -85,18 +82,6 @@ fn create_fields_mapping(
.collect() .collect()
} }
/// Look for a key containing the [DEFAULT_PRIMARY_KEY_NAME] in the fields.
/// It doesn't look in the subfield because we don't want to enable the
/// primary key inference on nested objects.
fn find_primary_key(index: &DocumentsBatchIndex) -> Option<&str> {
index
.iter()
.sorted_by_key(|(k, _)| *k)
.map(|(_, v)| v)
.find(|v| v.to_lowercase().contains(DEFAULT_PRIMARY_KEY_NAME))
.map(String::as_str)
}
impl<'a, 'i> Transform<'a, 'i> { impl<'a, 'i> Transform<'a, 'i> {
pub fn new( pub fn new(
wtxn: &mut heed::RwTxn, wtxn: &mut heed::RwTxn,
@ -167,28 +152,15 @@ impl<'a, 'i> Transform<'a, 'i> {
let mapping = create_fields_mapping(&mut self.fields_ids_map, fields_index)?; let mapping = create_fields_mapping(&mut self.fields_ids_map, fields_index)?;
let alternative_name = self let primary_key = cursor.primary_key().to_string();
.index self.fields_ids_map.insert(&primary_key).ok_or(UserError::AttributeLimitReached)?;
.primary_key(wtxn)? let primary_key_id_nested = primary_key.contains('.');
.or_else(|| find_primary_key(fields_index))
.map(String::from);
let (primary_key_id, primary_key_name) = compute_primary_key_pair(
self.index.primary_key(wtxn)?,
&mut self.fields_ids_map,
alternative_name,
self.autogenerate_docids,
)?;
let primary_key_id_nested = primary_key_name.contains('.');
let mut flattened_document = None; let mut flattened_document = None;
let mut obkv_buffer = Vec::new(); let mut obkv_buffer = Vec::new();
let mut flattened_obkv_buffer = Vec::new(); let mut flattened_obkv_buffer = Vec::new();
let mut documents_count = 0; let mut documents_count = 0;
let mut external_id_buffer = Vec::new();
let mut field_buffer: Vec<(u16, Cow<[u8]>)> = Vec::new(); let mut field_buffer: Vec<(u16, Cow<[u8]>)> = Vec::new();
let addition_index = cursor.documents_batch_index().clone();
while let Some(enriched_document) = cursor.next_enriched_document()? { while let Some(enriched_document) = cursor.next_enriched_document()? {
let EnrichedDocument { document, external_id } = enriched_document; let EnrichedDocument { document, external_id } = enriched_document;
@ -210,8 +182,7 @@ impl<'a, 'i> Transform<'a, 'i> {
// it, transform it into a string and validate it, and then update it in the // it, transform it into a string and validate it, and then update it in the
// document. If none is found, and we were told to generate missing document ids, then // document. If none is found, and we were told to generate missing document ids, then
// we create the missing field, and update the new document. // we create the missing field, and update the new document.
let mut uuid_buffer = [0; uuid::fmt::Hyphenated::LENGTH]; if primary_key_id_nested {
let external_id = if primary_key_id_nested {
let mut field_buffer_cache = field_buffer_cache.clone(); let mut field_buffer_cache = field_buffer_cache.clone();
self.flatten_from_field_mapping( self.flatten_from_field_mapping(
&mapping, &mapping,
@ -220,29 +191,6 @@ impl<'a, 'i> Transform<'a, 'i> {
&mut field_buffer_cache, &mut field_buffer_cache,
)?; )?;
flattened_document = Some(&flattened_obkv_buffer); flattened_document = Some(&flattened_obkv_buffer);
let document = KvReader::new(&flattened_obkv_buffer);
update_primary_key(
document,
&addition_index,
primary_key_id,
&primary_key_name,
&mut uuid_buffer,
&mut field_buffer_cache,
&mut external_id_buffer,
self.autogenerate_docids,
)?
} else {
update_primary_key(
document,
&addition_index,
primary_key_id,
&primary_key_name,
&mut uuid_buffer,
&mut field_buffer_cache,
&mut external_id_buffer,
self.autogenerate_docids,
)?
}; };
// Insertion in a obkv need to be done with keys ordered. For now they are ordered // Insertion in a obkv need to be done with keys ordered. For now they are ordered
@ -318,7 +266,6 @@ impl<'a, 'i> Transform<'a, 'i> {
}); });
field_buffer = drop_and_reuse(field_buffer_cache); field_buffer = drop_and_reuse(field_buffer_cache);
external_id_buffer.clear();
obkv_buffer.clear(); obkv_buffer.clear();
} }
@ -327,7 +274,7 @@ impl<'a, 'i> Transform<'a, 'i> {
}); });
self.index.put_fields_ids_map(wtxn, &self.fields_ids_map)?; self.index.put_fields_ids_map(wtxn, &self.fields_ids_map)?;
self.index.put_primary_key(wtxn, &primary_key_name)?; self.index.put_primary_key(wtxn, &primary_key)?;
self.documents_count += documents_count; self.documents_count += documents_count;
// Now that we have a valid sorter that contains the user id and the obkv we // Now that we have a valid sorter that contains the user id and the obkv we
// give it to the last transforming function which returns the TransformOutput. // give it to the last transforming function which returns the TransformOutput.
@ -749,42 +696,6 @@ impl<'a, 'i> Transform<'a, 'i> {
} }
} }
/// Given an optional primary key and an optional alternative name, returns the (field_id, attr_name)
/// for the primary key according to the following rules:
/// - if primary_key is `Some`, returns the id and the name, else
/// - if alternative_name is Some, adds alternative to the fields_ids_map, and returns the pair, else
/// - if autogenerate_docids is true, insert the default id value in the field ids map ("id") and
/// returns the pair, else
/// - returns an error.
fn compute_primary_key_pair(
primary_key: Option<&str>,
fields_ids_map: &mut FieldsIdsMap,
alternative_name: Option<String>,
autogenerate_docids: bool,
) -> Result<(FieldId, String)> {
match primary_key {
Some(primary_key) => {
let id = fields_ids_map.insert(primary_key).ok_or(UserError::AttributeLimitReached)?;
Ok((id, primary_key.to_string()))
}
None => {
let name = match alternative_name {
Some(key) => key,
None => {
if !autogenerate_docids {
// If there is no primary key in the current document batch, we must
// return an error and not automatically generate any document id.
return Err(UserError::MissingPrimaryKey.into());
}
DEFAULT_PRIMARY_KEY_NAME.to_string()
}
};
let id = fields_ids_map.insert(&name).ok_or(UserError::AttributeLimitReached)?;
Ok((id, name))
}
}
}
/// Drops all the value of type `U` in vec, and reuses the allocation to create a `Vec<T>`. /// Drops all the value of type `U` in vec, and reuses the allocation to create a `Vec<T>`.
/// ///
/// The size and alignment of T and U must match. /// The size and alignment of T and U must match.
@ -796,49 +707,6 @@ fn drop_and_reuse<U, T>(mut vec: Vec<U>) -> Vec<T> {
vec.into_iter().map(|_| unreachable!()).collect() vec.into_iter().map(|_| unreachable!()).collect()
} }
fn update_primary_key<'a>(
document: KvReader<'a, FieldId>,
addition_index: &DocumentsBatchIndex,
primary_key_id: FieldId,
primary_key_name: &str,
uuid_buffer: &'a mut [u8; uuid::fmt::Hyphenated::LENGTH],
field_buffer_cache: &mut Vec<(u16, Cow<'a, [u8]>)>,
mut external_id_buffer: &'a mut Vec<u8>,
autogenerate_docids: bool,
) -> Result<Cow<'a, str>> {
match field_buffer_cache.iter_mut().find(|(id, _)| *id == primary_key_id) {
Some((_, bytes)) => {
let document_id = serde_json::from_slice(bytes).map_err(InternalError::SerdeJson)?;
let value = validate_document_id_value(document_id)??;
serde_json::to_writer(external_id_buffer, &value).map_err(InternalError::SerdeJson)?;
Ok(Cow::Owned(value))
}
None if autogenerate_docids => {
let uuid = uuid::Uuid::new_v4().as_hyphenated().encode_lower(uuid_buffer);
serde_json::to_writer(&mut external_id_buffer, &uuid)
.map_err(InternalError::SerdeJson)?;
field_buffer_cache.push((primary_key_id, external_id_buffer.as_slice().into()));
Ok(Cow::Borrowed(&*uuid))
}
None => {
let mut json = Map::new();
for (key, value) in document.iter() {
let key = addition_index.name(key).map(ToString::to_string);
let value = serde_json::from_slice::<Value>(&value).ok();
if let Some((k, v)) = key.zip(value) {
json.insert(k, v);
}
}
Err(UserError::MissingDocumentId {
primary_key: primary_key_name.to_string(),
document: json,
})?
}
}
}
impl TransformOutput { impl TransformOutput {
// find and insert the new field ids // find and insert the new field ids
pub fn compute_real_facets(&self, rtxn: &RoTxn, index: &Index) -> Result<HashSet<String>> { pub fn compute_real_facets(&self, rtxn: &RoTxn, index: &Index) -> Result<HashSet<String>> {
@ -853,87 +721,87 @@ impl TransformOutput {
} }
} }
#[cfg(test)] // #[cfg(test)]
mod test { // mod test {
use super::*; // use super::*;
mod compute_primary_key { // mod compute_primary_key {
use big_s::S; // use big_s::S;
use super::{compute_primary_key_pair, FieldsIdsMap}; // use super::{compute_primary_key_pair, FieldsIdsMap};
#[test] // #[test]
fn should_return_primary_key_if_is_some() { // fn should_return_primary_key_if_is_some() {
let mut fields_map = FieldsIdsMap::new(); // let mut fields_map = FieldsIdsMap::new();
fields_map.insert("toto").unwrap(); // fields_map.insert("toto").unwrap();
let result = compute_primary_key_pair( // let result = compute_primary_key_pair(
Some("toto"), // Some("toto"),
&mut fields_map, // &mut fields_map,
Some("tata".to_string()), // Some("tata".to_string()),
false, // false,
); // );
assert_eq!(result.unwrap(), (0, "toto".to_string())); // assert_eq!(result.unwrap(), (0, "toto".to_string()));
assert_eq!(fields_map.len(), 1); // assert_eq!(fields_map.len(), 1);
// and with nested fields // // and with nested fields
let mut fields_map = FieldsIdsMap::new(); // let mut fields_map = FieldsIdsMap::new();
fields_map.insert("toto.tata").unwrap(); // fields_map.insert("toto.tata").unwrap();
let result = compute_primary_key_pair( // let result = compute_primary_key_pair(
Some("toto.tata"), // Some("toto.tata"),
&mut fields_map, // &mut fields_map,
Some(S("titi")), // Some(S("titi")),
false, // false,
); // );
assert_eq!(result.unwrap(), (0, "toto.tata".to_string())); // assert_eq!(result.unwrap(), (0, "toto.tata".to_string()));
assert_eq!(fields_map.len(), 1); // assert_eq!(fields_map.len(), 1);
} // }
#[test] // #[test]
fn should_return_alternative_if_primary_is_none() { // fn should_return_alternative_if_primary_is_none() {
let mut fields_map = FieldsIdsMap::new(); // let mut fields_map = FieldsIdsMap::new();
let result = // let result =
compute_primary_key_pair(None, &mut fields_map, Some("tata".to_string()), false); // compute_primary_key_pair(None, &mut fields_map, Some("tata".to_string()), false);
assert_eq!(result.unwrap(), (0, S("tata"))); // assert_eq!(result.unwrap(), (0, S("tata")));
assert_eq!(fields_map.len(), 1); // assert_eq!(fields_map.len(), 1);
} // }
#[test] // #[test]
fn should_return_default_if_both_are_none() { // fn should_return_default_if_both_are_none() {
let mut fields_map = FieldsIdsMap::new(); // let mut fields_map = FieldsIdsMap::new();
let result = compute_primary_key_pair(None, &mut fields_map, None, true); // let result = compute_primary_key_pair(None, &mut fields_map, None, true);
assert_eq!(result.unwrap(), (0, S("id"))); // assert_eq!(result.unwrap(), (0, S("id")));
assert_eq!(fields_map.len(), 1); // assert_eq!(fields_map.len(), 1);
} // }
#[test] // #[test]
fn should_return_err_if_both_are_none_and_recompute_is_false() { // fn should_return_err_if_both_are_none_and_recompute_is_false() {
let mut fields_map = FieldsIdsMap::new(); // let mut fields_map = FieldsIdsMap::new();
let result = compute_primary_key_pair(None, &mut fields_map, None, false); // let result = compute_primary_key_pair(None, &mut fields_map, None, false);
assert!(result.is_err()); // assert!(result.is_err());
assert_eq!(fields_map.len(), 0); // assert_eq!(fields_map.len(), 0);
} // }
} // }
mod primary_key_inference { // mod primary_key_inference {
use big_s::S; // use big_s::S;
use bimap::BiHashMap; // use bimap::BiHashMap;
use crate::documents::DocumentsBatchIndex; // use crate::documents::DocumentsBatchIndex;
use crate::update::index_documents::transform::find_primary_key; // use crate::update::index_documents::transform::find_primary_key;
#[test] // #[test]
fn primary_key_infered_on_first_field() { // fn primary_key_infered_on_first_field() {
// We run the test multiple times to change the order in which the fields are iterated upon. // // We run the test multiple times to change the order in which the fields are iterated upon.
for _ in 1..50 { // for _ in 1..50 {
let mut map = BiHashMap::new(); // let mut map = BiHashMap::new();
map.insert(1, S("fakeId")); // map.insert(1, S("fakeId"));
map.insert(2, S("fakeId")); // map.insert(2, S("fakeId"));
map.insert(3, S("fakeId")); // map.insert(3, S("fakeId"));
map.insert(4, S("fakeId")); // map.insert(4, S("fakeId"));
map.insert(0, S("realId")); // map.insert(0, S("realId"));
assert_eq!(find_primary_key(&DocumentsBatchIndex(map)), Some("realId")); // assert_eq!(find_primary_key(&DocumentsBatchIndex(map)), Some("realId"));
} // }
} // }
} // }
} // }