diff --git a/milli/src/update/index_documents/extract/mod.rs b/milli/src/update/index_documents/extract/mod.rs index 18340a3ae..7fa2fa237 100644 --- a/milli/src/update/index_documents/extract/mod.rs +++ b/milli/src/update/index_documents/extract/mod.rs @@ -8,6 +8,7 @@ mod extract_vector_points; mod extract_word_docids; mod extract_word_pair_proximity_docids; mod extract_word_position_docids; +// mod searchable; use std::fs::File; use std::io::BufReader; diff --git a/milli/src/update/index_documents/extract/searchable/field_word_position.rs b/milli/src/update/index_documents/extract/searchable/field_word_position.rs new file mode 100644 index 000000000..6a4e6f646 --- /dev/null +++ b/milli/src/update/index_documents/extract/searchable/field_word_position.rs @@ -0,0 +1,211 @@ +use std::collections::HashMap; + +use charabia::normalizer::NormalizedTokenIter; +use charabia::{Language, Script, SeparatorKind, Token, TokenKind, Tokenizer, TokenizerBuilder}; +use roaring::RoaringBitmap; +use serde_json::Value; + +use crate::update::settings::InnerIndexSettings; +use crate::{InternalError, Result, MAX_POSITION_PER_ATTRIBUTE, MAX_WORD_LENGTH}; + +pub type ScriptLanguageDocidsMap = HashMap<(Script, Language), (RoaringBitmap, RoaringBitmap)>; + +pub struct FieldWordPositionExtractorBuilder<'a> { + max_positions_per_attributes: u16, + stop_words: Option<&'a fst::Set>>, + separators: Option>, + dictionary: Option>, +} + +impl<'a> FieldWordPositionExtractorBuilder<'a> { + pub fn new( + max_positions_per_attributes: Option, + settings: &'a InnerIndexSettings, + ) -> Result { + let stop_words = settings.stop_words.as_ref(); + let separators: Option> = + settings.allowed_separators.as_ref().map(|s| s.iter().map(String::as_str).collect()); + let dictionary: Option> = + settings.dictionary.as_ref().map(|s| s.iter().map(String::as_str).collect()); + Ok(Self { + max_positions_per_attributes: max_positions_per_attributes + .map_or(MAX_POSITION_PER_ATTRIBUTE as u16, |max| { + max.min(MAX_POSITION_PER_ATTRIBUTE) as u16 + }), + stop_words, + separators, + dictionary, + }) + } + + pub fn build(&'a self) -> FieldWordPositionExtractor<'a> { + let builder = tokenizer_builder( + self.stop_words, + self.separators.as_deref(), + self.dictionary.as_deref(), + None, + ); + + FieldWordPositionExtractor { + tokenizer: builder.into_tokenizer(), + max_positions_per_attributes: self.max_positions_per_attributes, + } + } +} + +pub struct FieldWordPositionExtractor<'a> { + tokenizer: Tokenizer<'a>, + max_positions_per_attributes: u16, +} + +impl<'a> FieldWordPositionExtractor<'a> { + pub fn extract<'b>( + &'a self, + field_bytes: &[u8], + buffer: &'b mut String, + ) -> Result> { + let field_value = serde_json::from_slice(field_bytes).map_err(InternalError::SerdeJson)?; + Ok(ExtractedFieldWordPosition { + tokenizer: &self.tokenizer, + max_positions_per_attributes: self.max_positions_per_attributes, + field_value, + buffer: buffer, + }) + } +} + +pub struct ExtractedFieldWordPosition<'a, 'b> { + tokenizer: &'a Tokenizer<'a>, + max_positions_per_attributes: u16, + field_value: Value, + buffer: &'b mut String, +} + +impl<'a> ExtractedFieldWordPosition<'a, '_> { + pub fn iter<'o>(&'o mut self) -> FieldWordPositionIter<'o> { + self.buffer.clear(); + let inner = match json_to_string(&self.field_value, &mut self.buffer) { + Some(field) => Some(self.tokenizer.tokenize(field)), + None => None, + }; + + // create an iterator of token with their positions. + FieldWordPositionIter { + inner, + max_positions_per_attributes: self.max_positions_per_attributes, + position: 0, + prev_kind: None, + } + } +} + +pub struct FieldWordPositionIter<'a> { + inner: Option>, + max_positions_per_attributes: u16, + position: u16, + prev_kind: Option, +} + +impl<'a> Iterator for FieldWordPositionIter<'a> { + type Item = (u16, Token<'a>); + + fn next(&mut self) -> Option { + if self.position >= self.max_positions_per_attributes { + return None; + } + + let token = self.inner.as_mut().map(|i| i.next()).flatten()?; + + match token.kind { + TokenKind::Word | TokenKind::StopWord if !token.lemma().is_empty() => { + self.position += match self.prev_kind { + Some(TokenKind::Separator(SeparatorKind::Hard)) => 8, + Some(_) => 1, + None => 0, + }; + self.prev_kind = Some(token.kind) + } + TokenKind::Separator(_) if self.position == 0 => { + return self.next(); + } + TokenKind::Separator(SeparatorKind::Hard) => { + self.prev_kind = Some(token.kind); + } + TokenKind::Separator(SeparatorKind::Soft) + if self.prev_kind != Some(TokenKind::Separator(SeparatorKind::Hard)) => + { + self.prev_kind = Some(token.kind); + } + _ => return self.next(), + } + + if !token.is_word() { + return self.next(); + } + + // keep a word only if it is not empty and fit in a LMDB key. + let lemma = token.lemma().trim(); + if !lemma.is_empty() && lemma.len() <= MAX_WORD_LENGTH { + Some((self.position, token)) + } else { + self.next() + } + } +} + +/// Factorize tokenizer building. +pub fn tokenizer_builder<'a>( + stop_words: Option<&'a fst::Set>>, + allowed_separators: Option<&'a [&str]>, + dictionary: Option<&'a [&str]>, + script_language: Option<&'a HashMap>>, +) -> TokenizerBuilder<'a, Vec> { + let mut tokenizer_builder = TokenizerBuilder::new(); + if let Some(stop_words) = stop_words { + tokenizer_builder.stop_words(stop_words); + } + if let Some(dictionary) = dictionary { + tokenizer_builder.words_dict(dictionary); + } + if let Some(separators) = allowed_separators { + tokenizer_builder.separators(separators); + } + + if let Some(script_language) = script_language { + tokenizer_builder.allow_list(script_language); + } + + tokenizer_builder +} + +/// Transform a JSON value into a string that can be indexed. +fn json_to_string<'a>(value: &'a Value, buffer: &'a mut String) -> Option<&'a str> { + fn inner(value: &Value, output: &mut String) -> bool { + use std::fmt::Write; + match value { + Value::Null | Value::Object(_) => false, + Value::Bool(boolean) => write!(output, "{}", boolean).is_ok(), + Value::Number(number) => write!(output, "{}", number).is_ok(), + Value::String(string) => write!(output, "{}", string).is_ok(), + Value::Array(array) => { + let mut count = 0; + for value in array { + if inner(value, output) { + output.push_str(". "); + count += 1; + } + } + // check that at least one value was written + count != 0 + } + } + } + + if let Value::String(string) = value { + Some(string) + } else if inner(value, buffer) { + Some(buffer) + } else { + None + } +} diff --git a/milli/src/update/index_documents/extract/searchable/mod.rs b/milli/src/update/index_documents/extract/searchable/mod.rs new file mode 100644 index 000000000..f26ed06b3 --- /dev/null +++ b/milli/src/update/index_documents/extract/searchable/mod.rs @@ -0,0 +1,114 @@ +use std::collections::{BTreeMap, BTreeSet}; +use std::convert::TryInto; +use std::fs::File; +use std::io; +use std::io::BufReader; + +use field_word_position::FieldWordPositionExtractorBuilder; +use obkv::KvReader; +use roaring::RoaringBitmap; +use word_docids::{WordDocidsDump, WordDocidsExtractor}; + +use crate::update::del_add::{DelAdd, KvReaderDelAdd}; +use crate::update::index_documents::extract::extract_docid_word_positions::ScriptLanguageDocidsMap; +use crate::update::index_documents::GrenadParameters; +use crate::update::settings::InnerIndexSettingsDiff; +use crate::{FieldId, Result, SerializationError}; + +mod field_word_position; +mod word_docids; + +#[tracing::instrument(level = "trace", skip_all, target = "indexing::extract")] +pub fn extract_searchable_data( + obkv_documents: grenad::Reader, + indexer: GrenadParameters, + settings_diff: &InnerIndexSettingsDiff, + max_positions_per_attributes: Option, +) -> Result<(grenad::Reader>, ScriptLanguageDocidsMap)> { + let searchable_fields_to_index = settings_diff.searchable_fields_to_index(); + + let mut documents_ids = RoaringBitmap::new(); + + let add_builder = + FieldWordPositionExtractorBuilder::new(max_positions_per_attributes, &settings_diff.new)?; + let add_token_positions_extractor = add_builder.build(); + let del_builder; + let del_token_positions_extractor = if settings_diff.settings_update_only { + del_builder = FieldWordPositionExtractorBuilder::new( + max_positions_per_attributes, + &settings_diff.old, + )?; + del_builder.build() + } else { + add_builder.build() + }; + let token_positions_extractor = &[del_token_positions_extractor, add_token_positions_extractor]; + + let mut word_map = BTreeMap::new(); + let mut word_docids_extractor = WordDocidsExtractor::new(settings_diff); + + let mut cursor = obkv_documents.into_cursor()?; + // loop over documents + while let Some((key, value)) = cursor.move_on_next()? { + let document_id = key + .try_into() + .map(u32::from_be_bytes) + .map_err(|_| SerializationError::InvalidNumberSerialization)?; + let obkv = KvReader::::new(value); + // if the searchable fields didn't change, skip the searchable indexing for this document. + if !settings_diff.reindex_searchable() + && !searchable_fields_changed(&obkv, &searchable_fields_to_index) + { + continue; + } + + documents_ids.push(document_id); + + let mut buffer = String::new(); + for field_id in searchable_fields_to_index.iter() { + let Some(field_obkv) = obkv.get(*field_id).map(KvReaderDelAdd::new) else { continue }; + + for (deladd, field_bytes) in field_obkv { + let mut extracted_positions = + token_positions_extractor[deladd as usize].extract(field_bytes, &mut buffer)?; + for (position, token) in extracted_positions.iter() { + let word = token.lemma().trim(); + if !word_map.contains_key(word) { + word_map.insert(word.to_string(), word_map.len() as u32); + } + let word_id = word_map.get(word).unwrap(); + word_docids_extractor.insert(*word_id, *field_id, document_id, deladd); + } + } + } + + if word_docids_extractor.rough_size_estimate() + > indexer.max_memory.map_or(512 * 1024 * 1024, |s| s.min(512 * 1024 * 1024)) + { + let WordDocidsDump { .. } = + word_docids_extractor.dump(&word_map, &searchable_fields_to_index, indexer)?; + } + } + + todo!() +} + +/// Check if any searchable fields of a document changed. +fn searchable_fields_changed( + obkv: &KvReader, + searchable_fields: &BTreeSet, +) -> bool { + for field_id in searchable_fields { + let Some(field_obkv) = obkv.get(*field_id).map(KvReaderDelAdd::new) else { continue }; + match (field_obkv.get(DelAdd::Deletion), field_obkv.get(DelAdd::Addition)) { + // if both fields are None, check the next field. + (None, None) => (), + // if both contains a value and values are the same, check the next field. + (Some(del), Some(add)) if del == add => (), + // otherwise the fields are different, return true. + _otherwise => return true, + } + } + + false +} diff --git a/milli/src/update/index_documents/extract/searchable/word_docids.rs b/milli/src/update/index_documents/extract/searchable/word_docids.rs new file mode 100644 index 000000000..7f737525d --- /dev/null +++ b/milli/src/update/index_documents/extract/searchable/word_docids.rs @@ -0,0 +1,203 @@ +use std::collections::hash_map::Entry::{Occupied, Vacant}; +use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet}; +use std::fs::File; +use std::hash::Hash; +use std::io::BufReader; +use std::mem::size_of; + +use roaring::RoaringBitmap; + +use crate::update::del_add::KvWriterDelAdd; +use crate::update::index_documents::extract::searchable::DelAdd; +use crate::update::index_documents::{create_writer, writer_into_reader, GrenadParameters}; +use crate::update::settings::InnerIndexSettingsDiff; +use crate::{CboRoaringBitmapCodec, DocumentId, FieldId, Result}; + +pub struct WordDocidsExtractor<'a> { + word_fid_docids: RevertedIndex<(u32, FieldId)>, + settings_diff: &'a InnerIndexSettingsDiff, +} + +impl<'a> WordDocidsExtractor<'a> { + pub fn new(settings_diff: &'a InnerIndexSettingsDiff) -> Self { + Self { word_fid_docids: RevertedIndex::new(), settings_diff } + } + pub fn insert(&mut self, wordid: u32, fieldid: FieldId, docid: DocumentId, del_add: DelAdd) { + self.word_fid_docids.insert((wordid, fieldid), docid, del_add); + } + + pub fn rough_size_estimate(&self) -> usize { + self.word_fid_docids.rough_size_estimate() + } + + pub fn dump( + &mut self, + word_map: &BTreeMap, + fields: &BTreeSet, + indexer: GrenadParameters, + ) -> Result { + let mut word_fid_docids_writer = create_writer( + indexer.chunk_compression_type, + indexer.chunk_compression_level, + tempfile::tempfile()?, + ); + + let mut word_docids_writer = create_writer( + indexer.chunk_compression_type, + indexer.chunk_compression_level, + tempfile::tempfile()?, + ); + + let mut exact_word_docids_writer = create_writer( + indexer.chunk_compression_type, + indexer.chunk_compression_level, + tempfile::tempfile()?, + ); + + let mut exact_word_deletion = RoaringBitmap::new(); + let mut exact_word_addition = RoaringBitmap::new(); + let mut word_deletion = RoaringBitmap::new(); + let mut word_addition = RoaringBitmap::new(); + let mut key_buffer = Vec::new(); + let mut bitmap_buffer = Vec::new(); + let mut obkv_buffer = Vec::new(); + for (word, wid) in word_map { + exact_word_deletion.clear(); + exact_word_addition.clear(); + word_deletion.clear(); + word_addition.clear(); + for fid in fields { + if let Some((deletion, addition)) = self.word_fid_docids.inner.get(&(*wid, *fid)) { + if self.settings_diff.old.exact_attributes.contains(&fid) { + exact_word_deletion |= deletion; + } else { + word_deletion |= deletion; + } + + if self.settings_diff.new.exact_attributes.contains(&fid) { + exact_word_addition |= addition; + } else { + word_addition |= addition; + } + + if deletion != addition { + key_buffer.clear(); + key_buffer.extend_from_slice(word.as_bytes()); + key_buffer.push(0); + key_buffer.extend_from_slice(&fid.to_be_bytes()); + let value = bitmaps_into_deladd_obkv( + deletion, + addition, + &mut obkv_buffer, + &mut bitmap_buffer, + )?; + word_fid_docids_writer.insert(&key_buffer, value)?; + } + } + } + + key_buffer.clear(); + key_buffer.extend_from_slice(word.as_bytes()); + if exact_word_deletion != exact_word_addition { + let value = bitmaps_into_deladd_obkv( + &exact_word_deletion, + &exact_word_addition, + &mut obkv_buffer, + &mut bitmap_buffer, + )?; + exact_word_docids_writer.insert(&key_buffer, value)?; + } + + if word_deletion != word_addition { + let value = bitmaps_into_deladd_obkv( + &word_deletion, + &word_addition, + &mut obkv_buffer, + &mut bitmap_buffer, + )?; + word_docids_writer.insert(&key_buffer, value)?; + } + } + + self.word_fid_docids.clear(); + + Ok(WordDocidsDump { + word_fid_docids: writer_into_reader(word_fid_docids_writer)?, + word_docids: writer_into_reader(word_docids_writer)?, + exact_word_docids: writer_into_reader(exact_word_docids_writer)?, + }) + } +} + +fn bitmaps_into_deladd_obkv<'a>( + deletion: &RoaringBitmap, + addition: &RoaringBitmap, + obkv_buffer: &'a mut Vec, + bitmap_buffer: &mut Vec, +) -> Result<&'a mut Vec> { + obkv_buffer.clear(); + let mut value_writer = KvWriterDelAdd::new(obkv_buffer); + if !deletion.is_empty() { + bitmap_buffer.clear(); + CboRoaringBitmapCodec::serialize_into(deletion, bitmap_buffer); + value_writer.insert(DelAdd::Deletion, &*bitmap_buffer)?; + } + if !addition.is_empty() { + bitmap_buffer.clear(); + CboRoaringBitmapCodec::serialize_into(addition, bitmap_buffer); + value_writer.insert(DelAdd::Addition, &*bitmap_buffer)?; + } + Ok(value_writer.into_inner()?) +} + +#[derive(Debug)] +struct RevertedIndex { + inner: HashMap, + max_value_size: usize, +} + +impl RevertedIndex { + pub fn insert(&mut self, key: K, docid: DocumentId, del_add: DelAdd) { + let size = match self.inner.entry(key) { + Occupied(mut entry) => { + let (ref mut del, ref mut add) = entry.get_mut(); + match del_add { + DelAdd::Deletion => del.insert(docid), + DelAdd::Addition => add.insert(docid), + }; + del.serialized_size() + add.serialized_size() + } + Vacant(entry) => { + let mut bitmap = RoaringBitmap::new(); + bitmap.insert(docid); + let size = bitmap.serialized_size(); + match del_add { + DelAdd::Deletion => entry.insert((bitmap, RoaringBitmap::new())), + DelAdd::Addition => entry.insert((RoaringBitmap::new(), bitmap)), + }; + size * 2 + } + }; + + self.max_value_size = self.max_value_size.max(size); + } + + pub fn new() -> Self { + Self { inner: HashMap::new(), max_value_size: 0 } + } + + pub fn rough_size_estimate(&self) -> usize { + self.inner.len() * size_of::() + self.inner.len() * self.max_value_size + } + + fn clear(&mut self) { + self.max_value_size = 0; + self.inner.clear(); + } +} + +pub struct WordDocidsDump { + pub word_fid_docids: grenad::Reader>, + pub word_docids: grenad::Reader>, + pub exact_word_docids: grenad::Reader>, +} diff --git a/milli/src/update/settings.rs b/milli/src/update/settings.rs index be9b6b74e..8e85b2c4c 100644 --- a/milli/src/update/settings.rs +++ b/milli/src/update/settings.rs @@ -1162,6 +1162,18 @@ impl InnerIndexSettingsDiff { } } + pub fn searchable_fields_to_index(&self) -> BTreeSet { + if self.settings_update_only { + self.new + .fields_ids_map + .ids() + .filter(|id| self.reindex_searchable_id(*id).is_some()) + .collect() + } else { + self.new.searchable_fields_ids.iter().copied().collect() + } + } + pub fn any_reindexing_needed(&self) -> bool { self.reindex_searchable() || self.reindex_facets() || self.reindex_vectors() }