Simplify stream implementation

This commit is contained in:
Louis Dureuil 2024-01-30 12:27:49 +01:00
parent 31df954aa5
commit 9507a20d8b
No known key found for this signature in database

View File

@ -99,6 +99,16 @@ impl futures_util::Stream for LogStreamer {
} }
} }
impl LogStreamer {
pub fn into_stream(self) -> impl futures_util::Stream<Item = Result<Bytes, ResponseError>> {
futures_util::stream::unfold(self, move |mut this| async move {
let vec = this.receiver.recv().await;
vec.map(From::from).map(Ok).map(|a| (a, this))
})
}
}
pub fn make_layer< pub fn make_layer<
S: tracing::Subscriber + for<'span> tracing_subscriber::registry::LookupSpan<'span>, S: tracing::Subscriber + for<'span> tracing_subscriber::registry::LookupSpan<'span>,
>( >(
@ -159,7 +169,7 @@ pub async fn get_logs(
.unwrap(); .unwrap();
if was_available { if was_available {
Ok(HttpResponse::Ok().streaming(LogStreamer { receiver })) Ok(HttpResponse::Ok().streaming(LogStreamer { receiver }.into_stream()))
} else { } else {
Err(MeilisearchHttpError::AlreadyUsedLogRoute.into()) Err(MeilisearchHttpError::AlreadyUsedLogRoute.into())
} }