From d9254c43554755f8cc8e153f1d5b52dd029b2d58 Mon Sep 17 00:00:00 2001 From: mpostma Date: Sat, 6 Mar 2021 12:57:56 +0100 Subject: [PATCH] implement index delete --- src/data/updates.rs | 8 +-- src/index_controller/index_actor.rs | 80 +++++++++++++++++++++++---- src/index_controller/mod.rs | 32 +++++++++-- src/index_controller/update_actor.rs | 51 +++++++++++++++-- src/index_controller/uuid_resolver.rs | 30 ++++++++-- 5 files changed, 171 insertions(+), 30 deletions(-) diff --git a/src/data/updates.rs b/src/data/updates.rs index dd74bc47f..e9d1b51b7 100644 --- a/src/data/updates.rs +++ b/src/data/updates.rs @@ -51,12 +51,10 @@ impl Data { pub async fn delete_index( &self, - _index: impl AsRef + Send + Sync + 'static, + index: impl AsRef + Send + Sync + 'static, ) -> anyhow::Result<()> { - todo!() - //let index_controller = self.index_controller.clone(); - //tokio::task::spawn_blocking(move || { index_controller.delete_index(index) }).await??; - //Ok(()) + self.index_controller.delete_index(index.as_ref().to_owned()).await?; + Ok(()) } pub async fn get_update_status(&self, index: impl AsRef, uid: u64) -> anyhow::Result> { diff --git a/src/index_controller/index_actor.rs b/src/index_controller/index_actor.rs index 0e74d6665..c77793a5b 100644 --- a/src/index_controller/index_actor.rs +++ b/src/index_controller/index_actor.rs @@ -1,19 +1,20 @@ use std::collections::{hash_map::Entry, HashMap}; -use std::fs::{create_dir_all, File}; +use std::fs::{create_dir_all, File, remove_dir_all}; use std::path::{Path, PathBuf}; use std::sync::Arc; use async_stream::stream; use chrono::Utc; +use futures::pin_mut; use futures::stream::StreamExt; use heed::EnvOpenOptions; use log::info; +use std::future::Future; use thiserror::Error; use tokio::sync::{mpsc, oneshot, RwLock}; use uuid::Uuid; -use std::future::Future; -use futures::pin_mut; +use super::get_arc_ownership_blocking; use super::update_handler::UpdateHandler; use crate::index::UpdateResult as UResult; use crate::index::{Document, Index, SearchQuery, SearchResult, Settings}; @@ -59,7 +60,11 @@ enum IndexMsg { attributes_to_retrieve: Option>, doc_id: String, ret: oneshot::Sender>, - } + }, + Delete { + uuid: Uuid, + ret: oneshot::Sender>, + }, } struct IndexActor { @@ -82,13 +87,14 @@ trait IndexStore { async fn create_index(&self, uuid: Uuid, primary_key: Option) -> Result; async fn get_or_create(&self, uuid: Uuid) -> Result; async fn get(&self, uuid: Uuid) -> Result>; + async fn delete(&self, uuid: &Uuid) -> Result>; } impl IndexActor { fn new( read_receiver: mpsc::Receiver, write_receiver: mpsc::Receiver, - store: S + store: S, ) -> Self { let options = IndexerOpts::default(); let update_handler = UpdateHandler::new(&options).unwrap(); @@ -149,7 +155,6 @@ impl IndexActor { //futures.push(fut2); //futures.for_each(f) tokio::join!(fut1, fut2); - } async fn handle_message(&self, msg: IndexMsg) { @@ -173,7 +178,18 @@ impl IndexActor { self.handle_fetch_documents(uuid, offset, limit, attributes_to_retrieve, ret) .await } - Document { uuid, attributes_to_retrieve, doc_id, ret } => self.handle_fetch_document(uuid, doc_id, attributes_to_retrieve, ret).await, + Document { + uuid, + attributes_to_retrieve, + doc_id, + ret, + } => { + self.handle_fetch_document(uuid, doc_id, attributes_to_retrieve, ret) + .await + } + Delete { uuid, ret } => { + let _ = ret.send(self.handle_delete(uuid).await); + }, } } @@ -236,10 +252,12 @@ impl IndexActor { ) { let index = self.store.get(uuid).await.unwrap().unwrap(); tokio::task::spawn_blocking(move || { - let result = index.retrieve_documents(offset, limit, attributes_to_retrieve) + let result = index + .retrieve_documents(offset, limit, attributes_to_retrieve) .map_err(|e| IndexError::Error(e)); let _ = ret.send(result); - }).await; + }) + .await; } async fn handle_fetch_document( @@ -251,10 +269,29 @@ impl IndexActor { ) { let index = self.store.get(uuid).await.unwrap().unwrap(); tokio::task::spawn_blocking(move || { - let result = index.retrieve_document(doc_id, attributes_to_retrieve) + let result = index + .retrieve_document(doc_id, attributes_to_retrieve) .map_err(|e| IndexError::Error(e)); let _ = ret.send(result); - }).await; + }) + .await; + } + + async fn handle_delete(&self, uuid: Uuid) -> Result<()> { + let index = self.store.delete(&uuid).await?; + + if let Some(index) = index { + tokio::task::spawn(async move { + let index = index.0; + let store = get_arc_ownership_blocking(index).await; + tokio::task::spawn_blocking(move || { + store.prepare_for_closing().wait(); + info!("Index {} was closed.", uuid); + }); + }); + } + + Ok(()) } } @@ -272,7 +309,10 @@ impl IndexActorHandle { let store = MapIndexStore::new(path); let actor = IndexActor::new(read_receiver, write_receiver, store); tokio::task::spawn(actor.run()); - Self { read_sender, write_sender } + Self { + read_sender, + write_sender, + } } pub async fn create_index( @@ -346,6 +386,13 @@ impl IndexActorHandle { let _ = self.read_sender.send(msg).await; Ok(receiver.await.expect("IndexActor has been killed")?) } + + pub async fn delete(&self, uuid: Uuid) -> Result<()> { + let (ret, receiver) = oneshot::channel(); + let msg = IndexMsg::Delete { uuid, ret }; + let _ = self.read_sender.send(msg).await; + Ok(receiver.await.expect("IndexActor has been killed")?) + } } struct MapIndexStore { @@ -408,6 +455,15 @@ impl IndexStore for MapIndexStore { async fn get(&self, uuid: Uuid) -> Result> { Ok(self.index_store.read().await.get(&uuid).cloned()) } + + async fn delete(&self, uuid: &Uuid) -> Result> { + let index = self.index_store.write().await.remove(&uuid); + if index.is_some() { + let db_path = self.root.join(format!("index-{}", uuid)); + remove_dir_all(db_path).unwrap(); + } + Ok(index) + } } impl MapIndexStore { diff --git a/src/index_controller/mod.rs b/src/index_controller/mod.rs index 4606526c4..19fe62f4d 100644 --- a/src/index_controller/mod.rs +++ b/src/index_controller/mod.rs @@ -1,11 +1,13 @@ -mod updates; mod index_actor; mod update_actor; -mod uuid_resolver; -mod update_store; mod update_handler; +mod update_store; +mod updates; +mod uuid_resolver; use std::path::Path; +use std::sync::Arc; +use std::time::Duration; use actix_web::web::{Bytes, Payload}; use anyhow::Context; @@ -14,6 +16,7 @@ use futures::stream::StreamExt; use milli::update::{IndexDocumentsMethod, UpdateFormat}; use serde::{Serialize, Deserialize}; use tokio::sync::{mpsc, oneshot}; +use tokio::time::sleep; use uuid::Uuid; pub use updates::{Processed, Processing, Failed}; @@ -146,8 +149,14 @@ impl IndexController { Ok(index_meta) } - fn delete_index(&self, index_uid: String) -> anyhow::Result<()> { - todo!() + pub async fn delete_index(&self, index_uid: String) -> anyhow::Result<()> { + let uuid = self.uuid_resolver + .delete(index_uid) + .await? + .context("index not found")?; + self.update_handle.delete(uuid.clone()).await?; + self.index_handle.delete(uuid).await?; + Ok(()) } pub async fn update_status(&self, index: String, id: u64) -> anyhow::Result> { @@ -219,3 +228,16 @@ impl IndexController { Ok(result) } } + +pub async fn get_arc_ownership_blocking(mut item: Arc) -> T { + loop { + match Arc::try_unwrap(item) { + Ok(item) => return item, + Err(item_arc) => { + item = item_arc; + sleep(Duration::from_millis(100)).await; + continue; + } + } + } +} diff --git a/src/index_controller/update_actor.rs b/src/index_controller/update_actor.rs index 83f29c380..370912dcf 100644 --- a/src/index_controller/update_actor.rs +++ b/src/index_controller/update_actor.rs @@ -1,19 +1,20 @@ use std::collections::{hash_map::Entry, HashMap}; -use std::fs::create_dir_all; +use std::fs::{create_dir_all, remove_dir_all}; use std::path::{Path, PathBuf}; use std::sync::Arc; -use super::index_actor::IndexActorHandle; +use itertools::Itertools; use log::info; +use super::index_actor::IndexActorHandle; use thiserror::Error; use tokio::fs::File; use tokio::io::AsyncWriteExt; use tokio::sync::{mpsc, oneshot, RwLock}; use uuid::Uuid; -use itertools::Itertools; use crate::index::UpdateResult; use crate::index_controller::{UpdateMeta, UpdateStatus}; +use super::get_arc_ownership_blocking; pub type Result = std::result::Result; type UpdateStore = super::update_store::UpdateStore; @@ -42,7 +43,11 @@ enum UpdateMsg { uuid: Uuid, ret: oneshot::Sender>>, id: u64, - } + }, + Delete { + uuid: Uuid, + ret: oneshot::Sender>, + }, } struct UpdateActor { @@ -54,6 +59,7 @@ struct UpdateActor { #[async_trait::async_trait] trait UpdateStoreStore { async fn get_or_create(&self, uuid: Uuid) -> Result>; + async fn delete(&self, uuid: &Uuid) -> Result>>; async fn get(&self, uuid: &Uuid) -> Result>>; } @@ -89,6 +95,9 @@ where Some(GetUpdate { uuid, ret, id }) => { let _ = ret.send(self.handle_get_update(uuid, id).await); } + Some(Delete { uuid, ret }) => { + let _ = ret.send(self.handle_delete(uuid).await); + } None => {} } } @@ -173,6 +182,24 @@ where .map_err(|e| UpdateError::Error(Box::new(e)))?; Ok(result) } + + async fn handle_delete(&self, uuid: Uuid) -> Result<()> { + let store = self.store + .delete(&uuid) + .await?; + + if let Some(store) = store { + tokio::task::spawn(async move { + let store = get_arc_ownership_blocking(store).await; + tokio::task::spawn_blocking(move || { + store.prepare_for_closing().wait(); + info!("Update store {} was closed.", uuid); + }); + }); + } + + Ok(()) + } } #[derive(Clone)] @@ -225,6 +252,13 @@ where let _ = self.sender.send(msg).await; receiver.await.expect("update actor killed.") } + + pub async fn delete(&self, uuid: Uuid) -> Result<()> { + let (ret, receiver) = oneshot::channel(); + let msg = UpdateMsg::Delete { uuid, ret }; + let _ = self.sender.send(msg).await; + receiver.await.expect("update actor killed.") + } } struct MapUpdateStoreStore { @@ -269,4 +303,13 @@ impl UpdateStoreStore for MapUpdateStoreStore { async fn get(&self, uuid: &Uuid) -> Result>> { Ok(self.db.read().await.get(uuid).cloned()) } + + async fn delete(&self, uuid: &Uuid) -> Result>> { + let store = self.db.write().await.remove(&uuid); + if store.is_some() { + let path = self.path.clone().join(format!("updates-{}", uuid)); + remove_dir_all(path).unwrap(); + } + Ok(store) + } } diff --git a/src/index_controller/uuid_resolver.rs b/src/index_controller/uuid_resolver.rs index 3143ae8fc..50740f30f 100644 --- a/src/index_controller/uuid_resolver.rs +++ b/src/index_controller/uuid_resolver.rs @@ -22,6 +22,10 @@ enum UuidResolveMsg { name: String, ret: oneshot::Sender>, }, + Delete { + name: String, + ret: oneshot::Sender>>, + }, } struct UuidResolverActor { @@ -45,6 +49,7 @@ impl UuidResolverActor { Some(Create { name, ret }) => self.handle_create(name, ret).await, Some(GetOrCreate { name, ret }) => self.handle_get_or_create(name, ret).await, Some(Resolve { name, ret }) => self.handle_resolve(name, ret).await, + Some(Delete { name, ret }) => self.handle_delete(name, ret).await, // all senders have been dropped, need to quit. None => break, } @@ -64,7 +69,12 @@ impl UuidResolverActor { } async fn handle_resolve(&self, name: String, ret: oneshot::Sender>>) { - let result = self.store.get_uuid(name).await; + let result = self.store.get_uuid(&name).await; + let _ = ret.send(result); + } + + async fn handle_delete(&self, name: String, ret: oneshot::Sender>>) { + let result = self.store.delete(&name).await; let _ = ret.send(result); } } @@ -103,6 +113,13 @@ impl UuidResolverHandle { let _ = self.sender.send(msg).await; Ok(receiver.await.expect("Uuid resolver actor has been killed")?) } + + pub async fn delete(&self, name: String) -> anyhow::Result> { + let (ret, receiver) = oneshot::channel(); + let msg = UuidResolveMsg::Delete { name, ret }; + let _ = self.sender.send(msg).await; + Ok(receiver.await.expect("Uuid resolver actor has been killed")?) + } } #[derive(Clone, Debug, Error)] @@ -116,7 +133,8 @@ trait UuidStore { // Create a new entry for `name`. Return an error if `err` and the entry already exists, return // the uuid otherwise. async fn create_uuid(&self, name: String, err: bool) -> Result; - async fn get_uuid(&self, name: String) -> Result>; + async fn get_uuid(&self, name: &str) -> Result>; + async fn delete(&self, name: &str) -> Result>; } struct MapUuidStore(Arc>>); @@ -140,7 +158,11 @@ impl UuidStore for MapUuidStore { } } - async fn get_uuid(&self, name: String) -> Result> { - Ok(self.0.read().await.get(&name).cloned()) + async fn get_uuid(&self, name: &str) -> Result> { + Ok(self.0.read().await.get(name).cloned()) + } + + async fn delete(&self, name: &str) -> Result> { + Ok(self.0.write().await.remove(name)) } }