use std::fs; use std::path::{Path, PathBuf}; use std::sync::Arc; use std::time::Duration; use anyhow::bail; use fs_extra::dir::{self, CopyOptions}; use log::{info, trace}; use meilisearch_auth::open_auth_store_env; use milli::heed::CompactionOption; use tokio::sync::RwLock; use tokio::time::sleep; use walkdir::WalkDir; use crate::compression::from_tar_gz; use crate::index_controller::open_meta_env; use crate::index_controller::versioning::VERSION_FILE_NAME; use crate::tasks::Scheduler; pub struct SnapshotService { pub(crate) db_path: PathBuf, pub(crate) snapshot_period: Duration, pub(crate) snapshot_path: PathBuf, pub(crate) index_size: usize, pub(crate) meta_env_size: usize, pub(crate) scheduler: Arc>, } impl SnapshotService { pub async fn run(self) { info!( "Snapshot scheduled every {}s.", self.snapshot_period.as_secs() ); loop { let snapshot_job = SnapshotJob { dest_path: self.snapshot_path.clone(), src_path: self.db_path.clone(), meta_env_size: self.meta_env_size, index_size: self.index_size, }; self.scheduler.write().await.schedule_snapshot(snapshot_job); sleep(self.snapshot_period).await; } } } pub fn load_snapshot( db_path: impl AsRef, snapshot_path: impl AsRef, ignore_snapshot_if_db_exists: bool, ignore_missing_snapshot: bool, ) -> anyhow::Result<()> { let empty_db = crate::is_empty_db(&db_path); let snapshot_path_exists = snapshot_path.as_ref().exists(); if empty_db && snapshot_path_exists { match from_tar_gz(snapshot_path, &db_path) { Ok(()) => Ok(()), Err(e) => { //clean created db folder std::fs::remove_dir_all(&db_path)?; Err(e) } } } else if !empty_db && !ignore_snapshot_if_db_exists { bail!( "database already exists at {:?}, try to delete it or rename it", db_path .as_ref() .canonicalize() .unwrap_or_else(|_| db_path.as_ref().to_owned()) ) } else if !snapshot_path_exists && !ignore_missing_snapshot { bail!("snapshot doesn't exist at {:?}", snapshot_path.as_ref()) } else { Ok(()) } } #[derive(Debug)] pub struct SnapshotJob { dest_path: PathBuf, src_path: PathBuf, meta_env_size: usize, index_size: usize, } impl SnapshotJob { pub async fn run(self) -> anyhow::Result<()> { tokio::task::spawn_blocking(|| self.run_sync()).await??; Ok(()) } fn run_sync(self) -> anyhow::Result<()> { trace!("Performing snapshot."); let snapshot_dir = self.dest_path.clone(); std::fs::create_dir_all(&snapshot_dir)?; let temp_snapshot_dir = tempfile::tempdir()?; let temp_snapshot_path = temp_snapshot_dir.path(); self.snapshot_version_file(temp_snapshot_path)?; self.snapshot_meta_env(temp_snapshot_path)?; self.snapshot_file_store(temp_snapshot_path)?; self.snapshot_indexes(temp_snapshot_path)?; self.snapshot_auth(temp_snapshot_path)?; let db_name = self .src_path .file_name() .and_then(|n| n.to_str()) .unwrap_or("data.ms") .to_string(); let snapshot_path = self.dest_path.join(format!("{}.snapshot", db_name)); let temp_snapshot_file = tempfile::NamedTempFile::new_in(&snapshot_dir)?; let temp_snapshot_file_path = temp_snapshot_file.path().to_owned(); crate::compression::to_tar_gz(temp_snapshot_path, temp_snapshot_file_path)?; let _file = temp_snapshot_file.persist(&snapshot_path)?; #[cfg(unix)] { use std::fs::Permissions; use std::os::unix::fs::PermissionsExt; let perm = Permissions::from_mode(0o644); _file.set_permissions(perm)?; } trace!("Created snapshot in {:?}.", snapshot_path); Ok(()) } fn snapshot_version_file(&self, path: &Path) -> anyhow::Result<()> { let dst = path.join(VERSION_FILE_NAME); let src = self.src_path.join(VERSION_FILE_NAME); fs::copy(src, dst)?; Ok(()) } fn snapshot_meta_env(&self, path: &Path) -> anyhow::Result<()> { let env = open_meta_env(&self.src_path, self.meta_env_size)?; let dst = path.join("data.mdb"); env.copy_to_path(dst, milli::heed::CompactionOption::Enabled)?; Ok(()) } fn snapshot_file_store(&self, path: &Path) -> anyhow::Result<()> { // for now we simply copy the updates/updates_files // FIXME(marin): We may copy more files than necessary, if new files are added while we are // performing the snapshop. We need a way to filter them out. let dst = path.join("updates"); fs::create_dir_all(&dst)?; let options = CopyOptions::default(); dir::copy(self.src_path.join("updates/updates_files"), dst, &options)?; Ok(()) } fn snapshot_indexes(&self, path: &Path) -> anyhow::Result<()> { let indexes_path = self.src_path.join("indexes/"); let dst = path.join("indexes/"); for entry in WalkDir::new(indexes_path).max_depth(1).into_iter().skip(1) { let entry = entry?; let name = entry.file_name(); let dst = dst.join(name); std::fs::create_dir_all(&dst)?; let dst = dst.join("data.mdb"); let mut options = milli::heed::EnvOpenOptions::new(); options.map_size(self.index_size); let index = milli::Index::new(options, entry.path())?; index.copy_to_path(dst, CompactionOption::Enabled)?; } Ok(()) } fn snapshot_auth(&self, path: &Path) -> anyhow::Result<()> { let auth_path = self.src_path.join("auth"); let dst = path.join("auth"); std::fs::create_dir_all(&dst)?; let dst = dst.join("data.mdb"); let env = open_auth_store_env(&auth_path)?; env.copy_to_path(dst, milli::heed::CompactionOption::Enabled)?; Ok(()) } }