mirror of
https://github.com/meilisearch/meilisearch.git
synced 2024-12-02 10:05:05 +08:00
Merge pull request #27 from meilisearch/create-index
Implement create index
This commit is contained in:
commit
f44f8a823a
@ -9,8 +9,8 @@ use std::sync::Arc;
|
|||||||
|
|
||||||
use sha2::Digest;
|
use sha2::Digest;
|
||||||
|
|
||||||
use crate::index_controller::{IndexController, LocalIndexController};
|
use crate::index_controller::{IndexController, LocalIndexController, IndexMetadata, Settings, IndexSettings};
|
||||||
use crate::{option::Opt, index_controller::Settings};
|
use crate::option::Opt;
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct Data {
|
pub struct Data {
|
||||||
@ -114,6 +114,27 @@ impl Data {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn list_indexes(&self) -> anyhow::Result<Vec<IndexMetadata>> {
|
||||||
|
self.index_controller.list_indexes()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn index(&self, name: impl AsRef<str>) -> anyhow::Result<Option<IndexMetadata>> {
|
||||||
|
Ok(self
|
||||||
|
.list_indexes()?
|
||||||
|
.into_iter()
|
||||||
|
.find(|i| i.name == name.as_ref()))
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn create_index(&self, name: impl AsRef<str>, primary_key: Option<impl AsRef<str>>) -> anyhow::Result<IndexMetadata> {
|
||||||
|
let settings = IndexSettings {
|
||||||
|
name: Some(name.as_ref().to_string()),
|
||||||
|
primary_key: primary_key.map(|s| s.as_ref().to_string()),
|
||||||
|
};
|
||||||
|
|
||||||
|
let meta = self.index_controller.create_index(settings)?;
|
||||||
|
Ok(meta)
|
||||||
|
}
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn http_payload_size_limit(&self) -> usize {
|
pub fn http_payload_size_limit(&self) -> usize {
|
||||||
self.options.http_payload_size_limit.get_bytes() as usize
|
self.options.http_payload_size_limit.get_bytes() as usize
|
||||||
|
@ -28,6 +28,13 @@ impl fmt::Display for ResponseError {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO: remove this when implementing actual error handling
|
||||||
|
impl From<anyhow::Error> for ResponseError {
|
||||||
|
fn from(other: anyhow::Error) -> ResponseError {
|
||||||
|
ResponseError { inner: Box::new(Error::NotFound(other.to_string())) }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl From<Error> for ResponseError {
|
impl From<Error> for ResponseError {
|
||||||
fn from(error: Error) -> ResponseError {
|
fn from(error: Error) -> ResponseError {
|
||||||
ResponseError { inner: Box::new(error) }
|
ResponseError { inner: Box::new(error) }
|
||||||
|
@ -2,8 +2,11 @@ use std::fs::{create_dir_all, remove_dir_all};
|
|||||||
use std::path::{Path, PathBuf};
|
use std::path::{Path, PathBuf};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use anyhow::{bail, Context};
|
||||||
|
use chrono::{DateTime, Utc};
|
||||||
use dashmap::{DashMap, mapref::entry::Entry};
|
use dashmap::{DashMap, mapref::entry::Entry};
|
||||||
use heed::{Env, EnvOpenOptions, Database, types::{Str, SerdeJson, ByteSlice}, RoTxn, RwTxn};
|
use heed::{Env, EnvOpenOptions, Database, types::{Str, SerdeJson, ByteSlice}, RoTxn, RwTxn};
|
||||||
|
use log::error;
|
||||||
use milli::Index;
|
use milli::Index;
|
||||||
use rayon::ThreadPool;
|
use rayon::ThreadPool;
|
||||||
use serde::{Serialize, Deserialize};
|
use serde::{Serialize, Deserialize};
|
||||||
@ -16,10 +19,11 @@ use super::{UpdateMeta, UpdateResult};
|
|||||||
type UpdateStore = super::update_store::UpdateStore<UpdateMeta, UpdateResult, String>;
|
type UpdateStore = super::update_store::UpdateStore<UpdateMeta, UpdateResult, String>;
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize, Debug, PartialEq)]
|
#[derive(Serialize, Deserialize, Debug, PartialEq)]
|
||||||
struct IndexMeta {
|
pub struct IndexMeta {
|
||||||
update_store_size: u64,
|
update_store_size: u64,
|
||||||
index_store_size: u64,
|
index_store_size: u64,
|
||||||
uuid: Uuid,
|
pub uuid: Uuid,
|
||||||
|
pub created_at: DateTime<Utc>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl IndexMeta {
|
impl IndexMeta {
|
||||||
@ -50,9 +54,9 @@ impl IndexMeta {
|
|||||||
|
|
||||||
pub struct IndexStore {
|
pub struct IndexStore {
|
||||||
env: Env,
|
env: Env,
|
||||||
name_to_uuid_meta: Database<Str, ByteSlice>,
|
name_to_uuid: Database<Str, ByteSlice>,
|
||||||
uuid_to_index: DashMap<Uuid, (Arc<Index>, Arc<UpdateStore>)>,
|
uuid_to_index: DashMap<Uuid, (Arc<Index>, Arc<UpdateStore>)>,
|
||||||
uuid_to_index_db: Database<ByteSlice, SerdeJson<IndexMeta>>,
|
uuid_to_index_meta: Database<ByteSlice, SerdeJson<IndexMeta>>,
|
||||||
|
|
||||||
thread_pool: Arc<ThreadPool>,
|
thread_pool: Arc<ThreadPool>,
|
||||||
indexer_options: IndexerOpts,
|
indexer_options: IndexerOpts,
|
||||||
@ -65,9 +69,9 @@ impl IndexStore {
|
|||||||
.max_dbs(2)
|
.max_dbs(2)
|
||||||
.open(path)?;
|
.open(path)?;
|
||||||
|
|
||||||
let uid_to_index = DashMap::new();
|
let uuid_to_index = DashMap::new();
|
||||||
let name_to_uid_db = open_or_create_database(&env, Some("name_to_uid"))?;
|
let name_to_uuid = open_or_create_database(&env, Some("name_to_uid"))?;
|
||||||
let uid_to_index_db = open_or_create_database(&env, Some("uid_to_index_db"))?;
|
let uuid_to_index_meta = open_or_create_database(&env, Some("uid_to_index_db"))?;
|
||||||
|
|
||||||
let thread_pool = rayon::ThreadPoolBuilder::new()
|
let thread_pool = rayon::ThreadPoolBuilder::new()
|
||||||
.num_threads(indexer_options.indexing_jobs.unwrap_or(0))
|
.num_threads(indexer_options.indexing_jobs.unwrap_or(0))
|
||||||
@ -76,9 +80,9 @@ impl IndexStore {
|
|||||||
|
|
||||||
Ok(Self {
|
Ok(Self {
|
||||||
env,
|
env,
|
||||||
name_to_uuid_meta: name_to_uid_db,
|
name_to_uuid,
|
||||||
uuid_to_index: uid_to_index,
|
uuid_to_index,
|
||||||
uuid_to_index_db: uid_to_index_db,
|
uuid_to_index_meta,
|
||||||
|
|
||||||
thread_pool,
|
thread_pool,
|
||||||
indexer_options,
|
indexer_options,
|
||||||
@ -86,7 +90,7 @@ impl IndexStore {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn index_uuid(&self, txn: &RoTxn, name: impl AsRef<str>) -> anyhow::Result<Option<Uuid>> {
|
fn index_uuid(&self, txn: &RoTxn, name: impl AsRef<str>) -> anyhow::Result<Option<Uuid>> {
|
||||||
match self.name_to_uuid_meta.get(txn, name.as_ref())? {
|
match self.name_to_uuid.get(txn, name.as_ref())? {
|
||||||
Some(bytes) => {
|
Some(bytes) => {
|
||||||
let uuid = Uuid::from_slice(bytes)?;
|
let uuid = Uuid::from_slice(bytes)?;
|
||||||
Ok(Some(uuid))
|
Ok(Some(uuid))
|
||||||
@ -98,7 +102,7 @@ impl IndexStore {
|
|||||||
fn retrieve_index(&self, txn: &RoTxn, uid: Uuid) -> anyhow::Result<Option<(Arc<Index>, Arc<UpdateStore>)>> {
|
fn retrieve_index(&self, txn: &RoTxn, uid: Uuid) -> anyhow::Result<Option<(Arc<Index>, Arc<UpdateStore>)>> {
|
||||||
match self.uuid_to_index.entry(uid.clone()) {
|
match self.uuid_to_index.entry(uid.clone()) {
|
||||||
Entry::Vacant(entry) => {
|
Entry::Vacant(entry) => {
|
||||||
match self.uuid_to_index_db.get(txn, uid.as_bytes())? {
|
match self.uuid_to_index_meta.get(txn, uid.as_bytes())? {
|
||||||
Some(meta) => {
|
Some(meta) => {
|
||||||
let path = self.env.path();
|
let path = self.env.path();
|
||||||
let (index, updates) = meta.open(path, self.thread_pool.clone(), &self.indexer_options)?;
|
let (index, updates) = meta.open(path, self.thread_pool.clone(), &self.indexer_options)?;
|
||||||
@ -138,14 +142,14 @@ impl IndexStore {
|
|||||||
Some(res) => Ok(res),
|
Some(res) => Ok(res),
|
||||||
None => {
|
None => {
|
||||||
let uuid = Uuid::new_v4();
|
let uuid = Uuid::new_v4();
|
||||||
let result = self.create_index(&mut txn, uuid, name, update_size, index_size)?;
|
let (index, updates, _) = self.create_index_txn(&mut txn, uuid, name, update_size, index_size)?;
|
||||||
// If we fail to commit the transaction, we must delete the database from the
|
// If we fail to commit the transaction, we must delete the database from the
|
||||||
// file-system.
|
// file-system.
|
||||||
if let Err(e) = txn.commit() {
|
if let Err(e) = txn.commit() {
|
||||||
self.clean_db(uuid);
|
self.clean_db(uuid);
|
||||||
return Err(e)?;
|
return Err(e)?;
|
||||||
}
|
}
|
||||||
Ok(result)
|
Ok((index, updates))
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -161,17 +165,18 @@ impl IndexStore {
|
|||||||
self.uuid_to_index.remove(&uuid);
|
self.uuid_to_index.remove(&uuid);
|
||||||
}
|
}
|
||||||
|
|
||||||
fn create_index( &self,
|
fn create_index_txn( &self,
|
||||||
txn: &mut RwTxn,
|
txn: &mut RwTxn,
|
||||||
uuid: Uuid,
|
uuid: Uuid,
|
||||||
name: impl AsRef<str>,
|
name: impl AsRef<str>,
|
||||||
update_size: u64,
|
update_store_size: u64,
|
||||||
index_size: u64,
|
index_store_size: u64,
|
||||||
) -> anyhow::Result<(Arc<Index>, Arc<UpdateStore>)> {
|
) -> anyhow::Result<(Arc<Index>, Arc<UpdateStore>, IndexMeta)> {
|
||||||
let meta = IndexMeta { update_store_size: update_size, index_store_size: index_size, uuid: uuid.clone() };
|
let created_at = Utc::now();
|
||||||
|
let meta = IndexMeta { update_store_size, index_store_size, uuid: uuid.clone(), created_at };
|
||||||
|
|
||||||
self.name_to_uuid_meta.put(txn, name.as_ref(), uuid.as_bytes())?;
|
self.name_to_uuid.put(txn, name.as_ref(), uuid.as_bytes())?;
|
||||||
self.uuid_to_index_db.put(txn, uuid.as_bytes(), &meta)?;
|
self.uuid_to_index_meta.put(txn, uuid.as_bytes(), &meta)?;
|
||||||
|
|
||||||
let path = self.env.path();
|
let path = self.env.path();
|
||||||
let (index, update_store) = match meta.open(path, self.thread_pool.clone(), &self.indexer_options) {
|
let (index, update_store) = match meta.open(path, self.thread_pool.clone(), &self.indexer_options) {
|
||||||
@ -184,7 +189,56 @@ impl IndexStore {
|
|||||||
|
|
||||||
self.uuid_to_index.insert(uuid, (index.clone(), update_store.clone()));
|
self.uuid_to_index.insert(uuid, (index.clone(), update_store.clone()));
|
||||||
|
|
||||||
Ok((index, update_store))
|
Ok((index, update_store, meta))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Same as `get_or_create`, but returns an error if the index already exists.
|
||||||
|
pub fn create_index(
|
||||||
|
&self,
|
||||||
|
name: impl AsRef<str>,
|
||||||
|
update_size: u64,
|
||||||
|
index_size: u64,
|
||||||
|
) -> anyhow::Result<(Arc<Index>, Arc<UpdateStore>, IndexMeta)> {
|
||||||
|
let uuid = Uuid::new_v4();
|
||||||
|
let mut txn = self.env.write_txn()?;
|
||||||
|
|
||||||
|
if self.name_to_uuid.get(&txn, name.as_ref())?.is_some() {
|
||||||
|
bail!("cannot create index {:?}: an index with this name already exists.")
|
||||||
|
}
|
||||||
|
|
||||||
|
let result = self.create_index_txn(&mut txn, uuid, name, update_size, index_size)?;
|
||||||
|
// If we fail to commit the transaction, we must delete the database from the
|
||||||
|
// file-system.
|
||||||
|
if let Err(e) = txn.commit() {
|
||||||
|
self.clean_db(uuid);
|
||||||
|
return Err(e)?;
|
||||||
|
}
|
||||||
|
Ok(result)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns each index associated with its metadata:
|
||||||
|
/// (index_name, IndexMeta, primary_key)
|
||||||
|
/// This method will force all the indexes to be loaded.
|
||||||
|
pub fn list_indexes(&self) -> anyhow::Result<Vec<(String, IndexMeta, Option<String>)>> {
|
||||||
|
let txn = self.env.read_txn()?;
|
||||||
|
let metas = self.name_to_uuid
|
||||||
|
.iter(&txn)?
|
||||||
|
.filter_map(|entry| entry
|
||||||
|
.map_err(|e| { error!("error decoding entry while listing indexes: {}", e); e }).ok());
|
||||||
|
let mut indexes = Vec::new();
|
||||||
|
for (name, uuid) in metas {
|
||||||
|
// get index to retrieve primary key
|
||||||
|
let (index, _) = self.get_index_txn(&txn, name)?
|
||||||
|
.with_context(|| format!("could not load index {:?}", name))?;
|
||||||
|
let primary_key = index.primary_key(&index.read_txn()?)?
|
||||||
|
.map(String::from);
|
||||||
|
// retieve meta
|
||||||
|
let meta = self.uuid_to_index_meta
|
||||||
|
.get(&txn, &uuid)?
|
||||||
|
.with_context(|| format!("could not retieve meta for index {:?}", name))?;
|
||||||
|
indexes.push((name.to_owned(), meta, primary_key));
|
||||||
|
}
|
||||||
|
Ok(indexes)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -247,7 +301,7 @@ mod test {
|
|||||||
// insert an uuid in the the name_to_uuid_db:
|
// insert an uuid in the the name_to_uuid_db:
|
||||||
let uuid = Uuid::new_v4();
|
let uuid = Uuid::new_v4();
|
||||||
let mut txn = store.env.write_txn().unwrap();
|
let mut txn = store.env.write_txn().unwrap();
|
||||||
store.name_to_uuid_meta.put(&mut txn, &name, uuid.as_bytes()).unwrap();
|
store.name_to_uuid.put(&mut txn, &name, uuid.as_bytes()).unwrap();
|
||||||
txn.commit().unwrap();
|
txn.commit().unwrap();
|
||||||
|
|
||||||
// check that the uuid is there
|
// check that the uuid is there
|
||||||
@ -264,9 +318,14 @@ mod test {
|
|||||||
let txn = store.env.read_txn().unwrap();
|
let txn = store.env.read_txn().unwrap();
|
||||||
assert!(store.retrieve_index(&txn, uuid).unwrap().is_none());
|
assert!(store.retrieve_index(&txn, uuid).unwrap().is_none());
|
||||||
|
|
||||||
let meta = IndexMeta { update_store_size: 4096 * 100, index_store_size: 4096 * 100, uuid: uuid.clone() };
|
let meta = IndexMeta {
|
||||||
|
update_store_size: 4096 * 100,
|
||||||
|
index_store_size: 4096 * 100,
|
||||||
|
uuid: uuid.clone(),
|
||||||
|
created_at: Utc::now(),
|
||||||
|
};
|
||||||
let mut txn = store.env.write_txn().unwrap();
|
let mut txn = store.env.write_txn().unwrap();
|
||||||
store.uuid_to_index_db.put(&mut txn, uuid.as_bytes(), &meta).unwrap();
|
store.uuid_to_index_meta.put(&mut txn, uuid.as_bytes(), &meta).unwrap();
|
||||||
txn.commit().unwrap();
|
txn.commit().unwrap();
|
||||||
|
|
||||||
// the index cache should be empty
|
// the index cache should be empty
|
||||||
@ -286,10 +345,15 @@ mod test {
|
|||||||
assert!(store.index(&name).unwrap().is_none());
|
assert!(store.index(&name).unwrap().is_none());
|
||||||
|
|
||||||
let uuid = Uuid::new_v4();
|
let uuid = Uuid::new_v4();
|
||||||
let meta = IndexMeta { update_store_size: 4096 * 100, index_store_size: 4096 * 100, uuid: uuid.clone() };
|
let meta = IndexMeta {
|
||||||
|
update_store_size: 4096 * 100,
|
||||||
|
index_store_size: 4096 * 100,
|
||||||
|
uuid: uuid.clone(),
|
||||||
|
created_at: Utc::now(),
|
||||||
|
};
|
||||||
let mut txn = store.env.write_txn().unwrap();
|
let mut txn = store.env.write_txn().unwrap();
|
||||||
store.name_to_uuid_meta.put(&mut txn, &name, uuid.as_bytes()).unwrap();
|
store.name_to_uuid.put(&mut txn, &name, uuid.as_bytes()).unwrap();
|
||||||
store.uuid_to_index_db.put(&mut txn, uuid.as_bytes(), &meta).unwrap();
|
store.uuid_to_index_meta.put(&mut txn, uuid.as_bytes(), &meta).unwrap();
|
||||||
txn.commit().unwrap();
|
txn.commit().unwrap();
|
||||||
|
|
||||||
assert!(store.index(&name).unwrap().is_some());
|
assert!(store.index(&name).unwrap().is_some());
|
||||||
@ -301,14 +365,18 @@ mod test {
|
|||||||
let store = IndexStore::new(temp, IndexerOpts::default()).unwrap();
|
let store = IndexStore::new(temp, IndexerOpts::default()).unwrap();
|
||||||
let name = "foobar";
|
let name = "foobar";
|
||||||
|
|
||||||
store.get_or_create_index(&name, 4096 * 100, 4096 * 100).unwrap();
|
let update_store_size = 4096 * 100;
|
||||||
|
let index_store_size = 4096 * 100;
|
||||||
|
store.get_or_create_index(&name, update_store_size, index_store_size).unwrap();
|
||||||
let txn = store.env.read_txn().unwrap();
|
let txn = store.env.read_txn().unwrap();
|
||||||
let uuid = store.name_to_uuid_meta.get(&txn, &name).unwrap();
|
let uuid = store.name_to_uuid.get(&txn, &name).unwrap();
|
||||||
assert_eq!(store.uuid_to_index.len(), 1);
|
assert_eq!(store.uuid_to_index.len(), 1);
|
||||||
assert!(uuid.is_some());
|
assert!(uuid.is_some());
|
||||||
let uuid = Uuid::from_slice(uuid.unwrap()).unwrap();
|
let uuid = Uuid::from_slice(uuid.unwrap()).unwrap();
|
||||||
let meta = IndexMeta { update_store_size: 4096 * 100, index_store_size: 4096 * 100, uuid: uuid.clone() };
|
let meta = store.uuid_to_index_meta.get(&txn, uuid.as_bytes()).unwrap().unwrap();
|
||||||
assert_eq!(store.uuid_to_index_db.get(&txn, uuid.as_bytes()).unwrap(), Some(meta));
|
assert_eq!(meta.update_store_size, update_store_size);
|
||||||
|
assert_eq!(meta.index_store_size, index_store_size);
|
||||||
|
assert_eq!(meta.uuid, uuid);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
@ -317,17 +385,19 @@ mod test {
|
|||||||
let store = IndexStore::new(temp, IndexerOpts::default()).unwrap();
|
let store = IndexStore::new(temp, IndexerOpts::default()).unwrap();
|
||||||
let name = "foobar";
|
let name = "foobar";
|
||||||
|
|
||||||
let update_size = 4096 * 100;
|
let update_store_size = 4096 * 100;
|
||||||
let index_size = 4096 * 100;
|
let index_store_size = 4096 * 100;
|
||||||
let uuid = Uuid::new_v4();
|
let uuid = Uuid::new_v4();
|
||||||
let mut txn = store.env.write_txn().unwrap();
|
let mut txn = store.env.write_txn().unwrap();
|
||||||
store.create_index(&mut txn, uuid, name, update_size, index_size).unwrap();
|
store.create_index_txn(&mut txn, uuid, name, update_store_size, index_store_size).unwrap();
|
||||||
let uuid = store.name_to_uuid_meta.get(&txn, &name).unwrap();
|
let uuid = store.name_to_uuid.get(&txn, &name).unwrap();
|
||||||
assert_eq!(store.uuid_to_index.len(), 1);
|
assert_eq!(store.uuid_to_index.len(), 1);
|
||||||
assert!(uuid.is_some());
|
assert!(uuid.is_some());
|
||||||
let uuid = Uuid::from_slice(uuid.unwrap()).unwrap();
|
let uuid = Uuid::from_slice(uuid.unwrap()).unwrap();
|
||||||
let meta = IndexMeta { update_store_size: update_size , index_store_size: index_size, uuid: uuid.clone() };
|
let meta = store.uuid_to_index_meta.get(&txn, uuid.as_bytes()).unwrap().unwrap();
|
||||||
assert_eq!(store.uuid_to_index_db.get(&txn, uuid.as_bytes()).unwrap(), Some(meta));
|
assert_eq!(meta.update_store_size, update_store_size);
|
||||||
|
assert_eq!(meta.index_store_size, index_store_size);
|
||||||
|
assert_eq!(meta.uuid, uuid);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -5,7 +5,7 @@ mod update_handler;
|
|||||||
use std::path::Path;
|
use std::path::Path;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use anyhow::bail;
|
use anyhow::{bail, Context};
|
||||||
use itertools::Itertools;
|
use itertools::Itertools;
|
||||||
use milli::Index;
|
use milli::Index;
|
||||||
|
|
||||||
@ -13,7 +13,7 @@ use crate::option::IndexerOpts;
|
|||||||
use index_store::IndexStore;
|
use index_store::IndexStore;
|
||||||
use super::IndexController;
|
use super::IndexController;
|
||||||
use super::updates::UpdateStatus;
|
use super::updates::UpdateStatus;
|
||||||
use super::{UpdateMeta, UpdateResult};
|
use super::{UpdateMeta, UpdateResult, IndexMetadata, IndexSettings};
|
||||||
|
|
||||||
pub struct LocalIndexController {
|
pub struct LocalIndexController {
|
||||||
indexes: IndexStore,
|
indexes: IndexStore,
|
||||||
@ -58,8 +58,25 @@ impl IndexController for LocalIndexController {
|
|||||||
Ok(pending.into())
|
Ok(pending.into())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn create_index<S: AsRef<str>>(&self, _index_uid: S) -> anyhow::Result<()> {
|
fn create_index(&self, index_settings: IndexSettings) -> anyhow::Result<IndexMetadata> {
|
||||||
todo!()
|
let index_name = index_settings.name.context("Missing name for index")?;
|
||||||
|
let (index, _, meta) = self.indexes.create_index(&index_name, self.update_db_size, self.index_db_size)?;
|
||||||
|
if let Some(ref primary_key) = index_settings.primary_key {
|
||||||
|
if let Err(e) = update_primary_key(index, primary_key).context("error creating index") {
|
||||||
|
// TODO: creating index could not be completed, delete everything.
|
||||||
|
Err(e)?
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let meta = IndexMetadata {
|
||||||
|
name: index_name,
|
||||||
|
uuid: meta.uuid.clone(),
|
||||||
|
created_at: meta.created_at,
|
||||||
|
updated_at: meta.created_at,
|
||||||
|
primary_key: index_settings.primary_key,
|
||||||
|
};
|
||||||
|
|
||||||
|
Ok(meta)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn delete_index<S: AsRef<str>>(&self, _index_uid: S) -> anyhow::Result<()> {
|
fn delete_index<S: AsRef<str>>(&self, _index_uid: S) -> anyhow::Result<()> {
|
||||||
@ -102,4 +119,54 @@ impl IndexController for LocalIndexController {
|
|||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn list_indexes(&self) -> anyhow::Result<Vec<IndexMetadata>> {
|
||||||
|
let metas = self.indexes.list_indexes()?;
|
||||||
|
let mut output_meta = Vec::new();
|
||||||
|
for (name, meta, primary_key) in metas {
|
||||||
|
let created_at = meta.created_at;
|
||||||
|
let uuid = meta.uuid;
|
||||||
|
let updated_at = self
|
||||||
|
.all_update_status(&name)?
|
||||||
|
.iter()
|
||||||
|
.filter_map(|u| u.processed().map(|u| u.processed_at))
|
||||||
|
.max()
|
||||||
|
.unwrap_or(created_at);
|
||||||
|
|
||||||
|
let index_meta = IndexMetadata {
|
||||||
|
name,
|
||||||
|
created_at,
|
||||||
|
updated_at,
|
||||||
|
uuid,
|
||||||
|
primary_key,
|
||||||
|
};
|
||||||
|
output_meta.push(index_meta);
|
||||||
|
}
|
||||||
|
Ok(output_meta)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn update_primary_key(index: impl AsRef<Index>, primary_key: impl AsRef<str>) -> anyhow::Result<()> {
|
||||||
|
let index = index.as_ref();
|
||||||
|
let mut txn = index.write_txn()?;
|
||||||
|
if index.primary_key(&txn)?.is_some() {
|
||||||
|
bail!("primary key already set.")
|
||||||
|
}
|
||||||
|
index.put_primary_key(&mut txn, primary_key.as_ref())?;
|
||||||
|
txn.commit()?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod test {
|
||||||
|
use super::*;
|
||||||
|
use tempfile::tempdir;
|
||||||
|
use crate::make_index_controller_tests;
|
||||||
|
|
||||||
|
make_index_controller_tests!({
|
||||||
|
let options = IndexerOpts::default();
|
||||||
|
let path = tempdir().unwrap();
|
||||||
|
let size = 4096 * 100;
|
||||||
|
LocalIndexController::new(path, options, size, size).unwrap()
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
@ -8,14 +8,26 @@ use std::num::NonZeroUsize;
|
|||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
|
use chrono::{DateTime, Utc};
|
||||||
use milli::Index;
|
use milli::Index;
|
||||||
use milli::update::{IndexDocumentsMethod, UpdateFormat, DocumentAdditionResult};
|
use milli::update::{IndexDocumentsMethod, UpdateFormat, DocumentAdditionResult};
|
||||||
use serde::{Serialize, Deserialize, de::Deserializer};
|
use serde::{Serialize, Deserialize, de::Deserializer};
|
||||||
|
use uuid::Uuid;
|
||||||
|
|
||||||
pub use updates::{Processed, Processing, Failed};
|
pub use updates::{Processed, Processing, Failed};
|
||||||
|
|
||||||
pub type UpdateStatus = updates::UpdateStatus<UpdateMeta, UpdateResult, String>;
|
pub type UpdateStatus = updates::UpdateStatus<UpdateMeta, UpdateResult, String>;
|
||||||
|
|
||||||
|
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||||
|
#[serde(rename_all = "camelCase")]
|
||||||
|
pub struct IndexMetadata {
|
||||||
|
pub name: String,
|
||||||
|
uuid: Uuid,
|
||||||
|
created_at: DateTime<Utc>,
|
||||||
|
updated_at: DateTime<Utc>,
|
||||||
|
primary_key: Option<String>,
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
#[serde(tag = "type")]
|
#[serde(tag = "type")]
|
||||||
pub enum UpdateMeta {
|
pub enum UpdateMeta {
|
||||||
@ -85,6 +97,11 @@ pub enum UpdateResult {
|
|||||||
Other,
|
Other,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub struct IndexSettings {
|
||||||
|
pub name: Option<String>,
|
||||||
|
pub primary_key: Option<String>,
|
||||||
|
}
|
||||||
|
|
||||||
/// The `IndexController` is in charge of the access to the underlying indices. It splits the logic
|
/// The `IndexController` is in charge of the access to the underlying indices. It splits the logic
|
||||||
/// for read access which is provided thanks to an handle to the index, and write access which must
|
/// for read access which is provided thanks to an handle to the index, and write access which must
|
||||||
/// be provided. This allows the implementer to define the behaviour of write accesses to the
|
/// be provided. This allows the implementer to define the behaviour of write accesses to the
|
||||||
@ -115,7 +132,7 @@ pub trait IndexController {
|
|||||||
fn update_settings<S: AsRef<str>>(&self, index_uid: S, settings: Settings) -> anyhow::Result<UpdateStatus>;
|
fn update_settings<S: AsRef<str>>(&self, index_uid: S, settings: Settings) -> anyhow::Result<UpdateStatus>;
|
||||||
|
|
||||||
/// Create an index with the given `index_uid`.
|
/// Create an index with the given `index_uid`.
|
||||||
fn create_index<S: AsRef<str>>(&self, index_uid: S) -> Result<()>;
|
fn create_index(&self, index_settings: IndexSettings) -> Result<IndexMetadata>;
|
||||||
|
|
||||||
/// Delete index with the given `index_uid`, attempting to close it beforehand.
|
/// Delete index with the given `index_uid`, attempting to close it beforehand.
|
||||||
fn delete_index<S: AsRef<str>>(&self, index_uid: S) -> Result<()>;
|
fn delete_index<S: AsRef<str>>(&self, index_uid: S) -> Result<()>;
|
||||||
@ -140,4 +157,57 @@ pub trait IndexController {
|
|||||||
|
|
||||||
fn update_status(&self, index: impl AsRef<str>, id: u64) -> anyhow::Result<Option<UpdateStatus>>;
|
fn update_status(&self, index: impl AsRef<str>, id: u64) -> anyhow::Result<Option<UpdateStatus>>;
|
||||||
fn all_update_status(&self, index: impl AsRef<str>) -> anyhow::Result<Vec<UpdateStatus>>;
|
fn all_update_status(&self, index: impl AsRef<str>) -> anyhow::Result<Vec<UpdateStatus>>;
|
||||||
|
|
||||||
|
fn list_indexes(&self) -> anyhow::Result<Vec<IndexMetadata>>;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
#[macro_use]
|
||||||
|
pub(crate) mod test {
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
#[macro_export]
|
||||||
|
macro_rules! make_index_controller_tests {
|
||||||
|
($controller_buider:block) => {
|
||||||
|
#[test]
|
||||||
|
fn test_create_and_list_indexes() {
|
||||||
|
crate::index_controller::test::create_and_list_indexes($controller_buider);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_create_index_with_no_name_is_error() {
|
||||||
|
crate::index_controller::test::create_index_with_no_name_is_error($controller_buider);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn create_and_list_indexes<S: IndexController>(controller: S) {
|
||||||
|
let settings1 = IndexSettings {
|
||||||
|
name: Some(String::from("test_index")),
|
||||||
|
primary_key: None,
|
||||||
|
};
|
||||||
|
|
||||||
|
let settings2 = IndexSettings {
|
||||||
|
name: Some(String::from("test_index2")),
|
||||||
|
primary_key: Some(String::from("foo")),
|
||||||
|
};
|
||||||
|
|
||||||
|
controller.create_index(settings1).unwrap();
|
||||||
|
controller.create_index(settings2).unwrap();
|
||||||
|
|
||||||
|
let indexes = controller.list_indexes().unwrap();
|
||||||
|
assert_eq!(indexes.len(), 2);
|
||||||
|
assert_eq!(indexes[0].name, "test_index");
|
||||||
|
assert_eq!(indexes[1].name, "test_index2");
|
||||||
|
assert_eq!(indexes[1].primary_key.clone().unwrap(), "foo");
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn create_index_with_no_name_is_error<S: IndexController>(controller: S) {
|
||||||
|
let settings = IndexSettings {
|
||||||
|
name: None,
|
||||||
|
primary_key: None,
|
||||||
|
};
|
||||||
|
assert!(controller.create_index(settings).is_err());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -134,6 +134,13 @@ impl<M, N, E> UpdateStatus<M, N, E> {
|
|||||||
UpdateStatus::Failed(u) => u.id(),
|
UpdateStatus::Failed(u) => u.id(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn processed(&self) -> Option<&Processed<M, N>> {
|
||||||
|
match self {
|
||||||
|
UpdateStatus::Processed(p) => Some(p),
|
||||||
|
_ => None,
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<M, N, E> From<Pending<M>> for UpdateStatus<M, N, E> {
|
impl<M, N, E> From<Pending<M>> for UpdateStatus<M, N, E> {
|
||||||
|
@ -83,15 +83,6 @@ async fn get_all_documents(
|
|||||||
todo!()
|
todo!()
|
||||||
}
|
}
|
||||||
|
|
||||||
//fn find_primary_key(document: &IndexMap<String, Value>) -> Option<String> {
|
|
||||||
//for key in document.keys() {
|
|
||||||
//if key.to_lowercase().contains("id") {
|
|
||||||
//return Some(key.to_string());
|
|
||||||
//}
|
|
||||||
//}
|
|
||||||
//None
|
|
||||||
//}
|
|
||||||
|
|
||||||
#[derive(Deserialize)]
|
#[derive(Deserialize)]
|
||||||
#[serde(rename_all = "camelCase", deny_unknown_fields)]
|
#[serde(rename_all = "camelCase", deny_unknown_fields)]
|
||||||
struct UpdateDocumentsQuery {
|
struct UpdateDocumentsQuery {
|
||||||
@ -150,6 +141,7 @@ async fn add_documents_default(
|
|||||||
_params: web::Query<UpdateDocumentsQuery>,
|
_params: web::Query<UpdateDocumentsQuery>,
|
||||||
_body: web::Json<Vec<Document>>,
|
_body: web::Json<Vec<Document>>,
|
||||||
) -> Result<HttpResponse, ResponseError> {
|
) -> Result<HttpResponse, ResponseError> {
|
||||||
|
error!("Unknown document type");
|
||||||
todo!()
|
todo!()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -19,43 +19,60 @@ pub fn services(cfg: &mut web::ServiceConfig) {
|
|||||||
.service(get_all_updates_status);
|
.service(get_all_updates_status);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Serialize, Deserialize, Clone)]
|
|
||||||
#[serde(rename_all = "camelCase")]
|
|
||||||
pub struct IndexResponse {
|
|
||||||
pub name: String,
|
|
||||||
pub uid: String,
|
|
||||||
created_at: DateTime<Utc>,
|
|
||||||
updated_at: DateTime<Utc>,
|
|
||||||
pub primary_key: Option<String>,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[get("/indexes", wrap = "Authentication::Private")]
|
#[get("/indexes", wrap = "Authentication::Private")]
|
||||||
async fn list_indexes(_data: web::Data<Data>) -> Result<HttpResponse, ResponseError> {
|
async fn list_indexes(data: web::Data<Data>) -> Result<HttpResponse, ResponseError> {
|
||||||
todo!()
|
match data.list_indexes() {
|
||||||
|
Ok(indexes) => {
|
||||||
|
let json = serde_json::to_string(&indexes).unwrap();
|
||||||
|
Ok(HttpResponse::Ok().body(&json))
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
error!("error listing indexes: {}", e);
|
||||||
|
unimplemented!()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[get("/indexes/{index_uid}", wrap = "Authentication::Private")]
|
#[get("/indexes/{index_uid}", wrap = "Authentication::Private")]
|
||||||
async fn get_index(
|
async fn get_index(
|
||||||
_data: web::Data<Data>,
|
data: web::Data<Data>,
|
||||||
_path: web::Path<IndexParam>,
|
path: web::Path<IndexParam>,
|
||||||
) -> Result<HttpResponse, ResponseError> {
|
) -> Result<HttpResponse, ResponseError> {
|
||||||
todo!()
|
match data.index(&path.index_uid)? {
|
||||||
|
Some(meta) => {
|
||||||
|
let json = serde_json::to_string(&meta).unwrap();
|
||||||
|
Ok(HttpResponse::Ok().body(json))
|
||||||
|
}
|
||||||
|
None => {
|
||||||
|
unimplemented!()
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Deserialize)]
|
#[derive(Debug, Deserialize)]
|
||||||
#[serde(rename_all = "camelCase", deny_unknown_fields)]
|
#[serde(rename_all = "camelCase", deny_unknown_fields)]
|
||||||
struct IndexCreateRequest {
|
struct IndexCreateRequest {
|
||||||
name: Option<String>,
|
name: String,
|
||||||
uid: Option<String>,
|
|
||||||
primary_key: Option<String>,
|
primary_key: Option<String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[post("/indexes", wrap = "Authentication::Private")]
|
#[post("/indexes", wrap = "Authentication::Private")]
|
||||||
async fn create_index(
|
async fn create_index(
|
||||||
_data: web::Data<Data>,
|
data: web::Data<Data>,
|
||||||
_body: web::Json<IndexCreateRequest>,
|
body: web::Json<IndexCreateRequest>,
|
||||||
) -> Result<HttpResponse, ResponseError> {
|
) -> Result<HttpResponse, ResponseError> {
|
||||||
todo!()
|
match data.create_index(&body.name, body.primary_key.clone()) {
|
||||||
|
Ok(meta) => {
|
||||||
|
let json = serde_json::to_string(&meta).unwrap();
|
||||||
|
Ok(HttpResponse::Ok().body(json))
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
error!("error creating index: {}", e);
|
||||||
|
unimplemented!()
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Deserialize)]
|
#[derive(Debug, Deserialize)]
|
||||||
|
Loading…
Reference in New Issue
Block a user