//! HTTP server and debug dashboard for a single milli index.
//!
//! `main` opens the LMDB index and an `UpdateStore`, then serves with warp:
//! - static dashboard assets (embedded with `include_str!`);
//! - `POST /query`: search with optional filters, sort, facet distribution and highlighting;
//! - `GET /document/{id}`: fetch one document by external id;
//! - endpoints that *enqueue* updates: documents, clear, settings and facet levels.
//!   Each update is later applied by the closure passed to `UpdateStore::open`.
//! - `GET /updates` (JSON or HTML) and a `/updates/ws` websocket.
//!   Both report update statuses, and the websocket relays them through a broadcast channel.
//!
//! NOTE(review): in this copy, every generic argument list that begins with a type name
//! appears to have been stripped. Examples: `OnceCell` on `GLOBAL_CONFIG`, the `Option`
//! fields, `Vec::>::new()`, `Box` casts. The highlight prefix/suffix literals in
//! `query_route` are also empty strings. As shown, the file does not compile;
//! restore it from upstream before making code changes.

mod update_store;

use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::fmt::Display;
use std::fs::{create_dir_all, File};
use std::io::{BufRead, BufReader, Cursor, Read};
use std::net::SocketAddr;
use std::num::{NonZeroU32, NonZeroUsize};
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Instant;
use std::{io, mem};

use askama_warp::Template;
use byte_unit::Byte;
use either::Either;
use flate2::read::GzDecoder;
use futures::{stream, FutureExt, StreamExt};
use heed::EnvOpenOptions;
use milli::documents::DocumentBatchReader;
use milli::tokenizer::TokenizerBuilder;
use milli::update::UpdateIndexingStep::*;
use milli::update::{
    ClearDocuments, IndexDocumentsConfig, IndexDocumentsMethod, IndexerConfig, Setting,
};
use milli::{
    obkv_to_json, CompressionType, Filter as MilliFilter, FilterCondition, FormatOptions, Index,
    MatcherBuilder, SearchResult, SortError,
};
use once_cell::sync::OnceCell;
use serde::{Deserialize, Serialize};
use serde_json::{Map, Value};
use structopt::StructOpt;
use tokio::fs::File as TFile;
use tokio::io::AsyncWriteExt;
use tokio::sync::broadcast;
use tokio_stream::wrappers::BroadcastStream;
use warp::filters::ws::Message;
use warp::http::Response;
use warp::Filter;

use self::update_store::UpdateStore;

// jemalloc is the global allocator on Linux builds only.
#[cfg(target_os = "linux")]
#[global_allocator]
static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc;

// Indexer configuration built from the CLI options. It is set exactly once in `main`
// and read by the update closure through `GLOBAL_CONFIG.get().unwrap()`.
static GLOBAL_CONFIG: OnceCell = OnceCell::new();

// Command-line options. With StructOpt, the `///` doc comments below become the `--help`
// text, so they are runtime output; explanatory notes here use plain `//` comments.
#[derive(Debug, StructOpt)]
/// The HTTP main server of the milli project.
pub struct Opt {
    /// The database path where the LMDB database is located.
    /// It is created if it doesn't already exist.
    #[structopt(long = "db", parse(from_os_str))]
    database: PathBuf,

    /// The maximum size the database can take on disk. It is recommended to specify
    /// the whole disk space (value must be a multiple of a page size).
    #[structopt(long = "db-size", default_value = "100 GiB")]
    database_size: Byte,

    /// The maximum size the database that stores the updates can take on disk. It is recommended
    /// to specify the whole disk space (value must be a multiple of a page size).
    #[structopt(long = "udb-size", default_value = "10 GiB")]
    update_database_size: Byte,

    /// Disable document highlighting on the dashboard.
    #[structopt(long)]
    disable_highlighting: bool,

    /// Verbose mode (-v, -vv, -vvv, etc.)
    #[structopt(short, long, parse(from_occurrences))]
    verbose: usize,

    /// The ip and port on which the database will listen for HTTP requests.
    #[structopt(short = "l", long, default_value = "127.0.0.1:9700")]
    http_listen_addr: String,

    #[structopt(flatten)]
    indexer: IndexerOpt,
}

// Indexing tuning options, flattened into `Opt`. `main` copies some of them into
// `IndexerConfig`.
// NOTE(review): `linked_hash_map_size`, `chunk_fusing_shrink_size` and `enable_chunk_fusing`
// are parsed but never read in this file, so they currently have no effect here.
#[derive(Debug, Clone, StructOpt)]
pub struct IndexerOpt {
    /// The amount of documents to skip before printing
    /// a log regarding the indexing advancement.
    #[structopt(long, default_value = "100000")] // 100k
    pub log_every_n: usize,

    /// MTBL max number of chunks in bytes.
    #[structopt(long)]
    pub max_nb_chunks: Option,

    /// The maximum amount of memory to use for the MTBL buffer. It is recommended
    /// to use something like 80%-90% of the available memory.
    ///
    /// It is automatically split by the number of jobs e.g. if you use 7 jobs
    /// and 7 GB of max memory, each thread will use a maximum of 1 GB.
    #[structopt(long, default_value = "7 GiB")]
    pub max_memory: Byte,

    /// Size of the linked hash map cache when indexing.
    /// The bigger it is, the faster the indexing is but the more memory it takes.
    #[structopt(long, default_value = "500")]
    pub linked_hash_map_size: usize,

    /// The name of the compression algorithm to use when compressing intermediate
    /// chunks during indexing documents.
    ///
    /// Choosing a fast algorithm will make the indexing faster but may consume more memory.
    #[structopt(long, possible_values = &["snappy", "zlib", "lz4", "lz4hc", "zstd"])]
    pub chunk_compression_type: Option,

    /// The level of compression of the chosen algorithm.
    #[structopt(long, requires = "chunk-compression-type")]
    pub chunk_compression_level: Option,

    /// The number of bytes to remove from the begining of the chunks while reading/sorting
    /// or merging them.
    ///
    /// File fusing must only be enable on file systems that support the `FALLOC_FL_COLLAPSE_RANGE`,
    /// (i.e. ext4 and XFS). File fusing will only work if the `enable-chunk-fusing` is set.
    #[structopt(long, default_value = "4 GiB")]
    pub chunk_fusing_shrink_size: Byte,

    /// Enable the chunk fusing or not, this reduces the amount of disk used by a factor of 2.
    #[structopt(long)]
    pub enable_chunk_fusing: bool,

    /// Number of parallel jobs for indexing, defaults to # of CPUs.
    #[structopt(long)]
    pub indexing_jobs: Option,

    /// Maximum relative position in an attribute for a word to be indexed.
    /// Any value higher than 65535 will be clamped.
    #[structopt(long)]
    pub max_positions_per_attributes: Option,
}

/// Rewrites string values inside JSON documents into highlighted, cropped versions.
/// It uses a milli `MatcherBuilder` seeded with the words matched by a search.
struct Highlighter<'s, A> {
    matcher_builder: MatcherBuilder<'s, A>,
}

impl<'s, A: AsRef<[u8]>> Highlighter<'s, A> {
    /// Wraps an already-configured matcher builder.
    fn new(matcher_builder: MatcherBuilder<'s, A>) -> Self {
        Self { matcher_builder }
    }

    /// Returns `value` with every string it contains highlighted and cropped.
    ///
    /// Null, boolean and number values are returned unchanged. Arrays and objects are
    /// traversed recursively, and object keys are never modified.
    fn highlight_value(&self, value: Value) -> Value {
        match value {
            Value::Null => Value::Null,
            Value::Bool(boolean) => Value::Bool(boolean),
            Value::Number(number) => Value::Number(number),
            Value::String(old_string) => {
                let mut matcher = self.matcher_builder.build(&old_string);
                // The unit of `crop: Some(10)` is defined by milli's matcher
                // (presumably words — TODO confirm).
                let format_options = FormatOptions { highlight: true, crop: Some(10) };
                Value::String(matcher.format(format_options).to_string())
            }
            Value::Array(values) => {
                Value::Array(values.into_iter().map(|v| self.highlight_value(v)).collect())
            }
            Value::Object(object) => Value::Object(
                object.into_iter().map(|(k, v)| (k, self.highlight_value(v))).collect(),
            ),
        }
    }

    /// Highlights, in place, the top-level fields of `object` whose key is listed in
    /// `attributes_to_highlight`. All other fields are left untouched.
    fn highlight_record(
        &self,
        object: &mut Map,
        attributes_to_highlight: &HashSet,
    ) {
        // TODO do we need to create a string for element that are not and needs to be highlight?
        for (key, value) in object.iter_mut() {
            if attributes_to_highlight.contains(key) {
                // Move the value out, leaving a default in its place, so it can be
                // consumed by `highlight_value` without cloning.
                let old_value = mem::take(value);
                *value = self.highlight_value(old_value);
            }
        }
    }
}

/// Context for the `index.html` dashboard template.
#[derive(Template)]
#[template(path = "index.html")]
struct IndexTemplate {
    db_name: String,
    // Size in bytes of `data.mdb`, read from file metadata.
    db_size: usize,
    docs_count: usize,
}

/// Context for the `updates.html` template. Same header data as `IndexTemplate`,
/// plus the list of known updates.
#[derive(Template)]
#[template(path = "updates.html")]
struct UpdatesTemplate {
    db_name: String,
    db_size: usize,
    docs_count: usize,
    updates: Vec>,
}

/// Lifecycle status of one update. It is sent on the broadcast channel (and from there to
/// websocket clients) and returned by `GET /updates`.
/// Serialized with an internal `"type"` tag naming the variant.
#[derive(Debug, Clone, Serialize)]
#[serde(tag = "type")]
enum UpdateStatus {
    Pending { update_id: u64, meta: M },
    Progressing { update_id: u64, meta: P },
    Processed { update_id: u64, meta: N },
    Aborted { update_id: u64, meta: M },
}

impl UpdateStatus {
    /// The id of the update this status refers to, whatever the variant.
    fn update_id(&self) -> u64 {
        match self {
            UpdateStatus::Pending { update_id, .. } => *update_id,
            UpdateStatus::Progressing { update_id, .. } => *update_id,
            UpdateStatus::Processed { update_id, .. } => *update_id,
            UpdateStatus::Aborted { update_id, .. } => *update_id,
        }
    }
}

/// Description of a registered update, stored in the `UpdateStore` and dispatched on by
/// the update closure in `main`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
enum UpdateMeta {
    // `method`: "replace" | "update"; `format`: "csv" | "json" | "jsonl";
    // `encoding`: None or "gzip". The update closure panics on any other value.
    DocumentsAddition { method: String, format: String, encoding: Option },
    ClearDocuments,
    Settings(Settings),
    Facets(Facets),
}

/// Progress reported while an update runs.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
enum UpdateMetaProgress {
    // `total` is `None` for the `RemapDocumentAddition` step, whose total is not known.
    DocumentsAddition { step: usize, total_steps: usize, current: usize, total: Option },
}

/// JSON body of `POST /settings`.
///
/// Each field is tri-state (see the tests at the bottom of this file):
/// - an absent field is `Setting::NotSet` and is skipped when serializing;
/// - an explicit `null` maps to `Setting::Reset`;
/// - a value maps to `Setting::Set`.
///
/// Unknown fields are rejected.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(deny_unknown_fields)]
#[serde(rename_all = "camelCase")]
struct Settings {
    #[serde(default, skip_serializing_if = "Setting::is_not_set")]
    displayed_attributes: Setting>,

    #[serde(default, skip_serializing_if = "Setting::is_not_set")]
    searchable_attributes: Setting>,

    #[serde(default, skip_serializing_if = "Setting::is_not_set")]
    filterable_attributes: Setting>,

    #[serde(default, skip_serializing_if = "Setting::is_not_set")]
    sortable_attributes: Setting>,

    #[serde(default, skip_serializing_if = "Setting::is_not_set")]
    criteria: Setting>,

    #[serde(default, skip_serializing_if = "Setting::is_not_set")]
    stop_words: Setting>,

    #[serde(default, skip_serializing_if = "Setting::is_not_set")]
    synonyms: Setting>>,
}

/// JSON body of `POST /facet-level-sizes`. Fields left as `None` are not passed to the
/// facets builder.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
#[serde(rename_all = "camelCase")]
struct Facets {
    level_group_size: Option,
    min_level_size: Option,
}

// NOTE(review): `WordsPrefixes` and `WordsLevelPositions` are not referenced anywhere in this
// file (no route or `UpdateMeta` variant uses them) — possibly dead code.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
#[serde(rename_all = "camelCase")]
struct WordsPrefixes {
    threshold: Option,
    max_prefix_length: Option,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
#[serde(rename_all = "camelCase")]
struct WordsLevelPositions {
    level_group_size: Option,
    min_level_size: Option,
}

/// Entry point of the server.
///
/// Steps:
/// 1. Parse CLI options and set up logging.
/// 2. Build the global `IndexerConfig`.
/// 3. Open the index and the update store. The store is given the closure that applies
///    each update.
/// 4. Build every warp route and serve on `--http-listen-addr` until the process exits.
///
/// # Errors
/// Returns an error on logger, filesystem, thread-pool, LMDB or listen-address setup
/// failures. Failures while applying an update are not propagated: they are turned into
/// a `Processed` status message.
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let opt = Opt::from_args();

    stderrlog::new()
        .verbosity(opt.verbose)
        .show_level(false)
        .timestamp(stderrlog::Timestamp::Off)
        .init()?;

    create_dir_all(&opt.database)?;
    let mut options = EnvOpenOptions::new();
    options.map_size(opt.database_size.get_bytes() as usize);

    // Setup the global thread pool
    // A missing `--indexing-jobs` becomes 0, which lets rayon choose the thread count
    // (the CLI help says this defaults to the number of CPUs).
    let jobs = opt.indexer.indexing_jobs.unwrap_or(0);
    let pool = rayon::ThreadPoolBuilder::new().num_threads(jobs).build()?;

    let config = IndexerConfig {
        max_nb_chunks: opt.indexer.max_nb_chunks,
        chunk_compression_level: opt.indexer.chunk_compression_level,
        max_positions_per_attributes: opt.indexer.max_positions_per_attributes,
        thread_pool: Some(pool),
        log_every_n: Some(opt.indexer.log_every_n),
        max_memory: Some(opt.indexer.max_memory.get_bytes() as usize),
        chunk_compression_type: opt.indexer.chunk_compression_type.unwrap_or(CompressionType::None),
        ..Default::default()
    };

    // `set` only fails if the cell was already set; it is set nowhere else in this file.
    GLOBAL_CONFIG.set(config).unwrap();

    // Open the LMDB database.
    let index = Index::new(options, &opt.database)?;

    // Setup the LMDB based update database.
    let mut update_store_options = EnvOpenOptions::new();
    update_store_options.map_size(opt.update_database_size.get_bytes() as usize);

    let update_store_path = opt.database.join("updates.mdb");
    create_dir_all(&update_store_path)?;

    // Status fan-out: the update closure and the routes send, and each websocket subscribes.
    // The initial receiver is dropped, so `send` fails whenever nobody is subscribed.
    // Every send below therefore discards the result with `let _ =`.
    // Capacity is 100 messages.
    let (update_status_sender, _) = broadcast::channel(100);
    let update_status_sender_cloned = update_status_sender.clone();
    let index_cloned = index.clone();
    let update_store = UpdateStore::open(
        update_store_options,
        update_store_path,
        // the type hint is necessary: https://github.com/rust-lang/rust/issues/32600
        // Applies one registered update. `content` is the payload bytes passed to
        // `register_update`: empty for every kind except documents additions.
        // The returned `String` is the human-readable outcome message.
        move |update_id, meta, content: &_| {
            // We prepare the update by using the update builder.
            let before_update = Instant::now();
            // we extract the update type and execute the update itself.
            // The immediately-invoked closure lets `?` be used for each update kind.
            let result: anyhow::Result<()> = (|| match meta {
                UpdateMeta::DocumentsAddition { method, format, encoding } => {
                    // We must use the write transaction of the update here.
                    let mut wtxn = index_cloned.write_txn()?;

                    // `buf_stream` only ever stores "replace" or "update", so the panic
                    // arm is not expected to be reached through the HTTP route.
                    let update_method = match method.as_str() {
                        "replace" => IndexDocumentsMethod::ReplaceDocuments,
                        "update" => IndexDocumentsMethod::UpdateDocuments,
                        otherwise => panic!("invalid indexing method {:?}", otherwise),
                    };
                    let indexing_config = IndexDocumentsConfig {
                        update_method,
                        autogenerate_docids: true,
                        ..Default::default()
                    };

                    // Turns each milli indexing step into a `Progressing` broadcast.
                    let indexing_callback = |indexing_step| {
                        let (current, total) = match indexing_step {
                            RemapDocumentAddition { documents_seen } => (documents_seen, None),
                            ComputeIdsAndMergeDocuments { documents_seen, total_documents } => {
                                (documents_seen, Some(total_documents))
                            }
                            IndexDocuments { documents_seen, total_documents } => {
                                (documents_seen, Some(total_documents))
                            }
                            MergeDataIntoFinalDatabase { databases_seen, total_databases } => {
                                (databases_seen, Some(total_databases))
                            }
                        };
                        let _ = update_status_sender_cloned.send(UpdateStatus::Progressing {
                            update_id,
                            meta: UpdateMetaProgress::DocumentsAddition {
                                step: indexing_step.step(),
                                total_steps: indexing_step.number_of_steps(),
                                current,
                                total,
                            },
                        });
                    };

                    let mut builder = milli::update::IndexDocuments::new(
                        &mut wtxn,
                        &index_cloned,
                        GLOBAL_CONFIG.get().unwrap(),
                        indexing_config,
                        indexing_callback,
                    )?;

                    // NOTE(review): `encoding` is the raw `content-encoding` request header,
                    // forwarded unvalidated by `buf_stream`. Any value other than "gzip"
                    // reaches this `panic!`.
                    let reader = match encoding.as_deref() {
                        Some("gzip") => Box::new(GzDecoder::new(content)),
                        None => Box::new(content) as Box,
                        otherwise => panic!("invalid encoding format {:?}", otherwise),
                    };

                    // `format` was already mapped from the content type by `indexing_route`.
                    let documents = match format.as_str() {
                        "csv" => documents_from_csv(reader)?,
                        "json" => documents_from_json(reader)?,
                        "jsonl" => documents_from_jsonl(reader)?,
                        otherwise => panic!("invalid update format {:?}", otherwise),
                    };

                    // The helpers return a serialized document batch; read it back for milli.
                    let documents = DocumentBatchReader::from_reader(Cursor::new(documents))?;

                    builder.add_documents(documents)?;

                    let result = builder.execute();

                    // Commit only on success. On error the transaction is dropped
                    // without commit.
                    match result {
                        Ok(_) => wtxn.commit().map_err(Into::into),
                        Err(e) => Err(e.into()),
                    }
                }
                UpdateMeta::ClearDocuments => {
                    // We must use the write transaction of the update here.
                    let mut wtxn = index_cloned.write_txn()?;
                    let builder = ClearDocuments::new(&mut wtxn, &index_cloned);

                    match builder.execute() {
                        Ok(_count) => wtxn.commit().map_err(Into::into),
                        Err(e) => Err(e.into()),
                    }
                }
                UpdateMeta::Settings(settings) => {
                    // We must use the write transaction of the update here.
                    let mut wtxn = index_cloned.write_txn()?;
                    let mut builder = milli::update::Settings::new(
                        &mut wtxn,
                        &index_cloned,
                        GLOBAL_CONFIG.get().unwrap(),
                    );

                    // We transpose the settings JSON struct into a real setting update.
                    match settings.searchable_attributes {
                        Setting::Set(searchable_attributes) => {
                            builder.set_searchable_fields(searchable_attributes)
                        }
                        Setting::Reset => builder.reset_searchable_fields(),
                        Setting::NotSet => (),
                    }

                    // We transpose the settings JSON struct into a real setting update.
                    match settings.displayed_attributes {
                        Setting::Set(displayed_attributes) => {
                            builder.set_displayed_fields(displayed_attributes)
                        }
                        Setting::Reset => builder.reset_displayed_fields(),
                        Setting::NotSet => (),
                    }

                    // We transpose the settings JSON struct into a real setting update.
                    match settings.filterable_attributes {
                        Setting::Set(filterable_attributes) => {
                            builder.set_filterable_fields(filterable_attributes)
                        }
                        Setting::Reset => builder.reset_filterable_fields(),
                        Setting::NotSet => (),
                    }

                    // We transpose the settings JSON struct into a real setting update.
                    match settings.sortable_attributes {
                        Setting::Set(sortable_attributes) => {
                            builder.set_sortable_fields(sortable_attributes)
                        }
                        Setting::Reset => builder.reset_sortable_fields(),
                        Setting::NotSet => (),
                    }

                    // We transpose the settings JSON struct into a real setting update.
                    match settings.criteria {
                        Setting::Set(criteria) => builder.set_criteria(criteria),
                        Setting::Reset => builder.reset_criteria(),
                        Setting::NotSet => (),
                    }

                    // We transpose the settings JSON struct into a real setting update.
                    match settings.stop_words {
                        Setting::Set(stop_words) => builder.set_stop_words(stop_words),
                        Setting::Reset => builder.reset_stop_words(),
                        Setting::NotSet => (),
                    }

                    // We transpose the settings JSON struct into a real setting update.
                    match settings.synonyms {
                        Setting::Set(synonyms) => builder.set_synonyms(synonyms),
                        Setting::Reset => builder.reset_synonyms(),
                        Setting::NotSet => (),
                    }

                    // Same progress reporting as the documents-addition callback above.
                    // A settings change may reindex, and it reports with the same step type.
                    let result = builder.execute(|indexing_step| {
                        let (current, total) = match indexing_step {
                            RemapDocumentAddition { documents_seen } => (documents_seen, None),
                            ComputeIdsAndMergeDocuments { documents_seen, total_documents } => {
                                (documents_seen, Some(total_documents))
                            }
                            IndexDocuments { documents_seen, total_documents } => {
                                (documents_seen, Some(total_documents))
                            }
                            MergeDataIntoFinalDatabase { databases_seen, total_databases } => {
                                (databases_seen, Some(total_databases))
                            }
                        };
                        let _ = update_status_sender_cloned.send(UpdateStatus::Progressing {
                            update_id,
                            meta: UpdateMetaProgress::DocumentsAddition {
                                step: indexing_step.step(),
                                total_steps: indexing_step.number_of_steps(),
                                current,
                                total,
                            },
                        });
                    });

                    match result {
                        Ok(_count) => wtxn.commit().map_err(Into::into),
                        Err(e) => Err(e.into()),
                    }
                }
                UpdateMeta::Facets(levels) => {
                    // We must use the write transaction of the update here.
                    let mut wtxn = index_cloned.write_txn()?;
                    let mut builder = milli::update::Facets::new(&mut wtxn, &index_cloned);
                    // Only override the builder's values the client actually provided.
                    if let Some(value) = levels.level_group_size {
                        builder.level_group_size(value);
                    }
                    if let Some(value) = levels.min_level_size {
                        builder.min_level_size(value);
                    }
                    match builder.execute() {
                        Ok(()) => wtxn.commit().map_err(Into::into),
                        Err(e) => Err(e.into()),
                    }
                }
            })();

            // Success and failure both end as a `Processed` status carrying a message.
            // Errors are reported to clients, not propagated to the update store.
            let meta = match result {
                Ok(()) => {
                    format!("valid update content processed in {:.02?}", before_update.elapsed())
                }
                Err(e) => format!("error while processing update content: {:?}", e),
            };

            let processed = UpdateStatus::Processed { update_id, meta: meta.clone() };
            let _ = update_status_sender_cloned.send(processed);
            Ok(meta)
        },
    )?;

    // The database name will not change.
    let db_name = opt.database.file_stem().and_then(|s| s.to_str()).unwrap_or("").to_string();
    // Path of the index data file whose size the dashboard displays.
    let lmdb_path = opt.database.join("data.mdb");

    // We run and wait on the HTTP server

    // Expose an HTML page to debug the search in a browser
    let db_name_cloned = db_name.clone();
    let lmdb_path_cloned = lmdb_path.clone();
    let index_cloned = index.clone();
    // GET / : dashboard page.
    // NOTE(review): the `unwrap`s below turn a missing data file or an LMDB error into a
    // handler panic rather than an HTTP error.
    let dash_html_route =
        warp::filters::method::get().and(warp::filters::path::end()).map(move || {
            // We retrieve the database size.
            let db_size =
                File::open(lmdb_path_cloned.clone()).unwrap().metadata().unwrap().len() as usize;

            // And the number of documents in the database.
            let rtxn = index_cloned.read_txn().unwrap();
            // NOTE(review): the `.clone()` of the index here is unnecessary.
            let docs_count = index_cloned.clone().number_of_documents(&rtxn).unwrap() as usize;

            IndexTemplate { db_name: db_name_cloned.clone(), db_size, docs_count }
        });

    let update_store_cloned = update_store.clone();
    let lmdb_path_cloned = lmdb_path.clone();
    let index_cloned = index.clone();
    // GET /updates : every processed, aborted and pending update, newest id first.
    // Rendered as HTML when the `Accept` header contains "text/html", as JSON otherwise.
    // `warp::header("Accept")` makes that header mandatory for this route to match.
    let updates_list_or_html_route = warp::filters::method::get()
        .and(warp::header("Accept"))
        .and(warp::path!("updates"))
        .map(move |header: String| {
            let update_store = update_store_cloned.clone();
            let mut updates = update_store
                .iter_metas(|processed, aborted, pending| {
                    let mut updates = Vec::>::new();
                    for result in processed {
                        let (uid, meta) = result?;
                        updates.push(UpdateStatus::Processed { update_id: uid.get(), meta });
                    }
                    for result in aborted {
                        let (uid, meta) = result?;
                        updates.push(UpdateStatus::Aborted { update_id: uid.get(), meta });
                    }
                    for result in pending {
                        let (uid, meta) = result?;
                        updates.push(UpdateStatus::Pending { update_id: uid.get(), meta });
                    }
                    Ok(updates)
                })
                .unwrap();

            // Descending update id.
            updates.sort_unstable_by(|s1, s2| s1.update_id().cmp(&s2.update_id()).reverse());

            if header.contains("text/html") {
                // We retrieve the database size.
                let db_size =
                    File::open(lmdb_path_cloned.clone()).unwrap().metadata().unwrap().len()
                        as usize;

                // And the number of documents in the database.
                let rtxn = index_cloned.read_txn().unwrap();
                let docs_count =
                    index_cloned.clone().number_of_documents(&rtxn).unwrap() as usize;

                let template =
                    UpdatesTemplate { db_name: db_name.clone(), db_size, docs_count, updates };
                Box::new(template) as Box
            } else {
                Box::new(warp::reply::json(&updates))
            }
        });

    // Static dashboard assets, embedded into the binary at compile time via `include_str!`.
    let dash_bulma_route =
        warp::filters::method::get().and(warp::path!("bulma.min.css")).map(|| {
            Response::builder()
                .header("content-type", "text/css; charset=utf-8")
                .body(include_str!("../public/bulma.min.css"))
        });

    let dash_bulma_dark_route =
        warp::filters::method::get().and(warp::path!("bulma-prefers-dark.min.css")).map(|| {
            Response::builder()
                .header("content-type", "text/css; charset=utf-8")
                .body(include_str!("../public/bulma-prefers-dark.min.css"))
        });

    let dash_style_route =
        warp::filters::method::get().and(warp::path!("style.css")).map(|| {
            Response::builder()
                .header("content-type", "text/css; charset=utf-8")
                .body(include_str!("../public/style.css"))
        });

    let dash_jquery_route =
        warp::filters::method::get().and(warp::path!("jquery-3.4.1.min.js")).map(|| {
            Response::builder()
                .header("content-type", "application/javascript; charset=utf-8")
                .body(include_str!("../public/jquery-3.4.1.min.js"))
        });

    let dash_filesize_route =
        warp::filters::method::get().and(warp::path!("filesize.min.js")).map(|| {
            Response::builder()
                .header("content-type", "application/javascript; charset=utf-8")
                .body(include_str!("../public/filesize.min.js"))
        });

    let dash_script_route =
        warp::filters::method::get().and(warp::path!("script.js")).map(|| {
            Response::builder()
                .header("content-type", "application/javascript; charset=utf-8")
                .body(include_str!("../public/script.js"))
        });

    let updates_script_route =
        warp::filters::method::get().and(warp::path!("updates-script.js")).map(|| {
            Response::builder()
                .header("content-type", "application/javascript; charset=utf-8")
                .body(include_str!("../public/updates-script.js"))
        });

    let dash_logo_white_route =
        warp::filters::method::get().and(warp::path!("logo-white.svg")).map(|| {
            Response::builder()
                .header("content-type", "image/svg+xml")
                .body(include_str!("../public/logo-white.svg"))
        });

    let dash_logo_black_route =
        warp::filters::method::get().and(warp::path!("logo-black.svg")).map(|| {
            Response::builder()
                .header("content-type", "image/svg+xml")
                .body(include_str!("../public/logo-black.svg"))
        });

    /// Serde helper that accepts either shape without a tag: serde tries `Left` first,
    /// then `Right`. Each `facetFilters` entry is thus either an array of strings or a
    /// single string.
    #[derive(Debug, Deserialize)]
    #[serde(untagged)]
    enum UntaggedEither {
        Left(L),
        Right(R),
    }

    impl From> for Either {
        fn from(value: UntaggedEither) -> Either {
            match value {
                UntaggedEither::Left(left) => Either::Left(left),
                UntaggedEither::Right(right) => Either::Right(right),
            }
        }
    }

    /// JSON body of `POST /query` (camelCase keys, unknown keys rejected).
    #[derive(Debug, Deserialize)]
    #[serde(deny_unknown_fields)]
    #[serde(rename_all = "camelCase")]
    struct QueryBody {
        query: Option,
        // Filter expression parsed with `MilliFilter::from_str`; blank strings are ignored.
        filters: Option,
        // Single sort criterion, parsed with `str::parse`.
        sort: Option,
        facet_filters: Option, String>>>,
        // Facet distribution is only computed when this is `Some(true)`.
        facet_distribution: Option,
        limit: Option,
    }

    /// JSON response of `POST /query`.
    #[derive(Debug, Serialize)]
    #[serde(rename_all = "camelCase")]
    struct Answer {
        documents: Vec>,
        number_of_candidates: u64,
        // Empty unless `facetDistribution` was requested.
        facets: BTreeMap>,
    }

    let disable_highlighting = opt.disable_highlighting;
    let index_cloned = index.clone();
    // POST /query : runs a search and returns the matching documents (restricted to the
    // displayed fields, optionally highlighted).
    // The search duration in milliseconds is returned in the `Time-Ms` response header.
    // NOTE(review): invalid `filters`, `facetFilters` or `sort` values hit an `unwrap()`
    // and panic the handler instead of returning a 400.
    let query_route = warp::filters::method::post()
        .and(warp::path!("query"))
        .and(warp::body::json())
        .map(move |query: QueryBody| {
            let before_search = Instant::now();
            let index = index_cloned.clone();
            let rtxn = index.read_txn().unwrap();

            let mut search = index.search(&rtxn);
            if let Some(query) = query.query {
                search.query(query);
            }

            let filters = match query.filters.as_ref() {
                Some(condition) if !condition.trim().is_empty() => {
                    MilliFilter::from_str(condition).unwrap()
                }
                _otherwise => None,
            };

            let facet_filters = match query.facet_filters.as_ref() {
                Some(array) => {
                    let eithers = array.iter().map(|either| match either {
                        UntaggedEither::Left(l) => {
                            Either::Left(l.iter().map(|s| s.as_str()).collect::>())
                        }
                        UntaggedEither::Right(r) => Either::Right(r.as_str()),
                    });
                    MilliFilter::from_array(eithers).unwrap()
                }
                _otherwise => None,
            };

            // When both a filter expression and facet filters are present, combine them
            // with a logical AND.
            let condition = match (filters, facet_filters) {
                (Some(filters), Some(facet_filters)) => Some(FilterCondition::And(
                    Box::new(filters.into()),
                    Box::new(facet_filters.into()),
                )),
                (Some(condition), None) | (None, Some(condition)) => Some(condition.into()),
                _otherwise => None,
            };

            if let Some(condition) = condition {
                search.filter(condition.into());
            }

            if let Some(limit) = query.limit {
                search.limit(limit);
            }

            if let Some(sort) = query.sort {
                search.sort_criteria(vec![sort.parse().map_err(SortError::from).unwrap()]);
            }

            let SearchResult { matching_words, candidates, documents_ids } =
                search.execute().unwrap();

            let number_of_candidates = candidates.len();
            // `candidates` is moved into the facet computation, so its length is read first.
            let facets = if query.facet_distribution == Some(true) {
                Some(index.facets_distribution(&rtxn).candidates(candidates).execute().unwrap())
            } else {
                None
            };

            let mut documents = Vec::new();
            let fields_ids_map = index.fields_ids_map(&rtxn).unwrap();
            // With no displayed-fields setting, every known field is displayed.
            let displayed_fields = match index.displayed_fields_ids(&rtxn).unwrap() {
                Some(fields) => fields,
                None => fields_ids_map.iter().map(|(id, _)| id).collect(),
            };
            // Highlight the searchable fields, or every known field when none are configured.
            let attributes_to_highlight = match index.searchable_fields(&rtxn).unwrap() {
                Some(fields) => fields.into_iter().map(String::from).collect(),
                None => fields_ids_map.iter().map(|(_, name)| name).map(String::from).collect(),
            };

            let mut matcher_builder =
                MatcherBuilder::new(matching_words, TokenizerBuilder::default().build());
            // NOTE(review): both markers are empty strings in this copy, so highlighting
            // inserts no visible tags (cropping still applies). Upstream presumably used
            // an HTML tag pair here — confirm.
            matcher_builder.highlight_prefix("".to_string());
            matcher_builder.highlight_suffix("".to_string());
            let highlighter = Highlighter::new(matcher_builder);
            for (_id, obkv) in index.documents(&rtxn, documents_ids).unwrap() {
                let mut object = obkv_to_json(&displayed_fields, &fields_ids_map, obkv).unwrap();
                if !disable_highlighting {
                    highlighter.highlight_record(&mut object, &attributes_to_highlight);
                }

                documents.push(object);
            }

            let answer =
                Answer { documents, number_of_candidates, facets: facets.unwrap_or_default() };

            Response::builder()
                .header("Content-Type", "application/json")
                .header("Time-Ms", before_search.elapsed().as_millis().to_string())
                .body(serde_json::to_string(&answer).unwrap())
        });

    let index_cloned = index.clone();
    // GET /document/{id} : the document with that external id, restricted to the displayed
    // fields. Returns 404 with a plain-text message when the id is unknown.
    let document_route = warp::filters::method::get().and(warp::path!("document" / String)).map(
        move |id: String| {
            let index = index_cloned.clone();
            let rtxn = index.read_txn().unwrap();

            let external_documents_ids = index.external_documents_ids(&rtxn).unwrap();
            let fields_ids_map = index.fields_ids_map(&rtxn).unwrap();
            let displayed_fields = match index.displayed_fields_ids(&rtxn).unwrap() {
                Some(fields) => fields,
                None => fields_ids_map.iter().map(|(id, _)| id).collect(),
            };

            match external_documents_ids.get(&id) {
                Some(document_id) => {
                    // NOTE(review): `as u32` truncates silently if the internal id
                    // ever exceeds `u32::MAX`.
                    let document_id = document_id as u32;
                    let (_, obkv) =
                        index.documents(&rtxn, Some(document_id)).unwrap().pop().unwrap();
                    let document = obkv_to_json(&displayed_fields, &fields_ids_map, obkv).unwrap();

                    Response::builder()
                        .header("Content-Type", "application/json")
                        .body(serde_json::to_string(&document).unwrap())
                }
                None => Response::builder()
                    .status(404)
                    .body(format!("Document with id {:?} not found.", id)),
            }
        },
    );

    /// Spools a streamed request body into a temporary file and registers it as a
    /// `DocumentsAddition` update. It then broadcasts a `Pending` status.
    ///
    /// `update_method` is normalized: "update" is kept, and anything else (including
    /// absent) becomes "replace". `format` and `encoding` are stored as given.
    /// I/O and store failures `unwrap()`, panicking the request handler.
    async fn buf_stream(
        update_store: Arc>,
        update_status_sender: broadcast::Sender<
            UpdateStatus,
        >,
        update_method: Option,
        format: String,
        encoding: Option,
        mut stream: impl futures::Stream> + Unpin,
    ) -> Result {
        // Creating the temp file is blocking I/O. `block_in_place` keeps it from stalling
        // other tasks on this runtime worker.
        let file = tokio::task::block_in_place(tempfile::tempfile).unwrap();
        let mut file = TFile::from_std(file);

        while let Some(result) = stream.next().await {
            let mut bytes = Vec::new();
            result.unwrap().reader().read_to_end(&mut bytes).unwrap();
            file.write_all(&bytes[..]).await.unwrap();
        }

        let file = file.into_std().await;
        // SAFETY(review): the mapping is over the temp file created above, which is not
        // shared with the rest of this file. Presumably nothing else mutates it while it is
        // mapped; confirm against the `tempfile::tempfile` and `memmap2::Mmap::map` docs.
        let mmap = unsafe { memmap2::Mmap::map(&file).expect("can't map file") };

        let method = match update_method.as_deref() {
            Some("replace") => String::from("replace"),
            Some("update") => String::from("update"),
            _ => String::from("replace"),
        };

        let meta = UpdateMeta::DocumentsAddition { method, format, encoding };
        let update_id = update_store.register_update(&meta, &mmap[..]).unwrap();
        let _ = update_status_sender.send(UpdateStatus::Pending { update_id, meta });
        eprintln!("update {} registered", update_id);

        Ok(warp::reply())
    }

    /// Query string of `POST /documents`: `?method=replace|update`.
    #[derive(Deserialize)]
    struct QueryUpdate {
        method: Option,
    }

    let update_store_cloned = update_store.clone();
    let update_status_sender_cloned = update_status_sender.clone();
    // POST /documents : enqueues a documents addition.
    // The body format is chosen from the exact `content-type` value; `content-encoding`
    // is optional and stored as-is.
    // NOTE(review): any other content type (including e.g. "application/json; charset=utf-8",
    // because this is an exact string match) hits `panic!` instead of returning 415.
    let indexing_route = warp::filters::method::post()
        .and(warp::path!("documents"))
        .and(warp::header::header("content-type"))
        .and(warp::header::optional::("content-encoding"))
        .and(warp::query::query())
        .and(warp::body::stream())
        .and_then(move |content_type: String, content_encoding, params: QueryUpdate, stream| {
            let format = match content_type.as_str() {
                "text/csv" => "csv",
                "application/json" => "json",
                "application/x-ndjson" => "jsonl",
                otherwise => panic!("invalid update format: {}", otherwise),
            };

            buf_stream(
                update_store_cloned.clone(),
                update_status_sender_cloned.clone(),
                params.method,
                format.to_string(),
                content_encoding,
                stream,
            )
        });

    // The following POST routes register an update with an empty payload, broadcast
    // `Pending`, and reply with an empty 200. The work happens later in the update closure.
    let update_store_cloned = update_store.clone();
    let update_status_sender_cloned = update_status_sender.clone();
    let clearing_route =
        warp::filters::method::post().and(warp::path!("clear-documents")).map(move || {
            let meta = UpdateMeta::ClearDocuments;
            let update_id = update_store_cloned.register_update(&meta, &[]).unwrap();
            let _ = update_status_sender_cloned.send(UpdateStatus::Pending { update_id, meta });
            eprintln!("update {} registered", update_id);
            Ok(warp::reply())
        });

    let update_store_cloned = update_store.clone();
    let update_status_sender_cloned = update_status_sender.clone();
    let change_settings_route = warp::filters::method::post()
        .and(warp::path!("settings"))
        .and(warp::body::json())
        .map(move |settings: Settings| {
            let meta = UpdateMeta::Settings(settings);
            let update_id = update_store_cloned.register_update(&meta, &[]).unwrap();
            let _ = update_status_sender_cloned.send(UpdateStatus::Pending { update_id, meta });
            eprintln!("update {} registered", update_id);
            Ok(warp::reply())
        });

    let update_store_cloned = update_store.clone();
    let update_status_sender_cloned = update_status_sender.clone();
    let change_facet_levels_route = warp::filters::method::post()
        .and(warp::path!("facet-level-sizes"))
        .and(warp::body::json())
        .map(move |levels: Facets| {
            let meta = UpdateMeta::Facets(levels);
            let update_id = update_store_cloned.register_update(&meta, &[]).unwrap();
            let _ = update_status_sender_cloned.send(UpdateStatus::Pending { update_id, meta });
            eprintln!("update {} registered", update_id);
            warp::reply()
        });

    let update_store_cloned = update_store.clone();
    let update_status_sender_cloned = update_status_sender.clone();
    // DELETE /update/{id} : aborts one update. `Aborted` is broadcast only when the store
    // returns the aborted update's meta. The reply is an empty 200 either way.
    let abort_update_id_route = warp::filters::method::delete()
        .and(warp::path!("update" / u64))
        .map(move |update_id: u64| {
            if let Some(meta) = update_store_cloned.abort_update(update_id).unwrap() {
                let _ = update_status_sender_cloned.send(UpdateStatus::Aborted { update_id, meta });
                eprintln!("update {} aborted", update_id);
            }
            warp::reply()
        });

    let update_store_cloned = update_store.clone();
    let update_status_sender_cloned = update_status_sender.clone();
    // DELETE /updates : aborts every pending update and broadcasts `Aborted` for each one.
    let abort_pending_updates_route =
        warp::filters::method::delete().and(warp::path!("updates")).map(move || {
            let updates = update_store_cloned.abort_pendings().unwrap();
            for (update_id, meta) in updates {
                let _ = update_status_sender_cloned.send(UpdateStatus::Aborted { update_id, meta });
                eprintln!("update {} aborted", update_id);
            }
            warp::reply()
        });

    // GET /updates/ws : each websocket connection gets its own broadcast receiver and is
    // sent every status as a JSON text frame.
    let update_ws_route =
        warp::ws().and(warp::path!("updates" / "ws")).map(move |ws: warp::ws::Ws| {
            // And then our closure will be called when it completes...
            let update_status_receiver = update_status_sender.subscribe();
            ws.on_upgrade(|websocket| {
                // Just echo all updates messages...
                // Receive errors (e.g. a lagging receiver) are logged and skipped rather
                // than closing the socket.
                BroadcastStream::new(update_status_receiver)
                    .flat_map(|result| match result {
                        Ok(status) => {
                            let msg = serde_json::to_string(&status).unwrap();
                            stream::iter(Some(Ok(Message::text(msg))))
                        }
                        Err(e) => {
                            eprintln!("channel error: {:?}", e);
                            stream::iter(None)
                        }
                    })
                    .forward(websocket)
                    .map(|result| {
                        if let Err(e) = result {
                            eprintln!("websocket error: {:?}", e);
                        }
                    })
            })
        });

    // GET /die : terminates the whole process with exit code 0. The route is
    // unauthenticated, so anyone who can reach the listen address can stop the server.
    let die_route = warp::filters::method::get().and(warp::path!("die")).map(move || {
        eprintln!("Killed by an HTTP request received on the die route");
        std::process::exit(0);
        // Never reached; it only gives the closure a reply type.
        #[allow(unreachable_code)]
        warp::reply()
    });

    // Routes are tried in this order; the first filter that matches handles the request.
    let routes = dash_html_route
        .or(updates_list_or_html_route)
        .or(dash_bulma_route)
        .or(dash_bulma_dark_route)
        .or(dash_style_route)
        .or(dash_jquery_route)
        .or(dash_filesize_route)
        .or(dash_script_route)
        .or(updates_script_route)
        .or(dash_logo_white_route)
        .or(dash_logo_black_route)
        .or(query_route)
        .or(document_route)
        .or(indexing_route)
        .or(abort_update_id_route)
        .or(abort_pending_updates_route)
        .or(clearing_route)
        .or(change_settings_route)
        .or(change_facet_levels_route)
        .or(update_ws_route)
        .or(die_route);

    let addr = SocketAddr::from_str(&opt.http_listen_addr)?;
    warp::serve(routes).run(addr).await;
    Ok(())
}

/// Reads newline-delimited JSON and returns it as a serialized milli document batch.
///
/// Each line is passed separately to `extend_from_json`.
/// NOTE(review): blank lines are not skipped; how milli handles an empty line is
/// not visible here — confirm.
///
/// # Errors
/// I/O errors while reading lines, or any error from the batch builder.
fn documents_from_jsonl(reader: impl io::Read) -> anyhow::Result> {
    let mut writer = Cursor::new(Vec::new());
    let mut documents = milli::documents::DocumentBatchBuilder::new(&mut writer)?;

    for result in BufReader::new(reader).lines() {
        let line = result?;
        documents.extend_from_json(Cursor::new(line))?;
    }

    documents.finish()?;

    Ok(writer.into_inner())
}

/// Reads one JSON payload and returns it as a serialized milli document batch.
/// The accepted JSON shapes are whatever `extend_from_json` accepts.
///
/// # Errors
/// Any error from the batch builder.
fn documents_from_json(reader: impl io::Read) -> anyhow::Result> {
    let mut writer = Cursor::new(Vec::new());
    let mut documents = milli::documents::DocumentBatchBuilder::new(&mut writer)?;

    documents.extend_from_json(reader)?;
    documents.finish()?;

    Ok(writer.into_inner())
}

/// Reads CSV and returns it as a serialized milli document batch, using
/// `DocumentBatchBuilder::from_csv`.
///
/// # Errors
/// Any error from the batch builder.
fn documents_from_csv(reader: impl io::Read) -> anyhow::Result> {
    let mut writer = Cursor::new(Vec::new());
    milli::documents::DocumentBatchBuilder::from_csv(reader, &mut writer)?.finish()?;

    Ok(writer.into_inner())
}

// Serde round-trip tests pinning the tri-state wire format of `Settings`.
#[cfg(test)]
mod tests {
    use maplit::{btreeset, hashmap, hashset};
    use milli::update::Setting;
    use serde_test::{assert_tokens, Token};

    use crate::Settings;

    // `Setting::Set(v)` serializes as `Some(v)` under the camelCase key.
    #[test]
    fn serde_settings_set() {
        let settings = Settings {
            displayed_attributes: Setting::Set(vec!["name".to_string()]),
            searchable_attributes: Setting::Set(vec!["age".to_string()]),
            filterable_attributes: Setting::Set(hashset! { "age".to_string() }),
            sortable_attributes: Setting::Set(hashset! { "age".to_string() }),
            criteria: Setting::Set(vec!["age:asc".to_string()]),
            stop_words: Setting::Set(btreeset! { "and".to_string() }),
            synonyms: Setting::Set(hashmap! { "alex".to_string() => vec!["alexey".to_string()] }),
        };

        assert_tokens(
            &settings,
            &[
                Token::Struct { name: "Settings", len: 7 },
                Token::Str("displayedAttributes"),
                Token::Some,
                Token::Seq { len: Some(1) },
                Token::Str("name"),
                Token::SeqEnd,
                Token::Str("searchableAttributes"),
                Token::Some,
                Token::Seq { len: Some(1) },
                Token::Str("age"),
                Token::SeqEnd,
                Token::Str("filterableAttributes"),
                Token::Some,
                Token::Seq { len: Some(1) },
                Token::Str("age"),
                Token::SeqEnd,
                Token::Str("sortableAttributes"),
                Token::Some,
                Token::Seq { len: Some(1) },
                Token::Str("age"),
                Token::SeqEnd,
                Token::Str("criteria"),
                Token::Some,
                Token::Seq { len: Some(1) },
                Token::Str("age:asc"),
                Token::SeqEnd,
                Token::Str("stopWords"),
                Token::Some,
                Token::Seq { len: Some(1) },
                Token::Str("and"),
                Token::SeqEnd,
                Token::Str("synonyms"),
                Token::Some,
                Token::Map { len: Some(1) },
                Token::Str("alex"),
                Token::Seq { len: Some(1) },
                Token::Str("alexey"),
                Token::SeqEnd,
                Token::MapEnd,
                Token::StructEnd,
            ],
        );
    }

    // `Setting::Reset` serializes as an explicit `None` (JSON `null`).
    #[test]
    fn serde_settings_reset() {
        let settings = Settings {
            displayed_attributes: Setting::Reset,
            searchable_attributes: Setting::Reset,
            filterable_attributes: Setting::Reset,
            sortable_attributes: Setting::Reset,
            criteria: Setting::Reset,
            stop_words: Setting::Reset,
            synonyms: Setting::Reset,
        };

        assert_tokens(
            &settings,
            &[
                Token::Struct { name: "Settings", len: 7 },
                Token::Str("displayedAttributes"),
                Token::None,
                Token::Str("searchableAttributes"),
                Token::None,
                Token::Str("filterableAttributes"),
                Token::None,
                Token::Str("sortableAttributes"),
                Token::None,
                Token::Str("criteria"),
                Token::None,
                Token::Str("stopWords"),
                Token::None,
                Token::Str("synonyms"),
                Token::None,
                Token::StructEnd,
            ],
        );
    }

    // `Setting::NotSet` fields are omitted entirely.
    #[test]
    fn serde_settings_notset() {
        let settings = Settings {
            displayed_attributes: Setting::NotSet,
            searchable_attributes: Setting::NotSet,
            filterable_attributes: Setting::NotSet,
            sortable_attributes: Setting::NotSet,
            criteria: Setting::NotSet,
            stop_words: Setting::NotSet,
            synonyms: Setting::NotSet,
        };

        assert_tokens(&settings, &[Token::Struct { name: "Settings", len: 0 }, Token::StructEnd]);
    }
}