fix: Telemetry (#2957)

* fix: add regular telemetry pings and fix unhandled errors that prevented telemetry stop events from being sent.

* fix: simplify error handling

* fix: update ping delay and update doc.

* fix: clippy

* doc: Rephrase properly.
This commit is contained in:
Hugo Larcher 2025-01-28 10:29:18 +01:00 committed by GitHub
parent db922eb77e
commit c690da5973
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 40 additions and 12 deletions

View File

@ -3,7 +3,7 @@
Text Generation Inference collects anonymous usage statistics to help us improve the service. The collected data is used to improve TGI and to understand what causes failures. The data is collected transparently and any sensitive information is omitted. Text Generation Inference collects anonymous usage statistics to help us improve the service. The collected data is used to improve TGI and to understand what causes failures. The data is collected transparently and any sensitive information is omitted.
Data is sent twice, once on server startup and once when server stops. Also, usage statistics are only enabled when TGI is running in docker to avoid collecting data then TGI runs directly on the host machine. Usage statistics are collected only when TGI is running in a Docker container. This prevents data collection when TGI is run directly on the host machine. The collected data includes startup and shutdown events, as well as a heartbeat signal sent every 15 minutes.
## What data is collected ## What data is collected

View File

@ -54,6 +54,9 @@ use std::fs::File;
use std::io::BufReader; use std::io::BufReader;
use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use thiserror::Error; use thiserror::Error;
use tokio::select; use tokio::select;
use tokio::signal; use tokio::signal;
@ -1819,9 +1822,9 @@ pub async fn run(
HubTokenizerConfig::default() HubTokenizerConfig::default()
}); });
let tokenizer: Tokenizer = { let tokenizer: Result<Tokenizer, WebServerError> = {
use pyo3::prelude::*; use pyo3::prelude::*;
pyo3::Python::with_gil(|py| -> PyResult<()> { Python::with_gil(|py| -> PyResult<()> {
py_resolve_tokenizer(py, &tokenizer_name, revision.as_deref(), trust_remote_code)?; py_resolve_tokenizer(py, &tokenizer_name, revision.as_deref(), trust_remote_code)?;
Ok(()) Ok(())
}) })
@ -1832,16 +1835,16 @@ pub async fn run(
let out = legacy_tokenizer_handle(config_filename.as_ref()); let out = legacy_tokenizer_handle(config_filename.as_ref());
out.ok_or(err) out.ok_or(err)
}) })
.expect("We cannot load a tokenizer"); .map_err(|_| WebServerError::Tokenizer("Unable to load tokenizer.".to_string()))?;
let filename = "out/tokenizer.json"; let filename = "out/tokenizer.json";
if let Ok(tok) = tokenizers::Tokenizer::from_file(filename) { if let Ok(tok) = tokenizers::Tokenizer::from_file(filename) {
Tokenizer::Rust(tok) Ok(Tokenizer::Rust(tok))
} else { } else {
Tokenizer::Python { Ok(Tokenizer::Python {
tokenizer_name: tokenizer_name.clone(), tokenizer_name: tokenizer_name.clone(),
revision: revision.clone(), revision: revision.clone(),
trust_remote_code, trust_remote_code,
} })
} }
}; };
@ -1901,11 +1904,27 @@ pub async fn run(
_ => None, _ => None,
}; };
if let Some(ref ua) = user_agent { let stop_usage_thread = Arc::new(AtomicBool::new(false));
let stop_usage_thread_clone = stop_usage_thread.clone();
if let Some(ua) = user_agent.clone() {
let start_event = let start_event =
usage_stats::UsageStatsEvent::new(ua.clone(), usage_stats::EventType::Start, None); usage_stats::UsageStatsEvent::new(ua.clone(), usage_stats::EventType::Start, None);
tokio::spawn(async move { tokio::spawn(async move {
// send start event
start_event.send().await; start_event.send().await;
let mut last_report = Instant::now();
while !stop_usage_thread_clone.load(Ordering::Relaxed) {
if last_report.elapsed() > Duration::from_secs(900) {
let report_event = usage_stats::UsageStatsEvent::new(
ua.clone(),
usage_stats::EventType::Ping,
None,
);
report_event.send().await;
last_report = Instant::now();
}
tokio::time::sleep(Duration::from_secs(1)).await;
}
}); });
}; };
let compat_return_full_text = match &model_info.pipeline_tag { let compat_return_full_text = match &model_info.pipeline_tag {
@ -1926,7 +1945,7 @@ pub async fn run(
validation_workers, validation_workers,
api_key, api_key,
config, config,
(tokenizer, tokenizer_config), (tokenizer?, tokenizer_config),
(preprocessor_config, processor_config), (preprocessor_config, processor_config),
hostname, hostname,
port, port,
@ -1943,6 +1962,7 @@ pub async fn run(
.await; .await;
if let Some(ua) = user_agent { if let Some(ua) = user_agent {
stop_usage_thread.store(true, Ordering::Relaxed);
match result { match result {
Ok(_) => { Ok(_) => {
let stop_event = usage_stats::UsageStatsEvent::new( let stop_event = usage_stats::UsageStatsEvent::new(
@ -2419,8 +2439,13 @@ async fn start(
} }
} else { } else {
// Run server // Run server
let listener = match tokio::net::TcpListener::bind(&addr).await {
let listener = tokio::net::TcpListener::bind(&addr).await.unwrap(); Ok(listener) => listener,
Err(e) => {
tracing::error!("Failed to bind to {addr}: {e}");
return Err(WebServerError::Axum(Box::new(e)));
}
};
axum::serve(listener, app) axum::serve(listener, app)
.with_graceful_shutdown(shutdown_signal()) .with_graceful_shutdown(shutdown_signal())
.await .await
@ -2535,4 +2560,6 @@ impl From<InferError> for Event {
pub enum WebServerError { pub enum WebServerError {
#[error("Axum error: {0}")] #[error("Axum error: {0}")]
Axum(#[from] axum::BoxError), Axum(#[from] axum::BoxError),
#[error("Tokenizer error: {0}")]
Tokenizer(String),
} }

View File

@ -43,6 +43,7 @@ pub enum EventType {
Start, Start,
Stop, Stop,
Error, Error,
Ping,
} }
#[derive(Debug, Serialize)] #[derive(Debug, Serialize)]
@ -70,7 +71,7 @@ impl UsageStatsEvent {
.post(TELEMETRY_URL) .post(TELEMETRY_URL)
.headers(headers) .headers(headers)
.body(body) .body(body)
.timeout(Duration::from_secs(5)) .timeout(Duration::from_secs(10))
.send() .send()
.await; .await;
} }