Use the sorter cache when extracting the word position docids

This commit is contained in:
Clément Renault 2024-07-17 16:27:17 +02:00
parent 8319552e7d
commit 98d55e0d4d
No known key found for this signature in database
GPG Key ID: F250A4C4E3AE5F5F

View File

@ -1,6 +1,7 @@
use std::collections::BTreeSet; use std::collections::BTreeSet;
use std::fs::File; use std::fs::File;
use std::io::{self, BufReader}; use std::io::{self, BufReader};
use std::num::NonZeroUsize;
use obkv::KvReaderU16; use obkv::KvReaderU16;
@ -10,7 +11,8 @@ use super::helpers::{
}; };
use crate::error::SerializationError; use crate::error::SerializationError;
use crate::index::db_name::DOCID_WORD_POSITIONS; use crate::index::db_name::DOCID_WORD_POSITIONS;
use crate::update::del_add::{DelAdd, KvReaderDelAdd, KvWriterDelAdd}; use crate::update::del_add::{DelAdd, KvReaderDelAdd};
use crate::update::index_documents::cache::SorterCacheDelAddCboRoaringBitmap;
use crate::update::settings::InnerIndexSettingsDiff; use crate::update::settings::InnerIndexSettingsDiff;
use crate::update::MergeFn; use crate::update::MergeFn;
use crate::{bucketed_position, DocumentId, Result}; use crate::{bucketed_position, DocumentId, Result};
@ -25,10 +27,9 @@ pub fn extract_word_position_docids<R: io::Read + io::Seek>(
indexer: GrenadParameters, indexer: GrenadParameters,
_settings_diff: &InnerIndexSettingsDiff, _settings_diff: &InnerIndexSettingsDiff,
) -> Result<grenad::Reader<BufReader<File>>> { ) -> Result<grenad::Reader<BufReader<File>>> {
let mut conn = super::REDIS_CLIENT.get_connection().unwrap();
let max_memory = indexer.max_memory_by_thread(); let max_memory = indexer.max_memory_by_thread();
let mut word_position_docids_sorter = create_sorter( let word_position_docids_sorter = create_sorter(
grenad::SortAlgorithm::Unstable, grenad::SortAlgorithm::Unstable,
merge_deladd_cbo_roaring_bitmaps, merge_deladd_cbo_roaring_bitmaps,
indexer.chunk_compression_type, indexer.chunk_compression_type,
@ -36,6 +37,12 @@ pub fn extract_word_position_docids<R: io::Read + io::Seek>(
indexer.max_nb_chunks, indexer.max_nb_chunks,
max_memory, max_memory,
); );
let mut cached_word_position_docids_sorter =
SorterCacheDelAddCboRoaringBitmap::<20, MergeFn>::new(
NonZeroUsize::new(300).unwrap(),
word_position_docids_sorter,
super::REDIS_CLIENT.get_connection().unwrap(),
);
let mut del_word_positions: BTreeSet<(u16, Vec<u8>)> = BTreeSet::new(); let mut del_word_positions: BTreeSet<(u16, Vec<u8>)> = BTreeSet::new();
let mut add_word_positions: BTreeSet<(u16, Vec<u8>)> = BTreeSet::new(); let mut add_word_positions: BTreeSet<(u16, Vec<u8>)> = BTreeSet::new();
@ -53,8 +60,7 @@ pub fn extract_word_position_docids<R: io::Read + io::Seek>(
&mut key_buffer, &mut key_buffer,
&del_word_positions, &del_word_positions,
&add_word_positions, &add_word_positions,
&mut word_position_docids_sorter, &mut cached_word_position_docids_sorter,
&mut conn,
)?; )?;
del_word_positions.clear(); del_word_positions.clear();
add_word_positions.clear(); add_word_positions.clear();
@ -86,13 +92,13 @@ pub fn extract_word_position_docids<R: io::Read + io::Seek>(
&mut key_buffer, &mut key_buffer,
&del_word_positions, &del_word_positions,
&add_word_positions, &add_word_positions,
&mut word_position_docids_sorter, &mut cached_word_position_docids_sorter,
&mut conn,
)?; )?;
} }
// TODO remove noop DelAdd OBKV // TODO remove noop DelAdd OBKV
let word_position_docids_reader = sorter_into_reader(word_position_docids_sorter, indexer)?; let word_position_docids_reader =
sorter_into_reader(cached_word_position_docids_sorter.into_sorter()?, indexer)?;
Ok(word_position_docids_reader) Ok(word_position_docids_reader)
} }
@ -103,40 +109,38 @@ fn words_position_into_sorter(
key_buffer: &mut Vec<u8>, key_buffer: &mut Vec<u8>,
del_word_positions: &BTreeSet<(u16, Vec<u8>)>, del_word_positions: &BTreeSet<(u16, Vec<u8>)>,
add_word_positions: &BTreeSet<(u16, Vec<u8>)>, add_word_positions: &BTreeSet<(u16, Vec<u8>)>,
word_position_docids_sorter: &mut grenad::Sorter<MergeFn>, cached_word_position_docids_sorter: &mut SorterCacheDelAddCboRoaringBitmap<20, MergeFn>,
conn: &mut redis::Connection,
) -> Result<()> { ) -> Result<()> {
use itertools::merge_join_by; use itertools::merge_join_by;
use itertools::EitherOrBoth::{Both, Left, Right}; use itertools::EitherOrBoth::{Both, Left, Right};
let mut buffer = Vec::new();
for eob in merge_join_by(del_word_positions.iter(), add_word_positions.iter(), |d, a| d.cmp(a)) for eob in merge_join_by(del_word_positions.iter(), add_word_positions.iter(), |d, a| d.cmp(a))
{ {
buffer.clear();
let mut value_writer = KvWriterDelAdd::new(&mut buffer);
let (position, word_bytes) = match eob { let (position, word_bytes) = match eob {
Left(key) => { Left(key) => key,
value_writer.insert(DelAdd::Deletion, document_id.to_ne_bytes()).unwrap(); Right(key) => key,
key Both(key, _) => key,
}
Right(key) => {
value_writer.insert(DelAdd::Addition, document_id.to_ne_bytes()).unwrap();
key
}
Both(key, _) => {
// both values needs to be kept because it will be used in other extractors.
value_writer.insert(DelAdd::Deletion, document_id.to_ne_bytes()).unwrap();
value_writer.insert(DelAdd::Addition, document_id.to_ne_bytes()).unwrap();
key
}
}; };
key_buffer.clear(); key_buffer.clear();
key_buffer.extend_from_slice(word_bytes); key_buffer.extend_from_slice(word_bytes);
key_buffer.push(0); key_buffer.push(0);
key_buffer.extend_from_slice(&position.to_be_bytes()); key_buffer.extend_from_slice(&position.to_be_bytes());
redis::cmd("INCR").arg(key_buffer.as_slice()).query::<usize>(conn).unwrap();
word_position_docids_sorter.insert(&key_buffer, value_writer.into_inner().unwrap())?; match eob {
Left(_) => {
cached_word_position_docids_sorter
.insert_del_u32(key_buffer.as_slice(), document_id)?;
}
Right(_) => {
cached_word_position_docids_sorter
.insert_add_u32(key_buffer.as_slice(), document_id)?;
}
Both(_, _) => {
cached_word_position_docids_sorter
.insert_del_add_u32(key_buffer.as_slice(), document_id)?;
}
}
} }
Ok(()) Ok(())