diff --git a/src/bin/indexer.rs b/src/bin/indexer.rs index 0dd7ef074..60c852c9f 100644 --- a/src/bin/indexer.rs +++ b/src/bin/indexer.rs @@ -21,7 +21,7 @@ use rayon::prelude::*; use roaring::RoaringBitmap; use structopt::StructOpt; -use milli::{lexer, SmallVec32, Index, DocumentId, Position, Attribute}; +use milli::{lexer, SmallVec32, Index, DocumentId, Position, Attribute, BEU32}; const LMDB_MAX_KEY_LENGTH: usize = 511; const ONE_MILLION: usize = 1_000_000; @@ -30,10 +30,12 @@ const MAX_POSITION: usize = 1000; const MAX_ATTRIBUTES: usize = u32::max_value() as usize / MAX_POSITION; const HEADERS_KEY: &[u8] = b"\0headers"; +const DOCUMENTS_IDS_KEY: &[u8] = b"\x04documents-ids"; const WORDS_FST_KEY: &[u8] = b"\x05words-fst"; const WORD_POSITIONS_BYTE: u8 = 1; const WORD_POSITION_DOCIDS_BYTE: u8 = 2; const WORD_ATTRIBUTE_DOCIDS_BYTE: u8 = 3; +const DOCUMENTS_IDS_BYTE: u8 = 4; #[cfg(target_os = "linux")] #[global_allocator] @@ -60,14 +62,6 @@ struct Opt { #[structopt(flatten)] indexer: IndexerOpt, - /// The name of the compression algorithm to use when compressing the final documents database. - #[structopt(long, default_value = "zlib", possible_values = &["snappy", "zlib", "lz4", "lz4hc", "zstd"])] - documents_compression_type: String, - - /// The level of compression of the chosen algorithm. - #[structopt(long, default_value = "9")] - documents_compression_level: u32, - /// Verbose mode (-v, -vv, -vvv, etc.) #[structopt(short, long, parse(from_occurrences))] verbose: usize, @@ -129,6 +123,7 @@ struct Store { word_positions: ArcCache, RoaringBitmap>, word_position_docids: ArcCache<(SmallVec32, Position), RoaringBitmap>, word_attribute_docids: ArcCache<(SmallVec32, Attribute), RoaringBitmap>, + documents_ids: RoaringBitmap, sorter: Sorter, documents_sorter: Sorter, } @@ -166,6 +161,7 @@ impl Store { word_positions: ArcCache::new(arc_cache_size), word_position_docids: ArcCache::new(arc_cache_size), word_attribute_docids: ArcCache::new(arc_cache_size), + documents_ids: RoaringBitmap::new(), sorter: builder.build(), documents_sorter: documents_builder.build(), } @@ -200,6 +196,10 @@ impl Store { Ok(self.sorter.insert(HEADERS_KEY, headers)?) } + pub fn write_document_id(&mut self, id: DocumentId) { + self.documents_ids.insert(id); + } + pub fn write_document(&mut self, id: DocumentId, content: &[u8]) -> anyhow::Result<()> { Ok(self.documents_sorter.insert(id.to_be_bytes(), content)?) } @@ -216,6 +216,7 @@ impl Store { key.extend_from_slice(&word); // We serialize the positions into a buffer buffer.clear(); + buffer.reserve(positions.serialized_size()); positions.serialize_into(&mut buffer)?; // that we write under the generated key into MTBL if lmdb_key_valid_size(&key) { @@ -240,6 +241,7 @@ impl Store { key.extend_from_slice(&pos.to_be_bytes()); // We serialize the document ids into a buffer buffer.clear(); + buffer.reserve(ids.serialized_size()); ids.serialize_into(&mut buffer)?; // that we write under the generated key into MTBL if lmdb_key_valid_size(&key) { @@ -264,6 +266,7 @@ impl Store { key.extend_from_slice(&attr.to_be_bytes()); // We serialize the document ids into a buffer buffer.clear(); + buffer.reserve(ids.serialized_size()); ids.serialize_into(&mut buffer)?; // that we write under the generated key into MTBL if lmdb_key_valid_size(&key) { @@ -274,10 +277,18 @@ impl Store { Ok(()) } + fn write_documents_ids(sorter: &mut Sorter, ids: RoaringBitmap) -> anyhow::Result<()> { + let mut buffer = Vec::with_capacity(ids.serialized_size()); + ids.serialize_into(&mut buffer)?; + sorter.insert(DOCUMENTS_IDS_KEY, &buffer)?; + Ok(()) + } + pub fn finish(mut self) -> anyhow::Result<(Reader, Reader)> { Self::write_word_positions(&mut self.sorter, self.word_positions)?; Self::write_word_position_docids(&mut self.sorter, self.word_position_docids)?; Self::write_word_attribute_docids(&mut self.sorter, self.word_attribute_docids)?; + Self::write_documents_ids(&mut self.sorter, self.documents_ids)?; let mut wtr = tempfile::tempfile().map(Writer::new)?; let mut builder = fst::SetBuilder::memory(); @@ -335,7 +346,7 @@ fn merge(key: &[u8], values: &[Vec]) -> Result, ()> { Ok(values[0].to_vec()) }, key => match key[0] { - WORD_POSITIONS_BYTE | WORD_POSITION_DOCIDS_BYTE | WORD_ATTRIBUTE_DOCIDS_BYTE => { + DOCUMENTS_IDS_BYTE | WORD_POSITIONS_BYTE | WORD_POSITION_DOCIDS_BYTE | WORD_ATTRIBUTE_DOCIDS_BYTE => { let mut first = RoaringBitmap::deserialize_from(values[0].as_slice()).unwrap(); for value in &values[1..] { @@ -363,6 +374,10 @@ fn lmdb_writer(wtxn: &mut heed::RwTxn, index: &Index, key: &[u8], val: &[u8]) -> // Write the headers index.main.put::<_, Str, ByteSlice>(wtxn, "headers", val)?; } + else if key == DOCUMENTS_IDS_KEY { + // Write the documents ids list + index.main.put::<_, Str, ByteSlice>(wtxn, "documents-ids", val)?; + } else if key.starts_with(&[WORD_POSITIONS_BYTE]) { // Write the postings lists index.word_positions.as_polymorph() @@ -458,6 +473,7 @@ fn index_csv( writer.write_byte_record(document.as_byte_record())?; let document = writer.into_inner()?; store.write_document(document_id, &document)?; + store.write_document_id(document_id); } // Compute the document id of the the next document. @@ -497,8 +513,6 @@ fn main() -> anyhow::Result<()> { let max_memory = opt.indexer.max_memory; let chunk_compression_type = compression_type_from_str(&opt.indexer.chunk_compression_type); let chunk_compression_level = opt.indexer.chunk_compression_level; - let documents_compression_type = compression_type_from_str(&opt.documents_compression_type); - let documents_compression_level = opt.documents_compression_level; let csv_readers = match opt.csv_file { Some(file_path) => { @@ -568,28 +582,22 @@ fn main() -> anyhow::Result<()> { docs_stores.push(d); }); - debug!("We are writing the documents into MTBL on disk..."); - // We also merge the documents into its own MTBL store. - let file = tempfile::tempfile()?; - let mut writer = Writer::builder() - .compression_type(documents_compression_type) - .compression_level(documents_compression_level) - .build(file); - let mut builder = Merger::builder(docs_merge); - builder.extend(docs_stores); - builder.build().write_into(&mut writer)?; - let file = writer.into_inner()?; - - // Read back the documents MTBL database from the file. - let documents_mmap = unsafe { memmap::Mmap::map(&file)? }; - let documents = Reader::new(documents_mmap)?; - - debug!("We are writing the postings lists and documents into LMDB on disk..."); - // We merge the postings lists into LMDB. let mut wtxn = env.write_txn()?; + + // We merge the postings lists into LMDB. + debug!("We are writing the postings lists into LMDB on disk..."); merge_into_lmdb(stores, |k, v| lmdb_writer(&mut wtxn, &index, k, v))?; - index.put_documents(&mut wtxn, &documents)?; + + // We merge the documents into LMDB. + debug!("We are writing the documents into LMDB on disk..."); + merge_into_lmdb(docs_stores, |k, v| { + let id = k.try_into().map(u32::from_be_bytes)?; + Ok(index.documents.put(&mut wtxn, &BEU32::new(id), v)?) + })?; + + // Retrieve the number of documents. let count = index.number_of_documents(&wtxn)?; + wtxn.commit()?; info!("Wrote {} documents in {:.02?}", count, before_indexing.elapsed()); diff --git a/src/heed_codec/mod.rs b/src/heed_codec/mod.rs index 7729065a9..bb75cdc15 100644 --- a/src/heed_codec/mod.rs +++ b/src/heed_codec/mod.rs @@ -1,7 +1,5 @@ -mod mtbl_codec; mod roaring_bitmap_codec; mod str_beu32_codec; -pub use self::mtbl_codec::MtblCodec; pub use self::roaring_bitmap_codec::RoaringBitmapCodec; pub use self::str_beu32_codec::StrBEU32Codec; diff --git a/src/heed_codec/mtbl_codec.rs b/src/heed_codec/mtbl_codec.rs deleted file mode 100644 index b4815da4f..000000000 --- a/src/heed_codec/mtbl_codec.rs +++ /dev/null @@ -1,21 +0,0 @@ -use std::borrow::Cow; -use std::marker::PhantomData; -use oxidized_mtbl::Reader; - -pub struct MtblCodec(PhantomData); - -impl<'a> heed::BytesDecode<'a> for MtblCodec<&'a [u8]> { - type DItem = Reader<&'a [u8]>; - - fn bytes_decode(bytes: &'a [u8]) -> Option { - Reader::new(bytes).ok() - } -} - -impl<'a, A: AsRef<[u8]> + 'a> heed::BytesEncode<'a> for MtblCodec { - type EItem = Reader; - - fn bytes_encode(item: &Self::EItem) -> Option> { - Some(Cow::Borrowed(item.as_bytes())) - } -} diff --git a/src/lib.rs b/src/lib.rs index 65b8f0534..8629c58bd 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -8,15 +8,14 @@ pub mod lexer; use std::collections::HashMap; use std::hash::BuildHasherDefault; -use anyhow::{bail, Context}; +use anyhow::Context; use fxhash::{FxHasher32, FxHasher64}; use heed::types::*; use heed::{PolyDatabase, Database}; -use oxidized_mtbl as omtbl; pub use self::search::{Search, SearchResult}; pub use self::criterion::{Criterion, default_criteria}; -use self::heed_codec::{MtblCodec, RoaringBitmapCodec, StrBEU32Codec}; +use self::heed_codec::{RoaringBitmapCodec, StrBEU32Codec}; pub type FastMap4 = HashMap>; pub type FastMap8 = HashMap>; @@ -30,7 +29,7 @@ pub type Position = u32; const WORDS_FST_KEY: &str = "words-fst"; const HEADERS_KEY: &str = "headers"; -const DOCUMENTS_KEY: &str = "documents"; +const DOCUMENTS_IDS_KEY: &str = "documents-ids"; #[derive(Clone)] pub struct Index { @@ -42,6 +41,8 @@ pub struct Index { pub word_position_docids: Database, /// Maps a word and an attribute (u32) to all the documents ids where the given word appears. pub word_attribute_docids: Database, + /// Maps the document id to the document as a CSV line. + pub documents: Database, ByteSlice>, } impl Index { @@ -51,6 +52,7 @@ impl Index { word_positions: env.create_database(Some("word-positions"))?, word_position_docids: env.create_database(Some("word-position-docids"))?, word_attribute_docids: env.create_database(Some("word-attribute-docids"))?, + documents: env.create_database(Some("documents"))?, }) } @@ -91,29 +93,18 @@ impl Index { iter: impl IntoIterator, ) -> anyhow::Result)>> { - match self.main.get::<_, Str, MtblCodec<&[u8]>>(rtxn, DOCUMENTS_KEY)? { - Some(documents) => { - iter.into_iter().map(|id| { - let key = id.to_be_bytes(); - let content = documents.clone().get(&key)? - .with_context(|| format!("Could not find document {}", id))?; - Ok((id, content.as_ref().to_vec())) - }).collect() - }, - None => bail!("No documents database found"), - } - } - - pub fn put_documents>(&self, wtxn: &mut heed::RwTxn, documents: &omtbl::Reader) -> anyhow::Result<()> { - Ok(self.main.put::<_, Str, MtblCodec>(wtxn, DOCUMENTS_KEY, documents)?) + iter.into_iter().map(|id| { + let content = self.documents.get(rtxn, &BEU32::new(id))? + .with_context(|| format!("Could not find document {}", id))?; + Ok((id, content.to_vec())) + }).collect() } /// Returns the number of documents indexed in the database. pub fn number_of_documents<'t>(&self, rtxn: &'t heed::RoTxn) -> anyhow::Result { - match self.main.get::<_, Str, MtblCodec<&[u8]>>(rtxn, DOCUMENTS_KEY)? { - Some(documents) => Ok(documents.metadata().count_entries as usize), - None => return Ok(0), - } + let docids = self.main.get::<_, Str, RoaringBitmapCodec>(rtxn, DOCUMENTS_IDS_KEY)? + .with_context(|| format!("Could not find the list of documents ids"))?; + Ok(docids.len() as usize) } pub fn search<'a>(&'a self, rtxn: &'a heed::RoTxn) -> Search<'a> {