diff --git a/milli/src/update/index_documents/extract/extract_word_pair_proximity_docids.rs b/milli/src/update/index_documents/extract/extract_word_pair_proximity_docids.rs index 9a9c33fc0..80f8a30f8 100644 --- a/milli/src/update/index_documents/extract/extract_word_pair_proximity_docids.rs +++ b/milli/src/update/index_documents/extract/extract_word_pair_proximity_docids.rs @@ -1,16 +1,17 @@ -use std::collections::{HashMap, VecDeque}; +use std::collections::{BTreeMap, VecDeque}; use std::fs::File; use std::{cmp, io}; use obkv::KvReaderU16; use super::helpers::{ - create_sorter, create_writer, merge_cbo_roaring_bitmaps, sorter_into_reader, - try_split_array_at, writer_into_reader, GrenadParameters, MergeFn, + create_sorter, create_writer, merge_deladd_cbo_roaring_bitmaps, try_split_array_at, + writer_into_reader, GrenadParameters, MergeFn, }; use crate::error::SerializationError; use crate::index::db_name::DOCID_WORD_POSITIONS; use crate::proximity::{index_proximity, MAX_DISTANCE}; +use crate::update::del_add::{DelAdd, KvReaderDelAdd, KvWriterDelAdd}; use crate::{DocumentId, Result}; /// Extracts the best proximity between pairs of words and the documents ids where this pair appear. @@ -31,7 +32,7 @@ pub fn extract_word_pair_proximity_docids( .map(|_| { create_sorter( grenad::SortAlgorithm::Unstable, - merge_cbo_roaring_bitmaps, + merge_deladd_cbo_roaring_bitmaps, indexer.chunk_compression_type, indexer.chunk_compression_level, indexer.max_nb_chunks, @@ -40,9 +41,12 @@ pub fn extract_word_pair_proximity_docids( }) .collect(); - let mut word_positions: VecDeque<(String, u16)> = + let mut del_word_positions: VecDeque<(String, u16)> = VecDeque::with_capacity(MAX_DISTANCE as usize); - let mut word_pair_proximity = HashMap::new(); + let mut add_word_positions: VecDeque<(String, u16)> = + VecDeque::with_capacity(MAX_DISTANCE as usize); + let mut del_word_pair_proximity = BTreeMap::new(); + let mut add_word_pair_proximity = BTreeMap::new(); let mut current_document_id = None; let mut cursor = docid_word_positions.into_cursor()?; @@ -54,50 +58,90 @@ pub fn extract_word_pair_proximity_docids( // if we change document, we fill the sorter if current_document_id.map_or(false, |id| id != document_id) { puffin::profile_scope!("Document into sorter"); - while !word_positions.is_empty() { - word_positions_into_word_pair_proximity( - &mut word_positions, - &mut word_pair_proximity, - )?; - } document_word_positions_into_sorter( current_document_id.unwrap(), - &word_pair_proximity, + &del_word_pair_proximity, + &add_word_pair_proximity, &mut word_pair_proximity_docids_sorters, )?; - word_pair_proximity.clear(); - word_positions.clear(); + del_word_pair_proximity.clear(); + add_word_pair_proximity.clear(); } current_document_id = Some(document_id); - for (position, word) in KvReaderU16::new(&value).iter() { - // drain the proximity window until the head word is considered close to the word we are inserting. - while word_positions.get(0).map_or(false, |(_w, p)| { - index_proximity(*p as u32, position as u32) >= MAX_DISTANCE - }) { - word_positions_into_word_pair_proximity( - &mut word_positions, - &mut word_pair_proximity, - )?; - } + let (del, add): (Result<_>, Result<_>) = rayon::join( + || { + // deletions + if let Some(deletion) = KvReaderDelAdd::new(&value).get(DelAdd::Deletion) { + for (position, word) in KvReaderU16::new(deletion).iter() { + // drain the proximity window until the head word is considered close to the word we are inserting. + while del_word_positions.get(0).map_or(false, |(_w, p)| { + index_proximity(*p as u32, position as u32) >= MAX_DISTANCE + }) { + word_positions_into_word_pair_proximity( + &mut del_word_positions, + &mut del_word_pair_proximity, + )?; + } - // insert the new word. - let word = std::str::from_utf8(word)?; - word_positions.push_back((word.to_string(), position)); - } + // insert the new word. + let word = std::str::from_utf8(word)?; + del_word_positions.push_back((word.to_string(), position)); + } + + while !del_word_positions.is_empty() { + word_positions_into_word_pair_proximity( + &mut del_word_positions, + &mut del_word_pair_proximity, + )?; + } + } + + Ok(()) + }, + || { + // additions + if let Some(addition) = KvReaderDelAdd::new(&value).get(DelAdd::Addition) { + for (position, word) in KvReaderU16::new(addition).iter() { + // drain the proximity window until the head word is considered close to the word we are inserting. + while add_word_positions.get(0).map_or(false, |(_w, p)| { + index_proximity(*p as u32, position as u32) >= MAX_DISTANCE + }) { + word_positions_into_word_pair_proximity( + &mut add_word_positions, + &mut add_word_pair_proximity, + )?; + } + + // insert the new word. + let word = std::str::from_utf8(word)?; + add_word_positions.push_back((word.to_string(), position)); + } + + while !add_word_positions.is_empty() { + word_positions_into_word_pair_proximity( + &mut add_word_positions, + &mut add_word_pair_proximity, + )?; + } + } + + Ok(()) + }, + ); + + del?; + add?; } if let Some(document_id) = current_document_id { puffin::profile_scope!("Final document into sorter"); - while !word_positions.is_empty() { - word_positions_into_word_pair_proximity(&mut word_positions, &mut word_pair_proximity)?; - } - document_word_positions_into_sorter( document_id, - &word_pair_proximity, + &del_word_pair_proximity, + &add_word_pair_proximity, &mut word_pair_proximity_docids_sorters, )?; } @@ -123,11 +167,38 @@ pub fn extract_word_pair_proximity_docids( /// close to each other. fn document_word_positions_into_sorter( document_id: DocumentId, - word_pair_proximity: &HashMap<(String, String), u8>, + del_word_pair_proximity: &BTreeMap<(String, String), u8>, + add_word_pair_proximity: &BTreeMap<(String, String), u8>, word_pair_proximity_docids_sorters: &mut Vec>, ) -> Result<()> { + use itertools::merge_join_by; + use itertools::EitherOrBoth::{Both, Left, Right}; + + let mut buffer = Vec::new(); let mut key_buffer = Vec::new(); - for ((w1, w2), prox) in word_pair_proximity { + for eob in + merge_join_by(del_word_pair_proximity.iter(), add_word_pair_proximity.iter(), |d, a| { + d.cmp(a) + }) + { + buffer.clear(); + let mut value_writer = KvWriterDelAdd::new(&mut buffer); + let ((w1, w2), prox) = match eob { + Left(key_value) => { + value_writer.insert(DelAdd::Deletion, document_id.to_ne_bytes()).unwrap(); + key_value + } + Right(key_value) => { + value_writer.insert(DelAdd::Addition, document_id.to_ne_bytes()).unwrap(); + key_value + } + Both(key_value, _) => { + value_writer.insert(DelAdd::Deletion, document_id.to_ne_bytes()).unwrap(); + value_writer.insert(DelAdd::Addition, document_id.to_ne_bytes()).unwrap(); + key_value + } + }; + key_buffer.clear(); key_buffer.push(*prox as u8); key_buffer.extend_from_slice(w1.as_bytes()); @@ -135,7 +206,7 @@ fn document_word_positions_into_sorter( key_buffer.extend_from_slice(w2.as_bytes()); word_pair_proximity_docids_sorters[*prox as usize - 1] - .insert(&key_buffer, document_id.to_ne_bytes())?; + .insert(&key_buffer, value_writer.into_inner().unwrap())?; } Ok(()) @@ -143,7 +214,7 @@ fn document_word_positions_into_sorter( fn word_positions_into_word_pair_proximity( word_positions: &mut VecDeque<(String, u16)>, - word_pair_proximity: &mut HashMap<(String, String), u8>, + word_pair_proximity: &mut BTreeMap<(String, String), u8>, ) -> Result<()> { let (head_word, head_position) = word_positions.pop_front().unwrap(); for (word, position) in word_positions.iter() {