A small simplification and add a few more comments

This commit is contained in:
Nick Hill 2023-01-19 11:48:32 -08:00
parent d0ccada7c0
commit f31b8a7fed

View File

@ -29,6 +29,7 @@ pub(crate) struct Entry {
#[derive(Debug)] #[derive(Debug)]
pub(crate) struct Queue { pub(crate) struct Queue {
receiver: Receiver<Entry>, receiver: Receiver<Entry>,
// Staging buffer, filled until max_size is reached
buffer: VecDeque<Entry>, buffer: VecDeque<Entry>,
/// Id of the next entry /// Id of the next entry
next_id: u64, next_id: u64,
@ -44,22 +45,23 @@ impl Queue {
/// Get the next batch, blocking until available /// Get the next batch, blocking until available
/// Corresponding entries are added to the entries map /// Corresponding entries are added to the entries map
/// Returns None only if the queue has been closed
pub(crate) async fn next_batch( pub(crate) async fn next_batch(
&mut self, &mut self,
max_size: usize, max_size: usize,
entries: &mut IntMap<u64, Entry>, entries: &mut IntMap<u64, Entry>,
) -> Option<Batch> { ) -> Option<Batch> {
loop { if self.buffer.is_empty() {
if self.buffer.is_empty() { // Await on the queue while the buffer is empty
match self.receiver.recv().await { match self.receiver.recv().await {
Some(ent) => self.buffer.push_back(ent), Some(ent) => self.buffer.push_back(ent),
None => return None, // Queue closed, we must be shutting down
} None => return None,
}
if let Some(batch) = self.try_next_batch(1, max_size, entries) {
return Some(batch)
} }
} }
// We have at least one entry in the buffer, try to fill it further up to max_size
// This will always return Some
self.try_next_batch(1, max_size, entries)
} }
/// Get the next batch without blocking /// Get the next batch without blocking
@ -70,6 +72,7 @@ impl Queue {
max_size: usize, max_size: usize,
entries: &mut IntMap<u64, Entry>, entries: &mut IntMap<u64, Entry>,
) -> Option<Batch> { ) -> Option<Batch> {
// Fill the buffer up to max_size, without waiting
while self.buffer.len() < max_size { while self.buffer.len() < max_size {
match self.receiver.try_recv() { match self.receiver.try_recv() {
Ok(ent) => self.buffer.push_back(ent), Ok(ent) => self.buffer.push_back(ent),
@ -78,12 +81,14 @@ impl Queue {
} }
let len = self.buffer.len(); let len = self.buffer.len();
// Return None if we didn't reach the minimum requested
if len < min_size || len == 0 { if len < min_size || len == 0 {
// Can't get minimum
return None; return None;
} }
let now = Some(Instant::now()); let now = Some(Instant::now());
// Collect vec of Requests to return in batch,
// moving the entries from the buffer into the provided hashmap
let requests = self.buffer.drain(..min(len, max_size)) let requests = self.buffer.drain(..min(len, max_size))
.map(|mut entry| { .map(|mut entry| {
let id = self.next_id; let id = self.next_id;