diff --git a/Cargo.lock b/Cargo.lock index b24c2fc65..816a2f613 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -301,10 +301,10 @@ dependencies = [ ] [[package]] -name = "adler32" -version = "1.0.4" +name = "adler" +version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5d2e7343e7fc9de883d1b0341e0b13970f764c14101234857d2ddafa1cb1cac2" +checksum = "ccc9a9dd069569f212bc4330af9f17c4afb5e8ce185e83dbb14f1349dda18b10" [[package]] name = "ahash" @@ -889,10 +889,22 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e88a8acf291dafb59c2d96e8f59828f3838bb1a70398823ade51a84de6a6deed" [[package]] -name = "flate2" -version = "1.0.14" +name = "filetime" +version = "0.2.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2cfff41391129e0a856d6d822600b8d71179d46879e310417eb9c762eb178b42" +checksum = "affc17579b132fc2461adf7c575cc6e8b134ebca52c51f5411388965227dc695" +dependencies = [ + "cfg-if", + "libc", + "redox_syscall", + "winapi 0.3.8", +] + +[[package]] +name = "flate2" +version = "1.0.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68c90b0fc46cf89d227cc78b40e494ff81287a92dd07631e5af0d06fe3cf885e" dependencies = [ "cfg-if", "crc32fast", @@ -1548,6 +1560,7 @@ dependencies = [ "chrono", "crossbeam-channel", "env_logger", + "flate2", "futures", "http 0.1.21", "indexmap", @@ -1571,7 +1584,9 @@ dependencies = [ "siphasher", "slice-group-by", "structopt", + "tar", "tempdir", + "tempfile", "tokio", "ureq", "vergen", @@ -1639,11 +1654,11 @@ dependencies = [ [[package]] name = "miniz_oxide" -version = "0.3.6" +version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aa679ff6578b1cddee93d7e82e263b94a575e0bfced07284eb0c037c1d2416a5" +checksum = "be0f75932c1f6cfae3c04000e40114adf955636e19040f9c0a2c380702aa1c7f" dependencies = [ - "adler32", + "adler", ] [[package]] @@ -2585,6 +2600,18 @@ dependencies = [ "unicode-xid", ] +[[package]] +name = "tar" +version = "0.4.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c8a4c1d0bee3230179544336c15eefb563cf0302955d962e456542323e8c2e8a" +dependencies = [ + "filetime", + "libc", + "redox_syscall", + "xattr", +] + [[package]] name = "tempdir" version = "0.3.7" @@ -3162,6 +3189,15 @@ dependencies = [ "winapi-build", ] +[[package]] +name = "xattr" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "244c3741f4240ef46274860397c7c74e50eb23624996930e484c16679633a54c" +dependencies = [ + "libc", +] + [[package]] name = "zerocopy" version = "0.3.0" diff --git a/meilisearch-http/Cargo.toml b/meilisearch-http/Cargo.toml index 5594f174d..eeb96cd77 100644 --- a/meilisearch-http/Cargo.toml +++ b/meilisearch-http/Cargo.toml @@ -27,6 +27,7 @@ bytes = "0.5.4" chrono = { version = "0.4.11", features = ["serde"] } crossbeam-channel = "0.4.2" env_logger = "0.7.1" +flate2 = "1.0.16" futures = "0.3.4" http = "0.1.19" indexmap = { version = "1.3.2", features = ["serde-1"] } @@ -47,6 +48,8 @@ sha2 = "0.8.1" siphasher = "0.3.2" slice-group-by = "0.2.6" structopt = "0.3.12" +tar = "0.4.29" +tempfile = "3.1.0" tokio = { version = "0.2.18", features = ["macros"] } ureq = { version = "0.12.0", features = ["tls"], default-features = false } walkdir = "2.3.1" diff --git a/meilisearch-http/src/error.rs b/meilisearch-http/src/error.rs index 7014b1a5b..501694c0d 100644 --- a/meilisearch-http/src/error.rs +++ b/meilisearch-http/src/error.rs @@ -236,6 +236,18 @@ impl From for Error { } } +impl From for Error { + fn from(err: std::io::Error) -> Error { + Error::Internal(err.to_string()) + } +} + +impl From for Error { + fn from(err: meilisearch_core::Error) -> Error { + Error::Internal(err.to_string()) + } +} + impl From for ResponseError { fn from(err: FacetCountError) -> ResponseError { ResponseError { inner: Box::new(err) } diff --git a/meilisearch-http/src/lib.rs b/meilisearch-http/src/lib.rs index 486f4a39f..c6de32160 100644 --- a/meilisearch-http/src/lib.rs +++ b/meilisearch-http/src/lib.rs @@ -7,6 +7,7 @@ pub mod models; pub mod option; pub mod routes; pub mod analytics; +pub mod snapshot; use actix_http::Error; use actix_service::ServiceFactory; diff --git a/meilisearch-http/src/main.rs b/meilisearch-http/src/main.rs index 13ce99d57..0537a462f 100644 --- a/meilisearch-http/src/main.rs +++ b/meilisearch-http/src/main.rs @@ -6,6 +6,7 @@ use main_error::MainError; use meilisearch_http::helpers::NormalizePath; use meilisearch_http::{create_app, index_update_callback, Data, Opt}; use structopt::StructOpt; +use meilisearch_http::snapshot; mod analytics; @@ -51,6 +52,10 @@ async fn main() -> Result<(), MainError> { _ => unreachable!(), } + if let Some(path) = &opt.load_from_snapshot { + snapshot::load_snapshot(&opt.db_path, path, opt.ignore_snapshot_if_db_exists, opt.ignore_missing_snapshot)?; + } + let data = Data::new(opt.clone())?; if !opt.no_analytics { @@ -64,6 +69,10 @@ async fn main() -> Result<(), MainError> { index_update_callback(name, &data_cloned, status); })); + if let Some(path) = &opt.snapshot_path { + snapshot::schedule_snapshot(data.clone(), &path, opt.snapshot_interval_sec)?; + } + print_launch_resume(&opt, &data); let http_server = HttpServer::new(move || { diff --git a/meilisearch-http/src/option.rs b/meilisearch-http/src/option.rs index 0c212066e..2bcb62687 100644 --- a/meilisearch-http/src/option.rs +++ b/meilisearch-http/src/option.rs @@ -1,7 +1,7 @@ +use std::{error, fs}; use std::io::{BufReader, Read}; use std::path::PathBuf; use std::sync::Arc; -use std::{error, fs}; use rustls::internal::pemfile::{certs, pkcs8_private_keys, rsa_private_keys}; use rustls::{ @@ -93,6 +93,28 @@ pub struct Opt { /// SSL support tickets. #[structopt(long, env = "MEILI_SSL_TICKETS")] pub ssl_tickets: bool, + + /// Defines the path of the snapshot file to import. + /// This option will, by default, stop the process if a database already exist or if no snapshot exists at + /// the given path. If this option is not specified no snapshot is imported. + #[structopt(long, env = "MEILI_LOAD_FROM_SNAPSHOT")] + pub load_from_snapshot: Option, + + /// The engine will ignore a missing snapshot and not return an error in such case. + #[structopt(long, requires = "load-from-snapshot", env = "MEILI_IGNORE_MISSING_SNAPSHOT")] + pub ignore_missing_snapshot: bool, + + /// The engine will skip snapshot importation and not return an error in such case. + #[structopt(long, requires = "load-from-snapshot", env = "MEILI_IGNORE_SNAPSHOT_IF_DB_EXISTS")] + pub ignore_snapshot_if_db_exists: bool, + + /// Defines the directory path where meilisearch will create snapshot each snapshot_time_gap. + #[structopt(long, env = "MEILI_SNAPSHOT_PATH")] + pub snapshot_path: Option, + + /// Defines time interval, in seconds, between each snapshot creation. + #[structopt(long, requires = "snapshot-path", default_value = "86400", env = "MEILI_SNAPSHOT_INTERVAL_SEC")] + pub snapshot_interval_sec: u64, } impl Opt { diff --git a/meilisearch-http/src/snapshot.rs b/meilisearch-http/src/snapshot.rs new file mode 100644 index 000000000..a82566145 --- /dev/null +++ b/meilisearch-http/src/snapshot.rs @@ -0,0 +1,124 @@ +use crate::Data; +use crate::error::Error; + +use flate2::Compression; +use flate2::read::GzDecoder; +use flate2::write::GzEncoder; +use log::error; +use std::fs::{create_dir_all, File}; +use std::io; +use std::path::Path; +use std::thread; +use std::time::{Duration}; +use tar::{Builder, Archive}; +use tempfile::TempDir; + +fn pack(src: &Path, dest: &Path) -> io::Result<()> { + let f = File::create(dest)?; + let gz_encoder = GzEncoder::new(f, Compression::default()); + + let mut tar_encoder = Builder::new(gz_encoder); + tar_encoder.append_dir_all(".", src)?; + let gz_encoder = tar_encoder.into_inner()?; + + gz_encoder.finish()?; + + Ok(()) +} + +fn unpack(src: &Path, dest: &Path) -> Result<(), Error> { + let f = File::open(src)?; + let gz = GzDecoder::new(f); + let mut ar = Archive::new(gz); + + create_dir_all(dest)?; + ar.unpack(dest)?; + + Ok(()) +} + +pub fn load_snapshot( + db_path: &str, + snapshot_path: &Path, + ignore_snapshot_if_db_exists: bool, + ignore_missing_snapshot: bool +) -> Result<(), Error> { + let db_path = Path::new(db_path); + + if !db_path.exists() && snapshot_path.exists() { + unpack(snapshot_path, db_path) + } else if db_path.exists() && !ignore_snapshot_if_db_exists { + Err(Error::Internal(format!("database already exists at {:?}", db_path))) + } else if !snapshot_path.exists() && !ignore_missing_snapshot { + Err(Error::Internal(format!("snapshot doesn't exist at {:?}", snapshot_path))) + } else { + Ok(()) + } +} + +pub fn create_snapshot(data: &Data, snapshot_path: &Path) -> Result<(), Error> { + let tmp_dir = TempDir::new()?; + + data.db.copy_and_compact_to_path(tmp_dir.path())?; + + pack(tmp_dir.path(), snapshot_path).or_else(|e| Err(Error::Internal(format!("something went wrong during snapshot compression: {}", e)))) +} + +pub fn schedule_snapshot(data: Data, snapshot_dir: &Path, time_gap_s: u64) -> Result<(), Error> { + if snapshot_dir.file_name().is_none() { + return Err(Error::Internal("invalid snapshot file path".to_string())); + } + let db_name = Path::new(&data.db_path).file_name().ok_or_else(|| Error::Internal("invalid database name".to_string()))?; + create_dir_all(snapshot_dir)?; + let snapshot_path = snapshot_dir.join(format!("{}.tar.gz", db_name.to_str().unwrap_or("data.ms"))); + + thread::spawn(move || loop { + thread::sleep(Duration::from_secs(time_gap_s)); + if let Err(e) = create_snapshot(&data, &snapshot_path) { + error!("Unsuccessful snapshot creation: {}", e); + } + }); + + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + use std::io::prelude::*; + use std::fs; + + #[test] + fn test_pack_unpack() { + let tempdir = TempDir::new().unwrap(); + + let test_dir = tempdir.path(); + let src_dir = test_dir.join("src"); + let dest_dir = test_dir.join("complex/destination/path/"); + let archive_path = test_dir.join("archive.tar.gz"); + + let file_1_relative = Path::new("file1.txt"); + let subfolder_relative = Path::new("subfolder/"); + let file_2_relative = Path::new("subfolder/file2.txt"); + + create_dir_all(src_dir.join(subfolder_relative)).unwrap(); + File::create(src_dir.join(file_1_relative)).unwrap().write_all(b"Hello_file_1").unwrap(); + File::create(src_dir.join(file_2_relative)).unwrap().write_all(b"Hello_file_2").unwrap(); + + + assert!(pack(&src_dir, &archive_path).is_ok()); + assert!(archive_path.exists()); + assert!(load_snapshot(&dest_dir.to_str().unwrap(), &archive_path, false, false).is_ok()); + + assert!(dest_dir.exists()); + assert!(dest_dir.join(file_1_relative).exists()); + assert!(dest_dir.join(subfolder_relative).exists()); + assert!(dest_dir.join(file_2_relative).exists()); + + let contents = fs::read_to_string(dest_dir.join(file_1_relative)).unwrap(); + assert_eq!(contents, "Hello_file_1"); + + let contents = fs::read_to_string(dest_dir.join(file_2_relative)).unwrap(); + assert_eq!(contents, "Hello_file_2"); + } +}