snapshot batch handler

This commit is contained in:
ad hoc 2022-05-23 16:30:06 +02:00
parent 8743d73973
commit 7b47e4e87a
No known key found for this signature in database
GPG Key ID: 4F00A782990CC643
6 changed files with 39 additions and 7 deletions

View File

@ -27,7 +27,9 @@ use crate::options::{IndexerOpts, SchedulerConfig};
use crate::snapshot::{load_snapshot, SnapshotService}; use crate::snapshot::{load_snapshot, SnapshotService};
use crate::tasks::error::TaskError; use crate::tasks::error::TaskError;
use crate::tasks::task::{DocumentDeletion, Task, TaskContent, TaskId}; use crate::tasks::task::{DocumentDeletion, Task, TaskContent, TaskId};
use crate::tasks::{BatchHandler, EmptyBatchHandler, Scheduler, TaskFilter, TaskStore}; use crate::tasks::{
BatchHandler, EmptyBatchHandler, Scheduler, SnapshotHandler, TaskFilter, TaskStore,
};
use error::Result; use error::Result;
use self::error::IndexControllerError; use self::error::IndexControllerError;
@ -235,6 +237,7 @@ impl IndexControllerBuilder {
let handlers: Vec<Arc<dyn BatchHandler + Sync + Send + 'static>> = vec![ let handlers: Vec<Arc<dyn BatchHandler + Sync + Send + 'static>> = vec![
index_resolver.clone(), index_resolver.clone(),
dump_handler, dump_handler,
Arc::new(SnapshotHandler),
// dummy handler to catch all empty batches // dummy handler to catch all empty batches
Arc::new(EmptyBatchHandler), Arc::new(EmptyBatchHandler),
]; ];

View File

@ -38,7 +38,7 @@ impl SnapshotService {
meta_env_size: self.meta_env_size, meta_env_size: self.meta_env_size,
index_size: self.index_size, index_size: self.index_size,
}; };
self.scheduler.write().await.register_snapshot(snapshot_job); self.scheduler.write().await.schedule_snapshot(snapshot_job);
sleep(self.snapshot_period).await; sleep(self.snapshot_period).await;
} }
} }

View File

@ -1,3 +1,4 @@
pub mod dump_handler; pub mod dump_handler;
pub mod empty_handler; pub mod empty_handler;
mod index_resolver_handler; mod index_resolver_handler;
pub mod snapshot_handler;

View File

@ -0,0 +1,31 @@
use crate::tasks::batch::{Batch, BatchContent};
use crate::tasks::BatchHandler;
pub struct SnapshotHandler;
#[async_trait::async_trait]
impl BatchHandler for SnapshotHandler {
fn accept(&self, batch: &Batch) -> bool {
match batch.content {
BatchContent::Snapshot(_) => true,
_ => false,
}
}
async fn process_batch(&self, batch: Batch) -> Batch {
match batch.content {
BatchContent::Snapshot(job) => {
if let Err(e) = job.run().await {
log::error!("snapshot error: {e}");
}
}
_ => unreachable!(),
}
Batch::empty()
}
async fn finish(&self, _: &Batch) {
()
}
}

View File

@ -1,6 +1,7 @@
use async_trait::async_trait; use async_trait::async_trait;
pub use batch_handlers::empty_handler::EmptyBatchHandler; pub use batch_handlers::empty_handler::EmptyBatchHandler;
pub use batch_handlers::snapshot_handler::SnapshotHandler;
pub use scheduler::Scheduler; pub use scheduler::Scheduler;
pub use task_store::TaskFilter; pub use task_store::TaskFilter;

View File

@ -279,10 +279,6 @@ impl Scheduler {
self.tasks.insert(task); self.tasks.insert(task);
} }
pub fn register_snapshot(&mut self, job: SnapshotJob) {
self.snapshots.push_back(job);
}
/// Clears the processing list, this method should be called when the processing of a batch is finished. /// Clears the processing list, this method should be called when the processing of a batch is finished.
pub fn finish(&mut self) { pub fn finish(&mut self) {
self.processing = Processing::Nothing; self.processing = Processing::Nothing;
@ -340,7 +336,7 @@ impl Scheduler {
Ok(tasks) Ok(tasks)
} }
pub async fn schedule_snapshot(&mut self, job: SnapshotJob) { pub fn schedule_snapshot(&mut self, job: SnapshotJob) {
self.snapshots.push_back(job); self.snapshots.push_back(job);
self.notify(); self.notify();
} }