From 50e9fc89c872202e3702b3aebed566d2e3c5cb07 Mon Sep 17 00:00:00 2001 From: Morgan Funtowicz Date: Thu, 11 Jul 2024 21:24:32 +0000 Subject: [PATCH] working setup of the ffi layer --- backends/trtllm/include/backend.h | 32 +++++----- backends/trtllm/lib/backend.cpp | 66 ++++++++++---------- backends/trtllm/src/backend.rs | 100 +++++++++++++++++++++++++++--- backends/trtllm/src/ffi.cpp | 84 +++++++++++++++++++++++-- backends/trtllm/src/lib.rs | 28 +++++++-- 5 files changed, 247 insertions(+), 63 deletions(-) diff --git a/backends/trtllm/include/backend.h b/backends/trtllm/include/backend.h index 5fa07060..1823ddf9 100644 --- a/backends/trtllm/include/backend.h +++ b/backends/trtllm/include/backend.h @@ -5,8 +5,10 @@ #ifndef TGI_TRTLLM_BACKEND_H #define TGI_TRTLLM_BACKEND_H +#include #include #include +#include #include #include @@ -19,7 +21,8 @@ using json = nlohmann::json; namespace tle = tensorrt_llm::executor; namespace huggingface::tgi::backends { - + using RequestId = tle::IdType; + using TokenId = tle::TokenIdType; using TokenStreamingCallback = void(tle::TokenIdType); /** @@ -54,9 +57,7 @@ namespace huggingface::tgi::backends { * Indicate if the backend is ready to accept incoming request * @return true if ready, false otherwise */ - [[nodiscard]] bool IsReady() const { - return executor.canEnqueueRequests(); - } + [[nodiscard]] bool IsReady() const; /*** * Submit a new generation task to the executor @@ -65,26 +66,25 @@ namespace huggingface::tgi::backends { * @param topK * @param topP * @param temperature - * @param minLength - * @param repetitionPenalty - * @param frequencyPenalty * @param seed - * @param nTopTokens * @return Request id related to this generation for reference */ - [[nodiscard]] tle::IdType Submit( - const std::vector &tokens, + [[nodiscard]] RequestId Submit( + const std::vector &tokens, int32_t maxNewTokens, int32_t topK, float_t topP, float_t temperature, - int32_t minLength, - std::optional repetitionPenalty = std::nullopt, - std::optional frequencyPenalty = std::nullopt, - std::optional seed = std::nullopt, - std::optional nTopTokens = std::nullopt + uint64_t seed ); + /*** + * + * @param requestId The request id to poll the generation results + * @return + */ + std::vector Poll(RequestId requestId); + /*** * Unroll the token generation until end of stream is reached. 
* Every generated token is streamed back through the provided callback for further processing @@ -92,7 +92,7 @@ namespace huggingface::tgi::backends { * @param cb The callback to stream token back * @return Global number of generated tokens for this request id */ - size_t Stream(tle::IdType reqId, const std::function &cb); + uint32_t Stream(RequestId reqId, std::function &cb); }; } diff --git a/backends/trtllm/lib/backend.cpp b/backends/trtllm/lib/backend.cpp index 0f058128..ce778bb2 100644 --- a/backends/trtllm/lib/backend.cpp +++ b/backends/trtllm/lib/backend.cpp @@ -1,4 +1,5 @@ -#include +#include + #include #include @@ -17,15 +18,17 @@ tle::ExecutorConfig huggingface::tgi::backends::GetExecutorConfig(const json &co // Get the compute capabilities of the current hardware nvmlDevice_t device; int32_t cudaComputeCapabilitiesMajor = 0, cudaComputeCapabilitiesMinor = 0; - if(nvmlDeviceGetHandleByIndex_v2(0, &device) == NVML_SUCCESS) { + if (nvmlDeviceGetHandleByIndex_v2(0, &device) == NVML_SUCCESS) { SPDLOG_DEBUG("Successfully acquired nvmlDevice_t = 0"); - if(nvmlDeviceGetCudaComputeCapability(device, &cudaComputeCapabilitiesMajor, &cudaComputeCapabilitiesMinor) == NVML_SUCCESS) { - SPDLOG_INFO(FMT_STRING("Detected sm_{:d}{:d} compute capabilities"), cudaComputeCapabilitiesMajor, cudaComputeCapabilitiesMinor); + if (nvmlDeviceGetCudaComputeCapability(device, &cudaComputeCapabilitiesMajor, &cudaComputeCapabilitiesMinor) == + NVML_SUCCESS) { + SPDLOG_INFO(FMT_STRING("Detected sm_{:d}{:d} compute capabilities"), cudaComputeCapabilitiesMajor, + cudaComputeCapabilitiesMinor); } } // Single engine (TP = PP = 1) -> using leader mode (no MPI involved) - if(config["/pretrained_config/mapping/world_size"_json_pointer].get() == 1){ + if (config["/pretrained_config/mapping/world_size"_json_pointer].get() == 1) { SPDLOG_INFO("Detected single engine deployment, using leader mode"); execConfig.setParallelConfig(tle::ParallelConfig( tle::CommunicationType::kMPI, @@ -54,15 +57,18 @@ tle::ExecutorConfig huggingface::tgi::backends::GetExecutorConfig(const json &co huggingface::tgi::backends::TensorRtLlmBackend::TensorRtLlmBackend( const std::filesystem::path &enginesFolder, const std::filesystem::path &executorWorker -): - config(json::parse(std::ifstream(enginesFolder / "config.json"))), - executor( - enginesFolder, - tensorrt_llm::executor::ModelType::kDECODER_ONLY, - GetExecutorConfig(config, executorWorker.string() - )) -{ - SPDLOG_INFO(FMT_STRING("Engine (version={})"), config["/version"_json_pointer].get_ref()); +) : + config(json::parse(std::ifstream(enginesFolder / "config.json"))), + executor( + enginesFolder, + tensorrt_llm::executor::ModelType::kDECODER_ONLY, + GetExecutorConfig(config, executorWorker.string() + )) { + SPDLOG_INFO(FMT_STRING("Engine (version={})"), config["/version"_json_pointer].get_ref()); +} + +bool huggingface::tgi::backends::TensorRtLlmBackend::IsReady() const { + return executor.canEnqueueRequests(); } [[nodiscard("Returned request id needs to be provided back to gather generated tokens")]] @@ -72,11 +78,7 @@ tle::IdType huggingface::tgi::backends::TensorRtLlmBackend::Submit( const int32_t topK, const float_t topP, const float_t temperature, - const int32_t minLength, - std::optional repetitionPenalty, - std::optional frequencyPenalty, - std::optional seed, - std::optional nTopTokens + const uint64_t seed ) { SPDLOG_DEBUG( FMT_STRING("Submitting inference over {:d} tokens to the executor ({:d} already in-flight)"), @@ -92,27 +94,23 @@ tle::IdType 
huggingface::tgi::backends::TensorRtLlmBackend::Submit( std::nullopt, std::nullopt, seed, + std::nullopt, temperature, - minLength, std::nullopt, - repetitionPenalty, - std::nullopt, - frequencyPenalty, }; - const auto output = tle::OutputConfig{false, false, nTopTokens.value_or(1) > 1}; - const auto request = tle::Request{tokens, maxNewTokens, true, sampling, output}; - - return executor.enqueueRequest(request); + const auto output = tle::OutputConfig{false, false, false}; + return executor.enqueueRequest(tle::Request{tokens, maxNewTokens, true, sampling, output}); } -size_t huggingface::tgi::backends::TensorRtLlmBackend::Stream(const tle::IdType reqId, const std::function& cb) { +uint32_t huggingface::tgi::backends::TensorRtLlmBackend::Stream(const tle::IdType reqId, + std::function &cb) { bool isFinal = false; size_t generatedTokens = 0; do { const auto responses = executor.awaitResponses(reqId); - for (const auto &response: responses){ - if(response.hasError()) { + for (const auto &response: responses) { + if (response.hasError()) { SPDLOG_WARN("Caught error during generation: {}", response.getErrorMsg()); isFinal = true; } else { @@ -128,8 +126,12 @@ size_t huggingface::tgi::backends::TensorRtLlmBackend::Stream(const tle::IdType } } - } while(!isFinal); + } while (!isFinal); // Return the number of generated tokens return generatedTokens; -} \ No newline at end of file +} + +std::vector huggingface::tgi::backends::TensorRtLlmBackend::Poll(const tle::IdType requestId) { + return executor.awaitResponses(requestId); +} diff --git a/backends/trtllm/src/backend.rs b/backends/trtllm/src/backend.rs index d7d7180d..d4d3d00c 100644 --- a/backends/trtllm/src/backend.rs +++ b/backends/trtllm/src/backend.rs @@ -1,20 +1,24 @@ +use std::cell::RefCell; use std::path::Path; use async_trait::async_trait; use cxx::UniquePtr; use tokenizers::Tokenizer; use tokio::sync::mpsc; +use tokio::time::Instant; use tokio_stream::wrappers::UnboundedReceiverStream; use text_generation_router::infer::{Backend, InferError, InferStreamResponse}; -use text_generation_router::validation::ValidGenerateRequest; +use text_generation_router::validation::{Chunk, ValidGenerateRequest}; use crate::errors::TensorRtLlmBackendError; -use crate::ffi::{create_trtllm_backend, TensorRtLlmBackend}; +use crate::ffi::{create_trtllm_backend, TensorRtLlmBackendImpl}; + +struct GenerationContext(mpsc::UnboundedSender>); pub struct TrtLLmBackend { tokenizer: Tokenizer, - inner: UniquePtr, + inner: RefCell>, } unsafe impl Sync for TrtLLmBackend {} @@ -26,9 +30,12 @@ impl TrtLLmBackend { engine_folder: P, ) -> Result { let engine_folder = engine_folder.as_ref(); - let inner = create_trtllm_backend(engine_folder.to_str().unwrap()); + let inner = create_trtllm_backend(engine_folder.to_str().unwrap(), ""); - Ok(Self { tokenizer, inner }) + Ok(Self { + tokenizer, + inner: RefCell::new(inner), + }) } } @@ -39,12 +46,91 @@ impl Backend for TrtLLmBackend { request: ValidGenerateRequest, ) -> Result>, InferError> { let (sender, receiver) = mpsc::unbounded_channel(); - let request_id = self.inner.submit(); + let ctx = Box::new(GenerationContext(sender)); + + // Unpack parameters + let params = request.parameters; + + // Currently we handle single chunk of text + if request.inputs.len() == 1 { + match request + .inputs + .first() + .expect("Failed to access the first chunk") + { + Chunk::Text(text) => { + let encoding = self + .tokenizer + .encode(&**text, true) + .map_err(|e| InferError::ToolError(e.to_string()))?; + + let _start = Instant::now(); + 
let _request_id = self + .inner + .borrow_mut() + .as_mut() + .expect("Failed to retrieve pointer to TRTLLM backend") + .submit( + encoding.get_ids(), + 128, + params.top_k as i32, + params.top_p, + params.temperature, + params.seed, + ); + + // spawn_blocking(|| { + // // Stream generated tokens + // let num_generated_tokens = self + // .inner + // .borrow_mut() + // .as_mut() + // .expect("Failed to retrieve pointer to TRTLLM backend") + // .stream(request_id, ctx, |token, step, is_final| { + // // self.tokenizer.decode(&*[token], true).unwrap(); + // let token = Token { + // id: token, + // text: String::from(""), + // logprob: 1.0f32, + // special: false, + // }; + // + // sender + // .send(Ok(InferStreamResponse::Intermediate { + // token, + // top_tokens: vec![], + // })) + // .unwrap() + // }); + // + // // Notify the end + // Ok(InferStreamResponse::End { + // token: Token { + // id: 0, + // text: String::from(""), + // logprob: 1.0f32, + // special: false, + // }, + // top_tokens: vec![], + // generated_text: GeneratedText { + // text: String::from(""), + // generated_tokens: num_generated_tokens, + // finish_reason: FinishReason::EndOfSequenceToken, + // seed: Some(params.seed), + // }, + // start, + // queued: Instant::now(), + // }) + // }); + } + Chunk::Image(_) => {} + } + }; Ok(UnboundedReceiverStream::new(receiver)) } async fn health(&self, _current_health: bool) -> bool { - self.inner.is_ready() + self.inner.borrow_mut().is_ready() } } diff --git a/backends/trtllm/src/ffi.cpp b/backends/trtllm/src/ffi.cpp index 63043e05..06fc3623 100644 --- a/backends/trtllm/src/ffi.cpp +++ b/backends/trtllm/src/ffi.cpp @@ -1,23 +1,99 @@ // // Created by mfuntowicz on 6/30/24. // -#include -#include "rust/cxx.h" +#pragma once +#include +#include +#include + +#include "rust/cxx.h" #include "backends/trtllm/include/backend.h" namespace huggingface::tgi::backends { + class TensorRtLlmBackendImpl : TensorRtLlmBackend { + public: + /*** + * + * @param engineFolder + * @param executorWorker + */ + TensorRtLlmBackendImpl(const std::string_view &engineFolder, const std::string_view &executorWorker) : + TensorRtLlmBackend(std::move(engineFolder), std::move(executorWorker)) {} + + /*** + * + * @return + */ + bool IsReady() const { return TensorRtLlmBackend::IsReady(); } + + /*** + * + * @param tokens + * @param maxNewTokens + * @param topK + * @param topP + * @param temperature + * @param seed + * @return + */ + [[nodiscard("returned request id should be used to refer to the request's generation result later on")]] + RequestId Submit(rust::Slice tokens, + int32_t maxNewTokens, + int32_t topK, + float_t topP, + float_t temperature, + uint64_t seed) { + // This will copy all the items from the initial slice + std::vector tokens_(tokens.size()); + tokens_.assign(tokens.begin(), tokens.end()); + + return TensorRtLlmBackend::Submit(std::move(tokens_), maxNewTokens, topK, topP, temperature, seed); + } + + /*** + * + * @param requestId + * @param handler + * @return + */ +// uint32_t +// Stream(RequestId requestId, rust::Box , rust::Fn handler) { +// bool isDone = false; +// uint32_t numGeneratedTokens = 0; +// +// do { +// const auto responses = Poll(requestId); +// for (const auto &response: responses) { +// if (response.hasError()) { +// isDone = true; +// // TODO : bubble up the error to rust +// } else { +// const auto generation = response.getResult(); +// const auto token = generation.outputTokenIds[0][0]; +// isDone = generation.isFinal; +// +// // Propagate through the handler +// handler(token, 
numGeneratedTokens, isDone); +// } +// } +// } while (!isDone); +// +// return numGeneratedTokens; +// } + }; + /*** * * @param engineFolder * @return */ - std::unique_ptr create_trtllm_backend(rust::Str engineFolder, rust::Str executorWorker) { + std::unique_ptr create_trtllm_backend(rust::Str engineFolder, rust::Str executorWorker) { // Unconditionally call this to initialize and discover TRTLLM plugins InitializeBackend(); const auto enginePath = std::string_view(engineFolder.begin(), engineFolder.end()); const auto executorPath = std::string_view(executorWorker.begin(), executorWorker.end()); - return std::make_unique(std::move(enginePath), std::move(executorPath)); + return std::make_unique(std::move(enginePath), std::move(executorPath)); } } \ No newline at end of file diff --git a/backends/trtllm/src/lib.rs b/backends/trtllm/src/lib.rs index 7f7d5442..d2838099 100644 --- a/backends/trtllm/src/lib.rs +++ b/backends/trtllm/src/lib.rs @@ -8,7 +8,8 @@ mod ffi { unsafe extern "C++" { include!("backends/trtllm/src/ffi.cpp"); - type TensorRtLlmBackend; + /// Represent an instance of the underlying TensorRT-LLM backend + type TensorRtLlmBackendImpl; /// Create an instance backed behind an std::unique_ptr to manage the lifespan of the backend /// @@ -24,12 +25,31 @@ mod ffi { /// ``` /// /// ``` - fn create_trtllm_backend(engine_folder: &str, executor_worker: &str) -> UniquePtr; + fn create_trtllm_backend( + engine_folder: &str, + executor_worker: &str, + ) -> UniquePtr; #[rust_name = "is_ready"] - fn IsReady(&self) -> bool; + fn IsReady(self: &TensorRtLlmBackendImpl) -> bool; #[rust_name = "submit"] - fn Submit(&self) -> u64; + fn Submit( + self: Pin<&mut TensorRtLlmBackendImpl>, + tokens: &[u32], + max_new_tokens: i32, + top_k: i32, + top_p: f32, + temperature: f32, + seed: u64, + ) -> u64; + + // #[rust_name = "stream"] + // fn Stream( + // self: Pin<&mut TensorRtLlmBackendImpl>, + // request_id: u64, + // ctx: Box, + // callback: fn(u32, u32, bool), + // ) -> u32; } }
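
Usage note (not part of the patch): the sketch below shows how the cxx bridge declared in `backends/trtllm/src/lib.rs` could be exercised from inside the crate, mirroring the calls already made in `backend.rs`. The engine/worker paths and the sampling parameters are placeholders rather than values taken from this change, and `submit_one` is a hypothetical helper.

```rust
use cxx::UniquePtr;

use crate::ffi::{create_trtllm_backend, TensorRtLlmBackendImpl};

/// Hypothetical helper: enqueue a single generation request and return its id.
fn submit_one(tokens: &[u32]) -> Option<u64> {
    // The backend lives behind a C++ std::unique_ptr, surfaced to Rust as UniquePtr.
    let mut backend: UniquePtr<TensorRtLlmBackendImpl> =
        create_trtllm_backend("/path/to/engines", "/path/to/executorWorker");

    // is_ready() maps to executor.canEnqueueRequests() on the C++ side.
    if backend.is_null() || !backend.is_ready() {
        return None;
    }

    // submit() copies the token slice into a std::vector on the C++ side and
    // enqueues a tle::Request, returning the executor-assigned request id.
    let request_id = backend
        .as_mut()
        .expect("backend pointer should not be null")
        .submit(
            tokens,
            /* max_new_tokens */ 128,
            /* top_k */ 10,
            /* top_p */ 0.9,
            /* temperature */ 1.0,
            /* seed */ 2014,
        );

    Some(request_id)
}
```

Until `Stream` is wired up on the Rust side, the returned request id would be consumed by the C++ `Poll`/`Stream` path sketched (currently commented out) in `ffi.cpp`.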