diff --git a/crates/dump/src/writer.rs b/crates/dump/src/writer.rs
index 3ee51cabf..41a1702cb 100644
--- a/crates/dump/src/writer.rs
+++ b/crates/dump/src/writer.rs
@@ -4,6 +4,7 @@ use std::path::PathBuf;
 
 use flate2::write::GzEncoder;
 use flate2::Compression;
+use meilisearch_types::batch_view::BatchView;
 use meilisearch_types::features::RuntimeTogglableFeatures;
 use meilisearch_types::keys::Key;
 use meilisearch_types::settings::{Checked, Settings};
@@ -54,6 +55,10 @@ impl DumpWriter {
         TaskWriter::new(self.dir.path().join("tasks"))
     }
 
+    pub fn create_batches_queue(&self) -> Result<BatchWriter> {
+        BatchWriter::new(self.dir.path().join("batches"))
+    }
+
     pub fn create_experimental_features(&self, features: RuntimeTogglableFeatures) -> Result<()> {
         Ok(std::fs::write(
             self.dir.path().join("experimental-features.json"),
@@ -156,6 +161,31 @@ impl UpdateFile {
     }
 }
 
+pub struct BatchWriter {
+    queue: BufWriter<File>,
+}
+
+impl BatchWriter {
+    pub(crate) fn new(path: PathBuf) -> Result<BatchWriter> {
+        std::fs::create_dir(&path)?;
+        let queue = File::create(path.join("queue.jsonl"))?;
+        Ok(BatchWriter { queue: BufWriter::new(queue) })
+    }
+
+    /// Pushes one batch in the dump.
+    /// Batches don't contain any private information, thus we don't need a special type like we have for the tasks.
+    pub fn push_batch(&mut self, batch: &BatchView) -> Result<()> {
+        self.queue.write_all(&serde_json::to_vec(batch)?)?;
+        self.queue.write_all(b"\n")?;
+        Ok(())
+    }
+
+    pub fn flush(mut self) -> Result<()> {
+        self.queue.flush()?;
+        Ok(())
+    }
+}
+
 pub struct IndexWriter {
     documents: BufWriter<File>,
     settings: File,
diff --git a/crates/index-scheduler/src/batch.rs b/crates/index-scheduler/src/batch.rs
index e23d1c1bb..c463bea35 100644
--- a/crates/index-scheduler/src/batch.rs
+++ b/crates/index-scheduler/src/batch.rs
@@ -17,7 +17,7 @@ tasks individually, but should be much faster since we are only performing
 one indexing operation.
 */
 
-use std::collections::{BTreeSet, HashMap, HashSet};
+use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
 use std::ffi::OsStr;
 use std::fmt;
 use std::fs::{self, File};
@@ -27,6 +27,7 @@ use std::sync::atomic::Ordering;
 use bumpalo::collections::CollectIn;
 use bumpalo::Bump;
 use dump::IndexMetadata;
+use meilisearch_types::batch_view::BatchView;
 use meilisearch_types::batches::BatchId;
 use meilisearch_types::heed::{RoTxn, RwTxn};
 use meilisearch_types::milli::documents::{obkv_to_object, DocumentsBatchReader, PrimaryKey};
@@ -790,7 +791,8 @@ impl IndexScheduler {
 
         let rtxn = self.env.read_txn()?;
 
-        // 2. dump the tasks
+        // 2. dump the queue
+        // 2.1. dump the tasks
         progress.update_progress(DumpCreationProgress::DumpTheTasks);
         let mut dump_tasks = dump.create_tasks_queue()?;
 
@@ -821,7 +823,7 @@
             }
             let mut dump_content_file = dump_tasks.push_task(&t.into())?;
-            // 2.1. Dump the `content_file` associated with the task if there is one and the task is not finished yet.
+            // 2.1.1. Dump the `content_file` associated with the task if there is one and the task is not finished yet.
             if let Some(content_file) = content_file {
                 if self.must_stop_processing.get() {
                     return Err(Error::AbortedTask);
                 }
@@ -851,6 +853,39 @@ impl IndexScheduler {
         }
         dump_tasks.flush()?;
 
+        // 2.2. dump the batches
+        let mut dump_batches = dump.create_batches_queue()?;
+
+        let (atomic, update_batch_progress) =
+            AtomicBatchStep::new(self.all_batches.len(&rtxn)? as u32);
+        progress.update_progress(update_batch_progress);
+
+        for ret in self.all_batches.iter(&rtxn)? {
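+            // A dump is a long operation: just like for the tasks above, check between
+            // every batch whether we were asked to stop, and abort cleanly if so.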
+            if self.must_stop_processing.get() {
+                return Err(Error::AbortedTask);
+            }
+
+            let (_, mut batch) = ret?;
+
+            // In case we're dumping ourselves, we want the batch to be marked as finished
+            // so we don't loop over ourselves indefinitely.
+            if task.batch_uid == Some(batch.uid) {
+                let finished_at = OffsetDateTime::now_utc();
+
+                // We're going to fake the dates because we don't know yet whether everything
+                // will go well, but we need to dump the batch as finished and successful.
+                // If something fails, everything will be set appropriately in the end.
+                batch.progress = None;
+                let mut statuses = BTreeMap::new();
+                statuses.insert(Status::Succeeded, 1);
+                batch.stats.status = statuses;
+                batch.started_at = started_at;
+                batch.finished_at = Some(finished_at);
+            }
+            dump_batches.push_batch(&BatchView::from_batch(&batch))?;
+            atomic.fetch_add(1, Ordering::Relaxed);
+        }
+        dump_batches.flush()?;
+
         // 3. Dump the indexes
         progress.update_progress(DumpCreationProgress::DumpTheIndexes);
         let nb_indexes = self.index_mapper.index_mapping.len(&rtxn)? as u32;
diff --git a/crates/index-scheduler/src/lib.rs b/crates/index-scheduler/src/lib.rs
index f5f73087d..f81c1367b 100644
--- a/crates/index-scheduler/src/lib.rs
+++ b/crates/index-scheduler/src/lib.rs
@@ -49,6 +49,7 @@ pub use features::RoFeatures;
 use file_store::FileStore;
 use flate2::bufread::GzEncoder;
 use flate2::Compression;
+use meilisearch_types::batch_view::BatchView;
 use meilisearch_types::batches::{Batch, BatchId};
 use meilisearch_types::error::ResponseError;
 use meilisearch_types::features::{InstanceTogglableFeatures, RuntimeTogglableFeatures};
@@ -1994,6 +1995,8 @@ pub struct Dump<'a> {
     indexes: HashMap<String, RoaringBitmap>,
     statuses: HashMap<Status, RoaringBitmap>,
     kinds: HashMap<Kind, RoaringBitmap>,
+
+    batch_to_tasks_mapping: HashMap<BatchId, RoaringBitmap>,
 }
 
 impl<'a> Dump<'a> {
@@ -2007,6 +2010,7 @@
             indexes: HashMap::new(),
             statuses: HashMap::new(),
             kinds: HashMap::new(),
+            batch_to_tasks_mapping: HashMap::new(),
         })
     }
 
@@ -2158,9 +2162,74 @@
         self.statuses.entry(task.status).or_default().insert(task.uid);
         self.kinds.entry(task.kind.as_kind()).or_default().insert(task.uid);
 
+        if let Some(batch_uid) = task.batch_uid {
+            self.batch_to_tasks_mapping.entry(batch_uid).or_default().insert(task.uid);
+        }
+
         Ok(task)
     }
 
+    /// Register a new batch coming from a dump in the scheduler.
+    /// By taking a mutable ref we're pretty sure no one will ever import a dump while actix is running.
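+    /// The `BatchView` read from the dump is converted back into a `Batch` before being
+    /// written to the `all_batches` database.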
+    pub fn register_dumped_batch(&mut self, batch: BatchView) -> Result<()> {
+        let batch = Batch {
+            uid: batch.uid,
+            // A batch cannot be processing while we import it.
+            progress: None,
+            details: batch.details,
+            stats: batch.stats,
+            started_at: batch.started_at,
+            finished_at: batch.finished_at,
+        };
+
+        self.index_scheduler.all_batches.put(&mut self.wtxn, &batch.uid, &batch)?;
+
+        for index in batch.stats.index_uids.keys() {
+            match self.indexes.get_mut(index) {
+                Some(bitmap) => {
+                    bitmap.insert(batch.uid);
+                }
+                None => {
+                    let mut bitmap = RoaringBitmap::new();
+                    bitmap.insert(batch.uid);
+                    self.indexes.insert(index.to_string(), bitmap);
+                }
+            };
+        }
+
+        // We can't override the started_at & finished_at dates afterwards, so we only
+        // index them once the batch is finished and they won't change anymore.
+        if let Some(finished_at) = batch.finished_at {
+            utils::insert_task_datetime(
+                &mut self.wtxn,
+                self.index_scheduler.started_at,
+                batch.started_at,
+                batch.uid,
+            )?;
+            utils::insert_task_datetime(
+                &mut self.wtxn,
+                self.index_scheduler.finished_at,
+                finished_at,
+                batch.uid,
+            )?;
+        }
+
+        for status in batch.stats.status.keys() {
+            self.statuses.entry(*status).or_default().insert(batch.uid);
+        }
+        for kind in batch.stats.types.keys() {
+            self.kinds.entry(*kind).or_default().insert(batch.uid);
+        }
+
+        Ok(())
+    }
+
     /// Commit all the changes and exit the importing dump state
     pub fn finish(mut self) -> Result<()> {
         for (index, bitmap) in self.indexes {