diff --git a/milli/src/documents/enriched.rs b/milli/src/documents/enriched.rs
new file mode 100644
index 000000000..8645e06c4
--- /dev/null
+++ b/milli/src/documents/enriched.rs
@@ -0,0 +1,103 @@
+use std::fs::File;
+use std::{io, str};
+
+use obkv::KvReader;
+
+use super::{
+    DocumentsBatchCursor, DocumentsBatchCursorError, DocumentsBatchIndex, DocumentsBatchReader,
+    Error,
+};
+use crate::FieldId;
+
+/// The `EnrichedDocumentsBatchReader` provides a way to iterate over documents that have
+/// been created with a `DocumentsBatchBuilder` and, for the enriched data,
+/// a simple `grenad::Reader<File>`.
+///
+/// The documents are returned in the form of `obkv::KvReader` where each field is identified with
+/// a `FieldId`. The mapping between the field ids and the field names is done thanks to the index.
+pub struct EnrichedDocumentsBatchReader<R> {
+    documents: DocumentsBatchReader<R>,
+    external_ids: grenad::ReaderCursor<File>,
+}
+
+impl<R: io::Read + io::Seek> EnrichedDocumentsBatchReader<R> {
+    pub fn new(
+        documents: DocumentsBatchReader<R>,
+        external_ids: grenad::Reader<File>,
+    ) -> Result<Self, Error> {
+        if documents.documents_count() as u64 == external_ids.len() {
+            Ok(EnrichedDocumentsBatchReader {
+                documents,
+                external_ids: external_ids.into_cursor()?,
+            })
+        } else {
+            Err(Error::InvalidEnrichedData)
+        }
+    }
+
+    pub fn documents_count(&self) -> u32 {
+        self.documents.documents_count()
+    }
+
+    pub fn is_empty(&self) -> bool {
+        self.documents.is_empty()
+    }
+
+    pub fn documents_batch_index(&self) -> &DocumentsBatchIndex {
+        self.documents.documents_batch_index()
+    }
+
+    /// This method returns a forward cursor over the enriched documents.
+    pub fn into_cursor(self) -> EnrichedDocumentsBatchCursor<R> {
+        let EnrichedDocumentsBatchReader { documents, mut external_ids } = self;
+        external_ids.reset();
+        EnrichedDocumentsBatchCursor { documents: documents.into_cursor(), external_ids }
+    }
+}
+
+#[derive(Debug, Clone, Copy)]
+pub struct EnrichedDocument<'a> {
+    pub document: KvReader<'a, FieldId>,
+    pub external_id: &'a str,
+}
+
+pub struct EnrichedDocumentsBatchCursor<R> {
+    documents: DocumentsBatchCursor<R>,
+    external_ids: grenad::ReaderCursor<File>,
+}
+
+impl<R> EnrichedDocumentsBatchCursor<R> {
+    pub fn into_reader(self) -> EnrichedDocumentsBatchReader<R> {
+        let EnrichedDocumentsBatchCursor { documents, external_ids } = self;
+        EnrichedDocumentsBatchReader { documents: documents.into_reader(), external_ids }
+    }
+
+    pub fn documents_batch_index(&self) -> &DocumentsBatchIndex {
+        self.documents.documents_batch_index()
+    }
+
+    /// Resets the cursor to be able to read from the start again.
+    pub fn reset(&mut self) {
+        self.documents.reset();
+        self.external_ids.reset();
+    }
+}
+
+impl<R: io::Read + io::Seek> EnrichedDocumentsBatchCursor<R> {
+    /// Returns the next document, starting from the first one. Subsequent calls to
+    /// `next_enriched_document` advance the document reader until all the documents have been read.
+    pub fn next_enriched_document(
+        &mut self,
+    ) -> Result<Option<EnrichedDocument>, DocumentsBatchCursorError> {
+        let document = self.documents.next_document()?;
+        let external_id = match self.external_ids.move_on_next()? {
+            Some((_, bytes)) => Some(str::from_utf8(bytes)?),
+            None => None,
+        };
+
+        match document.zip(external_id) {
+            Some((document, external_id)) => Ok(Some(EnrichedDocument { document, external_id })),
+            None => Ok(None),
+        }
+    }
+}
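For orientation, here is a minimal consumption sketch (not part of the patch): it assumes a `documents: DocumentsBatchReader<R>` built elsewhere and an `external_ids: grenad::Reader<File>` holding one id per document, in batch order.

```rust
use std::fs::File;
use std::io;

use milli::documents::{DocumentsBatchReader, EnrichedDocument, EnrichedDocumentsBatchReader};

// Walks the enriched batch and prints each document's external id.
fn print_external_ids<R: io::Read + io::Seek>(
    documents: DocumentsBatchReader<R>,
    external_ids: grenad::Reader<File>,
) {
    // `new` returns `Error::InvalidEnrichedData` when the id count and the
    // document count disagree.
    let reader = EnrichedDocumentsBatchReader::new(documents, external_ids)
        .expect("as many external ids as documents");
    let mut cursor = reader.into_cursor();

    // Each step yields the obkv document together with its external id.
    while let Some(EnrichedDocument { document, external_id }) =
        cursor.next_enriched_document().expect("a valid batch")
    {
        println!("document {} has {} fields", external_id, document.iter().count());
    }
}
```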
diff --git a/milli/src/documents/mod.rs b/milli/src/documents/mod.rs
index 66a05b7b6..43bfc1c20 100644
--- a/milli/src/documents/mod.rs
+++ b/milli/src/documents/mod.rs
@@ -1,11 +1,14 @@
 mod builder;
+mod enriched;
 mod reader;
 
 use std::fmt::{self, Debug};
 use std::io;
+use std::str::Utf8Error;
 
 use bimap::BiHashMap;
 pub use builder::DocumentsBatchBuilder;
+pub use enriched::{EnrichedDocument, EnrichedDocumentsBatchCursor, EnrichedDocumentsBatchReader};
 use obkv::KvReader;
 pub use reader::{DocumentsBatchCursor, DocumentsBatchCursorError, DocumentsBatchReader};
 use serde::{Deserialize, Serialize};
@@ -87,6 +90,8 @@ impl DocumentsBatchIndex {
 pub enum Error {
     ParseFloat { error: std::num::ParseFloatError, line: usize, value: String },
     InvalidDocumentFormat,
+    InvalidEnrichedData,
+    InvalidUtf8(Utf8Error),
     Csv(csv::Error),
     Json(serde_json::Error),
     Serialize(serde_json::Error),
@@ -118,6 +123,12 @@ impl From for Error {
     }
 }
 
+impl From<Utf8Error> for Error {
+    fn from(other: Utf8Error) -> Self {
+        Self::InvalidUtf8(other)
+    }
+}
+
 impl fmt::Display for Error {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         match self {
@@ -127,6 +138,8 @@ impl fmt::Display for Error {
             Error::InvalidDocumentFormat => {
                 f.write_str("Invalid document addition format, missing the documents batch index.")
             }
+            Error::InvalidEnrichedData => f.write_str("Invalid enriched data."),
+            Error::InvalidUtf8(e) => write!(f, "{}", e),
             Error::Io(e) => write!(f, "{}", e),
             Error::Serialize(e) => write!(f, "{}", e),
             Error::Grenad(e) => write!(f, "{}", e),
diff --git a/milli/src/documents/reader.rs b/milli/src/documents/reader.rs
index 720b403b9..7bd6dbd51 100644
--- a/milli/src/documents/reader.rs
+++ b/milli/src/documents/reader.rs
@@ -1,5 +1,5 @@
 use std::convert::TryInto;
-use std::{error, fmt, io};
+use std::{error, fmt, io, str};
 
 use obkv::KvReader;
 
@@ -93,19 +93,20 @@ impl<R: io::Read + io::Seek> DocumentsBatchCursor<R> {
 
 /// The possible error thrown by the `DocumentsBatchCursor` when iterating on the documents.
 #[derive(Debug)]
-pub struct DocumentsBatchCursorError {
-    inner: grenad::Error,
+pub enum DocumentsBatchCursorError {
+    Grenad(grenad::Error),
+    Utf8(str::Utf8Error),
 }
 
 impl From<grenad::Error> for DocumentsBatchCursorError {
     fn from(error: grenad::Error) -> DocumentsBatchCursorError {
-        DocumentsBatchCursorError { inner: error }
+        DocumentsBatchCursorError::Grenad(error)
     }
 }
 
-impl Into<grenad::Error> for DocumentsBatchCursorError {
-    fn into(self) -> grenad::Error {
-        self.inner
+impl From<str::Utf8Error> for DocumentsBatchCursorError {
+    fn from(error: str::Utf8Error) -> DocumentsBatchCursorError {
+        DocumentsBatchCursorError::Utf8(error)
     }
 }
 
@@ -113,6 +114,9 @@ impl error::Error for DocumentsBatchCursorError {}
 
 impl fmt::Display for DocumentsBatchCursorError {
     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
-        self.inner.fmt(f)
+        match self {
+            DocumentsBatchCursorError::Grenad(e) => e.fmt(f),
+            DocumentsBatchCursorError::Utf8(e) => e.fmt(f),
+        }
     }
 }
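To illustrate what the enum buys us (a hypothetical caller, not part of milli): a `?` on `next_document` can now surface UTF-8 failures as their own variant instead of forcing them through `grenad::Error`.

```rust
use std::io;

use milli::documents::{DocumentsBatchCursor, DocumentsBatchCursorError};

// Counts the documents remaining in the batch; the error type keeps both
// variants, so the caller can still match on Grenad vs. Utf8 failures.
fn count_documents<R: io::Read + io::Seek>(
    cursor: &mut DocumentsBatchCursor<R>,
) -> Result<usize, DocumentsBatchCursorError> {
    let mut count = 0;
    while let Some(_document) = cursor.next_document()? {
        count += 1;
    }
    Ok(count)
}
```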
diff --git a/milli/src/error.rs b/milli/src/error.rs
index d9dca287d..0419ceeda 100644
--- a/milli/src/error.rs
+++ b/milli/src/error.rs
@@ -7,7 +7,7 @@ use rayon::ThreadPoolBuildError;
 use serde_json::Value;
 use thiserror::Error;
 
-use crate::documents::DocumentsBatchCursorError;
+use crate::documents::{self, DocumentsBatchCursorError};
 use crate::{CriterionError, DocumentId, FieldId, Object, SortError};
 
 pub fn is_reserved_keyword(keyword: &str) -> bool {
@@ -36,6 +36,8 @@ pub enum InternalError {
     FieldIdMappingMissingEntry { key: FieldId },
     #[error(transparent)]
     Fst(#[from] fst::Error),
+    #[error(transparent)]
+    DocumentsError(#[from] documents::Error),
     #[error("Invalid compression type have been specified to grenad.")]
     GrenadInvalidCompressionType,
     #[error("Invalid grenad file with an invalid version format.")]
@@ -185,6 +187,7 @@ macro_rules! error_from_sub_error {
 error_from_sub_error! {
     FieldIdMapMissingEntry => InternalError,
     fst::Error => InternalError,
+    documents::Error => InternalError,
     str::Utf8Error => InternalError,
     ThreadPoolBuildError => InternalError,
     SerializationError => InternalError,
@@ -212,7 +215,10 @@ where
 
 impl From<DocumentsBatchCursorError> for Error {
     fn from(error: DocumentsBatchCursorError) -> Error {
-        Error::from(Into::<grenad::Error>::into(error))
+        match error {
+            DocumentsBatchCursorError::Grenad(e) => Error::from(e),
+            DocumentsBatchCursorError::Utf8(e) => Error::from(e),
+        }
     }
 }
diff --git a/milli/src/update/index_documents/mod.rs b/milli/src/update/index_documents/mod.rs
index ba1064684..fe3bd1f8f 100644
--- a/milli/src/update/index_documents/mod.rs
+++ b/milli/src/update/index_documents/mod.rs
@@ -27,7 +27,7 @@ pub use self::helpers::{
 };
 use self::helpers::{grenad_obkv_into_chunks, GrenadParameters};
 pub use self::transform::{Transform, TransformOutput};
-use self::validate::validate_documents_batch;
+use self::validate::validate_and_enrich_documents_batch;
 pub use self::validate::{
     extract_float_from_value, validate_document_id, validate_document_id_value,
     validate_geo_from_json,
@@ -141,7 +141,7 @@ where
         // We check for user errors in this validator and if there is one, we can return
         // the `IndexDocument` struct as it is valid to send more documents into it.
         // However, if there is an internal error we throw it away!
-        let reader = match validate_documents_batch(
+        let enriched_documents_reader = match validate_and_enrich_documents_batch(
            self.wtxn,
            self.index,
            self.config.autogenerate_docids,
@@ -155,7 +155,7 @@
            .transform
            .as_mut()
            .expect("Invalid document addition state")
-            .read_documents(reader, self.wtxn, &self.progress)?
+            .read_documents(enriched_documents_reader, self.wtxn, &self.progress)?
            as u64;
 
        self.added_documents += indexed_documents;
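The `Result<StdResult<_, UserError>>` nesting used by the validator is what the comment above relies on: user errors come back as plain values, so the builder stays usable, while internal errors abort through `?`. A schematic of the pattern, with hypothetical stand-in types:

```rust
// Hypothetical stand-ins for milli's InternalError and UserError.
struct Internal;
struct User;

// Outer Result: the machinery itself failed; propagate with `?`.
// Inner Result: the user's input is at fault; hand it back as a value.
fn validate(input: &str) -> Result<Result<u32, User>, Internal> {
    match input.parse::<u32>() {
        Ok(n) => Ok(Ok(n)),      // both layers succeed
        Err(_) => Ok(Err(User)), // a user error does not abort the pipeline
    }
}
```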
diff --git a/milli/src/update/index_documents/transform.rs b/milli/src/update/index_documents/transform.rs
index 38f6dc8ff..4d0a4c311 100644
--- a/milli/src/update/index_documents/transform.rs
+++ b/milli/src/update/index_documents/transform.rs
@@ -14,7 +14,7 @@ use smartstring::SmartString;
 
 use super::helpers::{create_sorter, create_writer, keep_latest_obkv, merge_obkvs, MergeFn};
 use super::{IndexDocumentsMethod, IndexerConfig};
-use crate::documents::{DocumentsBatchIndex, DocumentsBatchReader};
+use crate::documents::{DocumentsBatchIndex, EnrichedDocument, EnrichedDocumentsBatchReader};
 use crate::error::{Error, InternalError, UserError};
 use crate::index::db_name;
 use crate::update::index_documents::validate_document_id_value;
@@ -153,7 +153,7 @@ impl<'a, 'i> Transform<'a, 'i> {
     pub fn read_documents<R, F>(
         &mut self,
-        reader: DocumentsBatchReader<R>,
+        reader: EnrichedDocumentsBatchReader<R>,
         wtxn: &mut heed::RwTxn,
         progress_callback: F,
     ) -> Result<usize>
@@ -189,7 +189,9 @@ impl<'a, 'i> Transform<'a, 'i> {
         let mut external_id_buffer = Vec::new();
         let mut field_buffer: Vec<(u16, Cow<[u8]>)> = Vec::new();
         let addition_index = cursor.documents_batch_index().clone();
-        while let Some(document) = cursor.next_document()? {
+        while let Some(enriched_document) = cursor.next_enriched_document()? {
+            let EnrichedDocument { document, external_id } = enriched_document;
+
             let mut field_buffer_cache = drop_and_reuse(field_buffer);
             if self.indexer_settings.log_every_n.map_or(false, |len| documents_count % len == 0) {
                 progress_callback(UpdateIndexingStep::RemapDocumentAddition {
diff --git a/milli/src/update/index_documents/validate.rs b/milli/src/update/index_documents/validate.rs
index 32e8de03f..8b68532cb 100644
--- a/milli/src/update/index_documents/validate.rs
+++ b/milli/src/update/index_documents/validate.rs
@@ -4,27 +4,28 @@
 use std::result::Result as StdResult;
 
 use serde_json::Value;
 
-use crate::documents::{DocumentsBatchIndex, DocumentsBatchReader};
+use crate::documents::{DocumentsBatchIndex, DocumentsBatchReader, EnrichedDocumentsBatchReader};
 use crate::error::{GeoError, InternalError, UserError};
-use crate::update::index_documents::obkv_to_object;
+use crate::update::index_documents::{obkv_to_object, writer_into_reader};
 use crate::{FieldId, Index, Object, Result};
 
 /// The symbol used to define levels in a nested primary key.
 const PRIMARY_KEY_SPLIT_SYMBOL: char = '.';
 
-/// This function validates a documents by checking that:
+/// This function validates and enriches the documents by checking that:
 /// - we can infer a primary key,
-/// - all the documents id exist and,
+/// - all the document ids exist and are extracted,
 /// - the validity of them but also,
 /// - the validity of the `_geo` field depending on the settings.
-pub fn validate_documents_batch<R: Read + Seek>(
+pub fn validate_and_enrich_documents_batch<R: Read + Seek>(
     rtxn: &heed::RoTxn,
     index: &Index,
     autogenerate_docids: bool,
     reader: DocumentsBatchReader<R>,
-) -> Result<StdResult<DocumentsBatchReader<R>, UserError>> {
+) -> Result<StdResult<EnrichedDocumentsBatchReader<R>, UserError>> {
     let mut cursor = reader.into_cursor();
     let mut documents_batch_index = cursor.documents_batch_index().clone();
+    let mut external_ids = tempfile::tempfile().map(grenad::Writer::new)?;
 
     // The primary key *field id* that has already been set for this index or the one
     // we will guess by searching for the first key that contains "id" as a substring.
@@ -82,6 +83,8 @@ pub fn validate_documents_batch<R: Read + Seek>(
             Err(user_error) => return Ok(Err(user_error)),
         };
 
+        external_ids.insert(count.to_be_bytes(), &document_id)?;
+
         if let Some(geo_value) = geo_field_id.and_then(|fid| document.get(fid)) {
             if let Err(user_error) = validate_geo_from_json(Value::from(document_id), geo_value)? {
                 return Ok(Err(UserError::from(user_error)));
@@ -90,7 +93,10 @@
         count += 1;
     }
 
-    Ok(Ok(cursor.into_reader()))
+    let external_ids = writer_into_reader(external_ids)?;
+    let reader = EnrichedDocumentsBatchReader::new(cursor.into_reader(), external_ids)?;
+
+    Ok(Ok(reader))
 }
 
 /// Retrieve the document id after validating it, returning a `UserError`
@@ -100,7 +106,7 @@ fn fetch_document_id(
     documents_batch_index: &DocumentsBatchIndex,
     primary_key: PrimaryKey,
     autogenerate_docids: bool,
-    count: usize,
+    count: u32,
 ) -> Result<StdResult<String, UserError>> {
     match primary_key {
         PrimaryKey::Flat { name: primary_key, field_id: primary_key_id } => {
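A note on the enrichment itself: `grenad::Writer` requires keys to be inserted in ascending byte order, which is why the counter is encoded with `to_be_bytes`. Big-endian bytes compare lexicographically in the same order as the numbers they encode, whereas little-endian would not (`256u32` encodes as `[0, 1, 0, 0]`, which would sort before `1u32` as `[1, 0, 0, 0]`). Below is a standalone sketch of the buffering step, mirroring what `validate_and_enrich_documents_batch` does with its temporary file; the `write_external_ids` helper is illustrative, not part of the patch.

```rust
use std::fs::File;
use std::io::{Seek, SeekFrom};

// Buffers one external id per document in a temporary grenad file, keyed by
// the document's position in the batch.
fn write_external_ids(ids: &[&str]) -> Result<grenad::Reader<File>, Box<dyn std::error::Error>> {
    let mut writer = grenad::Writer::new(tempfile::tempfile()?);
    for (count, id) in ids.iter().enumerate() {
        // Big-endian keys keep the ascending insertion order grenad requires.
        writer.insert((count as u32).to_be_bytes(), id.as_bytes())?;
    }
    let mut file = writer.into_inner()?;
    file.seek(SeekFrom::Start(0))?; // rewind before reading back what we wrote
    Ok(grenad::Reader::new(file)?)
}
```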