diff --git a/router/src/server.rs b/router/src/server.rs index 94d89703..f6df85d8 100644 --- a/router/src/server.rs +++ b/router/src/server.rs @@ -456,24 +456,17 @@ async fn generate_stream( let (headers, response_stream) = generate_stream_internal(infer, compute_type, Json(req), span).await; - let final_response_stream = async_stream::stream! { + let response_stream = async_stream::stream! { let mut response_stream = Box::pin(response_stream); while let Some(raw_event) = response_stream.next().await { - match raw_event { - Ok(stream_token) => { - let event = Event::default(); - let event = event.json_data(stream_token).unwrap(); - yield Ok(event); - } - Err(_err) => { - let event = Event::default(); - yield Ok(event); - } - } + yield Ok(match raw_event { + Ok(token) => Event::default().json_data(token).unwrap(), + Err(err) => Event::from(err), + }); } }; - let sse = Sse::new(final_response_stream).keep_alive(KeepAlive::default()); + let sse = Sse::new(response_stream).keep_alive(KeepAlive::default()); (headers, sse) } @@ -797,7 +790,7 @@ async fn completions( ) .await; - let final_response_stream = async_stream::stream! { + let response_stream = async_stream::stream! { let mut response_stream = Box::pin(response_stream); while let Some(stream_token) = response_stream.next().await { @@ -866,7 +859,7 @@ async fn completions( let _ = header_tx.send(headers); // pin an emit messages to the sse_tx - let mut sse = Box::pin(final_response_stream); + let mut sse = Box::pin(response_stream); while let Some(event) = sse.next().await { if sse_tx.send(event).is_err() { tracing::error!("Failed to send event. Receiver dropped."); @@ -1240,7 +1233,7 @@ async fn chat_completions( let (headers, response_stream) = generate_stream_internal(infer, compute_type, Json(generate_request), span).await; - let final_response_stream = async_stream::stream! { + let response_stream = async_stream::stream! { let mut response_stream = Box::pin(response_stream); let mut buffer = Vec::new(); let mut json_buffer = String::new(); @@ -1386,7 +1379,7 @@ async fn chat_completions( yield Ok::(Event::default().data("[DONE]")); }; - let sse = Sse::new(final_response_stream).keep_alive(KeepAlive::default()); + let sse = Sse::new(response_stream).keep_alive(KeepAlive::default()); Ok((headers, sse).into_response()) } else { let (headers, Json(generation)) =