make scheduler accept multiple batch handlers

This commit is contained in:
ad hoc 2022-05-19 12:43:46 +02:00
parent 6a0231cb28
commit 46cdc17701
No known key found for this signature in database
GPG Key ID: 4F00A782990CC643
28 changed files with 484 additions and 374 deletions

View File

@ -44,7 +44,7 @@ impl From<TaskContent> for TaskType {
TaskContent::IndexDeletion => TaskType::IndexDeletion, TaskContent::IndexDeletion => TaskType::IndexDeletion,
TaskContent::IndexCreation { .. } => TaskType::IndexCreation, TaskContent::IndexCreation { .. } => TaskType::IndexCreation,
TaskContent::IndexUpdate { .. } => TaskType::IndexUpdate, TaskContent::IndexUpdate { .. } => TaskType::IndexUpdate,
TaskContent::Dump { path } => TaskType::Dump, TaskContent::Dump { .. } => TaskType::Dump,
_ => unreachable!("unexpected task type"), _ => unreachable!("unexpected task type"),
} }
} }

View File

@ -5,8 +5,8 @@ use std::path::{Path, PathBuf};
use serde_json::{Deserializer, Value}; use serde_json::{Deserializer, Value};
use tempfile::NamedTempFile; use tempfile::NamedTempFile;
use crate::index_controller::dump_actor::compat::{self, v2, v3}; use crate::dump::compat::{self, v2, v3};
use crate::index_controller::dump_actor::Metadata; use crate::dump::Metadata;
use crate::options::IndexerOpts; use crate::options::IndexerOpts;
/// The dump v2 reads the dump folder and patches all the needed file to make it compatible with a /// The dump v2 reads the dump folder and patches all the needed file to make it compatible with a

View File

@ -9,8 +9,8 @@ use log::info;
use tempfile::tempdir; use tempfile::tempdir;
use uuid::Uuid; use uuid::Uuid;
use crate::index_controller::dump_actor::compat::v3; use crate::dump::compat::v3;
use crate::index_controller::dump_actor::Metadata; use crate::dump::Metadata;
use crate::index_resolver::meta_store::{DumpEntry, IndexMeta}; use crate::index_resolver::meta_store::{DumpEntry, IndexMeta};
use crate::options::IndexerOpts; use crate::options::IndexerOpts;
use crate::tasks::task::{Task, TaskId}; use crate::tasks::task::{Task, TaskId};

View File

@ -6,7 +6,7 @@ use meilisearch_auth::AuthController;
use milli::heed::EnvOpenOptions; use milli::heed::EnvOpenOptions;
use crate::analytics; use crate::analytics;
use crate::index_controller::dump_actor::Metadata; use crate::dump::Metadata;
use crate::index_resolver::IndexResolver; use crate::index_resolver::IndexResolver;
use crate::options::IndexerOpts; use crate::options::IndexerOpts;
use crate::tasks::TaskStore; use crate::tasks::TaskStore;

View File

@ -3,28 +3,24 @@ use std::path::{Path, PathBuf};
use std::sync::Arc; use std::sync::Arc;
use anyhow::bail; use anyhow::bail;
use log::{info, trace}; use log::info;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use time::OffsetDateTime; use time::OffsetDateTime;
pub use actor::DumpActor; pub use actor::DumpActor;
pub use handle_impl::*; pub use handle_impl::*;
use meilisearch_auth::AuthController;
pub use message::DumpMsg; pub use message::DumpMsg;
use tempfile::TempDir; use tempfile::TempDir;
use tokio::fs::create_dir_all; use tokio::sync::RwLock;
use tokio::sync::{oneshot, RwLock};
use crate::analytics; use crate::compression::from_tar_gz;
use crate::compression::{from_tar_gz, to_tar_gz};
use crate::index_controller::dump_actor::error::DumpActorError;
use crate::index_controller::dump_actor::loaders::{v2, v3, v4};
use crate::options::IndexerOpts; use crate::options::IndexerOpts;
use crate::tasks::task::Job;
use crate::tasks::Scheduler; use crate::tasks::Scheduler;
use crate::update_file_store::UpdateFileStore; use crate::update_file_store::UpdateFileStore;
use error::Result; use error::Result;
use self::loaders::{v2, v3, v4};
mod actor; mod actor;
mod compat; mod compat;
pub mod error; pub mod error;
@ -316,7 +312,7 @@ fn persist_dump(dst_path: impl AsRef<Path>, tmp_dst: TempDir) -> anyhow::Result<
Ok(()) Ok(())
} }
struct DumpJob { pub struct DumpJob {
dump_path: PathBuf, dump_path: PathBuf,
db_path: PathBuf, db_path: PathBuf,
update_file_store: UpdateFileStore, update_file_store: UpdateFileStore,
@ -328,65 +324,65 @@ struct DumpJob {
impl DumpJob { impl DumpJob {
async fn run(self) -> Result<()> { async fn run(self) -> Result<()> {
trace!("Performing dump."); // trace!("Performing dump.");
//
create_dir_all(&self.dump_path).await?; // create_dir_all(&self.dump_path).await?;
//
let temp_dump_dir = tokio::task::spawn_blocking(tempfile::TempDir::new).await??; // let temp_dump_dir = tokio::task::spawn_blocking(tempfile::TempDir::new).await??;
let temp_dump_path = temp_dump_dir.path().to_owned(); // let temp_dump_path = temp_dump_dir.path().to_owned();
//
let meta = MetadataVersion::new_v4(self.index_db_size, self.update_db_size); // let meta = MetadataVersion::new_v4(self.index_db_size, self.update_db_size);
let meta_path = temp_dump_path.join(META_FILE_NAME); // let meta_path = temp_dump_path.join(META_FILE_NAME);
let mut meta_file = File::create(&meta_path)?; // let mut meta_file = File::create(&meta_path)?;
serde_json::to_writer(&mut meta_file, &meta)?; // serde_json::to_writer(&mut meta_file, &meta)?;
analytics::copy_user_id(&self.db_path, &temp_dump_path); // analytics::copy_user_id(&self.db_path, &temp_dump_path);
//
create_dir_all(&temp_dump_path.join("indexes")).await?; // create_dir_all(&temp_dump_path.join("indexes")).await?;
//
let (sender, receiver) = oneshot::channel(); // let (sender, receiver) = oneshot::channel();
//
self.scheduler // self.scheduler
.write() // .write()
.await // .await
.schedule_job(Job::Dump { // .schedule_job(Job::Dump {
ret: sender, // ret: sender,
path: temp_dump_path.clone(), // path: temp_dump_path.clone(),
}) // })
.await; // .await;
//
// wait until the job has started performing before finishing the dump process // // wait until the job has started performing before finishing the dump process
let sender = receiver.await??; // let sender = receiver.await??;
//
AuthController::dump(&self.db_path, &temp_dump_path)?; // AuthController::dump(&self.db_path, &temp_dump_path)?;
//
//TODO(marin): this is not right, the scheduler should dump itself, not do it here... // //TODO(marin): this is not right, the scheduler should dump itself, not do it here...
self.scheduler // self.scheduler
.read() // .read()
.await // .await
.dump(&temp_dump_path, self.update_file_store.clone()) // .dump(&temp_dump_path, self.update_file_store.clone())
.await?; // .await?;
//
let dump_path = tokio::task::spawn_blocking(move || -> Result<PathBuf> { // let dump_path = tokio::task::spawn_blocking(move || -> Result<PathBuf> {
// for now we simply copy the updates/updates_files // // for now we simply copy the updates/updates_files
// FIXME: We may copy more files than necessary, if new files are added while we are // // FIXME: We may copy more files than necessary, if new files are added while we are
// performing the dump. We need a way to filter them out. // // performing the dump. We need a way to filter them out.
//
let temp_dump_file = tempfile::NamedTempFile::new_in(&self.dump_path)?; // let temp_dump_file = tempfile::NamedTempFile::new_in(&self.dump_path)?;
to_tar_gz(temp_dump_path, temp_dump_file.path()) // to_tar_gz(temp_dump_path, temp_dump_file.path())
.map_err(|e| DumpActorError::Internal(e.into()))?; // .map_err(|e| DumpActorError::Internal(e.into()))?;
//
let dump_path = self.dump_path.join(self.uid).with_extension("dump"); // let dump_path = self.dump_path.join(self.uid).with_extension("dump");
temp_dump_file.persist(&dump_path)?; // temp_dump_file.persist(&dump_path)?;
//
Ok(dump_path) // Ok(dump_path)
}) // })
.await??; // .await??;
//
// notify the update loop that we are finished performing the dump. // // notify the update loop that we are finished performing the dump.
let _ = sender.send(()); // let _ = sender.send(());
//
info!("Created dump in {:?}.", dump_path); // info!("Created dump in {:?}.", dump_path);
//
Ok(()) Ok(())
} }
} }
@ -401,7 +397,7 @@ mod test {
use crate::options::SchedulerConfig; use crate::options::SchedulerConfig;
use crate::tasks::error::Result as TaskResult; use crate::tasks::error::Result as TaskResult;
use crate::tasks::task::{Task, TaskId}; use crate::tasks::task::{Task, TaskId};
use crate::tasks::{MockTaskPerformer, TaskFilter, TaskStore}; use crate::tasks::{BatchHandler, TaskFilter, TaskStore};
use crate::update_file_store::UpdateFileStore; use crate::update_file_store::UpdateFileStore;
fn setup() { fn setup() {
@ -426,7 +422,7 @@ mod test {
let mocker = Mocker::default(); let mocker = Mocker::default();
let update_file_store = UpdateFileStore::mock(mocker); let update_file_store = UpdateFileStore::mock(mocker);
let mut performer = MockTaskPerformer::new(); let mut performer = BatchHandler::new();
performer performer
.expect_process_job() .expect_process_job()
.once() .once()
@ -480,7 +476,7 @@ mod test {
) )
.then(|_| Ok(Vec::new())); .then(|_| Ok(Vec::new()));
let task_store = TaskStore::mock(mocker); let task_store = TaskStore::mock(mocker);
let mut performer = MockTaskPerformer::new(); let mut performer = BatchHandler::new();
performer performer
.expect_process_job() .expect_process_job()
.once() .once()

View File

@ -6,11 +6,11 @@ use tokio::task::JoinError;
use super::DocumentAdditionFormat; use super::DocumentAdditionFormat;
use crate::document_formats::DocumentFormatError; use crate::document_formats::DocumentFormatError;
use crate::dump::error::DumpActorError;
use crate::index::error::IndexError; use crate::index::error::IndexError;
use crate::tasks::error::TaskError; use crate::tasks::error::TaskError;
use crate::update_file_store::UpdateFileStoreError; use crate::update_file_store::UpdateFileStoreError;
use super::dump_actor::error::DumpActorError;
use crate::index_resolver::error::IndexResolverError; use crate::index_resolver::error::IndexResolverError;
pub type Result<T> = std::result::Result<T, IndexControllerError>; pub type Result<T> = std::result::Result<T, IndexControllerError>;

View File

@ -19,25 +19,23 @@ use tokio::time::sleep;
use uuid::Uuid; use uuid::Uuid;
use crate::document_formats::{read_csv, read_json, read_ndjson}; use crate::document_formats::{read_csv, read_json, read_ndjson};
use crate::dump::{self, load_dump, DumpActor, DumpActorHandle, DumpActorHandleImpl, DumpInfo};
use crate::index::{ use crate::index::{
Checked, Document, IndexMeta, IndexStats, SearchQuery, SearchResult, Settings, Unchecked, Checked, Document, IndexMeta, IndexStats, SearchQuery, SearchResult, Settings, Unchecked,
}; };
use crate::index_controller::dump_actor::{load_dump, DumpActor, DumpActorHandleImpl};
use crate::options::{IndexerOpts, SchedulerConfig}; use crate::options::{IndexerOpts, SchedulerConfig};
use crate::snapshot::{load_snapshot, SnapshotService}; use crate::snapshot::{load_snapshot, SnapshotService};
use crate::tasks::error::TaskError; use crate::tasks::error::TaskError;
use crate::tasks::task::{DocumentDeletion, Task, TaskContent, TaskId}; use crate::tasks::task::{DocumentDeletion, Task, TaskContent, TaskId};
use crate::tasks::{Scheduler, TaskFilter, TaskStore}; use crate::tasks::{BatchHandler, EmptyBatchHandler, Scheduler, TaskFilter, TaskStore};
use error::Result; use error::Result;
use self::dump_actor::{DumpActorHandle, DumpInfo};
use self::error::IndexControllerError; use self::error::IndexControllerError;
use crate::index_resolver::index_store::{IndexStore, MapIndexStore}; use crate::index_resolver::index_store::{IndexStore, MapIndexStore};
use crate::index_resolver::meta_store::{HeedMetaStore, IndexMetaStore}; use crate::index_resolver::meta_store::{HeedMetaStore, IndexMetaStore};
use crate::index_resolver::{create_index_resolver, IndexResolver, IndexUid}; use crate::index_resolver::{create_index_resolver, IndexResolver, IndexUid};
use crate::update_file_store::UpdateFileStore; use crate::update_file_store::UpdateFileStore;
mod dump_actor;
pub mod error; pub mod error;
pub mod versioning; pub mod versioning;
@ -73,12 +71,12 @@ pub struct IndexSettings {
} }
pub struct IndexController<U, I> { pub struct IndexController<U, I> {
index_resolver: Arc<IndexResolver<U, I>>, pub index_resolver: Arc<IndexResolver<U, I>>,
scheduler: Arc<RwLock<Scheduler>>, scheduler: Arc<RwLock<Scheduler>>,
task_store: TaskStore, task_store: TaskStore,
dump_path: PathBuf, dump_path: PathBuf,
dump_handle: dump_actor::DumpActorHandleImpl, dump_handle: dump::DumpActorHandleImpl,
update_file_store: UpdateFileStore, pub update_file_store: UpdateFileStore,
} }
/// Need a custom implementation for clone because deriving require that U and I are clone. /// Need a custom implementation for clone because deriving require that U and I are clone.
@ -223,8 +221,9 @@ impl IndexControllerBuilder {
)?); )?);
let task_store = TaskStore::new(meta_env)?; let task_store = TaskStore::new(meta_env)?;
let scheduler = let handlers: Vec<Arc<dyn BatchHandler + Sync + Send + 'static>> =
Scheduler::new(task_store.clone(), index_resolver.clone(), scheduler_config)?; vec![index_resolver.clone(), Arc::new(EmptyBatchHandler)];
let scheduler = Scheduler::new(task_store.clone(), handlers, scheduler_config)?;
let dump_path = self let dump_path = self
.dump_dst .dump_dst

View File

@ -3,7 +3,7 @@ pub mod index_store;
pub mod meta_store; pub mod meta_store;
use std::convert::{TryFrom, TryInto}; use std::convert::{TryFrom, TryInto};
use std::path::{Path, PathBuf}; use std::path::Path;
use std::sync::Arc; use std::sync::Arc;
use error::{IndexResolverError, Result}; use error::{IndexResolverError, Result};
@ -14,15 +14,12 @@ use milli::heed::Env;
use milli::update::{DocumentDeletionResult, IndexerConfig}; use milli::update::{DocumentDeletionResult, IndexerConfig};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use time::OffsetDateTime; use time::OffsetDateTime;
use tokio::sync::oneshot;
use tokio::task::spawn_blocking; use tokio::task::spawn_blocking;
use uuid::Uuid; use uuid::Uuid;
use crate::index::{error::Result as IndexResult, Index}; use crate::index::{error::Result as IndexResult, Index};
use crate::options::IndexerOpts; use crate::options::IndexerOpts;
use crate::tasks::batch::Batch; use crate::tasks::task::{DocumentDeletion, Task, TaskContent, TaskEvent, TaskId, TaskResult};
use crate::tasks::task::{DocumentDeletion, Job, Task, TaskContent, TaskEvent, TaskId, TaskResult};
use crate::tasks::TaskPerformer;
use crate::update_file_store::UpdateFileStore; use crate::update_file_store::UpdateFileStore;
use self::meta_store::IndexMeta; use self::meta_store::IndexMeta;
@ -91,69 +88,10 @@ impl TryInto<IndexUid> for String {
} }
} }
#[async_trait::async_trait]
impl<U, I> TaskPerformer for IndexResolver<U, I>
where
U: IndexMetaStore + Send + Sync + 'static,
I: IndexStore + Send + Sync + 'static,
{
async fn process_batch(&self, mut batch: Batch) -> Batch {
// If a batch contains multiple tasks, then it must be a document addition batch
if let Some(Task {
content: TaskContent::DocumentAddition { .. },
..
}) = batch.tasks.first()
{
debug_assert!(batch.tasks.iter().all(|t| matches!(
t,
Task {
content: TaskContent::DocumentAddition { .. },
..
}
)));
self.process_document_addition_batch(batch).await
} else {
if let Some(task) = batch.tasks.first_mut() {
task.events
.push(TaskEvent::Processing(OffsetDateTime::now_utc()));
match self.process_task(task).await {
Ok(success) => {
task.events.push(TaskEvent::Succeded {
result: success,
timestamp: OffsetDateTime::now_utc(),
});
}
Err(err) => task.events.push(TaskEvent::Failed {
error: err.into(),
timestamp: OffsetDateTime::now_utc(),
}),
}
}
batch
}
}
async fn process_job(&self, job: Job) {
self.process_job(job).await;
}
async fn finish(&self, batch: &Batch) {
for task in &batch.tasks {
if let Some(content_uuid) = task.get_content_uuid() {
if let Err(e) = self.file_store.delete(content_uuid).await {
log::error!("error deleting update file: {}", e);
}
}
}
}
}
pub struct IndexResolver<U, I> { pub struct IndexResolver<U, I> {
index_uuid_store: U, index_uuid_store: U,
index_store: I, index_store: I,
file_store: UpdateFileStore, pub file_store: UpdateFileStore,
} }
impl IndexResolver<HeedMetaStore, MapIndexStore> { impl IndexResolver<HeedMetaStore, MapIndexStore> {
@ -189,7 +127,7 @@ where
} }
} }
async fn process_document_addition_batch(&self, mut batch: Batch) -> Batch { pub async fn process_document_addition_batch(&self, mut tasks: Vec<Task>) -> Vec<Task> {
fn get_content_uuid(task: &Task) -> Uuid { fn get_content_uuid(task: &Task) -> Uuid {
match task { match task {
Task { Task {
@ -200,9 +138,9 @@ where
} }
} }
let content_uuids = batch.tasks.iter().map(get_content_uuid).collect::<Vec<_>>(); let content_uuids = tasks.iter().map(get_content_uuid).collect::<Vec<_>>();
match batch.tasks.first() { match tasks.first() {
Some(Task { Some(Task {
index_uid: Some(ref index_uid), index_uid: Some(ref index_uid),
id, id,
@ -231,13 +169,13 @@ where
Ok(index) => index, Ok(index) => index,
Err(e) => { Err(e) => {
let error = ResponseError::from(e); let error = ResponseError::from(e);
for task in batch.tasks.iter_mut() { for task in tasks.iter_mut() {
task.events.push(TaskEvent::Failed { task.events.push(TaskEvent::Failed {
error: error.clone(), error: error.clone(),
timestamp: now, timestamp: now,
}); });
} }
return batch; return tasks;
} }
}; };
@ -269,17 +207,17 @@ where
}, },
}; };
for task in batch.tasks.iter_mut() { for task in tasks.iter_mut() {
task.events.push(event.clone()); task.events.push(event.clone());
} }
batch tasks
} }
_ => panic!("invalid batch!"), _ => panic!("invalid batch!"),
} }
} }
async fn process_task(&self, task: &Task) -> Result<TaskResult> { pub async fn process_task(&self, task: &Task) -> Result<TaskResult> {
let index_uid = task.index_uid.clone(); let index_uid = task.index_uid.clone();
match &task.content { match &task.content {
TaskContent::DocumentAddition { .. } => panic!("updates should be handled by batch"), TaskContent::DocumentAddition { .. } => panic!("updates should be handled by batch"),
@ -351,33 +289,7 @@ where
Ok(TaskResult::Other) Ok(TaskResult::Other)
} }
TaskContent::Dump { path } => self.perform_dump(path).await, _ => unreachable!("Invalid task for index resolver"),
}
}
async fn perform_dump(&self, path: &PathBuf) -> Result<TaskResult> {
todo!()
}
async fn process_job(&self, job: Job) {
match job {
Job::Dump { ret, path } => {
log::trace!("The Dump task is getting executed");
let (sender, receiver) = oneshot::channel();
if ret.send(self.dump(path).await.map(|_| sender)).is_err() {
log::error!("The dump actor died.");
}
// wait until the dump has finished performing.
let _ = receiver.await;
}
Job::Empty => log::error!("Tried to process an empty task."),
Job::Snapshot(job) => {
if let Err(e) = job.run().await {
log::error!("Error performing snapshot: {}", e);
}
}
} }
} }

View File

@ -3,6 +3,7 @@ pub mod error;
pub mod options; pub mod options;
mod analytics; mod analytics;
mod dump;
pub mod index; pub mod index;
pub mod index_controller; pub mod index_controller;
mod index_resolver; mod index_resolver;

View File

@ -14,7 +14,6 @@ use walkdir::WalkDir;
use crate::compression::from_tar_gz; use crate::compression::from_tar_gz;
use crate::index_controller::open_meta_env; use crate::index_controller::open_meta_env;
use crate::index_controller::versioning::VERSION_FILE_NAME; use crate::index_controller::versioning::VERSION_FILE_NAME;
use crate::tasks::task::Job;
use crate::tasks::Scheduler; use crate::tasks::Scheduler;
pub struct SnapshotService { pub struct SnapshotService {
@ -39,8 +38,7 @@ impl SnapshotService {
meta_env_size: self.meta_env_size, meta_env_size: self.meta_env_size,
index_size: self.index_size, index_size: self.index_size,
}; };
let job = Job::Snapshot(snapshot_job); self.scheduler.write().await.register_snapshot(snapshot_job);
self.scheduler.write().await.schedule_job(job).await;
sleep(self.snapshot_period).await; sleep(self.snapshot_period).await;
} }
} }

View File

@ -1,22 +1,75 @@
use time::OffsetDateTime; use time::OffsetDateTime;
use super::task::Task; use crate::snapshot::SnapshotJob;
use super::task::{Task, TaskEvent};
pub type BatchId = u64; pub type BatchId = u64;
/// The work carried by a `Batch`. Batch handlers inspect this value to decide whether they
/// accept a batch.
#[derive(Debug)]
pub enum BatchContent {
/// A group of tasks processed together.
/// NOTE(review): the variant name is misspelled ("Addtition"); it is kept because other
/// modules refer to it by this name.
DocumentAddtitionBatch(Vec<Task>),
/// A single task processed on its own.
/// NOTE(review): presumably covers every index-level task that is not a document addition
/// (deletion, settings, index creation/update) — confirm against the scheduler.
IndexUpdate(Task),
/// A single dump task.
Dump(Task),
/// A snapshot job. It wraps no `Task`: `first` yields `None` for it and `push_event` ignores it.
Snapshot(SnapshotJob),
/// Symbolizes an empty batch. This can occur when we were woken, but there wasn't any work to do.
Empty,
}
impl BatchContent {
pub fn first(&self) -> Option<&Task> {
match self {
BatchContent::DocumentAddtitionBatch(ts) => ts.first(),
BatchContent::Dump(t) | BatchContent::IndexUpdate(t) => Some(t),
BatchContent::Snapshot(_) | BatchContent::Empty => None,
}
}
pub fn push_event(&mut self, event: TaskEvent) {
match self {
BatchContent::DocumentAddtitionBatch(ts) => {
ts.iter_mut().for_each(|t| t.events.push(event.clone()))
}
BatchContent::IndexUpdate(t) | BatchContent::Dump(t) => t.events.push(event),
BatchContent::Snapshot(_) | BatchContent::Empty => (),
}
}
}
#[derive(Debug)] #[derive(Debug)]
pub struct Batch { pub struct Batch {
pub id: BatchId, // Only batches that contain persistent tasks are given an id. Snapshot batches don't have
// an id.
pub id: Option<BatchId>,
pub created_at: OffsetDateTime, pub created_at: OffsetDateTime,
pub tasks: Vec<Task>, pub content: BatchContent,
} }
impl Batch { impl Batch {
/// Creates a batch holding `content`, identified by `id`, and stamped with the current UTC
/// time as its creation date.
pub fn new(id: Option<BatchId>, content: BatchContent) -> Self {
    let created_at = OffsetDateTime::now_utc();
    Self { id, created_at, content }
}
pub fn len(&self) -> usize { pub fn len(&self) -> usize {
self.tasks.len() match self.content {
BatchContent::DocumentAddtitionBatch(ref ts) => ts.len(),
BatchContent::IndexUpdate(_) | BatchContent::Dump(_) | BatchContent::Snapshot(_) => 1,
BatchContent::Empty => 0,
}
} }
pub fn is_empty(&self) -> bool { pub fn is_empty(&self) -> bool {
self.tasks.is_empty() self.len() == 0
}
pub fn empty() -> Self {
Self {
id: None,
created_at: OffsetDateTime::now_utc(),
content: BatchContent::Empty,
}
} }
} }

View File

@ -0,0 +1,20 @@
use crate::tasks::batch::{Batch, BatchContent};
use crate::tasks::BatchHandler;
/// A sink handler for empty batches.
///
/// It accepts only batches whose content is `BatchContent::Empty` and hands them back
/// unchanged.
pub struct EmptyBatchHandler;

#[async_trait::async_trait]
impl BatchHandler for EmptyBatchHandler {
    /// Accepts a batch only when its content is `BatchContent::Empty`.
    fn accept(&self, batch: &Batch) -> bool {
        matches!(batch.content, BatchContent::Empty)
    }

    /// Returns the batch untouched: an empty batch carries no work.
    async fn process_batch(&self, batch: Batch) -> Batch {
        batch
    }

    /// Nothing to clean up after an empty batch.
    async fn finish(&self, _: &Batch) {}
}

View File

@ -0,0 +1,58 @@
use time::OffsetDateTime;
use crate::index_resolver::IndexResolver;
use crate::index_resolver::{index_store::IndexStore, meta_store::IndexMetaStore};
use crate::tasks::batch::{Batch, BatchContent};
use crate::tasks::task::TaskEvent;
use crate::tasks::BatchHandler;
/// Lets the index resolver act as the batch handler for document addition batches and
/// single index update tasks.
#[async_trait::async_trait]
impl<U, I> BatchHandler for IndexResolver<U, I>
where
    U: IndexMetaStore + Send + Sync + 'static,
    I: IndexStore + Send + Sync + 'static,
{
    /// Accepts document addition batches and index update batches; everything else is
    /// refused.
    fn accept(&self, batch: &Batch) -> bool {
        matches!(
            batch.content,
            BatchContent::DocumentAddtitionBatch(_) | BatchContent::IndexUpdate(_)
        )
    }

    /// Processes the batch and returns it with the outcome recorded on its tasks.
    ///
    /// # Panics
    ///
    /// Panics when given a batch that `accept` would refuse.
    async fn process_batch(&self, mut batch: Batch) -> Batch {
        match batch.content {
            BatchContent::DocumentAddtitionBatch(ref mut tasks) => {
                // The resolver takes the tasks by value, so move them out of the batch and
                // store the processed tasks back in their place.
                *tasks = self
                    .process_document_addition_batch(std::mem::take(tasks))
                    .await;
            }
            BatchContent::IndexUpdate(ref mut task) => match self.process_task(task).await {
                Ok(success) => {
                    task.events.push(TaskEvent::Succeded {
                        result: success,
                        timestamp: OffsetDateTime::now_utc(),
                    });
                }
                Err(err) => task.events.push(TaskEvent::Failed {
                    error: err.into(),
                    timestamp: OffsetDateTime::now_utc(),
                }),
            },
            _ => unreachable!("Invalid batch for index resolver"),
        }

        batch
    }

    /// Deletes the update files referenced by the tasks of a document addition batch.
    ///
    /// A deletion failure is logged and does not interrupt the cleanup of the other tasks.
    async fn finish(&self, batch: &Batch) {
        if let BatchContent::DocumentAddtitionBatch(ref tasks) = batch.content {
            for task in tasks {
                if let Some(content_uuid) = task.get_content_uuid() {
                    if let Err(e) = self.file_store.delete(content_uuid).await {
                        log::error!("error deleting update file: {}", e);
                    }
                }
            }
        }
    }
}

View File

@ -0,0 +1,2 @@
pub mod empty_handler;
mod index_resolver_handler;

View File

@ -1,5 +1,6 @@
use async_trait::async_trait; use async_trait::async_trait;
pub use batch_handlers::empty_handler::EmptyBatchHandler;
pub use scheduler::Scheduler; pub use scheduler::Scheduler;
pub use task_store::TaskFilter; pub use task_store::TaskFilter;
@ -11,9 +12,8 @@ pub use task_store::TaskStore;
use batch::Batch; use batch::Batch;
use error::Result; use error::Result;
use self::task::Job;
pub mod batch; pub mod batch;
mod batch_handlers;
pub mod error; pub mod error;
mod scheduler; mod scheduler;
pub mod task; pub mod task;
@ -22,12 +22,13 @@ pub mod update_loop;
#[cfg_attr(test, mockall::automock(type Error=test::DebugError;))] #[cfg_attr(test, mockall::automock(type Error=test::DebugError;))]
#[async_trait] #[async_trait]
pub trait TaskPerformer: Sync + Send + 'static { pub trait BatchHandler: Sync + Send + 'static {
/// return whether this handler can accept this batch
fn accept(&self, batch: &Batch) -> bool;
/// Processes the `Task` batch returning the batch with the `Task` updated. /// Processes the `Task` batch returning the batch with the `Task` updated.
async fn process_batch(&self, batch: Batch) -> Batch; async fn process_batch(&self, batch: Batch) -> Batch;
async fn process_job(&self, job: Job);
/// `finish` is called when the result of `process` has been committed to the task store. This /// `finish` is called when the result of `process` has been committed to the task store. This
/// method can be used to perform cleanup after the update has been completed for example. /// method can be used to perform cleanup after the update has been completed for example.
async fn finish(&self, batch: &Batch); async fn finish(&self, batch: &Batch);

View File

@ -2,6 +2,7 @@ use std::cmp::Ordering;
use std::collections::{hash_map::Entry, BinaryHeap, HashMap, VecDeque}; use std::collections::{hash_map::Entry, BinaryHeap, HashMap, VecDeque};
use std::ops::{Deref, DerefMut}; use std::ops::{Deref, DerefMut};
use std::path::Path; use std::path::Path;
use std::slice;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
@ -11,24 +12,21 @@ use time::OffsetDateTime;
use tokio::sync::{watch, RwLock}; use tokio::sync::{watch, RwLock};
use crate::options::SchedulerConfig; use crate::options::SchedulerConfig;
use crate::snapshot::SnapshotJob;
use crate::update_file_store::UpdateFileStore; use crate::update_file_store::UpdateFileStore;
use super::batch::Batch; use super::batch::{Batch, BatchContent};
use super::error::Result; use super::error::Result;
use super::task::{Job, Task, TaskContent, TaskEvent, TaskId}; use super::task::{Task, TaskContent, TaskEvent, TaskId};
use super::update_loop::UpdateLoop; use super::update_loop::UpdateLoop;
use super::{TaskFilter, TaskPerformer, TaskStore}; use super::{BatchHandler, TaskFilter, TaskStore};
#[derive(Eq, Debug, Clone, Copy)] #[derive(Eq, Debug, Clone, Copy)]
enum TaskType { enum TaskType {
DocumentAddition { DocumentAddition { number: usize },
number: usize, DocumentUpdate { number: usize },
}, IndexUpdate,
DocumentUpdate { Dump,
number: usize,
},
/// Any other kind of task, including Dumps
Other,
} }
/// Two tasks are equal if they have the same type. /// Two tasks are equal if they have the same type.
@ -166,7 +164,13 @@ impl TaskQueue {
} => TaskType::DocumentUpdate { } => TaskType::DocumentUpdate {
number: documents_count, number: documents_count,
}, },
_ => TaskType::Other, TaskContent::Dump { .. } => TaskType::Dump,
TaskContent::DocumentDeletion(_)
| TaskContent::SettingsUpdate { .. }
| TaskContent::IndexDeletion
| TaskContent::IndexCreation { .. }
| TaskContent::IndexUpdate { .. } => TaskType::IndexUpdate,
_ => unreachable!("unhandled task type"),
}; };
let task = PendingTask { kind, id }; let task = PendingTask { kind, id };
@ -217,11 +221,12 @@ impl TaskQueue {
} }
pub struct Scheduler { pub struct Scheduler {
jobs: VecDeque<Job>, // TODO: currently snapshots are non persistent tasks, and are treated differently.
snapshots: VecDeque<SnapshotJob>,
tasks: TaskQueue, tasks: TaskQueue,
store: TaskStore, store: TaskStore,
processing: Vec<TaskId>, processing: Processing,
next_fetched_task_id: TaskId, next_fetched_task_id: TaskId,
config: SchedulerConfig, config: SchedulerConfig,
/// Notifies the update loop that a new task was received /// Notifies the update loop that a new task was received
@ -229,14 +234,11 @@ pub struct Scheduler {
} }
impl Scheduler { impl Scheduler {
pub fn new<P>( pub fn new(
store: TaskStore, store: TaskStore,
performer: Arc<P>, performers: Vec<Arc<dyn BatchHandler + Sync + Send + 'static>>,
mut config: SchedulerConfig, mut config: SchedulerConfig,
) -> Result<Arc<RwLock<Self>>> ) -> Result<Arc<RwLock<Self>>> {
where
P: TaskPerformer,
{
let (notifier, rcv) = watch::channel(()); let (notifier, rcv) = watch::channel(());
let debounce_time = config.debounce_duration_sec; let debounce_time = config.debounce_duration_sec;
@ -247,11 +249,11 @@ impl Scheduler {
} }
let this = Self { let this = Self {
jobs: VecDeque::new(), snapshots: VecDeque::new(),
tasks: TaskQueue::default(), tasks: TaskQueue::default(),
store, store,
processing: Vec::new(), processing: Processing::Nothing,
next_fetched_task_id: 0, next_fetched_task_id: 0,
config, config,
notifier, notifier,
@ -264,7 +266,7 @@ impl Scheduler {
let update_loop = UpdateLoop::new( let update_loop = UpdateLoop::new(
this.clone(), this.clone(),
performer, performers,
debounce_time.filter(|&v| v > 0).map(Duration::from_secs), debounce_time.filter(|&v| v > 0).map(Duration::from_secs),
rcv, rcv,
); );
@ -283,9 +285,13 @@ impl Scheduler {
self.tasks.insert(task); self.tasks.insert(task);
} }
/// Queues a snapshot job and wakes the update loop so the job gets picked up.
///
/// Without the notification, a snapshot registered while the loop is idle would stay in
/// the queue until some unrelated task woke the loop.
pub fn register_snapshot(&mut self, job: SnapshotJob) {
    self.snapshots.push_back(job);
    self.notify();
}
/// Clears the processing list, this method should be called when the processing of a batch is finished. /// Clears the processing list, this method should be called when the processing of a batch is finished.
pub fn finish(&mut self) { pub fn finish(&mut self) {
self.processing.clear(); self.processing = Processing::Nothing;
} }
pub fn notify(&self) { pub fn notify(&self) {
@ -293,13 +299,27 @@ impl Scheduler {
} }
fn notify_if_not_empty(&self) { fn notify_if_not_empty(&self) {
if !self.jobs.is_empty() || !self.tasks.is_empty() { if !self.snapshots.is_empty() || !self.tasks.is_empty() {
self.notify(); self.notify();
} }
} }
pub async fn update_tasks(&self, tasks: Vec<Task>) -> Result<Vec<Task>> { pub async fn update_tasks(&self, content: BatchContent) -> Result<BatchContent> {
self.store.update_tasks(tasks).await match content {
BatchContent::DocumentAddtitionBatch(tasks) => {
let tasks = self.store.update_tasks(tasks).await?;
Ok(BatchContent::DocumentAddtitionBatch(tasks))
}
BatchContent::IndexUpdate(t) => {
let mut tasks = self.store.update_tasks(vec![t]).await?;
Ok(BatchContent::IndexUpdate(tasks.remove(0)))
}
BatchContent::Dump(t) => {
let mut tasks = self.store.update_tasks(vec![t]).await?;
Ok(BatchContent::Dump(tasks.remove(0)))
}
other => Ok(other),
}
} }
pub async fn get_task(&self, id: TaskId, filter: Option<TaskFilter>) -> Result<Task> { pub async fn get_task(&self, id: TaskId, filter: Option<TaskFilter>) -> Result<Task> {
@ -318,16 +338,16 @@ impl Scheduler {
pub async fn get_processing_tasks(&self) -> Result<Vec<Task>> { pub async fn get_processing_tasks(&self) -> Result<Vec<Task>> {
let mut tasks = Vec::new(); let mut tasks = Vec::new();
for id in self.processing.iter() { for id in self.processing.ids() {
let task = self.store.get_task(*id, None).await?; let task = self.store.get_task(id, None).await?;
tasks.push(task); tasks.push(task);
} }
Ok(tasks) Ok(tasks)
} }
pub async fn schedule_job(&mut self, job: Job) { pub async fn schedule_snapshot(&mut self, job: SnapshotJob) {
self.jobs.push_back(job); self.snapshots.push_back(job);
self.notify(); self.notify();
} }
@ -353,84 +373,136 @@ impl Scheduler {
} }
/// Prepare the next batch, and set `processing` to the ids in that batch. /// Prepare the next batch, and set `processing` to the ids in that batch.
///
/// Order of work, as visible below: a queued snapshot job is returned first (as a batch without
/// an id); otherwise pending tasks are fetched, `make_batch` selects the ids, the store loads the
/// matching tasks, a `TaskEvent::Batched` event is recorded, and the batch is returned. When
/// nothing was selected an empty batch is returned.
///
/// # Panics
/// Panics with "invalid batch" if the loaded content has no first task.
pub async fn prepare(&mut self) -> Result<Pending> { pub async fn prepare(&mut self) -> Result<Batch> {
// If there is a job to process, do it first. // If there is a job to process, do it first.
if let Some(job) = self.jobs.pop_front() { if let Some(job) = self.snapshots.pop_front() {
// There is more work to do, notify the update loop // There is more work to do, notify the update loop
self.notify_if_not_empty(); self.notify_if_not_empty();
return Ok(Pending::Job(job)); let batch = Batch::new(None, BatchContent::Snapshot(job));
return Ok(batch);
} }
// Try to fill the queue with pending tasks. // Try to fill the queue with pending tasks.
self.fetch_pending_tasks().await?; self.fetch_pending_tasks().await?;
// After this change `make_batch` returns the selection instead of filling a `Vec` in place.
make_batch(&mut self.tasks, &mut self.processing, &self.config); self.processing = make_batch(&mut self.tasks, &self.config);
log::debug!("prepared batch with {} tasks", self.processing.len()); log::debug!("prepared batch with {} tasks", self.processing.len());
if !self.processing.is_empty() { if !self.processing.is_nothing() {
// `std::mem::take` moves the selection into the store call and leaves the default value
// in `self.processing`; it is restored further down once the call succeeds.
// NOTE(review): if the store call fails, `?` returns before `self.processing` is restored.
let ids = std::mem::take(&mut self.processing); let (processing, mut content) = self
.store
.get_processing_tasks(std::mem::take(&mut self.processing))
.await?;
let (ids, mut tasks) = self.store.get_pending_tasks(ids).await?; // The batch id is the id of the first update it contains. At this point we must have a
// valid batch that contains at least 1 task.
// The batch id is the id of the first update it contains let id = match content.first() {
let id = match tasks.first() {
Some(Task { id, .. }) => *id, Some(Task { id, .. }) => *id,
_ => panic!("invalid batch"), _ => panic!("invalid batch"),
}; };
// Record on the batched task(s) which batch they were assigned to, and when.
tasks.iter_mut().for_each(|t| { content.push_event(TaskEvent::Batched {
t.events.push(TaskEvent::Batched {
batch_id: id, batch_id: id,
timestamp: OffsetDateTime::now_utc(), timestamp: OffsetDateTime::now_utc(),
})
}); });
// Put the ids back so they are reported as being processed.
self.processing = ids; self.processing = processing;
let batch = Batch { let batch = Batch::new(Some(id), content);
id,
created_at: OffsetDateTime::now_utc(),
tasks,
};
// There is more work to do, notify the update loop // There is more work to do, notify the update loop
self.notify_if_not_empty(); self.notify_if_not_empty();
Ok(Pending::Batch(batch)) Ok(batch)
} else { } else {
Ok(Pending::Nothing) Ok(Batch::empty())
} }
} }
} }
/// After this change: the task ids selected by `make_batch` for the batch being processed,
/// grouped by the kind of batch they form. (Before this change this position held `Pending`,
/// the value returned by `prepare`.)
#[derive(Debug)] #[derive(Debug, Default)]
pub enum Pending { pub enum Processing {
// `DocumentAdditions` holds every id of a multi-task batch.
Batch(Batch), DocumentAdditions(Vec<TaskId>),
// `IndexUpdate` and `Dump` each hold exactly one task id.
Job(Job), IndexUpdate(TaskId),
Dump(TaskId),
/// Variant used when there is nothing to process.
#[default]
Nothing, Nothing,
}
/// After this change: iterator state behind `Processing::ids`. `Many` borrows a slice iterator
/// over a list of ids; `Single` holds at most one id inline and yields it once.
/// (The left column is the start of the old `make_batch`, which now lives further down.)
fn make_batch(tasks: &mut TaskQueue, processing: &mut Vec<TaskId>, config: &SchedulerConfig) { enum ProcessingIter<'a> {
processing.clear(); Many(slice::Iter<'a, TaskId>),
Single(Option<TaskId>),
}
impl<'a> Iterator for ProcessingIter<'a> {
    type Item = TaskId;

    /// Yields the next task id by value, or `None` once every id has been produced.
    fn next(&mut self) -> Option<Self::Item> {
        match self {
            ProcessingIter::Many(iter) => iter.next().copied(),
            // `take` leaves `None` behind, so a single id is yielded exactly once.
            ProcessingIter::Single(val) => val.take(),
        }
    }

    /// Reports the exact number of ids left.
    ///
    /// The default implementation returns `(0, None)`, which prevents consumers such as
    /// `collect` from reserving capacity up front even though the length is always known here.
    fn size_hint(&self) -> (usize, Option<usize>) {
        match self {
            ProcessingIter::Many(iter) => iter.size_hint(),
            ProcessingIter::Single(val) => {
                let remaining = usize::from(val.is_some());
                (remaining, Some(remaining))
            }
        }
    }
}
impl Processing {
    /// Returns `true` only for the `Nothing` variant; an empty `DocumentAdditions` list is
    /// empty but is not "nothing".
    fn is_nothing(&self) -> bool {
        match self {
            Self::Nothing => true,
            Self::DocumentAdditions(_) | Self::IndexUpdate(_) | Self::Dump(_) => false,
        }
    }

    /// Iterates, by value, over every task id held by this selection.
    pub fn ids(&self) -> impl Iterator<Item = TaskId> + '_ {
        match self {
            Self::Nothing => ProcessingIter::Single(None),
            Self::IndexUpdate(task_id) | Self::Dump(task_id) => {
                ProcessingIter::Single(Some(*task_id))
            }
            Self::DocumentAdditions(task_ids) => ProcessingIter::Many(task_ids.iter()),
        }
    }

    /// Number of task ids held: the list length, `1` for a single-task variant, `0` for
    /// `Nothing`.
    pub fn len(&self) -> usize {
        match self {
            Self::Nothing => 0,
            Self::IndexUpdate(_) | Self::Dump(_) => 1,
            Self::DocumentAdditions(task_ids) => task_ids.len(),
        }
    }

    /// Returns `true` when no task id is held.
    pub fn is_empty(&self) -> bool {
        matches!(self.len(), 0)
    }
}
fn make_batch(tasks: &mut TaskQueue, config: &SchedulerConfig) -> Processing {
let mut doc_count = 0; let mut doc_count = 0;
tasks.head_mut(|list| match list.peek().copied() { tasks
.head_mut(|list| match list.peek().copied() {
Some(PendingTask { Some(PendingTask {
kind: TaskType::Other, kind: TaskType::IndexUpdate,
id, id,
}) => { }) => {
processing.push(id);
list.pop(); list.pop();
Processing::IndexUpdate(id)
} }
Some(PendingTask { kind, .. }) => loop { Some(PendingTask {
kind: TaskType::Dump,
id,
}) => {
list.pop();
Processing::Dump(id)
}
Some(PendingTask { kind, .. }) => {
let mut task_list = Vec::new();
loop {
match list.peek() { match list.peek() {
Some(pending) if pending.kind == kind => { Some(pending) if pending.kind == kind => {
// We always need to process at least one task for the scheduler to make progress. // We always need to process at least one task for the scheduler to make progress.
if processing.len() >= config.max_batch_size.unwrap_or(usize::MAX).max(1) { if task_list.len() >= config.max_batch_size.unwrap_or(usize::MAX).max(1)
{
break; break;
} }
let pending = list.pop().unwrap(); let pending = list.pop().unwrap();
processing.push(pending.id); task_list.push(pending.id);
// We add the number of documents to the count if we are scheduling document additions and // We add the number of documents to the count if we are scheduling document additions and
// stop adding if we already have enough. // stop adding if we already have enough.
@ -441,7 +513,9 @@ fn make_batch(tasks: &mut TaskQueue, processing: &mut Vec<TaskId>, config: &Sche
| TaskType::DocumentAddition { number } => { | TaskType::DocumentAddition { number } => {
doc_count += number; doc_count += number;
if doc_count >= config.max_documents_per_batch.unwrap_or(usize::MAX) { if doc_count
>= config.max_documents_per_batch.unwrap_or(usize::MAX)
{
break; break;
} }
} }
@ -450,9 +524,12 @@ fn make_batch(tasks: &mut TaskQueue, processing: &mut Vec<TaskId>, config: &Sche
} }
_ => break, _ => break,
} }
}, }
None => (), Processing::DocumentAdditions(task_list)
}); }
None => Processing::Nothing,
})
.unwrap_or(Processing::Nothing)
} }
#[cfg(test)] #[cfg(test)]

View File

@ -4,14 +4,12 @@ use meilisearch_error::ResponseError;
use milli::update::{DocumentAdditionResult, IndexDocumentsMethod}; use milli::update::{DocumentAdditionResult, IndexDocumentsMethod};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use time::OffsetDateTime; use time::OffsetDateTime;
use tokio::sync::oneshot;
use uuid::Uuid; use uuid::Uuid;
use super::batch::BatchId; use super::batch::BatchId;
use crate::{ use crate::{
index::{Settings, Unchecked}, index::{Settings, Unchecked},
index_resolver::{error::IndexResolverError, IndexUid}, index_resolver::IndexUid,
snapshot::SnapshotJob,
}; };
pub type TaskId = u64; pub type TaskId = u64;
@ -110,33 +108,6 @@ impl Task {
} }
} }
/// A job is like a volatile priority `Task`.
/// It should be processed as fast as possible and is not stored on disk.
/// This means, when Meilisearch is closed all your unprocessed jobs will disappear.
///
/// This definition is removed by the change shown here.
#[derive(Debug, derivative::Derivative)]
#[derivative(PartialEq)]
pub enum Job {
// A dump request: `path` takes part in equality, the `ret` channel is ignored by it.
Dump {
#[derivative(PartialEq = "ignore")]
ret: oneshot::Sender<Result<oneshot::Sender<()>, IndexResolverError>>,
path: PathBuf,
},
// A snapshot request; its payload is ignored when comparing jobs.
Snapshot(#[derivative(PartialEq = "ignore")] SnapshotJob),
// Placeholder variant; it is the `Default` value and what `take` leaves behind.
Empty,
}
impl Default for Job {
fn default() -> Self {
Self::Empty
}
}
impl Job {
    /// Moves the current job out of `self`, leaving the default job in its place.
    pub fn take(&mut self) -> Self {
        let placeholder = Self::default();
        std::mem::replace(self, placeholder)
    }
}
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[cfg_attr(test, derive(proptest_derive::Arbitrary))] #[cfg_attr(test, derive(proptest_derive::Arbitrary))]
pub enum DocumentDeletion { pub enum DocumentDeletion {

View File

@ -9,7 +9,9 @@ use log::debug;
use milli::heed::{Env, RwTxn}; use milli::heed::{Env, RwTxn};
use time::OffsetDateTime; use time::OffsetDateTime;
use super::batch::BatchContent;
use super::error::TaskError; use super::error::TaskError;
use super::scheduler::Processing;
use super::task::{Task, TaskContent, TaskId}; use super::task::{Task, TaskContent, TaskId};
use super::Result; use super::Result;
use crate::index_resolver::IndexUid; use crate::index_resolver::IndexUid;
@ -122,19 +124,44 @@ impl TaskStore {
} }
} }
pub async fn get_pending_tasks(&self, ids: Vec<TaskId>) -> Result<(Vec<TaskId>, Vec<Task>)> { /// This methods takes a `Processing` which contains the next task ids to process, and returns
/// the corresponding tasks along with the ownership to the passed processing.
///
/// We need get_processing_tasks to take ownership over `Processing` because we need it to be
/// valid for 'static.
pub async fn get_processing_tasks(
&self,
processing: Processing,
) -> Result<(Processing, BatchContent)> {
let store = self.store.clone(); let store = self.store.clone();
let tasks = tokio::task::spawn_blocking(move || -> Result<_> { let tasks = tokio::task::spawn_blocking(move || -> Result<_> {
let mut tasks = Vec::new();
let txn = store.rtxn()?; let txn = store.rtxn()?;
let content = match processing {
Processing::DocumentAdditions(ref ids) => {
let mut tasks = Vec::new();
for id in ids.iter() { for id in ids.iter() {
let task = store let task = store
.get(&txn, *id)? .get(&txn, *id)?
.ok_or(TaskError::UnexistingTask(*id))?; .ok_or(TaskError::UnexistingTask(*id))?;
tasks.push(task); tasks.push(task);
} }
Ok((ids, tasks)) BatchContent::DocumentAddtitionBatch(tasks)
}
Processing::IndexUpdate(id) => {
let task = store.get(&txn, id)?.ok_or(TaskError::UnexistingTask(id))?;
BatchContent::IndexUpdate(task)
}
Processing::Dump(id) => {
let task = store.get(&txn, id)?.ok_or(TaskError::UnexistingTask(id))?;
debug_assert!(matches!(task.content, TaskContent::Dump { .. }));
BatchContent::Dump(task)
}
Processing::Nothing => unreachable!(),
};
Ok((processing, content))
}) })
.await??; .await??;
@ -231,7 +258,7 @@ impl TaskStore {
#[cfg(test)] #[cfg(test)]
pub mod test { pub mod test {
use crate::tasks::task_store::store::test::tmp_env; use crate::tasks::{scheduler::Processing, task_store::store::test::tmp_env};
use super::*; use super::*;
@ -280,12 +307,12 @@ pub mod test {
} }
} }
/// Test-double dispatch: forwards to the real store when `self` is `Real`, otherwise calls the
/// handler registered on the mock.
// NOTE(review): the mock handler is still looked up under the old key "get_pending_task" even
// though the method is now `get_processing_tasks`; tests must register it under that key.
pub async fn get_pending_tasks( pub async fn get_processing_tasks(
&self, &self,
tasks: Vec<TaskId>, tasks: Processing,
) -> Result<(Vec<TaskId>, Vec<Task>)> { ) -> Result<(Processing, BatchContent)> {
match self { match self {
Self::Real(s) => s.get_pending_tasks(tasks).await, Self::Real(s) => s.get_processing_tasks(tasks).await,
Self::Mock(m) => unsafe { m.get("get_pending_task").call(tasks) }, Self::Mock(m) => unsafe { m.get("get_pending_task").call(tasks) },
} }
} }

View File

@ -7,33 +7,29 @@ use tokio::time::interval_at;
use super::batch::Batch; use super::batch::Batch;
use super::error::Result; use super::error::Result;
use super::scheduler::Pending; use super::{BatchHandler, Scheduler};
use super::{Scheduler, TaskPerformer};
use crate::tasks::task::TaskEvent; use crate::tasks::task::TaskEvent;
/// The update loop sequentially performs batches of updates by asking the scheduler for a batch, /// The update loop sequentially performs batches of updates by asking the scheduler for a batch,
/// and handing it to the `TaskPerformer`. /// and handing it to the `TaskPerformer`.
pub struct UpdateLoop<P: TaskPerformer> { pub struct UpdateLoop {
scheduler: Arc<RwLock<Scheduler>>, scheduler: Arc<RwLock<Scheduler>>,
// After this change: the candidate handlers. `process_next_batch` hands each batch to the
// first one whose `accept` returns true, so the order of this list matters.
// NOTE(review): the doc comment above still names the former `TaskPerformer` trait.
performer: Arc<P>, performers: Vec<Arc<dyn BatchHandler + Send + Sync + 'static>>,
notifier: Option<watch::Receiver<()>>, notifier: Option<watch::Receiver<()>>,
debounce_duration: Option<Duration>, debounce_duration: Option<Duration>,
} }
impl<P> UpdateLoop<P> impl UpdateLoop {
where
P: TaskPerformer + Send + Sync + 'static,
{
pub fn new( pub fn new(
scheduler: Arc<RwLock<Scheduler>>, scheduler: Arc<RwLock<Scheduler>>,
performer: Arc<P>, performers: Vec<Arc<dyn BatchHandler + Send + Sync + 'static>>,
debuf_duration: Option<Duration>, debuf_duration: Option<Duration>,
notifier: watch::Receiver<()>, notifier: watch::Receiver<()>,
) -> Self { ) -> Self {
Self { Self {
scheduler, scheduler,
performer, performers,
debounce_duration: debuf_duration, debounce_duration: debuf_duration,
notifier: Some(notifier), notifier: Some(notifier),
} }
@ -59,34 +55,29 @@ where
} }
/// Runs one iteration of the loop. After this change: asks the scheduler for the next batch,
/// picks the first handler in `performers` whose `accept` returns true, records a
/// `TaskEvent::Processing` event on the batch content, persists it through
/// `Scheduler::update_tasks`, lets the handler process the batch, and passes the result to
/// `handle_batch_result`.
///
/// # Panics
/// Panics with "No performer found for batch" when no handler accepts the batch.
async fn process_next_batch(&self) -> Result<()> { async fn process_next_batch(&self) -> Result<()> {
// The braces end the statement holding the scheduler write lock before the batch is handled.
let pending = { self.scheduler.write().await.prepare().await? }; let mut batch = { self.scheduler.write().await.prepare().await? };
match pending { let performer = self
Pending::Batch(mut batch) => { .performers
for task in &mut batch.tasks { .iter()
task.events .find(|p| p.accept(&batch))
.push(TaskEvent::Processing(OffsetDateTime::now_utc())); .expect("No performer found for batch")
} .clone();
batch.tasks = { batch
.content
.push_event(TaskEvent::Processing(OffsetDateTime::now_utc()));
batch.content = {
self.scheduler self.scheduler
.read() .read()
.await .await
.update_tasks(batch.tasks) .update_tasks(batch.content)
.await? .await?
}; };
let performer = self.performer.clone();
let batch = performer.process_batch(batch).await; let batch = performer.process_batch(batch).await;
// The same handler that processed the batch is handed on so it can be notified at the end.
self.handle_batch_result(batch).await?; self.handle_batch_result(batch, performer).await?;
}
Pending::Job(job) => {
let performer = self.performer.clone();
performer.process_job(job).await;
}
Pending::Nothing => (),
}
Ok(()) Ok(())
} }
@ -96,13 +87,17 @@ where
/// When a task is processed, the result of the process is pushed to its event list. The /// When a task is processed, the result of the process is pushed to its event list. The
/// `handle_batch_result` make sure that the new state is saved to the store. /// `handle_batch_result` make sure that the new state is saved to the store.
/// The tasks are then removed from the processing queue. /// The tasks are then removed from the processing queue.
async fn handle_batch_result(&self, mut batch: Batch) -> Result<()> { async fn handle_batch_result(
&self,
mut batch: Batch,
performer: Arc<dyn BatchHandler + Sync + Send + 'static>,
) -> Result<()> {
// Hold the scheduler write lock only while persisting the result and calling `finish`.
let mut scheduler = self.scheduler.write().await; let mut scheduler = self.scheduler.write().await;
let tasks = scheduler.update_tasks(batch.tasks).await?; let content = scheduler.update_tasks(batch.content).await?;
scheduler.finish(); scheduler.finish();
drop(scheduler); drop(scheduler);
// Put the stored version back on the batch before notifying the handler.
batch.tasks = tasks; batch.content = content;
// The lock was dropped above, so the handler's `finish` is awaited without holding it.
self.performer.finish(&batch).await; performer.finish(&batch).await;
Ok(()) Ok(())
} }
} }