mirror of
https://github.com/meilisearch/meilisearch.git
synced 2025-01-18 17:11:15 +08:00
Expose the DocumentId
enum to be sure to inject the generated ids
This commit is contained in:
parent
d1a4da9812
commit
0bbcc7b180
@ -7,6 +7,7 @@ use super::{
|
|||||||
DocumentsBatchCursor, DocumentsBatchCursorError, DocumentsBatchIndex, DocumentsBatchReader,
|
DocumentsBatchCursor, DocumentsBatchCursorError, DocumentsBatchIndex, DocumentsBatchReader,
|
||||||
Error,
|
Error,
|
||||||
};
|
};
|
||||||
|
use crate::update::DocumentId;
|
||||||
use crate::FieldId;
|
use crate::FieldId;
|
||||||
|
|
||||||
/// The `EnrichedDocumentsBatchReader` provides a way to iterate over documents that have
|
/// The `EnrichedDocumentsBatchReader` provides a way to iterate over documents that have
|
||||||
@ -66,10 +67,10 @@ impl<R: io::Read + io::Seek> EnrichedDocumentsBatchReader<R> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, Copy)]
|
#[derive(Debug, Clone)]
|
||||||
pub struct EnrichedDocument<'a> {
|
pub struct EnrichedDocument<'a> {
|
||||||
pub document: KvReader<'a, FieldId>,
|
pub document: KvReader<'a, FieldId>,
|
||||||
pub external_id: &'a str,
|
pub document_id: DocumentId,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct EnrichedDocumentsBatchCursor<R> {
|
pub struct EnrichedDocumentsBatchCursor<R> {
|
||||||
@ -110,13 +111,13 @@ impl<R: io::Read + io::Seek> EnrichedDocumentsBatchCursor<R> {
|
|||||||
&mut self,
|
&mut self,
|
||||||
) -> Result<Option<EnrichedDocument>, DocumentsBatchCursorError> {
|
) -> Result<Option<EnrichedDocument>, DocumentsBatchCursorError> {
|
||||||
let document = self.documents.next_document()?;
|
let document = self.documents.next_document()?;
|
||||||
let external_id = match self.external_ids.move_on_next()? {
|
let document_id = match self.external_ids.move_on_next()? {
|
||||||
Some((_, bytes)) => Some(str::from_utf8(bytes)?),
|
Some((_, bytes)) => serde_json::from_slice(bytes).map(Some)?,
|
||||||
None => None,
|
None => None,
|
||||||
};
|
};
|
||||||
|
|
||||||
match document.zip(external_id) {
|
match document.zip(document_id) {
|
||||||
Some((document, external_id)) => Ok(Some(EnrichedDocument { document, external_id })),
|
Some((document, document_id)) => Ok(Some(EnrichedDocument { document, document_id })),
|
||||||
None => Ok(None),
|
None => Ok(None),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,5 +1,5 @@
|
|||||||
use std::convert::TryInto;
|
use std::convert::TryInto;
|
||||||
use std::{error, fmt, io, str};
|
use std::{error, fmt, io};
|
||||||
|
|
||||||
use obkv::KvReader;
|
use obkv::KvReader;
|
||||||
|
|
||||||
@ -95,7 +95,7 @@ impl<R: io::Read + io::Seek> DocumentsBatchCursor<R> {
|
|||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub enum DocumentsBatchCursorError {
|
pub enum DocumentsBatchCursorError {
|
||||||
Grenad(grenad::Error),
|
Grenad(grenad::Error),
|
||||||
Utf8(str::Utf8Error),
|
SerdeJson(serde_json::Error),
|
||||||
}
|
}
|
||||||
|
|
||||||
impl From<grenad::Error> for DocumentsBatchCursorError {
|
impl From<grenad::Error> for DocumentsBatchCursorError {
|
||||||
@ -104,9 +104,9 @@ impl From<grenad::Error> for DocumentsBatchCursorError {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl From<str::Utf8Error> for DocumentsBatchCursorError {
|
impl From<serde_json::Error> for DocumentsBatchCursorError {
|
||||||
fn from(error: str::Utf8Error) -> DocumentsBatchCursorError {
|
fn from(error: serde_json::Error) -> DocumentsBatchCursorError {
|
||||||
DocumentsBatchCursorError::Utf8(error)
|
DocumentsBatchCursorError::SerdeJson(error)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -116,7 +116,7 @@ impl fmt::Display for DocumentsBatchCursorError {
|
|||||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||||
match self {
|
match self {
|
||||||
DocumentsBatchCursorError::Grenad(e) => e.fmt(f),
|
DocumentsBatchCursorError::Grenad(e) => e.fmt(f),
|
||||||
DocumentsBatchCursorError::Utf8(e) => e.fmt(f),
|
DocumentsBatchCursorError::SerdeJson(e) => e.fmt(f),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -217,7 +217,7 @@ impl From<DocumentsBatchCursorError> for Error {
|
|||||||
fn from(error: DocumentsBatchCursorError) -> Error {
|
fn from(error: DocumentsBatchCursorError) -> Error {
|
||||||
match error {
|
match error {
|
||||||
DocumentsBatchCursorError::Grenad(e) => Error::from(e),
|
DocumentsBatchCursorError::Grenad(e) => Error::from(e),
|
||||||
DocumentsBatchCursorError::Utf8(e) => Error::from(e),
|
DocumentsBatchCursorError::SerdeJson(e) => Error::from(InternalError::from(e)),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2,6 +2,7 @@ use std::io::{Read, Seek};
|
|||||||
use std::result::Result as StdResult;
|
use std::result::Result as StdResult;
|
||||||
use std::{fmt, iter};
|
use std::{fmt, iter};
|
||||||
|
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
use serde_json::Value;
|
use serde_json::Value;
|
||||||
|
|
||||||
use crate::documents::{DocumentsBatchIndex, DocumentsBatchReader, EnrichedDocumentsBatchReader};
|
use crate::documents::{DocumentsBatchIndex, DocumentsBatchReader, EnrichedDocumentsBatchReader};
|
||||||
@ -89,14 +90,15 @@ pub fn enrich_documents_batch<R: Read + Seek>(
|
|||||||
Err(user_error) => return Ok(Err(user_error)),
|
Err(user_error) => return Ok(Err(user_error)),
|
||||||
};
|
};
|
||||||
|
|
||||||
external_ids.insert(count.to_be_bytes(), document_id.value())?;
|
|
||||||
|
|
||||||
if let Some(geo_value) = geo_field_id.and_then(|fid| document.get(fid)) {
|
if let Some(geo_value) = geo_field_id.and_then(|fid| document.get(fid)) {
|
||||||
if let Err(user_error) = validate_geo_from_json(&document_id, geo_value)? {
|
if let Err(user_error) = validate_geo_from_json(&document_id, geo_value)? {
|
||||||
return Ok(Err(UserError::from(user_error)));
|
return Ok(Err(UserError::from(user_error)));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let document_id = serde_json::to_vec(&document_id).map_err(InternalError::SerdeJson)?;
|
||||||
|
external_ids.insert(count.to_be_bytes(), document_id)?;
|
||||||
|
|
||||||
count += 1;
|
count += 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -210,7 +212,7 @@ impl PrimaryKey<'_> {
|
|||||||
///
|
///
|
||||||
/// In case the document id has been auto-generated, the document nth is kept to help
|
/// In case the document id has been auto-generated, the document nth is kept to help
|
||||||
/// users debug if there is an issue with the document itself.
|
/// users debug if there is an issue with the document itself.
|
||||||
#[derive(Clone)]
|
#[derive(Serialize, Deserialize, Clone)]
|
||||||
pub enum DocumentId {
|
pub enum DocumentId {
|
||||||
Retrieved { value: String },
|
Retrieved { value: String },
|
||||||
Generated { value: String, document_nth: u32 },
|
Generated { value: String, document_nth: u32 },
|
||||||
@ -225,16 +227,20 @@ impl DocumentId {
|
|||||||
DocumentId::Generated { value, document_nth }
|
DocumentId::Generated { value, document_nth }
|
||||||
}
|
}
|
||||||
|
|
||||||
fn value(&self) -> &str {
|
fn debug(&self) -> String {
|
||||||
|
format!("{:?}", self)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn is_generated(&self) -> bool {
|
||||||
|
matches!(self, DocumentId::Generated { .. })
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn value(&self) -> &str {
|
||||||
match self {
|
match self {
|
||||||
DocumentId::Retrieved { value } => value,
|
DocumentId::Retrieved { value } => value,
|
||||||
DocumentId::Generated { value, .. } => value,
|
DocumentId::Generated { value, .. } => value,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn debug(&self) -> String {
|
|
||||||
format!("{:?}", self)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl fmt::Debug for DocumentId {
|
impl fmt::Debug for DocumentId {
|
||||||
|
@ -22,7 +22,7 @@ use typed_chunk::{write_typed_chunk_into_index, TypedChunk};
|
|||||||
use self::enrich::enrich_documents_batch;
|
use self::enrich::enrich_documents_batch;
|
||||||
pub use self::enrich::{
|
pub use self::enrich::{
|
||||||
extract_float_from_value, validate_document_id, validate_document_id_value,
|
extract_float_from_value, validate_document_id, validate_document_id_value,
|
||||||
validate_geo_from_json,
|
validate_geo_from_json, DocumentId,
|
||||||
};
|
};
|
||||||
pub use self::helpers::{
|
pub use self::helpers::{
|
||||||
as_cloneable_grenad, create_sorter, create_writer, fst_stream_into_hashset,
|
as_cloneable_grenad, create_sorter, create_writer, fst_stream_into_hashset,
|
||||||
|
@ -153,8 +153,9 @@ impl<'a, 'i> Transform<'a, 'i> {
|
|||||||
let mapping = create_fields_mapping(&mut self.fields_ids_map, fields_index)?;
|
let mapping = create_fields_mapping(&mut self.fields_ids_map, fields_index)?;
|
||||||
|
|
||||||
let primary_key = cursor.primary_key().to_string();
|
let primary_key = cursor.primary_key().to_string();
|
||||||
self.fields_ids_map.insert(&primary_key).ok_or(UserError::AttributeLimitReached)?;
|
|
||||||
let primary_key_id_nested = primary_key.contains('.');
|
let primary_key_id_nested = primary_key.contains('.');
|
||||||
|
let primary_key_id =
|
||||||
|
self.fields_ids_map.insert(&primary_key).ok_or(UserError::AttributeLimitReached)?;
|
||||||
|
|
||||||
let mut flattened_document = None;
|
let mut flattened_document = None;
|
||||||
let mut obkv_buffer = Vec::new();
|
let mut obkv_buffer = Vec::new();
|
||||||
@ -162,7 +163,7 @@ impl<'a, 'i> Transform<'a, 'i> {
|
|||||||
let mut documents_count = 0;
|
let mut documents_count = 0;
|
||||||
let mut field_buffer: Vec<(u16, Cow<[u8]>)> = Vec::new();
|
let mut field_buffer: Vec<(u16, Cow<[u8]>)> = Vec::new();
|
||||||
while let Some(enriched_document) = cursor.next_enriched_document()? {
|
while let Some(enriched_document) = cursor.next_enriched_document()? {
|
||||||
let EnrichedDocument { document, external_id } = enriched_document;
|
let EnrichedDocument { document, document_id } = enriched_document;
|
||||||
|
|
||||||
let mut field_buffer_cache = drop_and_reuse(field_buffer);
|
let mut field_buffer_cache = drop_and_reuse(field_buffer);
|
||||||
if self.indexer_settings.log_every_n.map_or(false, |len| documents_count % len == 0) {
|
if self.indexer_settings.log_every_n.map_or(false, |len| documents_count % len == 0) {
|
||||||
@ -171,6 +172,14 @@ impl<'a, 'i> Transform<'a, 'i> {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// When the document id has been auto-generated by `enrich_documents_batch`,
|
||||||
|
// we must insert this document id into the remapped document.
|
||||||
|
let external_id = document_id.value();
|
||||||
|
if document_id.is_generated() {
|
||||||
|
let docid = serde_json::to_vec(external_id).map_err(InternalError::SerdeJson)?;
|
||||||
|
field_buffer_cache.push((primary_key_id, Cow::from(docid)));
|
||||||
|
}
|
||||||
|
|
||||||
for (k, v) in document.iter() {
|
for (k, v) in document.iter() {
|
||||||
let mapped_id =
|
let mapped_id =
|
||||||
*mapping.get(&k).ok_or(InternalError::FieldIdMappingMissingEntry { key: k })?;
|
*mapping.get(&k).ok_or(InternalError::FieldIdMappingMissingEntry { key: k })?;
|
||||||
|
@ -3,7 +3,7 @@ pub use self::clear_documents::ClearDocuments;
|
|||||||
pub use self::delete_documents::{DeleteDocuments, DocumentDeletionResult};
|
pub use self::delete_documents::{DeleteDocuments, DocumentDeletionResult};
|
||||||
pub use self::facets::Facets;
|
pub use self::facets::Facets;
|
||||||
pub use self::index_documents::{
|
pub use self::index_documents::{
|
||||||
DocumentAdditionResult, IndexDocuments, IndexDocumentsConfig, IndexDocumentsMethod,
|
DocumentAdditionResult, DocumentId, IndexDocuments, IndexDocumentsConfig, IndexDocumentsMethod,
|
||||||
};
|
};
|
||||||
pub use self::indexer_config::IndexerConfig;
|
pub use self::indexer_config::IndexerConfig;
|
||||||
pub use self::settings::{Setting, Settings};
|
pub use self::settings::{Setting, Settings};
|
||||||
|
Loading…
Reference in New Issue
Block a user