mirror of
https://github.com/meilisearch/meilisearch.git
synced 2025-01-18 17:11:15 +08:00
Merge pull request #60 from meilisearch/accept-compressed-documents-updates
Accept and mirror compression of documents additions
This commit is contained in:
commit
5dd4dc2862
14
http-ui/Cargo.lock
generated
14
http-ui/Cargo.lock
generated
@ -80,19 +80,6 @@ dependencies = [
|
|||||||
"warp",
|
"warp",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "async-compression"
|
|
||||||
version = "0.3.6"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "fb1ff21a63d3262af46b9f33a826a8d134e2d0d9b2179c86034948b732ea8b2a"
|
|
||||||
dependencies = [
|
|
||||||
"flate2",
|
|
||||||
"futures-core",
|
|
||||||
"memchr",
|
|
||||||
"pin-project-lite",
|
|
||||||
"tokio",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "atty"
|
name = "atty"
|
||||||
version = "0.2.14"
|
version = "0.2.14"
|
||||||
@ -767,7 +754,6 @@ dependencies = [
|
|||||||
"anyhow",
|
"anyhow",
|
||||||
"askama",
|
"askama",
|
||||||
"askama_warp",
|
"askama_warp",
|
||||||
"async-compression",
|
|
||||||
"byte-unit",
|
"byte-unit",
|
||||||
"bytes",
|
"bytes",
|
||||||
"flate2",
|
"flate2",
|
||||||
|
@ -7,7 +7,6 @@ edition = "2018"
|
|||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
anyhow = "1.0.28"
|
anyhow = "1.0.28"
|
||||||
async-compression = { version = "0.3.6", features = ["gzip", "tokio-02"] }
|
|
||||||
byte-unit = { version = "4.0.9", default-features = false, features = ["std"] }
|
byte-unit = { version = "4.0.9", default-features = false, features = ["std"] }
|
||||||
grenad = { git = "https://github.com/Kerollmops/grenad.git", rev = "3adcb26" }
|
grenad = { git = "https://github.com/Kerollmops/grenad.git", rev = "3adcb26" }
|
||||||
heed = "0.10.5"
|
heed = "0.10.5"
|
||||||
|
@ -1,5 +1,6 @@
|
|||||||
use std::borrow::Cow;
|
use std::borrow::Cow;
|
||||||
use std::collections::{HashMap, HashSet};
|
use std::collections::{HashMap, HashSet};
|
||||||
|
use std::fmt::Display;
|
||||||
use std::fs::{File, create_dir_all};
|
use std::fs::{File, create_dir_all};
|
||||||
use std::net::SocketAddr;
|
use std::net::SocketAddr;
|
||||||
use std::num::NonZeroUsize;
|
use std::num::NonZeroUsize;
|
||||||
@ -10,7 +11,6 @@ use std::time::Instant;
|
|||||||
use std::{mem, io};
|
use std::{mem, io};
|
||||||
|
|
||||||
use askama_warp::Template;
|
use askama_warp::Template;
|
||||||
use async_compression::tokio_02::write::GzipEncoder;
|
|
||||||
use byte_unit::Byte;
|
use byte_unit::Byte;
|
||||||
use flate2::read::GzDecoder;
|
use flate2::read::GzDecoder;
|
||||||
use futures::stream;
|
use futures::stream;
|
||||||
@ -178,7 +178,7 @@ struct IndexTemplate {
|
|||||||
|
|
||||||
#[derive(Template)]
|
#[derive(Template)]
|
||||||
#[template(path = "updates.html")]
|
#[template(path = "updates.html")]
|
||||||
struct UpdatesTemplate<M: Serialize + Send, P: Serialize + Send, N: Serialize + Send> {
|
struct UpdatesTemplate<M: Serialize + Send, P: Serialize + Send, N: Serialize + Send + Display> {
|
||||||
db_name: String,
|
db_name: String,
|
||||||
db_size: usize,
|
db_size: usize,
|
||||||
docs_count: usize,
|
docs_count: usize,
|
||||||
@ -208,7 +208,7 @@ impl<M, P, N> UpdateStatus<M, P, N> {
|
|||||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
#[serde(tag = "type")]
|
#[serde(tag = "type")]
|
||||||
enum UpdateMeta {
|
enum UpdateMeta {
|
||||||
DocumentsAddition { method: String, format: String },
|
DocumentsAddition { method: String, format: String, encoding: Option<String> },
|
||||||
ClearDocuments,
|
ClearDocuments,
|
||||||
Settings(Settings),
|
Settings(Settings),
|
||||||
Facets(Facets),
|
Facets(Facets),
|
||||||
@ -323,9 +323,10 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
update_builder.chunk_compression_type(indexer_opt_cloned.chunk_compression_type);
|
update_builder.chunk_compression_type(indexer_opt_cloned.chunk_compression_type);
|
||||||
update_builder.chunk_fusing_shrink_size(indexer_opt_cloned.chunk_fusing_shrink_size.get_bytes());
|
update_builder.chunk_fusing_shrink_size(indexer_opt_cloned.chunk_fusing_shrink_size.get_bytes());
|
||||||
|
|
||||||
|
let before_update = Instant::now();
|
||||||
// we extract the update type and execute the update itself.
|
// we extract the update type and execute the update itself.
|
||||||
let result: anyhow::Result<()> = match meta {
|
let result: anyhow::Result<()> = match meta {
|
||||||
UpdateMeta::DocumentsAddition { method, format } => {
|
UpdateMeta::DocumentsAddition { method, format, encoding } => {
|
||||||
// We must use the write transaction of the update here.
|
// We must use the write transaction of the update here.
|
||||||
let mut wtxn = index_cloned.write_txn()?;
|
let mut wtxn = index_cloned.write_txn()?;
|
||||||
let mut builder = update_builder.index_documents(&mut wtxn, &index_cloned);
|
let mut builder = update_builder.index_documents(&mut wtxn, &index_cloned);
|
||||||
@ -343,11 +344,10 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
otherwise => panic!("invalid indexing method {:?}", otherwise),
|
otherwise => panic!("invalid indexing method {:?}", otherwise),
|
||||||
};
|
};
|
||||||
|
|
||||||
let gzipped = true;
|
let reader = match encoding.as_deref() {
|
||||||
let reader = if gzipped {
|
Some("gzip") => Box::new(GzDecoder::new(content)),
|
||||||
Box::new(GzDecoder::new(content))
|
None => Box::new(content) as Box<dyn io::Read>,
|
||||||
} else {
|
otherwise => panic!("invalid encoding format {:?}", otherwise),
|
||||||
Box::new(content) as Box<dyn io::Read>
|
|
||||||
};
|
};
|
||||||
|
|
||||||
let result = builder.execute(reader, |indexing_step| {
|
let result = builder.execute(reader, |indexing_step| {
|
||||||
@ -458,7 +458,7 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
};
|
};
|
||||||
|
|
||||||
let meta = match result {
|
let meta = match result {
|
||||||
Ok(()) => format!("valid update content"),
|
Ok(()) => format!("valid update content processed in {:.02?}", before_update.elapsed()),
|
||||||
Err(e) => format!("error while processing update content: {:?}", e),
|
Err(e) => format!("error while processing update content: {:?}", e),
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -703,21 +703,18 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
update_status_sender: broadcast::Sender<UpdateStatus<UpdateMeta, UpdateMetaProgress, String>>,
|
update_status_sender: broadcast::Sender<UpdateStatus<UpdateMeta, UpdateMetaProgress, String>>,
|
||||||
update_method: Option<String>,
|
update_method: Option<String>,
|
||||||
update_format: UpdateFormat,
|
update_format: UpdateFormat,
|
||||||
|
encoding: Option<String>,
|
||||||
mut stream: impl futures::Stream<Item=Result<impl bytes::Buf, warp::Error>> + Unpin,
|
mut stream: impl futures::Stream<Item=Result<impl bytes::Buf, warp::Error>> + Unpin,
|
||||||
) -> Result<impl warp::Reply, warp::Rejection>
|
) -> Result<impl warp::Reply, warp::Rejection>
|
||||||
{
|
{
|
||||||
let file = tokio::task::block_in_place(tempfile::tempfile).unwrap();
|
let file = tokio::task::block_in_place(tempfile::tempfile).unwrap();
|
||||||
let file = TFile::from_std(file);
|
let mut file = TFile::from_std(file);
|
||||||
let mut encoder = GzipEncoder::new(file);
|
|
||||||
|
|
||||||
while let Some(result) = stream.next().await {
|
while let Some(result) = stream.next().await {
|
||||||
let bytes = result.unwrap().to_bytes();
|
let bytes = result.unwrap().to_bytes();
|
||||||
encoder.write_all(&bytes[..]).await.unwrap();
|
file.write_all(&bytes[..]).await.unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
encoder.shutdown().await.unwrap();
|
|
||||||
let mut file = encoder.into_inner();
|
|
||||||
file.sync_all().await.unwrap();
|
|
||||||
let file = file.into_std().await;
|
let file = file.into_std().await;
|
||||||
let mmap = unsafe { memmap::Mmap::map(&file).unwrap() };
|
let mmap = unsafe { memmap::Mmap::map(&file).unwrap() };
|
||||||
|
|
||||||
@ -734,7 +731,7 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
_ => panic!("Unknown update format"),
|
_ => panic!("Unknown update format"),
|
||||||
};
|
};
|
||||||
|
|
||||||
let meta = UpdateMeta::DocumentsAddition { method, format };
|
let meta = UpdateMeta::DocumentsAddition { method, format, encoding };
|
||||||
let update_id = update_store.register_update(&meta, &mmap[..]).unwrap();
|
let update_id = update_store.register_update(&meta, &mmap[..]).unwrap();
|
||||||
let _ = update_status_sender.send(UpdateStatus::Pending { update_id, meta });
|
let _ = update_status_sender.send(UpdateStatus::Pending { update_id, meta });
|
||||||
eprintln!("update {} registered", update_id);
|
eprintln!("update {} registered", update_id);
|
||||||
@ -749,51 +746,26 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
|
|
||||||
let update_store_cloned = update_store.clone();
|
let update_store_cloned = update_store.clone();
|
||||||
let update_status_sender_cloned = update_status_sender.clone();
|
let update_status_sender_cloned = update_status_sender.clone();
|
||||||
let indexing_csv_route = warp::filters::method::post()
|
let indexing_route = warp::filters::method::post()
|
||||||
.and(warp::path!("documents"))
|
.and(warp::path!("documents"))
|
||||||
.and(warp::header::exact_ignore_case("content-type", "text/csv"))
|
.and(warp::header::header("content-type"))
|
||||||
.and(warp::filters::query::query())
|
.and(warp::header::optional::<String>("content-encoding"))
|
||||||
|
.and(warp::query::query())
|
||||||
.and(warp::body::stream())
|
.and(warp::body::stream())
|
||||||
.and_then(move |params: QueryUpdate, stream| {
|
.and_then(move |content_type: String, content_encoding, params: QueryUpdate, stream| {
|
||||||
buf_stream(
|
let format = match content_type.as_str() {
|
||||||
update_store_cloned.clone(),
|
"text/csv" => UpdateFormat::Csv,
|
||||||
update_status_sender_cloned.clone(),
|
"application/json" => UpdateFormat::Json,
|
||||||
params.method,
|
"application/x-ndjson" => UpdateFormat::JsonStream,
|
||||||
UpdateFormat::Csv,
|
otherwise => panic!("invalid update format: {}", otherwise),
|
||||||
stream,
|
};
|
||||||
)
|
|
||||||
});
|
|
||||||
|
|
||||||
let update_store_cloned = update_store.clone();
|
|
||||||
let update_status_sender_cloned = update_status_sender.clone();
|
|
||||||
let indexing_json_route = warp::filters::method::post()
|
|
||||||
.and(warp::path!("documents"))
|
|
||||||
.and(warp::header::exact_ignore_case("content-type", "application/json"))
|
|
||||||
.and(warp::filters::query::query())
|
|
||||||
.and(warp::body::stream())
|
|
||||||
.and_then(move |params: QueryUpdate, stream| {
|
|
||||||
buf_stream(
|
buf_stream(
|
||||||
update_store_cloned.clone(),
|
update_store_cloned.clone(),
|
||||||
update_status_sender_cloned.clone(),
|
update_status_sender_cloned.clone(),
|
||||||
params.method,
|
params.method,
|
||||||
UpdateFormat::Json,
|
format,
|
||||||
stream,
|
content_encoding,
|
||||||
)
|
|
||||||
});
|
|
||||||
|
|
||||||
let update_store_cloned = update_store.clone();
|
|
||||||
let update_status_sender_cloned = update_status_sender.clone();
|
|
||||||
let indexing_json_stream_route = warp::filters::method::post()
|
|
||||||
.and(warp::path!("documents"))
|
|
||||||
.and(warp::header::exact_ignore_case("content-type", "application/x-ndjson"))
|
|
||||||
.and(warp::filters::query::query())
|
|
||||||
.and(warp::body::stream())
|
|
||||||
.and_then(move |params: QueryUpdate, stream| {
|
|
||||||
buf_stream(
|
|
||||||
update_store_cloned.clone(),
|
|
||||||
update_status_sender_cloned.clone(),
|
|
||||||
params.method,
|
|
||||||
UpdateFormat::JsonStream,
|
|
||||||
stream,
|
stream,
|
||||||
)
|
)
|
||||||
});
|
});
|
||||||
@ -904,9 +876,7 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
.or(dash_logo_black_route)
|
.or(dash_logo_black_route)
|
||||||
.or(query_route)
|
.or(query_route)
|
||||||
.or(document_route)
|
.or(document_route)
|
||||||
.or(indexing_csv_route)
|
.or(indexing_route)
|
||||||
.or(indexing_json_route)
|
|
||||||
.or(indexing_json_stream_route)
|
|
||||||
.or(abort_update_id_route)
|
.or(abort_update_id_route)
|
||||||
.or(abort_pending_updates_route)
|
.or(abort_pending_updates_route)
|
||||||
.or(clearing_route)
|
.or(clearing_route)
|
||||||
|
@ -68,7 +68,7 @@
|
|||||||
<ol>
|
<ol>
|
||||||
<li class="field">
|
<li class="field">
|
||||||
<div class="attribute">update id</div><div class="updateId content">{{ update_id }}</div>
|
<div class="attribute">update id</div><div class="updateId content">{{ update_id }}</div>
|
||||||
<div class="attribute">update status</div><div class="updateStatus content">processed</div>
|
<div class="attribute">update status</div><div class="updateStatus content">{{ meta }}</div>
|
||||||
</li>
|
</li>
|
||||||
</ol>
|
</ol>
|
||||||
</li>
|
</li>
|
||||||
|
Loading…
Reference in New Issue
Block a user