diff --git a/src/data/updates.rs b/src/data/updates.rs index e2bcba9d2..8a7052ade 100644 --- a/src/data/updates.rs +++ b/src/data/updates.rs @@ -82,6 +82,15 @@ impl Data { Ok(update.into()) } + pub async fn delete_index( + &self, + index: impl AsRef + Send + Sync + 'static, + ) -> anyhow::Result<()> { + let index_controller = self.index_controller.clone(); + tokio::task::spawn_blocking(move || { index_controller.delete_index(index) }).await??; + Ok(()) + } + #[inline] pub fn get_update_status(&self, index: impl AsRef, uid: u64) -> anyhow::Result> { self.index_controller.update_status(index, uid) diff --git a/src/index_controller/local_index_controller/index_store.rs b/src/index_controller/local_index_controller/index_store.rs index 226915016..3fe8a3f59 100644 --- a/src/index_controller/local_index_controller/index_store.rs +++ b/src/index_controller/local_index_controller/index_store.rs @@ -1,20 +1,24 @@ use std::fs::{create_dir_all, remove_dir_all}; use std::path::{Path, PathBuf}; use std::sync::Arc; +use std::time::Duration; use anyhow::{bail, Context}; use chrono::{DateTime, Utc}; -use dashmap::{DashMap, mapref::entry::Entry}; -use heed::{Env, EnvOpenOptions, Database, types::{Str, SerdeJson, ByteSlice}, RoTxn, RwTxn}; -use log::error; +use dashmap::{mapref::entry::Entry, DashMap}; +use heed::{ + types::{ByteSlice, SerdeJson, Str}, + Database, Env, EnvOpenOptions, RoTxn, RwTxn, +}; +use log::{error, info}; use milli::Index; use rayon::ThreadPool; -use serde::{Serialize, Deserialize}; +use serde::{Deserialize, Serialize}; use uuid::Uuid; -use crate::option::IndexerOpts; use super::update_handler::UpdateHandler; use super::{UpdateMeta, UpdateResult}; +use crate::option::IndexerOpts; type UpdateStore = super::update_store::UpdateStore; @@ -90,29 +94,91 @@ impl IndexStore { }) } + pub fn delete(&self, index_uid: impl AsRef) -> anyhow::Result<()> { + // we remove the references to the index from the index map so it is not accessible anymore + let mut txn = self.env.write_txn()?; + let uuid = self + .index_uuid(&txn, &index_uid)? + .with_context(|| format!("Index {:?} doesn't exist", index_uid.as_ref()))?; + self.name_to_uuid.delete(&mut txn, index_uid.as_ref())?; + self.uuid_to_index_meta.delete(&mut txn, uuid.as_bytes())?; + txn.commit()?; + // If the index was loaded (i.e it is present in the uuid_to_index map), then we need to + // close it. The process goes as follow: + // + // 1) We want to remove any pending updates from the store. + // 2) We try to get ownership on the update store so we can close it. It may take a + // couple of tries, but since the update store event loop only has a weak reference to + // itself, and we are the only other function holding a reference to it otherwise, we will + // get it eventually. + // 3) We request a closing of the update store. + // 4) We can take ownership on the index, and close it. + // 5) We remove all the files from the file system. + let index_uid = index_uid.as_ref().to_string(); + let path = self.env.path().to_owned(); + if let Some((_, (index, updates))) = self.uuid_to_index.remove(&uuid) { + std::thread::spawn(move || { + info!("Preparing for {:?} deletion.", index_uid); + // this error is non fatal, but may delay the deletion. + if let Err(e) = updates.abort_pendings() { + error!( + "error aborting pending updates when deleting index {:?}: {}", + index_uid, e + ); + } + let updates = get_arc_ownership_blocking(updates); + let close_event = updates.prepare_for_closing(); + close_event.wait(); + info!("closed update store for {:?}", index_uid); + + let index = get_arc_ownership_blocking(index); + let close_event = index.prepare_for_closing(); + close_event.wait(); + + let update_path = make_update_db_path(&path, &uuid); + let index_path = make_index_db_path(&path, &uuid); + + if let Err(e) = remove_dir_all(index_path) { + error!("error removing index {:?}: {}", index_uid, e); + } + + if let Err(e) = remove_dir_all(update_path) { + error!("error removing index {:?}: {}", index_uid, e); + } + + info!("index {:?} deleted.", index_uid); + }); + } + + Ok(()) + } + fn index_uuid(&self, txn: &RoTxn, name: impl AsRef) -> anyhow::Result> { match self.name_to_uuid.get(txn, name.as_ref())? { Some(bytes) => { let uuid = Uuid::from_slice(bytes)?; Ok(Some(uuid)) } - None => Ok(None) + None => Ok(None), } } - fn retrieve_index(&self, txn: &RoTxn, uid: Uuid) -> anyhow::Result, Arc)>> { + fn retrieve_index( + &self, + txn: &RoTxn, + uid: Uuid, + ) -> anyhow::Result, Arc)>> { match self.uuid_to_index.entry(uid.clone()) { - Entry::Vacant(entry) => { - match self.uuid_to_index_meta.get(txn, uid.as_bytes())? { - Some(meta) => { - let path = self.env.path(); - let (index, updates) = meta.open(path, self.thread_pool.clone(), &self.indexer_options)?; - entry.insert((index.clone(), updates.clone())); - Ok(Some((index, updates))) - }, - None => Ok(None) + Entry::Vacant(entry) => match self.uuid_to_index_meta.get(txn, uid.as_bytes())? { + Some(meta) => { + let path = self.env.path(); + let (index, updates) = + meta.open(path, self.thread_pool.clone(), &self.indexer_options)?; + entry.insert((index.clone(), updates.clone())); + Ok(Some((index, updates))) } - } + None => Ok(None), + }, Entry::Occupied(entry) => { let (index, updates) = entry.get(); Ok(Some((index.clone(), updates.clone()))) @@ -120,14 +186,21 @@ impl IndexStore { } } - fn get_index_txn(&self, txn: &RoTxn, name: impl AsRef) -> anyhow::Result, Arc)>> { + fn get_index_txn( + &self, + txn: &RoTxn, + name: impl AsRef, + ) -> anyhow::Result, Arc)>> { match self.index_uuid(&txn, name)? { Some(uid) => self.retrieve_index(&txn, uid), None => Ok(None), } } - pub fn index(&self, name: impl AsRef) -> anyhow::Result, Arc)>> { + pub fn index( + &self, + name: impl AsRef, + ) -> anyhow::Result, Arc)>> { let txn = self.env.read_txn()?; self.get_index_txn(&txn, name) } @@ -139,7 +212,8 @@ impl IndexStore { F: FnOnce(&Index) -> anyhow::Result, { let mut txn = self.env.write_txn()?; - let (index, _) = self.get_index_txn(&txn, &name)? + let (index, _) = self + .get_index_txn(&txn, &name)? .with_context(|| format!("Index {:?} doesn't exist", name.as_ref()))?; let result = f(index.as_ref()); match result { @@ -148,18 +222,26 @@ impl IndexStore { txn.commit()?; Ok((ret, meta)) } - Err(e) => Err(e) + Err(e) => Err(e), } } - pub fn index_with_meta(&self, name: impl AsRef) -> anyhow::Result, IndexMeta)>> { + pub fn index_with_meta( + &self, + name: impl AsRef, + ) -> anyhow::Result, IndexMeta)>> { let txn = self.env.read_txn()?; let uuid = self.index_uuid(&txn, &name)?; match uuid { Some(uuid) => { - let meta = self.uuid_to_index_meta.get(&txn, uuid.as_bytes())? - .with_context(|| format!("unable to retrieve metadata for index {:?}", name.as_ref()))?; - let (index, _) = self.retrieve_index(&txn, uuid)? + let meta = self + .uuid_to_index_meta + .get(&txn, uuid.as_bytes())? + .with_context(|| { + format!("unable to retrieve metadata for index {:?}", name.as_ref()) + })?; + let (index, _) = self + .retrieve_index(&txn, uuid)? .with_context(|| format!("unable to retrieve index {:?}", name.as_ref()))?; Ok(Some((index, meta))) } @@ -167,13 +249,20 @@ impl IndexStore { } } - fn update_meta(&self, txn: &mut RwTxn, name: impl AsRef, f: F) -> anyhow::Result + fn update_meta( + &self, + txn: &mut RwTxn, + name: impl AsRef, + f: F, + ) -> anyhow::Result where - F: FnOnce(&mut IndexMeta) + F: FnOnce(&mut IndexMeta), { - let uuid = self.index_uuid(txn, &name)? - .with_context(|| format!("Index {:?} doesn't exist", name.as_ref()))?; - let mut meta = self.uuid_to_index_meta + let uuid = self + .index_uuid(txn, &name)? + .with_context(|| format!("Index {:?} doesn't exist", name.as_ref()))?; + let mut meta = self + .uuid_to_index_meta .get(txn, uuid.as_bytes())? .with_context(|| format!("couldn't retrieve metadata for index {:?}", name.as_ref()))?; f(&mut meta); @@ -192,7 +281,8 @@ impl IndexStore { Some(res) => Ok(res), None => { let uuid = Uuid::new_v4(); - let (index, updates, _) = self.create_index_txn(&mut txn, uuid, name, update_size, index_size)?; + let (index, updates, _) = + self.create_index_txn(&mut txn, uuid, name, update_size, index_size)?; // If we fail to commit the transaction, we must delete the database from the // file-system. if let Err(e) = txn.commit() { @@ -200,7 +290,7 @@ impl IndexStore { return Err(e)?; } Ok((index, updates)) - }, + } } } @@ -215,7 +305,8 @@ impl IndexStore { self.uuid_to_index.remove(&uuid); } - fn create_index_txn( &self, + fn create_index_txn( + &self, txn: &mut RwTxn, uuid: Uuid, name: impl AsRef, @@ -236,15 +327,17 @@ impl IndexStore { self.uuid_to_index_meta.put(txn, uuid.as_bytes(), &meta)?; let path = self.env.path(); - let (index, update_store) = match meta.open(path, self.thread_pool.clone(), &self.indexer_options) { - Ok(res) => res, - Err(e) => { - self.clean_db(uuid); - return Err(e) - } - }; + let (index, update_store) = + match meta.open(path, self.thread_pool.clone(), &self.indexer_options) { + Ok(res) => res, + Err(e) => { + self.clean_db(uuid); + return Err(e); + } + }; - self.uuid_to_index.insert(uuid, (index.clone(), update_store.clone())); + self.uuid_to_index + .insert(uuid, (index.clone(), update_store.clone())); Ok((index, update_store, meta)) } @@ -278,19 +371,24 @@ impl IndexStore { /// This method will force all the indexes to be loaded. pub fn list_indexes(&self) -> anyhow::Result)>> { let txn = self.env.read_txn()?; - let metas = self.name_to_uuid - .iter(&txn)? - .filter_map(|entry| entry - .map_err(|e| { error!("error decoding entry while listing indexes: {}", e); e }).ok()); + let metas = self.name_to_uuid.iter(&txn)?.filter_map(|entry| { + entry + .map_err(|e| { + error!("error decoding entry while listing indexes: {}", e); + e + }) + .ok() + }); let mut indexes = Vec::new(); for (name, uuid) in metas { // get index to retrieve primary key - let (index, _) = self.get_index_txn(&txn, name)? + let (index, _) = self + .get_index_txn(&txn, name)? .with_context(|| format!("could not load index {:?}", name))?; - let primary_key = index.primary_key(&index.read_txn()?)? - .map(String::from); + let primary_key = index.primary_key(&index.read_txn()?)?.map(String::from); // retieve meta - let meta = self.uuid_to_index_meta + let meta = self + .uuid_to_index_meta .get(&txn, &uuid)? .with_context(|| format!("could not retieve meta for index {:?}", name))?; indexes.push((name.to_owned(), meta, primary_key)); @@ -299,7 +397,24 @@ impl IndexStore { } } -fn open_or_create_database(env: &Env, name: Option<&str>) -> anyhow::Result> { +// Loops on an arc to get ownership on the wrapped value. This method sleeps 100ms before retrying. +fn get_arc_ownership_blocking(mut item: Arc) -> T { + loop { + match Arc::try_unwrap(item) { + Ok(item) => return item, + Err(item_arc) => { + item = item_arc; + std::thread::sleep(Duration::from_millis(100)); + continue; + } + } + } +} + +fn open_or_create_database( + env: &Env, + name: Option<&str>, +) -> anyhow::Result> { match env.open_database::(name)? { Some(db) => Ok(db), None => Ok(env.create_database::(name)?), @@ -358,7 +473,10 @@ mod test { // insert an uuid in the the name_to_uuid_db: let uuid = Uuid::new_v4(); let mut txn = store.env.write_txn().unwrap(); - store.name_to_uuid.put(&mut txn, &name, uuid.as_bytes()).unwrap(); + store + .name_to_uuid + .put(&mut txn, &name, uuid.as_bytes()) + .unwrap(); txn.commit().unwrap(); // check that the uuid is there @@ -386,7 +504,10 @@ mod test { updated_at, }; let mut txn = store.env.write_txn().unwrap(); - store.uuid_to_index_meta.put(&mut txn, uuid.as_bytes(), &meta).unwrap(); + store + .uuid_to_index_meta + .put(&mut txn, uuid.as_bytes(), &meta) + .unwrap(); txn.commit().unwrap(); // the index cache should be empty @@ -417,8 +538,14 @@ mod test { updated_at, }; let mut txn = store.env.write_txn().unwrap(); - store.name_to_uuid.put(&mut txn, &name, uuid.as_bytes()).unwrap(); - store.uuid_to_index_meta.put(&mut txn, uuid.as_bytes(), &meta).unwrap(); + store + .name_to_uuid + .put(&mut txn, &name, uuid.as_bytes()) + .unwrap(); + store + .uuid_to_index_meta + .put(&mut txn, uuid.as_bytes(), &meta) + .unwrap(); txn.commit().unwrap(); assert!(store.index(&name).unwrap().is_some()); @@ -432,13 +559,19 @@ mod test { let update_store_size = 4096 * 100; let index_store_size = 4096 * 100; - store.get_or_create_index(&name, update_store_size, index_store_size).unwrap(); + store + .get_or_create_index(&name, update_store_size, index_store_size) + .unwrap(); let txn = store.env.read_txn().unwrap(); - let uuid = store.name_to_uuid.get(&txn, &name).unwrap(); + let uuid = store.name_to_uuid.get(&txn, &name).unwrap(); assert_eq!(store.uuid_to_index.len(), 1); assert!(uuid.is_some()); let uuid = Uuid::from_slice(uuid.unwrap()).unwrap(); - let meta = store.uuid_to_index_meta.get(&txn, uuid.as_bytes()).unwrap().unwrap(); + let meta = store + .uuid_to_index_meta + .get(&txn, uuid.as_bytes()) + .unwrap() + .unwrap(); assert_eq!(meta.update_store_size, update_store_size); assert_eq!(meta.index_store_size, index_store_size); assert_eq!(meta.uuid, uuid); @@ -454,12 +587,18 @@ mod test { let index_store_size = 4096 * 100; let uuid = Uuid::new_v4(); let mut txn = store.env.write_txn().unwrap(); - store.create_index_txn(&mut txn, uuid, name, update_store_size, index_store_size).unwrap(); + store + .create_index_txn(&mut txn, uuid, name, update_store_size, index_store_size) + .unwrap(); let uuid = store.name_to_uuid.get(&txn, &name).unwrap(); assert_eq!(store.uuid_to_index.len(), 1); assert!(uuid.is_some()); let uuid = Uuid::from_slice(uuid.unwrap()).unwrap(); - let meta = store.uuid_to_index_meta.get(&txn, uuid.as_bytes()).unwrap().unwrap(); + let meta = store + .uuid_to_index_meta + .get(&txn, uuid.as_bytes()) + .unwrap() + .unwrap(); assert_eq!(meta.update_store_size, update_store_size); assert_eq!(meta.index_store_size, index_store_size); assert_eq!(meta.uuid, uuid); diff --git a/src/index_controller/local_index_controller/mod.rs b/src/index_controller/local_index_controller/mod.rs index 186ed3ddd..47de1a370 100644 --- a/src/index_controller/local_index_controller/mod.rs +++ b/src/index_controller/local_index_controller/mod.rs @@ -80,8 +80,8 @@ impl IndexController for LocalIndexController { Ok(meta) } - fn delete_index>(&self, _index_uid: S) -> anyhow::Result<()> { - todo!() + fn delete_index>(&self, index_uid: S) -> anyhow::Result<()> { + self.indexes.delete(index_uid) } fn swap_indices, S2: AsRef>(&self, _index1_uid: S1, _index2_uid: S2) -> anyhow::Result<()> { diff --git a/src/index_controller/local_index_controller/update_store.rs b/src/index_controller/local_index_controller/update_store.rs index 9f0c4ad7d..b025ff090 100644 --- a/src/index_controller/local_index_controller/update_store.rs +++ b/src/index_controller/local_index_controller/update_store.rs @@ -66,15 +66,23 @@ where processing, }); - let update_store_cloned = update_store.clone(); + // We need a weak reference so we can take ownership on the arc later when we + // want to close the index. + let update_store_weak = Arc::downgrade(&update_store); std::thread::spawn(move || { // Block and wait for something to process. - for () in notification_receiver { + 'outer: for _ in notification_receiver { loop { - match update_store_cloned.process_pending_update(&mut update_handler) { - Ok(Some(_)) => (), - Ok(None) => break, - Err(e) => eprintln!("error while processing update: {}", e), + match update_store_weak.upgrade() { + Some(update_store) => { + match update_store.process_pending_update(&mut update_handler) { + Ok(Some(_)) => (), + Ok(None) => break, + Err(e) => eprintln!("error while processing update: {}", e), + } + } + // the ownership on the arc has been taken, we need to exit. + None => break 'outer, } } } @@ -83,6 +91,10 @@ where Ok(update_store) } + pub fn prepare_for_closing(self) -> heed::EnvClosingEvent { + self.env.prepare_for_closing() + } + /// Returns the new biggest id to use to store the new update. fn new_update_id(&self, txn: &heed::RoTxn) -> heed::Result { let last_pending = self.pending_meta diff --git a/src/routes/index.rs b/src/routes/index.rs index 20f40069c..151202a88 100644 --- a/src/routes/index.rs +++ b/src/routes/index.rs @@ -108,10 +108,16 @@ async fn update_index( #[delete("/indexes/{index_uid}", wrap = "Authentication::Private")] async fn delete_index( - _data: web::Data, - _path: web::Path, + data: web::Data, + path: web::Path, ) -> Result { - todo!() + match data.delete_index(path.index_uid.clone()).await { + Ok(_) => Ok(HttpResponse::Ok().finish()), + Err(e) => { + error!("{}", e); + todo!() + } + } } #[derive(Deserialize)]