refactorize the dump and snapshot

This commit is contained in:
Tamo 2021-10-26 13:02:40 +02:00 committed by marin postma
parent 87a8bf5e96
commit c752c14c46
No known key found for this signature in database
GPG Key ID: 6088B7721C3E39F9
6 changed files with 28 additions and 24 deletions

View File

@ -1,17 +1,8 @@
use std::{fs, path::Path}; use std::{fs, path::Path};
/// To load a dump we get the user id from the source directory; /// Copy the `user-id` contained in one db to another. Ignore all errors.
/// If there was a user-id, write it to the new destination if not ignore the error pub fn copy_user_id(src: &Path, dst: &Path) {
pub fn load_dump(src: &Path, dst: &Path) {
if let Ok(user_id) = fs::read_to_string(src.join("user-id")) { if let Ok(user_id) = fs::read_to_string(src.join("user-id")) {
let _ = fs::write(dst.join("user-id"), &user_id); let _ = fs::write(dst.join("user-id"), &user_id);
} }
} }
/// To load a dump we get the user id either from the source directory;
/// If there was a user-id, write it to the new destination if not ignore the error
pub fn write_dump(src: &Path, dst: &Path) {
if let Ok(user_id) = fs::read_to_string(src) {
let _ = fs::write(dst, &user_id);
}
}

View File

@ -121,8 +121,8 @@ where
ret.send(Ok(info)).expect("Dump actor is dead"); ret.send(Ok(info)).expect("Dump actor is dead");
let task = DumpTask { let task = DumpTask {
path: self.dump_path.clone(), dump_path: self.dump_path.clone(),
analytics_path: self.analytics_path.clone(), db_path: self.analytics_path.clone(),
index_resolver: self.index_resolver.clone(), index_resolver: self.index_resolver.clone(),
update_sender: self.update.clone(), update_sender: self.update.clone(),
uid: uid.clone(), uid: uid.clone(),

View File

@ -25,7 +25,7 @@ pub fn load_dump(
IndexResolver::load_dump(src.as_ref(), &dst, index_db_size, indexing_options)?; IndexResolver::load_dump(src.as_ref(), &dst, index_db_size, indexing_options)?;
UpdateFileStore::load_dump(src.as_ref(), &dst)?; UpdateFileStore::load_dump(src.as_ref(), &dst)?;
UpdateStore::load_dump(&src, &dst, update_db_size)?; UpdateStore::load_dump(&src, &dst, update_db_size)?;
analytics::load_dump(src.as_ref(), dst.as_ref()); analytics::copy_user_id(src.as_ref(), dst.as_ref());
info!("Loading indexes."); info!("Loading indexes.");

View File

@ -223,8 +223,8 @@ pub fn load_dump(
} }
struct DumpTask<U, I> { struct DumpTask<U, I> {
path: PathBuf, dump_path: PathBuf,
analytics_path: PathBuf, db_path: PathBuf,
index_resolver: Arc<IndexResolver<U, I>>, index_resolver: Arc<IndexResolver<U, I>>,
update_sender: UpdateSender, update_sender: UpdateSender,
uid: String, uid: String,
@ -240,7 +240,7 @@ where
async fn run(self) -> Result<()> { async fn run(self) -> Result<()> {
trace!("Performing dump."); trace!("Performing dump.");
create_dir_all(&self.path).await?; create_dir_all(&self.dump_path).await?;
let temp_dump_dir = tokio::task::spawn_blocking(tempfile::TempDir::new).await??; let temp_dump_dir = tokio::task::spawn_blocking(tempfile::TempDir::new).await??;
let temp_dump_path = temp_dump_dir.path().to_owned(); let temp_dump_path = temp_dump_dir.path().to_owned();
@ -249,7 +249,7 @@ where
let meta_path = temp_dump_path.join(META_FILE_NAME); let meta_path = temp_dump_path.join(META_FILE_NAME);
let mut meta_file = File::create(&meta_path)?; let mut meta_file = File::create(&meta_path)?;
serde_json::to_writer(&mut meta_file, &meta)?; serde_json::to_writer(&mut meta_file, &meta)?;
analytics::write_dump(&self.analytics_path, &temp_dump_path.join("user-id")); analytics::copy_user_id(&self.db_path, &temp_dump_path);
create_dir_all(&temp_dump_path.join("indexes")).await?; create_dir_all(&temp_dump_path.join("indexes")).await?;
let uuids = self.index_resolver.dump(temp_dump_path.clone()).await?; let uuids = self.index_resolver.dump(temp_dump_path.clone()).await?;
@ -257,11 +257,11 @@ where
UpdateMsg::dump(&self.update_sender, uuids, temp_dump_path.clone()).await?; UpdateMsg::dump(&self.update_sender, uuids, temp_dump_path.clone()).await?;
let dump_path = tokio::task::spawn_blocking(move || -> Result<PathBuf> { let dump_path = tokio::task::spawn_blocking(move || -> Result<PathBuf> {
let temp_dump_file = tempfile::NamedTempFile::new_in(&self.path)?; let temp_dump_file = tempfile::NamedTempFile::new_in(&self.dump_path)?;
to_tar_gz(temp_dump_path, temp_dump_file.path()) to_tar_gz(temp_dump_path, temp_dump_file.path())
.map_err(|e| DumpActorError::Internal(e.into()))?; .map_err(|e| DumpActorError::Internal(e.into()))?;
let dump_path = self.path.join(self.uid).with_extension("dump"); let dump_path = self.dump_path.join(self.uid).with_extension("dump");
temp_dump_file.persist(&dump_path)?; temp_dump_file.persist(&dump_path)?;
Ok(dump_path) Ok(dump_path)
@ -341,9 +341,9 @@ mod test {
create_update_handler(index_resolver.clone(), tmp.path(), 4096 * 100).unwrap(); create_update_handler(index_resolver.clone(), tmp.path(), 4096 * 100).unwrap();
let task = DumpTask { let task = DumpTask {
path: tmp.path().to_owned(), dump_path: tmp.path().into(),
// this should do nothing // this should do nothing
analytics_path: tmp.path().join("user-id"), db_path: tmp.path().into(),
index_resolver, index_resolver,
update_sender, update_sender,
uid: String::from("test"), uid: String::from("test"),
@ -371,9 +371,9 @@ mod test {
create_update_handler(index_resolver.clone(), tmp.path(), 4096 * 100).unwrap(); create_update_handler(index_resolver.clone(), tmp.path(), 4096 * 100).unwrap();
let task = DumpTask { let task = DumpTask {
path: tmp.path().to_owned(), dump_path: tmp.path().into(),
// this should do nothing // this should do nothing
analytics_path: tmp.path().join("user-id"), db_path: tmp.path().into(),
index_resolver, index_resolver,
update_sender, update_sender,
uid: String::from("test"), uid: String::from("test"),

View File

@ -189,6 +189,7 @@ impl IndexControllerBuilder {
.ok_or_else(|| anyhow::anyhow!("Snapshot interval not provided."))?, .ok_or_else(|| anyhow::anyhow!("Snapshot interval not provided."))?,
self.snapshot_dir self.snapshot_dir
.ok_or_else(|| anyhow::anyhow!("Snapshot path not provided."))?, .ok_or_else(|| anyhow::anyhow!("Snapshot path not provided."))?,
db_path.as_ref().into(),
db_path db_path
.as_ref() .as_ref()
.file_name() .file_name()

View File

@ -8,6 +8,7 @@ use tokio::fs;
use tokio::task::spawn_blocking; use tokio::task::spawn_blocking;
use tokio::time::sleep; use tokio::time::sleep;
use crate::analytics;
use crate::compression::from_tar_gz; use crate::compression::from_tar_gz;
use crate::index_controller::updates::UpdateMsg; use crate::index_controller::updates::UpdateMsg;
@ -21,6 +22,7 @@ pub struct SnapshotService<U, I> {
update_sender: UpdateSender, update_sender: UpdateSender,
snapshot_period: Duration, snapshot_period: Duration,
snapshot_path: PathBuf, snapshot_path: PathBuf,
db_path: PathBuf,
db_name: String, db_name: String,
} }
@ -34,6 +36,7 @@ where
update_sender: UpdateSender, update_sender: UpdateSender,
snapshot_period: Duration, snapshot_period: Duration,
snapshot_path: PathBuf, snapshot_path: PathBuf,
db_path: PathBuf,
db_name: String, db_name: String,
) -> Self { ) -> Self {
Self { Self {
@ -41,6 +44,7 @@ where
update_sender, update_sender,
snapshot_period, snapshot_period,
snapshot_path, snapshot_path,
db_path,
db_name, db_name,
} }
} }
@ -71,6 +75,8 @@ where
.snapshot(temp_snapshot_path.clone()) .snapshot(temp_snapshot_path.clone())
.await?; .await?;
analytics::copy_user_id(&self.db_path, &temp_snapshot_path.clone());
if indexes.is_empty() { if indexes.is_empty() {
return Ok(()); return Ok(());
} }
@ -211,6 +217,8 @@ mod test {
update_sender, update_sender,
Duration::from_millis(100), Duration::from_millis(100),
snapshot_path.path().to_owned(), snapshot_path.path().to_owned(),
// this should do nothing
snapshot_path.path().to_owned(),
"data.ms".to_string(), "data.ms".to_string(),
); );
@ -243,6 +251,8 @@ mod test {
update_sender, update_sender,
Duration::from_millis(100), Duration::from_millis(100),
snapshot_path.path().to_owned(), snapshot_path.path().to_owned(),
// this should do nothing
snapshot_path.path().to_owned(),
"data.ms".to_string(), "data.ms".to_string(),
); );
@ -292,6 +302,8 @@ mod test {
update_sender, update_sender,
Duration::from_millis(100), Duration::from_millis(100),
snapshot_path.path().to_owned(), snapshot_path.path().to_owned(),
// this should do nothing
snapshot_path.path().to_owned(),
"data.ms".to_string(), "data.ms".to_string(),
); );