Introduce MTBL parallel merging before LMDB writing

This commit is contained in:
Kerollmops 2020-05-31 14:20:17 +02:00
parent 6762c2d08f
commit 24587148fd
No known key found for this signature in database
GPG Key ID: 92ADA4E935E71FA4
3 changed files with 372 additions and 114 deletions

225
Cargo.lock generated
View File

@ -1,5 +1,11 @@
# This file is automatically @generated by Cargo. # This file is automatically @generated by Cargo.
# It is not intended for manual editing. # It is not intended for manual editing.
[[package]]
name = "adler32"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5d2e7343e7fc9de883d1b0341e0b13970f764c14101234857d2ddafa1cb1cac2"
[[package]] [[package]]
name = "anyhow" name = "anyhow"
version = "1.0.31" version = "1.0.31"
@ -66,6 +72,9 @@ name = "cc"
version = "1.0.54" version = "1.0.54"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7bbb73db36c1246e9034e307d0fba23f9a2e251faa47ade70c1bd252220c8311" checksum = "7bbb73db36c1246e9034e307d0fba23f9a2e251faa47ade70c1bd252220c8311"
dependencies = [
"jobserver",
]
[[package]] [[package]]
name = "cfg-if" name = "cfg-if"
@ -84,6 +93,21 @@ dependencies = [
"unicode-width", "unicode-width",
] ]
[[package]]
name = "crc32c"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77ba37ef26c12988c1cee882d522d65e1d5d2ad8c3864665b88ee92767ed84c5"
[[package]]
name = "crc32fast"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ba125de2af0df55319f41944744ad91c71113bf74a4646efff39afe1f6842db1"
dependencies = [
"cfg-if",
]
[[package]] [[package]]
name = "crossbeam-deque" name = "crossbeam-deque"
version = "0.7.3" version = "0.7.3"
@ -165,6 +189,18 @@ version = "1.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bb1f6b1ce1c140482ea30ddd3335fc0024ac7ee112895426e0a629a6c20adfe3" checksum = "bb1f6b1ce1c140482ea30ddd3335fc0024ac7ee112895426e0a629a6c20adfe3"
[[package]]
name = "flate2"
version = "1.0.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2cfff41391129e0a856d6d822600b8d71179d46879e310417eb9c762eb178b42"
dependencies = [
"cfg-if",
"crc32fast",
"libc",
"miniz_oxide",
]
[[package]] [[package]]
name = "fs_extra" name = "fs_extra"
version = "1.1.0" version = "1.1.0"
@ -186,6 +222,23 @@ dependencies = [
"byteorder 1.3.4", "byteorder 1.3.4",
] ]
[[package]]
name = "getrandom"
version = "0.1.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7abc8dd8451921606d809ba32e95b6111925cd2906060d2dcc29c070220503eb"
dependencies = [
"cfg-if",
"libc",
"wasi",
]
[[package]]
name = "glob"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9b919933a397b79c37e33b77bb2aa3dc8eb6e165ad809e58ff75bc7db2e34574"
[[package]] [[package]]
name = "heck" name = "heck"
version = "0.3.1" version = "0.3.1"
@ -251,6 +304,15 @@ dependencies = [
"unicode-normalization", "unicode-normalization",
] ]
[[package]]
name = "itertools"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "284f18f85651fe11e8a991b2adb42cb078325c996ed026d994719efcfca1d54b"
dependencies = [
"either",
]
[[package]] [[package]]
name = "itoa" name = "itoa"
version = "0.4.5" version = "0.4.5"
@ -278,6 +340,15 @@ dependencies = [
"libc", "libc",
] ]
[[package]]
name = "jobserver"
version = "0.1.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5c71313ebb9439f74b00d9d2dcec36440beaf57a6aa0623068441dd7cd81a7f2"
dependencies = [
"libc",
]
[[package]] [[package]]
name = "lazy_static" name = "lazy_static"
version = "1.4.0" version = "1.4.0"
@ -325,11 +396,15 @@ dependencies = [
"fxhash", "fxhash",
"heed", "heed",
"jemallocator", "jemallocator",
"memmap",
"oxidized-mtbl",
"rayon", "rayon",
"roaring", "roaring",
"slice-group-by", "slice-group-by",
"smallstr", "smallstr",
"smallvec",
"structopt", "structopt",
"tempfile",
] ]
[[package]] [[package]]
@ -338,6 +413,16 @@ version = "2.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3728d817d99e5ac407411fa471ff9800a778d88a24685968b36824eaf4bee400" checksum = "3728d817d99e5ac407411fa471ff9800a778d88a24685968b36824eaf4bee400"
[[package]]
name = "memmap"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6585fd95e7bb50d6cc31e20d4cf9afb4e2ba16c5846fc76793f11218da9c475b"
dependencies = [
"libc",
"winapi",
]
[[package]] [[package]]
name = "memoffset" name = "memoffset"
version = "0.5.4" version = "0.5.4"
@ -347,6 +432,15 @@ dependencies = [
"autocfg", "autocfg",
] ]
[[package]]
name = "miniz_oxide"
version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "aa679ff6578b1cddee93d7e82e263b94a575e0bfced07284eb0c037c1d2416a5"
dependencies = [
"adler32",
]
[[package]] [[package]]
name = "num_cpus" name = "num_cpus"
version = "1.13.0" version = "1.13.0"
@ -363,6 +457,18 @@ version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b631f7e854af39a1739f401cf34a8a013dfe09eac4fa4dba91e9768bd28168d" checksum = "0b631f7e854af39a1739f401cf34a8a013dfe09eac4fa4dba91e9768bd28168d"
[[package]]
name = "oxidized-mtbl"
version = "0.1.0"
source = "git+https://github.com/Kerollmops/oxidized-mtbl.git?rev=8918476#8918476f61f4430890d067db7b4a6cfb2d549c43"
dependencies = [
"byteorder 1.3.4",
"crc32c",
"flate2",
"snap",
"zstd",
]
[[package]] [[package]]
name = "page_size" name = "page_size"
version = "0.4.2" version = "0.4.2"
@ -385,6 +491,12 @@ version = "0.3.17"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "05da548ad6865900e60eaba7f589cc0783590a92e940c26953ff81ddbab2d677" checksum = "05da548ad6865900e60eaba7f589cc0783590a92e940c26953ff81ddbab2d677"
[[package]]
name = "ppv-lite86"
version = "0.2.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "237a5ed80e274dbc66f86bd59c1e25edc039660be53194b5fe0a482e0f2612ea"
[[package]] [[package]]
name = "proc-macro-error" name = "proc-macro-error"
version = "1.0.2" version = "1.0.2"
@ -429,6 +541,47 @@ dependencies = [
"proc-macro2", "proc-macro2",
] ]
[[package]]
name = "rand"
version = "0.7.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6a6b1679d49b24bbfe0c803429aa1874472f50d9b363131f0e89fc356b544d03"
dependencies = [
"getrandom",
"libc",
"rand_chacha",
"rand_core",
"rand_hc",
]
[[package]]
name = "rand_chacha"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f4c8ed856279c9737206bf725bf36935d8666ead7aa69b52be55af369d193402"
dependencies = [
"ppv-lite86",
"rand_core",
]
[[package]]
name = "rand_core"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "90bde5296fc891b0cef12a6d03ddccc162ce7b2aff54160af9338f8d40df6d19"
dependencies = [
"getrandom",
]
[[package]]
name = "rand_hc"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ca3129af7b92a17112d59ad498c6f81eaf463253766b90396d39ea7a39d6613c"
dependencies = [
"rand_core",
]
[[package]] [[package]]
name = "rayon" name = "rayon"
version = "1.3.0" version = "1.3.0"
@ -453,6 +606,12 @@ dependencies = [
"num_cpus", "num_cpus",
] ]
[[package]]
name = "redox_syscall"
version = "0.1.56"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2439c63f3f6139d1b57529d16bc3b8bb855230c8efcc5d3a896c8bea7c3b1e84"
[[package]] [[package]]
name = "regex-automata" name = "regex-automata"
version = "0.1.9" version = "0.1.9"
@ -462,6 +621,15 @@ dependencies = [
"byteorder 1.3.4", "byteorder 1.3.4",
] ]
[[package]]
name = "remove_dir_all"
version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4a83fa3702a688b9359eccba92d153ac33fd2e8462f9e0e3fdf155239ea7792e"
dependencies = [
"winapi",
]
[[package]] [[package]]
name = "roaring" name = "roaring"
version = "0.5.2" version = "0.5.2"
@ -521,6 +689,12 @@ version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c7cb5678e1615754284ec264d9bb5b4c27d2018577fd90ac0ceb578591ed5ee4" checksum = "c7cb5678e1615754284ec264d9bb5b4c27d2018577fd90ac0ceb578591ed5ee4"
[[package]]
name = "snap"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f7fb9b0bb877b35a1cc1474a3b43d9c226a2625311760cdda2cbccbc0c7a8376"
[[package]] [[package]]
name = "structopt" name = "structopt"
version = "0.3.14" version = "0.3.14"
@ -579,6 +753,20 @@ dependencies = [
"unicode-xid", "unicode-xid",
] ]
[[package]]
name = "tempfile"
version = "3.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7a6e24d9338a0a5be79593e2fa15a648add6138caa803e2d5bc782c371732ca9"
dependencies = [
"cfg-if",
"libc",
"rand",
"redox_syscall",
"remove_dir_all",
"winapi",
]
[[package]] [[package]]
name = "textwrap" name = "textwrap"
version = "0.11.0" version = "0.11.0"
@ -641,6 +829,12 @@ version = "0.9.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b5a972e5669d67ba988ce3dc826706fb0a8b01471c088cb0b6110b805cc36aed" checksum = "b5a972e5669d67ba988ce3dc826706fb0a8b01471c088cb0b6110b805cc36aed"
[[package]]
name = "wasi"
version = "0.9.0+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cccddf32554fecc6acb585f82a32a72e28b48f8c4c1883ddfeeeaa96f7d8e519"
[[package]] [[package]]
name = "winapi" name = "winapi"
version = "0.3.8" version = "0.3.8"
@ -683,3 +877,34 @@ dependencies = [
"syn", "syn",
"synstructure", "synstructure",
] ]
[[package]]
name = "zstd"
version = "0.5.2+zstd.1.4.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "644352b10ce7f333d6e0af85bd4f5322dc449416dc1211c6308e95bca8923db4"
dependencies = [
"zstd-safe",
]
[[package]]
name = "zstd-safe"
version = "2.0.4+zstd.1.4.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7113c0c9aed2c55181f2d9f5b0a36e7d2c0183b11c058ab40b35987479efe4d7"
dependencies = [
"libc",
"zstd-sys",
]
[[package]]
name = "zstd-sys"
version = "1.4.16+zstd.1.4.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c442965efc45353be5a9b9969c9b0872fff6828c7e06d118dda2cb2d0bb11d5a"
dependencies = [
"cc",
"glob",
"itertools",
"libc",
]

View File

@ -8,16 +8,20 @@ edition = "2018"
anyhow = "1.0.28" anyhow = "1.0.28"
bitpacking = "0.8.2" bitpacking = "0.8.2"
byteorder = "1.3.4" byteorder = "1.3.4"
roaring = "0.5.2"
csv = "1.1.3" csv = "1.1.3"
fst = "0.4.3" fst = "0.4.3"
fxhash = "0.2.1" fxhash = "0.2.1"
heed = { version = "0.8.0", default-features = false, features = ["lmdb"] } heed = { version = "0.8.0", default-features = false, features = ["lmdb"] }
jemallocator = "0.3.2" jemallocator = "0.3.2"
memmap = "0.7.0"
oxidized-mtbl = { git = "https://github.com/Kerollmops/oxidized-mtbl.git", rev = "8918476" }
rayon = "1.3.0" rayon = "1.3.0"
roaring = "0.5.2"
slice-group-by = "0.2.6" slice-group-by = "0.2.6"
smallstr = "0.2.0" smallstr = "0.2.0"
smallvec = "1.4.0"
structopt = { version = "0.3.14", default-features = false } structopt = { version = "0.3.14", default-features = false }
tempfile = "3.1.0"
[profile.release] [profile.release]
debug = true debug = true

View File

@ -1,23 +1,25 @@
use std::collections::hash_map::Entry;
use std::collections::{HashMap, BTreeSet}; use std::collections::{HashMap, BTreeSet};
use std::convert::TryFrom; use std::convert::TryFrom;
use std::convert::TryInto;
use std::fs::File; use std::fs::File;
use std::hash::BuildHasherDefault; use std::hash::BuildHasherDefault;
use std::path::PathBuf; use std::path::PathBuf;
use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::atomic::{AtomicUsize, Ordering};
use anyhow::{ensure, Context}; use anyhow::Context;
use roaring::RoaringBitmap; use fst::{Streamer, IntoStreamer};
use fst::IntoStreamer;
use fxhash::FxHasher32; use fxhash::FxHasher32;
use heed::{EnvOpenOptions, PolyDatabase, Database};
use heed::types::*; use heed::types::*;
use heed::{EnvOpenOptions, PolyDatabase, Database};
use oxidized_mtbl::{Reader, ReaderOptions, Writer, Merger, MergerOptions};
use rayon::prelude::*; use rayon::prelude::*;
use roaring::RoaringBitmap;
use slice_group_by::StrGroupBy; use slice_group_by::StrGroupBy;
use structopt::StructOpt; use structopt::StructOpt;
pub type FastMap4<K, V> = HashMap<K, V, BuildHasherDefault<FxHasher32>>; pub type FastMap4<K, V> = HashMap<K, V, BuildHasherDefault<FxHasher32>>;
pub type SmallString32 = smallstr::SmallString<[u8; 32]>; pub type SmallString32 = smallstr::SmallString<[u8; 32]>;
pub type SmallVec32 = smallvec::SmallVec<[u8; 32]>;
pub type BEU32 = heed::zerocopy::U32<heed::byteorder::BE>; pub type BEU32 = heed::zerocopy::U32<heed::byteorder::BE>;
pub type DocumentId = u32; pub type DocumentId = u32;
@ -39,100 +41,126 @@ struct Opt {
files_to_index: Vec<PathBuf>, files_to_index: Vec<PathBuf>,
} }
fn union_postings_ids(_key: &[u8], old_value: Option<&[u8]>, new_value: RoaringBitmap) -> Option<Vec<u8>> {
let result = match old_value {
Some(bytes) => {
let mut old_value = RoaringBitmap::deserialize_from(bytes).unwrap();
old_value.union_with(&new_value);
old_value
},
None => new_value,
};
let mut vec = Vec::new();
result.serialize_into(&mut vec).unwrap();
Some(vec)
}
fn union_words_fst(key: &[u8], old_value: Option<&[u8]>, new_value: &fst::Set<Vec<u8>>) -> Option<Vec<u8>> {
if key != b"words-fst" { unimplemented!() }
// Do an union of the old and the new set of words.
let mut builder = fst::set::OpBuilder::new();
let old_words = old_value.map(|v| fst::Set::new(v).unwrap());
let old_words = old_words.as_ref().map(|v| v.into_stream());
if let Some(old_words) = old_words {
builder.push(old_words);
}
builder.push(new_value);
let op = builder.r#union();
let mut build = fst::SetBuilder::memory();
build.extend_stream(op.into_stream()).unwrap();
Some(build.into_inner().unwrap())
}
fn alphanumeric_tokens(string: &str) -> impl Iterator<Item = &str> { fn alphanumeric_tokens(string: &str) -> impl Iterator<Item = &str> {
let is_alphanumeric = |s: &&str| s.chars().next().map_or(false, char::is_alphanumeric); let is_alphanumeric = |s: &&str| s.chars().next().map_or(false, char::is_alphanumeric);
string.linear_group_by_key(|c| c.is_alphanumeric()).filter(is_alphanumeric) string.linear_group_by_key(|c| c.is_alphanumeric()).filter(is_alphanumeric)
} }
#[derive(Default)]
struct Indexed { struct Indexed {
fst: fst::Set<Vec<u8>>, fst: fst::Set<Vec<u8>>,
postings_ids: FastMap4<SmallString32, RoaringBitmap>, postings_ids: FastMap4<SmallVec32, RoaringBitmap>,
headers: Vec<u8>, headers: Vec<u8>,
documents: Vec<(DocumentId, Vec<u8>)>, documents: Vec<(DocumentId, Vec<u8>)>,
} }
impl Indexed { #[derive(Default)]
fn merge_with(mut self, mut other: Indexed) -> Indexed { struct MtblKvStore(Option<File>);
// Union of the two FSTs impl MtblKvStore {
let op = fst::set::OpBuilder::new() fn from_indexed(mut indexed: Indexed) -> anyhow::Result<MtblKvStore> {
.add(self.fst.into_stream()) let outfile = tempfile::tempfile()?;
.add(other.fst.into_stream()) let mut out = Writer::new(outfile, None)?;
.r#union();
let mut build = fst::SetBuilder::memory(); out.add(b"\0headers", indexed.headers)?;
build.extend_stream(op.into_stream()).unwrap(); out.add(b"\0words-fst", indexed.fst.as_fst().as_bytes())?;
let fst = build.into_set();
// Merge the postings by unions // postings ids keys are all prefixed by a '1'
for (word, mut postings) in other.postings_ids { let mut key = vec![1];
match self.postings_ids.entry(word) { let mut buffer = Vec::new();
Entry::Occupied(mut entry) => { // We must write the postings ids in order for mtbl therefore
let old = entry.get(); // we iterate over the fst to read the words in order
postings.union_with(&old); let mut stream = indexed.fst.stream();
entry.insert(postings); while let Some(word) = stream.next() {
}, key.truncate(1);
Entry::Vacant(entry) => { key.extend_from_slice(word);
entry.insert(postings); if let Some(ids) = indexed.postings_ids.remove(word) {
}, buffer.clear();
ids.serialize_into(&mut buffer)?;
out.add(&key, &buffer).unwrap();
} }
} }
// assert headers are valid // postings ids keys are all prefixed by a '2'
if !self.headers.is_empty() { key[0] = 2;
assert_eq!(self.headers, other.headers); indexed.documents.sort_unstable();
for (id, content) in indexed.documents {
key.truncate(1);
key.extend_from_slice(&id.to_be_bytes());
out.add(&key, content).unwrap();
} }
// extend the documents let out = out.into_inner()?;
self.documents.append(&mut other.documents); Ok(MtblKvStore(Some(out)))
}
Indexed { fn merge_with(self, other: MtblKvStore) -> anyhow::Result<MtblKvStore> {
fst, let (left, right) = match (self.0, other.0) {
postings_ids: self.postings_ids, (Some(left), Some(right)) => (left, right),
headers: self.headers, (Some(left), None) => return Ok(MtblKvStore(Some(left))),
documents: self.documents, (None, Some(right)) => return Ok(MtblKvStore(Some(right))),
(None, None) => return Ok(MtblKvStore(None)),
};
let left = unsafe { memmap::Mmap::map(&left)? };
let right = unsafe { memmap::Mmap::map(&right)? };
let left = Reader::new(&left, ReaderOptions::default()).unwrap();
let right = Reader::new(&right, ReaderOptions::default()).unwrap();
fn merge(key: &[u8], left: &[u8], right: &[u8]) -> Option<Vec<u8>> {
if key == b"\0words-fst" {
let left_fst = fst::Set::new(left).unwrap();
let right_fst = fst::Set::new(right).unwrap();
// Union of the two FSTs
let op = fst::set::OpBuilder::new()
.add(left_fst.into_stream())
.add(right_fst.into_stream())
.r#union();
let mut build = fst::SetBuilder::memory();
build.extend_stream(op.into_stream()).unwrap();
Some(build.into_inner().unwrap())
}
else if key == b"\0headers" {
assert_eq!(left, right);
Some(left.to_vec())
}
else if key.starts_with(&[1]) {
let mut left = RoaringBitmap::deserialize_from(left).unwrap();
let right = RoaringBitmap::deserialize_from(right).unwrap();
left.union_with(&right);
let mut vec = Vec::new();
left.serialize_into(&mut vec).unwrap();
Some(vec)
}
else if key.starts_with(&[2]) {
assert_eq!(left, right);
Some(left.to_vec())
}
else {
panic!("wut? {:?}", key)
}
} }
let outfile = tempfile::tempfile()?;
let mut out = Writer::new(outfile, None)?;
let sources = vec![left, right];
let opt = MergerOptions { merge };
let mut merger = Merger::new(sources, opt);
let mut iter = merger.iter();
while let Some((k, v)) = iter.next() {
out.add(k, v).unwrap();
}
let out = out.into_inner()?;
Ok(MtblKvStore(Some(out)))
} }
} }
fn index_csv(mut rdr: csv::Reader<File>) -> anyhow::Result<Indexed> { fn index_csv(mut rdr: csv::Reader<File>) -> anyhow::Result<MtblKvStore> {
const MAX_POSITION: usize = 1000; const MAX_POSITION: usize = 1000;
const MAX_ATTRIBUTES: usize = u32::max_value() as usize / MAX_POSITION; const MAX_ATTRIBUTES: usize = u32::max_value() as usize / MAX_POSITION;
@ -153,7 +181,7 @@ fn index_csv(mut rdr: csv::Reader<File>) -> anyhow::Result<Indexed> {
for (_attr, content) in document.iter().enumerate().take(MAX_ATTRIBUTES) { for (_attr, content) in document.iter().enumerate().take(MAX_ATTRIBUTES) {
for (_pos, word) in alphanumeric_tokens(&content).enumerate().take(MAX_POSITION) { for (_pos, word) in alphanumeric_tokens(&content).enumerate().take(MAX_POSITION) {
if !word.is_empty() && word.len() < 500 { // LMDB limits if !word.is_empty() && word.len() < 500 { // LMDB limits
postings_ids.entry(SmallString32::from(word)) postings_ids.entry(SmallVec32::from(word.as_bytes()))
.or_insert_with(RoaringBitmap::new) .or_insert_with(RoaringBitmap::new)
.insert(document_id); .insert(document_id);
} }
@ -173,44 +201,51 @@ fn index_csv(mut rdr: csv::Reader<File>) -> anyhow::Result<Indexed> {
new_words.insert(word.clone()); new_words.insert(word.clone());
} }
let new_words_fst = fst::Set::from_iter(new_words.iter().map(SmallString32::as_str))?; let new_words_fst = fst::Set::from_iter(new_words.iter().map(SmallVec32::as_ref))?;
Ok(Indexed { fst: new_words_fst, headers, postings_ids, documents }) let indexed = Indexed { fst: new_words_fst, headers, postings_ids, documents };
MtblKvStore::from_indexed(indexed)
} }
// TODO merge with the previous values
fn writer( fn writer(
wtxn: &mut heed::RwTxn, wtxn: &mut heed::RwTxn,
main: PolyDatabase, main: PolyDatabase,
postings_ids: Database<Str, ByteSlice>, postings_ids: Database<Str, ByteSlice>,
documents: Database<OwnedType<BEU32>, ByteSlice>, documents: Database<OwnedType<BEU32>, ByteSlice>,
indexed: Indexed, mtbl_store: MtblKvStore,
) -> anyhow::Result<usize> ) -> anyhow::Result<usize>
{ {
// Write and merge the words fst let mtbl_store = match mtbl_store.0 {
let old_value = main.get::<_, Str, ByteSlice>(wtxn, "words-fst")?; Some(store) => unsafe { memmap::Mmap::map(&store)? },
let new_value = union_words_fst(b"words-fst", old_value, &indexed.fst) None => return Ok(0),
.context("error while do a words-fst union")?; };
main.put::<_, Str, ByteSlice>(wtxn, "words-fst", &new_value)?; let mtbl_store = Reader::new(&mtbl_store, ReaderOptions::default()).unwrap();
// Write the words fst
let fst = mtbl_store.get(b"\0words-fst").unwrap();
let fst = fst::Set::new(fst)?;
main.put::<_, Str, ByteSlice>(wtxn, "words-fst", &fst.as_fst().as_bytes())?;
// Write and merge the headers // Write and merge the headers
if let Some(old_headers) = main.get::<_, Str, ByteSlice>(wtxn, "headers")? { let headers = mtbl_store.get(b"\0headers").unwrap();
ensure!(old_headers == &*indexed.headers, "headers differs from the previous ones"); main.put::<_, Str, ByteSlice>(wtxn, "headers", headers.as_ref())?;
}
main.put::<_, Str, ByteSlice>(wtxn, "headers", &indexed.headers)?;
// Write and merge the postings lists // Write and merge the postings lists
for (word, postings) in indexed.postings_ids { let mut iter = mtbl_store.iter_prefix(&[1]).unwrap();
let old_value = postings_ids.get(wtxn, word.as_str())?; while let Some((word, postings)) = iter.next() {
let new_value = union_postings_ids(word.as_bytes(), old_value, postings) let word = std::str::from_utf8(&word[1..]).unwrap();
.context("error while do a words-fst union")?; postings_ids.put(wtxn, &word, &postings)?;
postings_ids.put(wtxn, &word, &new_value)?;
} }
let count = indexed.documents.len();
// Write the documents // Write the documents
for (id, content) in indexed.documents { let mut count = 0;
let mut iter = mtbl_store.iter_prefix(&[2]).unwrap();
while let Some((id_bytes, content)) = iter.next() {
let id = id_bytes[1..].try_into().map(u32::from_be_bytes).unwrap();
documents.put(wtxn, &BEU32::new(id), &content)?; documents.put(wtxn, &BEU32::new(id), &content)?;
count += 1;
} }
Ok(count) Ok(count)
@ -232,29 +267,23 @@ fn main() -> anyhow::Result<()> {
let res = opt.files_to_index let res = opt.files_to_index
.into_par_iter() .into_par_iter()
.try_fold(|| Indexed::default(), |acc, path| { .try_fold(MtblKvStore::default, |acc, path| {
let rdr = csv::Reader::from_path(path)?; let rdr = csv::Reader::from_path(path)?;
let indexed = index_csv(rdr)?; let mtbl_store = index_csv(rdr)?;
Ok(acc.merge_with(indexed)) as anyhow::Result<Indexed> acc.merge_with(mtbl_store)
})
.map(|indexed| match indexed {
Ok(indexed) => {
let tid = rayon::current_thread_index();
eprintln!("{:?}: A new step to write into LMDB", tid);
let mut wtxn = env.write_txn()?;
let count = writer(&mut wtxn, main, postings_ids, documents, indexed)?;
wtxn.commit()?;
eprintln!("{:?}: Wrote {} documents into LMDB", tid, count);
Ok(count)
},
Err(e) => Err(e),
}) })
.inspect(|_| { .inspect(|_| {
eprintln!("Total number of documents seen so far is {}", ID_GENERATOR.load(Ordering::Relaxed)) eprintln!("Total number of documents seen so far is {}", ID_GENERATOR.load(Ordering::Relaxed))
}) })
.try_reduce(|| 0, |a, b| Ok(a + b)); .try_reduce(MtblKvStore::default, MtblKvStore::merge_with);
println!("indexed {:?} documents", res); let mtbl_store = res?;
eprintln!("We are writing into LMDB...");
let mut wtxn = env.write_txn()?;
let count = writer(&mut wtxn, main, postings_ids, documents, mtbl_store)?;
wtxn.commit()?;
eprintln!("Wrote {} documents into LMDB", count);
Ok(()) Ok(())
} }