feat: emit params in logs for each request

This commit is contained in:
drbh 2024-04-17 01:18:09 +00:00
parent 593c443b45
commit bd28c36815

View File

@ -657,6 +657,7 @@ async fn completions(
format!("{}-{}", info.version, info.docker_label.unwrap_or("native")); format!("{}-{}", info.version, info.docker_label.unwrap_or("native"));
let infer_clone = infer.clone(); let infer_clone = infer.clone();
let compute_type_clone = compute_type.clone(); let compute_type_clone = compute_type.clone();
let params_clone = generate_request.parameters.clone();
// Create a future for each generate_stream_internal call. // Create a future for each generate_stream_internal call.
let generate_future = async move { let generate_future = async move {
@ -690,27 +691,32 @@ async fn completions(
let (header_tx, header_rx) = oneshot::channel(); let (header_tx, header_rx) = oneshot::channel();
let (sse_tx, sse_rx) = tokio::sync::mpsc::unbounded_channel(); let (sse_tx, sse_rx) = tokio::sync::mpsc::unbounded_channel();
tokio::spawn(async move { tokio::spawn(
let (header_map, sse) = generate_stream_internal( async move {
infer_clone.clone(), let (header_map, sse) = generate_stream_internal(
compute_type_clone.clone(), infer_clone.clone(),
Json(generate_request), compute_type_clone.clone(),
on_message_callback, Json(generate_request),
) on_message_callback,
.await; )
.await;
// send and dont wait for response // send and dont wait for response
let _ = header_tx.send(header_map); let _ = header_tx.send(header_map);
// pin an emit messages to the sse_tx // pin an emit messages to the sse_tx
let mut sse = Box::pin(sse); let mut sse = Box::pin(sse);
while let Some(event) = sse.next().await { while let Some(event) = sse.next().await {
if sse_tx.send(event).is_err() { if sse_tx.send(event).is_err() {
tracing::error!("Failed to send event. Receiver dropped."); tracing::error!("Failed to send event. Receiver dropped.");
break; break;
}
} }
} }
}); .instrument(
tracing::info_span!("request", index = %index, parameters = ?params_clone),
),
);
(header_rx, sse_rx) (header_rx, sse_rx)
}; };
@ -796,6 +802,7 @@ async fn completions(
for (index, generate_request) in generate_requests.into_iter().enumerate() { for (index, generate_request) in generate_requests.into_iter().enumerate() {
let infer_clone = infer.clone(); let infer_clone = infer.clone();
let compute_type_clone = compute_type.clone(); let compute_type_clone = compute_type.clone();
let params_clone = generate_request.parameters.clone();
let response_future = async move { let response_future = async move {
let result = generate( let result = generate(
Extension(infer_clone), Extension(infer_clone),
@ -804,7 +811,8 @@ async fn completions(
) )
.await; .await;
result.map(|(headers, generation)| (index, headers, generation)) result.map(|(headers, generation)| (index, headers, generation))
}; }
.instrument(tracing::info_span!("request", index = %index, parameters = ?params_clone));
responses.push(response_future); responses.push(response_future);
} }
let generate_responses = responses.try_collect::<Vec<_>>().await?; let generate_responses = responses.try_collect::<Vec<_>>().await?;