diff --git a/Cargo.lock b/Cargo.lock index 500f28454..cef8e9c8a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -404,6 +404,25 @@ dependencies = [ "thiserror", ] +[[package]] +name = "arroy" +version = "0.5.0" +source = "git+https://github.com/meilisearch/arroy/?tag=DO-NOT-DELETE-upgrade-v04-to-v05#053807bf38dc079f25b003f19fc30fbf3613f6e7" +dependencies = [ + "bytemuck", + "byteorder", + "heed", + "log", + "memmap2", + "nohash", + "ordered-float", + "rand", + "rayon", + "roaring", + "tempfile", + "thiserror", +] + [[package]] name = "assert-json-diff" version = "2.0.2" @@ -707,9 +726,9 @@ checksum = "2c676a478f63e9fa2dd5368a42f28bba0d6c560b775f38583c8bbaa7fcd67c9c" [[package]] name = "bytemuck" -version = "1.16.1" +version = "1.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b236fc92302c97ed75b38da1f4917b5cdda4984745740f153a5d3059e48d725e" +checksum = "8334215b81e418a0a7bdb8ef0849474f40bb10c8b71f1c4ed315cff49f32494d" dependencies = [ "bytemuck_derive", ] @@ -2556,7 +2575,7 @@ name = "index-scheduler" version = "1.11.0" dependencies = [ "anyhow", - "arroy", + "arroy 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)", "big_s", "bincode", "crossbeam", @@ -3517,6 +3536,7 @@ name = "meilitool" version = "1.11.0" dependencies = [ "anyhow", + "arroy 0.5.0 (git+https://github.com/meilisearch/arroy/?tag=DO-NOT-DELETE-upgrade-v04-to-v05)", "clap", "dump", "file-store", @@ -3547,7 +3567,7 @@ dependencies = [ name = "milli" version = "1.11.0" dependencies = [ - "arroy", + "arroy 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)", "big_s", "bimap", "bincode", diff --git a/crates/meilisearch/src/routes/indexes/search_analytics.rs b/crates/meilisearch/src/routes/indexes/search_analytics.rs index 8bbb1781f..b16e2636e 100644 --- a/crates/meilisearch/src/routes/indexes/search_analytics.rs +++ b/crates/meilisearch/src/routes/indexes/search_analytics.rs @@ -1,18 +1,16 @@ -use once_cell::sync::Lazy; -use regex::Regex; -use serde_json::{json, Value}; use std::collections::{BTreeSet, BinaryHeap, HashMap}; use meilisearch_types::locales::Locale; +use once_cell::sync::Lazy; +use regex::Regex; +use serde_json::{json, Value}; -use crate::{ - aggregate_methods, - analytics::{Aggregate, AggregateMethod}, - search::{ - SearchQuery, SearchResult, DEFAULT_CROP_LENGTH, DEFAULT_CROP_MARKER, - DEFAULT_HIGHLIGHT_POST_TAG, DEFAULT_HIGHLIGHT_PRE_TAG, DEFAULT_SEARCH_LIMIT, - DEFAULT_SEMANTIC_RATIO, - }, +use crate::aggregate_methods; +use crate::analytics::{Aggregate, AggregateMethod}; +use crate::search::{ + SearchQuery, SearchResult, DEFAULT_CROP_LENGTH, DEFAULT_CROP_MARKER, + DEFAULT_HIGHLIGHT_POST_TAG, DEFAULT_HIGHLIGHT_PRE_TAG, DEFAULT_SEARCH_LIMIT, + DEFAULT_SEMANTIC_RATIO, }; aggregate_methods!( diff --git a/crates/meilisearch/src/routes/indexes/settings.rs b/crates/meilisearch/src/routes/indexes/settings.rs index bca763a99..a9d8d3053 100644 --- a/crates/meilisearch/src/routes/indexes/settings.rs +++ b/crates/meilisearch/src/routes/indexes/settings.rs @@ -1,4 +1,3 @@ -use super::settings_analytics::*; use actix_web::web::Data; use actix_web::{web, HttpRequest, HttpResponse}; use deserr::actix_web::AwebJson; @@ -11,6 +10,7 @@ use meilisearch_types::settings::{settings, SecretPolicy, Settings, Unchecked}; use meilisearch_types::tasks::KindWithContent; use tracing::debug; +use super::settings_analytics::*; use crate::analytics::Analytics; use crate::extractors::authentication::policies::*; use crate::extractors::authentication::GuardedData; diff --git a/crates/meilisearch/src/routes/indexes/settings_analytics.rs b/crates/meilisearch/src/routes/indexes/settings_analytics.rs index de01b72e8..32bddcbdd 100644 --- a/crates/meilisearch/src/routes/indexes/settings_analytics.rs +++ b/crates/meilisearch/src/routes/indexes/settings_analytics.rs @@ -3,15 +3,16 @@ //! through the sub-settings route directly without any manipulation. //! This is why we often use a `Option<&Vec<_>>` instead of a `Option<&[_]>`. +use std::collections::{BTreeMap, BTreeSet, HashSet}; + +use meilisearch_types::facet_values_sort::FacetValuesSort; use meilisearch_types::locales::{Locale, LocalizedAttributesRuleView}; use meilisearch_types::milli::update::Setting; use meilisearch_types::milli::vector::settings::EmbeddingSettings; use meilisearch_types::settings::{ - FacetingSettings, PaginationSettings, ProximityPrecisionView, TypoSettings, + FacetingSettings, PaginationSettings, ProximityPrecisionView, RankingRuleView, TypoSettings, }; -use meilisearch_types::{facet_values_sort::FacetValuesSort, settings::RankingRuleView}; use serde::Serialize; -use std::collections::{BTreeMap, BTreeSet, HashSet}; use crate::analytics::Aggregate; diff --git a/crates/meilisearch/src/routes/indexes/similar_analytics.rs b/crates/meilisearch/src/routes/indexes/similar_analytics.rs index 69685a56c..726839c3a 100644 --- a/crates/meilisearch/src/routes/indexes/similar_analytics.rs +++ b/crates/meilisearch/src/routes/indexes/similar_analytics.rs @@ -4,11 +4,9 @@ use once_cell::sync::Lazy; use regex::Regex; use serde_json::{json, Value}; -use crate::{ - aggregate_methods, - analytics::{Aggregate, AggregateMethod}, - search::{SimilarQuery, SimilarResult}, -}; +use crate::aggregate_methods; +use crate::analytics::{Aggregate, AggregateMethod}; +use crate::search::{SimilarQuery, SimilarResult}; aggregate_methods!( SimilarPOST => "Similar POST", diff --git a/crates/meilisearch/src/routes/multi_search.rs b/crates/meilisearch/src/routes/multi_search.rs index b7bd31716..f8b1bc6ee 100644 --- a/crates/meilisearch/src/routes/multi_search.rs +++ b/crates/meilisearch/src/routes/multi_search.rs @@ -9,6 +9,7 @@ use meilisearch_types::keys::actions; use serde::Serialize; use tracing::debug; +use super::multi_search_analytics::MultiSearchAggregator; use crate::analytics::Analytics; use crate::error::MeilisearchHttpError; use crate::extractors::authentication::policies::ActionPolicy; @@ -21,8 +22,6 @@ use crate::search::{ }; use crate::search_queue::SearchQueue; -use super::multi_search_analytics::MultiSearchAggregator; - pub fn configure(cfg: &mut web::ServiceConfig) { cfg.service(web::resource("").route(web::post().to(SeqHandler(multi_search_with_post)))); } diff --git a/crates/meilisearch/src/routes/multi_search_analytics.rs b/crates/meilisearch/src/routes/multi_search_analytics.rs index be1218399..3d07f471c 100644 --- a/crates/meilisearch/src/routes/multi_search_analytics.rs +++ b/crates/meilisearch/src/routes/multi_search_analytics.rs @@ -2,10 +2,8 @@ use std::collections::HashSet; use serde_json::json; -use crate::{ - analytics::Aggregate, - search::{FederatedSearch, SearchQueryWithIndex}, -}; +use crate::analytics::Aggregate; +use crate::search::{FederatedSearch, SearchQueryWithIndex}; #[derive(Default)] pub struct MultiSearchAggregator { diff --git a/crates/meilisearch/tests/common/index.rs b/crates/meilisearch/tests/common/index.rs index 784067c2d..221333fd7 100644 --- a/crates/meilisearch/tests/common/index.rs +++ b/crates/meilisearch/tests/common/index.rs @@ -9,8 +9,7 @@ use urlencoding::encode as urlencode; use super::encoder::Encoder; use super::service::Service; -use super::Value; -use super::{Owned, Shared}; +use super::{Owned, Shared, Value}; use crate::json; pub struct Index<'a, State = Owned> { diff --git a/crates/meilitool/Cargo.toml b/crates/meilitool/Cargo.toml index ce6c1ad5b..048da6232 100644 --- a/crates/meilitool/Cargo.toml +++ b/crates/meilitool/Cargo.toml @@ -16,5 +16,6 @@ file-store = { path = "../file-store" } meilisearch-auth = { path = "../meilisearch-auth" } meilisearch-types = { path = "../meilisearch-types" } serde = { version = "1.0.209", features = ["derive"] } -time = { version = "0.3.36", features = ["formatting"] } +time = { version = "0.3.36", features = ["formatting", "parsing", "alloc"] } uuid = { version = "1.10.0", features = ["v4"], default-features = false } +arroy_v04_to_v05 = { package = "arroy", git = "https://github.com/meilisearch/arroy/", tag = "DO-NOT-DELETE-upgrade-v04-to-v05" } diff --git a/crates/meilitool/src/main.rs b/crates/meilitool/src/main.rs index 9dbff2486..978824356 100644 --- a/crates/meilitool/src/main.rs +++ b/crates/meilitool/src/main.rs @@ -2,7 +2,7 @@ use std::fs::{read_dir, read_to_string, remove_file, File}; use std::io::BufWriter; use std::path::PathBuf; -use anyhow::{bail, Context}; +use anyhow::Context; use clap::{Parser, Subcommand}; use dump::{DumpWriter, IndexMetadata}; use file_store::FileStore; @@ -10,15 +10,16 @@ use meilisearch_auth::AuthController; use meilisearch_types::heed::types::{SerdeJson, Str}; use meilisearch_types::heed::{Database, Env, EnvOpenOptions, RoTxn, RwTxn, Unspecified}; use meilisearch_types::milli::documents::{obkv_to_object, DocumentsBatchReader}; -use meilisearch_types::milli::index::{db_name, main_key}; use meilisearch_types::milli::{obkv_to_json, BEU32}; use meilisearch_types::tasks::{Status, Task}; -use meilisearch_types::versioning::{create_version_file, get_version, parse_version}; +use meilisearch_types::versioning::{get_version, parse_version}; use meilisearch_types::Index; use time::macros::format_description; use time::OffsetDateTime; +use upgrade::OfflineUpgrade; use uuid_codec::UuidCodec; +mod upgrade; mod uuid_codec; #[derive(Parser)] @@ -72,7 +73,7 @@ enum Command { /// /// Supported upgrade paths: /// - /// - v1.9.0 -> v1.10.0 + /// - v1.9.x -> v1.10.x -> v1.11.x OfflineUpgrade { #[arg(long)] target_version: String, @@ -96,425 +97,6 @@ fn main() -> anyhow::Result<()> { } } -struct OfflineUpgrade { - db_path: PathBuf, - current_version: (String, String, String), - target_version: (String, String, String), -} - -impl OfflineUpgrade { - fn upgrade(self) -> anyhow::Result<()> { - // TODO: if we make this process support more versions, introduce a more flexible way of checking for the version - // currently only supports v1.9 to v1.10 - let (current_major, current_minor, current_patch) = &self.current_version; - - match (current_major.as_str(), current_minor.as_str(), current_patch.as_str()) { - ("1", "9", _) => {} - _ => { - bail!("Unsupported current version {current_major}.{current_minor}.{current_patch}. Can only upgrade from v1.9") - } - } - - let (target_major, target_minor, target_patch) = &self.target_version; - - match (target_major.as_str(), target_minor.as_str(), target_patch.as_str()) { - ("1", "10", _) => {} - _ => { - bail!("Unsupported target version {target_major}.{target_minor}.{target_patch}. Can only upgrade to v1.10") - } - } - - println!("Upgrading from {current_major}.{current_minor}.{current_patch} to {target_major}.{target_minor}.{target_patch}"); - - self.v1_9_to_v1_10()?; - - println!("Writing VERSION file"); - - create_version_file(&self.db_path, target_major, target_minor, target_patch) - .context("while writing VERSION file after the upgrade")?; - - println!("Success"); - - Ok(()) - } - - fn v1_9_to_v1_10(&self) -> anyhow::Result<()> { - // 2 changes here - - // 1. date format. needs to be done before opening the Index - // 2. REST embedders. We don't support this case right now, so bail - - let index_scheduler_path = self.db_path.join("tasks"); - let env = unsafe { EnvOpenOptions::new().max_dbs(100).open(&index_scheduler_path) } - .with_context(|| { - format!("While trying to open {:?}", index_scheduler_path.display()) - })?; - - let mut sched_wtxn = env.write_txn()?; - - let index_mapping: Database = - try_opening_database(&env, &sched_wtxn, "index-mapping")?; - - let index_stats: Database = - try_opening_database(&env, &sched_wtxn, "index-stats").with_context(|| { - format!("While trying to open {:?}", index_scheduler_path.display()) - })?; - - let index_count = - index_mapping.len(&sched_wtxn).context("while reading the number of indexes")?; - - // FIXME: not ideal, we have to pre-populate all indexes to prevent double borrow of sched_wtxn - // 1. immutably for the iteration - // 2. mutably for updating index stats - let indexes: Vec<_> = index_mapping - .iter(&sched_wtxn)? - .map(|res| res.map(|(uid, uuid)| (uid.to_owned(), uuid))) - .collect(); - - let mut rest_embedders = Vec::new(); - - let mut unwrapped_indexes = Vec::new(); - - // check that update can take place - for (index_index, result) in indexes.into_iter().enumerate() { - let (uid, uuid) = result?; - let index_path = self.db_path.join("indexes").join(uuid.to_string()); - - println!( - "[{}/{index_count}]Checking that update can take place for `{uid}` at `{}`", - index_index + 1, - index_path.display() - ); - - let index_env = unsafe { - // FIXME: fetch the 25 magic number from the index file - EnvOpenOptions::new().max_dbs(25).open(&index_path).with_context(|| { - format!("while opening index {uid} at '{}'", index_path.display()) - })? - }; - - let index_txn = index_env.read_txn().with_context(|| { - format!( - "while obtaining a write transaction for index {uid} at {}", - index_path.display() - ) - })?; - - println!("\t- Checking for incompatible embedders (REST embedders)"); - let rest_embedders_for_index = find_rest_embedders(&uid, &index_env, &index_txn)?; - - if rest_embedders_for_index.is_empty() { - unwrapped_indexes.push((uid, uuid)); - } else { - // no need to add to unwrapped indexes because we'll exit early - rest_embedders.push((uid, rest_embedders_for_index)); - } - } - - if !rest_embedders.is_empty() { - let rest_embedders = rest_embedders - .into_iter() - .flat_map(|(index, embedders)| std::iter::repeat(index.clone()).zip(embedders)) - .map(|(index, embedder)| format!("\t- embedder `{embedder}` in index `{index}`")) - .collect::>() - .join("\n"); - bail!("The update cannot take place because there are REST embedder(s). Remove them before proceeding with the update:\n{rest_embedders}\n\n\ - The database has not been modified and is still a valid v1.9 database."); - } - - println!("Update can take place, updating"); - - for (index_index, (uid, uuid)) in unwrapped_indexes.into_iter().enumerate() { - let index_path = self.db_path.join("indexes").join(uuid.to_string()); - - println!( - "[{}/{index_count}]Updating index `{uid}` at `{}`", - index_index + 1, - index_path.display() - ); - - let index_env = unsafe { - // FIXME: fetch the 25 magic number from the index file - EnvOpenOptions::new().max_dbs(25).open(&index_path).with_context(|| { - format!("while opening index {uid} at '{}'", index_path.display()) - })? - }; - - let mut index_wtxn = index_env.write_txn().with_context(|| { - format!( - "while obtaining a write transaction for index `{uid}` at `{}`", - index_path.display() - ) - })?; - - println!("\t- Updating index stats"); - update_index_stats(index_stats, &uid, uuid, &mut sched_wtxn)?; - println!("\t- Updating date format"); - update_date_format(&uid, &index_env, &mut index_wtxn)?; - - index_wtxn.commit().with_context(|| { - format!( - "while committing the write txn for index `{uid}` at {}", - index_path.display() - ) - })?; - } - - sched_wtxn.commit().context("while committing the write txn for the index-scheduler")?; - - println!("Upgrading database succeeded"); - - Ok(()) - } -} - -pub mod v1_9 { - pub type FieldDistribution = std::collections::BTreeMap; - - /// The statistics that can be computed from an `Index` object. - #[derive(serde::Serialize, serde::Deserialize, Debug)] - pub struct IndexStats { - /// Number of documents in the index. - pub number_of_documents: u64, - /// Size taken up by the index' DB, in bytes. - /// - /// This includes the size taken by both the used and free pages of the DB, and as the free pages - /// are not returned to the disk after a deletion, this number is typically larger than - /// `used_database_size` that only includes the size of the used pages. - pub database_size: u64, - /// Size taken by the used pages of the index' DB, in bytes. - /// - /// As the DB backend does not return to the disk the pages that are not currently used by the DB, - /// this value is typically smaller than `database_size`. - pub used_database_size: u64, - /// Association of every field name with the number of times it occurs in the documents. - pub field_distribution: FieldDistribution, - /// Creation date of the index. - pub created_at: time::OffsetDateTime, - /// Date of the last update of the index. - pub updated_at: time::OffsetDateTime, - } - - use serde::{Deserialize, Serialize}; - - #[derive(Debug, Deserialize, Serialize)] - pub struct IndexEmbeddingConfig { - pub name: String, - pub config: EmbeddingConfig, - } - - #[derive(Debug, Clone, Default, serde::Deserialize, serde::Serialize)] - pub struct EmbeddingConfig { - /// Options of the embedder, specific to each kind of embedder - pub embedder_options: EmbedderOptions, - } - - /// Options of an embedder, specific to each kind of embedder. - #[derive(Debug, Clone, Hash, PartialEq, Eq, serde::Deserialize, serde::Serialize)] - pub enum EmbedderOptions { - HuggingFace(hf::EmbedderOptions), - OpenAi(openai::EmbedderOptions), - Ollama(ollama::EmbedderOptions), - UserProvided(manual::EmbedderOptions), - Rest(rest::EmbedderOptions), - } - - impl Default for EmbedderOptions { - fn default() -> Self { - Self::OpenAi(openai::EmbedderOptions { api_key: None, dimensions: None }) - } - } - - mod hf { - #[derive(Debug, Clone, Hash, PartialEq, Eq, serde::Deserialize, serde::Serialize)] - pub struct EmbedderOptions { - pub model: String, - pub revision: Option, - } - } - mod openai { - - #[derive(Debug, Clone, Hash, PartialEq, Eq, serde::Deserialize, serde::Serialize)] - pub struct EmbedderOptions { - pub api_key: Option, - pub dimensions: Option, - } - } - mod ollama { - #[derive(Debug, Clone, Hash, PartialEq, Eq, serde::Deserialize, serde::Serialize)] - pub struct EmbedderOptions { - pub embedding_model: String, - pub url: Option, - pub api_key: Option, - } - } - mod manual { - #[derive(Debug, Clone, Hash, PartialEq, Eq, serde::Deserialize, serde::Serialize)] - pub struct EmbedderOptions { - pub dimensions: usize, - } - } - mod rest { - #[derive(Debug, Clone, PartialEq, Eq, serde::Deserialize, serde::Serialize, Hash)] - pub struct EmbedderOptions { - pub api_key: Option, - pub dimensions: Option, - pub url: String, - pub input_field: Vec, - // path to the array of embeddings - pub path_to_embeddings: Vec, - // shape of a single embedding - pub embedding_object: Vec, - } - } - - pub type OffsetDateTime = time::OffsetDateTime; -} - -pub mod v1_10 { - use crate::v1_9; - - pub type FieldDistribution = std::collections::BTreeMap; - - /// The statistics that can be computed from an `Index` object. - #[derive(serde::Serialize, serde::Deserialize, Debug)] - pub struct IndexStats { - /// Number of documents in the index. - pub number_of_documents: u64, - /// Size taken up by the index' DB, in bytes. - /// - /// This includes the size taken by both the used and free pages of the DB, and as the free pages - /// are not returned to the disk after a deletion, this number is typically larger than - /// `used_database_size` that only includes the size of the used pages. - pub database_size: u64, - /// Size taken by the used pages of the index' DB, in bytes. - /// - /// As the DB backend does not return to the disk the pages that are not currently used by the DB, - /// this value is typically smaller than `database_size`. - pub used_database_size: u64, - /// Association of every field name with the number of times it occurs in the documents. - pub field_distribution: FieldDistribution, - /// Creation date of the index. - #[serde(with = "time::serde::rfc3339")] - pub created_at: time::OffsetDateTime, - /// Date of the last update of the index. - #[serde(with = "time::serde::rfc3339")] - pub updated_at: time::OffsetDateTime, - } - - impl From for IndexStats { - fn from( - v1_9::IndexStats { - number_of_documents, - database_size, - used_database_size, - field_distribution, - created_at, - updated_at, - }: v1_9::IndexStats, - ) -> Self { - IndexStats { - number_of_documents, - database_size, - used_database_size, - field_distribution, - created_at, - updated_at, - } - } - } - - #[derive(serde::Serialize, serde::Deserialize)] - #[serde(transparent)] - pub struct OffsetDateTime(#[serde(with = "time::serde::rfc3339")] pub time::OffsetDateTime); -} - -fn update_index_stats( - index_stats: Database, - index_uid: &str, - index_uuid: uuid::Uuid, - sched_wtxn: &mut RwTxn, -) -> anyhow::Result<()> { - let ctx = || format!("while updating index stats for index `{index_uid}`"); - - let stats: Option = index_stats - .remap_data_type::>() - .get(sched_wtxn, &index_uuid) - .with_context(ctx)?; - - if let Some(stats) = stats { - let stats: v1_10::IndexStats = stats.into(); - - index_stats - .remap_data_type::>() - .put(sched_wtxn, &index_uuid, &stats) - .with_context(ctx)?; - } - - Ok(()) -} - -fn update_date_format( - index_uid: &str, - index_env: &Env, - index_wtxn: &mut RwTxn, -) -> anyhow::Result<()> { - let main = try_opening_poly_database(index_env, index_wtxn, db_name::MAIN) - .with_context(|| format!("while updating date format for index `{index_uid}`"))?; - - date_round_trip(index_wtxn, index_uid, main, main_key::CREATED_AT_KEY)?; - date_round_trip(index_wtxn, index_uid, main, main_key::UPDATED_AT_KEY)?; - - Ok(()) -} - -fn find_rest_embedders( - index_uid: &str, - index_env: &Env, - index_txn: &RoTxn, -) -> anyhow::Result> { - let main = try_opening_poly_database(index_env, index_txn, db_name::MAIN) - .with_context(|| format!("while checking REST embedders for index `{index_uid}`"))?; - - let mut rest_embedders = vec![]; - - for config in main - .remap_types::>>() - .get(index_txn, main_key::EMBEDDING_CONFIGS)? - .unwrap_or_default() - { - if let v1_9::EmbedderOptions::Rest(_) = config.config.embedder_options { - rest_embedders.push(config.name); - } - } - - Ok(rest_embedders) -} - -fn date_round_trip( - wtxn: &mut RwTxn, - index_uid: &str, - db: Database, - key: &str, -) -> anyhow::Result<()> { - let datetime = - db.remap_types::>().get(wtxn, key).with_context( - || format!("could not read `{key}` while updating date format for index `{index_uid}`"), - )?; - - if let Some(datetime) = datetime { - db.remap_types::>() - .put(wtxn, key, &v1_10::OffsetDateTime(datetime)) - .with_context(|| { - format!( - "could not write `{key}` while updating date format for index `{index_uid}`" - ) - })?; - } - - Ok(()) -} - /// Clears the task queue located at `db_path`. fn clear_task_queue(db_path: PathBuf) -> anyhow::Result<()> { let path = db_path.join("tasks"); diff --git a/crates/meilitool/src/upgrade/mod.rs b/crates/meilitool/src/upgrade/mod.rs new file mode 100644 index 000000000..36630c3b3 --- /dev/null +++ b/crates/meilitool/src/upgrade/mod.rs @@ -0,0 +1,73 @@ +mod v1_10; +mod v1_11; +mod v1_9; + +use std::path::{Path, PathBuf}; + +use anyhow::{bail, Context}; +use meilisearch_types::versioning::create_version_file; + +use v1_10::v1_9_to_v1_10; + +use crate::upgrade::v1_11::v1_10_to_v1_11; + +pub struct OfflineUpgrade { + pub db_path: PathBuf, + pub current_version: (String, String, String), + pub target_version: (String, String, String), +} + +impl OfflineUpgrade { + pub fn upgrade(self) -> anyhow::Result<()> { + let upgrade_list = [ + (v1_9_to_v1_10 as fn(&Path) -> Result<(), anyhow::Error>, "1", "10", "0"), + (v1_10_to_v1_11, "1", "11", "0"), + ]; + + let (current_major, current_minor, current_patch) = &self.current_version; + + let start_at = match ( + current_major.as_str(), + current_minor.as_str(), + current_patch.as_str(), + ) { + ("1", "9", _) => 0, + ("1", "10", _) => 1, + _ => { + bail!("Unsupported current version {current_major}.{current_minor}.{current_patch}. Can only upgrade from v1.9 and v1.10") + } + }; + + let (target_major, target_minor, target_patch) = &self.target_version; + + let ends_at = match (target_major.as_str(), target_minor.as_str(), target_patch.as_str()) { + ("1", "10", _) => 0, + ("1", "11", _) => 1, + (major, _, _) if major.starts_with('v') => { + bail!("Target version must not starts with a `v`. Instead of writing `v1.9.0` write `1.9.0` for example.") + } + _ => { + bail!("Unsupported target version {target_major}.{target_minor}.{target_patch}. Can only upgrade to v1.10 and v1.11") + } + }; + + println!("Starting the upgrade from {current_major}.{current_minor}.{current_patch} to {target_major}.{target_minor}.{target_patch}"); + + #[allow(clippy::needless_range_loop)] + for index in start_at..=ends_at { + let (func, major, minor, patch) = upgrade_list[index]; + (func)(&self.db_path)?; + println!("Done"); + // We're writing the version file just in case an issue arise _while_ upgrading. + // We don't want the DB to fail in an unknown state. + println!("Writing VERSION file"); + + create_version_file(&self.db_path, major, minor, patch) + .context("while writing VERSION file after the upgrade")?; + } + + println!("Success"); + + Ok(()) + } +} diff --git a/crates/meilitool/src/upgrade/v1_10.rs b/crates/meilitool/src/upgrade/v1_10.rs new file mode 100644 index 000000000..3dd7c72a2 --- /dev/null +++ b/crates/meilitool/src/upgrade/v1_10.rs @@ -0,0 +1,289 @@ +use anyhow::bail; +use std::path::Path; + +use anyhow::Context; +use meilisearch_types::{ + heed::{ + types::{SerdeJson, Str}, + Database, Env, EnvOpenOptions, RoTxn, RwTxn, Unspecified, + }, + milli::index::{db_name, main_key}, +}; + +use crate::{try_opening_database, try_opening_poly_database, uuid_codec::UuidCodec}; + +use super::v1_9; + +pub type FieldDistribution = std::collections::BTreeMap; + +/// The statistics that can be computed from an `Index` object. +#[derive(serde::Serialize, serde::Deserialize, Debug)] +pub struct IndexStats { + /// Number of documents in the index. + pub number_of_documents: u64, + /// Size taken up by the index' DB, in bytes. + /// + /// This includes the size taken by both the used and free pages of the DB, and as the free pages + /// are not returned to the disk after a deletion, this number is typically larger than + /// `used_database_size` that only includes the size of the used pages. + pub database_size: u64, + /// Size taken by the used pages of the index' DB, in bytes. + /// + /// As the DB backend does not return to the disk the pages that are not currently used by the DB, + /// this value is typically smaller than `database_size`. + pub used_database_size: u64, + /// Association of every field name with the number of times it occurs in the documents. + pub field_distribution: FieldDistribution, + /// Creation date of the index. + #[serde(with = "time::serde::rfc3339")] + pub created_at: time::OffsetDateTime, + /// Date of the last update of the index. + #[serde(with = "time::serde::rfc3339")] + pub updated_at: time::OffsetDateTime, +} + +impl From for IndexStats { + fn from( + v1_9::IndexStats { + number_of_documents, + database_size, + used_database_size, + field_distribution, + created_at, + updated_at, + }: v1_9::IndexStats, + ) -> Self { + IndexStats { + number_of_documents, + database_size, + used_database_size, + field_distribution, + created_at: created_at.0, + updated_at: updated_at.0, + } + } +} + +#[derive(serde::Serialize, serde::Deserialize)] +#[serde(transparent)] +pub struct OffsetDateTime(#[serde(with = "time::serde::rfc3339")] pub time::OffsetDateTime); + +fn update_index_stats( + index_stats: Database, + index_uid: &str, + index_uuid: uuid::Uuid, + sched_wtxn: &mut RwTxn, +) -> anyhow::Result<()> { + let ctx = || format!("while updating index stats for index `{index_uid}`"); + + let stats: Option<&str> = index_stats + .remap_data_type::() + .get(sched_wtxn, &index_uuid) + .with_context(ctx) + .with_context(|| "While reading value")?; + dbg!(stats); + + let stats: Option = index_stats + .remap_data_type::>() + .get(sched_wtxn, &index_uuid) + .with_context(ctx) + .with_context(|| "While reading value")?; + + if let Some(stats) = stats { + let stats: self::IndexStats = stats.into(); + + index_stats + .remap_data_type::>() + .put(sched_wtxn, &index_uuid, &stats) + .with_context(ctx) + .with_context(|| "While writing value")?; + } + + Ok(()) +} + +fn update_date_format( + index_uid: &str, + index_env: &Env, + index_wtxn: &mut RwTxn, +) -> anyhow::Result<()> { + let main = try_opening_poly_database(index_env, index_wtxn, db_name::MAIN) + .with_context(|| format!("while updating date format for index `{index_uid}`"))?; + + date_round_trip(index_wtxn, index_uid, main, main_key::CREATED_AT_KEY)?; + date_round_trip(index_wtxn, index_uid, main, main_key::UPDATED_AT_KEY)?; + + Ok(()) +} + +fn find_rest_embedders( + index_uid: &str, + index_env: &Env, + index_txn: &RoTxn, +) -> anyhow::Result> { + let main = try_opening_poly_database(index_env, index_txn, db_name::MAIN) + .with_context(|| format!("while checking REST embedders for index `{index_uid}`"))?; + + let mut rest_embedders = vec![]; + + for config in main + .remap_types::>>() + .get(index_txn, main_key::EMBEDDING_CONFIGS)? + .unwrap_or_default() + { + if let v1_9::EmbedderOptions::Rest(_) = config.config.embedder_options { + rest_embedders.push(config.name); + } + } + + Ok(rest_embedders) +} + +fn date_round_trip( + wtxn: &mut RwTxn, + index_uid: &str, + db: Database, + key: &str, +) -> anyhow::Result<()> { + let datetime = + db.remap_types::>().get(wtxn, key).with_context( + || format!("could not read `{key}` while updating date format for index `{index_uid}`"), + )?; + + if let Some(datetime) = datetime { + db.remap_types::>() + .put(wtxn, key, &self::OffsetDateTime(datetime.0)) + .with_context(|| { + format!( + "could not write `{key}` while updating date format for index `{index_uid}`" + ) + })?; + } + + Ok(()) +} + +pub fn v1_9_to_v1_10(db_path: &Path) -> anyhow::Result<()> { + println!("Upgrading from v1.9.0 to v1.10.0"); + // 2 changes here + + // 1. date format. needs to be done before opening the Index + // 2. REST embedders. We don't support this case right now, so bail + + let index_scheduler_path = db_path.join("tasks"); + let env = unsafe { EnvOpenOptions::new().max_dbs(100).open(&index_scheduler_path) } + .with_context(|| format!("While trying to open {:?}", index_scheduler_path.display()))?; + + let mut sched_wtxn = env.write_txn()?; + + let index_mapping: Database = + try_opening_database(&env, &sched_wtxn, "index-mapping")?; + + let index_stats: Database = + try_opening_database(&env, &sched_wtxn, "index-stats").with_context(|| { + format!("While trying to open {:?}", index_scheduler_path.display()) + })?; + + let index_count = + index_mapping.len(&sched_wtxn).context("while reading the number of indexes")?; + + // FIXME: not ideal, we have to pre-populate all indexes to prevent double borrow of sched_wtxn + // 1. immutably for the iteration + // 2. mutably for updating index stats + let indexes: Vec<_> = index_mapping + .iter(&sched_wtxn)? + .map(|res| res.map(|(uid, uuid)| (uid.to_owned(), uuid))) + .collect(); + + let mut rest_embedders = Vec::new(); + + let mut unwrapped_indexes = Vec::new(); + + // check that update can take place + for (index_index, result) in indexes.into_iter().enumerate() { + let (uid, uuid) = result?; + let index_path = db_path.join("indexes").join(uuid.to_string()); + + println!( + "[{}/{index_count}]Checking that update can take place for `{uid}` at `{}`", + index_index + 1, + index_path.display() + ); + + let index_env = unsafe { + // FIXME: fetch the 25 magic number from the index file + EnvOpenOptions::new().max_dbs(25).open(&index_path).with_context(|| { + format!("while opening index {uid} at '{}'", index_path.display()) + })? + }; + + let index_txn = index_env.read_txn().with_context(|| { + format!( + "while obtaining a write transaction for index {uid} at {}", + index_path.display() + ) + })?; + + println!("\t- Checking for incompatible embedders (REST embedders)"); + let rest_embedders_for_index = find_rest_embedders(&uid, &index_env, &index_txn)?; + + if rest_embedders_for_index.is_empty() { + unwrapped_indexes.push((uid, uuid)); + } else { + // no need to add to unwrapped indexes because we'll exit early + rest_embedders.push((uid, rest_embedders_for_index)); + } + } + + if !rest_embedders.is_empty() { + let rest_embedders = rest_embedders + .into_iter() + .flat_map(|(index, embedders)| std::iter::repeat(index.clone()).zip(embedders)) + .map(|(index, embedder)| format!("\t- embedder `{embedder}` in index `{index}`")) + .collect::>() + .join("\n"); + bail!("The update cannot take place because there are REST embedder(s). Remove them before proceeding with the update:\n{rest_embedders}\n\n\ + The database has not been modified and is still a valid v1.9 database."); + } + + println!("Update can take place, updating"); + + for (index_index, (uid, uuid)) in unwrapped_indexes.into_iter().enumerate() { + let index_path = db_path.join("indexes").join(uuid.to_string()); + + println!( + "[{}/{index_count}]Updating index `{uid}` at `{}`", + index_index + 1, + index_path.display() + ); + + let index_env = unsafe { + // FIXME: fetch the 25 magic number from the index file + EnvOpenOptions::new().max_dbs(25).open(&index_path).with_context(|| { + format!("while opening index {uid} at '{}'", index_path.display()) + })? + }; + + let mut index_wtxn = index_env.write_txn().with_context(|| { + format!( + "while obtaining a write transaction for index `{uid}` at `{}`", + index_path.display() + ) + })?; + + println!("\t- Updating index stats"); + update_index_stats(index_stats, &uid, uuid, &mut sched_wtxn)?; + println!("\t- Updating date format"); + update_date_format(&uid, &index_env, &mut index_wtxn)?; + + index_wtxn.commit().with_context(|| { + format!("while committing the write txn for index `{uid}` at {}", index_path.display()) + })?; + } + + sched_wtxn.commit().context("while committing the write txn for the index-scheduler")?; + + println!("Upgrading database succeeded"); + + Ok(()) +} diff --git a/crates/meilitool/src/upgrade/v1_11.rs b/crates/meilitool/src/upgrade/v1_11.rs new file mode 100644 index 000000000..0c84d3842 --- /dev/null +++ b/crates/meilitool/src/upgrade/v1_11.rs @@ -0,0 +1,85 @@ +//! The breaking changes that happened between the v1.10 and the v1.11 are: +//! - Arroy went from the v0.4.0 to the v0.5.0, see this release note to get the whole context: https://github.com/meilisearch/arroy/releases/tag/v0.5.0 +//! - The `angular` distance has been renamed to `cosine` => We only need to update the string in the metadata. +//! - Reorganize the `NodeId` to make the appending of vectors work => We'll have to update the keys of almost all items in the DB. +//! - Store the list of updated IDs directly in LMDB instead of a roaring bitmap => This shouldn't be an issue since we are never supposed to commit this roaring bitmap, but it's not forbidden by arroy so ensuring it works is probably better than anything. + +use std::path::Path; + +use anyhow::Context; +use meilisearch_types::{ + heed::{types::Str, Database, EnvOpenOptions}, + milli::index::db_name, +}; + +use crate::{try_opening_database, try_opening_poly_database, uuid_codec::UuidCodec}; + +pub fn v1_10_to_v1_11(db_path: &Path) -> anyhow::Result<()> { + println!("Upgrading from v1.10.0 to v1.11.0"); + + let index_scheduler_path = db_path.join("tasks"); + let env = unsafe { EnvOpenOptions::new().max_dbs(100).open(&index_scheduler_path) } + .with_context(|| format!("While trying to open {:?}", index_scheduler_path.display()))?; + + let sched_rtxn = env.read_txn()?; + + let index_mapping: Database = + try_opening_database(&env, &sched_rtxn, "index-mapping")?; + + let index_count = + index_mapping.len(&sched_rtxn).context("while reading the number of indexes")?; + + let indexes: Vec<_> = index_mapping + .iter(&sched_rtxn)? + .map(|res| res.map(|(uid, uuid)| (uid.to_owned(), uuid))) + .collect(); + + for (index_index, result) in indexes.into_iter().enumerate() { + let (uid, uuid) = result?; + let index_path = db_path.join("indexes").join(uuid.to_string()); + + println!( + "[{}/{index_count}]Updating embeddings for `{uid}` at `{}`", + index_index + 1, + index_path.display() + ); + + let index_env = unsafe { + EnvOpenOptions::new().max_dbs(25).open(&index_path).with_context(|| { + format!("while opening index {uid} at '{}'", index_path.display()) + })? + }; + + let index_rtxn = index_env.read_txn().with_context(|| { + format!( + "while obtaining a read transaction for index {uid} at {}", + index_path.display() + ) + })?; + let index_read_database = + try_opening_poly_database(&index_env, &index_rtxn, db_name::VECTOR_ARROY) + .with_context(|| format!("while updating date format for index `{uid}`"))?; + + let mut index_wtxn = index_env.write_txn().with_context(|| { + format!( + "while obtaining a write transaction for index {uid} at {}", + index_path.display() + ) + })?; + + let index_write_database = + try_opening_poly_database(&index_env, &index_wtxn, db_name::VECTOR_ARROY) + .with_context(|| format!("while updating date format for index `{uid}`"))?; + + arroy_v04_to_v05::ugrade_from_prev_version( + &index_rtxn, + index_read_database, + &mut index_wtxn, + index_write_database, + )?; + + index_wtxn.commit()?; + } + + Ok(()) +} diff --git a/crates/meilitool/src/upgrade/v1_9.rs b/crates/meilitool/src/upgrade/v1_9.rs new file mode 100644 index 000000000..96cbfe68c --- /dev/null +++ b/crates/meilitool/src/upgrade/v1_9.rs @@ -0,0 +1,158 @@ +use serde::{Deserialize, Serialize}; +use time::{Date, OffsetDateTime, Time, UtcOffset}; + +pub type FieldDistribution = std::collections::BTreeMap; + +/// The statistics that can be computed from an `Index` object. +#[derive(serde::Deserialize, Debug)] +pub struct IndexStats { + /// Number of documents in the index. + pub number_of_documents: u64, + /// Size taken up by the index' DB, in bytes. + /// + /// This includes the size taken by both the used and free pages of the DB, and as the free pages + /// are not returned to the disk after a deletion, this number is typically larger than + /// `used_database_size` that only includes the size of the used pages. + pub database_size: u64, + /// Size taken by the used pages of the index' DB, in bytes. + /// + /// As the DB backend does not return to the disk the pages that are not currently used by the DB, + /// this value is typically smaller than `database_size`. + pub used_database_size: u64, + /// Association of every field name with the number of times it occurs in the documents. + pub field_distribution: FieldDistribution, + /// Creation date of the index. + pub created_at: LegacyDateTime, + /// Date of the last update of the index. + pub updated_at: LegacyDateTime, +} + +#[derive(Debug, Deserialize, Serialize)] +pub struct IndexEmbeddingConfig { + pub name: String, + pub config: EmbeddingConfig, +} + +#[derive(Debug, Clone, Default, serde::Deserialize, serde::Serialize)] +pub struct EmbeddingConfig { + /// Options of the embedder, specific to each kind of embedder + pub embedder_options: EmbedderOptions, +} + +/// Options of an embedder, specific to each kind of embedder. +#[derive(Debug, Clone, Hash, PartialEq, Eq, serde::Deserialize, serde::Serialize)] +pub enum EmbedderOptions { + HuggingFace(hf::EmbedderOptions), + OpenAi(openai::EmbedderOptions), + Ollama(ollama::EmbedderOptions), + UserProvided(manual::EmbedderOptions), + Rest(rest::EmbedderOptions), +} + +impl Default for EmbedderOptions { + fn default() -> Self { + Self::OpenAi(openai::EmbedderOptions { api_key: None, dimensions: None }) + } +} + +mod hf { + #[derive(Debug, Clone, Hash, PartialEq, Eq, serde::Deserialize, serde::Serialize)] + pub struct EmbedderOptions { + pub model: String, + pub revision: Option, + } +} +mod openai { + + #[derive(Debug, Clone, Hash, PartialEq, Eq, serde::Deserialize, serde::Serialize)] + pub struct EmbedderOptions { + pub api_key: Option, + pub dimensions: Option, + } +} +mod ollama { + #[derive(Debug, Clone, Hash, PartialEq, Eq, serde::Deserialize, serde::Serialize)] + pub struct EmbedderOptions { + pub embedding_model: String, + pub url: Option, + pub api_key: Option, + } +} +mod manual { + #[derive(Debug, Clone, Hash, PartialEq, Eq, serde::Deserialize, serde::Serialize)] + pub struct EmbedderOptions { + pub dimensions: usize, + } +} +mod rest { + #[derive(Debug, Clone, PartialEq, Eq, serde::Deserialize, serde::Serialize, Hash)] + pub struct EmbedderOptions { + pub api_key: Option, + pub dimensions: Option, + pub url: String, + pub input_field: Vec, + // path to the array of embeddings + pub path_to_embeddings: Vec, + // shape of a single embedding + pub embedding_object: Vec, + } +} + +/// A datetime from Meilisearch v1.9 with an unspecified format. +#[derive(Debug)] +pub struct LegacyDateTime(pub OffsetDateTime); + +impl<'de> Deserialize<'de> for LegacyDateTime { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + struct Visitor; + impl<'de> serde::de::Visitor<'de> for Visitor { + type Value = OffsetDateTime; + + fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(formatter, "a valid datetime") + } + + // Comes from a binary. The legacy format is: + // 2024-11-04 13:32:08.48368 +00:00:00 + fn visit_str(self, v: &str) -> Result + where + E: serde::de::Error, + { + let format = time::macros::format_description!("[year]-[month]-[day] [hour]:[minute]:[second].[subsecond] [offset_hour sign:mandatory]:[offset_minute]:[offset_second]"); + OffsetDateTime::parse(v, format).map_err(E::custom) + } + + // Comes from the docker image, the legacy format is: + // [2024, 309, 17, 15, 1, 698184971, 0,0,0] + // year, day in year, hour, minute, sec, subsec , offset stuff + fn visit_seq(self, mut seq: A) -> Result + where + A: serde::de::SeqAccess<'de>, + { + let mut vec = Vec::new(); + // We must deserialize the value as `i64` because the largest values are `u32` and `i32` + while let Some(el) = seq.next_element::()? { + vec.push(el); + } + if vec.len() != 9 { + return Err(serde::de::Error::custom(format!( + "Invalid datetime, received an array of {} elements instead of 9", + vec.len() + ))); + } + Ok(OffsetDateTime::new_in_offset( + Date::from_ordinal_date(vec[0] as i32, vec[1] as u16) + .map_err(serde::de::Error::custom)?, + Time::from_hms_nano(vec[2] as u8, vec[3] as u8, vec[4] as u8, vec[5] as u32) + .map_err(serde::de::Error::custom)?, + UtcOffset::from_hms(vec[6] as i8, vec[7] as i8, vec[8] as i8) + .map_err(serde::de::Error::custom)?, + )) + } + } + deserializer.deserialize_any(Visitor).map(LegacyDateTime) + } +} diff --git a/crates/milli/Cargo.toml b/crates/milli/Cargo.toml index df0e59496..7b43fbf33 100644 --- a/crates/milli/Cargo.toml +++ b/crates/milli/Cargo.toml @@ -15,7 +15,7 @@ license.workspace = true bimap = { version = "0.6.3", features = ["serde"] } bincode = "1.3.3" bstr = "1.9.1" -bytemuck = { version = "1.16.1", features = ["extern_crate_alloc"] } +bytemuck = { version = "1.18.0", features = ["extern_crate_alloc"] } byteorder = "1.5.0" charabia = { version = "0.9.1", default-features = false } concat-arrays = "0.1.2"