diff --git a/Cargo.lock b/Cargo.lock index 9a7677ab7..8bc6477f4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2341,6 +2341,7 @@ dependencies = [ "csv", "either", "enum-iterator", + "flate2", "fst", "insta", "meili-snap", @@ -2350,6 +2351,7 @@ dependencies = [ "roaring", "serde", "serde_json", + "tar", "thiserror", "time", "tokio", diff --git a/index-scheduler/src/batch.rs b/index-scheduler/src/batch.rs index c97bbf45b..f80f6e02e 100644 --- a/index-scheduler/src/batch.rs +++ b/index-scheduler/src/batch.rs @@ -18,6 +18,7 @@ one indexing operation. */ use std::collections::HashSet; +use std::ffi::OsStr; use std::fs::{self, File}; use std::io::BufWriter; @@ -33,7 +34,7 @@ use meilisearch_types::milli::update::{ use meilisearch_types::milli::{self, BEU32}; use meilisearch_types::settings::{apply_settings_to_builder, Settings, Unchecked}; use meilisearch_types::tasks::{Details, Kind, KindWithContent, Status, Task}; -use meilisearch_types::{Index, VERSION_FILE_NAME}; +use meilisearch_types::{compression, Index, VERSION_FILE_NAME}; use roaring::RoaringBitmap; use time::OffsetDateTime; use uuid::Uuid; @@ -621,7 +622,23 @@ impl IndexScheduler { let auth = milli::heed::EnvOpenOptions::new().open(src)?; auth.copy_to_path(dst, CompactionOption::Enabled)?; - todo!("tar-gz and append .snapshot at the end of the file"); + // 5. Copy and tarball the flat snapshot + // 5.1 Find the original name of the database + // TODO find a better way to get this path + let mut base_path = self.env.path().to_owned(); + base_path.pop(); + let db_name = base_path.file_name().and_then(OsStr::to_str).unwrap_or("data.ms"); + + // 5.2 Tarball the content of the snapshot in a tempfile with a .snapshot extension + let snapshot_path = self.snapshots_path.join(db_name).with_extension("snapshot"); + let temp_snapshot_file = tempfile::NamedTempFile::new_in(&self.snapshots_path)?; + compression::to_tar_gz(temp_snapshot_dir.path(), temp_snapshot_file.path())?; + let file = temp_snapshot_file.persist(&snapshot_path)?; + + // 5.3 Change the permission to make the snapshot readonly + let mut permissions = file.metadata()?.permissions(); + permissions.set_readonly(true); + file.set_permissions(permissions)?; for task in &mut tasks { task.status = Status::Succeeded; diff --git a/index-scheduler/src/error.rs b/index-scheduler/src/error.rs index b34bcb2d8..4e404685d 100644 --- a/index-scheduler/src/error.rs +++ b/index-scheduler/src/error.rs @@ -32,6 +32,8 @@ pub enum Error { FileStore(#[from] file_store::Error), #[error(transparent)] IoError(#[from] std::io::Error), + #[error(transparent)] + Persist(#[from] tempfile::PersistError), #[error(transparent)] Anyhow(#[from] anyhow::Error), @@ -59,10 +61,11 @@ impl ErrorCode for Error { Error::Dump(e) => e.error_code(), Error::Milli(e) => e.error_code(), Error::ProcessBatchPanicked => Code::Internal, - // TODO: TAMO: are all these errors really internal? + // TODO: TAMO: are all these errors really internal? Error::Heed(_) => Code::Internal, Error::FileStore(_) => Code::Internal, Error::IoError(_) => Code::Internal, + Error::Persist(_) => Code::Internal, Error::Anyhow(_) => Code::Internal, Error::CorruptedTaskQueue => Code::Internal, Error::CorruptedDump => Code::Internal, diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs index 940b3eb4c..09e323bd8 100644 --- a/index-scheduler/src/lib.rs +++ b/index-scheduler/src/lib.rs @@ -955,6 +955,7 @@ mod tests { use meilisearch_types::milli::update::IndexDocumentsMethod::{ ReplaceDocuments, UpdateDocuments, }; + use meilisearch_types::VERSION_FILE_NAME; use tempfile::TempDir; use time::Duration; use uuid::Uuid; diff --git a/meilisearch-http/src/lib.rs b/meilisearch-http/src/lib.rs index 135c5080c..616652237 100644 --- a/meilisearch-http/src/lib.rs +++ b/meilisearch-http/src/lib.rs @@ -35,6 +35,7 @@ use meilisearch_auth::AuthController; use meilisearch_types::milli::documents::{DocumentsBatchBuilder, DocumentsBatchReader}; use meilisearch_types::milli::update::{IndexDocumentsConfig, IndexDocumentsMethod}; use meilisearch_types::settings::apply_settings_to_builder; +use meilisearch_types::versioning::{check_version_file, create_version_file}; use meilisearch_types::{milli, VERSION_FILE_NAME}; pub use option::Opt; @@ -128,23 +129,23 @@ pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result<(IndexScheduler, AuthContr match ( index_scheduler_builder().map_err(anyhow::Error::from), auth_controller_builder().map_err(anyhow::Error::from), + create_version_file(&opt.db_path).map_err(anyhow::Error::from), ) { - (Ok(i), Ok(a)) => Ok((i, a)), - (Err(e), _) | (_, Err(e)) => { + (Ok(i), Ok(a), Ok(())) => Ok((i, a)), + (Err(e), _, _) | (_, Err(e), _) | (_, _, Err(e)) => { std::fs::remove_dir_all(&opt.db_path)?; Err(e) } } }; + let empty_db = is_empty_db(&opt.db_path); let (index_scheduler, auth_controller) = if let Some(ref _path) = opt.import_snapshot { // handle the snapshot with something akin to the dumps // + the snapshot interval / spawning a thread todo!(); } else if let Some(ref path) = opt.import_dump { - let empty_db = is_empty_db(&opt.db_path); let src_path_exists = path.exists(); - if empty_db && src_path_exists { let (mut index_scheduler, mut auth_controller) = meilisearch_builder()?; match import_dump(&opt.db_path, path, &mut index_scheduler, &mut auth_controller) { @@ -172,6 +173,9 @@ pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result<(IndexScheduler, AuthContr } } } else { + if !empty_db { + check_version_file(&opt.db_path)?; + } meilisearch_builder()? }; diff --git a/meilisearch-types/Cargo.toml b/meilisearch-types/Cargo.toml index e0e48c65e..2ce23e4cc 100644 --- a/meilisearch-types/Cargo.toml +++ b/meilisearch-types/Cargo.toml @@ -10,6 +10,7 @@ anyhow = "1.0.65" csv = "1.1.6" either = { version = "1.6.1", features = ["serde"] } enum-iterator = "1.1.3" +flate2 = "1.0.24" fst = "0.4.7" milli = { git = "https://github.com/meilisearch/milli.git", branch = "indexation-abortion", default-features = false } proptest = { version = "1.0.0", optional = true } @@ -17,6 +18,7 @@ proptest-derive = { version = "0.3.0", optional = true } roaring = { version = "0.10.0", features = ["serde"] } serde = { version = "1.0.145", features = ["derive"] } serde_json = "1.0.85" +tar = "0.4.38" thiserror = "1.0.30" time = { version = "0.3.7", features = ["serde-well-known", "formatting", "parsing", "macros"] } tokio = "1.0" diff --git a/meilisearch-types/src/compression.rs b/meilisearch-types/src/compression.rs new file mode 100644 index 000000000..1d364b815 --- /dev/null +++ b/meilisearch-types/src/compression.rs @@ -0,0 +1,28 @@ +use std::fs::{create_dir_all, File}; +use std::io::Write; +use std::path::Path; + +use flate2::read::GzDecoder; +use flate2::write::GzEncoder; +use flate2::Compression; +use tar::{Archive, Builder}; + +pub fn to_tar_gz(src: impl AsRef, dest: impl AsRef) -> anyhow::Result<()> { + let mut f = File::create(dest)?; + let gz_encoder = GzEncoder::new(&mut f, Compression::default()); + let mut tar_encoder = Builder::new(gz_encoder); + tar_encoder.append_dir_all(".", src)?; + let gz_encoder = tar_encoder.into_inner()?; + gz_encoder.finish()?; + f.flush()?; + Ok(()) +} + +pub fn from_tar_gz(src: impl AsRef, dest: impl AsRef) -> anyhow::Result<()> { + let f = File::open(&src)?; + let gz = GzDecoder::new(f); + let mut ar = Archive::new(gz); + create_dir_all(&dest)?; + ar.unpack(&dest)?; + Ok(()) +} diff --git a/meilisearch-types/src/lib.rs b/meilisearch-types/src/lib.rs index f141de197..c7f7ca7f5 100644 --- a/meilisearch-types/src/lib.rs +++ b/meilisearch-types/src/lib.rs @@ -1,3 +1,4 @@ +pub mod compression; pub mod document_formats; pub mod error; pub mod index_uid;