From 49d8fadb520c9edff37a06c3d67bf86589c1b9ec Mon Sep 17 00:00:00 2001 From: ad hoc Date: Wed, 25 May 2022 11:14:25 +0200 Subject: [PATCH] test dump handler --- meilisearch-lib/src/dump/handler.rs | 217 +++++++++++++----- meilisearch-lib/src/index_controller/mod.rs | 14 +- .../src/tasks/handlers/dump_handler.rs | 93 ++++++++ 3 files changed, 254 insertions(+), 70 deletions(-) diff --git a/meilisearch-lib/src/dump/handler.rs b/meilisearch-lib/src/dump/handler.rs index b168e162a..4adb7011a 100644 --- a/meilisearch-lib/src/dump/handler.rs +++ b/meilisearch-lib/src/dump/handler.rs @@ -1,18 +1,10 @@ -use std::{fs::File, path::PathBuf, sync::Arc}; +#[cfg(not(test))] +pub use real::DumpHandler; + +#[cfg(test)] +pub use test::MockDumpHandler as DumpHandler; -use log::{info, trace}; -use meilisearch_auth::AuthController; -use milli::heed::Env; use time::{macros::format_description, OffsetDateTime}; -use tokio::fs::create_dir_all; - -use crate::analytics; -use crate::compression::to_tar_gz; -use crate::dump::error::{DumpError, Result}; -use crate::dump::{MetadataVersion, META_FILE_NAME}; -use crate::index_resolver::{index_store::IndexStore, meta_store::IndexMetaStore, IndexResolver}; -use crate::tasks::TaskStore; -use crate::update_file_store::UpdateFileStore; /// Generate uid from creation date pub fn generate_uid() -> String { @@ -23,67 +15,166 @@ pub fn generate_uid() -> String { .unwrap() } -pub struct DumpHandler { - pub dump_path: PathBuf, - pub db_path: PathBuf, - pub update_file_store: UpdateFileStore, - pub task_store_size: usize, - pub index_db_size: usize, - pub env: Arc, - pub index_resolver: Arc>, -} +mod real { + use std::{fs::File, path::PathBuf, sync::Arc}; -impl DumpHandler -where - U: IndexMetaStore + Sync + Send + 'static, - I: IndexStore + Sync + Send + 'static, -{ - pub async fn run(&self, uid: String) -> Result<()> { - trace!("Performing dump."); + use log::{info, trace}; + use meilisearch_auth::AuthController; + use milli::heed::Env; + use tokio::fs::create_dir_all; - create_dir_all(&self.dump_path).await?; + use crate::analytics; + use crate::compression::to_tar_gz; + use crate::dump::error::{DumpError, Result}; + use crate::dump::{MetadataVersion, META_FILE_NAME}; + use crate::index_resolver::{ + index_store::IndexStore, meta_store::IndexMetaStore, IndexResolver, + }; + use crate::tasks::TaskStore; + use crate::update_file_store::UpdateFileStore; - let temp_dump_dir = tokio::task::spawn_blocking(tempfile::TempDir::new).await??; - let temp_dump_path = temp_dump_dir.path().to_owned(); + pub struct DumpHandler { + dump_path: PathBuf, + db_path: PathBuf, + update_file_store: UpdateFileStore, + task_store_size: usize, + index_db_size: usize, + env: Arc, + index_resolver: Arc>, + } - let meta = MetadataVersion::new_v5(self.index_db_size, self.task_store_size); - let meta_path = temp_dump_path.join(META_FILE_NAME); - // TODO: blocking - let mut meta_file = File::create(&meta_path)?; - serde_json::to_writer(&mut meta_file, &meta)?; - analytics::copy_user_id(&self.db_path, &temp_dump_path); + impl DumpHandler + where + U: IndexMetaStore + Sync + Send + 'static, + I: IndexStore + Sync + Send + 'static, + { + pub fn new( + dump_path: PathBuf, + db_path: PathBuf, + update_file_store: UpdateFileStore, + task_store_size: usize, + index_db_size: usize, + env: Arc, + index_resolver: Arc>, + ) -> Self { + Self { + dump_path, + db_path, + update_file_store, + task_store_size, + index_db_size, + env, + index_resolver, + } + } - create_dir_all(&temp_dump_path.join("indexes")).await?; + pub async fn run(&self, uid: String) -> Result<()> { + trace!("Performing dump."); - // TODO: this is blocking!! - AuthController::dump(&self.db_path, &temp_dump_path)?; - TaskStore::dump( - self.env.clone(), - &temp_dump_path, - self.update_file_store.clone(), - ) - .await?; - self.index_resolver.dump(&temp_dump_path).await?; + create_dir_all(&self.dump_path).await?; - let dump_path = self.dump_path.clone(); - let dump_path = tokio::task::spawn_blocking(move || -> Result { - // for now we simply copy the updates/updates_files - // FIXME: We may copy more files than necessary, if new files are added while we are - // performing the dump. We need a way to filter them out. + let temp_dump_dir = tokio::task::spawn_blocking(tempfile::TempDir::new).await??; + let temp_dump_path = temp_dump_dir.path().to_owned(); - let temp_dump_file = tempfile::NamedTempFile::new_in(&dump_path)?; - to_tar_gz(temp_dump_path, temp_dump_file.path()) - .map_err(|e| DumpError::Internal(e.into()))?; + let meta = MetadataVersion::new_v5(self.index_db_size, self.task_store_size); + let meta_path = temp_dump_path.join(META_FILE_NAME); + // TODO: blocking + let mut meta_file = File::create(&meta_path)?; + serde_json::to_writer(&mut meta_file, &meta)?; + analytics::copy_user_id(&self.db_path, &temp_dump_path); - let dump_path = dump_path.join(uid).with_extension("dump"); - temp_dump_file.persist(&dump_path)?; + create_dir_all(&temp_dump_path.join("indexes")).await?; - Ok(dump_path) - }) - .await??; + // TODO: this is blocking!! + AuthController::dump(&self.db_path, &temp_dump_path)?; + TaskStore::dump( + self.env.clone(), + &temp_dump_path, + self.update_file_store.clone(), + ) + .await?; + self.index_resolver.dump(&temp_dump_path).await?; - info!("Created dump in {:?}.", dump_path); + let dump_path = self.dump_path.clone(); + let dump_path = tokio::task::spawn_blocking(move || -> Result { + // for now we simply copy the updates/updates_files + // FIXME: We may copy more files than necessary, if new files are added while we are + // performing the dump. We need a way to filter them out. - Ok(()) + let temp_dump_file = tempfile::NamedTempFile::new_in(&dump_path)?; + to_tar_gz(temp_dump_path, temp_dump_file.path()) + .map_err(|e| DumpError::Internal(e.into()))?; + + let dump_path = dump_path.join(uid).with_extension("dump"); + temp_dump_file.persist(&dump_path)?; + + Ok(dump_path) + }) + .await??; + + info!("Created dump in {:?}.", dump_path); + + Ok(()) + } + } +} + +#[cfg(test)] +mod test { + use std::marker::PhantomData; + use std::path::PathBuf; + use std::sync::Arc; + + use milli::heed::Env; + use nelson::Mocker; + + use crate::dump::error::Result; + use crate::index_resolver::IndexResolver; + use crate::index_resolver::{index_store::IndexStore, meta_store::IndexMetaStore}; + use crate::update_file_store::UpdateFileStore; + + use super::*; + + pub enum MockDumpHandler { + Real(super::real::DumpHandler), + Mock(Mocker, PhantomData<(U, I)>), + } + + impl MockDumpHandler { + pub fn mock(mocker: Mocker) -> Self { + Self::Mock(mocker, PhantomData) + } + } + + impl MockDumpHandler + where + U: IndexMetaStore + Sync + Send + 'static, + I: IndexStore + Sync + Send + 'static, + { + pub fn new( + dump_path: PathBuf, + db_path: PathBuf, + update_file_store: UpdateFileStore, + task_store_size: usize, + index_db_size: usize, + env: Arc, + index_resolver: Arc>, + ) -> Self { + Self::Real(super::real::DumpHandler::new( + dump_path, + db_path, + update_file_store, + task_store_size, + index_db_size, + env, + index_resolver, + )) + } + pub async fn run(&self, uid: String) -> Result<()> { + match self { + DumpHandler::Real(real) => real.run(uid).await, + DumpHandler::Mock(mocker, _) => unsafe { mocker.get("run").call(uid) }, + } + } } } diff --git a/meilisearch-lib/src/index_controller/mod.rs b/meilisearch-lib/src/index_controller/mod.rs index b34523fd5..30a6b6dc8 100644 --- a/meilisearch-lib/src/index_controller/mod.rs +++ b/meilisearch-lib/src/index_controller/mod.rs @@ -222,15 +222,15 @@ impl IndexControllerBuilder { .dump_dst .ok_or_else(|| anyhow::anyhow!("Missing dump directory path"))?; - let dump_handler = Arc::new(DumpHandler { + let dump_handler = Arc::new(DumpHandler::new( dump_path, - db_path: db_path.as_ref().into(), - update_file_store: update_file_store.clone(), + db_path.as_ref().into(), + update_file_store.clone(), task_store_size, - index_db_size: index_size, - env: meta_env.clone(), - index_resolver: index_resolver.clone(), - }); + index_size, + meta_env.clone(), + index_resolver.clone(), + )); let task_store = TaskStore::new(meta_env)?; // register all the batch handlers for use with the scheduler. diff --git a/meilisearch-lib/src/tasks/handlers/dump_handler.rs b/meilisearch-lib/src/tasks/handlers/dump_handler.rs index fc506522f..e826242f4 100644 --- a/meilisearch-lib/src/tasks/handlers/dump_handler.rs +++ b/meilisearch-lib/src/tasks/handlers/dump_handler.rs @@ -39,3 +39,96 @@ where () } } + +#[cfg(test)] +mod test { + use crate::dump::error::{DumpError, Result as DumpResult}; + use crate::index_resolver::{index_store::MockIndexStore, meta_store::MockIndexMetaStore}; + use crate::tasks::handlers::test::task_to_batch; + + use super::*; + + use nelson::Mocker; + use proptest::prelude::*; + + proptest! { + #[test] + fn finish_does_nothing( + task in any::(), + ) { + let rt = tokio::runtime::Runtime::new().unwrap(); + let handle = rt.spawn(async { + let batch = task_to_batch(task); + + let mocker = Mocker::default(); + let dump_handler = DumpHandler::::mock(mocker); + + dump_handler.finish(&batch).await; + }); + + rt.block_on(handle).unwrap(); + } + + #[test] + fn test_handle_dump_success( + task in any::(), + ) { + let rt = tokio::runtime::Runtime::new().unwrap(); + let handle = rt.spawn(async { + let batch = task_to_batch(task); + let should_accept = matches!(batch.content, BatchContent::Dump { .. }); + + let mocker = Mocker::default(); + if should_accept { + mocker.when::>("run") + .once() + .then(|_| Ok(())); + } + + let dump_handler = DumpHandler::::mock(mocker); + + let accept = dump_handler.accept(&batch); + assert_eq!(accept, should_accept); + + if accept { + let batch = dump_handler.process_batch(batch).await; + let last_event = batch.content.first().unwrap().events.last().unwrap(); + assert!(matches!(last_event, TaskEvent::Succeded { .. })); + } + }); + + rt.block_on(handle).unwrap(); + } + + #[test] + fn test_handle_dump_error( + task in any::(), + ) { + let rt = tokio::runtime::Runtime::new().unwrap(); + let handle = rt.spawn(async { + let batch = task_to_batch(task); + let should_accept = matches!(batch.content, BatchContent::Dump { .. }); + + let mocker = Mocker::default(); + if should_accept { + mocker.when::>("run") + .once() + .then(|_| Err(DumpError::Internal("error".into()))); + } + + let dump_handler = DumpHandler::::mock(mocker); + + let accept = dump_handler.accept(&batch); + assert_eq!(accept, should_accept); + + if accept { + let batch = dump_handler.process_batch(batch).await; + let last_event = batch.content.first().unwrap().events.last().unwrap(); + assert!(matches!(last_event, TaskEvent::Failed { .. })); + } + }); + + rt.block_on(handle).unwrap(); + } + } +}