From 0b2fff27f231c3eb37c9340a6879135c70a227d2 Mon Sep 17 00:00:00 2001 From: Tamo Date: Tue, 28 Nov 2023 15:55:48 +0100 Subject: [PATCH] update and fix the test --- index-scheduler/src/lib.rs | 1 - meilisearch/tests/tasks/webhook.rs | 73 ++++++++++++++++++++---------- 2 files changed, 49 insertions(+), 25 deletions(-) diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs index d96b6c2af..5d44ed104 100644 --- a/index-scheduler/src/lib.rs +++ b/index-scheduler/src/lib.rs @@ -1288,7 +1288,6 @@ impl IndexScheduler { buffer.push(b'\n'); } - println!("Sending request to {url}"); let _ = ureq::post(url).send_bytes(&buffer).unwrap(); } diff --git a/meilisearch/tests/tasks/webhook.rs b/meilisearch/tests/tasks/webhook.rs index 613b4ff3f..e852839ec 100644 --- a/meilisearch/tests/tasks/webhook.rs +++ b/meilisearch/tests/tasks/webhook.rs @@ -1,10 +1,14 @@ +//! To test the webhook, we need to spawn a new server with a URL listening for +//! post requests. The webhook handle starts a server and forwards all the +//! received requests into a channel for you to handle. + use std::sync::Arc; use actix_http::body::MessageBody; use actix_web::dev::{ServiceFactory, ServiceResponse}; use actix_web::web::{Bytes, Data}; use actix_web::{post, App, HttpResponse, HttpServer}; -use meili_snap::snapshot; +use meili_snap::{json_string, snapshot}; use meilisearch::Opt; use tokio::sync::mpsc; @@ -13,7 +17,6 @@ use crate::json; #[post("/")] async fn forward_body(sender: Data>>, body: Bytes) -> HttpResponse { - println!("Received something"); let body = body.to_vec(); sender.send(body).unwrap(); HttpResponse::Ok().into() @@ -47,51 +50,73 @@ async fn create_webhook_server() -> WebhookHandle { let (sender, receiver) = mpsc::unbounded_channel(); let sender = Arc::new(sender); + // By listening on the port 0, the system will give us any available port. let server = HttpServer::new(move || create_app(sender.clone())).bind(("127.0.0.1", 0)).unwrap(); let (ip, scheme) = server.addrs_with_scheme()[0]; let url = format!("{scheme}://{ip}/"); - println!("url is {url}"); - // TODO: Is it cleaned once the test is over let server_handle = tokio::spawn(server.run()); - WebhookHandle { server_handle, url, receiver } } #[actix_web::test] async fn test_basic_webhook() { - // Request a new server from the pool - let mut handle = create_webhook_server().await; + let WebhookHandle { server_handle, url, mut receiver } = create_webhook_server().await; let db_path = tempfile::tempdir().unwrap(); let server = Server::new_with_options(Opt { - task_webhook_url: Some(handle.url.clone()), + task_webhook_url: Some(url), ..default_settings(db_path.path()) }) .await .unwrap(); let index = server.index("tamo"); - // TODO: may be flaky, we're relying on the fact that during the time the first document addition succeed, the two other operations will be received. - for i in 0..3 { + // May be flaky: we're relying on the fact that while the first document addition is processed, the other + // operations will be received and will be batched together. If it doesn't happen it's not a problem + // the rest of the test won't assume anything about the number of tasks per batch. + for i in 0..5 { let (_, _status) = index.add_documents(json!({ "id": i, "doggo": "bone" }), None).await; } - let payload = handle.receiver.recv().await.unwrap(); - let jsonl = String::from_utf8(payload).unwrap(); + let mut nb_tasks = 0; + while let Some(payload) = receiver.recv().await { + let payload = String::from_utf8(payload).unwrap(); + let jsonl = payload.split('\n'); + for json in jsonl { + if json.is_empty() { + break; // we reached EOF + } + nb_tasks += 1; + let json: serde_json::Value = serde_json::from_str(json).unwrap(); + snapshot!( + json_string!(json, { ".uid" => "[uid]", ".duration" => "[duration]", ".enqueuedAt" => "[date]", ".startedAt" => "[date]", ".finishedAt" => "[date]" }), + @r###" + { + "uid": "[uid]", + "indexUid": "tamo", + "status": "succeeded", + "type": "documentAdditionOrUpdate", + "canceledBy": null, + "details": { + "receivedDocuments": 1, + "indexedDocuments": 1 + }, + "error": null, + "duration": "[duration]", + "enqueuedAt": "[date]", + "startedAt": "[date]", + "finishedAt": "[date]" + } + "###); + } + if nb_tasks == 5 { + break; + } + } - snapshot!(jsonl, - @r###" - {"uid":0,"indexUid":"tamo","status":"succeeded","type":"documentAdditionOrUpdate","canceledBy":null,"details":{"receivedDocuments":1,"indexedDocuments":1},"error":null,"duration":"PT0.027444S","enqueuedAt":"2023-11-28T14:05:37.767678Z","startedAt":"2023-11-28T14:05:37.769519Z","finishedAt":"2023-11-28T14:05:37.796963Z"} - "###); + assert!(nb_tasks == 5, "We should have received the 5 tasks but only received {nb_tasks}"); - let payload = handle.receiver.recv().await.unwrap(); - let jsonl = String::from_utf8(payload).unwrap(); - - snapshot!(jsonl, - @r###" - {"uid":1,"indexUid":"tamo","status":"succeeded","type":"documentAdditionOrUpdate","canceledBy":null,"details":{"receivedDocuments":1,"indexedDocuments":1},"error":null,"duration":"PT0.020221S","enqueuedAt":"2023-11-28T14:05:37.773731Z","startedAt":"2023-11-28T14:05:37.799448Z","finishedAt":"2023-11-28T14:05:37.819669Z"} - {"uid":2,"indexUid":"tamo","status":"succeeded","type":"documentAdditionOrUpdate","canceledBy":null,"details":{"receivedDocuments":1,"indexedDocuments":1},"error":null,"duration":"PT0.020221S","enqueuedAt":"2023-11-28T14:05:37.780466Z","startedAt":"2023-11-28T14:05:37.799448Z","finishedAt":"2023-11-28T14:05:37.819669Z"} - "###); + server_handle.abort(); }