Fix task serialization

ManyTheFish 2022-08-11 13:35:35 +02:00
parent 3a48de136e
commit ae174c2cca
3 changed files with 31 additions and 24 deletions

@@ -31,7 +31,7 @@ pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result<MeiliSearch> {
     // disable autobatching?
     AUTOBATCHING_ENABLED.store(
-        opt.scheduler_options.disable_auto_batching,
+        !opt.scheduler_options.disable_auto_batching,
         std::sync::atomic::Ordering::Relaxed,
     );
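
This hunk fixes an inverted flag: the raw value of disable_auto_batching was stored into a global that answers "is autobatching enabled?", so asking to disable autobatching actually enabled it, and the default left it off. A minimal sketch of the corrected semantics, with a stand-in function in place of the Opt plumbing (only AUTOBATCHING_ENABLED and the option name come from the diff):

use std::sync::atomic::{AtomicBool, Ordering};

// Mirrors the global in the diff: true means autobatching is on.
static AUTOBATCHING_ENABLED: AtomicBool = AtomicBool::new(false);

// Stand-in for reading opt.scheduler_options.disable_auto_batching.
fn setup(disable_auto_batching: bool) {
    // The option says "disable", the global says "enabled": negate it.
    AUTOBATCHING_ENABLED.store(!disable_auto_batching, Ordering::Relaxed);
}

fn main() {
    setup(true); // user passed the disable option
    assert!(!AUTOBATCHING_ENABLED.load(Ordering::Relaxed));

    setup(false); // default: autobatching stays on
    assert!(AUTOBATCHING_ENABLED.load(Ordering::Relaxed));
}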

@@ -231,7 +231,7 @@ pub struct TaskView {
     #[serde(serialize_with = "time::serde::rfc3339::option::serialize")]
     finished_at: Option<OffsetDateTime>,
     #[serde(skip_serializing_if = "Option::is_none")]
-    batch_uid: Option<Option<BatchId>>,
+    batch_uid: Option<BatchId>,
 }
 
 impl From<Task> for TaskView {
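
The TaskView change is the serialization fix named in the commit title: with batch_uid: Option<Option<BatchId>>, an enabled-but-never-batched task held Some(None), which passes the skip_serializing_if = "Option::is_none" check and serializes as "batchUid": null instead of omitting the field. A self-contained sketch of the difference, assuming the serde and serde_json crates, with u32 standing in for BatchId and the camelCase rename left out:

use serde::Serialize;

#[derive(Serialize)]
struct Nested {
    // Old shape: skip_serializing_if only sees the outer Option,
    // so Some(None) is serialized and the inner None becomes null.
    #[serde(skip_serializing_if = "Option::is_none")]
    batch_uid: Option<Option<u32>>,
}

#[derive(Serialize)]
struct Flat {
    // New shape: None omits the field entirely.
    #[serde(skip_serializing_if = "Option::is_none")]
    batch_uid: Option<u32>,
}

fn main() {
    let leaked = serde_json::to_string(&Nested { batch_uid: Some(None) }).unwrap();
    assert_eq!(leaked, r#"{"batch_uid":null}"#);

    let omitted = serde_json::to_string(&Flat { batch_uid: None }).unwrap();
    assert_eq!(omitted, "{}");

    let present = serde_json::to_string(&Flat { batch_uid: Some(6) }).unwrap();
    assert_eq!(present, r#"{"batch_uid":6}"#);
}
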
@@ -380,15 +380,15 @@ impl From<Task> for TaskView {
         let duration = finished_at.zip(started_at).map(|(tf, ts)| (tf - ts));
 
-        let batch_uid = if AUTOBATCHING_ENABLED.load(std::sync::atomic::Ordering::Relaxed) {
-            let id = events.iter().find_map(|e| match e {
-                TaskEvent::Batched { batch_id, .. } => Some(*batch_id),
-                _ => None,
-            });
-            Some(id)
-        } else {
-            None
-        };
+        let batch_uid = AUTOBATCHING_ENABLED
+            .load(std::sync::atomic::Ordering::Relaxed)
+            .then(|| {
+                events.iter().find_map(|e| match e {
+                    TaskEvent::Batched { batch_id, .. } => Some(*batch_id),
+                    _ => None,
+                })
+            })
+            .flatten();
 
         Self {
             uid: id,
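
The refactor relies on bool::then returning Some(closure result) only when the receiver is true, and on Option::flatten collapsing the nested Option<Option<BatchId>> that the old code assigned via Some(id), which is what lets the field type shrink in the previous hunk. A standalone sketch with a stand-in event enum (the names mirror the diff; the surrounding From<Task> impl is omitted):

// Reduced stand-in for the TaskEvent enum referenced in the diff.
enum TaskEvent {
    Created,
    Batched { batch_id: u32 },
}

fn batch_uid(enabled: bool, events: &[TaskEvent]) -> Option<u32> {
    enabled
        // then() yields Some(find_map result) only when enabled is true...
        .then(|| {
            events.iter().find_map(|e| match e {
                TaskEvent::Batched { batch_id } => Some(*batch_id),
                _ => None,
            })
        })
        // ...and flatten() turns the Option<Option<u32>> into Option<u32>.
        .flatten()
}

fn main() {
    let events = [TaskEvent::Created, TaskEvent::Batched { batch_id: 6 }];
    assert_eq!(batch_uid(true, &events), Some(6));
    assert_eq!(batch_uid(false, &events), None); // autobatching disabled
    assert_eq!(batch_uid(true, &[TaskEvent::Created]), None); // never batched
}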

@@ -1126,10 +1126,10 @@ async fn batch_several_documents_addition() {
     index.wait_task(4).await;
 
     // run a second completely failing batch
+    documents[40] = json!({"title": "error", "desc": "error"});
+    documents[70] = json!({"title": "error", "desc": "error"});
+    documents[130] = json!({"title": "error", "desc": "error"});
     for chunk in documents.chunks(30) {
-        let mut chunk = chunk.to_vec();
-        chunk[0] = json!({"title": "error", "desc": "error"});
         index.add_documents(json!(chunk), Some("id")).await;
     }
     // wait second batch of documents to finish
@@ -1144,16 +1144,23 @@
         json!(
             {
                 "results": [
-                    {"uid": 9, "status": "failed"},
-                    {"uid": 8, "status": "failed"},
-                    {"uid": 7, "status": "failed"},
-                    {"uid": 6, "status": "failed"},
-                    {"uid": 5, "status": "failed"},
-                    {"uid": 4, "status": "succeeded"},
-                    {"uid": 3, "status": "failed"},
-                    {"uid": 2, "status": "succeeded"},
-                    {"uid": 1, "status": "succeeded"},
-                    {"uid": 0, "status": "succeeded"},
+                    // Completely failing batch
+                    {"uid": 9, "status": "failed", "batchUid": 6},
+                    {"uid": 8, "status": "failed", "batchUid": 6},
+                    {"uid": 7, "status": "failed", "batchUid": 6},
+                    {"uid": 6, "status": "failed", "batchUid": 6},
+
+                    // Inter-batch
+                    {"uid": 5, "status": "succeeded", "batchUid": 5},
+
+                    // 1 fail in a succeeded batch
+                    {"uid": 4, "status": "succeeded", "batchUid": 1},
+                    {"uid": 3, "status": "failed", "batchUid": 1},
+                    {"uid": 2, "status": "succeeded", "batchUid": 1},
+                    {"uid": 1, "status": "succeeded", "batchUid": 1},
+
+                    // Inter-batch
+                    {"uid": 0, "status": "succeeded", "batchUid": 0},
                 ]
             }
         )
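
About the test rewrite: the three error documents (they lack the "id" primary key) are placed so they land in chunks 1, 2 and 4 of the five 30-document chunks, leaving the second run's first chunk clean. Task 5 therefore runs alone and succeeds as batch 5, while tasks 6 through 9, presumably merged into a single indexing operation by the autobatcher, fail together as batch 6. A quick arithmetic check of that placement (not part of the commit):

fn main() {
    // Documents 40, 70 and 130 out of 150, chunked by 30.
    let error_positions = [40usize, 70, 130];
    let tainted_chunks: Vec<usize> = error_positions.iter().map(|p| p / 30).collect();

    // Chunks 1, 2 and 4 carry an error document; chunks 0 and 3 are clean.
    assert_eq!(tainted_chunks, vec![1, 2, 4]);
}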