diff --git a/crates/meilitool/Cargo.toml b/crates/meilitool/Cargo.toml
index 7d0b9f32c..52651a449 100644
--- a/crates/meilitool/Cargo.toml
+++ b/crates/meilitool/Cargo.toml
@@ -14,11 +14,11 @@ arroy_v04_to_v05 = { package = "arroy", git = "https://github.com/meilisearch/ar
 clap = { version = "4.5.9", features = ["derive"] }
 dump = { path = "../dump" }
 file-store = { path = "../file-store" }
-indexmap = {version = "2.7.0", features = ["serde"]}
+indexmap = { version = "2.7.0", features = ["serde"] }
 meilisearch-auth = { path = "../meilisearch-auth" }
 meilisearch-types = { path = "../meilisearch-types" }
 serde = { version = "1.0.209", features = ["derive"] }
-serde_json = {version = "1.0.133", features = ["preserve_order"]}
+serde_json = { version = "1.0.133", features = ["preserve_order"] }
 tempfile = "3.14.0"
 time = { version = "0.3.36", features = ["formatting", "parsing", "alloc"] }
 uuid = { version = "1.10.0", features = ["v4"], default-features = false }
diff --git a/crates/meilitool/src/upgrade/mod.rs b/crates/meilitool/src/upgrade/mod.rs
index 14f941311..47ca2cbd9 100644
--- a/crates/meilitool/src/upgrade/mod.rs
+++ b/crates/meilitool/src/upgrade/mod.rs
@@ -8,7 +8,7 @@ use std::path::{Path, PathBuf};
 use anyhow::{bail, Context};
 use meilisearch_types::versioning::create_version_file;
 use v1_10::v1_9_to_v1_10;
-use v1_12::v1_11_to_v1_12;
+use v1_12::{v1_11_to_v1_12, v1_12_to_v1_12_3};
 
 use crate::upgrade::v1_11::v1_10_to_v1_11;
 
@@ -21,9 +21,15 @@ pub struct OfflineUpgrade {
 impl OfflineUpgrade {
     pub fn upgrade(self) -> anyhow::Result<()> {
         let upgrade_list = [
-            (v1_9_to_v1_10 as fn(&Path) -> Result<(), anyhow::Error>, "1", "10", "0"),
+            (
+                v1_9_to_v1_10 as fn(&Path, &str, &str, &str) -> Result<(), anyhow::Error>,
+                "1",
+                "10",
+                "0",
+            ),
             (v1_10_to_v1_11, "1", "11", "0"),
             (v1_11_to_v1_12, "1", "12", "0"),
+            (v1_12_to_v1_12_3, "1", "12", "3"),
         ];
 
         let (current_major, current_minor, current_patch) = &self.current_version;
@@ -36,6 +42,7 @@ impl OfflineUpgrade {
             ("1", "9", _) => 0,
             ("1", "10", _) => 1,
             ("1", "11", _) => 2,
+            ("1", "12", x) if x == "0" || x == "1" || x == "2" => 3,
             _ => {
                 bail!("Unsupported current version {current_major}.{current_minor}.{current_patch}. Can only upgrade from v1.9 and v1.10")
             }
@@ -46,7 +53,8 @@ impl OfflineUpgrade {
         let ends_at = match (target_major.as_str(), target_minor.as_str(), target_patch.as_str()) {
             ("1", "10", _) => 0,
             ("1", "11", _) => 1,
-            ("1", "12", _) => 2,
+            ("1", "12", x) if x == "0" || x == "1" || x == "2" => 2,
+            ("1", "12", "3") => 3,
             (major, _, _) if major.starts_with('v') => {
                 bail!("Target version must not starts with a `v`. Instead of writing `v1.9.0` write `1.9.0` for example.")
             }
@@ -60,7 +68,7 @@ impl OfflineUpgrade {
         #[allow(clippy::needless_range_loop)]
         for index in start_at..=ends_at {
            let (func, major, minor, patch) = upgrade_list[index];
-            (func)(&self.db_path)?;
+            (func)(&self.db_path, current_major, current_minor, current_patch)?;
             println!("Done");
             // We're writing the version file just in case an issue arise _while_ upgrading.
             // We don't want the DB to fail in an unknown state.
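
Note on the dispatch above: the upgrade chain is a table of monomorphic function pointers; the explicit `as fn(...)` cast on the first entry forces every tuple in the array to the same type, and `start_at`/`ends_at` select the contiguous slice of migrations to run. Here is a minimal, self-contained sketch of the same pattern, with every step also receiving the origin version so later steps can gate their fix-ups (names and versions are illustrative, not the meilitool API):

```rust
use std::path::Path;

// The cast target: one shared function-pointer type for every table entry.
type UpgradeFn = fn(&Path, &str, &str, &str) -> Result<(), String>;

fn noop_step(_db: &Path, _maj: &str, _min: &str, _patch: &str) -> Result<(), String> {
    Ok(())
}

fn main() -> Result<(), String> {
    // (upgrade function, version reached once that function has run)
    let upgrade_list: [(UpgradeFn, &str, &str, &str); 4] = [
        (noop_step, "1", "10", "0"),
        (noop_step, "1", "11", "0"),
        (noop_step, "1", "12", "0"),
        (noop_step, "1", "12", "3"),
    ];

    // The version read from the DB's version file before upgrading.
    let (current_major, current_minor, current_patch) = ("1", "12", "1");

    // Map the current version to the first step to run; a DB already on
    // v1.12.0-2 only needs the final fix-up step.
    let start_at = match (current_major, current_minor, current_patch) {
        ("1", "9", _) => 0,
        ("1", "10", _) => 1,
        ("1", "11", _) => 2,
        ("1", "12", x) if x == "0" || x == "1" || x == "2" => 3,
        _ => return Err("unsupported current version".into()),
    };
    let ends_at = 3; // index of the step reaching the target, here 1.12.3

    // Every step receives the *origin* version, not the previous step's
    // output version, so a step like v1_12_to_v1_12_3 can decide whether
    // its repair work actually applies.
    for (func, major, minor, patch) in &upgrade_list[start_at..=ends_at] {
        func(Path::new("./db"), current_major, current_minor, current_patch)?;
        println!("now compatible with v{major}.{minor}.{patch}");
    }
    Ok(())
}
```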
diff --git a/crates/meilitool/src/upgrade/v1_10.rs b/crates/meilitool/src/upgrade/v1_10.rs
index 4a49ea471..a35fd4184 100644
--- a/crates/meilitool/src/upgrade/v1_10.rs
+++ b/crates/meilitool/src/upgrade/v1_10.rs
@@ -151,7 +151,12 @@ fn date_round_trip(
     Ok(())
 }
 
-pub fn v1_9_to_v1_10(db_path: &Path) -> anyhow::Result<()> {
+pub fn v1_9_to_v1_10(
+    db_path: &Path,
+    _origin_major: &str,
+    _origin_minor: &str,
+    _origin_patch: &str,
+) -> anyhow::Result<()> {
     println!("Upgrading from v1.9.0 to v1.10.0");
 
     // 2 changes here
diff --git a/crates/meilitool/src/upgrade/v1_11.rs b/crates/meilitool/src/upgrade/v1_11.rs
index 92d853dd0..e24a35e8b 100644
--- a/crates/meilitool/src/upgrade/v1_11.rs
+++ b/crates/meilitool/src/upgrade/v1_11.rs
@@ -14,7 +14,12 @@ use meilisearch_types::milli::index::db_name;
 use crate::uuid_codec::UuidCodec;
 use crate::{try_opening_database, try_opening_poly_database};
 
-pub fn v1_10_to_v1_11(db_path: &Path) -> anyhow::Result<()> {
+pub fn v1_10_to_v1_11(
+    db_path: &Path,
+    _origin_major: &str,
+    _origin_minor: &str,
+    _origin_patch: &str,
+) -> anyhow::Result<()> {
     println!("Upgrading from v1.10.0 to v1.11.0");
 
     let index_scheduler_path = db_path.join("tasks");
diff --git a/crates/meilitool/src/upgrade/v1_12.rs b/crates/meilitool/src/upgrade/v1_12.rs
index 444617375..593fb833c 100644
--- a/crates/meilitool/src/upgrade/v1_12.rs
+++ b/crates/meilitool/src/upgrade/v1_12.rs
@@ -1,17 +1,34 @@
 //! The breaking changes that happened between the v1.11 and the v1.12 are:
 //! - The new indexer changed the update files format from OBKV to ndjson. https://github.com/meilisearch/meilisearch/pull/4900
 
+use std::borrow::Cow;
 use std::io::BufWriter;
 use std::path::Path;
+use std::sync::atomic::AtomicBool;
 
 use anyhow::Context;
 use file_store::FileStore;
 use indexmap::IndexMap;
 use meilisearch_types::milli::documents::DocumentsBatchReader;
+use meilisearch_types::milli::heed::types::{SerdeJson, Str};
+use meilisearch_types::milli::heed::{Database, EnvOpenOptions, RoTxn, RwTxn};
+use meilisearch_types::milli::progress::Step;
+use meilisearch_types::milli::{FieldDistribution, Index};
+use serde::Serialize;
 use serde_json::value::RawValue;
 use tempfile::NamedTempFile;
+use time::OffsetDateTime;
+use uuid::Uuid;
 
-pub fn v1_11_to_v1_12(db_path: &Path) -> anyhow::Result<()> {
+use crate::try_opening_database;
+use crate::uuid_codec::UuidCodec;
+
+pub fn v1_11_to_v1_12(
+    db_path: &Path,
+    _origin_major: &str,
+    _origin_minor: &str,
+    _origin_patch: &str,
+) -> anyhow::Result<()> {
     println!("Upgrading from v1.11.0 to v1.12.0");
 
     convert_update_files(db_path)?;
@@ -19,6 +36,23 @@ pub fn v1_11_to_v1_12(db_path: &Path) -> anyhow::Result<()> {
 
     Ok(())
 }
 
+pub fn v1_12_to_v1_12_3(
+    db_path: &Path,
+    origin_major: &str,
+    origin_minor: &str,
+    origin_patch: &str,
+) -> anyhow::Result<()> {
+    println!("Upgrading from v1.12.{{0, 1, 2}} to v1.12.3");
+
+    if origin_minor == "12" {
+        rebuild_field_distribution(db_path)?;
+    } else {
+        println!("Not rebuilding field distribution as it wasn't corrupted coming from v{origin_major}.{origin_minor}.{origin_patch}");
+    }
+
+    Ok(())
+}
+
 /// Convert the update files from OBKV to ndjson format.
 ///
 /// 1) List all the update files using the file store.
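
Note on the gating in `v1_12_to_v1_12_3`: the rebuild is keyed on the *origin* version of the whole upgrade rather than on the step's own position in the chain, because the field distribution was only corrupted on databases that actually ran the buggy v1.12.0-v1.12.2 indexer. A database arriving here from v1.11 or earlier still holds the distribution the old, correct indexer wrote, so there is nothing to repair. A standalone sketch of that predicate (function name is illustrative):

```rust
/// Mirrors the `origin_minor == "12"` check above. The `start_at` match in
/// mod.rs has already guaranteed the origin is a supported 1.x version, so
/// checking the minor component is enough.
fn needs_field_distribution_rebuild(origin_minor: &str) -> bool {
    origin_minor == "12"
}

fn main() {
    // 1.12.1 -> 1.12.3: the buggy indexer may have written bad counts; rebuild.
    assert!(needs_field_distribution_rebuild("12"));
    // 1.11.0 -> 1.12.3: the distribution was never corrupted; skip the rebuild.
    assert!(!needs_field_distribution_rebuild("11"));
}
```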
@@ -77,3 +111,188 @@ fn convert_update_files(db_path: &Path) -> anyhow::Result<()> {
 
     Ok(())
 }
+
+/// Rebuild field distribution as it was wrongly computed in v1.12.x if x < 3
+fn rebuild_field_distribution(db_path: &Path) -> anyhow::Result<()> {
+    let index_scheduler_path = db_path.join("tasks");
+    let env = unsafe { EnvOpenOptions::new().max_dbs(100).open(&index_scheduler_path) }
+        .with_context(|| format!("While trying to open {:?}", index_scheduler_path.display()))?;
+
+    let mut sched_wtxn = env.write_txn()?;
+
+    let index_mapping: Database<Str, UuidCodec> =
+        try_opening_database(&env, &sched_wtxn, "index-mapping")?;
+    let stats_db: Database<UuidCodec, SerdeJson<IndexStats>> =
+        try_opening_database(&env, &sched_wtxn, "index-stats").with_context(|| {
+            format!("While trying to open {:?}", index_scheduler_path.display())
+        })?;
+
+    let index_count =
+        index_mapping.len(&sched_wtxn).context("while reading the number of indexes")?;
+
+    // FIXME: not ideal, we have to pre-populate all indexes to prevent double borrow of sched_wtxn
+    // 1. immutably for the iteration
+    // 2. mutably for updating index stats
+    let indexes: Vec<_> = index_mapping
+        .iter(&sched_wtxn)?
+        .map(|res| res.map(|(uid, uuid)| (uid.to_owned(), uuid)))
+        .collect();
+
+    let progress = meilisearch_types::milli::progress::Progress::default();
+    let finished = AtomicBool::new(false);
+
+    std::thread::scope(|scope| {
+        let display_progress = std::thread::Builder::new()
+            .name("display_progress".into())
+            .spawn_scoped(scope, || {
+                while !finished.load(std::sync::atomic::Ordering::Relaxed) {
+                    std::thread::sleep(std::time::Duration::from_secs(5));
+                    let view = progress.as_progress_view();
+                    let Ok(view) = serde_json::to_string(&view) else {
+                        continue;
+                    };
+                    println!("{view}");
+                }
+            })
+            .unwrap();
+
+        for (index_index, result) in indexes.into_iter().enumerate() {
+            let (uid, uuid) = result?;
+            progress.update_progress(VariableNameStep::new(
+                &uid,
+                index_index as u32,
+                index_count as u32,
+            ));
+            let index_path = db_path.join("indexes").join(uuid.to_string());
+
+            println!(
+                "[{}/{index_count}]Updating index `{uid}` at `{}`",
+                index_index + 1,
+                index_path.display()
+            );
+
+            println!("\t- Rebuilding field distribution");
+
+            let index = meilisearch_types::milli::Index::new(EnvOpenOptions::new(), &index_path)
+                .with_context(|| {
+                    format!("while opening index {uid} at '{}'", index_path.display())
+                })?;
+
+            let mut index_txn = index.write_txn()?;
+
+            meilisearch_types::milli::update::new::reindex::field_distribution(
+                &index,
+                &mut index_txn,
+                &progress,
+            )
+            .context("while rebuilding field distribution")?;
+
+            let stats = IndexStats::new(&index, &index_txn)
+                .with_context(|| format!("computing stats for index `{uid}`"))?;
+            store_stats_of(stats_db, uuid, &mut sched_wtxn, &uid, &stats)?;
+
+            index_txn.commit().context("while committing the write txn for the updated index")?;
+        }
+
+        sched_wtxn.commit().context("while committing the write txn for the index-scheduler")?;
+
+        finished.store(true, std::sync::atomic::Ordering::Relaxed);
+
+        if let Err(panic) = display_progress.join() {
+            let msg = match panic.downcast_ref::<&'static str>() {
+                Some(s) => *s,
+                None => match panic.downcast_ref::<String>() {
+                    Some(s) => &s[..],
+                    None => "Box<dyn Any>",
+                },
+            };
+            eprintln!("WARN: the display thread panicked with {msg}");
+        }
+
+        println!("Upgrading database succeeded");
+        Ok(())
+    })
+}
+
+pub struct VariableNameStep {
+    name: String,
+    current: u32,
+    total: u32,
+}
+
+impl VariableNameStep {
+    pub fn new(name: impl Into<String>, current: u32, total: u32) -> Self {
name.into(), current, total } + } +} + +impl Step for VariableNameStep { + fn name(&self) -> Cow<'static, str> { + self.name.clone().into() + } + + fn current(&self) -> u32 { + self.current + } + + fn total(&self) -> u32 { + self.total + } +} + +pub fn store_stats_of( + stats_db: Database>, + index_uuid: Uuid, + sched_wtxn: &mut RwTxn, + index_uid: &str, + stats: &IndexStats, +) -> anyhow::Result<()> { + stats_db + .put(sched_wtxn, &index_uuid, stats) + .with_context(|| format!("storing stats for index `{index_uid}`"))?; + Ok(()) +} + +/// The statistics that can be computed from an `Index` object. +#[derive(Serialize, Debug)] +pub struct IndexStats { + /// Number of documents in the index. + pub number_of_documents: u64, + /// Size taken up by the index' DB, in bytes. + /// + /// This includes the size taken by both the used and free pages of the DB, and as the free pages + /// are not returned to the disk after a deletion, this number is typically larger than + /// `used_database_size` that only includes the size of the used pages. + pub database_size: u64, + /// Size taken by the used pages of the index' DB, in bytes. + /// + /// As the DB backend does not return to the disk the pages that are not currently used by the DB, + /// this value is typically smaller than `database_size`. + pub used_database_size: u64, + /// Association of every field name with the number of times it occurs in the documents. + pub field_distribution: FieldDistribution, + /// Creation date of the index. + #[serde(with = "time::serde::rfc3339")] + pub created_at: OffsetDateTime, + /// Date of the last update of the index. + #[serde(with = "time::serde::rfc3339")] + pub updated_at: OffsetDateTime, +} + +impl IndexStats { + /// Compute the stats of an index + /// + /// # Parameters + /// + /// - rtxn: a RO transaction for the index, obtained from `Index::read_txn()`. 
+    pub fn new(index: &Index, rtxn: &RoTxn) -> meilisearch_types::milli::Result<Self> {
+        Ok(IndexStats {
+            number_of_documents: index.number_of_documents(rtxn)?,
+            database_size: index.on_disk_size()?,
+            used_database_size: index.used_size()?,
+            field_distribution: index.field_distribution(rtxn)?,
+            created_at: index.created_at(rtxn)?,
+            updated_at: index.updated_at(rtxn)?,
+        })
+    }
+}
diff --git a/crates/milli/src/update/index_documents/mod.rs b/crates/milli/src/update/index_documents/mod.rs
index bae8e00b4..5631409b5 100644
--- a/crates/milli/src/update/index_documents/mod.rs
+++ b/crates/milli/src/update/index_documents/mod.rs
@@ -3334,6 +3334,44 @@ mod tests {
         rtxn.commit().unwrap();
     }
 
+    #[test]
+    fn incremental_update_without_changing_facet_distribution() {
+        let index = TempIndex::new();
+        index
+            .add_documents(documents!([
+                {"id": 0, "some_field": "aaa", "other_field": "aaa" },
+                {"id": 1, "some_field": "bbb", "other_field": "bbb" },
+            ]))
+            .unwrap();
+        {
+            let rtxn = index.read_txn().unwrap();
+            // count field distribution
+            let results = index.field_distribution(&rtxn).unwrap();
+            assert_eq!(Some(&2), results.get("id"));
+            assert_eq!(Some(&2), results.get("some_field"));
+            assert_eq!(Some(&2), results.get("other_field"));
+        }
+
+        let mut index = index;
+        index.index_documents_config.update_method = IndexDocumentsMethod::UpdateDocuments;
+
+        index
+            .add_documents(documents!([
+                {"id": 0, "other_field": "bbb" },
+                {"id": 1, "some_field": "ccc" },
+            ]))
+            .unwrap();
+
+        {
+            let rtxn = index.read_txn().unwrap();
+            // count field distribution
+            let results = index.field_distribution(&rtxn).unwrap();
+            assert_eq!(Some(&2), results.get("id"));
+            assert_eq!(Some(&2), results.get("some_field"));
+            assert_eq!(Some(&2), results.get("other_field"));
+        }
+    }
+
     #[test]
     fn delete_words_exact_attributes() {
         let index = TempIndex::new();
diff --git a/crates/milli/src/update/new/extract/documents.rs b/crates/milli/src/update/new/extract/documents.rs
index 13307025a..0711a76b9 100644
--- a/crates/milli/src/update/new/extract/documents.rs
+++ b/crates/milli/src/update/new/extract/documents.rs
@@ -89,7 +89,8 @@ impl<'a, 'b, 'extractor> Extractor<'extractor> for DocumentsExtractor<'a, 'b> {
                         .or_default();
                     *entry -= 1;
                 }
-                let content = update.updated();
+                let content =
+                    update.merged(&context.rtxn, context.index, &context.db_fields_ids_map)?;
                 let geo_iter =
                     content.geo_field().transpose().map(|res| res.map(|rv| ("_geo", rv)));
                 for res in content.iter_top_level_fields().chain(geo_iter) {
diff --git a/crates/milli/src/update/new/mod.rs b/crates/milli/src/update/new/mod.rs
index 87995ee55..b7e08a461 100644
--- a/crates/milli/src/update/new/mod.rs
+++ b/crates/milli/src/update/new/mod.rs
@@ -16,6 +16,7 @@ pub mod indexer;
 mod merger;
 mod parallel_iterator_ext;
 mod ref_cell_ext;
+pub mod reindex;
 pub(crate) mod steps;
 pub(crate) mod thread_local;
 pub mod vector_document;
diff --git a/crates/milli/src/update/new/reindex.rs b/crates/milli/src/update/new/reindex.rs
new file mode 100644
index 000000000..6bfeb123e
--- /dev/null
+++ b/crates/milli/src/update/new/reindex.rs
@@ -0,0 +1,38 @@
+use heed::RwTxn;
+
+use super::document::{Document, DocumentFromDb};
+use crate::progress::{self, AtomicSubStep, Progress};
+use crate::{FieldDistribution, Index, Result};
+
+pub fn field_distribution(index: &Index, wtxn: &mut RwTxn<'_>, progress: &Progress) -> Result<()> {
+    let mut distribution = FieldDistribution::new();
+
+    let document_count = index.number_of_documents(wtxn)?;
+    let field_id_map = index.fields_ids_map(wtxn)?;
+
+    let (update_document_count, sub_step) =
+        AtomicSubStep::<progress::Document>::new(document_count as u32);
+    progress.update_progress(sub_step);
+
+    let docids = index.documents_ids(wtxn)?;
+
+    for docid in docids {
+        update_document_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
+
+        let Some(document) = DocumentFromDb::new(docid, wtxn, index, &field_id_map)? else {
+            continue;
+        };
+        let geo_iter = document.geo_field().transpose().map(|res| res.map(|rv| ("_geo", rv)));
+        for res in document.iter_top_level_fields().chain(geo_iter) {
+            let (field_name, _) = res?;
+            if let Some(count) = distribution.get_mut(field_name) {
+                *count += 1;
+            } else {
+                distribution.insert(field_name.to_owned(), 1);
+            }
+        }
+    }
+
+    index.put_field_distribution(wtxn, &distribution)?;
+    Ok(())
+}
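
Note on the milli changes as a whole: the extractor used to subtract the field counts of the full stored document but add back only the fields present in the partial update (`update.updated()`), so every `UpdateDocuments` task skewed the distribution; switching to `update.merged(...)` makes the subtraction and the re-addition operate on the same shape of document, and `reindex::field_distribution` recounts everything from scratch for databases that were already damaged. In milli, `FieldDistribution` is a `BTreeMap<String, u64>` from field name to the number of documents containing that field. A self-contained sketch of the recount loop on plain string data (the document shape here is illustrative, not the milli document API):

```rust
use std::collections::BTreeMap;

// Stand-in for milli's `FieldDistribution` type alias.
type FieldDistribution = BTreeMap<String, u64>;

/// Recount the distribution from scratch, like `reindex::field_distribution`:
/// every top-level field of every document bumps its counter by one.
fn rebuild_distribution(docs: &[Vec<&str>]) -> FieldDistribution {
    let mut distribution = FieldDistribution::new();
    for field_names in docs {
        for field_name in field_names {
            *distribution.entry((*field_name).to_owned()).or_insert(0) += 1;
        }
    }
    distribution
}

fn main() {
    // The scenario from the new test: two documents that each still carry
    // `id`, `some_field`, and `other_field` after a partial update is merged.
    let docs = vec![
        vec!["id", "some_field", "other_field"],
        vec!["id", "some_field", "other_field"],
    ];
    let distribution = rebuild_distribution(&docs);
    assert_eq!(distribution.get("id"), Some(&2));
    assert_eq!(distribution.get("some_field"), Some(&2));
    assert_eq!(distribution.get("other_field"), Some(&2));
    println!("{distribution:?}");
}
```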