Measure much more places where we insert in sorters

This commit is contained in:
Clément Renault 2024-07-17 14:15:23 +02:00
parent f17cb2ef5b
commit eafc097a85
No known key found for this signature in database
GPG Key ID: F250A4C4E3AE5F5F
7 changed files with 57 additions and 20 deletions

View File

@ -23,6 +23,7 @@ pub fn extract_facet_number_docids<R: io::Read + io::Seek>(
indexer: GrenadParameters,
_settings_diff: &InnerIndexSettingsDiff,
) -> Result<grenad::Reader<BufReader<File>>> {
let mut conn = super::REDIS_CLIENT.get_connection().unwrap();
let max_memory = indexer.max_memory_by_thread();
let mut facet_number_docids_sorter = create_sorter(
@ -50,6 +51,7 @@ pub fn extract_facet_number_docids<R: io::Read + io::Seek>(
}
obkv.finish()?;
redis::cmd("INCR").arg(key_bytes.as_ref()).query::<usize>(&mut conn).unwrap();
facet_number_docids_sorter.insert(key_bytes, &buffer)?;
}

View File

@ -28,6 +28,7 @@ pub fn extract_facet_string_docids<R: io::Read + io::Seek>(
indexer: GrenadParameters,
_settings_diff: &InnerIndexSettingsDiff,
) -> Result<(grenad::Reader<BufReader<File>>, grenad::Reader<BufReader<File>>)> {
let mut conn = super::REDIS_CLIENT.get_connection().unwrap();
let max_memory = indexer.max_memory_by_thread();
let options = NormalizerOption { lossy: true, ..Default::default() };
@ -94,6 +95,7 @@ pub fn extract_facet_string_docids<R: io::Read + io::Seek>(
let key = (field_id, hyper_normalized_value.as_ref());
let key_bytes = BEU16StrCodec::bytes_encode(&key).map_err(heed::Error::Encoding)?;
redis::cmd("INCR").arg(key_bytes.as_ref()).query::<usize>(&mut conn).unwrap();
normalized_facet_string_docids_sorter.insert(key_bytes, &buffer)?;
}
@ -106,6 +108,7 @@ pub fn extract_facet_string_docids<R: io::Read + io::Seek>(
obkv.insert(deladd_key, document_id.to_ne_bytes())?;
}
obkv.finish()?;
redis::cmd("INCR").arg(key_bytes.as_ref()).query::<usize>(&mut conn).unwrap();
facet_string_docids_sorter.insert(&key_bytes, &buffer)?;
}

View File

@ -46,6 +46,7 @@ pub fn extract_fid_docid_facet_values<R: io::Read + io::Seek>(
indexer: GrenadParameters,
settings_diff: &InnerIndexSettingsDiff,
) -> Result<ExtractedFacetValues> {
let mut conn = super::REDIS_CLIENT.get_connection().unwrap();
let max_memory = indexer.max_memory_by_thread();
let mut fid_docid_facet_numbers_sorter = create_sorter(
@ -169,20 +170,22 @@ pub fn extract_fid_docid_facet_values<R: io::Read + io::Seek>(
add_value.map(|value| extract_facet_values(&value, add_geo_support));
// Those closures are just here to simplify things a bit.
let mut insert_numbers_diff = |del_numbers, add_numbers| {
let mut insert_numbers_diff = |del_numbers, add_numbers, conn| {
insert_numbers_diff(
&mut fid_docid_facet_numbers_sorter,
&mut numbers_key_buffer,
del_numbers,
add_numbers,
conn,
)
};
let mut insert_strings_diff = |del_strings, add_strings| {
let mut insert_strings_diff = |del_strings, add_strings, conn| {
insert_strings_diff(
&mut fid_docid_facet_strings_sorter,
&mut strings_key_buffer,
del_strings,
add_strings,
conn,
)
};
@ -196,8 +199,8 @@ pub fn extract_fid_docid_facet_values<R: io::Read + io::Seek>(
del_is_empty.insert(document);
}
Values { numbers, strings } => {
insert_numbers_diff(numbers, vec![])?;
insert_strings_diff(strings, vec![])?;
insert_numbers_diff(numbers, vec![], &mut conn)?;
insert_strings_diff(strings, vec![], &mut conn)?;
}
},
(None, Some(add_filterable_values)) => match add_filterable_values {
@ -208,8 +211,8 @@ pub fn extract_fid_docid_facet_values<R: io::Read + io::Seek>(
add_is_empty.insert(document);
}
Values { numbers, strings } => {
insert_numbers_diff(vec![], numbers)?;
insert_strings_diff(vec![], strings)?;
insert_numbers_diff(vec![], numbers, &mut conn)?;
insert_strings_diff(vec![], strings, &mut conn)?;
}
},
(Some(del_filterable_values), Some(add_filterable_values)) => {
@ -224,31 +227,31 @@ pub fn extract_fid_docid_facet_values<R: io::Read + io::Seek>(
add_is_null.insert(document);
}
(Null, Values { numbers, strings }) => {
insert_numbers_diff(vec![], numbers)?;
insert_strings_diff(vec![], strings)?;
insert_numbers_diff(vec![], numbers, &mut conn)?;
insert_strings_diff(vec![], strings, &mut conn)?;
del_is_null.insert(document);
}
(Empty, Values { numbers, strings }) => {
insert_numbers_diff(vec![], numbers)?;
insert_strings_diff(vec![], strings)?;
insert_numbers_diff(vec![], numbers, &mut conn)?;
insert_strings_diff(vec![], strings, &mut conn)?;
del_is_empty.insert(document);
}
(Values { numbers, strings }, Null) => {
add_is_null.insert(document);
insert_numbers_diff(numbers, vec![])?;
insert_strings_diff(strings, vec![])?;
insert_numbers_diff(numbers, vec![], &mut conn)?;
insert_strings_diff(strings, vec![], &mut conn)?;
}
(Values { numbers, strings }, Empty) => {
add_is_empty.insert(document);
insert_numbers_diff(numbers, vec![])?;
insert_strings_diff(strings, vec![])?;
insert_numbers_diff(numbers, vec![], &mut conn)?;
insert_strings_diff(strings, vec![], &mut conn)?;
}
(
Values { numbers: del_numbers, strings: del_strings },
Values { numbers: add_numbers, strings: add_strings },
) => {
insert_numbers_diff(del_numbers, add_numbers)?;
insert_strings_diff(del_strings, add_strings)?;
insert_numbers_diff(del_numbers, add_numbers, &mut conn)?;
insert_strings_diff(del_strings, add_strings, &mut conn)?;
}
}
}
@ -331,6 +334,7 @@ fn insert_numbers_diff<MF>(
key_buffer: &mut Vec<u8>,
mut del_numbers: Vec<f64>,
mut add_numbers: Vec<f64>,
conn: &mut redis::Connection,
) -> Result<()>
where
MF: for<'a> Fn(&[u8], &[Cow<'a, [u8]>]) -> StdResult<Cow<'a, [u8]>, Error>,
@ -362,6 +366,7 @@ where
let mut obkv = KvWriterDelAdd::memory();
obkv.insert(DelAdd::Deletion, bytes_of(&()))?;
let bytes = obkv.into_inner()?;
redis::cmd("INCR").arg(key_buffer.as_slice()).query::<usize>(conn).unwrap();
fid_docid_facet_numbers_sorter.insert(&key_buffer, bytes)?;
}
}
@ -375,6 +380,7 @@ where
let mut obkv = KvWriterDelAdd::memory();
obkv.insert(DelAdd::Addition, bytes_of(&()))?;
let bytes = obkv.into_inner()?;
redis::cmd("INCR").arg(key_buffer.as_slice()).query::<usize>(conn).unwrap();
fid_docid_facet_numbers_sorter.insert(&key_buffer, bytes)?;
}
}
@ -391,6 +397,7 @@ fn insert_strings_diff<MF>(
key_buffer: &mut Vec<u8>,
mut del_strings: Vec<(String, String)>,
mut add_strings: Vec<(String, String)>,
conn: &mut redis::Connection,
) -> Result<()>
where
MF: for<'a> Fn(&[u8], &[Cow<'a, [u8]>]) -> StdResult<Cow<'a, [u8]>, Error>,
@ -419,6 +426,7 @@ where
let mut obkv = KvWriterDelAdd::memory();
obkv.insert(DelAdd::Deletion, original)?;
let bytes = obkv.into_inner()?;
redis::cmd("INCR").arg(key_buffer.as_slice()).query::<usize>(conn).unwrap();
fid_docid_facet_strings_sorter.insert(&key_buffer, bytes)?;
}
EitherOrBoth::Right((normalized, original)) => {
@ -428,6 +436,7 @@ where
let mut obkv = KvWriterDelAdd::memory();
obkv.insert(DelAdd::Addition, original)?;
let bytes = obkv.into_inner()?;
redis::cmd("INCR").arg(key_buffer.as_slice()).query::<usize>(conn).unwrap();
fid_docid_facet_strings_sorter.insert(&key_buffer, bytes)?;
}
}

View File

@ -132,6 +132,7 @@ pub fn extract_word_docids<R: io::Read + io::Seek>(
buffer.clear();
let mut obkv = KvWriterDelAdd::new(&mut buffer);
obkv.insert(DelAdd::Deletion, value)?;
redis::cmd("INCR").arg(w.as_bytes()).query::<usize>(&mut conn).unwrap();
if delete_from_exact {
exact_word_docids_sorter.insert(w, obkv.into_inner().unwrap())?;
} else {
@ -144,6 +145,7 @@ pub fn extract_word_docids<R: io::Read + io::Seek>(
buffer.clear();
let mut obkv = KvWriterDelAdd::new(&mut buffer);
obkv.insert(DelAdd::Addition, value)?;
redis::cmd("INCR").arg(w.as_bytes()).query::<usize>(&mut conn).unwrap();
if add_in_exact {
exact_word_docids_sorter.insert(w, obkv.into_inner().unwrap())?;
} else {

View File

@ -13,6 +13,7 @@ use std::result::Result as StdResult;
use std::sync::Arc;
use crossbeam_channel::{Receiver, Sender};
pub use extract::REDIS_CLIENT;
use grenad::{Merger, MergerBuilder};
use heed::types::Str;
use heed::Database;

View File

@ -4,6 +4,7 @@ use grenad::CompressionType;
use heed::types::{Bytes, Str};
use heed::Database;
use super::index_documents::REDIS_CLIENT;
use crate::update::del_add::{deladd_serialize_add_side, DelAdd, KvWriterDelAdd};
use crate::update::index_documents::{
create_sorter, merge_deladd_cbo_roaring_bitmaps,
@ -52,6 +53,8 @@ impl<'t, 'i> WordPrefixDocids<'t, 'i> {
common_prefix_fst_words: &[&[String]],
del_prefix_fst_words: &HashSet<Vec<u8>>,
) -> Result<()> {
let mut conn = REDIS_CLIENT.get_connection().unwrap();
// It is forbidden to keep a mutable reference into the database
// and write into it at the same time, therefore we write into another file.
let mut prefix_docids_sorter = create_sorter(
@ -71,7 +74,11 @@ impl<'t, 'i> WordPrefixDocids<'t, 'i> {
current_prefixes = match current_prefixes.take() {
Some(prefixes) if word.starts_with(prefixes[0].as_bytes()) => Some(prefixes),
_otherwise => {
write_prefixes_in_sorter(&mut prefixes_cache, &mut prefix_docids_sorter)?;
write_prefixes_in_sorter(
&mut prefixes_cache,
&mut prefix_docids_sorter,
&mut conn,
)?;
common_prefix_fst_words
.iter()
.find(|prefixes| word.starts_with(prefixes[0].as_bytes()))
@ -93,7 +100,7 @@ impl<'t, 'i> WordPrefixDocids<'t, 'i> {
}
}
write_prefixes_in_sorter(&mut prefixes_cache, &mut prefix_docids_sorter)?;
write_prefixes_in_sorter(&mut prefixes_cache, &mut prefix_docids_sorter, &mut conn)?;
}
// We fetch the docids associated to the newly added word prefix fst only.
@ -107,6 +114,7 @@ impl<'t, 'i> WordPrefixDocids<'t, 'i> {
let mut writer = KvWriterDelAdd::new(&mut buffer);
writer.insert(DelAdd::Addition, data)?;
redis::cmd("INCR").arg(prefix.as_bytes()).query::<usize>(&mut conn).unwrap();
prefix_docids_sorter.insert(prefix, writer.into_inner()?)?;
}
}
@ -140,10 +148,12 @@ impl<'t, 'i> WordPrefixDocids<'t, 'i> {
fn write_prefixes_in_sorter(
prefixes: &mut HashMap<Vec<u8>, Vec<Vec<u8>>>,
sorter: &mut grenad::Sorter<MergeFn>,
conn: &mut redis::Connection,
) -> Result<()> {
for (key, data_slices) in prefixes.drain() {
for data in data_slices {
if valid_lmdb_key(&key) {
redis::cmd("INCR").arg(key.as_slice()).query::<usize>(conn).unwrap();
sorter.insert(&key, data)?;
}
}

View File

@ -13,7 +13,7 @@ use crate::update::del_add::{deladd_serialize_add_side, DelAdd, KvWriterDelAdd};
use crate::update::index_documents::{
create_sorter, merge_deladd_cbo_roaring_bitmaps,
merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap, valid_lmdb_key,
write_sorter_into_database, CursorClonableMmap, MergeFn,
write_sorter_into_database, CursorClonableMmap, MergeFn, REDIS_CLIENT,
};
use crate::{CboRoaringBitmapCodec, Result};
@ -59,6 +59,8 @@ impl<'t, 'i> WordPrefixIntegerDocids<'t, 'i> {
) -> Result<()> {
debug!("Computing and writing the word levels integers docids into LMDB on disk...");
let mut conn = REDIS_CLIENT.get_connection().unwrap();
let mut prefix_integer_docids_sorter = create_sorter(
grenad::SortAlgorithm::Unstable,
merge_deladd_cbo_roaring_bitmaps,
@ -85,6 +87,7 @@ impl<'t, 'i> WordPrefixIntegerDocids<'t, 'i> {
write_prefixes_in_sorter(
&mut prefixes_cache,
&mut prefix_integer_docids_sorter,
&mut conn,
)?;
common_prefix_fst_words
.iter()
@ -110,7 +113,11 @@ impl<'t, 'i> WordPrefixIntegerDocids<'t, 'i> {
}
}
write_prefixes_in_sorter(&mut prefixes_cache, &mut prefix_integer_docids_sorter)?;
write_prefixes_in_sorter(
&mut prefixes_cache,
&mut prefix_integer_docids_sorter,
&mut conn,
)?;
}
// We fetch the docids associated to the newly added word prefix fst only.
@ -135,6 +142,7 @@ impl<'t, 'i> WordPrefixIntegerDocids<'t, 'i> {
buffer.clear();
let mut writer = KvWriterDelAdd::new(&mut buffer);
writer.insert(DelAdd::Addition, data)?;
redis::cmd("INCR").arg(bytes.as_ref()).query::<usize>(&mut conn).unwrap();
prefix_integer_docids_sorter.insert(bytes, writer.into_inner()?)?;
}
}
@ -174,11 +182,13 @@ impl<'t, 'i> WordPrefixIntegerDocids<'t, 'i> {
fn write_prefixes_in_sorter(
prefixes: &mut HashMap<Vec<u8>, Vec<Vec<u8>>>,
sorter: &mut grenad::Sorter<MergeFn>,
conn: &mut redis::Connection,
) -> Result<()> {
// TODO: Merge before insertion.
for (key, data_slices) in prefixes.drain() {
for data in data_slices {
if valid_lmdb_key(&key) {
redis::cmd("INCR").arg(key.as_slice()).query::<usize>(conn).unwrap();
sorter.insert(&key, data)?;
}
}