From 03af99650df17b354b295b271099d8f2c6a7aec6 Mon Sep 17 00:00:00 2001 From: mpostma Date: Wed, 29 Sep 2021 12:34:39 +0200 Subject: [PATCH 1/3] fix dumpv1 --- meilisearch-lib/src/compression.rs | 5 +++-- .../src/index_controller/dump_actor/loaders/v1.rs | 6 +++--- .../src/index_controller/dump_actor/mod.rs | 11 +++++++++++ .../src/index_controller/update_file_store.rs | 1 + 4 files changed, 18 insertions(+), 5 deletions(-) diff --git a/meilisearch-lib/src/compression.rs b/meilisearch-lib/src/compression.rs index c4747cb21..9bbe9ae00 100644 --- a/meilisearch-lib/src/compression.rs +++ b/meilisearch-lib/src/compression.rs @@ -1,8 +1,9 @@ -use std::fs::{create_dir_all, File}; +use std::fs::{File, create_dir_all}; use std::io::Write; use std::path::Path; -use flate2::{read::GzDecoder, write::GzEncoder, Compression}; +use flate2::read::GzDecoder; +use flate2::{Compression, write::GzEncoder}; use tar::{Archive, Builder}; pub fn to_tar_gz(src: impl AsRef, dest: impl AsRef) -> anyhow::Result<()> { diff --git a/meilisearch-lib/src/index_controller/dump_actor/loaders/v1.rs b/meilisearch-lib/src/index_controller/dump_actor/loaders/v1.rs index a41e18683..303bdd081 100644 --- a/meilisearch-lib/src/index_controller/dump_actor/loaders/v1.rs +++ b/meilisearch-lib/src/index_controller/dump_actor/loaders/v1.rs @@ -39,9 +39,9 @@ impl MetadataV1 { ); let uuid_store = HeedUuidStore::new(&dst)?; - for index in self.indexes { + for index in dbg!(self.indexes) { let uuid = Uuid::new_v4(); - uuid_store.insert(index.uid.clone(), uuid)?; + uuid_store.insert(dbg!(index.uid.clone()), dbg!(uuid))?; let src = src.as_ref().join(index.uid); load_index( &src, @@ -93,7 +93,7 @@ fn load_index( size: usize, indexer_options: &IndexerOpts, ) -> anyhow::Result<()> { - let index_path = dst.as_ref().join(&format!("indexes/index-{}", uuid)); + let index_path = dst.as_ref().join(&format!("indexes/{}", uuid)); create_dir_all(&index_path)?; let mut options = EnvOpenOptions::new(); diff --git a/meilisearch-lib/src/index_controller/dump_actor/mod.rs b/meilisearch-lib/src/index_controller/dump_actor/mod.rs index 3f9d33223..4f32ade4d 100644 --- a/meilisearch-lib/src/index_controller/dump_actor/mod.rs +++ b/meilisearch-lib/src/index_controller/dump_actor/mod.rs @@ -109,6 +109,15 @@ pub fn load_dump( update_db_size: usize, indexer_opts: &IndexerOpts, ) -> anyhow::Result<()> { + // Setup a temp directory path in the same path as the database, to prevent cross devices + // references. + let temp_path = dst_path.as_ref().parent().map(ToOwned::to_owned).unwrap_or_else(|| ".".into()); + if cfg!(windows) { + std::env::set_var("TMP", temp_path); + } else { + std::env::set_var("TMPDIR", temp_path); + } + let tmp_src = tempfile::tempdir()?; let tmp_src_path = tmp_src.path(); @@ -120,6 +129,8 @@ pub fn load_dump( let tmp_dst = tempfile::tempdir()?; + println!("temp path: {}", tmp_dst.path().display()); + match meta { Metadata::V1(meta) => { meta.load_dump(&tmp_src_path, tmp_dst.path(), index_db_size, indexer_opts)? 
diff --git a/meilisearch-lib/src/index_controller/update_file_store.rs b/meilisearch-lib/src/index_controller/update_file_store.rs index 09ddc1d89..2a178d908 100644 --- a/meilisearch-lib/src/index_controller/update_file_store.rs +++ b/meilisearch-lib/src/index_controller/update_file_store.rs @@ -75,6 +75,7 @@ impl UpdateFileStore { create_dir_all(&dst_update_files_path)?; + println!("src_update file: {}", src_update_files_path.display()); let entries = std::fs::read_dir(src_update_files_path)?; for entry in entries { From 66f39aaa927ac1c254712d51a5337d148cfcceab Mon Sep 17 00:00:00 2001 From: mpostma Date: Wed, 29 Sep 2021 15:24:59 +0200 Subject: [PATCH 2/3] fix dump v3 --- .../dump_actor/loaders/mod.rs | 3 +- .../index_controller/dump_actor/loaders/v2.rs | 52 ------------------- .../src/index_controller/dump_actor/mod.rs | 46 ++++++++++++---- .../src/index_controller/update_file_store.rs | 5 ++ 4 files changed, 42 insertions(+), 64 deletions(-) delete mode 100644 meilisearch-lib/src/index_controller/dump_actor/loaders/v2.rs diff --git a/meilisearch-lib/src/index_controller/dump_actor/loaders/mod.rs b/meilisearch-lib/src/index_controller/dump_actor/loaders/mod.rs index ae6adc7cf..5b686dd05 100644 --- a/meilisearch-lib/src/index_controller/dump_actor/loaders/mod.rs +++ b/meilisearch-lib/src/index_controller/dump_actor/loaders/mod.rs @@ -1,2 +1,3 @@ pub mod v1; -pub mod v2; +pub mod v3; + diff --git a/meilisearch-lib/src/index_controller/dump_actor/loaders/v2.rs b/meilisearch-lib/src/index_controller/dump_actor/loaders/v2.rs deleted file mode 100644 index 8280e9613..000000000 --- a/meilisearch-lib/src/index_controller/dump_actor/loaders/v2.rs +++ /dev/null @@ -1,52 +0,0 @@ -use std::path::Path; - -use chrono::{DateTime, Utc}; -use log::info; -use serde::{Deserialize, Serialize}; - -use crate::index_controller::index_resolver::IndexResolver; -use crate::index_controller::update_file_store::UpdateFileStore; -use crate::index_controller::updates::store::UpdateStore; -use crate::options::IndexerOpts; - -#[derive(Serialize, Deserialize, Debug)] -#[serde(rename_all = "camelCase")] -pub struct MetadataV2 { - db_version: String, - index_db_size: usize, - update_db_size: usize, - dump_date: DateTime, -} - -impl MetadataV2 { - pub fn new(index_db_size: usize, update_db_size: usize) -> Self { - Self { - db_version: env!("CARGO_PKG_VERSION").to_string(), - index_db_size, - update_db_size, - dump_date: Utc::now(), - } - } - - pub fn load_dump( - self, - src: impl AsRef, - dst: impl AsRef, - index_db_size: usize, - update_db_size: usize, - indexing_options: &IndexerOpts, - ) -> anyhow::Result<()> { - info!( - "Loading dump from {}, dump database version: {}, dump version: V2", - self.dump_date, self.db_version - ); - - IndexResolver::load_dump(src.as_ref(), &dst, index_db_size, indexing_options)?; - UpdateFileStore::load_dump(src.as_ref(), &dst)?; - UpdateStore::load_dump(&src, &dst, update_db_size)?; - - info!("Loading indexes."); - - Ok(()) - } -} diff --git a/meilisearch-lib/src/index_controller/dump_actor/mod.rs b/meilisearch-lib/src/index_controller/dump_actor/mod.rs index 4f32ade4d..12e819392 100644 --- a/meilisearch-lib/src/index_controller/dump_actor/mod.rs +++ b/meilisearch-lib/src/index_controller/dump_actor/mod.rs @@ -8,7 +8,6 @@ use serde::{Deserialize, Serialize}; use tokio::fs::create_dir_all; use loaders::v1::MetadataV1; -use loaders::v2::MetadataV2; pub use actor::DumpActor; pub use handle_impl::*; @@ -18,6 +17,7 @@ use super::index_resolver::HardStateIndexResolver; use 
super::updates::UpdateSender; use crate::compression::{from_tar_gz, to_tar_gz}; use crate::index_controller::dump_actor::error::DumpActorError; +use crate::index_controller::dump_actor::loaders::v3; use crate::index_controller::updates::UpdateMsg; use crate::options::IndexerOpts; use error::Result; @@ -30,6 +30,27 @@ mod message; const META_FILE_NAME: &str = "metadata.json"; +#[derive(Serialize, Deserialize, Debug)] +#[serde(rename_all = "camelCase")] +pub struct Metadata { + db_version: String, + index_db_size: usize, + update_db_size: usize, + dump_date: DateTime, +} + +impl Metadata { + pub fn new(index_db_size: usize, update_db_size: usize) -> Self { + Self { + db_version: env!("CARGO_PKG_VERSION").to_string(), + index_db_size, + update_db_size, + dump_date: Utc::now(), + } + } + +} + #[async_trait::async_trait] pub trait DumpActorHandle { /// Start the creation of a dump @@ -43,15 +64,16 @@ pub trait DumpActorHandle { #[derive(Debug, Serialize, Deserialize)] #[serde(tag = "dumpVersion")] -pub enum Metadata { +pub enum MetadataVersion { V1(MetadataV1), - V2(MetadataV2), + V2(Metadata), + V3(Metadata), } -impl Metadata { - pub fn new_v2(index_db_size: usize, update_db_size: usize) -> Self { - let meta = MetadataV2::new(index_db_size, update_db_size); - Self::V2(meta) +impl MetadataVersion { + pub fn new_v3(index_db_size: usize, update_db_size: usize) -> Self { + let meta = Metadata::new(index_db_size, update_db_size); + Self::V3(meta) } } @@ -125,23 +147,25 @@ pub fn load_dump( let meta_path = tmp_src_path.join(META_FILE_NAME); let mut meta_file = File::open(&meta_path)?; - let meta: Metadata = serde_json::from_reader(&mut meta_file)?; + let meta: MetadataVersion = serde_json::from_reader(&mut meta_file)?; let tmp_dst = tempfile::tempdir()?; println!("temp path: {}", tmp_dst.path().display()); match meta { - Metadata::V1(meta) => { + MetadataVersion::V1(meta) => { meta.load_dump(&tmp_src_path, tmp_dst.path(), index_db_size, indexer_opts)? 
} - Metadata::V2(meta) => meta.load_dump( + MetadataVersion::V3(meta) => v3::load_dump( + meta, &tmp_src_path, tmp_dst.path(), index_db_size, update_db_size, indexer_opts, )?, + MetadataVersion::V2(_) => todo!(), } // Persist and atomically rename the db let persisted_dump = tmp_dst.into_path(); @@ -173,7 +197,7 @@ impl DumpTask { let temp_dump_dir = tokio::task::spawn_blocking(tempfile::TempDir::new).await??; let temp_dump_path = temp_dump_dir.path().to_owned(); - let meta = Metadata::new_v2(self.index_db_size, self.update_db_size); + let meta = MetadataVersion::new_v3(self.index_db_size, self.update_db_size); let meta_path = temp_dump_path.join(META_FILE_NAME); let mut meta_file = File::create(&meta_path)?; serde_json::to_writer(&mut meta_file, &meta)?; diff --git a/meilisearch-lib/src/index_controller/update_file_store.rs b/meilisearch-lib/src/index_controller/update_file_store.rs index 2a178d908..82ba0bffb 100644 --- a/meilisearch-lib/src/index_controller/update_file_store.rs +++ b/meilisearch-lib/src/index_controller/update_file_store.rs @@ -73,6 +73,11 @@ impl UpdateFileStore { let src_update_files_path = src.as_ref().join(UPDATE_FILES_PATH); let dst_update_files_path = dst.as_ref().join(UPDATE_FILES_PATH); + // No update files to load + if !src_update_files_path.exists() { + return Ok(()) + } + create_dir_all(&dst_update_files_path)?; println!("src_update file: {}", src_update_files_path.display()); From ee372a7b3015140e2ca6ea11295953d6d2a968cb Mon Sep 17 00:00:00 2001 From: mpostma Date: Wed, 29 Sep 2021 15:41:25 +0200 Subject: [PATCH 3/3] implement new dump v2 --- meilisearch-lib/src/compression.rs | 5 +- meilisearch-lib/src/document_formats.rs | 2 +- meilisearch-lib/src/index/dump.rs | 40 +- .../dump_actor/loaders/mod.rs | 16 + .../index_controller/dump_actor/loaders/v1.rs | 16 +- .../index_controller/dump_actor/loaders/v2.rs | 393 ++++++++++++++++++ .../index_controller/dump_actor/loaders/v3.rs | 31 ++ .../src/index_controller/dump_actor/mod.rs | 49 ++- .../index_resolver/uuid_store.rs | 1 - meilisearch-lib/src/index_controller/mod.rs | 14 - .../src/index_controller/update_file_store.rs | 3 +- .../src/index_controller/updates/status.rs | 4 +- .../index_controller/updates/store/dump.rs | 8 +- 13 files changed, 501 insertions(+), 81 deletions(-) create mode 100644 meilisearch-lib/src/index_controller/dump_actor/loaders/v2.rs create mode 100644 meilisearch-lib/src/index_controller/dump_actor/loaders/v3.rs diff --git a/meilisearch-lib/src/compression.rs b/meilisearch-lib/src/compression.rs index 9bbe9ae00..c4747cb21 100644 --- a/meilisearch-lib/src/compression.rs +++ b/meilisearch-lib/src/compression.rs @@ -1,9 +1,8 @@ -use std::fs::{File, create_dir_all}; +use std::fs::{create_dir_all, File}; use std::io::Write; use std::path::Path; -use flate2::read::GzDecoder; -use flate2::{Compression, write::GzEncoder}; +use flate2::{read::GzDecoder, write::GzEncoder, Compression}; use tar::{Archive, Builder}; pub fn to_tar_gz(src: impl AsRef, dest: impl AsRef) -> anyhow::Result<()> { diff --git a/meilisearch-lib/src/document_formats.rs b/meilisearch-lib/src/document_formats.rs index 334b6f601..0b0431e85 100644 --- a/meilisearch-lib/src/document_formats.rs +++ b/meilisearch-lib/src/document_formats.rs @@ -312,7 +312,7 @@ mod test { let mut csv_iter = CsvDocumentIter::from_reader(documents.as_bytes()).unwrap(); - assert!(dbg!(csv_iter.next().unwrap()).is_err()); + assert!(csv_iter.next().unwrap().is_err()); } #[test] diff --git a/meilisearch-lib/src/index/dump.rs 
b/meilisearch-lib/src/index/dump.rs index 4a769f136..a48d9b834 100644 --- a/meilisearch-lib/src/index/dump.rs +++ b/meilisearch-lib/src/index/dump.rs @@ -7,12 +7,10 @@ use heed::{EnvOpenOptions, RoTxn}; use indexmap::IndexMap; use milli::documents::DocumentBatchReader; use serde::{Deserialize, Serialize}; -use serde_json::Value; use crate::document_formats::read_ndjson; use crate::index::update_handler::UpdateHandler; use crate::index::updates::apply_settings_to_builder; -use crate::index_controller::{asc_ranking_rule, desc_ranking_rule}; use super::error::Result; use super::{Index, Settings, Unchecked}; @@ -100,23 +98,11 @@ impl Index { create_dir_all(&dst_dir_path)?; let meta_path = src.as_ref().join(META_FILE_NAME); - let mut meta_file = File::open(meta_path)?; - - // We first deserialize the dump meta into a serde_json::Value and change - // the custom ranking rules settings from the old format to the new format. - let mut meta: Value = serde_json::from_reader(&mut meta_file)?; - if let Some(ranking_rules) = meta.pointer_mut("/settings/rankingRules") { - convert_custom_ranking_rules(ranking_rules); - } - - // Then we serialize it back into a vec to deserialize it - // into a `DumpMeta` struct with the newly patched `rankingRules` format. - let patched_meta = serde_json::to_vec(&meta)?; - + let meta_file = File::open(meta_path)?; let DumpMeta { settings, primary_key, - } = serde_json::from_slice(&patched_meta)?; + } = serde_json::from_reader(meta_file)?; let settings = settings.check(); let mut options = EnvOpenOptions::new(); @@ -164,25 +150,3 @@ impl Index { Ok(()) } } - -/// Converts the ranking rules from the format `asc(_)`, `desc(_)` to the format `_:asc`, `_:desc`. -/// -/// This is done for compatibility reasons, and to avoid a new dump version, -/// since the new syntax was introduced soon after the new dump version. -fn convert_custom_ranking_rules(ranking_rules: &mut Value) { - *ranking_rules = match ranking_rules.take() { - Value::Array(values) => values - .into_iter() - .filter_map(|value| match value { - Value::String(s) if s.starts_with("asc") => asc_ranking_rule(&s) - .map(|f| format!("{}:asc", f)) - .map(Value::String), - Value::String(s) if s.starts_with("desc") => desc_ranking_rule(&s) - .map(|f| format!("{}:desc", f)) - .map(Value::String), - otherwise => Some(otherwise), - }) - .collect(), - otherwise => otherwise, - } -} diff --git a/meilisearch-lib/src/index_controller/dump_actor/loaders/mod.rs b/meilisearch-lib/src/index_controller/dump_actor/loaders/mod.rs index 5b686dd05..a0c4fd721 100644 --- a/meilisearch-lib/src/index_controller/dump_actor/loaders/mod.rs +++ b/meilisearch-lib/src/index_controller/dump_actor/loaders/mod.rs @@ -1,3 +1,19 @@ pub mod v1; +pub mod v2; pub mod v3; +mod compat { + /// Parses the v1 version of the Asc ranking rules `asc(price)`and returns the field name. + pub fn asc_ranking_rule(text: &str) -> Option<&str> { + text.split_once("asc(") + .and_then(|(_, tail)| tail.rsplit_once(")")) + .map(|(field, _)| field) + } + + /// Parses the v1 version of the Desc ranking rules `desc(price)`and returns the field name. 
+ pub fn desc_ranking_rule(text: &str) -> Option<&str> { + text.split_once("desc(") + .and_then(|(_, tail)| tail.rsplit_once(")")) + .map(|(field, _)| field) + } +} diff --git a/meilisearch-lib/src/index_controller/dump_actor/loaders/v1.rs b/meilisearch-lib/src/index_controller/dump_actor/loaders/v1.rs index 303bdd081..63e0c6745 100644 --- a/meilisearch-lib/src/index_controller/dump_actor/loaders/v1.rs +++ b/meilisearch-lib/src/index_controller/dump_actor/loaders/v1.rs @@ -5,7 +5,7 @@ use std::marker::PhantomData; use std::path::Path; use heed::EnvOpenOptions; -use log::{error, info, warn}; +use log::{error, warn}; use milli::documents::DocumentBatchReader; use milli::update::Setting; use serde::{Deserialize, Deserializer, Serialize}; @@ -14,14 +14,15 @@ use uuid::Uuid; use crate::document_formats::read_ndjson; use crate::index::apply_settings_to_builder; use crate::index::update_handler::UpdateHandler; +use crate::index_controller::dump_actor::loaders::compat::{asc_ranking_rule, desc_ranking_rule}; use crate::index_controller::index_resolver::uuid_store::HeedUuidStore; -use crate::index_controller::{self, asc_ranking_rule, desc_ranking_rule, IndexMetadata}; +use crate::index_controller::{self, IndexMetadata}; use crate::{index::Unchecked, options::IndexerOpts}; #[derive(Serialize, Deserialize, Debug)] #[serde(rename_all = "camelCase")] pub struct MetadataV1 { - db_version: String, + pub db_version: String, indexes: Vec, } @@ -33,15 +34,10 @@ impl MetadataV1 { size: usize, indexer_options: &IndexerOpts, ) -> anyhow::Result<()> { - info!( - "Loading dump, dump database version: {}, dump version: V1", - self.db_version - ); - let uuid_store = HeedUuidStore::new(&dst)?; - for index in dbg!(self.indexes) { + for index in self.indexes { let uuid = Uuid::new_v4(); - uuid_store.insert(dbg!(index.uid.clone()), dbg!(uuid))?; + uuid_store.insert(index.uid.clone(), uuid)?; let src = src.as_ref().join(index.uid); load_index( &src, diff --git a/meilisearch-lib/src/index_controller/dump_actor/loaders/v2.rs b/meilisearch-lib/src/index_controller/dump_actor/loaders/v2.rs new file mode 100644 index 000000000..0040d4cea --- /dev/null +++ b/meilisearch-lib/src/index_controller/dump_actor/loaders/v2.rs @@ -0,0 +1,393 @@ +use std::fs::{File, OpenOptions}; +use std::io::Write; +use std::path::{Path, PathBuf}; + +use serde_json::{Deserializer, Value}; +use tempfile::NamedTempFile; +use uuid::Uuid; + +use crate::index_controller::dump_actor::loaders::compat::{asc_ranking_rule, desc_ranking_rule}; +use crate::index_controller::dump_actor::Metadata; +use crate::index_controller::updates::status::{ + Aborted, Enqueued, Failed, Processed, Processing, UpdateResult, UpdateStatus, +}; +use crate::index_controller::updates::store::dump::UpdateEntry; +use crate::index_controller::updates::store::Update; +use crate::options::IndexerOpts; + +use super::v3; + +/// The dump v2 reads the dump folder and patches all the needed file to make it compatible with a +/// dump v3, then calls the dump v3 to actually handle the dump. 
+pub fn load_dump( + meta: Metadata, + src: impl AsRef, + dst: impl AsRef, + index_db_size: usize, + update_db_size: usize, + indexing_options: &IndexerOpts, +) -> anyhow::Result<()> { + let indexes_path = src.as_ref().join("indexes"); + + let dir_entries = std::fs::read_dir(indexes_path)?; + for entry in dir_entries { + let entry = entry?; + + // rename the index folder + let path = entry.path(); + let new_path = patch_index_uuid_path(&path).expect("invalid index folder."); + + std::fs::rename(path, &new_path)?; + + let settings_path = new_path.join("meta.json"); + + patch_settings(settings_path)?; + } + + let update_path = src.as_ref().join("updates/data.jsonl"); + patch_updates(update_path)?; + + v3::load_dump( + meta, + src, + dst, + index_db_size, + update_db_size, + indexing_options, + ) +} + +fn patch_index_uuid_path(path: &Path) -> Option { + let uuid = path.file_name()?.to_str()?.trim_start_matches("index-"); + let new_path = path.parent()?.join(uuid); + Some(new_path) +} + +fn patch_settings(path: impl AsRef) -> anyhow::Result<()> { + let mut meta_file = File::open(&path)?; + let mut meta: Value = serde_json::from_reader(&mut meta_file)?; + + // We first deserialize the dump meta into a serde_json::Value and change + // the custom ranking rules settings from the old format to the new format. + if let Some(ranking_rules) = meta.pointer_mut("/settings/rankingRules") { + patch_custon_ranking_rules(ranking_rules); + } + + let mut meta_file = OpenOptions::new().truncate(true).write(true).open(path)?; + + serde_json::to_writer(&mut meta_file, &meta)?; + + Ok(()) +} + +fn patch_updates(path: impl AsRef) -> anyhow::Result<()> { + let mut output_update_file = NamedTempFile::new()?; + let update_file = File::open(&path)?; + + let stream = Deserializer::from_reader(update_file).into_iter::(); + + for update in stream { + let update_entry = update?; + + let update_entry = UpdateEntry::from(update_entry); + + serde_json::to_writer(&mut output_update_file, &update_entry)?; + output_update_file.write_all(b"\n")?; + } + + output_update_file.flush()?; + output_update_file.persist(path)?; + + Ok(()) +} + +/// Converts the ranking rules from the format `asc(_)`, `desc(_)` to the format `_:asc`, `_:desc`. +/// +/// This is done for compatibility reasons, and to avoid a new dump version, +/// since the new syntax was introduced soon after the new dump version. 
+fn patch_custon_ranking_rules(ranking_rules: &mut Value) { + *ranking_rules = match ranking_rules.take() { + Value::Array(values) => values + .into_iter() + .filter_map(|value| match value { + Value::String(s) if s.starts_with("asc") => asc_ranking_rule(&s) + .map(|f| format!("{}:asc", f)) + .map(Value::String), + Value::String(s) if s.starts_with("desc") => desc_ranking_rule(&s) + .map(|f| format!("{}:desc", f)) + .map(Value::String), + otherwise => Some(otherwise), + }) + .collect(), + otherwise => otherwise, + } +} + +impl From for UpdateEntry { + fn from(compat::UpdateEntry { uuid, update }: compat::UpdateEntry) -> Self { + let update = match update { + compat::UpdateStatus::Processing(meta) => UpdateStatus::Processing(meta.into()), + compat::UpdateStatus::Enqueued(meta) => UpdateStatus::Enqueued(meta.into()), + compat::UpdateStatus::Processed(meta) => UpdateStatus::Processed(meta.into()), + compat::UpdateStatus::Aborted(meta) => UpdateStatus::Aborted(meta.into()), + compat::UpdateStatus::Failed(meta) => UpdateStatus::Failed(meta.into()), + }; + + Self { uuid, update } + } +} + +impl From for Failed { + fn from(other: compat::Failed) -> Self { + let compat::Failed { + from, + error, + failed_at, + } = other; + + Self { + from: from.into(), + msg: error.message, + code: compat::error_code_from_str(&error.error_code) + .expect("Invalid update: Invalid error code"), + failed_at, + } + } +} + +impl From for Aborted { + fn from(other: compat::Aborted) -> Self { + let compat::Aborted { from, aborted_at } = other; + + Self { + from: from.into(), + aborted_at, + } + } +} + +impl From for Processing { + fn from(other: compat::Processing) -> Self { + let compat::Processing { + from, + started_processing_at, + } = other; + + Self { + from: from.into(), + started_processing_at, + } + } +} + +impl From for Enqueued { + fn from(other: compat::Enqueued) -> Self { + let compat::Enqueued { + update_id, + meta, + enqueued_at, + content, + } = other; + + let meta = match meta { + compat::UpdateMeta::DocumentsAddition { + method, + primary_key, + .. + } => { + Update::DocumentAddition { + primary_key, + method, + // Just ignore if the uuid is no present. If it is needed later, an error will + // be thrown. 
+ content_uuid: content.unwrap_or_else(Uuid::default), + } + } + compat::UpdateMeta::ClearDocuments => Update::ClearDocuments, + compat::UpdateMeta::DeleteDocuments { ids } => Update::DeleteDocuments(ids), + compat::UpdateMeta::Settings(settings) => Update::Settings(settings), + }; + + Self { + update_id, + meta, + enqueued_at, + } + } +} + +impl From for Processed { + fn from(other: compat::Processed) -> Self { + let compat::Processed { + from, + success, + processed_at, + } = other; + + Self { + success: success.into(), + processed_at, + from: from.into(), + } + } +} + +impl From for UpdateResult { + fn from(other: compat::UpdateResult) -> Self { + match other { + compat::UpdateResult::DocumentsAddition(r) => Self::DocumentsAddition(r), + compat::UpdateResult::DocumentDeletion { deleted } => { + Self::DocumentDeletion { deleted } + } + compat::UpdateResult::Other => Self::Other, + } + } +} + +/// compat structure from pre-dumpv3 meilisearch +mod compat { + use anyhow::bail; + use chrono::{DateTime, Utc}; + use meilisearch_error::Code; + use milli::update::{DocumentAdditionResult, IndexDocumentsMethod}; + use serde::{Deserialize, Serialize}; + use uuid::Uuid; + + use crate::index::{Settings, Unchecked}; + + #[derive(Serialize, Deserialize)] + pub struct UpdateEntry { + pub uuid: Uuid, + pub update: UpdateStatus, + } + + #[derive(Debug, Clone, Serialize, Deserialize)] + pub enum UpdateFormat { + Json, + } + + #[derive(Debug, Clone, Serialize, Deserialize)] + pub enum UpdateResult { + DocumentsAddition(DocumentAdditionResult), + DocumentDeletion { deleted: u64 }, + Other, + } + + #[allow(clippy::large_enum_variant)] + #[derive(Debug, Clone, Serialize, Deserialize)] + #[serde(tag = "type")] + pub enum UpdateMeta { + DocumentsAddition { + method: IndexDocumentsMethod, + format: UpdateFormat, + primary_key: Option, + }, + ClearDocuments, + DeleteDocuments { + ids: Vec, + }, + Settings(Settings), + } + + #[derive(Debug, Serialize, Deserialize, Clone)] + #[serde(rename_all = "camelCase")] + pub struct Enqueued { + pub update_id: u64, + pub meta: UpdateMeta, + pub enqueued_at: DateTime, + pub content: Option, + } + + #[derive(Debug, Serialize, Deserialize, Clone)] + #[serde(rename_all = "camelCase")] + pub struct Processed { + pub success: UpdateResult, + pub processed_at: DateTime, + #[serde(flatten)] + pub from: Processing, + } + + #[derive(Debug, Serialize, Deserialize, Clone)] + #[serde(rename_all = "camelCase")] + pub struct Processing { + #[serde(flatten)] + pub from: Enqueued, + pub started_processing_at: DateTime, + } + + #[derive(Debug, Serialize, Deserialize, Clone)] + #[serde(rename_all = "camelCase")] + pub struct Aborted { + #[serde(flatten)] + pub from: Enqueued, + pub aborted_at: DateTime, + } + + #[derive(Debug, Serialize, Deserialize)] + #[serde(rename_all = "camelCase")] + pub struct Failed { + #[serde(flatten)] + pub from: Processing, + pub error: ResponseError, + pub failed_at: DateTime, + } + + #[derive(Debug, Serialize, Deserialize)] + #[serde(tag = "status", rename_all = "camelCase")] + pub enum UpdateStatus { + Processing(Processing), + Enqueued(Enqueued), + Processed(Processed), + Aborted(Aborted), + Failed(Failed), + } + + type StatusCode = (); + + #[derive(Debug, Serialize, Deserialize, Clone)] + #[serde(rename_all = "camelCase")] + pub struct ResponseError { + #[serde(skip)] + pub code: StatusCode, + pub message: String, + pub error_code: String, + pub error_type: String, + pub error_link: String, + } + + pub fn error_code_from_str(s: &str) -> anyhow::Result { + let 
code = match s { + "index_creation_failed" => Code::CreateIndex, + "index_already_exists" => Code::IndexAlreadyExists, + "index_not_found" => Code::IndexNotFound, + "invalid_index_uid" => Code::InvalidIndexUid, + "index_not_accessible" => Code::OpenIndex, + "invalid_state" => Code::InvalidState, + "missing_primary_key" => Code::MissingPrimaryKey, + "primary_key_already_present" => Code::PrimaryKeyAlreadyPresent, + "invalid_request" => Code::InvalidRankingRule, + "max_fields_limit_exceeded" => Code::MaxFieldsLimitExceeded, + "missing_document_id" => Code::MissingDocumentId, + "invalid_facet" => Code::Facet, + "invalid_filter" => Code::Filter, + "invalid_sort" => Code::Sort, + "bad_parameter" => Code::BadParameter, + "bad_request" => Code::BadRequest, + "document_not_found" => Code::DocumentNotFound, + "internal" => Code::Internal, + "invalid_geo_field" => Code::InvalidGeoField, + "invalid_token" => Code::InvalidToken, + "missing_authorization_header" => Code::MissingAuthorizationHeader, + "not_found" => Code::NotFound, + "payload_too_large" => Code::PayloadTooLarge, + "unretrievable_document" => Code::RetrieveDocument, + "search_error" => Code::SearchDocuments, + "unsupported_media_type" => Code::UnsupportedMediaType, + "dump_already_in_progress" => Code::DumpAlreadyInProgress, + "dump_process_failed" => Code::DumpProcessFailed, + _ => bail!("unknow error code."), + }; + + Ok(code) + } +} diff --git a/meilisearch-lib/src/index_controller/dump_actor/loaders/v3.rs b/meilisearch-lib/src/index_controller/dump_actor/loaders/v3.rs new file mode 100644 index 000000000..480dd83d4 --- /dev/null +++ b/meilisearch-lib/src/index_controller/dump_actor/loaders/v3.rs @@ -0,0 +1,31 @@ +use std::path::Path; + +use log::info; + +use crate::index_controller::dump_actor::Metadata; +use crate::index_controller::index_resolver::IndexResolver; +use crate::index_controller::update_file_store::UpdateFileStore; +use crate::index_controller::updates::store::UpdateStore; +use crate::options::IndexerOpts; + +pub fn load_dump( + meta: Metadata, + src: impl AsRef, + dst: impl AsRef, + index_db_size: usize, + update_db_size: usize, + indexing_options: &IndexerOpts, +) -> anyhow::Result<()> { + info!( + "Loading dump from {}, dump database version: {}, dump version: V3", + meta.dump_date, meta.db_version + ); + + IndexResolver::load_dump(src.as_ref(), &dst, index_db_size, indexing_options)?; + UpdateFileStore::load_dump(src.as_ref(), &dst)?; + UpdateStore::load_dump(&src, &dst, update_db_size)?; + + info!("Loading indexes."); + + Ok(()) +} diff --git a/meilisearch-lib/src/index_controller/dump_actor/mod.rs b/meilisearch-lib/src/index_controller/dump_actor/mod.rs index 12e819392..0ebedaa09 100644 --- a/meilisearch-lib/src/index_controller/dump_actor/mod.rs +++ b/meilisearch-lib/src/index_controller/dump_actor/mod.rs @@ -17,7 +17,7 @@ use super::index_resolver::HardStateIndexResolver; use super::updates::UpdateSender; use crate::compression::{from_tar_gz, to_tar_gz}; use crate::index_controller::dump_actor::error::DumpActorError; -use crate::index_controller::dump_actor::loaders::v3; +use crate::index_controller::dump_actor::loaders::{v2, v3}; use crate::index_controller::updates::UpdateMsg; use crate::options::IndexerOpts; use error::Result; @@ -48,7 +48,6 @@ impl Metadata { dump_date: Utc::now(), } } - } #[async_trait::async_trait] @@ -75,6 +74,28 @@ impl MetadataVersion { let meta = Metadata::new(index_db_size, update_db_size); Self::V3(meta) } + + pub fn db_version(&self) -> &str { + match self { + Self::V1(meta) => 
&meta.db_version, + Self::V2(meta) | Self::V3(meta) => &meta.db_version, + } + } + + pub fn version(&self) -> &str { + match self { + MetadataVersion::V1(_) => "V1", + MetadataVersion::V2(_) => "V2", + MetadataVersion::V3(_) => "V3", + } + } + + pub fn dump_date(&self) -> Option<&DateTime> { + match self { + MetadataVersion::V1(_) => None, + MetadataVersion::V2(meta) | MetadataVersion::V3(meta) => Some(&meta.dump_date), + } + } } #[derive(Debug, Serialize, Deserialize, PartialEq, Clone)] @@ -133,7 +154,11 @@ pub fn load_dump( ) -> anyhow::Result<()> { // Setup a temp directory path in the same path as the database, to prevent cross devices // references. - let temp_path = dst_path.as_ref().parent().map(ToOwned::to_owned).unwrap_or_else(|| ".".into()); + let temp_path = dst_path + .as_ref() + .parent() + .map(ToOwned::to_owned) + .unwrap_or_else(|| ".".into()); if cfg!(windows) { std::env::set_var("TMP", temp_path); } else { @@ -151,12 +176,27 @@ pub fn load_dump( let tmp_dst = tempfile::tempdir()?; - println!("temp path: {}", tmp_dst.path().display()); + info!( + "Loading dump {}, dump database version: {}, dump version: {}", + meta.dump_date() + .map(|t| format!("from {}", t)) + .unwrap_or_else(String::new), + meta.db_version(), + meta.version() + ); match meta { MetadataVersion::V1(meta) => { meta.load_dump(&tmp_src_path, tmp_dst.path(), index_db_size, indexer_opts)? } + MetadataVersion::V2(meta) => v2::load_dump( + meta, + &tmp_src_path, + tmp_dst.path(), + index_db_size, + update_db_size, + indexer_opts, + )?, MetadataVersion::V3(meta) => v3::load_dump( meta, &tmp_src_path, @@ -165,7 +205,6 @@ pub fn load_dump( update_db_size, indexer_opts, )?, - MetadataVersion::V2(_) => todo!(), } // Persist and atomically rename the db let persisted_dump = tmp_dst.into_path(); diff --git a/meilisearch-lib/src/index_controller/index_resolver/uuid_store.rs b/meilisearch-lib/src/index_controller/index_resolver/uuid_store.rs index 3e582944d..f8bde7270 100644 --- a/meilisearch-lib/src/index_controller/index_resolver/uuid_store.rs +++ b/meilisearch-lib/src/index_controller/index_resolver/uuid_store.rs @@ -173,7 +173,6 @@ impl HeedUuidStore { Ok(0) => break, Ok(_) => { let DumpEntry { uuid, uid } = serde_json::from_str(&line)?; - println!("importing {} {}", uid, uuid); db.db.put(&mut txn, &uid, uuid.as_bytes())?; } Err(e) => return Err(e.into()), diff --git a/meilisearch-lib/src/index_controller/mod.rs b/meilisearch-lib/src/index_controller/mod.rs index 52b2b1d01..3ca1d3f3f 100644 --- a/meilisearch-lib/src/index_controller/mod.rs +++ b/meilisearch-lib/src/index_controller/mod.rs @@ -488,17 +488,3 @@ pub async fn get_arc_ownership_blocking(mut item: Arc) -> T { } } } - -/// Parses the v1 version of the Asc ranking rules `asc(price)`and returns the field name. -pub fn asc_ranking_rule(text: &str) -> Option<&str> { - text.split_once("asc(") - .and_then(|(_, tail)| tail.rsplit_once(")")) - .map(|(field, _)| field) -} - -/// Parses the v1 version of the Desc ranking rules `desc(price)`and returns the field name. 
-pub fn desc_ranking_rule(text: &str) -> Option<&str> { - text.split_once("desc(") - .and_then(|(_, tail)| tail.rsplit_once(")")) - .map(|(field, _)| field) -} diff --git a/meilisearch-lib/src/index_controller/update_file_store.rs b/meilisearch-lib/src/index_controller/update_file_store.rs index 82ba0bffb..5ee93c58a 100644 --- a/meilisearch-lib/src/index_controller/update_file_store.rs +++ b/meilisearch-lib/src/index_controller/update_file_store.rs @@ -75,12 +75,11 @@ impl UpdateFileStore { // No update files to load if !src_update_files_path.exists() { - return Ok(()) + return Ok(()); } create_dir_all(&dst_update_files_path)?; - println!("src_update file: {}", src_update_files_path.display()); let entries = std::fs::read_dir(src_update_files_path)?; for entry in entries { diff --git a/meilisearch-lib/src/index_controller/updates/status.rs b/meilisearch-lib/src/index_controller/updates/status.rs index e7f82b343..df222d257 100644 --- a/meilisearch-lib/src/index_controller/updates/status.rs +++ b/meilisearch-lib/src/index_controller/updates/status.rs @@ -133,8 +133,8 @@ impl Processing { #[serde(rename_all = "camelCase")] pub struct Aborted { #[serde(flatten)] - from: Enqueued, - aborted_at: DateTime, + pub from: Enqueued, + pub aborted_at: DateTime, } impl Aborted { diff --git a/meilisearch-lib/src/index_controller/updates/store/dump.rs b/meilisearch-lib/src/index_controller/updates/store/dump.rs index 298217885..cec5431a8 100644 --- a/meilisearch-lib/src/index_controller/updates/store/dump.rs +++ b/meilisearch-lib/src/index_controller/updates/store/dump.rs @@ -21,9 +21,9 @@ use crate::{ }; #[derive(Serialize, Deserialize)] -struct UpdateEntry { - uuid: Uuid, - update: UpdateStatus, +pub struct UpdateEntry { + pub uuid: Uuid, + pub update: UpdateStatus, } impl UpdateStore { @@ -130,8 +130,6 @@ impl UpdateStore { dst: impl AsRef, db_size: usize, ) -> anyhow::Result<()> { - println!("target path: {}", dst.as_ref().display()); - let mut options = EnvOpenOptions::new(); options.map_size(db_size as usize);
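
Editor's note (illustrative sketch, not part of the patch series above): PATCH 3/3 moves the v1 ranking-rule parsing into `loaders/compat` (`asc_ranking_rule` / `desc_ranking_rule`) and has the new v2 loader call `patch_custon_ranking_rules` on the `/settings/rankingRules` pointer of each dumped index, rewriting v1-style custom rules such as `asc(price)` into the newer `price:asc` syntax while leaving built-in rules untouched. The standalone sketch below mirrors that conversion under the same assumptions as the patch (serde_json for the JSON handling; the helper is spelled `patch_custom_ranking_rules` here for readability):

```rust
use serde_json::{json, Value};

/// Extracts the field name from a v1 `asc(field)` ranking rule, e.g. `asc(price)` -> `price`.
fn asc_ranking_rule(text: &str) -> Option<&str> {
    text.split_once("asc(")
        .and_then(|(_, tail)| tail.rsplit_once(')'))
        .map(|(field, _)| field)
}

/// Extracts the field name from a v1 `desc(field)` ranking rule, e.g. `desc(price)` -> `price`.
fn desc_ranking_rule(text: &str) -> Option<&str> {
    text.split_once("desc(")
        .and_then(|(_, tail)| tail.rsplit_once(')'))
        .map(|(field, _)| field)
}

/// Rewrites v1 custom ranking rules (`asc(_)`, `desc(_)`) into the `_:asc` / `_:desc`
/// syntax, keeping every other rule (words, typo, proximity, ...) unchanged, as the
/// v2 loader does before handing the dump over to the v3 loader.
fn patch_custom_ranking_rules(ranking_rules: &mut Value) {
    *ranking_rules = match ranking_rules.take() {
        Value::Array(values) => values
            .into_iter()
            .filter_map(|value| match value {
                Value::String(s) if s.starts_with("asc") => asc_ranking_rule(&s)
                    .map(|f| format!("{}:asc", f))
                    .map(Value::String),
                Value::String(s) if s.starts_with("desc") => desc_ranking_rule(&s)
                    .map(|f| format!("{}:desc", f))
                    .map(Value::String),
                otherwise => Some(otherwise),
            })
            .collect(),
        otherwise => otherwise,
    };
}

fn main() {
    let mut rules = json!(["words", "typo", "asc(price)", "desc(release_date)"]);
    patch_custom_ranking_rules(&mut rules);
    assert_eq!(rules, json!(["words", "typo", "price:asc", "release_date:desc"]));
    println!("{}", rules);
}
```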