From d28f18658e58ac8dab0f9bb7346e70fbf5ac4f49 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Tue, 18 Jan 2022 14:02:24 +0100 Subject: [PATCH 01/14] Retrieve the previous version of the words prefixes FST --- milli/src/update/index_documents/mod.rs | 5 ++++- milli/src/update/word_prefix_pair_proximity_docids.rs | 2 +- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/milli/src/update/index_documents/mod.rs b/milli/src/update/index_documents/mod.rs index 4fbb75d5f..cdea37d54 100644 --- a/milli/src/update/index_documents/mod.rs +++ b/milli/src/update/index_documents/mod.rs @@ -353,6 +353,9 @@ where total_databases: TOTAL_POSTING_DATABASE_COUNT, }); + let previous_words_prefixes_fst = + self.index.words_prefixes_fst(self.wtxn)?.map_data(|cow| cow.into_owned())?; + // Run the words prefixes update operation. let mut builder = WordsPrefixesFst::new(self.wtxn, self.index); if let Some(value) = self.config.words_prefix_threshold { @@ -389,7 +392,7 @@ where builder.chunk_compression_level = self.indexer_config.chunk_compression_level; builder.max_nb_chunks = self.indexer_config.max_nb_chunks; builder.max_memory = self.indexer_config.max_memory; - builder.execute()?; + builder.execute(&previous_words_prefixes_fst)?; databases_seen += 1; (self.progress)(UpdateIndexingStep::MergeDataIntoFinalDatabase { diff --git a/milli/src/update/word_prefix_pair_proximity_docids.rs b/milli/src/update/word_prefix_pair_proximity_docids.rs index 2dc00fb90..2788d5d35 100644 --- a/milli/src/update/word_prefix_pair_proximity_docids.rs +++ b/milli/src/update/word_prefix_pair_proximity_docids.rs @@ -61,7 +61,7 @@ impl<'t, 'u, 'i> WordPrefixPairProximityDocids<'t, 'u, 'i> { } #[logging_timer::time("WordPrefixPairProximityDocids::{}")] - pub fn execute(self) -> Result<()> { + pub fn execute>(self, old_prefix_fst: &fst::Set) -> Result<()> { debug!("Computing and writing the word prefix pair proximity docids into LMDB on disk..."); self.index.word_prefix_pair_proximity_docids.clear(self.wtxn)?; From 822f67e9ad1f17b8921c11aefd3d715170c96b70 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Tue, 18 Jan 2022 14:59:51 +0100 Subject: [PATCH 02/14] Bring the newly created word pair proximity docids --- milli/src/update/index_documents/mod.rs | 37 ++++++++++++++++--- .../word_prefix_pair_proximity_docids.rs | 9 ++++- 2 files changed, 38 insertions(+), 8 deletions(-) diff --git a/milli/src/update/index_documents/mod.rs b/milli/src/update/index_documents/mod.rs index cdea37d54..0ef05dba5 100644 --- a/milli/src/update/index_documents/mod.rs +++ b/milli/src/update/index_documents/mod.rs @@ -16,11 +16,12 @@ use typed_chunk::{write_typed_chunk_into_index, TypedChunk}; pub use self::helpers::{ create_sorter, create_writer, merge_cbo_roaring_bitmaps, merge_roaring_bitmaps, - sorter_into_lmdb_database, write_into_lmdb_database, writer_into_reader, MergeFn, + sorter_into_lmdb_database, write_into_lmdb_database, writer_into_reader, ClonableMmap, MergeFn, }; use self::helpers::{grenad_obkv_into_chunks, GrenadParameters}; pub use self::transform::{Transform, TransformOutput}; use crate::documents::DocumentBatchReader; +pub use crate::update::index_documents::helpers::CursorClonableMmap; use crate::update::{ self, Facets, IndexerConfig, UpdateIndexingStep, WordPrefixDocids, WordPrefixPairProximityDocids, WordPrefixPositionDocids, WordsPrefixesFst, @@ -282,6 +283,7 @@ where let index_documents_ids = self.index.documents_ids(self.wtxn)?; let index_is_empty = index_documents_ids.len() == 0; 
let mut final_documents_ids = RoaringBitmap::new(); + let mut word_pair_proximity_docids = Vec::new(); let mut databases_seen = 0; (self.progress)(UpdateIndexingStep::MergeDataIntoFinalDatabase { @@ -289,9 +291,26 @@ where total_databases: TOTAL_POSTING_DATABASE_COUNT, }); - for typed_chunk in lmdb_writer_rx { + for result in lmdb_writer_rx { + let typed_chunk = match result? { + TypedChunk::WordPairProximityDocids(chunk) => { + // We extract and mmap our chunk file to be able to get it for next processes. + let mut file = chunk.into_inner(); + let mmap = unsafe { memmap2::Mmap::map(&file)? }; + let cursor_mmap = CursorClonableMmap::new(ClonableMmap::from(mmap)); + let chunk = grenad::Reader::new(cursor_mmap)?; + word_pair_proximity_docids.push(chunk); + + // We reconstruct our typed-chunk back. + file.rewind()?; + let chunk = grenad::Reader::new(file)?; + TypedChunk::WordPairProximityDocids(chunk) + } + otherwise => otherwise, + }; + let (docids, is_merged_database) = - write_typed_chunk_into_index(typed_chunk?, &self.index, self.wtxn, index_is_empty)?; + write_typed_chunk_into_index(typed_chunk, &self.index, self.wtxn, index_is_empty)?; if !docids.is_empty() { final_documents_ids |= docids; let documents_seen_count = final_documents_ids.len(); @@ -325,13 +344,19 @@ where let all_documents_ids = index_documents_ids | new_documents_ids | replaced_documents_ids; self.index.put_documents_ids(self.wtxn, &all_documents_ids)?; - self.execute_prefix_databases()?; + self.execute_prefix_databases(word_pair_proximity_docids)?; Ok(all_documents_ids.len()) } #[logging_timer::time("IndexDocuments::{}")] - pub fn execute_prefix_databases(self) -> Result<()> { + pub fn execute_prefix_databases( + self, + word_pair_proximity_docids: Vec>, + ) -> Result<()> + where + F: Fn(UpdateIndexingStep) + Sync, + { // Merged databases are already been indexed, we start from this count; let mut databases_seen = MERGED_DATABASE_COUNT; @@ -392,7 +417,7 @@ where builder.chunk_compression_level = self.indexer_config.chunk_compression_level; builder.max_nb_chunks = self.indexer_config.max_nb_chunks; builder.max_memory = self.indexer_config.max_memory; - builder.execute(&previous_words_prefixes_fst)?; + builder.execute(word_pair_proximity_docids, &previous_words_prefixes_fst)?; databases_seen += 1; (self.progress)(UpdateIndexingStep::MergeDataIntoFinalDatabase { diff --git a/milli/src/update/word_prefix_pair_proximity_docids.rs b/milli/src/update/word_prefix_pair_proximity_docids.rs index 2788d5d35..6a49a0a63 100644 --- a/milli/src/update/word_prefix_pair_proximity_docids.rs +++ b/milli/src/update/word_prefix_pair_proximity_docids.rs @@ -7,7 +7,8 @@ use log::debug; use slice_group_by::GroupBy; use crate::update::index_documents::{ - create_sorter, merge_cbo_roaring_bitmaps, sorter_into_lmdb_database, MergeFn, WriteMethod, + create_sorter, merge_cbo_roaring_bitmaps, sorter_into_lmdb_database, CursorClonableMmap, + MergeFn, WriteMethod, }; use crate::{Index, Result}; @@ -61,7 +62,11 @@ impl<'t, 'u, 'i> WordPrefixPairProximityDocids<'t, 'u, 'i> { } #[logging_timer::time("WordPrefixPairProximityDocids::{}")] - pub fn execute>(self, old_prefix_fst: &fst::Set) -> Result<()> { + pub fn execute>( + self, + new_word_pair_proximity_docids: Vec>, + old_prefix_fst: &fst::Set, + ) -> Result<()> { debug!("Computing and writing the word prefix pair proximity docids into LMDB on disk..."); self.index.word_prefix_pair_proximity_docids.clear(self.wtxn)?; From c90fa95f93103c11b56d661b851b50a524d29f09 Mon Sep 17 00:00:00 2001 From: 
=?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Tue, 18 Jan 2022 15:23:18 +0100 Subject: [PATCH 03/14] Only compute the word prefix pairs on the created word pair proximities --- .../word_prefix_pair_proximity_docids.rs | 63 ++++++++++++++----- 1 file changed, 47 insertions(+), 16 deletions(-) diff --git a/milli/src/update/word_prefix_pair_proximity_docids.rs b/milli/src/update/word_prefix_pair_proximity_docids.rs index 6a49a0a63..d35f39c10 100644 --- a/milli/src/update/word_prefix_pair_proximity_docids.rs +++ b/milli/src/update/word_prefix_pair_proximity_docids.rs @@ -1,8 +1,8 @@ -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; -use fst::IntoStreamer; -use grenad::CompressionType; -use heed::types::ByteSlice; +use fst::{IntoStreamer, Streamer}; +use grenad::{CompressionType, MergerBuilder}; +use heed::BytesDecode; use log::debug; use slice_group_by::GroupBy; @@ -10,7 +10,7 @@ use crate::update::index_documents::{ create_sorter, merge_cbo_roaring_bitmaps, sorter_into_lmdb_database, CursorClonableMmap, MergeFn, WriteMethod, }; -use crate::{Index, Result}; +use crate::{Index, Result, StrStrU8Codec}; pub struct WordPrefixPairProximityDocids<'t, 'u, 'i> { wtxn: &'t mut heed::RwTxn<'i, 'u>, @@ -69,9 +69,12 @@ impl<'t, 'u, 'i> WordPrefixPairProximityDocids<'t, 'u, 'i> { ) -> Result<()> { debug!("Computing and writing the word prefix pair proximity docids into LMDB on disk..."); - self.index.word_prefix_pair_proximity_docids.clear(self.wtxn)?; + // We retrieve and merge the created word pair proximities docids entries + // for the newly added documents. + let mut wppd_merger = MergerBuilder::new(merge_cbo_roaring_bitmaps); + wppd_merger.extend(new_word_pair_proximity_docids); + let mut wppd_iter = wppd_merger.build().into_merger_iter()?; - // Here we create a sorter akin to the previous one. let mut word_prefix_pair_proximity_docids_sorter = create_sorter( merge_cbo_roaring_bitmaps, self.chunk_compression_type, @@ -85,13 +88,14 @@ impl<'t, 'u, 'i> WordPrefixPairProximityDocids<'t, 'u, 'i> { let prefix_fst_keys: Vec<_> = prefix_fst_keys.as_slice().linear_group_by_key(|x| x.chars().nth(0).unwrap()).collect(); - let mut db = - self.index.word_pair_proximity_docids.remap_data_type::().iter(self.wtxn)?; + // We compute the set of prefixes that are no more part of the prefix fst. + let suppr_pw = stream_into_hashset(old_prefix_fst.op().add(&prefix_fst).difference()); let mut buffer = Vec::new(); let mut current_prefixes: Option<&&[String]> = None; let mut prefixes_cache = HashMap::new(); - while let Some(((w1, w2, prox), data)) = db.next().transpose()? { + while let Some((key, data)) = wppd_iter.next()? { + let (w1, w2, prox) = StrStrU8Codec::bytes_decode(key).ok_or(heed::Error::Decoding)?; if prox > self.max_proximity { continue; } @@ -118,9 +122,9 @@ impl<'t, 'u, 'i> WordPrefixPairProximityDocids<'t, 'u, 'i> { buffer.push(prox); match prefixes_cache.get_mut(&buffer) { - Some(value) => value.push(data), + Some(value) => value.push(data.to_owned()), None => { - prefixes_cache.insert(buffer.clone(), vec![data]); + prefixes_cache.insert(buffer.clone(), vec![data.to_owned()]); } } } @@ -134,15 +138,28 @@ impl<'t, 'u, 'i> WordPrefixPairProximityDocids<'t, 'u, 'i> { )?; drop(prefix_fst); - drop(db); - // We finally write the word prefix pair proximity docids into the LMDB database. + // All of the word prefix pairs in the database that have a w2 + // that is contained in the `suppr_pw` set must be removed as well. 
+ let mut iter = + self.index.word_prefix_pair_proximity_docids.iter_mut(self.wtxn)?.lazily_decode_data(); + while let Some(((_, w2, _), _)) = iter.next().transpose()? { + if suppr_pw.contains(w2.as_bytes()) { + // Delete this entry as the w2 prefix is no more in the words prefix fst. + unsafe { iter.del_current()? }; + } + } + + drop(iter); + + // We finally write and merge the new word prefix pair proximity docids + // in the LMDB database. sorter_into_lmdb_database( self.wtxn, *self.index.word_prefix_pair_proximity_docids.as_polymorph(), word_prefix_pair_proximity_docids_sorter, merge_cbo_roaring_bitmaps, - WriteMethod::Append, + WriteMethod::GetMergePut, )?; Ok(()) @@ -150,7 +167,7 @@ impl<'t, 'u, 'i> WordPrefixPairProximityDocids<'t, 'u, 'i> { } fn write_prefixes_in_sorter( - prefixes: &mut HashMap, Vec<&[u8]>>, + prefixes: &mut HashMap, Vec>>, sorter: &mut grenad::Sorter, ) -> Result<()> { for (key, data_slices) in prefixes.drain() { @@ -161,3 +178,17 @@ fn write_prefixes_in_sorter( Ok(()) } + +/// Converts an fst Stream into an HashSet. +fn stream_into_hashset<'f, I, S>(stream: I) -> HashSet> +where + I: for<'a> IntoStreamer<'a, Into = S, Item = &'a [u8]>, + S: 'f + for<'a> Streamer<'a, Item = &'a [u8]>, +{ + let mut hashset = HashSet::new(); + let mut stream = stream.into_stream(); + while let Some(value) = stream.next() { + hashset.insert(value.to_owned()); + } + hashset +} From 5404bc02dddaf790b23d71cb1834b53a461c06fa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Wed, 19 Jan 2022 14:30:03 +0100 Subject: [PATCH 04/14] Move the fst_stream_into_hashset method in the helper methods --- .../src/update/index_documents/helpers/mod.rs | 16 +++++++++++++ milli/src/update/index_documents/mod.rs | 5 ++-- .../word_prefix_pair_proximity_docids.rs | 24 ++++--------------- 3 files changed, 24 insertions(+), 21 deletions(-) diff --git a/milli/src/update/index_documents/helpers/mod.rs b/milli/src/update/index_documents/helpers/mod.rs index 128288982..4086bfb7f 100644 --- a/milli/src/update/index_documents/helpers/mod.rs +++ b/milli/src/update/index_documents/helpers/mod.rs @@ -2,9 +2,11 @@ mod clonable_mmap; mod grenad_helpers; mod merge_functions; +use std::collections::HashSet; use std::convert::{TryFrom, TryInto}; pub use clonable_mmap::{ClonableMmap, CursorClonableMmap}; +use fst::{IntoStreamer, Streamer}; pub use grenad_helpers::{ create_sorter, create_writer, grenad_obkv_into_chunks, into_clonable_grenad, merge_readers, sorter_into_lmdb_database, sorter_into_reader, write_into_lmdb_database, writer_into_reader, @@ -43,3 +45,17 @@ where pub fn read_u32_ne_bytes(bytes: &[u8]) -> impl Iterator + '_ { bytes.chunks_exact(4).flat_map(TryInto::try_into).map(u32::from_ne_bytes) } + +/// Converts an fst Stream into an HashSet. 
+pub fn fst_stream_into_hashset<'f, I, S>(stream: I) -> HashSet> +where + I: for<'a> IntoStreamer<'a, Into = S, Item = &'a [u8]>, + S: 'f + for<'a> Streamer<'a, Item = &'a [u8]>, +{ + let mut hashset = HashSet::new(); + let mut stream = stream.into_stream(); + while let Some(value) = stream.next() { + hashset.insert(value.to_owned()); + } + hashset +} diff --git a/milli/src/update/index_documents/mod.rs b/milli/src/update/index_documents/mod.rs index 0ef05dba5..ae8e28b33 100644 --- a/milli/src/update/index_documents/mod.rs +++ b/milli/src/update/index_documents/mod.rs @@ -15,8 +15,9 @@ use serde::{Deserialize, Serialize}; use typed_chunk::{write_typed_chunk_into_index, TypedChunk}; pub use self::helpers::{ - create_sorter, create_writer, merge_cbo_roaring_bitmaps, merge_roaring_bitmaps, - sorter_into_lmdb_database, write_into_lmdb_database, writer_into_reader, ClonableMmap, MergeFn, + create_sorter, create_writer, fst_stream_into_hashset, merge_cbo_roaring_bitmaps, + merge_roaring_bitmaps, sorter_into_lmdb_database, write_into_lmdb_database, writer_into_reader, + ClonableMmap, MergeFn, }; use self::helpers::{grenad_obkv_into_chunks, GrenadParameters}; pub use self::transform::{Transform, TransformOutput}; diff --git a/milli/src/update/word_prefix_pair_proximity_docids.rs b/milli/src/update/word_prefix_pair_proximity_docids.rs index d35f39c10..50b7a978b 100644 --- a/milli/src/update/word_prefix_pair_proximity_docids.rs +++ b/milli/src/update/word_prefix_pair_proximity_docids.rs @@ -1,14 +1,14 @@ -use std::collections::{HashMap, HashSet}; +use std::collections::HashMap; -use fst::{IntoStreamer, Streamer}; +use fst::IntoStreamer; use grenad::{CompressionType, MergerBuilder}; use heed::BytesDecode; use log::debug; use slice_group_by::GroupBy; use crate::update::index_documents::{ - create_sorter, merge_cbo_roaring_bitmaps, sorter_into_lmdb_database, CursorClonableMmap, - MergeFn, WriteMethod, + create_sorter, fst_stream_into_hashset, merge_cbo_roaring_bitmaps, sorter_into_lmdb_database, + CursorClonableMmap, MergeFn, WriteMethod, }; use crate::{Index, Result, StrStrU8Codec}; @@ -89,7 +89,7 @@ impl<'t, 'u, 'i> WordPrefixPairProximityDocids<'t, 'u, 'i> { prefix_fst_keys.as_slice().linear_group_by_key(|x| x.chars().nth(0).unwrap()).collect(); // We compute the set of prefixes that are no more part of the prefix fst. - let suppr_pw = stream_into_hashset(old_prefix_fst.op().add(&prefix_fst).difference()); + let suppr_pw = fst_stream_into_hashset(old_prefix_fst.op().add(&prefix_fst).difference()); let mut buffer = Vec::new(); let mut current_prefixes: Option<&&[String]> = None; @@ -178,17 +178,3 @@ fn write_prefixes_in_sorter( Ok(()) } - -/// Converts an fst Stream into an HashSet. 
-fn stream_into_hashset<'f, I, S>(stream: I) -> HashSet> -where - I: for<'a> IntoStreamer<'a, Into = S, Item = &'a [u8]>, - S: 'f + for<'a> Streamer<'a, Item = &'a [u8]>, -{ - let mut hashset = HashSet::new(); - let mut stream = stream.into_stream(); - while let Some(value) = stream.next() { - hashset.insert(value.to_owned()); - } - hashset -} From 28692f65be2166ec12617fd87306bf896fb07c1a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Wed, 19 Jan 2022 15:02:04 +0100 Subject: [PATCH 05/14] Rework the WordPrefixDocids update to compute a subset of the database --- milli/src/update/index_documents/mod.rs | 19 +++++- milli/src/update/word_prefix_docids.rs | 87 +++++++++++++++++++------ 2 files changed, 85 insertions(+), 21 deletions(-) diff --git a/milli/src/update/index_documents/mod.rs b/milli/src/update/index_documents/mod.rs index ae8e28b33..77b761e6e 100644 --- a/milli/src/update/index_documents/mod.rs +++ b/milli/src/update/index_documents/mod.rs @@ -285,6 +285,7 @@ where let index_is_empty = index_documents_ids.len() == 0; let mut final_documents_ids = RoaringBitmap::new(); let mut word_pair_proximity_docids = Vec::new(); + let mut word_docids = Vec::new(); let mut databases_seen = 0; (self.progress)(UpdateIndexingStep::MergeDataIntoFinalDatabase { @@ -294,6 +295,19 @@ where for result in lmdb_writer_rx { let typed_chunk = match result? { + TypedChunk::WordDocids(chunk) => { + // We extract and mmap our chunk file to be able to get it for next processes. + let mut file = chunk.into_inner(); + let mmap = unsafe { memmap2::Mmap::map(&file)? }; + let cursor_mmap = CursorClonableMmap::new(ClonableMmap::from(mmap)); + let chunk = grenad::Reader::new(cursor_mmap)?; + word_docids.push(chunk); + + // We reconstruct our typed-chunk back. + file.rewind()?; + let chunk = grenad::Reader::new(file)?; + TypedChunk::WordDocids(chunk) + } TypedChunk::WordPairProximityDocids(chunk) => { // We extract and mmap our chunk file to be able to get it for next processes. 
let mut file = chunk.into_inner(); @@ -345,7 +359,7 @@ where let all_documents_ids = index_documents_ids | new_documents_ids | replaced_documents_ids; self.index.put_documents_ids(self.wtxn, &all_documents_ids)?; - self.execute_prefix_databases(word_pair_proximity_docids)?; + self.execute_prefix_databases(word_docids, word_pair_proximity_docids)?; Ok(all_documents_ids.len()) } @@ -353,6 +367,7 @@ where #[logging_timer::time("IndexDocuments::{}")] pub fn execute_prefix_databases( self, + word_docids: Vec>, word_pair_proximity_docids: Vec>, ) -> Result<()> where @@ -404,7 +419,7 @@ where builder.chunk_compression_level = self.indexer_config.chunk_compression_level; builder.max_nb_chunks = self.indexer_config.max_nb_chunks; builder.max_memory = self.indexer_config.max_memory; - builder.execute()?; + builder.execute(word_docids, &previous_words_prefixes_fst)?; databases_seen += 1; (self.progress)(UpdateIndexingStep::MergeDataIntoFinalDatabase { diff --git a/milli/src/update/word_prefix_docids.rs b/milli/src/update/word_prefix_docids.rs index 30dabf1ae..0703707f0 100644 --- a/milli/src/update/word_prefix_docids.rs +++ b/milli/src/update/word_prefix_docids.rs @@ -1,11 +1,12 @@ -use std::str; +use std::collections::HashMap; -use fst::Streamer; -use grenad::CompressionType; -use heed::types::ByteSlice; +use fst::IntoStreamer; +use grenad::{CompressionType, MergerBuilder}; +use slice_group_by::GroupBy; use crate::update::index_documents::{ - create_sorter, merge_roaring_bitmaps, sorter_into_lmdb_database, WriteMethod, + create_sorter, fst_stream_into_hashset, merge_roaring_bitmaps, sorter_into_lmdb_database, + CursorClonableMmap, MergeFn, WriteMethod, }; use crate::{Index, Result}; @@ -34,11 +35,18 @@ impl<'t, 'u, 'i> WordPrefixDocids<'t, 'u, 'i> { } #[logging_timer::time("WordPrefixDocids::{}")] - pub fn execute(self) -> Result<()> { - // Clear the word prefix docids database. - self.index.word_prefix_docids.clear(self.wtxn)?; - + pub fn execute>( + self, + new_word_docids: Vec>, + old_prefix_fst: &fst::Set, + ) -> Result<()> { let prefix_fst = self.index.words_prefixes_fst(self.wtxn)?; + let prefix_fst_keys = prefix_fst.into_stream().into_strs()?; + let prefix_fst_keys: Vec<_> = + prefix_fst_keys.as_slice().linear_group_by_key(|x| x.chars().nth(0).unwrap()).collect(); + + // We compute the set of prefixes that are no more part of the prefix fst. + let suppr_pw = fst_stream_into_hashset(old_prefix_fst.op().add(&prefix_fst).difference()); // It is forbidden to keep a mutable reference into the database // and write into it at the same time, therefore we write into another file. @@ -50,18 +58,46 @@ impl<'t, 'u, 'i> WordPrefixDocids<'t, 'u, 'i> { self.max_memory, ); - // We iterate over all the prefixes and retrieve the corresponding docids. - let mut prefix_stream = prefix_fst.stream(); - while let Some(bytes) = prefix_stream.next() { - let prefix = str::from_utf8(bytes)?; - let db = self.index.word_docids.remap_data_type::(); - for result in db.prefix_iter(self.wtxn, prefix)? { - let (_word, data) = result?; - prefix_docids_sorter.insert(prefix, data)?; + let mut word_docids_merger = MergerBuilder::new(merge_roaring_bitmaps); + word_docids_merger.extend(new_word_docids); + let mut word_docids_iter = word_docids_merger.build().into_merger_iter()?; + + let mut current_prefixes: Option<&&[String]> = None; + let mut prefixes_cache = HashMap::new(); + while let Some((word, data)) = word_docids_iter.next()? 
{ + current_prefixes = match current_prefixes.take() { + Some(prefixes) if word.starts_with(&prefixes[0].as_bytes()) => Some(prefixes), + _otherwise => { + write_prefixes_in_sorter(&mut prefixes_cache, &mut prefix_docids_sorter)?; + prefix_fst_keys + .iter() + .find(|prefixes| word.starts_with(&prefixes[0].as_bytes())) + } + }; + + if let Some(prefixes) = current_prefixes { + for prefix in prefixes.iter() { + if word.starts_with(prefix.as_bytes()) { + match prefixes_cache.get_mut(prefix.as_bytes()) { + Some(value) => value.push(data.to_owned()), + None => { + prefixes_cache.insert(prefix.clone().into(), vec![data.to_owned()]); + } + } + } + } } } - drop(prefix_fst); + // We remove all the entries that are no more required in this word prefix docids database. + let mut iter = self.index.word_prefix_docids.iter_mut(self.wtxn)?.lazily_decode_data(); + while let Some((prefix, _)) = iter.next().transpose()? { + if suppr_pw.contains(prefix.as_bytes()) { + unsafe { iter.del_current()? }; + } + } + + drop(iter); // We finally write the word prefix docids into the LMDB database. sorter_into_lmdb_database( @@ -69,9 +105,22 @@ impl<'t, 'u, 'i> WordPrefixDocids<'t, 'u, 'i> { *self.index.word_prefix_docids.as_polymorph(), prefix_docids_sorter, merge_roaring_bitmaps, - WriteMethod::Append, + WriteMethod::GetMergePut, )?; Ok(()) } } + +fn write_prefixes_in_sorter( + prefixes: &mut HashMap, Vec>>, + sorter: &mut grenad::Sorter, +) -> Result<()> { + for (key, data_slices) in prefixes.drain() { + for data in data_slices { + sorter.insert(&key, data)?; + } + } + + Ok(()) +} From 2ec854210506c615b4986f53029bae087f8e4a54 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Tue, 25 Jan 2022 15:48:48 +0100 Subject: [PATCH 06/14] Rework the WordPrefixDocids update to compute a subset of the database --- milli/src/update/word_prefix_docids.rs | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/milli/src/update/word_prefix_docids.rs b/milli/src/update/word_prefix_docids.rs index 0703707f0..105083d87 100644 --- a/milli/src/update/word_prefix_docids.rs +++ b/milli/src/update/word_prefix_docids.rs @@ -89,6 +89,21 @@ impl<'t, 'u, 'i> WordPrefixDocids<'t, 'u, 'i> { } } + write_prefixes_in_sorter(&mut prefixes_cache, &mut prefix_docids_sorter)?; + + // We fetch the docids associated to the newly added word prefix fst only. + let db = self.index.word_docids.remap_data_type::(); + let mut new_prefixes_stream = prefix_fst.op().add(old_prefix_fst).difference(); + while let Some(bytes) = new_prefixes_stream.next() { + let prefix = std::str::from_utf8(bytes)?; + for result in db.prefix_iter(self.wtxn, prefix)? { + let (_word, data) = result?; + prefix_docids_sorter.insert(prefix, data)?; + } + } + + drop(new_prefixes_stream); + // We remove all the entries that are no more required in this word prefix docids database. let mut iter = self.index.word_prefix_docids.iter_mut(self.wtxn)?.lazily_decode_data(); while let Some((prefix, _)) = iter.next().transpose()? 
{ From d59e5593179accbfe71dda09b482bbed69c8cb1e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Thu, 20 Jan 2022 17:55:52 +0100 Subject: [PATCH 07/14] Fix the computation of the newly added and common prefix words --- .../src/update/index_documents/helpers/mod.rs | 17 +++++++++++++++- milli/src/update/index_documents/mod.rs | 6 +++--- milli/src/update/word_prefix_docids.rs | 20 ++++++++++++------- 3 files changed, 32 insertions(+), 11 deletions(-) diff --git a/milli/src/update/index_documents/helpers/mod.rs b/milli/src/update/index_documents/helpers/mod.rs index 4086bfb7f..bbb2b9b95 100644 --- a/milli/src/update/index_documents/helpers/mod.rs +++ b/milli/src/update/index_documents/helpers/mod.rs @@ -46,7 +46,7 @@ pub fn read_u32_ne_bytes(bytes: &[u8]) -> impl Iterator + '_ { bytes.chunks_exact(4).flat_map(TryInto::try_into).map(u32::from_ne_bytes) } -/// Converts an fst Stream into an HashSet. +/// Converts an fst Stream into an HashSet of Strings. pub fn fst_stream_into_hashset<'f, I, S>(stream: I) -> HashSet> where I: for<'a> IntoStreamer<'a, Into = S, Item = &'a [u8]>, @@ -59,3 +59,18 @@ where } hashset } + +// Converts an fst Stream into a Vec of Strings. +pub fn fst_stream_into_vec<'f, I, S>(stream: I) -> Vec +where + I: for<'a> IntoStreamer<'a, Into = S, Item = &'a [u8]>, + S: 'f + for<'a> Streamer<'a, Item = &'a [u8]>, +{ + let mut strings = Vec::new(); + let mut stream = stream.into_stream(); + while let Some(word) = stream.next() { + let s = std::str::from_utf8(word).unwrap(); + strings.push(s.to_owned()); + } + strings +} diff --git a/milli/src/update/index_documents/mod.rs b/milli/src/update/index_documents/mod.rs index 77b761e6e..ad3f73d0d 100644 --- a/milli/src/update/index_documents/mod.rs +++ b/milli/src/update/index_documents/mod.rs @@ -15,9 +15,9 @@ use serde::{Deserialize, Serialize}; use typed_chunk::{write_typed_chunk_into_index, TypedChunk}; pub use self::helpers::{ - create_sorter, create_writer, fst_stream_into_hashset, merge_cbo_roaring_bitmaps, - merge_roaring_bitmaps, sorter_into_lmdb_database, write_into_lmdb_database, writer_into_reader, - ClonableMmap, MergeFn, + create_sorter, create_writer, fst_stream_into_hashset, fst_stream_into_vec, + merge_cbo_roaring_bitmaps, merge_roaring_bitmaps, sorter_into_lmdb_database, + write_into_lmdb_database, writer_into_reader, ClonableMmap, MergeFn, }; use self::helpers::{grenad_obkv_into_chunks, GrenadParameters}; pub use self::transform::{Transform, TransformOutput}; diff --git a/milli/src/update/word_prefix_docids.rs b/milli/src/update/word_prefix_docids.rs index 105083d87..1e2996c9b 100644 --- a/milli/src/update/word_prefix_docids.rs +++ b/milli/src/update/word_prefix_docids.rs @@ -1,12 +1,13 @@ use std::collections::HashMap; -use fst::IntoStreamer; +use fst::Streamer; use grenad::{CompressionType, MergerBuilder}; +use heed::types::ByteSlice; use slice_group_by::GroupBy; use crate::update::index_documents::{ - create_sorter, fst_stream_into_hashset, merge_roaring_bitmaps, sorter_into_lmdb_database, - CursorClonableMmap, MergeFn, WriteMethod, + create_sorter, fst_stream_into_hashset, fst_stream_into_vec, merge_roaring_bitmaps, + sorter_into_lmdb_database, CursorClonableMmap, MergeFn, WriteMethod, }; use crate::{Index, Result}; @@ -41,9 +42,14 @@ impl<'t, 'u, 'i> WordPrefixDocids<'t, 'u, 'i> { old_prefix_fst: &fst::Set, ) -> Result<()> { let prefix_fst = self.index.words_prefixes_fst(self.wtxn)?; - let prefix_fst_keys = prefix_fst.into_stream().into_strs()?; - let prefix_fst_keys: Vec<_> = - 
prefix_fst_keys.as_slice().linear_group_by_key(|x| x.chars().nth(0).unwrap()).collect(); + + // We retrieve the common words between the previous and new prefix word fst. + let common_prefix_fst_keys = + fst_stream_into_vec(old_prefix_fst.op().add(&prefix_fst).intersection()); + let common_prefix_fst_keys: Vec<_> = common_prefix_fst_keys + .as_slice() + .linear_group_by_key(|x| x.chars().nth(0).unwrap()) + .collect(); // We compute the set of prefixes that are no more part of the prefix fst. let suppr_pw = fst_stream_into_hashset(old_prefix_fst.op().add(&prefix_fst).difference()); @@ -69,7 +75,7 @@ impl<'t, 'u, 'i> WordPrefixDocids<'t, 'u, 'i> { Some(prefixes) if word.starts_with(&prefixes[0].as_bytes()) => Some(prefixes), _otherwise => { write_prefixes_in_sorter(&mut prefixes_cache, &mut prefix_docids_sorter)?; - prefix_fst_keys + common_prefix_fst_keys .iter() .find(|prefixes| word.starts_with(&prefixes[0].as_bytes())) } From e760e027376e4b31bdb287ced1239dff25f9f65a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Tue, 25 Jan 2022 11:18:20 +0100 Subject: [PATCH 08/14] Fix the computation of the newly added and common prefix pair proximity words --- .../word_prefix_pair_proximity_docids.rs | 97 ++++++++++++++++--- 1 file changed, 83 insertions(+), 14 deletions(-) diff --git a/milli/src/update/word_prefix_pair_proximity_docids.rs b/milli/src/update/word_prefix_pair_proximity_docids.rs index 50b7a978b..dcc5db614 100644 --- a/milli/src/update/word_prefix_pair_proximity_docids.rs +++ b/milli/src/update/word_prefix_pair_proximity_docids.rs @@ -1,14 +1,14 @@ use std::collections::HashMap; -use fst::IntoStreamer; use grenad::{CompressionType, MergerBuilder}; +use heed::types::ByteSlice; use heed::BytesDecode; use log::debug; use slice_group_by::GroupBy; use crate::update::index_documents::{ - create_sorter, fst_stream_into_hashset, merge_cbo_roaring_bitmaps, sorter_into_lmdb_database, - CursorClonableMmap, MergeFn, WriteMethod, + create_sorter, fst_stream_into_hashset, fst_stream_into_vec, merge_cbo_roaring_bitmaps, + sorter_into_lmdb_database, CursorClonableMmap, MergeFn, WriteMethod, }; use crate::{Index, Result, StrStrU8Codec}; @@ -75,6 +75,27 @@ impl<'t, 'u, 'i> WordPrefixPairProximityDocids<'t, 'u, 'i> { wppd_merger.extend(new_word_pair_proximity_docids); let mut wppd_iter = wppd_merger.build().into_merger_iter()?; + let prefix_fst = self.index.words_prefixes_fst(self.wtxn)?; + + // We retrieve the common words between the previous and new prefix word fst. + let common_prefix_fst_keys = + fst_stream_into_vec(old_prefix_fst.op().add(&prefix_fst).intersection()); + let common_prefix_fst_keys: Vec<_> = common_prefix_fst_keys + .as_slice() + .linear_group_by_key(|x| x.chars().nth(0).unwrap()) + .collect(); + + // We retrieve the newly added words between the previous and new prefix word fst. + let new_prefix_fst_keys = + fst_stream_into_vec(prefix_fst.op().add(old_prefix_fst).difference()); + let new_prefix_fst_keys: Vec<_> = new_prefix_fst_keys + .as_slice() + .linear_group_by_key(|x| x.chars().nth(0).unwrap()) + .collect(); + + // We compute the set of prefixes that are no more part of the prefix fst. 
+ let suppr_pw = fst_stream_into_hashset(old_prefix_fst.op().add(&prefix_fst).difference()); + let mut word_prefix_pair_proximity_docids_sorter = create_sorter( merge_cbo_roaring_bitmaps, self.chunk_compression_type, @@ -83,14 +104,8 @@ impl<'t, 'u, 'i> WordPrefixPairProximityDocids<'t, 'u, 'i> { self.max_memory, ); - let prefix_fst = self.index.words_prefixes_fst(self.wtxn)?; - let prefix_fst_keys = prefix_fst.into_stream().into_strs()?; - let prefix_fst_keys: Vec<_> = - prefix_fst_keys.as_slice().linear_group_by_key(|x| x.chars().nth(0).unwrap()).collect(); - - // We compute the set of prefixes that are no more part of the prefix fst. - let suppr_pw = fst_stream_into_hashset(old_prefix_fst.op().add(&prefix_fst).difference()); - + // We compute the prefix docids associated with the common prefixes between + // the old and new word prefix fst. let mut buffer = Vec::new(); let mut current_prefixes: Option<&&[String]> = None; let mut prefixes_cache = HashMap::new(); @@ -107,7 +122,57 @@ impl<'t, 'u, 'i> WordPrefixPairProximityDocids<'t, 'u, 'i> { &mut prefixes_cache, &mut word_prefix_pair_proximity_docids_sorter, )?; - prefix_fst_keys.iter().find(|prefixes| w2.starts_with(&prefixes[0])) + common_prefix_fst_keys.iter().find(|prefixes| w2.starts_with(&prefixes[0])) + } + }; + + if let Some(prefixes) = current_prefixes { + buffer.clear(); + buffer.extend_from_slice(w1.as_bytes()); + buffer.push(0); + for prefix in prefixes.iter() { + if prefix.len() <= self.max_prefix_length && w2.starts_with(prefix) { + buffer.truncate(w1.len() + 1); + buffer.extend_from_slice(prefix.as_bytes()); + buffer.push(prox); + + match prefixes_cache.get_mut(&buffer) { + Some(value) => value.push(data.to_owned()), + None => { + prefixes_cache.insert(buffer.clone(), vec![data.to_owned()]); + } + } + } + } + } + } + + write_prefixes_in_sorter( + &mut prefixes_cache, + &mut word_prefix_pair_proximity_docids_sorter, + )?; + + // We compute the prefix docids associated with the newly added prefixes + // in the new word prefix fst. + let mut db_iter = + self.index.word_pair_proximity_docids.remap_data_type::().iter(self.wtxn)?; + + let mut buffer = Vec::new(); + let mut current_prefixes: Option<&&[String]> = None; + let mut prefixes_cache = HashMap::new(); + while let Some(((w1, w2, prox), data)) = db_iter.next().transpose()? { + if prox > self.max_proximity { + continue; + } + + current_prefixes = match current_prefixes.take() { + Some(prefixes) if w2.starts_with(&prefixes[0]) => Some(prefixes), + _otherwise => { + write_prefixes_in_sorter( + &mut prefixes_cache, + &mut word_prefix_pair_proximity_docids_sorter, + )?; + new_prefix_fst_keys.iter().find(|prefixes| w2.starts_with(&prefixes[0])) } }; @@ -138,11 +203,15 @@ impl<'t, 'u, 'i> WordPrefixPairProximityDocids<'t, 'u, 'i> { )?; drop(prefix_fst); + drop(db_iter); // All of the word prefix pairs in the database that have a w2 // that is contained in the `suppr_pw` set must be removed as well. - let mut iter = - self.index.word_prefix_pair_proximity_docids.iter_mut(self.wtxn)?.lazily_decode_data(); + let mut iter = self + .index + .word_prefix_pair_proximity_docids + .remap_data_type::() + .iter_mut(self.wtxn)?; while let Some(((_, w2, _), _)) = iter.next().transpose()? { if suppr_pw.contains(w2.as_bytes()) { // Delete this entry as the w2 prefix is no more in the words prefix fst. 
From dbba5fd461337cc9ab3b9ba82e9a3bb64c444d73 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Tue, 25 Jan 2022 11:34:56 +0100 Subject: [PATCH 09/14] Create a function to simplify the word prefix pair proximity docids compute --- .../word_prefix_pair_proximity_docids.rs | 132 ++++++++++-------- 1 file changed, 72 insertions(+), 60 deletions(-) diff --git a/milli/src/update/word_prefix_pair_proximity_docids.rs b/milli/src/update/word_prefix_pair_proximity_docids.rs index dcc5db614..f846e8d9e 100644 --- a/milli/src/update/word_prefix_pair_proximity_docids.rs +++ b/milli/src/update/word_prefix_pair_proximity_docids.rs @@ -115,36 +115,18 @@ impl<'t, 'u, 'i> WordPrefixPairProximityDocids<'t, 'u, 'i> { continue; } - current_prefixes = match current_prefixes.take() { - Some(prefixes) if w2.starts_with(&prefixes[0]) => Some(prefixes), - _otherwise => { - write_prefixes_in_sorter( - &mut prefixes_cache, - &mut word_prefix_pair_proximity_docids_sorter, - )?; - common_prefix_fst_keys.iter().find(|prefixes| w2.starts_with(&prefixes[0])) - } - }; - - if let Some(prefixes) = current_prefixes { - buffer.clear(); - buffer.extend_from_slice(w1.as_bytes()); - buffer.push(0); - for prefix in prefixes.iter() { - if prefix.len() <= self.max_prefix_length && w2.starts_with(prefix) { - buffer.truncate(w1.len() + 1); - buffer.extend_from_slice(prefix.as_bytes()); - buffer.push(prox); - - match prefixes_cache.get_mut(&buffer) { - Some(value) => value.push(data.to_owned()), - None => { - prefixes_cache.insert(buffer.clone(), vec![data.to_owned()]); - } - } - } - } - } + insert_current_prefix_data_in_sorter( + &mut buffer, + &mut current_prefixes, + &mut prefixes_cache, + &mut word_prefix_pair_proximity_docids_sorter, + &common_prefix_fst_keys, + self.max_prefix_length, + w1, + w2, + prox, + data, + )?; } write_prefixes_in_sorter( @@ -165,36 +147,18 @@ impl<'t, 'u, 'i> WordPrefixPairProximityDocids<'t, 'u, 'i> { continue; } - current_prefixes = match current_prefixes.take() { - Some(prefixes) if w2.starts_with(&prefixes[0]) => Some(prefixes), - _otherwise => { - write_prefixes_in_sorter( - &mut prefixes_cache, - &mut word_prefix_pair_proximity_docids_sorter, - )?; - new_prefix_fst_keys.iter().find(|prefixes| w2.starts_with(&prefixes[0])) - } - }; - - if let Some(prefixes) = current_prefixes { - buffer.clear(); - buffer.extend_from_slice(w1.as_bytes()); - buffer.push(0); - for prefix in prefixes.iter() { - if prefix.len() <= self.max_prefix_length && w2.starts_with(prefix) { - buffer.truncate(w1.len() + 1); - buffer.extend_from_slice(prefix.as_bytes()); - buffer.push(prox); - - match prefixes_cache.get_mut(&buffer) { - Some(value) => value.push(data.to_owned()), - None => { - prefixes_cache.insert(buffer.clone(), vec![data.to_owned()]); - } - } - } - } - } + insert_current_prefix_data_in_sorter( + &mut buffer, + &mut current_prefixes, + &mut prefixes_cache, + &mut word_prefix_pair_proximity_docids_sorter, + &new_prefix_fst_keys, + self.max_prefix_length, + w1, + w2, + prox, + data, + )?; } write_prefixes_in_sorter( @@ -247,3 +211,51 @@ fn write_prefixes_in_sorter( Ok(()) } + +/// Computes the current prefix based on the previous and the currently iterated value +/// i.e. w1, w2, prox. It also makes sure to follow the `max_prefix_length` setting. +/// +/// Uses the current prefixes values to insert the associated data i.e. RoaringBitmap, +/// into the sorter that will, later, be inserted in the LMDB database. 
+fn insert_current_prefix_data_in_sorter<'a>( + buffer: &mut Vec, + current_prefixes: &mut Option<&'a &'a [String]>, + prefixes_cache: &mut HashMap, Vec>>, + word_prefix_pair_proximity_docids_sorter: &mut grenad::Sorter, + prefix_fst_keys: &'a [&'a [std::string::String]], + max_prefix_length: usize, + w1: &str, + w2: &str, + prox: u8, + data: &[u8], +) -> Result<()> { + *current_prefixes = match current_prefixes.take() { + Some(prefixes) if w2.starts_with(&prefixes[0]) => Some(prefixes), + _otherwise => { + write_prefixes_in_sorter(prefixes_cache, word_prefix_pair_proximity_docids_sorter)?; + prefix_fst_keys.iter().find(|prefixes| w2.starts_with(&prefixes[0])) + } + }; + + if let Some(prefixes) = current_prefixes { + buffer.clear(); + buffer.extend_from_slice(w1.as_bytes()); + buffer.push(0); + for prefix in prefixes.iter() { + if prefix.len() <= max_prefix_length && w2.starts_with(prefix) { + buffer.truncate(w1.len() + 1); + buffer.extend_from_slice(prefix.as_bytes()); + buffer.push(prox); + + match prefixes_cache.get_mut(buffer.as_slice()) { + Some(value) => value.push(data.to_owned()), + None => { + prefixes_cache.insert(buffer.clone(), vec![data.to_owned()]); + } + } + } + } + } + + Ok(()) +} From e9c02173cfd71dcf0acb013c064429fa61a789b1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Tue, 25 Jan 2022 14:06:45 +0100 Subject: [PATCH 10/14] Rework the WordsPrefixPositionDocids update to compute a subset of the database --- milli/src/update/index_documents/mod.rs | 23 +++- .../update/words_prefix_position_docids.rs | 128 +++++++++++++++--- 2 files changed, 128 insertions(+), 23 deletions(-) diff --git a/milli/src/update/index_documents/mod.rs b/milli/src/update/index_documents/mod.rs index ad3f73d0d..7ea5c3816 100644 --- a/milli/src/update/index_documents/mod.rs +++ b/milli/src/update/index_documents/mod.rs @@ -285,6 +285,7 @@ where let index_is_empty = index_documents_ids.len() == 0; let mut final_documents_ids = RoaringBitmap::new(); let mut word_pair_proximity_docids = Vec::new(); + let mut word_position_docids = Vec::new(); let mut word_docids = Vec::new(); let mut databases_seen = 0; @@ -321,6 +322,19 @@ where let chunk = grenad::Reader::new(file)?; TypedChunk::WordPairProximityDocids(chunk) } + TypedChunk::WordPositionDocids(chunk) => { + // We extract and mmap our chunk file to be able to get it for next processes. + let mut file = chunk.into_inner(); + let mmap = unsafe { memmap2::Mmap::map(&file)? }; + let cursor_mmap = CursorClonableMmap::new(ClonableMmap::from(mmap)); + let chunk = grenad::Reader::new(cursor_mmap)?; + word_position_docids.push(chunk); + + // We reconstruct our typed-chunk back. 
+ file.rewind()?; + let chunk = grenad::Reader::new(file)?; + TypedChunk::WordPositionDocids(chunk) + } otherwise => otherwise, }; @@ -359,7 +373,11 @@ where let all_documents_ids = index_documents_ids | new_documents_ids | replaced_documents_ids; self.index.put_documents_ids(self.wtxn, &all_documents_ids)?; - self.execute_prefix_databases(word_docids, word_pair_proximity_docids)?; + self.execute_prefix_databases( + word_docids, + word_pair_proximity_docids, + word_position_docids, + )?; Ok(all_documents_ids.len()) } @@ -369,6 +387,7 @@ where self, word_docids: Vec>, word_pair_proximity_docids: Vec>, + word_position_docids: Vec>, ) -> Result<()> where F: Fn(UpdateIndexingStep) + Sync, @@ -453,7 +472,7 @@ where if let Some(value) = self.config.words_positions_min_level_size { builder.min_level_size(value); } - builder.execute()?; + builder.execute(word_position_docids, &previous_words_prefixes_fst)?; databases_seen += 1; (self.progress)(UpdateIndexingStep::MergeDataIntoFinalDatabase { diff --git a/milli/src/update/words_prefix_position_docids.rs b/milli/src/update/words_prefix_position_docids.rs index a8346a1cb..b1b8273ef 100644 --- a/milli/src/update/words_prefix_position_docids.rs +++ b/milli/src/update/words_prefix_position_docids.rs @@ -1,17 +1,20 @@ +use std::collections::HashMap; use std::num::NonZeroU32; use std::{cmp, str}; use fst::Streamer; -use grenad::CompressionType; +use grenad::{CompressionType, MergerBuilder}; use heed::types::ByteSlice; -use heed::BytesEncode; +use heed::{BytesDecode, BytesEncode}; use log::debug; +use slice_group_by::GroupBy; use crate::error::SerializationError; use crate::heed_codec::StrBEU32Codec; use crate::index::main_key::WORDS_PREFIXES_FST_KEY; use crate::update::index_documents::{ - create_sorter, merge_cbo_roaring_bitmaps, sorter_into_lmdb_database, WriteMethod, + create_sorter, fst_stream_into_hashset, fst_stream_into_vec, merge_cbo_roaring_bitmaps, + sorter_into_lmdb_database, CursorClonableMmap, MergeFn, WriteMethod, }; use crate::{Index, Result}; @@ -54,12 +57,27 @@ impl<'t, 'u, 'i> WordPrefixPositionDocids<'t, 'u, 'i> { } #[logging_timer::time("WordPrefixPositionDocids::{}")] - pub fn execute(self) -> Result<()> { + pub fn execute>( + self, + new_word_position_docids: Vec>, + old_prefix_fst: &fst::Set, + ) -> Result<()> { debug!("Computing and writing the word levels positions docids into LMDB on disk..."); - self.index.word_prefix_position_docids.clear(self.wtxn)?; + let prefix_fst = self.index.words_prefixes_fst(self.wtxn)?; - let mut word_prefix_positions_docids_sorter = create_sorter( + // We retrieve the common words between the previous and new prefix word fst. + let common_prefix_fst_keys = + fst_stream_into_vec(old_prefix_fst.op().add(&prefix_fst).intersection()); + let common_prefix_fst_keys: Vec<_> = common_prefix_fst_keys + .as_slice() + .linear_group_by_key(|x| x.chars().nth(0).unwrap()) + .collect(); + + // We compute the set of prefixes that are no more part of the prefix fst. + let suppr_pw = fst_stream_into_hashset(old_prefix_fst.op().add(&prefix_fst).difference()); + + let mut prefix_position_docids_sorter = create_sorter( merge_cbo_roaring_bitmaps, self.chunk_compression_type, self.chunk_compression_level, @@ -67,39 +85,107 @@ impl<'t, 'u, 'i> WordPrefixPositionDocids<'t, 'u, 'i> { self.max_memory, ); - // We insert the word prefix position and - // corresponds to the word-prefix position where the prefixes appears - // in the prefix FST previously constructed. 
- let prefix_fst = self.index.words_prefixes_fst(self.wtxn)?; + let mut word_position_docids_merger = MergerBuilder::new(merge_cbo_roaring_bitmaps); + word_position_docids_merger.extend(new_word_position_docids); + let mut word_position_docids_iter = + word_position_docids_merger.build().into_merger_iter()?; + + // We fetch all the new common prefixes between the previous and new prefix fst. + let mut buffer = Vec::new(); + let mut current_prefixes: Option<&&[String]> = None; + let mut prefixes_cache = HashMap::new(); + while let Some((key, data)) = word_position_docids_iter.next()? { + let (word, pos) = StrBEU32Codec::bytes_decode(key).ok_or(heed::Error::Decoding)?; + + current_prefixes = match current_prefixes.take() { + Some(prefixes) if word.starts_with(&prefixes[0]) => Some(prefixes), + _otherwise => { + write_prefixes_in_sorter( + &mut prefixes_cache, + &mut prefix_position_docids_sorter, + )?; + common_prefix_fst_keys.iter().find(|prefixes| word.starts_with(&prefixes[0])) + } + }; + + if let Some(prefixes) = current_prefixes { + for prefix in prefixes.iter() { + if word.starts_with(prefix) { + buffer.clear(); + buffer.extend_from_slice(prefix.as_bytes()); + buffer.extend_from_slice(&pos.to_be_bytes()); + match prefixes_cache.get_mut(&buffer) { + Some(value) => value.push(data.to_owned()), + None => { + prefixes_cache.insert(buffer.clone(), vec![data.to_owned()]); + } + } + } + } + } + } + + write_prefixes_in_sorter(&mut prefixes_cache, &mut prefix_position_docids_sorter)?; + + // We fetch the docids associated to the newly added word prefix fst only. let db = self.index.word_position_docids.remap_data_type::(); - // iter over all prefixes in the prefix fst. - let mut word_stream = prefix_fst.stream(); - while let Some(prefix_bytes) = word_stream.next() { + let mut new_prefixes_stream = prefix_fst.op().add(old_prefix_fst).difference(); + while let Some(prefix_bytes) = new_prefixes_stream.next() { let prefix = str::from_utf8(prefix_bytes).map_err(|_| { SerializationError::Decoding { db_name: Some(WORDS_PREFIXES_FST_KEY) } })?; // iter over all lines of the DB where the key is prefixed by the current prefix. - let mut iter = db + let iter = db .remap_key_type::() - .prefix_iter(self.wtxn, &prefix_bytes)? + .prefix_iter(self.wtxn, prefix_bytes)? .remap_key_type::(); - while let Some(((_word, pos), data)) = iter.next().transpose()? { - let key = (prefix, pos); - let bytes = StrBEU32Codec::bytes_encode(&key).unwrap(); - word_prefix_positions_docids_sorter.insert(bytes, data)?; + for result in iter { + let ((word, pos), data) = result?; + if word.starts_with(prefix) { + let key = (prefix, pos); + let bytes = StrBEU32Codec::bytes_encode(&key).unwrap(); + prefix_position_docids_sorter.insert(bytes, data)?; + } } } + drop(new_prefixes_stream); + + // We remove all the entries that are no more required in this word prefix position + // docids database. + let mut iter = + self.index.word_prefix_position_docids.iter_mut(self.wtxn)?.lazily_decode_data(); + while let Some(((prefix, _), _)) = iter.next().transpose()? { + if suppr_pw.contains(prefix.as_bytes()) { + unsafe { iter.del_current()? }; + } + } + + drop(iter); + // We finally write all the word prefix position docids into the LMDB database. 
sorter_into_lmdb_database( self.wtxn, *self.index.word_prefix_position_docids.as_polymorph(), - word_prefix_positions_docids_sorter, + prefix_position_docids_sorter, merge_cbo_roaring_bitmaps, - WriteMethod::Append, + WriteMethod::GetMergePut, )?; Ok(()) } } + +fn write_prefixes_in_sorter( + prefixes: &mut HashMap, Vec>>, + sorter: &mut grenad::Sorter, +) -> Result<()> { + for (key, data_slices) in prefixes.drain() { + for data in data_slices { + sorter.insert(&key, data)?; + } + } + + Ok(()) +} From 51d1e64b238b73919bd15f564d3f88bfa1901947 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Tue, 25 Jan 2022 17:12:12 +0100 Subject: [PATCH 11/14] Remove, now useless, the WriteMethod enum --- milli/src/update/facets.rs | 6 +- .../index_documents/helpers/grenad_helpers.rs | 74 ++++++------------- milli/src/update/index_documents/mod.rs | 6 -- milli/src/update/word_prefix_docids.rs | 3 +- .../word_prefix_pair_proximity_docids.rs | 3 +- .../update/words_prefix_position_docids.rs | 3 +- 6 files changed, 28 insertions(+), 67 deletions(-) diff --git a/milli/src/update/facets.rs b/milli/src/update/facets.rs index a2f17cba3..19684c6ea 100644 --- a/milli/src/update/facets.rs +++ b/milli/src/update/facets.rs @@ -15,9 +15,7 @@ use crate::heed_codec::facet::{ FacetStringLevelZeroValueCodec, FacetStringZeroBoundsValueCodec, }; use crate::heed_codec::CboRoaringBitmapCodec; -use crate::update::index_documents::{ - create_writer, write_into_lmdb_database, writer_into_reader, WriteMethod, -}; +use crate::update::index_documents::{create_writer, write_into_lmdb_database, writer_into_reader}; use crate::{FieldId, Index, Result}; pub struct Facets<'t, 'u, 'i> { @@ -120,7 +118,6 @@ impl<'t, 'u, 'i> Facets<'t, 'u, 'i> { *self.index.facet_id_f64_docids.as_polymorph(), facet_number_levels, |_, _| Err(InternalError::IndexingMergingKeys { process: "facet number levels" })?, - WriteMethod::GetMergePut, )?; write_into_lmdb_database( @@ -128,7 +125,6 @@ impl<'t, 'u, 'i> Facets<'t, 'u, 'i> { *self.index.facet_id_string_docids.as_polymorph(), facet_string_levels, |_, _| Err(InternalError::IndexingMergingKeys { process: "facet string levels" })?, - WriteMethod::GetMergePut, )?; } diff --git a/milli/src/update/index_documents/helpers/grenad_helpers.rs b/milli/src/update/index_documents/helpers/grenad_helpers.rs index 10662892b..eef067122 100644 --- a/milli/src/update/index_documents/helpers/grenad_helpers.rs +++ b/milli/src/update/index_documents/helpers/grenad_helpers.rs @@ -9,7 +9,6 @@ use log::debug; use super::{ClonableMmap, MergeFn}; use crate::error::InternalError; -use crate::update::index_documents::WriteMethod; use crate::Result; pub type CursorClonableMmap = io::Cursor; @@ -169,34 +168,22 @@ pub fn write_into_lmdb_database( database: heed::PolyDatabase, mut reader: Reader, merge: MergeFn, - method: WriteMethod, ) -> Result<()> { debug!("Writing MTBL stores..."); let before = Instant::now(); - match method { - WriteMethod::Append => { - let mut out_iter = database.iter_mut::<_, ByteSlice, ByteSlice>(wtxn)?; - while let Some((k, v)) = reader.next()? { + while let Some((k, v)) = reader.next()? { + let mut iter = database.prefix_iter_mut::<_, ByteSlice, ByteSlice>(wtxn, k)?; + match iter.next().transpose()? { + Some((key, old_val)) if key == k => { + let vals = &[Cow::Borrowed(old_val), Cow::Borrowed(v)][..]; + let val = merge(k, &vals)?; // safety: we don't keep references from inside the LMDB database. - unsafe { out_iter.append(k, v)? }; + unsafe { iter.put_current(k, &val)? 
}; } - } - WriteMethod::GetMergePut => { - while let Some((k, v)) = reader.next()? { - let mut iter = database.prefix_iter_mut::<_, ByteSlice, ByteSlice>(wtxn, k)?; - match iter.next().transpose()? { - Some((key, old_val)) if key == k => { - let vals = &[Cow::Borrowed(old_val), Cow::Borrowed(v)][..]; - let val = merge(k, &vals)?; - // safety: we don't keep references from inside the LMDB database. - unsafe { iter.put_current(k, &val)? }; - } - _ => { - drop(iter); - database.put::<_, ByteSlice, ByteSlice>(wtxn, k, v)?; - } - } + _ => { + drop(iter); + database.put::<_, ByteSlice, ByteSlice>(wtxn, k, v)?; } } } @@ -210,12 +197,11 @@ pub fn sorter_into_lmdb_database( database: heed::PolyDatabase, sorter: Sorter, merge: MergeFn, - method: WriteMethod, ) -> Result<()> { debug!("Writing MTBL sorter..."); let before = Instant::now(); - merger_iter_into_lmdb_database(wtxn, database, sorter.into_merger_iter()?, merge, method)?; + merger_iter_into_lmdb_database(wtxn, database, sorter.into_merger_iter()?, merge)?; debug!("MTBL sorter writen in {:.02?}!", before.elapsed()); Ok(()) @@ -226,34 +212,22 @@ fn merger_iter_into_lmdb_database( database: heed::PolyDatabase, mut sorter: MergerIter, merge: MergeFn, - method: WriteMethod, ) -> Result<()> { - match method { - WriteMethod::Append => { - let mut out_iter = database.iter_mut::<_, ByteSlice, ByteSlice>(wtxn)?; - while let Some((k, v)) = sorter.next()? { + while let Some((k, v)) = sorter.next()? { + let mut iter = database.prefix_iter_mut::<_, ByteSlice, ByteSlice>(wtxn, k)?; + match iter.next().transpose()? { + Some((key, old_val)) if key == k => { + let vals = vec![Cow::Borrowed(old_val), Cow::Borrowed(v)]; + let val = merge(k, &vals).map_err(|_| { + // TODO just wrap this error? + InternalError::IndexingMergingKeys { process: "get-put-merge" } + })?; // safety: we don't keep references from inside the LMDB database. - unsafe { out_iter.append(k, v)? }; + unsafe { iter.put_current(k, &val)? }; } - } - WriteMethod::GetMergePut => { - while let Some((k, v)) = sorter.next()? { - let mut iter = database.prefix_iter_mut::<_, ByteSlice, ByteSlice>(wtxn, k)?; - match iter.next().transpose()? { - Some((key, old_val)) if key == k => { - let vals = vec![Cow::Borrowed(old_val), Cow::Borrowed(v)]; - let val = merge(k, &vals).map_err(|_| { - // TODO just wrap this error? - InternalError::IndexingMergingKeys { process: "get-put-merge" } - })?; - // safety: we don't keep references from inside the LMDB database. - unsafe { iter.put_current(k, &val)? 
}; - } - _ => { - drop(iter); - database.put::<_, ByteSlice, ByteSlice>(wtxn, k, v)?; - } - } + _ => { + drop(iter); + database.put::<_, ByteSlice, ByteSlice>(wtxn, k, v)?; } } } diff --git a/milli/src/update/index_documents/mod.rs b/milli/src/update/index_documents/mod.rs index 7ea5c3816..ee80d8ada 100644 --- a/milli/src/update/index_documents/mod.rs +++ b/milli/src/update/index_documents/mod.rs @@ -59,12 +59,6 @@ impl Default for IndexDocumentsMethod { } } -#[derive(Debug, Copy, Clone)] -pub enum WriteMethod { - Append, - GetMergePut, -} - pub struct IndexDocuments<'t, 'u, 'i, 'a, F> { wtxn: &'t mut heed::RwTxn<'i, 'u>, index: &'i Index, diff --git a/milli/src/update/word_prefix_docids.rs b/milli/src/update/word_prefix_docids.rs index 1e2996c9b..cf50a5b8a 100644 --- a/milli/src/update/word_prefix_docids.rs +++ b/milli/src/update/word_prefix_docids.rs @@ -7,7 +7,7 @@ use slice_group_by::GroupBy; use crate::update::index_documents::{ create_sorter, fst_stream_into_hashset, fst_stream_into_vec, merge_roaring_bitmaps, - sorter_into_lmdb_database, CursorClonableMmap, MergeFn, WriteMethod, + sorter_into_lmdb_database, CursorClonableMmap, MergeFn, }; use crate::{Index, Result}; @@ -126,7 +126,6 @@ impl<'t, 'u, 'i> WordPrefixDocids<'t, 'u, 'i> { *self.index.word_prefix_docids.as_polymorph(), prefix_docids_sorter, merge_roaring_bitmaps, - WriteMethod::GetMergePut, )?; Ok(()) diff --git a/milli/src/update/word_prefix_pair_proximity_docids.rs b/milli/src/update/word_prefix_pair_proximity_docids.rs index f846e8d9e..5b025e4fc 100644 --- a/milli/src/update/word_prefix_pair_proximity_docids.rs +++ b/milli/src/update/word_prefix_pair_proximity_docids.rs @@ -8,7 +8,7 @@ use slice_group_by::GroupBy; use crate::update::index_documents::{ create_sorter, fst_stream_into_hashset, fst_stream_into_vec, merge_cbo_roaring_bitmaps, - sorter_into_lmdb_database, CursorClonableMmap, MergeFn, WriteMethod, + sorter_into_lmdb_database, CursorClonableMmap, MergeFn, }; use crate::{Index, Result, StrStrU8Codec}; @@ -192,7 +192,6 @@ impl<'t, 'u, 'i> WordPrefixPairProximityDocids<'t, 'u, 'i> { *self.index.word_prefix_pair_proximity_docids.as_polymorph(), word_prefix_pair_proximity_docids_sorter, merge_cbo_roaring_bitmaps, - WriteMethod::GetMergePut, )?; Ok(()) diff --git a/milli/src/update/words_prefix_position_docids.rs b/milli/src/update/words_prefix_position_docids.rs index b1b8273ef..178684cf0 100644 --- a/milli/src/update/words_prefix_position_docids.rs +++ b/milli/src/update/words_prefix_position_docids.rs @@ -14,7 +14,7 @@ use crate::heed_codec::StrBEU32Codec; use crate::index::main_key::WORDS_PREFIXES_FST_KEY; use crate::update::index_documents::{ create_sorter, fst_stream_into_hashset, fst_stream_into_vec, merge_cbo_roaring_bitmaps, - sorter_into_lmdb_database, CursorClonableMmap, MergeFn, WriteMethod, + sorter_into_lmdb_database, CursorClonableMmap, MergeFn, }; use crate::{Index, Result}; @@ -170,7 +170,6 @@ impl<'t, 'u, 'i> WordPrefixPositionDocids<'t, 'u, 'i> { *self.index.word_prefix_position_docids.as_polymorph(), prefix_position_docids_sorter, merge_cbo_roaring_bitmaps, - WriteMethod::GetMergePut, )?; Ok(()) From fb79c324304ea093ba6e79b04444b0b1907f9511 Mon Sep 17 00:00:00 2001 From: Kerollmops Date: Thu, 27 Jan 2022 11:00:18 +0100 Subject: [PATCH 12/14] Compute the new, common and, deleted prefix words fst once --- milli/src/update/index_documents/mod.rs | 43 +++++++++++++++++-- milli/src/update/word_prefix_docids.rs | 37 +++++----------- .../word_prefix_pair_proximity_docids.rs | 43 ++++++------------- 
.../update/words_prefix_position_docids.rs | 40 ++++++----------- 4 files changed, 75 insertions(+), 88 deletions(-) diff --git a/milli/src/update/index_documents/mod.rs b/milli/src/update/index_documents/mod.rs index ee80d8ada..a31d1875b 100644 --- a/milli/src/update/index_documents/mod.rs +++ b/milli/src/update/index_documents/mod.rs @@ -12,6 +12,7 @@ use crossbeam_channel::{Receiver, Sender}; use log::debug; use roaring::RoaringBitmap; use serde::{Deserialize, Serialize}; +use slice_group_by::GroupBy; use typed_chunk::{write_typed_chunk_into_index, TypedChunk}; pub use self::helpers::{ @@ -420,6 +421,27 @@ where } builder.execute()?; + let current_prefix_fst = self.index.words_prefixes_fst(self.wtxn)?; + + // We retrieve the common words between the previous and new prefix word fst. + let common_prefix_fst_words = fst_stream_into_vec( + previous_words_prefixes_fst.op().add(¤t_prefix_fst).intersection(), + ); + let common_prefix_fst_words: Vec<_> = common_prefix_fst_words + .as_slice() + .linear_group_by_key(|x| x.chars().nth(0).unwrap()) + .collect(); + + // We retrieve the newly added words between the previous and new prefix word fst. + let new_prefix_fst_words = fst_stream_into_vec( + current_prefix_fst.op().add(&previous_words_prefixes_fst).difference(), + ); + + // We compute the set of prefixes that are no more part of the prefix fst. + let del_prefix_fst_words = fst_stream_into_hashset( + previous_words_prefixes_fst.op().add(¤t_prefix_fst).difference(), + ); + databases_seen += 1; (self.progress)(UpdateIndexingStep::MergeDataIntoFinalDatabase { databases_seen, @@ -432,7 +454,12 @@ where builder.chunk_compression_level = self.indexer_config.chunk_compression_level; builder.max_nb_chunks = self.indexer_config.max_nb_chunks; builder.max_memory = self.indexer_config.max_memory; - builder.execute(word_docids, &previous_words_prefixes_fst)?; + builder.execute( + word_docids, + &new_prefix_fst_words, + &common_prefix_fst_words, + &del_prefix_fst_words, + )?; databases_seen += 1; (self.progress)(UpdateIndexingStep::MergeDataIntoFinalDatabase { @@ -446,7 +473,12 @@ where builder.chunk_compression_level = self.indexer_config.chunk_compression_level; builder.max_nb_chunks = self.indexer_config.max_nb_chunks; builder.max_memory = self.indexer_config.max_memory; - builder.execute(word_pair_proximity_docids, &previous_words_prefixes_fst)?; + builder.execute( + word_pair_proximity_docids, + &new_prefix_fst_words, + &common_prefix_fst_words, + &del_prefix_fst_words, + )?; databases_seen += 1; (self.progress)(UpdateIndexingStep::MergeDataIntoFinalDatabase { @@ -466,7 +498,12 @@ where if let Some(value) = self.config.words_positions_min_level_size { builder.min_level_size(value); } - builder.execute(word_position_docids, &previous_words_prefixes_fst)?; + builder.execute( + word_position_docids, + &new_prefix_fst_words, + &common_prefix_fst_words, + &del_prefix_fst_words, + )?; databases_seen += 1; (self.progress)(UpdateIndexingStep::MergeDataIntoFinalDatabase { diff --git a/milli/src/update/word_prefix_docids.rs b/milli/src/update/word_prefix_docids.rs index cf50a5b8a..624037f8f 100644 --- a/milli/src/update/word_prefix_docids.rs +++ b/milli/src/update/word_prefix_docids.rs @@ -1,13 +1,10 @@ -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; -use fst::Streamer; use grenad::{CompressionType, MergerBuilder}; use heed::types::ByteSlice; -use slice_group_by::GroupBy; use crate::update::index_documents::{ - create_sorter, fst_stream_into_hashset, fst_stream_into_vec, 
merge_roaring_bitmaps, - sorter_into_lmdb_database, CursorClonableMmap, MergeFn, + create_sorter, merge_roaring_bitmaps, sorter_into_lmdb_database, CursorClonableMmap, MergeFn, }; use crate::{Index, Result}; @@ -36,24 +33,13 @@ impl<'t, 'u, 'i> WordPrefixDocids<'t, 'u, 'i> { } #[logging_timer::time("WordPrefixDocids::{}")] - pub fn execute>( + pub fn execute( self, new_word_docids: Vec>, - old_prefix_fst: &fst::Set, + new_prefix_fst_words: &[String], + common_prefix_fst_words: &[&[String]], + del_prefix_fst_words: &HashSet>, ) -> Result<()> { - let prefix_fst = self.index.words_prefixes_fst(self.wtxn)?; - - // We retrieve the common words between the previous and new prefix word fst. - let common_prefix_fst_keys = - fst_stream_into_vec(old_prefix_fst.op().add(&prefix_fst).intersection()); - let common_prefix_fst_keys: Vec<_> = common_prefix_fst_keys - .as_slice() - .linear_group_by_key(|x| x.chars().nth(0).unwrap()) - .collect(); - - // We compute the set of prefixes that are no more part of the prefix fst. - let suppr_pw = fst_stream_into_hashset(old_prefix_fst.op().add(&prefix_fst).difference()); - // It is forbidden to keep a mutable reference into the database // and write into it at the same time, therefore we write into another file. let mut prefix_docids_sorter = create_sorter( @@ -75,7 +61,7 @@ impl<'t, 'u, 'i> WordPrefixDocids<'t, 'u, 'i> { Some(prefixes) if word.starts_with(&prefixes[0].as_bytes()) => Some(prefixes), _otherwise => { write_prefixes_in_sorter(&mut prefixes_cache, &mut prefix_docids_sorter)?; - common_prefix_fst_keys + common_prefix_fst_words .iter() .find(|prefixes| word.starts_with(&prefixes[0].as_bytes())) } @@ -99,21 +85,18 @@ impl<'t, 'u, 'i> WordPrefixDocids<'t, 'u, 'i> { // We fetch the docids associated to the newly added word prefix fst only. let db = self.index.word_docids.remap_data_type::(); - let mut new_prefixes_stream = prefix_fst.op().add(old_prefix_fst).difference(); - while let Some(bytes) = new_prefixes_stream.next() { - let prefix = std::str::from_utf8(bytes)?; + for prefix in new_prefix_fst_words { + let prefix = std::str::from_utf8(prefix.as_bytes())?; for result in db.prefix_iter(self.wtxn, prefix)? { let (_word, data) = result?; prefix_docids_sorter.insert(prefix, data)?; } } - drop(new_prefixes_stream); - // We remove all the entries that are no more required in this word prefix docids database. let mut iter = self.index.word_prefix_docids.iter_mut(self.wtxn)?.lazily_decode_data(); while let Some((prefix, _)) = iter.next().transpose()? { - if suppr_pw.contains(prefix.as_bytes()) { + if del_prefix_fst_words.contains(prefix.as_bytes()) { unsafe { iter.del_current()? 
}; } } diff --git a/milli/src/update/word_prefix_pair_proximity_docids.rs b/milli/src/update/word_prefix_pair_proximity_docids.rs index 5b025e4fc..530c2867e 100644 --- a/milli/src/update/word_prefix_pair_proximity_docids.rs +++ b/milli/src/update/word_prefix_pair_proximity_docids.rs @@ -1,4 +1,4 @@ -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use grenad::{CompressionType, MergerBuilder}; use heed::types::ByteSlice; @@ -7,8 +7,8 @@ use log::debug; use slice_group_by::GroupBy; use crate::update::index_documents::{ - create_sorter, fst_stream_into_hashset, fst_stream_into_vec, merge_cbo_roaring_bitmaps, - sorter_into_lmdb_database, CursorClonableMmap, MergeFn, + create_sorter, merge_cbo_roaring_bitmaps, sorter_into_lmdb_database, CursorClonableMmap, + MergeFn, }; use crate::{Index, Result, StrStrU8Codec}; @@ -62,40 +62,24 @@ impl<'t, 'u, 'i> WordPrefixPairProximityDocids<'t, 'u, 'i> { } #[logging_timer::time("WordPrefixPairProximityDocids::{}")] - pub fn execute>( + pub fn execute( self, new_word_pair_proximity_docids: Vec>, - old_prefix_fst: &fst::Set, + new_prefix_fst_words: &[String], + common_prefix_fst_words: &[&[String]], + del_prefix_fst_words: &HashSet>, ) -> Result<()> { debug!("Computing and writing the word prefix pair proximity docids into LMDB on disk..."); + let new_prefix_fst_words: Vec<_> = + new_prefix_fst_words.linear_group_by_key(|x| x.chars().nth(0).unwrap()).collect(); + // We retrieve and merge the created word pair proximities docids entries // for the newly added documents. let mut wppd_merger = MergerBuilder::new(merge_cbo_roaring_bitmaps); wppd_merger.extend(new_word_pair_proximity_docids); let mut wppd_iter = wppd_merger.build().into_merger_iter()?; - let prefix_fst = self.index.words_prefixes_fst(self.wtxn)?; - - // We retrieve the common words between the previous and new prefix word fst. - let common_prefix_fst_keys = - fst_stream_into_vec(old_prefix_fst.op().add(&prefix_fst).intersection()); - let common_prefix_fst_keys: Vec<_> = common_prefix_fst_keys - .as_slice() - .linear_group_by_key(|x| x.chars().nth(0).unwrap()) - .collect(); - - // We retrieve the newly added words between the previous and new prefix word fst. - let new_prefix_fst_keys = - fst_stream_into_vec(prefix_fst.op().add(old_prefix_fst).difference()); - let new_prefix_fst_keys: Vec<_> = new_prefix_fst_keys - .as_slice() - .linear_group_by_key(|x| x.chars().nth(0).unwrap()) - .collect(); - - // We compute the set of prefixes that are no more part of the prefix fst. 
- let suppr_pw = fst_stream_into_hashset(old_prefix_fst.op().add(&prefix_fst).difference()); - let mut word_prefix_pair_proximity_docids_sorter = create_sorter( merge_cbo_roaring_bitmaps, self.chunk_compression_type, @@ -120,7 +104,7 @@ impl<'t, 'u, 'i> WordPrefixPairProximityDocids<'t, 'u, 'i> { &mut current_prefixes, &mut prefixes_cache, &mut word_prefix_pair_proximity_docids_sorter, - &common_prefix_fst_keys, + common_prefix_fst_words, self.max_prefix_length, w1, w2, @@ -152,7 +136,7 @@ impl<'t, 'u, 'i> WordPrefixPairProximityDocids<'t, 'u, 'i> { &mut current_prefixes, &mut prefixes_cache, &mut word_prefix_pair_proximity_docids_sorter, - &new_prefix_fst_keys, + &new_prefix_fst_words, self.max_prefix_length, w1, w2, @@ -166,7 +150,6 @@ impl<'t, 'u, 'i> WordPrefixPairProximityDocids<'t, 'u, 'i> { &mut word_prefix_pair_proximity_docids_sorter, )?; - drop(prefix_fst); drop(db_iter); // All of the word prefix pairs in the database that have a w2 @@ -177,7 +160,7 @@ impl<'t, 'u, 'i> WordPrefixPairProximityDocids<'t, 'u, 'i> { .remap_data_type::() .iter_mut(self.wtxn)?; while let Some(((_, w2, _), _)) = iter.next().transpose()? { - if suppr_pw.contains(w2.as_bytes()) { + if del_prefix_fst_words.contains(w2.as_bytes()) { // Delete this entry as the w2 prefix is no more in the words prefix fst. unsafe { iter.del_current()? }; } diff --git a/milli/src/update/words_prefix_position_docids.rs b/milli/src/update/words_prefix_position_docids.rs index 178684cf0..c992d01ec 100644 --- a/milli/src/update/words_prefix_position_docids.rs +++ b/milli/src/update/words_prefix_position_docids.rs @@ -1,20 +1,18 @@ -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::num::NonZeroU32; use std::{cmp, str}; -use fst::Streamer; use grenad::{CompressionType, MergerBuilder}; use heed::types::ByteSlice; use heed::{BytesDecode, BytesEncode}; use log::debug; -use slice_group_by::GroupBy; use crate::error::SerializationError; use crate::heed_codec::StrBEU32Codec; use crate::index::main_key::WORDS_PREFIXES_FST_KEY; use crate::update::index_documents::{ - create_sorter, fst_stream_into_hashset, fst_stream_into_vec, merge_cbo_roaring_bitmaps, - sorter_into_lmdb_database, CursorClonableMmap, MergeFn, + create_sorter, merge_cbo_roaring_bitmaps, sorter_into_lmdb_database, CursorClonableMmap, + MergeFn, }; use crate::{Index, Result}; @@ -57,26 +55,15 @@ impl<'t, 'u, 'i> WordPrefixPositionDocids<'t, 'u, 'i> { } #[logging_timer::time("WordPrefixPositionDocids::{}")] - pub fn execute>( + pub fn execute( self, new_word_position_docids: Vec>, - old_prefix_fst: &fst::Set, + new_prefix_fst_words: &[String], + common_prefix_fst_words: &[&[String]], + del_prefix_fst_words: &HashSet>, ) -> Result<()> { debug!("Computing and writing the word levels positions docids into LMDB on disk..."); - let prefix_fst = self.index.words_prefixes_fst(self.wtxn)?; - - // We retrieve the common words between the previous and new prefix word fst. - let common_prefix_fst_keys = - fst_stream_into_vec(old_prefix_fst.op().add(&prefix_fst).intersection()); - let common_prefix_fst_keys: Vec<_> = common_prefix_fst_keys - .as_slice() - .linear_group_by_key(|x| x.chars().nth(0).unwrap()) - .collect(); - - // We compute the set of prefixes that are no more part of the prefix fst. 
- let suppr_pw = fst_stream_into_hashset(old_prefix_fst.op().add(&prefix_fst).difference()); - let mut prefix_position_docids_sorter = create_sorter( merge_cbo_roaring_bitmaps, self.chunk_compression_type, @@ -104,7 +91,7 @@ impl<'t, 'u, 'i> WordPrefixPositionDocids<'t, 'u, 'i> { &mut prefixes_cache, &mut prefix_position_docids_sorter, )?; - common_prefix_fst_keys.iter().find(|prefixes| word.starts_with(&prefixes[0])) + common_prefix_fst_words.iter().find(|prefixes| word.starts_with(&prefixes[0])) } }; @@ -129,16 +116,15 @@ impl<'t, 'u, 'i> WordPrefixPositionDocids<'t, 'u, 'i> { // We fetch the docids associated to the newly added word prefix fst only. let db = self.index.word_position_docids.remap_data_type::(); - let mut new_prefixes_stream = prefix_fst.op().add(old_prefix_fst).difference(); - while let Some(prefix_bytes) = new_prefixes_stream.next() { - let prefix = str::from_utf8(prefix_bytes).map_err(|_| { + for prefix_bytes in new_prefix_fst_words { + let prefix = str::from_utf8(prefix_bytes.as_bytes()).map_err(|_| { SerializationError::Decoding { db_name: Some(WORDS_PREFIXES_FST_KEY) } })?; // iter over all lines of the DB where the key is prefixed by the current prefix. let iter = db .remap_key_type::() - .prefix_iter(self.wtxn, prefix_bytes)? + .prefix_iter(self.wtxn, prefix_bytes.as_bytes())? .remap_key_type::(); for result in iter { let ((word, pos), data) = result?; @@ -150,14 +136,12 @@ impl<'t, 'u, 'i> WordPrefixPositionDocids<'t, 'u, 'i> { } } - drop(new_prefixes_stream); - // We remove all the entries that are no more required in this word prefix position // docids database. let mut iter = self.index.word_prefix_position_docids.iter_mut(self.wtxn)?.lazily_decode_data(); while let Some(((prefix, _), _)) = iter.next().transpose()? { - if suppr_pw.contains(prefix.as_bytes()) { + if del_prefix_fst_words.contains(prefix.as_bytes()) { unsafe { iter.del_current()? 
}; } } From f367cc2e75971f512922a1276b00f556553f1daa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Wed, 16 Feb 2022 15:28:48 +0100 Subject: [PATCH 13/14] Finally bump grenad to v0.4.1 --- milli/Cargo.toml | 2 +- milli/src/error.rs | 7 ++ milli/src/update/facets.rs | 6 +- .../extract/extract_docid_word_positions.rs | 7 +- .../extract/extract_facet_number_docids.rs | 7 +- .../extract/extract_facet_string_docids.rs | 7 +- .../extract/extract_fid_docid_facet_values.rs | 7 +- .../extract/extract_fid_word_count_docids.rs | 7 +- .../extract/extract_geo_points.rs | 15 +++-- .../extract/extract_word_docids.rs | 7 +- .../extract_word_pair_proximity_docids.rs | 7 +- .../extract/extract_word_position_docids.rs | 7 +- .../index_documents/helpers/grenad_helpers.rs | 66 +++++++++++-------- milli/src/update/index_documents/transform.rs | 14 ++-- .../src/update/index_documents/typed_chunk.rs | 40 ++++++----- milli/src/update/word_prefix_docids.rs | 6 +- .../word_prefix_pair_proximity_docids.rs | 6 +- .../update/words_prefix_position_docids.rs | 6 +- 18 files changed, 130 insertions(+), 94 deletions(-) diff --git a/milli/Cargo.toml b/milli/Cargo.toml index 6b830c29e..9197fa818 100644 --- a/milli/Cargo.toml +++ b/milli/Cargo.toml @@ -16,7 +16,7 @@ either = "1.6.1" flate2 = "1.0.20" fst = "0.4.5" fxhash = "0.2.1" -grenad = { version = "0.3.1", default-features = false, features = ["tempfile"] } +grenad = { version = "0.4.1", default-features = false, features = ["tempfile"] } geoutils = "0.4.1" heed = { git = "https://github.com/Kerollmops/heed", tag = "v0.12.1", default-features = false, features = ["lmdb", "sync-read-txn"] } human_format = "1.0.3" diff --git a/milli/src/error.rs b/milli/src/error.rs index 47c9a5993..dce23582a 100644 --- a/milli/src/error.rs +++ b/milli/src/error.rs @@ -29,6 +29,7 @@ pub enum InternalError { FieldIdMapMissingEntry(FieldIdMapMissingEntry), Fst(fst::Error), GrenadInvalidCompressionType, + GrenadInvalidFormatVersion, IndexingMergingKeys { process: &'static str }, InvalidDatabaseTyping, RayonThreadPool(ThreadPoolBuildError), @@ -97,6 +98,9 @@ where grenad::Error::InvalidCompressionType => { Error::InternalError(InternalError::GrenadInvalidCompressionType) } + grenad::Error::InvalidFormatVersion => { + Error::InternalError(InternalError::GrenadInvalidFormatVersion) + } } } } @@ -186,6 +190,9 @@ impl fmt::Display for InternalError { Self::GrenadInvalidCompressionType => { f.write_str("Invalid compression type have been specified to grenad.") } + Self::GrenadInvalidFormatVersion => { + f.write_str("Invalid grenad file with an invalid version format.") + } Self::IndexingMergingKeys { process } => { write!(f, "Invalid merge while processing {}.", process) } diff --git a/milli/src/update/facets.rs b/milli/src/update/facets.rs index 19684c6ea..53305cdee 100644 --- a/milli/src/update/facets.rs +++ b/milli/src/update/facets.rs @@ -160,8 +160,7 @@ fn compute_facet_number_levels<'t>( // It is forbidden to keep a cursor and write in a database at the same time with LMDB // therefore we write the facet levels entries into a grenad file before transfering them. 
- let mut writer = tempfile::tempfile() - .and_then(|file| create_writer(compression_type, compression_level, file))?; + let mut writer = create_writer(compression_type, compression_level, tempfile::tempfile()?); let level_0_range = { let left = (field_id, 0, f64::MIN, f64::MIN); @@ -279,8 +278,7 @@ fn compute_facet_string_levels<'t>( // It is forbidden to keep a cursor and write in a database at the same time with LMDB // therefore we write the facet levels entries into a grenad file before transfering them. - let mut writer = tempfile::tempfile() - .and_then(|file| create_writer(compression_type, compression_level, file))?; + let mut writer = create_writer(compression_type, compression_level, tempfile::tempfile()?); // Groups sizes are always a power of the original level_group_size and therefore a group // always maps groups of the previous level and never splits previous levels groups in half. diff --git a/milli/src/update/index_documents/extract/extract_docid_word_positions.rs b/milli/src/update/index_documents/extract/extract_docid_word_positions.rs index fa1381412..44bf9dbf7 100644 --- a/milli/src/update/index_documents/extract/extract_docid_word_positions.rs +++ b/milli/src/update/index_documents/extract/extract_docid_word_positions.rs @@ -18,8 +18,8 @@ use crate::{absolute_from_relative_position, FieldId, Result, MAX_POSITION_PER_A /// Returns the generated internal documents ids and a grenad reader /// with the list of extracted words from the given chunk of documents. #[logging_timer::time] -pub fn extract_docid_word_positions( - mut obkv_documents: grenad::Reader, +pub fn extract_docid_word_positions( + obkv_documents: grenad::Reader, indexer: GrenadParameters, searchable_fields: &Option>, stop_words: Option<&fst::Set<&[u8]>>, @@ -46,7 +46,8 @@ pub fn extract_docid_word_positions( } let analyzer = Analyzer::>::new(AnalyzerConfig::default()); - while let Some((key, value)) = obkv_documents.next()? { + let mut cursor = obkv_documents.into_cursor()?; + while let Some((key, value)) = cursor.move_on_next()? { let document_id = key .try_into() .map(u32::from_be_bytes) diff --git a/milli/src/update/index_documents/extract/extract_facet_number_docids.rs b/milli/src/update/index_documents/extract/extract_facet_number_docids.rs index 5480bd605..fa63d9549 100644 --- a/milli/src/update/index_documents/extract/extract_facet_number_docids.rs +++ b/milli/src/update/index_documents/extract/extract_facet_number_docids.rs @@ -14,8 +14,8 @@ use crate::Result; /// Returns a grenad reader with the list of extracted facet numbers and /// documents ids from the given chunk of docid facet number positions. #[logging_timer::time] -pub fn extract_facet_number_docids( - mut docid_fid_facet_number: grenad::Reader, +pub fn extract_facet_number_docids( + docid_fid_facet_number: grenad::Reader, indexer: GrenadParameters, ) -> Result> { let max_memory = indexer.max_memory_by_thread(); @@ -28,7 +28,8 @@ pub fn extract_facet_number_docids( max_memory, ); - while let Some((key_bytes, _)) = docid_fid_facet_number.next()? { + let mut cursor = docid_fid_facet_number.into_cursor()?; + while let Some((key_bytes, _)) = cursor.move_on_next()? 
{ let (field_id, document_id, number) = FieldDocIdFacetF64Codec::bytes_decode(key_bytes).unwrap(); diff --git a/milli/src/update/index_documents/extract/extract_facet_string_docids.rs b/milli/src/update/index_documents/extract/extract_facet_string_docids.rs index e08d062cf..8209d817b 100644 --- a/milli/src/update/index_documents/extract/extract_facet_string_docids.rs +++ b/milli/src/update/index_documents/extract/extract_facet_string_docids.rs @@ -16,8 +16,8 @@ use crate::{FieldId, Result}; /// Returns a grenad reader with the list of extracted facet strings and /// documents ids from the given chunk of docid facet string positions. #[logging_timer::time] -pub fn extract_facet_string_docids( - mut docid_fid_facet_string: grenad::Reader, +pub fn extract_facet_string_docids( + docid_fid_facet_string: grenad::Reader, indexer: GrenadParameters, ) -> Result> { let max_memory = indexer.max_memory_by_thread(); @@ -32,7 +32,8 @@ pub fn extract_facet_string_docids( let mut key_buffer = Vec::new(); let mut value_buffer = Vec::new(); - while let Some((key, original_value_bytes)) = docid_fid_facet_string.next()? { + let mut cursor = docid_fid_facet_string.into_cursor()?; + while let Some((key, original_value_bytes)) = cursor.move_on_next()? { let (field_id_bytes, bytes) = try_split_array_at(key).unwrap(); let field_id = FieldId::from_be_bytes(field_id_bytes); let (document_id_bytes, normalized_value_bytes) = try_split_array_at(bytes).unwrap(); diff --git a/milli/src/update/index_documents/extract/extract_fid_docid_facet_values.rs b/milli/src/update/index_documents/extract/extract_fid_docid_facet_values.rs index a1bf0b1e3..628636f78 100644 --- a/milli/src/update/index_documents/extract/extract_fid_docid_facet_values.rs +++ b/milli/src/update/index_documents/extract/extract_fid_docid_facet_values.rs @@ -16,8 +16,8 @@ use crate::{DocumentId, FieldId, Result}; /// Returns the generated grenad reader containing the docid the fid and the orginal value as key /// and the normalized value as value extracted from the given chunk of documents. #[logging_timer::time] -pub fn extract_fid_docid_facet_values( - mut obkv_documents: grenad::Reader, +pub fn extract_fid_docid_facet_values( + obkv_documents: grenad::Reader, indexer: GrenadParameters, faceted_fields: &HashSet, ) -> Result<(grenad::Reader, grenad::Reader)> { @@ -40,7 +40,8 @@ pub fn extract_fid_docid_facet_values( ); let mut key_buffer = Vec::new(); - while let Some((docid_bytes, value)) = obkv_documents.next()? { + let mut cursor = obkv_documents.into_cursor()?; + while let Some((docid_bytes, value)) = cursor.move_on_next()? { let obkv = obkv::KvReader::new(value); for (field_id, field_bytes) in obkv.iter() { diff --git a/milli/src/update/index_documents/extract/extract_fid_word_count_docids.rs b/milli/src/update/index_documents/extract/extract_fid_word_count_docids.rs index 4e25cb4f6..85a65ee14 100644 --- a/milli/src/update/index_documents/extract/extract_fid_word_count_docids.rs +++ b/milli/src/update/index_documents/extract/extract_fid_word_count_docids.rs @@ -18,8 +18,8 @@ use crate::{relative_from_absolute_position, DocumentId, FieldId, Result}; /// Returns a grenad reader with the list of extracted field id word counts /// and documents ids from the given chunk of docid word positions. 
#[logging_timer::time] -pub fn extract_fid_word_count_docids( - mut docid_word_positions: grenad::Reader, +pub fn extract_fid_word_count_docids( + docid_word_positions: grenad::Reader, indexer: GrenadParameters, ) -> Result> { let max_memory = indexer.max_memory_by_thread(); @@ -36,7 +36,8 @@ pub fn extract_fid_word_count_docids( let mut document_fid_wordcount = HashMap::new(); let mut current_document_id = None; - while let Some((key, value)) = docid_word_positions.next()? { + let mut cursor = docid_word_positions.into_cursor()?; + while let Some((key, value)) = cursor.move_on_next()? { let (document_id_bytes, _word_bytes) = try_split_array_at(key) .ok_or_else(|| SerializationError::Decoding { db_name: Some(DOCID_WORD_POSITIONS) })?; let document_id = u32::from_be_bytes(document_id_bytes); diff --git a/milli/src/update/index_documents/extract/extract_geo_points.rs b/milli/src/update/index_documents/extract/extract_geo_points.rs index a36b608ee..e58d351d6 100644 --- a/milli/src/update/index_documents/extract/extract_geo_points.rs +++ b/milli/src/update/index_documents/extract/extract_geo_points.rs @@ -10,17 +10,20 @@ use crate::{FieldId, InternalError, Result, UserError}; /// Extracts the geographical coordinates contained in each document under the `_geo` field. /// /// Returns the generated grenad reader containing the docid as key associated to the (latitude, longitude) -pub fn extract_geo_points( - mut obkv_documents: grenad::Reader, +pub fn extract_geo_points( + obkv_documents: grenad::Reader, indexer: GrenadParameters, primary_key_id: FieldId, geo_field_id: FieldId, ) -> Result> { - let mut writer = tempfile::tempfile().and_then(|file| { - create_writer(indexer.chunk_compression_type, indexer.chunk_compression_level, file) - })?; + let mut writer = create_writer( + indexer.chunk_compression_type, + indexer.chunk_compression_level, + tempfile::tempfile()?, + ); - while let Some((docid_bytes, value)) = obkv_documents.next()? { + let mut cursor = obkv_documents.into_cursor()?; + while let Some((docid_bytes, value)) = cursor.move_on_next()? { let obkv = obkv::KvReader::new(value); let point: Value = match obkv.get(geo_field_id) { Some(point) => serde_json::from_slice(point).map_err(InternalError::SerdeJson)?, diff --git a/milli/src/update/index_documents/extract/extract_word_docids.rs b/milli/src/update/index_documents/extract/extract_word_docids.rs index 6d99fda44..80d68298a 100644 --- a/milli/src/update/index_documents/extract/extract_word_docids.rs +++ b/milli/src/update/index_documents/extract/extract_word_docids.rs @@ -17,8 +17,8 @@ use crate::Result; /// Returns a grenad reader with the list of extracted words and /// documents ids from the given chunk of docid word positions. #[logging_timer::time] -pub fn extract_word_docids( - mut docid_word_positions: grenad::Reader, +pub fn extract_word_docids( + docid_word_positions: grenad::Reader, indexer: GrenadParameters, ) -> Result> { let max_memory = indexer.max_memory_by_thread(); @@ -32,7 +32,8 @@ pub fn extract_word_docids( ); let mut value_buffer = Vec::new(); - while let Some((key, _value)) = docid_word_positions.next()? { + let mut cursor = docid_word_positions.into_cursor()?; + while let Some((key, _value)) = cursor.move_on_next()? 
{ let (document_id_bytes, word_bytes) = try_split_array_at(key) .ok_or_else(|| SerializationError::Decoding { db_name: Some(DOCID_WORD_POSITIONS) })?; let document_id = u32::from_be_bytes(document_id_bytes); diff --git a/milli/src/update/index_documents/extract/extract_word_pair_proximity_docids.rs b/milli/src/update/index_documents/extract/extract_word_pair_proximity_docids.rs index f3667694a..90349eb93 100644 --- a/milli/src/update/index_documents/extract/extract_word_pair_proximity_docids.rs +++ b/milli/src/update/index_documents/extract/extract_word_pair_proximity_docids.rs @@ -17,8 +17,8 @@ use crate::{DocumentId, Result}; /// Returns a grenad reader with the list of extracted word pairs proximities and /// documents ids from the given chunk of docid word positions. #[logging_timer::time] -pub fn extract_word_pair_proximity_docids( - mut docid_word_positions: grenad::Reader, +pub fn extract_word_pair_proximity_docids( + docid_word_positions: grenad::Reader, indexer: GrenadParameters, ) -> Result> { let max_memory = indexer.max_memory_by_thread(); @@ -35,7 +35,8 @@ pub fn extract_word_pair_proximity_docids( let mut document_word_positions_heap = BinaryHeap::new(); let mut current_document_id = None; - while let Some((key, value)) = docid_word_positions.next()? { + let mut cursor = docid_word_positions.into_cursor()?; + while let Some((key, value)) = cursor.move_on_next()? { let (document_id_bytes, word_bytes) = try_split_array_at(key) .ok_or_else(|| SerializationError::Decoding { db_name: Some(DOCID_WORD_POSITIONS) })?; let document_id = u32::from_be_bytes(document_id_bytes); diff --git a/milli/src/update/index_documents/extract/extract_word_position_docids.rs b/milli/src/update/index_documents/extract/extract_word_position_docids.rs index 4ca8537ac..a4720ba2b 100644 --- a/milli/src/update/index_documents/extract/extract_word_position_docids.rs +++ b/milli/src/update/index_documents/extract/extract_word_position_docids.rs @@ -14,8 +14,8 @@ use crate::{DocumentId, Result}; /// Returns a grenad reader with the list of extracted words at positions and /// documents ids from the given chunk of docid word positions. #[logging_timer::time] -pub fn extract_word_position_docids( - mut docid_word_positions: grenad::Reader, +pub fn extract_word_position_docids( + docid_word_positions: grenad::Reader, indexer: GrenadParameters, ) -> Result> { let max_memory = indexer.max_memory_by_thread(); @@ -29,7 +29,8 @@ pub fn extract_word_position_docids( ); let mut key_buffer = Vec::new(); - while let Some((key, value)) = docid_word_positions.next()? { + let mut cursor = docid_word_positions.into_cursor()?; + while let Some((key, value)) = cursor.move_on_next()? 
{ let (document_id_bytes, word_bytes) = try_split_array_at(key) .ok_or_else(|| SerializationError::Decoding { db_name: Some(DOCID_WORD_POSITIONS) })?; let document_id = DocumentId::from_be_bytes(document_id_bytes); diff --git a/milli/src/update/index_documents/helpers/grenad_helpers.rs b/milli/src/update/index_documents/helpers/grenad_helpers.rs index eef067122..ec4a32755 100644 --- a/milli/src/update/index_documents/helpers/grenad_helpers.rs +++ b/milli/src/update/index_documents/helpers/grenad_helpers.rs @@ -17,7 +17,7 @@ pub fn create_writer( typ: grenad::CompressionType, level: Option, file: R, -) -> io::Result> { +) -> grenad::Writer { let mut builder = grenad::Writer::builder(); builder.compression_type(typ); if let Some(level) = level { @@ -52,10 +52,13 @@ pub fn sorter_into_reader( sorter: grenad::Sorter, indexer: GrenadParameters, ) -> Result> { - let mut writer = tempfile::tempfile().and_then(|file| { - create_writer(indexer.chunk_compression_type, indexer.chunk_compression_level, file) - })?; - sorter.write_into(&mut writer)?; + let mut writer = create_writer( + indexer.chunk_compression_type, + indexer.chunk_compression_level, + tempfile::tempfile()?, + ); + sorter.write_into_stream_writer(&mut writer)?; + Ok(writer_into_reader(writer)?) } @@ -75,20 +78,25 @@ pub unsafe fn into_clonable_grenad( Ok(reader) } -pub fn merge_readers( +pub fn merge_readers( readers: Vec>, merge_fn: MergeFn, indexer: GrenadParameters, ) -> Result> { let mut merger_builder = grenad::MergerBuilder::new(merge_fn); - merger_builder.extend(readers); + for reader in readers { + merger_builder.push(reader.into_cursor()?); + } + let merger = merger_builder.build(); - let mut writer = tempfile::tempfile().and_then(|file| { - create_writer(indexer.chunk_compression_type, indexer.chunk_compression_level, file) - })?; - merger.write_into(&mut writer)?; - let reader = writer_into_reader(writer)?; - Ok(reader) + let mut writer = create_writer( + indexer.chunk_compression_type, + indexer.chunk_compression_level, + tempfile::tempfile()?, + ); + merger.write_into_stream_writer(&mut writer)?; + + Ok(writer_into_reader(writer)?) } #[derive(Debug, Clone, Copy)] @@ -125,12 +133,13 @@ impl GrenadParameters { /// The grenad obkv entries are composed of an incremental document id big-endian /// encoded as the key and an obkv object with an `u8` for the field as the key /// and a simple UTF-8 encoded string as the value. -pub fn grenad_obkv_into_chunks( - mut reader: grenad::Reader, +pub fn grenad_obkv_into_chunks( + reader: grenad::Reader, indexer: GrenadParameters, documents_chunk_size: usize, ) -> Result>>> { let mut continue_reading = true; + let mut cursor = reader.into_cursor()?; let indexer_clone = indexer.clone(); let mut transposer = move || { @@ -139,15 +148,13 @@ pub fn grenad_obkv_into_chunks( } let mut current_chunk_size = 0u64; - let mut obkv_documents = tempfile::tempfile().and_then(|file| { - create_writer( - indexer_clone.chunk_compression_type, - indexer_clone.chunk_compression_level, - file, - ) - })?; + let mut obkv_documents = create_writer( + indexer_clone.chunk_compression_type, + indexer_clone.chunk_compression_level, + tempfile::tempfile()?, + ); - while let Some((document_id, obkv)) = reader.next()? { + while let Some((document_id, obkv)) = cursor.move_on_next()? 
{ obkv_documents.insert(document_id, obkv)?; current_chunk_size += document_id.len() as u64 + obkv.len() as u64; @@ -166,13 +173,14 @@ pub fn grenad_obkv_into_chunks( pub fn write_into_lmdb_database( wtxn: &mut heed::RwTxn, database: heed::PolyDatabase, - mut reader: Reader, + reader: Reader, merge: MergeFn, ) -> Result<()> { debug!("Writing MTBL stores..."); let before = Instant::now(); - while let Some((k, v)) = reader.next()? { + let mut cursor = reader.into_cursor()?; + while let Some((k, v)) = cursor.move_on_next()? { let mut iter = database.prefix_iter_mut::<_, ByteSlice, ByteSlice>(wtxn, k)?; match iter.next().transpose()? { Some((key, old_val)) if key == k => { @@ -201,19 +209,19 @@ pub fn sorter_into_lmdb_database( debug!("Writing MTBL sorter..."); let before = Instant::now(); - merger_iter_into_lmdb_database(wtxn, database, sorter.into_merger_iter()?, merge)?; + merger_iter_into_lmdb_database(wtxn, database, sorter.into_stream_merger_iter()?, merge)?; debug!("MTBL sorter writen in {:.02?}!", before.elapsed()); Ok(()) } -fn merger_iter_into_lmdb_database( +fn merger_iter_into_lmdb_database( wtxn: &mut heed::RwTxn, database: heed::PolyDatabase, - mut sorter: MergerIter, + mut merger_iter: MergerIter, merge: MergeFn, ) -> Result<()> { - while let Some((k, v)) = sorter.next()? { + while let Some((k, v)) = merger_iter.next()? { let mut iter = database.prefix_iter_mut::<_, ByteSlice, ByteSlice>(wtxn, k)?; match iter.next().transpose()? { Some((key, old_val)) if key == k => { diff --git a/milli/src/update/index_documents/transform.rs b/milli/src/update/index_documents/transform.rs index f5fb1ec01..4ec34c0c6 100644 --- a/milli/src/update/index_documents/transform.rs +++ b/milli/src/update/index_documents/transform.rs @@ -277,7 +277,7 @@ impl<'a, 'i> Transform<'a, 'i> { let mut available_documents_ids = AvailableDocumentsIds::from_documents_ids(&documents_ids); // consume sorter, in order to free the internal allocation, before creating a new one. - let mut iter = self.sorter.into_merger_iter()?; + let mut iter = self.sorter.into_stream_merger_iter()?; // Once we have sort and deduplicated the documents we write them into a final file. let mut final_sorter = create_sorter( @@ -374,16 +374,15 @@ impl<'a, 'i> Transform<'a, 'i> { }); // We create a final writer to write the new documents in order from the sorter. - let file = tempfile::tempfile()?; let mut writer = create_writer( self.indexer_settings.chunk_compression_type, self.indexer_settings.chunk_compression_level, - file, - )?; + tempfile::tempfile()?, + ); // Once we have written all the documents into the final sorter, we write the documents // into this writer, extract the file and reset the seek to be able to read it again. - final_sorter.write_into(&mut writer)?; + final_sorter.write_into_stream_writer(&mut writer)?; let mut documents_file = writer.into_inner()?; documents_file.seek(SeekFrom::Start(0))?; @@ -424,12 +423,11 @@ impl<'a, 'i> Transform<'a, 'i> { let documents_count = documents_ids.len() as usize; // We create a final writer to write the new documents in order from the sorter. - let file = tempfile::tempfile()?; let mut writer = create_writer( self.indexer_settings.chunk_compression_type, self.indexer_settings.chunk_compression_level, - file, - )?; + tempfile::tempfile()?, + ); let mut obkv_buffer = Vec::new(); for result in self.index.documents.iter(wtxn)? 
{ diff --git a/milli/src/update/index_documents/typed_chunk.rs b/milli/src/update/index_documents/typed_chunk.rs index 7f0cfcab3..3c77de7a1 100644 --- a/milli/src/update/index_documents/typed_chunk.rs +++ b/milli/src/update/index_documents/typed_chunk.rs @@ -1,6 +1,7 @@ use std::borrow::Cow; use std::convert::TryInto; use std::fs::File; +use std::io; use heed::types::ByteSlice; use heed::{BytesDecode, RwTxn}; @@ -65,8 +66,9 @@ pub(crate) fn write_typed_chunk_into_index( }, )?; } - TypedChunk::Documents(mut obkv_documents_iter) => { - while let Some((key, value)) = obkv_documents_iter.next()? { + TypedChunk::Documents(obkv_documents_iter) => { + let mut cursor = obkv_documents_iter.into_cursor()?; + while let Some((key, value)) = cursor.move_on_next()? { index.documents.remap_types::().put(wtxn, key, value)?; } } @@ -85,7 +87,7 @@ pub(crate) fn write_typed_chunk_into_index( return Ok((documents_ids, is_merged_database)) } TypedChunk::WordDocids(word_docids_iter) => { - let mut word_docids_iter = unsafe { into_clonable_grenad(word_docids_iter) }?; + let word_docids_iter = unsafe { into_clonable_grenad(word_docids_iter) }?; append_entries_into_database( word_docids_iter.clone(), &index.word_docids, @@ -97,7 +99,8 @@ pub(crate) fn write_typed_chunk_into_index( // create fst from word docids let mut builder = fst::SetBuilder::memory(); - while let Some((word, _value)) = word_docids_iter.next()? { + let mut cursor = word_docids_iter.into_cursor()?; + while let Some((word, _value)) = cursor.move_on_next()? { // This is a lexicographically ordered word position // we use the key to construct the words fst. builder.insert(word)?; @@ -146,19 +149,21 @@ pub(crate) fn write_typed_chunk_into_index( )?; is_merged_database = true; } - TypedChunk::FieldIdDocidFacetNumbers(mut fid_docid_facet_number) => { + TypedChunk::FieldIdDocidFacetNumbers(fid_docid_facet_number) => { let index_fid_docid_facet_numbers = index.field_id_docid_facet_f64s.remap_types::(); - while let Some((key, value)) = fid_docid_facet_number.next()? { + let mut cursor = fid_docid_facet_number.into_cursor()?; + while let Some((key, value)) = cursor.move_on_next()? { if valid_lmdb_key(key) { index_fid_docid_facet_numbers.put(wtxn, key, &value)?; } } } - TypedChunk::FieldIdDocidFacetStrings(mut fid_docid_facet_string) => { + TypedChunk::FieldIdDocidFacetStrings(fid_docid_facet_string) => { let index_fid_docid_facet_strings = index.field_id_docid_facet_strings.remap_types::(); - while let Some((key, value)) = fid_docid_facet_string.next()? { + let mut cursor = fid_docid_facet_string.into_cursor()?; + while let Some((key, value)) = cursor.move_on_next()? { if valid_lmdb_key(key) { index_fid_docid_facet_strings.put(wtxn, key, &value)?; } @@ -183,11 +188,12 @@ pub(crate) fn write_typed_chunk_into_index( )?; is_merged_database = true; } - TypedChunk::GeoPoints(mut geo_points) => { + TypedChunk::GeoPoints(geo_points) => { let mut rtree = index.geo_rtree(wtxn)?.unwrap_or_default(); let mut geo_faceted_docids = index.geo_faceted_documents_ids(wtxn)?; - while let Some((key, value)) = geo_points.next()? { + let mut cursor = geo_points.into_cursor()?; + while let Some((key, value)) = cursor.move_on_next()? { // convert the key back to a u32 (4 bytes) let docid = key.try_into().map(DocumentId::from_be_bytes).unwrap(); @@ -229,7 +235,7 @@ fn merge_cbo_roaring_bitmaps( /// Write provided entries in database using serialize_value function. /// merge_values function is used if an entry already exist in the database. 
fn write_entries_into_database( - mut data: grenad::Reader, + data: grenad::Reader, database: &heed::Database, wtxn: &mut RwTxn, index_is_empty: bool, @@ -237,14 +243,15 @@ fn write_entries_into_database( merge_values: FM, ) -> Result<()> where - R: std::io::Read, + R: io::Read + io::Seek, FS: for<'a> Fn(&'a [u8], &'a mut Vec) -> Result<&'a [u8]>, FM: Fn(&[u8], &[u8], &mut Vec) -> Result<()>, { let mut buffer = Vec::new(); let database = database.remap_types::(); - while let Some((key, value)) = data.next()? { + let mut cursor = data.into_cursor()?; + while let Some((key, value)) = cursor.move_on_next()? { if valid_lmdb_key(key) { buffer.clear(); let value = if index_is_empty { @@ -270,7 +277,7 @@ where /// All provided entries must be ordered. /// If the index is not empty, write_entries_into_database is called instead. fn append_entries_into_database( - mut data: grenad::Reader, + data: grenad::Reader, database: &heed::Database, wtxn: &mut RwTxn, index_is_empty: bool, @@ -278,7 +285,7 @@ fn append_entries_into_database( merge_values: FM, ) -> Result<()> where - R: std::io::Read, + R: io::Read + io::Seek, FS: for<'a> Fn(&'a [u8], &'a mut Vec) -> Result<&'a [u8]>, FM: Fn(&[u8], &[u8], &mut Vec) -> Result<()>, { @@ -296,7 +303,8 @@ where let mut buffer = Vec::new(); let mut database = database.iter_mut(wtxn)?.remap_types::(); - while let Some((key, value)) = data.next()? { + let mut cursor = data.into_cursor()?; + while let Some((key, value)) = cursor.move_on_next()? { if valid_lmdb_key(key) { buffer.clear(); let value = serialize_value(value, &mut buffer)?; diff --git a/milli/src/update/word_prefix_docids.rs b/milli/src/update/word_prefix_docids.rs index 624037f8f..0bb5edb9a 100644 --- a/milli/src/update/word_prefix_docids.rs +++ b/milli/src/update/word_prefix_docids.rs @@ -51,8 +51,10 @@ impl<'t, 'u, 'i> WordPrefixDocids<'t, 'u, 'i> { ); let mut word_docids_merger = MergerBuilder::new(merge_roaring_bitmaps); - word_docids_merger.extend(new_word_docids); - let mut word_docids_iter = word_docids_merger.build().into_merger_iter()?; + for reader in new_word_docids { + word_docids_merger.push(reader.into_cursor()?); + } + let mut word_docids_iter = word_docids_merger.build().into_stream_merger_iter()?; let mut current_prefixes: Option<&&[String]> = None; let mut prefixes_cache = HashMap::new(); diff --git a/milli/src/update/word_prefix_pair_proximity_docids.rs b/milli/src/update/word_prefix_pair_proximity_docids.rs index 530c2867e..b498d5850 100644 --- a/milli/src/update/word_prefix_pair_proximity_docids.rs +++ b/milli/src/update/word_prefix_pair_proximity_docids.rs @@ -77,8 +77,10 @@ impl<'t, 'u, 'i> WordPrefixPairProximityDocids<'t, 'u, 'i> { // We retrieve and merge the created word pair proximities docids entries // for the newly added documents. 
let mut wppd_merger = MergerBuilder::new(merge_cbo_roaring_bitmaps); - wppd_merger.extend(new_word_pair_proximity_docids); - let mut wppd_iter = wppd_merger.build().into_merger_iter()?; + for reader in new_word_pair_proximity_docids { + wppd_merger.push(reader.into_cursor()?); + } + let mut wppd_iter = wppd_merger.build().into_stream_merger_iter()?; let mut word_prefix_pair_proximity_docids_sorter = create_sorter( merge_cbo_roaring_bitmaps, diff --git a/milli/src/update/words_prefix_position_docids.rs b/milli/src/update/words_prefix_position_docids.rs index c992d01ec..9e15f4d6c 100644 --- a/milli/src/update/words_prefix_position_docids.rs +++ b/milli/src/update/words_prefix_position_docids.rs @@ -73,9 +73,11 @@ impl<'t, 'u, 'i> WordPrefixPositionDocids<'t, 'u, 'i> { ); let mut word_position_docids_merger = MergerBuilder::new(merge_cbo_roaring_bitmaps); - word_position_docids_merger.extend(new_word_position_docids); + for reader in new_word_position_docids { + word_position_docids_merger.push(reader.into_cursor()?); + } let mut word_position_docids_iter = - word_position_docids_merger.build().into_merger_iter()?; + word_position_docids_merger.build().into_stream_merger_iter()?; // We fetch all the new common prefixes between the previous and new prefix fst. let mut buffer = Vec::new(); From ff8d7a810de935db3f35583e3c3dba34d1ca32a7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Wed, 16 Feb 2022 15:40:08 +0100 Subject: [PATCH 14/14] Change the behavior of the as_cloneable_grenad by taking a ref --- .../src/update/index_documents/extract/mod.rs | 10 ++--- .../index_documents/helpers/grenad_helpers.rs | 8 ++-- .../src/update/index_documents/helpers/mod.rs | 2 +- milli/src/update/index_documents/mod.rs | 42 ++++--------------- .../src/update/index_documents/typed_chunk.rs | 4 +- 5 files changed, 21 insertions(+), 45 deletions(-) diff --git a/milli/src/update/index_documents/extract/mod.rs b/milli/src/update/index_documents/extract/mod.rs index 0f04418ed..4c81b9334 100644 --- a/milli/src/update/index_documents/extract/mod.rs +++ b/milli/src/update/index_documents/extract/mod.rs @@ -25,7 +25,7 @@ use self::extract_word_docids::extract_word_docids; use self::extract_word_pair_proximity_docids::extract_word_pair_proximity_docids; use self::extract_word_position_docids::extract_word_position_docids; use super::helpers::{ - into_clonable_grenad, keep_first_prefix_value_merge_roaring_bitmaps, merge_cbo_roaring_bitmaps, + as_cloneable_grenad, keep_first_prefix_value_merge_roaring_bitmaps, merge_cbo_roaring_bitmaps, merge_readers, merge_roaring_bitmaps, CursorClonableMmap, GrenadParameters, MergeFn, }; use super::{helpers, TypedChunk}; @@ -184,7 +184,7 @@ fn extract_documents_data( grenad::Reader, (grenad::Reader, grenad::Reader), )> { - let documents_chunk = documents_chunk.and_then(|c| unsafe { into_clonable_grenad(c) })?; + let documents_chunk = documents_chunk.and_then(|c| unsafe { as_cloneable_grenad(&c) })?; let _ = lmdb_writer_sx.send(Ok(TypedChunk::Documents(documents_chunk.clone()))); @@ -217,7 +217,7 @@ fn extract_documents_data( // send docid_word_positions_chunk to DB writer let docid_word_positions_chunk = - unsafe { into_clonable_grenad(docid_word_positions_chunk)? }; + unsafe { as_cloneable_grenad(&docid_word_positions_chunk)? 
}; let _ = lmdb_writer_sx .send(Ok(TypedChunk::DocidWordPositions(docid_word_positions_chunk.clone()))); @@ -233,7 +233,7 @@ fn extract_documents_data( // send docid_fid_facet_numbers_chunk to DB writer let docid_fid_facet_numbers_chunk = - unsafe { into_clonable_grenad(docid_fid_facet_numbers_chunk)? }; + unsafe { as_cloneable_grenad(&docid_fid_facet_numbers_chunk)? }; let _ = lmdb_writer_sx.send(Ok(TypedChunk::FieldIdDocidFacetNumbers( docid_fid_facet_numbers_chunk.clone(), @@ -241,7 +241,7 @@ fn extract_documents_data( // send docid_fid_facet_strings_chunk to DB writer let docid_fid_facet_strings_chunk = - unsafe { into_clonable_grenad(docid_fid_facet_strings_chunk)? }; + unsafe { as_cloneable_grenad(&docid_fid_facet_strings_chunk)? }; let _ = lmdb_writer_sx.send(Ok(TypedChunk::FieldIdDocidFacetStrings( docid_fid_facet_strings_chunk.clone(), diff --git a/milli/src/update/index_documents/helpers/grenad_helpers.rs b/milli/src/update/index_documents/helpers/grenad_helpers.rs index ec4a32755..ded74b2af 100644 --- a/milli/src/update/index_documents/helpers/grenad_helpers.rs +++ b/milli/src/update/index_documents/helpers/grenad_helpers.rs @@ -68,11 +68,11 @@ pub fn writer_into_reader(writer: grenad::Writer) -> Result, +pub unsafe fn as_cloneable_grenad( + reader: &grenad::Reader, ) -> Result> { - let file = reader.into_inner(); - let mmap = memmap2::Mmap::map(&file)?; + let file = reader.get_ref(); + let mmap = memmap2::Mmap::map(file)?; let cursor = io::Cursor::new(ClonableMmap::from(mmap)); let reader = grenad::Reader::new(cursor)?; Ok(reader) diff --git a/milli/src/update/index_documents/helpers/mod.rs b/milli/src/update/index_documents/helpers/mod.rs index bbb2b9b95..22c1cfd6c 100644 --- a/milli/src/update/index_documents/helpers/mod.rs +++ b/milli/src/update/index_documents/helpers/mod.rs @@ -8,7 +8,7 @@ use std::convert::{TryFrom, TryInto}; pub use clonable_mmap::{ClonableMmap, CursorClonableMmap}; use fst::{IntoStreamer, Streamer}; pub use grenad_helpers::{ - create_sorter, create_writer, grenad_obkv_into_chunks, into_clonable_grenad, merge_readers, + as_cloneable_grenad, create_sorter, create_writer, grenad_obkv_into_chunks, merge_readers, sorter_into_lmdb_database, sorter_into_reader, write_into_lmdb_database, writer_into_reader, GrenadParameters, }; diff --git a/milli/src/update/index_documents/mod.rs b/milli/src/update/index_documents/mod.rs index a31d1875b..c69aae809 100644 --- a/milli/src/update/index_documents/mod.rs +++ b/milli/src/update/index_documents/mod.rs @@ -16,9 +16,9 @@ use slice_group_by::GroupBy; use typed_chunk::{write_typed_chunk_into_index, TypedChunk}; pub use self::helpers::{ - create_sorter, create_writer, fst_stream_into_hashset, fst_stream_into_vec, - merge_cbo_roaring_bitmaps, merge_roaring_bitmaps, sorter_into_lmdb_database, - write_into_lmdb_database, writer_into_reader, ClonableMmap, MergeFn, + as_cloneable_grenad, create_sorter, create_writer, fst_stream_into_hashset, + fst_stream_into_vec, merge_cbo_roaring_bitmaps, merge_roaring_bitmaps, + sorter_into_lmdb_database, write_into_lmdb_database, writer_into_reader, ClonableMmap, MergeFn, }; use self::helpers::{grenad_obkv_into_chunks, GrenadParameters}; pub use self::transform::{Transform, TransformOutput}; @@ -292,42 +292,18 @@ where for result in lmdb_writer_rx { let typed_chunk = match result? { TypedChunk::WordDocids(chunk) => { - // We extract and mmap our chunk file to be able to get it for next processes. - let mut file = chunk.into_inner(); - let mmap = unsafe { memmap2::Mmap::map(&file)? 
}; - let cursor_mmap = CursorClonableMmap::new(ClonableMmap::from(mmap)); - let chunk = grenad::Reader::new(cursor_mmap)?; - word_docids.push(chunk); - - // We reconstruct our typed-chunk back. - file.rewind()?; - let chunk = grenad::Reader::new(file)?; + let cloneable_chunk = unsafe { as_cloneable_grenad(&chunk)? }; + word_docids.push(cloneable_chunk); TypedChunk::WordDocids(chunk) } TypedChunk::WordPairProximityDocids(chunk) => { - // We extract and mmap our chunk file to be able to get it for next processes. - let mut file = chunk.into_inner(); - let mmap = unsafe { memmap2::Mmap::map(&file)? }; - let cursor_mmap = CursorClonableMmap::new(ClonableMmap::from(mmap)); - let chunk = grenad::Reader::new(cursor_mmap)?; - word_pair_proximity_docids.push(chunk); - - // We reconstruct our typed-chunk back. - file.rewind()?; - let chunk = grenad::Reader::new(file)?; + let cloneable_chunk = unsafe { as_cloneable_grenad(&chunk)? }; + word_pair_proximity_docids.push(cloneable_chunk); TypedChunk::WordPairProximityDocids(chunk) } TypedChunk::WordPositionDocids(chunk) => { - // We extract and mmap our chunk file to be able to get it for next processes. - let mut file = chunk.into_inner(); - let mmap = unsafe { memmap2::Mmap::map(&file)? }; - let cursor_mmap = CursorClonableMmap::new(ClonableMmap::from(mmap)); - let chunk = grenad::Reader::new(cursor_mmap)?; - word_position_docids.push(chunk); - - // We reconstruct our typed-chunk back. - file.rewind()?; - let chunk = grenad::Reader::new(file)?; + let cloneable_chunk = unsafe { as_cloneable_grenad(&chunk)? }; + word_position_docids.push(cloneable_chunk); TypedChunk::WordPositionDocids(chunk) } otherwise => otherwise, diff --git a/milli/src/update/index_documents/typed_chunk.rs b/milli/src/update/index_documents/typed_chunk.rs index 3c77de7a1..77ea31138 100644 --- a/milli/src/update/index_documents/typed_chunk.rs +++ b/milli/src/update/index_documents/typed_chunk.rs @@ -12,7 +12,7 @@ use super::helpers::{ CursorClonableMmap, }; use crate::heed_codec::facet::{decode_prefix_string, encode_prefix_string}; -use crate::update::index_documents::helpers::into_clonable_grenad; +use crate::update::index_documents::helpers::as_cloneable_grenad; use crate::{ lat_lng_to_xyz, BoRoaringBitmapCodec, CboRoaringBitmapCodec, DocumentId, GeoPoint, Index, Result, @@ -87,7 +87,7 @@ pub(crate) fn write_typed_chunk_into_index( return Ok((documents_ids, is_merged_database)) } TypedChunk::WordDocids(word_docids_iter) => { - let word_docids_iter = unsafe { into_clonable_grenad(word_docids_iter) }?; + let word_docids_iter = unsafe { as_cloneable_grenad(&word_docids_iter) }?; append_entries_into_database( word_docids_iter.clone(), &index.word_docids,
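Editor's note, not part of the patches above: the last two patches migrate to the grenad v0.4 cursor API (`into_cursor` / `move_on_next`) and to a borrow-based `as_cloneable_grenad(&reader)` helper so the original reader stays usable after a cloneable copy is derived. The following is a minimal standalone sketch of that pattern under simplified assumptions; it swaps milli's memory-mapped `ClonableMmap` backing for a plain in-memory `Vec<u8>` and uses no milli-internal helpers.

```rust
use std::io::Cursor;

// A minimal sketch of the grenad v0.4 reader pattern used in these patches:
// write entries once, read them back through a cursor, and derive a second
// reader from a shared reference to the first one's backing bytes, analogous
// to what `as_cloneable_grenad(&reader)` does with a memory map in milli.
fn main() {
    // Build a small grenad file in memory (keys must be inserted in order).
    let mut writer = grenad::Writer::builder().build(Vec::new());
    writer.insert(b"hello", b"1").unwrap();
    writer.insert(b"world", b"2").unwrap();
    let bytes = writer.into_inner().unwrap();

    // The "original" reader, comparable to a chunk received from an extractor.
    let reader = grenad::Reader::new(Cursor::new(bytes)).unwrap();

    // Derive a second reader from a shared reference to the same bytes,
    // leaving `reader` untouched and still usable afterwards.
    let copy =
        grenad::Reader::new(Cursor::new(reader.get_ref().get_ref().clone())).unwrap();

    // Both readers can now be consumed independently with the cursor API.
    let mut cursor = copy.into_cursor().unwrap();
    while let Some((key, value)) = cursor.move_on_next().unwrap() {
        println!("{:?} => {:?}", key, value);
    }
    drop(reader);
}
```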