Use Redis to measure the Sorter insertions

This commit is contained in:
Clément Renault 2024-07-17 13:55:51 +02:00
parent 48bc797dce
commit f17cb2ef5b
No known key found for this signature in database
GPG Key ID: F250A4C4E3AE5F5F
8 changed files with 62 additions and 0 deletions

32
Cargo.lock generated
View File

@ -1047,6 +1047,16 @@ version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7" checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7"
[[package]]
name = "combine"
version = "4.6.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ba5a308b75df32fe02788e748662718f03fde005016435c444eea572398219fd"
dependencies = [
"bytes",
"memchr",
]
[[package]] [[package]]
name = "concat-arrays" name = "concat-arrays"
version = "0.1.2" version = "0.1.2"
@ -3552,6 +3562,7 @@ dependencies = [
"rand", "rand",
"rayon", "rayon",
"rayon-par-bridge", "rayon-par-bridge",
"redis",
"rhai", "rhai",
"roaring", "roaring",
"rstar", "rstar",
@ -4420,6 +4431,21 @@ version = "0.5.5"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "03251193000f4bd3b042892be858ee50e8b3719f2b08e5833ac4353724632430" checksum = "03251193000f4bd3b042892be858ee50e8b3719f2b08e5833ac4353724632430"
[[package]]
name = "redis"
version = "0.25.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e0d7a6955c7511f60f3ba9e86c6d02b3c3f144f8c24b288d1f4e18074ab8bbec"
dependencies = [
"combine",
"itoa",
"percent-encoding",
"ryu",
"sha1_smol",
"socket2 0.5.5",
"url",
]
[[package]] [[package]]
name = "redox_syscall" name = "redox_syscall"
version = "0.2.16" version = "0.2.16"
@ -4891,6 +4917,12 @@ dependencies = [
"digest", "digest",
] ]
[[package]]
name = "sha1_smol"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ae1a47186c03a32177042e55dbc5fd5aee900b8e0069a8d70fba96a9375cd012"
[[package]] [[package]]
name = "sha2" name = "sha2"
version = "0.10.8" version = "0.10.8"

View File

@ -67,6 +67,8 @@ filter-parser = { path = "../filter-parser" }
# documents words self-join # documents words self-join
itertools = "0.13.0" itertools = "0.13.0"
redis = "0.25.4"
csv = "1.3.0" csv = "1.3.0"
candle-core = { version = "0.6.0" } candle-core = { version = "0.6.0" }
candle-transformers = { version = "0.6.0" } candle-transformers = { version = "0.6.0" }

View File

@ -29,6 +29,8 @@ pub fn extract_docid_word_positions<R: io::Read + io::Seek>(
settings_diff: &InnerIndexSettingsDiff, settings_diff: &InnerIndexSettingsDiff,
max_positions_per_attributes: Option<u32>, max_positions_per_attributes: Option<u32>,
) -> Result<(grenad::Reader<BufReader<File>>, ScriptLanguageDocidsMap)> { ) -> Result<(grenad::Reader<BufReader<File>>, ScriptLanguageDocidsMap)> {
let mut conn = super::REDIS_CLIENT.get_connection().unwrap();
let max_positions_per_attributes = max_positions_per_attributes let max_positions_per_attributes = max_positions_per_attributes
.map_or(MAX_POSITION_PER_ATTRIBUTE, |max| max.min(MAX_POSITION_PER_ATTRIBUTE)); .map_or(MAX_POSITION_PER_ATTRIBUTE, |max| max.min(MAX_POSITION_PER_ATTRIBUTE));
let max_memory = indexer.max_memory_by_thread(); let max_memory = indexer.max_memory_by_thread();
@ -148,6 +150,7 @@ pub fn extract_docid_word_positions<R: io::Read + io::Seek>(
for (field_id, value) in obkv.iter() { for (field_id, value) in obkv.iter() {
key_buffer.truncate(mem::size_of::<u32>()); key_buffer.truncate(mem::size_of::<u32>());
key_buffer.extend_from_slice(&field_id.to_be_bytes()); key_buffer.extend_from_slice(&field_id.to_be_bytes());
redis::cmd("INCR").arg(key_buffer.as_slice()).query::<usize>(&mut conn).unwrap();
docid_word_positions_sorter.insert(&key_buffer, value)?; docid_word_positions_sorter.insert(&key_buffer, value)?;
} }

View File

@ -26,6 +26,7 @@ pub fn extract_fid_word_count_docids<R: io::Read + io::Seek>(
indexer: GrenadParameters, indexer: GrenadParameters,
_settings_diff: &InnerIndexSettingsDiff, _settings_diff: &InnerIndexSettingsDiff,
) -> Result<grenad::Reader<BufReader<File>>> { ) -> Result<grenad::Reader<BufReader<File>>> {
let mut conn = super::REDIS_CLIENT.get_connection().unwrap();
let max_memory = indexer.max_memory_by_thread(); let max_memory = indexer.max_memory_by_thread();
let mut fid_word_count_docids_sorter = create_sorter( let mut fid_word_count_docids_sorter = create_sorter(
@ -70,6 +71,7 @@ pub fn extract_fid_word_count_docids<R: io::Read + io::Seek>(
key_buffer.clear(); key_buffer.clear();
key_buffer.extend_from_slice(fid_bytes); key_buffer.extend_from_slice(fid_bytes);
key_buffer.push(word_count as u8); key_buffer.push(word_count as u8);
redis::cmd("INCR").arg(key_buffer.as_slice()).query::<usize>(&mut conn).unwrap();
fid_word_count_docids_sorter fid_word_count_docids_sorter
.insert(&key_buffer, value_writer.into_inner().unwrap())?; .insert(&key_buffer, value_writer.into_inner().unwrap())?;
} }
@ -81,6 +83,7 @@ pub fn extract_fid_word_count_docids<R: io::Read + io::Seek>(
key_buffer.clear(); key_buffer.clear();
key_buffer.extend_from_slice(fid_bytes); key_buffer.extend_from_slice(fid_bytes);
key_buffer.push(word_count as u8); key_buffer.push(word_count as u8);
redis::cmd("INCR").arg(key_buffer.as_slice()).query::<usize>(&mut conn).unwrap();
fid_word_count_docids_sorter fid_word_count_docids_sorter
.insert(&key_buffer, value_writer.into_inner().unwrap())?; .insert(&key_buffer, value_writer.into_inner().unwrap())?;
} }

View File

@ -10,6 +10,7 @@ use super::helpers::{
create_sorter, create_writer, merge_deladd_cbo_roaring_bitmaps, try_split_array_at, create_sorter, create_writer, merge_deladd_cbo_roaring_bitmaps, try_split_array_at,
writer_into_reader, GrenadParameters, writer_into_reader, GrenadParameters,
}; };
use super::REDIS_CLIENT;
use crate::error::SerializationError; use crate::error::SerializationError;
use crate::heed_codec::StrBEU16Codec; use crate::heed_codec::StrBEU16Codec;
use crate::index::db_name::DOCID_WORD_POSITIONS; use crate::index::db_name::DOCID_WORD_POSITIONS;
@ -37,6 +38,7 @@ pub fn extract_word_docids<R: io::Read + io::Seek>(
grenad::Reader<BufReader<File>>, grenad::Reader<BufReader<File>>,
)> { )> {
let max_memory = indexer.max_memory_by_thread(); let max_memory = indexer.max_memory_by_thread();
let mut conn = REDIS_CLIENT.get_connection().unwrap();
let mut word_fid_docids_sorter = create_sorter( let mut word_fid_docids_sorter = create_sorter(
grenad::SortAlgorithm::Unstable, grenad::SortAlgorithm::Unstable,
@ -80,6 +82,7 @@ pub fn extract_word_docids<R: io::Read + io::Seek>(
&del_words, &del_words,
&add_words, &add_words,
&mut word_fid_docids_sorter, &mut word_fid_docids_sorter,
&mut conn,
)?; )?;
del_words.clear(); del_words.clear();
@ -164,6 +167,7 @@ fn words_into_sorter(
del_words: &BTreeSet<Vec<u8>>, del_words: &BTreeSet<Vec<u8>>,
add_words: &BTreeSet<Vec<u8>>, add_words: &BTreeSet<Vec<u8>>,
word_fid_docids_sorter: &mut grenad::Sorter<MergeFn>, word_fid_docids_sorter: &mut grenad::Sorter<MergeFn>,
conn: &mut redis::Connection,
) -> Result<()> { ) -> Result<()> {
use itertools::merge_join_by; use itertools::merge_join_by;
use itertools::EitherOrBoth::{Both, Left, Right}; use itertools::EitherOrBoth::{Both, Left, Right};
@ -192,18 +196,21 @@ fn words_into_sorter(
key_buffer.extend_from_slice(word_bytes); key_buffer.extend_from_slice(word_bytes);
key_buffer.push(0); key_buffer.push(0);
key_buffer.extend_from_slice(&fid.to_be_bytes()); key_buffer.extend_from_slice(&fid.to_be_bytes());
redis::cmd("INCR").arg(key_buffer.as_slice()).query::<usize>(conn).unwrap();
word_fid_docids_sorter.insert(&key_buffer, value_writer.into_inner().unwrap())?; word_fid_docids_sorter.insert(&key_buffer, value_writer.into_inner().unwrap())?;
} }
Ok(()) Ok(())
} }
// TODO do we still use this?
#[tracing::instrument(level = "trace", skip_all, target = "indexing::extract")] #[tracing::instrument(level = "trace", skip_all, target = "indexing::extract")]
fn docids_into_writers<W>( fn docids_into_writers<W>(
word: &str, word: &str,
deletions: &RoaringBitmap, deletions: &RoaringBitmap,
additions: &RoaringBitmap, additions: &RoaringBitmap,
writer: &mut grenad::Writer<W>, writer: &mut grenad::Writer<W>,
conn: &mut redis::Connection,
) -> Result<()> ) -> Result<()>
where where
W: std::io::Write, W: std::io::Write,
@ -235,6 +242,7 @@ where
} }
// insert everything in the same writer. // insert everything in the same writer.
redis::cmd("INCR").arg(word.as_bytes()).query::<usize>(conn).unwrap();
writer.insert(word.as_bytes(), obkv.into_inner().unwrap())?; writer.insert(word.as_bytes(), obkv.into_inner().unwrap())?;
Ok(()) Ok(())

View File

@ -26,6 +26,8 @@ pub fn extract_word_pair_proximity_docids<R: io::Read + io::Seek>(
indexer: GrenadParameters, indexer: GrenadParameters,
settings_diff: &InnerIndexSettingsDiff, settings_diff: &InnerIndexSettingsDiff,
) -> Result<grenad::Reader<BufReader<File>>> { ) -> Result<grenad::Reader<BufReader<File>>> {
let mut conn = super::REDIS_CLIENT.get_connection().unwrap();
// early return if the data shouldn't be deleted nor created. // early return if the data shouldn't be deleted nor created.
if settings_diff.settings_update_only && !settings_diff.reindex_proximities() { if settings_diff.settings_update_only && !settings_diff.reindex_proximities() {
let writer = create_writer( let writer = create_writer(
@ -78,6 +80,7 @@ pub fn extract_word_pair_proximity_docids<R: io::Read + io::Seek>(
&del_word_pair_proximity, &del_word_pair_proximity,
&add_word_pair_proximity, &add_word_pair_proximity,
&mut word_pair_proximity_docids_sorters, &mut word_pair_proximity_docids_sorters,
&mut conn,
)?; )?;
del_word_pair_proximity.clear(); del_word_pair_proximity.clear();
add_word_pair_proximity.clear(); add_word_pair_proximity.clear();
@ -168,6 +171,7 @@ pub fn extract_word_pair_proximity_docids<R: io::Read + io::Seek>(
&del_word_pair_proximity, &del_word_pair_proximity,
&add_word_pair_proximity, &add_word_pair_proximity,
&mut word_pair_proximity_docids_sorters, &mut word_pair_proximity_docids_sorters,
&mut conn,
)?; )?;
} }
{ {
@ -198,6 +202,7 @@ fn document_word_positions_into_sorter(
del_word_pair_proximity: &BTreeMap<(String, String), u8>, del_word_pair_proximity: &BTreeMap<(String, String), u8>,
add_word_pair_proximity: &BTreeMap<(String, String), u8>, add_word_pair_proximity: &BTreeMap<(String, String), u8>,
word_pair_proximity_docids_sorters: &mut [grenad::Sorter<MergeFn>], word_pair_proximity_docids_sorters: &mut [grenad::Sorter<MergeFn>],
conn: &mut redis::Connection,
) -> Result<()> { ) -> Result<()> {
use itertools::merge_join_by; use itertools::merge_join_by;
use itertools::EitherOrBoth::{Both, Left, Right}; use itertools::EitherOrBoth::{Both, Left, Right};
@ -233,6 +238,7 @@ fn document_word_positions_into_sorter(
key_buffer.push(0); key_buffer.push(0);
key_buffer.extend_from_slice(w2.as_bytes()); key_buffer.extend_from_slice(w2.as_bytes());
redis::cmd("INCR").arg(key_buffer.as_slice()).query::<usize>(conn).unwrap();
word_pair_proximity_docids_sorters[*prox as usize - 1] word_pair_proximity_docids_sorters[*prox as usize - 1]
.insert(&key_buffer, value_writer.into_inner().unwrap())?; .insert(&key_buffer, value_writer.into_inner().unwrap())?;
} }

View File

@ -25,6 +25,7 @@ pub fn extract_word_position_docids<R: io::Read + io::Seek>(
indexer: GrenadParameters, indexer: GrenadParameters,
_settings_diff: &InnerIndexSettingsDiff, _settings_diff: &InnerIndexSettingsDiff,
) -> Result<grenad::Reader<BufReader<File>>> { ) -> Result<grenad::Reader<BufReader<File>>> {
let mut conn = super::REDIS_CLIENT.get_connection().unwrap();
let max_memory = indexer.max_memory_by_thread(); let max_memory = indexer.max_memory_by_thread();
let mut word_position_docids_sorter = create_sorter( let mut word_position_docids_sorter = create_sorter(
@ -53,6 +54,7 @@ pub fn extract_word_position_docids<R: io::Read + io::Seek>(
&del_word_positions, &del_word_positions,
&add_word_positions, &add_word_positions,
&mut word_position_docids_sorter, &mut word_position_docids_sorter,
&mut conn,
)?; )?;
del_word_positions.clear(); del_word_positions.clear();
add_word_positions.clear(); add_word_positions.clear();
@ -85,6 +87,7 @@ pub fn extract_word_position_docids<R: io::Read + io::Seek>(
&del_word_positions, &del_word_positions,
&add_word_positions, &add_word_positions,
&mut word_position_docids_sorter, &mut word_position_docids_sorter,
&mut conn,
)?; )?;
} }
@ -101,6 +104,7 @@ fn words_position_into_sorter(
del_word_positions: &BTreeSet<(u16, Vec<u8>)>, del_word_positions: &BTreeSet<(u16, Vec<u8>)>,
add_word_positions: &BTreeSet<(u16, Vec<u8>)>, add_word_positions: &BTreeSet<(u16, Vec<u8>)>,
word_position_docids_sorter: &mut grenad::Sorter<MergeFn>, word_position_docids_sorter: &mut grenad::Sorter<MergeFn>,
conn: &mut redis::Connection,
) -> Result<()> { ) -> Result<()> {
use itertools::merge_join_by; use itertools::merge_join_by;
use itertools::EitherOrBoth::{Both, Left, Right}; use itertools::EitherOrBoth::{Both, Left, Right};
@ -131,6 +135,7 @@ fn words_position_into_sorter(
key_buffer.extend_from_slice(word_bytes); key_buffer.extend_from_slice(word_bytes);
key_buffer.push(0); key_buffer.push(0);
key_buffer.extend_from_slice(&position.to_be_bytes()); key_buffer.extend_from_slice(&position.to_be_bytes());
redis::cmd("INCR").arg(key_buffer.as_slice()).query::<usize>(conn).unwrap();
word_position_docids_sorter.insert(&key_buffer, value_writer.into_inner().unwrap())?; word_position_docids_sorter.insert(&key_buffer, value_writer.into_inner().unwrap())?;
} }

View File

@ -35,6 +35,9 @@ use crate::update::settings::InnerIndexSettingsDiff;
use crate::vector::error::PossibleEmbeddingMistakes; use crate::vector::error::PossibleEmbeddingMistakes;
use crate::{FieldId, Result, ThreadPoolNoAbort, ThreadPoolNoAbortBuilder}; use crate::{FieldId, Result, ThreadPoolNoAbort, ThreadPoolNoAbortBuilder};
pub static REDIS_CLIENT: once_cell::sync::Lazy<redis::Client> =
once_cell::sync::Lazy::new(|| redis::Client::open("redis://127.0.0.1/").unwrap());
/// Extract data for each databases from obkv documents in parallel. /// Extract data for each databases from obkv documents in parallel.
/// Send data in grenad file over provided Sender. /// Send data in grenad file over provided Sender.
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]