From fa00b42c93b5f5ac152300960b6024de870d127d Mon Sep 17 00:00:00 2001
From: Tamo
Date: Tue, 4 Feb 2025 11:13:29 +0100
Subject: [PATCH 1/6] fix the missing batch in the dumps in meilisearch and
 meilitools

---
 crates/dump/README.md                         |  10 +-
 crates/dump/src/lib.rs                        |  37 +++++-
 crates/dump/src/reader/mod.rs                 |  39 ++++++
 crates/dump/src/reader/v6/mod.rs              |  24 +++-
 crates/dump/src/writer.rs                     |  49 +++++++-
 crates/index-scheduler/src/dump.rs            |  88 +++++++++++++
 crates/index-scheduler/src/processing.rs      |   1 +
 .../src/scheduler/process_dump_creation.rs    |  46 ++++++-
 crates/meilisearch-types/src/batches.rs       |  18 ++-
 crates/meilisearch/src/lib.rs                 |   8 +-
 crates/meilitool/src/main.rs                  | 117 ++++++++--------
 11 files changed, 367 insertions(+), 70 deletions(-)

diff --git a/crates/dump/README.md b/crates/dump/README.md
index 3537f188e..42d84ec80 100644
--- a/crates/dump/README.md
+++ b/crates/dump/README.md
@@ -10,8 +10,10 @@ dump
 ├── instance-uid.uuid
 ├── keys.jsonl
 ├── metadata.json
-└── tasks
-    ├── update_files
-    │   └── [task_id].jsonl
+├── tasks
+│   ├── update_files
+│   │   └── [task_id].jsonl
+│   └── queue.jsonl
+└── batches
     └── queue.jsonl
-```
\ No newline at end of file
+```
diff --git a/crates/dump/src/lib.rs b/crates/dump/src/lib.rs
index ad2d96e1c..905a6485d 100644
--- a/crates/dump/src/lib.rs
+++ b/crates/dump/src/lib.rs
@@ -228,6 +228,7 @@ pub(crate) mod test {
 
     use big_s::S;
     use maplit::{btreemap, btreeset};
+    use meilisearch_types::batches::{Batch, BatchEnqueuedAt, BatchStats};
     use meilisearch_types::facet_values_sort::FacetValuesSort;
     use meilisearch_types::features::{Network, Remote, RuntimeTogglableFeatures};
     use meilisearch_types::index_uid_pattern::IndexUidPattern;
@@ -235,7 +236,8 @@ pub(crate) mod test {
     use meilisearch_types::milli;
     use meilisearch_types::milli::update::Setting;
     use meilisearch_types::settings::{Checked, FacetingSettings, Settings};
-    use meilisearch_types::tasks::{Details, Status};
+    use meilisearch_types::task_view::DetailsView;
+    use meilisearch_types::tasks::{Details, Kind, Status};
     use serde_json::{json, Map, Value};
     use time::macros::datetime;
     use uuid::Uuid;
@@ -305,6 +307,30 @@ pub(crate) mod test {
         settings.check()
     }
 
+    pub fn create_test_batches() -> Vec<Batch> {
+        vec![Batch {
+            uid: 0,
+            details: DetailsView {
+                received_documents: Some(12),
+                indexed_documents: Some(Some(10)),
+                ..DetailsView::default()
+            },
+            progress: None,
+            stats: BatchStats {
+                total_nb_tasks: 1,
+                status: maplit::btreemap! { Status::Succeeded => 1 },
+                types: maplit::btreemap! { Kind::DocumentAdditionOrUpdate => 1 },
+                index_uids: maplit::btreemap! { "doggo".to_string() => 1 },
+            },
+            enqueued_at: Some(BatchEnqueuedAt {
+                earliest: datetime!(2022-11-11 0:00 UTC),
+                oldest: datetime!(2022-11-11 0:00 UTC),
+            }),
+            started_at: datetime!(2022-11-20 0:00 UTC),
+            finished_at: Some(datetime!(2022-11-21 0:00 UTC)),
+        }]
+    }
+
     pub fn create_test_tasks() -> Vec<(TaskDump, Option<Vec<Document>>)> {
         vec![
             (
@@ -427,6 +453,15 @@ pub(crate) mod test {
         index.flush().unwrap();
         index.settings(&settings).unwrap();
 
+        // ========== pushing the batch queue
+        let batches = create_test_batches();
+
+        let mut batch_queue = dump.create_batches_queue().unwrap();
+        for batch in &batches {
+            batch_queue.push_batch(batch).unwrap();
+        }
+        batch_queue.flush().unwrap();
+
         // ========== pushing the task queue
         let tasks = create_test_tasks();
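The dump stores each queue as a JSON-lines file (`queue.jsonl`): one serialized object per line, appended in order and read back line by line. Below is a minimal, self-contained sketch of that convention — `MiniBatch` is a made-up stand-in for `meilisearch_types::batches::Batch`, and only `serde` (with the derive feature) and `serde_json` are assumed:

```rust
use std::io::{BufRead, BufReader, BufWriter, Write};

use serde::{Deserialize, Serialize};

// Simplified stand-in for the dump's `Batch`; the real type also carries
// details, per-status/per-kind stats, and timestamps.
#[derive(Debug, Serialize, Deserialize)]
struct MiniBatch {
    uid: u32,
    total_nb_tasks: u64,
}

fn main() -> std::io::Result<()> {
    // Write the queue: one JSON object per line, exactly like `queue.jsonl`.
    let mut buf = BufWriter::new(Vec::new());
    for batch in [MiniBatch { uid: 0, total_nb_tasks: 1 }, MiniBatch { uid: 1, total_nb_tasks: 3 }] {
        serde_json::to_writer(&mut buf, &batch)?;
        buf.write_all(b"\n")?;
    }
    let bytes = buf.into_inner()?;

    // Read it back line by line, the same way the v6 reader iterates.
    for line in BufReader::new(&bytes[..]).lines() {
        let batch: MiniBatch = serde_json::from_str(&line?)?;
        println!("{batch:?}");
    }
    Ok(())
}
```

One object per line keeps the format streamable in both directions: neither the writer nor the reader ever has to hold the whole queue in memory.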
{ "doggo".to_string() => 1 }, + }, + enqueued_at: Some(BatchEnqueuedAt { + earliest: datetime!(2022-11-11 0:00 UTC), + oldest: datetime!(2022-11-11 0:00 UTC), + }), + started_at: datetime!(2022-11-20 0:00 UTC), + finished_at: Some(datetime!(2022-11-21 0:00 UTC)), + }] + } + pub fn create_test_tasks() -> Vec<(TaskDump, Option>)> { vec![ ( @@ -427,6 +453,15 @@ pub(crate) mod test { index.flush().unwrap(); index.settings(&settings).unwrap(); + // ========== pushing the batch queue + let batches = create_test_batches(); + + let mut batch_queue = dump.create_batches_queue().unwrap(); + for batch in &batches { + batch_queue.push_batch(batch).unwrap(); + } + batch_queue.flush().unwrap(); + // ========== pushing the task queue let tasks = create_test_tasks(); diff --git a/crates/dump/src/reader/mod.rs b/crates/dump/src/reader/mod.rs index ec74fa4fd..2b4440ab7 100644 --- a/crates/dump/src/reader/mod.rs +++ b/crates/dump/src/reader/mod.rs @@ -102,6 +102,13 @@ impl DumpReader { } } + pub fn batches(&mut self) -> Result> + '_>> { + match self { + DumpReader::Current(current) => Ok(current.batches()), + DumpReader::Compat(_compat) => Ok(Box::new(std::iter::empty())), + } + } + pub fn keys(&mut self) -> Result> + '_>> { match self { DumpReader::Current(current) => Ok(current.keys()), @@ -227,6 +234,10 @@ pub(crate) mod test { insta::assert_snapshot!(dump.date().unwrap(), @"2024-05-16 15:51:34.151044 +00:00:00"); insta::assert_debug_snapshot!(dump.instance_uid().unwrap(), @"None"); + // batches didn't exists at the time + let batches = dump.batches().unwrap().collect::>>().unwrap(); + meili_snap::snapshot!(meili_snap::json_string!(batches), @"[]"); + // tasks let tasks = dump.tasks().unwrap().collect::>>().unwrap(); let (tasks, update_files): (Vec<_>, Vec<_>) = tasks.into_iter().unzip(); @@ -348,6 +359,10 @@ pub(crate) mod test { insta::assert_snapshot!(dump.date().unwrap(), @"2023-07-06 7:10:27.21958 +00:00:00"); insta::assert_debug_snapshot!(dump.instance_uid().unwrap(), @"None"); + // batches didn't exists at the time + let batches = dump.batches().unwrap().collect::>>().unwrap(); + meili_snap::snapshot!(meili_snap::json_string!(batches), @"[]"); + // tasks let tasks = dump.tasks().unwrap().collect::>>().unwrap(); let (tasks, update_files): (Vec<_>, Vec<_>) = tasks.into_iter().unzip(); @@ -412,6 +427,10 @@ pub(crate) mod test { insta::assert_snapshot!(dump.date().unwrap(), @"2022-10-04 15:55:10.344982459 +00:00:00"); insta::assert_snapshot!(dump.instance_uid().unwrap().unwrap(), @"9e15e977-f2ae-4761-943f-1eaf75fd736d"); + // batches didn't exists at the time + let batches = dump.batches().unwrap().collect::>>().unwrap(); + meili_snap::snapshot!(meili_snap::json_string!(batches), @"[]"); + // tasks let tasks = dump.tasks().unwrap().collect::>>().unwrap(); let (tasks, update_files): (Vec<_>, Vec<_>) = tasks.into_iter().unzip(); @@ -492,6 +511,10 @@ pub(crate) mod test { insta::assert_snapshot!(dump.date().unwrap(), @"2022-10-06 12:53:49.131989609 +00:00:00"); insta::assert_snapshot!(dump.instance_uid().unwrap().unwrap(), @"9e15e977-f2ae-4761-943f-1eaf75fd736d"); + // batches didn't exists at the time + let batches = dump.batches().unwrap().collect::>>().unwrap(); + meili_snap::snapshot!(meili_snap::json_string!(batches), @"[]"); + // tasks let tasks = dump.tasks().unwrap().collect::>>().unwrap(); let (tasks, update_files): (Vec<_>, Vec<_>) = tasks.into_iter().unzip(); @@ -569,6 +592,10 @@ pub(crate) mod test { insta::assert_snapshot!(dump.date().unwrap(), @"2022-10-07 11:39:03.709153554 +00:00:00"); 
diff --git a/crates/dump/src/reader/v6/mod.rs b/crates/dump/src/reader/v6/mod.rs
index 4c05f16bf..9e0d07c78 100644
--- a/crates/dump/src/reader/v6/mod.rs
+++ b/crates/dump/src/reader/v6/mod.rs
@@ -18,6 +18,7 @@ pub type Checked = meilisearch_types::settings::Checked;
 pub type Unchecked = meilisearch_types::settings::Unchecked;
 
 pub type Task = crate::TaskDump;
+pub type Batch = meilisearch_types::batches::Batch;
 pub type Key = meilisearch_types::keys::Key;
 pub type RuntimeTogglableFeatures = meilisearch_types::features::RuntimeTogglableFeatures;
 pub type Network = meilisearch_types::features::Network;
@@ -49,6 +50,7 @@ pub struct V6Reader {
     instance_uid: Option<Uuid>,
     metadata: Metadata,
     tasks: BufReader<File>,
+    batches: Option<BufReader<File>>,
     keys: BufReader<File>,
     features: Option<RuntimeTogglableFeatures>,
     network: Option<Network>,
@@ -79,6 +81,12 @@ impl V6Reader {
         } else {
             None
         };
+        let batches = match File::open(dump.path().join("batches").join("queue.jsonl")) {
+            Ok(file) => Some(BufReader::new(file)),
+            // The batch file was only introduced in v1.13; anything prior to that won't have batches.
+            Err(err) if err.kind() == ErrorKind::NotFound => None,
+            Err(e) => return Err(e.into()),
+        };
 
         let network_file = match fs::read(dump.path().join("network.json")) {
             Ok(network_file) => Some(network_file),
@@ -101,6 +109,7 @@ impl V6Reader {
             metadata: serde_json::from_reader(&*meta_file)?,
             instance_uid,
             tasks: BufReader::new(File::open(dump.path().join("tasks").join("queue.jsonl"))?),
+            batches,
             keys: BufReader::new(File::open(dump.path().join("keys.jsonl"))?),
             features,
             network,
@@ -144,7 +153,7 @@ impl V6Reader {
         &mut self,
     ) -> Box<dyn Iterator<Item = Result<(Task, Option<Box<super::UpdateFile>>)>> + '_> {
         Box::new((&mut self.tasks).lines().map(|line| -> Result<_> {
-            let task: Task = serde_json::from_str(&line?).unwrap();
+            let task: Task = serde_json::from_str(&line?)?;
 
             let update_file_path = self
                 .dump
@@ -156,8 +165,7 @@ impl V6Reader {
             if update_file_path.exists() {
                 Ok((
                     task,
-                    Some(Box::new(UpdateFile::new(&update_file_path).unwrap())
-                        as Box<super::UpdateFile>),
+                    Some(Box::new(UpdateFile::new(&update_file_path)?) as Box<super::UpdateFile>),
                 ))
             } else {
                 Ok((task, None))
@@ -165,6 +173,16 @@ impl V6Reader {
         }))
     }
 
+    pub fn batches(&mut self) -> Box<dyn Iterator<Item = Result<Batch>> + '_> {
+        match self.batches.as_mut() {
+            Some(batches) => Box::new((batches).lines().map(|line| -> Result<_> {
+                let batch = serde_json::from_str(&line?)?;
+                Ok(batch)
+            })),
+            None => Box::new(std::iter::empty()) as Box<dyn Iterator<Item = Result<Batch>> + '_>,
+        }
+    }
+
     pub fn keys(&mut self) -> Box<dyn Iterator<Item = Result<Key>> + '_> {
         Box::new(
             (&mut self.keys).lines().map(|line| -> Result<_> { Ok(serde_json::from_str(&line?)?) }),
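The key compatibility move in `V6Reader::new` is treating a missing `batches/queue.jsonl` as `None` rather than as an error, since v6 dumps written before v1.13 never contained the file. The same pattern isolated as a helper (a sketch, not the actual Meilisearch API):

```rust
use std::fs::File;
use std::io::{BufReader, ErrorKind};
use std::path::Path;

// A missing file is a legitimate state (older dump), so it maps to
// Ok(None); any other I/O failure is still surfaced as an error.
fn open_optional(path: &Path) -> std::io::Result<Option<BufReader<File>>> {
    match File::open(path) {
        Ok(file) => Ok(Some(BufReader::new(file))),
        Err(err) if err.kind() == ErrorKind::NotFound => Ok(None),
        Err(err) => Err(err),
    }
}

fn main() -> std::io::Result<()> {
    // On a pre-v1.13 dump this returns Ok(None), and the reader then
    // exposes an empty batch iterator to its caller.
    let maybe_queue = open_optional(Path::new("batches/queue.jsonl"))?;
    println!("batch queue present: {}", maybe_queue.is_some());
    Ok(())
}
```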
diff --git a/crates/dump/src/writer.rs b/crates/dump/src/writer.rs
index 923147c63..bfe091ab5 100644
--- a/crates/dump/src/writer.rs
+++ b/crates/dump/src/writer.rs
@@ -4,6 +4,7 @@ use std::path::PathBuf;
 
 use flate2::write::GzEncoder;
 use flate2::Compression;
+use meilisearch_types::batches::Batch;
 use meilisearch_types::features::{Network, RuntimeTogglableFeatures};
 use meilisearch_types::keys::Key;
 use meilisearch_types::settings::{Checked, Settings};
@@ -54,6 +55,10 @@ impl DumpWriter {
         TaskWriter::new(self.dir.path().join("tasks"))
     }
 
+    pub fn create_batches_queue(&self) -> Result<BatchWriter> {
+        BatchWriter::new(self.dir.path().join("batches"))
+    }
+
     pub fn create_experimental_features(&self, features: RuntimeTogglableFeatures) -> Result<()> {
         Ok(std::fs::write(
             self.dir.path().join("experimental-features.json"),
@@ -130,6 +135,30 @@ impl TaskWriter {
     }
 }
 
+pub struct BatchWriter {
+    queue: BufWriter<File>,
+}
+
+impl BatchWriter {
+    pub(crate) fn new(path: PathBuf) -> Result<BatchWriter> {
+        std::fs::create_dir(&path)?;
+        let queue = File::create(path.join("queue.jsonl"))?;
+        Ok(BatchWriter { queue: BufWriter::new(queue) })
+    }
+
+    /// Pushes batches in the dump.
+    pub fn push_batch(&mut self, batch: &Batch) -> Result<()> {
+        self.queue.write_all(&serde_json::to_vec(batch)?)?;
+        self.queue.write_all(b"\n")?;
+        Ok(())
+    }
+
+    pub fn flush(mut self) -> Result<()> {
+        self.queue.flush()?;
+        Ok(())
+    }
+}
+
 pub struct UpdateFile {
     path: PathBuf,
     writer: Option<BufWriter<File>>,
@@ -209,8 +238,8 @@ pub(crate) mod test {
     use super::*;
     use crate::reader::Document;
     use crate::test::{
-        create_test_api_keys, create_test_documents, create_test_dump, create_test_instance_uid,
-        create_test_settings, create_test_tasks,
+        create_test_api_keys, create_test_batches, create_test_documents, create_test_dump,
+        create_test_instance_uid, create_test_settings, create_test_tasks,
     };
 
     fn create_directory_hierarchy(dir: &Path) -> String {
@@ -285,8 +314,10 @@ pub(crate) mod test {
         let dump_path = dump.path();
 
         // ==== checking global file hierarchy (we want to be sure there isn't too many files or too few)
-        insta::assert_snapshot!(create_directory_hierarchy(dump_path), @r###"
+        insta::assert_snapshot!(create_directory_hierarchy(dump_path), @r"
         .
+        ├---- batches/
+        │    └---- queue.jsonl
         ├---- indexes/
         │    └---- doggos/
         │    │    ├---- documents.jsonl
@@ -301,7 +332,7 @@ pub(crate) mod test {
         ├---- keys.jsonl
         ├---- metadata.json
         └---- network.json
-        "###);
+        ");
 
         // ==== checking the top level infos
         let metadata = fs::read_to_string(dump_path.join("metadata.json")).unwrap();
@@ -354,6 +385,16 @@ pub(crate) mod test {
             }
         }
 
+        // ==== checking the batch queue
+        let batches_queue = fs::read_to_string(dump_path.join("batches/queue.jsonl")).unwrap();
+        for (batch, expected) in batches_queue.lines().zip(create_test_batches()) {
+            let mut batch = serde_json::from_str::<Batch>(batch).unwrap();
+            if batch.details.settings == Some(Box::new(Settings::<Unchecked>::default())) {
+                batch.details.settings = None;
+            }
+            assert_eq!(batch, expected, "{batch:#?}{expected:#?}");
+        }
+
         // ==== checking the keys
         let keys = fs::read_to_string(dump_path.join("keys.jsonl")).unwrap();
         for (key, expected) in keys.lines().zip(create_test_api_keys()) {
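Note that `BatchWriter::flush` takes `self` by value, so a flushed queue can never be written to again: the compiler enforces the write-then-flush ordering rather than leaving it to runtime checks. The same shape in miniature (an illustrative `QueueWriter`, assuming only `serde_json` as a dependency):

```rust
use std::io::{BufWriter, Write};

// Generic over any `Write` sink, like the dump's `BufWriter<File>`.
struct QueueWriter<W: Write> {
    queue: BufWriter<W>,
}

impl<W: Write> QueueWriter<W> {
    // Append one JSON value as a line to the queue.
    fn push_line(&mut self, json: &serde_json::Value) -> std::io::Result<()> {
        serde_json::to_writer(&mut self.queue, json)?;
        self.queue.write_all(b"\n")
    }

    // Consuming `self` here means any later `push_line` call on the same
    // writer is a compile error, not a silently lost record.
    fn flush(mut self) -> std::io::Result<()> {
        self.queue.flush()
    }
}

fn main() -> std::io::Result<()> {
    let mut writer = QueueWriter { queue: BufWriter::new(Vec::new()) };
    writer.push_line(&serde_json::json!({ "uid": 0 }))?;
    writer.flush()
    // writer.push_line(...) after this point would not compile.
}
```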
diff --git a/crates/index-scheduler/src/dump.rs b/crates/index-scheduler/src/dump.rs
index 7e0341fcb..ca26e50c8 100644
--- a/crates/index-scheduler/src/dump.rs
+++ b/crates/index-scheduler/src/dump.rs
@@ -2,6 +2,7 @@ use std::collections::HashMap;
 use std::io;
 
 use dump::{KindDump, TaskDump, UpdateFile};
+use meilisearch_types::batches::{Batch, BatchId};
 use meilisearch_types::heed::RwTxn;
 use meilisearch_types::milli;
 use meilisearch_types::tasks::{Kind, KindWithContent, Status, Task};
@@ -14,9 +15,15 @@ pub struct Dump<'a> {
     index_scheduler: &'a IndexScheduler,
     wtxn: RwTxn<'a>,
 
+    batch_to_task_mapping: HashMap<BatchId, RoaringBitmap>,
+
     indexes: HashMap<String, RoaringBitmap>,
     statuses: HashMap<Status, RoaringBitmap>,
     kinds: HashMap<Kind, RoaringBitmap>,
+
+    batch_indexes: HashMap<String, RoaringBitmap>,
+    batch_statuses: HashMap<Status, RoaringBitmap>,
+    batch_kinds: HashMap<Kind, RoaringBitmap>,
 }
 
 impl<'a> Dump<'a> {
@@ -27,12 +34,72 @@ impl<'a> Dump<'a> {
         Ok(Dump {
             index_scheduler,
             wtxn,
+            batch_to_task_mapping: HashMap::new(),
             indexes: HashMap::new(),
             statuses: HashMap::new(),
             kinds: HashMap::new(),
+            batch_indexes: HashMap::new(),
+            batch_statuses: HashMap::new(),
+            batch_kinds: HashMap::new(),
         })
     }
 
+    /// Register a new batch coming from a dump in the scheduler.
+    /// By taking a mutable ref we're pretty sure no one will ever import a dump while actix is running.
+    pub fn register_dumped_batch(&mut self, batch: Batch) -> Result<()> {
+        self.index_scheduler.queue.batches.all_batches.put(&mut self.wtxn, &batch.uid, &batch)?;
+        if let Some(enqueued_at) = batch.enqueued_at {
+            utils::insert_task_datetime(
+                &mut self.wtxn,
+                self.index_scheduler.queue.batches.enqueued_at,
+                enqueued_at.earliest,
+                batch.uid,
+            )?;
+            utils::insert_task_datetime(
+                &mut self.wtxn,
+                self.index_scheduler.queue.batches.enqueued_at,
+                enqueued_at.oldest,
+                batch.uid,
+            )?;
+        }
+        utils::insert_task_datetime(
+            &mut self.wtxn,
+            self.index_scheduler.queue.batches.started_at,
+            batch.started_at,
+            batch.uid,
+        )?;
+        if let Some(finished_at) = batch.finished_at {
+            utils::insert_task_datetime(
+                &mut self.wtxn,
+                self.index_scheduler.queue.batches.finished_at,
+                finished_at,
+                batch.uid,
+            )?;
+        }
+
+        for index in batch.stats.index_uids.keys() {
+            match self.batch_indexes.get_mut(index) {
+                Some(bitmap) => {
+                    bitmap.insert(batch.uid);
+                }
+                None => {
+                    let mut bitmap = RoaringBitmap::new();
+                    bitmap.insert(batch.uid);
+                    self.batch_indexes.insert(index.to_string(), bitmap);
+                }
+            };
+        }
+
+        for status in batch.stats.status.keys() {
+            self.batch_statuses.entry(*status).or_default().insert(batch.uid);
+        }
+        for kind in batch.stats.types.keys() {
+            self.batch_kinds.entry(*kind).or_default().insert(batch.uid);
+        }
+
+        Ok(())
+    }
+
     /// Register a new task coming from a dump in the scheduler.
     /// By taking a mutable ref we're pretty sure no one will ever import a dump while actix is running.
     pub fn register_dumped_task(
@@ -149,6 +216,9 @@ impl<'a> Dump<'a> {
         };
         self.index_scheduler.queue.tasks.all_tasks.put(&mut self.wtxn, &task.uid, &task)?;
+        if let Some(batch_id) = task.batch_uid {
+            self.batch_to_task_mapping.entry(batch_id).or_default().insert(task.uid);
+        }
 
         for index in task.indexes() {
             match self.indexes.get_mut(index) {
@@ -198,6 +268,14 @@ impl<'a> Dump<'a> {
 
     /// Commit all the changes and exit the importing dump state
     pub fn finish(mut self) -> Result<()> {
+        for (batch_id, task_ids) in self.batch_to_task_mapping {
+            self.index_scheduler.queue.batch_to_tasks_mapping.put(
+                &mut self.wtxn,
+                &batch_id,
+                &task_ids,
+            )?;
+        }
+
         for (index, bitmap) in self.indexes {
             self.index_scheduler.queue.tasks.index_tasks.put(&mut self.wtxn, &index, &bitmap)?;
         }
@@ -208,6 +286,16 @@ impl<'a> Dump<'a> {
             self.index_scheduler.queue.tasks.put_kind(&mut self.wtxn, kind, &bitmap)?;
         }
 
+        for (index, bitmap) in self.batch_indexes {
+            self.index_scheduler.queue.batches.index_tasks.put(&mut self.wtxn, &index, &bitmap)?;
+        }
+        for (status, bitmap) in self.batch_statuses {
+            self.index_scheduler.queue.batches.put_status(&mut self.wtxn, status, &bitmap)?;
+        }
+        for (kind, bitmap) in self.batch_kinds {
+            self.index_scheduler.queue.batches.put_kind(&mut self.wtxn, kind, &bitmap)?;
+        }
+
         self.wtxn.commit()?;
         self.index_scheduler.scheduler.wake_up.signal();
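`register_dumped_batch` rebuilds the scheduler's reverse indexes (batches grouped by index uid, by status, and by kind) in memory, and `finish` then persists them in one transaction. The accumulation idiom is `entry(key).or_default().insert(uid)`; here it is in a compilable sketch, with a `BTreeSet` standing in for the `RoaringBitmap` the scheduler actually uses:

```rust
use std::collections::{BTreeSet, HashMap};

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
enum Status {
    Succeeded,
    Failed,
}

fn main() {
    let mut batch_statuses: HashMap<Status, BTreeSet<u32>> = HashMap::new();

    // For every imported batch, record its uid under each status it contains.
    let imported = [(0u32, vec![Status::Succeeded]), (1, vec![Status::Failed, Status::Succeeded])];
    for (batch_uid, statuses) in imported {
        for status in statuses {
            batch_statuses.entry(status).or_default().insert(batch_uid);
        }
    }

    // One lookup now answers "which batches contain a failed task?".
    assert_eq!(batch_statuses[&Status::Failed], BTreeSet::from([1]));
}
```

Buffering everything in memory and writing once in `finish` keeps the import a single commit: either the whole dump lands, or none of it does.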
diff --git a/crates/index-scheduler/src/processing.rs b/crates/index-scheduler/src/processing.rs
index 58f01c770..fed26aeb7 100644
--- a/crates/index-scheduler/src/processing.rs
+++ b/crates/index-scheduler/src/processing.rs
@@ -96,6 +96,7 @@ make_enum_progress! {
         StartTheDumpCreation,
         DumpTheApiKeys,
         DumpTheTasks,
+        DumpTheBatches,
         DumpTheIndexes,
         DumpTheExperimentalFeatures,
         CompressTheDump,
diff --git a/crates/index-scheduler/src/scheduler/process_dump_creation.rs b/crates/index-scheduler/src/scheduler/process_dump_creation.rs
index adf5a5b61..4a1aef44a 100644
--- a/crates/index-scheduler/src/scheduler/process_dump_creation.rs
+++ b/crates/index-scheduler/src/scheduler/process_dump_creation.rs
@@ -1,3 +1,4 @@
+use std::collections::BTreeMap;
 use std::fs::File;
 use std::io::BufWriter;
 use std::sync::atomic::Ordering;
@@ -11,7 +12,9 @@ use meilisearch_types::tasks::{Details, KindWithContent, Status, Task};
 use time::macros::format_description;
 use time::OffsetDateTime;
 
-use crate::processing::{AtomicDocumentStep, AtomicTaskStep, DumpCreationProgress};
+use crate::processing::{
+    AtomicBatchStep, AtomicDocumentStep, AtomicTaskStep, DumpCreationProgress,
+};
 use crate::{Error, IndexScheduler, Result};
 
 impl IndexScheduler {
@@ -102,7 +105,40 @@ impl IndexScheduler {
         }
         dump_tasks.flush()?;
 
-        // 3. Dump the indexes
+        // 3. dump the batches
+        progress.update_progress(DumpCreationProgress::DumpTheBatches);
+        let mut dump_batches = dump.create_batches_queue()?;
+
+        let (atomic, update_batch_progress) =
+            AtomicBatchStep::new(self.queue.batches.all_batches.len(&rtxn)? as u32);
+        progress.update_progress(update_batch_progress);
+
+        for ret in self.queue.batches.all_batches.iter(&rtxn)? {
+            if self.scheduler.must_stop_processing.get() {
+                return Err(Error::AbortedTask);
+            }
+
+            let (_, mut b) = ret?;
+            // In the case we're dumping ourselves, we want to be marked as finished
+            // to not loop over ourselves indefinitely.
+            if b.uid == task.uid {
+                let finished_at = OffsetDateTime::now_utc();
+
+                // We're going to fake the date because we don't know if everything is going to go well.
+                // But we need to dump the task as finished and successful.
+                // If something fails, everything will be set appropriately in the end.
+                let mut statuses = BTreeMap::new();
+                statuses.insert(Status::Succeeded, b.stats.total_nb_tasks);
+                b.stats.status = statuses;
+                b.finished_at = Some(finished_at);
+            }
+
+            dump_batches.push_batch(&b)?;
+            atomic.fetch_add(1, Ordering::Relaxed);
+        }
+        dump_batches.flush()?;
+
+        // 4. Dump the indexes
         progress.update_progress(DumpCreationProgress::DumpTheIndexes);
         let nb_indexes = self.index_mapper.index_mapping.len(&rtxn)? as u32;
         let mut count = 0;
@@ -142,7 +178,7 @@ impl IndexScheduler {
             let documents = index
                 .all_documents(&rtxn)
                 .map_err(|e| Error::from_milli(e, Some(uid.to_string())))?;
-            // 3.1. Dump the documents
+            // 4.1. Dump the documents
             for ret in documents {
                 if self.scheduler.must_stop_processing.get() {
                     return Err(Error::AbortedTask);
@@ -204,7 +240,7 @@ impl IndexScheduler {
                 atomic.fetch_add(1, Ordering::Relaxed);
             }
 
-            // 3.2. Dump the settings
+            // 4.2. Dump the settings
             let settings = meilisearch_types::settings::settings(
                 index,
                 &rtxn,
@@ -215,7 +251,7 @@ impl IndexScheduler {
             Ok(())
         })?;
 
-        // 4. Dump experimental feature settings
+        // 5. Dump experimental feature settings
         progress.update_progress(DumpCreationProgress::DumpTheExperimentalFeatures);
         let features = self.features().runtime_features();
         dump.create_experimental_features(features)?;
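The subtle case in the loop above is the batch containing the dump task itself: it is still `processing` while the dump is being written, so it is rewritten as succeeded with a fake `finished_at` before being pushed — otherwise a restored instance would pick the dump task up again and dump itself forever. A stripped-down sketch of that rewrite, with simplified stand-in types (the real ones carry `time::OffsetDateTime` and full stats):

```rust
use std::collections::BTreeMap;

#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
enum Status {
    Succeeded,
}

struct MiniBatch {
    uid: u32,
    total_nb_tasks: u64,
    status: BTreeMap<Status, u64>,
    finished_at: Option<u64>, // epoch seconds standing in for OffsetDateTime
}

// Rewrite the batch holding the currently running dump task as finished and
// fully succeeded, so the dumped queue never contains an in-flight batch.
fn fake_own_batch_as_finished(batch: &mut MiniBatch, dump_task_uid: u32, now: u64) {
    if batch.uid == dump_task_uid {
        batch.status = BTreeMap::from([(Status::Succeeded, batch.total_nb_tasks)]);
        batch.finished_at = Some(now);
    }
}

fn main() {
    let mut batch =
        MiniBatch { uid: 3, total_nb_tasks: 1, status: BTreeMap::new(), finished_at: None };
    fake_own_batch_as_finished(&mut batch, 3, 1_738_665_600);
    assert_eq!(batch.status.get(&Status::Succeeded), Some(&1));
    assert!(batch.finished_at.is_some());
}
```

If the dump later fails, nothing is lost: the faked values live only in the dumped copy, and the live scheduler sets the real status when the task actually ends.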
diff --git a/crates/meilisearch-types/src/batches.rs b/crates/meilisearch-types/src/batches.rs
index 462d314db..663f5cb8d 100644
--- a/crates/meilisearch-types/src/batches.rs
+++ b/crates/meilisearch-types/src/batches.rs
@@ -30,7 +30,21 @@ pub struct Batch {
     pub enqueued_at: Option<BatchEnqueuedAt>,
 }
 
-#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
+impl PartialEq for Batch {
+    fn eq(&self, other: &Self) -> bool {
+        let Self { uid, progress, details, stats, started_at, finished_at, enqueued_at } = self;
+
+        *uid == other.uid
+            && progress.is_none() == other.progress.is_none()
+            && details == &other.details
+            && stats == &other.stats
+            && started_at == &other.started_at
+            && finished_at == &other.finished_at
+            && enqueued_at == &other.enqueued_at
+    }
+}
+
+#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
 pub struct BatchEnqueuedAt {
     #[serde(with = "time::serde::rfc3339")]
     pub earliest: OffsetDateTime,
@@ -38,7 +52,7 @@ pub struct BatchEnqueuedAt {
     pub oldest: OffsetDateTime,
 }
 
-#[derive(Default, Debug, Clone, Serialize, Deserialize, ToSchema)]
+#[derive(Default, Debug, Clone, PartialEq, Eq, Serialize, Deserialize, ToSchema)]
 #[serde(rename_all = "camelCase")]
 #[schema(rename_all = "camelCase")]
 pub struct BatchStats {
diff --git a/crates/meilisearch/src/lib.rs b/crates/meilisearch/src/lib.rs
index 9b4ee25d6..e22b6dff3 100644
--- a/crates/meilisearch/src/lib.rs
+++ b/crates/meilisearch/src/lib.rs
@@ -571,9 +571,15 @@ fn import_dump(
         index_scheduler.refresh_index_stats(&uid)?;
     }
 
+    // 5. Import the queue
     let mut index_scheduler_dump = index_scheduler.register_dumped_task()?;
+    // 5.1. Import the batches
+    for ret in dump_reader.batches()? {
+        let batch = ret?;
+        index_scheduler_dump.register_dumped_batch(batch)?;
+    }
 
-    // 5. Import the tasks.
+    // 5.2. Import the tasks
     for ret in dump_reader.tasks()? {
         let (task, file) = ret?;
         index_scheduler_dump.register_dumped_task(task, file)?;
diff --git a/crates/meilitool/src/main.rs b/crates/meilitool/src/main.rs
index a30a80d7c..9b3e11ff0 100644
--- a/crates/meilitool/src/main.rs
+++ b/crates/meilitool/src/main.rs
@@ -8,6 +8,7 @@ use clap::{Parser, Subcommand};
 use dump::{DumpWriter, IndexMetadata};
 use file_store::FileStore;
 use meilisearch_auth::AuthController;
+use meilisearch_types::batches::Batch;
 use meilisearch_types::heed::types::{SerdeJson, Str};
 use meilisearch_types::heed::{
     CompactionOption, Database, Env, EnvOpenOptions, RoTxn, RwTxn, Unspecified,
@@ -279,70 +280,86 @@ fn export_a_dump(
 
     eprintln!("Successfully dumped {count} keys!");
 
+    eprintln!("Dumping the queue");
     let rtxn = env.read_txn()?;
     let all_tasks: Database<BEU32, SerdeJson<Task>> =
         try_opening_database(&env, &rtxn, "all-tasks")?;
+    let all_batches: Database<BEU32, SerdeJson<Batch>> =
+        try_opening_database(&env, &rtxn, "all-batches")?;
     let index_mapping: Database<Str, UuidCodec> =
         try_opening_database(&env, &rtxn, "index-mapping")?;
 
-    if skip_enqueued_tasks {
-        eprintln!("Skip dumping the enqueued tasks...");
-    } else {
-        let mut dump_tasks = dump.create_tasks_queue()?;
-        let mut count = 0;
-        for ret in all_tasks.iter(&rtxn)? {
-            let (_, t) = ret?;
-            let status = t.status;
-            let content_file = t.content_uuid();
+    eprintln!("Dumping the tasks");
+    let mut dump_tasks = dump.create_tasks_queue()?;
+    let mut count_tasks = 0;
+    let mut count_enqueued_tasks = 0;
+    for ret in all_tasks.iter(&rtxn)? {
+        let (_, t) = ret?;
+        let status = t.status;
+        let content_file = t.content_uuid();
 
-            let mut dump_content_file = dump_tasks.push_task(&t.into())?;
+        if status == Status::Enqueued && skip_enqueued_tasks {
+            continue;
+        }
 
-            // 3.1. Dump the `content_file` associated with the task if there is one and the task is not finished yet.
-            if let Some(content_file_uuid) = content_file {
-                if status == Status::Enqueued {
-                    let content_file = file_store.get_update(content_file_uuid)?;
+        let mut dump_content_file = dump_tasks.push_task(&t.into())?;
 
-                    if (detected_version.0, detected_version.1, detected_version.2) < (1, 12, 0) {
-                        eprintln!("Dumping the enqueued tasks reading them in obkv format...");
-                        let reader =
-                            DocumentsBatchReader::from_reader(content_file).with_context(|| {
-                                format!("While reading content file {:?}", content_file_uuid)
-                            })?;
-                        let (mut cursor, documents_batch_index) =
-                            reader.into_cursor_and_fields_index();
-                        while let Some(doc) = cursor.next_document().with_context(|| {
-                            format!("While iterating on content file {:?}", content_file_uuid)
-                        })? {
-                            dump_content_file
-                                .push_document(&obkv_to_object(doc, &documents_batch_index)?)?;
-                        }
-                    } else {
-                        eprintln!(
-                            "Dumping the enqueued tasks reading them in JSON stream format..."
-                        );
-                        for document in
-                            serde_json::de::Deserializer::from_reader(content_file).into_iter()
-                        {
-                            let document = document.with_context(|| {
-                                format!("While reading content file {:?}", content_file_uuid)
-                            })?;
-                            dump_content_file.push_document(&document)?;
-                        }
+        // 3.1. Dump the `content_file` associated with the task if there is one and the task is not finished yet.
+        if let Some(content_file_uuid) = content_file {
+            if status == Status::Enqueued {
+                let content_file = file_store.get_update(content_file_uuid)?;
+
+                if (detected_version.0, detected_version.1, detected_version.2) < (1, 12, 0) {
+                    eprintln!("Dumping the enqueued tasks reading them in obkv format...");
+                    let reader =
+                        DocumentsBatchReader::from_reader(content_file).with_context(|| {
+                            format!("While reading content file {:?}", content_file_uuid)
+                        })?;
+                    let (mut cursor, documents_batch_index) = reader.into_cursor_and_fields_index();
+                    while let Some(doc) = cursor.next_document().with_context(|| {
+                        format!("While iterating on content file {:?}", content_file_uuid)
+                    })? {
+                        dump_content_file
+                            .push_document(&obkv_to_object(doc, &documents_batch_index)?)?;
+                    }
+                } else {
+                    eprintln!("Dumping the enqueued tasks reading them in JSON stream format...");
+                    for document in
+                        serde_json::de::Deserializer::from_reader(content_file).into_iter()
+                    {
+                        let document = document.with_context(|| {
+                            format!("While reading content file {:?}", content_file_uuid)
+                        })?;
+                        dump_content_file.push_document(&document)?;
                     }
-
-                    dump_content_file.flush()?;
-                    count += 1;
                 }
+
+                dump_content_file.flush()?;
+                count_enqueued_tasks += 1;
             }
         }
-        dump_tasks.flush()?;
-
-        eprintln!("Successfully dumped {count} enqueued tasks!");
+        count_tasks += 1;
     }
+    dump_tasks.flush()?;
+    eprintln!(
+        "Successfully dumped {count_tasks} tasks including {count_enqueued_tasks} enqueued tasks!"
+    );
 
+    // 4. dump the batches
+    eprintln!("Dumping the batches");
+    let mut dump_batches = dump.create_batches_queue()?;
+    let mut count = 0;
+
+    for ret in all_batches.iter(&rtxn)? {
+        let (_, b) = ret?;
+        dump_batches.push_batch(&b)?;
+        count += 1;
+    }
+    dump_batches.flush()?;
+    eprintln!("Successfully dumped {count} batches!");
+
+    // 5. Dump the indexes
     eprintln!("Dumping the indexes...");
-    // 4.
Dump the indexes let mut count = 0; for result in index_mapping.iter(&rtxn)? { let (uid, uuid) = result?; @@ -363,14 +380,14 @@ fn export_a_dump( let fields_ids_map = index.fields_ids_map(&rtxn)?; let all_fields: Vec<_> = fields_ids_map.iter().map(|(id, _)| id).collect(); - // 4.1. Dump the documents + // 5.1. Dump the documents for ret in index.all_documents(&rtxn)? { let (_id, doc) = ret?; let document = obkv_to_json(&all_fields, &fields_ids_map, doc)?; index_dumper.push_document(&document)?; } - // 4.2. Dump the settings + // 5.2. Dump the settings let settings = meilisearch_types::settings::settings( &index, &rtxn, From 80198aa8552693f52bf2d721509af9b70069bbd1 Mon Sep 17 00:00:00 2001 From: Tamo Date: Tue, 4 Feb 2025 11:49:11 +0100 Subject: [PATCH 2/6] add a dump test with batches and enqueued tasks --- ...v6_v1.13.0_batches_and_enqueued_tasks.dump | Bin 0 -> 1706 bytes crates/meilisearch/tests/common/server.rs | 4 + crates/meilisearch/tests/dumps/data.rs | 5 ++ crates/meilisearch/tests/dumps/mod.rs | 55 ++++++++++++ .../batches.snap | 78 ++++++++++++++++++ .../tasks.snap | 78 ++++++++++++++++++ 6 files changed, 220 insertions(+) create mode 100644 crates/meilisearch/tests/assets/v6_v1.13.0_batches_and_enqueued_tasks.dump create mode 100644 crates/meilisearch/tests/dumps/snapshots/mod.rs/import_dump_v6_containing_batches_and_enqueued_tasks/batches.snap create mode 100644 crates/meilisearch/tests/dumps/snapshots/mod.rs/import_dump_v6_containing_batches_and_enqueued_tasks/tasks.snap diff --git a/crates/meilisearch/tests/assets/v6_v1.13.0_batches_and_enqueued_tasks.dump b/crates/meilisearch/tests/assets/v6_v1.13.0_batches_and_enqueued_tasks.dump new file mode 100644 index 0000000000000000000000000000000000000000..a1816b79a61b0eaf7612974cc3304db694f1bb3a GIT binary patch literal 1706 zcmV;b237eViwFP!00000|Lt2}Z`(Ey*YiGwr^mu}#6OZOzqJj51;f^&O*){%V9*kk zaFIn#qSkmpzWa`(<=Al?r>U*2t$q)YMII^gjy&EUW_aWr9G*wL(D&i#q0skMpA8&D zp@alJJau_U5pjH{1=N*dWv}OaDVm8i&#wghy5Oal@m#TNkaDK#l2^k&E15fcgNHmX zpmeMJ5yAK5k0|wt(+U&6bNS!6GpO^FUmy6I&YG zt*K2_RmU;s35Vtgny7ifO}WSue*L-xTf!U?y3Q1xi^SAl@l=$$+j^4`hMi&00sD?+jmoui|X<*>IhRjB6Oswd2ZIUp}uiBD>K2X?cec8 z7)60;DYN8Bl()A(EZB!u+2@u#=HhRjEYD0sV0f@z_=Sv47G1qe;D82RzbVg)gz1C( zxjgsB7!7gg2h_tM-Nkc5O|eIV5en$%@Z5gK9|gpVLh=~T?^6QLoeukh((BDRES_`z z6HL4FzXcfiAJs;0Oj3~@!W4Xl{Bam|_Fo&=l>ZUlw;8z0|A(_?!~e&f|KAG0Tm!QL zgZ&~e;Pe%_;+fl>vEL2s{y(Jmw{!o)uHV@IIPijQ|F?iU|Ib7M&h<-PXW5@Pd0Cb6t@Wd_BvOn#b}p-keX1XNk`dn`ZR z>M`DVt&`*-w{@*_muxoOyMy_hOWEMhlT9=HZ%i~#**X50MnjA+9Z@eJD=Z|BvCzIh zzP9UGv8n)oZ9w`vRPZV;MPbX`^NRO`>M7Z)AVLsd_R><$dg_wvA7Fpvy3d@=pilRh zr>xFY@4cmb`S!H;GYF$^2kgQ^h;Z!t1D{60fcjxH7%?9W!eGRba1;blGy-B~@)de4 zuKlF@N5ef+_(H!!CQLx?Yr<4BStBptFMCi&AXtYKN26dwj7G6G^wX$s`&*cj%Cg7u zxw;hj1+avvo~U}cR6W)(OG3WRaCq2z&1(JQ$KJ6tG%y2cyOB9GDC%h@;zyYJGMR~d zgQpnWwFI+ZfYTB1l%RMpWhfdDpHUA@(Foyqou@DM#ZE#&U0^?a8awZ(Z_W2%ZRRnb z*jYp&o_YzP0~V6$fKEv`h?o})IEy2e5*$ZU;@;nauzs6$Y`b~FcEWf}(2xLILO5M^ z97GFZfkMTVPlGAF{po?LP&zq4vRG0{39zkShK{SWv^V#C+3-AN>K&+O7Z%m}+@ZLf^MpV``oX+(q7IQe0 z888H#P;>hs*DOeEum=K1BG;7f#XtN6h6B{b=af0uzumD9U3`*@^2m-uckm`(ndY)= zowGolLM%#eUESx5U4LL%&8xS(JkjM4E%H3eNAqrLU(Ge~3z!7Uic4m^;euU=d>O!j zj?VP8l%Ir&IKi^kzVK|ylZ2PeO(C;HUe`*d=`U5ot4zi$gQ>19DqCenYF&ngH$@5B z=ejAg)+)^9On^n)-6K6H*7g4~lDOB9z%%l{es}+`9oYOYYVyA}PTsj}-woFHKV&x; zc=oJS`ww2o=Kp^LUH-QXEK)zq1lc0xrDINGts0h*q3wy%z{A7~X|xg;+M2l|kkq^sWF65gB^A5a^CgLV9ab zE1(dz>2~(lgw?d^u!cX*{;|{k-^~AY_rF@gQ}X0ahYlS&bol1*FWU#kjsQde05Il! 
ALjV8( literal 0 HcmV?d00001 diff --git a/crates/meilisearch/tests/common/server.rs b/crates/meilisearch/tests/common/server.rs index c017a060c..c9ab3dced 100644 --- a/crates/meilisearch/tests/common/server.rs +++ b/crates/meilisearch/tests/common/server.rs @@ -163,6 +163,10 @@ impl Server { self.service.get("/tasks").await } + pub async fn batches(&self) -> (Value, StatusCode) { + self.service.get("/batches").await + } + pub async fn set_features(&self, value: Value) -> (Value, StatusCode) { self.service.patch("/experimental-features", value).await } diff --git a/crates/meilisearch/tests/dumps/data.rs b/crates/meilisearch/tests/dumps/data.rs index d353aaf1d..cb46aa41f 100644 --- a/crates/meilisearch/tests/dumps/data.rs +++ b/crates/meilisearch/tests/dumps/data.rs @@ -22,6 +22,7 @@ pub enum GetDump { TestV5, TestV6WithExperimental, + TestV6WithBatchesAndEnqueuedTasks, } impl GetDump { @@ -74,6 +75,10 @@ impl GetDump { "tests/assets/v6_v1.6.0_use_deactivated_experimental_setting.dump" ) .into(), + GetDump::TestV6WithBatchesAndEnqueuedTasks => { + exist_relative_path!("tests/assets/v6_v1.13.0_batches_and_enqueued_tasks.dump") + .into() + } } } } diff --git a/crates/meilisearch/tests/dumps/mod.rs b/crates/meilisearch/tests/dumps/mod.rs index 6102a2817..9edcd95fc 100644 --- a/crates/meilisearch/tests/dumps/mod.rs +++ b/crates/meilisearch/tests/dumps/mod.rs @@ -1994,6 +1994,61 @@ async fn import_dump_v6_containing_experimental_features() { .await; } +#[actix_rt::test] +async fn import_dump_v6_containing_batches_and_enqueued_tasks() { + let temp = tempfile::tempdir().unwrap(); + + let options = Opt { + import_dump: Some(GetDump::TestV6WithBatchesAndEnqueuedTasks.path()), + ..default_settings(temp.path()) + }; + let mut server = Server::new_auth_with_options(options, temp).await; + server.use_api_key("MASTER_KEY"); + server.wait_task(2).await.succeeded(); + let (tasks, _) = server.tasks().await; + snapshot!(json_string!(tasks, { ".results[1].startedAt" => "[date]", ".results[1].finishedAt" => "[date]", ".results[1].duration" => "[date]" }), name: "tasks"); + let (batches, _) = server.batches().await; + snapshot!(json_string!(batches, { ".results[0].startedAt" => "[date]", ".results[0].finishedAt" => "[date]", ".results[0].duration" => "[date]" }), name: "batches"); + + let (indexes, code) = server.list_indexes(None, None).await; + assert_eq!(code, 200, "{indexes}"); + + assert_eq!(indexes["results"].as_array().unwrap().len(), 1); + assert_eq!(indexes["results"][0]["uid"], json!("kefir")); + assert_eq!(indexes["results"][0]["primaryKey"], json!("id")); + + let (response, code) = server.get_features().await; + meili_snap::snapshot!(code, @"200 OK"); + meili_snap::snapshot!(meili_snap::json_string!(response), @r###" + { + "metrics": false, + "logsRoute": false, + "editDocumentsByFunction": false, + "containsFilter": false + } + "###); + + let index = server.index("kefir"); + let (documents, _) = index.get_all_documents_raw("").await; + snapshot!(documents, @r#" + { + "results": [ + { + "id": 1, + "dog": "kefir" + }, + { + "id": 2, + "dog": "intel" + } + ], + "offset": 0, + "limit": 20, + "total": 2 + } + "#); +} + // In this test we must generate the dump ourselves to ensure the // `user provided` vectors are well set #[actix_rt::test] diff --git a/crates/meilisearch/tests/dumps/snapshots/mod.rs/import_dump_v6_containing_batches_and_enqueued_tasks/batches.snap b/crates/meilisearch/tests/dumps/snapshots/mod.rs/import_dump_v6_containing_batches_and_enqueued_tasks/batches.snap new file mode 100644 
index 000000000..aeac6cf55
--- /dev/null
+++ b/crates/meilisearch/tests/dumps/snapshots/mod.rs/import_dump_v6_containing_batches_and_enqueued_tasks/batches.snap
@@ -0,0 +1,78 @@
+---
+source: crates/meilisearch/tests/dumps/mod.rs
+snapshot_kind: text
+---
+{
+  "results": [
+    {
+      "uid": 2,
+      "progress": null,
+      "details": {
+        "receivedDocuments": 1,
+        "indexedDocuments": 1
+      },
+      "stats": {
+        "totalNbTasks": 1,
+        "status": {
+          "succeeded": 1
+        },
+        "types": {
+          "documentAdditionOrUpdate": 1
+        },
+        "indexUids": {
+          "kefir": 1
+        }
+      },
+      "duration": "[date]",
+      "startedAt": "[date]",
+      "finishedAt": "[date]"
+    },
+    {
+      "uid": 1,
+      "progress": null,
+      "details": {
+        "receivedDocuments": 1,
+        "indexedDocuments": 1
+      },
+      "stats": {
+        "totalNbTasks": 1,
+        "status": {
+          "succeeded": 1
+        },
+        "types": {
+          "documentAdditionOrUpdate": 1
+        },
+        "indexUids": {
+          "kefir": 1
+        }
+      },
+      "duration": "PT0.144827890S",
+      "startedAt": "2025-02-04T10:15:21.275640274Z",
+      "finishedAt": "2025-02-04T10:15:21.420468164Z"
+    },
+    {
+      "uid": 0,
+      "progress": null,
+      "details": {},
+      "stats": {
+        "totalNbTasks": 1,
+        "status": {
+          "succeeded": 1
+        },
+        "types": {
+          "indexCreation": 1
+        },
+        "indexUids": {
+          "kefir": 1
+        }
+      },
+      "duration": "PT0.032902186S",
+      "startedAt": "2025-02-04T10:14:43.559526162Z",
+      "finishedAt": "2025-02-04T10:14:43.592428348Z"
+    }
+  ],
+  "total": 3,
+  "limit": 20,
+  "from": 2,
+  "next": null
+}
diff --git a/crates/meilisearch/tests/dumps/snapshots/mod.rs/import_dump_v6_containing_batches_and_enqueued_tasks/tasks.snap b/crates/meilisearch/tests/dumps/snapshots/mod.rs/import_dump_v6_containing_batches_and_enqueued_tasks/tasks.snap
new file mode 100644
index 000000000..99dc06f24
--- /dev/null
+++ b/crates/meilisearch/tests/dumps/snapshots/mod.rs/import_dump_v6_containing_batches_and_enqueued_tasks/tasks.snap
@@ -0,0 +1,78 @@
+---
+source: crates/meilisearch/tests/dumps/mod.rs
+snapshot_kind: text
+---
+{
+  "results": [
+    {
+      "uid": 3,
+      "batchUid": null,
+      "indexUid": null,
+      "status": "succeeded",
+      "type": "dumpCreation",
+      "canceledBy": null,
+      "details": {
+        "dumpUid": null
+      },
+      "error": null,
+      "duration": "PT0.000629059S",
+      "enqueuedAt": "2025-02-04T10:22:31.318175268Z",
+      "startedAt": "2025-02-04T10:22:31.331701375Z",
+      "finishedAt": "2025-02-04T10:22:31.332330434Z"
+    },
+    {
+      "uid": 2,
+      "batchUid": 2,
+      "indexUid": "kefir",
+      "status": "succeeded",
+      "type": "documentAdditionOrUpdate",
+      "canceledBy": null,
+      "details": {
+        "receivedDocuments": 1,
+        "indexedDocuments": 1
+      },
+      "error": null,
+      "duration": "[date]",
+      "enqueuedAt": "2025-02-04T10:15:49.212484063Z",
+      "startedAt": "[date]",
+      "finishedAt": "[date]"
+    },
+    {
+      "uid": 1,
+      "batchUid": null,
+      "indexUid": "kefir",
+      "status": "succeeded",
+      "type": "documentAdditionOrUpdate",
+      "canceledBy": null,
+      "details": {
+        "receivedDocuments": 1,
+        "indexedDocuments": 1
+      },
+      "error": null,
+      "duration": "PT0.144827890S",
+      "enqueuedAt": "2025-02-04T10:15:21.258630973Z",
+      "startedAt": "2025-02-04T10:15:21.275640274Z",
+      "finishedAt": "2025-02-04T10:15:21.420468164Z"
+    },
+    {
+      "uid": 0,
+      "batchUid": null,
+      "indexUid": "kefir",
+      "status": "succeeded",
+      "type": "indexCreation",
+      "canceledBy": null,
+      "details": {
+        "primaryKey": null
+      },
+      "error": null,
+      "duration": "PT0.032902186S",
+      "enqueuedAt": "2025-02-04T10:14:43.550379968Z",
+      "startedAt": "2025-02-04T10:14:43.559526162Z",
+      "finishedAt": "2025-02-04T10:14:43.592428348Z"
+    }
+  ],
+  "total": 4,
+  "limit": 20,
+  "from": 3,
+  "next": null
+}
From 9293e7f2c1deca04e3313b803088f0403c4b39e3 Mon Sep 17 00:00:00 2001
From: Tamo
Date: Wed, 5 Feb 2025 18:16:56 +0100
Subject: [PATCH 3/6] fix tests after rebase

---
 crates/meilisearch/tests/dumps/mod.rs | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/crates/meilisearch/tests/dumps/mod.rs b/crates/meilisearch/tests/dumps/mod.rs
index 9edcd95fc..1c38175fa 100644
--- a/crates/meilisearch/tests/dumps/mod.rs
+++ b/crates/meilisearch/tests/dumps/mod.rs
@@ -2019,14 +2019,15 @@ async fn import_dump_v6_containing_batches_and_enqueued_tasks() {
 
     let (response, code) = server.get_features().await;
     meili_snap::snapshot!(code, @"200 OK");
-    meili_snap::snapshot!(meili_snap::json_string!(response), @r###"
+    meili_snap::snapshot!(meili_snap::json_string!(response), @r#"
     {
       "metrics": false,
       "logsRoute": false,
       "editDocumentsByFunction": false,
-      "containsFilter": false
+      "containsFilter": false,
+      "network": false
    }
-    "###);
+    "#);
 
     let index = server.index("kefir");
     let (documents, _) = index.get_all_documents_raw("").await;

From 00eb47d42ef4c86f5bd7b5947083d8f871251c15 Mon Sep 17 00:00:00 2001
From: Tamo
Date: Tue, 11 Feb 2025 10:48:04 +0100
Subject: [PATCH 4/6] use serde_json::to_writer instead of serializing +
 writing

---
 crates/dump/src/writer.rs | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/crates/dump/src/writer.rs b/crates/dump/src/writer.rs
index bfe091ab5..63b006b5c 100644
--- a/crates/dump/src/writer.rs
+++ b/crates/dump/src/writer.rs
@@ -93,7 +93,7 @@ impl KeyWriter {
     }
 
     pub fn push_key(&mut self, key: &Key) -> Result<()> {
-        self.keys.write_all(&serde_json::to_vec(key)?)?;
+        serde_json::to_writer(&mut self.keys, &key)?;
         self.keys.write_all(b"\n")?;
         Ok(())
     }
@@ -123,7 +123,7 @@ impl TaskWriter {
     /// Pushes tasks in the dump.
     /// If the tasks has an associated `update_file` it'll use the `task_id` as its name.
     pub fn push_task(&mut self, task: &TaskDump) -> Result<UpdateFile> {
-        self.queue.write_all(&serde_json::to_vec(task)?)?;
+        serde_json::to_writer(&mut self.queue, &task)?;
         self.queue.write_all(b"\n")?;
 
         Ok(UpdateFile::new(self.update_files.join(format!("{}.jsonl", task.uid))))
@@ -148,7 +148,7 @@ impl BatchWriter {
 
     /// Pushes batches in the dump.
     pub fn push_batch(&mut self, batch: &Batch) -> Result<()> {
-        self.queue.write_all(&serde_json::to_vec(batch)?)?;
+        serde_json::to_writer(&mut self.queue, &batch)?;
         self.queue.write_all(b"\n")?;
         Ok(())
     }
@@ -170,8 +170,8 @@ impl UpdateFile {
     }
 
     pub fn push_document(&mut self, document: &Document) -> Result<()> {
-        if let Some(writer) = self.writer.as_mut() {
-            writer.write_all(&serde_json::to_vec(document)?)?;
+        if let Some(mut writer) = self.writer.as_mut() {
+            serde_json::to_writer(&mut writer, &document)?;
             writer.write_all(b"\n")?;
         } else {
             let file = File::create(&self.path).unwrap();
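The change in PATCH 4 swaps `write_all(&serde_json::to_vec(x)?)` for `serde_json::to_writer(&mut w, &x)?`. Both produce identical bytes, but `to_writer` streams the serialization straight into the already-buffered writer instead of allocating a temporary `Vec<u8>` for every record. A small self-contained comparison (the `Record` type is illustrative; only `serde` with derive and `serde_json` are assumed):

```rust
use std::io::{BufWriter, Write};

use serde::Serialize;

#[derive(Serialize)]
struct Record {
    uid: u32,
    name: &'static str,
}

fn main() -> std::io::Result<()> {
    let record = Record { uid: 0, name: "kefir" };
    let mut out = BufWriter::new(Vec::new());

    // Before: serialize into a temporary Vec<u8>, then copy it out.
    out.write_all(&serde_json::to_vec(&record)?)?;
    out.write_all(b"\n")?;

    // After: stream straight into the buffered writer, with no
    // per-record intermediate allocation.
    serde_json::to_writer(&mut out, &record)?;
    out.write_all(b"\n")?;

    // Both paths emit exactly the same bytes.
    assert_eq!(
        String::from_utf8(out.into_inner()?).unwrap(),
        "{\"uid\":0,\"name\":\"kefir\"}\n{\"uid\":0,\"name\":\"kefir\"}\n"
    );
    Ok(())
}
```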
From 84e2a1f836926523aa442b74a1ba749522b7e57a Mon Sep 17 00:00:00 2001
From: Tamo
Date: Tue, 11 Feb 2025 10:49:57 +0100
Subject: [PATCH 5/6] rename the atomic to something more meaningful

---
 crates/index-scheduler/src/scheduler/process_dump_creation.rs | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/crates/index-scheduler/src/scheduler/process_dump_creation.rs b/crates/index-scheduler/src/scheduler/process_dump_creation.rs
index 4a1aef44a..a6d785b2f 100644
--- a/crates/index-scheduler/src/scheduler/process_dump_creation.rs
+++ b/crates/index-scheduler/src/scheduler/process_dump_creation.rs
@@ -109,7 +109,7 @@ impl IndexScheduler {
         progress.update_progress(DumpCreationProgress::DumpTheBatches);
         let mut dump_batches = dump.create_batches_queue()?;
 
-        let (atomic, update_batch_progress) =
+        let (atomic_batch_progress, update_batch_progress) =
             AtomicBatchStep::new(self.queue.batches.all_batches.len(&rtxn)? as u32);
         progress.update_progress(update_batch_progress);
 
@@ -134,7 +134,7 @@ impl IndexScheduler {
             }
 
             dump_batches.push_batch(&b)?;
-            atomic.fetch_add(1, Ordering::Relaxed);
+            atomic_batch_progress.fetch_add(1, Ordering::Relaxed);
         }
         dump_batches.flush()?;

From 43c8d54501c6212e3730faebdc3104ffc393f2ca Mon Sep 17 00:00:00 2001
From: Tamo
Date: Tue, 11 Feb 2025 11:19:13 +0100
Subject: [PATCH 6/6] fix test after rebase

---
 crates/meilisearch/tests/dumps/mod.rs | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/crates/meilisearch/tests/dumps/mod.rs b/crates/meilisearch/tests/dumps/mod.rs
index 1c38175fa..b438006c5 100644
--- a/crates/meilisearch/tests/dumps/mod.rs
+++ b/crates/meilisearch/tests/dumps/mod.rs
@@ -2019,15 +2019,16 @@ async fn import_dump_v6_containing_batches_and_enqueued_tasks() {
 
     let (response, code) = server.get_features().await;
     meili_snap::snapshot!(code, @"200 OK");
-    meili_snap::snapshot!(meili_snap::json_string!(response), @r#"
+    meili_snap::snapshot!(meili_snap::json_string!(response), @r###"
     {
      "metrics": false,
      "logsRoute": false,
      "editDocumentsByFunction": false,
      "containsFilter": false,
-      "network": false
+      "network": false,
+      "getTaskDocumentsRoute": false
    }
-    "#);
+    "###);
 
     let index = server.index("kefir");
     let (documents, _) = index.get_all_documents_raw("").await;
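For context on PATCH 5: the progress steps handed out by `AtomicBatchStep::new` are shared counters — the scheduler thread bumps them with `fetch_add(1, Ordering::Relaxed)` while another thread reads them to render progress, which is why the descriptive name matters once several such counters are in scope. A minimal sketch of that pattern using only the standard library (names here are illustrative, not the scheduler's real types):

```rust
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use std::thread;

fn main() {
    let total = 100u32;
    // One shared counter: the worker bumps it, a reporter reads it.
    let batch_progress = Arc::new(AtomicU32::new(0));

    let worker = {
        let batch_progress = Arc::clone(&batch_progress);
        thread::spawn(move || {
            for _ in 0..total {
                // Relaxed suffices: the counter is monotonic and only used
                // for display; no other memory ordering depends on it.
                batch_progress.fetch_add(1, Ordering::Relaxed);
            }
        })
    };

    // A reporter can poll the counter at any time without locking.
    println!("progress: {}/{total}", batch_progress.load(Ordering::Relaxed));

    worker.join().unwrap();
    assert_eq!(batch_progress.load(Ordering::Relaxed), total);
}
```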