From dd506e5d87f473aba62487327a9598da7b351124 Mon Sep 17 00:00:00 2001 From: Tamo Date: Sun, 16 Oct 2022 02:01:45 +0200 Subject: [PATCH] stop dumping the current dumping task as enqueued so it's not looping for ever --- index-scheduler/src/batch.rs | 46 ++++++++++++++++++++++++++---------- 1 file changed, 34 insertions(+), 12 deletions(-) diff --git a/index-scheduler/src/batch.rs b/index-scheduler/src/batch.rs index 6075c6145..21ed1dd39 100644 --- a/index-scheduler/src/batch.rs +++ b/index-scheduler/src/batch.rs @@ -22,6 +22,7 @@ use meilisearch_types::{ Index, }; use roaring::RoaringBitmap; +use time::OffsetDateTime; use uuid::Uuid; #[derive(Debug)] @@ -486,6 +487,7 @@ impl IndexScheduler { } Batch::Snapshot(_) => todo!(), Batch::Dump(mut task) => { + let started_at = OffsetDateTime::now_utc(); let KindWithContent::DumpExport { keys, instance_uid, dump_uid } = &task.kind else { unreachable!(); }; @@ -502,23 +504,43 @@ impl IndexScheduler { // 2. dump the tasks let mut tasks = dump.create_tasks_queue()?; for ret in self.all_tasks.iter(&rtxn)? { - let (_, task) = ret?; - let content_file = task.content_uuid().map(|uuid| uuid.clone()); - let mut dump_content_file = tasks.push_task(&task.into())?; + let (_, mut t) = ret?; + let status = t.status; + let content_file = t.content_uuid().map(|uuid| uuid.clone()); - // 2.1. Dump the `content_file` associated with the task if there is one. + // In the case we're dumping ourselves we want to be marked as finished + // to not loop over ourselves indefinitely. + if t.uid == task.uid { + let finished_at = OffsetDateTime::now_utc(); + + // We're going to fake the date because we don't know if everything is going to go well. + // But we need to dump the task as finished and successful. + // If something fail everything will be set appropriately in the end. + t.status = Status::Succeeded; + t.started_at = Some(started_at); + t.finished_at = Some(finished_at); + } + let mut dump_content_file = tasks.push_task(&t.into())?; + + // 2.1. Dump the `content_file` associated with the task if there is one and the task is not finished yet. if let Some(content_file) = content_file { - let content_file = self.file_store.get_update(content_file)?; + if status == Status::Enqueued { + let content_file = self.file_store.get_update(content_file)?; - let reader = DocumentsBatchReader::from_reader(content_file) - .map_err(milli::Error::from)?; + let reader = DocumentsBatchReader::from_reader(content_file) + .map_err(milli::Error::from)?; - let (mut cursor, documents_batch_index) = - reader.into_cursor_and_fields_index(); + let (mut cursor, documents_batch_index) = + reader.into_cursor_and_fields_index(); - while let Some(doc) = cursor.next_document().map_err(milli::Error::from)? { - dump_content_file - .push_document(&obkv_to_object(&doc, &documents_batch_index)?)?; + while let Some(doc) = + cursor.next_document().map_err(milli::Error::from)? + { + dump_content_file.push_document(&obkv_to_object( + &doc, + &documents_batch_index, + )?)?; + } } } }