From e017986ddec21cd32d8e607b8dc29445a2de875c Mon Sep 17 00:00:00 2001 From: Tamo <tamo@meilisearch.com> Date: Mon, 27 Jan 2025 18:22:07 +0100 Subject: [PATCH] wip --- crates/dump/src/lib.rs | 10 +++- crates/dump/src/reader/mod.rs | 10 ++++ crates/dump/src/reader/v6/mod.rs | 13 ++++- crates/dump/src/writer.rs | 4 +- crates/index-scheduler/src/batch.rs | 9 +++- crates/index-scheduler/src/lib.rs | 79 +++++++++++------------ 6 files changed, 69 insertions(+), 56 deletions(-) diff --git a/crates/dump/src/lib.rs b/crates/dump/src/lib.rs index 31cd3028e..150160e93 100644 --- a/crates/dump/src/lib.rs +++ b/crates/dump/src/lib.rs @@ -1,7 +1,8 @@ #![allow(clippy::type_complexity)] #![allow(clippy::wrong_self_convention)] -use meilisearch_types::batches::BatchId; +use meilisearch_types::batch_view::BatchView; +use meilisearch_types::batches::{Batch, BatchId}; use meilisearch_types::error::ResponseError; use meilisearch_types::keys::Key; use meilisearch_types::milli::update::IndexDocumentsMethod; @@ -91,6 +92,13 @@ pub struct TaskDump { pub finished_at: Option<OffsetDateTime>, } +#[derive(Debug, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct BatchDump { + pub original: Batch, + pub tasks: RoaringBitmap, +} + // A `Kind` specific version made for the dump. If modified you may break the dump.
#[derive(Debug, PartialEq, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] diff --git a/crates/dump/src/reader/mod.rs b/crates/dump/src/reader/mod.rs index f9660972a..7fe3d1056 100644 --- a/crates/dump/src/reader/mod.rs +++ b/crates/dump/src/reader/mod.rs @@ -101,6 +101,16 @@ impl DumpReader { } } + pub fn batches(&mut self) -> Result<Box<dyn Iterator<Item = Result<v6::Batch>> + '_>> { + match self { + DumpReader::Current(current) => Ok(current.dumps()), + // There were no batches in the previous versions + DumpReader::Compat(_compat) => { + Ok(Box::new(std::iter::empty()) as Box<dyn Iterator<Item = Result<v6::Batch>> + '_>) + } + } + } + pub fn keys(&mut self) -> Result<Box<dyn Iterator<Item = Result<Key>> + '_>> { match self { DumpReader::Current(current) => Ok(current.keys()), diff --git a/crates/dump/src/reader/v6/mod.rs b/crates/dump/src/reader/v6/mod.rs index 50b9751a2..e9f4bd952 100644 --- a/crates/dump/src/reader/v6/mod.rs +++ b/crates/dump/src/reader/v6/mod.rs @@ -18,6 +18,7 @@ pub type Checked = meilisearch_types::settings::Checked; pub type Unchecked = meilisearch_types::settings::Unchecked; pub type Task = crate::TaskDump; +pub type Batch = crate::BatchDump; pub type Key = meilisearch_types::keys::Key; pub type RuntimeTogglableFeatures = meilisearch_types::features::RuntimeTogglableFeatures; @@ -48,6 +49,7 @@ pub struct V6Reader { instance_uid: Option<Uuid>, metadata: Metadata, tasks: BufReader<File>, + batches: BufReader<File>, keys: BufReader<File>, features: Option<RuntimeTogglableFeatures>, } @@ -82,6 +84,7 @@ impl V6Reader { metadata: serde_json::from_reader(&*meta_file)?, instance_uid, tasks: BufReader::new(File::open(dump.path().join("tasks").join("queue.jsonl"))?), + batches: BufReader::new(File::open(dump.path().join("batches").join("queue.jsonl"))?), keys: BufReader::new(File::open(dump.path().join("keys.jsonl"))?), features, dump, @@ -124,7 +127,7 @@ impl V6Reader { &mut self, ) -> Box<dyn Iterator<Item = Result<(Task, Option<Box<super::UpdateFile>>)>> + '_> { Box::new((&mut self.tasks).lines().map(|line| -> Result<_> { - let task: Task = serde_json::from_str(&line?).unwrap(); + let task: Task = serde_json::from_str(&line?)?; let update_file_path = self
.dump @@ -145,6 +148,14 @@ impl V6Reader { })) } + pub fn dumps(&mut self) -> Box<dyn Iterator<Item = Result<Batch>> + '_> { + Box::new( + (&mut self.batches) + .lines() + .map(|line| -> Result<_> { Ok(serde_json::from_str(&line?)?) }), + ) + } + pub fn keys(&mut self) -> Box<dyn Iterator<Item = Result<Key>> + '_> { Box::new( (&mut self.keys).lines().map(|line| -> Result<_> { Ok(serde_json::from_str(&line?)?) }), diff --git a/crates/dump/src/writer.rs b/crates/dump/src/writer.rs index 41a1702cb..4be179a67 100644 --- a/crates/dump/src/writer.rs +++ b/crates/dump/src/writer.rs @@ -14,7 +14,7 @@ use time::OffsetDateTime; use uuid::Uuid; use crate::reader::Document; -use crate::{IndexMetadata, Metadata, Result, TaskDump, CURRENT_DUMP_VERSION}; +use crate::{BatchDump, IndexMetadata, Metadata, Result, TaskDump, CURRENT_DUMP_VERSION}; pub struct DumpWriter { dir: TempDir, } @@ -174,7 +174,7 @@ impl BatchWriter { /// Pushes batches in the dump. /// The batches don't contain any private information thus we don't need a special type like with the tasks. - pub fn push_batch(&mut self, batch: &BatchView) -> Result<()> { + pub fn push_batch(&mut self, batch: &BatchDump) -> Result<()> { self.queue.write_all(&serde_json::to_vec(batch)?)?; self.queue.write_all(b"\n")?; Ok(()) } diff --git a/crates/index-scheduler/src/batch.rs b/crates/index-scheduler/src/batch.rs index c463bea35..006e1083e 100644 --- a/crates/index-scheduler/src/batch.rs +++ b/crates/index-scheduler/src/batch.rs @@ -26,7 +26,7 @@ use std::sync::atomic::Ordering; use bumpalo::collections::CollectIn; use bumpalo::Bump; -use dump::IndexMetadata; +use dump::{BatchDump, IndexMetadata}; use meilisearch_types::batch_view::BatchView; use meilisearch_types::batches::BatchId; use meilisearch_types::heed::{RoTxn, RwTxn}; @@ -881,7 +881,12 @@ impl IndexScheduler { batch.started_at = started_at; batch.finished_at = Some(finished_at); } - dump_batches.push_batch(&BatchView::from_batch(&batch))?; + // a missing or empty batch shouldn't exist, but if it happens we should just skip it + if let
Some(tasks) = self.batch_to_tasks_mapping.get(&rtxn, &batch.uid)? { + if !tasks.is_empty() { + dump_batches.push_batch(&BatchDump { original: batch, tasks })?; + } + } atomic.fetch_add(1, Ordering::Relaxed); } dump_batches.flush()?; diff --git a/crates/index-scheduler/src/lib.rs b/crates/index-scheduler/src/lib.rs index f81c1367b..c835c7b45 100644 --- a/crates/index-scheduler/src/lib.rs +++ b/crates/index-scheduler/src/lib.rs @@ -43,7 +43,7 @@ use std::sync::atomic::{AtomicBool, AtomicU32}; use std::sync::{Arc, RwLock}; use std::time::Duration; -use dump::{KindDump, TaskDump, UpdateFile}; +use dump::{BatchDump, KindDump, TaskDump, UpdateFile}; pub use error::Error; pub use features::RoFeatures; use file_store::FileStore; @@ -1996,6 +1996,10 @@ pub struct Dump<'a> { statuses: HashMap<Status, RoaringBitmap>, kinds: HashMap<Kind, RoaringBitmap>, + batch_indexes: HashMap<String, RoaringBitmap>, + batch_statuses: HashMap<Status, RoaringBitmap>, + batch_kinds: HashMap<Kind, RoaringBitmap>, + batch_to_tasks_mapping: HashMap<BatchId, RoaringBitmap>, } @@ -2010,6 +2014,9 @@ impl<'a> Dump<'a> { indexes: HashMap::new(), statuses: HashMap::new(), kinds: HashMap::new(), + batch_indexes: HashMap::new(), + batch_statuses: HashMap::new(), + batch_kinds: HashMap::new(), batch_to_tasks_mapping: HashMap::new(), }) } @@ -2171,63 +2178,35 @@ impl<'a> Dump<'a> { /// Register a new batch coming from a dump in the scheduler. /// By taking a mutable ref we're pretty sure no one will ever import a dump while actix is running.
- pub fn register_dumped_batch(&mut self, batch: BatchView) -> Result<()> { - let batch = Batch { - uid: batch.uid, - // Batch cannot be processing while we import it - progress: None, - details: batch.details, - stats: batch.stats, - started_at: batch.started_at, - finished_at: batch.finished_at, - }; + pub fn register_dumped_batch(&mut self, dump: BatchDump) -> Result<()> { + let BatchDump { original, tasks } = dump; - self.index_scheduler.all_batches.put(&mut self.wtxn, &batch.uid, &batch)?; - - for index in batch.indexes() { - match self.indexes.get_mut(index) { - Some(bitmap) => { - bitmap.insert(batch.uid); - } - None => { - let mut bitmap = RoaringBitmap::new(); - bitmap.insert(batch.uid); - self.indexes.insert(index.to_string(), bitmap); - } - }; - } + self.index_scheduler.all_batches.put(&mut self.wtxn, &original.uid, &original)?; + self.index_scheduler.batch_to_tasks_mapping.put(&mut self.wtxn, &original.uid, &tasks)?; utils::insert_task_datetime( &mut self.wtxn, - self.index_scheduler.enqueued_at, - batch.enqueued_at, - batch.uid, + self.index_scheduler.batch_enqueued_at, + original.started_at, // TODO: retrieve the enqueued_at from the dump + original.uid, )?; - // we can't override the started_at & finished_at, so we must only set it if the tasks is finished and won't change - if matches!(batch.status, Status::Succeeded | Status::Failed | Status::Canceled) { - if let Some(started_at) = batch.started_at { - utils::insert_task_datetime( - &mut self.wtxn, - self.index_scheduler.started_at, - started_at, - batch.uid, - )?; - } - if let Some(finished_at) = batch.finished_at { - utils::insert_task_datetime( - &mut self.wtxn, - self.index_scheduler.finished_at, - finished_at, - batch.uid, - )?; - } + utils::insert_task_datetime( + &mut self.wtxn, + self.index_scheduler.batch_started_at, + original.started_at, + original.uid, + )?; + if let Some(finished_at) = original.finished_at { + utils::insert_task_datetime( + &mut self.wtxn, + self.index_scheduler.batch_finished_at, + finished_at, +
original.uid, + )?; } - self.statuses.entry(batch.status).or_default().insert(batch.uid); - self.kinds.entry(batch.kind.as_kind()).or_default().insert(batch.uid); - - Ok(batch) + Ok(()) } /// Commit all the changes and exit the importing dump state