entries were wrongly extended for models that did not support chunking

OlivierDehaene 2024-11-21 15:24:04 +01:00
parent 4cbba33139
commit 489675b5e5
No known key found for this signature in database
GPG Key ID: BB104D67809DA93C


@@ -193,7 +193,7 @@ pub(crate) async fn batching_task(
                };

                // Try to get a new batch
-                if let Some((new_entries, new_batch, span)) = queue
+                if let Some((mut new_entries, new_batch, span)) = queue
                    .next_batch(min_size, max_size, prefill_token_budget, token_budget)
                    .await
                {
@@ -209,11 +209,26 @@ pub(crate) async fn batching_task(
                        };
                        counter.increment(1);
                    }

-                    let cached_batch = if support_chunking {
-                        // Concat current batch to the new one
-                        batches.pop()
+                    let new_cached_batch = if support_chunking {
+                        // Get cached batch
+                        let cached_batch = batches.pop();
+                        // Extend entries with the new entries since the batch will be
+                        // concatenated during the prefill op server side
+                        entries.extend(new_entries);
+                        // Generate one token for both the cached batch and the new batch
+                        let new_cached_batch =
+                            prefill(&mut client, new_batch, cached_batch, &mut entries)
+                                .instrument(span)
+                                .await;
+                        if new_cached_batch.is_none() {
+                            // New cached batch is empty, no work left
+                            break;
+                        }
+                        new_cached_batch
                    } else {
-                        // Request are waiting only if we don't support chunking
+                        // Request are waiting because we cannot concatenate the batches if the
+                        // model/server does not support chunking
                        entries.iter_mut().for_each(|(_, entry)| {
                            // Create a new span to add the info that this entry is waiting
                            // because a new batch is being computed
@@ -224,23 +239,24 @@ pub(crate) async fn batching_task(
                            // Update entry
                            entry.temp_span = Some(entry_waiting_span);
                        });
-                        None
-                    };
-                    entries.extend(new_entries);
-
-                    // Generate one token for this new batch to have the attention past in cache
-                    let new_cached_batch =
-                        prefill(&mut client, new_batch, cached_batch, &mut entries)
-                            .instrument(span)
-                            .await;
+
+                        // Generate one token for this new batch to have the attention past in cache
+                        let new_cached_batch =
+                            prefill(&mut client, new_batch, None, &mut new_entries)
+                                .instrument(span)
+                                .await;
+                        if new_cached_batch.is_some() {
+                            // Extend entries
+                            entries.extend(new_entries);
+                        }
+                        new_cached_batch
+                    };
+
                    // Reset waiting counter
                    waiting_tokens = 1;
                    // Extend current batch with the new batch
                    if let Some(new_cached_batch) = new_cached_batch {
                        batches.push(new_cached_batch);
-                    } else if support_chunking {
-                        // New cached batch is empty, no work left
-                        break;
                    }
                }
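
The gist of the fix: on the non-chunking path, `entries` must only absorb `new_entries` after the prefill of the new batch succeeds; previously the extend ran unconditionally, so a failed prefill left orphaned entries behind. A minimal, self-contained sketch of that ordering follows; the `Entries` alias and the `prefill` stub are illustrative stand-ins, not the real TGI types or API.

    // Sketch: why extending `entries` before a failed prefill is wrong on the
    // non-chunking path. `Entries` and `prefill` are hypothetical stand-ins.
    use std::collections::HashMap;

    type Entries = HashMap<u64, &'static str>;

    // Stand-in for the server-side prefill call: None means no cached batch
    // came back (all requests ended or errored), Some(id) means success.
    fn prefill(succeeds: bool, batch_id: u64) -> Option<u64> {
        succeeds.then_some(batch_id)
    }

    fn main() {
        let mut entries: Entries = HashMap::from([(0, "in-flight request")]);
        let new_entries: Entries = HashMap::from([(1, "new request")]);

        // Old behaviour: `entries.extend(new_entries)` ran before prefill on
        // both paths, so a failed prefill on a non-chunking model left
        // `entries` holding requests the server never admitted.
        //
        // New behaviour (non-chunking path): prefill the new batch alone,
        // then merge the new entries only if a cached batch came back.
        let new_cached_batch = prefill(false, 1);
        if new_cached_batch.is_some() {
            entries.extend(new_entries);
        }
        assert_eq!(entries.len(), 1); // failed prefill leaves `entries` untouched
    }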
@@ -369,6 +385,11 @@ async fn filter_batch(
) -> Option<CachedBatch> {
    let mut batch = next_batch?;

+    // No need to filter
+    if batch.size as usize == entries.len() {
+        return Some(batch);
+    }
+
    let id = batch.id;

    // Retain only requests that are still in entries
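
The `filter_batch` change is an early-return fast path: when the cached batch is exactly as large as the entries map, no request finished during the step, so there is nothing to filter. A rough sketch of that shape, with hypothetical stand-in types (the real function goes on to filter server side via the client):

    // Hypothetical stand-ins to illustrate the early return; not the real types.
    struct CachedBatch {
        id: u64,
        size: u32,
    }

    fn filter_batch(next_batch: Option<CachedBatch>, entries_len: usize) -> Option<CachedBatch> {
        let batch = next_batch?;

        // No request finished this step: the batch and the entries map are
        // still in sync, so skip the filtering round-trip entirely.
        if batch.size as usize == entries_len {
            return Some(batch);
        }

        // ... otherwise retain live requests and ask the server to filter `batch.id` ...
        let _id = batch.id;
        Some(batch)
    }

    fn main() {
        let b = filter_batch(Some(CachedBatch { id: 7, size: 2 }), 2);
        assert!(b.is_some()); // sizes match -> early return, batch kept as-is
    }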