From 8863f3728c05b30b6106870b19211d719b413fbd Mon Sep 17 00:00:00 2001 From: Antti Kervinen Date: Fri, 7 Feb 2025 15:59:41 +0200 Subject: [PATCH] Fix CPU and memory affinity under external resource management - Fixes CPU affinity when running inference on CPU, and when CPUs are externally managed using taskset, numactl, cgroups, Kubernetes CPU manager, NRI resource policy plugins, for instance. - Detect external CPU management and trust the external CPU manager completely. It is more likely that external manager has the big picture of all other tasks running on the system, their QoS, hardware characteristics, etc. - For instance, do not modify even memory affinity, because the external manager may know better which NUMA node has fastest memory, or which NUMA nodes have enough free memory for this inference. Fixes: #3011 Signed-off-by: Antti Kervinen --- .../models/flash_causal_lm.py | 33 +++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/server/text_generation_server/models/flash_causal_lm.py b/server/text_generation_server/models/flash_causal_lm.py index f268e499..272385b5 100644 --- a/server/text_generation_server/models/flash_causal_lm.py +++ b/server/text_generation_server/models/flash_causal_lm.py @@ -102,6 +102,39 @@ def get_sliding_windows() -> int: def init_cpu_threads_env(rank_id: int, world_size: int): + import psutil + allowed_cpus = psutil.Process().cpu_affinity() + if len(allowed_cpus) < psutil.cpu_count(logical=True): + _init_cpu_threads_env_use_allowed(rank_id, world_size, allowed_cpus) + else: + _init_cpu_threads_env_use_all(rank_id, world_size) + +def _init_cpu_threads_env_use_allowed(rank_id: int, world_size: int, allowed_cpus: list): + import importlib.util + + if os.getenv("OMP_NUM_THREADS") is None: + num_cpus_per_rank = max(int(len(allowed_cpus) / world_size), 1) + else: + num_cpus_per_rank = min(int(os.getenv("OMP_NUM_THREADS")), len(allowed_cpus)) + + if importlib.util.find_spec("numa") is not None: + import numa + + slice_info = f"slice {rank_id+1}/{world_size} of externally allowed {len(allowed_cpus)} CPUs" + allowed_mems = numa.memory.get_membind_nodes() + cpu_start = num_cpus_per_rank * rank_id + allowed_cpus_for_rank = allowed_cpus[cpu_start : cpu_start + num_cpus_per_rank] + numa.schedule.run_on_cpus(0, *allowed_cpus_for_rank) + effective_allowed_cpus = numa.schedule.get_affinitive_cpus(0) + else: + slice_info = "externally allowed, cannot import numa for slicing" + allowed_mems = "n/a" + effective_allowed_cpus = allowed_cpus + num_threads = num_cpus_per_rank + torch.set_num_threads(num_threads) + logger.info(f"affinity={effective_allowed_cpus} ({slice_info}), membind={allowed_mems}, threads={num_threads}") + +def _init_cpu_threads_env_use_all(rank_id: int, world_size: int): import importlib.util if importlib.util.find_spec("numa") is not None: