From 3c9483b6e0be480c4acbe415b8958a0ecb689072 Mon Sep 17 00:00:00 2001 From: Louis Dureuil Date: Mon, 20 Jan 2025 10:09:02 +0100 Subject: [PATCH] meilitool dumps old-style dump for older DBs, otherwise new-style --- crates/meilitool/src/main.rs | 43 +++++++++++++++++++++++++++--------- 1 file changed, 33 insertions(+), 10 deletions(-) diff --git a/crates/meilitool/src/main.rs b/crates/meilitool/src/main.rs index 0f7702f9d..743fe552e 100644 --- a/crates/meilitool/src/main.rs +++ b/crates/meilitool/src/main.rs @@ -9,6 +9,7 @@ use file_store::FileStore; use meilisearch_auth::AuthController; use meilisearch_types::heed::types::{SerdeJson, Str}; use meilisearch_types::heed::{Database, Env, EnvOpenOptions, RoTxn, RwTxn, Unspecified}; +use meilisearch_types::milli::documents::{obkv_to_object, DocumentsBatchReader}; use meilisearch_types::milli::{obkv_to_json, BEU32}; use meilisearch_types::tasks::{Status, Task}; use meilisearch_types::versioning::{get_version, parse_version}; @@ -87,7 +88,7 @@ fn main() -> anyhow::Result<()> { match command { Command::ClearTaskQueue => clear_task_queue(db_path), Command::ExportADump { dump_dir, skip_enqueued_tasks } => { - export_a_dump(db_path, dump_dir, skip_enqueued_tasks) + export_a_dump(db_path, dump_dir, skip_enqueued_tasks, detected_version) } Command::OfflineUpgrade { target_version } => { let target_version = parse_version(&target_version).context("While parsing `--target-version`. Make sure `--target-version` is in the format MAJOR.MINOR.PATCH")?; @@ -186,6 +187,7 @@ fn export_a_dump( db_path: PathBuf, dump_dir: PathBuf, skip_enqueued_tasks: bool, + detected_version: (String, String, String), ) -> Result<(), anyhow::Error> { let started_at = OffsetDateTime::now_utc(); @@ -237,9 +239,6 @@ fn export_a_dump( if skip_enqueued_tasks { eprintln!("Skip dumping the enqueued tasks..."); } else { - eprintln!("Dumping the enqueued tasks..."); - - // 3. dump the tasks let mut dump_tasks = dump.create_tasks_queue()?; let mut count = 0; for ret in all_tasks.iter(&rtxn)? { @@ -253,13 +252,37 @@ fn export_a_dump( if status == Status::Enqueued { let content_file = file_store.get_update(content_file_uuid)?; - for document in - serde_json::de::Deserializer::from_reader(content_file).into_iter() + if ( + detected_version.0.as_str(), + detected_version.1.as_str(), + detected_version.2.as_str(), + ) < ("1", "12", "0") { - let document = document.with_context(|| { - format!("While reading content file {:?}", content_file_uuid) - })?; - dump_content_file.push_document(&document)?; + eprintln!("Dumping the enqueued tasks reading them in obkv format..."); + let reader = + DocumentsBatchReader::from_reader(content_file).with_context(|| { + format!("While reading content file {:?}", content_file_uuid) + })?; + let (mut cursor, documents_batch_index) = + reader.into_cursor_and_fields_index(); + while let Some(doc) = cursor.next_document().with_context(|| { + format!("While iterating on content file {:?}", content_file_uuid) + })? { + dump_content_file + .push_document(&obkv_to_object(doc, &documents_batch_index)?)?; + } + } else { + eprintln!( + "Dumping the enqueued tasks reading them in JSON stream format..." + ); + for document in + serde_json::de::Deserializer::from_reader(content_file).into_iter() + { + let document = document.with_context(|| { + format!("While reading content file {:?}", content_file_uuid) + })?; + dump_content_file.push_document(&document)?; + } } dump_content_file.flush()?;