From 083806aa427d935ab1a594ee45adff8e2fe5d02f Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Danie=CC=88l=20de=20Kok?=
Date: Mon, 15 Jul 2024 13:44:22 +0000
Subject: [PATCH] Traitify the current allocator in preparation for swappable alloc

---
 router/src/infer/v3/block_allocator.rs | 115 ++++++++++++++++---------
 1 file changed, 72 insertions(+), 43 deletions(-)

diff --git a/router/src/infer/v3/block_allocator.rs b/router/src/infer/v3/block_allocator.rs
index f6e45849..a135d58c 100644
--- a/router/src/infer/v3/block_allocator.rs
+++ b/router/src/infer/v3/block_allocator.rs
@@ -1,8 +1,6 @@
-use itertools::Itertools;
 use std::{
-    borrow::BorrowMut,
-    cmp::{min, Reverse},
-    collections::{hash_map::Entry, BTreeMap, BTreeSet, BinaryHeap, HashMap, HashSet},
+    cmp::min,
+    collections::{hash_map::Entry, BTreeSet, HashMap},
     hash::{DefaultHasher, Hash, Hasher},
 };
 use tokio::sync::{mpsc, oneshot};
@@ -80,56 +78,87 @@ async fn block_allocator_task(
     window_size: Option<u32>,
     mut receiver: mpsc::UnboundedReceiver<BlockAllocatorCommand>,
 ) {
-    // Block 0 is reserved for health checks
-    let mut free_blocks: Vec<u32> = (1..blocks).collect();
+    let mut allocator = SimpleAllocator::new(blocks, block_size, window_size);
     while let Some(cmd) = receiver.recv().await {
         match cmd {
-            BlockAllocatorCommand::Free { blocks } => free_blocks.extend(blocks),
+            BlockAllocatorCommand::Free { blocks } => allocator.free(blocks),
             BlockAllocatorCommand::Allocate {
                 tokens,
                 response_sender,
             } => {
-                // Apply window size
-                let (required_blocks, repeats) = {
-                    let (tokens, repeats) = match window_size {
-                        None => (tokens, 1),
-                        Some(window_size) => {
-                            let repeats = (tokens + window_size - 1) / window_size;
-                            let tokens = min(tokens, window_size);
-                            (tokens, repeats as usize)
-                        }
-                    };
-                    // Pad to a multiple of block size
-                    let required_blocks = (tokens + block_size - 1) / block_size;
-                    (required_blocks, repeats)
-                };
-
-                let tokens = tokens as usize;
-                let allocation = if required_blocks > free_blocks.len() as u32 {
-                    None
-                } else {
-                    let blocks =
-                        free_blocks.split_off(free_blocks.len() - required_blocks as usize);
-                    let mut slots = Vec::with_capacity(
-                        (required_blocks * block_size * repeats as u32) as usize,
-                    );
-
-                    'slots: for block_id in blocks.repeat(repeats).iter() {
-                        for s in (block_id * block_size)..((block_id + 1) * block_size) {
-                            slots.push(s);
-                            if slots.len() == tokens {
-                                break 'slots;
-                            }
-                        }
-                    }
-                    Some((blocks, slots))
-                };
-                response_sender.send(allocation).unwrap();
+                response_sender.send(allocator.allocate(tokens)).unwrap();
             }
         }
     }
 }
 
+pub trait Allocator {
+    fn allocate(&mut self, tokens: u32) -> Option<(Vec<u32>, Vec<u32>)>;
+
+    fn free(&mut self, blocks: Vec<u32>);
+}
+
+pub struct SimpleAllocator {
+    free_blocks: Vec<u32>,
+    block_size: u32,
+    window_size: Option<u32>,
+}
+
+impl SimpleAllocator {
+    fn new(blocks: u32, block_size: u32, window_size: Option<u32>) -> Self {
+        SimpleAllocator {
+            block_size,
+            // Block 0 is reserved for health checks
+            free_blocks: (1..blocks).collect(),
+            window_size,
+        }
+    }
+}
+
+impl Allocator for SimpleAllocator {
+    fn allocate(&mut self, tokens: u32) -> Option<(Vec<u32>, Vec<u32>)> {
+        // Apply window size
+        let (required_blocks, repeats) = {
+            let (tokens, repeats) = match self.window_size {
+                None => (tokens, 1),
+                Some(window_size) => {
+                    let repeats = (tokens + window_size - 1) / window_size;
+                    let tokens = min(tokens, window_size);
+                    (tokens, repeats as usize)
+                }
+            };
+            // Pad to a multiple of block size
+            let required_blocks = (tokens + self.block_size - 1) / self.block_size;
+            (required_blocks, repeats)
+        };
+
+        let tokens = tokens as usize;
+        if required_blocks > self.free_blocks.len() as u32 {
+            None
+        } else {
+            let blocks = self
+                .free_blocks
+                .split_off(self.free_blocks.len() - required_blocks as usize);
+            let mut slots =
+                Vec::with_capacity((required_blocks * self.block_size * repeats as u32) as usize);
+
+            'slots: for block_id in blocks.repeat(repeats).iter() {
+                for s in (block_id * self.block_size)..((block_id + 1) * self.block_size) {
+                    slots.push(s);
+                    if slots.len() == tokens {
+                        break 'slots;
+                    }
+                }
+            }
+            Some((blocks, slots))
+        }
+    }
+
+    fn free(&mut self, blocks: Vec<u32>) {
+        self.free_blocks.extend(blocks)
+    }
+}
+
 #[derive(Debug)]
 enum BlockAllocatorCommand {
     Free {