WIP: IndexController

mpostma 2021-01-13 17:50:36 +01:00
parent b07e21ab3c
commit ddd7789713
6 changed files with 230 additions and 55 deletions

Cargo.lock (generated)

@ -777,6 +777,16 @@ dependencies = [
"memchr",
]
[[package]]
name = "dashmap"
version = "4.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e77a43b28d0668df09411cb0bc9a8c2adc40f9a048afe863e05fd43251e8e39c"
dependencies = [
"cfg-if 1.0.0",
"num_cpus",
]
[[package]]
name = "debugid"
version = "0.7.2"
@ -1201,7 +1211,6 @@ dependencies = [
"lmdb-rkv-sys",
"once_cell",
"page_size",
"serde",
"synchronoise",
"url",
"zerocopy",
@ -1588,7 +1597,7 @@ checksum = "7ffc5c5338469d4d3ea17d269fa8ea3512ad247247c30bd2df69e68309ed0a08"
[[package]]
name = "meilisearch-error"
version = "0.15.0"
version = "0.18.0"
dependencies = [
"actix-http",
]
@ -1609,6 +1618,7 @@ dependencies = [
"bytes 0.6.0",
"chrono",
"crossbeam-channel",
"dashmap",
"env_logger 0.8.2",
"flate2",
"fst",
@ -1627,6 +1637,7 @@ dependencies = [
"milli",
"mime",
"once_cell",
"page_size",
"rand 0.7.3",
"rayon",
"regex",
@ -1699,7 +1710,6 @@ dependencies = [
"bstr",
"byte-unit",
"byteorder",
"chrono",
"crossbeam-channel",
"csv",
"either",
@ -1714,13 +1724,13 @@ dependencies = [
"levenshtein_automata",
"linked-hash-map",
"log",
"meilisearch-tokenizer",
"memmap",
"near-proximity",
"num-traits",
"obkv",
"once_cell",
"ordered-float",
"page_size",
"pest 2.1.3 (git+https://github.com/pest-parser/pest.git?rev=51fd1d49f1041f7839975664ef71fe15c7dcaf67)",
"pest_derive",
"rayon",
@ -1729,7 +1739,6 @@ dependencies = [
"roaring",
"serde",
"serde_json",
"serde_millis",
"slice-group-by",
"smallstr",
"smallvec",
@ -2596,15 +2605,6 @@ dependencies = [
"serde",
]
[[package]]
name = "serde_millis"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e6e2dc780ca5ee2c369d1d01d100270203c4ff923d2a4264812d723766434d00"
dependencies = [
"serde",
]
[[package]]
name = "serde_qs"
version = "0.8.2"


@ -30,7 +30,7 @@ fst = "0.4.5"
futures = "0.3.7"
futures-util = "0.3.8"
grenad = { git = "https://github.com/Kerollmops/grenad.git", rev = "3adcb26" }
heed = "0.10.6"
heed = { version = "0.10.6", default-features = false, features = ["lmdb", "sync-read-txn"] }
http = "0.2.1"
indexmap = { version = "1.3.2", features = ["serde-1"] }
log = "0.4.8"
@ -58,6 +58,8 @@ tokio = { version = "0.2", features = ["full"] }
ureq = { version = "1.5.1", default-features = false, features = ["tls"] }
walkdir = "2.3.1"
whoami = "1.0.0"
dashmap = "4.0.2"
page_size = "0.4.2"
[dependencies.sentry]
default-features = false


@ -3,7 +3,6 @@ mod updates;
pub use search::{SearchQuery, SearchResult};
use std::collections::HashMap;
use std::fs::create_dir_all;
use std::ops::Deref;
use std::sync::Arc;
@ -13,6 +12,7 @@ use sha2::Digest;
use crate::{option::Opt, updates::Settings};
use crate::updates::UpdateQueue;
use crate::index_controller::IndexController;
#[derive(Clone)]
pub struct Data {
@ -29,7 +29,7 @@ impl Deref for Data {
#[derive(Clone)]
pub struct DataInner {
pub indexes: Arc<Index>,
pub indexes: Arc<IndexController>,
pub update_queue: Arc<UpdateQueue>,
api_keys: ApiKeys,
options: Opt,
@ -62,9 +62,7 @@ impl ApiKeys {
impl Data {
pub fn new(options: Opt) -> anyhow::Result<Data> {
let db_size = options.max_mdb_size.get_bytes() as usize;
let path = options.db_path.join("main");
create_dir_all(&path)?;
let indexes = Index::new(&path, Some(db_size))?;
let indexes = IndexController::new(&options.db_path)?;
let indexes = Arc::new(indexes);
let update_queue = Arc::new(UpdateQueue::new(&options, indexes.clone())?);
@ -90,28 +88,26 @@ impl Data {
let displayed_attributes = self.indexes
.displayed_fields(&txn)?
.map(|fields| {println!("{:?}", fields); fields.iter().filter_map(|f| fields_map.name(*f).map(String::from)).collect()})
.map(|fields| fields.into_iter().map(String::from).collect())
.unwrap_or_else(|| vec!["*".to_string()]);
let searchable_attributes = self.indexes
.searchable_fields(&txn)?
.map(|fields| fields
.iter()
.filter_map(|f| fields_map.name(*f).map(String::from))
.into_iter()
.map(String::from)
.collect())
.unwrap_or_else(|| vec!["*".to_string()]);
let faceted_attributes = self.indexes
.faceted_fields(&txn)?
.iter()
.filter_map(|(f, t)| Some((fields_map.name(*f)?.to_string(), t.to_string())))
.collect::<HashMap<_, _>>()
.into();
let faceted_attributes = self.indexes.faceted_fields(&txn)?
.into_iter()
.map(|(k, v)| (k, v.to_string()))
.collect();
Ok(Settings {
displayed_attributes: Some(Some(displayed_attributes)),
searchable_attributes: Some(Some(searchable_attributes)),
faceted_attributes: Some(faceted_attributes),
faceted_attributes: Some(Some(faceted_attributes)),
criteria: None,
})
}
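The switch from Some(faceted_attributes) to Some(Some(faceted_attributes)) lines this field up with the nested-Option convention the other settings already use: the outer Option says whether the field is part of the update at all, and the inner one distinguishes an explicit reset from a concrete value. Below is a rough sketch of how such a field is read back; the field names and types are inferred from this diff, not taken from the real Settings struct.

use std::collections::HashMap;

// Assumed shape, reconstructed only from the fields visible in the hunk above.
struct SettingsSketch {
    displayed_attributes: Option<Option<Vec<String>>>,
    searchable_attributes: Option<Option<Vec<String>>>,
    faceted_attributes: Option<Option<HashMap<String, String>>>,
}

// The three states a nested-Option setting can be in.
fn describe<T>(field: &Option<Option<T>>) -> &'static str {
    match field {
        None => "not mentioned by this update",
        Some(None) => "explicitly cleared",
        Some(Some(_)) => "set to a new value",
    }
}

fn main() {
    let settings = SettingsSketch {
        displayed_attributes: Some(Some(vec!["*".to_string()])),
        searchable_attributes: None,
        faceted_attributes: Some(None),
    };
    // Prints "explicitly cleared".
    println!("{}", describe(&settings.faceted_attributes));
}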


@ -1,4 +1,3 @@
use std::borrow::Cow;
use std::collections::HashSet;
use std::mem;
use std::time::Instant;
@ -8,17 +7,22 @@ use serde::{Deserialize, Serialize};
use milli::{SearchResult as Results, obkv_to_json};
use meilisearch_tokenizer::{Analyzer, AnalyzerConfig};
use crate::error::Error;
use super::Data;
const DEFAULT_SEARCH_LIMIT: usize = 20;
const fn default_search_limit() -> usize { DEFAULT_SEARCH_LIMIT }
#[derive(Deserialize)]
#[serde(rename_all = "camelCase", deny_unknown_fields)]
#[allow(dead_code)]
pub struct SearchQuery {
q: Option<String>,
offset: Option<usize>,
limit: Option<usize>,
#[serde(default = "default_search_limit")]
limit: usize,
attributes_to_retrieve: Option<Vec<String>>,
attributes_to_crop: Option<Vec<String>>,
crop_length: Option<usize>,
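Making limit a plain usize backed by #[serde(default = "default_search_limit")] means a request that omits the field deserializes straight to the default of 20, instead of carrying an Option that every caller has to unwrap (as the old search body did with unwrap_or). A minimal sketch of that behaviour, using a trimmed-down query type and an illustrative JSON body:

use serde::Deserialize;

const DEFAULT_SEARCH_LIMIT: usize = 20;
const fn default_search_limit() -> usize { DEFAULT_SEARCH_LIMIT }

#[derive(Deserialize)]
#[serde(rename_all = "camelCase", deny_unknown_fields)]
struct Query {
    q: Option<String>,
    offset: Option<usize>,
    // Absent from the payload -> filled in by default_search_limit().
    #[serde(default = "default_search_limit")]
    limit: usize,
}

fn main() -> serde_json::Result<()> {
    let query: Query = serde_json::from_str(r#"{ "q": "dashmap" }"#)?;
    assert_eq!(query.limit, 20);
    assert!(query.offset.is_none());
    Ok(())
}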
@ -100,30 +104,18 @@ impl<'a, A: AsRef<[u8]>> Highlighter<'a, A> {
}
impl Data {
pub fn search<S: AsRef<str>>(&self, _index: S, search_query: SearchQuery) -> anyhow::Result<SearchResult> {
pub fn search<S: AsRef<str>>(&self, index: S, search_query: SearchQuery) -> anyhow::Result<SearchResult> {
let start = Instant::now();
let index = &self.indexes;
let rtxn = index.read_txn()?;
let mut search = index.search(&rtxn);
if let Some(query) = &search_query.q {
search.query(query);
}
if let Some(offset) = search_query.offset {
search.offset(offset);
}
let limit = search_query.limit.unwrap_or(DEFAULT_SEARCH_LIMIT);
search.limit(limit);
let Results { found_words, documents_ids, nb_hits, .. } = search.execute().unwrap();
let index = self.indexes
.get(index)?
.ok_or_else(|| Error::OpenIndex(format!("Index {} doesn't exist.", index.as_ref())))?;
let Results { found_words, documents_ids, nb_hits, .. } = index.search(search_query)?;
let fields_ids_map = index.fields_ids_map(&rtxn).unwrap();
let displayed_fields = match index.displayed_fields(&rtxn).unwrap() {
Some(fields) => Cow::Borrowed(fields),
None => Cow::Owned(fields_ids_map.iter().map(|(id, _)| id).collect()),
let displayed_fields = match index.displayed_fields_ids(&rtxn).unwrap() {
Some(fields) => fields,
None => fields_ids_map.iter().map(|(id, _)| id).collect(),
};
let attributes_to_highlight = match search_query.attributes_to_highlight {

src/index_controller/mod.rs (new file)

@ -0,0 +1,187 @@
use std::fs::File;
use std::io::{Read, Write};
use std::path::Path;
use std::ops::Deref;
use anyhow::Result;
use chrono::{DateTime, Utc};
use dashmap::DashMap;
use heed::types::{Str, SerdeBincode};
use heed::{EnvOpenOptions, Env, Database};
use milli::Index;
use serde::{Serialize, Deserialize};
use crate::data::SearchQuery;
const CONTROLLER_META_FILENAME: &str = "index_controller_meta";
const INDEXES_CONTROLLER_FILENAME: &str = "indexes_db";
const INDEXES_DB_NAME: &str = "indexes_db";
trait UpdateStore {}
pub struct IndexController<U> {
update_store: U,
env: Env,
indexes_db: Database<Str, SerdeBincode<IndexMetadata>>,
indexes: DashMap<String, Index>,
}
#[derive(Debug, Serialize, Deserialize)]
struct IndexControllerMeta {
open_options: EnvOpenOptions,
created_at: DateTime<Utc>,
}
impl IndexControllerMeta {
fn from_path(path: impl AsRef<Path>) -> Result<Option<IndexControllerMeta>> {
let path = path.as_ref().join(CONTROLLER_META_FILENAME);
if path.exists() {
let mut file = File::open(path)?;
let mut buffer = Vec::new();
let n = file.read_to_end(&mut buffer)?;
let meta: IndexControllerMeta = serde_json::from_slice(&buffer[..n])?;
Ok(Some(meta))
} else {
Ok(None)
}
}
fn to_path(self, path: impl AsRef<Path>) -> Result<()> {
let path = path.as_ref().join(CONTROLLER_META_FILENAME);
if path.exists() {
Err(anyhow::anyhow!("Index controller metadata already exists"))
} else {
let mut file = File::create(path)?;
let json = serde_json::to_vec(&self)?;
file.write_all(&json)?;
Ok(())
}
}
}
#[derive(Debug, Serialize, Deserialize)]
struct IndexMetadata {
created_at: DateTime<Utc>,
open_options: EnvOpenOptions,
id: String,
}
impl IndexMetadata {
fn open_index(&self) -> Result<Index> {
todo!()
}
}
struct IndexView<'a, U> {
txn: heed::RoTxn<'a>,
index: &'a Index,
update_store: &'a U,
}
struct IndexViewMut<'a, U> {
txn: heed::RwTxn<'a>,
index: &'a Index,
update_store: &'a U,
}
impl<'a, U> Deref for IndexViewMut<'a, U> {
type Target = IndexView<'a, U>;
fn deref(&self) -> &Self::Target {
IndexView {
txn: *self.txn,
index: self.index,
update_store: self.update_store,
}
}
}
impl<'a, U: UpdateStore> IndexView<'a, U> {
fn search(&self, search_query: SearchQuery) -> Result<milli::SearchResult> {
let mut search = self.index.search(&self.txn);
if let Some(query) = &search_query.q {
search.query(query);
}
if let Some(offset) = search_query.offset {
search.offset(offset);
}
let limit = search_query.limit;
search.limit(limit);
Ok(search.execute()?)
}
}
impl<U: UpdateStore> IndexController<U> {
/// Open the index controller from meta found at path, and create a new one if no meta is
/// found.
pub fn new(path: impl AsRef<Path>, update_store: U) -> Result<Self> {
// If index controller metadata is present, we return the env, otherwise, we create a new
// metadata from scratch before returning a new env.
let env = match IndexControllerMeta::from_path(&path)? {
Some(meta) => meta.open_options.open(INDEXES_CONTROLLER_FILENAME)?,
None => {
// EnvOpenOptions is a builder: configure it, then open. 1000 OS pages is roughly 4 MiB with 4 KiB pages.
let mut open_options = EnvOpenOptions::new();
open_options.map_size(page_size::get() * 1000);
let env = open_options.open(INDEXES_CONTROLLER_FILENAME)?;
let created_at = Utc::now();
let meta = IndexControllerMeta { open_options, created_at };
meta.to_path(&path)?;
env
}
};
let indexes = DashMap::new();
let indexes_db = match env.open_database(Some(INDEXES_DB_NAME))? {
Some(indexes_db) => indexes_db,
None => env.create_database(Some(INDEXES_DB_NAME))?,
};
Ok(Self { env, indexes, indexes_db, update_store })
}
pub fn get_or_create<S: AsRef<str>>(&mut self, name: S) -> Result<IndexViewMut<'_, U>> {
todo!()
}
/// Get an index with read access to the db. Indexes are lazily loaded: we first check for the
/// index in the in-memory map, and if it is not there, the indexes db is checked for metadata to
/// open the index.
pub fn get<S: AsRef<str>>(&self, name: S) -> Result<Option<IndexView<'_, U>>> {
match self.indexes.get(name.as_ref()) {
Some(index) => {
let txn = index.read_txn()?;
let update_store = &self.update_store;
Ok(Some(IndexView { index, update_store, txn }))
}
None => {
let txn = self.env.read_txn()?;
match self.indexes_db.get(&txn, name.as_ref())? {
Some(meta) => {
let index = meta.open_index()?;
self.indexes.insert(name.as_ref().to_owned(), index);
Ok(self.indexes.get(name.as_ref()))
}
None => Ok(None)
}
}
}
}
pub fn get_mut<S: AsRef<str>>(&self, name: S) -> Result<Option<IndexViewMut<'_, U>>> {
todo!()
}
pub async fn delete_index<S: AsRef<str>>(&self, name: S) -> Result<()> {
todo!()
}
pub async fn list_indices(&self) -> Result<Vec<(String, IndexMetadata)>> {
todo!()
}
pub async fn rename_index(&self, old: &str, new: &str) -> Result<()> {
todo!()
}
}
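For orientation, here is a hedged usage sketch of the controller in its current WIP shape, written as if it sat next to this module. NoopUpdateStore, the "movies" index name and the query body are placeholders, not part of the commit, and the lookup goes through the lazy get described above.

// Placeholder store: the UpdateStore trait has no methods yet in this commit.
struct NoopUpdateStore;
impl UpdateStore for NoopUpdateStore {}

fn example(db_path: &std::path::Path) -> anyhow::Result<()> {
    // Reuses existing controller metadata if present, otherwise writes a fresh meta file.
    let controller = IndexController::new(db_path, NoopUpdateStore)?;
    // Lazy lookup: the in-memory DashMap first, then the persisted indexes db.
    if let Some(index) = controller.get("movies")? {
        let query: SearchQuery = serde_json::from_str(r#"{ "q": "wip" }"#)?;
        let _results = index.search(query)?;
    }
    Ok(())
}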


@ -6,9 +6,7 @@ pub mod helpers;
pub mod option;
pub mod routes;
mod updates;
//pub mod analytics;
//pub mod snapshot;
//pub mod dump;
mod index_controller;
use actix_http::Error;
use actix_service::ServiceFactory;