mirror of
https://github.com/meilisearch/meilisearch.git
synced 2025-01-18 08:48:32 +08:00
Merge #829
829: implement snapshoting r=MarinPostma a=LegendreM related to #551. This pull request permit user to create periodically a snapshot of MeiliSearch database via a command line and launch meiliSearch from a snapshot with another command ## Documentation ### schedule a snapshot `--snapshot-path <DIRECTORY_PATH>`: this will periodically create a snapshot `<DB_NAME>.tar.gz` in the specified directory ### change period between 2 snapshot creation `--snapshot-interval-sec <GAP_IN_SEC>` choose the time gap between 2 snapshot ### start meilisearch from a snapshot `--load-from-snapshot <FILE_PATH>` this will use the snapshot stored at `<FILE_PATH>` to initialize MeiliSearch database, `--ignore-snapshot-if-db-exists` if set and if a db already exists, this will skip snapshot importation and continue process with actual db instead of quitting process by returning an Error `--ignore-missing-snapshot` if set and if no snapshot exists at provided path, this will skip snapshot importation and continue process with actual db instead of quitting process by returning an Error Co-authored-by: many <maxime@meilisearch.com>
This commit is contained in:
commit
a23bdb31a3
54
Cargo.lock
generated
54
Cargo.lock
generated
@ -301,10 +301,10 @@ dependencies = [
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "adler32"
|
||||
version = "1.0.4"
|
||||
name = "adler"
|
||||
version = "0.2.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5d2e7343e7fc9de883d1b0341e0b13970f764c14101234857d2ddafa1cb1cac2"
|
||||
checksum = "ccc9a9dd069569f212bc4330af9f17c4afb5e8ce185e83dbb14f1349dda18b10"
|
||||
|
||||
[[package]]
|
||||
name = "ahash"
|
||||
@ -889,10 +889,22 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e88a8acf291dafb59c2d96e8f59828f3838bb1a70398823ade51a84de6a6deed"
|
||||
|
||||
[[package]]
|
||||
name = "flate2"
|
||||
version = "1.0.14"
|
||||
name = "filetime"
|
||||
version = "0.2.10"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "2cfff41391129e0a856d6d822600b8d71179d46879e310417eb9c762eb178b42"
|
||||
checksum = "affc17579b132fc2461adf7c575cc6e8b134ebca52c51f5411388965227dc695"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"libc",
|
||||
"redox_syscall",
|
||||
"winapi 0.3.8",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "flate2"
|
||||
version = "1.0.16"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "68c90b0fc46cf89d227cc78b40e494ff81287a92dd07631e5af0d06fe3cf885e"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"crc32fast",
|
||||
@ -1548,6 +1560,7 @@ dependencies = [
|
||||
"chrono",
|
||||
"crossbeam-channel",
|
||||
"env_logger",
|
||||
"flate2",
|
||||
"futures",
|
||||
"http 0.1.21",
|
||||
"indexmap",
|
||||
@ -1571,7 +1584,9 @@ dependencies = [
|
||||
"siphasher",
|
||||
"slice-group-by",
|
||||
"structopt",
|
||||
"tar",
|
||||
"tempdir",
|
||||
"tempfile",
|
||||
"tokio",
|
||||
"ureq",
|
||||
"vergen",
|
||||
@ -1639,11 +1654,11 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "miniz_oxide"
|
||||
version = "0.3.6"
|
||||
version = "0.4.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "aa679ff6578b1cddee93d7e82e263b94a575e0bfced07284eb0c037c1d2416a5"
|
||||
checksum = "be0f75932c1f6cfae3c04000e40114adf955636e19040f9c0a2c380702aa1c7f"
|
||||
dependencies = [
|
||||
"adler32",
|
||||
"adler",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@ -2585,6 +2600,18 @@ dependencies = [
|
||||
"unicode-xid",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tar"
|
||||
version = "0.4.29"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c8a4c1d0bee3230179544336c15eefb563cf0302955d962e456542323e8c2e8a"
|
||||
dependencies = [
|
||||
"filetime",
|
||||
"libc",
|
||||
"redox_syscall",
|
||||
"xattr",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tempdir"
|
||||
version = "0.3.7"
|
||||
@ -3162,6 +3189,15 @@ dependencies = [
|
||||
"winapi-build",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "xattr"
|
||||
version = "0.2.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "244c3741f4240ef46274860397c7c74e50eb23624996930e484c16679633a54c"
|
||||
dependencies = [
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "zerocopy"
|
||||
version = "0.3.0"
|
||||
|
@ -27,6 +27,7 @@ bytes = "0.5.4"
|
||||
chrono = { version = "0.4.11", features = ["serde"] }
|
||||
crossbeam-channel = "0.4.2"
|
||||
env_logger = "0.7.1"
|
||||
flate2 = "1.0.16"
|
||||
futures = "0.3.4"
|
||||
http = "0.1.19"
|
||||
indexmap = { version = "1.3.2", features = ["serde-1"] }
|
||||
@ -47,6 +48,8 @@ sha2 = "0.8.1"
|
||||
siphasher = "0.3.2"
|
||||
slice-group-by = "0.2.6"
|
||||
structopt = "0.3.12"
|
||||
tar = "0.4.29"
|
||||
tempfile = "3.1.0"
|
||||
tokio = { version = "0.2.18", features = ["macros"] }
|
||||
ureq = { version = "0.12.0", features = ["tls"], default-features = false }
|
||||
walkdir = "2.3.1"
|
||||
|
@ -236,6 +236,18 @@ impl From<actix_http::Error> for Error {
|
||||
}
|
||||
}
|
||||
|
||||
impl From<std::io::Error> for Error {
|
||||
fn from(err: std::io::Error) -> Error {
|
||||
Error::Internal(err.to_string())
|
||||
}
|
||||
}
|
||||
|
||||
impl From<meilisearch_core::Error> for Error {
|
||||
fn from(err: meilisearch_core::Error) -> Error {
|
||||
Error::Internal(err.to_string())
|
||||
}
|
||||
}
|
||||
|
||||
impl From<FacetCountError> for ResponseError {
|
||||
fn from(err: FacetCountError) -> ResponseError {
|
||||
ResponseError { inner: Box::new(err) }
|
||||
|
@ -7,6 +7,7 @@ pub mod models;
|
||||
pub mod option;
|
||||
pub mod routes;
|
||||
pub mod analytics;
|
||||
pub mod snapshot;
|
||||
|
||||
use actix_http::Error;
|
||||
use actix_service::ServiceFactory;
|
||||
|
@ -6,6 +6,7 @@ use main_error::MainError;
|
||||
use meilisearch_http::helpers::NormalizePath;
|
||||
use meilisearch_http::{create_app, index_update_callback, Data, Opt};
|
||||
use structopt::StructOpt;
|
||||
use meilisearch_http::snapshot;
|
||||
|
||||
mod analytics;
|
||||
|
||||
@ -51,6 +52,10 @@ async fn main() -> Result<(), MainError> {
|
||||
_ => unreachable!(),
|
||||
}
|
||||
|
||||
if let Some(path) = &opt.load_from_snapshot {
|
||||
snapshot::load_snapshot(&opt.db_path, path, opt.ignore_snapshot_if_db_exists, opt.ignore_missing_snapshot)?;
|
||||
}
|
||||
|
||||
let data = Data::new(opt.clone())?;
|
||||
|
||||
if !opt.no_analytics {
|
||||
@ -64,6 +69,10 @@ async fn main() -> Result<(), MainError> {
|
||||
index_update_callback(name, &data_cloned, status);
|
||||
}));
|
||||
|
||||
if let Some(path) = &opt.snapshot_path {
|
||||
snapshot::schedule_snapshot(data.clone(), &path, opt.snapshot_interval_sec)?;
|
||||
}
|
||||
|
||||
print_launch_resume(&opt, &data);
|
||||
|
||||
let http_server = HttpServer::new(move || {
|
||||
|
@ -1,7 +1,7 @@
|
||||
use std::{error, fs};
|
||||
use std::io::{BufReader, Read};
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
use std::{error, fs};
|
||||
|
||||
use rustls::internal::pemfile::{certs, pkcs8_private_keys, rsa_private_keys};
|
||||
use rustls::{
|
||||
@ -93,6 +93,28 @@ pub struct Opt {
|
||||
/// SSL support tickets.
|
||||
#[structopt(long, env = "MEILI_SSL_TICKETS")]
|
||||
pub ssl_tickets: bool,
|
||||
|
||||
/// Defines the path of the snapshot file to import.
|
||||
/// This option will, by default, stop the process if a database already exist or if no snapshot exists at
|
||||
/// the given path. If this option is not specified no snapshot is imported.
|
||||
#[structopt(long, env = "MEILI_LOAD_FROM_SNAPSHOT")]
|
||||
pub load_from_snapshot: Option<PathBuf>,
|
||||
|
||||
/// The engine will ignore a missing snapshot and not return an error in such case.
|
||||
#[structopt(long, requires = "load-from-snapshot", env = "MEILI_IGNORE_MISSING_SNAPSHOT")]
|
||||
pub ignore_missing_snapshot: bool,
|
||||
|
||||
/// The engine will skip snapshot importation and not return an error in such case.
|
||||
#[structopt(long, requires = "load-from-snapshot", env = "MEILI_IGNORE_SNAPSHOT_IF_DB_EXISTS")]
|
||||
pub ignore_snapshot_if_db_exists: bool,
|
||||
|
||||
/// Defines the directory path where meilisearch will create snapshot each snapshot_time_gap.
|
||||
#[structopt(long, env = "MEILI_SNAPSHOT_PATH")]
|
||||
pub snapshot_path: Option<PathBuf>,
|
||||
|
||||
/// Defines time interval, in seconds, between each snapshot creation.
|
||||
#[structopt(long, requires = "snapshot-path", default_value = "86400", env = "MEILI_SNAPSHOT_INTERVAL_SEC")]
|
||||
pub snapshot_interval_sec: u64,
|
||||
}
|
||||
|
||||
impl Opt {
|
||||
|
124
meilisearch-http/src/snapshot.rs
Normal file
124
meilisearch-http/src/snapshot.rs
Normal file
@ -0,0 +1,124 @@
|
||||
use crate::Data;
|
||||
use crate::error::Error;
|
||||
|
||||
use flate2::Compression;
|
||||
use flate2::read::GzDecoder;
|
||||
use flate2::write::GzEncoder;
|
||||
use log::error;
|
||||
use std::fs::{create_dir_all, File};
|
||||
use std::io;
|
||||
use std::path::Path;
|
||||
use std::thread;
|
||||
use std::time::{Duration};
|
||||
use tar::{Builder, Archive};
|
||||
use tempfile::TempDir;
|
||||
|
||||
fn pack(src: &Path, dest: &Path) -> io::Result<()> {
|
||||
let f = File::create(dest)?;
|
||||
let gz_encoder = GzEncoder::new(f, Compression::default());
|
||||
|
||||
let mut tar_encoder = Builder::new(gz_encoder);
|
||||
tar_encoder.append_dir_all(".", src)?;
|
||||
let gz_encoder = tar_encoder.into_inner()?;
|
||||
|
||||
gz_encoder.finish()?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn unpack(src: &Path, dest: &Path) -> Result<(), Error> {
|
||||
let f = File::open(src)?;
|
||||
let gz = GzDecoder::new(f);
|
||||
let mut ar = Archive::new(gz);
|
||||
|
||||
create_dir_all(dest)?;
|
||||
ar.unpack(dest)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn load_snapshot(
|
||||
db_path: &str,
|
||||
snapshot_path: &Path,
|
||||
ignore_snapshot_if_db_exists: bool,
|
||||
ignore_missing_snapshot: bool
|
||||
) -> Result<(), Error> {
|
||||
let db_path = Path::new(db_path);
|
||||
|
||||
if !db_path.exists() && snapshot_path.exists() {
|
||||
unpack(snapshot_path, db_path)
|
||||
} else if db_path.exists() && !ignore_snapshot_if_db_exists {
|
||||
Err(Error::Internal(format!("database already exists at {:?}", db_path)))
|
||||
} else if !snapshot_path.exists() && !ignore_missing_snapshot {
|
||||
Err(Error::Internal(format!("snapshot doesn't exist at {:?}", snapshot_path)))
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
pub fn create_snapshot(data: &Data, snapshot_path: &Path) -> Result<(), Error> {
|
||||
let tmp_dir = TempDir::new()?;
|
||||
|
||||
data.db.copy_and_compact_to_path(tmp_dir.path())?;
|
||||
|
||||
pack(tmp_dir.path(), snapshot_path).or_else(|e| Err(Error::Internal(format!("something went wrong during snapshot compression: {}", e))))
|
||||
}
|
||||
|
||||
pub fn schedule_snapshot(data: Data, snapshot_dir: &Path, time_gap_s: u64) -> Result<(), Error> {
|
||||
if snapshot_dir.file_name().is_none() {
|
||||
return Err(Error::Internal("invalid snapshot file path".to_string()));
|
||||
}
|
||||
let db_name = Path::new(&data.db_path).file_name().ok_or_else(|| Error::Internal("invalid database name".to_string()))?;
|
||||
create_dir_all(snapshot_dir)?;
|
||||
let snapshot_path = snapshot_dir.join(format!("{}.tar.gz", db_name.to_str().unwrap_or("data.ms")));
|
||||
|
||||
thread::spawn(move || loop {
|
||||
thread::sleep(Duration::from_secs(time_gap_s));
|
||||
if let Err(e) = create_snapshot(&data, &snapshot_path) {
|
||||
error!("Unsuccessful snapshot creation: {}", e);
|
||||
}
|
||||
});
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use std::io::prelude::*;
|
||||
use std::fs;
|
||||
|
||||
#[test]
|
||||
fn test_pack_unpack() {
|
||||
let tempdir = TempDir::new().unwrap();
|
||||
|
||||
let test_dir = tempdir.path();
|
||||
let src_dir = test_dir.join("src");
|
||||
let dest_dir = test_dir.join("complex/destination/path/");
|
||||
let archive_path = test_dir.join("archive.tar.gz");
|
||||
|
||||
let file_1_relative = Path::new("file1.txt");
|
||||
let subfolder_relative = Path::new("subfolder/");
|
||||
let file_2_relative = Path::new("subfolder/file2.txt");
|
||||
|
||||
create_dir_all(src_dir.join(subfolder_relative)).unwrap();
|
||||
File::create(src_dir.join(file_1_relative)).unwrap().write_all(b"Hello_file_1").unwrap();
|
||||
File::create(src_dir.join(file_2_relative)).unwrap().write_all(b"Hello_file_2").unwrap();
|
||||
|
||||
|
||||
assert!(pack(&src_dir, &archive_path).is_ok());
|
||||
assert!(archive_path.exists());
|
||||
assert!(load_snapshot(&dest_dir.to_str().unwrap(), &archive_path, false, false).is_ok());
|
||||
|
||||
assert!(dest_dir.exists());
|
||||
assert!(dest_dir.join(file_1_relative).exists());
|
||||
assert!(dest_dir.join(subfolder_relative).exists());
|
||||
assert!(dest_dir.join(file_2_relative).exists());
|
||||
|
||||
let contents = fs::read_to_string(dest_dir.join(file_1_relative)).unwrap();
|
||||
assert_eq!(contents, "Hello_file_1");
|
||||
|
||||
let contents = fs::read_to_string(dest_dir.join(file_2_relative)).unwrap();
|
||||
assert_eq!(contents, "Hello_file_2");
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user