2020-08-05 19:52:27 +08:00
|
|
|
use std::collections::HashSet;
|
2020-10-19 22:03:17 +08:00
|
|
|
use std::fs::{File, create_dir_all};
|
2020-10-22 00:26:29 +08:00
|
|
|
use std::mem;
|
2020-05-31 23:48:13 +08:00
|
|
|
use std::net::SocketAddr;
|
|
|
|
use std::path::PathBuf;
|
|
|
|
use std::str::FromStr;
|
2020-10-19 22:03:17 +08:00
|
|
|
use std::sync::Arc;
|
2020-05-31 23:48:13 +08:00
|
|
|
use std::time::Instant;
|
|
|
|
|
2020-07-11 20:17:37 +08:00
|
|
|
use askama_warp::Template;
|
2020-10-20 17:19:34 +08:00
|
|
|
use futures::stream;
|
2020-10-22 00:26:29 +08:00
|
|
|
use futures::{FutureExt, StreamExt};
|
2020-05-31 23:48:13 +08:00
|
|
|
use heed::EnvOpenOptions;
|
2020-10-22 00:26:29 +08:00
|
|
|
use indexmap::IndexMap;
|
2020-10-20 18:09:38 +08:00
|
|
|
use serde::{Serialize, Deserialize};
|
2020-05-31 23:48:13 +08:00
|
|
|
use structopt::StructOpt;
|
2020-10-19 22:03:17 +08:00
|
|
|
use tokio::fs::File as TFile;
|
|
|
|
use tokio::io::AsyncWriteExt;
|
2020-10-20 17:19:34 +08:00
|
|
|
use tokio::sync::broadcast;
|
2020-10-19 22:03:17 +08:00
|
|
|
use warp::filters::ws::Message;
|
2020-05-31 23:48:13 +08:00
|
|
|
use warp::{Filter, http::Response};
|
|
|
|
|
2020-10-20 21:00:58 +08:00
|
|
|
use crate::indexing::{self, IndexerOpt};
|
2020-10-19 19:44:17 +08:00
|
|
|
use crate::tokenizer::{simple_tokenizer, TokenType};
|
2020-10-19 22:03:17 +08:00
|
|
|
use crate::{Index, UpdateStore, SearchResult};
|
2020-06-11 04:05:01 +08:00
|
|
|
|
2020-05-31 23:48:13 +08:00
|
|
|
#[derive(Debug, StructOpt)]
|
2020-10-19 19:44:17 +08:00
|
|
|
/// The HTTP main server of the milli project.
|
|
|
|
pub struct Opt {
|
2020-05-31 23:48:13 +08:00
|
|
|
/// The database path where the LMDB database is located.
|
|
|
|
/// It is created if it doesn't already exist.
|
|
|
|
#[structopt(long = "db", parse(from_os_str))]
|
|
|
|
database: PathBuf,
|
|
|
|
|
|
|
|
/// The maximum size the database can take on disk. It is recommended to specify
|
|
|
|
/// the whole disk space (value must be a multiple of a page size).
|
|
|
|
#[structopt(long = "db-size", default_value = "107374182400")] // 100 GB
|
|
|
|
database_size: usize,
|
|
|
|
|
2020-10-19 22:03:17 +08:00
|
|
|
/// The maximum size the database that stores the updates can take on disk. It is recommended
|
|
|
|
/// to specify the whole disk space (value must be a multiple of a page size).
|
|
|
|
#[structopt(long = "udb-size", default_value = "10737418240")] // 10 GB
|
|
|
|
update_database_size: usize,
|
|
|
|
|
2020-07-14 17:27:46 +08:00
|
|
|
/// Disable document highlighting on the dashboard.
|
|
|
|
#[structopt(long)]
|
|
|
|
disable_highlighting: bool,
|
|
|
|
|
2020-07-12 17:04:35 +08:00
|
|
|
/// Verbose mode (-v, -vv, -vvv, etc.)
|
|
|
|
#[structopt(short, long, parse(from_occurrences))]
|
|
|
|
verbose: usize,
|
|
|
|
|
2020-05-31 23:48:13 +08:00
|
|
|
/// The ip and port on which the database will listen for HTTP requests.
|
|
|
|
#[structopt(short = "l", long, default_value = "127.0.0.1:9700")]
|
|
|
|
http_listen_addr: String,
|
2020-10-20 20:20:17 +08:00
|
|
|
|
|
|
|
#[structopt(flatten)]
|
|
|
|
indexer: IndexerOpt,
|
2020-05-31 23:48:13 +08:00
|
|
|
}
|
|
|
|
|
2020-10-22 00:26:29 +08:00
|
|
|
fn highlight_record(record: &mut IndexMap<String, String>, words: &HashSet<String>) {
|
|
|
|
for (_key, value) in record.iter_mut() {
|
|
|
|
let old_value = mem::take(value);
|
|
|
|
for (token_type, token) in simple_tokenizer(&old_value) {
|
2020-08-31 20:20:42 +08:00
|
|
|
if token_type == TokenType::Word {
|
|
|
|
let lowercase_token = token.to_lowercase();
|
|
|
|
let to_highlight = words.contains(&lowercase_token);
|
2020-10-22 00:26:29 +08:00
|
|
|
if to_highlight { value.push_str("<mark>") }
|
|
|
|
value.push_str(token);
|
|
|
|
if to_highlight { value.push_str("</mark>") }
|
2020-08-31 20:20:42 +08:00
|
|
|
} else {
|
2020-10-22 00:26:29 +08:00
|
|
|
value.push_str(token);
|
2020-08-31 20:20:42 +08:00
|
|
|
}
|
2020-08-31 03:50:30 +08:00
|
|
|
}
|
2020-08-05 19:52:27 +08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-07-11 20:17:37 +08:00
|
|
|
#[derive(Template)]
|
|
|
|
#[template(path = "index.html")]
|
|
|
|
struct IndexTemplate {
|
|
|
|
db_name: String,
|
|
|
|
db_size: usize,
|
|
|
|
docs_count: usize,
|
|
|
|
}
|
|
|
|
|
2020-10-20 01:57:15 +08:00
|
|
|
#[derive(Template)]
|
|
|
|
#[template(path = "updates.html")]
|
2020-10-21 21:38:28 +08:00
|
|
|
struct UpdatesTemplate<M: Serialize + Send, P: Serialize + Send, N: Serialize + Send> {
|
2020-10-20 01:57:15 +08:00
|
|
|
db_name: String,
|
2020-10-20 18:09:38 +08:00
|
|
|
db_size: usize,
|
|
|
|
docs_count: usize,
|
2020-10-21 21:38:28 +08:00
|
|
|
updates: Vec<UpdateStatus<M, P, N>>,
|
2020-10-20 18:09:38 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
#[derive(Debug, Clone, Serialize)]
|
|
|
|
#[serde(tag = "type")]
|
2020-10-21 21:38:28 +08:00
|
|
|
enum UpdateStatus<M, P, N> {
|
2020-10-20 18:09:38 +08:00
|
|
|
Pending { update_id: u64, meta: M },
|
2020-10-21 21:38:28 +08:00
|
|
|
Progressing { update_id: u64, meta: P },
|
|
|
|
Processed { update_id: u64, meta: N },
|
|
|
|
}
|
|
|
|
|
|
|
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
|
|
|
#[serde(tag = "type")]
|
|
|
|
enum UpdateMeta {
|
|
|
|
DocumentsAddition {
|
|
|
|
total_number_of_documents: Option<usize>,
|
|
|
|
},
|
|
|
|
DocumentsAdditionFromPath {
|
|
|
|
path: PathBuf,
|
|
|
|
},
|
|
|
|
}
|
|
|
|
|
|
|
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
|
|
|
#[serde(tag = "type")]
|
|
|
|
enum UpdateMetaProgress {
|
|
|
|
DocumentsAddition {
|
|
|
|
processed_number_of_documents: usize,
|
|
|
|
total_number_of_documents: Option<usize>,
|
|
|
|
},
|
2020-10-20 01:57:15 +08:00
|
|
|
}
|
|
|
|
|
2020-10-19 19:44:17 +08:00
|
|
|
pub fn run(opt: Opt) -> anyhow::Result<()> {
|
2020-07-12 17:04:35 +08:00
|
|
|
stderrlog::new()
|
|
|
|
.verbosity(opt.verbose)
|
|
|
|
.show_level(false)
|
|
|
|
.timestamp(stderrlog::Timestamp::Off)
|
|
|
|
.init()?;
|
|
|
|
|
2020-10-20 21:00:58 +08:00
|
|
|
create_dir_all(&opt.database)?;
|
2020-05-31 23:48:13 +08:00
|
|
|
let env = EnvOpenOptions::new()
|
|
|
|
.map_size(opt.database_size)
|
|
|
|
.max_dbs(10)
|
|
|
|
.open(&opt.database)?;
|
|
|
|
|
2020-08-07 19:11:31 +08:00
|
|
|
// Open the LMDB database.
|
2020-08-28 21:38:05 +08:00
|
|
|
let index = Index::new(&env)?;
|
2020-08-07 19:11:31 +08:00
|
|
|
|
2020-10-19 22:03:17 +08:00
|
|
|
// Setup the LMDB based update database.
|
|
|
|
let mut update_store_options = EnvOpenOptions::new();
|
|
|
|
update_store_options.map_size(opt.update_database_size);
|
|
|
|
|
|
|
|
let update_store_path = opt.database.join("updates.mdb");
|
|
|
|
create_dir_all(&update_store_path)?;
|
|
|
|
|
2020-10-20 17:19:34 +08:00
|
|
|
let (update_status_sender, _) = broadcast::channel(100);
|
2020-10-19 22:03:17 +08:00
|
|
|
let update_status_sender_cloned = update_status_sender.clone();
|
2020-10-20 21:00:58 +08:00
|
|
|
let env_cloned = env.clone();
|
|
|
|
let index_cloned = index.clone();
|
|
|
|
let indexer_opt_cloned = opt.indexer.clone();
|
2020-10-19 22:03:17 +08:00
|
|
|
let update_store = UpdateStore::open(
|
|
|
|
update_store_options,
|
|
|
|
update_store_path,
|
2020-10-21 21:38:28 +08:00
|
|
|
move |update_id, meta, content| {
|
|
|
|
let result = match meta {
|
|
|
|
UpdateMeta::DocumentsAddition { total_number_of_documents } => {
|
|
|
|
let gzipped = false;
|
|
|
|
indexing::run(
|
|
|
|
&env_cloned,
|
|
|
|
&index_cloned,
|
|
|
|
&indexer_opt_cloned,
|
|
|
|
content,
|
|
|
|
gzipped,
|
|
|
|
|count| {
|
|
|
|
// We send progress status...
|
|
|
|
let meta = UpdateMetaProgress::DocumentsAddition {
|
|
|
|
processed_number_of_documents: count as usize,
|
|
|
|
total_number_of_documents,
|
|
|
|
};
|
|
|
|
let progress = UpdateStatus::Progressing { update_id, meta };
|
|
|
|
let _ = update_status_sender_cloned.send(progress);
|
|
|
|
},
|
|
|
|
)
|
|
|
|
},
|
|
|
|
UpdateMeta::DocumentsAdditionFromPath { path } => {
|
|
|
|
let file = match File::open(&path) {
|
|
|
|
Ok(file) => file,
|
|
|
|
Err(e) => {
|
|
|
|
let meta = format!("documents addition file ({}) error: {}", path.display(), e);
|
|
|
|
return Ok(meta);
|
|
|
|
}
|
|
|
|
};
|
|
|
|
let content = match unsafe { memmap::Mmap::map(&file) } {
|
|
|
|
Ok(mmap) => mmap,
|
|
|
|
Err(e) => {
|
|
|
|
let meta = format!("documents addition file ({}) mmap error: {}", path.display(), e);
|
|
|
|
return Ok(meta);
|
|
|
|
},
|
|
|
|
};
|
|
|
|
|
|
|
|
let gzipped = path.extension().map_or(false, |e| e == "gz" || e == "gzip");
|
|
|
|
indexing::run(
|
|
|
|
&env_cloned,
|
|
|
|
&index_cloned,
|
|
|
|
&indexer_opt_cloned,
|
|
|
|
&content,
|
|
|
|
gzipped,
|
|
|
|
|count| {
|
|
|
|
// We send progress status...
|
|
|
|
let meta = UpdateMetaProgress::DocumentsAddition {
|
|
|
|
processed_number_of_documents: count as usize,
|
|
|
|
total_number_of_documents: None,
|
|
|
|
};
|
|
|
|
let progress = UpdateStatus::Progressing { update_id, meta };
|
|
|
|
let _ = update_status_sender_cloned.send(progress);
|
|
|
|
},
|
|
|
|
)
|
|
|
|
}
|
|
|
|
};
|
2020-10-20 21:00:58 +08:00
|
|
|
|
|
|
|
let meta = match result {
|
|
|
|
Ok(()) => format!("valid update content"),
|
|
|
|
Err(e) => {
|
|
|
|
format!("error while processing update content: {}", e)
|
|
|
|
}
|
|
|
|
};
|
2020-10-20 18:28:10 +08:00
|
|
|
|
2020-10-20 18:09:38 +08:00
|
|
|
let processed = UpdateStatus::Processed { update_id, meta: meta.clone() };
|
|
|
|
let _ = update_status_sender_cloned.send(processed);
|
2020-10-20 21:00:58 +08:00
|
|
|
|
2020-10-19 22:03:17 +08:00
|
|
|
Ok(meta)
|
|
|
|
})?;
|
|
|
|
|
2020-10-20 21:00:58 +08:00
|
|
|
// The database name will not change.
|
2020-07-11 20:17:37 +08:00
|
|
|
let db_name = opt.database.file_stem().and_then(|s| s.to_str()).unwrap_or("").to_string();
|
2020-10-20 21:00:58 +08:00
|
|
|
let lmdb_path = opt.database.join("data.mdb");
|
2020-07-11 20:17:37 +08:00
|
|
|
|
2020-05-31 23:48:13 +08:00
|
|
|
// We run and wait on the HTTP server
|
|
|
|
|
|
|
|
// Expose an HTML page to debug the search in a browser
|
2020-10-20 01:57:15 +08:00
|
|
|
let db_name_cloned = db_name.clone();
|
2020-10-20 21:00:58 +08:00
|
|
|
let lmdb_path_cloned = lmdb_path.clone();
|
|
|
|
let env_cloned = env.clone();
|
|
|
|
let index_cloned = index.clone();
|
2020-05-31 23:48:13 +08:00
|
|
|
let dash_html_route = warp::filters::method::get()
|
|
|
|
.and(warp::filters::path::end())
|
2020-10-20 21:00:58 +08:00
|
|
|
.map(move || {
|
|
|
|
// We retrieve the database size.
|
|
|
|
let db_size = File::open(lmdb_path_cloned.clone())
|
|
|
|
.unwrap()
|
|
|
|
.metadata()
|
|
|
|
.unwrap()
|
|
|
|
.len() as usize;
|
|
|
|
|
|
|
|
// And the number of documents in the database.
|
|
|
|
let rtxn = env_cloned.clone().read_txn().unwrap();
|
|
|
|
let docs_count = index_cloned.clone().number_of_documents(&rtxn).unwrap() as usize;
|
|
|
|
|
|
|
|
IndexTemplate { db_name: db_name_cloned.clone(), db_size, docs_count }
|
|
|
|
});
|
2020-10-20 01:57:15 +08:00
|
|
|
|
|
|
|
let update_store_cloned = update_store.clone();
|
2020-10-20 21:00:58 +08:00
|
|
|
let lmdb_path_cloned = lmdb_path.clone();
|
|
|
|
let env_cloned = env.clone();
|
|
|
|
let index_cloned = index.clone();
|
2020-10-20 01:57:15 +08:00
|
|
|
let updates_list_or_html_route = warp::filters::method::get()
|
|
|
|
.and(warp::header("Accept"))
|
|
|
|
.and(warp::path!("updates"))
|
|
|
|
.map(move |header: String| {
|
|
|
|
let update_store = update_store_cloned.clone();
|
|
|
|
let mut updates = update_store.iter_metas(|processed, pending| {
|
2020-10-21 21:38:28 +08:00
|
|
|
let mut updates = Vec::<UpdateStatus<_, UpdateMetaProgress, _>>::new();
|
2020-10-20 01:57:15 +08:00
|
|
|
for result in processed {
|
2020-10-20 18:09:38 +08:00
|
|
|
let (uid, meta) = result?;
|
|
|
|
updates.push(UpdateStatus::Processed { update_id: uid.get(), meta });
|
2020-10-20 01:57:15 +08:00
|
|
|
}
|
|
|
|
for result in pending {
|
2020-10-20 18:09:38 +08:00
|
|
|
let (uid, meta) = result?;
|
|
|
|
updates.push(UpdateStatus::Pending { update_id: uid.get(), meta });
|
2020-10-20 01:57:15 +08:00
|
|
|
}
|
|
|
|
Ok(updates)
|
|
|
|
}).unwrap();
|
|
|
|
|
|
|
|
if header.contains("text/html") {
|
|
|
|
updates.reverse();
|
2020-10-20 21:00:58 +08:00
|
|
|
|
|
|
|
// We retrieve the database size.
|
|
|
|
let db_size = File::open(lmdb_path_cloned.clone())
|
|
|
|
.unwrap()
|
|
|
|
.metadata()
|
|
|
|
.unwrap()
|
|
|
|
.len() as usize;
|
|
|
|
|
|
|
|
// And the number of documents in the database.
|
|
|
|
let rtxn = env_cloned.clone().read_txn().unwrap();
|
|
|
|
let docs_count = index_cloned.clone().number_of_documents(&rtxn).unwrap() as usize;
|
|
|
|
|
2020-10-20 18:09:38 +08:00
|
|
|
let template = UpdatesTemplate {
|
|
|
|
db_name: db_name.clone(),
|
|
|
|
db_size,
|
|
|
|
docs_count,
|
|
|
|
updates,
|
|
|
|
};
|
2020-10-20 01:57:15 +08:00
|
|
|
Box::new(template) as Box<dyn warp::Reply>
|
|
|
|
} else {
|
|
|
|
Box::new(warp::reply::json(&updates))
|
|
|
|
}
|
|
|
|
});
|
2020-05-31 23:48:13 +08:00
|
|
|
|
|
|
|
let dash_bulma_route = warp::filters::method::get()
|
|
|
|
.and(warp::path!("bulma.min.css"))
|
|
|
|
.map(|| Response::builder()
|
|
|
|
.header("content-type", "text/css; charset=utf-8")
|
|
|
|
.body(include_str!("../../public/bulma.min.css"))
|
|
|
|
);
|
|
|
|
|
2020-07-14 05:51:41 +08:00
|
|
|
let dash_bulma_dark_route = warp::filters::method::get()
|
|
|
|
.and(warp::path!("bulma-prefers-dark.min.css"))
|
|
|
|
.map(|| Response::builder()
|
|
|
|
.header("content-type", "text/css; charset=utf-8")
|
|
|
|
.body(include_str!("../../public/bulma-prefers-dark.min.css"))
|
|
|
|
);
|
|
|
|
|
2020-07-11 17:48:27 +08:00
|
|
|
let dash_style_route = warp::filters::method::get()
|
|
|
|
.and(warp::path!("style.css"))
|
|
|
|
.map(|| Response::builder()
|
|
|
|
.header("content-type", "text/css; charset=utf-8")
|
|
|
|
.body(include_str!("../../public/style.css"))
|
|
|
|
);
|
|
|
|
|
2020-05-31 23:48:13 +08:00
|
|
|
let dash_jquery_route = warp::filters::method::get()
|
|
|
|
.and(warp::path!("jquery-3.4.1.min.js"))
|
|
|
|
.map(|| Response::builder()
|
|
|
|
.header("content-type", "application/javascript; charset=utf-8")
|
|
|
|
.body(include_str!("../../public/jquery-3.4.1.min.js"))
|
|
|
|
);
|
|
|
|
|
2020-07-11 20:17:37 +08:00
|
|
|
let dash_filesize_route = warp::filters::method::get()
|
|
|
|
.and(warp::path!("filesize.min.js"))
|
|
|
|
.map(|| Response::builder()
|
|
|
|
.header("content-type", "application/javascript; charset=utf-8")
|
|
|
|
.body(include_str!("../../public/filesize.min.js"))
|
|
|
|
);
|
|
|
|
|
2020-07-11 17:48:27 +08:00
|
|
|
let dash_script_route = warp::filters::method::get()
|
|
|
|
.and(warp::path!("script.js"))
|
|
|
|
.map(|| Response::builder()
|
|
|
|
.header("content-type", "application/javascript; charset=utf-8")
|
|
|
|
.body(include_str!("../../public/script.js"))
|
|
|
|
);
|
|
|
|
|
2020-10-20 01:57:15 +08:00
|
|
|
let updates_script_route = warp::filters::method::get()
|
|
|
|
.and(warp::path!("updates-script.js"))
|
|
|
|
.map(|| Response::builder()
|
|
|
|
.header("content-type", "application/javascript; charset=utf-8")
|
|
|
|
.body(include_str!("../../public/updates-script.js"))
|
|
|
|
);
|
|
|
|
|
2020-07-16 05:51:12 +08:00
|
|
|
let dash_logo_white_route = warp::filters::method::get()
|
|
|
|
.and(warp::path!("logo-white.svg"))
|
|
|
|
.map(|| Response::builder()
|
|
|
|
.header("content-type", "image/svg+xml")
|
|
|
|
.body(include_str!("../../public/logo-white.svg"))
|
|
|
|
);
|
|
|
|
|
|
|
|
let dash_logo_black_route = warp::filters::method::get()
|
|
|
|
.and(warp::path!("logo-black.svg"))
|
|
|
|
.map(|| Response::builder()
|
|
|
|
.header("content-type", "image/svg+xml")
|
|
|
|
.body(include_str!("../../public/logo-black.svg"))
|
|
|
|
);
|
|
|
|
|
2020-05-31 23:48:13 +08:00
|
|
|
#[derive(Deserialize)]
|
|
|
|
struct QueryBody {
|
2020-10-06 20:52:05 +08:00
|
|
|
query: Option<String>,
|
2020-05-31 23:48:13 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
let env_cloned = env.clone();
|
2020-07-14 17:27:46 +08:00
|
|
|
let disable_highlighting = opt.disable_highlighting;
|
2020-05-31 23:48:13 +08:00
|
|
|
let query_route = warp::filters::method::post()
|
|
|
|
.and(warp::path!("query"))
|
|
|
|
.and(warp::body::json())
|
|
|
|
.map(move |query: QueryBody| {
|
|
|
|
let before_search = Instant::now();
|
|
|
|
let rtxn = env_cloned.read_txn().unwrap();
|
|
|
|
|
2020-10-06 20:52:05 +08:00
|
|
|
let mut search = index.search(&rtxn);
|
|
|
|
if let Some(query) = query.query {
|
|
|
|
search.query(query);
|
|
|
|
}
|
|
|
|
|
|
|
|
let SearchResult { found_words, documents_ids } = search.execute().unwrap();
|
2020-05-31 23:48:13 +08:00
|
|
|
|
2020-10-22 00:26:29 +08:00
|
|
|
let mut documents = Vec::new();
|
|
|
|
if let Some(headers) = index.headers(&rtxn).unwrap() {
|
|
|
|
for (_id, record) in index.documents(&rtxn, documents_ids).unwrap() {
|
|
|
|
let mut record = record.iter()
|
|
|
|
.map(|(key_id, value)| {
|
|
|
|
let key = headers[key_id as usize].to_owned();
|
|
|
|
let value = std::str::from_utf8(value).unwrap().to_owned();
|
|
|
|
(key, value)
|
|
|
|
})
|
|
|
|
.collect();
|
|
|
|
|
|
|
|
if !disable_highlighting {
|
|
|
|
highlight_record(&mut record, &found_words);
|
2020-08-31 20:20:42 +08:00
|
|
|
}
|
|
|
|
|
2020-10-22 00:26:29 +08:00
|
|
|
documents.push(record);
|
|
|
|
}
|
|
|
|
}
|
2020-05-31 23:48:13 +08:00
|
|
|
|
|
|
|
Response::builder()
|
2020-10-22 00:26:29 +08:00
|
|
|
.header("Content-Type", "application/json")
|
2020-05-31 23:48:13 +08:00
|
|
|
.header("Time-Ms", before_search.elapsed().as_millis().to_string())
|
2020-10-22 00:26:29 +08:00
|
|
|
.body(serde_json::to_string(&documents).unwrap())
|
2020-05-31 23:48:13 +08:00
|
|
|
});
|
|
|
|
|
2020-10-19 22:03:17 +08:00
|
|
|
async fn buf_stream(
|
2020-10-21 21:38:28 +08:00
|
|
|
update_store: Arc<UpdateStore<UpdateMeta, String>>,
|
|
|
|
update_status_sender: broadcast::Sender<UpdateStatus<UpdateMeta, UpdateMetaProgress, String>>,
|
2020-10-19 22:03:17 +08:00
|
|
|
mut stream: impl futures::Stream<Item=Result<impl bytes::Buf, warp::Error>> + Unpin,
|
|
|
|
) -> Result<impl warp::Reply, warp::Rejection>
|
|
|
|
{
|
|
|
|
let file = tokio::task::block_in_place(tempfile::tempfile).unwrap();
|
|
|
|
let mut file = TFile::from_std(file);
|
|
|
|
|
|
|
|
while let Some(result) = stream.next().await {
|
|
|
|
let bytes = result.unwrap().to_bytes();
|
|
|
|
file.write_all(&bytes[..]).await.unwrap();
|
|
|
|
}
|
|
|
|
|
|
|
|
let file = file.into_std().await;
|
|
|
|
let mmap = unsafe { memmap::Mmap::map(&file).unwrap() };
|
|
|
|
|
2020-10-21 21:38:28 +08:00
|
|
|
let meta = UpdateMeta::DocumentsAddition { total_number_of_documents: None };
|
2020-10-20 18:09:38 +08:00
|
|
|
let update_id = update_store.register_update(&meta, &mmap[..]).unwrap();
|
2020-10-20 21:14:06 +08:00
|
|
|
let _ = update_status_sender.send(UpdateStatus::Pending { update_id, meta });
|
2020-10-20 18:09:38 +08:00
|
|
|
eprintln!("update {} registered", update_id);
|
2020-10-19 22:03:17 +08:00
|
|
|
|
|
|
|
Ok(warp::reply())
|
|
|
|
}
|
|
|
|
|
|
|
|
let update_store_cloned = update_store.clone();
|
2020-10-20 17:19:34 +08:00
|
|
|
let update_status_sender_cloned = update_status_sender.clone();
|
2020-10-21 21:38:28 +08:00
|
|
|
let indexing_route_csv = warp::filters::method::post()
|
2020-10-19 22:03:17 +08:00
|
|
|
.and(warp::path!("documents"))
|
2020-10-21 21:38:28 +08:00
|
|
|
.and(warp::header::exact_ignore_case("content-type", "text/csv"))
|
2020-10-19 22:03:17 +08:00
|
|
|
.and(warp::body::stream())
|
|
|
|
.and_then(move |stream| {
|
2020-10-20 17:19:34 +08:00
|
|
|
buf_stream(update_store_cloned.clone(), update_status_sender_cloned.clone(), stream)
|
2020-10-19 22:03:17 +08:00
|
|
|
});
|
|
|
|
|
2020-10-21 21:38:28 +08:00
|
|
|
let update_store_cloned = update_store.clone();
|
|
|
|
let update_status_sender_cloned = update_status_sender.clone();
|
|
|
|
let indexing_route_filepath = warp::filters::method::post()
|
|
|
|
.and(warp::path!("documents"))
|
|
|
|
.and(warp::header::exact_ignore_case("content-type", "text/x-filepath"))
|
|
|
|
.and(warp::body::bytes())
|
|
|
|
.map(move |bytes: bytes::Bytes| {
|
|
|
|
let string = std::str::from_utf8(&bytes).unwrap().trim();
|
|
|
|
let meta = UpdateMeta::DocumentsAdditionFromPath { path: PathBuf::from(string) };
|
|
|
|
let update_id = update_store_cloned.register_update(&meta, &[]).unwrap();
|
|
|
|
let _ = update_status_sender_cloned.send(UpdateStatus::Pending { update_id, meta });
|
|
|
|
eprintln!("update {} registered", update_id);
|
|
|
|
Ok(warp::reply())
|
|
|
|
});
|
|
|
|
|
2020-10-19 22:03:17 +08:00
|
|
|
let update_ws_route = warp::ws()
|
|
|
|
.and(warp::path!("updates" / "ws"))
|
|
|
|
.map(move |ws: warp::ws::Ws| {
|
|
|
|
// And then our closure will be called when it completes...
|
2020-10-20 17:19:34 +08:00
|
|
|
let update_status_receiver = update_status_sender.subscribe();
|
2020-10-19 22:03:17 +08:00
|
|
|
ws.on_upgrade(|websocket| {
|
|
|
|
// Just echo all updates messages...
|
2020-10-20 17:19:34 +08:00
|
|
|
update_status_receiver
|
|
|
|
.into_stream()
|
|
|
|
.flat_map(|result| {
|
2020-10-20 18:09:38 +08:00
|
|
|
match result {
|
|
|
|
Ok(status) => {
|
|
|
|
let msg = serde_json::to_string(&status).unwrap();
|
|
|
|
stream::iter(Some(Ok(Message::text(msg))))
|
|
|
|
},
|
2020-10-20 17:19:34 +08:00
|
|
|
Err(e) => {
|
|
|
|
eprintln!("channel error: {:?}", e);
|
|
|
|
stream::iter(None)
|
|
|
|
},
|
|
|
|
}
|
|
|
|
})
|
2020-10-19 22:03:17 +08:00
|
|
|
.forward(websocket)
|
|
|
|
.map(|result| {
|
|
|
|
if let Err(e) = result {
|
|
|
|
eprintln!("websocket error: {:?}", e);
|
|
|
|
}
|
|
|
|
})
|
|
|
|
})
|
|
|
|
});
|
|
|
|
|
2020-05-31 23:48:13 +08:00
|
|
|
let routes = dash_html_route
|
2020-10-20 01:57:15 +08:00
|
|
|
.or(updates_list_or_html_route)
|
2020-05-31 23:48:13 +08:00
|
|
|
.or(dash_bulma_route)
|
2020-07-14 05:51:41 +08:00
|
|
|
.or(dash_bulma_dark_route)
|
2020-07-11 17:48:27 +08:00
|
|
|
.or(dash_style_route)
|
2020-05-31 23:48:13 +08:00
|
|
|
.or(dash_jquery_route)
|
2020-07-11 20:17:37 +08:00
|
|
|
.or(dash_filesize_route)
|
2020-07-11 17:48:27 +08:00
|
|
|
.or(dash_script_route)
|
2020-10-20 01:57:15 +08:00
|
|
|
.or(updates_script_route)
|
2020-07-16 05:51:12 +08:00
|
|
|
.or(dash_logo_white_route)
|
|
|
|
.or(dash_logo_black_route)
|
2020-10-19 22:03:17 +08:00
|
|
|
.or(query_route)
|
2020-10-21 21:38:28 +08:00
|
|
|
.or(indexing_route_csv)
|
|
|
|
.or(indexing_route_filepath)
|
2020-10-20 01:57:15 +08:00
|
|
|
.or(update_ws_route);
|
2020-05-31 23:48:13 +08:00
|
|
|
|
2020-10-19 19:44:17 +08:00
|
|
|
let addr = SocketAddr::from_str(&opt.http_listen_addr)?;
|
2020-10-19 22:03:17 +08:00
|
|
|
tokio::runtime::Builder::new()
|
|
|
|
.threaded_scheduler()
|
2020-10-19 19:44:17 +08:00
|
|
|
.enable_all()
|
|
|
|
.build()?
|
|
|
|
.block_on(async {
|
|
|
|
warp::serve(routes).run(addr).await
|
|
|
|
});
|
2020-05-31 23:48:13 +08:00
|
|
|
|
|
|
|
Ok(())
|
|
|
|
}
|