From c690da5973491ba6c93f9616859d6b8c70d89b2c Mon Sep 17 00:00:00 2001 From: Hugo Larcher Date: Tue, 28 Jan 2025 10:29:18 +0100 Subject: [PATCH] fix: Telemetry (#2957) * fix: add telemetry regular pings and fix unhandled errors avoid not sending telemetry stop events. * fix: simplify error handling * fix: update ping delay and update doc. * fix: clippy * doc: Rephrase properly. --- docs/source/usage_statistics.md | 2 +- router/src/server.rs | 47 ++++++++++++++++++++++++++------- router/src/usage_stats.rs | 3 ++- 3 files changed, 40 insertions(+), 12 deletions(-) diff --git a/docs/source/usage_statistics.md b/docs/source/usage_statistics.md index d3878b53c..dbf36c779 100644 --- a/docs/source/usage_statistics.md +++ b/docs/source/usage_statistics.md @@ -3,7 +3,7 @@ Text Generation Inference collects anonymous usage statistics to help us improve the service. The collected data is used to improve TGI and to understand what causes failures. The data is collected transparently and any sensitive information is omitted. -Data is sent twice, once on server startup and once when server stops. Also, usage statistics are only enabled when TGI is running in docker to avoid collecting data then TGI runs directly on the host machine. +Usage statistics are collected only when TGI is running in a Docker container. This prevents data collection when TGI is run directly on the host machine. The collected data includes startup and shutdown events, as well as a heartbeat signal sent every 15 minutes. ## What data is collected diff --git a/router/src/server.rs b/router/src/server.rs index aef0f8120..2781f9fbf 100644 --- a/router/src/server.rs +++ b/router/src/server.rs @@ -54,6 +54,9 @@ use std::fs::File; use std::io::BufReader; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::path::{Path, PathBuf}; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; +use std::time::Duration; use thiserror::Error; use tokio::select; use tokio::signal; @@ -1819,9 +1822,9 @@ pub async fn run( HubTokenizerConfig::default() }); - let tokenizer: Tokenizer = { + let tokenizer: Result = { use pyo3::prelude::*; - pyo3::Python::with_gil(|py| -> PyResult<()> { + Python::with_gil(|py| -> PyResult<()> { py_resolve_tokenizer(py, &tokenizer_name, revision.as_deref(), trust_remote_code)?; Ok(()) }) @@ -1832,16 +1835,16 @@ pub async fn run( let out = legacy_tokenizer_handle(config_filename.as_ref()); out.ok_or(err) }) - .expect("We cannot load a tokenizer"); + .map_err(|_| WebServerError::Tokenizer("Unable to load tokenizer.".to_string()))?; let filename = "out/tokenizer.json"; if let Ok(tok) = tokenizers::Tokenizer::from_file(filename) { - Tokenizer::Rust(tok) + Ok(Tokenizer::Rust(tok)) } else { - Tokenizer::Python { + Ok(Tokenizer::Python { tokenizer_name: tokenizer_name.clone(), revision: revision.clone(), trust_remote_code, - } + }) } }; @@ -1901,11 +1904,27 @@ pub async fn run( _ => None, }; - if let Some(ref ua) = user_agent { + let stop_usage_thread = Arc::new(AtomicBool::new(false)); + let stop_usage_thread_clone = stop_usage_thread.clone(); + if let Some(ua) = user_agent.clone() { let start_event = usage_stats::UsageStatsEvent::new(ua.clone(), usage_stats::EventType::Start, None); tokio::spawn(async move { + // send start event start_event.send().await; + let mut last_report = Instant::now(); + while !stop_usage_thread_clone.load(Ordering::Relaxed) { + if last_report.elapsed() > Duration::from_secs(900) { + let report_event = usage_stats::UsageStatsEvent::new( + ua.clone(), + usage_stats::EventType::Ping, + None, + ); + report_event.send().await; + last_report = Instant::now(); + } + tokio::time::sleep(Duration::from_secs(1)).await; + } }); }; let compat_return_full_text = match &model_info.pipeline_tag { @@ -1926,7 +1945,7 @@ pub async fn run( validation_workers, api_key, config, - (tokenizer, tokenizer_config), + (tokenizer?, tokenizer_config), (preprocessor_config, processor_config), hostname, port, @@ -1943,6 +1962,7 @@ pub async fn run( .await; if let Some(ua) = user_agent { + stop_usage_thread.store(true, Ordering::Relaxed); match result { Ok(_) => { let stop_event = usage_stats::UsageStatsEvent::new( @@ -2419,8 +2439,13 @@ async fn start( } } else { // Run server - - let listener = tokio::net::TcpListener::bind(&addr).await.unwrap(); + let listener = match tokio::net::TcpListener::bind(&addr).await { + Ok(listener) => listener, + Err(e) => { + tracing::error!("Failed to bind to {addr}: {e}"); + return Err(WebServerError::Axum(Box::new(e))); + } + }; axum::serve(listener, app) .with_graceful_shutdown(shutdown_signal()) .await @@ -2535,4 +2560,6 @@ impl From for Event { pub enum WebServerError { #[error("Axum error: {0}")] Axum(#[from] axum::BoxError), + #[error("Tokenizer error: {0}")] + Tokenizer(String), } diff --git a/router/src/usage_stats.rs b/router/src/usage_stats.rs index e9d98327e..4139c4c58 100644 --- a/router/src/usage_stats.rs +++ b/router/src/usage_stats.rs @@ -43,6 +43,7 @@ pub enum EventType { Start, Stop, Error, + Ping, } #[derive(Debug, Serialize)] @@ -70,7 +71,7 @@ impl UsageStatsEvent { .post(TELEMETRY_URL) .headers(headers) .body(body) - .timeout(Duration::from_secs(5)) + .timeout(Duration::from_secs(10)) .send() .await; }