diff --git a/milli/src/update/del_add.rs b/milli/src/update/del_add.rs index 346ae0afa..c8b7f0f6a 100644 --- a/milli/src/update/del_add.rs +++ b/milli/src/update/del_add.rs @@ -98,3 +98,7 @@ pub fn del_add_from_two_obkvs( writer.finish() } + +pub fn is_noop_del_add_obkv(del_add: KvReaderDelAdd) -> bool { + del_add.get(DelAdd::Deletion) == del_add.get(DelAdd::Addition) +} diff --git a/milli/src/update/index_documents/extract/extract_word_docids.rs b/milli/src/update/index_documents/extract/extract_word_docids.rs index 3df962585..a95162236 100644 --- a/milli/src/update/index_documents/extract/extract_word_docids.rs +++ b/milli/src/update/index_documents/extract/extract_word_docids.rs @@ -7,12 +7,13 @@ use heed::BytesDecode; use obkv::KvReaderU16; use super::helpers::{ - create_sorter, create_writer, merge_cbo_roaring_bitmaps, sorter_into_reader, + create_sorter, create_writer, merge_deladd_cbo_roaring_bitmaps, sorter_into_reader, try_split_array_at, writer_into_reader, GrenadParameters, }; use crate::error::SerializationError; use crate::heed_codec::StrBEU16Codec; use crate::index::db_name::DOCID_WORD_POSITIONS; +use crate::update::del_add::{is_noop_del_add_obkv, DelAdd, KvReaderDelAdd, KvWriterDelAdd}; use crate::update::MergeFn; use crate::{DocumentId, FieldId, Result}; @@ -39,14 +40,15 @@ pub fn extract_word_docids( let mut word_fid_docids_sorter = create_sorter( grenad::SortAlgorithm::Unstable, - merge_cbo_roaring_bitmaps, + merge_deladd_cbo_roaring_bitmaps, indexer.chunk_compression_type, indexer.chunk_compression_level, indexer.max_nb_chunks, max_memory.map(|x| x / 3), ); let mut key_buffer = Vec::new(); - let mut words = BTreeSet::new(); + let mut del_words = BTreeSet::new(); + let mut add_words = BTreeSet::new(); let mut cursor = docid_word_positions.into_cursor()?; while let Some((key, value)) = cursor.move_on_next()? { let (document_id_bytes, fid_bytes) = try_split_array_at(key) @@ -56,24 +58,37 @@ pub fn extract_word_docids( let document_id = u32::from_be_bytes(document_id_bytes); let fid = u16::from_be_bytes(fid_bytes); - for (_pos, word) in KvReaderU16::new(&value).iter() { - words.insert(word.to_vec()); + let del_add_reader = KvReaderDelAdd::new(&value); + // extract all unique words to remove. + if let Some(deletion) = del_add_reader.get(DelAdd::Deletion) { + for (_pos, word) in KvReaderU16::new(&deletion).iter() { + del_words.insert(word.to_vec()); + } + } + + // extract all unique additional words. + if let Some(addition) = del_add_reader.get(DelAdd::Addition) { + for (_pos, word) in KvReaderU16::new(&addition).iter() { + add_words.insert(word.to_vec()); + } } words_into_sorter( document_id, fid, &mut key_buffer, - &mut words, + &del_words, + &add_words, &mut word_fid_docids_sorter, )?; - words.clear(); + del_words.clear(); + add_words.clear(); } let mut word_docids_sorter = create_sorter( grenad::SortAlgorithm::Unstable, - merge_cbo_roaring_bitmaps, + merge_deladd_cbo_roaring_bitmaps, indexer.chunk_compression_type, indexer.chunk_compression_level, indexer.max_nb_chunks, @@ -82,7 +97,7 @@ pub fn extract_word_docids( let mut exact_word_docids_sorter = create_sorter( grenad::SortAlgorithm::Unstable, - merge_cbo_roaring_bitmaps, + merge_deladd_cbo_roaring_bitmaps, indexer.chunk_compression_type, indexer.chunk_compression_level, indexer.max_nb_chunks, @@ -96,8 +111,12 @@ pub fn extract_word_docids( ); let mut iter = word_fid_docids_sorter.into_stream_merger_iter()?; + // TODO: replace sorters by writers by accumulating values into a buffer before inserting them. while let Some((key, value)) = iter.next()? { - word_fid_docids_writer.insert(key, value)?; + // only keep the value if their is a change to apply in the DB. + if !is_noop_del_add_obkv(KvReaderDelAdd::new(value)) { + word_fid_docids_writer.insert(key, value)?; + } let (word, fid) = StrBEU16Codec::bytes_decode(key) .ok_or(SerializationError::Decoding { db_name: Some(DOCID_WORD_POSITIONS) })?; @@ -121,20 +140,41 @@ fn words_into_sorter( document_id: DocumentId, fid: FieldId, key_buffer: &mut Vec, - words: &mut BTreeSet>, + del_words: &BTreeSet>, + add_words: &BTreeSet>, word_fid_docids_sorter: &mut grenad::Sorter, ) -> Result<()> { puffin::profile_function!(); - for word_bytes in words.iter() { + use itertools::merge_join_by; + use itertools::EitherOrBoth::{Both, Left, Right}; + + let mut buffer = Vec::new(); + for eob in merge_join_by(del_words.iter(), add_words.iter(), |d, a| d.cmp(a)) { + buffer.clear(); + let mut value_writer = KvWriterDelAdd::new(&mut buffer); + let word_bytes = match eob { + Left(word_bytes) => { + value_writer.insert(DelAdd::Deletion, document_id.to_ne_bytes()).unwrap(); + word_bytes + } + Right(word_bytes) => { + value_writer.insert(DelAdd::Addition, document_id.to_ne_bytes()).unwrap(); + word_bytes + } + Both(word_bytes, _) => { + value_writer.insert(DelAdd::Deletion, document_id.to_ne_bytes()).unwrap(); + value_writer.insert(DelAdd::Addition, document_id.to_ne_bytes()).unwrap(); + word_bytes + } + }; + key_buffer.clear(); key_buffer.extend_from_slice(&word_bytes); key_buffer.push(0); key_buffer.extend_from_slice(&fid.to_be_bytes()); - word_fid_docids_sorter.insert(&key_buffer, document_id.to_ne_bytes())?; + word_fid_docids_sorter.insert(&key_buffer, value_writer.into_inner().unwrap())?; } - words.clear(); - Ok(()) }