diff --git a/Cargo.lock b/Cargo.lock index 66b5e0147..a1608dbd6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -611,7 +611,7 @@ checksum = "9b919933a397b79c37e33b77bb2aa3dc8eb6e165ad809e58ff75bc7db2e34574" [[package]] name = "grenad" version = "0.1.0" -source = "git+https://github.com/Kerollmops/grenad.git?rev=c390cfe#c390cfed1dc8a26ca108ffaeb7bdd978fa4e9021" +source = "git+https://github.com/Kerollmops/grenad.git?rev=ce3517f#ce3517fdbcf7ff0e1e703a4abbc623f69f29d8e0" dependencies = [ "byteorder", "flate2", diff --git a/Cargo.toml b/Cargo.toml index 264cddc50..0a15aa653 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,7 +13,7 @@ csv = "1.1.3" flate2 = "1.0.17" fst = "0.4.4" fxhash = "0.2.1" -grenad = { git = "https://github.com/Kerollmops/grenad.git", rev = "c390cfe" } +grenad = { git = "https://github.com/Kerollmops/grenad.git", rev = "ce3517f" } heed = { version = "0.8.1", default-features = false, features = ["lmdb"] } human_format = "1.0.3" indexmap = { version = "1.6.0", features = ["serde-1"] } diff --git a/src/indexing/transform.rs b/src/indexing/transform.rs index 40d34a1b7..f5187c2df 100644 --- a/src/indexing/transform.rs +++ b/src/indexing/transform.rs @@ -1,12 +1,12 @@ use std::borrow::Cow; -use std::convert::{TryFrom, TryInto}; +use std::convert::TryFrom; use std::fs::File; -use std::io::{Read, Write, Seek, SeekFrom}; +use std::io::{Read, Seek, SeekFrom}; -use anyhow::{bail, Context}; +use anyhow::Context; use crate::{FieldsIdsMap, AvailableDocumentsIds}; use fst::{IntoStreamer, Streamer}; -use grenad::{Writer, Sorter, Reader, CompressionType}; +use grenad::{Writer, Sorter, CompressionType}; use roaring::RoaringBitmap; pub struct TransformOutput { @@ -49,7 +49,7 @@ impl> Transform { /// The last value associated with an id is kept. fn merge_last_win(_key: &[u8], vals: &[Cow<[u8]>]) -> anyhow::Result> { - Ok(vals.last().unwrap().clone().into_owned()) + vals.last().context("no last value").map(|last| last.clone().into_owned()) } // We initialize the sorter with the user indexing settings. @@ -63,23 +63,27 @@ impl> Transform { // We write into the sorter to merge and deduplicate the documents // based on the users ids. let mut sorter = sorter_builder.build(); - let mut buffer = Vec::new(); + let mut json_buffer = Vec::new(); + let mut obkv_buffer = Vec::new(); let mut record = csv::StringRecord::new(); while csv.read_record(&mut record)? { - buffer.clear(); - let mut writer = obkv::KvWriter::new(&mut buffer); + obkv_buffer.clear(); + let mut writer = obkv::KvWriter::new(&mut obkv_buffer); // We retrieve the field id based on the CSV header position // and zip it with the record value. for (key, field) in fields_ids.iter().copied().zip(&record) { - // TODO we must serialize the values as JSON strings. - writer.insert(key, field)?; + // We serialize the attribute values as JSON strings. + json_buffer.clear(); + serde_json::to_writer(&mut json_buffer, &field)?; + writer.insert(key, &json_buffer)?; } // We extract the user id and use it as the key for this document. + // TODO we must validate the user id (i.e. [a-zA-Z0-9\-_]). let user_id = &record[user_id_pos]; - sorter.insert(user_id, &buffer)?; + sorter.insert(user_id, &obkv_buffer)?; } // Once we have sort and deduplicated the documents we write them into a final file. @@ -129,7 +133,7 @@ impl> Transform { // We create the union between the existing users ids documents ids with the new ones. let new_users_ids_documents_ids = new_users_ids_documents_ids_builder.into_map(); - let mut union_ = fst::map::OpBuilder::new() + let union_ = fst::map::OpBuilder::new() .add(&self.users_ids_documents_ids) .add(&new_users_ids_documents_ids) .r#union();