This commit is contained in:
ManyTheFish 2024-07-01 11:24:32 +02:00
parent a838f39fce
commit 9874efc352
5 changed files with 541 additions and 0 deletions

View File

@ -8,6 +8,7 @@ mod extract_vector_points;
mod extract_word_docids;
mod extract_word_pair_proximity_docids;
mod extract_word_position_docids;
// mod searchable;
use std::fs::File;
use std::io::BufReader;

View File

@ -0,0 +1,211 @@
use std::collections::HashMap;
use charabia::normalizer::NormalizedTokenIter;
use charabia::{Language, Script, SeparatorKind, Token, TokenKind, Tokenizer, TokenizerBuilder};
use roaring::RoaringBitmap;
use serde_json::Value;
use crate::update::settings::InnerIndexSettings;
use crate::{InternalError, Result, MAX_POSITION_PER_ATTRIBUTE, MAX_WORD_LENGTH};
/// Maps a (script, language) pair to a (deletion, addition) pair of docid bitmaps.
pub type ScriptLanguageDocidsMap = HashMap<(Script, Language), (RoaringBitmap, RoaringBitmap)>;
/// Tokenizer configuration gathered from the index settings, from which
/// `FieldWordPositionExtractor`s are built.
pub struct FieldWordPositionExtractorBuilder<'a> {
    // Maximum number of positions indexed per attribute, clamped to
    // `MAX_POSITION_PER_ATTRIBUTE` in `new`.
    max_positions_per_attributes: u16,
    // Words the tokenizer must mark as stop words, if any.
    stop_words: Option<&'a fst::Set<Vec<u8>>>,
    // User-defined separator tokens, if any.
    separators: Option<Vec<&'a str>>,
    // User-defined dictionary words, if any.
    dictionary: Option<Vec<&'a str>>,
}
impl<'a> FieldWordPositionExtractorBuilder<'a> {
    /// Collects the tokenizer configuration from `settings`, clamping the
    /// requested position cap to `MAX_POSITION_PER_ATTRIBUTE`.
    pub fn new(
        max_positions_per_attributes: Option<u32>,
        settings: &'a InnerIndexSettings,
    ) -> Result<Self> {
        // Clamp the user-provided cap, defaulting to the hard limit.
        let max_positions_per_attributes = match max_positions_per_attributes {
            Some(max) => max.min(MAX_POSITION_PER_ATTRIBUTE) as u16,
            None => MAX_POSITION_PER_ATTRIBUTE as u16,
        };
        let separators: Option<Vec<_>> = settings
            .allowed_separators
            .as_ref()
            .map(|separators| separators.iter().map(String::as_str).collect());
        let dictionary: Option<Vec<_>> = settings
            .dictionary
            .as_ref()
            .map(|words| words.iter().map(String::as_str).collect());

        Ok(Self {
            max_positions_per_attributes,
            stop_words: settings.stop_words.as_ref(),
            separators,
            dictionary,
        })
    }

    /// Builds an extractor whose tokenizer borrows this builder's configuration.
    pub fn build(&'a self) -> FieldWordPositionExtractor<'a> {
        let tokenizer = tokenizer_builder(
            self.stop_words,
            self.separators.as_deref(),
            self.dictionary.as_deref(),
            None,
        )
        .into_tokenizer();

        FieldWordPositionExtractor {
            tokenizer,
            max_positions_per_attributes: self.max_positions_per_attributes,
        }
    }
}
/// Turns raw JSON field values into streams of (position, token) pairs.
pub struct FieldWordPositionExtractor<'a> {
    // Tokenizer built from the index settings.
    tokenizer: Tokenizer<'a>,
    // Maximum number of positions indexed per attribute.
    max_positions_per_attributes: u16,
}
impl<'a> FieldWordPositionExtractor<'a> {
pub fn extract<'b>(
&'a self,
field_bytes: &[u8],
buffer: &'b mut String,
) -> Result<ExtractedFieldWordPosition<'a, 'b>> {
let field_value = serde_json::from_slice(field_bytes).map_err(InternalError::SerdeJson)?;
Ok(ExtractedFieldWordPosition {
tokenizer: &self.tokenizer,
max_positions_per_attributes: self.max_positions_per_attributes,
field_value,
buffer: buffer,
})
}
}
/// A deserialized field value paired with the tokenizer to run on it.
pub struct ExtractedFieldWordPosition<'a, 'b> {
    // Tokenizer configured from the index settings.
    tokenizer: &'a Tokenizer<'a>,
    // Position cap forwarded to the produced iterator.
    max_positions_per_attributes: u16,
    // Deserialized JSON value of the field.
    field_value: Value,
    // Caller-provided buffer receiving the stringified JSON value.
    buffer: &'b mut String,
}
impl<'a> ExtractedFieldWordPosition<'a, '_> {
    /// Stringifies the stored JSON value into the buffer, tokenizes it, and
    /// returns an iterator of tokens with their positions.
    pub fn iter<'o>(&'o mut self) -> FieldWordPositionIter<'o> {
        self.buffer.clear();
        // Copy the tokenizer reference out so the closure below does not
        // capture `self` while the buffer is mutably borrowed.
        let tokenizer = self.tokenizer;
        // `json_to_string` returns None for non-indexable values (null,
        // objects, empty arrays); in that case no token stream is produced.
        let inner = json_to_string(&self.field_value, &mut *self.buffer)
            .map(|field| tokenizer.tokenize(field));

        FieldWordPositionIter {
            inner,
            max_positions_per_attributes: self.max_positions_per_attributes,
            position: 0,
            prev_kind: None,
        }
    }
}
/// Iterator over the (position, token) pairs of a tokenized field value.
pub struct FieldWordPositionIter<'a> {
    // Underlying normalized token stream; None when the field had no
    // indexable text.
    inner: Option<NormalizedTokenIter<'a, 'a>>,
    // Iteration stops once `position` reaches this cap.
    max_positions_per_attributes: u16,
    // Position of the last yielded word.
    position: u16,
    // Kind of the previous meaningful token, used to compute position gaps.
    prev_kind: Option<TokenKind>,
}
impl<'a> Iterator for FieldWordPositionIter<'a> {
    type Item = (u16, Token<'a>);

    /// Yields the next indexable word along with its position, skipping
    /// separators, empty lemmas, and words too long to fit in an LMDB key.
    fn next(&mut self) -> Option<Self::Item> {
        // Loop instead of recursing: the previous `return self.next()` form
        // grew the stack by one frame per skipped token, which could overflow
        // on fields made of long runs of separators or filtered-out tokens.
        loop {
            if self.position >= self.max_positions_per_attributes {
                return None;
            }
            // No token stream, or the stream is exhausted: stop iterating.
            let token = self.inner.as_mut()?.next()?;
            match token.kind {
                TokenKind::Word | TokenKind::StopWord if !token.lemma().is_empty() => {
                    // A hard separator creates a position gap of 8, any other
                    // previous token advances by 1, and the very first word
                    // stays at position 0.
                    self.position += match self.prev_kind {
                        Some(TokenKind::Separator(SeparatorKind::Hard)) => 8,
                        Some(_) => 1,
                        None => 0,
                    };
                    self.prev_kind = Some(token.kind)
                }
                // Leading separators don't affect positions.
                TokenKind::Separator(_) if self.position == 0 => continue,
                TokenKind::Separator(SeparatorKind::Hard) => {
                    self.prev_kind = Some(token.kind);
                }
                // A soft separator never downgrades a preceding hard one.
                TokenKind::Separator(SeparatorKind::Soft)
                    if self.prev_kind != Some(TokenKind::Separator(SeparatorKind::Hard)) =>
                {
                    self.prev_kind = Some(token.kind);
                }
                _ => continue,
            }

            // Separators were recorded above but are never yielded.
            if !token.is_word() {
                continue;
            }
            // keep a word only if it is not empty and fits in an LMDB key.
            let lemma = token.lemma().trim();
            if !lemma.is_empty() && lemma.len() <= MAX_WORD_LENGTH {
                return Some((self.position, token));
            }
        }
    }
}
/// Factorize tokenizer building.
///
/// Each optional parameter is applied to the builder only when present:
/// stop words, user dictionary, allowed separators, and the script/language
/// allow list.
pub fn tokenizer_builder<'a>(
    stop_words: Option<&'a fst::Set<Vec<u8>>>,
    allowed_separators: Option<&'a [&str]>,
    dictionary: Option<&'a [&str]>,
    script_language: Option<&'a HashMap<Script, Vec<Language>>>,
) -> TokenizerBuilder<'a, Vec<u8>> {
    let mut builder = TokenizerBuilder::new();

    if let Some(stop_words) = stop_words {
        builder.stop_words(stop_words);
    }
    if let Some(words) = dictionary {
        builder.words_dict(words);
    }
    if let Some(separators) = allowed_separators {
        builder.separators(separators);
    }
    if let Some(allow_list) = script_language {
        builder.allow_list(allow_list);
    }

    builder
}
/// Transform a JSON value into a string that can be indexed.
///
/// A JSON string is returned as-is without copying; booleans, numbers, and
/// arrays are written into `buffer` and a slice of it is returned. `Null`,
/// objects, and arrays with no writable element yield `None`.
fn json_to_string<'a>(value: &'a Value, buffer: &'a mut String) -> Option<&'a str> {
    // Appends the indexable representation of `value` to `output` and
    // reports whether anything was written.
    fn inner(value: &Value, output: &mut String) -> bool {
        use std::fmt::Write;
        match value {
            Value::Null | Value::Object(_) => false,
            Value::Bool(boolean) => write!(output, "{}", boolean).is_ok(),
            Value::Number(number) => write!(output, "{}", number).is_ok(),
            Value::String(string) => write!(output, "{}", string).is_ok(),
            Value::Array(array) => {
                let mut wrote_anything = false;
                for element in array {
                    if inner(element, output) {
                        // append `. ` after every written element
                        output.push_str(". ");
                        wrote_anything = true;
                    }
                }
                // succeed only if at least one element was written
                wrote_anything
            }
        }
    }

    if let Value::String(string) = value {
        // a plain string needs no buffering
        Some(string)
    } else if inner(value, buffer) {
        Some(buffer)
    } else {
        None
    }
}

View File

@ -0,0 +1,114 @@
use std::collections::{BTreeMap, BTreeSet};
use std::convert::TryInto;
use std::fs::File;
use std::io;
use std::io::BufReader;
use field_word_position::FieldWordPositionExtractorBuilder;
use obkv::KvReader;
use roaring::RoaringBitmap;
use word_docids::{WordDocidsDump, WordDocidsExtractor};
use crate::update::del_add::{DelAdd, KvReaderDelAdd};
use crate::update::index_documents::extract::extract_docid_word_positions::ScriptLanguageDocidsMap;
use crate::update::index_documents::GrenadParameters;
use crate::update::settings::InnerIndexSettingsDiff;
use crate::{FieldId, Result, SerializationError};
mod field_word_position;
mod word_docids;
#[tracing::instrument(level = "trace", skip_all, target = "indexing::extract")]
pub fn extract_searchable_data<R: io::Read + io::Seek>(
    obkv_documents: grenad::Reader<R>,
    indexer: GrenadParameters,
    settings_diff: &InnerIndexSettingsDiff,
    max_positions_per_attributes: Option<u32>,
) -> Result<(grenad::Reader<BufReader<File>>, ScriptLanguageDocidsMap)> {
    let searchable_fields_to_index = settings_diff.searchable_fields_to_index();
    let mut documents_ids = RoaringBitmap::new();

    // Word/position extractor for the "addition" side of each field.
    let add_builder =
        FieldWordPositionExtractorBuilder::new(max_positions_per_attributes, &settings_diff.new)?;
    let add_token_positions_extractor = add_builder.build();

    // On a settings-only update the "deletion" side must be tokenized with
    // the old settings; otherwise both sides share the addition extractor.
    let del_builder;
    let del_token_positions_extractor = if settings_diff.settings_update_only {
        del_builder = FieldWordPositionExtractorBuilder::new(
            max_positions_per_attributes,
            &settings_diff.old,
        )?;
        del_builder.build()
    } else {
        add_builder.build()
    };

    // Indexed by `deladd as usize`: deletion first, addition second.
    let token_positions_extractor = &[del_token_positions_extractor, add_token_positions_extractor];

    // Interns every encountered word, mapping it to a dense u32 id.
    let mut word_map = BTreeMap::new();
    let mut word_docids_extractor = WordDocidsExtractor::new(settings_diff);

    // Reused across fields and documents; `ExtractedFieldWordPosition::iter`
    // clears it before each use, so allocating it once here avoids a fresh
    // allocation per document.
    let mut buffer = String::new();

    let mut cursor = obkv_documents.into_cursor()?;
    // loop over documents
    while let Some((key, value)) = cursor.move_on_next()? {
        let document_id = key
            .try_into()
            .map(u32::from_be_bytes)
            .map_err(|_| SerializationError::InvalidNumberSerialization)?;
        let obkv = KvReader::<FieldId>::new(value);

        // if the searchable fields didn't change, skip the searchable indexing for this document.
        if !settings_diff.reindex_searchable()
            && !searchable_fields_changed(&obkv, &searchable_fields_to_index)
        {
            continue;
        }

        // grenad yields keys in increasing order, so `push` is valid here.
        documents_ids.push(document_id);

        for field_id in searchable_fields_to_index.iter() {
            let Some(field_obkv) = obkv.get(*field_id).map(KvReaderDelAdd::new) else { continue };

            for (deladd, field_bytes) in field_obkv {
                let mut extracted_positions =
                    token_positions_extractor[deladd as usize].extract(field_bytes, &mut buffer)?;
                for (_position, token) in extracted_positions.iter() {
                    let word = token.lemma().trim();
                    // Intern the word: a single lookup, allocating the owned
                    // key only the first time the word is seen.
                    let word_id = match word_map.get(word) {
                        Some(word_id) => *word_id,
                        None => {
                            let word_id = word_map.len() as u32;
                            word_map.insert(word.to_string(), word_id);
                            word_id
                        }
                    };
                    word_docids_extractor.insert(word_id, *field_id, document_id, deladd);
                }
            }
        }

        // Spill the in-memory word index to disk once it outgrows the memory
        // budget (capped at 512 MiB).
        if word_docids_extractor.rough_size_estimate()
            > indexer.max_memory.map_or(512 * 1024 * 1024, |s| s.min(512 * 1024 * 1024))
        {
            // NOTE(review): the dump readers are currently discarded; they
            // will need to be merged once this WIP function is completed.
            let WordDocidsDump { .. } =
                word_docids_extractor.dump(&word_map, &searchable_fields_to_index, indexer)?;
        }
    }

    // WIP: merging the dumps and building the returned reader is not
    // implemented yet.
    todo!()
}
/// Check if any searchable fields of a document changed.
///
/// A field counts as changed when its deletion and addition payloads differ,
/// or when exactly one of the two sides is present.
fn searchable_fields_changed(
    obkv: &KvReader<FieldId>,
    searchable_fields: &BTreeSet<FieldId>,
) -> bool {
    searchable_fields.iter().any(|field_id| {
        let Some(field_obkv) = obkv.get(*field_id).map(KvReaderDelAdd::new) else {
            // the field is absent from the document: nothing changed for it.
            return false;
        };
        match (field_obkv.get(DelAdd::Deletion), field_obkv.get(DelAdd::Addition)) {
            // absent on both sides: unchanged.
            (None, None) => false,
            // present on both sides: changed only if the payloads differ.
            (Some(del), Some(add)) => del != add,
            // present on exactly one side: the field was added or removed.
            _otherwise => true,
        }
    })
}

View File

@ -0,0 +1,203 @@
use std::collections::hash_map::Entry::{Occupied, Vacant};
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::fs::File;
use std::hash::Hash;
use std::io::BufReader;
use std::mem::size_of;
use roaring::RoaringBitmap;
use crate::update::del_add::KvWriterDelAdd;
use crate::update::index_documents::extract::searchable::DelAdd;
use crate::update::index_documents::{create_writer, writer_into_reader, GrenadParameters};
use crate::update::settings::InnerIndexSettingsDiff;
use crate::{CboRoaringBitmapCodec, DocumentId, FieldId, Result};
/// Accumulates in memory, for every (word, field) pair, the document ids
/// where the pair appears on the deletion and on the addition side.
pub struct WordDocidsExtractor<'a> {
    // (word id, field id) -> (deletion docids, addition docids).
    word_fid_docids: RevertedIndex<(u32, FieldId)>,
    // Settings diff used to route words to the exact or non-exact database.
    settings_diff: &'a InnerIndexSettingsDiff,
}
impl<'a> WordDocidsExtractor<'a> {
    /// Creates an empty extractor backed by the given settings diff.
    pub fn new(settings_diff: &'a InnerIndexSettingsDiff) -> Self {
        Self { word_fid_docids: RevertedIndex::new(), settings_diff }
    }

    /// Records that `docid` contains `wordid` in `fieldid`, on the deletion
    /// or addition side depending on `del_add`.
    pub fn insert(&mut self, wordid: u32, fieldid: FieldId, docid: DocumentId, del_add: DelAdd) {
        self.word_fid_docids.insert((wordid, fieldid), docid, del_add);
    }

    /// Rough upper bound of the memory used by the in-memory index.
    pub fn rough_size_estimate(&self) -> usize {
        self.word_fid_docids.rough_size_estimate()
    }

    /// Drains the in-memory index into three sorted grenad writers and
    /// returns their readers: word+field docids, word docids, and
    /// exact-attribute word docids.
    pub fn dump(
        &mut self,
        word_map: &BTreeMap<String, u32>,
        fields: &BTreeSet<FieldId>,
        indexer: GrenadParameters,
    ) -> Result<WordDocidsDump> {
        let mut word_fid_docids_writer = create_writer(
            indexer.chunk_compression_type,
            indexer.chunk_compression_level,
            tempfile::tempfile()?,
        );
        let mut word_docids_writer = create_writer(
            indexer.chunk_compression_type,
            indexer.chunk_compression_level,
            tempfile::tempfile()?,
        );
        let mut exact_word_docids_writer = create_writer(
            indexer.chunk_compression_type,
            indexer.chunk_compression_level,
            tempfile::tempfile()?,
        );

        // Per-word accumulators and buffers, reused across iterations.
        let mut exact_word_deletion = RoaringBitmap::new();
        let mut exact_word_addition = RoaringBitmap::new();
        let mut word_deletion = RoaringBitmap::new();
        let mut word_addition = RoaringBitmap::new();
        let mut key_buffer = Vec::new();
        let mut bitmap_buffer = Vec::new();
        let mut obkv_buffer = Vec::new();
        // `word_map` is a BTreeMap, so words are visited in lexicographic
        // order, which keeps the grenad writers sorted by key.
        for (word, wid) in word_map {
            exact_word_deletion.clear();
            exact_word_addition.clear();
            word_deletion.clear();
            word_addition.clear();
            for fid in fields {
                if let Some((deletion, addition)) = self.word_fid_docids.inner.get(&(*wid, *fid)) {
                    // Route the docids to the exact or non-exact accumulator,
                    // using the old settings for deletions and the new ones
                    // for additions.
                    // NOTE(review): `fid` is already a `&FieldId`, so
                    // `contains(&fid)` passes a double reference — confirm
                    // this matches the `exact_attributes` collection type.
                    if self.settings_diff.old.exact_attributes.contains(&fid) {
                        exact_word_deletion |= deletion;
                    } else {
                        word_deletion |= deletion;
                    }
                    if self.settings_diff.new.exact_attributes.contains(&fid) {
                        exact_word_addition |= addition;
                    } else {
                        word_addition |= addition;
                    }
                    // Only write (word, fid) entries whose two sides differ.
                    if deletion != addition {
                        // key layout: word bytes, a 0 byte, then the
                        // big-endian field id.
                        key_buffer.clear();
                        key_buffer.extend_from_slice(word.as_bytes());
                        key_buffer.push(0);
                        key_buffer.extend_from_slice(&fid.to_be_bytes());
                        let value = bitmaps_into_deladd_obkv(
                            deletion,
                            addition,
                            &mut obkv_buffer,
                            &mut bitmap_buffer,
                        )?;
                        word_fid_docids_writer.insert(&key_buffer, value)?;
                    }
                }
            }

            // Word-level entries are keyed by the word alone.
            key_buffer.clear();
            key_buffer.extend_from_slice(word.as_bytes());
            if exact_word_deletion != exact_word_addition {
                let value = bitmaps_into_deladd_obkv(
                    &exact_word_deletion,
                    &exact_word_addition,
                    &mut obkv_buffer,
                    &mut bitmap_buffer,
                )?;
                exact_word_docids_writer.insert(&key_buffer, value)?;
            }
            if word_deletion != word_addition {
                let value = bitmaps_into_deladd_obkv(
                    &word_deletion,
                    &word_addition,
                    &mut obkv_buffer,
                    &mut bitmap_buffer,
                )?;
                word_docids_writer.insert(&key_buffer, value)?;
            }
        }

        // Everything has been written out; reset the in-memory index.
        self.word_fid_docids.clear();

        Ok(WordDocidsDump {
            word_fid_docids: writer_into_reader(word_fid_docids_writer)?,
            word_docids: writer_into_reader(word_docids_writer)?,
            exact_word_docids: writer_into_reader(exact_word_docids_writer)?,
        })
    }
}
/// Serializes the two bitmaps into a DelAdd obkv stored in `obkv_buffer`,
/// omitting whichever side is empty, and returns the filled buffer.
fn bitmaps_into_deladd_obkv<'a>(
    deletion: &RoaringBitmap,
    addition: &RoaringBitmap,
    obkv_buffer: &'a mut Vec<u8>,
    bitmap_buffer: &mut Vec<u8>,
) -> Result<&'a mut Vec<u8>> {
    obkv_buffer.clear();
    let mut value_writer = KvWriterDelAdd::new(obkv_buffer);
    // Deletion must be written before Addition to keep the obkv keys ordered.
    for (key, bitmap) in [(DelAdd::Deletion, deletion), (DelAdd::Addition, addition)] {
        if bitmap.is_empty() {
            continue;
        }
        bitmap_buffer.clear();
        CboRoaringBitmapCodec::serialize_into(bitmap, bitmap_buffer);
        value_writer.insert(key, &*bitmap_buffer)?;
    }
    Ok(value_writer.into_inner()?)
}
/// An in-memory map from a key to a (deletion, addition) pair of docid bitmaps.
#[derive(Debug)]
struct RevertedIndex<K> {
    // key -> (deleted docids, added docids).
    inner: HashMap<K, (RoaringBitmap, RoaringBitmap)>,
    // Largest serialized size seen for a single value; feeds the rough
    // memory estimate.
    max_value_size: usize,
}
impl<K: PartialEq + Eq + Hash> RevertedIndex<K> {
    /// Creates an empty index.
    pub fn new() -> Self {
        Self { inner: HashMap::new(), max_value_size: 0 }
    }

    /// Adds `docid` under `key`, on the deletion or addition side depending
    /// on `del_add`, and updates the maximum observed value size.
    pub fn insert(&mut self, key: K, docid: DocumentId, del_add: DelAdd) {
        let entry_size = match self.inner.entry(key) {
            Occupied(mut occupied) => {
                let (deletions, additions) = occupied.get_mut();
                match del_add {
                    DelAdd::Deletion => deletions.insert(docid),
                    DelAdd::Addition => additions.insert(docid),
                };
                deletions.serialized_size() + additions.serialized_size()
            }
            Vacant(vacant) => {
                let mut docids = RoaringBitmap::new();
                docids.insert(docid);
                let single_size = docids.serialized_size();
                vacant.insert(match del_add {
                    DelAdd::Deletion => (docids, RoaringBitmap::new()),
                    DelAdd::Addition => (RoaringBitmap::new(), docids),
                });
                // Count both sides even though one is still empty.
                single_size * 2
            }
        };
        self.max_value_size = self.max_value_size.max(entry_size);
    }

    /// Pessimistic memory estimate: every entry is assumed to be as large as
    /// the biggest one seen so far.
    pub fn rough_size_estimate(&self) -> usize {
        self.inner.len() * size_of::<K>() + self.inner.len() * self.max_value_size
    }

    /// Empties the index and resets the size statistic.
    fn clear(&mut self) {
        self.inner.clear();
        self.max_value_size = 0;
    }
}
/// Sorted on-disk readers produced by `WordDocidsExtractor::dump`.
pub struct WordDocidsDump {
    // (word, 0 byte, big-endian field id) -> DelAdd obkv of docid bitmaps.
    pub word_fid_docids: grenad::Reader<BufReader<File>>,
    // word -> DelAdd obkv of docid bitmaps, for non-exact attributes.
    pub word_docids: grenad::Reader<BufReader<File>>,
    // word -> DelAdd obkv of docid bitmaps, for exact attributes.
    pub exact_word_docids: grenad::Reader<BufReader<File>>,
}

View File

@ -1162,6 +1162,18 @@ impl InnerIndexSettingsDiff {
}
}
/// Returns the ids of the fields that must be (re)indexed as searchable.
///
/// For a document update this is simply the new searchable field ids; for a
/// settings-only update it is every field whose searchable status requires
/// reindexing.
pub fn searchable_fields_to_index(&self) -> BTreeSet<FieldId> {
    if !self.settings_update_only {
        self.new.searchable_fields_ids.iter().copied().collect()
    } else {
        self.new
            .fields_ids_map
            .ids()
            .filter(|id| self.reindex_searchable_id(*id).is_some())
            .collect()
    }
}
/// Returns true when any part of the index (searchable attributes, facets,
/// or vectors) must be reindexed after this settings change.
pub fn any_reindexing_needed(&self) -> bool {
    self.reindex_searchable() || self.reindex_facets() || self.reindex_vectors()
}