This commit is contained in:
Nicolas Patry 2025-01-31 14:43:04 +01:00 committed by GitHub
commit 0d3cb2baa5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 846 additions and 49 deletions

255
Cargo.lock generated
View File

@ -24,11 +24,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e89da841a80418a9b391ebaea17f5c112ffaaa96f621d2c285b5174da76b9011" checksum = "e89da841a80418a9b391ebaea17f5c112ffaaa96f621d2c285b5174da76b9011"
dependencies = [ dependencies = [
"cfg-if", "cfg-if",
"getrandom", "getrandom 0.2.15",
"once_cell", "once_cell",
"serde", "serde",
"version_check", "version_check",
"zerocopy", "zerocopy 0.7.35",
] ]
[[package]] [[package]]
@ -291,7 +291,7 @@ dependencies = [
"http-body 0.4.6", "http-body 0.4.6",
"hyper 0.14.31", "hyper 0.14.31",
"itoa", "itoa",
"matchit", "matchit 0.7.3",
"memchr", "memchr",
"mime", "mime",
"percent-encoding", "percent-encoding",
@ -321,10 +321,10 @@ dependencies = [
"http 1.1.0", "http 1.1.0",
"http-body 1.0.1", "http-body 1.0.1",
"http-body-util", "http-body-util",
"hyper 1.5.1", "hyper 1.5.2",
"hyper-util", "hyper-util",
"itoa", "itoa",
"matchit", "matchit 0.7.3",
"memchr", "memchr",
"mime", "mime",
"percent-encoding", "percent-encoding",
@ -336,7 +336,42 @@ dependencies = [
"serde_urlencoded", "serde_urlencoded",
"sync_wrapper 1.0.2", "sync_wrapper 1.0.2",
"tokio", "tokio",
"tower 0.5.1", "tower 0.5.2",
"tower-layer",
"tower-service",
"tracing",
]
[[package]]
name = "axum"
version = "0.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6d6fd624c75e18b3b4c6b9caf42b1afe24437daaee904069137d8bab077be8b8"
dependencies = [
"axum-core 0.5.0",
"axum-macros",
"bytes",
"form_urlencoded",
"futures-util",
"http 1.1.0",
"http-body 1.0.1",
"http-body-util",
"hyper 1.5.2",
"hyper-util",
"itoa",
"matchit 0.8.4",
"memchr",
"mime",
"percent-encoding",
"pin-project-lite",
"rustversion",
"serde",
"serde_json",
"serde_path_to_error",
"serde_urlencoded",
"sync_wrapper 1.0.2",
"tokio",
"tower 0.5.2",
"tower-layer", "tower-layer",
"tower-service", "tower-service",
"tracing", "tracing",
@ -380,6 +415,37 @@ dependencies = [
"tracing", "tracing",
] ]
[[package]]
name = "axum-core"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "df1362f362fd16024ae199c1970ce98f9661bf5ef94b9808fee734bc3698b733"
dependencies = [
"bytes",
"futures-util",
"http 1.1.0",
"http-body 1.0.1",
"http-body-util",
"mime",
"pin-project-lite",
"rustversion",
"sync_wrapper 1.0.2",
"tower-layer",
"tower-service",
"tracing",
]
[[package]]
name = "axum-macros"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "604fde5e028fea851ce1d8570bbdc034bec850d157f7569d10f347d06808c05c"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.89",
]
[[package]] [[package]]
name = "axum-tracing-opentelemetry" name = "axum-tracing-opentelemetry"
version = "0.16.0" version = "0.16.0"
@ -1442,7 +1508,19 @@ checksum = "c4567c8db10ae91089c99af84c68c38da3ec2f087c3f82960bcdbf3656b6f4d7"
dependencies = [ dependencies = [
"cfg-if", "cfg-if",
"libc", "libc",
"wasi", "wasi 0.11.0+wasi-snapshot-preview1",
]
[[package]]
name = "getrandom"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "43a49c392881ce6d5c3b8cb70f98717b7c07aabbdff06687b9030dbfbe2725f8"
dependencies = [
"cfg-if",
"libc",
"wasi 0.13.3+wasi-0.2.2",
"windows-targets 0.52.6",
] ]
[[package]] [[package]]
@ -1596,7 +1674,7 @@ dependencies = [
"log", "log",
"native-tls", "native-tls",
"num_cpus", "num_cpus",
"rand", "rand 0.8.5",
"reqwest 0.11.27", "reqwest 0.11.27",
"serde", "serde",
"serde_json", "serde_json",
@ -1719,9 +1797,9 @@ dependencies = [
[[package]] [[package]]
name = "hyper" name = "hyper"
version = "1.5.1" version = "1.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "97818827ef4f364230e16705d4706e2897df2bb60617d6ca15d598025a3c481f" checksum = "256fb8d4bd6413123cc9d91832d78325c48ff41677595be797d90f42969beae0"
dependencies = [ dependencies = [
"bytes", "bytes",
"futures-channel", "futures-channel",
@ -1746,7 +1824,7 @@ checksum = "08afdbb5c31130e3034af566421053ab03787c640246a446327f550d11bcb333"
dependencies = [ dependencies = [
"futures-util", "futures-util",
"http 1.1.0", "http 1.1.0",
"hyper 1.5.1", "hyper 1.5.2",
"hyper-util", "hyper-util",
"log", "log",
"rustls 0.23.17", "rustls 0.23.17",
@ -1793,7 +1871,7 @@ dependencies = [
"futures-util", "futures-util",
"http 1.1.0", "http 1.1.0",
"http-body 1.0.1", "http-body 1.0.1",
"hyper 1.5.1", "hyper 1.5.2",
"pin-project-lite", "pin-project-lite",
"socket2", "socket2",
"tokio", "tokio",
@ -2166,6 +2244,22 @@ dependencies = [
"uuid-simd", "uuid-simd",
] ]
[[package]]
name = "kvrouter"
version = "3.0.2-dev0"
dependencies = [
"async-stream",
"axum 0.8.1",
"futures",
"futures-util",
"hyper 1.5.2",
"hyper-util",
"log",
"rand 0.9.0",
"slotmap",
"tokio",
]
[[package]] [[package]]
name = "lazy_static" name = "lazy_static"
version = "1.5.0" version = "1.5.0"
@ -2259,9 +2353,9 @@ dependencies = [
[[package]] [[package]]
name = "log" name = "log"
version = "0.4.22" version = "0.4.25"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24" checksum = "04cbf5b083de1c7e0222a7a51dbfdba1cbe1c6ab0b15e29fff3f6c077fd9cd9f"
[[package]] [[package]]
name = "loop9" name = "loop9"
@ -2318,6 +2412,12 @@ version = "0.7.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94"
[[package]]
name = "matchit"
version = "0.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "47e1ffaa40ddd1f3ed91f717a33c8c0ee23fff369e3aa8772b9605cc1d22f4c3"
[[package]] [[package]]
name = "maybe-rayon" name = "maybe-rayon"
version = "0.1.1" version = "0.1.1"
@ -2361,7 +2461,7 @@ checksum = "b4f0c8427b39666bf970460908b213ec09b3b350f20c0c2eabcbba51704a08e6"
dependencies = [ dependencies = [
"base64 0.22.1", "base64 0.22.1",
"http-body-util", "http-body-util",
"hyper 1.5.1", "hyper 1.5.2",
"hyper-rustls", "hyper-rustls",
"hyper-util", "hyper-util",
"indexmap 2.6.0", "indexmap 2.6.0",
@ -2450,7 +2550,7 @@ dependencies = [
"hermit-abi 0.3.9", "hermit-abi 0.3.9",
"libc", "libc",
"log", "log",
"wasi", "wasi 0.11.0+wasi-snapshot-preview1",
"windows-sys 0.52.0", "windows-sys 0.52.0",
] ]
@ -2499,7 +2599,7 @@ dependencies = [
"bytes", "bytes",
"futures", "futures",
"pin-project", "pin-project",
"rand", "rand 0.8.5",
"thiserror", "thiserror",
"tokio", "tokio",
"tokio-util", "tokio-util",
@ -2931,7 +3031,7 @@ dependencies = [
"opentelemetry_api", "opentelemetry_api",
"ordered-float 3.9.2", "ordered-float 3.9.2",
"percent-encoding", "percent-encoding",
"rand", "rand 0.8.5",
"regex", "regex",
"serde_json", "serde_json",
"thiserror", "thiserror",
@ -2955,7 +3055,7 @@ dependencies = [
"opentelemetry 0.21.0", "opentelemetry 0.21.0",
"ordered-float 4.5.0", "ordered-float 4.5.0",
"percent-encoding", "percent-encoding",
"rand", "rand 0.8.5",
"thiserror", "thiserror",
] ]
@ -3159,7 +3259,7 @@ version = "0.2.20"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77957b295656769bb8ad2b6a6b09d897d94f05c41b069aede1fcdaa675eaea04" checksum = "77957b295656769bb8ad2b6a6b09d897d94f05c41b069aede1fcdaa675eaea04"
dependencies = [ dependencies = [
"zerocopy", "zerocopy 0.7.35",
] ]
[[package]] [[package]]
@ -3392,7 +3492,7 @@ dependencies = [
"libc", "libc",
"once_cell", "once_cell",
"raw-cpuid", "raw-cpuid",
"wasi", "wasi 0.11.0+wasi-snapshot-preview1",
"web-sys", "web-sys",
"winapi", "winapi",
] ]
@ -3419,8 +3519,19 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404"
dependencies = [ dependencies = [
"libc", "libc",
"rand_chacha", "rand_chacha 0.3.1",
"rand_core", "rand_core 0.6.4",
]
[[package]]
name = "rand"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3779b94aeb87e8bd4e834cee3650289ee9e0d5677f976ecdb6d219e5f4f6cd94"
dependencies = [
"rand_chacha 0.9.0",
"rand_core 0.9.0",
"zerocopy 0.8.14",
] ]
[[package]] [[package]]
@ -3430,7 +3541,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88"
dependencies = [ dependencies = [
"ppv-lite86", "ppv-lite86",
"rand_core", "rand_core 0.6.4",
]
[[package]]
name = "rand_chacha"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d3022b5f1df60f26e1ffddd6c66e8aa15de382ae63b3a0c1bfc0e4d3e3f325cb"
dependencies = [
"ppv-lite86",
"rand_core 0.9.0",
] ]
[[package]] [[package]]
@ -3439,7 +3560,17 @@ version = "0.6.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c"
dependencies = [ dependencies = [
"getrandom", "getrandom 0.2.15",
]
[[package]]
name = "rand_core"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b08f3c9802962f7e1b25113931d94f43ed9725bebc59db9d0c3e9a23b67e15ff"
dependencies = [
"getrandom 0.3.1",
"zerocopy 0.8.14",
] ]
[[package]] [[package]]
@ -3489,8 +3620,8 @@ dependencies = [
"once_cell", "once_cell",
"paste", "paste",
"profiling", "profiling",
"rand", "rand 0.8.5",
"rand_chacha", "rand_chacha 0.3.1",
"simd_helpers", "simd_helpers",
"system-deps", "system-deps",
"thiserror", "thiserror",
@ -3568,7 +3699,7 @@ version = "0.4.6"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ba009ff324d1fc1b900bd1fdb31564febe58a8ccc8a6fdbb93b543d33b13ca43" checksum = "ba009ff324d1fc1b900bd1fdb31564febe58a8ccc8a6fdbb93b543d33b13ca43"
dependencies = [ dependencies = [
"getrandom", "getrandom 0.2.15",
"libredox", "libredox",
"thiserror", "thiserror",
] ]
@ -3704,7 +3835,7 @@ dependencies = [
"http 1.1.0", "http 1.1.0",
"http-body 1.0.1", "http-body 1.0.1",
"http-body-util", "http-body-util",
"hyper 1.5.1", "hyper 1.5.2",
"hyper-util", "hyper-util",
"ipnet", "ipnet",
"js-sys", "js-sys",
@ -3755,7 +3886,7 @@ checksum = "c17fa4cb658e3583423e915b9f3acc01cceaee1860e33d59ebae66adc3a2dc0d"
dependencies = [ dependencies = [
"cc", "cc",
"cfg-if", "cfg-if",
"getrandom", "getrandom 0.2.15",
"libc", "libc",
"spin 0.9.8", "spin 0.9.8",
"untrusted 0.9.0", "untrusted 0.9.0",
@ -4531,7 +4662,7 @@ dependencies = [
"opentelemetry-otlp", "opentelemetry-otlp",
"outlines-core", "outlines-core",
"pyo3", "pyo3",
"rand", "rand 0.8.5",
"regex", "regex",
"reqwest 0.11.27", "reqwest 0.11.27",
"serde", "serde",
@ -4579,7 +4710,7 @@ dependencies = [
"opentelemetry-otlp", "opentelemetry-otlp",
"prost 0.12.6", "prost 0.12.6",
"prost-build", "prost-build",
"rand", "rand 0.8.5",
"regex", "regex",
"reqwest 0.11.27", "reqwest 0.11.27",
"serde", "serde",
@ -4630,7 +4761,7 @@ dependencies = [
"opentelemetry-otlp", "opentelemetry-otlp",
"prost 0.12.6", "prost 0.12.6",
"prost-build", "prost-build",
"rand", "rand 0.8.5",
"regex", "regex",
"reqwest 0.11.27", "reqwest 0.11.27",
"serde", "serde",
@ -4764,7 +4895,7 @@ dependencies = [
"aho-corasick", "aho-corasick",
"derive_builder", "derive_builder",
"esaxx-rs", "esaxx-rs",
"getrandom", "getrandom 0.2.15",
"hf-hub", "hf-hub",
"indicatif", "indicatif",
"itertools 0.12.1", "itertools 0.12.1",
@ -4774,7 +4905,7 @@ dependencies = [
"monostate", "monostate",
"onig", "onig",
"paste", "paste",
"rand", "rand 0.8.5",
"rayon", "rayon",
"rayon-cond", "rayon-cond",
"regex", "regex",
@ -4844,7 +4975,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f57eb36ecbe0fc510036adff84824dd3c24bb781e21bfa67b69d556aa85214f" checksum = "7f57eb36ecbe0fc510036adff84824dd3c24bb781e21bfa67b69d556aa85214f"
dependencies = [ dependencies = [
"pin-project", "pin-project",
"rand", "rand 0.8.5",
"tokio", "tokio",
] ]
@ -4997,7 +5128,7 @@ dependencies = [
"indexmap 1.9.3", "indexmap 1.9.3",
"pin-project", "pin-project",
"pin-project-lite", "pin-project-lite",
"rand", "rand 0.8.5",
"slab", "slab",
"tokio", "tokio",
"tokio-util", "tokio-util",
@ -5008,14 +5139,14 @@ dependencies = [
[[package]] [[package]]
name = "tower" name = "tower"
version = "0.5.1" version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2873938d487c3cfb9aed7546dc9f2711d867c9f90c46b889989a2cb84eba6b4f" checksum = "d039ad9159c98b70ecfd540b2573b97f7f52c3e8d9f8ad57a24b916a536975f9"
dependencies = [ dependencies = [
"futures-core", "futures-core",
"futures-util", "futures-util",
"pin-project-lite", "pin-project-lite",
"sync_wrapper 0.1.2", "sync_wrapper 1.0.2",
"tokio", "tokio",
"tower-layer", "tower-layer",
"tower-service", "tower-service",
@ -5370,8 +5501,8 @@ version = "1.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f8c5f0a0af699448548ad1a2fbf920fb4bee257eae39953ba95cb84891a0446a" checksum = "f8c5f0a0af699448548ad1a2fbf920fb4bee257eae39953ba95cb84891a0446a"
dependencies = [ dependencies = [
"getrandom", "getrandom 0.2.15",
"rand", "rand 0.8.5",
"uuid-macro-internal", "uuid-macro-internal",
] ]
@ -5479,6 +5610,15 @@ version = "0.11.0+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423"
[[package]]
name = "wasi"
version = "0.13.3+wasi-0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "26816d2e1a4a36a2940b96c5296ce403917633dff8f3440e9b236ed6f6bacad2"
dependencies = [
"wit-bindgen-rt",
]
[[package]] [[package]]
name = "wasm-bindgen" name = "wasm-bindgen"
version = "0.2.95" version = "0.2.95"
@ -5926,6 +6066,15 @@ dependencies = [
"windows-sys 0.48.0", "windows-sys 0.48.0",
] ]
[[package]]
name = "wit-bindgen-rt"
version = "0.33.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3268f3d866458b787f390cf61f4bbb563b922d091359f9608842999eaee3943c"
dependencies = [
"bitflags 2.6.0",
]
[[package]] [[package]]
name = "write16" name = "write16"
version = "1.0.0" version = "1.0.0"
@ -5975,7 +6124,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1b9b4fd18abc82b8136838da5d50bae7bdea537c574d8dc1a34ed098d6c166f0" checksum = "1b9b4fd18abc82b8136838da5d50bae7bdea537c574d8dc1a34ed098d6c166f0"
dependencies = [ dependencies = [
"byteorder", "byteorder",
"zerocopy-derive", "zerocopy-derive 0.7.35",
]
[[package]]
name = "zerocopy"
version = "0.8.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a367f292d93d4eab890745e75a778da40909cab4d6ff8173693812f79c4a2468"
dependencies = [
"zerocopy-derive 0.8.14",
] ]
[[package]] [[package]]
@ -5989,6 +6147,17 @@ dependencies = [
"syn 2.0.89", "syn 2.0.89",
] ]
[[package]]
name = "zerocopy-derive"
version = "0.8.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d3931cb58c62c13adec22e38686b559c86a30565e16ad6e8510a337cedc611e1"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.89",
]
[[package]] [[package]]
name = "zerofrom" name = "zerofrom"
version = "0.1.4" version = "0.1.4"

View File

@ -6,7 +6,8 @@ members = [
"backends/grpc-metadata", "backends/grpc-metadata",
"backends/trtllm", "backends/trtllm",
"launcher", "launcher",
"router" "router",
"kvrouter",
] ]
default-members = [ default-members = [
"benchmark", "benchmark",
@ -15,6 +16,7 @@ default-members = [
"backends/grpc-metadata", "backends/grpc-metadata",
# "backends/trtllm", # "backends/trtllm",
"launcher", "launcher",
"kvrouter",
"router" "router"
] ]
resolver = "2" resolver = "2"
@ -34,6 +36,7 @@ metrics-exporter-prometheus = { version = "0.15.1", features = [] }
minijinja = { version = "2.2.0", features = ["json"] } minijinja = { version = "2.2.0", features = ["json"] }
minijinja-contrib = { version = "2.0.2", features = ["pycompat"] } minijinja-contrib = { version = "2.0.2", features = ["pycompat"] }
pyo3 = { version = "0.22.2", features = ["auto-initialize"] } pyo3 = { version = "0.22.2", features = ["auto-initialize"] }
axum = { version = "0.7", features = ["json"] }
[profile.release] [profile.release]
incremental = true incremental = true

View File

@ -10,6 +10,7 @@ COPY Cargo.toml Cargo.toml
COPY rust-toolchain.toml rust-toolchain.toml COPY rust-toolchain.toml rust-toolchain.toml
COPY proto proto COPY proto proto
COPY benchmark benchmark COPY benchmark benchmark
COPY kvrouter kvrouter
COPY router router COPY router router
COPY backends backends COPY backends backends
COPY launcher launcher COPY launcher launcher

View File

@ -11,6 +11,7 @@ COPY rust-toolchain.toml rust-toolchain.toml
COPY proto proto COPY proto proto
COPY benchmark benchmark COPY benchmark benchmark
COPY router router COPY router router
COPY kvrouter kvrouter
COPY backends backends COPY backends backends
COPY launcher launcher COPY launcher launcher
RUN cargo chef prepare --recipe-path recipe.json RUN cargo chef prepare --recipe-path recipe.json

View File

@ -12,6 +12,7 @@ COPY rust-toolchain.toml rust-toolchain.toml
COPY proto proto COPY proto proto
COPY benchmark benchmark COPY benchmark benchmark
COPY router router COPY router router
COPY kvrouter kvrouter
COPY backends backends COPY backends backends
COPY launcher launcher COPY launcher launcher
RUN cargo chef prepare --recipe-path recipe.json RUN cargo chef prepare --recipe-path recipe.json

View File

@ -92,6 +92,7 @@ COPY rust-toolchain.toml rust-toolchain.toml
COPY router router COPY router router
COPY backends backends COPY backends backends
COPY benchmark benchmark COPY benchmark benchmark
COPY kvrouter kvrouter
COPY launcher launcher COPY launcher launcher
COPY --from=trt-builder /usr/local/tensorrt /usr/local/tensorrt COPY --from=trt-builder /usr/local/tensorrt /usr/local/tensorrt
COPY --from=mpi-builder /usr/local/mpi /usr/local/mpi COPY --from=mpi-builder /usr/local/mpi /usr/local/mpi

View File

@ -60,10 +60,15 @@ impl RadixAllocator {
"Free blocks {} need {n_blocks_needed}", "Free blocks {} need {n_blocks_needed}",
self.free_blocks.len() self.free_blocks.len()
); );
self.free_blocks.extend( let free_blocks = self
self.cache_blocks .cache_blocks
.evict(n_blocks_needed - self.free_blocks.len()), .evict(n_blocks_needed - self.free_blocks.len());
tracing::debug!(
"Freed {} blocks: Now having {} free blocks",
free_blocks.len(),
free_blocks.len() + self.free_blocks.len()
); );
self.free_blocks.extend(free_blocks);
} }
if self.free_blocks.len() >= n_blocks_needed { if self.free_blocks.len() >= n_blocks_needed {
@ -105,7 +110,15 @@ impl Allocator for RadixAllocator {
let suffix_blocks = suffix_len.div_ceil(self.block_size); let suffix_blocks = suffix_len.div_ceil(self.block_size);
let prefix_len_uncached = prefill_tokens.as_ref().map(|p| p.len()).unwrap_or_default();
tracing::info!("Prefix {prefix_len} - Suffix {suffix_len}"); tracing::info!("Prefix {prefix_len} - Suffix {suffix_len}");
metrics::counter!("tgi_cache_hit", "allocator" => "radix")
.increment(prefix_len.try_into().expect("Can convert usize to u64"));
metrics::counter!("tgi_cache_total", "allocator" => "radix").increment(
prefix_len_uncached
.try_into()
.expect("Can convert usize to u64"),
);
match self.alloc_or_reclaim(suffix_blocks as usize) { match self.alloc_or_reclaim(suffix_blocks as usize) {
Some(suffix_blocks) => blocks.extend(suffix_blocks), Some(suffix_blocks) => blocks.extend(suffix_blocks),

18
kvrouter/Cargo.toml Normal file
View File

@ -0,0 +1,18 @@
[package]
name = "kvrouter"
version.workspace = true
edition.workspace = true
authors.workspace = true
homepage.workspace = true
[dependencies]
async-stream = "0.3.6"
axum = { version = "0.8.1", features = ["macros"] }
futures = "0.3.31"
futures-util = "0.3.31"
hyper = { version = "1.5.2", features = ["full"] }
hyper-util = { version = "0.1.10", features = ["full"] }
log = "0.4.25"
rand = "0.9.0"
slotmap = "1.0.7"
tokio = { version = "1.43.0", features = ["macros", "rt-multi-thread"] }

286
kvrouter/src/lib.rs Normal file
View File

@ -0,0 +1,286 @@
use axum::{
body::Body,
extract::{Request, State},
http::uri::Uri,
response::{IntoResponse, Response},
};
use futures_util::stream::StreamExt;
use hyper::StatusCode;
use hyper_util::{client::legacy::connect::HttpConnector, rt::TokioExecutor};
use rand::{rng, Rng};
use std::sync::atomic::{AtomicUsize, Ordering};
use tokio::sync::{mpsc, oneshot};
mod trie;
use crate::trie::Trie;
const FACTOR_KEY: &str = "TGI_KVROUTER_FACTOR";
type Client = hyper_util::client::legacy::Client<HttpConnector, Body>;
pub struct ContentAware {
trie: Trie,
}
impl Default for ContentAware {
fn default() -> Self {
Self::new()
}
}
impl ContentAware {
pub fn new() -> Self {
let trie = Trie::new();
Self { trie }
}
}
impl LoadBalancer for ContentAware {
fn next(&mut self, key: &[u8], n_backends: usize) -> usize {
let trie = &mut self.trie;
let (start, stop) = trie.insert(key);
let n = trie.count();
eprintln!(
"Start {start} Stop {stop} N {n} : Key {}",
String::from_utf8_lossy(key)
);
let mut rng = rng();
let x: f32 = rng.random();
println!("Random number is {x:.2}");
let start = (start as f32) / (n as f32);
let stop = (stop as f32) / (n as f32);
let rescaled_x = x * (stop - start) + start;
assert!(rescaled_x >= start);
assert!(rescaled_x <= stop);
assert!(rescaled_x >= 0.0);
assert!(rescaled_x <= 1.0);
println!("Start {start:.2} stop {stop:.2}: rescaled {rescaled_x:.2}");
let n: usize = (rescaled_x * (n_backends as f32)) as usize;
n
}
}
pub struct RoundRobin {
current: AtomicUsize,
}
impl Default for RoundRobin {
fn default() -> Self {
Self::new()
}
}
impl RoundRobin {
pub fn new() -> Self {
let current = AtomicUsize::new(0);
Self { current }
}
}
impl LoadBalancer for RoundRobin {
fn next(&mut self, _key: &[u8], _n_backends: usize) -> usize {
self.current.fetch_add(1, Ordering::Relaxed)
}
}
pub struct OverloadHandler<T: LoadBalancer> {
load_balancer: T,
backends: Vec<String>,
inqueue: Vec<AtomicUsize>,
inflight: Vec<AtomicUsize>,
factor: f32,
rx: Rcv,
}
impl<T: LoadBalancer> OverloadHandler<T> {
pub fn new(load_balancer: T, backends: Vec<String>, rx: Rcv) -> Self {
let inflight = backends.iter().map(|_| AtomicUsize::new(0)).collect();
let inqueue = backends.iter().map(|_| AtomicUsize::new(0)).collect();
let factor: f32 = std::env::var(FACTOR_KEY)
.unwrap_or("1.5".to_string())
.parse()
.unwrap_or(1.5);
Self {
load_balancer,
backends,
factor,
inflight,
inqueue,
rx,
}
}
fn next(&mut self, key: &[u8]) -> String {
// Get the backend URL
let index = self.load_balancer.next(key, self.backends.len());
let n = self.backends.len();
let mut index = index % n;
let mut inflight = self.inflight[index].load(Ordering::Relaxed);
let mut inqueue = self.inqueue[index].load(Ordering::Relaxed);
for i in 0..n {
if (inqueue as f32) <= self.factor * inflight as f32 {
break;
}
if i == 0 {
eprintln!(
"Backend overloaded (queue: {inqueue} inflight {inflight}), jumping ahead"
);
}
index += 1;
index %= self.backends.len();
inflight = self.inflight[index].load(Ordering::Relaxed);
inqueue = self.inflight[index].load(Ordering::Relaxed);
}
let backend = &self.backends[index];
self.inflight[index].fetch_add(1, Ordering::Relaxed);
self.inqueue[index].fetch_add(1, Ordering::Relaxed);
backend.to_string()
}
pub async fn run(&mut self) {
while let Some(msg) = self.rx.recv().await {
eprintln!("Msg {msg:?}");
match msg {
Msg::Next(key, sx) => {
let backend: String = self.next(&key);
eprintln!("Sending back backend {backend}");
if let Err(err) = sx.send(backend) {
eprintln!("Cannot send back result: {err}");
}
}
Msg::Dequeue(backend) => {
let index = self.backends.iter().position(|b| b == &backend);
if let Some(index) = index {
self.inqueue[index].fetch_sub(1, Ordering::Relaxed);
}
}
Msg::Deflight(backend) => {
let index = self.backends.iter().position(|b| b == &backend);
if let Some(index) = index {
self.inflight[index].fetch_sub(1, Ordering::Relaxed);
}
}
Msg::AddBackend(backend) => {
self.backends.push(backend);
self.backends.sort();
}
Msg::RemoveBackend(backend) => {
self.backends.retain(|b| *b == backend);
self.backends.sort();
}
}
}
}
}
pub trait LoadBalancer {
fn next(&mut self, key: &[u8], n_backends: usize) -> usize;
}
#[derive(Debug)]
pub enum Msg {
Next(Vec<u8>, oneshot::Sender<String>),
Dequeue(String),
Deflight(String),
AddBackend(String),
RemoveBackend(String),
}
type Snd = mpsc::Sender<Msg>;
type Rcv = mpsc::Receiver<Msg>;
#[derive(Clone)]
pub struct Communicator {
sender: Snd,
client: Client,
}
impl Communicator {
pub fn new(sender: Snd) -> Self {
let client = hyper_util::client::legacy::Client::<(), ()>::builder(TokioExecutor::new())
.build(HttpConnector::new());
Self { sender, client }
}
async fn dequeue(&self, backend: String) -> Result<(), mpsc::error::SendError<Msg>> {
self.sender.send(Msg::Dequeue(backend)).await
}
async fn deflight(&self, backend: String) -> Result<(), mpsc::error::SendError<Msg>> {
self.sender.send(Msg::Deflight(backend)).await
}
async fn next(&self, key: Vec<u8>) -> Result<String, mpsc::error::SendError<Msg>> {
let (sx, rx) = oneshot::channel();
self.sender.send(Msg::Next(key, sx)).await?;
let backend = rx.await.unwrap();
Ok(backend)
}
}
pub async fn handler(
State(state): State<Communicator>,
req: Request,
) -> Result<Response<Body>, StatusCode> {
// Get the next backend index
let (parts, body) = req.into_parts();
let mut response_stream = body.into_data_stream();
let event = response_stream.next().await;
let key = if let Some(Ok(event)) = &event {
event.to_vec()
} else {
vec![]
};
let backend = state.next(key).await.map_err(|_| StatusCode::BAD_GATEWAY)?;
let response_stream = async_stream::stream! {
let mut response_stream = Box::pin(response_stream);
if let Some(event) = event{
yield event;
}
while let Some(raw_event) = response_stream.next().await {
yield raw_event;
}
};
let body = Body::from_stream(response_stream);
let mut req = Request::from_parts(parts, body);
let path = req.uri().path();
let path_query = req
.uri()
.path_and_query()
.map(|v| v.as_str())
.unwrap_or(path);
let uri = format!("{backend}{path_query}");
eprintln!("Inflight {uri}");
*req.uri_mut() = Uri::try_from(uri).unwrap();
let response = state
.client
.request(req)
.await
.map_err(|_| StatusCode::BAD_GATEWAY)?;
let response = response.into_response();
let (parts, body) = response.into_parts();
let response_stream = body.into_data_stream();
let response_stream = async_stream::stream! {
let mut response_stream = Box::pin(response_stream);
let mut start = true;
while let Some(raw_event) = response_stream.next().await {
if start{
eprintln!("Not inqueue");
state.dequeue(backend.to_string()).await.unwrap();
start = false;
}
yield raw_event;
}
eprintln!("Not inflight");
state.deflight(backend.to_string()).await.unwrap();
};
let body = Body::from_stream(response_stream);
Ok(Response::from_parts(parts, body))
}

47
kvrouter/src/main.rs Normal file
View File

@ -0,0 +1,47 @@
use axum::{
routing::Router,
routing::{get, post},
};
use kvrouter::{handler, Communicator, ContentAware, OverloadHandler, RoundRobin};
#[tokio::main]
async fn main() {
// List of backend servers
let backends = vec![
"http://localhost:8000".to_string(),
// "http://localhost:8001".to_string(),
// "http://localhost:8002".to_string(),
// "http://localhost:8003".to_string(),
];
// Create a new instance of the RoundRobinRouter
println!("Using Content aware");
// Create the Axum router
let (sx, rx) = tokio::sync::mpsc::channel(100);
let communicator = Communicator::new(sx);
tokio::task::spawn(async move {
if std::env::var("TGI_KVROUTER_LB").unwrap_or("".to_string()) == *"roundrobin" {
println!("Using round robin");
let lb = RoundRobin::new();
let mut router = OverloadHandler::new(lb, backends, rx);
router.run().await;
} else {
let lb = ContentAware::new();
let mut router = OverloadHandler::new(lb, backends, rx);
router.run().await;
};
});
let app = Router::new()
.route("/{*key}", get(handler))
.route("/{*key}", post(handler))
.with_state(communicator);
// run it
let listener = tokio::net::TcpListener::bind("127.0.0.1:3000")
.await
.unwrap();
println!("listening on {}", listener.local_addr().unwrap());
axum::serve(listener, app).await.unwrap();
}

257
kvrouter/src/trie.rs Normal file
View File

@ -0,0 +1,257 @@
use std::collections::BTreeMap;
// TODO
#[allow(dead_code)]
#[cfg_attr(test, derive(Debug, PartialEq))]
pub enum Error {
MissingEntry,
}
#[derive(Clone)]
pub struct Trie {
root: Node,
}
#[derive(Clone)]
#[cfg_attr(test, derive(Debug, PartialEq))]
pub struct Node {
content: Vec<u8>,
nelements: usize,
children: BTreeMap<u8, Node>,
}
pub fn mismatch(xs: &[u8], ys: &[u8]) -> usize {
// SIMD
mismatch_chunks::<128>(xs, ys)
}
fn mismatch_chunks<const N: usize>(xs: &[u8], ys: &[u8]) -> usize {
let off = xs
.chunks_exact(N)
.zip(ys.chunks_exact(N))
.take_while(|(x, y)| x == y)
.count()
* N;
off + xs[off..]
.iter()
.zip(&ys[off..])
.take_while(|(x, y)| x == y)
.count()
}
impl Node {
fn new() -> Self {
Self {
content: vec![],
nelements: 0,
children: BTreeMap::new(),
}
}
fn insert(&mut self, data: &[u8], left: usize) -> (usize, usize) {
let (start, stop) = if self.nelements == 0 {
self.content = data.to_vec();
(left, left + 1)
} else {
let mismatch = mismatch(data, &self.content);
if mismatch == self.content.len() {
// Full prefix match, just dive deeper
let (start, stop) = if let Some(c) = data.get(mismatch) {
let left: usize = self
.children
.iter()
.take_while(|(&d, _)| d < *c)
.map(|(_, n)| n.nelements)
.sum();
let next_node = self.children.entry(*c).or_insert(Node::new());
next_node.insert(&data[mismatch..], left)
} else {
(0, self.nelements + 1)
};
(left + start, left + stop)
} else {
// Partial match, split node
let left = self.content[mismatch..].to_vec();
let right = data[mismatch..].to_vec();
let children = std::mem::take(&mut self.children);
let mut children_content = vec![
(left, children, self.nelements),
(right, BTreeMap::new(), 1),
];
children_content.sort_by(|a, b| a.0.cmp(&b.0));
self.content.truncate(mismatch);
self.children.clear();
for (child_content, children, nelements) in children_content {
if !child_content.is_empty() {
let c = child_content[0];
let child = Node {
content: child_content,
nelements,
children,
};
self.children.insert(c, child);
}
}
let c = data[mismatch];
let left: usize = self
.children
.iter()
.take_while(|(&d, _)| d < c)
.map(|(_, n)| n.nelements)
.sum();
(left, left + 1)
}
};
self.nelements += 1;
(start, stop)
}
// TODO
#[allow(dead_code)]
fn remove(&mut self, data: &[u8]) -> Result<(), Error> {
let mismatch = mismatch(data, &self.content);
if mismatch != self.content.len() {
Err(Error::MissingEntry)
} else {
if let Some(c) = data.get(mismatch) {
if let Some(node) = self.children.get_mut(c) {
node.remove(&data[mismatch..])?;
}
}
self.nelements -= 1;
Ok(())
}
}
}
impl Trie {
pub fn new() -> Self {
let root = Node::new();
Self { root }
}
pub fn insert(&mut self, data: &[u8]) -> (usize, usize) {
self.root.insert(data, 0)
}
// TODO
#[allow(dead_code)]
pub fn remove(&mut self, data: &[u8]) -> Result<(), Error> {
self.root.remove(data)
}
pub fn count(&self) -> usize {
self.root.nelements
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn simple() {
let mut trie = Trie::new();
assert_eq!(trie.insert(b"toto"), (0, 1));
assert_eq!(trie.insert(b"tata"), (0, 1));
assert_eq!(trie.root.nelements, 2);
assert_eq!(trie.root.content, b"t");
assert_eq!(trie.root.children.len(), 2);
assert_eq!(
trie.root.children,
BTreeMap::from_iter([
(
b'a',
Node {
nelements: 1,
content: b"ata".to_vec(),
children: BTreeMap::new()
}
),
(
b'o',
Node {
nelements: 1,
content: b"oto".to_vec(),
children: BTreeMap::new()
}
)
])
);
assert_eq!(trie.insert(b"coco"), (0, 1));
assert_eq!(trie.insert(b"zaza"), (3, 4));
assert_eq!(trie.root.nelements, 4);
assert_eq!(trie.root.content, b"");
assert_eq!(trie.root.children.len(), 3);
assert_eq!(
trie.root.children,
BTreeMap::from_iter([
(
b'c',
Node {
nelements: 1,
content: b"coco".to_vec(),
children: BTreeMap::new()
}
),
(
b't',
Node {
nelements: 2,
content: b"t".to_vec(),
children: BTreeMap::from_iter([
(
b'a',
Node {
nelements: 1,
content: b"ata".to_vec(),
children: BTreeMap::new()
}
),
(
b'o',
Node {
nelements: 1,
content: b"oto".to_vec(),
children: BTreeMap::new()
}
)
])
}
),
(
b'z',
Node {
nelements: 1,
content: b"zaza".to_vec(),
children: BTreeMap::new()
}
),
])
);
}
#[test]
fn delete() {
let mut trie = Trie::new();
trie.insert(b"toto");
trie.insert(b"tata");
assert_eq!(trie.root.nelements, 2);
assert_eq!(trie.remove(b"coco"), Err(Error::MissingEntry));
assert_eq!(trie.remove(b"toto"), Ok(()));
assert_eq!(trie.root.nelements, 1);
}
#[test]
fn duplicate() {
let mut trie = Trie::new();
assert_eq!(trie.insert(b"toto"), (0, 1));
assert_eq!(trie.insert(b"toto"), (0, 2));
assert_eq!(trie.root.nelements, 2);
assert_eq!(trie.remove(b"toto"), Ok(()));
assert_eq!(trie.root.nelements, 1);
}
}

View File

@ -50,7 +50,7 @@ export function get_options() {
throughput: { throughput: {
executor: 'shared-iterations', executor: 'shared-iterations',
vus: 100, vus: 100,
iterations: 200, iterations: 500,
maxDuration: '40s', maxDuration: '40s',
}, },
}, },