diff --git a/milli/src/documents/builder.rs b/milli/src/documents/builder.rs
index 391175f31..159afb8d9 100644
--- a/milli/src/documents/builder.rs
+++ b/milli/src/documents/builder.rs
@@ -1,157 +1,159 @@
-use std::collections::BTreeMap;
-use std::io;
-use std::io::{Cursor, Write};
+use std::io::{self, Write};
 
-use byteorder::{BigEndian, WriteBytesExt};
-use serde::Deserializer;
-use serde_json::Value;
+use grenad::{CompressionType, WriterBuilder};
+use serde_json::{to_writer, Map, Value};
 
-use super::serde_impl::DocumentVisitor;
-use super::{ByteCounter, DocumentsBatchIndex, DocumentsMetadata, Error};
-use crate::FieldId;
+use super::{DocumentsBatchIndex, Error, DOCUMENTS_BATCH_INDEX_KEY};
 
 /// The `DocumentsBatchBuilder` provides a way to build a documents batch in the intermediary
 /// format used by milli.
 ///
-/// The writer used by the DocumentBatchBuilder can be read using a `DocumentBatchReader` to
-/// iterate over the documents.
+/// The writer used by the `DocumentsBatchBuilder` can be read using a `DocumentsBatchReader`
+/// to iterate over the documents.
 ///
 /// ## example:
 /// ```
-/// use milli::documents::DocumentBatchBuilder;
 /// use serde_json::json;
-/// use std::io::Cursor;
+/// use milli::documents::DocumentsBatchBuilder;
 ///
-/// let json = r##"{"id": 1, "name": "foo"}"##;
-/// let mut writer = Cursor::new(Vec::new());
-/// let mut builder = DocumentBatchBuilder::new(&mut writer).unwrap();
-/// builder.extend_from_json(&mut json.as_bytes()).unwrap();
-/// builder.finish().unwrap();
+/// let json = json!({ "id": 1, "name": "foo" });
+///
+/// let mut builder = DocumentsBatchBuilder::new(Vec::new());
+/// builder.append_json_object(json.as_object().unwrap()).unwrap();
+/// let _vector = builder.into_inner().unwrap();
 /// ```
-pub struct DocumentBatchBuilder<W> {
-    inner: ByteCounter<W>,
-    index: DocumentsBatchIndex,
+pub struct DocumentsBatchBuilder<W> {
+    /// The inner grenad writer; the last value must always be the `DocumentsBatchIndex`.
+    writer: grenad::Writer<W>,
+    /// A map that creates the relation between field ids and field names.
+    fields_index: DocumentsBatchIndex,
+    /// The number of documents that were added to this builder;
+    /// it doesn't take the primary key of the documents into account at this point.
+    documents_count: u32,
+
+    /// A buffer to store a temporary obkv document and avoid reallocating.
    obkv_buffer: Vec<u8>,
+    /// A buffer used to serialize the values and avoid reallocating;
+    /// serialized values are stored in an obkv.
    value_buffer: Vec<u8>,
-    values: BTreeMap<FieldId, Value>,
-    count: usize,
 }
 
-impl<W: io::Write + io::Seek> DocumentBatchBuilder<W> {
-    pub fn new(writer: W) -> Result<Self, Error> {
-        let index = DocumentsBatchIndex::default();
-        let mut writer = ByteCounter::new(writer);
-        // add space to write the offset of the metadata at the end of the writer
-        writer.write_u64::<BigEndian>(0)?;
-
-        Ok(Self {
-            inner: writer,
-            index,
+impl<W: Write> DocumentsBatchBuilder<W> {
+    pub fn new(writer: W) -> DocumentsBatchBuilder<W> {
+        DocumentsBatchBuilder {
+            writer: WriterBuilder::new().compression_type(CompressionType::None).build(writer),
+            fields_index: DocumentsBatchIndex::default(),
+            documents_count: 0,
             obkv_buffer: Vec::new(),
             value_buffer: Vec::new(),
-            values: BTreeMap::new(),
-            count: 0,
-        })
+        }
     }
 
-    /// Returns the number of documents that have been written to the builder.
-    pub fn len(&self) -> usize {
-        self.count
+    /// Returns the number of documents inserted into this builder.
+    pub fn documents_count(&self) -> u32 {
+        self.documents_count
     }
 
-    /// This method must be called after the document addition is terminated. It will put the
-    /// metadata at the end of the file, and write the metadata offset at the beginning on the
-    /// file.
-    pub fn finish(self) -> Result<usize, Error> {
-        let Self { inner: ByteCounter { mut writer, count: offset }, index, count, .. } = self;
+    /// Appends a new JSON object into the batch and updates the `DocumentsBatchIndex` accordingly.
+    pub fn append_json_object(&mut self, object: &Map<String, Value>) -> io::Result<()> {
+        // Make sure that we insert the field ids in order, as the obkv writer has this requirement.
+        let mut fields_ids: Vec<_> = object.keys().map(|k| self.fields_index.insert(&k)).collect();
+        fields_ids.sort_unstable();
 
-        let meta = DocumentsMetadata { count, index };
+        self.obkv_buffer.clear();
+        let mut writer = obkv::KvWriter::new(&mut self.obkv_buffer);
+        for field_id in fields_ids {
+            let key = self.fields_index.name(field_id).unwrap();
+            self.value_buffer.clear();
+            to_writer(&mut self.value_buffer, &object[key])?;
+            writer.insert(field_id, &self.value_buffer)?;
+        }
 
-        bincode::serialize_into(&mut writer, &meta)?;
+        let internal_id = self.documents_count.to_be_bytes();
+        let document_bytes = writer.into_inner()?;
+        self.writer.insert(internal_id, &document_bytes)?;
+        self.documents_count += 1;
 
-        writer.seek(io::SeekFrom::Start(0))?;
-        writer.write_u64::<BigEndian>(offset as u64)?;
-
-        writer.flush()?;
-
-        Ok(count)
+        Ok(())
     }
 
-    /// Extends the builder with json documents from a reader.
-    pub fn extend_from_json<R: io::Read>(&mut self, reader: R) -> Result<(), Error> {
-        let mut de = serde_json::Deserializer::from_reader(reader);
-
-        let mut visitor = DocumentVisitor {
-            inner: &mut self.inner,
-            index: &mut self.index,
-            obkv_buffer: &mut self.obkv_buffer,
-            value_buffer: &mut self.value_buffer,
-            values: &mut self.values,
-            count: &mut self.count,
-        };
-
-        de.deserialize_any(&mut visitor).map_err(Error::JsonError)?
-    }
-
-    /// Creates a builder from a reader of CSV documents.
-    ///
-    /// Since all fields in a csv documents are guaranteed to be ordered, we are able to perform
-    /// optimisations, and extending from another CSV is not allowed.
-    pub fn from_csv<R: io::Read>(reader: R, writer: W) -> Result<Self, Error> {
-        let mut this = Self::new(writer)?;
-        // Ensure that this is the first and only addition made with this builder
-        debug_assert!(this.index.is_empty());
-
-        let mut records = csv::Reader::from_reader(reader);
-
-        let headers = records
+    /// Appends a new CSV file into the batch and updates the `DocumentsBatchIndex` accordingly.
+    pub fn append_csv<R: io::Read>(&mut self, mut reader: csv::Reader<R>) -> Result<(), Error> {
+        // Make sure that we insert the field ids in order, as the obkv writer has this requirement.
+        let mut typed_fields_ids: Vec<_> = reader
             .headers()?
             .into_iter()
             .map(parse_csv_header)
-            .map(|(k, t)| (this.index.insert(k), t))
-            .collect::<Vec<_>>();
+            .map(|(k, t)| (self.fields_index.insert(k), t))
+            .enumerate()
+            .collect();
+        typed_fields_ids.sort_unstable_by_key(|(_, (fid, _))| *fid);
 
-        for (i, record) in records.into_records().enumerate() {
-            let record = record?;
-            this.obkv_buffer.clear();
-            let mut writer = obkv::KvWriter::new(&mut this.obkv_buffer);
-            for (value, (fid, ty)) in record.into_iter().zip(headers.iter()) {
-                let value = match ty {
+        let mut record = csv::StringRecord::new();
+        let mut line = 0;
+        while reader.read_record(&mut record)? {
+            // We increment here and not at the end of the while loop to take
+            // the header offset into account.
+            line += 1;
+
+            self.obkv_buffer.clear();
+            let mut writer = obkv::KvWriter::new(&mut self.obkv_buffer);
+
+            for (i, (field_id, type_)) in typed_fields_ids.iter() {
+                self.value_buffer.clear();
+
+                let value = &record[*i];
+                match type_ {
                     AllowedType::Number => {
                         if value.trim().is_empty() {
-                            Value::Null
+                            to_writer(&mut self.value_buffer, &Value::Null)?;
                         } else {
-                            value.trim().parse::<f64>().map(Value::from).map_err(|error| {
-                                Error::ParseFloat {
-                                    error,
-                                    // +1 for the header offset.
-                                    line: i + 1,
-                                    value: value.to_string(),
+                            match value.trim().parse::<f64>() {
+                                Ok(float) => {
+                                    to_writer(&mut self.value_buffer, &float)?;
                                 }
-                            })?
+                                Err(error) => {
+                                    return Err(Error::ParseFloat {
+                                        error,
+                                        line,
+                                        value: value.to_string(),
+                                    });
+                                }
+                            }
                         }
                     }
                     AllowedType::String => {
                         if value.is_empty() {
-                            Value::Null
+                            to_writer(&mut self.value_buffer, &Value::Null)?;
                         } else {
-                            Value::String(value.to_string())
+                            to_writer(&mut self.value_buffer, value)?;
                         }
                     }
-                };
+                }
 
-                this.value_buffer.clear();
-                serde_json::to_writer(Cursor::new(&mut this.value_buffer), &value)?;
-                writer.insert(*fid, &this.value_buffer)?;
+                // We insert into the obkv writer the value buffer that has been filled just above.
+                writer.insert(*field_id, &self.value_buffer)?;
             }
 
-            this.inner.write_u32::<BigEndian>(this.obkv_buffer.len() as u32)?;
-            this.inner.write_all(&this.obkv_buffer)?;
-
-            this.count += 1;
+            let internal_id = self.documents_count.to_be_bytes();
+            let document_bytes = writer.into_inner()?;
+            self.writer.insert(internal_id, &document_bytes)?;
+            self.documents_count += 1;
         }
 
-        Ok(this)
+        Ok(())
+    }
+
+    /// Flushes the content on disk and stores the final version of the `DocumentsBatchIndex`.
+    pub fn into_inner(mut self) -> io::Result<W> {
+        let DocumentsBatchBuilder { mut writer, fields_index, .. } = self;
+
+        // We serialize and insert the `DocumentsBatchIndex` as the last key of the grenad writer.
+        self.value_buffer.clear();
+        to_writer(&mut self.value_buffer, &fields_index)?;
+        writer.insert(DOCUMENTS_BATCH_INDEX_KEY, &self.value_buffer)?;
+
+        writer.into_inner()
     }
 }
diff --git a/milli/src/documents/mod.rs b/milli/src/documents/mod.rs
index 09f15901d..bd0afc6e4 100644
--- a/milli/src/documents/mod.rs
+++ b/milli/src/documents/mod.rs
@@ -1,24 +1,22 @@
 mod builder;
-/// The documents module defines an intermediary document format that milli uses for indexation, and
-/// provides an API to easily build and read such documents.
-///
-/// The `DocumentBatchBuilder` interface allows to write batches of documents to a writer, that can
-/// later be read by milli using the `DocumentBatchReader` interface.
 mod reader;
-mod serde_impl;
 
 use std::fmt::{self, Debug};
 use std::io;
 
 use bimap::BiHashMap;
-pub use builder::DocumentBatchBuilder;
-pub use reader::DocumentBatchReader;
+pub use builder::DocumentsBatchBuilder;
+pub use reader::{DocumentsBatchCursor, DocumentsBatchReader};
 use serde::{Deserialize, Serialize};
 
 use crate::FieldId;
 
+/// The key that is used to store the `DocumentsBatchIndex` data structure;
+/// it is the absolute last key of the list.
+const DOCUMENTS_BATCH_INDEX_KEY: [u8; 8] = u64::MAX.to_be_bytes();
+
 /// A bidirectional map that links field ids to their name in a document batch.
-#[derive(Default, Debug, Serialize, Deserialize)]
+#[derive(Default, Clone, Debug, Serialize, Deserialize)]
 pub struct DocumentsBatchIndex(pub BiHashMap<FieldId, String>);
 
 impl DocumentsBatchIndex {
@@ -46,8 +44,8 @@ impl DocumentsBatchIndex {
         self.0.iter()
     }
 
-    pub fn name(&self, id: FieldId) -> Option<&String> {
-        self.0.get_by_left(&id)
+    pub fn name(&self, id: FieldId) -> Option<&str> {
+        self.0.get_by_left(&id).map(AsRef::as_ref)
     }
 
     pub fn recreate_json(
@@ -69,50 +67,20 @@ impl DocumentsBatchIndex {
     }
 }
 
-#[derive(Debug, Serialize, Deserialize)]
-struct DocumentsMetadata {
-    count: usize,
-    index: DocumentsBatchIndex,
-}
-
-pub struct ByteCounter<W> {
-    count: usize,
-    writer: W,
-}
-
-impl<W> ByteCounter<W> {
-    fn new(writer: W) -> Self {
-        Self { count: 0, writer }
-    }
-}
-
-impl<W: io::Write> io::Write for ByteCounter<W> {
-    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
-        let count = self.writer.write(buf)?;
-        self.count += count;
-        Ok(count)
-    }
-
-    fn flush(&mut self) -> io::Result<()> {
-        self.writer.flush()
-    }
-}
-
 #[derive(Debug)]
 pub enum Error {
     ParseFloat { error: std::num::ParseFloatError, line: usize, value: String },
     InvalidDocumentFormat,
-    Custom(String),
-    JsonError(serde_json::Error),
-    CsvError(csv::Error),
-    Serialize(bincode::Error),
+    Csv(csv::Error),
+    Json(serde_json::Error),
+    Serialize(serde_json::Error),
+    Grenad(grenad::Error),
     Io(io::Error),
-    DocumentTooLarge,
 }
 
 impl From<csv::Error> for Error {
     fn from(e: csv::Error) -> Self {
-        Self::CsvError(e)
+        Self::Csv(e)
     }
 }
 
@@ -122,15 +90,15 @@ impl From<io::Error> for Error {
     }
 }
 
-impl From<bincode::Error> for Error {
-    fn from(other: bincode::Error) -> Self {
-        Self::Serialize(other)
+impl From<serde_json::Error> for Error {
+    fn from(other: serde_json::Error) -> Self {
+        Self::Json(other)
     }
 }
 
-impl From<serde_json::Error> for Error {
-    fn from(other: serde_json::Error) -> Self {
-        Self::JsonError(other)
+impl From<grenad::Error> for Error {
+    fn from(other: grenad::Error) -> Self {
+        Self::Grenad(other)
     }
 }
 
@@ -140,13 +108,14 @@ impl fmt::Display for Error {
         match self {
             Error::ParseFloat { error, line, value } => {
                 write!(f, "Error parsing number {:?} at line {}: {}", value, line, error)
             }
-            Error::Custom(s) => write!(f, "Unexpected serialization error: {}", s),
-            Error::InvalidDocumentFormat => f.write_str("Invalid document addition format."),
-            Error::JsonError(err) => write!(f, "Couldn't serialize document value: {}", err),
+            Error::InvalidDocumentFormat => {
+                f.write_str("Invalid document addition format, missing the documents batch index.")
+            }
             Error::Io(e) => write!(f, "{}", e),
-            Error::DocumentTooLarge => f.write_str("Provided document is too large (>2Gib)"),
             Error::Serialize(e) => write!(f, "{}", e),
-            Error::CsvError(e) => write!(f, "{}", e),
+            Error::Grenad(e) => write!(f, "{}", e),
+            Error::Csv(e) => write!(f, "{}", e),
+            Error::Json(e) => write!(f, "{}", e),
         }
     }
 }
 
@@ -158,15 +127,25 @@ impl std::error::Error for Error {}
 macro_rules! documents {
     ($data:tt) => {{
         let documents = serde_json::json!($data);
-        let mut writer = std::io::Cursor::new(Vec::new());
-        let mut builder = crate::documents::DocumentBatchBuilder::new(&mut writer).unwrap();
-        let documents = serde_json::to_vec(&documents).unwrap();
-        builder.extend_from_json(std::io::Cursor::new(documents)).unwrap();
-        builder.finish().unwrap();
+        let documents = match documents {
+            object @ serde_json::Value::Object(_) => vec![object],
+            serde_json::Value::Array(objects) => objects,
+            invalid => {
+                panic!("an array of objects must be specified, {:#?} is not an array", invalid)
+            }
+        };
 
-        writer.set_position(0);
+        let mut builder = crate::documents::DocumentsBatchBuilder::new(Vec::new());
+        for document in documents {
+            let object = match document {
+                serde_json::Value::Object(object) => object,
+                invalid => panic!("an object must be specified, {:#?} is not an object", invalid),
+            };
+            builder.append_json_object(&object).unwrap();
+        }
 
-        crate::documents::DocumentBatchReader::from_reader(writer).unwrap()
+        let vector = builder.into_inner().unwrap();
+        crate::documents::DocumentsBatchReader::from_reader(std::io::Cursor::new(vector)).unwrap()
     }};
 }
diff --git a/milli/src/documents/reader.rs b/milli/src/documents/reader.rs
index 14d7c8ceb..3dff999f5 100644
--- a/milli/src/documents/reader.rs
+++ b/milli/src/documents/reader.rs
@@ -1,11 +1,9 @@
+use std::convert::TryInto;
 use std::io;
-use std::io::{BufReader, Read};
-use std::mem::size_of;
 
-use byteorder::{BigEndian, ReadBytesExt};
 use obkv::KvReader;
 
-use super::{DocumentsBatchIndex, DocumentsMetadata, Error};
+use super::{DocumentsBatchIndex, Error, DOCUMENTS_BATCH_INDEX_KEY};
 use crate::FieldId;
 
 /// The `DocumentsBatchReader` provides a way to iterate over documents that have been created with
@@ -13,63 +11,80 @@ use crate::FieldId;
 ///
 /// The documents are returned in the form of `obkv::Reader` where each field is identified with a
 /// `FieldId`. The mapping between the field ids and the field names is done thanks to the index.
-pub struct DocumentBatchReader<R> {
-    reader: BufReader<R>,
-    metadata: DocumentsMetadata,
-    buffer: Vec<u8>,
-    seen_documents: usize,
+pub struct DocumentsBatchReader<R> {
+    cursor: grenad::ReaderCursor<R>,
+    fields_index: DocumentsBatchIndex,
 }
 
-impl<R: io::Read + io::Seek> DocumentBatchReader<R> {
+impl<R: io::Read + io::Seek> DocumentsBatchReader<R> {
     /// Construct a `DocumentsReader` from a reader.
     ///
-    /// It first retrieves the index, then moves to the first document. Subsequent calls to
-    /// `next_document` advance the document reader until all the documents have been read.
-    pub fn from_reader(mut reader: R) -> Result<Self, Error> {
-        let mut buffer = Vec::new();
+    /// It first retrieves the index, then moves to the first document. Use the `into_cursor`
+    /// method to iterate over the documents, from the first to the last.
+    pub fn from_reader(reader: R) -> Result<Self, Error> {
+        let reader = grenad::Reader::new(reader)?;
+        let mut cursor = reader.into_cursor()?;
 
-        let meta_offset = reader.read_u64::<BigEndian>()?;
-        reader.seek(io::SeekFrom::Start(meta_offset))?;
-        reader.read_to_end(&mut buffer)?;
-        let metadata: DocumentsMetadata = bincode::deserialize(&buffer)?;
+        let fields_index = match cursor.move_on_key_equal_to(DOCUMENTS_BATCH_INDEX_KEY)? {
+            Some((_, value)) => serde_json::from_slice(value).map_err(Error::Serialize)?,
+            None => return Err(Error::InvalidDocumentFormat),
+        };
 
-        reader.seek(io::SeekFrom::Start(size_of::<u64>() as u64))?;
-        buffer.clear();
-
-        let reader = BufReader::new(reader);
-
-        Ok(Self { reader, metadata, buffer, seen_documents: 0 })
+        Ok(DocumentsBatchReader { cursor, fields_index })
     }
 
-    /// Returns the next document in the reader, and wraps it in an `obkv::KvReader`, along with a
-    /// reference to the addition index.
-    pub fn next_document_with_index<'a>(
-        &'a mut self,
-    ) -> io::Result<Option<(&'a DocumentsBatchIndex, KvReader<'a>)>> {
-        if self.seen_documents < self.metadata.count {
-            let doc_len = self.reader.read_u32::<BigEndian>()?;
-            self.buffer.resize(doc_len as usize, 0);
-            self.reader.read_exact(&mut self.buffer)?;
-            self.seen_documents += 1;
-
-            let reader = KvReader::new(&self.buffer);
-            Ok(Some((&self.metadata.index, reader)))
-        } else {
-            Ok(None)
-        }
-    }
-
-    /// Return the fields index for the documents batch.
-    pub fn index(&self) -> &DocumentsBatchIndex {
-        &self.metadata.index
-    }
-
-    /// Returns the number of documents in the reader.
-    pub fn len(&self) -> usize {
-        self.metadata.count
+    pub fn documents_count(&self) -> u32 {
+        self.cursor.len().saturating_sub(1).try_into().expect("Invalid number of documents")
     }
 
     pub fn is_empty(&self) -> bool {
-        self.len() == 0
+        self.cursor.len().saturating_sub(1) == 0
+    }
+
+    pub fn documents_batch_index(&self) -> &DocumentsBatchIndex {
+        &self.fields_index
+    }
+
+    /// This method returns a forward cursor over the documents.
+    pub fn into_cursor(self) -> DocumentsBatchCursor<R> {
+        let DocumentsBatchReader { cursor, fields_index } = self;
+        let mut cursor = DocumentsBatchCursor { cursor, fields_index };
+        cursor.reset();
+        cursor
+    }
+}
+
+/// A forward cursor over the documents in a `DocumentsBatchReader`.
+pub struct DocumentsBatchCursor<R> {
+    cursor: grenad::ReaderCursor<R>,
+    fields_index: DocumentsBatchIndex,
+}
+
+impl<R> DocumentsBatchCursor<R> {
+    pub fn into_reader(self) -> DocumentsBatchReader<R> {
+        let DocumentsBatchCursor { cursor, fields_index, .. } = self;
+        DocumentsBatchReader { cursor, fields_index }
+    }
+
+    pub fn documents_batch_index(&self) -> &DocumentsBatchIndex {
+        &self.fields_index
+    }
+
+    /// Resets the cursor to be able to read from the start again.
+    pub fn reset(&mut self) {
+        self.cursor.reset();
+    }
+}
+
+impl<R: io::Read + io::Seek> DocumentsBatchCursor<R> {
+    /// Returns the next document, starting from the first one. Subsequent calls to
+    /// `next_document` advance the document reader until all the documents have been read.
+    pub fn next_document(&mut self) -> Result<Option<KvReader<&[u8]>>, grenad::Error> {
+        match self.cursor.move_on_next()? {
+            Some((key, value)) if key != DOCUMENTS_BATCH_INDEX_KEY => {
+                Ok(Some(KvReader::new(value)))
+            }
+            _otherwise => Ok(None),
+        }
     }
 }
diff --git a/milli/src/documents/serde_impl.rs b/milli/src/documents/serde_impl.rs
deleted file mode 100644
index d57bf1ffb..000000000
--- a/milli/src/documents/serde_impl.rs
+++ /dev/null
@@ -1,134 +0,0 @@
-use std::collections::BTreeMap;
-use std::fmt;
-use std::io::{Cursor, Write};
-
-use byteorder::WriteBytesExt;
-use serde::de::{DeserializeSeed, MapAccess, SeqAccess, Visitor};
-use serde::Deserialize;
-use serde_json::Value;
-
-use super::{ByteCounter, DocumentsBatchIndex, Error};
-use crate::FieldId;
-
-macro_rules! tri {
-    ($e:expr) => {
-        match $e {
-            Ok(r) => r,
-            Err(e) => return Ok(Err(e.into())),
-        }
-    };
-}
-
-struct FieldIdResolver<'a>(&'a mut DocumentsBatchIndex);
-
-impl<'a, 'de> DeserializeSeed<'de> for FieldIdResolver<'a> {
-    type Value = FieldId;
-
-    fn deserialize<D>(self, deserializer: D) -> Result<Self::Value, D::Error>
-    where
-        D: serde::Deserializer<'de>,
-    {
-        deserializer.deserialize_str(self)
-    }
-}
-
-impl<'a, 'de> Visitor<'de> for FieldIdResolver<'a> {
-    type Value = FieldId;
-
-    fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
-    where
-        E: serde::de::Error,
-    {
-        Ok(self.0.insert(v))
-    }
-
-    fn expecting(&self, f: &mut fmt::Formatter) -> fmt::Result {
-        write!(f, "a string")
-    }
-}
-
-struct ValueDeserializer;
-
-impl<'de> DeserializeSeed<'de> for ValueDeserializer {
-    type Value = serde_json::Value;
-
-    fn deserialize<D>(self, deserializer: D) -> Result<Self::Value, D::Error>
-    where
-        D: serde::Deserializer<'de>,
-    {
-        serde_json::Value::deserialize(deserializer)
-    }
-}
-
-pub struct DocumentVisitor<'a, W> {
-    pub inner: &'a mut ByteCounter<W>,
-    pub index: &'a mut DocumentsBatchIndex,
-    pub obkv_buffer: &'a mut Vec<u8>,
-    pub value_buffer: &'a mut Vec<u8>,
-    pub values: &'a mut BTreeMap<FieldId, Value>,
-    pub count: &'a mut usize,
-}
-
-impl<'a, 'de, W: Write> Visitor<'de> for &mut DocumentVisitor<'a, W> {
-    /// This Visitor value is nothing, since it write the value to a file.
-    type Value = Result<(), Error>;
-
-    fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
-    where
-        A: SeqAccess<'de>,
-    {
-        while let Some(v) = seq.next_element_seed(&mut *self)? {
-            tri!(v)
-        }
-
-        Ok(Ok(()))
-    }
-
-    fn visit_map<A>(self, mut map: A) -> Result<Self::Value, A::Error>
-    where
-        A: MapAccess<'de>,
-    {
-        while let Some((key, value)) =
-            map.next_entry_seed(FieldIdResolver(&mut *self.index), ValueDeserializer)?
-        {
-            self.values.insert(key, value);
-        }
-
-        self.obkv_buffer.clear();
-        let mut obkv = obkv::KvWriter::new(Cursor::new(&mut *self.obkv_buffer));
-        for (key, value) in self.values.iter() {
-            self.value_buffer.clear();
-            // This is guaranteed to work
-            tri!(serde_json::to_writer(Cursor::new(&mut *self.value_buffer), value));
-            tri!(obkv.insert(*key, &self.value_buffer));
-        }
-
-        let reader = tri!(obkv.into_inner()).into_inner();
-
-        tri!(self.inner.write_u32::<byteorder::BigEndian>(reader.len() as u32));
-        tri!(self.inner.write_all(reader));
-
-        *self.count += 1;
-        self.values.clear();
-
-        Ok(Ok(()))
-    }
-
-    fn expecting(&self, f: &mut fmt::Formatter) -> fmt::Result {
-        write!(f, "a documents, or a sequence of documents.")
-    }
-}
-
-impl<'a, 'de, W> DeserializeSeed<'de> for &mut DocumentVisitor<'a, W>
-where
-    W: Write,
-{
-    type Value = Result<(), Error>;
-
-    fn deserialize<D>(self, deserializer: D) -> Result<Self::Value, D::Error>
-    where
-        D: serde::Deserializer<'de>,
-    {
-        deserializer.deserialize_map(self)
-    }
-}
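
Below is a minimal sketch (not part of the patch) of how the two halves of the new API are meant to fit together: documents are appended to a `DocumentsBatchBuilder` as JSON objects, the batch is flushed into a `Vec<u8>` by `into_inner`, and a `DocumentsBatchCursor` then walks the documents while the cloned `DocumentsBatchIndex` maps field ids back to field names. Only the types and methods introduced above are used; the `main` wrapper, the sample documents, and the `Box<dyn Error>` plumbing are assumptions made for the illustration.

```rust
use std::io::Cursor;

use milli::documents::{DocumentsBatchBuilder, DocumentsBatchReader};
use serde_json::json;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Write side: append each document as a JSON object, then flush the batch.
    let mut builder = DocumentsBatchBuilder::new(Vec::new());
    for doc in [json!({ "id": 1, "name": "foo" }), json!({ "id": 2, "name": "bar" })] {
        builder.append_json_object(doc.as_object().unwrap())?;
    }
    // The grenad-encoded batch; the fields index is stored under the last key.
    let vector = builder.into_inner()?;

    // Read side: the reader extracts the `DocumentsBatchIndex` first,
    // then the cursor iterates over the documents in insertion order.
    let reader = DocumentsBatchReader::from_reader(Cursor::new(vector))?;
    let index = reader.documents_batch_index().clone();
    let mut cursor = reader.into_cursor();
    while let Some(document) = cursor.next_document()? {
        for (field_id, value) in document.iter() {
            let name = index.name(field_id).unwrap();
            let value: serde_json::Value = serde_json::from_slice(value)?;
            println!("{}: {}", name, value);
        }
    }

    Ok(())
}
```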