From f31b8a7fedaed3cf5bbe92b6e8289d95b7b3415a Mon Sep 17 00:00:00 2001 From: Nick Hill Date: Thu, 19 Jan 2023 11:48:32 -0800 Subject: [PATCH] A small simplification and add a few more comments --- router/src/queue.rs | 25 +++++++++++++++---------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/router/src/queue.rs b/router/src/queue.rs index 0e323799..c6e520b7 100644 --- a/router/src/queue.rs +++ b/router/src/queue.rs @@ -29,6 +29,7 @@ pub(crate) struct Entry { #[derive(Debug)] pub(crate) struct Queue { receiver: Receiver, + // Staging buffer, filled until max_size is reached buffer: VecDeque, /// Id of the next entry next_id: u64, @@ -44,22 +45,23 @@ impl Queue { /// Get the next batch, blocking until available /// Corresponding entries are added to the entries map + /// Returns None only if the queue has been closed pub(crate) async fn next_batch( &mut self, max_size: usize, entries: &mut IntMap, ) -> Option { - loop { - if self.buffer.is_empty() { - match self.receiver.recv().await { - Some(ent) => self.buffer.push_back(ent), - None => return None, - } - } - if let Some(batch) = self.try_next_batch(1, max_size, entries) { - return Some(batch) + if self.buffer.is_empty() { + // Await on the queue while the buffer is empty + match self.receiver.recv().await { + Some(ent) => self.buffer.push_back(ent), + // Queue closed, we must be shutting down + None => return None, } } + // We have at least one entry in the buffer, try to fill it further up to max_size + // This will always return Some + self.try_next_batch(1, max_size, entries) } /// Get the next batch without blocking @@ -70,6 +72,7 @@ impl Queue { max_size: usize, entries: &mut IntMap, ) -> Option { + // Fill the buffer up to max_size, without waiting while self.buffer.len() < max_size { match self.receiver.try_recv() { Ok(ent) => self.buffer.push_back(ent), @@ -78,12 +81,14 @@ impl Queue { } let len = self.buffer.len(); + // Return None if we didn't reach the minimum requested if len < min_size || len == 0 { - // Can't get minimum return None; } let now = Some(Instant::now()); + // Collect vec of Requests to return in batch, + // moving the entries from the buffer into the provided hashmap let requests = self.buffer.drain(..min(len, max_size)) .map(|mut entry| { let id = self.next_id;