use std::fs::File;
use std::io::{self, BufReader, BufWriter, Seek};

use grenad::{CompressionType, MergeFunction, Sorter};
use heed::types::Bytes;

use super::ClonableMmap;
use crate::update::index_documents::valid_lmdb_key;
use crate::Result;

/// This is something reasonable given the fact
/// that there is one grenad sorter per thread.
const MAX_GRENAD_SORTER_USAGE: usize = 500 * 1024 * 1024; // 500 MiB

pub type CursorClonableMmap = io::Cursor<ClonableMmap>;

pub fn create_writer<R: io::Write>(
    typ: grenad::CompressionType,
    level: Option<u32>,
    file: R,
) -> grenad::Writer<BufWriter<R>> {
    let mut builder = grenad::Writer::builder();
    builder.compression_type(typ);
    if let Some(level) = level {
        builder.compression_level(level);
    }
    builder.build(BufWriter::new(file))
}

/// A helper function that creates a grenad sorter
/// with the given parameters. The max memory is
/// clamped to something reasonable.
pub fn create_sorter<MF: MergeFunction>(
    sort_algorithm: grenad::SortAlgorithm,
    merge: MF,
    chunk_compression_type: grenad::CompressionType,
    chunk_compression_level: Option<u32>,
    max_nb_chunks: Option<usize>,
    max_memory: Option<usize>,
    sort_in_parallel: bool,
) -> grenad::Sorter<MF> {
    let mut builder = grenad::Sorter::builder(merge);
    builder.chunk_compression_type(chunk_compression_type);
    if let Some(level) = chunk_compression_level {
        builder.chunk_compression_level(level);
    }
    if let Some(nb_chunks) = max_nb_chunks {
        builder.max_nb_chunks(nb_chunks);
    }
    if let Some(memory) = max_memory {
        builder.dump_threshold(memory.min(MAX_GRENAD_SORTER_USAGE));
        builder.allow_realloc(false);
    }
    builder.sort_algorithm(sort_algorithm);
    builder.sort_in_parallel(sort_in_parallel);
    builder.build()
}

#[tracing::instrument(level = "trace", skip_all, target = "indexing::grenad")]
pub fn sorter_into_reader<MF>(
    sorter: grenad::Sorter<MF>,
    indexer: GrenadParameters,
) -> Result<grenad::Reader<BufReader<File>>>
where
    MF: MergeFunction,
    crate::Error: From<MF::Error>,
{
    let mut writer = create_writer(
        indexer.chunk_compression_type,
        indexer.chunk_compression_level,
        tempfile::tempfile()?,
    );
    sorter.write_into_stream_writer(&mut writer)?;

    writer_into_reader(writer)
}

pub fn writer_into_reader(
    writer: grenad::Writer<BufWriter<File>>,
) -> Result<grenad::Reader<BufReader<File>>> {
    let mut file = writer.into_inner()?.into_inner().map_err(|err| err.into_error())?;
    file.rewind()?;
    grenad::Reader::new(BufReader::new(file)).map_err(Into::into)
}

/// # Safety
///
/// This memory-maps the file behind the given reader, so the caller must ensure
/// that the underlying file is not modified or truncated while the returned
/// reader (or any of its clones) is still in use.
pub unsafe fn as_cloneable_grenad(
    reader: &grenad::Reader<BufReader<File>>,
) -> Result<grenad::Reader<CursorClonableMmap>> {
    let file = reader.get_ref().get_ref();
    let mmap = memmap2::Mmap::map(file)?;
    let cursor = io::Cursor::new(ClonableMmap::from(mmap));
    let reader = grenad::Reader::new(cursor)?;
    Ok(reader)
}

#[derive(Debug, Clone, Copy)]
pub struct GrenadParameters {
    pub chunk_compression_type: CompressionType,
    pub chunk_compression_level: Option<u32>,
    pub max_memory: Option<usize>,
    pub max_nb_chunks: Option<usize>,
}

impl Default for GrenadParameters {
    fn default() -> Self {
        Self {
            chunk_compression_type: CompressionType::None,
            chunk_compression_level: None,
            max_memory: None,
            max_nb_chunks: None,
        }
    }
}

impl GrenadParameters {
    /// This function uses the number of threads in the current thread pool to compute the value.
    ///
    /// This should be called from inside a rayon thread pool;
    /// otherwise, it uses the global number of threads.
    ///
    /// The max memory cannot exceed a given reasonable value.
    pub fn max_memory_by_thread(&self) -> Option<usize> {
        self.max_memory.map(|max_memory| {
            (max_memory / rayon::current_num_threads()).min(MAX_GRENAD_SORTER_USAGE)
        })
    }
}
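// --- Illustrative usage sketch (not part of the original module) ---
// The helper below is hypothetical and only shows how the pieces above are meant to
// compose: build a sorter with `create_sorter`, budget its memory with
// `GrenadParameters::max_memory_by_thread`, insert raw key/value pairs, then dump the
// sorted and merged entries into an on-disk grenad reader with `sorter_into_reader`.
// The merge function `MF` is assumed to be supplied by the caller (e.g. one of the
// crate's merge implementations); none is hard-coded here.
#[allow(dead_code)]
fn sort_entries_into_reader<MF>(
    entries: &[(&[u8], &[u8])],
    merge: MF,
    params: GrenadParameters,
) -> Result<grenad::Reader<BufReader<File>>>
where
    MF: MergeFunction,
    crate::Error: From<MF::Error>,
{
    let mut sorter = create_sorter(
        grenad::SortAlgorithm::Stable,
        merge,
        params.chunk_compression_type,
        params.chunk_compression_level,
        params.max_nb_chunks,
        // Clamp the memory budget to a per-thread share, as the indexing code does.
        params.max_memory_by_thread(),
        true,
    );

    for &(key, value) in entries {
        // Keys can be inserted unsorted and duplicated; the merge function decides
        // how duplicate keys are combined when the sorter is dumped.
        sorter.insert(key, value)?;
    }

    // Write the sorted entries into a temporary file and reopen it as a reader.
    sorter_into_reader(sorter, params)
}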
/// Returns an iterator that outputs grenad readers of obkv documents
/// with a maximum size of approximately `documents_chunk_size`.
///
/// The grenad obkv entries are composed of an incremental document id big-endian
/// encoded as the key and an obkv object with a `u8` field id as the key
/// and a simple UTF-8 encoded string as the value.
pub fn grenad_obkv_into_chunks<R: io::Read + io::Seek>(
    reader: grenad::Reader<R>,
    indexer: GrenadParameters,
    documents_chunk_size: usize,
) -> Result<impl Iterator<Item = Result<grenad::Reader<BufReader<File>>>>> {
    let mut continue_reading = true;
    let mut cursor = reader.into_cursor()?;

    let mut transposer = move || {
        if !continue_reading {
            return Ok(None);
        }

        let mut current_chunk_size = 0u64;
        let mut obkv_documents = create_writer(
            indexer.chunk_compression_type,
            indexer.chunk_compression_level,
            tempfile::tempfile()?,
        );

        while let Some((document_id, obkv)) = cursor.move_on_next()? {
            if !obkv.is_empty() {
                obkv_documents.insert(document_id, obkv)?;
                current_chunk_size += document_id.len() as u64 + obkv.len() as u64;

                if current_chunk_size >= documents_chunk_size as u64 {
                    return writer_into_reader(obkv_documents).map(Some);
                }
            }
        }

        continue_reading = false;
        writer_into_reader(obkv_documents).map(Some)
    };

    Ok(std::iter::from_fn(move || transposer().transpose()))
}

/// Writes the provided sorter into the database using the `serialize_value` function.
/// The `merge_values` function is used when an entry already exists in the database.
#[tracing::instrument(level = "trace", skip_all, target = "indexing::grenad")]
pub fn write_sorter_into_database<K, V, FS, FM, MF>(
    sorter: Sorter<MF>,
    database: &heed::Database<K, V>,
    wtxn: &mut heed::RwTxn<'_>,
    index_is_empty: bool,
    serialize_value: FS,
    merge_values: FM,
) -> Result<()>
where
    FS: for<'a> Fn(&'a [u8], &'a mut Vec<u8>) -> Result<&'a [u8]>,
    FM: for<'a> Fn(&[u8], &[u8], &'a mut Vec<u8>) -> Result<Option<&'a [u8]>>,
    MF: MergeFunction,
    crate::Error: From<MF::Error>,
{
    let mut buffer = Vec::new();
    let database = database.remap_types::<Bytes, Bytes>();

    let mut merger_iter = sorter.into_stream_merger_iter()?;
    while let Some((key, value)) = merger_iter.next()? {
        if valid_lmdb_key(key) {
            buffer.clear();
            let value = if index_is_empty {
                Some(serialize_value(value, &mut buffer)?)
            } else {
                match database.get(wtxn, key)? {
                    Some(prev_value) => merge_values(value, prev_value, &mut buffer)?,
                    None => Some(serialize_value(value, &mut buffer)?),
                }
            };
            match value {
                Some(value) => database.put(wtxn, key, value)?,
                None => {
                    database.delete(wtxn, key)?;
                }
            }
        }
    }

    Ok(())
}
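// --- Illustrative usage sketch (not part of the original module) ---
// The functions below are hypothetical and only demonstrate the shape of the
// `serialize_value` / `merge_values` callbacks expected by `write_sorter_into_database`:
// `passthrough_serialize` stores the sorter value as-is, while `keep_newest_value`
// resolves a conflict with an existing database entry by copying the incoming value
// into the scratch buffer and keeping it. Real callers typically merge bitmaps or
// obkvs instead of overwriting.
#[allow(dead_code)]
fn passthrough_serialize<'a>(value: &'a [u8], _buffer: &'a mut Vec<u8>) -> Result<&'a [u8]> {
    // The value produced by the sorter is already in its final on-disk form.
    Ok(value)
}

#[allow(dead_code)]
fn keep_newest_value<'a>(
    new_value: &[u8],
    _prev_value: &[u8],
    buffer: &'a mut Vec<u8>,
) -> Result<Option<&'a [u8]>> {
    // Copy the incoming value into the reusable buffer and keep it, discarding the
    // value previously stored in the database. Returning `None` would delete the entry.
    buffer.extend_from_slice(new_value);
    Ok(Some(&buffer[..]))
}

// Hypothetical driver tying the two callbacks together, assuming a `Bytes`-typed database.
#[allow(dead_code)]
fn write_sorter_overwriting_entries<MF>(
    sorter: Sorter<MF>,
    database: &heed::Database<Bytes, Bytes>,
    wtxn: &mut heed::RwTxn<'_>,
    index_is_empty: bool,
) -> Result<()>
where
    MF: MergeFunction,
    crate::Error: From<MF::Error>,
{
    write_sorter_into_database(
        sorter,
        database,
        wtxn,
        index_is_empty,
        passthrough_serialize,
        keep_newest_value,
    )
}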