dump meta

This commit is contained in:
Marin Postma 2021-05-27 10:51:19 +02:00
parent b924e897f1
commit c47369839b
No known key found for this signature in database
GPG Key ID: D5241F0C0C865F30
3 changed files with 82 additions and 54 deletions

View File

@ -1,17 +1,17 @@
use super::{DumpError, DumpInfo, DumpMsg, DumpResult, DumpStatus}; use super::{DumpError, DumpInfo, DumpMsg, DumpResult, DumpStatus};
use crate::helpers::compression; use crate::{helpers::compression, index_controller::dump_actor::Metadata};
use crate::index_controller::{update_actor, uuid_resolver}; use crate::index_controller::{update_actor, uuid_resolver};
use async_stream::stream; use async_stream::stream;
use chrono::Utc; use chrono::Utc;
use futures::stream::StreamExt; use futures::stream::StreamExt;
use log::{error, info}; use log::{error, info};
use std::{ use update_actor::UpdateActorHandle;
path::{Path, PathBuf}, use uuid_resolver::UuidResolverHandle;
sync::Arc, use std::{fs::File, path::{Path, PathBuf}, sync::Arc};
};
use tokio::{fs::create_dir_all, sync::{mpsc, oneshot, RwLock}}; use tokio::{fs::create_dir_all, sync::{mpsc, oneshot, RwLock}};
pub const CONCURRENT_DUMP_MSG: usize = 10; pub const CONCURRENT_DUMP_MSG: usize = 10;
const META_FILE_NAME: &'static str = "metadata.json";
pub struct DumpActor<UuidResolver, Update> { pub struct DumpActor<UuidResolver, Update> {
inbox: Option<mpsc::Receiver<DumpMsg>>, inbox: Option<mpsc::Receiver<DumpMsg>>,
@ -19,8 +19,8 @@ pub struct DumpActor<UuidResolver, Update> {
update: Update, update: Update,
dump_path: PathBuf, dump_path: PathBuf,
dump_info: Arc<RwLock<Option<DumpInfo>>>, dump_info: Arc<RwLock<Option<DumpInfo>>>,
_update_db_size: u64, update_db_size: u64,
_index_db_size: u64, index_db_size: u64,
} }
/// Generate uid from creation date /// Generate uid from creation date
@ -30,16 +30,16 @@ fn generate_uid() -> String {
impl<UuidResolver, Update> DumpActor<UuidResolver, Update> impl<UuidResolver, Update> DumpActor<UuidResolver, Update>
where where
UuidResolver: uuid_resolver::UuidResolverHandle + Send + Sync + Clone + 'static, UuidResolver: UuidResolverHandle + Send + Sync + Clone + 'static,
Update: update_actor::UpdateActorHandle + Send + Sync + Clone + 'static, Update: UpdateActorHandle + Send + Sync + Clone + 'static,
{ {
pub fn new( pub fn new(
inbox: mpsc::Receiver<DumpMsg>, inbox: mpsc::Receiver<DumpMsg>,
uuid_resolver: UuidResolver, uuid_resolver: UuidResolver,
update: Update, update: Update,
dump_path: impl AsRef<Path>, dump_path: impl AsRef<Path>,
_index_db_size: u64, index_db_size: u64,
_update_db_size: u64, update_db_size: u64,
) -> Self { ) -> Self {
Self { Self {
inbox: Some(inbox), inbox: Some(inbox),
@ -47,8 +47,8 @@ where
update, update,
dump_path: dump_path.as_ref().into(), dump_path: dump_path.as_ref().into(),
dump_info: Arc::new(RwLock::new(None)), dump_info: Arc::new(RwLock::new(None)),
_index_db_size, index_db_size,
_update_db_size, update_db_size,
} }
} }
@ -103,13 +103,16 @@ where
let dump_info = self.dump_info.clone(); let dump_info = self.dump_info.clone();
let task_result = tokio::task::spawn(perform_dump( let task = DumpTask {
self.dump_path.clone(), path: self.dump_path.clone(),
self.uuid_resolver.clone(), uuid_resolver: self.uuid_resolver.clone(),
self.update.clone(), update_handle: self.update.clone(),
uid.clone(), uid: uid.clone(),
)) update_db_size: self.update_db_size,
.await; index_db_size: self.index_db_size,
};
let task_result = tokio::task::spawn(task.run()).await;
match task_result { match task_result {
Ok(Ok(())) => { Ok(Ok(())) => {
@ -152,35 +155,46 @@ where
}) })
) )
} }
} }
async fn perform_dump<UuidResolver, Update>( struct DumpTask<U, P> {
path: PathBuf, path: PathBuf,
uuid_resolver: UuidResolver, uuid_resolver: U,
update_handle: Update, update_handle: P,
uid: String, uid: String,
) -> anyhow::Result<()> update_db_size: u64,
index_db_size: u64,
}
impl<U, P> DumpTask<U, P>
where where
UuidResolver: uuid_resolver::UuidResolverHandle + Send + Sync + Clone + 'static, U: UuidResolverHandle + Send + Sync + Clone + 'static,
Update: update_actor::UpdateActorHandle + Send + Sync + Clone + 'static, P: UpdateActorHandle + Send + Sync + Clone + 'static,
{ {
async fn run(self) -> anyhow::Result<()> {
info!("Performing dump."); info!("Performing dump.");
create_dir_all(&path).await?; create_dir_all(&self.path).await?;
let path_clone = path.clone(); let path_clone = self.path.clone();
let temp_dump_dir = tokio::task::spawn_blocking(|| tempfile::TempDir::new_in(path_clone)).await??; let temp_dump_dir = tokio::task::spawn_blocking(|| tempfile::TempDir::new_in(path_clone)).await??;
let temp_dump_path = temp_dump_dir.path().to_owned(); let temp_dump_path = temp_dump_dir.path().to_owned();
let uuids = uuid_resolver.dump(temp_dump_path.clone()).await?; let meta = Metadata::new_v2(self.index_db_size, self.update_db_size);
let meta_path = temp_dump_path.join(META_FILE_NAME);
let mut meta_file = File::create(&meta_path)?;
serde_json::to_writer(&mut meta_file, &meta)?;
update_handle.dump(uuids, temp_dump_path.clone()).await?; let uuids = self.uuid_resolver.dump(temp_dump_path.clone()).await?;
self.update_handle.dump(uuids, temp_dump_path.clone()).await?;
let dump_path = tokio::task::spawn_blocking(move || -> anyhow::Result<PathBuf> { let dump_path = tokio::task::spawn_blocking(move || -> anyhow::Result<PathBuf> {
let temp_dump_file = tempfile::NamedTempFile::new_in(&path)?; let temp_dump_file = tempfile::NamedTempFile::new_in(&self.path)?;
compression::to_tar_gz(temp_dump_path, temp_dump_file.path())?; compression::to_tar_gz(temp_dump_path, temp_dump_file.path())?;
let dump_path = path.join(format!("{}.dump", uid)); let dump_path = self.path.join(format!("{}.dump", self.uid));
temp_dump_file.persist(&dump_path)?; temp_dump_file.persist(&dump_path)?;
Ok(dump_path) Ok(dump_path)
@ -190,4 +204,5 @@ where
info!("Created dump in {:?}.", dump_path); info!("Created dump in {:?}.", dump_path);
Ok(()) Ok(())
}
} }

View File

@ -16,6 +16,15 @@ pub struct MetadataV2 {
} }
impl MetadataV2 { impl MetadataV2 {
pub fn new(index_db_size: u64, update_db_size: u64) -> Self {
Self {
db_version: env!("CARGO_PKG_VERSION").to_string(),
index_db_size,
update_db_size,
dump_date: Utc::now(),
}
}
pub fn load_dump( pub fn load_dump(
self, self,
src: impl AsRef<Path>, src: impl AsRef<Path>,

View File

@ -62,6 +62,10 @@ pub enum Metadata {
} }
impl Metadata { impl Metadata {
pub fn new_v2(index_db_size: u64, update_db_size: u64) -> Self {
let meta = MetadataV2::new(index_db_size, update_db_size);
Self::V2 { meta }
}
/// Extract Metadata from `metadata.json` file present at provided `dir_path` /// Extract Metadata from `metadata.json` file present at provided `dir_path`
fn from_path(dir_path: &Path) -> anyhow::Result<Self> { fn from_path(dir_path: &Path) -> anyhow::Result<Self> {
let path = dir_path.join("metadata.json"); let path = dir_path.join("metadata.json");