incorporate review changes

This commit is contained in:
mpostma 2021-02-04 13:21:15 +01:00
parent 9af0a08122
commit f8f02af23e
No known key found for this signature in database
GPG Key ID: CBC8A7C1D7A28C3A
7 changed files with 43 additions and 134 deletions

98
Cargo.lock generated
View File

@ -808,12 +808,6 @@ version = "1.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "80115a2dfde04491e181c2440a39e4be26e52d9ca4e92bed213f65b94e0b8db1" checksum = "80115a2dfde04491e181c2440a39e4be26e52d9ca4e92bed213f65b94e0b8db1"
[[package]]
name = "difference"
version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "524cbf6897b527295dff137cec09ecf3a05f4fddffd7dfcd1585403449e74198"
[[package]] [[package]]
name = "digest" name = "digest"
version = "0.8.1" version = "0.8.1"
@ -838,12 +832,6 @@ version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "212d0f5754cb6769937f4501cc0e67f4f4483c8d2c3e1e922ee9edbe4ab4c7c0" checksum = "212d0f5754cb6769937f4501cc0e67f4f4483c8d2c3e1e922ee9edbe4ab4c7c0"
[[package]]
name = "downcast"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4bb454f0228b18c7f4c3b0ebbee346ed9c52e7443b0999cd543ff3571205701d"
[[package]] [[package]]
name = "either" name = "either"
version = "1.6.1" version = "1.6.1"
@ -949,15 +937,6 @@ dependencies = [
"miniz_oxide", "miniz_oxide",
] ]
[[package]]
name = "float-cmp"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e1267f4ac4f343772758f7b1bdcbe767c218bbab93bb432acbf5162bbf85a6c4"
dependencies = [
"num-traits",
]
[[package]] [[package]]
name = "fnv" name = "fnv"
version = "1.0.7" version = "1.0.7"
@ -974,12 +953,6 @@ dependencies = [
"percent-encoding", "percent-encoding",
] ]
[[package]]
name = "fragile"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "69a039c3498dc930fe810151a34ba0c1c70b02b8625035592e74432f678591f2"
[[package]] [[package]]
name = "fs_extra" name = "fs_extra"
version = "1.2.0" version = "1.2.0"
@ -1679,10 +1652,7 @@ dependencies = [
"memmap", "memmap",
"milli", "milli",
"mime", "mime",
"mockall",
"obkv",
"once_cell", "once_cell",
"page_size",
"rand 0.7.3", "rand 0.7.3",
"rayon", "rayon",
"regex", "regex",
@ -1880,33 +1850,6 @@ dependencies = [
"winapi 0.3.9", "winapi 0.3.9",
] ]
[[package]]
name = "mockall"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "619634fd9149c4a06e66d8fd9256e85326d8eeee75abee4565ff76c92e4edfe0"
dependencies = [
"cfg-if 1.0.0",
"downcast",
"fragile",
"lazy_static",
"mockall_derive",
"predicates",
"predicates-tree",
]
[[package]]
name = "mockall_derive"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "83714c95dbf4c24202f0f1b208f0f248e6bd65abfa8989303611a71c0f781548"
dependencies = [
"cfg-if 1.0.0",
"proc-macro2",
"quote",
"syn",
]
[[package]] [[package]]
name = "near-proximity" name = "near-proximity"
version = "0.1.0" version = "0.1.0"
@ -1938,12 +1881,6 @@ dependencies = [
"libc", "libc",
] ]
[[package]]
name = "normalize-line-endings"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "61807f77802ff30975e01f4f071c8ba10c022052f98b3294119f3e615d13e5be"
[[package]] [[package]]
name = "num-integer" name = "num-integer"
version = "0.1.44" version = "0.1.44"
@ -2212,35 +2149,6 @@ version = "0.2.10"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac74c624d6b2d21f425f752262f42188365d7b8ff1aff74c82e45136510a4857" checksum = "ac74c624d6b2d21f425f752262f42188365d7b8ff1aff74c82e45136510a4857"
[[package]]
name = "predicates"
version = "1.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eeb433456c1a57cc93554dea3ce40b4c19c4057e41c55d4a0f3d84ea71c325aa"
dependencies = [
"difference",
"float-cmp",
"normalize-line-endings",
"predicates-core",
"regex",
]
[[package]]
name = "predicates-core"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "57e35a3326b75e49aa85f5dc6ec15b41108cf5aee58eabb1f274dd18b73c2451"
[[package]]
name = "predicates-tree"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "15f553275e5721409451eb85e15fd9a860a6e5ab4496eb215987502b5f5391f2"
dependencies = [
"predicates-core",
"treeline",
]
[[package]] [[package]]
name = "proc-macro-error" name = "proc-macro-error"
version = "1.0.4" version = "1.0.4"
@ -3218,12 +3126,6 @@ dependencies = [
"tracing", "tracing",
] ]
[[package]]
name = "treeline"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a7f741b240f1a48843f9b8e0444fb55fb2a4ff67293b50a9179dfd5ea67f8d41"
[[package]] [[package]]
name = "trust-dns-proto" name = "trust-dns-proto"
version = "0.19.6" version = "0.19.6"

View File

@ -30,7 +30,7 @@ fst = "0.4.5"
futures = "0.3.7" futures = "0.3.7"
futures-util = "0.3.8" futures-util = "0.3.8"
grenad = { git = "https://github.com/Kerollmops/grenad.git", rev = "3adcb26" } grenad = { git = "https://github.com/Kerollmops/grenad.git", rev = "3adcb26" }
heed = { version = "0.10.6", default-features = false, features = ["serde", "lmdb", "sync-read-txn"] } heed = "0.10.6"
http = "0.2.1" http = "0.2.1"
indexmap = { version = "1.3.2", features = ["serde-1"] } indexmap = { version = "1.3.2", features = ["serde-1"] }
log = "0.4.8" log = "0.4.8"
@ -55,8 +55,6 @@ tar = "0.4.29"
tempfile = "3.1.0" tempfile = "3.1.0"
tokio = { version = "0.2", features = ["full"] } tokio = { version = "0.2", features = ["full"] }
dashmap = "4.0.2" dashmap = "4.0.2"
page_size = "0.4.2"
obkv = "0.1.1"
uuid = "0.8.2" uuid = "0.8.2"
itertools = "0.10.0" itertools = "0.10.0"
@ -72,7 +70,6 @@ serde_url_params = "0.2.0"
tempdir = "0.3.7" tempdir = "0.3.7"
assert-json-diff = { branch = "master", git = "https://github.com/qdequele/assert-json-diff" } assert-json-diff = { branch = "master", git = "https://github.com/qdequele/assert-json-diff" }
tokio = { version = "0.2", features = ["macros", "time"] } tokio = { version = "0.2", features = ["macros", "time"] }
mockall = "0.9.0"
[features] [features]
default = ["sentry"] default = ["sentry"]

View File

@ -51,7 +51,7 @@ impl SearchQuery {
if let Some(ref condition) = self.facet_condition { if let Some(ref condition) = self.facet_condition {
if !condition.trim().is_empty() { if !condition.trim().is_empty() {
let condition = FacetCondition::from_str(&rtxn, &index, &condition).unwrap(); let condition = FacetCondition::from_str(&rtxn, &index, &condition)?;
search.facet_condition(condition); search.facet_condition(condition);
} }
} }

View File

@ -25,7 +25,9 @@ impl Data {
let file = tokio::fs::File::from_std(file?); let file = tokio::fs::File::from_std(file?);
let mut encoder = GzipEncoder::new(file); let mut encoder = GzipEncoder::new(file);
let mut empty_update = true;
while let Some(result) = stream.next().await { while let Some(result) = stream.next().await {
empty_update = false;
let bytes = &*result?; let bytes = &*result?;
encoder.write_all(&bytes[..]).await?; encoder.write_all(&bytes[..]).await?;
} }
@ -34,10 +36,19 @@ impl Data {
let mut file = encoder.into_inner(); let mut file = encoder.into_inner();
file.sync_all().await?; file.sync_all().await?;
let file = file.into_std().await; let file = file.into_std().await;
let mmap = unsafe { memmap::Mmap::map(&file)? };
let index_controller = self.index_controller.clone(); let index_controller = self.index_controller.clone();
let update = tokio::task::spawn_blocking(move || index_controller.add_documents(index, method, format, &mmap[..])).await??; let update = tokio::task::spawn_blocking(move ||{
let mmap;
let bytes = if empty_update {
&[][..]
} else {
mmap = unsafe { memmap::Mmap::map(&file)? };
&mmap
};
index_controller.add_documents(index, method, format, &bytes)
}).await??;
Ok(update.into()) Ok(update.into())
} }

View File

@ -17,8 +17,8 @@ type UpdateStore = super::update_store::UpdateStore<UpdateMeta, UpdateResult, St
#[derive(Serialize, Deserialize, Debug, PartialEq)] #[derive(Serialize, Deserialize, Debug, PartialEq)]
struct IndexMeta { struct IndexMeta {
update_size: u64, update_store_size: u64,
index_size: u64, index_store_size: u64,
uuid: Uuid, uuid: Uuid,
} }
@ -36,11 +36,11 @@ impl IndexMeta {
create_dir_all(&index_path)?; create_dir_all(&index_path)?;
let mut options = EnvOpenOptions::new(); let mut options = EnvOpenOptions::new();
options.map_size(self.index_size as usize); options.map_size(self.index_store_size as usize);
let index = Arc::new(Index::new(options, index_path)?); let index = Arc::new(Index::new(options, index_path)?);
let mut options = EnvOpenOptions::new(); let mut options = EnvOpenOptions::new();
options.map_size(self.update_size as usize); options.map_size(self.update_store_size as usize);
let handler = UpdateHandler::new(indexer_options, index.clone(), thread_pool)?; let handler = UpdateHandler::new(indexer_options, index.clone(), thread_pool)?;
let update_store = UpdateStore::open(options, update_path, handler)?; let update_store = UpdateStore::open(options, update_path, handler)?;
@ -50,7 +50,7 @@ impl IndexMeta {
pub struct IndexStore { pub struct IndexStore {
env: Env, env: Env,
name_to_uuid_db: Database<Str, ByteSlice>, name_to_uuid_meta: Database<Str, ByteSlice>,
uuid_to_index: DashMap<Uuid, (Arc<Index>, Arc<UpdateStore>)>, uuid_to_index: DashMap<Uuid, (Arc<Index>, Arc<UpdateStore>)>,
uuid_to_index_db: Database<ByteSlice, SerdeJson<IndexMeta>>, uuid_to_index_db: Database<ByteSlice, SerdeJson<IndexMeta>>,
@ -76,7 +76,7 @@ impl IndexStore {
Ok(Self { Ok(Self {
env, env,
name_to_uuid_db: name_to_uid_db, name_to_uuid_meta: name_to_uid_db,
uuid_to_index: uid_to_index, uuid_to_index: uid_to_index,
uuid_to_index_db: uid_to_index_db, uuid_to_index_db: uid_to_index_db,
@ -86,7 +86,7 @@ impl IndexStore {
} }
fn index_uuid(&self, txn: &RoTxn, name: impl AsRef<str>) -> anyhow::Result<Option<Uuid>> { fn index_uuid(&self, txn: &RoTxn, name: impl AsRef<str>) -> anyhow::Result<Option<Uuid>> {
match self.name_to_uuid_db.get(txn, name.as_ref())? { match self.name_to_uuid_meta.get(txn, name.as_ref())? {
Some(bytes) => { Some(bytes) => {
let uuid = Uuid::from_slice(bytes)?; let uuid = Uuid::from_slice(bytes)?;
Ok(Some(uuid)) Ok(Some(uuid))
@ -115,7 +115,7 @@ impl IndexStore {
} }
} }
fn _get_index(&self, txn: &RoTxn, name: impl AsRef<str>) -> anyhow::Result<Option<(Arc<Index>, Arc<UpdateStore>)>> { fn get_index_txn(&self, txn: &RoTxn, name: impl AsRef<str>) -> anyhow::Result<Option<(Arc<Index>, Arc<UpdateStore>)>> {
match self.index_uuid(&txn, name)? { match self.index_uuid(&txn, name)? {
Some(uid) => self.retrieve_index(&txn, uid), Some(uid) => self.retrieve_index(&txn, uid),
None => Ok(None), None => Ok(None),
@ -124,7 +124,7 @@ impl IndexStore {
pub fn index(&self, name: impl AsRef<str>) -> anyhow::Result<Option<(Arc<Index>, Arc<UpdateStore>)>> { pub fn index(&self, name: impl AsRef<str>) -> anyhow::Result<Option<(Arc<Index>, Arc<UpdateStore>)>> {
let txn = self.env.read_txn()?; let txn = self.env.read_txn()?;
self._get_index(&txn, name) self.get_index_txn(&txn, name)
} }
pub fn get_or_create_index( pub fn get_or_create_index(
@ -134,7 +134,7 @@ impl IndexStore {
index_size: u64, index_size: u64,
) -> anyhow::Result<(Arc<Index>, Arc<UpdateStore>)> { ) -> anyhow::Result<(Arc<Index>, Arc<UpdateStore>)> {
let mut txn = self.env.write_txn()?; let mut txn = self.env.write_txn()?;
match self._get_index(&txn, name.as_ref())? { match self.get_index_txn(&txn, name.as_ref())? {
Some(res) => Ok(res), Some(res) => Ok(res),
None => { None => {
let uuid = Uuid::new_v4(); let uuid = Uuid::new_v4();
@ -168,9 +168,9 @@ impl IndexStore {
update_size: u64, update_size: u64,
index_size: u64, index_size: u64,
) -> anyhow::Result<(Arc<Index>, Arc<UpdateStore>)> { ) -> anyhow::Result<(Arc<Index>, Arc<UpdateStore>)> {
let meta = IndexMeta { update_size, index_size, uuid: uuid.clone() }; let meta = IndexMeta { update_store_size: update_size, index_store_size: index_size, uuid: uuid.clone() };
self.name_to_uuid_db.put(txn, name.as_ref(), uuid.as_bytes())?; self.name_to_uuid_meta.put(txn, name.as_ref(), uuid.as_bytes())?;
self.uuid_to_index_db.put(txn, uuid.as_bytes(), &meta)?; self.uuid_to_index_db.put(txn, uuid.as_bytes(), &meta)?;
let path = self.env.path(); let path = self.env.path();
@ -247,7 +247,7 @@ mod test {
// insert a uuid in the name_to_uuid_db: // insert a uuid in the name_to_uuid_db:
let uuid = Uuid::new_v4(); let uuid = Uuid::new_v4();
let mut txn = store.env.write_txn().unwrap(); let mut txn = store.env.write_txn().unwrap();
store.name_to_uuid_db.put(&mut txn, &name, uuid.as_bytes()).unwrap(); store.name_to_uuid_meta.put(&mut txn, &name, uuid.as_bytes()).unwrap();
txn.commit().unwrap(); txn.commit().unwrap();
// check that the uuid is there // check that the uuid is there
@ -264,7 +264,7 @@ mod test {
let txn = store.env.read_txn().unwrap(); let txn = store.env.read_txn().unwrap();
assert!(store.retrieve_index(&txn, uuid).unwrap().is_none()); assert!(store.retrieve_index(&txn, uuid).unwrap().is_none());
let meta = IndexMeta { update_size: 4096 * 100, index_size: 4096 * 100, uuid: uuid.clone() }; let meta = IndexMeta { update_store_size: 4096 * 100, index_store_size: 4096 * 100, uuid: uuid.clone() };
let mut txn = store.env.write_txn().unwrap(); let mut txn = store.env.write_txn().unwrap();
store.uuid_to_index_db.put(&mut txn, uuid.as_bytes(), &meta).unwrap(); store.uuid_to_index_db.put(&mut txn, uuid.as_bytes(), &meta).unwrap();
txn.commit().unwrap(); txn.commit().unwrap();
@ -286,9 +286,9 @@ mod test {
assert!(store.index(&name).unwrap().is_none()); assert!(store.index(&name).unwrap().is_none());
let uuid = Uuid::new_v4(); let uuid = Uuid::new_v4();
let meta = IndexMeta { update_size: 4096 * 100, index_size: 4096 * 100, uuid: uuid.clone() }; let meta = IndexMeta { update_store_size: 4096 * 100, index_store_size: 4096 * 100, uuid: uuid.clone() };
let mut txn = store.env.write_txn().unwrap(); let mut txn = store.env.write_txn().unwrap();
store.name_to_uuid_db.put(&mut txn, &name, uuid.as_bytes()).unwrap(); store.name_to_uuid_meta.put(&mut txn, &name, uuid.as_bytes()).unwrap();
store.uuid_to_index_db.put(&mut txn, uuid.as_bytes(), &meta).unwrap(); store.uuid_to_index_db.put(&mut txn, uuid.as_bytes(), &meta).unwrap();
txn.commit().unwrap(); txn.commit().unwrap();
@ -303,11 +303,11 @@ mod test {
store.get_or_create_index(&name, 4096 * 100, 4096 * 100).unwrap(); store.get_or_create_index(&name, 4096 * 100, 4096 * 100).unwrap();
let txn = store.env.read_txn().unwrap(); let txn = store.env.read_txn().unwrap();
let uuid = store.name_to_uuid_db.get(&txn, &name).unwrap(); let uuid = store.name_to_uuid_meta.get(&txn, &name).unwrap();
assert_eq!(store.uuid_to_index.len(), 1); assert_eq!(store.uuid_to_index.len(), 1);
assert!(uuid.is_some()); assert!(uuid.is_some());
let uuid = Uuid::from_slice(uuid.unwrap()).unwrap(); let uuid = Uuid::from_slice(uuid.unwrap()).unwrap();
let meta = IndexMeta { update_size: 4096 * 100, index_size: 4096 * 100, uuid: uuid.clone() }; let meta = IndexMeta { update_store_size: 4096 * 100, index_store_size: 4096 * 100, uuid: uuid.clone() };
assert_eq!(store.uuid_to_index_db.get(&txn, uuid.as_bytes()).unwrap(), Some(meta)); assert_eq!(store.uuid_to_index_db.get(&txn, uuid.as_bytes()).unwrap(), Some(meta));
} }
@ -322,11 +322,11 @@ mod test {
let uuid = Uuid::new_v4(); let uuid = Uuid::new_v4();
let mut txn = store.env.write_txn().unwrap(); let mut txn = store.env.write_txn().unwrap();
store.create_index(&mut txn, uuid, name, update_size, index_size).unwrap(); store.create_index(&mut txn, uuid, name, update_size, index_size).unwrap();
let uuid = store.name_to_uuid_db.get(&txn, &name).unwrap(); let uuid = store.name_to_uuid_meta.get(&txn, &name).unwrap();
assert_eq!(store.uuid_to_index.len(), 1); assert_eq!(store.uuid_to_index.len(), 1);
assert!(uuid.is_some()); assert!(uuid.is_some());
let uuid = Uuid::from_slice(uuid.unwrap()).unwrap(); let uuid = Uuid::from_slice(uuid.unwrap()).unwrap();
let meta = IndexMeta { update_size , index_size, uuid: uuid.clone() }; let meta = IndexMeta { update_store_size: update_size , index_store_size: index_size, uuid: uuid.clone() };
assert_eq!(store.uuid_to_index_db.get(&txn, uuid.as_bytes()).unwrap(), Some(meta)); assert_eq!(store.uuid_to_index_db.get(&txn, uuid.as_bytes()).unwrap(), Some(meta));
} }
} }

View File

@ -16,7 +16,6 @@ pub use updates::{Processed, Processing, Failed};
pub type UpdateStatus = updates::UpdateStatus<UpdateMeta, UpdateResult, String>; pub type UpdateStatus = updates::UpdateStatus<UpdateMeta, UpdateResult, String>;
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")] #[serde(tag = "type")]
pub enum UpdateMeta { pub enum UpdateMeta {

View File

@ -19,11 +19,11 @@ pub struct IndexerOpts {
#[structopt(long, default_value = "100000")] // 100k #[structopt(long, default_value = "100000")] // 100k
pub log_every_n: usize, pub log_every_n: usize,
/// MTBL max number of chunks in bytes. /// Grenad max number of chunks in bytes.
#[structopt(long)] #[structopt(long)]
pub max_nb_chunks: Option<usize>, pub max_nb_chunks: Option<usize>,
/// The maximum amount of memory to use for the MTBL buffer. It is recommended /// The maximum amount of memory to use for the Grenad buffer. It is recommended
/// to use something like 80%-90% of the available memory. /// to use something like 80%-90% of the available memory.
/// ///
/// It is automatically split by the number of jobs e.g. if you use 7 jobs /// It is automatically split by the number of jobs e.g. if you use 7 jobs
@ -37,7 +37,7 @@ pub struct IndexerOpts {
pub linked_hash_map_size: usize, pub linked_hash_map_size: usize,
/// The name of the compression algorithm to use when compressing intermediate /// The name of the compression algorithm to use when compressing intermediate
/// chunks during indexing documents. /// Grenad chunks while indexing documents.
/// ///
/// Choosing a fast algorithm will make the indexing faster but may consume more memory. /// Choosing a fast algorithm will make the indexing faster but may consume more memory.
#[structopt(long, default_value = "snappy", possible_values = &["snappy", "zlib", "lz4", "lz4hc", "zstd"])] #[structopt(long, default_value = "snappy", possible_values = &["snappy", "zlib", "lz4", "lz4hc", "zstd"])]
@ -55,7 +55,7 @@ pub struct IndexerOpts {
#[structopt(long, default_value = "4 GiB")] #[structopt(long, default_value = "4 GiB")]
pub chunk_fusing_shrink_size: Byte, pub chunk_fusing_shrink_size: Byte,
/// Enable the chunk fusing or not, this reduces the amount of disk used by a factor of 2. /// Enable the chunk fusing or not, this reduces the amount of disk space used.
#[structopt(long)] #[structopt(long)]
pub enable_chunk_fusing: bool, pub enable_chunk_fusing: bool,
@ -68,13 +68,13 @@ pub struct IndexerOpts {
impl Default for IndexerOpts { impl Default for IndexerOpts {
fn default() -> Self { fn default() -> Self {
Self { Self {
log_every_n: 0, log_every_n: 100_000,
max_nb_chunks: None, max_nb_chunks: None,
max_memory: Byte::from_str("0Kb").unwrap(), max_memory: Byte::from_str("1GiB").unwrap(),
linked_hash_map_size: 0, linked_hash_map_size: 500,
chunk_compression_type: CompressionType::None, chunk_compression_type: CompressionType::None,
chunk_compression_level: None, chunk_compression_level: None,
chunk_fusing_shrink_size: Byte::from_str("0Kb").unwrap(), chunk_fusing_shrink_size: Byte::from_str("4GiB").unwrap(),
enable_chunk_fusing: false, enable_chunk_fusing: false,
indexing_jobs: None, indexing_jobs: None,
} }