From a122d3d466dd07f598f024d4c781ddb4944c1aa8 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Cl=C3=A9ment=20Renault?=
Date: Tue, 20 Oct 2020 14:20:17 +0200
Subject: [PATCH] Export the indexing part into a module

---
 src/indexing/merge_function.rs |  68 +++
 src/indexing/mod.rs            | 336 +++++++++++++
 src/indexing/store.rs          | 443 ++++++++++++++++++
 src/lib.rs                     |   1 +
 src/subcommand/indexer.rs      | 833 +--------------------------------
 src/subcommand/serve.rs        |   4 +
 6 files changed, 861 insertions(+), 824 deletions(-)
 create mode 100644 src/indexing/merge_function.rs
 create mode 100644 src/indexing/mod.rs
 create mode 100644 src/indexing/store.rs

diff --git a/src/indexing/merge_function.rs b/src/indexing/merge_function.rs
new file mode 100644
index 000000000..f9506db1c
--- /dev/null
+++ b/src/indexing/merge_function.rs
@@ -0,0 +1,68 @@
+use bstr::ByteSlice as _;
+use fst::IntoStreamer;
+use roaring::RoaringBitmap;
+
+use crate::heed_codec::CboRoaringBitmapCodec;
+
+const WORDS_FST_KEY: &[u8] = crate::WORDS_FST_KEY.as_bytes();
+const HEADERS_KEY: &[u8] = crate::HEADERS_KEY.as_bytes();
+const DOCUMENTS_IDS_KEY: &[u8] = crate::DOCUMENTS_IDS_KEY.as_bytes();
+
+pub fn main_merge(key: &[u8], values: &[Vec<u8>]) -> Result<Vec<u8>, ()> {
+    match key {
+        WORDS_FST_KEY => {
+            let fsts: Vec<_> = values.iter().map(|v| fst::Set::new(v).unwrap()).collect();
+
+            // Union of the FSTs
+            let mut op = fst::set::OpBuilder::new();
+            fsts.iter().for_each(|fst| op.push(fst.into_stream()));
+            let op = op.r#union();
+
+            let mut build = fst::SetBuilder::memory();
+            build.extend_stream(op.into_stream()).unwrap();
+            Ok(build.into_inner().unwrap())
+        },
+        HEADERS_KEY => {
+            assert!(values.windows(2).all(|vs| vs[0] == vs[1]));
+            Ok(values[0].to_vec())
+        },
+        DOCUMENTS_IDS_KEY => word_docids_merge(&[], values),
+        otherwise => panic!("wut {:?}", otherwise),
+    }
+}
+
+pub fn word_docids_merge(_key: &[u8], values: &[Vec<u8>]) -> Result<Vec<u8>, ()> {
+    let (head, tail) = values.split_first().unwrap();
+    let mut head = RoaringBitmap::deserialize_from(head.as_slice()).unwrap();
+
+    for value in tail {
+        let bitmap = RoaringBitmap::deserialize_from(value.as_slice()).unwrap();
+        head.union_with(&bitmap);
+    }
+
+    let mut vec = Vec::with_capacity(head.serialized_size());
+    head.serialize_into(&mut vec).unwrap();
+    Ok(vec)
+}
+
+pub fn docid_word_positions_merge(key: &[u8], _values: &[Vec<u8>]) -> Result<Vec<u8>, ()> {
+    panic!("merging docid word positions is an error ({:?})", key.as_bstr())
+}
+
+pub fn words_pairs_proximities_docids_merge(_key: &[u8], values: &[Vec<u8>]) -> Result<Vec<u8>, ()> {
+    let (head, tail) = values.split_first().unwrap();
+    let mut head = CboRoaringBitmapCodec::deserialize_from(head.as_slice()).unwrap();
+
+    for value in tail {
+        let bitmap = CboRoaringBitmapCodec::deserialize_from(value.as_slice()).unwrap();
+        head.union_with(&bitmap);
+    }
+
+    let mut vec = Vec::new();
+    CboRoaringBitmapCodec::serialize_into(&head, &mut vec).unwrap();
+    Ok(vec)
+}
+
+pub fn documents_merge(key: &[u8], _values: &[Vec<u8>]) -> Result<Vec<u8>, ()> {
+    panic!("merging documents is an error ({:?})", key.as_bstr())
+}
diff --git a/src/indexing/mod.rs b/src/indexing/mod.rs
new file mode 100644
index 000000000..fd5a8e607
--- /dev/null
+++ b/src/indexing/mod.rs
@@ -0,0 +1,336 @@
+use std::fs::File;
+use std::io::{self, Read, Seek, SeekFrom};
+use std::sync::mpsc::sync_channel;
+use std::time::Instant;
+
+use anyhow::Context;
+use bstr::ByteSlice as _;
+use flate2::read::GzDecoder;
+use grenad::{Writer, Sorter, Merger, Reader, FileFuse, CompressionType};
+use heed::types::ByteSlice;
+use log::{debug, info};
+use rayon::prelude::*;
+use structopt::StructOpt;
+use tempfile::tempfile;
+
+use crate::Index;
+use self::store::Store;
+use self::merge_function::{
+    main_merge, word_docids_merge, words_pairs_proximities_docids_merge,
+    docid_word_positions_merge, documents_merge,
+};
+
+mod store;
+mod merge_function;
+
+#[derive(Debug, StructOpt)]
+pub struct IndexerOpt {
+    /// The number of documents to skip before printing
+    /// a log regarding the indexing advancement.
+    #[structopt(long, default_value = "1000000")] // 1m
+    log_every_n: usize,
+
+    /// MTBL max number of chunks.
+    #[structopt(long)]
+    max_nb_chunks: Option<usize>,
+
+    /// The maximum amount of memory to use for the MTBL buffer. It is recommended
+    /// to use something like 80%-90% of the available memory.
+    ///
+    /// It is automatically split by the number of jobs, e.g. if you use 7 jobs
+    /// and 7 GB of max memory, each thread will use a maximum of 1 GB.
+    #[structopt(long, default_value = "7516192768")] // 7 GB
+    max_memory: usize,
+
+    /// Size of the linked hash map cache when indexing.
+    /// The bigger it is, the faster the indexing is but the more memory it takes.
+    #[structopt(long, default_value = "500")]
+    linked_hash_map_size: usize,
+
+    /// The name of the compression algorithm to use when compressing intermediate
+    /// chunks while indexing documents.
+    ///
+    /// Choosing a fast algorithm will make the indexing faster but may consume more memory.
+    #[structopt(long, default_value = "snappy", possible_values = &["snappy", "zlib", "lz4", "lz4hc", "zstd"])]
+    chunk_compression_type: CompressionType,
+
+    /// The level of compression of the chosen algorithm.
+    #[structopt(long, requires = "chunk-compression-type")]
+    chunk_compression_level: Option<u32>,
+
+    /// The number of bytes to remove from the beginning of the chunks while reading/sorting
+    /// or merging them.
+    ///
+    /// File fusing must only be enabled on file systems that support the `FALLOC_FL_COLLAPSE_RANGE`
+    /// flag (i.e. ext4 and XFS). File fusing will only work if the `enable-chunk-fusing` flag is set.
+    #[structopt(long, default_value = "4294967296")] // 4 GB
+    chunk_fusing_shrink_size: u64,
+
+    /// Enable the chunk fusing or not, this reduces the amount of disk space used by a factor of 2.
+    #[structopt(long)]
+    enable_chunk_fusing: bool,
+
+    /// Number of parallel jobs for indexing, defaults to # of CPUs.
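+    ///
+    /// When this is not set the indexer builds its rayon thread pool with `num_threads(0)`,
+    /// which lets rayon pick its default thread count (usually one per logical CPU).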
+ #[structopt(long)] + indexing_jobs: Option, +} + +type MergeFn = fn(&[u8], &[Vec]) -> Result, ()>; + +fn create_writer(typ: CompressionType, level: Option, file: File) -> io::Result> { + let mut builder = Writer::builder(); + builder.compression_type(typ); + if let Some(level) = level { + builder.compression_level(level); + } + builder.build(file) +} + +fn create_sorter( + merge: MergeFn, + chunk_compression_type: CompressionType, + chunk_compression_level: Option, + chunk_fusing_shrink_size: Option, + max_nb_chunks: Option, + max_memory: Option, +) -> Sorter +{ + let mut builder = Sorter::builder(merge); + if let Some(shrink_size) = chunk_fusing_shrink_size { + builder.file_fusing_shrink_size(shrink_size); + } + builder.chunk_compression_type(chunk_compression_type); + if let Some(level) = chunk_compression_level { + builder.chunk_compression_level(level); + } + if let Some(nb_chunks) = max_nb_chunks { + builder.max_nb_chunks(nb_chunks); + } + if let Some(memory) = max_memory { + builder.max_memory(memory); + } + builder.build() +} + +fn writer_into_reader(writer: Writer, shrink_size: Option) -> anyhow::Result> { + let mut file = writer.into_inner()?; + file.seek(SeekFrom::Start(0))?; + let file = if let Some(shrink_size) = shrink_size { + FileFuse::builder().shrink_size(shrink_size).build(file) + } else { + FileFuse::new(file) + }; + Reader::new(file).map_err(Into::into) +} + +fn merge_readers(sources: Vec>, merge: MergeFn) -> Merger { + let mut builder = Merger::builder(merge); + builder.extend(sources); + builder.build() +} + +fn merge_into_lmdb_database( + wtxn: &mut heed::RwTxn, + database: heed::PolyDatabase, + sources: Vec>, + merge: MergeFn, +) -> anyhow::Result<()> { + debug!("Merging {} MTBL stores...", sources.len()); + let before = Instant::now(); + + let merger = merge_readers(sources, merge); + let mut in_iter = merger.into_merge_iter()?; + + let mut out_iter = database.iter_mut::<_, ByteSlice, ByteSlice>(wtxn)?; + while let Some((k, v)) = in_iter.next()? { + out_iter.append(k, v).with_context(|| format!("writing {:?} into LMDB", k.as_bstr()))?; + } + + debug!("MTBL stores merged in {:.02?}!", before.elapsed()); + Ok(()) +} + +fn write_into_lmdb_database( + wtxn: &mut heed::RwTxn, + database: heed::PolyDatabase, + mut reader: Reader, +) -> anyhow::Result<()> { + debug!("Writing MTBL stores..."); + let before = Instant::now(); + + let mut out_iter = database.iter_mut::<_, ByteSlice, ByteSlice>(wtxn)?; + while let Some((k, v)) = reader.next()? 
{ + out_iter.append(k, v).with_context(|| format!("writing {:?} into LMDB", k.as_bstr()))?; + } + + debug!("MTBL stores merged in {:.02?}!", before.elapsed()); + Ok(()) +} + +fn csv_bytes_readers<'a>( + content: &'a [u8], + gzipped: bool, + count: usize, +) -> Vec>> +{ + let mut readers = Vec::new(); + + for _ in 0..count { + let content = if gzipped { + Box::new(GzDecoder::new(content)) as Box + } else { + Box::new(content) as Box + }; + let reader = csv::Reader::from_reader(content); + readers.push(reader); + } + + readers +} + +pub fn run<'a>( + env: &heed::Env, + index: &Index, + opt: IndexerOpt, + content: &'a [u8], + gzipped: bool, +) -> anyhow::Result<()> +{ + let jobs = opt.indexing_jobs.unwrap_or(0); + let pool = rayon::ThreadPoolBuilder::new().num_threads(jobs).build()?; + pool.install(|| run_intern(env, index, opt, content, gzipped)) +} + +fn run_intern<'a>( + env: &heed::Env, + index: &Index, + opt: IndexerOpt, + content: &'a [u8], + gzipped: bool, +) -> anyhow::Result<()> +{ + let before_indexing = Instant::now(); + let num_threads = rayon::current_num_threads(); + let linked_hash_map_size = opt.linked_hash_map_size; + let max_nb_chunks = opt.max_nb_chunks; + let max_memory_by_job = opt.max_memory / num_threads; + let chunk_compression_type = opt.chunk_compression_type; + let chunk_compression_level = opt.chunk_compression_level; + let log_every_n = opt.log_every_n; + + let chunk_fusing_shrink_size = if opt.enable_chunk_fusing { + Some(opt.chunk_fusing_shrink_size) + } else { + None + }; + + let readers = csv_bytes_readers(content, gzipped, num_threads) + .into_par_iter() + .enumerate() + .map(|(i, rdr)| { + let store = Store::new( + linked_hash_map_size, + max_nb_chunks, + Some(max_memory_by_job), + chunk_compression_type, + chunk_compression_level, + chunk_fusing_shrink_size, + )?; + store.index_csv(rdr, i, num_threads, log_every_n) + }) + .collect::, _>>()?; + + let mut main_readers = Vec::with_capacity(readers.len()); + let mut word_docids_readers = Vec::with_capacity(readers.len()); + let mut docid_word_positions_readers = Vec::with_capacity(readers.len()); + let mut words_pairs_proximities_docids_readers = Vec::with_capacity(readers.len()); + let mut documents_readers = Vec::with_capacity(readers.len()); + readers.into_iter().for_each(|readers| { + main_readers.push(readers.main); + word_docids_readers.push(readers.word_docids); + docid_word_positions_readers.push(readers.docid_word_positions); + words_pairs_proximities_docids_readers.push(readers.words_pairs_proximities_docids); + documents_readers.push(readers.documents); + }); + + // This is the function that merge the readers + // by using the given merge function. + let merge_readers = move |readers, merge| { + let mut writer = tempfile().and_then(|f| { + create_writer(chunk_compression_type, chunk_compression_level, f) + })?; + let merger = merge_readers(readers, merge); + merger.write_into(&mut writer)?; + writer_into_reader(writer, chunk_fusing_shrink_size) + }; + + // The enum and the channel which is used to transfert + // the readers merges potentially done on another thread. 
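+    // The channel holds 3 slots, one per database type merged on the rayon
+    // thread pool below, so the sending side never has to block.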
+ enum DatabaseType { Main, WordDocids, WordsPairsProximitiesDocids }; + let (sender, receiver) = sync_channel(3); + + debug!("Merging the main, word docids and words pairs proximity docids in parallel..."); + rayon::spawn(move || { + vec![ + (DatabaseType::Main, main_readers, main_merge as MergeFn), + (DatabaseType::WordDocids, word_docids_readers, word_docids_merge), + ( + DatabaseType::WordsPairsProximitiesDocids, + words_pairs_proximities_docids_readers, + words_pairs_proximities_docids_merge, + ), + ] + .into_par_iter() + .for_each(|(dbtype, readers, merge)| { + let result = merge_readers(readers, merge); + sender.send((dbtype, result)).unwrap(); + }); + }); + + let mut wtxn = env.write_txn()?; + + debug!("Writing the docid word positions into LMDB on disk..."); + merge_into_lmdb_database( + &mut wtxn, + *index.docid_word_positions.as_polymorph(), + docid_word_positions_readers, + docid_word_positions_merge, + )?; + + debug!("Writing the documents into LMDB on disk..."); + merge_into_lmdb_database( + &mut wtxn, + *index.documents.as_polymorph(), + documents_readers, + documents_merge, + )?; + + for (db_type, result) in receiver { + let content = result?; + match db_type { + DatabaseType::Main => { + debug!("Writing the main elements into LMDB on disk..."); + write_into_lmdb_database(&mut wtxn, index.main, content)?; + }, + DatabaseType::WordDocids => { + debug!("Writing the words docids into LMDB on disk..."); + let db = *index.word_docids.as_polymorph(); + write_into_lmdb_database(&mut wtxn, db, content)?; + }, + DatabaseType::WordsPairsProximitiesDocids => { + debug!("Writing the words pairs proximities docids into LMDB on disk..."); + let db = *index.word_pair_proximity_docids.as_polymorph(); + write_into_lmdb_database(&mut wtxn, db, content)?; + }, + } + } + + debug!("Retrieving the number of documents..."); + let count = index.number_of_documents(&wtxn)?; + + wtxn.commit()?; + + info!("Wrote {} documents in {:.02?}", count, before_indexing.elapsed()); + + Ok(()) +} diff --git a/src/indexing/store.rs b/src/indexing/store.rs new file mode 100644 index 000000000..8f9fc15df --- /dev/null +++ b/src/indexing/store.rs @@ -0,0 +1,443 @@ +use std::collections::{BTreeMap, HashMap}; +use std::convert::TryFrom; +use std::fs::File; +use std::io::Read; +use std::iter::FromIterator; +use std::time::Instant; +use std::{cmp, iter}; + +use anyhow::Context; +use bstr::ByteSlice as _; +use csv::StringRecord; +use heed::BytesEncode; +use linked_hash_map::LinkedHashMap; +use log::{debug, info}; +use grenad::{Reader, FileFuse, Writer, Sorter, CompressionType}; +use roaring::RoaringBitmap; +use tempfile::tempfile; + +use crate::heed_codec::{CsvStringRecordCodec, BoRoaringBitmapCodec, CboRoaringBitmapCodec}; +use crate::tokenizer::{simple_tokenizer, only_token}; +use crate::{SmallVec32, Position, DocumentId}; + +use super::{MergeFn, create_writer, create_sorter, writer_into_reader}; +use super::merge_function::{main_merge, word_docids_merge, words_pairs_proximities_docids_merge}; + +const LMDB_MAX_KEY_LENGTH: usize = 511; +const ONE_KILOBYTE: usize = 1024 * 1024; + +const MAX_POSITION: usize = 1000; +const MAX_ATTRIBUTES: usize = u32::max_value() as usize / MAX_POSITION; + +const WORDS_FST_KEY: &[u8] = crate::WORDS_FST_KEY.as_bytes(); +const HEADERS_KEY: &[u8] = crate::HEADERS_KEY.as_bytes(); +const DOCUMENTS_IDS_KEY: &[u8] = crate::DOCUMENTS_IDS_KEY.as_bytes(); + +pub struct Readers { + pub main: Reader, + pub word_docids: Reader, + pub docid_word_positions: Reader, + pub words_pairs_proximities_docids: 
Reader, + pub documents: Reader, +} + +pub struct Store { + word_docids: LinkedHashMap, RoaringBitmap>, + word_docids_limit: usize, + words_pairs_proximities_docids: LinkedHashMap<(SmallVec32, SmallVec32, u8), RoaringBitmap>, + words_pairs_proximities_docids_limit: usize, + documents_ids: RoaringBitmap, + // MTBL parameters + chunk_compression_type: CompressionType, + chunk_compression_level: Option, + chunk_fusing_shrink_size: Option, + // MTBL sorters + main_sorter: Sorter, + word_docids_sorter: Sorter, + words_pairs_proximities_docids_sorter: Sorter, + // MTBL writers + docid_word_positions_writer: Writer, + documents_writer: Writer, +} + +impl Store { + pub fn new( + linked_hash_map_size: usize, + max_nb_chunks: Option, + max_memory: Option, + chunk_compression_type: CompressionType, + chunk_compression_level: Option, + chunk_fusing_shrink_size: Option, + ) -> anyhow::Result + { + // We divide the max memory by the number of sorter the Store have. + let max_memory = max_memory.map(|mm| cmp::max(ONE_KILOBYTE, mm / 3)); + + let main_sorter = create_sorter( + main_merge, + chunk_compression_type, + chunk_compression_level, + chunk_fusing_shrink_size, + max_nb_chunks, + max_memory, + ); + let word_docids_sorter = create_sorter( + word_docids_merge, + chunk_compression_type, + chunk_compression_level, + chunk_fusing_shrink_size, + max_nb_chunks, + max_memory, + ); + let words_pairs_proximities_docids_sorter = create_sorter( + words_pairs_proximities_docids_merge, + chunk_compression_type, + chunk_compression_level, + chunk_fusing_shrink_size, + max_nb_chunks, + max_memory, + ); + + let documents_writer = tempfile().and_then(|f| { + create_writer(chunk_compression_type, chunk_compression_level, f) + })?; + let docid_word_positions_writer = tempfile().and_then(|f| { + create_writer(chunk_compression_type, chunk_compression_level, f) + })?; + + Ok(Store { + word_docids: LinkedHashMap::with_capacity(linked_hash_map_size), + word_docids_limit: linked_hash_map_size, + words_pairs_proximities_docids: LinkedHashMap::with_capacity(linked_hash_map_size), + words_pairs_proximities_docids_limit: linked_hash_map_size, + documents_ids: RoaringBitmap::new(), + chunk_compression_type, + chunk_compression_level, + chunk_fusing_shrink_size, + + main_sorter, + word_docids_sorter, + words_pairs_proximities_docids_sorter, + + docid_word_positions_writer, + documents_writer, + }) + } + + // Save the documents ids under the position and word we have seen it. + fn insert_word_docid(&mut self, word: &str, id: DocumentId) -> anyhow::Result<()> { + // if get_refresh finds the element it is assured to be at the end of the linked hash map. + match self.word_docids.get_refresh(word.as_bytes()) { + Some(old) => { old.insert(id); }, + None => { + let word_vec = SmallVec32::from(word.as_bytes()); + // A newly inserted element is append at the end of the linked hash map. + self.word_docids.insert(word_vec, RoaringBitmap::from_iter(Some(id))); + // If the word docids just reached it's capacity we must make sure to remove + // one element, this way next time we insert we doesn't grow the capacity. + if self.word_docids.len() == self.word_docids_limit { + // Removing the front element is equivalent to removing the LRU element. + let lru = self.word_docids.pop_front(); + Self::write_word_docids(&mut self.word_docids_sorter, lru)?; + } + } + } + Ok(()) + } + + // Save the documents ids under the words pairs proximities that it contains. 
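+    // The (word1, word2, proximity) keys are kept in a bounded LinkedHashMap used as an
+    // LRU cache: when it overflows, the least recently refreshed entries are flushed into
+    // the words pairs proximities docids sorter (see the end of this function).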
+ fn insert_words_pairs_proximities_docids<'a>( + &mut self, + words_pairs_proximities: impl IntoIterator, + id: DocumentId, + ) -> anyhow::Result<()> + { + for ((w1, w2), prox) in words_pairs_proximities { + let w1 = SmallVec32::from(w1.as_bytes()); + let w2 = SmallVec32::from(w2.as_bytes()); + let key = (w1, w2, prox); + // if get_refresh finds the element it is assured + // to be at the end of the linked hash map. + match self.words_pairs_proximities_docids.get_refresh(&key) { + Some(old) => { old.insert(id); }, + None => { + // A newly inserted element is append at the end of the linked hash map. + let ids = RoaringBitmap::from_iter(Some(id)); + self.words_pairs_proximities_docids.insert(key, ids); + } + } + } + + // If the linked hashmap is over capacity we must remove the overflowing elements. + let len = self.words_pairs_proximities_docids.len(); + let overflow = len.checked_sub(self.words_pairs_proximities_docids_limit); + if let Some(overflow) = overflow { + let mut lrus = Vec::with_capacity(overflow); + // Removing front elements is equivalent to removing the LRUs. + let iter = iter::from_fn(|| self.words_pairs_proximities_docids.pop_front()); + iter.take(overflow).for_each(|x| lrus.push(x)); + Self::write_words_pairs_proximities(&mut self.words_pairs_proximities_docids_sorter, lrus)?; + } + + Ok(()) + } + + fn write_headers(&mut self, headers: &StringRecord) -> anyhow::Result<()> { + let headers = CsvStringRecordCodec::bytes_encode(headers) + .with_context(|| format!("could not encode csv record"))?; + Ok(self.main_sorter.insert(HEADERS_KEY, headers)?) + } + + fn write_document( + &mut self, + document_id: DocumentId, + words_positions: &HashMap>, + record: &StringRecord, + ) -> anyhow::Result<()> + { + // We compute the list of words pairs proximities (self-join) and write it directly to disk. + let words_pair_proximities = compute_words_pair_proximities(&words_positions); + self.insert_words_pairs_proximities_docids(words_pair_proximities, document_id)?; + + // We store document_id associated with all the words the record contains. + for (word, _) in words_positions { + self.insert_word_docid(word, document_id)?; + } + + let record = CsvStringRecordCodec::bytes_encode(record) + .with_context(|| format!("could not encode CSV record"))?; + + self.documents_ids.insert(document_id); + self.documents_writer.insert(document_id.to_be_bytes(), record)?; + Self::write_docid_word_positions(&mut self.docid_word_positions_writer, document_id, words_positions)?; + + Ok(()) + } + + fn write_words_pairs_proximities( + sorter: &mut Sorter, + iter: impl IntoIterator, SmallVec32, u8), RoaringBitmap)>, + ) -> anyhow::Result<()> + { + let mut key = Vec::new(); + let mut buffer = Vec::new(); + + for ((w1, w2, min_prox), docids) in iter { + key.clear(); + key.extend_from_slice(w1.as_bytes()); + key.push(0); + key.extend_from_slice(w2.as_bytes()); + // Storing the minimun proximity found between those words + key.push(min_prox); + // We serialize the document ids into a buffer + buffer.clear(); + buffer.reserve(CboRoaringBitmapCodec::serialized_size(&docids)); + CboRoaringBitmapCodec::serialize_into(&docids, &mut buffer)?; + // that we write under the generated key into MTBL + if lmdb_key_valid_size(&key) { + sorter.insert(&key, &buffer)?; + } + } + + Ok(()) + } + + fn write_docid_word_positions( + writer: &mut Writer, + id: DocumentId, + words_positions: &HashMap>, + ) -> anyhow::Result<()> + { + // We prefix the words by the document id. 
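+        // Keys are laid out as the big-endian document id followed by the word bytes,
+        // so all the entries of a given document are written contiguously.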
+ let mut key = id.to_be_bytes().to_vec(); + let base_size = key.len(); + + // We order the words lexicographically, this way we avoid passing by a sorter. + let words_positions = BTreeMap::from_iter(words_positions); + + for (word, positions) in words_positions { + key.truncate(base_size); + key.extend_from_slice(word.as_bytes()); + // We serialize the positions into a buffer. + let positions = RoaringBitmap::from_iter(positions.iter().cloned()); + let bytes = BoRoaringBitmapCodec::bytes_encode(&positions) + .with_context(|| "could not serialize positions")?; + // that we write under the generated key into MTBL + if lmdb_key_valid_size(&key) { + writer.insert(&key, &bytes)?; + } + } + + Ok(()) + } + + fn write_word_docids(sorter: &mut Sorter, iter: I) -> anyhow::Result<()> + where I: IntoIterator, RoaringBitmap)> + { + let mut key = Vec::new(); + let mut buffer = Vec::new(); + + for (word, ids) in iter { + key.clear(); + key.extend_from_slice(&word); + // We serialize the document ids into a buffer + buffer.clear(); + let ids = RoaringBitmap::from_iter(ids); + buffer.reserve(ids.serialized_size()); + ids.serialize_into(&mut buffer)?; + // that we write under the generated key into MTBL + if lmdb_key_valid_size(&key) { + sorter.insert(&key, &buffer)?; + } + } + + Ok(()) + } + + fn write_documents_ids(sorter: &mut Sorter, ids: RoaringBitmap) -> anyhow::Result<()> { + let mut buffer = Vec::with_capacity(ids.serialized_size()); + ids.serialize_into(&mut buffer)?; + sorter.insert(DOCUMENTS_IDS_KEY, &buffer)?; + Ok(()) + } + + pub fn index_csv<'a>( + mut self, + mut rdr: csv::Reader>, + thread_index: usize, + num_threads: usize, + log_every_n: usize, + ) -> anyhow::Result + { + debug!("{:?}: Indexing in a Store...", thread_index); + + // Write the headers into the store. + let headers = rdr.headers()?; + self.write_headers(&headers)?; + + let mut before = Instant::now(); + let mut document_id: usize = 0; + let mut document = csv::StringRecord::new(); + let mut words_positions = HashMap::new(); + + while rdr.read_record(&mut document)? { + // We skip documents that must not be indexed by this thread. + if document_id % num_threads == thread_index { + // This is a log routine that we do every `log_every_n` documents. + if document_id % log_every_n == 0 { + let count = format_count(document_id); + info!("We have seen {} documents so far ({:.02?}).", count, before.elapsed()); + before = Instant::now(); + } + + let document_id = DocumentId::try_from(document_id).context("generated id is too big")?; + for (attr, content) in document.iter().enumerate().take(MAX_ATTRIBUTES) { + for (pos, token) in simple_tokenizer(&content).filter_map(only_token).enumerate().take(MAX_POSITION) { + let word = token.to_lowercase(); + let position = (attr * MAX_POSITION + pos) as u32; + words_positions.entry(word).or_insert_with(SmallVec32::new).push(position); + } + } + + // We write the document in the documents store. + self.write_document(document_id, &words_positions, &document)?; + words_positions.clear(); + } + + // Compute the document id of the next document. 
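+            // Every thread reads the whole CSV and increments this counter for each record,
+            // so all the threads agree on the ids and the `document_id % num_threads` check
+            // above is what actually partitions the work between them.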
+ document_id = document_id + 1; + } + + let readers = self.finish()?; + debug!("{:?}: Store created!", thread_index); + Ok(readers) + } + + fn finish(mut self) -> anyhow::Result { + let comp_type = self.chunk_compression_type; + let comp_level = self.chunk_compression_level; + let shrink_size = self.chunk_fusing_shrink_size; + + Self::write_word_docids(&mut self.word_docids_sorter, self.word_docids)?; + Self::write_documents_ids(&mut self.main_sorter, self.documents_ids)?; + Self::write_words_pairs_proximities( + &mut self.words_pairs_proximities_docids_sorter, + self.words_pairs_proximities_docids, + )?; + + let mut word_docids_wtr = tempfile().and_then(|f| create_writer(comp_type, comp_level, f))?; + let mut builder = fst::SetBuilder::memory(); + + let mut iter = self.word_docids_sorter.into_iter()?; + while let Some((word, val)) = iter.next()? { + // This is a lexicographically ordered word position + // we use the key to construct the words fst. + builder.insert(word)?; + word_docids_wtr.insert(word, val)?; + } + + let fst = builder.into_set(); + self.main_sorter.insert(WORDS_FST_KEY, fst.as_fst().as_bytes())?; + + let mut main_wtr = tempfile().and_then(|f| create_writer(comp_type, comp_level, f))?; + self.main_sorter.write_into(&mut main_wtr)?; + + let mut words_pairs_proximities_docids_wtr = tempfile().and_then(|f| create_writer(comp_type, comp_level, f))?; + self.words_pairs_proximities_docids_sorter.write_into(&mut words_pairs_proximities_docids_wtr)?; + + let main = writer_into_reader(main_wtr, shrink_size)?; + let word_docids = writer_into_reader(word_docids_wtr, shrink_size)?; + let words_pairs_proximities_docids = writer_into_reader(words_pairs_proximities_docids_wtr, shrink_size)?; + let docid_word_positions = writer_into_reader(self.docid_word_positions_writer, shrink_size)?; + let documents = writer_into_reader(self.documents_writer, shrink_size)?; + + Ok(Readers { + main, + word_docids, + docid_word_positions, + words_pairs_proximities_docids, + documents, + }) + } +} + +/// Outputs a list of all pairs of words with the shortest proximity between 1 and 7 inclusive. +/// +/// This list is used by the engine to calculate the documents containing words that are +/// close to each other. +fn compute_words_pair_proximities( + word_positions: &HashMap>, +) -> HashMap<(&str, &str), u8> +{ + use itertools::Itertools; + + let mut words_pair_proximities = HashMap::new(); + for ((w1, ps1), (w2, ps2)) in word_positions.iter().cartesian_product(word_positions) { + let mut min_prox = None; + for (ps1, ps2) in ps1.iter().cartesian_product(ps2) { + let prox = crate::proximity::positions_proximity(*ps1, *ps2); + let prox = u8::try_from(prox).unwrap(); + // We don't care about a word that appear at the + // same position or too far from the other. 
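+            // (a proximity of 0 means the exact same position and anything above 7 is
+            // considered too far away, hence the 1..=7 filter below)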
+ if prox >= 1 && prox <= 7 { + if min_prox.map_or(true, |mp| prox < mp) { + min_prox = Some(prox) + } + } + } + + if let Some(min_prox) = min_prox { + words_pair_proximities.insert((w1.as_str(), w2.as_str()), min_prox); + } + } + + words_pair_proximities +} + +fn format_count(n: usize) -> String { + human_format::Formatter::new().with_decimals(1).with_separator("").format(n as f64) +} + +fn lmdb_key_valid_size(key: &[u8]) -> bool { + !key.is_empty() && key.len() <= LMDB_MAX_KEY_LENGTH +} diff --git a/src/lib.rs b/src/lib.rs index 25a4f45a7..1124f84e0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,4 +1,5 @@ mod criterion; +mod indexing; mod mdfs; mod query_tokens; mod search; diff --git a/src/subcommand/indexer.rs b/src/subcommand/indexer.rs index 47a429112..7310de11c 100644 --- a/src/subcommand/indexer.rs +++ b/src/subcommand/indexer.rs @@ -1,40 +1,12 @@ -use std::collections::{BTreeMap, HashMap}; -use std::convert::TryFrom; use std::fs::File; -use std::io::{self, Read, Write, Seek, SeekFrom}; -use std::iter::FromIterator; use std::path::PathBuf; -use std::sync::mpsc::sync_channel; -use std::time::Instant; -use std::{cmp, iter, thread}; -use anyhow::{Context, bail}; -use bstr::ByteSlice as _; -use csv::StringRecord; -use flate2::read::GzDecoder; -use fst::IntoStreamer; -use heed::{EnvOpenOptions, BytesEncode, types::ByteSlice}; -use linked_hash_map::LinkedHashMap; -use log::{debug, info}; -use grenad::{Reader, FileFuse, Writer, Merger, Sorter, CompressionType}; -use rayon::prelude::*; -use roaring::RoaringBitmap; +use anyhow::bail; +use heed::EnvOpenOptions; use structopt::StructOpt; -use tempfile::tempfile; -use crate::heed_codec::{CsvStringRecordCodec, BoRoaringBitmapCodec, CboRoaringBitmapCodec}; -use crate::tokenizer::{simple_tokenizer, only_token}; -use crate::{SmallVec32, Index, Position, DocumentId}; - -const LMDB_MAX_KEY_LENGTH: usize = 511; -const ONE_KILOBYTE: usize = 1024 * 1024; - -const MAX_POSITION: usize = 1000; -const MAX_ATTRIBUTES: usize = u32::max_value() as usize / MAX_POSITION; - -const WORDS_FST_KEY: &[u8] = crate::WORDS_FST_KEY.as_bytes(); -const HEADERS_KEY: &[u8] = crate::HEADERS_KEY.as_bytes(); -const DOCUMENTS_IDS_KEY: &[u8] = crate::DOCUMENTS_IDS_KEY.as_bytes(); +use crate::indexing::{self, IndexerOpt}; +use crate::Index; #[derive(Debug, StructOpt)] #[structopt(name = "milli-indexer")] @@ -50,10 +22,6 @@ pub struct Opt { #[structopt(long = "db-size", default_value = "107374182400")] // 100 GB database_size: usize, - /// Number of parallel jobs, defaults to # of CPUs. - #[structopt(short, long)] - jobs: Option, - #[structopt(flatten)] indexer: IndexerOpt, @@ -71,667 +39,6 @@ pub struct Opt { csv_file: Option, } -#[derive(Debug, StructOpt)] -struct IndexerOpt { - /// The amount of documents to skip before printing - /// a log regarding the indexing advancement. - #[structopt(long, default_value = "1000000")] // 1m - log_every_n: usize, - - /// MTBL max number of chunks in bytes. - #[structopt(long)] - max_nb_chunks: Option, - - /// The maximum amount of memory to use for the MTBL buffer. It is recommended - /// to use something like 80%-90% of the available memory. - /// - /// It is automatically split by the number of jobs e.g. if you use 7 jobs - /// and 7 GB of max memory, each thread will use a maximum of 1 GB. - #[structopt(long, default_value = "7516192768")] // 7 GB - max_memory: usize, - - /// Size of the linked hash map cache when indexing. - /// The bigger it is, the faster the indexing is but the more memory it takes. 
- #[structopt(long, default_value = "500")] - linked_hash_map_size: usize, - - /// The name of the compression algorithm to use when compressing intermediate - /// chunks during indexing documents. - /// - /// Choosing a fast algorithm will make the indexing faster but may consume more memory. - #[structopt(long, default_value = "snappy", possible_values = &["snappy", "zlib", "lz4", "lz4hc", "zstd"])] - chunk_compression_type: CompressionType, - - /// The level of compression of the chosen algorithm. - #[structopt(long, requires = "chunk-compression-type")] - chunk_compression_level: Option, - - /// The number of bytes to remove from the begining of the chunks while reading/sorting - /// or merging them. - /// - /// File fusing must only be enable on file systems that support the `FALLOC_FL_COLLAPSE_RANGE`, - /// (i.e. ext4 and XFS). File fusing will only work if the `enable-chunk-fusing` is set. - #[structopt(long, default_value = "4294967296")] // 4 GB - chunk_fusing_shrink_size: u64, - - /// Enable the chunk fusing or not, this reduces the amount of disk used by a factor of 2. - #[structopt(long)] - enable_chunk_fusing: bool, -} - -fn format_count(n: usize) -> String { - human_format::Formatter::new().with_decimals(1).with_separator("").format(n as f64) -} - -fn lmdb_key_valid_size(key: &[u8]) -> bool { - !key.is_empty() && key.len() <= LMDB_MAX_KEY_LENGTH -} - -fn create_writer(typ: CompressionType, level: Option, file: File) -> io::Result> { - let mut builder = Writer::builder(); - builder.compression_type(typ); - if let Some(level) = level { - builder.compression_level(level); - } - builder.build(file) -} - -fn writer_into_reader(writer: Writer, shrink_size: Option) -> anyhow::Result> { - let mut file = writer.into_inner()?; - file.seek(SeekFrom::Start(0))?; - let file = if let Some(shrink_size) = shrink_size { - FileFuse::builder().shrink_size(shrink_size).build(file) - } else { - FileFuse::new(file) - }; - Reader::new(file).map_err(Into::into) -} - -fn create_sorter( - merge: MergeFn, - chunk_compression_type: CompressionType, - chunk_compression_level: Option, - chunk_fusing_shrink_size: Option, - max_nb_chunks: Option, - max_memory: Option, -) -> Sorter -{ - let mut builder = Sorter::builder(merge); - if let Some(shrink_size) = chunk_fusing_shrink_size { - builder.file_fusing_shrink_size(shrink_size); - } - builder.chunk_compression_type(chunk_compression_type); - if let Some(level) = chunk_compression_level { - builder.chunk_compression_level(level); - } - if let Some(nb_chunks) = max_nb_chunks { - builder.max_nb_chunks(nb_chunks); - } - if let Some(memory) = max_memory { - builder.max_memory(memory); - } - builder.build() -} - -/// Outputs a list of all pairs of words with the shortest proximity between 1 and 7 inclusive. -/// -/// This list is used by the engine to calculate the documents containing words that are -/// close to each other. -fn compute_words_pair_proximities( - word_positions: &HashMap>, -) -> HashMap<(&str, &str), u8> -{ - use itertools::Itertools; - - let mut words_pair_proximities = HashMap::new(); - for ((w1, ps1), (w2, ps2)) in word_positions.iter().cartesian_product(word_positions) { - let mut min_prox = None; - for (ps1, ps2) in ps1.iter().cartesian_product(ps2) { - let prox = crate::proximity::positions_proximity(*ps1, *ps2); - let prox = u8::try_from(prox).unwrap(); - // We don't care about a word that appear at the - // same position or too far from the other. 
- if prox >= 1 && prox <= 7 { - if min_prox.map_or(true, |mp| prox < mp) { - min_prox = Some(prox) - } - } - } - - if let Some(min_prox) = min_prox { - words_pair_proximities.insert((w1.as_str(), w2.as_str()), min_prox); - } - } - - words_pair_proximities -} - -type MergeFn = fn(&[u8], &[Vec]) -> Result, ()>; - -struct Readers { - main: Reader, - word_docids: Reader, - docid_word_positions: Reader, - words_pairs_proximities_docids: Reader, - documents: Reader, -} - -struct Store { - word_docids: LinkedHashMap, RoaringBitmap>, - word_docids_limit: usize, - words_pairs_proximities_docids: LinkedHashMap<(SmallVec32, SmallVec32, u8), RoaringBitmap>, - words_pairs_proximities_docids_limit: usize, - documents_ids: RoaringBitmap, - // MTBL parameters - chunk_compression_type: CompressionType, - chunk_compression_level: Option, - chunk_fusing_shrink_size: Option, - // MTBL sorters - main_sorter: Sorter, - word_docids_sorter: Sorter, - words_pairs_proximities_docids_sorter: Sorter, - // MTBL writers - docid_word_positions_writer: Writer, - documents_writer: Writer, -} - -impl Store { - pub fn new( - linked_hash_map_size: usize, - max_nb_chunks: Option, - max_memory: Option, - chunk_compression_type: CompressionType, - chunk_compression_level: Option, - chunk_fusing_shrink_size: Option, - ) -> anyhow::Result - { - // We divide the max memory by the number of sorter the Store have. - let max_memory = max_memory.map(|mm| cmp::max(ONE_KILOBYTE, mm / 3)); - - let main_sorter = create_sorter( - main_merge, - chunk_compression_type, - chunk_compression_level, - chunk_fusing_shrink_size, - max_nb_chunks, - max_memory, - ); - let word_docids_sorter = create_sorter( - word_docids_merge, - chunk_compression_type, - chunk_compression_level, - chunk_fusing_shrink_size, - max_nb_chunks, - max_memory, - ); - let words_pairs_proximities_docids_sorter = create_sorter( - words_pairs_proximities_docids_merge, - chunk_compression_type, - chunk_compression_level, - chunk_fusing_shrink_size, - max_nb_chunks, - max_memory, - ); - - let documents_writer = tempfile().and_then(|f| { - create_writer(chunk_compression_type, chunk_compression_level, f) - })?; - let docid_word_positions_writer = tempfile().and_then(|f| { - create_writer(chunk_compression_type, chunk_compression_level, f) - })?; - - Ok(Store { - word_docids: LinkedHashMap::with_capacity(linked_hash_map_size), - word_docids_limit: linked_hash_map_size, - words_pairs_proximities_docids: LinkedHashMap::with_capacity(linked_hash_map_size), - words_pairs_proximities_docids_limit: linked_hash_map_size, - documents_ids: RoaringBitmap::new(), - chunk_compression_type, - chunk_compression_level, - chunk_fusing_shrink_size, - - main_sorter, - word_docids_sorter, - words_pairs_proximities_docids_sorter, - - docid_word_positions_writer, - documents_writer, - }) - } - - // Save the documents ids under the position and word we have seen it. - fn insert_word_docid(&mut self, word: &str, id: DocumentId) -> anyhow::Result<()> { - // if get_refresh finds the element it is assured to be at the end of the linked hash map. - match self.word_docids.get_refresh(word.as_bytes()) { - Some(old) => { old.insert(id); }, - None => { - let word_vec = SmallVec32::from(word.as_bytes()); - // A newly inserted element is append at the end of the linked hash map. - self.word_docids.insert(word_vec, RoaringBitmap::from_iter(Some(id))); - // If the word docids just reached it's capacity we must make sure to remove - // one element, this way next time we insert we doesn't grow the capacity. 
- if self.word_docids.len() == self.word_docids_limit { - // Removing the front element is equivalent to removing the LRU element. - let lru = self.word_docids.pop_front(); - Self::write_word_docids(&mut self.word_docids_sorter, lru)?; - } - } - } - Ok(()) - } - - // Save the documents ids under the words pairs proximities that it contains. - fn insert_words_pairs_proximities_docids<'a>( - &mut self, - words_pairs_proximities: impl IntoIterator, - id: DocumentId, - ) -> anyhow::Result<()> - { - for ((w1, w2), prox) in words_pairs_proximities { - let w1 = SmallVec32::from(w1.as_bytes()); - let w2 = SmallVec32::from(w2.as_bytes()); - let key = (w1, w2, prox); - // if get_refresh finds the element it is assured - // to be at the end of the linked hash map. - match self.words_pairs_proximities_docids.get_refresh(&key) { - Some(old) => { old.insert(id); }, - None => { - // A newly inserted element is append at the end of the linked hash map. - let ids = RoaringBitmap::from_iter(Some(id)); - self.words_pairs_proximities_docids.insert(key, ids); - } - } - } - - // If the linked hashmap is over capacity we must remove the overflowing elements. - let len = self.words_pairs_proximities_docids.len(); - let overflow = len.checked_sub(self.words_pairs_proximities_docids_limit); - if let Some(overflow) = overflow { - let mut lrus = Vec::with_capacity(overflow); - // Removing front elements is equivalent to removing the LRUs. - let iter = iter::from_fn(|| self.words_pairs_proximities_docids.pop_front()); - iter.take(overflow).for_each(|x| lrus.push(x)); - Self::write_words_pairs_proximities(&mut self.words_pairs_proximities_docids_sorter, lrus)?; - } - - Ok(()) - } - - fn write_headers(&mut self, headers: &StringRecord) -> anyhow::Result<()> { - let headers = CsvStringRecordCodec::bytes_encode(headers) - .with_context(|| format!("could not encode csv record"))?; - Ok(self.main_sorter.insert(HEADERS_KEY, headers)?) - } - - fn write_document( - &mut self, - document_id: DocumentId, - words_positions: &HashMap>, - record: &StringRecord, - ) -> anyhow::Result<()> - { - // We compute the list of words pairs proximities (self-join) and write it directly to disk. - let words_pair_proximities = compute_words_pair_proximities(&words_positions); - self.insert_words_pairs_proximities_docids(words_pair_proximities, document_id)?; - - // We store document_id associated with all the words the record contains. 
- for (word, _) in words_positions { - self.insert_word_docid(word, document_id)?; - } - - let record = CsvStringRecordCodec::bytes_encode(record) - .with_context(|| format!("could not encode CSV record"))?; - - self.documents_ids.insert(document_id); - self.documents_writer.insert(document_id.to_be_bytes(), record)?; - Self::write_docid_word_positions(&mut self.docid_word_positions_writer, document_id, words_positions)?; - - Ok(()) - } - - fn write_words_pairs_proximities( - sorter: &mut Sorter, - iter: impl IntoIterator, SmallVec32, u8), RoaringBitmap)>, - ) -> anyhow::Result<()> - { - let mut key = Vec::new(); - let mut buffer = Vec::new(); - - for ((w1, w2, min_prox), docids) in iter { - key.clear(); - key.extend_from_slice(w1.as_bytes()); - key.push(0); - key.extend_from_slice(w2.as_bytes()); - // Storing the minimun proximity found between those words - key.push(min_prox); - // We serialize the document ids into a buffer - buffer.clear(); - buffer.reserve(CboRoaringBitmapCodec::serialized_size(&docids)); - CboRoaringBitmapCodec::serialize_into(&docids, &mut buffer)?; - // that we write under the generated key into MTBL - if lmdb_key_valid_size(&key) { - sorter.insert(&key, &buffer)?; - } - } - - Ok(()) - } - - fn write_docid_word_positions( - writer: &mut Writer, - id: DocumentId, - words_positions: &HashMap>, - ) -> anyhow::Result<()> - { - // We prefix the words by the document id. - let mut key = id.to_be_bytes().to_vec(); - let base_size = key.len(); - - // We order the words lexicographically, this way we avoid passing by a sorter. - let words_positions = BTreeMap::from_iter(words_positions); - - for (word, positions) in words_positions { - key.truncate(base_size); - key.extend_from_slice(word.as_bytes()); - // We serialize the positions into a buffer. - let positions = RoaringBitmap::from_iter(positions.iter().cloned()); - let bytes = BoRoaringBitmapCodec::bytes_encode(&positions) - .with_context(|| "could not serialize positions")?; - // that we write under the generated key into MTBL - if lmdb_key_valid_size(&key) { - writer.insert(&key, &bytes)?; - } - } - - Ok(()) - } - - fn write_word_docids(sorter: &mut Sorter, iter: I) -> anyhow::Result<()> - where I: IntoIterator, RoaringBitmap)> - { - let mut key = Vec::new(); - let mut buffer = Vec::new(); - - for (word, ids) in iter { - key.clear(); - key.extend_from_slice(&word); - // We serialize the document ids into a buffer - buffer.clear(); - let ids = RoaringBitmap::from_iter(ids); - buffer.reserve(ids.serialized_size()); - ids.serialize_into(&mut buffer)?; - // that we write under the generated key into MTBL - if lmdb_key_valid_size(&key) { - sorter.insert(&key, &buffer)?; - } - } - - Ok(()) - } - - fn write_documents_ids(sorter: &mut Sorter, ids: RoaringBitmap) -> anyhow::Result<()> { - let mut buffer = Vec::with_capacity(ids.serialized_size()); - ids.serialize_into(&mut buffer)?; - sorter.insert(DOCUMENTS_IDS_KEY, &buffer)?; - Ok(()) - } - - pub fn index_csv( - mut self, - mut rdr: csv::Reader>, - thread_index: usize, - num_threads: usize, - log_every_n: usize, - ) -> anyhow::Result - { - debug!("{:?}: Indexing in a Store...", thread_index); - - // Write the headers into the store. - let headers = rdr.headers()?; - self.write_headers(&headers)?; - - let mut before = Instant::now(); - let mut document_id: usize = 0; - let mut document = csv::StringRecord::new(); - let mut words_positions = HashMap::new(); - - while rdr.read_record(&mut document)? { - // We skip documents that must not be indexed by this thread. 
- if document_id % num_threads == thread_index { - // This is a log routine that we do every `log_every_n` documents. - if document_id % log_every_n == 0 { - let count = format_count(document_id); - info!("We have seen {} documents so far ({:.02?}).", count, before.elapsed()); - before = Instant::now(); - } - - let document_id = DocumentId::try_from(document_id).context("generated id is too big")?; - for (attr, content) in document.iter().enumerate().take(MAX_ATTRIBUTES) { - for (pos, token) in simple_tokenizer(&content).filter_map(only_token).enumerate().take(MAX_POSITION) { - let word = token.to_lowercase(); - let position = (attr * MAX_POSITION + pos) as u32; - words_positions.entry(word).or_insert_with(SmallVec32::new).push(position); - } - } - - // We write the document in the documents store. - self.write_document(document_id, &words_positions, &document)?; - words_positions.clear(); - } - - // Compute the document id of the next document. - document_id = document_id + 1; - } - - let readers = self.finish()?; - debug!("{:?}: Store created!", thread_index); - Ok(readers) - } - - fn finish(mut self) -> anyhow::Result { - let comp_type = self.chunk_compression_type; - let comp_level = self.chunk_compression_level; - let shrink_size = self.chunk_fusing_shrink_size; - - Self::write_word_docids(&mut self.word_docids_sorter, self.word_docids)?; - Self::write_documents_ids(&mut self.main_sorter, self.documents_ids)?; - Self::write_words_pairs_proximities( - &mut self.words_pairs_proximities_docids_sorter, - self.words_pairs_proximities_docids, - )?; - - let mut word_docids_wtr = tempfile().and_then(|f| create_writer(comp_type, comp_level, f))?; - let mut builder = fst::SetBuilder::memory(); - - let mut iter = self.word_docids_sorter.into_iter()?; - while let Some((word, val)) = iter.next()? { - // This is a lexicographically ordered word position - // we use the key to construct the words fst. 
- builder.insert(word)?; - word_docids_wtr.insert(word, val)?; - } - - let fst = builder.into_set(); - self.main_sorter.insert(WORDS_FST_KEY, fst.as_fst().as_bytes())?; - - let mut main_wtr = tempfile().and_then(|f| create_writer(comp_type, comp_level, f))?; - self.main_sorter.write_into(&mut main_wtr)?; - - let mut words_pairs_proximities_docids_wtr = tempfile().and_then(|f| create_writer(comp_type, comp_level, f))?; - self.words_pairs_proximities_docids_sorter.write_into(&mut words_pairs_proximities_docids_wtr)?; - - let main = writer_into_reader(main_wtr, shrink_size)?; - let word_docids = writer_into_reader(word_docids_wtr, shrink_size)?; - let words_pairs_proximities_docids = writer_into_reader(words_pairs_proximities_docids_wtr, shrink_size)?; - let docid_word_positions = writer_into_reader(self.docid_word_positions_writer, shrink_size)?; - let documents = writer_into_reader(self.documents_writer, shrink_size)?; - - Ok(Readers { - main, - word_docids, - docid_word_positions, - words_pairs_proximities_docids, - documents, - }) - } -} - -fn main_merge(key: &[u8], values: &[Vec]) -> Result, ()> { - match key { - WORDS_FST_KEY => { - let fsts: Vec<_> = values.iter().map(|v| fst::Set::new(v).unwrap()).collect(); - - // Union of the FSTs - let mut op = fst::set::OpBuilder::new(); - fsts.iter().for_each(|fst| op.push(fst.into_stream())); - let op = op.r#union(); - - let mut build = fst::SetBuilder::memory(); - build.extend_stream(op.into_stream()).unwrap(); - Ok(build.into_inner().unwrap()) - }, - HEADERS_KEY => { - assert!(values.windows(2).all(|vs| vs[0] == vs[1])); - Ok(values[0].to_vec()) - }, - DOCUMENTS_IDS_KEY => word_docids_merge(&[], values), - otherwise => panic!("wut {:?}", otherwise), - } -} - -fn word_docids_merge(_key: &[u8], values: &[Vec]) -> Result, ()> { - let (head, tail) = values.split_first().unwrap(); - let mut head = RoaringBitmap::deserialize_from(head.as_slice()).unwrap(); - - for value in tail { - let bitmap = RoaringBitmap::deserialize_from(value.as_slice()).unwrap(); - head.union_with(&bitmap); - } - - let mut vec = Vec::with_capacity(head.serialized_size()); - head.serialize_into(&mut vec).unwrap(); - Ok(vec) -} - -fn docid_word_positions_merge(key: &[u8], _values: &[Vec]) -> Result, ()> { - panic!("merging docid word positions is an error ({:?})", key.as_bstr()) -} - -fn words_pairs_proximities_docids_merge(_key: &[u8], values: &[Vec]) -> Result, ()> { - let (head, tail) = values.split_first().unwrap(); - let mut head = CboRoaringBitmapCodec::deserialize_from(head.as_slice()).unwrap(); - - for value in tail { - let bitmap = CboRoaringBitmapCodec::deserialize_from(value.as_slice()).unwrap(); - head.union_with(&bitmap); - } - - let mut vec = Vec::new(); - CboRoaringBitmapCodec::serialize_into(&head, &mut vec).unwrap(); - Ok(vec) -} - -fn documents_merge(key: &[u8], _values: &[Vec]) -> Result, ()> { - panic!("merging documents is an error ({:?})", key.as_bstr()) -} - -fn merge_readers(sources: Vec>, merge: MergeFn) -> Merger { - let mut builder = Merger::builder(merge); - builder.extend(sources); - builder.build() -} - -fn merge_into_lmdb_database( - wtxn: &mut heed::RwTxn, - database: heed::PolyDatabase, - sources: Vec>, - merge: MergeFn, -) -> anyhow::Result<()> { - debug!("Merging {} MTBL stores...", sources.len()); - let before = Instant::now(); - - let merger = merge_readers(sources, merge); - let mut in_iter = merger.into_merge_iter()?; - - let mut out_iter = database.iter_mut::<_, ByteSlice, ByteSlice>(wtxn)?; - while let Some((k, v)) = in_iter.next()? 
{ - out_iter.append(k, v).with_context(|| format!("writing {:?} into LMDB", k.as_bstr()))?; - } - - debug!("MTBL stores merged in {:.02?}!", before.elapsed()); - Ok(()) -} - -fn write_into_lmdb_database( - wtxn: &mut heed::RwTxn, - database: heed::PolyDatabase, - mut reader: Reader, -) -> anyhow::Result<()> { - debug!("Writing MTBL stores..."); - let before = Instant::now(); - - let mut out_iter = database.iter_mut::<_, ByteSlice, ByteSlice>(wtxn)?; - while let Some((k, v)) = reader.next()? { - out_iter.append(k, v).with_context(|| format!("writing {:?} into LMDB", k.as_bstr()))?; - } - - debug!("MTBL stores merged in {:.02?}!", before.elapsed()); - Ok(()) -} - -/// Returns the list of CSV sources that the indexer must read. -/// -/// There is `num_threads` sources. If the file is not specified, the standard input is used. -fn csv_readers( - csv_file_path: Option, - num_threads: usize, -) -> anyhow::Result>>> -{ - match csv_file_path { - Some(file_path) => { - // We open the file # jobs times. - iter::repeat_with(|| { - let file = File::open(&file_path) - .with_context(|| format!("Failed to read CSV file {}", file_path.display()))?; - // if the file extension is "gz" or "gzip" we can decode and read it. - let r = if file_path.extension().map_or(false, |e| e == "gz" || e == "gzip") { - Box::new(GzDecoder::new(file)) as Box - } else { - Box::new(file) as Box - }; - Ok(csv::Reader::from_reader(r)) as anyhow::Result<_> - }) - .take(num_threads) - .collect() - }, - None => { - let mut csv_readers = Vec::new(); - let mut writers = Vec::new(); - for (r, w) in iter::repeat_with(ringtail::io::pipe).take(num_threads) { - let r = Box::new(r) as Box; - csv_readers.push(csv::Reader::from_reader(r)); - writers.push(w); - } - - thread::spawn(move || { - let stdin = std::io::stdin(); - let mut stdin = stdin.lock(); - let mut buffer = [0u8; 4096]; - loop { - match stdin.read(&mut buffer)? { - 0 => return Ok(()) as io::Result<()>, - size => for w in &mut writers { - w.write_all(&buffer[..size])?; - } - } - } - }); - - Ok(csv_readers) - }, - } -} - pub fn run(opt: Opt) -> anyhow::Result<()> { stderrlog::new() .verbosity(opt.verbose) @@ -739,10 +46,6 @@ pub fn run(opt: Opt) -> anyhow::Result<()> { .timestamp(stderrlog::Timestamp::Off) .init()?; - if let Some(jobs) = opt.jobs { - rayon::ThreadPoolBuilder::new().num_threads(jobs).build_global()?; - } - if opt.database.exists() { bail!("Database ({}) already exists, delete it to continue.", opt.database.display()); } @@ -753,130 +56,12 @@ pub fn run(opt: Opt) -> anyhow::Result<()> { .max_dbs(10) .open(&opt.database)?; - let before_indexing = Instant::now(); let index = Index::new(&env)?; - let num_threads = rayon::current_num_threads(); - let linked_hash_map_size = opt.indexer.linked_hash_map_size; - let max_nb_chunks = opt.indexer.max_nb_chunks; - let max_memory_by_job = opt.indexer.max_memory / num_threads; - let chunk_compression_type = opt.indexer.chunk_compression_type; - let chunk_compression_level = opt.indexer.chunk_compression_level; - let log_every_n = opt.indexer.log_every_n; + let file_path = opt.csv_file.unwrap(); + let gzipped = file_path.extension().map_or(false, |e| e == "gz" || e == "gzip"); + let file = File::open(file_path)?; + let content = unsafe { memmap::Mmap::map(&file)? }; - let chunk_fusing_shrink_size = if opt.indexer.enable_chunk_fusing { - Some(opt.indexer.chunk_fusing_shrink_size) - } else { - None - }; - - let readers = csv_readers(opt.csv_file, num_threads)? 
- .into_par_iter() - .enumerate() - .map(|(i, rdr)| { - let store = Store::new( - linked_hash_map_size, - max_nb_chunks, - Some(max_memory_by_job), - chunk_compression_type, - chunk_compression_level, - chunk_fusing_shrink_size, - )?; - store.index_csv(rdr, i, num_threads, log_every_n) - }) - .collect::, _>>()?; - - let mut main_readers = Vec::with_capacity(readers.len()); - let mut word_docids_readers = Vec::with_capacity(readers.len()); - let mut docid_word_positions_readers = Vec::with_capacity(readers.len()); - let mut words_pairs_proximities_docids_readers = Vec::with_capacity(readers.len()); - let mut documents_readers = Vec::with_capacity(readers.len()); - readers.into_iter().for_each(|readers| { - main_readers.push(readers.main); - word_docids_readers.push(readers.word_docids); - docid_word_positions_readers.push(readers.docid_word_positions); - words_pairs_proximities_docids_readers.push(readers.words_pairs_proximities_docids); - documents_readers.push(readers.documents); - }); - - // This is the function that merge the readers - // by using the given merge function. - let merge_readers = move |readers, merge| { - let mut writer = tempfile().and_then(|f| { - create_writer(chunk_compression_type, chunk_compression_level, f) - })?; - let merger = merge_readers(readers, merge); - merger.write_into(&mut writer)?; - writer_into_reader(writer, chunk_fusing_shrink_size) - }; - - // The enum and the channel which is used to transfert - // the readers merges potentially done on another thread. - enum DatabaseType { Main, WordDocids, WordsPairsProximitiesDocids }; - let (sender, receiver) = sync_channel(3); - - debug!("Merging the main, word docids and words pairs proximity docids in parallel..."); - rayon::spawn(move || { - vec![ - (DatabaseType::Main, main_readers, main_merge as MergeFn), - (DatabaseType::WordDocids, word_docids_readers, word_docids_merge), - ( - DatabaseType::WordsPairsProximitiesDocids, - words_pairs_proximities_docids_readers, - words_pairs_proximities_docids_merge, - ), - ] - .into_par_iter() - .for_each(|(dbtype, readers, merge)| { - let result = merge_readers(readers, merge); - sender.send((dbtype, result)).unwrap(); - }); - }); - - let mut wtxn = env.write_txn()?; - - debug!("Writing the docid word positions into LMDB on disk..."); - merge_into_lmdb_database( - &mut wtxn, - *index.docid_word_positions.as_polymorph(), - docid_word_positions_readers, - docid_word_positions_merge, - )?; - - debug!("Writing the documents into LMDB on disk..."); - merge_into_lmdb_database( - &mut wtxn, - *index.documents.as_polymorph(), - documents_readers, - documents_merge, - )?; - - for (db_type, result) in receiver { - let content = result?; - match db_type { - DatabaseType::Main => { - debug!("Writing the main elements into LMDB on disk..."); - write_into_lmdb_database(&mut wtxn, index.main, content)?; - }, - DatabaseType::WordDocids => { - debug!("Writing the words docids into LMDB on disk..."); - let db = *index.word_docids.as_polymorph(); - write_into_lmdb_database(&mut wtxn, db, content)?; - }, - DatabaseType::WordsPairsProximitiesDocids => { - debug!("Writing the words pairs proximities docids into LMDB on disk..."); - let db = *index.word_pair_proximity_docids.as_polymorph(); - write_into_lmdb_database(&mut wtxn, db, content)?; - }, - } - } - - debug!("Retrieving the number of documents..."); - let count = index.number_of_documents(&wtxn)?; - - wtxn.commit()?; - - info!("Wrote {} documents in {:.02?}", count, before_indexing.elapsed()); - - Ok(()) + indexing::run(&env, 
&index, opt.indexer, &content, gzipped) } diff --git a/src/subcommand/serve.rs b/src/subcommand/serve.rs index 4f795107c..e19d1d954 100644 --- a/src/subcommand/serve.rs +++ b/src/subcommand/serve.rs @@ -19,6 +19,7 @@ use tokio::sync::broadcast; use warp::filters::ws::Message; use warp::{Filter, http::Response}; +use crate::indexing::IndexerOpt; use crate::tokenizer::{simple_tokenizer, TokenType}; use crate::{Index, UpdateStore, SearchResult}; @@ -51,6 +52,9 @@ pub struct Opt { /// The ip and port on which the database will listen for HTTP requests. #[structopt(short = "l", long, default_value = "127.0.0.1:9700")] http_listen_addr: String, + + #[structopt(flatten)] + indexer: IndexerOpt, } fn highlight_record(record: &csv::StringRecord, words: &HashSet) -> csv::StringRecord {
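
For reviewers, a minimal sketch of how the exported entry point is meant to be driven from a
subcommand, mirroring the new body of `src/subcommand/indexer.rs` above. The `index_file` helper
and its arguments are placeholders, and the serve subcommand wiring is not part of this patch:

    use std::fs::File;
    use std::path::Path;

    use crate::indexing::{self, IndexerOpt};
    use crate::Index;

    fn index_file(
        env: &heed::Env,
        index: &Index,
        indexer_opt: IndexerOpt,
        path: &Path,
    ) -> anyhow::Result<()> {
        // Detect gzipped content from the file extension, like the indexer subcommand does.
        let gzipped = path.extension().map_or(false, |e| e == "gz" || e == "gzip");
        let file = File::open(path)?;
        // Memory-map the file so every indexing thread can read the same bytes.
        let content = unsafe { memmap::Mmap::map(&file)? };
        // `indexing::run` builds its own rayon thread pool from `indexing_jobs`
        // and writes the resulting databases into LMDB.
        indexing::run(env, index, indexer_opt, &content, gzipped)
    }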