update and fix the test

This commit is contained in:
Tamo 2023-11-28 15:55:48 +01:00 committed by Clément Renault
parent 3adbc2b942
commit 0b2fff27f2
No known key found for this signature in database
GPG Key ID: F250A4C4E3AE5F5F
2 changed files with 49 additions and 25 deletions

View File

@ -1288,7 +1288,6 @@ impl IndexScheduler {
buffer.push(b'\n'); buffer.push(b'\n');
} }
println!("Sending request to {url}");
let _ = ureq::post(url).send_bytes(&buffer).unwrap(); let _ = ureq::post(url).send_bytes(&buffer).unwrap();
} }

View File

@ -1,10 +1,14 @@
//! To test the webhook, we need to spawn a new server with a URL listening for
//! post requests. The webhook handle starts a server and forwards all the
//! received requests into a channel for you to handle.
use std::sync::Arc; use std::sync::Arc;
use actix_http::body::MessageBody; use actix_http::body::MessageBody;
use actix_web::dev::{ServiceFactory, ServiceResponse}; use actix_web::dev::{ServiceFactory, ServiceResponse};
use actix_web::web::{Bytes, Data}; use actix_web::web::{Bytes, Data};
use actix_web::{post, App, HttpResponse, HttpServer}; use actix_web::{post, App, HttpResponse, HttpServer};
use meili_snap::snapshot; use meili_snap::{json_string, snapshot};
use meilisearch::Opt; use meilisearch::Opt;
use tokio::sync::mpsc; use tokio::sync::mpsc;
@ -13,7 +17,6 @@ use crate::json;
#[post("/")] #[post("/")]
async fn forward_body(sender: Data<mpsc::UnboundedSender<Vec<u8>>>, body: Bytes) -> HttpResponse { async fn forward_body(sender: Data<mpsc::UnboundedSender<Vec<u8>>>, body: Bytes) -> HttpResponse {
println!("Received something");
let body = body.to_vec(); let body = body.to_vec();
sender.send(body).unwrap(); sender.send(body).unwrap();
HttpResponse::Ok().into() HttpResponse::Ok().into()
@ -47,51 +50,73 @@ async fn create_webhook_server() -> WebhookHandle {
let (sender, receiver) = mpsc::unbounded_channel(); let (sender, receiver) = mpsc::unbounded_channel();
let sender = Arc::new(sender); let sender = Arc::new(sender);
// By listening on the port 0, the system will give us any available port.
let server = let server =
HttpServer::new(move || create_app(sender.clone())).bind(("127.0.0.1", 0)).unwrap(); HttpServer::new(move || create_app(sender.clone())).bind(("127.0.0.1", 0)).unwrap();
let (ip, scheme) = server.addrs_with_scheme()[0]; let (ip, scheme) = server.addrs_with_scheme()[0];
let url = format!("{scheme}://{ip}/"); let url = format!("{scheme}://{ip}/");
println!("url is {url}");
// TODO: Is it cleaned once the test is over
let server_handle = tokio::spawn(server.run()); let server_handle = tokio::spawn(server.run());
WebhookHandle { server_handle, url, receiver } WebhookHandle { server_handle, url, receiver }
} }
#[actix_web::test] #[actix_web::test]
async fn test_basic_webhook() { async fn test_basic_webhook() {
// Request a new server from the pool let WebhookHandle { server_handle, url, mut receiver } = create_webhook_server().await;
let mut handle = create_webhook_server().await;
let db_path = tempfile::tempdir().unwrap(); let db_path = tempfile::tempdir().unwrap();
let server = Server::new_with_options(Opt { let server = Server::new_with_options(Opt {
task_webhook_url: Some(handle.url.clone()), task_webhook_url: Some(url),
..default_settings(db_path.path()) ..default_settings(db_path.path())
}) })
.await .await
.unwrap(); .unwrap();
let index = server.index("tamo"); let index = server.index("tamo");
// TODO: may be flaky, we're relying on the fact that during the time the first document addition succeed, the two other operations will be received. // May be flaky: we're relying on the fact that while the first document addition is processed, the other
for i in 0..3 { // operations will be received and will be batched together. If it doesn't happen it's not a problem
// the rest of the test won't assume anything about the number of tasks per batch.
for i in 0..5 {
let (_, _status) = index.add_documents(json!({ "id": i, "doggo": "bone" }), None).await; let (_, _status) = index.add_documents(json!({ "id": i, "doggo": "bone" }), None).await;
} }
let payload = handle.receiver.recv().await.unwrap(); let mut nb_tasks = 0;
let jsonl = String::from_utf8(payload).unwrap(); while let Some(payload) = receiver.recv().await {
let payload = String::from_utf8(payload).unwrap();
snapshot!(jsonl, let jsonl = payload.split('\n');
for json in jsonl {
if json.is_empty() {
break; // we reached EOF
}
nb_tasks += 1;
let json: serde_json::Value = serde_json::from_str(json).unwrap();
snapshot!(
json_string!(json, { ".uid" => "[uid]", ".duration" => "[duration]", ".enqueuedAt" => "[date]", ".startedAt" => "[date]", ".finishedAt" => "[date]" }),
@r###" @r###"
{"uid":0,"indexUid":"tamo","status":"succeeded","type":"documentAdditionOrUpdate","canceledBy":null,"details":{"receivedDocuments":1,"indexedDocuments":1},"error":null,"duration":"PT0.027444S","enqueuedAt":"2023-11-28T14:05:37.767678Z","startedAt":"2023-11-28T14:05:37.769519Z","finishedAt":"2023-11-28T14:05:37.796963Z"} {
"###); "uid": "[uid]",
"indexUid": "tamo",
let payload = handle.receiver.recv().await.unwrap(); "status": "succeeded",
let jsonl = String::from_utf8(payload).unwrap(); "type": "documentAdditionOrUpdate",
"canceledBy": null,
snapshot!(jsonl, "details": {
@r###" "receivedDocuments": 1,
{"uid":1,"indexUid":"tamo","status":"succeeded","type":"documentAdditionOrUpdate","canceledBy":null,"details":{"receivedDocuments":1,"indexedDocuments":1},"error":null,"duration":"PT0.020221S","enqueuedAt":"2023-11-28T14:05:37.773731Z","startedAt":"2023-11-28T14:05:37.799448Z","finishedAt":"2023-11-28T14:05:37.819669Z"} "indexedDocuments": 1
{"uid":2,"indexUid":"tamo","status":"succeeded","type":"documentAdditionOrUpdate","canceledBy":null,"details":{"receivedDocuments":1,"indexedDocuments":1},"error":null,"duration":"PT0.020221S","enqueuedAt":"2023-11-28T14:05:37.780466Z","startedAt":"2023-11-28T14:05:37.799448Z","finishedAt":"2023-11-28T14:05:37.819669Z"} },
"error": null,
"duration": "[duration]",
"enqueuedAt": "[date]",
"startedAt": "[date]",
"finishedAt": "[date]"
}
"###); "###);
} }
if nb_tasks == 5 {
break;
}
}
assert!(nb_tasks == 5, "We should have received the 5 tasks but only received {nb_tasks}");
server_handle.abort();
}