use std::borrow::Cow;
use std::fs::File;
use std::io::{self, BufReader, BufWriter, Seek};
use std::time::Instant;

use grenad::{CompressionType, Sorter};
use heed::types::ByteSlice;
use log::debug;

use super::{ClonableMmap, MergeFn};
use crate::error::InternalError;
use crate::Result;

pub type CursorClonableMmap = io::Cursor<ClonableMmap>;

/// Creates a grenad writer over the given file, configured with the
/// requested compression type and, optionally, compression level.
pub fn create_writer<R: io::Write>(
    typ: grenad::CompressionType,
    level: Option<u32>,
    file: R,
) -> grenad::Writer<BufWriter<R>> {
    let mut builder = grenad::Writer::builder();
    builder.compression_type(typ);
    if let Some(level) = level {
        builder.compression_level(level);
    }
    builder.build(BufWriter::new(file))
}

/// Creates a grenad sorter that merges duplicate keys with `merge` and
/// dumps its in-memory buffer into compressed chunks according to the
/// given limits.
pub fn create_sorter(
    sort_algorithm: grenad::SortAlgorithm,
    merge: MergeFn,
    chunk_compression_type: grenad::CompressionType,
    chunk_compression_level: Option<u32>,
    max_nb_chunks: Option<usize>,
    max_memory: Option<usize>,
) -> grenad::Sorter<MergeFn> {
    let mut builder = grenad::Sorter::builder(merge);
    builder.chunk_compression_type(chunk_compression_type);
    if let Some(level) = chunk_compression_level {
        builder.chunk_compression_level(level);
    }
    if let Some(nb_chunks) = max_nb_chunks {
        builder.max_nb_chunks(nb_chunks);
    }
    if let Some(memory) = max_memory {
        builder.dump_threshold(memory);
        builder.allow_realloc(false);
    }
    builder.sort_algorithm(sort_algorithm);
    builder.build()
}

/// Writes the content of the sorter into a temporary file and returns a
/// reader over it.
pub fn sorter_into_reader(
    sorter: grenad::Sorter<MergeFn>,
    indexer: GrenadParameters,
) -> Result<grenad::Reader<BufReader<File>>> {
    puffin::profile_function!();

    let mut writer = create_writer(
        indexer.chunk_compression_type,
        indexer.chunk_compression_level,
        tempfile::tempfile()?,
    );
    sorter.write_into_stream_writer(&mut writer)?;

    writer_into_reader(writer)
}

/// Consumes the writer, flushes it, and returns a reader over the file it
/// has written.
pub fn writer_into_reader(
    writer: grenad::Writer<BufWriter<File>>,
) -> Result<grenad::Reader<BufReader<File>>> {
    let mut file = writer.into_inner()?.into_inner().map_err(|err| err.into_error())?;
    file.rewind()?;
    grenad::Reader::new(BufReader::new(file)).map_err(Into::into)
}

/// Maps the file backing the given reader into memory and returns a reader
/// over a cheaply cloneable cursor.
///
/// # Safety
///
/// The underlying file must not be modified or truncated while the returned
/// reader, or any clone of its mmap, is alive.
pub unsafe fn as_cloneable_grenad(
    reader: &grenad::Reader<BufReader<File>>,
) -> Result<grenad::Reader<CursorClonableMmap>> {
    let file = reader.get_ref().get_ref();
    let mmap = memmap2::Mmap::map(file)?;
    let cursor = io::Cursor::new(ClonableMmap::from(mmap));
    let reader = grenad::Reader::new(cursor)?;
    Ok(reader)
}

/// Merges a collection of grenad readers, or of tuples of readers, into a
/// single reader per position.
pub trait MergeableReader
where
    Self: Sized,
{
    type Output;

    fn merge(self, merge_fn: MergeFn, indexer: &GrenadParameters) -> Result<Self::Output>;
}

impl MergeableReader for Vec<grenad::Reader<BufReader<File>>> {
    type Output = grenad::Reader<BufReader<File>>;

    fn merge(self, merge_fn: MergeFn, params: &GrenadParameters) -> Result<Self::Output> {
        let mut merger = MergerBuilder::new(merge_fn);
        self.into_iter().try_for_each(|r| merger.push(r))?;
        merger.finish(params)
    }
}

impl MergeableReader for Vec<(grenad::Reader<BufReader<File>>, grenad::Reader<BufReader<File>>)> {
    type Output = (grenad::Reader<BufReader<File>>, grenad::Reader<BufReader<File>>);

    fn merge(self, merge_fn: MergeFn, params: &GrenadParameters) -> Result<Self::Output> {
        let mut m1 = MergerBuilder::new(merge_fn);
        let mut m2 = MergerBuilder::new(merge_fn);
        for (r1, r2) in self.into_iter() {
            m1.push(r1)?;
            m2.push(r2)?;
        }
        Ok((m1.finish(params)?, m2.finish(params)?))
    }
}

impl MergeableReader
    for Vec<(
        grenad::Reader<BufReader<File>>,
        grenad::Reader<BufReader<File>>,
        grenad::Reader<BufReader<File>>,
    )>
{
    type Output = (
        grenad::Reader<BufReader<File>>,
        grenad::Reader<BufReader<File>>,
        grenad::Reader<BufReader<File>>,
    );

    fn merge(self, merge_fn: MergeFn, params: &GrenadParameters) -> Result<Self::Output> {
        let mut m1 = MergerBuilder::new(merge_fn);
        let mut m2 = MergerBuilder::new(merge_fn);
        let mut m3 = MergerBuilder::new(merge_fn);
        for (r1, r2, r3) in self.into_iter() {
            m1.push(r1)?;
            m2.push(r2)?;
            m3.push(r3)?;
        }
        Ok((m1.finish(params)?, m2.finish(params)?, m3.finish(params)?))
    }
}
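// A minimal sketch of how the `MergeableReader` impls above are typically
// driven (this helper is illustrative and not part of the original module):
// readers produced by independent extraction steps are collected into a
// `Vec` and folded into a single reader with one `merge` call. It assumes
// `merge_ignore_values` (defined at the bottom of this file) as the
// MergeFn, which keeps only the keys and discards every value.
#[allow(dead_code)]
fn merge_readers_example(
    readers: Vec<grenad::Reader<BufReader<File>>>,
) -> Result<grenad::Reader<BufReader<File>>> {
    readers.merge(merge_ignore_values, &GrenadParameters::default())
}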
/// A thin wrapper around `grenad::MergerBuilder` that merges the pushed
/// readers into a temporary file.
struct MergerBuilder<R>(grenad::MergerBuilder<R, MergeFn>);

impl<R: io::Read + io::Seek> MergerBuilder<R> {
    fn new(merge_fn: MergeFn) -> Self {
        Self(grenad::MergerBuilder::new(merge_fn))
    }

    fn push(&mut self, reader: grenad::Reader<R>) -> Result<()> {
        self.0.push(reader.into_cursor()?);
        Ok(())
    }

    fn finish(self, params: &GrenadParameters) -> Result<grenad::Reader<BufReader<File>>> {
        let merger = self.0.build();
        let mut writer = create_writer(
            params.chunk_compression_type,
            params.chunk_compression_level,
            tempfile::tempfile()?,
        );
        merger.write_into_stream_writer(&mut writer)?;

        writer_into_reader(writer)
    }
}

#[derive(Debug, Clone, Copy)]
pub struct GrenadParameters {
    pub chunk_compression_type: CompressionType,
    pub chunk_compression_level: Option<u32>,
    pub max_memory: Option<usize>,
    pub max_nb_chunks: Option<usize>,
}

impl Default for GrenadParameters {
    fn default() -> Self {
        Self {
            chunk_compression_type: CompressionType::None,
            chunk_compression_level: None,
            max_memory: None,
            max_nb_chunks: None,
        }
    }
}

impl GrenadParameters {
    /// Returns the part of `max_memory` that a single thread may use, i.e.
    /// `max_memory` divided by the number of threads in the current rayon
    /// thread pool.
    ///
    /// This should be called from inside a rayon thread pool; otherwise it
    /// falls back to the global number of threads.
    pub fn max_memory_by_thread(&self) -> Option<usize> {
        self.max_memory.map(|max_memory| max_memory / rayon::current_num_threads())
    }
}

/// Returns an iterator that outputs grenad readers of obkv documents
/// with a maximum size of approximately `documents_chunk_size`.
///
/// Each grenad entry is composed of an incremental document id, big-endian
/// encoded, as the key and an obkv object as the value, where the obkv keys
/// are `u8` field ids and the obkv values are UTF-8 encoded strings.
pub fn grenad_obkv_into_chunks<R: io::Read + io::Seek>(
    reader: grenad::Reader<R>,
    indexer: GrenadParameters,
    documents_chunk_size: usize,
) -> Result<impl Iterator<Item = Result<grenad::Reader<BufReader<File>>>>> {
    let mut continue_reading = true;
    let mut cursor = reader.into_cursor()?;

    let mut transposer = move || {
        if !continue_reading {
            return Ok(None);
        }

        let mut current_chunk_size = 0u64;
        let mut obkv_documents = create_writer(
            indexer.chunk_compression_type,
            indexer.chunk_compression_level,
            tempfile::tempfile()?,
        );

        while let Some((document_id, obkv)) = cursor.move_on_next()? {
            obkv_documents.insert(document_id, obkv)?;
            current_chunk_size += document_id.len() as u64 + obkv.len() as u64;

            if current_chunk_size >= documents_chunk_size as u64 {
                return writer_into_reader(obkv_documents).map(Some);
            }
        }

        continue_reading = false;
        writer_into_reader(obkv_documents).map(Some)
    };

    Ok(std::iter::from_fn(move || transposer().transpose()))
}
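// A hedged usage sketch for `grenad_obkv_into_chunks` (this helper is
// illustrative only, not part of the original module): the returned
// iterator is drained chunk by chunk, and each chunk is a self-contained
// reader that can be processed independently of the others. The 4 MiB
// chunk size is an arbitrary value picked for the example.
#[allow(dead_code)]
fn for_each_obkv_chunk<R: io::Read + io::Seek>(
    documents: grenad::Reader<R>,
    indexer: GrenadParameters,
) -> Result<()> {
    for chunk in grenad_obkv_into_chunks(documents, indexer, 4 * 1024 * 1024)? {
        let mut cursor = chunk?.into_cursor()?;
        while let Some((document_id, obkv)) = cursor.move_on_next()? {
            // `document_id` is the big-endian encoded incremental id and
            // `obkv` the serialized obkv object described above.
            let _ = (document_id, obkv);
        }
    }
    Ok(())
}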
/// Writes the content of the sorter into the given LMDB database, appending
/// entries when the database is empty and otherwise merging each new value
/// with the existing one using `merge`.
pub fn sorter_into_lmdb_database(
    wtxn: &mut heed::RwTxn,
    database: heed::PolyDatabase,
    sorter: Sorter<MergeFn>,
    merge: MergeFn,
) -> Result<()> {
    puffin::profile_function!();
    debug!("Writing MTBL sorter...");
    let before = Instant::now();

    let mut merger_iter = sorter.into_stream_merger_iter()?;
    if database.is_empty(wtxn)? {
        let mut out_iter = database.iter_mut::<_, ByteSlice, ByteSlice>(wtxn)?;
        while let Some((k, v)) = merger_iter.next()? {
            // safety: we don't keep references from inside the LMDB database.
            unsafe { out_iter.append(k, v)? };
        }
    } else {
        while let Some((k, v)) = merger_iter.next()? {
            let mut iter = database.prefix_iter_mut::<_, ByteSlice, ByteSlice>(wtxn, k)?;
            match iter.next().transpose()? {
                Some((key, old_val)) if key == k => {
                    let vals = vec![Cow::Borrowed(old_val), Cow::Borrowed(v)];
                    let val = merge(k, &vals).map_err(|_| {
                        // TODO just wrap this error?
                        InternalError::IndexingMergingKeys { process: "get-put-merge" }
                    })?;
                    // safety: we don't keep references from inside the LMDB database.
                    unsafe { iter.put_current(k, &val)? };
                }
                _ => {
                    drop(iter);
                    database.put::<_, ByteSlice, ByteSlice>(wtxn, k, v)?;
                }
            }
        }
    }

    debug!("MTBL sorter written in {:.02?}!", before.elapsed());

    Ok(())
}

/// Used when trying to merge readers, but you don't actually care about the values.
pub fn merge_ignore_values<'a>(_key: &[u8], _values: &[Cow<'a, [u8]>]) -> Result<Cow<'a, [u8]>> {
    Ok(Cow::Owned(Vec::new()))
}
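#[cfg(test)]
mod tests {
    use super::*;

    // A minimal round-trip sketch for the writer/reader helpers above:
    // entries written through `create_writer` must come back, in order,
    // from the reader returned by `writer_into_reader`. Compression is
    // left disabled to keep the example small; the keys and values are
    // arbitrary bytes chosen for the test.
    #[test]
    fn write_then_read_back() {
        let mut writer =
            create_writer(CompressionType::None, None, tempfile::tempfile().unwrap());
        // grenad requires keys to be inserted in lexicographic order.
        writer.insert(b"first", b"foo").unwrap();
        writer.insert(b"second", b"bar").unwrap();

        let mut cursor = writer_into_reader(writer).unwrap().into_cursor().unwrap();
        assert_eq!(cursor.move_on_next().unwrap(), Some((&b"first"[..], &b"foo"[..])));
        assert_eq!(cursor.move_on_next().unwrap(), Some((&b"second"[..], &b"bar"[..])));
        assert_eq!(cursor.move_on_next().unwrap(), None);
    }
}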