mirror of
https://github.com/meilisearch/meilisearch.git
synced 2024-11-26 12:05:05 +08:00
implements the csv delimiter without tests
Co-authored-by: Maxi Barmetler <maxi.barmetler@gmail.com>
This commit is contained in:
parent
143e3cf948
commit
8c074f5028
8
Cargo.lock
generated
8
Cargo.lock
generated
@ -1113,9 +1113,9 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "deserr"
|
name = "deserr"
|
||||||
version = "0.4.1"
|
version = "0.5.0"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "6eee2844f21cf7fb5693aae1fb8f1658127acfdb2fc072167d68a9152584ae64"
|
checksum = "c71c14985c842bf1e520b1ebcd22daff6aeece32f510e11f063cecf9b308c04b"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"actix-http",
|
"actix-http",
|
||||||
"actix-utils",
|
"actix-utils",
|
||||||
@ -1130,9 +1130,9 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "deserr-internal"
|
name = "deserr-internal"
|
||||||
version = "0.4.1"
|
version = "0.5.0"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "c27246f8ca9eeba9dd70d614b664dc43b529251ed7bd9e633131010d340da4b9"
|
checksum = "cae1c51b191528c9e4e5d6cff671de94f61fcda1c206cc891251e0cf438c941a"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"convert_case 0.5.0",
|
"convert_case 0.5.0",
|
||||||
"proc-macro2",
|
"proc-macro2",
|
||||||
|
@ -9,7 +9,7 @@ actix-web = { version = "4.2.1", default-features = false }
|
|||||||
anyhow = "1.0.65"
|
anyhow = "1.0.65"
|
||||||
convert_case = "0.6.0"
|
convert_case = "0.6.0"
|
||||||
csv = "1.1.6"
|
csv = "1.1.6"
|
||||||
deserr = "0.4.1"
|
deserr = "0.5.0"
|
||||||
either = { version = "1.6.1", features = ["serde"] }
|
either = { version = "1.6.1", features = ["serde"] }
|
||||||
enum-iterator = "1.1.3"
|
enum-iterator = "1.1.3"
|
||||||
file-store = { path = "../file-store" }
|
file-store = { path = "../file-store" }
|
||||||
|
@ -19,7 +19,7 @@ type Result<T> = std::result::Result<T, DocumentFormatError>;
|
|||||||
pub enum PayloadType {
|
pub enum PayloadType {
|
||||||
Ndjson,
|
Ndjson,
|
||||||
Json,
|
Json,
|
||||||
Csv,
|
Csv(u8),
|
||||||
}
|
}
|
||||||
|
|
||||||
impl fmt::Display for PayloadType {
|
impl fmt::Display for PayloadType {
|
||||||
@ -27,7 +27,7 @@ impl fmt::Display for PayloadType {
|
|||||||
match self {
|
match self {
|
||||||
PayloadType::Ndjson => f.write_str("ndjson"),
|
PayloadType::Ndjson => f.write_str("ndjson"),
|
||||||
PayloadType::Json => f.write_str("json"),
|
PayloadType::Json => f.write_str("json"),
|
||||||
PayloadType::Csv => f.write_str("csv"),
|
PayloadType::Csv(_) => f.write_str("csv"),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -105,11 +105,11 @@ impl ErrorCode for DocumentFormatError {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Reads CSV from input and write an obkv batch to writer.
|
/// Reads CSV from input and write an obkv batch to writer.
|
||||||
pub fn read_csv(file: &File, writer: impl Write + Seek) -> Result<u64> {
|
pub fn read_csv(file: &File, writer: impl Write + Seek, delimiter: u8) -> Result<u64> {
|
||||||
let mut builder = DocumentsBatchBuilder::new(writer);
|
let mut builder = DocumentsBatchBuilder::new(writer);
|
||||||
let mmap = unsafe { MmapOptions::new().map(file)? };
|
let mmap = unsafe { MmapOptions::new().map(file)? };
|
||||||
let csv = csv::Reader::from_reader(mmap.as_ref());
|
let csv = csv::ReaderBuilder::new().delimiter(delimiter).from_reader(mmap.as_ref());
|
||||||
builder.append_csv(csv).map_err(|e| (PayloadType::Csv, e))?;
|
builder.append_csv(csv).map_err(|e| (PayloadType::Csv(delimiter), e))?;
|
||||||
|
|
||||||
let count = builder.documents_count();
|
let count = builder.documents_count();
|
||||||
let _ = builder.into_inner().map_err(DocumentFormatError::Io)?;
|
let _ = builder.into_inner().map_err(DocumentFormatError::Io)?;
|
||||||
|
@ -220,6 +220,7 @@ InvalidDocumentOffset , InvalidRequest , BAD_REQUEST ;
|
|||||||
InvalidIndexLimit , InvalidRequest , BAD_REQUEST ;
|
InvalidIndexLimit , InvalidRequest , BAD_REQUEST ;
|
||||||
InvalidIndexOffset , InvalidRequest , BAD_REQUEST ;
|
InvalidIndexOffset , InvalidRequest , BAD_REQUEST ;
|
||||||
InvalidIndexPrimaryKey , InvalidRequest , BAD_REQUEST ;
|
InvalidIndexPrimaryKey , InvalidRequest , BAD_REQUEST ;
|
||||||
|
InvalidIndexCsvDelimiter , InvalidRequest , BAD_REQUEST ;
|
||||||
InvalidIndexUid , InvalidRequest , BAD_REQUEST ;
|
InvalidIndexUid , InvalidRequest , BAD_REQUEST ;
|
||||||
InvalidSearchAttributesToCrop , InvalidRequest , BAD_REQUEST ;
|
InvalidSearchAttributesToCrop , InvalidRequest , BAD_REQUEST ;
|
||||||
InvalidSearchAttributesToHighlight , InvalidRequest , BAD_REQUEST ;
|
InvalidSearchAttributesToHighlight , InvalidRequest , BAD_REQUEST ;
|
||||||
|
@ -19,7 +19,7 @@ byte-unit = { version = "4.0.14", default-features = false, features = ["std", "
|
|||||||
bytes = "1.2.1"
|
bytes = "1.2.1"
|
||||||
clap = { version = "4.0.9", features = ["derive", "env"] }
|
clap = { version = "4.0.9", features = ["derive", "env"] }
|
||||||
crossbeam-channel = "0.5.6"
|
crossbeam-channel = "0.5.6"
|
||||||
deserr = "0.4.1"
|
deserr = "0.5.0"
|
||||||
dump = { path = "../dump" }
|
dump = { path = "../dump" }
|
||||||
either = "1.8.0"
|
either = "1.8.0"
|
||||||
env_logger = "0.9.1"
|
env_logger = "0.9.1"
|
||||||
|
@ -11,6 +11,8 @@ pub enum MeilisearchHttpError {
|
|||||||
#[error("A Content-Type header is missing. Accepted values for the Content-Type header are: {}",
|
#[error("A Content-Type header is missing. Accepted values for the Content-Type header are: {}",
|
||||||
.0.iter().map(|s| format!("`{}`", s)).collect::<Vec<_>>().join(", "))]
|
.0.iter().map(|s| format!("`{}`", s)).collect::<Vec<_>>().join(", "))]
|
||||||
MissingContentType(Vec<String>),
|
MissingContentType(Vec<String>),
|
||||||
|
#[error("The Content-Type `{0}` does not support the use of a csv delimiter. The csv delimiter can only be used with the Content-Type `text/csv`.")]
|
||||||
|
CsvDelimiterWithWrongContentType(String),
|
||||||
#[error(
|
#[error(
|
||||||
"The Content-Type `{0}` is invalid. Accepted values for the Content-Type header are: {}",
|
"The Content-Type `{0}` is invalid. Accepted values for the Content-Type header are: {}",
|
||||||
.1.iter().map(|s| format!("`{}`", s)).collect::<Vec<_>>().join(", ")
|
.1.iter().map(|s| format!("`{}`", s)).collect::<Vec<_>>().join(", ")
|
||||||
@ -52,6 +54,7 @@ impl ErrorCode for MeilisearchHttpError {
|
|||||||
fn error_code(&self) -> Code {
|
fn error_code(&self) -> Code {
|
||||||
match self {
|
match self {
|
||||||
MeilisearchHttpError::MissingContentType(_) => Code::MissingContentType,
|
MeilisearchHttpError::MissingContentType(_) => Code::MissingContentType,
|
||||||
|
MeilisearchHttpError::CsvDelimiterWithWrongContentType(_) => Code::InvalidContentType,
|
||||||
MeilisearchHttpError::MissingPayload(_) => Code::MissingPayload,
|
MeilisearchHttpError::MissingPayload(_) => Code::MissingPayload,
|
||||||
MeilisearchHttpError::InvalidContentType(_, _) => Code::InvalidContentType,
|
MeilisearchHttpError::InvalidContentType(_, _) => Code::InvalidContentType,
|
||||||
MeilisearchHttpError::DocumentNotFound(_) => Code::DocumentNotFound,
|
MeilisearchHttpError::DocumentNotFound(_) => Code::DocumentNotFound,
|
||||||
|
@ -10,10 +10,10 @@ use futures::StreamExt;
|
|||||||
use index_scheduler::IndexScheduler;
|
use index_scheduler::IndexScheduler;
|
||||||
use log::debug;
|
use log::debug;
|
||||||
use meilisearch_types::deserr::query_params::Param;
|
use meilisearch_types::deserr::query_params::Param;
|
||||||
use meilisearch_types::deserr::{DeserrJsonError, DeserrQueryParamError};
|
use meilisearch_types::deserr::DeserrQueryParamError;
|
||||||
use meilisearch_types::document_formats::{read_csv, read_json, read_ndjson, PayloadType};
|
use meilisearch_types::document_formats::{read_csv, read_json, read_ndjson, PayloadType};
|
||||||
use meilisearch_types::error::deserr_codes::*;
|
use meilisearch_types::error::deserr_codes::*;
|
||||||
use meilisearch_types::error::ResponseError;
|
use meilisearch_types::error::{Code, ResponseError};
|
||||||
use meilisearch_types::heed::RoTxn;
|
use meilisearch_types::heed::RoTxn;
|
||||||
use meilisearch_types::index_uid::IndexUid;
|
use meilisearch_types::index_uid::IndexUid;
|
||||||
use meilisearch_types::milli::update::IndexDocumentsMethod;
|
use meilisearch_types::milli::update::IndexDocumentsMethod;
|
||||||
@ -67,7 +67,7 @@ pub fn configure(cfg: &mut web::ServiceConfig) {
|
|||||||
cfg.service(
|
cfg.service(
|
||||||
web::resource("")
|
web::resource("")
|
||||||
.route(web::get().to(SeqHandler(get_all_documents)))
|
.route(web::get().to(SeqHandler(get_all_documents)))
|
||||||
.route(web::post().to(SeqHandler(add_documents)))
|
.route(web::post().to(SeqHandler(replace_documents)))
|
||||||
.route(web::put().to(SeqHandler(update_documents)))
|
.route(web::put().to(SeqHandler(update_documents)))
|
||||||
.route(web::delete().to(SeqHandler(clear_all_documents))),
|
.route(web::delete().to(SeqHandler(clear_all_documents))),
|
||||||
)
|
)
|
||||||
@ -156,16 +156,31 @@ pub async fn get_all_documents(
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Deserialize, Debug, Deserr)]
|
#[derive(Deserialize, Debug, Deserr)]
|
||||||
#[deserr(error = DeserrJsonError, rename_all = camelCase, deny_unknown_fields)]
|
#[deserr(error = DeserrQueryParamError, rename_all = camelCase, deny_unknown_fields)]
|
||||||
pub struct UpdateDocumentsQuery {
|
pub struct UpdateDocumentsQuery {
|
||||||
#[deserr(default, error = DeserrJsonError<InvalidIndexPrimaryKey>)]
|
#[deserr(default, error = DeserrQueryParamError<InvalidIndexPrimaryKey>)]
|
||||||
pub primary_key: Option<String>,
|
pub primary_key: Option<String>,
|
||||||
|
#[deserr(default, try_from(char) = from_char_csv_delimiter -> DeserrQueryParamError<InvalidIndexCsvDelimiter>, error = DeserrQueryParamError<InvalidIndexCsvDelimiter>)]
|
||||||
|
pub csv_delimiter: Option<u8>,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn add_documents(
|
fn from_char_csv_delimiter(
|
||||||
|
c: char,
|
||||||
|
) -> Result<Option<u8>, DeserrQueryParamError<InvalidIndexCsvDelimiter>> {
|
||||||
|
if c.is_ascii() {
|
||||||
|
Ok(Some(c as u8))
|
||||||
|
} else {
|
||||||
|
Err(DeserrQueryParamError::new(
|
||||||
|
format!("csv delimiter must be an ascii character. Found: `{}`", c),
|
||||||
|
Code::InvalidIndexCsvDelimiter,
|
||||||
|
))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn replace_documents(
|
||||||
index_scheduler: GuardedData<ActionPolicy<{ actions::DOCUMENTS_ADD }>, Data<IndexScheduler>>,
|
index_scheduler: GuardedData<ActionPolicy<{ actions::DOCUMENTS_ADD }>, Data<IndexScheduler>>,
|
||||||
index_uid: web::Path<String>,
|
index_uid: web::Path<String>,
|
||||||
params: AwebQueryParameter<UpdateDocumentsQuery, DeserrJsonError>,
|
params: AwebQueryParameter<UpdateDocumentsQuery, DeserrQueryParamError>,
|
||||||
body: Payload,
|
body: Payload,
|
||||||
req: HttpRequest,
|
req: HttpRequest,
|
||||||
analytics: web::Data<dyn Analytics>,
|
analytics: web::Data<dyn Analytics>,
|
||||||
@ -183,6 +198,7 @@ pub async fn add_documents(
|
|||||||
index_scheduler,
|
index_scheduler,
|
||||||
index_uid,
|
index_uid,
|
||||||
params.primary_key,
|
params.primary_key,
|
||||||
|
params.csv_delimiter,
|
||||||
body,
|
body,
|
||||||
IndexDocumentsMethod::ReplaceDocuments,
|
IndexDocumentsMethod::ReplaceDocuments,
|
||||||
allow_index_creation,
|
allow_index_creation,
|
||||||
@ -195,7 +211,7 @@ pub async fn add_documents(
|
|||||||
pub async fn update_documents(
|
pub async fn update_documents(
|
||||||
index_scheduler: GuardedData<ActionPolicy<{ actions::DOCUMENTS_ADD }>, Data<IndexScheduler>>,
|
index_scheduler: GuardedData<ActionPolicy<{ actions::DOCUMENTS_ADD }>, Data<IndexScheduler>>,
|
||||||
index_uid: web::Path<String>,
|
index_uid: web::Path<String>,
|
||||||
params: AwebQueryParameter<UpdateDocumentsQuery, DeserrJsonError>,
|
params: AwebQueryParameter<UpdateDocumentsQuery, DeserrQueryParamError>,
|
||||||
body: Payload,
|
body: Payload,
|
||||||
req: HttpRequest,
|
req: HttpRequest,
|
||||||
analytics: web::Data<dyn Analytics>,
|
analytics: web::Data<dyn Analytics>,
|
||||||
@ -203,6 +219,7 @@ pub async fn update_documents(
|
|||||||
let index_uid = IndexUid::try_from(index_uid.into_inner())?;
|
let index_uid = IndexUid::try_from(index_uid.into_inner())?;
|
||||||
|
|
||||||
debug!("called with params: {:?}", params);
|
debug!("called with params: {:?}", params);
|
||||||
|
let params = params.into_inner();
|
||||||
|
|
||||||
analytics.update_documents(¶ms, index_scheduler.index(&index_uid).is_err(), &req);
|
analytics.update_documents(¶ms, index_scheduler.index(&index_uid).is_err(), &req);
|
||||||
|
|
||||||
@ -211,7 +228,8 @@ pub async fn update_documents(
|
|||||||
extract_mime_type(&req)?,
|
extract_mime_type(&req)?,
|
||||||
index_scheduler,
|
index_scheduler,
|
||||||
index_uid,
|
index_uid,
|
||||||
params.into_inner().primary_key,
|
params.primary_key,
|
||||||
|
params.csv_delimiter,
|
||||||
body,
|
body,
|
||||||
IndexDocumentsMethod::UpdateDocuments,
|
IndexDocumentsMethod::UpdateDocuments,
|
||||||
allow_index_creation,
|
allow_index_creation,
|
||||||
@ -226,21 +244,37 @@ async fn document_addition(
|
|||||||
index_scheduler: GuardedData<ActionPolicy<{ actions::DOCUMENTS_ADD }>, Data<IndexScheduler>>,
|
index_scheduler: GuardedData<ActionPolicy<{ actions::DOCUMENTS_ADD }>, Data<IndexScheduler>>,
|
||||||
index_uid: IndexUid,
|
index_uid: IndexUid,
|
||||||
primary_key: Option<String>,
|
primary_key: Option<String>,
|
||||||
|
csv_delimiter: Option<u8>,
|
||||||
mut body: Payload,
|
mut body: Payload,
|
||||||
method: IndexDocumentsMethod,
|
method: IndexDocumentsMethod,
|
||||||
allow_index_creation: bool,
|
allow_index_creation: bool,
|
||||||
) -> Result<SummarizedTaskView, MeilisearchHttpError> {
|
) -> Result<SummarizedTaskView, MeilisearchHttpError> {
|
||||||
let format = match mime_type.as_ref().map(|m| (m.type_().as_str(), m.subtype().as_str())) {
|
let format = match (
|
||||||
Some(("application", "json")) => PayloadType::Json,
|
mime_type.as_ref().map(|m| (m.type_().as_str(), m.subtype().as_str())),
|
||||||
Some(("application", "x-ndjson")) => PayloadType::Ndjson,
|
csv_delimiter,
|
||||||
Some(("text", "csv")) => PayloadType::Csv,
|
) {
|
||||||
Some((type_, subtype)) => {
|
(Some(("application", "json")), None) => PayloadType::Json,
|
||||||
|
(Some(("application", "x-ndjson")), None) => PayloadType::Ndjson,
|
||||||
|
(Some(("text", "csv")), None) => PayloadType::Csv(b','),
|
||||||
|
(Some(("text", "csv")), Some(delimiter)) => PayloadType::Csv(delimiter),
|
||||||
|
|
||||||
|
(Some(("application", "json")), Some(_)) => {
|
||||||
|
return Err(MeilisearchHttpError::CsvDelimiterWithWrongContentType(String::from(
|
||||||
|
"application/json",
|
||||||
|
)))
|
||||||
|
}
|
||||||
|
(Some(("application", "x-ndjson")), Some(_)) => {
|
||||||
|
return Err(MeilisearchHttpError::CsvDelimiterWithWrongContentType(String::from(
|
||||||
|
"application/x-ndjson",
|
||||||
|
)))
|
||||||
|
}
|
||||||
|
(Some((type_, subtype)), _) => {
|
||||||
return Err(MeilisearchHttpError::InvalidContentType(
|
return Err(MeilisearchHttpError::InvalidContentType(
|
||||||
format!("{}/{}", type_, subtype),
|
format!("{}/{}", type_, subtype),
|
||||||
ACCEPTED_CONTENT_TYPE.clone(),
|
ACCEPTED_CONTENT_TYPE.clone(),
|
||||||
))
|
))
|
||||||
}
|
}
|
||||||
None => {
|
(None, _) => {
|
||||||
return Err(MeilisearchHttpError::MissingContentType(ACCEPTED_CONTENT_TYPE.clone()))
|
return Err(MeilisearchHttpError::MissingContentType(ACCEPTED_CONTENT_TYPE.clone()))
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
@ -285,7 +319,7 @@ async fn document_addition(
|
|||||||
let documents_count = tokio::task::spawn_blocking(move || {
|
let documents_count = tokio::task::spawn_blocking(move || {
|
||||||
let documents_count = match format {
|
let documents_count = match format {
|
||||||
PayloadType::Json => read_json(&read_file, update_file.as_file_mut())?,
|
PayloadType::Json => read_json(&read_file, update_file.as_file_mut())?,
|
||||||
PayloadType::Csv => read_csv(&read_file, update_file.as_file_mut())?,
|
PayloadType::Csv(delim) => read_csv(&read_file, update_file.as_file_mut(), delim)?,
|
||||||
PayloadType::Ndjson => read_ndjson(&read_file, update_file.as_file_mut())?,
|
PayloadType::Ndjson => read_ndjson(&read_file, update_file.as_file_mut())?,
|
||||||
};
|
};
|
||||||
// we NEED to persist the file here because we moved the `udpate_file` in another task.
|
// we NEED to persist the file here because we moved the `udpate_file` in another task.
|
||||||
|
@ -12,7 +12,7 @@ byteorder = "1.4.3"
|
|||||||
charabia = { version = "0.7.0", default-features = false }
|
charabia = { version = "0.7.0", default-features = false }
|
||||||
concat-arrays = "0.1.2"
|
concat-arrays = "0.1.2"
|
||||||
crossbeam-channel = "0.5.6"
|
crossbeam-channel = "0.5.6"
|
||||||
deserr = "0.4.1"
|
deserr = "0.5.0"
|
||||||
either = "1.8.0"
|
either = "1.8.0"
|
||||||
flatten-serde-json = { path = "../flatten-serde-json" }
|
flatten-serde-json = { path = "../flatten-serde-json" }
|
||||||
fst = "0.4.7"
|
fst = "0.4.7"
|
||||||
|
Loading…
Reference in New Issue
Block a user