meilisearch/meilisearch-http/src/index_controller/dump_actor/mod.rs

203 lines
5.3 KiB
Rust
Raw Normal View History

2021-05-27 20:30:20 +08:00
use std::fs::File;
use std::path::{Path, PathBuf};
2021-05-11 02:25:09 +08:00
2021-05-30 21:55:17 +08:00
use chrono::{DateTime, Utc};
2021-05-27 20:30:20 +08:00
use log::{error, info};
#[cfg(test)]
use mockall::automock;
2021-05-11 02:23:12 +08:00
use serde::{Deserialize, Serialize};
use thiserror::Error;
2021-04-28 22:43:49 +08:00
2021-05-27 02:42:09 +08:00
use loaders::v1::MetadataV1;
use loaders::v2::MetadataV2;
2021-05-11 02:25:09 +08:00
pub use actor::DumpActor;
pub use handle_impl::*;
2021-05-11 02:25:09 +08:00
pub use message::DumpMsg;
2021-05-27 20:30:20 +08:00
use tokio::fs::create_dir_all;
2021-05-11 02:25:09 +08:00
2021-05-27 20:30:20 +08:00
use super::{update_actor::UpdateActorHandle, uuid_resolver::UuidResolverHandle};
use crate::{helpers::compression, option::IndexerOpts};
2021-05-27 04:52:06 +08:00
mod actor;
mod handle_impl;
mod loaders;
mod message;
2021-05-27 20:30:20 +08:00
/// Name of the JSON file, stored at the root of a dump, that holds the dump [`Metadata`].
const META_FILE_NAME: &str = "metadata.json";
2021-05-11 02:25:09 +08:00
/// Result alias whose error type is always [`DumpError`].
pub type DumpResult<T> = std::result::Result<T, DumpError>;
/// Errors surfaced by the dump actor API.
#[derive(Error, Debug)]
pub enum DumpError {
/// Catch-all wrapper around an `anyhow::Error`; built automatically through `From`
/// (e.g. by the `?` operator).
#[error("error with index: {0}")]
Error(#[from] anyhow::Error),
/// Wrapper around a `heed::Error`; built automatically through `From`.
#[error("Heed error: {0}")]
HeedError(#[from] heed::Error),
/// A dump was requested while another one is still running.
#[error("dump already running")]
DumpAlreadyRunning,
/// No dump matches the given identifier. The payload is interpolated into the message;
/// presumably it is the dump uid (confirm against callers).
#[error("dump `{0}` does not exist")]
DumpDoesNotExist(String),
}
2021-04-28 22:43:49 +08:00
2021-05-11 02:25:09 +08:00
/// Asynchronous interface used to request a dump and to query the state of a dump.
///
/// A mock implementation is generated by `mockall` in test builds.
#[async_trait::async_trait]
#[cfg_attr(test, automock)]
pub trait DumpActorHandle {
/// Start the creation of a dump and return its [`DumpInfo`].
/// Implementation: [handle_impl::DumpActorHandleImpl::create_dump]
async fn create_dump(&self) -> DumpResult<DumpInfo>;
/// Return the status of an already created dump, identified by `uid`.
/// Implementation: [handle_impl::DumpActorHandleImpl::dump_info]
async fn dump_info(&self, uid: String) -> DumpResult<DumpInfo>;
}
2021-04-28 22:43:49 +08:00
#[derive(Debug, Serialize, Deserialize)]
2021-05-27 02:42:09 +08:00
#[serde(rename_all = "camelCase", tag = "dump_version")]
pub enum Metadata {
V1 {
#[serde(flatten)]
meta: MetadataV1,
},
V2 {
#[serde(flatten)]
meta: MetadataV2,
},
2021-04-28 22:43:49 +08:00
}
2021-04-29 20:45:08 +08:00
impl Metadata {
2021-05-27 16:51:19 +08:00
pub fn new_v2(index_db_size: u64, update_db_size: u64) -> Self {
let meta = MetadataV2::new(index_db_size, update_db_size);
Self::V2 { meta }
}
2021-04-28 22:43:49 +08:00
}
2021-05-11 02:25:09 +08:00
#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
#[serde(rename_all = "snake_case")]
pub enum DumpStatus {
Done,
InProgress,
Failed,
2021-04-28 22:43:49 +08:00
}
2021-05-11 02:25:09 +08:00
#[derive(Debug, Serialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct DumpInfo {
pub uid: String,
pub status: DumpStatus,
2021-05-25 16:48:57 +08:00
#[serde(skip_serializing_if = "Option::is_none")]
pub error: Option<String>,
2021-05-30 21:55:17 +08:00
started_at: DateTime<Utc>,
#[serde(skip_serializing_if = "Option::is_none")]
finished_at: Option<DateTime<Utc>>,
2021-05-11 02:25:09 +08:00
}
2021-04-28 22:43:49 +08:00
2021-05-11 02:25:09 +08:00
impl DumpInfo {
pub fn new(uid: String, status: DumpStatus) -> Self {
Self {
uid,
status,
error: None,
2021-05-30 21:55:17 +08:00
started_at: Utc::now(),
finished_at: None,
2021-05-11 02:25:09 +08:00
}
2021-05-05 20:11:56 +08:00
}
2021-04-28 22:43:49 +08:00
2021-05-11 02:25:09 +08:00
pub fn with_error(&mut self, error: String) {
self.status = DumpStatus::Failed;
2021-05-30 21:55:17 +08:00
self.finished_at = Some(Utc::now());
self.error = Some(error);
2021-05-05 20:11:56 +08:00
}
2021-04-28 22:43:49 +08:00
2021-05-11 02:25:09 +08:00
pub fn done(&mut self) {
2021-05-30 21:55:17 +08:00
self.finished_at = Some(Utc::now());
2021-05-11 02:25:09 +08:00
self.status = DumpStatus::Done;
}
2021-04-28 22:43:49 +08:00
2021-05-11 02:25:09 +08:00
pub fn dump_already_in_progress(&self) -> bool {
self.status == DumpStatus::InProgress
}
2021-04-28 22:43:49 +08:00
}
2021-05-27 20:30:20 +08:00
pub fn load_dump(
2021-05-27 02:42:09 +08:00
dst_path: impl AsRef<Path>,
src_path: impl AsRef<Path>,
2021-05-27 20:30:20 +08:00
index_db_size: u64,
update_db_size: u64,
2021-05-27 04:52:06 +08:00
indexer_opts: &IndexerOpts,
2021-04-28 22:43:49 +08:00
) -> anyhow::Result<()> {
2021-05-27 20:30:20 +08:00
let tmp_src = tempfile::tempdir_in(".")?;
let tmp_src_path = tmp_src.path();
compression::from_tar_gz(&src_path, tmp_src_path)?;
let meta_path = tmp_src_path.join(META_FILE_NAME);
2021-05-27 02:42:09 +08:00
let mut meta_file = File::open(&meta_path)?;
let meta: Metadata = serde_json::from_reader(&mut meta_file)?;
2021-04-28 22:43:49 +08:00
2021-05-27 02:42:09 +08:00
match meta {
2021-05-27 20:30:20 +08:00
Metadata::V1 { meta } => meta.load_dump(&tmp_src_path, dst_path)?,
Metadata::V2 { meta } => meta.load_dump(
&tmp_src_path,
dst_path.as_ref(),
index_db_size,
update_db_size,
indexer_opts,
)?,
}
2021-05-07 00:44:16 +08:00
2021-04-28 22:43:49 +08:00
Ok(())
}
2021-05-27 20:30:20 +08:00
/// Everything needed to create one dump; consumed by `DumpTask::run`.
struct DumpTask<U, P> {
// Directory that receives the dump: it is created if missing, holds the temporary files
// while the dump is built, and ends up containing the final `<uid>.dump` archive.
path: PathBuf,
// Handle whose `dump` call yields the uuids that are then passed to `update_handle`.
uuid_resolver: U,
// Handle whose `dump` call receives those uuids and the temporary dump directory.
update_handle: P,
// Identifier of the dump; used as the file stem of the final archive.
uid: String,
// Size of the update database, recorded in the dump metadata.
update_db_size: u64,
// Size of the index database, recorded in the dump metadata.
index_db_size: u64,
}
impl<U, P> DumpTask<U, P>
where
    U: UuidResolverHandle + Send + Sync + Clone + 'static,
    P: UpdateActorHandle + Send + Sync + Clone + 'static,
{
    /// Creates the dump and stores it as `<uid>.dump` inside `self.path`.
    ///
    /// Steps, in order:
    /// 1. create `self.path` if needed and a temporary directory inside it;
    /// 2. write the V2 metadata file into that temporary directory;
    /// 3. ask the uuid resolver, then the update handle, to dump into it;
    /// 4. compress the directory into a temporary file and persist it under its final name.
    ///
    /// # Errors
    ///
    /// Returns the first error reported by any of the steps above, including a failure to
    /// join one of the blocking tasks.
    async fn run(self) -> anyhow::Result<()> {
        info!("Performing dump.");

        create_dir_all(&self.path).await?;

        let path_clone = self.path.clone();
        // `temp_dump_dir` is kept alive until the end of the function: the `tempfile` guard
        // removes the directory when dropped, which must not happen before compression.
        let temp_dump_dir =
            tokio::task::spawn_blocking(|| tempfile::TempDir::new_in(path_clone)).await??;
        let temp_dump_path = temp_dump_dir.path().to_owned();

        // Serialize in memory and write through tokio so that no blocking file I/O runs on
        // the async executor thread.
        let meta_path = temp_dump_path.join(META_FILE_NAME);
        let meta_bytes =
            serde_json::to_vec(&Metadata::new_v2(self.index_db_size, self.update_db_size))?;
        tokio::fs::write(&meta_path, meta_bytes).await?;

        let uuids = self.uuid_resolver.dump(temp_dump_path.clone()).await?;

        self.update_handle
            .dump(uuids, temp_dump_path.clone())
            .await?;

        // Compression is blocking work: run it off the async executor. The archive is built
        // in a temporary file and only renamed to its final name once complete.
        let dump_path = tokio::task::spawn_blocking(move || -> anyhow::Result<PathBuf> {
            let temp_dump_file = tempfile::NamedTempFile::new_in(&self.path)?;
            compression::to_tar_gz(temp_dump_path, temp_dump_file.path())?;

            let dump_path = self.path.join(format!("{}.dump", self.uid));
            temp_dump_file.persist(&dump_path)?;

            Ok(dump_path)
        })
        .await??;

        info!("Created dump in {:?}.", dump_path);

        Ok(())
    }
}