text-generation-inference/backends/v3/src/queue.rs

819 lines
29 KiB
Rust
Raw Normal View History

Rebase TRT-llm (#2331) * wip wip refacto refacto Initial setup for CXX binding to TRTLLM Working FFI call for TGI and TRTLLM backend Remove unused parameters annd force tokenizer name to be set Overall build TRTLLM and deps through CMake build system Enable end to end CMake build First version loading engines and making it ready for inference Remembering to check how we can detect support for chunked context Move to latest TensorRT-LLM version Specify which default log level to use depending on CMake build type make leader executor mode working unconditionally call InitializeBackend on the FFI layer bind to CUDA::nvml to retrieve compute capabilities at runtime updated logic and comment to detect cuda compute capabilities implement the Stream method to send new tokens through a callback use spdlog release 1.14.1 moving forward update trtllm to latest version a96cccafcf6365c128f004f779160951f8c0801c correctly tell cmake to build dependent tensorrt-llm required libraries create cmake install target to put everything relevant in installation folder add auth_token CLI argument to provide hf hub authentification token allow converting huggingface::tokenizers error to TensorRtLlmBackendError use correct include for spdlog include guard to build example in cmakelists working setup of the ffi layer remove fmt import use external fmt lib end to end ffi flow working make sure to track include/ffi.h to trigger rebuild from cargo impl the rust backend which currently cannot move the actual computation in background thread expose shutdown function at ffi layer impl RwLock scenario for TensorRtLllmBackend oops missing c++ backend definitions compute the number of maximum new tokens for each request independently make sure the context is not dropped in the middle of the async decoding. remove unnecessary log add all the necessary plumbery to return the generated content update invalid doc in cpp file correctly forward back the log probabilities remove unneeded scope variable for now refactor Stream impl for Generation to factorise code expose the internal missing start/queue timestamp forward tgi parameters rep/freq penalty add some more validation about grammar not supported define a shared struct to hold the result of a decoding step expose information about potential error happening while decoding remove logging add logging in case of decoding error make sure executor_worker is provided add initial Dockerfile for TRTLLM backend add some more information in CMakeLists.txt to correctly install executorWorker add some more information in CMakeLists.txt to correctly find and install nvrtc wrapper simplify prebuilt trtllm libraries name definition do the same name definition stuff for tensorrt_llm_executor_static leverage pkg-config to probe libraries paths and reuse new install structure from cmake fix bad copy/past missing nvinfer linkage direction align all the linker search dependency add missing pkgconfig folder for MPI in Dockerfile correctly setup linking search path for runtime layer fix missing / before tgi lib path adding missing ld_library_path for cuda stubs in Dockerfile update tgi entrypoint commenting out Python part for TensorRT installation refactored docker image move to TensorRT-LLM v0.11.0 make docker linter happy with same capitalization rule fix typo refactor the compute capabilities detection along with num gpus update TensorRT-LLM to latest version update TensorRT install script to latest update build.rs to link to cuda 12.5 add missing dependant libraries for linking clean up a bit install to decoder_attention target add some custom stuff for nccl linkage fix envvar CARGO_CFG_TARGET_ARCH set at runtime vs compile time use std::env::const::ARCH make sure variable live long enough... look for cuda 12.5 add some more basic info in README.md * Rebase. * Fix autodocs. * Let's try to enable trtllm backend. * Ignore backends/v3 by default. * Fixing client. * Fix makefile + autodocs. * Updating the schema thing + redocly. * Fix trtllm lint. * Adding pb files ? * Remove cargo fmt temporarily. * ? * Tmp. * Remove both check + clippy ? * Backporting telemetry. * Backporting 457fb0a1 * Remove PB from git. * Fixing PB with default member backends/client * update TensorRT-LLM to latest version * provided None for api_key * link against libtensorrt_llm and not libtensorrt-llm --------- Co-authored-by: OlivierDehaene <23298448+OlivierDehaene@users.noreply.github.com> Co-authored-by: Morgan Funtowicz <morgan@huggingface.co>
2024-07-31 08:33:10 +00:00
use crate::block_allocator::{BlockAllocation, BlockAllocator};
use crate::client;
use crate::client::{
Batch, GrammarType, NextTokenChooserParameters, Request, StoppingCriteriaParameters,
};
use nohash_hasher::{BuildNoHashHasher, IntMap};
use std::cmp::max;
use std::collections::VecDeque;
Rebase TRT-llm (#2331) * wip wip refacto refacto Initial setup for CXX binding to TRTLLM Working FFI call for TGI and TRTLLM backend Remove unused parameters annd force tokenizer name to be set Overall build TRTLLM and deps through CMake build system Enable end to end CMake build First version loading engines and making it ready for inference Remembering to check how we can detect support for chunked context Move to latest TensorRT-LLM version Specify which default log level to use depending on CMake build type make leader executor mode working unconditionally call InitializeBackend on the FFI layer bind to CUDA::nvml to retrieve compute capabilities at runtime updated logic and comment to detect cuda compute capabilities implement the Stream method to send new tokens through a callback use spdlog release 1.14.1 moving forward update trtllm to latest version a96cccafcf6365c128f004f779160951f8c0801c correctly tell cmake to build dependent tensorrt-llm required libraries create cmake install target to put everything relevant in installation folder add auth_token CLI argument to provide hf hub authentification token allow converting huggingface::tokenizers error to TensorRtLlmBackendError use correct include for spdlog include guard to build example in cmakelists working setup of the ffi layer remove fmt import use external fmt lib end to end ffi flow working make sure to track include/ffi.h to trigger rebuild from cargo impl the rust backend which currently cannot move the actual computation in background thread expose shutdown function at ffi layer impl RwLock scenario for TensorRtLllmBackend oops missing c++ backend definitions compute the number of maximum new tokens for each request independently make sure the context is not dropped in the middle of the async decoding. remove unnecessary log add all the necessary plumbery to return the generated content update invalid doc in cpp file correctly forward back the log probabilities remove unneeded scope variable for now refactor Stream impl for Generation to factorise code expose the internal missing start/queue timestamp forward tgi parameters rep/freq penalty add some more validation about grammar not supported define a shared struct to hold the result of a decoding step expose information about potential error happening while decoding remove logging add logging in case of decoding error make sure executor_worker is provided add initial Dockerfile for TRTLLM backend add some more information in CMakeLists.txt to correctly install executorWorker add some more information in CMakeLists.txt to correctly find and install nvrtc wrapper simplify prebuilt trtllm libraries name definition do the same name definition stuff for tensorrt_llm_executor_static leverage pkg-config to probe libraries paths and reuse new install structure from cmake fix bad copy/past missing nvinfer linkage direction align all the linker search dependency add missing pkgconfig folder for MPI in Dockerfile correctly setup linking search path for runtime layer fix missing / before tgi lib path adding missing ld_library_path for cuda stubs in Dockerfile update tgi entrypoint commenting out Python part for TensorRT installation refactored docker image move to TensorRT-LLM v0.11.0 make docker linter happy with same capitalization rule fix typo refactor the compute capabilities detection along with num gpus update TensorRT-LLM to latest version update TensorRT install script to latest update build.rs to link to cuda 12.5 add missing dependant libraries for linking clean up a bit install to decoder_attention target add some custom stuff for nccl linkage fix envvar CARGO_CFG_TARGET_ARCH set at runtime vs compile time use std::env::const::ARCH make sure variable live long enough... look for cuda 12.5 add some more basic info in README.md * Rebase. * Fix autodocs. * Let's try to enable trtllm backend. * Ignore backends/v3 by default. * Fixing client. * Fix makefile + autodocs. * Updating the schema thing + redocly. * Fix trtllm lint. * Adding pb files ? * Remove cargo fmt temporarily. * ? * Tmp. * Remove both check + clippy ? * Backporting telemetry. * Backporting 457fb0a1 * Remove PB from git. * Fixing PB with default member backends/client * update TensorRT-LLM to latest version * provided None for api_key * link against libtensorrt_llm and not libtensorrt-llm --------- Co-authored-by: OlivierDehaene <23298448+OlivierDehaene@users.noreply.github.com> Co-authored-by: Morgan Funtowicz <morgan@huggingface.co>
2024-07-31 08:33:10 +00:00
use text_generation_router::infer::InferError;
use text_generation_router::infer::InferStreamResponse;
use text_generation_router::validation::{
Chunk, ChunksToString, ValidGenerateRequest, ValidGrammar, ValidParameters,
ValidStoppingParameters,
};
2023-10-23 13:51:12 +00:00
use tokio::sync::{mpsc, oneshot};
use tokio::time::Instant;
use tracing::{info_span, instrument, Instrument, Span};
/// Queue entry
#[derive(Debug)]
pub(crate) struct Entry {
/// Request
pub request: ValidGenerateRequest,
/// Response sender to communicate between the Infer struct and the batching_task
2023-10-23 13:51:12 +00:00
pub response_tx: mpsc::UnboundedSender<Result<InferStreamResponse, InferError>>,
2023-02-13 12:02:45 +00:00
/// Span that will live as long as entry
pub span: Span,
/// Temporary span used as a guard when logging inference, wait times...
pub temp_span: Option<Span>,
/// Instant when this entry was queued
pub queue_time: Instant,
/// Instant when this entry was added to a batch
pub batch_time: Option<Instant>,
/// Block Allocation
pub block_allocation: Option<BlockAllocation>,
}
/// Request Queue
#[derive(Debug, Clone)]
pub(crate) struct Queue {
/// Channel to communicate with the background queue task
2023-10-23 13:51:12 +00:00
queue_sender: mpsc::UnboundedSender<QueueCommand>,
}
impl Queue {
2023-12-11 11:46:30 +00:00
pub(crate) fn new(
requires_padding: bool,
block_size: u32,
prefix_caching: bool,
2023-12-11 11:46:30 +00:00
window_size: Option<u32>,
speculate: u32,
max_batch_total_tokens: u32,
support_chunking: bool,
2023-12-11 11:46:30 +00:00
) -> Self {
// Create channel
2023-10-23 13:51:12 +00:00
let (queue_sender, queue_receiver) = mpsc::unbounded_channel();
// Launch background queue task
2023-09-28 07:55:47 +00:00
tokio::spawn(queue_task(
requires_padding,
block_size,
prefix_caching,
2023-09-28 07:55:47 +00:00
window_size,
2023-12-11 11:46:30 +00:00
speculate,
max_batch_total_tokens,
support_chunking,
2023-09-28 07:55:47 +00:00
queue_receiver,
));
Self { queue_sender }
}
/// Append an entry to the queue
2023-02-13 12:02:45 +00:00
#[instrument(skip_all)]
pub(crate) fn append(&self, entry: Entry) {
// Send append command to the background task managing the state
// Unwrap is safe here
2023-02-13 12:02:45 +00:00
self.queue_sender
.send(QueueCommand::Append(Box::new(entry), Span::current()))
2023-02-13 12:02:45 +00:00
.unwrap();
}
// Get the next batch
2023-02-13 12:02:45 +00:00
#[instrument(skip(self))]
pub(crate) async fn next_batch(
&self,
min_size: Option<usize>,
max_size: Option<usize>,
prefill_token_budget: u32,
token_budget: u32,
) -> Option<NextBatch> {
if prefill_token_budget == 0 || token_budget == 0 {
return None;
};
// Create response channel
let (response_sender, response_receiver) = oneshot::channel();
// Send next batch command to the background task managing the state
// Unwrap is safe here
self.queue_sender
.send(QueueCommand::NextBatch {
min_size,
max_size,
prefill_token_budget,
token_budget,
response_sender,
2023-02-13 12:02:45 +00:00
span: Span::current(),
})
.unwrap();
// Await on response channel
// Unwrap is safe here
response_receiver.await.unwrap()
}
}
// Background task responsible of the queue state
#[allow(clippy::too_many_arguments)]
async fn queue_task(
requires_padding: bool,
block_size: u32,
prefix_caching: bool,
2023-09-28 07:55:47 +00:00
window_size: Option<u32>,
2023-12-11 11:46:30 +00:00
speculate: u32,
max_batch_total_tokens: u32,
support_chunking: bool,
2023-10-23 13:51:12 +00:00
mut receiver: mpsc::UnboundedReceiver<QueueCommand>,
) {
let mut state = State::new(
requires_padding,
block_size,
prefix_caching,
window_size,
speculate,
max_batch_total_tokens,
support_chunking,
);
2023-10-23 13:51:12 +00:00
while let Some(cmd) = receiver.recv().await {
match cmd {
QueueCommand::Append(entry, span) => {
span.in_scope(|| state.append(*entry));
metrics::gauge!("tgi_queue_size").increment(1.0);
}
QueueCommand::NextBatch {
min_size,
max_size,
prefill_token_budget,
token_budget,
response_sender,
2023-02-13 12:02:45 +00:00
span,
} => {
let next_batch = state
.next_batch(min_size, max_size, prefill_token_budget, token_budget)
.instrument(span)
.await;
response_sender.send(next_batch).unwrap();
metrics::gauge!("tgi_queue_size").set(state.entries.len() as f64);
}
}
}
}
/// Queue State
#[derive(Debug)]
struct State {
/// Queue entries organized in a Vec
entries: VecDeque<(u64, Entry)>,
/// Id of the next entry
next_id: u64,
/// Id of the next batch
next_batch_id: u64,
/// Paged Attention block size
block_size: u32,
2023-09-28 07:55:47 +00:00
2023-12-11 11:46:30 +00:00
/// Speculation amount
speculate: u32,
/// Whether the model allow the prefill chunking
/// If it does, the last request in the batch will be split to exactly match the prefill
/// token budget
support_chunking: bool,
/// Paged Attention Block Allocation
block_allocator: Option<BlockAllocator>,
}
impl State {
2023-12-11 11:46:30 +00:00
fn new(
requires_padding: bool,
block_size: u32,
prefix_caching: bool,
2023-12-11 11:46:30 +00:00
window_size: Option<u32>,
speculate: u32,
max_batch_total_tokens: u32,
support_chunking: bool,
2023-12-11 11:46:30 +00:00
) -> Self {
let block_allocator = (!requires_padding).then(|| {
BlockAllocator::new(
max_batch_total_tokens,
block_size,
prefix_caching,
window_size,
)
});
Self {
entries: VecDeque::with_capacity(128),
next_id: 0,
next_batch_id: 0,
block_size,
2023-12-11 11:46:30 +00:00
speculate,
support_chunking,
block_allocator,
}
}
/// Append an entry to the queue
2023-02-13 12:02:45 +00:00
fn append(&mut self, mut entry: Entry) {
// Create a span that will live as long as the entry is in the queue waiting to be batched
let queue_span = info_span!(parent: &entry.span, "queued");
entry.temp_span = Some(queue_span);
// Push entry in the queue
self.entries.push_back((self.next_id, entry));
self.next_id += 1;
}
// Get the next batch
async fn next_batch(
&mut self,
min_size: Option<usize>,
max_size: Option<usize>,
prefill_token_budget: u32,
token_budget: u32,
) -> Option<NextBatch> {
if self.entries.is_empty() {
Improve the defaults for the launcher (#1727) # What does this PR do? - Renamed `max_input_length` into `max_input_tokens` for consistency (backward compatible change, will yell if both are set.) - Will now use the config for `max_input_tokens` `max_total_token` and `max_batch_total_tokens`. - Capping the values to 16k in order to save VRAM on behalf of users (overriddable by simply setting the values). <!-- Congratulations! You've made it this far! You're not quite done yet though. Once merged, your PR is going to appear in the release notes with the title you set, so make sure it's a great title that fully reflects the extent of your awesome contribution. Then, please replace this with a description of the change and which issue is fixed (if applicable). Please also include relevant motivation and context. List any dependencies (if any) that are required for this change. Once you're done, someone will review your PR shortly (see the section "Who can review?" below to tag some potential reviewers). They may suggest changes to make the code even better. If no one reviewed your PR after a week has passed, don't hesitate to post a new comment @-mentioning the same persons---sometimes notifications get lost. --> <!-- Remove if not applicable --> Fixes # (issue) ## Before submitting - [ ] This PR fixes a typo or improves the docs (you can dismiss the other checks if that's the case). - [ ] Did you read the [contributor guideline](https://github.com/huggingface/transformers/blob/main/CONTRIBUTING.md#start-contributing-pull-requests), Pull Request section? - [ ] Was this discussed/approved via a Github issue or the [forum](https://discuss.huggingface.co/)? Please add a link to it if that's the case. - [ ] Did you make sure to update the documentation with your changes? Here are the [documentation guidelines](https://github.com/huggingface/transformers/tree/main/docs), and [here are tips on formatting docstrings](https://github.com/huggingface/transformers/tree/main/docs#writing-source-documentation). - [ ] Did you write any new necessary tests? ## Who can review? Anyone in the community is free to review the PR once the tests have passed. Feel free to tag members/contributors who may be interested in your PR. <!-- Your PR will be replied to more quickly if you can figure out the right person to tag with @ @OlivierDehaene OR @Narsil -->
2024-04-12 12:20:31 +00:00
tracing::debug!("No queue");
return None;
}
// Check if we have enough entries
if let Some(min_size) = min_size {
if self.entries.len() < min_size {
Improve the defaults for the launcher (#1727) # What does this PR do? - Renamed `max_input_length` into `max_input_tokens` for consistency (backward compatible change, will yell if both are set.) - Will now use the config for `max_input_tokens` `max_total_token` and `max_batch_total_tokens`. - Capping the values to 16k in order to save VRAM on behalf of users (overriddable by simply setting the values). <!-- Congratulations! You've made it this far! You're not quite done yet though. Once merged, your PR is going to appear in the release notes with the title you set, so make sure it's a great title that fully reflects the extent of your awesome contribution. Then, please replace this with a description of the change and which issue is fixed (if applicable). Please also include relevant motivation and context. List any dependencies (if any) that are required for this change. Once you're done, someone will review your PR shortly (see the section "Who can review?" below to tag some potential reviewers). They may suggest changes to make the code even better. If no one reviewed your PR after a week has passed, don't hesitate to post a new comment @-mentioning the same persons---sometimes notifications get lost. --> <!-- Remove if not applicable --> Fixes # (issue) ## Before submitting - [ ] This PR fixes a typo or improves the docs (you can dismiss the other checks if that's the case). - [ ] Did you read the [contributor guideline](https://github.com/huggingface/transformers/blob/main/CONTRIBUTING.md#start-contributing-pull-requests), Pull Request section? - [ ] Was this discussed/approved via a Github issue or the [forum](https://discuss.huggingface.co/)? Please add a link to it if that's the case. - [ ] Did you make sure to update the documentation with your changes? Here are the [documentation guidelines](https://github.com/huggingface/transformers/tree/main/docs), and [here are tips on formatting docstrings](https://github.com/huggingface/transformers/tree/main/docs#writing-source-documentation). - [ ] Did you write any new necessary tests? ## Who can review? Anyone in the community is free to review the PR once the tests have passed. Feel free to tag members/contributors who may be interested in your PR. <!-- Your PR will be replied to more quickly if you can figure out the right person to tag with @ @OlivierDehaene OR @Narsil -->
2024-04-12 12:20:31 +00:00
tracing::debug!("Not enough entries");
return None;
}
}
if let Some(max_size) = max_size {
if max_size == 0 {
tracing::debug!("No capacity");
return None;
}
}
// Pad prefill_token_budget to be a multiple of block size
let prefill_token_budget =
((prefill_token_budget + self.block_size - 1) / self.block_size) * self.block_size;
2023-02-13 12:02:45 +00:00
// Create span for this batch to add context to inference calls
let next_batch_span = info_span!(parent: None, "batch", batch_size = tracing::field::Empty);
next_batch_span.follows_from(Span::current());
2023-02-13 12:02:45 +00:00
let mut batch = Vec::with_capacity(self.entries.len());
let mut max_input_length = 0;
let mut prefill_tokens: u32 = 0;
let mut decode_tokens: u32 = 0;
let mut max_blocks = 0;
// Pop entries starting from the front of the queue
'entry_loop: while let Some((id, entry)) = self.entries.pop_front() {
// Filter entries where the response receiver was dropped (== entries where the request
// was dropped by the client)
2023-10-23 13:51:12 +00:00
if entry.response_tx.is_closed() {
metrics::counter!("tgi_request_failure", "err" => "dropped").increment(1);
Improve the defaults for the launcher (#1727) # What does this PR do? - Renamed `max_input_length` into `max_input_tokens` for consistency (backward compatible change, will yell if both are set.) - Will now use the config for `max_input_tokens` `max_total_token` and `max_batch_total_tokens`. - Capping the values to 16k in order to save VRAM on behalf of users (overriddable by simply setting the values). <!-- Congratulations! You've made it this far! You're not quite done yet though. Once merged, your PR is going to appear in the release notes with the title you set, so make sure it's a great title that fully reflects the extent of your awesome contribution. Then, please replace this with a description of the change and which issue is fixed (if applicable). Please also include relevant motivation and context. List any dependencies (if any) that are required for this change. Once you're done, someone will review your PR shortly (see the section "Who can review?" below to tag some potential reviewers). They may suggest changes to make the code even better. If no one reviewed your PR after a week has passed, don't hesitate to post a new comment @-mentioning the same persons---sometimes notifications get lost. --> <!-- Remove if not applicable --> Fixes # (issue) ## Before submitting - [ ] This PR fixes a typo or improves the docs (you can dismiss the other checks if that's the case). - [ ] Did you read the [contributor guideline](https://github.com/huggingface/transformers/blob/main/CONTRIBUTING.md#start-contributing-pull-requests), Pull Request section? - [ ] Was this discussed/approved via a Github issue or the [forum](https://discuss.huggingface.co/)? Please add a link to it if that's the case. - [ ] Did you make sure to update the documentation with your changes? Here are the [documentation guidelines](https://github.com/huggingface/transformers/tree/main/docs), and [here are tips on formatting docstrings](https://github.com/huggingface/transformers/tree/main/docs#writing-source-documentation). - [ ] Did you write any new necessary tests? ## Who can review? Anyone in the community is free to review the PR once the tests have passed. Feel free to tag members/contributors who may be interested in your PR. <!-- Your PR will be replied to more quickly if you can figure out the right person to tag with @ @OlivierDehaene OR @Narsil -->
2024-04-12 12:20:31 +00:00
tracing::debug!("Dropping entry");
continue;
}
let block_allocation = match &self.block_allocator {
None => {
// We pad to max input length in the Python shards
// We need to take these padding tokens into the equation
max_input_length = max_input_length.max(entry.request.input_length);
prefill_tokens = (batch.len() + 1) as u32 * max_input_length;
decode_tokens += entry.request.stopping_parameters.max_new_tokens;
let total_tokens = prefill_tokens + decode_tokens + self.speculate;
if prefill_tokens > prefill_token_budget || total_tokens > token_budget {
// Entry is over budget
// Add it back to the front
tracing::debug!("Over budget: prefill_tokens={prefill_tokens} > {prefill_token_budget} || {prefill_tokens} + {decode_tokens} + {} > {token_budget}", self.speculate);
self.entries.push_front((id, entry));
break 'entry_loop;
}
None
}
Some(block_allocator) => {
// If users wants the prefill logprobs, we cannot reuse the cache.
// So no input_ids for the radix tree.
let input_ids = if entry.request.decoder_input_details {
None
} else {
entry.request.input_ids.clone()
};
let tokens = entry.request.input_length
+ entry.request.stopping_parameters.max_new_tokens
+ self.speculate
- 1;
tracing::debug!("Allocating {tokens} with {input_ids:?}");
let block_allocation = match block_allocator.allocate(tokens, input_ids).await {
None => {
// Entry is over budget
// Add it back to the front
tracing::debug!("Over budget: not enough free blocks");
self.entries.push_front((id, entry));
break 'entry_loop;
}
Some(mut block_allocation) => {
tracing::debug!("Allocation: {block_allocation:?}");
max_blocks = max(max_blocks, block_allocation.blocks.len() as u32);
if block_allocation.prefix_len == entry.request.input_length {
// The whole request was found in the radix trie
// However, for the transformer forward to work, we need to
// have at least one token of postfix.
block_allocation.prefix_len -= 1;
}
block_allocation
}
};
let postfix_len = entry.request.input_length - block_allocation.prefix_len;
if prefill_tokens + postfix_len > prefill_token_budget {
// Entry is over budget
if self.support_chunking {
// We support chunking, just set postfix_len to exactly match prefill_token_budget
let chunk_len = prefill_token_budget.saturating_sub(prefill_tokens);
if chunk_len > 0 {
// Push this entry inside the batch
batch.push((id, entry, Some(block_allocation), Some(chunk_len)));
} else {
// We cannot prefill even one token for this entry
// Add it back to the queue
self.entries.push_front((id, entry));
}
tracing::debug!(
"Matched budget: prefill_tokens={} == {prefill_token_budget}",
prefill_tokens + postfix_len
);
break 'entry_loop;
} else {
// We don't support chunking, this entry needs to go back to the buffer
// Add it back to the front
tracing::debug!(
"Over budget: prefill_tokens={} > {prefill_token_budget}",
prefill_tokens + postfix_len
);
self.entries.push_front((id, entry));
break 'entry_loop;
}
}
prefill_tokens += postfix_len;
Some(block_allocation)
}
};
batch.push((id, entry, block_allocation, None));
if Some(batch.len()) == max_size {
break;
}
}
// Empty batch
if batch.is_empty() {
tracing::debug!("Filterered out all entries");
return None;
}
// XXX We haven't allocated yet, so we're allowed to ditch the results.
// Check if our batch is big enough
if let Some(min_size) = min_size {
// Batch is too small
if batch.len() < min_size {
// Add back entries to the queue in the correct order
for (id, entry, _, _) in batch.into_iter().rev() {
self.entries.push_front((id, entry));
}
return None;
}
}
let mut batch_requests = Vec::with_capacity(self.entries.len());
let mut batch_entries =
IntMap::with_capacity_and_hasher(self.entries.len(), BuildNoHashHasher::default());
for (id, mut entry, block_allocation, chunk_len) in batch {
// Create a new span to link the batch back to this entry
let entry_batch_span = info_span!(parent: &entry.span, "infer");
// Add relationships
next_batch_span.follows_from(&entry_batch_span);
entry_batch_span.follows_from(&next_batch_span);
// Update entry
entry.temp_span = Some(entry_batch_span);
let (blocks, slots, prefix_len) = match &block_allocation {
None => (Vec::new(), Vec::new(), 0),
Some(block_allocation) => (
block_allocation.blocks.clone(),
block_allocation.slots.clone(),
block_allocation.prefix_len,
),
};
entry.block_allocation = block_allocation;
batch_requests.push(Request {
id,
prefill_logprobs: entry.request.decoder_input_details,
Rebase TRT-llm (#2331) * wip wip refacto refacto Initial setup for CXX binding to TRTLLM Working FFI call for TGI and TRTLLM backend Remove unused parameters annd force tokenizer name to be set Overall build TRTLLM and deps through CMake build system Enable end to end CMake build First version loading engines and making it ready for inference Remembering to check how we can detect support for chunked context Move to latest TensorRT-LLM version Specify which default log level to use depending on CMake build type make leader executor mode working unconditionally call InitializeBackend on the FFI layer bind to CUDA::nvml to retrieve compute capabilities at runtime updated logic and comment to detect cuda compute capabilities implement the Stream method to send new tokens through a callback use spdlog release 1.14.1 moving forward update trtllm to latest version a96cccafcf6365c128f004f779160951f8c0801c correctly tell cmake to build dependent tensorrt-llm required libraries create cmake install target to put everything relevant in installation folder add auth_token CLI argument to provide hf hub authentification token allow converting huggingface::tokenizers error to TensorRtLlmBackendError use correct include for spdlog include guard to build example in cmakelists working setup of the ffi layer remove fmt import use external fmt lib end to end ffi flow working make sure to track include/ffi.h to trigger rebuild from cargo impl the rust backend which currently cannot move the actual computation in background thread expose shutdown function at ffi layer impl RwLock scenario for TensorRtLllmBackend oops missing c++ backend definitions compute the number of maximum new tokens for each request independently make sure the context is not dropped in the middle of the async decoding. remove unnecessary log add all the necessary plumbery to return the generated content update invalid doc in cpp file correctly forward back the log probabilities remove unneeded scope variable for now refactor Stream impl for Generation to factorise code expose the internal missing start/queue timestamp forward tgi parameters rep/freq penalty add some more validation about grammar not supported define a shared struct to hold the result of a decoding step expose information about potential error happening while decoding remove logging add logging in case of decoding error make sure executor_worker is provided add initial Dockerfile for TRTLLM backend add some more information in CMakeLists.txt to correctly install executorWorker add some more information in CMakeLists.txt to correctly find and install nvrtc wrapper simplify prebuilt trtllm libraries name definition do the same name definition stuff for tensorrt_llm_executor_static leverage pkg-config to probe libraries paths and reuse new install structure from cmake fix bad copy/past missing nvinfer linkage direction align all the linker search dependency add missing pkgconfig folder for MPI in Dockerfile correctly setup linking search path for runtime layer fix missing / before tgi lib path adding missing ld_library_path for cuda stubs in Dockerfile update tgi entrypoint commenting out Python part for TensorRT installation refactored docker image move to TensorRT-LLM v0.11.0 make docker linter happy with same capitalization rule fix typo refactor the compute capabilities detection along with num gpus update TensorRT-LLM to latest version update TensorRT install script to latest update build.rs to link to cuda 12.5 add missing dependant libraries for linking clean up a bit install to decoder_attention target add some custom stuff for nccl linkage fix envvar CARGO_CFG_TARGET_ARCH set at runtime vs compile time use std::env::const::ARCH make sure variable live long enough... look for cuda 12.5 add some more basic info in README.md * Rebase. * Fix autodocs. * Let's try to enable trtllm backend. * Ignore backends/v3 by default. * Fixing client. * Fix makefile + autodocs. * Updating the schema thing + redocly. * Fix trtllm lint. * Adding pb files ? * Remove cargo fmt temporarily. * ? * Tmp. * Remove both check + clippy ? * Backporting telemetry. * Backporting 457fb0a1 * Remove PB from git. * Fixing PB with default member backends/client * update TensorRT-LLM to latest version * provided None for api_key * link against libtensorrt_llm and not libtensorrt-llm --------- Co-authored-by: OlivierDehaene <23298448+OlivierDehaene@users.noreply.github.com> Co-authored-by: Morgan Funtowicz <morgan@huggingface.co>
2024-07-31 08:33:10 +00:00
input_chunks: Some(client::Input {
chunks: entry
.request
.inputs
.clone()
.into_iter()
.map(|c| client::InputChunk {
chunk: Some(match c {
Chunk::Text(text) => client::Chunk::Text(text),
Chunk::Image(image) => client::Chunk::Image(client::Image {
data: image.data,
mimetype: image.mimetype,
}),
2024-11-13 13:37:17 +00:00
Chunk::Video(url) => client::Chunk::Video(url),
Rebase TRT-llm (#2331) * wip wip refacto refacto Initial setup for CXX binding to TRTLLM Working FFI call for TGI and TRTLLM backend Remove unused parameters annd force tokenizer name to be set Overall build TRTLLM and deps through CMake build system Enable end to end CMake build First version loading engines and making it ready for inference Remembering to check how we can detect support for chunked context Move to latest TensorRT-LLM version Specify which default log level to use depending on CMake build type make leader executor mode working unconditionally call InitializeBackend on the FFI layer bind to CUDA::nvml to retrieve compute capabilities at runtime updated logic and comment to detect cuda compute capabilities implement the Stream method to send new tokens through a callback use spdlog release 1.14.1 moving forward update trtllm to latest version a96cccafcf6365c128f004f779160951f8c0801c correctly tell cmake to build dependent tensorrt-llm required libraries create cmake install target to put everything relevant in installation folder add auth_token CLI argument to provide hf hub authentification token allow converting huggingface::tokenizers error to TensorRtLlmBackendError use correct include for spdlog include guard to build example in cmakelists working setup of the ffi layer remove fmt import use external fmt lib end to end ffi flow working make sure to track include/ffi.h to trigger rebuild from cargo impl the rust backend which currently cannot move the actual computation in background thread expose shutdown function at ffi layer impl RwLock scenario for TensorRtLllmBackend oops missing c++ backend definitions compute the number of maximum new tokens for each request independently make sure the context is not dropped in the middle of the async decoding. remove unnecessary log add all the necessary plumbery to return the generated content update invalid doc in cpp file correctly forward back the log probabilities remove unneeded scope variable for now refactor Stream impl for Generation to factorise code expose the internal missing start/queue timestamp forward tgi parameters rep/freq penalty add some more validation about grammar not supported define a shared struct to hold the result of a decoding step expose information about potential error happening while decoding remove logging add logging in case of decoding error make sure executor_worker is provided add initial Dockerfile for TRTLLM backend add some more information in CMakeLists.txt to correctly install executorWorker add some more information in CMakeLists.txt to correctly find and install nvrtc wrapper simplify prebuilt trtllm libraries name definition do the same name definition stuff for tensorrt_llm_executor_static leverage pkg-config to probe libraries paths and reuse new install structure from cmake fix bad copy/past missing nvinfer linkage direction align all the linker search dependency add missing pkgconfig folder for MPI in Dockerfile correctly setup linking search path for runtime layer fix missing / before tgi lib path adding missing ld_library_path for cuda stubs in Dockerfile update tgi entrypoint commenting out Python part for TensorRT installation refactored docker image move to TensorRT-LLM v0.11.0 make docker linter happy with same capitalization rule fix typo refactor the compute capabilities detection along with num gpus update TensorRT-LLM to latest version update TensorRT install script to latest update build.rs to link to cuda 12.5 add missing dependant libraries for linking clean up a bit install to decoder_attention target add some custom stuff for nccl linkage fix envvar CARGO_CFG_TARGET_ARCH set at runtime vs compile time use std::env::const::ARCH make sure variable live long enough... look for cuda 12.5 add some more basic info in README.md * Rebase. * Fix autodocs. * Let's try to enable trtllm backend. * Ignore backends/v3 by default. * Fixing client. * Fix makefile + autodocs. * Updating the schema thing + redocly. * Fix trtllm lint. * Adding pb files ? * Remove cargo fmt temporarily. * ? * Tmp. * Remove both check + clippy ? * Backporting telemetry. * Backporting 457fb0a1 * Remove PB from git. * Fixing PB with default member backends/client * update TensorRT-LLM to latest version * provided None for api_key * link against libtensorrt_llm and not libtensorrt-llm --------- Co-authored-by: OlivierDehaene <23298448+OlivierDehaene@users.noreply.github.com> Co-authored-by: Morgan Funtowicz <morgan@huggingface.co>
2024-07-31 08:33:10 +00:00
}),
})
.collect(),
}),
inputs: entry.request.inputs.chunks_to_string(),
truncate: entry.request.truncate,
Lots of improvements (Still 2 allocators) (#2449) * Making prefix/flashinfer the default and testing the full release tests. * Include flashinfer in the docker. * Using prebuilt. * Allowing window_left_size (dummy version). * Disabling flashinfer/prefix caching on odd head_dim * Disable prefix caching for lora. * More specific codes. * Update lock * Updating integration tests with new values with FI/FD. Remove paged as a default too, and using FD everywhere. * Update cargo lock ? * Upgrade to 1.80 because of bitstream... * Everywhere 1.80 * Forgot last default place. * Apply suggestions from code review Co-authored-by: drbh <david.richard.holtz@gmail.com> * Updated flake lock * Tmp * Upgrade resolution system for less errors in resolution. * Remove lambda for cleaner function. * Handling debugger. * OVerride the env in server tests. * Is this enough to make it work ? * This seems to be working. * Downgrade some logs. * Fixing the default for vlm. * Don't enable prefix caching on VLM just yet. * Change `add_special_tokens` in order to have the correct tokens for chat input and not (since it's super important with the prefixing now) * Fixing prefix caching for flashdecoding. * Update all models. * Fixed flashinfer version. * add_special_tokens is internal only * Fixing seqlen with the new vlms. * Fixing the issue with `add_special_tokens` not being passed around. * Fixing the test. * Removing encoder_decoder (seq2seq). * Update the chat test. * Fixing the batching tokenization in flash causal lm. * Truncating left for radix purposes. * Oops this doesn't belong here. * Put back default pure shell. * Update server tests - Default to throughput test in k6 - Use TGI_WIGGLE_ROOM to adjust wiggle room * Only n_heads / process_group.size() are necessary. * Revert the integrationt tests change (seem linked to head_size modification). * Adding error message when assert is violated. * Fixing the free algorithm to handle times where the common prefix is smaller. * Apply suggestions from code review Co-authored-by: OlivierDehaene <olivier@huggingface.co> * Update server/text_generation_server/layers/attention/common.py Co-authored-by: OlivierDehaene <olivier@huggingface.co> * Fix disabling prefix caching - Fix windowing checks. * Revert the Cohere tokenizer change (for now using a revision instead). * Fmt. --------- Co-authored-by: drbh <david.richard.holtz@gmail.com> Co-authored-by: OlivierDehaene <olivier@huggingface.co>
2024-08-29 14:29:01 +00:00
add_special_tokens: entry.request.add_special_tokens,
parameters: Some(NextTokenChooserParameters::from(
entry.request.parameters.clone(),
)),
stopping_parameters: Some(StoppingCriteriaParameters::from(
entry.request.stopping_parameters.clone(),
)),
Rebased #617 (#868) # What does this PR do? <!-- Congratulations! You've made it this far! You're not quite done yet though. Once merged, your PR is going to appear in the release notes with the title you set, so make sure it's a great title that fully reflects the extent of your awesome contribution. Then, please replace this with a description of the change and which issue is fixed (if applicable). Please also include relevant motivation and context. List any dependencies (if any) that are required for this change. Once you're done, someone will review your PR shortly (see the section "Who can review?" below to tag some potential reviewers). They may suggest changes to make the code even better. If no one reviewed your PR after a week has passed, don't hesitate to post a new comment @-mentioning the same persons---sometimes notifications get lost. --> <!-- Remove if not applicable --> Fixes # (issue) ## Before submitting - [ ] This PR fixes a typo or improves the docs (you can dismiss the other checks if that's the case). - [ ] Did you read the [contributor guideline](https://github.com/huggingface/transformers/blob/main/CONTRIBUTING.md#start-contributing-pull-requests), Pull Request section? - [ ] Was this discussed/approved via a Github issue or the [forum](https://discuss.huggingface.co/)? Please add a link to it if that's the case. - [ ] Did you make sure to update the documentation with your changes? Here are the [documentation guidelines](https://github.com/huggingface/transformers/tree/main/docs), and [here are tips on formatting docstrings](https://github.com/huggingface/transformers/tree/main/docs#writing-source-documentation). - [ ] Did you write any new necessary tests? ## Who can review? Anyone in the community is free to review the PR once the tests have passed. Feel free to tag members/contributors who may be interested in your PR. <!-- Your PR will be replied to more quickly if you can figure out the right person to tag with @ @OlivierDehaene OR @Narsil --> --------- Co-authored-by: Vincent Brouwers <vincent.brouwers@ing.com>
2023-08-28 09:43:47 +00:00
top_n_tokens: entry.request.top_n_tokens,
blocks,
slots,
cache_len: prefix_len,
Enable multiple LoRa adapters (#2010) * feat: first draft load multiple lora * feat: load weights within layer and refactor lora pass * fix: refactor and reduce lora math * feat: baseline impl single request multi lora support * feat: prefer lorax implementation and port loading logic * fix: prefer adapter_data and refactors * feat: perfer loraxs custom punica kernels and add mlp loras * fix: adjust batch for bgmv * fix: adjust adapter_segments logic when in batch * fix: refactor and move changes to v3 proto * fix: pass model_id for all flash causal lms * fix: pass model_id for all causal and seq2seq lms * fix: add model_id to model test * feat: add lora support to mistral and refactors * feat: prefer model id in request * fix: include rust code for adapter id * feat: bump launcher and add new lora docs * feat: support base model generation and refactors * fix: rename doc to retry ci build * feat: support if vlm models * fix: add adapter_data param and avoid missing layers * fix: add adapter_data param to phi and neox * fix: update all models forwards to include adapter_data * fix: add model_id to IdeficsCausalLM * Update lora.md Fixed a typo * Update lora.md Fixing spam image * fix: add lora kernel to dockerfile, support running without kernels and refactors * fix: avoid dockerfile conflict * fix: refactors and adjust flash llama lora logic * fix: skip llama test due to CI issue (temp) * fix: skip llama test CI (temp) 2 * fix: revert skips and prefer updated ci token for tests * fix: refactors and helpful comments * fix: add noop in TensorParallelAdapterRowLinear too * fix: refactor and move shard_lora_weights logic * fix: exit early if no adapter_data --------- Co-authored-by: Derek <datavistics@gmail.com>
2024-06-25 18:46:27 +00:00
adapter_id: entry.request.adapter_id.clone(),
chunk_len,
});
// Set batch_time
entry.batch_time = Some(Instant::now());
// Insert in batch_entries IntMap
batch_entries.insert(id, entry);
}
// Final batch size
let size = batch_requests.len() as u32;
next_batch_span.record("batch_size", size);
let batch = Batch {
id: self.next_batch_id,
requests: batch_requests,
size,
max_tokens: (prefill_tokens + decode_tokens),
max_blocks,
};
// Increment batch id
self.next_batch_id += 1;
metrics::histogram!("tgi_batch_next_size").record(batch.size as f64);
2023-02-13 12:02:45 +00:00
Some((batch_entries, batch, next_batch_span))
}
}
2023-02-13 12:02:45 +00:00
type NextBatch = (IntMap<u64, Entry>, Batch, Span);
#[derive(Debug)]
enum QueueCommand {
Append(Box<Entry>, Span),
NextBatch {
min_size: Option<usize>,
max_size: Option<usize>,
prefill_token_budget: u32,
token_budget: u32,
response_sender: oneshot::Sender<Option<NextBatch>>,
2023-02-13 12:02:45 +00:00
span: Span,
},
}
impl From<ValidParameters> for NextTokenChooserParameters {
fn from(value: ValidParameters) -> Self {
let (grammar, grammar_type) = match value.grammar {
None => (String::new(), GrammarType::None),
Some(grammar) => match grammar {
ValidGrammar::Json(grammar_string) => (grammar_string, GrammarType::Json),
ValidGrammar::Regex(grammar_string) => (grammar_string, GrammarType::Regex),
},
};
Self {
temperature: value.temperature,
top_k: value.top_k,
top_p: value.top_p,
typical_p: value.typical_p,
do_sample: value.do_sample,
seed: value.seed,
repetition_penalty: value.repetition_penalty,
frequency_penalty: value.frequency_penalty,
watermark: value.watermark,
grammar,
grammar_type: grammar_type.into(),
}
}
}
impl From<ValidStoppingParameters> for StoppingCriteriaParameters {
fn from(value: ValidStoppingParameters) -> Self {
Self {
max_new_tokens: value.max_new_tokens,
stop_sequences: value.stop_sequences,
ignore_eos_token: value.ignore_eos_token,
}
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use super::*;
2023-02-13 12:02:45 +00:00
use tracing::info_span;
fn default_entry() -> (
Entry,
2023-10-23 13:51:12 +00:00
mpsc::UnboundedReceiver<Result<InferStreamResponse, InferError>>,
) {
2023-10-23 13:51:12 +00:00
let (response_tx, receiver_tx) = mpsc::unbounded_channel();
let entry = Entry {
request: ValidGenerateRequest {
inputs: vec![],
input_ids: Some(Arc::new(vec![])),
input_length: 1,
Lots of improvements (Still 2 allocators) (#2449) * Making prefix/flashinfer the default and testing the full release tests. * Include flashinfer in the docker. * Using prebuilt. * Allowing window_left_size (dummy version). * Disabling flashinfer/prefix caching on odd head_dim * Disable prefix caching for lora. * More specific codes. * Update lock * Updating integration tests with new values with FI/FD. Remove paged as a default too, and using FD everywhere. * Update cargo lock ? * Upgrade to 1.80 because of bitstream... * Everywhere 1.80 * Forgot last default place. * Apply suggestions from code review Co-authored-by: drbh <david.richard.holtz@gmail.com> * Updated flake lock * Tmp * Upgrade resolution system for less errors in resolution. * Remove lambda for cleaner function. * Handling debugger. * OVerride the env in server tests. * Is this enough to make it work ? * This seems to be working. * Downgrade some logs. * Fixing the default for vlm. * Don't enable prefix caching on VLM just yet. * Change `add_special_tokens` in order to have the correct tokens for chat input and not (since it's super important with the prefixing now) * Fixing prefix caching for flashdecoding. * Update all models. * Fixed flashinfer version. * add_special_tokens is internal only * Fixing seqlen with the new vlms. * Fixing the issue with `add_special_tokens` not being passed around. * Fixing the test. * Removing encoder_decoder (seq2seq). * Update the chat test. * Fixing the batching tokenization in flash causal lm. * Truncating left for radix purposes. * Oops this doesn't belong here. * Put back default pure shell. * Update server tests - Default to throughput test in k6 - Use TGI_WIGGLE_ROOM to adjust wiggle room * Only n_heads / process_group.size() are necessary. * Revert the integrationt tests change (seem linked to head_size modification). * Adding error message when assert is violated. * Fixing the free algorithm to handle times where the common prefix is smaller. * Apply suggestions from code review Co-authored-by: OlivierDehaene <olivier@huggingface.co> * Update server/text_generation_server/layers/attention/common.py Co-authored-by: OlivierDehaene <olivier@huggingface.co> * Fix disabling prefix caching - Fix windowing checks. * Revert the Cohere tokenizer change (for now using a revision instead). * Fmt. --------- Co-authored-by: drbh <david.richard.holtz@gmail.com> Co-authored-by: OlivierDehaene <olivier@huggingface.co>
2024-08-29 14:29:01 +00:00
add_special_tokens: true,
truncate: 0,
decoder_input_details: false,
parameters: ValidParameters {
temperature: 0.0,
top_k: 0,
top_p: 0.0,
typical_p: 0.0,
do_sample: false,
seed: 0,
repetition_penalty: 0.0,
frequency_penalty: 0.0,
watermark: false,
grammar: None,
},
stopping_parameters: ValidStoppingParameters {
ignore_eos_token: false,
max_new_tokens: 1,
max_total_new_tokens: 1024,
stop_sequences: vec![],
},
Rebased #617 (#868) # What does this PR do? <!-- Congratulations! You've made it this far! You're not quite done yet though. Once merged, your PR is going to appear in the release notes with the title you set, so make sure it's a great title that fully reflects the extent of your awesome contribution. Then, please replace this with a description of the change and which issue is fixed (if applicable). Please also include relevant motivation and context. List any dependencies (if any) that are required for this change. Once you're done, someone will review your PR shortly (see the section "Who can review?" below to tag some potential reviewers). They may suggest changes to make the code even better. If no one reviewed your PR after a week has passed, don't hesitate to post a new comment @-mentioning the same persons---sometimes notifications get lost. --> <!-- Remove if not applicable --> Fixes # (issue) ## Before submitting - [ ] This PR fixes a typo or improves the docs (you can dismiss the other checks if that's the case). - [ ] Did you read the [contributor guideline](https://github.com/huggingface/transformers/blob/main/CONTRIBUTING.md#start-contributing-pull-requests), Pull Request section? - [ ] Was this discussed/approved via a Github issue or the [forum](https://discuss.huggingface.co/)? Please add a link to it if that's the case. - [ ] Did you make sure to update the documentation with your changes? Here are the [documentation guidelines](https://github.com/huggingface/transformers/tree/main/docs), and [here are tips on formatting docstrings](https://github.com/huggingface/transformers/tree/main/docs#writing-source-documentation). - [ ] Did you write any new necessary tests? ## Who can review? Anyone in the community is free to review the PR once the tests have passed. Feel free to tag members/contributors who may be interested in your PR. <!-- Your PR will be replied to more quickly if you can figure out the right person to tag with @ @OlivierDehaene OR @Narsil --> --------- Co-authored-by: Vincent Brouwers <vincent.brouwers@ing.com>
2023-08-28 09:43:47 +00:00
top_n_tokens: 0,
Enable multiple LoRa adapters (#2010) * feat: first draft load multiple lora * feat: load weights within layer and refactor lora pass * fix: refactor and reduce lora math * feat: baseline impl single request multi lora support * feat: prefer lorax implementation and port loading logic * fix: prefer adapter_data and refactors * feat: perfer loraxs custom punica kernels and add mlp loras * fix: adjust batch for bgmv * fix: adjust adapter_segments logic when in batch * fix: refactor and move changes to v3 proto * fix: pass model_id for all flash causal lms * fix: pass model_id for all causal and seq2seq lms * fix: add model_id to model test * feat: add lora support to mistral and refactors * feat: prefer model id in request * fix: include rust code for adapter id * feat: bump launcher and add new lora docs * feat: support base model generation and refactors * fix: rename doc to retry ci build * feat: support if vlm models * fix: add adapter_data param and avoid missing layers * fix: add adapter_data param to phi and neox * fix: update all models forwards to include adapter_data * fix: add model_id to IdeficsCausalLM * Update lora.md Fixed a typo * Update lora.md Fixing spam image * fix: add lora kernel to dockerfile, support running without kernels and refactors * fix: avoid dockerfile conflict * fix: refactors and adjust flash llama lora logic * fix: skip llama test due to CI issue (temp) * fix: skip llama test CI (temp) 2 * fix: revert skips and prefer updated ci token for tests * fix: refactors and helpful comments * fix: add noop in TensorParallelAdapterRowLinear too * fix: refactor and move shard_lora_weights logic * fix: exit early if no adapter_data --------- Co-authored-by: Derek <datavistics@gmail.com>
2024-06-25 18:46:27 +00:00
adapter_id: None,
},
response_tx,
2023-02-13 12:02:45 +00:00
span: info_span!("entry"),
temp_span: None,
queue_time: Instant::now(),
batch_time: None,
block_allocation: None,
};
(entry, receiver_tx)
}
#[tokio::test]
async fn test_append() {
let mut state = State::new(false, 1, false, None, 0, 16, false);
let (entry, _guard) = default_entry();
assert_eq!(state.next_id, 0);
assert_eq!(state.entries.len(), 0);
state.append(entry);
assert_eq!(state.next_id, 1);
assert_eq!(state.entries.len(), 1);
let (id, _) = state.entries.remove(0).unwrap();
assert_eq!(id, 0);
}
#[tokio::test]
async fn test_next_batch_empty() {
let mut state = State::new(false, 1, false, None, 0, 16, false);
assert!(state.next_batch(None, None, 1, 1).await.is_none());
assert!(state.next_batch(Some(1), None, 1, 1).await.is_none());
}
#[tokio::test]
async fn test_next_batch_min_size() {
let mut state = State::new(false, 1, false, None, 0, 16, false);
let (entry1, _guard1) = default_entry();
let (entry2, _guard2) = default_entry();
state.append(entry1);
state.append(entry2);
let (entries, batch, _) = state.next_batch(None, None, 2, 2).await.unwrap();
assert_eq!(entries.len(), 2);
assert!(entries.contains_key(&0));
assert!(entries.contains_key(&1));
assert!(entries.get(&0).unwrap().batch_time.is_some());
assert!(entries.get(&1).unwrap().batch_time.is_some());
assert_eq!(batch.id, 0);
assert_eq!(batch.size, 2);
assert_eq!(state.next_id, 2);
assert_eq!(state.entries.len(), 0);
assert_eq!(state.next_batch_id, 1);
let (entry3, _guard3) = default_entry();
state.append(entry3);
assert!(state.next_batch(Some(2), None, 2, 2).await.is_none());
assert_eq!(state.next_id, 3);
assert_eq!(state.entries.len(), 1);
let (id, _) = state.entries.remove(0).unwrap();
assert_eq!(id, 2);
}
#[tokio::test]
async fn test_next_batch_max_size() {
let mut state = State::new(false, 1, false, None, 0, 16, false);
let (entry1, _guard1) = default_entry();
let (entry2, _guard2) = default_entry();
state.append(entry1);
state.append(entry2);
let (entries, batch, _) = state.next_batch(None, Some(1), 2, 2).await.unwrap();
assert_eq!(entries.len(), 1);
assert!(entries.contains_key(&0));
assert!(entries.get(&0).unwrap().batch_time.is_some());
assert_eq!(batch.id, 0);
assert_eq!(batch.size, 1);
assert_eq!(state.next_id, 2);
assert_eq!(state.entries.len(), 1);
assert_eq!(state.next_batch_id, 1);
}
#[tokio::test]
async fn test_next_batch_token_budget() {
let mut state = State::new(false, 1, false, None, 0, 16, false);
let (entry1, _guard1) = default_entry();
let (entry2, _guard2) = default_entry();
state.append(entry1);
state.append(entry2);
let (entries, batch, _) = state.next_batch(None, None, 1, 1).await.unwrap();
assert_eq!(entries.len(), 1);
assert!(entries.contains_key(&0));
assert_eq!(batch.id, 0);
assert_eq!(batch.size, 1);
assert_eq!(state.next_id, 2);
assert_eq!(state.entries.len(), 1);
assert_eq!(state.next_batch_id, 1);
let (entry3, _guard3) = default_entry();
state.append(entry3);
let (entries, batch, _) = state.next_batch(None, None, 3, 3).await.unwrap();
assert_eq!(entries.len(), 2);
assert!(entries.contains_key(&1));
assert!(entries.contains_key(&2));
assert_eq!(batch.id, 1);
assert_eq!(batch.size, 2);
assert_eq!(state.next_id, 3);
assert_eq!(state.entries.len(), 0);
assert_eq!(state.next_batch_id, 2);
}
#[tokio::test]
async fn test_queue_append() {
let queue = Queue::new(false, 1, false, None, 0, 16, false);
let (entry, _guard) = default_entry();
queue.append(entry);
}
#[tokio::test]
async fn test_queue_next_batch_empty() {
let queue = Queue::new(false, 1, false, None, 0, 16, false);
assert!(queue.next_batch(None, None, 1, 1).await.is_none());
assert!(queue.next_batch(Some(1), None, 1, 1).await.is_none());
}
#[tokio::test]
async fn test_queue_next_batch_min_size() {
let queue = Queue::new(false, 1, false, None, 0, 16, false);
let (entry1, _guard1) = default_entry();
let (entry2, _guard2) = default_entry();
queue.append(entry1);
queue.append(entry2);
let (entries, batch, _) = queue.next_batch(None, None, 2, 2).await.unwrap();
assert_eq!(entries.len(), 2);
assert!(entries.contains_key(&0));
assert!(entries.contains_key(&1));
assert!(entries.get(&0).unwrap().batch_time.is_some());
assert!(entries.get(&1).unwrap().batch_time.is_some());
assert_eq!(batch.id, 0);
assert_eq!(batch.size, 2);
let (entry3, _guard3) = default_entry();
queue.append(entry3);
// Not enough requests pending
assert!(queue.next_batch(Some(2), None, 2, 2).await.is_none());
// Not enough token budget
assert!(queue.next_batch(Some(1), None, 0, 0).await.is_none());
// Ok
let (entries2, batch2, _) = queue.next_batch(Some(1), None, 2, 2).await.unwrap();
assert_eq!(entries2.len(), 1);
assert!(entries2.contains_key(&2));
assert!(entries2.get(&2).unwrap().batch_time.is_some());
assert_eq!(batch2.id, 1);
assert_eq!(batch2.size, 1);
}
#[tokio::test]
async fn test_queue_next_batch_max_size() {
let queue = Queue::new(false, 1, false, None, 0, 16, false);
let (entry1, _guard1) = default_entry();
let (entry2, _guard2) = default_entry();
queue.append(entry1);
queue.append(entry2);
let (entries, batch, _) = queue.next_batch(None, Some(1), 2, 2).await.unwrap();
assert_eq!(entries.len(), 1);
assert!(entries.contains_key(&0));
assert!(entries.get(&0).unwrap().batch_time.is_some());
assert_eq!(batch.id, 0);
assert_eq!(batch.size, 1);
}
#[tokio::test]
async fn test_queue_next_batch_token_budget() {
let queue = Queue::new(false, 1, false, None, 0, 16, false);
let (entry1, _guard1) = default_entry();
let (entry2, _guard2) = default_entry();
queue.append(entry1);
queue.append(entry2);
let (entries, batch, _) = queue.next_batch(None, None, 1, 1).await.unwrap();
assert_eq!(entries.len(), 1);
assert!(entries.contains_key(&0));
assert_eq!(batch.id, 0);
assert_eq!(batch.size, 1);
let (entry3, _guard3) = default_entry();
queue.append(entry3);
let (entries, batch, _) = queue.next_batch(None, None, 3, 3).await.unwrap();
assert_eq!(entries.len(), 2);
assert!(entries.contains_key(&1));
assert!(entries.contains_key(&2));
assert_eq!(batch.id, 1);
assert_eq!(batch.size, 2);
}
2023-12-11 11:46:30 +00:00
#[tokio::test]
async fn test_queue_next_batch_token_speculate() {
let queue = Queue::new(true, 1, false, None, 2, 16, false);
2023-12-11 11:46:30 +00:00
let (entry1, _guard1) = default_entry();
let (entry2, _guard2) = default_entry();
queue.append(entry1);
queue.append(entry2);
// Budget of 1 is not enough
assert!(queue.next_batch(None, None, 1, 1).await.is_none());
2023-12-11 11:46:30 +00:00
let (entries, batch, _) = queue.next_batch(None, None, 6, 6).await.unwrap();
2023-12-11 11:46:30 +00:00
assert_eq!(entries.len(), 2);
assert!(entries.contains_key(&0));
assert!(entries.contains_key(&1));
assert_eq!(batch.id, 0);
assert_eq!(batch.size, 2);
}
#[tokio::test]
async fn test_queue_next_batch_dropped_receiver() {
let queue = Queue::new(false, 1, false, None, 0, 16, false);
let (entry, _) = default_entry();
queue.append(entry);
assert!(queue.next_batch(None, None, 1, 1).await.is_none());
}
}