From eae2c353eb54c91a78848fdc9e77ba591db25ee6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Thu, 20 Oct 2022 17:21:37 +0200 Subject: [PATCH] Remove once for all the meilisearch-lib crate --- meilisearch-lib/Cargo.toml | 85 --- meilisearch-lib/src/document_formats.rs | 170 ----- meilisearch-lib/src/dump/compat/v2.rs | 152 ---- meilisearch-lib/src/dump/loaders/v4.rs | 103 --- meilisearch-lib/src/index/dump.rs | 161 ---- meilisearch-lib/src/index/index.rs | 333 --------- meilisearch-lib/src/index/search.rs | 688 ------------------ meilisearch-lib/src/index/updates.rs | 559 -------------- .../src/index_resolver/index_store.rs | 108 --- meilisearch-lib/src/lib.rs | 50 -- meilisearch-lib/src/options.rs | 205 ------ meilisearch-lib/src/snapshot.rs | 204 ------ meilisearch-lib/src/tasks/task_store/mod.rs | 420 ----------- meilisearch-lib/src/tasks/task_store/store.rs | 377 ---------- 14 files changed, 3615 deletions(-) delete mode 100644 meilisearch-lib/Cargo.toml delete mode 100644 meilisearch-lib/src/document_formats.rs delete mode 100644 meilisearch-lib/src/dump/compat/v2.rs delete mode 100644 meilisearch-lib/src/dump/loaders/v4.rs delete mode 100644 meilisearch-lib/src/index/dump.rs delete mode 100644 meilisearch-lib/src/index/index.rs delete mode 100644 meilisearch-lib/src/index/search.rs delete mode 100644 meilisearch-lib/src/index/updates.rs delete mode 100644 meilisearch-lib/src/index_resolver/index_store.rs delete mode 100644 meilisearch-lib/src/lib.rs delete mode 100644 meilisearch-lib/src/options.rs delete mode 100644 meilisearch-lib/src/snapshot.rs delete mode 100644 meilisearch-lib/src/tasks/task_store/mod.rs delete mode 100644 meilisearch-lib/src/tasks/task_store/store.rs diff --git a/meilisearch-lib/Cargo.toml b/meilisearch-lib/Cargo.toml deleted file mode 100644 index 3b0b1eed1..000000000 --- a/meilisearch-lib/Cargo.toml +++ /dev/null @@ -1,85 +0,0 @@ -[package] -name = "meilisearch-lib" -version = "0.29.1" -edition = "2021" - -[dependencies] -actix-web = { version = "4.2.1", default-features = false } -anyhow = { version = "1.0.65", features = ["backtrace"] } -async-stream = "0.3.3" -async-trait = "0.1.57" -atomic_refcell = "0.1.8" -byte-unit = { version = "4.0.14", default-features = false, features = ["std", "serde"] } -bytes = "1.2.1" -clap = { version = "4.0.9", features = ["derive", "env"] } -crossbeam-channel = "0.5.6" -csv = "1.1.6" -derivative = "2.2.0" -either = { version = "1.8.0", features = ["serde"] } -flate2 = "1.0.24" -fs_extra = "1.2.0" -fst = "0.4.7" -futures = "0.3.24" -futures-util = "0.3.24" -http = "0.2.8" -indexmap = { version = "1.9.1", features = ["serde-1"] } -itertools = "0.10.5" -lazy_static = "1.4.0" -log = "0.4.17" -meilisearch-auth = { path = "../meilisearch-auth" } -meilisearch-types = { path = "../meilisearch-types" } -milli = { git = "https://github.com/meilisearch/milli.git", tag = "v0.33.4", default-features = false } -mime = "0.3.16" -num_cpus = "1.13.1" -obkv = "0.2.0" -once_cell = "1.15.0" -page_size = "0.4.2" -parking_lot = "0.12.1" -permissive-json-pointer = { path = "../permissive-json-pointer" } -rand = "0.8.5" -rayon = "1.5.3" -regex = "1.6.0" -reqwest = { version = "0.11.12", features = ["json", "rustls-tls"], default-features = false, optional = true } -roaring = "0.9.0" -rustls = "0.20.6" -serde = { version = "1.0.145", features = ["derive"] } -serde_json = { version = "1.0.85", features = ["preserve_order"] } -siphasher = "0.3.10" -slice-group-by = "0.3.0" -sysinfo = "0.26.4" -tar = "0.4.38" -tempfile = 
"3.3.0" -thiserror = "1.0.37" -time = { version = "0.3.15", features = ["serde-well-known", "formatting", "parsing", "macros"] } -tokio = { version = "1.21.2", features = ["full"] } -uuid = { version = "1.1.2", features = ["serde", "v4"] } -walkdir = "2.3.2" -whoami = { version = "1.2.3", optional = true } -index-scheduler = { path = "../index-scheduler" } -index = { path = "../index" } -file-store = { path = "../file-store" } - -[dev-dependencies] -actix-rt = "2.7.0" -meilisearch-types = { path = "../meilisearch-types", features = ["test-traits"] } -mockall = "0.11.2" -nelson = { git = "https://github.com/meilisearch/nelson.git", rev = "675f13885548fb415ead8fbb447e9e6d9314000a"} -paste = "1.0.9" -proptest = "1.0.0" -proptest-derive = "0.3.0" - -[features] -# all specialized tokenizations -default = ["milli/default"] - -# chinese specialized tokenization -chinese = ["milli/chinese"] - -# hebrew specialized tokenization -hebrew = ["milli/hebrew"] - -# japanese specialized tokenization -japanese = ["milli/japanese"] - -# thai specialized tokenization -thai = ["milli/thai"] diff --git a/meilisearch-lib/src/document_formats.rs b/meilisearch-lib/src/document_formats.rs deleted file mode 100644 index cfc200019..000000000 --- a/meilisearch-lib/src/document_formats.rs +++ /dev/null @@ -1,170 +0,0 @@ -use std::borrow::Borrow; -use std::fmt::{self, Debug, Display}; -use std::io::{self, BufReader, Read, Seek, Write}; - -use either::Either; -use meilisearch_types::error::{Code, ErrorCode}; -use meilisearch_types::internal_error; -use milli::documents::{DocumentsBatchBuilder, Error}; -use milli::Object; -use serde::Deserialize; -use serde_json::error::Category; - -type Result = std::result::Result; - -#[derive(Debug)] -pub enum PayloadType { - Ndjson, - Json, - Csv, -} - -impl fmt::Display for PayloadType { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - PayloadType::Ndjson => f.write_str("ndjson"), - PayloadType::Json => f.write_str("json"), - PayloadType::Csv => f.write_str("csv"), - } - } -} - -#[derive(Debug)] -pub enum DocumentFormatError { - Internal(Box), - MalformedPayload(Error, PayloadType), -} - -impl Display for DocumentFormatError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - Self::Internal(e) => write!(f, "An internal error has occurred: `{}`.", e), - Self::MalformedPayload(me, b) => match me.borrow() { - Error::Json(se) => { - let mut message = match se.classify() { - Category::Data => { - "data are neither an object nor a list of objects".to_string() - } - _ => se.to_string(), - }; - - // https://github.com/meilisearch/meilisearch/issues/2107 - // The user input maybe insanely long. We need to truncate it. - let ellipsis = "..."; - let trim_input_prefix_len = 50; - let trim_input_suffix_len = 85; - - if message.len() - > trim_input_prefix_len + trim_input_suffix_len + ellipsis.len() - { - message.replace_range( - trim_input_prefix_len..message.len() - trim_input_suffix_len, - ellipsis, - ); - } - - write!( - f, - "The `{}` payload provided is malformed. 
`Couldn't serialize document value: {}`.", - b, message - ) - } - _ => write!(f, "The `{}` payload provided is malformed: `{}`.", b, me), - }, - } - } -} - -impl std::error::Error for DocumentFormatError {} - -impl From<(PayloadType, Error)> for DocumentFormatError { - fn from((ty, error): (PayloadType, Error)) -> Self { - match error { - Error::Io(e) => Self::Internal(Box::new(e)), - e => Self::MalformedPayload(e, ty), - } - } -} - -impl ErrorCode for DocumentFormatError { - fn error_code(&self) -> Code { - match self { - DocumentFormatError::Internal(_) => Code::Internal, - DocumentFormatError::MalformedPayload(_, _) => Code::MalformedPayload, - } - } -} - -internal_error!(DocumentFormatError: io::Error); - -/// Reads CSV from the input and writes an obkv batch to the writer. -pub fn read_csv(input: impl Read, writer: impl Write + Seek) -> Result<usize> { - let mut builder = DocumentsBatchBuilder::new(writer); - - let csv = csv::Reader::from_reader(input); - builder.append_csv(csv).map_err(|e| (PayloadType::Csv, e))?; - - let count = builder.documents_count(); - let _ = builder - .into_inner() - .map_err(Into::into) - .map_err(DocumentFormatError::Internal)?; - - Ok(count as usize) -} - -/// Reads JSON Lines from the input and writes an obkv batch to the writer. -pub fn read_ndjson(input: impl Read, writer: impl Write + Seek) -> Result<usize> { - let mut builder = DocumentsBatchBuilder::new(writer); - let reader = BufReader::new(input); - - for result in serde_json::Deserializer::from_reader(reader).into_iter() { - let object = result - .map_err(Error::Json) - .map_err(|e| (PayloadType::Ndjson, e))?; - builder - .append_json_object(&object) - .map_err(Into::into) - .map_err(DocumentFormatError::Internal)?; - } - - let count = builder.documents_count(); - let _ = builder - .into_inner() - .map_err(Into::into) - .map_err(DocumentFormatError::Internal)?; - - Ok(count as usize) -} - -/// Reads JSON from the input and writes an obkv batch to the writer.
-pub fn read_json(input: impl Read, writer: impl Write + Seek) -> Result<usize> { - let mut builder = DocumentsBatchBuilder::new(writer); - let reader = BufReader::new(input); - - #[derive(Deserialize, Debug)] - #[serde(transparent)] - struct ArrayOrSingleObject { - #[serde(with = "either::serde_untagged")] - inner: Either<Vec<Object>, Object>, - } - - let content: ArrayOrSingleObject = serde_json::from_reader(reader) - .map_err(Error::Json) - .map_err(|e| (PayloadType::Json, e))?; - - for object in content.inner.map_right(|o| vec![o]).into_inner() { - builder - .append_json_object(&object) - .map_err(Into::into) - .map_err(DocumentFormatError::Internal)?; - } - - let count = builder.documents_count(); - let _ = builder - .into_inner() - .map_err(Into::into) - .map_err(DocumentFormatError::Internal)?; - - Ok(count as usize) -} diff --git a/meilisearch-lib/src/dump/compat/v2.rs b/meilisearch-lib/src/dump/compat/v2.rs deleted file mode 100644 index ba3b8e3a6..000000000 --- a/meilisearch-lib/src/dump/compat/v2.rs +++ /dev/null @@ -1,152 +0,0 @@ -use anyhow::bail; -use meilisearch_types::error::Code; -use milli::update::IndexDocumentsMethod; -use serde::{Deserialize, Serialize}; -use time::OffsetDateTime; -use uuid::Uuid; - -use crate::index::{Settings, Unchecked}; - -#[derive(Serialize, Deserialize)] -pub struct UpdateEntry { - pub uuid: Uuid, - pub update: UpdateStatus, } - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub enum UpdateFormat { - Json, -} - -#[derive(Debug, Serialize, Deserialize, Clone)] -pub struct DocumentAdditionResult { - pub nb_documents: usize, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub enum UpdateResult { - DocumentsAddition(DocumentAdditionResult), - DocumentDeletion { deleted: u64 }, - Other, -} - -#[allow(clippy::large_enum_variant)] -#[derive(Debug, Clone, Serialize, Deserialize)] -#[serde(tag = "type")] -pub enum UpdateMeta { - DocumentsAddition { - method: IndexDocumentsMethod, - format: UpdateFormat, - primary_key: Option<String>, - }, - ClearDocuments, - DeleteDocuments { - ids: Vec<String>, - }, - Settings(Settings<Unchecked>), -} - -#[derive(Debug, Serialize, Deserialize, Clone)] -#[serde(rename_all = "camelCase")] -pub struct Enqueued { - pub update_id: u64, - pub meta: UpdateMeta, - #[serde(with = "time::serde::rfc3339")] - pub enqueued_at: OffsetDateTime, - pub content: Option<Uuid>, -} - -#[derive(Debug, Serialize, Deserialize, Clone)] -#[serde(rename_all = "camelCase")] -pub struct Processed { - pub success: UpdateResult, - #[serde(with = "time::serde::rfc3339")] - pub processed_at: OffsetDateTime, - #[serde(flatten)] - pub from: Processing, -} - -#[derive(Debug, Serialize, Deserialize, Clone)] -#[serde(rename_all = "camelCase")] -pub struct Processing { - #[serde(flatten)] - pub from: Enqueued, - #[serde(with = "time::serde::rfc3339")] - pub started_processing_at: OffsetDateTime, -} - -#[derive(Debug, Serialize, Deserialize, Clone)] -#[serde(rename_all = "camelCase")] -pub struct Aborted { - #[serde(flatten)] - pub from: Enqueued, - #[serde(with = "time::serde::rfc3339")] - pub aborted_at: OffsetDateTime, -} - -#[derive(Debug, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct Failed { - #[serde(flatten)] - pub from: Processing, - pub error: ResponseError, - #[serde(with = "time::serde::rfc3339")] - pub failed_at: OffsetDateTime, -} - -#[derive(Debug, Serialize, Deserialize)] -#[serde(tag = "status", rename_all = "camelCase")] -pub enum UpdateStatus { - Processing(Processing), - Enqueued(Enqueued), - Processed(Processed), - Aborted(Aborted), - Failed(Failed), }
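// A minimal sketch of how the internally tagged `UpdateStatus` enum above
// (de)serializes; the JSON values are illustrative, not taken from a real dump:
//
//     let entry = r#"{"status":"enqueued","updateId":0,"meta":{"type":"ClearDocuments"},"enqueuedAt":"2022-10-20T17:21:37Z","content":null}"#;
//     let status: UpdateStatus = serde_json::from_str(entry).unwrap();
//     assert!(matches!(status, UpdateStatus::Enqueued(_)));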
- -type StatusCode = (); - -#[derive(Debug, Serialize, Deserialize, Clone)] -#[serde(rename_all = "camelCase")] -pub struct ResponseError { - #[serde(skip)] - pub code: StatusCode, - pub message: String, - pub error_code: String, - pub error_type: String, - pub error_link: String, -} - -pub fn error_code_from_str(s: &str) -> anyhow::Result { - let code = match s { - "index_creation_failed" => Code::CreateIndex, - "index_already_exists" => Code::IndexAlreadyExists, - "index_not_found" => Code::IndexNotFound, - "invalid_index_uid" => Code::InvalidIndexUid, - "invalid_state" => Code::InvalidState, - "missing_primary_key" => Code::MissingPrimaryKey, - "primary_key_already_present" => Code::PrimaryKeyAlreadyPresent, - "invalid_request" => Code::InvalidRankingRule, - "max_fields_limit_exceeded" => Code::MaxFieldsLimitExceeded, - "missing_document_id" => Code::MissingDocumentId, - "invalid_facet" => Code::Filter, - "invalid_filter" => Code::Filter, - "invalid_sort" => Code::Sort, - "bad_parameter" => Code::BadParameter, - "bad_request" => Code::BadRequest, - "document_not_found" => Code::DocumentNotFound, - "internal" => Code::Internal, - "invalid_geo_field" => Code::InvalidGeoField, - "invalid_token" => Code::InvalidToken, - "missing_authorization_header" => Code::MissingAuthorizationHeader, - "payload_too_large" => Code::PayloadTooLarge, - "unretrievable_document" => Code::RetrieveDocument, - "search_error" => Code::SearchDocuments, - "unsupported_media_type" => Code::UnsupportedMediaType, - "dump_already_in_progress" => Code::DumpAlreadyInProgress, - "dump_process_failed" => Code::DumpProcessFailed, - _ => bail!("unknown error code."), - }; - - Ok(code) -} diff --git a/meilisearch-lib/src/dump/loaders/v4.rs b/meilisearch-lib/src/dump/loaders/v4.rs deleted file mode 100644 index 44ec23517..000000000 --- a/meilisearch-lib/src/dump/loaders/v4.rs +++ /dev/null @@ -1,103 +0,0 @@ -use std::fs::{self, create_dir_all, File}; -use std::io::{BufReader, Write}; -use std::path::Path; - -use fs_extra::dir::{self, CopyOptions}; -use log::info; -use serde_json::{Deserializer, Map, Value}; -use tempfile::tempdir; -use uuid::Uuid; - -use crate::dump::{compat, Metadata}; -use crate::options::IndexerOpts; -use crate::tasks::task::Task; - -pub fn load_dump( - meta: Metadata, - src: impl AsRef, - dst: impl AsRef, - index_db_size: usize, - meta_env_size: usize, - indexing_options: &IndexerOpts, -) -> anyhow::Result<()> { - info!("Patching dump V4 to dump V5..."); - - let patched_dir = tempdir()?; - let options = CopyOptions::default(); - - // Indexes - dir::copy(src.as_ref().join("indexes"), &patched_dir, &options)?; - - // Index uuids - dir::copy(src.as_ref().join("index_uuids"), &patched_dir, &options)?; - - // Metadata - fs::copy( - src.as_ref().join("metadata.json"), - patched_dir.path().join("metadata.json"), - )?; - - // Updates - patch_updates(&src, &patched_dir)?; - - // Keys - patch_keys(&src, &patched_dir)?; - - super::v5::load_dump( - meta, - &patched_dir, - dst, - index_db_size, - meta_env_size, - indexing_options, - ) -} - -fn patch_updates(src: impl AsRef, dst: impl AsRef) -> anyhow::Result<()> { - let updates_path = src.as_ref().join("updates/data.jsonl"); - let output_updates_path = dst.as_ref().join("updates/data.jsonl"); - create_dir_all(output_updates_path.parent().unwrap())?; - let updates_file = File::open(updates_path)?; - let mut output_update_file = File::create(output_updates_path)?; - - serde_json::Deserializer::from_reader(updates_file) - .into_iter::() - .try_for_each(|task| -> 
anyhow::Result<()> { - let task: Task = task?.into(); - - serde_json::to_writer(&mut output_update_file, &task)?; - output_update_file.write_all(b"\n")?; - - Ok(()) - })?; - - output_update_file.flush()?; - - Ok(()) -} - -fn patch_keys(src: impl AsRef, dst: impl AsRef) -> anyhow::Result<()> { - let keys_file_src = src.as_ref().join("keys"); - - if !keys_file_src.exists() { - return Ok(()); - } - - fs::create_dir_all(&dst)?; - let keys_file_dst = dst.as_ref().join("keys"); - let mut writer = File::create(&keys_file_dst)?; - - let reader = BufReader::new(File::open(&keys_file_src)?); - for key in Deserializer::from_reader(reader).into_iter() { - let mut key: Map = key?; - - // generate a new uuid v4 and insert it in the key. - let uid = serde_json::to_value(Uuid::new_v4()).unwrap(); - key.insert("uid".to_string(), uid); - - serde_json::to_writer(&mut writer, &key)?; - writer.write_all(b"\n")?; - } - - Ok(()) -} diff --git a/meilisearch-lib/src/index/dump.rs b/meilisearch-lib/src/index/dump.rs deleted file mode 100644 index 9cc3c033f..000000000 --- a/meilisearch-lib/src/index/dump.rs +++ /dev/null @@ -1,161 +0,0 @@ -use std::fs::{create_dir_all, File}; -use std::io::{BufReader, Seek, SeekFrom, Write}; -use std::path::Path; - -use anyhow::Context; -use indexmap::IndexMap; -use milli::documents::DocumentsBatchReader; -use milli::heed::{EnvOpenOptions, RoTxn}; -use milli::update::{IndexDocumentsConfig, IndexerConfig}; -use serde::{Deserialize, Serialize}; - -use crate::document_formats::read_ndjson; -use crate::index::updates::apply_settings_to_builder; - -use super::error::Result; -use super::{index::Index, Settings, Unchecked}; - -#[derive(Serialize, Deserialize)] -struct DumpMeta { - settings: Settings, - primary_key: Option, -} - -const META_FILE_NAME: &str = "meta.json"; -const DATA_FILE_NAME: &str = "documents.jsonl"; - -impl Index { - pub fn dump(&self, path: impl AsRef) -> Result<()> { - // acquire write txn make sure any ongoing write is finished before we start. 
- let txn = self.write_txn()?; - let path = path.as_ref().join(format!("indexes/{}", self.uuid)); - - create_dir_all(&path)?; - - self.dump_documents(&txn, &path)?; - self.dump_meta(&txn, &path)?; - - Ok(()) - } - - fn dump_documents(&self, txn: &RoTxn, path: impl AsRef) -> Result<()> { - let document_file_path = path.as_ref().join(DATA_FILE_NAME); - let mut document_file = File::create(&document_file_path)?; - - let documents = self.all_documents(txn)?; - let fields_ids_map = self.fields_ids_map(txn)?; - - // dump documents - let mut json_map = IndexMap::new(); - for document in documents { - let (_, reader) = document?; - - for (fid, bytes) in reader.iter() { - if let Some(name) = fields_ids_map.name(fid) { - json_map.insert(name, serde_json::from_slice::(bytes)?); - } - } - - serde_json::to_writer(&mut document_file, &json_map)?; - document_file.write_all(b"\n")?; - - json_map.clear(); - } - - Ok(()) - } - - fn dump_meta(&self, txn: &RoTxn, path: impl AsRef) -> Result<()> { - let meta_file_path = path.as_ref().join(META_FILE_NAME); - let mut meta_file = File::create(&meta_file_path)?; - - let settings = self.settings_txn(txn)?.into_unchecked(); - let primary_key = self.primary_key(txn)?.map(String::from); - let meta = DumpMeta { - settings, - primary_key, - }; - - serde_json::to_writer(&mut meta_file, &meta)?; - - Ok(()) - } - - pub fn load_dump( - src: impl AsRef, - dst: impl AsRef, - size: usize, - indexer_config: &IndexerConfig, - ) -> anyhow::Result<()> { - let dir_name = src - .as_ref() - .file_name() - .with_context(|| format!("invalid dump index: {}", src.as_ref().display()))?; - - let dst_dir_path = dst.as_ref().join("indexes").join(dir_name); - create_dir_all(&dst_dir_path)?; - - let meta_path = src.as_ref().join(META_FILE_NAME); - let meta_file = File::open(meta_path)?; - let DumpMeta { - settings, - primary_key, - } = serde_json::from_reader(meta_file)?; - let settings = settings.check(); - - let mut options = EnvOpenOptions::new(); - options.map_size(size); - options.max_readers(1024); - let index = milli::Index::new(options, &dst_dir_path)?; - - let mut txn = index.write_txn()?; - - // Apply settings first - let mut builder = milli::update::Settings::new(&mut txn, &index, indexer_config); - - if let Some(primary_key) = primary_key { - builder.set_primary_key(primary_key); - } - - apply_settings_to_builder(&settings, &mut builder); - - builder.execute(|_| ())?; - - let document_file_path = src.as_ref().join(DATA_FILE_NAME); - let reader = BufReader::new(File::open(&document_file_path)?); - - let mut tmp_doc_file = tempfile::tempfile()?; - - let empty = match read_ndjson(reader, &mut tmp_doc_file) { - // if there was no document in the file it's because the index was empty - Ok(0) => true, - Ok(_) => false, - Err(e) => return Err(e.into()), - }; - - if !empty { - tmp_doc_file.seek(SeekFrom::Start(0))?; - - let documents_reader = DocumentsBatchReader::from_reader(tmp_doc_file)?; - - //If the document file is empty, we don't perform the document addition, to prevent - //a primary key error to be thrown. 
- let config = IndexDocumentsConfig::default(); - let builder = milli::update::IndexDocuments::new( - &mut txn, - &index, - indexer_config, - config, - |_| (), - )?; - let (builder, user_error) = builder.add_documents(documents_reader)?; - user_error?; - builder.execute()?; - } - - txn.commit()?; - index.prepare_for_closing().wait(); - - Ok(()) - } -} diff --git a/meilisearch-lib/src/index/index.rs b/meilisearch-lib/src/index/index.rs deleted file mode 100644 index 3d6c47949..000000000 --- a/meilisearch-lib/src/index/index.rs +++ /dev/null @@ -1,333 +0,0 @@ -use std::collections::BTreeSet; -use std::fs::create_dir_all; -use std::marker::PhantomData; -use std::ops::Deref; -use std::path::Path; -use std::sync::Arc; - -use fst::IntoStreamer; -use milli::heed::{CompactionOption, EnvOpenOptions, RoTxn}; -use milli::update::{IndexerConfig, Setting}; -use milli::{obkv_to_json, FieldDistribution, DEFAULT_VALUES_PER_FACET}; -use serde::{Deserialize, Serialize}; -use serde_json::{Map, Value}; -use time::OffsetDateTime; -use uuid::Uuid; -use walkdir::WalkDir; - -use crate::index::search::DEFAULT_PAGINATION_MAX_TOTAL_HITS; - -use super::error::IndexError; -use super::error::Result; -use super::updates::{FacetingSettings, MinWordSizeTyposSetting, PaginationSettings, TypoSettings}; -use super::{Checked, Settings}; - -pub type Document = Map; - -#[derive(Debug, Serialize, Deserialize, Clone)] -#[serde(rename_all = "camelCase")] -pub struct IndexMeta { - #[serde(with = "time::serde::rfc3339")] - pub created_at: OffsetDateTime, - #[serde(with = "time::serde::rfc3339")] - pub updated_at: OffsetDateTime, - pub primary_key: Option, -} - -impl IndexMeta { - pub fn new(index: &Index) -> Result { - let txn = index.read_txn()?; - Self::new_txn(index, &txn) - } - - pub fn new_txn(index: &Index, txn: &milli::heed::RoTxn) -> Result { - let created_at = index.created_at(txn)?; - let updated_at = index.updated_at(txn)?; - let primary_key = index.primary_key(txn)?.map(String::from); - Ok(Self { - created_at, - updated_at, - primary_key, - }) - } -} - -#[derive(Serialize, Debug)] -#[serde(rename_all = "camelCase")] -pub struct IndexStats { - #[serde(skip)] - pub size: u64, - pub number_of_documents: u64, - /// Whether the current index is performing an update. It is initially `None` when the - /// index returns it, since it is the `UpdateStore` that knows what index is currently indexing. 
It is - /// later set to either true or false, when we retrieve the information from the `UpdateStore` - pub is_indexing: Option<bool>, - pub field_distribution: FieldDistribution, -} - -#[derive(Clone, derivative::Derivative)] -#[derivative(Debug)] -pub struct Index { - pub uuid: Uuid, - #[derivative(Debug = "ignore")] - pub inner: Arc<milli::Index>, - #[derivative(Debug = "ignore")] - pub indexer_config: Arc<IndexerConfig>, -} - -impl Deref for Index { - type Target = milli::Index; - - fn deref(&self) -> &Self::Target { - self.inner.as_ref() - } -} - -impl Index { - pub fn open( - path: impl AsRef<Path>, - size: usize, - uuid: Uuid, - update_handler: Arc<IndexerConfig>, - ) -> Result<Self> { - log::debug!("opening index in {}", path.as_ref().display()); - create_dir_all(&path)?; - let mut options = EnvOpenOptions::new(); - options.map_size(size); - options.max_readers(1024); - let inner = Arc::new(milli::Index::new(options, &path)?); - Ok(Index { - inner, - uuid, - indexer_config: update_handler, - }) - } - - /// Asynchronously close the underlying index - pub fn close(self) { - self.inner.as_ref().clone().prepare_for_closing(); - } - - pub fn stats(&self) -> Result<IndexStats> { - let rtxn = self.read_txn()?; - - Ok(IndexStats { - size: self.size(), - number_of_documents: self.number_of_documents(&rtxn)?, - is_indexing: None, - field_distribution: self.field_distribution(&rtxn)?, - }) - } - - pub fn meta(&self) -> Result<IndexMeta> { - IndexMeta::new(self) - } - pub fn settings(&self) -> Result<Settings<Checked>> { - let txn = self.read_txn()?; - self.settings_txn(&txn) - } - - pub fn uuid(&self) -> Uuid { - self.uuid - } - - pub fn settings_txn(&self, txn: &RoTxn) -> Result<Settings<Checked>> { - let displayed_attributes = self - .displayed_fields(txn)? - .map(|fields| fields.into_iter().map(String::from).collect()); - - let searchable_attributes = self - .user_defined_searchable_fields(txn)? - .map(|fields| fields.into_iter().map(String::from).collect()); - - let filterable_attributes = self.filterable_fields(txn)?.into_iter().collect(); - - let sortable_attributes = self.sortable_fields(txn)?.into_iter().collect(); - - let criteria = self - .criteria(txn)? - .into_iter() - .map(|c| c.to_string()) - .collect(); - - let stop_words = self - .stop_words(txn)? - .map(|stop_words| -> Result<BTreeSet<String>> { - Ok(stop_words.stream().into_strs()?.into_iter().collect()) - }) - .transpose()? - .unwrap_or_default(); - let distinct_field = self.distinct_field(txn)?.map(String::from); - - // in milli each word in the synonyms map was split on its separator. Since we lost - // this information we are going to put a space between words. - let synonyms = self - .synonyms(txn)? - .iter() - .map(|(key, values)| { - ( - key.join(" "), - values.iter().map(|value| value.join(" ")).collect(), - ) - }) - .collect(); - - let min_typo_word_len = MinWordSizeTyposSetting { - one_typo: Setting::Set(self.min_word_len_one_typo(txn)?), - two_typos: Setting::Set(self.min_word_len_two_typos(txn)?), - }; - - let disabled_words = match self.exact_words(txn)? { - Some(fst) => fst.into_stream().into_strs()?.into_iter().collect(), - None => BTreeSet::new(), - }; - - let disabled_attributes = self - .exact_attributes(txn)? - .into_iter() - .map(String::from) - .collect(); - - let typo_tolerance = TypoSettings { - enabled: Setting::Set(self.authorize_typos(txn)?), - min_word_size_for_typos: Setting::Set(min_typo_word_len), - disable_on_words: Setting::Set(disabled_words), - disable_on_attributes: Setting::Set(disabled_attributes), - }; - - let faceting = FacetingSettings { - max_values_per_facet: Setting::Set( - self.max_values_per_facet(txn)?
- .unwrap_or(DEFAULT_VALUES_PER_FACET), - ), - }; - - let pagination = PaginationSettings { - max_total_hits: Setting::Set( - self.pagination_max_total_hits(txn)? - .unwrap_or(DEFAULT_PAGINATION_MAX_TOTAL_HITS), - ), - }; - - Ok(Settings { - displayed_attributes: match displayed_attributes { - Some(attrs) => Setting::Set(attrs), - None => Setting::Reset, - }, - searchable_attributes: match searchable_attributes { - Some(attrs) => Setting::Set(attrs), - None => Setting::Reset, - }, - filterable_attributes: Setting::Set(filterable_attributes), - sortable_attributes: Setting::Set(sortable_attributes), - ranking_rules: Setting::Set(criteria), - stop_words: Setting::Set(stop_words), - distinct_attribute: match distinct_field { - Some(field) => Setting::Set(field), - None => Setting::Reset, - }, - synonyms: Setting::Set(synonyms), - typo_tolerance: Setting::Set(typo_tolerance), - faceting: Setting::Set(faceting), - pagination: Setting::Set(pagination), - _kind: PhantomData, - }) - } - - /// Return the total number of documents contained in the index + the selected documents. - pub fn retrieve_documents>( - &self, - offset: usize, - limit: usize, - attributes_to_retrieve: Option>, - ) -> Result<(u64, Vec)> { - let txn = self.read_txn()?; - - let fields_ids_map = self.fields_ids_map(&txn)?; - let all_fields: Vec<_> = fields_ids_map.iter().map(|(id, _)| id).collect(); - - let mut documents = Vec::new(); - for entry in self.all_documents(&txn)?.skip(offset).take(limit) { - let (_id, obkv) = entry?; - let document = obkv_to_json(&all_fields, &fields_ids_map, obkv)?; - let document = match &attributes_to_retrieve { - Some(attributes_to_retrieve) => permissive_json_pointer::select_values( - &document, - attributes_to_retrieve.iter().map(|s| s.as_ref()), - ), - None => document, - }; - documents.push(document); - } - - let number_of_documents = self.number_of_documents(&txn)?; - - Ok((number_of_documents, documents)) - } - - pub fn retrieve_document>( - &self, - doc_id: String, - attributes_to_retrieve: Option>, - ) -> Result { - let txn = self.read_txn()?; - - let fields_ids_map = self.fields_ids_map(&txn)?; - let all_fields: Vec<_> = fields_ids_map.iter().map(|(id, _)| id).collect(); - - let internal_id = self - .external_documents_ids(&txn)? - .get(doc_id.as_bytes()) - .ok_or_else(|| IndexError::DocumentNotFound(doc_id.clone()))?; - - let document = self - .documents(&txn, std::iter::once(internal_id))? - .into_iter() - .next() - .map(|(_, d)| d) - .ok_or(IndexError::DocumentNotFound(doc_id))?; - - let document = obkv_to_json(&all_fields, &fields_ids_map, document)?; - let document = match &attributes_to_retrieve { - Some(attributes_to_retrieve) => permissive_json_pointer::select_values( - &document, - attributes_to_retrieve.iter().map(|s| s.as_ref()), - ), - None => document, - }; - - Ok(document) - } - - pub fn size(&self) -> u64 { - WalkDir::new(self.path()) - .into_iter() - .filter_map(|entry| entry.ok()) - .filter_map(|entry| entry.metadata().ok()) - .filter(|metadata| metadata.is_file()) - .fold(0, |acc, m| acc + m.len()) - } - - pub fn snapshot(&self, path: impl AsRef) -> Result<()> { - let mut dst = path.as_ref().join(format!("indexes/{}/", self.uuid)); - create_dir_all(&dst)?; - dst.push("data.mdb"); - let _txn = self.write_txn()?; - self.inner.copy_to_path(dst, CompactionOption::Enabled)?; - Ok(()) - } -} - -/// When running tests, when a server instance is dropped, the environment is not actually closed, -/// leaving a lot of open file descriptors. 
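// The `Drop` impl below closes the underlying LMDB environment only when the
// last strong reference to the inner index goes away. A minimal sketch of the
// same pattern on a stand-in resource (names are illustrative):
//
//     use std::sync::Arc;
//
//     struct Handle { inner: Arc<String> }
//
//     impl Drop for Handle {
//         fn drop(&mut self) {
//             // `self.inner` itself counts as one reference, so a strong count
//             // of 1 means this handle is the last owner.
//             if Arc::strong_count(&self.inner) == 1 {
//                 println!("closing {}", self.inner);
//             }
//         }
//     }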
-impl Drop for Index { - fn drop(&mut self) { - // When dropping the last instance of an index, we want to close the index - // Note that the close is actually performed only if all the instances are effectively - // dropped - - if Arc::strong_count(&self.inner) == 1 { - self.inner.as_ref().clone().prepare_for_closing(); - } - } -} diff --git a/meilisearch-lib/src/index/search.rs b/meilisearch-lib/src/index/search.rs deleted file mode 100644 index 1a9aa1d0d..000000000 --- a/meilisearch-lib/src/index/search.rs +++ /dev/null @@ -1,688 +0,0 @@ -use std::cmp::min; -use std::collections::{BTreeMap, BTreeSet, HashSet}; -use std::str::FromStr; -use std::time::Instant; - -use either::Either; -use milli::tokenizer::TokenizerBuilder; -use milli::{ - AscDesc, FieldId, FieldsIdsMap, Filter, FormatOptions, MatchBounds, MatcherBuilder, SortError, - TermsMatchingStrategy, DEFAULT_VALUES_PER_FACET, -}; -use regex::Regex; -use serde::{Deserialize, Serialize}; -use serde_json::{json, Value}; - -use crate::index::error::FacetError; - -use super::error::{IndexError, Result}; -use super::index::Index; - -pub type Document = serde_json::Map<String, Value>; -type MatchesPosition = BTreeMap<String, Vec<MatchBounds>>; - -pub const DEFAULT_SEARCH_LIMIT: fn() -> usize = || 20; -pub const DEFAULT_CROP_LENGTH: fn() -> usize = || 10; -pub const DEFAULT_CROP_MARKER: fn() -> String = || "…".to_string(); -pub const DEFAULT_HIGHLIGHT_PRE_TAG: fn() -> String = || "<em>".to_string(); -pub const DEFAULT_HIGHLIGHT_POST_TAG: fn() -> String = || "</em>".to_string(); - -/// The maximum number of results that the engine -/// will be able to return in one search call. -pub const DEFAULT_PAGINATION_MAX_TOTAL_HITS: usize = 1000; - -#[derive(Deserialize, Debug, Clone, PartialEq, Eq)] -#[serde(rename_all = "camelCase", deny_unknown_fields)] -pub struct SearchQuery { - pub q: Option<String>, - pub offset: Option<usize>, - #[serde(default = "DEFAULT_SEARCH_LIMIT")] - pub limit: usize, - pub attributes_to_retrieve: Option<BTreeSet<String>>, - pub attributes_to_crop: Option<Vec<String>>, - #[serde(default = "DEFAULT_CROP_LENGTH")] - pub crop_length: usize, - pub attributes_to_highlight: Option<HashSet<String>>, - // Default to false - #[serde(default = "Default::default")] - pub show_matches_position: bool, - pub filter: Option<Value>, - pub sort: Option<Vec<String>>, - pub facets: Option<Vec<String>>, - #[serde(default = "DEFAULT_HIGHLIGHT_PRE_TAG")] - pub highlight_pre_tag: String, - #[serde(default = "DEFAULT_HIGHLIGHT_POST_TAG")] - pub highlight_post_tag: String, - #[serde(default = "DEFAULT_CROP_MARKER")] - pub crop_marker: String, - #[serde(default)] - pub matching_strategy: MatchingStrategy, -} - -#[derive(Deserialize, Debug, Clone, PartialEq, Eq)] -#[serde(rename_all = "camelCase")] -pub enum MatchingStrategy { - /// Remove query words from last to first - Last, - /// All query words are mandatory - All, -} - -impl Default for MatchingStrategy { - fn default() -> Self { - Self::Last - } } - -impl From<MatchingStrategy> for TermsMatchingStrategy { - fn from(other: MatchingStrategy) -> Self { - match other { - MatchingStrategy::Last => Self::Last, - MatchingStrategy::All => Self::All, - } - } -} - -#[derive(Debug, Clone, Serialize, PartialEq)] -pub struct SearchHit { - #[serde(flatten)] - pub document: Document, - #[serde(rename = "_formatted", skip_serializing_if = "Document::is_empty")] - pub formatted: Document, - #[serde(rename = "_matchesPosition", skip_serializing_if = "Option::is_none")] - pub matches_position: Option<MatchesPosition>, -} - -#[derive(Serialize, Debug, Clone, PartialEq)] -#[serde(rename_all = "camelCase")] -pub struct SearchResult { - pub hits: Vec<SearchHit>, - pub estimated_total_hits: u64, -
pub query: String, - pub limit: usize, - pub offset: usize, - pub processing_time_ms: u128, - #[serde(skip_serializing_if = "Option::is_none")] - pub facet_distribution: Option<BTreeMap<String, BTreeMap<String, u64>>>, -} - -impl Index { - pub fn perform_search(&self, query: SearchQuery) -> Result<SearchResult> { - let before_search = Instant::now(); - let rtxn = self.read_txn()?; - - let mut search = self.search(&rtxn); - - if let Some(ref query) = query.q { - search.query(query); - } - - search.terms_matching_strategy(query.matching_strategy.into()); - - let max_total_hits = self - .pagination_max_total_hits(&rtxn)? - .unwrap_or(DEFAULT_PAGINATION_MAX_TOTAL_HITS); - - // Make sure that a user can't get more documents than the hard limit, - // we align that on the offset too. - let offset = min(query.offset.unwrap_or(0), max_total_hits); - let limit = min(query.limit, max_total_hits.saturating_sub(offset)); - - search.offset(offset); - search.limit(limit); - - if let Some(ref filter) = query.filter { - if let Some(facets) = parse_filter(filter)? { - search.filter(facets); - } - } - - if let Some(ref sort) = query.sort { - let sort = match sort.iter().map(|s| AscDesc::from_str(s)).collect() { - Ok(sorts) => sorts, - Err(asc_desc_error) => { - return Err(IndexError::Milli(SortError::from(asc_desc_error).into())) - } - }; - - search.sort_criteria(sort); - } - - let milli::SearchResult { - documents_ids, - matching_words, - candidates, - .. - } = search.execute()?; - - let fields_ids_map = self.fields_ids_map(&rtxn).unwrap(); - - let displayed_ids = self - .displayed_fields_ids(&rtxn)? - .map(|fields| fields.into_iter().collect::<BTreeSet<_>>()) - .unwrap_or_else(|| fields_ids_map.iter().map(|(id, _)| id).collect()); - - let fids = |attrs: &BTreeSet<String>| { - let mut ids = BTreeSet::new(); - for attr in attrs { - if attr == "*" { - ids = displayed_ids.clone(); - break; - } - - if let Some(id) = fields_ids_map.id(attr) { - ids.insert(id); - } - } - ids - }; - - // The attributes to retrieve are the ones explicitly marked as to retrieve (all by default), - // but these attributes must also be present - // - in the fields_ids_map - // - in the displayed attributes - let to_retrieve_ids: BTreeSet<_> = query - .attributes_to_retrieve - .as_ref() - .map(fids) - .unwrap_or_else(|| displayed_ids.clone()) - .intersection(&displayed_ids) - .cloned() - .collect(); - - let attr_to_highlight = query.attributes_to_highlight.unwrap_or_default(); - - let attr_to_crop = query.attributes_to_crop.unwrap_or_default(); - - // Attributes in `formatted_options` correspond to the attributes that will be in `_formatted` - // These attributes are: - // - the attributes asked to be highlighted or cropped (with `attributesToCrop` or `attributesToHighlight`) - // - the attributes asked to be retrieved: these attributes will not be highlighted/cropped - // But these attributes must also be present in the displayed attributes - let formatted_options = compute_formatted_options( - &attr_to_highlight, - &attr_to_crop, - query.crop_length, - &to_retrieve_ids, - &fields_ids_map, - &displayed_ids, - ); - - let tokenizer = TokenizerBuilder::default().build(); - - let mut formatter_builder = MatcherBuilder::new(matching_words, tokenizer); - formatter_builder.crop_marker(query.crop_marker); - formatter_builder.highlight_prefix(query.highlight_pre_tag); - formatter_builder.highlight_suffix(query.highlight_post_tag); - - let mut documents = Vec::new(); - - let documents_iter = self.documents(&rtxn, documents_ids)?; - - for (_id, obkv) in documents_iter { - // First generate a document with all the
displayed fields - let displayed_document = make_document(&displayed_ids, &fields_ids_map, obkv)?; - - // select the attributes to retrieve - let attributes_to_retrieve = to_retrieve_ids - .iter() - .map(|&fid| fields_ids_map.name(fid).expect("Missing field name")); - let mut document = - permissive_json_pointer::select_values(&displayed_document, attributes_to_retrieve); - - let (matches_position, formatted) = format_fields( - &displayed_document, - &fields_ids_map, - &formatter_builder, - &formatted_options, - query.show_matches_position, - &displayed_ids, - )?; - - if let Some(sort) = query.sort.as_ref() { - insert_geo_distance(sort, &mut document); - } - - let hit = SearchHit { - document, - formatted, - matches_position, - }; - documents.push(hit); - } - - let estimated_total_hits = candidates.len(); - - let facet_distribution = match query.facets { - Some(ref fields) => { - let mut facet_distribution = self.facets_distribution(&rtxn); - - let max_values_by_facet = self - .max_values_per_facet(&rtxn)? - .unwrap_or(DEFAULT_VALUES_PER_FACET); - facet_distribution.max_values_per_facet(max_values_by_facet); - - if fields.iter().all(|f| f != "*") { - facet_distribution.facets(fields); - } - let distribution = facet_distribution.candidates(candidates).execute()?; - - Some(distribution) - } - None => None, - }; - - let result = SearchResult { - hits: documents, - estimated_total_hits, - query: query.q.clone().unwrap_or_default(), - limit: query.limit, - offset: query.offset.unwrap_or_default(), - processing_time_ms: before_search.elapsed().as_millis(), - facet_distribution, - }; - Ok(result) - } -} - -fn insert_geo_distance(sorts: &[String], document: &mut Document) { - lazy_static::lazy_static! { - static ref GEO_REGEX: Regex = - Regex::new(r"_geoPoint\(\s*([[:digit:].\-]+)\s*,\s*([[:digit:].\-]+)\s*\)").unwrap(); - }; - if let Some(capture_group) = sorts.iter().find_map(|sort| GEO_REGEX.captures(sort)) { - // TODO: TAMO: milli encountered an internal error, what do we want to do? 
- let base = [ - capture_group[1].parse().unwrap(), - capture_group[2].parse().unwrap(), - ]; - let geo_point = &document.get("_geo").unwrap_or(&json!(null)); - if let Some((lat, lng)) = geo_point["lat"].as_f64().zip(geo_point["lng"].as_f64()) { - let distance = milli::distance_between_two_points(&base, &[lat, lng]); - document.insert("_geoDistance".to_string(), json!(distance.round() as usize)); - } - } -} - -fn compute_formatted_options( - attr_to_highlight: &HashSet, - attr_to_crop: &[String], - query_crop_length: usize, - to_retrieve_ids: &BTreeSet, - fields_ids_map: &FieldsIdsMap, - displayed_ids: &BTreeSet, -) -> BTreeMap { - let mut formatted_options = BTreeMap::new(); - - add_highlight_to_formatted_options( - &mut formatted_options, - attr_to_highlight, - fields_ids_map, - displayed_ids, - ); - - add_crop_to_formatted_options( - &mut formatted_options, - attr_to_crop, - query_crop_length, - fields_ids_map, - displayed_ids, - ); - - // Should not return `_formatted` if no valid attributes to highlight/crop - if !formatted_options.is_empty() { - add_non_formatted_ids_to_formatted_options(&mut formatted_options, to_retrieve_ids); - } - - formatted_options -} - -fn add_highlight_to_formatted_options( - formatted_options: &mut BTreeMap, - attr_to_highlight: &HashSet, - fields_ids_map: &FieldsIdsMap, - displayed_ids: &BTreeSet, -) { - for attr in attr_to_highlight { - let new_format = FormatOptions { - highlight: true, - crop: None, - }; - - if attr == "*" { - for id in displayed_ids { - formatted_options.insert(*id, new_format); - } - break; - } - - if let Some(id) = fields_ids_map.id(attr) { - if displayed_ids.contains(&id) { - formatted_options.insert(id, new_format); - } - } - } -} - -fn add_crop_to_formatted_options( - formatted_options: &mut BTreeMap, - attr_to_crop: &[String], - crop_length: usize, - fields_ids_map: &FieldsIdsMap, - displayed_ids: &BTreeSet, -) { - for attr in attr_to_crop { - let mut split = attr.rsplitn(2, ':'); - let (attr_name, attr_len) = match split.next().zip(split.next()) { - Some((len, name)) => { - let crop_len = len.parse::().unwrap_or(crop_length); - (name, crop_len) - } - None => (attr.as_str(), crop_length), - }; - - if attr_name == "*" { - for id in displayed_ids { - formatted_options - .entry(*id) - .and_modify(|f| f.crop = Some(attr_len)) - .or_insert(FormatOptions { - highlight: false, - crop: Some(attr_len), - }); - } - } - - if let Some(id) = fields_ids_map.id(attr_name) { - if displayed_ids.contains(&id) { - formatted_options - .entry(id) - .and_modify(|f| f.crop = Some(attr_len)) - .or_insert(FormatOptions { - highlight: false, - crop: Some(attr_len), - }); - } - } - } -} - -fn add_non_formatted_ids_to_formatted_options( - formatted_options: &mut BTreeMap, - to_retrieve_ids: &BTreeSet, -) { - for id in to_retrieve_ids { - formatted_options.entry(*id).or_insert(FormatOptions { - highlight: false, - crop: None, - }); - } -} - -fn make_document( - displayed_attributes: &BTreeSet, - field_ids_map: &FieldsIdsMap, - obkv: obkv::KvReaderU16, -) -> Result { - let mut document = serde_json::Map::new(); - - // recreate the original json - for (key, value) in obkv.iter() { - let value = serde_json::from_slice(value)?; - let key = field_ids_map - .name(key) - .expect("Missing field name") - .to_string(); - - document.insert(key, value); - } - - // select the attributes to retrieve - let displayed_attributes = displayed_attributes - .iter() - .map(|&fid| field_ids_map.name(fid).expect("Missing field name")); - - let document = 
permissive_json_pointer::select_values(&document, displayed_attributes); - Ok(document) -} - -fn format_fields<'a, A: AsRef<[u8]>>( - document: &Document, - field_ids_map: &FieldsIdsMap, - builder: &MatcherBuilder<'a, A>, - formatted_options: &BTreeMap<FieldId, FormatOptions>, - compute_matches: bool, - displayable_ids: &BTreeSet<FieldId>, -) -> Result<(Option<MatchesPosition>, Document)> { - let mut matches_position = compute_matches.then(BTreeMap::new); - let mut document = document.clone(); - - // select the attributes to retrieve - let displayable_names = displayable_ids - .iter() - .map(|&fid| field_ids_map.name(fid).expect("Missing field name")); - permissive_json_pointer::map_leaf_values(&mut document, displayable_names, |key, value| { - // To get the formatting option of each key we need to see all the rules that apply - // to the value and merge them together. E.g. if a user asked to highlight `doggo` - // and to crop `doggo.name`, `doggo.name` needs to be highlighted and cropped while `doggo.age` is only - // highlighted. - let format = formatted_options - .iter() - .filter(|(field, _option)| { - let name = field_ids_map.name(**field).unwrap(); - milli::is_faceted_by(name, key) || milli::is_faceted_by(key, name) - }) - .map(|(_, option)| *option) - .reduce(|acc, option| acc.merge(option)); - let mut infos = Vec::new(); - - *value = format_value( - std::mem::take(value), - builder, - format, - &mut infos, - compute_matches, - ); - - if let Some(matches) = matches_position.as_mut() { - if !infos.is_empty() { - matches.insert(key.to_owned(), infos); - } - } - }); - - let selectors = formatted_options - .keys() - // This unwrap must be safe since we got the ids from the fields_ids_map just - // before. - .map(|&fid| field_ids_map.name(fid).unwrap()); - let document = permissive_json_pointer::select_values(&document, selectors); - - Ok((matches_position, document)) -} - -fn format_value<'a, A: AsRef<[u8]>>( - value: Value, - builder: &MatcherBuilder<'a, A>, - format_options: Option<FormatOptions>, - infos: &mut Vec<MatchBounds>, - compute_matches: bool, -) -> Value { - match value { - Value::String(old_string) => { - let mut matcher = builder.build(&old_string); - if compute_matches { - let matches = matcher.matches(); - infos.extend_from_slice(&matches[..]); - } - - match format_options { - Some(format_options) => { - let value = matcher.format(format_options); - Value::String(value.into_owned()) - } - None => Value::String(old_string), - } - } - Value::Array(values) => Value::Array( - values - .into_iter() - .map(|v| { - format_value( - v, - builder, - format_options.map(|format_options| FormatOptions { - highlight: format_options.highlight, - crop: None, - }), - infos, - compute_matches, - ) - }) - .collect(), - ), - Value::Object(object) => Value::Object( - object - .into_iter() - .map(|(k, v)| { - ( - k, - format_value( - v, - builder, - format_options.map(|format_options| FormatOptions { - highlight: format_options.highlight, - crop: None, - }), - infos, - compute_matches, - ), - ) - }) - .collect(), - ), - Value::Number(number) => { - let s = number.to_string(); - - let mut matcher = builder.build(&s); - if compute_matches { - let matches = matcher.matches(); - infos.extend_from_slice(&matches[..]); - } - - match format_options { - Some(format_options) => { - let value = matcher.format(format_options); - Value::String(value.into_owned()) - } - None => Value::Number(number), - } - } - value => value, - } -} - -fn parse_filter(facets: &Value) -> Result<Option<Filter>> { - match facets { - Value::String(expr) => { - let condition = Filter::from_str(expr)?; -
Ok(condition) - } - Value::Array(arr) => parse_filter_array(arr), - v => Err(FacetError::InvalidExpression(&["Array"], v.clone()).into()), - } -} - -fn parse_filter_array(arr: &[Value]) -> Result<Option<Filter>> { - let mut ands = Vec::new(); - for value in arr { - match value { - Value::String(s) => ands.push(Either::Right(s.as_str())), - Value::Array(arr) => { - let mut ors = Vec::new(); - for value in arr { - match value { - Value::String(s) => ors.push(s.as_str()), - v => { - return Err(FacetError::InvalidExpression(&["String"], v.clone()).into()) - } - } - } - ands.push(Either::Left(ors)); - } - v => { - return Err( - FacetError::InvalidExpression(&["String", "[String]"], v.clone()).into(), - ) - } - } - } - - Ok(Filter::from_array(ands)?) -} - -#[cfg(test)] -mod test { - use super::*; - - #[test] - fn test_insert_geo_distance() { - let value: Document = serde_json::from_str( - r#"{ - "_geo": { - "lat": 50.629973371633746, - "lng": 3.0569447399419567 - }, - "city": "Lille", - "id": "1" - }"#, - ) - .unwrap(); - - let sorters = &["_geoPoint(50.629973371633746,3.0569447399419567):desc".to_string()]; - let mut document = value.clone(); - insert_geo_distance(sorters, &mut document); - assert_eq!(document.get("_geoDistance"), Some(&json!(0))); - - let sorters = &["_geoPoint(50.629973371633746, 3.0569447399419567):asc".to_string()]; - let mut document = value.clone(); - insert_geo_distance(sorters, &mut document); - assert_eq!(document.get("_geoDistance"), Some(&json!(0))); - - let sorters = - &["_geoPoint( 50.629973371633746 , 3.0569447399419567 ):desc".to_string()]; - let mut document = value.clone(); - insert_geo_distance(sorters, &mut document); - assert_eq!(document.get("_geoDistance"), Some(&json!(0))); - - let sorters = &[ - "prix:asc", - "villeneuve:desc", - "_geoPoint(50.629973371633746, 3.0569447399419567):asc", - "ubu:asc", - ] - .map(|s| s.to_string()); - let mut document = value.clone(); - insert_geo_distance(sorters, &mut document); - assert_eq!(document.get("_geoDistance"), Some(&json!(0))); - - // only the first geoPoint is used to compute the distance - let sorters = &[ - "chien:desc", - "_geoPoint(50.629973371633746, 3.0569447399419567):asc", - "pangolin:desc", - "_geoPoint(100.0, -80.0):asc", - "chat:asc", - ] - .map(|s| s.to_string()); - let mut document = value.clone(); - insert_geo_distance(sorters, &mut document); - assert_eq!(document.get("_geoDistance"), Some(&json!(0))); - - // there was no _geoPoint so nothing is inserted in the document - let sorters = &["chien:asc".to_string()]; - let mut document = value; - insert_geo_distance(sorters, &mut document); - assert_eq!(document.get("_geoDistance"), None); - } -} diff --git a/meilisearch-lib/src/index/updates.rs b/meilisearch-lib/src/index/updates.rs deleted file mode 100644 index b6f601753..000000000 --- a/meilisearch-lib/src/index/updates.rs +++ /dev/null @@ -1,559 +0,0 @@ -use std::collections::{BTreeMap, BTreeSet}; -use std::marker::PhantomData; -use std::num::NonZeroUsize; - -use log::{debug, info, trace}; -use milli::documents::DocumentsBatchReader; -use milli::update::{ - DocumentAdditionResult, DocumentDeletionResult, IndexDocumentsConfig, IndexDocumentsMethod, - Setting, -}; -use serde::{Deserialize, Serialize, Serializer}; -use uuid::Uuid; - -use super::error::{IndexError, Result}; -use super::index::{Index, IndexMeta}; -use crate::update_file_store::UpdateFileStore; - -fn serialize_with_wildcard<S>( - field: &Setting<Vec<String>>, - s: S, -) -> std::result::Result<S::Ok, S::Error> -where - S: Serializer, -{ - let wildcard = vec!["*".to_string()]; -
match field { - Setting::Set(value) => Some(value), - Setting::Reset => Some(&wildcard), - Setting::NotSet => None, - } - .serialize(s) -} - -#[derive(Clone, Default, Debug, Serialize, PartialEq, Eq)] -pub struct Checked; - -#[derive(Clone, Default, Debug, Serialize, Deserialize, PartialEq, Eq)] -pub struct Unchecked; - -#[cfg_attr(test, derive(proptest_derive::Arbitrary))] -#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq)] -#[serde(deny_unknown_fields)] -#[serde(rename_all = "camelCase")] -pub struct MinWordSizeTyposSetting { - #[cfg_attr(test, proptest(strategy = "test::setting_strategy()"))] - #[serde(default, skip_serializing_if = "Setting::is_not_set")] - pub one_typo: Setting<u8>, - #[cfg_attr(test, proptest(strategy = "test::setting_strategy()"))] - #[serde(default, skip_serializing_if = "Setting::is_not_set")] - pub two_typos: Setting<u8>, -} - -#[cfg_attr(test, derive(proptest_derive::Arbitrary))] -#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq)] -#[serde(deny_unknown_fields)] -#[serde(rename_all = "camelCase")] -pub struct TypoSettings { - #[cfg_attr(test, proptest(strategy = "test::setting_strategy()"))] - #[serde(default, skip_serializing_if = "Setting::is_not_set")] - pub enabled: Setting<bool>, - #[cfg_attr(test, proptest(strategy = "test::setting_strategy()"))] - #[serde(default, skip_serializing_if = "Setting::is_not_set")] - pub min_word_size_for_typos: Setting<MinWordSizeTyposSetting>, - #[cfg_attr(test, proptest(strategy = "test::setting_strategy()"))] - #[serde(default, skip_serializing_if = "Setting::is_not_set")] - pub disable_on_words: Setting<BTreeSet<String>>, - #[cfg_attr(test, proptest(strategy = "test::setting_strategy()"))] - #[serde(default, skip_serializing_if = "Setting::is_not_set")] - pub disable_on_attributes: Setting<BTreeSet<String>>, -} - -#[cfg_attr(test, derive(proptest_derive::Arbitrary))] -#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq)] -#[serde(deny_unknown_fields)] -#[serde(rename_all = "camelCase")] -pub struct FacetingSettings { - #[cfg_attr(test, proptest(strategy = "test::setting_strategy()"))] - #[serde(default, skip_serializing_if = "Setting::is_not_set")] - pub max_values_per_facet: Setting<usize>, -} - -#[cfg_attr(test, derive(proptest_derive::Arbitrary))] -#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq)] -#[serde(deny_unknown_fields)] -#[serde(rename_all = "camelCase")] -pub struct PaginationSettings { - #[cfg_attr(test, proptest(strategy = "test::setting_strategy()"))] - #[serde(default, skip_serializing_if = "Setting::is_not_set")] - pub max_total_hits: Setting<usize>, -} - -/// Holds all the settings for an index. `T` can either be `Checked` if the settings are guaranteed -/// to be valid, or `Unchecked` if they still need to be validated. In the latter case, a -/// call to `check` will return a `Settings<Checked>` from a `Settings<Unchecked>`.
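// A minimal sketch of the intended `Unchecked` -> `Checked` flow (`payload` is
// a hypothetical JSON string supplied by a caller, not a value from this file):
//
//     let unchecked: Settings<Unchecked> = serde_json::from_str(payload).unwrap();
//     let checked: Settings<Checked> = unchecked.check();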
-#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq)] -#[serde(deny_unknown_fields)] -#[serde(rename_all = "camelCase")] -#[serde(bound(serialize = "T: Serialize", deserialize = "T: Deserialize<'static>"))] -#[cfg_attr(test, derive(proptest_derive::Arbitrary))] -pub struct Settings<T> { - #[serde( - default, - serialize_with = "serialize_with_wildcard", - skip_serializing_if = "Setting::is_not_set" - )] - #[cfg_attr(test, proptest(strategy = "test::setting_strategy()"))] - pub displayed_attributes: Setting<Vec<String>>, - - #[serde( - default, - serialize_with = "serialize_with_wildcard", - skip_serializing_if = "Setting::is_not_set" - )] - #[cfg_attr(test, proptest(strategy = "test::setting_strategy()"))] - pub searchable_attributes: Setting<Vec<String>>, - - #[serde(default, skip_serializing_if = "Setting::is_not_set")] - #[cfg_attr(test, proptest(strategy = "test::setting_strategy()"))] - pub filterable_attributes: Setting<BTreeSet<String>>, - #[serde(default, skip_serializing_if = "Setting::is_not_set")] - #[cfg_attr(test, proptest(strategy = "test::setting_strategy()"))] - pub sortable_attributes: Setting<BTreeSet<String>>, - #[serde(default, skip_serializing_if = "Setting::is_not_set")] - #[cfg_attr(test, proptest(strategy = "test::setting_strategy()"))] - pub ranking_rules: Setting<Vec<String>>, - #[serde(default, skip_serializing_if = "Setting::is_not_set")] - #[cfg_attr(test, proptest(strategy = "test::setting_strategy()"))] - pub stop_words: Setting<BTreeSet<String>>, - #[serde(default, skip_serializing_if = "Setting::is_not_set")] - #[cfg_attr(test, proptest(strategy = "test::setting_strategy()"))] - pub synonyms: Setting<BTreeMap<String, Vec<String>>>, - #[serde(default, skip_serializing_if = "Setting::is_not_set")] - #[cfg_attr(test, proptest(strategy = "test::setting_strategy()"))] - pub distinct_attribute: Setting<String>, - #[serde(default, skip_serializing_if = "Setting::is_not_set")] - #[cfg_attr(test, proptest(strategy = "test::setting_strategy()"))] - pub typo_tolerance: Setting<TypoSettings>, - #[serde(default, skip_serializing_if = "Setting::is_not_set")] - #[cfg_attr(test, proptest(strategy = "test::setting_strategy()"))] - pub faceting: Setting<FacetingSettings>, - #[serde(default, skip_serializing_if = "Setting::is_not_set")] - #[cfg_attr(test, proptest(strategy = "test::setting_strategy()"))] - pub pagination: Setting<PaginationSettings>, - - #[serde(skip)] - pub _kind: PhantomData<T>, -} - -impl Settings<Checked> { - pub fn cleared() -> Settings<Checked> { - Settings { - displayed_attributes: Setting::Reset, - searchable_attributes: Setting::Reset, - filterable_attributes: Setting::Reset, - sortable_attributes: Setting::Reset, - ranking_rules: Setting::Reset, - stop_words: Setting::Reset, - synonyms: Setting::Reset, - distinct_attribute: Setting::Reset, - typo_tolerance: Setting::Reset, - faceting: Setting::Reset, - pagination: Setting::Reset, - _kind: PhantomData, - } - } - - pub fn into_unchecked(self) -> Settings<Unchecked> { - let Self { - displayed_attributes, - searchable_attributes, - filterable_attributes, - sortable_attributes, - ranking_rules, - stop_words, - synonyms, - distinct_attribute, - typo_tolerance, - faceting, - pagination, - ..
- } = self; - - Settings { - displayed_attributes, - searchable_attributes, - filterable_attributes, - sortable_attributes, - ranking_rules, - stop_words, - synonyms, - distinct_attribute, - typo_tolerance, - faceting, - pagination, - _kind: PhantomData, - } - } -} - -impl Settings { - pub fn check(self) -> Settings { - let displayed_attributes = match self.displayed_attributes { - Setting::Set(fields) => { - if fields.iter().any(|f| f == "*") { - Setting::Reset - } else { - Setting::Set(fields) - } - } - otherwise => otherwise, - }; - - let searchable_attributes = match self.searchable_attributes { - Setting::Set(fields) => { - if fields.iter().any(|f| f == "*") { - Setting::Reset - } else { - Setting::Set(fields) - } - } - otherwise => otherwise, - }; - - Settings { - displayed_attributes, - searchable_attributes, - filterable_attributes: self.filterable_attributes, - sortable_attributes: self.sortable_attributes, - ranking_rules: self.ranking_rules, - stop_words: self.stop_words, - synonyms: self.synonyms, - distinct_attribute: self.distinct_attribute, - typo_tolerance: self.typo_tolerance, - faceting: self.faceting, - pagination: self.pagination, - _kind: PhantomData, - } - } -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -#[serde(deny_unknown_fields)] -#[serde(rename_all = "camelCase")] -pub struct Facets { - pub level_group_size: Option, - pub min_level_size: Option, -} - -impl Index { - fn update_primary_key_txn<'a, 'b>( - &'a self, - txn: &mut milli::heed::RwTxn<'a, 'b>, - primary_key: String, - ) -> Result { - let mut builder = milli::update::Settings::new(txn, self, self.indexer_config.as_ref()); - builder.set_primary_key(primary_key); - builder.execute(|_| ())?; - let meta = IndexMeta::new_txn(self, txn)?; - - Ok(meta) - } - - pub fn update_primary_key(&self, primary_key: String) -> Result { - let mut txn = self.write_txn()?; - let res = self.update_primary_key_txn(&mut txn, primary_key)?; - txn.commit()?; - - Ok(res) - } - - /// Deletes `ids` from the index, and returns how many documents were deleted. 
-
-    /// Deletes `ids` from the index, and returns how many documents were deleted.
-    pub fn delete_documents(&self, ids: &[String]) -> Result<u64> {
-        let mut txn = self.write_txn()?;
-        let mut builder = milli::update::DeleteDocuments::new(&mut txn, self)?;
-
-        // We ignore nonexistent document ids.
-        ids.iter().for_each(|id| {
-            builder.delete_external_id(id);
-        });
-
-        let deleted = builder.execute()?;
-
-        txn.commit()?;
-
-        Ok(deleted)
-    }
-
-    pub fn clear_documents(&self) -> Result<()> {
-        let mut txn = self.write_txn()?;
-        milli::update::ClearDocuments::new(&mut txn, self).execute()?;
-        txn.commit()?;
-
-        Ok(())
-    }
-
-    pub fn update_documents(
-        &self,
-        method: IndexDocumentsMethod,
-        primary_key: Option<String>,
-        file_store: UpdateFileStore,
-        contents: impl IntoIterator<Item = Uuid>,
-    ) -> Result<Vec<Result<DocumentAdditionResult>>> {
-        trace!("performing document addition");
-        let mut txn = self.write_txn()?;
-
-        if let Some(primary_key) = primary_key {
-            if self.primary_key(&txn)?.is_none() {
-                self.update_primary_key_txn(&mut txn, primary_key)?;
-            }
-        }
-
-        let config = IndexDocumentsConfig {
-            update_method: method,
-            ..Default::default()
-        };
-
-        let indexing_callback = |indexing_step| debug!("update: {:?}", indexing_step);
-        let mut builder = milli::update::IndexDocuments::new(
-            &mut txn,
-            self,
-            self.indexer_config.as_ref(),
-            config,
-            indexing_callback,
-        )?;
-
-        let mut results = Vec::new();
-        for content_uuid in contents.into_iter() {
-            let content_file = file_store.get_update(content_uuid)?;
-            let reader = DocumentsBatchReader::from_reader(content_file)?;
-            let (new_builder, user_result) = builder.add_documents(reader)?;
-            builder = new_builder;
-
-            let user_result = match user_result {
-                Ok(count) => Ok(DocumentAdditionResult {
-                    indexed_documents: count,
-                    number_of_documents: count,
-                }),
-                Err(e) => Err(IndexError::from(e)),
-            };
-
-            results.push(user_result);
-        }
-
-        if results.iter().any(Result::is_ok) {
-            let addition = builder.execute()?;
-            txn.commit()?;
-            info!("document addition done: {:?}", addition);
-        }
-
-        Ok(results)
-    }
-
-    pub fn update_settings(&self, settings: &Settings<Checked>) -> Result<()> {
-        // We must use the write transaction of the update here.
-        let mut txn = self.write_txn()?;
-        let mut builder =
-            milli::update::Settings::new(&mut txn, self, self.indexer_config.as_ref());
-
-        apply_settings_to_builder(settings, &mut builder);
-
-        builder.execute(|indexing_step| debug!("update: {:?}", indexing_step))?;
-
-        txn.commit()?;
-
-        Ok(())
-    }
-}
-
-pub fn apply_settings_to_builder(
-    settings: &Settings<Checked>,
-    builder: &mut milli::update::Settings,
-) {
-    match settings.searchable_attributes {
-        Setting::Set(ref names) => builder.set_searchable_fields(names.clone()),
-        Setting::Reset => builder.reset_searchable_fields(),
-        Setting::NotSet => (),
-    }
-
-    match settings.displayed_attributes {
-        Setting::Set(ref names) => builder.set_displayed_fields(names.clone()),
-        Setting::Reset => builder.reset_displayed_fields(),
-        Setting::NotSet => (),
-    }
-
-    match settings.filterable_attributes {
-        Setting::Set(ref facets) => {
-            builder.set_filterable_fields(facets.clone().into_iter().collect())
-        }
-        Setting::Reset => builder.reset_filterable_fields(),
-        Setting::NotSet => (),
-    }
-
-    match settings.sortable_attributes {
-        Setting::Set(ref fields) => builder.set_sortable_fields(fields.iter().cloned().collect()),
-        Setting::Reset => builder.reset_sortable_fields(),
-        Setting::NotSet => (),
-    }
-
-    match settings.ranking_rules {
-        Setting::Set(ref criteria) => builder.set_criteria(criteria.clone()),
-        Setting::Reset => builder.reset_criteria(),
-        Setting::NotSet => (),
-    }
-
-    match settings.stop_words {
-        Setting::Set(ref stop_words) => builder.set_stop_words(stop_words.clone()),
-        Setting::Reset => builder.reset_stop_words(),
-        Setting::NotSet => (),
-    }
-
-    match settings.synonyms {
-        Setting::Set(ref synonyms) => builder.set_synonyms(synonyms.clone().into_iter().collect()),
-        Setting::Reset => builder.reset_synonyms(),
-        Setting::NotSet => (),
-    }
-
-    match settings.distinct_attribute {
-        Setting::Set(ref attr) => builder.set_distinct_field(attr.clone()),
-        Setting::Reset => builder.reset_distinct_field(),
-        Setting::NotSet => (),
-    }
-
-    match settings.typo_tolerance {
-        Setting::Set(ref value) => {
-            match value.enabled {
-                Setting::Set(val) => builder.set_autorize_typos(val),
-                Setting::Reset => builder.reset_authorize_typos(),
-                Setting::NotSet => (),
-            }
-
-            match value.min_word_size_for_typos {
-                Setting::Set(ref setting) => {
-                    match setting.one_typo {
-                        Setting::Set(val) => builder.set_min_word_len_one_typo(val),
-                        Setting::Reset => builder.reset_min_word_len_one_typo(),
-                        Setting::NotSet => (),
-                    }
-                    match setting.two_typos {
-                        Setting::Set(val) => builder.set_min_word_len_two_typos(val),
-                        Setting::Reset => builder.reset_min_word_len_two_typos(),
-                        Setting::NotSet => (),
-                    }
-                }
-                Setting::Reset => {
-                    builder.reset_min_word_len_one_typo();
-                    builder.reset_min_word_len_two_typos();
-                }
-                Setting::NotSet => (),
-            }
-
-            match value.disable_on_words {
-                Setting::Set(ref words) => {
-                    builder.set_exact_words(words.clone());
-                }
-                Setting::Reset => builder.reset_exact_words(),
-                Setting::NotSet => (),
-            }
-
-            match value.disable_on_attributes {
-                Setting::Set(ref words) => {
-                    builder.set_exact_attributes(words.iter().cloned().collect())
-                }
-                Setting::Reset => builder.reset_exact_attributes(),
-                Setting::NotSet => (),
-            }
-        }
-        Setting::Reset => {
-            // All the typo settings need to be reset here.
-            builder.reset_authorize_typos();
-            builder.reset_min_word_len_one_typo();
-            builder.reset_min_word_len_two_typos();
-            builder.reset_exact_words();
-            builder.reset_exact_attributes();
-        }
-        Setting::NotSet => (),
-    }
-
-    match settings.faceting {
-        Setting::Set(ref value) => match value.max_values_per_facet {
-            Setting::Set(val) => builder.set_max_values_per_facet(val),
-            Setting::Reset => builder.reset_max_values_per_facet(),
-            Setting::NotSet => (),
-        },
-        Setting::Reset => builder.reset_max_values_per_facet(),
-        Setting::NotSet => (),
-    }
-
-    match settings.pagination {
-        Setting::Set(ref value) => match value.max_total_hits {
-            Setting::Set(val) => builder.set_pagination_max_total_hits(val),
-            Setting::Reset => builder.reset_pagination_max_total_hits(),
-            Setting::NotSet => (),
-        },
-        Setting::Reset => builder.reset_pagination_max_total_hits(),
-        Setting::NotSet => (),
-    }
-}
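
Every field in `apply_settings_to_builder` follows the same three-way dispatch. Factored out, the shape is roughly this (a hedged sketch; `set` and `reset` stand in for the matching milli builder methods):

    fn apply_setting<T: Clone>(setting: &Setting<T>, set: impl FnOnce(T), reset: impl FnOnce()) {
        match setting {
            Setting::Set(value) => set(value.clone()),
            Setting::Reset => reset(),
            Setting::NotSet => (), // leave the index untouched
        }
    }

The original keeps the matches inline, presumably because several arms also convert collection types before calling the builder.
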
-
-#[cfg(test)]
-pub(crate) mod test {
-    use proptest::prelude::*;
-
-    use super::*;
-
-    pub(super) fn setting_strategy<S: Arbitrary + Clone>() -> impl Strategy<Value = Setting<S>> {
-        prop_oneof![
-            Just(Setting::NotSet),
-            Just(Setting::Reset),
-            any::<S>().prop_map(Setting::Set)
-        ]
-    }
-
-    #[test]
-    fn test_setting_check() {
-        // test no changes
-        let settings = Settings {
-            displayed_attributes: Setting::Set(vec![String::from("hello")]),
-            searchable_attributes: Setting::Set(vec![String::from("hello")]),
-            filterable_attributes: Setting::NotSet,
-            sortable_attributes: Setting::NotSet,
-            ranking_rules: Setting::NotSet,
-            stop_words: Setting::NotSet,
-            synonyms: Setting::NotSet,
-            distinct_attribute: Setting::NotSet,
-            typo_tolerance: Setting::NotSet,
-            faceting: Setting::NotSet,
-            pagination: Setting::NotSet,
-            _kind: PhantomData::<Unchecked>,
-        };
-
-        let checked = settings.clone().check();
-        assert_eq!(settings.displayed_attributes, checked.displayed_attributes);
-        assert_eq!(
-            settings.searchable_attributes,
-            checked.searchable_attributes
-        );
-
-        // test wildcard
-        let settings = Settings {
-            displayed_attributes: Setting::Set(vec![String::from("*")]),
-            searchable_attributes: Setting::Set(vec![String::from("hello"), String::from("*")]),
-            filterable_attributes: Setting::NotSet,
-            sortable_attributes: Setting::NotSet,
-            ranking_rules: Setting::NotSet,
-            stop_words: Setting::NotSet,
-            synonyms: Setting::NotSet,
-            distinct_attribute: Setting::NotSet,
-            typo_tolerance: Setting::NotSet,
-            faceting: Setting::NotSet,
-            pagination: Setting::NotSet,
-            _kind: PhantomData::<Unchecked>,
-        };
-
-        let checked = settings.check();
-        assert_eq!(checked.displayed_attributes, Setting::Reset);
-        assert_eq!(checked.searchable_attributes, Setting::Reset);
-    }
-}
diff --git a/meilisearch-lib/src/index_resolver/index_store.rs b/meilisearch-lib/src/index_resolver/index_store.rs
deleted file mode 100644
index ea3c7125a..000000000
--- a/meilisearch-lib/src/index_resolver/index_store.rs
+++ /dev/null
@@ -1,108 +0,0 @@
-use std::collections::HashMap;
-use std::convert::TryFrom;
-use std::path::{Path, PathBuf};
-use std::sync::Arc;
-
-use milli::update::IndexerConfig;
-use tokio::fs;
-use tokio::sync::RwLock;
-use tokio::task::spawn_blocking;
-use uuid::Uuid;
-
-use super::error::{IndexResolverError, Result};
-use crate::index::Index;
-use crate::options::IndexerOpts;
-
-type AsyncMap<K, V> = Arc<RwLock<HashMap<K, V>>>;
-
-#[async_trait::async_trait]
-#[cfg_attr(test, mockall::automock)]
-pub trait IndexStore {
-    async fn create(&self, uuid: Uuid) -> Result<Index>;
-    async fn get(&self, uuid: Uuid) -> Result<Option<Index>>;
-    async fn delete(&self, uuid: Uuid) -> Result<Option<Index>>;
-}
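
Because the trait carries `#[cfg_attr(test, mockall::automock)]`, tests get a generated `MockIndexStore` for free. A hedged usage sketch, assuming mockall's generated `expect_*` API for async trait methods:

    #[cfg(test)]
    async fn lookup_missing_index(uuid: Uuid) {
        let mut store = MockIndexStore::new();
        // Expect exactly one `get` call and make it report an unknown index.
        store.expect_get().times(1).returning(|_| Ok(None));
        assert!(store.get(uuid).await.unwrap().is_none());
    }
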
-
-pub struct MapIndexStore {
-    index_store: AsyncMap<Uuid, Index>,
-    path: PathBuf,
-    index_size: usize,
-    indexer_config: Arc<IndexerConfig>,
-}
-
-impl MapIndexStore {
-    pub fn new(
-        path: impl AsRef<Path>,
-        index_size: usize,
-        indexer_opts: &IndexerOpts,
-    ) -> anyhow::Result<Self> {
-        let indexer_config = Arc::new(IndexerConfig::try_from(indexer_opts)?);
-        let path = path.as_ref().join("indexes/");
-        let index_store = Arc::new(RwLock::new(HashMap::new()));
-        Ok(Self {
-            index_store,
-            path,
-            index_size,
-            indexer_config,
-        })
-    }
-}
-
-#[async_trait::async_trait]
-impl IndexStore for MapIndexStore {
-    async fn create(&self, uuid: Uuid) -> Result<Index> {
-        // We need to keep the lock until we are sure the db file has been opened correctly, to
-        // ensure that another db is not created at the same time.
-        let mut lock = self.index_store.write().await;
-
-        if let Some(index) = lock.get(&uuid) {
-            return Ok(index.clone());
-        }
-        let path = self.path.join(format!("{}", uuid));
-        if path.exists() {
-            return Err(IndexResolverError::UuidAlreadyExists(uuid));
-        }
-
-        let index_size = self.index_size;
-        let update_handler = self.indexer_config.clone();
-        let index = spawn_blocking(move || -> Result<Index> {
-            let index = Index::open(path, index_size, uuid, update_handler)?;
-            Ok(index)
-        })
-        .await??;
-
-        lock.insert(uuid, index.clone());
-
-        Ok(index)
-    }
-
-    async fn get(&self, uuid: Uuid) -> Result<Option<Index>> {
-        let guard = self.index_store.read().await;
-        match guard.get(&uuid) {
-            Some(index) => Ok(Some(index.clone())),
-            None => {
-                // Drop the guard here so we can perform the write afterwards without deadlocking.
-                drop(guard);
-                let path = self.path.join(format!("{}", uuid));
-                if !path.exists() {
-                    return Ok(None);
-                }
-
-                let index_size = self.index_size;
-                let update_handler = self.indexer_config.clone();
-                let index =
-                    spawn_blocking(move || Index::open(path, index_size, uuid, update_handler))
-                        .await??;
-                self.index_store.write().await.insert(uuid, index.clone());
-                Ok(Some(index))
-            }
-        }
-    }
-
-    async fn delete(&self, uuid: Uuid) -> Result<Option<Index>> {
-        let db_path = self.path.join(format!("{}", uuid));
-        fs::remove_dir_all(db_path).await?;
-        let index = self.index_store.write().await.remove(&uuid);
-        Ok(index)
-    }
-}
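
The `get` implementation above is careful to drop the read guard before requesting the write lock: holding both on the same tokio `RwLock` from a single task would deadlock, since the writer waits for all readers. The pattern in isolation (a simplified sketch with placeholder types):

    use std::collections::HashMap;
    use std::sync::Arc;
    use tokio::sync::RwLock;

    async fn get_or_open(map: &RwLock<HashMap<u32, Arc<str>>>, key: u32) -> Arc<str> {
        if let Some(value) = map.read().await.get(&key) {
            return value.clone();
        } // the read guard is dropped here, before the write lock is requested
        let value: Arc<str> = Arc::from(format!("opened-{key}"));
        map.write().await.insert(key, value.clone());
        value
    }
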
diff --git a/meilisearch-lib/src/lib.rs b/meilisearch-lib/src/lib.rs
deleted file mode 100644
index 3a16daeea..000000000
--- a/meilisearch-lib/src/lib.rs
+++ /dev/null
@@ -1,50 +0,0 @@
-#[macro_use]
-pub mod error;
-pub mod options;
-
-mod analytics;
-mod document_formats;
-// TODO: TAMO: reenable the dumps
-#[cfg(todo)]
-mod dump;
-mod index_controller;
-mod snapshot;
-
-use std::env::VarError;
-use std::ffi::OsStr;
-use std::path::Path;
-
-// TODO: TAMO: rename MeiliSearch to Meilisearch
-pub use index_controller::error::IndexControllerError;
-pub use index_controller::Meilisearch as MeiliSearch;
-pub use milli;
-pub use milli::heed;
-
-mod compression;
-
-/// Check if a db is empty. It does not provide any information on the
-/// validity of the data in it.
-/// We consider a database as non empty when it's a non empty directory.
-pub fn is_empty_db(db_path: impl AsRef<Path>) -> bool {
-    let db_path = db_path.as_ref();
-
-    if !db_path.exists() {
-        true
-    // if we encounter an error or if the db is a file we consider the db non empty
-    } else if let Ok(dir) = db_path.read_dir() {
-        dir.count() == 0
-    } else {
-        true
-    }
-}
-
-/// Checks if the key is defined in the environment variables.
-/// If not, inserts it with the given value.
-pub fn export_to_env_if_not_present<T>(key: &str, value: T)
-where
-    T: AsRef<OsStr>,
-{
-    if let Err(VarError::NotPresent) = std::env::var(key) {
-        std::env::set_var(key, value);
-    }
-}
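
A short usage sketch of `export_to_env_if_not_present`: values already present in the environment win over the defaults being exported.

    fn demo() {
        std::env::set_var("MEILI_MAX_INDEXING_THREADS", "8");
        export_to_env_if_not_present("MEILI_MAX_INDEXING_THREADS", "4"); // keeps "8"
        assert_eq!(std::env::var("MEILI_MAX_INDEXING_THREADS").unwrap(), "8");
        export_to_env_if_not_present("MEILI_MAX_INDEXING_MEMORY", "4 GiB"); // absent, so set
        assert_eq!(std::env::var("MEILI_MAX_INDEXING_MEMORY").unwrap(), "4 GiB");
    }
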
diff --git a/meilisearch-lib/src/options.rs b/meilisearch-lib/src/options.rs
deleted file mode 100644
index b84dd94a2..000000000
--- a/meilisearch-lib/src/options.rs
+++ /dev/null
@@ -1,205 +0,0 @@
-use crate::export_to_env_if_not_present;
-
-use core::fmt;
-use std::{convert::TryFrom, num::ParseIntError, ops::Deref, str::FromStr};
-
-use byte_unit::{Byte, ByteError};
-use clap::Parser;
-use milli::update::IndexerConfig;
-use serde::{Deserialize, Serialize};
-use sysinfo::{RefreshKind, System, SystemExt};
-
-const MEILI_MAX_INDEXING_MEMORY: &str = "MEILI_MAX_INDEXING_MEMORY";
-const MEILI_MAX_INDEXING_THREADS: &str = "MEILI_MAX_INDEXING_THREADS";
-const DISABLE_AUTO_BATCHING: &str = "DISABLE_AUTO_BATCHING";
-const DEFAULT_LOG_EVERY_N: usize = 100000;
-
-#[derive(Debug, Clone, Parser, Serialize, Deserialize)]
-#[serde(rename_all = "snake_case", deny_unknown_fields)]
-pub struct IndexerOpts {
-    /// Sets the number of documents to skip before printing
-    /// a log regarding the indexing advancement.
-    #[serde(skip_serializing, default = "default_log_every_n")]
-    #[clap(long, default_value_t = default_log_every_n(), hide = true)] // 100k
-    pub log_every_n: usize,
-
-    /// Grenad max number of chunks in bytes.
-    #[serde(skip_serializing)]
-    #[clap(long, hide = true)]
-    pub max_nb_chunks: Option<usize>,
-
-    /// Sets the maximum amount of RAM Meilisearch can use when indexing. By default, Meilisearch
-    /// uses no more than two thirds of available memory.
-    #[clap(long, env = MEILI_MAX_INDEXING_MEMORY, default_value_t)]
-    #[serde(default)]
-    pub max_indexing_memory: MaxMemory,
-
-    /// Sets the maximum number of threads Meilisearch can use during indexation. By default, the
-    /// indexer avoids using more than half of a machine's total processing units. This ensures
-    /// Meilisearch is always ready to perform searches, even while you are updating an index.
-    #[clap(long, env = MEILI_MAX_INDEXING_THREADS, default_value_t)]
-    #[serde(default)]
-    pub max_indexing_threads: MaxThreads,
-}
-
-#[derive(Debug, Clone, Parser, Default, Serialize, Deserialize)]
-#[serde(rename_all = "snake_case", deny_unknown_fields)]
-pub struct SchedulerConfig {
-    /// Deactivates auto-batching when provided.
-    #[clap(long, env = DISABLE_AUTO_BATCHING)]
-    #[serde(default)]
-    pub disable_auto_batching: bool,
-}
-
-impl IndexerOpts {
-    /// Exports the values to their corresponding env vars if they are not set.
-    pub fn export_to_env(self) {
-        let IndexerOpts {
-            max_indexing_memory,
-            max_indexing_threads,
-            log_every_n: _,
-            max_nb_chunks: _,
-        } = self;
-        if let Some(max_indexing_memory) = max_indexing_memory.0 {
-            export_to_env_if_not_present(
-                MEILI_MAX_INDEXING_MEMORY,
-                max_indexing_memory.to_string(),
-            );
-        }
-        export_to_env_if_not_present(
-            MEILI_MAX_INDEXING_THREADS,
-            max_indexing_threads.0.to_string(),
-        );
-    }
-}
-
-impl TryFrom<&IndexerOpts> for IndexerConfig {
-    type Error = anyhow::Error;
-
-    fn try_from(other: &IndexerOpts) -> Result<Self, Self::Error> {
-        let thread_pool = rayon::ThreadPoolBuilder::new()
-            .num_threads(*other.max_indexing_threads)
-            .build()?;
-
-        Ok(Self {
-            log_every_n: Some(other.log_every_n),
-            max_nb_chunks: other.max_nb_chunks,
-            max_memory: other.max_indexing_memory.map(|b| b.get_bytes() as usize),
-            thread_pool: Some(thread_pool),
-            max_positions_per_attributes: None,
-            ..Default::default()
-        })
-    }
-}
-
-impl Default for IndexerOpts {
-    fn default() -> Self {
-        Self {
-            log_every_n: 100_000,
-            max_nb_chunks: None,
-            max_indexing_memory: MaxMemory::default(),
-            max_indexing_threads: MaxThreads::default(),
-        }
-    }
-}
-
-impl SchedulerConfig {
-    pub fn export_to_env(self) {
-        let SchedulerConfig {
-            disable_auto_batching,
-        } = self;
-        export_to_env_if_not_present(DISABLE_AUTO_BATCHING, disable_auto_batching.to_string());
-    }
-}
-
-/// A type used to detect the max memory available and use 2/3 of it.
-#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
-pub struct MaxMemory(Option<Byte>);
-
-impl FromStr for MaxMemory {
-    type Err = ByteError;
-
-    fn from_str(s: &str) -> Result<MaxMemory, ByteError> {
-        Byte::from_str(s).map(Some).map(MaxMemory)
-    }
-}
-
-impl Default for MaxMemory {
-    fn default() -> MaxMemory {
-        MaxMemory(
-            total_memory_bytes()
-                .map(|bytes| bytes * 2 / 3)
-                .map(Byte::from_bytes),
-        )
-    }
-}
-
-impl fmt::Display for MaxMemory {
-    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
-        match self.0 {
-            Some(memory) => write!(f, "{}", memory.get_appropriate_unit(true)),
-            None => f.write_str("unknown"),
-        }
-    }
-}
-
-impl Deref for MaxMemory {
-    type Target = Option<Byte>;
-
-    fn deref(&self) -> &Self::Target {
-        &self.0
-    }
-}
-
-impl MaxMemory {
-    pub fn unlimited() -> Self {
-        Self(None)
-    }
-}
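
To make the behaviour concrete: `MaxMemory` parses human-readable sizes through byte_unit, and its `Default` claims two thirds of the detected RAM (or `None` when detection is unsupported). A hedged sketch:

    fn demo() -> Result<(), ByteError> {
        let limit: MaxMemory = "4 GiB".parse()?;
        assert_eq!((*limit).map(|b| b.get_bytes()), Some(4 * 1024 * 1024 * 1024));

        let unlimited = MaxMemory::unlimited();
        assert!((*unlimited).is_none());
        Ok(())
    }
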
-
-/// Returns the total amount of bytes available or `None` if this system isn't supported.
-fn total_memory_bytes() -> Option<u64> {
-    if System::IS_SUPPORTED {
-        let memory_kind = RefreshKind::new().with_memory();
-        let mut system = System::new_with_specifics(memory_kind);
-        system.refresh_memory();
-        Some(system.total_memory() * 1024) // KiB into bytes
-    } else {
-        None
-    }
-}
-
-#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
-pub struct MaxThreads(usize);
-
-impl FromStr for MaxThreads {
-    type Err = ParseIntError;
-
-    fn from_str(s: &str) -> Result<Self, Self::Err> {
-        usize::from_str(s).map(Self)
-    }
-}
-
-impl Default for MaxThreads {
-    fn default() -> Self {
-        MaxThreads(num_cpus::get() / 2)
-    }
-}
-
-impl fmt::Display for MaxThreads {
-    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
-        write!(f, "{}", self.0)
-    }
-}
-
-impl Deref for MaxThreads {
-    type Target = usize;
-
-    fn deref(&self) -> &Self::Target {
-        &self.0
-    }
-}
-
-fn default_log_every_n() -> usize {
-    DEFAULT_LOG_EVERY_N
-}
diff --git a/meilisearch-lib/src/snapshot.rs b/meilisearch-lib/src/snapshot.rs
deleted file mode 100644
index 5d68907c8..000000000
--- a/meilisearch-lib/src/snapshot.rs
+++ /dev/null
@@ -1,204 +0,0 @@
-use std::fs;
-use std::path::{Path, PathBuf};
-use std::sync::Arc;
-use std::time::Duration;
-
-use anyhow::bail;
-use fs_extra::dir::{self, CopyOptions};
-use log::{info, trace};
-use meilisearch_auth::open_auth_store_env;
-use milli::heed::CompactionOption;
-use tokio::sync::RwLock;
-use tokio::time::sleep;
-use walkdir::WalkDir;
-
-use crate::compression::from_tar_gz;
-use crate::index_controller::open_meta_env;
-use crate::index_controller::versioning::VERSION_FILE_NAME;
-use index_scheduler::IndexScheduler;
-
-pub struct SnapshotService {
-    pub(crate) db_path: PathBuf,
-    pub(crate) snapshot_period: Duration,
-    pub(crate) snapshot_path: PathBuf,
-    pub(crate) index_size: usize,
-    pub(crate) meta_env_size: usize,
-    pub(crate) scheduler: IndexScheduler,
-}
-
-impl SnapshotService {
-    pub async fn run(self) {
-        info!(
-            "Snapshot scheduled every {}s.",
-            self.snapshot_period.as_secs()
-        );
-        loop {
-            let snapshot_job = SnapshotJob {
-                dest_path: self.snapshot_path.clone(),
-                src_path: self.db_path.clone(),
-                meta_env_size: self.meta_env_size,
-                index_size: self.index_size,
-            };
-            // TODO: TAMO: reenable the snapshots
-            // self.scheduler.write().await.schedule_snapshot(snapshot_job);
-            sleep(self.snapshot_period).await;
-        }
-    }
-}
-
-pub fn load_snapshot(
-    db_path: impl AsRef<Path>,
-    snapshot_path: impl AsRef<Path>,
-    ignore_snapshot_if_db_exists: bool,
-    ignore_missing_snapshot: bool,
-) -> anyhow::Result<()> {
-    let empty_db = crate::is_empty_db(&db_path);
-    let snapshot_path_exists = snapshot_path.as_ref().exists();
-
-    if empty_db && snapshot_path_exists {
-        match from_tar_gz(snapshot_path, &db_path) {
-            Ok(()) => Ok(()),
-            Err(e) => {
-                // Clean up the db folder we just created.
-                std::fs::remove_dir_all(&db_path)?;
-                Err(e)
-            }
-        }
-    } else if !empty_db && !ignore_snapshot_if_db_exists {
-        bail!(
-            "database already exists at {:?}, try to delete it or rename it",
-            db_path
-                .as_ref()
-                .canonicalize()
-                .unwrap_or_else(|_| db_path.as_ref().to_owned())
-        )
-    } else if !snapshot_path_exists && !ignore_missing_snapshot {
-        bail!("snapshot doesn't exist at {:?}", snapshot_path.as_ref())
-    } else {
-        Ok(())
-    }
-}
-
-#[derive(Debug)]
-pub struct SnapshotJob {
-    dest_path: PathBuf,
-    src_path: PathBuf,
-
-    meta_env_size: usize,
-    index_size: usize,
-}
-
-impl SnapshotJob {
-    pub async fn run(self) -> anyhow::Result<()> {
-        tokio::task::spawn_blocking(|| self.run_sync()).await??;
-
-        Ok(())
-    }
-
-    fn run_sync(self) -> anyhow::Result<()> {
-        trace!("Performing snapshot.");
-
-        let snapshot_dir = self.dest_path.clone();
-        std::fs::create_dir_all(&snapshot_dir)?;
-        let temp_snapshot_dir = tempfile::tempdir()?;
-        let temp_snapshot_path = temp_snapshot_dir.path();
-
-        self.snapshot_version_file(temp_snapshot_path)?;
-        self.snapshot_meta_env(temp_snapshot_path)?;
-        self.snapshot_file_store(temp_snapshot_path)?;
-        self.snapshot_indexes(temp_snapshot_path)?;
-        self.snapshot_auth(temp_snapshot_path)?;
-
-        let db_name = self
-            .src_path
-            .file_name()
-            .and_then(|n| n.to_str())
-            .unwrap_or("data.ms")
-            .to_string();
-
-        let snapshot_path = self.dest_path.join(format!("{}.snapshot", db_name));
-        let temp_snapshot_file = tempfile::NamedTempFile::new_in(&snapshot_dir)?;
-        let temp_snapshot_file_path = temp_snapshot_file.path().to_owned();
-        crate::compression::to_tar_gz(temp_snapshot_path, temp_snapshot_file_path)?;
-        let _file = temp_snapshot_file.persist(&snapshot_path)?;
-
-        #[cfg(unix)]
-        {
-            use std::fs::Permissions;
-            use std::os::unix::fs::PermissionsExt;
-
-            let perm = Permissions::from_mode(0o644);
-            _file.set_permissions(perm)?;
-        }
-
-        trace!("Created snapshot in {:?}.", snapshot_path);
-
-        Ok(())
-    }
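
Note the publish step in `run_sync`: the archive is written to a `NamedTempFile` created in the destination directory and then `persist`ed, which is a rename on the same filesystem, so observers of the snapshot directory never see a half-written `.snapshot` file. The pattern in isolation (a simplified sketch):

    use std::io::Write;
    use std::path::Path;

    fn publish_atomically(dir: &Path, name: &str, bytes: &[u8]) -> anyhow::Result<()> {
        let mut tmp = tempfile::NamedTempFile::new_in(dir)?; // same filesystem as `dir`
        tmp.write_all(bytes)?;
        tmp.persist(dir.join(name))?; // atomic rename: readers see the old or the new file
        Ok(())
    }
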
-
-    fn snapshot_version_file(&self, path: &Path) -> anyhow::Result<()> {
-        let dst = path.join(VERSION_FILE_NAME);
-        let src = self.src_path.join(VERSION_FILE_NAME);
-
-        fs::copy(src, dst)?;
-
-        Ok(())
-    }
-
-    fn snapshot_meta_env(&self, path: &Path) -> anyhow::Result<()> {
-        let env = open_meta_env(&self.src_path, self.meta_env_size)?;
-
-        let dst = path.join("data.mdb");
-        env.copy_to_path(dst, milli::heed::CompactionOption::Enabled)?;
-
-        Ok(())
-    }
-
-    fn snapshot_file_store(&self, path: &Path) -> anyhow::Result<()> {
-        // For now we simply copy the updates/updates_files.
-        // FIXME(marin): We may copy more files than necessary, if new files are added while we are
-        // performing the snapshot. We need a way to filter them out.
-
-        let dst = path.join("updates");
-        fs::create_dir_all(&dst)?;
-        let options = CopyOptions::default();
-        dir::copy(self.src_path.join("updates/updates_files"), dst, &options)?;
-
-        Ok(())
-    }
-
-    fn snapshot_indexes(&self, path: &Path) -> anyhow::Result<()> {
-        let indexes_path = self.src_path.join("indexes/");
-        let dst = path.join("indexes/");
-
-        for entry in WalkDir::new(indexes_path).max_depth(1).into_iter().skip(1) {
-            let entry = entry?;
-            let name = entry.file_name();
-            let dst = dst.join(name);
-
-            std::fs::create_dir_all(&dst)?;
-
-            let dst = dst.join("data.mdb");
-
-            let mut options = milli::heed::EnvOpenOptions::new();
-            options.map_size(self.index_size);
-            options.max_readers(1024);
-            let index = milli::Index::new(options, entry.path())?;
-            index.copy_to_path(dst, CompactionOption::Enabled)?;
-        }
-
-        Ok(())
-    }
-
-    fn snapshot_auth(&self, path: &Path) -> anyhow::Result<()> {
-        let auth_path = self.src_path.join("auth");
-        let dst = path.join("auth");
-        std::fs::create_dir_all(&dst)?;
-        let dst = dst.join("data.mdb");
-
-        let env = open_auth_store_env(&auth_path)?;
-        env.copy_to_path(dst, milli::heed::CompactionOption::Enabled)?;
-
-        Ok(())
-    }
-}
diff --git a/meilisearch-lib/src/tasks/task_store/mod.rs b/meilisearch-lib/src/tasks/task_store/mod.rs
deleted file mode 100644
index 55dfe17d3..000000000
--- a/meilisearch-lib/src/tasks/task_store/mod.rs
+++ /dev/null
@@ -1,420 +0,0 @@
-mod store;
-
-use std::collections::HashSet;
-use std::io::{BufWriter, Write};
-use std::path::Path;
-use std::sync::Arc;
-
-use log::debug;
-use milli::heed::{Env, RwTxn};
-use time::OffsetDateTime;
-
-use super::batch::BatchContent;
-use super::error::TaskError;
-use super::scheduler::Processing;
-use super::task::{Task, TaskContent, TaskId};
-use super::Result;
-use crate::tasks::task::TaskEvent;
-use crate::update_file_store::UpdateFileStore;
-
-#[cfg(test)]
-pub use store::test::MockStore as Store;
-#[cfg(not(test))]
-pub use store::Store;
-
-type FilterFn = Box<dyn Fn(&Task) -> bool + Sync + Send + 'static>;
-
-/// Defines constraints to be applied when querying for Tasks from the store.
-#[derive(Default)]
-pub struct TaskFilter {
-    indexes: Option<HashSet<String>>,
-    filter_fn: Option<FilterFn>,
-}
-
-impl TaskFilter {
-    fn pass(&self, task: &Task) -> bool {
-        match task.index_uid() {
-            Some(index_uid) => self
-                .indexes
-                .as_ref()
-                .map_or(true, |indexes| indexes.contains(index_uid)),
-            None => false,
-        }
-    }
-
-    fn filtered_indexes(&self) -> Option<&HashSet<String>> {
-        self.indexes.as_ref()
-    }
-
-    /// Adds an index to the filter, so the filter must match this index.
-    pub fn filter_index(&mut self, index: String) {
-        self.indexes
-            .get_or_insert_with(Default::default)
-            .insert(index);
-    }
-
-    pub fn filter_fn(&mut self, f: FilterFn) {
-        self.filter_fn.replace(f);
-    }
-}
-
-pub struct TaskStore {
-    store: Arc<Store>,
-}
-
-impl Clone for TaskStore {
-    fn clone(&self) -> Self {
-        Self {
-            store: self.store.clone(),
-        }
-    }
-}
-
-impl TaskStore {
-    pub fn new(env: Arc<Env>) -> Result<Self> {
-        let store = Arc::new(Store::new(env)?);
-        Ok(Self { store })
-    }
-
-    pub async fn register(&self, content: TaskContent) -> Result<Task> {
-        debug!("registering update: {:?}", content);
-        let store = self.store.clone();
-        let task = tokio::task::spawn_blocking(move || -> Result<Task> {
-            let mut txn = store.wtxn()?;
-            let next_task_id = store.next_task_id(&mut txn)?;
-            let created_at = TaskEvent::Created(OffsetDateTime::now_utc());
-            let task = Task {
-                id: next_task_id,
-                content,
-                events: vec![created_at],
-            };
-
-            store.put(&mut txn, &task)?;
-            txn.commit()?;
-
-            Ok(task)
-        })
-        .await??;
-
-        Ok(task)
-    }
-
-    pub fn register_raw_update(&self, wtxn: &mut RwTxn, task: &Task) -> Result<()> {
-        self.store.put(wtxn, task)?;
-        Ok(())
-    }
-
-    pub async fn get_task(&self, id: TaskId, filter: Option<TaskFilter>) -> Result<Task> {
-        let store = self.store.clone();
-        let task = tokio::task::spawn_blocking(move || -> Result<_> {
-            let txn = store.rtxn()?;
-            let task = store.get(&txn, id)?;
-            Ok(task)
-        })
-        .await??
-        .ok_or(TaskError::UnexistingTask(id))?;
-
-        match filter {
-            Some(filter) => filter
-                .pass(&task)
-                .then_some(task)
-                .ok_or(TaskError::UnexistingTask(id)),
-            None => Ok(task),
-        }
-    }
-
-    /// This method takes a `Processing` which contains the next task ids to process, and returns
-    /// the corresponding tasks along with the ownership to the passed processing.
-    ///
-    /// We need `get_processing_tasks` to take ownership over `Processing` because we need it to be
-    /// valid for `'static`.
-    pub async fn get_processing_tasks(
-        &self,
-        processing: Processing,
-    ) -> Result<(Processing, BatchContent)> {
-        let store = self.store.clone();
-        let tasks = tokio::task::spawn_blocking(move || -> Result<_> {
-            let txn = store.rtxn()?;
-
-            let content = match processing {
-                Processing::DocumentAdditions(ref ids) => {
-                    let mut tasks = Vec::new();
-
-                    for id in ids.iter() {
-                        let task = store
-                            .get(&txn, *id)?
-                            .ok_or(TaskError::UnexistingTask(*id))?;
-                        tasks.push(task);
-                    }
-                    BatchContent::DocumentsAdditionBatch(tasks)
-                }
-                Processing::IndexUpdate(id) => {
-                    let task = store.get(&txn, id)?.ok_or(TaskError::UnexistingTask(id))?;
-                    BatchContent::IndexUpdate(task)
-                }
-                Processing::Dump(id) => {
-                    let task = store.get(&txn, id)?.ok_or(TaskError::UnexistingTask(id))?;
-                    debug_assert!(matches!(task.content, TaskContent::Dump { .. }));
-                    BatchContent::Dump(task)
-                }
-                Processing::Nothing => BatchContent::Empty,
-            };
-
-            Ok((processing, content))
-        })
-        .await??;
-
-        Ok(tasks)
-    }
-
-    pub async fn update_tasks(&self, tasks: Vec<Task>) -> Result<Vec<Task>> {
-        let store = self.store.clone();
-
-        let tasks = tokio::task::spawn_blocking(move || -> Result<_> {
-            let mut txn = store.wtxn()?;
-
-            for task in &tasks {
-                store.put(&mut txn, task)?;
-            }
-
-            txn.commit()?;
-
-            Ok(tasks)
-        })
-        .await??;
-
-        Ok(tasks)
-    }
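
All of these methods share one bridging pattern: clone the `Arc`'d store into a `tokio::task::spawn_blocking` closure so the synchronous heed transaction never blocks the async reactor, then flatten the join error and the store error with `.await??`. Factored out, the shape is roughly this (a sketch using anyhow for brevity):

    use std::sync::Arc;

    async fn with_store<T, F>(store: Arc<Store>, f: F) -> anyhow::Result<T>
    where
        T: Send + 'static,
        F: FnOnce(&Store) -> anyhow::Result<T> + Send + 'static,
    {
        // `?` converts the JoinError; the closure's own Result is returned as-is.
        tokio::task::spawn_blocking(move || f(&store)).await?
    }
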
-
-    pub async fn fetch_unfinished_tasks(&self, offset: Option<TaskId>) -> Result<Vec<Task>> {
-        let store = self.store.clone();
-
-        tokio::task::spawn_blocking(move || {
-            let txn = store.rtxn()?;
-            let tasks = store.fetch_unfinished_tasks(&txn, offset)?;
-            Ok(tasks)
-        })
-        .await?
-    }
-
-    pub async fn list_tasks(
-        &self,
-        offset: Option<TaskId>,
-        filter: Option<TaskFilter>,
-        limit: Option<usize>,
-    ) -> Result<Vec<Task>> {
-        let store = self.store.clone();
-
-        tokio::task::spawn_blocking(move || {
-            let txn = store.rtxn()?;
-            let tasks = store.list_tasks(&txn, offset, filter, limit)?;
-            Ok(tasks)
-        })
-        .await?
-    }
-
-    pub async fn dump(
-        env: Arc<Env>,
-        dir_path: impl AsRef<Path>,
-        update_file_store: UpdateFileStore,
-    ) -> Result<()> {
-        let store = Self::new(env)?;
-        let update_dir = dir_path.as_ref().join("updates");
-        let updates_file = update_dir.join("data.jsonl");
-        let tasks = store.list_tasks(None, None, None).await?;
-
-        let dir_path = dir_path.as_ref().to_path_buf();
-        tokio::task::spawn_blocking(move || -> Result<()> {
-            std::fs::create_dir(&update_dir)?;
-            let updates_file = std::fs::File::create(updates_file)?;
-            let mut updates_file = BufWriter::new(updates_file);
-
-            for task in tasks {
-                serde_json::to_writer(&mut updates_file, &task)?;
-                updates_file.write_all(b"\n")?;
-
-                if !task.is_finished() {
-                    if let Some(content_uuid) = task.get_content_uuid() {
-                        update_file_store.dump(content_uuid, &dir_path)?;
-                    }
-                }
-            }
-            updates_file.flush()?;
-            Ok(())
-        })
-        .await??;
-
-        Ok(())
-    }
-
-    pub fn load_dump(src: impl AsRef<Path>, env: Arc<Env>) -> anyhow::Result<()> {
-        // Create a dummy update file store, since it is not needed right now.
-        let store = Self::new(env.clone())?;
-
-        let src_update_path = src.as_ref().join("updates");
-        let update_data = std::fs::File::open(&src_update_path.join("data.jsonl"))?;
-        let update_data = std::io::BufReader::new(update_data);
-
-        let stream = serde_json::Deserializer::from_reader(update_data).into_iter::<Task>();
-
-        let mut wtxn = env.write_txn()?;
-        for entry in stream {
-            store.register_raw_update(&mut wtxn, &entry?)?;
-        }
-        wtxn.commit()?;
-
-        Ok(())
-    }
-}
-
-#[cfg(test)]
-pub mod test {
-    use crate::tasks::{scheduler::Processing, task_store::store::test::tmp_env};
-
-    use super::*;
-
-    use meilisearch_types::index_uid::IndexUid;
-    use nelson::Mocker;
-    use proptest::{
-        strategy::Strategy,
-        test_runner::{Config, TestRunner},
-    };
-
-    pub enum MockTaskStore {
-        Real(TaskStore),
-        Mock(Arc<Mocker>),
-    }
-
-    impl Clone for MockTaskStore {
-        fn clone(&self) -> Self {
-            match self {
-                Self::Real(x) => Self::Real(x.clone()),
-                Self::Mock(x) => Self::Mock(x.clone()),
-            }
-        }
-    }
-
-    impl MockTaskStore {
-        pub fn new(env: Arc<Env>) -> Result<Self> {
-            Ok(Self::Real(TaskStore::new(env)?))
-        }
-
-        pub async fn dump(
-            env: Arc<Env>,
-            path: impl AsRef<Path>,
-            update_file_store: UpdateFileStore,
-        ) -> Result<()> {
-            TaskStore::dump(env, path, update_file_store).await
-        }
-
-        pub fn mock(mocker: Mocker) -> Self {
-            Self::Mock(Arc::new(mocker))
-        }
-
-        pub async fn update_tasks(&self, tasks: Vec<Task>) -> Result<Vec<Task>> {
-            match self {
-                Self::Real(s) => s.update_tasks(tasks).await,
-                Self::Mock(m) => unsafe {
-                    m.get::<_, Result<Vec<Task>>>("update_tasks").call(tasks)
-                },
-            }
-        }
-
-        pub async fn get_task(&self, id: TaskId, filter: Option<TaskFilter>) -> Result<Task> {
-            match self {
-                Self::Real(s) => s.get_task(id, filter).await,
-                Self::Mock(m) => unsafe { m.get::<_, Result<Task>>("get_task").call((id, filter)) },
-            }
-        }
-
-        pub async fn get_processing_tasks(
-            &self,
-            tasks: Processing,
-        ) -> Result<(Processing, BatchContent)> {
-            match self {
-                Self::Real(s) => s.get_processing_tasks(tasks).await,
-                Self::Mock(m) => unsafe { m.get("get_pending_task").call(tasks) },
-            }
-        }
-
-        pub async fn fetch_unfinished_tasks(&self, from: Option<TaskId>) -> Result<Vec<Task>> {
-            match self {
-                Self::Real(s) => s.fetch_unfinished_tasks(from).await,
-                Self::Mock(m) => unsafe { m.get("fetch_unfinished_tasks").call(from) },
-            }
-        }
unsafe { m.get("fetch_unfinished_tasks").call(from) }, - } - } - - pub async fn list_tasks( - &self, - from: Option, - filter: Option, - limit: Option, - ) -> Result> { - match self { - Self::Real(s) => s.list_tasks(from, filter, limit).await, - Self::Mock(m) => unsafe { m.get("list_tasks").call((from, filter, limit)) }, - } - } - - pub async fn register(&self, content: TaskContent) -> Result { - match self { - Self::Real(s) => s.register(content).await, - Self::Mock(_m) => todo!(), - } - } - - pub fn register_raw_update(&self, wtxn: &mut RwTxn, task: &Task) -> Result<()> { - match self { - Self::Real(s) => s.register_raw_update(wtxn, task), - Self::Mock(_m) => todo!(), - } - } - - pub fn load_dump(path: impl AsRef, env: Arc) -> anyhow::Result<()> { - TaskStore::load_dump(path, env) - } - } - - #[test] - fn test_increment_task_id() { - let tmp = tmp_env(); - let store = Store::new(tmp.env()).unwrap(); - - let mut txn = store.wtxn().unwrap(); - assert_eq!(store.next_task_id(&mut txn).unwrap(), 0); - txn.abort().unwrap(); - - let gen_task = |id: TaskId| Task { - id, - content: TaskContent::IndexCreation { - primary_key: None, - index_uid: IndexUid::new_unchecked("test"), - }, - events: Vec::new(), - }; - - let mut runner = TestRunner::new(Config::default()); - runner - .run(&(0..100u32).prop_map(gen_task), |task| { - let mut txn = store.wtxn().unwrap(); - let previous_id = store.next_task_id(&mut txn).unwrap(); - - store.put(&mut txn, &task).unwrap(); - - let next_id = store.next_task_id(&mut txn).unwrap(); - - // if we put a task whose task_id is less than the next_id, then the next_id remains - // unchanged, otherwise it becomes task.id + 1 - if task.id < previous_id { - assert_eq!(next_id, previous_id) - } else { - assert_eq!(next_id, task.id + 1); - } - - txn.commit().unwrap(); - - Ok(()) - }) - .unwrap(); - } -} diff --git a/meilisearch-lib/src/tasks/task_store/store.rs b/meilisearch-lib/src/tasks/task_store/store.rs deleted file mode 100644 index 32b20aeb8..000000000 --- a/meilisearch-lib/src/tasks/task_store/store.rs +++ /dev/null @@ -1,377 +0,0 @@ -#[allow(clippy::upper_case_acronyms)] - -type BEU32 = milli::heed::zerocopy::U32; - -const INDEX_UIDS_TASK_IDS: &str = "index-uids-task-ids"; -const TASKS: &str = "tasks"; - -use std::collections::HashSet; -use std::ops::Bound::{Excluded, Unbounded}; -use std::result::Result as StdResult; -use std::sync::Arc; - -use milli::heed::types::{OwnedType, SerdeJson, Str}; -use milli::heed::{Database, Env, RoTxn, RwTxn}; -use milli::heed_codec::RoaringBitmapCodec; -use roaring::RoaringBitmap; - -use crate::tasks::task::{Task, TaskId}; - -use super::super::Result; -use super::TaskFilter; - -pub struct Store { - env: Arc, - /// Maps an index uid to the set of tasks ids associated to it. - index_uid_task_ids: Database, - tasks: Database, SerdeJson>, -} - -impl Drop for Store { - fn drop(&mut self) { - if Arc::strong_count(&self.env) == 1 { - self.env.as_ref().clone().prepare_for_closing(); - } - } -} - -impl Store { - /// Create a new store from the specified `Path`. - /// Be really cautious when calling this function, the returned `Store` may - /// be in an invalid state, with dangling processing tasks. - /// You want to patch all un-finished tasks and put them in your pending - /// queue with the `reset_and_return_unfinished_update` method. 
-
-impl Store {
-    /// Create a new store from the specified `Env`.
-    /// Be really cautious when calling this function, the returned `Store` may
-    /// be in an invalid state, with dangling processing tasks.
-    /// You want to patch all unfinished tasks and put them in your pending
-    /// queue with the `reset_and_return_unfinished_update` method.
-    pub fn new(env: Arc<Env>) -> Result<Self> {
-        let index_uid_task_ids = env.create_database(Some(INDEX_UIDS_TASK_IDS))?;
-        let tasks = env.create_database(Some(TASKS))?;
-
-        Ok(Self {
-            env,
-            index_uid_task_ids,
-            tasks,
-        })
-    }
-
-    pub fn wtxn(&self) -> Result<RwTxn> {
-        Ok(self.env.write_txn()?)
-    }
-
-    pub fn rtxn(&self) -> Result<RoTxn> {
-        Ok(self.env.read_txn()?)
-    }
-
-    /// Returns the id for the next task.
-    ///
-    /// The required `mut txn` acts as a reservation system. It guarantees that as long as you commit
-    /// the task to the store in the same transaction, no one else will have this task id.
-    pub fn next_task_id(&self, txn: &mut RwTxn) -> Result<TaskId> {
-        let id = self
-            .tasks
-            .lazily_decode_data()
-            .last(txn)?
-            .map(|(id, _)| id.get() + 1)
-            .unwrap_or(0);
-        Ok(id)
-    }
-
-    pub fn put(&self, txn: &mut RwTxn, task: &Task) -> Result<()> {
-        self.tasks.put(txn, &BEU32::new(task.id), task)?;
-        // Only add the task to the index-uids mapping if it has an index_uid.
-        if let Some(index_uid) = task.index_uid() {
-            let mut tasks_set = self
-                .index_uid_task_ids
-                .get(txn, index_uid)?
-                .unwrap_or_default();
-
-            tasks_set.insert(task.id);
-
-            self.index_uid_task_ids.put(txn, index_uid, &tasks_set)?;
-        }
-
-        Ok(())
-    }
-
-    pub fn get(&self, txn: &RoTxn, id: TaskId) -> Result<Option<Task>> {
-        let task = self.tasks.get(txn, &BEU32::new(id))?;
-        Ok(task)
-    }
-
-    /// Returns the unfinished tasks starting from the given task id in ascending order.
-    pub fn fetch_unfinished_tasks(&self, txn: &RoTxn, from: Option<TaskId>) -> Result<Vec<Task>> {
-        // We must NEVER re-enqueue an already processed task! Its content uuid would point to a
-        // nonexistent file.
-        //
-        // TODO(marin): This may create some latency when the first batch lazy loads the pending updates.
-        let from = from.unwrap_or_default();
-
-        let result: StdResult<Vec<Task>, milli::heed::Error> = self
-            .tasks
-            .range(txn, &(BEU32::new(from)..))?
-            .map(|r| r.map(|(_, t)| t))
-            .filter(|result| result.as_ref().map_or(true, |t| !t.is_finished()))
-            .collect();
-
-        result.map_err(Into::into)
-    }
-
-    /// Returns all the tasks starting from the given task id and going in descending order.
-    pub fn list_tasks(
-        &self,
-        txn: &RoTxn,
-        from: Option<TaskId>,
-        filter: Option<TaskFilter>,
-        limit: Option<usize>,
-    ) -> Result<Vec<Task>> {
-        let from = match from {
-            Some(from) => from,
-            None => self.tasks.last(txn)?.map_or(0, |(id, _)| id.get()),
-        };
-
-        let filter_fn = |task: &Task| {
-            filter
-                .as_ref()
-                .and_then(|f| f.filter_fn.as_ref())
-                .map_or(true, |f| f(task))
-        };
-
-        let result: Result<Vec<Task>> = match filter.as_ref().and_then(|f| f.filtered_indexes()) {
-            Some(indexes) => self
-                .compute_candidates(txn, indexes, from)?
-                .filter(|result| result.as_ref().map_or(true, filter_fn))
-                .take(limit.unwrap_or(usize::MAX))
-                .collect(),
-            None => self
-                .tasks
-                .rev_range(txn, &(..=BEU32::new(from)))?
-                .map(|r| r.map(|(_, t)| t).map_err(Into::into))
-                .filter(|result| result.as_ref().map_or(true, filter_fn))
-                .take(limit.unwrap_or(usize::MAX))
-                .collect(),
-        };
-
-        result.map_err(Into::into)
-    }
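
The index-filtered path of `list_tasks` delegates to `compute_candidates` below, which unions the per-index RoaringBitmaps and then truncates everything above `from`. In isolation, the arithmetic looks like this (a small sketch):

    fn demo() {
        use std::ops::Bound::{Excluded, Unbounded};
        use roaring::RoaringBitmap;

        let movies: RoaringBitmap = [1u32, 5, 9].into_iter().collect();
        let books: RoaringBitmap = [2u32, 9, 12].into_iter().collect();

        let mut candidates = movies | books; // tasks touching either index
        candidates.remove_range((Excluded(5), Unbounded)); // keep ids <= from == 5
        assert_eq!(candidates.iter().collect::<Vec<_>>(), vec![1, 2, 5]);
    }
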
-
-    fn compute_candidates<'a>(
-        &'a self,
-        txn: &'a RoTxn,
-        indexes: &HashSet<String>,
-        from: TaskId,
-    ) -> Result<impl Iterator<Item = Result<Task>> + 'a> {
-        let mut candidates = RoaringBitmap::new();
-
-        for index_uid in indexes {
-            if let Some(tasks_set) = self.index_uid_task_ids.get(txn, index_uid)? {
-                candidates |= tasks_set;
-            }
-        }
-
-        candidates.remove_range((Excluded(from), Unbounded));
-
-        let iter = candidates
-            .into_iter()
-            .rev()
-            .filter_map(|id| self.get(txn, id).transpose());
-
-        Ok(iter)
-    }
-}
-
-#[cfg(test)]
-pub mod test {
-    use itertools::Itertools;
-    use meilisearch_types::index_uid::IndexUid;
-    use milli::heed::EnvOpenOptions;
-    use nelson::Mocker;
-    use tempfile::TempDir;
-
-    use crate::tasks::task::TaskContent;
-
-    use super::*;
-
-    /// TODO: use this mock to test the task store properly.
-    #[allow(dead_code)]
-    pub enum MockStore {
-        Real(Store),
-        Fake(Mocker),
-    }
-
-    pub struct TmpEnv(TempDir, Arc<Env>);
-
-    impl TmpEnv {
-        pub fn env(&self) -> Arc<Env> {
-            self.1.clone()
-        }
-    }
-
-    pub fn tmp_env() -> TmpEnv {
-        let tmp = tempfile::tempdir().unwrap();
-
-        let mut options = EnvOpenOptions::new();
-        options.map_size(4096 * 100000);
-        options.max_dbs(1000);
-        let env = Arc::new(options.open(tmp.path()).unwrap());
-
-        TmpEnv(tmp, env)
-    }
-
-    impl MockStore {
-        pub fn new(env: Arc<Env>) -> Result<Self> {
-            Ok(Self::Real(Store::new(env)?))
-        }
-
-        pub fn wtxn(&self) -> Result<RwTxn> {
-            match self {
-                MockStore::Real(index) => index.wtxn(),
-                MockStore::Fake(_) => todo!(),
-            }
-        }
-
-        pub fn rtxn(&self) -> Result<RoTxn> {
-            match self {
-                MockStore::Real(index) => index.rtxn(),
-                MockStore::Fake(_) => todo!(),
-            }
-        }
-
-        pub fn next_task_id(&self, txn: &mut RwTxn) -> Result<TaskId> {
-            match self {
-                MockStore::Real(index) => index.next_task_id(txn),
-                MockStore::Fake(_) => todo!(),
-            }
-        }
-
-        pub fn put(&self, txn: &mut RwTxn, task: &Task) -> Result<()> {
-            match self {
-                MockStore::Real(index) => index.put(txn, task),
-                MockStore::Fake(_) => todo!(),
-            }
-        }
-
-        pub fn get(&self, txn: &RoTxn, id: TaskId) -> Result<Option<Task>> {
-            match self {
-                MockStore::Real(index) => index.get(txn, id),
-                MockStore::Fake(_) => todo!(),
-            }
-        }
-
-        pub fn fetch_unfinished_tasks(
-            &self,
-            txn: &RoTxn,
-            from: Option<TaskId>,
-        ) -> Result<Vec<Task>> {
-            match self {
-                MockStore::Real(index) => index.fetch_unfinished_tasks(txn, from),
-                MockStore::Fake(_) => todo!(),
-            }
-        }
-
-        pub fn list_tasks(
-            &self,
-            txn: &RoTxn,
-            from: Option<TaskId>,
-            filter: Option<TaskFilter>,
-            limit: Option<usize>,
-        ) -> Result<Vec<Task>> {
-            match self {
-                MockStore::Real(index) => index.list_tasks(txn, from, filter, limit),
-                MockStore::Fake(_) => todo!(),
-            }
-        }
-    }
-
-    #[test]
-    fn test_ordered_filtered_updates() {
-        let tmp = tmp_env();
-        let store = Store::new(tmp.env()).unwrap();
-
-        let tasks = (0..100)
-            .map(|_| Task {
-                id: rand::random(),
-                content: TaskContent::IndexDeletion {
-                    index_uid: IndexUid::new_unchecked("test"),
-                },
-                events: vec![],
-            })
-            .collect::<Vec<_>>();
-
-        let mut txn = store.env.write_txn().unwrap();
-        tasks
-            .iter()
-            .try_for_each(|t| store.put(&mut txn, t))
-            .unwrap();
-
-        let mut filter = TaskFilter::default();
-        filter.filter_index("test".into());
-
-        let tasks = store.list_tasks(&txn, None, Some(filter), None).unwrap();
-
-        assert!(tasks
-            .iter()
-            .map(|t| t.id)
-            .tuple_windows()
-            .all(|(a, b)| a > b));
-    }
-
-    #[test]
-    fn test_filter_same_index_prefix() {
-        let tmp = tmp_env();
-        let store = Store::new(tmp.env()).unwrap();
-
-        let task_1 = Task {
-            id: 1,
-            content: TaskContent::IndexDeletion {
-                index_uid: IndexUid::new_unchecked("test"),
-            },
-            events: vec![],
-        };
-
-        let task_2 = Task {
-            id: 0,
-            content: TaskContent::IndexDeletion {
-                index_uid: IndexUid::new_unchecked("test1"),
-            },
-            events: vec![],
-        };
-
-        let mut txn = store.wtxn().unwrap();
-        store.put(&mut txn, &task_1).unwrap();
-        store.put(&mut txn, &task_2).unwrap();
-
-        let mut filter = TaskFilter::default();
-        filter.filter_index("test".into());
-
-        let tasks = store.list_tasks(&txn, None, Some(filter), None).unwrap();
-
-        txn.abort().unwrap();
-        assert_eq!(tasks.len(), 1);
-        assert_eq!(tasks.first().as_ref().unwrap().index_uid().unwrap(), "test");
-
-        // same thing but invert the ids
-        let task_1 = Task {
-            id: 0,
-            content: TaskContent::IndexDeletion {
-                index_uid: IndexUid::new_unchecked("test"),
-            },
-            events: vec![],
-        };
-        let task_2 = Task {
-            id: 1,
-            content: TaskContent::IndexDeletion {
-                index_uid: IndexUid::new_unchecked("test1"),
-            },
-            events: vec![],
-        };
-
-        let mut txn = store.wtxn().unwrap();
-        store.put(&mut txn, &task_1).unwrap();
-        store.put(&mut txn, &task_2).unwrap();
-
-        let mut filter = TaskFilter::default();
-        filter.filter_index("test".into());
-
-        let tasks = store.list_tasks(&txn, None, Some(filter), None).unwrap();
-
-        assert_eq!(tasks.len(), 1);
-        assert_eq!(tasks.first().as_ref().unwrap().index_uid().unwrap(), "test");
-    }
-}
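
The prefix test above pins down a subtle property: `TaskFilter` matches index uids by exact `HashSet` lookup, so filtering on "test" can never leak tasks from "test1", which a naive key-prefix scan over the `index-uids-task-ids` database would do. In miniature:

    fn demo() {
        use std::collections::HashSet;

        let filtered: HashSet<String> = ["test".to_string()].into_iter().collect();
        assert!(filtered.contains("test"));
        assert!(!filtered.contains("test1")); // a raw prefix scan would match this one
    }
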