From a9e9e72840d32955277fbe5585c504651a55ca25 Mon Sep 17 00:00:00 2001 From: mpostma Date: Mon, 15 Feb 2021 10:53:21 +0100 Subject: [PATCH] implement index deletion --- src/data/updates.rs | 9 +++ .../local_index_controller/index_store.rs | 63 ++++++++++++++++++- .../local_index_controller/mod.rs | 4 +- .../local_index_controller/update_store.rs | 26 ++++++-- src/routes/index.rs | 12 +++- 5 files changed, 102 insertions(+), 12 deletions(-) diff --git a/src/data/updates.rs b/src/data/updates.rs index e2bcba9d2..8a7052ade 100644 --- a/src/data/updates.rs +++ b/src/data/updates.rs @@ -82,6 +82,15 @@ impl Data { Ok(update.into()) } + pub async fn delete_index( + &self, + index: impl AsRef + Send + Sync + 'static, + ) -> anyhow::Result<()> { + let index_controller = self.index_controller.clone(); + tokio::task::spawn_blocking(move || { index_controller.delete_index(index) }).await??; + Ok(()) + } + #[inline] pub fn get_update_status(&self, index: impl AsRef, uid: u64) -> anyhow::Result> { self.index_controller.update_status(index, uid) diff --git a/src/index_controller/local_index_controller/index_store.rs b/src/index_controller/local_index_controller/index_store.rs index 226915016..2df1675a8 100644 --- a/src/index_controller/local_index_controller/index_store.rs +++ b/src/index_controller/local_index_controller/index_store.rs @@ -1,3 +1,4 @@ +use std::time::Duration; use std::fs::{create_dir_all, remove_dir_all}; use std::path::{Path, PathBuf}; use std::sync::Arc; @@ -6,7 +7,7 @@ use anyhow::{bail, Context}; use chrono::{DateTime, Utc}; use dashmap::{DashMap, mapref::entry::Entry}; use heed::{Env, EnvOpenOptions, Database, types::{Str, SerdeJson, ByteSlice}, RoTxn, RwTxn}; -use log::error; +use log::{error, info}; use milli::Index; use rayon::ThreadPool; use serde::{Serialize, Deserialize}; @@ -90,6 +91,52 @@ impl IndexStore { }) } + pub fn delete(&self, index_uid: impl AsRef) -> anyhow::Result<()> { + // we remove the references to the index from the index map so it is not accessible anymore + let mut txn = self.env.write_txn()?; + let uuid = self.index_uuid(&txn, &index_uid)? + .with_context(|| format!("Index {:?} doesn't exist", index_uid.as_ref()))?; + self.name_to_uuid.delete(&mut txn, index_uid.as_ref())?; + self.uuid_to_index_meta.delete(&mut txn, uuid.as_bytes())?; + txn.commit()?; + // If the index was loaded, we need to close it. Since we already removed references to it + // from the index_store, the only that can still get a reference to it is the update store. + // + // First, we want to remove any pending updates from the store. + // Second, we try to get ownership on the update store so we can close it. It may take a + // couple of tries, but since the update store event loop only has a weak reference to + // itself, and we are the only other function holding a reference to it otherwise, we will + // get it eventually. + // Fourth, we request a closing of the update store. + // Fifth, we can take ownership on the index, and close it. + // Lastly, remove all the files from the file system. + let index_uid = index_uid.as_ref().to_string(); + if let Some((_, (index, updates))) = self.uuid_to_index.remove(&uuid) { + std::thread::spawn(move || { + info!("Preparing for {:?} deletion.", index_uid); + // this error is non fatal, but may delay the deletion. + if let Err(e) = updates.abort_pendings() { + error!( + "error aborting pending updates when deleting index {:?}: {}", + index_uid, + e + ); + } + let updates = get_arc_ownership_blocking(updates); + let close_event = updates.prepare_for_closing(); + close_event.wait(); + info!("closed update store for {:?}", index_uid); + + let index = get_arc_ownership_blocking(index); + let close_event = index.prepare_for_closing(); + close_event.wait(); + info!("index {:?} deleted.", index_uid); + }); + } + + Ok(()) + } + fn index_uuid(&self, txn: &RoTxn, name: impl AsRef) -> anyhow::Result> { match self.name_to_uuid.get(txn, name.as_ref())? { Some(bytes) => { @@ -299,6 +346,20 @@ impl IndexStore { } } +// Loops on an arc to get ownership on the wrapped value. This method sleeps 100ms before retrying. +fn get_arc_ownership_blocking(mut item: Arc) -> T { + loop { + match Arc::try_unwrap(item) { + Ok(item) => return item, + Err(item_arc) => { + item = item_arc; + std::thread::sleep(Duration::from_millis(100)); + continue; + } + } + } +} + fn open_or_create_database(env: &Env, name: Option<&str>) -> anyhow::Result> { match env.open_database::(name)? { Some(db) => Ok(db), diff --git a/src/index_controller/local_index_controller/mod.rs b/src/index_controller/local_index_controller/mod.rs index 186ed3ddd..47de1a370 100644 --- a/src/index_controller/local_index_controller/mod.rs +++ b/src/index_controller/local_index_controller/mod.rs @@ -80,8 +80,8 @@ impl IndexController for LocalIndexController { Ok(meta) } - fn delete_index>(&self, _index_uid: S) -> anyhow::Result<()> { - todo!() + fn delete_index>(&self, index_uid: S) -> anyhow::Result<()> { + self.indexes.delete(index_uid) } fn swap_indices, S2: AsRef>(&self, _index1_uid: S1, _index2_uid: S2) -> anyhow::Result<()> { diff --git a/src/index_controller/local_index_controller/update_store.rs b/src/index_controller/local_index_controller/update_store.rs index 9f0c4ad7d..32f734ad4 100644 --- a/src/index_controller/local_index_controller/update_store.rs +++ b/src/index_controller/local_index_controller/update_store.rs @@ -66,15 +66,23 @@ where processing, }); - let update_store_cloned = update_store.clone(); + // We need a week reference so we can take ownership on the arc later when we + // want to close the index. + let update_store_weak = Arc::downgrade(&update_store); std::thread::spawn(move || { // Block and wait for something to process. - for () in notification_receiver { + 'outer: for _ in notification_receiver { loop { - match update_store_cloned.process_pending_update(&mut update_handler) { - Ok(Some(_)) => (), - Ok(None) => break, - Err(e) => eprintln!("error while processing update: {}", e), + match update_store_weak.upgrade() { + Some(update_store) => { + match update_store.process_pending_update(&mut update_handler) { + Ok(Some(_)) => (), + Ok(None) => break, + Err(e) => eprintln!("error while processing update: {}", e), + } + } + // the ownership on the arc has been taken, we need to exit + None => break 'outer, } } } @@ -83,6 +91,12 @@ where Ok(update_store) } + pub fn prepare_for_closing(self) -> heed::EnvClosingEvent { + // We ignore this error, since that would mean the event loop is already closed. + let closing_event = self.env.prepare_for_closing(); + closing_event + } + /// Returns the new biggest id to use to store the new update. fn new_update_id(&self, txn: &heed::RoTxn) -> heed::Result { let last_pending = self.pending_meta diff --git a/src/routes/index.rs b/src/routes/index.rs index 20f40069c..151202a88 100644 --- a/src/routes/index.rs +++ b/src/routes/index.rs @@ -108,10 +108,16 @@ async fn update_index( #[delete("/indexes/{index_uid}", wrap = "Authentication::Private")] async fn delete_index( - _data: web::Data, - _path: web::Path, + data: web::Data, + path: web::Path, ) -> Result { - todo!() + match data.delete_index(path.index_uid.clone()).await { + Ok(_) => Ok(HttpResponse::Ok().finish()), + Err(e) => { + error!("{}", e); + todo!() + } + } } #[derive(Deserialize)]