Merge pull request #36 from meilisearch/delete-documents

delete documents
This commit is contained in:
marin 2021-02-15 22:39:00 +01:00 committed by GitHub
commit 6c7175dfc2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 114 additions and 14 deletions

View File

@ -62,6 +62,25 @@ impl Data {
Ok(update.into()) Ok(update.into())
} }
pub async fn clear_documents(
&self,
index: impl AsRef<str> + Sync + Send + 'static,
) -> anyhow::Result<UpdateStatus> {
let index_controller = self.index_controller.clone();
let update = tokio::task::spawn_blocking(move || index_controller.clear_documents(index)).await??;
Ok(update.into())
}
pub async fn delete_documents(
&self,
index: impl AsRef<str> + Sync + Send + 'static,
document_ids: Vec<String>,
) -> anyhow::Result<UpdateStatus> {
let index_controller = self.index_controller.clone();
let update = tokio::task::spawn_blocking(move || index_controller.delete_documents(index, document_ids)).await??;
Ok(update.into())
}
#[inline] #[inline]
pub fn get_update_status(&self, index: impl AsRef<str>, uid: u64) -> anyhow::Result<Option<UpdateStatus>> { pub fn get_update_status(&self, index: impl AsRef<str>, uid: u64) -> anyhow::Result<Option<UpdateStatus>> {
self.index_controller.update_status(index, uid) self.index_controller.update_status(index, uid)

View File

@ -43,7 +43,7 @@ impl IndexController for LocalIndexController {
) -> anyhow::Result<UpdateStatus<UpdateMeta, UpdateResult, String>> { ) -> anyhow::Result<UpdateStatus<UpdateMeta, UpdateResult, String>> {
let (_, update_store) = self.indexes.get_or_create_index(&index, self.update_db_size, self.index_db_size)?; let (_, update_store) = self.indexes.get_or_create_index(&index, self.update_db_size, self.index_db_size)?;
let meta = UpdateMeta::DocumentsAddition { method, format }; let meta = UpdateMeta::DocumentsAddition { method, format };
let pending = update_store.register_update(meta, data).unwrap(); let pending = update_store.register_update(meta, data)?;
Ok(pending.into()) Ok(pending.into())
} }
@ -54,7 +54,7 @@ impl IndexController for LocalIndexController {
) -> anyhow::Result<UpdateStatus<UpdateMeta, UpdateResult, String>> { ) -> anyhow::Result<UpdateStatus<UpdateMeta, UpdateResult, String>> {
let (_, update_store) = self.indexes.get_or_create_index(&index, self.update_db_size, self.index_db_size)?; let (_, update_store) = self.indexes.get_or_create_index(&index, self.update_db_size, self.index_db_size)?;
let meta = UpdateMeta::Settings(settings); let meta = UpdateMeta::Settings(settings);
let pending = update_store.register_update(meta, &[]).unwrap(); let pending = update_store.register_update(meta, &[])?;
Ok(pending.into()) Ok(pending.into())
} }
@ -182,6 +182,23 @@ impl IndexController for LocalIndexController {
primary_key, primary_key,
}) })
} }
fn clear_documents(&self, index: impl AsRef<str>) -> anyhow::Result<super::UpdateStatus> {
let (_, update_store) = self.indexes.index(&index)?
.with_context(|| format!("Index {:?} doesn't exist", index.as_ref()))?;
let meta = UpdateMeta::ClearDocuments;
let pending = update_store.register_update(meta, &[])?;
Ok(pending.into())
}
fn delete_documents(&self, index: impl AsRef<str>, document_ids: Vec<String>) -> anyhow::Result<super::UpdateStatus> {
let (_, update_store) = self.indexes.index(&index)?
.with_context(|| format!("Index {:?} doesn't exist", index.as_ref()))?;
let meta = UpdateMeta::DeleteDocuments;
let content = serde_json::to_vec(&document_ids)?;
let pending = update_store.register_update(meta, &content)?;
Ok(pending.into())
}
} }
fn update_primary_key(index: impl AsRef<Index>, primary_key: impl AsRef<str>) -> anyhow::Result<()> { fn update_primary_key(index: impl AsRef<Index>, primary_key: impl AsRef<str>) -> anyhow::Result<()> {

View File

@ -177,6 +177,27 @@ impl UpdateHandler {
Err(e) => Err(e.into()) Err(e) => Err(e.into())
} }
} }
fn delete_documents(
&self,
document_ids: &[u8],
update_builder: UpdateBuilder,
) -> anyhow::Result<UpdateResult> {
let ids: Vec<String> = serde_json::from_slice(document_ids)?;
let mut txn = self.index.write_txn()?;
let mut builder = update_builder.delete_documents(&mut txn, &self.index)?;
// We ignore unexisting document ids
ids.iter().for_each(|id| { builder.delete_external_id(id); });
match builder.execute() {
Ok(deleted) => txn
.commit()
.and(Ok(UpdateResult::DocumentDeletion { deleted }))
.map_err(Into::into),
Err(e) => Err(e.into())
}
}
} }
impl HandleUpdate<UpdateMeta, UpdateResult, String> for UpdateHandler { impl HandleUpdate<UpdateMeta, UpdateResult, String> for UpdateHandler {
@ -194,6 +215,7 @@ impl HandleUpdate<UpdateMeta, UpdateResult, String> for UpdateHandler {
let result = match meta.meta() { let result = match meta.meta() {
DocumentsAddition { method, format } => self.update_documents(*format, *method, content, update_builder), DocumentsAddition { method, format } => self.update_documents(*format, *method, content, update_builder),
ClearDocuments => self.clear_documents(update_builder), ClearDocuments => self.clear_documents(update_builder),
DeleteDocuments => self.delete_documents(content, update_builder),
Settings(settings) => self.update_settings(settings, update_builder), Settings(settings) => self.update_settings(settings, update_builder),
Facets(levels) => self.update_facets(levels, update_builder), Facets(levels) => self.update_facets(levels, update_builder),
}; };

View File

@ -254,6 +254,7 @@ where
/// Trying to abort an update that is currently being processed, an update /// Trying to abort an update that is currently being processed, an update
/// that as already been processed or which doesn't actually exist, will /// that as already been processed or which doesn't actually exist, will
/// return `None`. /// return `None`.
#[allow(dead_code)]
pub fn abort_update(&self, update_id: u64) -> heed::Result<Option<Aborted<M>>> { pub fn abort_update(&self, update_id: u64) -> heed::Result<Option<Aborted<M>>> {
let mut wtxn = self.env.write_txn()?; let mut wtxn = self.env.write_txn()?;
let key = BEU64::new(update_id); let key = BEU64::new(update_id);
@ -281,6 +282,7 @@ where
/// Aborts all the pending updates, and not the one being currently processed. /// Aborts all the pending updates, and not the one being currently processed.
/// Returns the update metas and ids that were successfully aborted. /// Returns the update metas and ids that were successfully aborted.
#[allow(dead_code)]
pub fn abort_pendings(&self) -> heed::Result<Vec<(u64, Aborted<M>)>> { pub fn abort_pendings(&self) -> heed::Result<Vec<(u64, Aborted<M>)>> {
let mut wtxn = self.env.write_txn()?; let mut wtxn = self.env.write_txn()?;
let mut aborted_updates = Vec::new(); let mut aborted_updates = Vec::new();

View File

@ -33,6 +33,7 @@ pub struct IndexMetadata {
pub enum UpdateMeta { pub enum UpdateMeta {
DocumentsAddition { method: IndexDocumentsMethod, format: UpdateFormat }, DocumentsAddition { method: IndexDocumentsMethod, format: UpdateFormat },
ClearDocuments, ClearDocuments,
DeleteDocuments,
Settings(Settings), Settings(Settings),
Facets(Facets), Facets(Facets),
} }
@ -94,6 +95,7 @@ impl Settings {
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
pub enum UpdateResult { pub enum UpdateResult {
DocumentsAddition(DocumentAdditionResult), DocumentsAddition(DocumentAdditionResult),
DocumentDeletion { deleted: usize },
Other, Other,
} }
@ -128,6 +130,12 @@ pub trait IndexController {
data: &[u8], data: &[u8],
) -> anyhow::Result<UpdateStatus>; ) -> anyhow::Result<UpdateStatus>;
/// Clear all documents in the given index.
fn clear_documents(&self, index: impl AsRef<str>) -> anyhow::Result<UpdateStatus>;
/// Delete all documents in `document_ids`.
fn delete_documents(&self, index: impl AsRef<str>, document_ids: Vec<String>) -> anyhow::Result<UpdateStatus>;
/// Updates an index settings. If the index does not exist, it will be created when the update /// Updates an index settings. If the index does not exist, it will be created when the update
/// is applied to the index. /// is applied to the index.
fn update_settings<S: AsRef<str>>(&self, index_uid: S, settings: Settings) -> anyhow::Result<UpdateStatus>; fn update_settings<S: AsRef<str>>(&self, index_uid: S, settings: Settings) -> anyhow::Result<UpdateStatus>;

View File

@ -30,8 +30,8 @@ type Document = IndexMap<String, Value>;
#[derive(Deserialize)] #[derive(Deserialize)]
struct DocumentParam { struct DocumentParam {
_index_uid: String, index_uid: String,
_document_id: String, document_id: String,
} }
pub fn services(cfg: &mut web::ServiceConfig) { pub fn services(cfg: &mut web::ServiceConfig) {
@ -60,10 +60,19 @@ async fn get_document(
wrap = "Authentication::Private" wrap = "Authentication::Private"
)] )]
async fn delete_document( async fn delete_document(
_data: web::Data<Data>, data: web::Data<Data>,
_path: web::Path<DocumentParam>, path: web::Path<DocumentParam>,
) -> Result<HttpResponse, ResponseError> { ) -> Result<HttpResponse, ResponseError> {
todo!() match data.delete_documents(path.index_uid.clone(), vec![path.document_id.clone()]).await {
Ok(result) => {
let json = serde_json::to_string(&result).unwrap();
Ok(HttpResponse::Ok().body(json))
}
Err(e) => {
error!("{}", e);
unimplemented!()
}
}
} }
#[derive(Deserialize)] #[derive(Deserialize)]
@ -160,17 +169,40 @@ async fn update_documents(
wrap = "Authentication::Private" wrap = "Authentication::Private"
)] )]
async fn delete_documents( async fn delete_documents(
_data: web::Data<Data>, data: web::Data<Data>,
_path: web::Path<IndexParam>, path: web::Path<IndexParam>,
_body: web::Json<Vec<Value>>, body: web::Json<Vec<Value>>,
) -> Result<HttpResponse, ResponseError> { ) -> Result<HttpResponse, ResponseError> {
todo!() let ids = body
.iter()
.map(|v| v.as_str().map(String::from).unwrap_or_else(|| v.to_string()))
.collect();
match data.delete_documents(path.index_uid.clone(), ids).await {
Ok(result) => {
let json = serde_json::to_string(&result).unwrap();
Ok(HttpResponse::Ok().body(json))
}
Err(e) => {
error!("{}", e);
unimplemented!()
}
}
} }
#[delete("/indexes/{index_uid}/documents", wrap = "Authentication::Private")] #[delete("/indexes/{index_uid}/documents", wrap = "Authentication::Private")]
async fn clear_all_documents( async fn clear_all_documents(
_data: web::Data<Data>, data: web::Data<Data>,
_path: web::Path<IndexParam>, path: web::Path<IndexParam>,
) -> Result<HttpResponse, ResponseError> { ) -> Result<HttpResponse, ResponseError> {
todo!() match data.clear_documents(path.index_uid.clone()).await {
Ok(update) => {
let json = serde_json::to_string(&update).unwrap();
Ok(HttpResponse::Ok().body(json))
}
Err(e) => {
error!("{}", e);
unimplemented!();
}
}
} }