2021-08-31 17:44:15 +08:00
|
|
|
mod builder;
|
2022-06-20 19:48:02 +08:00
|
|
|
mod enriched;
|
2023-11-09 21:19:16 +08:00
|
|
|
mod primary_key;
|
2021-08-31 17:44:15 +08:00
|
|
|
mod reader;
|
2022-07-12 00:38:50 +08:00
|
|
|
mod serde_impl;
|
2021-08-31 17:44:15 +08:00
|
|
|
|
2023-03-09 18:23:57 +08:00
|
|
|
use std::fmt::Debug;
|
2021-10-25 23:38:32 +08:00
|
|
|
use std::io;
|
2022-06-20 19:48:02 +08:00
|
|
|
use std::str::Utf8Error;
|
2021-08-31 17:44:15 +08:00
|
|
|
|
|
|
|
use bimap::BiHashMap;
|
2022-06-14 22:03:48 +08:00
|
|
|
pub use builder::DocumentsBatchBuilder;
|
2022-06-20 19:48:02 +08:00
|
|
|
pub use enriched::{EnrichedDocument, EnrichedDocumentsBatchCursor, EnrichedDocumentsBatchReader};
|
2022-06-15 00:12:15 +08:00
|
|
|
use obkv::KvReader;
|
2024-05-21 22:36:47 +08:00
|
|
|
pub use primary_key::{
|
|
|
|
validate_document_id_value, DocumentIdExtractionError, FieldIdMapper, PrimaryKey,
|
|
|
|
DEFAULT_PRIMARY_KEY,
|
|
|
|
};
|
2022-06-16 18:03:43 +08:00
|
|
|
pub use reader::{DocumentsBatchCursor, DocumentsBatchCursorError, DocumentsBatchReader};
|
2022-04-04 19:51:50 +08:00
|
|
|
use serde::{Deserialize, Serialize};
|
2021-08-31 17:44:15 +08:00
|
|
|
|
2022-06-15 00:12:15 +08:00
|
|
|
use crate::error::{FieldIdMapMissingEntry, InternalError};
|
|
|
|
use crate::{FieldId, Object, Result};
|
2021-08-31 17:44:15 +08:00
|
|
|
|
2022-06-14 22:03:48 +08:00
|
|
|
/// Key under which the `DocumentsBatchIndex` datastructure is stored in a documents batch.
///
/// All eight bytes are `0xFF`, which is exactly the big-endian encoding of `u64::MAX`, so
/// under byte-wise key ordering it is the absolute last key of the list.
const DOCUMENTS_BATCH_INDEX_KEY: [u8; 8] = [u8::MAX; 8];
|
|
|
|
|
2022-06-15 00:12:15 +08:00
|
|
|
/// Helper function to convert an obkv reader into a JSON object.
|
2024-08-30 01:20:10 +08:00
|
|
|
pub fn obkv_to_object(obkv: &KvReader<FieldId>, index: &DocumentsBatchIndex) -> Result<Object> {
|
2022-06-15 00:12:15 +08:00
|
|
|
obkv.iter()
|
|
|
|
.map(|(field_id, value)| {
|
2022-10-25 03:34:13 +08:00
|
|
|
let field_name = index
|
|
|
|
.name(field_id)
|
|
|
|
.ok_or(FieldIdMapMissingEntry::FieldId { field_id, process: "obkv_to_object" })?;
|
2022-06-15 00:12:15 +08:00
|
|
|
let value = serde_json::from_slice(value).map_err(InternalError::SerdeJson)?;
|
|
|
|
Ok((field_name.to_string(), value))
|
|
|
|
})
|
|
|
|
.collect()
|
|
|
|
}
|
|
|
|
|
2021-08-31 17:44:15 +08:00
|
|
|
/// A bidirectional map that links field ids to their name in a document batch.
|
2022-06-14 22:03:48 +08:00
|
|
|
#[derive(Default, Clone, Debug, Serialize, Deserialize)]
|
2021-10-21 17:05:16 +08:00
|
|
|
pub struct DocumentsBatchIndex(pub BiHashMap<FieldId, String>);
|
|
|
|
|
|
|
|
impl DocumentsBatchIndex {
|
|
|
|
/// Insert the field in the map, or return it's field id if it doesn't already exists.
|
2021-10-25 23:38:32 +08:00
|
|
|
pub fn insert(&mut self, field: &str) -> FieldId {
|
2021-10-21 17:05:16 +08:00
|
|
|
match self.0.get_by_right(field) {
|
|
|
|
Some(field_id) => *field_id,
|
|
|
|
None => {
|
|
|
|
let field_id = self.0.len() as FieldId;
|
|
|
|
self.0.insert(field_id, field.to_string());
|
|
|
|
field_id
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn is_empty(&self) -> bool {
|
|
|
|
self.0.is_empty()
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn len(&self) -> usize {
|
|
|
|
self.0.len()
|
|
|
|
}
|
|
|
|
|
2024-07-09 23:25:39 +08:00
|
|
|
pub fn iter(&self) -> bimap::hash::Iter<'_, FieldId, String> {
|
2021-10-21 17:05:16 +08:00
|
|
|
self.0.iter()
|
|
|
|
}
|
|
|
|
|
2022-06-14 22:03:48 +08:00
|
|
|
pub fn name(&self, id: FieldId) -> Option<&str> {
|
|
|
|
self.0.get_by_left(&id).map(AsRef::as_ref)
|
2021-10-21 17:05:16 +08:00
|
|
|
}
|
2022-03-24 00:28:41 +08:00
|
|
|
|
2022-06-15 00:12:15 +08:00
|
|
|
pub fn id(&self, name: &str) -> Option<FieldId> {
|
|
|
|
self.0.get_by_right(name).cloned()
|
|
|
|
}
|
|
|
|
|
2024-08-30 01:20:10 +08:00
|
|
|
pub fn recreate_json(&self, document: &obkv::KvReaderU16) -> Result<Object> {
|
2022-06-15 00:12:15 +08:00
|
|
|
let mut map = Object::new();
|
2022-03-24 00:28:41 +08:00
|
|
|
|
|
|
|
for (k, v) in document.iter() {
|
|
|
|
// TODO: TAMO: update the error type
|
|
|
|
let key =
|
|
|
|
self.0.get_by_left(&k).ok_or(crate::error::InternalError::DatabaseClosing)?.clone();
|
|
|
|
let value = serde_json::from_slice::<serde_json::Value>(v)
|
|
|
|
.map_err(crate::error::InternalError::SerdeJson)?;
|
|
|
|
map.insert(key, value);
|
|
|
|
}
|
|
|
|
|
|
|
|
Ok(map)
|
|
|
|
}
|
2021-10-21 17:05:16 +08:00
|
|
|
}
|
2021-08-31 17:44:15 +08:00
|
|
|
|
2023-11-13 20:37:58 +08:00
|
|
|
impl FieldIdMapper for DocumentsBatchIndex {
|
2023-11-09 21:19:16 +08:00
|
|
|
fn id(&self, name: &str) -> Option<FieldId> {
|
|
|
|
self.id(name)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2023-03-09 18:23:57 +08:00
|
|
|
#[derive(Debug, thiserror::Error)]
|
2021-08-31 17:44:15 +08:00
|
|
|
pub enum Error {
|
2023-03-09 18:23:57 +08:00
|
|
|
#[error("Error parsing number {value:?} at line {line}: {error}")]
|
2021-10-25 23:38:32 +08:00
|
|
|
ParseFloat { error: std::num::ParseFloatError, line: usize, value: String },
|
2023-03-09 18:23:57 +08:00
|
|
|
#[error("Error parsing boolean {value:?} at line {line}: {error}")]
|
2023-03-09 18:12:49 +08:00
|
|
|
ParseBool { error: std::str::ParseBoolError, line: usize, value: String },
|
2023-03-09 18:23:57 +08:00
|
|
|
#[error("Invalid document addition format, missing the documents batch index.")]
|
2021-08-31 17:44:15 +08:00
|
|
|
InvalidDocumentFormat,
|
2023-03-09 18:23:57 +08:00
|
|
|
#[error("Invalid enriched data.")]
|
2022-06-20 19:48:02 +08:00
|
|
|
InvalidEnrichedData,
|
2023-03-09 18:23:57 +08:00
|
|
|
#[error(transparent)]
|
|
|
|
InvalidUtf8(#[from] Utf8Error),
|
|
|
|
#[error(transparent)]
|
|
|
|
Csv(#[from] csv::Error),
|
|
|
|
#[error(transparent)]
|
|
|
|
Json(#[from] serde_json::Error),
|
|
|
|
#[error(transparent)]
|
2022-06-14 22:03:48 +08:00
|
|
|
Serialize(serde_json::Error),
|
2023-03-09 18:23:57 +08:00
|
|
|
#[error(transparent)]
|
|
|
|
Grenad(#[from] grenad::Error),
|
|
|
|
#[error(transparent)]
|
|
|
|
Io(#[from] io::Error),
|
2021-08-31 17:44:15 +08:00
|
|
|
}
|
|
|
|
|
2022-08-10 15:32:03 +08:00
|
|
|
/// Splits a JSON value into a list of JSON objects.
///
/// A single object yields a one-element list, and an array yields its elements in order.
///
/// # Panics
///
/// Panics if `json` is neither an object nor an array. Also panics if an array element is
/// not an object. The function is used by the test-only `documents!` macro and is not meant
/// for untrusted input.
pub fn objects_from_json_value(json: serde_json::Value) -> Vec<crate::Object> {
    let values = match json {
        serde_json::Value::Array(items) => items,
        single @ serde_json::Value::Object(_) => vec![single],
        other => panic!("an array of objects must be specified, {:#?} is not an array", other),
    };

    values
        .into_iter()
        .map(|value| match value {
            serde_json::Value::Object(object) => object,
            other => panic!("an object must be specified, {:#?} is not an object", other),
        })
        .collect()
}
|
|
|
|
|
2021-08-31 17:44:15 +08:00
|
|
|
/// Builds an in-memory `DocumentsBatchReader` from JSON written with the same syntax as
/// `serde_json::json!`.
///
/// The JSON must be an object or an array of objects; `objects_from_json_value` panics otherwise.
#[cfg(test)]
macro_rules! documents {
    ($data:tt) => {{
        let objects = $crate::documents::objects_from_json_value(serde_json::json!($data));
        $crate::documents::documents_batch_reader_from_objects(objects)
    }};
}
|
|
|
|
|
2022-08-02 21:13:06 +08:00
|
|
|
pub fn documents_batch_reader_from_objects(
|
|
|
|
objects: impl IntoIterator<Item = Object>,
|
|
|
|
) -> DocumentsBatchReader<std::io::Cursor<Vec<u8>>> {
|
|
|
|
let mut builder = DocumentsBatchBuilder::new(Vec::new());
|
|
|
|
for object in objects {
|
|
|
|
builder.append_json_object(&object).unwrap();
|
|
|
|
}
|
2022-08-10 15:32:03 +08:00
|
|
|
let vector = builder.into_inner().unwrap();
|
|
|
|
DocumentsBatchReader::from_reader(std::io::Cursor::new(vector)).unwrap()
|
2022-08-02 21:13:06 +08:00
|
|
|
}
|
|
|
|
|
2021-08-31 17:44:15 +08:00
|
|
|
#[cfg(test)]
mod test {
    use std::io::Cursor;

    use serde_json::{json, Value};

    use super::*;

    /// One object holding every kind of JSON value round-trips through the builder and the
    /// reader. The result has five fields in the index and five in the single stored document.
    #[test]
    fn create_documents_no_errors() {
        let value = json!({
            "number": 1,
            "string": "this is a field",
            "array": ["an", "array"],
            "object": {
                "key": "value",
            },
            "bool": true
        });

        let mut builder = DocumentsBatchBuilder::new(Vec::new());
        builder.append_json_object(value.as_object().unwrap()).unwrap();
        let vector = builder.into_inner().unwrap();

        let (mut documents, index) = DocumentsBatchReader::from_reader(Cursor::new(vector))
            .unwrap()
            .into_cursor_and_fields_index();

        assert_eq!(index.iter().count(), 5);
        let reader = documents.next_document().unwrap().unwrap();
        assert_eq!(reader.iter().count(), 5);
        assert!(documents.next_document().unwrap().is_none());
    }

    /// Two documents with disjoint fields share one fields index that holds both names, while
    /// each stored document only contains its own single field.
    #[test]
    fn test_add_multiple_documents() {
        let doc1 = json!({
            "bool": true,
        });
        let doc2 = json!({
            "toto": false,
        });

        let mut builder = DocumentsBatchBuilder::new(Vec::new());
        builder.append_json_object(doc1.as_object().unwrap()).unwrap();
        builder.append_json_object(doc2.as_object().unwrap()).unwrap();
        let vector = builder.into_inner().unwrap();

        let (mut documents, index) = DocumentsBatchReader::from_reader(Cursor::new(vector))
            .unwrap()
            .into_cursor_and_fields_index();
        assert_eq!(index.iter().count(), 2);

        let first = documents.next_document().unwrap().unwrap();
        assert_eq!(first.iter().count(), 1);
        let second = documents.next_document().unwrap().unwrap();
        assert_eq!(second.iter().count(), 1);
        assert!(documents.next_document().unwrap().is_none());
    }

    /// A nested object is stored as one JSON value under its top-level field, which is the
    /// first field and therefore has id 0.
    #[test]
    fn test_nested() {
        let docs_reader = documents!([{
            "hello": {
                "toto": ["hello"]
            }
        }]);

        let (mut cursor, _) = docs_reader.into_cursor_and_fields_index();
        let doc = cursor.next_document().unwrap().unwrap();
        let nested: Value = serde_json::from_slice(doc.get(0).unwrap()).unwrap();
        assert_eq!(nested, json!({ "toto": ["hello"] }));
    }

    /// Objects list their fields in different orders, and the second one introduces a new
    /// field. The `documents!` macro must still accept them.
    #[test]
    fn out_of_order_json_fields() {
        let _documents = documents!([
            {"id": 1,"b": 0},
            {"id": 2,"a": 0,"b": 0},
        ]);
    }

    /// CSV columns typed with `:number` / `:boolean` must not make the builder or reader panic.
    /// The input covers empty cells, negative and fractional numbers, and a quoted value
    /// containing a comma.
    #[test]
    fn csv_types_dont_panic() {
        let csv1_content =
            "id:number,b:boolean,c,d:number\n1,,,\n2,true,doggo,2\n3,false,the best doggo,-2\n4,,\"Hello, World!\",2.5";
        let csv1 = csv::Reader::from_reader(Cursor::new(csv1_content));

        let mut builder = DocumentsBatchBuilder::new(Vec::new());
        builder.append_csv(csv1).unwrap();
        let vector = builder.into_inner().unwrap();

        DocumentsBatchReader::from_reader(Cursor::new(vector)).unwrap();
    }

    /// Two CSVs list the shared fields in different orders, and the second adds a new field.
    /// Appending both must still produce a readable batch.
    #[test]
    fn out_of_order_csv_fields() {
        let csv1_content = "id:number,b\n1,0";
        let csv1 = csv::Reader::from_reader(Cursor::new(csv1_content));

        let csv2_content = "id:number,a,b\n2,0,0";
        let csv2 = csv::Reader::from_reader(Cursor::new(csv2_content));

        let mut builder = DocumentsBatchBuilder::new(Vec::new());
        builder.append_csv(csv1).unwrap();
        builder.append_csv(csv2).unwrap();
        let vector = builder.into_inner().unwrap();

        DocumentsBatchReader::from_reader(Cursor::new(vector)).unwrap();
    }
}
|