From 822f67e9ad1f17b8921c11aefd3d715170c96b70 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Tue, 18 Jan 2022 14:59:51 +0100 Subject: [PATCH] Bring the newly created word pair proximity docids --- milli/src/update/index_documents/mod.rs | 37 ++++++++++++++++--- .../word_prefix_pair_proximity_docids.rs | 9 ++++- 2 files changed, 38 insertions(+), 8 deletions(-) diff --git a/milli/src/update/index_documents/mod.rs b/milli/src/update/index_documents/mod.rs index cdea37d54..0ef05dba5 100644 --- a/milli/src/update/index_documents/mod.rs +++ b/milli/src/update/index_documents/mod.rs @@ -16,11 +16,12 @@ use typed_chunk::{write_typed_chunk_into_index, TypedChunk}; pub use self::helpers::{ create_sorter, create_writer, merge_cbo_roaring_bitmaps, merge_roaring_bitmaps, - sorter_into_lmdb_database, write_into_lmdb_database, writer_into_reader, MergeFn, + sorter_into_lmdb_database, write_into_lmdb_database, writer_into_reader, ClonableMmap, MergeFn, }; use self::helpers::{grenad_obkv_into_chunks, GrenadParameters}; pub use self::transform::{Transform, TransformOutput}; use crate::documents::DocumentBatchReader; +pub use crate::update::index_documents::helpers::CursorClonableMmap; use crate::update::{ self, Facets, IndexerConfig, UpdateIndexingStep, WordPrefixDocids, WordPrefixPairProximityDocids, WordPrefixPositionDocids, WordsPrefixesFst, @@ -282,6 +283,7 @@ where let index_documents_ids = self.index.documents_ids(self.wtxn)?; let index_is_empty = index_documents_ids.len() == 0; let mut final_documents_ids = RoaringBitmap::new(); + let mut word_pair_proximity_docids = Vec::new(); let mut databases_seen = 0; (self.progress)(UpdateIndexingStep::MergeDataIntoFinalDatabase { @@ -289,9 +291,26 @@ where total_databases: TOTAL_POSTING_DATABASE_COUNT, }); - for typed_chunk in lmdb_writer_rx { + for result in lmdb_writer_rx { + let typed_chunk = match result? { + TypedChunk::WordPairProximityDocids(chunk) => { + // We extract and mmap our chunk file to be able to get it for next processes. + let mut file = chunk.into_inner(); + let mmap = unsafe { memmap2::Mmap::map(&file)? }; + let cursor_mmap = CursorClonableMmap::new(ClonableMmap::from(mmap)); + let chunk = grenad::Reader::new(cursor_mmap)?; + word_pair_proximity_docids.push(chunk); + + // We reconstruct our typed-chunk back. + file.rewind()?; + let chunk = grenad::Reader::new(file)?; + TypedChunk::WordPairProximityDocids(chunk) + } + otherwise => otherwise, + }; + let (docids, is_merged_database) = - write_typed_chunk_into_index(typed_chunk?, &self.index, self.wtxn, index_is_empty)?; + write_typed_chunk_into_index(typed_chunk, &self.index, self.wtxn, index_is_empty)?; if !docids.is_empty() { final_documents_ids |= docids; let documents_seen_count = final_documents_ids.len(); @@ -325,13 +344,19 @@ where let all_documents_ids = index_documents_ids | new_documents_ids | replaced_documents_ids; self.index.put_documents_ids(self.wtxn, &all_documents_ids)?; - self.execute_prefix_databases()?; + self.execute_prefix_databases(word_pair_proximity_docids)?; Ok(all_documents_ids.len()) } #[logging_timer::time("IndexDocuments::{}")] - pub fn execute_prefix_databases(self) -> Result<()> { + pub fn execute_prefix_databases( + self, + word_pair_proximity_docids: Vec>, + ) -> Result<()> + where + F: Fn(UpdateIndexingStep) + Sync, + { // Merged databases are already been indexed, we start from this count; let mut databases_seen = MERGED_DATABASE_COUNT; @@ -392,7 +417,7 @@ where builder.chunk_compression_level = self.indexer_config.chunk_compression_level; builder.max_nb_chunks = self.indexer_config.max_nb_chunks; builder.max_memory = self.indexer_config.max_memory; - builder.execute(&previous_words_prefixes_fst)?; + builder.execute(word_pair_proximity_docids, &previous_words_prefixes_fst)?; databases_seen += 1; (self.progress)(UpdateIndexingStep::MergeDataIntoFinalDatabase { diff --git a/milli/src/update/word_prefix_pair_proximity_docids.rs b/milli/src/update/word_prefix_pair_proximity_docids.rs index 2788d5d35..6a49a0a63 100644 --- a/milli/src/update/word_prefix_pair_proximity_docids.rs +++ b/milli/src/update/word_prefix_pair_proximity_docids.rs @@ -7,7 +7,8 @@ use log::debug; use slice_group_by::GroupBy; use crate::update::index_documents::{ - create_sorter, merge_cbo_roaring_bitmaps, sorter_into_lmdb_database, MergeFn, WriteMethod, + create_sorter, merge_cbo_roaring_bitmaps, sorter_into_lmdb_database, CursorClonableMmap, + MergeFn, WriteMethod, }; use crate::{Index, Result}; @@ -61,7 +62,11 @@ impl<'t, 'u, 'i> WordPrefixPairProximityDocids<'t, 'u, 'i> { } #[logging_timer::time("WordPrefixPairProximityDocids::{}")] - pub fn execute>(self, old_prefix_fst: &fst::Set) -> Result<()> { + pub fn execute>( + self, + new_word_pair_proximity_docids: Vec>, + old_prefix_fst: &fst::Set, + ) -> Result<()> { debug!("Computing and writing the word prefix pair proximity docids into LMDB on disk..."); self.index.word_prefix_pair_proximity_docids.clear(self.wtxn)?;