Update the Transform struct to support JSON updates

This commit is contained in:
Clément Renault 2020-10-31 16:10:15 +01:00
parent 9d47ee52b4
commit f0d028d3a4
No known key found for this signature in database
GPG Key ID: 92ADA4E935E71FA4
5 changed files with 288 additions and 33 deletions

7
Cargo.lock generated
View File

@ -435,9 +435,9 @@ checksum = "4358a9e11b9a09cf52383b451b49a169e8d797b68aa02301ff586d70d9661ea3"
[[package]] [[package]]
name = "either" name = "either"
version = "1.5.3" version = "1.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bb1f6b1ce1c140482ea30ddd3335fc0024ac7ee112895426e0a629a6c20adfe3" checksum = "e78d4f1cc4ae33bbfc157ed5d5a5ef3bc29227303d595861deb238fcec4e9457"
[[package]] [[package]]
name = "fake-simd" name = "fake-simd"
@ -630,7 +630,7 @@ checksum = "9b919933a397b79c37e33b77bb2aa3dc8eb6e165ad809e58ff75bc7db2e34574"
[[package]] [[package]]
name = "grenad" name = "grenad"
version = "0.1.0" version = "0.1.0"
source = "git+https://github.com/Kerollmops/grenad.git?rev=ce3517f#ce3517fdbcf7ff0e1e703a4abbc623f69f29d8e0" source = "git+https://github.com/Kerollmops/grenad.git?rev=3eb7ad9#3eb7ad9fff06c7b4d3286a3e37e40eea12d695de"
dependencies = [ dependencies = [
"byteorder", "byteorder",
"flate2", "flate2",
@ -1796,6 +1796,7 @@ version = "1.0.59"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dcac07dbffa1c65e7f816ab9eba78eb142c6d44410f4eeba1e26e4f5dfa56b95" checksum = "dcac07dbffa1c65e7f816ab9eba78eb142c6d44410f4eeba1e26e4f5dfa56b95"
dependencies = [ dependencies = [
"indexmap",
"itoa", "itoa",
"ryu", "ryu",
"serde", "serde",

View File

@ -13,7 +13,7 @@ csv = "1.1.3"
flate2 = "1.0.17" flate2 = "1.0.17"
fst = "0.4.4" fst = "0.4.4"
fxhash = "0.2.1" fxhash = "0.2.1"
grenad = { git = "https://github.com/Kerollmops/grenad.git", rev = "ce3517f" } grenad = { git = "https://github.com/Kerollmops/grenad.git", rev = "3eb7ad9" }
heed = { version = "0.10.0", default-features = false, features = ["lmdb"] } heed = { version = "0.10.0", default-features = false, features = ["lmdb"] }
human_format = "1.0.3" human_format = "1.0.3"
indexmap = { version = "1.6.0", features = ["serde-1"] } indexmap = { version = "1.6.0", features = ["serde-1"] }
@ -27,7 +27,7 @@ once_cell = "1.4.0"
rayon = "1.3.1" rayon = "1.3.1"
ringtail = "0.3.0" ringtail = "0.3.0"
roaring = "0.6.1" roaring = "0.6.1"
serde_json = "1.0.59" serde_json = { version = "1.0.59", features = ["preserve_order"] }
slice-group-by = "0.2.6" slice-group-by = "0.2.6"
smallstr = { version = "0.2.0", features = ["serde"] } smallstr = { version = "0.2.0", features = ["serde"] }
smallvec = "1.4.0" smallvec = "1.4.0"

View File

@ -10,8 +10,10 @@ pub mod subcommand;
pub mod tokenizer; pub mod tokenizer;
pub mod update; pub mod update;
use std::borrow::Cow;
use std::collections::HashMap; use std::collections::HashMap;
use std::hash::BuildHasherDefault; use std::hash::BuildHasherDefault;
use fxhash::{FxHasher32, FxHasher64}; use fxhash::{FxHasher32, FxHasher64};
pub use self::criterion::{Criterion, default_criteria}; pub use self::criterion::{Criterion, default_criteria};
@ -34,3 +36,5 @@ pub type BEU64 = heed::zerocopy::U64<heed::byteorder::BE>;
pub type DocumentId = u32; pub type DocumentId = u32;
pub type Attribute = u32; pub type Attribute = u32;
pub type Position = u32; pub type Position = u32;
type MergeFn = for<'a> fn(&[u8], &[Cow<'a, [u8]>]) -> anyhow::Result<Vec<u8>>;

View File

@ -18,6 +18,7 @@ use self::merge_function::{
}; };
pub use self::transform::{Transform, TransformOutput}; pub use self::transform::{Transform, TransformOutput};
use crate::MergeFn;
use super::UpdateBuilder; use super::UpdateBuilder;
mod merge_function; mod merge_function;
@ -30,8 +31,6 @@ enum WriteMethod {
GetMergePut, GetMergePut,
} }
type MergeFn = for<'a> fn(&[u8], &[Cow<'a, [u8]>]) -> anyhow::Result<Vec<u8>>;
fn create_writer(typ: CompressionType, level: Option<u32>, file: File) -> io::Result<Writer<File>> { fn create_writer(typ: CompressionType, level: Option<u32>, file: File) -> io::Result<Writer<File>> {
let mut builder = Writer::builder(); let mut builder = Writer::builder();
builder.compression_type(typ); builder.compression_type(typ);
@ -170,6 +169,7 @@ fn write_into_lmdb_database(
} }
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum IndexDocumentsMethod { pub enum IndexDocumentsMethod {
/// Replace the previous document with the new one, /// Replace the previous document with the new one,
/// removing all the already known attributes. /// removing all the already known attributes.
@ -180,6 +180,15 @@ pub enum IndexDocumentsMethod {
UpdateDocuments, UpdateDocuments,
} }
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum UpdateFormat {
/// The given update is a real **comma seperated** CSV with headers on the first line.
Csv,
/// The given update is a JSON array with documents inside.
Json,
}
pub struct IndexDocuments<'t, 'u, 'i> { pub struct IndexDocuments<'t, 'u, 'i> {
wtxn: &'t mut heed::RwTxn<'i, 'u>, wtxn: &'t mut heed::RwTxn<'i, 'u>,
index: &'i Index, index: &'i Index,
@ -192,6 +201,7 @@ pub struct IndexDocuments<'t, 'u, 'i> {
chunk_fusing_shrink_size: Option<u64>, chunk_fusing_shrink_size: Option<u64>,
indexing_jobs: Option<usize>, indexing_jobs: Option<usize>,
update_method: IndexDocumentsMethod, update_method: IndexDocumentsMethod,
update_format: UpdateFormat,
} }
impl<'t, 'u, 'i> IndexDocuments<'t, 'u, 'i> { impl<'t, 'u, 'i> IndexDocuments<'t, 'u, 'i> {
@ -207,7 +217,8 @@ impl<'t, 'u, 'i> IndexDocuments<'t, 'u, 'i> {
chunk_compression_level: None, chunk_compression_level: None,
chunk_fusing_shrink_size: None, chunk_fusing_shrink_size: None,
indexing_jobs: None, indexing_jobs: None,
update_method: IndexDocumentsMethod::ReplaceDocuments update_method: IndexDocumentsMethod::ReplaceDocuments,
update_format: UpdateFormat::Json,
} }
} }
@ -256,6 +267,11 @@ impl<'t, 'u, 'i> IndexDocuments<'t, 'u, 'i> {
self self
} }
pub fn update_format(&mut self, format: UpdateFormat) -> &mut Self {
self.update_format = format;
self
}
pub fn execute<R, F>(self, reader: R, progress_callback: F) -> anyhow::Result<()> pub fn execute<R, F>(self, reader: R, progress_callback: F) -> anyhow::Result<()>
where where
R: io::Read, R: io::Read,
@ -274,6 +290,11 @@ impl<'t, 'u, 'i> IndexDocuments<'t, 'u, 'i> {
index_documents_method: self.update_method, index_documents_method: self.update_method,
}; };
let output = match self.update_format {
UpdateFormat::Csv => transform.from_csv(reader)?,
UpdateFormat::Json => transform.from_json(reader)?,
};
let TransformOutput { let TransformOutput {
primary_key, primary_key,
fields_ids_map, fields_ids_map,
@ -282,7 +303,7 @@ impl<'t, 'u, 'i> IndexDocuments<'t, 'u, 'i> {
replaced_documents_ids, replaced_documents_ids,
documents_count, documents_count,
documents_file, documents_file,
} = transform.from_csv(reader)?; } = output;
// We delete the documents that this document addition replaces. This way we are // We delete the documents that this document addition replaces. This way we are
// able to simply insert all the documents even if they already exist in the database. // able to simply insert all the documents even if they already exist in the database.
@ -302,10 +323,17 @@ impl<'t, 'u, 'i> IndexDocuments<'t, 'u, 'i> {
let _deleted_documents_count = deletion_builder.execute()?; let _deleted_documents_count = deletion_builder.execute()?;
} }
let mmap = if documents_count == 0 {
None
} else {
let mmap = unsafe { let mmap = unsafe {
memmap::Mmap::map(&documents_file).context("mmaping the transform documents file")? memmap::Mmap::map(&documents_file).context("mmaping the transform documents file")?
}; };
let documents = grenad::Reader::new(mmap.as_ref())?; Some(mmap)
};
let bytes = mmap.as_ref().map(AsRef::as_ref).unwrap_or_default();
let documents = grenad::Reader::new(bytes).unwrap();
// The enum which indicates the type of the readers // The enum which indicates the type of the readers
// merges that are potentially done on different threads. // merges that are potentially done on different threads.
@ -499,13 +527,14 @@ mod tests {
let path = tempfile::tempdir().unwrap(); let path = tempfile::tempdir().unwrap();
let mut options = EnvOpenOptions::new(); let mut options = EnvOpenOptions::new();
options.map_size(10 * 1024 * 1024); // 10 MB options.map_size(10 * 1024 * 1024); // 10 MB
let index = Index::new(options, &path).unwrap(); let index = Index::new(options, &path).unwrap();
// First we send 3 documents with ids from 1 to 3. // First we send 3 documents with ids from 1 to 3.
let mut wtxn = index.write_txn().unwrap(); let mut wtxn = index.write_txn().unwrap();
let content = &b"id,name\n1,kevin\n2,kevina\n3,benoit\n"[..]; let content = &b"id,name\n1,kevin\n2,kevina\n3,benoit\n"[..];
IndexDocuments::new(&mut wtxn, &index).execute(content, |_, _| ()).unwrap(); let mut builder = IndexDocuments::new(&mut wtxn, &index);
builder.update_format(UpdateFormat::Csv);
builder.execute(content, |_, _| ()).unwrap();
wtxn.commit().unwrap(); wtxn.commit().unwrap();
// Check that there is 3 documents now. // Check that there is 3 documents now.
@ -517,7 +546,9 @@ mod tests {
// Second we send 1 document with id 1, to erase the previous ones. // Second we send 1 document with id 1, to erase the previous ones.
let mut wtxn = index.write_txn().unwrap(); let mut wtxn = index.write_txn().unwrap();
let content = &b"id,name\n1,updated kevin\n"[..]; let content = &b"id,name\n1,updated kevin\n"[..];
IndexDocuments::new(&mut wtxn, &index).execute(content, |_, _| ()).unwrap(); let mut builder = IndexDocuments::new(&mut wtxn, &index);
builder.update_format(UpdateFormat::Csv);
builder.execute(content, |_, _| ()).unwrap();
wtxn.commit().unwrap(); wtxn.commit().unwrap();
// Check that there is **always** 3 documents. // Check that there is **always** 3 documents.
@ -529,7 +560,9 @@ mod tests {
// Third we send 3 documents again to replace the existing ones. // Third we send 3 documents again to replace the existing ones.
let mut wtxn = index.write_txn().unwrap(); let mut wtxn = index.write_txn().unwrap();
let content = &b"id,name\n1,updated second kevin\n2,updated kevina\n3,updated benoit\n"[..]; let content = &b"id,name\n1,updated second kevin\n2,updated kevina\n3,updated benoit\n"[..];
IndexDocuments::new(&mut wtxn, &index).execute(content, |_, _| ()).unwrap(); let mut builder = IndexDocuments::new(&mut wtxn, &index);
builder.update_format(UpdateFormat::Csv);
builder.execute(content, |_, _| ()).unwrap();
wtxn.commit().unwrap(); wtxn.commit().unwrap();
// Check that there is **always** 3 documents. // Check that there is **always** 3 documents.
@ -544,7 +577,6 @@ mod tests {
let path = tempfile::tempdir().unwrap(); let path = tempfile::tempdir().unwrap();
let mut options = EnvOpenOptions::new(); let mut options = EnvOpenOptions::new();
options.map_size(10 * 1024 * 1024); // 10 MB options.map_size(10 * 1024 * 1024); // 10 MB
let index = Index::new(options, &path).unwrap(); let index = Index::new(options, &path).unwrap();
// First we send 3 documents with duplicate ids and // First we send 3 documents with duplicate ids and
@ -552,6 +584,7 @@ mod tests {
let mut wtxn = index.write_txn().unwrap(); let mut wtxn = index.write_txn().unwrap();
let content = &b"id,name\n1,kevin\n1,kevina\n1,benoit\n"[..]; let content = &b"id,name\n1,kevin\n1,kevina\n1,benoit\n"[..];
let mut builder = IndexDocuments::new(&mut wtxn, &index); let mut builder = IndexDocuments::new(&mut wtxn, &index);
builder.update_format(UpdateFormat::Csv);
builder.index_documents_method(IndexDocumentsMethod::UpdateDocuments); builder.index_documents_method(IndexDocumentsMethod::UpdateDocuments);
builder.execute(content, |_, _| ()).unwrap(); builder.execute(content, |_, _| ()).unwrap();
wtxn.commit().unwrap(); wtxn.commit().unwrap();
@ -578,6 +611,7 @@ mod tests {
let mut wtxn = index.write_txn().unwrap(); let mut wtxn = index.write_txn().unwrap();
let content = &b"id,age\n1,25\n"[..]; let content = &b"id,age\n1,25\n"[..];
let mut builder = IndexDocuments::new(&mut wtxn, &index); let mut builder = IndexDocuments::new(&mut wtxn, &index);
builder.update_format(UpdateFormat::Csv);
builder.index_documents_method(IndexDocumentsMethod::UpdateDocuments); builder.index_documents_method(IndexDocumentsMethod::UpdateDocuments);
builder.execute(content, |_, _| ()).unwrap(); builder.execute(content, |_, _| ()).unwrap();
wtxn.commit().unwrap(); wtxn.commit().unwrap();
@ -607,13 +641,14 @@ mod tests {
let path = tempfile::tempdir().unwrap(); let path = tempfile::tempdir().unwrap();
let mut options = EnvOpenOptions::new(); let mut options = EnvOpenOptions::new();
options.map_size(10 * 1024 * 1024); // 10 MB options.map_size(10 * 1024 * 1024); // 10 MB
let index = Index::new(options, &path).unwrap(); let index = Index::new(options, &path).unwrap();
// First we send 3 documents with ids from 1 to 3. // First we send 3 documents with ids from 1 to 3.
let mut wtxn = index.write_txn().unwrap(); let mut wtxn = index.write_txn().unwrap();
let content = &b"name\nkevin\nkevina\nbenoit\n"[..]; let content = &b"name\nkevin\nkevina\nbenoit\n"[..];
IndexDocuments::new(&mut wtxn, &index).execute(content, |_, _| ()).unwrap(); let mut builder = IndexDocuments::new(&mut wtxn, &index);
builder.update_format(UpdateFormat::Csv);
builder.execute(content, |_, _| ()).unwrap();
wtxn.commit().unwrap(); wtxn.commit().unwrap();
// Check that there is 3 documents now. // Check that there is 3 documents now.
@ -629,7 +664,9 @@ mod tests {
// Second we send 1 document with the generated uuid, to erase the previous ones. // Second we send 1 document with the generated uuid, to erase the previous ones.
let mut wtxn = index.write_txn().unwrap(); let mut wtxn = index.write_txn().unwrap();
let content = format!("id,name\n{},updated kevin", kevin_uuid); let content = format!("id,name\n{},updated kevin", kevin_uuid);
IndexDocuments::new(&mut wtxn, &index).execute(content.as_bytes(), |_, _| ()).unwrap(); let mut builder = IndexDocuments::new(&mut wtxn, &index);
builder.update_format(UpdateFormat::Csv);
builder.execute(content.as_bytes(), |_, _| ()).unwrap();
wtxn.commit().unwrap(); wtxn.commit().unwrap();
// Check that there is **always** 3 documents. // Check that there is **always** 3 documents.
@ -644,13 +681,14 @@ mod tests {
let path = tempfile::tempdir().unwrap(); let path = tempfile::tempdir().unwrap();
let mut options = EnvOpenOptions::new(); let mut options = EnvOpenOptions::new();
options.map_size(10 * 1024 * 1024); // 10 MB options.map_size(10 * 1024 * 1024); // 10 MB
let index = Index::new(options, &path).unwrap(); let index = Index::new(options, &path).unwrap();
// First we send 3 documents with ids from 1 to 3. // First we send 3 documents with ids from 1 to 3.
let mut wtxn = index.write_txn().unwrap(); let mut wtxn = index.write_txn().unwrap();
let content = &b"id,name\n1,kevin\n2,kevina\n3,benoit\n"[..]; let content = &b"id,name\n1,kevin\n2,kevina\n3,benoit\n"[..];
IndexDocuments::new(&mut wtxn, &index).execute(content, |_, _| ()).unwrap(); let mut builder = IndexDocuments::new(&mut wtxn, &index);
builder.update_format(UpdateFormat::Csv);
builder.execute(content, |_, _| ()).unwrap();
wtxn.commit().unwrap(); wtxn.commit().unwrap();
// Check that there is 3 documents now. // Check that there is 3 documents now.
@ -662,7 +700,9 @@ mod tests {
// Second we send 1 document without specifying the id. // Second we send 1 document without specifying the id.
let mut wtxn = index.write_txn().unwrap(); let mut wtxn = index.write_txn().unwrap();
let content = &b"name\nnew kevin"[..]; let content = &b"name\nnew kevin"[..];
IndexDocuments::new(&mut wtxn, &index).execute(content, |_, _| ()).unwrap(); let mut builder = IndexDocuments::new(&mut wtxn, &index);
builder.update_format(UpdateFormat::Csv);
builder.execute(content, |_, _| ()).unwrap();
wtxn.commit().unwrap(); wtxn.commit().unwrap();
// Check that there is 4 documents now. // Check that there is 4 documents now.
@ -671,4 +711,74 @@ mod tests {
assert_eq!(count, 4); assert_eq!(count, 4);
drop(rtxn); drop(rtxn);
} }
#[test]
fn empty_csv_update() {
let path = tempfile::tempdir().unwrap();
let mut options = EnvOpenOptions::new();
options.map_size(10 * 1024 * 1024); // 10 MB
let index = Index::new(options, &path).unwrap();
// First we send 0 documents and only headers.
let mut wtxn = index.write_txn().unwrap();
let content = &b"id,name\n"[..];
let mut builder = IndexDocuments::new(&mut wtxn, &index);
builder.update_format(UpdateFormat::Csv);
builder.execute(content, |_, _| ()).unwrap();
wtxn.commit().unwrap();
// Check that there is no documents.
let rtxn = index.read_txn().unwrap();
let count = index.number_of_documents(&rtxn).unwrap();
assert_eq!(count, 0);
drop(rtxn);
}
#[test]
fn json_documents() {
let path = tempfile::tempdir().unwrap();
let mut options = EnvOpenOptions::new();
options.map_size(10 * 1024 * 1024); // 10 MB
let index = Index::new(options, &path).unwrap();
// First we send 3 documents with an id for only one of them.
let mut wtxn = index.write_txn().unwrap();
let content = &br#"[
{ "name": "kevin" },
{ "name": "kevina", "id": "21" },
{ "name": "benoit" }
]"#[..];
let mut builder = IndexDocuments::new(&mut wtxn, &index);
builder.update_format(UpdateFormat::Json);
builder.execute(content, |_, _| ()).unwrap();
wtxn.commit().unwrap();
// Check that there is 3 documents now.
let rtxn = index.read_txn().unwrap();
let count = index.number_of_documents(&rtxn).unwrap();
assert_eq!(count, 3);
drop(rtxn);
}
#[test]
fn empty_json_update() {
let path = tempfile::tempdir().unwrap();
let mut options = EnvOpenOptions::new();
options.map_size(10 * 1024 * 1024); // 10 MB
let index = Index::new(options, &path).unwrap();
// First we send 0 documents.
let mut wtxn = index.write_txn().unwrap();
let content = &b"[]"[..];
let mut builder = IndexDocuments::new(&mut wtxn, &index);
builder.update_format(UpdateFormat::Json);
builder.execute(content, |_, _| ()).unwrap();
wtxn.commit().unwrap();
// Check that there is no documents.
let rtxn = index.read_txn().unwrap();
let count = index.number_of_documents(&rtxn).unwrap();
assert_eq!(count, 0);
drop(rtxn);
}
} }

View File

@ -7,8 +7,9 @@ use anyhow::{anyhow, Context};
use fst::{IntoStreamer, Streamer}; use fst::{IntoStreamer, Streamer};
use grenad::CompressionType; use grenad::CompressionType;
use roaring::RoaringBitmap; use roaring::RoaringBitmap;
use serde_json::{Map, Value};
use crate::{BEU32, Index, FieldsIdsMap}; use crate::{BEU32, MergeFn, Index, FieldsIdsMap};
use crate::update::AvailableDocumentsIds; use crate::update::AvailableDocumentsIds;
use super::merge_function::merge_two_obkvs; use super::merge_function::merge_two_obkvs;
use super::{create_writer, create_sorter, IndexDocumentsMethod}; use super::{create_writer, create_sorter, IndexDocumentsMethod};
@ -35,6 +36,114 @@ pub struct Transform<'t, 'i> {
} }
impl Transform<'_, '_> { impl Transform<'_, '_> {
/// Extract the users ids, deduplicate and compute the new internal documents ids
/// and fields ids, writing all the documents under their internal ids into a final file.
///
/// Outputs the new `FieldsIdsMap`, the new `UsersIdsDocumentsIds` map, the new documents ids,
/// the replaced documents ids, the number of documents in this update and the file
/// containing all those documents.
pub fn from_json<R: Read>(self, reader: R) -> anyhow::Result<TransformOutput> {
let mut fields_ids_map = self.index.fields_ids_map(self.rtxn)?;
let users_ids_documents_ids = self.index.users_ids_documents_ids(self.rtxn).unwrap();
let primary_key = self.index.primary_key(self.rtxn)?;
// Deserialize the whole batch of documents in memory.
let documents: Vec<Map<String, Value>> = serde_json::from_reader(reader)?;
// We extract the primary key from the first document in
// the batch if it hasn't already been defined in the index.
let primary_key = match primary_key {
Some(primary_key) => primary_key,
None => {
match documents.get(0).and_then(|doc| doc.keys().find(|k| k.contains("id"))) {
Some(key) => fields_ids_map.insert(&key).context("field id limit reached")?,
None => fields_ids_map.insert("id").context("field id limit reached")?,
}
},
};
if documents.is_empty() {
return Ok(TransformOutput {
primary_key,
fields_ids_map,
users_ids_documents_ids: fst::Map::default(),
new_documents_ids: RoaringBitmap::new(),
replaced_documents_ids: RoaringBitmap::new(),
documents_count: 0,
documents_file: tempfile::tempfile()?,
});
}
// Get the primary key field name now, this way we will
// be able to get the value in the JSON Map document.
let primary_key_name = fields_ids_map
.name(primary_key)
.expect("found the primary key name")
.to_owned();
// We must choose the appropriate merge function for when two or more documents
// with the same user id must be merged or fully replaced in the same batch.
let merge_function = match self.index_documents_method {
IndexDocumentsMethod::ReplaceDocuments => keep_latest_obkv,
IndexDocumentsMethod::UpdateDocuments => merge_obkvs,
};
// We initialize the sorter with the user indexing settings.
let mut sorter = create_sorter(
merge_function,
self.chunk_compression_type,
self.chunk_compression_level,
self.chunk_fusing_shrink_size,
self.max_nb_chunks,
self.max_memory,
);
let mut json_buffer = Vec::new();
let mut obkv_buffer = Vec::new();
let mut uuid_buffer = [0; uuid::adapter::Hyphenated::LENGTH];
for mut document in documents {
obkv_buffer.clear();
let mut writer = obkv::KvWriter::new(&mut obkv_buffer);
// We prepare the fields ids map with the documents keys.
for (key, _value) in &document {
fields_ids_map.insert(&key).context("field id limit reached")?;
}
// We iterate in the fields ids ordered.
for (field_id, name) in fields_ids_map.iter() {
if let Some(value) = document.get(name) {
// We serialize the attribute values.
json_buffer.clear();
serde_json::to_writer(&mut json_buffer, value)?;
writer.insert(field_id, &json_buffer)?;
}
}
// We retrieve the user id from the document based on the primary key name,
// if the document id isn't present we generate a uuid.
let user_id = match document.remove(&primary_key_name) {
Some(value) => match value {
Value::String(string) => Cow::Owned(string),
Value::Number(number) => Cow::Owned(number.to_string()),
_ => return Err(anyhow!("documents ids must be either strings or numbers")),
},
None => {
let uuid = uuid::Uuid::new_v4().to_hyphenated().encode_lower(&mut uuid_buffer);
Cow::Borrowed(uuid)
},
};
// We use the extracted/generated user id as the key for this document.
sorter.insert(user_id.as_bytes(), &obkv_buffer)?;
}
// Now that we have a valid sorter that contains the user id and the obkv we
// give it to the last transforming function which returns the TransformOutput.
self.from_sorter(sorter, primary_key, fields_ids_map, users_ids_documents_ids)
}
/// Extract the users ids, deduplicate and compute the new internal documents ids /// Extract the users ids, deduplicate and compute the new internal documents ids
/// and fields ids, writing all the documents under their internal ids into a final file. /// and fields ids, writing all the documents under their internal ids into a final file.
/// ///
@ -43,8 +152,6 @@ impl Transform<'_, '_> {
/// containing all those documents. /// containing all those documents.
pub fn from_csv<R: Read>(self, reader: R) -> anyhow::Result<TransformOutput> { pub fn from_csv<R: Read>(self, reader: R) -> anyhow::Result<TransformOutput> {
let mut fields_ids_map = self.index.fields_ids_map(self.rtxn)?; let mut fields_ids_map = self.index.fields_ids_map(self.rtxn)?;
let documents_ids = self.index.documents_ids(self.rtxn)?;
let mut available_documents_ids = AvailableDocumentsIds::from_documents_ids(&documents_ids);
let users_ids_documents_ids = self.index.users_ids_documents_ids(self.rtxn).unwrap(); let users_ids_documents_ids = self.index.users_ids_documents_ids(self.rtxn).unwrap();
let mut csv = csv::Reader::from_reader(reader); let mut csv = csv::Reader::from_reader(reader);
@ -85,11 +192,6 @@ impl Transform<'_, '_> {
// the records fields in the fields ids map order and correctly generate the obkv. // the records fields in the fields ids map order and correctly generate the obkv.
fields_ids.sort_unstable_by_key(|(field_id, _)| *field_id); fields_ids.sort_unstable_by_key(|(field_id, _)| *field_id);
/// Only the last value associated with an id is kept.
fn keep_latest_obkv(_key: &[u8], obkvs: &[Cow<[u8]>]) -> anyhow::Result<Vec<u8>> {
obkvs.last().context("no last value").map(|last| last.clone().into_owned())
}
// We initialize the sorter with the user indexing settings. // We initialize the sorter with the user indexing settings.
let mut sorter = create_sorter( let mut sorter = create_sorter(
keep_latest_obkv, keep_latest_obkv,
@ -105,9 +207,9 @@ impl Transform<'_, '_> {
let mut json_buffer = Vec::new(); let mut json_buffer = Vec::new();
let mut obkv_buffer = Vec::new(); let mut obkv_buffer = Vec::new();
let mut uuid_buffer = [0; uuid::adapter::Hyphenated::LENGTH]; let mut uuid_buffer = [0; uuid::adapter::Hyphenated::LENGTH];
let mut record = csv::StringRecord::new(); let mut record = csv::StringRecord::new();
while csv.read_record(&mut record)? { while csv.read_record(&mut record)? {
obkv_buffer.clear(); obkv_buffer.clear();
let mut writer = obkv::KvWriter::new(&mut obkv_buffer); let mut writer = obkv::KvWriter::new(&mut obkv_buffer);
@ -138,6 +240,25 @@ impl Transform<'_, '_> {
sorter.insert(user_id, &obkv_buffer)?; sorter.insert(user_id, &obkv_buffer)?;
} }
// Now that we have a valid sorter that contains the user id and the obkv we
// give it to the last transforming function which returns the TransformOutput.
self.from_sorter(sorter, primary_key_field_id, fields_ids_map, users_ids_documents_ids)
}
/// Generate the TransformOutput based on the given sorter that can be generated from any
/// format like CSV, JSON or JSON lines. This sorter must contain a key that is the document
/// id for the user side and the value must be an obkv where keys are valid fields ids.
fn from_sorter(
self,
sorter: grenad::Sorter<MergeFn>,
primary_key: u8,
fields_ids_map: FieldsIdsMap,
users_ids_documents_ids: fst::Map<Cow<'_, [u8]>>,
) -> anyhow::Result<TransformOutput>
{
let documents_ids = self.index.documents_ids(self.rtxn)?;
let mut available_documents_ids = AvailableDocumentsIds::from_documents_ids(&documents_ids);
// Once we have sort and deduplicated the documents we write them into a final file. // Once we have sort and deduplicated the documents we write them into a final file.
let mut final_sorter = create_sorter( let mut final_sorter = create_sorter(
|_docid, _obkvs| Err(anyhow!("cannot merge two documents")), |_docid, _obkvs| Err(anyhow!("cannot merge two documents")),
@ -150,6 +271,7 @@ impl Transform<'_, '_> {
let mut new_users_ids_documents_ids_builder = fst::MapBuilder::memory(); let mut new_users_ids_documents_ids_builder = fst::MapBuilder::memory();
let mut replaced_documents_ids = RoaringBitmap::new(); let mut replaced_documents_ids = RoaringBitmap::new();
let mut new_documents_ids = RoaringBitmap::new(); let mut new_documents_ids = RoaringBitmap::new();
let mut obkv_buffer = Vec::new();
// While we write into final file we get or generate the internal documents ids. // While we write into final file we get or generate the internal documents ids.
let mut documents_count = 0; let mut documents_count = 0;
@ -219,7 +341,7 @@ impl Transform<'_, '_> {
} }
Ok(TransformOutput { Ok(TransformOutput {
primary_key: primary_key_field_id, primary_key,
fields_ids_map, fields_ids_map,
users_ids_documents_ids: users_ids_documents_ids_builder.into_map(), users_ids_documents_ids: users_ids_documents_ids_builder.into_map(),
new_documents_ids, new_documents_ids,
@ -229,3 +351,21 @@ impl Transform<'_, '_> {
}) })
} }
} }
/// Only the last value associated with an id is kept.
fn keep_latest_obkv(_key: &[u8], obkvs: &[Cow<[u8]>]) -> anyhow::Result<Vec<u8>> {
obkvs.last().context("no last value").map(|last| last.clone().into_owned())
}
/// Merge all the obks in the order we see them.
fn merge_obkvs(_key: &[u8], obkvs: &[Cow<[u8]>]) -> anyhow::Result<Vec<u8>> {
let mut iter = obkvs.iter();
let first = iter.next().map(|b| b.clone().into_owned()).context("no first value")?;
Ok(iter.fold(first, |acc, current| {
let first = obkv::KvReader::new(&acc);
let second = obkv::KvReader::new(current);
let mut buffer = Vec::new();
merge_two_obkvs(first, second, &mut buffer);
buffer
}))
}