diff --git a/Cargo.lock b/Cargo.lock
index f5a2fde2f..465a55817 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1216,7 +1216,7 @@ checksum = "2839e79665f131bdb5782e51f2c6c9599c133c6098982a54c794358bf432529c"
 [[package]]
 name = "oxidized-mtbl"
 version = "0.1.0"
-source = "git+https://github.com/Kerollmops/oxidized-mtbl.git?rev=6b8a3a8#6b8a3a83a8b83bfdba38f7ea67bfa5868e668741"
+source = "git+https://github.com/Kerollmops/oxidized-mtbl.git?rev=13294cc#13294ccd73c9d6f71645a3ed2852656f3c86d31d"
 dependencies = [
  "byteorder",
  "crc32c",
diff --git a/src/bin/indexer.rs b/src/bin/indexer.rs
index 40163df36..ef873c966 100644
--- a/src/bin/indexer.rs
+++ b/src/bin/indexer.rs
@@ -1,3 +1,4 @@
+use std::convert::TryInto;
 use std::convert::TryFrom;
 use std::fs::File;
 use std::io::{self, Read, Write};
@@ -30,10 +31,10 @@ const MAX_ATTRIBUTES: usize = u32::max_value() as usize / MAX_POSITION;
 
 const HEADERS_KEY: &[u8] = b"\0headers";
 const WORDS_FST_KEY: &[u8] = b"\x05words-fst";
+const DOCUMENTS_KEY: &[u8] = b"\x06documents";
 const WORD_POSITIONS_BYTE: u8 = 1;
 const WORD_POSITION_DOCIDS_BYTE: u8 = 2;
 const WORD_ATTRIBUTE_DOCIDS_BYTE: u8 = 3;
-const DOCUMENT_BYTE: u8 = 4;
 
 #[cfg(target_os = "linux")]
 #[global_allocator]
@@ -88,22 +89,23 @@ struct Store {
     word_position_docids: ArcCache<(SmallVec32, Position), RoaringBitmap>,
     word_attribute_docids: ArcCache<(SmallVec32, Attribute), RoaringBitmap>,
     sorter: Sorter<MergeFn>,
+    documents_sorter: Sorter<MergeFn>,
 }
 
 impl Store {
     fn new(arc_cache_size: Option<usize>, max_nb_chunks: Option<usize>, max_memory: Option<usize>) -> Store {
         let mut builder = Sorter::builder(merge as MergeFn);
-
         builder.chunk_compression_type(CompressionType::Snappy);
-
         if let Some(nb_chunks) = max_nb_chunks {
             builder.max_nb_chunks(nb_chunks);
         }
-
         if let Some(memory) = max_memory {
             builder.max_memory(memory);
         }
 
+        let mut documents_builder = Sorter::builder(docs_merge as MergeFn);
+        documents_builder.chunk_compression_type(CompressionType::Snappy);
+
         let arc_cache_size = arc_cache_size.unwrap_or(65_535);
 
         Store {
@@ -111,6 +113,7 @@ impl Store {
             word_position_docids: ArcCache::new(arc_cache_size),
             word_attribute_docids: ArcCache::new(arc_cache_size),
             sorter: builder.build(),
+            documents_sorter: documents_builder.build(),
         }
     }
 
@@ -144,13 +147,7 @@ impl Store {
     }
 
     pub fn write_document(&mut self, id: DocumentId, content: &[u8]) -> anyhow::Result<()> {
-        let id = id.to_be_bytes();
-        let mut key = Vec::with_capacity(1 + id.len());
-
-        key.push(DOCUMENT_BYTE);
-        key.extend_from_slice(&id);
-
-        Ok(self.sorter.insert(&key, content)?)
+        Ok(self.documents_sorter.insert(id.to_be_bytes(), content)?)
     }
 
     fn write_word_positions<I>(sorter: &mut Sorter<MergeFn>, iter: I) -> anyhow::Result<()>
@@ -245,6 +242,12 @@ impl Store {
         let fst = builder.into_set();
         wtr.insert(WORDS_FST_KEY, fst.as_fst().as_bytes())?;
 
+        let mut docs_wtr = tempfile::tempfile().map(Writer::new)?;
+        self.documents_sorter.write_into(&mut docs_wtr)?;
+        let docs_file = docs_wtr.into_inner()?;
+        let docs_mmap = unsafe { Mmap::map(&docs_file)? };
+        wtr.insert(DOCUMENTS_KEY, docs_mmap)?;
+
         let file = wtr.into_inner()?;
         let mmap = unsafe { Mmap::map(&file)? };
         let reader = Reader::new(mmap)?;
@@ -253,6 +256,12 @@ impl Store {
     }
 }
 
+fn docs_merge(key: &[u8], values: &[Vec<u8>]) -> Result<Vec<u8>, ()> {
+    let key = key.try_into().unwrap();
+    let id = u32::from_be_bytes(key);
+    panic!("documents must not conflict ({} with {} values)!", id, values.len())
+}
+
 fn merge(key: &[u8], values: &[Vec<u8>]) -> Result<Vec<u8>, ()> {
     match key {
         WORDS_FST_KEY => {
@@ -271,6 +280,20 @@ fn merge(key: &[u8], values: &[Vec<u8>]) -> Result<Vec<u8>, ()> {
             assert!(values.windows(2).all(|vs| vs[0] == vs[1]));
             Ok(values[0].to_vec())
         },
+        DOCUMENTS_KEY => {
+            let sources: Vec<_> = values.iter().map(Reader::new).collect::<Result<_, _>>().unwrap();
+
+            let mut builder = Merger::builder(docs_merge);
+            builder.extend(sources);
+            let merger = builder.build();
+
+            let mut builder = Writer::builder();
+            builder.compression_type(CompressionType::Snappy);
+
+            let mut wtr = builder.memory();
+            merger.write_into(&mut wtr).unwrap();
+            Ok(wtr.into_inner().unwrap())
+        },
         key => match key[0] {
             WORD_POSITIONS_BYTE | WORD_POSITION_DOCIDS_BYTE | WORD_ATTRIBUTE_DOCIDS_BYTE => {
                 let mut first = RoaringBitmap::deserialize_from(values[0].as_slice()).unwrap();
@@ -284,10 +307,6 @@ fn merge(key: &[u8], values: &[Vec<u8>]) -> Result<Vec<u8>, ()> {
                 first.serialize_into(&mut vec).unwrap();
                 Ok(vec)
             },
-            DOCUMENT_BYTE => {
-                assert!(values.windows(2).all(|vs| vs[0] == vs[1]));
-                Ok(values[0].to_vec())
-            },
             otherwise => panic!("wut {:?}", otherwise),
         }
     }
@@ -304,6 +323,10 @@ fn lmdb_writer(wtxn: &mut heed::RwTxn, index: &Index, key: &[u8], val: &[u8]) -> anyhow::Result<()> {
         // Write the headers
         index.main.put::<_, Str, ByteSlice>(wtxn, "headers", val)?;
     }
+    else if key == DOCUMENTS_KEY {
+        // Write the documents
+        index.main.put::<_, Str, ByteSlice>(wtxn, "documents", val)?;
+    }
     else if key.starts_with(&[WORD_POSITIONS_BYTE]) {
         // Write the postings lists
         index.word_positions.as_polymorph()
@@ -319,11 +342,6 @@ fn lmdb_writer(wtxn: &mut heed::RwTxn, index: &Index, key: &[u8], val: &[u8]) -> anyhow::Result<()> {
         index.word_attribute_docids.as_polymorph()
             .put::<_, ByteSlice, ByteSlice>(wtxn, &key[1..], val)?;
     }
-    else if key.starts_with(&[DOCUMENT_BYTE]) {
-        // Write the documents
-        index.documents.as_polymorph()
-            .put::<_, ByteSlice, ByteSlice>(wtxn, &key[1..], val)?;
-    }
 
     Ok(())
 }
@@ -357,7 +375,7 @@ fn index_csv(
     max_memory: Option<usize>,
 ) -> anyhow::Result<Vec<Store>> {
 
-    debug!("{:?}: Indexing into an Indexed...", thread_index);
+    debug!("{:?}: Indexing into a Store...", thread_index);
 
     let mut store = Store::new(arc_cache_size, max_nb_chunks, max_memory);
 
@@ -480,7 +498,7 @@ fn main() -> anyhow::Result<()> {
     let mut wtxn = env.write_txn()?;
     merge_into_lmdb(stores, |k, v| lmdb_writer(&mut wtxn, &index, k, v))?;
 
-    let count = index.documents.len(&wtxn)?;
+    let count = index.documents(&wtxn)?.unwrap().metadata().count_entries;
     wtxn.commit()?;
 
     debug!("Wrote {} documents into LMDB", count);
diff --git a/src/bin/search.rs b/src/bin/search.rs
index c3fd7cd66..832013d2a 100644
--- a/src/bin/search.rs
+++ b/src/bin/search.rs
@@ -5,7 +5,7 @@ use std::time::Instant;
 use heed::EnvOpenOptions;
 use log::debug;
-use milli::{Index, BEU32};
+use milli::Index;
 use structopt::StructOpt;
 
 #[cfg(target_os = "linux")]
 #[global_allocator]
@@ -67,9 +67,11 @@ fn main() -> anyhow::Result<()> {
     let mut stdout = io::stdout();
     stdout.write_all(&headers)?;
 
+    let documents = index.documents(&rtxn)?.unwrap();
     for id in &documents_ids {
-        if let Some(content) = index.documents.get(&rtxn, &BEU32::new(*id))? {
-            stdout.write_all(&content)?;
+        let id_bytes = id.to_be_bytes();
+        if let Some(content) = documents.clone().get(&id_bytes)? {
+            stdout.write_all(content.as_ref())?;
         }
     }
 
diff --git a/src/bin/serve.rs b/src/bin/serve.rs
index 1d0dc6277..9554a6c2f 100644
--- a/src/bin/serve.rs
+++ b/src/bin/serve.rs
@@ -13,7 +13,7 @@ use slice_group_by::StrGroupBy;
 use structopt::StructOpt;
 use warp::{Filter, http::Response};
 
-use milli::{BEU32, Index};
+use milli::Index;
 
 #[cfg(target_os = "linux")]
 #[global_allocator]
@@ -87,7 +87,7 @@ async fn main() -> anyhow::Result<()> {
     // the disk file size and the number of documents in the database.
     let db_name = opt.database.file_stem().and_then(|s| s.to_str()).unwrap_or("").to_string();
     let db_size = File::open(opt.database.join("data.mdb"))?.metadata()?.len() as usize;
-    let docs_count = env.read_txn().and_then(|r| index.documents.len(&r))?;
+    let docs_count = env.read_txn().and_then(|r| Ok(index.documents(&r).unwrap().unwrap().metadata().count_entries))?;
 
     // We run and wait on the HTTP server
 
@@ -98,7 +98,7 @@ async fn main() -> anyhow::Result<()> {
             IndexTemplate {
                 db_name: db_name.clone(),
                 db_size,
-                docs_count,
+                docs_count: docs_count as usize,
             }
         });
 
@@ -185,11 +185,13 @@ async fn main() -> anyhow::Result<()> {
             if let Some(headers) = index.headers(&rtxn).unwrap() {
                 // We write the headers
                 body.extend_from_slice(headers);
 
+                let documents = index.documents(&rtxn).unwrap().unwrap();
                 for id in documents_ids {
-                    let content = index.documents.get(&rtxn, &BEU32::new(id)).unwrap();
+                    let id_bytes = id.to_be_bytes();
+                    let content = documents.clone().get(&id_bytes).unwrap();
                     let content = content.expect(&format!("could not find document {}", id));
-                    let content = std::str::from_utf8(content).unwrap();
+                    let content = std::str::from_utf8(content.as_ref()).unwrap();
 
                     let content = if disable_highlighting {
                         Cow::from(content)
diff --git a/src/lib.rs b/src/lib.rs
index f9f26d1d5..40a8b9a03 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -16,6 +16,7 @@ use heed::{PolyDatabase, Database};
 use levenshtein_automata::LevenshteinAutomatonBuilder as LevBuilder;
 use log::debug;
 use once_cell::sync::Lazy;
+use oxidized_mtbl::Reader;
 use roaring::RoaringBitmap;
 
 use self::best_proximity::BestProximity;
@@ -49,8 +50,6 @@ pub struct Index {
     pub prefix_word_position_docids: Database<ByteSlice, ByteSlice>,
     /// Maps a word and an attribute (u32) to all the documents ids that it appears in.
     pub word_attribute_docids: Database<ByteSlice, ByteSlice>,
-    /// Maps an internal document to the content of the document in CSV.
-    pub documents: Database<OwnedType<BEU32>, ByteSlice>,
 }
 
 impl Index {
@@ -62,7 +61,6 @@ impl Index {
             word_position_docids: env.create_database(Some("word-position-docids"))?,
             prefix_word_position_docids: env.create_database(Some("prefix-word-position-docids"))?,
             word_attribute_docids: env.create_database(Some("word-attribute-docids"))?,
-            documents: env.create_database(Some("documents"))?,
         })
     }
 
@@ -74,6 +72,13 @@ impl Index {
         self.main.get::<_, Str, ByteSlice>(rtxn, "headers")
    }
 
+    pub fn documents<'t>(&self, rtxn: &'t heed::RoTxn) -> anyhow::Result<Option<Reader<&'t [u8]>>> {
+        match self.main.get::<_, Str, ByteSlice>(rtxn, "documents")? {
+            Some(bytes) => Ok(Some(Reader::new(bytes)?)),
+            None => Ok(None),
+        }
+    }
+
     pub fn number_of_attributes<'t>(&self, rtxn: &'t heed::RoTxn) -> anyhow::Result<Option<usize>> {
         match self.headers(rtxn)? {
             Some(headers) => {
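Usage note: a minimal sketch of how the new `Index::documents` accessor is meant to be consumed, mirroring the search.rs hunk above. It assumes an open `env` and `index` are in scope and uses a hypothetical document id; error handling is collapsed for brevity.

    // Open a read transaction and fetch the MTBL reader stored
    // under the "documents" key of the main database.
    let rtxn = env.read_txn()?;
    let documents = index.documents(&rtxn)?.expect("no documents stored");

    // Keys are big-endian u32 document ids, so lookups go through
    // to_be_bytes(); the value is the raw CSV content of the document.
    let id: u32 = 42; // hypothetical document id
    if let Some(content) = documents.clone().get(&id.to_be_bytes())? {
        println!("{}", std::str::from_utf8(content.as_ref())?);
    }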