implement index deletion

This commit is contained in:
mpostma 2021-02-15 10:53:21 +01:00
parent a580a6a44d
commit a9e9e72840
No known key found for this signature in database
GPG Key ID: CBC8A7C1D7A28C3A
5 changed files with 102 additions and 12 deletions

View File

@ -82,6 +82,15 @@ impl Data {
Ok(update.into()) Ok(update.into())
} }
pub async fn delete_index(
&self,
index: impl AsRef<str> + Send + Sync + 'static,
) -> anyhow::Result<()> {
let index_controller = self.index_controller.clone();
tokio::task::spawn_blocking(move || { index_controller.delete_index(index) }).await??;
Ok(())
}
#[inline] #[inline]
pub fn get_update_status(&self, index: impl AsRef<str>, uid: u64) -> anyhow::Result<Option<UpdateStatus>> { pub fn get_update_status(&self, index: impl AsRef<str>, uid: u64) -> anyhow::Result<Option<UpdateStatus>> {
self.index_controller.update_status(index, uid) self.index_controller.update_status(index, uid)

View File

@ -1,3 +1,4 @@
use std::time::Duration;
use std::fs::{create_dir_all, remove_dir_all}; use std::fs::{create_dir_all, remove_dir_all};
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::sync::Arc; use std::sync::Arc;
@ -6,7 +7,7 @@ use anyhow::{bail, Context};
use chrono::{DateTime, Utc}; use chrono::{DateTime, Utc};
use dashmap::{DashMap, mapref::entry::Entry}; use dashmap::{DashMap, mapref::entry::Entry};
use heed::{Env, EnvOpenOptions, Database, types::{Str, SerdeJson, ByteSlice}, RoTxn, RwTxn}; use heed::{Env, EnvOpenOptions, Database, types::{Str, SerdeJson, ByteSlice}, RoTxn, RwTxn};
use log::error; use log::{error, info};
use milli::Index; use milli::Index;
use rayon::ThreadPool; use rayon::ThreadPool;
use serde::{Serialize, Deserialize}; use serde::{Serialize, Deserialize};
@ -90,6 +91,52 @@ impl IndexStore {
}) })
} }
pub fn delete(&self, index_uid: impl AsRef<str>) -> anyhow::Result<()> {
// we remove the references to the index from the index map so it is not accessible anymore
let mut txn = self.env.write_txn()?;
let uuid = self.index_uuid(&txn, &index_uid)?
.with_context(|| format!("Index {:?} doesn't exist", index_uid.as_ref()))?;
self.name_to_uuid.delete(&mut txn, index_uid.as_ref())?;
self.uuid_to_index_meta.delete(&mut txn, uuid.as_bytes())?;
txn.commit()?;
// If the index was loaded, we need to close it. Since we already removed references to it
// from the index_store, the only that can still get a reference to it is the update store.
//
// First, we want to remove any pending updates from the store.
// Second, we try to get ownership on the update store so we can close it. It may take a
// couple of tries, but since the update store event loop only has a weak reference to
// itself, and we are the only other function holding a reference to it otherwise, we will
// get it eventually.
// Fourth, we request a closing of the update store.
// Fifth, we can take ownership on the index, and close it.
// Lastly, remove all the files from the file system.
let index_uid = index_uid.as_ref().to_string();
if let Some((_, (index, updates))) = self.uuid_to_index.remove(&uuid) {
std::thread::spawn(move || {
info!("Preparing for {:?} deletion.", index_uid);
// this error is non fatal, but may delay the deletion.
if let Err(e) = updates.abort_pendings() {
error!(
"error aborting pending updates when deleting index {:?}: {}",
index_uid,
e
);
}
let updates = get_arc_ownership_blocking(updates);
let close_event = updates.prepare_for_closing();
close_event.wait();
info!("closed update store for {:?}", index_uid);
let index = get_arc_ownership_blocking(index);
let close_event = index.prepare_for_closing();
close_event.wait();
info!("index {:?} deleted.", index_uid);
});
}
Ok(())
}
fn index_uuid(&self, txn: &RoTxn, name: impl AsRef<str>) -> anyhow::Result<Option<Uuid>> { fn index_uuid(&self, txn: &RoTxn, name: impl AsRef<str>) -> anyhow::Result<Option<Uuid>> {
match self.name_to_uuid.get(txn, name.as_ref())? { match self.name_to_uuid.get(txn, name.as_ref())? {
Some(bytes) => { Some(bytes) => {
@ -299,6 +346,20 @@ impl IndexStore {
} }
} }
// Loops on an arc to get ownership on the wrapped value. This method sleeps 100ms before retrying.
fn get_arc_ownership_blocking<T>(mut item: Arc<T>) -> T {
loop {
match Arc::try_unwrap(item) {
Ok(item) => return item,
Err(item_arc) => {
item = item_arc;
std::thread::sleep(Duration::from_millis(100));
continue;
}
}
}
}
fn open_or_create_database<K: 'static, V: 'static>(env: &Env, name: Option<&str>) -> anyhow::Result<Database<K, V>> { fn open_or_create_database<K: 'static, V: 'static>(env: &Env, name: Option<&str>) -> anyhow::Result<Database<K, V>> {
match env.open_database::<K, V>(name)? { match env.open_database::<K, V>(name)? {
Some(db) => Ok(db), Some(db) => Ok(db),

View File

@ -80,8 +80,8 @@ impl IndexController for LocalIndexController {
Ok(meta) Ok(meta)
} }
fn delete_index<S: AsRef<str>>(&self, _index_uid: S) -> anyhow::Result<()> { fn delete_index<S: AsRef<str>>(&self, index_uid: S) -> anyhow::Result<()> {
todo!() self.indexes.delete(index_uid)
} }
fn swap_indices<S1: AsRef<str>, S2: AsRef<str>>(&self, _index1_uid: S1, _index2_uid: S2) -> anyhow::Result<()> { fn swap_indices<S1: AsRef<str>, S2: AsRef<str>>(&self, _index1_uid: S1, _index2_uid: S2) -> anyhow::Result<()> {

View File

@ -66,23 +66,37 @@ where
processing, processing,
}); });
let update_store_cloned = update_store.clone(); // We need a week reference so we can take ownership on the arc later when we
// want to close the index.
let update_store_weak = Arc::downgrade(&update_store);
std::thread::spawn(move || { std::thread::spawn(move || {
// Block and wait for something to process. // Block and wait for something to process.
for () in notification_receiver { 'outer: for _ in notification_receiver {
loop { loop {
match update_store_cloned.process_pending_update(&mut update_handler) { match update_store_weak.upgrade() {
Some(update_store) => {
match update_store.process_pending_update(&mut update_handler) {
Ok(Some(_)) => (), Ok(Some(_)) => (),
Ok(None) => break, Ok(None) => break,
Err(e) => eprintln!("error while processing update: {}", e), Err(e) => eprintln!("error while processing update: {}", e),
} }
} }
// the ownership on the arc has been taken, we need to exit
None => break 'outer,
}
}
} }
}); });
Ok(update_store) Ok(update_store)
} }
pub fn prepare_for_closing(self) -> heed::EnvClosingEvent {
// We ignore this error, since that would mean the event loop is already closed.
let closing_event = self.env.prepare_for_closing();
closing_event
}
/// Returns the new biggest id to use to store the new update. /// Returns the new biggest id to use to store the new update.
fn new_update_id(&self, txn: &heed::RoTxn) -> heed::Result<u64> { fn new_update_id(&self, txn: &heed::RoTxn) -> heed::Result<u64> {
let last_pending = self.pending_meta let last_pending = self.pending_meta

View File

@ -108,10 +108,16 @@ async fn update_index(
#[delete("/indexes/{index_uid}", wrap = "Authentication::Private")] #[delete("/indexes/{index_uid}", wrap = "Authentication::Private")]
async fn delete_index( async fn delete_index(
_data: web::Data<Data>, data: web::Data<Data>,
_path: web::Path<IndexParam>, path: web::Path<IndexParam>,
) -> Result<HttpResponse, ResponseError> { ) -> Result<HttpResponse, ResponseError> {
match data.delete_index(path.index_uid.clone()).await {
Ok(_) => Ok(HttpResponse::Ok().finish()),
Err(e) => {
error!("{}", e);
todo!() todo!()
}
}
} }
#[derive(Deserialize)] #[derive(Deserialize)]