From a8ba80965667d98a6a37aa091760348aa0ef08c7 Mon Sep 17 00:00:00 2001 From: mpostma Date: Thu, 11 Feb 2021 12:03:00 +0100 Subject: [PATCH 1/3] implement clear all documents --- src/data/updates.rs | 10 ++++++++++ .../local_index_controller/mod.rs | 8 ++++++++ .../local_index_controller/update_store.rs | 2 ++ src/index_controller/mod.rs | 3 +++ src/routes/document.rs | 15 ++++++++++++--- 5 files changed, 35 insertions(+), 3 deletions(-) diff --git a/src/data/updates.rs b/src/data/updates.rs index 57e9d1864..d3566bb6f 100644 --- a/src/data/updates.rs +++ b/src/data/updates.rs @@ -62,6 +62,16 @@ impl Data { Ok(update.into()) } + pub async fn clear_documents( + &self, + index: impl AsRef, + ) -> anyhow::Result { + let index_controller = self.index_controller.clone(); + let index = index.as_ref().to_string(); + let update = tokio::task::spawn_blocking(move || index_controller.clear_documents(index)).await??; + Ok(update.into()) + } + #[inline] pub fn get_update_status(&self, index: impl AsRef, uid: u64) -> anyhow::Result> { self.index_controller.update_status(index, uid) diff --git a/src/index_controller/local_index_controller/mod.rs b/src/index_controller/local_index_controller/mod.rs index 62055f1a8..a0c7e0805 100644 --- a/src/index_controller/local_index_controller/mod.rs +++ b/src/index_controller/local_index_controller/mod.rs @@ -182,6 +182,14 @@ impl IndexController for LocalIndexController { primary_key, }) } + + fn clear_documents(&self, index: impl AsRef) -> anyhow::Result { + let (_, update_store) = self.indexes.index(&index)? + .with_context(|| format!("Index {:?} doesn't exist", index.as_ref()))?; + let meta = UpdateMeta::ClearDocuments; + let pending = update_store.register_update(meta, &[]).unwrap(); + Ok(pending.into()) + } } fn update_primary_key(index: impl AsRef, primary_key: impl AsRef) -> anyhow::Result<()> { diff --git a/src/index_controller/local_index_controller/update_store.rs b/src/index_controller/local_index_controller/update_store.rs index d4b796993..9f0c4ad7d 100644 --- a/src/index_controller/local_index_controller/update_store.rs +++ b/src/index_controller/local_index_controller/update_store.rs @@ -254,6 +254,7 @@ where /// Trying to abort an update that is currently being processed, an update /// that as already been processed or which doesn't actually exist, will /// return `None`. + #[allow(dead_code)] pub fn abort_update(&self, update_id: u64) -> heed::Result>> { let mut wtxn = self.env.write_txn()?; let key = BEU64::new(update_id); @@ -281,6 +282,7 @@ where /// Aborts all the pending updates, and not the one being currently processed. /// Returns the update metas and ids that were successfully aborted. + #[allow(dead_code)] pub fn abort_pendings(&self) -> heed::Result)>> { let mut wtxn = self.env.write_txn()?; let mut aborted_updates = Vec::new(); diff --git a/src/index_controller/mod.rs b/src/index_controller/mod.rs index 77d91575a..0ea71a72a 100644 --- a/src/index_controller/mod.rs +++ b/src/index_controller/mod.rs @@ -128,6 +128,9 @@ pub trait IndexController { data: &[u8], ) -> anyhow::Result; + /// Clear all documents in the given index. + fn clear_documents(&self, index: impl AsRef) -> anyhow::Result; + /// Updates an index settings. If the index does not exist, it will be created when the update /// is applied to the index. fn update_settings>(&self, index_uid: S, settings: Settings) -> anyhow::Result; diff --git a/src/routes/document.rs b/src/routes/document.rs index dcc669f85..744a2e7f1 100644 --- a/src/routes/document.rs +++ b/src/routes/document.rs @@ -169,8 +169,17 @@ async fn delete_documents( #[delete("/indexes/{index_uid}/documents", wrap = "Authentication::Private")] async fn clear_all_documents( - _data: web::Data, - _path: web::Path, + data: web::Data, + path: web::Path, ) -> Result { - todo!() + match data.clear_documents(&path.index_uid).await { + Ok(update) => { + let json = serde_json::to_string(&update).unwrap(); + Ok(HttpResponse::Ok().body(json)) + } + Err(e) => { + error!("{}", e); + unimplemented!(); + } + } } From c317af58bcacf71f901279e2beeb734a14b9b308 Mon Sep 17 00:00:00 2001 From: mpostma Date: Fri, 12 Feb 2021 17:39:14 +0100 Subject: [PATCH 2/3] implement delete document batches --- src/data/updates.rs | 11 ++++++++++ .../local_index_controller/mod.rs | 15 ++++++++++--- .../local_index_controller/update_handler.rs | 22 +++++++++++++++++++ src/index_controller/mod.rs | 5 +++++ src/routes/document.rs | 22 +++++++++++++++---- 5 files changed, 68 insertions(+), 7 deletions(-) diff --git a/src/data/updates.rs b/src/data/updates.rs index d3566bb6f..1f3290e47 100644 --- a/src/data/updates.rs +++ b/src/data/updates.rs @@ -72,6 +72,17 @@ impl Data { Ok(update.into()) } + pub async fn delete_documents( + &self, + index: impl AsRef, + document_ids: Vec, + ) -> anyhow::Result { + let index_controller = self.index_controller.clone(); + let index = index.as_ref().to_string(); + let update = tokio::task::spawn_blocking(move || index_controller.delete_documents(index, document_ids)).await??; + Ok(update.into()) + } + #[inline] pub fn get_update_status(&self, index: impl AsRef, uid: u64) -> anyhow::Result> { self.index_controller.update_status(index, uid) diff --git a/src/index_controller/local_index_controller/mod.rs b/src/index_controller/local_index_controller/mod.rs index a0c7e0805..c0903a5e1 100644 --- a/src/index_controller/local_index_controller/mod.rs +++ b/src/index_controller/local_index_controller/mod.rs @@ -43,7 +43,7 @@ impl IndexController for LocalIndexController { ) -> anyhow::Result> { let (_, update_store) = self.indexes.get_or_create_index(&index, self.update_db_size, self.index_db_size)?; let meta = UpdateMeta::DocumentsAddition { method, format }; - let pending = update_store.register_update(meta, data).unwrap(); + let pending = update_store.register_update(meta, data)?; Ok(pending.into()) } @@ -54,7 +54,7 @@ impl IndexController for LocalIndexController { ) -> anyhow::Result> { let (_, update_store) = self.indexes.get_or_create_index(&index, self.update_db_size, self.index_db_size)?; let meta = UpdateMeta::Settings(settings); - let pending = update_store.register_update(meta, &[]).unwrap(); + let pending = update_store.register_update(meta, &[])?; Ok(pending.into()) } @@ -187,7 +187,16 @@ impl IndexController for LocalIndexController { let (_, update_store) = self.indexes.index(&index)? .with_context(|| format!("Index {:?} doesn't exist", index.as_ref()))?; let meta = UpdateMeta::ClearDocuments; - let pending = update_store.register_update(meta, &[]).unwrap(); + let pending = update_store.register_update(meta, &[])?; + Ok(pending.into()) + } + + fn delete_documents(&self, index: impl AsRef, document_ids: Vec) -> anyhow::Result { + let (_, update_store) = self.indexes.index(&index)? + .with_context(|| format!("Index {:?} doesn't exist", index.as_ref()))?; + let meta = UpdateMeta::DeleteDocuments; + let content = serde_json::to_vec(&document_ids)?; + let pending = update_store.register_update(meta, &content)?; Ok(pending.into()) } } diff --git a/src/index_controller/local_index_controller/update_handler.rs b/src/index_controller/local_index_controller/update_handler.rs index c89b9f686..505e9aff0 100644 --- a/src/index_controller/local_index_controller/update_handler.rs +++ b/src/index_controller/local_index_controller/update_handler.rs @@ -177,6 +177,27 @@ impl UpdateHandler { Err(e) => Err(e.into()) } } + + fn delete_documents( + &self, + document_ids: &[u8], + update_builder: UpdateBuilder, + ) -> anyhow::Result { + let ids: Vec = serde_json::from_slice(document_ids)?; + let mut txn = self.index.write_txn()?; + let mut builder = update_builder.delete_documents(&mut txn, &self.index)?; + + // we ignore unexisting document ids + ids.iter().for_each(|id| { builder.delete_external_id(id); }); + + match builder.execute() { + Ok(deleted) => txn + .commit() + .and(Ok(UpdateResult::DocumentDeletion { deleted })) + .map_err(Into::into), + Err(e) => Err(e.into()) + } + } } impl HandleUpdate for UpdateHandler { @@ -194,6 +215,7 @@ impl HandleUpdate for UpdateHandler { let result = match meta.meta() { DocumentsAddition { method, format } => self.update_documents(*format, *method, content, update_builder), ClearDocuments => self.clear_documents(update_builder), + DeleteDocuments => self.delete_documents(content, update_builder,), Settings(settings) => self.update_settings(settings, update_builder), Facets(levels) => self.update_facets(levels, update_builder), }; diff --git a/src/index_controller/mod.rs b/src/index_controller/mod.rs index 0ea71a72a..e811b6478 100644 --- a/src/index_controller/mod.rs +++ b/src/index_controller/mod.rs @@ -33,6 +33,7 @@ pub struct IndexMetadata { pub enum UpdateMeta { DocumentsAddition { method: IndexDocumentsMethod, format: UpdateFormat }, ClearDocuments, + DeleteDocuments, Settings(Settings), Facets(Facets), } @@ -94,6 +95,7 @@ impl Settings { #[derive(Debug, Clone, Serialize, Deserialize)] pub enum UpdateResult { DocumentsAddition(DocumentAdditionResult), + DocumentDeletion { deleted: usize }, Other, } @@ -131,6 +133,9 @@ pub trait IndexController { /// Clear all documents in the given index. fn clear_documents(&self, index: impl AsRef) -> anyhow::Result; + /// Clear all documents in the given index. + fn delete_documents(&self, index: impl AsRef, document_ids: Vec) -> anyhow::Result; + /// Updates an index settings. If the index does not exist, it will be created when the update /// is applied to the index. fn update_settings>(&self, index_uid: S, settings: Settings) -> anyhow::Result; diff --git a/src/routes/document.rs b/src/routes/document.rs index 744a2e7f1..67ca2a02f 100644 --- a/src/routes/document.rs +++ b/src/routes/document.rs @@ -160,11 +160,25 @@ async fn update_documents( wrap = "Authentication::Private" )] async fn delete_documents( - _data: web::Data, - _path: web::Path, - _body: web::Json>, + data: web::Data, + path: web::Path, + body: web::Json>, ) -> Result { - todo!() + let ids = body + .iter() + .map(ToString::to_string) + .collect(); + + match data.delete_documents(&path.index_uid, ids).await { + Ok(result) => { + let json = serde_json::to_string(&result).unwrap(); + Ok(HttpResponse::Ok().body(json)) + } + Err(e) => { + error!("{}", e); + unimplemented!() + } + } } #[delete("/indexes/{index_uid}/documents", wrap = "Authentication::Private")] From 28b9c158b15ad19bdfc8d6e2b0c805b0901fdd2f Mon Sep 17 00:00:00 2001 From: mpostma Date: Sat, 13 Feb 2021 10:44:20 +0100 Subject: [PATCH 3/3] implement delete single document --- src/data/updates.rs | 6 ++--- .../local_index_controller/update_handler.rs | 4 +-- src/index_controller/mod.rs | 2 +- src/routes/document.rs | 25 +++++++++++++------ 4 files changed, 22 insertions(+), 15 deletions(-) diff --git a/src/data/updates.rs b/src/data/updates.rs index 1f3290e47..9fd2015f6 100644 --- a/src/data/updates.rs +++ b/src/data/updates.rs @@ -64,21 +64,19 @@ impl Data { pub async fn clear_documents( &self, - index: impl AsRef, + index: impl AsRef + Sync + Send + 'static, ) -> anyhow::Result { let index_controller = self.index_controller.clone(); - let index = index.as_ref().to_string(); let update = tokio::task::spawn_blocking(move || index_controller.clear_documents(index)).await??; Ok(update.into()) } pub async fn delete_documents( &self, - index: impl AsRef, + index: impl AsRef + Sync + Send + 'static, document_ids: Vec, ) -> anyhow::Result { let index_controller = self.index_controller.clone(); - let index = index.as_ref().to_string(); let update = tokio::task::spawn_blocking(move || index_controller.delete_documents(index, document_ids)).await??; Ok(update.into()) } diff --git a/src/index_controller/local_index_controller/update_handler.rs b/src/index_controller/local_index_controller/update_handler.rs index 505e9aff0..7b7487ffa 100644 --- a/src/index_controller/local_index_controller/update_handler.rs +++ b/src/index_controller/local_index_controller/update_handler.rs @@ -187,7 +187,7 @@ impl UpdateHandler { let mut txn = self.index.write_txn()?; let mut builder = update_builder.delete_documents(&mut txn, &self.index)?; - // we ignore unexisting document ids + // We ignore unexisting document ids ids.iter().for_each(|id| { builder.delete_external_id(id); }); match builder.execute() { @@ -215,7 +215,7 @@ impl HandleUpdate for UpdateHandler { let result = match meta.meta() { DocumentsAddition { method, format } => self.update_documents(*format, *method, content, update_builder), ClearDocuments => self.clear_documents(update_builder), - DeleteDocuments => self.delete_documents(content, update_builder,), + DeleteDocuments => self.delete_documents(content, update_builder), Settings(settings) => self.update_settings(settings, update_builder), Facets(levels) => self.update_facets(levels, update_builder), }; diff --git a/src/index_controller/mod.rs b/src/index_controller/mod.rs index e811b6478..b881e3268 100644 --- a/src/index_controller/mod.rs +++ b/src/index_controller/mod.rs @@ -133,7 +133,7 @@ pub trait IndexController { /// Clear all documents in the given index. fn clear_documents(&self, index: impl AsRef) -> anyhow::Result; - /// Clear all documents in the given index. + /// Delete all documents in `document_ids`. fn delete_documents(&self, index: impl AsRef, document_ids: Vec) -> anyhow::Result; /// Updates an index settings. If the index does not exist, it will be created when the update diff --git a/src/routes/document.rs b/src/routes/document.rs index 67ca2a02f..874ad722b 100644 --- a/src/routes/document.rs +++ b/src/routes/document.rs @@ -30,8 +30,8 @@ type Document = IndexMap; #[derive(Deserialize)] struct DocumentParam { - _index_uid: String, - _document_id: String, + index_uid: String, + document_id: String, } pub fn services(cfg: &mut web::ServiceConfig) { @@ -60,10 +60,19 @@ async fn get_document( wrap = "Authentication::Private" )] async fn delete_document( - _data: web::Data, - _path: web::Path, + data: web::Data, + path: web::Path, ) -> Result { - todo!() + match data.delete_documents(path.index_uid.clone(), vec![path.document_id.clone()]).await { + Ok(result) => { + let json = serde_json::to_string(&result).unwrap(); + Ok(HttpResponse::Ok().body(json)) + } + Err(e) => { + error!("{}", e); + unimplemented!() + } + } } #[derive(Deserialize)] @@ -166,10 +175,10 @@ async fn delete_documents( ) -> Result { let ids = body .iter() - .map(ToString::to_string) + .map(|v| v.as_str().map(String::from).unwrap_or_else(|| v.to_string())) .collect(); - match data.delete_documents(&path.index_uid, ids).await { + match data.delete_documents(path.index_uid.clone(), ids).await { Ok(result) => { let json = serde_json::to_string(&result).unwrap(); Ok(HttpResponse::Ok().body(json)) @@ -186,7 +195,7 @@ async fn clear_all_documents( data: web::Data, path: web::Path, ) -> Result { - match data.clear_documents(&path.index_uid).await { + match data.clear_documents(path.index_uid.clone()).await { Ok(update) => { let json = serde_json::to_string(&update).unwrap(); Ok(HttpResponse::Ok().body(json))