From bbd685af5e84691da59b4d8f294ef1b5207557b5 Mon Sep 17 00:00:00 2001 From: ad hoc Date: Wed, 1 Jun 2022 09:53:07 +0200 Subject: [PATCH] move IndexResolver to real module --- meilisearch-lib/src/index_resolver/mod.rs | 574 +++++++++++----------- 1 file changed, 291 insertions(+), 283 deletions(-) diff --git a/meilisearch-lib/src/index_resolver/mod.rs b/meilisearch-lib/src/index_resolver/mod.rs index ac82f7a3d..7eb564376 100644 --- a/meilisearch-lib/src/index_resolver/mod.rs +++ b/meilisearch-lib/src/index_resolver/mod.rs @@ -27,6 +27,8 @@ use self::meta_store::IndexMeta; pub type HardStateIndexResolver = IndexResolver; +pub use real::IndexResolver; + /// An index uid is composed of only ascii alphanumeric characters, - and _, between 1 and 400 /// bytes long #[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] @@ -96,326 +98,332 @@ impl FromStr for IndexUid { } } -pub struct IndexResolver { - index_uuid_store: U, - index_store: I, - pub file_store: UpdateFileStore, -} +mod real { + use super::*; -impl IndexResolver { - pub fn load_dump( - src: impl AsRef, - dst: impl AsRef, - index_db_size: usize, - env: Arc, - indexer_opts: &IndexerOpts, - ) -> anyhow::Result<()> { - HeedMetaStore::load_dump(&src, env)?; - let indexes_path = src.as_ref().join("indexes"); - let indexes = indexes_path.read_dir()?; - let indexer_config = IndexerConfig::try_from(indexer_opts)?; - for index in indexes { - Index::load_dump(&index?.path(), &dst, index_db_size, &indexer_config)?; - } - - Ok(()) + pub struct IndexResolver { + index_uuid_store: U, + index_store: I, + pub file_store: UpdateFileStore, } -} -impl IndexResolver -where - U: IndexMetaStore, - I: IndexStore, -{ - pub fn new(index_uuid_store: U, index_store: I, file_store: UpdateFileStore) -> Self { - Self { - index_uuid_store, - index_store, - file_store, + impl IndexResolver { + pub fn load_dump( + src: impl AsRef, + dst: impl AsRef, + index_db_size: usize, + env: Arc, + indexer_opts: &IndexerOpts, + ) -> anyhow::Result<()> { + HeedMetaStore::load_dump(&src, env)?; + let indexes_path = src.as_ref().join("indexes"); + let indexes = indexes_path.read_dir()?; + let indexer_config = IndexerConfig::try_from(indexer_opts)?; + for index in indexes { + Index::load_dump(&index?.path(), &dst, index_db_size, &indexer_config)?; + } + + Ok(()) } } - pub async fn process_document_addition_batch(&self, mut tasks: Vec) -> Vec { - fn get_content_uuid(task: &Task) -> Uuid { - match task { - Task { - content: TaskContent::DocumentAddition { content_uuid, .. }, - .. - } => *content_uuid, - _ => panic!("unexpected task in the document addition batch"), + impl IndexResolver + where + U: IndexMetaStore, + I: IndexStore, + { + pub fn new(index_uuid_store: U, index_store: I, file_store: UpdateFileStore) -> Self { + Self { + index_uuid_store, + index_store, + file_store, } } - let content_uuids = tasks.iter().map(get_content_uuid).collect::>(); - - match tasks.first() { - Some(Task { - id, - content: - TaskContent::DocumentAddition { - merge_strategy, - primary_key, - allow_index_creation, - index_uid, + pub async fn process_document_addition_batch(&self, mut tasks: Vec) -> Vec { + fn get_content_uuid(task: &Task) -> Uuid { + match task { + Task { + content: TaskContent::DocumentAddition { content_uuid, .. }, .. - }, - .. - }) => { - let primary_key = primary_key.clone(); - let method = *merge_strategy; + } => *content_uuid, + _ => panic!("unexpected task in the document addition batch"), + } + } - let index = if *allow_index_creation { - self.get_or_create_index(index_uid.clone(), *id).await - } else { - self.get_index(index_uid.as_str().to_string()).await - }; + let content_uuids = tasks.iter().map(get_content_uuid).collect::>(); - // If the index doesn't exist and we are not allowed to create it with the first - // task, we must fails the whole batch. - let now = OffsetDateTime::now_utc(); - let index = match index { - Ok(index) => index, - Err(e) => { - let error = ResponseError::from(e); - for task in tasks.iter_mut() { - task.events.push(TaskEvent::Failed { - error: error.clone(), - timestamp: now, - }); - } - return tasks; - } - }; - - let file_store = self.file_store.clone(); - let result = spawn_blocking(move || { - index.update_documents( - method, - primary_key, - file_store, - content_uuids.into_iter(), - ) - }) - .await; - - let event = match result { - Ok(Ok(result)) => TaskEvent::Succeeded { - timestamp: OffsetDateTime::now_utc(), - result: TaskResult::DocumentAddition { - indexed_documents: result.indexed_documents, + match tasks.first() { + Some(Task { + id, + content: + TaskContent::DocumentAddition { + merge_strategy, + primary_key, + allow_index_creation, + index_uid, + .. }, - }, - Ok(Err(e)) => TaskEvent::Failed { - timestamp: OffsetDateTime::now_utc(), - error: e.into(), - }, - Err(e) => TaskEvent::Failed { - timestamp: OffsetDateTime::now_utc(), - error: IndexResolverError::from(e).into(), - }, - }; - - for task in tasks.iter_mut() { - task.events.push(event.clone()); - } - - tasks - } - _ => panic!("invalid batch!"), - } - } - - pub async fn process_task(&self, task: &Task) -> Result { - match &task.content { - TaskContent::DocumentAddition { .. } => panic!("updates should be handled by batch"), - TaskContent::DocumentDeletion { - deletion: DocumentDeletion::Ids(ids), - index_uid, - } => { - let ids = ids.clone(); - let index = self.get_index(index_uid.clone().into_inner()).await?; - - let DocumentDeletionResult { - deleted_documents, .. - } = spawn_blocking(move || index.delete_documents(&ids)).await??; - - Ok(TaskResult::DocumentDeletion { deleted_documents }) - } - TaskContent::DocumentDeletion { - deletion: DocumentDeletion::Clear, - index_uid, - } => { - let index = self.get_index(index_uid.clone().into_inner()).await?; - let deleted_documents = spawn_blocking(move || -> IndexResult { - let number_documents = index.stats()?.number_of_documents; - index.clear_documents()?; - Ok(number_documents) - }) - .await??; - - Ok(TaskResult::ClearAll { deleted_documents }) - } - TaskContent::SettingsUpdate { - settings, - is_deletion, - allow_index_creation, - index_uid, - } => { - let index = if *is_deletion || !*allow_index_creation { - self.get_index(index_uid.clone().into_inner()).await? - } else { - self.get_or_create_index(index_uid.clone(), task.id).await? - }; - - let settings = settings.clone(); - spawn_blocking(move || index.update_settings(&settings.check())).await??; - - Ok(TaskResult::Other) - } - TaskContent::IndexDeletion { index_uid } => { - let index = self.delete_index(index_uid.clone().into_inner()).await?; - - let deleted_documents = spawn_blocking(move || -> IndexResult { - Ok(index.stats()?.number_of_documents) - }) - .await??; - - Ok(TaskResult::ClearAll { deleted_documents }) - } - TaskContent::IndexCreation { - primary_key, - index_uid, - } => { - let index = self.create_index(index_uid.clone(), task.id).await?; - - if let Some(primary_key) = primary_key { + .. + }) => { let primary_key = primary_key.clone(); - spawn_blocking(move || index.update_primary_key(primary_key)).await??; - } + let method = *merge_strategy; - Ok(TaskResult::Other) - } - TaskContent::IndexUpdate { - primary_key, - index_uid, - } => { - let index = self.get_index(index_uid.clone().into_inner()).await?; + let index = if *allow_index_creation { + self.get_or_create_index(index_uid.clone(), *id).await + } else { + self.get_index(index_uid.as_str().to_string()).await + }; - if let Some(primary_key) = primary_key { - let primary_key = primary_key.clone(); - spawn_blocking(move || index.update_primary_key(primary_key)).await??; - } - - Ok(TaskResult::Other) - } - _ => unreachable!("Invalid task for index resolver"), - } - } - - pub async fn dump(&self, path: impl AsRef) -> Result<()> { - for (_, index) in self.list().await? { - index.dump(&path)?; - } - self.index_uuid_store.dump(path.as_ref().to_owned()).await?; - Ok(()) - } - - async fn create_index(&self, uid: IndexUid, creation_task_id: TaskId) -> Result { - match self.index_uuid_store.get(uid.into_inner()).await? { - (uid, Some(_)) => Err(IndexResolverError::IndexAlreadyExists(uid)), - (uid, None) => { - let uuid = Uuid::new_v4(); - let index = self.index_store.create(uuid).await?; - match self - .index_uuid_store - .insert( - uid, - IndexMeta { - uuid, - creation_task_id, - }, - ) - .await - { - Err(e) => { - match self.index_store.delete(uuid).await { - Ok(Some(index)) => { - index.close(); + // If the index doesn't exist and we are not allowed to create it with the first + // task, we must fails the whole batch. + let now = OffsetDateTime::now_utc(); + let index = match index { + Ok(index) => index, + Err(e) => { + let error = ResponseError::from(e); + for task in tasks.iter_mut() { + task.events.push(TaskEvent::Failed { + error: error.clone(), + timestamp: now, + }); } - Ok(None) => (), - Err(e) => log::error!("Error while deleting index: {:?}", e), + return tasks; } - Err(e) + }; + + let file_store = self.file_store.clone(); + let result = spawn_blocking(move || { + index.update_documents( + method, + primary_key, + file_store, + content_uuids.into_iter(), + ) + }) + .await; + + let event = match result { + Ok(Ok(result)) => TaskEvent::Succeeded { + timestamp: OffsetDateTime::now_utc(), + result: TaskResult::DocumentAddition { + indexed_documents: result.indexed_documents, + }, + }, + Ok(Err(e)) => TaskEvent::Failed { + timestamp: OffsetDateTime::now_utc(), + error: e.into(), + }, + Err(e) => TaskEvent::Failed { + timestamp: OffsetDateTime::now_utc(), + error: IndexResolverError::from(e).into(), + }, + }; + + for task in tasks.iter_mut() { + task.events.push(event.clone()); } - Ok(()) => Ok(index), + + tasks } + _ => panic!("invalid batch!"), } } - } - /// Get or create an index with name `uid`. - pub async fn get_or_create_index(&self, uid: IndexUid, task_id: TaskId) -> Result { - match self.create_index(uid, task_id).await { - Ok(index) => Ok(index), - Err(IndexResolverError::IndexAlreadyExists(uid)) => self.get_index(uid).await, - Err(e) => Err(e), + pub async fn process_task(&self, task: &Task) -> Result { + match &task.content { + TaskContent::DocumentAddition { .. } => { + panic!("updates should be handled by batch") + } + TaskContent::DocumentDeletion { + deletion: DocumentDeletion::Ids(ids), + index_uid, + } => { + let ids = ids.clone(); + let index = self.get_index(index_uid.clone().into_inner()).await?; + + let DocumentDeletionResult { + deleted_documents, .. + } = spawn_blocking(move || index.delete_documents(&ids)).await??; + + Ok(TaskResult::DocumentDeletion { deleted_documents }) + } + TaskContent::DocumentDeletion { + deletion: DocumentDeletion::Clear, + index_uid, + } => { + let index = self.get_index(index_uid.clone().into_inner()).await?; + let deleted_documents = spawn_blocking(move || -> IndexResult { + let number_documents = index.stats()?.number_of_documents; + index.clear_documents()?; + Ok(number_documents) + }) + .await??; + + Ok(TaskResult::ClearAll { deleted_documents }) + } + TaskContent::SettingsUpdate { + settings, + is_deletion, + allow_index_creation, + index_uid, + } => { + let index = if *is_deletion || !*allow_index_creation { + self.get_index(index_uid.clone().into_inner()).await? + } else { + self.get_or_create_index(index_uid.clone(), task.id).await? + }; + + let settings = settings.clone(); + spawn_blocking(move || index.update_settings(&settings.check())).await??; + + Ok(TaskResult::Other) + } + TaskContent::IndexDeletion { index_uid } => { + let index = self.delete_index(index_uid.clone().into_inner()).await?; + + let deleted_documents = spawn_blocking(move || -> IndexResult { + Ok(index.stats()?.number_of_documents) + }) + .await??; + + Ok(TaskResult::ClearAll { deleted_documents }) + } + TaskContent::IndexCreation { + primary_key, + index_uid, + } => { + let index = self.create_index(index_uid.clone(), task.id).await?; + + if let Some(primary_key) = primary_key { + let primary_key = primary_key.clone(); + spawn_blocking(move || index.update_primary_key(primary_key)).await??; + } + + Ok(TaskResult::Other) + } + TaskContent::IndexUpdate { + primary_key, + index_uid, + } => { + let index = self.get_index(index_uid.clone().into_inner()).await?; + + if let Some(primary_key) = primary_key { + let primary_key = primary_key.clone(); + spawn_blocking(move || index.update_primary_key(primary_key)).await??; + } + + Ok(TaskResult::Other) + } + _ => unreachable!("Invalid task for index resolver"), + } } - } - pub async fn list(&self) -> Result> { - let uuids = self.index_uuid_store.list().await?; - let mut indexes = Vec::new(); - for (name, IndexMeta { uuid, .. }) in uuids { - match self.index_store.get(uuid).await? { - Some(index) => indexes.push((name, index)), - None => { - // we found an unexisting index, we remove it from the uuid store - let _ = self.index_uuid_store.delete(name).await; + pub async fn dump(&self, path: impl AsRef) -> Result<()> { + for (_, index) in self.list().await? { + index.dump(&path)?; + } + self.index_uuid_store.dump(path.as_ref().to_owned()).await?; + Ok(()) + } + + async fn create_index(&self, uid: IndexUid, creation_task_id: TaskId) -> Result { + match self.index_uuid_store.get(uid.into_inner()).await? { + (uid, Some(_)) => Err(IndexResolverError::IndexAlreadyExists(uid)), + (uid, None) => { + let uuid = Uuid::new_v4(); + let index = self.index_store.create(uuid).await?; + match self + .index_uuid_store + .insert( + uid, + IndexMeta { + uuid, + creation_task_id, + }, + ) + .await + { + Err(e) => { + match self.index_store.delete(uuid).await { + Ok(Some(index)) => { + index.close(); + } + Ok(None) => (), + Err(e) => log::error!("Error while deleting index: {:?}", e), + } + Err(e) + } + Ok(()) => Ok(index), + } } } } - Ok(indexes) - } - - pub async fn delete_index(&self, uid: String) -> Result { - match self.index_uuid_store.delete(uid.clone()).await? { - Some(IndexMeta { uuid, .. }) => match self.index_store.delete(uuid).await? { - Some(index) => { - index.clone().close(); - Ok(index) - } - None => Err(IndexResolverError::UnexistingIndex(uid)), - }, - None => Err(IndexResolverError::UnexistingIndex(uid)), + /// Get or create an index with name `uid`. + pub async fn get_or_create_index(&self, uid: IndexUid, task_id: TaskId) -> Result { + match self.create_index(uid, task_id).await { + Ok(index) => Ok(index), + Err(IndexResolverError::IndexAlreadyExists(uid)) => self.get_index(uid).await, + Err(e) => Err(e), + } } - } - pub async fn get_index(&self, uid: String) -> Result { - match self.index_uuid_store.get(uid).await? { - (name, Some(IndexMeta { uuid, .. })) => { + pub async fn list(&self) -> Result> { + let uuids = self.index_uuid_store.list().await?; + let mut indexes = Vec::new(); + for (name, IndexMeta { uuid, .. }) in uuids { match self.index_store.get(uuid).await? { - Some(index) => Ok(index), + Some(index) => indexes.push((name, index)), None => { - // For some reason we got a uuid to an unexisting index, we return an error, - // and remove the uuid from the uuid store. - let _ = self.index_uuid_store.delete(name.clone()).await; - Err(IndexResolverError::UnexistingIndex(name)) + // we found an unexisting index, we remove it from the uuid store + let _ = self.index_uuid_store.delete(name).await; } } } - (name, _) => Err(IndexResolverError::UnexistingIndex(name)), - } - } - pub async fn get_index_creation_task_id(&self, index_uid: String) -> Result { - let (uid, meta) = self.index_uuid_store.get(index_uid).await?; - meta.map( - |IndexMeta { - creation_task_id, .. - }| creation_task_id, - ) - .ok_or(IndexResolverError::UnexistingIndex(uid)) + Ok(indexes) + } + + pub async fn delete_index(&self, uid: String) -> Result { + match self.index_uuid_store.delete(uid.clone()).await? { + Some(IndexMeta { uuid, .. }) => match self.index_store.delete(uuid).await? { + Some(index) => { + index.clone().close(); + Ok(index) + } + None => Err(IndexResolverError::UnexistingIndex(uid)), + }, + None => Err(IndexResolverError::UnexistingIndex(uid)), + } + } + + pub async fn get_index(&self, uid: String) -> Result { + match self.index_uuid_store.get(uid).await? { + (name, Some(IndexMeta { uuid, .. })) => { + match self.index_store.get(uuid).await? { + Some(index) => Ok(index), + None => { + // For some reason we got a uuid to an unexisting index, we return an error, + // and remove the uuid from the uuid store. + let _ = self.index_uuid_store.delete(name.clone()).await; + Err(IndexResolverError::UnexistingIndex(name)) + } + } + } + (name, _) => Err(IndexResolverError::UnexistingIndex(name)), + } + } + + pub async fn get_index_creation_task_id(&self, index_uid: String) -> Result { + let (uid, meta) = self.index_uuid_store.get(index_uid).await?; + meta.map( + |IndexMeta { + creation_task_id, .. + }| creation_task_id, + ) + .ok_or(IndexResolverError::UnexistingIndex(uid)) + } } }