diff --git a/Cargo.toml b/Cargo.toml index 0903eab10..bd461dff0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,7 @@ [workspace] members = [ "meilidb-core", + "meilidb-http", "meilidb-schema", "meilidb-tokenizer", ] diff --git a/meilidb-http/Cargo.toml b/meilidb-http/Cargo.toml new file mode 100644 index 000000000..890c8aec8 --- /dev/null +++ b/meilidb-http/Cargo.toml @@ -0,0 +1,55 @@ +[package] +name = "meilidb-http" +version = "0.1.0" +authors = [ + "Quentin de Quelen ", + "Clément Renault ", +] +edition = "2018" + +[dependencies] +bincode = "1.2.0" +chrono = { version = "0.4.9", features = ["serde"] } +crossbeam-channel = "0.3.9" +envconfig = "0.5.1" +envconfig_derive = "0.5.1" +heed = "0.1.0" +http = "0.1.19" +indexmap = { version = "1.3.0", features = ["serde-1"] } +jemallocator = "0.3.2" +log = "0.4.8" +main_error = "0.1.0" +meilidb-core = { path = "../meilidb-core", version = "0.6.0" } +meilidb-schema = { path = "../meilidb-schema", version = "0.6.0" } +pretty-bytes = "0.2.2" +rand = "0.7.2" +rayon = "1.2.0" +serde = { version = "1.0.101", features = ["derive"] } +serde_json = { version = "1.0.41", features = ["preserve_order"] } +structopt = "0.3.3" +sysinfo = "0.9.5" +walkdir = "2.2.9" + +[dependencies.async-compression] +default-features = false +features = ["stream", "gzip", "zlib", "brotli", "zstd"] +version = "0.1.0-alpha.7" + +[dependencies.tide] +git = "https://github.com/rustasync/tide" +rev = "e77709370bb24cf776fe6da902467c35131535b1" + +[dependencies.tide-log] +git = "https://github.com/rustasync/tide" +rev = "e77709370bb24cf776fe6da902467c35131535b1" + +[dependencies.tide-slog] +git = "https://github.com/rustasync/tide" +rev = "e77709370bb24cf776fe6da902467c35131535b1" + +[dependencies.tide-compression] +git = "https://github.com/rustasync/tide" +rev = "e77709370bb24cf776fe6da902467c35131535b1" + +[build-dependencies] +vergen = "3.0.4" diff --git a/meilidb-http/build.rs b/meilidb-http/build.rs new file mode 100644 index 000000000..2257407a8 --- /dev/null +++ b/meilidb-http/build.rs @@ -0,0 +1,10 @@ +use vergen::{generate_cargo_keys, ConstantsFlags}; + +fn main() { + // Setup the flags, toggling off the 'SEMVER_FROM_CARGO_PKG' flag + let mut flags = ConstantsFlags::all(); + flags.toggle(ConstantsFlags::SEMVER_FROM_CARGO_PKG); + + // Generate the 'cargo:' key output + generate_cargo_keys(ConstantsFlags::all()).expect("Unable to generate the cargo keys!"); +} diff --git a/meilidb-http/src/data.rs b/meilidb-http/src/data.rs new file mode 100644 index 000000000..4becfa10f --- /dev/null +++ b/meilidb-http/src/data.rs @@ -0,0 +1,190 @@ +use std::collections::HashMap; +use std::ops::Deref; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; + +use chrono::{DateTime, Utc}; +use heed::types::{SerdeBincode, Str}; +use log::*; +use meilidb_core::{Database, MResult}; +use sysinfo::Pid; + +use crate::option::Opt; +use crate::routes::index::index_update_callback; + +pub type FreqsMap = HashMap; +type SerdeFreqsMap = SerdeBincode; +type SerdeDatetime = SerdeBincode>; + +#[derive(Clone)] +pub struct Data { + inner: Arc, +} + +impl Deref for Data { + type Target = DataInner; + + fn deref(&self) -> &Self::Target { + &self.inner + } +} + +#[derive(Clone)] +pub struct DataInner { + pub db: Arc, + pub db_path: String, + pub admin_token: Option, + pub server_pid: Pid, + pub accept_updates: Arc, +} + +impl DataInner { + pub fn is_indexing(&self, reader: &heed::RoTxn, index: &str) -> MResult> { + match self.db.open_index(&index) { + Some(index) => index.current_update_id(&reader).map(|u| Some(u.is_some())), + None => Ok(None), + } + } + + pub fn last_update( + &self, + reader: &heed::RoTxn, + index_name: &str, + ) -> MResult>> { + let key = format!("last-update-{}", index_name); + match self + .db + .common_store() + .get::(&reader, &key)? + { + Some(datetime) => Ok(Some(datetime)), + None => Ok(None), + } + } + + pub fn set_last_update(&self, writer: &mut heed::RwTxn, index_name: &str) -> MResult<()> { + let key = format!("last-update-{}", index_name); + self.db + .common_store() + .put::(writer, &key, &Utc::now()) + .map_err(Into::into) + } + + pub fn last_backup(&self, reader: &heed::RoTxn) -> MResult>> { + match self + .db + .common_store() + .get::(&reader, "last-backup")? + { + Some(datetime) => Ok(Some(datetime)), + None => Ok(None), + } + } + + pub fn set_last_backup(&self, writer: &mut heed::RwTxn) -> MResult<()> { + self.db + .common_store() + .put::(writer, "last-backup", &Utc::now())?; + + Ok(()) + } + + pub fn fields_frequency( + &self, + reader: &heed::RoTxn, + index_name: &str, + ) -> MResult> { + let key = format!("fields-frequency-{}", index_name); + match self + .db + .common_store() + .get::(&reader, &key)? + { + Some(freqs) => Ok(Some(freqs)), + None => Ok(None), + } + } + + pub fn compute_stats(&self, writer: &mut heed::RwTxn, index_name: &str) -> MResult<()> { + let index = match self.db.open_index(&index_name) { + Some(index) => index, + None => { + error!("Impossible to retrieve index {}", index_name); + return Ok(()); + } + }; + + let schema = match index.main.schema(&writer)? { + Some(schema) => schema, + None => return Ok(()), + }; + + let all_documents_fields = index + .documents_fields_counts + .all_documents_fields_counts(&writer)?; + + // count fields frequencies + let mut fields_frequency = HashMap::<_, usize>::new(); + for result in all_documents_fields { + let (_, attr, _) = result?; + *fields_frequency.entry(attr).or_default() += 1; + } + + // convert attributes to their names + let frequency: HashMap<_, _> = fields_frequency + .into_iter() + .map(|(a, c)| (schema.attribute_name(a).to_owned(), c)) + .collect(); + + let key = format!("fields-frequency-{}", index_name); + self.db + .common_store() + .put::(writer, &key, &frequency)?; + + Ok(()) + } + + pub fn stop_accept_updates(&self) { + self.accept_updates.store(false, Ordering::Relaxed); + } + + pub fn accept_updates(&self) -> bool { + self.accept_updates.load(Ordering::Relaxed) + } +} + +impl Data { + pub fn new(opt: Opt) -> Data { + let db_path = opt.database_path.clone(); + let admin_token = opt.admin_token.clone(); + let server_pid = sysinfo::get_current_pid().unwrap(); + + let db = Arc::new(Database::open_or_create(opt.database_path.clone()).unwrap()); + let accept_updates = Arc::new(AtomicBool::new(true)); + + let inner_data = DataInner { + db: db.clone(), + db_path, + admin_token, + server_pid, + accept_updates, + }; + + let data = Data { + inner: Arc::new(inner_data), + }; + + for index_name in db.indexes_names().unwrap() { + let callback_context = data.clone(); + let callback_name = index_name.clone(); + db.set_update_callback( + index_name, + Box::new(move |status| { + index_update_callback(&callback_name, &callback_context, status); + }), + ); + } + + data + } +} diff --git a/meilidb-http/src/error.rs b/meilidb-http/src/error.rs new file mode 100644 index 000000000..a0860fcd4 --- /dev/null +++ b/meilidb-http/src/error.rs @@ -0,0 +1,117 @@ +use std::fmt::Display; + +use http::status::StatusCode; +use log::{error, warn}; +use serde::{Deserialize, Serialize}; +use tide::response::IntoResponse; +use tide::Response; + +pub type SResult = Result; + +pub enum ResponseError { + Internal(String), + BadRequest(String), + InvalidToken(String), + NotFound(String), + IndexNotFound(String), + DocumentNotFound(String), + MissingHeader(String), + BadParameter(String, String), + CreateIndex(String), + Maintenance, +} + +impl ResponseError { + pub fn internal(message: impl Display) -> ResponseError { + ResponseError::Internal(message.to_string()) + } + + pub fn bad_request(message: impl Display) -> ResponseError { + ResponseError::BadRequest(message.to_string()) + } + + pub fn invalid_token(message: impl Display) -> ResponseError { + ResponseError::InvalidToken(message.to_string()) + } + + pub fn not_found(message: impl Display) -> ResponseError { + ResponseError::NotFound(message.to_string()) + } + + pub fn index_not_found(message: impl Display) -> ResponseError { + ResponseError::IndexNotFound(message.to_string()) + } + + pub fn document_not_found(message: impl Display) -> ResponseError { + ResponseError::DocumentNotFound(message.to_string()) + } + + pub fn missing_header(message: impl Display) -> ResponseError { + ResponseError::MissingHeader(message.to_string()) + } + + pub fn bad_parameter(name: impl Display, message: impl Display) -> ResponseError { + ResponseError::BadParameter(name.to_string(), message.to_string()) + } + + pub fn create_index(message: impl Display) -> ResponseError { + ResponseError::CreateIndex(message.to_string()) + } +} + +impl IntoResponse for ResponseError { + fn into_response(self) -> Response { + match self { + ResponseError::Internal(err) => { + error!("internal server error: {}", err); + error( + String::from("Internal server error"), + StatusCode::INTERNAL_SERVER_ERROR, + ) + } + ResponseError::BadRequest(err) => { + warn!("bad request: {}", err); + error(err, StatusCode::BAD_REQUEST) + } + ResponseError::InvalidToken(err) => { + error(format!("Invalid Token: {}", err), StatusCode::FORBIDDEN) + } + ResponseError::NotFound(err) => error(err, StatusCode::NOT_FOUND), + ResponseError::IndexNotFound(index) => { + error(format!("Index {} not found", index), StatusCode::NOT_FOUND) + } + ResponseError::DocumentNotFound(id) => error( + format!("Document with id {} not found", id), + StatusCode::NOT_FOUND, + ), + ResponseError::MissingHeader(header) => error( + format!("Header {} is missing", header), + StatusCode::UNAUTHORIZED, + ), + ResponseError::BadParameter(param, e) => error( + format!("Url parameter {} error: {}", param, e), + StatusCode::BAD_REQUEST, + ), + ResponseError::CreateIndex(err) => error( + format!("Impossible to create index; {}", err), + StatusCode::BAD_REQUEST, + ), + ResponseError::Maintenance => error( + String::from("Server is in maintenance, please try again later"), + StatusCode::SERVICE_UNAVAILABLE, + ), + } + } +} + +#[derive(Serialize, Deserialize)] +struct ErrorMessage { + message: String, +} + +fn error(message: String, status: StatusCode) -> Response { + let message = ErrorMessage { message }; + tide::response::json(message) + .with_status(status) + .into_response() +} diff --git a/meilidb-http/src/helpers/meilidb.rs b/meilidb-http/src/helpers/meilidb.rs new file mode 100644 index 000000000..0604e9995 --- /dev/null +++ b/meilidb-http/src/helpers/meilidb.rs @@ -0,0 +1,568 @@ +use crate::routes::setting::{RankingOrdering, SettingBody}; +use indexmap::IndexMap; +use log::*; +use meilidb_core::criterion::*; +use meilidb_core::Highlight; +use meilidb_core::{Index, RankedMap}; +use meilidb_schema::{Schema, SchemaAttr}; +use serde::{Deserialize, Serialize}; +use serde_json::Value; +use std::cmp::Ordering; +use std::collections::{HashMap, HashSet}; +use std::convert::From; +use std::error; +use std::fmt; +use std::time::{Duration, Instant}; + +#[derive(Debug)] +pub enum Error { + SearchDocuments(String), + RetrieveDocument(u64, String), + DocumentNotFound(u64), + CropFieldWrongType(String), + AttributeNotFoundOnDocument(String), + AttributeNotFoundOnSchema(String), + MissingFilterValue, + UnknownFilteredAttribute, + Internal(String), +} + +impl error::Error for Error {} + +impl fmt::Display for Error { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + use Error::*; + + match self { + SearchDocuments(err) => write!(f, "impossible to search documents; {}", err), + RetrieveDocument(id, err) => write!( + f, + "impossible to retrieve the document with id: {}; {}", + id, err + ), + DocumentNotFound(id) => write!(f, "document {} not found", id), + CropFieldWrongType(field) => { + write!(f, "the field {} cannot be cropped it's not a string", field) + } + AttributeNotFoundOnDocument(field) => { + write!(f, "field {} is not found on document", field) + } + AttributeNotFoundOnSchema(field) => write!(f, "field {} is not found on schema", field), + MissingFilterValue => f.write_str("a filter doesn't have a value to compare it with"), + UnknownFilteredAttribute => { + f.write_str("a filter is specifying an unknown schema attribute") + } + Internal(err) => write!(f, "internal error; {}", err), + } + } +} + +impl From for Error { + fn from(error: meilidb_core::Error) -> Self { + Error::Internal(error.to_string()) + } +} + +pub trait IndexSearchExt { + fn new_search(&self, query: String) -> SearchBuilder; +} + +impl IndexSearchExt for Index { + fn new_search(&self, query: String) -> SearchBuilder { + SearchBuilder { + index: self, + query, + offset: 0, + limit: 20, + attributes_to_crop: None, + attributes_to_retrieve: None, + attributes_to_search_in: None, + attributes_to_highlight: None, + filters: None, + timeout: Duration::from_millis(30), + matches: false, + } + } +} + +pub struct SearchBuilder<'a> { + index: &'a Index, + query: String, + offset: usize, + limit: usize, + attributes_to_crop: Option>, + attributes_to_retrieve: Option>, + attributes_to_search_in: Option>, + attributes_to_highlight: Option>, + filters: Option, + timeout: Duration, + matches: bool, +} + +impl<'a> SearchBuilder<'a> { + pub fn offset(&mut self, value: usize) -> &SearchBuilder { + self.offset = value; + self + } + + pub fn limit(&mut self, value: usize) -> &SearchBuilder { + self.limit = value; + self + } + + pub fn attributes_to_crop(&mut self, value: HashMap) -> &SearchBuilder { + self.attributes_to_crop = Some(value); + self + } + + pub fn attributes_to_retrieve(&mut self, value: HashSet) -> &SearchBuilder { + self.attributes_to_retrieve = Some(value); + self + } + + pub fn add_retrievable_field(&mut self, value: String) -> &SearchBuilder { + let attributes_to_retrieve = self.attributes_to_retrieve.get_or_insert(HashSet::new()); + attributes_to_retrieve.insert(value); + self + } + + pub fn attributes_to_search_in(&mut self, value: HashSet) -> &SearchBuilder { + self.attributes_to_search_in = Some(value); + self + } + + pub fn attributes_to_highlight(&mut self, value: HashSet) -> &SearchBuilder { + self.attributes_to_highlight = Some(value); + self + } + + pub fn filters(&mut self, value: String) -> &SearchBuilder { + self.filters = Some(value); + self + } + + pub fn timeout(&mut self, value: Duration) -> &SearchBuilder { + self.timeout = value; + self + } + + pub fn get_matches(&mut self) -> &SearchBuilder { + self.matches = true; + self + } + + pub fn search(&self, reader: &heed::RoTxn) -> Result { + let schema = self.index.main.schema(reader); + let schema = schema.map_err(|e| Error::Internal(e.to_string()))?; + let schema = match schema { + Some(schema) => schema, + None => return Err(Error::Internal(String::from("missing schema"))), + }; + + let ranked_map = self.index.main.ranked_map(reader); + let ranked_map = ranked_map.map_err(|e| Error::Internal(e.to_string()))?; + let ranked_map = ranked_map.unwrap_or_default(); + + let start = Instant::now(); + + // Change criteria + let mut query_builder = match self.get_criteria(reader, &ranked_map, &schema)? { + Some(criteria) => self.index.query_builder_with_criteria(criteria), + None => self.index.query_builder(), + }; + + // Filter searchable fields + if let Some(fields) = &self.attributes_to_search_in { + for attribute in fields.iter().filter_map(|f| schema.attribute(f)) { + query_builder.add_searchable_attribute(attribute.0); + } + } + + if let Some(filters) = &self.filters { + let mut split = filters.split(':'); + match (split.next(), split.next()) { + (Some(_), None) | (Some(_), Some("")) => return Err(Error::MissingFilterValue), + (Some(attr), Some(value)) => { + let ref_reader = reader; + let ref_index = &self.index; + let value = value.trim().to_lowercase(); + + let attr = match schema.attribute(attr) { + Some(attr) => attr, + None => return Err(Error::UnknownFilteredAttribute), + }; + + query_builder.with_filter(move |id| { + let attr = attr; + let index = ref_index; + let reader = ref_reader; + + match index.document_attribute::(reader, id, attr) { + Ok(Some(Value::String(s))) => s.to_lowercase() == value, + Ok(Some(Value::Bool(b))) => { + (value == "true" && b) || (value == "false" && !b) + } + Ok(Some(Value::Array(a))) => { + a.into_iter().any(|s| s.as_str() == Some(&value)) + } + _ => false, + } + }); + } + (_, _) => (), + } + } + + query_builder.with_fetch_timeout(self.timeout); + + let docs = + query_builder.query(reader, &self.query, self.offset..(self.offset + self.limit)); + + let mut hits = Vec::with_capacity(self.limit); + for doc in docs.map_err(|e| Error::SearchDocuments(e.to_string()))? { + // retrieve the content of document in kv store + let mut fields: Option> = None; + if let Some(attributes_to_retrieve) = &self.attributes_to_retrieve { + let mut set = HashSet::new(); + for field in attributes_to_retrieve { + set.insert(field.as_str()); + } + fields = Some(set); + } + let mut document: IndexMap = self + .index + .document(reader, fields.as_ref(), doc.id) + .map_err(|e| Error::RetrieveDocument(doc.id.0, e.to_string()))? + .ok_or(Error::DocumentNotFound(doc.id.0))?; + + let mut matches = doc.highlights.clone(); + + // Crops fields if needed + if let Some(fields) = self.attributes_to_crop.clone() { + for (field, length) in fields { + let _ = crop_document(&mut document, &mut matches, &schema, &field, length); + } + } + + // Transform to readable matches + let matches = calculate_matches(matches, self.attributes_to_retrieve.clone(), &schema); + + if !self.matches { + if let Some(attributes_to_highlight) = self.attributes_to_highlight.clone() { + let highlights = calculate_highlights( + document.clone(), + matches.clone(), + attributes_to_highlight, + ); + for (key, value) in highlights { + if let Some(content) = document.get_mut(&key) { + *content = value; + } + } + } + } + + let matches_info = if self.matches { Some(matches) } else { None }; + + let hit = SearchHit { + hit: document, + matches_info, + }; + + hits.push(hit); + } + + let time_ms = start.elapsed().as_millis() as usize; + + let results = SearchResult { + hits, + offset: self.offset, + limit: self.limit, + processing_time_ms: time_ms, + query: self.query.to_string(), + }; + + Ok(results) + } + + pub fn get_criteria( + &self, + reader: &heed::RoTxn, + ranked_map: &'a RankedMap, + schema: &Schema, + ) -> Result>, Error> { + let current_settings = match self.index.main.customs(reader).unwrap() { + Some(bytes) => bincode::deserialize(bytes).unwrap(), + None => SettingBody::default(), + }; + + let ranking_rules = ¤t_settings.ranking_rules; + let ranking_order = ¤t_settings.ranking_order; + + if let Some(ranking_rules) = ranking_rules { + let mut builder = CriteriaBuilder::with_capacity(7 + ranking_rules.len()); + if let Some(ranking_rules_order) = ranking_order { + for rule in ranking_rules_order { + match rule.as_str() { + "_sum_of_typos" => builder.push(SumOfTypos), + "_number_of_words" => builder.push(NumberOfWords), + "_word_proximity" => builder.push(WordsProximity), + "_sum_of_words_attribute" => builder.push(SumOfWordsAttribute), + "_sum_of_words_position" => builder.push(SumOfWordsPosition), + "_exact" => builder.push(Exact), + _ => { + let order = match ranking_rules.get(rule.as_str()) { + Some(o) => o, + None => continue, + }; + + let custom_ranking = match order { + RankingOrdering::Asc => { + SortByAttr::lower_is_better(&ranked_map, &schema, &rule) + .unwrap() + } + RankingOrdering::Dsc => { + SortByAttr::higher_is_better(&ranked_map, &schema, &rule) + .unwrap() + } + }; + + builder.push(custom_ranking); + } + } + } + builder.push(DocumentId); + return Ok(Some(builder.build())); + } else { + builder.push(SumOfTypos); + builder.push(NumberOfWords); + builder.push(WordsProximity); + builder.push(SumOfWordsAttribute); + builder.push(SumOfWordsPosition); + builder.push(Exact); + for (rule, order) in ranking_rules.iter() { + let custom_ranking = match order { + RankingOrdering::Asc => { + SortByAttr::lower_is_better(&ranked_map, &schema, &rule).unwrap() + } + RankingOrdering::Dsc => { + SortByAttr::higher_is_better(&ranked_map, &schema, &rule).unwrap() + } + }; + builder.push(custom_ranking); + } + builder.push(DocumentId); + return Ok(Some(builder.build())); + } + } + + Ok(None) + } +} + +#[derive(Debug, Clone, Eq, PartialEq, PartialOrd, Serialize, Deserialize)] +pub struct MatchPosition { + pub start: usize, + pub length: usize, +} + +impl Ord for MatchPosition { + fn cmp(&self, other: &Self) -> Ordering { + match self.start.cmp(&other.start) { + Ordering::Equal => self.length.cmp(&other.length), + _ => self.start.cmp(&other.start), + } + } +} + +pub type HighlightInfos = HashMap; +pub type MatchesInfos = HashMap>; +// pub type RankingInfos = HashMap; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct SearchHit { + #[serde(flatten)] + pub hit: IndexMap, + #[serde(rename = "_matchesInfo", skip_serializing_if = "Option::is_none")] + pub matches_info: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct SearchResult { + pub hits: Vec, + pub offset: usize, + pub limit: usize, + pub processing_time_ms: usize, + pub query: String, + // pub parsed_query: String, + // pub params: Option, +} + +fn crop_text( + text: &str, + matches: impl IntoIterator, + context: usize, +) -> (String, Vec) { + let mut matches = matches.into_iter().peekable(); + + let char_index = matches.peek().map(|m| m.char_index as usize).unwrap_or(0); + let start = char_index.saturating_sub(context); + let text = text.chars().skip(start).take(context * 2).collect(); + + let matches = matches + .take_while(|m| (m.char_index as usize) + (m.char_length as usize) <= start + (context * 2)) + .map(|match_| Highlight { + char_index: match_.char_index - start as u16, + ..match_ + }) + .collect(); + + (text, matches) +} + +fn crop_document( + document: &mut IndexMap, + matches: &mut Vec, + schema: &Schema, + field: &str, + length: usize, +) -> Result<(), Error> { + matches.sort_unstable_by_key(|m| (m.char_index, m.char_length)); + + let attribute = schema + .attribute(field) + .ok_or(Error::AttributeNotFoundOnSchema(field.to_string()))?; + let selected_matches = matches + .iter() + .filter(|m| SchemaAttr::new(m.attribute) == attribute) + .cloned(); + let original_text = match document.get(field) { + Some(Value::String(text)) => text, + Some(_) => return Err(Error::CropFieldWrongType(field.to_string())), + None => return Err(Error::AttributeNotFoundOnDocument(field.to_string())), + }; + let (cropped_text, cropped_matches) = crop_text(&original_text, selected_matches, length); + + document.insert( + field.to_string(), + serde_json::value::Value::String(cropped_text), + ); + matches.retain(|m| SchemaAttr::new(m.attribute) != attribute); + matches.extend_from_slice(&cropped_matches); + Ok(()) +} + +fn calculate_matches( + matches: Vec, + attributes_to_retrieve: Option>, + schema: &Schema, +) -> MatchesInfos { + let mut matches_result: HashMap> = HashMap::new(); + for m in matches.iter() { + let attribute = schema + .attribute_name(SchemaAttr::new(m.attribute)) + .to_string(); + if let Some(attributes_to_retrieve) = attributes_to_retrieve.clone() { + if !attributes_to_retrieve.contains(attribute.as_str()) { + continue; + } + }; + if let Some(pos) = matches_result.get_mut(&attribute) { + pos.push(MatchPosition { + start: m.char_index as usize, + length: m.char_length as usize, + }); + } else { + let mut positions = Vec::new(); + positions.push(MatchPosition { + start: m.char_index as usize, + length: m.char_length as usize, + }); + matches_result.insert(attribute, positions); + } + } + for (_, val) in matches_result.iter_mut() { + val.sort_unstable(); + val.dedup(); + } + matches_result +} + +fn calculate_highlights( + document: IndexMap, + matches: MatchesInfos, + attributes_to_highlight: HashSet, +) -> HighlightInfos { + let mut highlight_result: HashMap = HashMap::new(); + for (attribute, matches) in matches.iter() { + if attributes_to_highlight.contains("*") || attributes_to_highlight.contains(attribute) { + if let Some(Value::String(value)) = document.get(attribute) { + let value: Vec<_> = value.chars().collect(); + let mut highlighted_value = String::new(); + let mut index = 0; + for m in matches { + if m.start >= index { + let before = value.get(index..m.start); + let highlighted = value.get(m.start..(m.start + m.length)); + if let (Some(before), Some(highlighted)) = (before, highlighted) { + highlighted_value.extend(before); + highlighted_value.push_str(""); + highlighted_value.extend(highlighted); + highlighted_value.push_str(""); + index = m.start + m.length; + } else { + error!("value: {:?}; index: {:?}, match: {:?}", value, index, m); + } + } + } + highlighted_value.extend(value[index..].iter()); + highlight_result.insert(attribute.to_string(), Value::String(highlighted_value)); + }; + } + } + highlight_result +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn calculate_highlights() { + let data = r#"{ + "title": "Fondation (Isaac ASIMOV)", + "description": "En ce début de trentième millénaire, l'Empire n'a jamais été aussi puissant, aussi étendu à travers toute la galaxie. C'est dans sa capitale, Trantor, que l'éminent savant Hari Seldon invente la psychohistoire, une science toute nouvelle, à base de psychologie et de mathématiques, qui lui permet de prédire l'avenir... C'est-à-dire l'effondrement de l'Empire d'ici cinq siècles et au-delà, trente mille années de chaos et de ténèbres. Pour empêcher cette catastrophe et sauver la civilisation, Seldon crée la Fondation." + }"#; + + let document: IndexMap = serde_json::from_str(data).unwrap(); + let mut attributes_to_highlight = HashSet::new(); + attributes_to_highlight.insert("*".to_string()); + + let mut matches: HashMap> = HashMap::new(); + + let mut m = Vec::new(); + m.push(MatchPosition { + start: 0, + length: 9, + }); + matches.insert("title".to_string(), m); + + let mut m = Vec::new(); + m.push(MatchPosition { + start: 510, + length: 9, + }); + matches.insert("description".to_string(), m); + let result = super::calculate_highlights(document, matches, attributes_to_highlight); + + let mut result_expected = HashMap::new(); + result_expected.insert( + "title".to_string(), + Value::String("Fondation (Isaac ASIMOV)".to_string()), + ); + result_expected.insert("description".to_string(), Value::String("En ce début de trentième millénaire, l'Empire n'a jamais été aussi puissant, aussi étendu à travers toute la galaxie. C'est dans sa capitale, Trantor, que l'éminent savant Hari Seldon invente la psychohistoire, une science toute nouvelle, à base de psychologie et de mathématiques, qui lui permet de prédire l'avenir... C'est-à-dire l'effondrement de l'Empire d'ici cinq siècles et au-delà, trente mille années de chaos et de ténèbres. Pour empêcher cette catastrophe et sauver la civilisation, Seldon crée la Fondation.".to_string())); + + assert_eq!(result, result_expected); + } +} diff --git a/meilidb-http/src/helpers/mod.rs b/meilidb-http/src/helpers/mod.rs new file mode 100644 index 000000000..9c530b0a2 --- /dev/null +++ b/meilidb-http/src/helpers/mod.rs @@ -0,0 +1,2 @@ +pub mod meilidb; +pub mod tide; diff --git a/meilidb-http/src/helpers/tide.rs b/meilidb-http/src/helpers/tide.rs new file mode 100644 index 000000000..e68da7852 --- /dev/null +++ b/meilidb-http/src/helpers/tide.rs @@ -0,0 +1,118 @@ +use crate::error::{ResponseError, SResult}; +use crate::models::token::*; +use crate::Data; +use chrono::Utc; +use heed::types::{SerdeBincode, Str}; +use meilidb_core::Index; +use serde_json::Value; +use tide::Context; + +pub trait ContextExt { + fn is_allowed(&self, acl: ACL) -> SResult<()>; + fn header(&self, name: &str) -> Result; + fn url_param(&self, name: &str) -> Result; + fn index(&self) -> Result; + fn identifier(&self) -> Result; +} + +impl ContextExt for Context { + fn is_allowed(&self, acl: ACL) -> SResult<()> { + let admin_token = match &self.state().admin_token { + Some(admin_token) => admin_token, + None => return Ok(()), + }; + + let user_api_key = self.header("X-Meili-API-Key")?; + if user_api_key == *admin_token { + return Ok(()); + } + let request_index: Option = None; //self.param::("index").ok(); + + let db = &self.state().db; + let env = &db.env; + let reader = env.read_txn().map_err(ResponseError::internal)?; + + let token_key = format!("{}{}", TOKEN_PREFIX_KEY, user_api_key); + + let token_config = db + .common_store() + .get::>(&reader, &token_key) + .map_err(ResponseError::internal)? + .ok_or(ResponseError::not_found(format!( + "token key: {}", + token_key + )))?; + + if token_config.revoked { + return Err(ResponseError::invalid_token("token revoked")); + } + + if let Some(index) = request_index { + if !token_config + .indexes + .iter() + .any(|r| match_wildcard(&r, &index)) + { + return Err(ResponseError::invalid_token( + "token is not allowed to access to this index", + )); + } + } + + if token_config.expires_at < Utc::now() { + return Err(ResponseError::invalid_token("token expired")); + } + + if token_config.acl.contains(&ACL::All) { + return Ok(()); + } + + if !token_config.acl.contains(&acl) { + return Err(ResponseError::invalid_token("token do not have this ACL")); + } + + Ok(()) + } + + fn header(&self, name: &str) -> Result { + let header = self + .headers() + .get(name) + .ok_or(ResponseError::missing_header(name))? + .to_str() + .map_err(|_| ResponseError::missing_header("X-Meili-API-Key"))? + .to_string(); + Ok(header) + } + + fn url_param(&self, name: &str) -> Result { + let param = self + .param::(name) + .map_err(|e| ResponseError::bad_parameter(name, e))?; + Ok(param) + } + + fn index(&self) -> Result { + let index_name = self.url_param("index")?; + let index = self + .state() + .db + .open_index(&index_name) + .ok_or(ResponseError::index_not_found(index_name))?; + Ok(index) + } + + fn identifier(&self) -> Result { + let name = self + .param::("identifier") + .as_ref() + .map(meilidb_core::serde::value_to_string) + .map_err(|e| ResponseError::bad_parameter("identifier", e))? + .ok_or(ResponseError::bad_parameter( + "identifier", + "missing parameter", + ))?; + + Ok(name) + } +} diff --git a/meilidb-http/src/lib.rs b/meilidb-http/src/lib.rs new file mode 100644 index 000000000..94f395c43 --- /dev/null +++ b/meilidb-http/src/lib.rs @@ -0,0 +1,11 @@ +#[macro_use] +extern crate envconfig_derive; + +pub mod data; +pub mod error; +pub mod helpers; +pub mod models; +pub mod option; +pub mod routes; + +use self::data::Data; diff --git a/meilidb-http/src/main.rs b/meilidb-http/src/main.rs new file mode 100644 index 000000000..cbb262e6a --- /dev/null +++ b/meilidb-http/src/main.rs @@ -0,0 +1,36 @@ +use http::header::HeaderValue; +use log::info; +use main_error::MainError; +use tide::middleware::{CorsMiddleware, CorsOrigin}; +use tide_log::RequestLogger; + +use meilidb_http::data::Data; +use meilidb_http::option::Opt; +use meilidb_http::routes; + +#[cfg(not(target_os = "macos"))] +#[global_allocator] +static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc; + +pub fn main() -> Result<(), MainError> { + let opt = Opt::new(); + + let data = Data::new(opt.clone()); + let mut app = tide::App::with_state(data); + + app.middleware( + CorsMiddleware::new() + .allow_origin(CorsOrigin::from("*")) + .allow_methods(HeaderValue::from_static("GET, POST, OPTIONS")), + ); + app.middleware(RequestLogger::new()); + app.middleware(tide_compression::Compression::new()); + app.middleware(tide_compression::Decompression::new()); + + routes::load_routes(&mut app); + + info!("Server HTTP enabled"); + app.run(opt.http_addr)?; + + Ok(()) +} diff --git a/meilidb-http/src/models/mod.rs b/meilidb-http/src/models/mod.rs new file mode 100644 index 000000000..a6ce812e7 --- /dev/null +++ b/meilidb-http/src/models/mod.rs @@ -0,0 +1,3 @@ +pub mod schema; +pub mod token; +pub mod update_operation; diff --git a/meilidb-http/src/models/schema.rs b/meilidb-http/src/models/schema.rs new file mode 100644 index 000000000..ee9203aa2 --- /dev/null +++ b/meilidb-http/src/models/schema.rs @@ -0,0 +1,118 @@ +use std::collections::HashSet; + +use indexmap::IndexMap; +use meilidb_schema::{Schema, SchemaBuilder, SchemaProps}; +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Clone, Eq, PartialEq, Hash, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub enum FieldProperties { + Identifier, + Indexed, + Displayed, + Ranked, +} + +#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)] +pub struct SchemaBody(IndexMap>); + +impl From for SchemaBody { + fn from(value: Schema) -> SchemaBody { + let mut map = IndexMap::new(); + for (name, _attr, props) in value.iter() { + let old_properties = map.entry(name.to_owned()).or_insert(HashSet::new()); + if props.is_indexed() { + old_properties.insert(FieldProperties::Indexed); + } + if props.is_displayed() { + old_properties.insert(FieldProperties::Displayed); + } + if props.is_ranked() { + old_properties.insert(FieldProperties::Ranked); + } + } + let old_properties = map + .entry(value.identifier_name().to_string()) + .or_insert(HashSet::new()); + old_properties.insert(FieldProperties::Identifier); + old_properties.insert(FieldProperties::Displayed); + SchemaBody(map) + } +} + +impl Into for SchemaBody { + fn into(self) -> Schema { + let mut identifier = "documentId".to_string(); + let mut attributes = IndexMap::new(); + for (field, properties) in self.0 { + let mut indexed = false; + let mut displayed = false; + let mut ranked = false; + for property in properties { + match property { + FieldProperties::Indexed => indexed = true, + FieldProperties::Displayed => displayed = true, + FieldProperties::Ranked => ranked = true, + FieldProperties::Identifier => identifier = field.clone(), + } + } + attributes.insert( + field, + SchemaProps { + indexed, + displayed, + ranked, + }, + ); + } + + let mut builder = SchemaBuilder::with_identifier(identifier); + for (field, props) in attributes { + builder.new_attribute(field, props); + } + builder.build() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_schema_body_conversion() { + let schema_body = r#" + { + "id": ["identifier", "indexed", "displayed"], + "title": ["indexed", "displayed"], + "date": ["displayed"] + } + "#; + + let schema_builder = r#" + { + "identifier": "id", + "attributes": { + "id": { + "indexed": true, + "displayed": true + }, + "title": { + "indexed": true, + "displayed": true + }, + "date": { + "displayed": true + } + } + } + "#; + + let schema_body: SchemaBody = serde_json::from_str(schema_body).unwrap(); + let schema_builder: SchemaBuilder = serde_json::from_str(schema_builder).unwrap(); + + let schema_from_body: Schema = schema_body.into(); + let schema_from_builder: Schema = schema_builder.build(); + + assert_eq!(schema_from_body, schema_from_builder); + } +} diff --git a/meilidb-http/src/models/token.rs b/meilidb-http/src/models/token.rs new file mode 100644 index 000000000..b1d266735 --- /dev/null +++ b/meilidb-http/src/models/token.rs @@ -0,0 +1,72 @@ +use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize}; + +pub const TOKEN_PREFIX_KEY: &str = "_token_"; + +#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub enum ACL { + IndexesRead, + IndexesWrite, + DocumentsRead, + DocumentsWrite, + SettingsRead, + SettingsWrite, + Admin, + #[serde(rename = "*")] + All, +} + +pub type Wildcard = String; + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct Token { + pub key: String, + pub description: String, + pub acl: Vec, + pub indexes: Vec, + pub created_at: DateTime, + pub updated_at: DateTime, + pub expires_at: DateTime, + pub revoked: bool, +} + +fn cleanup_wildcard(input: &str) -> (bool, &str, bool) { + let first = input.chars().next().filter(|&c| c == '*').is_some(); + let last = input.chars().last().filter(|&c| c == '*').is_some(); + let bound_last = std::cmp::max(input.len().saturating_sub(last as usize), first as usize); + let output = input.get(first as usize..bound_last).unwrap(); + (first, output, last) +} + +pub fn match_wildcard(pattern: &str, input: &str) -> bool { + let (first, pattern, last) = cleanup_wildcard(pattern); + + match (first, last) { + (false, false) => pattern == input, + (true, false) => input.ends_with(pattern), + (false, true) => input.starts_with(pattern), + (true, true) => input.contains(pattern), + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_match_wildcard() { + assert!(match_wildcard("*", "qqq")); + assert!(match_wildcard("*", "")); + assert!(match_wildcard("*ab", "qqqab")); + assert!(match_wildcard("*ab*", "qqqabqq")); + assert!(match_wildcard("ab*", "abqqq")); + assert!(match_wildcard("**", "ab")); + assert!(match_wildcard("ab", "ab")); + assert!(match_wildcard("ab*", "ab")); + assert!(match_wildcard("*ab", "ab")); + assert!(match_wildcard("*ab*", "ab")); + assert!(match_wildcard("*😆*", "ab😆dsa")); + } +} diff --git a/meilidb-http/src/models/update_operation.rs b/meilidb-http/src/models/update_operation.rs new file mode 100644 index 000000000..84f99af7c --- /dev/null +++ b/meilidb-http/src/models/update_operation.rs @@ -0,0 +1,33 @@ +use std::fmt; + +#[allow(dead_code)] +#[derive(Debug)] +pub enum UpdateOperation { + ClearAllDocuments, + DocumentsAddition, + DocumentsDeletion, + SynonymsAddition, + SynonymsDeletion, + StopWordsAddition, + StopWordsDeletion, + Schema, + Config, +} + +impl fmt::Display for UpdateOperation { + fn fmt(&self, f: &mut fmt::Formatter) -> std::fmt::Result { + use UpdateOperation::*; + + match self { + ClearAllDocuments => write!(f, "ClearAllDocuments"), + DocumentsAddition => write!(f, "DocumentsAddition"), + DocumentsDeletion => write!(f, "DocumentsDeletion"), + SynonymsAddition => write!(f, "SynonymsAddition"), + SynonymsDeletion => write!(f, "SynonymsDelettion"), + StopWordsAddition => write!(f, "StopWordsAddition"), + StopWordsDeletion => write!(f, "StopWordsDeletion"), + Schema => write!(f, "Schema"), + Config => write!(f, "Config"), + } + } +} diff --git a/meilidb-http/src/option.rs b/meilidb-http/src/option.rs new file mode 100644 index 000000000..fee010733 --- /dev/null +++ b/meilidb-http/src/option.rs @@ -0,0 +1,56 @@ +use envconfig::Envconfig; +use structopt::StructOpt; + +#[derive(Debug, Clone, StructOpt, Envconfig)] +struct Vars { + /// The destination where the database must be created. + #[structopt(long)] + #[envconfig(from = "MEILI_DATABASE_PATH")] + pub database_path: Option, + + /// The addr on which the http server will listen. + #[structopt(long)] + #[envconfig(from = "MEILI_HTTP_ADDR")] + pub http_addr: Option, + + #[structopt(long)] + #[envconfig(from = "MEILI_ADMIN_TOKEN")] + pub admin_token: Option, +} + +#[derive(Clone, Debug)] +pub struct Opt { + pub database_path: String, + pub http_addr: String, + pub admin_token: Option, +} + +impl Default for Opt { + fn default() -> Self { + Opt { + database_path: String::from("/tmp/meilidb"), + http_addr: String::from("127.0.0.1:8080"), + admin_token: None, + } + } +} + +impl Opt { + pub fn new() -> Self { + let default = Self::default(); + let args = Vars::from_args(); + let env = Vars::init().unwrap(); + + Self { + database_path: env + .database_path + .or(args.database_path) + .unwrap_or(default.database_path), + http_addr: env + .http_addr + .or(args.http_addr) + .unwrap_or(default.http_addr), + admin_token: env.admin_token.or(args.admin_token).or(default.admin_token), + } + } +} diff --git a/meilidb-http/src/routes/document.rs b/meilidb-http/src/routes/document.rs new file mode 100644 index 000000000..a627b2195 --- /dev/null +++ b/meilidb-http/src/routes/document.rs @@ -0,0 +1,250 @@ +use std::collections::{BTreeSet, HashSet}; + +use http::StatusCode; +use indexmap::IndexMap; +use serde::{Deserialize, Serialize}; +use serde_json::Value; +use tide::querystring::ContextExt as QSContextExt; +use tide::response::IntoResponse; +use tide::{Context, Response}; + +use crate::error::{ResponseError, SResult}; +use crate::helpers::tide::ContextExt; +use crate::models::token::ACL::*; +use crate::Data; + +pub async fn get_document(ctx: Context) -> SResult { + ctx.is_allowed(DocumentsRead)?; + + let index = ctx.index()?; + + let identifier = ctx.identifier()?; + let document_id = meilidb_core::serde::compute_document_id(identifier.clone()); + + let env = &ctx.state().db.env; + let reader = env.read_txn().map_err(ResponseError::internal)?; + + let response = index + .document::>(&reader, None, document_id) + .map_err(ResponseError::internal)? + .ok_or(ResponseError::document_not_found(&identifier))?; + + if response.is_empty() { + return Err(ResponseError::document_not_found(identifier)); + } + + Ok(tide::response::json(response)) +} + +#[derive(Default, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct IndexUpdateResponse { + pub update_id: u64, +} + +pub async fn delete_document(ctx: Context) -> SResult { + ctx.is_allowed(DocumentsWrite)?; + + if !ctx.state().accept_updates() { + return Err(ResponseError::Maintenance); + } + + let index = ctx.index()?; + let identifier = ctx.identifier()?; + let document_id = meilidb_core::serde::compute_document_id(identifier.clone()); + + let env = &ctx.state().db.env; + let mut writer = env.write_txn().map_err(ResponseError::internal)?; + + let mut documents_deletion = index.documents_deletion(); + documents_deletion.delete_document_by_id(document_id); + let update_id = documents_deletion + .finalize(&mut writer) + .map_err(ResponseError::internal)?; + + writer.commit().map_err(ResponseError::internal)?; + + let response_body = IndexUpdateResponse { update_id }; + Ok(tide::response::json(response_body) + .with_status(StatusCode::ACCEPTED) + .into_response()) +} + +#[derive(Default, Deserialize)] +#[serde(rename_all = "camelCase", deny_unknown_fields)] +struct BrowseQuery { + offset: Option, + limit: Option, + attributes_to_retrieve: Option, +} + +pub async fn browse_documents(ctx: Context) -> SResult { + ctx.is_allowed(DocumentsRead)?; + + let index = ctx.index()?; + let query: BrowseQuery = ctx.url_query().unwrap_or(BrowseQuery::default()); + + let offset = query.offset.unwrap_or(0); + let limit = query.limit.unwrap_or(20); + + let env = &ctx.state().db.env; + let reader = env.read_txn().map_err(ResponseError::internal)?; + + let documents_ids: Result, _> = + match index.documents_fields_counts.documents_ids(&reader) { + Ok(documents_ids) => documents_ids.skip(offset).take(limit).collect(), + Err(e) => return Err(ResponseError::internal(e)), + }; + + let documents_ids = match documents_ids { + Ok(documents_ids) => documents_ids, + Err(e) => return Err(ResponseError::internal(e)), + }; + + let mut response_body = Vec::>::new(); + + if let Some(attributes) = query.attributes_to_retrieve { + let attributes = attributes.split(',').collect::>(); + for document_id in documents_ids { + if let Ok(Some(document)) = index.document(&reader, Some(&attributes), document_id) { + response_body.push(document); + } + } + } else { + for document_id in documents_ids { + if let Ok(Some(document)) = index.document(&reader, None, document_id) { + response_body.push(document); + } + } + } + + if response_body.is_empty() { + Ok(tide::response::json(response_body) + .with_status(StatusCode::NO_CONTENT) + .into_response()) + } else { + Ok(tide::response::json(response_body) + .with_status(StatusCode::OK) + .into_response()) + } +} + +fn infered_schema(document: &IndexMap) -> Option { + use meilidb_schema::{SchemaBuilder, DISPLAYED, INDEXED}; + + let mut identifier = None; + for key in document.keys() { + if identifier.is_none() && key.to_lowercase().contains("id") { + identifier = Some(key); + } + } + + match identifier { + Some(identifier) => { + let mut builder = SchemaBuilder::with_identifier(identifier); + for key in document.keys() { + builder.new_attribute(key, DISPLAYED | INDEXED); + } + Some(builder.build()) + } + None => None, + } +} + +pub async fn add_or_update_multiple_documents(mut ctx: Context) -> SResult { + ctx.is_allowed(DocumentsWrite)?; + + if !ctx.state().accept_updates() { + return Err(ResponseError::Maintenance); + } + let data: Vec> = + ctx.body_json().await.map_err(ResponseError::bad_request)?; + let index = ctx.index()?; + + let env = &ctx.state().db.env; + let mut writer = env.write_txn().map_err(ResponseError::internal)?; + + let current_schema = index + .main + .schema(&writer) + .map_err(ResponseError::internal)?; + if current_schema.is_none() { + match data.first().and_then(infered_schema) { + Some(schema) => { + index + .schema_update(&mut writer, schema) + .map_err(ResponseError::internal)?; + } + None => return Err(ResponseError::bad_request("Could not infer a schema")), + } + } + + let mut document_addition = index.documents_addition(); + + for document in data { + document_addition.update_document(document); + } + + let update_id = document_addition + .finalize(&mut writer) + .map_err(ResponseError::internal)?; + + writer.commit().map_err(ResponseError::internal)?; + + let response_body = IndexUpdateResponse { update_id }; + Ok(tide::response::json(response_body) + .with_status(StatusCode::ACCEPTED) + .into_response()) +} + +pub async fn delete_multiple_documents(mut ctx: Context) -> SResult { + ctx.is_allowed(DocumentsWrite)?; + if !ctx.state().accept_updates() { + return Err(ResponseError::Maintenance); + } + let data: Vec = ctx.body_json().await.map_err(ResponseError::bad_request)?; + let index = ctx.index()?; + + let env = &ctx.state().db.env; + let mut writer = env.write_txn().map_err(ResponseError::internal)?; + + let mut documents_deletion = index.documents_deletion(); + + for identifier in data { + if let Some(identifier) = meilidb_core::serde::value_to_string(&identifier) { + documents_deletion + .delete_document_by_id(meilidb_core::serde::compute_document_id(identifier)); + } + } + + let update_id = documents_deletion + .finalize(&mut writer) + .map_err(ResponseError::internal)?; + + writer.commit().map_err(ResponseError::internal)?; + + let response_body = IndexUpdateResponse { update_id }; + Ok(tide::response::json(response_body) + .with_status(StatusCode::ACCEPTED) + .into_response()) +} + +pub async fn clear_all_documents(ctx: Context) -> SResult { + ctx.is_allowed(DocumentsWrite)?; + if !ctx.state().accept_updates() { + return Err(ResponseError::Maintenance); + } + let index = ctx.index()?; + + let env = &ctx.state().db.env; + let mut writer = env.write_txn().map_err(ResponseError::internal)?; + let update_id = index + .clear_all(&mut writer) + .map_err(ResponseError::internal)?; + writer.commit().map_err(ResponseError::internal)?; + + let response_body = IndexUpdateResponse { update_id }; + Ok(tide::response::json(response_body) + .with_status(StatusCode::ACCEPTED) + .into_response()) +} diff --git a/meilidb-http/src/routes/health.rs b/meilidb-http/src/routes/health.rs new file mode 100644 index 000000000..d919487e6 --- /dev/null +++ b/meilidb-http/src/routes/health.rs @@ -0,0 +1,79 @@ +use crate::error::{ResponseError, SResult}; +use crate::helpers::tide::ContextExt; +use crate::models::token::ACL::*; +use crate::Data; + +use heed::types::{Str, Unit}; +use serde::Deserialize; +use tide::Context; + +const UNHEALTHY_KEY: &str = "_is_unhealthy"; + +pub async fn get_health(ctx: Context) -> SResult<()> { + let db = &ctx.state().db; + let env = &db.env; + let reader = env.read_txn().map_err(ResponseError::internal)?; + + let common_store = ctx.state().db.common_store(); + + if let Ok(Some(_)) = common_store.get::(&reader, UNHEALTHY_KEY) { + return Err(ResponseError::Maintenance); + } + + Ok(()) +} + +pub async fn set_healthy(ctx: Context) -> SResult<()> { + ctx.is_allowed(Admin)?; + + let db = &ctx.state().db; + let env = &db.env; + let mut writer = env.write_txn().map_err(ResponseError::internal)?; + + let common_store = ctx.state().db.common_store(); + match common_store.delete::(&mut writer, UNHEALTHY_KEY) { + Ok(_) => (), + Err(e) => return Err(ResponseError::internal(e)), + } + + if let Err(e) = writer.commit() { + return Err(ResponseError::internal(e)); + } + + Ok(()) +} + +pub async fn set_unhealthy(ctx: Context) -> SResult<()> { + ctx.is_allowed(Admin)?; + + let db = &ctx.state().db; + let env = &db.env; + let mut writer = env.write_txn().map_err(ResponseError::internal)?; + + let common_store = ctx.state().db.common_store(); + + if let Err(e) = common_store.put::(&mut writer, UNHEALTHY_KEY, &()) { + return Err(ResponseError::internal(e)); + } + + if let Err(e) = writer.commit() { + return Err(ResponseError::internal(e)); + } + + Ok(()) +} + +#[derive(Deserialize, Clone)] +struct HealtBody { + health: bool, +} + +pub async fn change_healthyness(mut ctx: Context) -> SResult<()> { + let body: HealtBody = ctx.body_json().await.map_err(ResponseError::bad_request)?; + + if body.health { + set_healthy(ctx).await + } else { + set_unhealthy(ctx).await + } +} diff --git a/meilidb-http/src/routes/index.rs b/meilidb-http/src/routes/index.rs new file mode 100644 index 000000000..85cd4b236 --- /dev/null +++ b/meilidb-http/src/routes/index.rs @@ -0,0 +1,210 @@ +use http::StatusCode; +use meilidb_core::{ProcessedUpdateResult, UpdateStatus}; +use meilidb_schema::Schema; +use serde_json::json; +use tide::response::IntoResponse; +use tide::{Context, Response}; + +use crate::error::{ResponseError, SResult}; +use crate::helpers::tide::ContextExt; +use crate::models::schema::SchemaBody; +use crate::models::token::ACL::*; +use crate::routes::document::IndexUpdateResponse; +use crate::Data; + +pub async fn list_indexes(ctx: Context) -> SResult { + ctx.is_allowed(IndexesRead)?; + let list = ctx + .state() + .db + .indexes_names() + .map_err(ResponseError::internal)?; + Ok(tide::response::json(list)) +} + +pub async fn get_index_schema(ctx: Context) -> SResult { + ctx.is_allowed(IndexesRead)?; + + let index = ctx.index()?; + + let env = &ctx.state().db.env; + let reader = env.read_txn().map_err(ResponseError::internal)?; + + let schema = index + .main + .schema(&reader) + .map_err(ResponseError::create_index)?; + + match schema { + Some(schema) => { + let schema = SchemaBody::from(schema); + Ok(tide::response::json(schema)) + } + None => Ok( + tide::response::json(json!({ "message": "missing index schema" })) + .with_status(StatusCode::NOT_FOUND) + .into_response(), + ), + } +} + +pub async fn create_index(mut ctx: Context) -> SResult { + ctx.is_allowed(IndexesWrite)?; + + let index_name = ctx.url_param("index")?; + + let body = ctx.body_bytes().await.map_err(ResponseError::bad_request)?; + let schema: Option = if body.is_empty() { + None + } else { + serde_json::from_slice::(&body) + .map_err(ResponseError::bad_request) + .map(|s| Some(s.into()))? + }; + + let db = &ctx.state().db; + + let created_index = match db.create_index(&index_name) { + Ok(index) => index, + Err(meilidb_core::Error::IndexAlreadyExists) => db.open_index(&index_name).ok_or( + ResponseError::internal("index not found but must have been found"), + )?, + Err(e) => return Err(ResponseError::create_index(e)), + }; + + let callback_context = ctx.state().clone(); + let callback_name = index_name.clone(); + db.set_update_callback( + &index_name, + Box::new(move |status| { + index_update_callback(&callback_name, &callback_context, status); + }), + ); + + let env = &db.env; + let mut writer = env.write_txn().map_err(ResponseError::internal)?; + + match schema { + Some(schema) => { + let update_id = created_index + .schema_update(&mut writer, schema.clone()) + .map_err(ResponseError::internal)?; + + writer.commit().map_err(ResponseError::internal)?; + + let response_body = IndexUpdateResponse { update_id }; + Ok(tide::response::json(response_body) + .with_status(StatusCode::CREATED) + .into_response()) + } + None => Ok(Response::new(tide::Body::empty()) + .with_status(StatusCode::NO_CONTENT) + .into_response()), + } +} + +pub async fn update_schema(mut ctx: Context) -> SResult { + ctx.is_allowed(IndexesWrite)?; + + let index_name = ctx.url_param("index")?; + + let schema = ctx + .body_json::() + .await + .map_err(ResponseError::bad_request)?; + + let db = &ctx.state().db; + let env = &db.env; + let mut writer = env.write_txn().map_err(ResponseError::internal)?; + + let index = db + .open_index(&index_name) + .ok_or(ResponseError::index_not_found(index_name))?; + + let schema: meilidb_schema::Schema = schema.into(); + let update_id = index + .schema_update(&mut writer, schema.clone()) + .map_err(ResponseError::internal)?; + + writer.commit().map_err(ResponseError::internal)?; + + let response_body = IndexUpdateResponse { update_id }; + Ok(tide::response::json(response_body) + .with_status(StatusCode::ACCEPTED) + .into_response()) +} + +pub async fn get_update_status(ctx: Context) -> SResult { + ctx.is_allowed(IndexesRead)?; + + let env = &ctx.state().db.env; + let reader = env.read_txn().map_err(ResponseError::internal)?; + + let update_id = ctx + .param::("update_id") + .map_err(|e| ResponseError::bad_parameter("update_id", e))?; + + let index = ctx.index()?; + let status = index + .update_status(&reader, update_id) + .map_err(ResponseError::internal)?; + + let response = match status { + UpdateStatus::Enqueued(data) => { + tide::response::json(json!({ "status": "enqueued", "data": data })) + .with_status(StatusCode::OK) + .into_response() + } + UpdateStatus::Processed(data) => { + tide::response::json(json!({ "status": "processed", "data": data })) + .with_status(StatusCode::OK) + .into_response() + } + UpdateStatus::Unknown => tide::response::json(json!({ "message": "unknown update id" })) + .with_status(StatusCode::NOT_FOUND) + .into_response(), + }; + + Ok(response) +} + +pub async fn get_all_updates_status(ctx: Context) -> SResult { + ctx.is_allowed(IndexesRead)?; + + let env = &ctx.state().db.env; + let reader = env.read_txn().map_err(ResponseError::internal)?; + + let index = ctx.index()?; + let all_status = index + .all_updates_status(&reader) + .map_err(ResponseError::internal)?; + + let response = tide::response::json(all_status) + .with_status(StatusCode::OK) + .into_response(); + + Ok(response) +} + +pub async fn delete_index(ctx: Context) -> SResult { + ctx.is_allowed(IndexesWrite)?; + let _index_name = ctx.url_param("index")?; + let _index = ctx.index()?; + + // ctx.state() + // .db + // .delete_index(&index_name) + // .map_err(ResponseError::internal)?; + + Ok(StatusCode::NOT_IMPLEMENTED) +} + +pub fn index_update_callback(index_name: &str, data: &Data, _status: ProcessedUpdateResult) { + let env = &data.db.env; + let mut writer = env.write_txn().unwrap(); + + data.compute_stats(&mut writer, &index_name).unwrap(); + data.set_last_update(&mut writer, &index_name).unwrap(); + + writer.commit().unwrap(); +} diff --git a/meilidb-http/src/routes/key.rs b/meilidb-http/src/routes/key.rs new file mode 100644 index 000000000..cdefc39ba --- /dev/null +++ b/meilidb-http/src/routes/key.rs @@ -0,0 +1,189 @@ +use chrono::serde::ts_seconds; +use chrono::{DateTime, Utc}; +use heed::types::{SerdeBincode, Str}; +use http::StatusCode; +use rand::seq::SliceRandom; +use serde::{Deserialize, Serialize}; +use tide::response::IntoResponse; +use tide::{Context, Response}; + +use crate::error::{ResponseError, SResult}; +use crate::helpers::tide::ContextExt; +use crate::models::token::ACL::*; +use crate::models::token::*; +use crate::Data; + +fn generate_api_key() -> String { + let mut rng = rand::thread_rng(); + let sample = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"; + sample + .choose_multiple(&mut rng, 40) + .map(|c| *c as char) + .collect() +} + +pub async fn list(ctx: Context) -> SResult { + ctx.is_allowed(Admin)?; + + let db = &ctx.state().db; + let env = &db.env; + let reader = env.read_txn().map_err(ResponseError::internal)?; + + let common_store = db.common_store(); + + let mut response: Vec = Vec::new(); + + let iter = common_store + .prefix_iter::>(&reader, TOKEN_PREFIX_KEY) + .map_err(ResponseError::internal)?; + + for result in iter { + let (_, token) = result.map_err(ResponseError::internal)?; + response.push(token); + } + + Ok(tide::response::json(response)) +} + +pub async fn get(ctx: Context) -> SResult { + ctx.is_allowed(Admin)?; + let request_key = ctx.url_param("key")?; + + let db = &ctx.state().db; + let env = &db.env; + let reader = env.read_txn().map_err(ResponseError::internal)?; + + let token_key = format!("{}{}", TOKEN_PREFIX_KEY, request_key); + + let token_config = db + .common_store() + .get::>(&reader, &token_key) + .map_err(ResponseError::internal)? + .ok_or(ResponseError::not_found(format!( + "token key: {}", + token_key + )))?; + + Ok(tide::response::json(token_config)) +} + +#[derive(Debug, Serialize, Deserialize)] +#[serde(rename_all = "camelCase", deny_unknown_fields)] +pub struct CreatedRequest { + description: String, + acl: Vec, + indexes: Vec, + #[serde(with = "ts_seconds")] + expires_at: DateTime, +} + +pub async fn create(mut ctx: Context) -> SResult { + ctx.is_allowed(Admin)?; + + let data: CreatedRequest = ctx.body_json().await.map_err(ResponseError::bad_request)?; + + let key = generate_api_key(); + let token_key = format!("{}{}", TOKEN_PREFIX_KEY, key); + + let token_definition = Token { + key, + description: data.description, + acl: data.acl, + indexes: data.indexes, + expires_at: data.expires_at, + created_at: Utc::now(), + updated_at: Utc::now(), + revoked: false, + }; + + let db = &ctx.state().db; + let env = &db.env; + let mut writer = env.write_txn().map_err(ResponseError::internal)?; + + db.common_store() + .put::>(&mut writer, &token_key, &token_definition) + .map_err(ResponseError::internal)?; + + writer.commit().map_err(ResponseError::internal)?; + + Ok(tide::response::json(token_definition) + .with_status(StatusCode::CREATED) + .into_response()) +} + +#[derive(Debug, Serialize, Deserialize)] +#[serde(rename_all = "camelCase", deny_unknown_fields)] +pub struct UpdatedRequest { + description: Option, + acl: Option>, + indexes: Option>, +} + +pub async fn update(mut ctx: Context) -> SResult { + ctx.is_allowed(Admin)?; + let request_key = ctx.url_param("key")?; + + let data: UpdatedRequest = ctx.body_json().await.map_err(ResponseError::bad_request)?; + + let db = &ctx.state().db; + let env = &db.env; + let mut writer = env.write_txn().map_err(ResponseError::internal)?; + + let common_store = db.common_store(); + + let token_key = format!("{}{}", TOKEN_PREFIX_KEY, request_key); + + let mut token_config = common_store + .get::>(&writer, &token_key) + .map_err(ResponseError::internal)? + .ok_or(ResponseError::not_found(format!( + "token key: {}", + token_key + )))?; + + // apply the modifications + if let Some(description) = data.description { + token_config.description = description; + } + + if let Some(acl) = data.acl { + token_config.acl = acl; + } + + if let Some(indexes) = data.indexes { + token_config.indexes = indexes; + } + + token_config.updated_at = Utc::now(); + + common_store + .put::>(&mut writer, &token_key, &token_config) + .map_err(ResponseError::internal)?; + + writer.commit().map_err(ResponseError::internal)?; + + Ok(tide::response::json(token_config) + .with_status(StatusCode::ACCEPTED) + .into_response()) +} + +pub async fn delete(ctx: Context) -> SResult { + ctx.is_allowed(Admin)?; + let request_key = ctx.url_param("key")?; + + let db = &ctx.state().db; + let env = &db.env; + let mut writer = env.write_txn().map_err(ResponseError::internal)?; + + let common_store = db.common_store(); + + let token_key = format!("{}{}", TOKEN_PREFIX_KEY, request_key); + + common_store + .delete::(&mut writer, &token_key) + .map_err(ResponseError::internal)?; + + writer.commit().map_err(ResponseError::internal)?; + + Ok(StatusCode::ACCEPTED) +} diff --git a/meilidb-http/src/routes/mod.rs b/meilidb-http/src/routes/mod.rs new file mode 100644 index 000000000..7bb56ab21 --- /dev/null +++ b/meilidb-http/src/routes/mod.rs @@ -0,0 +1,111 @@ +use crate::data::Data; + +pub mod document; +pub mod health; +pub mod index; +pub mod key; +pub mod search; +pub mod setting; +pub mod stats; +pub mod stop_words; +pub mod synonym; + +pub fn load_routes(app: &mut tide::App) { + app.at("").nest(|router| { + router.at("/indexes").nest(|router| { + router.at("/").get(index::list_indexes); + + router.at("/search").post(search::search_multi_index); + + router.at("/:index").nest(|router| { + router.at("/search").get(search::search_with_url_query); + + router.at("/updates").nest(|router| { + router.at("/").get(index::get_all_updates_status); + + router.at("/:update_id").get(index::get_update_status); + }); + + router + .at("/") + .get(index::get_index_schema) + .post(index::create_index) + .put(index::update_schema) + .delete(index::delete_index); + + router.at("/documents").nest(|router| { + router + .at("/") + .get(document::browse_documents) + .post(document::add_or_update_multiple_documents) + .delete(document::clear_all_documents); + + router.at("/:identifier").nest(|router| { + router + .at("/") + .get(document::get_document) + .delete(document::delete_document); + }); + + router + .at("/delete") + .post(document::delete_multiple_documents); + }); + + router.at("/synonym").nest(|router| { + router.at("/").get(synonym::list).post(synonym::create); + + router + .at("/:synonym") + .get(synonym::get) + .put(synonym::update) + .delete(synonym::delete); + + router.at("/batch").post(synonym::batch_write); + router.at("/clear").post(synonym::clear); + }); + + router.at("/stop-words").nest(|router| { + router + .at("/") + .get(stop_words::list) + .put(stop_words::add) + .delete(stop_words::delete); + }); + + router + .at("/settings") + .get(setting::get) + .post(setting::update); + }); + }); + + router.at("/keys").nest(|router| { + router.at("/").get(key::list).post(key::create); + + router + .at("/:key") + .get(key::get) + .put(key::update) + .delete(key::delete); + }); + }); + + // Private + app.at("").nest(|router| { + router + .at("/health") + .get(health::get_health) + .post(health::set_healthy) + .put(health::change_healthyness) + .delete(health::set_unhealthy); + + router.at("/stats").get(stats::get_stats); + router.at("/stats/:index").get(stats::index_stat); + router.at("/version").get(stats::get_version); + router.at("/sys-info").get(stats::get_sys_info); + router + .at("/sys-info/pretty") + .get(stats::get_sys_info_pretty); + }); +} diff --git a/meilidb-http/src/routes/search.rs b/meilidb-http/src/routes/search.rs new file mode 100644 index 000000000..b8981d796 --- /dev/null +++ b/meilidb-http/src/routes/search.rs @@ -0,0 +1,231 @@ +use std::collections::HashMap; +use std::collections::HashSet; +use std::time::Duration; + +use meilidb_core::Index; +use rayon::iter::{IntoParallelIterator, ParallelIterator}; +use serde::{Deserialize, Serialize}; +use tide::querystring::ContextExt as QSContextExt; +use tide::{Context, Response}; + +use crate::error::{ResponseError, SResult}; +use crate::helpers::meilidb::{Error, IndexSearchExt, SearchHit}; +use crate::helpers::tide::ContextExt; +use crate::Data; + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase", deny_unknown_fields)] +struct SearchQuery { + q: String, + offset: Option, + limit: Option, + attributes_to_retrieve: Option, + attributes_to_search_in: Option, + attributes_to_crop: Option, + crop_length: Option, + attributes_to_highlight: Option, + filters: Option, + timeout_ms: Option, + matches: Option, +} + +pub async fn search_with_url_query(ctx: Context) -> SResult { + // ctx.is_allowed(DocumentsRead)?; + + let index = ctx.index()?; + let env = &ctx.state().db.env; + let reader = env.read_txn().map_err(ResponseError::internal)?; + + let query: SearchQuery = ctx + .url_query() + .map_err(|_| ResponseError::bad_request("invalid query parameter"))?; + + let mut search_builder = index.new_search(query.q.clone()); + + if let Some(offset) = query.offset { + search_builder.offset(offset); + } + if let Some(limit) = query.limit { + search_builder.limit(limit); + } + + if let Some(attributes_to_retrieve) = query.attributes_to_retrieve { + for attr in attributes_to_retrieve.split(',') { + search_builder.add_retrievable_field(attr.to_string()); + } + } + if let Some(attributes_to_search_in) = query.attributes_to_search_in { + for attr in attributes_to_search_in.split(',') { + search_builder.add_retrievable_field(attr.to_string()); + } + } + if let Some(attributes_to_crop) = query.attributes_to_crop { + let crop_length = query.crop_length.unwrap_or(200); + let attributes_to_crop = attributes_to_crop + .split(',') + .map(|r| (r.to_string(), crop_length)) + .collect(); + search_builder.attributes_to_crop(attributes_to_crop); + } + + if let Some(attributes_to_highlight) = query.attributes_to_highlight { + let attributes_to_highlight = attributes_to_highlight + .split(',') + .map(ToString::to_string) + .collect(); + search_builder.attributes_to_highlight(attributes_to_highlight); + } + + if let Some(filters) = query.filters { + search_builder.filters(filters); + } + + if let Some(timeout_ms) = query.timeout_ms { + search_builder.timeout(Duration::from_millis(timeout_ms)); + } + + if let Some(matches) = query.matches { + if matches { + search_builder.get_matches(); + } + } + + let response = match search_builder.search(&reader) { + Ok(response) => response, + Err(Error::Internal(message)) => return Err(ResponseError::Internal(message)), + Err(others) => return Err(ResponseError::bad_request(others)), + }; + + Ok(tide::response::json(response)) +} + +#[derive(Clone, Deserialize)] +#[serde(rename_all = "camelCase", deny_unknown_fields)] +struct SearchMultiBody { + indexes: HashSet, + query: String, + offset: Option, + limit: Option, + attributes_to_retrieve: Option>, + attributes_to_search_in: Option>, + attributes_to_crop: Option>, + attributes_to_highlight: Option>, + filters: Option, + timeout_ms: Option, + matches: Option, +} + +#[derive(Debug, Clone, Serialize)] +#[serde(rename_all = "camelCase")] +struct SearchMultiBodyResponse { + hits: HashMap>, + offset: usize, + hits_per_page: usize, + processing_time_ms: usize, + query: String, +} + +pub async fn search_multi_index(mut ctx: Context) -> SResult { + // ctx.is_allowed(DocumentsRead)?; + let body = ctx + .body_json::() + .await + .map_err(ResponseError::bad_request)?; + + let mut index_list = body.clone().indexes; + + for index in index_list.clone() { + if index == "*" { + index_list = ctx + .state() + .db + .indexes_names() + .map_err(ResponseError::internal)? + .into_iter() + .collect(); + } + } + + let mut offset = 0; + let mut count = 20; + + if let Some(body_offset) = body.offset { + if let Some(limit) = body.limit { + offset = body_offset; + count = limit; + } + } + + let offset = offset; + let count = count; + let db = &ctx.state().db; + let par_body = body.clone(); + let responses_per_index: Vec> = index_list + .into_par_iter() + .map(move |index_name| { + let index: Index = db + .open_index(&index_name) + .ok_or(ResponseError::index_not_found(&index_name))?; + + let mut search_builder = index.new_search(par_body.query.clone()); + + search_builder.offset(offset); + search_builder.limit(count); + + if let Some(attributes_to_retrieve) = par_body.attributes_to_retrieve.clone() { + search_builder.attributes_to_retrieve(attributes_to_retrieve); + } + if let Some(attributes_to_search_in) = par_body.attributes_to_search_in.clone() { + search_builder.attributes_to_search_in(attributes_to_search_in); + } + if let Some(attributes_to_crop) = par_body.attributes_to_crop.clone() { + search_builder.attributes_to_crop(attributes_to_crop); + } + if let Some(attributes_to_highlight) = par_body.attributes_to_highlight.clone() { + search_builder.attributes_to_highlight(attributes_to_highlight); + } + if let Some(filters) = par_body.filters.clone() { + search_builder.filters(filters); + } + if let Some(timeout_ms) = par_body.timeout_ms { + search_builder.timeout(Duration::from_secs(timeout_ms)); + } + if let Some(matches) = par_body.matches { + if matches { + search_builder.get_matches(); + } + } + + let env = &db.env; + let reader = env.read_txn().map_err(ResponseError::internal)?; + + let response = search_builder + .search(&reader) + .map_err(ResponseError::internal)?; + Ok((index_name, response)) + }) + .collect(); + + let mut hits_map = HashMap::new(); + + let mut max_query_time = 0; + + for response in responses_per_index { + if let Ok((index_name, response)) = response { + if response.processing_time_ms > max_query_time { + max_query_time = response.processing_time_ms; + } + hits_map.insert(index_name, response.hits); + } + } + + let response = SearchMultiBodyResponse { + hits: hits_map, + offset, + hits_per_page: count, + processing_time_ms: max_query_time, + query: body.query, + }; + + Ok(tide::response::json(response)) +} diff --git a/meilidb-http/src/routes/setting.rs b/meilidb-http/src/routes/setting.rs new file mode 100644 index 000000000..9c3a05f46 --- /dev/null +++ b/meilidb-http/src/routes/setting.rs @@ -0,0 +1,93 @@ +use std::collections::{HashMap, HashSet}; + +use http::StatusCode; +use serde::{Deserialize, Serialize}; +use tide::response::IntoResponse; +use tide::{Context, Response}; + +use crate::error::{ResponseError, SResult}; +use crate::helpers::tide::ContextExt; +use crate::models::token::ACL::*; +use crate::routes::document::IndexUpdateResponse; +use crate::Data; + +#[derive(Default, Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase", deny_unknown_fields)] +pub struct SettingBody { + pub stop_words: Option, + pub ranking_order: Option, + pub distinct_field: Option, + pub ranking_rules: Option, +} + +#[derive(Debug, Copy, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "lowercase")] +pub enum RankingOrdering { + Asc, + Dsc, +} + +pub type StopWords = HashSet; +pub type RankingOrder = Vec; +pub type DistinctField = String; +pub type RankingRules = HashMap; + +pub async fn get(ctx: Context) -> SResult { + ctx.is_allowed(SettingsRead)?; + let index = ctx.index()?; + + let env = &ctx.state().db.env; + let reader = env.read_txn().map_err(ResponseError::internal)?; + + let settings = match index.main.customs(&reader).unwrap() { + Some(bytes) => bincode::deserialize(bytes).unwrap(), + None => SettingBody::default(), + }; + + Ok(tide::response::json(settings)) +} + +pub async fn update(mut ctx: Context) -> SResult { + ctx.is_allowed(SettingsWrite)?; + + let settings: SettingBody = ctx.body_json().await.map_err(ResponseError::bad_request)?; + + let index = ctx.index()?; + + let env = &ctx.state().db.env; + let mut writer = env.write_txn().map_err(ResponseError::internal)?; + + let mut current_settings = match index.main.customs(&writer).unwrap() { + Some(bytes) => bincode::deserialize(bytes).unwrap(), + None => SettingBody::default(), + }; + + if let Some(stop_words) = settings.stop_words { + current_settings.stop_words = Some(stop_words); + } + + if let Some(ranking_order) = settings.ranking_order { + current_settings.ranking_order = Some(ranking_order); + } + + if let Some(distinct_field) = settings.distinct_field { + current_settings.distinct_field = Some(distinct_field); + } + + if let Some(ranking_rules) = settings.ranking_rules { + current_settings.ranking_rules = Some(ranking_rules); + } + + let bytes = bincode::serialize(¤t_settings).unwrap(); + + let update_id = index + .customs_update(&mut writer, bytes) + .map_err(ResponseError::internal)?; + + writer.commit().map_err(ResponseError::internal)?; + + let response_body = IndexUpdateResponse { update_id }; + Ok(tide::response::json(response_body) + .with_status(StatusCode::ACCEPTED) + .into_response()) +} diff --git a/meilidb-http/src/routes/stats.rs b/meilidb-http/src/routes/stats.rs new file mode 100644 index 000000000..bdc82b138 --- /dev/null +++ b/meilidb-http/src/routes/stats.rs @@ -0,0 +1,334 @@ +use std::collections::HashMap; + +use chrono::{DateTime, Utc}; +use pretty_bytes::converter::convert; +use serde::Serialize; +use sysinfo::{NetworkExt, Pid, ProcessExt, ProcessorExt, System, SystemExt}; +use tide::{Context, Response}; +use walkdir::WalkDir; + +use crate::error::{ResponseError, SResult}; +use crate::helpers::tide::ContextExt; +use crate::models::token::ACL::*; +use crate::Data; + +#[derive(Serialize)] +#[serde(rename_all = "camelCase")] +struct IndexStatsResponse { + number_of_documents: u64, + is_indexing: bool, + last_update: Option>, + fields_frequency: HashMap, +} + +pub async fn index_stat(ctx: Context) -> SResult { + ctx.is_allowed(Admin)?; + let index_name = ctx.url_param("index")?; + let index = ctx.index()?; + + let env = &ctx.state().db.env; + let reader = env.read_txn().map_err(ResponseError::internal)?; + + let number_of_documents = index + .main + .number_of_documents(&reader) + .map_err(ResponseError::internal)?; + + let fields_frequency = ctx + .state() + .fields_frequency(&reader, &index_name) + .map_err(ResponseError::internal)? + .unwrap_or_default(); + + let is_indexing = ctx + .state() + .is_indexing(&reader, &index_name) + .map_err(ResponseError::internal)? + .ok_or(ResponseError::not_found("Index not found"))?; + + let last_update = ctx + .state() + .last_update(&reader, &index_name) + .map_err(ResponseError::internal)?; + + let response = IndexStatsResponse { + number_of_documents, + is_indexing, + last_update, + fields_frequency, + }; + Ok(tide::response::json(response)) +} + +#[derive(Serialize)] +#[serde(rename_all = "camelCase")] +struct StatsResult { + database_size: u64, + indexes: HashMap, +} + +pub async fn get_stats(ctx: Context) -> SResult { + ctx.is_allowed(Admin)?; + let mut index_list = HashMap::new(); + + if let Ok(indexes_set) = ctx.state().db.indexes_names() { + for index_name in indexes_set { + let db = &ctx.state().db; + let env = &db.env; + + let index = db.open_index(&index_name).unwrap(); + let reader = env.read_txn().map_err(ResponseError::internal)?; + + let number_of_documents = index + .main + .number_of_documents(&reader) + .map_err(ResponseError::internal)?; + + let fields_frequency = ctx + .state() + .fields_frequency(&reader, &index_name) + .map_err(ResponseError::internal)? + .unwrap_or_default(); + + let is_indexing = ctx + .state() + .is_indexing(&reader, &index_name) + .map_err(ResponseError::internal)? + .ok_or(ResponseError::not_found("Index not found"))?; + + let last_update = ctx + .state() + .last_update(&reader, &index_name) + .map_err(ResponseError::internal)?; + + let response = IndexStatsResponse { + number_of_documents, + is_indexing, + last_update, + fields_frequency, + }; + index_list.insert(index_name, response); + } + } + + let database_size = WalkDir::new(ctx.state().db_path.clone()) + .into_iter() + .filter_map(|entry| entry.ok()) + .filter_map(|entry| entry.metadata().ok()) + .filter(|metadata| metadata.is_file()) + .fold(0, |acc, m| acc + m.len()); + + let response = StatsResult { + database_size, + indexes: index_list, + }; + + Ok(tide::response::json(response)) +} + +#[derive(Serialize)] +#[serde(rename_all = "camelCase")] +struct VersionResponse { + commit_sha: String, + build_date: String, + pkg_version: String, +} + +pub async fn get_version(ctx: Context) -> SResult { + ctx.is_allowed(Admin)?; + let response = VersionResponse { + commit_sha: env!("VERGEN_SHA").to_string(), + build_date: env!("VERGEN_BUILD_TIMESTAMP").to_string(), + pkg_version: env!("CARGO_PKG_VERSION").to_string(), + }; + + Ok(tide::response::json(response)) +} + +#[derive(Serialize)] +#[serde(rename_all = "camelCase")] +pub(crate) struct SysGlobal { + total_memory: u64, + used_memory: u64, + total_swap: u64, + used_swap: u64, + input_data: u64, + output_data: u64, +} + +impl SysGlobal { + fn new() -> SysGlobal { + SysGlobal { + total_memory: 0, + used_memory: 0, + total_swap: 0, + used_swap: 0, + input_data: 0, + output_data: 0, + } + } +} + +#[derive(Serialize)] +#[serde(rename_all = "camelCase")] +pub(crate) struct SysProcess { + memory: u64, + cpu: f32, +} + +impl SysProcess { + fn new() -> SysProcess { + SysProcess { + memory: 0, + cpu: 0.0, + } + } +} + +#[derive(Serialize)] +#[serde(rename_all = "camelCase")] +pub(crate) struct SysInfo { + memory_usage: f64, + processor_usage: Vec, + global: SysGlobal, + process: SysProcess, +} + +impl SysInfo { + fn new() -> SysInfo { + SysInfo { + memory_usage: 0.0, + processor_usage: Vec::new(), + global: SysGlobal::new(), + process: SysProcess::new(), + } + } +} + +pub(crate) fn report(pid: Pid) -> SysInfo { + let mut sys = System::new(); + let mut info = SysInfo::new(); + + info.memory_usage = sys.get_used_memory() as f64 / sys.get_total_memory() as f64 * 100.0; + + for processor in sys.get_processor_list() { + info.processor_usage.push(processor.get_cpu_usage() * 100.0); + } + + info.global.total_memory = sys.get_total_memory(); + info.global.used_memory = sys.get_used_memory(); + info.global.total_swap = sys.get_total_swap(); + info.global.used_swap = sys.get_used_swap(); + info.global.input_data = sys.get_network().get_income(); + info.global.output_data = sys.get_network().get_outcome(); + + if let Some(process) = sys.get_process(pid) { + info.process.memory = process.memory(); + info.process.cpu = process.cpu_usage() * 100.0; + } + + sys.refresh_all(); + + info +} + +pub async fn get_sys_info(ctx: Context) -> SResult { + ctx.is_allowed(Admin)?; + Ok(tide::response::json(report(ctx.state().server_pid))) +} + +#[derive(Serialize)] +#[serde(rename_all = "camelCase")] +pub(crate) struct SysGlobalPretty { + total_memory: String, + used_memory: String, + total_swap: String, + used_swap: String, + input_data: String, + output_data: String, +} + +impl SysGlobalPretty { + fn new() -> SysGlobalPretty { + SysGlobalPretty { + total_memory: "None".to_owned(), + used_memory: "None".to_owned(), + total_swap: "None".to_owned(), + used_swap: "None".to_owned(), + input_data: "None".to_owned(), + output_data: "None".to_owned(), + } + } +} + +#[derive(Serialize)] +#[serde(rename_all = "camelCase")] +pub(crate) struct SysProcessPretty { + memory: String, + cpu: String, +} + +impl SysProcessPretty { + fn new() -> SysProcessPretty { + SysProcessPretty { + memory: "None".to_owned(), + cpu: "None".to_owned(), + } + } +} + +#[derive(Serialize)] +#[serde(rename_all = "camelCase")] +pub(crate) struct SysInfoPretty { + memory_usage: String, + processor_usage: Vec, + global: SysGlobalPretty, + process: SysProcessPretty, +} + +impl SysInfoPretty { + fn new() -> SysInfoPretty { + SysInfoPretty { + memory_usage: "None".to_owned(), + processor_usage: Vec::new(), + global: SysGlobalPretty::new(), + process: SysProcessPretty::new(), + } + } +} + +pub(crate) fn report_pretty(pid: Pid) -> SysInfoPretty { + let mut sys = System::new(); + let mut info = SysInfoPretty::new(); + + info.memory_usage = format!( + "{:.1} %", + sys.get_used_memory() as f64 / sys.get_total_memory() as f64 * 100.0 + ); + + for processor in sys.get_processor_list() { + info.processor_usage + .push(format!("{:.1} %", processor.get_cpu_usage() * 100.0)); + } + + info.global.total_memory = convert(sys.get_total_memory() as f64 * 1024.0); + info.global.used_memory = convert(sys.get_used_memory() as f64 * 1024.0); + info.global.total_swap = convert(sys.get_total_swap() as f64 * 1024.0); + info.global.used_swap = convert(sys.get_used_swap() as f64 * 1024.0); + info.global.input_data = convert(sys.get_network().get_income() as f64); + info.global.output_data = convert(sys.get_network().get_outcome() as f64); + + if let Some(process) = sys.get_process(pid) { + info.process.memory = convert(process.memory() as f64 * 1024.0); + info.process.cpu = format!("{:.1} %", process.cpu_usage() * 100.0); + } + + sys.refresh_all(); + + info +} + +pub async fn get_sys_info_pretty(ctx: Context) -> SResult { + ctx.is_allowed(Admin)?; + Ok(tide::response::json(report_pretty(ctx.state().server_pid))) +} diff --git a/meilidb-http/src/routes/stop_words.rs b/meilidb-http/src/routes/stop_words.rs new file mode 100644 index 000000000..4628ad8fe --- /dev/null +++ b/meilidb-http/src/routes/stop_words.rs @@ -0,0 +1,82 @@ +use http::StatusCode; +use tide::response::IntoResponse; +use tide::{Context, Response}; + +use crate::error::{ResponseError, SResult}; +use crate::helpers::tide::ContextExt; +use crate::models::token::ACL::*; +use crate::routes::document::IndexUpdateResponse; +use crate::Data; + +pub async fn list(ctx: Context) -> SResult { + ctx.is_allowed(SettingsRead)?; + let index = ctx.index()?; + + let env = &ctx.state().db.env; + let reader = env.read_txn().map_err(ResponseError::internal)?; + + let stop_words_fst = index + .main + .stop_words_fst(&reader) + .map_err(ResponseError::internal)?; + + let stop_words = stop_words_fst + .unwrap_or_default() + .stream() + .into_strs() + .map_err(ResponseError::internal)?; + + Ok(tide::response::json(stop_words)) +} + +pub async fn add(mut ctx: Context) -> SResult { + ctx.is_allowed(SettingsRead)?; + let index = ctx.index()?; + + let data: Vec = ctx.body_json().await.map_err(ResponseError::bad_request)?; + + let env = &ctx.state().db.env; + let mut writer = env.write_txn().map_err(ResponseError::internal)?; + + let mut stop_words_addition = index.stop_words_addition(); + for stop_word in data { + stop_words_addition.add_stop_word(stop_word); + } + + let update_id = stop_words_addition + .finalize(&mut writer) + .map_err(ResponseError::internal)?; + + writer.commit().map_err(ResponseError::internal)?; + + let response_body = IndexUpdateResponse { update_id }; + Ok(tide::response::json(response_body) + .with_status(StatusCode::ACCEPTED) + .into_response()) +} + +pub async fn delete(mut ctx: Context) -> SResult { + ctx.is_allowed(SettingsRead)?; + let index = ctx.index()?; + + let data: Vec = ctx.body_json().await.map_err(ResponseError::bad_request)?; + + let env = &ctx.state().db.env; + let mut writer = env.write_txn().map_err(ResponseError::internal)?; + + let mut stop_words_deletion = index.stop_words_deletion(); + for stop_word in data { + stop_words_deletion.delete_stop_word(stop_word); + } + + let update_id = stop_words_deletion + .finalize(&mut writer) + .map_err(ResponseError::internal)?; + + writer.commit().map_err(ResponseError::internal)?; + + let response_body = IndexUpdateResponse { update_id }; + Ok(tide::response::json(response_body) + .with_status(StatusCode::ACCEPTED) + .into_response()) +} diff --git a/meilidb-http/src/routes/synonym.rs b/meilidb-http/src/routes/synonym.rs new file mode 100644 index 000000000..de45e5b86 --- /dev/null +++ b/meilidb-http/src/routes/synonym.rs @@ -0,0 +1,235 @@ +use std::collections::HashMap; + +use http::StatusCode; +use serde::{Deserialize, Serialize}; +use tide::response::IntoResponse; +use tide::{Context, Response}; + +use crate::error::{ResponseError, SResult}; +use crate::helpers::tide::ContextExt; +use crate::models::token::ACL::*; +use crate::routes::document::IndexUpdateResponse; +use crate::Data; + +#[derive(Clone, Serialize, Deserialize)] +#[serde(untagged)] +pub enum Synonym { + OneWay(SynonymOneWay), + MultiWay { synonyms: Vec }, +} + +#[derive(Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase", deny_unknown_fields)] +pub struct SynonymOneWay { + pub input: String, + pub synonyms: Vec, +} + +pub type Synonyms = Vec; + +pub async fn list(ctx: Context) -> SResult { + ctx.is_allowed(SettingsRead)?; + let index = ctx.index()?; + + let env = &ctx.state().db.env; + let reader = env.read_txn().map_err(ResponseError::internal)?; + + let synonyms_fst = index + .main + .synonyms_fst(&reader) + .map_err(ResponseError::internal)?; + + let synonyms_fst = synonyms_fst.unwrap_or_default(); + let synonyms_list = synonyms_fst.stream().into_strs().unwrap(); + + let mut response = HashMap::new(); + + let index_synonyms = &index.synonyms; + + for synonym in synonyms_list { + let alternative_list = index_synonyms + .synonyms(&reader, synonym.as_bytes()) + .unwrap() + .unwrap() + .stream() + .into_strs() + .unwrap(); + response.insert(synonym, alternative_list); + } + + Ok(tide::response::json(response)) +} + +pub async fn get(ctx: Context) -> SResult { + ctx.is_allowed(SettingsRead)?; + let synonym = ctx.url_param("synonym")?; + let index = ctx.index()?; + + let env = &ctx.state().db.env; + let reader = env.read_txn().map_err(ResponseError::internal)?; + + let synonym_list = index + .synonyms + .synonyms(&reader, synonym.as_bytes()) + .unwrap() + .unwrap() + .stream() + .into_strs() + .unwrap(); + + Ok(tide::response::json(synonym_list)) +} + +pub async fn create(mut ctx: Context) -> SResult { + ctx.is_allowed(SettingsWrite)?; + + let data: Synonym = ctx.body_json().await.map_err(ResponseError::bad_request)?; + + let index = ctx.index()?; + + let env = &ctx.state().db.env; + let mut writer = env.write_txn().map_err(ResponseError::internal)?; + + let mut synonyms_addition = index.synonyms_addition(); + + match data.clone() { + Synonym::OneWay(content) => { + synonyms_addition.add_synonym(content.input, content.synonyms.into_iter()) + } + Synonym::MultiWay { mut synonyms } => { + if synonyms.len() > 1 { + for _ in 0..synonyms.len() { + let (first, elems) = synonyms.split_first().unwrap(); + synonyms_addition.add_synonym(first, elems.iter()); + synonyms.rotate_left(1); + } + } + } + } + + let update_id = synonyms_addition + .finalize(&mut writer) + .map_err(ResponseError::internal)?; + + writer.commit().map_err(ResponseError::internal)?; + + let response_body = IndexUpdateResponse { update_id }; + Ok(tide::response::json(response_body) + .with_status(StatusCode::CREATED) + .into_response()) +} + +pub async fn update(mut ctx: Context) -> SResult { + ctx.is_allowed(SettingsWrite)?; + let synonym = ctx.url_param("synonym")?; + let index = ctx.index()?; + let data: Vec = ctx.body_json().await.map_err(ResponseError::bad_request)?; + + let env = &ctx.state().db.env; + let mut writer = env.write_txn().map_err(ResponseError::internal)?; + + let mut synonyms_addition = index.synonyms_addition(); + synonyms_addition.add_synonym(synonym.clone(), data.clone().into_iter()); + let update_id = synonyms_addition + .finalize(&mut writer) + .map_err(ResponseError::internal)?; + + writer.commit().map_err(ResponseError::internal)?; + + let response_body = IndexUpdateResponse { update_id }; + Ok(tide::response::json(response_body) + .with_status(StatusCode::ACCEPTED) + .into_response()) +} + +pub async fn delete(ctx: Context) -> SResult { + ctx.is_allowed(SettingsWrite)?; + let synonym = ctx.url_param("synonym")?; + let index = ctx.index()?; + + let env = &ctx.state().db.env; + let mut writer = env.write_txn().map_err(ResponseError::internal)?; + + let mut synonyms_deletion = index.synonyms_deletion(); + synonyms_deletion.delete_all_alternatives_of(synonym); + let update_id = synonyms_deletion + .finalize(&mut writer) + .map_err(ResponseError::internal)?; + + writer.commit().map_err(ResponseError::internal)?; + + let response_body = IndexUpdateResponse { update_id }; + Ok(tide::response::json(response_body) + .with_status(StatusCode::ACCEPTED) + .into_response()) +} + +pub async fn batch_write(mut ctx: Context) -> SResult { + ctx.is_allowed(SettingsWrite)?; + + let data: Synonyms = ctx.body_json().await.map_err(ResponseError::bad_request)?; + + let index = ctx.index()?; + + let env = &ctx.state().db.env; + let mut writer = env.write_txn().map_err(ResponseError::internal)?; + + let mut synonyms_addition = index.synonyms_addition(); + for raw in data { + match raw { + Synonym::OneWay(content) => { + synonyms_addition.add_synonym(content.input, content.synonyms.into_iter()) + } + Synonym::MultiWay { mut synonyms } => { + if synonyms.len() > 1 { + for _ in 0..synonyms.len() { + let (first, elems) = synonyms.split_first().unwrap(); + synonyms_addition.add_synonym(first, elems.iter()); + synonyms.rotate_left(1); + } + } + } + } + } + let update_id = synonyms_addition + .finalize(&mut writer) + .map_err(ResponseError::internal)?; + + writer.commit().map_err(ResponseError::internal)?; + + let response_body = IndexUpdateResponse { update_id }; + Ok(tide::response::json(response_body) + .with_status(StatusCode::ACCEPTED) + .into_response()) +} + +pub async fn clear(ctx: Context) -> SResult { + ctx.is_allowed(SettingsWrite)?; + let index = ctx.index()?; + + let env = &ctx.state().db.env; + let mut writer = env.write_txn().map_err(ResponseError::internal)?; + + let synonyms_fst = index + .main + .synonyms_fst(&writer) + .map_err(ResponseError::internal)?; + + let synonyms_fst = synonyms_fst.unwrap_or_default(); + let synonyms_list = synonyms_fst.stream().into_strs().unwrap(); + + let mut synonyms_deletion = index.synonyms_deletion(); + for synonym in synonyms_list { + synonyms_deletion.delete_all_alternatives_of(synonym); + } + let update_id = synonyms_deletion + .finalize(&mut writer) + .map_err(ResponseError::internal)?; + + writer.commit().map_err(ResponseError::internal)?; + + let response_body = IndexUpdateResponse { update_id }; + Ok(tide::response::json(response_body) + .with_status(StatusCode::ACCEPTED) + .into_response()) +}