atomic snapshot import

This commit is contained in:
mpostma 2021-03-25 14:48:51 +01:00
parent 7d6ec7f3d3
commit d7c077cffb
No known key found for this signature in database
GPG Key ID: CBC8A7C1D7A28C3A

View File

@ -3,9 +3,9 @@ use std::time::Duration;
use anyhow::bail; use anyhow::bail;
use log::{error, info}; use log::{error, info};
use tokio::fs;
use tokio::task::spawn_blocking; use tokio::task::spawn_blocking;
use tokio::time::sleep; use tokio::time::sleep;
use tokio::fs;
use super::update_actor::UpdateActorHandle; use super::update_actor::UpdateActorHandle;
use super::uuid_resolver::UuidResolverHandle; use super::uuid_resolver::UuidResolverHandle;
@ -59,7 +59,8 @@ where
let snapshot_dir = self.snapshot_path.clone(); let snapshot_dir = self.snapshot_path.clone();
fs::create_dir_all(&snapshot_dir).await?; fs::create_dir_all(&snapshot_dir).await?;
let temp_snapshot_dir = spawn_blocking(move || tempfile::tempdir_in(snapshot_dir)).await??; let temp_snapshot_dir =
spawn_blocking(move || tempfile::tempdir_in(snapshot_dir)).await??;
let temp_snapshot_path = temp_snapshot_dir.path().to_owned(); let temp_snapshot_path = temp_snapshot_dir.path().to_owned();
let uuids = self let uuids = self
@ -82,7 +83,9 @@ where
futures::future::try_join_all(tasks).await?; futures::future::try_join_all(tasks).await?;
let snapshot_dir = self.snapshot_path.clone(); let snapshot_dir = self.snapshot_path.clone();
let snapshot_path = self.snapshot_path.join(format!("{}.snapshot", self.db_name)); let snapshot_path = self
.snapshot_path
.join(format!("{}.snapshot", self.db_name));
let snapshot_path = spawn_blocking(move || -> anyhow::Result<PathBuf> { let snapshot_path = spawn_blocking(move || -> anyhow::Result<PathBuf> {
let temp_snapshot_file = tempfile::NamedTempFile::new_in(snapshot_dir)?; let temp_snapshot_file = tempfile::NamedTempFile::new_in(snapshot_dir)?;
let temp_snapshot_file_path = temp_snapshot_file.path().to_owned(); let temp_snapshot_file_path = temp_snapshot_file.path().to_owned();
@ -105,7 +108,14 @@ pub fn load_snapshot(
ignore_missing_snapshot: bool, ignore_missing_snapshot: bool,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
if !db_path.as_ref().exists() && snapshot_path.as_ref().exists() { if !db_path.as_ref().exists() && snapshot_path.as_ref().exists() {
compression::from_tar_gz(snapshot_path, db_path) match compression::from_tar_gz(snapshot_path, &db_path) {
Ok(()) => Ok(()),
Err(e) => {
// clean created db folder
std::fs::remove_dir_all(&db_path)?;
Err(e)
}
}
} else if db_path.as_ref().exists() && !ignore_snapshot_if_db_exists { } else if db_path.as_ref().exists() && !ignore_snapshot_if_db_exists {
bail!( bail!(
"database already exists at {:?}, try to delete it or rename it", "database already exists at {:?}, try to delete it or rename it",