2021-05-27 20:30:20 +08:00
|
|
|
use std::fs::File;
|
|
|
|
use std::path::{Path, PathBuf};
|
2021-05-11 02:25:09 +08:00
|
|
|
|
2021-05-30 21:55:17 +08:00
|
|
|
use chrono::{DateTime, Utc};
|
2021-06-15 03:26:35 +08:00
|
|
|
use log::{info, warn};
|
2021-05-11 02:20:36 +08:00
|
|
|
#[cfg(test)]
|
|
|
|
use mockall::automock;
|
2021-05-11 02:23:12 +08:00
|
|
|
use serde::{Deserialize, Serialize};
|
2021-05-31 22:40:59 +08:00
|
|
|
use tokio::fs::create_dir_all;
|
2021-04-28 22:43:49 +08:00
|
|
|
|
2021-05-27 02:42:09 +08:00
|
|
|
use loaders::v1::MetadataV1;
|
|
|
|
use loaders::v2::MetadataV2;
|
2021-05-11 02:25:09 +08:00
|
|
|
|
|
|
|
pub use actor::DumpActor;
|
2021-05-11 02:20:36 +08:00
|
|
|
pub use handle_impl::*;
|
2021-05-11 02:25:09 +08:00
|
|
|
pub use message::DumpMsg;
|
|
|
|
|
2021-05-27 20:30:20 +08:00
|
|
|
use super::{update_actor::UpdateActorHandle, uuid_resolver::UuidResolverHandle};
|
2021-06-15 03:26:35 +08:00
|
|
|
use crate::index_controller::dump_actor::error::DumpActorError;
|
2021-05-27 20:30:20 +08:00
|
|
|
use crate::{helpers::compression, option::IndexerOpts};
|
2021-06-15 03:26:35 +08:00
|
|
|
use error::Result;
|
2021-05-27 04:52:06 +08:00
|
|
|
|
|
|
|
mod actor;
|
|
|
|
mod handle_impl;
|
|
|
|
mod loaders;
|
|
|
|
mod message;
|
2021-06-15 03:26:35 +08:00
|
|
|
pub mod error;
|
2021-05-27 04:52:06 +08:00
|
|
|
|
2021-05-31 22:03:39 +08:00
|
|
|
/// File name of the dump metadata: written into the dump directory when a dump is created
/// and read back from the unpacked archive when a dump is loaded.
const META_FILE_NAME: &str = "metadata.json";
|
2021-05-27 20:30:20 +08:00
|
|
|
|
2021-05-11 02:25:09 +08:00
|
|
|
#[async_trait::async_trait]
|
|
|
|
#[cfg_attr(test, automock)]
|
|
|
|
pub trait DumpActorHandle {
|
|
|
|
/// Start the creation of a dump
|
|
|
|
/// Implementation: [handle_impl::DumpActorHandleImpl::create_dump]
|
2021-06-15 03:26:35 +08:00
|
|
|
async fn create_dump(&self) -> Result<DumpInfo>;
|
2021-05-11 02:25:09 +08:00
|
|
|
|
|
|
|
/// Return the status of an already created dump
|
|
|
|
/// Implementation: [handle_impl::DumpActorHandleImpl::dump_status]
|
2021-06-15 03:26:35 +08:00
|
|
|
async fn dump_info(&self, uid: String) -> Result<DumpInfo>;
|
2021-05-11 02:25:09 +08:00
|
|
|
}
|
|
|
|
|
2021-04-28 22:43:49 +08:00
|
|
|
#[derive(Debug, Serialize, Deserialize)]
|
2021-05-31 16:42:31 +08:00
|
|
|
#[serde(tag = "dumpVersion")]
|
2021-05-27 02:42:09 +08:00
|
|
|
pub enum Metadata {
|
2021-05-31 16:42:31 +08:00
|
|
|
V1(MetadataV1),
|
|
|
|
V2(MetadataV2),
|
2021-04-28 22:43:49 +08:00
|
|
|
}
|
|
|
|
|
2021-04-29 20:45:08 +08:00
|
|
|
impl Metadata {
|
2021-05-31 22:40:59 +08:00
|
|
|
pub fn new_v2(index_db_size: usize, update_db_size: usize) -> Self {
|
2021-05-27 16:51:19 +08:00
|
|
|
let meta = MetadataV2::new(index_db_size, update_db_size);
|
2021-05-31 16:42:31 +08:00
|
|
|
Self::V2(meta)
|
2021-05-27 16:51:19 +08:00
|
|
|
}
|
2021-04-28 22:43:49 +08:00
|
|
|
}
|
|
|
|
|
2021-05-11 02:25:09 +08:00
|
|
|
#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
|
|
|
|
#[serde(rename_all = "snake_case")]
|
|
|
|
pub enum DumpStatus {
|
|
|
|
Done,
|
|
|
|
InProgress,
|
|
|
|
Failed,
|
2021-04-28 22:43:49 +08:00
|
|
|
}
|
|
|
|
|
2021-05-11 02:25:09 +08:00
|
|
|
#[derive(Debug, Serialize, Clone)]
|
|
|
|
#[serde(rename_all = "camelCase")]
|
|
|
|
pub struct DumpInfo {
|
|
|
|
pub uid: String,
|
|
|
|
pub status: DumpStatus,
|
2021-05-25 16:48:57 +08:00
|
|
|
#[serde(skip_serializing_if = "Option::is_none")]
|
2021-05-24 23:33:42 +08:00
|
|
|
pub error: Option<String>,
|
2021-05-30 21:55:17 +08:00
|
|
|
started_at: DateTime<Utc>,
|
|
|
|
#[serde(skip_serializing_if = "Option::is_none")]
|
|
|
|
finished_at: Option<DateTime<Utc>>,
|
2021-05-11 02:25:09 +08:00
|
|
|
}
|
2021-04-28 22:43:49 +08:00
|
|
|
|
2021-05-11 02:25:09 +08:00
|
|
|
impl DumpInfo {
|
|
|
|
pub fn new(uid: String, status: DumpStatus) -> Self {
|
|
|
|
Self {
|
|
|
|
uid,
|
|
|
|
status,
|
|
|
|
error: None,
|
2021-05-30 21:55:17 +08:00
|
|
|
started_at: Utc::now(),
|
|
|
|
finished_at: None,
|
2021-05-11 02:25:09 +08:00
|
|
|
}
|
2021-05-05 20:11:56 +08:00
|
|
|
}
|
2021-04-28 22:43:49 +08:00
|
|
|
|
2021-05-11 02:25:09 +08:00
|
|
|
pub fn with_error(&mut self, error: String) {
|
|
|
|
self.status = DumpStatus::Failed;
|
2021-05-30 21:55:17 +08:00
|
|
|
self.finished_at = Some(Utc::now());
|
2021-05-24 23:33:42 +08:00
|
|
|
self.error = Some(error);
|
2021-05-05 20:11:56 +08:00
|
|
|
}
|
2021-04-28 22:43:49 +08:00
|
|
|
|
2021-05-11 02:25:09 +08:00
|
|
|
pub fn done(&mut self) {
|
2021-05-30 21:55:17 +08:00
|
|
|
self.finished_at = Some(Utc::now());
|
2021-05-11 02:25:09 +08:00
|
|
|
self.status = DumpStatus::Done;
|
|
|
|
}
|
2021-04-28 22:43:49 +08:00
|
|
|
|
2021-05-11 02:25:09 +08:00
|
|
|
pub fn dump_already_in_progress(&self) -> bool {
|
|
|
|
self.status == DumpStatus::InProgress
|
|
|
|
}
|
2021-04-28 22:43:49 +08:00
|
|
|
}
|
|
|
|
|
2021-05-27 20:30:20 +08:00
|
|
|
pub fn load_dump(
|
2021-05-27 02:42:09 +08:00
|
|
|
dst_path: impl AsRef<Path>,
|
|
|
|
src_path: impl AsRef<Path>,
|
2021-05-31 22:40:59 +08:00
|
|
|
index_db_size: usize,
|
|
|
|
update_db_size: usize,
|
2021-05-27 04:52:06 +08:00
|
|
|
indexer_opts: &IndexerOpts,
|
2021-06-15 03:26:35 +08:00
|
|
|
) -> std::result::Result<(), Box<dyn std::error::Error>> {
|
2021-05-27 20:30:20 +08:00
|
|
|
let tmp_src = tempfile::tempdir_in(".")?;
|
|
|
|
let tmp_src_path = tmp_src.path();
|
|
|
|
|
|
|
|
compression::from_tar_gz(&src_path, tmp_src_path)?;
|
|
|
|
|
|
|
|
let meta_path = tmp_src_path.join(META_FILE_NAME);
|
2021-05-27 02:42:09 +08:00
|
|
|
let mut meta_file = File::open(&meta_path)?;
|
|
|
|
let meta: Metadata = serde_json::from_reader(&mut meta_file)?;
|
2021-04-28 22:43:49 +08:00
|
|
|
|
2021-05-31 16:42:31 +08:00
|
|
|
let dst_dir = dst_path
|
|
|
|
.as_ref()
|
|
|
|
.parent()
|
2021-06-15 03:26:35 +08:00
|
|
|
// TODO
|
|
|
|
//.with_context(|| format!("Invalid db path: {}", dst_path.as_ref().display()))?;
|
|
|
|
.unwrap();
|
2021-05-31 16:42:31 +08:00
|
|
|
|
|
|
|
let tmp_dst = tempfile::tempdir_in(dst_dir)?;
|
|
|
|
|
2021-05-27 02:42:09 +08:00
|
|
|
match meta {
|
2021-05-31 22:03:39 +08:00
|
|
|
Metadata::V1(meta) => {
|
2021-05-31 22:40:59 +08:00
|
|
|
meta.load_dump(&tmp_src_path, tmp_dst.path(), index_db_size, indexer_opts)?
|
2021-05-31 22:03:39 +08:00
|
|
|
}
|
2021-05-31 16:42:31 +08:00
|
|
|
Metadata::V2(meta) => meta.load_dump(
|
2021-05-27 20:30:20 +08:00
|
|
|
&tmp_src_path,
|
2021-05-31 16:42:31 +08:00
|
|
|
tmp_dst.path(),
|
2021-05-27 20:30:20 +08:00
|
|
|
index_db_size,
|
|
|
|
update_db_size,
|
|
|
|
indexer_opts,
|
|
|
|
)?,
|
2021-05-11 19:03:47 +08:00
|
|
|
}
|
2021-05-31 16:42:31 +08:00
|
|
|
// Persist and atomically rename the db
|
|
|
|
let persisted_dump = tmp_dst.into_path();
|
|
|
|
if dst_path.as_ref().exists() {
|
|
|
|
warn!("Overwriting database at {}", dst_path.as_ref().display());
|
|
|
|
std::fs::remove_dir_all(&dst_path)?;
|
|
|
|
}
|
|
|
|
|
|
|
|
std::fs::rename(&persisted_dump, &dst_path)?;
|
2021-05-07 00:44:16 +08:00
|
|
|
|
2021-04-28 22:43:49 +08:00
|
|
|
Ok(())
|
|
|
|
}
|
2021-05-27 20:30:20 +08:00
|
|
|
|
|
|
|
/// Everything needed to create a single dump.
struct DumpTask<U, P> {
    /// Directory in which the dump archive is created.
    path: PathBuf,
    /// Handle asked to dump the uuid resolver data.
    uuid_resolver: U,
    /// Handle asked to dump the updates of the dumped indexes.
    update_handle: P,
    /// Identifier of the dump; the archive is named `<uid>.dump`.
    uid: String,
    /// Update database size recorded in the dump metadata.
    update_db_size: usize,
    /// Index database size recorded in the dump metadata.
    index_db_size: usize,
}
|
|
|
|
|
|
|
|
impl<U, P> DumpTask<U, P>
|
|
|
|
where
|
|
|
|
U: UuidResolverHandle + Send + Sync + Clone + 'static,
|
|
|
|
P: UpdateActorHandle + Send + Sync + Clone + 'static,
|
|
|
|
{
|
2021-06-15 03:26:35 +08:00
|
|
|
async fn run(self) -> Result<()> {
|
2021-05-27 20:30:20 +08:00
|
|
|
info!("Performing dump.");
|
|
|
|
|
|
|
|
create_dir_all(&self.path).await?;
|
|
|
|
|
|
|
|
let path_clone = self.path.clone();
|
|
|
|
let temp_dump_dir =
|
|
|
|
tokio::task::spawn_blocking(|| tempfile::TempDir::new_in(path_clone)).await??;
|
|
|
|
let temp_dump_path = temp_dump_dir.path().to_owned();
|
|
|
|
|
|
|
|
let meta = Metadata::new_v2(self.index_db_size, self.update_db_size);
|
|
|
|
let meta_path = temp_dump_path.join(META_FILE_NAME);
|
|
|
|
let mut meta_file = File::create(&meta_path)?;
|
|
|
|
serde_json::to_writer(&mut meta_file, &meta)?;
|
|
|
|
|
|
|
|
let uuids = self.uuid_resolver.dump(temp_dump_path.clone()).await?;
|
|
|
|
|
|
|
|
self.update_handle
|
|
|
|
.dump(uuids, temp_dump_path.clone())
|
|
|
|
.await?;
|
|
|
|
|
2021-06-15 03:26:35 +08:00
|
|
|
let dump_path = tokio::task::spawn_blocking(move || -> Result<PathBuf> {
|
2021-05-27 20:30:20 +08:00
|
|
|
let temp_dump_file = tempfile::NamedTempFile::new_in(&self.path)?;
|
2021-06-15 03:26:35 +08:00
|
|
|
compression::to_tar_gz(temp_dump_path, temp_dump_file.path())
|
|
|
|
.map_err(|e| DumpActorError::Internal(e))?;
|
2021-05-27 20:30:20 +08:00
|
|
|
|
2021-06-01 17:18:37 +08:00
|
|
|
let dump_path = self.path.join(self.uid).with_extension("dump");
|
2021-05-27 20:30:20 +08:00
|
|
|
temp_dump_file.persist(&dump_path)?;
|
|
|
|
|
|
|
|
Ok(dump_path)
|
|
|
|
})
|
|
|
|
.await??;
|
|
|
|
|
|
|
|
info!("Created dump in {:?}.", dump_path);
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
}
|