Merge branch 'main' into word-pair-proximity-docids-refactor

Loïc Lecrenier 2022-10-24 15:23:00 +02:00
commit d76d0cb1bf
23 changed files with 135 additions and 154 deletions

View File

@@ -1,6 +1,6 @@
 [package]
 name = "benchmarks"
-version = "0.33.4"
+version = "0.34.0"
 edition = "2018"
 publish = false

View File

@@ -1,6 +1,6 @@
 [package]
 name = "cli"
-version = "0.33.4"
+version = "0.34.0"
 edition = "2018"
 description = "A CLI to interact with a milli index"
 publish = false

View File

@@ -1,6 +1,6 @@
 [package]
 name = "filter-parser"
-version = "0.33.4"
+version = "0.34.0"
 edition = "2021"
 description = "The parser for the Meilisearch filter syntax"
 publish = false

View File

@@ -1,6 +1,6 @@
 [package]
 name = "flatten-serde-json"
-version = "0.33.4"
+version = "0.34.0"
 edition = "2021"
 description = "Flatten serde-json objects like elastic search"
 readme = "README.md"

View File

@@ -1,6 +1,6 @@
 [package]
 name = "json-depth-checker"
-version = "0.33.4"
+version = "0.34.0"
 edition = "2021"
 description = "A library that indicates if a JSON must be flattened"
 publish = false

View File

@@ -1,6 +1,6 @@
 [package]
 name = "milli"
-version = "0.33.4"
+version = "0.34.0"
 authors = ["Kerollmops <clement@meilisearch.com>"]
 edition = "2018"

View File

@@ -195,7 +195,7 @@ pub fn lat_lng_to_xyz(coord: &[f64; 2]) -> [f64; 3] {
 /// Returns `true` if the field match one of the faceted fields.
 /// See the function [`is_faceted_by`] below to see what “matching” means.
 pub fn is_faceted(field: &str, faceted_fields: impl IntoIterator<Item = impl AsRef<str>>) -> bool {
-    faceted_fields.into_iter().find(|facet| is_faceted_by(field, facet.as_ref())).is_some()
+    faceted_fields.into_iter().any(|facet| is_faceted_by(field, facet.as_ref()))
 }

 /// Returns `true` if the field match the facet.
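
Note: the change above swaps `find(..).is_some()` for `Iterator::any`, which states the boolean intent directly and short-circuits the same way. A minimal standalone sketch of the idiom, with hypothetical names:

    fn has_facet(fields: &[String], wanted: &str) -> bool {
        // Equivalent to fields.iter().find(|f| f.as_str() == wanted).is_some(),
        // but shorter and clearer about returning a bool.
        fields.iter().any(|f| f.as_str() == wanted)
    }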

View File

@@ -7,9 +7,9 @@ use serde::Serialize;

 pub mod matching_words;

-const DEFAULT_CROP_MARKER: &'static str = "";
-const DEFAULT_HIGHLIGHT_PREFIX: &'static str = "<em>";
-const DEFAULT_HIGHLIGHT_SUFFIX: &'static str = "</em>";
+const DEFAULT_CROP_MARKER: &str = "";
+const DEFAULT_HIGHLIGHT_PREFIX: &str = "<em>";
+const DEFAULT_HIGHLIGHT_SUFFIX: &str = "</em>";

 /// Structure used to build a Matcher allowing to customize formating tags.
 pub struct MatcherBuilder<'a, A> {
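
Note: in a `const` or `static` item the reference lifetime is always `'static`, so the explicit annotation is redundant (clippy: `redundant_static_lifetimes`), which is what the hunk above drops. A hypothetical one-liner:

    // `&'static str` and `&str` mean the same thing here; the shorter form is preferred.
    const PLACEHOLDER_TAG: &str = "<mark>";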

View File

@@ -357,39 +357,36 @@ pub fn word_derivations<'c>(
         } else if fst.contains(word) {
             derived_words.push((word.to_string(), 0));
         }
-    } else {
-        if max_typo == 1 {
-            let dfa = build_dfa(word, 1, is_prefix);
-            let starts = StartsWith(Str::new(get_first(word)));
-            let mut stream =
-                fst.search_with_state(Intersection(starts, &dfa)).into_stream();
-
-            while let Some((word, state)) = stream.next() {
-                let word = std::str::from_utf8(word)?;
-                let d = dfa.distance(state.1);
-                derived_words.push((word.to_string(), d.to_u8()));
-            }
-        } else {
-            let starts = StartsWith(Str::new(get_first(word)));
-            let first = Intersection(build_dfa(word, 1, is_prefix), Complement(&starts));
-            let second_dfa = build_dfa(word, 2, is_prefix);
-            let second = Intersection(&second_dfa, &starts);
-            let automaton = Union(first, &second);
-
-            let mut stream = fst.search_with_state(automaton).into_stream();
-
-            while let Some((found_word, state)) = stream.next() {
-                let found_word = std::str::from_utf8(found_word)?;
-                // in the case the typo is on the first letter, we know the number of typo
-                // is two
-                if get_first(found_word) != get_first(word) {
-                    derived_words.push((found_word.to_string(), 2));
-                } else {
-                    // Else, we know that it is the second dfa that matched and compute the
-                    // correct distance
-                    let d = second_dfa.distance((state.1).0);
-                    derived_words.push((found_word.to_string(), d.to_u8()));
-                }
-            }
-        }
-    }
+    } else if max_typo == 1 {
+        let dfa = build_dfa(word, 1, is_prefix);
+        let starts = StartsWith(Str::new(get_first(word)));
+        let mut stream = fst.search_with_state(Intersection(starts, &dfa)).into_stream();
+
+        while let Some((word, state)) = stream.next() {
+            let word = std::str::from_utf8(word)?;
+            let d = dfa.distance(state.1);
+            derived_words.push((word.to_string(), d.to_u8()));
+        }
+    } else {
+        let starts = StartsWith(Str::new(get_first(word)));
+        let first = Intersection(build_dfa(word, 1, is_prefix), Complement(&starts));
+        let second_dfa = build_dfa(word, 2, is_prefix);
+        let second = Intersection(&second_dfa, &starts);
+        let automaton = Union(first, &second);
+
+        let mut stream = fst.search_with_state(automaton).into_stream();
+
+        while let Some((found_word, state)) = stream.next() {
+            let found_word = std::str::from_utf8(found_word)?;
+            // in the case the typo is on the first letter, we know the number of typo
+            // is two
+            if get_first(found_word) != get_first(word) {
+                derived_words.push((found_word.to_string(), 2));
+            } else {
+                // Else, we know that it is the second dfa that matched and compute the
+                // correct distance
+                let d = second_dfa.distance((state.1).0);
+                derived_words.push((found_word.to_string(), d.to_u8()));
+            }
+        }
+    }
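
Note: the refactor above is the `collapsible_else_if` pattern: folding `else { if … }` into `else if` removes one nesting level without changing behaviour. A minimal sketch with hypothetical names and values:

    fn typo_budget(word_len: usize) -> u8 {
        if word_len < 5 {
            0
        } else if word_len < 9 {
            // previously written as: else { if word_len < 9 { 1 } else { 2 } }
            1
        } else {
            2
        }
    }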

View File

@@ -40,7 +40,7 @@ pub fn extract_fid_word_count_docids<R: io::Read + io::Seek>(
     let mut cursor = docid_word_positions.into_cursor()?;
     while let Some((key, value)) = cursor.move_on_next()? {
         let (document_id_bytes, _word_bytes) = try_split_array_at(key)
-            .ok_or_else(|| SerializationError::Decoding { db_name: Some(DOCID_WORD_POSITIONS) })?;
+            .ok_or(SerializationError::Decoding { db_name: Some(DOCID_WORD_POSITIONS) })?;
         let document_id = u32::from_be_bytes(document_id_bytes);
         let curr_document_id = *current_document_id.get_or_insert(document_id);
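
Note: this hunk (and the same change in the other extractors below) replaces `ok_or_else` with `ok_or`. The closure form is only worth it when the error value is expensive to build; for a plain struct literal, eager construction is simpler (clippy: `unnecessary_lazy_evaluations`). A sketch with a hypothetical error type:

    #[derive(Debug)]
    struct DecodeError { db_name: &'static str }

    fn first_byte(key: &[u8]) -> Result<u8, DecodeError> {
        // The error is a cheap struct literal, so ok_or is enough;
        // ok_or_else(|| ...) would only delay a trivial construction.
        key.first().copied().ok_or(DecodeError { db_name: "docid-word-positions" })
    }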

View File

@@ -60,5 +60,5 @@ pub fn extract_geo_points<R: io::Read + io::Seek>(
         }
     }

-    Ok(writer_into_reader(writer)?)
+    writer_into_reader(writer)
 }
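
Note: `Ok(expr?)` around a call that already returns the same `Result` type unwraps and immediately rewraps the value; returning the expression directly is equivalent (clippy: `needless_question_mark`). A small sketch under that assumption:

    use std::num::ParseIntError;

    fn parse_port(s: &str) -> Result<u16, ParseIntError> {
        // Before: Ok(s.trim().parse::<u16>()?)
        // After: the inner call already produces Result<u16, ParseIntError>.
        s.trim().parse::<u16>()
    }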

View File

@@ -51,7 +51,7 @@ pub fn extract_word_docids<R: io::Read + io::Seek>(
     let mut cursor = docid_word_positions.into_cursor()?;
     while let Some((key, positions)) = cursor.move_on_next()? {
         let (document_id_bytes, word_bytes) = try_split_array_at(key)
-            .ok_or_else(|| SerializationError::Decoding { db_name: Some(DOCID_WORD_POSITIONS) })?;
+            .ok_or(SerializationError::Decoding { db_name: Some(DOCID_WORD_POSITIONS) })?;
         let document_id = u32::from_be_bytes(document_id_bytes);

         let bitmap = RoaringBitmap::from_iter(Some(document_id));

View File

@@ -39,7 +39,7 @@ pub fn extract_word_pair_proximity_docids<R: io::Read + io::Seek>(
     let mut cursor = docid_word_positions.into_cursor()?;
     while let Some((key, value)) = cursor.move_on_next()? {
         let (document_id_bytes, word_bytes) = try_split_array_at(key)
-            .ok_or_else(|| SerializationError::Decoding { db_name: Some(DOCID_WORD_POSITIONS) })?;
+            .ok_or(SerializationError::Decoding { db_name: Some(DOCID_WORD_POSITIONS) })?;
         let document_id = u32::from_be_bytes(document_id_bytes);
         let word = str::from_utf8(word_bytes)?;
@@ -81,7 +81,7 @@ pub fn extract_word_pair_proximity_docids<R: io::Read + io::Seek>(
 ///
 /// This list is used by the engine to calculate the documents containing words that are
 /// close to each other.
-fn document_word_positions_into_sorter<'b>(
+fn document_word_positions_into_sorter(
     document_id: DocumentId,
     mut word_positions_heap: BinaryHeap<PeekedWordPosition<vec::IntoIter<u32>>>,
     word_pair_proximity_docids_sorter: &mut grenad::Sorter<MergeFn>,

View File

@@ -33,7 +33,7 @@ pub fn extract_word_position_docids<R: io::Read + io::Seek>(
     let mut cursor = docid_word_positions.into_cursor()?;
     while let Some((key, value)) = cursor.move_on_next()? {
         let (document_id_bytes, word_bytes) = try_split_array_at(key)
-            .ok_or_else(|| SerializationError::Decoding { db_name: Some(DOCID_WORD_POSITIONS) })?;
+            .ok_or(SerializationError::Decoding { db_name: Some(DOCID_WORD_POSITIONS) })?;
         let document_id = DocumentId::from_be_bytes(document_id_bytes);

         for position in read_u32_ne_bytes(value) {

View File

@@ -96,7 +96,7 @@ pub(crate) fn data_from_obkv_documents(
     spawn_extraction_task::<_, _, Vec<grenad::Reader<File>>>(
         docid_word_positions_chunks.clone(),
-        indexer.clone(),
+        indexer,
         lmdb_writer_sx.clone(),
         extract_word_pair_proximity_docids,
         merge_cbo_roaring_bitmaps,
@@ -106,7 +106,7 @@ pub(crate) fn data_from_obkv_documents(
     spawn_extraction_task::<_, _, Vec<grenad::Reader<File>>>(
         docid_word_positions_chunks.clone(),
-        indexer.clone(),
+        indexer,
         lmdb_writer_sx.clone(),
         extract_fid_word_count_docids,
         merge_cbo_roaring_bitmaps,
@@ -116,7 +116,7 @@ pub(crate) fn data_from_obkv_documents(
     spawn_extraction_task::<_, _, Vec<(grenad::Reader<File>, grenad::Reader<File>)>>(
         docid_word_positions_chunks.clone(),
-        indexer.clone(),
+        indexer,
         lmdb_writer_sx.clone(),
         move |doc_word_pos, indexer| extract_word_docids(doc_word_pos, indexer, &exact_attributes),
         merge_roaring_bitmaps,
@@ -128,8 +128,8 @@ pub(crate) fn data_from_obkv_documents(
     );

     spawn_extraction_task::<_, _, Vec<grenad::Reader<File>>>(
-        docid_word_positions_chunks.clone(),
-        indexer.clone(),
+        docid_word_positions_chunks,
+        indexer,
         lmdb_writer_sx.clone(),
         extract_word_position_docids,
         merge_cbo_roaring_bitmaps,
@@ -138,8 +138,8 @@ pub(crate) fn data_from_obkv_documents(
     );

     spawn_extraction_task::<_, _, Vec<grenad::Reader<File>>>(
-        docid_fid_facet_strings_chunks.clone(),
-        indexer.clone(),
+        docid_fid_facet_strings_chunks,
+        indexer,
         lmdb_writer_sx.clone(),
         extract_facet_string_docids,
         keep_first_prefix_value_merge_roaring_bitmaps,
@@ -148,8 +148,8 @@ pub(crate) fn data_from_obkv_documents(
     );

     spawn_extraction_task::<_, _, Vec<grenad::Reader<File>>>(
-        docid_fid_facet_numbers_chunks.clone(),
-        indexer.clone(),
+        docid_fid_facet_numbers_chunks,
+        indexer,
         lmdb_writer_sx.clone(),
         extract_facet_number_docids,
         merge_cbo_roaring_bitmaps,
@@ -183,12 +183,12 @@ fn spawn_extraction_task<FE, FS, M>(
 {
     rayon::spawn(move || {
         let chunks: Result<M> =
-            chunks.into_par_iter().map(|chunk| extract_fn(chunk, indexer.clone())).collect();
+            chunks.into_par_iter().map(|chunk| extract_fn(chunk, indexer)).collect();
         rayon::spawn(move || match chunks {
             Ok(chunks) => {
                 debug!("merge {} database", name);
                 let reader = chunks.merge(merge_fn, &indexer);
-                let _ = lmdb_writer_sx.send(reader.map(|r| serialize_fn(r)));
+                let _ = lmdb_writer_sx.send(reader.map(serialize_fn));
             }
             Err(e) => {
                 let _ = lmdb_writer_sx.send(Err(e));
@@ -255,7 +255,7 @@ fn send_and_extract_flattened_documents_data(
         || {
             let (documents_ids, docid_word_positions_chunk) = extract_docid_word_positions(
                 flattened_documents_chunk.clone(),
-                indexer.clone(),
+                indexer,
                 searchable_fields,
                 stop_words.as_ref(),
                 max_positions_per_attributes,
@@ -279,7 +279,7 @@ fn send_and_extract_flattened_documents_data(
                 fid_facet_exists_docids_chunk,
             ) = extract_fid_docid_facet_values(
                 flattened_documents_chunk.clone(),
-                indexer.clone(),
+                indexer,
                 faceted_fields,
             )?;
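
Note: two recurring cleanups in this file are dropping `.clone()` on the indexing parameters (presumably possible because that parameter type is `Copy`), and collapsing `map(|r| serialize_fn(r))` into `map(serialize_fn)` (clippy: `redundant_closure`). A standalone sketch with hypothetical names:

    #[derive(Clone, Copy)]
    struct ChunkParams { compression_level: u32 }

    fn describe(params: ChunkParams) -> String {
        format!("level {}", params.compression_level)
    }

    fn describe_all(all: &[ChunkParams]) -> Vec<String> {
        // ChunkParams is Copy, so no .clone() is needed when passing it around,
        // and the closure |p| describe(p) collapses to the function name.
        all.iter().copied().map(describe).collect()
    }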

View File

@@ -61,7 +61,7 @@ pub fn sorter_into_reader(
     );
     sorter.write_into_stream_writer(&mut writer)?;

-    Ok(writer_into_reader(writer)?)
+    writer_into_reader(writer)
 }

 pub fn writer_into_reader(writer: grenad::Writer<File>) -> Result<grenad::Reader<File>> {
@@ -134,7 +134,7 @@ impl<R: io::Read + io::Seek> MergerBuilder<R> {
         );
         merger.write_into_stream_writer(&mut writer)?;

-        Ok(writer_into_reader(writer)?)
+        writer_into_reader(writer)
     }
 }
@@ -180,7 +180,6 @@ pub fn grenad_obkv_into_chunks<R: io::Read + io::Seek>(
     let mut continue_reading = true;
     let mut cursor = reader.into_cursor()?;

-    let indexer_clone = indexer.clone();
     let mut transposer = move || {
         if !continue_reading {
             return Ok(None);
@@ -188,8 +187,8 @@ pub fn grenad_obkv_into_chunks<R: io::Read + io::Seek>(
         let mut current_chunk_size = 0u64;
         let mut obkv_documents = create_writer(
-            indexer_clone.chunk_compression_type,
-            indexer_clone.chunk_compression_level,
+            indexer.chunk_compression_type,
+            indexer.chunk_compression_level,
             tempfile::tempfile()?,
         );
@@ -224,7 +223,7 @@ pub fn write_into_lmdb_database(
         match iter.next().transpose()? {
             Some((key, old_val)) if key == k => {
                 let vals = &[Cow::Borrowed(old_val), Cow::Borrowed(v)][..];
-                let val = merge(k, &vals)?;
+                let val = merge(k, vals)?;
                 // safety: we don't keep references from inside the LMDB database.
                 unsafe { iter.put_current(k, &val)? };
             }

View File

@@ -88,7 +88,7 @@ pub fn keep_latest_obkv<'a>(_key: &[u8], obkvs: &[Cow<'a, [u8]>]) -> Result<Cow<
 /// Merge all the obks in the order we see them.
 pub fn merge_obkvs<'a>(_key: &[u8], obkvs: &[Cow<'a, [u8]>]) -> Result<Cow<'a, [u8]>> {
     Ok(obkvs
-        .into_iter()
+        .iter()
        .cloned()
        .reduce(|acc, current| {
            let first = obkv::KvReader::new(&acc);
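
Note: calling `.into_iter()` on a shared slice (`&[T]`) yields `&T` items, exactly like `.iter()`; clippy (`into_iter_on_ref`) prefers the explicit form, which is what the hunk above applies. Illustration with hypothetical data:

    fn total_len(chunks: &[Vec<u8>]) -> usize {
        // On a shared slice, .into_iter() and .iter() are identical;
        // .iter() states the borrowing (non-consuming) intent more clearly.
        chunks.iter().map(|c| c.len()).sum()
    }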

View File

@@ -106,7 +106,7 @@ where
     ) -> Result<IndexDocuments<'t, 'u, 'i, 'a, F>> {
         let transform = Some(Transform::new(
             wtxn,
-            &index,
+            index,
             indexer_config,
             config.update_method,
             config.autogenerate_docids,
@@ -291,18 +291,12 @@ where
         // Run extraction pipeline in parallel.
         pool.install(|| {
             // split obkv file into several chunks
-            let original_chunk_iter = grenad_obkv_into_chunks(
-                original_documents,
-                pool_params.clone(),
-                documents_chunk_size,
-            );
+            let original_chunk_iter =
+                grenad_obkv_into_chunks(original_documents, pool_params, documents_chunk_size);

             // split obkv file into several chunks
-            let flattened_chunk_iter = grenad_obkv_into_chunks(
-                flattened_documents,
-                pool_params.clone(),
-                documents_chunk_size,
-            );
+            let flattened_chunk_iter =
+                grenad_obkv_into_chunks(flattened_documents, pool_params, documents_chunk_size);

             let result = original_chunk_iter.and_then(|original_chunk| {
                 let flattened_chunk = flattened_chunk_iter?;
@@ -341,7 +335,7 @@ where
         }

         let index_documents_ids = self.index.documents_ids(self.wtxn)?;
-        let index_is_empty = index_documents_ids.len() == 0;
+        let index_is_empty = index_documents_ids.is_empty();
         let mut final_documents_ids = RoaringBitmap::new();
         let mut word_pair_proximity_docids = None;
         let mut word_position_docids = None;
@@ -378,7 +372,7 @@ where
             };

             let (docids, is_merged_database) =
-                write_typed_chunk_into_index(typed_chunk, &self.index, self.wtxn, index_is_empty)?;
+                write_typed_chunk_into_index(typed_chunk, self.index, self.wtxn, index_is_empty)?;
             if !docids.is_empty() {
                 final_documents_ids |= docids;
                 let documents_seen_count = final_documents_ids.len();
@@ -475,7 +469,7 @@ where
         );
         let common_prefix_fst_words: Vec<_> = common_prefix_fst_words
             .as_slice()
-            .linear_group_by_key(|x| x.chars().nth(0).unwrap())
+            .linear_group_by_key(|x| x.chars().next().unwrap())
             .collect();

         // We retrieve the newly added words between the previous and new prefix word fst.
@@ -498,9 +492,9 @@ where
             execute_word_prefix_docids(
                 self.wtxn,
                 word_docids,
-                self.index.word_docids.clone(),
-                self.index.word_prefix_docids.clone(),
-                &self.indexer_config,
+                self.index.word_docids,
+                self.index.word_prefix_docids,
+                self.indexer_config,
                 &new_prefix_fst_words,
                 &common_prefix_fst_words,
                 &del_prefix_fst_words,
@@ -511,9 +505,9 @@ where
             execute_word_prefix_docids(
                 self.wtxn,
                 exact_word_docids,
-                self.index.exact_word_docids.clone(),
-                self.index.exact_word_prefix_docids.clone(),
-                &self.indexer_config,
+                self.index.exact_word_docids,
+                self.index.exact_word_prefix_docids,
+                self.indexer_config,
                 &new_prefix_fst_words,
                 &common_prefix_fst_words,
                 &del_prefix_fst_words,
@@ -596,12 +590,7 @@ fn execute_word_prefix_docids(
     builder.chunk_compression_level = indexer_config.chunk_compression_level;
     builder.max_nb_chunks = indexer_config.max_nb_chunks;
     builder.max_memory = indexer_config.max_memory;
-    builder.execute(
-        cursor,
-        &new_prefix_fst_words,
-        &common_prefix_fst_words,
-        &del_prefix_fst_words,
-    )?;
+    builder.execute(cursor, new_prefix_fst_words, common_prefix_fst_words, del_prefix_fst_words)?;

     Ok(())
 }
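
Note: two more idiom fixes appear in this file: `len() == 0` becomes `is_empty()`, and `chars().nth(0)` becomes `chars().next()`, which avoids the needless index. A minimal sketch:

    fn first_char_if_any(s: &str) -> Option<char> {
        // is_empty() reads better than len() == 0, and next() replaces nth(0).
        if s.is_empty() {
            None
        } else {
            s.chars().next()
        }
    }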

View File

@@ -72,10 +72,10 @@ fn create_fields_mapping(
         // we sort by id here to ensure a deterministic mapping of the fields, that preserves
         // the original ordering.
         .sorted_by_key(|(&id, _)| id)
-        .map(|(field, name)| match index_field_map.id(&name) {
+        .map(|(field, name)| match index_field_map.id(name) {
             Some(id) => Ok((*field, id)),
             None => index_field_map
-                .insert(&name)
+                .insert(name)
                 .ok_or(Error::UserError(UserError::AttributeLimitReached))
                 .map(|id| (*field, id)),
         })
@@ -192,7 +192,7 @@ impl<'a, 'i> Transform<'a, 'i> {
             // Insertion in a obkv need to be done with keys ordered. For now they are ordered
             // according to the document addition key order, so we sort it according to the
             // fieldids map keys order.
-            field_buffer_cache.sort_unstable_by(|(f1, _), (f2, _)| f1.cmp(&f2));
+            field_buffer_cache.sort_unstable_by(|(f1, _), (f2, _)| f1.cmp(f2));

             // Build the new obkv document.
             let mut writer = obkv::KvWriter::new(&mut obkv_buffer);
@@ -202,24 +202,23 @@ impl<'a, 'i> Transform<'a, 'i> {
             let mut original_docid = None;

-            let docid =
-                match self.new_external_documents_ids_builder.entry(external_id.clone().into()) {
-                    Entry::Occupied(entry) => *entry.get() as u32,
-                    Entry::Vacant(entry) => {
-                        // If the document was already in the db we mark it as a replaced document.
-                        // It'll be deleted later. We keep its original docid to insert it in the grenad.
-                        if let Some(docid) = external_documents_ids.get(entry.key()) {
-                            self.replaced_documents_ids.insert(docid);
-                            original_docid = Some(docid);
-                        }
-                        let docid = self
-                            .available_documents_ids
-                            .next()
-                            .ok_or(UserError::DocumentLimitReached)?;
-                        entry.insert(docid as u64);
-                        docid
-                    }
-                };
+            let docid = match self.new_external_documents_ids_builder.entry((*external_id).into()) {
+                Entry::Occupied(entry) => *entry.get() as u32,
+                Entry::Vacant(entry) => {
+                    // If the document was already in the db we mark it as a replaced document.
+                    // It'll be deleted later. We keep its original docid to insert it in the grenad.
+                    if let Some(docid) = external_documents_ids.get(entry.key()) {
+                        self.replaced_documents_ids.insert(docid);
+                        original_docid = Some(docid);
+                    }
+                    let docid = self
+                        .available_documents_ids
+                        .next()
+                        .ok_or(UserError::DocumentLimitReached)?;
+                    entry.insert(docid as u64);
+                    docid
+                }
+            };

             let mut skip_insertion = false;
             if let Some(original_docid) = original_docid {
@@ -239,12 +238,12 @@ impl<'a, 'i> Transform<'a, 'i> {
                     // we're not replacing anything
                     self.replaced_documents_ids.remove(original_docid);
                     // and we need to put back the original id as it was before
-                    self.new_external_documents_ids_builder.remove(&*external_id);
+                    self.new_external_documents_ids_builder.remove(external_id);
                     skip_insertion = true;
                 } else {
                     // we associate the base document with the new key, everything will get merged later.
                     self.original_sorter.insert(&docid.to_be_bytes(), base_obkv)?;
-                    match self.flatten_from_fields_ids_map(KvReader::new(&base_obkv))? {
+                    match self.flatten_from_fields_ids_map(KvReader::new(base_obkv))? {
                         Some(buffer) => {
                             self.flattened_sorter.insert(docid.to_be_bytes(), &buffer)?
                         }
@@ -453,7 +452,7 @@ impl<'a, 'i> Transform<'a, 'i> {
     {
         let primary_key = self
             .index
-            .primary_key(&wtxn)?
+            .primary_key(wtxn)?
             .ok_or(Error::UserError(UserError::MissingPrimaryKey))?
             .to_string();
@@ -520,7 +519,7 @@ impl<'a, 'i> Transform<'a, 'i> {
             self.new_external_documents_ids_builder.into_iter().collect();
         new_external_documents_ids_builder
-            .sort_unstable_by(|(left, _), (right, _)| left.cmp(&right));
+            .sort_unstable_by(|(left, _), (right, _)| left.cmp(right));

         let mut fst_new_external_documents_ids_builder = fst::MapBuilder::memory();
         new_external_documents_ids_builder.into_iter().try_for_each(|(key, value)| {
             fst_new_external_documents_ids_builder.insert(key, value)
@@ -614,7 +613,7 @@ impl<'a, 'i> Transform<'a, 'i> {
         let mut flattened: Vec<_> = flattened.into_iter().collect();
         // we reorder the field to get all the known field first
         flattened.sort_unstable_by_key(|(key, _)| {
-            new_fields_ids_map.id(&key).unwrap_or(FieldId::MAX)
+            new_fields_ids_map.id(key).unwrap_or(FieldId::MAX)
         });

         for (key, value) in flattened {

View File

@@ -175,7 +175,7 @@ pub(crate) fn write_typed_chunk_into_index(
             let mut cursor = fid_docid_facet_number.into_cursor()?;
             while let Some((key, value)) = cursor.move_on_next()? {
                 if valid_lmdb_key(key) {
-                    index_fid_docid_facet_numbers.put(wtxn, key, &value)?;
+                    index_fid_docid_facet_numbers.put(wtxn, key, value)?;
                 }
             }
         }
@@ -185,7 +185,7 @@ pub(crate) fn write_typed_chunk_into_index(
             let mut cursor = fid_docid_facet_string.into_cursor()?;
             while let Some((key, value)) = cursor.move_on_next()? {
                 if valid_lmdb_key(key) {
-                    index_fid_docid_facet_strings.put(wtxn, key, &value)?;
+                    index_fid_docid_facet_strings.put(wtxn, key, value)?;
                 }
             }
         }

View File

@@ -15,7 +15,7 @@ use crate::update::index_documents::IndexDocumentsMethod;
 use crate::update::{ClearDocuments, IndexDocuments, UpdateIndexingStep};
 use crate::{FieldsIdsMap, Index, Result};

-#[derive(Debug, Clone, PartialEq, Copy)]
+#[derive(Debug, Clone, PartialEq, Eq, Copy)]
 pub enum Setting<T> {
     Set(T),
     Reset,
@@ -273,24 +273,21 @@ impl<'a, 't, 'u, 'i> Settings<'a, 't, 'u, 'i> {
         let fields_ids_map = self.index.fields_ids_map(self.wtxn)?;
         // if the settings are set before any document update, we don't need to do anything, and
         // will set the primary key during the first document addition.
-        if self.index.number_of_documents(&self.wtxn)? == 0 {
+        if self.index.number_of_documents(self.wtxn)? == 0 {
             return Ok(());
         }

         let transform = Transform::new(
             self.wtxn,
-            &self.index,
-            &self.indexer_config,
+            self.index,
+            self.indexer_config,
             IndexDocumentsMethod::ReplaceDocuments,
             false,
         )?;

         // We remap the documents fields based on the new `FieldsIdsMap`.
-        let output = transform.remap_index_documents(
-            self.wtxn,
-            old_fields_ids_map,
-            fields_ids_map.clone(),
-        )?;
+        let output =
+            transform.remap_index_documents(self.wtxn, old_fields_ids_map, fields_ids_map)?;

         let new_facets = output.compute_real_facets(self.wtxn, self.index)?;
         self.index.put_faceted_fields(self.wtxn, &new_facets)?;
@@ -303,7 +300,7 @@ impl<'a, 't, 'u, 'i> Settings<'a, 't, 'u, 'i> {
         let indexing_builder = IndexDocuments::new(
             self.wtxn,
             self.index,
-            &self.indexer_config,
+            self.indexer_config,
             IndexDocumentsConfig::default(),
             &cb,
         )?;
@@ -330,7 +327,7 @@ impl<'a, 't, 'u, 'i> Settings<'a, 't, 'u, 'i> {
     fn update_distinct_field(&mut self) -> Result<bool> {
         match self.distinct_field {
             Setting::Set(ref attr) => {
-                self.index.put_distinct_field(self.wtxn, &attr)?;
+                self.index.put_distinct_field(self.wtxn, attr)?;
             }
             Setting::Reset => {
                 self.index.delete_distinct_field(self.wtxn)?;
@@ -356,11 +353,11 @@ impl<'a, 't, 'u, 'i> Settings<'a, 't, 'u, 'i> {
                 // Add all the searchable attributes to the field map, and then add the
                 // remaining fields from the old field map to the new one
                 for name in names.iter() {
-                    new_fields_ids_map.insert(&name).ok_or(UserError::AttributeLimitReached)?;
+                    new_fields_ids_map.insert(name).ok_or(UserError::AttributeLimitReached)?;
                 }

                 for (_, name) in old_fields_ids_map.iter() {
-                    new_fields_ids_map.insert(&name).ok_or(UserError::AttributeLimitReached)?;
+                    new_fields_ids_map.insert(name).ok_or(UserError::AttributeLimitReached)?;
                 }

                 self.index.put_all_searchable_fields_from_fields_ids_map(
@@ -462,11 +459,11 @@ impl<'a, 't, 'u, 'i> Settings<'a, 't, 'u, 'i> {
         match self.exact_attributes {
             Setting::Set(ref attrs) => {
                 let attrs = attrs.iter().map(String::as_str).collect::<Vec<_>>();
-                self.index.put_exact_attributes(&mut self.wtxn, &attrs)?;
+                self.index.put_exact_attributes(self.wtxn, &attrs)?;
                 Ok(true)
             }
             Setting::Reset => {
-                self.index.delete_exact_attributes(&mut self.wtxn)?;
+                self.index.delete_exact_attributes(self.wtxn)?;
                 Ok(true)
             }
             Setting::NotSet => Ok(false),
@@ -528,7 +525,7 @@ impl<'a, 't, 'u, 'i> Settings<'a, 't, 'u, 'i> {
     fn update_primary_key(&mut self) -> Result<()> {
         match self.primary_key {
             Setting::Set(ref primary_key) => {
-                if self.index.number_of_documents(&self.wtxn)? == 0 {
+                if self.index.number_of_documents(self.wtxn)? == 0 {
                     let mut fields_ids_map = self.index.fields_ids_map(self.wtxn)?;
                     fields_ids_map.insert(primary_key).ok_or(UserError::AttributeLimitReached)?;
                     self.index.put_fields_ids_map(self.wtxn, &fields_ids_map)?;
@@ -540,7 +537,7 @@ impl<'a, 't, 'u, 'i> Settings<'a, 't, 'u, 'i> {
                 }
             }
             Setting::Reset => {
-                if self.index.number_of_documents(&self.wtxn)? == 0 {
+                if self.index.number_of_documents(self.wtxn)? == 0 {
                     self.index.delete_primary_key(self.wtxn)?;
                     Ok(())
                 } else {
@@ -574,24 +571,24 @@ impl<'a, 't, 'u, 'i> Settings<'a, 't, 'u, 'i> {
                 if one > two {
                     return Err(UserError::InvalidMinTypoWordLenSetting(one, two).into());
                 } else {
-                    self.index.put_min_word_len_one_typo(&mut self.wtxn, one)?;
-                    self.index.put_min_word_len_two_typos(&mut self.wtxn, two)?;
+                    self.index.put_min_word_len_one_typo(self.wtxn, one)?;
+                    self.index.put_min_word_len_two_typos(self.wtxn, two)?;
                 }
             }
             (Setting::Set(one), _) => {
-                let two = self.index.min_word_len_two_typos(&self.wtxn)?;
+                let two = self.index.min_word_len_two_typos(self.wtxn)?;
                 if one > two {
                     return Err(UserError::InvalidMinTypoWordLenSetting(one, two).into());
                 } else {
-                    self.index.put_min_word_len_one_typo(&mut self.wtxn, one)?;
+                    self.index.put_min_word_len_one_typo(self.wtxn, one)?;
                 }
             }
             (_, Setting::Set(two)) => {
-                let one = self.index.min_word_len_one_typo(&self.wtxn)?;
+                let one = self.index.min_word_len_one_typo(self.wtxn)?;
                 if one > two {
                     return Err(UserError::InvalidMinTypoWordLenSetting(one, two).into());
                 } else {
-                    self.index.put_min_word_len_two_typos(&mut self.wtxn, two)?;
+                    self.index.put_min_word_len_two_typos(self.wtxn, two)?;
                 }
             }
             _ => (),
@@ -621,10 +618,10 @@ impl<'a, 't, 'u, 'i> Settings<'a, 't, 'u, 'i> {
                 words.sort_unstable();

                 let words = fst::Set::from_iter(words.iter())?;
-                self.index.put_exact_words(&mut self.wtxn, &words)?;
+                self.index.put_exact_words(self.wtxn, &words)?;
             }
             Setting::Reset => {
-                self.index.put_exact_words(&mut self.wtxn, &fst::Set::default())?;
+                self.index.put_exact_words(self.wtxn, &fst::Set::default())?;
             }
             Setting::NotSet => (),
         }
@@ -635,10 +632,10 @@ impl<'a, 't, 'u, 'i> Settings<'a, 't, 'u, 'i> {
     fn update_max_values_per_facet(&mut self) -> Result<()> {
         match self.max_values_per_facet {
             Setting::Set(max) => {
-                self.index.put_max_values_per_facet(&mut self.wtxn, max)?;
+                self.index.put_max_values_per_facet(self.wtxn, max)?;
             }
             Setting::Reset => {
-                self.index.delete_max_values_per_facet(&mut self.wtxn)?;
+                self.index.delete_max_values_per_facet(self.wtxn)?;
             }
             Setting::NotSet => (),
         }
@@ -649,10 +646,10 @@ impl<'a, 't, 'u, 'i> Settings<'a, 't, 'u, 'i> {
     fn update_pagination_max_total_hits(&mut self) -> Result<()> {
         match self.pagination_max_total_hits {
             Setting::Set(max) => {
-                self.index.put_pagination_max_total_hits(&mut self.wtxn, max)?;
+                self.index.put_pagination_max_total_hits(self.wtxn, max)?;
             }
             Setting::Reset => {
-                self.index.delete_pagination_max_total_hits(&mut self.wtxn)?;
+                self.index.delete_pagination_max_total_hits(self.wtxn)?;
             }
             Setting::NotSet => (),
         }
@@ -666,8 +663,8 @@ impl<'a, 't, 'u, 'i> Settings<'a, 't, 'u, 'i> {
     {
         self.index.set_updated_at(self.wtxn, &OffsetDateTime::now_utc())?;

-        let old_faceted_fields = self.index.user_defined_faceted_fields(&self.wtxn)?;
-        let old_fields_ids_map = self.index.fields_ids_map(&self.wtxn)?;
+        let old_faceted_fields = self.index.user_defined_faceted_fields(self.wtxn)?;
+        let old_fields_ids_map = self.index.fields_ids_map(self.wtxn)?;

         self.update_displayed()?;
         self.update_filterable()?;
@@ -684,7 +681,7 @@ impl<'a, 't, 'u, 'i> Settings<'a, 't, 'u, 'i> {
         // If there is new faceted fields we indicate that we must reindex as we must
         // index new fields as facets. It means that the distinct attribute,
         // an Asc/Desc criterion or a filtered attribute as be added or removed.
-        let new_faceted_fields = self.index.user_defined_faceted_fields(&self.wtxn)?;
+        let new_faceted_fields = self.index.user_defined_faceted_fields(self.wtxn)?;
         let faceted_updated = old_faceted_fields != new_faceted_fields;

         let stop_words_updated = self.update_stop_words()?;
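
Note: most hunks in this file drop a needless borrow: the write methods here appear to take the transaction by mutable reference, and `self.wtxn` already is one, so `&mut self.wtxn` adds a redundant level (clippy: `needless_borrow`); the file also derives `Eq` next to `PartialEq` on `Setting` (clippy: `derive_partial_eq_without_eq`). A hedged sketch with hypothetical types:

    #[derive(Debug, Clone, Copy, PartialEq, Eq)] // Eq alongside PartialEq, as with Setting above
    enum Flag { On, Off }

    struct Txn;

    fn write(txn: &mut Txn, value: Flag) -> Flag {
        let _ = txn;
        value
    }

    struct SettingsSketch<'t> {
        wtxn: &'t mut Txn,
    }

    impl<'t> SettingsSketch<'t> {
        fn update(&mut self) -> Flag {
            // `self.wtxn` is already `&mut Txn`; writing `&mut self.wtxn` would add a
            // needless extra borrow. Passing the field directly reborrows it for the call.
            write(self.wtxn, Flag::On)
        }
    }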

View File

@@ -61,12 +61,12 @@ impl<'t, 'u, 'i> WordPrefixDocids<'t, 'u, 'i> {

         let mut prefixes_cache = HashMap::new();
         while let Some((word, data)) = new_word_docids_iter.move_on_next()? {
             current_prefixes = match current_prefixes.take() {
-                Some(prefixes) if word.starts_with(&prefixes[0].as_bytes()) => Some(prefixes),
+                Some(prefixes) if word.starts_with(prefixes[0].as_bytes()) => Some(prefixes),
                 _otherwise => {
                     write_prefixes_in_sorter(&mut prefixes_cache, &mut prefix_docids_sorter)?;
                     common_prefix_fst_words
                         .iter()
-                        .find(|prefixes| word.starts_with(&prefixes[0].as_bytes()))
+                        .find(|prefixes| word.starts_with(prefixes[0].as_bytes()))
                 }
             };

View File

@@ -42,7 +42,7 @@ impl<'t, 'u, 'i> WordsPrefixesFst<'t, 'u, 'i> {
     #[logging_timer::time("WordsPrefixesFst::{}")]
     pub fn execute(self) -> Result<()> {
-        let words_fst = self.index.words_fst(&self.wtxn)?;
+        let words_fst = self.index.words_fst(self.wtxn)?;

         let mut current_prefix = vec![SmallString32::new(); self.max_prefix_length];
         let mut current_prefix_count = vec![0; self.max_prefix_length];