fix: add telemetry regular pings and fix unhandled errors avoid not sending telemetry stop events.

This commit is contained in:
Hugo Larcher 2025-01-24 18:10:12 +01:00
parent 6cb41a80a1
commit 4c8bf7f5b8
No known key found for this signature in database
GPG Key ID: 3DAF63124699CA2B
2 changed files with 46 additions and 12 deletions

View File

@ -54,6 +54,9 @@ use std::fs::File;
use std::io::BufReader; use std::io::BufReader;
use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use thiserror::Error; use thiserror::Error;
use tokio::select; use tokio::select;
use tokio::signal; use tokio::signal;
@ -1819,9 +1822,9 @@ pub async fn run(
HubTokenizerConfig::default() HubTokenizerConfig::default()
}); });
let tokenizer: Tokenizer = { let tokenizer: Result<Tokenizer, WebServerError> = {
use pyo3::prelude::*; use pyo3::prelude::*;
pyo3::Python::with_gil(|py| -> PyResult<()> { match Python::with_gil(|py| -> PyResult<()> {
py_resolve_tokenizer(py, &tokenizer_name, revision.as_deref(), trust_remote_code)?; py_resolve_tokenizer(py, &tokenizer_name, revision.as_deref(), trust_remote_code)?;
Ok(()) Ok(())
}) })
@ -1831,17 +1834,23 @@ pub async fn run(
.or_else(|err| { .or_else(|err| {
let out = legacy_tokenizer_handle(config_filename.as_ref()); let out = legacy_tokenizer_handle(config_filename.as_ref());
out.ok_or(err) out.ok_or(err)
}) }) {
.expect("We cannot load a tokenizer"); Ok(_) => {}
Err(_) => {
return Err(WebServerError::Tokenizer(
"Unable to load tokenizer.".to_string(),
));
}
}
let filename = "out/tokenizer.json"; let filename = "out/tokenizer.json";
if let Ok(tok) = tokenizers::Tokenizer::from_file(filename) { if let Ok(tok) = tokenizers::Tokenizer::from_file(filename) {
Tokenizer::Rust(tok) Ok(Tokenizer::Rust(tok))
} else { } else {
Tokenizer::Python { Ok(Tokenizer::Python {
tokenizer_name: tokenizer_name.clone(), tokenizer_name: tokenizer_name.clone(),
revision: revision.clone(), revision: revision.clone(),
trust_remote_code, trust_remote_code,
} })
} }
}; };
@ -1901,11 +1910,27 @@ pub async fn run(
_ => None, _ => None,
}; };
if let Some(ref ua) = user_agent { let stop_usage_thread = Arc::new(AtomicBool::new(false));
let stop_usage_thread_clone = stop_usage_thread.clone();
if let Some(ua) = user_agent.clone() {
let start_event = let start_event =
usage_stats::UsageStatsEvent::new(ua.clone(), usage_stats::EventType::Start, None); usage_stats::UsageStatsEvent::new(ua.clone(), usage_stats::EventType::Start, None);
tokio::spawn(async move { tokio::spawn(async move {
// send start event
start_event.send().await; start_event.send().await;
let mut last_report = Instant::now();
while !stop_usage_thread_clone.load(Ordering::Relaxed) {
if last_report.elapsed() > Duration::from_secs(3600) {
let report_event = usage_stats::UsageStatsEvent::new(
ua.clone(),
usage_stats::EventType::Ping,
None,
);
report_event.send().await;
last_report = Instant::now();
}
tokio::time::sleep(Duration::from_secs(1)).await;
}
}); });
}; };
let compat_return_full_text = match &model_info.pipeline_tag { let compat_return_full_text = match &model_info.pipeline_tag {
@ -1926,7 +1951,7 @@ pub async fn run(
validation_workers, validation_workers,
api_key, api_key,
config, config,
(tokenizer, tokenizer_config), (tokenizer?, tokenizer_config),
(preprocessor_config, processor_config), (preprocessor_config, processor_config),
hostname, hostname,
port, port,
@ -1943,6 +1968,7 @@ pub async fn run(
.await; .await;
if let Some(ua) = user_agent { if let Some(ua) = user_agent {
stop_usage_thread.store(true, Ordering::Relaxed);
match result { match result {
Ok(_) => { Ok(_) => {
let stop_event = usage_stats::UsageStatsEvent::new( let stop_event = usage_stats::UsageStatsEvent::new(
@ -2419,8 +2445,13 @@ async fn start(
} }
} else { } else {
// Run server // Run server
let listener = match tokio::net::TcpListener::bind(&addr).await {
let listener = tokio::net::TcpListener::bind(&addr).await.unwrap(); Ok(listener) => listener,
Err(e) => {
tracing::error!("Failed to bind to {addr}: {e}");
return Err(WebServerError::Axum(Box::new(e)));
}
};
axum::serve(listener, app) axum::serve(listener, app)
.with_graceful_shutdown(shutdown_signal()) .with_graceful_shutdown(shutdown_signal())
.await .await
@ -2535,4 +2566,6 @@ impl From<InferError> for Event {
pub enum WebServerError { pub enum WebServerError {
#[error("Axum error: {0}")] #[error("Axum error: {0}")]
Axum(#[from] axum::BoxError), Axum(#[from] axum::BoxError),
#[error("Tokenizer error: {0}")]
Tokenizer(String),
} }

View File

@ -43,6 +43,7 @@ pub enum EventType {
Start, Start,
Stop, Stop,
Error, Error,
Ping,
} }
#[derive(Debug, Serialize)] #[derive(Debug, Serialize)]
@ -70,7 +71,7 @@ impl UsageStatsEvent {
.post(TELEMETRY_URL) .post(TELEMETRY_URL)
.headers(headers) .headers(headers)
.body(body) .body(body)
.timeout(Duration::from_secs(5)) .timeout(Duration::from_secs(10))
.send() .send()
.await; .await;
} }