diff --git a/backends/llamacpp/src/main.rs b/backends/llamacpp/src/main.rs
index 9ee61ce6..c193c868 100644
--- a/backends/llamacpp/src/main.rs
+++ b/backends/llamacpp/src/main.rs
@@ -119,7 +119,7 @@ struct Args {
     #[clap(default_value = "3000", long, short, env)]
     port: u16,
 
-    #[clap(default_value = "9000", long, short, env)]
+    #[clap(default_value = "9000", long, env)]
     prometheus_port: u16,
 
     /// Enable JSON output format.
diff --git a/backends/trtllm/csrc/backend.cpp b/backends/trtllm/csrc/backend.cpp
index 2151466b..4a131e31 100644
--- a/backends/trtllm/csrc/backend.cpp
+++ b/backends/trtllm/csrc/backend.cpp
@@ -59,7 +59,14 @@ namespace huggingface::tgi::backends::trtllm {
                 static_cast<tle::SizeType32>(g_params.max_new_tokens),
                 true,
                 (tle::SamplingConfig) s_params,
-                tle::OutputConfig{ /* returnLogProbs= */ true},
+                tle::OutputConfig{
+                        /* returnLogProbs= */ true,
+                        false,
+                        false,
+                        false,
+                        false,
+                        /* returnPerfMetrics= */ true,
+                },
                 std::nullopt,
                 std::nullopt,
                 std::nullopt,
diff --git a/backends/trtllm/csrc/ffi.hpp b/backends/trtllm/csrc/ffi.hpp
index 840614bb..624259cf 100644
--- a/backends/trtllm/csrc/ffi.hpp
+++ b/backends/trtllm/csrc/ffi.hpp
@@ -1,6 +1,8 @@
 #ifndef TGI_BACKEND_TRTLLM_FFI
 #define TGI_BACKEND_TRTLLM_FFI
 
+#include <chrono>
+#include <exception>
 #include
 #include
 
@@ -17,7 +19,7 @@ namespace rust::behavior {
     template<typename Try, typename Fail>
     static void trycatch(Try &&func, Fail &&fail) noexcept try {
         func();
-    } catch (tensorrt_llm::common::TllmException &e) {
+    } catch (const std::exception &e) {
         fail(e.what());
     }
 }
@@ -42,22 +44,46 @@ namespace huggingface::tgi::backends::trtllm {
                 return finish_reason_t::kEND_ID;
             case tle::FinishReason::kLENGTH:
                 return finish_reason_t::kLENGTH;
+            case tle::FinishReason::kTIMED_OUT:
+                return finish_reason_t::kTIMED_OUT;
+            case tle::FinishReason::kCANCELLED:
+                return finish_reason_t::kCANCELLED;
             default:
                 std::unreachable();
         }
     }
 
-    static auto as_generation_step = [](const tle::Response &r) {
+    static auto as_generation_step = [](const tle::Response &r, const std::chrono::time_point<std::chrono::steady_clock> created) {
         const auto reqId = r.getRequestId();
         if (!r.hasError()) [[likely]] {
             const auto result = r.getResult();
-            const auto logits = result.logProbs.value()[0];
+            std::optional<uint32_t> token_id = std::nullopt;
+            if (!result.outputTokenIds.empty() && !result.outputTokenIds[0].empty()) {
+                token_id = static_cast<uint32_t>(result.outputTokenIds[0][0]);
+            }
+
+            std::optional<float> log_prob = std::nullopt;
+            if (result.logProbs && !result.logProbs->empty() && !result.logProbs.value()[0].empty()) {
+                log_prob = result.logProbs.value()[0].back();
+            }
+
+            std::optional<int64_t> first_scheduled_time_ns = std::nullopt;
+            if (result.requestPerfMetrics) {
+                const auto &t = result.requestPerfMetrics->timingMetrics;
+                const auto ns = std::chrono::duration_cast<std::chrono::nanoseconds>(t.firstScheduledTime - created).count();
+                first_scheduled_time_ns = static_cast<int64_t>(ns);
+            }
+
             return generation_step_t{
                     reqId,
-                    static_cast<uint32_t>(result.outputTokenIds[0][0]),
-                    logits.back(),
+                    token_id.value_or(0),
+                    log_prob.value_or(0.0),
+                    first_scheduled_time_ns.value_or(0),
                     result.isFinal,
                     as_finish_reason_t(result.finishReasons[0]),
+                    token_id.has_value(),
+                    log_prob.has_value(),
+                    first_scheduled_time_ns.has_value(),
                     false,
                     std::string()
             };
@@ -66,8 +92,12 @@ namespace huggingface::tgi::backends::trtllm {
                     reqId,
                     0,
                     0.0,
+                    0,
                     true,
                     finish_reason_t::kNOT_FINISHED,
+                    false,
+                    false,
+                    false,
                     true,
                     std::move(r.getErrorMsg())
             };
@@ -79,9 +109,16 @@ namespace huggingface::tgi::backends::trtllm {
     private:
         backend_t inner_;
 
+        // m_created_time is a reference point used to convert time from a C++ time_point
+        // to a Rust Instant.
+        std::chrono::time_point<std::chrono::steady_clock> m_created_time;
+
+
     public:
-        tensorrt_llm_backend_t(std::filesystem::path &&engine_folder, std::filesystem::path &&executor_worker_path)
-                : inner_(engine_folder, executor_worker_path) {}
+        tensorrt_llm_backend_t(std::filesystem::path &&engine_folder, std::filesystem::path &&executor_worker_path, const std::chrono::time_point<std::chrono::steady_clock> &created_time)
+                : inner_(engine_folder, executor_worker_path),
+                  m_created_time {created_time}
+        {}
 
         size_t num_tokens_ready() const noexcept { return inner_.num_tokens_ready(); }
 
@@ -121,13 +158,16 @@ namespace huggingface::tgi::backends::trtllm {
 
             SPDLOG_TRACE("[FFI] Successfully pulled out {:d} responses from executor", responses.size());
 
+            auto f = [this](const tle::Response &r) {
+                return as_generation_step(r, m_created_time);
+            };
             // Transform tle::Response to generation_step_t
 #ifdef __cpp_lib_ranges_to_container
-            auto steps = responses | std::views::transform(as_generation_step) | std::ranges::to<std::vector>();
+            auto steps = responses | std::views::transform(f) | std::ranges::to<std::vector>();
 #else
             auto steps = std::vector<generation_step_t>();
             steps.reserve(responses.size());
-            std::transform(responses.begin(), responses.end(), std::back_inserter(steps), as_generation_step);
+            std::transform(responses.begin(), responses.end(), std::back_inserter(steps), f);
 #endif
             return std::make_unique<std::vector<generation_step_t>>(steps);
 
@@ -179,12 +219,14 @@ namespace huggingface::tgi::backends::trtllm {
 
     std::unique_ptr<tensorrt_llm_backend_t>
    create_backend_from_engine_folder(const rust::Str engines_folder, const rust::Str executor_worker_path) {
+        const auto created_time = std::chrono::steady_clock::now();
        std::call_once(backend_initialized_flag, initialize_tensorrt_llm_backend);
        return std::make_unique<tensorrt_llm_backend_t>(
                std::filesystem::path(std::string_view(engines_folder.begin(), engines_folder.end()),
                                      std::filesystem::path::format::auto_format),
                std::filesystem::path(std::string_view(executor_worker_path.begin(), executor_worker_path.end()),
-                                      std::filesystem::path::format::auto_format)
+                                      std::filesystem::path::format::auto_format),
+                created_time
        );
    }
 }
diff --git a/backends/trtllm/src/errors.rs b/backends/trtllm/src/errors.rs
index 812fd6e3..3e6bd743 100644
--- a/backends/trtllm/src/errors.rs
+++ b/backends/trtllm/src/errors.rs
@@ -19,4 +19,8 @@ pub enum TensorRtLlmBackendError {
     WebServer(#[from] server::WebServerError),
     #[error("Tokio runtime failed to start: {0}")]
     Tokio(#[from] std::io::Error),
+    #[error("config.json doesn't exist in engine folder {0}")]
+    ConfigNotFound(PathBuf),
+    #[error("generation_config.json doesn't exist in engine folder {0}")]
+    GenerationConfigNotFound(PathBuf),
 }
diff --git a/backends/trtllm/src/lib.rs b/backends/trtllm/src/lib.rs
index 08507256..3a245151 100644
--- a/backends/trtllm/src/lib.rs
+++ b/backends/trtllm/src/lib.rs
@@ -24,6 +24,14 @@ mod ffi {
         /// The request finished because the maximum number of tokens was reached.
         #[cxx_name = "kLENGTH"]
         MaxLength = 3u8,
+
+        #[cxx_name = "kTIMED_OUT"]
+        /// The request finished because it timed out (via the mAllotedTime parameter).
+        TimedOut = 4u8,
+
+        #[cxx_name = "kCANCELLED"]
+        /// The request was cancelled by calling cancelRequest.
+        Cancelled = 5u8,
     }
 
     /// Struct used as shared type between rust and C++ to represent the result
@@ -34,8 +42,14 @@ mod ffi {
         request_id: u64,
         token_id: u32,
         log_prob: f32,
+
+        /// Time at which the request was first scheduled, in nanoseconds since the backend was created
+        first_scheduled_time_ns: i64,
         is_final: bool,
         finish_reason: FinishReason,
+        token_id_valid: bool,
+        log_prob_valid: bool,
+        first_scheduled_time_ns_valid: bool,
         has_error: bool,
         error_msg: String,
     }
diff --git a/backends/trtllm/src/looper.rs b/backends/trtllm/src/looper.rs
index 5fed954f..267f9fa9 100644
--- a/backends/trtllm/src/looper.rs
+++ b/backends/trtllm/src/looper.rs
@@ -3,12 +3,12 @@ use cxx::UniquePtr;
 use hashbrown::HashMap;
 use std::hint;
 use std::ops::Deref;
-use std::path::Path;
+use std::path::{Path, PathBuf};
 use tokenizers::Tokenizer;
 use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
 use tokio::sync::TryAcquireError;
 use tokio::task::spawn_blocking;
-use tokio::time::Instant;
+use tokio::time::{Duration, Instant};
 use tokio_stream::wrappers::UnboundedReceiverStream;
 use tracing::{debug, error, warn};
 
@@ -35,6 +35,9 @@ struct GenerationContext {
     tokens: Vec<u32>,
     start: Option<Instant>,
     queued: Instant,
+
+    /// output_buffer stores the output for detecting stop sequences
+    output_buffer: Option<String>,
 }
 
 #[derive(Debug, Copy, Clone)]
@@ -49,16 +52,28 @@ impl<'step> TryFrom<&'step GenerationStep> for DecodedToken {
     type Error = InferError;
 
     fn try_from(step: &'step GenerationStep) -> Result<Self, Self::Error> {
-        if !step.has_error {
-            Ok(Self {
-                id: step.token_id,
-                log_prob: step.log_prob,
-                is_final: step.is_final,
-                finish_reason: step.finish_reason,
-            })
-        } else {
-            Err(GenerationError(step.error_msg.clone()))
+        if step.has_error {
+            return Err(GenerationError(step.error_msg.clone()));
         }
+
+        if !step.token_id_valid {
+            return Err(GenerationError(
+                "GenerationStep contains no token_id".to_string(),
+            ));
+        }
+
+        if !step.log_prob_valid {
+            return Err(GenerationError(
+                "GenerationStep contains no log_prob".to_string(),
+            ));
+        }
+
+        Ok(Self {
+            id: step.token_id,
+            log_prob: step.log_prob,
+            is_final: step.is_final,
+            finish_reason: step.finish_reason,
+        })
     }
 }
 
@@ -67,6 +82,7 @@ fn executor_status_looper(
     tokenizer: Tokenizer,
     mut backend: UniquePtr<TensorRtLlmBackendImpl>,
     mut backlog: UnboundedReceiver<GenerationContext>,
+    created_time: Instant,
 ) {
     // Track the tuple (request_id, stream) for each request
     let mut in_flights =
 
@@ -74,7 +90,12 @@ fn executor_status_looper(
 
     'scheduler: loop {
         // Is there any request pending to be scheduled?
-        let awaiting_requests = backlog.len();
+        let mut awaiting_requests = backlog.len();
+        if awaiting_requests == 0 && in_flights.is_empty() {
+            // Wait for 1 request if we are not waiting for any response,
+            // so that the loop blocks on the receive from the backlog.
+            awaiting_requests += 1;
+        }
         for _ in 0..awaiting_requests {
             // Retrieve all the requests
             if let Some(ctx) = backlog.blocking_recv() {
@@ -83,12 +104,17 @@ fn executor_status_looper(
                 let request = &ctx.request;
                 let generation_params = &request.parameters;
                 let stopping_params = &request.stopping_parameters;
                 let input_ids = request.input_ids.as_deref();
+                let top_k = if generation_params.do_sample {
+                    generation_params.top_k
+                } else {
+                    1
+                };
                 // Submit to the TensorRT-LLM executor for scheduling
                 match backend.pin_mut().submit(
                     &input_ids.unwrap(), // This is checked beforehand in validate()
                     stopping_params.max_new_tokens,
-                    generation_params.top_k,
+                    top_k,
                     generation_params.top_p,
                     generation_params.temperature,
                     generation_params.repetition_penalty,
@@ -124,12 +150,22 @@ fn executor_status_looper(
             for step in responses.deref() {
                 if let Some(ctx) = in_flights.get_mut(&step.request_id) {
                     // Update the starting timestamp if not set
-                    // This value might not be the actual real starting time of the request
-                    // on the executor side - Need to expose more info from the executor to
-                    // retrieve this value
-                    // TODO : Expose actual real starting time for a request on FFI layer
                     if ctx.start.is_none() {
-                        ctx.start = Some(Instant::now());
+                        if step.first_scheduled_time_ns_valid {
+                            if step.first_scheduled_time_ns >= 0 {
+                                ctx.start = created_time.checked_add(Duration::from_nanos(
+                                    step.first_scheduled_time_ns as u64,
+                                ));
+                            } else {
+                                ctx.start = created_time.checked_sub(Duration::from_nanos(
+                                    -step.first_scheduled_time_ns as u64,
+                                ));
+                            }
+                        }
+
+                        if ctx.start.is_none() {
+                            ctx.start = Some(Instant::now());
+                        }
                     }
 
                     // Try to map the generation step to a DecodedToken
@@ -151,7 +187,16 @@ fn executor_status_looper(
                             let _ = in_flights.remove(&step.request_id);
                         }
                     } else {
-                        warn!("Untracked request {}", step.request_id,);
+                        match step.finish_reason {
+                            FinishReason::Cancelled => {
+                                // The client has canceled the request, so this should not generate a
+                                // warning.
+                                debug!("Cancelled request {}", step.request_id);
+                            }
+                            _ => {
+                                warn!("Untracked request {}", step.request_id);
+                            }
+                        }
                     }
                 }
             }
@@ -170,11 +215,39 @@
 fn post_process_decoded_token(
     tokenizer: &Tokenizer,
     ctx: &mut GenerationContext,
-    decoded_token: DecodedToken,
+    mut decoded_token: DecodedToken,
 ) -> InferResult<InferStreamResponse> {
     match tokenizer.decode(&[decoded_token.id], false) {
         Ok(text) => {
             let is_special = tokenizer.get_added_vocabulary().is_special_token(&text);
+
+            if let Some(buf) = ctx.output_buffer.as_mut() {
+                if buf.len() + text.len() > buf.capacity() {
+                    let mut start = buf.len() + text.len() - buf.capacity();
+                    while start <= buf.len() && !buf.is_char_boundary(start) {
+                        start += 1;
+                    }
+                    buf.drain(..start);
+                }
+                buf.push_str(&text);
+
+                for stop_seq in &ctx.request.stopping_parameters.stop_sequences {
+                    let start = if 1 + buf.len() > text.len() + stop_seq.len() {
+                        let mut start = 1 + buf.len() - text.len() - stop_seq.len();
+                        while start > 0 && !buf.is_char_boundary(start) {
+                            start -= 1;
+                        }
+                        start
+                    } else {
+                        0
+                    };
+                    if buf[start..].contains(stop_seq) {
+                        decoded_token.is_final = true;
+                        decoded_token.finish_reason = FinishReason::StopWords;
+                    }
+                }
+            }
+
             let token = Token {
                 id: decoded_token.id,
                 text,
@@ -231,6 +304,26 @@ fn ensure_paths_exist<P: AsRef<Path>, PP: AsRef<Path>>(
         return Err(err);
     }
 
+    let mut config_path = PathBuf::from(engine_folder);
+    config_path.push("config.json");
+
+    if !config_path.exists() {
+        let err = TensorRtLlmBackendError::ConfigNotFound(engine_folder.to_path_buf());
+
+        error!("Path validation failed: {}", err,);
+        return Err(err);
+    }
+
+    let mut generation_config_path = PathBuf::from(engine_folder);
+    generation_config_path.push("generation_config.json");
+
+    if !generation_config_path.exists() {
+        let err = TensorRtLlmBackendError::GenerationConfigNotFound(engine_folder.to_path_buf());
+
+        error!("Path validation failed: {}", err,);
+        return Err(err);
+    }
+
     // Ensure executor worker binary exists
     if !executor_worker_path.exists() {
         let err = TensorRtLlmBackendError::ExecutorWorkerNotFound(engine_folder.to_path_buf());
@@ -271,13 +364,23 @@ impl TensorRtLlmBackendV2 {
         // Allocate the IPC layer to communicate with the backend
         let (executor_sender, executor_receiver) = unbounded_channel();
 
+        // This is a reference point to convert time from a C++ time_point
+        // to a Rust Instant.
+        let created_time = Instant::now();
+
         // Create the FFI backend
         let backend = create_backend_from_engine_folder(&engine_folder, &executor_worker_path)
             .map_err(|e| TensorRtLlmBackendError::Runtime(first_line(e.what(), "Unknown error")))?;
 
         // Executor looper is responsible for scheduling and pulling requests state at regular interval
         spawn_blocking(move || {
-            executor_status_looper(max_inflight_requests, tokenizer, backend, executor_receiver)
+            executor_status_looper(
+                max_inflight_requests,
+                tokenizer,
+                backend,
+                executor_receiver,
+                created_time,
+            )
         });
 
         Ok(TensorRtLlmBackendV2(executor_sender))
@@ -323,12 +426,20 @@ impl Backend for TensorRtLlmBackendV2 {
         // Send the context to the executor for scheduling
         let queued = Instant::now();
+        let output_buffer = request
+            .stopping_parameters
+            .stop_sequences
+            .iter()
+            .map(|x| x.len())
+            .max()
+            .map(|m| String::with_capacity(m + 32)); // TODO: is this number enough?
 
         match self.0.send(GenerationContext {
             request,
             streamer,
             tokens: Vec::with_capacity(256),
             start: None,
             queued,
+            output_buffer,
         }) {
             Ok(_) => Ok(UnboundedReceiverStream::new(receiver)),
             Err(_) => Err(GenerationError(
diff --git a/backends/trtllm/src/main.rs b/backends/trtllm/src/main.rs
index 543f8e6e..81fca0e7 100644
--- a/backends/trtllm/src/main.rs
+++ b/backends/trtllm/src/main.rs
@@ -37,7 +37,7 @@ struct Args {
     hostname: String,
     #[clap(default_value = "3000", long, short, env)]
     port: u16,
-    #[clap(default_value = "9000", long, short, env)]
+    #[clap(default_value = "9000", long, env)]
     prometheus_port: u16,
     #[clap(long, env, required = true)]
     tokenizer_name: String,
diff --git a/backends/v2/src/main.rs b/backends/v2/src/main.rs
index 60b5d52b..a0f3558c 100644
--- a/backends/v2/src/main.rs
+++ b/backends/v2/src/main.rs
@@ -36,7 +36,7 @@ struct Args {
     hostname: String,
     #[clap(default_value = "3000", long, short, env)]
     port: u16,
-    #[clap(default_value = "9000", long, short, env)]
+    #[clap(default_value = "9000", long, env)]
     prometheus_port: u16,
     #[clap(default_value = "/tmp/text-generation-server-0", long, env)]
     master_shard_uds_path: String,
diff --git a/backends/v3/src/main.rs b/backends/v3/src/main.rs
index 44e63853..75a20691 100644
--- a/backends/v3/src/main.rs
+++ b/backends/v3/src/main.rs
@@ -36,7 +36,7 @@ struct Args {
     hostname: String,
     #[clap(default_value = "3000", long, short, env)]
     port: u16,
-    #[clap(default_value = "9000", long, short, env)]
+    #[clap(default_value = "9000", long, env)]
     prometheus_port: u16,
     #[clap(default_value = "/tmp/text-generation-server-0", long, env)]
     master_shard_uds_path: String,
diff --git a/docs/source/reference/launcher.md b/docs/source/reference/launcher.md
index 51bd461f..c7ec90d9 100644
--- a/docs/source/reference/launcher.md
+++ b/docs/source/reference/launcher.md
@@ -254,7 +254,7 @@ Options:
 ```
 ## PROMETHEUS_PORT
 ```shell
-  -p, --prometheus-port <PROMETHEUS_PORT>
+      --prometheus-port <PROMETHEUS_PORT>
           The Prometheus port to listen on
 
           [env: PROMETHEUS_PORT=]
diff --git a/launcher/src/main.rs b/launcher/src/main.rs
index c727623c..f339cbb4 100644
--- a/launcher/src/main.rs
+++ b/launcher/src/main.rs
@@ -774,7 +774,7 @@ struct Args {
     port: u16,
 
     /// The Prometheus port to listen on.
-    #[clap(default_value = "9000", long, short, env)]
+    #[clap(default_value = "9000", long, env)]
     prometheus_port: u16,
 
     /// The name of the socket for gRPC communication between the webserver