From 479607e5dd9185a1a69ec39f05a6c97be8e87c98 Mon Sep 17 00:00:00 2001 From: ManyTheFish Date: Tue, 10 Dec 2024 15:12:50 +0100 Subject: [PATCH 1/3] Convert update files from OBKV to ndjson --- Cargo.lock | 13 +++--- crates/meilitool/Cargo.toml | 5 ++- crates/meilitool/src/main.rs | 2 +- crates/meilitool/src/upgrade/mod.rs | 5 +++ crates/meilitool/src/upgrade/v1_12.rs | 63 +++++++++++++++++++++++++++ 5 files changed, 81 insertions(+), 7 deletions(-) create mode 100644 crates/meilitool/src/upgrade/v1_12.rs diff --git a/Cargo.lock b/Cargo.lock index 9476506ec..ae2715f25 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2661,12 +2661,12 @@ dependencies = [ [[package]] name = "indexmap" -version = "2.2.6" +version = "2.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "168fb715dda47215e360912c096649d23d58bf392ac62f73919e831745e40f26" +checksum = "62f822373a4fe84d4bb149bf54e584a7f4abec90e072ed49cda0edea5b95471f" dependencies = [ "equivalent", - "hashbrown 0.14.3", + "hashbrown 0.15.1", "serde", ] @@ -3597,9 +3597,12 @@ dependencies = [ "clap", "dump", "file-store", + "indexmap", "meilisearch-auth", "meilisearch-types", "serde", + "serde_json", + "tempfile", "time", "uuid", ] @@ -4969,9 +4972,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.132" +version = "1.0.133" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d726bfaff4b320266d395898905d0eba0345aae23b54aee3a737e260fd46db03" +checksum = "c7fceb2473b9166b2294ef05efcb65a3db80803f0b03ef86a5fc88a2b85ee377" dependencies = [ "indexmap", "itoa", diff --git a/crates/meilitool/Cargo.toml b/crates/meilitool/Cargo.toml index 048da6232..7d0b9f32c 100644 --- a/crates/meilitool/Cargo.toml +++ b/crates/meilitool/Cargo.toml @@ -10,12 +10,15 @@ license.workspace = true [dependencies] anyhow = "1.0.86" +arroy_v04_to_v05 = { package = "arroy", git = "https://github.com/meilisearch/arroy/", tag = "DO-NOT-DELETE-upgrade-v04-to-v05" } clap = { version = "4.5.9", features = ["derive"] } dump = { path = "../dump" } file-store = { path = "../file-store" } +indexmap = {version = "2.7.0", features = ["serde"]} meilisearch-auth = { path = "../meilisearch-auth" } meilisearch-types = { path = "../meilisearch-types" } serde = { version = "1.0.209", features = ["derive"] } +serde_json = {version = "1.0.133", features = ["preserve_order"]} +tempfile = "3.14.0" time = { version = "0.3.36", features = ["formatting", "parsing", "alloc"] } uuid = { version = "1.10.0", features = ["v4"], default-features = false } -arroy_v04_to_v05 = { package = "arroy", git = "https://github.com/meilisearch/arroy/", tag = "DO-NOT-DELETE-upgrade-v04-to-v05" } diff --git a/crates/meilitool/src/main.rs b/crates/meilitool/src/main.rs index f84cea98d..44eb4960e 100644 --- a/crates/meilitool/src/main.rs +++ b/crates/meilitool/src/main.rs @@ -73,7 +73,7 @@ enum Command { /// /// Supported upgrade paths: /// - /// - v1.9.x -> v1.10.x -> v1.11.x + /// - v1.9.x -> v1.10.x -> v1.11.x -> v1.12.x OfflineUpgrade { #[arg(long)] target_version: String, diff --git a/crates/meilitool/src/upgrade/mod.rs b/crates/meilitool/src/upgrade/mod.rs index 36630c3b3..50882f610 100644 --- a/crates/meilitool/src/upgrade/mod.rs +++ b/crates/meilitool/src/upgrade/mod.rs @@ -1,5 +1,6 @@ mod v1_10; mod v1_11; +mod v1_12; mod v1_9; use std::path::{Path, PathBuf}; @@ -8,6 +9,7 @@ use anyhow::{bail, Context}; use meilisearch_types::versioning::create_version_file; use v1_10::v1_9_to_v1_10; +use v1_12::v1_11_to_v1_12; use crate::upgrade::v1_11::v1_10_to_v1_11; @@ -22,6 +24,7 @@ impl OfflineUpgrade { let upgrade_list = [ (v1_9_to_v1_10 as fn(&Path) -> Result<(), anyhow::Error>, "1", "10", "0"), (v1_10_to_v1_11, "1", "11", "0"), + (v1_11_to_v1_12, "1", "12", "0"), ]; let (current_major, current_minor, current_patch) = &self.current_version; @@ -33,6 +36,7 @@ impl OfflineUpgrade { ) { ("1", "9", _) => 0, ("1", "10", _) => 1, + ("1", "11", _) => 2, _ => { bail!("Unsupported current version {current_major}.{current_minor}.{current_patch}. Can only upgrade from v1.9 and v1.10") } @@ -43,6 +47,7 @@ impl OfflineUpgrade { let ends_at = match (target_major.as_str(), target_minor.as_str(), target_patch.as_str()) { ("1", "10", _) => 0, ("1", "11", _) => 1, + ("1", "12", _) => 2, (major, _, _) if major.starts_with('v') => { bail!("Target version must not starts with a `v`. Instead of writing `v1.9.0` write `1.9.0` for example.") } diff --git a/crates/meilitool/src/upgrade/v1_12.rs b/crates/meilitool/src/upgrade/v1_12.rs new file mode 100644 index 000000000..ab97f417b --- /dev/null +++ b/crates/meilitool/src/upgrade/v1_12.rs @@ -0,0 +1,63 @@ +//! The breaking changes that happened between the v1.11 and the v1.12 are: +//! - The new indexer changed the update files format from OBKV to ndjson. https://github.com/meilisearch/meilisearch/pull/4900 + +use std::{io::BufWriter, path::Path}; + +use anyhow::Context; +use file_store::FileStore; +use indexmap::IndexMap; +use meilisearch_types::milli::documents::DocumentsBatchReader; +use serde_json::value::RawValue; +use tempfile::NamedTempFile; + +pub fn v1_11_to_v1_12(db_path: &Path) -> anyhow::Result<()> { + println!("Upgrading from v1.11.0 to v1.12.0"); + + convert_update_files(db_path)?; + + Ok(()) +} + +/// Convert the update files from OBKV to ndjson format. +/// +/// 1) List all the update files using the file store. +/// 2) For each update file, read the update file into a DocumentsBatchReader. +/// 3) For each document in the update file, convert the document to a JSON object. +/// 4) Write the JSON object to a tmp file in the update files directory. +/// 5) Persist the tmp file replacing the old update file. +fn convert_update_files(db_path: &Path) -> anyhow::Result<()> { + let update_files_dir_path = db_path.join("update_files"); + let file_store = FileStore::new(&update_files_dir_path)?; + + for uuid in file_store.all_uuids()? { + let uuid = uuid?; + let update_file_path = file_store.get_update_path(uuid); + let update_file = file_store.get_update(uuid)?; + + let mut file = NamedTempFile::new_in(&update_files_dir_path).map(BufWriter::new)?; + + let reader = DocumentsBatchReader::from_reader(update_file)?; + let (mut cursor, index) = reader.into_cursor_and_fields_index(); + + while let Some(document) = cursor.next_document()? { + let mut json_document = IndexMap::new(); + for (fid, value) in document { + let field_name = index + .name(fid) + .with_context(|| format!("while getting field name for fid {fid}"))?; + let value: &RawValue = serde_json::from_slice(value)?; + json_document.insert(field_name, value); + } + + serde_json::to_writer(&mut file, &json_document)?; + } + + let file = file + .into_inner() + .map_err(|e| e.into_error()) + .context("while flushing update file bufwriter")?; + let _ = file.persist(update_file_path)?; + } + + Ok(()) +} From c614d0dd353947c46de2da8635e6e4b8e0b8404c Mon Sep 17 00:00:00 2001 From: ManyTheFish Date: Wed, 11 Dec 2024 09:54:34 +0100 Subject: [PATCH 2/3] Add context when returning an error --- crates/meilitool/src/upgrade/v1_12.rs | 40 ++++++++++++++++++--------- 1 file changed, 27 insertions(+), 13 deletions(-) diff --git a/crates/meilitool/src/upgrade/v1_12.rs b/crates/meilitool/src/upgrade/v1_12.rs index ab97f417b..77060d90d 100644 --- a/crates/meilitool/src/upgrade/v1_12.rs +++ b/crates/meilitool/src/upgrade/v1_12.rs @@ -27,24 +27,37 @@ pub fn v1_11_to_v1_12(db_path: &Path) -> anyhow::Result<()> { /// 5) Persist the tmp file replacing the old update file. fn convert_update_files(db_path: &Path) -> anyhow::Result<()> { let update_files_dir_path = db_path.join("update_files"); - let file_store = FileStore::new(&update_files_dir_path)?; + let file_store = FileStore::new(&update_files_dir_path).with_context(|| { + format!("while creating file store for update files dir {update_files_dir_path:?}") + })?; - for uuid in file_store.all_uuids()? { - let uuid = uuid?; + for uuid in file_store.all_uuids().context("while retrieving uuids from file store")? { + let uuid = uuid.context("while retrieving uuid from file store")?; let update_file_path = file_store.get_update_path(uuid); - let update_file = file_store.get_update(uuid)?; + let update_file = file_store + .get_update(uuid) + .with_context(|| format!("while getting update file for uuid {uuid:?}"))?; - let mut file = NamedTempFile::new_in(&update_files_dir_path).map(BufWriter::new)?; + let mut file = + NamedTempFile::new_in(&update_files_dir_path).map(BufWriter::new).with_context( + || format!("while creating bufwriter for update file {update_file_path:?}"), + )?; - let reader = DocumentsBatchReader::from_reader(update_file)?; + let reader = DocumentsBatchReader::from_reader(update_file).with_context(|| { + format!("while creating documents batch reader for update file {update_file_path:?}") + })?; let (mut cursor, index) = reader.into_cursor_and_fields_index(); - while let Some(document) = cursor.next_document()? { + while let Some(document) = cursor.next_document().with_context(|| { + format!( + "while reading documents from batch reader for update file {update_file_path:?}" + ) + })? { let mut json_document = IndexMap::new(); for (fid, value) in document { let field_name = index .name(fid) - .with_context(|| format!("while getting field name for fid {fid}"))?; + .with_context(|| format!("while getting field name for fid {fid} for update file {update_file_path:?}"))?; let value: &RawValue = serde_json::from_slice(value)?; json_document.insert(field_name, value); } @@ -52,11 +65,12 @@ fn convert_update_files(db_path: &Path) -> anyhow::Result<()> { serde_json::to_writer(&mut file, &json_document)?; } - let file = file - .into_inner() - .map_err(|e| e.into_error()) - .context("while flushing update file bufwriter")?; - let _ = file.persist(update_file_path)?; + let file = file.into_inner().map_err(|e| e.into_error()).context(format!( + "while flushing update file bufwriter for update file {update_file_path:?}" + ))?; + let _ = file + .persist(&update_file_path) + .with_context(|| format!("while persisting update file {update_file_path:?}"))?; } Ok(()) From 5c492031d9155139191e1b175259db86f7aead06 Mon Sep 17 00:00:00 2001 From: Many the fish Date: Wed, 11 Dec 2024 14:34:18 +0100 Subject: [PATCH 3/3] Update crates/meilitool/src/upgrade/v1_12.rs Co-authored-by: Louis Dureuil --- crates/meilitool/src/upgrade/v1_12.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/crates/meilitool/src/upgrade/v1_12.rs b/crates/meilitool/src/upgrade/v1_12.rs index 77060d90d..85fb41472 100644 --- a/crates/meilitool/src/upgrade/v1_12.rs +++ b/crates/meilitool/src/upgrade/v1_12.rs @@ -69,6 +69,7 @@ fn convert_update_files(db_path: &Path) -> anyhow::Result<()> { "while flushing update file bufwriter for update file {update_file_path:?}" ))?; let _ = file + // atomically replace the obkv file with the rewritten NDJSON file .persist(&update_file_path) .with_context(|| format!("while persisting update file {update_file_path:?}"))?; }