mirror of
https://github.com/huggingface/text-generation-inference.git
synced 2025-09-12 04:44:52 +00:00
fix: return event in all cases
This commit is contained in:
parent
17c66892b5
commit
7d2aa27161
@ -456,24 +456,17 @@ async fn generate_stream(
|
|||||||
let (headers, response_stream) =
|
let (headers, response_stream) =
|
||||||
generate_stream_internal(infer, compute_type, Json(req), span).await;
|
generate_stream_internal(infer, compute_type, Json(req), span).await;
|
||||||
|
|
||||||
let final_response_stream = async_stream::stream! {
|
let response_stream = async_stream::stream! {
|
||||||
let mut response_stream = Box::pin(response_stream);
|
let mut response_stream = Box::pin(response_stream);
|
||||||
while let Some(raw_event) = response_stream.next().await {
|
while let Some(raw_event) = response_stream.next().await {
|
||||||
match raw_event {
|
yield Ok(match raw_event {
|
||||||
Ok(stream_token) => {
|
Ok(token) => Event::default().json_data(token).unwrap(),
|
||||||
let event = Event::default();
|
Err(err) => Event::from(err),
|
||||||
let event = event.json_data(stream_token).unwrap();
|
});
|
||||||
yield Ok(event);
|
|
||||||
}
|
|
||||||
Err(_err) => {
|
|
||||||
let event = Event::default();
|
|
||||||
yield Ok(event);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
let sse = Sse::new(final_response_stream).keep_alive(KeepAlive::default());
|
let sse = Sse::new(response_stream).keep_alive(KeepAlive::default());
|
||||||
(headers, sse)
|
(headers, sse)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -797,7 +790,7 @@ async fn completions(
|
|||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
let final_response_stream = async_stream::stream! {
|
let response_stream = async_stream::stream! {
|
||||||
let mut response_stream = Box::pin(response_stream);
|
let mut response_stream = Box::pin(response_stream);
|
||||||
|
|
||||||
while let Some(stream_token) = response_stream.next().await {
|
while let Some(stream_token) = response_stream.next().await {
|
||||||
@ -866,7 +859,7 @@ async fn completions(
|
|||||||
let _ = header_tx.send(headers);
|
let _ = header_tx.send(headers);
|
||||||
|
|
||||||
// pin an emit messages to the sse_tx
|
// pin an emit messages to the sse_tx
|
||||||
let mut sse = Box::pin(final_response_stream);
|
let mut sse = Box::pin(response_stream);
|
||||||
while let Some(event) = sse.next().await {
|
while let Some(event) = sse.next().await {
|
||||||
if sse_tx.send(event).is_err() {
|
if sse_tx.send(event).is_err() {
|
||||||
tracing::error!("Failed to send event. Receiver dropped.");
|
tracing::error!("Failed to send event. Receiver dropped.");
|
||||||
@ -1240,7 +1233,7 @@ async fn chat_completions(
|
|||||||
let (headers, response_stream) =
|
let (headers, response_stream) =
|
||||||
generate_stream_internal(infer, compute_type, Json(generate_request), span).await;
|
generate_stream_internal(infer, compute_type, Json(generate_request), span).await;
|
||||||
|
|
||||||
let final_response_stream = async_stream::stream! {
|
let response_stream = async_stream::stream! {
|
||||||
let mut response_stream = Box::pin(response_stream);
|
let mut response_stream = Box::pin(response_stream);
|
||||||
let mut buffer = Vec::new();
|
let mut buffer = Vec::new();
|
||||||
let mut json_buffer = String::new();
|
let mut json_buffer = String::new();
|
||||||
@ -1386,7 +1379,7 @@ async fn chat_completions(
|
|||||||
yield Ok::<Event, Infallible>(Event::default().data("[DONE]"));
|
yield Ok::<Event, Infallible>(Event::default().data("[DONE]"));
|
||||||
};
|
};
|
||||||
|
|
||||||
let sse = Sse::new(final_response_stream).keep_alive(KeepAlive::default());
|
let sse = Sse::new(response_stream).keep_alive(KeepAlive::default());
|
||||||
Ok((headers, sse).into_response())
|
Ok((headers, sse).into_response())
|
||||||
} else {
|
} else {
|
||||||
let (headers, Json(generation)) =
|
let (headers, Json(generation)) =
|
||||||
|
Loading…
Reference in New Issue
Block a user