Use the sorter cache when extracting the word pair proximity docids

This commit is contained in:
Clément Renault 2024-07-17 16:27:39 +02:00
parent 98d55e0d4d
commit 092a383419
No known key found for this signature in database
GPG Key ID: F250A4C4E3AE5F5F

View File

@ -1,6 +1,7 @@
use std::collections::{BTreeMap, VecDeque}; use std::collections::{BTreeMap, VecDeque};
use std::fs::File; use std::fs::File;
use std::io::BufReader; use std::io::BufReader;
use std::num::NonZeroUsize;
use std::{cmp, io}; use std::{cmp, io};
use obkv::KvReaderU16; use obkv::KvReaderU16;
@ -12,7 +13,8 @@ use super::helpers::{
use crate::error::SerializationError; use crate::error::SerializationError;
use crate::index::db_name::DOCID_WORD_POSITIONS; use crate::index::db_name::DOCID_WORD_POSITIONS;
use crate::proximity::{index_proximity, ProximityPrecision, MAX_DISTANCE}; use crate::proximity::{index_proximity, ProximityPrecision, MAX_DISTANCE};
use crate::update::del_add::{DelAdd, KvReaderDelAdd, KvWriterDelAdd}; use crate::update::del_add::{DelAdd, KvReaderDelAdd};
use crate::update::index_documents::cache::SorterCacheDelAddCboRoaringBitmap;
use crate::update::settings::InnerIndexSettingsDiff; use crate::update::settings::InnerIndexSettingsDiff;
use crate::{DocumentId, Result}; use crate::{DocumentId, Result};
@ -26,8 +28,6 @@ pub fn extract_word_pair_proximity_docids<R: io::Read + io::Seek>(
indexer: GrenadParameters, indexer: GrenadParameters,
settings_diff: &InnerIndexSettingsDiff, settings_diff: &InnerIndexSettingsDiff,
) -> Result<grenad::Reader<BufReader<File>>> { ) -> Result<grenad::Reader<BufReader<File>>> {
let mut conn = super::REDIS_CLIENT.get_connection().unwrap();
// early return if the data shouldn't be deleted nor created. // early return if the data shouldn't be deleted nor created.
if settings_diff.settings_update_only && !settings_diff.reindex_proximities() { if settings_diff.settings_update_only && !settings_diff.reindex_proximities() {
let writer = create_writer( let writer = create_writer(
@ -42,15 +42,20 @@ pub fn extract_word_pair_proximity_docids<R: io::Read + io::Seek>(
let any_addition = settings_diff.new.proximity_precision == ProximityPrecision::ByWord; let any_addition = settings_diff.new.proximity_precision == ProximityPrecision::ByWord;
let max_memory = indexer.max_memory_by_thread(); let max_memory = indexer.max_memory_by_thread();
let mut word_pair_proximity_docids_sorters: Vec<_> = (1..MAX_DISTANCE) let mut cached_word_pair_proximity_docids_sorters: Vec<_> = (1..MAX_DISTANCE)
.map(|_| { .map(|_| {
create_sorter( let sorter = create_sorter(
grenad::SortAlgorithm::Unstable, grenad::SortAlgorithm::Unstable,
merge_deladd_cbo_roaring_bitmaps, merge_deladd_cbo_roaring_bitmaps,
indexer.chunk_compression_type, indexer.chunk_compression_type,
indexer.chunk_compression_level, indexer.chunk_compression_level,
indexer.max_nb_chunks, indexer.max_nb_chunks,
max_memory.map(|m| m / MAX_DISTANCE as usize), max_memory.map(|m| m / MAX_DISTANCE as usize),
);
SorterCacheDelAddCboRoaringBitmap::<20, MergeFn>::new(
NonZeroUsize::new(100).unwrap(),
sorter,
super::REDIS_CLIENT.get_connection().unwrap(),
) )
}) })
.collect(); .collect();
@ -79,8 +84,7 @@ pub fn extract_word_pair_proximity_docids<R: io::Read + io::Seek>(
current_document_id.unwrap(), current_document_id.unwrap(),
&del_word_pair_proximity, &del_word_pair_proximity,
&add_word_pair_proximity, &add_word_pair_proximity,
&mut word_pair_proximity_docids_sorters, &mut cached_word_pair_proximity_docids_sorters,
&mut conn,
)?; )?;
del_word_pair_proximity.clear(); del_word_pair_proximity.clear();
add_word_pair_proximity.clear(); add_word_pair_proximity.clear();
@ -170,8 +174,7 @@ pub fn extract_word_pair_proximity_docids<R: io::Read + io::Seek>(
document_id, document_id,
&del_word_pair_proximity, &del_word_pair_proximity,
&add_word_pair_proximity, &add_word_pair_proximity,
&mut word_pair_proximity_docids_sorters, &mut cached_word_pair_proximity_docids_sorters,
&mut conn,
)?; )?;
} }
{ {
@ -185,8 +188,8 @@ pub fn extract_word_pair_proximity_docids<R: io::Read + io::Seek>(
tempfile::tempfile()?, tempfile::tempfile()?,
); );
for sorter in word_pair_proximity_docids_sorters { for cached_sorter in cached_word_pair_proximity_docids_sorters {
sorter.write_into_stream_writer(&mut writer)?; cached_sorter.into_sorter()?.write_into_stream_writer(&mut writer)?;
} }
writer_into_reader(writer) writer_into_reader(writer)
@ -201,35 +204,24 @@ fn document_word_positions_into_sorter(
document_id: DocumentId, document_id: DocumentId,
del_word_pair_proximity: &BTreeMap<(String, String), u8>, del_word_pair_proximity: &BTreeMap<(String, String), u8>,
add_word_pair_proximity: &BTreeMap<(String, String), u8>, add_word_pair_proximity: &BTreeMap<(String, String), u8>,
word_pair_proximity_docids_sorters: &mut [grenad::Sorter<MergeFn>], cached_word_pair_proximity_docids_sorters: &mut [SorterCacheDelAddCboRoaringBitmap<
conn: &mut redis::Connection, 20,
MergeFn,
>],
) -> Result<()> { ) -> Result<()> {
use itertools::merge_join_by; use itertools::merge_join_by;
use itertools::EitherOrBoth::{Both, Left, Right}; use itertools::EitherOrBoth::{Both, Left, Right};
let mut buffer = Vec::new();
let mut key_buffer = Vec::new(); let mut key_buffer = Vec::new();
for eob in for eob in
merge_join_by(del_word_pair_proximity.iter(), add_word_pair_proximity.iter(), |d, a| { merge_join_by(del_word_pair_proximity.iter(), add_word_pair_proximity.iter(), |d, a| {
d.cmp(a) d.cmp(a)
}) })
{ {
buffer.clear();
let mut value_writer = KvWriterDelAdd::new(&mut buffer);
let ((w1, w2), prox) = match eob { let ((w1, w2), prox) = match eob {
Left(key_value) => { Left(key_value) => key_value,
value_writer.insert(DelAdd::Deletion, document_id.to_ne_bytes()).unwrap(); Right(key_value) => key_value,
key_value Both(key_value, _) => key_value,
}
Right(key_value) => {
value_writer.insert(DelAdd::Addition, document_id.to_ne_bytes()).unwrap();
key_value
}
Both(key_value, _) => {
value_writer.insert(DelAdd::Deletion, document_id.to_ne_bytes()).unwrap();
value_writer.insert(DelAdd::Addition, document_id.to_ne_bytes()).unwrap();
key_value
}
}; };
key_buffer.clear(); key_buffer.clear();
@ -238,9 +230,20 @@ fn document_word_positions_into_sorter(
key_buffer.push(0); key_buffer.push(0);
key_buffer.extend_from_slice(w2.as_bytes()); key_buffer.extend_from_slice(w2.as_bytes());
redis::cmd("INCR").arg(key_buffer.as_slice()).query::<usize>(conn).unwrap(); match eob {
word_pair_proximity_docids_sorters[*prox as usize - 1] Left(_) => {
.insert(&key_buffer, value_writer.into_inner().unwrap())?; cached_word_pair_proximity_docids_sorters[*prox as usize - 1]
.insert_del_u32(&key_buffer, document_id)?;
}
Right(_) => {
cached_word_pair_proximity_docids_sorters[*prox as usize - 1]
.insert_add_u32(&key_buffer, document_id)?;
}
Both(_, _) => {
cached_word_pair_proximity_docids_sorters[*prox as usize - 1]
.insert_del_add_u32(&key_buffer, document_id)?;
}
}
} }
Ok(()) Ok(())