fix: incomplete generations w/ single tokens generations and models that did not support chunking (#2770)

* Incomplete generation stream fix (#2754)

entries.len() could > batch.size in prefill, so need to filter as well.

Signed-off-by: Wang, Yi A <yi.a.wang@intel.com>

* entries was wrongly extended for model that did not support chunking

---------

Signed-off-by: Wang, Yi A <yi.a.wang@intel.com>
Co-authored-by: Wang, Yi <yi.a.wang@intel.com>
This commit is contained in:
OlivierDehaene 2024-11-21 17:37:55 +01:00 committed by GitHub
parent 3c54488638
commit 8e0c161d0a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -193,7 +193,7 @@ pub(crate) async fn batching_task(
};
// Try to get a new batch
if let Some((new_entries, new_batch, span)) = queue
if let Some((mut new_entries, new_batch, span)) = queue
.next_batch(min_size, max_size, prefill_token_budget, token_budget)
.await
{
@ -209,11 +209,26 @@ pub(crate) async fn batching_task(
};
counter.increment(1);
}
let cached_batch = if support_chunking {
// Concat current batch to the new one
batches.pop()
let new_cached_batch = if support_chunking {
// Get cached batch
let cached_batch = batches.pop();
// Extend entries with the new entries since the batch will be
// concatenated during the prefill op server side
entries.extend(new_entries);
// Generate one token for both the cached batch and the new batch
let new_cached_batch =
prefill(&mut client, new_batch, cached_batch, &mut entries)
.instrument(span)
.await;
if new_cached_batch.is_none() {
// New cached batch is empty, no work left
break;
}
new_cached_batch
} else {
// Request are waiting only if we don't support chunking
// Request are waiting because we cannot concatenate the batches if the
// model/server does not support chunking
entries.iter_mut().for_each(|(_, entry)| {
// Create a new span to add the info that this entry is waiting
// because a new batch is being computed
@ -224,23 +239,24 @@ pub(crate) async fn batching_task(
// Update entry
entry.temp_span = Some(entry_waiting_span);
});
None
};
entries.extend(new_entries);
// Generate one token for this new batch to have the attention past in cache
let new_cached_batch =
prefill(&mut client, new_batch, cached_batch, &mut entries)
.instrument(span)
.await;
// Generate one token for this new batch to have the attention past in cache
let new_cached_batch =
prefill(&mut client, new_batch, None, &mut new_entries)
.instrument(span)
.await;
if new_cached_batch.is_some() {
// Extend entries
entries.extend(new_entries);
}
new_cached_batch
};
// Reset waiting counter
waiting_tokens = 1;
// Extend current batch with the new batch
if let Some(new_cached_batch) = new_cached_batch {
batches.push(new_cached_batch);
} else if support_chunking {
// New cached batch is empty, no work left
break;
}
}