mirror of
https://github.com/meilisearch/meilisearch.git
synced 2025-01-18 17:11:15 +08:00
Do not duplicate NDJson when unecessary
This commit is contained in:
parent
e974be9518
commit
d683f5980c
@ -136,6 +136,14 @@ pub struct File {
|
||||
}
|
||||
|
||||
impl File {
|
||||
pub fn from_parts(path: PathBuf, file: Option<NamedTempFile>) -> Self {
|
||||
Self { path, file }
|
||||
}
|
||||
|
||||
pub fn into_parts(self) -> (PathBuf, Option<NamedTempFile>) {
|
||||
(self.path, self.file)
|
||||
}
|
||||
|
||||
pub fn dry_file() -> Result<Self> {
|
||||
Ok(Self { path: PathBuf::new(), file: None })
|
||||
}
|
||||
|
@ -250,26 +250,25 @@ pub fn read_json(input: &File, output: impl io::Write) -> Result<u64> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Reads NDJSON from file and write it in NDJSON in a file checking it along the way.
|
||||
pub fn read_ndjson(input: &File, output: impl io::Write) -> Result<u64> {
|
||||
/// Reads NDJSON from file and checks it.
|
||||
pub fn read_ndjson(input: &File) -> Result<u64> {
|
||||
// We memory map to be able to deserialize into a RawMap that
|
||||
// does not allocate when possible and only materialize the first/top level.
|
||||
let input = unsafe { Mmap::map(input).map_err(DocumentFormatError::Io)? };
|
||||
let mut output = BufWriter::new(output);
|
||||
|
||||
let mut bump = Bump::with_capacity(1024 * 1024);
|
||||
|
||||
let mut count = 0;
|
||||
for result in serde_json::Deserializer::from_slice(&input).into_iter() {
|
||||
bump.reset();
|
||||
count += 1;
|
||||
result
|
||||
.and_then(|raw: &RawValue| {
|
||||
match result {
|
||||
Ok(raw) => {
|
||||
// try to deserialize as a map
|
||||
let map = RawMap::from_raw_value(raw, &bump)?;
|
||||
to_writer(&mut output, &map)
|
||||
})
|
||||
.map_err(|e| DocumentFormatError::from((PayloadType::Ndjson, e)))?;
|
||||
RawMap::from_raw_value(raw, &bump)
|
||||
.map_err(|e| DocumentFormatError::from((PayloadType::Ndjson, e)))?;
|
||||
count += 1;
|
||||
}
|
||||
Err(e) => return Err(DocumentFormatError::from((PayloadType::Ndjson, e))),
|
||||
}
|
||||
}
|
||||
|
||||
Ok(count)
|
||||
|
@ -1,5 +1,5 @@
|
||||
use std::collections::HashSet;
|
||||
use std::io::ErrorKind;
|
||||
use std::io::{ErrorKind, Seek as _};
|
||||
use std::marker::PhantomData;
|
||||
|
||||
use actix_web::http::header::CONTENT_TYPE;
|
||||
@ -572,7 +572,7 @@ async fn document_addition(
|
||||
index_uid: IndexUid,
|
||||
primary_key: Option<String>,
|
||||
csv_delimiter: Option<u8>,
|
||||
mut body: Payload,
|
||||
body: Payload,
|
||||
method: IndexDocumentsMethod,
|
||||
task_id: Option<TaskId>,
|
||||
dry_run: bool,
|
||||
@ -609,54 +609,54 @@ async fn document_addition(
|
||||
};
|
||||
|
||||
let (uuid, mut update_file) = index_scheduler.create_update_file(dry_run)?;
|
||||
let documents_count = match format {
|
||||
PayloadType::Ndjson => {
|
||||
let (path, file) = update_file.into_parts();
|
||||
let file = match file {
|
||||
Some(file) => {
|
||||
let (file, path) = file.into_parts();
|
||||
let mut file = copy_body_to_file(file, body, format).await?;
|
||||
file.rewind().map_err(|e| {
|
||||
index_scheduler::Error::FileStore(file_store::Error::IoError(e))
|
||||
})?;
|
||||
Some(tempfile::NamedTempFile::from_parts(file, path))
|
||||
}
|
||||
None => None,
|
||||
};
|
||||
|
||||
let temp_file = match tempfile() {
|
||||
Ok(file) => file,
|
||||
Err(e) => return Err(MeilisearchHttpError::Payload(ReceivePayload(Box::new(e)))),
|
||||
let documents_count = file
|
||||
.as_ref()
|
||||
.map_or(Ok(0), |ntf| read_ndjson(ntf.as_file()))
|
||||
.map_err(|e| MeilisearchHttpError::Payload(ReceivePayload(Box::new(e))));
|
||||
let update_file = file_store::File::from_parts(path, file);
|
||||
update_file.persist()?;
|
||||
Ok(documents_count)
|
||||
}
|
||||
PayloadType::Json | PayloadType::Csv { delimiter: _ } => {
|
||||
let temp_file = match tempfile() {
|
||||
Ok(file) => file,
|
||||
Err(e) => return Err(MeilisearchHttpError::Payload(ReceivePayload(Box::new(e)))),
|
||||
};
|
||||
|
||||
let read_file = copy_body_to_file(temp_file, body, format).await?;
|
||||
tokio::task::spawn_blocking(move || {
|
||||
let documents_count = match format {
|
||||
PayloadType::Json => read_json(&read_file, &mut update_file)?,
|
||||
PayloadType::Csv { delimiter } => {
|
||||
read_csv(&read_file, &mut update_file, delimiter)?
|
||||
}
|
||||
PayloadType::Ndjson => {
|
||||
unreachable!("We already wrote the user content into the update file")
|
||||
}
|
||||
};
|
||||
// we NEED to persist the file here because we moved the `udpate_file` in another task.
|
||||
update_file.persist()?;
|
||||
Ok(documents_count)
|
||||
})
|
||||
.await
|
||||
}
|
||||
};
|
||||
|
||||
let async_file = File::from_std(temp_file);
|
||||
let mut buffer = BufWriter::new(async_file);
|
||||
|
||||
let mut buffer_write_size: usize = 0;
|
||||
while let Some(result) = body.next().await {
|
||||
let byte = result?;
|
||||
|
||||
if byte.is_empty() && buffer_write_size == 0 {
|
||||
return Err(MeilisearchHttpError::MissingPayload(format));
|
||||
}
|
||||
|
||||
match buffer.write_all(&byte).await {
|
||||
Ok(()) => buffer_write_size += 1,
|
||||
Err(e) => return Err(MeilisearchHttpError::Payload(ReceivePayload(Box::new(e)))),
|
||||
}
|
||||
}
|
||||
|
||||
if let Err(e) = buffer.flush().await {
|
||||
return Err(MeilisearchHttpError::Payload(ReceivePayload(Box::new(e))));
|
||||
}
|
||||
|
||||
if buffer_write_size == 0 {
|
||||
return Err(MeilisearchHttpError::MissingPayload(format));
|
||||
}
|
||||
|
||||
if let Err(e) = buffer.seek(std::io::SeekFrom::Start(0)).await {
|
||||
return Err(MeilisearchHttpError::Payload(ReceivePayload(Box::new(e))));
|
||||
}
|
||||
|
||||
let read_file = buffer.into_inner().into_std().await;
|
||||
let documents_count = tokio::task::spawn_blocking(move || {
|
||||
let documents_count = match format {
|
||||
PayloadType::Json => read_json(&read_file, &mut update_file)?,
|
||||
PayloadType::Csv { delimiter } => read_csv(&read_file, &mut update_file, delimiter)?,
|
||||
PayloadType::Ndjson => read_ndjson(&read_file, &mut update_file)?,
|
||||
};
|
||||
// we NEED to persist the file here because we moved the `udpate_file` in another task.
|
||||
update_file.persist()?;
|
||||
Ok(documents_count)
|
||||
})
|
||||
.await;
|
||||
|
||||
let documents_count = match documents_count {
|
||||
Ok(Ok(documents_count)) => documents_count,
|
||||
// in this case the file has not possibly be persisted.
|
||||
@ -703,6 +703,39 @@ async fn document_addition(
|
||||
Ok(task.into())
|
||||
}
|
||||
|
||||
async fn copy_body_to_file(
|
||||
output: std::fs::File,
|
||||
mut body: Payload,
|
||||
format: PayloadType,
|
||||
) -> Result<std::fs::File, MeilisearchHttpError> {
|
||||
let async_file = File::from_std(output);
|
||||
let mut buffer = BufWriter::new(async_file);
|
||||
let mut buffer_write_size: usize = 0;
|
||||
while let Some(result) = body.next().await {
|
||||
let byte = result?;
|
||||
|
||||
if byte.is_empty() && buffer_write_size == 0 {
|
||||
return Err(MeilisearchHttpError::MissingPayload(format));
|
||||
}
|
||||
|
||||
match buffer.write_all(&byte).await {
|
||||
Ok(()) => buffer_write_size += 1,
|
||||
Err(e) => return Err(MeilisearchHttpError::Payload(ReceivePayload(Box::new(e)))),
|
||||
}
|
||||
}
|
||||
if let Err(e) = buffer.flush().await {
|
||||
return Err(MeilisearchHttpError::Payload(ReceivePayload(Box::new(e))));
|
||||
}
|
||||
if buffer_write_size == 0 {
|
||||
return Err(MeilisearchHttpError::MissingPayload(format));
|
||||
}
|
||||
if let Err(e) = buffer.seek(std::io::SeekFrom::Start(0)).await {
|
||||
return Err(MeilisearchHttpError::Payload(ReceivePayload(Box::new(e))));
|
||||
}
|
||||
let read_file = buffer.into_inner().into_std().await;
|
||||
Ok(read_file)
|
||||
}
|
||||
|
||||
pub async fn delete_documents_batch(
|
||||
index_scheduler: GuardedData<ActionPolicy<{ actions::DOCUMENTS_DELETE }>, Data<IndexScheduler>>,
|
||||
index_uid: web::Path<String>,
|
||||
|
Loading…
Reference in New Issue
Block a user