288 lines
9.0 KiB
Rust
Raw Normal View History

2021-08-16 13:36:30 +02:00
use std::borrow::Cow;
use std::fs::File;
use std::io::{self, BufReader, BufWriter, Seek};
2021-08-16 13:36:30 +02:00
use grenad::{CompressionType, Sorter};
2021-08-16 13:36:30 +02:00
use heed::types::ByteSlice;
use super::{ClonableMmap, MergeFn};
2023-11-08 16:41:26 +01:00
use crate::update::index_documents::valid_lmdb_key;
2021-08-16 13:36:30 +02:00
use crate::Result;
pub type CursorClonableMmap = io::Cursor<ClonableMmap>;
/// Creates a grenad writer whose output goes to `file` through a [`BufWriter`].
///
/// `typ` is the compression algorithm used by the writer. `level` is only
/// forwarded to the builder when it is `Some`; otherwise the builder's default
/// level is kept.
pub fn create_writer<R: io::Write>(
    typ: grenad::CompressionType,
    level: Option<u32>,
    file: R,
) -> grenad::Writer<BufWriter<R>> {
    let mut writer_builder = grenad::Writer::builder();
    writer_builder.compression_type(typ);
    if let Some(compression_level) = level {
        writer_builder.compression_level(compression_level);
    }
    let buffered_file = BufWriter::new(file);
    writer_builder.build(buffered_file)
}
/// Creates a grenad [`Sorter`] that combines entries with `merge`.
///
/// * `sort_algorithm` selects how buffered entries are sorted. Sorting is
///   always done in parallel.
/// * `chunk_compression_type` and `chunk_compression_level` configure the
///   compression of the chunks the sorter dumps. The level is only set when
///   it is `Some`.
/// * `max_nb_chunks`, when `Some`, is forwarded to the builder as-is.
/// * `max_memory`, when `Some`, becomes the dump threshold, and buffer
///   reallocation is disabled (presumably so the buffer stays within that
///   budget).
pub fn create_sorter(
    sort_algorithm: grenad::SortAlgorithm,
    merge: MergeFn,
    chunk_compression_type: grenad::CompressionType,
    chunk_compression_level: Option<u32>,
    max_nb_chunks: Option<usize>,
    max_memory: Option<usize>,
) -> grenad::Sorter<MergeFn> {
    let mut sorter_builder = grenad::Sorter::builder(merge);
    sorter_builder.chunk_compression_type(chunk_compression_type);
    if let Some(compression_level) = chunk_compression_level {
        sorter_builder.chunk_compression_level(compression_level);
    }
    if let Some(chunk_limit) = max_nb_chunks {
        sorter_builder.max_nb_chunks(chunk_limit);
    }
    if let Some(memory_budget) = max_memory {
        sorter_builder.dump_threshold(memory_budget);
        sorter_builder.allow_realloc(false);
    }
    sorter_builder.sort_algorithm(sort_algorithm);
    sorter_builder.sort_in_parallel(true);

    sorter_builder.build()
}
pub fn sorter_into_reader(
sorter: grenad::Sorter<MergeFn>,
indexer: GrenadParameters,
) -> Result<grenad::Reader<BufReader<File>>> {
puffin::profile_function!();
2022-02-16 15:28:48 +01:00
let mut writer = create_writer(
indexer.chunk_compression_type,
indexer.chunk_compression_level,
tempfile::tempfile()?,
);
sorter.write_into_stream_writer(&mut writer)?;
writer_into_reader(writer)
2021-08-16 13:36:30 +02:00
}
pub fn writer_into_reader(
writer: grenad::Writer<BufWriter<File>>,
) -> Result<grenad::Reader<BufReader<File>>> {
let mut file = writer.into_inner()?.into_inner().map_err(|err| err.into_error())?;
2023-01-30 17:17:35 +01:00
file.rewind()?;
grenad::Reader::new(BufReader::new(file)).map_err(Into::into)
2021-08-16 13:36:30 +02:00
}
pub unsafe fn as_cloneable_grenad(
reader: &grenad::Reader<BufReader<File>>,
2021-08-16 13:36:30 +02:00
) -> Result<grenad::Reader<CursorClonableMmap>> {
let file = reader.get_ref().get_ref();
let mmap = memmap2::Mmap::map(file)?;
2021-08-16 13:36:30 +02:00
let cursor = io::Cursor::new(ClonableMmap::from(mmap));
let reader = grenad::Reader::new(cursor)?;
Ok(reader)
}
2022-03-23 14:48:15 +01:00
/// A collection of grenad readers that can be merged into a single `Output`.
///
/// The implementations in this module merge with the given `MergeFn`. They
/// write the merged data to temporary files, compressed according to the
/// `GrenadParameters`.
pub trait MergeableReader: Sized {
    /// What merging produces, e.g. one reader, or one reader per tuple position.
    type Output;

    /// Consumes `self` and merges its readers using `merge_fn`.
    fn merge(self, merge_fn: MergeFn, indexer: &GrenadParameters) -> Result<Self::Output>;
}
impl MergeableReader for Vec<grenad::Reader<BufReader<File>>> {
type Output = grenad::Reader<BufReader<File>>;
2022-03-23 14:48:15 +01:00
fn merge(self, merge_fn: MergeFn, params: &GrenadParameters) -> Result<Self::Output> {
let mut merger = MergerBuilder::new(merge_fn);
self.into_iter().try_for_each(|r| merger.push(r))?;
merger.finish(params)
2022-02-16 15:28:48 +01:00
}
2022-03-23 14:48:15 +01:00
}
2022-02-16 15:28:48 +01:00
impl MergeableReader for Vec<(grenad::Reader<BufReader<File>>, grenad::Reader<BufReader<File>>)> {
type Output = (grenad::Reader<BufReader<File>>, grenad::Reader<BufReader<File>>);
2022-02-16 15:28:48 +01:00
2022-03-23 14:48:15 +01:00
fn merge(self, merge_fn: MergeFn, params: &GrenadParameters) -> Result<Self::Output> {
let mut m1 = MergerBuilder::new(merge_fn);
let mut m2 = MergerBuilder::new(merge_fn);
for (r1, r2) in self.into_iter() {
m1.push(r1)?;
m2.push(r2)?;
}
Ok((m1.finish(params)?, m2.finish(params)?))
}
}
2023-09-18 09:59:38 +02:00
impl MergeableReader
    for Vec<(
        grenad::Reader<BufReader<File>>,
        grenad::Reader<BufReader<File>>,
        grenad::Reader<BufReader<File>>,
    )>
{
    type Output = (
        grenad::Reader<BufReader<File>>,
        grenad::Reader<BufReader<File>>,
        grenad::Reader<BufReader<File>>,
    );

    /// Merges the readers position by position across all triples, producing
    /// one merged reader per tuple position.
    fn merge(self, merge_fn: MergeFn, params: &GrenadParameters) -> Result<Self::Output> {
        let (mut first, mut second, mut third) = (
            MergerBuilder::new(merge_fn),
            MergerBuilder::new(merge_fn),
            MergerBuilder::new(merge_fn),
        );

        for (first_reader, second_reader, third_reader) in self {
            first.push(first_reader)?;
            second.push(second_reader)?;
            third.push(third_reader)?;
        }

        let first = first.finish(params)?;
        let second = second.finish(params)?;
        let third = third.finish(params)?;
        Ok((first, second, third))
    }
}
2022-03-23 14:48:15 +01:00
struct MergerBuilder<R>(grenad::MergerBuilder<R, MergeFn>);
impl<R: io::Read + io::Seek> MergerBuilder<R> {
fn new(merge_fn: MergeFn) -> Self {
Self(grenad::MergerBuilder::new(merge_fn))
}
fn push(&mut self, reader: grenad::Reader<R>) -> Result<()> {
self.0.push(reader.into_cursor()?);
Ok(())
}
fn finish(self, params: &GrenadParameters) -> Result<grenad::Reader<BufReader<File>>> {
2022-03-23 14:48:15 +01:00
let merger = self.0.build();
let mut writer = create_writer(
params.chunk_compression_type,
params.chunk_compression_level,
tempfile::tempfile()?,
);
merger.write_into_stream_writer(&mut writer)?;
writer_into_reader(writer)
2022-03-23 14:48:15 +01:00
}
2021-08-16 13:36:30 +02:00
}
/// Settings shared by the grenad writers and mergers built in this module.
#[derive(Debug, Clone, Copy)]
pub struct GrenadParameters {
    /// Compression algorithm used for the chunks that are written.
    pub chunk_compression_type: CompressionType,
    /// Optional compression level. `None` keeps the builder's default level.
    pub chunk_compression_level: Option<u32>,
    /// Overall memory budget. It is divided between threads by
    /// [`GrenadParameters::max_memory_by_thread`].
    pub max_memory: Option<usize>,
    /// Optional limit on the number of chunks (presumably passed to
    /// `create_sorter`'s `max_nb_chunks` by callers; confirm).
    pub max_nb_chunks: Option<usize>,
}

impl Default for GrenadParameters {
    /// No compression, and no memory or chunk-count limits.
    fn default() -> Self {
        GrenadParameters {
            max_memory: None,
            max_nb_chunks: None,
            chunk_compression_level: None,
            chunk_compression_type: CompressionType::None,
        }
    }
}
impl GrenadParameters {
2021-09-02 15:17:52 +02:00
/// This function use the number of threads in the current threadpool to compute the value.
/// This should be called inside of a rayon thread pool,
/// Otherwise, it will take the global number of threads.
2021-08-16 13:36:30 +02:00
pub fn max_memory_by_thread(&self) -> Option<usize> {
self.max_memory.map(|max_memory| max_memory / rayon::current_num_threads())
}
}
/// Returns an iterator that outputs grenad readers of obkv documents
/// with a maximum size of approximately `documents_chunks_size`.
///
/// The grenad obkv entries are composed of an incremental document id big-endian
/// encoded as the key and an obkv object with an `u8` for the field as the key
/// and a simple UTF-8 encoded string as the value.
2022-02-16 15:28:48 +01:00
pub fn grenad_obkv_into_chunks<R: io::Read + io::Seek>(
reader: grenad::Reader<R>,
2021-08-16 13:36:30 +02:00
indexer: GrenadParameters,
2021-09-02 15:17:52 +02:00
documents_chunk_size: usize,
) -> Result<impl Iterator<Item = Result<grenad::Reader<BufReader<File>>>>> {
2021-08-16 13:36:30 +02:00
let mut continue_reading = true;
2022-02-16 15:28:48 +01:00
let mut cursor = reader.into_cursor()?;
2021-08-16 13:36:30 +02:00
let mut transposer = move || {
if !continue_reading {
return Ok(None);
}
let mut current_chunk_size = 0u64;
2022-02-16 15:28:48 +01:00
let mut obkv_documents = create_writer(
indexer.chunk_compression_type,
indexer.chunk_compression_level,
2022-02-16 15:28:48 +01:00
tempfile::tempfile()?,
);
while let Some((document_id, obkv)) = cursor.move_on_next()? {
if !obkv.is_empty() {
obkv_documents.insert(document_id, obkv)?;
current_chunk_size += document_id.len() as u64 + obkv.len() as u64;
2021-08-16 13:36:30 +02:00
if current_chunk_size >= documents_chunk_size as u64 {
return writer_into_reader(obkv_documents).map(Some);
}
2021-08-16 13:36:30 +02:00
}
}
continue_reading = false;
writer_into_reader(obkv_documents).map(Some)
};
2021-09-02 16:57:46 +02:00
Ok(std::iter::from_fn(move || transposer().transpose()))
2021-08-16 13:36:30 +02:00
}
2023-11-08 16:41:26 +01:00
/// Writes the merged content of `sorter` into `database`.
///
/// Entries whose key is rejected by `valid_lmdb_key` are skipped. For every
/// other entry, the value to store is computed into a reused buffer:
/// - `serialize_value` is used when `index_is_empty` is true, or when the key
///   is not already in `database`;
/// - otherwise `merge_values` receives the new value and the stored one.
///
/// When the computed value is `None`, the key is deleted from `database`.
/// Otherwise the computed value is stored under the key.
pub fn write_sorter_into_database<K, V, FS, FM>(
    sorter: Sorter<MergeFn>,
    database: &heed::Database<K, V>,
    wtxn: &mut heed::RwTxn,
    index_is_empty: bool,
    serialize_value: FS,
    merge_values: FM,
) -> Result<()>
where
    FS: for<'a> Fn(&'a [u8], &'a mut Vec<u8>) -> Result<&'a [u8]>,
    FM: for<'a> Fn(&[u8], &[u8], &'a mut Vec<u8>) -> Result<Option<&'a [u8]>>,
{
    puffin::profile_function!();

    let mut buffer = Vec::new();
    let database = database.remap_types::<ByteSlice, ByteSlice>();
    let mut merger_iter = sorter.into_stream_merger_iter()?;

    while let Some((key, value)) = merger_iter.next()? {
        if !valid_lmdb_key(key) {
            continue;
        }

        buffer.clear();
        // An empty index cannot hold a previous value, so skip the lookup.
        let stored_value = if index_is_empty { None } else { database.get(wtxn, key)? };
        let new_value = match stored_value {
            Some(prev_value) => merge_values(value, prev_value, &mut buffer)?,
            None => Some(serialize_value(value, &mut buffer)?),
        };

        if let Some(bytes) = new_value {
            database.put(wtxn, key, bytes)?;
        } else {
            database.delete(wtxn, key)?;
        }
    }

    Ok(())
}
2022-03-24 15:22:57 +01:00
/// Used when trying to merge readers, but you don't actually care about the values.
pub fn merge_ignore_values<'a>(_key: &[u8], _values: &[Cow<'a, [u8]>]) -> Result<Cow<'a, [u8]>> {
2022-03-24 15:22:57 +01:00
Ok(Cow::Owned(Vec::new()))
}