From 46cdc1770168ae8250b804b53cff33d845b8958e Mon Sep 17 00:00:00 2001 From: ad hoc Date: Thu, 19 May 2022 12:43:46 +0200 Subject: [PATCH] make scheduler accept multiple batch handlers --- meilisearch-http/src/task.rs | 2 +- .../dump_actor => dump}/actor.rs | 0 .../dump_actor => dump}/compat/mod.rs | 0 .../dump_actor => dump}/compat/v2.rs | 0 .../dump_actor => dump}/compat/v3.rs | 0 .../dump_actor => dump}/error.rs | 0 .../dump_actor => dump}/handle_impl.rs | 0 .../dump_actor => dump}/loaders/mod.rs | 0 .../dump_actor => dump}/loaders/v1.rs | 0 .../dump_actor => dump}/loaders/v2.rs | 4 +- .../dump_actor => dump}/loaders/v3.rs | 4 +- .../dump_actor => dump}/loaders/v4.rs | 2 +- .../dump_actor => dump}/message.rs | 0 .../dump_actor => dump}/mod.rs | 140 +++++----- meilisearch-lib/src/index_controller/error.rs | 2 +- meilisearch-lib/src/index_controller/mod.rs | 17 +- meilisearch-lib/src/index_resolver/mod.rs | 112 +------- meilisearch-lib/src/lib.rs | 1 + meilisearch-lib/src/snapshot.rs | 4 +- meilisearch-lib/src/tasks/batch.rs | 63 ++++- .../src/tasks/batch_handlers/empty_handler.rs | 20 ++ .../batch_handlers/index_resolver_handler.rs | 58 ++++ .../src/tasks/batch_handlers/mod.rs | 2 + meilisearch-lib/src/tasks/mod.rs | 11 +- meilisearch-lib/src/tasks/scheduler.rs | 259 ++++++++++++------ meilisearch-lib/src/tasks/task.rs | 31 +-- meilisearch-lib/src/tasks/task_store/mod.rs | 55 +++- meilisearch-lib/src/tasks/update_loop.rs | 71 +++-- 28 files changed, 484 insertions(+), 374 deletions(-) rename meilisearch-lib/src/{index_controller/dump_actor => dump}/actor.rs (100%) rename meilisearch-lib/src/{index_controller/dump_actor => dump}/compat/mod.rs (100%) rename meilisearch-lib/src/{index_controller/dump_actor => dump}/compat/v2.rs (100%) rename meilisearch-lib/src/{index_controller/dump_actor => dump}/compat/v3.rs (100%) rename meilisearch-lib/src/{index_controller/dump_actor => dump}/error.rs (100%) rename meilisearch-lib/src/{index_controller/dump_actor => dump}/handle_impl.rs (100%) rename meilisearch-lib/src/{index_controller/dump_actor => dump}/loaders/mod.rs (100%) rename meilisearch-lib/src/{index_controller/dump_actor => dump}/loaders/v1.rs (100%) rename meilisearch-lib/src/{index_controller/dump_actor => dump}/loaders/v2.rs (98%) rename meilisearch-lib/src/{index_controller/dump_actor => dump}/loaders/v3.rs (97%) rename meilisearch-lib/src/{index_controller/dump_actor => dump}/loaders/v4.rs (95%) rename meilisearch-lib/src/{index_controller/dump_actor => dump}/message.rs (100%) rename meilisearch-lib/src/{index_controller/dump_actor => dump}/mod.rs (81%) create mode 100644 meilisearch-lib/src/tasks/batch_handlers/empty_handler.rs create mode 100644 meilisearch-lib/src/tasks/batch_handlers/index_resolver_handler.rs create mode 100644 meilisearch-lib/src/tasks/batch_handlers/mod.rs diff --git a/meilisearch-http/src/task.rs b/meilisearch-http/src/task.rs index c2399f141..5a8542ff8 100644 --- a/meilisearch-http/src/task.rs +++ b/meilisearch-http/src/task.rs @@ -44,7 +44,7 @@ impl From for TaskType { TaskContent::IndexDeletion => TaskType::IndexDeletion, TaskContent::IndexCreation { .. } => TaskType::IndexCreation, TaskContent::IndexUpdate { .. } => TaskType::IndexUpdate, - TaskContent::Dump { path } => TaskType::Dump, + TaskContent::Dump { .. } => TaskType::Dump, _ => unreachable!("unexpected task type"), } } diff --git a/meilisearch-lib/src/index_controller/dump_actor/actor.rs b/meilisearch-lib/src/dump/actor.rs similarity index 100% rename from meilisearch-lib/src/index_controller/dump_actor/actor.rs rename to meilisearch-lib/src/dump/actor.rs diff --git a/meilisearch-lib/src/index_controller/dump_actor/compat/mod.rs b/meilisearch-lib/src/dump/compat/mod.rs similarity index 100% rename from meilisearch-lib/src/index_controller/dump_actor/compat/mod.rs rename to meilisearch-lib/src/dump/compat/mod.rs diff --git a/meilisearch-lib/src/index_controller/dump_actor/compat/v2.rs b/meilisearch-lib/src/dump/compat/v2.rs similarity index 100% rename from meilisearch-lib/src/index_controller/dump_actor/compat/v2.rs rename to meilisearch-lib/src/dump/compat/v2.rs diff --git a/meilisearch-lib/src/index_controller/dump_actor/compat/v3.rs b/meilisearch-lib/src/dump/compat/v3.rs similarity index 100% rename from meilisearch-lib/src/index_controller/dump_actor/compat/v3.rs rename to meilisearch-lib/src/dump/compat/v3.rs diff --git a/meilisearch-lib/src/index_controller/dump_actor/error.rs b/meilisearch-lib/src/dump/error.rs similarity index 100% rename from meilisearch-lib/src/index_controller/dump_actor/error.rs rename to meilisearch-lib/src/dump/error.rs diff --git a/meilisearch-lib/src/index_controller/dump_actor/handle_impl.rs b/meilisearch-lib/src/dump/handle_impl.rs similarity index 100% rename from meilisearch-lib/src/index_controller/dump_actor/handle_impl.rs rename to meilisearch-lib/src/dump/handle_impl.rs diff --git a/meilisearch-lib/src/index_controller/dump_actor/loaders/mod.rs b/meilisearch-lib/src/dump/loaders/mod.rs similarity index 100% rename from meilisearch-lib/src/index_controller/dump_actor/loaders/mod.rs rename to meilisearch-lib/src/dump/loaders/mod.rs diff --git a/meilisearch-lib/src/index_controller/dump_actor/loaders/v1.rs b/meilisearch-lib/src/dump/loaders/v1.rs similarity index 100% rename from meilisearch-lib/src/index_controller/dump_actor/loaders/v1.rs rename to meilisearch-lib/src/dump/loaders/v1.rs diff --git a/meilisearch-lib/src/index_controller/dump_actor/loaders/v2.rs b/meilisearch-lib/src/dump/loaders/v2.rs similarity index 98% rename from meilisearch-lib/src/index_controller/dump_actor/loaders/v2.rs rename to meilisearch-lib/src/dump/loaders/v2.rs index e2445913e..5926de931 100644 --- a/meilisearch-lib/src/index_controller/dump_actor/loaders/v2.rs +++ b/meilisearch-lib/src/dump/loaders/v2.rs @@ -5,8 +5,8 @@ use std::path::{Path, PathBuf}; use serde_json::{Deserializer, Value}; use tempfile::NamedTempFile; -use crate::index_controller::dump_actor::compat::{self, v2, v3}; -use crate::index_controller::dump_actor::Metadata; +use crate::dump::compat::{self, v2, v3}; +use crate::dump::Metadata; use crate::options::IndexerOpts; /// The dump v2 reads the dump folder and patches all the needed file to make it compatible with a diff --git a/meilisearch-lib/src/index_controller/dump_actor/loaders/v3.rs b/meilisearch-lib/src/dump/loaders/v3.rs similarity index 97% rename from meilisearch-lib/src/index_controller/dump_actor/loaders/v3.rs rename to meilisearch-lib/src/dump/loaders/v3.rs index 902691511..0a2ea438b 100644 --- a/meilisearch-lib/src/index_controller/dump_actor/loaders/v3.rs +++ b/meilisearch-lib/src/dump/loaders/v3.rs @@ -9,8 +9,8 @@ use log::info; use tempfile::tempdir; use uuid::Uuid; -use crate::index_controller::dump_actor::compat::v3; -use crate::index_controller::dump_actor::Metadata; +use crate::dump::compat::v3; +use crate::dump::Metadata; use crate::index_resolver::meta_store::{DumpEntry, IndexMeta}; use crate::options::IndexerOpts; use crate::tasks::task::{Task, TaskId}; diff --git a/meilisearch-lib/src/index_controller/dump_actor/loaders/v4.rs b/meilisearch-lib/src/dump/loaders/v4.rs similarity index 95% rename from meilisearch-lib/src/index_controller/dump_actor/loaders/v4.rs rename to meilisearch-lib/src/dump/loaders/v4.rs index 38d61f146..c898f83b1 100644 --- a/meilisearch-lib/src/index_controller/dump_actor/loaders/v4.rs +++ b/meilisearch-lib/src/dump/loaders/v4.rs @@ -6,7 +6,7 @@ use meilisearch_auth::AuthController; use milli::heed::EnvOpenOptions; use crate::analytics; -use crate::index_controller::dump_actor::Metadata; +use crate::dump::Metadata; use crate::index_resolver::IndexResolver; use crate::options::IndexerOpts; use crate::tasks::TaskStore; diff --git a/meilisearch-lib/src/index_controller/dump_actor/message.rs b/meilisearch-lib/src/dump/message.rs similarity index 100% rename from meilisearch-lib/src/index_controller/dump_actor/message.rs rename to meilisearch-lib/src/dump/message.rs diff --git a/meilisearch-lib/src/index_controller/dump_actor/mod.rs b/meilisearch-lib/src/dump/mod.rs similarity index 81% rename from meilisearch-lib/src/index_controller/dump_actor/mod.rs rename to meilisearch-lib/src/dump/mod.rs index 00be3a371..bc717b35e 100644 --- a/meilisearch-lib/src/index_controller/dump_actor/mod.rs +++ b/meilisearch-lib/src/dump/mod.rs @@ -3,28 +3,24 @@ use std::path::{Path, PathBuf}; use std::sync::Arc; use anyhow::bail; -use log::{info, trace}; +use log::info; use serde::{Deserialize, Serialize}; use time::OffsetDateTime; pub use actor::DumpActor; pub use handle_impl::*; -use meilisearch_auth::AuthController; pub use message::DumpMsg; use tempfile::TempDir; -use tokio::fs::create_dir_all; -use tokio::sync::{oneshot, RwLock}; +use tokio::sync::RwLock; -use crate::analytics; -use crate::compression::{from_tar_gz, to_tar_gz}; -use crate::index_controller::dump_actor::error::DumpActorError; -use crate::index_controller::dump_actor::loaders::{v2, v3, v4}; +use crate::compression::from_tar_gz; use crate::options::IndexerOpts; -use crate::tasks::task::Job; use crate::tasks::Scheduler; use crate::update_file_store::UpdateFileStore; use error::Result; +use self::loaders::{v2, v3, v4}; + mod actor; mod compat; pub mod error; @@ -316,7 +312,7 @@ fn persist_dump(dst_path: impl AsRef, tmp_dst: TempDir) -> anyhow::Result< Ok(()) } -struct DumpJob { +pub struct DumpJob { dump_path: PathBuf, db_path: PathBuf, update_file_store: UpdateFileStore, @@ -328,65 +324,65 @@ struct DumpJob { impl DumpJob { async fn run(self) -> Result<()> { - trace!("Performing dump."); - - create_dir_all(&self.dump_path).await?; - - let temp_dump_dir = tokio::task::spawn_blocking(tempfile::TempDir::new).await??; - let temp_dump_path = temp_dump_dir.path().to_owned(); - - let meta = MetadataVersion::new_v4(self.index_db_size, self.update_db_size); - let meta_path = temp_dump_path.join(META_FILE_NAME); - let mut meta_file = File::create(&meta_path)?; - serde_json::to_writer(&mut meta_file, &meta)?; - analytics::copy_user_id(&self.db_path, &temp_dump_path); - - create_dir_all(&temp_dump_path.join("indexes")).await?; - - let (sender, receiver) = oneshot::channel(); - - self.scheduler - .write() - .await - .schedule_job(Job::Dump { - ret: sender, - path: temp_dump_path.clone(), - }) - .await; - - // wait until the job has started performing before finishing the dump process - let sender = receiver.await??; - - AuthController::dump(&self.db_path, &temp_dump_path)?; - - //TODO(marin): this is not right, the scheduler should dump itself, not do it here... - self.scheduler - .read() - .await - .dump(&temp_dump_path, self.update_file_store.clone()) - .await?; - - let dump_path = tokio::task::spawn_blocking(move || -> Result { - // for now we simply copy the updates/updates_files - // FIXME: We may copy more files than necessary, if new files are added while we are - // performing the dump. We need a way to filter them out. - - let temp_dump_file = tempfile::NamedTempFile::new_in(&self.dump_path)?; - to_tar_gz(temp_dump_path, temp_dump_file.path()) - .map_err(|e| DumpActorError::Internal(e.into()))?; - - let dump_path = self.dump_path.join(self.uid).with_extension("dump"); - temp_dump_file.persist(&dump_path)?; - - Ok(dump_path) - }) - .await??; - - // notify the update loop that we are finished performing the dump. - let _ = sender.send(()); - - info!("Created dump in {:?}.", dump_path); - + // trace!("Performing dump."); + // + // create_dir_all(&self.dump_path).await?; + // + // let temp_dump_dir = tokio::task::spawn_blocking(tempfile::TempDir::new).await??; + // let temp_dump_path = temp_dump_dir.path().to_owned(); + // + // let meta = MetadataVersion::new_v4(self.index_db_size, self.update_db_size); + // let meta_path = temp_dump_path.join(META_FILE_NAME); + // let mut meta_file = File::create(&meta_path)?; + // serde_json::to_writer(&mut meta_file, &meta)?; + // analytics::copy_user_id(&self.db_path, &temp_dump_path); + // + // create_dir_all(&temp_dump_path.join("indexes")).await?; + // + // let (sender, receiver) = oneshot::channel(); + // + // self.scheduler + // .write() + // .await + // .schedule_job(Job::Dump { + // ret: sender, + // path: temp_dump_path.clone(), + // }) + // .await; + // + // // wait until the job has started performing before finishing the dump process + // let sender = receiver.await??; + // + // AuthController::dump(&self.db_path, &temp_dump_path)?; + // + // //TODO(marin): this is not right, the scheduler should dump itself, not do it here... + // self.scheduler + // .read() + // .await + // .dump(&temp_dump_path, self.update_file_store.clone()) + // .await?; + // + // let dump_path = tokio::task::spawn_blocking(move || -> Result { + // // for now we simply copy the updates/updates_files + // // FIXME: We may copy more files than necessary, if new files are added while we are + // // performing the dump. We need a way to filter them out. + // + // let temp_dump_file = tempfile::NamedTempFile::new_in(&self.dump_path)?; + // to_tar_gz(temp_dump_path, temp_dump_file.path()) + // .map_err(|e| DumpActorError::Internal(e.into()))?; + // + // let dump_path = self.dump_path.join(self.uid).with_extension("dump"); + // temp_dump_file.persist(&dump_path)?; + // + // Ok(dump_path) + // }) + // .await??; + // + // // notify the update loop that we are finished performing the dump. + // let _ = sender.send(()); + // + // info!("Created dump in {:?}.", dump_path); + // Ok(()) } } @@ -401,7 +397,7 @@ mod test { use crate::options::SchedulerConfig; use crate::tasks::error::Result as TaskResult; use crate::tasks::task::{Task, TaskId}; - use crate::tasks::{MockTaskPerformer, TaskFilter, TaskStore}; + use crate::tasks::{BatchHandler, TaskFilter, TaskStore}; use crate::update_file_store::UpdateFileStore; fn setup() { @@ -426,7 +422,7 @@ mod test { let mocker = Mocker::default(); let update_file_store = UpdateFileStore::mock(mocker); - let mut performer = MockTaskPerformer::new(); + let mut performer = BatchHandler::new(); performer .expect_process_job() .once() @@ -480,7 +476,7 @@ mod test { ) .then(|_| Ok(Vec::new())); let task_store = TaskStore::mock(mocker); - let mut performer = MockTaskPerformer::new(); + let mut performer = BatchHandler::new(); performer .expect_process_job() .once() diff --git a/meilisearch-lib/src/index_controller/error.rs b/meilisearch-lib/src/index_controller/error.rs index 85af76623..11ef03d73 100644 --- a/meilisearch-lib/src/index_controller/error.rs +++ b/meilisearch-lib/src/index_controller/error.rs @@ -6,11 +6,11 @@ use tokio::task::JoinError; use super::DocumentAdditionFormat; use crate::document_formats::DocumentFormatError; +use crate::dump::error::DumpActorError; use crate::index::error::IndexError; use crate::tasks::error::TaskError; use crate::update_file_store::UpdateFileStoreError; -use super::dump_actor::error::DumpActorError; use crate::index_resolver::error::IndexResolverError; pub type Result = std::result::Result; diff --git a/meilisearch-lib/src/index_controller/mod.rs b/meilisearch-lib/src/index_controller/mod.rs index 4be90489a..b73402d56 100644 --- a/meilisearch-lib/src/index_controller/mod.rs +++ b/meilisearch-lib/src/index_controller/mod.rs @@ -19,25 +19,23 @@ use tokio::time::sleep; use uuid::Uuid; use crate::document_formats::{read_csv, read_json, read_ndjson}; +use crate::dump::{self, load_dump, DumpActor, DumpActorHandle, DumpActorHandleImpl, DumpInfo}; use crate::index::{ Checked, Document, IndexMeta, IndexStats, SearchQuery, SearchResult, Settings, Unchecked, }; -use crate::index_controller::dump_actor::{load_dump, DumpActor, DumpActorHandleImpl}; use crate::options::{IndexerOpts, SchedulerConfig}; use crate::snapshot::{load_snapshot, SnapshotService}; use crate::tasks::error::TaskError; use crate::tasks::task::{DocumentDeletion, Task, TaskContent, TaskId}; -use crate::tasks::{Scheduler, TaskFilter, TaskStore}; +use crate::tasks::{BatchHandler, EmptyBatchHandler, Scheduler, TaskFilter, TaskStore}; use error::Result; -use self::dump_actor::{DumpActorHandle, DumpInfo}; use self::error::IndexControllerError; use crate::index_resolver::index_store::{IndexStore, MapIndexStore}; use crate::index_resolver::meta_store::{HeedMetaStore, IndexMetaStore}; use crate::index_resolver::{create_index_resolver, IndexResolver, IndexUid}; use crate::update_file_store::UpdateFileStore; -mod dump_actor; pub mod error; pub mod versioning; @@ -73,12 +71,12 @@ pub struct IndexSettings { } pub struct IndexController { - index_resolver: Arc>, + pub index_resolver: Arc>, scheduler: Arc>, task_store: TaskStore, dump_path: PathBuf, - dump_handle: dump_actor::DumpActorHandleImpl, - update_file_store: UpdateFileStore, + dump_handle: dump::DumpActorHandleImpl, + pub update_file_store: UpdateFileStore, } /// Need a custom implementation for clone because deriving require that U and I are clone. @@ -223,8 +221,9 @@ impl IndexControllerBuilder { )?); let task_store = TaskStore::new(meta_env)?; - let scheduler = - Scheduler::new(task_store.clone(), index_resolver.clone(), scheduler_config)?; + let handlers: Vec> = + vec![index_resolver.clone(), Arc::new(EmptyBatchHandler)]; + let scheduler = Scheduler::new(task_store.clone(), handlers, scheduler_config)?; let dump_path = self .dump_dst diff --git a/meilisearch-lib/src/index_resolver/mod.rs b/meilisearch-lib/src/index_resolver/mod.rs index 33be749b1..f463cd24d 100644 --- a/meilisearch-lib/src/index_resolver/mod.rs +++ b/meilisearch-lib/src/index_resolver/mod.rs @@ -3,7 +3,7 @@ pub mod index_store; pub mod meta_store; use std::convert::{TryFrom, TryInto}; -use std::path::{Path, PathBuf}; +use std::path::Path; use std::sync::Arc; use error::{IndexResolverError, Result}; @@ -14,15 +14,12 @@ use milli::heed::Env; use milli::update::{DocumentDeletionResult, IndexerConfig}; use serde::{Deserialize, Serialize}; use time::OffsetDateTime; -use tokio::sync::oneshot; use tokio::task::spawn_blocking; use uuid::Uuid; use crate::index::{error::Result as IndexResult, Index}; use crate::options::IndexerOpts; -use crate::tasks::batch::Batch; -use crate::tasks::task::{DocumentDeletion, Job, Task, TaskContent, TaskEvent, TaskId, TaskResult}; -use crate::tasks::TaskPerformer; +use crate::tasks::task::{DocumentDeletion, Task, TaskContent, TaskEvent, TaskId, TaskResult}; use crate::update_file_store::UpdateFileStore; use self::meta_store::IndexMeta; @@ -91,69 +88,10 @@ impl TryInto for String { } } -#[async_trait::async_trait] -impl TaskPerformer for IndexResolver -where - U: IndexMetaStore + Send + Sync + 'static, - I: IndexStore + Send + Sync + 'static, -{ - async fn process_batch(&self, mut batch: Batch) -> Batch { - // If a batch contains multiple tasks, then it must be a document addition batch - if let Some(Task { - content: TaskContent::DocumentAddition { .. }, - .. - }) = batch.tasks.first() - { - debug_assert!(batch.tasks.iter().all(|t| matches!( - t, - Task { - content: TaskContent::DocumentAddition { .. }, - .. - } - ))); - - self.process_document_addition_batch(batch).await - } else { - if let Some(task) = batch.tasks.first_mut() { - task.events - .push(TaskEvent::Processing(OffsetDateTime::now_utc())); - - match self.process_task(task).await { - Ok(success) => { - task.events.push(TaskEvent::Succeded { - result: success, - timestamp: OffsetDateTime::now_utc(), - }); - } - Err(err) => task.events.push(TaskEvent::Failed { - error: err.into(), - timestamp: OffsetDateTime::now_utc(), - }), - } - } - batch - } - } - - async fn process_job(&self, job: Job) { - self.process_job(job).await; - } - - async fn finish(&self, batch: &Batch) { - for task in &batch.tasks { - if let Some(content_uuid) = task.get_content_uuid() { - if let Err(e) = self.file_store.delete(content_uuid).await { - log::error!("error deleting update file: {}", e); - } - } - } - } -} - pub struct IndexResolver { index_uuid_store: U, index_store: I, - file_store: UpdateFileStore, + pub file_store: UpdateFileStore, } impl IndexResolver { @@ -189,7 +127,7 @@ where } } - async fn process_document_addition_batch(&self, mut batch: Batch) -> Batch { + pub async fn process_document_addition_batch(&self, mut tasks: Vec) -> Vec { fn get_content_uuid(task: &Task) -> Uuid { match task { Task { @@ -200,9 +138,9 @@ where } } - let content_uuids = batch.tasks.iter().map(get_content_uuid).collect::>(); + let content_uuids = tasks.iter().map(get_content_uuid).collect::>(); - match batch.tasks.first() { + match tasks.first() { Some(Task { index_uid: Some(ref index_uid), id, @@ -231,13 +169,13 @@ where Ok(index) => index, Err(e) => { let error = ResponseError::from(e); - for task in batch.tasks.iter_mut() { + for task in tasks.iter_mut() { task.events.push(TaskEvent::Failed { error: error.clone(), timestamp: now, }); } - return batch; + return tasks; } }; @@ -269,17 +207,17 @@ where }, }; - for task in batch.tasks.iter_mut() { + for task in tasks.iter_mut() { task.events.push(event.clone()); } - batch + tasks } _ => panic!("invalid batch!"), } } - async fn process_task(&self, task: &Task) -> Result { + pub async fn process_task(&self, task: &Task) -> Result { let index_uid = task.index_uid.clone(); match &task.content { TaskContent::DocumentAddition { .. } => panic!("updates should be handled by batch"), @@ -351,33 +289,7 @@ where Ok(TaskResult::Other) } - TaskContent::Dump { path } => self.perform_dump(path).await, - } - } - - async fn perform_dump(&self, path: &PathBuf) -> Result { - todo!() - } - - async fn process_job(&self, job: Job) { - match job { - Job::Dump { ret, path } => { - log::trace!("The Dump task is getting executed"); - - let (sender, receiver) = oneshot::channel(); - if ret.send(self.dump(path).await.map(|_| sender)).is_err() { - log::error!("The dump actor died."); - } - - // wait until the dump has finished performing. - let _ = receiver.await; - } - Job::Empty => log::error!("Tried to process an empty task."), - Job::Snapshot(job) => { - if let Err(e) = job.run().await { - log::error!("Error performing snapshot: {}", e); - } - } + _ => unreachable!("Invalid task for index resolver"), } } diff --git a/meilisearch-lib/src/lib.rs b/meilisearch-lib/src/lib.rs index 1161340ba..3d3d5e860 100644 --- a/meilisearch-lib/src/lib.rs +++ b/meilisearch-lib/src/lib.rs @@ -3,6 +3,7 @@ pub mod error; pub mod options; mod analytics; +mod dump; pub mod index; pub mod index_controller; mod index_resolver; diff --git a/meilisearch-lib/src/snapshot.rs b/meilisearch-lib/src/snapshot.rs index 6c27ad2f0..6dda0f3e8 100644 --- a/meilisearch-lib/src/snapshot.rs +++ b/meilisearch-lib/src/snapshot.rs @@ -14,7 +14,6 @@ use walkdir::WalkDir; use crate::compression::from_tar_gz; use crate::index_controller::open_meta_env; use crate::index_controller::versioning::VERSION_FILE_NAME; -use crate::tasks::task::Job; use crate::tasks::Scheduler; pub struct SnapshotService { @@ -39,8 +38,7 @@ impl SnapshotService { meta_env_size: self.meta_env_size, index_size: self.index_size, }; - let job = Job::Snapshot(snapshot_job); - self.scheduler.write().await.schedule_job(job).await; + self.scheduler.write().await.register_snapshot(snapshot_job); sleep(self.snapshot_period).await; } } diff --git a/meilisearch-lib/src/tasks/batch.rs b/meilisearch-lib/src/tasks/batch.rs index 4a8cf7907..88c73e3de 100644 --- a/meilisearch-lib/src/tasks/batch.rs +++ b/meilisearch-lib/src/tasks/batch.rs @@ -1,22 +1,75 @@ use time::OffsetDateTime; -use super::task::Task; +use crate::snapshot::SnapshotJob; + +use super::task::{Task, TaskEvent}; pub type BatchId = u64; +#[derive(Debug)] +pub enum BatchContent { + DocumentAddtitionBatch(Vec), + IndexUpdate(Task), + Dump(Task), + Snapshot(SnapshotJob), + // Symbolizes a empty batch. This can occur when we were woken, but there wasn't any work to do. + Empty, +} + +impl BatchContent { + pub fn first(&self) -> Option<&Task> { + match self { + BatchContent::DocumentAddtitionBatch(ts) => ts.first(), + BatchContent::Dump(t) | BatchContent::IndexUpdate(t) => Some(t), + BatchContent::Snapshot(_) | BatchContent::Empty => None, + } + } + + pub fn push_event(&mut self, event: TaskEvent) { + match self { + BatchContent::DocumentAddtitionBatch(ts) => { + ts.iter_mut().for_each(|t| t.events.push(event.clone())) + } + BatchContent::IndexUpdate(t) | BatchContent::Dump(t) => t.events.push(event), + BatchContent::Snapshot(_) | BatchContent::Empty => (), + } + } +} + #[derive(Debug)] pub struct Batch { - pub id: BatchId, + // Only batches that contains a persistant tasks are given an id. Snapshot batches don't have + // an id. + pub id: Option, pub created_at: OffsetDateTime, - pub tasks: Vec, + pub content: BatchContent, } impl Batch { + pub fn new(id: Option, content: BatchContent) -> Self { + Self { + id, + created_at: OffsetDateTime::now_utc(), + content, + } + } pub fn len(&self) -> usize { - self.tasks.len() + match self.content { + BatchContent::DocumentAddtitionBatch(ref ts) => ts.len(), + BatchContent::IndexUpdate(_) | BatchContent::Dump(_) | BatchContent::Snapshot(_) => 1, + BatchContent::Empty => 0, + } } pub fn is_empty(&self) -> bool { - self.tasks.is_empty() + self.len() == 0 + } + + pub fn empty() -> Self { + Self { + id: None, + created_at: OffsetDateTime::now_utc(), + content: BatchContent::Empty, + } } } diff --git a/meilisearch-lib/src/tasks/batch_handlers/empty_handler.rs b/meilisearch-lib/src/tasks/batch_handlers/empty_handler.rs new file mode 100644 index 000000000..5d6aa2275 --- /dev/null +++ b/meilisearch-lib/src/tasks/batch_handlers/empty_handler.rs @@ -0,0 +1,20 @@ +use crate::tasks::batch::{Batch, BatchContent}; +use crate::tasks::BatchHandler; + +/// A sink handler for empty tasks. +pub struct EmptyBatchHandler; + +#[async_trait::async_trait] +impl BatchHandler for EmptyBatchHandler { + fn accept(&self, batch: &Batch) -> bool { + matches!(batch.content, BatchContent::Empty) + } + + async fn process_batch(&self, batch: Batch) -> Batch { + batch + } + + async fn finish(&self, _: &Batch) { + () + } +} diff --git a/meilisearch-lib/src/tasks/batch_handlers/index_resolver_handler.rs b/meilisearch-lib/src/tasks/batch_handlers/index_resolver_handler.rs new file mode 100644 index 000000000..41a78a22b --- /dev/null +++ b/meilisearch-lib/src/tasks/batch_handlers/index_resolver_handler.rs @@ -0,0 +1,58 @@ +use time::OffsetDateTime; + +use crate::index_resolver::IndexResolver; +use crate::index_resolver::{index_store::IndexStore, meta_store::IndexMetaStore}; +use crate::tasks::batch::{Batch, BatchContent}; +use crate::tasks::task::TaskEvent; +use crate::tasks::BatchHandler; + +#[async_trait::async_trait] +impl BatchHandler for IndexResolver +where + U: IndexMetaStore + Send + Sync + 'static, + I: IndexStore + Send + Sync + 'static, +{ + fn accept(&self, batch: &Batch) -> bool { + match batch.content { + BatchContent::DocumentAddtitionBatch(_) | BatchContent::IndexUpdate(_) => true, + _ => false, + } + } + + async fn process_batch(&self, mut batch: Batch) -> Batch { + match batch.content { + BatchContent::DocumentAddtitionBatch(ref mut tasks) => { + *tasks = self + .process_document_addition_batch(std::mem::take(tasks)) + .await; + } + BatchContent::IndexUpdate(ref mut task) => match self.process_task(&task).await { + Ok(success) => { + task.events.push(TaskEvent::Succeded { + result: success, + timestamp: OffsetDateTime::now_utc(), + }); + } + Err(err) => task.events.push(TaskEvent::Failed { + error: err.into(), + timestamp: OffsetDateTime::now_utc(), + }), + }, + _ => unreachable!(), + } + + batch + } + + async fn finish(&self, batch: &Batch) { + if let BatchContent::DocumentAddtitionBatch(ref tasks) = batch.content { + for task in tasks { + if let Some(content_uuid) = task.get_content_uuid() { + if let Err(e) = self.file_store.delete(content_uuid).await { + log::error!("error deleting update file: {}", e); + } + } + } + } + } +} diff --git a/meilisearch-lib/src/tasks/batch_handlers/mod.rs b/meilisearch-lib/src/tasks/batch_handlers/mod.rs new file mode 100644 index 000000000..0e94c76f1 --- /dev/null +++ b/meilisearch-lib/src/tasks/batch_handlers/mod.rs @@ -0,0 +1,2 @@ +pub mod empty_handler; +mod index_resolver_handler; diff --git a/meilisearch-lib/src/tasks/mod.rs b/meilisearch-lib/src/tasks/mod.rs index b56dfaf9d..bc01c4901 100644 --- a/meilisearch-lib/src/tasks/mod.rs +++ b/meilisearch-lib/src/tasks/mod.rs @@ -1,5 +1,6 @@ use async_trait::async_trait; +pub use batch_handlers::empty_handler::EmptyBatchHandler; pub use scheduler::Scheduler; pub use task_store::TaskFilter; @@ -11,9 +12,8 @@ pub use task_store::TaskStore; use batch::Batch; use error::Result; -use self::task::Job; - pub mod batch; +mod batch_handlers; pub mod error; mod scheduler; pub mod task; @@ -22,12 +22,13 @@ pub mod update_loop; #[cfg_attr(test, mockall::automock(type Error=test::DebugError;))] #[async_trait] -pub trait TaskPerformer: Sync + Send + 'static { +pub trait BatchHandler: Sync + Send + 'static { + /// return whether this handler can accept this batch + fn accept(&self, batch: &Batch) -> bool; + /// Processes the `Task` batch returning the batch with the `Task` updated. async fn process_batch(&self, batch: Batch) -> Batch; - async fn process_job(&self, job: Job); - /// `finish` is called when the result of `process` has been commited to the task store. This /// method can be used to perform cleanup after the update has been completed for example. async fn finish(&self, batch: &Batch); diff --git a/meilisearch-lib/src/tasks/scheduler.rs b/meilisearch-lib/src/tasks/scheduler.rs index 1f76f179a..f3018b782 100644 --- a/meilisearch-lib/src/tasks/scheduler.rs +++ b/meilisearch-lib/src/tasks/scheduler.rs @@ -2,6 +2,7 @@ use std::cmp::Ordering; use std::collections::{hash_map::Entry, BinaryHeap, HashMap, VecDeque}; use std::ops::{Deref, DerefMut}; use std::path::Path; +use std::slice; use std::sync::Arc; use std::time::Duration; @@ -11,24 +12,21 @@ use time::OffsetDateTime; use tokio::sync::{watch, RwLock}; use crate::options::SchedulerConfig; +use crate::snapshot::SnapshotJob; use crate::update_file_store::UpdateFileStore; -use super::batch::Batch; +use super::batch::{Batch, BatchContent}; use super::error::Result; -use super::task::{Job, Task, TaskContent, TaskEvent, TaskId}; +use super::task::{Task, TaskContent, TaskEvent, TaskId}; use super::update_loop::UpdateLoop; -use super::{TaskFilter, TaskPerformer, TaskStore}; +use super::{BatchHandler, TaskFilter, TaskStore}; #[derive(Eq, Debug, Clone, Copy)] enum TaskType { - DocumentAddition { - number: usize, - }, - DocumentUpdate { - number: usize, - }, - /// Any other kind of task, including Dumps - Other, + DocumentAddition { number: usize }, + DocumentUpdate { number: usize }, + IndexUpdate, + Dump, } /// Two tasks are equal if they have the same type. @@ -166,7 +164,13 @@ impl TaskQueue { } => TaskType::DocumentUpdate { number: documents_count, }, - _ => TaskType::Other, + TaskContent::Dump { .. } => TaskType::Dump, + TaskContent::DocumentDeletion(_) + | TaskContent::SettingsUpdate { .. } + | TaskContent::IndexDeletion + | TaskContent::IndexCreation { .. } + | TaskContent::IndexUpdate { .. } => TaskType::IndexUpdate, + _ => unreachable!("unhandled task type"), }; let task = PendingTask { kind, id }; @@ -217,11 +221,12 @@ impl TaskQueue { } pub struct Scheduler { - jobs: VecDeque, + // TODO: currently snapshots are non persistent tasks, and are treated differently. + snapshots: VecDeque, tasks: TaskQueue, store: TaskStore, - processing: Vec, + processing: Processing, next_fetched_task_id: TaskId, config: SchedulerConfig, /// Notifies the update loop that a new task was received @@ -229,14 +234,11 @@ pub struct Scheduler { } impl Scheduler { - pub fn new

( + pub fn new( store: TaskStore, - performer: Arc

, + performers: Vec>, mut config: SchedulerConfig, - ) -> Result>> - where - P: TaskPerformer, - { + ) -> Result>> { let (notifier, rcv) = watch::channel(()); let debounce_time = config.debounce_duration_sec; @@ -247,11 +249,11 @@ impl Scheduler { } let this = Self { - jobs: VecDeque::new(), + snapshots: VecDeque::new(), tasks: TaskQueue::default(), store, - processing: Vec::new(), + processing: Processing::Nothing, next_fetched_task_id: 0, config, notifier, @@ -264,7 +266,7 @@ impl Scheduler { let update_loop = UpdateLoop::new( this.clone(), - performer, + performers, debounce_time.filter(|&v| v > 0).map(Duration::from_secs), rcv, ); @@ -283,9 +285,13 @@ impl Scheduler { self.tasks.insert(task); } + pub fn register_snapshot(&mut self, job: SnapshotJob) { + self.snapshots.push_back(job); + } + /// Clears the processing list, this method should be called when the processing of a batch is finished. pub fn finish(&mut self) { - self.processing.clear(); + self.processing = Processing::Nothing; } pub fn notify(&self) { @@ -293,13 +299,27 @@ impl Scheduler { } fn notify_if_not_empty(&self) { - if !self.jobs.is_empty() || !self.tasks.is_empty() { + if !self.snapshots.is_empty() || !self.tasks.is_empty() { self.notify(); } } - pub async fn update_tasks(&self, tasks: Vec) -> Result> { - self.store.update_tasks(tasks).await + pub async fn update_tasks(&self, content: BatchContent) -> Result { + match content { + BatchContent::DocumentAddtitionBatch(tasks) => { + let tasks = self.store.update_tasks(tasks).await?; + Ok(BatchContent::DocumentAddtitionBatch(tasks)) + } + BatchContent::IndexUpdate(t) => { + let mut tasks = self.store.update_tasks(vec![t]).await?; + Ok(BatchContent::IndexUpdate(tasks.remove(0))) + } + BatchContent::Dump(t) => { + let mut tasks = self.store.update_tasks(vec![t]).await?; + Ok(BatchContent::Dump(tasks.remove(0))) + } + other => Ok(other), + } } pub async fn get_task(&self, id: TaskId, filter: Option) -> Result { @@ -318,16 +338,16 @@ impl Scheduler { pub async fn get_processing_tasks(&self) -> Result> { let mut tasks = Vec::new(); - for id in self.processing.iter() { - let task = self.store.get_task(*id, None).await?; + for id in self.processing.ids() { + let task = self.store.get_task(id, None).await?; tasks.push(task); } Ok(tasks) } - pub async fn schedule_job(&mut self, job: Job) { - self.jobs.push_back(job); + pub async fn schedule_snapshot(&mut self, job: SnapshotJob) { + self.snapshots.push_back(job); self.notify(); } @@ -353,106 +373,163 @@ impl Scheduler { } /// Prepare the next batch, and set `processing` to the ids in that batch. - pub async fn prepare(&mut self) -> Result { + pub async fn prepare(&mut self) -> Result { // If there is a job to process, do it first. - if let Some(job) = self.jobs.pop_front() { + if let Some(job) = self.snapshots.pop_front() { // There is more work to do, notify the update loop self.notify_if_not_empty(); - return Ok(Pending::Job(job)); + let batch = Batch::new(None, BatchContent::Snapshot(job)); + return Ok(batch); } + // Try to fill the queue with pending tasks. self.fetch_pending_tasks().await?; - make_batch(&mut self.tasks, &mut self.processing, &self.config); + self.processing = make_batch(&mut self.tasks, &self.config); log::debug!("prepared batch with {} tasks", self.processing.len()); - if !self.processing.is_empty() { - let ids = std::mem::take(&mut self.processing); + if !self.processing.is_nothing() { + let (processing, mut content) = self + .store + .get_processing_tasks(std::mem::take(&mut self.processing)) + .await?; - let (ids, mut tasks) = self.store.get_pending_tasks(ids).await?; - - // The batch id is the id of the first update it contains - let id = match tasks.first() { + // The batch id is the id of the first update it contains. At this point we must have a + // valid batch that contains at least 1 task. + let id = match content.first() { Some(Task { id, .. }) => *id, _ => panic!("invalid batch"), }; - tasks.iter_mut().for_each(|t| { - t.events.push(TaskEvent::Batched { - batch_id: id, - timestamp: OffsetDateTime::now_utc(), - }) + content.push_event(TaskEvent::Batched { + batch_id: id, + timestamp: OffsetDateTime::now_utc(), }); - self.processing = ids; + self.processing = processing; - let batch = Batch { - id, - created_at: OffsetDateTime::now_utc(), - tasks, - }; + let batch = Batch::new(Some(id), content); // There is more work to do, notify the update loop self.notify_if_not_empty(); - Ok(Pending::Batch(batch)) + Ok(batch) } else { - Ok(Pending::Nothing) + Ok(Batch::empty()) } } } -#[derive(Debug)] -pub enum Pending { - Batch(Batch), - Job(Job), +#[derive(Debug, Default)] +pub enum Processing { + DocumentAdditions(Vec), + IndexUpdate(TaskId), + Dump(TaskId), + /// Variant used when there is nothing to process. + #[default] Nothing, } -fn make_batch(tasks: &mut TaskQueue, processing: &mut Vec, config: &SchedulerConfig) { - processing.clear(); +enum ProcessingIter<'a> { + Many(slice::Iter<'a, TaskId>), + Single(Option), +} - let mut doc_count = 0; - tasks.head_mut(|list| match list.peek().copied() { - Some(PendingTask { - kind: TaskType::Other, - id, - }) => { - processing.push(id); - list.pop(); +impl<'a> Iterator for ProcessingIter<'a> { + type Item = TaskId; + + fn next(&mut self) -> Option { + match self { + ProcessingIter::Many(iter) => iter.next().copied(), + ProcessingIter::Single(val) => val.take(), } - Some(PendingTask { kind, .. }) => loop { - match list.peek() { - Some(pending) if pending.kind == kind => { - // We always need to process at least one task for the scheduler to make progress. - if processing.len() >= config.max_batch_size.unwrap_or(usize::MAX).max(1) { - break; - } - let pending = list.pop().unwrap(); - processing.push(pending.id); + } +} - // We add the number of documents to the count if we are scheduling document additions and - // stop adding if we already have enough. - // - // We check that bound only after adding the current task to the batch, so that a batch contains at least one task. - match pending.kind { - TaskType::DocumentUpdate { number } - | TaskType::DocumentAddition { number } => { - doc_count += number; +impl Processing { + fn is_nothing(&self) -> bool { + matches!(self, Processing::Nothing) + } - if doc_count >= config.max_documents_per_batch.unwrap_or(usize::MAX) { + pub fn ids(&self) -> impl Iterator + '_ { + match self { + Processing::DocumentAdditions(v) => ProcessingIter::Many(v.iter()), + Processing::IndexUpdate(id) | Processing::Dump(id) => ProcessingIter::Single(Some(*id)), + Processing::Nothing => ProcessingIter::Single(None), + } + } + + pub fn len(&self) -> usize { + match self { + Processing::DocumentAdditions(v) => v.len(), + Processing::IndexUpdate(_) | Processing::Dump(_) => 1, + Processing::Nothing => 0, + } + } + + pub fn is_empty(&self) -> bool { + self.len() == 0 + } +} + +fn make_batch(tasks: &mut TaskQueue, config: &SchedulerConfig) -> Processing { + let mut doc_count = 0; + tasks + .head_mut(|list| match list.peek().copied() { + Some(PendingTask { + kind: TaskType::IndexUpdate, + id, + }) => { + list.pop(); + Processing::IndexUpdate(id) + } + Some(PendingTask { + kind: TaskType::Dump, + id, + }) => { + list.pop(); + Processing::Dump(id) + } + Some(PendingTask { kind, .. }) => { + let mut task_list = Vec::new(); + loop { + match list.peek() { + Some(pending) if pending.kind == kind => { + // We always need to process at least one task for the scheduler to make progress. + if task_list.len() >= config.max_batch_size.unwrap_or(usize::MAX).max(1) + { break; } + let pending = list.pop().unwrap(); + task_list.push(pending.id); + + // We add the number of documents to the count if we are scheduling document additions and + // stop adding if we already have enough. + // + // We check that bound only after adding the current task to the batch, so that a batch contains at least one task. + match pending.kind { + TaskType::DocumentUpdate { number } + | TaskType::DocumentAddition { number } => { + doc_count += number; + + if doc_count + >= config.max_documents_per_batch.unwrap_or(usize::MAX) + { + break; + } + } + _ => (), + } } - _ => (), + _ => break, } } - _ => break, + Processing::DocumentAdditions(task_list) } - }, - None => (), - }); + None => Processing::Nothing, + }) + .unwrap_or(Processing::Nothing) } #[cfg(test)] diff --git a/meilisearch-lib/src/tasks/task.rs b/meilisearch-lib/src/tasks/task.rs index c20d2151b..cb5ba671a 100644 --- a/meilisearch-lib/src/tasks/task.rs +++ b/meilisearch-lib/src/tasks/task.rs @@ -4,14 +4,12 @@ use meilisearch_error::ResponseError; use milli::update::{DocumentAdditionResult, IndexDocumentsMethod}; use serde::{Deserialize, Serialize}; use time::OffsetDateTime; -use tokio::sync::oneshot; use uuid::Uuid; use super::batch::BatchId; use crate::{ index::{Settings, Unchecked}, - index_resolver::{error::IndexResolverError, IndexUid}, - snapshot::SnapshotJob, + index_resolver::IndexUid, }; pub type TaskId = u64; @@ -110,33 +108,6 @@ impl Task { } } -/// A job is like a volatile priority `Task`. -/// It should be processed as fast as possible and is not stored on disk. -/// This means, when Meilisearch is closed all your unprocessed jobs will disappear. -#[derive(Debug, derivative::Derivative)] -#[derivative(PartialEq)] -pub enum Job { - Dump { - #[derivative(PartialEq = "ignore")] - ret: oneshot::Sender, IndexResolverError>>, - path: PathBuf, - }, - Snapshot(#[derivative(PartialEq = "ignore")] SnapshotJob), - Empty, -} - -impl Default for Job { - fn default() -> Self { - Self::Empty - } -} - -impl Job { - pub fn take(&mut self) -> Self { - std::mem::take(self) - } -} - #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] #[cfg_attr(test, derive(proptest_derive::Arbitrary))] pub enum DocumentDeletion { diff --git a/meilisearch-lib/src/tasks/task_store/mod.rs b/meilisearch-lib/src/tasks/task_store/mod.rs index bde0f6360..f580c8e26 100644 --- a/meilisearch-lib/src/tasks/task_store/mod.rs +++ b/meilisearch-lib/src/tasks/task_store/mod.rs @@ -9,7 +9,9 @@ use log::debug; use milli::heed::{Env, RwTxn}; use time::OffsetDateTime; +use super::batch::BatchContent; use super::error::TaskError; +use super::scheduler::Processing; use super::task::{Task, TaskContent, TaskId}; use super::Result; use crate::index_resolver::IndexUid; @@ -122,19 +124,44 @@ impl TaskStore { } } - pub async fn get_pending_tasks(&self, ids: Vec) -> Result<(Vec, Vec)> { + /// This methods takes a `Processing` which contains the next task ids to process, and returns + /// the coresponding tasks along with the ownership to the passed processing. + /// + /// We need get_processing_tasks to take ownership over `Processing` because we need it to be + /// valid for 'static. + pub async fn get_processing_tasks( + &self, + processing: Processing, + ) -> Result<(Processing, BatchContent)> { let store = self.store.clone(); let tasks = tokio::task::spawn_blocking(move || -> Result<_> { - let mut tasks = Vec::new(); let txn = store.rtxn()?; - for id in ids.iter() { - let task = store - .get(&txn, *id)? - .ok_or(TaskError::UnexistingTask(*id))?; - tasks.push(task); - } - Ok((ids, tasks)) + let content = match processing { + Processing::DocumentAdditions(ref ids) => { + let mut tasks = Vec::new(); + + for id in ids.iter() { + let task = store + .get(&txn, *id)? + .ok_or(TaskError::UnexistingTask(*id))?; + tasks.push(task); + } + BatchContent::DocumentAddtitionBatch(tasks) + } + Processing::IndexUpdate(id) => { + let task = store.get(&txn, id)?.ok_or(TaskError::UnexistingTask(id))?; + BatchContent::IndexUpdate(task) + } + Processing::Dump(id) => { + let task = store.get(&txn, id)?.ok_or(TaskError::UnexistingTask(id))?; + debug_assert!(matches!(task.content, TaskContent::Dump { .. })); + BatchContent::Dump(task) + } + Processing::Nothing => unreachable!(), + }; + + Ok((processing, content)) }) .await??; @@ -231,7 +258,7 @@ impl TaskStore { #[cfg(test)] pub mod test { - use crate::tasks::task_store::store::test::tmp_env; + use crate::tasks::{scheduler::Processing, task_store::store::test::tmp_env}; use super::*; @@ -280,12 +307,12 @@ pub mod test { } } - pub async fn get_pending_tasks( + pub async fn get_processing_tasks( &self, - tasks: Vec, - ) -> Result<(Vec, Vec)> { + tasks: Processing, + ) -> Result<(Processing, BatchContent)> { match self { - Self::Real(s) => s.get_pending_tasks(tasks).await, + Self::Real(s) => s.get_processing_tasks(tasks).await, Self::Mock(m) => unsafe { m.get("get_pending_task").call(tasks) }, } } diff --git a/meilisearch-lib/src/tasks/update_loop.rs b/meilisearch-lib/src/tasks/update_loop.rs index b09811721..01e88755a 100644 --- a/meilisearch-lib/src/tasks/update_loop.rs +++ b/meilisearch-lib/src/tasks/update_loop.rs @@ -7,33 +7,29 @@ use tokio::time::interval_at; use super::batch::Batch; use super::error::Result; -use super::scheduler::Pending; -use super::{Scheduler, TaskPerformer}; +use super::{BatchHandler, Scheduler}; use crate::tasks::task::TaskEvent; /// The update loop sequentially performs batches of updates by asking the scheduler for a batch, /// and handing it to the `TaskPerformer`. -pub struct UpdateLoop { +pub struct UpdateLoop { scheduler: Arc>, - performer: Arc

, + performers: Vec>, notifier: Option>, debounce_duration: Option, } -impl

UpdateLoop

-where - P: TaskPerformer + Send + Sync + 'static, -{ +impl UpdateLoop { pub fn new( scheduler: Arc>, - performer: Arc

, + performers: Vec>, debuf_duration: Option, notifier: watch::Receiver<()>, ) -> Self { Self { scheduler, - performer, + performers, debounce_duration: debuf_duration, notifier: Some(notifier), } @@ -59,34 +55,29 @@ where } async fn process_next_batch(&self) -> Result<()> { - let pending = { self.scheduler.write().await.prepare().await? }; - match pending { - Pending::Batch(mut batch) => { - for task in &mut batch.tasks { - task.events - .push(TaskEvent::Processing(OffsetDateTime::now_utc())); - } + let mut batch = { self.scheduler.write().await.prepare().await? }; + let performer = self + .performers + .iter() + .find(|p| p.accept(&batch)) + .expect("No performer found for batch") + .clone(); - batch.tasks = { - self.scheduler - .read() - .await - .update_tasks(batch.tasks) - .await? - }; + batch + .content + .push_event(TaskEvent::Processing(OffsetDateTime::now_utc())); - let performer = self.performer.clone(); + batch.content = { + self.scheduler + .read() + .await + .update_tasks(batch.content) + .await? + }; - let batch = performer.process_batch(batch).await; + let batch = performer.process_batch(batch).await; - self.handle_batch_result(batch).await?; - } - Pending::Job(job) => { - let performer = self.performer.clone(); - performer.process_job(job).await; - } - Pending::Nothing => (), - } + self.handle_batch_result(batch, performer).await?; Ok(()) } @@ -96,13 +87,17 @@ where /// When a task is processed, the result of the process is pushed to its event list. The /// `handle_batch_result` make sure that the new state is saved to the store. /// The tasks are then removed from the processing queue. - async fn handle_batch_result(&self, mut batch: Batch) -> Result<()> { + async fn handle_batch_result( + &self, + mut batch: Batch, + performer: Arc, + ) -> Result<()> { let mut scheduler = self.scheduler.write().await; - let tasks = scheduler.update_tasks(batch.tasks).await?; + let content = scheduler.update_tasks(batch.content).await?; scheduler.finish(); drop(scheduler); - batch.tasks = tasks; - self.performer.finish(&batch).await; + batch.content = content; + performer.finish(&batch).await; Ok(()) } }