diff --git a/http-ui/src/main.rs b/http-ui/src/main.rs
index 9671576f6..e03261641 100644
--- a/http-ui/src/main.rs
+++ b/http-ui/src/main.rs
@@ -2,6 +2,7 @@ use std::borrow::Cow;
 use std::collections::{HashMap, HashSet};
 use std::fs::{File, create_dir_all};
 use std::net::SocketAddr;
+use std::num::NonZeroUsize;
 use std::path::PathBuf;
 use std::str::FromStr;
 use std::sync::Arc;
@@ -27,7 +28,7 @@ use warp::{Filter, http::Response};
 use milli::tokenizer::{simple_tokenizer, TokenType};
 use milli::update::UpdateIndexingStep::*;
-use milli::update::{UpdateBuilder, IndexDocumentsMethod, UpdateFormat};
+use milli::update::{UpdateBuilder, IndexDocumentsMethod, UpdateFormat, EasingName};
 use milli::{obkv_to_json, Index, UpdateStore, SearchResult, FacetCondition};
 
 static GLOBAL_THREAD_POOL: OnceCell<ThreadPool> = OnceCell::new();
@@ -196,6 +197,7 @@ enum UpdateMeta {
     DocumentsAddition { method: String, format: String },
     ClearDocuments,
     Settings(Settings),
+    FacetLevels(FacetLevels),
 }
 
 #[derive(Debug, Clone, Serialize, Deserialize)]
@@ -231,6 +233,15 @@ struct Settings {
     faceted_attributes: Option<HashMap<String, String>>,
 }
 
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(deny_unknown_fields)]
+#[serde(rename_all = "camelCase")]
+struct FacetLevels {
+    last_level_size: Option<NonZeroUsize>,
+    number_of_levels: Option<NonZeroUsize>,
+    easing_function: Option<String>,
+}
+
 // Any value that is present is considered Some value, including null.
 fn deserialize_some<'de, T, D>(deserializer: D) -> Result<Option<T>, D::Error>
 where T: Deserialize<'de>,
@@ -399,6 +410,36 @@ async fn main() -> anyhow::Result<()> {
                         Ok(_count) => wtxn.commit().map_err(Into::into),
                         Err(e) => Err(e.into())
                     }
+                },
+                UpdateMeta::FacetLevels(levels) => {
+                    // We must use the write transaction of the update here.
+                    let mut wtxn = index_cloned.write_txn()?;
+                    let mut builder = update_builder.facet_levels(&mut wtxn, &index_cloned);
+                    if let Some(value) = levels.last_level_size {
+                        builder.last_level_size(value);
+                    }
+                    if let Some(value) = levels.number_of_levels {
+                        builder.number_of_levels(value);
+                    }
+                    if let Some(value) = levels.easing_function {
+                        let easing_name = if value.eq_ignore_ascii_case("expo") {
+                            EasingName::Expo
+                        } else if value.eq_ignore_ascii_case("quart") {
+                            EasingName::Quart
+                        } else if value.eq_ignore_ascii_case("circ") {
+                            EasingName::Circ
+                        } else if value.eq_ignore_ascii_case("linear") {
+                            EasingName::Linear
+                        } else {
+                            panic!("Invalid easing function name")
+                        };
+                        builder.easing_function(easing_name);
+                    }
+
+                    match builder.execute() {
+                        Ok(()) => wtxn.commit().map_err(Into::into),
+                        Err(e) => Err(e.into())
+                    }
                 }
             };
@@ -759,6 +800,19 @@ async fn main() -> anyhow::Result<()> {
             Ok(warp::reply())
         });
 
+    let update_store_cloned = update_store.clone();
+    let update_status_sender_cloned = update_status_sender.clone();
+    let change_facet_levels_route = warp::filters::method::post()
+        .and(warp::path!("facet-levels"))
+        .and(warp::body::json())
+        .map(move |levels: FacetLevels| {
+            let meta = UpdateMeta::FacetLevels(levels);
+            let update_id = update_store_cloned.register_update(&meta, &[]).unwrap();
+            let _ = update_status_sender_cloned.send(UpdateStatus::Pending { update_id, meta });
+            eprintln!("update {} registered", update_id);
+            warp::reply()
+        });
+
     let update_ws_route = warp::ws()
         .and(warp::path!("updates" / "ws"))
         .map(move |ws: warp::ws::Ws| {
@@ -807,6 +861,7 @@ async fn main() -> anyhow::Result<()> {
         .or(indexing_json_stream_route)
        .or(clearing_route)
         .or(change_settings_route)
+        .or(change_facet_levels_route)
         .or(update_ws_route);
 
     let addr = SocketAddr::from_str(&opt.http_listen_addr)?;
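
The new POST /facet-levels route deserializes its JSON body into the FacetLevels struct above: field names are camelCased, every field is optional, and the two size fields must be non-zero because they map to NonZeroUsize. A caller-side sketch of a valid body, built here with serde_json::json! purely for illustration (any HTTP client works, only the body shape matters):

    // Illustrative only: the JSON body the POST /facet-levels route expects.
    let body = serde_json::json!({
        "lastLevelSize": 5,
        "numberOfLevels": 5,
        // Matched case-insensitively against "expo", "quart", "circ" or "linear";
        // any other value makes the update-processing thread panic.
        "easingFunction": "expo"
    });
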
diff --git a/src/update/facet_levels.rs b/src/update/facet_levels.rs
new file mode 100644
index 000000000..bc8f7121f
--- /dev/null
+++ b/src/update/facet_levels.rs
@@ -0,0 +1,247 @@
+use std::fs::File;
+use std::num::NonZeroUsize;
+
+use grenad::{CompressionType, Reader, Writer, FileFuse};
+use heed::types::{ByteSlice, DecodeIgnore};
+use heed::{BytesEncode, Error};
+use itertools::Itertools;
+use log::debug;
+use roaring::RoaringBitmap;
+
+use crate::facet::FacetType;
+use crate::heed_codec::{facet::FacetLevelValueI64Codec, CboRoaringBitmapCodec};
+use crate::Index;
+use crate::update::index_documents::WriteMethod;
+use crate::update::index_documents::{create_writer, writer_into_reader, write_into_lmdb_database};
+
+#[derive(Debug, Copy, Clone)]
+pub enum EasingName {
+    Expo,
+    Quart,
+    Circ,
+    Linear,
+}
+
+pub struct FacetLevels<'t, 'u, 'i> {
+    wtxn: &'t mut heed::RwTxn<'i, 'u>,
+    index: &'i Index,
+    pub(crate) chunk_compression_type: CompressionType,
+    pub(crate) chunk_compression_level: Option<u32>,
+    pub(crate) chunk_fusing_shrink_size: Option<u64>,
+    number_of_levels: NonZeroUsize,
+    last_level_size: NonZeroUsize,
+    easing_function: EasingName,
+}
+
+impl<'t, 'u, 'i> FacetLevels<'t, 'u, 'i> {
+    pub fn new(wtxn: &'t mut heed::RwTxn<'i, 'u>, index: &'i Index) -> FacetLevels<'t, 'u, 'i> {
+        FacetLevels {
+            wtxn,
+            index,
+            chunk_compression_type: CompressionType::None,
+            chunk_compression_level: None,
+            chunk_fusing_shrink_size: None,
+            number_of_levels: NonZeroUsize::new(5).unwrap(),
+            last_level_size: NonZeroUsize::new(5).unwrap(),
+            easing_function: EasingName::Expo,
+        }
+    }
+
+    pub fn number_of_levels(&mut self, value: NonZeroUsize) -> &mut Self {
+        self.number_of_levels = value;
+        self
+    }
+
+    pub fn last_level_size(&mut self, value: NonZeroUsize) -> &mut Self {
+        self.last_level_size = value;
+        self
+    }
+
+    pub fn easing_function(&mut self, value: EasingName) -> &mut Self {
+        self.easing_function = value;
+        self
+    }
+
+    pub fn execute(self) -> anyhow::Result<()> {
+        // We get the faceted fields to be able to create the facet levels.
+        let faceted_fields = self.index.faceted_fields(self.wtxn)?;
+
+        debug!("Computing and writing the facet values levels docids into LMDB on disk...");
+        for (field_id, facet_type) in faceted_fields {
+            if facet_type == FacetType::String { continue }
+
+            clear_field_levels(
+                self.wtxn,
+                self.index.facet_field_id_value_docids,
+                field_id,
+            )?;
+
+            let content = compute_facet_levels(
+                self.wtxn,
+                self.index.facet_field_id_value_docids,
+                self.chunk_compression_type,
+                self.chunk_compression_level,
+                self.chunk_fusing_shrink_size,
+                self.last_level_size,
+                self.number_of_levels,
+                self.easing_function,
+                field_id,
+                facet_type,
+            )?;
+
+            write_into_lmdb_database(
+                self.wtxn,
+                *self.index.facet_field_id_value_docids.as_polymorph(),
+                content,
+                |_, _| anyhow::bail!("invalid facet level merging"),
+                WriteMethod::GetMergePut,
+            )?;
+        }
+
+        Ok(())
+    }
+}
+
+fn clear_field_levels(
+    wtxn: &mut heed::RwTxn,
+    db: heed::Database<ByteSlice, CboRoaringBitmapCodec>,
+    field_id: u8,
+) -> heed::Result<()>
+{
+    let range = (field_id, 1, i64::MIN, i64::MIN)..=(field_id, u8::MAX, i64::MAX, i64::MAX);
+    db.remap_key_type::<FacetLevelValueI64Codec>()
+        .delete_range(wtxn, &range)
+        .map(drop)
+}
+
+fn compute_facet_levels(
+    rtxn: &heed::RoTxn,
+    db: heed::Database<ByteSlice, CboRoaringBitmapCodec>,
+    compression_type: CompressionType,
+    compression_level: Option<u32>,
+    shrink_size: Option<u64>,
+    last_level_size: NonZeroUsize,
+    number_of_levels: NonZeroUsize,
+    easing_function: EasingName,
+    field_id: u8,
+    facet_type: FacetType,
+) -> anyhow::Result<Reader<FileFuse>>
+{
+    let first_level_size = db.prefix_iter(rtxn, &[field_id])?
+        .remap_types::<DecodeIgnore, DecodeIgnore>()
+        .fold(Ok(0usize), |count, result| result.and(count).map(|c| c + 1))?;
+
+    // It is forbidden to keep a cursor and write in a database at the same time with LMDB,
+    // therefore we write the facet levels entries into a grenad file before transferring them.
+    let mut writer = tempfile::tempfile().and_then(|file| {
+        create_writer(compression_type, compression_level, file)
+    })?;
+
+    let level_0_range = (field_id, 0, i64::MIN, i64::MIN)..=(field_id, 0, i64::MAX, i64::MAX);
+    let level_sizes_iter =
+        levels_iterator(first_level_size, last_level_size.get(), number_of_levels.get(), easing_function)
+            .map(|size| (first_level_size as f64 / size as f64).ceil() as usize)
+            .unique()
+            .enumerate()
+            .skip(1);
+
+    // TODO we must not create levels with identical group sizes.
+    for (level, level_entry_sizes) in level_sizes_iter {
+        let mut left = 0;
+        let mut right = 0;
+        let mut group_docids = RoaringBitmap::new();
+
+        dbg!(level, level_entry_sizes, first_level_size);
+
+        let db = db.remap_key_type::<FacetLevelValueI64Codec>();
+        for (i, result) in db.range(rtxn, &level_0_range)?.enumerate() {
+            let ((_field_id, _level, value, _right), docids) = result?;
+
+            if i == 0 {
+                left = value;
+            } else if i % level_entry_sizes == 0 {
+                // We found the first bound of the next group, we must store the left
+                // and right bounds associated with the docids.
+                write_entry(&mut writer, field_id, level as u8, left, right, &group_docids)?;
+
+                // We save the left bound for the new group and also reset the docids.
+                group_docids = RoaringBitmap::new();
+                left = value;
+            }
+
+            // The right bound is always the bound we run through.
+            group_docids.union_with(&docids);
+            right = value;
+        }
+
+        if !group_docids.is_empty() {
+            write_entry(&mut writer, field_id, level as u8, left, right, &group_docids)?;
+        }
+    }
+
+    writer_into_reader(writer, shrink_size)
+}
+
+fn write_entry(
+    writer: &mut Writer<File>,
+    field_id: u8,
+    level: u8,
+    left: i64,
+    right: i64,
+    ids: &RoaringBitmap,
+) -> anyhow::Result<()>
+{
+    let key = (field_id, level, left, right);
+    let key = FacetLevelValueI64Codec::bytes_encode(&key).ok_or(Error::Encoding)?;
+    let data = CboRoaringBitmapCodec::bytes_encode(&ids).ok_or(Error::Encoding)?;
+    writer.insert(&key, &data)?;
+    Ok(())
+}
+
+fn levels_iterator(
+    first_level_size: usize, // biggest level
+    last_level_size: usize, // smallest level
+    number_of_levels: usize,
+    easing_function: EasingName,
+) -> impl Iterator<Item=usize>
+{
+    let easing_function = match easing_function {
+        EasingName::Expo => ease_out_expo,
+        EasingName::Quart => ease_out_quart,
+        EasingName::Circ => ease_out_circ,
+        EasingName::Linear => ease_out_linear,
+    };
+
+    let b = last_level_size as f64;
+    let end = first_level_size as f64;
+    let c = end - b;
+    let d = number_of_levels;
+    (0..=d).map(move |t| ((end + b) - easing_function(t as f64, b, c, d as f64)) as usize)
+}
+
+// Go look at the function definitions here:
+// https://docs.rs/easer/0.2.1/easer/index.html
+// https://easings.net/#easeOutExpo
+fn ease_out_expo(t: f64, b: f64, c: f64, d: f64) -> f64 {
+    if t == d {
+        b + c
+    } else {
+        c * (-2.0_f64.powf(-10.0 * t / d) + 1.0) + b
+    }
+}
+
+// https://easings.net/#easeOutCirc
+fn ease_out_circ(t: f64, b: f64, c: f64, d: f64) -> f64 {
+    let t = t / d - 1.0;
+    c * (1.0 - t * t).sqrt() + b
+}
+
+// https://easings.net/#easeOutQuart
+fn ease_out_quart(t: f64, b: f64, c: f64, d: f64) -> f64 {
+    let t = t / d - 1.0;
+    -c * ((t * t * t * t) - 1.0) + b
+}
+
+fn ease_out_linear(t: f64, b: f64, c: f64, d: f64) -> f64 {
+    c * t / d + b
+}
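
To make the easing-based sizing concrete, here is a self-contained sketch (not crate code) that mirrors levels_iterator and the group-size mapping from compute_facet_levels above, assuming 1000 level-0 entries and the defaults of 5 levels, a last level of size 5, and Expo easing:

    // Standalone illustration of how level sizes and group sizes are derived.
    fn ease_out_expo(t: f64, b: f64, c: f64, d: f64) -> f64 {
        if t == d { b + c } else { c * (-2.0_f64.powf(-10.0 * t / d) + 1.0) + b }
    }

    fn main() {
        let (first, last, levels) = (1000usize, 5usize, 5usize);
        let (b, end) = (last as f64, first as f64);
        let (c, d) = (end - b, levels as f64);
        for t in 0..=levels {
            // The level size shrinks from `first` down to `last` along the easing curve.
            let size = ((end + b) - ease_out_expo(t as f64, b, c, d)) as usize;
            // The group size is the rounded-up number of level-0 entries per written entry.
            let group_size = (first as f64 / size as f64).ceil() as usize;
            println!("t = {}: level size {}, group size {}", t, size, group_size);
        }
    }

In compute_facet_levels these group sizes are then deduplicated with unique(), and skip(1) drops level 0, whose entries are the raw facet values themselves.
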
diff --git a/src/update/index_documents/facet_level.rs b/src/update/index_documents/facet_level.rs
deleted file mode 100644
index 0a87e21d6..000000000
--- a/src/update/index_documents/facet_level.rs
+++ /dev/null
@@ -1,125 +0,0 @@
-use std::fs::File;
-
-use grenad::{CompressionType, Reader, Writer, FileFuse};
-use heed::types::{ByteSlice, DecodeIgnore};
-use heed::{BytesEncode, Error};
-use roaring::RoaringBitmap;
-
-use crate::facet::FacetType;
-use crate::heed_codec::{facet::FacetLevelValueI64Codec, CboRoaringBitmapCodec};
-use crate::update::index_documents::{create_writer, writer_into_reader};
-
-pub fn clear_field_levels(
-    wtxn: &mut heed::RwTxn,
-    db: heed::Database<ByteSlice, CboRoaringBitmapCodec>,
-    field_id: u8,
-) -> heed::Result<()>
-{
-    let range = (field_id, 1, i64::MIN, i64::MIN)..=(field_id, u8::MAX, i64::MAX, i64::MAX);
-    db.remap_key_type::<FacetLevelValueI64Codec>()
-        .delete_range(wtxn, &range)
-        .map(drop)
-}
-
-pub fn compute_facet_levels(
-    rtxn: &heed::RoTxn,
-    db: heed::Database<ByteSlice, CboRoaringBitmapCodec>,
-    compression_type: CompressionType,
-    compression_level: Option<u32>,
-    shrink_size: Option<u64>,
-    field_id: u8,
-    facet_type: FacetType,
-) -> anyhow::Result<Reader<FileFuse>>
-{
-    let last_level_size = 5;
-    let number_of_levels = 5;
-    let first_level_size = db.prefix_iter(rtxn, &[field_id])?
-        .remap_types::<DecodeIgnore, DecodeIgnore>()
-        .fold(Ok(0u64), |count, result| result.and(count).map(|c| c + 1))?;
-
-    // It is forbidden to keep a cursor and write in a database at the same time with LMDB
-    // therefore we write the facet levels entries into a grenad file before transfering them.
-    let mut writer = tempfile::tempfile().and_then(|file| {
-        create_writer(compression_type, compression_level, file)
-    })?;
-
-    let level_0_range = (field_id, 0, i64::MIN, i64::MIN)..=(field_id, 0, i64::MAX, i64::MAX);
-    let level_sizes_iter = levels_iterator(first_level_size, last_level_size, number_of_levels)
-        .enumerate()
-        .skip(1);
-
-    // TODO we must not create levels with identical group sizes.
-    for (level, size) in level_sizes_iter {
-        let level_entry_sizes = (first_level_size as f64 / size as f64).ceil() as usize;
-        let mut left = 0;
-        let mut right = 0;
-        let mut group_docids = RoaringBitmap::new();
-
-        let db = db.remap_key_type::<FacetLevelValueI64Codec>();
-        for (i, result) in db.range(rtxn, &level_0_range)?.enumerate() {
-            let ((_field_id, _level, value, _right), docids) = result?;
-
-            if i == 0 {
-                left = value;
-            } else if i % level_entry_sizes == 0 {
-                // we found the first bound of the next group, we must store the left
-                // and right bounds associated with the docids.
-                write_entry(&mut writer, field_id, level as u8, left, right, &group_docids)?;
-
-                // We save the left bound for the new group and also reset the docids.
-                group_docids = RoaringBitmap::new();
-                left = value;
-            }
-
-            // The right bound is always the bound we run through.
-            group_docids.union_with(&docids);
-            right = value;
-        }
-
-        if !group_docids.is_empty() {
-            write_entry(&mut writer, field_id, level as u8, left, right, &group_docids)?;
-        }
-    }
-
-    writer_into_reader(writer, shrink_size)
-}
-
-fn write_entry(
-    writer: &mut Writer<File>,
-    field_id: u8,
-    level: u8,
-    left: i64,
-    right: i64,
-    ids: &RoaringBitmap,
-) -> anyhow::Result<()>
-{
-    let key = (field_id, level, left, right);
-    let key = FacetLevelValueI64Codec::bytes_encode(&key).ok_or(Error::Encoding)?;
-    let data = CboRoaringBitmapCodec::bytes_encode(&ids).ok_or(Error::Encoding)?;
-    writer.insert(&key, &data)?;
-    Ok(())
-}
-
-fn levels_iterator(
-    first_level_size: u64, // biggest level
-    last_level_size: u64, // smallest level
-    number_of_levels: u64,
-) -> impl Iterator<Item=u64>
-{
-    // Go look at the function definitions here:
-    // https://docs.rs/easer/0.2.1/easer/index.html
-    // https://easings.net/#easeOutExpo
-    fn ease_out_expo(t: f64, b: f64, c: f64, d: f64) -> f64 {
-        if t == d {
-            b + c
-        } else {
-            c * (-2.0_f64.powf(-10.0 * t / d) + 1.0) + b
-        }
-    }
-
-    let b = last_level_size as f64;
-    let end = first_level_size as f64;
-    let c = end - b;
-    let d = number_of_levels;
-    (0..=d).map(move |t| ((end + b) - ease_out_expo(t as f64, b, c, d as f64)) as u64)
-}
diff --git a/src/update/index_documents/mod.rs b/src/update/index_documents/mod.rs
index f6587c3a8..50f5336fc 100644
--- a/src/update/index_documents/mod.rs
+++ b/src/update/index_documents/mod.rs
@@ -2,6 +2,7 @@ use std::borrow::Cow;
 use std::collections::HashSet;
 use std::fs::File;
 use std::io::{self, Seek, SeekFrom};
+use std::num::NonZeroUsize;
 use std::sync::mpsc::sync_channel;
 use std::time::Instant;
 
@@ -14,32 +15,29 @@ use memmap::Mmap;
 use rayon::prelude::*;
 use rayon::ThreadPool;
 
-use crate::facet::FacetType;
 use crate::index::Index;
-use crate::update::UpdateIndexingStep;
+use crate::update::{FacetLevels, UpdateIndexingStep};
 use self::store::{Store, Readers};
 use self::merge_function::{
     main_merge, word_docids_merge, words_pairs_proximities_docids_merge,
     docid_word_positions_merge, documents_merge, facet_field_value_docids_merge,
 };
 pub use self::transform::{Transform, TransformOutput};
-pub use self::facet_level::{clear_field_levels, compute_facet_levels};
 
 use crate::MergeFn;
 use super::UpdateBuilder;
 
-mod facet_level;
 mod merge_function;
 mod store;
 mod transform;
 
 #[derive(Debug, Copy, Clone)]
-enum WriteMethod {
+pub enum WriteMethod {
     Append,
     GetMergePut,
 }
 
-fn create_writer(typ: CompressionType, level: Option<u32>, file: File) -> io::Result<Writer<File>> {
+pub fn create_writer(typ: CompressionType, level: Option<u32>, file: File) -> io::Result<Writer<File>> {
     let mut builder = Writer::builder();
     builder.compression_type(typ);
     if let Some(level) = level {
@@ -48,7 +46,7 @@ fn create_writer(typ: CompressionType, level: Option<u32>, file: File) -> io::Re
     builder.build(file)
 }
 
-fn create_sorter(
+pub fn create_sorter(
     merge: MergeFn,
     chunk_compression_type: CompressionType,
     chunk_compression_level: Option<u32>,
@@ -74,7 +72,7 @@ fn create_sorter(
     builder.build()
 }
 
-fn writer_into_reader(writer: Writer<File>, shrink_size: Option<u64>) -> anyhow::Result<Reader<FileFuse>> {
+pub fn writer_into_reader(writer: Writer<File>, shrink_size: Option<u64>) -> anyhow::Result<Reader<FileFuse>> {
     let mut file = writer.into_inner()?;
     file.seek(SeekFrom::Start(0))?;
     let file = if let Some(shrink_size) = shrink_size {
@@ -85,13 +83,13 @@ fn writer_into_reader(writer: Writer<File>, shrink_size: Option<u64>) -> anyhow:
     Reader::new(file).map_err(Into::into)
 }
 
-fn merge_readers(sources: Vec<Reader<FileFuse>>, merge: MergeFn) -> Merger<FileFuse, MergeFn> {
+pub fn merge_readers(sources: Vec<Reader<FileFuse>>, merge: MergeFn) -> Merger<FileFuse, MergeFn> {
     let mut builder = Merger::builder(merge);
     builder.extend(sources);
     builder.build()
 }
 
-fn merge_into_lmdb_database(
+pub fn merge_into_lmdb_database(
     wtxn: &mut heed::RwTxn,
     database: heed::PolyDatabase,
     sources: Vec<Reader<FileFuse>>,
@@ -135,7 +133,7 @@ fn merge_into_lmdb_database(
     Ok(())
 }
 
-fn write_into_lmdb_database(
+pub fn write_into_lmdb_database(
     wtxn: &mut heed::RwTxn,
     database: heed::PolyDatabase,
     mut reader: Reader<FileFuse>,
@@ -210,6 +208,8 @@ pub struct IndexDocuments<'t, 'u, 'i, 'a> {
     pub(crate) chunk_compression_level: Option<u32>,
     pub(crate) chunk_fusing_shrink_size: Option<u64>,
     pub(crate) thread_pool: Option<&'a ThreadPool>,
+    facet_number_of_levels: Option<NonZeroUsize>,
+    facet_last_level_size: Option<NonZeroUsize>,
     update_method: IndexDocumentsMethod,
     update_format: UpdateFormat,
     autogenerate_docids: bool,
@@ -228,6 +228,8 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> {
             chunk_compression_level: None,
            chunk_fusing_shrink_size: None,
             thread_pool: None,
+            facet_number_of_levels: None,
+            facet_last_level_size: None,
             update_method: IndexDocumentsMethod::ReplaceDocuments,
             update_format: UpdateFormat::Json,
             autogenerate_docids: true,
@@ -478,9 +480,6 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> {
         // We write the external documents ids into the main database.
         self.index.put_external_documents_ids(self.wtxn, &external_documents_ids)?;
 
-        // We get the faceted fields to be able to create the facet levels.
-        let faceted_fields = self.index.faceted_fields(self.wtxn)?;
-
         // We merge the new documents ids with the existing ones.
         documents_ids.union_with(&new_documents_ids);
         documents_ids.union_with(&replaced_documents_ids);
@@ -583,34 +582,17 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> {
             });
         }
 
-        debug!("Computing and writing the facet values levels docids into LMDB on disk...");
-        for (field_id, facet_type) in faceted_fields {
-            if facet_type == FacetType::String { continue }
-
-            clear_field_levels(
-                self.wtxn,
-                self.index.facet_field_id_value_docids,
-                field_id,
-            )?;
-
-            let content = compute_facet_levels(
-                self.wtxn,
-                self.index.facet_field_id_value_docids,
-                chunk_compression_type,
-                chunk_compression_level,
-                chunk_fusing_shrink_size,
-                field_id,
-                facet_type,
-            )?;
-
-            write_into_lmdb_database(
-                self.wtxn,
-                *self.index.facet_field_id_value_docids.as_polymorph(),
-                content,
-                |_, _| anyhow::bail!("invalid facet level merging"),
-                WriteMethod::GetMergePut,
-            )?;
+        let mut builder = FacetLevels::new(self.wtxn, self.index);
+        builder.chunk_compression_type = self.chunk_compression_type;
+        builder.chunk_compression_level = self.chunk_compression_level;
+        builder.chunk_fusing_shrink_size = self.chunk_fusing_shrink_size;
+        if let Some(value) = self.facet_number_of_levels {
+            builder.number_of_levels(value);
         }
+        if let Some(value) = self.facet_last_level_size {
+            builder.last_level_size(value);
+        }
+        builder.execute()?;
 
         debug_assert_eq!(database_count, total_databases);
diff --git a/src/update/mod.rs b/src/update/mod.rs
index 75724269a..87035065c 100644
--- a/src/update/mod.rs
+++ b/src/update/mod.rs
@@ -1,6 +1,7 @@
 mod available_documents_ids;
 mod clear_documents;
 mod delete_documents;
+mod facet_levels;
 mod index_documents;
 mod settings;
 mod update_builder;
@@ -11,6 +12,7 @@ pub use self::available_documents_ids::AvailableDocumentsIds;
 pub use self::clear_documents::ClearDocuments;
 pub use self::delete_documents::DeleteDocuments;
 pub use self::index_documents::{IndexDocuments, IndexDocumentsMethod, UpdateFormat};
+pub use self::facet_levels::{FacetLevels, EasingName};
 pub use self::settings::Settings;
 pub use self::update_builder::UpdateBuilder;
 pub use self::update_step::UpdateIndexingStep;
diff --git a/src/update/update_builder.rs b/src/update/update_builder.rs
index 67ea04bfc..8f7f1a0a8 100644
--- a/src/update/update_builder.rs
+++ b/src/update/update_builder.rs
@@ -2,7 +2,7 @@ use grenad::CompressionType;
 use rayon::ThreadPool;
 
 use crate::Index;
-use super::{ClearDocuments, DeleteDocuments, IndexDocuments, Settings};
+use super::{ClearDocuments, DeleteDocuments, IndexDocuments, Settings, FacetLevels};
 
 pub struct UpdateBuilder<'a> {
     pub(crate) log_every_n: Option<usize>,
@@ -118,4 +118,19 @@ impl<'a> UpdateBuilder<'a> {
 
         builder
     }
+
+    pub fn facet_levels<'t, 'u, 'i>(
+        self,
+        wtxn: &'t mut heed::RwTxn<'i, 'u>,
+        index: &'i Index,
+    ) -> FacetLevels<'t, 'u, 'i>
+    {
+        let mut builder = FacetLevels::new(wtxn, index);
+
+        builder.chunk_compression_type = self.chunk_compression_type;
+        builder.chunk_compression_level = self.chunk_compression_level;
+        builder.chunk_fusing_shrink_size = self.chunk_fusing_shrink_size;
+
+        builder
+    }
 }
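
Taken together, the pieces above make the update reachable from library code through UpdateBuilder::facet_levels. A minimal sketch under stated assumptions (an already-open milli::Index, a configured UpdateBuilder, arbitrary parameter values; the function name is hypothetical):

    use std::num::NonZeroUsize;
    use milli::Index;
    use milli::update::{EasingName, UpdateBuilder};

    fn rebuild_facet_levels(index: &Index, update_builder: UpdateBuilder<'_>) -> anyhow::Result<()> {
        let mut wtxn = index.write_txn()?;
        let mut builder = update_builder.facet_levels(&mut wtxn, index);
        // All three settings are optional; unset ones keep the defaults
        // (5 levels, a last level of size 5, EasingName::Expo).
        builder.number_of_levels(NonZeroUsize::new(6).unwrap());
        builder.last_level_size(NonZeroUsize::new(10).unwrap());
        builder.easing_function(EasingName::Quart);
        builder.execute()?;
        wtxn.commit()?;
        Ok(())
    }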