update extract word docids

This commit is contained in:
ManyTheFish 2023-10-19 11:58:31 +02:00
parent e0dc413521
commit d50408d670
2 changed files with 59 additions and 15 deletions

View File

@ -98,3 +98,7 @@ pub fn del_add_from_two_obkvs<K: obkv::Key + PartialOrd + Ord>(
writer.finish() writer.finish()
} }
pub fn is_noop_del_add_obkv(del_add: KvReaderDelAdd) -> bool {
del_add.get(DelAdd::Deletion) == del_add.get(DelAdd::Addition)
}

View File

@ -6,12 +6,13 @@ use heed::BytesDecode;
use obkv::KvReaderU16; use obkv::KvReaderU16;
use super::helpers::{ use super::helpers::{
create_sorter, create_writer, merge_cbo_roaring_bitmaps, sorter_into_reader, create_sorter, create_writer, merge_deladd_cbo_roaring_bitmaps, sorter_into_reader,
try_split_array_at, writer_into_reader, GrenadParameters, try_split_array_at, writer_into_reader, GrenadParameters,
}; };
use crate::error::SerializationError; use crate::error::SerializationError;
use crate::heed_codec::StrBEU16Codec; use crate::heed_codec::StrBEU16Codec;
use crate::index::db_name::DOCID_WORD_POSITIONS; use crate::index::db_name::DOCID_WORD_POSITIONS;
use crate::update::del_add::{is_noop_del_add_obkv, DelAdd, KvReaderDelAdd, KvWriterDelAdd};
use crate::update::MergeFn; use crate::update::MergeFn;
use crate::{DocumentId, FieldId, Result}; use crate::{DocumentId, FieldId, Result};
@ -34,14 +35,15 @@ pub fn extract_word_docids<R: io::Read + io::Seek>(
let mut word_fid_docids_sorter = create_sorter( let mut word_fid_docids_sorter = create_sorter(
grenad::SortAlgorithm::Unstable, grenad::SortAlgorithm::Unstable,
merge_cbo_roaring_bitmaps, merge_deladd_cbo_roaring_bitmaps,
indexer.chunk_compression_type, indexer.chunk_compression_type,
indexer.chunk_compression_level, indexer.chunk_compression_level,
indexer.max_nb_chunks, indexer.max_nb_chunks,
max_memory.map(|x| x / 3), max_memory.map(|x| x / 3),
); );
let mut key_buffer = Vec::new(); let mut key_buffer = Vec::new();
let mut words = BTreeSet::new(); let mut del_words = BTreeSet::new();
let mut add_words = BTreeSet::new();
let mut cursor = docid_word_positions.into_cursor()?; let mut cursor = docid_word_positions.into_cursor()?;
while let Some((key, value)) = cursor.move_on_next()? { while let Some((key, value)) = cursor.move_on_next()? {
let (document_id_bytes, fid_bytes) = try_split_array_at(key) let (document_id_bytes, fid_bytes) = try_split_array_at(key)
@ -51,24 +53,37 @@ pub fn extract_word_docids<R: io::Read + io::Seek>(
let document_id = u32::from_be_bytes(document_id_bytes); let document_id = u32::from_be_bytes(document_id_bytes);
let fid = u16::from_be_bytes(fid_bytes); let fid = u16::from_be_bytes(fid_bytes);
for (_pos, word) in KvReaderU16::new(&value).iter() { let del_add_reader = KvReaderDelAdd::new(&value);
words.insert(word.to_vec()); // extract all unique words to remove.
if let Some(deletion) = del_add_reader.get(DelAdd::Deletion) {
for (_pos, word) in KvReaderU16::new(&deletion).iter() {
del_words.insert(word.to_vec());
}
}
// extract all unique additional words.
if let Some(addition) = del_add_reader.get(DelAdd::Addition) {
for (_pos, word) in KvReaderU16::new(&addition).iter() {
add_words.insert(word.to_vec());
}
} }
words_into_sorter( words_into_sorter(
document_id, document_id,
fid, fid,
&mut key_buffer, &mut key_buffer,
&mut words, &del_words,
&add_words,
&mut word_fid_docids_sorter, &mut word_fid_docids_sorter,
)?; )?;
words.clear(); del_words.clear();
add_words.clear();
} }
let mut word_docids_sorter = create_sorter( let mut word_docids_sorter = create_sorter(
grenad::SortAlgorithm::Unstable, grenad::SortAlgorithm::Unstable,
merge_cbo_roaring_bitmaps, merge_deladd_cbo_roaring_bitmaps,
indexer.chunk_compression_type, indexer.chunk_compression_type,
indexer.chunk_compression_level, indexer.chunk_compression_level,
indexer.max_nb_chunks, indexer.max_nb_chunks,
@ -77,7 +92,7 @@ pub fn extract_word_docids<R: io::Read + io::Seek>(
let mut exact_word_docids_sorter = create_sorter( let mut exact_word_docids_sorter = create_sorter(
grenad::SortAlgorithm::Unstable, grenad::SortAlgorithm::Unstable,
merge_cbo_roaring_bitmaps, merge_deladd_cbo_roaring_bitmaps,
indexer.chunk_compression_type, indexer.chunk_compression_type,
indexer.chunk_compression_level, indexer.chunk_compression_level,
indexer.max_nb_chunks, indexer.max_nb_chunks,
@ -91,8 +106,12 @@ pub fn extract_word_docids<R: io::Read + io::Seek>(
); );
let mut iter = word_fid_docids_sorter.into_stream_merger_iter()?; let mut iter = word_fid_docids_sorter.into_stream_merger_iter()?;
// TODO: replace sorters by writers by accumulating values into a buffer before inserting them.
while let Some((key, value)) = iter.next()? { while let Some((key, value)) = iter.next()? {
word_fid_docids_writer.insert(key, value)?; // only keep the value if their is a change to apply in the DB.
if !is_noop_del_add_obkv(KvReaderDelAdd::new(value)) {
word_fid_docids_writer.insert(key, value)?;
}
let (word, fid) = StrBEU16Codec::bytes_decode(key) let (word, fid) = StrBEU16Codec::bytes_decode(key)
.ok_or(SerializationError::Decoding { db_name: Some(DOCID_WORD_POSITIONS) })?; .ok_or(SerializationError::Decoding { db_name: Some(DOCID_WORD_POSITIONS) })?;
@ -116,20 +135,41 @@ fn words_into_sorter(
document_id: DocumentId, document_id: DocumentId,
fid: FieldId, fid: FieldId,
key_buffer: &mut Vec<u8>, key_buffer: &mut Vec<u8>,
words: &mut BTreeSet<Vec<u8>>, del_words: &BTreeSet<Vec<u8>>,
add_words: &BTreeSet<Vec<u8>>,
word_fid_docids_sorter: &mut grenad::Sorter<MergeFn>, word_fid_docids_sorter: &mut grenad::Sorter<MergeFn>,
) -> Result<()> { ) -> Result<()> {
puffin::profile_function!(); puffin::profile_function!();
for word_bytes in words.iter() { use itertools::merge_join_by;
use itertools::EitherOrBoth::{Both, Left, Right};
let mut buffer = Vec::new();
for eob in merge_join_by(del_words.iter(), add_words.iter(), |d, a| d.cmp(a)) {
buffer.clear();
let mut value_writer = KvWriterDelAdd::new(&mut buffer);
let word_bytes = match eob {
Left(word_bytes) => {
value_writer.insert(DelAdd::Deletion, document_id.to_ne_bytes()).unwrap();
word_bytes
}
Right(word_bytes) => {
value_writer.insert(DelAdd::Addition, document_id.to_ne_bytes()).unwrap();
word_bytes
}
Both(word_bytes, _) => {
value_writer.insert(DelAdd::Deletion, document_id.to_ne_bytes()).unwrap();
value_writer.insert(DelAdd::Addition, document_id.to_ne_bytes()).unwrap();
word_bytes
}
};
key_buffer.clear(); key_buffer.clear();
key_buffer.extend_from_slice(&word_bytes); key_buffer.extend_from_slice(&word_bytes);
key_buffer.push(0); key_buffer.push(0);
key_buffer.extend_from_slice(&fid.to_be_bytes()); key_buffer.extend_from_slice(&fid.to_be_bytes());
word_fid_docids_sorter.insert(&key_buffer, document_id.to_ne_bytes())?; word_fid_docids_sorter.insert(&key_buffer, value_writer.into_inner().unwrap())?;
} }
words.clear();
Ok(()) Ok(())
} }