diff --git a/milli/src/update/index_documents/cache.rs b/milli/src/update/index_documents/cache.rs index 5d3664301..cd633670b 100644 --- a/milli/src/update/index_documents/cache.rs +++ b/milli/src/update/index_documents/cache.rs @@ -140,6 +140,10 @@ where self.sorter.insert(key, value_writer.into_inner().unwrap()) } + pub fn direct_insert(&mut self, key: &[u8], val: &[u8]) -> Result<(), grenad::Error> { + self.sorter.insert(key, val) + } + pub fn into_sorter(mut self) -> Result, grenad::Error> { let default_lru = LruCache::new(NonZeroUsize::MIN); for (key, deladd) in mem::replace(&mut self.cache, default_lru) { @@ -155,10 +159,6 @@ pub struct DelAddRoaringBitmap { } impl DelAddRoaringBitmap { - fn new_del_add(bitmap: RoaringBitmap) -> Self { - DelAddRoaringBitmap { del: Some(bitmap.clone()), add: Some(bitmap) } - } - fn new_del_add_u32(n: u32) -> Self { DelAddRoaringBitmap { del: Some(RoaringBitmap::from([n])), diff --git a/milli/src/update/index_documents/mod.rs b/milli/src/update/index_documents/mod.rs index 0a9a475bb..c6dfd86a7 100644 --- a/milli/src/update/index_documents/mod.rs +++ b/milli/src/update/index_documents/mod.rs @@ -1,4 +1,4 @@ -mod cache; +pub mod cache; mod enrich; mod extract; mod helpers; diff --git a/milli/src/update/word_prefix_docids.rs b/milli/src/update/word_prefix_docids.rs index a5c25ad91..4326a380c 100644 --- a/milli/src/update/word_prefix_docids.rs +++ b/milli/src/update/word_prefix_docids.rs @@ -1,11 +1,13 @@ use std::collections::{HashMap, HashSet}; +use std::num::NonZeroUsize; use grenad::CompressionType; -use heed::types::{Bytes, Str}; +use heed::types::Str; use heed::Database; +use super::index_documents::cache::SorterCacheDelAddCboRoaringBitmap; use super::index_documents::REDIS_CLIENT; -use crate::update::del_add::{deladd_serialize_add_side, DelAdd, KvWriterDelAdd}; +use crate::update::del_add::deladd_serialize_add_side; use crate::update::index_documents::{ create_sorter, merge_deladd_cbo_roaring_bitmaps, merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap, valid_lmdb_key, @@ -53,11 +55,9 @@ impl<'t, 'i> WordPrefixDocids<'t, 'i> { common_prefix_fst_words: &[&[String]], del_prefix_fst_words: &HashSet>, ) -> Result<()> { - let mut conn = REDIS_CLIENT.get_connection().unwrap(); - // It is forbidden to keep a mutable reference into the database // and write into it at the same time, therefore we write into another file. - let mut prefix_docids_sorter = create_sorter( + let prefix_docids_sorter = create_sorter( grenad::SortAlgorithm::Unstable, merge_deladd_cbo_roaring_bitmaps, self.chunk_compression_type, @@ -65,6 +65,11 @@ impl<'t, 'i> WordPrefixDocids<'t, 'i> { self.max_nb_chunks, self.max_memory, ); + let mut cached_prefix_docids_sorter = SorterCacheDelAddCboRoaringBitmap::<20, MergeFn>::new( + NonZeroUsize::new(200).unwrap(), + prefix_docids_sorter, + REDIS_CLIENT.get_connection().unwrap(), + ); if !common_prefix_fst_words.is_empty() { let mut current_prefixes: Option<&&[String]> = None; @@ -76,8 +81,7 @@ impl<'t, 'i> WordPrefixDocids<'t, 'i> { _otherwise => { write_prefixes_in_sorter( &mut prefixes_cache, - &mut prefix_docids_sorter, - &mut conn, + &mut cached_prefix_docids_sorter, )?; common_prefix_fst_words .iter() @@ -100,22 +104,17 @@ impl<'t, 'i> WordPrefixDocids<'t, 'i> { } } - write_prefixes_in_sorter(&mut prefixes_cache, &mut prefix_docids_sorter, &mut conn)?; + write_prefixes_in_sorter(&mut prefixes_cache, &mut cached_prefix_docids_sorter)?; } // We fetch the docids associated to the newly added word prefix fst only. - let db = self.word_docids.remap_data_type::(); - let mut buffer = Vec::new(); + let db = self.word_docids.lazily_decode_data(); for prefix in new_prefix_fst_words { let prefix = std::str::from_utf8(prefix.as_bytes())?; for result in db.prefix_iter(self.wtxn, prefix)? { - let (_word, data) = result?; - buffer.clear(); - let mut writer = KvWriterDelAdd::new(&mut buffer); - writer.insert(DelAdd::Addition, data)?; - - redis::cmd("INCR").arg(prefix.as_bytes()).query::(&mut conn).unwrap(); - prefix_docids_sorter.insert(prefix, writer.into_inner()?)?; + let (_word, lazy_data) = result?; + cached_prefix_docids_sorter + .insert_add(prefix.as_bytes(), lazy_data.decode().unwrap())?; } } @@ -133,7 +132,7 @@ impl<'t, 'i> WordPrefixDocids<'t, 'i> { // We finally write the word prefix docids into the LMDB database. write_sorter_into_database( - prefix_docids_sorter, + cached_prefix_docids_sorter.into_sorter()?, &self.word_prefix_docids, self.wtxn, database_is_empty, @@ -147,14 +146,12 @@ impl<'t, 'i> WordPrefixDocids<'t, 'i> { fn write_prefixes_in_sorter( prefixes: &mut HashMap, Vec>>, - sorter: &mut grenad::Sorter, - conn: &mut redis::Connection, + sorter: &mut SorterCacheDelAddCboRoaringBitmap<20, MergeFn>, ) -> Result<()> { for (key, data_slices) in prefixes.drain() { for data in data_slices { if valid_lmdb_key(&key) { - redis::cmd("INCR").arg(key.as_slice()).query::(conn).unwrap(); - sorter.insert(&key, data)?; + sorter.direct_insert(&key, &data)?; } } } diff --git a/milli/src/update/words_prefix_integer_docids.rs b/milli/src/update/words_prefix_integer_docids.rs index 7922887dd..8e6e957ce 100644 --- a/milli/src/update/words_prefix_integer_docids.rs +++ b/milli/src/update/words_prefix_integer_docids.rs @@ -1,4 +1,5 @@ use std::collections::{HashMap, HashSet}; +use std::num::NonZeroUsize; use std::str; use grenad::CompressionType; @@ -9,7 +10,8 @@ use tracing::debug; use crate::error::SerializationError; use crate::heed_codec::StrBEU16Codec; use crate::index::main_key::WORDS_PREFIXES_FST_KEY; -use crate::update::del_add::{deladd_serialize_add_side, DelAdd, KvWriterDelAdd}; +use crate::update::del_add::deladd_serialize_add_side; +use crate::update::index_documents::cache::SorterCacheDelAddCboRoaringBitmap; use crate::update::index_documents::{ create_sorter, merge_deladd_cbo_roaring_bitmaps, merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap, valid_lmdb_key, @@ -59,9 +61,7 @@ impl<'t, 'i> WordPrefixIntegerDocids<'t, 'i> { ) -> Result<()> { debug!("Computing and writing the word levels integers docids into LMDB on disk..."); - let mut conn = REDIS_CLIENT.get_connection().unwrap(); - - let mut prefix_integer_docids_sorter = create_sorter( + let prefix_integer_docids_sorter = create_sorter( grenad::SortAlgorithm::Unstable, merge_deladd_cbo_roaring_bitmaps, self.chunk_compression_type, @@ -69,6 +69,12 @@ impl<'t, 'i> WordPrefixIntegerDocids<'t, 'i> { self.max_nb_chunks, self.max_memory, ); + let mut cached_prefix_integer_docids_sorter = + SorterCacheDelAddCboRoaringBitmap::<20, MergeFn>::new( + NonZeroUsize::new(200).unwrap(), + prefix_integer_docids_sorter, + REDIS_CLIENT.get_connection().unwrap(), + ); if !common_prefix_fst_words.is_empty() { // We fetch all the new common prefixes between the previous and new prefix fst. @@ -86,8 +92,7 @@ impl<'t, 'i> WordPrefixIntegerDocids<'t, 'i> { _otherwise => { write_prefixes_in_sorter( &mut prefixes_cache, - &mut prefix_integer_docids_sorter, - &mut conn, + &mut cached_prefix_integer_docids_sorter, )?; common_prefix_fst_words .iter() @@ -115,14 +120,12 @@ impl<'t, 'i> WordPrefixIntegerDocids<'t, 'i> { write_prefixes_in_sorter( &mut prefixes_cache, - &mut prefix_integer_docids_sorter, - &mut conn, + &mut cached_prefix_integer_docids_sorter, )?; } // We fetch the docids associated to the newly added word prefix fst only. - let db = self.word_database.remap_data_type::(); - let mut buffer = Vec::new(); + let db = self.word_database.lazily_decode_data(); for prefix_bytes in new_prefix_fst_words { let prefix = str::from_utf8(prefix_bytes.as_bytes()).map_err(|_| { SerializationError::Decoding { db_name: Some(WORDS_PREFIXES_FST_KEY) } @@ -134,16 +137,12 @@ impl<'t, 'i> WordPrefixIntegerDocids<'t, 'i> { .prefix_iter(self.wtxn, prefix_bytes.as_bytes())? .remap_key_type::(); for result in iter { - let ((word, pos), data) = result?; + let ((word, pos), lazy_data) = result?; if word.starts_with(prefix) { let key = (prefix, pos); let bytes = StrBEU16Codec::bytes_encode(&key).unwrap(); - - buffer.clear(); - let mut writer = KvWriterDelAdd::new(&mut buffer); - writer.insert(DelAdd::Addition, data)?; - redis::cmd("INCR").arg(bytes.as_ref()).query::(&mut conn).unwrap(); - prefix_integer_docids_sorter.insert(bytes, writer.into_inner()?)?; + cached_prefix_integer_docids_sorter + .insert_add(&bytes, lazy_data.decode().unwrap())?; } } } @@ -167,7 +166,7 @@ impl<'t, 'i> WordPrefixIntegerDocids<'t, 'i> { // We finally write all the word prefix integer docids into the LMDB database. write_sorter_into_database( - prefix_integer_docids_sorter, + cached_prefix_integer_docids_sorter.into_sorter()?, &self.prefix_database, self.wtxn, database_is_empty, @@ -181,15 +180,13 @@ impl<'t, 'i> WordPrefixIntegerDocids<'t, 'i> { fn write_prefixes_in_sorter( prefixes: &mut HashMap, Vec>>, - sorter: &mut grenad::Sorter, - conn: &mut redis::Connection, + sorter: &mut SorterCacheDelAddCboRoaringBitmap<20, MergeFn>, ) -> Result<()> { // TODO: Merge before insertion. for (key, data_slices) in prefixes.drain() { for data in data_slices { if valid_lmdb_key(&key) { - redis::cmd("INCR").arg(key.as_slice()).query::(conn).unwrap(); - sorter.insert(&key, data)?; + sorter.direct_insert(&key, &data)?; } } }