From e0dc41352187948ddffded401dcd3ff73e372e3e Mon Sep 17 00:00:00 2001 From: ManyTheFish Date: Thu, 19 Oct 2023 10:22:39 +0200 Subject: [PATCH 1/4] Make script language docids map taking a tuple of roaring bitmaps expressing the deletions and the additions --- .../extract/extract_docid_word_positions.rs | 21 ++++---- .../src/update/index_documents/typed_chunk.rs | 48 ++++++++----------- 2 files changed, 29 insertions(+), 40 deletions(-) diff --git a/milli/src/update/index_documents/extract/extract_docid_word_positions.rs b/milli/src/update/index_documents/extract/extract_docid_word_positions.rs index eb6e9768f..925ca2d10 100644 --- a/milli/src/update/index_documents/extract/extract_docid_word_positions.rs +++ b/milli/src/update/index_documents/extract/extract_docid_word_positions.rs @@ -13,7 +13,7 @@ use crate::error::{InternalError, SerializationError}; use crate::update::del_add::{del_add_from_two_obkvs, DelAdd, KvReaderDelAdd}; use crate::{FieldId, Result, MAX_POSITION_PER_ATTRIBUTE, MAX_WORD_LENGTH}; -pub type ScriptLanguageDocidsMap = HashMap<(Script, Language), RoaringBitmap>; +pub type ScriptLanguageDocidsMap = HashMap<(Script, Language), (RoaringBitmap, RoaringBitmap)>; /// Extracts the word and positions where this word appear and /// prefixes it by the document id. @@ -29,8 +29,7 @@ pub fn extract_docid_word_positions( allowed_separators: Option<&[&str]>, dictionary: Option<&[&str]>, max_positions_per_attributes: Option, -) -> Result<(RoaringBitmap, grenad::Reader, (ScriptLanguageDocidsMap, ScriptLanguageDocidsMap))> -{ +) -> Result<(RoaringBitmap, grenad::Reader, ScriptLanguageDocidsMap)> { puffin::profile_function!(); let max_positions_per_attributes = max_positions_per_attributes @@ -39,8 +38,7 @@ pub fn extract_docid_word_positions( // initialize destination values. 
let mut documents_ids = RoaringBitmap::new(); - let mut del_script_language_docids = HashMap::new(); - let mut add_script_language_docids = HashMap::new(); + let mut script_language_docids = HashMap::new(); let mut docid_word_positions_sorter = create_sorter( grenad::SortAlgorithm::Stable, keep_latest_obkv, @@ -134,25 +132,24 @@ pub fn extract_docid_word_positions( // update script_language_docids deletions. for (script, languages_frequency) in del_script_language_word_count { for (language, _) in languages_frequency { - let entry = del_script_language_docids + let entry = script_language_docids .entry((script, language)) - .or_insert_with(RoaringBitmap::new); - entry.push(document_id); + .or_insert_with(|| (RoaringBitmap::new(), RoaringBitmap::new())); + entry.0.push(document_id); } } // update script_language_docids additions. for (script, languages_frequency) in add_script_language_word_count { for (language, _) in languages_frequency { - let entry = add_script_language_docids + let entry = script_language_docids .entry((script, language)) - .or_insert_with(RoaringBitmap::new); - entry.push(document_id); + .or_insert_with(|| (RoaringBitmap::new(), RoaringBitmap::new())); + entry.1.push(document_id); } } } - let script_language_docids = (del_script_language_docids, add_script_language_docids); sorter_into_reader(docid_word_positions_sorter, indexer) .map(|reader| (documents_ids, reader, script_language_docids)) } diff --git a/milli/src/update/index_documents/typed_chunk.rs b/milli/src/update/index_documents/typed_chunk.rs index 3d78e6235..c83769fb5 100644 --- a/milli/src/update/index_documents/typed_chunk.rs +++ b/milli/src/update/index_documents/typed_chunk.rs @@ -43,9 +43,7 @@ pub(crate) enum TypedChunk { FieldIdFacetIsEmptyDocids(grenad::Reader), GeoPoints(grenad::Reader), VectorPoints(grenad::Reader), - ScriptLanguageDocids( - (HashMap<(Script, Language), RoaringBitmap>, HashMap<(Script, Language), RoaringBitmap>), - ), + ScriptLanguageDocids(HashMap<(Script, 
Language), (RoaringBitmap, RoaringBitmap)>), } impl TypedChunk { @@ -103,8 +101,8 @@ impl TypedChunk { TypedChunk::VectorPoints(grenad) => { format!("VectorPoints {{ number_of_entries: {} }}", grenad.len()) } - TypedChunk::ScriptLanguageDocids((_, addition)) => { - format!("ScriptLanguageDocids {{ number_of_entries: {} }}", addition.len()) + TypedChunk::ScriptLanguageDocids(sl_map) => { + format!("ScriptLanguageDocids {{ number_of_entries: {} }}", sl_map.len()) } } } @@ -346,24 +344,25 @@ pub(crate) fn write_typed_chunk_into_index( log::debug!("There are {} entries in the HNSW so far", hnsw_length); index.put_vector_hnsw(wtxn, &new_hnsw)?; } - TypedChunk::ScriptLanguageDocids((deletion, addition)) => { - for (key, value) in deletion { - if let Some(mut db_values) = index.script_language_docids.get(wtxn, &key)? { - db_values -= value; - if db_values.is_empty() { - index.script_language_docids.delete(wtxn, &key)?; - } else { - index.script_language_docids.put(wtxn, &key, &db_values)?; - } - } - } - - for (key, value) in addition { + TypedChunk::ScriptLanguageDocids(sl_map) => { + for (key, (deletion, addition)) in sl_map { + let mut db_key_exists = false; let final_value = match index.script_language_docids.get(wtxn, &key)? { - Some(mut db_values) => db_values | value, - None => value, + Some(db_values) => { + db_key_exists = true; + (db_values - deletion) | addition + } + None => addition, }; - index.script_language_docids.put(wtxn, &key, &final_value)?; + + if final_value.is_empty() { + // If the database entry exists, delete it. 
+ if db_key_exists == true { + index.script_language_docids.delete(wtxn, &key)?; + } + } else { + index.script_language_docids.put(wtxn, &key, &final_value)?; + } } } } @@ -388,13 +387,6 @@ fn merge_word_docids_reader_into_fst( Ok(builder.into_set()) } -fn merge_roaring_bitmaps(new_value: &[u8], db_value: &[u8], buffer: &mut Vec) -> Result<()> { - let new_value = RoaringBitmap::deserialize_from(new_value)?; - let db_value = RoaringBitmap::deserialize_from(db_value)?; - let value = new_value | db_value; - Ok(serialize_roaring_bitmap(&value, buffer)?) -} - fn merge_cbo_roaring_bitmaps( new_value: &[u8], db_value: &[u8], From d50408d670513119b58866bc87d1084b958f45a8 Mon Sep 17 00:00:00 2001 From: ManyTheFish Date: Thu, 19 Oct 2023 11:58:31 +0200 Subject: [PATCH 2/4] update extract word docids --- milli/src/update/del_add.rs | 4 ++ .../extract/extract_word_docids.rs | 70 +++++++++++++++---- 2 files changed, 59 insertions(+), 15 deletions(-) diff --git a/milli/src/update/del_add.rs b/milli/src/update/del_add.rs index 346ae0afa..c8b7f0f6a 100644 --- a/milli/src/update/del_add.rs +++ b/milli/src/update/del_add.rs @@ -98,3 +98,7 @@ pub fn del_add_from_two_obkvs( writer.finish() } + +pub fn is_noop_del_add_obkv(del_add: KvReaderDelAdd) -> bool { + del_add.get(DelAdd::Deletion) == del_add.get(DelAdd::Addition) +} diff --git a/milli/src/update/index_documents/extract/extract_word_docids.rs b/milli/src/update/index_documents/extract/extract_word_docids.rs index a9821e2f5..462c8ae25 100644 --- a/milli/src/update/index_documents/extract/extract_word_docids.rs +++ b/milli/src/update/index_documents/extract/extract_word_docids.rs @@ -6,12 +6,13 @@ use heed::BytesDecode; use obkv::KvReaderU16; use super::helpers::{ - create_sorter, create_writer, merge_cbo_roaring_bitmaps, sorter_into_reader, + create_sorter, create_writer, merge_deladd_cbo_roaring_bitmaps, sorter_into_reader, try_split_array_at, writer_into_reader, GrenadParameters, }; use crate::error::SerializationError; use 
crate::heed_codec::StrBEU16Codec; use crate::index::db_name::DOCID_WORD_POSITIONS; +use crate::update::del_add::{is_noop_del_add_obkv, DelAdd, KvReaderDelAdd, KvWriterDelAdd}; use crate::update::MergeFn; use crate::{DocumentId, FieldId, Result}; @@ -34,14 +35,15 @@ pub fn extract_word_docids( let mut word_fid_docids_sorter = create_sorter( grenad::SortAlgorithm::Unstable, - merge_cbo_roaring_bitmaps, + merge_deladd_cbo_roaring_bitmaps, indexer.chunk_compression_type, indexer.chunk_compression_level, indexer.max_nb_chunks, max_memory.map(|x| x / 3), ); let mut key_buffer = Vec::new(); - let mut words = BTreeSet::new(); + let mut del_words = BTreeSet::new(); + let mut add_words = BTreeSet::new(); let mut cursor = docid_word_positions.into_cursor()?; while let Some((key, value)) = cursor.move_on_next()? { let (document_id_bytes, fid_bytes) = try_split_array_at(key) @@ -51,24 +53,37 @@ pub fn extract_word_docids( let document_id = u32::from_be_bytes(document_id_bytes); let fid = u16::from_be_bytes(fid_bytes); - for (_pos, word) in KvReaderU16::new(&value).iter() { - words.insert(word.to_vec()); + let del_add_reader = KvReaderDelAdd::new(&value); + // extract all unique words to remove. + if let Some(deletion) = del_add_reader.get(DelAdd::Deletion) { + for (_pos, word) in KvReaderU16::new(&deletion).iter() { + del_words.insert(word.to_vec()); + } + } + + // extract all unique additional words. 
+ if let Some(addition) = del_add_reader.get(DelAdd::Addition) { + for (_pos, word) in KvReaderU16::new(&addition).iter() { + add_words.insert(word.to_vec()); + } } words_into_sorter( document_id, fid, &mut key_buffer, - &mut words, + &del_words, + &add_words, &mut word_fid_docids_sorter, )?; - words.clear(); + del_words.clear(); + add_words.clear(); } let mut word_docids_sorter = create_sorter( grenad::SortAlgorithm::Unstable, - merge_cbo_roaring_bitmaps, + merge_deladd_cbo_roaring_bitmaps, indexer.chunk_compression_type, indexer.chunk_compression_level, indexer.max_nb_chunks, @@ -77,7 +92,7 @@ pub fn extract_word_docids( let mut exact_word_docids_sorter = create_sorter( grenad::SortAlgorithm::Unstable, - merge_cbo_roaring_bitmaps, + merge_deladd_cbo_roaring_bitmaps, indexer.chunk_compression_type, indexer.chunk_compression_level, indexer.max_nb_chunks, @@ -91,8 +106,12 @@ pub fn extract_word_docids( ); let mut iter = word_fid_docids_sorter.into_stream_merger_iter()?; + // TODO: replace sorters by writers by accumulating values into a buffer before inserting them. while let Some((key, value)) = iter.next()? { - word_fid_docids_writer.insert(key, value)?; + // only keep the value if there is a change to apply in the DB. 
+ if !is_noop_del_add_obkv(KvReaderDelAdd::new(value)) { + word_fid_docids_writer.insert(key, value)?; + } let (word, fid) = StrBEU16Codec::bytes_decode(key) .ok_or(SerializationError::Decoding { db_name: Some(DOCID_WORD_POSITIONS) })?; @@ -116,20 +135,41 @@ fn words_into_sorter( document_id: DocumentId, fid: FieldId, key_buffer: &mut Vec, - words: &mut BTreeSet>, + del_words: &BTreeSet>, + add_words: &BTreeSet>, word_fid_docids_sorter: &mut grenad::Sorter, ) -> Result<()> { puffin::profile_function!(); - for word_bytes in words.iter() { + use itertools::merge_join_by; + use itertools::EitherOrBoth::{Both, Left, Right}; + + let mut buffer = Vec::new(); + for eob in merge_join_by(del_words.iter(), add_words.iter(), |d, a| d.cmp(a)) { + buffer.clear(); + let mut value_writer = KvWriterDelAdd::new(&mut buffer); + let word_bytes = match eob { + Left(word_bytes) => { + value_writer.insert(DelAdd::Deletion, document_id.to_ne_bytes()).unwrap(); + word_bytes + } + Right(word_bytes) => { + value_writer.insert(DelAdd::Addition, document_id.to_ne_bytes()).unwrap(); + word_bytes + } + Both(word_bytes, _) => { + value_writer.insert(DelAdd::Deletion, document_id.to_ne_bytes()).unwrap(); + value_writer.insert(DelAdd::Addition, document_id.to_ne_bytes()).unwrap(); + word_bytes + } + }; + key_buffer.clear(); key_buffer.extend_from_slice(&word_bytes); key_buffer.push(0); key_buffer.extend_from_slice(&fid.to_be_bytes()); - word_fid_docids_sorter.insert(&key_buffer, document_id.to_ne_bytes())?; + word_fid_docids_sorter.insert(&key_buffer, value_writer.into_inner().unwrap())?; } - words.clear(); - Ok(()) } From c63ff5298b9c279b8a9559d21ca586a23085e0e4 Mon Sep 17 00:00:00 2001 From: ManyTheFish Date: Thu, 19 Oct 2023 13:27:07 +0200 Subject: [PATCH 3/4] update extract word position docids --- .../extract/extract_word_position_docids.rs | 105 ++++++++++++++---- 1 file changed, 82 insertions(+), 23 deletions(-) diff --git 
a/milli/src/update/index_documents/extract/extract_word_position_docids.rs b/milli/src/update/index_documents/extract/extract_word_position_docids.rs index 7e336a150..1ac29e63c 100644 --- a/milli/src/update/index_documents/extract/extract_word_position_docids.rs +++ b/milli/src/update/index_documents/extract/extract_word_position_docids.rs @@ -1,15 +1,17 @@ -use std::collections::HashSet; +use std::collections::BTreeSet; use std::fs::File; use std::io; use obkv::KvReaderU16; use super::helpers::{ - create_sorter, merge_cbo_roaring_bitmaps, sorter_into_reader, try_split_array_at, + create_sorter, merge_deladd_cbo_roaring_bitmaps, sorter_into_reader, try_split_array_at, GrenadParameters, }; use crate::error::SerializationError; use crate::index::db_name::DOCID_WORD_POSITIONS; +use crate::update::del_add::{DelAdd, KvReaderDelAdd, KvWriterDelAdd}; +use crate::update::MergeFn; use crate::{bucketed_position, DocumentId, Result}; /// Extracts the word positions and the documents ids where this word appear. 
@@ -27,14 +29,15 @@ pub fn extract_word_position_docids( let mut word_position_docids_sorter = create_sorter( grenad::SortAlgorithm::Unstable, - merge_cbo_roaring_bitmaps, + merge_deladd_cbo_roaring_bitmaps, indexer.chunk_compression_type, indexer.chunk_compression_level, indexer.max_nb_chunks, max_memory, ); - let mut word_positions: HashSet<(u16, Vec)> = HashSet::new(); + let mut del_word_positions: BTreeSet<(u16, Vec)> = BTreeSet::new(); + let mut add_word_positions: BTreeSet<(u16, Vec)> = BTreeSet::new(); let mut current_document_id: Option = None; let mut key_buffer = Vec::new(); let mut cursor = docid_word_positions.into_cursor()?; @@ -44,36 +47,92 @@ pub fn extract_word_position_docids( let document_id = DocumentId::from_be_bytes(document_id_bytes); if current_document_id.map_or(false, |id| document_id != id) { - for (position, word_bytes) in word_positions.iter() { - key_buffer.clear(); - key_buffer.extend_from_slice(word_bytes); - key_buffer.push(0); - key_buffer.extend_from_slice(&position.to_be_bytes()); - word_position_docids_sorter - .insert(&key_buffer, current_document_id.unwrap().to_ne_bytes())?; - } - word_positions.clear(); + words_position_into_sorter( + current_document_id.unwrap(), + &mut key_buffer, + &del_word_positions, + &add_word_positions, + &mut word_position_docids_sorter, + )?; + del_word_positions.clear(); + add_word_positions.clear(); } current_document_id = Some(document_id); - for (position, word_bytes) in KvReaderU16::new(&value).iter() { - let position = bucketed_position(position); - word_positions.insert((position, word_bytes.to_vec())); + let del_add_reader = KvReaderDelAdd::new(&value); + // extract all unique words to remove. + if let Some(deletion) = del_add_reader.get(DelAdd::Deletion) { + for (position, word_bytes) in KvReaderU16::new(deletion).iter() { + let position = bucketed_position(position); + del_word_positions.insert((position, word_bytes.to_vec())); + } + } + + // extract all unique additional words. 
+ if let Some(addition) = del_add_reader.get(DelAdd::Addition) { + for (position, word_bytes) in KvReaderU16::new(addition).iter() { + let position = bucketed_position(position); + add_word_positions.insert((position, word_bytes.to_vec())); + } } } if let Some(document_id) = current_document_id { - for (position, word_bytes) in word_positions { - key_buffer.clear(); - key_buffer.extend_from_slice(&word_bytes); - key_buffer.push(0); - key_buffer.extend_from_slice(&position.to_be_bytes()); - word_position_docids_sorter.insert(&key_buffer, document_id.to_ne_bytes())?; - } + words_position_into_sorter( + document_id, + &mut key_buffer, + &del_word_positions, + &add_word_positions, + &mut word_position_docids_sorter, + )?; } + // TODO remove noop DelAdd OBKV let word_position_docids_reader = sorter_into_reader(word_position_docids_sorter, indexer)?; Ok(word_position_docids_reader) } + +fn words_position_into_sorter( + document_id: DocumentId, + key_buffer: &mut Vec, + del_word_positions: &BTreeSet<(u16, Vec)>, + add_word_positions: &BTreeSet<(u16, Vec)>, + word_position_docids_sorter: &mut grenad::Sorter, +) -> Result<()> { + puffin::profile_function!(); + + use itertools::merge_join_by; + use itertools::EitherOrBoth::{Both, Left, Right}; + + let mut buffer = Vec::new(); + for eob in merge_join_by(del_word_positions.iter(), add_word_positions.iter(), |d, a| d.cmp(a)) + { + buffer.clear(); + let mut value_writer = KvWriterDelAdd::new(&mut buffer); + let (position, word_bytes) = match eob { + Left(key) => { + value_writer.insert(DelAdd::Deletion, document_id.to_ne_bytes()).unwrap(); + key + } + Right(key) => { + value_writer.insert(DelAdd::Addition, document_id.to_ne_bytes()).unwrap(); + key + } + Both(key, _) => { + value_writer.insert(DelAdd::Deletion, document_id.to_ne_bytes()).unwrap(); + value_writer.insert(DelAdd::Addition, document_id.to_ne_bytes()).unwrap(); + key + } + }; + + key_buffer.clear(); + key_buffer.extend_from_slice(word_bytes); + key_buffer.push(0); + 
key_buffer.extend_from_slice(&position.to_be_bytes()); + word_position_docids_sorter.insert(&key_buffer, value_writer.into_inner().unwrap())?; + } + + Ok(()) +} From b8fed737ef20517839bc343fb40123d3eaa38baa Mon Sep 17 00:00:00 2001 From: ManyTheFish Date: Thu, 19 Oct 2023 14:18:14 +0200 Subject: [PATCH 4/4] update extract word pair proximity to support deladd obkvs --- .../extract_word_pair_proximity_docids.rs | 147 +++++++++++++----- 1 file changed, 109 insertions(+), 38 deletions(-) diff --git a/milli/src/update/index_documents/extract/extract_word_pair_proximity_docids.rs b/milli/src/update/index_documents/extract/extract_word_pair_proximity_docids.rs index 9a9c33fc0..80f8a30f8 100644 --- a/milli/src/update/index_documents/extract/extract_word_pair_proximity_docids.rs +++ b/milli/src/update/index_documents/extract/extract_word_pair_proximity_docids.rs @@ -1,16 +1,17 @@ -use std::collections::{HashMap, VecDeque}; +use std::collections::{BTreeMap, VecDeque}; use std::fs::File; use std::{cmp, io}; use obkv::KvReaderU16; use super::helpers::{ - create_sorter, create_writer, merge_cbo_roaring_bitmaps, sorter_into_reader, - try_split_array_at, writer_into_reader, GrenadParameters, MergeFn, + create_sorter, create_writer, merge_deladd_cbo_roaring_bitmaps, try_split_array_at, + writer_into_reader, GrenadParameters, MergeFn, }; use crate::error::SerializationError; use crate::index::db_name::DOCID_WORD_POSITIONS; use crate::proximity::{index_proximity, MAX_DISTANCE}; +use crate::update::del_add::{DelAdd, KvReaderDelAdd, KvWriterDelAdd}; use crate::{DocumentId, Result}; /// Extracts the best proximity between pairs of words and the documents ids where this pair appear. 
@@ -31,7 +32,7 @@ pub fn extract_word_pair_proximity_docids( .map(|_| { create_sorter( grenad::SortAlgorithm::Unstable, - merge_cbo_roaring_bitmaps, + merge_deladd_cbo_roaring_bitmaps, indexer.chunk_compression_type, indexer.chunk_compression_level, indexer.max_nb_chunks, @@ -40,9 +41,12 @@ pub fn extract_word_pair_proximity_docids( }) .collect(); - let mut word_positions: VecDeque<(String, u16)> = + let mut del_word_positions: VecDeque<(String, u16)> = VecDeque::with_capacity(MAX_DISTANCE as usize); - let mut word_pair_proximity = HashMap::new(); + let mut add_word_positions: VecDeque<(String, u16)> = + VecDeque::with_capacity(MAX_DISTANCE as usize); + let mut del_word_pair_proximity = BTreeMap::new(); + let mut add_word_pair_proximity = BTreeMap::new(); let mut current_document_id = None; let mut cursor = docid_word_positions.into_cursor()?; @@ -54,50 +58,90 @@ pub fn extract_word_pair_proximity_docids( // if we change document, we fill the sorter if current_document_id.map_or(false, |id| id != document_id) { puffin::profile_scope!("Document into sorter"); - while !word_positions.is_empty() { - word_positions_into_word_pair_proximity( - &mut word_positions, - &mut word_pair_proximity, - )?; - } document_word_positions_into_sorter( current_document_id.unwrap(), - &word_pair_proximity, + &del_word_pair_proximity, + &add_word_pair_proximity, &mut word_pair_proximity_docids_sorters, )?; - word_pair_proximity.clear(); - word_positions.clear(); + del_word_pair_proximity.clear(); + add_word_pair_proximity.clear(); } current_document_id = Some(document_id); - for (position, word) in KvReaderU16::new(&value).iter() { - // drain the proximity window until the head word is considered close to the word we are inserting. 
- while word_positions.get(0).map_or(false, |(_w, p)| { - index_proximity(*p as u32, position as u32) >= MAX_DISTANCE - }) { - word_positions_into_word_pair_proximity( - &mut word_positions, - &mut word_pair_proximity, - )?; - } + let (del, add): (Result<_>, Result<_>) = rayon::join( + || { + // deletions + if let Some(deletion) = KvReaderDelAdd::new(&value).get(DelAdd::Deletion) { + for (position, word) in KvReaderU16::new(deletion).iter() { + // drain the proximity window until the head word is considered close to the word we are inserting. + while del_word_positions.get(0).map_or(false, |(_w, p)| { + index_proximity(*p as u32, position as u32) >= MAX_DISTANCE + }) { + word_positions_into_word_pair_proximity( + &mut del_word_positions, + &mut del_word_pair_proximity, + )?; + } - // insert the new word. - let word = std::str::from_utf8(word)?; - word_positions.push_back((word.to_string(), position)); - } + // insert the new word. + let word = std::str::from_utf8(word)?; + del_word_positions.push_back((word.to_string(), position)); + } + + while !del_word_positions.is_empty() { + word_positions_into_word_pair_proximity( + &mut del_word_positions, + &mut del_word_pair_proximity, + )?; + } + } + + Ok(()) + }, + || { + // additions + if let Some(addition) = KvReaderDelAdd::new(&value).get(DelAdd::Addition) { + for (position, word) in KvReaderU16::new(addition).iter() { + // drain the proximity window until the head word is considered close to the word we are inserting. + while add_word_positions.get(0).map_or(false, |(_w, p)| { + index_proximity(*p as u32, position as u32) >= MAX_DISTANCE + }) { + word_positions_into_word_pair_proximity( + &mut add_word_positions, + &mut add_word_pair_proximity, + )?; + } + + // insert the new word. 
+ let word = std::str::from_utf8(word)?; + add_word_positions.push_back((word.to_string(), position)); + } + + while !add_word_positions.is_empty() { + word_positions_into_word_pair_proximity( + &mut add_word_positions, + &mut add_word_pair_proximity, + )?; + } + } + + Ok(()) + }, + ); + + del?; + add?; } if let Some(document_id) = current_document_id { puffin::profile_scope!("Final document into sorter"); - while !word_positions.is_empty() { - word_positions_into_word_pair_proximity(&mut word_positions, &mut word_pair_proximity)?; - } - document_word_positions_into_sorter( document_id, - &word_pair_proximity, + &del_word_pair_proximity, + &add_word_pair_proximity, &mut word_pair_proximity_docids_sorters, )?; } @@ -123,11 +167,38 @@ pub fn extract_word_pair_proximity_docids( /// close to each other. fn document_word_positions_into_sorter( document_id: DocumentId, - word_pair_proximity: &HashMap<(String, String), u8>, + del_word_pair_proximity: &BTreeMap<(String, String), u8>, + add_word_pair_proximity: &BTreeMap<(String, String), u8>, word_pair_proximity_docids_sorters: &mut Vec>, ) -> Result<()> { + use itertools::merge_join_by; + use itertools::EitherOrBoth::{Both, Left, Right}; + + let mut buffer = Vec::new(); let mut key_buffer = Vec::new(); - for ((w1, w2), prox) in word_pair_proximity { + for eob in + merge_join_by(del_word_pair_proximity.iter(), add_word_pair_proximity.iter(), |d, a| { + d.cmp(a) + }) + { + buffer.clear(); + let mut value_writer = KvWriterDelAdd::new(&mut buffer); + let ((w1, w2), prox) = match eob { + Left(key_value) => { + value_writer.insert(DelAdd::Deletion, document_id.to_ne_bytes()).unwrap(); + key_value + } + Right(key_value) => { + value_writer.insert(DelAdd::Addition, document_id.to_ne_bytes()).unwrap(); + key_value + } + Both(key_value, _) => { + value_writer.insert(DelAdd::Deletion, document_id.to_ne_bytes()).unwrap(); + value_writer.insert(DelAdd::Addition, document_id.to_ne_bytes()).unwrap(); + key_value + } + }; + 
key_buffer.clear(); key_buffer.push(*prox as u8); key_buffer.extend_from_slice(w1.as_bytes()); @@ -135,7 +206,7 @@ fn document_word_positions_into_sorter( key_buffer.extend_from_slice(w2.as_bytes()); word_pair_proximity_docids_sorters[*prox as usize - 1] - .insert(&key_buffer, document_id.to_ne_bytes())?; + .insert(&key_buffer, value_writer.into_inner().unwrap())?; } Ok(()) @@ -143,7 +214,7 @@ fn document_word_positions_into_sorter( fn word_positions_into_word_pair_proximity( word_positions: &mut VecDeque<(String, u16)>, - word_pair_proximity: &mut HashMap<(String, String), u8>, + word_pair_proximity: &mut BTreeMap<(String, String), u8>, ) -> Result<()> { let (head_word, head_position) = word_positions.pop_front().unwrap(); for (word, position) in word_positions.iter() {