diff --git a/backends/v3/src/backend.rs b/backends/v3/src/backend.rs
index a5c0f512..7ae794a0 100644
--- a/backends/v3/src/backend.rs
+++ b/backends/v3/src/backend.rs
@@ -193,7 +193,7 @@ pub(crate) async fn batching_task(
                 };
 
                 // Try to get a new batch
-                if let Some((new_entries, new_batch, span)) = queue
+                if let Some((mut new_entries, new_batch, span)) = queue
                     .next_batch(min_size, max_size, prefill_token_budget, token_budget)
                     .await
                 {
@@ -209,11 +209,26 @@ pub(crate) async fn batching_task(
                         };
                         counter.increment(1);
                     }
-                    let cached_batch = if support_chunking {
-                        // Concat current batch to the new one
-                        batches.pop()
+
+                    let new_cached_batch = if support_chunking {
+                        // Get cached batch
+                        let cached_batch = batches.pop();
+                        // Extend entries with the new entries since the batch will be
+                        // concatenated during the prefill op server side
+                        entries.extend(new_entries);
+                        // Generate one token for both the cached batch and the new batch
+                        let new_cached_batch =
+                            prefill(&mut client, new_batch, cached_batch, &mut entries)
+                                .instrument(span)
+                                .await;
+                        if new_cached_batch.is_none() {
+                            // New cached batch is empty, no work left
+                            break;
+                        }
+                        new_cached_batch
                     } else {
-                        // Request are waiting only if we don't support chunking
+                        // Requests are waiting because we cannot concatenate the batches if the
+                        // model/server does not support chunking
                         entries.iter_mut().for_each(|(_, entry)| {
                             // Create a new span to add the info that this entry is waiting
                             // because a new batch is being computed
@@ -224,23 +239,24 @@ pub(crate) async fn batching_task(
                             // Update entry
                             entry.temp_span = Some(entry_waiting_span);
                         });
-                        None
-                    };
-                    entries.extend(new_entries);
-                    // Generate one token for this new batch to have the attention past in cache
-                    let new_cached_batch =
-                        prefill(&mut client, new_batch, cached_batch, &mut entries)
-                            .instrument(span)
-                            .await;
+                        // Generate one token for this new batch to have the attention past in cache
+                        let new_cached_batch =
+                            prefill(&mut client, new_batch, None, &mut new_entries)
+                                .instrument(span)
+                                .await;
+                        if new_cached_batch.is_some() {
+                            // Extend entries
+                            entries.extend(new_entries);
+                        }
+                        new_cached_batch
+                    };
+                    // Reset waiting counter
                     waiting_tokens = 1;
                     // Extend current batch with the new batch
                     if let Some(new_cached_batch) = new_cached_batch {
                         batches.push(new_cached_batch);
-                    } else if support_chunking {
-                        // New cached batch is empty, no work left
-                        break;
                     }
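
For readers skimming the diff: the patch moves the prefill call into both arms of the `support_chunking` branch, so that with chunking the cached batch is concatenated with the new batch in a single server-side prefill (and its entries are merged up front), while without chunking the new batch is prefilled on its own and its entries are merged only if that prefill succeeds. Below is a minimal, self-contained sketch of that control flow; `Batch`, `Entry`, `prefill`, and `absorb_new_batch` are hypothetical stand-ins (synchronous, with stub types), not the real TGI API, which is async and also carries tracing spans and metrics.

```rust
use std::collections::HashMap;

// Hypothetical stand-ins for the real protobuf batch and per-request entry.
struct Batch;
struct Entry;

// Stub for the prefill call: returns the surviving cached batch, if any.
fn prefill(
    _new_batch: Batch,
    _cached_batch: Option<Batch>,
    _entries: &mut HashMap<u64, Entry>,
) -> Option<Batch> {
    Some(Batch)
}

/// One "try to get a new batch" step; returns `false` where the real code
/// would `break` out of the batching loop.
fn absorb_new_batch(
    support_chunking: bool,
    batches: &mut Vec<Batch>,
    entries: &mut HashMap<u64, Entry>,
    mut new_entries: HashMap<u64, Entry>,
    new_batch: Batch,
) -> bool {
    let new_cached_batch = if support_chunking {
        // Chunking path: merge the entry maps up front, because the server
        // concatenates the cached batch with the new one inside one prefill.
        let cached_batch = batches.pop();
        entries.extend(new_entries);
        let out = prefill(new_batch, cached_batch, entries);
        if out.is_none() {
            // The cached batch was consumed and nothing survived prefill:
            // there is no work left at all.
            return false;
        }
        out
    } else {
        // No chunking: prefill the new batch alone, against its own entries,
        // and merge the entry maps only if the prefill succeeded.
        let out = prefill(new_batch, None, &mut new_entries);
        if out.is_some() {
            entries.extend(new_entries);
        }
        out
    };

    // Extend the current set of batches with whatever survived prefill.
    if let Some(batch) = new_cached_batch {
        batches.push(batch);
    }
    true
}

fn main() {
    let mut batches = vec![Batch];
    let mut entries: HashMap<u64, Entry> = HashMap::new();
    entries.insert(0, Entry);
    let keep_going = absorb_new_batch(true, &mut batches, &mut entries, HashMap::new(), Batch);
    assert!(keep_going);
    assert_eq!(batches.len(), 1);
}
```

The sketch keeps only the branch structure the diff rearranges; in particular it shows why `new_entries` must now be mutable (it is passed as `&mut` to the standalone prefill in the non-chunking arm) and why the old trailing `else if support_chunking { break }` becomes redundant once the chunking arm breaks immediately on an empty prefill result.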