meilisearch/meilisearch-http/src/index_controller/snapshot.rs

87 lines
2.4 KiB
Rust
Raw Normal View History

2021-03-17 18:53:23 +08:00
use std::path::PathBuf;
use std::time::Duration;
2021-03-22 23:51:53 +08:00
use anyhow::bail;
2021-03-23 02:19:37 +08:00
use log::{error, info};
2021-03-22 23:51:53 +08:00
use tokio::fs;
use tokio::task::spawn_blocking;
2021-03-23 02:19:37 +08:00
use tokio::time::sleep;
2021-03-17 18:53:23 +08:00
2021-03-22 23:51:53 +08:00
use crate::helpers::compression;
2021-03-17 18:53:23 +08:00
use super::index_actor::IndexActorHandle;
use super::update_actor::UpdateActorHandle;
use super::uuid_resolver::UuidResolverHandle;
#[allow(dead_code)]
pub struct SnapshotService<B> {
index_handle: IndexActorHandle,
uuid_resolver_handle: UuidResolverHandle,
update_handle: UpdateActorHandle<B>,
snapshot_period: Duration,
snapshot_path: PathBuf,
}
impl<B> SnapshotService<B> {
pub fn new(
index_handle: IndexActorHandle,
uuid_resolver_handle: UuidResolverHandle,
update_handle: UpdateActorHandle<B>,
snapshot_period: Duration,
snapshot_path: PathBuf,
) -> Self {
Self {
index_handle,
uuid_resolver_handle,
update_handle,
snapshot_period,
snapshot_path,
}
}
pub async fn run(self) {
loop {
2021-03-23 02:19:37 +08:00
sleep(self.snapshot_period).await;
if let Err(e) = self.perform_snapshot().await {
error!("{}", e);
}
2021-03-17 18:53:23 +08:00
}
}
2021-03-20 03:08:00 +08:00
async fn perform_snapshot(&self) -> anyhow::Result<()> {
2021-03-22 23:51:53 +08:00
if self.snapshot_path.file_name().is_none() {
bail!("invalid snapshot file path");
}
let temp_snapshot_dir = spawn_blocking(move || tempfile::tempdir_in(".")).await??;
let temp_snapshot_path = temp_snapshot_dir.path().to_owned();
fs::create_dir_all(&temp_snapshot_path).await?;
2021-03-20 03:08:00 +08:00
let uuids = self.uuid_resolver_handle.snapshot(temp_snapshot_path.clone()).await?;
2021-03-23 02:19:37 +08:00
if uuids.is_empty() {
return Ok(())
}
2021-03-23 02:59:19 +08:00
let tasks = uuids
.iter()
.map(|&uuid| self.update_handle.snapshot(uuid, temp_snapshot_path.clone()))
.collect::<Vec<_>>();
futures::future::try_join_all(tasks).await?;
2021-03-22 23:51:53 +08:00
let temp_snapshot_file = temp_snapshot_path.with_extension("temp");
let temp_snapshot_file_clone = temp_snapshot_file.clone();
let temp_snapshot_path_clone = temp_snapshot_path.clone();
spawn_blocking(move || compression::to_tar_gz(temp_snapshot_path_clone, temp_snapshot_file_clone)).await??;
fs::rename(temp_snapshot_file, &self.snapshot_path).await?;
2021-03-23 02:19:37 +08:00
info!("Created snapshot in {:?}.", self.snapshot_path);
2021-03-20 03:08:00 +08:00
Ok(())
2021-03-17 18:53:23 +08:00
}
}