Simplify Transform::read_documents, enabled by enriched documents reader

Loïc Lecrenier 2022-07-18 12:41:58 +02:00
parent 448114cc1c
commit ab1571cdec


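The hunks below lean on milli's new enriched documents reader: every document now arrives paired with an already-extracted (or generated) primary key, which is what lets read_documents shed its own primary-key handling. A minimal sketch of the shapes the diff assumes, not the exact milli definitions:

// Sketch only: the real types live in the enriched documents reader and
// differ in detail.
enum DocumentId {
    Retrieved(serde_json::Value),
    Generated(serde_json::Value),
}

impl DocumentId {
    // true when the reader had to invent an id for a document that lacked one
    fn is_generated(&self) -> bool {
        matches!(self, DocumentId::Generated(_))
    }

    // the primary-key value, whether retrieved from the document or generated
    fn value(&self) -> &serde_json::Value {
        match self {
            DocumentId::Retrieved(v) | DocumentId::Generated(v) => v,
        }
    }
}

struct EnrichedDocument<'a> {
    document: obkv::KvReader<'a, u16>, // the obkv-encoded document fields
    document_id: DocumentId,           // primary key, resolved ahead of time
}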
@@ -153,18 +153,18 @@ impl<'a, 'i> Transform<'a, 'i> {
         let mapping = create_fields_mapping(&mut self.fields_ids_map, fields_index)?;
 
         let primary_key = cursor.primary_key().to_string();
-        let primary_key_id_nested = primary_key.contains('.');
         let primary_key_id =
             self.fields_ids_map.insert(&primary_key).ok_or(UserError::AttributeLimitReached)?;
 
-        let mut flattened_document = None;
         let mut obkv_buffer = Vec::new();
-        let mut flattened_obkv_buffer = Vec::new();
         let mut documents_count = 0;
+        let mut docid_buffer: Vec<u8> = Vec::new();
         let mut field_buffer: Vec<(u16, Cow<[u8]>)> = Vec::new();
         while let Some(enriched_document) = cursor.next_enriched_document()? {
             let EnrichedDocument { document, document_id } = enriched_document;
 
+            // drop_and_reuse is called instead of .clear() to communicate to the compiler that field_buffer
+            // does not keep references from the cursor between loop iterations
             let mut field_buffer_cache = drop_and_reuse(field_buffer);
             if self.indexer_settings.log_every_n.map_or(false, |len| documents_count % len == 0) {
                 progress_callback(UpdateIndexingStep::RemapDocumentAddition {
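The comment introduced above refers to the drop_and_reuse helper already used here. A sketch of how such a helper can be written, assuming the trick milli uses; treat the exact body as illustrative:

// Empties `vec` and rebuilds it under a fresh type. Because the returned Vec
// carries fresh lifetime parameters, the borrow checker stops tying it to
// data borrowed from the cursor during the previous iteration, which a plain
// `.clear()` on the same binding would not achieve.
fn drop_and_reuse<U, T>(mut vec: Vec<U>) -> Vec<T> {
    vec.clear();
    // `into_iter().collect()` is specialized by the standard library to
    // reuse the allocation when the layouts of `U` and `T` match.
    vec.into_iter().map(|_| unreachable!()).collect()
}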
@@ -176,8 +176,9 @@ impl<'a, 'i> Transform<'a, 'i> {
             // we must insert this document id into the remaped document.
             let external_id = document_id.value();
             if document_id.is_generated() {
-                let docid = serde_json::to_vec(external_id).map_err(InternalError::SerdeJson)?;
-                field_buffer_cache.push((primary_key_id, Cow::from(docid)));
+                serde_json::to_writer(&mut docid_buffer, external_id)
+                    .map_err(InternalError::SerdeJson)?;
+                field_buffer_cache.push((primary_key_id, Cow::from(&docid_buffer)));
             }
 
             for (k, v) in document.iter() {
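The change above replaces serde_json::to_vec, which allocates a fresh Vec for every generated id, with serde_json::to_writer appending into docid_buffer, a buffer that is cleared but never deallocated between iterations (see the docid_buffer.clear() added further down). A self-contained sketch of the pattern:

use std::borrow::Cow;

fn main() -> serde_json::Result<()> {
    let mut docid_buffer: Vec<u8> = Vec::new();
    for external_id in ["1", "2", "3"] {
        // append the JSON-serialized id to the reused buffer
        serde_json::to_writer(&mut docid_buffer, external_id)?;
        {
            // borrow the bytes without copying them, as the diff does
            let field: (u16, Cow<[u8]>) = (0, Cow::from(&docid_buffer));
            assert_eq!(&*field.1, format!("\"{external_id}\"").as_bytes());
        }
        docid_buffer.clear(); // keep the allocation for the next document
    }
    Ok(())
}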
@@ -186,22 +187,6 @@ impl<'a, 'i> Transform<'a, 'i> {
                 field_buffer_cache.push((mapped_id, Cow::from(v)));
             }
 
-            // We need to make sure that every document has a primary key. After we have remapped
-            // all the fields in the document, we try to find the primary key value. If we can find
-            // it, transform it into a string and validate it, and then update it in the
-            // document. If none is found, and we were told to generate missing document ids, then
-            // we create the missing field, and update the new document.
-            if primary_key_id_nested {
-                let mut field_buffer_cache = field_buffer_cache.clone();
-                self.flatten_from_field_mapping(
-                    &mapping,
-                    &document,
-                    &mut flattened_obkv_buffer,
-                    &mut field_buffer_cache,
-                )?;
-                flattened_document = Some(&flattened_obkv_buffer);
-            };
-
             // Insertion in a obkv need to be done with keys ordered. For now they are ordered
             // according to the document addition key order, so we sort it according to the
             // fieldids map keys order.
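With the primary key resolved before read_documents runs, the nested-primary-key branch removed above has nothing left to do: every document now goes through the generic flatten_from_fields_ids_map path in the next hunk. For intuition, a hedged sketch of the dot-joining flattening involved; flatten_serde_json is the crate used in the removed code, but the exact output shape shown here is an assumption:

// Assumed behaviour: nested object keys are joined with '.', which is how a
// nested primary key such as "person.id" becomes addressable at top level.
fn main() {
    let doc = serde_json::json!({ "person": { "id": 1, "name": "jane" } });
    let flat = flatten_serde_json::flatten(doc.as_object().unwrap());
    assert_eq!(flat.get("person.id"), Some(&serde_json::json!(1)));
    assert_eq!(flat.get("person.name"), Some(&serde_json::json!("jane")));
}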
@@ -256,18 +241,12 @@ impl<'a, 'i> Transform<'a, 'i> {
             }
 
             // We use the extracted/generated user id as the key for this document.
-            self.original_sorter.insert(&docid.to_be_bytes(), obkv_buffer.clone())?;
+            self.original_sorter.insert(&docid.to_be_bytes(), &obkv_buffer)?;
             documents_count += 1;
 
-            if let Some(flatten) = flattened_document {
-                self.flattened_sorter.insert(docid.to_be_bytes(), &flatten)?;
-            } else {
-                match self.flatten_from_fields_ids_map(KvReader::new(&obkv_buffer))? {
-                    Some(buffer) => self.flattened_sorter.insert(docid.to_be_bytes(), &buffer)?,
-                    None => {
-                        self.flattened_sorter.insert(docid.to_be_bytes(), obkv_buffer.clone())?
-                    }
-                }
+            match self.flatten_from_fields_ids_map(KvReader::new(&obkv_buffer))? {
+                Some(buffer) => self.flattened_sorter.insert(docid.to_be_bytes(), &buffer)?,
+                None => self.flattened_sorter.insert(docid.to_be_bytes(), &obkv_buffer)?,
             }
 
             progress_callback(UpdateIndexingStep::RemapDocumentAddition {
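Both sorter insertions above now pass &obkv_buffer instead of obkv_buffer.clone(), and the None arm collapses to one line. This relies on the sorter copying the bytes it needs from any byte-like argument; a minimal sketch of that calling convention, assuming an AsRef<[u8]>-style signature rather than grenad's exact API:

// A toy sorter whose `insert` accepts anything byte-like, so call sites can
// hand over `&Vec<u8>` and keep reusing the buffer instead of cloning it.
struct Sorter {
    entries: Vec<(Vec<u8>, Vec<u8>)>,
}

impl Sorter {
    fn insert(&mut self, key: impl AsRef<[u8]>, value: impl AsRef<[u8]>) {
        // the copy happens here, once, inside the sorter
        self.entries.push((key.as_ref().to_vec(), value.as_ref().to_vec()));
    }
}

fn main() {
    let mut sorter = Sorter { entries: Vec::new() };
    let docid: u32 = 7;
    let obkv_buffer: Vec<u8> = vec![1, 2, 3];
    // same shape as the call sites above: borrowed buffer, no `.clone()`
    sorter.insert(docid.to_be_bytes(), &obkv_buffer);
}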
@@ -275,6 +254,7 @@ impl<'a, 'i> Transform<'a, 'i> {
             });
 
             field_buffer = drop_and_reuse(field_buffer_cache);
+            docid_buffer.clear();
             obkv_buffer.clear();
         }
@@ -345,61 +325,6 @@ impl<'a, 'i> Transform<'a, 'i> {
         Ok(Some(buffer))
     }
 
-    // Flatten a document from a field mapping generated by [create_fields_mapping]
-    fn flatten_from_field_mapping(
-        &mut self,
-        mapping: &HashMap<FieldId, FieldId>,
-        obkv: &KvReader<FieldId>,
-        output_buffer: &mut Vec<u8>,
-        field_buffer_cache: &mut Vec<(u16, Cow<[u8]>)>,
-    ) -> Result<()> {
-        // store the keys and values of the json + the original obkv
-        let mut key_value: Vec<(FieldId, Cow<[u8]>)> = Vec::new();
-
-        // if the primary_key is nested we need to flatten the document before being able to do anything
-        let mut doc = serde_json::Map::new();
-
-        // we recreate a json containing only the fields that needs to be flattened.
-        // all the raw values get inserted directly in the `key_value` vec.
-        for (key, value) in obkv.iter() {
-            if json_depth_checker::should_flatten_from_unchecked_slice(value) {
-                let key =
-                    mapping.get(&key).ok_or(InternalError::FieldIdMappingMissingEntry { key })?;
-                let key =
-                    self.fields_ids_map.name(*key).ok_or(FieldIdMapMissingEntry::FieldId {
-                        field_id: *key,
-                        process: "Flatten from field mapping.",
-                    })?;
-                let value = serde_json::from_slice::<serde_json::Value>(value)
-                    .map_err(InternalError::SerdeJson)?;
-                doc.insert(key.to_string(), value);
-            } else {
-                key_value.push((key, value.into()));
-            }
-        }
-
-        let flattened = flatten_serde_json::flatten(&doc);
-
-        // Once we have the flattened version we insert all the new generated fields_ids
-        // (if any) in the fields ids map and serialize the value.
-        for (key, value) in flattened.into_iter() {
-            let fid = self.fields_ids_map.insert(&key).ok_or(UserError::AttributeLimitReached)?;
-            let value = serde_json::to_vec(&value).map_err(InternalError::SerdeJson)?;
-            key_value.push((fid, value.clone().into()));
-            if field_buffer_cache.iter().find(|(id, _)| *id == fid).is_none() {
-                field_buffer_cache.push((fid, value.into()));
-            }
-        }
-
-        // we sort the key. If there was a conflict between the obkv and the new generated value the
-        // keys will be consecutive.
-        key_value.sort_unstable_by_key(|(key, _)| *key);
-
-        Self::create_obkv_from_key_value(&mut key_value, output_buffer)?;
-
-        Ok(())
-    }
-
     /// Generate an obkv from a slice of key / value sorted by key.
     fn create_obkv_from_key_value(
         key_value: &mut [(FieldId, Cow<[u8]>)],
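create_obkv_from_key_value, whose signature the diff view cuts off here, is the reason the removed function sorted key_value before building the buffer: obkv writers require keys to be inserted in ascending order. A hedged sketch with the obkv crate, with API details taken from its documentation and to be treated as assumptions:

fn main() -> std::io::Result<()> {
    // keys must be inserted in ascending order for the writer to accept them
    let mut writer = obkv::KvWriter::memory();
    writer.insert(0u16, b"\"first field\"")?;
    writer.insert(3u16, b"42")?;
    let bytes = writer.into_inner()?;

    // the reader addresses values by field id, as KvReader::new does above
    let reader = obkv::KvReader::<u16>::new(&bytes);
    assert_eq!(reader.get(3), Some(&b"42"[..]));
    Ok(())
}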