diff --git a/dump/src/reader/loaders/mod.rs b/dump/src/reader/loaders/mod.rs deleted file mode 100644 index 199b20c02..000000000 --- a/dump/src/reader/loaders/mod.rs +++ /dev/null @@ -1,4 +0,0 @@ -pub mod v2; -pub mod v3; -pub mod v4; -pub mod v5; diff --git a/dump/src/reader/loaders/v1.rs b/dump/src/reader/loaders/v1.rs deleted file mode 100644 index 5c015b96a..000000000 --- a/dump/src/reader/loaders/v1.rs +++ /dev/null @@ -1,43 +0,0 @@ -use std::path::Path; - -use serde::{Deserialize, Serialize}; - -use crate::index_controller::IndexMetadata; - -#[derive(Debug, Deserialize)] -#[serde(rename_all = "camelCase")] -struct Metadata { - indexes: Vec, - db_version: String, - dump_version: crate::Version, -} - -#[derive(Debug, Deserialize)] -#[serde(rename_all = "camelCase")] -struct Index { - pub name: String, - pub uid: String, - #[serde(with = "time::serde::rfc3339")] - created_at: OffsetDateTime, - #[serde(with = "time::serde::rfc3339")] - updated_at: OffsetDateTime, - pub primary_key: Option, -} - -pub struct V1Reader { - dump: TempDir, - metadata: Metadata, -} - -impl Reader { - pub fn open(dump: &TempDir) -> Result { - let mut meta_file = File::open(path.path().join("metadata.json"))?; - let metadata = serde_json::from_reader(&mut meta_file)?; - - Ok(Reader { dump, metadata }) - } - - pub fn date(&self) -> Result> { - Ok(None) - } -} diff --git a/dump/src/reader/loaders/v2.rs b/dump/src/reader/loaders/v2.rs deleted file mode 100644 index 5926de931..000000000 --- a/dump/src/reader/loaders/v2.rs +++ /dev/null @@ -1,216 +0,0 @@ -use std::fs::{File, OpenOptions}; -use std::io::Write; -use std::path::{Path, PathBuf}; - -use serde_json::{Deserializer, Value}; -use tempfile::NamedTempFile; - -use crate::dump::compat::{self, v2, v3}; -use crate::dump::Metadata; -use crate::options::IndexerOpts; - -/// The dump v2 reads the dump folder and patches all the needed file to make it compatible with a -/// dump v3, then calls the dump v3 to actually handle the dump. -pub fn load_dump( - meta: Metadata, - src: impl AsRef, - dst: impl AsRef, - index_db_size: usize, - update_db_size: usize, - indexing_options: &IndexerOpts, -) -> anyhow::Result<()> { - log::info!("Patching dump V2 to dump V3..."); - let indexes_path = src.as_ref().join("indexes"); - - let dir_entries = std::fs::read_dir(indexes_path)?; - for entry in dir_entries { - let entry = entry?; - - // rename the index folder - let path = entry.path(); - let new_path = patch_index_uuid_path(&path).expect("invalid index folder."); - - std::fs::rename(path, &new_path)?; - - let settings_path = new_path.join("meta.json"); - - patch_settings(settings_path)?; - } - - let update_dir = src.as_ref().join("updates"); - let update_path = update_dir.join("data.jsonl"); - patch_updates(update_dir, update_path)?; - - super::v3::load_dump( - meta, - src, - dst, - index_db_size, - update_db_size, - indexing_options, - ) -} - -fn patch_index_uuid_path(path: &Path) -> Option { - let uuid = path.file_name()?.to_str()?.trim_start_matches("index-"); - let new_path = path.parent()?.join(uuid); - Some(new_path) -} - -fn patch_settings(path: impl AsRef) -> anyhow::Result<()> { - let mut meta_file = File::open(&path)?; - let mut meta: Value = serde_json::from_reader(&mut meta_file)?; - - // We first deserialize the dump meta into a serde_json::Value and change - // the custom ranking rules settings from the old format to the new format. - if let Some(ranking_rules) = meta.pointer_mut("/settings/rankingRules") { - patch_custom_ranking_rules(ranking_rules); - } - - let mut meta_file = OpenOptions::new().truncate(true).write(true).open(path)?; - - serde_json::to_writer(&mut meta_file, &meta)?; - - Ok(()) -} - -fn patch_updates(dir: impl AsRef, path: impl AsRef) -> anyhow::Result<()> { - let mut output_update_file = NamedTempFile::new_in(&dir)?; - let update_file = File::open(&path)?; - - let stream = Deserializer::from_reader(update_file).into_iter::(); - - for update in stream { - let update_entry = update?; - - let update_entry = v3::UpdateEntry::from(update_entry); - - serde_json::to_writer(&mut output_update_file, &update_entry)?; - output_update_file.write_all(b"\n")?; - } - - output_update_file.flush()?; - output_update_file.persist(path)?; - - Ok(()) -} - -/// Converts the ranking rules from the format `asc(_)`, `desc(_)` to the format `_:asc`, `_:desc`. -/// -/// This is done for compatibility reasons, and to avoid a new dump version, -/// since the new syntax was introduced soon after the new dump version. -fn patch_custom_ranking_rules(ranking_rules: &mut Value) { - *ranking_rules = match ranking_rules.take() { - Value::Array(values) => values - .into_iter() - .filter_map(|value| match value { - Value::String(s) if s.starts_with("asc") => compat::asc_ranking_rule(&s) - .map(|f| format!("{}:asc", f)) - .map(Value::String), - Value::String(s) if s.starts_with("desc") => compat::desc_ranking_rule(&s) - .map(|f| format!("{}:desc", f)) - .map(Value::String), - otherwise => Some(otherwise), - }) - .collect(), - otherwise => otherwise, - } -} - -impl From for v3::UpdateEntry { - fn from(v2::UpdateEntry { uuid, update }: v2::UpdateEntry) -> Self { - let update = match update { - v2::UpdateStatus::Processing(meta) => v3::UpdateStatus::Processing(meta.into()), - v2::UpdateStatus::Enqueued(meta) => v3::UpdateStatus::Enqueued(meta.into()), - v2::UpdateStatus::Processed(meta) => v3::UpdateStatus::Processed(meta.into()), - v2::UpdateStatus::Aborted(_) => unreachable!("Updates could never be aborted."), - v2::UpdateStatus::Failed(meta) => v3::UpdateStatus::Failed(meta.into()), - }; - - Self { uuid, update } - } -} - -impl From for v3::Failed { - fn from(other: v2::Failed) -> Self { - let v2::Failed { - from, - error, - failed_at, - } = other; - - Self { - from: from.into(), - msg: error.message, - code: v2::error_code_from_str(&error.error_code) - .expect("Invalid update: Invalid error code"), - failed_at, - } - } -} - -impl From for v3::Processing { - fn from(other: v2::Processing) -> Self { - let v2::Processing { - from, - started_processing_at, - } = other; - - Self { - from: from.into(), - started_processing_at, - } - } -} - -impl From for v3::Enqueued { - fn from(other: v2::Enqueued) -> Self { - let v2::Enqueued { - update_id, - meta, - enqueued_at, - content, - } = other; - - let meta = match meta { - v2::UpdateMeta::DocumentsAddition { - method, - primary_key, - .. - } => { - v3::Update::DocumentAddition { - primary_key, - method, - // Just ignore if the uuid is no present. If it is needed later, an error will - // be thrown. - content_uuid: content.unwrap_or_default(), - } - } - v2::UpdateMeta::ClearDocuments => v3::Update::ClearDocuments, - v2::UpdateMeta::DeleteDocuments { ids } => v3::Update::DeleteDocuments(ids), - v2::UpdateMeta::Settings(settings) => v3::Update::Settings(settings), - }; - - Self { - update_id, - meta, - enqueued_at, - } - } -} - -impl From for v3::Processed { - fn from(other: v2::Processed) -> Self { - let v2::Processed { - from, - success, - processed_at, - } = other; - - Self { - success, - processed_at, - from: from.into(), - } - } -} diff --git a/dump/src/reader/loaders/v3.rs b/dump/src/reader/loaders/v3.rs deleted file mode 100644 index 44984c946..000000000 --- a/dump/src/reader/loaders/v3.rs +++ /dev/null @@ -1,136 +0,0 @@ -use std::collections::HashMap; -use std::fs::{self, File}; -use std::io::{BufReader, BufWriter, Write}; -use std::path::Path; - -use anyhow::Context; -use fs_extra::dir::{self, CopyOptions}; -use log::info; -use tempfile::tempdir; -use uuid::Uuid; - -use crate::dump::compat::{self, v3}; -use crate::dump::Metadata; -use crate::index_resolver::meta_store::{DumpEntry, IndexMeta}; -use crate::options::IndexerOpts; -use crate::tasks::task::TaskId; - -/// dump structure for V3: -/// . -/// ├── indexes -/// │   └── 25f10bb8-6ea8-42f0-bd48-ad5857f77648 -/// │   ├── documents.jsonl -/// │   └── meta.json -/// ├── index_uuids -/// │   └── data.jsonl -/// ├── metadata.json -/// └── updates -/// └── data.jsonl - -pub fn load_dump( - meta: Metadata, - src: impl AsRef, - dst: impl AsRef, - index_db_size: usize, - meta_env_size: usize, - indexing_options: &IndexerOpts, -) -> anyhow::Result<()> { - info!("Patching dump V3 to dump V4..."); - - let patched_dir = tempdir()?; - - let options = CopyOptions::default(); - dir::copy(src.as_ref().join("indexes"), patched_dir.path(), &options)?; - dir::copy( - src.as_ref().join("index_uuids"), - patched_dir.path(), - &options, - )?; - - let uuid_map = patch_index_meta( - src.as_ref().join("index_uuids/data.jsonl"), - patched_dir.path(), - )?; - - fs::copy( - src.as_ref().join("metadata.json"), - patched_dir.path().join("metadata.json"), - )?; - - patch_updates(&src, patched_dir.path(), uuid_map)?; - - super::v4::load_dump( - meta, - patched_dir.path(), - dst, - index_db_size, - meta_env_size, - indexing_options, - ) -} - -fn patch_index_meta( - path: impl AsRef, - dst: impl AsRef, -) -> anyhow::Result> { - let file = BufReader::new(File::open(path)?); - let dst = dst.as_ref().join("index_uuids"); - fs::create_dir_all(&dst)?; - let mut dst_file = File::create(dst.join("data.jsonl"))?; - - let map = serde_json::Deserializer::from_reader(file) - .into_iter::() - .try_fold(HashMap::new(), |mut map, entry| -> anyhow::Result<_> { - let entry = entry?; - map.insert(entry.uuid, entry.uid.clone()); - let meta = IndexMeta { - uuid: entry.uuid, - // This is lost information, we patch it to 0; - creation_task_id: 0, - }; - let entry = DumpEntry { - uid: entry.uid, - index_meta: meta, - }; - serde_json::to_writer(&mut dst_file, &entry)?; - dst_file.write_all(b"\n")?; - Ok(map) - })?; - - dst_file.flush()?; - - Ok(map) -} - -fn patch_updates( - src: impl AsRef, - dst: impl AsRef, - uuid_map: HashMap, -) -> anyhow::Result<()> { - let dst = dst.as_ref().join("updates"); - fs::create_dir_all(&dst)?; - - let mut dst_file = BufWriter::new(File::create(dst.join("data.jsonl"))?); - let src_file = BufReader::new(File::open(src.as_ref().join("updates/data.jsonl"))?); - - serde_json::Deserializer::from_reader(src_file) - .into_iter::() - .enumerate() - .try_for_each(|(task_id, entry)| -> anyhow::Result<()> { - let entry = entry?; - let name = uuid_map - .get(&entry.uuid) - .with_context(|| format!("Unknown index uuid: {}", entry.uuid))? - .clone(); - serde_json::to_writer( - &mut dst_file, - &compat::v4::Task::from((entry.update, name, task_id as TaskId)), - )?; - dst_file.write_all(b"\n")?; - Ok(()) - })?; - - dst_file.flush()?; - - Ok(()) -} diff --git a/dump/src/reader/loaders/v4.rs b/dump/src/reader/loaders/v4.rs deleted file mode 100644 index 0744df7ea..000000000 --- a/dump/src/reader/loaders/v4.rs +++ /dev/null @@ -1,103 +0,0 @@ -use std::fs::{self, create_dir_all, File}; -use std::io::{BufReader, Write}; -use std::path::Path; - -use fs_extra::dir::{self, CopyOptions}; -use log::info; -use serde_json::{Deserializer, Map, Value}; -use tempfile::tempdir; -use uuid::Uuid; - -use crate::dump::{compat, Metadata}; -use crate::options::IndexerOpts; -use crate::tasks::task::Task; - -pub fn load_dump( - meta: Metadata, - src: impl AsRef, - dst: impl AsRef, - index_db_size: usize, - meta_env_size: usize, - indexing_options: &IndexerOpts, -) -> anyhow::Result<()> { - info!("Patching dump V4 to dump V5..."); - - let patched_dir = tempdir()?; - let options = CopyOptions::default(); - - // Indexes - dir::copy(src.as_ref().join("indexes"), &patched_dir, &options)?; - - // Index uuids - dir::copy(src.as_ref().join("index_uuids"), &patched_dir, &options)?; - - // Metadata - fs::copy( - src.as_ref().join("metadata.json"), - patched_dir.path().join("metadata.json"), - )?; - - // Updates - patch_updates(&src, &patched_dir)?; - - // Keys - patch_keys(&src, &patched_dir)?; - - super::v5::load_dump( - meta, - &patched_dir, - dst, - index_db_size, - meta_env_size, - indexing_options, - ) -} - -fn patch_updates(src: impl AsRef, dst: impl AsRef) -> anyhow::Result<()> { - let updates_path = src.as_ref().join("updates/data.jsonl"); - let output_updates_path = dst.as_ref().join("updates/data.jsonl"); - create_dir_all(output_updates_path.parent().unwrap())?; - let udpates_file = File::open(updates_path)?; - let mut output_update_file = File::create(output_updates_path)?; - - serde_json::Deserializer::from_reader(udpates_file) - .into_iter::() - .try_for_each(|task| -> anyhow::Result<()> { - let task: Task = task?.into(); - - serde_json::to_writer(&mut output_update_file, &task)?; - output_update_file.write_all(b"\n")?; - - Ok(()) - })?; - - output_update_file.flush()?; - - Ok(()) -} - -fn patch_keys(src: impl AsRef, dst: impl AsRef) -> anyhow::Result<()> { - let keys_file_src = src.as_ref().join("keys"); - - if !keys_file_src.exists() { - return Ok(()); - } - - fs::create_dir_all(&dst)?; - let keys_file_dst = dst.as_ref().join("keys"); - let mut writer = File::create(&keys_file_dst)?; - - let reader = BufReader::new(File::open(&keys_file_src)?); - for key in Deserializer::from_reader(reader).into_iter() { - let mut key: Map = key?; - - // generate a new uuid v4 and insert it in the key. - let uid = serde_json::to_value(Uuid::new_v4()).unwrap(); - key.insert("uid".to_string(), uid); - - serde_json::to_writer(&mut writer, &key)?; - writer.write_all(b"\n")?; - } - - Ok(()) -}