From 6832bde1f5f4ebcd35aa31bf7e51841602a9b279 Mon Sep 17 00:00:00 2001 From: Tamo Date: Thu, 4 May 2023 20:25:22 +0200 Subject: [PATCH] implement a first version of the clear indexes --- dump/src/lib.rs | 6 +- index-scheduler/src/autobatcher.rs | 9 +- index-scheduler/src/batch.rs | 21 ++++ index-scheduler/src/index_mapper/mod.rs | 124 +++++++++++++++--------- index-scheduler/src/lib.rs | 1 + index-scheduler/src/utils.rs | 1 + meilisearch-types/src/tasks.rs | 15 ++- meilisearch/src/routes/indexes/mod.rs | 19 +++- 8 files changed, 143 insertions(+), 53 deletions(-) diff --git a/dump/src/lib.rs b/dump/src/lib.rs index ed76d708e..cd1418a92 100644 --- a/dump/src/lib.rs +++ b/dump/src/lib.rs @@ -110,6 +110,9 @@ pub enum KindDump { allow_index_creation: bool, }, IndexDeletion, + IndexClear { + index_uids: Vec, + }, IndexCreation { primary_key: Option, }, @@ -180,6 +183,7 @@ impl From for KindDump { .. } => KindDump::Settings { settings: new_settings, is_deletion, allow_index_creation }, KindWithContent::IndexDeletion { .. } => KindDump::IndexDeletion, + KindWithContent::IndexClear { index_uids } => KindDump::IndexClear { index_uids }, KindWithContent::IndexCreation { primary_key, .. } => { KindDump::IndexCreation { primary_key } } @@ -211,8 +215,8 @@ pub(crate) mod test { use maplit::btreeset; use meilisearch_types::index_uid_pattern::IndexUidPattern; use meilisearch_types::keys::{Action, Key}; + use meilisearch_types::milli; use meilisearch_types::milli::update::Setting; - use meilisearch_types::milli::{self}; use meilisearch_types::settings::{Checked, Settings}; use meilisearch_types::tasks::{Details, Status}; use serde_json::{json, Map, Value}; diff --git a/index-scheduler/src/autobatcher.rs b/index-scheduler/src/autobatcher.rs index d738cc5e4..5d5875f9f 100644 --- a/index-scheduler/src/autobatcher.rs +++ b/index-scheduler/src/autobatcher.rs @@ -32,6 +32,7 @@ enum AutobatchKind { }, IndexCreation, IndexDeletion, + IndexClear, IndexUpdate, IndexSwap, } @@ -74,6 +75,7 @@ impl From for AutobatchKind { } } KindWithContent::IndexDeletion { .. } => AutobatchKind::IndexDeletion, + KindWithContent::IndexClear { .. } => AutobatchKind::IndexClear, KindWithContent::IndexCreation { .. } => AutobatchKind::IndexCreation, KindWithContent::IndexUpdate { .. } => AutobatchKind::IndexUpdate, KindWithContent::IndexSwap { .. } => AutobatchKind::IndexSwap, @@ -123,6 +125,9 @@ pub enum BatchKind { IndexDeletion { ids: Vec, }, + IndexClear { + id: TaskId, + }, IndexCreation { id: TaskId, }, @@ -173,6 +178,7 @@ impl BatchKind { match AutobatchKind::from(kind) { K::IndexCreation => (Break(BatchKind::IndexCreation { id: task_id }), true), K::IndexDeletion => (Break(BatchKind::IndexDeletion { ids: vec![task_id] }), false), + K::IndexClear => (Break(BatchKind::IndexClear { id: task_id }), false), K::IndexUpdate => (Break(BatchKind::IndexUpdate { id: task_id }), false), K::IndexSwap => (Break(BatchKind::IndexSwap { id: task_id }), false), K::DocumentClear => (Continue(BatchKind::DocumentClear { ids: vec![task_id] }), false), @@ -222,7 +228,7 @@ impl BatchKind { match (self, kind) { // We don't batch any of these operations - (this, K::IndexCreation | K::IndexUpdate | K::IndexSwap | K::DocumentDeletionByFilter) => Break(this), + (this, K::IndexCreation | K::IndexUpdate | K::IndexClear | K::IndexSwap | K::DocumentDeletionByFilter) => Break(this), // We must not batch tasks that don't have the same index creation rights if the index doesn't already exists. (this, kind) if !index_already_exists && this.allow_index_creation() == Some(false) && kind.allow_index_creation() == Some(true) => { Break(this) @@ -480,6 +486,7 @@ impl BatchKind { ( BatchKind::IndexCreation { .. } | BatchKind::IndexDeletion { .. } + | BatchKind::IndexClear { .. } | BatchKind::IndexUpdate { .. } | BatchKind::IndexSwap { .. } | BatchKind::DocumentDeletionByFilter { .. }, diff --git a/index-scheduler/src/batch.rs b/index-scheduler/src/batch.rs index c88234809..aada939cd 100644 --- a/index-scheduler/src/batch.rs +++ b/index-scheduler/src/batch.rs @@ -85,6 +85,10 @@ pub(crate) enum Batch { tasks: Vec, index_has_been_created: bool, }, + IndexClear { + index_uids: Vec, + task: Task, + }, IndexSwap { task: Task, }, @@ -154,6 +158,7 @@ impl Batch { | Batch::TaskDeletion(task) | Batch::Dump(task) | Batch::IndexCreation { task, .. } + | Batch::IndexClear { task, .. } | Batch::IndexDocumentDeletionByFilter { task, .. } | Batch::IndexUpdate { task, .. } => vec![task.uid], Batch::SnapshotCreation(tasks) | Batch::IndexDeletion { tasks, .. } => { @@ -189,6 +194,7 @@ impl Batch { | TaskDeletion(_) | SnapshotCreation(_) | Dump(_) + | IndexClear { .. } | IndexSwap { .. } => None, IndexOperation { op, .. } => Some(op.index_uid()), IndexCreation { index_uid, .. } @@ -453,6 +459,14 @@ impl IndexScheduler { index_has_been_created: must_create_index, tasks: self.get_existing_tasks(rtxn, ids)?, })), + BatchKind::IndexClear { id } => { + let task = self.get_task(rtxn, id)?.ok_or(Error::CorruptedTaskQueue)?; + let index_uids = match &task.kind { + KindWithContent::IndexClear { index_uids } => index_uids.clone(), + _ => unreachable!(), + }; + Ok(Some(Batch::IndexClear { index_uids, task })) + } BatchKind::IndexSwap { id } => { let task = self.get_task(rtxn, id)?.ok_or(Error::CorruptedTaskQueue)?; Ok(Some(Batch::IndexSwap { task })) @@ -1017,6 +1031,13 @@ impl IndexScheduler { Ok(tasks) } + Batch::IndexClear { index_uids, mut task } => { + let wtxn = self.env.write_txn()?; + self.index_mapper.delete_indexes(wtxn, index_uids, false)?; + task.status = Status::Succeeded; + + Ok(vec![task]) + } Batch::IndexSwap { mut task } => { let mut wtxn = self.env.write_txn()?; let swaps = if let KindWithContent::IndexSwap { swaps } = &task.kind { diff --git a/index-scheduler/src/index_mapper/mod.rs b/index-scheduler/src/index_mapper/mod.rs index 2bf6f46ad..e90651d89 100644 --- a/index-scheduler/src/index_mapper/mod.rs +++ b/index-scheduler/src/index_mapper/mod.rs @@ -173,19 +173,37 @@ impl IndexMapper { } } + pub fn delete_index(&self, wtxn: RwTxn, name: &str) -> Result<()> { + self.delete_indexes(wtxn, Some(name), true) + } + /// Removes the index from the mapping table and the in-memory index map /// but keeps the associated tasks. - pub fn delete_index(&self, mut wtxn: RwTxn, name: &str) -> Result<()> { - let uuid = self - .index_mapping - .get(&wtxn, name)? - .ok_or_else(|| Error::IndexNotFound(name.to_string()))?; + pub fn delete_indexes( + &self, + mut wtxn: RwTxn, + names: impl IntoIterator>, + error_on_missing_index: bool, + ) -> Result<()> { + let indexes = names + .into_iter() + .map(|name| { + let name = name.as_ref().to_string(); + let uuid = self + .index_mapping + .get(&wtxn, &name)? + .ok_or_else(|| Error::IndexNotFound(name.to_string()))?; + Ok((name, uuid)) + }) + .filter(|res| error_on_missing_index || res.is_ok()) + .collect::>>()?; - // Not an error if the index had no stats in cache. - self.index_stats.delete(&mut wtxn, &uuid)?; - - // Once we retrieved the UUID of the index we remove it from the mapping table. - assert!(self.index_mapping.delete(&mut wtxn, name)?); + for (name, uuid) in indexes.iter() { + // Not an error if the index had no stats in cache. + self.index_stats.delete(&mut wtxn, uuid)?; + // Once we retrieved the UUID of the index we remove it from the mapping table. + assert!(self.index_mapping.delete(&mut wtxn, name)?); + } wtxn.commit()?; @@ -203,51 +221,63 @@ impl IndexMapper { // This can not be caused by indexation because deleting an index happens in the scheduler itself, so cannot be concurrent with indexation. // // In these situations, reporting the error through a panic is in order. - let closing_event = loop { - let mut lock = self.index_map.write().unwrap(); - match lock.start_deletion(&uuid) { - Ok(env_closing) => break env_closing, - Err(Some(reopen)) => { - // drop the lock here so that we don't synchronously wait for the index to close. - drop(lock); - tries += 1; - if tries >= 100 { - panic!("Too many attempts to close index {name} prior to deletion.") + let indexes = indexes + .into_iter() + .map(|(name, uuid)| { + let closing_event = loop { + let mut lock = self.index_map.write().unwrap(); + match lock.start_deletion(&uuid) { + Ok(env_closing) => break env_closing, + Err(Some(reopen)) => { + // drop the lock here so that we don't synchronously wait for the index to close. + drop(lock); + tries += 1; + if tries >= 100 { + panic!("Too many attempts to close index {name} prior to deletion.") + } + let reopen = + if let Some(reopen) = reopen.wait_timeout(Duration::from_secs(6)) { + reopen + } else { + continue; + }; + reopen.close(&mut self.index_map.write().unwrap()); + continue; + } + // TODO: what is this case, what does that mean? + Err(None) => return None, } - let reopen = if let Some(reopen) = reopen.wait_timeout(Duration::from_secs(6)) { - reopen - } else { - continue; - }; - reopen.close(&mut self.index_map.write().unwrap()); - continue; - } - Err(None) => return Ok(()), - } - }; + }; + Some((name, uuid, closing_event)) + }) + .filter_map(|thingy| thingy) + .map(|(name, uuid, closing)| { + (name.to_string(), uuid, self.base_path.join(uuid.to_string()), closing) + }) + .collect::>(); let index_map = self.index_map.clone(); - let index_path = self.base_path.join(uuid.to_string()); - let index_name = name.to_string(); thread::Builder::new() .name(String::from("index_deleter")) .spawn(move || { - // We first wait to be sure that the previously opened index is effectively closed. - // This can take a lot of time, this is why we do that in a separate thread. - if let Some(closing_event) = closing_event { - closing_event.wait(); - } + for (name, uuid, index_path, closing_event) in indexes { + // We first wait to be sure that the previously opened index is effectively closed. + // This can take a lot of time, this is why we do that in a separate thread. + if let Some(closing_event) = closing_event { + closing_event.wait(); + } - // Then we remove the content from disk. - if let Err(e) = fs::remove_dir_all(&index_path) { - error!( - "An error happened when deleting the index {} ({}): {}", - index_name, uuid, e - ); - } + // Then we remove the content from disk. + if let Err(e) = fs::remove_dir_all(&index_path) { + error!( + "An error happened when deleting the index {} ({}): {}", + name, uuid, e + ); + } - // Finally we remove the entry from the index map. - index_map.write().unwrap().end_deletion(&uuid); + // Finally we remove the entry from the index map. + index_map.write().unwrap().end_deletion(&uuid); + } }) .unwrap(); diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs index af20ba1ae..e7c809e87 100644 --- a/index-scheduler/src/lib.rs +++ b/index-scheduler/src/lib.rs @@ -1284,6 +1284,7 @@ impl<'a> Dump<'a> { KindDump::IndexDeletion => KindWithContent::IndexDeletion { index_uid: task.index_uid.ok_or(Error::CorruptedDump)?, }, + KindDump::IndexClear { index_uids } => KindWithContent::IndexClear { index_uids }, KindDump::IndexCreation { primary_key } => KindWithContent::IndexCreation { index_uid: task.index_uid.ok_or(Error::CorruptedDump)?, primary_key, diff --git a/index-scheduler/src/utils.rs b/index-scheduler/src/utils.rs index 97f437bed..ecef4f7c4 100644 --- a/index-scheduler/src/utils.rs +++ b/index-scheduler/src/utils.rs @@ -258,6 +258,7 @@ pub fn swap_index_uid_in_task(task: &mut Task, swap: (&str, &str)) { K::TaskCancelation { .. } | K::TaskDeletion { .. } | K::DumpCreation { .. } + | K::IndexClear { .. } | K::SnapshotCreation => (), }; if let Some(Details::IndexSwap { swaps }) = &mut task.details { diff --git a/meilisearch-types/src/tasks.rs b/meilisearch-types/src/tasks.rs index e746a53b8..cc1e684c8 100644 --- a/meilisearch-types/src/tasks.rs +++ b/meilisearch-types/src/tasks.rs @@ -46,6 +46,7 @@ impl Task { | SnapshotCreation | TaskCancelation { .. } | TaskDeletion { .. } + | IndexClear { .. } | IndexSwap { .. } => None, DocumentAdditionOrUpdate { index_uid, .. } | DocumentDeletion { index_uid, .. } @@ -72,6 +73,7 @@ impl Task { | KindWithContent::DocumentClear { .. } | KindWithContent::SettingsUpdate { .. } | KindWithContent::IndexDeletion { .. } + | KindWithContent::IndexClear { .. } | KindWithContent::IndexCreation { .. } | KindWithContent::IndexUpdate { .. } | KindWithContent::IndexSwap { .. } @@ -111,6 +113,9 @@ pub enum KindWithContent { is_deletion: bool, allow_index_creation: bool, }, + IndexClear { + index_uids: Vec, + }, IndexDeletion { index_uid: String, }, @@ -156,6 +161,7 @@ impl KindWithContent { KindWithContent::SettingsUpdate { .. } => Kind::SettingsUpdate, KindWithContent::IndexCreation { .. } => Kind::IndexCreation, KindWithContent::IndexDeletion { .. } => Kind::IndexDeletion, + KindWithContent::IndexClear { .. } => Kind::IndexDeletion, KindWithContent::IndexUpdate { .. } => Kind::IndexUpdate, KindWithContent::IndexSwap { .. } => Kind::IndexSwap, KindWithContent::TaskCancelation { .. } => Kind::TaskCancelation, @@ -181,6 +187,7 @@ impl KindWithContent { | IndexCreation { index_uid, .. } | IndexUpdate { index_uid, .. } | IndexDeletion { index_uid } => vec![index_uid], + IndexClear { index_uids } => index_uids.into_iter().map(|s| s.as_ref()).collect(), IndexSwap { swaps } => { let mut indexes = HashSet::<&str>::default(); for swap in swaps { @@ -214,7 +221,9 @@ impl KindWithContent { deleted_documents: None, }) } - KindWithContent::DocumentClear { .. } | KindWithContent::IndexDeletion { .. } => { + KindWithContent::DocumentClear { .. } + | KindWithContent::IndexDeletion { .. } + | KindWithContent::IndexClear { .. } => { Some(Details::ClearAll { deleted_documents: None }) } KindWithContent::SettingsUpdate { new_settings, .. } => { @@ -268,7 +277,7 @@ impl KindWithContent { KindWithContent::SettingsUpdate { new_settings, .. } => { Some(Details::SettingsUpdate { settings: new_settings.clone() }) } - KindWithContent::IndexDeletion { .. } => None, + KindWithContent::IndexDeletion { .. } | KindWithContent::IndexClear { .. } => None, KindWithContent::IndexCreation { primary_key, .. } | KindWithContent::IndexUpdate { primary_key, .. } => { Some(Details::IndexInfo { primary_key: primary_key.clone() }) @@ -307,7 +316,7 @@ impl From<&KindWithContent> for Option
{ KindWithContent::SettingsUpdate { new_settings, .. } => { Some(Details::SettingsUpdate { settings: new_settings.clone() }) } - KindWithContent::IndexDeletion { .. } => None, + KindWithContent::IndexDeletion { .. } | KindWithContent::IndexClear { .. } => None, KindWithContent::IndexCreation { primary_key, .. } => { Some(Details::IndexInfo { primary_key: primary_key.clone() }) } diff --git a/meilisearch/src/routes/indexes/mod.rs b/meilisearch/src/routes/indexes/mod.rs index ba925b3d5..a783826c5 100644 --- a/meilisearch/src/routes/indexes/mod.rs +++ b/meilisearch/src/routes/indexes/mod.rs @@ -31,7 +31,8 @@ pub fn configure(cfg: &mut web::ServiceConfig) { cfg.service( web::resource("") .route(web::get().to(list_indexes)) - .route(web::post().to(SeqHandler(create_index))), + .route(web::post().to(SeqHandler(create_index))) + .route(web::delete().to(SeqHandler(delete_all_indexes))), ) .service( web::scope("/{index_uid}") @@ -107,6 +108,22 @@ pub async fn list_indexes( Ok(HttpResponse::Ok().json(ret)) } +pub async fn delete_all_indexes( + index_scheduler: GuardedData, Data>, + _req: HttpRequest, + _analytics: web::Data, +) -> Result { + let filters = index_scheduler.filters(); + let indexes = index_scheduler.index_names()?; + let indexes = indexes.into_iter().filter(|uid| filters.is_index_authorized(uid)).collect(); + + let task = KindWithContent::IndexClear { index_uids: indexes }; + let task: SummarizedTaskView = + tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into(); + + Ok(HttpResponse::Accepted().json(task)) +} + #[derive(Deserr, Debug)] #[deserr(error = DeserrJsonError, rename_all = camelCase, deny_unknown_fields)] pub struct IndexCreateRequest {