post review fixes

This commit is contained in:
mpostma 2021-02-01 19:51:47 +01:00
parent 17c463ca61
commit 9af0a08122
No known key found for this signature in database
GPG Key ID: CBC8A7C1D7A28C3A
10 changed files with 233 additions and 158 deletions

113
Cargo.lock generated
View File

@ -808,6 +808,12 @@ version = "1.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "80115a2dfde04491e181c2440a39e4be26e52d9ca4e92bed213f65b94e0b8db1" checksum = "80115a2dfde04491e181c2440a39e4be26e52d9ca4e92bed213f65b94e0b8db1"
[[package]]
name = "difference"
version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "524cbf6897b527295dff137cec09ecf3a05f4fddffd7dfcd1585403449e74198"
[[package]] [[package]]
name = "digest" name = "digest"
version = "0.8.1" version = "0.8.1"
@ -832,6 +838,12 @@ version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "212d0f5754cb6769937f4501cc0e67f4f4483c8d2c3e1e922ee9edbe4ab4c7c0" checksum = "212d0f5754cb6769937f4501cc0e67f4f4483c8d2c3e1e922ee9edbe4ab4c7c0"
[[package]]
name = "downcast"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4bb454f0228b18c7f4c3b0ebbee346ed9c52e7443b0999cd543ff3571205701d"
[[package]] [[package]]
name = "either" name = "either"
version = "1.6.1" version = "1.6.1"
@ -937,6 +949,15 @@ dependencies = [
"miniz_oxide", "miniz_oxide",
] ]
[[package]]
name = "float-cmp"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e1267f4ac4f343772758f7b1bdcbe767c218bbab93bb432acbf5162bbf85a6c4"
dependencies = [
"num-traits",
]
[[package]] [[package]]
name = "fnv" name = "fnv"
version = "1.0.7" version = "1.0.7"
@ -953,6 +974,12 @@ dependencies = [
"percent-encoding", "percent-encoding",
] ]
[[package]]
name = "fragile"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "69a039c3498dc930fe810151a34ba0c1c70b02b8625035592e74432f678591f2"
[[package]] [[package]]
name = "fs_extra" name = "fs_extra"
version = "1.2.0" version = "1.2.0"
@ -1612,7 +1639,7 @@ checksum = "7ffc5c5338469d4d3ea17d269fa8ea3512ad247247c30bd2df69e68309ed0a08"
[[package]] [[package]]
name = "meilisearch-error" name = "meilisearch-error"
version = "0.18.0" version = "0.19.0"
dependencies = [ dependencies = [
"actix-http", "actix-http",
] ]
@ -1652,6 +1679,7 @@ dependencies = [
"memmap", "memmap",
"milli", "milli",
"mime", "mime",
"mockall",
"obkv", "obkv",
"once_cell", "once_cell",
"page_size", "page_size",
@ -1724,7 +1752,6 @@ dependencies = [
"bstr", "bstr",
"byte-unit", "byte-unit",
"byteorder", "byteorder",
"chrono",
"crossbeam-channel", "crossbeam-channel",
"csv", "csv",
"either", "either",
@ -1754,7 +1781,6 @@ dependencies = [
"roaring", "roaring",
"serde", "serde",
"serde_json", "serde_json",
"serde_millis",
"slice-group-by", "slice-group-by",
"smallstr", "smallstr",
"smallvec", "smallvec",
@ -1854,6 +1880,33 @@ dependencies = [
"winapi 0.3.9", "winapi 0.3.9",
] ]
[[package]]
name = "mockall"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "619634fd9149c4a06e66d8fd9256e85326d8eeee75abee4565ff76c92e4edfe0"
dependencies = [
"cfg-if 1.0.0",
"downcast",
"fragile",
"lazy_static",
"mockall_derive",
"predicates",
"predicates-tree",
]
[[package]]
name = "mockall_derive"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "83714c95dbf4c24202f0f1b208f0f248e6bd65abfa8989303611a71c0f781548"
dependencies = [
"cfg-if 1.0.0",
"proc-macro2",
"quote",
"syn",
]
[[package]] [[package]]
name = "near-proximity" name = "near-proximity"
version = "0.1.0" version = "0.1.0"
@ -1885,6 +1938,12 @@ dependencies = [
"libc", "libc",
] ]
[[package]]
name = "normalize-line-endings"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "61807f77802ff30975e01f4f071c8ba10c022052f98b3294119f3e615d13e5be"
[[package]] [[package]]
name = "num-integer" name = "num-integer"
version = "0.1.44" version = "0.1.44"
@ -2153,6 +2212,35 @@ version = "0.2.10"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac74c624d6b2d21f425f752262f42188365d7b8ff1aff74c82e45136510a4857" checksum = "ac74c624d6b2d21f425f752262f42188365d7b8ff1aff74c82e45136510a4857"
[[package]]
name = "predicates"
version = "1.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eeb433456c1a57cc93554dea3ce40b4c19c4057e41c55d4a0f3d84ea71c325aa"
dependencies = [
"difference",
"float-cmp",
"normalize-line-endings",
"predicates-core",
"regex",
]
[[package]]
name = "predicates-core"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "57e35a3326b75e49aa85f5dc6ec15b41108cf5aee58eabb1f274dd18b73c2451"
[[package]]
name = "predicates-tree"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "15f553275e5721409451eb85e15fd9a860a6e5ab4496eb215987502b5f5391f2"
dependencies = [
"predicates-core",
"treeline",
]
[[package]] [[package]]
name = "proc-macro-error" name = "proc-macro-error"
version = "1.0.4" version = "1.0.4"
@ -2447,9 +2535,9 @@ checksum = "21215c1b9d8f7832b433255bd9eea3e2779aa55b21b2f8e13aad62c74749b237"
[[package]] [[package]]
name = "roaring" name = "roaring"
version = "0.6.2" version = "0.6.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fb550891a98438463978260676feef06c12bfb0eb0b05e191f888fb785cc9374" checksum = "4d60b41c8f25d07cecab125cb46ebbf234fc055effc61ca2392a3ef4f9422304"
dependencies = [ dependencies = [
"byteorder", "byteorder",
] ]
@ -2590,15 +2678,6 @@ dependencies = [
"serde", "serde",
] ]
[[package]]
name = "serde_millis"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e6e2dc780ca5ee2c369d1d01d100270203c4ff923d2a4264812d723766434d00"
dependencies = [
"serde",
]
[[package]] [[package]]
name = "serde_url_params" name = "serde_url_params"
version = "0.2.0" version = "0.2.0"
@ -3139,6 +3218,12 @@ dependencies = [
"tracing", "tracing",
] ]
[[package]]
name = "treeline"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a7f741b240f1a48843f9b8e0444fb55fb2a4ff67293b50a9179dfd5ea67f8d41"
[[package]] [[package]]
name = "trust-dns-proto" name = "trust-dns-proto"
version = "0.19.6" version = "0.19.6"

View File

@ -72,6 +72,7 @@ serde_url_params = "0.2.0"
tempdir = "0.3.7" tempdir = "0.3.7"
assert-json-diff = { branch = "master", git = "https://github.com/qdequele/assert-json-diff" } assert-json-diff = { branch = "master", git = "https://github.com/qdequele/assert-json-diff" }
tokio = { version = "0.2", features = ["macros", "time"] } tokio = { version = "0.2", features = ["macros", "time"] }
mockall = "0.9.0"
[features] [features]
default = ["sentry"] default = ["sentry"]

View File

@ -3,14 +3,14 @@ mod updates;
pub use search::{SearchQuery, SearchResult}; pub use search::{SearchQuery, SearchResult};
use std::fs::create_dir_all;
use std::ops::Deref; use std::ops::Deref;
use std::sync::Arc; use std::sync::Arc;
use std::fs::create_dir_all;
use sha2::Digest; use sha2::Digest;
use crate::{option::Opt, index_controller::Settings};
use crate::index_controller::{IndexController, LocalIndexController}; use crate::index_controller::{IndexController, LocalIndexController};
use crate::{option::Opt, index_controller::Settings};
#[derive(Clone)] #[derive(Clone)]
pub struct Data { pub struct Data {
@ -67,7 +67,7 @@ impl Data {
options.max_mdb_size.get_bytes(), options.max_mdb_size.get_bytes(),
options.max_udb_size.get_bytes(), options.max_udb_size.get_bytes(),
)?; )?;
let indexes = Arc::new(index_controller); let index_controller = Arc::new(index_controller);
let mut api_keys = ApiKeys { let mut api_keys = ApiKeys {
master: options.clone().master_key, master: options.clone().master_key,
@ -77,7 +77,7 @@ impl Data {
api_keys.generate_missing_api_keys(); api_keys.generate_missing_api_keys();
let inner = DataInner { index_controller: indexes, options, api_keys }; let inner = DataInner { index_controller, options, api_keys };
let inner = Arc::new(inner); let inner = Arc::new(inner);
Ok(Data { inner }) Ok(Data { inner })

View File

@ -2,11 +2,11 @@ use std::collections::HashSet;
use std::mem; use std::mem;
use std::time::Instant; use std::time::Instant;
use serde_json::{Value, Map};
use serde::{Deserialize, Serialize};
use milli::{Index, obkv_to_json, FacetCondition};
use meilisearch_tokenizer::{Analyzer, AnalyzerConfig};
use anyhow::bail; use anyhow::bail;
use meilisearch_tokenizer::{Analyzer, AnalyzerConfig};
use milli::{Index, obkv_to_json, FacetCondition};
use serde::{Deserialize, Serialize};
use serde_json::{Value, Map};
use crate::index_controller::IndexController; use crate::index_controller::IndexController;
use super::Data; use super::Data;
@ -37,7 +37,6 @@ pub struct SearchQuery {
impl SearchQuery { impl SearchQuery {
pub fn perform(&self, index: impl AsRef<Index>) -> anyhow::Result<SearchResult>{ pub fn perform(&self, index: impl AsRef<Index>) -> anyhow::Result<SearchResult>{
let index = index.as_ref(); let index = index.as_ref();
let before_search = Instant::now(); let before_search = Instant::now();
let rtxn = index.read_txn().unwrap(); let rtxn = index.read_txn().unwrap();
@ -47,6 +46,9 @@ impl SearchQuery {
search.query(query); search.query(query);
} }
search.limit(self.limit);
search.offset(self.offset.unwrap_or_default());
if let Some(ref condition) = self.facet_condition { if let Some(ref condition) = self.facet_condition {
if !condition.trim().is_empty() { if !condition.trim().is_empty() {
let condition = FacetCondition::from_str(&rtxn, &index, &condition).unwrap(); let condition = FacetCondition::from_str(&rtxn, &index, &condition).unwrap();
@ -54,11 +56,7 @@ impl SearchQuery {
} }
} }
if let Some(offset) = self.offset { let milli::SearchResult { documents_ids, found_words, candidates } = search.execute()?;
search.offset(offset);
}
let milli::SearchResult { documents_ids, found_words, nb_hits, limit, } = search.execute()?;
let mut documents = Vec::new(); let mut documents = Vec::new();
let fields_ids_map = index.fields_ids_map(&rtxn).unwrap(); let fields_ids_map = index.fields_ids_map(&rtxn).unwrap();
@ -81,9 +79,9 @@ impl SearchQuery {
Ok(SearchResult { Ok(SearchResult {
hits: documents, hits: documents,
nb_hits, nb_hits: candidates.len(),
query: self.q.clone().unwrap_or_default(), query: self.q.clone().unwrap_or_default(),
limit, limit: self.limit,
offset: self.offset.unwrap_or_default(), offset: self.offset.unwrap_or_default(),
processing_time_ms: before_search.elapsed().as_millis(), processing_time_ms: before_search.elapsed().as_millis(),
}) })
@ -94,7 +92,7 @@ impl SearchQuery {
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
pub struct SearchResult { pub struct SearchResult {
hits: Vec<Map<String, Value>>, hits: Vec<Map<String, Value>>,
nb_hits: usize, nb_hits: u64,
query: String, query: String,
limit: usize, limit: usize,
offset: usize, offset: usize,

View File

@ -6,21 +6,20 @@ use futures_util::stream::StreamExt;
use tokio::io::AsyncWriteExt; use tokio::io::AsyncWriteExt;
use super::Data; use super::Data;
use crate::index_controller::{IndexController, Settings, UpdateResult, UpdateMeta}; use crate::index_controller::{IndexController, Settings};
use crate::index_controller::updates::UpdateStatus; use crate::index_controller::UpdateStatus;
impl Data { impl Data {
pub async fn add_documents<B, E, S>( pub async fn add_documents<B, E>(
&self, &self,
index: S, index: impl AsRef<str> + Send + Sync + 'static,
method: IndexDocumentsMethod, method: IndexDocumentsMethod,
format: UpdateFormat, format: UpdateFormat,
mut stream: impl futures::Stream<Item=Result<B, E>> + Unpin, mut stream: impl futures::Stream<Item=Result<B, E>> + Unpin,
) -> anyhow::Result<UpdateStatus<UpdateMeta, UpdateResult, String>> ) -> anyhow::Result<UpdateStatus>
where where
B: Deref<Target = [u8]>, B: Deref<Target = [u8]>,
E: std::error::Error + Send + Sync + 'static, E: std::error::Error + Send + Sync + 'static,
S: AsRef<str> + Send + Sync + 'static,
{ {
let file = tokio::task::spawn_blocking(tempfile::tempfile).await?; let file = tokio::task::spawn_blocking(tempfile::tempfile).await?;
let file = tokio::fs::File::from_std(file?); let file = tokio::fs::File::from_std(file?);
@ -38,26 +37,26 @@ impl Data {
let mmap = unsafe { memmap::Mmap::map(&file)? }; let mmap = unsafe { memmap::Mmap::map(&file)? };
let index_controller = self.index_controller.clone(); let index_controller = self.index_controller.clone();
let update = tokio::task::spawn_blocking(move ||index_controller.add_documents(index, method, format, &mmap[..])).await??; let update = tokio::task::spawn_blocking(move || index_controller.add_documents(index, method, format, &mmap[..])).await??;
Ok(update.into()) Ok(update.into())
} }
pub async fn update_settings<S: AsRef<str> + Send + Sync + 'static>( pub async fn update_settings(
&self, &self,
index: S, index: impl AsRef<str> + Send + Sync + 'static,
settings: Settings settings: Settings
) -> anyhow::Result<UpdateStatus<UpdateMeta, UpdateResult, String>> { ) -> anyhow::Result<UpdateStatus> {
let indexes = self.index_controller.clone(); let index_controller = self.index_controller.clone();
let update = tokio::task::spawn_blocking(move || indexes.update_settings(index, settings)).await??; let update = tokio::task::spawn_blocking(move || index_controller.update_settings(index, settings)).await??;
Ok(update.into()) Ok(update.into())
} }
#[inline] #[inline]
pub fn get_update_status<S: AsRef<str>>(&self, index: S, uid: u64) -> anyhow::Result<Option<UpdateStatus<UpdateMeta, UpdateResult, String>>> { pub fn get_update_status(&self, index: impl AsRef<str>, uid: u64) -> anyhow::Result<Option<UpdateStatus>> {
self.index_controller.update_status(index, uid) self.index_controller.update_status(index, uid)
} }
pub fn get_updates_status(&self, index: &str) -> anyhow::Result<Vec<UpdateStatus<UpdateMeta, UpdateResult, String>>> { pub fn get_updates_status(&self, index: impl AsRef<str>) -> anyhow::Result<Vec<UpdateStatus>> {
self.index_controller.all_update_status(index) self.index_controller.all_update_status(index)
} }
} }

View File

@ -1,15 +1,13 @@
use std::fs::{create_dir_all, remove_dir_all};
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::fs::create_dir_all;
use std::sync::Arc; use std::sync::Arc;
use dashmap::DashMap; use dashmap::{DashMap, mapref::entry::Entry};
use dashmap::mapref::entry::Entry;
use heed::{Env, EnvOpenOptions, Database, types::{Str, SerdeJson, ByteSlice}, RoTxn, RwTxn}; use heed::{Env, EnvOpenOptions, Database, types::{Str, SerdeJson, ByteSlice}, RoTxn, RwTxn};
use milli::Index; use milli::Index;
use rayon::ThreadPool; use rayon::ThreadPool;
use uuid::Uuid;
use serde::{Serialize, Deserialize}; use serde::{Serialize, Deserialize};
use log::warn; use uuid::Uuid;
use crate::option::IndexerOpts; use crate::option::IndexerOpts;
use super::update_handler::UpdateHandler; use super::update_handler::UpdateHandler;
@ -29,7 +27,7 @@ impl IndexMeta {
&self, &self,
path: impl AsRef<Path>, path: impl AsRef<Path>,
thread_pool: Arc<ThreadPool>, thread_pool: Arc<ThreadPool>,
opt: &IndexerOpts, indexer_options: &IndexerOpts,
) -> anyhow::Result<(Arc<Index>, Arc<UpdateStore>)> { ) -> anyhow::Result<(Arc<Index>, Arc<UpdateStore>)> {
let update_path = make_update_db_path(&path, &self.uuid); let update_path = make_update_db_path(&path, &self.uuid);
let index_path = make_index_db_path(&path, &self.uuid); let index_path = make_index_db_path(&path, &self.uuid);
@ -43,24 +41,25 @@ impl IndexMeta {
let mut options = EnvOpenOptions::new(); let mut options = EnvOpenOptions::new();
options.map_size(self.update_size as usize); options.map_size(self.update_size as usize);
let handler = UpdateHandler::new(opt, index.clone(), thread_pool)?; let handler = UpdateHandler::new(indexer_options, index.clone(), thread_pool)?;
let update_store = UpdateStore::open(options, update_path, handler)?; let update_store = UpdateStore::open(options, update_path, handler)?;
Ok((index, update_store)) Ok((index, update_store))
} }
} }
pub struct IndexStore { pub struct IndexStore {
env: Env, env: Env,
name_to_uid_db: Database<Str, ByteSlice>, name_to_uuid_db: Database<Str, ByteSlice>,
uid_to_index: DashMap<Uuid, (Arc<Index>, Arc<UpdateStore>)>, uuid_to_index: DashMap<Uuid, (Arc<Index>, Arc<UpdateStore>)>,
uid_to_index_db: Database<ByteSlice, SerdeJson<IndexMeta>>, uuid_to_index_db: Database<ByteSlice, SerdeJson<IndexMeta>>,
thread_pool: Arc<ThreadPool>, thread_pool: Arc<ThreadPool>,
opt: IndexerOpts, indexer_options: IndexerOpts,
} }
impl IndexStore { impl IndexStore {
pub fn new(path: impl AsRef<Path>, opt: IndexerOpts) -> anyhow::Result<Self> { pub fn new(path: impl AsRef<Path>, indexer_options: IndexerOpts) -> anyhow::Result<Self> {
let env = EnvOpenOptions::new() let env = EnvOpenOptions::new()
.map_size(4096 * 100) .map_size(4096 * 100)
.max_dbs(2) .max_dbs(2)
@ -71,23 +70,23 @@ impl IndexStore {
let uid_to_index_db = open_or_create_database(&env, Some("uid_to_index_db"))?; let uid_to_index_db = open_or_create_database(&env, Some("uid_to_index_db"))?;
let thread_pool = rayon::ThreadPoolBuilder::new() let thread_pool = rayon::ThreadPoolBuilder::new()
.num_threads(opt.indexing_jobs.unwrap_or(0)) .num_threads(indexer_options.indexing_jobs.unwrap_or(0))
.build()?; .build()?;
let thread_pool = Arc::new(thread_pool); let thread_pool = Arc::new(thread_pool);
Ok(Self { Ok(Self {
env, env,
name_to_uid_db, name_to_uuid_db: name_to_uid_db,
uid_to_index, uuid_to_index: uid_to_index,
uid_to_index_db, uuid_to_index_db: uid_to_index_db,
thread_pool, thread_pool,
opt, indexer_options,
}) })
} }
fn index_uid(&self, txn: &RoTxn, name: impl AsRef<str>) -> anyhow::Result<Option<Uuid>> { fn index_uuid(&self, txn: &RoTxn, name: impl AsRef<str>) -> anyhow::Result<Option<Uuid>> {
match self.name_to_uid_db.get(txn, name.as_ref())? { match self.name_to_uuid_db.get(txn, name.as_ref())? {
Some(bytes) => { Some(bytes) => {
let uuid = Uuid::from_slice(bytes)?; let uuid = Uuid::from_slice(bytes)?;
Ok(Some(uuid)) Ok(Some(uuid))
@ -97,12 +96,12 @@ impl IndexStore {
} }
fn retrieve_index(&self, txn: &RoTxn, uid: Uuid) -> anyhow::Result<Option<(Arc<Index>, Arc<UpdateStore>)>> { fn retrieve_index(&self, txn: &RoTxn, uid: Uuid) -> anyhow::Result<Option<(Arc<Index>, Arc<UpdateStore>)>> {
match self.uid_to_index.entry(uid.clone()) { match self.uuid_to_index.entry(uid.clone()) {
Entry::Vacant(entry) => { Entry::Vacant(entry) => {
match self.uid_to_index_db.get(txn, uid.as_bytes())? { match self.uuid_to_index_db.get(txn, uid.as_bytes())? {
Some(meta) => { Some(meta) => {
let path = self.env.path(); let path = self.env.path();
let (index, updates) = meta.open(path, self.thread_pool.clone(), &self.opt)?; let (index, updates) = meta.open(path, self.thread_pool.clone(), &self.indexer_options)?;
entry.insert((index.clone(), updates.clone())); entry.insert((index.clone(), updates.clone()));
Ok(Some((index, updates))) Ok(Some((index, updates)))
}, },
@ -117,7 +116,7 @@ impl IndexStore {
} }
fn _get_index(&self, txn: &RoTxn, name: impl AsRef<str>) -> anyhow::Result<Option<(Arc<Index>, Arc<UpdateStore>)>> { fn _get_index(&self, txn: &RoTxn, name: impl AsRef<str>) -> anyhow::Result<Option<(Arc<Index>, Arc<UpdateStore>)>> {
match self.index_uid(&txn, name)? { match self.index_uuid(&txn, name)? {
Some(uid) => self.retrieve_index(&txn, uid), Some(uid) => self.retrieve_index(&txn, uid),
None => Ok(None), None => Ok(None),
} }
@ -129,59 +128,61 @@ impl IndexStore {
} }
pub fn get_or_create_index( pub fn get_or_create_index(
&self, name: impl AsRef<str>, &self,
update_size: u64,
index_size: u64,
) -> anyhow::Result<(Arc<Index>, Arc<UpdateStore>)> {
let mut txn = self.env.write_txn()?;
match self._get_index(&txn, name.as_ref())? {
Some(res) => Ok(res),
None => {
let uid = Uuid::new_v4();
// TODO: clean in case of error
let result = self.create_index(&mut txn, uid, name, update_size, index_size);
match result {
Ok((index, update_store)) => {
match txn.commit() {
Ok(_) => Ok((index, update_store)),
Err(e) => {
self.clean_uid(&uid);
Err(anyhow::anyhow!("error creating index: {}", e))
}
}
}
Err(e) => {
self.clean_uid(&uid);
Err(e)
}
}
},
}
}
/// removes all data acociated with an index Uuid. This is called when index creation failed
/// and outstanding files and data need to be cleaned.
fn clean_uid(&self, _uid: &Uuid) {
// TODO!
warn!("creating cleanup is not yet implemented");
}
fn create_index( &self,
txn: &mut RwTxn,
uid: Uuid,
name: impl AsRef<str>, name: impl AsRef<str>,
update_size: u64, update_size: u64,
index_size: u64, index_size: u64,
) -> anyhow::Result<(Arc<Index>, Arc<UpdateStore>)> { ) -> anyhow::Result<(Arc<Index>, Arc<UpdateStore>)> {
let meta = IndexMeta { update_size, index_size, uuid: uid.clone() }; let mut txn = self.env.write_txn()?;
match self._get_index(&txn, name.as_ref())? {
Some(res) => Ok(res),
None => {
let uuid = Uuid::new_v4();
let result = self.create_index(&mut txn, uuid, name, update_size, index_size)?;
// If we fail to commit the transaction, we must delete the database from the
// file-system.
if let Err(e) = txn.commit() {
self.clean_db(uuid);
return Err(e)?;
}
Ok(result)
},
}
}
self.name_to_uid_db.put(txn, name.as_ref(), uid.as_bytes())?; // Remove all the files and data associated with a db uuid.
self.uid_to_index_db.put(txn, uid.as_bytes(), &meta)?; fn clean_db(&self, uuid: Uuid) {
let update_db_path = make_update_db_path(self.env.path(), &uuid);
let index_db_path = make_index_db_path(self.env.path(), &uuid);
remove_dir_all(update_db_path).expect("Failed to clean database");
remove_dir_all(index_db_path).expect("Failed to clean database");
self.uuid_to_index.remove(&uuid);
}
fn create_index( &self,
txn: &mut RwTxn,
uuid: Uuid,
name: impl AsRef<str>,
update_size: u64,
index_size: u64,
) -> anyhow::Result<(Arc<Index>, Arc<UpdateStore>)> {
let meta = IndexMeta { update_size, index_size, uuid: uuid.clone() };
self.name_to_uuid_db.put(txn, name.as_ref(), uuid.as_bytes())?;
self.uuid_to_index_db.put(txn, uuid.as_bytes(), &meta)?;
let path = self.env.path(); let path = self.env.path();
let (index, update_store) = meta.open(path, self.thread_pool.clone(), &self.opt)?; let (index, update_store) = match meta.open(path, self.thread_pool.clone(), &self.indexer_options) {
Ok(res) => res,
Err(e) => {
self.clean_db(uuid);
return Err(e)
}
};
self.uid_to_index.insert(uid, (index.clone(), update_store.clone())); self.uuid_to_index.insert(uuid, (index.clone(), update_store.clone()));
Ok((index, update_store)) Ok((index, update_store))
} }
@ -194,15 +195,15 @@ fn open_or_create_database<K: 'static, V: 'static>(env: &Env, name: Option<&str>
} }
} }
fn make_update_db_path(path: impl AsRef<Path>, uid: &Uuid) -> PathBuf { fn make_update_db_path(path: impl AsRef<Path>, uuid: &Uuid) -> PathBuf {
let mut path = path.as_ref().to_path_buf(); let mut path = path.as_ref().to_path_buf();
path.push(format!("update{}", uid)); path.push(format!("update{}", uuid));
path path
} }
fn make_index_db_path(path: impl AsRef<Path>, uid: &Uuid) -> PathBuf { fn make_index_db_path(path: impl AsRef<Path>, uuid: &Uuid) -> PathBuf {
let mut path = path.as_ref().to_path_buf(); let mut path = path.as_ref().to_path_buf();
path.push(format!("index{}", uid)); path.push(format!("index{}", uuid));
path path
} }
@ -240,18 +241,18 @@ mod test {
let name = "foobar"; let name = "foobar";
let txn = store.env.read_txn().unwrap(); let txn = store.env.read_txn().unwrap();
// name is not found if the uuid in not present in the db // name is not found if the uuid in not present in the db
assert!(store.index_uid(&txn, &name).unwrap().is_none()); assert!(store.index_uuid(&txn, &name).unwrap().is_none());
drop(txn); drop(txn);
// insert an uuid in the the name_to_uuid_db: // insert an uuid in the the name_to_uuid_db:
let uuid = Uuid::new_v4(); let uuid = Uuid::new_v4();
let mut txn = store.env.write_txn().unwrap(); let mut txn = store.env.write_txn().unwrap();
store.name_to_uid_db.put(&mut txn, &name, uuid.as_bytes()).unwrap(); store.name_to_uuid_db.put(&mut txn, &name, uuid.as_bytes()).unwrap();
txn.commit().unwrap(); txn.commit().unwrap();
// check that the uuid is there // check that the uuid is there
let txn = store.env.read_txn().unwrap(); let txn = store.env.read_txn().unwrap();
assert_eq!(store.index_uid(&txn, &name).unwrap(), Some(uuid)); assert_eq!(store.index_uuid(&txn, &name).unwrap(), Some(uuid));
} }
#[test] #[test]
@ -265,15 +266,15 @@ mod test {
let meta = IndexMeta { update_size: 4096 * 100, index_size: 4096 * 100, uuid: uuid.clone() }; let meta = IndexMeta { update_size: 4096 * 100, index_size: 4096 * 100, uuid: uuid.clone() };
let mut txn = store.env.write_txn().unwrap(); let mut txn = store.env.write_txn().unwrap();
store.uid_to_index_db.put(&mut txn, uuid.as_bytes(), &meta).unwrap(); store.uuid_to_index_db.put(&mut txn, uuid.as_bytes(), &meta).unwrap();
txn.commit().unwrap(); txn.commit().unwrap();
// the index cache should be empty // the index cache should be empty
assert!(store.uid_to_index.is_empty()); assert!(store.uuid_to_index.is_empty());
let txn = store.env.read_txn().unwrap(); let txn = store.env.read_txn().unwrap();
assert!(store.retrieve_index(&txn, uuid).unwrap().is_some()); assert!(store.retrieve_index(&txn, uuid).unwrap().is_some());
assert_eq!(store.uid_to_index.len(), 1); assert_eq!(store.uuid_to_index.len(), 1);
} }
#[test] #[test]
@ -287,8 +288,8 @@ mod test {
let uuid = Uuid::new_v4(); let uuid = Uuid::new_v4();
let meta = IndexMeta { update_size: 4096 * 100, index_size: 4096 * 100, uuid: uuid.clone() }; let meta = IndexMeta { update_size: 4096 * 100, index_size: 4096 * 100, uuid: uuid.clone() };
let mut txn = store.env.write_txn().unwrap(); let mut txn = store.env.write_txn().unwrap();
store.name_to_uid_db.put(&mut txn, &name, uuid.as_bytes()).unwrap(); store.name_to_uuid_db.put(&mut txn, &name, uuid.as_bytes()).unwrap();
store.uid_to_index_db.put(&mut txn, uuid.as_bytes(), &meta).unwrap(); store.uuid_to_index_db.put(&mut txn, uuid.as_bytes(), &meta).unwrap();
txn.commit().unwrap(); txn.commit().unwrap();
assert!(store.index(&name).unwrap().is_some()); assert!(store.index(&name).unwrap().is_some());
@ -302,12 +303,12 @@ mod test {
store.get_or_create_index(&name, 4096 * 100, 4096 * 100).unwrap(); store.get_or_create_index(&name, 4096 * 100, 4096 * 100).unwrap();
let txn = store.env.read_txn().unwrap(); let txn = store.env.read_txn().unwrap();
let uuid = store.name_to_uid_db.get(&txn, &name).unwrap(); let uuid = store.name_to_uuid_db.get(&txn, &name).unwrap();
assert_eq!(store.uid_to_index.len(), 1); assert_eq!(store.uuid_to_index.len(), 1);
assert!(uuid.is_some()); assert!(uuid.is_some());
let uuid = Uuid::from_slice(uuid.unwrap()).unwrap(); let uuid = Uuid::from_slice(uuid.unwrap()).unwrap();
let meta = IndexMeta { update_size: 4096 * 100, index_size: 4096 * 100, uuid: uuid.clone() }; let meta = IndexMeta { update_size: 4096 * 100, index_size: 4096 * 100, uuid: uuid.clone() };
assert_eq!(store.uid_to_index_db.get(&txn, uuid.as_bytes()).unwrap(), Some(meta)); assert_eq!(store.uuid_to_index_db.get(&txn, uuid.as_bytes()).unwrap(), Some(meta));
} }
#[test] #[test]
@ -321,12 +322,12 @@ mod test {
let uuid = Uuid::new_v4(); let uuid = Uuid::new_v4();
let mut txn = store.env.write_txn().unwrap(); let mut txn = store.env.write_txn().unwrap();
store.create_index(&mut txn, uuid, name, update_size, index_size).unwrap(); store.create_index(&mut txn, uuid, name, update_size, index_size).unwrap();
let uuid = store.name_to_uid_db.get(&txn, &name).unwrap(); let uuid = store.name_to_uuid_db.get(&txn, &name).unwrap();
assert_eq!(store.uid_to_index.len(), 1); assert_eq!(store.uuid_to_index.len(), 1);
assert!(uuid.is_some()); assert!(uuid.is_some());
let uuid = Uuid::from_slice(uuid.unwrap()).unwrap(); let uuid = Uuid::from_slice(uuid.unwrap()).unwrap();
let meta = IndexMeta { update_size , index_size, uuid: uuid.clone() }; let meta = IndexMeta { update_size , index_size, uuid: uuid.clone() };
assert_eq!(store.uid_to_index_db.get(&txn, uuid.as_bytes()).unwrap(), Some(meta)); assert_eq!(store.uuid_to_index_db.get(&txn, uuid.as_bytes()).unwrap(), Some(meta));
} }
} }
} }

View File

@ -2,16 +2,15 @@ mod update_store;
mod index_store; mod index_store;
mod update_handler; mod update_handler;
use index_store::IndexStore;
use std::path::Path; use std::path::Path;
use std::sync::Arc; use std::sync::Arc;
use milli::Index;
use anyhow::bail; use anyhow::bail;
use itertools::Itertools; use itertools::Itertools;
use milli::Index;
use crate::option::IndexerOpts; use crate::option::IndexerOpts;
use index_store::IndexStore;
use super::IndexController; use super::IndexController;
use super::updates::UpdateStatus; use super::updates::UpdateStatus;
use super::{UpdateMeta, UpdateResult}; use super::{UpdateMeta, UpdateResult};
@ -84,7 +83,7 @@ impl IndexController for LocalIndexController {
} }
fn all_update_status(&self, index: impl AsRef<str>) -> anyhow::Result<Vec<UpdateStatus<UpdateMeta, UpdateResult, String>>> { fn all_update_status(&self, index: impl AsRef<str>) -> anyhow::Result<Vec<UpdateStatus<UpdateMeta, UpdateResult, String>>> {
match self.indexes.index(index)? { match self.indexes.index(&index)? {
Some((_, update_store)) => { Some((_, update_store)) => {
let updates = update_store.iter_metas(|processing, processed, pending, aborted, failed| { let updates = update_store.iter_metas(|processing, processed, pending, aborted, failed| {
Ok(processing Ok(processing
@ -99,7 +98,7 @@ impl IndexController for LocalIndexController {
})?; })?;
Ok(updates) Ok(updates)
} }
None => Ok(Vec::new()) None => bail!("index {} doesn't exist.", index.as_ref()),
} }
} }

View File

@ -192,16 +192,6 @@ where
} }
} }
/// The id and metadata of the update that is currently being processed,
/// `None` if no update is being processed.
pub fn processing_update(&self) -> heed::Result<Option<Pending<M>>> {
let rtxn = self.env.read_txn()?;
match self.pending_meta.first(&rtxn)? {
Some((_, meta)) => Ok(Some(meta)),
None => Ok(None),
}
}
/// Execute the user defined function with the meta-store iterators, the first /// Execute the user defined function with the meta-store iterators, the first
/// iterator is the *processed* meta one, the second the *aborted* meta one /// iterator is the *processed* meta one, the second the *aborted* meta one
/// and, the last is the *pending* meta one. /// and, the last is the *pending* meta one.

View File

@ -1,5 +1,5 @@
mod local_index_controller; mod local_index_controller;
pub mod updates; mod updates;
pub use local_index_controller::LocalIndexController; pub use local_index_controller::LocalIndexController;
@ -12,7 +12,9 @@ use milli::Index;
use milli::update::{IndexDocumentsMethod, UpdateFormat, DocumentAdditionResult}; use milli::update::{IndexDocumentsMethod, UpdateFormat, DocumentAdditionResult};
use serde::{Serialize, Deserialize, de::Deserializer}; use serde::{Serialize, Deserialize, de::Deserializer};
use updates::{Processed, Processing, Failed, UpdateStatus}; pub use updates::{Processed, Processing, Failed};
pub type UpdateStatus = updates::UpdateStatus<UpdateMeta, UpdateResult, String>;
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
@ -85,9 +87,10 @@ pub enum UpdateResult {
} }
/// The `IndexController` is in charge of the access to the underlying indices. It splits the logic /// The `IndexController` is in charge of the access to the underlying indices. It splits the logic
/// for read access which is provided, and write access which must be provided. This allows the /// for read access which is provided thanks to an handle to the index, and write access which must
/// implementer to define the behaviour of write accesses to the indices, and abstract the /// be provided. This allows the implementer to define the behaviour of write accesses to the
/// scheduling of the updates. The implementer must be able to provide an instance of `IndexStore` /// indices, and abstract the scheduling of the updates. The implementer must be able to provide an
/// instance of `IndexStore`
pub trait IndexController { pub trait IndexController {
/* /*
@ -106,11 +109,11 @@ pub trait IndexController {
method: IndexDocumentsMethod, method: IndexDocumentsMethod,
format: UpdateFormat, format: UpdateFormat,
data: &[u8], data: &[u8],
) -> anyhow::Result<UpdateStatus<UpdateMeta, UpdateResult, String>>; ) -> anyhow::Result<UpdateStatus>;
/// Updates an index settings. If the index does not exist, it will be created when the update /// Updates an index settings. If the index does not exist, it will be created when the update
/// is applied to the index. /// is applied to the index.
fn update_settings<S: AsRef<str>>(&self, index_uid: S, settings: Settings) -> anyhow::Result<UpdateStatus<UpdateMeta, UpdateResult, String>>; fn update_settings<S: AsRef<str>>(&self, index_uid: S, settings: Settings) -> anyhow::Result<UpdateStatus>;
/// Create an index with the given `index_uid`. /// Create an index with the given `index_uid`.
fn create_index<S: AsRef<str>>(&self, index_uid: S) -> Result<()>; fn create_index<S: AsRef<str>>(&self, index_uid: S) -> Result<()>;
@ -133,9 +136,9 @@ pub trait IndexController {
todo!() todo!()
} }
/// Returns, if it exists, an `IndexView` to the requested index. /// Returns, if it exists, the `Index` with the povided name.
fn index(&self, uid: impl AsRef<str>) -> anyhow::Result<Option<Arc<Index>>>; fn index(&self, name: impl AsRef<str>) -> anyhow::Result<Option<Arc<Index>>>;
fn update_status(&self, index: impl AsRef<str>, id: u64) -> anyhow::Result<Option<UpdateStatus<UpdateMeta, UpdateResult, String>>>; fn update_status(&self, index: impl AsRef<str>, id: u64) -> anyhow::Result<Option<UpdateStatus>>;
fn all_update_status(&self, index: impl AsRef<str>) -> anyhow::Result<Vec<UpdateStatus<UpdateMeta, UpdateResult, String>>>; fn all_update_status(&self, index: impl AsRef<str>) -> anyhow::Result<Vec<UpdateStatus>>;
} }

View File

@ -5,7 +5,6 @@ pub mod error;
pub mod helpers; pub mod helpers;
pub mod option; pub mod option;
pub mod routes; pub mod routes;
//mod updates;
mod index_controller; mod index_controller;
use actix_http::Error; use actix_http::Error;