use std::collections::HashSet; use std::fs::{File, create_dir_all}; use std::{mem, io}; use std::net::SocketAddr; use std::path::PathBuf; use std::str::FromStr; use std::sync::Arc; use std::time::Instant; use askama_warp::Template; use flate2::read::GzDecoder; use futures::stream; use futures::{FutureExt, StreamExt}; use grenad::CompressionType; use heed::EnvOpenOptions; use indexmap::IndexMap; use serde::{Serialize, Deserialize}; use structopt::StructOpt; use tokio::fs::File as TFile; use tokio::io::AsyncWriteExt; use tokio::sync::broadcast; use warp::filters::ws::Message; use warp::{Filter, http::Response}; use crate::tokenizer::{simple_tokenizer, TokenType}; use crate::update::{UpdateBuilder, IndexDocumentsMethod, UpdateFormat}; use crate::{Index, UpdateStore, SearchResult}; #[derive(Debug, StructOpt)] /// The HTTP main server of the milli project. pub struct Opt { /// The database path where the LMDB database is located. /// It is created if it doesn't already exist. #[structopt(long = "db", parse(from_os_str))] database: PathBuf, /// The maximum size the database can take on disk. It is recommended to specify /// the whole disk space (value must be a multiple of a page size). #[structopt(long = "db-size", default_value = "107374182400")] // 100 GB database_size: usize, /// The maximum size the database that stores the updates can take on disk. It is recommended /// to specify the whole disk space (value must be a multiple of a page size). #[structopt(long = "udb-size", default_value = "10737418240")] // 10 GB update_database_size: usize, /// Disable document highlighting on the dashboard. #[structopt(long)] disable_highlighting: bool, /// Verbose mode (-v, -vv, -vvv, etc.) #[structopt(short, long, parse(from_occurrences))] verbose: usize, /// The ip and port on which the database will listen for HTTP requests. #[structopt(short = "l", long, default_value = "127.0.0.1:9700")] http_listen_addr: String, #[structopt(flatten)] indexer: IndexerOpt, } #[derive(Debug, Clone, StructOpt)] pub struct IndexerOpt { /// The amount of documents to skip before printing /// a log regarding the indexing advancement. #[structopt(long, default_value = "1000000")] // 1m pub log_every_n: usize, /// MTBL max number of chunks in bytes. #[structopt(long)] pub max_nb_chunks: Option, /// The maximum amount of memory to use for the MTBL buffer. It is recommended /// to use something like 80%-90% of the available memory. /// /// It is automatically split by the number of jobs e.g. if you use 7 jobs /// and 7 GB of max memory, each thread will use a maximum of 1 GB. #[structopt(long, default_value = "7516192768")] // 7 GB pub max_memory: usize, /// Size of the linked hash map cache when indexing. /// The bigger it is, the faster the indexing is but the more memory it takes. #[structopt(long, default_value = "500")] pub linked_hash_map_size: usize, /// The name of the compression algorithm to use when compressing intermediate /// chunks during indexing documents. /// /// Choosing a fast algorithm will make the indexing faster but may consume more memory. #[structopt(long, default_value = "snappy", possible_values = &["snappy", "zlib", "lz4", "lz4hc", "zstd"])] pub chunk_compression_type: CompressionType, /// The level of compression of the chosen algorithm. #[structopt(long, requires = "chunk-compression-type")] pub chunk_compression_level: Option, /// The number of bytes to remove from the begining of the chunks while reading/sorting /// or merging them. /// /// File fusing must only be enable on file systems that support the `FALLOC_FL_COLLAPSE_RANGE`, /// (i.e. ext4 and XFS). File fusing will only work if the `enable-chunk-fusing` is set. #[structopt(long, default_value = "4294967296")] // 4 GB pub chunk_fusing_shrink_size: u64, /// Enable the chunk fusing or not, this reduces the amount of disk used by a factor of 2. #[structopt(long)] pub enable_chunk_fusing: bool, /// Number of parallel jobs for indexing, defaults to # of CPUs. #[structopt(long)] pub indexing_jobs: Option, } fn highlight_record(record: &mut IndexMap, words: &HashSet) { for (_key, value) in record.iter_mut() { let old_value = mem::take(value); for (token_type, token) in simple_tokenizer(&old_value) { if token_type == TokenType::Word { let lowercase_token = token.to_lowercase(); let to_highlight = words.contains(&lowercase_token); if to_highlight { value.push_str("") } value.push_str(token); if to_highlight { value.push_str("") } } else { value.push_str(token); } } } } #[derive(Template)] #[template(path = "index.html")] struct IndexTemplate { db_name: String, db_size: usize, docs_count: usize, } #[derive(Template)] #[template(path = "updates.html")] struct UpdatesTemplate { db_name: String, db_size: usize, docs_count: usize, updates: Vec>, } #[derive(Debug, Clone, Serialize)] #[serde(tag = "type")] enum UpdateStatus { Pending { update_id: u64, meta: M }, Progressing { update_id: u64, meta: P }, Processed { update_id: u64, meta: N }, } #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(tag = "type")] enum UpdateMeta { DocumentsAddition { method: String, format: String }, ClearDocuments, } #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(tag = "type")] enum UpdateMetaProgress { DocumentsAddition { processed_number_of_documents: usize, total_number_of_documents: Option, }, } pub fn run(opt: Opt) -> anyhow::Result<()> { stderrlog::new() .verbosity(opt.verbose) .show_level(false) .timestamp(stderrlog::Timestamp::Off) .init()?; create_dir_all(&opt.database)?; let mut options = EnvOpenOptions::new(); options.map_size(opt.database_size); // Open the LMDB database. let index = Index::new(options, &opt.database)?; // Setup the LMDB based update database. let mut update_store_options = EnvOpenOptions::new(); update_store_options.map_size(opt.update_database_size); let update_store_path = opt.database.join("updates.mdb"); create_dir_all(&update_store_path)?; let (update_status_sender, _) = broadcast::channel(100); let update_status_sender_cloned = update_status_sender.clone(); let index_cloned = index.clone(); let indexer_opt_cloned = opt.indexer.clone(); let update_store = UpdateStore::open( update_store_options, update_store_path, move |update_id, meta, content| { // We prepare the update by using the update builder. let mut update_builder = UpdateBuilder::new(); if let Some(max_nb_chunks) = indexer_opt_cloned.max_nb_chunks { update_builder.max_nb_chunks(max_nb_chunks); } if let Some(chunk_compression_level) = indexer_opt_cloned.chunk_compression_level { update_builder.chunk_compression_level(chunk_compression_level); } if let Some(indexing_jobs) = indexer_opt_cloned.indexing_jobs { update_builder.indexing_jobs(indexing_jobs); } update_builder.log_every_n(indexer_opt_cloned.log_every_n); update_builder.max_memory(indexer_opt_cloned.max_memory); update_builder.linked_hash_map_size(indexer_opt_cloned.linked_hash_map_size); update_builder.chunk_compression_type(indexer_opt_cloned.chunk_compression_type); update_builder.chunk_fusing_shrink_size(indexer_opt_cloned.chunk_fusing_shrink_size); // we extract the update type and execute the update itself. let result: anyhow::Result<()> = match meta { UpdateMeta::DocumentsAddition { method, format } => { // We must use the write transaction of the update here. let mut wtxn = index_cloned.write_txn()?; let mut builder = update_builder.index_documents(&mut wtxn, &index_cloned); match format.as_str() { "json" => builder.update_format(UpdateFormat::Json), "csv" => builder.update_format(UpdateFormat::Csv), otherwise => panic!("invalid update format {:?}", otherwise), }; match method.as_str() { "replace" => builder.index_documents_method(IndexDocumentsMethod::ReplaceDocuments), "update" => builder.index_documents_method(IndexDocumentsMethod::UpdateDocuments), otherwise => panic!("invalid indexing method {:?}", otherwise), }; let gzipped = false; let reader = if gzipped { Box::new(GzDecoder::new(content)) } else { Box::new(content) as Box }; let result = builder.execute(reader, |count, total| { let _ = update_status_sender_cloned.send(UpdateStatus::Progressing { update_id, meta: UpdateMetaProgress::DocumentsAddition { processed_number_of_documents: count, total_number_of_documents: Some(total), } }); }); match result { Ok(()) => wtxn.commit().map_err(Into::into), Err(e) => Err(e.into()) } }, UpdateMeta::ClearDocuments => { // We must use the write transaction of the update here. let mut wtxn = index_cloned.write_txn()?; let builder = update_builder.clear_documents(&mut wtxn, &index_cloned); match builder.execute() { Ok(_count) => wtxn.commit().map_err(Into::into), Err(e) => Err(e.into()) } } }; let meta = match result { Ok(()) => format!("valid update content"), Err(e) => format!("error while processing update content: {:?}", e), }; let processed = UpdateStatus::Processed { update_id, meta: meta.clone() }; let _ = update_status_sender_cloned.send(processed); Ok(meta) })?; // The database name will not change. let db_name = opt.database.file_stem().and_then(|s| s.to_str()).unwrap_or("").to_string(); let lmdb_path = opt.database.join("data.mdb"); // We run and wait on the HTTP server // Expose an HTML page to debug the search in a browser let db_name_cloned = db_name.clone(); let lmdb_path_cloned = lmdb_path.clone(); let index_cloned = index.clone(); let dash_html_route = warp::filters::method::get() .and(warp::filters::path::end()) .map(move || { // We retrieve the database size. let db_size = File::open(lmdb_path_cloned.clone()) .unwrap() .metadata() .unwrap() .len() as usize; // And the number of documents in the database. let rtxn = index_cloned.read_txn().unwrap(); let docs_count = index_cloned.clone().number_of_documents(&rtxn).unwrap() as usize; IndexTemplate { db_name: db_name_cloned.clone(), db_size, docs_count } }); let update_store_cloned = update_store.clone(); let lmdb_path_cloned = lmdb_path.clone(); let index_cloned = index.clone(); let updates_list_or_html_route = warp::filters::method::get() .and(warp::header("Accept")) .and(warp::path!("updates")) .map(move |header: String| { let update_store = update_store_cloned.clone(); let mut updates = update_store.iter_metas(|processed, pending| { let mut updates = Vec::>::new(); for result in processed { let (uid, meta) = result?; updates.push(UpdateStatus::Processed { update_id: uid.get(), meta }); } for result in pending { let (uid, meta) = result?; updates.push(UpdateStatus::Pending { update_id: uid.get(), meta }); } Ok(updates) }).unwrap(); if header.contains("text/html") { updates.reverse(); // We retrieve the database size. let db_size = File::open(lmdb_path_cloned.clone()) .unwrap() .metadata() .unwrap() .len() as usize; // And the number of documents in the database. let rtxn = index_cloned.read_txn().unwrap(); let docs_count = index_cloned.clone().number_of_documents(&rtxn).unwrap() as usize; let template = UpdatesTemplate { db_name: db_name.clone(), db_size, docs_count, updates, }; Box::new(template) as Box } else { Box::new(warp::reply::json(&updates)) } }); let dash_bulma_route = warp::filters::method::get() .and(warp::path!("bulma.min.css")) .map(|| Response::builder() .header("content-type", "text/css; charset=utf-8") .body(include_str!("../../public/bulma.min.css")) ); let dash_bulma_dark_route = warp::filters::method::get() .and(warp::path!("bulma-prefers-dark.min.css")) .map(|| Response::builder() .header("content-type", "text/css; charset=utf-8") .body(include_str!("../../public/bulma-prefers-dark.min.css")) ); let dash_style_route = warp::filters::method::get() .and(warp::path!("style.css")) .map(|| Response::builder() .header("content-type", "text/css; charset=utf-8") .body(include_str!("../../public/style.css")) ); let dash_jquery_route = warp::filters::method::get() .and(warp::path!("jquery-3.4.1.min.js")) .map(|| Response::builder() .header("content-type", "application/javascript; charset=utf-8") .body(include_str!("../../public/jquery-3.4.1.min.js")) ); let dash_filesize_route = warp::filters::method::get() .and(warp::path!("filesize.min.js")) .map(|| Response::builder() .header("content-type", "application/javascript; charset=utf-8") .body(include_str!("../../public/filesize.min.js")) ); let dash_script_route = warp::filters::method::get() .and(warp::path!("script.js")) .map(|| Response::builder() .header("content-type", "application/javascript; charset=utf-8") .body(include_str!("../../public/script.js")) ); let updates_script_route = warp::filters::method::get() .and(warp::path!("updates-script.js")) .map(|| Response::builder() .header("content-type", "application/javascript; charset=utf-8") .body(include_str!("../../public/updates-script.js")) ); let dash_logo_white_route = warp::filters::method::get() .and(warp::path!("logo-white.svg")) .map(|| Response::builder() .header("content-type", "image/svg+xml") .body(include_str!("../../public/logo-white.svg")) ); let dash_logo_black_route = warp::filters::method::get() .and(warp::path!("logo-black.svg")) .map(|| Response::builder() .header("content-type", "image/svg+xml") .body(include_str!("../../public/logo-black.svg")) ); #[derive(Deserialize)] struct QueryBody { query: Option, } let disable_highlighting = opt.disable_highlighting; let query_route = warp::filters::method::post() .and(warp::path!("query")) .and(warp::body::json()) .map(move |query: QueryBody| { let before_search = Instant::now(); let rtxn = index.read_txn().unwrap(); let mut search = index.search(&rtxn); if let Some(query) = query.query { search.query(query); } let SearchResult { found_words, documents_ids } = search.execute().unwrap(); let mut documents = Vec::new(); let fields_ids_map = index.fields_ids_map(&rtxn).unwrap(); for (_id, record) in index.documents(&rtxn, documents_ids).unwrap() { let mut record = record.iter() .map(|(key_id, value)| { let key = fields_ids_map.name(key_id).unwrap().to_owned(); // TODO we must deserialize a Json Value and highlight it. let value = serde_json::from_slice(value).unwrap(); (key, value) }) .collect(); if !disable_highlighting { highlight_record(&mut record, &found_words); } documents.push(record); } Response::builder() .header("Content-Type", "application/json") .header("Time-Ms", before_search.elapsed().as_millis().to_string()) .body(serde_json::to_string(&documents).unwrap()) }); async fn buf_stream( update_store: Arc>, update_status_sender: broadcast::Sender>, update_method: Option, update_format: UpdateFormat, mut stream: impl futures::Stream> + Unpin, ) -> Result { let file = tokio::task::block_in_place(tempfile::tempfile).unwrap(); let mut file = TFile::from_std(file); while let Some(result) = stream.next().await { let bytes = result.unwrap().to_bytes(); file.write_all(&bytes[..]).await.unwrap(); } let file = file.into_std().await; let mmap = unsafe { memmap::Mmap::map(&file).unwrap() }; let method = match update_method.as_deref() { Some("replace") => String::from("replace"), Some("update") => String::from("update"), _ => String::from("replace"), }; let format = match update_format { UpdateFormat::Csv => String::from("csv"), UpdateFormat::Json => String::from("json"), }; let meta = UpdateMeta::DocumentsAddition { method, format }; let update_id = update_store.register_update(&meta, &mmap[..]).unwrap(); let _ = update_status_sender.send(UpdateStatus::Pending { update_id, meta }); eprintln!("update {} registered", update_id); Ok(warp::reply()) } #[derive(Deserialize)] struct QueryUpdate { method: Option, } let update_store_cloned = update_store.clone(); let update_status_sender_cloned = update_status_sender.clone(); let indexing_route_csv = warp::filters::method::post() .and(warp::path!("documents")) .and(warp::header::exact_ignore_case("content-type", "text/csv")) .and(warp::filters::query::query()) .and(warp::body::stream()) .and_then(move |params: QueryUpdate, stream| { buf_stream( update_store_cloned.clone(), update_status_sender_cloned.clone(), params.method, UpdateFormat::Csv, stream, ) }); let update_store_cloned = update_store.clone(); let update_status_sender_cloned = update_status_sender.clone(); let indexing_route_json = warp::filters::method::post() .and(warp::path!("documents")) .and(warp::header::exact_ignore_case("content-type", "application/json")) .and(warp::filters::query::query()) .and(warp::body::stream()) .and_then(move |params: QueryUpdate, stream| { buf_stream( update_store_cloned.clone(), update_status_sender_cloned.clone(), params.method, UpdateFormat::Json, stream, ) }); let update_status_sender_cloned = update_status_sender.clone(); let clearing_route = warp::filters::method::post() .and(warp::path!("clear-documents")) .map(move || { let meta = UpdateMeta::ClearDocuments; let update_id = update_store.register_update(&meta, &[]).unwrap(); let _ = update_status_sender_cloned.send(UpdateStatus::Pending { update_id, meta }); eprintln!("update {} registered", update_id); Ok(warp::reply()) }); let update_ws_route = warp::ws() .and(warp::path!("updates" / "ws")) .map(move |ws: warp::ws::Ws| { // And then our closure will be called when it completes... let update_status_receiver = update_status_sender.subscribe(); ws.on_upgrade(|websocket| { // Just echo all updates messages... update_status_receiver .into_stream() .flat_map(|result| { match result { Ok(status) => { let msg = serde_json::to_string(&status).unwrap(); stream::iter(Some(Ok(Message::text(msg)))) }, Err(e) => { eprintln!("channel error: {:?}", e); stream::iter(None) }, } }) .forward(websocket) .map(|result| { if let Err(e) = result { eprintln!("websocket error: {:?}", e); } }) }) }); let routes = dash_html_route .or(updates_list_or_html_route) .or(dash_bulma_route) .or(dash_bulma_dark_route) .or(dash_style_route) .or(dash_jquery_route) .or(dash_filesize_route) .or(dash_script_route) .or(updates_script_route) .or(dash_logo_white_route) .or(dash_logo_black_route) .or(query_route) .or(indexing_route_csv) .or(indexing_route_json) .or(clearing_route) .or(update_ws_route); let addr = SocketAddr::from_str(&opt.http_listen_addr)?; tokio::runtime::Builder::new() .threaded_scheduler() .enable_all() .build()? .block_on(async { warp::serve(routes).run(addr).await }); Ok(()) }