diff --git a/crates/index-scheduler/src/batch.rs b/crates/index-scheduler/src/batch.rs
index e23d1c1bb..89c7f6f45 100644
--- a/crates/index-scheduler/src/batch.rs
+++ b/crates/index-scheduler/src/batch.rs
@@ -29,7 +29,7 @@ use bumpalo::Bump;
 use dump::IndexMetadata;
 use meilisearch_types::batches::BatchId;
 use meilisearch_types::heed::{RoTxn, RwTxn};
-use meilisearch_types::milli::documents::{obkv_to_object, DocumentsBatchReader, PrimaryKey};
+use meilisearch_types::milli::documents::PrimaryKey;
 use meilisearch_types::milli::heed::CompactionOption;
 use meilisearch_types::milli::progress::Progress;
 use meilisearch_types::milli::update::new::indexer::{self, UpdateByFunction};
@@ -819,6 +819,13 @@ impl IndexScheduler {
                     t.started_at = Some(started_at);
                     t.finished_at = Some(finished_at);
                 }
+
+                // Patch the task to remove the batch uid, because as of v1.12.5 batches are not persisted.
+                // This prevents referencing *future* batches not actually associated with the task.
+                //
+                // See for details.
+                t.batch_uid = None;
+
                 let mut dump_content_file = dump_tasks.push_task(&t.into())?;

                 // 2.1. Dump the `content_file` associated with the task if there is one and the task is not finished yet.
@@ -829,21 +836,18 @@ impl IndexScheduler {
                     if status == Status::Enqueued {
                         let content_file = self.file_store.get_update(content_file)?;

-                        let reader = DocumentsBatchReader::from_reader(content_file)
-                            .map_err(|e| Error::from_milli(e.into(), None))?;
-
-                        let (mut cursor, documents_batch_index) =
-                            reader.into_cursor_and_fields_index();
-
-                        while let Some(doc) = cursor
-                            .next_document()
-                            .map_err(|e| Error::from_milli(e.into(), None))?
+                        for document in
+                            serde_json::de::Deserializer::from_reader(content_file).into_iter()
                         {
-                            dump_content_file.push_document(
-                                &obkv_to_object(doc, &documents_batch_index)
-                                    .map_err(|e| Error::from_milli(e, None))?,
-                            )?;
+                            let document = document.map_err(|e| {
+                                Error::from_milli(
+                                    milli::InternalError::SerdeJson(e).into(),
+                                    None,
+                                )
+                            })?;
+                            dump_content_file.push_document(&document)?;
                         }
+
+                        dump_content_file.flush()?;
                     }
                 }
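Note on the hunk above: since v1.12, an update's content file is a plain concatenation of JSON objects rather than an obkv batch, so the dump can replay it one document at a time with serde_json's `StreamDeserializer` and no fields index. A standalone sketch of that read loop, with a made-up inline payload standing in for a real content file:

```rust
use std::io::Cursor;

use serde_json::{Map, Value};

fn main() -> Result<(), serde_json::Error> {
    // Post-v1.12 layout: bare JSON objects back to back, no array brackets,
    // no separators. `into_iter` returns serde_json's `StreamDeserializer`,
    // which yields one document per iteration.
    let content_file = Cursor::new(r#"{"id":1,"title":"Dune"}{"id":2,"title":"Carol"}"#);
    for document in
        serde_json::de::Deserializer::from_reader(content_file).into_iter::<Map<String, Value>>()
    {
        let document = document?; // a malformed document stops the dump with an error
        println!("dumping: {document:?}");
    }
    Ok(())
}
```

Because the stream needs no framing between values, the writer side (next hunk) can simply append serialized objects back to back.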
diff --git a/crates/index-scheduler/src/lib.rs b/crates/index-scheduler/src/lib.rs
index f5f73087d..ac51e584a 100644
--- a/crates/index-scheduler/src/lib.rs
+++ b/crates/index-scheduler/src/lib.rs
@@ -55,7 +55,6 @@ use meilisearch_types::features::{InstanceTogglableFeatures, RuntimeTogglableFea
 use meilisearch_types::heed::byteorder::BE;
 use meilisearch_types::heed::types::{SerdeBincode, SerdeJson, Str, I128};
 use meilisearch_types::heed::{self, Database, Env, PutFlags, RoTxn, RwTxn};
-use meilisearch_types::milli::documents::DocumentsBatchBuilder;
 use meilisearch_types::milli::index::IndexEmbeddingConfig;
 use meilisearch_types::milli::update::IndexerConfig;
 use meilisearch_types::milli::vector::{Embedder, EmbedderOptions, EmbeddingConfigs};
@@ -2017,14 +2016,19 @@ impl<'a> Dump<'a> {
         task: TaskDump,
         content_file: Option<Box<UpdateFile>>,
     ) -> Result<Task> {
+        let task_has_no_docs = matches!(task.kind, KindDump::DocumentImport { documents_count, .. } if documents_count == 0);
+
         let content_uuid = match content_file {
             Some(content_file) if task.status == Status::Enqueued => {
-                let (uuid, mut file) = self.index_scheduler.create_update_file(false)?;
-                let mut builder = DocumentsBatchBuilder::new(&mut file);
+                let (uuid, file) = self.index_scheduler.create_update_file(false)?;
+                let mut writer = io::BufWriter::new(file);
                 for doc in content_file {
-                    builder.append_json_object(&doc?)?;
+                    let doc = doc?;
+                    serde_json::to_writer(&mut writer, &doc).map_err(|e| {
+                        Error::from_milli(milli::InternalError::SerdeJson(e).into(), None)
+                    })?;
                 }
-                builder.into_inner()?;
+                let file = writer.into_inner().map_err(|e| e.into_error())?;
                 file.persist()?;

                 Some(uuid)
@@ -2032,6 +2036,12 @@ impl<'a> Dump<'a> {
             // If the task isn't `Enqueued` then just generate a recognisable `Uuid`
             // in case we try to open it later.
             _ if task.status != Status::Enqueued => Some(Uuid::nil()),
+            None if task.status == Status::Enqueued && task_has_no_docs => {
+                let (uuid, file) = self.index_scheduler.create_update_file(false)?;
+                file.persist()?;
+
+                Some(uuid)
+            }
             _ => None,
         };
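The write path mirrors the reader: `DocumentsBatchBuilder` is dropped in favour of plain `serde_json::to_writer` calls behind a `BufWriter`, with `into_inner` flushing the buffer before the file is persisted. The same pattern against an in-memory buffer (illustrative only, not Meilisearch code):

```rust
use std::io::{self, BufWriter};

use serde_json::json;

fn main() -> io::Result<()> {
    // Stand-in for the update file handed out by `create_update_file`.
    let mut writer = BufWriter::new(Vec::new());
    for doc in [json!({"id": 1, "title": "Dune"}), json!({"id": 2, "title": "Carol"})] {
        // Append each document as a bare JSON object, the exact layout the
        // streaming reader above expects.
        serde_json::to_writer(&mut writer, &doc)?;
    }
    // `into_inner` flushes the buffer; only after that is persisting safe.
    let bytes = writer.into_inner().map_err(|e| e.into_error())?;
    assert_eq!(
        String::from_utf8(bytes).unwrap(),
        r#"{"id":1,"title":"Dune"}{"id":2,"title":"Carol"}"#
    );
    Ok(())
}
```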
diff --git a/crates/meilitool/src/main.rs b/crates/meilitool/src/main.rs
index 44eb4960e..743fe552e 100644
--- a/crates/meilitool/src/main.rs
+++ b/crates/meilitool/src/main.rs
@@ -88,7 +88,7 @@ fn main() -> anyhow::Result<()> {
     match command {
         Command::ClearTaskQueue => clear_task_queue(db_path),
         Command::ExportADump { dump_dir, skip_enqueued_tasks } => {
-            export_a_dump(db_path, dump_dir, skip_enqueued_tasks)
+            export_a_dump(db_path, dump_dir, skip_enqueued_tasks, detected_version)
         }
         Command::OfflineUpgrade { target_version } => {
             let target_version = parse_version(&target_version).context("While parsing `--target-version`. Make sure `--target-version` is in the format MAJOR.MINOR.PATCH")?;
@@ -187,6 +187,7 @@ fn export_a_dump(
     db_path: PathBuf,
     dump_dir: PathBuf,
     skip_enqueued_tasks: bool,
+    detected_version: (String, String, String),
 ) -> Result<(), anyhow::Error> {
     let started_at = OffsetDateTime::now_utc();

@@ -238,9 +239,6 @@ fn export_a_dump(
     if skip_enqueued_tasks {
         eprintln!("Skip dumping the enqueued tasks...");
     } else {
-        eprintln!("Dumping the enqueued tasks...");
-
-        // 3. dump the tasks
         let mut dump_tasks = dump.create_tasks_queue()?;
         let mut count = 0;
         for ret in all_tasks.iter(&rtxn)? {
@@ -254,18 +252,39 @@ fn export_a_dump(
                 if status == Status::Enqueued {
                     let content_file = file_store.get_update(content_file_uuid)?;

-                    let reader =
-                        DocumentsBatchReader::from_reader(content_file).with_context(|| {
-                            format!("While reading content file {:?}", content_file_uuid)
-                        })?;
-
-                    let (mut cursor, documents_batch_index) = reader.into_cursor_and_fields_index();
-                    while let Some(doc) = cursor.next_document().with_context(|| {
-                        format!("While iterating on content file {:?}", content_file_uuid)
-                    })? {
-                        dump_content_file
-                            .push_document(&obkv_to_object(doc, &documents_batch_index)?)?;
+                    if (
+                        detected_version.0.as_str(),
+                        detected_version.1.as_str(),
+                        detected_version.2.as_str(),
+                    ) < ("1", "12", "0")
+                    {
+                        eprintln!("Dumping the enqueued tasks reading them in obkv format...");
+                        let reader =
+                            DocumentsBatchReader::from_reader(content_file).with_context(|| {
+                                format!("While reading content file {:?}", content_file_uuid)
+                            })?;
+                        let (mut cursor, documents_batch_index) =
+                            reader.into_cursor_and_fields_index();
+                        while let Some(doc) = cursor.next_document().with_context(|| {
+                            format!("While iterating on content file {:?}", content_file_uuid)
+                        })? {
+                            dump_content_file
+                                .push_document(&obkv_to_object(doc, &documents_batch_index)?)?;
+                        }
+                    } else {
+                        eprintln!(
+                            "Dumping the enqueued tasks reading them in JSON stream format..."
+                        );
+                        for document in
+                            serde_json::de::Deserializer::from_reader(content_file).into_iter()
+                        {
+                            let document = document.with_context(|| {
+                                format!("While reading content file {:?}", content_file_uuid)
+                            })?;
+                            dump_content_file.push_document(&document)?;
+                        }
                     }
+                    dump_content_file.flush()?;
                     count += 1;
                 }
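One caveat in the version gate above: the tuple comparison is between `&str`s, so it orders lexicographically, not numerically. `"9"` compares greater than `"12"`, meaning a database whose VERSION file says 1.9.x would not satisfy `< ("1", "12", "0")` and would be read as a JSON stream even though it predates the format change. The `(String, String, String)` comparisons in the next file share this caveat. A sketch of a numeric comparison that avoids the trap (`version_lt` is a hypothetical helper, not part of the patch):

```rust
// Hypothetical helper: compare a detected version against a threshold
// numerically instead of lexicographically.
fn version_lt(detected: (&str, &str, &str), threshold: (u64, u64, u64)) -> Option<bool> {
    let parsed = (
        detected.0.parse::<u64>().ok()?,
        detected.1.parse::<u64>().ok()?,
        detected.2.parse::<u64>().ok()?,
    );
    Some(parsed < threshold)
}

fn main() {
    // String tuples order lexicographically: '9' > '1', so "9" > "12".
    assert!(!(("1", "9", "0") < ("1", "12", "0")));
    // Numeric comparison gives the intended ordering.
    assert_eq!(version_lt(("1", "9", "0"), (1, 12, 0)), Some(true));
    assert_eq!(version_lt(("1", "12", "5"), (1, 12, 0)), Some(false));
}
```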
diff --git a/crates/meilitool/src/upgrade/mod.rs b/crates/meilitool/src/upgrade/mod.rs
index 47ca2cbd9..2d5230341 100644
--- a/crates/meilitool/src/upgrade/mod.rs
+++ b/crates/meilitool/src/upgrade/mod.rs
@@ -20,6 +20,34 @@ pub struct OfflineUpgrade {

 impl OfflineUpgrade {
     pub fn upgrade(self) -> anyhow::Result<()> {
+        // Adding a version?
+        //
+        // 1. Update LAST_SUPPORTED_UPGRADE_FROM_VERSION and LAST_SUPPORTED_UPGRADE_TO_VERSION.
+        // 2. Add the new version to the upgrade list if necessary.
+        // 3. Use `no_upgrade` as the index for versions that are compatible.
+
+        if self.current_version == self.target_version {
+            println!("Database is already at the target version. Exiting.");
+            return Ok(());
+        }
+
+        if self.current_version > self.target_version {
+            bail!(
+                "Cannot downgrade from {}.{}.{} to {}.{}.{}. Downgrade not supported",
+                self.current_version.0,
+                self.current_version.1,
+                self.current_version.2,
+                self.target_version.0,
+                self.target_version.1,
+                self.target_version.2
+            );
+        }
+
+        const FIRST_SUPPORTED_UPGRADE_FROM_VERSION: &str = "1.9.0";
+        const LAST_SUPPORTED_UPGRADE_FROM_VERSION: &str = "1.12.5";
+        const FIRST_SUPPORTED_UPGRADE_TO_VERSION: &str = "1.10.0";
+        const LAST_SUPPORTED_UPGRADE_TO_VERSION: &str = "1.12.5";
+
         let upgrade_list = [
             (
                 v1_9_to_v1_10 as fn(&Path, &str, &str, &str) -> Result<(), anyhow::Error>,
@@ -32,6 +60,8 @@ impl OfflineUpgrade {
             (v1_12_to_v1_12_3, "1", "12", "3"),
         ];

+        let no_upgrade: usize = upgrade_list.len();
+
         let (current_major, current_minor, current_patch) = &self.current_version;

         let start_at = match (
@@ -42,9 +72,12 @@ impl OfflineUpgrade {
             ("1", "9", _) => 0,
             ("1", "10", _) => 1,
             ("1", "11", _) => 2,
-            ("1", "12", x) if x == "0" || x == "1" || x == "2" => 3,
+            ("1", "12", "0" | "1" | "2") => 3,
+            ("1", "12", "3" | "4" | "5") => no_upgrade,
             _ => {
-                bail!("Unsupported current version {current_major}.{current_minor}.{current_patch}. Can only upgrade from v1.9 and v1.10")
+                bail!("Unsupported current version {current_major}.{current_minor}.{current_patch}. Can only upgrade from versions in range [{}-{}]",
+                    FIRST_SUPPORTED_UPGRADE_FROM_VERSION,
+                    LAST_SUPPORTED_UPGRADE_FROM_VERSION);
             }
         };

@@ -53,18 +86,28 @@ impl OfflineUpgrade {
         let ends_at = match (target_major.as_str(), target_minor.as_str(), target_patch.as_str()) {
             ("1", "10", _) => 0,
             ("1", "11", _) => 1,
-            ("1", "12", x) if x == "0" || x == "1" || x == "2" => 2,
-            ("1", "12", "3") => 3,
+            ("1", "12", "0" | "1" | "2") => 2,
+            ("1", "12", "3" | "4" | "5") => 3,
             (major, _, _) if major.starts_with('v') => {
                 bail!("Target version must not starts with a `v`. Instead of writing `v1.9.0` write `1.9.0` for example.")
             }
             _ => {
-                bail!("Unsupported target version {target_major}.{target_minor}.{target_patch}. Can only upgrade to v1.10 and v1.11")
+                bail!("Unsupported target version {target_major}.{target_minor}.{target_patch}. Can only upgrade to versions in range [{}-{}]",
+                    FIRST_SUPPORTED_UPGRADE_TO_VERSION,
+                    LAST_SUPPORTED_UPGRADE_TO_VERSION);
             }
         };

         println!("Starting the upgrade from {current_major}.{current_minor}.{current_patch} to {target_major}.{target_minor}.{target_patch}");

+        if start_at == no_upgrade {
+            println!("No upgrade operation to perform, writing VERSION file");
+            create_version_file(&self.db_path, target_major, target_minor, target_patch)
+                .context("while writing VERSION file after the upgrade")?;
+            println!("Success");
+            return Ok(());
+        }
+
         #[allow(clippy::needless_range_loop)]
         for index in start_at..=ends_at {
             let (func, major, minor, patch) = upgrade_list[index];
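Finally, the `no_upgrade` sentinel works because it equals `upgrade_list.len()`, one past the last valid index: a `start_at` of `no_upgrade` short-circuits into the VERSION-file rewrite before the `start_at..=ends_at` loop ever runs. A reduced model of that dispatch, with placeholder migrations instead of the real `v1_x_to_v1_y` functions:

```rust
use std::path::Path;

type UpgradeFn = fn(&Path) -> Result<(), String>;

// Placeholder migration standing in for the real upgrade functions.
fn noop(_db: &Path) -> Result<(), String> {
    Ok(())
}

fn upgrade(db_path: &Path, start_at: usize, ends_at: usize) -> Result<(), String> {
    // Each entry upgrades the database to the version it is tagged with.
    let upgrade_list: [(UpgradeFn, &str); 2] = [(noop, "1.10.0"), (noop, "1.11.0")];
    // One past the last index: starting here means "already compatible".
    let no_upgrade = upgrade_list.len();

    if start_at == no_upgrade {
        println!("No upgrade operation to perform, writing VERSION file");
        return Ok(()); // the real code rewrites the VERSION file here
    }
    for (func, version) in &upgrade_list[start_at..=ends_at] {
        func(db_path)?;
        println!("upgraded to {version}");
    }
    Ok(())
}

fn main() -> Result<(), String> {
    upgrade(Path::new("/tmp/db"), 0, 1)?; // runs both migrations in order
    upgrade(Path::new("/tmp/db"), 2, 1) // start_at == no_upgrade: nothing to do
}
```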