Compress the snapshot in a tarball

This commit is contained in:
Kerollmops 2022-10-25 15:51:15 +02:00 committed by Tamo
parent 19910f912d
commit 90f1334757
8 changed files with 65 additions and 7 deletions

2
Cargo.lock generated
View File

@ -2341,6 +2341,7 @@ dependencies = [
"csv",
"either",
"enum-iterator",
"flate2",
"fst",
"insta",
"meili-snap",
@ -2350,6 +2351,7 @@ dependencies = [
"roaring",
"serde",
"serde_json",
"tar",
"thiserror",
"time",
"tokio",

View File

@ -18,6 +18,7 @@ one indexing operation.
*/
use std::collections::HashSet;
use std::ffi::OsStr;
use std::fs::{self, File};
use std::io::BufWriter;
@ -33,7 +34,7 @@ use meilisearch_types::milli::update::{
use meilisearch_types::milli::{self, BEU32};
use meilisearch_types::settings::{apply_settings_to_builder, Settings, Unchecked};
use meilisearch_types::tasks::{Details, Kind, KindWithContent, Status, Task};
use meilisearch_types::{Index, VERSION_FILE_NAME};
use meilisearch_types::{compression, Index, VERSION_FILE_NAME};
use roaring::RoaringBitmap;
use time::OffsetDateTime;
use uuid::Uuid;
@ -621,7 +622,23 @@ impl IndexScheduler {
let auth = milli::heed::EnvOpenOptions::new().open(src)?;
auth.copy_to_path(dst, CompactionOption::Enabled)?;
todo!("tar-gz and append .snapshot at the end of the file");
// 5. Copy and tarball the flat snapshot
// 5.1 Find the original name of the database
// TODO find a better way to get this path
let mut base_path = self.env.path().to_owned();
base_path.pop();
let db_name = base_path.file_name().and_then(OsStr::to_str).unwrap_or("data.ms");
// 5.2 Tarball the content of the snapshot in a tempfile with a .snapshot extension
let snapshot_path = self.snapshots_path.join(db_name).with_extension("snapshot");
let temp_snapshot_file = tempfile::NamedTempFile::new_in(&self.snapshots_path)?;
compression::to_tar_gz(temp_snapshot_dir.path(), temp_snapshot_file.path())?;
let file = temp_snapshot_file.persist(&snapshot_path)?;
// 5.3 Change the permission to make the snapshot readonly
let mut permissions = file.metadata()?.permissions();
permissions.set_readonly(true);
file.set_permissions(permissions)?;
for task in &mut tasks {
task.status = Status::Succeeded;

View File

@ -32,6 +32,8 @@ pub enum Error {
FileStore(#[from] file_store::Error),
#[error(transparent)]
IoError(#[from] std::io::Error),
#[error(transparent)]
Persist(#[from] tempfile::PersistError),
#[error(transparent)]
Anyhow(#[from] anyhow::Error),
@ -59,10 +61,11 @@ impl ErrorCode for Error {
Error::Dump(e) => e.error_code(),
Error::Milli(e) => e.error_code(),
Error::ProcessBatchPanicked => Code::Internal,
// TODO: TAMO: are all these errors really internal?
// TODO: TAMO: are all these errors really internal?
Error::Heed(_) => Code::Internal,
Error::FileStore(_) => Code::Internal,
Error::IoError(_) => Code::Internal,
Error::Persist(_) => Code::Internal,
Error::Anyhow(_) => Code::Internal,
Error::CorruptedTaskQueue => Code::Internal,
Error::CorruptedDump => Code::Internal,

View File

@ -955,6 +955,7 @@ mod tests {
use meilisearch_types::milli::update::IndexDocumentsMethod::{
ReplaceDocuments, UpdateDocuments,
};
use meilisearch_types::VERSION_FILE_NAME;
use tempfile::TempDir;
use time::Duration;
use uuid::Uuid;

View File

@ -35,6 +35,7 @@ use meilisearch_auth::AuthController;
use meilisearch_types::milli::documents::{DocumentsBatchBuilder, DocumentsBatchReader};
use meilisearch_types::milli::update::{IndexDocumentsConfig, IndexDocumentsMethod};
use meilisearch_types::settings::apply_settings_to_builder;
use meilisearch_types::versioning::{check_version_file, create_version_file};
use meilisearch_types::{milli, VERSION_FILE_NAME};
pub use option::Opt;
@ -128,23 +129,23 @@ pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result<(IndexScheduler, AuthContr
match (
index_scheduler_builder().map_err(anyhow::Error::from),
auth_controller_builder().map_err(anyhow::Error::from),
create_version_file(&opt.db_path).map_err(anyhow::Error::from),
) {
(Ok(i), Ok(a)) => Ok((i, a)),
(Err(e), _) | (_, Err(e)) => {
(Ok(i), Ok(a), Ok(())) => Ok((i, a)),
(Err(e), _, _) | (_, Err(e), _) | (_, _, Err(e)) => {
std::fs::remove_dir_all(&opt.db_path)?;
Err(e)
}
}
};
let empty_db = is_empty_db(&opt.db_path);
let (index_scheduler, auth_controller) = if let Some(ref _path) = opt.import_snapshot {
// handle the snapshot with something akin to the dumps
// + the snapshot interval / spawning a thread
todo!();
} else if let Some(ref path) = opt.import_dump {
let empty_db = is_empty_db(&opt.db_path);
let src_path_exists = path.exists();
if empty_db && src_path_exists {
let (mut index_scheduler, mut auth_controller) = meilisearch_builder()?;
match import_dump(&opt.db_path, path, &mut index_scheduler, &mut auth_controller) {
@ -172,6 +173,9 @@ pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result<(IndexScheduler, AuthContr
}
}
} else {
if !empty_db {
check_version_file(&opt.db_path)?;
}
meilisearch_builder()?
};

View File

@ -10,6 +10,7 @@ anyhow = "1.0.65"
csv = "1.1.6"
either = { version = "1.6.1", features = ["serde"] }
enum-iterator = "1.1.3"
flate2 = "1.0.24"
fst = "0.4.7"
milli = { git = "https://github.com/meilisearch/milli.git", branch = "indexation-abortion", default-features = false }
proptest = { version = "1.0.0", optional = true }
@ -17,6 +18,7 @@ proptest-derive = { version = "0.3.0", optional = true }
roaring = { version = "0.10.0", features = ["serde"] }
serde = { version = "1.0.145", features = ["derive"] }
serde_json = "1.0.85"
tar = "0.4.38"
thiserror = "1.0.30"
time = { version = "0.3.7", features = ["serde-well-known", "formatting", "parsing", "macros"] }
tokio = "1.0"

View File

@ -0,0 +1,28 @@
use std::fs::{create_dir_all, File};
use std::io::Write;
use std::path::Path;
use flate2::read::GzDecoder;
use flate2::write::GzEncoder;
use flate2::Compression;
use tar::{Archive, Builder};
pub fn to_tar_gz(src: impl AsRef<Path>, dest: impl AsRef<Path>) -> anyhow::Result<()> {
let mut f = File::create(dest)?;
let gz_encoder = GzEncoder::new(&mut f, Compression::default());
let mut tar_encoder = Builder::new(gz_encoder);
tar_encoder.append_dir_all(".", src)?;
let gz_encoder = tar_encoder.into_inner()?;
gz_encoder.finish()?;
f.flush()?;
Ok(())
}
pub fn from_tar_gz(src: impl AsRef<Path>, dest: impl AsRef<Path>) -> anyhow::Result<()> {
let f = File::open(&src)?;
let gz = GzDecoder::new(f);
let mut ar = Archive::new(gz);
create_dir_all(&dest)?;
ar.unpack(&dest)?;
Ok(())
}

View File

@ -1,3 +1,4 @@
pub mod compression;
pub mod document_formats;
pub mod error;
pub mod index_uid;