update extract word pair proximity to support deladd obkvs

This commit is contained in:
ManyTheFish 2023-10-19 14:18:14 +02:00
parent c63ff5298b
commit b8fed737ef

View File

@ -1,16 +1,17 @@
use std::collections::{HashMap, VecDeque}; use std::collections::{BTreeMap, VecDeque};
use std::fs::File; use std::fs::File;
use std::{cmp, io}; use std::{cmp, io};
use obkv::KvReaderU16; use obkv::KvReaderU16;
use super::helpers::{ use super::helpers::{
create_sorter, create_writer, merge_cbo_roaring_bitmaps, sorter_into_reader, create_sorter, create_writer, merge_deladd_cbo_roaring_bitmaps, try_split_array_at,
try_split_array_at, writer_into_reader, GrenadParameters, MergeFn, writer_into_reader, GrenadParameters, MergeFn,
}; };
use crate::error::SerializationError; use crate::error::SerializationError;
use crate::index::db_name::DOCID_WORD_POSITIONS; use crate::index::db_name::DOCID_WORD_POSITIONS;
use crate::proximity::{index_proximity, MAX_DISTANCE}; use crate::proximity::{index_proximity, MAX_DISTANCE};
use crate::update::del_add::{DelAdd, KvReaderDelAdd, KvWriterDelAdd};
use crate::{DocumentId, Result}; use crate::{DocumentId, Result};
/// Extracts the best proximity between pairs of words and the documents ids where this pair appear. /// Extracts the best proximity between pairs of words and the documents ids where this pair appear.
@ -31,7 +32,7 @@ pub fn extract_word_pair_proximity_docids<R: io::Read + io::Seek>(
.map(|_| { .map(|_| {
create_sorter( create_sorter(
grenad::SortAlgorithm::Unstable, grenad::SortAlgorithm::Unstable,
merge_cbo_roaring_bitmaps, merge_deladd_cbo_roaring_bitmaps,
indexer.chunk_compression_type, indexer.chunk_compression_type,
indexer.chunk_compression_level, indexer.chunk_compression_level,
indexer.max_nb_chunks, indexer.max_nb_chunks,
@ -40,9 +41,12 @@ pub fn extract_word_pair_proximity_docids<R: io::Read + io::Seek>(
}) })
.collect(); .collect();
let mut word_positions: VecDeque<(String, u16)> = let mut del_word_positions: VecDeque<(String, u16)> =
VecDeque::with_capacity(MAX_DISTANCE as usize); VecDeque::with_capacity(MAX_DISTANCE as usize);
let mut word_pair_proximity = HashMap::new(); let mut add_word_positions: VecDeque<(String, u16)> =
VecDeque::with_capacity(MAX_DISTANCE as usize);
let mut del_word_pair_proximity = BTreeMap::new();
let mut add_word_pair_proximity = BTreeMap::new();
let mut current_document_id = None; let mut current_document_id = None;
let mut cursor = docid_word_positions.into_cursor()?; let mut cursor = docid_word_positions.into_cursor()?;
@ -54,50 +58,90 @@ pub fn extract_word_pair_proximity_docids<R: io::Read + io::Seek>(
// if we change document, we fill the sorter // if we change document, we fill the sorter
if current_document_id.map_or(false, |id| id != document_id) { if current_document_id.map_or(false, |id| id != document_id) {
puffin::profile_scope!("Document into sorter"); puffin::profile_scope!("Document into sorter");
while !word_positions.is_empty() {
word_positions_into_word_pair_proximity(
&mut word_positions,
&mut word_pair_proximity,
)?;
}
document_word_positions_into_sorter( document_word_positions_into_sorter(
current_document_id.unwrap(), current_document_id.unwrap(),
&word_pair_proximity, &del_word_pair_proximity,
&add_word_pair_proximity,
&mut word_pair_proximity_docids_sorters, &mut word_pair_proximity_docids_sorters,
)?; )?;
word_pair_proximity.clear(); del_word_pair_proximity.clear();
word_positions.clear(); add_word_pair_proximity.clear();
} }
current_document_id = Some(document_id); current_document_id = Some(document_id);
for (position, word) in KvReaderU16::new(&value).iter() { let (del, add): (Result<_>, Result<_>) = rayon::join(
|| {
// deletions
if let Some(deletion) = KvReaderDelAdd::new(&value).get(DelAdd::Deletion) {
for (position, word) in KvReaderU16::new(deletion).iter() {
// drain the proximity window until the head word is considered close to the word we are inserting. // drain the proximity window until the head word is considered close to the word we are inserting.
while word_positions.get(0).map_or(false, |(_w, p)| { while del_word_positions.get(0).map_or(false, |(_w, p)| {
index_proximity(*p as u32, position as u32) >= MAX_DISTANCE index_proximity(*p as u32, position as u32) >= MAX_DISTANCE
}) { }) {
word_positions_into_word_pair_proximity( word_positions_into_word_pair_proximity(
&mut word_positions, &mut del_word_positions,
&mut word_pair_proximity, &mut del_word_pair_proximity,
)?; )?;
} }
// insert the new word. // insert the new word.
let word = std::str::from_utf8(word)?; let word = std::str::from_utf8(word)?;
word_positions.push_back((word.to_string(), position)); del_word_positions.push_back((word.to_string(), position));
} }
while !del_word_positions.is_empty() {
word_positions_into_word_pair_proximity(
&mut del_word_positions,
&mut del_word_pair_proximity,
)?;
}
}
Ok(())
},
|| {
// additions
if let Some(addition) = KvReaderDelAdd::new(&value).get(DelAdd::Addition) {
for (position, word) in KvReaderU16::new(addition).iter() {
// drain the proximity window until the head word is considered close to the word we are inserting.
while add_word_positions.get(0).map_or(false, |(_w, p)| {
index_proximity(*p as u32, position as u32) >= MAX_DISTANCE
}) {
word_positions_into_word_pair_proximity(
&mut add_word_positions,
&mut add_word_pair_proximity,
)?;
}
// insert the new word.
let word = std::str::from_utf8(word)?;
add_word_positions.push_back((word.to_string(), position));
}
while !add_word_positions.is_empty() {
word_positions_into_word_pair_proximity(
&mut add_word_positions,
&mut add_word_pair_proximity,
)?;
}
}
Ok(())
},
);
del?;
add?;
} }
if let Some(document_id) = current_document_id { if let Some(document_id) = current_document_id {
puffin::profile_scope!("Final document into sorter"); puffin::profile_scope!("Final document into sorter");
while !word_positions.is_empty() {
word_positions_into_word_pair_proximity(&mut word_positions, &mut word_pair_proximity)?;
}
document_word_positions_into_sorter( document_word_positions_into_sorter(
document_id, document_id,
&word_pair_proximity, &del_word_pair_proximity,
&add_word_pair_proximity,
&mut word_pair_proximity_docids_sorters, &mut word_pair_proximity_docids_sorters,
)?; )?;
} }
@ -123,11 +167,38 @@ pub fn extract_word_pair_proximity_docids<R: io::Read + io::Seek>(
/// close to each other. /// close to each other.
fn document_word_positions_into_sorter( fn document_word_positions_into_sorter(
document_id: DocumentId, document_id: DocumentId,
word_pair_proximity: &HashMap<(String, String), u8>, del_word_pair_proximity: &BTreeMap<(String, String), u8>,
add_word_pair_proximity: &BTreeMap<(String, String), u8>,
word_pair_proximity_docids_sorters: &mut Vec<grenad::Sorter<MergeFn>>, word_pair_proximity_docids_sorters: &mut Vec<grenad::Sorter<MergeFn>>,
) -> Result<()> { ) -> Result<()> {
use itertools::merge_join_by;
use itertools::EitherOrBoth::{Both, Left, Right};
let mut buffer = Vec::new();
let mut key_buffer = Vec::new(); let mut key_buffer = Vec::new();
for ((w1, w2), prox) in word_pair_proximity { for eob in
merge_join_by(del_word_pair_proximity.iter(), add_word_pair_proximity.iter(), |d, a| {
d.cmp(a)
})
{
buffer.clear();
let mut value_writer = KvWriterDelAdd::new(&mut buffer);
let ((w1, w2), prox) = match eob {
Left(key_value) => {
value_writer.insert(DelAdd::Deletion, document_id.to_ne_bytes()).unwrap();
key_value
}
Right(key_value) => {
value_writer.insert(DelAdd::Addition, document_id.to_ne_bytes()).unwrap();
key_value
}
Both(key_value, _) => {
value_writer.insert(DelAdd::Deletion, document_id.to_ne_bytes()).unwrap();
value_writer.insert(DelAdd::Addition, document_id.to_ne_bytes()).unwrap();
key_value
}
};
key_buffer.clear(); key_buffer.clear();
key_buffer.push(*prox as u8); key_buffer.push(*prox as u8);
key_buffer.extend_from_slice(w1.as_bytes()); key_buffer.extend_from_slice(w1.as_bytes());
@ -135,7 +206,7 @@ fn document_word_positions_into_sorter(
key_buffer.extend_from_slice(w2.as_bytes()); key_buffer.extend_from_slice(w2.as_bytes());
word_pair_proximity_docids_sorters[*prox as usize - 1] word_pair_proximity_docids_sorters[*prox as usize - 1]
.insert(&key_buffer, document_id.to_ne_bytes())?; .insert(&key_buffer, value_writer.into_inner().unwrap())?;
} }
Ok(()) Ok(())
@ -143,7 +214,7 @@ fn document_word_positions_into_sorter(
fn word_positions_into_word_pair_proximity( fn word_positions_into_word_pair_proximity(
word_positions: &mut VecDeque<(String, u16)>, word_positions: &mut VecDeque<(String, u16)>,
word_pair_proximity: &mut HashMap<(String, String), u8>, word_pair_proximity: &mut BTreeMap<(String, String), u8>,
) -> Result<()> { ) -> Result<()> {
let (head_word, head_position) = word_positions.pop_front().unwrap(); let (head_word, head_position) = word_positions.pop_front().unwrap();
for (word, position) in word_positions.iter() { for (word, position) in word_positions.iter() {