From fe32097964a1c0392d4c4c716ba581326eff22af Mon Sep 17 00:00:00 2001
From: Kerollmops
Date: Thu, 16 Jun 2022 17:24:38 +0200
Subject: [PATCH 1/8] Update milli v0.32

---
 Cargo.lock                  | 37 ++++++++++++++-----------------------
 meilisearch-auth/Cargo.toml |  2 +-
 meilisearch-lib/Cargo.toml  |  2 +-
 3 files changed, 16 insertions(+), 25 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 2b41dd265..fbc5db25b 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -644,9 +644,9 @@ dependencies = [

 [[package]]
 name = "charabia"
-version = "0.5.0"
+version = "0.5.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "4a26a3df4d9c9231eb1e757fe6b1c66c471e0c2cd5410265e7c3109a726663c4"
+checksum = "2ed19edcd98f5bf6572f48d6f5982d595cb8718e47c6f0066d942b280575ff02"
 dependencies = [
  "character_converter",
  "cow-utils",
@@ -1123,8 +1123,8 @@ dependencies = [

 [[package]]
 name = "filter-parser"
-version = "0.31.2"
-source = "git+https://github.com/meilisearch/milli.git?tag=v0.31.2#132558bf6a4e434de2a48314c4a208dea295a992"
+version = "0.32.0"
+source = "git+https://github.com/meilisearch/milli.git?tag=v0.32.0#e1bc610d2722a8010216c45d5a32cbe3db18468e"
 dependencies = [
  "nom",
  "nom_locate",
@@ -1148,8 +1148,8 @@ dependencies = [

 [[package]]
 name = "flatten-serde-json"
-version = "0.31.2"
-source = "git+https://github.com/meilisearch/milli.git?tag=v0.31.2#132558bf6a4e434de2a48314c4a208dea295a992"
+version = "0.32.0"
+source = "git+https://github.com/meilisearch/milli.git?tag=v0.32.0#e1bc610d2722a8010216c45d5a32cbe3db18468e"
 dependencies = [
  "serde_json",
 ]
@@ -1661,8 +1661,8 @@ dependencies = [

 [[package]]
 name = "json-depth-checker"
-version = "0.31.2"
-source = "git+https://github.com/meilisearch/milli.git?tag=v0.31.2#132558bf6a4e434de2a48314c4a208dea295a992"
+version = "0.32.0"
+source = "git+https://github.com/meilisearch/milli.git?tag=v0.32.0#e1bc610d2722a8010216c45d5a32cbe3db18468e"
 dependencies = [
  "serde_json",
 ]
@@ -2013,7 +2013,7 @@ dependencies = [
  "sha2",
  "thiserror",
  "time 0.3.9",
- "uuid 1.1.2",
+ "uuid",
 ]

 [[package]]
@@ -2082,7 +2082,7 @@ dependencies = [
  "tokio",
  "tokio-stream",
  "urlencoding",
- "uuid 1.1.2",
+ "uuid",
  "vergen",
  "walkdir",
  "yaup",
@@ -2146,7 +2146,7 @@ dependencies = [
  "thiserror",
  "time 0.3.9",
  "tokio",
- "uuid 1.1.2",
+ "uuid",
  "walkdir",
  "whoami",
 ]
@@ -2188,8 +2188,8 @@ dependencies = [

 [[package]]
 name = "milli"
-version = "0.31.2"
-source = "git+https://github.com/meilisearch/milli.git?tag=v0.31.2#132558bf6a4e434de2a48314c4a208dea295a992"
+version = "0.32.0"
+source = "git+https://github.com/meilisearch/milli.git?tag=v0.32.0#e1bc610d2722a8010216c45d5a32cbe3db18468e"
 dependencies = [
  "bimap",
  "bincode",
@@ -2228,7 +2228,7 @@ dependencies = [
  "tempfile",
  "thiserror",
  "time 0.3.9",
- "uuid 0.8.2",
+ "uuid",
 ]

 [[package]]
@@ -3670,15 +3670,6 @@ version = "0.1.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "5190c9442dcdaf0ddd50f37420417d219ae5261bbf5db120d0f9bab996c9cba1"

-[[package]]
-name = "uuid"
-version = "0.8.2"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "bc5cf98d8186244414c848017f0e2676b3fcb46807f6668a97dfe67359a3c4b7"
-dependencies = [
- "getrandom",
-]
-
 [[package]]
 name = "uuid"
 version = "1.1.2"
diff --git a/meilisearch-auth/Cargo.toml b/meilisearch-auth/Cargo.toml
index 812558581..9a4966201 100644
--- a/meilisearch-auth/Cargo.toml
+++ b/meilisearch-auth/Cargo.toml
@@ -7,7 +7,7 @@ edition = "2021"
 enum-iterator = "0.7.0"
 hmac = "0.12.1"
 meilisearch-types = { path = "../meilisearch-types" }
-milli = { git = "https://github.com/meilisearch/milli.git", tag = "v0.31.2" }
+milli = { git = "https://github.com/meilisearch/milli.git", tag = "v0.32.0" }
 rand = "0.8.4"
 serde = { version = "1.0.136", features = ["derive"] }
 serde_json = { version = "1.0.79", features = ["preserve_order"] }
diff --git a/meilisearch-lib/Cargo.toml b/meilisearch-lib/Cargo.toml
index 672897380..09bef9515 100644
--- a/meilisearch-lib/Cargo.toml
+++ b/meilisearch-lib/Cargo.toml
@@ -28,7 +28,7 @@ lazy_static = "1.4.0"
 log = "0.4.14"
 meilisearch-auth = { path = "../meilisearch-auth" }
 meilisearch-types = { path = "../meilisearch-types" }
-milli = { git = "https://github.com/meilisearch/milli.git", tag = "v0.31.2" }
+milli = { git = "https://github.com/meilisearch/milli.git", tag = "v0.32.0" }
 mime = "0.3.16"
 num_cpus = "1.13.1"
 obkv = "0.2.0"

From 73d4869e5e7291b8986e4eed5eecd46f8039d756 Mon Sep 17 00:00:00 2001
From: Kerollmops
Date: Thu, 16 Jun 2022 12:06:20 +0200
Subject: [PATCH 2/8] Make the changes to plug the new DocumentsBatch system

---
 meilisearch-lib/src/document_formats.rs  | 96 ++++++++++++++----------
 meilisearch-lib/src/index/dump.rs        |  9 ++-
 meilisearch-lib/src/index/updates.rs     |  9 ++-
 meilisearch-lib/src/update_file_store.rs | 10 ++-
 4 files changed, 72 insertions(+), 52 deletions(-)

diff --git a/meilisearch-lib/src/document_formats.rs b/meilisearch-lib/src/document_formats.rs
index de3d7f5d5..5b224cf49 100644
--- a/meilisearch-lib/src/document_formats.rs
+++ b/meilisearch-lib/src/document_formats.rs
@@ -1,10 +1,10 @@
 use std::borrow::Borrow;
 use std::fmt::{self, Debug, Display};
-use std::io::{self, BufRead, BufReader, BufWriter, Cursor, Read, Seek, Write};
+use std::io::{self, BufReader, Read, Seek, Write};

 use meilisearch_types::error::{Code, ErrorCode};
 use meilisearch_types::internal_error;
-use milli::documents::DocumentBatchBuilder;
+use milli::documents::{DocumentsBatchBuilder, Error};

 type Result<T> = std::result::Result<T, DocumentFormatError>;
@@ -18,9 +18,9 @@ pub enum PayloadType {
 impl fmt::Display for PayloadType {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         match self {
-            PayloadType::Ndjson => write!(f, "ndjson"),
-            PayloadType::Json => write!(f, "json"),
-            PayloadType::Csv => write!(f, "csv"),
+            PayloadType::Ndjson => f.write_str("ndjson"),
+            PayloadType::Json => f.write_str("json"),
+            PayloadType::Csv => f.write_str("csv"),
         }
     }
 }
@@ -28,7 +28,7 @@ impl fmt::Display for PayloadType {
 #[derive(Debug)]
 pub enum DocumentFormatError {
     Internal(Box<dyn std::error::Error + Send + Sync + 'static>),
-    MalformedPayload(Box<milli::documents::Error>, PayloadType),
+    MalformedPayload(Error, PayloadType),
 }

 impl Display for DocumentFormatError {
@@ -36,7 +36,7 @@ impl Display for DocumentFormatError {
         match self {
             Self::Internal(e) => write!(f, "An internal error has occurred: `{}`.", e),
             Self::MalformedPayload(me, b) => match me.borrow() {
-                milli::documents::Error::JsonError(se) => {
+                Error::Json(se) => {
                     // https://github.com/meilisearch/meilisearch/issues/2107
                     // The user input may be insanely long. We need to truncate it.
                    let mut serde_msg = se.to_string();
@@ -59,11 +59,11 @@ impl Display for DocumentFormatError {

 impl std::error::Error for DocumentFormatError {}

-impl From<(PayloadType, milli::documents::Error)> for DocumentFormatError {
-    fn from((ty, error): (PayloadType, milli::documents::Error)) -> Self {
+impl From<(PayloadType, Error)> for DocumentFormatError {
+    fn from((ty, error): (PayloadType, Error)) -> Self {
         match error {
-            milli::documents::Error::Io(e) => Self::Internal(Box::new(e)),
-            e => Self::MalformedPayload(Box::new(e), ty),
+            Error::Io(e) => Self::Internal(Box::new(e)),
+            e => Self::MalformedPayload(e, ty),
         }
     }
 }
@@ -79,51 +79,67 @@ impl ErrorCode for DocumentFormatError {

 internal_error!(DocumentFormatError: io::Error);

-/// reads csv from input and write an obkv batch to writer.
+/// Reads CSV from input and writes an obkv batch to writer.
 pub fn read_csv(input: impl Read, writer: impl Write + Seek) -> Result<usize> {
-    let writer = BufWriter::new(writer);
-    let builder =
-        DocumentBatchBuilder::from_csv(input, writer).map_err(|e| (PayloadType::Csv, e))?;
+    let mut builder = DocumentsBatchBuilder::new(writer);

-    let count = builder.finish().map_err(|e| (PayloadType::Csv, e))?;
+    let csv = csv::Reader::from_reader(input);
+    builder.append_csv(csv).map_err(|e| (PayloadType::Csv, e))?;

-    Ok(count)
+    let count = builder.documents_count();
+    let _ = builder
+        .into_inner()
+        .map_err(Into::into)
+        .map_err(DocumentFormatError::Internal)?;
+
+    Ok(count as usize)
 }

-/// reads jsonl from input and write an obkv batch to writer.
+/// Reads JSON Lines from input and writes an obkv batch to writer.
 pub fn read_ndjson(input: impl Read, writer: impl Write + Seek) -> Result<usize> {
+    let mut builder = DocumentsBatchBuilder::new(writer);
     let mut reader = BufReader::new(input);
-    let writer = BufWriter::new(writer);
-    let mut builder = DocumentBatchBuilder::new(writer).map_err(|e| (PayloadType::Ndjson, e))?;
-    let mut buf = String::new();
-
-    while reader.read_line(&mut buf)? > 0 {
-        // skip empty lines
-        if buf == "\n" {
-            buf.clear();
-            continue;
-        }
-        builder
-            .extend_from_json(Cursor::new(&buf.as_bytes()))
+    for result in serde_json::Deserializer::from_reader(reader).into_iter() {
+        let object = result
+            .map_err(Error::Json)
             .map_err(|e| (PayloadType::Ndjson, e))?;
-        buf.clear();
+        builder
+            .append_json_object(&object)
+            .map_err(Into::into)
+            .map_err(DocumentFormatError::Internal)?;
     }

-    let count = builder.finish().map_err(|e| (PayloadType::Ndjson, e))?;
+    let count = builder.documents_count();
+    let _ = builder
+        .into_inner()
+        .map_err(Into::into)
+        .map_err(DocumentFormatError::Internal)?;

-    Ok(count)
+    Ok(count as usize)
 }

-/// reads json from input and write an obkv batch to writer.
+/// Reads JSON from input and writes an obkv batch to writer.
 pub fn read_json(input: impl Read, writer: impl Write + Seek) -> Result<usize> {
-    let writer = BufWriter::new(writer);
-    let mut builder = DocumentBatchBuilder::new(writer).map_err(|e| (PayloadType::Json, e))?;
-    builder
-        .extend_from_json(input)
+    let mut builder = DocumentsBatchBuilder::new(writer);
+    let mut reader = BufReader::new(input);
+
+    let objects: Vec<_> = serde_json::from_reader(reader)
+        .map_err(Error::Json)
         .map_err(|e| (PayloadType::Json, e))?;

-    let count = builder.finish().map_err(|e| (PayloadType::Json, e))?;
+    for object in objects {
+        builder
+            .append_json_object(&object)
+            .map_err(Into::into)
+            .map_err(DocumentFormatError::Internal)?;
+    }

-    Ok(count)
+    let count = builder.documents_count();
+    let _ = builder
+        .into_inner()
+        .map_err(Into::into)
+        .map_err(DocumentFormatError::Internal)?;
+
+    Ok(count as usize)
 }
diff --git a/meilisearch-lib/src/index/dump.rs b/meilisearch-lib/src/index/dump.rs
index c6feb187f..6a41fa7a0 100644
--- a/meilisearch-lib/src/index/dump.rs
+++ b/meilisearch-lib/src/index/dump.rs
@@ -4,7 +4,7 @@ use std::path::Path;

 use anyhow::Context;
 use indexmap::IndexMap;
-use milli::documents::DocumentBatchReader;
+use milli::documents::DocumentsBatchReader;
 use milli::heed::{EnvOpenOptions, RoTxn};
 use milli::update::{IndexDocumentsConfig, IndexerConfig};
 use serde::{Deserialize, Serialize};
@@ -135,19 +135,20 @@ impl Index {

         if !empty {
             tmp_doc_file.seek(SeekFrom::Start(0))?;

-            let documents_reader = DocumentBatchReader::from_reader(tmp_doc_file)?;
+            let documents_reader = DocumentsBatchReader::from_reader(tmp_doc_file)?;

             // If the document file is empty, we don't perform the document addition, to prevent
             // a primary key error from being thrown.
             let config = IndexDocumentsConfig::default();
-            let mut builder = milli::update::IndexDocuments::new(
+            let builder = milli::update::IndexDocuments::new(
                 &mut txn,
                 &index,
                 indexer_config,
                 config,
                 |_| (),
             )?;
-            builder.add_documents(documents_reader)?;
+            let (builder, user_error) = builder.add_documents(documents_reader)?;
+            user_error?;
             builder.execute()?;
         }
diff --git a/meilisearch-lib/src/index/updates.rs b/meilisearch-lib/src/index/updates.rs
index 07695af05..b3a4205b7 100644
--- a/meilisearch-lib/src/index/updates.rs
+++ b/meilisearch-lib/src/index/updates.rs
@@ -3,7 +3,7 @@ use std::marker::PhantomData;
 use std::num::NonZeroUsize;

 use log::{debug, info, trace};
-use milli::documents::DocumentBatchReader;
+use milli::documents::DocumentsBatchReader;
 use milli::update::{
     DocumentAdditionResult, DocumentDeletionResult, IndexDocumentsConfig, IndexDocumentsMethod,
     Setting,
@@ -315,7 +315,7 @@ impl Index {
         };

         let indexing_callback = |indexing_step| debug!("update: {:?}", indexing_step);
-        let mut builder = milli::update::IndexDocuments::new(
+        let builder = milli::update::IndexDocuments::new(
             &mut txn,
             self,
             self.indexer_config.as_ref(),
@@ -325,8 +325,9 @@ impl Index {

         for content_uuid in contents.into_iter() {
             let content_file = file_store.get_update(content_uuid)?;
-            let reader = DocumentBatchReader::from_reader(content_file)?;
-            builder.add_documents(reader)?;
+            let reader = DocumentsBatchReader::from_reader(content_file)?;
+            let (builder, user_error) = builder.add_documents(reader)?;
+            todo!("use the user_error here");
         }

         let addition = builder.execute()?;
diff --git a/meilisearch-lib/src/update_file_store.rs b/meilisearch-lib/src/update_file_store.rs
index 3a60dfe26..e1be0dbd4 100644
--- a/meilisearch-lib/src/update_file_store.rs
+++ b/meilisearch-lib/src/update_file_store.rs
@@ -3,7 +3,7 @@ use std::io::{self, BufReader, BufWriter, Write};
 use std::ops::{Deref, DerefMut};
 use std::path::{Path, PathBuf};

-use milli::documents::DocumentBatchReader;
+use milli::documents::DocumentsBatchReader;
 use serde_json::Map;
 use tempfile::{NamedTempFile, PersistError};
 use uuid::Uuid;
@@ -44,7 +44,8 @@ into_update_store_error!(
     PersistError,
     io::Error,
     serde_json::Error,
-    milli::documents::Error
+    milli::documents::Error,
+    milli::documents::DocumentsBatchCursorError
 );

 impl UpdateFile {
@@ -149,12 +150,13 @@ mod store {
             let update_file = File::open(update_file_path)?;
             let mut dst_file = NamedTempFile::new_in(&dump_path)?;
-            let mut document_reader = DocumentBatchReader::from_reader(update_file)?;
+            let mut document_cursor = DocumentsBatchReader::from_reader(update_file)?.into_cursor();
+            let index = document_cursor.documents_batch_index();

             let mut document_buffer = Map::new();
             // TODO: we need to find a way to do this more efficiently. (create a custom serializer
             // for jsonl for example...)
-            while let Some((index, document)) = document_reader.next_document_with_index()? {
+            while let Some(document) = document_cursor.next_document()? {
                 for (field_id, content) in document.iter() {
                     if let Some(field_name) = index.name(field_id) {
                         let content = serde_json::from_slice(content)?;

From e3426d5b7a7269155288ae63e3863b9a2666f031 Mon Sep 17 00:00:00 2001
From: Kerollmops
Date: Thu, 16 Jun 2022 15:58:39 +0200
Subject: [PATCH 3/8] Improve the tasks error reporting

---
 meilisearch-http/src/helpers/env.rs       | 17 -------
 meilisearch-http/src/helpers/mod.rs       |  3 --
 meilisearch-http/src/lib.rs               |  1 -
 .../tests/documents/add_documents.rs      | 14 ++----
 meilisearch-http/tests/search/mod.rs      |  4 +-
 meilisearch-lib/src/document_formats.rs   |  4 +-
 meilisearch-lib/src/error.rs              |  5 +-
 meilisearch-lib/src/index/error.rs        |  6 +++
 meilisearch-lib/src/index/index.rs        |  9 ++--
 meilisearch-lib/src/index/mod.rs          | 10 ++--
 meilisearch-lib/src/index/updates.rs      | 36 +++++++++-----
 meilisearch-lib/src/index_resolver/mod.rs | 47 +++++++++++--------
 meilisearch-lib/src/update_file_store.rs  |  4 +-
 permissive-json-pointer/src/lib.rs        |  4 +-
 14 files changed, 83 insertions(+), 81 deletions(-)
 delete mode 100644 meilisearch-http/src/helpers/env.rs
 delete mode 100644 meilisearch-http/src/helpers/mod.rs

diff --git a/meilisearch-http/src/helpers/env.rs b/meilisearch-http/src/helpers/env.rs
deleted file mode 100644
index b76c9c8a7..000000000
--- a/meilisearch-http/src/helpers/env.rs
+++ /dev/null
@@ -1,17 +0,0 @@
-use meilisearch_lib::heed::Env;
-use walkdir::WalkDir;
-
-pub trait EnvSizer {
-    fn size(&self) -> u64;
-}
-
-impl EnvSizer for Env {
-    fn size(&self) -> u64 {
-        WalkDir::new(self.path())
-            .into_iter()
-            .filter_map(|entry| entry.ok())
-            .filter_map(|entry| entry.metadata().ok())
-            .filter(|metadata| metadata.is_file())
-            .fold(0, |acc, m| acc + m.len())
-    }
-}
diff --git a/meilisearch-http/src/helpers/mod.rs b/meilisearch-http/src/helpers/mod.rs
deleted file mode 100644
index 3908c440c..000000000
--- a/meilisearch-http/src/helpers/mod.rs
+++ /dev/null
@@ -1,3 +0,0 @@
-mod env;
-
-pub use env::EnvSizer;
diff --git a/meilisearch-http/src/lib.rs b/meilisearch-http/src/lib.rs
index 6485784fc..9df66071e 100644
--- a/meilisearch-http/src/lib.rs
+++ b/meilisearch-http/src/lib.rs
@@ -5,7 +5,6 @@ pub mod analytics;
 pub mod task;
 #[macro_use]
 pub mod extractors;
-pub mod helpers;
 pub mod option;
 pub mod routes;

diff --git a/meilisearch-http/tests/documents/add_documents.rs b/meilisearch-http/tests/documents/add_documents.rs
index 85b88ca36..cb7030051 100644
--- a/meilisearch-http/tests/documents/add_documents.rs
+++ b/meilisearch-http/tests/documents/add_documents.rs
@@ -326,7 +326,7 @@ async fn error_add_malformed_json_documents() {
     assert_eq!(
         response["message"],
         json!(
-            r#"The `json` payload provided is malformed. `Couldn't serialize document value: invalid type: string "0123456789012345678901234567...890123456789", expected a documents, or a sequence of documents. at line 1 column 102`."#
+            r#"The `json` payload provided is malformed. `Couldn't serialize document value: invalid type: string "0123456789012345678901234567...890123456789012345678901234567890123456789", expected a sequence at line 1 column 102`."#
         )
     );
     assert_eq!(response["code"], json!("malformed_payload"));
@@ -349,9 +349,7 @@ async fn error_add_malformed_json_documents() {
     assert_eq!(status_code, 400);
     assert_eq!(
         response["message"],
-        json!(
-            r#"The `json` payload provided is malformed. `Couldn't serialize document value: invalid type: string "0123456789012345678901234567...90123456789m", expected a documents, or a sequence of documents. at line 1 column 103`."#
-        )
+        json!("The `json` payload provided is malformed. `Couldn't serialize document value: invalid type: string \"0123456789012345678901234567...90123456789012345678901234567890123456789m\", expected a sequence at line 1 column 103`.")
     );
     assert_eq!(response["code"], json!("malformed_payload"));
     assert_eq!(response["type"], json!("invalid_request"));
@@ -388,7 +386,7 @@ async fn error_add_malformed_ndjson_documents() {
     assert_eq!(
         response["message"],
         json!(
-            r#"The `ndjson` payload provided is malformed. `Couldn't serialize document value: key must be a string at line 1 column 2`."#
+            r#"The `ndjson` payload provided is malformed. `Couldn't serialize document value: key must be a string at line 2 column 2`."#
         )
     );
     assert_eq!(response["code"], json!("malformed_payload"));
@@ -411,9 +409,7 @@ async fn error_add_malformed_ndjson_documents() {
     assert_eq!(status_code, 400);
     assert_eq!(
         response["message"],
-        json!(
-            r#"The `ndjson` payload provided is malformed. `Couldn't serialize document value: key must be a string at line 1 column 2`."#
-        )
+        json!("The `ndjson` payload provided is malformed. `Couldn't serialize document value: key must be a string at line 2 column 2`.")
     );
     assert_eq!(response["code"], json!("malformed_payload"));
     assert_eq!(response["type"], json!("invalid_request"));
@@ -1020,7 +1016,7 @@ async fn add_documents_invalid_geo_field() {
     index.wait_task(2).await;
     let (response, code) = index.get_task(2).await;
     assert_eq!(code, 200);
-    assert_eq!(response["status"], "succeeded");
+    assert_eq!(response["status"], "failed");
 }

 #[actix_rt::test]
diff --git a/meilisearch-http/tests/search/mod.rs b/meilisearch-http/tests/search/mod.rs
index 17f53fa2d..d5e916860 100644
--- a/meilisearch-http/tests/search/mod.rs
+++ b/meilisearch-http/tests/search/mod.rs
@@ -708,9 +708,7 @@ async fn faceting_max_values_per_facet() {
         }),
         |response, code| {
             assert_eq!(code, 200, "{}", response);
-            let numbers = dbg!(&response)["facetDistribution"]["number"]
-                .as_object()
-                .unwrap();
+            let numbers = &response["facetDistribution"]["number"].as_object().unwrap();
             assert_eq!(numbers.len(), 10_000);
         },
     )
diff --git a/meilisearch-lib/src/document_formats.rs b/meilisearch-lib/src/document_formats.rs
index 5b224cf49..72e899845 100644
--- a/meilisearch-lib/src/document_formats.rs
+++ b/meilisearch-lib/src/document_formats.rs
@@ -98,7 +98,7 @@ pub fn read_csv(input: impl Read, writer: impl Write + Seek) -> Result<usize> {
 /// Reads JSON Lines from input and writes an obkv batch to writer.
 pub fn read_ndjson(input: impl Read, writer: impl Write + Seek) -> Result<usize> {
     let mut builder = DocumentsBatchBuilder::new(writer);
-    let mut reader = BufReader::new(input);
+    let reader = BufReader::new(input);

     for result in serde_json::Deserializer::from_reader(reader).into_iter() {
         let object = result
@@ -122,7 +122,7 @@ pub fn read_ndjson(input: impl Read, writer: impl Write + Seek) -> Result<usize> {
 /// Reads JSON from input and writes an obkv batch to writer.
 pub fn read_json(input: impl Read, writer: impl Write + Seek) -> Result<usize> {
     let mut builder = DocumentsBatchBuilder::new(writer);
-    let mut reader = BufReader::new(input);
+    let reader = BufReader::new(input);

     let objects: Vec<_> = serde_json::from_reader(reader)
         .map_err(Error::Json)
diff --git a/meilisearch-lib/src/error.rs b/meilisearch-lib/src/error.rs
index 83e9263b4..168f2f88b 100644
--- a/meilisearch-lib/src/error.rs
+++ b/meilisearch-lib/src/error.rs
@@ -25,6 +25,7 @@ impl ErrorCode for MilliError<'_> {
                 // TODO: wait for spec for new error codes.
                 UserError::SerdeJson(_)
                 | UserError::DocumentLimitReached
+                | UserError::AccessingSoftDeletedDocument { .. }
                 | UserError::UnknownInternalDocumentId { .. } => Code::Internal,
                 UserError::InvalidStoreFile => Code::InvalidStore,
                 UserError::NoSpaceLeftOnDevice => Code::NoSpaceLeftOnDevice,
@@ -32,7 +33,9 @@ impl ErrorCode for MilliError<'_> {
                 UserError::AttributeLimitReached => Code::MaxFieldsLimitExceeded,
                 UserError::InvalidFilter(_) => Code::Filter,
                 UserError::MissingDocumentId { .. } => Code::MissingDocumentId,
-                UserError::InvalidDocumentId { .. } => Code::InvalidDocumentId,
+                UserError::InvalidDocumentId { .. } | UserError::TooManyDocumentIds { .. } => {
+                    Code::InvalidDocumentId
+                }
                 UserError::MissingPrimaryKey => Code::MissingPrimaryKey,
                 UserError::PrimaryKeyCannotBeChanged(_) => Code::PrimaryKeyAlreadyPresent,
                 UserError::SortRankingRuleMissing => Code::Sort,
diff --git a/meilisearch-lib/src/index/error.rs b/meilisearch-lib/src/index/error.rs
index e31fcc4a0..f795ceaa4 100644
--- a/meilisearch-lib/src/index/error.rs
+++ b/meilisearch-lib/src/index/error.rs
@@ -40,6 +40,12 @@ impl ErrorCode for IndexError {
     }
 }

+impl From<milli::UserError> for IndexError {
+    fn from(error: milli::UserError) -> IndexError {
+        IndexError::Milli(error.into())
+    }
+}
+
 #[derive(Debug, thiserror::Error)]
 pub enum FacetError {
     #[error("Invalid syntax for the filter parameter: `expected {}, found: {1}`.", .0.join(", "))]
diff --git a/meilisearch-lib/src/index/index.rs b/meilisearch-lib/src/index/index.rs
index 518e9ce3e..02425d0bf 100644
--- a/meilisearch-lib/src/index/index.rs
+++ b/meilisearch-lib/src/index/index.rs
@@ -4,7 +4,6 @@ use std::marker::PhantomData;
 use std::ops::Deref;
 use std::path::Path;
 use std::sync::Arc;
-use walkdir::WalkDir;

 use fst::IntoStreamer;
 use milli::heed::{CompactionOption, EnvOpenOptions, RoTxn};
@@ -14,6 +13,7 @@ use serde::{Deserialize, Serialize};
 use serde_json::{Map, Value};
 use time::OffsetDateTime;
 use uuid::Uuid;
+use walkdir::WalkDir;

 use crate::index::search::DEFAULT_PAGINATION_MAX_TOTAL_HITS;
@@ -245,11 +245,8 @@ impl Index {
         let fields_ids_map = self.fields_ids_map(&txn)?;
         let all_fields: Vec<_> = fields_ids_map.iter().map(|(id, _)| id).collect();

-        let iter = self.all_documents(&txn)?.skip(offset).take(limit);
-
         let mut documents = Vec::new();
-
-        for entry in iter {
+        for entry in self.all_documents(&txn)?.skip(offset).take(limit) {
             let (_id, obkv) = entry?;
             let document = obkv_to_json(&all_fields, &fields_ids_map, obkv)?;
             let document = match &attributes_to_retrieve {
@@ -302,7 +299,7 @@ impl Index {
     }

     pub fn size(&self) -> u64 {
-        WalkDir::new(self.inner.path())
+        WalkDir::new(self.path())
             .into_iter()
             .filter_map(|entry| entry.ok())
             .filter_map(|entry| entry.metadata().ok())
diff --git a/meilisearch-lib/src/index/mod.rs b/meilisearch-lib/src/index/mod.rs
index e6c831a01..28d6e0222 100644
--- a/meilisearch-lib/src/index/mod.rs
+++ b/meilisearch-lib/src/index/mod.rs
@@ -24,12 +24,12 @@ pub use test::MockIndex as Index;
 /// code for unit testing, in places where an index would normally be used.
 #[cfg(test)]
 pub mod test {
-    use std::path::Path;
-    use std::path::PathBuf;
+    use std::path::{Path, PathBuf};
     use std::sync::Arc;

-    use milli::update::IndexerConfig;
-    use milli::update::{DocumentAdditionResult, DocumentDeletionResult, IndexDocumentsMethod};
+    use milli::update::{
+        DocumentAdditionResult, DocumentDeletionResult, IndexDocumentsMethod, IndexerConfig,
+    };
     use nelson::Mocker;
     use uuid::Uuid;

@@ -162,7 +162,7 @@ pub mod test {
             primary_key: Option<String>,
             file_store: UpdateFileStore,
             contents: impl Iterator<Item = Uuid>,
-        ) -> Result<DocumentAdditionResult> {
+        ) -> Result<Vec<Result<DocumentAdditionResult>>> {
             match self {
                 MockIndex::Real(index) => {
                     index.update_documents(method, primary_key, file_store, contents)
diff --git a/meilisearch-lib/src/index/updates.rs b/meilisearch-lib/src/index/updates.rs
index b3a4205b7..5e5a8e34b 100644
--- a/meilisearch-lib/src/index/updates.rs
+++ b/meilisearch-lib/src/index/updates.rs
@@ -11,7 +11,7 @@ use milli::update::{
 use serde::{Deserialize, Serialize, Serializer};
 use uuid::Uuid;

-use super::error::Result;
+use super::error::{IndexError, Result};
 use super::index::{Index, IndexMeta};
 use crate::update_file_store::UpdateFileStore;

@@ -299,7 +299,7 @@ impl Index {
         primary_key: Option<String>,
         file_store: UpdateFileStore,
         contents: impl IntoIterator<Item = Uuid>,
-    ) -> Result<DocumentAdditionResult> {
+    ) -> Result<Vec<Result<DocumentAdditionResult>>> {
         trace!("performing document addition");
         let mut txn = self.write_txn()?;

@@ -315,7 +315,7 @@ impl Index {
         };

         let indexing_callback = |indexing_step| debug!("update: {:?}", indexing_step);
-        let builder = milli::update::IndexDocuments::new(
+        let mut builder = milli::update::IndexDocuments::new(
             &mut txn,
             self,
             self.indexer_config.as_ref(),
@@ -323,20 +323,34 @@ impl Index {
             indexing_callback,
         )?;

+        let mut results = Vec::new();
         for content_uuid in contents.into_iter() {
             let content_file = file_store.get_update(content_uuid)?;
             let reader = DocumentsBatchReader::from_reader(content_file)?;
-            let (builder, user_error) = builder.add_documents(reader)?;
-            todo!("use the user_error here");
+            let (new_builder, user_result) = builder.add_documents(reader)?;
+            builder = new_builder;
+
+            let user_result = match user_result {
+                Ok(count) => {
+                    let addition = DocumentAdditionResult {
+                        indexed_documents: count,
+                        number_of_documents: count,
+                    };
+                    info!("document addition done: {:?}", addition);
+                    Ok(addition)
+                }
+                Err(e) => Err(IndexError::from(e)),
+            };
+
+            results.push(user_result);
         }

-        let addition = builder.execute()?;
+        if results.iter().any(Result::is_ok) {
+            let _addition = builder.execute()?;
+            txn.commit()?;
+        }

-        txn.commit()?;
-
-        info!("document addition done: {:?}", addition);
-
-        Ok(addition)
+        Ok(results)
     }

     pub fn update_settings(&self, settings: &Settings<Checked>) -> Result<()> {
diff --git a/meilisearch-lib/src/index_resolver/mod.rs b/meilisearch-lib/src/index_resolver/mod.rs
index 686a549b9..284f64942 100644
--- a/meilisearch-lib/src/index_resolver/mod.rs
+++ b/meilisearch-lib/src/index_resolver/mod.rs
@@ -150,25 +150,34 @@ mod real {
                     })
                     .await;

-                let event = match result {
-                    Ok(Ok(result)) => TaskEvent::Succeeded {
-                        timestamp: OffsetDateTime::now_utc(),
-                        result: TaskResult::DocumentAddition {
-                            indexed_documents: result.indexed_documents,
-                        },
-                    },
-                    Ok(Err(e)) => TaskEvent::Failed {
-                        timestamp: OffsetDateTime::now_utc(),
-                        error: e.into(),
-                    },
-                    Err(e) => TaskEvent::Failed {
-                        timestamp: OffsetDateTime::now_utc(),
-                        error: IndexResolverError::from(e).into(),
-                    },
-                };
-
-                for task in tasks.iter_mut() {
-                    task.events.push(event.clone());
+                match result {
+                    Ok(Ok(results)) => {
+                        for (task, result) in tasks.iter_mut().zip(results) {
+                            let event = match result {
+                                Ok(addition) => {
+                                    TaskEvent::succeeded(TaskResult::DocumentAddition {
+                                        indexed_documents: addition.indexed_documents,
+                                    })
+                                }
+                                Err(error) => {
+                                    TaskEvent::failed(IndexResolverError::from(error))
+                                }
+                            };
+                            task.events.push(event);
+                        }
+                    }
+                    Ok(Err(e)) => {
+                        let event = TaskEvent::failed(e);
+                        for task in tasks.iter_mut() {
+                            task.events.push(event.clone());
+                        }
+                    }
+                    Err(e) => {
+                        let event = TaskEvent::failed(IndexResolverError::from(e));
+                        for task in tasks.iter_mut() {
+                            task.events.push(event.clone());
+                        }
+                    }
                 }
             }
             _ => panic!("invalid batch!"),
diff --git a/meilisearch-lib/src/update_file_store.rs b/meilisearch-lib/src/update_file_store.rs
index e1be0dbd4..cb4eadf4d 100644
--- a/meilisearch-lib/src/update_file_store.rs
+++ b/meilisearch-lib/src/update_file_store.rs
@@ -150,8 +150,8 @@ mod store {
             let update_file = File::open(update_file_path)?;
             let mut dst_file = NamedTempFile::new_in(&dump_path)?;
-            let mut document_cursor = DocumentsBatchReader::from_reader(update_file)?.into_cursor();
-            let index = document_cursor.documents_batch_index();
+            let (mut document_cursor, index) =
+                DocumentsBatchReader::from_reader(update_file)?.into_cursor_and_fields_index();

             let mut document_buffer = Map::new();
             // TODO: we need to find a way to do this more efficiently. (create a custom serializer
diff --git a/permissive-json-pointer/src/lib.rs b/permissive-json-pointer/src/lib.rs
index 8f97ab2de..52f181980 100644
--- a/permissive-json-pointer/src/lib.rs
+++ b/permissive-json-pointer/src/lib.rs
@@ -49,7 +49,7 @@ fn contained_in(selector: &str, key: &str) -> bool {
 /// map_leaf_values(
 ///     value.as_object_mut().unwrap(),
 ///     ["jean.race.name"],
-///     |key, value| match (value, dbg!(key)) {
+///     |key, value| match (value, key) {
 ///         (Value::String(name), "jean.race.name") => *name = "patou".to_string(),
 ///         _ => unreachable!(),
 ///     },
@@ -729,7 +729,7 @@ mod tests {
         map_leaf_values(
             value.as_object_mut().unwrap(),
             ["jean.race.name"],
-            |key, value| match (value, dbg!(key)) {
+            |key, value| match (value, key) {
                 (Value::String(name), "jean.race.name") => *name = S("patou"),
                 _ => unreachable!(),
             },

From 58d2aad3097c33d3d894bb05ab66234eea0a5b0c Mon Sep 17 00:00:00 2001
From: ManyTheFish
Date: Thu, 28 Jul 2022 11:55:12 +0200
Subject: [PATCH 4/8] Change binary option and add env var support

---
 meilisearch-http/src/lib.rs              |  6 ++---
 meilisearch-lib/src/options.rs           | 25 +++----------------
 meilisearch-lib/src/tasks/scheduler.rs   | 31 +++---------------------
 meilisearch-lib/src/tasks/update_loop.rs | 10 --------
 4 files changed, 11 insertions(+), 61 deletions(-)

diff --git a/meilisearch-http/src/lib.rs b/meilisearch-http/src/lib.rs
index 9df66071e..2169bdb37 100644
--- a/meilisearch-http/src/lib.rs
+++ b/meilisearch-http/src/lib.rs
@@ -29,9 +29,9 @@ pub static AUTOBATCHING_ENABLED: AtomicBool = AtomicBool::new(false);
 pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result<MeiliSearch> {
     let mut meilisearch = MeiliSearch::builder();

-    // enable autobatching?
-    AUTOBATCHING_ENABLED.store(
-        opt.scheduler_options.enable_auto_batching,
+    // disable autobatching?
+    let _ = AUTOBATCHING_ENABLED.store(
+        opt.scheduler_options.disable_auto_batching,
         std::sync::atomic::Ordering::Relaxed,
     );

diff --git a/meilisearch-lib/src/options.rs b/meilisearch-lib/src/options.rs
index c71f1cba6..ea810b9b7 100644
--- a/meilisearch-lib/src/options.rs
+++ b/meilisearch-lib/src/options.rs
@@ -41,27 +41,10 @@ pub struct IndexerOpts {

 #[derive(Debug, Clone, Parser, Default, Serialize)]
 pub struct SchedulerConfig {
-    /// enable the autobatching experimental feature
-    #[clap(long, hide = true)]
-    pub enable_auto_batching: bool,
-
-    // The maximum number of updates of the same type that can be batched together.
-    // If unspecified, this is unlimited. A value of 0 is interpreted as 1.
-    #[clap(long, requires = "enable-auto-batching", hide = true)]
-    pub max_batch_size: Option<usize>,
-
-    // The maximum number of documents in a document batch. Since batches must contain at least one
-    // update for the scheduler to make progress, the number of documents in a batch will be at
-    // least the number of documents of its first update.
-    #[clap(long, requires = "enable-auto-batching", hide = true)]
-    pub max_documents_per_batch: Option<usize>,
-
-    /// Debounce duration in seconds
-    ///
-    /// When a new task is enqueued, the scheduler waits for `debounce_duration_sec` seconds for new updates before
-    /// starting to process a batch of updates.
-    #[clap(long, requires = "enable-auto-batching", hide = true)]
-    pub debounce_duration_sec: Option<u64>,
+    /// The engine will disable task auto-batching,
+    /// and will sequentially compute each task one by one.
+    #[clap(long, env = "DISABLE_AUTO_BATCHING")]
+    pub disable_auto_batching: bool,
 }

 impl TryFrom<&IndexerOpts> for IndexerConfig {
diff --git a/meilisearch-lib/src/tasks/scheduler.rs b/meilisearch-lib/src/tasks/scheduler.rs
index 9c181b86b..a709f566e 100644
--- a/meilisearch-lib/src/tasks/scheduler.rs
+++ b/meilisearch-lib/src/tasks/scheduler.rs
@@ -3,7 +3,6 @@ use std::collections::{hash_map::Entry, BinaryHeap, HashMap, VecDeque};
 use std::ops::{Deref, DerefMut};
 use std::slice;
 use std::sync::Arc;
-use std::time::Duration;

 use atomic_refcell::AtomicRefCell;
 use milli::update::IndexDocumentsMethod;
@@ -248,17 +247,10 @@ impl Scheduler {
     pub fn new(
         store: TaskStore,
         performers: Vec<Arc<dyn BatchHandler + Send + Sync + 'static>>,
-        mut config: SchedulerConfig,
+        config: SchedulerConfig,
     ) -> Result<Arc<RwLock<Self>>> {
         let (notifier, rcv) = watch::channel(());

-        let debounce_time = config.debounce_duration_sec;
-
-        // Disable autobatching
-        if !config.enable_auto_batching {
-            config.max_batch_size = Some(1);
-        }
-
         let this = Self {
             snapshots: VecDeque::new(),
             tasks: TaskQueue::default(),
@@ -275,12 +267,7 @@ impl Scheduler {

         let this = Arc::new(RwLock::new(this));

-        let update_loop = UpdateLoop::new(
-            this.clone(),
-            performers,
-            debounce_time.filter(|&v| v > 0).map(Duration::from_secs),
-            rcv,
-        );
+        let update_loop = UpdateLoop::new(this.clone(), performers, rcv);

         tokio::task::spawn_local(update_loop.run());

@@ -497,27 +484,17 @@ fn make_batch(tasks: &mut TaskQueue, config: &SchedulerConfig) -> Processing {
                     match list.peek() {
                         Some(pending) if pending.kind == kind => {
                             // We always need to process at least one task for the scheduler to make progress.
-                            if task_list.len() >= config.max_batch_size.unwrap_or(usize::MAX).max(1)
-                            {
+                            if config.disable_auto_batching && task_list.len() > 0 {
                                 break;
                             }
                             let pending = list.pop().unwrap();
                             task_list.push(pending.id);

-                            // We add the number of documents to the count if we are scheduling document additions and
-                            // stop adding if we already have enough.
-                            //
-                            // We check that bound only after adding the current task to the batch, so that a batch contains at least one task.
+                            // We add the number of documents to the count if we are scheduling document additions.
                             match pending.kind {
                                 TaskType::DocumentUpdate { number }
                                 | TaskType::DocumentAddition { number } => {
                                     doc_count += number;
-
-                                    if doc_count
-                                        >= config.max_documents_per_batch.unwrap_or(usize::MAX)
-                                    {
-                                        break;
-                                    }
                                 }
                                 _ => (),
                             }
diff --git a/meilisearch-lib/src/tasks/update_loop.rs b/meilisearch-lib/src/tasks/update_loop.rs
index b99eb54b5..b6e43e319 100644
--- a/meilisearch-lib/src/tasks/update_loop.rs
+++ b/meilisearch-lib/src/tasks/update_loop.rs
@@ -1,9 +1,7 @@
 use std::sync::Arc;
-use std::time::Duration;

 use time::OffsetDateTime;
 use tokio::sync::{watch, RwLock};
-use tokio::time::interval_at;

 use super::batch::Batch;
 use super::error::Result;
@@ -17,20 +15,17 @@ pub struct UpdateLoop {
     performers: Vec<Arc<dyn BatchHandler + Send + Sync + 'static>>,

     notifier: Option<watch::Receiver<()>>,
-    debounce_duration: Option<Duration>,
 }

 impl UpdateLoop {
     pub fn new(
         scheduler: Arc<RwLock<Scheduler>>,
         performers: Vec<Arc<dyn BatchHandler + Send + Sync + 'static>>,
-        debuf_duration: Option<Duration>,
         notifier: watch::Receiver<()>,
     ) -> Self {
         Self {
             scheduler,
             performers,
-            debounce_duration: debuf_duration,
             notifier: Some(notifier),
         }
     }
@@ -43,11 +38,6 @@ impl UpdateLoop {
                 break;
             }

-            if let Some(t) = self.debounce_duration {
-                let mut interval = interval_at(tokio::time::Instant::now() + t, t);
-                interval.tick().await;
-            };
-
             if let Err(e) = self.process_next_batch().await {
                 log::error!("an error occurred while processing an update batch: {}", e);
             }

From e6f03f82df7a75a45e918dc995cb0904316545e6 Mon Sep 17 00:00:00 2001
From: ManyTheFish
Date: Thu, 28 Jul 2022 15:33:54 +0200
Subject: [PATCH 5/8] Fix clippy warnings

---
 meilisearch-http/src/lib.rs            | 2 +-
 meilisearch-lib/src/dump/error.rs      | 8 +++++++-
 meilisearch-lib/src/tasks/scheduler.rs | 2 +-
 3 files changed, 9 insertions(+), 3 deletions(-)

diff --git a/meilisearch-http/src/lib.rs b/meilisearch-http/src/lib.rs
index 2169bdb37..858f49924 100644
--- a/meilisearch-http/src/lib.rs
+++ b/meilisearch-http/src/lib.rs
@@ -30,7 +30,7 @@ pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result<MeiliSearch> {
     let mut meilisearch = MeiliSearch::builder();

     // disable autobatching?
-    let _ = AUTOBATCHING_ENABLED.store(
+    AUTOBATCHING_ENABLED.store(
         opt.scheduler_options.disable_auto_batching,
         std::sync::atomic::Ordering::Relaxed,
     );
diff --git a/meilisearch-lib/src/dump/error.rs b/meilisearch-lib/src/dump/error.rs
index 3f6e2aae5..679fa2bc2 100644
--- a/meilisearch-lib/src/dump/error.rs
+++ b/meilisearch-lib/src/dump/error.rs
@@ -11,7 +11,7 @@ pub enum DumpError {
     #[error("An internal error has occurred. `{0}`.")]
     Internal(Box<dyn std::error::Error + Send + Sync + 'static>),
     #[error("{0}")]
-    IndexResolver(#[from] IndexResolverError),
+    IndexResolver(Box<IndexResolverError>),
 }

 internal_error!(
@@ -26,6 +26,12 @@ internal_error!(
     TaskError
 );

+impl From<IndexResolverError> for DumpError {
+    fn from(e: IndexResolverError) -> Self {
+        Self::IndexResolver(Box::new(e))
+    }
+}
+
 impl ErrorCode for DumpError {
     fn error_code(&self) -> Code {
         match self {
diff --git a/meilisearch-lib/src/tasks/scheduler.rs b/meilisearch-lib/src/tasks/scheduler.rs
index a709f566e..c7a522c96 100644
--- a/meilisearch-lib/src/tasks/scheduler.rs
+++ b/meilisearch-lib/src/tasks/scheduler.rs
@@ -484,7 +484,7 @@ fn make_batch(tasks: &mut TaskQueue, config: &SchedulerConfig) -> Processing {
                     match list.peek() {
                         Some(pending) if pending.kind == kind => {
                             // We always need to process at least one task for the scheduler to make progress.
-                            if config.disable_auto_batching && task_list.len() > 0 {
+                            if config.disable_auto_batching && !task_list.is_empty() {
                                 break;
                             }
                             let pending = list.pop().unwrap();
                             task_list.push(pending.id);

From 3a48de136efa84ee337d880e0b4d7421ddb19c7b Mon Sep 17 00:00:00 2001
From: ManyTheFish
Date: Wed, 10 Aug 2022 16:42:26 +0200
Subject: [PATCH 6/8] Add autobatching test

---
 .../tests/documents/add_documents.rs | 74 +++++++++++++++++++
 1 file changed, 74 insertions(+)

diff --git a/meilisearch-http/tests/documents/add_documents.rs b/meilisearch-http/tests/documents/add_documents.rs
index cb7030051..9ccd69e63 100644
--- a/meilisearch-http/tests/documents/add_documents.rs
+++ b/meilisearch-http/tests/documents/add_documents.rs
@@ -1,5 +1,6 @@
 use crate::common::{GetAllDocumentsOptions, Server};
 use actix_web::test;
+use assert_json_diff::assert_json_include;
 use meilisearch_http::{analytics, create_app};
 use serde_json::{json, Value};
 use time::{format_description::well_known::Rfc3339, OffsetDateTime};
@@ -1095,3 +1096,76 @@ async fn add_documents_with_primary_key_twice() {
     let (response, _code) = index.get_task(1).await;
     assert_eq!(response["status"], "succeeded");
 }
+
+#[actix_rt::test]
+async fn batch_several_documents_addition() {
+    let server = Server::new().await;
+    let index = server.index("test");
+
+    let mut documents: Vec<_> = (0..150usize)
+        .into_iter()
+        .map(|id| {
+            json!(
+                {
+                    "id": id,
+                    "title": "foo",
+                    "desc": "bar"
+                }
+            )
+        })
+        .collect();
+
+    documents[100] = json!({"title": "error", "desc": "error"});
+
+    // enqueue batch of documents
+    for chunk in documents.chunks(30) {
+        index.add_documents(json!(chunk), Some("id")).await;
+    }
+
+    // wait for the first batch of documents to finish
+    index.wait_task(4).await;
+
+    // run a second completely failing batch
+    for chunk in documents.chunks(30) {
+        let mut chunk = chunk.to_vec();
+        chunk[0] = json!({"title": "error", "desc": "error"});
+
+        index.add_documents(json!(chunk), Some("id")).await;
+    }
+    // wait for the second batch of documents to finish
+    index.wait_task(9).await;
+
+    let (response, _code) = index.list_tasks().await;
+
+    // Check that the tasks have the expected statuses
+    assert_json_include!(
+        actual: response,
+        expected:
+            json!(
+                {
+                    "results": [
+                        {"uid": 9, "status": "failed"},
+                        {"uid": 8, "status": "failed"},
+                        {"uid": 7, "status": "failed"},
+                        {"uid": 6, "status": "failed"},
+                        {"uid": 5, "status": "failed"},
+                        {"uid": 4, "status": "succeeded"},
+                        {"uid": 3, "status": "failed"},
+                        {"uid": 2, "status": "succeeded"},
+                        {"uid": 1, "status": "succeeded"},
+                        {"uid": 0, "status": "succeeded"},
+                    ]
+                }
+            )
+    );
+
+    // Check if there are exactly 120 documents (150 - 30) in the index
+    let (response, code) = index
+        .get_all_documents(GetAllDocumentsOptions {
+            limit: Some(200),
+            ..Default::default()
+        })
+        .await;
+    assert_eq!(code, 200, "failed with `{}`", response);
+    assert_eq!(response["results"].as_array().unwrap().len(), 120);
+}

From ae174c2ccae1154f68207de6827d25c42b2b35f7 Mon Sep 17 00:00:00 2001
From: ManyTheFish
Date: Thu, 11 Aug 2022 13:35:35 +0200
Subject: [PATCH 7/8] Fix task serialization

---
 meilisearch-http/src/lib.rs          |  2 +-
 meilisearch-http/src/task.rs         | 20 +++++------
 .../tests/documents/add_documents.rs | 33 +++++++++++--------
 3 files changed, 31 insertions(+), 24 deletions(-)

diff --git a/meilisearch-http/src/lib.rs b/meilisearch-http/src/lib.rs
index 858f49924..fcf07587f 100644
--- a/meilisearch-http/src/lib.rs
+++ b/meilisearch-http/src/lib.rs
@@ -31,7 +31,7 @@ pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result<MeiliSearch> {

     // disable autobatching?
     AUTOBATCHING_ENABLED.store(
-        opt.scheduler_options.disable_auto_batching,
+        !opt.scheduler_options.disable_auto_batching,
         std::sync::atomic::Ordering::Relaxed,
     );
diff --git a/meilisearch-http/src/task.rs b/meilisearch-http/src/task.rs
index 06bba1f76..08009f7da 100644
--- a/meilisearch-http/src/task.rs
+++ b/meilisearch-http/src/task.rs
@@ -231,7 +231,7 @@ pub struct TaskView {
     #[serde(serialize_with = "time::serde::rfc3339::option::serialize")]
     finished_at: Option<OffsetDateTime>,
     #[serde(skip_serializing_if = "Option::is_none")]
-    batch_uid: Option<Option<BatchId>>,
+    batch_uid: Option<BatchId>,
 }

 impl From<Task> for TaskView {
@@ -380,15 +380,15 @@ impl From<Task> for TaskView {

         let duration = finished_at.zip(started_at).map(|(tf, ts)| (tf - ts));

-        let batch_uid = if AUTOBATCHING_ENABLED.load(std::sync::atomic::Ordering::Relaxed) {
-            let id = events.iter().find_map(|e| match e {
-                TaskEvent::Batched { batch_id, .. } => Some(*batch_id),
-                _ => None,
-            });
-            Some(id)
-        } else {
-            None
-        };
+        let batch_uid = AUTOBATCHING_ENABLED
+            .load(std::sync::atomic::Ordering::Relaxed)
+            .then(|| {
+                events.iter().find_map(|e| match e {
+                    TaskEvent::Batched { batch_id, .. } => Some(*batch_id),
+                    _ => None,
+                })
+            })
+            .flatten();

         Self {
             uid: id,
diff --git a/meilisearch-http/tests/documents/add_documents.rs b/meilisearch-http/tests/documents/add_documents.rs
index 9ccd69e63..dae03d435 100644
--- a/meilisearch-http/tests/documents/add_documents.rs
+++ b/meilisearch-http/tests/documents/add_documents.rs
@@ -1126,10 +1126,10 @@ async fn batch_several_documents_addition() {
     index.wait_task(4).await;

     // run a second completely failing batch
+    documents[40] = json!({"title": "error", "desc": "error"});
+    documents[70] = json!({"title": "error", "desc": "error"});
+    documents[130] = json!({"title": "error", "desc": "error"});
     for chunk in documents.chunks(30) {
-        let mut chunk = chunk.to_vec();
-        chunk[0] = json!({"title": "error", "desc": "error"});
-
-        index.add_documents(json!(chunk), Some("id")).await;
+        index.add_documents(json!(chunk), Some("id")).await;
     }
     // wait for the second batch of documents to finish
     index.wait_task(9).await;
@@ -1144,16 +1144,23 @@ async fn batch_several_documents_addition() {
             json!(
                 {
                     "results": [
-                        {"uid": 9, "status": "failed"},
-                        {"uid": 8, "status": "failed"},
-                        {"uid": 7, "status": "failed"},
-                        {"uid": 6, "status": "failed"},
-                        {"uid": 5, "status": "failed"},
-                        {"uid": 4, "status": "succeeded"},
-                        {"uid": 3, "status": "failed"},
-                        {"uid": 2, "status": "succeeded"},
-                        {"uid": 1, "status": "succeeded"},
-                        {"uid": 0, "status": "succeeded"},
+                        // Completely failing batch
+                        {"uid": 9, "status": "failed", "batchUid": 6},
+                        {"uid": 8, "status": "failed", "batchUid": 6},
+                        {"uid": 7, "status": "failed", "batchUid": 6},
+                        {"uid": 6, "status": "failed", "batchUid": 6},
+
+                        // Inter-batch
+                        {"uid": 5, "status": "succeeded", "batchUid": 5},
+
+                        // 1 failure in a succeeded batch
+                        {"uid": 4, "status": "succeeded", "batchUid": 1},
+                        {"uid": 3, "status": "failed", "batchUid": 1},
+                        {"uid": 2, "status": "succeeded", "batchUid": 1},
+                        {"uid": 1, "status": "succeeded", "batchUid": 1},
+
+                        // Inter-batch
+                        {"uid": 0, "status": "succeeded", "batchUid": 0},
                     ]
                 }
             )

From b6e6a08f7dea47aaedb06117d1e477cd28dac973 Mon Sep 17 00:00:00 2001
From: ManyTheFish
Date: Tue, 16 Aug 2022 15:14:01 +0200
Subject: [PATCH 8/8] Fix CI test

---
 .../tests/documents/add_documents.rs | 41 +++++--------
 1 file changed, 10 insertions(+), 31 deletions(-)

diff --git a/meilisearch-http/tests/documents/add_documents.rs b/meilisearch-http/tests/documents/add_documents.rs
index dae03d435..ebd5f58d3 100644
--- a/meilisearch-http/tests/documents/add_documents.rs
+++ b/meilisearch-http/tests/documents/add_documents.rs
@@ -1,6 +1,6 @@
 use crate::common::{GetAllDocumentsOptions, Server};
 use actix_web::test;
-use assert_json_diff::assert_json_include;
+
 use meilisearch_http::{analytics, create_app};
 use serde_json::{json, Value};
 use time::{format_description::well_known::Rfc3339, OffsetDateTime};
@@ -1118,53 +1118,32 @@ async fn batch_several_documents_addition() {
     documents[100] = json!({"title": "error", "desc": "error"});

     // enqueue batch of documents
+    let mut waiter = Vec::new();
     for chunk in documents.chunks(30) {
-        index.add_documents(json!(chunk), Some("id")).await;
+        waiter.push(index.add_documents(json!(chunk), Some("id")));
     }

     // wait for the first batch of documents to finish
+    futures::future::join_all(waiter).await;
     index.wait_task(4).await;

     // run a second completely failing batch
     documents[40] = json!({"title": "error", "desc": "error"});
     documents[70] = json!({"title": "error", "desc": "error"});
     documents[130] = json!({"title": "error", "desc": "error"});
+    let mut waiter = Vec::new();
     for chunk in documents.chunks(30) {
-        index.add_documents(json!(chunk), Some("id")).await;
+        waiter.push(index.add_documents(json!(chunk), Some("id")));
     }
     // wait for the second batch of documents to finish
+    futures::future::join_all(waiter).await;
     index.wait_task(9).await;

-    let (response, _code) = index.list_tasks().await;
+    let (response, _code) = index.filtered_tasks(&[], &["failed"]).await;

     // Check that the tasks have the expected statuses
+    println!("{}", &response);
+    assert_eq!(response["results"].as_array().unwrap().len(), 5);

     // Check if there are exactly 120 documents (150 - 30) in the index
     let (response, code) = index