From f980422c57bd8a474060fda7d6572b8e999b0ddb Mon Sep 17 00:00:00 2001 From: Kerollmops Date: Wed, 14 Oct 2020 09:59:27 +0200 Subject: [PATCH] Move from oxidized-mtbl to grenad --- Cargo.lock | 173 +++++++++++++++++++++------------------------ Cargo.toml | 8 +-- src/bin/indexer.rs | 80 ++++++++++++--------- 3 files changed, 130 insertions(+), 131 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ae82d20d2..ee1197f70 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -155,7 +155,7 @@ version = "0.2.13" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "31accafdb70df7871592c058eca3985b71104e15ac32f64706022c58867da931" dependencies = [ - "lazy_static 1.4.0", + "lazy_static", "memchr", "regex-automata", "serde", @@ -251,12 +251,6 @@ dependencies = [ "bitflags", ] -[[package]] -name = "crc32c" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "77ba37ef26c12988c1cee882d522d65e1d5d2ad8c3864665b88ee92767ed84c5" - [[package]] name = "crc32fast" version = "1.2.0" @@ -268,9 +262,9 @@ dependencies = [ [[package]] name = "criterion" -version = "0.3.2" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "63f696897c88b57f4ffe3c69d8e1a0613c7d0e6c4833363c8560fbde9c47b966" +checksum = "70daa7ceec6cf143990669a04c7df13391d55fb27bd4079d252fca774ba244d8" dependencies = [ "atty", "cast", @@ -278,13 +272,14 @@ dependencies = [ "criterion-plot", "csv", "itertools", - "lazy_static 1.4.0", + "lazy_static", "num-traits", "oorandom", "plotters", "rayon", "regex", "serde", + "serde_cbor", "serde_derive", "serde_json", "tinytemplate", @@ -293,9 +288,9 @@ dependencies = [ [[package]] name = "criterion-plot" -version = "0.4.2" +version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ddeaf7989f00f2e1d871a26a110f3ed713632feac17f65f03ca938c542618b60" +checksum = "e022feadec601fba1649cfa83586381a4ad31c6bf3a9ab7d408118b05dd9889d" dependencies = [ "cast", "itertools", @@ -321,7 +316,7 @@ dependencies = [ "autocfg 1.0.0", "cfg-if", "crossbeam-utils", - "lazy_static 1.4.0", + "lazy_static", "maybe-uninit", "memoffset", "scopeguard", @@ -345,7 +340,7 @@ checksum = "c3c7c73a2d1e9fc0886a08b93e98eb643461230d5f1925e4036204d5f2e261a8" dependencies = [ "autocfg 1.0.0", "cfg-if", - "lazy_static 1.4.0", + "lazy_static", ] [[package]] @@ -579,6 +574,20 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b919933a397b79c37e33b77bb2aa3dc8eb6e165ad809e58ff75bc7db2e34574" +[[package]] +name = "grenad" +version = "0.1.0" +source = "git+https://github.com/Kerollmops/grenad.git?rev=a884670#a8846703ddee1f1ed86efd3168561606c244e135" +dependencies = [ + "byteorder", + "flate2", + "log 0.4.11", + "nix", + "snap", + "tempfile", + "zstd", +] + [[package]] name = "h2" version = "0.2.5" @@ -598,6 +607,12 @@ dependencies = [ "tokio-util", ] +[[package]] +name = "half" +version = "1.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d36fab90f82edc3c747f9d438e06cf0a491055896f2a279638bb5beed6c40177" + [[package]] name = "hashbrown" version = "0.8.2" @@ -835,9 +850,9 @@ dependencies = [ [[package]] name = "js-sys" -version = "0.3.40" +version = "0.3.45" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ce10c23ad2ea25ceca0093bd3192229da4c5b3c0f2de499c1ecac0d98d452177" +checksum = "ca059e81d9486668f12d455a4ea6daa600bd408134cd17e3d3fb5a32d1f016f8" dependencies = [ "wasm-bindgen", ] @@ -852,12 +867,6 @@ dependencies = [ "winapi-build", ] -[[package]] -name = "lazy_static" -version = "0.2.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "76f033c7ad61445c5b347c7382dd1237847eb1bce590fe50365dcb33d546be73" - [[package]] name = "lazy_static" version = "1.4.0" @@ -875,9 +884,9 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.70" +version = "0.2.79" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3baa92041a6fec78c687fa0cc2b3fae8884f743d672cf551bed1d6dac6988d0f" +checksum = "2448f6066e80e3bfc792e9c98bf705b4b0fc6e8ef5b43e5889aff0eaa9c58743" [[package]] name = "linked-hash-map" @@ -932,16 +941,6 @@ version = "2.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3728d817d99e5ac407411fa471ff9800a778d88a24685968b36824eaf4bee400" -[[package]] -name = "memmap" -version = "0.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6585fd95e7bb50d6cc31e20d4cf9afb4e2ba16c5846fc76793f11218da9c475b" -dependencies = [ - "libc", - "winapi 0.3.8", -] - [[package]] name = "memoffset" version = "0.5.4" @@ -965,6 +964,7 @@ dependencies = [ "flate2", "fst", "fxhash", + "grenad", "heed", "human_format", "itertools", @@ -972,10 +972,8 @@ dependencies = [ "levenshtein_automata", "linked-hash-map", "log 0.4.11", - "memmap", "near-proximity", "once_cell", - "oxidized-mtbl", "rayon", "ringtail", "roaring", @@ -1137,6 +1135,18 @@ dependencies = [ "winapi 0.3.8", ] +[[package]] +name = "nix" +version = "0.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85db2feff6bf70ebc3a4793191517d5f0331100a2f10f9bf93b5e5214f32b7b7" +dependencies = [ + "bitflags", + "cc", + "cfg-if", + "libc", +] + [[package]] name = "nom" version = "5.1.2" @@ -1200,21 +1210,6 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2839e79665f131bdb5782e51f2c6c9599c133c6098982a54c794358bf432529c" -[[package]] -name = "oxidized-mtbl" -version = "0.1.0" -source = "git+https://github.com/Kerollmops/oxidized-mtbl.git?rev=9ff8082#9ff808236980305183b2440f08cf471c58e24961" -dependencies = [ - "byteorder", - "crc32c", - "flate2", - "log 0.4.11", - "memmap", - "snap", - "tempfile", - "zstd", -] - [[package]] name = "page_size" version = "0.4.2" @@ -1556,7 +1551,7 @@ dependencies = [ "crossbeam-deque", "crossbeam-queue", "crossbeam-utils", - "lazy_static 1.4.0", + "lazy_static", "num_cpus", ] @@ -1586,9 +1581,9 @@ dependencies = [ [[package]] name = "regex" -version = "1.3.9" +version = "1.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9c3780fcf44b193bc4d09f36d2a3c87b251da4a046c87795a0d35f4f927ad8e6" +checksum = "8963b85b8ce3074fecffde43b4b0dded83ce2f367dc8d363afc56679f3ee820b" dependencies = [ "regex-syntax", ] @@ -1604,9 +1599,9 @@ dependencies = [ [[package]] name = "regex-syntax" -version = "0.6.18" +version = "0.6.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "26412eb97c6b088a6997e05f69403a802a92d520de2f8e63c2b65f9e0f47c4e8" +checksum = "8cab7a364d15cde1e505267766a2d3c4e22a843e1a601f0fa7564c0f82ced11c" [[package]] name = "remove_dir_all" @@ -1698,6 +1693,16 @@ dependencies = [ "serde_derive", ] +[[package]] +name = "serde_cbor" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e18acfa2f90e8b735b2836ab8d538de304cbb6729a7360729ea5a895d15a622" +dependencies = [ + "half", + "serde", +] + [[package]] name = "serde_derive" version = "1.0.110" @@ -1807,9 +1812,9 @@ dependencies = [ [[package]] name = "stderrlog" -version = "0.4.3" +version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "32e5ee9b90a5452c570a0b0ac1c99ae9498db7e56e33d74366de7f2a7add7f25" +checksum = "b02f316286ae558d83acc93dd81eaba096e746987a7961d4a9ae026842bae67f" dependencies = [ "atty", "chrono", @@ -1825,7 +1830,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "863246aaf5ddd0d6928dfeb1a9ca65f505599e4e1b399935ef7e75107516b4ef" dependencies = [ "clap", - "lazy_static 1.4.0", + "lazy_static", "structopt-derive", ] @@ -1933,12 +1938,11 @@ dependencies = [ [[package]] name = "thread_local" -version = "0.3.4" +version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1697c4b57aeeb7a536b647165a2825faddffb1d3bad386d507709bd51a90bb14" +checksum = "d40c6d1b69745a6ec6fb1ca717914848da4b44ae29d9b3080cbee91d72a69b14" dependencies = [ - "lazy_static 0.2.11", - "unreachable", + "lazy_static", ] [[package]] @@ -1986,7 +1990,7 @@ dependencies = [ "fnv", "futures-core", "iovec", - "lazy_static 1.4.0", + "lazy_static", "libc", "memchr", "mio", @@ -2147,15 +2151,6 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "826e7639553986605ec5979c7dd957c7895e93eabed50ab2ffa7f6128a75097c" -[[package]] -name = "unreachable" -version = "1.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "382810877fe448991dfc7f0dd6e3ae5d58088fd0ea5e35189655f84e6814fa56" -dependencies = [ - "void", -] - [[package]] name = "url" version = "2.1.1" @@ -2191,12 +2186,6 @@ version = "0.9.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b5a972e5669d67ba988ce3dc826706fb0a8b01471c088cb0b6110b805cc36aed" -[[package]] -name = "void" -version = "1.0.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6a02e4885ed3bc0f2de90ea6dd45ebcbb66dacffe03547fadbb0eeae2770887d" - [[package]] name = "walkdir" version = "2.3.1" @@ -2252,9 +2241,9 @@ checksum = "cccddf32554fecc6acb585f82a32a72e28b48f8c4c1883ddfeeeaa96f7d8e519" [[package]] name = "wasm-bindgen" -version = "0.2.63" +version = "0.2.68" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4c2dc4aa152834bc334f506c1a06b866416a8b6697d5c9f75b9a689c8486def0" +checksum = "1ac64ead5ea5f05873d7c12b545865ca2b8d28adfc50a49b84770a3a97265d42" dependencies = [ "cfg-if", "wasm-bindgen-macro", @@ -2262,12 +2251,12 @@ dependencies = [ [[package]] name = "wasm-bindgen-backend" -version = "0.2.63" +version = "0.2.68" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ded84f06e0ed21499f6184df0e0cb3494727b0c5da89534e0fcc55c51d812101" +checksum = "f22b422e2a757c35a73774860af8e112bff612ce6cb604224e8e47641a9e4f68" dependencies = [ "bumpalo", - "lazy_static 1.4.0", + "lazy_static", "log 0.4.11", "proc-macro2", "quote", @@ -2277,9 +2266,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro" -version = "0.2.63" +version = "0.2.68" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "838e423688dac18d73e31edce74ddfac468e37b1506ad163ffaf0a46f703ffe3" +checksum = "6b13312a745c08c469f0b292dd2fcd6411dba5f7160f593da6ef69b64e407038" dependencies = [ "quote", "wasm-bindgen-macro-support", @@ -2287,9 +2276,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.63" +version = "0.2.68" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3156052d8ec77142051a533cdd686cba889537b213f948cd1d20869926e68e92" +checksum = "f249f06ef7ee334cc3b8ff031bfc11ec99d00f34d86da7498396dc1e3b1498fe" dependencies = [ "proc-macro2", "quote", @@ -2300,15 +2289,15 @@ dependencies = [ [[package]] name = "wasm-bindgen-shared" -version = "0.2.63" +version = "0.2.68" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c9ba19973a58daf4db6f352eda73dc0e289493cd29fb2632eb172085b6521acd" +checksum = "1d649a3145108d7d3fbcde896a468d1bd636791823c9921135218ad89be08307" [[package]] name = "web-sys" -version = "0.3.40" +version = "0.3.45" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7b72fe77fd39e4bd3eaa4412fd299a0be6b3dfe9d2597e2f1c20beb968f41d17" +checksum = "4bf6ef87ad7ae8008e15a355ce696bed26012b7caa21605188cfd8214ab51e2d" dependencies = [ "js-sys", "wasm-bindgen", diff --git a/Cargo.toml b/Cargo.toml index 43f43f602..cb8d0dd06 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,15 +13,14 @@ csv = "1.1.3" flate2 = "1.0.17" fst = "0.4.4" fxhash = "0.2.1" +grenad = { git = "https://github.com/Kerollmops/grenad.git", rev = "dee0815" } heed = { version = "0.8.1", default-features = false, features = ["lmdb"] } human_format = "1.0.3" jemallocator = "0.3.2" levenshtein_automata = { version = "0.2.0", features = ["fst_automaton"] } linked-hash-map = "0.5.3" -memmap = "0.7.0" near-proximity = { git = "https://github.com/Kerollmops/plane-sweep-proximity", rev = "6608205" } once_cell = "1.4.0" -oxidized-mtbl = { git = "https://github.com/Kerollmops/oxidized-mtbl.git", rev = "9ff8082" } rayon = "1.3.1" ringtail = "0.3.0" roaring = "0.6.1" @@ -36,7 +35,7 @@ itertools = "0.9.0" # logging log = "0.4.11" -stderrlog = "0.4.3" +stderrlog = "0.5.0" # http server askama = "0.10.1" @@ -46,13 +45,14 @@ tokio = { version = "0.2.15", features = ["full"] } warp = "0.2.2" [dev-dependencies] -criterion = "0.3" +criterion = "0.3.3" [build-dependencies] fst = "0.4.4" [features] default = [] +file-fuse = ["grenad/file-fuse"] [[bench]] name = "search" diff --git a/src/bin/indexer.rs b/src/bin/indexer.rs index 55f657312..22ea52ab1 100644 --- a/src/bin/indexer.rs +++ b/src/bin/indexer.rs @@ -1,7 +1,7 @@ use std::collections::{BTreeMap, HashMap}; use std::convert::TryFrom; use std::fs::File; -use std::io::{self, Read, Write}; +use std::io::{self, Read, Write, Seek, SeekFrom}; use std::iter::FromIterator; use std::path::PathBuf; use std::sync::mpsc::sync_channel; @@ -16,8 +16,7 @@ use fst::IntoStreamer; use heed::{EnvOpenOptions, BytesEncode, types::ByteSlice}; use linked_hash_map::LinkedHashMap; use log::{debug, info}; -use memmap::Mmap; -use oxidized_mtbl::{Reader, Writer, Merger, Sorter, CompressionType}; +use grenad::{Reader, FileFuse, Writer, Merger, Sorter, CompressionType}; use rayon::prelude::*; use roaring::RoaringBitmap; use structopt::StructOpt; @@ -110,6 +109,9 @@ struct IndexerOpt { /// The level of compression of the chosen algorithm. #[structopt(long, requires = "chunk-compression-type")] chunk_compression_level: Option, + + #[structopt(long, default_value = "4294967296")] // 4 GB + file_fusing_shrink_size: u64, } fn format_count(n: usize) -> String { @@ -120,7 +122,7 @@ fn lmdb_key_valid_size(key: &[u8]) -> bool { !key.is_empty() && key.len() <= LMDB_MAX_KEY_LENGTH } -fn create_writer(typ: CompressionType, level: Option, file: File) -> Writer { +fn create_writer(typ: CompressionType, level: Option, file: File) -> io::Result> { let mut builder = Writer::builder(); builder.compression_type(typ); if let Some(level) = level { @@ -129,21 +131,24 @@ fn create_writer(typ: CompressionType, level: Option, file: File) -> Writer builder.build(file) } -fn writer_into_reader(writer: Writer) -> anyhow::Result> { - let file = writer.into_inner()?; - let mmap = unsafe { Mmap::map(&file)? }; - Reader::new(mmap).map_err(Into::into) +fn writer_into_reader(writer: Writer, shrink_size: u64) -> anyhow::Result> { + let mut file = writer.into_inner()?; + file.seek(SeekFrom::Start(0))?; + let file = FileFuse::with_shrink_size(file, shrink_size); + Reader::new(file).map_err(Into::into) } fn create_sorter( merge: MergeFn, chunk_compression_type: CompressionType, chunk_compression_level: Option, + file_fusing_shrink_size: u64, max_nb_chunks: Option, max_memory: Option, ) -> Sorter { let mut builder = Sorter::builder(merge); + builder.file_fusing_shrink_size(file_fusing_shrink_size); builder.chunk_compression_type(chunk_compression_type); if let Some(level) = chunk_compression_level { builder.chunk_compression_level(level); @@ -194,11 +199,11 @@ fn compute_words_pair_proximities( type MergeFn = fn(&[u8], &[Vec]) -> Result, ()>; struct Readers { - main: Reader, - word_docids: Reader, - docid_word_positions: Reader, - words_pairs_proximities_docids: Reader, - documents: Reader, + main: Reader, + word_docids: Reader, + docid_word_positions: Reader, + words_pairs_proximities_docids: Reader, + documents: Reader, } struct Store { @@ -210,6 +215,7 @@ struct Store { // MTBL parameters chunk_compression_type: CompressionType, chunk_compression_level: Option, + file_fusing_shrink_size: u64, // MTBL sorters main_sorter: Sorter, word_docids_sorter: Sorter, @@ -226,6 +232,7 @@ impl Store { max_memory: Option, chunk_compression_type: CompressionType, chunk_compression_level: Option, + file_fusing_shrink_size: u64, ) -> anyhow::Result { // We divide the max memory by the number of sorter the Store have. @@ -235,6 +242,7 @@ impl Store { main_merge, chunk_compression_type, chunk_compression_level, + file_fusing_shrink_size, max_nb_chunks, max_memory, ); @@ -242,6 +250,7 @@ impl Store { word_docids_merge, chunk_compression_type, chunk_compression_level, + file_fusing_shrink_size, max_nb_chunks, max_memory, ); @@ -249,14 +258,15 @@ impl Store { words_pairs_proximities_docids_merge, chunk_compression_type, chunk_compression_level, + file_fusing_shrink_size, max_nb_chunks, max_memory, ); - let documents_writer = tempfile().map(|f| { + let documents_writer = tempfile().and_then(|f| { create_writer(chunk_compression_type, chunk_compression_level, f) })?; - let docid_word_positions_writer = tempfile().map(|f| { + let docid_word_positions_writer = tempfile().and_then(|f| { create_writer(chunk_compression_type, chunk_compression_level, f) })?; @@ -268,6 +278,7 @@ impl Store { documents_ids: RoaringBitmap::new(), chunk_compression_type, chunk_compression_level, + file_fusing_shrink_size, main_sorter, word_docids_sorter, @@ -510,6 +521,7 @@ impl Store { fn finish(mut self) -> anyhow::Result { let comp_type = self.chunk_compression_type; let comp_level = self.chunk_compression_level; + let shrink_size = self.file_fusing_shrink_size; Self::write_word_docids(&mut self.word_docids_sorter, self.word_docids)?; Self::write_documents_ids(&mut self.main_sorter, self.documents_ids)?; @@ -518,12 +530,11 @@ impl Store { self.words_pairs_proximities_docids, )?; - let mut word_docids_wtr = tempfile().map(|f| create_writer(comp_type, comp_level, f))?; + let mut word_docids_wtr = tempfile().and_then(|f| create_writer(comp_type, comp_level, f))?; let mut builder = fst::SetBuilder::memory(); let mut iter = self.word_docids_sorter.into_iter()?; - while let Some(result) = iter.next() { - let (word, val) = result?; + while let Some((word, val)) = iter.next()? { // This is a lexicographically ordered word position // we use the key to construct the words fst. builder.insert(word)?; @@ -533,17 +544,17 @@ impl Store { let fst = builder.into_set(); self.main_sorter.insert(WORDS_FST_KEY, fst.as_fst().as_bytes())?; - let mut main_wtr = tempfile().map(|f| create_writer(comp_type, comp_level, f))?; + let mut main_wtr = tempfile().and_then(|f| create_writer(comp_type, comp_level, f))?; self.main_sorter.write_into(&mut main_wtr)?; - let mut words_pairs_proximities_docids_wtr = tempfile().map(|f| create_writer(comp_type, comp_level, f))?; + let mut words_pairs_proximities_docids_wtr = tempfile().and_then(|f| create_writer(comp_type, comp_level, f))?; self.words_pairs_proximities_docids_sorter.write_into(&mut words_pairs_proximities_docids_wtr)?; - let main = writer_into_reader(main_wtr)?; - let word_docids = writer_into_reader(word_docids_wtr)?; - let words_pairs_proximities_docids = writer_into_reader(words_pairs_proximities_docids_wtr)?; - let docid_word_positions = writer_into_reader(self.docid_word_positions_writer)?; - let documents = writer_into_reader(self.documents_writer)?; + let main = writer_into_reader(main_wtr, shrink_size)?; + let word_docids = writer_into_reader(word_docids_wtr, shrink_size)?; + let words_pairs_proximities_docids = writer_into_reader(words_pairs_proximities_docids_wtr, shrink_size)?; + let docid_word_positions = writer_into_reader(self.docid_word_positions_writer, shrink_size)?; + let documents = writer_into_reader(self.documents_writer, shrink_size)?; Ok(Readers { main, @@ -614,7 +625,7 @@ fn documents_merge(key: &[u8], _values: &[Vec]) -> Result, ()> { panic!("merging documents is an error ({:?})", key.as_bstr()) } -fn merge_readers(sources: Vec>, merge: MergeFn) -> Merger { +fn merge_readers(sources: Vec>, merge: MergeFn) -> Merger { let mut builder = Merger::builder(merge); builder.extend(sources); builder.build() @@ -623,7 +634,7 @@ fn merge_readers(sources: Vec>, merge: MergeFn) -> Merger>, + sources: Vec>, merge: MergeFn, ) -> anyhow::Result<()> { debug!("Merging {} MTBL stores...", sources.len()); @@ -633,8 +644,7 @@ fn merge_into_lmdb_database( let mut in_iter = merger.into_merge_iter()?; let mut out_iter = database.iter_mut::<_, ByteSlice, ByteSlice>(wtxn)?; - while let Some(result) = in_iter.next() { - let (k, v) = result?; + while let Some((k, v)) = in_iter.next()? { out_iter.append(k, v).with_context(|| format!("writing {:?} into LMDB", k.as_bstr()))?; } @@ -645,15 +655,13 @@ fn merge_into_lmdb_database( fn write_into_lmdb_database( wtxn: &mut heed::RwTxn, database: heed::PolyDatabase, - reader: Reader, + mut reader: Reader, ) -> anyhow::Result<()> { debug!("Writing MTBL stores..."); let before = Instant::now(); - let mut in_iter = reader.into_iter()?; let mut out_iter = database.iter_mut::<_, ByteSlice, ByteSlice>(wtxn)?; - while let Some(result) = in_iter.next() { - let (k, v) = result?; + while let Some((k, v)) = reader.next()? { out_iter.append(k, v).with_context(|| format!("writing {:?} into LMDB", k.as_bstr()))?; } @@ -746,6 +754,7 @@ fn main() -> anyhow::Result<()> { let max_memory_by_job = opt.indexer.max_memory / num_threads; let chunk_compression_type = opt.indexer.chunk_compression_type; let chunk_compression_level = opt.indexer.chunk_compression_level; + let file_fusing_shrink_size = opt.indexer.file_fusing_shrink_size; let log_every_n = opt.indexer.log_every_n; let readers = csv_readers(opt.csv_file, num_threads)? @@ -758,6 +767,7 @@ fn main() -> anyhow::Result<()> { Some(max_memory_by_job), chunk_compression_type, chunk_compression_level, + file_fusing_shrink_size, )?; store.index_csv(rdr, i, num_threads, log_every_n) }) @@ -779,12 +789,12 @@ fn main() -> anyhow::Result<()> { // This is the function that merge the readers // by using the given merge function. let merge_readers = move |readers, merge| { - let mut writer = tempfile().map(|f| { + let mut writer = tempfile().and_then(|f| { create_writer(chunk_compression_type, chunk_compression_level, f) })?; let merger = merge_readers(readers, merge); merger.write_into(&mut writer)?; - writer_into_reader(writer) + writer_into_reader(writer, file_fusing_shrink_size) }; // The enum and the channel which is used to transfert