improved instrumentation

OlivierDehaene 2023-02-10 15:30:53 +01:00
parent f81f0828d7
commit 67cd625c82
8 changed files with 25 additions and 20 deletions

View File

@@ -27,6 +27,7 @@ to power LLMs api-inference widgets.
 - [Docker](#docker)
 - [API Documentation](#api-documentation)
 - [A note on Shared Memory](#a-note-on-shared-memory-shm)
+- [Distributed Tracing](#distributed-tracing)
 - [Local Install](#local-install)
 - [CUDA Kernels](#cuda-kernels)
 - [Run BLOOM](#run-bloom)
@@ -103,6 +104,11 @@ curl 127.0.0.1:8080/generate_stream \
 You can consult the OpenAPI documentation of the `text-generation-inference` REST API using the `/docs` route.
 The Swagger UI is also available at: [https://huggingface.github.io/text-generation-inference](https://huggingface.github.io/text-generation-inference).
+### Distributed Tracing
+`text-generation-inference` is instrumented with distributed tracing using OpenTelemetry. You can use this feature
+by setting the address of an OTLP collector with the `--otlp-endpoint` argument.
 ### A note on Shared Memory (shm)
 [`NCCL`](https://docs.nvidia.com/deeplearning/nccl/user-guide/docs/index.html) is a communication framework used by
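
Note (not part of the diff): the README addition above only documents the `--otlp-endpoint` flag. For readers unfamiliar with the plumbing, the sketch below shows one common way an OTLP endpoint is wired into a `tracing` subscriber in Rust. It is an illustration assuming the `opentelemetry`, `opentelemetry-otlp` (with its `tonic` and `rt-tokio` features), `tracing-opentelemetry`, and `tracing-subscriber` crates; it is not the exact code of this commit's `init_logging`.

    use opentelemetry_otlp::WithExportConfig;
    use tracing_subscriber::layer::SubscriberExt;
    use tracing_subscriber::util::SubscriberInitExt;

    /// Sketch only: call this from inside a Tokio runtime so the batch span
    /// exporter has an executor to run on.
    fn init_tracing(otlp_endpoint: Option<String>) {
        // When no endpoint is given, the OTLP layer is simply absent.
        let otlp_layer = otlp_endpoint.map(|endpoint| {
            let tracer = opentelemetry_otlp::new_pipeline()
                .tracing()
                .with_exporter(
                    opentelemetry_otlp::new_exporter()
                        .tonic()
                        .with_endpoint(endpoint),
                )
                .install_batch(opentelemetry::runtime::Tokio)
                .expect("failed to install OTLP tracer");
            tracing_opentelemetry::layer().with_tracer(tracer)
        });

        tracing_subscriber::registry()
            .with(otlp_layer)
            .with(tracing_subscriber::fmt::layer())
            .init();
    }

In practice the endpoint points at an OTLP-capable collector (for example a local OpenTelemetry Collector or Jaeger instance listening on its OTLP gRPC port); the spans touched throughout this commit (`batch`, `infer`, `send_generation`, ...) are then exported there.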

View File

@@ -66,7 +66,7 @@ impl Client {
     ///
     /// Returns Generation for each request in batch
     /// and the next cached batch
-    #[instrument(skip(self))]
+    #[instrument(skip_all, fields(id = &batch.id, size = &batch.size))]
     pub async fn prefill(&mut self, batch: Batch) -> Result<(Vec<Generation>, Option<Batch>)> {
        let request = tonic::Request::new(PrefillRequest { batch: Some(batch) }).inject_context();
        let response = self.stub.prefill(request).await?.into_inner();
@@ -77,7 +77,7 @@ impl Client {
     ///
     /// Returns Generation for each request in batches
     /// and the next cached batch
-    #[instrument(skip(self))]
+    #[instrument(skip_all, fields(size = batches.iter().map(|batch|{batch.size}).sum::<u32>()))]
     pub async fn decode(
         &mut self,
         batches: Vec<Batch>,
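
Note (not part of the diff): switching from `#[instrument(skip(self))]` to `#[instrument(skip_all, fields(...))]` stops `tracing` from recording every argument with its `Debug` implementation (which for a `Batch` would dump all request payloads) and instead records only a few chosen values as span fields. A minimal sketch of the pattern, using a stand-in `Batch` struct rather than the repo's generated protobuf type:

    use tracing::instrument;

    // Stand-in type for illustration; the real Batch is generated from protobuf.
    pub struct Batch {
        pub id: u64,
        pub size: u32,
    }

    // `skip_all` keeps arguments out of the span; `fields(...)` attaches just
    // the identifiers that are useful when reading a trace.
    #[instrument(skip_all, fields(id = batch.id, size = batch.size))]
    fn process(batch: Batch) {
        // The current span now carries `id` and `size` as attributes.
        let _ = batch;
    }

The `decode` variant above sums the sizes of all batches into a single `size` field, which is cheap to compute and far easier to read in a trace than a full `Debug` dump of a `Vec<Batch>`.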

View File

@@ -53,7 +53,7 @@ impl ShardedClient {
     ///
     /// Returns Generation for each request in batch
     /// and the next cached batch
-    #[instrument(skip(self))]
+    #[instrument(skip_all, fields(id = &batch.id, size = &batch.size))]
     pub async fn prefill(&mut self, batch: Batch) -> Result<(Vec<Generation>, Option<Batch>)> {
         let futures: Vec<_> = self
             .clients
@@ -69,7 +69,7 @@ impl ShardedClient {
     ///
     /// Returns Generation for each request in batches
     /// and the next cached batch
-    #[instrument(skip(self))]
+    #[instrument(skip_all, fields(size = batches.iter().map(|batch|{batch.size}).sum::<u32>()))]
     pub async fn decode(
         &mut self,
         batches: Vec<Batch>,

View File

@@ -243,10 +243,11 @@ async fn batching_task(
                 }
             }
             // Create span for this batch to add context to inference calls
-            let next_batch_span = info_span!(parent: None, "batch");
+            let next_batch_size = entries.len();
+            let next_batch_span = info_span!(parent: None, "batch", batch_size = next_batch_size);
             entries.iter_mut().for_each(|(_, entry)| {
                 // Create a new span to link the batch back to this entry
-                let entry_batch_span = info_span!(parent: &entry.span, "infer");
+                let entry_batch_span = info_span!(parent: &entry.span, "infer", batch_size = next_batch_size);
                 // Add relationship
                 entry_batch_span.follows_from(&next_batch_span);
                 // Update entry
@@ -263,7 +264,7 @@ async fn batching_task(
 }
 /// Wrap a future inside a match statement to handle errors and send the responses to Infer
-#[instrument(skip(future))]
+#[instrument(skip_all)]
 async fn wrap_future(
     future: impl Future<Output = Result<(Vec<Generation>, Option<Batch>), ClientError>>,
     entries: &mut IntMap<u64, Entry>,
@@ -282,7 +283,7 @@ async fn wrap_future(
 }
 /// Send errors to Infer for all `entries`
-#[instrument]
+#[instrument(skip_all)]
 fn send_errors(error: ClientError, entries: &mut IntMap<u64, Entry>) {
     entries.drain().for_each(|(_, entry)| {
         // Create and enter a span to link this function back to the entry
@@ -299,7 +300,7 @@ fn send_errors(error: ClientError, entries: &mut IntMap<u64, Entry>) {
 }
 /// Send one or multiple `InferStreamResponse` to Infer for all `entries`
-#[instrument]
+#[instrument(skip_all)]
 fn send_generations(generations: Vec<Generation>, entries: &mut IntMap<u64, Entry>) {
     generations.into_iter().for_each(|generation| {
         // Get entry
@@ -309,7 +310,7 @@ fn send_generations(generations: Vec<Generation>, entries: &mut IntMap<u64, Entry>) {
             .expect("ID not found in entries. This is a bug.");
         // Create and enter a span to link this function back to the entry
-        let _generation_span = info_span!(parent: entry.batch_span.as_ref().expect("batch_span is None. This is a bug."), "send_generation").entered();
+        let _generation_span = info_span!(parent: entry.batch_span.as_ref().expect("batch_span is None. This is a bug."), "send_generation", generation = ?generation).entered();
         if let Some(prefill_tokens) = generation.prefill_tokens {
             // Send message

View File

@@ -121,7 +121,6 @@ fn init_logging(otlp_endpoint: Option<String>, json_output: bool) {
         true => fmt_layer
             .json()
             .flatten_event(true)
-            .with_span_list(false)
             .boxed(),
         false => fmt_layer.boxed(),
     };
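
Note (not part of the diff): removing `.with_span_list(false)` restores the default behaviour of `tracing-subscriber`'s JSON formatter, which attaches the list of enclosing spans to every event, so the `batch` / `infer` context added in this commit also shows up in structured logs, not only in exported traces. A small self-contained sketch of that configuration (assuming the `json` feature of `tracing-subscriber`; this is not the commit's `init_logging`):

    use tracing_subscriber::{fmt, layer::SubscriberExt, util::SubscriberInitExt, Layer, Registry};

    fn main() {
        // With the span-list default left in place, each JSON event carries a
        // `spans` array describing the spans it was emitted inside of.
        let fmt_layer: Box<dyn Layer<Registry> + Send + Sync> = fmt::layer()
            .json()
            .flatten_event(true) // merge event fields into the top-level object
            .boxed();

        tracing_subscriber::registry().with(fmt_layer).init();

        tracing::info_span!("batch", batch_size = 4_u32).in_scope(|| {
            tracing::info!("this event's JSON output includes the `batch` span");
        });
    }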

View File

@@ -48,7 +48,7 @@ impl Queue {
     }
     /// Append an entry to the queue
-    #[instrument(skip(self))]
+    #[instrument(skip_all)]
     pub(crate) fn append(&self, entry: Entry) {
         // Send append command to the background task managing the state
         // Unwrap is safe here
@@ -143,12 +143,12 @@ impl State {
             }
         }
-        // Create span for this batch to add context to inference calls
-        let next_batch_span = info_span!(parent: None, "batch");
-        next_batch_span.follows_from(&Span::current());
         let next_batch_size = min(self.entries.len(), max_size);
+        // Create span for this batch to add context to inference calls
+        let next_batch_span = info_span!(parent: None, "batch", batch_size = next_batch_size);
+        next_batch_span.follows_from(&Span::current());
         let mut batch_requests = Vec::with_capacity(next_batch_size);
         let mut batch_entries =
             IntMap::with_capacity_and_hasher(next_batch_size, BuildNoHashHasher::default());
@@ -158,7 +158,7 @@ impl State {
             .drain(..next_batch_size)
             .for_each(|(id, mut entry)| {
                 // Create a new span to link the batch back to this entry
-                let entry_batch_span = info_span!(parent: &entry.span, "infer");
+                let entry_batch_span = info_span!(parent: &entry.span, "infer", batch_size = next_batch_size);
                 // Add relationship
                 entry_batch_span.follows_from(&next_batch_span);
                 // Update entry

View File

@@ -37,7 +37,7 @@ impl Validation {
     }
     /// Validate a payload and get the number of tokens in the input
-    #[instrument(skip(self))]
+    #[instrument(skip_all)]
     pub(crate) async fn validate(
         &self,
         request: GenerateRequest,

View File

@@ -4,7 +4,6 @@ import typer
 from pathlib import Path
 from loguru import logger
-from typer import Argument
 from typing import Optional
 from text_generation import server, utils
@@ -22,7 +21,7 @@ def serve(
     uds_path: Path = "/tmp/text-generation",
     logger_level: str = "INFO",
     json_output: bool = False,
-    otlp_endpoint: Optional[str] = Argument(None, envvar="OTLP_ENDPOINT"),
+    otlp_endpoint: Optional[str] = None,
 ):
     if sharded:
         assert (