Fixing the updating logic of backends.

This commit is contained in:
Nicolas Patry 2025-04-10 11:04:03 +02:00
parent 18cb4a4221
commit 73d0876f12
No known key found for this signature in database
GPG Key ID: 87B37D879D09DEB4
2 changed files with 33 additions and 20 deletions

View File

@ -172,11 +172,13 @@ impl<T: LoadBalancer> OverloadHandler<T> {
} }
Msg::AddBackend(backend) => { Msg::AddBackend(backend) => {
match self.backends.binary_search(&backend) { match self.backends.binary_search(&backend) {
Ok(pos) => {} // element already in vector @ `pos` Ok(pos) => {
log::warn!("Backend {backend} already exists at pos {pos}");
} // element already in vector @ `pos`
Err(pos) => { Err(pos) => {
self.backends.insert(pos, new_elem) self.backends.insert(pos, backend);
self.inflight.insert(pos, AtomicUsize::new(0)) self.inflight.insert(pos, AtomicUsize::new(0));
self.inqueue.insert(pos, AtomicUsize::new(0)) self.inqueue.insert(pos, AtomicUsize::new(0));
} }
} }
} }
@ -190,19 +192,27 @@ impl<T: LoadBalancer> OverloadHandler<T> {
} }
Msg::SetBackends(mut new_backends) => { Msg::SetBackends(mut new_backends) => {
new_backends.sort(); new_backends.sort();
let (new_backends, new_inflight, new_inqueue): (Vec<_>, Vec<_>, Vec<_>) = self.new_backends.iter().enumerate().map(|(ni, nb)| { let (new_backends, (new_inflight, new_inqueue)): (
if let Some(i) = backends.iter().position(|b| b == nb){ Vec<String>,
let inflight = self.inflight[i]; (Vec<_>, Vec<_>),
let inqueue = self.inqueue[i]; ) = new_backends
(nb, inflight, inqueue) .into_iter()
.map(|nb| {
if let Some(i) = self.backends.iter().position(|b| *b == nb) {
let inflight =
AtomicUsize::new(self.inflight[i].load(Ordering::Relaxed));
let inqueue =
AtomicUsize::new(self.inqueue[i].load(Ordering::Relaxed));
(nb, (inflight, inqueue))
} else { } else {
(nb, AtomicUsize::new(0), AtomicUsize::new(0)) (nb, (AtomicUsize::new(0), AtomicUsize::new(0)))
} }
}).collect(); })
.unzip();
self.backends = new_backends; self.backends = new_backends;
self.inflight = inflight; self.inflight = new_inflight;
self.inqueue = inqueue; self.inqueue = new_inqueue;
} }
} }
} }

View File

@ -8,7 +8,7 @@ use kvrouter::{
}; };
#[tokio::main] #[tokio::main]
async fn main() { async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
// List of backend servers // List of backend servers
let backends = vec![ let backends = vec![
// "http://localhost:8000".to_string(), // "http://localhost:8000".to_string(),
@ -24,8 +24,10 @@ async fn main() {
let (sx, rx) = tokio::sync::mpsc::channel(100); let (sx, rx) = tokio::sync::mpsc::channel(100);
let communicator = Communicator::new(sx); let communicator = Communicator::new(sx);
let host = std::env::var("TGI_KVROUTER_HOST").unwrap_or("127.0.0.1"); let host = std::env::var("TGI_KVROUTER_HOST").unwrap_or("127.0.0.1".to_string());
let port : u16= std::env::var("TGI_KVROUTER_PORT").unwrap_or("3000").parse()?; let port: u16 = std::env::var("TGI_KVROUTER_PORT")
.unwrap_or("3000".to_string())
.parse()?;
tokio::task::spawn(async move { tokio::task::spawn(async move {
if std::env::var("TGI_KVROUTER_LB").unwrap_or("".to_string()) == *"roundrobin" { if std::env::var("TGI_KVROUTER_LB").unwrap_or("".to_string()) == *"roundrobin" {
println!("Using round robin"); println!("Using round robin");
@ -46,7 +48,8 @@ async fn main() {
.with_state(communicator); .with_state(communicator);
// run it // run it
let listener = tokio::net::TcpListener::bind((HOST, PORT)).await.unwrap(); let listener = tokio::net::TcpListener::bind((host, port)).await.unwrap();
println!("listening on {}", listener.local_addr().unwrap()); println!("listening on {}", listener.local_addr().unwrap());
axum::serve(listener, app).await.unwrap(); axum::serve(listener, app).await.unwrap();
Ok(())
} }