1763: Index tests r=MarinPostma a=MarinPostma

This pr aims to test more thorougly the usage on index in the meilisearch database, by writing unit tests.

work included:
- [x] Create index mock and stub methods
- [x] Test snapshot creation
- [x] Test Dumps
- [x] Test search

Co-authored-by: mpostma <postma.marin@protonmail.com>
This commit is contained in:
bors[bot] 2021-10-06 14:39:53 +00:00 committed by GitHub
commit ddbcf449da
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 1392 additions and 558 deletions

96
Cargo.lock generated
View File

@ -825,6 +825,12 @@ version = "1.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f2c9736e15e7df1638a7f6eee92a6511615c738246a052af5ba86f039b65aede" checksum = "f2c9736e15e7df1638a7f6eee92a6511615c738246a052af5ba86f039b65aede"
[[package]]
name = "difference"
version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "524cbf6897b527295dff137cec09ecf3a05f4fddffd7dfcd1585403449e74198"
[[package]] [[package]]
name = "digest" name = "digest"
version = "0.8.1" version = "0.8.1"
@ -849,6 +855,12 @@ version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "212d0f5754cb6769937f4501cc0e67f4f4483c8d2c3e1e922ee9edbe4ab4c7c0" checksum = "212d0f5754cb6769937f4501cc0e67f4f4483c8d2c3e1e922ee9edbe4ab4c7c0"
[[package]]
name = "downcast"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4bb454f0228b18c7f4c3b0ebbee346ed9c52e7443b0999cd543ff3571205701d"
[[package]] [[package]]
name = "either" name = "either"
version = "1.6.1" version = "1.6.1"
@ -933,6 +945,15 @@ dependencies = [
"miniz_oxide", "miniz_oxide",
] ]
[[package]]
name = "float-cmp"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e1267f4ac4f343772758f7b1bdcbe767c218bbab93bb432acbf5162bbf85a6c4"
dependencies = [
"num-traits",
]
[[package]] [[package]]
name = "fnv" name = "fnv"
version = "1.0.7" version = "1.0.7"
@ -949,6 +970,12 @@ dependencies = [
"percent-encoding", "percent-encoding",
] ]
[[package]]
name = "fragile"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "69a039c3498dc930fe810151a34ba0c1c70b02b8625035592e74432f678591f2"
[[package]] [[package]]
name = "fs_extra" name = "fs_extra"
version = "1.2.0" version = "1.2.0"
@ -1688,6 +1715,7 @@ dependencies = [
"memmap", "memmap",
"milli", "milli",
"mime", "mime",
"mockall",
"num_cpus", "num_cpus",
"obkv", "obkv",
"once_cell", "once_cell",
@ -1847,6 +1875,39 @@ dependencies = [
"winapi", "winapi",
] ]
[[package]]
name = "mockall"
version = "0.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6ab571328afa78ae322493cacca3efac6a0f2e0a67305b4df31fd439ef129ac0"
dependencies = [
"cfg-if 1.0.0",
"downcast",
"fragile",
"lazy_static",
"mockall_derive",
"predicates",
"predicates-tree",
]
[[package]]
name = "mockall_derive"
version = "0.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e7e25b214433f669161f414959594216d8e6ba83b6679d3db96899c0b4639033"
dependencies = [
"cfg-if 1.0.0",
"proc-macro2 1.0.29",
"quote 1.0.9",
"syn 1.0.77",
]
[[package]]
name = "normalize-line-endings"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "61807f77802ff30975e01f4f071c8ba10c022052f98b3294119f3e615d13e5be"
[[package]] [[package]]
name = "ntapi" name = "ntapi"
version = "0.3.6" version = "0.3.6"
@ -2119,6 +2180,35 @@ version = "0.2.10"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac74c624d6b2d21f425f752262f42188365d7b8ff1aff74c82e45136510a4857" checksum = "ac74c624d6b2d21f425f752262f42188365d7b8ff1aff74c82e45136510a4857"
[[package]]
name = "predicates"
version = "1.0.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f49cfaf7fdaa3bfacc6fa3e7054e65148878354a5cfddcf661df4c851f8021df"
dependencies = [
"difference",
"float-cmp",
"normalize-line-endings",
"predicates-core",
"regex",
]
[[package]]
name = "predicates-core"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "57e35a3326b75e49aa85f5dc6ec15b41108cf5aee58eabb1f274dd18b73c2451"
[[package]]
name = "predicates-tree"
version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d7dd0fd014130206c9352efbdc92be592751b2b9274dff685348341082c6ea3d"
dependencies = [
"predicates-core",
"treeline",
]
[[package]] [[package]]
name = "proc-macro-error" name = "proc-macro-error"
version = "1.0.4" version = "1.0.4"
@ -3044,6 +3134,12 @@ dependencies = [
"lazy_static", "lazy_static",
] ]
[[package]]
name = "treeline"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a7f741b240f1a48843f9b8e0444fb55fb2a4ff67293b50a9179dfd5ea67f8d41"
[[package]] [[package]]
name = "try-lock" name = "try-lock"
version = "0.2.3" version = "0.2.3"

View File

@ -60,4 +60,5 @@ derivative = "2.2.0"
[dev-dependencies] [dev-dependencies]
actix-rt = "2.2.0" actix-rt = "2.2.0"
mockall = "0.10.2"
paste = "1.0.5" paste = "1.0.5"

View File

@ -13,7 +13,7 @@ use crate::index::update_handler::UpdateHandler;
use crate::index::updates::apply_settings_to_builder; use crate::index::updates::apply_settings_to_builder;
use super::error::Result; use super::error::Result;
use super::{Index, Settings, Unchecked}; use super::{index::Index, Settings, Unchecked};
#[derive(Serialize, Deserialize)] #[derive(Serialize, Deserialize)]
struct DumpMeta { struct DumpMeta {

View File

@ -0,0 +1,286 @@
use std::collections::{BTreeSet, HashSet};
use std::fs::create_dir_all;
use std::marker::PhantomData;
use std::ops::Deref;
use std::path::Path;
use std::sync::Arc;
use chrono::{DateTime, Utc};
use heed::{EnvOpenOptions, RoTxn};
use milli::update::Setting;
use milli::{obkv_to_json, FieldDistribution, FieldId};
use serde::{Deserialize, Serialize};
use serde_json::{Map, Value};
use uuid::Uuid;
use crate::index_controller::update_file_store::UpdateFileStore;
use crate::EnvSizer;
use super::error::IndexError;
use super::error::Result;
use super::update_handler::UpdateHandler;
use super::{Checked, Settings};
pub type Document = Map<String, Value>;
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct IndexMeta {
created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
pub primary_key: Option<String>,
}
impl IndexMeta {
pub fn new(index: &Index) -> Result<Self> {
let txn = index.read_txn()?;
Self::new_txn(index, &txn)
}
pub fn new_txn(index: &Index, txn: &heed::RoTxn) -> Result<Self> {
let created_at = index.created_at(txn)?;
let updated_at = index.updated_at(txn)?;
let primary_key = index.primary_key(txn)?.map(String::from);
Ok(Self {
created_at,
updated_at,
primary_key,
})
}
}
#[derive(Serialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct IndexStats {
#[serde(skip)]
pub size: u64,
pub number_of_documents: u64,
/// Whether the current index is performing an update. It is initially `None` when the
/// index returns it, since it is the `UpdateStore` that knows what index is currently indexing. It is
/// later set to either true or false, we we retrieve the information from the `UpdateStore`
pub is_indexing: Option<bool>,
pub field_distribution: FieldDistribution,
}
#[derive(Clone, derivative::Derivative)]
#[derivative(Debug)]
pub struct Index {
pub uuid: Uuid,
#[derivative(Debug = "ignore")]
pub inner: Arc<milli::Index>,
#[derivative(Debug = "ignore")]
pub update_file_store: Arc<UpdateFileStore>,
#[derivative(Debug = "ignore")]
pub update_handler: Arc<UpdateHandler>,
}
impl Deref for Index {
type Target = milli::Index;
fn deref(&self) -> &Self::Target {
self.inner.as_ref()
}
}
impl Index {
pub fn open(
path: impl AsRef<Path>,
size: usize,
update_file_store: Arc<UpdateFileStore>,
uuid: Uuid,
update_handler: Arc<UpdateHandler>,
) -> Result<Self> {
create_dir_all(&path)?;
let mut options = EnvOpenOptions::new();
options.map_size(size);
let inner = Arc::new(milli::Index::new(options, &path)?);
Ok(Index {
inner,
update_file_store,
uuid,
update_handler,
})
}
pub fn inner(&self) -> &milli::Index {
&self.inner
}
pub fn stats(&self) -> Result<IndexStats> {
let rtxn = self.read_txn()?;
Ok(IndexStats {
size: self.size(),
number_of_documents: self.number_of_documents(&rtxn)?,
is_indexing: None,
field_distribution: self.field_distribution(&rtxn)?,
})
}
pub fn meta(&self) -> Result<IndexMeta> {
IndexMeta::new(self)
}
pub fn settings(&self) -> Result<Settings<Checked>> {
let txn = self.read_txn()?;
self.settings_txn(&txn)
}
pub fn uuid(&self) -> Uuid {
self.uuid
}
pub fn settings_txn(&self, txn: &RoTxn) -> Result<Settings<Checked>> {
let displayed_attributes = self
.displayed_fields(txn)?
.map(|fields| fields.into_iter().map(String::from).collect());
let searchable_attributes = self
.searchable_fields(txn)?
.map(|fields| fields.into_iter().map(String::from).collect());
let filterable_attributes = self.filterable_fields(txn)?.into_iter().collect();
let sortable_attributes = self.sortable_fields(txn)?.into_iter().collect();
let criteria = self
.criteria(txn)?
.into_iter()
.map(|c| c.to_string())
.collect();
let stop_words = self
.stop_words(txn)?
.map(|stop_words| -> Result<BTreeSet<_>> {
Ok(stop_words.stream().into_strs()?.into_iter().collect())
})
.transpose()?
.unwrap_or_else(BTreeSet::new);
let distinct_field = self.distinct_field(txn)?.map(String::from);
// in milli each word in the synonyms map were split on their separator. Since we lost
// this information we are going to put space between words.
let synonyms = self
.synonyms(txn)?
.iter()
.map(|(key, values)| {
(
key.join(" "),
values.iter().map(|value| value.join(" ")).collect(),
)
})
.collect();
Ok(Settings {
displayed_attributes: match displayed_attributes {
Some(attrs) => Setting::Set(attrs),
None => Setting::Reset,
},
searchable_attributes: match searchable_attributes {
Some(attrs) => Setting::Set(attrs),
None => Setting::Reset,
},
filterable_attributes: Setting::Set(filterable_attributes),
sortable_attributes: Setting::Set(sortable_attributes),
ranking_rules: Setting::Set(criteria),
stop_words: Setting::Set(stop_words),
distinct_attribute: match distinct_field {
Some(field) => Setting::Set(field),
None => Setting::Reset,
},
synonyms: Setting::Set(synonyms),
_kind: PhantomData,
})
}
pub fn retrieve_documents<S: AsRef<str>>(
&self,
offset: usize,
limit: usize,
attributes_to_retrieve: Option<Vec<S>>,
) -> Result<Vec<Map<String, Value>>> {
let txn = self.read_txn()?;
let fields_ids_map = self.fields_ids_map(&txn)?;
let fields_to_display =
self.fields_to_display(&txn, &attributes_to_retrieve, &fields_ids_map)?;
let iter = self.documents.range(&txn, &(..))?.skip(offset).take(limit);
let mut documents = Vec::new();
for entry in iter {
let (_id, obkv) = entry?;
let object = obkv_to_json(&fields_to_display, &fields_ids_map, obkv)?;
documents.push(object);
}
Ok(documents)
}
pub fn retrieve_document<S: AsRef<str>>(
&self,
doc_id: String,
attributes_to_retrieve: Option<Vec<S>>,
) -> Result<Map<String, Value>> {
let txn = self.read_txn()?;
let fields_ids_map = self.fields_ids_map(&txn)?;
let fields_to_display =
self.fields_to_display(&txn, &attributes_to_retrieve, &fields_ids_map)?;
let internal_id = self
.external_documents_ids(&txn)?
.get(doc_id.as_bytes())
.ok_or_else(|| IndexError::DocumentNotFound(doc_id.clone()))?;
let document = self
.documents(&txn, std::iter::once(internal_id))?
.into_iter()
.next()
.map(|(_, d)| d)
.ok_or(IndexError::DocumentNotFound(doc_id))?;
let document = obkv_to_json(&fields_to_display, &fields_ids_map, document)?;
Ok(document)
}
pub fn size(&self) -> u64 {
self.env.size()
}
fn fields_to_display<S: AsRef<str>>(
&self,
txn: &heed::RoTxn,
attributes_to_retrieve: &Option<Vec<S>>,
fields_ids_map: &milli::FieldsIdsMap,
) -> Result<Vec<FieldId>> {
let mut displayed_fields_ids = match self.displayed_fields_ids(txn)? {
Some(ids) => ids.into_iter().collect::<Vec<_>>(),
None => fields_ids_map.iter().map(|(id, _)| id).collect(),
};
let attributes_to_retrieve_ids = match attributes_to_retrieve {
Some(attrs) => attrs
.iter()
.filter_map(|f| fields_ids_map.id(f.as_ref()))
.collect::<HashSet<_>>(),
None => fields_ids_map.iter().map(|(id, _)| id).collect(),
};
displayed_fields_ids.retain(|fid| attributes_to_retrieve_ids.contains(fid));
Ok(displayed_fields_ids)
}
pub fn snapshot(&self, path: impl AsRef<Path>) -> Result<()> {
let mut dst = path.as_ref().join(format!("indexes/{}/", self.uuid));
create_dir_all(&dst)?;
dst.push("data.mdb");
let _txn = self.write_txn()?;
self.inner
.env
.copy_to_path(dst, heed::CompactionOption::Enabled)?;
Ok(())
}
}

View File

@ -1,97 +1,203 @@
use std::collections::{BTreeSet, HashSet};
use std::fs::create_dir_all;
use std::marker::PhantomData;
use std::ops::Deref;
use std::path::Path;
use std::sync::Arc;
use chrono::{DateTime, Utc};
use heed::{EnvOpenOptions, RoTxn};
use milli::update::Setting;
use milli::{obkv_to_json, FieldDistribution, FieldId};
use serde::{Deserialize, Serialize};
use serde_json::{Map, Value};
use error::Result;
pub use search::{default_crop_length, SearchQuery, SearchResult, DEFAULT_SEARCH_LIMIT}; pub use search::{default_crop_length, SearchQuery, SearchResult, DEFAULT_SEARCH_LIMIT};
pub use updates::{apply_settings_to_builder, Checked, Facets, Settings, Unchecked}; pub use updates::{apply_settings_to_builder, Checked, Facets, Settings, Unchecked};
use uuid::Uuid;
use crate::index_controller::update_file_store::UpdateFileStore;
use crate::EnvSizer;
use self::error::IndexError;
use self::update_handler::UpdateHandler;
pub mod error;
pub mod update_handler;
mod dump; mod dump;
pub mod error;
mod search; mod search;
pub mod update_handler;
mod updates; mod updates;
pub type Document = Map<String, Value>; #[allow(clippy::module_inception)]
mod index;
#[derive(Debug, Serialize, Deserialize, Clone)] pub use index::{Document, IndexMeta, IndexStats};
#[serde(rename_all = "camelCase")]
pub struct IndexMeta {
created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
pub primary_key: Option<String>,
}
#[derive(Serialize, Debug)] #[cfg(not(test))]
#[serde(rename_all = "camelCase")] pub use index::Index;
pub struct IndexStats {
#[serde(skip)]
pub size: u64,
pub number_of_documents: u64,
/// Whether the current index is performing an update. It is initially `None` when the
/// index returns it, since it is the `UpdateStore` that knows what index is currently indexing. It is
/// later set to either true or false, we we retrieve the information from the `UpdateStore`
pub is_indexing: Option<bool>,
pub field_distribution: FieldDistribution,
}
impl IndexMeta { #[cfg(test)]
pub fn new(index: &Index) -> Result<Self> { pub use test::MockIndex as Index;
let txn = index.read_txn()?;
Self::new_txn(index, &txn) /// The index::test module provides means of mocking an index instance. I can be used throughout the
/// code for unit testing, in places where an index would normally be used.
#[cfg(test)]
pub mod test {
use std::any::Any;
use std::collections::HashMap;
use std::panic::{RefUnwindSafe, UnwindSafe};
use std::path::Path;
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use serde_json::{Map, Value};
use uuid::Uuid;
use crate::index_controller::update_file_store::UpdateFileStore;
use crate::index_controller::updates::status::{Failed, Processed, Processing};
use super::error::Result;
use super::index::Index;
use super::update_handler::UpdateHandler;
use super::{Checked, IndexMeta, IndexStats, SearchQuery, SearchResult, Settings};
pub struct Stub<A, R> {
name: String,
times: Mutex<Option<usize>>,
stub: Box<dyn Fn(A) -> R + Sync + Send>,
invalidated: AtomicBool,
} }
fn new_txn(index: &Index, txn: &heed::RoTxn) -> Result<Self> { impl<A, R> Drop for Stub<A, R> {
let created_at = index.created_at(txn)?; fn drop(&mut self) {
let updated_at = index.updated_at(txn)?; if !self.invalidated.load(Ordering::Relaxed) {
let primary_key = index.primary_key(txn)?.map(String::from); let lock = self.times.lock().unwrap();
Ok(Self { if let Some(n) = *lock {
created_at, assert_eq!(n, 0, "{} not called enough times", self.name);
updated_at, }
primary_key, }
})
} }
}
#[derive(Clone, derivative::Derivative)]
#[derivative(Debug)]
pub struct Index {
pub uuid: Uuid,
#[derivative(Debug = "ignore")]
pub inner: Arc<milli::Index>,
#[derivative(Debug = "ignore")]
update_file_store: Arc<UpdateFileStore>,
#[derivative(Debug = "ignore")]
update_handler: Arc<UpdateHandler>,
}
impl Deref for Index {
type Target = milli::Index;
fn deref(&self) -> &Self::Target {
self.inner.as_ref()
} }
}
impl Index { impl<A, R> Stub<A, R> {
fn invalidate(&self) {
self.invalidated.store(true, Ordering::Relaxed);
}
}
impl<A: UnwindSafe, R> Stub<A, R> {
fn call(&self, args: A) -> R {
let mut lock = self.times.lock().unwrap();
match *lock {
Some(0) => panic!("{} called to many times", self.name),
Some(ref mut times) => {
*times -= 1;
}
None => (),
}
// Since we add assertions in the drop implementation for Stub, a panic can occur in a
// panic, causing a hard abort of the program. To handle that, we catch the panic, and
// set the stub as invalidated so the assertions aren't run during the drop.
impl<'a, A, R> RefUnwindSafe for StubHolder<'a, A, R> {}
struct StubHolder<'a, A, R>(&'a (dyn Fn(A) -> R + Sync + Send));
let stub = StubHolder(self.stub.as_ref());
match std::panic::catch_unwind(|| (stub.0)(args)) {
Ok(r) => r,
Err(panic) => {
self.invalidate();
std::panic::resume_unwind(panic);
}
}
}
}
#[derive(Debug, Default)]
struct StubStore {
inner: Arc<Mutex<HashMap<String, Box<dyn Any + Sync + Send>>>>,
}
impl StubStore {
pub fn insert<A: 'static, R: 'static>(&self, name: String, stub: Stub<A, R>) {
let mut lock = self.inner.lock().unwrap();
lock.insert(name, Box::new(stub));
}
pub fn get<A, B>(&self, name: &str) -> Option<&Stub<A, B>> {
let mut lock = self.inner.lock().unwrap();
match lock.get_mut(name) {
Some(s) => {
let s = s.as_mut() as *mut dyn Any as *mut Stub<A, B>;
Some(unsafe { &mut *s })
}
None => None,
}
}
}
pub struct StubBuilder<'a, A, R> {
name: String,
store: &'a StubStore,
times: Option<usize>,
_f: std::marker::PhantomData<fn(A) -> R>,
}
impl<'a, A: 'static, R: 'static> StubBuilder<'a, A, R> {
/// Asserts the stub has been called exactly `times` times.
#[must_use]
pub fn times(mut self, times: usize) -> Self {
self.times = Some(times);
self
}
/// Asserts the stub has been called exactly once.
#[must_use]
pub fn once(mut self) -> Self {
self.times = Some(1);
self
}
/// The function that will be called when the stub is called. This needs to be called to
/// actually build the stub and register it to the stub store.
pub fn then(self, f: impl Fn(A) -> R + Sync + Send + 'static) {
let times = Mutex::new(self.times);
let stub = Stub {
stub: Box::new(f),
times,
name: self.name.clone(),
invalidated: AtomicBool::new(false),
};
self.store.insert(self.name, stub);
}
}
/// Mocker allows to stub metod call on any struct. you can register stubs by calling
/// `Mocker::when` and retrieve it in the proxy implementation when with `Mocker::get`.
#[derive(Debug, Default)]
pub struct Mocker {
store: StubStore,
}
impl Mocker {
pub fn when<A, R>(&self, name: &str) -> StubBuilder<A, R> {
StubBuilder {
name: name.to_string(),
store: &self.store,
times: None,
_f: std::marker::PhantomData,
}
}
pub fn get<A, R>(&self, name: &str) -> &Stub<A, R> {
match self.store.get(name) {
Some(stub) => stub,
None => {
// panic here causes the stubs to get dropped, and panic in turn. To prevent
// that, we forget them, and let them be cleaned by the os later. This is not
// optimal, but is still better than nested panicks.
let mut stubs = self.store.inner.lock().unwrap();
let stubs = std::mem::take(&mut *stubs);
std::mem::forget(stubs);
panic!("unexpected call to {}", name)
}
}
}
}
#[derive(Debug, Clone)]
pub enum MockIndex {
Vrai(Index),
Faux(Arc<Mocker>),
}
impl MockIndex {
pub fn faux(faux: Mocker) -> Self {
Self::Faux(Arc::new(faux))
}
pub fn open( pub fn open(
path: impl AsRef<Path>, path: impl AsRef<Path>,
size: usize, size: usize,
@ -99,98 +205,52 @@ impl Index {
uuid: Uuid, uuid: Uuid,
update_handler: Arc<UpdateHandler>, update_handler: Arc<UpdateHandler>,
) -> Result<Self> { ) -> Result<Self> {
create_dir_all(&path)?; let index = Index::open(path, size, update_file_store, uuid, update_handler)?;
let mut options = EnvOpenOptions::new(); Ok(Self::Vrai(index))
options.map_size(size); }
let inner = Arc::new(milli::Index::new(options, &path)?);
Ok(Index { pub fn load_dump(
inner, src: impl AsRef<Path>,
update_file_store, dst: impl AsRef<Path>,
uuid, size: usize,
update_handler, update_handler: &UpdateHandler,
}) ) -> anyhow::Result<()> {
Index::load_dump(src, dst, size, update_handler)?;
Ok(())
}
pub fn handle_update(&self, update: Processing) -> std::result::Result<Processed, Failed> {
match self {
MockIndex::Vrai(index) => index.handle_update(update),
MockIndex::Faux(faux) => faux.get("handle_update").call(update),
}
}
pub fn uuid(&self) -> Uuid {
match self {
MockIndex::Vrai(index) => index.uuid(),
MockIndex::Faux(faux) => faux.get("uuid").call(()),
}
} }
pub fn stats(&self) -> Result<IndexStats> { pub fn stats(&self) -> Result<IndexStats> {
let rtxn = self.read_txn()?; match self {
MockIndex::Vrai(index) => index.stats(),
Ok(IndexStats { MockIndex::Faux(_) => todo!(),
size: self.size(), }
number_of_documents: self.number_of_documents(&rtxn)?,
is_indexing: None,
field_distribution: self.field_distribution(&rtxn)?,
})
} }
pub fn meta(&self) -> Result<IndexMeta> { pub fn meta(&self) -> Result<IndexMeta> {
IndexMeta::new(self) match self {
MockIndex::Vrai(index) => index.meta(),
MockIndex::Faux(_) => todo!(),
}
} }
pub fn settings(&self) -> Result<Settings<Checked>> { pub fn settings(&self) -> Result<Settings<Checked>> {
let txn = self.read_txn()?; match self {
self.settings_txn(&txn) MockIndex::Vrai(index) => index.settings(),
MockIndex::Faux(_) => todo!(),
} }
pub fn settings_txn(&self, txn: &RoTxn) -> Result<Settings<Checked>> {
let displayed_attributes = self
.displayed_fields(txn)?
.map(|fields| fields.into_iter().map(String::from).collect());
let searchable_attributes = self
.searchable_fields(txn)?
.map(|fields| fields.into_iter().map(String::from).collect());
let filterable_attributes = self.filterable_fields(txn)?.into_iter().collect();
let sortable_attributes = self.sortable_fields(txn)?.into_iter().collect();
let criteria = self
.criteria(txn)?
.into_iter()
.map(|c| c.to_string())
.collect();
let stop_words = self
.stop_words(txn)?
.map(|stop_words| -> Result<BTreeSet<_>> {
Ok(stop_words.stream().into_strs()?.into_iter().collect())
})
.transpose()?
.unwrap_or_else(BTreeSet::new);
let distinct_field = self.distinct_field(txn)?.map(String::from);
// in milli each word in the synonyms map were split on their separator. Since we lost
// this information we are going to put space between words.
let synonyms = self
.synonyms(txn)?
.iter()
.map(|(key, values)| {
(
key.join(" "),
values.iter().map(|value| value.join(" ")).collect(),
)
})
.collect();
Ok(Settings {
displayed_attributes: match displayed_attributes {
Some(attrs) => Setting::Set(attrs),
None => Setting::Reset,
},
searchable_attributes: match searchable_attributes {
Some(attrs) => Setting::Set(attrs),
None => Setting::Reset,
},
filterable_attributes: Setting::Set(filterable_attributes),
sortable_attributes: Setting::Set(sortable_attributes),
ranking_rules: Setting::Set(criteria),
stop_words: Setting::Set(stop_words),
distinct_attribute: match distinct_field {
Some(field) => Setting::Set(field),
None => Setting::Reset,
},
synonyms: Setting::Set(synonyms),
_kind: PhantomData,
})
} }
pub fn retrieve_documents<S: AsRef<str>>( pub fn retrieve_documents<S: AsRef<str>>(
@ -199,23 +259,12 @@ impl Index {
limit: usize, limit: usize,
attributes_to_retrieve: Option<Vec<S>>, attributes_to_retrieve: Option<Vec<S>>,
) -> Result<Vec<Map<String, Value>>> { ) -> Result<Vec<Map<String, Value>>> {
let txn = self.read_txn()?; match self {
MockIndex::Vrai(index) => {
let fields_ids_map = self.fields_ids_map(&txn)?; index.retrieve_documents(offset, limit, attributes_to_retrieve)
let fields_to_display = }
self.fields_to_display(&txn, &attributes_to_retrieve, &fields_ids_map)?; MockIndex::Faux(_) => todo!(),
let iter = self.documents.range(&txn, &(..))?.skip(offset).take(limit);
let mut documents = Vec::new();
for entry in iter {
let (_id, obkv) = entry?;
let object = obkv_to_json(&fields_to_display, &fields_ids_map, obkv)?;
documents.push(object);
} }
Ok(documents)
} }
pub fn retrieve_document<S: AsRef<str>>( pub fn retrieve_document<S: AsRef<str>>(
@ -223,65 +272,94 @@ impl Index {
doc_id: String, doc_id: String,
attributes_to_retrieve: Option<Vec<S>>, attributes_to_retrieve: Option<Vec<S>>,
) -> Result<Map<String, Value>> { ) -> Result<Map<String, Value>> {
let txn = self.read_txn()?; match self {
MockIndex::Vrai(index) => index.retrieve_document(doc_id, attributes_to_retrieve),
let fields_ids_map = self.fields_ids_map(&txn)?; MockIndex::Faux(_) => todo!(),
}
let fields_to_display =
self.fields_to_display(&txn, &attributes_to_retrieve, &fields_ids_map)?;
let internal_id = self
.external_documents_ids(&txn)?
.get(doc_id.as_bytes())
.ok_or_else(|| IndexError::DocumentNotFound(doc_id.clone()))?;
let document = self
.documents(&txn, std::iter::once(internal_id))?
.into_iter()
.next()
.map(|(_, d)| d)
.ok_or(IndexError::DocumentNotFound(doc_id))?;
let document = obkv_to_json(&fields_to_display, &fields_ids_map, document)?;
Ok(document)
} }
pub fn size(&self) -> u64 { pub fn size(&self) -> u64 {
self.env.size() match self {
MockIndex::Vrai(index) => index.size(),
MockIndex::Faux(_) => todo!(),
} }
fn fields_to_display<S: AsRef<str>>(
&self,
txn: &heed::RoTxn,
attributes_to_retrieve: &Option<Vec<S>>,
fields_ids_map: &milli::FieldsIdsMap,
) -> Result<Vec<FieldId>> {
let mut displayed_fields_ids = match self.displayed_fields_ids(txn)? {
Some(ids) => ids.into_iter().collect::<Vec<_>>(),
None => fields_ids_map.iter().map(|(id, _)| id).collect(),
};
let attributes_to_retrieve_ids = match attributes_to_retrieve {
Some(attrs) => attrs
.iter()
.filter_map(|f| fields_ids_map.id(f.as_ref()))
.collect::<HashSet<_>>(),
None => fields_ids_map.iter().map(|(id, _)| id).collect(),
};
displayed_fields_ids.retain(|fid| attributes_to_retrieve_ids.contains(fid));
Ok(displayed_fields_ids)
} }
pub fn snapshot(&self, path: impl AsRef<Path>) -> Result<()> { pub fn snapshot(&self, path: impl AsRef<Path>) -> Result<()> {
let mut dst = path.as_ref().join(format!("indexes/{}/", self.uuid)); match self {
create_dir_all(&dst)?; MockIndex::Vrai(index) => index.snapshot(path),
dst.push("data.mdb"); MockIndex::Faux(faux) => faux.get("snapshot").call(path.as_ref()),
let _txn = self.write_txn()?; }
self.inner }
.env
.copy_to_path(dst, heed::CompactionOption::Enabled)?; pub fn inner(&self) -> &milli::Index {
Ok(()) match self {
MockIndex::Vrai(index) => index.inner(),
MockIndex::Faux(_) => todo!(),
}
}
pub fn update_primary_key(&self, primary_key: Option<String>) -> Result<IndexMeta> {
match self {
MockIndex::Vrai(index) => index.update_primary_key(primary_key),
MockIndex::Faux(_) => todo!(),
}
}
pub fn perform_search(&self, query: SearchQuery) -> Result<SearchResult> {
match self {
MockIndex::Vrai(index) => index.perform_search(query),
MockIndex::Faux(faux) => faux.get("perform_search").call(query),
}
}
pub fn dump(&self, path: impl AsRef<Path>) -> Result<()> {
match self {
MockIndex::Vrai(index) => index.dump(path),
MockIndex::Faux(faux) => faux.get("dump").call(path.as_ref()),
}
}
}
#[test]
fn test_faux_index() {
let faux = Mocker::default();
faux.when("snapshot")
.times(2)
.then(|_: &Path| -> Result<()> { Ok(()) });
let index = MockIndex::faux(faux);
let path = PathBuf::from("hello");
index.snapshot(&path).unwrap();
index.snapshot(&path).unwrap();
}
#[test]
#[should_panic]
fn test_faux_unexisting_method_stub() {
let faux = Mocker::default();
let index = MockIndex::faux(faux);
let path = PathBuf::from("hello");
index.snapshot(&path).unwrap();
index.snapshot(&path).unwrap();
}
#[test]
#[should_panic]
fn test_faux_panic() {
let faux = Mocker::default();
faux.when("snapshot")
.times(2)
.then(|_: &Path| -> Result<()> {
panic!();
});
let index = MockIndex::faux(faux);
let path = PathBuf::from("hello");
index.snapshot(&path).unwrap();
index.snapshot(&path).unwrap();
} }
} }

View File

@ -12,15 +12,14 @@ use serde::{Deserialize, Serialize};
use serde_json::{json, Value}; use serde_json::{json, Value};
use crate::index::error::FacetError; use crate::index::error::FacetError;
use crate::index::IndexError;
use super::error::Result; use super::error::{IndexError, Result};
use super::Index; use super::index::Index;
pub type Document = IndexMap<String, Value>; pub type Document = IndexMap<String, Value>;
type MatchesInfo = BTreeMap<String, Vec<MatchInfo>>; type MatchesInfo = BTreeMap<String, Vec<MatchInfo>>;
#[derive(Serialize, Debug, Clone)] #[derive(Serialize, Debug, Clone, PartialEq)]
pub struct MatchInfo { pub struct MatchInfo {
start: usize, start: usize,
length: usize, length: usize,
@ -36,7 +35,7 @@ pub const fn default_crop_length() -> usize {
DEFAULT_CROP_LENGTH DEFAULT_CROP_LENGTH
} }
#[derive(Deserialize, Debug)] #[derive(Deserialize, Debug, Clone, PartialEq)]
#[serde(rename_all = "camelCase", deny_unknown_fields)] #[serde(rename_all = "camelCase", deny_unknown_fields)]
pub struct SearchQuery { pub struct SearchQuery {
pub q: Option<String>, pub q: Option<String>,
@ -56,7 +55,7 @@ pub struct SearchQuery {
pub facets_distribution: Option<Vec<String>>, pub facets_distribution: Option<Vec<String>>,
} }
#[derive(Debug, Clone, Serialize)] #[derive(Debug, Clone, Serialize, PartialEq)]
pub struct SearchHit { pub struct SearchHit {
#[serde(flatten)] #[serde(flatten)]
pub document: Document, pub document: Document,
@ -66,7 +65,7 @@ pub struct SearchHit {
pub matches_info: Option<MatchesInfo>, pub matches_info: Option<MatchesInfo>,
} }
#[derive(Serialize, Debug)] #[derive(Serialize, Debug, Clone, PartialEq)]
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
pub struct SearchResult { pub struct SearchResult {
pub hits: Vec<SearchHit>, pub hits: Vec<SearchHit>,

View File

@ -12,7 +12,7 @@ use crate::index_controller::updates::status::{Failed, Processed, Processing, Up
use crate::Update; use crate::Update;
use super::error::{IndexError, Result}; use super::error::{IndexError, Result};
use super::{Index, IndexMeta}; use super::index::{Index, IndexMeta};
fn serialize_with_wildcard<S>( fn serialize_with_wildcard<S>(
field: &Setting<Vec<String>>, field: &Setting<Vec<String>>,

View File

@ -10,14 +10,16 @@ use tokio::sync::{mpsc, oneshot, RwLock};
use super::error::{DumpActorError, Result}; use super::error::{DumpActorError, Result};
use super::{DumpInfo, DumpMsg, DumpStatus, DumpTask}; use super::{DumpInfo, DumpMsg, DumpStatus, DumpTask};
use crate::index_controller::index_resolver::HardStateIndexResolver; use crate::index_controller::index_resolver::index_store::IndexStore;
use crate::index_controller::index_resolver::uuid_store::UuidStore;
use crate::index_controller::index_resolver::IndexResolver;
use crate::index_controller::updates::UpdateSender; use crate::index_controller::updates::UpdateSender;
pub const CONCURRENT_DUMP_MSG: usize = 10; pub const CONCURRENT_DUMP_MSG: usize = 10;
pub struct DumpActor { pub struct DumpActor<U, I> {
inbox: Option<mpsc::Receiver<DumpMsg>>, inbox: Option<mpsc::Receiver<DumpMsg>>,
index_resolver: Arc<HardStateIndexResolver>, index_resolver: Arc<IndexResolver<U, I>>,
update: UpdateSender, update: UpdateSender,
dump_path: PathBuf, dump_path: PathBuf,
lock: Arc<Mutex<()>>, lock: Arc<Mutex<()>>,
@ -31,10 +33,14 @@ fn generate_uid() -> String {
Utc::now().format("%Y%m%d-%H%M%S%3f").to_string() Utc::now().format("%Y%m%d-%H%M%S%3f").to_string()
} }
impl DumpActor { impl<U, I> DumpActor<U, I>
where
U: UuidStore + Sync + Send + 'static,
I: IndexStore + Sync + Send + 'static,
{
pub fn new( pub fn new(
inbox: mpsc::Receiver<DumpMsg>, inbox: mpsc::Receiver<DumpMsg>,
index_resolver: Arc<HardStateIndexResolver>, index_resolver: Arc<IndexResolver<U, I>>,
update: UpdateSender, update: UpdateSender,
dump_path: impl AsRef<Path>, dump_path: impl AsRef<Path>,
index_db_size: usize, index_db_size: usize,
@ -114,7 +120,7 @@ impl DumpActor {
let task = DumpTask { let task = DumpTask {
path: self.dump_path.clone(), path: self.dump_path.clone(),
index_resolver: self.index_resolver.clone(), index_resolver: self.index_resolver.clone(),
update_handle: self.update.clone(), update_sender: self.update.clone(),
uid: uid.clone(), uid: uid.clone(),
update_db_size: self.update_db_size, update_db_size: self.update_db_size,
index_db_size: self.index_db_size, index_db_size: self.index_db_size,

View File

@ -13,7 +13,9 @@ pub use actor::DumpActor;
pub use handle_impl::*; pub use handle_impl::*;
pub use message::DumpMsg; pub use message::DumpMsg;
use super::index_resolver::HardStateIndexResolver; use super::index_resolver::index_store::IndexStore;
use super::index_resolver::uuid_store::UuidStore;
use super::index_resolver::IndexResolver;
use super::updates::UpdateSender; use super::updates::UpdateSender;
use crate::compression::{from_tar_gz, to_tar_gz}; use crate::compression::{from_tar_gz, to_tar_gz};
use crate::index_controller::dump_actor::error::DumpActorError; use crate::index_controller::dump_actor::error::DumpActorError;
@ -51,6 +53,7 @@ impl Metadata {
} }
#[async_trait::async_trait] #[async_trait::async_trait]
#[cfg_attr(test, mockall::automock)]
pub trait DumpActorHandle { pub trait DumpActorHandle {
/// Start the creation of a dump /// Start the creation of a dump
/// Implementation: [handle_impl::DumpActorHandleImpl::create_dump] /// Implementation: [handle_impl::DumpActorHandleImpl::create_dump]
@ -218,16 +221,20 @@ pub fn load_dump(
Ok(()) Ok(())
} }
struct DumpTask { struct DumpTask<U, I> {
path: PathBuf, path: PathBuf,
index_resolver: Arc<HardStateIndexResolver>, index_resolver: Arc<IndexResolver<U, I>>,
update_handle: UpdateSender, update_sender: UpdateSender,
uid: String, uid: String,
update_db_size: usize, update_db_size: usize,
index_db_size: usize, index_db_size: usize,
} }
impl DumpTask { impl<U, I> DumpTask<U, I>
where
U: UuidStore + Sync + Send + 'static,
I: IndexStore + Sync + Send + 'static,
{
async fn run(self) -> Result<()> { async fn run(self) -> Result<()> {
trace!("Performing dump."); trace!("Performing dump.");
@ -243,7 +250,7 @@ impl DumpTask {
let uuids = self.index_resolver.dump(temp_dump_path.clone()).await?; let uuids = self.index_resolver.dump(temp_dump_path.clone()).await?;
UpdateMsg::dump(&self.update_handle, uuids, temp_dump_path.clone()).await?; UpdateMsg::dump(&self.update_sender, uuids, temp_dump_path.clone()).await?;
let dump_path = tokio::task::spawn_blocking(move || -> Result<PathBuf> { let dump_path = tokio::task::spawn_blocking(move || -> Result<PathBuf> {
let temp_dump_file = tempfile::NamedTempFile::new()?; let temp_dump_file = tempfile::NamedTempFile::new()?;
@ -262,3 +269,110 @@ impl DumpTask {
Ok(()) Ok(())
} }
} }
#[cfg(test)]
mod test {
use std::collections::HashSet;
use futures::future::{err, ok};
use once_cell::sync::Lazy;
use uuid::Uuid;
use super::*;
use crate::index::error::Result as IndexResult;
use crate::index::test::Mocker;
use crate::index::Index;
use crate::index_controller::index_resolver::error::IndexResolverError;
use crate::index_controller::index_resolver::index_store::MockIndexStore;
use crate::index_controller::index_resolver::uuid_store::MockUuidStore;
use crate::index_controller::updates::create_update_handler;
fn setup() {
static SETUP: Lazy<()> = Lazy::new(|| {
if cfg!(windows) {
std::env::set_var("TMP", ".");
} else {
std::env::set_var("TMPDIR", ".");
}
});
// just deref to make sure the env is setup
*SETUP
}
#[actix_rt::test]
async fn test_dump_normal() {
setup();
let tmp = tempfile::tempdir().unwrap();
let uuids = std::iter::repeat_with(Uuid::new_v4)
.take(4)
.collect::<HashSet<_>>();
let mut uuid_store = MockUuidStore::new();
let uuids_cloned = uuids.clone();
uuid_store
.expect_dump()
.once()
.returning(move |_| Box::pin(ok(uuids_cloned.clone())));
let mut index_store = MockIndexStore::new();
index_store.expect_get().times(4).returning(move |uuid| {
let mocker = Mocker::default();
let uuids_clone = uuids.clone();
mocker.when::<(), Uuid>("uuid").once().then(move |_| {
assert!(uuids_clone.contains(&uuid));
uuid
});
mocker
.when::<&Path, IndexResult<()>>("dump")
.once()
.then(move |_| Ok(()));
Box::pin(ok(Some(Index::faux(mocker))))
});
let index_resolver = Arc::new(IndexResolver::new(uuid_store, index_store));
let update_sender =
create_update_handler(index_resolver.clone(), tmp.path(), 4096 * 100).unwrap();
let task = DumpTask {
path: tmp.path().to_owned(),
index_resolver,
update_sender,
uid: String::from("test"),
update_db_size: 4096 * 10,
index_db_size: 4096 * 10,
};
task.run().await.unwrap();
}
#[actix_rt::test]
async fn error_performing_dump() {
let tmp = tempfile::tempdir().unwrap();
let mut uuid_store = MockUuidStore::new();
uuid_store
.expect_dump()
.once()
.returning(move |_| Box::pin(err(IndexResolverError::ExistingPrimaryKey)));
let index_store = MockIndexStore::new();
let index_resolver = Arc::new(IndexResolver::new(uuid_store, index_store));
let update_sender =
create_update_handler(index_resolver.clone(), tmp.path(), 4096 * 100).unwrap();
let task = DumpTask {
path: tmp.path().to_owned(),
index_resolver,
update_sender,
uid: String::from("test"),
update_db_size: 4096 * 10,
index_db_size: 4096 * 10,
};
assert!(task.run().await.is_err());
}
}

View File

@ -17,6 +17,7 @@ use crate::options::IndexerOpts;
type AsyncMap<K, V> = Arc<RwLock<HashMap<K, V>>>; type AsyncMap<K, V> = Arc<RwLock<HashMap<K, V>>>;
#[async_trait::async_trait] #[async_trait::async_trait]
#[cfg_attr(test, mockall::automock)]
pub trait IndexStore { pub trait IndexStore {
async fn create(&self, uuid: Uuid, primary_key: Option<String>) -> Result<Index>; async fn create(&self, uuid: Uuid, primary_key: Option<String>) -> Result<Index>;
async fn get(&self, uuid: Uuid) -> Result<Option<Index>>; async fn get(&self, uuid: Uuid) -> Result<Option<Index>>;
@ -72,9 +73,10 @@ impl IndexStore for MapIndexStore {
let index = spawn_blocking(move || -> Result<Index> { let index = spawn_blocking(move || -> Result<Index> {
let index = Index::open(path, index_size, file_store, uuid, update_handler)?; let index = Index::open(path, index_size, file_store, uuid, update_handler)?;
if let Some(primary_key) = primary_key { if let Some(primary_key) = primary_key {
let mut txn = index.write_txn()?; let inner = index.inner();
let mut txn = inner.write_txn()?;
let mut builder = UpdateBuilder::new(0).settings(&mut txn, &index); let mut builder = UpdateBuilder::new(0).settings(&mut txn, index.inner());
builder.set_primary_key(primary_key); builder.set_primary_key(primary_key);
builder.execute(|_, _| ())?; builder.execute(|_, _| ())?;

View File

@ -1,5 +1,5 @@
pub mod error; pub mod error;
mod index_store; pub mod index_store;
pub mod uuid_store; pub mod uuid_store;
use std::path::Path; use std::path::Path;

View File

@ -22,6 +22,7 @@ struct DumpEntry {
const UUIDS_DB_PATH: &str = "index_uuids"; const UUIDS_DB_PATH: &str = "index_uuids";
#[async_trait::async_trait] #[async_trait::async_trait]
#[cfg_attr(test, mockall::automock)]
pub trait UuidStore: Sized { pub trait UuidStore: Sized {
// Create a new entry for `name`. Return an error if `err` and the entry already exists, return // Create a new entry for `name`. Return an error if `err` and the entry already exists, return
// the uuid otherwise. // the uuid otherwise.

View File

@ -30,7 +30,9 @@ use error::Result;
use self::dump_actor::load_dump; use self::dump_actor::load_dump;
use self::index_resolver::error::IndexResolverError; use self::index_resolver::error::IndexResolverError;
use self::index_resolver::HardStateIndexResolver; use self::index_resolver::index_store::{IndexStore, MapIndexStore};
use self::index_resolver::uuid_store::{HeedUuidStore, UuidStore};
use self::index_resolver::IndexResolver;
use self::updates::status::UpdateStatus; use self::updates::status::UpdateStatus;
use self::updates::UpdateMsg; use self::updates::UpdateMsg;
@ -41,6 +43,10 @@ mod snapshot;
pub mod update_file_store; pub mod update_file_store;
pub mod updates; pub mod updates;
/// Concrete implementation of the IndexController, exposed by meilisearch-lib
pub type MeiliSearch =
IndexController<HeedUuidStore, MapIndexStore, dump_actor::DumpActorHandleImpl>;
pub type Payload = Box< pub type Payload = Box<
dyn Stream<Item = std::result::Result<Bytes, PayloadError>> + Send + Sync + 'static + Unpin, dyn Stream<Item = std::result::Result<Bytes, PayloadError>> + Send + Sync + 'static + Unpin,
>; >;
@ -62,13 +68,6 @@ pub struct IndexSettings {
pub primary_key: Option<String>, pub primary_key: Option<String>,
} }
#[derive(Clone)]
pub struct IndexController {
index_resolver: Arc<HardStateIndexResolver>,
update_sender: updates::UpdateSender,
dump_handle: dump_actor::DumpActorHandleImpl,
}
#[derive(Debug)] #[derive(Debug)]
pub enum DocumentAdditionFormat { pub enum DocumentAdditionFormat {
Json, Json,
@ -129,7 +128,7 @@ impl IndexControllerBuilder {
self, self,
db_path: impl AsRef<Path>, db_path: impl AsRef<Path>,
indexer_options: IndexerOpts, indexer_options: IndexerOpts,
) -> anyhow::Result<IndexController> { ) -> anyhow::Result<MeiliSearch> {
let index_size = self let index_size = self
.max_index_size .max_index_size
.ok_or_else(|| anyhow::anyhow!("Missing index size"))?; .ok_or_else(|| anyhow::anyhow!("Missing index size"))?;
@ -178,6 +177,8 @@ impl IndexControllerBuilder {
update_store_size, update_store_size,
)?; )?;
let dump_handle = Arc::new(dump_handle);
if self.schedule_snapshot { if self.schedule_snapshot {
let snapshot_service = SnapshotService::new( let snapshot_service = SnapshotService::new(
index_resolver.clone(), index_resolver.clone(),
@ -266,7 +267,22 @@ impl IndexControllerBuilder {
} }
} }
impl IndexController { // We are using derivative here to derive Clone, because U, I and D do not necessarily implement
// Clone themselves.
#[derive(derivative::Derivative)]
#[derivative(Clone(bound = ""))]
pub struct IndexController<U, I, D> {
index_resolver: Arc<IndexResolver<U, I>>,
update_sender: updates::UpdateSender,
dump_handle: Arc<D>,
}
impl<U, I, D> IndexController<U, I, D>
where
U: UuidStore + Sync + Send + 'static,
I: IndexStore + Sync + Send + 'static,
D: DumpActorHandle + Send + Sync,
{
pub fn builder() -> IndexControllerBuilder { pub fn builder() -> IndexControllerBuilder {
IndexControllerBuilder::default() IndexControllerBuilder::default()
} }
@ -286,7 +302,7 @@ impl IndexController {
if create_index { if create_index {
let index = self.index_resolver.create_index(name, None).await?; let index = self.index_resolver.create_index(name, None).await?;
let update_result = let update_result =
UpdateMsg::update(&self.update_sender, index.uuid, update).await?; UpdateMsg::update(&self.update_sender, index.uuid(), update).await?;
Ok(update_result) Ok(update_result)
} else { } else {
Err(IndexResolverError::UnexistingIndex(name).into()) Err(IndexResolverError::UnexistingIndex(name).into())
@ -314,7 +330,7 @@ impl IndexController {
for (uid, index) in indexes { for (uid, index) in indexes {
let meta = index.meta()?; let meta = index.meta()?;
let meta = IndexMetadata { let meta = IndexMetadata {
uuid: index.uuid, uuid: index.uuid(),
name: uid.clone(), name: uid.clone(),
uid, uid,
meta, meta,
@ -366,7 +382,7 @@ impl IndexController {
index_settings.uid.take(); index_settings.uid.take();
let index = self.index_resolver.get_index(uid.clone()).await?; let index = self.index_resolver.get_index(uid.clone()).await?;
let uuid = index.uuid; let uuid = index.uuid();
let meta = let meta =
spawn_blocking(move || index.update_primary_key(index_settings.primary_key)).await??; spawn_blocking(move || index.update_primary_key(index_settings.primary_key)).await??;
let meta = IndexMetadata { let meta = IndexMetadata {
@ -386,7 +402,7 @@ impl IndexController {
pub async fn get_index(&self, uid: String) -> Result<IndexMetadata> { pub async fn get_index(&self, uid: String) -> Result<IndexMetadata> {
let index = self.index_resolver.get_index(uid.clone()).await?; let index = self.index_resolver.get_index(uid.clone()).await?;
let uuid = index.uuid; let uuid = index.uuid();
let meta = spawn_blocking(move || index.meta()).await??; let meta = spawn_blocking(move || index.meta()).await??;
let meta = IndexMetadata { let meta = IndexMetadata {
uuid, uuid,
@ -400,7 +416,7 @@ impl IndexController {
pub async fn get_index_stats(&self, uid: String) -> Result<IndexStats> { pub async fn get_index_stats(&self, uid: String) -> Result<IndexStats> {
let update_infos = UpdateMsg::get_info(&self.update_sender).await?; let update_infos = UpdateMsg::get_info(&self.update_sender).await?;
let index = self.index_resolver.get_index(uid).await?; let index = self.index_resolver.get_index(uid).await?;
let uuid = index.uuid; let uuid = index.uuid();
let mut stats = spawn_blocking(move || index.stats()).await??; let mut stats = spawn_blocking(move || index.stats()).await??;
// Check if the currently indexing update is from our index. // Check if the currently indexing update is from our index.
stats.is_indexing = Some(Some(uuid) == update_infos.processing); stats.is_indexing = Some(Some(uuid) == update_infos.processing);
@ -414,7 +430,7 @@ impl IndexController {
let mut indexes = BTreeMap::new(); let mut indexes = BTreeMap::new();
for (index_uid, index) in self.index_resolver.list().await? { for (index_uid, index) in self.index_resolver.list().await? {
let uuid = index.uuid; let uuid = index.uuid();
let (mut stats, meta) = spawn_blocking::<_, IndexResult<_>>(move || { let (mut stats, meta) = spawn_blocking::<_, IndexResult<_>>(move || {
let stats = index.stats()?; let stats = index.stats()?;
let meta = index.meta()?; let meta = index.meta()?;
@ -461,7 +477,7 @@ impl IndexController {
let meta = spawn_blocking(move || -> IndexResult<_> { let meta = spawn_blocking(move || -> IndexResult<_> {
let meta = index.meta()?; let meta = index.meta()?;
let meta = IndexMetadata { let meta = IndexMetadata {
uuid: index.uuid, uuid: index.uuid(),
uid: uid.clone(), uid: uid.clone(),
name: uid, name: uid,
meta, meta,
@ -497,3 +513,103 @@ pub async fn get_arc_ownership_blocking<T>(mut item: Arc<T>) -> T {
} }
} }
} }
#[cfg(test)]
mod test {
use futures::future::ok;
use mockall::predicate::eq;
use tokio::sync::mpsc;
use crate::index::error::Result as IndexResult;
use crate::index::test::Mocker;
use crate::index::Index;
use crate::index_controller::dump_actor::MockDumpActorHandle;
use crate::index_controller::index_resolver::index_store::MockIndexStore;
use crate::index_controller::index_resolver::uuid_store::MockUuidStore;
use super::updates::UpdateSender;
use super::*;
impl<D: DumpActorHandle> IndexController<MockUuidStore, MockIndexStore, D> {
pub fn mock(
index_resolver: IndexResolver<MockUuidStore, MockIndexStore>,
update_sender: UpdateSender,
dump_handle: D,
) -> Self {
IndexController {
index_resolver: Arc::new(index_resolver),
update_sender,
dump_handle: Arc::new(dump_handle),
}
}
}
#[actix_rt::test]
async fn test_search_simple() {
let index_uid = "test";
let index_uuid = Uuid::new_v4();
let query = SearchQuery {
q: Some(String::from("hello world")),
offset: Some(10),
limit: 0,
attributes_to_retrieve: Some(vec!["string".to_owned()].into_iter().collect()),
attributes_to_crop: None,
crop_length: 18,
attributes_to_highlight: None,
matches: true,
filter: None,
sort: None,
facets_distribution: None,
};
let result = SearchResult {
hits: vec![],
nb_hits: 29,
exhaustive_nb_hits: true,
query: "hello world".to_string(),
limit: 24,
offset: 0,
processing_time_ms: 50,
facets_distribution: None,
exhaustive_facets_count: Some(true),
};
let mut uuid_store = MockUuidStore::new();
uuid_store
.expect_get_uuid()
.with(eq(index_uid.to_owned()))
.returning(move |s| Box::pin(ok((s, Some(index_uuid)))));
let mut index_store = MockIndexStore::new();
let result_clone = result.clone();
let query_clone = query.clone();
index_store
.expect_get()
.with(eq(index_uuid))
.returning(move |_uuid| {
let result = result_clone.clone();
let query = query_clone.clone();
let mocker = Mocker::default();
mocker
.when::<SearchQuery, IndexResult<SearchResult>>("perform_search")
.once()
.then(move |q| {
assert_eq!(&q, &query);
Ok(result.clone())
});
let index = Index::faux(mocker);
Box::pin(ok(Some(index)))
});
let index_resolver = IndexResolver::new(uuid_store, index_store);
let (update_sender, _) = mpsc::channel(1);
let dump_actor = MockDumpActorHandle::new();
let index_controller = IndexController::mock(index_resolver, update_sender, dump_actor);
let r = index_controller
.search(index_uid.to_owned(), query.clone())
.await
.unwrap();
assert_eq!(r, result);
}
}

View File

@ -11,20 +11,26 @@ use tokio::time::sleep;
use crate::compression::from_tar_gz; use crate::compression::from_tar_gz;
use crate::index_controller::updates::UpdateMsg; use crate::index_controller::updates::UpdateMsg;
use super::index_resolver::HardStateIndexResolver; use super::index_resolver::index_store::IndexStore;
use super::index_resolver::uuid_store::UuidStore;
use super::index_resolver::IndexResolver;
use super::updates::UpdateSender; use super::updates::UpdateSender;
pub struct SnapshotService { pub struct SnapshotService<U, I> {
index_resolver: Arc<HardStateIndexResolver>, index_resolver: Arc<IndexResolver<U, I>>,
update_sender: UpdateSender, update_sender: UpdateSender,
snapshot_period: Duration, snapshot_period: Duration,
snapshot_path: PathBuf, snapshot_path: PathBuf,
db_name: String, db_name: String,
} }
impl SnapshotService { impl<U, I> SnapshotService<U, I>
where
U: UuidStore + Sync + Send + 'static,
I: IndexStore + Sync + Send + 'static,
{
pub fn new( pub fn new(
index_resolver: Arc<HardStateIndexResolver>, index_resolver: Arc<IndexResolver<U, I>>,
update_sender: UpdateSender, update_sender: UpdateSender,
snapshot_period: Duration, snapshot_period: Duration,
snapshot_path: PathBuf, snapshot_path: PathBuf,
@ -125,133 +131,169 @@ pub fn load_snapshot(
} }
} }
//#[cfg(test)] #[cfg(test)]
//mod test { mod test {
//use std::iter::FromIterator; use std::{collections::HashSet, sync::Arc};
//use std::{collections::HashSet, sync::Arc};
//use futures::future::{err, ok}; use futures::future::{err, ok};
//use rand::Rng; use once_cell::sync::Lazy;
//use tokio::time::timeout; use rand::Rng;
//use uuid::Uuid; use uuid::Uuid;
//use super::*; use crate::index::error::IndexError;
use crate::index::test::Mocker;
use crate::index::{error::Result as IndexResult, Index};
use crate::index_controller::index_resolver::error::IndexResolverError;
use crate::index_controller::index_resolver::index_store::MockIndexStore;
use crate::index_controller::index_resolver::uuid_store::MockUuidStore;
use crate::index_controller::index_resolver::IndexResolver;
use crate::index_controller::updates::create_update_handler;
//#[actix_rt::test] use super::*;
//async fn test_normal() {
//let mut rng = rand::thread_rng();
//let uuids_num: usize = rng.gen_range(5..10);
//let uuids = (0..uuids_num)
//.map(|_| Uuid::new_v4())
//.collect::<HashSet<_>>();
//let mut uuid_resolver = MockUuidResolverHandle::new(); fn setup() {
//let uuids_clone = uuids.clone(); static SETUP: Lazy<()> = Lazy::new(|| {
//uuid_resolver if cfg!(windows) {
//.expect_snapshot() std::env::set_var("TMP", ".");
//.times(1) } else {
//.returning(move |_| Box::pin(ok(uuids_clone.clone()))); std::env::set_var("TMPDIR", ".");
}
});
//let uuids_clone = uuids.clone(); // just deref to make sure the env is setup
//let mut index_handle = MockIndexActorHandle::new(); *SETUP
//index_handle }
//.expect_snapshot()
//.withf(move |uuid, _path| uuids_clone.contains(uuid))
//.times(uuids_num)
//.returning(move |_, _| Box::pin(ok(())));
//let dir = tempfile::tempdir_in(".").unwrap(); #[actix_rt::test]
//let handle = Arc::new(index_handle); async fn test_normal() {
//let update_handle = setup();
//UpdateActorHandleImpl::<Vec<u8>>::new(handle.clone(), dir.path(), 4096 * 100).unwrap();
//let snapshot_path = tempfile::tempdir_in(".").unwrap(); let mut rng = rand::thread_rng();
//let snapshot_service = SnapshotService::new( let uuids_num: usize = rng.gen_range(5..10);
//uuid_resolver, let uuids = (0..uuids_num)
//update_handle, .map(|_| Uuid::new_v4())
//Duration::from_millis(100), .collect::<HashSet<_>>();
//snapshot_path.path().to_owned(),
//"data.ms".to_string(),
//);
//snapshot_service.perform_snapshot().await.unwrap(); let mut uuid_store = MockUuidStore::new();
//} let uuids_clone = uuids.clone();
uuid_store
.expect_snapshot()
.times(1)
.returning(move |_| Box::pin(ok(uuids_clone.clone())));
//#[actix_rt::test] let mut indexes = uuids.clone().into_iter().map(|uuid| {
//async fn error_performing_uuid_snapshot() { let mocker = Mocker::default();
//let mut uuid_resolver = MockUuidResolverHandle::new(); mocker
//uuid_resolver .when("snapshot")
//.expect_snapshot() .times(1)
//.times(1) .then(|_: &Path| -> IndexResult<()> { Ok(()) });
////abitrary error mocker.when("uuid").then(move |_: ()| uuid);
//.returning(|_| Box::pin(err(UuidResolverError::NameAlreadyExist))); Index::faux(mocker)
});
//let update_handle = MockUpdateActorHandle::new(); let uuids_clone = uuids.clone();
let mut index_store = MockIndexStore::new();
index_store
.expect_get()
.withf(move |uuid| uuids_clone.contains(uuid))
.times(uuids_num)
.returning(move |_| Box::pin(ok(Some(indexes.next().unwrap()))));
//let snapshot_path = tempfile::tempdir_in(".").unwrap(); let index_resolver = Arc::new(IndexResolver::new(uuid_store, index_store));
//let snapshot_service = SnapshotService::new(
//uuid_resolver,
//update_handle,
//Duration::from_millis(100),
//snapshot_path.path().to_owned(),
//"data.ms".to_string(),
//);
//assert!(snapshot_service.perform_snapshot().await.is_err()); let dir = tempfile::tempdir().unwrap();
////Nothing was written to the file let update_sender =
//assert!(!snapshot_path.path().join("data.ms.snapshot").exists()); create_update_handler(index_resolver.clone(), dir.path(), 4096 * 100).unwrap();
//}
//#[actix_rt::test] let snapshot_path = tempfile::tempdir().unwrap();
//async fn error_performing_index_snapshot() { let snapshot_service = SnapshotService::new(
//let uuid = Uuid::new_v4(); index_resolver,
//let mut uuid_resolver = MockUuidResolverHandle::new(); update_sender,
//uuid_resolver Duration::from_millis(100),
//.expect_snapshot() snapshot_path.path().to_owned(),
//.times(1) "data.ms".to_string(),
//.returning(move |_| Box::pin(ok(HashSet::from_iter(Some(uuid))))); );
//let mut update_handle = MockUpdateActorHandle::new(); snapshot_service.perform_snapshot().await.unwrap();
//update_handle }
//.expect_snapshot()
////abitrary error
//.returning(|_, _| Box::pin(err(UpdateActorError::UnexistingUpdate(0))));
//let snapshot_path = tempfile::tempdir_in(".").unwrap(); #[actix_rt::test]
//let snapshot_service = SnapshotService::new( async fn error_performing_uuid_snapshot() {
//uuid_resolver, setup();
//update_handle,
//Duration::from_millis(100),
//snapshot_path.path().to_owned(),
//"data.ms".to_string(),
//);
//assert!(snapshot_service.perform_snapshot().await.is_err()); let mut uuid_store = MockUuidStore::new();
////Nothing was written to the file uuid_store
//assert!(!snapshot_path.path().join("data.ms.snapshot").exists()); .expect_snapshot()
//} .once()
.returning(move |_| Box::pin(err(IndexResolverError::IndexAlreadyExists)));
//#[actix_rt::test] let mut index_store = MockIndexStore::new();
//async fn test_loop() { index_store.expect_get().never();
//let mut uuid_resolver = MockUuidResolverHandle::new();
//uuid_resolver
//.expect_snapshot()
////we expect the funtion to be called between 2 and 3 time in the given interval.
//.times(2..4)
////abitrary error, to short-circuit the function
//.returning(move |_| Box::pin(err(UuidResolverError::NameAlreadyExist)));
//let update_handle = MockUpdateActorHandle::new(); let index_resolver = Arc::new(IndexResolver::new(uuid_store, index_store));
//let snapshot_path = tempfile::tempdir_in(".").unwrap(); let dir = tempfile::tempdir().unwrap();
//let snapshot_service = SnapshotService::new( let update_sender =
//uuid_resolver, create_update_handler(index_resolver.clone(), dir.path(), 4096 * 100).unwrap();
//update_handle,
//Duration::from_millis(100),
//snapshot_path.path().to_owned(),
//"data.ms".to_string(),
//);
//let _ = timeout(Duration::from_millis(300), snapshot_service.run()).await; let snapshot_path = tempfile::tempdir().unwrap();
//} let snapshot_service = SnapshotService::new(
//} index_resolver,
update_sender,
Duration::from_millis(100),
snapshot_path.path().to_owned(),
"data.ms".to_string(),
);
assert!(snapshot_service.perform_snapshot().await.is_err());
}
#[actix_rt::test]
async fn error_performing_index_snapshot() {
setup();
let uuids: HashSet<Uuid> = vec![Uuid::new_v4()].into_iter().collect();
let mut uuid_store = MockUuidStore::new();
let uuids_clone = uuids.clone();
uuid_store
.expect_snapshot()
.once()
.returning(move |_| Box::pin(ok(uuids_clone.clone())));
let mut indexes = uuids.clone().into_iter().map(|uuid| {
let mocker = Mocker::default();
// index returns random error
mocker
.when("snapshot")
.then(|_: &Path| -> IndexResult<()> { Err(IndexError::ExistingPrimaryKey) });
mocker.when("uuid").then(move |_: ()| uuid);
Index::faux(mocker)
});
let uuids_clone = uuids.clone();
let mut index_store = MockIndexStore::new();
index_store
.expect_get()
.withf(move |uuid| uuids_clone.contains(uuid))
.once()
.returning(move |_| Box::pin(ok(Some(indexes.next().unwrap()))));
let index_resolver = Arc::new(IndexResolver::new(uuid_store, index_store));
let dir = tempfile::tempdir().unwrap();
let update_sender =
create_update_handler(index_resolver.clone(), dir.path(), 4096 * 100).unwrap();
let snapshot_path = tempfile::tempdir().unwrap();
let snapshot_service = SnapshotService::new(
index_resolver,
update_sender,
Duration::from_millis(100),
snapshot_path.path().to_owned(),
"data.ms".to_string(),
);
assert!(snapshot_service.perform_snapshot().await.is_err());
}
}

View File

@ -5,6 +5,7 @@ use meilisearch_error::{Code, ErrorCode};
use crate::{ use crate::{
document_formats::DocumentFormatError, document_formats::DocumentFormatError,
index::error::IndexError,
index_controller::{update_file_store::UpdateFileStoreError, DocumentAdditionFormat}, index_controller::{update_file_store::UpdateFileStoreError, DocumentAdditionFormat},
}; };
@ -28,6 +29,8 @@ pub enum UpdateLoopError {
PayloadError(#[from] actix_web::error::PayloadError), PayloadError(#[from] actix_web::error::PayloadError),
#[error("A {0} payload is missing.")] #[error("A {0} payload is missing.")]
MissingPayload(DocumentAdditionFormat), MissingPayload(DocumentAdditionFormat),
#[error("{0}")]
IndexError(#[from] IndexError),
} }
impl<T> From<tokio::sync::mpsc::error::SendError<T>> for UpdateLoopError impl<T> From<tokio::sync::mpsc::error::SendError<T>> for UpdateLoopError
@ -58,7 +61,6 @@ impl ErrorCode for UpdateLoopError {
match self { match self {
Self::UnexistingUpdate(_) => Code::NotFound, Self::UnexistingUpdate(_) => Code::NotFound,
Self::Internal(_) => Code::Internal, Self::Internal(_) => Code::Internal,
//Self::IndexActor(e) => e.error_code(),
Self::FatalUpdateStoreError => Code::Internal, Self::FatalUpdateStoreError => Code::Internal,
Self::DocumentFormatError(error) => error.error_code(), Self::DocumentFormatError(error) => error.error_code(),
Self::PayloadError(error) => match error { Self::PayloadError(error) => match error {
@ -66,6 +68,7 @@ impl ErrorCode for UpdateLoopError {
_ => Code::Internal, _ => Code::Internal,
}, },
Self::MissingPayload(_) => Code::MissingPayload, Self::MissingPayload(_) => Code::MissingPayload,
Self::IndexError(e) => e.error_code(),
} }
} }
} }

View File

@ -26,16 +26,22 @@ use crate::index::{Index, Settings, Unchecked};
use crate::index_controller::update_file_store::UpdateFileStore; use crate::index_controller::update_file_store::UpdateFileStore;
use status::UpdateStatus; use status::UpdateStatus;
use super::index_resolver::HardStateIndexResolver; use super::index_resolver::index_store::IndexStore;
use super::index_resolver::uuid_store::UuidStore;
use super::index_resolver::IndexResolver;
use super::{DocumentAdditionFormat, Update}; use super::{DocumentAdditionFormat, Update};
pub type UpdateSender = mpsc::Sender<UpdateMsg>; pub type UpdateSender = mpsc::Sender<UpdateMsg>;
pub fn create_update_handler( pub fn create_update_handler<U, I>(
index_resolver: Arc<HardStateIndexResolver>, index_resolver: Arc<IndexResolver<U, I>>,
db_path: impl AsRef<Path>, db_path: impl AsRef<Path>,
update_store_size: usize, update_store_size: usize,
) -> anyhow::Result<UpdateSender> { ) -> anyhow::Result<UpdateSender>
where
U: UuidStore + Sync + Send + 'static,
I: IndexStore + Sync + Send + 'static,
{
let path = db_path.as_ref().to_owned(); let path = db_path.as_ref().to_owned();
let (sender, receiver) = mpsc::channel(100); let (sender, receiver) = mpsc::channel(100);
let actor = UpdateLoop::new(update_store_size, receiver, path, index_resolver)?; let actor = UpdateLoop::new(update_store_size, receiver, path, index_resolver)?;
@ -95,12 +101,16 @@ pub struct UpdateLoop {
} }
impl UpdateLoop { impl UpdateLoop {
pub fn new( pub fn new<U, I>(
update_db_size: usize, update_db_size: usize,
inbox: mpsc::Receiver<UpdateMsg>, inbox: mpsc::Receiver<UpdateMsg>,
path: impl AsRef<Path>, path: impl AsRef<Path>,
index_resolver: Arc<HardStateIndexResolver>, index_resolver: Arc<IndexResolver<U, I>>,
) -> anyhow::Result<Self> { ) -> anyhow::Result<Self>
where
U: UuidStore + Sync + Send + 'static,
I: IndexStore + Sync + Send + 'static,
{
let path = path.as_ref().to_owned(); let path = path.as_ref().to_owned();
std::fs::create_dir_all(&path)?; std::fs::create_dir_all(&path)?;

View File

@ -34,7 +34,7 @@ impl UpdateStore {
// txn must *always* be acquired after state lock, or it will dead lock. // txn must *always* be acquired after state lock, or it will dead lock.
let txn = self.env.write_txn()?; let txn = self.env.write_txn()?;
let uuids = indexes.iter().map(|i| i.uuid).collect(); let uuids = indexes.iter().map(|i| i.uuid()).collect();
self.dump_updates(&txn, &uuids, &path)?; self.dump_updates(&txn, &uuids, &path)?;

View File

@ -29,6 +29,8 @@ use codec::*;
use super::error::Result; use super::error::Result;
use super::status::{Enqueued, Processing}; use super::status::{Enqueued, Processing};
use crate::index::Index; use crate::index::Index;
use crate::index_controller::index_resolver::index_store::IndexStore;
use crate::index_controller::index_resolver::uuid_store::UuidStore;
use crate::index_controller::updates::*; use crate::index_controller::updates::*;
use crate::EnvSizer; use crate::EnvSizer;
@ -157,13 +159,17 @@ impl UpdateStore {
)) ))
} }
pub fn open( pub fn open<U, I>(
options: EnvOpenOptions, options: EnvOpenOptions,
path: impl AsRef<Path>, path: impl AsRef<Path>,
index_resolver: Arc<HardStateIndexResolver>, index_resolver: Arc<IndexResolver<U, I>>,
must_exit: Arc<AtomicBool>, must_exit: Arc<AtomicBool>,
update_file_store: UpdateFileStore, update_file_store: UpdateFileStore,
) -> anyhow::Result<Arc<Self>> { ) -> anyhow::Result<Arc<Self>>
where
U: UuidStore + Sync + Send + 'static,
I: IndexStore + Sync + Send + 'static,
{
let (update_store, mut notification_receiver) = let (update_store, mut notification_receiver) =
Self::new(options, path, update_file_store)?; Self::new(options, path, update_file_store)?;
let update_store = Arc::new(update_store); let update_store = Arc::new(update_store);
@ -296,10 +302,14 @@ impl UpdateStore {
/// Executes the user provided function on the next pending update (the one with the lowest id). /// Executes the user provided function on the next pending update (the one with the lowest id).
/// This is asynchronous as it let the user process the update with a read-only txn and /// This is asynchronous as it let the user process the update with a read-only txn and
/// only writing the result meta to the processed-meta store *after* it has been processed. /// only writing the result meta to the processed-meta store *after* it has been processed.
fn process_pending_update( fn process_pending_update<U, I>(
&self, &self,
index_resolver: Arc<HardStateIndexResolver>, index_resolver: Arc<IndexResolver<U, I>>,
) -> Result<Option<()>> { ) -> Result<Option<()>>
where
U: UuidStore + Sync + Send + 'static,
I: IndexStore + Sync + Send + 'static,
{
// Create a read transaction to be able to retrieve the pending update in order. // Create a read transaction to be able to retrieve the pending update in order.
let rtxn = self.env.read_txn()?; let rtxn = self.env.read_txn()?;
let first_meta = self.pending_queue.first(&rtxn)?; let first_meta = self.pending_queue.first(&rtxn)?;
@ -325,13 +335,17 @@ impl UpdateStore {
} }
} }
fn perform_update( fn perform_update<U, I>(
&self, &self,
processing: Processing, processing: Processing,
index_resolver: Arc<HardStateIndexResolver>, index_resolver: Arc<IndexResolver<U, I>>,
index_uuid: Uuid, index_uuid: Uuid,
global_id: u64, global_id: u64,
) -> Result<Option<()>> { ) -> Result<Option<()>>
where
U: UuidStore + Sync + Send + 'static,
I: IndexStore + Sync + Send + 'static,
{
// Process the pending update using the provided user function. // Process the pending update using the provided user function.
let handle = Handle::current(); let handle = Handle::current();
let update_id = processing.id(); let update_id = processing.id();
@ -509,7 +523,7 @@ impl UpdateStore {
let pendings = self.pending_queue.iter(&txn)?.lazily_decode_data(); let pendings = self.pending_queue.iter(&txn)?.lazily_decode_data();
let uuids: HashSet<_> = indexes.iter().map(|i| i.uuid).collect(); let uuids: HashSet<_> = indexes.iter().map(|i| i.uuid()).collect();
for entry in pendings { for entry in pendings {
let ((_, uuid, _), pending) = entry?; let ((_, uuid, _), pending) = entry?;
if uuids.contains(&uuid) { if uuids.contains(&uuid) {
@ -518,9 +532,7 @@ impl UpdateStore {
.. ..
} = pending.decode()? } = pending.decode()?
{ {
self.update_file_store self.update_file_store.snapshot(content_uuid, &path)?;
.snapshot(content_uuid, &path)
.unwrap();
} }
} }
} }
@ -528,8 +540,7 @@ impl UpdateStore {
let path = path.as_ref().to_owned(); let path = path.as_ref().to_owned();
indexes indexes
.par_iter() .par_iter()
.try_for_each(|index| index.snapshot(path.clone())) .try_for_each(|index| index.snapshot(&path))?;
.unwrap();
Ok(()) Ok(())
} }
@ -557,149 +568,217 @@ impl UpdateStore {
} }
} }
//#[cfg(test)] #[cfg(test)]
//mod test { mod test {
//use super::*; use futures::future::ok;
//use crate::index_controller::{ use mockall::predicate::eq;
//index_actor::{error::IndexActorError, MockIndexActorHandle},
//UpdateResult,
//};
//use futures::future::ok; use crate::index::error::IndexError;
use crate::index::test::Mocker;
use crate::index_controller::index_resolver::index_store::MockIndexStore;
use crate::index_controller::index_resolver::uuid_store::MockUuidStore;
use crate::index_controller::updates::status::{Failed, Processed};
//#[actix_rt::test] use super::*;
//async fn test_next_id() {
//let dir = tempfile::tempdir_in(".").unwrap();
//let mut options = EnvOpenOptions::new();
//let handle = Arc::new(MockIndexActorHandle::new());
//options.map_size(4096 * 100);
//let update_store = UpdateStore::open(
//options,
//dir.path(),
//handle,
//Arc::new(AtomicBool::new(false)),
//)
//.unwrap();
//let index1_uuid = Uuid::new_v4(); #[actix_rt::test]
//let index2_uuid = Uuid::new_v4(); async fn test_next_id() {
let dir = tempfile::tempdir_in(".").unwrap();
let mut options = EnvOpenOptions::new();
let index_store = MockIndexStore::new();
let uuid_store = MockUuidStore::new();
let index_resolver = IndexResolver::new(uuid_store, index_store);
let update_file_store = UpdateFileStore::new(dir.path()).unwrap();
options.map_size(4096 * 100);
let update_store = UpdateStore::open(
options,
dir.path(),
Arc::new(index_resolver),
Arc::new(AtomicBool::new(false)),
update_file_store,
)
.unwrap();
//let mut txn = update_store.env.write_txn().unwrap(); let index1_uuid = Uuid::new_v4();
//let ids = update_store.next_update_id(&mut txn, index1_uuid).unwrap(); let index2_uuid = Uuid::new_v4();
//txn.commit().unwrap();
//assert_eq!((0, 0), ids);
//let mut txn = update_store.env.write_txn().unwrap(); let mut txn = update_store.env.write_txn().unwrap();
//let ids = update_store.next_update_id(&mut txn, index2_uuid).unwrap(); let ids = update_store.next_update_id(&mut txn, index1_uuid).unwrap();
//txn.commit().unwrap(); txn.commit().unwrap();
//assert_eq!((1, 0), ids); assert_eq!((0, 0), ids);
//let mut txn = update_store.env.write_txn().unwrap(); let mut txn = update_store.env.write_txn().unwrap();
//let ids = update_store.next_update_id(&mut txn, index1_uuid).unwrap(); let ids = update_store.next_update_id(&mut txn, index2_uuid).unwrap();
//txn.commit().unwrap(); txn.commit().unwrap();
//assert_eq!((2, 1), ids); assert_eq!((1, 0), ids);
//}
//#[actix_rt::test] let mut txn = update_store.env.write_txn().unwrap();
//async fn test_register_update() { let ids = update_store.next_update_id(&mut txn, index1_uuid).unwrap();
//let dir = tempfile::tempdir_in(".").unwrap(); txn.commit().unwrap();
//let mut options = EnvOpenOptions::new(); assert_eq!((2, 1), ids);
//let handle = Arc::new(MockIndexActorHandle::new()); }
//options.map_size(4096 * 100);
//let update_store = UpdateStore::open(
//options,
//dir.path(),
//handle,
//Arc::new(AtomicBool::new(false)),
//)
//.unwrap();
//let meta = UpdateMeta::ClearDocuments;
//let uuid = Uuid::new_v4();
//let store_clone = update_store.clone();
//tokio::task::spawn_blocking(move || {
//store_clone.register_update(meta, None, uuid).unwrap();
//})
//.await
//.unwrap();
//let txn = update_store.env.read_txn().unwrap(); #[actix_rt::test]
//assert!(update_store async fn test_register_update() {
//.pending_queue let dir = tempfile::tempdir_in(".").unwrap();
//.get(&txn, &(0, uuid, 0)) let index_store = MockIndexStore::new();
//.unwrap() let uuid_store = MockUuidStore::new();
//.is_some()); let index_resolver = IndexResolver::new(uuid_store, index_store);
//} let update_file_store = UpdateFileStore::new(dir.path()).unwrap();
let mut options = EnvOpenOptions::new();
options.map_size(4096 * 100);
let update_store = UpdateStore::open(
options,
dir.path(),
Arc::new(index_resolver),
Arc::new(AtomicBool::new(false)),
update_file_store,
)
.unwrap();
let update = Update::ClearDocuments;
let uuid = Uuid::new_v4();
let store_clone = update_store.clone();
tokio::task::spawn_blocking(move || {
store_clone.register_update(uuid, update).unwrap();
})
.await
.unwrap();
//#[actix_rt::test] let txn = update_store.env.read_txn().unwrap();
//async fn test_process_update() { assert!(update_store
//let dir = tempfile::tempdir_in(".").unwrap(); .pending_queue
//let mut handle = MockIndexActorHandle::new(); .get(&txn, &(0, uuid, 0))
.unwrap()
.is_some());
}
//handle #[actix_rt::test]
//.expect_update() async fn test_process_update_success() {
//.times(2) let dir = tempfile::tempdir_in(".").unwrap();
//.returning(|_index_uuid, processing, _file| { let index_uuid = Uuid::new_v4();
//if processing.id() == 0 {
//Box::pin(ok(Ok(processing.process(UpdateResult::Other))))
//} else {
//Box::pin(ok(Err(
//processing.fail(IndexActorError::ExistingPrimaryKey.into())
//)))
//}
//});
//let handle = Arc::new(handle); let mut index_store = MockIndexStore::new();
index_store
.expect_get()
.with(eq(index_uuid))
.returning(|_uuid| {
let mocker = Mocker::default();
mocker
.when::<Processing, std::result::Result<Processed, Failed>>("handle_update")
.once()
.then(|update| Ok(update.process(status::UpdateResult::Other)));
//let mut options = EnvOpenOptions::new(); Box::pin(ok(Some(Index::faux(mocker))))
//options.map_size(4096 * 100); });
//let store = UpdateStore::open(
//options,
//dir.path(),
//handle.clone(),
//Arc::new(AtomicBool::new(false)),
//)
//.unwrap();
//// wait a bit for the event loop exit. let uuid_store = MockUuidStore::new();
//tokio::time::sleep(std::time::Duration::from_millis(50)).await; let index_resolver = Arc::new(IndexResolver::new(uuid_store, index_store));
//let mut txn = store.env.write_txn().unwrap(); let update_file_store = UpdateFileStore::new(dir.path()).unwrap();
let mut options = EnvOpenOptions::new();
options.map_size(4096 * 100);
let store = UpdateStore::open(
options,
dir.path(),
index_resolver.clone(),
Arc::new(AtomicBool::new(false)),
update_file_store,
)
.unwrap();
//let update = Enqueued::new(UpdateMeta::ClearDocuments, 0, None); // wait a bit for the event loop exit.
//let uuid = Uuid::new_v4(); tokio::time::sleep(std::time::Duration::from_millis(50)).await;
//store let mut txn = store.env.write_txn().unwrap();
//.pending_queue
//.put(&mut txn, &(0, uuid, 0), &update)
//.unwrap();
//let update = Enqueued::new(UpdateMeta::ClearDocuments, 1, None); let update = Enqueued::new(Update::ClearDocuments, 0);
//store store
//.pending_queue .pending_queue
//.put(&mut txn, &(1, uuid, 1), &update) .put(&mut txn, &(0, index_uuid, 0), &update)
//.unwrap(); .unwrap();
//txn.commit().unwrap(); txn.commit().unwrap();
//// Process the pending, and check that it has been moved to the update databases, and // Process the pending, and check that it has been moved to the update databases, and
//// removed from the pending database. // removed from the pending database.
//let store_clone = store.clone(); let store_clone = store.clone();
//tokio::task::spawn_blocking(move || { tokio::task::spawn_blocking(move || {
//store_clone.process_pending_update(handle.clone()).unwrap(); store_clone.process_pending_update(index_resolver).unwrap();
//store_clone.process_pending_update(handle).unwrap(); })
//}) .await
//.await .unwrap();
//.unwrap();
//let txn = store.env.read_txn().unwrap(); let txn = store.env.read_txn().unwrap();
//assert!(store.pending_queue.first(&txn).unwrap().is_none()); assert!(store.pending_queue.first(&txn).unwrap().is_none());
//let update = store.updates.get(&txn, &(uuid, 0)).unwrap().unwrap(); let update = store.updates.get(&txn, &(index_uuid, 0)).unwrap().unwrap();
//assert!(matches!(update, UpdateStatus::Processed(_))); assert!(matches!(update, UpdateStatus::Processed(_)));
//let update = store.updates.get(&txn, &(uuid, 1)).unwrap().unwrap(); }
//assert!(matches!(update, UpdateStatus::Failed(_))); #[actix_rt::test]
//} async fn test_process_update_failure() {
//} let dir = tempfile::tempdir_in(".").unwrap();
let index_uuid = Uuid::new_v4();
let mut index_store = MockIndexStore::new();
index_store
.expect_get()
.with(eq(index_uuid))
.returning(|_uuid| {
let mocker = Mocker::default();
mocker
.when::<Processing, std::result::Result<Processed, Failed>>("handle_update")
.once()
.then(|update| Err(update.fail(IndexError::ExistingPrimaryKey)));
Box::pin(ok(Some(Index::faux(mocker))))
});
let uuid_store = MockUuidStore::new();
let index_resolver = Arc::new(IndexResolver::new(uuid_store, index_store));
let update_file_store = UpdateFileStore::new(dir.path()).unwrap();
let mut options = EnvOpenOptions::new();
options.map_size(4096 * 100);
let store = UpdateStore::open(
options,
dir.path(),
index_resolver.clone(),
Arc::new(AtomicBool::new(false)),
update_file_store,
)
.unwrap();
// wait a bit for the event loop exit.
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
let mut txn = store.env.write_txn().unwrap();
let update = Enqueued::new(Update::ClearDocuments, 0);
store
.pending_queue
.put(&mut txn, &(0, index_uuid, 0), &update)
.unwrap();
txn.commit().unwrap();
// Process the pending, and check that it has been moved to the update databases, and
// removed from the pending database.
let store_clone = store.clone();
tokio::task::spawn_blocking(move || {
store_clone.process_pending_update(index_resolver).unwrap();
})
.await
.unwrap();
let txn = store.env.read_txn().unwrap();
assert!(store.pending_queue.first(&txn).unwrap().is_none());
let update = store.updates.get(&txn, &(index_uuid, 0)).unwrap().unwrap();
assert!(matches!(update, UpdateStatus::Failed(_)));
}
}

View File

@ -5,7 +5,8 @@ pub mod options;
pub mod index; pub mod index;
pub mod index_controller; pub mod index_controller;
pub use index_controller::{updates::store::Update, IndexController as MeiliSearch}; pub use index_controller::updates::store::Update;
pub use index_controller::MeiliSearch;
pub use milli; pub use milli;