2021-03-23 23:37:46 +08:00
|
|
|
use std::path::{Path, PathBuf};
|
2021-03-17 18:53:23 +08:00
|
|
|
use std::time::Duration;
|
|
|
|
|
2021-03-22 23:51:53 +08:00
|
|
|
use anyhow::bail;
|
2021-03-23 02:19:37 +08:00
|
|
|
use log::{error, info};
|
2021-03-22 23:51:53 +08:00
|
|
|
use tokio::fs;
|
|
|
|
use tokio::task::spawn_blocking;
|
2021-03-23 02:19:37 +08:00
|
|
|
use tokio::time::sleep;
|
2021-03-17 18:53:23 +08:00
|
|
|
|
|
|
|
use super::update_actor::UpdateActorHandle;
|
|
|
|
use super::uuid_resolver::UuidResolverHandle;
|
2021-03-23 23:19:01 +08:00
|
|
|
use crate::helpers::compression;
|
2021-03-17 18:53:23 +08:00
|
|
|
|
|
|
|
#[allow(dead_code)]
|
2021-03-23 18:00:50 +08:00
|
|
|
pub struct SnapshotService<U, R> {
|
|
|
|
uuid_resolver_handle: R,
|
|
|
|
update_handle: U,
|
2021-03-17 18:53:23 +08:00
|
|
|
snapshot_period: Duration,
|
|
|
|
snapshot_path: PathBuf,
|
|
|
|
}
|
|
|
|
|
2021-03-23 18:00:50 +08:00
|
|
|
impl<U, R> SnapshotService<U, R>
|
|
|
|
where
|
|
|
|
U: UpdateActorHandle,
|
2021-03-23 23:19:01 +08:00
|
|
|
R: UuidResolverHandle,
|
2021-03-23 18:00:50 +08:00
|
|
|
{
|
2021-03-17 18:53:23 +08:00
|
|
|
pub fn new(
|
2021-03-23 18:00:50 +08:00
|
|
|
uuid_resolver_handle: R,
|
|
|
|
update_handle: U,
|
2021-03-17 18:53:23 +08:00
|
|
|
snapshot_period: Duration,
|
|
|
|
snapshot_path: PathBuf,
|
|
|
|
) -> Self {
|
|
|
|
Self {
|
|
|
|
uuid_resolver_handle,
|
|
|
|
update_handle,
|
|
|
|
snapshot_period,
|
|
|
|
snapshot_path,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
pub async fn run(self) {
|
|
|
|
loop {
|
2021-03-23 02:19:37 +08:00
|
|
|
sleep(self.snapshot_period).await;
|
|
|
|
if let Err(e) = self.perform_snapshot().await {
|
|
|
|
error!("{}", e);
|
|
|
|
}
|
2021-03-17 18:53:23 +08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-03-20 03:08:00 +08:00
|
|
|
async fn perform_snapshot(&self) -> anyhow::Result<()> {
|
2021-03-23 23:19:01 +08:00
|
|
|
if !self.snapshot_path.is_file() {
|
2021-03-24 00:26:18 +08:00
|
|
|
bail!("Invalid snapshot file path.");
|
2021-03-22 23:51:53 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
let temp_snapshot_dir = spawn_blocking(move || tempfile::tempdir_in(".")).await??;
|
|
|
|
let temp_snapshot_path = temp_snapshot_dir.path().to_owned();
|
|
|
|
|
|
|
|
fs::create_dir_all(&temp_snapshot_path).await?;
|
|
|
|
|
2021-03-23 23:19:01 +08:00
|
|
|
let uuids = self
|
|
|
|
.uuid_resolver_handle
|
|
|
|
.snapshot(temp_snapshot_path.clone())
|
|
|
|
.await?;
|
2021-03-23 02:19:37 +08:00
|
|
|
|
|
|
|
if uuids.is_empty() {
|
2021-03-23 23:19:01 +08:00
|
|
|
return Ok(());
|
2021-03-23 02:19:37 +08:00
|
|
|
}
|
|
|
|
|
2021-03-23 02:59:19 +08:00
|
|
|
let tasks = uuids
|
|
|
|
.iter()
|
2021-03-23 23:19:01 +08:00
|
|
|
.map(|&uuid| {
|
|
|
|
self.update_handle
|
|
|
|
.snapshot(uuid, temp_snapshot_path.clone())
|
|
|
|
})
|
2021-03-23 02:59:19 +08:00
|
|
|
.collect::<Vec<_>>();
|
|
|
|
|
|
|
|
futures::future::try_join_all(tasks).await?;
|
2021-03-22 23:51:53 +08:00
|
|
|
|
|
|
|
let temp_snapshot_file = temp_snapshot_path.with_extension("temp");
|
|
|
|
|
|
|
|
let temp_snapshot_file_clone = temp_snapshot_file.clone();
|
|
|
|
let temp_snapshot_path_clone = temp_snapshot_path.clone();
|
2021-03-23 23:19:01 +08:00
|
|
|
spawn_blocking(move || {
|
|
|
|
compression::to_tar_gz(temp_snapshot_path_clone, temp_snapshot_file_clone)
|
|
|
|
})
|
|
|
|
.await??;
|
2021-03-22 23:51:53 +08:00
|
|
|
|
|
|
|
fs::rename(temp_snapshot_file, &self.snapshot_path).await?;
|
|
|
|
|
2021-03-23 02:19:37 +08:00
|
|
|
info!("Created snapshot in {:?}.", self.snapshot_path);
|
|
|
|
|
2021-03-20 03:08:00 +08:00
|
|
|
Ok(())
|
2021-03-17 18:53:23 +08:00
|
|
|
}
|
|
|
|
}
|
2021-03-23 23:19:01 +08:00
|
|
|
|
2021-03-23 23:37:46 +08:00
|
|
|
pub fn load_snapshot(
|
|
|
|
db_path: impl AsRef<Path>,
|
|
|
|
snapshot_path: impl AsRef<Path>,
|
|
|
|
ignore_snapshot_if_db_exists: bool,
|
|
|
|
ignore_missing_snapshot: bool,
|
|
|
|
) -> anyhow::Result<()> {
|
|
|
|
if !db_path.as_ref().exists() && snapshot_path.as_ref().exists() {
|
|
|
|
compression::from_tar_gz(snapshot_path, db_path)
|
|
|
|
} else if db_path.as_ref().exists() && !ignore_snapshot_if_db_exists {
|
|
|
|
bail!(
|
|
|
|
"database already exists at {:?}, try to delete it or rename it",
|
|
|
|
db_path
|
|
|
|
.as_ref()
|
|
|
|
.canonicalize()
|
|
|
|
.unwrap_or(db_path.as_ref().to_owned())
|
|
|
|
)
|
|
|
|
} else if !snapshot_path.as_ref().exists() && !ignore_missing_snapshot {
|
|
|
|
bail!(
|
|
|
|
"snapshot doesn't exist at {:?}",
|
|
|
|
snapshot_path
|
|
|
|
.as_ref()
|
|
|
|
.canonicalize()
|
|
|
|
.unwrap_or(snapshot_path.as_ref().to_owned())
|
|
|
|
)
|
|
|
|
} else {
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-03-23 23:19:01 +08:00
|
|
|
#[cfg(test)]
|
|
|
|
mod test {
|
2021-03-23 23:37:46 +08:00
|
|
|
use futures::future::{err, ok};
|
2021-03-23 23:19:01 +08:00
|
|
|
use rand::Rng;
|
|
|
|
use tokio::time::timeout;
|
|
|
|
use uuid::Uuid;
|
|
|
|
|
|
|
|
use super::*;
|
|
|
|
use crate::index_controller::update_actor::{MockUpdateActorHandle, UpdateError};
|
|
|
|
use crate::index_controller::uuid_resolver::{MockUuidResolverHandle, UuidError};
|
|
|
|
|
|
|
|
#[actix_rt::test]
|
|
|
|
async fn test_normal() {
|
|
|
|
let mut rng = rand::thread_rng();
|
|
|
|
let uuids_num = rng.gen_range(5, 10);
|
|
|
|
let uuids = (0..uuids_num).map(|_| Uuid::new_v4()).collect::<Vec<_>>();
|
|
|
|
|
|
|
|
let mut uuid_resolver = MockUuidResolverHandle::new();
|
|
|
|
let uuids_clone = uuids.clone();
|
|
|
|
uuid_resolver
|
|
|
|
.expect_snapshot()
|
|
|
|
.times(1)
|
|
|
|
.returning(move |_| Box::pin(ok(uuids_clone.clone())));
|
|
|
|
|
|
|
|
let mut update_handle = MockUpdateActorHandle::new();
|
|
|
|
let uuids_clone = uuids.clone();
|
|
|
|
update_handle
|
|
|
|
.expect_snapshot()
|
|
|
|
.withf(move |uuid, _path| uuids_clone.contains(uuid))
|
|
|
|
.times(uuids_num)
|
|
|
|
.returning(move |_, _| Box::pin(ok(())));
|
|
|
|
|
|
|
|
let snapshot_path = tempfile::NamedTempFile::new_in(".").unwrap();
|
|
|
|
let snapshot_service = SnapshotService::new(
|
|
|
|
uuid_resolver,
|
|
|
|
update_handle,
|
|
|
|
Duration::from_millis(100),
|
|
|
|
snapshot_path.path().to_owned(),
|
|
|
|
);
|
|
|
|
|
|
|
|
snapshot_service.perform_snapshot().await.unwrap();
|
|
|
|
}
|
|
|
|
|
|
|
|
#[actix_rt::test]
|
|
|
|
async fn bad_file_name() {
|
|
|
|
let uuid_resolver = MockUuidResolverHandle::new();
|
|
|
|
let update_handle = MockUpdateActorHandle::new();
|
|
|
|
|
|
|
|
let snapshot_service = SnapshotService::new(
|
|
|
|
uuid_resolver,
|
|
|
|
update_handle,
|
|
|
|
Duration::from_millis(100),
|
|
|
|
"directory/".into(),
|
|
|
|
);
|
|
|
|
|
|
|
|
assert!(snapshot_service.perform_snapshot().await.is_err());
|
|
|
|
}
|
|
|
|
|
|
|
|
#[actix_rt::test]
|
|
|
|
async fn error_performing_uuid_snapshot() {
|
|
|
|
let mut uuid_resolver = MockUuidResolverHandle::new();
|
|
|
|
uuid_resolver
|
|
|
|
.expect_snapshot()
|
|
|
|
.times(1)
|
|
|
|
// abitrary error
|
|
|
|
.returning(|_| Box::pin(err(UuidError::NameAlreadyExist)));
|
|
|
|
|
|
|
|
let update_handle = MockUpdateActorHandle::new();
|
|
|
|
|
|
|
|
let snapshot_path = tempfile::NamedTempFile::new_in(".").unwrap();
|
|
|
|
let snapshot_service = SnapshotService::new(
|
|
|
|
uuid_resolver,
|
|
|
|
update_handle,
|
|
|
|
Duration::from_millis(100),
|
|
|
|
snapshot_path.path().to_owned(),
|
|
|
|
);
|
|
|
|
|
|
|
|
assert!(snapshot_service.perform_snapshot().await.is_err());
|
|
|
|
// Nothing was written to the file
|
|
|
|
assert_eq!(snapshot_path.as_file().metadata().unwrap().len(), 0);
|
|
|
|
}
|
|
|
|
|
|
|
|
#[actix_rt::test]
|
|
|
|
async fn error_performing_index_snapshot() {
|
|
|
|
let uuid = Uuid::new_v4();
|
|
|
|
let mut uuid_resolver = MockUuidResolverHandle::new();
|
|
|
|
uuid_resolver
|
|
|
|
.expect_snapshot()
|
|
|
|
.times(1)
|
|
|
|
.returning(move |_| Box::pin(ok(vec![uuid])));
|
|
|
|
|
|
|
|
let mut update_handle = MockUpdateActorHandle::new();
|
|
|
|
update_handle
|
|
|
|
.expect_snapshot()
|
|
|
|
// abitrary error
|
|
|
|
.returning(|_, _| Box::pin(err(UpdateError::UnexistingUpdate(0))));
|
|
|
|
|
|
|
|
let snapshot_path = tempfile::NamedTempFile::new_in(".").unwrap();
|
|
|
|
let snapshot_service = SnapshotService::new(
|
|
|
|
uuid_resolver,
|
|
|
|
update_handle,
|
|
|
|
Duration::from_millis(100),
|
|
|
|
snapshot_path.path().to_owned(),
|
|
|
|
);
|
|
|
|
|
|
|
|
assert!(snapshot_service.perform_snapshot().await.is_err());
|
|
|
|
// Nothing was written to the file
|
|
|
|
assert_eq!(snapshot_path.as_file().metadata().unwrap().len(), 0);
|
|
|
|
}
|
|
|
|
|
|
|
|
#[actix_rt::test]
|
|
|
|
async fn test_loop() {
|
|
|
|
let mut uuid_resolver = MockUuidResolverHandle::new();
|
|
|
|
uuid_resolver
|
|
|
|
.expect_snapshot()
|
|
|
|
// we expect the funtion to be called between 2 and 3 time in the given interval.
|
|
|
|
.times(2..4)
|
|
|
|
// abitrary error, to short-circuit the function
|
|
|
|
.returning(move |_| Box::pin(err(UuidError::NameAlreadyExist)));
|
|
|
|
|
|
|
|
let update_handle = MockUpdateActorHandle::new();
|
|
|
|
|
|
|
|
let snapshot_path = tempfile::NamedTempFile::new_in(".").unwrap();
|
|
|
|
let snapshot_service = SnapshotService::new(
|
|
|
|
uuid_resolver,
|
|
|
|
update_handle,
|
|
|
|
Duration::from_millis(100),
|
|
|
|
snapshot_path.path().to_owned(),
|
|
|
|
);
|
|
|
|
|
|
|
|
let _ = timeout(Duration::from_millis(300), snapshot_service.run()).await;
|
|
|
|
}
|
|
|
|
}
|