mirror of
https://github.com/meilisearch/meilisearch.git
synced 2024-11-23 02:27:40 +08:00
Merge #1008
1008: Dump info r=Kerollmops a=LegendreM fix #998 fix #988 fix #1009 fix #1010 fix #1033 Co-authored-by: many <maxime@meilisearch.com>
This commit is contained in:
commit
f83caa6c40
@ -27,7 +27,7 @@ impl Deref for Data {
|
||||
pub struct DataInner {
|
||||
pub db: Arc<Database>,
|
||||
pub db_path: String,
|
||||
pub dumps_folder: PathBuf,
|
||||
pub dumps_dir: PathBuf,
|
||||
pub dump_batch_size: usize,
|
||||
pub api_keys: ApiKeys,
|
||||
pub server_pid: u32,
|
||||
@ -61,7 +61,7 @@ impl ApiKeys {
|
||||
impl Data {
|
||||
pub fn new(opt: Opt) -> Result<Data, Box<dyn Error>> {
|
||||
let db_path = opt.db_path.clone();
|
||||
let dumps_folder = opt.dumps_folder.clone();
|
||||
let dumps_dir = opt.dumps_dir.clone();
|
||||
let dump_batch_size = opt.dump_batch_size;
|
||||
let server_pid = std::process::id();
|
||||
|
||||
@ -85,7 +85,7 @@ impl Data {
|
||||
let inner_data = DataInner {
|
||||
db: db.clone(),
|
||||
db_path,
|
||||
dumps_folder,
|
||||
dumps_dir,
|
||||
dump_batch_size,
|
||||
api_keys,
|
||||
server_pid,
|
||||
|
@ -13,10 +13,11 @@ use meilisearch_core::settings::Settings;
|
||||
use meilisearch_core::update::{apply_settings_update, apply_documents_addition};
|
||||
use once_cell::sync::Lazy;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_json::json;
|
||||
use tempfile::TempDir;
|
||||
|
||||
use crate::Data;
|
||||
use crate::error::Error;
|
||||
use crate::error::{Error, ResponseError};
|
||||
use crate::helpers::compression;
|
||||
use crate::routes::index;
|
||||
use crate::routes::index::IndexResponse;
|
||||
@ -51,9 +52,9 @@ impl DumpMetadata {
|
||||
}
|
||||
}
|
||||
|
||||
/// Extract DumpMetadata from `metadata.json` file present at provided `folder_path`
|
||||
fn from_path(folder_path: &Path) -> Result<Self, Error> {
|
||||
let path = folder_path.join("metadata.json");
|
||||
/// Extract DumpMetadata from `metadata.json` file present at provided `dir_path`
|
||||
fn from_path(dir_path: &Path) -> Result<Self, Error> {
|
||||
let path = dir_path.join("metadata.json");
|
||||
let file = File::open(path)?;
|
||||
let reader = std::io::BufReader::new(file);
|
||||
let metadata = serde_json::from_reader(reader)?;
|
||||
@ -61,9 +62,9 @@ impl DumpMetadata {
|
||||
Ok(metadata)
|
||||
}
|
||||
|
||||
/// Write DumpMetadata in `metadata.json` file at provided `folder_path`
|
||||
fn to_path(&self, folder_path: &Path) -> Result<(), Error> {
|
||||
let path = folder_path.join("metadata.json");
|
||||
/// Write DumpMetadata in `metadata.json` file at provided `dir_path`
|
||||
fn to_path(&self, dir_path: &Path) -> Result<(), Error> {
|
||||
let path = dir_path.join("metadata.json");
|
||||
let file = File::create(path)?;
|
||||
|
||||
serde_json::to_writer(file, &self)?;
|
||||
@ -72,9 +73,9 @@ impl DumpMetadata {
|
||||
}
|
||||
}
|
||||
|
||||
/// Extract Settings from `settings.json` file present at provided `folder_path`
|
||||
fn settings_from_path(folder_path: &Path) -> Result<Settings, Error> {
|
||||
let path = folder_path.join("settings.json");
|
||||
/// Extract Settings from `settings.json` file present at provided `dir_path`
|
||||
fn settings_from_path(dir_path: &Path) -> Result<Settings, Error> {
|
||||
let path = dir_path.join("settings.json");
|
||||
let file = File::open(path)?;
|
||||
let reader = std::io::BufReader::new(file);
|
||||
let metadata = serde_json::from_reader(reader)?;
|
||||
@ -82,9 +83,9 @@ fn settings_from_path(folder_path: &Path) -> Result<Settings, Error> {
|
||||
Ok(metadata)
|
||||
}
|
||||
|
||||
/// Write Settings in `settings.json` file at provided `folder_path`
|
||||
fn settings_to_path(settings: &Settings, folder_path: &Path) -> Result<(), Error> {
|
||||
let path = folder_path.join("settings.json");
|
||||
/// Write Settings in `settings.json` file at provided `dir_path`
|
||||
fn settings_to_path(settings: &Settings, dir_path: &Path) -> Result<(), Error> {
|
||||
let path = dir_path.join("settings.json");
|
||||
let file = File::create(path)?;
|
||||
|
||||
serde_json::to_writer(file, settings)?;
|
||||
@ -95,7 +96,7 @@ fn settings_to_path(settings: &Settings, folder_path: &Path) -> Result<(), Error
|
||||
/// Import settings and documents of a dump with version `DumpVersion::V1` in specified index.
|
||||
fn import_index_v1(
|
||||
data: &Data,
|
||||
dumps_folder: &Path,
|
||||
dumps_dir: &Path,
|
||||
index_uid: &str,
|
||||
document_batch_size: usize,
|
||||
write_txn: &mut MainWriter,
|
||||
@ -107,12 +108,12 @@ fn import_index_v1(
|
||||
.open_index(index_uid)
|
||||
.ok_or(Error::index_not_found(index_uid))?;
|
||||
|
||||
// index folder path in dump folder
|
||||
let index_path = &dumps_folder.join(index_uid);
|
||||
// index dir path in dump dir
|
||||
let index_path = &dumps_dir.join(index_uid);
|
||||
|
||||
// extract `settings.json` file and import content
|
||||
let settings = settings_from_path(&index_path)?;
|
||||
let settings = settings.to_update().map_err(|_e| Error::dump_failed())?;
|
||||
let settings = settings.to_update().map_err(|e| Error::dump_failed(format!("importing settings for index {}; {}", index_uid, e)))?;
|
||||
apply_settings_update(write_txn, &index, settings)?;
|
||||
|
||||
// create iterator over documents in `documents.jsonl` to make batch importation
|
||||
@ -199,17 +200,17 @@ pub fn import_dump(
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub enum DumpStatus {
|
||||
Done,
|
||||
Processing,
|
||||
DumpProcessFailed,
|
||||
InProgress,
|
||||
Failed,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||
#[derive(Debug, Serialize, Clone)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct DumpInfo {
|
||||
pub uid: String,
|
||||
pub status: DumpStatus,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub error: Option<String>,
|
||||
#[serde(skip_serializing_if = "Option::is_none", flatten)]
|
||||
pub error: Option<serde_json::Value>,
|
||||
}
|
||||
|
||||
impl DumpInfo {
|
||||
@ -217,15 +218,15 @@ impl DumpInfo {
|
||||
Self { uid, status, error: None }
|
||||
}
|
||||
|
||||
pub fn with_error(mut self, error: String) -> Self {
|
||||
self.status = DumpStatus::DumpProcessFailed;
|
||||
self.error = Some(error);
|
||||
pub fn with_error(mut self, error: ResponseError) -> Self {
|
||||
self.status = DumpStatus::Failed;
|
||||
self.error = Some(json!(error));
|
||||
|
||||
self
|
||||
}
|
||||
|
||||
pub fn dump_already_in_progress(&self) -> bool {
|
||||
self.status == DumpStatus::Processing
|
||||
self.status == DumpStatus::InProgress
|
||||
}
|
||||
|
||||
pub fn get_current() -> Option<Self> {
|
||||
@ -242,29 +243,29 @@ fn generate_uid() -> String {
|
||||
Utc::now().format("%Y%m%d-%H%M%S%3f").to_string()
|
||||
}
|
||||
|
||||
/// Infer dumps_folder from dump_uid
|
||||
pub fn compressed_dumps_folder(dumps_folder: &Path, dump_uid: &str) -> PathBuf {
|
||||
dumps_folder.join(format!("{}.tar.gz", dump_uid))
|
||||
/// Infer dumps_dir from dump_uid
|
||||
pub fn compressed_dumps_dir(dumps_dir: &Path, dump_uid: &str) -> PathBuf {
|
||||
dumps_dir.join(format!("{}.dump", dump_uid))
|
||||
}
|
||||
|
||||
/// Write metadata in dump
|
||||
fn dump_metadata(data: &web::Data<Data>, folder_path: &Path, indexes: Vec<IndexResponse>) -> Result<(), Error> {
|
||||
fn dump_metadata(data: &web::Data<Data>, dir_path: &Path, indexes: Vec<IndexResponse>) -> Result<(), Error> {
|
||||
let (db_major, db_minor, db_patch) = data.db.version();
|
||||
let metadata = DumpMetadata::new(indexes, format!("{}.{}.{}", db_major, db_minor, db_patch));
|
||||
|
||||
metadata.to_path(folder_path)
|
||||
metadata.to_path(dir_path)
|
||||
}
|
||||
|
||||
/// Export settings of provided index in dump
|
||||
fn dump_index_settings(data: &web::Data<Data>, reader: &MainReader, folder_path: &Path, index_uid: &str) -> Result<(), Error> {
|
||||
fn dump_index_settings(data: &web::Data<Data>, reader: &MainReader, dir_path: &Path, index_uid: &str) -> Result<(), Error> {
|
||||
let settings = crate::routes::setting::get_all_sync(data, reader, index_uid)?;
|
||||
|
||||
settings_to_path(&settings, folder_path)
|
||||
settings_to_path(&settings, dir_path)
|
||||
}
|
||||
|
||||
/// Export updates of provided index in dump
|
||||
fn dump_index_updates(data: &web::Data<Data>, reader: &UpdateReader, folder_path: &Path, index_uid: &str) -> Result<(), Error> {
|
||||
let updates_path = folder_path.join("updates.jsonl");
|
||||
fn dump_index_updates(data: &web::Data<Data>, reader: &UpdateReader, dir_path: &Path, index_uid: &str) -> Result<(), Error> {
|
||||
let updates_path = dir_path.join("updates.jsonl");
|
||||
let updates = crate::routes::index::get_all_updates_status_sync(data, reader, index_uid)?;
|
||||
|
||||
let file = File::create(updates_path)?;
|
||||
@ -278,8 +279,8 @@ fn dump_index_updates(data: &web::Data<Data>, reader: &UpdateReader, folder_path
|
||||
}
|
||||
|
||||
/// Export documents of provided index in dump
|
||||
fn dump_index_documents(data: &web::Data<Data>, reader: &MainReader, folder_path: &Path, index_uid: &str) -> Result<(), Error> {
|
||||
let documents_path = folder_path.join("documents.jsonl");
|
||||
fn dump_index_documents(data: &web::Data<Data>, reader: &MainReader, dir_path: &Path, index_uid: &str) -> Result<(), Error> {
|
||||
let documents_path = dir_path.join("documents.jsonl");
|
||||
let file = File::create(documents_path)?;
|
||||
let dump_batch_size = data.dump_batch_size;
|
||||
|
||||
@ -299,14 +300,14 @@ fn dump_index_documents(data: &web::Data<Data>, reader: &MainReader, folder_path
|
||||
|
||||
/// Write error with a context.
|
||||
fn fail_dump_process<E: std::error::Error>(dump_info: DumpInfo, context: &str, error: E) {
|
||||
let error = format!("Something went wrong during dump process: {}; {}", context, error);
|
||||
let error_message = format!("{}; {}", context, error);
|
||||
|
||||
error!("{}", &error);
|
||||
dump_info.with_error(error).set_current();
|
||||
error!("Something went wrong during dump process: {}", &error_message);
|
||||
dump_info.with_error(Error::dump_failed(error_message).into()).set_current();
|
||||
}
|
||||
|
||||
/// Main function of dump.
|
||||
fn dump_process(data: web::Data<Data>, dumps_folder: PathBuf, dump_info: DumpInfo) {
|
||||
fn dump_process(data: web::Data<Data>, dumps_dir: PathBuf, dump_info: DumpInfo) {
|
||||
// open read transaction on Update
|
||||
let update_reader = match data.db.update_read_txn() {
|
||||
Ok(r) => r,
|
||||
@ -379,8 +380,8 @@ fn dump_process(data: web::Data<Data>, dumps_folder: PathBuf, dump_info: DumpInf
|
||||
}
|
||||
}
|
||||
|
||||
// compress dump in a file named `{dump_uid}.tar.gz` in `dumps_folder`
|
||||
if let Err(e) = crate::helpers::compression::to_tar_gz(&tmp_dir_path, &compressed_dumps_folder(&dumps_folder, &dump_info.uid)) {
|
||||
// compress dump in a file named `{dump_uid}.dump` in `dumps_dir`
|
||||
if let Err(e) = crate::helpers::compression::to_tar_gz(&tmp_dir_path, &compressed_dumps_dir(&dumps_dir, &dump_info.uid)) {
|
||||
fail_dump_process(dump_info, "compressing dump", e);
|
||||
return ;
|
||||
}
|
||||
@ -394,8 +395,8 @@ fn dump_process(data: web::Data<Data>, dumps_folder: PathBuf, dump_info: DumpInf
|
||||
resume.set_current();
|
||||
}
|
||||
|
||||
pub fn init_dump_process(data: &web::Data<Data>, dumps_folder: &Path) -> Result<DumpInfo, Error> {
|
||||
create_dir_all(dumps_folder).or(Err(Error::dump_failed()))?;
|
||||
pub fn init_dump_process(data: &web::Data<Data>, dumps_dir: &Path) -> Result<DumpInfo, Error> {
|
||||
create_dir_all(dumps_dir).map_err(|e| Error::dump_failed(format!("creating temporary directory {}", e)))?;
|
||||
|
||||
// check if a dump is already in progress
|
||||
if let Some(resume) = DumpInfo::get_current() {
|
||||
@ -407,17 +408,17 @@ pub fn init_dump_process(data: &web::Data<Data>, dumps_folder: &Path) -> Result<
|
||||
// generate a new dump info
|
||||
let info = DumpInfo::new(
|
||||
generate_uid(),
|
||||
DumpStatus::Processing
|
||||
DumpStatus::InProgress
|
||||
);
|
||||
|
||||
info.set_current();
|
||||
|
||||
let data = data.clone();
|
||||
let dumps_folder = dumps_folder.to_path_buf();
|
||||
let dumps_dir = dumps_dir.to_path_buf();
|
||||
let info_cloned = info.clone();
|
||||
// run dump process in a new thread
|
||||
thread::spawn(move ||
|
||||
dump_process(data, dumps_folder, info_cloned)
|
||||
dump_process(data, dumps_dir, info_cloned)
|
||||
);
|
||||
|
||||
Ok(info)
|
||||
|
@ -5,7 +5,7 @@ use actix_http::ResponseBuilder;
|
||||
use actix_web as aweb;
|
||||
use actix_web::error::{JsonPayloadError, QueryPayloadError};
|
||||
use actix_web::http::StatusCode;
|
||||
use serde_json::json;
|
||||
use serde::ser::{Serialize, Serializer, SerializeStruct};
|
||||
|
||||
use meilisearch_error::{ErrorCode, Code};
|
||||
|
||||
@ -34,6 +34,51 @@ impl From<Error> for ResponseError {
|
||||
}
|
||||
}
|
||||
|
||||
impl From<meilisearch_core::Error> for ResponseError {
|
||||
fn from(err: meilisearch_core::Error) -> ResponseError {
|
||||
ResponseError { inner: Box::new(err) }
|
||||
}
|
||||
}
|
||||
|
||||
impl From<meilisearch_schema::Error> for ResponseError {
|
||||
fn from(err: meilisearch_schema::Error) -> ResponseError {
|
||||
ResponseError { inner: Box::new(err) }
|
||||
}
|
||||
}
|
||||
|
||||
impl From<FacetCountError> for ResponseError {
|
||||
fn from(err: FacetCountError) -> ResponseError {
|
||||
ResponseError { inner: Box::new(err) }
|
||||
}
|
||||
}
|
||||
|
||||
impl Serialize for ResponseError {
|
||||
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
||||
where
|
||||
S: Serializer,
|
||||
{
|
||||
let struct_name = "ResponseError";
|
||||
let field_count = 4;
|
||||
|
||||
let mut state = serializer.serialize_struct(struct_name, field_count)?;
|
||||
state.serialize_field("message", &self.to_string())?;
|
||||
state.serialize_field("errorCode", &self.error_name())?;
|
||||
state.serialize_field("errorType", &self.error_type())?;
|
||||
state.serialize_field("errorLink", &self.error_url())?;
|
||||
state.end()
|
||||
}
|
||||
}
|
||||
|
||||
impl aweb::error::ResponseError for ResponseError {
|
||||
fn error_response(&self) -> aweb::HttpResponse {
|
||||
ResponseBuilder::new(self.status_code()).json(&self)
|
||||
}
|
||||
|
||||
fn status_code(&self) -> StatusCode {
|
||||
self.http_status()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum Error {
|
||||
BadParameter(String, String),
|
||||
@ -54,7 +99,7 @@ pub enum Error {
|
||||
PayloadTooLarge,
|
||||
UnsupportedMediaType,
|
||||
DumpAlreadyInProgress,
|
||||
DumpProcessFailed,
|
||||
DumpProcessFailed(String),
|
||||
}
|
||||
|
||||
impl error::Error for Error {}
|
||||
@ -81,7 +126,7 @@ impl ErrorCode for Error {
|
||||
PayloadTooLarge => Code::PayloadTooLarge,
|
||||
UnsupportedMediaType => Code::UnsupportedMediaType,
|
||||
DumpAlreadyInProgress => Code::DumpAlreadyInProgress,
|
||||
DumpProcessFailed => Code::DumpProcessFailed,
|
||||
DumpProcessFailed(_) => Code::DumpProcessFailed,
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -189,8 +234,8 @@ impl Error {
|
||||
Error::DumpAlreadyInProgress
|
||||
}
|
||||
|
||||
pub fn dump_failed() -> Error {
|
||||
Error::DumpProcessFailed
|
||||
pub fn dump_failed(message: String) -> Error {
|
||||
Error::DumpProcessFailed(message)
|
||||
}
|
||||
}
|
||||
|
||||
@ -215,44 +260,17 @@ impl fmt::Display for Error {
|
||||
Self::PayloadTooLarge => f.write_str("Payload too large"),
|
||||
Self::UnsupportedMediaType => f.write_str("Unsupported media type"),
|
||||
Self::DumpAlreadyInProgress => f.write_str("Another dump is already in progress"),
|
||||
Self::DumpProcessFailed => f.write_str("Dump process failed"),
|
||||
Self::DumpProcessFailed(message) => write!(f, "Dump process failed: {}", message),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl aweb::error::ResponseError for ResponseError {
|
||||
fn error_response(&self) -> aweb::HttpResponse {
|
||||
ResponseBuilder::new(self.status_code()).json(json!({
|
||||
"message": self.to_string(),
|
||||
"errorCode": self.error_name(),
|
||||
"errorType": self.error_type(),
|
||||
"errorLink": self.error_url(),
|
||||
}))
|
||||
}
|
||||
|
||||
fn status_code(&self) -> StatusCode {
|
||||
self.http_status()
|
||||
}
|
||||
}
|
||||
|
||||
impl From<std::io::Error> for Error {
|
||||
fn from(err: std::io::Error) -> Error {
|
||||
Error::Internal(err.to_string())
|
||||
}
|
||||
}
|
||||
|
||||
impl From<meilisearch_core::Error> for ResponseError {
|
||||
fn from(err: meilisearch_core::Error) -> ResponseError {
|
||||
ResponseError { inner: Box::new(err) }
|
||||
}
|
||||
}
|
||||
|
||||
impl From<meilisearch_schema::Error> for ResponseError {
|
||||
fn from(err: meilisearch_schema::Error) -> ResponseError {
|
||||
ResponseError { inner: Box::new(err) }
|
||||
}
|
||||
}
|
||||
|
||||
impl From<actix_http::Error> for Error {
|
||||
fn from(err: actix_http::Error) -> Error {
|
||||
Error::Internal(err.to_string())
|
||||
@ -271,12 +289,6 @@ impl From<serde_json::error::Error> for Error {
|
||||
}
|
||||
}
|
||||
|
||||
impl From<FacetCountError> for ResponseError {
|
||||
fn from(err: FacetCountError) -> ResponseError {
|
||||
ResponseError { inner: Box::new(err) }
|
||||
}
|
||||
}
|
||||
|
||||
impl From<JsonPayloadError> for Error {
|
||||
fn from(err: JsonPayloadError) -> Error {
|
||||
match err {
|
||||
|
@ -52,7 +52,7 @@ async fn main() -> Result<(), MainError> {
|
||||
_ => unreachable!(),
|
||||
}
|
||||
|
||||
if let Some(path) = &opt.load_from_snapshot {
|
||||
if let Some(path) = &opt.import_snapshot {
|
||||
snapshot::load_snapshot(&opt.db_path, path, opt.ignore_snapshot_if_db_exists, opt.ignore_missing_snapshot)?;
|
||||
}
|
||||
|
||||
@ -74,8 +74,8 @@ async fn main() -> Result<(), MainError> {
|
||||
dump::import_dump(&data, path, opt.dump_batch_size)?;
|
||||
}
|
||||
|
||||
if let Some(path) = &opt.snapshot_path {
|
||||
snapshot::schedule_snapshot(data.clone(), &path, opt.snapshot_interval_sec.unwrap_or(86400))?;
|
||||
if opt.schedule_snapshot {
|
||||
snapshot::schedule_snapshot(data.clone(), &opt.snapshot_dir, opt.snapshot_interval_sec.unwrap_or(86400))?;
|
||||
}
|
||||
|
||||
print_launch_resume(&opt, &data);
|
||||
|
@ -97,31 +97,35 @@ pub struct Opt {
|
||||
/// Defines the path of the snapshot file to import.
|
||||
/// This option will, by default, stop the process if a database already exist or if no snapshot exists at
|
||||
/// the given path. If this option is not specified no snapshot is imported.
|
||||
#[structopt(long, env = "MEILI_LOAD_FROM_SNAPSHOT")]
|
||||
pub load_from_snapshot: Option<PathBuf>,
|
||||
#[structopt(long)]
|
||||
pub import_snapshot: Option<PathBuf>,
|
||||
|
||||
/// The engine will ignore a missing snapshot and not return an error in such case.
|
||||
#[structopt(long, requires = "load-from-snapshot", env = "MEILI_IGNORE_MISSING_SNAPSHOT")]
|
||||
#[structopt(long, requires = "import-snapshot")]
|
||||
pub ignore_missing_snapshot: bool,
|
||||
|
||||
/// The engine will skip snapshot importation and not return an error in such case.
|
||||
#[structopt(long, requires = "load-from-snapshot", env = "MEILI_IGNORE_SNAPSHOT_IF_DB_EXISTS")]
|
||||
#[structopt(long, requires = "import-snapshot")]
|
||||
pub ignore_snapshot_if_db_exists: bool,
|
||||
|
||||
/// Defines the directory path where meilisearch will create snapshot each snapshot_time_gap.
|
||||
#[structopt(long, env = "MEILI_SNAPSHOT_PATH")]
|
||||
pub snapshot_path: Option<PathBuf>,
|
||||
#[structopt(long, env = "MEILI_SNAPSHOT_DIR", default_value = "snapshots/")]
|
||||
pub snapshot_dir: PathBuf,
|
||||
|
||||
/// Activate snapshot scheduling.
|
||||
#[structopt(long, env = "MEILI_SCHEDULE_SNAPSHOT")]
|
||||
pub schedule_snapshot: bool,
|
||||
|
||||
/// Defines time interval, in seconds, between each snapshot creation.
|
||||
#[structopt(long, requires = "snapshot-path", env = "MEILI_SNAPSHOT_INTERVAL_SEC")]
|
||||
#[structopt(long, env = "MEILI_SNAPSHOT_INTERVAL_SEC")]
|
||||
pub snapshot_interval_sec: Option<u64>,
|
||||
|
||||
/// Folder where dumps are created when the dump route is called.
|
||||
#[structopt(long, env = "MEILI_DUMPS_FOLDER", default_value = "dumps/")]
|
||||
pub dumps_folder: PathBuf,
|
||||
#[structopt(long, env = "MEILI_DUMPS_DIR", default_value = "dumps/")]
|
||||
pub dumps_dir: PathBuf,
|
||||
|
||||
/// Import a dump from the specified path, must be a `.tar.gz` file.
|
||||
#[structopt(long, env = "MEILI_IMPORT_DUMP", conflicts_with = "load-from-snapshot")]
|
||||
#[structopt(long, conflicts_with = "import-snapshot")]
|
||||
pub import_dump: Option<PathBuf>,
|
||||
|
||||
/// The batch size used in the importation process, the bigger it is the faster the dump is created.
|
||||
|
@ -5,7 +5,7 @@ use actix_web::{get, post};
|
||||
use actix_web::{HttpResponse, web};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::dump::{DumpInfo, DumpStatus, compressed_dumps_folder, init_dump_process};
|
||||
use crate::dump::{DumpInfo, DumpStatus, compressed_dumps_dir, init_dump_process};
|
||||
use crate::Data;
|
||||
use crate::error::{Error, ResponseError};
|
||||
use crate::helpers::Authentication;
|
||||
@ -19,8 +19,8 @@ pub fn services(cfg: &mut web::ServiceConfig) {
|
||||
async fn trigger_dump(
|
||||
data: web::Data<Data>,
|
||||
) -> Result<HttpResponse, ResponseError> {
|
||||
let dumps_folder = Path::new(&data.dumps_folder);
|
||||
match init_dump_process(&data, &dumps_folder) {
|
||||
let dumps_dir = Path::new(&data.dumps_dir);
|
||||
match init_dump_process(&data, &dumps_dir) {
|
||||
Ok(resume) => Ok(HttpResponse::Accepted().json(resume)),
|
||||
Err(e) => Err(e.into())
|
||||
}
|
||||
@ -42,7 +42,7 @@ async fn get_dump_status(
|
||||
data: web::Data<Data>,
|
||||
path: web::Path<DumpParam>,
|
||||
) -> Result<HttpResponse, ResponseError> {
|
||||
let dumps_folder = Path::new(&data.dumps_folder);
|
||||
let dumps_dir = Path::new(&data.dumps_dir);
|
||||
let dump_uid = &path.dump_uid;
|
||||
|
||||
if let Some(resume) = DumpInfo::get_current() {
|
||||
@ -51,7 +51,7 @@ async fn get_dump_status(
|
||||
}
|
||||
}
|
||||
|
||||
if File::open(compressed_dumps_folder(Path::new(dumps_folder), dump_uid)).is_ok() {
|
||||
if File::open(compressed_dumps_dir(Path::new(dumps_dir), dump_uid)).is_ok() {
|
||||
let resume = DumpInfo::new(
|
||||
dump_uid.into(),
|
||||
DumpStatus::Done
|
||||
|
@ -20,9 +20,9 @@ pub fn load_snapshot(
|
||||
if !db_path.exists() && snapshot_path.exists() {
|
||||
compression::from_tar_gz(snapshot_path, db_path)
|
||||
} else if db_path.exists() && !ignore_snapshot_if_db_exists {
|
||||
Err(Error::Internal(format!("database already exists at {:?}", db_path)))
|
||||
Err(Error::Internal(format!("database already exists at {:?}, try to delete it or rename it", db_path.canonicalize().unwrap_or(db_path.into()))))
|
||||
} else if !snapshot_path.exists() && !ignore_missing_snapshot {
|
||||
Err(Error::Internal(format!("snapshot doesn't exist at {:?}", snapshot_path)))
|
||||
Err(Error::Internal(format!("snapshot doesn't exist at {:?}", snapshot_path.canonicalize().unwrap_or(snapshot_path.into()))))
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
@ -42,13 +42,13 @@ pub fn schedule_snapshot(data: Data, snapshot_dir: &Path, time_gap_s: u64) -> Re
|
||||
}
|
||||
let db_name = Path::new(&data.db_path).file_name().ok_or_else(|| Error::Internal("invalid database name".to_string()))?;
|
||||
create_dir_all(snapshot_dir)?;
|
||||
let snapshot_path = snapshot_dir.join(format!("{}.tar.gz", db_name.to_str().unwrap_or("data.ms")));
|
||||
let snapshot_path = snapshot_dir.join(format!("{}.snapshot", db_name.to_str().unwrap_or("data.ms")));
|
||||
|
||||
thread::spawn(move || loop {
|
||||
thread::sleep(Duration::from_secs(time_gap_s));
|
||||
if let Err(e) = create_snapshot(&data, &snapshot_path) {
|
||||
error!("Unsuccessful snapshot creation: {}", e);
|
||||
}
|
||||
thread::sleep(Duration::from_secs(time_gap_s));
|
||||
});
|
||||
|
||||
Ok(())
|
||||
@ -67,13 +67,13 @@ mod tests {
|
||||
let test_dir = tempdir.path();
|
||||
let src_dir = test_dir.join("src");
|
||||
let dest_dir = test_dir.join("complex/destination/path/");
|
||||
let archive_path = test_dir.join("archive.tar.gz");
|
||||
let archive_path = test_dir.join("archive.snapshot");
|
||||
|
||||
let file_1_relative = Path::new("file1.txt");
|
||||
let subfolder_relative = Path::new("subfolder/");
|
||||
let file_2_relative = Path::new("subfolder/file2.txt");
|
||||
let subdir_relative = Path::new("subdir/");
|
||||
let file_2_relative = Path::new("subdir/file2.txt");
|
||||
|
||||
create_dir_all(src_dir.join(subfolder_relative)).unwrap();
|
||||
create_dir_all(src_dir.join(subdir_relative)).unwrap();
|
||||
fs::File::create(src_dir.join(file_1_relative)).unwrap().write_all(b"Hello_file_1").unwrap();
|
||||
fs::File::create(src_dir.join(file_2_relative)).unwrap().write_all(b"Hello_file_2").unwrap();
|
||||
|
||||
@ -84,7 +84,7 @@ mod tests {
|
||||
|
||||
assert!(dest_dir.exists());
|
||||
assert!(dest_dir.join(file_1_relative).exists());
|
||||
assert!(dest_dir.join(subfolder_relative).exists());
|
||||
assert!(dest_dir.join(subdir_relative).exists());
|
||||
assert!(dest_dir.join(file_2_relative).exists());
|
||||
|
||||
let contents = fs::read_to_string(dest_dir.join(file_1_relative)).unwrap();
|
||||
|
@ -49,7 +49,7 @@ impl Server {
|
||||
|
||||
let opt = Opt {
|
||||
db_path: tmp_dir.path().join("db").to_str().unwrap().to_string(),
|
||||
dumps_folder: tmp_dir.path().join("dump"),
|
||||
dumps_dir: tmp_dir.path().join("dump"),
|
||||
dump_batch_size: 16,
|
||||
http_addr: "127.0.0.1:7700".to_owned(),
|
||||
master_key: None,
|
||||
|
@ -101,7 +101,7 @@ async fn trigger_dump_concurently_should_return_conflict() {
|
||||
|
||||
#[actix_rt::test]
|
||||
#[ignore]
|
||||
async fn get_dump_status_early_should_return_processing() {
|
||||
async fn get_dump_status_early_should_return_in_progress() {
|
||||
let mut server = common::Server::test_server().await;
|
||||
|
||||
|
||||
@ -116,7 +116,7 @@ async fn get_dump_status_early_should_return_processing() {
|
||||
|
||||
let expected = json!({
|
||||
"uid": dump_uid,
|
||||
"status": "processing"
|
||||
"status": "in_progress"
|
||||
});
|
||||
|
||||
assert_eq!(status_code, 200);
|
||||
@ -150,6 +150,39 @@ async fn get_dump_status_should_return_done() {
|
||||
assert_json_eq!(expected.clone(), value.clone(), ordered: false);
|
||||
}
|
||||
|
||||
#[actix_rt::test]
|
||||
#[ignore]
|
||||
async fn get_dump_status_should_return_error_provoking_it() {
|
||||
let mut server = common::Server::test_server().await;
|
||||
|
||||
|
||||
let (value, status_code) = server.trigger_dump().await;
|
||||
|
||||
// removing destination directory provoking `No such file or directory` error
|
||||
std::fs::remove_dir(server.data().dumps_dir.clone()).unwrap();
|
||||
|
||||
assert_eq!(status_code, 202);
|
||||
|
||||
let dump_uid = value["uid"].as_str().unwrap().to_string();
|
||||
|
||||
let expected = json!({
|
||||
"uid": dump_uid.clone(),
|
||||
"status": "failed",
|
||||
"message": "Dump process failed: compressing dump; No such file or directory (os error 2)",
|
||||
"errorCode": "dump_process_failed",
|
||||
"errorType": "internal_error",
|
||||
"errorLink": "https://docs.meilisearch.com/errors#dump_process_failed"
|
||||
});
|
||||
|
||||
thread::sleep(Duration::from_secs(1)); // wait dump until process end
|
||||
|
||||
let (value, status_code) = server.get_dump_status(&dump_uid).await;
|
||||
|
||||
assert_eq!(status_code, 200);
|
||||
|
||||
assert_json_eq!(expected.clone(), value.clone(), ordered: false);
|
||||
}
|
||||
|
||||
#[actix_rt::test]
|
||||
#[ignore]
|
||||
async fn dump_metadata_should_be_valid() {
|
||||
@ -164,11 +197,11 @@ async fn dump_metadata_should_be_valid() {
|
||||
|
||||
let uid = trigger_and_wait_dump(&mut server).await;
|
||||
|
||||
let dumps_folder = Path::new(&server.data().dumps_folder);
|
||||
let dumps_dir = Path::new(&server.data().dumps_dir);
|
||||
let tmp_dir = TempDir::new().unwrap();
|
||||
let tmp_dir_path = tmp_dir.path();
|
||||
|
||||
compression::from_tar_gz(&dumps_folder.join(&format!("{}.tar.gz", uid)), tmp_dir_path).unwrap();
|
||||
compression::from_tar_gz(&dumps_dir.join(&format!("{}.dump", uid)), tmp_dir_path).unwrap();
|
||||
|
||||
let file = File::open(tmp_dir_path.join("metadata.json")).unwrap();
|
||||
let mut metadata: serde_json::Value = serde_json::from_reader(file).unwrap();
|
||||
@ -205,9 +238,9 @@ async fn dump_gzip_should_have_been_created() {
|
||||
|
||||
|
||||
let dump_uid = trigger_and_wait_dump(&mut server).await;
|
||||
let dumps_folder = Path::new(&server.data().dumps_folder);
|
||||
let dumps_dir = Path::new(&server.data().dumps_dir);
|
||||
|
||||
let compressed_path = dumps_folder.join(format!("{}.tar.gz", dump_uid));
|
||||
let compressed_path = dumps_dir.join(format!("{}.dump", dump_uid));
|
||||
assert!(File::open(compressed_path).is_ok());
|
||||
}
|
||||
|
||||
@ -279,11 +312,11 @@ async fn dump_index_settings_should_be_valid() {
|
||||
|
||||
let uid = trigger_and_wait_dump(&mut server).await;
|
||||
|
||||
let dumps_folder = Path::new(&server.data().dumps_folder);
|
||||
let dumps_dir = Path::new(&server.data().dumps_dir);
|
||||
let tmp_dir = TempDir::new().unwrap();
|
||||
let tmp_dir_path = tmp_dir.path();
|
||||
|
||||
compression::from_tar_gz(&dumps_folder.join(&format!("{}.tar.gz", uid)), tmp_dir_path).unwrap();
|
||||
compression::from_tar_gz(&dumps_dir.join(&format!("{}.dump", uid)), tmp_dir_path).unwrap();
|
||||
|
||||
let file = File::open(tmp_dir_path.join("test").join("settings.json")).unwrap();
|
||||
let settings: serde_json::Value = serde_json::from_reader(file).unwrap();
|
||||
@ -303,11 +336,11 @@ async fn dump_index_documents_should_be_valid() {
|
||||
|
||||
let uid = trigger_and_wait_dump(&mut server).await;
|
||||
|
||||
let dumps_folder = Path::new(&server.data().dumps_folder);
|
||||
let dumps_dir = Path::new(&server.data().dumps_dir);
|
||||
let tmp_dir = TempDir::new().unwrap();
|
||||
let tmp_dir_path = tmp_dir.path();
|
||||
|
||||
compression::from_tar_gz(&dumps_folder.join(&format!("{}.tar.gz", uid)), tmp_dir_path).unwrap();
|
||||
compression::from_tar_gz(&dumps_dir.join(&format!("{}.dump", uid)), tmp_dir_path).unwrap();
|
||||
|
||||
let file = File::open(tmp_dir_path.join("test").join("documents.jsonl")).unwrap();
|
||||
let documents = read_all_jsonline(file);
|
||||
@ -327,11 +360,11 @@ async fn dump_index_updates_should_be_valid() {
|
||||
|
||||
let uid = trigger_and_wait_dump(&mut server).await;
|
||||
|
||||
let dumps_folder = Path::new(&server.data().dumps_folder);
|
||||
let dumps_dir = Path::new(&server.data().dumps_dir);
|
||||
let tmp_dir = TempDir::new().unwrap();
|
||||
let tmp_dir_path = tmp_dir.path();
|
||||
|
||||
compression::from_tar_gz(&dumps_folder.join(&format!("{}.tar.gz", uid)), tmp_dir_path).unwrap();
|
||||
compression::from_tar_gz(&dumps_dir.join(&format!("{}.dump", uid)), tmp_dir_path).unwrap();
|
||||
|
||||
let file = File::open(tmp_dir_path.join("test").join("updates.jsonl")).unwrap();
|
||||
let mut updates = read_all_jsonline(file);
|
||||
|
Loading…
Reference in New Issue
Block a user