diff --git a/Cargo.lock b/Cargo.lock index 8166bd090..1f971d900 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3602,6 +3602,7 @@ dependencies = [ "indexmap", "meilisearch-auth", "meilisearch-types", + "milli", "serde", "serde_json", "tempfile", diff --git a/crates/meilitool/Cargo.toml b/crates/meilitool/Cargo.toml index 7d0b9f32c..4824fe935 100644 --- a/crates/meilitool/Cargo.toml +++ b/crates/meilitool/Cargo.toml @@ -14,11 +14,12 @@ arroy_v04_to_v05 = { package = "arroy", git = "https://github.com/meilisearch/ar clap = { version = "4.5.9", features = ["derive"] } dump = { path = "../dump" } file-store = { path = "../file-store" } -indexmap = {version = "2.7.0", features = ["serde"]} +indexmap = { version = "2.7.0", features = ["serde"] } meilisearch-auth = { path = "../meilisearch-auth" } meilisearch-types = { path = "../meilisearch-types" } +milli = { path = "../milli" } serde = { version = "1.0.209", features = ["derive"] } -serde_json = {version = "1.0.133", features = ["preserve_order"]} +serde_json = { version = "1.0.133", features = ["preserve_order"] } tempfile = "3.14.0" time = { version = "0.3.36", features = ["formatting", "parsing", "alloc"] } uuid = { version = "1.10.0", features = ["v4"], default-features = false } diff --git a/crates/meilitool/src/upgrade/mod.rs b/crates/meilitool/src/upgrade/mod.rs index 14f941311..4084daead 100644 --- a/crates/meilitool/src/upgrade/mod.rs +++ b/crates/meilitool/src/upgrade/mod.rs @@ -8,7 +8,7 @@ use std::path::{Path, PathBuf}; use anyhow::{bail, Context}; use meilisearch_types::versioning::create_version_file; use v1_10::v1_9_to_v1_10; -use v1_12::v1_11_to_v1_12; +use v1_12::{v1_11_to_v1_12, v1_12_to_v1_12_3}; use crate::upgrade::v1_11::v1_10_to_v1_11; @@ -24,6 +24,7 @@ impl OfflineUpgrade { (v1_9_to_v1_10 as fn(&Path) -> Result<(), anyhow::Error>, "1", "10", "0"), (v1_10_to_v1_11, "1", "11", "0"), (v1_11_to_v1_12, "1", "12", "0"), + (v1_12_to_v1_12_3, "1", "12", "3"), ]; let (current_major, current_minor, current_patch) = &self.current_version; @@ -36,6 +37,7 @@ impl OfflineUpgrade { ("1", "9", _) => 0, ("1", "10", _) => 1, ("1", "11", _) => 2, + ("1", "12", x) if x == "0" || x == "1" || x == "2" => 3, _ => { bail!("Unsupported current version {current_major}.{current_minor}.{current_patch}. Can only upgrade from v1.9 and v1.10") } @@ -46,7 +48,8 @@ impl OfflineUpgrade { let ends_at = match (target_major.as_str(), target_minor.as_str(), target_patch.as_str()) { ("1", "10", _) => 0, ("1", "11", _) => 1, - ("1", "12", _) => 2, + ("1", "12", x) if x == "0" || x == "1" || x == "2" => 2, + ("1", "12", "3") => 3, (major, _, _) if major.starts_with('v') => { bail!("Target version must not starts with a `v`. Instead of writing `v1.9.0` write `1.9.0` for example.") } diff --git a/crates/meilitool/src/upgrade/v1_12.rs b/crates/meilitool/src/upgrade/v1_12.rs index 444617375..7a9e71e7e 100644 --- a/crates/meilitool/src/upgrade/v1_12.rs +++ b/crates/meilitool/src/upgrade/v1_12.rs @@ -1,16 +1,24 @@ //! The breaking changes that happened between the v1.11 and the v1.12 are: //! - The new indexer changed the update files format from OBKV to ndjson. https://github.com/meilisearch/meilisearch/pull/4900 +use std::borrow::Cow; use std::io::BufWriter; use std::path::Path; +use std::sync::atomic::AtomicBool; use anyhow::Context; use file_store::FileStore; use indexmap::IndexMap; use meilisearch_types::milli::documents::DocumentsBatchReader; +use milli::heed::types::Str; +use milli::heed::{Database, EnvOpenOptions}; +use milli::progress::Step; use serde_json::value::RawValue; use tempfile::NamedTempFile; +use crate::try_opening_database; +use crate::uuid_codec::UuidCodec; + pub fn v1_11_to_v1_12(db_path: &Path) -> anyhow::Result<()> { println!("Upgrading from v1.11.0 to v1.12.0"); @@ -19,6 +27,14 @@ pub fn v1_11_to_v1_12(db_path: &Path) -> anyhow::Result<()> { Ok(()) } +pub fn v1_12_to_v1_12_3(db_path: &Path) -> anyhow::Result<()> { + println!("Upgrading from v1.12.{{0, 1, 2}} to v1.12.3"); + + rebuild_field_distribution(db_path)?; + + Ok(()) +} + /// Convert the update files from OBKV to ndjson format. /// /// 1) List all the update files using the file store. @@ -77,3 +93,113 @@ fn convert_update_files(db_path: &Path) -> anyhow::Result<()> { Ok(()) } + +/// Rebuild field distribution as it was wrongly computed in v1.12.x if x < 3 +fn rebuild_field_distribution(db_path: &Path) -> anyhow::Result<()> { + let index_scheduler_path = db_path.join("tasks"); + let env = unsafe { EnvOpenOptions::new().max_dbs(100).open(&index_scheduler_path) } + .with_context(|| format!("While trying to open {:?}", index_scheduler_path.display()))?; + + let sched_rtxn = env.read_txn()?; + + let index_mapping: Database = + try_opening_database(&env, &sched_rtxn, "index-mapping")?; + + let index_count = + index_mapping.len(&sched_rtxn).context("while reading the number of indexes")?; + + let progress = milli::progress::Progress::default(); + let finished = AtomicBool::new(false); + + std::thread::scope(|scope| { + let indexes = index_mapping.iter(&sched_rtxn)?; + + let display_progress = std::thread::Builder::new() + .name("display_progress".into()) + .spawn_scoped(scope, || { + while !finished.load(std::sync::atomic::Ordering::Relaxed) { + std::thread::sleep(std::time::Duration::from_secs(5)); + let view = progress.as_progress_view(); + let Ok(view) = serde_json::to_string(&view) else { + continue; + }; + println!("{view}"); + } + }) + .unwrap(); + + for (index_index, result) in indexes.enumerate() { + let (uid, uuid) = result?; + progress.update_progress(VariableNameStep::new( + uid, + index_index as u32, + index_count as u32, + )); + let index_path = db_path.join("indexes").join(uuid.to_string()); + + println!( + "[{}/{index_count}]Updating index `{uid}` at `{}`", + index_index + 1, + index_path.display() + ); + + println!("\t- Rebuilding field distribution"); + + let index = + milli::Index::new(EnvOpenOptions::new(), &index_path).with_context(|| { + format!("while opening index {uid} at '{}'", index_path.display()) + })?; + + let mut index_txn = index.write_txn()?; + + milli::update::new::reindex::field_distribution(&index, &mut index_txn, &progress) + .context("while rebuilding field distribution")?; + + index_txn.commit().context("while committing the write txn for the updated index")?; + } + + sched_rtxn.commit().context("while committing the write txn for the index-scheduler")?; + + finished.store(true, std::sync::atomic::Ordering::Relaxed); + + if let Err(panic) = display_progress.join() { + let msg = match panic.downcast_ref::<&'static str>() { + Some(s) => *s, + None => match panic.downcast_ref::() { + Some(s) => &s[..], + None => "Box", + }, + }; + eprintln!("WARN: the display thread panicked with {msg}"); + } + + println!("Upgrading database succeeded"); + Ok(()) + }) +} + +pub struct VariableNameStep { + name: String, + current: u32, + total: u32, +} + +impl VariableNameStep { + pub fn new(name: impl Into, current: u32, total: u32) -> Self { + Self { name: name.into(), current, total } + } +} + +impl Step for VariableNameStep { + fn name(&self) -> Cow<'static, str> { + self.name.clone().into() + } + + fn current(&self) -> u32 { + self.current + } + + fn total(&self) -> u32 { + self.total + } +} diff --git a/crates/milli/src/update/new/mod.rs b/crates/milli/src/update/new/mod.rs index 87995ee55..b7e08a461 100644 --- a/crates/milli/src/update/new/mod.rs +++ b/crates/milli/src/update/new/mod.rs @@ -16,6 +16,7 @@ pub mod indexer; mod merger; mod parallel_iterator_ext; mod ref_cell_ext; +pub mod reindex; pub(crate) mod steps; pub(crate) mod thread_local; pub mod vector_document; diff --git a/crates/milli/src/update/new/reindex.rs b/crates/milli/src/update/new/reindex.rs new file mode 100644 index 000000000..b5b7f3cf7 --- /dev/null +++ b/crates/milli/src/update/new/reindex.rs @@ -0,0 +1,47 @@ +use heed::RwTxn; + +use super::document::{Document, DocumentFromDb}; +use crate::progress::{self, AtomicSubStep, NamedStep, Progress}; +use crate::{FieldDistribution, Index, Result}; + +pub fn field_distribution(index: &Index, wtxn: &mut RwTxn<'_>, progress: &Progress) -> Result<()> { + let mut distribution = FieldDistribution::new(); + + let document_count = index.number_of_documents(wtxn)?; + let field_id_map = index.fields_ids_map(wtxn)?; + + let (update_document_count, sub_step) = + AtomicSubStep::::new(document_count as u32); + progress.update_progress(sub_step); + + let docids = index.documents_ids(wtxn)?; + + for docid in docids { + update_document_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + + let Some(document) = DocumentFromDb::new(docid, wtxn, index, &field_id_map)? else { + continue; + }; + let geo_iter = document.geo_field().transpose().map(|res| res.map(|rv| ("_geo", rv))); + for res in document.iter_top_level_fields().chain(geo_iter) { + let (field_name, _) = res?; + if let Some(count) = distribution.get_mut(field_name) { + *count += 1; + } else { + distribution.insert(field_name.to_owned(), 1); + } + } + } + + index.put_field_distribution(wtxn, &distribution)?; + Ok(()) +} + +#[derive(Default)] +pub struct FieldDistributionIndexProgress; + +impl NamedStep for FieldDistributionIndexProgress { + fn name(&self) -> &'static str { + "documents" + } +}