Use the sorter cache when extracting the word counts

This commit is contained in:
Clément Renault 2024-07-17 16:35:52 +02:00
parent 092a383419
commit 54e2e2aa4a
No known key found for this signature in database
GPG Key ID: F250A4C4E3AE5F5F

View File

@ -1,5 +1,6 @@
use std::fs::File; use std::fs::File;
use std::io::{self, BufReader}; use std::io::{self, BufReader};
use std::num::NonZeroUsize;
use obkv::KvReaderU16; use obkv::KvReaderU16;
@ -9,8 +10,10 @@ use super::helpers::{
}; };
use crate::error::SerializationError; use crate::error::SerializationError;
use crate::index::db_name::DOCID_WORD_POSITIONS; use crate::index::db_name::DOCID_WORD_POSITIONS;
use crate::update::del_add::{DelAdd, KvReaderDelAdd, KvWriterDelAdd}; use crate::update::del_add::{DelAdd, KvReaderDelAdd};
use crate::update::index_documents::cache::SorterCacheDelAddCboRoaringBitmap;
use crate::update::settings::InnerIndexSettingsDiff; use crate::update::settings::InnerIndexSettingsDiff;
use crate::update::MergeFn;
use crate::Result; use crate::Result;
const MAX_COUNTED_WORDS: usize = 30; const MAX_COUNTED_WORDS: usize = 30;
@ -26,10 +29,9 @@ pub fn extract_fid_word_count_docids<R: io::Read + io::Seek>(
indexer: GrenadParameters, indexer: GrenadParameters,
_settings_diff: &InnerIndexSettingsDiff, _settings_diff: &InnerIndexSettingsDiff,
) -> Result<grenad::Reader<BufReader<File>>> { ) -> Result<grenad::Reader<BufReader<File>>> {
let mut conn = super::REDIS_CLIENT.get_connection().unwrap();
let max_memory = indexer.max_memory_by_thread(); let max_memory = indexer.max_memory_by_thread();
let mut fid_word_count_docids_sorter = create_sorter( let fid_word_count_docids_sorter = create_sorter(
grenad::SortAlgorithm::Unstable, grenad::SortAlgorithm::Unstable,
merge_deladd_cbo_roaring_bitmaps, merge_deladd_cbo_roaring_bitmaps,
indexer.chunk_compression_type, indexer.chunk_compression_type,
@ -37,9 +39,14 @@ pub fn extract_fid_word_count_docids<R: io::Read + io::Seek>(
indexer.max_nb_chunks, indexer.max_nb_chunks,
max_memory, max_memory,
); );
let mut cached_fid_word_count_docids_sorter =
SorterCacheDelAddCboRoaringBitmap::<20, MergeFn>::new(
NonZeroUsize::new(300).unwrap(),
fid_word_count_docids_sorter,
super::REDIS_CLIENT.get_connection().unwrap(),
);
let mut key_buffer = Vec::new(); let mut key_buffer = Vec::new();
let mut value_buffer = Vec::new();
let mut cursor = docid_word_positions.into_cursor()?; let mut cursor = docid_word_positions.into_cursor()?;
while let Some((key, value)) = cursor.move_on_next()? { while let Some((key, value)) = cursor.move_on_next()? {
let (document_id_bytes, fid_bytes) = try_split_array_at(key) let (document_id_bytes, fid_bytes) = try_split_array_at(key)
@ -65,30 +72,20 @@ pub fn extract_fid_word_count_docids<R: io::Read + io::Seek>(
if deletion != addition { if deletion != addition {
// Insert deleted word count in sorter if exist. // Insert deleted word count in sorter if exist.
if let Some(word_count) = deletion { if let Some(word_count) = deletion {
value_buffer.clear();
let mut value_writer = KvWriterDelAdd::new(&mut value_buffer);
value_writer.insert(DelAdd::Deletion, document_id.to_ne_bytes()).unwrap();
key_buffer.clear(); key_buffer.clear();
key_buffer.extend_from_slice(fid_bytes); key_buffer.extend_from_slice(fid_bytes);
key_buffer.push(word_count as u8); key_buffer.push(word_count as u8);
redis::cmd("INCR").arg(key_buffer.as_slice()).query::<usize>(&mut conn).unwrap(); cached_fid_word_count_docids_sorter.insert_del_u32(&key_buffer, document_id)?;
fid_word_count_docids_sorter
.insert(&key_buffer, value_writer.into_inner().unwrap())?;
} }
// Insert added word count in sorter if exist. // Insert added word count in sorter if exist.
if let Some(word_count) = addition { if let Some(word_count) = addition {
value_buffer.clear();
let mut value_writer = KvWriterDelAdd::new(&mut value_buffer);
value_writer.insert(DelAdd::Addition, document_id.to_ne_bytes()).unwrap();
key_buffer.clear(); key_buffer.clear();
key_buffer.extend_from_slice(fid_bytes); key_buffer.extend_from_slice(fid_bytes);
key_buffer.push(word_count as u8); key_buffer.push(word_count as u8);
redis::cmd("INCR").arg(key_buffer.as_slice()).query::<usize>(&mut conn).unwrap(); cached_fid_word_count_docids_sorter.insert_add_u32(&key_buffer, document_id)?;
fid_word_count_docids_sorter
.insert(&key_buffer, value_writer.into_inner().unwrap())?;
} }
} }
} }
sorter_into_reader(fid_word_count_docids_sorter, indexer) sorter_into_reader(cached_fid_word_count_docids_sorter.into_sorter()?, indexer)
} }