diff --git a/router/src/server.rs b/router/src/server.rs index a9779420..f0cf37d2 100644 --- a/router/src/server.rs +++ b/router/src/server.rs @@ -657,6 +657,7 @@ async fn completions( format!("{}-{}", info.version, info.docker_label.unwrap_or("native")); let infer_clone = infer.clone(); let compute_type_clone = compute_type.clone(); + let params_clone = generate_request.parameters.clone(); // Create a future for each generate_stream_internal call. let generate_future = async move { @@ -690,27 +691,32 @@ async fn completions( let (header_tx, header_rx) = oneshot::channel(); let (sse_tx, sse_rx) = tokio::sync::mpsc::unbounded_channel(); - tokio::spawn(async move { - let (header_map, sse) = generate_stream_internal( - infer_clone.clone(), - compute_type_clone.clone(), - Json(generate_request), - on_message_callback, - ) - .await; + tokio::spawn( + async move { + let (header_map, sse) = generate_stream_internal( + infer_clone.clone(), + compute_type_clone.clone(), + Json(generate_request), + on_message_callback, + ) + .await; - // send and dont wait for response - let _ = header_tx.send(header_map); + // send and dont wait for response + let _ = header_tx.send(header_map); - // pin an emit messages to the sse_tx - let mut sse = Box::pin(sse); - while let Some(event) = sse.next().await { - if sse_tx.send(event).is_err() { - tracing::error!("Failed to send event. Receiver dropped."); - break; + // pin an emit messages to the sse_tx + let mut sse = Box::pin(sse); + while let Some(event) = sse.next().await { + if sse_tx.send(event).is_err() { + tracing::error!("Failed to send event. Receiver dropped."); + break; + } } } - }); + .instrument( + tracing::info_span!("request", index = %index, parameters = ?params_clone), + ), + ); (header_rx, sse_rx) }; @@ -796,6 +802,7 @@ async fn completions( for (index, generate_request) in generate_requests.into_iter().enumerate() { let infer_clone = infer.clone(); let compute_type_clone = compute_type.clone(); + let params_clone = generate_request.parameters.clone(); let response_future = async move { let result = generate( Extension(infer_clone), @@ -804,7 +811,8 @@ async fn completions( ) .await; result.map(|(headers, generation)| (index, headers, generation)) - }; + } + .instrument(tracing::info_span!("request", index = %index, parameters = ?params_clone)); responses.push(response_future); } let generate_responses = responses.try_collect::>().await?;