2020-10-23 14:11:00 +02:00
|
|
|
use std::borrow::Cow;
|
2022-03-23 17:28:41 +01:00
|
|
|
use std::collections::hash_map::Entry;
|
|
|
|
use std::collections::{HashMap, HashSet};
|
2020-10-23 14:11:00 +02:00
|
|
|
use std::fs::File;
|
2020-10-24 14:02:29 +02:00
|
|
|
use std::io::{Read, Seek, SeekFrom};
|
2020-10-23 14:11:00 +02:00
|
|
|
|
2022-03-23 17:28:41 +01:00
|
|
|
use byteorder::ReadBytesExt;
|
|
|
|
use fxhash::FxHashMap;
|
|
|
|
use heed::RoTxn;
|
2021-08-31 11:44:15 +02:00
|
|
|
use itertools::Itertools;
|
2022-03-23 17:28:41 +01:00
|
|
|
use obkv::{KvReader, KvWriter};
|
2020-10-23 14:11:00 +02:00
|
|
|
use roaring::RoaringBitmap;
|
2020-10-31 16:10:15 +01:00
|
|
|
use serde_json::{Map, Value};
|
2022-04-11 15:43:18 +02:00
|
|
|
use smartstring::SmartString;
|
2020-10-23 14:11:00 +02:00
|
|
|
|
2022-03-23 17:28:41 +01:00
|
|
|
use super::helpers::{create_sorter, create_writer, keep_latest_obkv, merge_obkvs, MergeFn};
|
2021-12-08 14:12:07 +01:00
|
|
|
use super::{IndexDocumentsMethod, IndexerConfig};
|
2021-08-31 11:44:15 +02:00
|
|
|
use crate::documents::{DocumentBatchReader, DocumentsBatchIndex};
|
|
|
|
use crate::error::{Error, InternalError, UserError};
|
2021-06-15 11:06:42 +02:00
|
|
|
use crate::index::db_name;
|
2020-11-11 12:39:09 +01:00
|
|
|
use crate::update::{AvailableDocumentsIds, UpdateIndexingStep};
|
2022-03-23 17:28:41 +01:00
|
|
|
use crate::{
|
|
|
|
ExternalDocumentsIds, FieldDistribution, FieldId, FieldIdMapMissingEntry, FieldsIdsMap, Index,
|
|
|
|
Result, BEU32,
|
|
|
|
};
|
2020-10-24 16:23:08 +02:00
|
|
|
|
2021-01-20 17:27:43 +01:00
|
|
|
const DEFAULT_PRIMARY_KEY_NAME: &str = "id";
|
|
|
|
|
2020-10-23 14:11:00 +02:00
|
|
|
pub struct TransformOutput {
|
2021-01-20 17:27:43 +01:00
|
|
|
pub primary_key: String,
|
2020-10-23 14:11:00 +02:00
|
|
|
pub fields_ids_map: FieldsIdsMap,
|
2021-06-21 15:57:41 +02:00
|
|
|
pub field_distribution: FieldDistribution,
|
2020-11-22 17:53:33 +01:00
|
|
|
pub external_documents_ids: ExternalDocumentsIds<'static>,
|
2020-10-23 14:11:00 +02:00
|
|
|
pub new_documents_ids: RoaringBitmap,
|
|
|
|
pub replaced_documents_ids: RoaringBitmap,
|
|
|
|
pub documents_count: usize,
|
2022-03-23 17:28:41 +01:00
|
|
|
pub original_documents: File,
|
|
|
|
pub flattened_documents: File,
|
2020-10-23 14:11:00 +02:00
|
|
|
}
|
|
|
|
|
2020-11-22 11:54:04 +01:00
|
|
|
/// Extract the external ids, deduplicate and compute the new internal documents ids
|
2020-11-01 11:50:10 +01:00
|
|
|
/// and fields ids, writing all the documents under their internal ids into a final file.
|
|
|
|
///
|
|
|
|
/// Outputs the new `FieldsIdsMap`, the new `UsersIdsDocumentsIds` map, the new documents ids,
|
|
|
|
/// the replaced documents ids, the number of documents in this update and the file
|
|
|
|
/// containing all those documents.
|
2021-12-08 14:12:07 +01:00
|
|
|
pub struct Transform<'a, 'i> {
|
2020-10-26 20:18:10 +01:00
|
|
|
pub index: &'i Index,
|
2022-03-23 17:28:41 +01:00
|
|
|
fields_ids_map: FieldsIdsMap,
|
|
|
|
|
2021-12-08 14:12:07 +01:00
|
|
|
indexer_settings: &'a IndexerConfig,
|
2020-10-31 21:46:55 +01:00
|
|
|
pub autogenerate_docids: bool,
|
2021-12-08 14:12:07 +01:00
|
|
|
pub index_documents_method: IndexDocumentsMethod,
|
|
|
|
|
2022-03-23 17:28:41 +01:00
|
|
|
original_sorter: grenad::Sorter<MergeFn>,
|
|
|
|
flattened_sorter: grenad::Sorter<MergeFn>,
|
|
|
|
replaced_documents_ids: RoaringBitmap,
|
|
|
|
new_documents_ids: RoaringBitmap,
|
2022-04-11 15:43:18 +02:00
|
|
|
// To increase the cache locality and the heap usage we use smartstring.
|
|
|
|
new_external_documents_ids_builder: FxHashMap<SmartString<smartstring::Compact>, u64>,
|
2021-12-08 14:12:07 +01:00
|
|
|
documents_count: usize,
|
2020-10-23 14:11:00 +02:00
|
|
|
}
|
|
|
|
|
2021-08-31 11:44:15 +02:00
|
|
|
/// Create a mapping between the field ids found in the document batch and the one that were
|
|
|
|
/// already present in the index.
|
|
|
|
///
|
|
|
|
/// If new fields are present in the addition, they are added to the index field ids map.
|
|
|
|
fn create_fields_mapping(
|
|
|
|
index_field_map: &mut FieldsIdsMap,
|
|
|
|
batch_field_map: &DocumentsBatchIndex,
|
|
|
|
) -> Result<HashMap<FieldId, FieldId>> {
|
|
|
|
batch_field_map
|
|
|
|
.iter()
|
|
|
|
// we sort by id here to ensure a deterministic mapping of the fields, that preserves
|
|
|
|
// the original ordering.
|
|
|
|
.sorted_by_key(|(&id, _)| id)
|
|
|
|
.map(|(field, name)| match index_field_map.id(&name) {
|
|
|
|
Some(id) => Ok((*field, id)),
|
|
|
|
None => index_field_map
|
|
|
|
.insert(&name)
|
|
|
|
.ok_or(Error::UserError(UserError::AttributeLimitReached))
|
|
|
|
.map(|id| (*field, id)),
|
|
|
|
})
|
|
|
|
.collect()
|
2021-05-06 21:16:40 +02:00
|
|
|
}
|
|
|
|
|
2022-03-23 17:28:41 +01:00
|
|
|
/// Look for a key containing the [DEFAULT_PRIMARY_KEY_NAME] in the fields.
|
|
|
|
/// It doesn't look in the subfield because we don't want to enable the
|
|
|
|
/// primary key inference on nested objects.
|
2021-10-21 11:05:16 +02:00
|
|
|
fn find_primary_key(index: &DocumentsBatchIndex) -> Option<&str> {
|
2021-08-31 11:44:15 +02:00
|
|
|
index
|
2021-10-12 11:14:12 +02:00
|
|
|
.iter()
|
|
|
|
.sorted_by_key(|(k, _)| *k)
|
|
|
|
.map(|(_, v)| v)
|
2021-08-31 11:44:15 +02:00
|
|
|
.find(|v| v.to_lowercase().contains(DEFAULT_PRIMARY_KEY_NAME))
|
|
|
|
.map(String::as_str)
|
|
|
|
}
|
2020-11-01 11:50:10 +01:00
|
|
|
|
2021-12-08 14:12:07 +01:00
|
|
|
impl<'a, 'i> Transform<'a, 'i> {
|
|
|
|
pub fn new(
|
2022-03-23 17:28:41 +01:00
|
|
|
wtxn: &mut heed::RwTxn,
|
2021-12-08 14:12:07 +01:00
|
|
|
index: &'i Index,
|
|
|
|
indexer_settings: &'a IndexerConfig,
|
|
|
|
index_documents_method: IndexDocumentsMethod,
|
|
|
|
autogenerate_docids: bool,
|
2022-03-23 17:28:41 +01:00
|
|
|
) -> Result<Self> {
|
2021-12-08 14:12:07 +01:00
|
|
|
// We must choose the appropriate merge function for when two or more documents
|
|
|
|
// with the same user id must be merged or fully replaced in the same batch.
|
|
|
|
let merge_function = match index_documents_method {
|
|
|
|
IndexDocumentsMethod::ReplaceDocuments => keep_latest_obkv,
|
|
|
|
IndexDocumentsMethod::UpdateDocuments => merge_obkvs,
|
|
|
|
};
|
|
|
|
|
|
|
|
// We initialize the sorter with the user indexing settings.
|
2022-03-23 17:28:41 +01:00
|
|
|
let original_sorter = create_sorter(
|
2021-12-08 14:12:07 +01:00
|
|
|
merge_function,
|
|
|
|
indexer_settings.chunk_compression_type,
|
|
|
|
indexer_settings.chunk_compression_level,
|
|
|
|
indexer_settings.max_nb_chunks,
|
2022-03-23 17:28:41 +01:00
|
|
|
indexer_settings.max_memory.map(|mem| mem / 2),
|
2021-12-08 14:12:07 +01:00
|
|
|
);
|
|
|
|
|
2022-03-23 17:28:41 +01:00
|
|
|
// We initialize the sorter with the user indexing settings.
|
|
|
|
let flattened_sorter = create_sorter(
|
|
|
|
merge_function,
|
|
|
|
indexer_settings.chunk_compression_type,
|
|
|
|
indexer_settings.chunk_compression_level,
|
|
|
|
indexer_settings.max_nb_chunks,
|
|
|
|
indexer_settings.max_memory.map(|mem| mem / 2),
|
|
|
|
);
|
|
|
|
|
|
|
|
Ok(Transform {
|
2021-12-08 14:12:07 +01:00
|
|
|
index,
|
2022-03-23 17:28:41 +01:00
|
|
|
fields_ids_map: index.fields_ids_map(wtxn)?,
|
2021-12-08 14:12:07 +01:00
|
|
|
indexer_settings,
|
|
|
|
autogenerate_docids,
|
2022-03-23 17:28:41 +01:00
|
|
|
original_sorter,
|
|
|
|
flattened_sorter,
|
2021-12-08 14:12:07 +01:00
|
|
|
index_documents_method,
|
2022-03-23 17:28:41 +01:00
|
|
|
replaced_documents_ids: RoaringBitmap::new(),
|
|
|
|
new_documents_ids: RoaringBitmap::new(),
|
|
|
|
new_external_documents_ids_builder: FxHashMap::default(),
|
|
|
|
documents_count: 0,
|
|
|
|
})
|
2021-12-08 14:12:07 +01:00
|
|
|
}
|
|
|
|
|
2021-08-31 11:44:15 +02:00
|
|
|
pub fn read_documents<R, F>(
|
2021-12-08 14:12:07 +01:00
|
|
|
&mut self,
|
2021-08-31 11:44:15 +02:00
|
|
|
mut reader: DocumentBatchReader<R>,
|
2021-12-08 14:12:07 +01:00
|
|
|
wtxn: &mut heed::RwTxn,
|
2020-11-11 12:39:09 +01:00
|
|
|
progress_callback: F,
|
2021-12-08 14:12:07 +01:00
|
|
|
) -> Result<usize>
|
2020-11-11 12:39:09 +01:00
|
|
|
where
|
2021-08-31 11:44:15 +02:00
|
|
|
R: Read + Seek,
|
2020-11-11 12:39:09 +01:00
|
|
|
F: Fn(UpdateIndexingStep) + Sync,
|
|
|
|
{
|
2021-08-31 11:44:15 +02:00
|
|
|
let fields_index = reader.index();
|
2022-03-23 17:28:41 +01:00
|
|
|
let external_documents_ids = self.index.external_documents_ids(wtxn)?;
|
|
|
|
let documents_ids = self.index.documents_ids(wtxn)?;
|
|
|
|
let mut available_documents_ids = AvailableDocumentsIds::from_documents_ids(&documents_ids);
|
|
|
|
|
|
|
|
let mapping = create_fields_mapping(&mut self.fields_ids_map, fields_index)?;
|
2020-10-31 16:10:15 +01:00
|
|
|
|
2021-08-31 11:44:15 +02:00
|
|
|
let alternative_name = self
|
|
|
|
.index
|
2021-12-08 14:12:07 +01:00
|
|
|
.primary_key(wtxn)?
|
2021-08-31 11:44:15 +02:00
|
|
|
.or_else(|| find_primary_key(fields_index))
|
|
|
|
.map(String::from);
|
2021-06-02 19:05:12 +02:00
|
|
|
|
2021-08-31 11:44:15 +02:00
|
|
|
let (primary_key_id, primary_key_name) = compute_primary_key_pair(
|
2021-12-08 14:12:07 +01:00
|
|
|
self.index.primary_key(wtxn)?,
|
2022-03-23 17:28:41 +01:00
|
|
|
&mut self.fields_ids_map,
|
2021-01-20 17:27:43 +01:00
|
|
|
alternative_name,
|
2021-06-16 18:33:33 +02:00
|
|
|
self.autogenerate_docids,
|
2021-01-20 17:27:43 +01:00
|
|
|
)?;
|
2020-10-31 16:10:15 +01:00
|
|
|
|
2022-03-23 17:28:41 +01:00
|
|
|
let primary_key_id_nested = primary_key_name.contains('.');
|
|
|
|
|
|
|
|
let mut flattened_document = None;
|
2020-10-31 16:10:15 +01:00
|
|
|
let mut obkv_buffer = Vec::new();
|
2022-03-23 17:28:41 +01:00
|
|
|
let mut flattened_obkv_buffer = Vec::new();
|
2020-11-11 12:39:09 +01:00
|
|
|
let mut documents_count = 0;
|
2021-08-31 11:44:15 +02:00
|
|
|
let mut external_id_buffer = Vec::new();
|
2022-03-23 17:28:41 +01:00
|
|
|
let mut field_buffer: Vec<(u16, Cow<[u8]>)> = Vec::new();
|
2021-08-31 11:44:15 +02:00
|
|
|
while let Some((addition_index, document)) = reader.next_document_with_index()? {
|
|
|
|
let mut field_buffer_cache = drop_and_reuse(field_buffer);
|
2021-12-08 14:12:07 +01:00
|
|
|
if self.indexer_settings.log_every_n.map_or(false, |len| documents_count % len == 0) {
|
2021-08-31 11:44:15 +02:00
|
|
|
progress_callback(UpdateIndexingStep::RemapDocumentAddition {
|
2020-11-11 12:39:09 +01:00
|
|
|
documents_seen: documents_count,
|
|
|
|
});
|
|
|
|
}
|
|
|
|
|
2021-08-31 11:44:15 +02:00
|
|
|
for (k, v) in document.iter() {
|
2022-03-23 17:28:41 +01:00
|
|
|
let mapped_id =
|
|
|
|
*mapping.get(&k).ok_or(InternalError::FieldIdMappingMissingEntry { key: k })?;
|
|
|
|
field_buffer_cache.push((mapped_id, Cow::from(v)));
|
2020-10-31 16:10:15 +01:00
|
|
|
}
|
|
|
|
|
2021-08-31 11:44:15 +02:00
|
|
|
// We need to make sure that every document has a primary key. After we have remapped
|
|
|
|
// all the fields in the document, we try to find the primary key value. If we can find
|
|
|
|
// it, transform it into a string and validate it, and then update it in the
|
|
|
|
// document. If none is found, and we were told to generate missing document ids, then
|
|
|
|
// we create the missing field, and update the new document.
|
|
|
|
let mut uuid_buffer = [0; uuid::adapter::Hyphenated::LENGTH];
|
2022-03-23 17:28:41 +01:00
|
|
|
let external_id = if primary_key_id_nested {
|
|
|
|
let mut field_buffer_cache = field_buffer_cache.clone();
|
|
|
|
self.flatten_from_field_mapping(
|
|
|
|
&mapping,
|
|
|
|
&document,
|
|
|
|
&mut flattened_obkv_buffer,
|
|
|
|
&mut field_buffer_cache,
|
|
|
|
)?;
|
|
|
|
flattened_document = Some(&flattened_obkv_buffer);
|
|
|
|
let document = KvReader::new(&flattened_obkv_buffer);
|
|
|
|
|
|
|
|
update_primary_key(
|
|
|
|
document,
|
|
|
|
&addition_index,
|
|
|
|
primary_key_id,
|
|
|
|
&primary_key_name,
|
|
|
|
&mut uuid_buffer,
|
|
|
|
&mut field_buffer_cache,
|
|
|
|
&mut external_id_buffer,
|
|
|
|
self.autogenerate_docids,
|
|
|
|
)?
|
|
|
|
} else {
|
|
|
|
update_primary_key(
|
|
|
|
document,
|
|
|
|
&addition_index,
|
|
|
|
primary_key_id,
|
|
|
|
&primary_key_name,
|
|
|
|
&mut uuid_buffer,
|
|
|
|
&mut field_buffer_cache,
|
|
|
|
&mut external_id_buffer,
|
|
|
|
self.autogenerate_docids,
|
|
|
|
)?
|
|
|
|
};
|
2020-10-31 12:54:43 +01:00
|
|
|
|
2021-08-31 11:44:15 +02:00
|
|
|
// Insertion in a obkv need to be done with keys ordered. For now they are ordered
|
|
|
|
// according to the document addition key order, so we sort it according to the
|
|
|
|
// fieldids map keys order.
|
|
|
|
field_buffer_cache.sort_unstable_by(|(f1, _), (f2, _)| f1.cmp(&f2));
|
2020-10-31 12:54:43 +01:00
|
|
|
|
2022-03-23 17:28:41 +01:00
|
|
|
// Build the new obkv document.
|
2021-08-31 11:44:15 +02:00
|
|
|
let mut writer = obkv::KvWriter::new(&mut obkv_buffer);
|
|
|
|
for (k, v) in field_buffer_cache.iter() {
|
|
|
|
writer.insert(*k, v)?;
|
2020-10-23 14:11:00 +02:00
|
|
|
}
|
|
|
|
|
2022-03-23 17:28:41 +01:00
|
|
|
let (docid, should_insert_original_document) =
|
|
|
|
match external_documents_ids.get(&*external_id) {
|
|
|
|
// if the document is in the db but has already been inserted
|
|
|
|
// (ie: already exists in the list of replaced documents ids),
|
|
|
|
// we should not add the original document a second time.
|
|
|
|
Some(docid) => (docid, !self.replaced_documents_ids.contains(docid)),
|
|
|
|
None => {
|
|
|
|
// if the document has already been inserted in this
|
|
|
|
// batch we need to get its docid
|
2022-04-11 15:43:18 +02:00
|
|
|
match self.new_external_documents_ids_builder.entry(external_id.into()) {
|
2022-03-23 17:28:41 +01:00
|
|
|
Entry::Occupied(entry) => (*entry.get() as u32, false),
|
|
|
|
// if the document has never been encountered we give it a new docid
|
|
|
|
// and push this new docid to the external documents ids builder
|
|
|
|
Entry::Vacant(entry) => {
|
|
|
|
let new_docid = available_documents_ids
|
|
|
|
.next()
|
|
|
|
.ok_or(UserError::DocumentLimitReached)?;
|
|
|
|
entry.insert(new_docid as u64);
|
|
|
|
(new_docid, false)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
};
|
|
|
|
|
|
|
|
if should_insert_original_document {
|
|
|
|
self.replaced_documents_ids.insert(docid);
|
|
|
|
|
|
|
|
let key = BEU32::new(docid);
|
|
|
|
let base_obkv = self
|
|
|
|
.index
|
|
|
|
.documents
|
|
|
|
.remap_data_type::<heed::types::ByteSlice>()
|
|
|
|
.get(wtxn, &key)?
|
|
|
|
.ok_or(InternalError::DatabaseMissingEntry {
|
|
|
|
db_name: db_name::DOCUMENTS,
|
|
|
|
key: None,
|
|
|
|
})?;
|
|
|
|
|
|
|
|
self.original_sorter.insert(&docid.to_be_bytes(), base_obkv)?;
|
2022-04-12 11:22:36 +02:00
|
|
|
if let Some(buffer) = self.flatten_from_fields_ids_map(KvReader::new(&base_obkv))? {
|
|
|
|
self.flattened_sorter.insert(docid.to_be_bytes(), &buffer)?;
|
|
|
|
} else {
|
|
|
|
self.flattened_sorter.insert(docid.to_be_bytes(), base_obkv)?;
|
|
|
|
}
|
2022-03-23 17:28:41 +01:00
|
|
|
} else {
|
|
|
|
self.new_documents_ids.insert(docid);
|
|
|
|
}
|
|
|
|
|
2020-10-31 12:54:43 +01:00
|
|
|
// We use the extracted/generated user id as the key for this document.
|
2022-03-23 17:28:41 +01:00
|
|
|
self.original_sorter.insert(&docid.to_be_bytes(), obkv_buffer.clone())?;
|
2020-11-11 12:39:09 +01:00
|
|
|
documents_count += 1;
|
2021-08-31 11:44:15 +02:00
|
|
|
|
2022-03-23 17:28:41 +01:00
|
|
|
if let Some(flatten) = flattened_document {
|
|
|
|
self.flattened_sorter.insert(docid.to_be_bytes(), &flatten)?;
|
|
|
|
} else {
|
2022-04-12 11:22:36 +02:00
|
|
|
if let Some(buffer) =
|
|
|
|
self.flatten_from_fields_ids_map(KvReader::new(&obkv_buffer))?
|
|
|
|
{
|
|
|
|
self.flattened_sorter.insert(docid.to_be_bytes(), &buffer)?;
|
|
|
|
} else {
|
|
|
|
self.flattened_sorter.insert(docid.to_be_bytes(), obkv_buffer.clone())?;
|
|
|
|
}
|
2022-03-23 17:28:41 +01:00
|
|
|
}
|
|
|
|
|
2021-08-31 11:44:15 +02:00
|
|
|
progress_callback(UpdateIndexingStep::RemapDocumentAddition {
|
|
|
|
documents_seen: documents_count,
|
|
|
|
});
|
|
|
|
|
|
|
|
field_buffer = drop_and_reuse(field_buffer_cache);
|
|
|
|
external_id_buffer.clear();
|
2022-03-23 17:28:41 +01:00
|
|
|
obkv_buffer.clear();
|
2020-10-23 14:11:00 +02:00
|
|
|
}
|
|
|
|
|
2021-08-31 11:44:15 +02:00
|
|
|
progress_callback(UpdateIndexingStep::RemapDocumentAddition {
|
2020-11-11 12:39:09 +01:00
|
|
|
documents_seen: documents_count,
|
|
|
|
});
|
|
|
|
|
2022-03-23 17:28:41 +01:00
|
|
|
self.index.put_fields_ids_map(wtxn, &self.fields_ids_map)?;
|
2021-12-08 14:12:07 +01:00
|
|
|
self.index.put_primary_key(wtxn, &primary_key_name)?;
|
|
|
|
self.documents_count += documents_count;
|
2020-10-31 16:10:15 +01:00
|
|
|
// Now that we have a valid sorter that contains the user id and the obkv we
|
|
|
|
// give it to the last transforming function which returns the TransformOutput.
|
2021-12-08 14:12:07 +01:00
|
|
|
Ok(documents_count)
|
2020-10-31 16:10:15 +01:00
|
|
|
}
|
|
|
|
|
2022-03-23 17:28:41 +01:00
|
|
|
// Flatten a document from the fields ids map contained in self and insert the new
|
2022-04-12 11:22:36 +02:00
|
|
|
// created fields. Returns `None` if the document doesn't need to be flattened.
|
|
|
|
fn flatten_from_fields_ids_map(&mut self, obkv: KvReader<FieldId>) -> Result<Option<Vec<u8>>> {
|
|
|
|
if obkv
|
|
|
|
.iter()
|
|
|
|
.all(|(_, value)| !json_depth_checker::should_flatten_from_unchecked_slice(value))
|
|
|
|
{
|
|
|
|
return Ok(None);
|
|
|
|
}
|
|
|
|
|
2022-03-23 17:28:41 +01:00
|
|
|
let mut doc = serde_json::Map::new();
|
|
|
|
|
|
|
|
for (k, v) in obkv.iter() {
|
|
|
|
let key = self.fields_ids_map.name(k).ok_or(FieldIdMapMissingEntry::FieldId {
|
|
|
|
field_id: k,
|
|
|
|
process: "Flatten from fields ids map.",
|
|
|
|
})?;
|
|
|
|
let value = serde_json::from_slice::<serde_json::Value>(v)
|
|
|
|
.map_err(crate::error::InternalError::SerdeJson)?;
|
|
|
|
doc.insert(key.to_string(), value);
|
|
|
|
}
|
|
|
|
|
|
|
|
let flattened = flatten_serde_json::flatten(&doc);
|
|
|
|
|
|
|
|
// Once we have the flattened version we can convert it back to obkv and
|
|
|
|
// insert all the new generated fields_ids (if any) in the fields ids map.
|
|
|
|
let mut buffer: Vec<u8> = Vec::new();
|
|
|
|
let mut writer = KvWriter::new(&mut buffer);
|
|
|
|
let mut flattened: Vec<_> = flattened.into_iter().collect();
|
|
|
|
// we reorder the field to get all the known field first
|
|
|
|
flattened
|
|
|
|
.sort_unstable_by_key(|(key, _)| self.fields_ids_map.id(&key).unwrap_or(FieldId::MAX));
|
|
|
|
|
|
|
|
for (key, value) in flattened {
|
|
|
|
let fid = self.fields_ids_map.insert(&key).ok_or(UserError::AttributeLimitReached)?;
|
|
|
|
let value = serde_json::to_vec(&value).map_err(InternalError::SerdeJson)?;
|
|
|
|
writer.insert(fid, &value)?;
|
|
|
|
}
|
|
|
|
|
2022-04-12 11:22:36 +02:00
|
|
|
Ok(Some(buffer))
|
2022-03-23 17:28:41 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
// Flatten a document from a field mapping generated by [create_fields_mapping]
|
|
|
|
fn flatten_from_field_mapping(
|
|
|
|
&mut self,
|
|
|
|
mapping: &HashMap<FieldId, FieldId>,
|
|
|
|
obkv: &KvReader<FieldId>,
|
|
|
|
output_buffer: &mut Vec<u8>,
|
|
|
|
field_buffer_cache: &mut Vec<(u16, Cow<[u8]>)>,
|
|
|
|
) -> Result<()> {
|
|
|
|
// if the primary_key is nested we need to flatten the document before being able to do anything
|
|
|
|
let mut doc = serde_json::Map::new();
|
|
|
|
|
|
|
|
for (k, v) in obkv.iter() {
|
|
|
|
let key =
|
|
|
|
mapping.get(&k).ok_or(InternalError::FieldIdMappingMissingEntry { key: k })?;
|
|
|
|
let key = self.fields_ids_map.name(*key).ok_or(FieldIdMapMissingEntry::FieldId {
|
|
|
|
field_id: *key,
|
|
|
|
process: "Flatten from field mapping.",
|
|
|
|
})?;
|
|
|
|
let value =
|
|
|
|
serde_json::from_slice::<serde_json::Value>(v).map_err(InternalError::SerdeJson)?;
|
|
|
|
doc.insert(key.to_string(), value);
|
|
|
|
}
|
|
|
|
|
|
|
|
let flattened = flatten_serde_json::flatten(&doc);
|
|
|
|
|
|
|
|
// Once we have the flattened version we can convert it back to obkv and
|
|
|
|
// insert all the new generated fields_ids (if any) in the fields ids map.
|
|
|
|
output_buffer.clear();
|
|
|
|
let mut writer = KvWriter::new(output_buffer);
|
|
|
|
let mut flattened: Vec<_> = flattened.into_iter().collect();
|
|
|
|
// we reorder the field to get all the known field first
|
|
|
|
flattened
|
|
|
|
.sort_unstable_by_key(|(key, _)| self.fields_ids_map.id(&key).unwrap_or(FieldId::MAX));
|
|
|
|
|
|
|
|
for (key, value) in flattened {
|
|
|
|
let fid = self.fields_ids_map.insert(&key).ok_or(UserError::AttributeLimitReached)?;
|
|
|
|
let value = serde_json::to_vec(&value).map_err(InternalError::SerdeJson)?;
|
|
|
|
writer.insert(fid, &value)?;
|
|
|
|
if field_buffer_cache.iter().find(|(id, _)| *id == fid).is_none() {
|
|
|
|
field_buffer_cache.push((fid, value.into()));
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
2020-11-01 11:50:10 +01:00
|
|
|
/// Generate the `TransformOutput` based on the given sorter that can be generated from any
|
|
|
|
/// format like CSV, JSON or JSON stream. This sorter must contain a key that is the document
|
2020-10-31 16:10:15 +01:00
|
|
|
/// id for the user side and the value must be an obkv where keys are valid fields ids.
|
2021-12-08 14:12:07 +01:00
|
|
|
pub(crate) fn output_from_sorter<F>(
|
2020-10-31 16:10:15 +01:00
|
|
|
self,
|
2021-12-08 14:12:07 +01:00
|
|
|
wtxn: &mut heed::RwTxn,
|
2020-11-11 12:39:09 +01:00
|
|
|
progress_callback: F,
|
2021-06-14 16:46:19 +02:00
|
|
|
) -> Result<TransformOutput>
|
2020-11-11 12:39:09 +01:00
|
|
|
where
|
|
|
|
F: Fn(UpdateIndexingStep) + Sync,
|
2020-10-31 16:10:15 +01:00
|
|
|
{
|
2021-12-08 14:12:07 +01:00
|
|
|
let primary_key = self
|
|
|
|
.index
|
|
|
|
.primary_key(&wtxn)?
|
|
|
|
.ok_or(Error::UserError(UserError::MissingPrimaryKey))?
|
|
|
|
.to_string();
|
|
|
|
|
2022-03-23 17:28:41 +01:00
|
|
|
let mut external_documents_ids = self.index.external_documents_ids(wtxn)?;
|
2021-09-08 13:30:26 +02:00
|
|
|
|
2022-03-23 17:28:41 +01:00
|
|
|
// We create a final writer to write the new documents in order from the sorter.
|
|
|
|
let mut writer = create_writer(
|
2021-12-08 14:12:07 +01:00
|
|
|
self.indexer_settings.chunk_compression_type,
|
|
|
|
self.indexer_settings.chunk_compression_level,
|
2022-03-23 17:28:41 +01:00
|
|
|
tempfile::tempfile()?,
|
2020-10-29 14:20:03 +01:00
|
|
|
);
|
2020-10-23 14:11:00 +02:00
|
|
|
|
2022-03-23 17:28:41 +01:00
|
|
|
// Once we have all the documents in the sorter, we write the documents
|
|
|
|
// in the writer. We also generate the field distribution.
|
|
|
|
let mut field_distribution = self.index.field_distribution(wtxn)?;
|
|
|
|
let mut iter = self.original_sorter.into_stream_merger_iter()?;
|
|
|
|
// used only for the callback
|
2020-10-23 14:11:00 +02:00
|
|
|
let mut documents_count = 0;
|
2020-11-11 12:39:09 +01:00
|
|
|
|
2022-03-23 17:28:41 +01:00
|
|
|
while let Some((key, val)) = iter.next()? {
|
|
|
|
// send a callback to show at which step we are
|
|
|
|
documents_count += 1;
|
|
|
|
progress_callback(UpdateIndexingStep::ComputeIdsAndMergeDocuments {
|
|
|
|
documents_seen: documents_count,
|
|
|
|
total_documents: self.documents_count,
|
|
|
|
});
|
|
|
|
|
|
|
|
let u32_key = key.clone().read_u32::<byteorder::BigEndian>()?;
|
|
|
|
// if the document was already in the db we remove all of its field
|
|
|
|
// from the field distribution.
|
|
|
|
if self.replaced_documents_ids.contains(u32_key) {
|
|
|
|
let obkv = self.index.documents.get(wtxn, &BEU32::new(u32_key))?.ok_or(
|
|
|
|
InternalError::DatabaseMissingEntry { db_name: db_name::DOCUMENTS, key: None },
|
|
|
|
)?;
|
|
|
|
|
|
|
|
for (key, _) in obkv.iter() {
|
|
|
|
let name =
|
|
|
|
self.fields_ids_map.name(key).ok_or(FieldIdMapMissingEntry::FieldId {
|
|
|
|
field_id: key,
|
|
|
|
process: "Computing field distribution in transform.",
|
|
|
|
})?;
|
|
|
|
// We checked that the document was in the db earlier. If we can't find it it means
|
|
|
|
// there is an inconsistency between the field distribution and the field id map.
|
|
|
|
let field = field_distribution.get_mut(name).ok_or(
|
|
|
|
FieldIdMapMissingEntry::FieldId {
|
|
|
|
field_id: key,
|
|
|
|
process: "Accessing field distribution in transform.",
|
2021-06-17 17:05:34 +02:00
|
|
|
},
|
|
|
|
)?;
|
2022-03-23 17:28:41 +01:00
|
|
|
*field -= 1;
|
|
|
|
if *field == 0 {
|
|
|
|
// since we were able to get the field right before it's safe to unwrap here
|
|
|
|
field_distribution.remove(name).unwrap();
|
2020-10-26 10:55:07 +01:00
|
|
|
}
|
2021-06-16 18:33:33 +02:00
|
|
|
}
|
2022-03-23 17:28:41 +01:00
|
|
|
}
|
2020-10-23 14:11:00 +02:00
|
|
|
|
2022-03-23 17:28:41 +01:00
|
|
|
// We increment all the field of the current document in the field distribution.
|
|
|
|
let obkv = KvReader::new(val);
|
2021-05-04 22:01:11 +03:00
|
|
|
|
2022-03-23 17:28:41 +01:00
|
|
|
for (key, _) in obkv.iter() {
|
|
|
|
let name =
|
|
|
|
self.fields_ids_map.name(key).ok_or(FieldIdMapMissingEntry::FieldId {
|
|
|
|
field_id: key,
|
|
|
|
process: "Computing field distribution in transform.",
|
|
|
|
})?;
|
|
|
|
*field_distribution.entry(name.to_string()).or_insert(0) += 1;
|
2021-05-04 22:01:11 +03:00
|
|
|
}
|
2022-03-23 17:28:41 +01:00
|
|
|
writer.insert(key, val)?;
|
2020-10-23 14:11:00 +02:00
|
|
|
}
|
|
|
|
|
2022-03-23 17:28:41 +01:00
|
|
|
let mut original_documents = writer.into_inner()?;
|
|
|
|
// We then extract the file and reset the seek to be able to read it again.
|
|
|
|
original_documents.seek(SeekFrom::Start(0))?;
|
2020-11-11 12:39:09 +01:00
|
|
|
|
2020-10-29 14:20:03 +01:00
|
|
|
// We create a final writer to write the new documents in order from the sorter.
|
2021-12-08 14:12:07 +01:00
|
|
|
let mut writer = create_writer(
|
|
|
|
self.indexer_settings.chunk_compression_type,
|
|
|
|
self.indexer_settings.chunk_compression_level,
|
2022-02-16 15:28:48 +01:00
|
|
|
tempfile::tempfile()?,
|
|
|
|
);
|
2020-10-29 14:20:03 +01:00
|
|
|
// Once we have written all the documents into the final sorter, we write the documents
|
|
|
|
// into this writer, extract the file and reset the seek to be able to read it again.
|
2022-03-23 17:28:41 +01:00
|
|
|
self.flattened_sorter.write_into_stream_writer(&mut writer)?;
|
|
|
|
let mut flattened_documents = writer.into_inner()?;
|
|
|
|
flattened_documents.seek(SeekFrom::Start(0))?;
|
|
|
|
|
|
|
|
let mut new_external_documents_ids_builder: Vec<_> =
|
|
|
|
self.new_external_documents_ids_builder.into_iter().collect();
|
|
|
|
|
|
|
|
new_external_documents_ids_builder
|
|
|
|
.sort_unstable_by(|(left, _), (right, _)| left.cmp(&right));
|
|
|
|
let mut fst_new_external_documents_ids_builder = fst::MapBuilder::memory();
|
|
|
|
new_external_documents_ids_builder.into_iter().try_for_each(|(key, value)| {
|
|
|
|
fst_new_external_documents_ids_builder.insert(key, value)
|
|
|
|
})?;
|
|
|
|
let new_external_documents_ids = fst_new_external_documents_ids_builder.into_map();
|
2020-11-22 17:53:33 +01:00
|
|
|
external_documents_ids.insert_ids(&new_external_documents_ids)?;
|
2020-10-23 14:11:00 +02:00
|
|
|
|
|
|
|
Ok(TransformOutput {
|
2020-10-31 16:10:15 +01:00
|
|
|
primary_key,
|
2022-03-23 17:28:41 +01:00
|
|
|
fields_ids_map: self.fields_ids_map,
|
2021-06-17 15:16:20 +02:00
|
|
|
field_distribution,
|
2020-11-22 17:53:33 +01:00
|
|
|
external_documents_ids: external_documents_ids.into_static(),
|
2022-03-23 17:28:41 +01:00
|
|
|
new_documents_ids: self.new_documents_ids,
|
|
|
|
replaced_documents_ids: self.replaced_documents_ids,
|
|
|
|
documents_count: self.documents_count,
|
|
|
|
original_documents,
|
|
|
|
flattened_documents,
|
2020-11-03 13:20:11 +01:00
|
|
|
})
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Returns a `TransformOutput` with a file that contains the documents of the index
|
|
|
|
/// with the attributes reordered accordingly to the `FieldsIdsMap` given as argument.
|
|
|
|
// TODO this can be done in parallel by using the rayon `ThreadPool`.
|
|
|
|
pub fn remap_index_documents(
|
|
|
|
self,
|
2021-12-08 14:12:07 +01:00
|
|
|
wtxn: &mut heed::RwTxn,
|
2021-01-20 17:27:43 +01:00
|
|
|
old_fields_ids_map: FieldsIdsMap,
|
2022-03-23 17:28:41 +01:00
|
|
|
mut new_fields_ids_map: FieldsIdsMap,
|
2021-06-16 18:33:33 +02:00
|
|
|
) -> Result<TransformOutput> {
|
2021-12-08 14:12:07 +01:00
|
|
|
// There already has been a document addition, the primary key should be set by now.
|
|
|
|
let primary_key =
|
|
|
|
self.index.primary_key(wtxn)?.ok_or(UserError::MissingPrimaryKey)?.to_string();
|
|
|
|
let field_distribution = self.index.field_distribution(wtxn)?;
|
|
|
|
let external_documents_ids = self.index.external_documents_ids(wtxn)?;
|
|
|
|
let documents_ids = self.index.documents_ids(wtxn)?;
|
2020-11-03 13:20:11 +01:00
|
|
|
let documents_count = documents_ids.len() as usize;
|
|
|
|
|
|
|
|
// We create a final writer to write the new documents in order from the sorter.
|
2022-03-23 17:28:41 +01:00
|
|
|
let mut original_writer = create_writer(
|
|
|
|
self.indexer_settings.chunk_compression_type,
|
|
|
|
self.indexer_settings.chunk_compression_level,
|
|
|
|
tempfile::tempfile()?,
|
|
|
|
);
|
|
|
|
|
|
|
|
// We create a final writer to write the new documents in order from the sorter.
|
|
|
|
let mut flattened_writer = create_writer(
|
2021-12-08 14:12:07 +01:00
|
|
|
self.indexer_settings.chunk_compression_type,
|
|
|
|
self.indexer_settings.chunk_compression_level,
|
2022-02-16 15:28:48 +01:00
|
|
|
tempfile::tempfile()?,
|
|
|
|
);
|
2020-11-03 13:20:11 +01:00
|
|
|
|
|
|
|
let mut obkv_buffer = Vec::new();
|
2021-12-08 14:12:07 +01:00
|
|
|
for result in self.index.documents.iter(wtxn)? {
|
2020-11-03 13:20:11 +01:00
|
|
|
let (docid, obkv) = result?;
|
|
|
|
let docid = docid.get();
|
|
|
|
|
|
|
|
obkv_buffer.clear();
|
2021-08-16 13:36:30 +02:00
|
|
|
let mut obkv_writer = obkv::KvWriter::<_, FieldId>::new(&mut obkv_buffer);
|
2020-11-03 13:20:11 +01:00
|
|
|
|
|
|
|
// We iterate over the new `FieldsIdsMap` ids in order and construct the new obkv.
|
2021-01-20 17:27:43 +01:00
|
|
|
for (id, name) in new_fields_ids_map.iter() {
|
|
|
|
if let Some(val) = old_fields_ids_map.id(name).and_then(|id| obkv.get(id)) {
|
2020-11-03 13:20:11 +01:00
|
|
|
obkv_writer.insert(id, val)?;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
let buffer = obkv_writer.into_inner()?;
|
2022-03-23 17:28:41 +01:00
|
|
|
original_writer.insert(docid.to_be_bytes(), &buffer)?;
|
|
|
|
|
|
|
|
// Once we have the document. We're going to flatten it
|
|
|
|
// and insert it in the flattened sorter.
|
|
|
|
let mut doc = serde_json::Map::new();
|
|
|
|
|
|
|
|
let reader = obkv::KvReader::new(buffer);
|
|
|
|
for (k, v) in reader.iter() {
|
|
|
|
let key = new_fields_ids_map.name(k).ok_or(FieldIdMapMissingEntry::FieldId {
|
|
|
|
field_id: k,
|
|
|
|
process: "Accessing field distribution in transform.",
|
|
|
|
})?;
|
|
|
|
let value = serde_json::from_slice::<serde_json::Value>(v)
|
|
|
|
.map_err(InternalError::SerdeJson)?;
|
|
|
|
doc.insert(key.to_string(), value);
|
|
|
|
}
|
|
|
|
|
|
|
|
let flattened = flatten_serde_json::flatten(&doc);
|
|
|
|
|
|
|
|
// Once we have the flattened version we can convert it back to obkv and
|
|
|
|
// insert all the new generated fields_ids (if any) in the fields ids map.
|
|
|
|
let mut buffer: Vec<u8> = Vec::new();
|
|
|
|
let mut writer = KvWriter::new(&mut buffer);
|
|
|
|
let mut flattened: Vec<_> = flattened.into_iter().collect();
|
|
|
|
// we reorder the field to get all the known field first
|
|
|
|
flattened.sort_unstable_by_key(|(key, _)| {
|
|
|
|
new_fields_ids_map.id(&key).unwrap_or(FieldId::MAX)
|
|
|
|
});
|
|
|
|
|
|
|
|
for (key, value) in flattened {
|
|
|
|
let fid =
|
|
|
|
new_fields_ids_map.insert(&key).ok_or(UserError::AttributeLimitReached)?;
|
|
|
|
let value = serde_json::to_vec(&value).map_err(InternalError::SerdeJson)?;
|
|
|
|
writer.insert(fid, &value)?;
|
|
|
|
}
|
|
|
|
flattened_writer.insert(docid.to_be_bytes(), &buffer)?;
|
2020-11-03 13:20:11 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
// Once we have written all the documents, we extract
|
|
|
|
// the file and reset the seek to be able to read it again.
|
2022-03-23 17:28:41 +01:00
|
|
|
let mut original_documents = original_writer.into_inner()?;
|
|
|
|
original_documents.seek(SeekFrom::Start(0))?;
|
|
|
|
|
|
|
|
let mut flattened_documents = flattened_writer.into_inner()?;
|
|
|
|
flattened_documents.seek(SeekFrom::Start(0))?;
|
2020-11-03 13:20:11 +01:00
|
|
|
|
|
|
|
Ok(TransformOutput {
|
|
|
|
primary_key,
|
2021-01-20 17:27:43 +01:00
|
|
|
fields_ids_map: new_fields_ids_map,
|
2021-06-17 15:16:20 +02:00
|
|
|
field_distribution,
|
2020-11-22 17:53:33 +01:00
|
|
|
external_documents_ids: external_documents_ids.into_static(),
|
2020-11-03 13:20:11 +01:00
|
|
|
new_documents_ids: documents_ids,
|
|
|
|
replaced_documents_ids: RoaringBitmap::default(),
|
|
|
|
documents_count,
|
2022-03-23 17:28:41 +01:00
|
|
|
original_documents,
|
|
|
|
flattened_documents,
|
2020-10-23 14:11:00 +02:00
|
|
|
})
|
|
|
|
}
|
|
|
|
}
|
2020-10-31 16:10:15 +01:00
|
|
|
|
2021-01-20 17:27:43 +01:00
|
|
|
/// Given an optional primary key and an optional alternative name, returns the (field_id, attr_name)
|
|
|
|
/// for the primary key according to the following rules:
|
|
|
|
/// - if primary_key is `Some`, returns the id and the name, else
|
|
|
|
/// - if alternative_name is Some, adds alternative to the fields_ids_map, and returns the pair, else
|
|
|
|
/// - if autogenerate_docids is true, insert the default id value in the field ids map ("id") and
|
|
|
|
/// returns the pair, else
|
|
|
|
/// - returns an error.
|
|
|
|
fn compute_primary_key_pair(
|
|
|
|
primary_key: Option<&str>,
|
|
|
|
fields_ids_map: &mut FieldsIdsMap,
|
|
|
|
alternative_name: Option<String>,
|
|
|
|
autogenerate_docids: bool,
|
2021-06-14 16:46:19 +02:00
|
|
|
) -> Result<(FieldId, String)> {
|
2021-01-20 17:27:43 +01:00
|
|
|
match primary_key {
|
|
|
|
Some(primary_key) => {
|
2021-06-14 16:46:19 +02:00
|
|
|
let id = fields_ids_map.insert(primary_key).ok_or(UserError::AttributeLimitReached)?;
|
2021-01-20 17:27:43 +01:00
|
|
|
Ok((id, primary_key.to_string()))
|
|
|
|
}
|
|
|
|
None => {
|
|
|
|
let name = match alternative_name {
|
|
|
|
Some(key) => key,
|
|
|
|
None => {
|
|
|
|
if !autogenerate_docids {
|
|
|
|
// If there is no primary key in the current document batch, we must
|
|
|
|
// return an error and not automatically generate any document id.
|
2021-06-14 16:46:19 +02:00
|
|
|
return Err(UserError::MissingPrimaryKey.into());
|
2021-01-20 17:27:43 +01:00
|
|
|
}
|
|
|
|
DEFAULT_PRIMARY_KEY_NAME.to_string()
|
2021-06-16 18:33:33 +02:00
|
|
|
}
|
2021-01-20 17:27:43 +01:00
|
|
|
};
|
2021-06-14 16:46:19 +02:00
|
|
|
let id = fields_ids_map.insert(&name).ok_or(UserError::AttributeLimitReached)?;
|
2021-01-20 17:27:43 +01:00
|
|
|
Ok((id, name))
|
2021-06-16 18:33:33 +02:00
|
|
|
}
|
2021-01-20 17:27:43 +01:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-11-01 16:43:12 +01:00
|
|
|
fn validate_document_id(document_id: &str) -> Option<&str> {
|
|
|
|
let document_id = document_id.trim();
|
|
|
|
Some(document_id).filter(|id| {
|
2021-06-16 18:33:33 +02:00
|
|
|
!id.is_empty()
|
|
|
|
&& id.chars().all(|c| matches!(c, 'a'..='z' | 'A'..='Z' | '0'..='9' | '-' | '_'))
|
2020-11-01 16:43:12 +01:00
|
|
|
})
|
|
|
|
}
|
2021-01-20 17:27:43 +01:00
|
|
|
|
2021-08-31 11:44:15 +02:00
|
|
|
/// Drops all the value of type `U` in vec, and reuses the allocation to create a `Vec<T>`.
|
|
|
|
///
|
|
|
|
/// The size and alignment of T and U must match.
|
|
|
|
fn drop_and_reuse<U, T>(mut vec: Vec<U>) -> Vec<T> {
|
|
|
|
debug_assert_eq!(std::mem::align_of::<U>(), std::mem::align_of::<T>());
|
|
|
|
debug_assert_eq!(std::mem::size_of::<U>(), std::mem::size_of::<T>());
|
|
|
|
vec.clear();
|
|
|
|
debug_assert!(vec.is_empty());
|
|
|
|
vec.into_iter().map(|_| unreachable!()).collect()
|
|
|
|
}
|
|
|
|
|
2022-03-23 17:28:41 +01:00
|
|
|
fn update_primary_key<'a>(
|
|
|
|
document: KvReader<'a, FieldId>,
|
|
|
|
addition_index: &DocumentsBatchIndex,
|
|
|
|
primary_key_id: FieldId,
|
|
|
|
primary_key_name: &str,
|
|
|
|
uuid_buffer: &'a mut [u8; uuid::adapter::Hyphenated::LENGTH],
|
|
|
|
field_buffer_cache: &mut Vec<(u16, Cow<'a, [u8]>)>,
|
|
|
|
mut external_id_buffer: &'a mut Vec<u8>,
|
|
|
|
autogenerate_docids: bool,
|
|
|
|
) -> Result<Cow<'a, str>> {
|
|
|
|
match field_buffer_cache.iter_mut().find(|(id, _)| *id == primary_key_id) {
|
|
|
|
Some((_, bytes)) => {
|
|
|
|
let value = match serde_json::from_slice(bytes).map_err(InternalError::SerdeJson)? {
|
|
|
|
Value::String(string) => match validate_document_id(&string) {
|
|
|
|
Some(s) if s.len() == string.len() => string,
|
|
|
|
Some(s) => s.to_string(),
|
|
|
|
None => {
|
|
|
|
return Err(UserError::InvalidDocumentId {
|
|
|
|
document_id: Value::String(string),
|
|
|
|
}
|
|
|
|
.into())
|
|
|
|
}
|
|
|
|
},
|
|
|
|
Value::Number(number) => number.to_string(),
|
|
|
|
content => {
|
|
|
|
return Err(UserError::InvalidDocumentId { document_id: content.clone() }.into())
|
|
|
|
}
|
|
|
|
};
|
|
|
|
serde_json::to_writer(external_id_buffer, &value).map_err(InternalError::SerdeJson)?;
|
|
|
|
Ok(Cow::Owned(value))
|
|
|
|
}
|
|
|
|
None if autogenerate_docids => {
|
|
|
|
let uuid = uuid::Uuid::new_v4().to_hyphenated().encode_lower(uuid_buffer);
|
|
|
|
serde_json::to_writer(&mut external_id_buffer, &uuid)
|
|
|
|
.map_err(InternalError::SerdeJson)?;
|
|
|
|
field_buffer_cache.push((primary_key_id, external_id_buffer.as_slice().into()));
|
|
|
|
Ok(Cow::Borrowed(&*uuid))
|
|
|
|
}
|
|
|
|
None => {
|
|
|
|
let mut json = Map::new();
|
|
|
|
for (key, value) in document.iter() {
|
|
|
|
let key = addition_index.name(key).cloned();
|
|
|
|
let value = serde_json::from_slice::<Value>(&value).ok();
|
|
|
|
|
|
|
|
if let Some((k, v)) = key.zip(value) {
|
|
|
|
json.insert(k, v);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
Err(UserError::MissingDocumentId {
|
|
|
|
primary_key: primary_key_name.to_string(),
|
|
|
|
document: json,
|
|
|
|
})?
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl TransformOutput {
|
|
|
|
// find and insert the new field ids
|
|
|
|
pub fn compute_real_facets(&self, rtxn: &RoTxn, index: &Index) -> Result<HashSet<String>> {
|
|
|
|
let user_defined_facets = index.user_defined_faceted_fields(rtxn)?;
|
|
|
|
|
|
|
|
Ok(self
|
|
|
|
.fields_ids_map
|
|
|
|
.names()
|
|
|
|
.filter(|&field| crate::is_faceted(field, &user_defined_facets))
|
|
|
|
.map(|field| field.to_string())
|
|
|
|
.collect())
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-01-20 17:27:43 +01:00
|
|
|
#[cfg(test)]
|
|
|
|
mod test {
|
|
|
|
use super::*;
|
|
|
|
|
|
|
|
mod compute_primary_key {
|
2022-03-23 17:28:41 +01:00
|
|
|
use big_s::S;
|
|
|
|
|
2021-06-16 18:33:33 +02:00
|
|
|
use super::{compute_primary_key_pair, FieldsIdsMap};
|
2021-01-20 17:27:43 +01:00
|
|
|
|
|
|
|
#[test]
|
|
|
|
fn should_return_primary_key_if_is_some() {
|
|
|
|
let mut fields_map = FieldsIdsMap::new();
|
|
|
|
fields_map.insert("toto").unwrap();
|
|
|
|
let result = compute_primary_key_pair(
|
|
|
|
Some("toto"),
|
|
|
|
&mut fields_map,
|
|
|
|
Some("tata".to_string()),
|
2021-06-16 18:33:33 +02:00
|
|
|
false,
|
|
|
|
);
|
2021-07-06 11:31:24 +02:00
|
|
|
assert_eq!(result.unwrap(), (0, "toto".to_string()));
|
2021-01-20 17:27:43 +01:00
|
|
|
assert_eq!(fields_map.len(), 1);
|
2022-03-23 17:28:41 +01:00
|
|
|
|
|
|
|
// and with nested fields
|
|
|
|
let mut fields_map = FieldsIdsMap::new();
|
|
|
|
fields_map.insert("toto.tata").unwrap();
|
|
|
|
let result = compute_primary_key_pair(
|
|
|
|
Some("toto.tata"),
|
|
|
|
&mut fields_map,
|
|
|
|
Some(S("titi")),
|
|
|
|
false,
|
|
|
|
);
|
|
|
|
assert_eq!(result.unwrap(), (0, "toto.tata".to_string()));
|
|
|
|
assert_eq!(fields_map.len(), 1);
|
2021-01-20 17:27:43 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
fn should_return_alternative_if_primary_is_none() {
|
|
|
|
let mut fields_map = FieldsIdsMap::new();
|
2021-06-16 18:33:33 +02:00
|
|
|
let result =
|
|
|
|
compute_primary_key_pair(None, &mut fields_map, Some("tata".to_string()), false);
|
2022-03-23 17:28:41 +01:00
|
|
|
assert_eq!(result.unwrap(), (0, S("tata")));
|
2021-01-20 17:27:43 +01:00
|
|
|
assert_eq!(fields_map.len(), 1);
|
|
|
|
}
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
fn should_return_default_if_both_are_none() {
|
|
|
|
let mut fields_map = FieldsIdsMap::new();
|
2021-06-16 18:33:33 +02:00
|
|
|
let result = compute_primary_key_pair(None, &mut fields_map, None, true);
|
2022-03-23 17:28:41 +01:00
|
|
|
assert_eq!(result.unwrap(), (0, S("id")));
|
2021-01-20 17:27:43 +01:00
|
|
|
assert_eq!(fields_map.len(), 1);
|
|
|
|
}
|
|
|
|
|
|
|
|
#[test]
|
2021-06-16 18:33:33 +02:00
|
|
|
fn should_return_err_if_both_are_none_and_recompute_is_false() {
|
2021-01-20 17:27:43 +01:00
|
|
|
let mut fields_map = FieldsIdsMap::new();
|
2021-06-16 18:33:33 +02:00
|
|
|
let result = compute_primary_key_pair(None, &mut fields_map, None, false);
|
2021-01-20 17:27:43 +01:00
|
|
|
assert!(result.is_err());
|
|
|
|
assert_eq!(fields_map.len(), 0);
|
|
|
|
}
|
2021-10-12 10:59:14 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
mod primary_key_inference {
|
2022-03-23 17:28:41 +01:00
|
|
|
use big_s::S;
|
2021-10-12 10:59:14 +02:00
|
|
|
use bimap::BiHashMap;
|
|
|
|
|
2021-10-25 17:38:32 +02:00
|
|
|
use crate::documents::DocumentsBatchIndex;
|
|
|
|
use crate::update::index_documents::transform::find_primary_key;
|
2021-10-12 10:59:14 +02:00
|
|
|
|
|
|
|
#[test]
|
|
|
|
fn primary_key_infered_on_first_field() {
|
|
|
|
// We run the test multiple times to change the order in which the fields are iterated upon.
|
|
|
|
for _ in 1..50 {
|
|
|
|
let mut map = BiHashMap::new();
|
2022-03-23 17:28:41 +01:00
|
|
|
map.insert(1, S("fakeId"));
|
|
|
|
map.insert(2, S("fakeId"));
|
|
|
|
map.insert(3, S("fakeId"));
|
|
|
|
map.insert(4, S("fakeId"));
|
|
|
|
map.insert(0, S("realId"));
|
2021-10-12 10:59:14 +02:00
|
|
|
|
2021-10-21 11:05:16 +02:00
|
|
|
assert_eq!(find_primary_key(&DocumentsBatchIndex(map)), Some("realId"));
|
2021-10-12 10:59:14 +02:00
|
|
|
}
|
|
|
|
}
|
2021-01-20 17:27:43 +01:00
|
|
|
}
|
|
|
|
}
|