update extract word position docids

This commit is contained in:
ManyTheFish 2023-10-19 13:27:07 +02:00 committed by Louis Dureuil
parent 46aa75abdb
commit 6bcf8b4f8c
No known key found for this signature in database

View File

@ -1,15 +1,17 @@
use std::collections::HashSet; use std::collections::BTreeSet;
use std::fs::File; use std::fs::File;
use std::io::{self, BufReader}; use std::io::{self, BufReader};
use obkv::KvReaderU16; use obkv::KvReaderU16;
use super::helpers::{ use super::helpers::{
create_sorter, merge_cbo_roaring_bitmaps, sorter_into_reader, try_split_array_at, create_sorter, merge_deladd_cbo_roaring_bitmaps, sorter_into_reader, try_split_array_at,
GrenadParameters, GrenadParameters,
}; };
use crate::error::SerializationError; use crate::error::SerializationError;
use crate::index::db_name::DOCID_WORD_POSITIONS; use crate::index::db_name::DOCID_WORD_POSITIONS;
use crate::update::del_add::{DelAdd, KvReaderDelAdd, KvWriterDelAdd};
use crate::update::MergeFn;
use crate::{bucketed_position, DocumentId, Result}; use crate::{bucketed_position, DocumentId, Result};
/// Extracts the word positions and the documents ids where this word appear. /// Extracts the word positions and the documents ids where this word appear.
@ -27,14 +29,15 @@ pub fn extract_word_position_docids<R: io::Read + io::Seek>(
let mut word_position_docids_sorter = create_sorter( let mut word_position_docids_sorter = create_sorter(
grenad::SortAlgorithm::Unstable, grenad::SortAlgorithm::Unstable,
merge_cbo_roaring_bitmaps, merge_deladd_cbo_roaring_bitmaps,
indexer.chunk_compression_type, indexer.chunk_compression_type,
indexer.chunk_compression_level, indexer.chunk_compression_level,
indexer.max_nb_chunks, indexer.max_nb_chunks,
max_memory, max_memory,
); );
let mut word_positions: HashSet<(u16, Vec<u8>)> = HashSet::new(); let mut del_word_positions: BTreeSet<(u16, Vec<u8>)> = BTreeSet::new();
let mut add_word_positions: BTreeSet<(u16, Vec<u8>)> = BTreeSet::new();
let mut current_document_id: Option<u32> = None; let mut current_document_id: Option<u32> = None;
let mut key_buffer = Vec::new(); let mut key_buffer = Vec::new();
let mut cursor = docid_word_positions.into_cursor()?; let mut cursor = docid_word_positions.into_cursor()?;
@ -44,36 +47,92 @@ pub fn extract_word_position_docids<R: io::Read + io::Seek>(
let document_id = DocumentId::from_be_bytes(document_id_bytes); let document_id = DocumentId::from_be_bytes(document_id_bytes);
if current_document_id.map_or(false, |id| document_id != id) { if current_document_id.map_or(false, |id| document_id != id) {
for (position, word_bytes) in word_positions.iter() { words_position_into_sorter(
key_buffer.clear(); current_document_id.unwrap(),
key_buffer.extend_from_slice(word_bytes); &mut key_buffer,
key_buffer.push(0); &del_word_positions,
key_buffer.extend_from_slice(&position.to_be_bytes()); &add_word_positions,
word_position_docids_sorter &mut word_position_docids_sorter,
.insert(&key_buffer, current_document_id.unwrap().to_ne_bytes())?; )?;
} del_word_positions.clear();
word_positions.clear(); add_word_positions.clear();
} }
current_document_id = Some(document_id); current_document_id = Some(document_id);
for (position, word_bytes) in KvReaderU16::new(&value).iter() { let del_add_reader = KvReaderDelAdd::new(&value);
// extract all unique words to remove.
if let Some(deletion) = del_add_reader.get(DelAdd::Deletion) {
for (position, word_bytes) in KvReaderU16::new(deletion).iter() {
let position = bucketed_position(position); let position = bucketed_position(position);
word_positions.insert((position, word_bytes.to_vec())); del_word_positions.insert((position, word_bytes.to_vec()));
}
}
// extract all unique additional words.
if let Some(addition) = del_add_reader.get(DelAdd::Addition) {
for (position, word_bytes) in KvReaderU16::new(addition).iter() {
let position = bucketed_position(position);
add_word_positions.insert((position, word_bytes.to_vec()));
}
} }
} }
if let Some(document_id) = current_document_id { if let Some(document_id) = current_document_id {
for (position, word_bytes) in word_positions { words_position_into_sorter(
key_buffer.clear(); document_id,
key_buffer.extend_from_slice(&word_bytes); &mut key_buffer,
key_buffer.push(0); &del_word_positions,
key_buffer.extend_from_slice(&position.to_be_bytes()); &add_word_positions,
word_position_docids_sorter.insert(&key_buffer, document_id.to_ne_bytes())?; &mut word_position_docids_sorter,
} )?;
} }
// TODO remove noop DelAdd OBKV
let word_position_docids_reader = sorter_into_reader(word_position_docids_sorter, indexer)?; let word_position_docids_reader = sorter_into_reader(word_position_docids_sorter, indexer)?;
Ok(word_position_docids_reader) Ok(word_position_docids_reader)
} }
fn words_position_into_sorter(
document_id: DocumentId,
key_buffer: &mut Vec<u8>,
del_word_positions: &BTreeSet<(u16, Vec<u8>)>,
add_word_positions: &BTreeSet<(u16, Vec<u8>)>,
word_position_docids_sorter: &mut grenad::Sorter<MergeFn>,
) -> Result<()> {
puffin::profile_function!();
use itertools::merge_join_by;
use itertools::EitherOrBoth::{Both, Left, Right};
let mut buffer = Vec::new();
for eob in merge_join_by(del_word_positions.iter(), add_word_positions.iter(), |d, a| d.cmp(a))
{
buffer.clear();
let mut value_writer = KvWriterDelAdd::new(&mut buffer);
let (position, word_bytes) = match eob {
Left(key) => {
value_writer.insert(DelAdd::Deletion, document_id.to_ne_bytes()).unwrap();
key
}
Right(key) => {
value_writer.insert(DelAdd::Addition, document_id.to_ne_bytes()).unwrap();
key
}
Both(key, _) => {
value_writer.insert(DelAdd::Deletion, document_id.to_ne_bytes()).unwrap();
value_writer.insert(DelAdd::Addition, document_id.to_ne_bytes()).unwrap();
key
}
};
key_buffer.clear();
key_buffer.extend_from_slice(word_bytes);
key_buffer.push(0);
key_buffer.extend_from_slice(&position.to_be_bytes());
word_position_docids_sorter.insert(&key_buffer, value_writer.into_inner().unwrap())?;
}
Ok(())
}