From ddd778971351d8eb5b937813ea5b375b1af9b68c Mon Sep 17 00:00:00 2001
From: mpostma
Date: Wed, 13 Jan 2021 17:50:36 +0100
Subject: [PATCH] WIP: IndexController

---
 Cargo.lock                  |  28 +++---
 Cargo.toml                  |   4 +-
 src/data/mod.rs             |  26 +++--
 src/data/search.rs          |  36 +++----
 src/index_controller/mod.rs | 187 ++++++++++++++++++++++++++++++++++++
 src/lib.rs                  |   4 +-
 6 files changed, 230 insertions(+), 55 deletions(-)
 create mode 100644 src/index_controller/mod.rs

diff --git a/Cargo.lock b/Cargo.lock
index f717be694..4415db555 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -777,6 +777,16 @@ dependencies = [
  "memchr",
 ]
 
+[[package]]
+name = "dashmap"
+version = "4.0.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e77a43b28d0668df09411cb0bc9a8c2adc40f9a048afe863e05fd43251e8e39c"
+dependencies = [
+ "cfg-if 1.0.0",
+ "num_cpus",
+]
+
 [[package]]
 name = "debugid"
 version = "0.7.2"
@@ -1201,7 +1211,6 @@ dependencies = [
  "lmdb-rkv-sys",
  "once_cell",
  "page_size",
- "serde",
  "synchronoise",
  "url",
  "zerocopy",
@@ -1588,7 +1597,7 @@ checksum = "7ffc5c5338469d4d3ea17d269fa8ea3512ad247247c30bd2df69e68309ed0a08"
 
 [[package]]
 name = "meilisearch-error"
-version = "0.15.0"
+version = "0.18.0"
 dependencies = [
  "actix-http",
 ]
@@ -1609,6 +1618,7 @@ dependencies = [
  "bytes 0.6.0",
  "chrono",
  "crossbeam-channel",
+ "dashmap",
  "env_logger 0.8.2",
  "flate2",
  "fst",
@@ -1627,6 +1637,7 @@ dependencies = [
  "milli",
  "mime",
  "once_cell",
+ "page_size",
  "rand 0.7.3",
  "rayon",
  "regex",
@@ -1699,7 +1710,6 @@ dependencies = [
  "bstr",
  "byte-unit",
  "byteorder",
- "chrono",
  "crossbeam-channel",
  "csv",
  "either",
@@ -1714,13 +1724,13 @@ dependencies = [
  "levenshtein_automata",
  "linked-hash-map",
  "log",
- "meilisearch-tokenizer",
  "memmap",
  "near-proximity",
  "num-traits",
  "obkv",
  "once_cell",
  "ordered-float",
+ "page_size",
  "pest 2.1.3 (git+https://github.com/pest-parser/pest.git?rev=51fd1d49f1041f7839975664ef71fe15c7dcaf67)",
  "pest_derive",
  "rayon",
@@ -1729,7 +1739,6 @@ dependencies = [
  "roaring",
  "serde",
  "serde_json",
- "serde_millis",
  "slice-group-by",
  "smallstr",
  "smallvec",
@@ -2596,15 +2605,6 @@ dependencies = [
  "serde",
 ]
 
-[[package]]
-name = "serde_millis"
-version = "0.1.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e6e2dc780ca5ee2c369d1d01d100270203c4ff923d2a4264812d723766434d00"
-dependencies = [
- "serde",
-]
-
 [[package]]
 name = "serde_qs"
 version = "0.8.2"
diff --git a/Cargo.toml b/Cargo.toml
index fb8531a96..47bbc09df 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -30,7 +30,7 @@ fst = "0.4.5"
 futures = "0.3.7"
 futures-util = "0.3.8"
 grenad = { git = "https://github.com/Kerollmops/grenad.git", rev = "3adcb26" }
-heed = "0.10.6"
+heed = { version = "0.10.6", default-features = false, features = ["lmdb", "sync-read-txn"] }
 http = "0.2.1"
 indexmap = { version = "1.3.2", features = ["serde-1"] }
 log = "0.4.8"
@@ -58,6 +58,8 @@ tokio = { version = "0.2", features = ["full"] }
 ureq = { version = "1.5.1", default-features = false, features = ["tls"] }
 walkdir = "2.3.1"
 whoami = "1.0.0"
+dashmap = "4.0.2"
+page_size = "0.4.2"
 
 [dependencies.sentry]
 default-features = false
diff --git a/src/data/mod.rs b/src/data/mod.rs
index 9d64052af..944690e0c 100644
--- a/src/data/mod.rs
+++ b/src/data/mod.rs
@@ -3,7 +3,6 @@ mod updates;
 
 pub use search::{SearchQuery, SearchResult};
 
-use std::collections::HashMap;
 use std::fs::create_dir_all;
 use std::ops::Deref;
 use std::sync::Arc;
@@ -13,6 +12,7 @@ use sha2::Digest;
 
 use crate::{option::Opt, updates::Settings};
 use crate::updates::UpdateQueue;
+use crate::index_controller::IndexController;
 
 #[derive(Clone)]
 pub struct Data {
@@ -29,7 +29,7 @@ impl Deref for Data {
 
 #[derive(Clone)]
 pub struct DataInner {
-    pub indexes: Arc<Index>,
+    pub indexes: Arc<IndexController>,
     pub update_queue: Arc<UpdateQueue>,
     api_keys: ApiKeys,
     options: Opt,
@@ -62,9 +62,7 @@ impl ApiKeys {
 impl Data {
     pub fn new(options: Opt) -> anyhow::Result<Data> {
         let db_size = options.max_mdb_size.get_bytes() as usize;
-        let path = options.db_path.join("main");
-        create_dir_all(&path)?;
-        let indexes = Index::new(&path, Some(db_size))?;
+        let indexes = IndexController::new(&options.db_path)?;
         let indexes = Arc::new(indexes);
 
         let update_queue = Arc::new(UpdateQueue::new(&options, indexes.clone())?);
@@ -90,28 +88,26 @@ impl Data {
         let displayed_attributes = self.indexes
             .displayed_fields(&txn)?
-            .map(|fields| {println!("{:?}", fields); fields.iter().filter_map(|f| fields_map.name(*f).map(String::from)).collect()})
+            .map(|fields| fields.into_iter().map(String::from).collect())
             .unwrap_or_else(|| vec!["*".to_string()]);
 
         let searchable_attributes = self.indexes
             .searchable_fields(&txn)?
             .map(|fields| fields
-                .iter()
-                .filter_map(|f| fields_map.name(*f).map(String::from))
+                .into_iter()
+                .map(String::from)
                 .collect())
             .unwrap_or_else(|| vec!["*".to_string()]);
 
-        let faceted_attributes = self.indexes
-            .faceted_fields(&txn)?
-            .iter()
-            .filter_map(|(f, t)| Some((fields_map.name(*f)?.to_string(), t.to_string())))
-            .collect::<HashMap<_, _>>()
-            .into();
+        let faceted_attributes = self.indexes.faceted_fields(&txn)?
+            .into_iter()
+            .map(|(k, v)| (k, v.to_string()))
+            .collect();
 
         Ok(Settings {
             displayed_attributes: Some(Some(displayed_attributes)),
             searchable_attributes: Some(Some(searchable_attributes)),
-            faceted_attributes: Some(faceted_attributes),
+            faceted_attributes: Some(Some(faceted_attributes)),
             criteria: None,
         })
     }
 
diff --git a/src/data/search.rs b/src/data/search.rs
index bd22a959b..44e0a54e6 100644
--- a/src/data/search.rs
+++ b/src/data/search.rs
@@ -1,4 +1,3 @@
-use std::borrow::Cow;
 use std::collections::HashSet;
 use std::mem;
 use std::time::Instant;
@@ -8,17 +7,22 @@ use serde::{Deserialize, Serialize};
 use milli::{SearchResult as Results, obkv_to_json};
 use meilisearch_tokenizer::{Analyzer, AnalyzerConfig};
 
+use crate::error::Error;
+
 use super::Data;
 
 const DEFAULT_SEARCH_LIMIT: usize = 20;
 
+const fn default_search_limit() -> usize { DEFAULT_SEARCH_LIMIT }
+
 #[derive(Deserialize)]
 #[serde(rename_all = "camelCase", deny_unknown_fields)]
 #[allow(dead_code)]
 pub struct SearchQuery {
     q: Option<String>,
     offset: Option<usize>,
-    limit: Option<usize>,
+    #[serde(default = "default_search_limit")]
+    limit: usize,
     attributes_to_retrieve: Option<Vec<String>>,
     attributes_to_crop: Option<Vec<String>>,
     crop_length: Option<usize>,
@@ -100,30 +104,18 @@ impl<'a, A: AsRef<[u8]>> Highlighter<'a, A> {
 }
 
 impl Data {
-    pub fn search<S: AsRef<str>>(&self, _index: S, search_query: SearchQuery) -> anyhow::Result<SearchResult> {
+    pub fn search<S: AsRef<str>>(&self, index: S, search_query: SearchQuery) -> anyhow::Result<SearchResult> {
         let start = Instant::now();
-        let index = &self.indexes;
-        let rtxn = index.read_txn()?;
-
-        let mut search = index.search(&rtxn);
-        if let Some(query) = &search_query.q {
-            search.query(query);
-        }
-
-        if let Some(offset) = search_query.offset {
-            search.offset(offset);
-        }
-
-        let limit = search_query.limit.unwrap_or(DEFAULT_SEARCH_LIMIT);
-        search.limit(limit);
-
-        let Results { found_words, documents_ids, nb_hits, .. } = search.execute().unwrap();
+        let index = self.indexes
+            .get(index)?
+            .ok_or_else(|| Error::OpenIndex(format!("Index {} doesn't exist.", index.as_ref())))?;
+        let Results { found_words, documents_ids, nb_hits, .. } = index.search(search_query)?;
 
         let fields_ids_map = index.fields_ids_map(&rtxn).unwrap();
 
-        let displayed_fields = match index.displayed_fields(&rtxn).unwrap() {
-            Some(fields) => Cow::Borrowed(fields),
-            None => Cow::Owned(fields_ids_map.iter().map(|(id, _)| id).collect()),
+        let displayed_fields = match index.displayed_fields_ids(&rtxn).unwrap() {
+            Some(fields) => fields,
+            None => fields_ids_map.iter().map(|(id, _)| id).collect(),
         };
 
         let attributes_to_highlight = match search_query.attributes_to_highlight {
diff --git a/src/index_controller/mod.rs b/src/index_controller/mod.rs
new file mode 100644
index 000000000..9dfa23ce5
--- /dev/null
+++ b/src/index_controller/mod.rs
@@ -0,0 +1,187 @@
+use std::fs::File;
+use std::io::{Read, Write};
+use std::path::Path;
+use std::ops::Deref;
+
+use anyhow::Result;
+use chrono::{DateTime, Utc};
+use dashmap::DashMap;
+use heed::types::{Str, SerdeBincode};
+use heed::{EnvOpenOptions, Env, Database};
+use milli::Index;
+use serde::{Serialize, Deserialize};
+
+use crate::data::{SearchQuery, SearchResult};
+
+const CONTROLLER_META_FILENAME: &str = "index_controller_meta";
+const INDEXES_CONTROLLER_FILENAME: &str = "indexes_db";
+const INDEXES_DB_NAME: &str = "indexes_db";
+
+trait UpdateStore {}
+
+pub struct IndexController<U> {
+    update_store: U,
+    env: Env,
+    indexes_db: Database<Str, SerdeBincode<IndexMetadata>>,
+    indexes: DashMap<String, Index>,
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+struct IndexControllerMeta {
+    open_options: EnvOpenOptions,
+    created_at: DateTime<Utc>,
+}
+
+impl IndexControllerMeta {
+    fn from_path(path: impl AsRef<Path>) -> Result<Option<Self>> {
+        let path = path.as_ref().join(CONTROLLER_META_FILENAME);
+        if path.exists() {
+            let mut file = File::open(path)?;
+            let mut buffer = Vec::new();
+            let n = file.read_to_end(&mut buffer)?;
+            let meta: IndexControllerMeta = serde_json::from_slice(&buffer[..n])?;
+            Ok(Some(meta))
+        } else {
+            Ok(None)
+        }
+    }
+
+    fn to_path(self, path: impl AsRef<Path>) -> Result<()> {
+        let path = path.as_ref().join(CONTROLLER_META_FILENAME);
+        if path.exists() {
+            Err(anyhow::anyhow!("Index controller metadata already exists"))
+        } else {
+            let mut file = File::create(path)?;
+            let json = serde_json::to_vec(&self)?;
+            file.write_all(&json)?;
+            Ok(())
+        }
+    }
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+struct IndexMetadata {
+    created_at: DateTime<Utc>,
+    open_options: EnvOpenOptions,
+    id: String,
+}
+
+impl IndexMetadata {
+    fn open_index(&self) -> Result<Index> {
+        todo!()
+    }
+}
+
+struct IndexView<'a, U> {
+    txn: heed::RoTxn<'a>,
+    index: &'a Index,
+    update_store: &'a U,
+}
+
+struct IndexViewMut<'a, U> {
+    txn: heed::RwTxn<'a>,
+    index: &'a Index,
+    update_store: &'a U,
+}
+
+impl<'a, U> Deref for IndexViewMut<'a, U> {
+    type Target = IndexView<'a, U>;
+
+    fn deref(&self) -> &Self::Target {
+        IndexView {
+            txn: *self.txn,
+            index: self.index,
+            update_store: self.update_store,
+        }
+    }
+}
+
+impl<'a, U: UpdateStore> IndexView<'a, U> {
+    fn search(&self, search_query: SearchQuery) -> Result<SearchResult> {
+        let mut search = self.index.search(self.txn);
+        if let Some(query) = &search_query.q {
+            search.query(query);
+        }
+
+        if let Some(offset) = search_query.offset {
+            search.offset(offset);
+        }
+
+        let limit = search_query.limit;
+        search.limit(limit);
+
+        Ok(search.execute()?)
+    }
+}
+
+impl<U: UpdateStore> IndexController<U> {
+    /// Open the index controller from the metadata found at `path`, or create a new one if no
+    /// metadata is found.
+    pub fn new(path: impl AsRef<Path>, update_store: U) -> Result<Self> {
+        // If index controller metadata is present, we use it to open the env; otherwise we
+        // create new metadata from scratch before returning a new env.
+        let env = match IndexControllerMeta::from_path(&path)? {
+            Some(meta) => meta.open_options.open(INDEXES_CONTROLLER_FILENAME)?,
+            None => {
+                let open_options = EnvOpenOptions::new()
+                    .map_size(page_size::get() * 1000);
+                let env = open_options.open(INDEXES_CONTROLLER_FILENAME)?;
+                let created_at = Utc::now();
+                let meta = IndexControllerMeta { open_options, created_at };
+                meta.to_path(&path)?;
+                env
+            }
+        };
+        let indexes = DashMap::new();
+        let indexes_db = match env.open_database(INDEXES_DB_NAME)? {
+            Some(indexes_db) => indexes_db,
+            None => env.create_database(INDEXES_DB_NAME)?,
+        };
+
+        Ok(Self { env, indexes, indexes_db, update_store })
+    }
+
+    pub fn get_or_create<S: AsRef<str>>(&mut self, name: S) -> Result<IndexViewMut<U>> {
+        todo!()
+    }
+
+    /// Get an index with read access to the db. Indexes are lazily loaded: we first check for
+    /// the index's existence in the indexes map, and if it isn't there, the indexes db is
+    /// checked for metadata to open the index.
+    pub fn get<S: AsRef<str>>(&self, name: S) -> Result<Option<IndexView<U>>> {
+        match self.indexes.get(name.as_ref()) {
+            Some(index) => {
+                let txn = index.read_txn()?;
+                let update_store = &self.update_store;
+                Ok(Some(IndexView { index, update_store, txn }))
+            }
+            None => {
+                let txn = self.env.read_txn()?;
+                match self.indexes_db.get(&txn, name.as_ref())? {
+                    Some(meta) => {
+                        let index = meta.open_index()?;
+                        self.indexes.insert(name.as_ref().to_owned(), index);
+                        Ok(self.indexes.get(name.as_ref()))
+                    }
+                    None => Ok(None)
+                }
+            }
+        }
+    }
+
+    pub fn get_mut<S: AsRef<str>>(&self, name: S) -> Result<Option<IndexViewMut<U>>> {
+        todo!()
+    }
+
+    pub async fn delete_index<S: AsRef<str>>(&self, name: S) -> Result<()> {
+        todo!()
+    }
+
+    pub async fn list_indices(&self) -> Result<Vec<IndexMetadata>> {
+        todo!()
+    }
+
+    pub async fn rename_index(&self, old: &str, new: &str) -> Result<()> {
+        todo!()
+    }
+}
diff --git a/src/lib.rs b/src/lib.rs
index e0ae9aedb..f5dd79b0b 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -6,9 +6,7 @@ pub mod helpers;
 pub mod option;
 pub mod routes;
 mod updates;
-//pub mod analytics;
-//pub mod snapshot;
-//pub mod dump;
+mod index_controller;
 
 use actix_http::Error;
 use actix_service::ServiceFactory;
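
Note for reviewers, not part of the patch: the lazy-load pattern that `IndexController::get` builds on top of `dashmap` can be illustrated with the self-contained sketch below. The `open_index` helper and the `Arc<String>` payload are placeholders standing in for `IndexMetadata::open_index` and an opened `milli::Index`; the sketch uses dashmap's `entry` API, which folds the check-and-insert into a single step instead of the separate `get`/`insert` calls the WIP code does.

use std::sync::Arc;

use dashmap::DashMap;

// Hypothetical stand-in for `IndexMetadata::open_index`: pretend this is expensive.
fn open_index(name: &str) -> Arc<String> {
    Arc::new(format!("index `{}` opened from its metadata", name))
}

fn main() {
    // Same shape as `IndexController::indexes`, with `Arc<String>` instead of `milli::Index`.
    let indexes: DashMap<String, Arc<String>> = DashMap::new();

    // First access: nothing is cached yet, so the index is "opened" and inserted.
    let first = indexes
        .entry("movies".to_string())
        .or_insert_with(|| open_index("movies"))
        .clone();

    // Second access: the cached handle is reused; no second open happens.
    let second = indexes
        .entry("movies".to_string())
        .or_insert_with(|| open_index("movies"))
        .clone();

    assert!(Arc::ptr_eq(&first, &second));
    println!("{}", first);
}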