mod update_store;

use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::fmt::Display;
use std::fs::{create_dir_all, File};
use std::io::{BufRead, BufReader, Cursor, Read};
use std::net::SocketAddr;
use std::num::{NonZeroU32, NonZeroUsize};
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Instant;
use std::{io, mem};

use askama_warp::Template;
use byte_unit::Byte;
use either::Either;
use flate2::read::GzDecoder;
use futures::{stream, FutureExt, StreamExt};
use heed::EnvOpenOptions;
use milli::documents::DocumentBatchReader;
use milli::tokenizer::{Analyzer, AnalyzerConfig};
use milli::update::UpdateIndexingStep::*;
use milli::update::{
    ClearDocuments, IndexDocumentsConfig, IndexDocumentsMethod, IndexerConfig, Setting,
};
use milli::{
    obkv_to_json, CompressionType, Filter as MilliFilter, FilterCondition, FormatOptions, Index,
    MatcherBuilder, SearchResult, SortError,
};
use once_cell::sync::OnceCell;
use serde::{Deserialize, Serialize};
use serde_json::{Map, Value};
use structopt::StructOpt;
use tokio::fs::File as TFile;
use tokio::io::AsyncWriteExt;
use tokio::sync::broadcast;
use tokio_stream::wrappers::BroadcastStream;
use warp::filters::ws::Message;
use warp::http::Response;
use warp::Filter;

use self::update_store::UpdateStore;

#[cfg(target_os = "linux")]
#[global_allocator]
static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc;

static GLOBAL_CONFIG: OnceCell<IndexerConfig> = OnceCell::new();

#[derive(Debug, StructOpt)]
/// The HTTP main server of the milli project.
pub struct Opt {
    /// The database path where the LMDB database is located.
    /// It is created if it doesn't already exist.
    #[structopt(long = "db", parse(from_os_str))]
    database: PathBuf,

    /// The maximum size the database can take on disk. It is recommended to specify
    /// the whole disk space (value must be a multiple of a page size).
    #[structopt(long = "db-size", default_value = "100 GiB")]
    database_size: Byte,

    /// The maximum size the database that stores the updates can take on disk. It is recommended
    /// to specify the whole disk space (value must be a multiple of a page size).
    #[structopt(long = "udb-size", default_value = "10 GiB")]
    update_database_size: Byte,

    /// Disable document highlighting on the dashboard.
    #[structopt(long)]
    disable_highlighting: bool,

    /// Verbose mode (-v, -vv, -vvv, etc.)
    #[structopt(short, long, parse(from_occurrences))]
    verbose: usize,

    /// The ip and port on which the database will listen for HTTP requests.
    #[structopt(short = "l", long, default_value = "127.0.0.1:9700")]
    http_listen_addr: String,

    #[structopt(flatten)]
    indexer: IndexerOpt,
}

#[derive(Debug, Clone, StructOpt)]
pub struct IndexerOpt {
    /// The amount of documents to skip before printing
    /// a log regarding the indexing advancement.
    #[structopt(long, default_value = "100000")] // 100k
    pub log_every_n: usize,

    /// MTBL max number of chunks in bytes.
    #[structopt(long)]
    pub max_nb_chunks: Option<usize>,

    /// The maximum amount of memory to use for the MTBL buffer. It is recommended
    /// to use something like 80%-90% of the available memory.
    ///
    /// It is automatically split by the number of jobs e.g. if you use 7 jobs
    /// and 7 GB of max memory, each thread will use a maximum of 1 GB.
    #[structopt(long, default_value = "7 GiB")]
    pub max_memory: Byte,

    /// Size of the linked hash map cache when indexing.
    /// The bigger it is, the faster the indexing is but the more memory it takes.
    #[structopt(long, default_value = "500")]
    pub linked_hash_map_size: usize,

    /// The name of the compression algorithm to use when compressing intermediate
    /// chunks during indexing documents.
    ///
    /// Choosing a fast algorithm will make the indexing faster but may consume more memory.
    #[structopt(long, possible_values = &["snappy", "zlib", "lz4", "lz4hc", "zstd"])]
    pub chunk_compression_type: Option<CompressionType>,

    /// The level of compression of the chosen algorithm.
    #[structopt(long, requires = "chunk-compression-type")]
    pub chunk_compression_level: Option<u32>,

    /// The number of bytes to remove from the begining of the chunks while reading/sorting
    /// or merging them.
    ///
    /// File fusing must only be enable on file systems that support the `FALLOC_FL_COLLAPSE_RANGE`,
    /// (i.e. ext4 and XFS). File fusing will only work if the `enable-chunk-fusing` is set.
    #[structopt(long, default_value = "4 GiB")]
    pub chunk_fusing_shrink_size: Byte,

    /// Enable the chunk fusing or not, this reduces the amount of disk used by a factor of 2.
    #[structopt(long)]
    pub enable_chunk_fusing: bool,

    /// Number of parallel jobs for indexing, defaults to # of CPUs.
    #[structopt(long)]
    pub indexing_jobs: Option<usize>,

    /// Maximum relative position in an attribute for a word to be indexed.
    /// Any value higher than 65535 will be clamped.
    #[structopt(long)]
    pub max_positions_per_attributes: Option<u32>,
}

struct Highlighter<'a, A> {
    analyzer: Analyzer<'a, A>,
}

impl<'a, A: AsRef<[u8]>> Highlighter<'a, A> {
    fn new(stop_words: &'a fst::Set<A>) -> Self {
        let mut config = AnalyzerConfig::default();
        config.stop_words(stop_words);
        let analyzer = Analyzer::new(config);

        Self { analyzer }
    }

    fn highlight_value(&self, value: Value, matcher_builder: &MatcherBuilder) -> Value {
        match value {
            Value::Null => Value::Null,
            Value::Bool(boolean) => Value::Bool(boolean),
            Value::Number(number) => Value::Number(number),
            Value::String(old_string) => {
                let analyzed = self.analyzer.analyze(&old_string);
                let analyzed: Vec<_> = analyzed.tokens().collect();
                let mut matcher = matcher_builder.build(&analyzed[..], &old_string);

                let format_options = FormatOptions { highlight: true, crop: Some(10) };

                Value::String(matcher.format(format_options).to_string())
            }
            Value::Array(values) => Value::Array(
                values.into_iter().map(|v| self.highlight_value(v, matcher_builder)).collect(),
            ),
            Value::Object(object) => Value::Object(
                object
                    .into_iter()
                    .map(|(k, v)| (k, self.highlight_value(v, matcher_builder)))
                    .collect(),
            ),
        }
    }

    fn highlight_record(
        &self,
        object: &mut Map<String, Value>,
        matcher_builder: &MatcherBuilder,
        attributes_to_highlight: &HashSet<String>,
    ) {
        // TODO do we need to create a string for element that are not and needs to be highlight?
        for (key, value) in object.iter_mut() {
            if attributes_to_highlight.contains(key) {
                let old_value = mem::take(value);
                *value = self.highlight_value(old_value, matcher_builder);
            }
        }
    }
}

#[derive(Template)]
#[template(path = "index.html")]
struct IndexTemplate {
    db_name: String,
    db_size: usize,
    docs_count: usize,
}

#[derive(Template)]
#[template(path = "updates.html")]
struct UpdatesTemplate<M: Serialize + Send, P: Serialize + Send, N: Serialize + Send + Display> {
    db_name: String,
    db_size: usize,
    docs_count: usize,
    updates: Vec<UpdateStatus<M, P, N>>,
}

#[derive(Debug, Clone, Serialize)]
#[serde(tag = "type")]
enum UpdateStatus<M, P, N> {
    Pending { update_id: u64, meta: M },
    Progressing { update_id: u64, meta: P },
    Processed { update_id: u64, meta: N },
    Aborted { update_id: u64, meta: M },
}

impl<M, P, N> UpdateStatus<M, P, N> {
    fn update_id(&self) -> u64 {
        match self {
            UpdateStatus::Pending { update_id, .. } => *update_id,
            UpdateStatus::Progressing { update_id, .. } => *update_id,
            UpdateStatus::Processed { update_id, .. } => *update_id,
            UpdateStatus::Aborted { update_id, .. } => *update_id,
        }
    }
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
enum UpdateMeta {
    DocumentsAddition { method: String, format: String, encoding: Option<String> },
    ClearDocuments,
    Settings(Settings),
    Facets(Facets),
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
enum UpdateMetaProgress {
    DocumentsAddition { step: usize, total_steps: usize, current: usize, total: Option<usize> },
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(deny_unknown_fields)]
#[serde(rename_all = "camelCase")]
struct Settings {
    #[serde(default, skip_serializing_if = "Setting::is_not_set")]
    displayed_attributes: Setting<Vec<String>>,

    #[serde(default, skip_serializing_if = "Setting::is_not_set")]
    searchable_attributes: Setting<Vec<String>>,

    #[serde(default, skip_serializing_if = "Setting::is_not_set")]
    filterable_attributes: Setting<HashSet<String>>,

    #[serde(default, skip_serializing_if = "Setting::is_not_set")]
    sortable_attributes: Setting<HashSet<String>>,

    #[serde(default, skip_serializing_if = "Setting::is_not_set")]
    criteria: Setting<Vec<String>>,

    #[serde(default, skip_serializing_if = "Setting::is_not_set")]
    stop_words: Setting<BTreeSet<String>>,

    #[serde(default, skip_serializing_if = "Setting::is_not_set")]
    synonyms: Setting<HashMap<String, Vec<String>>>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
#[serde(rename_all = "camelCase")]
struct Facets {
    level_group_size: Option<NonZeroUsize>,
    min_level_size: Option<NonZeroUsize>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
#[serde(rename_all = "camelCase")]
struct WordsPrefixes {
    threshold: Option<f64>,
    max_prefix_length: Option<usize>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
#[serde(rename_all = "camelCase")]
struct WordsLevelPositions {
    level_group_size: Option<NonZeroU32>,
    min_level_size: Option<NonZeroU32>,
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let opt = Opt::from_args();

    stderrlog::new()
        .verbosity(opt.verbose)
        .show_level(false)
        .timestamp(stderrlog::Timestamp::Off)
        .init()?;

    create_dir_all(&opt.database)?;
    let mut options = EnvOpenOptions::new();
    options.map_size(opt.database_size.get_bytes() as usize);

    // Setup the global thread pool
    let jobs = opt.indexer.indexing_jobs.unwrap_or(0);
    let pool = rayon::ThreadPoolBuilder::new().num_threads(jobs).build()?;

    let config = IndexerConfig {
        max_nb_chunks: opt.indexer.max_nb_chunks,
        chunk_compression_level: opt.indexer.chunk_compression_level,
        max_positions_per_attributes: opt.indexer.max_positions_per_attributes,
        thread_pool: Some(pool),
        log_every_n: Some(opt.indexer.log_every_n),
        max_memory: Some(opt.indexer.max_memory.get_bytes() as usize),
        chunk_compression_type: opt.indexer.chunk_compression_type.unwrap_or(CompressionType::None),
        ..Default::default()
    };

    GLOBAL_CONFIG.set(config).unwrap();

    // Open the LMDB database.
    let index = Index::new(options, &opt.database)?;

    // Setup the LMDB based update database.
    let mut update_store_options = EnvOpenOptions::new();
    update_store_options.map_size(opt.update_database_size.get_bytes() as usize);

    let update_store_path = opt.database.join("updates.mdb");
    create_dir_all(&update_store_path)?;

    let (update_status_sender, _) = broadcast::channel(100);
    let update_status_sender_cloned = update_status_sender.clone();
    let index_cloned = index.clone();
    let update_store = UpdateStore::open(
        update_store_options,
        update_store_path,
        // the type hint is necessary: https://github.com/rust-lang/rust/issues/32600
        move |update_id, meta, content: &_| {
            // We prepare the update by using the update builder.

            let before_update = Instant::now();
            // we extract the update type and execute the update itself.
            let result: anyhow::Result<()> = (|| match meta {
                UpdateMeta::DocumentsAddition { method, format, encoding } => {
                    // We must use the write transaction of the update here.
                    let mut wtxn = index_cloned.write_txn()?;
                    let update_method = match method.as_str() {
                        "replace" => IndexDocumentsMethod::ReplaceDocuments,
                        "update" => IndexDocumentsMethod::UpdateDocuments,
                        otherwise => panic!("invalid indexing method {:?}", otherwise),
                    };
                    let indexing_config = IndexDocumentsConfig {
                        update_method,
                        autogenerate_docids: true,
                        ..Default::default()
                    };

                    let indexing_callback = |indexing_step| {
                        let (current, total) = match indexing_step {
                            RemapDocumentAddition { documents_seen } => (documents_seen, None),
                            ComputeIdsAndMergeDocuments { documents_seen, total_documents } => {
                                (documents_seen, Some(total_documents))
                            }
                            IndexDocuments { documents_seen, total_documents } => {
                                (documents_seen, Some(total_documents))
                            }
                            MergeDataIntoFinalDatabase { databases_seen, total_databases } => {
                                (databases_seen, Some(total_databases))
                            }
                        };
                        let _ = update_status_sender_cloned.send(UpdateStatus::Progressing {
                            update_id,
                            meta: UpdateMetaProgress::DocumentsAddition {
                                step: indexing_step.step(),
                                total_steps: indexing_step.number_of_steps(),
                                current,
                                total,
                            },
                        });
                    };

                    let mut builder = milli::update::IndexDocuments::new(
                        &mut wtxn,
                        &index_cloned,
                        GLOBAL_CONFIG.get().unwrap(),
                        indexing_config,
                        indexing_callback,
                    )?;

                    let reader = match encoding.as_deref() {
                        Some("gzip") => Box::new(GzDecoder::new(content)),
                        None => Box::new(content) as Box<dyn io::Read>,
                        otherwise => panic!("invalid encoding format {:?}", otherwise),
                    };

                    let documents = match format.as_str() {
                        "csv" => documents_from_csv(reader)?,
                        "json" => documents_from_json(reader)?,
                        "jsonl" => documents_from_jsonl(reader)?,
                        otherwise => panic!("invalid update format {:?}", otherwise),
                    };

                    let documents = DocumentBatchReader::from_reader(Cursor::new(documents))?;

                    builder.add_documents(documents)?;

                    let result = builder.execute();

                    match result {
                        Ok(_) => wtxn.commit().map_err(Into::into),
                        Err(e) => Err(e.into()),
                    }
                }
                UpdateMeta::ClearDocuments => {
                    // We must use the write transaction of the update here.
                    let mut wtxn = index_cloned.write_txn()?;
                    let builder = ClearDocuments::new(&mut wtxn, &index_cloned);

                    match builder.execute() {
                        Ok(_count) => wtxn.commit().map_err(Into::into),
                        Err(e) => Err(e.into()),
                    }
                }
                UpdateMeta::Settings(settings) => {
                    // We must use the write transaction of the update here.
                    let mut wtxn = index_cloned.write_txn()?;
                    let mut builder = milli::update::Settings::new(
                        &mut wtxn,
                        &index_cloned,
                        GLOBAL_CONFIG.get().unwrap(),
                    );

                    // We transpose the settings JSON struct into a real setting update.
                    match settings.searchable_attributes {
                        Setting::Set(searchable_attributes) => {
                            builder.set_searchable_fields(searchable_attributes)
                        }
                        Setting::Reset => builder.reset_searchable_fields(),
                        Setting::NotSet => (),
                    }

                    // We transpose the settings JSON struct into a real setting update.
                    match settings.displayed_attributes {
                        Setting::Set(displayed_attributes) => {
                            builder.set_displayed_fields(displayed_attributes)
                        }
                        Setting::Reset => builder.reset_displayed_fields(),
                        Setting::NotSet => (),
                    }

                    // We transpose the settings JSON struct into a real setting update.
                    match settings.filterable_attributes {
                        Setting::Set(filterable_attributes) => {
                            builder.set_filterable_fields(filterable_attributes)
                        }
                        Setting::Reset => builder.reset_filterable_fields(),
                        Setting::NotSet => (),
                    }

                    // We transpose the settings JSON struct into a real setting update.
                    match settings.sortable_attributes {
                        Setting::Set(sortable_attributes) => {
                            builder.set_sortable_fields(sortable_attributes)
                        }
                        Setting::Reset => builder.reset_sortable_fields(),
                        Setting::NotSet => (),
                    }

                    // We transpose the settings JSON struct into a real setting update.
                    match settings.criteria {
                        Setting::Set(criteria) => builder.set_criteria(criteria),
                        Setting::Reset => builder.reset_criteria(),
                        Setting::NotSet => (),
                    }

                    // We transpose the settings JSON struct into a real setting update.
                    match settings.stop_words {
                        Setting::Set(stop_words) => builder.set_stop_words(stop_words),
                        Setting::Reset => builder.reset_stop_words(),
                        Setting::NotSet => (),
                    }

                    // We transpose the settings JSON struct into a real setting update.
                    match settings.synonyms {
                        Setting::Set(synonyms) => builder.set_synonyms(synonyms),
                        Setting::Reset => builder.reset_synonyms(),
                        Setting::NotSet => (),
                    }

                    let result = builder.execute(|indexing_step| {
                        let (current, total) = match indexing_step {
                            RemapDocumentAddition { documents_seen } => (documents_seen, None),
                            ComputeIdsAndMergeDocuments { documents_seen, total_documents } => {
                                (documents_seen, Some(total_documents))
                            }
                            IndexDocuments { documents_seen, total_documents } => {
                                (documents_seen, Some(total_documents))
                            }
                            MergeDataIntoFinalDatabase { databases_seen, total_databases } => {
                                (databases_seen, Some(total_databases))
                            }
                        };
                        let _ = update_status_sender_cloned.send(UpdateStatus::Progressing {
                            update_id,
                            meta: UpdateMetaProgress::DocumentsAddition {
                                step: indexing_step.step(),
                                total_steps: indexing_step.number_of_steps(),
                                current,
                                total,
                            },
                        });
                    });

                    match result {
                        Ok(_count) => wtxn.commit().map_err(Into::into),
                        Err(e) => Err(e.into()),
                    }
                }
                UpdateMeta::Facets(levels) => {
                    // We must use the write transaction of the update here.
                    let mut wtxn = index_cloned.write_txn()?;
                    let mut builder = milli::update::Facets::new(&mut wtxn, &index_cloned);
                    if let Some(value) = levels.level_group_size {
                        builder.level_group_size(value);
                    }
                    if let Some(value) = levels.min_level_size {
                        builder.min_level_size(value);
                    }
                    match builder.execute() {
                        Ok(()) => wtxn.commit().map_err(Into::into),
                        Err(e) => Err(e.into()),
                    }
                }
            })();

            let meta = match result {
                Ok(()) => {
                    format!("valid update content processed in {:.02?}", before_update.elapsed())
                }
                Err(e) => format!("error while processing update content: {:?}", e),
            };

            let processed = UpdateStatus::Processed { update_id, meta: meta.clone() };
            let _ = update_status_sender_cloned.send(processed);

            Ok(meta)
        },
    )?;

    // The database name will not change.
    let db_name = opt.database.file_stem().and_then(|s| s.to_str()).unwrap_or("").to_string();
    let lmdb_path = opt.database.join("data.mdb");

    // We run and wait on the HTTP server

    // Expose an HTML page to debug the search in a browser
    let db_name_cloned = db_name.clone();
    let lmdb_path_cloned = lmdb_path.clone();
    let index_cloned = index.clone();
    let dash_html_route =
        warp::filters::method::get().and(warp::filters::path::end()).map(move || {
            // We retrieve the database size.
            let db_size =
                File::open(lmdb_path_cloned.clone()).unwrap().metadata().unwrap().len() as usize;

            // And the number of documents in the database.
            let rtxn = index_cloned.read_txn().unwrap();
            let docs_count = index_cloned.clone().number_of_documents(&rtxn).unwrap() as usize;

            IndexTemplate { db_name: db_name_cloned.clone(), db_size, docs_count }
        });

    let update_store_cloned = update_store.clone();
    let lmdb_path_cloned = lmdb_path.clone();
    let index_cloned = index.clone();
    let updates_list_or_html_route = warp::filters::method::get()
        .and(warp::header("Accept"))
        .and(warp::path!("updates"))
        .map(move |header: String| {
            let update_store = update_store_cloned.clone();
            let mut updates = update_store
                .iter_metas(|processed, aborted, pending| {
                    let mut updates = Vec::<UpdateStatus<_, UpdateMetaProgress, _>>::new();
                    for result in processed {
                        let (uid, meta) = result?;
                        updates.push(UpdateStatus::Processed { update_id: uid.get(), meta });
                    }
                    for result in aborted {
                        let (uid, meta) = result?;
                        updates.push(UpdateStatus::Aborted { update_id: uid.get(), meta });
                    }
                    for result in pending {
                        let (uid, meta) = result?;
                        updates.push(UpdateStatus::Pending { update_id: uid.get(), meta });
                    }
                    Ok(updates)
                })
                .unwrap();

            updates.sort_unstable_by(|s1, s2| s1.update_id().cmp(&s2.update_id()).reverse());

            if header.contains("text/html") {
                // We retrieve the database size.
                let db_size =
                    File::open(lmdb_path_cloned.clone()).unwrap().metadata().unwrap().len()
                        as usize;

                // And the number of documents in the database.
                let rtxn = index_cloned.read_txn().unwrap();
                let docs_count = index_cloned.clone().number_of_documents(&rtxn).unwrap() as usize;

                let template =
                    UpdatesTemplate { db_name: db_name.clone(), db_size, docs_count, updates };
                Box::new(template) as Box<dyn warp::Reply>
            } else {
                Box::new(warp::reply::json(&updates))
            }
        });

    let dash_bulma_route =
        warp::filters::method::get().and(warp::path!("bulma.min.css")).map(|| {
            Response::builder()
                .header("content-type", "text/css; charset=utf-8")
                .body(include_str!("../public/bulma.min.css"))
        });

    let dash_bulma_dark_route =
        warp::filters::method::get().and(warp::path!("bulma-prefers-dark.min.css")).map(|| {
            Response::builder()
                .header("content-type", "text/css; charset=utf-8")
                .body(include_str!("../public/bulma-prefers-dark.min.css"))
        });

    let dash_style_route = warp::filters::method::get().and(warp::path!("style.css")).map(|| {
        Response::builder()
            .header("content-type", "text/css; charset=utf-8")
            .body(include_str!("../public/style.css"))
    });

    let dash_jquery_route =
        warp::filters::method::get().and(warp::path!("jquery-3.4.1.min.js")).map(|| {
            Response::builder()
                .header("content-type", "application/javascript; charset=utf-8")
                .body(include_str!("../public/jquery-3.4.1.min.js"))
        });

    let dash_filesize_route =
        warp::filters::method::get().and(warp::path!("filesize.min.js")).map(|| {
            Response::builder()
                .header("content-type", "application/javascript; charset=utf-8")
                .body(include_str!("../public/filesize.min.js"))
        });

    let dash_script_route = warp::filters::method::get().and(warp::path!("script.js")).map(|| {
        Response::builder()
            .header("content-type", "application/javascript; charset=utf-8")
            .body(include_str!("../public/script.js"))
    });

    let updates_script_route =
        warp::filters::method::get().and(warp::path!("updates-script.js")).map(|| {
            Response::builder()
                .header("content-type", "application/javascript; charset=utf-8")
                .body(include_str!("../public/updates-script.js"))
        });

    let dash_logo_white_route =
        warp::filters::method::get().and(warp::path!("logo-white.svg")).map(|| {
            Response::builder()
                .header("content-type", "image/svg+xml")
                .body(include_str!("../public/logo-white.svg"))
        });

    let dash_logo_black_route =
        warp::filters::method::get().and(warp::path!("logo-black.svg")).map(|| {
            Response::builder()
                .header("content-type", "image/svg+xml")
                .body(include_str!("../public/logo-black.svg"))
        });

    #[derive(Debug, Deserialize)]
    #[serde(untagged)]
    enum UntaggedEither<L, R> {
        Left(L),
        Right(R),
    }

    impl<L, R> From<UntaggedEither<L, R>> for Either<L, R> {
        fn from(value: UntaggedEither<L, R>) -> Either<L, R> {
            match value {
                UntaggedEither::Left(left) => Either::Left(left),
                UntaggedEither::Right(right) => Either::Right(right),
            }
        }
    }

    #[derive(Debug, Deserialize)]
    #[serde(deny_unknown_fields)]
    #[serde(rename_all = "camelCase")]
    struct QueryBody {
        query: Option<String>,
        filters: Option<String>,
        sort: Option<String>,
        facet_filters: Option<Vec<UntaggedEither<Vec<String>, String>>>,
        facet_distribution: Option<bool>,
        limit: Option<usize>,
    }

    #[derive(Debug, Serialize)]
    #[serde(rename_all = "camelCase")]
    struct Answer {
        documents: Vec<Map<String, Value>>,
        number_of_candidates: u64,
        facets: BTreeMap<String, BTreeMap<String, u64>>,
    }

    let disable_highlighting = opt.disable_highlighting;
    let index_cloned = index.clone();
    let query_route = warp::filters::method::post()
        .and(warp::path!("query"))
        .and(warp::body::json())
        .map(move |query: QueryBody| {
            let before_search = Instant::now();
            let index = index_cloned.clone();
            let rtxn = index.read_txn().unwrap();

            let mut search = index.search(&rtxn);
            if let Some(query) = query.query {
                search.query(query);
            }

            let filters = match query.filters.as_ref() {
                Some(condition) if !condition.trim().is_empty() => {
                    MilliFilter::from_str(condition).unwrap()
                }
                _otherwise => None,
            };

            let facet_filters = match query.facet_filters.as_ref() {
                Some(array) => {
                    let eithers = array.iter().map(|either| match either {
                        UntaggedEither::Left(l) => {
                            Either::Left(l.iter().map(|s| s.as_str()).collect::<Vec<&str>>())
                        }
                        UntaggedEither::Right(r) => Either::Right(r.as_str()),
                    });
                    MilliFilter::from_array(eithers).unwrap()
                }
                _otherwise => None,
            };

            let condition = match (filters, facet_filters) {
                (Some(filters), Some(facet_filters)) => Some(FilterCondition::And(
                    Box::new(filters.into()),
                    Box::new(facet_filters.into()),
                )),
                (Some(condition), None) | (None, Some(condition)) => Some(condition.into()),
                _otherwise => None,
            };

            if let Some(condition) = condition {
                search.filter(condition.into());
            }

            if let Some(limit) = query.limit {
                search.limit(limit);
            }

            if let Some(sort) = query.sort {
                search.sort_criteria(vec![sort.parse().map_err(SortError::from).unwrap()]);
            }

            let SearchResult { matching_words, candidates, documents_ids } =
                search.execute().unwrap();

            let number_of_candidates = candidates.len();
            let facets = if query.facet_distribution == Some(true) {
                Some(index.facets_distribution(&rtxn).candidates(candidates).execute().unwrap())
            } else {
                None
            };

            let mut documents = Vec::new();
            let fields_ids_map = index.fields_ids_map(&rtxn).unwrap();
            let displayed_fields = match index.displayed_fields_ids(&rtxn).unwrap() {
                Some(fields) => fields,
                None => fields_ids_map.iter().map(|(id, _)| id).collect(),
            };
            let attributes_to_highlight = match index.searchable_fields(&rtxn).unwrap() {
                Some(fields) => fields.into_iter().map(String::from).collect(),
                None => fields_ids_map.iter().map(|(_, name)| name).map(String::from).collect(),
            };

            let stop_words = fst::Set::default();
            let highlighter = Highlighter::new(&stop_words);

            let mut matcher_builder = MatcherBuilder::from_matching_words(matching_words);
            matcher_builder.highlight_prefix("<mark>".to_string());
            matcher_builder.highlight_suffix("</mark>".to_string());
            for (_id, obkv) in index.documents(&rtxn, documents_ids).unwrap() {
                let mut object = obkv_to_json(&displayed_fields, &fields_ids_map, obkv).unwrap();
                if !disable_highlighting {
                    highlighter.highlight_record(
                        &mut object,
                        &matcher_builder,
                        &attributes_to_highlight,
                    );
                }

                documents.push(object);
            }

            let answer =
                Answer { documents, number_of_candidates, facets: facets.unwrap_or_default() };

            Response::builder()
                .header("Content-Type", "application/json")
                .header("Time-Ms", before_search.elapsed().as_millis().to_string())
                .body(serde_json::to_string(&answer).unwrap())
        });

    let index_cloned = index.clone();
    let document_route = warp::filters::method::get().and(warp::path!("document" / String)).map(
        move |id: String| {
            let index = index_cloned.clone();
            let rtxn = index.read_txn().unwrap();

            let external_documents_ids = index.external_documents_ids(&rtxn).unwrap();
            let fields_ids_map = index.fields_ids_map(&rtxn).unwrap();
            let displayed_fields = match index.displayed_fields_ids(&rtxn).unwrap() {
                Some(fields) => fields,
                None => fields_ids_map.iter().map(|(id, _)| id).collect(),
            };

            match external_documents_ids.get(&id) {
                Some(document_id) => {
                    let document_id = document_id as u32;
                    let (_, obkv) =
                        index.documents(&rtxn, Some(document_id)).unwrap().pop().unwrap();
                    let document = obkv_to_json(&displayed_fields, &fields_ids_map, obkv).unwrap();

                    Response::builder()
                        .header("Content-Type", "application/json")
                        .body(serde_json::to_string(&document).unwrap())
                }
                None => Response::builder()
                    .status(404)
                    .body(format!("Document with id {:?} not found.", id)),
            }
        },
    );

    async fn buf_stream(
        update_store: Arc<UpdateStore<UpdateMeta, String>>,
        update_status_sender: broadcast::Sender<
            UpdateStatus<UpdateMeta, UpdateMetaProgress, String>,
        >,
        update_method: Option<String>,
        format: String,
        encoding: Option<String>,
        mut stream: impl futures::Stream<Item = Result<impl bytes::Buf, warp::Error>> + Unpin,
    ) -> Result<impl warp::Reply, warp::Rejection> {
        let file = tokio::task::block_in_place(tempfile::tempfile).unwrap();
        let mut file = TFile::from_std(file);

        while let Some(result) = stream.next().await {
            let mut bytes = Vec::new();
            result.unwrap().reader().read_to_end(&mut bytes).unwrap();
            file.write_all(&bytes[..]).await.unwrap();
        }

        let file = file.into_std().await;
        let mmap = unsafe { memmap2::Mmap::map(&file).expect("can't map file") };

        let method = match update_method.as_deref() {
            Some("replace") => String::from("replace"),
            Some("update") => String::from("update"),
            _ => String::from("replace"),
        };

        let meta = UpdateMeta::DocumentsAddition { method, format, encoding };
        let update_id = update_store.register_update(&meta, &mmap[..]).unwrap();
        let _ = update_status_sender.send(UpdateStatus::Pending { update_id, meta });
        eprintln!("update {} registered", update_id);

        Ok(warp::reply())
    }

    #[derive(Deserialize)]
    struct QueryUpdate {
        method: Option<String>,
    }

    let update_store_cloned = update_store.clone();
    let update_status_sender_cloned = update_status_sender.clone();
    let indexing_route = warp::filters::method::post()
        .and(warp::path!("documents"))
        .and(warp::header::header("content-type"))
        .and(warp::header::optional::<String>("content-encoding"))
        .and(warp::query::query())
        .and(warp::body::stream())
        .and_then(move |content_type: String, content_encoding, params: QueryUpdate, stream| {
            let format = match content_type.as_str() {
                "text/csv" => "csv",
                "application/json" => "json",
                "application/x-ndjson" => "jsonl",
                otherwise => panic!("invalid update format: {}", otherwise),
            };

            buf_stream(
                update_store_cloned.clone(),
                update_status_sender_cloned.clone(),
                params.method,
                format.to_string(),
                content_encoding,
                stream,
            )
        });

    let update_store_cloned = update_store.clone();
    let update_status_sender_cloned = update_status_sender.clone();
    let clearing_route =
        warp::filters::method::post().and(warp::path!("clear-documents")).map(move || {
            let meta = UpdateMeta::ClearDocuments;
            let update_id = update_store_cloned.register_update(&meta, &[]).unwrap();
            let _ = update_status_sender_cloned.send(UpdateStatus::Pending { update_id, meta });
            eprintln!("update {} registered", update_id);
            Ok(warp::reply())
        });

    let update_store_cloned = update_store.clone();
    let update_status_sender_cloned = update_status_sender.clone();
    let change_settings_route = warp::filters::method::post()
        .and(warp::path!("settings"))
        .and(warp::body::json())
        .map(move |settings: Settings| {
            let meta = UpdateMeta::Settings(settings);
            let update_id = update_store_cloned.register_update(&meta, &[]).unwrap();
            let _ = update_status_sender_cloned.send(UpdateStatus::Pending { update_id, meta });
            eprintln!("update {} registered", update_id);
            Ok(warp::reply())
        });

    let update_store_cloned = update_store.clone();
    let update_status_sender_cloned = update_status_sender.clone();
    let change_facet_levels_route = warp::filters::method::post()
        .and(warp::path!("facet-level-sizes"))
        .and(warp::body::json())
        .map(move |levels: Facets| {
            let meta = UpdateMeta::Facets(levels);
            let update_id = update_store_cloned.register_update(&meta, &[]).unwrap();
            let _ = update_status_sender_cloned.send(UpdateStatus::Pending { update_id, meta });
            eprintln!("update {} registered", update_id);
            warp::reply()
        });

    let update_store_cloned = update_store.clone();
    let update_status_sender_cloned = update_status_sender.clone();
    let abort_update_id_route = warp::filters::method::delete()
        .and(warp::path!("update" / u64))
        .map(move |update_id: u64| {
            if let Some(meta) = update_store_cloned.abort_update(update_id).unwrap() {
                let _ = update_status_sender_cloned.send(UpdateStatus::Aborted { update_id, meta });
                eprintln!("update {} aborted", update_id);
            }
            warp::reply()
        });

    let update_store_cloned = update_store.clone();
    let update_status_sender_cloned = update_status_sender.clone();
    let abort_pending_updates_route =
        warp::filters::method::delete().and(warp::path!("updates")).map(move || {
            let updates = update_store_cloned.abort_pendings().unwrap();
            for (update_id, meta) in updates {
                let _ = update_status_sender_cloned.send(UpdateStatus::Aborted { update_id, meta });
                eprintln!("update {} aborted", update_id);
            }
            warp::reply()
        });

    let update_ws_route =
        warp::ws().and(warp::path!("updates" / "ws")).map(move |ws: warp::ws::Ws| {
            // And then our closure will be called when it completes...
            let update_status_receiver = update_status_sender.subscribe();
            ws.on_upgrade(|websocket| {
                // Just echo all updates messages...
                BroadcastStream::new(update_status_receiver)
                    .flat_map(|result| match result {
                        Ok(status) => {
                            let msg = serde_json::to_string(&status).unwrap();
                            stream::iter(Some(Ok(Message::text(msg))))
                        }
                        Err(e) => {
                            eprintln!("channel error: {:?}", e);
                            stream::iter(None)
                        }
                    })
                    .forward(websocket)
                    .map(|result| {
                        if let Err(e) = result {
                            eprintln!("websocket error: {:?}", e);
                        }
                    })
            })
        });

    let die_route = warp::filters::method::get().and(warp::path!("die")).map(move || {
        eprintln!("Killed by an HTTP request received on the die route");
        std::process::exit(0);
        #[allow(unreachable_code)]
        warp::reply()
    });

    let routes = dash_html_route
        .or(updates_list_or_html_route)
        .or(dash_bulma_route)
        .or(dash_bulma_dark_route)
        .or(dash_style_route)
        .or(dash_jquery_route)
        .or(dash_filesize_route)
        .or(dash_script_route)
        .or(updates_script_route)
        .or(dash_logo_white_route)
        .or(dash_logo_black_route)
        .or(query_route)
        .or(document_route)
        .or(indexing_route)
        .or(abort_update_id_route)
        .or(abort_pending_updates_route)
        .or(clearing_route)
        .or(change_settings_route)
        .or(change_facet_levels_route)
        .or(update_ws_route)
        .or(die_route);

    let addr = SocketAddr::from_str(&opt.http_listen_addr)?;
    warp::serve(routes).run(addr).await;
    Ok(())
}

fn documents_from_jsonl(reader: impl io::Read) -> anyhow::Result<Vec<u8>> {
    let mut writer = Cursor::new(Vec::new());
    let mut documents = milli::documents::DocumentBatchBuilder::new(&mut writer)?;

    for result in BufReader::new(reader).lines() {
        let line = result?;
        documents.extend_from_json(Cursor::new(line))?;
    }

    documents.finish()?;

    Ok(writer.into_inner())
}

fn documents_from_json(reader: impl io::Read) -> anyhow::Result<Vec<u8>> {
    let mut writer = Cursor::new(Vec::new());
    let mut documents = milli::documents::DocumentBatchBuilder::new(&mut writer)?;

    documents.extend_from_json(reader)?;
    documents.finish()?;

    Ok(writer.into_inner())
}

fn documents_from_csv(reader: impl io::Read) -> anyhow::Result<Vec<u8>> {
    let mut writer = Cursor::new(Vec::new());
    milli::documents::DocumentBatchBuilder::from_csv(reader, &mut writer)?.finish()?;

    Ok(writer.into_inner())
}

#[cfg(test)]
mod tests {
    use maplit::{btreeset, hashmap, hashset};
    use milli::update::Setting;
    use serde_test::{assert_tokens, Token};

    use crate::Settings;

    #[test]
    fn serde_settings_set() {
        let settings = Settings {
            displayed_attributes: Setting::Set(vec!["name".to_string()]),
            searchable_attributes: Setting::Set(vec!["age".to_string()]),
            filterable_attributes: Setting::Set(hashset! { "age".to_string() }),
            sortable_attributes: Setting::Set(hashset! { "age".to_string() }),
            criteria: Setting::Set(vec!["age:asc".to_string()]),
            stop_words: Setting::Set(btreeset! { "and".to_string() }),
            synonyms: Setting::Set(hashmap! { "alex".to_string() => vec!["alexey".to_string()] }),
        };

        assert_tokens(
            &settings,
            &[
                Token::Struct { name: "Settings", len: 7 },
                Token::Str("displayedAttributes"),
                Token::Some,
                Token::Seq { len: Some(1) },
                Token::Str("name"),
                Token::SeqEnd,
                Token::Str("searchableAttributes"),
                Token::Some,
                Token::Seq { len: Some(1) },
                Token::Str("age"),
                Token::SeqEnd,
                Token::Str("filterableAttributes"),
                Token::Some,
                Token::Seq { len: Some(1) },
                Token::Str("age"),
                Token::SeqEnd,
                Token::Str("sortableAttributes"),
                Token::Some,
                Token::Seq { len: Some(1) },
                Token::Str("age"),
                Token::SeqEnd,
                Token::Str("criteria"),
                Token::Some,
                Token::Seq { len: Some(1) },
                Token::Str("age:asc"),
                Token::SeqEnd,
                Token::Str("stopWords"),
                Token::Some,
                Token::Seq { len: Some(1) },
                Token::Str("and"),
                Token::SeqEnd,
                Token::Str("synonyms"),
                Token::Some,
                Token::Map { len: Some(1) },
                Token::Str("alex"),
                Token::Seq { len: Some(1) },
                Token::Str("alexey"),
                Token::SeqEnd,
                Token::MapEnd,
                Token::StructEnd,
            ],
        );
    }

    #[test]
    fn serde_settings_reset() {
        let settings = Settings {
            displayed_attributes: Setting::Reset,
            searchable_attributes: Setting::Reset,
            filterable_attributes: Setting::Reset,
            sortable_attributes: Setting::Reset,
            criteria: Setting::Reset,
            stop_words: Setting::Reset,
            synonyms: Setting::Reset,
        };

        assert_tokens(
            &settings,
            &[
                Token::Struct { name: "Settings", len: 7 },
                Token::Str("displayedAttributes"),
                Token::None,
                Token::Str("searchableAttributes"),
                Token::None,
                Token::Str("filterableAttributes"),
                Token::None,
                Token::Str("sortableAttributes"),
                Token::None,
                Token::Str("criteria"),
                Token::None,
                Token::Str("stopWords"),
                Token::None,
                Token::Str("synonyms"),
                Token::None,
                Token::StructEnd,
            ],
        );
    }

    #[test]
    fn serde_settings_notset() {
        let settings = Settings {
            displayed_attributes: Setting::NotSet,
            searchable_attributes: Setting::NotSet,
            filterable_attributes: Setting::NotSet,
            sortable_attributes: Setting::NotSet,
            criteria: Setting::NotSet,
            stop_words: Setting::NotSet,
            synonyms: Setting::NotSet,
        };

        assert_tokens(&settings, &[Token::Struct { name: "Settings", len: 0 }, Token::StructEnd]);
    }
}