Mirror of https://github.com/huggingface/text-generation-inference.git, synced 2025-06-19 15:52:08 +00:00
Change to use notify_one, adjust waiting_tokens accounting

commit aaf38e5978
parent 1963f0e1bb
@@ -70,7 +70,7 @@ impl Batcher {
 
         // Notify the background task that we have a new entry in the database that needs
         // to be batched
-        self.shared.batching_task.notify_waiters();
+        self.shared.batching_task.notify_one();
 
         // Await on the response from the background task
         // We can safely unwrap as the background task will never drop the sender
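
For context on the first hunk: tokio's Notify::notify_waiters only wakes tasks that are already parked in notified().await and stores nothing, whereas Notify::notify_one stores a permit when no task is waiting, so the batching task cannot miss a wakeup that arrives while it is busy with a generate call. A minimal standalone sketch of that difference (plain tokio, not code from this repository):

use tokio::sync::Notify;

#[tokio::main]
async fn main() {
    let notify = Notify::new();

    // notify_one(): with no task currently waiting, a single permit is stored,
    // so the next `notified().await` completes immediately instead of blocking.
    notify.notify_one();
    notify.notified().await;

    // notify_waiters(): only wakes tasks already waiting in `notified().await`;
    // no permit is stored, so a notification sent while the receiver is busy is lost.
    notify.notify_waiters();
    // A `notify.notified().await` here would block forever: nothing was stored.
}
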
@@ -104,10 +104,9 @@ async fn batching_task(
     // Get the next batch from the DB
     // This batch might be smaller than the maximum batch size if there are not enough requests
     // waiting in the DB
-    let mut waiting_tokens = 0;
     while let Some((request_ids, batch)) = db.next_batch(None, max_batch_size) {
         let mut cached_batch = wrap_future(client.generate(batch), request_ids, &db).await;
-        waiting_tokens += 1;
+        let mut waiting_tokens = 1;
 
         // We loop until we do not receive any cached batch from the inference server (== until
         // all requests have met their stopping criteria)
@@ -131,11 +130,11 @@ async fn batching_task(
                 if let Some((new_request_ids, new_batch)) =
                     db.next_batch(min_size, max_batch_size)
                 {
-                    // Reset waiting counter
-                    waiting_tokens = 0;
                     // Generate one token for this new batch to have the attention past in cache
                     let new_cached_batch =
                         wrap_future(client.generate(new_batch), new_request_ids, &db).await;
+                    // Reset waiting counter
+                    waiting_tokens = 1;
                     // Extend current batch with the new batch
                     if let Some(new_cached_batch) = new_cached_batch {
                         request_ids.extend(new_cached_batch.requests.iter().map(|req| req.id));
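
On the waiting_tokens side, the counter now measures generate passes since the batch was last (re)filled: it starts at 1 right after the first generate call on a fresh batch and is reset to 1 only after a newly concatenated batch has had its own generate pass, instead of starting from 0 beforehand. A rough sketch of that accounting under hypothetical names (max_waiting_tokens, try_grow_batch, and decode_step are placeholders, not the router's actual API):

// Hypothetical stand-ins so the sketch compiles on its own.
fn try_grow_batch() -> bool { false }
fn decode_step() -> bool { false }

fn batch_loop(max_waiting_tokens: u32) {
    // The fresh batch has already run one generate pass, so start at 1.
    let mut waiting_tokens = 1;
    loop {
        if waiting_tokens >= max_waiting_tokens && try_grow_batch() {
            // The merged requests just ran their own generate pass,
            // so the count restarts at 1, mirroring the initial value.
            waiting_tokens = 1;
        }
        if !decode_step() {
            break; // all requests met their stopping criteria
        }
        waiting_tokens += 1;
    }
}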