diff --git a/dump/src/reader/compat/mod.rs b/dump/src/reader/compat/mod.rs
new file mode 100644
index 000000000..9abac24c7
--- /dev/null
+++ b/dump/src/reader/compat/mod.rs
@@ -0,0 +1,17 @@
+pub mod v2;
+pub mod v3;
+pub mod v4;
+
+/// Parses the v1 version of the Asc ranking rule (`asc(price)`) and returns the field name.
+pub fn asc_ranking_rule(text: &str) -> Option<&str> {
+    text.split_once("asc(")
+        .and_then(|(_, tail)| tail.rsplit_once(')'))
+        .map(|(field, _)| field)
+}
+
+/// Parses the v1 version of the Desc ranking rule (`desc(price)`) and returns the field name.
+pub fn desc_ranking_rule(text: &str) -> Option<&str> {
+    text.split_once("desc(")
+        .and_then(|(_, tail)| tail.rsplit_once(')'))
+        .map(|(field, _)| field)
+}
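
For reference, this is how the parser above behaves on both the legacy and the modern syntax — a self-contained sketch (the function body is copied from `compat/mod.rs`; the `main` is illustrative only):

```rust
fn asc_ranking_rule(text: &str) -> Option<&str> {
    text.split_once("asc(")
        .and_then(|(_, tail)| tail.rsplit_once(')'))
        .map(|(field, _)| field)
}

fn main() {
    // Legacy v1 syntax: the field name is extracted from between the parentheses.
    assert_eq!(asc_ranking_rule("asc(price)"), Some("price"));
    // The modern `field:asc` syntax does not match and yields None.
    assert_eq!(asc_ranking_rule("price:asc"), None);
}
```
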
diff --git a/dump/src/reader/compat/v2.rs b/dump/src/reader/compat/v2.rs
new file mode 100644
index 000000000..364d894c4
--- /dev/null
+++ b/dump/src/reader/compat/v2.rs
@@ -0,0 +1,152 @@
+use anyhow::bail;
+use meilisearch_types::error::Code;
+use milli::update::IndexDocumentsMethod;
+use serde::{Deserialize, Serialize};
+use time::OffsetDateTime;
+use uuid::Uuid;
+
+use crate::index::{Settings, Unchecked};
+
+#[derive(Serialize, Deserialize)]
+pub struct UpdateEntry {
+    pub uuid: Uuid,
+    pub update: UpdateStatus,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub enum UpdateFormat {
+    Json,
+}
+
+#[derive(Debug, Serialize, Deserialize, Clone)]
+pub struct DocumentAdditionResult {
+    pub nb_documents: usize,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub enum UpdateResult {
+    DocumentsAddition(DocumentAdditionResult),
+    DocumentDeletion { deleted: u64 },
+    Other,
+}
+
+#[allow(clippy::large_enum_variant)]
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(tag = "type")]
+pub enum UpdateMeta {
+    DocumentsAddition {
+        method: IndexDocumentsMethod,
+        format: UpdateFormat,
+        primary_key: Option<String>,
+    },
+    ClearDocuments,
+    DeleteDocuments {
+        ids: Vec<String>,
+    },
+    Settings(Settings<Unchecked>),
+}
+
+#[derive(Debug, Serialize, Deserialize, Clone)]
+#[serde(rename_all = "camelCase")]
+pub struct Enqueued {
+    pub update_id: u64,
+    pub meta: UpdateMeta,
+    #[serde(with = "time::serde::rfc3339")]
+    pub enqueued_at: OffsetDateTime,
+    pub content: Option<Uuid>,
+}
+
+#[derive(Debug, Serialize, Deserialize, Clone)]
+#[serde(rename_all = "camelCase")]
+pub struct Processed {
+    pub success: UpdateResult,
+    #[serde(with = "time::serde::rfc3339")]
+    pub processed_at: OffsetDateTime,
+    #[serde(flatten)]
+    pub from: Processing,
+}
+
+#[derive(Debug, Serialize, Deserialize, Clone)]
+#[serde(rename_all = "camelCase")]
+pub struct Processing {
+    #[serde(flatten)]
+    pub from: Enqueued,
+    #[serde(with = "time::serde::rfc3339")]
+    pub started_processing_at: OffsetDateTime,
+}
+
+#[derive(Debug, Serialize, Deserialize, Clone)]
+#[serde(rename_all = "camelCase")]
+pub struct Aborted {
+    #[serde(flatten)]
+    pub from: Enqueued,
+    #[serde(with = "time::serde::rfc3339")]
+    pub aborted_at: OffsetDateTime,
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct Failed {
+    #[serde(flatten)]
+    pub from: Processing,
+    pub error: ResponseError,
+    #[serde(with = "time::serde::rfc3339")]
+    pub failed_at: OffsetDateTime,
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+#[serde(tag = "status", rename_all = "camelCase")]
+pub enum UpdateStatus {
+    Processing(Processing),
+    Enqueued(Enqueued),
+    Processed(Processed),
+    Aborted(Aborted),
+    Failed(Failed),
+}
+
+type StatusCode = ();
+
+#[derive(Debug, Serialize, Deserialize, Clone)]
+#[serde(rename_all = "camelCase")]
+pub struct ResponseError {
+    #[serde(skip)]
+    pub code: StatusCode,
+    pub message: String,
+    pub error_code: String,
+    pub error_type: String,
+    pub error_link: String,
+}
+
+pub fn error_code_from_str(s: &str) -> anyhow::Result<Code> {
+    let code = match s {
+        "index_creation_failed" => Code::CreateIndex,
+        "index_already_exists" => Code::IndexAlreadyExists,
+        "index_not_found" => Code::IndexNotFound,
+        "invalid_index_uid" => Code::InvalidIndexUid,
+        "invalid_state" => Code::InvalidState,
+        "missing_primary_key" => Code::MissingPrimaryKey,
+        "primary_key_already_present" => Code::PrimaryKeyAlreadyPresent,
+        "invalid_request" => Code::InvalidRankingRule,
+        "max_fields_limit_exceeded" => Code::MaxFieldsLimitExceeded,
+        "missing_document_id" => Code::MissingDocumentId,
+        "invalid_facet" => Code::Filter,
+        "invalid_filter" => Code::Filter,
+        "invalid_sort" => Code::Sort,
+        "bad_parameter" => Code::BadParameter,
+        "bad_request" => Code::BadRequest,
+        "document_not_found" => Code::DocumentNotFound,
+        "internal" => Code::Internal,
+        "invalid_geo_field" => Code::InvalidGeoField,
+        "invalid_token" => Code::InvalidToken,
+        "missing_authorization_header" => Code::MissingAuthorizationHeader,
+        "payload_too_large" => Code::PayloadTooLarge,
+        "unretrievable_document" => Code::RetrieveDocument,
+        "search_error" => Code::SearchDocuments,
+        "unsupported_media_type" => Code::UnsupportedMediaType,
+        "dump_already_in_progress" => Code::DumpAlreadyInProgress,
+        "dump_process_failed" => Code::DumpProcessFailed,
+        _ => bail!("unknown error code."),
+    };
+
+    Ok(code)
+}
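
The string-to-`Code` mapping above is total over the known v2 error codes and bails on anything else. A trimmed-down, runnable sketch of the same shape (hypothetical two-variant `Code`, assuming the `anyhow` crate):

```rust
use anyhow::bail;

#[derive(Debug, PartialEq)]
enum Code {
    IndexNotFound,
    Filter,
}

fn error_code_from_str(s: &str) -> anyhow::Result<Code> {
    let code = match s {
        "index_not_found" => Code::IndexNotFound,
        // Two legacy names collapse onto the same modern code, as in the real table.
        "invalid_facet" | "invalid_filter" => Code::Filter,
        _ => bail!("unknown error code."),
    };
    Ok(code)
}

fn main() -> anyhow::Result<()> {
    assert_eq!(error_code_from_str("invalid_facet")?, Code::Filter);
    assert!(error_code_from_str("does_not_exist").is_err());
    Ok(())
}
```
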
diff --git a/dump/src/reader/compat/v3.rs b/dump/src/reader/compat/v3.rs
new file mode 100644
index 000000000..61e31eccd
--- /dev/null
+++ b/dump/src/reader/compat/v3.rs
@@ -0,0 +1,205 @@
+use meilisearch_types::error::{Code, ResponseError};
+use meilisearch_types::index_uid::IndexUid;
+use milli::update::IndexDocumentsMethod;
+use serde::{Deserialize, Serialize};
+use time::OffsetDateTime;
+use uuid::Uuid;
+
+use super::v4::{Task, TaskContent, TaskEvent};
+use crate::index::{Settings, Unchecked};
+use crate::tasks::task::{DocumentDeletion, TaskId, TaskResult};
+
+use super::v2;
+
+#[derive(Serialize, Deserialize)]
+pub struct DumpEntry {
+    pub uuid: Uuid,
+    pub uid: String,
+}
+
+#[derive(Serialize, Deserialize)]
+pub struct UpdateEntry {
+    pub uuid: Uuid,
+    pub update: UpdateStatus,
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+#[serde(tag = "status", rename_all = "camelCase")]
+pub enum UpdateStatus {
+    Processing(Processing),
+    Enqueued(Enqueued),
+    Processed(Processed),
+    Failed(Failed),
+}
+
+impl From<v2::UpdateResult> for TaskResult {
+    fn from(other: v2::UpdateResult) -> Self {
+        match other {
+            v2::UpdateResult::DocumentsAddition(result) => TaskResult::DocumentAddition {
+                indexed_documents: result.nb_documents as u64,
+            },
+            v2::UpdateResult::DocumentDeletion { deleted } => TaskResult::DocumentDeletion {
+                deleted_documents: deleted,
+            },
+            v2::UpdateResult::Other => TaskResult::Other,
+        }
+    }
+}
+
+#[allow(clippy::large_enum_variant)]
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub enum Update {
+    DeleteDocuments(Vec<String>),
+    DocumentAddition {
+        primary_key: Option<String>,
+        method: IndexDocumentsMethod,
+        content_uuid: Uuid,
+    },
+    Settings(Settings<Unchecked>),
+    ClearDocuments,
+}
+
+impl From<Update> for super::v4::TaskContent {
+    fn from(update: Update) -> Self {
+        match update {
+            Update::DeleteDocuments(ids) => {
+                TaskContent::DocumentDeletion(DocumentDeletion::Ids(ids))
+            }
+            Update::DocumentAddition {
+                primary_key,
+                method,
+                ..
+            } => TaskContent::DocumentAddition {
+                content_uuid: Uuid::default(),
+                merge_strategy: method,
+                primary_key,
+                // document count is unknown for legacy updates
+                documents_count: 0,
+                allow_index_creation: true,
+            },
+            Update::Settings(settings) => TaskContent::SettingsUpdate {
+                settings,
+                // There is no way to know now, so we assume it isn't
+                is_deletion: false,
+                allow_index_creation: true,
+            },
+            Update::ClearDocuments => TaskContent::DocumentDeletion(DocumentDeletion::Clear),
+        }
+    }
+}
+
+#[allow(clippy::large_enum_variant)]
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(tag = "type")]
+pub enum UpdateMeta {
+    DocumentsAddition {
+        method: IndexDocumentsMethod,
+        primary_key: Option<String>,
+    },
+    ClearDocuments,
+    DeleteDocuments {
+        ids: Vec<String>,
+    },
+    Settings(Settings<Unchecked>),
+}
+
+#[derive(Debug, Serialize, Deserialize, Clone)]
+#[serde(rename_all = "camelCase")]
+pub struct Enqueued {
+    pub update_id: u64,
+    pub meta: Update,
+    #[serde(with = "time::serde::rfc3339")]
+    pub enqueued_at: OffsetDateTime,
+}
+
+impl Enqueued {
+    fn update_task(self, task: &mut Task) {
+        // we do not erase the `TaskId` that was given to us.
+        task.content = self.meta.into();
+        task.events.push(TaskEvent::Created(self.enqueued_at));
+    }
+}
+
+#[derive(Debug, Serialize, Deserialize, Clone)]
+#[serde(rename_all = "camelCase")]
+pub struct Processed {
+    pub success: v2::UpdateResult,
+    #[serde(with = "time::serde::rfc3339")]
+    pub processed_at: OffsetDateTime,
+    #[serde(flatten)]
+    pub from: Processing,
+}
+
+impl Processed {
+    fn update_task(self, task: &mut Task) {
+        self.from.update_task(task);
+
+        let event = TaskEvent::Succeded {
+            result: TaskResult::from(self.success),
+            timestamp: self.processed_at,
+        };
+        task.events.push(event);
+    }
+}
+
+#[derive(Debug, Serialize, Deserialize, Clone)]
+#[serde(rename_all = "camelCase")]
+pub struct Processing {
+    #[serde(flatten)]
+    pub from: Enqueued,
+    #[serde(with = "time::serde::rfc3339")]
+    pub started_processing_at: OffsetDateTime,
+}
+
+impl Processing {
+    fn update_task(self, task: &mut Task) {
+        self.from.update_task(task);
+
+        let event = TaskEvent::Processing(self.started_processing_at);
+        task.events.push(event);
+    }
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct Failed {
+    #[serde(flatten)]
+    pub from: Processing,
+    pub msg: String,
+    pub code: Code,
+    #[serde(with = "time::serde::rfc3339")]
+    pub failed_at: OffsetDateTime,
+}
+
+impl Failed {
+    fn update_task(self, task: &mut Task) {
+        self.from.update_task(task);
+
+        let event = TaskEvent::Failed {
+            error: ResponseError::from_msg(self.msg, self.code),
+            timestamp: self.failed_at,
+        };
+        task.events.push(event);
+    }
+}
+
+impl From<(UpdateStatus, String, TaskId)> for Task {
+    fn from((update, uid, task_id): (UpdateStatus, String, TaskId)) -> Self {
+        // Dummy task
+        let mut task = super::v4::Task {
+            id: task_id,
+            index_uid: IndexUid::new_unchecked(uid),
+            content: super::v4::TaskContent::IndexDeletion,
+            events: Vec::new(),
+        };
+
+        match update {
+            UpdateStatus::Processing(u) => u.update_task(&mut task),
+            UpdateStatus::Enqueued(u) => u.update_task(&mut task),
+            UpdateStatus::Processed(u) => u.update_task(&mut task),
+            UpdateStatus::Failed(u) => u.update_task(&mut task),
+        }
+
+        task
+    }
+}
diff --git a/dump/src/reader/compat/v4.rs b/dump/src/reader/compat/v4.rs
new file mode 100644
index 000000000..c412e7f17
--- /dev/null
+++ b/dump/src/reader/compat/v4.rs
@@ -0,0 +1,145 @@
+use meilisearch_types::error::ResponseError;
+use meilisearch_types::index_uid::IndexUid;
+use milli::update::IndexDocumentsMethod;
+use serde::{Deserialize, Serialize};
+use time::OffsetDateTime;
+use uuid::Uuid;
+
+use crate::index::{Settings, Unchecked};
+use crate::tasks::batch::BatchId;
+use crate::tasks::task::{
+    DocumentDeletion, TaskContent as NewTaskContent, TaskEvent as NewTaskEvent, TaskId, TaskResult,
+};
+
+#[derive(Debug, Serialize, Deserialize)]
+pub struct Task {
+    pub id: TaskId,
+    pub index_uid: IndexUid,
+    pub content: TaskContent,
+    pub events: Vec<TaskEvent>,
+}
+
+impl From<Task> for crate::tasks::task::Task {
+    fn from(other: Task) -> Self {
+        Self {
+            id: other.id,
+            content: NewTaskContent::from((other.index_uid, other.content)),
+            events: other.events.into_iter().map(Into::into).collect(),
+        }
+    }
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+pub enum TaskEvent {
+    Created(#[serde(with = "time::serde::rfc3339")] OffsetDateTime),
+    Batched {
+        #[serde(with = "time::serde::rfc3339")]
+        timestamp: OffsetDateTime,
+        batch_id: BatchId,
+    },
+    Processing(#[serde(with = "time::serde::rfc3339")] OffsetDateTime),
+    Succeded {
+        result: TaskResult,
+        #[serde(with = "time::serde::rfc3339")]
+        timestamp: OffsetDateTime,
+    },
+    Failed {
+        error: ResponseError,
+        #[serde(with = "time::serde::rfc3339")]
+        timestamp: OffsetDateTime,
+    },
+}
+
+impl From<TaskEvent> for NewTaskEvent {
+    fn from(other: TaskEvent) -> Self {
+        match other {
+            TaskEvent::Created(x) => NewTaskEvent::Created(x),
+            TaskEvent::Batched {
+                timestamp,
+                batch_id,
+            } => NewTaskEvent::Batched {
+                timestamp,
+                batch_id,
+            },
+            TaskEvent::Processing(x) => NewTaskEvent::Processing(x),
+            TaskEvent::Succeded { result, timestamp } => {
+                NewTaskEvent::Succeeded { result, timestamp }
+            }
+            TaskEvent::Failed { error, timestamp } => NewTaskEvent::Failed { error, timestamp },
+        }
+    }
+}
+
+#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
+#[allow(clippy::large_enum_variant)]
+pub enum TaskContent {
+    DocumentAddition {
+        content_uuid: Uuid,
+        merge_strategy: IndexDocumentsMethod,
+        primary_key: Option<String>,
+        documents_count: usize,
+        allow_index_creation: bool,
+    },
+    DocumentDeletion(DocumentDeletion),
+    SettingsUpdate {
+        settings: Settings<Unchecked>,
+        /// Indicates whether the task was a deletion
+        is_deletion: bool,
+        allow_index_creation: bool,
+    },
+    IndexDeletion,
+    IndexCreation {
+        primary_key: Option<String>,
+    },
+    IndexUpdate {
+        primary_key: Option<String>,
+    },
+    Dump {
+        uid: String,
+    },
+}
+
+impl From<(IndexUid, TaskContent)> for NewTaskContent {
+    fn from((index_uid, content): (IndexUid, TaskContent)) -> Self {
+        match content {
+            TaskContent::DocumentAddition {
+                content_uuid,
+                merge_strategy,
+                primary_key,
+                documents_count,
+                allow_index_creation,
+            } => NewTaskContent::DocumentAddition {
+                index_uid,
+                content_uuid,
+                merge_strategy,
+                primary_key,
+                documents_count,
+                allow_index_creation,
+            },
+            TaskContent::DocumentDeletion(deletion) => NewTaskContent::DocumentDeletion {
+                index_uid,
+                deletion,
+            },
+            TaskContent::SettingsUpdate {
+                settings,
+                is_deletion,
+                allow_index_creation,
+            } => NewTaskContent::SettingsUpdate {
+                index_uid,
+                settings,
+                is_deletion,
+                allow_index_creation,
+            },
+            TaskContent::IndexDeletion => NewTaskContent::IndexDeletion { index_uid },
+            TaskContent::IndexCreation { primary_key } => NewTaskContent::IndexCreation {
+                index_uid,
+                primary_key,
+            },
+            TaskContent::IndexUpdate { primary_key } => NewTaskContent::IndexUpdate {
+                index_uid,
+                primary_key,
+            },
+            TaskContent::Dump { uid } => NewTaskContent::Dump { uid },
+        }
+    }
+}
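
The conversion above follows a common migration pattern: v4 stored the `index_uid` on the `Task` itself, while the new task format moves it inside each `TaskContent` variant. A stripped-down, runnable sketch of that pattern (the `Old*`/`New*` mini-types here are hypothetical, not the real ones):

```rust
// Old shape: the index uid lives on the task.
struct OldTask {
    index_uid: String,
    content: OldContent,
}
enum OldContent {
    IndexDeletion,
    IndexCreation { primary_key: Option<String> },
}

// New shape: the index uid lives inside each content variant.
enum NewContent {
    IndexDeletion { index_uid: String },
    IndexCreation { index_uid: String, primary_key: Option<String> },
}

impl From<(String, OldContent)> for NewContent {
    fn from((index_uid, content): (String, OldContent)) -> Self {
        match content {
            OldContent::IndexDeletion => NewContent::IndexDeletion { index_uid },
            OldContent::IndexCreation { primary_key } => {
                NewContent::IndexCreation { index_uid, primary_key }
            }
        }
    }
}

fn main() {
    let old = OldTask { index_uid: "movies".into(), content: OldContent::IndexDeletion };
    // The uid is threaded into the content, exactly as the real From impl does.
    let _new = NewContent::from((old.index_uid, old.content));
}
```
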
diff --git a/dump/src/reader/error.rs b/dump/src/reader/error.rs
new file mode 100644
index 000000000..679fa2bc2
--- /dev/null
+++ b/dump/src/reader/error.rs
@@ -0,0 +1,42 @@
+use meilisearch_auth::error::AuthControllerError;
+use meilisearch_types::error::{Code, ErrorCode};
+use meilisearch_types::internal_error;
+
+use crate::{index_resolver::error::IndexResolverError, tasks::error::TaskError};
+
+pub type Result<T> = std::result::Result<T, DumpError>;
+
+#[derive(thiserror::Error, Debug)]
+pub enum DumpError {
+    #[error("An internal error has occurred. `{0}`.")]
+    Internal(Box<dyn std::error::Error + Send + Sync + 'static>),
+    #[error("{0}")]
+    IndexResolver(Box<IndexResolverError>),
+}
+
+internal_error!(
+    DumpError: milli::heed::Error,
+    std::io::Error,
+    tokio::task::JoinError,
+    tokio::sync::oneshot::error::RecvError,
+    serde_json::error::Error,
+    tempfile::PersistError,
+    fs_extra::error::Error,
+    AuthControllerError,
+    TaskError
+);
+
+impl From<IndexResolverError> for DumpError {
+    fn from(e: IndexResolverError) -> Self {
+        Self::IndexResolver(Box::new(e))
+    }
+}
+
+impl ErrorCode for DumpError {
+    fn error_code(&self) -> Code {
+        match self {
+            DumpError::Internal(_) => Code::Internal,
+            DumpError::IndexResolver(e) => e.error_code(),
+        }
+    }
+}
diff --git a/dump/src/reader/loaders/mod.rs b/dump/src/reader/loaders/mod.rs
new file mode 100644
index 000000000..199b20c02
--- /dev/null
+++ b/dump/src/reader/loaders/mod.rs
@@ -0,0 +1,4 @@
+pub mod v2;
+pub mod v3;
+pub mod v4;
+pub mod v5;
diff --git a/dump/src/reader/loaders/v1.rs b/dump/src/reader/loaders/v1.rs
new file mode 100644
index 000000000..5c015b96a
--- /dev/null
+++ b/dump/src/reader/loaders/v1.rs
@@ -0,0 +1,46 @@
+use std::fs::File;
+use std::path::Path;
+
+use serde::{Deserialize, Serialize};
+use tempfile::TempDir;
+use time::OffsetDateTime;
+
+use crate::index_controller::IndexMetadata;
+
+#[derive(Debug, Deserialize)]
+#[serde(rename_all = "camelCase")]
+struct Metadata {
+    indexes: Vec<Index>,
+    db_version: String,
+    dump_version: crate::Version,
+}
+
+#[derive(Debug, Deserialize)]
+#[serde(rename_all = "camelCase")]
+struct Index {
+    pub name: String,
+    pub uid: String,
+    #[serde(with = "time::serde::rfc3339")]
+    created_at: OffsetDateTime,
+    #[serde(with = "time::serde::rfc3339")]
+    updated_at: OffsetDateTime,
+    pub primary_key: Option<String>,
+}
+
+pub struct V1Reader {
+    dump: TempDir,
+    metadata: Metadata,
+}
+
+impl V1Reader {
+    pub fn open(dump: TempDir) -> anyhow::Result<Self> {
+        let mut meta_file = File::open(dump.path().join("metadata.json"))?;
+        let metadata = serde_json::from_reader(&mut meta_file)?;
+
+        Ok(V1Reader { dump, metadata })
+    }
+
+    pub fn date(&self) -> anyhow::Result<Option<OffsetDateTime>> {
+        Ok(None)
+    }
+}
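
Not shown in the diff: `internal_error!` comes from `meilisearch_types` and generates one `From` impl per listed error type, boxing each into the `Internal` variant. A self-contained sketch of the presumed expansion, using a local stand-in `DumpError` (the macro body is an assumption, not the crate's exact source):

```rust
use std::error::Error;
use std::fmt;

#[derive(Debug)]
enum DumpError {
    Internal(Box<dyn Error + Send + Sync + 'static>),
}

impl fmt::Display for DumpError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            DumpError::Internal(e) => write!(f, "An internal error has occurred. `{e}`."),
        }
    }
}

// What `internal_error!(DumpError: std::io::Error, ...)` presumably expands to:
// one From impl per listed type, each funnelled into Internal.
macro_rules! internal_error {
    ($target:ty : $($other:path),*) => {
        $(
            impl From<$other> for $target {
                fn from(other: $other) -> Self {
                    Self::Internal(Box::new(other))
                }
            }
        )*
    };
}

internal_error!(DumpError: std::io::Error);

fn main() {
    let err: DumpError = std::io::Error::new(std::io::ErrorKind::Other, "boom").into();
    println!("{err}");
}
```
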
diff --git a/dump/src/reader/loaders/v2.rs b/dump/src/reader/loaders/v2.rs
new file mode 100644
index 000000000..5926de931
--- /dev/null
+++ b/dump/src/reader/loaders/v2.rs
@@ -0,0 +1,216 @@
+use std::fs::{File, OpenOptions};
+use std::io::Write;
+use std::path::{Path, PathBuf};
+
+use serde_json::{Deserializer, Value};
+use tempfile::NamedTempFile;
+
+use crate::dump::compat::{self, v2, v3};
+use crate::dump::Metadata;
+use crate::options::IndexerOpts;
+
+/// The dump v2 reads the dump folder and patches all the needed files to make it compatible with a
+/// dump v3, then calls the dump v3 to actually handle the dump.
+pub fn load_dump(
+    meta: Metadata,
+    src: impl AsRef<Path>,
+    dst: impl AsRef<Path>,
+    index_db_size: usize,
+    update_db_size: usize,
+    indexing_options: &IndexerOpts,
+) -> anyhow::Result<()> {
+    log::info!("Patching dump V2 to dump V3...");
+    let indexes_path = src.as_ref().join("indexes");
+
+    let dir_entries = std::fs::read_dir(indexes_path)?;
+    for entry in dir_entries {
+        let entry = entry?;
+
+        // rename the index folder
+        let path = entry.path();
+        let new_path = patch_index_uuid_path(&path).expect("invalid index folder.");
+
+        std::fs::rename(path, &new_path)?;
+
+        let settings_path = new_path.join("meta.json");
+
+        patch_settings(settings_path)?;
+    }
+
+    let update_dir = src.as_ref().join("updates");
+    let update_path = update_dir.join("data.jsonl");
+    patch_updates(update_dir, update_path)?;
+
+    super::v3::load_dump(
+        meta,
+        src,
+        dst,
+        index_db_size,
+        update_db_size,
+        indexing_options,
+    )
+}
+
+fn patch_index_uuid_path(path: &Path) -> Option<PathBuf> {
+    let uuid = path.file_name()?.to_str()?.trim_start_matches("index-");
+    let new_path = path.parent()?.join(uuid);
+    Some(new_path)
+}
+
+fn patch_settings(path: impl AsRef<Path>) -> anyhow::Result<()> {
+    let mut meta_file = File::open(&path)?;
+    let mut meta: Value = serde_json::from_reader(&mut meta_file)?;
+
+    // We first deserialize the dump meta into a serde_json::Value and change
+    // the custom ranking rules settings from the old format to the new format.
+    if let Some(ranking_rules) = meta.pointer_mut("/settings/rankingRules") {
+        patch_custom_ranking_rules(ranking_rules);
+    }
+
+    let mut meta_file = OpenOptions::new().truncate(true).write(true).open(path)?;
+
+    serde_json::to_writer(&mut meta_file, &meta)?;
+
+    Ok(())
+}
+
+fn patch_updates(dir: impl AsRef<Path>, path: impl AsRef<Path>) -> anyhow::Result<()> {
+    let mut output_update_file = NamedTempFile::new_in(&dir)?;
+    let update_file = File::open(&path)?;
+
+    let stream = Deserializer::from_reader(update_file).into_iter::<v2::UpdateEntry>();
+
+    for update in stream {
+        let update_entry = update?;
+
+        let update_entry = v3::UpdateEntry::from(update_entry);
+
+        serde_json::to_writer(&mut output_update_file, &update_entry)?;
+        output_update_file.write_all(b"\n")?;
+    }
+
+    output_update_file.flush()?;
+    output_update_file.persist(path)?;
+
+    Ok(())
+}
+
+/// Converts the ranking rules from the format `asc(_)`, `desc(_)` to the format `_:asc`, `_:desc`.
+///
+/// This is done for compatibility reasons, and to avoid a new dump version,
+/// since the new syntax was introduced soon after the new dump version.
+fn patch_custom_ranking_rules(ranking_rules: &mut Value) {
+    *ranking_rules = match ranking_rules.take() {
+        Value::Array(values) => values
+            .into_iter()
+            .filter_map(|value| match value {
+                Value::String(s) if s.starts_with("asc") => compat::asc_ranking_rule(&s)
+                    .map(|f| format!("{}:asc", f))
+                    .map(Value::String),
+                Value::String(s) if s.starts_with("desc") => compat::desc_ranking_rule(&s)
+                    .map(|f| format!("{}:desc", f))
+                    .map(Value::String),
+                otherwise => Some(otherwise),
+            })
+            .collect(),
+        otherwise => otherwise,
+    }
+}
+
+impl From<v2::UpdateEntry> for v3::UpdateEntry {
+    fn from(v2::UpdateEntry { uuid, update }: v2::UpdateEntry) -> Self {
+        let update = match update {
+            v2::UpdateStatus::Processing(meta) => v3::UpdateStatus::Processing(meta.into()),
+            v2::UpdateStatus::Enqueued(meta) => v3::UpdateStatus::Enqueued(meta.into()),
+            v2::UpdateStatus::Processed(meta) => v3::UpdateStatus::Processed(meta.into()),
+            v2::UpdateStatus::Aborted(_) => unreachable!("Updates could never be aborted."),
+            v2::UpdateStatus::Failed(meta) => v3::UpdateStatus::Failed(meta.into()),
+        };
+
+        Self { uuid, update }
+    }
+}
+
+impl From<v2::Failed> for v3::Failed {
+    fn from(other: v2::Failed) -> Self {
+        let v2::Failed {
+            from,
+            error,
+            failed_at,
+        } = other;
+
+        Self {
+            from: from.into(),
+            msg: error.message,
+            code: v2::error_code_from_str(&error.error_code)
+                .expect("Invalid update: Invalid error code"),
+            failed_at,
+        }
+    }
+}
+
+impl From<v2::Processing> for v3::Processing {
+    fn from(other: v2::Processing) -> Self {
+        let v2::Processing {
+            from,
+            started_processing_at,
+        } = other;
+
+        Self {
+            from: from.into(),
+            started_processing_at,
+        }
+    }
+}
+
+impl From<v2::Enqueued> for v3::Enqueued {
+    fn from(other: v2::Enqueued) -> Self {
+        let v2::Enqueued {
+            update_id,
+            meta,
+            enqueued_at,
+            content,
+        } = other;
+
+        let meta = match meta {
+            v2::UpdateMeta::DocumentsAddition {
+                method,
+                primary_key,
+                ..
+            } => {
+                v3::Update::DocumentAddition {
+                    primary_key,
+                    method,
+                    // Just ignore if the uuid is not present. If it is needed later, an error will
+                    // be thrown.
+                    content_uuid: content.unwrap_or_default(),
+                }
+            }
+            v2::UpdateMeta::ClearDocuments => v3::Update::ClearDocuments,
+            v2::UpdateMeta::DeleteDocuments { ids } => v3::Update::DeleteDocuments(ids),
+            v2::UpdateMeta::Settings(settings) => v3::Update::Settings(settings),
+        };
+
+        Self {
+            update_id,
+            meta,
+            enqueued_at,
+        }
+    }
+}
+
+impl From<v2::Processed> for v3::Processed {
+    fn from(other: v2::Processed) -> Self {
+        let v2::Processed {
+            from,
+            success,
+            processed_at,
+        } = other;
+
+        Self {
+            success,
+            processed_at,
+            from: from.into(),
+        }
+    }
+}
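
To see what the patching does end to end, here is a runnable demo of `patch_custom_ranking_rules` on a settings value (the function is a trimmed copy of the one above with the `desc` arm omitted and the `compat` helper inlined; assumes the `serde_json` crate):

```rust
use serde_json::{json, Value};

fn asc_ranking_rule(text: &str) -> Option<&str> {
    text.split_once("asc(")
        .and_then(|(_, tail)| tail.rsplit_once(')'))
        .map(|(field, _)| field)
}

fn patch_custom_ranking_rules(ranking_rules: &mut Value) {
    *ranking_rules = match ranking_rules.take() {
        Value::Array(values) => values
            .into_iter()
            .filter_map(|value| match value {
                Value::String(s) if s.starts_with("asc") => asc_ranking_rule(&s)
                    .map(|f| format!("{}:asc", f))
                    .map(Value::String),
                // Built-in rules like "typo" pass through untouched.
                otherwise => Some(otherwise),
            })
            .collect(),
        otherwise => otherwise,
    }
}

fn main() {
    let mut rules = json!(["typo", "asc(price)"]);
    patch_custom_ranking_rules(&mut rules);
    // `asc(price)` is rewritten into the modern `price:asc` syntax.
    assert_eq!(rules, json!(["typo", "price:asc"]));
}
```
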
diff --git a/dump/src/reader/loaders/v3.rs b/dump/src/reader/loaders/v3.rs
new file mode 100644
index 000000000..44984c946
--- /dev/null
+++ b/dump/src/reader/loaders/v3.rs
@@ -0,0 +1,136 @@
+use std::collections::HashMap;
+use std::fs::{self, File};
+use std::io::{BufReader, BufWriter, Write};
+use std::path::Path;
+
+use anyhow::Context;
+use fs_extra::dir::{self, CopyOptions};
+use log::info;
+use tempfile::tempdir;
+use uuid::Uuid;
+
+use crate::dump::compat::{self, v3};
+use crate::dump::Metadata;
+use crate::index_resolver::meta_store::{DumpEntry, IndexMeta};
+use crate::options::IndexerOpts;
+use crate::tasks::task::TaskId;
+
+/// dump structure for V3:
+/// .
+/// ├── indexes
+/// │   └── 25f10bb8-6ea8-42f0-bd48-ad5857f77648
+/// │       ├── documents.jsonl
+/// │       └── meta.json
+/// ├── index_uuids
+/// │   └── data.jsonl
+/// ├── metadata.json
+/// └── updates
+///     └── data.jsonl
+
+pub fn load_dump(
+    meta: Metadata,
+    src: impl AsRef<Path>,
+    dst: impl AsRef<Path>,
+    index_db_size: usize,
+    meta_env_size: usize,
+    indexing_options: &IndexerOpts,
+) -> anyhow::Result<()> {
+    info!("Patching dump V3 to dump V4...");
+
+    let patched_dir = tempdir()?;
+
+    let options = CopyOptions::default();
+    dir::copy(src.as_ref().join("indexes"), patched_dir.path(), &options)?;
+    dir::copy(
+        src.as_ref().join("index_uuids"),
+        patched_dir.path(),
+        &options,
+    )?;
+
+    let uuid_map = patch_index_meta(
+        src.as_ref().join("index_uuids/data.jsonl"),
+        patched_dir.path(),
+    )?;
+
+    fs::copy(
+        src.as_ref().join("metadata.json"),
+        patched_dir.path().join("metadata.json"),
+    )?;
+
+    patch_updates(&src, patched_dir.path(), uuid_map)?;
+
+    super::v4::load_dump(
+        meta,
+        patched_dir.path(),
+        dst,
+        index_db_size,
+        meta_env_size,
+        indexing_options,
+    )
+}
+
+fn patch_index_meta(
+    path: impl AsRef<Path>,
+    dst: impl AsRef<Path>,
+) -> anyhow::Result<HashMap<Uuid, String>> {
+    let file = BufReader::new(File::open(path)?);
+    let dst = dst.as_ref().join("index_uuids");
+    fs::create_dir_all(&dst)?;
+    let mut dst_file = File::create(dst.join("data.jsonl"))?;
+
+    let map = serde_json::Deserializer::from_reader(file)
+        .into_iter::<v3::DumpEntry>()
+        .try_fold(HashMap::new(), |mut map, entry| -> anyhow::Result<_> {
+            let entry = entry?;
+            map.insert(entry.uuid, entry.uid.clone());
+            let meta = IndexMeta {
+                uuid: entry.uuid,
+                // This information is lost, so we patch it to 0.
+                creation_task_id: 0,
+            };
+            let entry = DumpEntry {
+                uid: entry.uid,
+                index_meta: meta,
+            };
+            serde_json::to_writer(&mut dst_file, &entry)?;
+            dst_file.write_all(b"\n")?;
+            Ok(map)
+        })?;
+
+    dst_file.flush()?;
+
+    Ok(map)
+}
+
+fn patch_updates(
+    src: impl AsRef<Path>,
+    dst: impl AsRef<Path>,
+    uuid_map: HashMap<Uuid, String>,
+) -> anyhow::Result<()> {
+    let dst = dst.as_ref().join("updates");
+    fs::create_dir_all(&dst)?;
+
+    let mut dst_file = BufWriter::new(File::create(dst.join("data.jsonl"))?);
+    let src_file = BufReader::new(File::open(src.as_ref().join("updates/data.jsonl"))?);
+
+    serde_json::Deserializer::from_reader(src_file)
+        .into_iter::<v3::UpdateEntry>()
+        .enumerate()
+        .try_for_each(|(task_id, entry)| -> anyhow::Result<()> {
+            let entry = entry?;
+            let name = uuid_map
+                .get(&entry.uuid)
+                .with_context(|| format!("Unknown index uuid: {}", entry.uuid))?
+                .clone();
+            serde_json::to_writer(
+                &mut dst_file,
+                &compat::v4::Task::from((entry.update, name, task_id as TaskId)),
+            )?;
+            dst_file.write_all(b"\n")?;
+            Ok(())
+        })?;
+
+    dst_file.flush()?;
+
+    Ok(())
+}
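
Worth noting about the v3→v4 patching above: v3 updates carry no task ids, so the id is assigned purely from each entry's position in `updates/data.jsonl` via `enumerate()`. In isolation the pattern looks like this (hypothetical `Entry` type; the real code maps into `compat::v4::Task`):

```rust
use serde::Deserialize;

#[derive(Debug, Deserialize)]
struct Entry {
    uuid: String,
}

fn main() -> anyhow::Result<()> {
    let data = r#"{"uuid":"a"}
{"uuid":"b"}"#;

    serde_json::Deserializer::from_str(data)
        .into_iter::<Entry>()
        .enumerate()
        .try_for_each(|(task_id, entry)| -> anyhow::Result<()> {
            // The line number becomes the task id: 0, 1, 2, ...
            println!("task {} -> uuid {}", task_id, entry?.uuid);
            Ok(())
        })?;

    Ok(())
}
```
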
diff --git a/dump/src/reader/loaders/v4.rs b/dump/src/reader/loaders/v4.rs
new file mode 100644
index 000000000..0744df7ea
--- /dev/null
+++ b/dump/src/reader/loaders/v4.rs
@@ -0,0 +1,103 @@
+use std::fs::{self, create_dir_all, File};
+use std::io::{BufReader, Write};
+use std::path::Path;
+
+use fs_extra::dir::{self, CopyOptions};
+use log::info;
+use serde_json::{Deserializer, Map, Value};
+use tempfile::tempdir;
+use uuid::Uuid;
+
+use crate::dump::{compat, Metadata};
+use crate::options::IndexerOpts;
+use crate::tasks::task::Task;
+
+pub fn load_dump(
+    meta: Metadata,
+    src: impl AsRef<Path>,
+    dst: impl AsRef<Path>,
+    index_db_size: usize,
+    meta_env_size: usize,
+    indexing_options: &IndexerOpts,
+) -> anyhow::Result<()> {
+    info!("Patching dump V4 to dump V5...");
+
+    let patched_dir = tempdir()?;
+    let options = CopyOptions::default();
+
+    // Indexes
+    dir::copy(src.as_ref().join("indexes"), &patched_dir, &options)?;
+
+    // Index uuids
+    dir::copy(src.as_ref().join("index_uuids"), &patched_dir, &options)?;
+
+    // Metadata
+    fs::copy(
+        src.as_ref().join("metadata.json"),
+        patched_dir.path().join("metadata.json"),
+    )?;
+
+    // Updates
+    patch_updates(&src, &patched_dir)?;
+
+    // Keys
+    patch_keys(&src, &patched_dir)?;
+
+    super::v5::load_dump(
+        meta,
+        &patched_dir,
+        dst,
+        index_db_size,
+        meta_env_size,
+        indexing_options,
+    )
+}
+
+fn patch_updates(src: impl AsRef<Path>, dst: impl AsRef<Path>) -> anyhow::Result<()> {
+    let updates_path = src.as_ref().join("updates/data.jsonl");
+    let output_updates_path = dst.as_ref().join("updates/data.jsonl");
+    create_dir_all(output_updates_path.parent().unwrap())?;
+    let updates_file = File::open(updates_path)?;
+    let mut output_update_file = File::create(output_updates_path)?;
+
+    serde_json::Deserializer::from_reader(updates_file)
+        .into_iter::<compat::v4::Task>()
+        .try_for_each(|task| -> anyhow::Result<()> {
+            let task: Task = task?.into();
+
+            serde_json::to_writer(&mut output_update_file, &task)?;
+            output_update_file.write_all(b"\n")?;
+
+            Ok(())
+        })?;
+
+    output_update_file.flush()?;
+
+    Ok(())
+}
+
+fn patch_keys(src: impl AsRef<Path>, dst: impl AsRef<Path>) -> anyhow::Result<()> {
+    let keys_file_src = src.as_ref().join("keys");
+
+    if !keys_file_src.exists() {
+        return Ok(());
+    }
+
+    fs::create_dir_all(&dst)?;
+    let keys_file_dst = dst.as_ref().join("keys");
+    let mut writer = File::create(&keys_file_dst)?;
+
+    let reader = BufReader::new(File::open(&keys_file_src)?);
+    for key in Deserializer::from_reader(reader).into_iter() {
+        let mut key: Map<String, Value> = key?;
+
+        // generate a new uuid v4 and insert it in the key.
+        let uid = serde_json::to_value(Uuid::new_v4()).unwrap();
+        key.insert("uid".to_string(), uid);
+
+        serde_json::to_writer(&mut writer, &key)?;
+        writer.write_all(b"\n")?;
+    }
+
+    Ok(())
+}
diff --git a/dump/src/reader/loaders/v5.rs b/dump/src/reader/loaders/v5.rs
new file mode 100644
index 000000000..fcb4224bb
--- /dev/null
+++ b/dump/src/reader/loaders/v5.rs
@@ -0,0 +1,47 @@
+use std::{path::Path, sync::Arc};
+
+use log::info;
+use meilisearch_auth::AuthController;
+use milli::heed::EnvOpenOptions;
+
+use crate::analytics;
+use crate::dump::Metadata;
+use crate::index_resolver::IndexResolver;
+use crate::options::IndexerOpts;
+use crate::tasks::TaskStore;
+use crate::update_file_store::UpdateFileStore;
+
+pub fn load_dump(
+    meta: Metadata,
+    src: impl AsRef<Path>,
+    dst: impl AsRef<Path>,
+    index_db_size: usize,
+    meta_env_size: usize,
+    indexing_options: &IndexerOpts,
+) -> anyhow::Result<()> {
+    info!(
+        "Loading dump from {}, dump database version: {}, dump version: V5",
+        meta.dump_date, meta.db_version
+    );
+
+    let mut options = EnvOpenOptions::new();
+    options.map_size(meta_env_size);
+    options.max_dbs(100);
+    let env = Arc::new(options.open(&dst)?);
+
+    IndexResolver::load_dump(
+        src.as_ref(),
+        &dst,
+        index_db_size,
+        env.clone(),
+        indexing_options,
+    )?;
+    UpdateFileStore::load_dump(src.as_ref(), &dst)?;
+    TaskStore::load_dump(&src, env)?;
+    AuthController::load_dump(&src, &dst)?;
+    analytics::copy_user_id(src.as_ref(), dst.as_ref());
+
+    info!("Loading indexes.");
+
+    Ok(())
+}
diff --git a/dump/src/reader/mod.rs b/dump/src/reader/mod.rs
new file mode 100644
index 000000000..f2f5019ac
--- /dev/null
+++ b/dump/src/reader/mod.rs
@@ -0,0 +1,105 @@
+use std::path::Path;
+use std::{fs::File, io::BufReader};
+
+use flate2::{bufread::GzDecoder, Compression};
+use index::{Settings, Unchecked};
+use index_scheduler::TaskView;
+use meilisearch_auth::Key;
+use serde::{Deserialize, Serialize};
+
+use tempfile::TempDir;
+use time::OffsetDateTime;
+
+use crate::{Result, Version};
+
+// use self::loaders::{v2, v3, v4, v5};
+
+// pub mod error;
+// mod compat;
+// mod loaders;
+mod v1;
+// mod v6;
+
+pub fn open(
+    dump_path: &Path,
+) -> Result<
+    impl DumpReader<
+        Document = serde_json::Value,
+        Settings = Settings<Unchecked>,
+        Task = TaskView,
+        UpdateFile = (),
+        Key = Key,
+    >,
+> {
+    let path = TempDir::new()?;
+
+    let dump = File::open(dump_path)?;
+    let mut dump = BufReader::new(dump);
+
+    let gz = GzDecoder::new(&mut dump);
+    let mut archive = tar::Archive::new(gz);
+    archive.unpack(path.path())?;
+
+    #[derive(Deserialize)]
+    struct MetadataVersion {
+        pub dump_version: Version,
+    }
+    let mut meta_file = File::open(path.path().join("metadata.json"))?;
+    let MetadataVersion { dump_version } = serde_json::from_reader(&mut meta_file)?;
+
+    match dump_version {
+        // Version::V1 => Ok(Box::new(v1::Reader::open(path)?)),
+        Version::V1 => todo!(),
+        Version::V2 => todo!(),
+        Version::V3 => todo!(),
+        Version::V4 => todo!(),
+        Version::V5 => todo!(),
+        Version::V6 => todo!(),
+    };
+
+    todo!()
+}
+
+pub trait DumpReader {
+    type Document;
+    type Settings;
+
+    type Task;
+    type UpdateFile;
+
+    type Key;
+
+    /// Return the version of the dump.
+    fn version(&self) -> Version;
+
+    /// Return at which date the index was created.
+    fn date(&self) -> Result<Option<OffsetDateTime>>;
+
+    /// Return an iterator over each index.
+    fn indexes(
+        &self,
+    ) -> Result<
+        Box<
+            dyn Iterator<
+                Item = Box<dyn IndexReader<Document = Self::Document, Settings = Self::Settings>>,
+            >,
+        >,
+    >;
+
+    /// Return all the tasks in the dump with a possible update file.
+    fn tasks(
+        &self,
+    ) -> Result<Box<dyn Iterator<Item = Result<(Self::Task, Option<Self::UpdateFile>)>>>>;
+
+    /// Return all the keys.
+    fn keys(&self) -> Result<Box<dyn Iterator<Item = Self::Key>>>;
+}
+
+pub trait IndexReader {
+    type Document;
+    type Settings;
+
+    fn name(&self) -> &str;
+    fn documents(&self) -> Result<Box<dyn Iterator<Item = Result<Self::Document>>>>;
+    fn settings(&self) -> Result<Self::Settings>;
+}
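
The `open` function above dispatches on the dump version by deserializing only the `dump_version` field of `metadata.json`. That works because serde ignores unknown fields by default, so the reader never needs the full, version-specific `Metadata` type just to pick a loader. A minimal sketch (the JSON payload and its other keys are illustrative, not the real metadata format):

```rust
use serde::Deserialize;

#[derive(Debug, Deserialize, PartialEq)]
enum Version {
    V1,
    V2,
    V3,
    V4,
    V5,
    V6,
}

// Only the field we care about; every other key in the document is ignored.
#[derive(Deserialize)]
struct MetadataVersion {
    dump_version: Version,
}

fn main() -> serde_json::Result<()> {
    let raw = r#"{ "someOtherKey": "0.29.0", "dump_version": "V5" }"#;
    let MetadataVersion { dump_version } = serde_json::from_str(raw)?;
    assert_eq!(dump_version, Version::V5);
    Ok(())
}
```
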
diff --git a/dump/src/reader/v1/mod.rs b/dump/src/reader/v1/mod.rs
new file mode 100644
index 000000000..9f4a9cdd7
--- /dev/null
+++ b/dump/src/reader/v1/mod.rs
@@ -0,0 +1,177 @@
+use std::{
+    convert::Infallible,
+    fs::{self, File},
+    io::{BufRead, BufReader},
+    path::Path,
+};
+
+use serde::Deserialize;
+use tempfile::TempDir;
+use time::OffsetDateTime;
+
+use self::update::UpdateStatus;
+
+use super::{DumpReader, IndexReader};
+use crate::{Error, Result, Version};
+
+pub mod settings;
+pub mod update;
+pub mod v1;
+
+pub struct V1Reader {
+    dump: TempDir,
+    metadata: v1::Metadata,
+    indexes: Vec<V1IndexReader>,
+}
+
+struct V1IndexReader {
+    name: String,
+    documents: File,
+    settings: File,
+    updates: File,
+
+    current_update: Option<UpdateStatus>,
+}
+
+impl V1IndexReader {
+    pub fn new(name: String, path: &Path) -> Result<Self> {
+        let mut ret = V1IndexReader {
+            name,
+            documents: File::open(path.join("documents.jsonl"))?,
+            settings: File::open(path.join("settings.json"))?,
+            updates: File::open(path.join("updates.jsonl"))?,
+            current_update: None,
+        };
+        ret.next_update()?;
+
+        Ok(ret)
+    }
+
+    pub fn next_update(&mut self) -> Result<Option<UpdateStatus>> {
+        let tasks = &mut self.updates;
+        let mut reader = BufReader::new(tasks);
+
+        let current_update = if let Some(line) = reader.lines().next() {
+            Some(serde_json::from_str(&line?)?)
+        } else {
+            None
+        };
+
+        Ok(std::mem::replace(&mut self.current_update, current_update))
+    }
+}
+
+impl V1Reader {
+    pub fn open(dump: TempDir) -> Result<Self> {
+        let meta_file = fs::read(dump.path().join("metadata.json"))?;
+        let metadata = serde_json::from_reader(&*meta_file)?;
+
+        let mut indexes = Vec::new();
+
+        let entries = fs::read_dir(dump.path())?;
+        for entry in entries {
+            let entry = entry?;
+            if entry.file_type()?.is_dir() {
+                indexes.push(V1IndexReader::new(
+                    entry
+                        .file_name()
+                        .to_str()
+                        .ok_or(Error::BadIndexName)?
+                        .to_string(),
+                    &entry.path(),
+                )?);
+            }
+        }
+
+        Ok(V1Reader {
+            dump,
+            metadata,
+            indexes,
+        })
+    }
+
+    pub fn date(&self) -> Result<Option<OffsetDateTime>> {
+        Ok(None)
+    }
+
+    fn next_update(&mut self) -> Result<Option<UpdateStatus>> {
+        if let Some((idx, _)) = self
+            .indexes
+            .iter()
+            .enumerate()
+            .map(|(idx, index)| (idx, index.current_update.as_ref()))
+            .filter_map(|(idx, update)| update.map(|u| (idx, u)))
+            .min_by_key(|(_, update)| *update.enqueued_at())
+        {
+            self.indexes[idx].next_update()
+        } else {
+            Ok(None)
+        }
+    }
+}
+
+impl IndexReader for &V1IndexReader {
+    type Document = serde_json::Value;
+    type Settings = settings::Settings;
+
+    fn name(&self) -> &str {
+        todo!()
+    }
+
+    fn documents(&self) -> Result<Box<dyn Iterator<Item = Result<Self::Document>>>> {
+        todo!()
+    }
+
+    fn settings(&self) -> Result<Self::Settings> {
+        todo!()
+    }
+}
+
+impl DumpReader for V1Reader {
+    type Document = serde_json::Value;
+    type Settings = settings::Settings;
+
+    type Task = update::UpdateStatus;
+    type UpdateFile = ();
+
+    type Key = Infallible;
+
+    fn date(&self) -> Result<Option<OffsetDateTime>> {
+        Ok(None)
+    }
+
+    fn version(&self) -> Version {
+        Version::V1
+    }
+
+    fn indexes(
+        &self,
+    ) -> Result<
+        Box<
+            dyn Iterator<
+                Item = Box<
+                    dyn super::IndexReader<Document = Self::Document, Settings = Self::Settings>,
+                >,
+            >,
+        >,
+    > {
+        Ok(Box::new(self.indexes.iter().map(|index| {
+            Box::new(index)
+                as Box<dyn super::IndexReader<Document = Self::Document, Settings = Self::Settings>>
+        })))
+    }
+
+    fn tasks(
+        &self,
+    ) -> Result<Box<dyn Iterator<Item = Result<(Self::Task, Option<Self::UpdateFile>)>>>> {
+        Ok(Box::new(std::iter::from_fn(|| {
+            self.next_update()
+                .transpose()
+                .map(|result| result.map(|task| (task, None)))
+        })))
+    }
+
+    fn keys(&self) -> Result<Box<dyn Iterator<Item = Self::Key>>> {
+        Ok(Box::new(std::iter::empty()))
+    }
+}
diff --git a/dump/src/reader/v1/settings.rs b/dump/src/reader/v1/settings.rs
new file mode 100644
index 000000000..0065d3f97
--- /dev/null
+++ b/dump/src/reader/v1/settings.rs
@@ -0,0 +1,63 @@
+use std::collections::{BTreeMap, BTreeSet};
+use std::result::Result as StdResult;
+
+use serde::{Deserialize, Deserializer, Serialize};
+
+#[derive(Default, Clone, Serialize, Deserialize, Debug)]
+#[serde(rename_all = "camelCase", deny_unknown_fields)]
+pub struct Settings {
+    #[serde(default, deserialize_with = "deserialize_some")]
+    pub ranking_rules: Option<Option<Vec<String>>>,
+    #[serde(default, deserialize_with = "deserialize_some")]
+    pub distinct_attribute: Option<Option<String>>,
+    #[serde(default, deserialize_with = "deserialize_some")]
+    pub searchable_attributes: Option<Option<Vec<String>>>,
+    #[serde(default, deserialize_with = "deserialize_some")]
+    pub displayed_attributes: Option<Option<Vec<String>>>,
+    #[serde(default, deserialize_with = "deserialize_some")]
+    pub stop_words: Option<Option<BTreeSet<String>>>,
+    #[serde(default, deserialize_with = "deserialize_some")]
+    pub synonyms: Option<Option<BTreeMap<String, Vec<String>>>>,
+    #[serde(default, deserialize_with = "deserialize_some")]
+    pub attributes_for_faceting: Option<Option<Vec<String>>>,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct SettingsUpdate {
+    pub ranking_rules: UpdateState<Vec<RankingRule>>,
+    pub distinct_attribute: UpdateState<String>,
+    pub primary_key: UpdateState<String>,
+    pub searchable_attributes: UpdateState<Vec<String>>,
+    pub displayed_attributes: UpdateState<BTreeSet<String>>,
+    pub stop_words: UpdateState<BTreeSet<String>>,
+    pub synonyms: UpdateState<BTreeMap<String, Vec<String>>>,
+    pub attributes_for_faceting: UpdateState<Vec<String>>,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub enum UpdateState<T> {
+    Update(T),
+    Clear,
+    Nothing,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub enum RankingRule {
+    Typo,
+    Words,
+    Proximity,
+    Attribute,
+    WordsPosition,
+    Exactness,
+    Asc(String),
+    Desc(String),
+}
+
+// Any value that is present is considered Some value, including null.
+fn deserialize_some<'de, T, D>(deserializer: D) -> StdResult<Option<T>, D::Error>
+where
+    T: Deserialize<'de>,
+    D: Deserializer<'de>,
+{
+    Deserialize::deserialize(deserializer).map(Some)
+}
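
The `deserialize_some` helper gives the settings struct "double `Option`" semantics: the outer `Option` distinguishes a missing key from a present one, and the inner `Option` captures an explicit `null` (a reset). A minimal sketch of the same trick on a single field:

```rust
use serde::{Deserialize, Deserializer};

fn deserialize_some<'de, T, D>(deserializer: D) -> Result<Option<T>, D::Error>
where
    T: Deserialize<'de>,
    D: Deserializer<'de>,
{
    // Any value that is present deserializes to Some(value), including null.
    Deserialize::deserialize(deserializer).map(Some)
}

#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct Settings {
    #[serde(default, deserialize_with = "deserialize_some")]
    distinct_attribute: Option<Option<String>>,
}

fn main() -> serde_json::Result<()> {
    // Key absent: the field stays None (leave the setting untouched).
    let s: Settings = serde_json::from_str("{}")?;
    assert!(s.distinct_attribute.is_none());

    // Key explicitly null: Some(None) (reset the setting).
    let s: Settings = serde_json::from_str(r#"{"distinctAttribute":null}"#)?;
    assert_eq!(s.distinct_attribute, Some(None));

    Ok(())
}
```
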
diff --git a/dump/src/reader/v1/update.rs b/dump/src/reader/v1/update.rs
new file mode 100644
index 000000000..c9ccaf309
--- /dev/null
+++ b/dump/src/reader/v1/update.rs
@@ -0,0 +1,120 @@
+use serde::{Deserialize, Serialize};
+use serde_json::Value;
+use time::OffsetDateTime;
+
+use super::settings::SettingsUpdate;
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct Update {
+    data: UpdateData,
+    #[serde(with = "time::serde::rfc3339")]
+    enqueued_at: OffsetDateTime,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub enum UpdateData {
+    ClearAll,
+    Customs(Vec<u8>),
+    // (primary key, documents)
+    DocumentsAddition {
+        primary_key: Option<String>,
+        documents: Vec<serde_json::Map<String, Value>>,
+    },
+    DocumentsPartial {
+        primary_key: Option<String>,
+        documents: Vec<serde_json::Map<String, Value>>,
+    },
+    DocumentsDeletion(Vec<Value>),
+    Settings(Box<SettingsUpdate>),
+}
+
+impl UpdateData {
+    pub fn update_type(&self) -> UpdateType {
+        match self {
+            UpdateData::ClearAll => UpdateType::ClearAll,
+            UpdateData::Customs(_) => UpdateType::Customs,
+            UpdateData::DocumentsAddition { documents, .. } => UpdateType::DocumentsAddition {
+                number: documents.len(),
+            },
+            UpdateData::DocumentsPartial { documents, .. } => UpdateType::DocumentsPartial {
+                number: documents.len(),
+            },
+            UpdateData::DocumentsDeletion(deletion) => UpdateType::DocumentsDeletion {
+                number: deletion.len(),
+            },
+            UpdateData::Settings(update) => UpdateType::Settings {
+                settings: update.clone(),
+            },
+        }
+    }
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(tag = "name")]
+pub enum UpdateType {
+    ClearAll,
+    Customs,
+    DocumentsAddition { number: usize },
+    DocumentsPartial { number: usize },
+    DocumentsDeletion { number: usize },
+    Settings { settings: Box<SettingsUpdate> },
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct ProcessedUpdateResult {
+    pub update_id: u64,
+    #[serde(rename = "type")]
+    pub update_type: UpdateType,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub error: Option<String>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub error_type: Option<String>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub error_code: Option<String>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub error_link: Option<String>,
+    pub duration: f64, // in seconds
+    #[serde(with = "time::serde::rfc3339")]
+    pub enqueued_at: OffsetDateTime,
+    #[serde(with = "time::serde::rfc3339")]
+    pub processed_at: OffsetDateTime,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct EnqueuedUpdateResult {
+    pub update_id: u64,
+    #[serde(rename = "type")]
+    pub update_type: UpdateType,
+    #[serde(with = "time::serde::rfc3339")]
+    pub enqueued_at: OffsetDateTime,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase", tag = "status")]
+pub enum UpdateStatus {
+    Enqueued {
+        #[serde(flatten)]
+        content: EnqueuedUpdateResult,
+    },
+    Failed {
+        #[serde(flatten)]
+        content: ProcessedUpdateResult,
+    },
+    Processed {
+        #[serde(flatten)]
+        content: ProcessedUpdateResult,
+    },
+}
+
+impl UpdateStatus {
+    pub fn enqueued_at(&self) -> &OffsetDateTime {
+        match self {
+            UpdateStatus::Enqueued { content } => &content.enqueued_at,
+            UpdateStatus::Failed { content } | UpdateStatus::Processed { content } => {
+                &content.enqueued_at
+            }
+        }
+    }
+}
diff --git a/dump/src/reader/v1/v1.rs b/dump/src/reader/v1/v1.rs
new file mode 100644
index 000000000..0f4312508
--- /dev/null
+++ b/dump/src/reader/v1/v1.rs
@@ -0,0 +1,22 @@
+use serde::Deserialize;
+use time::OffsetDateTime;
+
+#[derive(Debug, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct Index {
+    pub name: String,
+    pub uid: String,
+    #[serde(with = "time::serde::rfc3339")]
+    created_at: OffsetDateTime,
+    #[serde(with = "time::serde::rfc3339")]
+    updated_at: OffsetDateTime,
+    pub primary_key: Option<String>,
+}
+
+#[derive(Debug, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct Metadata {
+    indexes: Vec<Index>,
+    db_version: String,
+    dump_version: crate::Version,
+}
diff --git a/dump/src/reader/v6.rs b/dump/src/reader/v6.rs
new file mode 100644
index 000000000..84cefe350
--- /dev/null
+++ b/dump/src/reader/v6.rs
@@ -0,0 +1,16 @@
+use std::{
+    fs::{self},
+    path::Path,
+};
+
+use time::OffsetDateTime;
+
+use crate::Result;
+
+type Metadata = crate::Metadata;
+
+pub fn date(dump: &Path) -> Result<OffsetDateTime> {
+    let metadata = fs::read(dump.join("metadata.json"))?;
+    let metadata: Metadata = serde_json::from_slice(&metadata)?;
+    Ok(metadata.dump_date)
+}