mirror of
https://github.com/meilisearch/meilisearch.git
synced 2024-11-22 18:17:39 +08:00
Move functions to deserialize documents to milli
This commit is contained in:
parent
1d9caa11fd
commit
de99e52474
1
Cargo.lock
generated
1
Cargo.lock
generated
@ -2145,6 +2145,7 @@ name = "meilisearch-types"
|
|||||||
version = "0.28.0"
|
version = "0.28.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"actix-web",
|
"actix-web",
|
||||||
|
"milli",
|
||||||
"proptest",
|
"proptest",
|
||||||
"proptest-derive",
|
"proptest-derive",
|
||||||
"serde",
|
"serde",
|
||||||
|
@ -1,138 +0,0 @@
|
|||||||
use std::borrow::Borrow;
|
|
||||||
use std::fmt::{self, Debug, Display};
|
|
||||||
use std::io::{self, BufRead, Seek, Write};
|
|
||||||
|
|
||||||
use meilisearch_types::error::{Code, ErrorCode};
|
|
||||||
use meilisearch_types::internal_error;
|
|
||||||
use milli::documents::{DocumentsBatchBuilder, Error};
|
|
||||||
|
|
||||||
type Result<T> = std::result::Result<T, DocumentFormatError>;
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub enum PayloadType {
|
|
||||||
Ndjson,
|
|
||||||
Json,
|
|
||||||
Csv,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl fmt::Display for PayloadType {
|
|
||||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
|
||||||
match self {
|
|
||||||
PayloadType::Ndjson => f.write_str("ndjson"),
|
|
||||||
PayloadType::Json => f.write_str("json"),
|
|
||||||
PayloadType::Csv => f.write_str("csv"),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub enum DocumentFormatError {
|
|
||||||
Internal(Box<dyn std::error::Error + Send + Sync + 'static>),
|
|
||||||
MalformedPayload(Error, PayloadType),
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Display for DocumentFormatError {
|
|
||||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
|
||||||
match self {
|
|
||||||
Self::Internal(e) => write!(f, "An internal error has occurred: `{}`.", e),
|
|
||||||
Self::MalformedPayload(me, b) => match me.borrow() {
|
|
||||||
Error::Json(se) => {
|
|
||||||
// https://github.com/meilisearch/meilisearch/issues/2107
|
|
||||||
// The user input maybe insanely long. We need to truncate it.
|
|
||||||
let mut serde_msg = se.to_string();
|
|
||||||
let ellipsis = "...";
|
|
||||||
if serde_msg.len() > 100 + ellipsis.len() {
|
|
||||||
serde_msg.replace_range(50..serde_msg.len() - 85, ellipsis);
|
|
||||||
}
|
|
||||||
|
|
||||||
write!(
|
|
||||||
f,
|
|
||||||
"The `{}` payload provided is malformed. `Couldn't serialize document value: {}`.",
|
|
||||||
b, serde_msg
|
|
||||||
)
|
|
||||||
}
|
|
||||||
_ => write!(f, "The `{}` payload provided is malformed: `{}`.", b, me),
|
|
||||||
},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl std::error::Error for DocumentFormatError {}
|
|
||||||
|
|
||||||
impl From<(PayloadType, Error)> for DocumentFormatError {
|
|
||||||
fn from((ty, error): (PayloadType, Error)) -> Self {
|
|
||||||
match error {
|
|
||||||
Error::Io(e) => Self::Internal(Box::new(e)),
|
|
||||||
e => Self::MalformedPayload(e, ty),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl ErrorCode for DocumentFormatError {
|
|
||||||
fn error_code(&self) -> Code {
|
|
||||||
match self {
|
|
||||||
DocumentFormatError::Internal(_) => Code::Internal,
|
|
||||||
DocumentFormatError::MalformedPayload(_, _) => Code::MalformedPayload,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
internal_error!(DocumentFormatError: io::Error);
|
|
||||||
|
|
||||||
/// Reads CSV from input and write an obkv batch to writer.
|
|
||||||
pub fn read_csv(input: impl BufRead, writer: impl Write + Seek) -> Result<usize> {
|
|
||||||
let mut builder = DocumentsBatchBuilder::new(writer);
|
|
||||||
|
|
||||||
let csv = csv::Reader::from_reader(input);
|
|
||||||
builder.append_csv(csv).map_err(|e| (PayloadType::Csv, e))?;
|
|
||||||
|
|
||||||
let count = builder.documents_count();
|
|
||||||
let _ = builder
|
|
||||||
.into_inner()
|
|
||||||
.map_err(Into::into)
|
|
||||||
.map_err(DocumentFormatError::Internal)?;
|
|
||||||
|
|
||||||
Ok(count as usize)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Reads JSON Lines from input and write an obkv batch to writer.
|
|
||||||
pub fn read_ndjson(mut input: impl BufRead, writer: impl Write + Seek) -> Result<usize> {
|
|
||||||
let mut builder = DocumentsBatchBuilder::new(writer);
|
|
||||||
let mut buf = String::with_capacity(1024);
|
|
||||||
while input.read_line(&mut buf)? > 0 {
|
|
||||||
if buf == "\n" {
|
|
||||||
buf.clear();
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
builder
|
|
||||||
.append_unparsed_json_object(&buf)
|
|
||||||
.map_err(Into::into)
|
|
||||||
.map_err(DocumentFormatError::Internal)?;
|
|
||||||
buf.clear();
|
|
||||||
}
|
|
||||||
|
|
||||||
let count = builder.documents_count();
|
|
||||||
let _ = builder
|
|
||||||
.into_inner()
|
|
||||||
.map_err(Into::into)
|
|
||||||
.map_err(DocumentFormatError::Internal)?;
|
|
||||||
|
|
||||||
Ok(count as usize)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Reads JSON from input and write an obkv batch to writer.
|
|
||||||
pub fn read_json(input: impl BufRead, writer: impl Write + Seek) -> Result<usize> {
|
|
||||||
let mut builder = DocumentsBatchBuilder::new(writer);
|
|
||||||
|
|
||||||
builder
|
|
||||||
.append_json(input)
|
|
||||||
.map_err(|e| (PayloadType::Json, e))?;
|
|
||||||
|
|
||||||
let count = builder.documents_count();
|
|
||||||
let _ = builder
|
|
||||||
.into_inner()
|
|
||||||
.map_err(Into::into)
|
|
||||||
.map_err(DocumentFormatError::Internal)?;
|
|
||||||
|
|
||||||
Ok(count as usize)
|
|
||||||
}
|
|
@ -9,8 +9,8 @@ use milli::heed::{EnvOpenOptions, RoTxn};
|
|||||||
use milli::update::{IndexDocumentsConfig, IndexerConfig};
|
use milli::update::{IndexDocumentsConfig, IndexerConfig};
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
use crate::document_formats::read_ndjson;
|
|
||||||
use crate::index::updates::apply_settings_to_builder;
|
use crate::index::updates::apply_settings_to_builder;
|
||||||
|
use milli::documents::document_formats::read_ndjson;
|
||||||
|
|
||||||
use super::error::Result;
|
use super::error::Result;
|
||||||
use super::{index::Index, Settings, Unchecked};
|
use super::{index::Index, Settings, Unchecked};
|
||||||
|
@ -6,11 +6,11 @@ use meilisearch_types::internal_error;
|
|||||||
use tokio::task::JoinError;
|
use tokio::task::JoinError;
|
||||||
|
|
||||||
use super::DocumentAdditionFormat;
|
use super::DocumentAdditionFormat;
|
||||||
use crate::document_formats::DocumentFormatError;
|
|
||||||
use crate::dump::error::DumpError;
|
use crate::dump::error::DumpError;
|
||||||
use crate::index::error::IndexError;
|
use crate::index::error::IndexError;
|
||||||
use crate::tasks::error::TaskError;
|
use crate::tasks::error::TaskError;
|
||||||
use crate::update_file_store::UpdateFileStoreError;
|
use crate::update_file_store::UpdateFileStoreError;
|
||||||
|
use milli::documents::document_formats::DocumentFormatError;
|
||||||
|
|
||||||
use crate::index_resolver::error::IndexResolverError;
|
use crate::index_resolver::error::IndexResolverError;
|
||||||
|
|
||||||
|
@ -20,7 +20,6 @@ use tokio::task::spawn_blocking;
|
|||||||
use tokio::time::sleep;
|
use tokio::time::sleep;
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
use crate::document_formats::{read_csv, read_json, read_ndjson};
|
|
||||||
use crate::dump::{self, load_dump, DumpHandler};
|
use crate::dump::{self, load_dump, DumpHandler};
|
||||||
use crate::index::{
|
use crate::index::{
|
||||||
Checked, Document, IndexMeta, IndexStats, SearchQuery, SearchResult, Settings, Unchecked,
|
Checked, Document, IndexMeta, IndexStats, SearchQuery, SearchResult, Settings, Unchecked,
|
||||||
@ -34,6 +33,7 @@ use crate::tasks::{
|
|||||||
BatchHandler, EmptyBatchHandler, Scheduler, SnapshotHandler, TaskFilter, TaskStore,
|
BatchHandler, EmptyBatchHandler, Scheduler, SnapshotHandler, TaskFilter, TaskStore,
|
||||||
};
|
};
|
||||||
use error::Result;
|
use error::Result;
|
||||||
|
use milli::documents::document_formats::{read_csv, read_json, read_ndjson};
|
||||||
|
|
||||||
use self::error::IndexControllerError;
|
use self::error::IndexControllerError;
|
||||||
use crate::index_resolver::index_store::{IndexStore, MapIndexStore};
|
use crate::index_resolver::index_store::{IndexStore, MapIndexStore};
|
||||||
|
@ -18,7 +18,6 @@ pub use milli;
|
|||||||
pub use milli::heed;
|
pub use milli::heed;
|
||||||
|
|
||||||
mod compression;
|
mod compression;
|
||||||
pub mod document_formats;
|
|
||||||
|
|
||||||
use walkdir::WalkDir;
|
use walkdir::WalkDir;
|
||||||
|
|
||||||
|
@ -14,7 +14,7 @@ pub use test::MockUpdateFileStore as UpdateFileStore;
|
|||||||
|
|
||||||
const UPDATE_FILES_PATH: &str = "updates/updates_files";
|
const UPDATE_FILES_PATH: &str = "updates/updates_files";
|
||||||
|
|
||||||
use crate::document_formats::read_ndjson;
|
use milli::documents::document_formats::read_ndjson;
|
||||||
|
|
||||||
pub struct UpdateFile {
|
pub struct UpdateFile {
|
||||||
path: PathBuf,
|
path: PathBuf,
|
||||||
|
@ -10,6 +10,7 @@ proptest = { version = "1.0.0", optional = true }
|
|||||||
proptest-derive = { version = "0.3.0", optional = true }
|
proptest-derive = { version = "0.3.0", optional = true }
|
||||||
serde = { version = "1.0.136", features = ["derive"] }
|
serde = { version = "1.0.136", features = ["derive"] }
|
||||||
serde_json = "1.0.79"
|
serde_json = "1.0.79"
|
||||||
|
milli = { path = "../../milli/milli" }
|
||||||
|
|
||||||
[features]
|
[features]
|
||||||
test-traits = ["proptest", "proptest-derive"]
|
test-traits = ["proptest", "proptest-derive"]
|
||||||
|
@ -94,6 +94,17 @@ pub trait ErrorCode: std::error::Error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl ErrorCode for milli::documents::document_formats::DocumentFormatError {
|
||||||
|
fn error_code(&self) -> Code {
|
||||||
|
match self {
|
||||||
|
milli::documents::document_formats::DocumentFormatError::Internal(_) => Code::Internal,
|
||||||
|
milli::documents::document_formats::DocumentFormatError::MalformedPayload(_, _) => {
|
||||||
|
Code::MalformedPayload
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[allow(clippy::enum_variant_names)]
|
#[allow(clippy::enum_variant_names)]
|
||||||
enum ErrorType {
|
enum ErrorType {
|
||||||
InternalError,
|
InternalError,
|
||||||
|
Loading…
Reference in New Issue
Block a user