review edits

This commit is contained in:
mpostma 2021-02-15 23:37:08 +01:00
parent 5c0b541248
commit aad5b789a7
No known key found for this signature in database
GPG Key ID: CBC8A7C1D7A28C3A
2 changed files with 130 additions and 67 deletions

View File

@ -1,21 +1,24 @@
use std::time::Duration;
use std::fs::{create_dir_all, remove_dir_all}; use std::fs::{create_dir_all, remove_dir_all};
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration;
use anyhow::{bail, Context}; use anyhow::{bail, Context};
use chrono::{DateTime, Utc}; use chrono::{DateTime, Utc};
use dashmap::{DashMap, mapref::entry::Entry}; use dashmap::{mapref::entry::Entry, DashMap};
use heed::{Env, EnvOpenOptions, Database, types::{Str, SerdeJson, ByteSlice}, RoTxn, RwTxn}; use heed::{
types::{ByteSlice, SerdeJson, Str},
Database, Env, EnvOpenOptions, RoTxn, RwTxn,
};
use log::{error, info}; use log::{error, info};
use milli::Index; use milli::Index;
use rayon::ThreadPool; use rayon::ThreadPool;
use serde::{Serialize, Deserialize}; use serde::{Deserialize, Serialize};
use uuid::Uuid; use uuid::Uuid;
use crate::option::IndexerOpts;
use super::update_handler::UpdateHandler; use super::update_handler::UpdateHandler;
use super::{UpdateMeta, UpdateResult}; use super::{UpdateMeta, UpdateResult};
use crate::option::IndexerOpts;
type UpdateStore = super::update_store::UpdateStore<UpdateMeta, UpdateResult, String>; type UpdateStore = super::update_store::UpdateStore<UpdateMeta, UpdateResult, String>;
@ -94,7 +97,8 @@ impl IndexStore {
pub fn delete(&self, index_uid: impl AsRef<str>) -> anyhow::Result<()> { pub fn delete(&self, index_uid: impl AsRef<str>) -> anyhow::Result<()> {
// we remove the references to the index from the index map so it is not accessible anymore // we remove the references to the index from the index map so it is not accessible anymore
let mut txn = self.env.write_txn()?; let mut txn = self.env.write_txn()?;
let uuid = self.index_uuid(&txn, &index_uid)? let uuid = self
.index_uuid(&txn, &index_uid)?
.with_context(|| format!("Index {:?} doesn't exist", index_uid.as_ref()))?; .with_context(|| format!("Index {:?} doesn't exist", index_uid.as_ref()))?;
self.name_to_uuid.delete(&mut txn, index_uid.as_ref())?; self.name_to_uuid.delete(&mut txn, index_uid.as_ref())?;
self.uuid_to_index_meta.delete(&mut txn, uuid.as_bytes())?; self.uuid_to_index_meta.delete(&mut txn, uuid.as_bytes())?;
@ -119,8 +123,7 @@ impl IndexStore {
if let Err(e) = updates.abort_pendings() { if let Err(e) = updates.abort_pendings() {
error!( error!(
"error aborting pending updates when deleting index {:?}: {}", "error aborting pending updates when deleting index {:?}: {}",
index_uid, index_uid, e
e
); );
} }
let updates = get_arc_ownership_blocking(updates); let updates = get_arc_ownership_blocking(updates);
@ -156,23 +159,26 @@ impl IndexStore {
let uuid = Uuid::from_slice(bytes)?; let uuid = Uuid::from_slice(bytes)?;
Ok(Some(uuid)) Ok(Some(uuid))
} }
None => Ok(None) None => Ok(None),
} }
} }
fn retrieve_index(&self, txn: &RoTxn, uid: Uuid) -> anyhow::Result<Option<(Arc<Index>, Arc<UpdateStore>)>> { fn retrieve_index(
&self,
txn: &RoTxn,
uid: Uuid,
) -> anyhow::Result<Option<(Arc<Index>, Arc<UpdateStore>)>> {
match self.uuid_to_index.entry(uid.clone()) { match self.uuid_to_index.entry(uid.clone()) {
Entry::Vacant(entry) => { Entry::Vacant(entry) => match self.uuid_to_index_meta.get(txn, uid.as_bytes())? {
match self.uuid_to_index_meta.get(txn, uid.as_bytes())? {
Some(meta) => { Some(meta) => {
let path = self.env.path(); let path = self.env.path();
let (index, updates) = meta.open(path, self.thread_pool.clone(), &self.indexer_options)?; let (index, updates) =
meta.open(path, self.thread_pool.clone(), &self.indexer_options)?;
entry.insert((index.clone(), updates.clone())); entry.insert((index.clone(), updates.clone()));
Ok(Some((index, updates))) Ok(Some((index, updates)))
}
None => Ok(None),
}, },
None => Ok(None)
}
}
Entry::Occupied(entry) => { Entry::Occupied(entry) => {
let (index, updates) = entry.get(); let (index, updates) = entry.get();
Ok(Some((index.clone(), updates.clone()))) Ok(Some((index.clone(), updates.clone())))
@ -180,14 +186,21 @@ impl IndexStore {
} }
} }
fn get_index_txn(&self, txn: &RoTxn, name: impl AsRef<str>) -> anyhow::Result<Option<(Arc<Index>, Arc<UpdateStore>)>> { fn get_index_txn(
&self,
txn: &RoTxn,
name: impl AsRef<str>,
) -> anyhow::Result<Option<(Arc<Index>, Arc<UpdateStore>)>> {
match self.index_uuid(&txn, name)? { match self.index_uuid(&txn, name)? {
Some(uid) => self.retrieve_index(&txn, uid), Some(uid) => self.retrieve_index(&txn, uid),
None => Ok(None), None => Ok(None),
} }
} }
pub fn index(&self, name: impl AsRef<str>) -> anyhow::Result<Option<(Arc<Index>, Arc<UpdateStore>)>> { pub fn index(
&self,
name: impl AsRef<str>,
) -> anyhow::Result<Option<(Arc<Index>, Arc<UpdateStore>)>> {
let txn = self.env.read_txn()?; let txn = self.env.read_txn()?;
self.get_index_txn(&txn, name) self.get_index_txn(&txn, name)
} }
@ -199,7 +212,8 @@ impl IndexStore {
F: FnOnce(&Index) -> anyhow::Result<T>, F: FnOnce(&Index) -> anyhow::Result<T>,
{ {
let mut txn = self.env.write_txn()?; let mut txn = self.env.write_txn()?;
let (index, _) = self.get_index_txn(&txn, &name)? let (index, _) = self
.get_index_txn(&txn, &name)?
.with_context(|| format!("Index {:?} doesn't exist", name.as_ref()))?; .with_context(|| format!("Index {:?} doesn't exist", name.as_ref()))?;
let result = f(index.as_ref()); let result = f(index.as_ref());
match result { match result {
@ -208,18 +222,26 @@ impl IndexStore {
txn.commit()?; txn.commit()?;
Ok((ret, meta)) Ok((ret, meta))
} }
Err(e) => Err(e) Err(e) => Err(e),
} }
} }
pub fn index_with_meta(&self, name: impl AsRef<str>) -> anyhow::Result<Option<(Arc<Index>, IndexMeta)>> { pub fn index_with_meta(
&self,
name: impl AsRef<str>,
) -> anyhow::Result<Option<(Arc<Index>, IndexMeta)>> {
let txn = self.env.read_txn()?; let txn = self.env.read_txn()?;
let uuid = self.index_uuid(&txn, &name)?; let uuid = self.index_uuid(&txn, &name)?;
match uuid { match uuid {
Some(uuid) => { Some(uuid) => {
let meta = self.uuid_to_index_meta.get(&txn, uuid.as_bytes())? let meta = self
.with_context(|| format!("unable to retrieve metadata for index {:?}", name.as_ref()))?; .uuid_to_index_meta
let (index, _) = self.retrieve_index(&txn, uuid)? .get(&txn, uuid.as_bytes())?
.with_context(|| {
format!("unable to retrieve metadata for index {:?}", name.as_ref())
})?;
let (index, _) = self
.retrieve_index(&txn, uuid)?
.with_context(|| format!("unable to retrieve index {:?}", name.as_ref()))?; .with_context(|| format!("unable to retrieve index {:?}", name.as_ref()))?;
Ok(Some((index, meta))) Ok(Some((index, meta)))
} }
@ -227,13 +249,20 @@ impl IndexStore {
} }
} }
fn update_meta<F>(&self, txn: &mut RwTxn, name: impl AsRef<str>, f: F) -> anyhow::Result<IndexMeta> fn update_meta<F>(
&self,
txn: &mut RwTxn,
name: impl AsRef<str>,
f: F,
) -> anyhow::Result<IndexMeta>
where where
F: FnOnce(&mut IndexMeta) F: FnOnce(&mut IndexMeta),
{ {
let uuid = self.index_uuid(txn, &name)? let uuid = self
.index_uuid(txn, &name)?
.with_context(|| format!("Index {:?} doesn't exist", name.as_ref()))?; .with_context(|| format!("Index {:?} doesn't exist", name.as_ref()))?;
let mut meta = self.uuid_to_index_meta let mut meta = self
.uuid_to_index_meta
.get(txn, uuid.as_bytes())? .get(txn, uuid.as_bytes())?
.with_context(|| format!("couldn't retrieve metadata for index {:?}", name.as_ref()))?; .with_context(|| format!("couldn't retrieve metadata for index {:?}", name.as_ref()))?;
f(&mut meta); f(&mut meta);
@ -252,7 +281,8 @@ impl IndexStore {
Some(res) => Ok(res), Some(res) => Ok(res),
None => { None => {
let uuid = Uuid::new_v4(); let uuid = Uuid::new_v4();
let (index, updates, _) = self.create_index_txn(&mut txn, uuid, name, update_size, index_size)?; let (index, updates, _) =
self.create_index_txn(&mut txn, uuid, name, update_size, index_size)?;
// If we fail to commit the transaction, we must delete the database from the // If we fail to commit the transaction, we must delete the database from the
// file-system. // file-system.
if let Err(e) = txn.commit() { if let Err(e) = txn.commit() {
@ -260,7 +290,7 @@ impl IndexStore {
return Err(e)?; return Err(e)?;
} }
Ok((index, updates)) Ok((index, updates))
}, }
} }
} }
@ -275,7 +305,8 @@ impl IndexStore {
self.uuid_to_index.remove(&uuid); self.uuid_to_index.remove(&uuid);
} }
fn create_index_txn( &self, fn create_index_txn(
&self,
txn: &mut RwTxn, txn: &mut RwTxn,
uuid: Uuid, uuid: Uuid,
name: impl AsRef<str>, name: impl AsRef<str>,
@ -296,15 +327,17 @@ impl IndexStore {
self.uuid_to_index_meta.put(txn, uuid.as_bytes(), &meta)?; self.uuid_to_index_meta.put(txn, uuid.as_bytes(), &meta)?;
let path = self.env.path(); let path = self.env.path();
let (index, update_store) = match meta.open(path, self.thread_pool.clone(), &self.indexer_options) { let (index, update_store) =
match meta.open(path, self.thread_pool.clone(), &self.indexer_options) {
Ok(res) => res, Ok(res) => res,
Err(e) => { Err(e) => {
self.clean_db(uuid); self.clean_db(uuid);
return Err(e) return Err(e);
} }
}; };
self.uuid_to_index.insert(uuid, (index.clone(), update_store.clone())); self.uuid_to_index
.insert(uuid, (index.clone(), update_store.clone()));
Ok((index, update_store, meta)) Ok((index, update_store, meta))
} }
@ -338,19 +371,24 @@ impl IndexStore {
/// This method will force all the indexes to be loaded. /// This method will force all the indexes to be loaded.
pub fn list_indexes(&self) -> anyhow::Result<Vec<(String, IndexMeta, Option<String>)>> { pub fn list_indexes(&self) -> anyhow::Result<Vec<(String, IndexMeta, Option<String>)>> {
let txn = self.env.read_txn()?; let txn = self.env.read_txn()?;
let metas = self.name_to_uuid let metas = self.name_to_uuid.iter(&txn)?.filter_map(|entry| {
.iter(&txn)? entry
.filter_map(|entry| entry .map_err(|e| {
.map_err(|e| { error!("error decoding entry while listing indexes: {}", e); e }).ok()); error!("error decoding entry while listing indexes: {}", e);
e
})
.ok()
});
let mut indexes = Vec::new(); let mut indexes = Vec::new();
for (name, uuid) in metas { for (name, uuid) in metas {
// get index to retrieve primary key // get index to retrieve primary key
let (index, _) = self.get_index_txn(&txn, name)? let (index, _) = self
.get_index_txn(&txn, name)?
.with_context(|| format!("could not load index {:?}", name))?; .with_context(|| format!("could not load index {:?}", name))?;
let primary_key = index.primary_key(&index.read_txn()?)? let primary_key = index.primary_key(&index.read_txn()?)?.map(String::from);
.map(String::from);
// retieve meta // retieve meta
let meta = self.uuid_to_index_meta let meta = self
.uuid_to_index_meta
.get(&txn, &uuid)? .get(&txn, &uuid)?
.with_context(|| format!("could not retieve meta for index {:?}", name))?; .with_context(|| format!("could not retieve meta for index {:?}", name))?;
indexes.push((name.to_owned(), meta, primary_key)); indexes.push((name.to_owned(), meta, primary_key));
@ -373,7 +411,10 @@ fn get_arc_ownership_blocking<T>(mut item: Arc<T>) -> T {
} }
} }
fn open_or_create_database<K: 'static, V: 'static>(env: &Env, name: Option<&str>) -> anyhow::Result<Database<K, V>> { fn open_or_create_database<K: 'static, V: 'static>(
env: &Env,
name: Option<&str>,
) -> anyhow::Result<Database<K, V>> {
match env.open_database::<K, V>(name)? { match env.open_database::<K, V>(name)? {
Some(db) => Ok(db), Some(db) => Ok(db),
None => Ok(env.create_database::<K, V>(name)?), None => Ok(env.create_database::<K, V>(name)?),
@ -432,7 +473,10 @@ mod test {
// insert an uuid in the the name_to_uuid_db: // insert an uuid in the the name_to_uuid_db:
let uuid = Uuid::new_v4(); let uuid = Uuid::new_v4();
let mut txn = store.env.write_txn().unwrap(); let mut txn = store.env.write_txn().unwrap();
store.name_to_uuid.put(&mut txn, &name, uuid.as_bytes()).unwrap(); store
.name_to_uuid
.put(&mut txn, &name, uuid.as_bytes())
.unwrap();
txn.commit().unwrap(); txn.commit().unwrap();
// check that the uuid is there // check that the uuid is there
@ -460,7 +504,10 @@ mod test {
updated_at, updated_at,
}; };
let mut txn = store.env.write_txn().unwrap(); let mut txn = store.env.write_txn().unwrap();
store.uuid_to_index_meta.put(&mut txn, uuid.as_bytes(), &meta).unwrap(); store
.uuid_to_index_meta
.put(&mut txn, uuid.as_bytes(), &meta)
.unwrap();
txn.commit().unwrap(); txn.commit().unwrap();
// the index cache should be empty // the index cache should be empty
@ -491,8 +538,14 @@ mod test {
updated_at, updated_at,
}; };
let mut txn = store.env.write_txn().unwrap(); let mut txn = store.env.write_txn().unwrap();
store.name_to_uuid.put(&mut txn, &name, uuid.as_bytes()).unwrap(); store
store.uuid_to_index_meta.put(&mut txn, uuid.as_bytes(), &meta).unwrap(); .name_to_uuid
.put(&mut txn, &name, uuid.as_bytes())
.unwrap();
store
.uuid_to_index_meta
.put(&mut txn, uuid.as_bytes(), &meta)
.unwrap();
txn.commit().unwrap(); txn.commit().unwrap();
assert!(store.index(&name).unwrap().is_some()); assert!(store.index(&name).unwrap().is_some());
@ -506,13 +559,19 @@ mod test {
let update_store_size = 4096 * 100; let update_store_size = 4096 * 100;
let index_store_size = 4096 * 100; let index_store_size = 4096 * 100;
store.get_or_create_index(&name, update_store_size, index_store_size).unwrap(); store
.get_or_create_index(&name, update_store_size, index_store_size)
.unwrap();
let txn = store.env.read_txn().unwrap(); let txn = store.env.read_txn().unwrap();
let uuid = store.name_to_uuid.get(&txn, &name).unwrap(); let uuid = store.name_to_uuid.get(&txn, &name).unwrap();
assert_eq!(store.uuid_to_index.len(), 1); assert_eq!(store.uuid_to_index.len(), 1);
assert!(uuid.is_some()); assert!(uuid.is_some());
let uuid = Uuid::from_slice(uuid.unwrap()).unwrap(); let uuid = Uuid::from_slice(uuid.unwrap()).unwrap();
let meta = store.uuid_to_index_meta.get(&txn, uuid.as_bytes()).unwrap().unwrap(); let meta = store
.uuid_to_index_meta
.get(&txn, uuid.as_bytes())
.unwrap()
.unwrap();
assert_eq!(meta.update_store_size, update_store_size); assert_eq!(meta.update_store_size, update_store_size);
assert_eq!(meta.index_store_size, index_store_size); assert_eq!(meta.index_store_size, index_store_size);
assert_eq!(meta.uuid, uuid); assert_eq!(meta.uuid, uuid);
@ -528,12 +587,18 @@ mod test {
let index_store_size = 4096 * 100; let index_store_size = 4096 * 100;
let uuid = Uuid::new_v4(); let uuid = Uuid::new_v4();
let mut txn = store.env.write_txn().unwrap(); let mut txn = store.env.write_txn().unwrap();
store.create_index_txn(&mut txn, uuid, name, update_store_size, index_store_size).unwrap(); store
.create_index_txn(&mut txn, uuid, name, update_store_size, index_store_size)
.unwrap();
let uuid = store.name_to_uuid.get(&txn, &name).unwrap(); let uuid = store.name_to_uuid.get(&txn, &name).unwrap();
assert_eq!(store.uuid_to_index.len(), 1); assert_eq!(store.uuid_to_index.len(), 1);
assert!(uuid.is_some()); assert!(uuid.is_some());
let uuid = Uuid::from_slice(uuid.unwrap()).unwrap(); let uuid = Uuid::from_slice(uuid.unwrap()).unwrap();
let meta = store.uuid_to_index_meta.get(&txn, uuid.as_bytes()).unwrap().unwrap(); let meta = store
.uuid_to_index_meta
.get(&txn, uuid.as_bytes())
.unwrap()
.unwrap();
assert_eq!(meta.update_store_size, update_store_size); assert_eq!(meta.update_store_size, update_store_size);
assert_eq!(meta.index_store_size, index_store_size); assert_eq!(meta.index_store_size, index_store_size);
assert_eq!(meta.uuid, uuid); assert_eq!(meta.uuid, uuid);

View File

@ -66,7 +66,7 @@ where
processing, processing,
}); });
// We need a week reference so we can take ownership on the arc later when we // We need a weak reference so we can take ownership on the arc later when we
// want to close the index. // want to close the index.
let update_store_weak = Arc::downgrade(&update_store); let update_store_weak = Arc::downgrade(&update_store);
std::thread::spawn(move || { std::thread::spawn(move || {
@ -81,7 +81,7 @@ where
Err(e) => eprintln!("error while processing update: {}", e), Err(e) => eprintln!("error while processing update: {}", e),
} }
} }
// the ownership on the arc has been taken, we need to exit // the ownership on the arc has been taken, we need to exit.
None => break 'outer, None => break 'outer,
} }
} }
@ -92,9 +92,7 @@ where
} }
pub fn prepare_for_closing(self) -> heed::EnvClosingEvent { pub fn prepare_for_closing(self) -> heed::EnvClosingEvent {
// We ignore this error, since that would mean the event loop is already closed. self.env.prepare_for_closing()
let closing_event = self.env.prepare_for_closing();
closing_event
} }
/// Returns the new biggest id to use to store the new update. /// Returns the new biggest id to use to store the new update.