2020-11-13 23:16:07 +08:00
|
|
|
use std::collections::{HashMap, HashSet};
|
2020-12-21 19:52:09 +08:00
|
|
|
use std::fmt::Display;
|
2020-10-19 22:03:17 +08:00
|
|
|
use std::fs::{File, create_dir_all};
|
2020-05-31 23:48:13 +08:00
|
|
|
use std::net::SocketAddr;
|
2020-11-18 04:19:25 +08:00
|
|
|
use std::num::NonZeroUsize;
|
2020-05-31 23:48:13 +08:00
|
|
|
use std::path::PathBuf;
|
|
|
|
use std::str::FromStr;
|
2020-10-19 22:03:17 +08:00
|
|
|
use std::sync::Arc;
|
2020-05-31 23:48:13 +08:00
|
|
|
use std::time::Instant;
|
2020-11-02 20:01:32 +08:00
|
|
|
use std::{mem, io};
|
2020-05-31 23:48:13 +08:00
|
|
|
|
2020-07-11 20:17:37 +08:00
|
|
|
use askama_warp::Template;
|
2020-12-20 18:55:21 +08:00
|
|
|
use byte_unit::Byte;
|
2020-10-24 22:23:08 +08:00
|
|
|
use flate2::read::GzDecoder;
|
2020-10-20 17:19:34 +08:00
|
|
|
use futures::stream;
|
2020-10-22 00:26:29 +08:00
|
|
|
use futures::{FutureExt, StreamExt};
|
2020-10-27 03:18:10 +08:00
|
|
|
use grenad::CompressionType;
|
2020-05-31 23:48:13 +08:00
|
|
|
use heed::EnvOpenOptions;
|
2020-11-03 02:11:22 +08:00
|
|
|
use once_cell::sync::OnceCell;
|
|
|
|
use rayon::ThreadPool;
|
2020-11-02 22:47:21 +08:00
|
|
|
use serde::{Serialize, Deserialize, Deserializer};
|
2020-11-05 20:58:07 +08:00
|
|
|
use serde_json::{Map, Value};
|
2020-05-31 23:48:13 +08:00
|
|
|
use structopt::StructOpt;
|
2020-10-19 22:03:17 +08:00
|
|
|
use tokio::fs::File as TFile;
|
|
|
|
use tokio::io::AsyncWriteExt;
|
2020-10-20 17:19:34 +08:00
|
|
|
use tokio::sync::broadcast;
|
2020-10-19 22:03:17 +08:00
|
|
|
use warp::filters::ws::Message;
|
2020-05-31 23:48:13 +08:00
|
|
|
use warp::{Filter, http::Response};
|
2020-12-24 03:04:19 +08:00
|
|
|
use meilisearch_tokenizer::{Analyzer, AnalyzerConfig};
|
2020-05-31 23:48:13 +08:00
|
|
|
|
2020-11-11 20:14:16 +08:00
|
|
|
use milli::update::UpdateIndexingStep::*;
|
2020-11-28 19:43:43 +08:00
|
|
|
use milli::update::{UpdateBuilder, IndexDocumentsMethod, UpdateFormat};
|
2020-11-14 20:21:22 +08:00
|
|
|
use milli::{obkv_to_json, Index, UpdateStore, SearchResult, FacetCondition};
|
2020-06-11 04:05:01 +08:00
|
|
|
|
2020-11-03 02:11:22 +08:00
|
|
|
static GLOBAL_THREAD_POOL: OnceCell<ThreadPool> = OnceCell::new();
|
|
|
|
|
2020-05-31 23:48:13 +08:00
|
|
|
#[derive(Debug, StructOpt)]
|
2020-10-19 19:44:17 +08:00
|
|
|
/// The HTTP main server of the milli project.
|
|
|
|
pub struct Opt {
|
2020-05-31 23:48:13 +08:00
|
|
|
/// The database path where the LMDB database is located.
|
|
|
|
/// It is created if it doesn't already exist.
|
|
|
|
#[structopt(long = "db", parse(from_os_str))]
|
|
|
|
database: PathBuf,
|
|
|
|
|
|
|
|
/// The maximum size the database can take on disk. It is recommended to specify
|
|
|
|
/// the whole disk space (value must be a multiple of a page size).
|
2020-12-20 18:55:21 +08:00
|
|
|
#[structopt(long = "db-size", default_value = "100 GiB")]
|
|
|
|
database_size: Byte,
|
2020-05-31 23:48:13 +08:00
|
|
|
|
2020-10-19 22:03:17 +08:00
|
|
|
/// The maximum size the database that stores the updates can take on disk. It is recommended
|
|
|
|
/// to specify the whole disk space (value must be a multiple of a page size).
|
2020-12-20 18:55:21 +08:00
|
|
|
#[structopt(long = "udb-size", default_value = "10 GiB")]
|
|
|
|
update_database_size: Byte,
|
2020-10-19 22:03:17 +08:00
|
|
|
|
2020-07-14 17:27:46 +08:00
|
|
|
/// Disable document highlighting on the dashboard.
|
|
|
|
#[structopt(long)]
|
|
|
|
disable_highlighting: bool,
|
|
|
|
|
2020-07-12 17:04:35 +08:00
|
|
|
/// Verbose mode (-v, -vv, -vvv, etc.)
|
|
|
|
#[structopt(short, long, parse(from_occurrences))]
|
|
|
|
verbose: usize,
|
|
|
|
|
2020-05-31 23:48:13 +08:00
|
|
|
/// The ip and port on which the database will listen for HTTP requests.
|
|
|
|
#[structopt(short = "l", long, default_value = "127.0.0.1:9700")]
|
|
|
|
http_listen_addr: String,
|
2020-10-20 20:20:17 +08:00
|
|
|
|
|
|
|
#[structopt(flatten)]
|
|
|
|
indexer: IndexerOpt,
|
2020-05-31 23:48:13 +08:00
|
|
|
}
|
|
|
|
|
2020-10-27 03:18:10 +08:00
|
|
|
#[derive(Debug, Clone, StructOpt)]
|
|
|
|
pub struct IndexerOpt {
|
|
|
|
/// The amount of documents to skip before printing
|
|
|
|
/// a log regarding the indexing advancement.
|
2020-11-10 00:34:52 +08:00
|
|
|
#[structopt(long, default_value = "100000")] // 100k
|
2020-10-27 03:18:10 +08:00
|
|
|
pub log_every_n: usize,
|
|
|
|
|
|
|
|
/// MTBL max number of chunks in bytes.
|
|
|
|
#[structopt(long)]
|
|
|
|
pub max_nb_chunks: Option<usize>,
|
|
|
|
|
|
|
|
/// The maximum amount of memory to use for the MTBL buffer. It is recommended
|
|
|
|
/// to use something like 80%-90% of the available memory.
|
|
|
|
///
|
|
|
|
/// It is automatically split by the number of jobs e.g. if you use 7 jobs
|
|
|
|
/// and 7 GB of max memory, each thread will use a maximum of 1 GB.
|
2020-12-20 18:55:21 +08:00
|
|
|
#[structopt(long, default_value = "7 GiB")]
|
|
|
|
pub max_memory: Byte,
|
2020-10-27 03:18:10 +08:00
|
|
|
|
|
|
|
/// Size of the linked hash map cache when indexing.
|
|
|
|
/// The bigger it is, the faster the indexing is but the more memory it takes.
|
|
|
|
#[structopt(long, default_value = "500")]
|
|
|
|
pub linked_hash_map_size: usize,
|
|
|
|
|
|
|
|
/// The name of the compression algorithm to use when compressing intermediate
|
|
|
|
/// chunks during indexing documents.
|
|
|
|
///
|
|
|
|
/// Choosing a fast algorithm will make the indexing faster but may consume more memory.
|
|
|
|
#[structopt(long, default_value = "snappy", possible_values = &["snappy", "zlib", "lz4", "lz4hc", "zstd"])]
|
|
|
|
pub chunk_compression_type: CompressionType,
|
|
|
|
|
|
|
|
/// The level of compression of the chosen algorithm.
|
|
|
|
#[structopt(long, requires = "chunk-compression-type")]
|
|
|
|
pub chunk_compression_level: Option<u32>,
|
|
|
|
|
|
|
|
/// The number of bytes to remove from the begining of the chunks while reading/sorting
|
|
|
|
/// or merging them.
|
|
|
|
///
|
|
|
|
/// File fusing must only be enable on file systems that support the `FALLOC_FL_COLLAPSE_RANGE`,
|
|
|
|
/// (i.e. ext4 and XFS). File fusing will only work if the `enable-chunk-fusing` is set.
|
2020-12-20 18:55:21 +08:00
|
|
|
#[structopt(long, default_value = "4 GiB")]
|
|
|
|
pub chunk_fusing_shrink_size: Byte,
|
2020-10-27 03:18:10 +08:00
|
|
|
|
|
|
|
/// Enable the chunk fusing or not, this reduces the amount of disk used by a factor of 2.
|
|
|
|
#[structopt(long)]
|
|
|
|
pub enable_chunk_fusing: bool,
|
|
|
|
|
|
|
|
/// Number of parallel jobs for indexing, defaults to # of CPUs.
|
|
|
|
#[structopt(long)]
|
|
|
|
pub indexing_jobs: Option<usize>,
|
|
|
|
}
|
|
|
|
|
2020-12-24 03:04:19 +08:00
|
|
|
struct Highlighter<'a, A> {
|
|
|
|
analyzer: Analyzer<'a, A>,
|
|
|
|
}
|
|
|
|
|
|
|
|
impl<'a, A: AsRef<[u8]>> Highlighter<'a, A> {
|
|
|
|
fn new(stop_words: &'a fst::Set<A>) -> Self {
|
|
|
|
let analyzer = Analyzer::new(AnalyzerConfig::default_with_stopwords(stop_words));
|
|
|
|
Self { analyzer }
|
|
|
|
}
|
|
|
|
|
|
|
|
fn highlight_value(&self, value: Value, words_to_highlight: &HashSet<String>) -> Value {
|
2020-11-05 20:58:07 +08:00
|
|
|
match value {
|
|
|
|
Value::Null => Value::Null,
|
|
|
|
Value::Bool(boolean) => Value::Bool(boolean),
|
|
|
|
Value::Number(number) => Value::Number(number),
|
|
|
|
Value::String(old_string) => {
|
|
|
|
let mut string = String::new();
|
2020-12-24 03:04:19 +08:00
|
|
|
let analyzed = self.analyzer.analyze(&old_string);
|
|
|
|
for (word, token) in analyzed.reconstruct() {
|
|
|
|
if token.is_word() {
|
|
|
|
let to_highlight = words_to_highlight.contains(token.text());
|
2020-11-05 20:58:07 +08:00
|
|
|
if to_highlight { string.push_str("<mark>") }
|
2020-12-24 03:04:19 +08:00
|
|
|
string.push_str(word);
|
2020-11-05 20:58:07 +08:00
|
|
|
if to_highlight { string.push_str("</mark>") }
|
|
|
|
} else {
|
2020-12-24 03:04:19 +08:00
|
|
|
string.push_str(word);
|
2020-11-05 20:58:07 +08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
Value::String(string)
|
|
|
|
},
|
|
|
|
Value::Array(values) => {
|
|
|
|
Value::Array(values.into_iter()
|
2020-12-24 03:04:19 +08:00
|
|
|
.map(|v| self.highlight_value(v, words_to_highlight))
|
2020-11-05 20:58:07 +08:00
|
|
|
.collect())
|
|
|
|
},
|
|
|
|
Value::Object(object) => {
|
|
|
|
Value::Object(object.into_iter()
|
2020-12-24 03:04:19 +08:00
|
|
|
.map(|(k, v)| (k, self.highlight_value(v, words_to_highlight)))
|
2020-11-05 20:58:07 +08:00
|
|
|
.collect())
|
|
|
|
},
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-12-24 03:04:19 +08:00
|
|
|
fn highlight_record(
|
|
|
|
&self,
|
|
|
|
object: &mut Map<String, Value>,
|
|
|
|
words_to_highlight: &HashSet<String>,
|
|
|
|
attributes_to_highlight: &HashSet<String>,
|
|
|
|
) {
|
|
|
|
// TODO do we need to create a string for element that are not and needs to be highlight?
|
|
|
|
for (key, value) in object.iter_mut() {
|
|
|
|
if attributes_to_highlight.contains(key) {
|
|
|
|
let old_value = mem::take(value);
|
|
|
|
*value = self.highlight_value(old_value, words_to_highlight);
|
|
|
|
}
|
2020-08-31 03:50:30 +08:00
|
|
|
}
|
2020-08-05 19:52:27 +08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-07-11 20:17:37 +08:00
|
|
|
#[derive(Template)]
|
|
|
|
#[template(path = "index.html")]
|
|
|
|
struct IndexTemplate {
|
|
|
|
db_name: String,
|
|
|
|
db_size: usize,
|
|
|
|
docs_count: usize,
|
|
|
|
}
|
|
|
|
|
2020-10-20 01:57:15 +08:00
|
|
|
#[derive(Template)]
|
|
|
|
#[template(path = "updates.html")]
|
2020-12-21 19:52:09 +08:00
|
|
|
struct UpdatesTemplate<M: Serialize + Send, P: Serialize + Send, N: Serialize + Send + Display> {
|
2020-10-20 01:57:15 +08:00
|
|
|
db_name: String,
|
2020-10-20 18:09:38 +08:00
|
|
|
db_size: usize,
|
|
|
|
docs_count: usize,
|
2020-10-21 21:38:28 +08:00
|
|
|
updates: Vec<UpdateStatus<M, P, N>>,
|
2020-10-20 18:09:38 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
#[derive(Debug, Clone, Serialize)]
|
|
|
|
#[serde(tag = "type")]
|
2020-10-21 21:38:28 +08:00
|
|
|
enum UpdateStatus<M, P, N> {
|
2020-10-20 18:09:38 +08:00
|
|
|
Pending { update_id: u64, meta: M },
|
2020-10-21 21:38:28 +08:00
|
|
|
Progressing { update_id: u64, meta: P },
|
|
|
|
Processed { update_id: u64, meta: N },
|
2020-11-29 19:23:52 +08:00
|
|
|
Aborted { update_id: u64, meta: M },
|
|
|
|
}
|
|
|
|
|
|
|
|
impl<M, P, N> UpdateStatus<M, P, N> {
|
|
|
|
fn update_id(&self) -> u64 {
|
|
|
|
match self {
|
|
|
|
UpdateStatus::Pending { update_id, .. } => *update_id,
|
|
|
|
UpdateStatus::Progressing { update_id, .. } => *update_id,
|
|
|
|
UpdateStatus::Processed { update_id, .. } => *update_id,
|
|
|
|
UpdateStatus::Aborted { update_id, .. } => *update_id,
|
|
|
|
}
|
|
|
|
}
|
2020-10-21 21:38:28 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
|
|
|
#[serde(tag = "type")]
|
|
|
|
enum UpdateMeta {
|
2020-12-21 06:10:09 +08:00
|
|
|
DocumentsAddition { method: String, format: String, encoding: Option<String> },
|
2020-10-30 20:12:55 +08:00
|
|
|
ClearDocuments,
|
2020-11-02 22:47:21 +08:00
|
|
|
Settings(Settings),
|
2020-11-23 20:08:57 +08:00
|
|
|
Facets(Facets),
|
2020-10-21 21:38:28 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
|
|
|
#[serde(tag = "type")]
|
|
|
|
enum UpdateMetaProgress {
|
|
|
|
DocumentsAddition {
|
2020-11-11 20:14:16 +08:00
|
|
|
step: usize,
|
|
|
|
total_steps: usize,
|
|
|
|
current: usize,
|
|
|
|
total: Option<usize>,
|
2020-10-21 21:38:28 +08:00
|
|
|
},
|
2020-10-20 01:57:15 +08:00
|
|
|
}
|
|
|
|
|
2020-11-02 22:47:21 +08:00
|
|
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
2020-11-13 23:16:07 +08:00
|
|
|
#[serde(deny_unknown_fields)]
|
|
|
|
#[serde(rename_all = "camelCase")]
|
2020-11-02 22:47:21 +08:00
|
|
|
struct Settings {
|
|
|
|
#[serde(
|
|
|
|
default,
|
|
|
|
deserialize_with = "deserialize_some",
|
|
|
|
skip_serializing_if = "Option::is_none",
|
|
|
|
)]
|
|
|
|
displayed_attributes: Option<Option<Vec<String>>>,
|
2020-11-04 02:35:55 +08:00
|
|
|
|
|
|
|
#[serde(
|
|
|
|
default,
|
|
|
|
deserialize_with = "deserialize_some",
|
|
|
|
skip_serializing_if = "Option::is_none",
|
|
|
|
)]
|
|
|
|
searchable_attributes: Option<Option<Vec<String>>>,
|
2020-11-13 23:16:07 +08:00
|
|
|
|
|
|
|
#[serde(default)]
|
|
|
|
faceted_attributes: Option<HashMap<String, String>>,
|
2020-12-04 19:02:22 +08:00
|
|
|
|
|
|
|
#[serde(
|
|
|
|
default,
|
|
|
|
deserialize_with = "deserialize_some",
|
|
|
|
skip_serializing_if = "Option::is_none",
|
|
|
|
)]
|
|
|
|
criteria: Option<Option<Vec<String>>>,
|
2020-11-02 22:47:21 +08:00
|
|
|
}
|
|
|
|
|
2020-11-18 04:19:25 +08:00
|
|
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
|
|
|
#[serde(deny_unknown_fields)]
|
|
|
|
#[serde(rename_all = "camelCase")]
|
2020-11-23 20:08:57 +08:00
|
|
|
struct Facets {
|
2020-11-28 19:43:43 +08:00
|
|
|
level_group_size: Option<NonZeroUsize>,
|
|
|
|
min_level_size: Option<NonZeroUsize>,
|
2020-11-18 04:19:25 +08:00
|
|
|
}
|
|
|
|
|
2020-11-02 22:47:21 +08:00
|
|
|
// Any value that is present is considered Some value, including null.
|
|
|
|
fn deserialize_some<'de, T, D>(deserializer: D) -> Result<Option<T>, D::Error>
|
|
|
|
where T: Deserialize<'de>,
|
|
|
|
D: Deserializer<'de>
|
|
|
|
{
|
|
|
|
Deserialize::deserialize(deserializer).map(Some)
|
|
|
|
}
|
|
|
|
|
2020-11-05 18:16:39 +08:00
|
|
|
#[tokio::main]
|
|
|
|
async fn main() -> anyhow::Result<()> {
|
|
|
|
let opt = Opt::from_args();
|
|
|
|
|
2020-07-12 17:04:35 +08:00
|
|
|
stderrlog::new()
|
|
|
|
.verbosity(opt.verbose)
|
|
|
|
.show_level(false)
|
|
|
|
.timestamp(stderrlog::Timestamp::Off)
|
|
|
|
.init()?;
|
|
|
|
|
2020-10-20 21:00:58 +08:00
|
|
|
create_dir_all(&opt.database)?;
|
2020-10-30 17:56:35 +08:00
|
|
|
let mut options = EnvOpenOptions::new();
|
2020-12-20 18:55:21 +08:00
|
|
|
options.map_size(opt.database_size.get_bytes() as usize);
|
2020-05-31 23:48:13 +08:00
|
|
|
|
2020-11-03 02:11:22 +08:00
|
|
|
// Setup the global thread pool
|
|
|
|
let jobs = opt.indexer.indexing_jobs.unwrap_or(0);
|
|
|
|
let pool = rayon::ThreadPoolBuilder::new().num_threads(jobs).build()?;
|
|
|
|
GLOBAL_THREAD_POOL.set(pool).unwrap();
|
|
|
|
|
2020-08-07 19:11:31 +08:00
|
|
|
// Open the LMDB database.
|
2020-10-30 17:56:35 +08:00
|
|
|
let index = Index::new(options, &opt.database)?;
|
2020-08-07 19:11:31 +08:00
|
|
|
|
2020-10-19 22:03:17 +08:00
|
|
|
// Setup the LMDB based update database.
|
|
|
|
let mut update_store_options = EnvOpenOptions::new();
|
2020-12-20 18:55:21 +08:00
|
|
|
update_store_options.map_size(opt.update_database_size.get_bytes() as usize);
|
2020-10-19 22:03:17 +08:00
|
|
|
|
|
|
|
let update_store_path = opt.database.join("updates.mdb");
|
|
|
|
create_dir_all(&update_store_path)?;
|
|
|
|
|
2020-10-20 17:19:34 +08:00
|
|
|
let (update_status_sender, _) = broadcast::channel(100);
|
2020-10-19 22:03:17 +08:00
|
|
|
let update_status_sender_cloned = update_status_sender.clone();
|
2020-10-20 21:00:58 +08:00
|
|
|
let index_cloned = index.clone();
|
|
|
|
let indexer_opt_cloned = opt.indexer.clone();
|
2020-10-19 22:03:17 +08:00
|
|
|
let update_store = UpdateStore::open(
|
|
|
|
update_store_options,
|
|
|
|
update_store_path,
|
2020-12-22 18:23:25 +08:00
|
|
|
// the type hint is necessary: https://github.com/rust-lang/rust/issues/32600
|
|
|
|
move |update_id, meta, content:&_| {
|
2020-10-27 03:18:10 +08:00
|
|
|
// We prepare the update by using the update builder.
|
|
|
|
let mut update_builder = UpdateBuilder::new();
|
|
|
|
if let Some(max_nb_chunks) = indexer_opt_cloned.max_nb_chunks {
|
|
|
|
update_builder.max_nb_chunks(max_nb_chunks);
|
|
|
|
}
|
|
|
|
if let Some(chunk_compression_level) = indexer_opt_cloned.chunk_compression_level {
|
|
|
|
update_builder.chunk_compression_level(chunk_compression_level);
|
|
|
|
}
|
2020-11-03 02:11:22 +08:00
|
|
|
update_builder.thread_pool(GLOBAL_THREAD_POOL.get().unwrap());
|
2020-10-27 03:18:10 +08:00
|
|
|
update_builder.log_every_n(indexer_opt_cloned.log_every_n);
|
2020-12-20 18:55:21 +08:00
|
|
|
update_builder.max_memory(indexer_opt_cloned.max_memory.get_bytes() as usize);
|
2020-10-27 03:18:10 +08:00
|
|
|
update_builder.linked_hash_map_size(indexer_opt_cloned.linked_hash_map_size);
|
|
|
|
update_builder.chunk_compression_type(indexer_opt_cloned.chunk_compression_type);
|
2020-12-20 18:55:21 +08:00
|
|
|
update_builder.chunk_fusing_shrink_size(indexer_opt_cloned.chunk_fusing_shrink_size.get_bytes());
|
2020-10-27 03:18:10 +08:00
|
|
|
|
2020-12-21 06:43:31 +08:00
|
|
|
let before_update = Instant::now();
|
2020-10-27 03:18:10 +08:00
|
|
|
// we extract the update type and execute the update itself.
|
|
|
|
let result: anyhow::Result<()> = match meta {
|
2020-12-21 06:10:09 +08:00
|
|
|
UpdateMeta::DocumentsAddition { method, format, encoding } => {
|
2020-10-24 22:23:08 +08:00
|
|
|
// We must use the write transaction of the update here.
|
2020-10-30 17:56:35 +08:00
|
|
|
let mut wtxn = index_cloned.write_txn()?;
|
2020-10-27 03:18:10 +08:00
|
|
|
let mut builder = update_builder.index_documents(&mut wtxn, &index_cloned);
|
|
|
|
|
2020-11-01 00:48:24 +08:00
|
|
|
match format.as_str() {
|
|
|
|
"csv" => builder.update_format(UpdateFormat::Csv),
|
2020-11-01 18:50:10 +08:00
|
|
|
"json" => builder.update_format(UpdateFormat::Json),
|
|
|
|
"json-stream" => builder.update_format(UpdateFormat::JsonStream),
|
2020-11-01 00:48:24 +08:00
|
|
|
otherwise => panic!("invalid update format {:?}", otherwise),
|
|
|
|
};
|
|
|
|
|
|
|
|
match method.as_str() {
|
|
|
|
"replace" => builder.index_documents_method(IndexDocumentsMethod::ReplaceDocuments),
|
|
|
|
"update" => builder.index_documents_method(IndexDocumentsMethod::UpdateDocuments),
|
|
|
|
otherwise => panic!("invalid indexing method {:?}", otherwise),
|
|
|
|
};
|
2020-10-24 22:23:08 +08:00
|
|
|
|
2020-12-21 06:10:09 +08:00
|
|
|
let reader = match encoding.as_deref() {
|
|
|
|
Some("gzip") => Box::new(GzDecoder::new(content)),
|
|
|
|
None => Box::new(content) as Box<dyn io::Read>,
|
|
|
|
otherwise => panic!("invalid encoding format {:?}", otherwise),
|
2020-10-21 21:38:28 +08:00
|
|
|
};
|
|
|
|
|
2020-11-11 20:14:16 +08:00
|
|
|
let result = builder.execute(reader, |indexing_step| {
|
|
|
|
let (current, total) = match indexing_step {
|
|
|
|
TransformFromUserIntoGenericFormat { documents_seen } => (documents_seen, None),
|
|
|
|
ComputeIdsAndMergeDocuments { documents_seen, total_documents } => (documents_seen, Some(total_documents)),
|
|
|
|
IndexDocuments { documents_seen, total_documents } => (documents_seen, Some(total_documents)),
|
|
|
|
MergeDataIntoFinalDatabase { databases_seen, total_databases } => (databases_seen, Some(total_databases)),
|
|
|
|
};
|
2020-10-27 03:18:10 +08:00
|
|
|
let _ = update_status_sender_cloned.send(UpdateStatus::Progressing {
|
|
|
|
update_id,
|
|
|
|
meta: UpdateMetaProgress::DocumentsAddition {
|
2020-11-11 20:14:16 +08:00
|
|
|
step: indexing_step.step(),
|
|
|
|
total_steps: indexing_step.number_of_steps(),
|
|
|
|
current,
|
|
|
|
total,
|
2020-10-27 03:18:10 +08:00
|
|
|
}
|
|
|
|
});
|
|
|
|
});
|
|
|
|
|
|
|
|
match result {
|
|
|
|
Ok(()) => wtxn.commit().map_err(Into::into),
|
|
|
|
Err(e) => Err(e.into())
|
|
|
|
}
|
2020-10-24 22:23:08 +08:00
|
|
|
},
|
2020-10-30 20:12:55 +08:00
|
|
|
UpdateMeta::ClearDocuments => {
|
|
|
|
// We must use the write transaction of the update here.
|
|
|
|
let mut wtxn = index_cloned.write_txn()?;
|
|
|
|
let builder = update_builder.clear_documents(&mut wtxn, &index_cloned);
|
|
|
|
|
|
|
|
match builder.execute() {
|
|
|
|
Ok(_count) => wtxn.commit().map_err(Into::into),
|
|
|
|
Err(e) => Err(e.into())
|
|
|
|
}
|
2020-11-02 22:31:20 +08:00
|
|
|
},
|
2020-11-02 22:47:21 +08:00
|
|
|
UpdateMeta::Settings(settings) => {
|
|
|
|
// We must use the write transaction of the update here.
|
|
|
|
let mut wtxn = index_cloned.write_txn()?;
|
|
|
|
let mut builder = update_builder.settings(&mut wtxn, &index_cloned);
|
|
|
|
|
2020-11-04 02:35:55 +08:00
|
|
|
// We transpose the settings JSON struct into a real setting update.
|
|
|
|
if let Some(names) = settings.searchable_attributes {
|
|
|
|
match names {
|
|
|
|
Some(names) => builder.set_searchable_fields(names),
|
|
|
|
None => builder.reset_searchable_fields(),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-11-02 22:47:21 +08:00
|
|
|
// We transpose the settings JSON struct into a real setting update.
|
|
|
|
if let Some(names) = settings.displayed_attributes {
|
|
|
|
match names {
|
|
|
|
Some(names) => builder.set_displayed_fields(names),
|
|
|
|
None => builder.reset_displayed_fields(),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-11-13 23:16:07 +08:00
|
|
|
// We transpose the settings JSON struct into a real setting update.
|
|
|
|
if let Some(facet_types) = settings.faceted_attributes {
|
|
|
|
builder.set_faceted_fields(facet_types);
|
|
|
|
}
|
|
|
|
|
2020-12-04 19:02:22 +08:00
|
|
|
// We transpose the settings JSON struct into a real setting update.
|
|
|
|
if let Some(criteria) = settings.criteria {
|
|
|
|
match criteria {
|
|
|
|
Some(criteria) => builder.set_criteria(criteria),
|
|
|
|
None => builder.reset_criteria(),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-11-11 20:14:16 +08:00
|
|
|
let result = builder.execute(|indexing_step| {
|
|
|
|
let (current, total) = match indexing_step {
|
|
|
|
TransformFromUserIntoGenericFormat { documents_seen } => (documents_seen, None),
|
|
|
|
ComputeIdsAndMergeDocuments { documents_seen, total_documents } => (documents_seen, Some(total_documents)),
|
|
|
|
IndexDocuments { documents_seen, total_documents } => (documents_seen, Some(total_documents)),
|
|
|
|
MergeDataIntoFinalDatabase { databases_seen, total_databases } => (databases_seen, Some(total_databases)),
|
|
|
|
};
|
2020-11-03 20:20:11 +08:00
|
|
|
let _ = update_status_sender_cloned.send(UpdateStatus::Progressing {
|
|
|
|
update_id,
|
|
|
|
meta: UpdateMetaProgress::DocumentsAddition {
|
2020-11-11 20:14:16 +08:00
|
|
|
step: indexing_step.step(),
|
|
|
|
total_steps: indexing_step.number_of_steps(),
|
|
|
|
current,
|
|
|
|
total,
|
2020-11-03 20:20:11 +08:00
|
|
|
}
|
|
|
|
});
|
|
|
|
});
|
|
|
|
|
|
|
|
match result {
|
2020-11-02 22:47:21 +08:00
|
|
|
Ok(_count) => wtxn.commit().map_err(Into::into),
|
|
|
|
Err(e) => Err(e.into())
|
|
|
|
}
|
2020-11-18 04:19:25 +08:00
|
|
|
},
|
2020-11-23 20:08:57 +08:00
|
|
|
UpdateMeta::Facets(levels) => {
|
2020-11-18 04:19:25 +08:00
|
|
|
// We must use the write transaction of the update here.
|
|
|
|
let mut wtxn = index_cloned.write_txn()?;
|
2020-11-23 20:08:57 +08:00
|
|
|
let mut builder = update_builder.facets(&mut wtxn, &index_cloned);
|
2020-11-28 19:43:43 +08:00
|
|
|
if let Some(value) = levels.level_group_size {
|
|
|
|
builder.level_group_size(value);
|
2020-11-18 04:19:25 +08:00
|
|
|
}
|
2020-11-28 19:43:43 +08:00
|
|
|
if let Some(value) = levels.min_level_size {
|
|
|
|
builder.min_level_size(value);
|
2020-11-18 04:19:25 +08:00
|
|
|
}
|
|
|
|
match builder.execute() {
|
|
|
|
Ok(()) => wtxn.commit().map_err(Into::into),
|
|
|
|
Err(e) => Err(e.into())
|
|
|
|
}
|
2020-10-21 21:38:28 +08:00
|
|
|
}
|
|
|
|
};
|
2020-10-20 21:00:58 +08:00
|
|
|
|
|
|
|
let meta = match result {
|
2020-12-21 06:43:31 +08:00
|
|
|
Ok(()) => format!("valid update content processed in {:.02?}", before_update.elapsed()),
|
2020-10-28 18:17:36 +08:00
|
|
|
Err(e) => format!("error while processing update content: {:?}", e),
|
2020-10-20 21:00:58 +08:00
|
|
|
};
|
2020-10-20 18:28:10 +08:00
|
|
|
|
2020-10-20 18:09:38 +08:00
|
|
|
let processed = UpdateStatus::Processed { update_id, meta: meta.clone() };
|
|
|
|
let _ = update_status_sender_cloned.send(processed);
|
2020-10-20 21:00:58 +08:00
|
|
|
|
2020-10-19 22:03:17 +08:00
|
|
|
Ok(meta)
|
|
|
|
})?;
|
|
|
|
|
2020-10-20 21:00:58 +08:00
|
|
|
// The database name will not change.
|
2020-07-11 20:17:37 +08:00
|
|
|
let db_name = opt.database.file_stem().and_then(|s| s.to_str()).unwrap_or("").to_string();
|
2020-10-20 21:00:58 +08:00
|
|
|
let lmdb_path = opt.database.join("data.mdb");
|
2020-07-11 20:17:37 +08:00
|
|
|
|
2020-05-31 23:48:13 +08:00
|
|
|
// We run and wait on the HTTP server
|
|
|
|
|
|
|
|
// Expose an HTML page to debug the search in a browser
|
2020-10-20 01:57:15 +08:00
|
|
|
let db_name_cloned = db_name.clone();
|
2020-10-20 21:00:58 +08:00
|
|
|
let lmdb_path_cloned = lmdb_path.clone();
|
|
|
|
let index_cloned = index.clone();
|
2020-05-31 23:48:13 +08:00
|
|
|
let dash_html_route = warp::filters::method::get()
|
|
|
|
.and(warp::filters::path::end())
|
2020-10-20 21:00:58 +08:00
|
|
|
.map(move || {
|
|
|
|
// We retrieve the database size.
|
|
|
|
let db_size = File::open(lmdb_path_cloned.clone())
|
|
|
|
.unwrap()
|
|
|
|
.metadata()
|
|
|
|
.unwrap()
|
|
|
|
.len() as usize;
|
|
|
|
|
|
|
|
// And the number of documents in the database.
|
2020-10-30 18:42:00 +08:00
|
|
|
let rtxn = index_cloned.read_txn().unwrap();
|
2020-10-20 21:00:58 +08:00
|
|
|
let docs_count = index_cloned.clone().number_of_documents(&rtxn).unwrap() as usize;
|
|
|
|
|
|
|
|
IndexTemplate { db_name: db_name_cloned.clone(), db_size, docs_count }
|
|
|
|
});
|
2020-10-20 01:57:15 +08:00
|
|
|
|
|
|
|
let update_store_cloned = update_store.clone();
|
2020-10-20 21:00:58 +08:00
|
|
|
let lmdb_path_cloned = lmdb_path.clone();
|
|
|
|
let index_cloned = index.clone();
|
2020-10-20 01:57:15 +08:00
|
|
|
let updates_list_or_html_route = warp::filters::method::get()
|
|
|
|
.and(warp::header("Accept"))
|
|
|
|
.and(warp::path!("updates"))
|
|
|
|
.map(move |header: String| {
|
|
|
|
let update_store = update_store_cloned.clone();
|
2020-11-29 19:23:52 +08:00
|
|
|
let mut updates = update_store.iter_metas(|processed, aborted, pending| {
|
2020-10-21 21:38:28 +08:00
|
|
|
let mut updates = Vec::<UpdateStatus<_, UpdateMetaProgress, _>>::new();
|
2020-10-20 01:57:15 +08:00
|
|
|
for result in processed {
|
2020-10-20 18:09:38 +08:00
|
|
|
let (uid, meta) = result?;
|
|
|
|
updates.push(UpdateStatus::Processed { update_id: uid.get(), meta });
|
2020-10-20 01:57:15 +08:00
|
|
|
}
|
2020-11-29 19:23:52 +08:00
|
|
|
for result in aborted {
|
|
|
|
let (uid, meta) = result?;
|
|
|
|
updates.push(UpdateStatus::Aborted { update_id: uid.get(), meta });
|
|
|
|
}
|
2020-10-20 01:57:15 +08:00
|
|
|
for result in pending {
|
2020-10-20 18:09:38 +08:00
|
|
|
let (uid, meta) = result?;
|
|
|
|
updates.push(UpdateStatus::Pending { update_id: uid.get(), meta });
|
2020-10-20 01:57:15 +08:00
|
|
|
}
|
|
|
|
Ok(updates)
|
|
|
|
}).unwrap();
|
|
|
|
|
2020-11-29 19:23:52 +08:00
|
|
|
updates.sort_unstable_by(|s1, s2| s1.update_id().cmp(&s2.update_id()).reverse());
|
2020-10-20 21:00:58 +08:00
|
|
|
|
2020-11-29 19:23:52 +08:00
|
|
|
if header.contains("text/html") {
|
2020-10-20 21:00:58 +08:00
|
|
|
// We retrieve the database size.
|
|
|
|
let db_size = File::open(lmdb_path_cloned.clone())
|
|
|
|
.unwrap()
|
|
|
|
.metadata()
|
|
|
|
.unwrap()
|
|
|
|
.len() as usize;
|
|
|
|
|
|
|
|
// And the number of documents in the database.
|
2020-10-30 18:42:00 +08:00
|
|
|
let rtxn = index_cloned.read_txn().unwrap();
|
2020-10-20 21:00:58 +08:00
|
|
|
let docs_count = index_cloned.clone().number_of_documents(&rtxn).unwrap() as usize;
|
|
|
|
|
2020-10-20 18:09:38 +08:00
|
|
|
let template = UpdatesTemplate {
|
|
|
|
db_name: db_name.clone(),
|
|
|
|
db_size,
|
|
|
|
docs_count,
|
|
|
|
updates,
|
|
|
|
};
|
2020-10-20 01:57:15 +08:00
|
|
|
Box::new(template) as Box<dyn warp::Reply>
|
|
|
|
} else {
|
|
|
|
Box::new(warp::reply::json(&updates))
|
|
|
|
}
|
|
|
|
});
|
2020-05-31 23:48:13 +08:00
|
|
|
|
|
|
|
let dash_bulma_route = warp::filters::method::get()
|
|
|
|
.and(warp::path!("bulma.min.css"))
|
|
|
|
.map(|| Response::builder()
|
|
|
|
.header("content-type", "text/css; charset=utf-8")
|
2020-11-05 18:16:39 +08:00
|
|
|
.body(include_str!("../public/bulma.min.css"))
|
2020-05-31 23:48:13 +08:00
|
|
|
);
|
|
|
|
|
2020-07-14 05:51:41 +08:00
|
|
|
let dash_bulma_dark_route = warp::filters::method::get()
|
|
|
|
.and(warp::path!("bulma-prefers-dark.min.css"))
|
|
|
|
.map(|| Response::builder()
|
|
|
|
.header("content-type", "text/css; charset=utf-8")
|
2020-11-05 18:16:39 +08:00
|
|
|
.body(include_str!("../public/bulma-prefers-dark.min.css"))
|
2020-07-14 05:51:41 +08:00
|
|
|
);
|
|
|
|
|
2020-07-11 17:48:27 +08:00
|
|
|
let dash_style_route = warp::filters::method::get()
|
|
|
|
.and(warp::path!("style.css"))
|
|
|
|
.map(|| Response::builder()
|
|
|
|
.header("content-type", "text/css; charset=utf-8")
|
2020-11-05 18:16:39 +08:00
|
|
|
.body(include_str!("../public/style.css"))
|
2020-07-11 17:48:27 +08:00
|
|
|
);
|
|
|
|
|
2020-05-31 23:48:13 +08:00
|
|
|
let dash_jquery_route = warp::filters::method::get()
|
|
|
|
.and(warp::path!("jquery-3.4.1.min.js"))
|
|
|
|
.map(|| Response::builder()
|
|
|
|
.header("content-type", "application/javascript; charset=utf-8")
|
2020-11-05 18:16:39 +08:00
|
|
|
.body(include_str!("../public/jquery-3.4.1.min.js"))
|
2020-05-31 23:48:13 +08:00
|
|
|
);
|
|
|
|
|
2020-07-11 20:17:37 +08:00
|
|
|
let dash_filesize_route = warp::filters::method::get()
|
|
|
|
.and(warp::path!("filesize.min.js"))
|
|
|
|
.map(|| Response::builder()
|
|
|
|
.header("content-type", "application/javascript; charset=utf-8")
|
2020-11-05 18:16:39 +08:00
|
|
|
.body(include_str!("../public/filesize.min.js"))
|
2020-07-11 20:17:37 +08:00
|
|
|
);
|
|
|
|
|
2020-07-11 17:48:27 +08:00
|
|
|
let dash_script_route = warp::filters::method::get()
|
|
|
|
.and(warp::path!("script.js"))
|
|
|
|
.map(|| Response::builder()
|
|
|
|
.header("content-type", "application/javascript; charset=utf-8")
|
2020-11-05 18:16:39 +08:00
|
|
|
.body(include_str!("../public/script.js"))
|
2020-07-11 17:48:27 +08:00
|
|
|
);
|
|
|
|
|
2020-10-20 01:57:15 +08:00
|
|
|
let updates_script_route = warp::filters::method::get()
|
|
|
|
.and(warp::path!("updates-script.js"))
|
|
|
|
.map(|| Response::builder()
|
|
|
|
.header("content-type", "application/javascript; charset=utf-8")
|
2020-11-05 18:16:39 +08:00
|
|
|
.body(include_str!("../public/updates-script.js"))
|
2020-10-20 01:57:15 +08:00
|
|
|
);
|
|
|
|
|
2020-07-16 05:51:12 +08:00
|
|
|
let dash_logo_white_route = warp::filters::method::get()
|
|
|
|
.and(warp::path!("logo-white.svg"))
|
|
|
|
.map(|| Response::builder()
|
|
|
|
.header("content-type", "image/svg+xml")
|
2020-11-05 18:16:39 +08:00
|
|
|
.body(include_str!("../public/logo-white.svg"))
|
2020-07-16 05:51:12 +08:00
|
|
|
);
|
|
|
|
|
|
|
|
let dash_logo_black_route = warp::filters::method::get()
|
|
|
|
.and(warp::path!("logo-black.svg"))
|
|
|
|
.map(|| Response::builder()
|
|
|
|
.header("content-type", "image/svg+xml")
|
2020-11-05 18:16:39 +08:00
|
|
|
.body(include_str!("../public/logo-black.svg"))
|
2020-07-16 05:51:12 +08:00
|
|
|
);
|
|
|
|
|
2020-11-14 20:21:22 +08:00
|
|
|
#[derive(Debug, Deserialize)]
|
|
|
|
#[serde(deny_unknown_fields)]
|
|
|
|
#[serde(rename_all = "camelCase")]
|
2020-05-31 23:48:13 +08:00
|
|
|
struct QueryBody {
|
2020-10-06 20:52:05 +08:00
|
|
|
query: Option<String>,
|
2020-11-14 20:21:22 +08:00
|
|
|
facet_condition: Option<String>,
|
2020-05-31 23:48:13 +08:00
|
|
|
}
|
|
|
|
|
2020-07-14 17:27:46 +08:00
|
|
|
let disable_highlighting = opt.disable_highlighting;
|
2020-11-11 00:00:38 +08:00
|
|
|
let index_cloned = index.clone();
|
2020-05-31 23:48:13 +08:00
|
|
|
let query_route = warp::filters::method::post()
|
|
|
|
.and(warp::path!("query"))
|
|
|
|
.and(warp::body::json())
|
|
|
|
.map(move |query: QueryBody| {
|
|
|
|
let before_search = Instant::now();
|
2020-11-11 00:00:38 +08:00
|
|
|
let index = index_cloned.clone();
|
2020-10-30 17:56:35 +08:00
|
|
|
let rtxn = index.read_txn().unwrap();
|
2020-05-31 23:48:13 +08:00
|
|
|
|
2020-10-06 20:52:05 +08:00
|
|
|
let mut search = index.search(&rtxn);
|
|
|
|
if let Some(query) = query.query {
|
|
|
|
search.query(query);
|
|
|
|
}
|
2020-11-14 20:21:22 +08:00
|
|
|
if let Some(condition) = query.facet_condition {
|
2020-11-21 20:09:49 +08:00
|
|
|
if !condition.trim().is_empty() {
|
|
|
|
let condition = FacetCondition::from_str(&rtxn, &index, &condition).unwrap();
|
2020-11-14 21:01:41 +08:00
|
|
|
search.facet_condition(condition);
|
|
|
|
}
|
2020-11-14 20:21:22 +08:00
|
|
|
}
|
2020-10-06 20:52:05 +08:00
|
|
|
|
|
|
|
let SearchResult { found_words, documents_ids } = search.execute().unwrap();
|
2020-05-31 23:48:13 +08:00
|
|
|
|
2020-10-22 00:26:29 +08:00
|
|
|
let mut documents = Vec::new();
|
2020-10-26 01:32:01 +08:00
|
|
|
let fields_ids_map = index.fields_ids_map(&rtxn).unwrap();
|
2021-01-21 00:27:43 +08:00
|
|
|
let displayed_fields = match index.displayed_fields_ids(&rtxn).unwrap() {
|
|
|
|
Some(fields) => fields,
|
|
|
|
None => fields_ids_map.iter().map(|(id, _)| id).collect(),
|
2020-11-02 20:01:32 +08:00
|
|
|
};
|
2020-11-05 20:58:07 +08:00
|
|
|
let attributes_to_highlight = match index.searchable_fields(&rtxn).unwrap() {
|
2021-01-21 00:27:43 +08:00
|
|
|
Some(fields) => fields.into_iter().map(String::from).collect(),
|
|
|
|
None => fields_ids_map.iter().map(|(_, name)| name).map(String::from).collect(),
|
2020-11-05 20:58:07 +08:00
|
|
|
};
|
2020-10-22 20:23:33 +08:00
|
|
|
|
2020-12-24 03:04:19 +08:00
|
|
|
let stop_words = fst::Set::default();
|
|
|
|
let highlighter = Highlighter::new(&stop_words);
|
|
|
|
|
2020-11-05 20:58:07 +08:00
|
|
|
for (_id, obkv) in index.documents(&rtxn, documents_ids).unwrap() {
|
|
|
|
let mut object = obkv_to_json(&displayed_fields, &fields_ids_map, obkv).unwrap();
|
2020-10-22 20:23:33 +08:00
|
|
|
if !disable_highlighting {
|
2020-12-24 03:04:19 +08:00
|
|
|
highlighter.highlight_record(&mut object, &found_words, &attributes_to_highlight);
|
2020-10-22 00:26:29 +08:00
|
|
|
}
|
2020-10-22 20:23:33 +08:00
|
|
|
|
2020-11-05 20:58:07 +08:00
|
|
|
documents.push(object);
|
2020-10-22 00:26:29 +08:00
|
|
|
}
|
2020-05-31 23:48:13 +08:00
|
|
|
|
|
|
|
Response::builder()
|
2020-10-22 00:26:29 +08:00
|
|
|
.header("Content-Type", "application/json")
|
2020-05-31 23:48:13 +08:00
|
|
|
.header("Time-Ms", before_search.elapsed().as_millis().to_string())
|
2020-10-22 00:26:29 +08:00
|
|
|
.body(serde_json::to_string(&documents).unwrap())
|
2020-05-31 23:48:13 +08:00
|
|
|
});
|
|
|
|
|
2020-11-11 00:00:38 +08:00
|
|
|
let index_cloned = index.clone();
|
|
|
|
let document_route = warp::filters::method::get()
|
|
|
|
.and(warp::path!("document" / String))
|
|
|
|
.map(move |id: String| {
|
|
|
|
let index = index_cloned.clone();
|
|
|
|
let rtxn = index.read_txn().unwrap();
|
|
|
|
|
2020-11-22 18:54:04 +08:00
|
|
|
let external_documents_ids = index.external_documents_ids(&rtxn).unwrap();
|
2020-11-11 00:00:38 +08:00
|
|
|
let fields_ids_map = index.fields_ids_map(&rtxn).unwrap();
|
2021-01-21 00:27:43 +08:00
|
|
|
let displayed_fields = match index.displayed_fields_ids(&rtxn).unwrap() {
|
|
|
|
Some(fields) => fields,
|
|
|
|
None => fields_ids_map.iter().map(|(id, _)| id).collect(),
|
2020-11-11 00:00:38 +08:00
|
|
|
};
|
|
|
|
|
2020-11-22 18:54:04 +08:00
|
|
|
match external_documents_ids.get(&id) {
|
2020-11-11 00:00:38 +08:00
|
|
|
Some(document_id) => {
|
|
|
|
let document_id = document_id as u32;
|
|
|
|
let (_, obkv) = index.documents(&rtxn, Some(document_id)).unwrap().pop().unwrap();
|
|
|
|
let document = obkv_to_json(&displayed_fields, &fields_ids_map, obkv).unwrap();
|
|
|
|
|
|
|
|
Response::builder()
|
|
|
|
.header("Content-Type", "application/json")
|
|
|
|
.body(serde_json::to_string(&document).unwrap())
|
|
|
|
},
|
|
|
|
None => {
|
|
|
|
Response::builder()
|
|
|
|
.status(404)
|
|
|
|
.body(format!("Document with id {:?} not found.", id))
|
|
|
|
},
|
|
|
|
}
|
|
|
|
});
|
|
|
|
|
2020-10-19 22:03:17 +08:00
|
|
|
async fn buf_stream(
|
2020-10-21 21:38:28 +08:00
|
|
|
update_store: Arc<UpdateStore<UpdateMeta, String>>,
|
|
|
|
update_status_sender: broadcast::Sender<UpdateStatus<UpdateMeta, UpdateMetaProgress, String>>,
|
2020-11-01 00:48:24 +08:00
|
|
|
update_method: Option<String>,
|
|
|
|
update_format: UpdateFormat,
|
2020-12-21 06:10:09 +08:00
|
|
|
encoding: Option<String>,
|
2020-10-19 22:03:17 +08:00
|
|
|
mut stream: impl futures::Stream<Item=Result<impl bytes::Buf, warp::Error>> + Unpin,
|
|
|
|
) -> Result<impl warp::Reply, warp::Rejection>
|
|
|
|
{
|
|
|
|
let file = tokio::task::block_in_place(tempfile::tempfile).unwrap();
|
2020-12-21 06:10:09 +08:00
|
|
|
let mut file = TFile::from_std(file);
|
2020-10-19 22:03:17 +08:00
|
|
|
|
|
|
|
while let Some(result) = stream.next().await {
|
|
|
|
let bytes = result.unwrap().to_bytes();
|
2020-12-21 06:10:09 +08:00
|
|
|
file.write_all(&bytes[..]).await.unwrap();
|
2020-10-19 22:03:17 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
let file = file.into_std().await;
|
2020-12-24 03:04:19 +08:00
|
|
|
let mmap = unsafe { memmap::Mmap::map(&file).expect("can't map file") };
|
2020-10-19 22:03:17 +08:00
|
|
|
|
2020-11-01 00:48:24 +08:00
|
|
|
let method = match update_method.as_deref() {
|
|
|
|
Some("replace") => String::from("replace"),
|
|
|
|
Some("update") => String::from("update"),
|
|
|
|
_ => String::from("replace"),
|
|
|
|
};
|
|
|
|
|
|
|
|
let format = match update_format {
|
|
|
|
UpdateFormat::Csv => String::from("csv"),
|
|
|
|
UpdateFormat::Json => String::from("json"),
|
2020-11-01 18:50:10 +08:00
|
|
|
UpdateFormat::JsonStream => String::from("json-stream"),
|
2020-11-05 18:16:39 +08:00
|
|
|
_ => panic!("Unknown update format"),
|
2020-11-01 00:48:24 +08:00
|
|
|
};
|
|
|
|
|
2020-12-21 06:10:09 +08:00
|
|
|
let meta = UpdateMeta::DocumentsAddition { method, format, encoding };
|
2020-10-20 18:09:38 +08:00
|
|
|
let update_id = update_store.register_update(&meta, &mmap[..]).unwrap();
|
2020-10-20 21:14:06 +08:00
|
|
|
let _ = update_status_sender.send(UpdateStatus::Pending { update_id, meta });
|
2020-10-20 18:09:38 +08:00
|
|
|
eprintln!("update {} registered", update_id);
|
2020-10-19 22:03:17 +08:00
|
|
|
|
|
|
|
Ok(warp::reply())
|
|
|
|
}
|
|
|
|
|
2020-11-01 00:48:24 +08:00
|
|
|
#[derive(Deserialize)]
|
|
|
|
struct QueryUpdate {
|
|
|
|
method: Option<String>,
|
|
|
|
}
|
|
|
|
|
2020-10-19 22:03:17 +08:00
|
|
|
let update_store_cloned = update_store.clone();
|
2020-10-20 17:19:34 +08:00
|
|
|
let update_status_sender_cloned = update_status_sender.clone();
|
2020-12-21 06:10:09 +08:00
|
|
|
let indexing_route = warp::filters::method::post()
|
2020-11-01 00:48:24 +08:00
|
|
|
.and(warp::path!("documents"))
|
2020-12-21 06:10:09 +08:00
|
|
|
.and(warp::header::header("content-type"))
|
|
|
|
.and(warp::header::optional::<String>("content-encoding"))
|
|
|
|
.and(warp::query::query())
|
2020-10-19 22:03:17 +08:00
|
|
|
.and(warp::body::stream())
|
2020-12-21 06:10:09 +08:00
|
|
|
.and_then(move |content_type: String, content_encoding, params: QueryUpdate, stream| {
|
|
|
|
let format = match content_type.as_str() {
|
|
|
|
"text/csv" => UpdateFormat::Csv,
|
|
|
|
"application/json" => UpdateFormat::Json,
|
|
|
|
"application/x-ndjson" => UpdateFormat::JsonStream,
|
|
|
|
otherwise => panic!("invalid update format: {}", otherwise),
|
|
|
|
};
|
2020-10-19 22:03:17 +08:00
|
|
|
|
2020-11-01 18:50:10 +08:00
|
|
|
buf_stream(
|
|
|
|
update_store_cloned.clone(),
|
|
|
|
update_status_sender_cloned.clone(),
|
|
|
|
params.method,
|
2020-12-21 06:10:09 +08:00
|
|
|
format,
|
|
|
|
content_encoding,
|
2020-11-01 18:50:10 +08:00
|
|
|
stream,
|
|
|
|
)
|
|
|
|
});
|
|
|
|
|
2020-11-02 22:30:29 +08:00
|
|
|
let update_store_cloned = update_store.clone();
|
2020-10-21 21:38:28 +08:00
|
|
|
let update_status_sender_cloned = update_status_sender.clone();
|
2020-10-30 20:12:55 +08:00
|
|
|
let clearing_route = warp::filters::method::post()
|
|
|
|
.and(warp::path!("clear-documents"))
|
|
|
|
.map(move || {
|
|
|
|
let meta = UpdateMeta::ClearDocuments;
|
2020-11-02 22:47:21 +08:00
|
|
|
let update_id = update_store_cloned.register_update(&meta, &[]).unwrap();
|
|
|
|
let _ = update_status_sender_cloned.send(UpdateStatus::Pending { update_id, meta });
|
|
|
|
eprintln!("update {} registered", update_id);
|
|
|
|
Ok(warp::reply())
|
|
|
|
});
|
|
|
|
|
|
|
|
let update_store_cloned = update_store.clone();
|
|
|
|
let update_status_sender_cloned = update_status_sender.clone();
|
|
|
|
let change_settings_route = warp::filters::method::post()
|
|
|
|
.and(warp::path!("settings"))
|
|
|
|
.and(warp::body::json())
|
|
|
|
.map(move |settings: Settings| {
|
|
|
|
let meta = UpdateMeta::Settings(settings);
|
|
|
|
let update_id = update_store_cloned.register_update(&meta, &[]).unwrap();
|
2020-10-21 21:38:28 +08:00
|
|
|
let _ = update_status_sender_cloned.send(UpdateStatus::Pending { update_id, meta });
|
|
|
|
eprintln!("update {} registered", update_id);
|
|
|
|
Ok(warp::reply())
|
|
|
|
});
|
|
|
|
|
2020-11-18 04:19:25 +08:00
|
|
|
let update_store_cloned = update_store.clone();
|
|
|
|
let update_status_sender_cloned = update_status_sender.clone();
|
|
|
|
let change_facet_levels_route = warp::filters::method::post()
|
2020-11-28 19:43:43 +08:00
|
|
|
.and(warp::path!("facet-level-sizes"))
|
2020-11-18 04:19:25 +08:00
|
|
|
.and(warp::body::json())
|
2020-11-23 20:08:57 +08:00
|
|
|
.map(move |levels: Facets| {
|
|
|
|
let meta = UpdateMeta::Facets(levels);
|
2020-11-18 04:19:25 +08:00
|
|
|
let update_id = update_store_cloned.register_update(&meta, &[]).unwrap();
|
|
|
|
let _ = update_status_sender_cloned.send(UpdateStatus::Pending { update_id, meta });
|
|
|
|
eprintln!("update {} registered", update_id);
|
|
|
|
warp::reply()
|
|
|
|
});
|
|
|
|
|
2020-11-29 19:23:52 +08:00
|
|
|
let update_store_cloned = update_store.clone();
|
|
|
|
let update_status_sender_cloned = update_status_sender.clone();
|
|
|
|
let abort_update_id_route = warp::filters::method::delete()
|
|
|
|
.and(warp::path!("update" / u64))
|
|
|
|
.map(move |update_id: u64| {
|
|
|
|
if let Some(meta) = update_store_cloned.abort_update(update_id).unwrap() {
|
|
|
|
let _ = update_status_sender_cloned.send(UpdateStatus::Aborted { update_id, meta });
|
|
|
|
eprintln!("update {} aborted", update_id);
|
|
|
|
}
|
|
|
|
warp::reply()
|
|
|
|
});
|
|
|
|
|
|
|
|
let update_store_cloned = update_store.clone();
|
|
|
|
let update_status_sender_cloned = update_status_sender.clone();
|
|
|
|
let abort_pending_updates_route = warp::filters::method::delete()
|
|
|
|
.and(warp::path!("updates"))
|
|
|
|
.map(move || {
|
|
|
|
let updates = update_store_cloned.abort_pendings().unwrap();
|
|
|
|
for (update_id, meta) in updates {
|
|
|
|
let _ = update_status_sender_cloned.send(UpdateStatus::Aborted { update_id, meta });
|
|
|
|
eprintln!("update {} aborted", update_id);
|
|
|
|
}
|
|
|
|
warp::reply()
|
|
|
|
});
|
|
|
|
|
2020-10-19 22:03:17 +08:00
|
|
|
let update_ws_route = warp::ws()
|
|
|
|
.and(warp::path!("updates" / "ws"))
|
|
|
|
.map(move |ws: warp::ws::Ws| {
|
|
|
|
// And then our closure will be called when it completes...
|
2020-10-20 17:19:34 +08:00
|
|
|
let update_status_receiver = update_status_sender.subscribe();
|
2020-10-19 22:03:17 +08:00
|
|
|
ws.on_upgrade(|websocket| {
|
|
|
|
// Just echo all updates messages...
|
2020-10-20 17:19:34 +08:00
|
|
|
update_status_receiver
|
|
|
|
.into_stream()
|
|
|
|
.flat_map(|result| {
|
2020-10-20 18:09:38 +08:00
|
|
|
match result {
|
|
|
|
Ok(status) => {
|
|
|
|
let msg = serde_json::to_string(&status).unwrap();
|
|
|
|
stream::iter(Some(Ok(Message::text(msg))))
|
|
|
|
},
|
2020-10-20 17:19:34 +08:00
|
|
|
Err(e) => {
|
|
|
|
eprintln!("channel error: {:?}", e);
|
|
|
|
stream::iter(None)
|
|
|
|
},
|
|
|
|
}
|
|
|
|
})
|
2020-10-19 22:03:17 +08:00
|
|
|
.forward(websocket)
|
|
|
|
.map(|result| {
|
|
|
|
if let Err(e) = result {
|
|
|
|
eprintln!("websocket error: {:?}", e);
|
|
|
|
}
|
|
|
|
})
|
|
|
|
})
|
|
|
|
});
|
|
|
|
|
2020-05-31 23:48:13 +08:00
|
|
|
let routes = dash_html_route
|
2020-10-20 01:57:15 +08:00
|
|
|
.or(updates_list_or_html_route)
|
2020-05-31 23:48:13 +08:00
|
|
|
.or(dash_bulma_route)
|
2020-07-14 05:51:41 +08:00
|
|
|
.or(dash_bulma_dark_route)
|
2020-07-11 17:48:27 +08:00
|
|
|
.or(dash_style_route)
|
2020-05-31 23:48:13 +08:00
|
|
|
.or(dash_jquery_route)
|
2020-07-11 20:17:37 +08:00
|
|
|
.or(dash_filesize_route)
|
2020-07-11 17:48:27 +08:00
|
|
|
.or(dash_script_route)
|
2020-10-20 01:57:15 +08:00
|
|
|
.or(updates_script_route)
|
2020-07-16 05:51:12 +08:00
|
|
|
.or(dash_logo_white_route)
|
|
|
|
.or(dash_logo_black_route)
|
2020-10-19 22:03:17 +08:00
|
|
|
.or(query_route)
|
2020-11-11 00:00:38 +08:00
|
|
|
.or(document_route)
|
2020-12-21 06:10:09 +08:00
|
|
|
.or(indexing_route)
|
2020-11-29 19:23:52 +08:00
|
|
|
.or(abort_update_id_route)
|
|
|
|
.or(abort_pending_updates_route)
|
2020-10-30 20:12:55 +08:00
|
|
|
.or(clearing_route)
|
2020-11-02 22:30:29 +08:00
|
|
|
.or(change_settings_route)
|
2020-11-18 04:19:25 +08:00
|
|
|
.or(change_facet_levels_route)
|
2020-10-20 01:57:15 +08:00
|
|
|
.or(update_ws_route);
|
2020-05-31 23:48:13 +08:00
|
|
|
|
2020-10-19 19:44:17 +08:00
|
|
|
let addr = SocketAddr::from_str(&opt.http_listen_addr)?;
|
2020-11-05 18:16:39 +08:00
|
|
|
Ok(warp::serve(routes).run(addr).await)
|
2020-05-31 23:48:13 +08:00
|
|
|
}
|