From 8a9f952bdab7cd701738c8e27a39ba96f03b0ee7 Mon Sep 17 00:00:00 2001 From: Louis Dureuil Date: Thu, 16 Jan 2025 16:54:05 +0100 Subject: [PATCH 01/10] Create update files in new format --- crates/index-scheduler/src/batch.rs | 23 ++++++++++------------- 1 file changed, 10 insertions(+), 13 deletions(-) diff --git a/crates/index-scheduler/src/batch.rs b/crates/index-scheduler/src/batch.rs index e23d1c1bb..6255bf332 100644 --- a/crates/index-scheduler/src/batch.rs +++ b/crates/index-scheduler/src/batch.rs @@ -829,21 +829,18 @@ impl IndexScheduler { if status == Status::Enqueued { let content_file = self.file_store.get_update(content_file)?; - let reader = DocumentsBatchReader::from_reader(content_file) - .map_err(|e| Error::from_milli(e.into(), None))?; - - let (mut cursor, documents_batch_index) = - reader.into_cursor_and_fields_index(); - - while let Some(doc) = cursor - .next_document() - .map_err(|e| Error::from_milli(e.into(), None))? + for document in + serde_json::de::Deserializer::from_reader(content_file).into_iter() { - dump_content_file.push_document( - &obkv_to_object(doc, &documents_batch_index) - .map_err(|e| Error::from_milli(e, None))?, - )?; + let document = document.map_err(|e| { + Error::from_milli( + milli::InternalError::SerdeJson(e).into(), + None, + ) + })?; + dump_content_file.push_document(&document)?; } + dump_content_file.flush()?; } } From 6383f8f19e6979299dc47ac35ef49db41979e224 Mon Sep 17 00:00:00 2001 From: Louis Dureuil Date: Thu, 16 Jan 2025 16:54:44 +0100 Subject: [PATCH 02/10] Do not explode on missing content file if the task has no docs --- crates/index-scheduler/src/lib.rs | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/crates/index-scheduler/src/lib.rs b/crates/index-scheduler/src/lib.rs index f5f73087d..8a789da57 100644 --- a/crates/index-scheduler/src/lib.rs +++ b/crates/index-scheduler/src/lib.rs @@ -2017,6 +2017,11 @@ impl<'a> Dump<'a> { task: TaskDump, content_file: Option>, ) -> Result { + let 
task_has_no_docs = match task.kind { + KindDump::DocumentImport { documents_count, .. } if documents_count == 0 => true, + _ => false, + }; + let content_uuid = match content_file { Some(content_file) if task.status == Status::Enqueued => { let (uuid, mut file) = self.index_scheduler.create_update_file(false)?; @@ -2032,6 +2037,14 @@ impl<'a> Dump<'a> { // If the task isn't `Enqueued` then just generate a recognisable `Uuid` // in case we try to open it later. _ if task.status != Status::Enqueued => Some(Uuid::nil()), + None if task.status == Status::Enqueued && task_has_no_docs => { + let (uuid, mut file) = self.index_scheduler.create_update_file(false)?; + let builder = DocumentsBatchBuilder::new(&mut file); + builder.into_inner()?; + file.persist()?; + + Some(uuid) + } _ => None, }; From 1cadab9ad8d87a3cf3716fd6359a04c379f63c60 Mon Sep 17 00:00:00 2001 From: Louis Dureuil Date: Thu, 16 Jan 2025 16:55:04 +0100 Subject: [PATCH 03/10] Also fix dump import from meilitool --- crates/meilitool/src/main.rs | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/crates/meilitool/src/main.rs b/crates/meilitool/src/main.rs index 44eb4960e..053e83b96 100644 --- a/crates/meilitool/src/main.rs +++ b/crates/meilitool/src/main.rs @@ -254,18 +254,15 @@ fn export_a_dump( if status == Status::Enqueued { let content_file = file_store.get_update(content_file_uuid)?; - let reader = - DocumentsBatchReader::from_reader(content_file).with_context(|| { + for document in + serde_json::de::Deserializer::from_reader(content_file).into_iter() + { + let document = document.with_context(|| { format!("While reading content file {:?}", content_file_uuid) })?; - - let (mut cursor, documents_batch_index) = reader.into_cursor_and_fields_index(); - while let Some(doc) = cursor.next_document().with_context(|| { - format!("While iterating on content file {:?}", content_file_uuid) - })? 
{ - dump_content_file - .push_document(&obkv_to_object(doc, &documents_batch_index)?)?; + dump_content_file.push_document(&document)?; } + dump_content_file.flush()?; count += 1; } From cea0c89212ac9ed8b1e0ef4cb531a58c422fb542 Mon Sep 17 00:00:00 2001 From: ManyTheFish Date: Thu, 16 Jan 2025 18:05:29 +0100 Subject: [PATCH 04/10] Change format of update file when importing dump --- crates/index-scheduler/src/batch.rs | 2 +- crates/index-scheduler/src/lib.rs | 11 +++++++---- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/crates/index-scheduler/src/batch.rs b/crates/index-scheduler/src/batch.rs index 6255bf332..3d45ce1fe 100644 --- a/crates/index-scheduler/src/batch.rs +++ b/crates/index-scheduler/src/batch.rs @@ -29,7 +29,7 @@ use bumpalo::Bump; use dump::IndexMetadata; use meilisearch_types::batches::BatchId; use meilisearch_types::heed::{RoTxn, RwTxn}; -use meilisearch_types::milli::documents::{obkv_to_object, DocumentsBatchReader, PrimaryKey}; +use meilisearch_types::milli::documents::PrimaryKey; use meilisearch_types::milli::heed::CompactionOption; use meilisearch_types::milli::progress::Progress; use meilisearch_types::milli::update::new::indexer::{self, UpdateByFunction}; diff --git a/crates/index-scheduler/src/lib.rs b/crates/index-scheduler/src/lib.rs index 8a789da57..ca959d74c 100644 --- a/crates/index-scheduler/src/lib.rs +++ b/crates/index-scheduler/src/lib.rs @@ -2024,12 +2024,15 @@ impl<'a> Dump<'a> { let content_uuid = match content_file { Some(content_file) if task.status == Status::Enqueued => { - let (uuid, mut file) = self.index_scheduler.create_update_file(false)?; - let mut builder = DocumentsBatchBuilder::new(&mut file); + let (uuid, file) = self.index_scheduler.create_update_file(false)?; + let mut writer = io::BufWriter::new(file); for doc in content_file { - builder.append_json_object(&doc?)?; + let doc = doc?; + serde_json::to_writer(&mut writer, &doc).map_err(|e| { + Error::from_milli(milli::InternalError::SerdeJson(e).into(), 
None) + })?; } - builder.into_inner()?; + let file = writer.into_inner().map_err(|e| e.into_error())?; file.persist()?; Some(uuid) From 289eb92bef4d382cf08821a44fecac6b582dc444 Mon Sep 17 00:00:00 2001 From: Louis Dureuil Date: Thu, 16 Jan 2025 18:28:32 +0100 Subject: [PATCH 05/10] Fix warnings --- crates/index-scheduler/src/lib.rs | 1 - crates/meilitool/src/main.rs | 1 - 2 files changed, 2 deletions(-) diff --git a/crates/index-scheduler/src/lib.rs b/crates/index-scheduler/src/lib.rs index ca959d74c..b493b50cd 100644 --- a/crates/index-scheduler/src/lib.rs +++ b/crates/index-scheduler/src/lib.rs @@ -55,7 +55,6 @@ use meilisearch_types::features::{InstanceTogglableFeatures, RuntimeTogglableFea use meilisearch_types::heed::byteorder::BE; use meilisearch_types::heed::types::{SerdeBincode, SerdeJson, Str, I128}; use meilisearch_types::heed::{self, Database, Env, PutFlags, RoTxn, RwTxn}; -use meilisearch_types::milli::documents::DocumentsBatchBuilder; use meilisearch_types::milli::index::IndexEmbeddingConfig; use meilisearch_types::milli::update::IndexerConfig; use meilisearch_types::milli::vector::{Embedder, EmbedderOptions, EmbeddingConfigs}; diff --git a/crates/meilitool/src/main.rs b/crates/meilitool/src/main.rs index 053e83b96..0f7702f9d 100644 --- a/crates/meilitool/src/main.rs +++ b/crates/meilitool/src/main.rs @@ -9,7 +9,6 @@ use file_store::FileStore; use meilisearch_auth::AuthController; use meilisearch_types::heed::types::{SerdeJson, Str}; use meilisearch_types::heed::{Database, Env, EnvOpenOptions, RoTxn, RwTxn, Unspecified}; -use meilisearch_types::milli::documents::{obkv_to_object, DocumentsBatchReader}; use meilisearch_types::milli::{obkv_to_json, BEU32}; use meilisearch_types::tasks::{Status, Task}; use meilisearch_types::versioning::{get_version, parse_version}; From 11458eefd9adbe3f672ed3a35edcd2b06e99b8e1 Mon Sep 17 00:00:00 2001 From: Louis Dureuil Date: Thu, 16 Jan 2025 18:28:43 +0100 Subject: [PATCH 06/10] Handle empty payloads --- 
crates/index-scheduler/src/lib.rs | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/crates/index-scheduler/src/lib.rs b/crates/index-scheduler/src/lib.rs index b493b50cd..ac51e584a 100644 --- a/crates/index-scheduler/src/lib.rs +++ b/crates/index-scheduler/src/lib.rs @@ -2016,10 +2016,7 @@ impl<'a> Dump<'a> { task: TaskDump, content_file: Option>, ) -> Result { - let task_has_no_docs = match task.kind { - KindDump::DocumentImport { documents_count, .. } if documents_count == 0 => true, - _ => false, - }; + let task_has_no_docs = matches!(task.kind, KindDump::DocumentImport { documents_count, .. } if documents_count == 0); let content_uuid = match content_file { Some(content_file) if task.status == Status::Enqueued => { @@ -2040,9 +2037,7 @@ impl<'a> Dump<'a> { // in case we try to open it later. _ if task.status != Status::Enqueued => Some(Uuid::nil()), None if task.status == Status::Enqueued && task_has_no_docs => { - let (uuid, mut file) = self.index_scheduler.create_update_file(false)?; - let builder = DocumentsBatchBuilder::new(&mut file); - builder.into_inner()?; + let (uuid, file) = self.index_scheduler.create_update_file(false)?; file.persist()?; Some(uuid) From 3c9483b6e0be480c4acbe415b8958a0ecb689072 Mon Sep 17 00:00:00 2001 From: Louis Dureuil Date: Mon, 20 Jan 2025 10:09:02 +0100 Subject: [PATCH 07/10] meilitool dumps old-style dump for older DBs, otherwise new-style --- crates/meilitool/src/main.rs | 43 +++++++++++++++++++++++++++--------- 1 file changed, 33 insertions(+), 10 deletions(-) diff --git a/crates/meilitool/src/main.rs b/crates/meilitool/src/main.rs index 0f7702f9d..743fe552e 100644 --- a/crates/meilitool/src/main.rs +++ b/crates/meilitool/src/main.rs @@ -9,6 +9,7 @@ use file_store::FileStore; use meilisearch_auth::AuthController; use meilisearch_types::heed::types::{SerdeJson, Str}; use meilisearch_types::heed::{Database, Env, EnvOpenOptions, RoTxn, RwTxn, Unspecified}; +use 
meilisearch_types::milli::documents::{obkv_to_object, DocumentsBatchReader}; use meilisearch_types::milli::{obkv_to_json, BEU32}; use meilisearch_types::tasks::{Status, Task}; use meilisearch_types::versioning::{get_version, parse_version}; @@ -87,7 +88,7 @@ fn main() -> anyhow::Result<()> { match command { Command::ClearTaskQueue => clear_task_queue(db_path), Command::ExportADump { dump_dir, skip_enqueued_tasks } => { - export_a_dump(db_path, dump_dir, skip_enqueued_tasks) + export_a_dump(db_path, dump_dir, skip_enqueued_tasks, detected_version) } Command::OfflineUpgrade { target_version } => { let target_version = parse_version(&target_version).context("While parsing `--target-version`. Make sure `--target-version` is in the format MAJOR.MINOR.PATCH")?; @@ -186,6 +187,7 @@ fn export_a_dump( db_path: PathBuf, dump_dir: PathBuf, skip_enqueued_tasks: bool, + detected_version: (String, String, String), ) -> Result<(), anyhow::Error> { let started_at = OffsetDateTime::now_utc(); @@ -237,9 +239,6 @@ fn export_a_dump( if skip_enqueued_tasks { eprintln!("Skip dumping the enqueued tasks..."); } else { - eprintln!("Dumping the enqueued tasks..."); - - // 3. dump the tasks let mut dump_tasks = dump.create_tasks_queue()?; let mut count = 0; for ret in all_tasks.iter(&rtxn)? 
{ @@ -253,13 +252,37 @@ fn export_a_dump( if status == Status::Enqueued { let content_file = file_store.get_update(content_file_uuid)?; - for document in - serde_json::de::Deserializer::from_reader(content_file).into_iter() + if ( + detected_version.0.as_str(), + detected_version.1.as_str(), + detected_version.2.as_str(), + ) < ("1", "12", "0") { - let document = document.with_context(|| { - format!("While reading content file {:?}", content_file_uuid) - })?; - dump_content_file.push_document(&document)?; + eprintln!("Dumping the enqueued tasks reading them in obkv format..."); + let reader = + DocumentsBatchReader::from_reader(content_file).with_context(|| { + format!("While reading content file {:?}", content_file_uuid) + })?; + let (mut cursor, documents_batch_index) = + reader.into_cursor_and_fields_index(); + while let Some(doc) = cursor.next_document().with_context(|| { + format!("While iterating on content file {:?}", content_file_uuid) + })? { + dump_content_file + .push_document(&obkv_to_object(doc, &documents_batch_index)?)?; + } + } else { + eprintln!( + "Dumping the enqueued tasks reading them in JSON stream format..." 
+ ); + for document in + serde_json::de::Deserializer::from_reader(content_file).into_iter() + { + let document = document.with_context(|| { + format!("While reading content file {:?}", content_file_uuid) + })?; + dump_content_file.push_document(&document)?; + } } dump_content_file.flush()?; From 34d8c1a903dc7f926291118c5286f399ffdc9f57 Mon Sep 17 00:00:00 2001 From: Louis Dureuil Date: Mon, 20 Jan 2025 10:32:20 +0100 Subject: [PATCH 08/10] Make offline upgrade more flexible --- crates/meilitool/src/upgrade/mod.rs | 49 +++++++++++++++++++++++++++-- 1 file changed, 46 insertions(+), 3 deletions(-) diff --git a/crates/meilitool/src/upgrade/mod.rs b/crates/meilitool/src/upgrade/mod.rs index 47ca2cbd9..51cb5f454 100644 --- a/crates/meilitool/src/upgrade/mod.rs +++ b/crates/meilitool/src/upgrade/mod.rs @@ -20,6 +20,34 @@ pub struct OfflineUpgrade { impl OfflineUpgrade { pub fn upgrade(self) -> anyhow::Result<()> { + // Adding a version? + // + // 1. Update the LAST_SUPPORTED_UPGRADE_FROM_VERSION and LAST_SUPPORTED_UPGRADE_TO_VERSION. + // 2. Add new version to the upgrade list if necessary + // 3. Use `no_upgrade` as index for versions that are compatible. + + if self.current_version == self.target_version { + println!("Database is already at the target version. Exiting."); + return Ok(()); + } + + if self.current_version > self.target_version { + bail!( + "Cannot downgrade from {}.{}.{} to {}.{}.{}. 
Downgrade not supported", + self.current_version.0, + self.current_version.1, + self.current_version.2, + self.target_version.0, + self.target_version.1, + self.target_version.2 + ); + } + + const FIRST_SUPPORTED_UPGRADE_FROM_VERSION: &str = "1.9.0"; + const LAST_SUPPORTED_UPGRADE_FROM_VERSION: &str = "1.12.5"; + const FIRST_SUPPORTED_UPGRADE_TO_VERSION: &str = "1.10.0"; + const LAST_SUPPORTED_UPGRADE_TO_VERSION: &str = "1.12.5"; + let upgrade_list = [ ( v1_9_to_v1_10 as fn(&Path, &str, &str, &str) -> Result<(), anyhow::Error>, @@ -32,6 +60,8 @@ impl OfflineUpgrade { (v1_12_to_v1_12_3, "1", "12", "3"), ]; + let no_upgrade: usize = upgrade_list.len(); + let (current_major, current_minor, current_patch) = &self.current_version; let start_at = match ( @@ -43,8 +73,11 @@ impl OfflineUpgrade { ("1", "10", _) => 1, ("1", "11", _) => 2, ("1", "12", x) if x == "0" || x == "1" || x == "2" => 3, + ("1", "12", x) if x == "3" || x == "4" || x == "5" => no_upgrade, _ => { - bail!("Unsupported current version {current_major}.{current_minor}.{current_patch}. Can only upgrade from v1.9 and v1.10") + bail!("Unsupported current version {current_major}.{current_minor}.{current_patch}. Can only upgrade from versions in range [{}-{}]", + FIRST_SUPPORTED_UPGRADE_FROM_VERSION, + LAST_SUPPORTED_UPGRADE_FROM_VERSION); } }; @@ -54,17 +87,27 @@ impl OfflineUpgrade { ("1", "10", _) => 0, ("1", "11", _) => 1, ("1", "12", x) if x == "0" || x == "1" || x == "2" => 2, - ("1", "12", "3") => 3, + ("1", "12", x) if x == "3" || x == "4" || x == "5" => 3, (major, _, _) if major.starts_with('v') => { bail!("Target version must not starts with a `v`. Instead of writing `v1.9.0` write `1.9.0` for example.") } _ => { - bail!("Unsupported target version {target_major}.{target_minor}.{target_patch}. Can only upgrade to v1.10 and v1.11") + bail!("Unsupported target version {target_major}.{target_minor}.{target_patch}. 
Can only upgrade to versions in range [{}-{}]", + FIRST_SUPPORTED_UPGRADE_TO_VERSION, + LAST_SUPPORTED_UPGRADE_TO_VERSION); } }; println!("Starting the upgrade from {current_major}.{current_minor}.{current_patch} to {target_major}.{target_minor}.{target_patch}"); + if start_at == no_upgrade { + println!("No upgrade operation to perform, writing VERSION file"); + create_version_file(&self.db_path, target_major, target_minor, target_patch) + .context("while writing VERSION file after the upgrade")?; + println!("Success"); + return Ok(()); + } + #[allow(clippy::needless_range_loop)] for index in start_at..=ends_at { let (func, major, minor, patch) = upgrade_list[index]; From 40f8c0d84032212f41be7f2fec52549696b6aa6c Mon Sep 17 00:00:00 2001 From: Louis Dureuil Date: Mon, 20 Jan 2025 11:16:18 +0100 Subject: [PATCH 09/10] Remove batch ids on export --- crates/index-scheduler/src/batch.rs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/crates/index-scheduler/src/batch.rs b/crates/index-scheduler/src/batch.rs index 3d45ce1fe..89c7f6f45 100644 --- a/crates/index-scheduler/src/batch.rs +++ b/crates/index-scheduler/src/batch.rs @@ -819,6 +819,13 @@ impl IndexScheduler { t.started_at = Some(started_at); t.finished_at = Some(finished_at); } + + // Patch the task to remove the batch uid, because as of v1.12.5 batches are not persisted. + // This prevents referencing *future* batches not actually associated with the task. + // + // See for details. + t.batch_uid = None; + let mut dump_content_file = dump_tasks.push_task(&t.into())?; // 2.1. Dump the `content_file` associated with the task if there is one and the task is not finished yet. 
From c55891f73bdb1b47827a8dfcba3528262f5ff50d Mon Sep 17 00:00:00 2001 From: Louis Dureuil Date: Mon, 20 Jan 2025 11:45:03 +0100 Subject: [PATCH 10/10] Replace guards by OR patterns Co-authored-by: Tamo --- crates/meilitool/src/upgrade/mod.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/crates/meilitool/src/upgrade/mod.rs b/crates/meilitool/src/upgrade/mod.rs index 51cb5f454..2d5230341 100644 --- a/crates/meilitool/src/upgrade/mod.rs +++ b/crates/meilitool/src/upgrade/mod.rs @@ -72,8 +72,8 @@ impl OfflineUpgrade { ("1", "9", _) => 0, ("1", "10", _) => 1, ("1", "11", _) => 2, - ("1", "12", x) if x == "0" || x == "1" || x == "2" => 3, - ("1", "12", x) if x == "3" || x == "4" || x == "5" => no_upgrade, + ("1", "12", "0" | "1" | "2") => 3, + ("1", "12", "3" | "4" | "5") => no_upgrade, _ => { bail!("Unsupported current version {current_major}.{current_minor}.{current_patch}. Can only upgrade from versions in range [{}-{}]", FIRST_SUPPORTED_UPGRADE_FROM_VERSION, @@ -86,8 +86,8 @@ impl OfflineUpgrade { let ends_at = match (target_major.as_str(), target_minor.as_str(), target_patch.as_str()) { ("1", "10", _) => 0, ("1", "11", _) => 1, - ("1", "12", x) if x == "0" || x == "1" || x == "2" => 2, - ("1", "12", x) if x == "3" || x == "4" || x == "5" => 3, + ("1", "12", "0" | "1" | "2") => 2, + ("1", "12", "3" | "4" | "5") => 3, (major, _, _) if major.starts_with('v') => { bail!("Target version must not starts with a `v`. Instead of writing `v1.9.0` write `1.9.0` for example.") }