diff --git a/milli/src/update/index_documents/extract/extract_word_docids.rs b/milli/src/update/index_documents/extract/extract_word_docids.rs index f38701dac..c8a01690b 100644 --- a/milli/src/update/index_documents/extract/extract_word_docids.rs +++ b/milli/src/update/index_documents/extract/extract_word_docids.rs @@ -1,14 +1,17 @@ use std::collections::{BTreeSet, HashSet}; use std::fs::File; use std::io::{self, BufReader}; +use std::sync::Mutex; use heed::BytesDecode; use obkv::KvReaderU16; +use once_cell::sync::Lazy; use super::helpers::{ create_sorter, create_writer, merge_deladd_cbo_roaring_bitmaps, sorter_into_reader, try_split_array_at, writer_into_reader, GrenadParameters, }; +use super::RawKVWriter; use crate::error::SerializationError; use crate::heed_codec::StrBEU16Codec; use crate::index::db_name::DOCID_WORD_POSITIONS; @@ -16,6 +19,11 @@ use crate::update::del_add::{is_noop_del_add_obkv, DelAdd, KvReaderDelAdd, KvWri use crate::update::MergeFn; use crate::{DocumentId, FieldId, Result}; +static WORD_FID_DOCIDS_RAW_KV: Lazy> = + Lazy::new(|| Mutex::new(RawKVWriter::new("extract_word_fid_docids").unwrap())); +static WORD_DOCIDS_RAW_KV: Lazy> = + Lazy::new(|| Mutex::new(RawKVWriter::new("extract_word_docids").unwrap())); + /// Extracts the word and the documents ids where this word appear. /// /// Returns a grenad reader with the list of extracted words and @@ -109,6 +117,7 @@ pub fn extract_word_docids( tempfile::tempfile()?, ); + let mut word_docids_raw_kv = WORD_DOCIDS_RAW_KV.lock().unwrap(); let mut iter = word_fid_docids_sorter.into_stream_merger_iter()?; // TODO: replace sorters by writers by accumulating values into a buffer before inserting them. while let Some((key, value)) = iter.next()? { @@ -124,10 +133,14 @@ pub fn extract_word_docids( if exact_attributes.contains(&fid) { exact_word_docids_sorter.insert(word.as_bytes(), value)?; } else { + word_docids_raw_kv.push(word.as_bytes(), value).unwrap(); word_docids_sorter.insert(word.as_bytes(), value)?; } } + WORD_FID_DOCIDS_RAW_KV.lock().unwrap().flush().unwrap(); + word_docids_raw_kv.flush().unwrap(); + Ok(( sorter_into_reader(word_docids_sorter, indexer)?, sorter_into_reader(exact_word_docids_sorter, indexer)?, @@ -146,6 +159,8 @@ fn words_into_sorter( ) -> Result<()> { puffin::profile_function!(); + let mut raw_kv_word_fid_docids = WORD_FID_DOCIDS_RAW_KV.lock().unwrap(); + use itertools::merge_join_by; use itertools::EitherOrBoth::{Both, Left, Right}; @@ -173,7 +188,9 @@ fn words_into_sorter( key_buffer.extend_from_slice(word_bytes); key_buffer.push(0); key_buffer.extend_from_slice(&fid.to_be_bytes()); - word_fid_docids_sorter.insert(&key_buffer, value_writer.into_inner().unwrap())?; + let value_buffer = value_writer.into_inner().unwrap(); + raw_kv_word_fid_docids.push(key_buffer, value_buffer).unwrap(); + word_fid_docids_sorter.insert(&key_buffer, value_buffer)?; } Ok(()) diff --git a/milli/src/update/index_documents/extract/extract_word_pair_proximity_docids.rs b/milli/src/update/index_documents/extract/extract_word_pair_proximity_docids.rs index 82a94ce00..f75357998 100644 --- a/milli/src/update/index_documents/extract/extract_word_pair_proximity_docids.rs +++ b/milli/src/update/index_documents/extract/extract_word_pair_proximity_docids.rs @@ -1,20 +1,26 @@ use std::collections::{BTreeMap, VecDeque}; use std::fs::File; use std::io::BufReader; +use std::sync::Mutex; use std::{cmp, io}; use obkv::KvReaderU16; +use once_cell::sync::Lazy; use super::helpers::{ create_sorter, create_writer, merge_deladd_cbo_roaring_bitmaps, try_split_array_at, writer_into_reader, GrenadParameters, MergeFn, }; +use super::RawKVWriter; use crate::error::SerializationError; use crate::index::db_name::DOCID_WORD_POSITIONS; use crate::proximity::{index_proximity, MAX_DISTANCE}; use crate::update::del_add::{DelAdd, KvReaderDelAdd, KvWriterDelAdd}; use crate::{DocumentId, Result}; +static WORD_PAIR_PROXIMITY_DOCIDS_RAW_KV: Lazy> = + Lazy::new(|| Mutex::new(RawKVWriter::new("extract_word_pair_proximity_docids").unwrap())); + /// Extracts the best proximity between pairs of words and the documents ids where this pair appear. /// /// Returns a grenad reader with the list of extracted word pairs proximities and @@ -153,6 +159,9 @@ pub fn extract_word_pair_proximity_docids( &mut word_pair_proximity_docids_sorters, )?; } + + WORD_PAIR_PROXIMITY_DOCIDS_RAW_KV.lock().unwrap().flush().unwrap(); + { puffin::profile_scope!("sorter_into_reader"); // FIXME: span inside of a hot loop might degrade performance and create big reports @@ -186,6 +195,8 @@ fn document_word_positions_into_sorter( use itertools::merge_join_by; use itertools::EitherOrBoth::{Both, Left, Right}; + let mut word_pair_proximity_docids_raw_kv = WORD_PAIR_PROXIMITY_DOCIDS_RAW_KV.lock().unwrap(); + let mut buffer = Vec::new(); let mut key_buffer = Vec::new(); for eob in @@ -217,8 +228,9 @@ fn document_word_positions_into_sorter( key_buffer.push(0); key_buffer.extend_from_slice(w2.as_bytes()); - word_pair_proximity_docids_sorters[*prox as usize - 1] - .insert(&key_buffer, value_writer.into_inner().unwrap())?; + let value_buffer = value_writer.into_inner().unwrap(); + word_pair_proximity_docids_raw_kv.push(&key_buffer, value_buffer).unwrap(); + word_pair_proximity_docids_sorters[*prox as usize - 1].insert(&key_buffer, value_buffer)?; } Ok(()) diff --git a/milli/src/update/index_documents/extract/extract_word_position_docids.rs b/milli/src/update/index_documents/extract/extract_word_position_docids.rs index 4bc553d9a..f17645834 100644 --- a/milli/src/update/index_documents/extract/extract_word_position_docids.rs +++ b/milli/src/update/index_documents/extract/extract_word_position_docids.rs @@ -1,19 +1,25 @@ use std::collections::BTreeSet; use std::fs::File; use std::io::{self, BufReader}; +use std::sync::Mutex; use obkv::KvReaderU16; +use once_cell::sync::Lazy; use super::helpers::{ create_sorter, merge_deladd_cbo_roaring_bitmaps, sorter_into_reader, try_split_array_at, GrenadParameters, }; +use super::RawKVWriter; use crate::error::SerializationError; use crate::index::db_name::DOCID_WORD_POSITIONS; use crate::update::del_add::{DelAdd, KvReaderDelAdd, KvWriterDelAdd}; use crate::update::MergeFn; use crate::{bucketed_position, DocumentId, Result}; +static WORD_POSITION_DOCIDS_RAW_KV: Lazy> = + Lazy::new(|| Mutex::new(RawKVWriter::new("extract_word_position_docids").unwrap())); + /// Extracts the word positions and the documents ids where this word appear. /// /// Returns a grenad reader with the list of extracted words at positions and @@ -88,6 +94,8 @@ pub fn extract_word_position_docids( )?; } + WORD_POSITION_DOCIDS_RAW_KV.lock().unwrap().flush().unwrap(); + // TODO remove noop DelAdd OBKV let word_position_docids_reader = sorter_into_reader(word_position_docids_sorter, indexer)?; @@ -107,6 +115,8 @@ fn words_position_into_sorter( use itertools::merge_join_by; use itertools::EitherOrBoth::{Both, Left, Right}; + let mut word_position_docids_raw_kv = WORD_POSITION_DOCIDS_RAW_KV.lock().unwrap(); + let mut buffer = Vec::new(); for eob in merge_join_by(del_word_positions.iter(), add_word_positions.iter(), |d, a| d.cmp(a)) { @@ -133,7 +143,10 @@ fn words_position_into_sorter( key_buffer.extend_from_slice(word_bytes); key_buffer.push(0); key_buffer.extend_from_slice(&position.to_be_bytes()); - word_position_docids_sorter.insert(&key_buffer, value_writer.into_inner().unwrap())?; + + let value_buffer = value_writer.into_inner().unwrap(); + word_position_docids_raw_kv.push(key_buffer, value_buffer).unwrap(); + word_position_docids_sorter.insert(&key_buffer, &value_buffer)?; } Ok(()) diff --git a/milli/src/update/index_documents/extract/mod.rs b/milli/src/update/index_documents/extract/mod.rs index 44f54ff26..221181c09 100644 --- a/milli/src/update/index_documents/extract/mod.rs +++ b/milli/src/update/index_documents/extract/mod.rs @@ -11,7 +11,7 @@ mod extract_word_position_docids; use std::collections::HashSet; use std::fs::File; -use std::io::BufReader; +use std::io::{self, BufReader, Write}; use crossbeam_channel::Sender; use rayon::prelude::*; @@ -38,6 +38,30 @@ use crate::proximity::ProximityPrecision; use crate::vector::EmbeddingConfigs; use crate::{FieldId, FieldsIdsMap, Result}; +pub struct RawKVWriter { + file: io::BufWriter, +} + +impl RawKVWriter { + pub fn new(path: &str) -> io::Result { + Ok(Self { file: File::create(path).map(io::BufWriter::new)? }) + } + + pub fn push(&mut self, key: &[u8], value: &[u8]) -> io::Result<()> { + let key_len = key.len().to_be_bytes(); + let value_len = value.len().to_be_bytes(); + self.file.write_all(&key_len)?; + self.file.write_all(key)?; + self.file.write_all(&value_len)?; + self.file.write_all(value)?; + Ok(()) + } + + pub fn flush(&mut self) -> io::Result<()> { + self.file.flush() + } +} + /// Extract data for each databases from obkv documents in parallel. /// Send data in grenad file over provided Sender. #[allow(clippy::too_many_arguments)]