feat(backend): bind thread and memory affinity for worker threads

Commit: 50c376612c
Parent: 5335bf973b
Author: Morgan Funtowicz
Date: 2024-11-21 13:52:38 +01:00

4 changed files with 101 additions and 30 deletions

backends/llamacpp/build.rs

@@ -12,8 +12,12 @@ const BACKEND_DEPS: [&str; 2] = [CMAKE_LLAMA_CPP_TARGET, CMAKE_LLAMA_CPP_FFI_TARGET];
 macro_rules! probe {
     ($name: expr, $version: expr) => {
         if let Err(_) = pkg_config::probe_library($name) {
-            pkg_config::probe_library(&format!("{}-{}", $name, $version))
-                .expect(&format!("Failed to locate {}", $name));
+            match pkg_config::probe_library(&format!("{}-{}", $name, $version)) {
+                Ok(_) => Ok(()),
+                Err(_) => Err(()),
+            }
+        } else {
+            Ok(())
         }
     };
 }
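
Note: the macro now evaluates to a Result instead of panicking inside the probe, so each call site decides whether a missing library is fatal. A minimal build.rs sketch of both uses, assuming only the pkg-config crate; the "4.1" literal stands in for the crate's MPI_REQUIRED_VERSION constant and is illustrative only:

macro_rules! probe {
    ($name: expr, $version: expr) => {
        if let Err(_) = pkg_config::probe_library($name) {
            match pkg_config::probe_library(&format!("{}-{}", $name, $version)) {
                Ok(_) => Ok(()),
                Err(_) => Err(()),
            }
        } else {
            Ok(())
        }
    };
}

fn main() {
    // Hard requirement: fail the build when no MPI implementation is found.
    // "4.1" is a stand-in for MPI_REQUIRED_VERSION.
    if probe!("ompi", "4.1").is_err() {
        panic!("An implementation of MPI is required");
    }

    // Soft requirement: NUMA only toggles a preprocessor define downstream.
    if probe!("numa", "2.0").is_ok() {
        println!("cargo:warning=libnuma found, NUMA support will be compiled in");
    }
}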
@@ -53,16 +57,27 @@ fn build_backend(
     deps_folder
 }
 
-fn build_ffi_layer(deps_folder: &Path, install_prefix: &Path) {
-    println!("cargo:warning={}", deps_folder.display());
+fn build_ffi_layer(is_debug: bool, install_prefix: &Path) {
     CFG.include_prefix = "backends/llamacpp";
-    cxx_build::bridge("src/lib.rs")
+
+    let mut bridge = cxx_build::bridge("src/lib.rs");
+    bridge
         .static_flag(true)
         .std("c++23")
         .include(install_prefix.join("include"))
         .include("csrc")
-        .file("csrc/ffi.hpp")
-        .compile(CMAKE_LLAMA_CPP_FFI_TARGET); // Make sure this target is not the same as cmake above
+        .file("csrc/ffi.hpp");
+
+    if is_debug {
+        bridge.define("TGI_LLAMACPP_BACKEND_DEBUG", "");
+    }
+
+    if probe!("numa", "2.0").is_ok() {
+        bridge.define("NUMA_AVAILABLE", "");
+    };
+
+    bridge.compile(CMAKE_LLAMA_CPP_FFI_TARGET); // Make sure this target is not the same as the CMake target above
 }
 
 fn main() {
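
Note: cxx_build::bridge returns a cc::Build, so the diff moves from one long method chain to a mutable `bridge` binding that conditional define() calls can be applied to before compile(). A compressed sketch of that pattern with illustrative paths and target name; note that define(name, "") emits -DNAME= on the compiler command line, while define(name, None) would emit a bare -DNAME:

fn build_ffi_layer(is_debug: bool, numa_available: bool) {
    // cxx_build::bridge returns cc::Build; keep it mutable to branch on flags
    let mut bridge = cxx_build::bridge("src/lib.rs");
    bridge
        .static_flag(true)
        .std("c++23")
        .include("csrc")
        .file("csrc/ffi.hpp");

    if is_debug {
        // Consumed by the C++ side through #ifdef TGI_LLAMACPP_BACKEND_DEBUG
        bridge.define("TGI_LLAMACPP_BACKEND_DEBUG", "");
    }
    if numa_available {
        // Consumed by the C++ side through #ifdef NUMA_AVAILABLE
        bridge.define("NUMA_AVAILABLE", "");
    }

    // "tgi_llamacpp_ffi" is an illustrative target name
    bridge.compile("tgi_llamacpp_ffi");
}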
@@ -82,11 +97,12 @@ fn main() {
     let deps_path = build_backend(is_debug, opt_level, out_dir.as_path(), &install_path);
 
     // Build the FFI layer calling the backend above
-    build_ffi_layer(&deps_path, &install_path);
+    build_ffi_layer(is_debug, &install_path);
 
     // Emit linkage search path
-    probe!("ompi", MPI_REQUIRED_VERSION);
-    probe!("numa", "2.0");
+    if probe!("ompi", MPI_REQUIRED_VERSION).is_err() {
+        panic!("An implementation of MPI is required");
+    }
 
     // Backend
     BACKEND_DEPS.iter().for_each(|name| {
@@ -97,9 +113,7 @@ fn main() {
     println!("cargo:rustc-link-search=native={}", out_dir.display());
 
     let spdlog_linkage_target = if is_debug { "spdlogd" } else { "spdlog" };
-    let fmt_linkage_target = if is_debug { "fmtd" } else { "fmt" };
     println!("cargo:rustc-link-lib=dylib={spdlog_linkage_target}");
-    println!("cargo:rustc-link-lib=dylib={fmt_linkage_target}");
     println!("cargo:rustc-link-lib=dylib=ggml");
     println!("cargo:rustc-link-lib=dylib=llama");

backends/llamacpp/cmake/numa.cmake

@@ -13,8 +13,8 @@ FIND_LIBRARY(NUMA_LIBRARY NAME numa
 IF (NUMA_INCLUDE_DIR AND NUMA_LIBRARY)
     SET(NUMA_FOUND TRUE)
     MESSAGE(STATUS "Found numa library: inc=${NUMA_INCLUDE_DIR}, lib=${NUMA_LIBRARY}")
+    add_compile_definitions(NUMA_AVAILABLE)
 ELSE ()
     SET(NUMA_FOUND FALSE)
     MESSAGE(STATUS "WARNING: Numa library not found.")
-    MESSAGE(STATUS "Try: 'sudo apt-get install libnuma libnuma-dev' (or sudo yum install numactl numactl-devel)")
 ENDIF ()

backends/llamacpp/csrc/ffi.hpp

@@ -17,7 +17,12 @@
 #include <spdlog/fmt/ranges.h>
 #include <spdlog/fmt/std.h>
 
+#ifdef NUMA_AVAILABLE
+#define CURRENT_THREAD 0
+#include <algorithm>
+#include <unordered_set>
 #include <numa.h>
+#endif
 
 namespace huggingface::tgi::backends::llamacpp {
     class llama_cpp_worker_frontend_t;
@@ -84,6 +89,10 @@ namespace huggingface::tgi::backends::llamacpp {
     };
 
     std::unique_ptr<llama_cpp_worker_frontend_t> create_worker_frontend(rust::Str modelPath) {
+#ifdef TGI_LLAMACPP_BACKEND_DEBUG
+        spdlog::set_level(spdlog::level::debug);
+#endif
+
         // Initialize the numa context from numactl
         static const bool INITIALIZED_NUMA_CONTEXT_ONCE = [](){
             llama_numa_init(GGML_NUMA_STRATEGY_NUMACTL);
@@ -99,21 +108,70 @@ namespace huggingface::tgi::backends::llamacpp {
         return std::make_unique<llama_cpp_worker_frontend_t>(model);
     }
 
+    struct numa_cpumask_deleter { void operator()(struct bitmask* cpumask){ numa_free_cpumask(cpumask); }};
+    typedef std::unique_ptr<struct bitmask, numa_cpumask_deleter> unique_cpumask_ptr;
+
     void set_numactl_core_affinity(rust::Slice<const size_t> affinity) {
-        SPDLOG_INFO("Setting numactl cores affinity to {} for thread {}", affinity, std::this_thread::get_id());
-//        auto nodes = std::unordered_set<usize>();
-        auto cpumask = numa_allocate_cpumask();
-        for(auto core : affinity) {
-            numa_bitmask_setbit(cpumask, core);
-            numa_sched_setaffinity(0, cpumask);
-        }
-
-//#ifdef TGI_LLAMACPP_BACKEND_DEBUG
-        auto cpumask_check = numa_allocate_cpumask();
-        numa_sched_getaffinity(0, cpumask_check);
-        SPDLOG_DEBUG(FMT_STRING("numa_sched_affinity for thread {} -> {:b}"), std::this_thread::get_id(), *cpumask_check->maskp);
-//#endif
+//    void set_numactl_core_affinity(std::vector<size_t> affinity) {
+#ifdef NUMA_AVAILABLE
+        if(numa_available() != -1) {
+            SPDLOG_INFO("Setting numactl cores affinity to {} for thread {}", affinity, std::this_thread::get_id());
+
+            auto cpumask = unique_cpumask_ptr(numa_allocate_cpumask());
+            std::ranges::for_each(affinity, [&cpumask](size_t cpu) { numa_bitmask_setbit(cpumask.get(), cpu); });
+            numa_sched_setaffinity(CURRENT_THREAD, cpumask.get());
+
+            // Retrieve some information about the current setup
+            if(const auto numa_num_nodes = numa_num_configured_nodes(); numa_num_nodes > 1) {
+                const auto *numa_all_cpus = numa_all_cpus_ptr;
+                SPDLOG_INFO(FMT_STRING("All CPUs: {:b} (# Nodes: {:d})"), *numa_all_cpus->maskp, numa_num_nodes);
+
+                // Retrieve the cpumask specific to the current node
+                auto cpus_per_node = unique_cpumask_ptr(numa_allocate_cpumask());
+
+                // Allocate a set which keeps track of which nodes are being targeted
+                auto numa_spawning_nodes = std::unordered_set<size_t>();
+                for(auto node = 0; node < numa_num_nodes; ++node) {
+                    // Retrieve the cpumask for the target node
+                    numa_node_to_cpus(node, cpus_per_node.get());
+
+                    // Intersect the targeted cores with the cores on this node; if none of the
+                    // targeted cores live on this node, the AND below yields 0.
+                    const auto allocated_cpus_on_node = *cpus_per_node->maskp & *cpumask->maskp;
+                    if(allocated_cpus_on_node > 0) {
+                        // If we have some cores on the node, attempt to insert it in the set of targeted nodes
+                        if(const auto [_, was_inserted] = numa_spawning_nodes.emplace(node); was_inserted) {
+                            SPDLOG_DEBUG("Allocated thread spawning node: {:d}", node);
+                        }
+                    }
+
+                    // Clear all the bits relative to the current node
+                    numa_bitmask_clearall(cpus_per_node.get());
+                }
+
+                // Bind the memory if we span a single node; otherwise, display a warning
+                if(numa_spawning_nodes.size() == 1) {
+                    SPDLOG_INFO(FMT_STRING("Setting memory affinity to node: {:d}"), *numa_spawning_nodes.begin());
+                    numa_set_preferred(*numa_spawning_nodes.begin());
+                } else {
+                    SPDLOG_WARN(FMT_STRING("Specified thread affinity spans multiple NUMA nodes: {}"), numa_spawning_nodes);
+                }
+            }
+
+#ifdef TGI_LLAMACPP_BACKEND_DEBUG
+            // Sanity check in the logs...
+            auto *cpumask_check = numa_allocate_cpumask();
+            numa_sched_getaffinity(CURRENT_THREAD, cpumask_check);
+            SPDLOG_DEBUG(
+                    FMT_STRING("numa_sched_affinity for thread {} -> {:b}"),
+                    std::this_thread::get_id(), *cpumask_check->maskp);
+            numa_free_cpumask(cpumask_check);
+#endif
+        }
+#else
+        SPDLOG_WARN("TGI's llama.cpp backend was compiled without NUMA support");
+#endif
     }
 }
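
Note: unique_cpumask_ptr gives the cpumask RAII semantics the old code lacked (the mask returned by numa_allocate_cpumask was never freed). The same idea expressed on the Rust side of the FFI boundary would be a Drop wrapper; a hedged sketch with hand-written libnuma declarations (transcribed from numa.h; these bindings do not exist in this repo):

use std::os::raw::{c_int, c_uint, c_ulong};

// Layout of libnuma's struct bitmask, declared here for self-containment
#[repr(C)]
pub struct bitmask {
    size: c_ulong,
    maskp: *mut c_ulong,
}

#[link(name = "numa")]
extern "C" {
    fn numa_available() -> c_int;
    fn numa_allocate_cpumask() -> *mut bitmask;
    fn numa_bitmask_free(bmp: *mut bitmask);
    fn numa_bitmask_setbit(bmp: *mut bitmask, n: c_uint) -> *mut bitmask;
    fn numa_sched_setaffinity(pid: c_int, mask: *mut bitmask) -> c_int;
}

/// Owns a libnuma cpumask and frees it on Drop, mirroring numa_cpumask_deleter
struct CpuMask(*mut bitmask);

impl CpuMask {
    fn from_cores(cores: &[usize]) -> Self {
        let mask = unsafe { numa_allocate_cpumask() };
        for &core in cores {
            unsafe { numa_bitmask_setbit(mask, core as c_uint) };
        }
        CpuMask(mask)
    }

    /// pid 0 targets the calling thread, like CURRENT_THREAD in the C++ code
    fn bind_current_thread(&self) -> bool {
        unsafe { numa_sched_setaffinity(0, self.0) == 0 }
    }
}

impl Drop for CpuMask {
    fn drop(&mut self) {
        unsafe { numa_bitmask_free(self.0) }
    }
}

fn main() {
    // numa_available() returns -1 when the kernel has no NUMA support
    if unsafe { numa_available() } != -1 {
        let mask = CpuMask::from_cores(&[0, 1]);
        println!("bound: {}", mask.bind_current_thread());
    } // mask freed here by Drop
}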

backends/llamacpp/src/backend.rs

@@ -5,7 +5,6 @@ use crate::ffi::{
 use async_trait::async_trait;
 use cxx::UniquePtr;
 use log::warn;
-use std::cell::RefCell;
 use std::ops::Range;
 use std::path::{Path, PathBuf};
 use std::sync::mpsc::{channel, Receiver, Sender};
@@ -20,7 +19,7 @@ use text_generation_router::{FinishReason, Token};
 use thiserror::Error;
 use tokenizers::Tokenizer;
 use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
-use tokio::sync::{Semaphore, SemaphorePermit, TryAcquireError};
+use tokio::sync::Semaphore;
 use tokio::task::JoinHandle;
 use tokio::time::Instant;
 use tokio_stream::wrappers::UnboundedReceiverStream;
@@ -180,7 +179,7 @@ fn get_cores_allocation(num_cores_per_instance: usize) -> Vec<Range<usize>> {
     };
 
     // If we have spare cores, let's see if we can give everyone one more core
-    let mut num_instances = cores_count / effective_num_cores_per_instance;
+    let num_instances = cores_count / effective_num_cores_per_instance;
     if cores_count - (num_instances * effective_num_cores_per_instance) >= num_instances {
         effective_num_cores_per_instance = effective_num_cores_per_instance + 1;
         warn!("Overriding cores allocation to {effective_num_cores_per_instance} per instance");
@@ -190,7 +189,7 @@ fn get_cores_allocation(num_cores_per_instance: usize) -> Vec<Range<usize>> {
         .map(|ordinal| {
             let start = ordinal * effective_num_cores_per_instance;
             let end = (ordinal + 1) * effective_num_cores_per_instance - 1;
-            (start..end)
+            start..end
         })
         .collect()
 }
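
Note: with num_instances no longer mutable, the allocation logic reads as below. A standalone sketch with cores_count passed in as a parameter so the arithmetic is testable; in the real file it comes from the machine's core count, and the derivation of effective_num_cores_per_instance shown here is an assumption. The half-open start..end combined with the `- 1` leaves the last core of each block out of the range, kept exactly as in the diff:

use std::ops::Range;

fn get_cores_allocation(num_cores_per_instance: usize, cores_count: usize) -> Vec<Range<usize>> {
    // Assumption: 0 means "no explicit request, use all the cores"
    let mut effective_num_cores_per_instance = if num_cores_per_instance == 0 {
        cores_count
    } else {
        num_cores_per_instance
    };

    // If we have spare cores, let's see if we can give everyone one more core
    let num_instances = cores_count / effective_num_cores_per_instance;
    if cores_count - (num_instances * effective_num_cores_per_instance) >= num_instances {
        effective_num_cores_per_instance += 1;
    }

    (0..num_instances)
        .map(|ordinal| {
            let start = ordinal * effective_num_cores_per_instance;
            let end = (ordinal + 1) * effective_num_cores_per_instance - 1;
            start..end
        })
        .collect()
}

fn main() {
    // 16 cores, 3 per instance -> 5 instances; the single spare core is not
    // enough to hand one more core to each of the 5 instances
    assert_eq!(
        get_cores_allocation(3, 16),
        vec![0..2, 3..5, 6..8, 9..11, 12..14]
    );

    // 12 cores, 5 per instance -> 2 instances; the 2 spare cores allow
    // bumping every instance to 6 cores
    assert_eq!(get_cores_allocation(5, 12), vec![0..5, 6..11]);
}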