diff --git a/milli/src/update/index_documents/extract/extract_facet_number_docids.rs b/milli/src/update/index_documents/extract/extract_facet_number_docids.rs index bfd769604..b5550a765 100644 --- a/milli/src/update/index_documents/extract/extract_facet_number_docids.rs +++ b/milli/src/update/index_documents/extract/extract_facet_number_docids.rs @@ -23,6 +23,7 @@ pub fn extract_facet_number_docids( indexer: GrenadParameters, _settings_diff: &InnerIndexSettingsDiff, ) -> Result>> { + let mut conn = super::REDIS_CLIENT.get_connection().unwrap(); let max_memory = indexer.max_memory_by_thread(); let mut facet_number_docids_sorter = create_sorter( @@ -50,6 +51,7 @@ pub fn extract_facet_number_docids( } obkv.finish()?; + redis::cmd("INCR").arg(key_bytes.as_ref()).query::(&mut conn).unwrap(); facet_number_docids_sorter.insert(key_bytes, &buffer)?; } diff --git a/milli/src/update/index_documents/extract/extract_facet_string_docids.rs b/milli/src/update/index_documents/extract/extract_facet_string_docids.rs index 3deace127..14c95bc2d 100644 --- a/milli/src/update/index_documents/extract/extract_facet_string_docids.rs +++ b/milli/src/update/index_documents/extract/extract_facet_string_docids.rs @@ -28,6 +28,7 @@ pub fn extract_facet_string_docids( indexer: GrenadParameters, _settings_diff: &InnerIndexSettingsDiff, ) -> Result<(grenad::Reader>, grenad::Reader>)> { + let mut conn = super::REDIS_CLIENT.get_connection().unwrap(); let max_memory = indexer.max_memory_by_thread(); let options = NormalizerOption { lossy: true, ..Default::default() }; @@ -94,6 +95,7 @@ pub fn extract_facet_string_docids( let key = (field_id, hyper_normalized_value.as_ref()); let key_bytes = BEU16StrCodec::bytes_encode(&key).map_err(heed::Error::Encoding)?; + redis::cmd("INCR").arg(key_bytes.as_ref()).query::(&mut conn).unwrap(); normalized_facet_string_docids_sorter.insert(key_bytes, &buffer)?; } @@ -106,6 +108,7 @@ pub fn extract_facet_string_docids( obkv.insert(deladd_key, document_id.to_ne_bytes())?; } obkv.finish()?; + redis::cmd("INCR").arg(key_bytes.as_ref()).query::(&mut conn).unwrap(); facet_string_docids_sorter.insert(&key_bytes, &buffer)?; } diff --git a/milli/src/update/index_documents/extract/extract_fid_docid_facet_values.rs b/milli/src/update/index_documents/extract/extract_fid_docid_facet_values.rs index 810fa26a9..14561281b 100644 --- a/milli/src/update/index_documents/extract/extract_fid_docid_facet_values.rs +++ b/milli/src/update/index_documents/extract/extract_fid_docid_facet_values.rs @@ -46,6 +46,7 @@ pub fn extract_fid_docid_facet_values( indexer: GrenadParameters, settings_diff: &InnerIndexSettingsDiff, ) -> Result { + let mut conn = super::REDIS_CLIENT.get_connection().unwrap(); let max_memory = indexer.max_memory_by_thread(); let mut fid_docid_facet_numbers_sorter = create_sorter( @@ -169,20 +170,22 @@ pub fn extract_fid_docid_facet_values( add_value.map(|value| extract_facet_values(&value, add_geo_support)); // Those closures are just here to simplify things a bit. - let mut insert_numbers_diff = |del_numbers, add_numbers| { + let mut insert_numbers_diff = |del_numbers, add_numbers, conn| { insert_numbers_diff( &mut fid_docid_facet_numbers_sorter, &mut numbers_key_buffer, del_numbers, add_numbers, + conn, ) }; - let mut insert_strings_diff = |del_strings, add_strings| { + let mut insert_strings_diff = |del_strings, add_strings, conn| { insert_strings_diff( &mut fid_docid_facet_strings_sorter, &mut strings_key_buffer, del_strings, add_strings, + conn, ) }; @@ -196,8 +199,8 @@ pub fn extract_fid_docid_facet_values( del_is_empty.insert(document); } Values { numbers, strings } => { - insert_numbers_diff(numbers, vec![])?; - insert_strings_diff(strings, vec![])?; + insert_numbers_diff(numbers, vec![], &mut conn)?; + insert_strings_diff(strings, vec![], &mut conn)?; } }, (None, Some(add_filterable_values)) => match add_filterable_values { @@ -208,8 +211,8 @@ pub fn extract_fid_docid_facet_values( add_is_empty.insert(document); } Values { numbers, strings } => { - insert_numbers_diff(vec![], numbers)?; - insert_strings_diff(vec![], strings)?; + insert_numbers_diff(vec![], numbers, &mut conn)?; + insert_strings_diff(vec![], strings, &mut conn)?; } }, (Some(del_filterable_values), Some(add_filterable_values)) => { @@ -224,31 +227,31 @@ pub fn extract_fid_docid_facet_values( add_is_null.insert(document); } (Null, Values { numbers, strings }) => { - insert_numbers_diff(vec![], numbers)?; - insert_strings_diff(vec![], strings)?; + insert_numbers_diff(vec![], numbers, &mut conn)?; + insert_strings_diff(vec![], strings, &mut conn)?; del_is_null.insert(document); } (Empty, Values { numbers, strings }) => { - insert_numbers_diff(vec![], numbers)?; - insert_strings_diff(vec![], strings)?; + insert_numbers_diff(vec![], numbers, &mut conn)?; + insert_strings_diff(vec![], strings, &mut conn)?; del_is_empty.insert(document); } (Values { numbers, strings }, Null) => { add_is_null.insert(document); - insert_numbers_diff(numbers, vec![])?; - insert_strings_diff(strings, vec![])?; + insert_numbers_diff(numbers, vec![], &mut conn)?; + insert_strings_diff(strings, vec![], &mut conn)?; } (Values { numbers, strings }, Empty) => { add_is_empty.insert(document); - insert_numbers_diff(numbers, vec![])?; - insert_strings_diff(strings, vec![])?; + insert_numbers_diff(numbers, vec![], &mut conn)?; + insert_strings_diff(strings, vec![], &mut conn)?; } ( Values { numbers: del_numbers, strings: del_strings }, Values { numbers: add_numbers, strings: add_strings }, ) => { - insert_numbers_diff(del_numbers, add_numbers)?; - insert_strings_diff(del_strings, add_strings)?; + insert_numbers_diff(del_numbers, add_numbers, &mut conn)?; + insert_strings_diff(del_strings, add_strings, &mut conn)?; } } } @@ -331,6 +334,7 @@ fn insert_numbers_diff( key_buffer: &mut Vec, mut del_numbers: Vec, mut add_numbers: Vec, + conn: &mut redis::Connection, ) -> Result<()> where MF: for<'a> Fn(&[u8], &[Cow<'a, [u8]>]) -> StdResult, Error>, @@ -362,6 +366,7 @@ where let mut obkv = KvWriterDelAdd::memory(); obkv.insert(DelAdd::Deletion, bytes_of(&()))?; let bytes = obkv.into_inner()?; + redis::cmd("INCR").arg(key_buffer.as_slice()).query::(conn).unwrap(); fid_docid_facet_numbers_sorter.insert(&key_buffer, bytes)?; } } @@ -375,6 +380,7 @@ where let mut obkv = KvWriterDelAdd::memory(); obkv.insert(DelAdd::Addition, bytes_of(&()))?; let bytes = obkv.into_inner()?; + redis::cmd("INCR").arg(key_buffer.as_slice()).query::(conn).unwrap(); fid_docid_facet_numbers_sorter.insert(&key_buffer, bytes)?; } } @@ -391,6 +397,7 @@ fn insert_strings_diff( key_buffer: &mut Vec, mut del_strings: Vec<(String, String)>, mut add_strings: Vec<(String, String)>, + conn: &mut redis::Connection, ) -> Result<()> where MF: for<'a> Fn(&[u8], &[Cow<'a, [u8]>]) -> StdResult, Error>, @@ -419,6 +426,7 @@ where let mut obkv = KvWriterDelAdd::memory(); obkv.insert(DelAdd::Deletion, original)?; let bytes = obkv.into_inner()?; + redis::cmd("INCR").arg(key_buffer.as_slice()).query::(conn).unwrap(); fid_docid_facet_strings_sorter.insert(&key_buffer, bytes)?; } EitherOrBoth::Right((normalized, original)) => { @@ -428,6 +436,7 @@ where let mut obkv = KvWriterDelAdd::memory(); obkv.insert(DelAdd::Addition, original)?; let bytes = obkv.into_inner()?; + redis::cmd("INCR").arg(key_buffer.as_slice()).query::(conn).unwrap(); fid_docid_facet_strings_sorter.insert(&key_buffer, bytes)?; } } diff --git a/milli/src/update/index_documents/extract/extract_word_docids.rs b/milli/src/update/index_documents/extract/extract_word_docids.rs index 5574fca62..e70ea7666 100644 --- a/milli/src/update/index_documents/extract/extract_word_docids.rs +++ b/milli/src/update/index_documents/extract/extract_word_docids.rs @@ -132,6 +132,7 @@ pub fn extract_word_docids( buffer.clear(); let mut obkv = KvWriterDelAdd::new(&mut buffer); obkv.insert(DelAdd::Deletion, value)?; + redis::cmd("INCR").arg(w.as_bytes()).query::(&mut conn).unwrap(); if delete_from_exact { exact_word_docids_sorter.insert(w, obkv.into_inner().unwrap())?; } else { @@ -144,6 +145,7 @@ pub fn extract_word_docids( buffer.clear(); let mut obkv = KvWriterDelAdd::new(&mut buffer); obkv.insert(DelAdd::Addition, value)?; + redis::cmd("INCR").arg(w.as_bytes()).query::(&mut conn).unwrap(); if add_in_exact { exact_word_docids_sorter.insert(w, obkv.into_inner().unwrap())?; } else { diff --git a/milli/src/update/index_documents/mod.rs b/milli/src/update/index_documents/mod.rs index 7f07dafed..8b2e6683e 100644 --- a/milli/src/update/index_documents/mod.rs +++ b/milli/src/update/index_documents/mod.rs @@ -13,6 +13,7 @@ use std::result::Result as StdResult; use std::sync::Arc; use crossbeam_channel::{Receiver, Sender}; +pub use extract::REDIS_CLIENT; use grenad::{Merger, MergerBuilder}; use heed::types::Str; use heed::Database; diff --git a/milli/src/update/word_prefix_docids.rs b/milli/src/update/word_prefix_docids.rs index 925635f80..a5c25ad91 100644 --- a/milli/src/update/word_prefix_docids.rs +++ b/milli/src/update/word_prefix_docids.rs @@ -4,6 +4,7 @@ use grenad::CompressionType; use heed::types::{Bytes, Str}; use heed::Database; +use super::index_documents::REDIS_CLIENT; use crate::update::del_add::{deladd_serialize_add_side, DelAdd, KvWriterDelAdd}; use crate::update::index_documents::{ create_sorter, merge_deladd_cbo_roaring_bitmaps, @@ -52,6 +53,8 @@ impl<'t, 'i> WordPrefixDocids<'t, 'i> { common_prefix_fst_words: &[&[String]], del_prefix_fst_words: &HashSet>, ) -> Result<()> { + let mut conn = REDIS_CLIENT.get_connection().unwrap(); + // It is forbidden to keep a mutable reference into the database // and write into it at the same time, therefore we write into another file. let mut prefix_docids_sorter = create_sorter( @@ -71,7 +74,11 @@ impl<'t, 'i> WordPrefixDocids<'t, 'i> { current_prefixes = match current_prefixes.take() { Some(prefixes) if word.starts_with(prefixes[0].as_bytes()) => Some(prefixes), _otherwise => { - write_prefixes_in_sorter(&mut prefixes_cache, &mut prefix_docids_sorter)?; + write_prefixes_in_sorter( + &mut prefixes_cache, + &mut prefix_docids_sorter, + &mut conn, + )?; common_prefix_fst_words .iter() .find(|prefixes| word.starts_with(prefixes[0].as_bytes())) @@ -93,7 +100,7 @@ impl<'t, 'i> WordPrefixDocids<'t, 'i> { } } - write_prefixes_in_sorter(&mut prefixes_cache, &mut prefix_docids_sorter)?; + write_prefixes_in_sorter(&mut prefixes_cache, &mut prefix_docids_sorter, &mut conn)?; } // We fetch the docids associated to the newly added word prefix fst only. @@ -107,6 +114,7 @@ impl<'t, 'i> WordPrefixDocids<'t, 'i> { let mut writer = KvWriterDelAdd::new(&mut buffer); writer.insert(DelAdd::Addition, data)?; + redis::cmd("INCR").arg(prefix.as_bytes()).query::(&mut conn).unwrap(); prefix_docids_sorter.insert(prefix, writer.into_inner()?)?; } } @@ -140,10 +148,12 @@ impl<'t, 'i> WordPrefixDocids<'t, 'i> { fn write_prefixes_in_sorter( prefixes: &mut HashMap, Vec>>, sorter: &mut grenad::Sorter, + conn: &mut redis::Connection, ) -> Result<()> { for (key, data_slices) in prefixes.drain() { for data in data_slices { if valid_lmdb_key(&key) { + redis::cmd("INCR").arg(key.as_slice()).query::(conn).unwrap(); sorter.insert(&key, data)?; } } diff --git a/milli/src/update/words_prefix_integer_docids.rs b/milli/src/update/words_prefix_integer_docids.rs index 9b6aa21ae..7922887dd 100644 --- a/milli/src/update/words_prefix_integer_docids.rs +++ b/milli/src/update/words_prefix_integer_docids.rs @@ -13,7 +13,7 @@ use crate::update::del_add::{deladd_serialize_add_side, DelAdd, KvWriterDelAdd}; use crate::update::index_documents::{ create_sorter, merge_deladd_cbo_roaring_bitmaps, merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap, valid_lmdb_key, - write_sorter_into_database, CursorClonableMmap, MergeFn, + write_sorter_into_database, CursorClonableMmap, MergeFn, REDIS_CLIENT, }; use crate::{CboRoaringBitmapCodec, Result}; @@ -59,6 +59,8 @@ impl<'t, 'i> WordPrefixIntegerDocids<'t, 'i> { ) -> Result<()> { debug!("Computing and writing the word levels integers docids into LMDB on disk..."); + let mut conn = REDIS_CLIENT.get_connection().unwrap(); + let mut prefix_integer_docids_sorter = create_sorter( grenad::SortAlgorithm::Unstable, merge_deladd_cbo_roaring_bitmaps, @@ -85,6 +87,7 @@ impl<'t, 'i> WordPrefixIntegerDocids<'t, 'i> { write_prefixes_in_sorter( &mut prefixes_cache, &mut prefix_integer_docids_sorter, + &mut conn, )?; common_prefix_fst_words .iter() @@ -110,7 +113,11 @@ impl<'t, 'i> WordPrefixIntegerDocids<'t, 'i> { } } - write_prefixes_in_sorter(&mut prefixes_cache, &mut prefix_integer_docids_sorter)?; + write_prefixes_in_sorter( + &mut prefixes_cache, + &mut prefix_integer_docids_sorter, + &mut conn, + )?; } // We fetch the docids associated to the newly added word prefix fst only. @@ -135,6 +142,7 @@ impl<'t, 'i> WordPrefixIntegerDocids<'t, 'i> { buffer.clear(); let mut writer = KvWriterDelAdd::new(&mut buffer); writer.insert(DelAdd::Addition, data)?; + redis::cmd("INCR").arg(bytes.as_ref()).query::(&mut conn).unwrap(); prefix_integer_docids_sorter.insert(bytes, writer.into_inner()?)?; } } @@ -174,11 +182,13 @@ impl<'t, 'i> WordPrefixIntegerDocids<'t, 'i> { fn write_prefixes_in_sorter( prefixes: &mut HashMap, Vec>>, sorter: &mut grenad::Sorter, + conn: &mut redis::Connection, ) -> Result<()> { // TODO: Merge before insertion. for (key, data_slices) in prefixes.drain() { for data in data_slices { if valid_lmdb_key(&key) { + redis::cmd("INCR").arg(key.as_slice()).query::(conn).unwrap(); sorter.insert(&key, data)?; } }