From aad5b789a7e2ed7c8825c8ff16b4f76efb410d0a Mon Sep 17 00:00:00 2001 From: mpostma Date: Mon, 15 Feb 2021 23:37:08 +0100 Subject: [PATCH] review edits --- .../local_index_controller/index_store.rs | 189 ++++++++++++------ .../local_index_controller/update_store.rs | 8 +- 2 files changed, 130 insertions(+), 67 deletions(-) diff --git a/src/index_controller/local_index_controller/index_store.rs b/src/index_controller/local_index_controller/index_store.rs index f2e00e4fe..3fe8a3f59 100644 --- a/src/index_controller/local_index_controller/index_store.rs +++ b/src/index_controller/local_index_controller/index_store.rs @@ -1,21 +1,24 @@ -use std::time::Duration; use std::fs::{create_dir_all, remove_dir_all}; use std::path::{Path, PathBuf}; use std::sync::Arc; +use std::time::Duration; use anyhow::{bail, Context}; use chrono::{DateTime, Utc}; -use dashmap::{DashMap, mapref::entry::Entry}; -use heed::{Env, EnvOpenOptions, Database, types::{Str, SerdeJson, ByteSlice}, RoTxn, RwTxn}; +use dashmap::{mapref::entry::Entry, DashMap}; +use heed::{ + types::{ByteSlice, SerdeJson, Str}, + Database, Env, EnvOpenOptions, RoTxn, RwTxn, +}; use log::{error, info}; use milli::Index; use rayon::ThreadPool; -use serde::{Serialize, Deserialize}; +use serde::{Deserialize, Serialize}; use uuid::Uuid; -use crate::option::IndexerOpts; use super::update_handler::UpdateHandler; use super::{UpdateMeta, UpdateResult}; +use crate::option::IndexerOpts; type UpdateStore = super::update_store::UpdateStore; @@ -94,7 +97,8 @@ impl IndexStore { pub fn delete(&self, index_uid: impl AsRef) -> anyhow::Result<()> { // we remove the references to the index from the index map so it is not accessible anymore let mut txn = self.env.write_txn()?; - let uuid = self.index_uuid(&txn, &index_uid)? + let uuid = self + .index_uuid(&txn, &index_uid)? .with_context(|| format!("Index {:?} doesn't exist", index_uid.as_ref()))?; self.name_to_uuid.delete(&mut txn, index_uid.as_ref())?; self.uuid_to_index_meta.delete(&mut txn, uuid.as_bytes())?; @@ -119,8 +123,7 @@ impl IndexStore { if let Err(e) = updates.abort_pendings() { error!( "error aborting pending updates when deleting index {:?}: {}", - index_uid, - e + index_uid, e ); } let updates = get_arc_ownership_blocking(updates); @@ -156,23 +159,26 @@ impl IndexStore { let uuid = Uuid::from_slice(bytes)?; Ok(Some(uuid)) } - None => Ok(None) + None => Ok(None), } } - fn retrieve_index(&self, txn: &RoTxn, uid: Uuid) -> anyhow::Result, Arc)>> { + fn retrieve_index( + &self, + txn: &RoTxn, + uid: Uuid, + ) -> anyhow::Result, Arc)>> { match self.uuid_to_index.entry(uid.clone()) { - Entry::Vacant(entry) => { - match self.uuid_to_index_meta.get(txn, uid.as_bytes())? { - Some(meta) => { - let path = self.env.path(); - let (index, updates) = meta.open(path, self.thread_pool.clone(), &self.indexer_options)?; - entry.insert((index.clone(), updates.clone())); - Ok(Some((index, updates))) - }, - None => Ok(None) + Entry::Vacant(entry) => match self.uuid_to_index_meta.get(txn, uid.as_bytes())? { + Some(meta) => { + let path = self.env.path(); + let (index, updates) = + meta.open(path, self.thread_pool.clone(), &self.indexer_options)?; + entry.insert((index.clone(), updates.clone())); + Ok(Some((index, updates))) } - } + None => Ok(None), + }, Entry::Occupied(entry) => { let (index, updates) = entry.get(); Ok(Some((index.clone(), updates.clone()))) @@ -180,14 +186,21 @@ impl IndexStore { } } - fn get_index_txn(&self, txn: &RoTxn, name: impl AsRef) -> anyhow::Result, Arc)>> { + fn get_index_txn( + &self, + txn: &RoTxn, + name: impl AsRef, + ) -> anyhow::Result, Arc)>> { match self.index_uuid(&txn, name)? { Some(uid) => self.retrieve_index(&txn, uid), None => Ok(None), } } - pub fn index(&self, name: impl AsRef) -> anyhow::Result, Arc)>> { + pub fn index( + &self, + name: impl AsRef, + ) -> anyhow::Result, Arc)>> { let txn = self.env.read_txn()?; self.get_index_txn(&txn, name) } @@ -199,7 +212,8 @@ impl IndexStore { F: FnOnce(&Index) -> anyhow::Result, { let mut txn = self.env.write_txn()?; - let (index, _) = self.get_index_txn(&txn, &name)? + let (index, _) = self + .get_index_txn(&txn, &name)? .with_context(|| format!("Index {:?} doesn't exist", name.as_ref()))?; let result = f(index.as_ref()); match result { @@ -208,18 +222,26 @@ impl IndexStore { txn.commit()?; Ok((ret, meta)) } - Err(e) => Err(e) + Err(e) => Err(e), } } - pub fn index_with_meta(&self, name: impl AsRef) -> anyhow::Result, IndexMeta)>> { + pub fn index_with_meta( + &self, + name: impl AsRef, + ) -> anyhow::Result, IndexMeta)>> { let txn = self.env.read_txn()?; let uuid = self.index_uuid(&txn, &name)?; match uuid { Some(uuid) => { - let meta = self.uuid_to_index_meta.get(&txn, uuid.as_bytes())? - .with_context(|| format!("unable to retrieve metadata for index {:?}", name.as_ref()))?; - let (index, _) = self.retrieve_index(&txn, uuid)? + let meta = self + .uuid_to_index_meta + .get(&txn, uuid.as_bytes())? + .with_context(|| { + format!("unable to retrieve metadata for index {:?}", name.as_ref()) + })?; + let (index, _) = self + .retrieve_index(&txn, uuid)? .with_context(|| format!("unable to retrieve index {:?}", name.as_ref()))?; Ok(Some((index, meta))) } @@ -227,13 +249,20 @@ impl IndexStore { } } - fn update_meta(&self, txn: &mut RwTxn, name: impl AsRef, f: F) -> anyhow::Result + fn update_meta( + &self, + txn: &mut RwTxn, + name: impl AsRef, + f: F, + ) -> anyhow::Result where - F: FnOnce(&mut IndexMeta) + F: FnOnce(&mut IndexMeta), { - let uuid = self.index_uuid(txn, &name)? - .with_context(|| format!("Index {:?} doesn't exist", name.as_ref()))?; - let mut meta = self.uuid_to_index_meta + let uuid = self + .index_uuid(txn, &name)? + .with_context(|| format!("Index {:?} doesn't exist", name.as_ref()))?; + let mut meta = self + .uuid_to_index_meta .get(txn, uuid.as_bytes())? .with_context(|| format!("couldn't retrieve metadata for index {:?}", name.as_ref()))?; f(&mut meta); @@ -252,7 +281,8 @@ impl IndexStore { Some(res) => Ok(res), None => { let uuid = Uuid::new_v4(); - let (index, updates, _) = self.create_index_txn(&mut txn, uuid, name, update_size, index_size)?; + let (index, updates, _) = + self.create_index_txn(&mut txn, uuid, name, update_size, index_size)?; // If we fail to commit the transaction, we must delete the database from the // file-system. if let Err(e) = txn.commit() { @@ -260,7 +290,7 @@ impl IndexStore { return Err(e)?; } Ok((index, updates)) - }, + } } } @@ -275,7 +305,8 @@ impl IndexStore { self.uuid_to_index.remove(&uuid); } - fn create_index_txn( &self, + fn create_index_txn( + &self, txn: &mut RwTxn, uuid: Uuid, name: impl AsRef, @@ -296,15 +327,17 @@ impl IndexStore { self.uuid_to_index_meta.put(txn, uuid.as_bytes(), &meta)?; let path = self.env.path(); - let (index, update_store) = match meta.open(path, self.thread_pool.clone(), &self.indexer_options) { - Ok(res) => res, - Err(e) => { - self.clean_db(uuid); - return Err(e) - } - }; + let (index, update_store) = + match meta.open(path, self.thread_pool.clone(), &self.indexer_options) { + Ok(res) => res, + Err(e) => { + self.clean_db(uuid); + return Err(e); + } + }; - self.uuid_to_index.insert(uuid, (index.clone(), update_store.clone())); + self.uuid_to_index + .insert(uuid, (index.clone(), update_store.clone())); Ok((index, update_store, meta)) } @@ -338,19 +371,24 @@ impl IndexStore { /// This method will force all the indexes to be loaded. pub fn list_indexes(&self) -> anyhow::Result)>> { let txn = self.env.read_txn()?; - let metas = self.name_to_uuid - .iter(&txn)? - .filter_map(|entry| entry - .map_err(|e| { error!("error decoding entry while listing indexes: {}", e); e }).ok()); + let metas = self.name_to_uuid.iter(&txn)?.filter_map(|entry| { + entry + .map_err(|e| { + error!("error decoding entry while listing indexes: {}", e); + e + }) + .ok() + }); let mut indexes = Vec::new(); for (name, uuid) in metas { // get index to retrieve primary key - let (index, _) = self.get_index_txn(&txn, name)? + let (index, _) = self + .get_index_txn(&txn, name)? .with_context(|| format!("could not load index {:?}", name))?; - let primary_key = index.primary_key(&index.read_txn()?)? - .map(String::from); + let primary_key = index.primary_key(&index.read_txn()?)?.map(String::from); // retieve meta - let meta = self.uuid_to_index_meta + let meta = self + .uuid_to_index_meta .get(&txn, &uuid)? .with_context(|| format!("could not retieve meta for index {:?}", name))?; indexes.push((name.to_owned(), meta, primary_key)); @@ -373,7 +411,10 @@ fn get_arc_ownership_blocking(mut item: Arc) -> T { } } -fn open_or_create_database(env: &Env, name: Option<&str>) -> anyhow::Result> { +fn open_or_create_database( + env: &Env, + name: Option<&str>, +) -> anyhow::Result> { match env.open_database::(name)? { Some(db) => Ok(db), None => Ok(env.create_database::(name)?), @@ -432,7 +473,10 @@ mod test { // insert an uuid in the the name_to_uuid_db: let uuid = Uuid::new_v4(); let mut txn = store.env.write_txn().unwrap(); - store.name_to_uuid.put(&mut txn, &name, uuid.as_bytes()).unwrap(); + store + .name_to_uuid + .put(&mut txn, &name, uuid.as_bytes()) + .unwrap(); txn.commit().unwrap(); // check that the uuid is there @@ -460,7 +504,10 @@ mod test { updated_at, }; let mut txn = store.env.write_txn().unwrap(); - store.uuid_to_index_meta.put(&mut txn, uuid.as_bytes(), &meta).unwrap(); + store + .uuid_to_index_meta + .put(&mut txn, uuid.as_bytes(), &meta) + .unwrap(); txn.commit().unwrap(); // the index cache should be empty @@ -491,8 +538,14 @@ mod test { updated_at, }; let mut txn = store.env.write_txn().unwrap(); - store.name_to_uuid.put(&mut txn, &name, uuid.as_bytes()).unwrap(); - store.uuid_to_index_meta.put(&mut txn, uuid.as_bytes(), &meta).unwrap(); + store + .name_to_uuid + .put(&mut txn, &name, uuid.as_bytes()) + .unwrap(); + store + .uuid_to_index_meta + .put(&mut txn, uuid.as_bytes(), &meta) + .unwrap(); txn.commit().unwrap(); assert!(store.index(&name).unwrap().is_some()); @@ -506,13 +559,19 @@ mod test { let update_store_size = 4096 * 100; let index_store_size = 4096 * 100; - store.get_or_create_index(&name, update_store_size, index_store_size).unwrap(); + store + .get_or_create_index(&name, update_store_size, index_store_size) + .unwrap(); let txn = store.env.read_txn().unwrap(); - let uuid = store.name_to_uuid.get(&txn, &name).unwrap(); + let uuid = store.name_to_uuid.get(&txn, &name).unwrap(); assert_eq!(store.uuid_to_index.len(), 1); assert!(uuid.is_some()); let uuid = Uuid::from_slice(uuid.unwrap()).unwrap(); - let meta = store.uuid_to_index_meta.get(&txn, uuid.as_bytes()).unwrap().unwrap(); + let meta = store + .uuid_to_index_meta + .get(&txn, uuid.as_bytes()) + .unwrap() + .unwrap(); assert_eq!(meta.update_store_size, update_store_size); assert_eq!(meta.index_store_size, index_store_size); assert_eq!(meta.uuid, uuid); @@ -528,12 +587,18 @@ mod test { let index_store_size = 4096 * 100; let uuid = Uuid::new_v4(); let mut txn = store.env.write_txn().unwrap(); - store.create_index_txn(&mut txn, uuid, name, update_store_size, index_store_size).unwrap(); + store + .create_index_txn(&mut txn, uuid, name, update_store_size, index_store_size) + .unwrap(); let uuid = store.name_to_uuid.get(&txn, &name).unwrap(); assert_eq!(store.uuid_to_index.len(), 1); assert!(uuid.is_some()); let uuid = Uuid::from_slice(uuid.unwrap()).unwrap(); - let meta = store.uuid_to_index_meta.get(&txn, uuid.as_bytes()).unwrap().unwrap(); + let meta = store + .uuid_to_index_meta + .get(&txn, uuid.as_bytes()) + .unwrap() + .unwrap(); assert_eq!(meta.update_store_size, update_store_size); assert_eq!(meta.index_store_size, index_store_size); assert_eq!(meta.uuid, uuid); diff --git a/src/index_controller/local_index_controller/update_store.rs b/src/index_controller/local_index_controller/update_store.rs index 32f734ad4..b025ff090 100644 --- a/src/index_controller/local_index_controller/update_store.rs +++ b/src/index_controller/local_index_controller/update_store.rs @@ -66,7 +66,7 @@ where processing, }); - // We need a week reference so we can take ownership on the arc later when we + // We need a weak reference so we can take ownership on the arc later when we // want to close the index. let update_store_weak = Arc::downgrade(&update_store); std::thread::spawn(move || { @@ -81,7 +81,7 @@ where Err(e) => eprintln!("error while processing update: {}", e), } } - // the ownership on the arc has been taken, we need to exit + // the ownership on the arc has been taken, we need to exit. None => break 'outer, } } @@ -92,9 +92,7 @@ where } pub fn prepare_for_closing(self) -> heed::EnvClosingEvent { - // We ignore this error, since that would mean the event loop is already closed. - let closing_event = self.env.prepare_for_closing(); - closing_event + self.env.prepare_for_closing() } /// Returns the new biggest id to use to store the new update.