diff --git a/meilitool/src/main.rs b/meilitool/src/main.rs
index 06c4890a5..0f2b34f20 100644
--- a/meilitool/src/main.rs
+++ b/meilitool/src/main.rs
@@ -2,7 +2,7 @@ use std::fs::{read_dir, read_to_string, remove_file, File};
 use std::io::BufWriter;
 use std::path::PathBuf;
 
-use anyhow::Context;
+use anyhow::{bail, Context};
 use clap::{Parser, Subcommand};
 use dump::{DumpWriter, IndexMetadata};
 use file_store::FileStore;
@@ -10,9 +10,10 @@ use meilisearch_auth::AuthController;
 use meilisearch_types::heed::types::{SerdeJson, Str};
 use meilisearch_types::heed::{Database, Env, EnvOpenOptions, RoTxn, RwTxn, Unspecified};
 use meilisearch_types::milli::documents::{obkv_to_object, DocumentsBatchReader};
+use meilisearch_types::milli::index::{db_name, main_key};
 use meilisearch_types::milli::{obkv_to_json, BEU32};
 use meilisearch_types::tasks::{Status, Task};
-use meilisearch_types::versioning::check_version_file;
+use meilisearch_types::versioning::{create_version_file, get_version, parse_version};
 use meilisearch_types::Index;
 use time::macros::format_description;
 use time::OffsetDateTime;
@@ -62,21 +63,404 @@ enum Command {
         #[arg(long)]
         skip_enqueued_tasks: bool,
     },
+
+    /// Attempts to upgrade from one major version to the next without a dump.
+    ///
+    /// Make sure to run this command when Meilisearch is not running!
+    /// If Meilisearch is running while executing this command, the database could be corrupted
+    /// (contain data from both the old and the new versions).
+    ///
+    /// Supported upgrade paths:
+    ///
+    /// - v1.9.0 -> v1.10.0
+    OfflineUpgrade {
+        #[arg(long)]
+        target_version: String,
+    },
 }
 
 fn main() -> anyhow::Result<()> {
     let Cli { db_path, command } = Cli::parse();
 
-    check_version_file(&db_path).context("While checking the version file")?;
+    let detected_version = get_version(&db_path).context("While checking the version file")?;
 
     match command {
         Command::ClearTaskQueue => clear_task_queue(db_path),
         Command::ExportADump { dump_dir, skip_enqueued_tasks } => {
             export_a_dump(db_path, dump_dir, skip_enqueued_tasks)
         }
+        Command::OfflineUpgrade { target_version } => {
+            let target_version = parse_version(&target_version).context("While parsing `--target-version`. Make sure `--target-version` is in the format MAJOR.MINOR.PATCH")?;
+            OfflineUpgrade { db_path, current_version: detected_version, target_version }.upgrade()
+        }
     }
 }
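For reference, `parse_version` is provided by `meilisearch_types::versioning` and its body is not part of this diff. A minimal sketch of its assumed behavior, splitting `MAJOR.MINOR.PATCH` on dots into a string triple (the `_sketch` name and the error message are illustrative, not the real implementation):

```rust
use anyhow::bail;

/// Illustrative stand-in for `meilisearch_types::versioning::parse_version`.
fn parse_version_sketch(version: &str) -> anyhow::Result<(String, String, String)> {
    let mut parts = version.split('.').map(str::to_string);
    match (parts.next(), parts.next(), parts.next(), parts.next()) {
        (Some(major), Some(minor), Some(patch), None) => Ok((major, minor, patch)),
        _ => bail!("expected a version in the format MAJOR.MINOR.PATCH, got `{version}`"),
    }
}
```

Assuming clap's default kebab-case renaming of variant names, the new subcommand would be invoked as `meilitool offline-upgrade --target-version 1.10.0`.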
+
+struct OfflineUpgrade {
+    db_path: PathBuf,
+    current_version: (String, String, String),
+    target_version: (String, String, String),
+}
+
+impl OfflineUpgrade {
+    fn upgrade(self) -> anyhow::Result<()> {
+        // TODO: if we make this process support more versions, introduce a more flexible way of checking for the version
+        // currently only supports v1.9 to v1.10
+        let (current_major, current_minor, current_patch) = &self.current_version;
+
+        match (current_major.as_str(), current_minor.as_str(), current_patch.as_str()) {
+            ("1", "9", _) => {}
+            _ => {
+                bail!("Unsupported current version {current_major}.{current_minor}.{current_patch}. Can only upgrade from v1.9")
+            }
+        }
+
+        let (target_major, target_minor, target_patch) = &self.target_version;
+
+        match (target_major.as_str(), target_minor.as_str(), target_patch.as_str()) {
+            ("1", "10", _) => {}
+            _ => {
+                bail!("Unsupported target version {target_major}.{target_minor}.{target_patch}. Can only upgrade to v1.10")
+            }
+        }
+
+        println!("Upgrading from {current_major}.{current_minor}.{current_patch} to {target_major}.{target_minor}.{target_patch}");
+
+        self.v1_9_to_v1_10()?;
+
+        println!("Writing VERSION file");
+
+        create_version_file(&self.db_path, target_major, target_minor, target_patch)
+            .context("while writing VERSION file after the upgrade")?;
+
+        println!("Success");
+
+        Ok(())
+    }
+
+    fn v1_9_to_v1_10(&self) -> anyhow::Result<()> {
+        // 2 changes here:
+        //
+        // 1. date format. needs to be done before opening the Index
+        // 2. REST embedders. We don't support this case right now, so bail
+
+        let index_scheduler_path = self.db_path.join("tasks");
+        let env = unsafe { EnvOpenOptions::new().max_dbs(100).open(&index_scheduler_path) }
+            .with_context(|| {
+                format!("While trying to open {:?}", index_scheduler_path.display())
+            })?;
+
+        let mut sched_wtxn = env.write_txn()?;
+
+        let index_mapping: Database<Str, UuidCodec> =
+            try_opening_database(&env, &sched_wtxn, "index-mapping")?;
+
+        let index_stats: Database<UuidCodec, Unspecified> =
+            try_opening_database(&env, &sched_wtxn, "index-stats").with_context(|| {
+                format!("While trying to open {:?}", index_scheduler_path.display())
+            })?;
+
+        let index_count =
+            index_mapping.len(&sched_wtxn).context("while reading the number of indexes")?;
+
+        // FIXME: not ideal, we have to pre-populate all indexes to prevent double borrow of sched_wtxn
+        // 1. immutably for the iteration
+        // 2. mutably for updating index stats
+        let indexes: Vec<_> = index_mapping
+            .iter(&sched_wtxn)?
+            .map(|res| res.map(|(uid, uuid)| (uid.to_owned(), uuid)))
+            .collect();
+
+        for (index_index, result) in indexes.into_iter().enumerate() {
+            let (uid, uuid) = result?;
+            let index_path = self.db_path.join("indexes").join(uuid.to_string());
+
+            println!(
+                "[{index_index}/{index_count}] Updating index {uid} at '{}'",
+                index_path.display()
+            );
+
+            let index_env = unsafe {
+                // FIXME: fetch the 25 magic number from the index file
+                EnvOpenOptions::new().max_dbs(25).open(&index_path).with_context(|| {
+                    format!("while opening index {uid} at '{}'", index_path.display())
+                })?
+            };
+
+            let mut index_wtxn = index_env.write_txn().with_context(|| {
+                format!(
+                    "while obtaining a write transaction for index {uid} at {}",
+                    index_path.display()
+                )
+            })?;
+
+            println!("\tUpdating index stats");
+            update_index_stats(index_stats, &uid, uuid, &mut sched_wtxn)?;
+
+            println!("\tUpdating date format");
+            update_date_format(&uid, &index_env, &mut index_wtxn)?;
+
+            println!("\tChecking for incompatible embedders (REST embedders)");
+            check_rest_embedder(&uid, &index_env, &index_wtxn)?;
+
+            index_wtxn.commit().with_context(|| {
+                format!(
+                    "while committing the write txn for index {uid} at {}",
+                    index_path.display()
+                )
+            })?;
+        }
+
+        sched_wtxn.commit().context("while committing the write txn for the index-scheduler")?;
+
+        println!("Upgrading database succeeded");
+
+        Ok(())
+    }
+}
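The `FIXME` about pre-populating the indexes into a `Vec` is a borrow-checker constraint rather than a design choice: `index_mapping.iter(&sched_wtxn)` borrows the transaction immutably for as long as the iterator lives, while `update_index_stats` needs it mutably. A self-contained sketch of the same conflict, with a plain `HashMap` standing in for the heed databases:

```rust
use std::collections::HashMap;

fn rewrite_all(stats: &mut HashMap<String, u64>) {
    // This version does not compile: the loop holds an immutable borrow of
    // `stats` while the body tries to take a mutable one.
    //
    // for (key, _) in stats.iter() {
    //     *stats.get_mut(key).unwrap() += 1;
    // }

    // Buffering the keys first ends the immutable borrow before mutating:
    let keys: Vec<String> = stats.keys().cloned().collect();
    for key in keys {
        *stats.get_mut(&key).unwrap() += 1;
    }
}
```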
+
+pub mod v1_9 {
+    pub type FieldDistribution = std::collections::BTreeMap<String, u64>;
+
+    /// The statistics that can be computed from an `Index` object.
+    #[derive(serde::Serialize, serde::Deserialize, Debug)]
+    pub struct IndexStats {
+        /// Number of documents in the index.
+        pub number_of_documents: u64,
+        /// Size taken up by the index' DB, in bytes.
+        ///
+        /// This includes the size taken by both the used and free pages of the DB, and as the free pages
+        /// are not returned to the disk after a deletion, this number is typically larger than
+        /// `used_database_size` that only includes the size of the used pages.
+        pub database_size: u64,
+        /// Size taken by the used pages of the index' DB, in bytes.
+        ///
+        /// As the DB backend does not return to the disk the pages that are not currently used by the DB,
+        /// this value is typically smaller than `database_size`.
+        pub used_database_size: u64,
+        /// Association of every field name with the number of times it occurs in the documents.
+        pub field_distribution: FieldDistribution,
+        /// Creation date of the index.
+        pub created_at: time::OffsetDateTime,
+        /// Date of the last update of the index.
+        pub updated_at: time::OffsetDateTime,
+    }
+
+    use serde::{Deserialize, Serialize};
+
+    #[derive(Debug, Deserialize, Serialize)]
+    pub struct IndexEmbeddingConfig {
+        pub name: String,
+        pub config: EmbeddingConfig,
+    }
+
+    #[derive(Debug, Clone, Default, serde::Deserialize, serde::Serialize)]
+    pub struct EmbeddingConfig {
+        /// Options of the embedder, specific to each kind of embedder
+        pub embedder_options: EmbedderOptions,
+    }
+
+    /// Options of an embedder, specific to each kind of embedder.
+    #[derive(Debug, Clone, Hash, PartialEq, Eq, serde::Deserialize, serde::Serialize)]
+    pub enum EmbedderOptions {
+        HuggingFace(hf::EmbedderOptions),
+        OpenAi(openai::EmbedderOptions),
+        Ollama(ollama::EmbedderOptions),
+        UserProvided(manual::EmbedderOptions),
+        Rest(rest::EmbedderOptions),
+    }
+
+    impl Default for EmbedderOptions {
+        fn default() -> Self {
+            Self::OpenAi(openai::EmbedderOptions { api_key: None, dimensions: None })
+        }
+    }
+
+    mod hf {
+        #[derive(Debug, Clone, Hash, PartialEq, Eq, serde::Deserialize, serde::Serialize)]
+        pub struct EmbedderOptions {
+            pub model: String,
+            pub revision: Option<String>,
+        }
+    }
+    mod openai {
+        #[derive(Debug, Clone, Hash, PartialEq, Eq, serde::Deserialize, serde::Serialize)]
+        pub struct EmbedderOptions {
+            pub api_key: Option<String>,
+            pub dimensions: Option<usize>,
+        }
+    }
+    mod ollama {
+        #[derive(Debug, Clone, Hash, PartialEq, Eq, serde::Deserialize, serde::Serialize)]
+        pub struct EmbedderOptions {
+            pub embedding_model: String,
+            pub url: Option<String>,
+            pub api_key: Option<String>,
+        }
+    }
+    mod manual {
+        #[derive(Debug, Clone, Hash, PartialEq, Eq, serde::Deserialize, serde::Serialize)]
+        pub struct EmbedderOptions {
+            pub dimensions: usize,
+        }
+    }
+    mod rest {
+        #[derive(Debug, Clone, PartialEq, Eq, serde::Deserialize, serde::Serialize, Hash)]
+        pub struct EmbedderOptions {
+            pub api_key: Option<String>,
+            pub dimensions: Option<usize>,
+            pub url: String,
+            pub input_field: Vec<String>,
+            // path to the array of embeddings
+            pub path_to_embeddings: Vec<String>,
+            // shape of a single embedding
+            pub embedding_object: Vec<String>,
+        }
+    }
+
+    pub type OffsetDateTime = time::OffsetDateTime;
+}
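For context on why the dates need rewriting at all: the v1.9 types above serialize `time::OffsetDateTime` with `time`'s default serde representation, whereas the v1.10 types below pin RFC 3339. Assuming `time` 0.3 with its human-readable serde feature enabled (the exact legacy string shown is an assumption worth verifying against the `time` version actually in use), the on-disk difference looks roughly like this:

```rust
#[cfg(test)]
mod date_format_sketch {
    use time::macros::datetime;

    // Requires `serde_json` plus `time` with the `serde-human-readable` and
    // `macros` features; the literal strings in the comments are assumptions.
    #[test]
    fn default_serde_is_not_rfc3339() {
        let dt = datetime!(2022-02-03 12:00 UTC);
        // v1.9 layout: `time`'s default format, e.g. "2022-02-03 12:00:00.0 +00:00:00"
        let legacy = serde_json::to_string(&dt).unwrap();
        // v1.10 layout: RFC 3339 via the transparent wrapper, e.g. "2022-02-03T12:00:00Z"
        let rfc3339 = serde_json::to_string(&crate::v1_10::OffsetDateTime(dt)).unwrap();
        assert_ne!(legacy, rfc3339);
    }
}
```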
+
+pub mod v1_10 {
+    use crate::v1_9;
+
+    pub type FieldDistribution = std::collections::BTreeMap<String, u64>;
+
+    /// The statistics that can be computed from an `Index` object.
+    #[derive(serde::Serialize, serde::Deserialize, Debug)]
+    pub struct IndexStats {
+        /// Number of documents in the index.
+        pub number_of_documents: u64,
+        /// Size taken up by the index' DB, in bytes.
+        ///
+        /// This includes the size taken by both the used and free pages of the DB, and as the free pages
+        /// are not returned to the disk after a deletion, this number is typically larger than
+        /// `used_database_size` that only includes the size of the used pages.
+        pub database_size: u64,
+        /// Size taken by the used pages of the index' DB, in bytes.
+        ///
+        /// As the DB backend does not return to the disk the pages that are not currently used by the DB,
+        /// this value is typically smaller than `database_size`.
+        pub used_database_size: u64,
+        /// Association of every field name with the number of times it occurs in the documents.
+        pub field_distribution: FieldDistribution,
+        /// Creation date of the index.
+        #[serde(with = "time::serde::rfc3339")]
+        pub created_at: time::OffsetDateTime,
+        /// Date of the last update of the index.
+        #[serde(with = "time::serde::rfc3339")]
+        pub updated_at: time::OffsetDateTime,
+    }
+
+    impl From<v1_9::IndexStats> for IndexStats {
+        fn from(
+            v1_9::IndexStats {
+                number_of_documents,
+                database_size,
+                used_database_size,
+                field_distribution,
+                created_at,
+                updated_at,
+            }: v1_9::IndexStats,
+        ) -> Self {
+            IndexStats {
+                number_of_documents,
+                database_size,
+                used_database_size,
+                field_distribution,
+                created_at,
+                updated_at,
+            }
+        }
+    }
+
+    #[derive(serde::Serialize, serde::Deserialize)]
+    #[serde(transparent)]
+    pub struct OffsetDateTime(#[serde(with = "time::serde::rfc3339")] pub time::OffsetDateTime);
+}
+
+fn update_index_stats(
+    index_stats: Database<UuidCodec, Unspecified>,
+    index_uid: &str,
+    index_uuid: uuid::Uuid,
+    sched_wtxn: &mut RwTxn,
+) -> anyhow::Result<()> {
+    let ctx = || format!("while updating index stats for index {index_uid}");
+
+    let stats: Option<v1_9::IndexStats> = index_stats
+        .remap_data_type::<SerdeJson<v1_9::IndexStats>>()
+        .get(sched_wtxn, &index_uuid)
+        .with_context(ctx)?;
+
+    if let Some(stats) = stats {
+        let stats: v1_10::IndexStats = stats.into();
+
+        index_stats
+            .remap_data_type::<SerdeJson<v1_10::IndexStats>>()
+            .put(sched_wtxn, &index_uuid, &stats)
+            .with_context(ctx)?;
+    }
+
+    Ok(())
+}
+
+fn update_date_format(
+    index_uid: &str,
+    index_env: &Env,
+    index_wtxn: &mut RwTxn,
+) -> anyhow::Result<()> {
+    let main = try_opening_poly_database(index_env, index_wtxn, db_name::MAIN)
+        .with_context(|| format!("while updating date format for index {index_uid}"))?;
+
+    date_round_trip(index_wtxn, index_uid, main, main_key::CREATED_AT_KEY)?;
+    date_round_trip(index_wtxn, index_uid, main, main_key::UPDATED_AT_KEY)?;
+
+    Ok(())
+}
+
+fn check_rest_embedder(index_uid: &str, index_env: &Env, index_txn: &RoTxn) -> anyhow::Result<()> {
+    let main = try_opening_poly_database(index_env, index_txn, db_name::MAIN)
+        .with_context(|| format!("while checking REST embedders for index {index_uid}"))?;
+
+    for config in main
+        .remap_types::<Str, SerdeJson<Vec<v1_9::IndexEmbeddingConfig>>>()
+        .get(index_txn, main_key::EMBEDDING_CONFIGS)?
+        .unwrap_or_default()
+    {
+        if let v1_9::EmbedderOptions::Rest(_) = config.config.embedder_options {
+            bail!(
+                "index {index_uid} has a REST embedder: {}. \
+                REST embedders are unsupported for upgrade. \
+                Remove the embedder and retry.",
+                config.name
+            )
+        }
+    }
+
+    Ok(())
+}
+
+fn date_round_trip(
+    wtxn: &mut RwTxn,
+    index_uid: &str,
+    db: Database<Unspecified, Unspecified>,
+    key: &str,
+) -> anyhow::Result<()> {
+    let datetime =
+        db.remap_types::<Str, SerdeJson<v1_9::OffsetDateTime>>().get(wtxn, key).with_context(
+            || format!("could not read `{key}` while updating date format for index {index_uid}"),
+        )?;
+
+    if let Some(datetime) = datetime {
+        db.remap_types::<Str, SerdeJson<v1_10::OffsetDateTime>>()
+            .put(wtxn, key, &v1_10::OffsetDateTime(datetime))
+            .with_context(|| {
+                format!("could not write `{key}` while updating date format for index {index_uid}")
+            })?;
+    }
+
+    Ok(())
+}
 
 /// Clears the task queue located at `db_path`.
 fn clear_task_queue(db_path: PathBuf) -> anyhow::Result<()> {
     let path = db_path.join("tasks");