From a18d9a1f871c5c039b4bb8fa0f4820ebbb0d90a2 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Cl=C3=A9ment=20Renault?=
Date: Fri, 13 Nov 2020 14:49:48 +0100
Subject: [PATCH] Parse and store the faceted fields

---
 Cargo.lock                          |  17 +++
 Cargo.toml                          |   2 +
 src/lib.rs                          |   1 +
 src/update/index_documents/mod.rs   |  55 +++++++--
 src/update/index_documents/store.rs | 183 ++++++++++++++++++++++++++--
 src/update/settings.rs              |  28 +++++
 6 files changed, 262 insertions(+), 24 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 63e30b65c..ddb2e9ec5 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -550,6 +550,12 @@ dependencies = [
  "cfg-if 0.1.10",
 ]
 
+[[package]]
+name = "maplit"
+version = "1.0.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3e2e65a1a2e43cfcb47a895c4c8b10d1f4a61097f9f254f183aee60cad9c651d"
+
 [[package]]
 name = "matches"
 version = "0.1.8"
@@ -608,10 +614,12 @@ dependencies = [
  "levenshtein_automata",
  "linked-hash-map",
  "log",
+ "maplit",
  "memmap",
  "near-proximity",
  "obkv",
  "once_cell",
+ "ordered-float",
  "rayon",
  "ringtail",
  "roaring",
@@ -708,6 +716,15 @@
 version = "11.1.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "a170cebd8021a008ea92e4db85a72f80b35df514ec664b296fdcbb654eac0b2c"
 
+[[package]]
+name = "ordered-float"
+version = "2.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9fe9037165d7023b1228bc4ae9a2fa1a2b0095eca6c2998c624723dfd01314a5"
+dependencies = [
+ "num-traits",
+]
+
 [[package]]
 name = "page_size"
 version = "0.4.2"
diff --git a/Cargo.toml b/Cargo.toml
index 441397275..fd453a5f2 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -23,6 +23,7 @@ memmap = "0.7.0"
 near-proximity = { git = "https://github.com/Kerollmops/plane-sweep-proximity", rev = "6608205" }
 obkv = "0.1.0"
 once_cell = "1.4.0"
+ordered-float = "2.0.0"
 rayon = "1.3.1"
 ringtail = "0.3.0"
 roaring = "0.6.1"
@@ -44,6 +45,7 @@ stderrlog = "0.5.0"
 
 [dev-dependencies]
 criterion = "0.3.3"
+maplit = "1.0.2"
 
 [build-dependencies]
 fst = "0.4.4"
diff --git a/src/lib.rs b/src/lib.rs
index 0e9ee0ec0..808c54a4a 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -34,6 +34,7 @@ pub type FastMap8<K, V> = HashMap<K, V, BuildHasherDefault<FxHasher64>>;
 pub type SmallString32 = smallstr::SmallString<[u8; 32]>;
 pub type SmallVec32<T> = smallvec::SmallVec<[T; 32]>;
 pub type SmallVec16<T> = smallvec::SmallVec<[T; 16]>;
+pub type SmallVec8<T> = smallvec::SmallVec<[T; 8]>;
 pub type BEU32 = heed::zerocopy::U32<heed::byteorder::BE>;
 pub type BEU64 = heed::zerocopy::U64<heed::byteorder::BE>;
 pub type DocumentId = u32;
diff --git a/src/update/index_documents/mod.rs b/src/update/index_documents/mod.rs
index c060b04a1..13b725e19 100644
--- a/src/update/index_documents/mod.rs
+++ b/src/update/index_documents/mod.rs
@@ -16,10 +16,10 @@ use rayon::ThreadPool;
 
 use crate::index::Index;
 use crate::update::UpdateIndexingStep;
-use self::store::Store;
+use self::store::{Store, Readers};
 use self::merge_function::{
     main_merge, word_docids_merge, words_pairs_proximities_docids_merge,
-    docid_word_positions_merge, documents_merge,
+    docid_word_positions_merge, documents_merge, facet_field_value_docids_merge,
 };
 pub use self::transform::{Transform, TransformOutput};
 
@@ -327,6 +327,7 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> {
         enum DatabaseType {
             Main,
             WordDocids,
+            FacetValuesDocids,
         }
 
         let faceted_fields = self.index.faceted_fields(self.wtxn)?;
@@ -386,13 +387,23 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> {
         let mut word_docids_readers = Vec::with_capacity(readers.len());
         let mut docid_word_positions_readers = Vec::with_capacity(readers.len());
         let mut words_pairs_proximities_docids_readers = Vec::with_capacity(readers.len());
+        let mut facet_field_value_docids_readers = Vec::with_capacity(readers.len());
         let mut documents_readers = Vec::with_capacity(readers.len());
         readers.into_iter().for_each(|readers| {
-            main_readers.push(readers.main);
-            word_docids_readers.push(readers.word_docids);
-            docid_word_positions_readers.push(readers.docid_word_positions);
-            words_pairs_proximities_docids_readers.push(readers.words_pairs_proximities_docids);
-            documents_readers.push(readers.documents);
+            let Readers {
+                main,
+                word_docids,
+                docid_word_positions,
+                words_pairs_proximities_docids,
+                facet_field_value_docids,
+                documents
+            } = readers;
+            main_readers.push(main);
+            word_docids_readers.push(word_docids);
+            docid_word_positions_readers.push(docid_word_positions);
+            words_pairs_proximities_docids_readers.push(words_pairs_proximities_docids);
+            facet_field_value_docids_readers.push(facet_field_value_docids);
+            documents_readers.push(documents);
         });
 
         // This is the function that merge the readers
@@ -415,6 +426,11 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> {
             vec![
                 (DatabaseType::Main, main_readers, main_merge as MergeFn),
                 (DatabaseType::WordDocids, word_docids_readers, word_docids_merge),
+                (
+                    DatabaseType::FacetValuesDocids,
+                    facet_field_value_docids_readers,
+                    facet_field_value_docids_merge,
+                ),
             ]
             .into_par_iter()
             .for_each(|(dbtype, readers, merge)| {
@@ -465,9 +481,11 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> {
         self.index.put_documents_ids(self.wtxn, &documents_ids)?;
 
         let mut database_count = 0;
+        let total_databases = 6;
+
         progress_callback(UpdateIndexingStep::MergeDataIntoFinalDatabase {
             databases_seen: 0,
-            total_databases: 5,
+            total_databases,
         });
 
         debug!("Writing the docid word positions into LMDB on disk...");
@@ -482,7 +500,7 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> {
         database_count += 1;
         progress_callback(UpdateIndexingStep::MergeDataIntoFinalDatabase {
             databases_seen: database_count,
-            total_databases: 5,
+            total_databases,
         });
 
         debug!("Writing the documents into LMDB on disk...");
@@ -497,7 +515,7 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> {
         database_count += 1;
         progress_callback(UpdateIndexingStep::MergeDataIntoFinalDatabase {
             databases_seen: database_count,
-            total_databases: 5,
+            total_databases,
         });
 
         debug!("Writing the words pairs proximities docids into LMDB on disk...");
@@ -512,7 +530,7 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> {
         database_count += 1;
         progress_callback(UpdateIndexingStep::MergeDataIntoFinalDatabase {
             databases_seen: database_count,
-            total_databases: 5,
+            total_databases,
         });
 
         for (db_type, result) in receiver {
@@ -539,16 +557,27 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> {
                         write_method,
                     )?;
                 },
+                DatabaseType::FacetValuesDocids => {
+                    debug!("Writing the facet values docids into LMDB on disk...");
+                    let db = *self.index.facet_field_id_value_docids.as_polymorph();
+                    write_into_lmdb_database(
+                        self.wtxn,
+                        db,
+                        content,
+                        facet_field_value_docids_merge,
+                        write_method,
+                    )?;
+                },
             }
 
             database_count += 1;
             progress_callback(UpdateIndexingStep::MergeDataIntoFinalDatabase {
                 databases_seen: database_count,
-                total_databases: 5,
+                total_databases,
             });
         }
 
-        debug_assert_eq!(database_count, 5);
+        debug_assert_eq!(database_count, total_databases);
 
         info!("Transform output indexed in {:.02?}", before_indexing.elapsed());
 
diff --git a/src/update/index_documents/store.rs b/src/update/index_documents/store.rs
index e9548eec3..3182e2ccd 100644
--- a/src/update/index_documents/store.rs
+++ b/src/update/index_documents/store.rs
@@ -1,3 +1,4 @@
+use std::borrow::Cow;
 use std::collections::{BTreeMap, HashMap, HashSet};
 use std::convert::{TryFrom, TryInto};
 use std::fs::File;
@@ -5,23 +6,29 @@ use std::iter::FromIterator;
 use std::time::Instant;
 use std::{cmp, iter};
 
-use anyhow::Context;
+use anyhow::{bail, Context};
 use bstr::ByteSlice as _;
+use grenad::{Reader, FileFuse, Writer, Sorter, CompressionType};
 use heed::BytesEncode;
 use linked_hash_map::LinkedHashMap;
 use log::{debug, info};
-use grenad::{Reader, FileFuse, Writer, Sorter, CompressionType};
+use ordered_float::OrderedFloat;
 use roaring::RoaringBitmap;
+use serde_json::Value;
 use tempfile::tempfile;
 
 use crate::facet::FacetType;
 use crate::heed_codec::{BoRoaringBitmapCodec, CboRoaringBitmapCodec};
+use crate::heed_codec::facet::{FacetValueStringCodec, FacetValueF64Codec, FacetValueI64Codec};
 use crate::tokenizer::{simple_tokenizer, only_token};
 use crate::update::UpdateIndexingStep;
-use crate::{json_to_string, SmallVec32, Position, DocumentId};
+use crate::{json_to_string, SmallVec8, SmallVec32, SmallString32, Position, DocumentId};
 
 use super::{MergeFn, create_writer, create_sorter, writer_into_reader};
-use super::merge_function::{main_merge, word_docids_merge, words_pairs_proximities_docids_merge};
+use super::merge_function::{
+    main_merge, word_docids_merge, words_pairs_proximities_docids_merge,
+    facet_field_value_docids_merge,
+};
 
 const LMDB_MAX_KEY_LENGTH: usize = 511;
 const ONE_KILOBYTE: usize = 1024 * 1024;
@@ -34,6 +41,7 @@ pub struct Readers {
     pub word_docids: Reader<FileFuse>,
     pub docid_word_positions: Reader<FileFuse>,
     pub words_pairs_proximities_docids: Reader<FileFuse>,
+    pub facet_field_value_docids: Reader<FileFuse>,
     pub documents: Reader<FileFuse>,
 }
 
@@ -46,6 +54,8 @@ pub struct Store {
     word_docids_limit: usize,
     words_pairs_proximities_docids: LinkedHashMap<(SmallVec32<u8>, SmallVec32<u8>, u8), RoaringBitmap>,
     words_pairs_proximities_docids_limit: usize,
+    facet_field_value_docids: LinkedHashMap<(u8, FacetValue), RoaringBitmap>,
+    facet_field_value_docids_limit: usize,
     // MTBL parameters
     chunk_compression_type: CompressionType,
     chunk_compression_level: Option<u32>,
@@ -54,6 +64,7 @@ pub struct Store {
     main_sorter: Sorter<MergeFn>,
     word_docids_sorter: Sorter<MergeFn>,
     words_pairs_proximities_docids_sorter: Sorter<MergeFn>,
+    facet_field_value_docids_sorter: Sorter<MergeFn>,
     // MTBL writers
     docid_word_positions_writer: Writer<File>,
    documents_writer: Writer<File>,
@@ -72,7 +83,7 @@ impl Store {
     ) -> anyhow::Result<Store>
     {
         // We divide the max memory by the number of sorter the Store have.
-        let max_memory = max_memory.map(|mm| cmp::max(ONE_KILOBYTE, mm / 3));
+        let max_memory = max_memory.map(|mm| cmp::max(ONE_KILOBYTE, mm / 4));
         let linked_hash_map_size = linked_hash_map_size.unwrap_or(500);
 
         let main_sorter = create_sorter(
@@ -99,6 +110,14 @@ impl Store {
             max_nb_chunks,
             max_memory,
         );
+        let facet_field_value_docids_sorter = create_sorter(
+            facet_field_value_docids_merge,
+            chunk_compression_type,
+            chunk_compression_level,
+            chunk_fusing_shrink_size,
+            max_nb_chunks,
+            max_memory,
+        );
 
         let documents_writer = tempfile().and_then(|f| {
             create_writer(chunk_compression_type, chunk_compression_level, f)
@@ -116,6 +135,8 @@ impl Store {
             word_docids_limit: linked_hash_map_size,
             words_pairs_proximities_docids: LinkedHashMap::with_capacity(linked_hash_map_size),
             words_pairs_proximities_docids_limit: linked_hash_map_size,
+            facet_field_value_docids: LinkedHashMap::with_capacity(linked_hash_map_size),
+            facet_field_value_docids_limit: linked_hash_map_size,
             // MTBL parameters
             chunk_compression_type,
             chunk_compression_level,
@@ -124,6 +145,7 @@ impl Store {
             main_sorter,
             word_docids_sorter,
             words_pairs_proximities_docids_sorter,
+            facet_field_value_docids_sorter,
             // MTBL writers
             docid_word_positions_writer,
             documents_writer,
@@ -151,6 +173,35 @@ impl Store {
         Ok(())
     }
 
+    // Save the documents ids under the facet field id and value we have seen them in.
+    fn insert_facet_values_docid(
+        &mut self,
+        field_id: u8,
+        field_value: FacetValue,
+        id: DocumentId,
+    ) -> anyhow::Result<()>
+    {
+        let key = (field_id, field_value);
+        // If get_refresh finds the element it is assured to be at the end of the linked hash map.
+        match self.facet_field_value_docids.get_refresh(&key) {
+            Some(old) => { old.insert(id); },
+            None => {
+                // A newly inserted element is appended at the end of the linked hash map.
+                self.facet_field_value_docids.insert(key, RoaringBitmap::from_iter(Some(id)));
+                // If the facet field value docids map just reached its capacity we must make sure
+                // to remove one element, this way the next insertion will not grow the capacity.
+                if self.facet_field_value_docids.len() == self.facet_field_value_docids_limit {
+                    // Removing the front element is equivalent to removing the LRU element.
+                    Self::write_docid_facet_field_values(
+                        &mut self.facet_field_value_docids_sorter,
+                        self.facet_field_value_docids.pop_front(),
+                    )?;
+                }
+            }
+        }
+        Ok(())
+    }
+
     // Save the documents ids under the words pairs proximities that it contains.
     fn insert_words_pairs_proximities_docids<'a>(
         &mut self,
@@ -191,7 +242,8 @@ impl Store {
     fn write_document(
         &mut self,
         document_id: DocumentId,
-        words_positions: &HashMap<String, SmallVec32<Position>>,
+        words_positions: &mut HashMap<String, SmallVec32<Position>>,
+        facet_values: &mut HashMap<u8, SmallVec8<FacetValue>>,
         record: &[u8],
     ) -> anyhow::Result<()>
     {
@@ -200,13 +252,20 @@ impl Store {
         self.insert_words_pairs_proximities_docids(words_pair_proximities, document_id)?;
 
         // We store document_id associated with all the words the record contains.
-        for (word, _) in words_positions {
-            self.insert_word_docid(word, document_id)?;
+        for (word, _) in words_positions.drain() {
+            self.insert_word_docid(&word, document_id)?;
         }
 
         self.documents_writer.insert(document_id.to_be_bytes(), record)?;
         Self::write_docid_word_positions(&mut self.docid_word_positions_writer, document_id, words_positions)?;
 
+        // We store the document_id associated with all the field ids and values.
+        for (field, values) in facet_values.drain() {
+            for value in values {
+                self.insert_facet_values_docid(field, value, document_id)?;
+            }
+        }
+
         Ok(())
     }
 
@@ -267,6 +326,31 @@ impl Store {
         Ok(())
     }
 
+    fn write_docid_facet_field_values<I>(
+        sorter: &mut Sorter<MergeFn>,
+        iter: I,
+    ) -> anyhow::Result<()>
+    where I: IntoIterator<Item=((u8, FacetValue), RoaringBitmap)>
+    {
+        use FacetValue::*;
+
+        for ((field_id, value), docids) in iter {
+            let result = match value {
+                String(s) => FacetValueStringCodec::bytes_encode(&(field_id, &s)).map(Cow::into_owned),
+                Float(f) => FacetValueF64Codec::bytes_encode(&(field_id, *f)).map(Cow::into_owned),
+                Integer(i) => FacetValueI64Codec::bytes_encode(&(field_id, i)).map(Cow::into_owned),
+            };
+            let key = result.context("could not serialize facet key")?;
+            let bytes = CboRoaringBitmapCodec::bytes_encode(&docids)
+                .context("could not serialize docids")?;
+            if lmdb_key_valid_size(&key) {
+                sorter.insert(&key, &bytes)?;
+            }
+        }
+
+        Ok(())
+    }
+
     fn write_word_docids<I>(sorter: &mut Sorter<MergeFn>, iter: I) -> anyhow::Result<()>
     where I: IntoIterator<Item=(SmallVec32<u8>, RoaringBitmap)>
     {
@@ -305,6 +389,7 @@ impl Store {
         let mut before = Instant::now();
 
         let mut words_positions = HashMap::new();
+        let mut facet_values = HashMap::new();
         let mut count: usize = 0;
 
         while let Some((key, value)) = documents.next()? {
@@ -328,7 +413,10 @@ impl Store {
                 let value = serde_json::from_slice(content)?;
 
                 if let Some(ftype) = self.faceted_fields.get(&attr) {
-                    todo!("parse facet field value")
+                    let mut values = parse_facet_value(*ftype, &value).with_context(|| {
+                        format!("extracting facets from the value {}", value)
+                    })?;
+                    facet_values.entry(attr).or_insert_with(SmallVec8::new).extend(values.drain(..));
                 }
 
                 if self.searchable_fields.contains(&attr) {
@@ -348,8 +436,7 @@ impl Store {
             }
 
             // We write the document in the documents store.
-            self.write_document(document_id, &words_positions, value)?;
-            words_positions.clear();
+            self.write_document(document_id, &mut words_positions, &mut facet_values, value)?;
         }
 
         // Compute the document id of the next document.
@@ -376,6 +463,10 @@ impl Store {
             &mut self.words_pairs_proximities_docids_sorter,
             self.words_pairs_proximities_docids,
         )?;
+        Self::write_docid_facet_field_values(
+            &mut self.facet_field_value_docids_sorter,
+            self.facet_field_value_docids,
+        )?;
 
         let mut word_docids_wtr = tempfile().and_then(|f| create_writer(comp_type, comp_level, f))?;
         let mut builder = fst::SetBuilder::memory();
@@ -397,9 +488,13 @@ impl Store {
         let mut words_pairs_proximities_docids_wtr = tempfile().and_then(|f| create_writer(comp_type, comp_level, f))?;
         self.words_pairs_proximities_docids_sorter.write_into(&mut words_pairs_proximities_docids_wtr)?;
 
+        let mut facet_field_value_docids_wtr = tempfile().and_then(|f| create_writer(comp_type, comp_level, f))?;
+        self.facet_field_value_docids_sorter.write_into(&mut facet_field_value_docids_wtr)?;
+
         let main = writer_into_reader(main_wtr, shrink_size)?;
         let word_docids = writer_into_reader(word_docids_wtr, shrink_size)?;
         let words_pairs_proximities_docids = writer_into_reader(words_pairs_proximities_docids_wtr, shrink_size)?;
+        let facet_field_value_docids = writer_into_reader(facet_field_value_docids_wtr, shrink_size)?;
         let docid_word_positions = writer_into_reader(self.docid_word_positions_writer, shrink_size)?;
         let documents = writer_into_reader(self.documents_writer, shrink_size)?;
 
@@ -408,6 +503,7 @@ impl Store {
             word_docids,
             docid_word_positions,
             words_pairs_proximities_docids,
+            facet_field_value_docids,
             documents,
         })
     }
@@ -453,3 +549,68 @@ fn format_count(n: usize) -> String {
 fn lmdb_key_valid_size(key: &[u8]) -> bool {
     !key.is_empty() && key.len() <= LMDB_MAX_KEY_LENGTH
 }
+
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+enum FacetValue {
+    String(SmallString32),
+    Float(OrderedFloat<f64>),
+    Integer(i64),
+}
+
+fn parse_facet_value(ftype: FacetType, value: &Value) -> anyhow::Result<SmallVec8<FacetValue>> {
+    use FacetValue::*;
+
+    fn inner_parse_facet_value(
+        ftype: FacetType,
+        value: &Value,
+        can_recurse: bool,
+        output: &mut SmallVec8<FacetValue>,
+    ) -> anyhow::Result<()>
+    {
+        match value {
+            Value::Null => Ok(()),
+            Value::Bool(b) => Ok(output.push(Integer(*b as i64))),
+            Value::Number(number) => match ftype {
+                FacetType::String => bail!("invalid facet type, expecting {} found number", ftype),
+                FacetType::Float => match number.as_f64() {
+                    Some(float) => Ok(output.push(Float(OrderedFloat(float)))),
+                    None => bail!("invalid facet type, expecting {} found integer", ftype),
+                },
+                FacetType::Integer => match number.as_i64() {
+                    Some(integer) => Ok(output.push(Integer(integer))),
+                    None => if number.is_f64() {
+                        bail!("invalid facet type, expecting {} found float", ftype)
+                    } else {
+                        bail!("invalid facet type, expecting {} found out-of-bound integer (64bit)", ftype)
+                    },
+                },
+            },
+            Value::String(string) => {
+                match ftype {
+                    FacetType::String => {
+                        let string = SmallString32::from(string.as_str());
+                        Ok(output.push(String(string)))
+                    },
+                    FacetType::Float => match string.parse() {
+                        Ok(float) => Ok(output.push(Float(OrderedFloat(float)))),
+                        Err(_err) => bail!("invalid facet type, expecting {} found string", ftype),
+                    },
+                    FacetType::Integer => match string.parse() {
+                        Ok(integer) => Ok(output.push(Integer(integer))),
+                        Err(_err) => bail!("invalid facet type, expecting {} found string", ftype),
+                    },
+                }
+            },
+            Value::Array(values) => if can_recurse {
+                values.iter().map(|v| inner_parse_facet_value(ftype, v, false, output)).collect()
+            } else {
+                bail!("invalid facet type, expecting {} found sub-array", ftype)
+            },
+            Value::Object(_) => bail!("invalid facet type, expecting {} found object", ftype),
+        }
+    }
+
+    let mut facet_values = SmallVec8::new();
+    inner_parse_facet_value(ftype, value, true, &mut facet_values)?;
+    Ok(facet_values)
+}
diff --git a/src/update/settings.rs b/src/update/settings.rs
index d5ffba23b..de165926f 100644
--- a/src/update/settings.rs
+++ b/src/update/settings.rs
@@ -306,6 +306,7 @@ mod tests {
     use super::*;
     use crate::update::{IndexDocuments, UpdateFormat};
     use heed::EnvOpenOptions;
+    use maplit::hashmap;
 
     #[test]
     fn set_and_reset_searchable_fields() {
@@ -473,4 +474,31 @@ mod tests {
         assert_eq!(fields_ids, None);
         drop(rtxn);
     }
+
+    #[test]
+    fn set_faceted_fields() {
+        let path = tempfile::tempdir().unwrap();
+        let mut options = EnvOpenOptions::new();
+        options.map_size(10 * 1024 * 1024); // 10 MB
+        let index = Index::new(options, &path).unwrap();
+
+        // Set the faceted fields to be the age.
+        let mut wtxn = index.write_txn().unwrap();
+        let mut builder = Settings::new(&mut wtxn, &index);
+        builder.set_faceted_fields(hashmap!{ "age".into() => "integer".into() });
+        builder.execute(|_| ()).unwrap();
+
+        // Then index some documents.
+        let content = &b"name,age\nkevin,23\nkevina,21\nbenoit,34\n"[..];
+        let mut builder = IndexDocuments::new(&mut wtxn, &index);
+        builder.update_format(UpdateFormat::Csv);
+        builder.execute(content, |_| ()).unwrap();
+        wtxn.commit().unwrap();
+
+        // Check that the faceted fields are correctly set.
+        let rtxn = index.read_txn().unwrap();
+        let fields_ids = index.faceted_fields(&rtxn).unwrap();
+        assert_eq!(fields_ids, hashmap!{ 0 => FacetType::Integer });
+        drop(rtxn);
+    }
 }
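
Note, not part of the patch: the snippet below is a minimal, self-contained sketch of the parsing rules that parse_facet_value implements in store.rs. It uses simplified stand-ins (plain String and Vec instead of the crate's SmallString32 and SmallVec8, a String error type instead of anyhow) and assumes only the serde_json and ordered-float crates.

// Sketch of the facet parsing rules: nulls are skipped, booleans become
// 0/1 integers, numbers and stringified numbers are checked against the
// declared facet type, and arrays are flattened exactly one level deep.
use ordered_float::OrderedFloat;
use serde_json::{json, Value};

#[derive(Debug, Clone, Copy)]
enum FacetType { String, Float, Integer }

// OrderedFloat makes f64 values Eq + Hash, which is why the patch can use
// parsed facet values as LinkedHashMap keys in the Store.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum FacetValue {
    String(String),
    Float(OrderedFloat<f64>),
    Integer(i64),
}

fn parse(ftype: FacetType, value: &Value, can_recurse: bool, out: &mut Vec<FacetValue>) -> Result<(), String> {
    match value {
        Value::Null => Ok(()),
        Value::Bool(b) => Ok(out.push(FacetValue::Integer(*b as i64))),
        Value::Number(n) => match ftype {
            FacetType::Float => n.as_f64()
                .map(|f| out.push(FacetValue::Float(OrderedFloat(f))))
                .ok_or_else(|| "expected a float".to_string()),
            FacetType::Integer => n.as_i64()
                .map(|i| out.push(FacetValue::Integer(i)))
                .ok_or_else(|| "expected an integer".to_string()),
            FacetType::String => Err("expected a string, found a number".to_string()),
        },
        Value::String(s) => match ftype {
            FacetType::String => Ok(out.push(FacetValue::String(s.clone()))),
            // Numbers encoded as JSON strings are accepted for numeric facets.
            FacetType::Float => s.parse()
                .map(|f| out.push(FacetValue::Float(OrderedFloat(f))))
                .map_err(|e| e.to_string()),
            FacetType::Integer => s.parse()
                .map(|i| out.push(FacetValue::Integer(i)))
                .map_err(|e| e.to_string()),
        },
        // Only one level of arrays is allowed: recursion is disabled below.
        Value::Array(values) if can_recurse => {
            values.iter().try_for_each(|v| parse(ftype, v, false, out))
        },
        _ => Err("nested arrays and objects are rejected".to_string()),
    }
}

fn main() {
    let mut out = Vec::new();
    parse(FacetType::Integer, &json!([21, "34", true, null]), true, &mut out).unwrap();
    // Prints: [Integer(21), Integer(34), Integer(1)]
    println!("{:?}", out);
}

Under these assumptions the test document from settings.rs, an "age" field declared as an integer facet, parses whether the CSV value arrives as a JSON number or as a string.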