diff --git a/backends/llamacpp/build.rs b/backends/llamacpp/build.rs
index 22726db1..023ccfba 100644
--- a/backends/llamacpp/build.rs
+++ b/backends/llamacpp/build.rs
@@ -12,8 +12,12 @@ const BACKEND_DEPS: [&str; 2] = [CMAKE_LLAMA_CPP_TARGET, CMAKE_LLAMA_CPP_FFI_TAR
 macro_rules! probe {
     ($name: expr, $version: expr) => {
         if let Err(_) = pkg_config::probe_library($name) {
-            pkg_config::probe_library(&format!("{}-{}", $name, $version))
-                .expect(&format!("Failed to locate {}", $name));
+            match pkg_config::probe_library(&format!("{}-{}", $name, $version)) {
+                Ok(_) => Ok(()),
+                Err(_) => Err(()),
+            }
+        } else {
+            Ok(())
         }
     };
 }
@@ -53,16 +57,27 @@ fn build_backend(
     deps_folder
 }
 
-fn build_ffi_layer(deps_folder: &Path, install_prefix: &Path) {
-    println!("cargo:warning={}", deps_folder.display());
+fn build_ffi_layer(is_debug: bool, install_prefix: &Path) {
     CFG.include_prefix = "backends/llamacpp";
-    cxx_build::bridge("src/lib.rs")
+
+    let mut bridge = cxx_build::bridge("src/lib.rs");
+
+    bridge
         .static_flag(true)
         .std("c++23")
         .include(install_prefix.join("include"))
         .include("csrc")
-        .file("csrc/ffi.hpp")
-        .compile(CMAKE_LLAMA_CPP_FFI_TARGET); // Make sure this target is not the same as cmake above
+        .file("csrc/ffi.hpp");
+
+    if is_debug {
+        bridge.define("TGI_LLAMACPP_BACKEND_DEBUG", "");
+    }
+
+    if probe!("numa", "2.0").is_ok() {
+        bridge.define("NUMA_AVAILABLE", "");
+    };
+
+    bridge.compile(CMAKE_LLAMA_CPP_FFI_TARGET); // Make sure this target is not the same as cmake above
 }
 
 fn main() {
@@ -82,11 +97,12 @@ fn main() {
     let deps_path = build_backend(is_debug, opt_level, out_dir.as_path(), &install_path);
 
     // Build the FFI layer calling the backend above
-    build_ffi_layer(&deps_path, &install_path);
+    build_ffi_layer(is_debug, &install_path);
 
     // Emit linkage search path
-    probe!("ompi", MPI_REQUIRED_VERSION);
-    probe!("numa", "2.0");
+    if probe!("ompi", MPI_REQUIRED_VERSION).is_err() {
+        panic!("An implementation of MPI is required");
+    }
 
     // Backend
     BACKEND_DEPS.iter().for_each(|name| {
@@ -97,9 +113,7 @@ fn main() {
     println!("cargo:rustc-link-search=native={}", out_dir.display());
 
     let spdlog_linkage_target = if is_debug { "spdlogd" } else { "spdlog" };
-    let fmt_linkage_target = if is_debug { "fmtd" } else { "fmt" };
     println!("cargo:rustc-link-lib=dylib={spdlog_linkage_target}");
-    println!("cargo:rustc-link-lib=dylib={fmt_linkage_target}");
 
     println!("cargo:rustc-link-lib=dylib=ggml");
     println!("cargo:rustc-link-lib=dylib=llama");
diff --git a/backends/llamacpp/cmake/numa.cmake b/backends/llamacpp/cmake/numa.cmake
index 0399b752..94dfddc2 100644
--- a/backends/llamacpp/cmake/numa.cmake
+++ b/backends/llamacpp/cmake/numa.cmake
@@ -13,8 +13,8 @@ FIND_LIBRARY(NUMA_LIBRARY NAME numa
 IF (NUMA_INCLUDE_DIR AND NUMA_LIBRARY)
     SET(NUMA_FOUND TRUE)
     MESSAGE(STATUS "Found numa library: inc=${NUMA_INCLUDE_DIR}, lib=${NUMA_LIBRARY}")
+    add_compile_definitions(NUMA_AVAILABLE)
 ELSE ()
     SET(NUMA_FOUND FALSE)
     MESSAGE(STATUS "WARNING: Numa library not found.")
-    MESSAGE(STATUS "Try: 'sudo apt-get install libnuma libnuma-dev' (or sudo yum install numactl numactl-devel)")
 ENDIF ()
\ No newline at end of file
diff --git a/backends/llamacpp/csrc/ffi.hpp b/backends/llamacpp/csrc/ffi.hpp
index 9700f52e..147f81ae 100644
--- a/backends/llamacpp/csrc/ffi.hpp
+++ b/backends/llamacpp/csrc/ffi.hpp
@@ -17,7 +17,12 @@
 #include 
 #include 
 
+#ifdef NUMA_AVAILABLE
+#define CURRENT_THREAD 0
+#include 
+#include 
 #include 
+#endif
 
 namespace huggingface::tgi::backends::llamacpp {
     class llama_cpp_worker_frontend_t;
@@ -84,6 +89,10 @@ namespace huggingface::tgi::backends::llamacpp {
     };
 
     std::unique_ptr<llama_cpp_worker_frontend_t> create_worker_frontend(rust::Str modelPath) {
+#ifdef TGI_LLAMACPP_BACKEND_DEBUG
+        spdlog::set_level(spdlog::level::debug);
+#endif
+
         // Initialize the numa context from numactl
         static const bool INITIALIZED_NUMA_CONTEXT_ONCE = [](){
             llama_numa_init(GGML_NUMA_STRATEGY_NUMACTL);
@@ -99,21 +108,70 @@ namespace huggingface::tgi::backends::llamacpp {
         return std::make_unique<llama_cpp_worker_frontend_t>(model);
     }
 
+    struct numa_cpumask_deleter { void operator()(struct bitmask* cpumask){ numa_free_cpumask(cpumask); }};
+    typedef std::unique_ptr<struct bitmask, numa_cpumask_deleter> unique_cpumask_ptr;
+
     void set_numactl_core_affinity(rust::Slice<const size_t> affinity) {
-        SPDLOG_INFO("Setting numactl cores affinity to {} for thread {}", affinity, std::this_thread::get_id());
-//        auto nodes = std::unordered_set();
-        auto cpumask = numa_allocate_cpumask();
-        for(auto core : affinity) {
-            numa_bitmask_setbit(cpumask, core);
-            numa_sched_setaffinity(0, cpumask);
+//    void set_numactl_core_affinity(std::vector<size_t> affinity) {
+#ifdef NUMA_AVAILABLE
+        if(numa_available() != -1) {
+            SPDLOG_INFO("Setting numactl cores affinity to {} for thread {}", affinity, std::this_thread::get_id());
+
+            auto cpumask = unique_cpumask_ptr(numa_allocate_cpumask());
+            std::ranges::for_each(affinity, [&cpumask](size_t cpu) { numa_bitmask_setbit(cpumask.get(), cpu); });
+            numa_sched_setaffinity(CURRENT_THREAD, cpumask.get());
+
+            // Retrieve some information about the current setup
+            if(const auto numa_num_nodes = numa_num_configured_nodes(); numa_num_nodes > 1) {
+                const auto *numa_all_cpus = numa_all_cpus_ptr;
+                SPDLOG_INFO(FMT_STRING("All CPUs: {:b} (# Nodes: {:d})"), *numa_all_cpus->maskp, numa_num_nodes);
+
+                // Retrieve the cpumask specific for the current node
+                auto cpus_per_node = unique_cpumask_ptr(numa_allocate_cpumask());
+
+                // Allocate a set which keeps track of which nodes are being targeted
+                auto numa_spawning_nodes = std::unordered_set<int>();
+                for(auto node = 0; node < numa_num_nodes; ++node) {
+                    // Retrieve the cpumask for the target node
+                    numa_node_to_cpus(node, cpus_per_node.get());
+
+                    // Intersect the targeted cores with this node's cores; if none are targeted on that specific node,
+                    // the value of allocated_cpus_on_node will be 0 as the result of the AND operation.
+                    const auto allocated_cpus_on_node = *cpus_per_node->maskp & *cpumask->maskp;
+                    if(allocated_cpus_on_node > 0) {
+
+                        // If we have some cores on the node, attempt to insert it in the set of targeted nodes
+                        if(const auto [_, was_inserted] = numa_spawning_nodes.emplace(node); was_inserted) {
+                            SPDLOG_DEBUG("Allocated thread spawning node: {:d}", node);
+                        }
+                    }
+
+                    // Clear all the bits relative to the current node
+                    numa_bitmask_clearall(cpus_per_node.get());
+                }
+
+                // Bind the memory if we spawn on a single node; otherwise, display a warning
+                if(numa_spawning_nodes.size() == 1) {
+                    SPDLOG_INFO(FMT_STRING("Setting memory affinity to node: {:d}"), *numa_spawning_nodes.begin());
+                    numa_set_preferred(*numa_spawning_nodes.begin());
+                } else {
+                    SPDLOG_WARN(FMT_STRING("Specified thread affinity spans multiple NUMA nodes: {}"), numa_spawning_nodes);
+                }
+            }
+
+#ifdef TGI_LLAMACPP_BACKEND_DEBUG
+            // Sanity check in the logs...
+            auto *cpumask_check = numa_allocate_cpumask();
+            numa_sched_getaffinity(CURRENT_THREAD, cpumask_check);
+            SPDLOG_DEBUG(
+                    FMT_STRING("numa_sched_affinity for thread {} -> {:b}"),
+                    std::this_thread::get_id(), *cpumask_check->maskp);
+            numa_free_cpumask(cpumask_check);
+#endif
         }
-
-//#ifdef TGI_LLAMACPP_BACKEND_DEBUG
-        auto cpumask_check = numa_allocate_cpumask();
-        numa_sched_getaffinity(0, cpumask_check);
-        SPDLOG_DEBUG(FMT_STRING("numa_sched_affinity for thread {} -> {:b}"), std::this_thread::get_id(), *cpumask_check->maskp);
-//#endif
-
+#else
+        SPDLOG_WARN("TGI's llama.cpp backend was compiled without NUMA support");
+#endif
     }
 }
 
diff --git a/backends/llamacpp/src/backend.rs b/backends/llamacpp/src/backend.rs
index fa5bfbab..1ef959a8 100644
--- a/backends/llamacpp/src/backend.rs
+++ b/backends/llamacpp/src/backend.rs
@@ -5,7 +5,6 @@ use crate::ffi::{
 use async_trait::async_trait;
 use cxx::UniquePtr;
 use log::warn;
-use std::cell::RefCell;
 use std::ops::Range;
 use std::path::{Path, PathBuf};
 use std::sync::mpsc::{channel, Receiver, Sender};
@@ -20,7 +19,7 @@ use text_generation_router::{FinishReason, Token};
 use thiserror::Error;
 use tokenizers::Tokenizer;
 use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
-use tokio::sync::{Semaphore, SemaphorePermit, TryAcquireError};
+use tokio::sync::Semaphore;
 use tokio::task::JoinHandle;
 use tokio::time::Instant;
 use tokio_stream::wrappers::UnboundedReceiverStream;
@@ -180,7 +179,7 @@ fn get_cores_allocation(num_cores_per_instance: usize) -> Vec<Range<usize>> {
     };
 
     // If we have spare cores, let's see if we can give everyone one more core
-    let mut num_instances = cores_count / effective_num_cores_per_instance;
+    let num_instances = cores_count / effective_num_cores_per_instance;
     if cores_count - (num_instances * effective_num_cores_per_instance) >= num_instances {
         effective_num_cores_per_instance = effective_num_cores_per_instance + 1;
         warn!("Overriding cores allocation to {effective_num_cores_per_instance} per instance");
@@ -190,7 +189,7 @@ fn get_cores_allocation(num_cores_per_instance: usize) -> Vec<Range<usize>> {
         .map(|ordinal| {
             let start = ordinal * effective_num_cores_per_instance;
             let end = (ordinal + 1) * effective_num_cores_per_instance - 1;
-            (start..end)
+            start..end
         })
        .collect()
 }
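
For reference, a minimal standalone sketch (not part of the patch) of the per-node intersection idea used in set_numactl_core_affinity above: build the requested CPU mask, AND it against each configured node's CPU mask, and prefer a memory node only when a single node is targeted. Only libnuma is assumed (link with -lnuma); the affinity values below are hypothetical.

// Standalone sketch, not part of the patch; assumes libnuma is installed.
#include <cstdio>
#include <numa.h>
#include <unordered_set>

int main() {
    if (numa_available() == -1) {
        std::puts("NUMA is not available on this machine");
        return 1;
    }

    // CPUs we would like the worker thread pinned to (hypothetical values).
    const std::unordered_set<unsigned int> affinity = {0, 1, 2, 3};

    auto *cpumask = numa_allocate_cpumask();
    for (const auto cpu : affinity)
        numa_bitmask_setbit(cpumask, cpu);

    // Intersect the requested CPUs with each configured node's CPU mask.
    // Like the patch, this only inspects the first word of the mask (CPUs 0-63).
    std::unordered_set<int> targeted_nodes;
    auto *cpus_per_node = numa_allocate_cpumask();
    for (int node = 0; node < numa_num_configured_nodes(); ++node) {
        numa_node_to_cpus(node, cpus_per_node);
        if ((*cpus_per_node->maskp & *cpumask->maskp) != 0)
            targeted_nodes.insert(node);
        numa_bitmask_clearall(cpus_per_node);
    }

    // Bind memory allocations to the node only if all requested CPUs live on it.
    if (targeted_nodes.size() == 1)
        numa_set_preferred(*targeted_nodes.begin());
    else
        std::printf("Affinity spans %zu NUMA nodes, memory left unbound\n", targeted_nodes.size());

    numa_free_cpumask(cpus_per_node);
    numa_free_cpumask(cpumask);
    return 0;
}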