diff --git a/meilisearch-lib/src/dump/handler.rs b/meilisearch-lib/src/dump/handler.rs index 4adb7011a..830bf4d0d 100644 --- a/meilisearch-lib/src/dump/handler.rs +++ b/meilisearch-lib/src/dump/handler.rs @@ -121,7 +121,6 @@ mod real { #[cfg(test)] mod test { - use std::marker::PhantomData; use std::path::PathBuf; use std::sync::Arc; @@ -137,12 +136,12 @@ mod test { pub enum MockDumpHandler { Real(super::real::DumpHandler), - Mock(Mocker, PhantomData<(U, I)>), + Mock(Mocker), } impl MockDumpHandler { pub fn mock(mocker: Mocker) -> Self { - Self::Mock(mocker, PhantomData) + Self::Mock(mocker) } } @@ -173,7 +172,7 @@ mod test { pub async fn run(&self, uid: String) -> Result<()> { match self { DumpHandler::Real(real) => real.run(uid).await, - DumpHandler::Mock(mocker, _) => unsafe { mocker.get("run").call(uid) }, + DumpHandler::Mock(mocker) => unsafe { mocker.get("run").call(uid) }, } } } diff --git a/meilisearch-lib/src/index_resolver/error.rs b/meilisearch-lib/src/index_resolver/error.rs index 6c86aa6b8..610ec6c7c 100644 --- a/meilisearch-lib/src/index_resolver/error.rs +++ b/meilisearch-lib/src/index_resolver/error.rs @@ -5,7 +5,7 @@ use tokio::sync::mpsc::error::SendError as MpscSendError; use tokio::sync::oneshot::error::RecvError as OneshotRecvError; use uuid::Uuid; -use crate::{error::MilliError, index::error::IndexError}; +use crate::{error::MilliError, index::error::IndexError, update_file_store::UpdateFileStoreError}; pub type Result = std::result::Result; @@ -49,7 +49,8 @@ internal_error!( uuid::Error, std::io::Error, tokio::task::JoinError, - serde_json::Error + serde_json::Error, + UpdateFileStoreError ); impl ErrorCode for IndexResolverError { diff --git a/meilisearch-lib/src/index_resolver/mod.rs b/meilisearch-lib/src/index_resolver/mod.rs index ac82f7a3d..32970fc37 100644 --- a/meilisearch-lib/src/index_resolver/mod.rs +++ b/meilisearch-lib/src/index_resolver/mod.rs @@ -27,6 +27,12 @@ use self::meta_store::IndexMeta; pub type HardStateIndexResolver = IndexResolver; +#[cfg(not(test))] +pub use real::IndexResolver; + +#[cfg(test)] +pub use test::MockIndexResolver as IndexResolver; + /// An index uid is composed of only ascii alphanumeric characters, - and _, between 1 and 400 /// bytes long #[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] @@ -96,349 +102,448 @@ impl FromStr for IndexUid { } } -pub struct IndexResolver { - index_uuid_store: U, - index_store: I, - pub file_store: UpdateFileStore, -} +mod real { + use super::*; -impl IndexResolver { - pub fn load_dump( - src: impl AsRef, - dst: impl AsRef, - index_db_size: usize, - env: Arc, - indexer_opts: &IndexerOpts, - ) -> anyhow::Result<()> { - HeedMetaStore::load_dump(&src, env)?; - let indexes_path = src.as_ref().join("indexes"); - let indexes = indexes_path.read_dir()?; - let indexer_config = IndexerConfig::try_from(indexer_opts)?; - for index in indexes { - Index::load_dump(&index?.path(), &dst, index_db_size, &indexer_config)?; - } - - Ok(()) + pub struct IndexResolver { + pub(super) index_uuid_store: U, + pub(super) index_store: I, + pub(super) file_store: UpdateFileStore, } -} -impl IndexResolver -where - U: IndexMetaStore, - I: IndexStore, -{ - pub fn new(index_uuid_store: U, index_store: I, file_store: UpdateFileStore) -> Self { - Self { - index_uuid_store, - index_store, - file_store, + impl IndexResolver { + pub fn load_dump( + src: impl AsRef, + dst: impl AsRef, + index_db_size: usize, + env: Arc, + indexer_opts: &IndexerOpts, + ) -> anyhow::Result<()> { + HeedMetaStore::load_dump(&src, env)?; + let indexes_path = src.as_ref().join("indexes"); + let indexes = indexes_path.read_dir()?; + let indexer_config = IndexerConfig::try_from(indexer_opts)?; + for index in indexes { + Index::load_dump(&index?.path(), &dst, index_db_size, &indexer_config)?; + } + + Ok(()) } } - pub async fn process_document_addition_batch(&self, mut tasks: Vec) -> Vec { - fn get_content_uuid(task: &Task) -> Uuid { - match task { - Task { - content: TaskContent::DocumentAddition { content_uuid, .. }, - .. - } => *content_uuid, - _ => panic!("unexpected task in the document addition batch"), + impl IndexResolver + where + U: IndexMetaStore, + I: IndexStore, + { + pub fn new(index_uuid_store: U, index_store: I, file_store: UpdateFileStore) -> Self { + Self { + index_uuid_store, + index_store, + file_store, } } - let content_uuids = tasks.iter().map(get_content_uuid).collect::>(); - - match tasks.first() { - Some(Task { - id, - content: - TaskContent::DocumentAddition { - merge_strategy, - primary_key, - allow_index_creation, - index_uid, + pub async fn process_document_addition_batch(&self, mut tasks: Vec) -> Vec { + fn get_content_uuid(task: &Task) -> Uuid { + match task { + Task { + content: TaskContent::DocumentAddition { content_uuid, .. }, .. - }, - .. - }) => { - let primary_key = primary_key.clone(); - let method = *merge_strategy; + } => *content_uuid, + _ => panic!("unexpected task in the document addition batch"), + } + } - let index = if *allow_index_creation { - self.get_or_create_index(index_uid.clone(), *id).await - } else { - self.get_index(index_uid.as_str().to_string()).await - }; + let content_uuids = tasks.iter().map(get_content_uuid).collect::>(); - // If the index doesn't exist and we are not allowed to create it with the first - // task, we must fails the whole batch. - let now = OffsetDateTime::now_utc(); - let index = match index { - Ok(index) => index, - Err(e) => { - let error = ResponseError::from(e); - for task in tasks.iter_mut() { - task.events.push(TaskEvent::Failed { - error: error.clone(), - timestamp: now, - }); - } - return tasks; - } - }; - - let file_store = self.file_store.clone(); - let result = spawn_blocking(move || { - index.update_documents( - method, - primary_key, - file_store, - content_uuids.into_iter(), - ) - }) - .await; - - let event = match result { - Ok(Ok(result)) => TaskEvent::Succeeded { - timestamp: OffsetDateTime::now_utc(), - result: TaskResult::DocumentAddition { - indexed_documents: result.indexed_documents, + match tasks.first() { + Some(Task { + id, + content: + TaskContent::DocumentAddition { + merge_strategy, + primary_key, + allow_index_creation, + index_uid, + .. }, - }, - Ok(Err(e)) => TaskEvent::Failed { - timestamp: OffsetDateTime::now_utc(), - error: e.into(), - }, - Err(e) => TaskEvent::Failed { - timestamp: OffsetDateTime::now_utc(), - error: IndexResolverError::from(e).into(), - }, - }; - - for task in tasks.iter_mut() { - task.events.push(event.clone()); - } - - tasks - } - _ => panic!("invalid batch!"), - } - } - - pub async fn process_task(&self, task: &Task) -> Result { - match &task.content { - TaskContent::DocumentAddition { .. } => panic!("updates should be handled by batch"), - TaskContent::DocumentDeletion { - deletion: DocumentDeletion::Ids(ids), - index_uid, - } => { - let ids = ids.clone(); - let index = self.get_index(index_uid.clone().into_inner()).await?; - - let DocumentDeletionResult { - deleted_documents, .. - } = spawn_blocking(move || index.delete_documents(&ids)).await??; - - Ok(TaskResult::DocumentDeletion { deleted_documents }) - } - TaskContent::DocumentDeletion { - deletion: DocumentDeletion::Clear, - index_uid, - } => { - let index = self.get_index(index_uid.clone().into_inner()).await?; - let deleted_documents = spawn_blocking(move || -> IndexResult { - let number_documents = index.stats()?.number_of_documents; - index.clear_documents()?; - Ok(number_documents) - }) - .await??; - - Ok(TaskResult::ClearAll { deleted_documents }) - } - TaskContent::SettingsUpdate { - settings, - is_deletion, - allow_index_creation, - index_uid, - } => { - let index = if *is_deletion || !*allow_index_creation { - self.get_index(index_uid.clone().into_inner()).await? - } else { - self.get_or_create_index(index_uid.clone(), task.id).await? - }; - - let settings = settings.clone(); - spawn_blocking(move || index.update_settings(&settings.check())).await??; - - Ok(TaskResult::Other) - } - TaskContent::IndexDeletion { index_uid } => { - let index = self.delete_index(index_uid.clone().into_inner()).await?; - - let deleted_documents = spawn_blocking(move || -> IndexResult { - Ok(index.stats()?.number_of_documents) - }) - .await??; - - Ok(TaskResult::ClearAll { deleted_documents }) - } - TaskContent::IndexCreation { - primary_key, - index_uid, - } => { - let index = self.create_index(index_uid.clone(), task.id).await?; - - if let Some(primary_key) = primary_key { + .. + }) => { let primary_key = primary_key.clone(); - spawn_blocking(move || index.update_primary_key(primary_key)).await??; - } + let method = *merge_strategy; - Ok(TaskResult::Other) - } - TaskContent::IndexUpdate { - primary_key, - index_uid, - } => { - let index = self.get_index(index_uid.clone().into_inner()).await?; + let index = if *allow_index_creation { + self.get_or_create_index(index_uid.clone(), *id).await + } else { + self.get_index(index_uid.as_str().to_string()).await + }; - if let Some(primary_key) = primary_key { - let primary_key = primary_key.clone(); - spawn_blocking(move || index.update_primary_key(primary_key)).await??; - } - - Ok(TaskResult::Other) - } - _ => unreachable!("Invalid task for index resolver"), - } - } - - pub async fn dump(&self, path: impl AsRef) -> Result<()> { - for (_, index) in self.list().await? { - index.dump(&path)?; - } - self.index_uuid_store.dump(path.as_ref().to_owned()).await?; - Ok(()) - } - - async fn create_index(&self, uid: IndexUid, creation_task_id: TaskId) -> Result { - match self.index_uuid_store.get(uid.into_inner()).await? { - (uid, Some(_)) => Err(IndexResolverError::IndexAlreadyExists(uid)), - (uid, None) => { - let uuid = Uuid::new_v4(); - let index = self.index_store.create(uuid).await?; - match self - .index_uuid_store - .insert( - uid, - IndexMeta { - uuid, - creation_task_id, - }, - ) - .await - { - Err(e) => { - match self.index_store.delete(uuid).await { - Ok(Some(index)) => { - index.close(); + // If the index doesn't exist and we are not allowed to create it with the first + // task, we must fails the whole batch. + let now = OffsetDateTime::now_utc(); + let index = match index { + Ok(index) => index, + Err(e) => { + let error = ResponseError::from(e); + for task in tasks.iter_mut() { + task.events.push(TaskEvent::Failed { + error: error.clone(), + timestamp: now, + }); } - Ok(None) => (), - Err(e) => log::error!("Error while deleting index: {:?}", e), + return tasks; } - Err(e) + }; + + let file_store = self.file_store.clone(); + let result = spawn_blocking(move || { + index.update_documents( + method, + primary_key, + file_store, + content_uuids.into_iter(), + ) + }) + .await; + + let event = match result { + Ok(Ok(result)) => TaskEvent::Succeeded { + timestamp: OffsetDateTime::now_utc(), + result: TaskResult::DocumentAddition { + indexed_documents: result.indexed_documents, + }, + }, + Ok(Err(e)) => TaskEvent::Failed { + timestamp: OffsetDateTime::now_utc(), + error: e.into(), + }, + Err(e) => TaskEvent::Failed { + timestamp: OffsetDateTime::now_utc(), + error: IndexResolverError::from(e).into(), + }, + }; + + for task in tasks.iter_mut() { + task.events.push(event.clone()); } - Ok(()) => Ok(index), + + tasks } + _ => panic!("invalid batch!"), } } - } - /// Get or create an index with name `uid`. - pub async fn get_or_create_index(&self, uid: IndexUid, task_id: TaskId) -> Result { - match self.create_index(uid, task_id).await { - Ok(index) => Ok(index), - Err(IndexResolverError::IndexAlreadyExists(uid)) => self.get_index(uid).await, - Err(e) => Err(e), + pub async fn delete_content_file(&self, content_uuid: Uuid) -> Result<()> { + self.file_store.delete(content_uuid).await?; + Ok(()) } - } - pub async fn list(&self) -> Result> { - let uuids = self.index_uuid_store.list().await?; - let mut indexes = Vec::new(); - for (name, IndexMeta { uuid, .. }) in uuids { - match self.index_store.get(uuid).await? { - Some(index) => indexes.push((name, index)), - None => { - // we found an unexisting index, we remove it from the uuid store - let _ = self.index_uuid_store.delete(name).await; + pub async fn process_task(&self, task: &Task) -> Result { + match &task.content { + TaskContent::DocumentAddition { .. } => { + panic!("updates should be handled by batch") + } + TaskContent::DocumentDeletion { + deletion: DocumentDeletion::Ids(ids), + index_uid, + } => { + let ids = ids.clone(); + let index = self.get_index(index_uid.clone().into_inner()).await?; + + let DocumentDeletionResult { + deleted_documents, .. + } = spawn_blocking(move || index.delete_documents(&ids)).await??; + + Ok(TaskResult::DocumentDeletion { deleted_documents }) + } + TaskContent::DocumentDeletion { + deletion: DocumentDeletion::Clear, + index_uid, + } => { + let index = self.get_index(index_uid.clone().into_inner()).await?; + let deleted_documents = spawn_blocking(move || -> IndexResult { + let number_documents = index.stats()?.number_of_documents; + index.clear_documents()?; + Ok(number_documents) + }) + .await??; + + Ok(TaskResult::ClearAll { deleted_documents }) + } + TaskContent::SettingsUpdate { + settings, + is_deletion, + allow_index_creation, + index_uid, + } => { + let index = if *is_deletion || !*allow_index_creation { + self.get_index(index_uid.clone().into_inner()).await? + } else { + self.get_or_create_index(index_uid.clone(), task.id).await? + }; + + let settings = settings.clone(); + spawn_blocking(move || index.update_settings(&settings.check())).await??; + + Ok(TaskResult::Other) + } + TaskContent::IndexDeletion { index_uid } => { + let index = self.delete_index(index_uid.clone().into_inner()).await?; + + let deleted_documents = spawn_blocking(move || -> IndexResult { + Ok(index.stats()?.number_of_documents) + }) + .await??; + + Ok(TaskResult::ClearAll { deleted_documents }) + } + TaskContent::IndexCreation { + primary_key, + index_uid, + } => { + let index = self.create_index(index_uid.clone(), task.id).await?; + + if let Some(primary_key) = primary_key { + let primary_key = primary_key.clone(); + spawn_blocking(move || index.update_primary_key(primary_key)).await??; + } + + Ok(TaskResult::Other) + } + TaskContent::IndexUpdate { + primary_key, + index_uid, + } => { + let index = self.get_index(index_uid.clone().into_inner()).await?; + + if let Some(primary_key) = primary_key { + let primary_key = primary_key.clone(); + spawn_blocking(move || index.update_primary_key(primary_key)).await??; + } + + Ok(TaskResult::Other) + } + _ => unreachable!("Invalid task for index resolver"), + } + } + + pub async fn dump(&self, path: impl AsRef) -> Result<()> { + for (_, index) in self.list().await? { + index.dump(&path)?; + } + self.index_uuid_store.dump(path.as_ref().to_owned()).await?; + Ok(()) + } + + async fn create_index(&self, uid: IndexUid, creation_task_id: TaskId) -> Result { + match self.index_uuid_store.get(uid.into_inner()).await? { + (uid, Some(_)) => Err(IndexResolverError::IndexAlreadyExists(uid)), + (uid, None) => { + let uuid = Uuid::new_v4(); + let index = self.index_store.create(uuid).await?; + match self + .index_uuid_store + .insert( + uid, + IndexMeta { + uuid, + creation_task_id, + }, + ) + .await + { + Err(e) => { + match self.index_store.delete(uuid).await { + Ok(Some(index)) => { + index.close(); + } + Ok(None) => (), + Err(e) => log::error!("Error while deleting index: {:?}", e), + } + Err(e) + } + Ok(()) => Ok(index), + } } } } - Ok(indexes) - } - - pub async fn delete_index(&self, uid: String) -> Result { - match self.index_uuid_store.delete(uid.clone()).await? { - Some(IndexMeta { uuid, .. }) => match self.index_store.delete(uuid).await? { - Some(index) => { - index.clone().close(); - Ok(index) - } - None => Err(IndexResolverError::UnexistingIndex(uid)), - }, - None => Err(IndexResolverError::UnexistingIndex(uid)), + /// Get or create an index with name `uid`. + pub async fn get_or_create_index(&self, uid: IndexUid, task_id: TaskId) -> Result { + match self.create_index(uid, task_id).await { + Ok(index) => Ok(index), + Err(IndexResolverError::IndexAlreadyExists(uid)) => self.get_index(uid).await, + Err(e) => Err(e), + } } - } - pub async fn get_index(&self, uid: String) -> Result { - match self.index_uuid_store.get(uid).await? { - (name, Some(IndexMeta { uuid, .. })) => { + pub async fn list(&self) -> Result> { + let uuids = self.index_uuid_store.list().await?; + let mut indexes = Vec::new(); + for (name, IndexMeta { uuid, .. }) in uuids { match self.index_store.get(uuid).await? { - Some(index) => Ok(index), + Some(index) => indexes.push((name, index)), None => { - // For some reason we got a uuid to an unexisting index, we return an error, - // and remove the uuid from the uuid store. - let _ = self.index_uuid_store.delete(name.clone()).await; - Err(IndexResolverError::UnexistingIndex(name)) + // we found an unexisting index, we remove it from the uuid store + let _ = self.index_uuid_store.delete(name).await; } } } - (name, _) => Err(IndexResolverError::UnexistingIndex(name)), - } - } - pub async fn get_index_creation_task_id(&self, index_uid: String) -> Result { - let (uid, meta) = self.index_uuid_store.get(index_uid).await?; - meta.map( - |IndexMeta { - creation_task_id, .. - }| creation_task_id, - ) - .ok_or(IndexResolverError::UnexistingIndex(uid)) + Ok(indexes) + } + + pub async fn delete_index(&self, uid: String) -> Result { + match self.index_uuid_store.delete(uid.clone()).await? { + Some(IndexMeta { uuid, .. }) => match self.index_store.delete(uuid).await? { + Some(index) => { + index.clone().close(); + Ok(index) + } + None => Err(IndexResolverError::UnexistingIndex(uid)), + }, + None => Err(IndexResolverError::UnexistingIndex(uid)), + } + } + + pub async fn get_index(&self, uid: String) -> Result { + match self.index_uuid_store.get(uid).await? { + (name, Some(IndexMeta { uuid, .. })) => { + match self.index_store.get(uuid).await? { + Some(index) => Ok(index), + None => { + // For some reason we got a uuid to an unexisting index, we return an error, + // and remove the uuid from the uuid store. + let _ = self.index_uuid_store.delete(name.clone()).await; + Err(IndexResolverError::UnexistingIndex(name)) + } + } + } + (name, _) => Err(IndexResolverError::UnexistingIndex(name)), + } + } + + pub async fn get_index_creation_task_id(&self, index_uid: String) -> Result { + let (uid, meta) = self.index_uuid_store.get(index_uid).await?; + meta.map( + |IndexMeta { + creation_task_id, .. + }| creation_task_id, + ) + .ok_or(IndexResolverError::UnexistingIndex(uid)) + } } } #[cfg(test)] mod test { - // use std::{collections::BTreeMap, vec::IntoIter}; - // - // use super::*; - // - // use futures::future::ok; - // use milli::update::{DocumentAdditionResult, IndexDocumentsMethod}; - // use nelson::Mocker; - // use proptest::prelude::*; - // - // use crate::{ - // index::{ - // error::{IndexError, Result as IndexResult}, - // Checked, IndexMeta, IndexStats, Settings, - // }, - // tasks::{batch::Batch, BatchHandler}, - // }; - // use index_store::MockIndexStore; - // use meta_store::MockIndexMetaStore; + use super::*; + + use nelson::Mocker; + + pub enum MockIndexResolver { + Real(super::real::IndexResolver), + Mock(Mocker), + } + + impl MockIndexResolver { + pub fn load_dump( + src: impl AsRef, + dst: impl AsRef, + index_db_size: usize, + env: Arc, + indexer_opts: &IndexerOpts, + ) -> anyhow::Result<()> { + super::real::IndexResolver::load_dump(src, dst, index_db_size, env, indexer_opts) + } + } + + impl MockIndexResolver + where + U: IndexMetaStore, + I: IndexStore, + { + pub fn new(index_uuid_store: U, index_store: I, file_store: UpdateFileStore) -> Self { + Self::Real(super::real::IndexResolver { + index_uuid_store, + index_store, + file_store, + }) + } + + pub fn mock(mocker: Mocker) -> Self { + Self::Mock(mocker) + } + + pub async fn process_document_addition_batch(&self, tasks: Vec) -> Vec { + match self { + IndexResolver::Real(r) => r.process_document_addition_batch(tasks).await, + IndexResolver::Mock(m) => unsafe { + m.get("process_document_addition_batch").call(tasks) + }, + } + } + + pub async fn process_task(&self, task: &Task) -> Result { + match self { + IndexResolver::Real(r) => r.process_task(task).await, + IndexResolver::Mock(m) => unsafe { m.get("process_task").call(task) }, + } + } + + pub async fn dump(&self, path: impl AsRef) -> Result<()> { + match self { + IndexResolver::Real(r) => r.dump(path).await, + IndexResolver::Mock(_) => todo!(), + } + } + + /// Get or create an index with name `uid`. + pub async fn get_or_create_index(&self, uid: IndexUid, task_id: TaskId) -> Result { + match self { + IndexResolver::Real(r) => r.get_or_create_index(uid, task_id).await, + IndexResolver::Mock(_) => todo!(), + } + } + + pub async fn list(&self) -> Result> { + match self { + IndexResolver::Real(r) => r.list().await, + IndexResolver::Mock(_) => todo!(), + } + } + + pub async fn delete_index(&self, uid: String) -> Result { + match self { + IndexResolver::Real(r) => r.delete_index(uid).await, + IndexResolver::Mock(_) => todo!(), + } + } + + pub async fn get_index(&self, uid: String) -> Result { + match self { + IndexResolver::Real(r) => r.get_index(uid).await, + IndexResolver::Mock(_) => todo!(), + } + } + + pub async fn get_index_creation_task_id(&self, index_uid: String) -> Result { + match self { + IndexResolver::Real(r) => r.get_index_creation_task_id(index_uid).await, + IndexResolver::Mock(_) => todo!(), + } + } + + pub async fn delete_content_file(&self, content_uuid: Uuid) -> Result<()> { + match self { + IndexResolver::Real(r) => r.delete_content_file(content_uuid).await, + IndexResolver::Mock(m) => unsafe { + m.get("delete_content_file").call(content_uuid) + }, + } + } + } // TODO: ignoring this test, it has become too complex to maintain, and rather implement // handler logic test. diff --git a/meilisearch-lib/src/tasks/handlers/index_resolver_handler.rs b/meilisearch-lib/src/tasks/handlers/index_resolver_handler.rs index 75f0623b2..de624106c 100644 --- a/meilisearch-lib/src/tasks/handlers/index_resolver_handler.rs +++ b/meilisearch-lib/src/tasks/handlers/index_resolver_handler.rs @@ -38,7 +38,7 @@ where if let BatchContent::DocumentsAdditionBatch(ref tasks) = batch.content { for task in tasks { if let Some(content_uuid) = task.get_content_uuid() { - if let Err(e) = self.file_store.delete(content_uuid).await { + if let Err(e) = self.delete_content_file(content_uuid).await { log::error!("error deleting update file: {}", e); } } @@ -49,7 +49,12 @@ where #[cfg(test)] mod test { - use crate::index_resolver::{index_store::MockIndexStore, meta_store::MockIndexMetaStore}; + use crate::index_resolver::index_store::MapIndexStore; + use crate::index_resolver::meta_store::HeedMetaStore; + use crate::index_resolver::{ + error::Result as IndexResult, index_store::MockIndexStore, meta_store::MockIndexMetaStore, + }; + use crate::tasks::task::TaskResult; use crate::tasks::{ handlers::test::task_to_batch, task::{Task, TaskContent}, @@ -142,5 +147,58 @@ mod test { index_resolver.process_batch(batch).await; } - // TODO: test perform_batch. We need a Mocker for IndexResolver. + proptest! { + #[test] + fn index_document_task_deletes_update_file( + task in any::(), + ) { + let rt = tokio::runtime::Runtime::new().unwrap(); + let handle = rt.spawn(async { + let mocker = Mocker::default(); + + if let TaskContent::DocumentAddition{ .. } = task.content { + mocker.when::>("delete_content_file").then(|_| Ok(())); + } + + let index_resolver: IndexResolver = IndexResolver::mock(mocker); + + let batch = task_to_batch(task); + + index_resolver.finish(&batch).await; + }); + + rt.block_on(handle).unwrap(); + } + + #[test] + fn test_handle_batch(task in any::()) { + let rt = tokio::runtime::Runtime::new().unwrap(); + let handle = rt.spawn(async { + let mocker = Mocker::default(); + match task.content { + TaskContent::DocumentAddition { .. } => { + mocker.when::, Vec>("process_document_addition_batch").then(|tasks| tasks); + } + TaskContent::Dump { .. } => (), + _ => { + mocker.when::<&Task, IndexResult>("process_task").then(|_| Ok(TaskResult::Other)); + } + } + let index_resolver: IndexResolver = IndexResolver::mock(mocker); + + + let batch = task_to_batch(task); + + if index_resolver.accept(&batch) { + index_resolver.process_batch(batch).await; + } + }); + + if let Err(e) = rt.block_on(handle) { + if e.is_panic() { + std::panic::resume_unwind(e.into_panic()); + } + } + } + } }