mirror of
https://github.com/meilisearch/meilisearch.git
synced 2025-02-17 08:10:14 +08:00
clippy fix change
This commit is contained in:
parent
9c28632498
commit
bf96b6df93
@ -1065,7 +1065,7 @@ impl IndexScheduler {
|
|||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use std::io::{Seek, Write, BufWriter};
|
use std::io::{BufWriter, Seek, Write};
|
||||||
use std::time::Instant;
|
use std::time::Instant;
|
||||||
|
|
||||||
use big_s::S;
|
use big_s::S;
|
||||||
@ -1079,7 +1079,7 @@ mod tests {
|
|||||||
};
|
};
|
||||||
use meilisearch_types::tasks::IndexSwap;
|
use meilisearch_types::tasks::IndexSwap;
|
||||||
use meilisearch_types::VERSION_FILE_NAME;
|
use meilisearch_types::VERSION_FILE_NAME;
|
||||||
use tempfile::{TempDir, NamedTempFile};
|
use tempfile::{NamedTempFile, TempDir};
|
||||||
use time::Duration;
|
use time::Duration;
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
use Breakpoint::*;
|
use Breakpoint::*;
|
||||||
@ -1187,7 +1187,10 @@ mod tests {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Adapting to the new json reading interface
|
/// Adapting to the new json reading interface
|
||||||
pub fn read_json(bytes: &[u8], write: impl Write + Seek) -> std::result::Result<usize, DocumentFormatError> {
|
pub fn read_json(
|
||||||
|
bytes: &[u8],
|
||||||
|
write: impl Write + Seek,
|
||||||
|
) -> std::result::Result<usize, DocumentFormatError> {
|
||||||
let temp_file = NamedTempFile::new().unwrap();
|
let temp_file = NamedTempFile::new().unwrap();
|
||||||
let mut buffer = BufWriter::new(temp_file.reopen().unwrap());
|
let mut buffer = BufWriter::new(temp_file.reopen().unwrap());
|
||||||
buffer.write(bytes).unwrap();
|
buffer.write(bytes).unwrap();
|
||||||
@ -1213,9 +1216,7 @@ mod tests {
|
|||||||
);
|
);
|
||||||
|
|
||||||
let (_uuid, mut file) = index_scheduler.create_update_file_with_uuid(file_uuid).unwrap();
|
let (_uuid, mut file) = index_scheduler.create_update_file_with_uuid(file_uuid).unwrap();
|
||||||
let documents_count =
|
let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap() as u64;
|
||||||
read_json(content.as_bytes(), file.as_file_mut())
|
|
||||||
.unwrap() as u64;
|
|
||||||
(file, documents_count)
|
(file, documents_count)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1595,9 +1596,7 @@ mod tests {
|
|||||||
}"#;
|
}"#;
|
||||||
|
|
||||||
let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(0).unwrap();
|
let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(0).unwrap();
|
||||||
let documents_count =
|
let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap() as u64;
|
||||||
read_json(content.as_bytes(), file.as_file_mut())
|
|
||||||
.unwrap() as u64;
|
|
||||||
file.persist().unwrap();
|
file.persist().unwrap();
|
||||||
index_scheduler
|
index_scheduler
|
||||||
.register(KindWithContent::DocumentAdditionOrUpdate {
|
.register(KindWithContent::DocumentAdditionOrUpdate {
|
||||||
@ -1634,9 +1633,7 @@ mod tests {
|
|||||||
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_first_task");
|
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_first_task");
|
||||||
|
|
||||||
let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(0).unwrap();
|
let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(0).unwrap();
|
||||||
let documents_count =
|
let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap() as u64;
|
||||||
read_json(content.as_bytes(), file.as_file_mut())
|
|
||||||
.unwrap() as u64;
|
|
||||||
file.persist().unwrap();
|
file.persist().unwrap();
|
||||||
index_scheduler
|
index_scheduler
|
||||||
.register(KindWithContent::DocumentAdditionOrUpdate {
|
.register(KindWithContent::DocumentAdditionOrUpdate {
|
||||||
@ -1803,9 +1800,7 @@ mod tests {
|
|||||||
}"#;
|
}"#;
|
||||||
|
|
||||||
let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(0).unwrap();
|
let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(0).unwrap();
|
||||||
let documents_count =
|
let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap() as u64;
|
||||||
read_json(content.as_bytes(), file.as_file_mut())
|
|
||||||
.unwrap() as u64;
|
|
||||||
file.persist().unwrap();
|
file.persist().unwrap();
|
||||||
index_scheduler
|
index_scheduler
|
||||||
.register(KindWithContent::DocumentAdditionOrUpdate {
|
.register(KindWithContent::DocumentAdditionOrUpdate {
|
||||||
@ -1963,11 +1958,7 @@ mod tests {
|
|||||||
);
|
);
|
||||||
|
|
||||||
let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(i).unwrap();
|
let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(i).unwrap();
|
||||||
let documents_count = read_json(
|
let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap() as u64;
|
||||||
content.as_bytes(),
|
|
||||||
file.as_file_mut(),
|
|
||||||
)
|
|
||||||
.unwrap() as u64;
|
|
||||||
file.persist().unwrap();
|
file.persist().unwrap();
|
||||||
index_scheduler
|
index_scheduler
|
||||||
.register(KindWithContent::DocumentAdditionOrUpdate {
|
.register(KindWithContent::DocumentAdditionOrUpdate {
|
||||||
@ -2014,11 +2005,7 @@ mod tests {
|
|||||||
);
|
);
|
||||||
|
|
||||||
let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(i).unwrap();
|
let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(i).unwrap();
|
||||||
let documents_count = read_json(
|
let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap() as u64;
|
||||||
content.as_bytes(),
|
|
||||||
file.as_file_mut(),
|
|
||||||
)
|
|
||||||
.unwrap() as u64;
|
|
||||||
file.persist().unwrap();
|
file.persist().unwrap();
|
||||||
index_scheduler
|
index_scheduler
|
||||||
.register(KindWithContent::DocumentAdditionOrUpdate {
|
.register(KindWithContent::DocumentAdditionOrUpdate {
|
||||||
@ -2067,11 +2054,7 @@ mod tests {
|
|||||||
);
|
);
|
||||||
|
|
||||||
let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(i).unwrap();
|
let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(i).unwrap();
|
||||||
let documents_count = read_json(
|
let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap() as u64;
|
||||||
content.as_bytes(),
|
|
||||||
file.as_file_mut(),
|
|
||||||
)
|
|
||||||
.unwrap() as u64;
|
|
||||||
file.persist().unwrap();
|
file.persist().unwrap();
|
||||||
index_scheduler
|
index_scheduler
|
||||||
.register(KindWithContent::DocumentAdditionOrUpdate {
|
.register(KindWithContent::DocumentAdditionOrUpdate {
|
||||||
@ -2121,11 +2104,7 @@ mod tests {
|
|||||||
);
|
);
|
||||||
|
|
||||||
let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(i).unwrap();
|
let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(i).unwrap();
|
||||||
let documents_count = read_json(
|
let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap() as u64;
|
||||||
content.as_bytes(),
|
|
||||||
file.as_file_mut(),
|
|
||||||
)
|
|
||||||
.unwrap() as u64;
|
|
||||||
file.persist().unwrap();
|
file.persist().unwrap();
|
||||||
index_scheduler
|
index_scheduler
|
||||||
.register(KindWithContent::DocumentAdditionOrUpdate {
|
.register(KindWithContent::DocumentAdditionOrUpdate {
|
||||||
@ -2176,11 +2155,7 @@ mod tests {
|
|||||||
);
|
);
|
||||||
|
|
||||||
let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(i).unwrap();
|
let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(i).unwrap();
|
||||||
let documents_count = read_json(
|
let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap() as u64;
|
||||||
content.as_bytes(),
|
|
||||||
file.as_file_mut(),
|
|
||||||
)
|
|
||||||
.unwrap() as u64;
|
|
||||||
file.persist().unwrap();
|
file.persist().unwrap();
|
||||||
index_scheduler
|
index_scheduler
|
||||||
.register(KindWithContent::DocumentAdditionOrUpdate {
|
.register(KindWithContent::DocumentAdditionOrUpdate {
|
||||||
@ -2627,9 +2602,7 @@ mod tests {
|
|||||||
}"#;
|
}"#;
|
||||||
|
|
||||||
let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(0).unwrap();
|
let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(0).unwrap();
|
||||||
let documents_count =
|
let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap() as u64;
|
||||||
read_json(content.as_bytes(), file.as_file_mut())
|
|
||||||
.unwrap() as u64;
|
|
||||||
file.persist().unwrap();
|
file.persist().unwrap();
|
||||||
index_scheduler
|
index_scheduler
|
||||||
.register(KindWithContent::DocumentAdditionOrUpdate {
|
.register(KindWithContent::DocumentAdditionOrUpdate {
|
||||||
@ -2667,9 +2640,7 @@ mod tests {
|
|||||||
}"#;
|
}"#;
|
||||||
|
|
||||||
let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(0).unwrap();
|
let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(0).unwrap();
|
||||||
let documents_count =
|
let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap() as u64;
|
||||||
read_json(content.as_bytes(), file.as_file_mut())
|
|
||||||
.unwrap() as u64;
|
|
||||||
file.persist().unwrap();
|
file.persist().unwrap();
|
||||||
index_scheduler
|
index_scheduler
|
||||||
.register(KindWithContent::DocumentAdditionOrUpdate {
|
.register(KindWithContent::DocumentAdditionOrUpdate {
|
||||||
@ -2725,11 +2696,7 @@ mod tests {
|
|||||||
);
|
);
|
||||||
|
|
||||||
let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(i).unwrap();
|
let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(i).unwrap();
|
||||||
let documents_count = read_json(
|
let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap() as u64;
|
||||||
content.as_bytes(),
|
|
||||||
file.as_file_mut(),
|
|
||||||
)
|
|
||||||
.unwrap() as u64;
|
|
||||||
file.persist().unwrap();
|
file.persist().unwrap();
|
||||||
index_scheduler
|
index_scheduler
|
||||||
.register(KindWithContent::DocumentAdditionOrUpdate {
|
.register(KindWithContent::DocumentAdditionOrUpdate {
|
||||||
@ -2777,11 +2744,7 @@ mod tests {
|
|||||||
);
|
);
|
||||||
|
|
||||||
let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(i).unwrap();
|
let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(i).unwrap();
|
||||||
let documents_count = read_json(
|
let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap() as u64;
|
||||||
content.as_bytes(),
|
|
||||||
file.as_file_mut(),
|
|
||||||
)
|
|
||||||
.unwrap() as u64;
|
|
||||||
file.persist().unwrap();
|
file.persist().unwrap();
|
||||||
index_scheduler
|
index_scheduler
|
||||||
.register(KindWithContent::DocumentAdditionOrUpdate {
|
.register(KindWithContent::DocumentAdditionOrUpdate {
|
||||||
@ -2835,11 +2798,7 @@ mod tests {
|
|||||||
);
|
);
|
||||||
|
|
||||||
let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(i).unwrap();
|
let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(i).unwrap();
|
||||||
let documents_count = read_json(
|
let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap() as u64;
|
||||||
content.as_bytes(),
|
|
||||||
file.as_file_mut(),
|
|
||||||
)
|
|
||||||
.unwrap() as u64;
|
|
||||||
file.persist().unwrap();
|
file.persist().unwrap();
|
||||||
index_scheduler
|
index_scheduler
|
||||||
.register(KindWithContent::DocumentAdditionOrUpdate {
|
.register(KindWithContent::DocumentAdditionOrUpdate {
|
||||||
@ -2898,11 +2857,7 @@ mod tests {
|
|||||||
);
|
);
|
||||||
|
|
||||||
let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(i).unwrap();
|
let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(i).unwrap();
|
||||||
let documents_count = read_json(
|
let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap() as u64;
|
||||||
content.as_bytes(),
|
|
||||||
file.as_file_mut(),
|
|
||||||
)
|
|
||||||
.unwrap() as u64;
|
|
||||||
file.persist().unwrap();
|
file.persist().unwrap();
|
||||||
index_scheduler
|
index_scheduler
|
||||||
.register(KindWithContent::DocumentAdditionOrUpdate {
|
.register(KindWithContent::DocumentAdditionOrUpdate {
|
||||||
@ -2966,11 +2921,7 @@ mod tests {
|
|||||||
let allow_index_creation = i % 2 != 0;
|
let allow_index_creation = i % 2 != 0;
|
||||||
|
|
||||||
let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(i).unwrap();
|
let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(i).unwrap();
|
||||||
let documents_count = read_json(
|
let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap() as u64;
|
||||||
content.as_bytes(),
|
|
||||||
file.as_file_mut(),
|
|
||||||
)
|
|
||||||
.unwrap() as u64;
|
|
||||||
file.persist().unwrap();
|
file.persist().unwrap();
|
||||||
index_scheduler
|
index_scheduler
|
||||||
.register(KindWithContent::DocumentAdditionOrUpdate {
|
.register(KindWithContent::DocumentAdditionOrUpdate {
|
||||||
@ -3023,11 +2974,7 @@ mod tests {
|
|||||||
let allow_index_creation = i % 2 != 0;
|
let allow_index_creation = i % 2 != 0;
|
||||||
|
|
||||||
let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(i).unwrap();
|
let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(i).unwrap();
|
||||||
let documents_count = read_json(
|
let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap() as u64;
|
||||||
content.as_bytes(),
|
|
||||||
file.as_file_mut(),
|
|
||||||
)
|
|
||||||
.unwrap() as u64;
|
|
||||||
file.persist().unwrap();
|
file.persist().unwrap();
|
||||||
index_scheduler
|
index_scheduler
|
||||||
.register(KindWithContent::DocumentAdditionOrUpdate {
|
.register(KindWithContent::DocumentAdditionOrUpdate {
|
||||||
|
@ -1,4 +1,11 @@
|
|||||||
use std::io::{ErrorKind, BufWriter, Write};
|
use crate::analytics::{Analytics, DocumentDeletionKind};
|
||||||
|
use crate::error::MeilisearchHttpError;
|
||||||
|
use crate::error::PayloadError::ReceivePayloadErr;
|
||||||
|
use crate::extractors::authentication::policies::*;
|
||||||
|
use crate::extractors::authentication::GuardedData;
|
||||||
|
use crate::extractors::payload::Payload;
|
||||||
|
use crate::extractors::sequential_extractor::SeqHandler;
|
||||||
|
use crate::routes::{fold_star_or, PaginationView, SummarizedTaskView};
|
||||||
use actix_web::http::header::CONTENT_TYPE;
|
use actix_web::http::header::CONTENT_TYPE;
|
||||||
use actix_web::web::Data;
|
use actix_web::web::Data;
|
||||||
use actix_web::{web, HttpMessage, HttpRequest, HttpResponse};
|
use actix_web::{web, HttpMessage, HttpRequest, HttpResponse};
|
||||||
@ -6,7 +13,7 @@ use bstr::ByteSlice;
|
|||||||
use futures::StreamExt;
|
use futures::StreamExt;
|
||||||
use index_scheduler::IndexScheduler;
|
use index_scheduler::IndexScheduler;
|
||||||
use log::{debug, error};
|
use log::{debug, error};
|
||||||
use meilisearch_types::document_formats::{read_csv, PayloadType, read_json, read_ndjson};
|
use meilisearch_types::document_formats::{read_csv, read_json, read_ndjson, PayloadType};
|
||||||
use meilisearch_types::error::ResponseError;
|
use meilisearch_types::error::ResponseError;
|
||||||
use meilisearch_types::heed::RoTxn;
|
use meilisearch_types::heed::RoTxn;
|
||||||
use meilisearch_types::index_uid::IndexUid;
|
use meilisearch_types::index_uid::IndexUid;
|
||||||
@ -19,15 +26,8 @@ use once_cell::sync::Lazy;
|
|||||||
use serde::Deserialize;
|
use serde::Deserialize;
|
||||||
use serde_cs::vec::CS;
|
use serde_cs::vec::CS;
|
||||||
use serde_json::Value;
|
use serde_json::Value;
|
||||||
|
use std::io::{BufWriter, ErrorKind, Write};
|
||||||
use tempfile::NamedTempFile;
|
use tempfile::NamedTempFile;
|
||||||
use crate::analytics::{Analytics, DocumentDeletionKind};
|
|
||||||
use crate::error::MeilisearchHttpError;
|
|
||||||
use crate::error::PayloadError::ReceivePayloadErr;
|
|
||||||
use crate::extractors::authentication::policies::*;
|
|
||||||
use crate::extractors::authentication::GuardedData;
|
|
||||||
use crate::extractors::payload::Payload;
|
|
||||||
use crate::extractors::sequential_extractor::SeqHandler;
|
|
||||||
use crate::routes::{fold_star_or, PaginationView, SummarizedTaskView};
|
|
||||||
|
|
||||||
static ACCEPTED_CONTENT_TYPE: Lazy<Vec<String>> = Lazy::new(|| {
|
static ACCEPTED_CONTENT_TYPE: Lazy<Vec<String>> = Lazy::new(|| {
|
||||||
vec!["application/json".to_string(), "application/x-ndjson".to_string(), "text/csv".to_string()]
|
vec!["application/json".to_string(), "application/x-ndjson".to_string(), "text/csv".to_string()]
|
||||||
@ -227,14 +227,15 @@ async fn document_addition(
|
|||||||
|
|
||||||
let (uuid, mut update_file) = index_scheduler.create_update_file()?;
|
let (uuid, mut update_file) = index_scheduler.create_update_file()?;
|
||||||
|
|
||||||
let err: Result<SummarizedTaskView, MeilisearchHttpError> = Err(MeilisearchHttpError::Payload(ReceivePayloadErr));
|
let err: Result<SummarizedTaskView, MeilisearchHttpError> =
|
||||||
|
Err(MeilisearchHttpError::Payload(ReceivePayloadErr));
|
||||||
|
|
||||||
let temp_file = match NamedTempFile::new() {
|
let temp_file = match NamedTempFile::new() {
|
||||||
Ok(temp_file) => temp_file,
|
Ok(temp_file) => temp_file,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
error!("create a temporary file error: {}", e);
|
error!("create a temporary file error: {}", e);
|
||||||
return err;
|
return err;
|
||||||
},
|
}
|
||||||
};
|
};
|
||||||
debug!("temp file path: {:?}", temp_file.as_ref());
|
debug!("temp file path: {:?}", temp_file.as_ref());
|
||||||
let buffer_file = match temp_file.reopen() {
|
let buffer_file = match temp_file.reopen() {
|
||||||
@ -245,21 +246,20 @@ async fn document_addition(
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
let mut buffer = BufWriter::new(buffer_file);
|
let mut buffer = BufWriter::new(buffer_file);
|
||||||
let mut buffer_write_size:usize = 0;
|
let mut buffer_write_size: usize = 0;
|
||||||
while let Some(bytes) = body.next().await {
|
while let Some(bytes) = body.next().await {
|
||||||
match buffer.write(&bytes?) {
|
match buffer.write(&bytes?) {
|
||||||
Ok(size) => buffer_write_size = buffer_write_size + size,
|
Ok(size) => buffer_write_size += size,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
error!("bufWriter write error: {}", e);
|
error!("bufWriter write error: {}", e);
|
||||||
return err
|
return err;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
};
|
|
||||||
|
|
||||||
if let Err(e) = buffer.flush() {
|
if let Err(e) = buffer.flush() {
|
||||||
error!("bufWriter flush error: {}", e);
|
error!("bufWriter flush error: {}", e);
|
||||||
return err
|
return err;
|
||||||
};
|
};
|
||||||
|
|
||||||
if buffer_write_size == 0 {
|
if buffer_write_size == 0 {
|
||||||
|
@ -1,18 +1,18 @@
|
|||||||
use std::borrow::Borrow;
|
use crate::error::{Code, ErrorCode};
|
||||||
use std::fmt::{self, Debug, Display};
|
use crate::internal_error;
|
||||||
use std::fs::File;
|
|
||||||
use std::io::{self, Seek, Write};
|
|
||||||
use std::marker::PhantomData;
|
|
||||||
use either::Either;
|
use either::Either;
|
||||||
use log::debug;
|
use log::debug;
|
||||||
use memmap::MmapOptions;
|
use memmap::MmapOptions;
|
||||||
use milli::documents::{DocumentsBatchBuilder, Error};
|
use milli::documents::{DocumentsBatchBuilder, Error};
|
||||||
use milli::Object;
|
use milli::Object;
|
||||||
use serde::de::{Visitor, SeqAccess};
|
use serde::de::{SeqAccess, Visitor};
|
||||||
use serde::{Deserialize, Deserializer};
|
use serde::{Deserialize, Deserializer};
|
||||||
use serde_json::error::Category;
|
use serde_json::error::Category;
|
||||||
use crate::error::{Code, ErrorCode};
|
use std::borrow::Borrow;
|
||||||
use crate::internal_error;
|
use std::fmt::{self, Debug, Display};
|
||||||
|
use std::fs::File;
|
||||||
|
use std::io::{self, Seek, Write};
|
||||||
|
use std::marker::PhantomData;
|
||||||
|
|
||||||
type Result<T> = std::result::Result<T, DocumentFormatError>;
|
type Result<T> = std::result::Result<T, DocumentFormatError>;
|
||||||
|
|
||||||
@ -104,7 +104,7 @@ internal_error!(DocumentFormatError: io::Error);
|
|||||||
/// Reads CSV from input and write an obkv batch to writer.
|
/// Reads CSV from input and write an obkv batch to writer.
|
||||||
pub fn read_csv(file: &File, writer: impl Write + Seek) -> Result<usize> {
|
pub fn read_csv(file: &File, writer: impl Write + Seek) -> Result<usize> {
|
||||||
let mut builder = DocumentsBatchBuilder::new(writer);
|
let mut builder = DocumentsBatchBuilder::new(writer);
|
||||||
let mmap = unsafe { MmapOptions::new().map(file).unwrap()};
|
let mmap = unsafe { MmapOptions::new().map(file).unwrap() };
|
||||||
let csv = csv::Reader::from_reader(mmap.as_ref());
|
let csv = csv::Reader::from_reader(mmap.as_ref());
|
||||||
builder.append_csv(csv).map_err(|e| (PayloadType::Csv, e))?;
|
builder.append_csv(csv).map_err(|e| (PayloadType::Csv, e))?;
|
||||||
|
|
||||||
@ -125,15 +125,16 @@ pub fn read_ndjson(file: &File, writer: impl Write + Seek) -> Result<usize> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Reads JSON from temporary file and write an obkv batch to writer.
|
/// Reads JSON from temporary file and write an obkv batch to writer.
|
||||||
fn read_json_inner(file: &File, writer: impl Write + Seek, payload_type: PayloadType) -> Result<usize> {
|
fn read_json_inner(
|
||||||
|
file: &File,
|
||||||
|
writer: impl Write + Seek,
|
||||||
|
payload_type: PayloadType,
|
||||||
|
) -> Result<usize> {
|
||||||
let mut builder = DocumentsBatchBuilder::new(writer);
|
let mut builder = DocumentsBatchBuilder::new(writer);
|
||||||
let mmap = unsafe { MmapOptions::new().map(file).unwrap()};
|
let mmap = unsafe { MmapOptions::new().map(file).unwrap() };
|
||||||
let mut deserializer = serde_json::Deserializer::from_slice(&mmap);
|
let mut deserializer = serde_json::Deserializer::from_slice(&mmap);
|
||||||
|
|
||||||
match array_each(&mut deserializer, |obj: Object | {
|
match array_each(&mut deserializer, |obj: Object| builder.append_json_object(&obj)) {
|
||||||
builder
|
|
||||||
.append_json_object(&obj)
|
|
||||||
}) {
|
|
||||||
Ok(Ok(count)) => debug!("serde json array size: {}", count),
|
Ok(Ok(count)) => debug!("serde json array size: {}", count),
|
||||||
Ok(Err(e)) => return Err(DocumentFormatError::Internal(Box::new(e))),
|
Ok(Err(e)) => return Err(DocumentFormatError::Internal(Box::new(e))),
|
||||||
Err(_e) => {
|
Err(_e) => {
|
||||||
@ -145,8 +146,9 @@ fn read_json_inner(file: &File, writer: impl Write + Seek, payload_type: Payload
|
|||||||
inner: Either<Vec<Object>, Object>,
|
inner: Either<Vec<Object>, Object>,
|
||||||
}
|
}
|
||||||
|
|
||||||
let content: ArrayOrSingleObject =
|
let content: ArrayOrSingleObject = serde_json::from_reader(file)
|
||||||
serde_json::from_reader(file).map_err(Error::Json).map_err(|e| (payload_type, e))?;
|
.map_err(Error::Json)
|
||||||
|
.map_err(|e| (payload_type, e))?;
|
||||||
|
|
||||||
for object in content.inner.map_right(|o| vec![o]).into_inner() {
|
for object in content.inner.map_right(|o| vec![o]).into_inner() {
|
||||||
builder
|
builder
|
||||||
@ -186,14 +188,17 @@ where
|
|||||||
formatter.write_str("a nonempty sequence")
|
formatter.write_str("a nonempty sequence")
|
||||||
}
|
}
|
||||||
|
|
||||||
fn visit_seq<A>(mut self, mut seq: A) -> std::result::Result<io::Result<u64>, <A as SeqAccess<'de>>::Error>
|
fn visit_seq<A>(
|
||||||
|
mut self,
|
||||||
|
mut seq: A,
|
||||||
|
) -> std::result::Result<io::Result<u64>, <A as SeqAccess<'de>>::Error>
|
||||||
where
|
where
|
||||||
A: SeqAccess<'de>,
|
A: SeqAccess<'de>,
|
||||||
{
|
{
|
||||||
let mut max: u64 = 0;
|
let mut max: u64 = 0;
|
||||||
while let Some(value) = seq.next_element::<T>()? {
|
while let Some(value) = seq.next_element::<T>()? {
|
||||||
match self.0(value) {
|
match self.0(value) {
|
||||||
Ok(()) => max = max + 1,
|
Ok(()) => max += 1,
|
||||||
Err(e) => return Ok(Err(e)),
|
Err(e) => return Ok(Err(e)),
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
@ -324,9 +324,10 @@ impl Code {
|
|||||||
DuplicateIndexFound => {
|
DuplicateIndexFound => {
|
||||||
ErrCode::invalid("duplicate_index_found", StatusCode::BAD_REQUEST)
|
ErrCode::invalid("duplicate_index_found", StatusCode::BAD_REQUEST)
|
||||||
}
|
}
|
||||||
ReceivePayloadErr => {
|
ReceivePayloadErr => ErrCode::internal(
|
||||||
ErrCode::internal("receive_payload_internal_exceptions", StatusCode::INTERNAL_SERVER_ERROR)
|
"receive_payload_internal_exceptions",
|
||||||
}
|
StatusCode::INTERNAL_SERVER_ERROR,
|
||||||
|
),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user