From 67cd625c82e926609199f7ce537d904d33adb5f4 Mon Sep 17 00:00:00 2001
From: OlivierDehaene <23298448+OlivierDehaene@users.noreply.github.com>
Date: Fri, 10 Feb 2023 15:30:53 +0100
Subject: [PATCH] improved instrumentation

---
 README.md                           |  6 ++++++
 router/client/src/client.rs         |  4 ++--
 router/client/src/sharded_client.rs |  4 ++--
 router/src/infer.rs                 | 13 +++++++------
 router/src/main.rs                  |  1 -
 router/src/queue.rs                 | 12 ++++++------
 router/src/validation.rs            |  2 +-
 server/text_generation/cli.py       |  3 +--
 8 files changed, 25 insertions(+), 20 deletions(-)

diff --git a/README.md b/README.md
index 938f602a..2ba89b51 100644
--- a/README.md
+++ b/README.md
@@ -27,6 +27,7 @@ to power LLMs api-inference widgets.
   - [Docker](#docker)
   - [API Documentation](#api-documentation)
   - [A note on Shared Memory](#a-note-on-shared-memory-shm)
+  - [Distributed Tracing](#distributed-tracing)
   - [Local Install](#local-install)
   - [CUDA Kernels](#cuda-kernels)
 - [Run BLOOM](#run-bloom)
@@ -103,6 +104,11 @@ curl 127.0.0.1:8080/generate_stream \
 You can consult the OpenAPI documentation of the `text-generation-inference` REST API using the `/docs` route.
 The Swagger UI is also available at: [https://huggingface.github.io/text-generation-inference](https://huggingface.github.io/text-generation-inference).
 
+### Distributed Tracing
+
+`text-generation-inference` is instrumented with distributed tracing using OpenTelemetry. You can use this feature
+by setting the `--otlp-endpoint` argument to the address of an OTLP collector.
+
 ### A note on Shared Memory (shm)
 
 [`NCCL`](https://docs.nvidia.com/deeplearning/nccl/user-guide/docs/index.html) is a communication framework used by
diff --git a/router/client/src/client.rs b/router/client/src/client.rs
index 199182f4..1f0d23f2 100644
--- a/router/client/src/client.rs
+++ b/router/client/src/client.rs
@@ -66,7 +66,7 @@ impl Client {
     ///
     /// Returns Generation for each request in batch
     /// and the next cached batch
-    #[instrument(skip(self))]
+    #[instrument(skip_all, fields(id = &batch.id, size = &batch.size))]
     pub async fn prefill(&mut self, batch: Batch) -> Result<(Vec<Generation>, Option<Batch>)> {
         let request = tonic::Request::new(PrefillRequest { batch: Some(batch) }).inject_context();
         let response = self.stub.prefill(request).await?.into_inner();
@@ -77,7 +77,7 @@ impl Client {
     ///
     /// Returns Generation for each request in batches
     /// and the next cached batch
-    #[instrument(skip(self))]
+    #[instrument(skip_all, fields(size = batches.iter().map(|batch|{batch.size}).sum::<u32>()))]
     pub async fn decode(
         &mut self,
         batches: Vec<Batch>,
diff --git a/router/client/src/sharded_client.rs b/router/client/src/sharded_client.rs
index f77425cd..2e662ca3 100644
--- a/router/client/src/sharded_client.rs
+++ b/router/client/src/sharded_client.rs
@@ -53,7 +53,7 @@ impl ShardedClient {
     ///
     /// Returns Generation for each request in batch
     /// and the next cached batch
-    #[instrument(skip(self))]
+    #[instrument(skip_all, fields(id = &batch.id, size = &batch.size))]
     pub async fn prefill(&mut self, batch: Batch) -> Result<(Vec<Generation>, Option<Batch>)> {
         let futures: Vec<_> = self
             .clients
@@ -69,7 +69,7 @@ impl ShardedClient {
     ///
     /// Returns Generation for each request in batches
     /// and the next cached batch
-    #[instrument(skip(self))]
+    #[instrument(skip_all, fields(size = batches.iter().map(|batch|{batch.size}).sum::<u32>()))]
     pub async fn decode(
         &mut self,
         batches: Vec<Batch>,
diff --git a/router/src/infer.rs b/router/src/infer.rs
index f2716e40..4013edfd 100644
--- a/router/src/infer.rs
+++ b/router/src/infer.rs
@@ -243,10 +243,11 @@ async fn batching_task(
                     }
                 }
                 // Create span for this batch to add context to inference calls
-                let next_batch_span = info_span!(parent: None, "batch");
+                let next_batch_size = entries.len();
+                let next_batch_span = info_span!(parent: None, "batch", batch_size = next_batch_size);
                 entries.iter_mut().for_each(|(_, entry)| {
                     // Create a new span to link the batch back to this entry
-                    let entry_batch_span = info_span!(parent: &entry.span, "infer");
+                    let entry_batch_span = info_span!(parent: &entry.span, "infer", batch_size = next_batch_size);
                     // Add relationship
                     entry_batch_span.follows_from(&next_batch_span);
                     // Update entry
@@ -263,7 +264,7 @@ async fn batching_task(
 }
 
 /// Wrap a future inside a match statement to handle errors and send the responses to Infer
-#[instrument(skip(future))]
+#[instrument(skip_all)]
 async fn wrap_future(
     future: impl Future<Output = Result<(Vec<Generation>, Option<Batch>), ClientError>>,
     entries: &mut IntMap<u64, Entry>,
@@ -282,7 +283,7 @@ async fn wrap_future(
 }
 
 /// Send errors to Infer for all `entries`
-#[instrument]
+#[instrument(skip_all)]
 fn send_errors(error: ClientError, entries: &mut IntMap<u64, Entry>) {
     entries.drain().for_each(|(_, entry)| {
         // Create and enter a span to link this function back to the entry
@@ -299,7 +300,7 @@ fn send_errors(error: ClientError, entries: &mut IntMap<u64, Entry>) {
 }
 
 /// Send one or multiple `InferStreamResponse` to Infer for all `entries`
-#[instrument]
+#[instrument(skip_all)]
 fn send_generations(generations: Vec<Generation>, entries: &mut IntMap<u64, Entry>) {
     generations.into_iter().for_each(|generation| {
         // Get entry
@@ -309,7 +310,7 @@ fn send_generations(generations: Vec<Generation>, entries: &mut IntMap<u64, Ent
diff --git a/router/src/main.rs b/router/src/main.rs
--- a/router/src/main.rs
+++ b/router/src/main.rs
@@ -?,7 +?,6 @@ fn init_logging(otlp_endpoint: Option<String>, json_output: bool) {
         true => fmt_layer
             .json()
             .flatten_event(true)
-            .with_span_list(false)
             .boxed(),
         false => fmt_layer.boxed(),
     };
diff --git a/router/src/queue.rs b/router/src/queue.rs
index 88a049e6..bba3b675 100644
--- a/router/src/queue.rs
+++ b/router/src/queue.rs
@@ -48,7 +48,7 @@ impl Queue {
     }
 
     /// Append an entry to the queue
-    #[instrument(skip(self))]
+    #[instrument(skip_all)]
     pub(crate) fn append(&self, entry: Entry) {
         // Send append command to the background task managing the state
         // Unwrap is safe here
@@ -143,12 +143,12 @@ impl State {
             }
         }
 
-        // Create span for this batch to add context to inference calls
-        let next_batch_span = info_span!(parent: None, "batch");
-        next_batch_span.follows_from(&Span::current());
-
         let next_batch_size = min(self.entries.len(), max_size);
 
+        // Create span for this batch to add context to inference calls
+        let next_batch_span = info_span!(parent: None, "batch", batch_size = next_batch_size);
+        next_batch_span.follows_from(&Span::current());
+
         let mut batch_requests = Vec::with_capacity(next_batch_size);
         let mut batch_entries =
             IntMap::with_capacity_and_hasher(next_batch_size, BuildNoHashHasher::default());
@@ -158,7 +158,7 @@ impl State {
             .drain(..next_batch_size)
             .for_each(|(id, mut entry)| {
                 // Create a new span to link the batch back to this entry
-                let entry_batch_span = info_span!(parent: &entry.span, "infer");
+                let entry_batch_span = info_span!(parent: &entry.span, "infer", batch_size = next_batch_size);
                 // Add relationship
                 entry_batch_span.follows_from(&next_batch_span);
                 // Update entry
diff --git a/router/src/validation.rs b/router/src/validation.rs
index b3e05fc0..aa1c1d23 100644
--- a/router/src/validation.rs
+++ b/router/src/validation.rs
@@ -37,7 +37,7 @@ impl Validation {
     }
 
     /// Validate a payload and get the number of tokens in the input
-    #[instrument(skip(self))]
+    #[instrument(skip_all)]
     pub(crate) async fn validate(
         &self,
         request: GenerateRequest,
diff --git a/server/text_generation/cli.py b/server/text_generation/cli.py
index ba3a6a23..e9c8ea92 100644
--- a/server/text_generation/cli.py
+++ b/server/text_generation/cli.py
@@ -4,7 +4,6 @@ import typer
 
 from pathlib import Path
 from loguru import logger
-from typer import Argument
 from typing import Optional
 
 from text_generation import server, utils
@@ -22,7 +21,7 @@ def serve(
     uds_path: Path = "/tmp/text-generation",
     logger_level: str = "INFO",
     json_output: bool = False,
-    otlp_endpoint: Optional[str] = Argument(None, envvar="OTLP_ENDPOINT"),
+    otlp_endpoint: Optional[str] = None,
 ):
     if sharded:
         assert (
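
The recurring change in this patch is replacing `#[instrument]` / `#[instrument(skip(self))]` with `#[instrument(skip_all, fields(...))]`, so spans stop capturing whole arguments and instead record only the values worth querying (batch id and size), while detached per-batch spans are linked back to their origin with `follows_from`. The snippet below is a minimal, self-contained sketch of that `tracing` pattern, not the router's actual code; the `Batch` struct and `prefill` function here are illustrative stand-ins.

```rust
// Assumed dependencies: tracing = "0.1", tracing-subscriber = "0.3".
use tracing::{info, info_span, instrument, Span};

// Hypothetical stand-in for the router's batch type.
struct Batch {
    id: u64,
    size: u32,
}

// `skip_all` keeps the arguments themselves out of the span (they can be large
// and need not implement `Debug`); `fields(...)` records only the chosen values.
#[instrument(skip_all, fields(id = batch.id, size = batch.size))]
fn prefill(batch: Batch) {
    // Create a detached span and link it back to the current one with
    // `follows_from`, mirroring how the patch links "infer" spans to "batch" spans.
    let entry_span = info_span!(parent: None, "infer", batch_size = batch.size);
    entry_span.follows_from(&Span::current());
    let _guard = entry_span.entered();
    info!("prefill scheduled");
}

fn main() {
    tracing_subscriber::fmt().init();
    prefill(Batch { id: 0, size: 4 });
}
```

The README addition is the user-facing half of the same change: once `--otlp-endpoint` points at an OTLP collector, these spans are exported via OpenTelemetry.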