clean project
parent c4846dafca, commit 2ee2e6a9b2

@ -1,127 +0,0 @@
mod index_actor;
mod update_actor;
mod uuid_resolver;
mod update_store;
mod update_handler;

use std::path::Path;

use tokio::sync::{mpsc, oneshot};
use uuid::Uuid;
use super::IndexMetadata;
use futures::stream::StreamExt;
use actix_web::web::Payload;
use super::UpdateMeta;
use crate::index::{SearchResult, SearchQuery};
use actix_web::web::Bytes;

use crate::index::Settings;
use super::UpdateStatus;

pub struct IndexController {
    uuid_resolver: uuid_resolver::UuidResolverHandle,
    index_handle: index_actor::IndexActorHandle,
    update_handle: update_actor::UpdateActorHandle<Bytes>,
}

enum IndexControllerMsg {
    CreateIndex {
        uuid: Uuid,
        primary_key: Option<String>,
        ret: oneshot::Sender<anyhow::Result<IndexMetadata>>,
    },
    Shutdown,
}
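
// Note: `IndexController` is a thin facade over three actor handles (uuid resolver, index
// actor, update actor); its public methods resolve an index name to a `Uuid` first and then
// forward the work to the relevant actor. `IndexControllerMsg` is declared here but does not
// appear to be used elsewhere in this file.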
impl IndexController {
    pub fn new(path: impl AsRef<Path>) -> Self {
        let uuid_resolver = uuid_resolver::UuidResolverHandle::new();
        let index_actor = index_actor::IndexActorHandle::new(&path);
        let update_handle = update_actor::UpdateActorHandle::new(index_actor.clone(), &path);
        Self { uuid_resolver, index_handle: index_actor, update_handle }
    }

    pub async fn add_documents(
        &self,
        index: String,
        method: milli::update::IndexDocumentsMethod,
        format: milli::update::UpdateFormat,
        mut payload: Payload,
        primary_key: Option<String>,
    ) -> anyhow::Result<super::UpdateStatus> {
        let uuid = self.uuid_resolver.get_or_create(index).await?;
        let meta = UpdateMeta::DocumentsAddition { method, format, primary_key };
        let (sender, receiver) = mpsc::channel(10);

        // It is necessary to spawn a local task to send the payload to the update handle, to
        // prevent a deadlock between update_handle::update, which waits for the update to be
        // registered, and the update actor, which waits for the payload to be sent to it.
        tokio::task::spawn_local(async move {
            while let Some(bytes) = payload.next().await {
                match bytes {
                    Ok(bytes) => { sender.send(Ok(bytes)).await; },
                    Err(e) => {
                        let error: Box<dyn std::error::Error + Sync + Send + 'static> = Box::new(e);
                        sender.send(Err(error)).await;
                    },
                }
            }
        });

        // This must be done *AFTER* spawning the task.
        let status = self.update_handle.update(meta, receiver, uuid).await?;
        Ok(status)
    }
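
    // A rough picture of the flow above (assuming the update actor drains the receiver):
    //
    //     actix Payload --chunks--> mpsc::Sender --> mpsc::Receiver --> update actor --> UpdateStatus
    //
    // The local task and `update_handle.update` run concurrently, which is what avoids the
    // deadlock described in the comment above.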

    fn clear_documents(&self, index: String) -> anyhow::Result<UpdateStatus> {
        todo!()
    }

    fn delete_documents(&self, index: String, document_ids: Vec<String>) -> anyhow::Result<super::UpdateStatus> {
        todo!()
    }

    fn update_settings(&self, index_uid: String, settings: Settings) -> anyhow::Result<super::UpdateStatus> {
        todo!()
    }

    pub async fn create_index(&self, index_settings: super::IndexSettings) -> anyhow::Result<super::IndexMetadata> {
        let super::IndexSettings { name, primary_key } = index_settings;
        let uuid = self.uuid_resolver.create(name.unwrap()).await?;
        let index_meta = self.index_handle.create_index(uuid, primary_key).await?;
        Ok(index_meta)
    }
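
    // Note: `create_index` calls `name.unwrap()`, so it panics if `IndexSettings.name` is
    // `None`; the caller is expected to have validated the name beforehand.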

    fn delete_index(&self, index_uid: String) -> anyhow::Result<()> {
        todo!()
    }

    fn swap_indices(&self, index1_uid: String, index2_uid: String) -> anyhow::Result<()> {
        todo!()
    }

    pub fn index(&self, name: String) -> anyhow::Result<Option<std::sync::Arc<milli::Index>>> {
        todo!()
    }

    fn update_status(&self, index: String, id: u64) -> anyhow::Result<Option<UpdateStatus>> {
        todo!()
    }

    fn all_update_status(&self, index: String) -> anyhow::Result<Vec<super::UpdateStatus>> {
        todo!()
    }

    pub fn list_indexes(&self) -> anyhow::Result<Vec<super::IndexMetadata>> {
        todo!()
    }

    fn update_index(&self, name: String, index_settings: super::IndexSettings) -> anyhow::Result<super::IndexMetadata> {
        todo!()
    }

    pub async fn search(&self, name: String, query: SearchQuery) -> anyhow::Result<SearchResult> {
        let uuid = self.uuid_resolver.resolve(name).await.unwrap().unwrap();
        let result = self.index_handle.search(uuid, query).await?;
        Ok(result)
    }
}

@ -357,7 +357,6 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
     async fn handle_get_meta(&self, uuid: Uuid) -> Result<Option<IndexMeta>> {
         match self.store.get(uuid).await? {
             Some(index) => {
-                println!("geting meta yoyo");
                 let meta = spawn_blocking(move || IndexMeta::new(&index))
                     .await
                     .map_err(|e| IndexError::Error(e.into()))??;

@ -1,606 +0,0 @@
use std::fs::{create_dir_all, remove_dir_all};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;

use anyhow::{bail, Context};
use chrono::{DateTime, Utc};
use dashmap::{mapref::entry::Entry, DashMap};
use heed::{
    types::{ByteSlice, SerdeJson, Str},
    Database, Env, EnvOpenOptions, RoTxn, RwTxn,
};
use log::{error, info};
use milli::Index;
use rayon::ThreadPool;
use serde::{Deserialize, Serialize};
use uuid::Uuid;

use super::update_handler::UpdateHandler;
use super::{UpdateMeta, UpdateResult};
use crate::option::IndexerOpts;

type UpdateStore = super::update_store::UpdateStore<UpdateMeta, UpdateResult, String>;

#[derive(Serialize, Deserialize, Debug, PartialEq)]
pub struct IndexMeta {
    update_store_size: u64,
    index_store_size: u64,
    pub uuid: Uuid,
    pub created_at: DateTime<Utc>,
    pub updated_at: DateTime<Utc>,
}

impl IndexMeta {
    fn open(
        &self,
        path: impl AsRef<Path>,
        thread_pool: Arc<ThreadPool>,
        indexer_options: &IndexerOpts,
    ) -> anyhow::Result<(Arc<Index>, Arc<UpdateStore>)> {
        let update_path = make_update_db_path(&path, &self.uuid);
        let index_path = make_index_db_path(&path, &self.uuid);

        create_dir_all(&update_path)?;
        create_dir_all(&index_path)?;

        let mut options = EnvOpenOptions::new();
        options.map_size(self.index_store_size as usize);
        let index = Arc::new(Index::new(options, index_path)?);

        let mut options = EnvOpenOptions::new();
        options.map_size(self.update_store_size as usize);
        let handler = UpdateHandler::new(indexer_options, index.clone(), thread_pool)?;
        let update_store = UpdateStore::open(options, update_path, handler)?;

        Ok((index, update_store))
    }
}
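
// `IndexMeta::open` (above) opens the index and its update store as two separate heed
// environments, each sized with the `map_size` stored in the metadata, under the
// `index<uuid>` and `update<uuid>` directories (see `make_*_db_path` below).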

pub struct IndexStore {
    env: Env,
    name_to_uuid: Database<Str, ByteSlice>,
    uuid_to_index: DashMap<Uuid, (Arc<Index>, Arc<UpdateStore>)>,
    uuid_to_index_meta: Database<ByteSlice, SerdeJson<IndexMeta>>,

    thread_pool: Arc<ThreadPool>,
    indexer_options: IndexerOpts,
}

impl IndexStore {
    pub fn new(path: impl AsRef<Path>, indexer_options: IndexerOpts) -> anyhow::Result<Self> {
        let env = EnvOpenOptions::new()
            .map_size(4096 * 100)
            .max_dbs(2)
            .open(path)?;

        let uuid_to_index = DashMap::new();
        let name_to_uuid = open_or_create_database(&env, Some("name_to_uid"))?;
        let uuid_to_index_meta = open_or_create_database(&env, Some("uid_to_index_db"))?;

        let thread_pool = rayon::ThreadPoolBuilder::new()
            .num_threads(indexer_options.indexing_jobs.unwrap_or(0))
            .build()?;
        let thread_pool = Arc::new(thread_pool);

        Ok(Self {
            env,
            name_to_uuid,
            uuid_to_index,
            uuid_to_index_meta,

            thread_pool,
            indexer_options,
        })
    }

    pub fn delete(&self, index_uid: impl AsRef<str>) -> anyhow::Result<()> {
        // we remove the references to the index from the index map so it is not accessible anymore
        let mut txn = self.env.write_txn()?;
        let uuid = self
            .index_uuid(&txn, &index_uid)?
            .with_context(|| format!("Index {:?} doesn't exist", index_uid.as_ref()))?;
        self.name_to_uuid.delete(&mut txn, index_uid.as_ref())?;
        self.uuid_to_index_meta.delete(&mut txn, uuid.as_bytes())?;
        txn.commit()?;
        // If the index was loaded (i.e. it is present in the uuid_to_index map), then we need to
        // close it. The process goes as follows:
        //
        // 1) We want to remove any pending updates from the store.
        // 2) We try to get ownership of the update store so we can close it. It may take a
        //    couple of tries, but since the update store event loop only has a weak reference to
        //    itself, and we are the only other function holding a reference to it otherwise, we
        //    will get it eventually.
        // 3) We request a closing of the update store.
        // 4) We can take ownership of the index, and close it.
        // 5) We remove all the files from the file system.
        let index_uid = index_uid.as_ref().to_string();
        let path = self.env.path().to_owned();
        if let Some((_, (index, updates))) = self.uuid_to_index.remove(&uuid) {
            std::thread::spawn(move || {
                info!("Preparing for {:?} deletion.", index_uid);
                // this error is non-fatal, but may delay the deletion.
                if let Err(e) = updates.abort_pendings() {
                    error!(
                        "error aborting pending updates when deleting index {:?}: {}",
                        index_uid, e
                    );
                }
                let updates = get_arc_ownership_blocking(updates);
                let close_event = updates.prepare_for_closing();
                close_event.wait();
                info!("closed update store for {:?}", index_uid);

                let index = get_arc_ownership_blocking(index);
                let close_event = index.prepare_for_closing();
                close_event.wait();

                let update_path = make_update_db_path(&path, &uuid);
                let index_path = make_index_db_path(&path, &uuid);

                if let Err(e) = remove_dir_all(index_path) {
                    error!("error removing index {:?}: {}", index_uid, e);
                }

                if let Err(e) = remove_dir_all(update_path) {
                    error!("error removing index {:?}: {}", index_uid, e);
                }

                info!("index {:?} deleted.", index_uid);
            });
        }

        Ok(())
    }
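
    // The actual teardown above runs on a dedicated OS thread because
    // `get_arc_ownership_blocking` (defined near the end of this file) busy-waits, sleeping
    // 100ms between attempts, until every other holder of the `Arc` has dropped its reference.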

    fn index_uuid(&self, txn: &RoTxn, name: impl AsRef<str>) -> anyhow::Result<Option<Uuid>> {
        match self.name_to_uuid.get(txn, name.as_ref())? {
            Some(bytes) => {
                let uuid = Uuid::from_slice(bytes)?;
                Ok(Some(uuid))
            }
            None => Ok(None),
        }
    }

    fn retrieve_index(
        &self,
        txn: &RoTxn,
        uid: Uuid,
    ) -> anyhow::Result<Option<(Arc<Index>, Arc<UpdateStore>)>> {
        match self.uuid_to_index.entry(uid.clone()) {
            Entry::Vacant(entry) => match self.uuid_to_index_meta.get(txn, uid.as_bytes())? {
                Some(meta) => {
                    let path = self.env.path();
                    let (index, updates) =
                        meta.open(path, self.thread_pool.clone(), &self.indexer_options)?;
                    entry.insert((index.clone(), updates.clone()));
                    Ok(Some((index, updates)))
                }
                None => Ok(None),
            },
            Entry::Occupied(entry) => {
                let (index, updates) = entry.get();
                Ok(Some((index.clone(), updates.clone())))
            }
        }
    }

    fn get_index_txn(
        &self,
        txn: &RoTxn,
        name: impl AsRef<str>,
    ) -> anyhow::Result<Option<(Arc<Index>, Arc<UpdateStore>)>> {
        match self.index_uuid(&txn, name)? {
            Some(uid) => self.retrieve_index(&txn, uid),
            None => Ok(None),
        }
    }

    pub fn index(
        &self,
        name: impl AsRef<str>,
    ) -> anyhow::Result<Option<(Arc<Index>, Arc<UpdateStore>)>> {
        let txn = self.env.read_txn()?;
        self.get_index_txn(&txn, name)
    }

    /// Use this function to perform an update on an index.
    /// This function also takes the environment's write lock, so only one update can run at a time.
    pub fn update_index<F, T>(&self, name: impl AsRef<str>, f: F) -> anyhow::Result<(T, IndexMeta)>
    where
        F: FnOnce(&Index) -> anyhow::Result<T>,
    {
        let mut txn = self.env.write_txn()?;
        let (index, _) = self
            .get_index_txn(&txn, &name)?
            .with_context(|| format!("Index {:?} doesn't exist", name.as_ref()))?;
        let result = f(index.as_ref());
        match result {
            Ok(ret) => {
                let meta = self.update_meta(&mut txn, name, |meta| meta.updated_at = Utc::now())?;
                txn.commit()?;
                Ok((ret, meta))
            }
            Err(e) => Err(e),
        }
    }
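
    // Hypothetical usage sketch (not part of the original file): set the primary key of an
    // index through `update_index`, mirroring what `LocalIndexController::update_index` does
    // further down in this commit.
    //
    //     let (_, meta) = store.update_index("movies", |index| {
    //         let mut txn = index.write_txn()?;
    //         index.put_primary_key(&mut txn, "id")?;
    //         txn.commit()?;
    //         Ok(())
    //     })?;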

    pub fn index_with_meta(
        &self,
        name: impl AsRef<str>,
    ) -> anyhow::Result<Option<(Arc<Index>, IndexMeta)>> {
        let txn = self.env.read_txn()?;
        let uuid = self.index_uuid(&txn, &name)?;
        match uuid {
            Some(uuid) => {
                let meta = self
                    .uuid_to_index_meta
                    .get(&txn, uuid.as_bytes())?
                    .with_context(|| {
                        format!("unable to retrieve metadata for index {:?}", name.as_ref())
                    })?;
                let (index, _) = self
                    .retrieve_index(&txn, uuid)?
                    .with_context(|| format!("unable to retrieve index {:?}", name.as_ref()))?;
                Ok(Some((index, meta)))
            }
            None => Ok(None),
        }
    }

    fn update_meta<F>(
        &self,
        txn: &mut RwTxn,
        name: impl AsRef<str>,
        f: F,
    ) -> anyhow::Result<IndexMeta>
    where
        F: FnOnce(&mut IndexMeta),
    {
        let uuid = self
            .index_uuid(txn, &name)?
            .with_context(|| format!("Index {:?} doesn't exist", name.as_ref()))?;
        let mut meta = self
            .uuid_to_index_meta
            .get(txn, uuid.as_bytes())?
            .with_context(|| format!("couldn't retrieve metadata for index {:?}", name.as_ref()))?;
        f(&mut meta);
        self.uuid_to_index_meta.put(txn, uuid.as_bytes(), &meta)?;
        Ok(meta)
    }

    pub fn get_or_create_index(
        &self,
        name: impl AsRef<str>,
        update_size: u64,
        index_size: u64,
    ) -> anyhow::Result<(Arc<Index>, Arc<UpdateStore>)> {
        let mut txn = self.env.write_txn()?;
        match self.get_index_txn(&txn, name.as_ref())? {
            Some(res) => Ok(res),
            None => {
                let uuid = Uuid::new_v4();
                let (index, updates, _) =
                    self.create_index_txn(&mut txn, uuid, name, update_size, index_size)?;
                // If we fail to commit the transaction, we must delete the database from the
                // file-system.
                if let Err(e) = txn.commit() {
                    self.clean_db(uuid);
                    return Err(e)?;
                }
                Ok((index, updates))
            }
        }
    }

    // Remove all the files and data associated with a db uuid.
    fn clean_db(&self, uuid: Uuid) {
        let update_db_path = make_update_db_path(self.env.path(), &uuid);
        let index_db_path = make_index_db_path(self.env.path(), &uuid);

        remove_dir_all(update_db_path).expect("Failed to clean database");
        remove_dir_all(index_db_path).expect("Failed to clean database");

        self.uuid_to_index.remove(&uuid);
    }

    fn create_index_txn(
        &self,
        txn: &mut RwTxn,
        uuid: Uuid,
        name: impl AsRef<str>,
        update_store_size: u64,
        index_store_size: u64,
    ) -> anyhow::Result<(Arc<Index>, Arc<UpdateStore>, IndexMeta)> {
        let created_at = Utc::now();
        let updated_at = created_at;
        let meta = IndexMeta {
            update_store_size,
            index_store_size,
            uuid: uuid.clone(),
            created_at,
            updated_at,
        };

        self.name_to_uuid.put(txn, name.as_ref(), uuid.as_bytes())?;
        self.uuid_to_index_meta.put(txn, uuid.as_bytes(), &meta)?;

        let path = self.env.path();
        let (index, update_store) =
            match meta.open(path, self.thread_pool.clone(), &self.indexer_options) {
                Ok(res) => res,
                Err(e) => {
                    self.clean_db(uuid);
                    return Err(e);
                }
            };

        self.uuid_to_index
            .insert(uuid, (index.clone(), update_store.clone()));

        Ok((index, update_store, meta))
    }

    /// Same as `get_or_create`, but returns an error if the index already exists.
    pub fn create_index(
        &self,
        name: impl AsRef<str>,
        update_size: u64,
        index_size: u64,
    ) -> anyhow::Result<(Arc<Index>, Arc<UpdateStore>, IndexMeta)> {
        let uuid = Uuid::new_v4();
        let mut txn = self.env.write_txn()?;

        if self.name_to_uuid.get(&txn, name.as_ref())?.is_some() {
            bail!("index {:?} already exists", name.as_ref())
        }

        let result = self.create_index_txn(&mut txn, uuid, name, update_size, index_size)?;
        // If we fail to commit the transaction, we must delete the database from the
        // file-system.
        if let Err(e) = txn.commit() {
            self.clean_db(uuid);
            return Err(e)?;
        }
        Ok(result)
    }

    /// Returns each index associated with its metadata:
    /// (index_name, IndexMeta, primary_key)
    /// This method will force all the indexes to be loaded.
    pub fn list_indexes(&self) -> anyhow::Result<Vec<(String, IndexMeta, Option<String>)>> {
        let txn = self.env.read_txn()?;
        let metas = self.name_to_uuid.iter(&txn)?.filter_map(|entry| {
            entry
                .map_err(|e| {
                    error!("error decoding entry while listing indexes: {}", e);
                    e
                })
                .ok()
        });
        let mut indexes = Vec::new();
        for (name, uuid) in metas {
            // get the index to retrieve the primary key
            let (index, _) = self
                .get_index_txn(&txn, name)?
                .with_context(|| format!("could not load index {:?}", name))?;
            let primary_key = index.primary_key(&index.read_txn()?)?.map(String::from);
            // retrieve the meta
            let meta = self
                .uuid_to_index_meta
                .get(&txn, &uuid)?
                .with_context(|| format!("could not retrieve meta for index {:?}", name))?;
            indexes.push((name.to_owned(), meta, primary_key));
        }
        Ok(indexes)
    }
}

// Loops on an Arc to get ownership of the wrapped value. This method sleeps 100ms between retries.
fn get_arc_ownership_blocking<T>(mut item: Arc<T>) -> T {
    loop {
        match Arc::try_unwrap(item) {
            Ok(item) => return item,
            Err(item_arc) => {
                item = item_arc;
                std::thread::sleep(Duration::from_millis(100));
                continue;
            }
        }
    }
}

fn open_or_create_database<K: 'static, V: 'static>(
    env: &Env,
    name: Option<&str>,
) -> anyhow::Result<Database<K, V>> {
    match env.open_database::<K, V>(name)? {
        Some(db) => Ok(db),
        None => Ok(env.create_database::<K, V>(name)?),
    }
}

fn make_update_db_path(path: impl AsRef<Path>, uuid: &Uuid) -> PathBuf {
    let mut path = path.as_ref().to_path_buf();
    path.push(format!("update{}", uuid));
    path
}

fn make_index_db_path(path: impl AsRef<Path>, uuid: &Uuid) -> PathBuf {
    let mut path = path.as_ref().to_path_buf();
    path.push(format!("index{}", uuid));
    path
}
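
// Note on layout: each index lives in two sibling directories directly under the environment
// path, named "update<uuid>" and "index<uuid>" with no separator, as the tests below verify.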

#[cfg(test)]
mod test {
    use super::*;
    use std::path::PathBuf;

    #[test]
    fn test_make_update_db_path() {
        let uuid = Uuid::new_v4();
        assert_eq!(
            make_update_db_path("/home", &uuid),
            PathBuf::from(format!("/home/update{}", uuid))
        );
    }

    #[test]
    fn test_make_index_db_path() {
        let uuid = Uuid::new_v4();
        assert_eq!(
            make_index_db_path("/home", &uuid),
            PathBuf::from(format!("/home/index{}", uuid))
        );
    }

    mod index_store {
        use super::*;

        #[test]
        fn test_index_uuid() {
            let temp = tempfile::tempdir().unwrap();
            let store = IndexStore::new(temp, IndexerOpts::default()).unwrap();

            let name = "foobar";
            let txn = store.env.read_txn().unwrap();
            // the name is not found if the uuid is not present in the db
            assert!(store.index_uuid(&txn, &name).unwrap().is_none());
            drop(txn);

            // insert a uuid in the name_to_uuid db:
            let uuid = Uuid::new_v4();
            let mut txn = store.env.write_txn().unwrap();
            store
                .name_to_uuid
                .put(&mut txn, &name, uuid.as_bytes())
                .unwrap();
            txn.commit().unwrap();

            // check that the uuid is there
            let txn = store.env.read_txn().unwrap();
            assert_eq!(store.index_uuid(&txn, &name).unwrap(), Some(uuid));
        }

        #[test]
        fn test_retrieve_index() {
            let temp = tempfile::tempdir().unwrap();
            let store = IndexStore::new(temp, IndexerOpts::default()).unwrap();
            let uuid = Uuid::new_v4();

            let txn = store.env.read_txn().unwrap();
            assert!(store.retrieve_index(&txn, uuid).unwrap().is_none());

            let created_at = Utc::now();
            let updated_at = created_at;

            let meta = IndexMeta {
                update_store_size: 4096 * 100,
                index_store_size: 4096 * 100,
                uuid: uuid.clone(),
                created_at,
                updated_at,
            };
            let mut txn = store.env.write_txn().unwrap();
            store
                .uuid_to_index_meta
                .put(&mut txn, uuid.as_bytes(), &meta)
                .unwrap();
            txn.commit().unwrap();

            // the index cache should be empty
            assert!(store.uuid_to_index.is_empty());

            let txn = store.env.read_txn().unwrap();
            assert!(store.retrieve_index(&txn, uuid).unwrap().is_some());
            assert_eq!(store.uuid_to_index.len(), 1);
        }

        #[test]
        fn test_index() {
            let temp = tempfile::tempdir().unwrap();
            let store = IndexStore::new(temp, IndexerOpts::default()).unwrap();
            let name = "foobar";

            assert!(store.index(&name).unwrap().is_none());

            let created_at = Utc::now();
            let updated_at = created_at;

            let uuid = Uuid::new_v4();
            let meta = IndexMeta {
                update_store_size: 4096 * 100,
                index_store_size: 4096 * 100,
                uuid: uuid.clone(),
                created_at,
                updated_at,
            };
            let mut txn = store.env.write_txn().unwrap();
            store
                .name_to_uuid
                .put(&mut txn, &name, uuid.as_bytes())
                .unwrap();
            store
                .uuid_to_index_meta
                .put(&mut txn, uuid.as_bytes(), &meta)
                .unwrap();
            txn.commit().unwrap();

            assert!(store.index(&name).unwrap().is_some());
        }

        #[test]
        fn test_get_or_create_index() {
            let temp = tempfile::tempdir().unwrap();
            let store = IndexStore::new(temp, IndexerOpts::default()).unwrap();
            let name = "foobar";

            let update_store_size = 4096 * 100;
            let index_store_size = 4096 * 100;
            store
                .get_or_create_index(&name, update_store_size, index_store_size)
                .unwrap();
            let txn = store.env.read_txn().unwrap();
            let uuid = store.name_to_uuid.get(&txn, &name).unwrap();
            assert_eq!(store.uuid_to_index.len(), 1);
            assert!(uuid.is_some());
            let uuid = Uuid::from_slice(uuid.unwrap()).unwrap();
            let meta = store
                .uuid_to_index_meta
                .get(&txn, uuid.as_bytes())
                .unwrap()
                .unwrap();
            assert_eq!(meta.update_store_size, update_store_size);
            assert_eq!(meta.index_store_size, index_store_size);
            assert_eq!(meta.uuid, uuid);
        }

        #[test]
        fn test_create_index() {
            let temp = tempfile::tempdir().unwrap();
            let store = IndexStore::new(temp, IndexerOpts::default()).unwrap();
            let name = "foobar";

            let update_store_size = 4096 * 100;
            let index_store_size = 4096 * 100;
            let uuid = Uuid::new_v4();
            let mut txn = store.env.write_txn().unwrap();
            store
                .create_index_txn(&mut txn, uuid, name, update_store_size, index_store_size)
                .unwrap();
            let uuid = store.name_to_uuid.get(&txn, &name).unwrap();
            assert_eq!(store.uuid_to_index.len(), 1);
            assert!(uuid.is_some());
            let uuid = Uuid::from_slice(uuid.unwrap()).unwrap();
            let meta = store
                .uuid_to_index_meta
                .get(&txn, uuid.as_bytes())
                .unwrap()
                .unwrap();
            assert_eq!(meta.update_store_size, update_store_size);
            assert_eq!(meta.index_store_size, index_store_size);
            assert_eq!(meta.uuid, uuid);
        }
    }
}

@ -1,228 +0,0 @@
mod update_store;
mod index_store;
mod update_handler;

use std::path::Path;
use std::sync::Arc;

use anyhow::{bail, Context};
use itertools::Itertools;
use milli::Index;

use crate::option::IndexerOpts;
use index_store::IndexStore;
use super::IndexController;
use super::updates::UpdateStatus;
use super::{UpdateMeta, UpdateResult, IndexMetadata, IndexSettings};

pub struct LocalIndexController {
    indexes: IndexStore,
    update_db_size: u64,
    index_db_size: u64,
}

impl LocalIndexController {
    pub fn new(
        path: impl AsRef<Path>,
        opt: IndexerOpts,
        index_db_size: u64,
        update_db_size: u64,
    ) -> anyhow::Result<Self> {
        let indexes = IndexStore::new(path, opt)?;
        Ok(Self { indexes, index_db_size, update_db_size })
    }
}

impl IndexController for LocalIndexController {
    fn add_documents<S: AsRef<str>>(
        &self,
        index: S,
        method: milli::update::IndexDocumentsMethod,
        format: milli::update::UpdateFormat,
        data: &[u8],
        primary_key: Option<String>,
    ) -> anyhow::Result<UpdateStatus<UpdateMeta, UpdateResult, String>> {
        let (_, update_store) = self.indexes.get_or_create_index(&index, self.update_db_size, self.index_db_size)?;
        let meta = UpdateMeta::DocumentsAddition { method, format, primary_key };
        let pending = update_store.register_update(meta, data)?;
        Ok(pending.into())
    }

    fn update_settings<S: AsRef<str>>(
        &self,
        index: S,
        settings: super::Settings
    ) -> anyhow::Result<UpdateStatus<UpdateMeta, UpdateResult, String>> {
        let (_, update_store) = self.indexes.get_or_create_index(&index, self.update_db_size, self.index_db_size)?;
        let meta = UpdateMeta::Settings(settings);
        let pending = update_store.register_update(meta, &[])?;
        Ok(pending.into())
    }
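
    // Both document additions and settings changes funnel through
    // `UpdateStore::register_update`; a settings update simply registers its meta with an
    // empty payload.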

    fn create_index(&self, index_settings: IndexSettings) -> anyhow::Result<IndexMetadata> {
        let index_name = index_settings.name.context("Missing name for index")?;
        let (index, _, meta) = self.indexes.create_index(&index_name, self.update_db_size, self.index_db_size)?;
        if let Some(ref primary_key) = index_settings.primary_key {
            if let Err(e) = update_primary_key(index, primary_key).context("error creating index") {
                // TODO: creating index could not be completed, delete everything.
                Err(e)?
            }
        }

        let meta = IndexMetadata {
            uid: index_name,
            uuid: meta.uuid.clone(),
            created_at: meta.created_at,
            updated_at: meta.created_at,
            primary_key: index_settings.primary_key,
        };

        Ok(meta)
    }

    fn delete_index<S: AsRef<str>>(&self, index_uid: S) -> anyhow::Result<()> {
        self.indexes.delete(index_uid)
    }

    fn swap_indices<S1: AsRef<str>, S2: AsRef<str>>(&self, _index1_uid: S1, _index2_uid: S2) -> anyhow::Result<()> {
        todo!()
    }

    fn index(&self, name: impl AsRef<str>) -> anyhow::Result<Option<Arc<Index>>> {
        let index = self.indexes.index(name)?.map(|(i, _)| i);
        Ok(index)
    }

    fn update_status(&self, index: impl AsRef<str>, id: u64) -> anyhow::Result<Option<UpdateStatus<UpdateMeta, UpdateResult, String>>> {
        match self.indexes.index(&index)? {
            Some((_, update_store)) => Ok(update_store.meta(id)?),
            None => bail!("index {:?} doesn't exist", index.as_ref()),
        }
    }

    fn all_update_status(&self, index: impl AsRef<str>) -> anyhow::Result<Vec<UpdateStatus<UpdateMeta, UpdateResult, String>>> {
        match self.indexes.index(&index)? {
            Some((_, update_store)) => {
                let updates = update_store.iter_metas(|processing, processed, pending, aborted, failed| {
                    Ok(processing
                        .map(UpdateStatus::from)
                        .into_iter()
                        .chain(pending.filter_map(|p| p.ok()).map(|(_, u)| UpdateStatus::from(u)))
                        .chain(aborted.filter_map(Result::ok).map(|(_, u)| UpdateStatus::from(u)))
                        .chain(processed.filter_map(Result::ok).map(|(_, u)| UpdateStatus::from(u)))
                        .chain(failed.filter_map(Result::ok).map(|(_, u)| UpdateStatus::from(u)))
                        .sorted_by(|a, b| a.id().cmp(&b.id()))
                        .collect())
                })?;
                Ok(updates)
            }
            None => bail!("index {} doesn't exist.", index.as_ref()),
        }
    }
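
    // `all_update_status` (above) merges every queue from the update store (the currently
    // processing update first, then pending, aborted, processed and failed ones) and sorts
    // the result by update id, so callers see a single ordered list.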

    fn list_indexes(&self) -> anyhow::Result<Vec<IndexMetadata>> {
        let metas = self.indexes.list_indexes()?;
        let mut output_meta = Vec::new();
        for (uid, meta, primary_key) in metas {
            let created_at = meta.created_at;
            let uuid = meta.uuid;
            let updated_at = self
                .all_update_status(&uid)?
                .iter()
                .filter_map(|u| u.processed().map(|u| u.processed_at))
                .max()
                .unwrap_or(created_at);

            let index_meta = IndexMetadata {
                uid,
                created_at,
                updated_at,
                uuid,
                primary_key,
            };
            output_meta.push(index_meta);
        }
        Ok(output_meta)
    }

    fn update_index(&self, uid: impl AsRef<str>, index_settings: IndexSettings) -> anyhow::Result<IndexMetadata> {
        if index_settings.name.is_some() {
            bail!("can't update an index name.")
        }

        let (primary_key, meta) = match index_settings.primary_key {
            Some(ref primary_key) => {
                self.indexes
                    .update_index(&uid, |index| {
                        let mut txn = index.write_txn()?;
                        if index.primary_key(&txn)?.is_some() {
                            bail!("primary key already exists.")
                        }
                        index.put_primary_key(&mut txn, primary_key)?;
                        txn.commit()?;
                        Ok(Some(primary_key.clone()))
                    })?
            },
            None => {
                let (index, meta) = self.indexes
                    .index_with_meta(&uid)?
                    .with_context(|| format!("index {:?} doesn't exist.", uid.as_ref()))?;
                let primary_key = index
                    .primary_key(&index.read_txn()?)?
                    .map(String::from);
                (primary_key, meta)
            },
        };

        Ok(IndexMetadata {
            uid: uid.as_ref().to_string(),
            uuid: meta.uuid.clone(),
            created_at: meta.created_at,
            updated_at: meta.updated_at,
            primary_key,
        })
    }

    fn clear_documents(&self, index: impl AsRef<str>) -> anyhow::Result<super::UpdateStatus> {
        let (_, update_store) = self.indexes.index(&index)?
            .with_context(|| format!("Index {:?} doesn't exist", index.as_ref()))?;
        let meta = UpdateMeta::ClearDocuments;
        let pending = update_store.register_update(meta, &[])?;
        Ok(pending.into())
    }

    fn delete_documents(&self, index: impl AsRef<str>, document_ids: Vec<String>) -> anyhow::Result<super::UpdateStatus> {
        let (_, update_store) = self.indexes.index(&index)?
            .with_context(|| format!("Index {:?} doesn't exist", index.as_ref()))?;
        let meta = UpdateMeta::DeleteDocuments;
        let content = serde_json::to_vec(&document_ids)?;
        let pending = update_store.register_update(meta, &content)?;
        Ok(pending.into())
    }
}

fn update_primary_key(index: impl AsRef<Index>, primary_key: impl AsRef<str>) -> anyhow::Result<()> {
    let index = index.as_ref();
    let mut txn = index.write_txn()?;
    if index.primary_key(&txn)?.is_some() {
        bail!("primary key already set.")
    }
    index.put_primary_key(&mut txn, primary_key.as_ref())?;
    txn.commit()?;
    Ok(())
}
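
// Note: `update_primary_key` is the same check-and-set logic used in the `update_index`
// closure above; as the TODO in `create_index` points out, a failure here leaves a
// partially created index behind.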

#[cfg(test)]
mod test {
    use super::*;
    use tempfile::tempdir;
    use crate::make_index_controller_tests;

    make_index_controller_tests!({
        let options = IndexerOpts::default();
        let path = tempdir().unwrap();
        let size = 4096 * 100;
        LocalIndexController::new(path, options, size, size).unwrap()
    });
}

@ -28,7 +28,6 @@ async fn add_documents_no_index_creation() {
 
     let (response, code) = index.get_update(0).await;
     assert_eq!(code, 200);
-    println!("response: {}", response);
     assert_eq!(response["status"], "processed");
     assert_eq!(response["updateId"], 0);
     assert_eq!(response["success"]["DocumentsAddition"]["nb_documents"], 1);

@ -7,7 +7,6 @@ async fn create_index_no_primary_key() {
     let index = server.index("test");
     let (response, code) = index.create(None).await;
 
-    println!("response: {}", response);
 
     assert_eq!(code, 200);
     assert_eq!(response["uid"], "test");

@ -6,13 +6,11 @@ async fn create_and_delete_index() {
     let index = server.index("test");
     let (_response, code) = index.create(None).await;
 
-    println!("response: {}", _response);
 
     assert_eq!(code, 200);
 
     let (_response, code) = index.delete().await;
 
-    println!("response: {}", _response);
 
     assert_eq!(code, 200);
 

@ -36,7 +36,6 @@ async fn test_partial_update() {
     let server = Server::new().await;
     let index = server.index("test");
     let (response, _code) = index.update_settings(json!({"displayedAttributes": ["foo"]})).await;
-    println!("response: {}", response);
     index.wait_update_id(0).await;
     let (response, code) = index.settings().await;
     assert_eq!(code, 200);

@ -44,7 +43,6 @@ async fn test_partial_update() {
     assert_eq!(response["searchableAttributes"],json!(["*"]));
 
     let (response, _) = index.update_settings(json!({"searchableAttributes": ["bar"]})).await;
-    println!("resp: {}", response);
     index.wait_update_id(1).await;
 
     let (response, code) = index.settings().await;

@ -96,7 +94,6 @@ async fn update_setting_unexisting_index_invalid_uid() {
     let server = Server::new().await;
     let index = server.index("test##! ");
     let (_response, code) = index.update_settings(json!({})).await;
-    println!("response: {}", _response);
     assert_eq!(code, 400);
 }
 

@ -46,7 +46,6 @@ async fn list_no_updates() {
     let index = server.index("test");
     index.create(None).await;
     let (response, code) = index.list_updates().await;
-    println!("response: {}", response);
     assert_eq!(code, 200);
     assert!(response.as_array().unwrap().is_empty());
 }