diff --git a/router/src/infer.rs b/router/src/infer.rs
index 787ccfcf..8547df98 100644
--- a/router/src/infer.rs
+++ b/router/src/infer.rs
@@ -257,6 +257,7 @@ impl Infer {
 ///
 /// Batches requests and sends them to the inference server
 #[allow(clippy::too_many_arguments)]
+#[instrument(skip_all)]
 async fn batching_task(
     mut client: ShardedClient,
     waiting_served_ratio: f32,
@@ -275,6 +276,7 @@ async fn batching_task(
         // Get the next batch from the queue
         // This batch might be smaller than the maximum batch size if there are not enough requests
         // waiting in the queue
+        tracing::debug!("First batch");
         while let Some((mut entries, batch, span)) = queue
             .next_batch(None, max_batch_prefill_tokens, max_batch_total_tokens)
             .await
@@ -378,6 +380,8 @@ async fn prefill(
     let batch_id = batch.id;
     metrics::increment_counter!("tgi_batch_inference_count", "method" => "prefill");
 
+    tracing::debug!("Prefill");
+
     match client.prefill(batch).await {
         Ok((generations, next_batch)) => {
             // Update health
@@ -415,6 +419,8 @@ async fn decode(
     let batch_ids: Vec<u64> = batches.iter().map(|b| b.id).collect();
     metrics::increment_counter!("tgi_batch_inference_count", "method" => "decode");
 
+    tracing::debug!("Decode");
+
     match client.decode(batches).await {
         Ok((generations, next_batch)) => {
             // Update health
@@ -513,6 +519,7 @@ fn send_responses(
 ) -> Result<bool, Box<SendError<Result<InferStreamResponse, InferError>>>> {
     // Return directly if the channel is disconnected
     if entry.response_tx.is_disconnected() {
+        metrics::increment_counter!("tgi_request_failure", "err" => "dropped");
         return Ok(true);
     }
 
diff --git a/router/src/queue.rs b/router/src/queue.rs
index 1ab9eb11..7eee91ea 100644
--- a/router/src/queue.rs
+++ b/router/src/queue.rs
@@ -82,7 +82,9 @@ impl Queue {
             .unwrap();
         // Await on response channel
         // Unwrap is safe here
-        response_receiver.await.unwrap()
+        let response = response_receiver.await.unwrap();
+        tracing::debug!("Next batch: {}", response.is_some());
+        response
     }
 }