Merge pull request #311 from meilisearch/add-index-name-and-id

Add index name and change some routes request body & response
This commit is contained in:
Clément Renault 2019-11-21 11:59:14 +01:00 committed by GitHub
commit 4abea919b2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 500 additions and 220 deletions

20
Cargo.lock generated
View File

@ -839,7 +839,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "meilidb-core"
version = "0.7.0"
version = "0.8.0"
dependencies = [
"arc-swap 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)",
"assert_matches 1.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
@ -856,9 +856,9 @@ dependencies = [
"indexmap 1.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
"levenshtein_automata 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
"meilidb-schema 0.6.0",
"meilidb-tokenizer 0.6.1",
"meilidb-types 0.1.0",
"meilidb-schema 0.8.0",
"meilidb-tokenizer 0.8.0",
"meilidb-types 0.8.0",
"once_cell 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
"ordered-float 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)",
"rustyline 5.0.4 (registry+https://github.com/rust-lang/crates.io-index)",
@ -876,7 +876,7 @@ dependencies = [
[[package]]
name = "meilidb-http"
version = "0.3.0"
version = "0.8.0"
dependencies = [
"async-compression 0.1.0-alpha.7 (registry+https://github.com/rust-lang/crates.io-index)",
"bincode 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
@ -889,8 +889,8 @@ dependencies = [
"jemallocator 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
"main_error 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"meilidb-core 0.7.0",
"meilidb-schema 0.6.0",
"meilidb-core 0.8.0",
"meilidb-schema 0.8.0",
"pretty-bytes 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)",
"rand 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)",
"rayon 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
@ -908,7 +908,7 @@ dependencies = [
[[package]]
name = "meilidb-schema"
version = "0.6.0"
version = "0.8.0"
dependencies = [
"bincode 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
"indexmap 1.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
@ -919,7 +919,7 @@ dependencies = [
[[package]]
name = "meilidb-tokenizer"
version = "0.6.1"
version = "0.8.0"
dependencies = [
"deunicode 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
"slice-group-by 0.2.6 (registry+https://github.com/rust-lang/crates.io-index)",
@ -927,7 +927,7 @@ dependencies = [
[[package]]
name = "meilidb-types"
version = "0.1.0"
version = "0.8.0"
dependencies = [
"serde 1.0.102 (registry+https://github.com/rust-lang/crates.io-index)",
"zerocopy 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)",

View File

@ -22,7 +22,7 @@ struct IndexCommand {
database_path: PathBuf,
#[structopt(long, default_value = "default")]
index_name: String,
index_uid: String,
/// The csv file to index.
#[structopt(parse(from_os_str))]
@ -46,7 +46,7 @@ struct SearchCommand {
database_path: PathBuf,
#[structopt(long, default_value = "default")]
index_name: String,
index_uid: String,
/// Timeout after which the search will return results.
#[structopt(long)]
@ -76,7 +76,7 @@ struct ShowUpdatesCommand {
database_path: PathBuf,
#[structopt(long, default_value = "default")]
index_name: String,
index_uid: String,
}
#[derive(Debug, StructOpt)]
@ -106,9 +106,9 @@ fn index_command(command: IndexCommand, database: Database) -> Result<(), Box<dy
let (sender, receiver) = mpsc::sync_channel(100);
let update_fn =
move |_name: &str, update: ProcessedUpdateResult| sender.send(update.update_id).unwrap();
let index = match database.open_index(&command.index_name) {
let index = match database.open_index(&command.index_uid) {
Some(index) => index,
None => database.create_index(&command.index_name).unwrap(),
None => database.create_index(&command.index_uid).unwrap(),
};
database.set_update_callback(Box::new(update_fn));
@ -318,7 +318,7 @@ fn crop_text(
fn search_command(command: SearchCommand, database: Database) -> Result<(), Box<dyn Error>> {
let env = &database.env;
let index = database
.open_index(&command.index_name)
.open_index(&command.index_uid)
.expect("Could not find index");
let reader = env.read_txn().unwrap();
@ -446,7 +446,7 @@ fn show_updates_command(
) -> Result<(), Box<dyn Error>> {
let env = &database.env;
let index = database
.open_index(&command.index_name)
.open_index(&command.index_uid)
.expect("Could not find index");
let reader = env.read_txn().unwrap();

View File

@ -45,7 +45,7 @@ pub type UpdateEventsEmitter = Sender<UpdateEvent>;
fn update_awaiter(
receiver: UpdateEvents,
env: heed::Env,
index_name: &str,
index_uid: &str,
update_fn: Arc<ArcSwapFn>,
index: Index,
) {
@ -91,7 +91,7 @@ fn update_awaiter(
// call the user callback when the update and the result are written consistently
if let Some(ref callback) = *update_fn.load() {
(callback)(index_name, status);
(callback)(index_uid, status);
}
}
}
@ -116,22 +116,22 @@ impl Database {
let mut must_open = Vec::new();
let reader = env.read_txn()?;
for result in indexes_store.iter(&reader)? {
let (index_name, _) = result?;
must_open.push(index_name.to_owned());
let (index_uid, _) = result?;
must_open.push(index_uid.to_owned());
}
reader.abort();
// open the previously aggregated indexes
let mut indexes = HashMap::new();
for index_name in must_open {
for index_uid in must_open {
let (sender, receiver) = crossbeam_channel::bounded(100);
let index = match store::open(&env, &index_name, sender.clone())? {
let index = match store::open(&env, &index_uid, sender.clone())? {
Some(index) => index,
None => {
log::warn!(
"the index {} doesn't exist or has not all the databases",
index_name
index_uid
);
continue;
}
@ -139,7 +139,7 @@ impl Database {
let env_clone = env.clone();
let index_clone = index.clone();
let name_clone = index_name.clone();
let name_clone = index_uid.clone();
let update_fn_clone = update_fn.clone();
let handle = thread::spawn(move || {
@ -156,7 +156,7 @@ impl Database {
// possible pre-boot updates are consumed
sender.send(UpdateEvent::NewUpdate).unwrap();
let result = indexes.insert(index_name, (index, handle));
let result = indexes.insert(index_uid, (index, handle));
assert!(
result.is_none(),
"The index should not have been already open"
@ -251,9 +251,9 @@ impl Database {
self.env.copy_to_path(path, CompactionOption::Enabled)
}
pub fn indexes_names(&self) -> MResult<Vec<String>> {
pub fn indexes_uids(&self) -> Vec<String> {
let indexes = self.indexes.read().unwrap();
Ok(indexes.keys().cloned().collect())
indexes.keys().cloned().collect()
}
pub fn common_store(&self) -> heed::PolyDatabase {

View File

@ -1,17 +1,27 @@
use crate::RankedMap;
use chrono::{DateTime, Utc};
use heed::types::{ByteSlice, OwnedType, SerdeBincode, Str};
use heed::Result as ZResult;
use meilidb_schema::Schema;
use std::collections::HashMap;
use std::sync::Arc;
const CREATED_AT_KEY: &str = "created-at";
const CUSTOMS_KEY: &str = "customs-key";
const FIELDS_FREQUENCY_KEY: &str = "fields-frequency";
const NAME_KEY: &str = "name";
const NUMBER_OF_DOCUMENTS_KEY: &str = "number-of-documents";
const RANKED_MAP_KEY: &str = "ranked-map";
const SCHEMA_KEY: &str = "schema";
const SYNONYMS_KEY: &str = "synonyms";
const STOP_WORDS_KEY: &str = "stop-words";
const SYNONYMS_KEY: &str = "synonyms";
const UPDATED_AT_KEY: &str = "updated-at";
const WORDS_KEY: &str = "words";
pub type FreqsMap = HashMap<String, usize>;
type SerdeFreqsMap = SerdeBincode<FreqsMap>;
type SerdeDatetime = SerdeBincode<DateTime<Utc>>;
#[derive(Copy, Clone)]
pub struct Main {
pub(crate) main: heed::PolyDatabase,
@ -22,6 +32,35 @@ impl Main {
self.main.clear(writer)
}
pub fn put_name(self, writer: &mut heed::RwTxn, name: &str) -> ZResult<()> {
self.main.put::<Str, Str>(writer, NAME_KEY, name)
}
pub fn name(self, reader: &heed::RoTxn) -> ZResult<Option<String>> {
Ok(self
.main
.get::<Str, Str>(reader, NAME_KEY)?
.map(|name| name.to_owned()))
}
pub fn put_created_at(self, writer: &mut heed::RwTxn) -> ZResult<()> {
self.main
.put::<Str, SerdeDatetime>(writer, CREATED_AT_KEY, &Utc::now())
}
pub fn created_at(self, reader: &heed::RoTxn) -> ZResult<Option<DateTime<Utc>>> {
self.main.get::<Str, SerdeDatetime>(reader, CREATED_AT_KEY)
}
pub fn put_updated_at(self, writer: &mut heed::RwTxn) -> ZResult<()> {
self.main
.put::<Str, SerdeDatetime>(writer, UPDATED_AT_KEY, &Utc::now())
}
pub fn updated_at(self, reader: &heed::RoTxn) -> ZResult<Option<DateTime<Utc>>> {
self.main.get::<Str, SerdeDatetime>(reader, UPDATED_AT_KEY)
}
pub fn put_words_fst(self, writer: &mut heed::RwTxn, fst: &fst::Set) -> ZResult<()> {
let bytes = fst.as_fst().as_bytes();
self.main.put::<Str, ByteSlice>(writer, WORDS_KEY, bytes)
@ -114,6 +153,25 @@ impl Main {
}
}
pub fn put_fields_frequency(
self,
writer: &mut heed::RwTxn,
fields_frequency: &FreqsMap,
) -> ZResult<()> {
self.main
.put::<Str, SerdeFreqsMap>(writer, FIELDS_FREQUENCY_KEY, fields_frequency)
}
pub fn fields_frequency(&self, reader: &heed::RoTxn) -> ZResult<Option<FreqsMap>> {
match self
.main
.get::<Str, SerdeFreqsMap>(reader, FIELDS_FREQUENCY_KEY)?
{
Some(freqs) => Ok(Some(freqs)),
None => Ok(None),
}
}
pub fn put_customs(self, writer: &mut heed::RwTxn, customs: &[u8]) -> ZResult<()> {
self.main
.put::<Str, ByteSlice>(writer, CUSTOMS_KEY, customs)

View File

@ -4,15 +4,15 @@ use std::sync::Arc;
use chrono::{DateTime, Utc};
use heed::types::{SerdeBincode, Str};
use log::*;
use meilidb_core::{Database, MResult};
use log::error;
use meilidb_core::{Database, Error as MError, MResult};
use sysinfo::Pid;
use crate::option::Opt;
use crate::routes::index::index_update_callback;
pub type FreqsMap = HashMap<String, usize>;
type SerdeFreqsMap = SerdeBincode<FreqsMap>;
const LAST_UPDATE_KEY: &str = "last-update";
type SerdeDatetime = SerdeBincode<DateTime<Utc>>;
#[derive(Clone)]
@ -44,51 +44,29 @@ impl DataInner {
}
}
pub fn last_update(
&self,
reader: &heed::RoTxn,
index_name: &str,
) -> MResult<Option<DateTime<Utc>>> {
let key = format!("last-update-{}", index_name);
pub fn last_update(&self, reader: &heed::RoTxn) -> MResult<Option<DateTime<Utc>>> {
match self
.db
.common_store()
.get::<Str, SerdeDatetime>(&reader, &key)?
.get::<Str, SerdeDatetime>(reader, LAST_UPDATE_KEY)?
{
Some(datetime) => Ok(Some(datetime)),
None => Ok(None),
}
}
pub fn set_last_update(&self, writer: &mut heed::RwTxn, index_name: &str) -> MResult<()> {
let key = format!("last-update-{}", index_name);
pub fn set_last_update(&self, writer: &mut heed::RwTxn) -> MResult<()> {
self.db
.common_store()
.put::<Str, SerdeDatetime>(writer, &key, &Utc::now())
.put::<Str, SerdeDatetime>(writer, LAST_UPDATE_KEY, &Utc::now())
.map_err(Into::into)
}
pub fn fields_frequency(
&self,
reader: &heed::RoTxn,
index_name: &str,
) -> MResult<Option<FreqsMap>> {
let key = format!("fields-frequency-{}", index_name);
match self
.db
.common_store()
.get::<Str, SerdeFreqsMap>(&reader, &key)?
{
Some(freqs) => Ok(Some(freqs)),
None => Ok(None),
}
}
pub fn compute_stats(&self, writer: &mut heed::RwTxn, index_name: &str) -> MResult<()> {
let index = match self.db.open_index(&index_name) {
pub fn compute_stats(&self, writer: &mut heed::RwTxn, index_uid: &str) -> MResult<()> {
let index = match self.db.open_index(&index_uid) {
Some(index) => index,
None => {
error!("Impossible to retrieve index {}", index_name);
error!("Impossible to retrieve index {}", index_uid);
return Ok(());
}
};
@ -115,12 +93,10 @@ impl DataInner {
.map(|(a, c)| (schema.attribute_name(a).to_owned(), c))
.collect();
let key = format!("fields-frequency-{}", index_name);
self.db
.common_store()
.put::<Str, SerdeFreqsMap>(writer, &key, &frequency)?;
Ok(())
index
.main
.put_fields_frequency(writer, &frequency)
.map_err(MError::Zlmdb)
}
}
@ -144,8 +120,8 @@ impl Data {
};
let callback_context = data.clone();
db.set_update_callback(Box::new(move |index_name, status| {
index_update_callback(&index_name, &callback_context, status);
db.set_update_callback(Box::new(move |index_uid, status| {
index_update_callback(&index_uid, &callback_context, status);
}));
data

View File

@ -1,6 +1,6 @@
use crate::routes::setting::{RankingOrdering, SettingBody};
use indexmap::IndexMap;
use log::*;
use log::error;
use meilidb_core::criterion::*;
use meilidb_core::Highlight;
use meilidb_core::{Index, RankedMap};

View File

@ -38,9 +38,9 @@ impl ContextExt for Context<Data> {
.common_store()
.get::<Str, SerdeBincode<Token>>(&reader, &token_key)
.map_err(ResponseError::internal)?
.ok_or(ResponseError::not_found(format!(
"token key: {}",
token_key
.ok_or(ResponseError::invalid_token(format!(
"Api key does not exist: {}",
user_api_key
)))?;
if token_config.revoked {
@ -93,12 +93,12 @@ impl ContextExt for Context<Data> {
}
fn index(&self) -> Result<Index, ResponseError> {
let index_name = self.url_param("index")?;
let index_uid = self.url_param("index")?;
let index = self
.state()
.db
.open_index(&index_name)
.ok_or(ResponseError::index_not_found(index_name))?;
.open_index(&index_uid)
.ok_or(ResponseError::index_not_found(index_uid))?;
Ok(index)
}

View File

@ -74,7 +74,7 @@ struct BrowseQuery {
attributes_to_retrieve: Option<String>,
}
pub async fn browse_documents(ctx: Context<Data>) -> SResult<Response> {
pub async fn get_all_documents(ctx: Context<Data>) -> SResult<Response> {
ctx.is_allowed(DocumentsRead)?;
let index = ctx.index()?;
@ -114,15 +114,7 @@ pub async fn browse_documents(ctx: Context<Data>) -> SResult<Response> {
}
}
if response_body.is_empty() {
Ok(tide::response::json(response_body)
.with_status(StatusCode::NO_CONTENT)
.into_response())
} else {
Ok(tide::response::json(response_body)
.with_status(StatusCode::OK)
.into_response())
}
Ok(tide::response::json(response_body))
}
fn infered_schema(document: &IndexMap<String, Value>) -> Option<meilidb_schema::Schema> {

View File

@ -1,7 +1,12 @@
use chrono::{DateTime, Utc};
use http::StatusCode;
use log::error;
use meilidb_core::ProcessedUpdateResult;
use meilidb_schema::Schema;
use meilidb_schema::{Schema, SchemaBuilder};
use rand::seq::SliceRandom;
use serde::{Deserialize, Serialize};
use serde_json::json;
use tide::querystring::ContextExt as QSContextExt;
use tide::response::IntoResponse;
use tide::{Context, Response};
@ -12,17 +17,74 @@ use crate::models::token::ACL::*;
use crate::routes::document::IndexUpdateResponse;
use crate::Data;
pub async fn list_indexes(ctx: Context<Data>) -> SResult<Response> {
ctx.is_allowed(IndexesRead)?;
let list = ctx
.state()
.db
.indexes_names()
.map_err(ResponseError::internal)?;
Ok(tide::response::json(list))
fn generate_uid() -> String {
let mut rng = rand::thread_rng();
let sample = b"abcdefghijklmnopqrstuvwxyz0123456789";
sample
.choose_multiple(&mut rng, 8)
.map(|c| *c as char)
.collect()
}
pub async fn get_index_schema(ctx: Context<Data>) -> SResult<Response> {
pub async fn list_indexes(ctx: Context<Data>) -> SResult<Response> {
ctx.is_allowed(IndexesRead)?;
let indexes_uids = ctx.state().db.indexes_uids();
let env = &ctx.state().db.env;
let reader = env.read_txn().map_err(ResponseError::internal)?;
let mut response_body = Vec::new();
for index_uid in indexes_uids {
let index = ctx.state().db.open_index(&index_uid);
match index {
Some(index) => {
let name = index
.main
.name(&reader)
.map_err(ResponseError::internal)?
.ok_or(ResponseError::internal("'name' not found"))?;
let created_at = index
.main
.created_at(&reader)
.map_err(ResponseError::internal)?
.ok_or(ResponseError::internal("'created_at' date not found"))?;
let updated_at = index
.main
.updated_at(&reader)
.map_err(ResponseError::internal)?
.ok_or(ResponseError::internal("'updated_at' date not found"))?;
let index_reponse = IndexResponse {
name,
uid: index_uid,
created_at,
updated_at,
};
response_body.push(index_reponse);
}
None => error!(
"Index {} is referenced in the indexes list but cannot be found",
index_uid
),
}
}
Ok(tide::response::json(response_body))
}
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
struct IndexResponse {
name: String,
uid: String,
created_at: DateTime<Utc>,
updated_at: DateTime<Utc>,
}
pub async fn get_index(ctx: Context<Data>) -> SResult<Response> {
ctx.is_allowed(IndexesRead)?;
let index = ctx.index()?;
@ -30,41 +92,65 @@ pub async fn get_index_schema(ctx: Context<Data>) -> SResult<Response> {
let env = &ctx.state().db.env;
let reader = env.read_txn().map_err(ResponseError::internal)?;
let schema = index
let uid = ctx.url_param("index")?;
let name = index
.main
.schema(&reader)
.map_err(ResponseError::create_index)?;
.name(&reader)
.map_err(ResponseError::internal)?
.ok_or(ResponseError::internal("'name' not found"))?;
let created_at = index
.main
.created_at(&reader)
.map_err(ResponseError::internal)?
.ok_or(ResponseError::internal("'created_at' date not found"))?;
let updated_at = index
.main
.updated_at(&reader)
.map_err(ResponseError::internal)?
.ok_or(ResponseError::internal("'updated_at' date not found"))?;
match schema {
Some(schema) => {
let schema = SchemaBody::from(schema);
Ok(tide::response::json(schema))
}
None => Ok(
tide::response::json(json!({ "message": "missing index schema" }))
.with_status(StatusCode::NOT_FOUND)
.into_response(),
),
}
let response_body = IndexResponse {
name,
uid,
created_at,
updated_at,
};
Ok(tide::response::json(response_body))
}
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase", deny_unknown_fields)]
struct IndexCreateRequest {
name: String,
schema: Option<SchemaBody>,
}
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
struct IndexCreateResponse {
name: String,
uid: String,
schema: Option<SchemaBody>,
#[serde(skip_serializing_if = "Option::is_none")]
update_id: Option<u64>,
created_at: DateTime<Utc>,
updated_at: DateTime<Utc>,
}
pub async fn create_index(mut ctx: Context<Data>) -> SResult<Response> {
ctx.is_allowed(IndexesWrite)?;
let index_name = ctx.url_param("index")?;
let body = ctx
.body_json::<IndexCreateRequest>()
.await
.map_err(ResponseError::bad_request)?;
let body = ctx.body_bytes().await.map_err(ResponseError::bad_request)?;
let schema: Option<Schema> = if body.is_empty() {
None
} else {
serde_json::from_slice::<SchemaBody>(&body)
.map_err(ResponseError::bad_request)
.map(|s| Some(s.into()))?
};
let generated_uid = generate_uid();
let db = &ctx.state().db;
let created_index = match db.create_index(&index_name) {
let created_index = match db.create_index(&generated_uid) {
Ok(index) => index,
Err(e) => return Err(ResponseError::create_index(e)),
};
@ -72,44 +158,172 @@ pub async fn create_index(mut ctx: Context<Data>) -> SResult<Response> {
let env = &db.env;
let mut writer = env.write_txn().map_err(ResponseError::internal)?;
created_index
.main
.put_name(&mut writer, &body.name)
.map_err(ResponseError::internal)?;
created_index
.main
.put_created_at(&mut writer)
.map_err(ResponseError::internal)?;
created_index
.main
.put_updated_at(&mut writer)
.map_err(ResponseError::internal)?;
let schema: Option<Schema> = body.schema.clone().map(Into::into);
let mut response_update_id = None;
if let Some(schema) = schema {
let update_id = created_index
.schema_update(&mut writer, schema)
.map_err(ResponseError::internal)?;
response_update_id = Some(update_id)
}
writer.commit().map_err(ResponseError::internal)?;
let response_body = IndexCreateResponse {
name: body.name,
uid: generated_uid,
schema: body.schema,
update_id: response_update_id,
created_at: Utc::now(),
updated_at: Utc::now(),
};
Ok(tide::response::json(response_body)
.with_status(StatusCode::CREATED)
.into_response())
}
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase", deny_unknown_fields)]
struct UpdateIndexRequest {
name: String,
}
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
struct UpdateIndexResponse {
name: String,
uid: String,
created_at: DateTime<Utc>,
updated_at: DateTime<Utc>,
}
pub async fn update_index(mut ctx: Context<Data>) -> SResult<Response> {
ctx.is_allowed(IndexesWrite)?;
let body = ctx
.body_json::<UpdateIndexRequest>()
.await
.map_err(ResponseError::bad_request)?;
let index_uid = ctx.url_param("index")?;
let index = ctx.index()?;
let db = &ctx.state().db;
let env = &db.env;
let mut writer = env.write_txn().map_err(ResponseError::internal)?;
index
.main
.put_name(&mut writer, &body.name)
.map_err(ResponseError::internal)?;
index
.main
.put_updated_at(&mut writer)
.map_err(ResponseError::internal)?;
writer.commit().map_err(ResponseError::internal)?;
let reader = env.read_txn().map_err(ResponseError::internal)?;
let created_at = index
.main
.created_at(&reader)
.map_err(ResponseError::internal)?
.ok_or(ResponseError::internal("'created_at' date not found"))?;
let updated_at = index
.main
.updated_at(&reader)
.map_err(ResponseError::internal)?
.ok_or(ResponseError::internal("'updated_at' date not found"))?;
let response_body = UpdateIndexResponse {
name: body.name,
uid: index_uid,
created_at,
updated_at,
};
Ok(tide::response::json(response_body)
.with_status(StatusCode::ACCEPTED)
.into_response())
}
#[derive(Default, Deserialize)]
#[serde(rename_all = "camelCase", deny_unknown_fields)]
struct SchemaParams {
raw: bool,
}
pub async fn get_index_schema(ctx: Context<Data>) -> SResult<Response> {
ctx.is_allowed(IndexesRead)?;
let index = ctx.index()?;
// Tide doesn't support "no query param"
let params: SchemaParams = ctx.url_query().unwrap_or_default();
let env = &ctx.state().db.env;
let reader = env.read_txn().map_err(ResponseError::internal)?;
let schema = index
.main
.schema(&reader)
.map_err(ResponseError::open_index)?;
match schema {
Some(schema) => {
let update_id = created_index
.schema_update(&mut writer, schema.clone())
.map_err(ResponseError::internal)?;
writer.commit().map_err(ResponseError::internal)?;
let response_body = IndexUpdateResponse { update_id };
Ok(tide::response::json(response_body)
.with_status(StatusCode::CREATED)
.into_response())
if params.raw {
Ok(tide::response::json(schema))
} else {
Ok(tide::response::json(SchemaBody::from(schema)))
}
}
None => Ok(Response::new(tide::Body::empty())
.with_status(StatusCode::NO_CONTENT)
.into_response()),
None => Err(ResponseError::not_found("missing index schema")),
}
}
pub async fn update_schema(mut ctx: Context<Data>) -> SResult<Response> {
ctx.is_allowed(IndexesWrite)?;
let index_name = ctx.url_param("index")?;
let index_uid = ctx.url_param("index")?;
let schema = ctx
.body_json::<SchemaBody>()
.await
.map_err(ResponseError::bad_request)?;
let params: SchemaParams = ctx.url_query().unwrap_or_default();
let schema = if params.raw {
ctx.body_json::<SchemaBuilder>()
.await
.map_err(ResponseError::bad_request)?
.build()
} else {
ctx.body_json::<SchemaBody>()
.await
.map_err(ResponseError::bad_request)?
.into()
};
let db = &ctx.state().db;
let env = &db.env;
let mut writer = env.write_txn().map_err(ResponseError::internal)?;
let index = db
.open_index(&index_name)
.ok_or(ResponseError::index_not_found(index_name))?;
.open_index(&index_uid)
.ok_or(ResponseError::index_not_found(index_uid))?;
let schema: meilidb_schema::Schema = schema.into();
let update_id = index
.schema_update(&mut writer, schema.clone())
.map_err(ResponseError::internal)?;
@ -169,12 +383,12 @@ pub async fn get_all_updates_status(ctx: Context<Data>) -> SResult<Response> {
pub async fn delete_index(ctx: Context<Data>) -> SResult<StatusCode> {
ctx.is_allowed(IndexesWrite)?;
let index_name = ctx.url_param("index")?;
let index_uid = ctx.url_param("index")?;
let found = ctx
.state()
.db
.delete_index(&index_name)
.delete_index(&index_uid)
.map_err(ResponseError::internal)?;
if found {
@ -184,12 +398,35 @@ pub async fn delete_index(ctx: Context<Data>) -> SResult<StatusCode> {
}
}
pub fn index_update_callback(index_name: &str, data: &Data, _status: ProcessedUpdateResult) {
let env = &data.db.env;
let mut writer = env.write_txn().unwrap();
pub fn index_update_callback(index_uid: &str, data: &Data, status: ProcessedUpdateResult) {
if status.error.is_some() {
return;
}
data.compute_stats(&mut writer, &index_name).unwrap();
data.set_last_update(&mut writer, &index_name).unwrap();
if let Some(index) = data.db.open_index(&index_uid) {
let env = &data.db.env;
let mut writer = match env.write_txn() {
Ok(writer) => writer,
Err(e) => {
error!("Impossible to get write_txn; {}", e);
return;
}
};
writer.commit().unwrap();
if let Err(e) = data.compute_stats(&mut writer, &index_uid) {
error!("Impossible to compute stats; {}", e)
}
if let Err(e) = data.set_last_update(&mut writer) {
error!("Impossible to update last_update; {}", e)
}
if let Err(e) = index.main.put_updated_at(&mut writer) {
error!("Impossible to update updated_at; {}", e)
}
if let Err(e) = writer.commit() {
error!("Impossible to get write_txn; {}", e);
}
}
}

View File

@ -117,6 +117,8 @@ pub struct UpdatedRequest {
description: Option<String>,
acl: Option<Vec<ACL>>,
indexes: Option<Vec<Wildcard>>,
expires_at: Option<DateTime<Utc>>,
revoked: Option<bool>,
}
pub async fn update(mut ctx: Context<Data>) -> SResult<Response> {
@ -154,6 +156,14 @@ pub async fn update(mut ctx: Context<Data>) -> SResult<Response> {
token_config.indexes = indexes;
}
if let Some(expires_at) = data.expires_at {
token_config.expires_at = expires_at;
}
if let Some(revoked) = data.revoked {
token_config.revoked = revoked;
}
token_config.updated_at = Utc::now();
common_store
@ -163,7 +173,7 @@ pub async fn update(mut ctx: Context<Data>) -> SResult<Response> {
writer.commit().map_err(ResponseError::internal)?;
Ok(tide::response::json(token_config)
.with_status(StatusCode::ACCEPTED)
.with_status(StatusCode::OK)
.into_response())
}
@ -185,5 +195,5 @@ pub async fn delete(ctx: Context<Data>) -> SResult<StatusCode> {
writer.commit().map_err(ResponseError::internal)?;
Ok(StatusCode::ACCEPTED)
Ok(StatusCode::NO_CONTENT)
}

View File

@ -13,7 +13,10 @@ pub mod synonym;
pub fn load_routes(app: &mut tide::App<Data>) {
app.at("").nest(|router| {
router.at("/indexes").nest(|router| {
router.at("/").get(index::list_indexes);
router
.at("/")
.get(index::list_indexes)
.post(index::create_index);
router.at("/search").post(search::search_multi_index);
@ -28,15 +31,19 @@ pub fn load_routes(app: &mut tide::App<Data>) {
router
.at("/")
.get(index::get_index_schema)
.post(index::create_index)
.put(index::update_schema)
.get(index::get_index)
.put(index::update_index)
.delete(index::delete_index);
router
.at("/schema")
.get(index::get_index_schema)
.put(index::update_schema);
router.at("/documents").nest(|router| {
router
.at("/")
.get(document::browse_documents)
.get(document::get_all_documents)
.post(document::add_or_replace_multiple_documents)
.put(document::add_or_update_multiple_documents)
.delete(document::clear_all_documents);
@ -53,8 +60,12 @@ pub fn load_routes(app: &mut tide::App<Data>) {
.post(document::delete_multiple_documents);
});
router.at("/synonym").nest(|router| {
router.at("/").get(synonym::list).post(synonym::create);
router.at("/synonyms").nest(|router| {
router
.at("/")
.get(synonym::list)
.post(synonym::create)
.delete(synonym::clear);
router
.at("/:synonym")
@ -63,14 +74,13 @@ pub fn load_routes(app: &mut tide::App<Data>) {
.delete(synonym::delete);
router.at("/batch").post(synonym::batch_write);
router.at("/clear").post(synonym::clear);
});
router.at("/stop-words").nest(|router| {
router
.at("/")
.get(stop_words::list)
.put(stop_words::add)
.patch(stop_words::add)
.delete(stop_words::delete);
});

View File

@ -155,13 +155,8 @@ pub async fn search_multi_index(mut ctx: Context<Data>) -> SResult<Response> {
for index in index_list.clone() {
if index == "*" {
index_list = ctx
.state()
.db
.indexes_names()
.map_err(ResponseError::internal)?
.into_iter()
.collect();
index_list = ctx.state().db.indexes_uids().into_iter().collect();
break;
}
}
@ -181,10 +176,10 @@ pub async fn search_multi_index(mut ctx: Context<Data>) -> SResult<Response> {
let par_body = body.clone();
let responses_per_index: Vec<SResult<_>> = index_list
.into_par_iter()
.map(move |index_name| {
.map(move |index_uid| {
let index: Index = db
.open_index(&index_name)
.ok_or(ResponseError::index_not_found(&index_name))?;
.open_index(&index_uid)
.ok_or(ResponseError::index_not_found(&index_uid))?;
let mut search_builder = index.new_search(par_body.query.clone());
@ -221,7 +216,7 @@ pub async fn search_multi_index(mut ctx: Context<Data>) -> SResult<Response> {
let response = search_builder
.search(&reader)
.map_err(ResponseError::internal)?;
Ok((index_name, response))
Ok((index_uid, response))
})
.collect();
@ -230,11 +225,11 @@ pub async fn search_multi_index(mut ctx: Context<Data>) -> SResult<Response> {
let mut max_query_time = 0;
for response in responses_per_index {
if let Ok((index_name, response)) = response {
if let Ok((index_uid, response)) = response {
if response.processing_time_ms > max_query_time {
max_query_time = response.processing_time_ms;
}
hits_map.insert(index_name, response.hits);
hits_map.insert(index_uid, response.hits);
}
}

View File

@ -1,6 +1,7 @@
use std::collections::HashMap;
use chrono::{DateTime, Utc};
use log::error;
use pretty_bytes::converter::convert;
use serde::Serialize;
use sysinfo::{NetworkExt, Pid, ProcessExt, ProcessorExt, System, SystemExt};
@ -17,13 +18,12 @@ use crate::Data;
struct IndexStatsResponse {
number_of_documents: u64,
is_indexing: bool,
last_update: Option<DateTime<Utc>>,
fields_frequency: HashMap<String, usize>,
}
pub async fn index_stat(ctx: Context<Data>) -> SResult<Response> {
ctx.is_allowed(Admin)?;
let index_name = ctx.url_param("index")?;
let index_uid = ctx.url_param("index")?;
let index = ctx.index()?;
let env = &ctx.state().db.env;
@ -34,27 +34,21 @@ pub async fn index_stat(ctx: Context<Data>) -> SResult<Response> {
.number_of_documents(&reader)
.map_err(ResponseError::internal)?;
let fields_frequency = ctx
.state()
.fields_frequency(&reader, &index_name)
let fields_frequency = index
.main
.fields_frequency(&reader)
.map_err(ResponseError::internal)?
.unwrap_or_default();
let is_indexing = ctx
.state()
.is_indexing(&reader, &index_name)
.is_indexing(&reader, &index_uid)
.map_err(ResponseError::internal)?
.ok_or(ResponseError::not_found("Index not found"))?;
let last_update = ctx
.state()
.last_update(&reader, &index_name)
.map_err(ResponseError::internal)?;
.ok_or(ResponseError::internal("'is_indexing' date not found"))?;
let response = IndexStatsResponse {
number_of_documents,
is_indexing,
last_update,
fields_frequency,
};
Ok(tide::response::json(response))
@ -64,6 +58,7 @@ pub async fn index_stat(ctx: Context<Data>) -> SResult<Response> {
#[serde(rename_all = "camelCase")]
struct StatsResult {
database_size: u64,
last_update: Option<DateTime<Utc>>,
indexes: HashMap<String, IndexStatsResponse>,
}
@ -72,43 +67,44 @@ pub async fn get_stats(ctx: Context<Data>) -> SResult<Response> {
let mut index_list = HashMap::new();
if let Ok(indexes_set) = ctx.state().db.indexes_names() {
for index_name in indexes_set {
let db = &ctx.state().db;
let env = &db.env;
let db = &ctx.state().db;
let env = &db.env;
let reader = env.read_txn().map_err(ResponseError::internal)?;
let index = db.open_index(&index_name).unwrap();
let reader = env.read_txn().map_err(ResponseError::internal)?;
let indexes_set = ctx.state().db.indexes_uids();
for index_uid in indexes_set {
let index = ctx.state().db.open_index(&index_uid);
let number_of_documents = index
.main
.number_of_documents(&reader)
.map_err(ResponseError::internal)?;
match index {
Some(index) => {
let number_of_documents = index
.main
.number_of_documents(&reader)
.map_err(ResponseError::internal)?;
let fields_frequency = ctx
.state()
.fields_frequency(&reader, &index_name)
.map_err(ResponseError::internal)?
.unwrap_or_default();
let fields_frequency = index
.main
.fields_frequency(&reader)
.map_err(ResponseError::internal)?
.unwrap_or_default();
let is_indexing = ctx
.state()
.is_indexing(&reader, &index_name)
.map_err(ResponseError::internal)?
.ok_or(ResponseError::not_found("Index not found"))?;
let is_indexing = ctx
.state()
.is_indexing(&reader, &index_uid)
.map_err(ResponseError::internal)?
.ok_or(ResponseError::internal("'is_indexing' date not found"))?;
let last_update = ctx
.state()
.last_update(&reader, &index_name)
.map_err(ResponseError::internal)?;
let response = IndexStatsResponse {
number_of_documents,
is_indexing,
last_update,
fields_frequency,
};
index_list.insert(index_name, response);
let response = IndexStatsResponse {
number_of_documents,
is_indexing,
fields_frequency,
};
index_list.insert(index_uid, response);
}
None => error!(
"Index {:?} is referenced in the indexes list but cannot be found",
index_uid
),
}
}
@ -119,8 +115,14 @@ pub async fn get_stats(ctx: Context<Data>) -> SResult<Response> {
.filter(|metadata| metadata.is_file())
.fold(0, |acc, m| acc + m.len());
let last_update = ctx
.state()
.last_update(&reader)
.map_err(ResponseError::internal)?;
let response = StatsResult {
database_size,
last_update,
indexes: index_list,
};

View File

@ -115,7 +115,7 @@ pub async fn create(mut ctx: Context<Data>) -> SResult<Response> {
let response_body = IndexUpdateResponse { update_id };
Ok(tide::response::json(response_body)
.with_status(StatusCode::CREATED)
.with_status(StatusCode::ACCEPTED)
.into_response())
}