diff --git a/crates/milli/src/update/new/words_prefix_docids.rs b/crates/milli/src/update/new/words_prefix_docids.rs index ffc0c5048..5454d815e 100644 --- a/crates/milli/src/update/new/words_prefix_docids.rs +++ b/crates/milli/src/update/new/words_prefix_docids.rs @@ -4,7 +4,7 @@ use std::io::{BufReader, BufWriter, Read, Seek, Write}; use hashbrown::HashMap; use heed::types::Bytes; -use heed::{BytesDecode, Database, RoTxn, RwTxn}; +use heed::{BytesDecode, Database, Error, RoTxn, RwTxn}; use rayon::iter::{IntoParallelIterator, ParallelIterator as _}; use roaring::MultiOps; use tempfile::tempfile; @@ -171,35 +171,54 @@ impl WordPrefixIntegerDocids { prefixes: &HashSet, ) -> Result<()> { // We fetch the docids associated to the newly added word prefix fst only. - // We use a HashMap to store the docids associated to each position, may be RAM consuming. - let mut integer_docids = HashMap::new(); - let mut key_buffer = Vec::new(); - for prefix in prefixes { - let prefix = prefix.as_bytes(); - for result in self.database.prefix_iter(wtxn, prefix)? { - let (key, data) = result?; - let (_word, pos) = - StrBEU16Codec::bytes_decode(key).map_err(heed::Error::Decoding)?; + // And collect the CboRoaringBitmaps pointers in an HashMap. + let frozen = FrozenPrefixIntegerBitmaps::from_prefixes(self.database, wtxn, prefixes)?; - match integer_docids.get_mut(&pos) { - Some(docids) => { - *docids |= &data; - } - None => { - integer_docids.insert(pos, data); - } - } + // We access this HashMap in parallel to compute the *union* of all + // of them and *serialize* them into files. There is one file by CPU. + let local_entries = ThreadLocal::with_capacity(rayon::current_num_threads()); + prefixes.into_par_iter().map(AsRef::as_ref).try_for_each(|prefix| { + let refcell = local_entries.get_or_try(|| { + tempfile().map(BufWriter::new).map(|f| RefCell::new((Vec::new(), f, Vec::new()))) + })?; + + let mut refmut = refcell.borrow_mut_or_yield(); + let (ref mut index, ref mut file, ref mut buffer) = *refmut; + + for (&pos, bitmaps_bytes) in frozen.bitmaps(prefix).unwrap() { + let output = bitmaps_bytes + .iter() + .map(|bytes| CboRoaringBitmapCodec::deserialize_from(bytes)) + .union()?; + + buffer.clear(); + CboRoaringBitmapCodec::serialize_into(&output, buffer); + index.push(PrefixIntegerEntry { prefix, pos, serialized_length: buffer.len() }); + file.write_all(buffer)?; } - for (pos, docids) in integer_docids.iter_mut() { - if !docids.is_empty() { - key_buffer.clear(); - key_buffer.extend_from_slice(prefix); - key_buffer.push(0); - key_buffer.extend_from_slice(&pos.to_be_bytes()); - self.prefix_database.put(wtxn, &key_buffer, docids)?; - } - docids.clear(); + Result::Ok(()) + })?; + + drop(frozen); + + // We iterate over all the collected and serialized bitmaps through + // the files and entries to eventually put them in the final database. + let mut key_buffer = Vec::new(); + for refcell in local_entries { + let (index, file, mut buffer) = refcell.into_inner(); + let mut file = file.into_inner().map_err(|e| e.into_error())?; + file.rewind()?; + let mut file = BufReader::new(file); + for PrefixIntegerEntry { prefix, pos, serialized_length } in index { + buffer.resize(serialized_length, 0); + file.read_exact(&mut buffer)?; + + key_buffer.clear(); + key_buffer.extend_from_slice(prefix.as_bytes()); + key_buffer.push(0); + key_buffer.extend_from_slice(&pos.to_be_bytes()); + self.prefix_database.remap_data_type::().put(wtxn, &key_buffer, &buffer)?; } } @@ -207,6 +226,48 @@ impl WordPrefixIntegerDocids { } } +/// Represents a prefix and the lenght the bitmap takes on disk. +struct PrefixIntegerEntry<'a> { + prefix: &'a str, + pos: u16, + serialized_length: usize, +} + +/// TODO doc +struct FrozenPrefixIntegerBitmaps<'a, 'rtxn> { + prefixes_bitmaps: HashMap<&'a str, HashMap>>, +} + +impl<'a, 'rtxn> FrozenPrefixIntegerBitmaps<'a, 'rtxn> { + #[tracing::instrument(level = "trace", skip_all, target = "indexing::prefix")] + pub fn from_prefixes( + database: Database, + rtxn: &'rtxn RoTxn, + prefixes: &'a HashSet, + ) -> heed::Result { + let database = database.remap_data_type::(); + + let mut prefixes_bitmaps = HashMap::new(); + for prefix in prefixes { + let mut positions = HashMap::new(); + for result in database.prefix_iter(rtxn, prefix.as_bytes())? { + let (key, bytes) = result?; + let (_word, pos) = StrBEU16Codec::bytes_decode(key).map_err(Error::Decoding)?; + positions.entry(pos).or_insert_with(Vec::new).push(bytes); + } + assert!(prefixes_bitmaps.insert(prefix.as_str(), positions).is_none()); + } + + Ok(Self { prefixes_bitmaps }) + } + + pub fn bitmaps(&self, key: &'a str) -> Option<&HashMap>> { + self.prefixes_bitmaps.get(&key) + } +} + +unsafe impl<'a, 'rtxn> Sync for FrozenPrefixIntegerBitmaps<'a, 'rtxn> {} + #[tracing::instrument(level = "trace", skip_all, target = "indexing::prefix")] fn delete_prefixes( wtxn: &mut RwTxn,