From 9af0a08122e3558d90a0ac32fff63dcc7ba1a912 Mon Sep 17 00:00:00 2001 From: mpostma Date: Mon, 1 Feb 2021 19:51:47 +0100 Subject: [PATCH] post review fixes --- Cargo.lock | 113 ++++++++++-- Cargo.toml | 1 + src/data/mod.rs | 8 +- src/data/search.rs | 24 ++- src/data/updates.rs | 27 ++- .../local_index_controller/index_store.rs | 173 +++++++++--------- .../local_index_controller/mod.rs | 9 +- .../local_index_controller/update_store.rs | 10 - src/index_controller/mod.rs | 25 +-- src/lib.rs | 1 - 10 files changed, 233 insertions(+), 158 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9240548d9..f036664f4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -808,6 +808,12 @@ version = "1.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "80115a2dfde04491e181c2440a39e4be26e52d9ca4e92bed213f65b94e0b8db1" +[[package]] +name = "difference" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "524cbf6897b527295dff137cec09ecf3a05f4fddffd7dfcd1585403449e74198" + [[package]] name = "digest" version = "0.8.1" @@ -832,6 +838,12 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "212d0f5754cb6769937f4501cc0e67f4f4483c8d2c3e1e922ee9edbe4ab4c7c0" +[[package]] +name = "downcast" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4bb454f0228b18c7f4c3b0ebbee346ed9c52e7443b0999cd543ff3571205701d" + [[package]] name = "either" version = "1.6.1" @@ -937,6 +949,15 @@ dependencies = [ "miniz_oxide", ] +[[package]] +name = "float-cmp" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1267f4ac4f343772758f7b1bdcbe767c218bbab93bb432acbf5162bbf85a6c4" +dependencies = [ + "num-traits", +] + [[package]] name = "fnv" version = "1.0.7" @@ -953,6 +974,12 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "fragile" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69a039c3498dc930fe810151a34ba0c1c70b02b8625035592e74432f678591f2" + [[package]] name = "fs_extra" version = "1.2.0" @@ -1612,7 +1639,7 @@ checksum = "7ffc5c5338469d4d3ea17d269fa8ea3512ad247247c30bd2df69e68309ed0a08" [[package]] name = "meilisearch-error" -version = "0.18.0" +version = "0.19.0" dependencies = [ "actix-http", ] @@ -1652,6 +1679,7 @@ dependencies = [ "memmap", "milli", "mime", + "mockall", "obkv", "once_cell", "page_size", @@ -1724,7 +1752,6 @@ dependencies = [ "bstr", "byte-unit", "byteorder", - "chrono", "crossbeam-channel", "csv", "either", @@ -1754,7 +1781,6 @@ dependencies = [ "roaring", "serde", "serde_json", - "serde_millis", "slice-group-by", "smallstr", "smallvec", @@ -1854,6 +1880,33 @@ dependencies = [ "winapi 0.3.9", ] +[[package]] +name = "mockall" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "619634fd9149c4a06e66d8fd9256e85326d8eeee75abee4565ff76c92e4edfe0" +dependencies = [ + "cfg-if 1.0.0", + "downcast", + "fragile", + "lazy_static", + "mockall_derive", + "predicates", + "predicates-tree", +] + +[[package]] +name = "mockall_derive" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "83714c95dbf4c24202f0f1b208f0f248e6bd65abfa8989303611a71c0f781548" +dependencies = [ + "cfg-if 1.0.0", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "near-proximity" version = "0.1.0" @@ -1885,6 +1938,12 @@ dependencies = [ "libc", ] +[[package]] +name = "normalize-line-endings" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "61807f77802ff30975e01f4f071c8ba10c022052f98b3294119f3e615d13e5be" + [[package]] name = "num-integer" version = "0.1.44" @@ -2153,6 +2212,35 @@ version = "0.2.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ac74c624d6b2d21f425f752262f42188365d7b8ff1aff74c82e45136510a4857" +[[package]] +name = "predicates" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eeb433456c1a57cc93554dea3ce40b4c19c4057e41c55d4a0f3d84ea71c325aa" +dependencies = [ + "difference", + "float-cmp", + "normalize-line-endings", + "predicates-core", + "regex", +] + +[[package]] +name = "predicates-core" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57e35a3326b75e49aa85f5dc6ec15b41108cf5aee58eabb1f274dd18b73c2451" + +[[package]] +name = "predicates-tree" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "15f553275e5721409451eb85e15fd9a860a6e5ab4496eb215987502b5f5391f2" +dependencies = [ + "predicates-core", + "treeline", +] + [[package]] name = "proc-macro-error" version = "1.0.4" @@ -2447,9 +2535,9 @@ checksum = "21215c1b9d8f7832b433255bd9eea3e2779aa55b21b2f8e13aad62c74749b237" [[package]] name = "roaring" -version = "0.6.2" +version = "0.6.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fb550891a98438463978260676feef06c12bfb0eb0b05e191f888fb785cc9374" +checksum = "4d60b41c8f25d07cecab125cb46ebbf234fc055effc61ca2392a3ef4f9422304" dependencies = [ "byteorder", ] @@ -2590,15 +2678,6 @@ dependencies = [ "serde", ] -[[package]] -name = "serde_millis" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e6e2dc780ca5ee2c369d1d01d100270203c4ff923d2a4264812d723766434d00" -dependencies = [ - "serde", -] - [[package]] name = "serde_url_params" version = "0.2.0" @@ -3139,6 +3218,12 @@ dependencies = [ "tracing", ] +[[package]] +name = "treeline" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a7f741b240f1a48843f9b8e0444fb55fb2a4ff67293b50a9179dfd5ea67f8d41" + [[package]] name = "trust-dns-proto" version = "0.19.6" diff --git a/Cargo.toml b/Cargo.toml index 1d7590d47..ad2d034ad 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -72,6 +72,7 @@ serde_url_params = "0.2.0" tempdir = "0.3.7" assert-json-diff = { branch = "master", git = "https://github.com/qdequele/assert-json-diff" } tokio = { version = "0.2", features = ["macros", "time"] } +mockall = "0.9.0" [features] default = ["sentry"] diff --git a/src/data/mod.rs b/src/data/mod.rs index 175aedba5..de24d0a06 100644 --- a/src/data/mod.rs +++ b/src/data/mod.rs @@ -3,14 +3,14 @@ mod updates; pub use search::{SearchQuery, SearchResult}; +use std::fs::create_dir_all; use std::ops::Deref; use std::sync::Arc; -use std::fs::create_dir_all; use sha2::Digest; -use crate::{option::Opt, index_controller::Settings}; use crate::index_controller::{IndexController, LocalIndexController}; +use crate::{option::Opt, index_controller::Settings}; #[derive(Clone)] pub struct Data { @@ -67,7 +67,7 @@ impl Data { options.max_mdb_size.get_bytes(), options.max_udb_size.get_bytes(), )?; - let indexes = Arc::new(index_controller); + let index_controller = Arc::new(index_controller); let mut api_keys = ApiKeys { master: options.clone().master_key, @@ -77,7 +77,7 @@ impl Data { api_keys.generate_missing_api_keys(); - let inner = DataInner { index_controller: indexes, options, api_keys }; + let inner = DataInner { index_controller, options, api_keys }; let inner = Arc::new(inner); Ok(Data { inner }) diff --git a/src/data/search.rs b/src/data/search.rs index 246e3bdac..2e05988aa 100644 --- a/src/data/search.rs +++ b/src/data/search.rs @@ -2,11 +2,11 @@ use std::collections::HashSet; use std::mem; use std::time::Instant; -use serde_json::{Value, Map}; -use serde::{Deserialize, Serialize}; -use milli::{Index, obkv_to_json, FacetCondition}; -use meilisearch_tokenizer::{Analyzer, AnalyzerConfig}; use anyhow::bail; +use meilisearch_tokenizer::{Analyzer, AnalyzerConfig}; +use milli::{Index, obkv_to_json, FacetCondition}; +use serde::{Deserialize, Serialize}; +use serde_json::{Value, Map}; use crate::index_controller::IndexController; use super::Data; @@ -37,7 +37,6 @@ pub struct SearchQuery { impl SearchQuery { pub fn perform(&self, index: impl AsRef) -> anyhow::Result{ let index = index.as_ref(); - let before_search = Instant::now(); let rtxn = index.read_txn().unwrap(); @@ -47,6 +46,9 @@ impl SearchQuery { search.query(query); } + search.limit(self.limit); + search.offset(self.offset.unwrap_or_default()); + if let Some(ref condition) = self.facet_condition { if !condition.trim().is_empty() { let condition = FacetCondition::from_str(&rtxn, &index, &condition).unwrap(); @@ -54,11 +56,7 @@ impl SearchQuery { } } - if let Some(offset) = self.offset { - search.offset(offset); - } - - let milli::SearchResult { documents_ids, found_words, nb_hits, limit, } = search.execute()?; + let milli::SearchResult { documents_ids, found_words, candidates } = search.execute()?; let mut documents = Vec::new(); let fields_ids_map = index.fields_ids_map(&rtxn).unwrap(); @@ -81,9 +79,9 @@ impl SearchQuery { Ok(SearchResult { hits: documents, - nb_hits, + nb_hits: candidates.len(), query: self.q.clone().unwrap_or_default(), - limit, + limit: self.limit, offset: self.offset.unwrap_or_default(), processing_time_ms: before_search.elapsed().as_millis(), }) @@ -94,7 +92,7 @@ impl SearchQuery { #[serde(rename_all = "camelCase")] pub struct SearchResult { hits: Vec>, - nb_hits: usize, + nb_hits: u64, query: String, limit: usize, offset: usize, diff --git a/src/data/updates.rs b/src/data/updates.rs index 194ec346b..27fc6537e 100644 --- a/src/data/updates.rs +++ b/src/data/updates.rs @@ -6,21 +6,20 @@ use futures_util::stream::StreamExt; use tokio::io::AsyncWriteExt; use super::Data; -use crate::index_controller::{IndexController, Settings, UpdateResult, UpdateMeta}; -use crate::index_controller::updates::UpdateStatus; +use crate::index_controller::{IndexController, Settings}; +use crate::index_controller::UpdateStatus; impl Data { - pub async fn add_documents( + pub async fn add_documents( &self, - index: S, + index: impl AsRef + Send + Sync + 'static, method: IndexDocumentsMethod, format: UpdateFormat, mut stream: impl futures::Stream> + Unpin, - ) -> anyhow::Result> + ) -> anyhow::Result where B: Deref, E: std::error::Error + Send + Sync + 'static, - S: AsRef + Send + Sync + 'static, { let file = tokio::task::spawn_blocking(tempfile::tempfile).await?; let file = tokio::fs::File::from_std(file?); @@ -38,26 +37,26 @@ impl Data { let mmap = unsafe { memmap::Mmap::map(&file)? }; let index_controller = self.index_controller.clone(); - let update = tokio::task::spawn_blocking(move ||index_controller.add_documents(index, method, format, &mmap[..])).await??; + let update = tokio::task::spawn_blocking(move || index_controller.add_documents(index, method, format, &mmap[..])).await??; Ok(update.into()) } - pub async fn update_settings + Send + Sync + 'static>( + pub async fn update_settings( &self, - index: S, + index: impl AsRef + Send + Sync + 'static, settings: Settings - ) -> anyhow::Result> { - let indexes = self.index_controller.clone(); - let update = tokio::task::spawn_blocking(move || indexes.update_settings(index, settings)).await??; + ) -> anyhow::Result { + let index_controller = self.index_controller.clone(); + let update = tokio::task::spawn_blocking(move || index_controller.update_settings(index, settings)).await??; Ok(update.into()) } #[inline] - pub fn get_update_status>(&self, index: S, uid: u64) -> anyhow::Result>> { + pub fn get_update_status(&self, index: impl AsRef, uid: u64) -> anyhow::Result> { self.index_controller.update_status(index, uid) } - pub fn get_updates_status(&self, index: &str) -> anyhow::Result>> { + pub fn get_updates_status(&self, index: impl AsRef) -> anyhow::Result> { self.index_controller.all_update_status(index) } } diff --git a/src/index_controller/local_index_controller/index_store.rs b/src/index_controller/local_index_controller/index_store.rs index c96884155..16df83d2c 100644 --- a/src/index_controller/local_index_controller/index_store.rs +++ b/src/index_controller/local_index_controller/index_store.rs @@ -1,15 +1,13 @@ +use std::fs::{create_dir_all, remove_dir_all}; use std::path::{Path, PathBuf}; -use std::fs::create_dir_all; use std::sync::Arc; -use dashmap::DashMap; -use dashmap::mapref::entry::Entry; +use dashmap::{DashMap, mapref::entry::Entry}; use heed::{Env, EnvOpenOptions, Database, types::{Str, SerdeJson, ByteSlice}, RoTxn, RwTxn}; use milli::Index; use rayon::ThreadPool; -use uuid::Uuid; use serde::{Serialize, Deserialize}; -use log::warn; +use uuid::Uuid; use crate::option::IndexerOpts; use super::update_handler::UpdateHandler; @@ -29,7 +27,7 @@ impl IndexMeta { &self, path: impl AsRef, thread_pool: Arc, - opt: &IndexerOpts, + indexer_options: &IndexerOpts, ) -> anyhow::Result<(Arc, Arc)> { let update_path = make_update_db_path(&path, &self.uuid); let index_path = make_index_db_path(&path, &self.uuid); @@ -43,24 +41,25 @@ impl IndexMeta { let mut options = EnvOpenOptions::new(); options.map_size(self.update_size as usize); - let handler = UpdateHandler::new(opt, index.clone(), thread_pool)?; + let handler = UpdateHandler::new(indexer_options, index.clone(), thread_pool)?; let update_store = UpdateStore::open(options, update_path, handler)?; + Ok((index, update_store)) } } pub struct IndexStore { env: Env, - name_to_uid_db: Database, - uid_to_index: DashMap, Arc)>, - uid_to_index_db: Database>, + name_to_uuid_db: Database, + uuid_to_index: DashMap, Arc)>, + uuid_to_index_db: Database>, thread_pool: Arc, - opt: IndexerOpts, + indexer_options: IndexerOpts, } impl IndexStore { - pub fn new(path: impl AsRef, opt: IndexerOpts) -> anyhow::Result { + pub fn new(path: impl AsRef, indexer_options: IndexerOpts) -> anyhow::Result { let env = EnvOpenOptions::new() .map_size(4096 * 100) .max_dbs(2) @@ -71,23 +70,23 @@ impl IndexStore { let uid_to_index_db = open_or_create_database(&env, Some("uid_to_index_db"))?; let thread_pool = rayon::ThreadPoolBuilder::new() - .num_threads(opt.indexing_jobs.unwrap_or(0)) + .num_threads(indexer_options.indexing_jobs.unwrap_or(0)) .build()?; let thread_pool = Arc::new(thread_pool); Ok(Self { env, - name_to_uid_db, - uid_to_index, - uid_to_index_db, + name_to_uuid_db: name_to_uid_db, + uuid_to_index: uid_to_index, + uuid_to_index_db: uid_to_index_db, thread_pool, - opt, + indexer_options, }) } - fn index_uid(&self, txn: &RoTxn, name: impl AsRef) -> anyhow::Result> { - match self.name_to_uid_db.get(txn, name.as_ref())? { + fn index_uuid(&self, txn: &RoTxn, name: impl AsRef) -> anyhow::Result> { + match self.name_to_uuid_db.get(txn, name.as_ref())? { Some(bytes) => { let uuid = Uuid::from_slice(bytes)?; Ok(Some(uuid)) @@ -97,12 +96,12 @@ impl IndexStore { } fn retrieve_index(&self, txn: &RoTxn, uid: Uuid) -> anyhow::Result, Arc)>> { - match self.uid_to_index.entry(uid.clone()) { + match self.uuid_to_index.entry(uid.clone()) { Entry::Vacant(entry) => { - match self.uid_to_index_db.get(txn, uid.as_bytes())? { + match self.uuid_to_index_db.get(txn, uid.as_bytes())? { Some(meta) => { let path = self.env.path(); - let (index, updates) = meta.open(path, self.thread_pool.clone(), &self.opt)?; + let (index, updates) = meta.open(path, self.thread_pool.clone(), &self.indexer_options)?; entry.insert((index.clone(), updates.clone())); Ok(Some((index, updates))) }, @@ -117,7 +116,7 @@ impl IndexStore { } fn _get_index(&self, txn: &RoTxn, name: impl AsRef) -> anyhow::Result, Arc)>> { - match self.index_uid(&txn, name)? { + match self.index_uuid(&txn, name)? { Some(uid) => self.retrieve_index(&txn, uid), None => Ok(None), } @@ -129,59 +128,61 @@ impl IndexStore { } pub fn get_or_create_index( - &self, name: impl AsRef, - update_size: u64, - index_size: u64, - ) -> anyhow::Result<(Arc, Arc)> { - let mut txn = self.env.write_txn()?; - match self._get_index(&txn, name.as_ref())? { - Some(res) => Ok(res), - None => { - let uid = Uuid::new_v4(); - // TODO: clean in case of error - let result = self.create_index(&mut txn, uid, name, update_size, index_size); - match result { - Ok((index, update_store)) => { - match txn.commit() { - Ok(_) => Ok((index, update_store)), - Err(e) => { - self.clean_uid(&uid); - Err(anyhow::anyhow!("error creating index: {}", e)) - } - } - } - Err(e) => { - self.clean_uid(&uid); - Err(e) - } - } - }, - } - } - - /// removes all data acociated with an index Uuid. This is called when index creation failed - /// and outstanding files and data need to be cleaned. - fn clean_uid(&self, _uid: &Uuid) { - // TODO! - warn!("creating cleanup is not yet implemented"); - } - - fn create_index( &self, - txn: &mut RwTxn, - uid: Uuid, + &self, name: impl AsRef, update_size: u64, index_size: u64, ) -> anyhow::Result<(Arc, Arc)> { - let meta = IndexMeta { update_size, index_size, uuid: uid.clone() }; + let mut txn = self.env.write_txn()?; + match self._get_index(&txn, name.as_ref())? { + Some(res) => Ok(res), + None => { + let uuid = Uuid::new_v4(); + let result = self.create_index(&mut txn, uuid, name, update_size, index_size)?; + // If we fail to commit the transaction, we must delete the database from the + // file-system. + if let Err(e) = txn.commit() { + self.clean_db(uuid); + return Err(e)?; + } + Ok(result) + }, + } + } - self.name_to_uid_db.put(txn, name.as_ref(), uid.as_bytes())?; - self.uid_to_index_db.put(txn, uid.as_bytes(), &meta)?; + // Remove all the files and data associated with a db uuid. + fn clean_db(&self, uuid: Uuid) { + let update_db_path = make_update_db_path(self.env.path(), &uuid); + let index_db_path = make_index_db_path(self.env.path(), &uuid); + + remove_dir_all(update_db_path).expect("Failed to clean database"); + remove_dir_all(index_db_path).expect("Failed to clean database"); + + self.uuid_to_index.remove(&uuid); + } + + fn create_index( &self, + txn: &mut RwTxn, + uuid: Uuid, + name: impl AsRef, + update_size: u64, + index_size: u64, + ) -> anyhow::Result<(Arc, Arc)> { + let meta = IndexMeta { update_size, index_size, uuid: uuid.clone() }; + + self.name_to_uuid_db.put(txn, name.as_ref(), uuid.as_bytes())?; + self.uuid_to_index_db.put(txn, uuid.as_bytes(), &meta)?; let path = self.env.path(); - let (index, update_store) = meta.open(path, self.thread_pool.clone(), &self.opt)?; + let (index, update_store) = match meta.open(path, self.thread_pool.clone(), &self.indexer_options) { + Ok(res) => res, + Err(e) => { + self.clean_db(uuid); + return Err(e) + } + }; - self.uid_to_index.insert(uid, (index.clone(), update_store.clone())); + self.uuid_to_index.insert(uuid, (index.clone(), update_store.clone())); Ok((index, update_store)) } @@ -194,15 +195,15 @@ fn open_or_create_database(env: &Env, name: Option<&str> } } -fn make_update_db_path(path: impl AsRef, uid: &Uuid) -> PathBuf { +fn make_update_db_path(path: impl AsRef, uuid: &Uuid) -> PathBuf { let mut path = path.as_ref().to_path_buf(); - path.push(format!("update{}", uid)); + path.push(format!("update{}", uuid)); path } -fn make_index_db_path(path: impl AsRef, uid: &Uuid) -> PathBuf { +fn make_index_db_path(path: impl AsRef, uuid: &Uuid) -> PathBuf { let mut path = path.as_ref().to_path_buf(); - path.push(format!("index{}", uid)); + path.push(format!("index{}", uuid)); path } @@ -240,18 +241,18 @@ mod test { let name = "foobar"; let txn = store.env.read_txn().unwrap(); // name is not found if the uuid in not present in the db - assert!(store.index_uid(&txn, &name).unwrap().is_none()); + assert!(store.index_uuid(&txn, &name).unwrap().is_none()); drop(txn); // insert an uuid in the the name_to_uuid_db: let uuid = Uuid::new_v4(); let mut txn = store.env.write_txn().unwrap(); - store.name_to_uid_db.put(&mut txn, &name, uuid.as_bytes()).unwrap(); + store.name_to_uuid_db.put(&mut txn, &name, uuid.as_bytes()).unwrap(); txn.commit().unwrap(); // check that the uuid is there let txn = store.env.read_txn().unwrap(); - assert_eq!(store.index_uid(&txn, &name).unwrap(), Some(uuid)); + assert_eq!(store.index_uuid(&txn, &name).unwrap(), Some(uuid)); } #[test] @@ -265,15 +266,15 @@ mod test { let meta = IndexMeta { update_size: 4096 * 100, index_size: 4096 * 100, uuid: uuid.clone() }; let mut txn = store.env.write_txn().unwrap(); - store.uid_to_index_db.put(&mut txn, uuid.as_bytes(), &meta).unwrap(); + store.uuid_to_index_db.put(&mut txn, uuid.as_bytes(), &meta).unwrap(); txn.commit().unwrap(); // the index cache should be empty - assert!(store.uid_to_index.is_empty()); + assert!(store.uuid_to_index.is_empty()); let txn = store.env.read_txn().unwrap(); assert!(store.retrieve_index(&txn, uuid).unwrap().is_some()); - assert_eq!(store.uid_to_index.len(), 1); + assert_eq!(store.uuid_to_index.len(), 1); } #[test] @@ -287,8 +288,8 @@ mod test { let uuid = Uuid::new_v4(); let meta = IndexMeta { update_size: 4096 * 100, index_size: 4096 * 100, uuid: uuid.clone() }; let mut txn = store.env.write_txn().unwrap(); - store.name_to_uid_db.put(&mut txn, &name, uuid.as_bytes()).unwrap(); - store.uid_to_index_db.put(&mut txn, uuid.as_bytes(), &meta).unwrap(); + store.name_to_uuid_db.put(&mut txn, &name, uuid.as_bytes()).unwrap(); + store.uuid_to_index_db.put(&mut txn, uuid.as_bytes(), &meta).unwrap(); txn.commit().unwrap(); assert!(store.index(&name).unwrap().is_some()); @@ -302,12 +303,12 @@ mod test { store.get_or_create_index(&name, 4096 * 100, 4096 * 100).unwrap(); let txn = store.env.read_txn().unwrap(); - let uuid = store.name_to_uid_db.get(&txn, &name).unwrap(); - assert_eq!(store.uid_to_index.len(), 1); + let uuid = store.name_to_uuid_db.get(&txn, &name).unwrap(); + assert_eq!(store.uuid_to_index.len(), 1); assert!(uuid.is_some()); let uuid = Uuid::from_slice(uuid.unwrap()).unwrap(); let meta = IndexMeta { update_size: 4096 * 100, index_size: 4096 * 100, uuid: uuid.clone() }; - assert_eq!(store.uid_to_index_db.get(&txn, uuid.as_bytes()).unwrap(), Some(meta)); + assert_eq!(store.uuid_to_index_db.get(&txn, uuid.as_bytes()).unwrap(), Some(meta)); } #[test] @@ -321,12 +322,12 @@ mod test { let uuid = Uuid::new_v4(); let mut txn = store.env.write_txn().unwrap(); store.create_index(&mut txn, uuid, name, update_size, index_size).unwrap(); - let uuid = store.name_to_uid_db.get(&txn, &name).unwrap(); - assert_eq!(store.uid_to_index.len(), 1); + let uuid = store.name_to_uuid_db.get(&txn, &name).unwrap(); + assert_eq!(store.uuid_to_index.len(), 1); assert!(uuid.is_some()); let uuid = Uuid::from_slice(uuid.unwrap()).unwrap(); let meta = IndexMeta { update_size , index_size, uuid: uuid.clone() }; - assert_eq!(store.uid_to_index_db.get(&txn, uuid.as_bytes()).unwrap(), Some(meta)); + assert_eq!(store.uuid_to_index_db.get(&txn, uuid.as_bytes()).unwrap(), Some(meta)); } } } diff --git a/src/index_controller/local_index_controller/mod.rs b/src/index_controller/local_index_controller/mod.rs index debc15a1a..b59eb2a99 100644 --- a/src/index_controller/local_index_controller/mod.rs +++ b/src/index_controller/local_index_controller/mod.rs @@ -2,16 +2,15 @@ mod update_store; mod index_store; mod update_handler; -use index_store::IndexStore; - use std::path::Path; use std::sync::Arc; -use milli::Index; use anyhow::bail; use itertools::Itertools; +use milli::Index; use crate::option::IndexerOpts; +use index_store::IndexStore; use super::IndexController; use super::updates::UpdateStatus; use super::{UpdateMeta, UpdateResult}; @@ -84,7 +83,7 @@ impl IndexController for LocalIndexController { } fn all_update_status(&self, index: impl AsRef) -> anyhow::Result>> { - match self.indexes.index(index)? { + match self.indexes.index(&index)? { Some((_, update_store)) => { let updates = update_store.iter_metas(|processing, processed, pending, aborted, failed| { Ok(processing @@ -99,7 +98,7 @@ impl IndexController for LocalIndexController { })?; Ok(updates) } - None => Ok(Vec::new()) + None => bail!("index {} doesn't exist.", index.as_ref()), } } diff --git a/src/index_controller/local_index_controller/update_store.rs b/src/index_controller/local_index_controller/update_store.rs index 9a17ec00f..d4b796993 100644 --- a/src/index_controller/local_index_controller/update_store.rs +++ b/src/index_controller/local_index_controller/update_store.rs @@ -192,16 +192,6 @@ where } } - /// The id and metadata of the update that is currently being processed, - /// `None` if no update is being processed. - pub fn processing_update(&self) -> heed::Result>> { - let rtxn = self.env.read_txn()?; - match self.pending_meta.first(&rtxn)? { - Some((_, meta)) => Ok(Some(meta)), - None => Ok(None), - } - } - /// Execute the user defined function with the meta-store iterators, the first /// iterator is the *processed* meta one, the second the *aborted* meta one /// and, the last is the *pending* meta one. diff --git a/src/index_controller/mod.rs b/src/index_controller/mod.rs index f1ba8f7ce..0ea654dfb 100644 --- a/src/index_controller/mod.rs +++ b/src/index_controller/mod.rs @@ -1,5 +1,5 @@ mod local_index_controller; -pub mod updates; +mod updates; pub use local_index_controller::LocalIndexController; @@ -12,7 +12,9 @@ use milli::Index; use milli::update::{IndexDocumentsMethod, UpdateFormat, DocumentAdditionResult}; use serde::{Serialize, Deserialize, de::Deserializer}; -use updates::{Processed, Processing, Failed, UpdateStatus}; +pub use updates::{Processed, Processing, Failed}; + +pub type UpdateStatus = updates::UpdateStatus; #[derive(Debug, Clone, Serialize, Deserialize)] @@ -85,9 +87,10 @@ pub enum UpdateResult { } /// The `IndexController` is in charge of the access to the underlying indices. It splits the logic -/// for read access which is provided, and write access which must be provided. This allows the -/// implementer to define the behaviour of write accesses to the indices, and abstract the -/// scheduling of the updates. The implementer must be able to provide an instance of `IndexStore` +/// for read access which is provided thanks to an handle to the index, and write access which must +/// be provided. This allows the implementer to define the behaviour of write accesses to the +/// indices, and abstract the scheduling of the updates. The implementer must be able to provide an +/// instance of `IndexStore` pub trait IndexController { /* @@ -106,11 +109,11 @@ pub trait IndexController { method: IndexDocumentsMethod, format: UpdateFormat, data: &[u8], - ) -> anyhow::Result>; + ) -> anyhow::Result; /// Updates an index settings. If the index does not exist, it will be created when the update /// is applied to the index. - fn update_settings>(&self, index_uid: S, settings: Settings) -> anyhow::Result>; + fn update_settings>(&self, index_uid: S, settings: Settings) -> anyhow::Result; /// Create an index with the given `index_uid`. fn create_index>(&self, index_uid: S) -> Result<()>; @@ -133,9 +136,9 @@ pub trait IndexController { todo!() } - /// Returns, if it exists, an `IndexView` to the requested index. - fn index(&self, uid: impl AsRef) -> anyhow::Result>>; + /// Returns, if it exists, the `Index` with the povided name. + fn index(&self, name: impl AsRef) -> anyhow::Result>>; - fn update_status(&self, index: impl AsRef, id: u64) -> anyhow::Result>>; - fn all_update_status(&self, index: impl AsRef) -> anyhow::Result>>; + fn update_status(&self, index: impl AsRef, id: u64) -> anyhow::Result>; + fn all_update_status(&self, index: impl AsRef) -> anyhow::Result>; } diff --git a/src/lib.rs b/src/lib.rs index df9381914..d542fd6d7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -5,7 +5,6 @@ pub mod error; pub mod helpers; pub mod option; pub mod routes; -//mod updates; mod index_controller; use actix_http::Error;