From 2f1eb78b1db965ea6681b7ed26a1f726a50aa127 Mon Sep 17 00:00:00 2001 From: Tamo Date: Wed, 12 Oct 2022 03:21:25 +0200 Subject: [PATCH] refactor the Task a little bit --- Cargo.lock | 1 + dump/src/reader/compat/v5_to_v6.rs | 30 +- dump/src/reader/v6.rs | 200 ----- dump/src/writer.rs | 8 +- index-scheduler/src/autobatcher.rs | 363 ++++++--- index-scheduler/src/batch.rs | 12 +- index-scheduler/src/lib.rs | 23 +- index-scheduler/src/task.rs | 270 +------ meilisearch-http/src/routes/dump.rs | 2 +- .../src/routes/indexes/documents.rs | 10 +- meilisearch-http/src/routes/indexes/mod.rs | 4 +- .../src/routes/indexes/settings.rs | 6 +- meilisearch-http/src/routes/mod.rs | 28 +- meilisearch-http/src/routes/tasks.rs | 140 +++- meilisearch-http/src/search.rs | 694 ++++++++++++++++++ meilisearch-types/Cargo.toml | 1 + meilisearch-types/src/tasks.rs | 399 +++++----- 17 files changed, 1358 insertions(+), 833 deletions(-) delete mode 100644 dump/src/reader/v6.rs create mode 100644 meilisearch-http/src/search.rs diff --git a/Cargo.lock b/Cargo.lock index 8b420c459..02e5639e7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2370,6 +2370,7 @@ dependencies = [ "serde_json", "time 0.3.14", "tokio", + "uuid 1.1.2", ] [[package]] diff --git a/dump/src/reader/compat/v5_to_v6.rs b/dump/src/reader/compat/v5_to_v6.rs index c145b223e..9616913c8 100644 --- a/dump/src/reader/compat/v5_to_v6.rs +++ b/dump/src/reader/compat/v5_to_v6.rs @@ -72,15 +72,23 @@ impl CompatV5ToV6 { v5::Status::Succeeded => v6::Status::Succeeded, v5::Status::Failed => v6::Status::Failed, }, - kind: match &task.content { - v5::tasks::TaskContent::IndexCreation { .. } => v6::Kind::IndexCreation, - v5::tasks::TaskContent::IndexUpdate { .. } => v6::Kind::IndexUpdate, + kind: match task.content.clone() { + v5::tasks::TaskContent::IndexCreation { primary_key, .. } => { + v6::Kind::IndexCreation { primary_key } + } + v5::tasks::TaskContent::IndexUpdate { primary_key, .. } => { + v6::Kind::IndexUpdate { primary_key } + } v5::tasks::TaskContent::IndexDeletion { .. } => v6::Kind::IndexDeletion, v5::tasks::TaskContent::DocumentAddition { merge_strategy, allow_index_creation, + primary_key, + documents_count, .. } => v6::Kind::DocumentImport { + primary_key, + documents_count: documents_count as u64, method: match merge_strategy { v5::tasks::IndexDocumentsMethod::ReplaceDocuments => { v6::milli::update::IndexDocumentsMethod::ReplaceDocuments @@ -91,14 +99,22 @@ impl CompatV5ToV6 { }, allow_index_creation: allow_index_creation.clone(), }, - v5::tasks::TaskContent::DocumentDeletion { .. } => { - v6::Kind::DocumentDeletion - } + v5::tasks::TaskContent::DocumentDeletion { deletion, .. } => match deletion + { + v5::tasks::DocumentDeletion::Clear => v6::Kind::DocumentClear, + v5::tasks::DocumentDeletion::Ids(documents_ids) => { + v6::Kind::DocumentDeletion { documents_ids } + } + }, v5::tasks::TaskContent::SettingsUpdate { allow_index_creation, + is_deletion, + settings, .. } => v6::Kind::Settings { - allow_index_creation: allow_index_creation.clone(), + is_deletion, + allow_index_creation, + settings: settings.into(), }, v5::tasks::TaskContent::Dump { .. 
} => v6::Kind::DumpExport, }, diff --git a/dump/src/reader/v6.rs b/dump/src/reader/v6.rs deleted file mode 100644 index 7ac454ad5..000000000 --- a/dump/src/reader/v6.rs +++ /dev/null @@ -1,200 +0,0 @@ -use std::{ - fs::{self, File}, - io::{BufRead, BufReader}, - path::Path, - str::FromStr, -}; - -use tempfile::TempDir; -use time::OffsetDateTime; -use uuid::Uuid; - -use crate::{Error, IndexMetadata, Result, Version}; - -pub use meilisearch_types::milli; - -use super::Document; - -pub type Metadata = crate::Metadata; - -pub type Settings<T> = meilisearch_types::settings::Settings<T>; -pub type Checked = meilisearch_types::settings::Checked; -pub type Unchecked = meilisearch_types::settings::Unchecked; - -pub type Task = meilisearch_types::tasks::TaskDump; -pub type Key = meilisearch_auth::Key; - -// ===== Other types to clarify the code of the compat module -// everything related to the tasks -pub type Status = meilisearch_types::tasks::Status; -pub type Kind = meilisearch_types::tasks::Kind; -pub type Details = meilisearch_types::tasks::Details; - -// everything related to the settings -pub type Setting<T> = meilisearch_types::milli::update::Setting<T>; -pub type TypoTolerance = meilisearch_types::settings::TypoSettings; -pub type MinWordSizeForTypos = meilisearch_types::settings::MinWordSizeTyposSetting; -pub type FacetingSettings = meilisearch_types::settings::FacetingSettings; -pub type PaginationSettings = meilisearch_types::settings::PaginationSettings; - -// everything related to the api keys -pub type Action = meilisearch_auth::Action; -pub type StarOr<T> = meilisearch_types::star_or::StarOr<T>; -pub type IndexUid = meilisearch_types::index_uid::IndexUid; - -// everything related to the errors -pub type ResponseError = meilisearch_types::error::ResponseError; -pub type Code = meilisearch_types::error::Code; - -pub struct V6Reader { - dump: TempDir, - instance_uid: Uuid, - metadata: Metadata, - tasks: BufReader<File>, - keys: BufReader<File>, -} - -impl V6Reader { - pub fn open(dump: TempDir) -> Result<V6Reader> { - let meta_file = fs::read(dump.path().join("metadata.json"))?; - let instance_uid = fs::read_to_string(dump.path().join("instance_uid.uuid"))?; - let instance_uid = Uuid::from_str(&instance_uid)?; - - Ok(V6Reader { - metadata: serde_json::from_reader(&*meta_file)?, - instance_uid, - tasks: BufReader::new(File::open(dump.path().join("tasks").join("queue.jsonl"))?), - keys: BufReader::new(File::open(dump.path().join("keys.jsonl"))?), - dump, - }) - } - - pub fn version(&self) -> Version { - Version::V6 - } - - pub fn date(&self) -> Option<OffsetDateTime> { - Some(self.metadata.dump_date) - } - - pub fn instance_uid(&self) -> Result<Option<Uuid>> { - Ok(Some(self.instance_uid)) - } - - pub fn indexes(&self) -> Result<Box<dyn Iterator<Item = Result<V6IndexReader>> + '_>> { - let entries = fs::read_dir(self.dump.path().join("indexes"))?; - Ok(Box::new( - entries - .map(|entry| -> Result<Option<V6IndexReader>> { - let entry = entry?; - if entry.file_type()?.is_dir() { - let index = V6IndexReader::new( - entry - .file_name() - .to_str() - .ok_or(Error::BadIndexName)? - .to_string(), - &entry.path(), - )?; - Ok(Some(index)) - } else { - Ok(None) - } - }) - .filter_map(|entry| entry.transpose()), - )) - } - - pub fn tasks( - &mut self, - ) -> Box<dyn Iterator<Item = Result<(Task, Option<Box<super::UpdateFile>>)>> + '_> { - Box::new((&mut self.tasks).lines().map(|line| -> Result<_> { - let task: Task = serde_json::from_str(&line?)?; - - let update_file_path = self - .dump - .path() - .join("tasks") - .join("update_files") - .join(format!("{}.jsonl", task.uid.to_string())); - - if update_file_path.exists() { - Ok(( - task, - Some(Box::new(UpdateFile::new(&update_file_path)?)
as Box<super::UpdateFile>), )) } else { Ok((task, None)) } })) } - - pub fn keys(&mut self) -> Box<dyn Iterator<Item = Result<Key>> + '_> { - Box::new( - (&mut self.keys) - .lines() - .map(|line| -> Result<_> { Ok(serde_json::from_str(&line?)?) }), - ) - } -} - -pub struct UpdateFile { - reader: BufReader<File>, -} - -impl UpdateFile { - fn new(path: &Path) -> Result<UpdateFile> { - Ok(UpdateFile { - reader: BufReader::new(File::open(path)?), - }) - } -} - -impl Iterator for UpdateFile { - type Item = Result<Document>; - - fn next(&mut self) -> Option<Self::Item> { - (&mut self.reader) - .lines() - .map(|line| { - line.map_err(Error::from) - .and_then(|line| serde_json::from_str(&line).map_err(Error::from)) - }) - .next() - } -} - -pub struct V6IndexReader { - metadata: IndexMetadata, - documents: BufReader<File>, - settings: BufReader<File>, -} - -impl V6IndexReader { - pub fn new(_name: String, path: &Path) -> Result<V6IndexReader> { - let metadata = File::open(path.join("metadata.json"))?; - - let ret = V6IndexReader { - metadata: serde_json::from_reader(metadata)?, - documents: BufReader::new(File::open(path.join("documents.jsonl"))?), - settings: BufReader::new(File::open(path.join("settings.json"))?), - }; - - Ok(ret) - } - - pub fn metadata(&self) -> &IndexMetadata { - &self.metadata - } - - pub fn documents(&mut self) -> Result<impl Iterator<Item = Result<Document>> + '_> { - Ok((&mut self.documents) - .lines() - .map(|line| -> Result<_> { Ok(serde_json::from_str(&line?)?) })) - } - - pub fn settings(&mut self) -> Result<Settings<Checked>> { - let settings: Settings<Unchecked> = serde_json::from_reader(&mut self.settings)?; - Ok(settings.check()) - } -} diff --git a/dump/src/writer.rs b/dump/src/writer.rs index bc5752fee..e5675ec30 100644 --- a/dump/src/writer.rs +++ b/dump/src/writer.rs @@ -6,8 +6,10 @@ use std::{ use flate2::{write::GzEncoder, Compression}; use meilisearch_auth::Key; -use meilisearch_types::settings::{Checked, Settings}; -use meilisearch_types::tasks::TaskDump; +use meilisearch_types::{ + settings::{Checked, Settings}, + tasks::Task, +}; use serde_json::{Map, Value}; use tempfile::TempDir; use time::OffsetDateTime; @@ -105,7 +107,7 @@ impl TaskWriter { /// Pushes tasks in the dump. /// If the tasks has an associated `update_file` it'll use the `task_id` as its name. - pub fn push_task(&mut self, task: &TaskDump) -> Result<UpdateFile> { + pub fn push_task(&mut self, task: &Task) -> Result<UpdateFile> { self.queue.write_all(&serde_json::to_vec(task)?)?; self.queue.write_all(b"\n")?; diff --git a/index-scheduler/src/autobatcher.rs b/index-scheduler/src/autobatcher.rs index 8a01ef493..897199be8 100644 --- a/index-scheduler/src/autobatcher.rs +++ b/index-scheduler/src/autobatcher.rs @@ -4,6 +4,61 @@ use meilisearch_types::milli::update::IndexDocumentsMethod::{ use meilisearch_types::tasks::{Kind, TaskId}; use std::ops::ControlFlow::{self, Break, Continue}; +use crate::KindWithContent; + +/// This enum contains the minimal necessary information +/// to make the autobatcher work. +enum AutobatchKind { + DocumentImport { + method: IndexDocumentsMethod, + allow_index_creation: bool, + }, + DocumentDeletion, + DocumentClear, + Settings { + allow_index_creation: bool, + }, + IndexCreation, + IndexDeletion, + IndexUpdate, + IndexSwap, + CancelTask, + DeleteTasks, + DumpExport, + Snapshot, +} + +impl From<KindWithContent> for AutobatchKind { + fn from(kind: KindWithContent) -> Self { + match kind { + KindWithContent::DocumentImport { + method, + allow_index_creation, + .. + } => AutobatchKind::DocumentImport { + method, + allow_index_creation, + }, + KindWithContent::DocumentDeletion { .. } => AutobatchKind::DocumentDeletion, + KindWithContent::DocumentClear { ..
} => AutobatchKind::DocumentClear, + KindWithContent::Settings { + allow_index_creation, + .. + } => AutobatchKind::Settings { + allow_index_creation, + }, + KindWithContent::IndexDeletion { .. } => AutobatchKind::IndexDeletion, + KindWithContent::IndexCreation { .. } => AutobatchKind::IndexCreation, + KindWithContent::IndexUpdate { .. } => AutobatchKind::IndexUpdate, + KindWithContent::IndexSwap { .. } => AutobatchKind::IndexSwap, + KindWithContent::CancelTask { .. } => AutobatchKind::CancelTask, + KindWithContent::DeleteTasks { .. } => AutobatchKind::DeleteTasks, + KindWithContent::DumpExport { .. } => AutobatchKind::DumpExport, + KindWithContent::Snapshot => AutobatchKind::Snapshot, + } + } +} + #[derive(Debug)] pub enum BatchKind { DocumentClear { @@ -48,14 +103,16 @@ pub enum BatchKind { impl BatchKind { /// Returns a `ControlFlow::Break` if you must stop right now. - pub fn new(task_id: TaskId, kind: Kind) -> ControlFlow<BatchKind, BatchKind> { - match kind { - Kind::IndexCreation => Break(BatchKind::IndexCreation { id: task_id }), - Kind::IndexDeletion => Break(BatchKind::IndexDeletion { ids: vec![task_id] }), - Kind::IndexUpdate => Break(BatchKind::IndexUpdate { id: task_id }), - Kind::IndexSwap => Break(BatchKind::IndexSwap { id: task_id }), - Kind::DocumentClear => Continue(BatchKind::DocumentClear { ids: vec![task_id] }), - Kind::DocumentImport { + pub fn new(task_id: TaskId, kind: KindWithContent) -> ControlFlow<BatchKind, BatchKind> { + use AutobatchKind as K; + + match AutobatchKind::from(kind) { + K::IndexCreation => Break(BatchKind::IndexCreation { id: task_id }), + K::IndexDeletion => Break(BatchKind::IndexDeletion { ids: vec![task_id] }), + K::IndexUpdate => Break(BatchKind::IndexUpdate { id: task_id }), + K::IndexSwap => Break(BatchKind::IndexSwap { id: task_id }), + K::DocumentClear => Continue(BatchKind::DocumentClear { ids: vec![task_id] }), + K::DocumentImport { method, allow_index_creation, } => Continue(BatchKind::DocumentImport { @@ -63,16 +120,16 @@ impl BatchKind { allow_index_creation, import_ids: vec![task_id], }), - Kind::DocumentDeletion => Continue(BatchKind::DocumentDeletion { + K::DocumentDeletion => Continue(BatchKind::DocumentDeletion { deletion_ids: vec![task_id], }), - Kind::Settings { + K::Settings { allow_index_creation, } => Continue(BatchKind::Settings { allow_index_creation, settings_ids: vec![task_id], }), - Kind::DumpExport | Kind::Snapshot | Kind::CancelTask | Kind::DeleteTasks => { + K::DumpExport | K::Snapshot | K::CancelTask | K::DeleteTasks => { unreachable!() } } @@ -80,17 +137,19 @@ impl BatchKind { /// Returns a `ControlFlow::Break` if you must stop right now.
#[rustfmt::skip] - fn accumulate(self, id: TaskId, kind: Kind) -> ControlFlow<BatchKind, BatchKind> { + fn accumulate(self, id: TaskId, kind: AutobatchKind) -> ControlFlow<BatchKind, BatchKind> { + use AutobatchKind as K; + match (self, kind) { // We don't batch any of these operations - (this, Kind::IndexCreation | Kind::IndexUpdate | Kind::IndexSwap) => Break(this), + (this, K::IndexCreation | K::IndexUpdate | K::IndexSwap) => Break(this), // The index deletion can batch with everything but must stop after ( BatchKind::DocumentClear { mut ids } | BatchKind::DocumentDeletion { deletion_ids: mut ids } | BatchKind::DocumentImport { method: _, allow_index_creation: _, import_ids: mut ids } | BatchKind::Settings { allow_index_creation: _, settings_ids: mut ids }, - Kind::IndexDeletion, + K::IndexDeletion, ) => { ids.push(id); Break(BatchKind::IndexDeletion { ids }) } ( BatchKind::ClearAndSettings { settings_ids: mut ids, allow_index_creation: _, mut other } | BatchKind::SettingsAndDocumentImport { import_ids: mut ids, method: _, allow_index_creation: _, settings_ids: mut other }, - Kind::IndexDeletion, + K::IndexDeletion, ) => { ids.push(id); ids.append(&mut other); Break(BatchKind::IndexDeletion { ids }) } ( BatchKind::DocumentClear { mut ids }, - Kind::DocumentClear | Kind::DocumentDeletion, + K::DocumentClear | K::DocumentDeletion, ) => { ids.push(id); Continue(BatchKind::DocumentClear { ids }) } ( this @ BatchKind::DocumentClear { .. }, - Kind::DocumentImport { .. } | Kind::Settings { .. }, + K::DocumentImport { .. } | K::Settings { .. }, ) => Break(this), ( BatchKind::DocumentImport { method: _, allow_index_creation: _, import_ids: mut ids }, - Kind::DocumentClear, + K::DocumentClear, ) => { ids.push(id); Continue(BatchKind::DocumentClear { ids }) } // we only batch the imports that are allowed to create an index // or document imports not allowed to create an index if the first operation can. ( this @ BatchKind::DocumentImport { method: _, allow_index_creation: false, .. }, - Kind::DocumentImport { method: _, allow_index_creation: true }, + K::DocumentImport { method: _, allow_index_creation: true }, ) => Break(this), // we can autobatch the same kind of document additions / updates ( BatchKind::DocumentImport { method: ReplaceDocuments, allow_index_creation, mut import_ids }, - Kind::DocumentImport { method: ReplaceDocuments, .. }, + K::DocumentImport { method: ReplaceDocuments, .. }, ) => { import_ids.push(id); Continue(BatchKind::DocumentImport { method: ReplaceDocuments, allow_index_creation, import_ids, }) } ( BatchKind::DocumentImport { method: UpdateDocuments, allow_index_creation, mut import_ids }, - Kind::DocumentImport { method: UpdateDocuments, .. }, + K::DocumentImport { method: UpdateDocuments, .. }, ) => { import_ids.push(id); Continue(BatchKind::DocumentImport { method: UpdateDocuments, allow_index_creation, import_ids, }) } // but we can't autobatch documents if it's not the same kind // this match branch MUST be AFTER the previous one ( this @ BatchKind::DocumentImport { .. }, - Kind::DocumentDeletion | Kind::DocumentImport { .. }, + K::DocumentDeletion | K::DocumentImport { .. }, ) => Break(this), // We only want to batch together document imports that are allowed to create the index // or document imports not allowed to create an index if the first operation can. ( this @ BatchKind::DocumentImport { allow_index_creation: false, .. }, - Kind::Settings { allow_index_creation: true }, + K::Settings { allow_index_creation: true }, ) => Break(this), ( BatchKind::DocumentImport { method, allow_index_creation, import_ids }, - Kind::Settings { .. }, + K::Settings { ..
}, ) => Continue(BatchKind::SettingsAndDocumentImport { settings_ids: vec![id], method, @@ -178,20 +237,20 @@ impl BatchKind { import_ids, }), - (BatchKind::DocumentDeletion { mut deletion_ids }, Kind::DocumentClear) => { + (BatchKind::DocumentDeletion { mut deletion_ids }, K::DocumentClear) => { deletion_ids.push(id); Continue(BatchKind::DocumentClear { ids: deletion_ids }) } - (this @ BatchKind::DocumentDeletion { .. }, Kind::DocumentImport { .. }) => Break(this), - (BatchKind::DocumentDeletion { mut deletion_ids }, Kind::DocumentDeletion) => { + (this @ BatchKind::DocumentDeletion { .. }, K::DocumentImport { .. }) => Break(this), + (BatchKind::DocumentDeletion { mut deletion_ids }, K::DocumentDeletion) => { deletion_ids.push(id); Continue(BatchKind::DocumentDeletion { deletion_ids }) } - (this @ BatchKind::DocumentDeletion { .. }, Kind::Settings { .. }) => Break(this), + (this @ BatchKind::DocumentDeletion { .. }, K::Settings { .. }) => Break(this), ( BatchKind::Settings { settings_ids, allow_index_creation }, - Kind::DocumentClear, + K::DocumentClear, ) => Continue(BatchKind::ClearAndSettings { settings_ids: settings_ids, allow_index_creation, @@ -199,15 +258,15 @@ impl BatchKind { }), ( this @ BatchKind::Settings { .. }, - Kind::DocumentImport { .. } | Kind::DocumentDeletion, + K::DocumentImport { .. } | K::DocumentDeletion, ) => Break(this), ( this @ BatchKind::Settings { allow_index_creation: false, .. }, - Kind::Settings { allow_index_creation: true }, + K::Settings { allow_index_creation: true }, ) => Break(this), ( BatchKind::Settings { mut settings_ids, allow_index_creation }, - Kind::Settings { .. }, + K::Settings { .. }, ) => { settings_ids.push(id); Continue(BatchKind::Settings { @@ -218,7 +277,7 @@ impl BatchKind { ( BatchKind::ClearAndSettings { mut other, settings_ids, allow_index_creation }, - Kind::DocumentClear, + K::DocumentClear, ) => { other.push(id); Continue(BatchKind::ClearAndSettings { @@ -227,14 +286,14 @@ impl BatchKind { allow_index_creation, }) } - (this @ BatchKind::ClearAndSettings { .. }, Kind::DocumentImport { .. }) => Break(this), + (this @ BatchKind::ClearAndSettings { .. }, K::DocumentImport { .. }) => Break(this), ( BatchKind::ClearAndSettings { mut other, settings_ids, allow_index_creation, }, - Kind::DocumentDeletion, + K::DocumentDeletion, ) => { other.push(id); Continue(BatchKind::ClearAndSettings { @@ -245,13 +304,13 @@ impl BatchKind { } ( this @ BatchKind::ClearAndSettings { allow_index_creation: false, .. }, - Kind::Settings { + K::Settings { allow_index_creation: true, }, ) => Break(this), ( BatchKind::ClearAndSettings { mut settings_ids, other, allow_index_creation }, - Kind::Settings { .. }, + K::Settings { .. }, ) => { settings_ids.push(id); Continue(BatchKind::ClearAndSettings { @@ -262,7 +321,7 @@ impl BatchKind { } ( BatchKind::SettingsAndDocumentImport { settings_ids, method: _, import_ids: mut other, allow_index_creation }, - Kind::DocumentClear, + K::DocumentClear, ) => { other.push(id); Continue(BatchKind::ClearAndSettings { @@ -275,11 +334,11 @@ impl BatchKind { // we can batch the settings with a kind of document operation with the same kind of document operation ( this @ BatchKind::SettingsAndDocumentImport { allow_index_creation: false, .. }, - Kind::DocumentImport { allow_index_creation: true, .. }, + K::DocumentImport { allow_index_creation: true, .. 
}, ) => Break(this), ( BatchKind::SettingsAndDocumentImport { settings_ids, method: ReplaceDocuments, mut import_ids, allow_index_creation }, - Kind::DocumentImport { method: ReplaceDocuments, .. }, + K::DocumentImport { method: ReplaceDocuments, .. }, ) => { import_ids.push(id); Continue(BatchKind::SettingsAndDocumentImport { settings_ids, method: ReplaceDocuments, allow_index_creation, import_ids, }) } ( BatchKind::SettingsAndDocumentImport { settings_ids, method: UpdateDocuments, allow_index_creation, mut import_ids }, - Kind::DocumentImport { method: UpdateDocuments, .. }, + K::DocumentImport { method: UpdateDocuments, .. }, ) => { import_ids.push(id); Continue(BatchKind::SettingsAndDocumentImport { settings_ids, method: UpdateDocuments, allow_index_creation, import_ids, }) } // But we can't batch a settings and a doc op with another doc op // this MUST be AFTER the two previous branch ( this @ BatchKind::SettingsAndDocumentImport { .. }, - Kind::DocumentDeletion | Kind::DocumentImport { .. }, + K::DocumentDeletion | K::DocumentImport { .. }, ) => Break(this), ( this @ BatchKind::SettingsAndDocumentImport { allow_index_creation: false, .. }, - Kind::Settings { allow_index_creation: true }, + K::Settings { allow_index_creation: true }, ) => Break(this), ( BatchKind::SettingsAndDocumentImport { mut settings_ids, method, allow_index_creation, import_ids }, - Kind::Settings { .. }, + K::Settings { .. }, ) => { settings_ids.push(id); Continue(BatchKind::SettingsAndDocumentImport { settings_ids, method, allow_index_creation, import_ids, }) } - (_, Kind::CancelTask | Kind::DeleteTasks | Kind::DumpExport | Kind::Snapshot) => { + (_, K::CancelTask | K::DeleteTasks | K::DumpExport | K::Snapshot) => { unreachable!() } ( BatchKind::IndexCreation { .. } | BatchKind::IndexDeletion { .. } | BatchKind::IndexUpdate { .. } | BatchKind::IndexSwap { .. }, _, ) => { unreachable!() } } } } -pub fn autobatch(enqueued: Vec<(TaskId, Kind)>) -> Option<BatchKind> { +pub fn autobatch(enqueued: Vec<(TaskId, KindWithContent)>) -> Option<BatchKind> { let mut enqueued = enqueued.into_iter(); let (id, kind) = enqueued.next()?; let mut acc = match BatchKind::new(id, kind) { Continue(acc) => acc, Break(acc) => return Some(acc), }; for (id, kind) in enqueued { - acc = match acc.accumulate(id, kind) { + acc = match acc.accumulate(id, kind.into()) { Continue(acc) => acc, Break(acc) => return Some(acc), }; } Some(acc) } #[cfg(test)] mod tests { use crate::assert_smol_debug_snapshot; use super::*; - use Kind::*; + use uuid::Uuid; - fn autobatch_from(input: impl IntoIterator<Item = Kind>) -> Option<BatchKind> { + fn autobatch_from(input: impl IntoIterator<Item = KindWithContent>) -> Option<BatchKind> { autobatch( input .into_iter() .enumerate() - .map(|(id, kind)| (id as TaskId, kind)) + .map(|(id, kind)| (id as TaskId, kind.into())) .collect(), ) } + fn doc_imp(method: IndexDocumentsMethod, allow_index_creation: bool) -> KindWithContent { + KindWithContent::DocumentImport { + index_uid: String::from("doggo"), + primary_key: None, + method, + content_file: Uuid::new_v4(), + documents_count: 0, + allow_index_creation, + } + } + + fn doc_del() -> KindWithContent { + KindWithContent::DocumentDeletion { + index_uid: String::from("doggo"), + documents_ids: Vec::new(), + } + } + + fn doc_clr() -> KindWithContent { + KindWithContent::DocumentClear { + index_uid: String::from("doggo"), + } + } + + fn settings(allow_index_creation: bool) -> KindWithContent { + KindWithContent::Settings { + index_uid: String::from("doggo"), + new_settings: Default::default(), + is_deletion: false, + allow_index_creation, + } + } + + fn idx_create() -> KindWithContent { + KindWithContent::IndexCreation { + index_uid: String::from("doggo"), + primary_key: None, + } + } + + fn idx_update() -> KindWithContent { + KindWithContent::IndexUpdate { + index_uid: String::from("doggo"), +
primary_key: None, + } + } + + fn idx_del() -> KindWithContent { + KindWithContent::IndexDeletion { + index_uid: String::from("doggo"), + } + } + + fn idx_swap() -> KindWithContent { + KindWithContent::IndexSwap { + lhs: String::from("doggo"), + rhs: String::from("catto"), + } + } + #[test] fn autobatch_simple_operation_together() { // we can autobatch one or multiple DocumentAddition together - assert_smol_debug_snapshot!(autobatch_from([DocumentImport { method: ReplaceDocuments, allow_index_creation: true }]), @"Some(DocumentImport { method: ReplaceDocuments, allow_index_creation: true, import_ids: [0] })"); - assert_smol_debug_snapshot!(autobatch_from([DocumentImport { method: ReplaceDocuments, allow_index_creation: true }, DocumentImport { method: ReplaceDocuments, allow_index_creation: true }, DocumentImport { method: ReplaceDocuments, allow_index_creation: true }]), @"Some(DocumentImport { method: ReplaceDocuments, allow_index_creation: true, import_ids: [0, 1, 2] })"); + assert_smol_debug_snapshot!(autobatch_from([doc_imp(ReplaceDocuments, true)]), @"Some(DocumentImport { method: ReplaceDocuments, allow_index_creation: true, import_ids: [0] })"); + assert_smol_debug_snapshot!(autobatch_from([doc_imp(ReplaceDocuments, true), doc_imp( ReplaceDocuments, true ), doc_imp(ReplaceDocuments, true )]), @"Some(DocumentImport { method: ReplaceDocuments, allow_index_creation: true, import_ids: [0, 1, 2] })"); // we can autobatch one or multiple DocumentUpdate together - assert_smol_debug_snapshot!(autobatch_from([DocumentImport { method: UpdateDocuments, allow_index_creation: true }]), @"Some(DocumentImport { method: UpdateDocuments, allow_index_creation: true, import_ids: [0] })"); - assert_smol_debug_snapshot!(autobatch_from([DocumentImport { method: UpdateDocuments, allow_index_creation: true }, DocumentImport { method: UpdateDocuments, allow_index_creation: true }, DocumentImport { method: UpdateDocuments, allow_index_creation: true }]), @"Some(DocumentImport { method: UpdateDocuments, allow_index_creation: true, import_ids: [0, 1, 2] })"); + assert_smol_debug_snapshot!(autobatch_from([doc_imp(UpdateDocuments, true)]), @"Some(DocumentImport { method: UpdateDocuments, allow_index_creation: true, import_ids: [0] })"); + assert_smol_debug_snapshot!(autobatch_from([doc_imp(UpdateDocuments, true), doc_imp(UpdateDocuments, true), doc_imp(UpdateDocuments, true)]), @"Some(DocumentImport { method: UpdateDocuments, allow_index_creation: true, import_ids: [0, 1, 2] })"); // we can autobatch one or multiple DocumentDeletion together - assert_smol_debug_snapshot!(autobatch_from([DocumentDeletion]), @"Some(DocumentDeletion { deletion_ids: [0] })"); - assert_smol_debug_snapshot!(autobatch_from([DocumentDeletion, DocumentDeletion, DocumentDeletion]), @"Some(DocumentDeletion { deletion_ids: [0, 1, 2] })"); + assert_smol_debug_snapshot!(autobatch_from([doc_del()]), @"Some(DocumentDeletion { deletion_ids: [0] })"); + assert_smol_debug_snapshot!(autobatch_from([doc_del(), doc_del(), doc_del()]), @"Some(DocumentDeletion { deletion_ids: [0, 1, 2] })"); // we can autobatch one or multiple Settings together - assert_smol_debug_snapshot!(autobatch_from([Settings { allow_index_creation: true }]), @"Some(Settings { allow_index_creation: true, settings_ids: [0] })"); - assert_smol_debug_snapshot!(autobatch_from([Settings { allow_index_creation: true }, Settings { allow_index_creation: true }, Settings { allow_index_creation: true }]), @"Some(Settings { allow_index_creation: true, settings_ids: [0, 1, 2] })"); + 
assert_smol_debug_snapshot!(autobatch_from([settings(true)]), @"Some(Settings { allow_index_creation: true, settings_ids: [0] })"); + assert_smol_debug_snapshot!(autobatch_from([settings(true), settings(true), settings(true)]), @"Some(Settings { allow_index_creation: true, settings_ids: [0, 1, 2] })"); } #[test] fn simple_document_operation_dont_autobatch_with_other() { // addition, updates and deletion can't batch together - assert_smol_debug_snapshot!(autobatch_from([DocumentImport { method: ReplaceDocuments, allow_index_creation: true }, DocumentImport { method: UpdateDocuments, allow_index_creation: true }]), @"Some(DocumentImport { method: ReplaceDocuments, allow_index_creation: true, import_ids: [0] })"); - assert_smol_debug_snapshot!(autobatch_from([DocumentImport { method: ReplaceDocuments, allow_index_creation: true }, DocumentDeletion]), @"Some(DocumentImport { method: ReplaceDocuments, allow_index_creation: true, import_ids: [0] })"); - assert_smol_debug_snapshot!(autobatch_from([DocumentImport { method: UpdateDocuments, allow_index_creation: true }, DocumentImport { method: ReplaceDocuments, allow_index_creation: true }]), @"Some(DocumentImport { method: UpdateDocuments, allow_index_creation: true, import_ids: [0] })"); - assert_smol_debug_snapshot!(autobatch_from([DocumentImport { method: UpdateDocuments, allow_index_creation: true }, DocumentDeletion]), @"Some(DocumentImport { method: UpdateDocuments, allow_index_creation: true, import_ids: [0] })"); - assert_smol_debug_snapshot!(autobatch_from([DocumentDeletion, DocumentImport { method: ReplaceDocuments, allow_index_creation: true }]), @"Some(DocumentDeletion { deletion_ids: [0] })"); - assert_smol_debug_snapshot!(autobatch_from([DocumentDeletion, DocumentImport { method: UpdateDocuments, allow_index_creation: true }]), @"Some(DocumentDeletion { deletion_ids: [0] })"); + assert_smol_debug_snapshot!(autobatch_from([doc_imp(ReplaceDocuments, true), doc_imp(UpdateDocuments, true)]), @"Some(DocumentImport { method: ReplaceDocuments, allow_index_creation: true, import_ids: [0] })"); + assert_smol_debug_snapshot!(autobatch_from([doc_imp(ReplaceDocuments, true), doc_del()]), @"Some(DocumentImport { method: ReplaceDocuments, allow_index_creation: true, import_ids: [0] })"); + assert_smol_debug_snapshot!(autobatch_from([doc_imp(UpdateDocuments, true), doc_imp(ReplaceDocuments, true)]), @"Some(DocumentImport { method: UpdateDocuments, allow_index_creation: true, import_ids: [0] })"); + assert_smol_debug_snapshot!(autobatch_from([doc_imp(UpdateDocuments, true), doc_del()]), @"Some(DocumentImport { method: UpdateDocuments, allow_index_creation: true, import_ids: [0] })"); + assert_smol_debug_snapshot!(autobatch_from([doc_del(), doc_imp(ReplaceDocuments, true)]), @"Some(DocumentDeletion { deletion_ids: [0] })"); + assert_smol_debug_snapshot!(autobatch_from([doc_del(), doc_imp(UpdateDocuments, true)]), @"Some(DocumentDeletion { deletion_ids: [0] })"); - assert_smol_debug_snapshot!(autobatch_from([DocumentImport { method: ReplaceDocuments, allow_index_creation: true }, IndexCreation]), @"Some(DocumentImport { method: ReplaceDocuments, allow_index_creation: true, import_ids: [0] })"); - assert_smol_debug_snapshot!(autobatch_from([DocumentImport { method: UpdateDocuments, allow_index_creation: true }, IndexCreation]), @"Some(DocumentImport { method: UpdateDocuments, allow_index_creation: true, import_ids: [0] })"); - assert_smol_debug_snapshot!(autobatch_from([DocumentDeletion, IndexCreation]), @"Some(DocumentDeletion { deletion_ids: [0] 
})"); + assert_smol_debug_snapshot!(autobatch_from([doc_imp(ReplaceDocuments, true), idx_create()]), @"Some(DocumentImport { method: ReplaceDocuments, allow_index_creation: true, import_ids: [0] })"); + assert_smol_debug_snapshot!(autobatch_from([doc_imp(UpdateDocuments, true), idx_create()]), @"Some(DocumentImport { method: UpdateDocuments, allow_index_creation: true, import_ids: [0] })"); + assert_smol_debug_snapshot!(autobatch_from([doc_del(), idx_create()]), @"Some(DocumentDeletion { deletion_ids: [0] })"); - assert_smol_debug_snapshot!(autobatch_from([DocumentImport { method: ReplaceDocuments, allow_index_creation: true }, IndexUpdate]), @"Some(DocumentImport { method: ReplaceDocuments, allow_index_creation: true, import_ids: [0] })"); - assert_smol_debug_snapshot!(autobatch_from([DocumentImport { method: UpdateDocuments, allow_index_creation: true }, IndexUpdate]), @"Some(DocumentImport { method: UpdateDocuments, allow_index_creation: true, import_ids: [0] })"); - assert_smol_debug_snapshot!(autobatch_from([DocumentDeletion, IndexUpdate]), @"Some(DocumentDeletion { deletion_ids: [0] })"); + assert_smol_debug_snapshot!(autobatch_from([doc_imp(ReplaceDocuments, true), idx_update()]), @"Some(DocumentImport { method: ReplaceDocuments, allow_index_creation: true, import_ids: [0] })"); + assert_smol_debug_snapshot!(autobatch_from([doc_imp(UpdateDocuments, true), idx_update()]), @"Some(DocumentImport { method: UpdateDocuments, allow_index_creation: true, import_ids: [0] })"); + assert_smol_debug_snapshot!(autobatch_from([doc_del(), idx_update()]), @"Some(DocumentDeletion { deletion_ids: [0] })"); - assert_smol_debug_snapshot!(autobatch_from([DocumentImport { method: ReplaceDocuments, allow_index_creation: true }, IndexSwap]), @"Some(DocumentImport { method: ReplaceDocuments, allow_index_creation: true, import_ids: [0] })"); - assert_smol_debug_snapshot!(autobatch_from([DocumentImport { method: UpdateDocuments, allow_index_creation: true }, IndexSwap]), @"Some(DocumentImport { method: UpdateDocuments, allow_index_creation: true, import_ids: [0] })"); - assert_smol_debug_snapshot!(autobatch_from([DocumentDeletion, IndexSwap]), @"Some(DocumentDeletion { deletion_ids: [0] })"); + assert_smol_debug_snapshot!(autobatch_from([doc_imp(ReplaceDocuments, true), idx_swap()]), @"Some(DocumentImport { method: ReplaceDocuments, allow_index_creation: true, import_ids: [0] })"); + assert_smol_debug_snapshot!(autobatch_from([doc_imp(UpdateDocuments, true), idx_swap()]), @"Some(DocumentImport { method: UpdateDocuments, allow_index_creation: true, import_ids: [0] })"); + assert_smol_debug_snapshot!(autobatch_from([doc_del(), idx_swap()]), @"Some(DocumentDeletion { deletion_ids: [0] })"); } #[test] fn document_addition_batch_with_settings() { // simple case - assert_smol_debug_snapshot!(autobatch_from([DocumentImport { method: ReplaceDocuments, allow_index_creation: true }, Settings { allow_index_creation: true }]), @"Some(SettingsAndDocumentImport { settings_ids: [1], method: ReplaceDocuments, allow_index_creation: true, import_ids: [0] })"); - assert_smol_debug_snapshot!(autobatch_from([DocumentImport { method: UpdateDocuments, allow_index_creation: true }, Settings { allow_index_creation: true }]), @"Some(SettingsAndDocumentImport { settings_ids: [1], method: UpdateDocuments, allow_index_creation: true, import_ids: [0] })"); + assert_smol_debug_snapshot!(autobatch_from([doc_imp(ReplaceDocuments, true), settings(true)]), @"Some(settingsAnddoc_im(Repl)eDocuments)allow_index_creation: true, import_ids: [0] 
})"); + assert_smol_debug_snapshot!(autobatch_from([doc_imp(UpdateDocuments, true), settings(true)]), @"Some(settingsAnddoc_im(Upda)Documents)allow_index_creation: true, import_ids: [0] })"); // multiple settings and doc addition - assert_smol_debug_snapshot!(autobatch_from([DocumentImport { method: ReplaceDocuments, allow_index_creation: true }, DocumentImport { method: ReplaceDocuments, allow_index_creation: true }, Settings { allow_index_creation: true }, Settings { allow_index_creation: true }]), @"Some(SettingsAndDocumentImport { settings_ids: [2, 3], method: ReplaceDocuments, allow_index_creation: true, import_ids: [0, 1] })"); - assert_smol_debug_snapshot!(autobatch_from([DocumentImport { method: ReplaceDocuments, allow_index_creation: true }, DocumentImport { method: ReplaceDocuments, allow_index_creation: true }, Settings { allow_index_creation: true }, Settings { allow_index_creation: true }]), @"Some(SettingsAndDocumentImport { settings_ids: [2, 3], method: ReplaceDocuments, allow_index_creation: true, import_ids: [0, 1] })"); + assert_smol_debug_snapshot!(autobatch_from([doc_imp(ReplaceDocuments, true), doc_imp(ReplaceDocuments, true), settings(true), settings(true)]), @"Some(settingsAnddoc_im(Repl)eDocuments)allow_index_creation: true, import_ids: [0, 1] })"); + assert_smol_debug_snapshot!(autobatch_from([doc_imp(ReplaceDocuments, true), doc_imp(ReplaceDocuments, true), settings(true), settings(true)]), @"Some(settingsAnddoc_im(Repl)eDocuments)allow_index_creation: true, import_ids: [0, 1] })"); // addition and setting unordered - assert_smol_debug_snapshot!(autobatch_from([DocumentImport { method: ReplaceDocuments, allow_index_creation: true }, Settings { allow_index_creation: true }, DocumentImport { method: ReplaceDocuments, allow_index_creation: true }, Settings { allow_index_creation: true }]), @"Some(SettingsAndDocumentImport { settings_ids: [1, 3], method: ReplaceDocuments, allow_index_creation: true, import_ids: [0, 2] })"); - assert_smol_debug_snapshot!(autobatch_from([DocumentImport { method: UpdateDocuments, allow_index_creation: true }, Settings { allow_index_creation: true }, DocumentImport { method: UpdateDocuments, allow_index_creation: true }, Settings { allow_index_creation: true }]), @"Some(SettingsAndDocumentImport { settings_ids: [1, 3], method: UpdateDocuments, allow_index_creation: true, import_ids: [0, 2] })"); + assert_smol_debug_snapshot!(autobatch_from([doc_imp(ReplaceDocuments, true), settings(true), doc_imp(ReplaceDocuments, true), settings(true)]), @"Some(settingsAnddoc_im(Repl)eDocuments)allow_index_creation: true, import_ids: [0, 2] })"); + assert_smol_debug_snapshot!(autobatch_from([doc_imp(UpdateDocuments, true), settings(true), doc_imp(UpdateDocuments, true), settings(true)]), @"Some(settingsAnddoc_im(Upda)Documents)allow_index_creation: true, import_ids: [0, 2] })"); // We ensure this kind of batch doesn't batch with forbidden operations - assert_smol_debug_snapshot!(autobatch_from([DocumentImport { method: ReplaceDocuments, allow_index_creation: true }, Settings { allow_index_creation: true }, DocumentImport { method: UpdateDocuments, allow_index_creation: true }]), @"Some(SettingsAndDocumentImport { settings_ids: [1], method: ReplaceDocuments, allow_index_creation: true, import_ids: [0] })"); - assert_smol_debug_snapshot!(autobatch_from([DocumentImport { method: UpdateDocuments, allow_index_creation: true }, Settings { allow_index_creation: true }, DocumentImport { method: ReplaceDocuments, allow_index_creation: true }]), 
@"Some(SettingsAndDocumentImport { settings_ids: [1], method: UpdateDocuments, allow_index_creation: true, import_ids: [0] })"); - assert_smol_debug_snapshot!(autobatch_from([DocumentImport { method: ReplaceDocuments, allow_index_creation: true }, Settings { allow_index_creation: true }, DocumentDeletion]), @"Some(SettingsAndDocumentImport { settings_ids: [1], method: ReplaceDocuments, allow_index_creation: true, import_ids: [0] })"); - assert_smol_debug_snapshot!(autobatch_from([DocumentImport { method: UpdateDocuments, allow_index_creation: true }, Settings { allow_index_creation: true }, DocumentDeletion]), @"Some(SettingsAndDocumentImport { settings_ids: [1], method: UpdateDocuments, allow_index_creation: true, import_ids: [0] })"); - assert_smol_debug_snapshot!(autobatch_from([DocumentImport { method: ReplaceDocuments, allow_index_creation: true }, Settings { allow_index_creation: true }, IndexCreation]), @"Some(SettingsAndDocumentImport { settings_ids: [1], method: ReplaceDocuments, allow_index_creation: true, import_ids: [0] })"); - assert_smol_debug_snapshot!(autobatch_from([DocumentImport { method: UpdateDocuments, allow_index_creation: true }, Settings { allow_index_creation: true }, IndexCreation]), @"Some(SettingsAndDocumentImport { settings_ids: [1], method: UpdateDocuments, allow_index_creation: true, import_ids: [0] })"); - assert_smol_debug_snapshot!(autobatch_from([DocumentImport { method: ReplaceDocuments, allow_index_creation: true }, Settings { allow_index_creation: true }, IndexUpdate]), @"Some(SettingsAndDocumentImport { settings_ids: [1], method: ReplaceDocuments, allow_index_creation: true, import_ids: [0] })"); - assert_smol_debug_snapshot!(autobatch_from([DocumentImport { method: UpdateDocuments, allow_index_creation: true }, Settings { allow_index_creation: true }, IndexUpdate]), @"Some(SettingsAndDocumentImport { settings_ids: [1], method: UpdateDocuments, allow_index_creation: true, import_ids: [0] })"); - assert_smol_debug_snapshot!(autobatch_from([DocumentImport { method: ReplaceDocuments, allow_index_creation: true }, Settings { allow_index_creation: true }, IndexSwap]), @"Some(SettingsAndDocumentImport { settings_ids: [1], method: ReplaceDocuments, allow_index_creation: true, import_ids: [0] })"); - assert_smol_debug_snapshot!(autobatch_from([DocumentImport { method: UpdateDocuments, allow_index_creation: true }, Settings { allow_index_creation: true }, IndexSwap]), @"Some(SettingsAndDocumentImport { settings_ids: [1], method: UpdateDocuments, allow_index_creation: true, import_ids: [0] })"); + assert_smol_debug_snapshot!(autobatch_from([doc_imp(ReplaceDocuments, true), settings(true), doc_imp(UpdateDocuments, true)]), @"Some(settingsAnddoc_im(Repl)eDocuments)allow_index_creation: true, import_ids: [0] })"); + assert_smol_debug_snapshot!(autobatch_from([doc_imp(UpdateDocuments, true), settings(true), doc_imp(ReplaceDocuments, true)]), @"Some(settingsAnddoc_im(Upda)Documents)allow_index_creation: true, import_ids: [0] })"); + assert_smol_debug_snapshot!(autobatch_from([doc_imp(ReplaceDocuments, true), settings(true), doc_del()]), @"Some(settingsAnddoc_im(Repl)eDocuments)allow_index_creation: true, import_ids: [0] })"); + assert_smol_debug_snapshot!(autobatch_from([doc_imp(UpdateDocuments, true), settings(true), doc_del()]), @"Some(settingsAnddoc_im(Upda)Documents)allow_index_creation: true, import_ids: [0] })"); + assert_smol_debug_snapshot!(autobatch_from([doc_imp(ReplaceDocuments, true), settings(true), idx_create()]), 
@"Some(settingsAnddoc_im(Repl)eDocuments)allow_index_creation: true, import_ids: [0] })"); + assert_smol_debug_snapshot!(autobatch_from([doc_imp(UpdateDocuments, true), settings(true), idx_create()]), @"Some(settingsAnddoc_im(Upda)Documents)allow_index_creation: true, import_ids: [0] })"); + assert_smol_debug_snapshot!(autobatch_from([doc_imp(ReplaceDocuments, true), settings(true), idx_update()]), @"Some(settingsAnddoc_im(Repl)eDocuments)allow_index_creation: true, import_ids: [0] })"); + assert_smol_debug_snapshot!(autobatch_from([doc_imp(UpdateDocuments, true), settings(true), idx_update()]), @"Some(settingsAnddoc_im(Upda)Documents)allow_index_creation: true, import_ids: [0] })"); + assert_smol_debug_snapshot!(autobatch_from([doc_imp(ReplaceDocuments, true), settings(true), idx_swap()]), @"Some(settingsAnddoc_im(Repl)eDocuments)allow_index_creation: true, import_ids: [0] })"); + assert_smol_debug_snapshot!(autobatch_from([doc_imp(UpdateDocuments, true), settings(true), idx_swap()]), @"Some(settingsAnddoc_im(Upda)Documents)allow_index_creation: true, import_ids: [0] })"); } #[test] fn clear_and_additions() { // these two doesn't need to batch - assert_smol_debug_snapshot!(autobatch_from([DocumentClear, DocumentImport { method: ReplaceDocuments, allow_index_creation: true }]), @"Some(DocumentClear { ids: [0] })"); - assert_smol_debug_snapshot!(autobatch_from([DocumentClear, DocumentImport { method: UpdateDocuments, allow_index_creation: true }]), @"Some(DocumentClear { ids: [0] })"); + assert_smol_debug_snapshot!(autobatch_from([doc_clr(), doc_imp(ReplaceDocuments, true)]), @"Some(doc_clr() { ids: [0] })"); + assert_smol_debug_snapshot!(autobatch_from([doc_clr(), doc_imp(UpdateDocuments, true)]), @"Some(doc_clr() { ids: [0] })"); // Basic use case - assert_smol_debug_snapshot!(autobatch_from([DocumentImport { method: ReplaceDocuments, allow_index_creation: true }, DocumentImport { method: ReplaceDocuments, allow_index_creation: true }, DocumentClear]), @"Some(DocumentClear { ids: [0, 1, 2] })"); - assert_smol_debug_snapshot!(autobatch_from([DocumentImport { method: UpdateDocuments, allow_index_creation: true }, DocumentImport { method: UpdateDocuments, allow_index_creation: true }, DocumentClear]), @"Some(DocumentClear { ids: [0, 1, 2] })"); + assert_smol_debug_snapshot!(autobatch_from([doc_imp(ReplaceDocuments, true), doc_imp(ReplaceDocuments, true), doc_clr()]), @"Some(doc_clr() { ids: [0, 1, 2] })"); + assert_smol_debug_snapshot!(autobatch_from([doc_imp(UpdateDocuments, true), doc_imp(UpdateDocuments, true), doc_clr()]), @"Some(doc_clr() { ids: [0, 1, 2] })"); // This batch kind doesn't mix with other document addition - assert_smol_debug_snapshot!(autobatch_from([DocumentImport { method: ReplaceDocuments, allow_index_creation: true }, DocumentImport { method: ReplaceDocuments, allow_index_creation: true }, DocumentClear, DocumentImport { method: ReplaceDocuments, allow_index_creation: true }]), @"Some(DocumentClear { ids: [0, 1, 2] })"); - assert_smol_debug_snapshot!(autobatch_from([DocumentImport { method: UpdateDocuments, allow_index_creation: true }, DocumentImport { method: UpdateDocuments, allow_index_creation: true }, DocumentClear, DocumentImport { method: UpdateDocuments, allow_index_creation: true }]), @"Some(DocumentClear { ids: [0, 1, 2] })"); + assert_smol_debug_snapshot!(autobatch_from([doc_imp(ReplaceDocuments, true), doc_imp(ReplaceDocuments, true), doc_clr(), doc_imp(ReplaceDocuments, true)]), @"Some(doc_clr() { ids: [0, 1, 2] })"); + 
assert_smol_debug_snapshot!(autobatch_from([doc_imp(UpdateDocuments, true), doc_imp(UpdateDocuments, true), doc_clr(), doc_imp(UpdateDocuments, true)]), @"Some(DocumentClear { ids: [0, 1, 2] })"); // But you can batch multiple clear together - assert_smol_debug_snapshot!(autobatch_from([DocumentImport { method: ReplaceDocuments, allow_index_creation: true }, DocumentImport { method: ReplaceDocuments, allow_index_creation: true }, DocumentClear, DocumentClear, DocumentClear]), @"Some(DocumentClear { ids: [0, 1, 2, 3, 4] })"); - assert_smol_debug_snapshot!(autobatch_from([DocumentImport { method: UpdateDocuments, allow_index_creation: true }, DocumentImport { method: UpdateDocuments, allow_index_creation: true }, DocumentClear, DocumentClear, DocumentClear]), @"Some(DocumentClear { ids: [0, 1, 2, 3, 4] })"); + assert_smol_debug_snapshot!(autobatch_from([doc_imp(ReplaceDocuments, true), doc_imp(ReplaceDocuments, true), doc_clr(), doc_clr(), doc_clr()]), @"Some(DocumentClear { ids: [0, 1, 2, 3, 4] })"); + assert_smol_debug_snapshot!(autobatch_from([doc_imp(UpdateDocuments, true), doc_imp(UpdateDocuments, true), doc_clr(), doc_clr(), doc_clr()]), @"Some(DocumentClear { ids: [0, 1, 2, 3, 4] })"); } #[test] fn clear_and_additions_and_settings() { // A clear don't need to autobatch the settings that happens AFTER there is no documents - assert_smol_debug_snapshot!(autobatch_from([DocumentClear, Settings { allow_index_creation: true }]), @"Some(DocumentClear { ids: [0] })"); + assert_smol_debug_snapshot!(autobatch_from([doc_clr(), settings(true)]), @"Some(DocumentClear { ids: [0] })"); - assert_smol_debug_snapshot!(autobatch_from([Settings { allow_index_creation: true }, DocumentClear, Settings { allow_index_creation: true }]), @"Some(ClearAndSettings { other: [1], allow_index_creation: true, settings_ids: [0, 2] })"); - assert_smol_debug_snapshot!(autobatch_from([DocumentImport { method: ReplaceDocuments, allow_index_creation: true }, Settings { allow_index_creation: true }, DocumentClear]), @"Some(ClearAndSettings { other: [0, 2], allow_index_creation: true, settings_ids: [1] })"); - assert_smol_debug_snapshot!(autobatch_from([DocumentImport { method: UpdateDocuments, allow_index_creation: true }, Settings { allow_index_creation: true }, DocumentClear]), @"Some(ClearAndSettings { other: [0, 2], allow_index_creation: true, settings_ids: [1] })"); + assert_smol_debug_snapshot!(autobatch_from([settings(true), doc_clr(), settings(true)]), @"Some(ClearAndSettings { other: [1], allow_index_creation: true, settings_ids: [0, 2] })"); + assert_smol_debug_snapshot!(autobatch_from([doc_imp(ReplaceDocuments, true), settings(true), doc_clr()]), @"Some(ClearAndSettings { other: [0, 2], allow_index_creation: true, settings_ids: [1] })"); + assert_smol_debug_snapshot!(autobatch_from([doc_imp(UpdateDocuments, true), settings(true), doc_clr()]), @"Some(ClearAndSettings { other: [0, 2], allow_index_creation: true, settings_ids: [1] })"); } #[test] fn anything_and_index_deletion() { // The indexdeletion doesn't batch with anything that happens AFTER - assert_smol_debug_snapshot!(autobatch_from([IndexDeletion, DocumentImport { method: ReplaceDocuments, allow_index_creation: true }]), @"Some(IndexDeletion { ids: [0] })"); - assert_smol_debug_snapshot!(autobatch_from([IndexDeletion, DocumentImport { method: UpdateDocuments, allow_index_creation: true }]), @"Some(IndexDeletion { ids: [0] })"); - assert_smol_debug_snapshot!(autobatch_from([IndexDeletion, DocumentDeletion]), @"Some(IndexDeletion { ids: [0] })"); -
assert_smol_debug_snapshot!(autobatch_from([IndexDeletion, DocumentClear]), @"Some(IndexDeletion { ids: [0] })"); - assert_smol_debug_snapshot!(autobatch_from([IndexDeletion, Settings { allow_index_creation: true }]), @"Some(IndexDeletion { ids: [0] })"); + assert_smol_debug_snapshot!(autobatch_from([idx_del(), doc_imp(ReplaceDocuments, true)]), @"Some(IndexDeletion { ids: [0] })"); + assert_smol_debug_snapshot!(autobatch_from([idx_del(), doc_imp(UpdateDocuments, true)]), @"Some(IndexDeletion { ids: [0] })"); + assert_smol_debug_snapshot!(autobatch_from([idx_del(), doc_del()]), @"Some(IndexDeletion { ids: [0] })"); + assert_smol_debug_snapshot!(autobatch_from([idx_del(), doc_clr()]), @"Some(IndexDeletion { ids: [0] })"); + assert_smol_debug_snapshot!(autobatch_from([idx_del(), settings(true)]), @"Some(IndexDeletion { ids: [0] })"); // The index deletion can accept almost any type of BatchKind and transform it to an IndexDeletion // First, the basic cases - assert_smol_debug_snapshot!(autobatch_from([DocumentImport { method: ReplaceDocuments, allow_index_creation: true }, IndexDeletion]), @"Some(IndexDeletion { ids: [0, 1] })"); - assert_smol_debug_snapshot!(autobatch_from([DocumentImport { method: UpdateDocuments, allow_index_creation: true }, IndexDeletion]), @"Some(IndexDeletion { ids: [0, 1] })"); - assert_smol_debug_snapshot!(autobatch_from([DocumentDeletion, IndexDeletion]), @"Some(IndexDeletion { ids: [0, 1] })"); - assert_smol_debug_snapshot!(autobatch_from([DocumentClear, IndexDeletion]), @"Some(IndexDeletion { ids: [0, 1] })"); - assert_smol_debug_snapshot!(autobatch_from([Settings { allow_index_creation: true }, IndexDeletion]), @"Some(IndexDeletion { ids: [0, 1] })"); + assert_smol_debug_snapshot!(autobatch_from([doc_imp(ReplaceDocuments, true), idx_del()]), @"Some(IndexDeletion { ids: [0, 1] })"); + assert_smol_debug_snapshot!(autobatch_from([doc_imp(UpdateDocuments, true), idx_del()]), @"Some(IndexDeletion { ids: [0, 1] })"); + assert_smol_debug_snapshot!(autobatch_from([doc_del(), idx_del()]), @"Some(IndexDeletion { ids: [0, 1] })"); + assert_smol_debug_snapshot!(autobatch_from([doc_clr(), idx_del()]), @"Some(IndexDeletion { ids: [0, 1] })"); + assert_smol_debug_snapshot!(autobatch_from([settings(true), idx_del()]), @"Some(IndexDeletion { ids: [0, 1] })"); // Then the mixed cases - assert_smol_debug_snapshot!(autobatch_from([DocumentImport { method: ReplaceDocuments, allow_index_creation: true }, Settings { allow_index_creation: true }, IndexDeletion]), @"Some(IndexDeletion { ids: [0, 2, 1] })"); - assert_smol_debug_snapshot!(autobatch_from([DocumentImport { method: UpdateDocuments, allow_index_creation: true }, Settings { allow_index_creation: true }, IndexDeletion]), @"Some(IndexDeletion { ids: [0, 2, 1] })"); - assert_smol_debug_snapshot!(autobatch_from([DocumentImport { method: ReplaceDocuments, allow_index_creation: true }, Settings { allow_index_creation: true }, DocumentClear, IndexDeletion]), @"Some(IndexDeletion { ids: [1, 3, 0, 2] })"); - assert_smol_debug_snapshot!(autobatch_from([DocumentImport { method: UpdateDocuments, allow_index_creation: true }, Settings { allow_index_creation: true }, DocumentClear, IndexDeletion]), @"Some(IndexDeletion { ids: [1, 3, 0, 2] })"); + assert_smol_debug_snapshot!(autobatch_from([doc_imp(ReplaceDocuments, true), settings(true), idx_del()]), @"Some(IndexDeletion { ids: [0, 2, 1] })"); + assert_smol_debug_snapshot!(autobatch_from([doc_imp(UpdateDocuments, true), settings(true),
idx_del()]), @"Some(IndexDeletion { ids: [0, 2, 1] })"); + assert_smol_debug_snapshot!(autobatch_from([doc_imp(ReplaceDocuments, true), settings(true), doc_clr(), idx_del()]), @"Some(IndexDeletion { ids: [1, 3, 0, 2] })"); + assert_smol_debug_snapshot!(autobatch_from([doc_imp(UpdateDocuments, true), settings(true), doc_clr(), idx_del()]), @"Some(IndexDeletion { ids: [1, 3, 0, 2] })"); } #[test] fn allowed_and_disallowed_index_creation() { // DocumentImport that can create indexes can't be mixed with those disallowed to do so + assert_smol_debug_snapshot!(autobatch_from([doc_imp(ReplaceDocuments, false), doc_imp(ReplaceDocuments, true)]), @"Some(DocumentImport { method: ReplaceDocuments, allow_index_creation: false, import_ids: [0] })"); + assert_smol_debug_snapshot!(autobatch_from([doc_imp(ReplaceDocuments, true), doc_imp(ReplaceDocuments, true)]), @"Some(DocumentImport { method: ReplaceDocuments, allow_index_creation: true, import_ids: [0, 1] })"); + assert_smol_debug_snapshot!(autobatch_from([doc_imp(ReplaceDocuments, false), doc_imp(ReplaceDocuments, false)]), @"Some(DocumentImport { method: ReplaceDocuments, allow_index_creation: false, import_ids: [0, 1] })"); + assert_smol_debug_snapshot!(autobatch_from([doc_imp(ReplaceDocuments, true), settings(true)]), @"Some(SettingsAndDocumentImport { settings_ids: [1], method: ReplaceDocuments, allow_index_creation: true, import_ids: [0] })"); + assert_smol_debug_snapshot!(autobatch_from([doc_imp(ReplaceDocuments, false), settings(true)]), @"Some(DocumentImport { method: ReplaceDocuments, allow_index_creation: false, import_ids: [0] })"); } } diff --git a/index-scheduler/src/batch.rs b/index-scheduler/src/batch.rs index 853bfd602..ef1a740f0 100644 --- a/index-scheduler/src/batch.rs +++ b/index-scheduler/src/batch.rs @@ -1,10 +1,6 @@ -use crate::{ - autobatcher::BatchKind, - task::{KindWithContent, Task}, - Error, IndexScheduler, Result, TaskId, -}; +use crate::{autobatcher::BatchKind, Error, IndexScheduler, Result, TaskId}; -use meilisearch_types::tasks::{Details, Kind, Status}; +use meilisearch_types::tasks::{Details, Kind, KindWithContent, Status, Task}; use log::{debug, info}; use meilisearch_types::milli::update::IndexDocumentsConfig; @@ -432,7 +428,7 @@ impl IndexScheduler {
.map(|task_id| { self.get_task(rtxn, task_id) .and_then(|task| task.ok_or(Error::CorruptedTaskQueue)) - .map(|task| (task.uid, task.kind.as_kind())) + .map(|task| (task.uid, task.kind)) }) .collect::<Result<Vec<_>>>()?; @@ -862,9 +858,7 @@ impl IndexScheduler { (bitmap.remove(task.uid)); })?; - task.remove_data()?; self.all_tasks.delete(wtxn, &task_id)?; - Ok(()) } } diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs index 89f7aa07b..4cd56976d 100644 --- a/index-scheduler/src/lib.rs +++ b/index-scheduler/src/lib.rs @@ -11,8 +11,7 @@ pub type Result<T> = std::result::Result<T, Error>; pub type TaskId = u32; pub use error::Error; -use meilisearch_types::tasks::{Kind, Status, TaskView}; -pub use task::KindWithContent; +use meilisearch_types::tasks::{Kind, KindWithContent, Status, Task}; use std::path::PathBuf; use std::sync::{Arc, RwLock}; @@ -31,7 +30,6 @@ use meilisearch_types::milli::update::IndexerConfig; use meilisearch_types::milli::{Index, RoaringBitmapCodec, BEU32}; use crate::index_mapper::IndexMapper; -use crate::task::Task; const DEFAULT_LIMIT: fn() -> u32 = || 20; @@ -246,7 +244,7 @@ impl IndexScheduler { } /// Returns the tasks corresponding to the query. - pub fn get_tasks(&self, query: Query) -> Result<Vec<TaskView>> { + pub fn get_tasks(&self, query: Query) -> Result<Vec<Task>> { let rtxn = self.env.read_txn()?; let last_task_id = match self.last_task_id(&rtxn)? { Some(tid) => query.from.map(|from| from.min(tid)).unwrap_or(tid), @@ -292,13 +290,13 @@ impl IndexScheduler { .map_err(|_| Error::CorruptedTaskQueue)? .clone(); - let ret = tasks.into_iter().map(|task| task.as_task_view()); + let ret = tasks.into_iter(); if processing.is_empty() { Ok(ret.collect()) } else { Ok(ret .map(|task| match processing.contains(task.uid) { - true => TaskView { + true => Task { status: Status::Processing, started_at: Some(started_at), ..task @@ -311,7 +309,7 @@ impl IndexScheduler { /// Register a new task in the scheduler. If it fails and data was associated with the task /// it tries to delete the file. - pub fn register(&self, task: KindWithContent) -> Result<TaskView> { + pub fn register(&self, task: KindWithContent) -> Result<Task> { let mut wtxn = self.env.write_txn()?; let task = Task { @@ -343,13 +341,10 @@ impl IndexScheduler { (bitmap.insert(task.uid)); })?; - // we persist the file in last to be sure everything before was applied successfuly - task.persist()?; - match wtxn.commit() { Ok(()) => (), e @ Err(_) => { - task.remove_data()?; + todo!("remove the data associated with the task"); e?; } } @@ -357,7 +352,7 @@ impl IndexScheduler { // notify the scheduler loop to execute a new tick self.wake_up.signal(); - Ok(task.as_task_view()) + Ok(task) } pub fn create_update_file(&self) -> Result<(Uuid, File)> { @@ -416,7 +411,6 @@ impl IndexScheduler { task.finished_at = Some(finished_at); // TODO the info field should've been set by the process_batch function self.update_task(&mut wtxn, &task)?; - task.remove_data()?; } } // In case of a failure we must get back and patch all the tasks with the error.
@@ -430,7 +424,6 @@ impl IndexScheduler { task.error = Some(error.clone()); self.update_task(&mut wtxn, &task)?; - task.remove_data()?; } } } @@ -585,7 +578,7 @@ mod tests { assert_eq!(task.uid, idx as u32); assert_eq!(task.status, Status::Enqueued); - assert_eq!(task.kind, k); + assert_eq!(task.kind.as_kind(), k); } assert_snapshot!(snapshot_index_scheduler(&index_scheduler)); diff --git a/index-scheduler/src/task.rs b/index-scheduler/src/task.rs index 471694151..ca452cb58 100644 --- a/index-scheduler/src/task.rs +++ b/index-scheduler/src/task.rs @@ -3,278 +3,10 @@ use meilisearch_types::error::ResponseError; use meilisearch_types::milli::update::IndexDocumentsMethod; use meilisearch_types::settings::{Settings, Unchecked}; -use meilisearch_types::tasks::{Details, Kind, Status, TaskView}; +use meilisearch_types::tasks::{Details, Kind, Status}; use serde::{Deserialize, Serialize}; use std::path::PathBuf; use time::OffsetDateTime; use uuid::Uuid; use crate::TaskId; - -#[derive(Debug, PartialEq, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct Task { - pub uid: TaskId, - - #[serde(with = "time::serde::rfc3339")] - pub enqueued_at: OffsetDateTime, - #[serde(with = "time::serde::rfc3339::option")] - pub started_at: Option<OffsetDateTime>, - #[serde(with = "time::serde::rfc3339::option")] - pub finished_at: Option<OffsetDateTime>, - - pub error: Option<ResponseError>, - pub details: Option<Details>
, - - pub status: Status, - pub kind: KindWithContent, -} - -impl Task { - /// Persist all the temp files associated with the task. - pub fn persist(&self) -> Result<()> { - self.kind.persist() - } - - /// Delete all the files associated with the task. - pub fn remove_data(&self) -> Result<()> { - self.kind.remove_data() - } - - /// Return the list of indexes updated by this tasks. - pub fn indexes(&self) -> Option> { - self.kind.indexes() - } - - /// Convert a Task to a TaskView - pub fn as_task_view(&self) -> TaskView { - TaskView { - uid: self.uid, - index_uid: self - .indexes() - .and_then(|vec| vec.first().map(|i| i.to_string())), - status: self.status, - kind: self.kind.as_kind(), - details: self.details.as_ref().map(Details::as_details_view), - error: self.error.clone(), - duration: self - .started_at - .zip(self.finished_at) - .map(|(start, end)| end - start), - enqueued_at: self.enqueued_at, - started_at: self.started_at, - finished_at: self.finished_at, - } - } -} - -#[derive(Debug, PartialEq, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub enum KindWithContent { - DocumentImport { - index_uid: String, - primary_key: Option, - method: IndexDocumentsMethod, - content_file: Uuid, - documents_count: u64, - allow_index_creation: bool, - }, - DocumentDeletion { - index_uid: String, - documents_ids: Vec, - }, - DocumentClear { - index_uid: String, - }, - Settings { - index_uid: String, - new_settings: Settings, - is_deletion: bool, - allow_index_creation: bool, - }, - IndexDeletion { - index_uid: String, - }, - IndexCreation { - index_uid: String, - primary_key: Option, - }, - IndexUpdate { - index_uid: String, - primary_key: Option, - }, - IndexSwap { - lhs: String, - rhs: String, - }, - CancelTask { - tasks: Vec, - }, - DeleteTasks { - query: String, - tasks: Vec, - }, - DumpExport { - output: PathBuf, - }, - Snapshot, -} - -impl KindWithContent { - pub fn as_kind(&self) -> Kind { - match self { - KindWithContent::DocumentImport { - method, - allow_index_creation, - .. - } => Kind::DocumentImport { - method: *method, - allow_index_creation: *allow_index_creation, - }, - KindWithContent::DocumentDeletion { .. } => Kind::DocumentDeletion, - KindWithContent::DocumentClear { .. } => Kind::DocumentClear, - KindWithContent::Settings { - allow_index_creation, - .. - } => Kind::Settings { - allow_index_creation: *allow_index_creation, - }, - KindWithContent::IndexCreation { .. } => Kind::IndexCreation, - KindWithContent::IndexDeletion { .. } => Kind::IndexDeletion, - KindWithContent::IndexUpdate { .. } => Kind::IndexUpdate, - KindWithContent::IndexSwap { .. } => Kind::IndexSwap, - KindWithContent::CancelTask { .. } => Kind::CancelTask, - KindWithContent::DeleteTasks { .. } => Kind::DeleteTasks, - KindWithContent::DumpExport { .. } => Kind::DumpExport, - KindWithContent::Snapshot => Kind::Snapshot, - } - } - - pub fn persist(&self) -> Result<()> { - use KindWithContent::*; - - match self { - DocumentImport { .. } => { - // TODO: TAMO: persist the file - // content_file.persist(); - Ok(()) - } - DocumentDeletion { .. } - | DocumentClear { .. } - | Settings { .. } - | IndexCreation { .. } - | IndexDeletion { .. } - | IndexUpdate { .. } - | IndexSwap { .. } - | CancelTask { .. } - | DeleteTasks { .. } - | DumpExport { .. } - | Snapshot => Ok(()), // There is nothing to persist for all these tasks - } - } - - pub fn remove_data(&self) -> Result<()> { - use KindWithContent::*; - - match self { - DocumentImport { .. 
} => { - // TODO: TAMO: delete the file - // content_file.delete(); - Ok(()) - } - IndexCreation { .. } - | DocumentDeletion { .. } - | DocumentClear { .. } - | Settings { .. } - | IndexDeletion { .. } - | IndexUpdate { .. } - | IndexSwap { .. } - | CancelTask { .. } - | DeleteTasks { .. } - | DumpExport { .. } - | Snapshot => Ok(()), // There is no data associated with all these tasks - } - } - - pub fn indexes(&self) -> Option> { - use KindWithContent::*; - - match self { - DumpExport { .. } | Snapshot | CancelTask { .. } | DeleteTasks { .. } => None, - DocumentImport { index_uid, .. } - | DocumentDeletion { index_uid, .. } - | DocumentClear { index_uid } - | Settings { index_uid, .. } - | IndexCreation { index_uid, .. } - | IndexUpdate { index_uid, .. } - | IndexDeletion { index_uid } => Some(vec![index_uid]), - IndexSwap { lhs, rhs } => Some(vec![lhs, rhs]), - } - } - - /// Returns the default `Details` that correspond to this `KindWithContent`, - /// `None` if it cannot be generated. - pub fn default_details(&self) -> Option
{ - match self { - KindWithContent::DocumentImport { - documents_count, .. - } => Some(Details::DocumentAddition { - received_documents: *documents_count, - indexed_documents: 0, - }), - KindWithContent::DocumentDeletion { - index_uid: _, - documents_ids, - } => Some(Details::DocumentDeletion { - received_document_ids: documents_ids.len(), - deleted_documents: None, - }), - KindWithContent::DocumentClear { .. } => Some(Details::ClearAll { - deleted_documents: None, - }), - KindWithContent::Settings { new_settings, .. } => Some(Details::Settings { - settings: new_settings.clone(), - }), - KindWithContent::IndexDeletion { .. } => None, - KindWithContent::IndexCreation { primary_key, .. } - | KindWithContent::IndexUpdate { primary_key, .. } => Some(Details::IndexInfo { - primary_key: primary_key.clone(), - }), - KindWithContent::IndexSwap { .. } => { - todo!() - } - KindWithContent::CancelTask { .. } => { - None // TODO: check correctness of this return value - } - KindWithContent::DeleteTasks { query, tasks } => Some(Details::DeleteTasks { - matched_tasks: tasks.len(), - deleted_tasks: None, - original_query: query.clone(), - }), - KindWithContent::DumpExport { .. } => None, - KindWithContent::Snapshot => None, - } - } -} - -#[cfg(test)] -mod tests { - use meilisearch_types::heed::{types::SerdeJson, BytesDecode, BytesEncode}; - - use crate::assert_smol_debug_snapshot; - - use super::Details; - - #[test] - fn bad_deser() { - let details = Details::DeleteTasks { - matched_tasks: 1, - deleted_tasks: None, - original_query: "hello".to_owned(), - }; - let serialised = SerdeJson::
<Details>::bytes_encode(&details).unwrap(); - let deserialised = SerdeJson::<Details>
<Details>::bytes_decode(&serialised).unwrap(); - assert_smol_debug_snapshot!(details, @r###"DeleteTasks { matched_tasks: 1, deleted_tasks: None, original_query: "hello" }"###); - assert_smol_debug_snapshot!(deserialised, @r###"DeleteTasks { matched_tasks: 1, deleted_tasks: None, original_query: "hello" }"###); - } -} diff --git a/meilisearch-http/src/routes/dump.rs b/meilisearch-http/src/routes/dump.rs index e0a7356cf..c792357ea 100644 --- a/meilisearch-http/src/routes/dump.rs +++ b/meilisearch-http/src/routes/dump.rs @@ -1,9 +1,9 @@ use actix_web::web::Data; use actix_web::{web, HttpRequest, HttpResponse}; use index_scheduler::IndexScheduler; -use index_scheduler::KindWithContent; use log::debug; use meilisearch_types::error::ResponseError; +use meilisearch_types::tasks::KindWithContent; use serde_json::json; use crate::analytics::Analytics; diff --git a/meilisearch-http/src/routes/indexes/documents.rs b/meilisearch-http/src/routes/indexes/documents.rs index e1f493a47..cabdae502 100644 --- a/meilisearch-http/src/routes/indexes/documents.rs +++ b/meilisearch-http/src/routes/indexes/documents.rs @@ -6,14 +6,14 @@ use actix_web::HttpMessage; use actix_web::{web, HttpRequest, HttpResponse}; use bstr::ByteSlice; use futures::StreamExt; -use index_scheduler::{IndexScheduler, KindWithContent}; +use index_scheduler::IndexScheduler; use log::debug; use meilisearch_types::document_formats::{read_csv, read_json, read_ndjson, PayloadType}; use meilisearch_types::error::ResponseError; use meilisearch_types::heed::RoTxn; use meilisearch_types::milli::update::IndexDocumentsMethod; use meilisearch_types::star_or::StarOr; -use meilisearch_types::tasks::TaskView; +use meilisearch_types::tasks::KindWithContent; use meilisearch_types::{milli, Document, Index}; use mime::Mime; use once_cell::sync::Lazy; @@ -26,7 +26,7 @@ use crate::error::MeilisearchHttpError; use crate::extractors::authentication::{policies::*, GuardedData}; use crate::extractors::payload::Payload; use crate::extractors::sequential_extractor::SeqHandler; -use crate::routes::{fold_star_or, PaginationView}; +use crate::routes::{fold_star_or, PaginationView, SummarizedTaskView}; static ACCEPTED_CONTENT_TYPE: Lazy<Vec<String>> = Lazy::new(|| { vec![ @@ -216,7 +216,7 @@ async fn document_addition( mut body: Payload, method: IndexDocumentsMethod, allow_index_creation: bool, -) -> Result<TaskView, MeilisearchHttpError> { +) -> Result<SummarizedTaskView, MeilisearchHttpError> { let format = match mime_type .as_ref() .map(|m| (m.type_().as_str(), m.subtype().as_str())) @@ -292,7 +292,7 @@ async fn document_addition( }; debug!("returns: {:?}", task); - Ok(task) + Ok(task.into()) } pub async fn delete_documents( diff --git a/meilisearch-http/src/routes/indexes/mod.rs b/meilisearch-http/src/routes/indexes/mod.rs index 9b2ed0d2b..7a6d4607f 100644 --- a/meilisearch-http/src/routes/indexes/mod.rs +++ b/meilisearch-http/src/routes/indexes/mod.rs @@ -1,10 +1,10 @@ use actix_web::web::Data; use actix_web::{web, HttpRequest, HttpResponse}; -use index_scheduler::{IndexScheduler, KindWithContent, Query}; +use index_scheduler::{IndexScheduler, Query}; use log::debug; use meilisearch_types::error::ResponseError; use meilisearch_types::milli::{self, FieldDistribution, Index}; -use meilisearch_types::tasks::Status; +use meilisearch_types::tasks::{KindWithContent, Status}; use serde::{Deserialize, Serialize}; use serde_json::json; use time::OffsetDateTime; diff --git a/meilisearch-http/src/routes/indexes/settings.rs b/meilisearch-http/src/routes/indexes/settings.rs index c74c0dbcc..f9eec1427 100644 --- a/meilisearch-http/src/routes/indexes/settings.rs +++
b/meilisearch-http/src/routes/indexes/settings.rs @@ -6,7 +6,7 @@ use fst::IntoStreamer; use log::debug; use actix_web::{web, HttpRequest, HttpResponse}; -use index_scheduler::{IndexScheduler, KindWithContent}; +use index_scheduler::IndexScheduler; use meilisearch_types::error::ResponseError; use meilisearch_types::heed::RoTxn; use meilisearch_types::milli::update::Setting; @@ -15,6 +15,7 @@ use meilisearch_types::settings::{ Checked, FacetingSettings, MinWordSizeTyposSetting, PaginationSettings, Settings, TypoSettings, Unchecked, }; +use meilisearch_types::tasks::KindWithContent; use meilisearch_types::Index; use serde_json::json; @@ -30,9 +31,10 @@ macro_rules! make_setting_route { use actix_web::{web, HttpRequest, HttpResponse, Resource}; use log::debug; - use index_scheduler::{IndexScheduler, KindWithContent}; + use index_scheduler::IndexScheduler; use meilisearch_types::milli::update::Setting; use meilisearch_types::settings::Settings; + use meilisearch_types::tasks::KindWithContent; use meilisearch_types::error::ResponseError; use $crate::analytics::Analytics; diff --git a/meilisearch-http/src/routes/mod.rs b/meilisearch-http/src/routes/mod.rs index 6ac3733f7..da0e424f2 100644 --- a/meilisearch-http/src/routes/mod.rs +++ b/meilisearch-http/src/routes/mod.rs @@ -7,7 +7,7 @@ use log::debug; use meilisearch_types::error::ResponseError; use meilisearch_types::settings::{Settings, Unchecked}; use meilisearch_types::star_or::StarOr; -use meilisearch_types::tasks::Status; +use meilisearch_types::tasks::{Kind, Status, Task, TaskId}; use serde::{Deserialize, Serialize}; use serde_json::json; use time::OffsetDateTime; @@ -49,6 +49,30 @@ where const PAGINATION_DEFAULT_LIMIT: fn() -> usize = || 20; +#[derive(Debug, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct SummarizedTaskView { + task_uid: TaskId, + index_uid: Option, + status: Status, + #[serde(rename = "type")] + kind: Kind, + #[serde(serialize_with = "time::serde::rfc3339::serialize")] + enqueued_at: OffsetDateTime, +} + +impl From for SummarizedTaskView { + fn from(task: Task) -> Self { + SummarizedTaskView { + task_uid: task.uid, + index_uid: task.index_uid().map(|s| s.to_string()), + status: task.status, + kind: task.kind.as_kind(), + enqueued_at: task.enqueued_at, + } + } +} + #[derive(Debug, Clone, Copy, Deserialize)] #[serde(rename_all = "camelCase", deny_unknown_fields)] pub struct Pagination { @@ -266,7 +290,7 @@ async fn get_stats( )?; let processing_index = processing_task .first() - .and_then(|task| task.index_uid.clone()); + .and_then(|task| task.index_uid().clone()); for (name, index) in index_scheduler.indexes()? 
{ if !search_rules.is_index_authorized(&name) { diff --git a/meilisearch-http/src/routes/tasks.rs b/meilisearch-http/src/routes/tasks.rs index 25385e2bf..878ed8383 100644 --- a/meilisearch-http/src/routes/tasks.rs +++ b/meilisearch-http/src/routes/tasks.rs @@ -3,11 +3,13 @@ use actix_web::{web, HttpRequest, HttpResponse}; use index_scheduler::{IndexScheduler, TaskId}; use meilisearch_types::error::ResponseError; use meilisearch_types::index_uid::IndexUid; +use meilisearch_types::settings::{Settings, Unchecked}; use meilisearch_types::star_or::StarOr; -use meilisearch_types::tasks::{Kind, Status}; -use serde::Deserialize; +use meilisearch_types::tasks::{serialize_duration, Details, Kind, Status, Task}; +use serde::{Deserialize, Serialize}; use serde_cs::vec::CS; use serde_json::json; +use time::{Duration, OffsetDateTime}; use crate::analytics::Analytics; use crate::extractors::authentication::{policies::*, GuardedData}; @@ -22,6 +24,140 @@ pub fn configure(cfg: &mut web::ServiceConfig) { .service(web::resource("/{task_id}").route(web::get().to(SeqHandler(get_task)))); } +#[derive(Debug, Clone, PartialEq, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct TaskView { + pub uid: TaskId, + #[serde(default)] + pub index_uid: Option<String>, + pub status: Status, + #[serde(rename = "type")] + pub kind: Kind, + + #[serde(skip_serializing_if = "Option::is_none")] + pub details: Option<DetailsView>, + #[serde(skip_serializing_if = "Option::is_none")] + pub error: Option<ResponseError>, + + #[serde( + serialize_with = "serialize_duration", + skip_serializing_if = "Option::is_none", + default + )] + pub duration: Option<Duration>, + #[serde(with = "time::serde::rfc3339")] + pub enqueued_at: OffsetDateTime, + #[serde( + with = "time::serde::rfc3339::option", + skip_serializing_if = "Option::is_none", + default + )] + pub started_at: Option<OffsetDateTime>, + #[serde( + with = "time::serde::rfc3339::option", + skip_serializing_if = "Option::is_none", + default + )] + pub finished_at: Option<OffsetDateTime>, +} + +impl From<Task> for TaskView { + fn from(task: Task) -> Self { + TaskView { + uid: task.uid, + index_uid: task + .indexes() + .and_then(|vec| vec.first().map(|i| i.to_string())), + status: task.status, + kind: task.kind.as_kind(), + details: task.details.map(DetailsView::from), + error: task.error.clone(), + duration: task + .started_at + .zip(task.finished_at) + .map(|(start, end)| end - start), + enqueued_at: task.enqueued_at, + started_at: task.started_at, + finished_at: task.finished_at, + } + } +} + +#[derive(Default, Debug, PartialEq, Clone, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct DetailsView { + #[serde(skip_serializing_if = "Option::is_none")] + pub received_documents: Option<u64>, + #[serde(skip_serializing_if = "Option::is_none")] + pub indexed_documents: Option<u64>, + #[serde(skip_serializing_if = "Option::is_none")] + pub primary_key: Option<Option<String>>, + #[serde(skip_serializing_if = "Option::is_none")] + pub received_document_ids: Option<usize>, + #[serde(skip_serializing_if = "Option::is_none")] + pub deleted_documents: Option<Option<u64>>, + #[serde(skip_serializing_if = "Option::is_none")] + pub matched_tasks: Option<usize>, + #[serde(skip_serializing_if = "Option::is_none")] + pub deleted_tasks: Option<Option<usize>>, + #[serde(skip_serializing_if = "Option::is_none")] + pub original_query: Option<String>, + #[serde(skip_serializing_if = "Option::is_none")] + pub dump_uid: Option<String>, + #[serde(skip_serializing_if = "Option::is_none")] + #[serde(flatten)] + pub settings: Option<Settings<Unchecked>>, +} +
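Why every `DetailsView` field is optional and `skip_serializing_if`-guarded: only the keys relevant to one task kind survive serialization. A hypothetical check (not part of the patch, assuming serde's usual handling of `Option` and camelCase renames):

    let details = DetailsView {
        matched_tasks: Some(2),
        deleted_tasks: Some(None), // outer Some is kept, inner None becomes JSON null
        original_query: Some("status=failed".to_string()),
        ..DetailsView::default()
    };
    // Expected JSON: {"matchedTasks":2,"deletedTasks":null,"originalQuery":"status=failed"}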
+impl From<Details> for DetailsView { + fn from(details: Details) -> Self { + match details.clone() { + Details::DocumentAddition { + received_documents, + indexed_documents, + } => DetailsView { + received_documents: Some(received_documents), + indexed_documents: Some(indexed_documents), + ..DetailsView::default() + }, + Details::Settings { settings } => DetailsView { + settings: Some(settings), + ..DetailsView::default() + }, + Details::IndexInfo { primary_key } => DetailsView { + primary_key: Some(primary_key), + ..DetailsView::default() + }, + Details::DocumentDeletion { + received_document_ids, + deleted_documents, + } => DetailsView { + received_document_ids: Some(received_document_ids), + deleted_documents: Some(deleted_documents), + ..DetailsView::default() + }, + Details::ClearAll { deleted_documents } => DetailsView { + deleted_documents: Some(deleted_documents), + ..DetailsView::default() + }, + Details::DeleteTasks { + matched_tasks, + deleted_tasks, + original_query, + } => DetailsView { + matched_tasks: Some(matched_tasks), + deleted_tasks: Some(deleted_tasks), + original_query: Some(original_query), + ..DetailsView::default() + }, + Details::Dump { dump_uid } => DetailsView { + dump_uid: Some(dump_uid), + ..DetailsView::default() + }, + } + } +} + #[derive(Deserialize, Debug)] #[serde(rename_all = "camelCase", deny_unknown_fields)] pub struct TasksFilterQuery { diff --git a/meilisearch-http/src/search.rs b/meilisearch-http/src/search.rs new file mode 100644 index 000000000..f53fdb036 --- /dev/null +++ b/meilisearch-http/src/search.rs @@ -0,0 +1,694 @@ +use std::cmp::min; +use std::collections::{BTreeMap, BTreeSet, HashSet}; +use std::str::FromStr; +use std::time::Instant; + +use either::Either; +use meilisearch_types::{milli, Document}; +use milli::tokenizer::TokenizerBuilder; +use milli::{ + AscDesc, FieldId, FieldsIdsMap, Filter, FormatOptions, Index, MatchBounds, MatcherBuilder, + SortError, TermsMatchingStrategy, DEFAULT_VALUES_PER_FACET, +}; +use regex::Regex; +use serde::{Deserialize, Serialize}; +use serde_json::{json, Value}; + +use crate::error::MeilisearchHttpError; + +type MatchesPosition = BTreeMap<String, Vec<MatchBounds>>; + +pub const DEFAULT_SEARCH_LIMIT: fn() -> usize = || 20; +pub const DEFAULT_CROP_LENGTH: fn() -> usize = || 10; +pub const DEFAULT_CROP_MARKER: fn() -> String = || "…".to_string(); +pub const DEFAULT_HIGHLIGHT_PRE_TAG: fn() -> String = || "<em>".to_string(); +pub const DEFAULT_HIGHLIGHT_POST_TAG: fn() -> String = || "</em>".to_string(); + +/// The maximum number of results that the engine +/// will be able to return in one search call.
+pub const DEFAULT_PAGINATION_MAX_TOTAL_HITS: usize = 1000; + +#[derive(Deserialize, Debug, Clone, PartialEq, Eq)] +#[serde(rename_all = "camelCase", deny_unknown_fields)] +pub struct SearchQuery { + pub q: Option, + pub offset: Option, + #[serde(default = "DEFAULT_SEARCH_LIMIT")] + pub limit: usize, + pub attributes_to_retrieve: Option>, + pub attributes_to_crop: Option>, + #[serde(default = "DEFAULT_CROP_LENGTH")] + pub crop_length: usize, + pub attributes_to_highlight: Option>, + // Default to false + #[serde(default = "Default::default")] + pub show_matches_position: bool, + pub filter: Option, + pub sort: Option>, + pub facets: Option>, + #[serde(default = "DEFAULT_HIGHLIGHT_PRE_TAG")] + pub highlight_pre_tag: String, + #[serde(default = "DEFAULT_HIGHLIGHT_POST_TAG")] + pub highlight_post_tag: String, + #[serde(default = "DEFAULT_CROP_MARKER")] + pub crop_marker: String, + #[serde(default)] + pub matching_strategy: MatchingStrategy, +} + +#[derive(Deserialize, Debug, Clone, PartialEq, Eq)] +#[serde(rename_all = "camelCase")] +pub enum MatchingStrategy { + /// Remove query words from last to first + Last, + /// All query words are mandatory + All, +} + +impl Default for MatchingStrategy { + fn default() -> Self { + Self::Last + } +} + +impl From for TermsMatchingStrategy { + fn from(other: MatchingStrategy) -> Self { + match other { + MatchingStrategy::Last => Self::Last, + MatchingStrategy::All => Self::All, + } + } +} + +#[derive(Debug, Clone, Serialize, PartialEq)] +pub struct SearchHit { + #[serde(flatten)] + pub document: Document, + #[serde(rename = "_formatted", skip_serializing_if = "Document::is_empty")] + pub formatted: Document, + #[serde(rename = "_matchesPosition", skip_serializing_if = "Option::is_none")] + pub matches_position: Option, +} + +#[derive(Serialize, Debug, Clone, PartialEq)] +#[serde(rename_all = "camelCase")] +pub struct SearchResult { + pub hits: Vec, + pub estimated_total_hits: u64, + pub query: String, + pub limit: usize, + pub offset: usize, + pub processing_time_ms: u128, + #[serde(skip_serializing_if = "Option::is_none")] + pub facet_distribution: Option>>, +} + +pub fn perform_search( + index: &Index, + query: SearchQuery, +) -> Result { + let before_search = Instant::now(); + let rtxn = index.read_txn()?; + + let mut search = index.search(&rtxn); + + if let Some(ref query) = query.q { + search.query(query); + } + + search.terms_matching_strategy(query.matching_strategy.into()); + + let max_total_hits = index + .pagination_max_total_hits(&rtxn) + .map_err(milli::Error::from)? + .unwrap_or(DEFAULT_PAGINATION_MAX_TOTAL_HITS); + + // Make sure that a user can't get more documents than the hard limit, + // we align that on the offset too. + let offset = min(query.offset.unwrap_or(0), max_total_hits); + let limit = min(query.limit, max_total_hits.saturating_sub(offset)); + + search.offset(offset); + search.limit(limit); + + if let Some(ref filter) = query.filter { + if let Some(facets) = parse_filter(filter)? { + search.filter(facets); + } + } + + if let Some(ref sort) = query.sort { + let sort = match sort.iter().map(|s| AscDesc::from_str(s)).collect() { + Ok(sorts) => sorts, + Err(asc_desc_error) => { + return Err(milli::Error::from(SortError::from(asc_desc_error)).into()) + } + }; + + search.sort_criteria(sort); + } + + let milli::SearchResult { + documents_ids, + matching_words, + candidates, + .. + } = search.execute()?; + + let fields_ids_map = index.fields_ids_map(&rtxn).unwrap(); + + let displayed_ids = index + .displayed_fields_ids(&rtxn)? 
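+// Worked example for the clamping above (illustrative numbers, not from the
+// patch): with `maxTotalHits` left at its default of 1000, a request with
+// `offset: 990, limit: 20` is clamped to offset = min(990, 1000) = 990 and
+// limit = min(20, 1000 - 990) = 10, so `offset + limit` can never exceed the
+// hard limit; an offset at or past it leaves a limit of 0, hence no hits.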
+ .map(|fields| fields.into_iter().collect::>()) + .unwrap_or_else(|| fields_ids_map.iter().map(|(id, _)| id).collect()); + + let fids = |attrs: &BTreeSet| { + let mut ids = BTreeSet::new(); + for attr in attrs { + if attr == "*" { + ids = displayed_ids.clone(); + break; + } + + if let Some(id) = fields_ids_map.id(attr) { + ids.insert(id); + } + } + ids + }; + + // The attributes to retrieve are the ones explicitly marked as to retrieve (all by default), + // but these attributes must be also be present + // - in the fields_ids_map + // - in the the displayed attributes + let to_retrieve_ids: BTreeSet<_> = query + .attributes_to_retrieve + .as_ref() + .map(fids) + .unwrap_or_else(|| displayed_ids.clone()) + .intersection(&displayed_ids) + .cloned() + .collect(); + + let attr_to_highlight = query.attributes_to_highlight.unwrap_or_default(); + + let attr_to_crop = query.attributes_to_crop.unwrap_or_default(); + + // Attributes in `formatted_options` correspond to the attributes that will be in `_formatted` + // These attributes are: + // - the attributes asked to be highlighted or cropped (with `attributesToCrop` or `attributesToHighlight`) + // - the attributes asked to be retrieved: these attributes will not be highlighted/cropped + // But these attributes must be also present in displayed attributes + let formatted_options = compute_formatted_options( + &attr_to_highlight, + &attr_to_crop, + query.crop_length, + &to_retrieve_ids, + &fields_ids_map, + &displayed_ids, + ); + + let tokenizer = TokenizerBuilder::default().build(); + + let mut formatter_builder = MatcherBuilder::new(matching_words, tokenizer); + formatter_builder.crop_marker(query.crop_marker); + formatter_builder.highlight_prefix(query.highlight_pre_tag); + formatter_builder.highlight_suffix(query.highlight_post_tag); + + let mut documents = Vec::new(); + + let documents_iter = index.documents(&rtxn, documents_ids)?; + + for (_id, obkv) in documents_iter { + // First generate a document with all the displayed fields + let displayed_document = make_document(&displayed_ids, &fields_ids_map, obkv)?; + + // select the attributes to retrieve + let attributes_to_retrieve = to_retrieve_ids + .iter() + .map(|&fid| fields_ids_map.name(fid).expect("Missing field name")); + let mut document = + permissive_json_pointer::select_values(&displayed_document, attributes_to_retrieve); + + let (matches_position, formatted) = format_fields( + &displayed_document, + &fields_ids_map, + &formatter_builder, + &formatted_options, + query.show_matches_position, + &displayed_ids, + )?; + + if let Some(sort) = query.sort.as_ref() { + insert_geo_distance(sort, &mut document); + } + + let hit = SearchHit { + document, + formatted, + matches_position, + }; + documents.push(hit); + } + + let estimated_total_hits = candidates.len(); + + let facet_distribution = match query.facets { + Some(ref fields) => { + let mut facet_distribution = index.facets_distribution(&rtxn); + + let max_values_by_facet = index + .max_values_per_facet(&rtxn) + .map_err(milli::Error::from)? 
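+// Illustrative note (not from the patch): when the index has no explicit
+// `maxValuesPerFacet` setting, milli's DEFAULT_VALUES_PER_FACET is used, and
+// a request such as { "q": "shoes", "facets": ["*"] } skips the facets()
+// restriction below, so every filterable attribute is counted.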
+ .unwrap_or(DEFAULT_VALUES_PER_FACET); + facet_distribution.max_values_per_facet(max_values_by_facet); + + if fields.iter().all(|f| f != "*") { + facet_distribution.facets(fields); + } + let distribution = facet_distribution.candidates(candidates).execute()?; + + Some(distribution) + } + None => None, + }; + + let result = SearchResult { + hits: documents, + estimated_total_hits, + query: query.q.clone().unwrap_or_default(), + limit: query.limit, + offset: query.offset.unwrap_or_default(), + processing_time_ms: before_search.elapsed().as_millis(), + facet_distribution, + }; + Ok(result) +} + +fn insert_geo_distance(sorts: &[String], document: &mut Document) { + lazy_static::lazy_static! { + static ref GEO_REGEX: Regex = + Regex::new(r"_geoPoint\(\s*([[:digit:].\-]+)\s*,\s*([[:digit:].\-]+)\s*\)").unwrap(); + }; + if let Some(capture_group) = sorts.iter().find_map(|sort| GEO_REGEX.captures(sort)) { + // TODO: TAMO: milli encountered an internal error, what do we want to do? + let base = [ + capture_group[1].parse().unwrap(), + capture_group[2].parse().unwrap(), + ]; + let geo_point = &document.get("_geo").unwrap_or(&json!(null)); + if let Some((lat, lng)) = geo_point["lat"].as_f64().zip(geo_point["lng"].as_f64()) { + let distance = milli::distance_between_two_points(&base, &[lat, lng]); + document.insert("_geoDistance".to_string(), json!(distance.round() as usize)); + } + } +} + +fn compute_formatted_options( + attr_to_highlight: &HashSet, + attr_to_crop: &[String], + query_crop_length: usize, + to_retrieve_ids: &BTreeSet, + fields_ids_map: &FieldsIdsMap, + displayed_ids: &BTreeSet, +) -> BTreeMap { + let mut formatted_options = BTreeMap::new(); + + add_highlight_to_formatted_options( + &mut formatted_options, + attr_to_highlight, + fields_ids_map, + displayed_ids, + ); + + add_crop_to_formatted_options( + &mut formatted_options, + attr_to_crop, + query_crop_length, + fields_ids_map, + displayed_ids, + ); + + // Should not return `_formatted` if no valid attributes to highlight/crop + if !formatted_options.is_empty() { + add_non_formatted_ids_to_formatted_options(&mut formatted_options, to_retrieve_ids); + } + + formatted_options +} + +fn add_highlight_to_formatted_options( + formatted_options: &mut BTreeMap, + attr_to_highlight: &HashSet, + fields_ids_map: &FieldsIdsMap, + displayed_ids: &BTreeSet, +) { + for attr in attr_to_highlight { + let new_format = FormatOptions { + highlight: true, + crop: None, + }; + + if attr == "*" { + for id in displayed_ids { + formatted_options.insert(*id, new_format); + } + break; + } + + if let Some(id) = fields_ids_map.id(attr) { + if displayed_ids.contains(&id) { + formatted_options.insert(id, new_format); + } + } + } +} + +fn add_crop_to_formatted_options( + formatted_options: &mut BTreeMap, + attr_to_crop: &[String], + crop_length: usize, + fields_ids_map: &FieldsIdsMap, + displayed_ids: &BTreeSet, +) { + for attr in attr_to_crop { + let mut split = attr.rsplitn(2, ':'); + let (attr_name, attr_len) = match split.next().zip(split.next()) { + Some((len, name)) => { + let crop_len = len.parse::().unwrap_or(crop_length); + (name, crop_len) + } + None => (attr.as_str(), crop_length), + }; + + if attr_name == "*" { + for id in displayed_ids { + formatted_options + .entry(*id) + .and_modify(|f| f.crop = Some(attr_len)) + .or_insert(FormatOptions { + highlight: false, + crop: Some(attr_len), + }); + } + } + + if let Some(id) = fields_ids_map.id(attr_name) { + if displayed_ids.contains(&id) { + formatted_options + .entry(id) + .and_modify(|f| f.crop = 
Some(attr_len)) + .or_insert(FormatOptions { + highlight: false, + crop: Some(attr_len), + }); + } + } + } +} + +fn add_non_formatted_ids_to_formatted_options( + formatted_options: &mut BTreeMap, + to_retrieve_ids: &BTreeSet, +) { + for id in to_retrieve_ids { + formatted_options.entry(*id).or_insert(FormatOptions { + highlight: false, + crop: None, + }); + } +} + +fn make_document( + displayed_attributes: &BTreeSet, + field_ids_map: &FieldsIdsMap, + obkv: obkv::KvReaderU16, +) -> Result { + let mut document = serde_json::Map::new(); + + // recreate the original json + for (key, value) in obkv.iter() { + let value = serde_json::from_slice(value)?; + let key = field_ids_map + .name(key) + .expect("Missing field name") + .to_string(); + + document.insert(key, value); + } + + // select the attributes to retrieve + let displayed_attributes = displayed_attributes + .iter() + .map(|&fid| field_ids_map.name(fid).expect("Missing field name")); + + let document = permissive_json_pointer::select_values(&document, displayed_attributes); + Ok(document) +} + +fn format_fields<'a, A: AsRef<[u8]>>( + document: &Document, + field_ids_map: &FieldsIdsMap, + builder: &MatcherBuilder<'a, A>, + formatted_options: &BTreeMap, + compute_matches: bool, + displayable_ids: &BTreeSet, +) -> Result<(Option, Document), MeilisearchHttpError> { + let mut matches_position = compute_matches.then(BTreeMap::new); + let mut document = document.clone(); + + // select the attributes to retrieve + let displayable_names = displayable_ids + .iter() + .map(|&fid| field_ids_map.name(fid).expect("Missing field name")); + permissive_json_pointer::map_leaf_values(&mut document, displayable_names, |key, value| { + // To get the formatting option of each key we need to see all the rules that applies + // to the value and merge them together. eg. If a user said he wanted to highlight `doggo` + // and crop `doggo.name`. `doggo.name` needs to be highlighted + cropped while `doggo.age` is only + // highlighted. + let format = formatted_options + .iter() + .filter(|(field, _option)| { + let name = field_ids_map.name(**field).unwrap(); + milli::is_faceted_by(name, key) || milli::is_faceted_by(key, name) + }) + .map(|(_, option)| *option) + .reduce(|acc, option| acc.merge(option)); + let mut infos = Vec::new(); + + *value = format_value( + std::mem::take(value), + builder, + format, + &mut infos, + compute_matches, + ); + + if let Some(matches) = matches_position.as_mut() { + if !infos.is_empty() { + matches.insert(key.to_owned(), infos); + } + } + }); + + let selectors = formatted_options + .keys() + // This unwrap must be safe since we got the ids from the fields_ids_map just + // before. 
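+// Worked example of the merge described above (field names hypothetical):
+// highlighting `doggo` while cropping `doggo.name` makes both rules match the
+// leaf key `doggo.name`, and merging
+//     FormatOptions { highlight: true, crop: None }
+// with
+//     FormatOptions { highlight: false, crop: Some(10) }
+// is expected to yield { highlight: true, crop: Some(10) }: the leaf is both
+// highlighted and cropped at ten words.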
+ .map(|&fid| field_ids_map.name(fid).unwrap()); + let document = permissive_json_pointer::select_values(&document, selectors); + + Ok((matches_position, document)) +} + +fn format_value<'a, A: AsRef<[u8]>>( + value: Value, + builder: &MatcherBuilder<'a, A>, + format_options: Option, + infos: &mut Vec, + compute_matches: bool, +) -> Value { + match value { + Value::String(old_string) => { + let mut matcher = builder.build(&old_string); + if compute_matches { + let matches = matcher.matches(); + infos.extend_from_slice(&matches[..]); + } + + match format_options { + Some(format_options) => { + let value = matcher.format(format_options); + Value::String(value.into_owned()) + } + None => Value::String(old_string), + } + } + Value::Array(values) => Value::Array( + values + .into_iter() + .map(|v| { + format_value( + v, + builder, + format_options.map(|format_options| FormatOptions { + highlight: format_options.highlight, + crop: None, + }), + infos, + compute_matches, + ) + }) + .collect(), + ), + Value::Object(object) => Value::Object( + object + .into_iter() + .map(|(k, v)| { + ( + k, + format_value( + v, + builder, + format_options.map(|format_options| FormatOptions { + highlight: format_options.highlight, + crop: None, + }), + infos, + compute_matches, + ), + ) + }) + .collect(), + ), + Value::Number(number) => { + let s = number.to_string(); + + let mut matcher = builder.build(&s); + if compute_matches { + let matches = matcher.matches(); + infos.extend_from_slice(&matches[..]); + } + + match format_options { + Some(format_options) => { + let value = matcher.format(format_options); + Value::String(value.into_owned()) + } + None => Value::Number(number), + } + } + value => value, + } +} + +fn parse_filter(facets: &Value) -> Result, MeilisearchHttpError> { + match facets { + Value::String(expr) => { + let condition = Filter::from_str(expr)?; + Ok(condition) + } + Value::Array(arr) => parse_filter_array(arr), + v => Err(MeilisearchHttpError::InvalidExpression(&["Array"], v.clone()).into()), + } +} + +fn parse_filter_array(arr: &[Value]) -> Result, MeilisearchHttpError> { + let mut ands = Vec::new(); + for value in arr { + match value { + Value::String(s) => ands.push(Either::Right(s.as_str())), + Value::Array(arr) => { + let mut ors = Vec::new(); + for value in arr { + match value { + Value::String(s) => ors.push(s.as_str()), + v => { + return Err(MeilisearchHttpError::InvalidExpression( + &["String"], + v.clone(), + ) + .into()) + } + } + } + ands.push(Either::Left(ors)); + } + v => { + return Err(MeilisearchHttpError::InvalidExpression( + &["String", "[String]"], + v.clone(), + ) + .into()) + } + } + } + + Ok(Filter::from_array(ands)?) 
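+// Filter shapes accepted by the two parsers above (illustrative values): a
+// plain string such as
+//     "genre = horror AND year > 2000"
+// goes through Filter::from_str, while the array form
+//     [["genre = horror", "genre = comedy"], "year > 2000"]
+// is an AND of its entries where every nested array is an OR, i.e.
+//     (genre = horror OR genre = comedy) AND year > 2000.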
+} + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn test_insert_geo_distance() { + let value: Document = serde_json::from_str( + r#"{ + "_geo": { + "lat": 50.629973371633746, + "lng": 3.0569447399419567 + }, + "city": "Lille", + "id": "1" + }"#, + ) + .unwrap(); + + let sorters = &["_geoPoint(50.629973371633746,3.0569447399419567):desc".to_string()]; + let mut document = value.clone(); + insert_geo_distance(sorters, &mut document); + assert_eq!(document.get("_geoDistance"), Some(&json!(0))); + + let sorters = &["_geoPoint(50.629973371633746, 3.0569447399419567):asc".to_string()]; + let mut document = value.clone(); + insert_geo_distance(sorters, &mut document); + assert_eq!(document.get("_geoDistance"), Some(&json!(0))); + + let sorters = + &["_geoPoint( 50.629973371633746 , 3.0569447399419567 ):desc".to_string()]; + let mut document = value.clone(); + insert_geo_distance(sorters, &mut document); + assert_eq!(document.get("_geoDistance"), Some(&json!(0))); + + let sorters = &[ + "prix:asc", + "villeneuve:desc", + "_geoPoint(50.629973371633746, 3.0569447399419567):asc", + "ubu:asc", + ] + .map(|s| s.to_string()); + let mut document = value.clone(); + insert_geo_distance(sorters, &mut document); + assert_eq!(document.get("_geoDistance"), Some(&json!(0))); + + // only the first geoPoint is used to compute the distance + let sorters = &[ + "chien:desc", + "_geoPoint(50.629973371633746, 3.0569447399419567):asc", + "pangolin:desc", + "_geoPoint(100.0, -80.0):asc", + "chat:asc", + ] + .map(|s| s.to_string()); + let mut document = value.clone(); + insert_geo_distance(sorters, &mut document); + assert_eq!(document.get("_geoDistance"), Some(&json!(0))); + + // there was no _geoPoint so nothing is inserted in the document + let sorters = &["chien:asc".to_string()]; + let mut document = value; + insert_geo_distance(sorters, &mut document); + assert_eq!(document.get("_geoDistance"), None); + } +} diff --git a/meilisearch-types/Cargo.toml b/meilisearch-types/Cargo.toml index dfbf9edf7..4e068b903 100644 --- a/meilisearch-types/Cargo.toml +++ b/meilisearch-types/Cargo.toml @@ -15,6 +15,7 @@ serde = { version = "1.0.145", features = ["derive"] } serde_json = "1.0.85" time = { version = "0.3.7", features = ["serde-well-known", "formatting", "parsing", "macros"] } tokio = "1.0" +uuid = { version = "1.1.2", features = ["serde", "v4"] } [dev-dependencies] proptest = "1.0.0" diff --git a/meilisearch-types/src/tasks.rs b/meilisearch-types/src/tasks.rs index 2aa2dabb3..dd8b94684 100644 --- a/meilisearch-types/src/tasks.rs +++ b/meilisearch-types/src/tasks.rs @@ -2,9 +2,11 @@ use milli::update::IndexDocumentsMethod; use serde::{Deserialize, Serialize, Serializer}; use std::{ fmt::{Display, Write}, + path::PathBuf, str::FromStr, }; use time::{Duration, OffsetDateTime}; +use uuid::Uuid; use crate::{ error::{Code, ResponseError}, @@ -13,74 +15,192 @@ use crate::{ pub type TaskId = u32; -#[derive(Debug, Clone, PartialEq, Serialize)] +#[derive(Debug, PartialEq, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] -pub struct TaskView { +pub struct Task { pub uid: TaskId, - #[serde(default)] - pub index_uid: Option, - pub status: Status, - // TODO use our own Kind for the user - #[serde(rename = "type")] - pub kind: Kind, - #[serde(skip_serializing_if = "Option::is_none")] - pub details: Option, - #[serde(skip_serializing_if = "Option::is_none")] - pub error: Option, - - #[serde( - serialize_with = "serialize_duration", - skip_serializing_if = "Option::is_none", - default - )] - pub duration: Option, 
#[serde(with = "time::serde::rfc3339")] pub enqueued_at: OffsetDateTime, - #[serde( - with = "time::serde::rfc3339::option", - skip_serializing_if = "Option::is_none", - default - )] + #[serde(with = "time::serde::rfc3339::option")] pub started_at: Option, - #[serde( - with = "time::serde::rfc3339::option", - skip_serializing_if = "Option::is_none", - default - )] + #[serde(with = "time::serde::rfc3339::option")] pub finished_at: Option, + + pub error: Option, + pub details: Option
+ + pub status: Status, + pub kind: KindWithContent, } -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +impl Task { + pub fn index_uid(&self) -> Option<&str> { + use KindWithContent::*; + + match &self.kind { + DumpExport { .. } + | Snapshot + | CancelTask { .. } + | DeleteTasks { .. } + | IndexSwap { .. } => None, + DocumentImport { index_uid, .. } + | DocumentDeletion { index_uid, .. } + | DocumentClear { index_uid } + | Settings { index_uid, .. } + | IndexCreation { index_uid, .. } + | IndexUpdate { index_uid, .. } + | IndexDeletion { index_uid } => Some(index_uid), + } + } + + /// Return the list of indexes updated by this task. + pub fn indexes(&self) -> Option<Vec<&str>> { + use KindWithContent::*; + + match &self.kind { + DumpExport { .. } | Snapshot | CancelTask { .. } | DeleteTasks { .. } => None, + DocumentImport { index_uid, .. } + | DocumentDeletion { index_uid, .. } + | DocumentClear { index_uid } + | Settings { index_uid, .. } + | IndexCreation { index_uid, .. } + | IndexUpdate { index_uid, .. } + | IndexDeletion { index_uid } => Some(vec![index_uid]), + IndexSwap { lhs, rhs } => Some(vec![lhs, rhs]), + } + } +} + +#[derive(Debug, PartialEq, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] -pub struct TaskDump { - pub uid: TaskId, - #[serde(default)] - pub index_uid: Option<String>, - pub status: Status, - // TODO use our own Kind for the user - #[serde(rename = "type")] - pub kind: Kind, +pub enum KindWithContent { + DocumentImport { + index_uid: String, + primary_key: Option<String>, + method: IndexDocumentsMethod, + content_file: Uuid, + documents_count: u64, + allow_index_creation: bool, + }, + DocumentDeletion { + index_uid: String, + documents_ids: Vec<String>, + }, + DocumentClear { + index_uid: String, + }, + Settings { + index_uid: String, + new_settings: Settings<Unchecked>, + is_deletion: bool, + allow_index_creation: bool, + }, + IndexDeletion { + index_uid: String, + }, + IndexCreation { + index_uid: String, + primary_key: Option<String>, + }, + IndexUpdate { + index_uid: String, + primary_key: Option<String>, + }, + IndexSwap { + lhs: String, + rhs: String, + }, + CancelTask { + tasks: Vec<TaskId>, + }, + DeleteTasks { + query: String, + tasks: Vec<TaskId>, + }, + DumpExport { + output: PathBuf, + }, + Snapshot, } - #[serde(skip_serializing_if = "Option::is_none")] - pub details: Option<Details>,
- #[serde(skip_serializing_if = "Option::is_none")] - pub error: Option<ResponseError>, +impl KindWithContent { + pub fn as_kind(&self) -> Kind { + match self { + KindWithContent::DocumentImport { .. } => Kind::DocumentImport, + KindWithContent::DocumentDeletion { .. } => Kind::DocumentDeletion, + KindWithContent::DocumentClear { .. } => Kind::DocumentClear, + KindWithContent::Settings { .. } => Kind::Settings, + KindWithContent::IndexCreation { .. } => Kind::IndexCreation, + KindWithContent::IndexDeletion { .. } => Kind::IndexDeletion, + KindWithContent::IndexUpdate { .. } => Kind::IndexUpdate, + KindWithContent::IndexSwap { .. } => Kind::IndexSwap, + KindWithContent::CancelTask { .. } => Kind::CancelTask, + KindWithContent::DeleteTasks { .. } => Kind::DeleteTasks, + KindWithContent::DumpExport { .. } => Kind::DumpExport, + KindWithContent::Snapshot => Kind::Snapshot, + } + } - #[serde(with = "time::serde::rfc3339")] - pub enqueued_at: OffsetDateTime, - #[serde( - with = "time::serde::rfc3339::option", - skip_serializing_if = "Option::is_none", - default - )] - pub started_at: Option<OffsetDateTime>, - #[serde( - with = "time::serde::rfc3339::option", - skip_serializing_if = "Option::is_none", - default - )] - pub finished_at: Option<OffsetDateTime>, + pub fn indexes(&self) -> Option<Vec<&str>> { + use KindWithContent::*; + + match self { + DumpExport { .. } | Snapshot | CancelTask { .. } | DeleteTasks { .. } => None, + DocumentImport { index_uid, .. } + | DocumentDeletion { index_uid, .. } + | DocumentClear { index_uid } + | Settings { index_uid, .. } + | IndexCreation { index_uid, .. } + | IndexUpdate { index_uid, .. } + | IndexDeletion { index_uid } => Some(vec![index_uid]), + IndexSwap { lhs, rhs } => Some(vec![lhs, rhs]), + } + } + + /// Returns the default `Details` that corresponds to this `KindWithContent`, + /// `None` if it cannot be generated. + pub fn default_details(&self) -> Option<Details>
{ + match self { + KindWithContent::DocumentImport { + documents_count, .. + } => Some(Details::DocumentAddition { + received_documents: *documents_count, + indexed_documents: 0, + }), + KindWithContent::DocumentDeletion { + index_uid: _, + documents_ids, + } => Some(Details::DocumentDeletion { + received_document_ids: documents_ids.len(), + deleted_documents: None, + }), + KindWithContent::DocumentClear { .. } => Some(Details::ClearAll { + deleted_documents: None, + }), + KindWithContent::Settings { new_settings, .. } => Some(Details::Settings { + settings: new_settings.clone(), + }), + KindWithContent::IndexDeletion { .. } => None, + KindWithContent::IndexCreation { primary_key, .. } + | KindWithContent::IndexUpdate { primary_key, .. } => Some(Details::IndexInfo { + primary_key: primary_key.clone(), + }), + KindWithContent::IndexSwap { .. } => { + todo!() + } + KindWithContent::CancelTask { .. } => { + None // TODO: check correctness of this return value + } + KindWithContent::DeleteTasks { query, tasks } => Some(Details::DeleteTasks { + matched_tasks: tasks.len(), + deleted_tasks: None, + original_query: query.clone(), + }), + KindWithContent::DumpExport { .. } => None, + KindWithContent::Snapshot => None, + } + } } #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] @@ -123,15 +243,10 @@ impl FromStr for Status { #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub enum Kind { - DocumentImport { - method: IndexDocumentsMethod, - allow_index_creation: bool, - }, + DocumentImport, DocumentDeletion, DocumentClear, - Settings { - allow_index_creation: bool, - }, + Settings, IndexCreation, IndexDeletion, IndexUpdate, @@ -147,22 +262,11 @@ impl FromStr for Kind { fn from_str(s: &str) -> Result { match s { - "document_addition" => Ok(Kind::DocumentImport { - method: IndexDocumentsMethod::ReplaceDocuments, - // TODO this doesn't make sense - allow_index_creation: false, - }), - "document_update" => Ok(Kind::DocumentImport { - method: IndexDocumentsMethod::UpdateDocuments, - // TODO this doesn't make sense - allow_index_creation: false, - }), + "document_addition" => Ok(Kind::DocumentImport), + "document_update" => Ok(Kind::DocumentImport), "document_deletion" => Ok(Kind::DocumentDeletion), "document_clear" => Ok(Kind::DocumentClear), - "settings" => Ok(Kind::Settings { - // TODO this doesn't make sense - allow_index_creation: false, - }), + "settings" => Ok(Kind::Settings), "index_creation" => Ok(Kind::IndexCreation), "index_deletion" => Ok(Kind::IndexDeletion), "index_update" => Ok(Kind::IndexUpdate), @@ -179,73 +283,6 @@ impl FromStr for Kind { } } -#[derive(Default, Debug, PartialEq, Clone, Serialize)] -#[serde(rename_all = "camelCase")] -pub struct DetailsView { - #[serde(skip_serializing_if = "Option::is_none")] - pub received_documents: Option, - #[serde(skip_serializing_if = "Option::is_none")] - pub indexed_documents: Option, - #[serde(skip_serializing_if = "Option::is_none")] - pub primary_key: Option>, - #[serde(skip_serializing_if = "Option::is_none")] - pub received_document_ids: Option, - #[serde(skip_serializing_if = "Option::is_none")] - pub deleted_documents: Option>, - #[serde(skip_serializing_if = "Option::is_none")] - pub matched_tasks: Option, - #[serde(skip_serializing_if = "Option::is_none")] - pub deleted_tasks: Option>, - #[serde(skip_serializing_if = "Option::is_none")] - pub original_query: Option, - #[serde(skip_serializing_if = "Option::is_none")] - pub dump_uid: Option, - 
#[serde(skip_serializing_if = "Option::is_none")] - #[serde(flatten)] - pub settings: Option>, -} - -// A `Kind` specific version made for the dump. If modified you may break the dump. -#[derive(Debug, PartialEq, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub enum KindDump { - DocumentImport { - primary_key: Option, - method: IndexDocumentsMethod, - documents_count: u64, - allow_index_creation: bool, - }, - DocumentDeletion { - documents_ids: Vec, - }, - DocumentClear, - Settings { - new_settings: Settings, - is_deletion: bool, - allow_index_creation: bool, - }, - IndexDeletion, - IndexCreation { - primary_key: Option, - }, - IndexUpdate { - primary_key: Option, - }, - IndexSwap { - lhs: String, - rhs: String, - }, - CancelTask { - tasks: Vec, - }, - DeleteTasks { - query: String, - tasks: Vec, - }, - DumpExport, - Snapshot, -} - #[derive(Debug, PartialEq, Clone, Serialize, Deserialize)] #[allow(clippy::large_enum_variant)] pub enum Details { @@ -277,59 +314,10 @@ pub enum Details { }, } -impl Details { - pub fn as_details_view(&self) -> DetailsView { - match self.clone() { - Details::DocumentAddition { - received_documents, - indexed_documents, - } => DetailsView { - received_documents: Some(received_documents), - indexed_documents: Some(indexed_documents), - ..DetailsView::default() - }, - Details::Settings { settings } => DetailsView { - settings: Some(settings), - ..DetailsView::default() - }, - Details::IndexInfo { primary_key } => DetailsView { - primary_key: Some(primary_key), - ..DetailsView::default() - }, - Details::DocumentDeletion { - received_document_ids, - deleted_documents, - } => DetailsView { - received_document_ids: Some(received_document_ids), - deleted_documents: Some(deleted_documents), - ..DetailsView::default() - }, - Details::ClearAll { deleted_documents } => DetailsView { - deleted_documents: Some(deleted_documents), - ..DetailsView::default() - }, - Details::DeleteTasks { - matched_tasks, - deleted_tasks, - original_query, - } => DetailsView { - matched_tasks: Some(matched_tasks), - deleted_tasks: Some(deleted_tasks), - original_query: Some(original_query), - ..DetailsView::default() - }, - Details::Dump { dump_uid } => DetailsView { - dump_uid: Some(dump_uid), - ..DetailsView::default() - }, - } - } -} - /// Serialize a `time::Duration` as a best effort ISO 8601 while waiting for /// https://github.com/time-rs/time/issues/378. /// This code is a port of the old code of time that was removed in 0.2. -fn serialize_duration( +pub fn serialize_duration( duration: &Option, serializer: S, ) -> Result { @@ -376,3 +364,26 @@ fn serialize_duration( None => serializer.serialize_none(), } } + +/* +#[cfg(test)] +mod tests { + use crate::assert_smol_debug_snapshot; + use crate::heed::{types::SerdeJson, BytesDecode, BytesEncode}; + + use super::Details; + + #[test] + fn bad_deser() { + let details = Details::DeleteTasks { + matched_tasks: 1, + deleted_tasks: None, + original_query: "hello".to_owned(), + }; + let serialised = SerdeJson::
<Details>::bytes_encode(&details).unwrap(); + let deserialised = SerdeJson::<Details>
::bytes_decode(&serialised).unwrap(); + assert_smol_debug_snapshot!(details, @r###"DeleteTasks { matched_tasks: 1, deleted_tasks: None, original_query: "hello" }"###); + assert_smol_debug_snapshot!(deserialised, @r###"DeleteTasks { matched_tasks: 1, deleted_tasks: None, original_query: "hello" }"###); + } +} +*/
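With the payloads moved onto `KindWithContent`, `Kind` is now a bare tag and `Kind::from_str` is lossy by design: the user-facing names `document_addition` and `document_update` collapse onto the same variant. A small sketch of the mapping (not part of the patch):

    use std::str::FromStr;
    use meilisearch_types::tasks::Kind;

    fn main() {
        // Both user-facing type names map to one internal kind.
        assert_eq!(Kind::from_str("document_addition").unwrap(), Kind::DocumentImport);
        assert_eq!(Kind::from_str("document_update").unwrap(), Kind::DocumentImport);
        // The replace-vs-update distinction now lives only in
        // KindWithContent::DocumentImport { method, .. }.
    }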