From f8f02af23e372ae0b07a44ba1eaf8503e62f0f67 Mon Sep 17 00:00:00 2001 From: mpostma Date: Thu, 4 Feb 2021 13:21:15 +0100 Subject: [PATCH] incorporate review changes --- Cargo.lock | 98 ------------------- Cargo.toml | 5 +- src/data/search.rs | 2 +- src/data/updates.rs | 15 ++- .../local_index_controller/index_store.rs | 40 ++++---- src/index_controller/mod.rs | 1 - src/option.rs | 16 +-- 7 files changed, 43 insertions(+), 134 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f036664f4..0b8128118 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -808,12 +808,6 @@ version = "1.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "80115a2dfde04491e181c2440a39e4be26e52d9ca4e92bed213f65b94e0b8db1" -[[package]] -name = "difference" -version = "2.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "524cbf6897b527295dff137cec09ecf3a05f4fddffd7dfcd1585403449e74198" - [[package]] name = "digest" version = "0.8.1" @@ -838,12 +832,6 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "212d0f5754cb6769937f4501cc0e67f4f4483c8d2c3e1e922ee9edbe4ab4c7c0" -[[package]] -name = "downcast" -version = "0.10.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4bb454f0228b18c7f4c3b0ebbee346ed9c52e7443b0999cd543ff3571205701d" - [[package]] name = "either" version = "1.6.1" @@ -949,15 +937,6 @@ dependencies = [ "miniz_oxide", ] -[[package]] -name = "float-cmp" -version = "0.8.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e1267f4ac4f343772758f7b1bdcbe767c218bbab93bb432acbf5162bbf85a6c4" -dependencies = [ - "num-traits", -] - [[package]] name = "fnv" version = "1.0.7" @@ -974,12 +953,6 @@ dependencies = [ "percent-encoding", ] -[[package]] -name = "fragile" -version = "1.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "69a039c3498dc930fe810151a34ba0c1c70b02b8625035592e74432f678591f2" - [[package]] name = "fs_extra" version = "1.2.0" @@ -1679,10 +1652,7 @@ dependencies = [ "memmap", "milli", "mime", - "mockall", - "obkv", "once_cell", - "page_size", "rand 0.7.3", "rayon", "regex", @@ -1880,33 +1850,6 @@ dependencies = [ "winapi 0.3.9", ] -[[package]] -name = "mockall" -version = "0.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "619634fd9149c4a06e66d8fd9256e85326d8eeee75abee4565ff76c92e4edfe0" -dependencies = [ - "cfg-if 1.0.0", - "downcast", - "fragile", - "lazy_static", - "mockall_derive", - "predicates", - "predicates-tree", -] - -[[package]] -name = "mockall_derive" -version = "0.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "83714c95dbf4c24202f0f1b208f0f248e6bd65abfa8989303611a71c0f781548" -dependencies = [ - "cfg-if 1.0.0", - "proc-macro2", - "quote", - "syn", -] - [[package]] name = "near-proximity" version = "0.1.0" @@ -1938,12 +1881,6 @@ dependencies = [ "libc", ] -[[package]] -name = "normalize-line-endings" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "61807f77802ff30975e01f4f071c8ba10c022052f98b3294119f3e615d13e5be" - [[package]] name = "num-integer" version = "0.1.44" @@ -2212,35 +2149,6 @@ version = "0.2.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ac74c624d6b2d21f425f752262f42188365d7b8ff1aff74c82e45136510a4857" -[[package]] -name = "predicates" -version = "1.0.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eeb433456c1a57cc93554dea3ce40b4c19c4057e41c55d4a0f3d84ea71c325aa" -dependencies = [ - "difference", - "float-cmp", - "normalize-line-endings", - "predicates-core", - "regex", -] - -[[package]] -name = "predicates-core" -version = "1.0.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "57e35a3326b75e49aa85f5dc6ec15b41108cf5aee58eabb1f274dd18b73c2451" - -[[package]] -name = "predicates-tree" -version = "1.0.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "15f553275e5721409451eb85e15fd9a860a6e5ab4496eb215987502b5f5391f2" -dependencies = [ - "predicates-core", - "treeline", -] - [[package]] name = "proc-macro-error" version = "1.0.4" @@ -3218,12 +3126,6 @@ dependencies = [ "tracing", ] -[[package]] -name = "treeline" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a7f741b240f1a48843f9b8e0444fb55fb2a4ff67293b50a9179dfd5ea67f8d41" - [[package]] name = "trust-dns-proto" version = "0.19.6" diff --git a/Cargo.toml b/Cargo.toml index ad2d034ad..4668a6897 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,7 +30,7 @@ fst = "0.4.5" futures = "0.3.7" futures-util = "0.3.8" grenad = { git = "https://github.com/Kerollmops/grenad.git", rev = "3adcb26" } -heed = { version = "0.10.6", default-features = false, features = ["serde", "lmdb", "sync-read-txn"] } +heed = "0.10.6" http = "0.2.1" indexmap = { version = "1.3.2", features = ["serde-1"] } log = "0.4.8" @@ -55,8 +55,6 @@ tar = "0.4.29" tempfile = "3.1.0" tokio = { version = "0.2", features = ["full"] } dashmap = "4.0.2" -page_size = "0.4.2" -obkv = "0.1.1" uuid = "0.8.2" itertools = "0.10.0" @@ -72,7 +70,6 @@ serde_url_params = "0.2.0" tempdir = "0.3.7" assert-json-diff = { branch = "master", git = "https://github.com/qdequele/assert-json-diff" } tokio = { version = "0.2", features = ["macros", "time"] } -mockall = "0.9.0" [features] default = ["sentry"] diff --git a/src/data/search.rs b/src/data/search.rs index 2e05988aa..d0858d704 100644 --- a/src/data/search.rs +++ b/src/data/search.rs @@ -51,7 +51,7 @@ impl SearchQuery { if let Some(ref condition) = self.facet_condition { if !condition.trim().is_empty() { - let condition = FacetCondition::from_str(&rtxn, &index, &condition).unwrap(); + let condition = FacetCondition::from_str(&rtxn, &index, &condition)?; search.facet_condition(condition); } } diff --git a/src/data/updates.rs b/src/data/updates.rs index 27fc6537e..06aed8faa 100644 --- a/src/data/updates.rs +++ b/src/data/updates.rs @@ -25,7 +25,9 @@ impl Data { let file = tokio::fs::File::from_std(file?); let mut encoder = GzipEncoder::new(file); + let mut empty_update = true; while let Some(result) = stream.next().await { + empty_update = false; let bytes = &*result?; encoder.write_all(&bytes[..]).await?; } @@ -34,10 +36,19 @@ impl Data { let mut file = encoder.into_inner(); file.sync_all().await?; let file = file.into_std().await; - let mmap = unsafe { memmap::Mmap::map(&file)? }; + let index_controller = self.index_controller.clone(); - let update = tokio::task::spawn_blocking(move || index_controller.add_documents(index, method, format, &mmap[..])).await??; + let update = tokio::task::spawn_blocking(move ||{ + let mmap; + let bytes = if empty_update { + &[][..] + } else { + mmap = unsafe { memmap::Mmap::map(&file)? }; + &mmap + }; + index_controller.add_documents(index, method, format, &bytes) + }).await??; Ok(update.into()) } diff --git a/src/index_controller/local_index_controller/index_store.rs b/src/index_controller/local_index_controller/index_store.rs index 16df83d2c..483b6f5d6 100644 --- a/src/index_controller/local_index_controller/index_store.rs +++ b/src/index_controller/local_index_controller/index_store.rs @@ -17,8 +17,8 @@ type UpdateStore = super::update_store::UpdateStore, + name_to_uuid_meta: Database, uuid_to_index: DashMap, Arc)>, uuid_to_index_db: Database>, @@ -76,7 +76,7 @@ impl IndexStore { Ok(Self { env, - name_to_uuid_db: name_to_uid_db, + name_to_uuid_meta: name_to_uid_db, uuid_to_index: uid_to_index, uuid_to_index_db: uid_to_index_db, @@ -86,7 +86,7 @@ impl IndexStore { } fn index_uuid(&self, txn: &RoTxn, name: impl AsRef) -> anyhow::Result> { - match self.name_to_uuid_db.get(txn, name.as_ref())? { + match self.name_to_uuid_meta.get(txn, name.as_ref())? { Some(bytes) => { let uuid = Uuid::from_slice(bytes)?; Ok(Some(uuid)) @@ -115,7 +115,7 @@ impl IndexStore { } } - fn _get_index(&self, txn: &RoTxn, name: impl AsRef) -> anyhow::Result, Arc)>> { + fn get_index_txn(&self, txn: &RoTxn, name: impl AsRef) -> anyhow::Result, Arc)>> { match self.index_uuid(&txn, name)? { Some(uid) => self.retrieve_index(&txn, uid), None => Ok(None), @@ -124,7 +124,7 @@ impl IndexStore { pub fn index(&self, name: impl AsRef) -> anyhow::Result, Arc)>> { let txn = self.env.read_txn()?; - self._get_index(&txn, name) + self.get_index_txn(&txn, name) } pub fn get_or_create_index( @@ -134,7 +134,7 @@ impl IndexStore { index_size: u64, ) -> anyhow::Result<(Arc, Arc)> { let mut txn = self.env.write_txn()?; - match self._get_index(&txn, name.as_ref())? { + match self.get_index_txn(&txn, name.as_ref())? { Some(res) => Ok(res), None => { let uuid = Uuid::new_v4(); @@ -168,9 +168,9 @@ impl IndexStore { update_size: u64, index_size: u64, ) -> anyhow::Result<(Arc, Arc)> { - let meta = IndexMeta { update_size, index_size, uuid: uuid.clone() }; + let meta = IndexMeta { update_store_size: update_size, index_store_size: index_size, uuid: uuid.clone() }; - self.name_to_uuid_db.put(txn, name.as_ref(), uuid.as_bytes())?; + self.name_to_uuid_meta.put(txn, name.as_ref(), uuid.as_bytes())?; self.uuid_to_index_db.put(txn, uuid.as_bytes(), &meta)?; let path = self.env.path(); @@ -247,7 +247,7 @@ mod test { // insert an uuid in the the name_to_uuid_db: let uuid = Uuid::new_v4(); let mut txn = store.env.write_txn().unwrap(); - store.name_to_uuid_db.put(&mut txn, &name, uuid.as_bytes()).unwrap(); + store.name_to_uuid_meta.put(&mut txn, &name, uuid.as_bytes()).unwrap(); txn.commit().unwrap(); // check that the uuid is there @@ -264,7 +264,7 @@ mod test { let txn = store.env.read_txn().unwrap(); assert!(store.retrieve_index(&txn, uuid).unwrap().is_none()); - let meta = IndexMeta { update_size: 4096 * 100, index_size: 4096 * 100, uuid: uuid.clone() }; + let meta = IndexMeta { update_store_size: 4096 * 100, index_store_size: 4096 * 100, uuid: uuid.clone() }; let mut txn = store.env.write_txn().unwrap(); store.uuid_to_index_db.put(&mut txn, uuid.as_bytes(), &meta).unwrap(); txn.commit().unwrap(); @@ -286,9 +286,9 @@ mod test { assert!(store.index(&name).unwrap().is_none()); let uuid = Uuid::new_v4(); - let meta = IndexMeta { update_size: 4096 * 100, index_size: 4096 * 100, uuid: uuid.clone() }; + let meta = IndexMeta { update_store_size: 4096 * 100, index_store_size: 4096 * 100, uuid: uuid.clone() }; let mut txn = store.env.write_txn().unwrap(); - store.name_to_uuid_db.put(&mut txn, &name, uuid.as_bytes()).unwrap(); + store.name_to_uuid_meta.put(&mut txn, &name, uuid.as_bytes()).unwrap(); store.uuid_to_index_db.put(&mut txn, uuid.as_bytes(), &meta).unwrap(); txn.commit().unwrap(); @@ -303,11 +303,11 @@ mod test { store.get_or_create_index(&name, 4096 * 100, 4096 * 100).unwrap(); let txn = store.env.read_txn().unwrap(); - let uuid = store.name_to_uuid_db.get(&txn, &name).unwrap(); + let uuid = store.name_to_uuid_meta.get(&txn, &name).unwrap(); assert_eq!(store.uuid_to_index.len(), 1); assert!(uuid.is_some()); let uuid = Uuid::from_slice(uuid.unwrap()).unwrap(); - let meta = IndexMeta { update_size: 4096 * 100, index_size: 4096 * 100, uuid: uuid.clone() }; + let meta = IndexMeta { update_store_size: 4096 * 100, index_store_size: 4096 * 100, uuid: uuid.clone() }; assert_eq!(store.uuid_to_index_db.get(&txn, uuid.as_bytes()).unwrap(), Some(meta)); } @@ -322,11 +322,11 @@ mod test { let uuid = Uuid::new_v4(); let mut txn = store.env.write_txn().unwrap(); store.create_index(&mut txn, uuid, name, update_size, index_size).unwrap(); - let uuid = store.name_to_uuid_db.get(&txn, &name).unwrap(); + let uuid = store.name_to_uuid_meta.get(&txn, &name).unwrap(); assert_eq!(store.uuid_to_index.len(), 1); assert!(uuid.is_some()); let uuid = Uuid::from_slice(uuid.unwrap()).unwrap(); - let meta = IndexMeta { update_size , index_size, uuid: uuid.clone() }; + let meta = IndexMeta { update_store_size: update_size , index_store_size: index_size, uuid: uuid.clone() }; assert_eq!(store.uuid_to_index_db.get(&txn, uuid.as_bytes()).unwrap(), Some(meta)); } } diff --git a/src/index_controller/mod.rs b/src/index_controller/mod.rs index 0ea654dfb..d348ee876 100644 --- a/src/index_controller/mod.rs +++ b/src/index_controller/mod.rs @@ -16,7 +16,6 @@ pub use updates::{Processed, Processing, Failed}; pub type UpdateStatus = updates::UpdateStatus; - #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(tag = "type")] pub enum UpdateMeta { diff --git a/src/option.rs b/src/option.rs index dffa7d483..38f99880e 100644 --- a/src/option.rs +++ b/src/option.rs @@ -19,11 +19,11 @@ pub struct IndexerOpts { #[structopt(long, default_value = "100000")] // 100k pub log_every_n: usize, - /// MTBL max number of chunks in bytes. + /// Grenad max number of chunks in bytes. #[structopt(long)] pub max_nb_chunks: Option, - /// The maximum amount of memory to use for the MTBL buffer. It is recommended + /// The maximum amount of memory to use for the Grenad buffer. It is recommended /// to use something like 80%-90% of the available memory. /// /// It is automatically split by the number of jobs e.g. if you use 7 jobs @@ -37,7 +37,7 @@ pub struct IndexerOpts { pub linked_hash_map_size: usize, /// The name of the compression algorithm to use when compressing intermediate - /// chunks during indexing documents. + /// Grenad chunks while indexing documents. /// /// Choosing a fast algorithm will make the indexing faster but may consume more memory. #[structopt(long, default_value = "snappy", possible_values = &["snappy", "zlib", "lz4", "lz4hc", "zstd"])] @@ -55,7 +55,7 @@ pub struct IndexerOpts { #[structopt(long, default_value = "4 GiB")] pub chunk_fusing_shrink_size: Byte, - /// Enable the chunk fusing or not, this reduces the amount of disk used by a factor of 2. + /// Enable the chunk fusing or not, this reduces the amount of disk space used. #[structopt(long)] pub enable_chunk_fusing: bool, @@ -68,13 +68,13 @@ pub struct IndexerOpts { impl Default for IndexerOpts { fn default() -> Self { Self { - log_every_n: 0, + log_every_n: 100_000, max_nb_chunks: None, - max_memory: Byte::from_str("0Kb").unwrap(), - linked_hash_map_size: 0, + max_memory: Byte::from_str("1GiB").unwrap(), + linked_hash_map_size: 500, chunk_compression_type: CompressionType::None, chunk_compression_level: None, - chunk_fusing_shrink_size: Byte::from_str("0Kb").unwrap(), + chunk_fusing_shrink_size: Byte::from_str("4GiB").unwrap(), enable_chunk_fusing: false, indexing_jobs: None, }