Add Neuron backend (#3033)

* feat: add neuron backend

* feat(neuron): add server standalone installation

* feat(neuron): add server and integration tests

* fix(neuron): increase ulimit when building image

The base image used to compile the Rust components seems to have a low
ulimit for open files, which leads to errors during compilation.

* test(neuron): merge integration tests and fixtures

* test: add --neuron option

* review: do not use latest tag

* review: remove ureq pinned version

* review: --privileged should be the exception

* feat: add neuron case to build ci

* fix(neuron): export models from container in test fixtures

The neuron tests require models to have been previously exported and
cached on the hub. This is done automatically by the neuron.model
fixture the first time the tests are run for a specific version.
This fixture used to export the models using optimum-neuron directly,
but that package is not necessarily present on the system.
Instead, the export is now done through the neuron TGI image itself,
since it contains all the tools required to export the models.
Note that since the CI runs docker-in-docker (dind), it does not seem
possible to share a volume between the CI container and the container
used to export the model.
For that reason, a specific image with a modified entrypoint is built
on-the-fly when a model export is required.
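
A rough sketch of how a fixture could drive such an on-the-fly export image (illustrative only; the helper name, entrypoint and paths below are assumptions, not the actual fixture code):

```python
import subprocess
import tempfile
from pathlib import Path


def export_model_via_tgi_image(tgi_image: str, model_id: str) -> None:
    """Hypothetical sketch: build a throwaway image whose entrypoint exports a
    model instead of serving it, then run it (no shared volume under dind)."""
    dockerfile = (
        f"FROM {tgi_image}\n"
        # Assumed entrypoint: the neuron TGI image ships optimum-cli.
        'ENTRYPOINT ["optimum-cli", "export", "neuron"]\n'
    )
    with tempfile.TemporaryDirectory() as ctx:
        (Path(ctx) / "Dockerfile").write_text(dockerfile)
        export_image = f"{tgi_image}-export"
        subprocess.run(["docker", "build", "-t", export_image, ctx], check=True)
        subprocess.run(
            [
                "docker", "run", "--rm", "--device=/dev/neuron0",
                "-e", "HF_TOKEN",  # exported artifacts are cached on the hub
                export_image,
                "-m", model_id, "--task", "text-generation", "/tmp/export",
            ],
            check=True,
        )
```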

* refactor: remove sagemaker entry-point

The SageMaker image is built differently anyway.

* fix(neuron): avoid using Levenshtein

* test(neuron): use smaller llama model

* feat(neuron): avoid installing CUDA in image

* test(neuron): no error anymore when requesting too many tokens

* ci: add a precompilation step (with a different token)

* test(neuron): avoid using image sha when exporting models

We now compute an apparent hash of the neuron backend by combining the
hash of the neuron backend directory and of the Dockerfile.
This new hash is used to identify exported neuron models instead of the
image sha.
This has two benefits:
- it changes less frequently (only when the neuron backend changes),
  which means fewer neuron models being pushed to the hub,
- it can be evaluated locally, meaning that running the tests once
  locally will export the models before the CI uses them.
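
A minimal sketch of such a combined hash (illustrative; not the actual CI implementation):

```python
import hashlib
from pathlib import Path


def neuron_backend_hash(repo_root: Path) -> str:
    """Hash the neuron backend directory together with Dockerfile.neuron,
    so exported test models are keyed by backend changes, not by image sha."""
    digest = hashlib.sha256()
    backend_dir = repo_root / "backends" / "neuron"
    for path in sorted(backend_dir.rglob("*")):
        if path.is_file():
            # Include the relative path so renames also change the hash.
            digest.update(path.relative_to(repo_root).as_posix().encode())
            digest.update(path.read_bytes())
    digest.update((repo_root / "Dockerfile.neuron").read_bytes())
    return digest.hexdigest()[:10]
```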

* test(neuron): added a small script to prune test models

---------

Co-authored-by: drbh <david.richard.holtz@gmail.com>
Co-authored-by: Nicolas Patry <patry.nicolas@protonmail.com>
David Corvoysier 2025-02-24 09:10:05 +01:00 committed by GitHub
parent 97c5f7e685
commit c00add9c03
35 changed files with 3114 additions and 24 deletions


@@ -25,7 +25,7 @@ jobs:
      docker_volume: ${{ steps.final.outputs.docker_volume }}
      docker_devices: ${{ steps.final.outputs.docker_devices }}
      runs_on: ${{ steps.final.outputs.runs_on }}
-     label: ${{ steps.final.outputs.label }}
+     label_extension: ${{ steps.final.outputs.label_extension }}
      extra_pytest: ${{ steps.final.outputs.extra_pytest }}
    concurrency:
      group: ${{ github.workflow }}-build-and-push-image-${{ inputs.hardware }}-${{ github.head_ref || github.run_id }}
@@ -114,6 +114,16 @@ jobs:
            export extra_pytest="-k test_flash_gemma_simple"
            export target=""
            ;;
+         neuron)
+           export dockerfile="Dockerfile.neuron"
+           export label_extension="-neuron"
+           export docker_devices="/dev/neuron0"
+           export docker_volume="/mnt/cache"
+           export runs_on="aws-inf2-8xlarge"
+           export platform="cpu"
+           export extra_pytest="--neuron"
+           export target=""
+           ;;
          esac
          echo $dockerfile
          echo "Dockerfile=${dockerfile}"
@@ -122,7 +132,7 @@ jobs:
          echo $runs_on
          echo $platform
          echo "DOCKERFILE=${dockerfile}" >> $GITHUB_ENV
-         echo "LABEL=${label_extension}" >> $GITHUB_ENV
+         echo "LABEL_EXTENSION=${label_extension}" >> $GITHUB_ENV
          echo "PLATFORM=${platform}" >> $GITHUB_ENV
          echo "DOCKER_VOLUME=${docker_volume}" >> $GITHUB_ENV
          echo "DOCKER_DEVICES=${docker_devices}" >> $GITHUB_ENV
@@ -172,7 +182,7 @@ jobs:
          images: |
            docker.io/huggingface/text-generation-inference-ci
          tags: |
-           type=raw,value=sha-${{ env.GITHUB_SHA_SHORT }}${{ env.LABEL }}
+           type=raw,value=sha-${{ env.GITHUB_SHA_SHORT }}${{ env.LABEL_EXTENSION }}
      # If main, release or tag
      - name: Extract metadata (tags, labels) for Docker
        if: ${{ github.event_name != 'pull_request' }}
@@ -186,10 +196,10 @@ jobs:
            ghcr.io/huggingface/text-generation-inference
            db4c2190dd824d1f950f5d1555fbadf0.azurecr.io/text-generation-inference
          tags: |
-           type=semver,pattern={{version}}${{ env.LABEL }}
-           type=semver,pattern={{major}}.{{minor}}${{ env.LABEL }}
-           type=raw,value=latest${{ env.LABEL }},enable=${{ github.ref == format('refs/heads/{0}', github.event.repository.default_branch) }}
-           type=raw,value=sha-${{ env.GITHUB_SHA_SHORT }}${{ env.LABEL }}
+           type=semver,pattern={{version}}${{ env.LABEL_EXTENSION }}
+           type=semver,pattern={{major}}.{{minor}}${{ env.LABEL_EXTENSION }}
+           type=raw,value=latest${{ env.LABEL_EXTENSION }},enable=${{ github.ref == format('refs/heads/{0}', github.event.repository.default_branch) }}
+           type=raw,value=sha-${{ env.GITHUB_SHA_SHORT }}${{ env.LABEL_EXTENSION }}
      - name: Build and push Docker image
        id: build-and-push
        uses: docker/build-push-action@v4
@@ -200,7 +210,7 @@ jobs:
          platforms: 'linux/amd64'
          build-args: |
            GIT_SHA=${{ env.GITHUB_SHA }}
-           DOCKER_LABEL=sha-${{ env.GITHUB_SHA_SHORT }}${{ env.LABEL }}
+           DOCKER_LABEL=sha-${{ env.GITHUB_SHA_SHORT }}${{ env.LABEL_EXTENSION }}
            PLATFORM=${{ env.PLATFORM }}
            build_type=${{ env.BUILD_TYPE }}
            sccache_gha_enabled=on
@@ -209,23 +219,55 @@ jobs:
          target: ${{ env.TARGET }}
          tags: ${{ steps.meta.outputs.tags || steps.meta-pr.outputs.tags }}
          labels: ${{ steps.meta.outputs.labels || steps.meta-pr.outputs.labels }}
-         cache-from: type=s3,region=us-east-1,bucket=ci-docker-buildx-cache,name=text-generation-inference-cache${{ env.LABEL }},mode=max,access_key_id=${{ secrets.S3_CI_DOCKER_BUILDX_CACHE_ACCESS_KEY_ID }},secret_access_key=${{ secrets.S3_CI_DOCKER_BUILDX_CACHE_SECRET_ACCESS_KEY }},mode=min
-         cache-to: type=s3,region=us-east-1,bucket=ci-docker-buildx-cache,name=text-generation-inference-cache${{ env.LABEL }},mode=min,access_key_id=${{ secrets.S3_CI_DOCKER_BUILDX_CACHE_ACCESS_KEY_ID }},secret_access_key=${{ secrets.S3_CI_DOCKER_BUILDX_CACHE_SECRET_ACCESS_KEY }},mode=min
+         cache-from: type=s3,region=us-east-1,bucket=ci-docker-buildx-cache,name=text-generation-inference-cache${{ env.LABEL_EXTENSION }},mode=max,access_key_id=${{ secrets.S3_CI_DOCKER_BUILDX_CACHE_ACCESS_KEY_ID }},secret_access_key=${{ secrets.S3_CI_DOCKER_BUILDX_CACHE_SECRET_ACCESS_KEY }},mode=min
+         cache-to: type=s3,region=us-east-1,bucket=ci-docker-buildx-cache,name=text-generation-inference-cache${{ env.LABEL_EXTENSION }},mode=min,access_key_id=${{ secrets.S3_CI_DOCKER_BUILDX_CACHE_ACCESS_KEY_ID }},secret_access_key=${{ secrets.S3_CI_DOCKER_BUILDX_CACHE_SECRET_ACCESS_KEY }},mode=min
      - name: Final
        id: final
        run: |
-         echo "docker_image=docker.io/huggingface/text-generation-inference-ci:sha-${{ env.GITHUB_SHA_SHORT}}${{ env.LABEL }}" >> "$GITHUB_OUTPUT"
+         echo "docker_image=docker.io/huggingface/text-generation-inference-ci:sha-${{ env.GITHUB_SHA_SHORT}}${{ env.LABEL_EXTENSION }}" >> "$GITHUB_OUTPUT"
          echo "docker_devices=${{ env.DOCKER_DEVICES }}" >> "$GITHUB_OUTPUT"
          echo "docker_volume=${{ env.DOCKER_VOLUME }}" >> "$GITHUB_OUTPUT"
          echo "runs_on=${{ env.RUNS_ON }}" >> "$GITHUB_OUTPUT"
-         echo "label=${{ env.LABEL }}" >> "$GITHUB_OUTPUT"
+         echo "label_extension=${{ env.LABEL_EXTENSION }}" >> "$GITHUB_OUTPUT"
          echo "extra_pytest=${{ env.EXTRA_PYTEST }}" >> "$GITHUB_OUTPUT"
- integration_tests:
+ precompile_static_models:
    concurrency:
-     group: ${{ github.workflow }}-${{ github.job }}-${{ needs.build-and-push.outputs.label }}-${{ github.head_ref || github.run_id }}
+     group: ${{ github.workflow }}-${{ github.job }}-${{ needs.build-and-push.outputs.label_extension }}-${{ github.head_ref || github.run_id }}
      cancel-in-progress: true
    needs: build-and-push
-   if: needs.build-and-push.outputs.runs_on != 'ubuntu-latest'
+   if: needs.build-and-push.outputs.label_extension == '-neuron'
+   runs-on:
+     group: ${{ needs.build-and-push.outputs.runs_on }}
+   env:
+     PYTEST_FLAGS: ${{ (startsWith(github.ref, 'refs/tags/') || github.ref == 'refs/heads/main' || inputs.release-tests == true) && '--release' || '--release' }}
+   steps:
+     - name: Checkout repository
+       uses: actions/checkout@v4
+     - name: Inject slug/short variables
+       uses: rlespinasse/github-slug-action@v4.4.1
+     - name: Set up Python
+       uses: actions/setup-python@v4
+       with:
+         python-version: "3.11"
+     - name: Install
+       run: |
+         make install-integration-tests
+     - name: Run tests
+       run: |
+         export DOCKER_VOLUME=${{ needs.build-and-push.outputs.docker_volume }}
+         export DOCKER_IMAGE=${{ needs.build-and-push.outputs.docker_image }}
+         export DOCKER_DEVICES=${{ needs.build-and-push.outputs.docker_devices }}
+         export EXTRA_PYTEST="${{ needs.build-and-push.outputs.extra_pytest }}"
+         export HF_TOKEN=${{ secrets.HF_TOKEN_NEURON }}
+         echo $DOCKER_IMAGE
+         docker pull $DOCKER_IMAGE
+         pytest -s -vv integration-tests ${PYTEST_FLAGS} ${EXTRA_PYTEST}
+ integration_tests:
+   concurrency:
+     group: ${{ github.workflow }}-${{ github.job }}-${{ needs.build-and-push.outputs.label_extension }}-${{ github.head_ref || github.run_id }}
+     cancel-in-progress: true
+   needs: [precompile_static_models, build-and-push]
+   if: ${{ always() && !contains(needs.*.result, 'failure') && !contains(needs.*.result, 'cancelled') && needs.build-and-push.outputs.runs_on != 'ubuntu-latest' }}
    runs-on:
      group: ${{ needs.build-and-push.outputs.runs_on }}
    env:
@@ -255,7 +297,7 @@ jobs:
  backend_trtllm_cxx_tests:
    needs: build-and-push
-   if: needs.build-and-push.outputs.label == '-trtllm'
+   if: needs.build-and-push.outputs.label_extension == '-trtllm'
    concurrency:
      group: ${{ github.workflow }}-${{ github.job }}-trtllm-${{ github.head_ref || github.run_id }}
      cancel-in-progress: true
@@ -270,5 +312,5 @@ jobs:
    steps:
      - name: Run C++/CUDA tests
-       if: ${{ env.LABEL == 'ci-runtime' }}
+       if: ${{ env.LABEL_EXTENSION == 'ci-runtime' }}
        run: /usr/local/tgi/bin/tgi_trtllm_backend_tests


@@ -37,7 +37,7 @@ jobs:
      # fail-fast is true by default
      fail-fast: false
      matrix:
-       hardware: ["cuda", "cuda-trtllm", "rocm", "intel-xpu", "intel-cpu"]
+       hardware: ["cuda", "cuda-trtllm", "rocm", "intel-xpu", "intel-cpu", "neuron"]
    uses: ./.github/workflows/build.yaml # calls the one above ^
    permissions:
      contents: write

Dockerfile.neuron (new file, 169 lines)

@@ -0,0 +1,169 @@
# Fetch and extract the TGI sources
FROM alpine AS tgi
RUN mkdir -p /tgi
# Fetch the optimum-neuron sources directly to avoid relying on pypi deployments
FROM alpine AS optimum-neuron
RUN mkdir -p /optimum-neuron
ADD https://github.com/huggingface/optimum-neuron/archive/refs/tags/v0.0.28.tar.gz /optimum-neuron/sources.tar.gz
RUN tar -C /optimum-neuron -xf /optimum-neuron/sources.tar.gz --strip-components=1
# Build cargo components (adapted from TGI original Dockerfile)
# Note: we cannot use the cargo-chef base image as it uses python 3.11
FROM ubuntu:22.04 AS chef
RUN apt-get update -y \
&& apt-get install -y --no-install-recommends \
curl ca-certificates build-essential \
&& rm -rf /var/lib/apt/lists/* \
&& apt-get clean
RUN curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- --default-toolchain 1.80.1 --profile minimal -y
ENV PATH="/root/.cargo/bin:${PATH}"
RUN cargo install cargo-chef --locked
WORKDIR /usr/src
ARG CARGO_REGISTRIES_CRATES_IO_PROTOCOL=sparse
FROM chef AS planner
COPY backends/neuron/Cargo.toml Cargo.toml
COPY Cargo.lock Cargo.lock
COPY rust-toolchain.toml rust-toolchain.toml
COPY proto proto
COPY router router
COPY backends backends
COPY launcher launcher
RUN cargo chef prepare --recipe-path recipe.json
FROM chef AS builder
RUN apt-get update -y \
&& apt-get install -y --no-install-recommends \
unzip python3-dev libssl-dev pkg-config \
&& rm -rf /var/lib/apt/lists/* \
&& apt-get clean
RUN PROTOC_ZIP=protoc-21.12-linux-x86_64.zip && \
curl -OL https://github.com/protocolbuffers/protobuf/releases/download/v21.12/$PROTOC_ZIP && \
unzip -o $PROTOC_ZIP -d /usr/local bin/protoc && \
unzip -o $PROTOC_ZIP -d /usr/local 'include/*' && \
rm -f $PROTOC_ZIP
COPY backends/neuron/Cargo.toml Cargo.toml
COPY --from=planner /usr/src/recipe.json recipe.json
RUN cargo chef cook --release --recipe-path recipe.json
COPY Cargo.lock Cargo.lock
COPY rust-toolchain.toml rust-toolchain.toml
COPY proto proto
COPY router router
COPY backends backends
COPY launcher launcher
RUN cargo build --release
# Python base image
FROM ubuntu:22.04 AS base
RUN apt-get update -y \
&& apt-get install -y --no-install-recommends \
python3-pip \
python3-setuptools \
python-is-python3 \
&& rm -rf /var/lib/apt/lists/* \
&& apt-get clean
RUN pip3 --no-cache-dir install --upgrade pip
# Python server build image
FROM base AS pyserver
RUN apt-get update -y \
&& apt-get install -y --no-install-recommends \
make \
python3-venv \
&& rm -rf /var/lib/apt/lists/* \
&& apt-get clean
RUN install -d /pyserver
WORKDIR /pyserver
COPY backends/neuron/server server
COPY proto proto
RUN pip3 install -r server/build-requirements.txt
RUN VERBOSE=1 BUILDDIR=/pyserver/build PROTODIR=/pyserver/proto make -C server package
# Neuron base image (used for deployment)
FROM base AS neuron
# Install system prerequisites
RUN apt-get update -y \
&& apt-get install -y --no-install-recommends \
gnupg2 \
wget \
python3-dev \
libexpat1 \
&& rm -rf /var/lib/apt/lists/* \
&& apt-get clean
RUN echo "deb https://apt.repos.neuron.amazonaws.com jammy main" > /etc/apt/sources.list.d/neuron.list
RUN wget -qO - https://apt.repos.neuron.amazonaws.com/GPG-PUB-KEY-AMAZON-AWS-NEURON.PUB | apt-key add -
# Install neuronx packages
RUN apt-get update -y \
&& apt-get install -y --no-install-recommends \
aws-neuronx-dkms=2.18.20.0 \
aws-neuronx-collectives=2.22.33.0-d2128d1aa \
aws-neuronx-runtime-lib=2.22.19.0-5856c0b42 \
aws-neuronx-tools=2.19.0.0 \
libxml2 \
&& rm -rf /var/lib/apt/lists/* \
&& apt-get clean
ENV PATH="/opt/bin/:/opt/aws/neuron/bin:${PATH}"
# Install manually torch CPU version to avoid pulling CUDA
RUN pip3 install \
torch==2.1.2 \
torchvision==0.16.2 \
--index-url https://download.pytorch.org/whl/cpu
RUN pip3 install \
neuronx-cc==2.15.143.0 \
torch-neuronx==2.1.2.2.3.2 \
transformers-neuronx==0.12.313 \
neuronx-distributed==0.9.0 \
libneuronxla==2.0.5347.0 \
--extra-index-url=https://pip.repos.neuron.amazonaws.com
# Install HuggingFace packages
RUN pip3 install \
hf_transfer huggingface_hub
# Install optimum-neuron
COPY --from=optimum-neuron /optimum-neuron optimum-neuron
RUN pip3 install ./optimum-neuron
# TGI base env
ENV HUGGINGFACE_HUB_CACHE=/tmp \
HF_HUB_ENABLE_HF_TRANSFER=1 \
PORT=80
# Disable color logs as they are not supported by CloudWatch
ENV LOGURU_COLORIZE=NO
ENV LOG_COLORIZE=0
# Install router
COPY --from=builder /usr/src/target/release/text-generation-router-v2 /usr/local/bin/text-generation-router
# Install launcher
COPY --from=builder /usr/src/target/release/text-generation-launcher /usr/local/bin/text-generation-launcher
# Install python server
COPY --from=pyserver /pyserver/build/dist dist
RUN pip install dist/text_generation_server*.tar.gz
# Final image
FROM neuron
COPY backends/neuron/tgi_env.py /tgi_env.py
COPY backends/neuron/tgi-entrypoint.sh /tgi-entrypoint.sh
RUN chmod +x /tgi-entrypoint.sh
ENTRYPOINT ["/tgi-entrypoint.sh"]


@@ -0,0 +1,47 @@
[workspace]
members = [
"backends/v2",
"backends/grpc-metadata",
"launcher",
"router"
]
default-members = [
"backends/v2",
"backends/grpc-metadata",
"launcher",
"router"
]
resolver = "2"
[workspace.package]
version = "3.0.0"
edition = "2021"
authors = ["Olivier Dehaene"]
homepage = "https://github.com/huggingface/text-generation-inference"
[workspace.dependencies]
base64 = "0.22.0"
tokenizers = { version = "0.20.0", features = ["http"] }
hf-hub = { version = "0.3.1", features = ["tokio"] }
metrics = { version = "0.23.0" }
metrics-exporter-prometheus = { version = "0.15.1", features = [] }
minijinja = { version = "2.2.0", features = ["json"] }
minijinja-contrib = { version = "2.0.2", features = ["pycompat"] }
pyo3 = { version = "0.22.2", features = ["auto-initialize"] }
[profile.release]
incremental = true
[profile.release-binary]
inherits = "release"
debug = 1
incremental = true
panic = "abort"
[profile.release-opt]
inherits = "release"
debug = 0
incremental = false
lto = "fat"
opt-level = 3
codegen-units = 1

backends/neuron/Makefile (new file, 34 lines)

@@ -0,0 +1,34 @@
# Copyright 2025 The HuggingFace Team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
mkfile_path := $(abspath $(lastword $(MAKEFILE_LIST)))
mkfile_dir := $(dir $(mkfile_path))
root_dir := "${mkfile_dir}/../.."
.PHONY: image install_server test_server test_integration
VERSION := $(shell gawk 'match($$0, /^version = "(.*)"/, a) {print a[1]}' ${root_dir}/Cargo.toml)
image:
docker build --rm -f ${root_dir}/Dockerfile.neuron \
--ulimit nofile=100000:100000 \
--build-arg VERSION=$(VERSION) \
-t text-generation-inference:$(VERSION)-neuron ${root_dir}
install_server:
make -C ${mkfile_dir}/server install VERSION:=${VERSION}
test_server: install_server
python -m pip install -r ${mkfile_dir}/tests/requirements.txt
python -m pytest -sv ${mkfile_dir}/tests/server

backends/neuron/README.md (new file, 25 lines)

@@ -0,0 +1,25 @@
# Text-generation-inference - Neuron backend for AWS Trainium and Inferentia2
## Description
This is the TGI backend for the AWS Trainium and Inferentia family of chips.
This backend is composed of:
- the AWS Neuron SDK,
- the legacy v2 TGI launcher and router,
- a neuron-specific inference server for text generation.
## Usage
Please refer to the official [documentation](https://huggingface.co/docs/text-generation-inference/backends/neuron).
## Build your own image
The simplest way to build TGI with the neuron backend is to use the provided `Makefile`:
```shell
$ make -C backends/neuron image
```
Alternatively, you can build the image directly from the top directory using a command similar to the one defined
in the `Makefile` under the `image` target.

backends/neuron/server/.gitignore (new file, 1 line)

@@ -0,0 +1 @@
build


@@ -0,0 +1,74 @@
# Initialize base variables
SHELL := /bin/bash
pkg_name := text_generation_server
BUILDDIR ?= $(CURDIR)/build
VERSION ?= 0.0.1
mkfile_path := $(abspath $(lastword $(MAKEFILE_LIST)))
mkfile_dir := $(dir $(mkfile_path))
pkg_dir := $(BUILDDIR)/$(pkg_name)
py_version := $(subst -,.,${VERSION})
pkg_dist := ${BUILDDIR}/dist/${pkg_name}-$(py_version).tar.gz
clean:
rm -rf $(BUILDDIR)/*
${BUILDDIR}:
install -d $@
# List static sources to be deployed in the package
src_dir := $(mkfile_dir)/$(pkg_name)
sources := $(wildcard $(src_dir)/*.py)
deployed_sources := $(subst $(src_dir), $(pkg_dir), $(sources))
# Static files are just copied
define COPY
cp -f $< $@
endef
# We use a PHONY target to represent the VERSION
.PHONY: VERSION
VERSION: ${BUILDDIR}
# The trick is to compare the value of the variable with the content of a file in the build directory
@if [[ `cat ${BUILDDIR}/VERSION 2>&1` != '$(VERSION)' ]]; then echo -n $(VERSION) >${BUILDDIR}/VERSION; fi
# Depending on the PHONY VERSION target makes sure the pyproject.toml is regenerated if the version changes
$(BUILDDIR)/pyproject.toml: $(mkfile_dir)/pyproject.toml VERSION
mkdir -p $(BUILDDIR)
$(COPY)
sed -i -e 's/version = "VERSION"/version = \"${VERSION}\"/' $@
$(pkg_dir)/%.py: $(src_dir)/%.py
mkdir -p $(pkg_dir)
$(COPY)
# Generated files are produced by grpcio tools
# If not provided, get local proto files
ifndef PROTODIR
PROTODIR := $(mkfile_dir)/../../../proto
endif
# Three python files are generated for each protobuf
protobufs := $(PROTODIR)/generate.proto
pkg_pb_dir := $(pkg_dir)/pb
generated_sources_base := $(foreach proto, $(protobufs), $(proto:.proto=_pb2.py))
generated_sources := $(subst $(PROTODIR), $(pkg_pb_dir), $(generated_sources_base))
generated_sources += $(subst $(PROTODIR), $(pkg_pb_dir), $(generated_sources_base:.py=.pyi))
generated_sources += $(subst $(PROTODIR), $(pkg_pb_dir), $(generated_sources_base:.py=_grpc.py))
$(pkg_pb_dir)/%_pb2.py $(pkg_pb_dir)/%_pb2.pyi $(pkg_pb_dir)/%_pb2_grpc.py: $(PROTODIR)/%.proto
mkdir -p $(pkg_pb_dir)
python -m grpc_tools.protoc -I$(PROTODIR) --python_out=$(pkg_pb_dir) \
--grpc_python_out=$(pkg_pb_dir) --mypy_out=$(pkg_pb_dir) $^
sed -i -e 's/^\(import.*pb2\)/from . \1/g' $(pkg_pb_dir)/$*_pb2_grpc.py
${pkg_dist}: $(BUILDDIR)/pyproject.toml $(deployed_sources) $(generated_sources)
python -m build $(BUILDDIR)
package: ${pkg_dist}
install: ${pkg_dist}
python3 -m pip uninstall -y ${pkg_name}
python3 -m pip install ${pkg_dist}


@@ -0,0 +1,3 @@
build
grpcio-tools==1.53.0
mypy-protobuf


@@ -0,0 +1,26 @@
[build-system]
requires = ["setuptools>=61.0"]
build-backend = "setuptools.build_meta"
[project]
name = "text-generation-server"
version = "VERSION"
authors = [{name="David Corvoysier", email="david@huggingface.co" }]
description = "TGI compatible inference server for AWS Neuronx platforms"
dependencies = [
'protobuf > 3.20.1, < 4',
'grpcio == 1.57.0',
'grpcio-status == 1.48.2',
'grpcio-reflection == 1.48.2',
'grpc-interceptor == 0.15.2',
'typer == 0.6.1',
'safetensors',
'loguru == 0.6.0',
'optimum-neuron[neuronx] >= 0.0.28',
]
[tool.setuptools]
packages = ["text_generation_server", "text_generation_server.pb"]
[project.scripts]
text-generation-server = 'text_generation_server.cli:app'


@@ -0,0 +1,111 @@
import sys
from typing import Optional
import typer
from loguru import logger
app = typer.Typer()
@app.command()
def serve(
model_id: str,
revision: Optional[str] = None,
sharded: bool = False,
trust_remote_code: bool = None,
uds_path: str = "/tmp/text-generation-server",
logger_level: str = "INFO",
json_output: bool = False,
otlp_endpoint: Optional[str] = None,
otlp_service_name: str = "text-generation-inference.server",
max_input_tokens: Optional[int] = None,
):
"""This is the main entry-point for the server CLI.
Args:
model_id (`str`):
The *model_id* of a model on the HuggingFace hub or the path to a local model.
revision (`Optional[str]`, defaults to `None`):
The revision of the model on the HuggingFace hub.
sharded (`bool`):
Whether the model must be sharded or not. Kept for compatibility with the
text-generation-launcher, but must be set to False.
trust_remote_code (`bool`):
Kept for compatibility with text-generation-launcher. Ignored.
uds_path (`Union[Path, str]`):
The local path on which the server will expose its google RPC services.
logger_level (`str`):
The server logger level. Defaults to *INFO*.
json_output (`bool`):
Use JSON format for log serialization.
otlp_endpoint (`Optional[str]`, defaults to `None`):
The Open Telemetry endpoint to use.
otlp_service_name (`Optional[str]`, defaults to `None`):
The name to use when pushing data to the Open Telemetry endpoint.
max_input_tokens (`Optional[int]`, defaults to `None`):
The maximum number of input tokens each request should contain.
"""
if sharded:
raise ValueError("Sharding is not supported.")
# Remove default handler
logger.remove()
logger.add(
sys.stdout,
format="{message}",
filter="text_generation_server",
level=logger_level,
serialize=json_output,
backtrace=True,
diagnose=False,
)
if trust_remote_code is not None:
logger.warning("'trust_remote_code' argument is not supported and will be ignored.")
# Import here after the logger is added to log potential import exceptions
from .server import serve
serve(model_id, revision, uds_path)
@app.command()
def download_weights(
model_id: str,
revision: Optional[str] = None,
logger_level: str = "INFO",
json_output: bool = False,
auto_convert: Optional[bool] = None,
extension: Optional[str] = None,
trust_remote_code: Optional[bool] = None,
merge_lora: Optional[bool] = None,
):
"""Download the model weights.
This command will be called by text-generation-launcher before serving the model.
"""
# Remove default handler
logger.remove()
logger.add(
sys.stdout,
format="{message}",
filter="text_generation_server",
level=logger_level,
serialize=json_output,
backtrace=True,
diagnose=False,
)
if extension is not None:
logger.warning("'extension' argument is not supported and will be ignored.")
if trust_remote_code is not None:
logger.warning("'trust_remote_code' argument is not supported and will be ignored.")
if auto_convert is not None:
logger.warning("'auto_convert' argument is not supported and will be ignored.")
if merge_lora is not None:
logger.warning("'merge_lora' argument is not supported and will be ignored.")
# Import here after the logger is added to log potential import exceptions
from .model import fetch_model
fetch_model(model_id, revision)


@@ -0,0 +1,636 @@
import copy
import logging
import time
from abc import ABC
from enum import Enum
from typing import List, Optional, Tuple
import torch
from loguru import logger
from transformers import AutoConfig, AutoTokenizer, PreTrainedTokenizerBase
from transformers.generation import GenerationConfig
from optimum.neuron import NeuronModelForCausalLM
from optimum.neuron.generation import TokenSelector
from .model import get_export_kwargs_from_env
from .pb.generate_pb2 import (
Batch,
CachedBatch,
FinishReason,
GeneratedText,
Generation,
InfoResponse,
Request,
Tokens,
)
# Disable optimum-neuron warnings as it seems to block the server after a while
optimum_logger = logging.getLogger("optimum.neuron")
optimum_logger.setLevel("CRITICAL")
class Generator(ABC):
"""An abstract class to represent the workhorse behind TextGenerationService.
Ideally, it should not rely on protobuf constructs, but in a first step it does.
Implementations would typically need a model and a tokenizer to implement the Generator methods.
"""
@property
def info(self) -> InfoResponse:
"""This should simply return the expected InfoResponse"""
raise NotImplementedError
def warmup(self, batch: Batch) -> int:
"""Verify if the hardware can support the target load.
Args:
batch (`Batch`):
A batch corresponding to the maximum number of concurrent requests.
Return:
The maximum number of tokens the model supports.
"""
raise NotImplementedError
def prefill(self, batch: Batch) -> Tuple[List[Generation], CachedBatch]:
"""Prefill is called whenever new requests need to be added.
When this method returns successfully, a decode method will follow
with both the current and newly prefilled batch(es).
Args:
batch (`Batch`):
A batch containing the new requests.
Return:
A list of `Generation` for each request and a `CachedBatch` containing all pending requests.
"""
raise NotImplementedError
def decode(self, batches: List[Batch]) -> Tuple[List[Generation], CachedBatch]:
"""Decode after a prefill or another decode."""
raise NotImplementedError
def filter(self, batch_id: int, request_ids: List[int]) -> CachedBatch:
"""Remove requests that are not listed from the specified batch"""
raise NotImplementedError
def clear(self):
"""Remove all requests from the generator"""
raise NotImplementedError
@classmethod
def from_pretrained(cls, model_id: str, revision: Optional[str]):
"""Factory method "a la transformers" """
raise NotImplementedError
class Slot:
"""Represents a slot in a static batch"""
class State(Enum):
EMPTY = 0
PAUSE = 1
READY = 2
def __init__(self, id: int, tokenizer: PreTrainedTokenizerBase):
self._id = id
self._tokenizer = tokenizer
self.clear()
def clear(self):
"""Clear the slot and mark it as available."""
self._state = Slot.State.EMPTY
self._batch_id = None
self._request_id = None
self._inputs = ""
self._truncate = 0
self._generation_config = None
self._tokens = []
self._mask = torch.tensor([])
self._selector = None
self._generated_tokens = 0
self._next_text_token_start = 0
self._next_text_token_end = 0
self._generated_text = ""
self._next_text = ""
@property
def id(self) -> int:
return self._id
@property
def state(self) -> "Slot.State":
return self._state
@property
def batch_id(self) -> int:
return self._batch_id
@property
def request_id(self) -> int:
return self._request_id
@property
def cached_text(self) -> str:
return self._inputs + self._generated_text
@property
def generation_config(self) -> GenerationConfig:
return self._generation_config
@property
def generated_tokens(self) -> int:
return self._generated_tokens
def assign(self, batch_id: int, request: Request, generation_config: GenerationConfig):
"""Assign a request to a slot.
Args:
request (`Request`):
The request to be assigned. Contains the inputs and tokens selection parameters.
generation_config (`transformers.GenerationConfig`):
The base generation config (might be modified by the request generation parameters).
"""
self._state = Slot.State.READY
self._batch_id = batch_id
self._request_id = request.id
self._inputs = request.inputs
if request.truncate:
self._truncate = request.truncate
self._generation_config = copy.deepcopy(generation_config)
# Update generation config with request parameters
self._generation_config.do_sample = request.parameters.do_sample
if self._generation_config.do_sample:
if request.parameters.temperature != 0:
self._generation_config.temperature = request.parameters.temperature
if request.parameters.top_k != 0:
self._generation_config.top_k = request.parameters.top_k
if request.parameters.top_p != 0:
self._generation_config.top_p = request.parameters.top_p
if request.parameters.typical_p != 0:
self._generation_config.typical_p = request.parameters.typical_p
if request.parameters.repetition_penalty != 0:
self._generation_config.repetition_penalty = request.parameters.repetition_penalty
self.seed = request.parameters.seed
self._generation_config.max_new_tokens = request.stopping_parameters.max_new_tokens
self._max_new_tokens = self._generation_config.max_new_tokens
stop_strings = request.stopping_parameters.stop_sequences
if stop_strings:
self._generation_config.stop_strings = stop_strings
def reset(self, input_ids: torch.LongTensor, attention_mask: torch.LongTensor, selector: TokenSelector):
"""Reset the slot for the next generation.
Args:
input_ids: (`torch.LongTensor`):
The new input_ids to use to generate the next token.
attention_mask: (`torch.LongTensor`):
The new attention_mask to use to generate the next token.
selector: (`optimum.neuron.generation.TokenSelector`):
An object implementing the updated token selection logic.
"""
self._tokens = input_ids.clone()
self._next_text_token_start = 0
self._next_text_token_end = torch.numel(self._tokens)
self._next_text = ""
self._mask = attention_mask.clone()
self._selector = selector
def pause(self, reset_on_pause: bool):
"""Mark the current slot as paused for generation.
Note that the KV cache for this slot will still be filled.
"""
if reset_on_pause:
# Drop the last token as it will be added back when resuming the slot
self._generated_tokens -= 1
# Since generated tokens are now part of the prefill, we need to reevaluate
# max_new_tokens for the next generation
self._generation_config.max_new_tokens = self._max_new_tokens - self._generated_tokens
self._state = Slot.State.PAUSE
def resume(self):
"""Mark the slot as ready for generation."""
self._state = Slot.State.READY
def _decode_next_tokens(
self,
) -> str:
"""Hack to hopefully support generate_stream for the maximum number of tokenizers"""
# We need to include the tokens that produced the last text to defeat cleanup algorithms in the decode
# which decide to add a space or not depending on the surrounding ids.
new_text = self._tokenizer.decode(self._tokens[self._next_text_token_start :], skip_special_tokens=False)
if new_text.endswith("�"):
# utf-8 char at the end means it's a potential unfinished byte sequence
# from byte fallback tokenization.
return ""
# Compare the generated text with the one using only the tokens producing the last one
last_text = self._tokenizer.decode(
self._tokens[self._next_text_token_start : self._next_text_token_end],
skip_special_tokens=False,
)
if len(new_text) == len(last_text):
# Nothing new was actually generated
return ""
# Return the decoded text and store its token offsets
self._next_text_token_start = self._next_text_token_end
self._next_text_token_end = torch.numel(self._tokens)
return new_text[len(last_text) :]
def append(self, next_token: int) -> str:
"""Append a new generated token to this slot
The new token is added to the list of generated tokens, which impacts
directly the generated_text and stopped property.
The new token is however not added immediately to the slot inputs: it will
be added later on when it has effectively been used to produce the next token.
Args:
next_token (`int`):
The newly generated token.
Return:
The corresponding decoded text (if any).
"""
self._tokens = torch.cat([self._tokens, torch.LongTensor([next_token])])
self._mask = torch.cat([self._mask, torch.LongTensor([1])])
self._generated_tokens += 1
next_text = self._decode_next_tokens()
# Now that a new token has been generated, we can append the previous one to the generated text
self._generated_text += self._next_text
self._next_text = next_text
return next_text
def select(self, input_ids: torch.LongTensor, logits: torch.Tensor) -> torch.LongTensor:
"""Select the next token from the candidate logits.
Args:
input_ids (`torch.LongTensor` of shape `(batch_size, sequence_length)`):
The sequence used as a prompt for the generation (not used in all generation modes).
logits (`torch.Tensor` of shape `(batch_size, sequence_length)`):
The logits corresponding to the generated tokens.
Return:
`torch.LongTensor`: A scalar `torch.LongTensor` containing the selected token.
"""
return self._selector.select(input_ids, logits)[0]
@property
def stopped(self) -> bool:
# Transformers stopping criteria expects a batch of input ids
input_ids = torch.unsqueeze(self._tokens, dim=0)
return self._selector.stopping_criteria(input_ids, None)
@property
def generated_text(self) -> str:
return self._generated_text + self._next_text
@property
def next_token(self) -> int:
return None if len(self._tokens) == 0 else self._tokens[-1]
@property
def attention_mask(self) -> torch.LongTensor:
return self._mask
@property
def max_token(self) -> int:
return self._generation_config.max_length
@property
def max_new_tokens(self) -> int:
# The current value of max_new_tokens: might be different from the target max_new_tokens
# if the slot has been paused and resumed.
return self._generation_config.max_new_tokens
@property
def truncate(self) -> int:
return self._truncate
class NeuronGenerator(Generator):
"""A Generator for Neuron models."""
def __init__(
self,
model: NeuronModelForCausalLM,
tokenizer: PreTrainedTokenizerBase,
):
self.model = model
self.rebuild_cache_on_prefill = not self.model.continuous_batching
# Specify padding and truncation options for decoder-only architecture
tokenizer.pad_token_id = tokenizer.eos_token_id
tokenizer.padding_side = "left"
tokenizer.truncation_side = "left"
self.tokenizer = tokenizer
self.special_tokens = self.tokenizer.all_special_ids
self.slots = [Slot(i, tokenizer) for i in range(self.model.batch_size)]
self.batch_id = 0
@property
def info(self) -> InfoResponse:
"""Returns the expected InfoResponse."""
dtype = getattr(self.model.config, "torch_dtype", "float32")
return InfoResponse(
requires_padding=True,
dtype=str(dtype),
device_type="xla",
)
def warmup(self, batch: Batch) -> int:
"""Verify if the hardware can support the target load.
Args:
batch (`Batch`):
A batch corresponding to the maximum number of concurrent requests.
Return:
The maximum number of tokens the model supports.
"""
# Just check that the warmup request parameters match the model capacity
batch_size = self.model.batch_size
if len(batch.requests) > batch_size:
raise ValueError(
f"Inconsistent batch_size configuration: Please make sure the batch_size in the compiled model (currently {batch_size}) matches the batch_size passed to TGI. The compiled model batch_size is usually in the neuron section of the model config.json file. You may also have passed it into optimum-cli during the compilation process. The batch size for TGI is usually set in the environment as MAX_BATCH_SIZE."
)
self.prefill(batch)
self.clear()
return self.model.batch_size * self.model.max_length
def prefill(self, batch: Batch) -> Tuple[List[Generation], CachedBatch]:
"""Prefill new requests.
Args:
batch (`Batch`):
A batch containing the new requests.
Return:
A list of `Generation` for each request and a `CachedBatch` containing all pending requests.
"""
slots = {state: [] for state in Slot.State}
for slot in self.slots:
slots[slot.state].append(slot)
active_slots = slots[Slot.State.READY]
empty_slots = slots[Slot.State.EMPTY]
if len(empty_slots) < len(batch.requests):
raise ValueError(
f"Cannot prefill {len(batch.requests)} new request(s) with only {len(empty_slots)} empty slots."
f" Please align max_batch_size with the static batch size: {self.model.batch_size}."
)
# Assign each request to an empty slot
logger.debug(f"Prefilling {len(batch.requests)} new request(s) with {len(empty_slots)} empty slot(s)")
new_slots = []
for request in batch.requests:
slot = empty_slots.pop()
slot.assign(self.batch_id, request, self.model.generation_config)
new_slots.append(slot)
logger.debug(
f"Request {slot.request_id} assigned to slot {slot.id} with and max_new_tokens {slot.max_new_tokens}"
)
if self.rebuild_cache_on_prefill:
# We will clear pending slots and prefill all slots
prefill_slots = self.slots
seq_ids = None
else:
# We only need to pass inputs for the new requests
prefill_slots = new_slots
seq_ids = torch.tensor([slot.id for slot in prefill_slots])
# Reconstruct the full inputs (without padding) as seen by the model.
# This comprises:
# - the inputs for new requests,
# - only when rebuilding the cache, the inputs and the generated text that has already
# been cached (i.e. excluding the last generated token) for unfinished requests.
inputs = []
max_length = 0
for slot in prefill_slots:
inputs.append(slot.cached_text)
# Apply truncation, making sure we fit into static dimensions
if slot.truncate == 0:
max_length = self.model.max_length
elif slot.truncate > max_length and slot.truncate < self.model.max_length:
max_length = slot.truncate
# Tokenize with padding and truncation
padded_inputs = self.tokenizer(
inputs, return_tensors="pt", padding=True, truncation=True, max_length=max_length
)
input_ids = padded_inputs.input_ids
attention_mask = padded_inputs.attention_mask
# Pause previously active slots during generation
next_tokens = []
for slot in active_slots:
slot.pause(reset_on_pause=self.rebuild_cache_on_prefill)
if self.rebuild_cache_on_prefill:
# The slot will be reset, so we need to store its next token
next_tokens.append(slot.next_token)
# Each slot must be reset with the padded inputs and masks
for i, slot in enumerate(prefill_slots):
if slot.state != slot.state.EMPTY:
if slot.truncate > 0 and slot.truncate < input_ids.shape[-1]:
# Apply per-request truncation
input_ids[i, : -slot.truncate] = self.tokenizer.pad_token_id
attention_mask[i, : -slot.truncate] = 0
slot_input_ids = input_ids[i : i + 1, :]
# Padded input ids are also required to set logits processors and stopping criterias
selector = TokenSelector.create(
slot_input_ids,
slot.generation_config,
self.model,
self.model.max_length,
tokenizer=self.tokenizer,
seed=slot.seed,
)
slot_input_ids = slot_input_ids.squeeze(dim=0).type(torch.int64)
slot_attention_mask = attention_mask[i]
slot.reset(slot_input_ids, slot_attention_mask, selector)
# Note: when rebuilding cache on prefill, the new tokens on paused slots will be ignored,
# as they have already been generated and sent back in the last decode.
model_inputs = self.model.prepare_inputs_for_prefill(input_ids, attention_mask, seq_ids)
logits = self.model(**model_inputs)[0]
generation, next_batch = self._generate_token(prefill_slots, self.batch_id, logits, input_ids)
self.batch_id += 1
# Reactivate previously active slots for the next decode
for i, slot in enumerate(active_slots):
slot.resume()
if self.rebuild_cache_on_prefill:
# Append back the next token
slot.append(next_tokens[i])
logger.debug("Model ready for decoding")
if next_batch is not None:
logger.debug(f"Next batch is {next_batch.id} with requests: {next_batch.request_ids}")
return generation, next_batch
def decode(self, batches: List[CachedBatch]) -> Tuple[List[Generation], CachedBatch]:
"""Decode the specified prefilled requests.
Args:
batches (`List[CachedBatch]`):
A list of previous batches containing the prefilled requests.
Return:
A list of `Generation` for each request and a `CachedBatch` containing all pending requests.
"""
# batches contains a list composed of:
# - the batch id returned by the last decode,
# - the batch id(s) returned by the last prefill(s)
# Batches are always concatenated during prefill, so we can
# just carry on with decoding. We adopt the id of the first
# batch in the list as our next batch id.
next_batch_id = batches[0].id
request_ids = []
for batch in batches:
request_ids += batch.request_ids
cleared_request_ids = []
for slot in self.slots:
if slot.state == slot.State.READY and slot.request_id not in request_ids:
cleared_request_ids.append(slot.request_id)
slot.clear()
if len(cleared_request_ids) > 0:
logger.info(f"Clearing slot for requests {cleared_request_ids} as they are not requested.")
active_slots = [slot for slot in self.slots if slot.state == slot.State.READY]
if len(active_slots) < len(request_ids):
raise ValueError("Unable to decode tokens for non-prefilled batches (probably due to a previous failure)")
if self.model.continuous_batching:
decode_slots = active_slots
seq_ids = torch.tensor([slot.id for slot in decode_slots])
else:
decode_slots = self.slots
seq_ids = None
# Reconstruct input_ids and attention_mask from decode slots
n_slots = len(decode_slots)
input_ids = torch.full([n_slots, 1], fill_value=self.tokenizer.eos_token_id, dtype=torch.int64)
max_length = 0
for slot in decode_slots:
max_length = max(max_length, slot.attention_mask.size(-1))
attention_mask = torch.zeros([n_slots, max_length], dtype=torch.int64)
for i, slot in enumerate(decode_slots):
if slot.state != Slot.State.EMPTY:
# input_ids are simply the tokens generated by the last decode or prefill requests (other tokens are cached)
input_ids[i, 0] = slot.next_token
attention_mask[i, : slot.attention_mask.size(-1)] = slot.attention_mask
model_inputs = self.model.prepare_inputs_for_decode(input_ids, attention_mask, seq_ids)
logits = self.model(**model_inputs)[0]
return self._generate_token(decode_slots, next_batch_id, logits, input_ids)
def _generate_token(
self, slots: List[Slot], next_batch_id: int, logits: torch.Tensor, input_ids: torch.LongTensor
) -> Tuple[List[Generation], CachedBatch]:
generations = []
active_slots = False
for i, slot in enumerate(slots):
if slot.state != Slot.State.READY:
continue
request_id = slot.request_id
next_token_logits = logits[i : i + 1, -1, :]
slot_input_ids = input_ids[i : i + 1, :]
next_token = slot.select(slot_input_ids, next_token_logits)
next_token_text = slot.append(next_token)
generated_text = None
finish_reason = None
if next_token == self.tokenizer.eos_token_id:
finish_reason = FinishReason.FINISH_REASON_EOS_TOKEN
elif slot.stopped:
if slot.generated_tokens == slot.max_new_tokens:
finish_reason = FinishReason.FINISH_REASON_LENGTH
else:
finish_reason = FinishReason.FINISH_REASON_STOP_SEQUENCE
if finish_reason is not None:
# We must include the generated text for each finished sequence in the response
generated_text = GeneratedText(
text=slot.generated_text, generated_tokens=slot.generated_tokens, finish_reason=finish_reason
)
logger.debug(f"Decode complete for request {request_id} with {slot.generated_tokens} tokens")
# mark the slot as available
slot.clear()
else:
active_slots = True
generations.append(
Generation(
request_id=request_id,
prefill_tokens=None,
tokens=Tokens(
ids=[next_token],
logprobs=[0],
texts=[next_token_text],
is_special=[next_token in self.special_tokens],
),
generated_text=generated_text,
)
)
batch = None
if active_slots:
# Whatever initial batch these requests came from, we always return all pending requests in a single batch
request_ids = [slot.request_id for slot in self.slots if slot.state == Slot.State.READY]
batch = self._cached_batch(next_batch_id, request_ids)
else:
logger.debug("No more pending requests")
return generations, batch
def _cached_batch(self, batch_id: int, request_ids: List):
size = len(request_ids)
max_tokens = size * self.model.max_length
return CachedBatch(id=batch_id, request_ids=request_ids, size=size, max_tokens=max_tokens)
def filter(self, batch_id: int, keep_request_ids: List[int]) -> CachedBatch:
"""Remove requests that are not listed from the specified batch
Args:
batch_id (`int`):
The id of a cached batch.
keep_request_ids (`List[int]`):
The list of requests that must be kept.
Return:
A `CachedBatch` containing the pending requests.
"""
keep_slot_ids = [slot.id for slot in self.slots if slot.request_id in keep_request_ids]
self._clear(keep_slot_ids)
return self._cached_batch(batch_id, keep_request_ids)
def clear(self, batch_id: Optional[int] = None):
"""Remove a subset or all requests from the generator"""
keep_ids = []
if batch_id is not None:
keep_ids = [slot.id for slot in self.slots if slot.batch_id != batch_id]
return self._clear(keep_ids)
def _clear(self, keep_slot_ids: List):
for slot in self.slots:
if slot.state != Slot.State.EMPTY and slot.id not in keep_slot_ids:
logger.debug(f"Removing slot {slot.id} with request {slot.request_id}")
slot.clear()
@classmethod
def from_pretrained(cls, model_id: str, revision: str = None):
"""Instantiate a NeuronGenerator.
Args:
model_id (`str`):
A hub model id or the path to a local model. This path must also contain a Tokenizer.
revision (`Optional[str]`, defaults to `None`):
The revision of the model on the HuggingFace hub.
Returns:
A NeuronGenerator.
"""
config = AutoConfig.from_pretrained(model_id)
neuron_config = getattr(config, "neuron", None)
start = time.time()
if neuron_config is None:
export_kwargs = get_export_kwargs_from_env()
logger.info(f"Exporting model to neuron with config: {export_kwargs}.")
model = NeuronModelForCausalLM.from_pretrained(
model_id, revision=revision, low_cpu_mem_usage=True, export=True, **export_kwargs
)
else:
logger.info("Loading model on neuron devices (this can take a few minutes).")
model = NeuronModelForCausalLM.from_pretrained(model_id, low_cpu_mem_usage=True, revision=revision)
end = time.time()
logger.info(f"Model successfully loaded in {end - start:.2f} s.")
tokenizer = AutoTokenizer.from_pretrained(model_id, revision=revision)
return cls(model, tokenizer)


@@ -0,0 +1,27 @@
from typing import Any, Callable
import grpc
from google.rpc import code_pb2, status_pb2
from grpc_interceptor.server import AsyncServerInterceptor
from grpc_status import rpc_status
from loguru import logger
class ExceptionInterceptor(AsyncServerInterceptor):
async def intercept(
self,
method: Callable,
request_or_iterator: Any,
context: grpc.ServicerContext,
method_name: str,
) -> Any:
try:
response = method(request_or_iterator, context)
return await response
except Exception as err:
method_name = method_name.split("/")[-1]
logger.exception(f"Method {method_name} encountered an error.")
await context.abort_with_status(
rpc_status.to_status(status_pb2.Status(code=code_pb2.INTERNAL, message=str(err)))
)


@@ -0,0 +1,118 @@
import os
import shutil
import time
from typing import Optional
from huggingface_hub import snapshot_download
from huggingface_hub.constants import HF_HUB_CACHE
from loguru import logger
from transformers import AutoConfig
from optimum.neuron import NeuronModelForCausalLM
from optimum.neuron.utils import get_hub_cached_entries
def get_export_kwargs_from_env():
batch_size = os.environ.get("MAX_BATCH_SIZE", None)
if batch_size is not None:
batch_size = int(batch_size)
sequence_length = os.environ.get("MAX_TOTAL_TOKENS", None)
if sequence_length is not None:
sequence_length = int(sequence_length)
num_cores = os.environ.get("HF_NUM_CORES", None)
if num_cores is not None:
num_cores = int(num_cores)
auto_cast_type = os.environ.get("HF_AUTO_CAST_TYPE", None)
return {
"task": "text-generation",
"batch_size": batch_size,
"sequence_length": sequence_length,
"num_cores": num_cores,
"auto_cast_type": auto_cast_type,
}
def is_cached(model_id, neuron_config):
# Look for cached entries for the specified model
in_cache = False
entries = get_hub_cached_entries(model_id, "inference")
# Look for compatible entries
for entry in entries:
compatible = True
for key, value in neuron_config.items():
# Only weights can be different
if key in ["checkpoint_id", "checkpoint_revision"]:
continue
if entry[key] != value:
compatible = False
if compatible:
in_cache = True
break
return in_cache
def log_cache_size():
path = HF_HUB_CACHE
if os.path.exists(path):
usage = shutil.disk_usage(path)
gb = 2**30
logger.info(f"Cache disk [{path}]: total = {usage.total / gb:.2f} G, free = {usage.free / gb:.2f} G")
else:
raise ValueError(f"The cache directory ({path}) does not exist.")
def fetch_model(
model_id: str,
revision: Optional[str] = None,
) -> str:
"""Fetch a neuron model.
Args:
model_id (`str`):
The *model_id* of a model on the HuggingFace hub or the path to a local model.
revision (`Optional[str]`, defaults to `None`):
The revision of the model on the HuggingFace hub.
Returns:
A string corresponding to the model_id or path.
"""
if not os.path.isdir("/sys/class/neuron_device/"):
raise SystemError("No neuron cores detected on the host.")
if os.path.isdir(model_id) and revision is not None:
logger.warning("Revision {} ignored for local model at {}".format(revision, model_id))
revision = None
# Download the model from the Hub (HUGGING_FACE_HUB_TOKEN must be set for a private or gated model)
# Note that the model may already be present in the cache.
config = AutoConfig.from_pretrained(model_id, revision=revision)
neuron_config = getattr(config, "neuron", None)
if neuron_config is not None:
if os.path.isdir(model_id):
return model_id
# Prefetch the neuron model from the Hub
logger.info(f"Fetching revision [{revision}] for neuron model {model_id} under {HF_HUB_CACHE}")
log_cache_size()
return snapshot_download(model_id, revision=revision, ignore_patterns="*.bin")
# Model needs to be exported: look for compatible cached entries on the hub
export_kwargs = get_export_kwargs_from_env()
export_config = NeuronModelForCausalLM.get_export_config(model_id, config, revision=revision, **export_kwargs)
neuron_config = export_config.neuron
if not is_cached(model_id, neuron_config):
hub_cache_url = "https://huggingface.co/aws-neuron/optimum-neuron-cache"
neuron_export_url = "https://huggingface.co/docs/optimum-neuron/main/en/guides/export_model#exporting-neuron-models-using-neuronx-tgi"
error_msg = (
f"No cached version found for {model_id} with {neuron_config}."
f"You can start a discussion to request it on {hub_cache_url}"
f"Alternatively, you can export your own neuron model as explained in {neuron_export_url}"
)
raise ValueError(error_msg)
logger.warning(f"{model_id} is not a neuron model: it will be exported using cached artifacts.")
if os.path.isdir(model_id):
return model_id
# Prefetch weights, tokenizer and generation config so that they are in cache
log_cache_size()
start = time.time()
snapshot_download(model_id, revision=revision, ignore_patterns="*.bin")
end = time.time()
logger.info(f"Model weights fetched in {end - start:.2f} s.")
log_cache_size()
return model_id


@@ -0,0 +1,89 @@
import asyncio
from pathlib import Path
from typing import List
from grpc import aio
from grpc_reflection.v1alpha import reflection
from loguru import logger
from .generator import Generator, NeuronGenerator
from .interceptor import ExceptionInterceptor
from .pb import generate_pb2, generate_pb2_grpc
class TextGenerationService(generate_pb2_grpc.TextGenerationServiceServicer):
def __init__(self, generator: Generator, server_urls: List[str]):
self.generator = generator
self.server_urls = server_urls
async def Info(self, request, context):
return self.generator.info
async def Health(self, request, context):
return generate_pb2.HealthResponse()
async def ServiceDiscovery(self, request, context):
return generate_pb2.ServiceDiscoveryResponse(urls=self.server_urls)
async def ClearCache(self, request, context):
if request.HasField("id"):
self.generator.clear(request.id)
else:
self.generator.clear()
return generate_pb2.ClearCacheResponse()
async def FilterBatch(self, request, context):
filtered_batch = self.generator.filter(request.batch_id, request.request_ids)
return generate_pb2.FilterBatchResponse(batch=filtered_batch)
async def Warmup(self, request, context):
max_tokens = self.generator.warmup(request.batch)
return generate_pb2.WarmupResponse(max_supported_total_tokens=max_tokens)
async def Prefill(self, request, context):
generations, batch = self.generator.prefill(request.batch)
return generate_pb2.PrefillResponse(generations=generations, batch=batch)
async def Decode(self, request, context):
generations, batch = self.generator.decode(request.batches)
return generate_pb2.DecodeResponse(generations=generations, batch=batch)
def serve(
model_id: str,
revision: str,
uds_path: Path,
):
async def serve_inner(model_id: str, revision: str):
unix_socket_template = "unix://{}-{}"
local_url = unix_socket_template.format(uds_path, 0)
server_urls = [local_url]
try:
generator = NeuronGenerator.from_pretrained(model_id, revision)
except Exception:
logger.exception("Error when initializing model")
raise
server = aio.server(interceptors=[ExceptionInterceptor()])
generate_pb2_grpc.add_TextGenerationServiceServicer_to_server(
TextGenerationService(generator, server_urls), server
)
SERVICE_NAMES = (
generate_pb2.DESCRIPTOR.services_by_name["TextGenerationService"].full_name,
reflection.SERVICE_NAME,
)
reflection.enable_server_reflection(SERVICE_NAMES, server)
server.add_insecure_port(local_url)
await server.start()
logger.info("Server started at {}".format(local_url))
try:
await server.wait_for_termination()
except KeyboardInterrupt:
logger.info("Signal received. Shutting down")
await server.stop(0)
asyncio.run(serve_inner(model_id, revision))


@@ -0,0 +1 @@
pytest_plugins = ["fixtures.model"]

backends/neuron/tests/fixtures/model.py (new file, 129 lines)

@@ -0,0 +1,129 @@
import copy
import logging
import subprocess
import sys
from tempfile import TemporaryDirectory
import huggingface_hub
import pytest
from transformers import AutoTokenizer
from optimum.neuron import NeuronModelForCausalLM
from optimum.neuron.utils import synchronize_hub_cache
from optimum.neuron.version import __sdk_version__ as sdk_version
from optimum.neuron.version import __version__ as version
logging.basicConfig(
level=logging.INFO,
format="[%(asctime)s] %(levelname)s [%(filename)s.%(funcName)s:%(lineno)d] %(message)s",
stream=sys.stdout,
)
logger = logging.getLogger(__file__)
OPTIMUM_CACHE_REPO_ID = "optimum-internal-testing/neuron-testing-cache"
# All model configurations below will be added to the neuron_model_config fixture
MODEL_CONFIGURATIONS = {
"gpt2": {
"model_id": "gpt2",
"export_kwargs": {"batch_size": 4, "sequence_length": 1024, "num_cores": 2, "auto_cast_type": "fp16"},
},
"llama": {
"model_id": "NousResearch/Hermes-2-Theta-Llama-3-8B",
"export_kwargs": {"batch_size": 4, "sequence_length": 2048, "num_cores": 2, "auto_cast_type": "fp16"},
},
"mistral": {
"model_id": "optimum/mistral-1.1b-testing",
"export_kwargs": {"batch_size": 4, "sequence_length": 4096, "num_cores": 2, "auto_cast_type": "bf16"},
},
"qwen2": {
"model_id": "Qwen/Qwen2.5-0.5B",
"export_kwargs": {"batch_size": 4, "sequence_length": 4096, "num_cores": 2, "auto_cast_type": "fp16"},
},
"granite": {
"model_id": "ibm-granite/granite-3.1-2b-instruct",
"export_kwargs": {"batch_size": 4, "sequence_length": 4096, "num_cores": 2, "auto_cast_type": "bf16"},
},
}
def get_hub_neuron_model_id(config_name: str):
return f"optimum-internal-testing/neuron-testing-{version}-{sdk_version}-{config_name}"
def export_model(model_id, export_kwargs, neuron_model_path):
export_command = ["optimum-cli", "export", "neuron", "-m", model_id, "--task", "text-generation"]
for kwarg, value in export_kwargs.items():
export_command.append(f"--{kwarg}")
export_command.append(str(value))
export_command.append(neuron_model_path)
logger.info(f"Exporting {model_id} with {export_kwargs}")
try:
subprocess.run(export_command, check=True)
except subprocess.CalledProcessError as e:
raise ValueError(f"Failed to export model: {e}")
@pytest.fixture(scope="session", params=MODEL_CONFIGURATIONS.keys())
def neuron_model_config(request):
"""Expose a pre-trained neuron model
The fixture first makes sure the following model artifacts are present on the hub:
- exported neuron model under optimum-internal-testing/neuron-testing-<version>-<sdk_version>-<name>,
- cached artifacts under optimum-internal-testing/neuron-testing-cache.
If not, it will export the model and push it to the hub.
It then fetches the model locally and returns a dictionary containing:
- a configuration name,
- the original model id,
- the export parameters,
- the neuron model id,
- the neuron model local path.
For each exposed model, the local directory is maintained for the duration of the
test session and cleaned up afterwards.
The hub model artifacts are never cleaned up and persist across sessions.
They must be cleaned up manually when the optimum-neuron version changes.
"""
config_name = request.param
model_config = copy.deepcopy(MODEL_CONFIGURATIONS[request.param])
model_id = model_config["model_id"]
export_kwargs = model_config["export_kwargs"]
neuron_model_id = get_hub_neuron_model_id(config_name)
with TemporaryDirectory() as neuron_model_path:
hub = huggingface_hub.HfApi()
if hub.repo_exists(neuron_model_id):
logger.info(f"Fetching {neuron_model_id} from the HuggingFace hub")
hub.snapshot_download(neuron_model_id, local_dir=neuron_model_path)
else:
export_model(model_id, export_kwargs, neuron_model_path)
tokenizer = AutoTokenizer.from_pretrained(model_id)
tokenizer.save_pretrained(neuron_model_path)
del tokenizer
# Create the test model on the hub
hub.create_repo(neuron_model_id, private=True)
hub.upload_folder(
folder_path=neuron_model_path,
repo_id=neuron_model_id,
ignore_patterns=[NeuronModelForCausalLM.CHECKPOINT_DIR + "/*"],
)
# Make sure it is cached
synchronize_hub_cache(cache_repo_id=OPTIMUM_CACHE_REPO_ID)
# Add dynamic parameters to the model configuration
model_config["neuron_model_path"] = neuron_model_path
model_config["neuron_model_id"] = neuron_model_id
# Also add model configuration name to allow tests to adapt their expectations
model_config["name"] = config_name
# Yield instead of returning to keep a reference to the temporary directory.
# It will go out of scope and be released only once all tests needing the fixture
# have been completed.
logger.info(f"{config_name} ready for testing ...")
yield model_config
logger.info(f"Done with {config_name}")
@pytest.fixture(scope="module")
def neuron_model_path(neuron_model_config):
yield neuron_model_config["neuron_model_path"]

View File

@ -0,0 +1,25 @@
import os
from argparse import ArgumentParser
from huggingface_hub import HfApi
def main():
parser = ArgumentParser()
parser.add_argument("--yes", action="store_true", default=False)
args = parser.parse_args()
api = HfApi()
models = api.list_models(search="optimum-internal-testing/neuron-tgi-testing")
for model in models:
if args.yes:
delete = True
else:
answer = input(f"Do you want to delete {model.id} [y/N] ?")
delete = (answer == "y")
if delete:
api.delete_repo(model.id)
print(f"Deleted {model.id}.")
if __name__ == "__main__":
main()
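A minimal usage sketch for this pruning script (the path below is an assumption about where the file lives; adjust it to the actual location in the repository):
```
# Review each matching repository and confirm deletions interactively
python backends/neuron/tests/prune_test_models.py
# Delete every matching repository without prompting
python backends/neuron/tests/prune_test_models.py --yes
```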

View File

@ -0,0 +1,2 @@
[pytest]
asyncio_mode = auto

View File

@ -0,0 +1,19 @@
# Copyright 2023 The HuggingFace Team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
text-generation >= 0.6.0
pytest >= 7.4.0
pytest-asyncio >= 0.21.1
requests < 2.32.0
docker >= 6.1.3
Levenshtein

View File

@ -0,0 +1,149 @@
from text_generation_server.generator import NeuronGenerator
from text_generation_server.pb.generate_pb2 import (
Batch,
NextTokenChooserParameters,
Request,
StoppingCriteriaParameters,
)
def create_request(
id: int,
inputs: str,
truncate: int = 0,
max_new_tokens: int = 20,
do_sample: bool = False,
top_k: int = 50,
top_p: float = 0.9,
temperature: float = 1.0,
seed: int = 42,
repetition_penalty: float = 1.0,
):
parameters = NextTokenChooserParameters(
temperature=temperature,
top_k=top_k,
top_p=top_p,
do_sample=do_sample,
seed=seed,
repetition_penalty=repetition_penalty,
)
stopping_parameters = StoppingCriteriaParameters(max_new_tokens=max_new_tokens)
return Request(
id=id, inputs=inputs, truncate=truncate, parameters=parameters, stopping_parameters=stopping_parameters
)
def check_prefill(input_text, expected_token_id, expected_token_text, do_sample, batch_size, model_path):
"""Verify that a prefill for a single request generates the expected output."""
generator = NeuronGenerator.from_pretrained(model_path)
assert generator.model.batch_size >= batch_size
requests = []
max_new_tokens = 20
for i in range(batch_size):
requests.append(create_request(id=i, inputs=input_text, do_sample=do_sample, max_new_tokens=max_new_tokens))
# Let's be pessimistic when estimating max_tokens
max_length = generator.model.max_length
batch = Batch(id=0, requests=requests, size=batch_size, max_tokens=batch_size * max_length)
generations, next_batch = generator.prefill(batch)
assert next_batch.size == batch_size
# Whatever was passed as max_tokens, the server will correct it
# because of static batching
assert next_batch.max_tokens == batch_size * max_length
assert len(generations) == batch_size
for g in generations:
tokens = g.tokens
assert tokens.ids == [expected_token_id]
assert tokens.texts == [expected_token_text]
def check_decode_single(input_text, max_new_tokens, generated_text, do_sample, model_path):
"""Verify that a decoding for a single request generates the expected output."""
generator = NeuronGenerator.from_pretrained(model_path)
request = create_request(id=0, inputs=input_text, max_new_tokens=max_new_tokens, do_sample=do_sample)
max_length = generator.model.max_length
batch = Batch(id=0, requests=[request], size=1, max_tokens=max_length)
generations, next_batch = generator.prefill(batch)
# We already generated one token: call decode max_new_tokens - 1 times
for _ in range(max_new_tokens - 1):
assert next_batch.size == 1
assert next_batch.max_tokens == max_length
assert len(generations) == 1
assert len(generations[0].tokens.ids) == 1
generations, next_batch = generator.decode([next_batch])
assert next_batch is None
assert len(generations) == 1
output = generations[0].generated_text
assert output.generated_tokens == max_new_tokens
assert output.finish_reason == 0
assert output.text == generated_text
def check_decode_multiple(model_path):
"""Verify that two requests added to the batch at different generation steps
generate the same outputs (continuous batching).
"""
generator = NeuronGenerator.from_pretrained(model_path)
assert generator.model.batch_size > 1
input_text = "Once upon a time"
max_new_tokens = 20
# Prefill a single request, remembering the generated token
tokens = {0: [], 1: []}
request = create_request(id=0, inputs=input_text, max_new_tokens=max_new_tokens)
max_length = generator.model.max_length
batch = Batch(id=0, requests=[request], size=1, max_tokens=max_length)
generations, next_batch = generator.prefill(batch)
assert next_batch.size == 1
assert len(generations) == 1
g = generations[0]
tokens[g.request_id].append(g.tokens.ids[0])
assert len(tokens[0]) == 1
# Decode a few tokens
gen_tokens = 4
for _ in range(gen_tokens - 1):
generations, next_batch = generator.decode([next_batch])
assert len(generations) == 1
g = generations[0]
tokens[g.request_id].append(g.tokens.ids[0])
assert len(tokens[0]) == gen_tokens
assert next_batch.size == 1
# Add a second request
request = create_request(id=1, inputs=input_text, max_new_tokens=max_new_tokens)
batch = Batch(id=1, requests=[request], size=1, max_tokens=max_length)
generations, next_batch_1 = generator.prefill(batch)
assert next_batch_1.size == 1
# We should have generated only a single token
assert len(generations) == 1
g = generations[0]
tokens[g.request_id].append(g.tokens.ids[0])
assert len(tokens[0]) == gen_tokens
assert len(tokens[1]) == 1
# Decode more tokens until we reach the maximum for the first request
batches = [next_batch, next_batch_1]
for _ in range(max_new_tokens - gen_tokens):
generations, next_batch = generator.decode(batches)
for g in generations:
tokens[g.request_id].append(g.tokens.ids[0])
batches = [next_batch]
# Verify we now only have one pending request
assert next_batch.size == 1
assert len(tokens[0]) == max_new_tokens
assert len(tokens[1]) == max_new_tokens - gen_tokens + 1
# Verify we have the output for the first request
for g in generations:
if g.request_id == 0:
output = g.generated_text
assert output.text != ""
assert output.generated_tokens == max_new_tokens
generated_text = output.text
# Continue decoding until the end of the second request
for _ in range(gen_tokens - 1):
generations, next_batch = generator.decode([next_batch])
assert len(generations) == 1
g = generations[0]
tokens[g.request_id].append(g.tokens.ids[0])
assert next_batch is None
output = generations[0].generated_text
assert output.generated_tokens == max_new_tokens
assert tokens[0] == tokens[1]
assert output.text == generated_text

View File

@ -0,0 +1,74 @@
from helpers import create_request
from text_generation_server.generator import NeuronGenerator
from text_generation_server.pb.generate_pb2 import Batch
def test_continuous_batching_two_requests(neuron_model_config):
"""Verify that two requests added to the batch at different generation steps
generate the same outputs (continuous batching).
"""
neuron_model_path = neuron_model_config["neuron_model_path"]
generator = NeuronGenerator.from_pretrained(neuron_model_path)
assert generator.model.batch_size > 1
input_text = "Once upon a time"
max_new_tokens = 20
# Prefill a single request, remembering the generated token
tokens = {0: [], 1: []}
request = create_request(id=0, inputs=input_text, max_new_tokens=max_new_tokens)
max_length = generator.model.max_length
batch = Batch(id=0, requests=[request], size=1, max_tokens=max_length)
generations, next_batch = generator.prefill(batch)
assert next_batch.size == 1
assert len(generations) == 1
g = generations[0]
tokens[g.request_id].append(g.tokens.ids[0])
assert len(tokens[0]) == 1
# Decode a few tokens
gen_tokens = 4
for _ in range(gen_tokens - 1):
generations, next_batch = generator.decode([next_batch])
assert len(generations) == 1
g = generations[0]
tokens[g.request_id].append(g.tokens.ids[0])
assert len(tokens[0]) == gen_tokens
assert next_batch.size == 1
# Add a second request
request = create_request(id=1, inputs=input_text, max_new_tokens=max_new_tokens)
batch = Batch(id=1, requests=[request], size=1, max_tokens=max_length)
generations, next_batch_1 = generator.prefill(batch)
assert next_batch_1.size == 1
# We should have generated only a single token
assert len(generations) == 1
g = generations[0]
tokens[g.request_id].append(g.tokens.ids[0])
assert len(tokens[0]) == gen_tokens
assert len(tokens[1]) == 1
# Decode more tokens until we reach the maximum for the first request
batches = [next_batch, next_batch_1]
for _ in range(max_new_tokens - gen_tokens):
generations, next_batch = generator.decode(batches)
for g in generations:
tokens[g.request_id].append(g.tokens.ids[0])
batches = [next_batch]
# Verify we now only have one pending request
assert next_batch.size == 1
assert len(tokens[0]) == max_new_tokens
assert len(tokens[1]) == max_new_tokens - gen_tokens + 1
# Verify we have the output for the first request
for g in generations:
if g.request_id == 0:
output = g.generated_text
assert output.text != ""
assert output.generated_tokens == max_new_tokens
generated_text = output.text
# Continue decoding until the end of the second request
for _ in range(gen_tokens - 1):
generations, next_batch = generator.decode([next_batch])
assert len(generations) == 1
g = generations[0]
tokens[g.request_id].append(g.tokens.ids[0])
assert next_batch is None
output = generations[0].generated_text
assert output.generated_tokens == max_new_tokens
assert tokens[0] == tokens[1]
assert output.text == generated_text

View File

@ -0,0 +1,55 @@
from helpers import create_request
from text_generation_server.generator import NeuronGenerator
from text_generation_server.pb.generate_pb2 import Batch
def test_decode(neuron_model_config):
"""Verify that a decoding for a single request generates the expected output."""
config_name = neuron_model_config["name"]
neuron_model_path = neuron_model_config["neuron_model_path"]
generator = NeuronGenerator.from_pretrained(neuron_model_path)
for do_sample in [True, False]:
mode = "sample" if do_sample else "greedy"
print(f"{config_name}[{mode}]")
_test_decode(config_name, generator, do_sample)
generator.clear()
def _test_decode(config_name, generator, do_sample):
input_text = "It was a bright cold day in April, and the clocks were striking thirteen."
max_new_tokens = 20
request = create_request(id=0, inputs=input_text, max_new_tokens=max_new_tokens, do_sample=do_sample)
max_length = generator.model.max_length
batch = Batch(id=0, requests=[request], size=1, max_tokens=max_length)
generations, next_batch = generator.prefill(batch)
# We already generated one token: call decode max_new_tokens - 1 times
for _ in range(max_new_tokens - 1):
assert next_batch.size == 1
assert next_batch.max_tokens == max_length
assert len(generations) == 1
assert len(generations[0].tokens.ids) == 1
generations, next_batch = generator.decode([next_batch])
assert next_batch is None
assert len(generations) == 1
output = generations[0].generated_text
assert output.generated_tokens == max_new_tokens
assert output.finish_reason == 0
if do_sample:
expected_text = {
"gpt2": " The sun was set",
"llama": "George Orwell, 1984",
"mistral": "The sky was",
"qwen2": " A young woman with",
"granite": "1984, George Orwell",
}[config_name]
assert expected_text in output.text
else:
print(output.text)
expected_text = {
"gpt2": '\n\n"I\'m going to go to bed," I said.\n\n"I\'m going',
"llama": " George Orwells classic dystopian novel, 1984, begins with this ominous sentence. The story",
"mistral": "\nThe clocks were striking thirteen.\nThe clocks were striking thirteen.",
"qwen2": " I was sitting in my room, staring at the ceiling, when the door opened and in came a",
"granite": "\n\nThis opening line from George Orwell's dystopian novel \"198",
}[config_name]
assert output.text == expected_text

View File

@ -0,0 +1,61 @@
import pytest
import torch
from text_generation_server.generator import Slot
from text_generation_server.pb.generate_pb2 import Request
from transformers import AutoTokenizer, GenerationConfig
TOKENIZERS = ["NousResearch/Llama-2-7b-hf", "gpt2"]
@pytest.fixture(params=TOKENIZERS)
def tokenizer(request):
t = AutoTokenizer.from_pretrained(request.param)
t.padding_side = "left"
t.pad_token_id = t.eos_token_id
return t
@pytest.mark.parametrize(
"input_text, generated_text",
[
[
"It was a bright cold day in April, and the clocks were striking thirteen.",
" Winston Smith, his chin nuzzled into his breast in an effort to escape the vile wind,"
" slipped quickly through the glass doors of Victory Mansions, though not quickly enough"
" to prevent a swirl of gritty dust from entering along with him.",
],
["This sentence is written in chinese:", "我很感谢你的热情"],
["Some text might contain a lot of emojis like 😃", "😍💪 👉 👀"],
],
ids=["spaces", "chinese-utf8", "emojis"],
)
def test_decode_streaming(tokenizer, input_text, generated_text):
slot = Slot(0, tokenizer)
request = Request(id=0, inputs=input_text)
slot.assign(0, request, GenerationConfig())
assert slot.cached_text == input_text
inputs = tokenizer(input_text, padding="max_length", max_length=len(input_text) + 1, return_tensors="pt")
input_ids = inputs["input_ids"][0]
attention_mask = inputs["attention_mask"][0]
generated_tokens = tokenizer(generated_text, add_special_tokens=False)["input_ids"]
# We need to regenerate the full text as the tokenizer might change it (extra spaces might be added)
all_input_ids = torch.cat([input_ids, torch.tensor(generated_tokens)])
full_text = tokenizer.decode(all_input_ids, skip_special_tokens=True)
regenerated_text = full_text[len(input_text) :]
# Initialize the slot with the inputs
slot.reset(input_ids, attention_mask, selector=None)
assert slot.generated_tokens == 0
# Simulate an iterative generation (i.e. don't call select and use known tokens instead)
decoded_text = ""
for i in range(len(generated_tokens)):
text = slot.append(generated_tokens[i])
assert slot.generated_tokens == i + 1
decoded_text += text
assert decoded_text == regenerated_text

View File

@ -0,0 +1,10 @@
from text_generation_server.generator import NeuronGenerator
def test_info(neuron_model_path):
generator = NeuronGenerator.from_pretrained(neuron_model_path)
info = generator.info
assert info.requires_padding is True
assert info.device_type == "xla"
assert info.window_size == 0
assert info.speculate == 0

View File

@ -0,0 +1,89 @@
from helpers import create_request
from text_generation_server.generator import NeuronGenerator
from text_generation_server.pb.generate_pb2 import Batch
def test_prefill(neuron_model_config):
"""Verify that a prefill for a single request generates the expected output."""
config_name = neuron_model_config["name"]
neuron_model_path = neuron_model_config["neuron_model_path"]
generator = NeuronGenerator.from_pretrained(neuron_model_path)
max_batch_size = 4
assert generator.model.batch_size >= max_batch_size
for num_requests in [1, max_batch_size]:
for do_sample in [True, False]:
mode = "sample" if do_sample else "greedy"
print(f"[{mode}]: {num_requests} requests")
_test_prefill(config_name, generator, num_requests, do_sample)
generator.clear()
def _test_prefill(config_name, generator, batch_size, do_sample):
requests = []
max_new_tokens = 20
input_text = "It was a bright cold day in April, and the clocks were striking thirteen."
for i in range(batch_size):
requests.append(create_request(id=i, inputs=input_text, do_sample=do_sample, max_new_tokens=max_new_tokens))
# Let's be pessimistic when estimating max_tokens
max_length = generator.model.max_length
batch = Batch(id=0, requests=requests, size=batch_size, max_tokens=batch_size * max_length)
generations, next_batch = generator.prefill(batch)
assert next_batch.size == batch_size
# Whatever was passed as max_tokens, the server will correct it
# because of static batching
assert next_batch.max_tokens == batch_size * max_length
assert len(generations) == batch_size
if do_sample:
expectations = {
"gpt2": [383, " The"],
"llama": [10058, " George"],
"mistral": [450, " The"],
"qwen2": [362, " A"],
"granite": [308, " ("],
}[config_name]
else:
expectations = {
"gpt2": [198, "\n"],
"llama": [10058, " George"],
"mistral": [13, "\n"],
"qwen2": [358, " I"],
"granite": [203, "\n"],
}[config_name]
for g in generations:
tokens = g.tokens
assert tokens.ids[0] == expectations[0]
assert tokens.texts[0] == expectations[1]
def test_prefill_truncate(neuron_model_config):
config_name = neuron_model_config["name"]
neuron_model_path = neuron_model_config["neuron_model_path"]
generator = NeuronGenerator.from_pretrained(neuron_model_path)
batch_size = generator.model.batch_size
# We apply truncation to all requests but the first one
truncate = [
None,
] + [i * 3 for i in range(1, batch_size)]
input_text = (
"Two gin-scented tears trickled down the sides of his nose."
" But it was all right, everything was all right, the struggle was finished."
" He had won the victory over himself. He loved Big Brother."
)
requests = []
for i in range(batch_size):
requests.append(create_request(id=i, inputs=input_text, truncate=truncate[i]))
max_length = generator.model.max_length
batch = Batch(id=0, requests=requests, size=batch_size, max_tokens=batch_size * max_length)
generations, _ = generator.prefill(batch)
# Even if the input text is identical for all requests, the first generated token might
# be different because of the truncation
expectations = {
"gpt2": [" He", " He", "\n", " He"],
"llama": ["", " The", " He", " He"],
"mistral": [" He", "\n", " He", " He"],
"qwen2": [" He", " The", " He", " He"],
"granite": ["\n", "\n", " I", " He"],
}[config_name]
for i, g in enumerate(generations):
tokens = g.tokens
assert tokens.texts[0] == expectations[i]

View File

@ -0,0 +1,16 @@
#!/bin/bash
set -e -o pipefail -u
export ENV_FILEPATH=$(mktemp)
trap "rm -f ${ENV_FILEPATH}" EXIT
touch $ENV_FILEPATH
SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )
${SCRIPT_DIR}/tgi_env.py "$@"
source $ENV_FILEPATH
exec text-generation-launcher "$@"

backends/neuron/tgi_env.py
View File

@ -0,0 +1,226 @@
#!/usr/bin/env python
import argparse
import logging
import os
import sys
from typing import Any, Dict, List, Optional
from huggingface_hub import constants
from transformers import AutoConfig
from optimum.neuron.modeling_decoder import get_available_cores
from optimum.neuron.utils import get_hub_cached_entries
from optimum.neuron.utils.version_utils import get_neuronxcc_version
logger = logging.getLogger(__name__)
tgi_router_env_vars = ["MAX_BATCH_SIZE", "MAX_TOTAL_TOKENS", "MAX_INPUT_TOKENS", "MAX_BATCH_PREFILL_TOKENS"]
tgi_server_env_vars = ["HF_NUM_CORES", "HF_AUTO_CAST_TYPE"]
env_config_peering = [
("MAX_BATCH_SIZE", "batch_size"),
("MAX_TOTAL_TOKENS", "sequence_length"),
("HF_AUTO_CAST_TYPE", "auto_cast_type"),
("HF_NUM_CORES", "num_cores"),
]
# By the end of this script all env vars should be specified properly
env_vars = tgi_server_env_vars + tgi_router_env_vars
available_cores = get_available_cores()
neuronxcc_version = get_neuronxcc_version()
def parse_cmdline_and_set_env(argv: List[str] = None) -> argparse.Namespace:
parser = argparse.ArgumentParser()
if not argv:
argv = sys.argv
# All these are params passed to tgi and intercepted here
parser.add_argument(
"--max-input-tokens", type=int, default=os.getenv("MAX_INPUT_TOKENS", os.getenv("MAX_INPUT_LENGTH", 0))
)
parser.add_argument("--max-total-tokens", type=int, default=os.getenv("MAX_TOTAL_TOKENS", 0))
parser.add_argument("--max-batch-size", type=int, default=os.getenv("MAX_BATCH_SIZE", 0))
parser.add_argument("--max-batch-prefill-tokens", type=int, default=os.getenv("MAX_BATCH_PREFILL_TOKENS", 0))
parser.add_argument("--model-id", type=str, default=os.getenv("MODEL_ID"))
parser.add_argument("--revision", type=str, default=os.getenv("REVISION"))
args = parser.parse_known_args(argv)[0]
if not args.model_id:
raise Exception("No model id provided ! Either specify it using --model-id cmdline or MODEL_ID env var")
# Override env with cmdline params
os.environ["MODEL_ID"] = args.model_id
# Set all tgi router and tgi server values to consistent values as early as possible
# Given the order of the parser defaults, the tgi router values can override the tgi server ones
if args.max_total_tokens > 0:
os.environ["MAX_TOTAL_TOKENS"] = str(args.max_total_tokens)
if args.max_input_tokens > 0:
os.environ["MAX_INPUT_TOKENS"] = str(args.max_input_tokens)
if args.max_batch_size > 0:
os.environ["MAX_BATCH_SIZE"] = str(args.max_batch_size)
if args.max_batch_prefill_tokens > 0:
os.environ["MAX_BATCH_PREFILL_TOKENS"] = str(args.max_batch_prefill_tokens)
if args.revision:
os.environ["REVISION"] = str(args.revision)
return args
def neuron_config_to_env(neuron_config):
with open(os.environ["ENV_FILEPATH"], "w") as f:
for env_var, config_key in env_config_peering:
f.write("export {}={}\n".format(env_var, neuron_config[config_key]))
max_input_tokens = os.getenv("MAX_INPUT_TOKENS")
if not max_input_tokens:
max_input_tokens = int(neuron_config["sequence_length"]) // 2
if max_input_tokens == 0:
raise Exception("Model sequence length should be greater than 1")
f.write("export MAX_INPUT_TOKENS={}\n".format(max_input_tokens))
max_batch_prefill_tokens = os.getenv("MAX_BATCH_PREFILL_TOKENS")
if not max_batch_prefill_tokens:
max_batch_prefill_tokens = int(neuron_config["batch_size"]) * int(max_input_tokens)
f.write("export MAX_BATCH_PREFILL_TOKENS={}\n".format(max_batch_prefill_tokens))
def sort_neuron_configs(dictionary):
return -dictionary["num_cores"], -dictionary["batch_size"]
def lookup_compatible_cached_model(model_id: str, revision: Optional[str]) -> Optional[Dict[str, Any]]:
# Reuse the same mechanism as the one used to configure the tgi server part
# The only difference here is that we stay as flexible as possible on the compatibility part
entries = get_hub_cached_entries(model_id, "inference")
logger.debug("Found %d cached entries for model %s, revision %s", len(entries), model_id, revision)
all_compatible = []
for entry in entries:
if check_env_and_neuron_config_compatibility(entry, check_compiler_version=True):
all_compatible.append(entry)
if not all_compatible:
logger.debug(
"No compatible cached entry found for model %s, env %s, available cores %s, neuronxcc version %s",
model_id,
get_env_dict(),
available_cores,
neuronxcc_version,
)
return None
logger.info("%d compatible neuron cached models found", len(all_compatible))
all_compatible = sorted(all_compatible, key=sort_neuron_configs)
entry = all_compatible[0]
return entry
def check_env_and_neuron_config_compatibility(neuron_config: Dict[str, Any], check_compiler_version: bool) -> bool:
logger.debug(
"Checking the provided neuron config %s is compatible with the local setup and provided environment",
neuron_config,
)
# Local setup compat checks
if neuron_config["num_cores"] > available_cores:
logger.debug("Not enough neuron cores available to run the provided neuron config")
return False
if check_compiler_version and neuron_config["compiler_version"] != neuronxcc_version:
logger.debug(
"Compiler version conflict, the local one (%s) differs from the one used to compile the model (%s)",
neuronxcc_version,
neuron_config["compiler_version"],
)
return False
for env_var, config_key in env_config_peering:
neuron_config_value = str(neuron_config[config_key])
env_value = os.getenv(env_var, str(neuron_config_value))
if env_value != neuron_config_value:
logger.debug(
"The provided env var '%s' and the neuron config '%s' param differ (%s != %s)",
env_var,
config_key,
env_value,
neuron_config_value,
)
return False
max_input_tokens = int(os.getenv("MAX_INPUT_TOKENS", os.getenv("MAX_INPUT_LENGTH", 0)))
if max_input_tokens > 0:
sequence_length = neuron_config["sequence_length"]
if max_input_tokens >= sequence_length:
logger.debug(
"Specified max input tokens is not compatible with config sequence length ( %s >= %s)",
max_input_tokens,
sequence_length,
)
return False
return True
def get_env_dict() -> Dict[str, str]:
d = {}
for k in env_vars:
d[k] = os.getenv(k)
return d
def main():
"""
This script determines the default TGI env variables required for neuron
precompiled models to work properly.
:return:
"""
args = parse_cmdline_and_set_env()
for env_var in env_vars:
if not os.getenv(env_var):
break
else:
logger.info("All env vars %s already set, skipping, user know what they are doing", env_vars)
sys.exit(0)
cache_dir = constants.HF_HUB_CACHE
logger.info("Cache dir %s, model %s", cache_dir, args.model_id)
config = AutoConfig.from_pretrained(args.model_id, revision=args.revision)
neuron_config = getattr(config, "neuron", None)
if neuron_config is not None:
compatible = check_env_and_neuron_config_compatibility(neuron_config, check_compiler_version=False)
if not compatible:
env_dict = get_env_dict()
msg = (
"Invalid neuron config and env. Config {}, env {}, available cores {}, neuronxcc version {}"
).format(neuron_config, env_dict, available_cores, neuronxcc_version)
logger.error(msg)
raise Exception(msg)
else:
neuron_config = lookup_compatible_cached_model(args.model_id, args.revision)
if not neuron_config:
msg = ("No compatible neuron config found. Provided env {}, available cores {}, neuronxcc version {}").format(
get_env_dict(), available_cores, neuronxcc_version
)
logger.error(msg)
raise Exception(msg)
neuron_config_to_env(neuron_config)
if __name__ == "__main__":
main()
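As a sketch of what this produces: for a cached neuron config with `batch_size` 4, `sequence_length` 4096, `num_cores` 2 and `auto_cast_type` fp16, and no overriding environment variables, the file written to `ENV_FILEPATH` would look roughly like this (values derived from the rules above, not captured from a real run):
```
export MAX_BATCH_SIZE=4
export MAX_TOTAL_TOKENS=4096
export HF_AUTO_CAST_TYPE=fp16
export HF_NUM_CORES=2
export MAX_INPUT_TOKENS=2048
export MAX_BATCH_PREFILL_TOKENS=8192
```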

View File

@ -50,6 +50,8 @@
       title: Train Medusa
   title: Tutorials
 - sections:
+  - local: backends/neuron
+    title: Neuron
   - local: backends/trtllm
     title: TensorRT-LLM
   - local: backends/llamacpp

View File

@ -0,0 +1,179 @@
# Neuron backend for AWS Trainium and Inferentia
The Neuron backend allows the deployment of TGI on the AWS Trainium and Inferentia families of chips.
The following hardware targets are supported:
- Trainium 1,
- Inferentia 2.
## Features
The basic TGI features are supported:
- continuous batching,
- token streaming,
- greedy search and multinomial sampling using [transformers](https://huggingface.co/docs/transformers/generation_strategies#customize-text-generation).
## Deploy the service from the Hugging Face hub
The simplest way to deploy the NeuronX TGI service for a specific model is to follow the
deployment instructions in the model card:
- click on the "Deploy" button on the right,
- select your deployment service ("Inference Endpoints" and "SageMaker" are supported),
- select "AWS Trainum & Inferentia",
- follow the instructions.
## Deploy the service on a dedicated host
The service is launched simply by running the text-generation-inference container with two sets of parameters:
```
docker run <system_parameters> ghcr.io/huggingface/text-generation-inference:3.1.0-neuron <service_parameters>
```
- system parameters are used to map ports, volumes and devices between the host and the service,
- service parameters are forwarded to the `text-generation-launcher`.
When deploying a service, you will need a pre-compiled Neuron model. The Neuron TGI backend supports two main modes of operation:
- you can either deploy the service on a model that has already been exported to Neuron,
- or alternatively you can take advantage of the Neuron Model Cache to export your own model.
### Common system parameters
Whenever you launch a TGI service, we highly recommend mounting a shared volume as `/data` in the container: this is where
the models will be cached to speed up further instantiations of the service.
Note also that enough neuron devices should be made visible to the container, knowing that each neuron device has two cores (so when deploying on two cores you need to expose at least one device).
The recommended way to expose a device in a production environment is to use the `--device` option explicitly (e.g. `--device /dev/neuron0`), repeated as many times as there are devices to be exposed.
Note: alternatively, for a quick local test it is also possible to launch the service in `privileged` mode to get access to all neuron devices.
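For reference, a quick-test invocation in `privileged` mode might look like the sketch below (the flags other than `--privileged` mirror the example that follows; do not use this in production):
```
docker run -p 8080:80 \
  -v $(pwd)/data:/data \
  --privileged \
  ghcr.io/huggingface/text-generation-inference:<VERSION>-neuron \
  <service_parameters>
```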
Finally, you might want to export the `HF_TOKEN` if you want to access gated repositories.
Here is an example of a service instantiation exposing only the first device:
```
docker run -p 8080:80 \
-v $(pwd)/data:/data \
--device=/dev/neuron0 \
-e HF_TOKEN=${HF_TOKEN} \
ghcr.io/huggingface/text-generation-inference:<VERSION>-neuron \
<service_parameters>
```
### Using a standard model from the 🤗 [HuggingFace Hub](https://huggingface.co/aws-neuron) (recommended)
We maintain a Neuron Model Cache of the most popular architectures and deployment parameters under [aws-neuron/optimum-neuron-cache](https://huggingface.co/aws-neuron/optimum-neuron-cache).
Thanks to this cache, you can still try the service quickly with a model that has not been exported to Neuron first, provided a few conditions are met:
- you must specify the export parameters when launching the service (or use default parameters),
- the model configuration must be cached.
The snippet below shows how you can deploy a service from a hub standard model:
```
export HF_TOKEN=<YOUR_TOKEN>
docker run -p 8080:80 \
-v $(pwd)/data:/data \
--device=/dev/neuron0 \
--device=/dev/neuron1 \
--device=/dev/neuron2 \
--device=/dev/neuron3 \
-e HF_TOKEN=${HF_TOKEN} \
-e HF_AUTO_CAST_TYPE="fp16" \
-e HF_NUM_CORES=8 \
ghcr.io/huggingface/text-generation-inference:<VERSION>-neuron \
--model-id meta-llama/Meta-Llama-3-8B \
--max-batch-size 1 \
--max-input-length 3164 \
--max-total-tokens 4096
```
### Using a model exported to a local path
Alternatively, you can first [export the model to neuron format](https://huggingface.co/docs/optimum-neuron/main/en/guides/export_model#exporting-neuron-models-using-text-generation-inference) locally.
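As an illustration, the export command might look like the following sketch (all values below are placeholders; see the linked guide for the complete list of export options):
```
optimum-cli export neuron -m <organization>/<model> \
  --batch_size 4 \
  --sequence_length 4096 \
  --num_cores 4 \
  --auto_cast_type fp16 \
  $(pwd)/data/<neuron_model_path>
```
With the shared volume mounted as above, the exported model is then visible inside the container as `/data/<neuron_model_path>`.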
You can then deploy the service inside the shared volume:
```
docker run -p 8080:80 \
-v $(pwd)/data:/data \
--device=/dev/neuron0 \
--device=/dev/neuron1 \
ghcr.io/huggingface/text-generation-inference:<VERSION>-neuron \
--model-id /data/<neuron_model_path>
```
Note: You don't need to specify any service parameters, as they will all be deduced from the model export configuration. You must however expose enough devices to match the number of cores specified during the export phase.
### Using a neuron model from the 🤗 [HuggingFace Hub](https://huggingface.co/)
The easiest way to share a neuron model inside your organization is to push it to the Hugging Face hub, so that it can be deployed directly without requiring an export.
The snippet below shows how you can deploy a service from a hub neuron model:
```
docker run -p 8080:80 \
-v $(pwd)/data:/data \
--device=/dev/neuron0 \
--device=/dev/neuron1 \
-e HF_TOKEN=${HF_TOKEN} \
ghcr.io/huggingface/text-generation-inference:<VERSION>-neuron \
--model-id <organization>/<neuron-model>
```
### Choosing service parameters
Use the following command to list the available service parameters:
```
docker run ghcr.io/huggingface/text-generation-inference:<VERSION>-neuron --help
```
The configuration of an inference endpoint is always a compromise between throughput and latency: serving more requests in parallel will allow a higher throughput, but it will increase the latency.
The neuron models have static input dimensions `[batch_size, max_length]`.
This adds several restrictions to the following parameters:
- `--max-batch-size` must be set to `batch_size`,
- `--max-input-length` must be lower than `max_length`,
- `--max-total-tokens` must be set to `max_length` (it is per-request).
Although not strictly necessary, the following is important for efficient prefilling (see the example below):
- `--max-batch-prefill-tokens` should be set to `batch_size` * `max-input-length`.
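For instance, for a hypothetical model exported with a static batch size of 4, a `max_length` of 4096 and 2 cores, a consistent set of parameters could be (values are illustrative only):
```
docker run -p 8080:80 \
  -v $(pwd)/data:/data \
  --device=/dev/neuron0 \
  ghcr.io/huggingface/text-generation-inference:<VERSION>-neuron \
  --model-id /data/<neuron_model_path> \
  --max-batch-size 4 \
  --max-input-length 3072 \
  --max-total-tokens 4096 \
  --max-batch-prefill-tokens 12288
```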
### Choosing the correct batch size
As seen in the previous paragraph, the neuron model's static batch size has a direct influence on the endpoint latency and throughput.
Please refer to [text-generation-inference](https://github.com/huggingface/text-generation-inference) for optimization hints.
Note that the main constraint is to be able to fit the model for the specified `batch_size` within the total device memory available
on your instance (16GB per neuron core, with 2 cores per device). As a rough order of magnitude, an 8-billion-parameter model in fp16 already requires about 16GB for its weights alone, before accounting for the KV caches that grow with the batch size and sequence length.
## Query the service
You can query the model using either the `/generate` or `/generate_stream` routes:
```
curl 127.0.0.1:8080/generate \
-X POST \
-d '{"inputs":"What is Deep Learning?","parameters":{"max_new_tokens":20}}' \
-H 'Content-Type: application/json'
```
```
curl 127.0.0.1:8080/generate_stream \
-X POST \
-d '{"inputs":"What is Deep Learning?","parameters":{"max_new_tokens":20}}' \
-H 'Content-Type: application/json'
```
Note: replace 127.0.0.1:8080 with your actual IP address and port.

View File

@ -1,3 +1,4 @@
pytest_plugins = ["fixtures.neuron.service", "fixtures.neuron.model"]
# ruff: noqa: E402 # ruff: noqa: E402
from _pytest.fixtures import SubRequest from _pytest.fixtures import SubRequest
import requests import requests
@ -57,20 +58,37 @@ def pytest_addoption(parser):
parser.addoption( parser.addoption(
"--release", action="store_true", default=False, help="run release tests" "--release", action="store_true", default=False, help="run release tests"
) )
parser.addoption(
"--neuron", action="store_true", default=False, help="run neuron tests"
)
def pytest_configure(config): def pytest_configure(config):
config.addinivalue_line("markers", "release: mark test as a release-only test") config.addinivalue_line("markers", "release: mark test as a release-only test")
config.addinivalue_line("markers", "neuron: mark test as a neuron test")
def pytest_collection_modifyitems(config, items): def pytest_collection_modifyitems(config, items):
if config.getoption("--release"): selectors = []
# --release given in cli: do not skip release tests if not config.getoption("--release"):
return # --release not given in cli: skip release tests
skip_release = pytest.mark.skip(reason="need --release option to run") def skip_release(item):
if "release" in item.keywords:
item.add_marker(pytest.mark.skip(reason="need --release option to run"))
selectors.append(skip_release)
if config.getoption("--neuron"):
def skip_not_neuron(item):
if "neuron" not in item.keywords:
item.add_marker(pytest.mark.skip(reason="incompatible with --neuron option"))
selectors.append(skip_not_neuron)
else:
def skip_neuron(item):
if "neuron" in item.keywords:
item.add_marker(pytest.mark.skip(reason="requires --neuron to run"))
selectors.append(skip_neuron)
for item in items: for item in items:
if "release" in item.keywords: for selector in selectors:
item.add_marker(skip_release) selector(item)
@pytest.fixture(autouse=True) @pytest.fixture(autouse=True)
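As a hedged illustration of how these options are meant to be used (the image tag and test directory below are assumptions), the neuron integration tests could be selected as follows:
```
# Point the tests at a locally built image; otherwise the first
# text-generation-inference image found on the host is used.
export DOCKER_IMAGE=text-generation-inference:latest-neuron
# Run only the tests carrying the neuron marker
pytest integration-tests --neuron
```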

View File

@ -0,0 +1,223 @@
import copy
import logging
import sys
from tempfile import TemporaryDirectory
import huggingface_hub
import pytest
import docker
import hashlib
import os
import tempfile
from docker.errors import NotFound
TEST_ORGANIZATION = "optimum-internal-testing"
TEST_CACHE_REPO_ID = f"{TEST_ORGANIZATION}/neuron-testing-cache"
HF_TOKEN = huggingface_hub.get_token()
logging.basicConfig(
level=logging.INFO,
format="[%(asctime)s] %(levelname)s [%(filename)s.%(funcName)s:%(lineno)d] %(message)s",
stream=sys.stdout,
)
logger = logging.getLogger(__file__)
# All model configurations below will be added to the neuron_model_config fixture
MODEL_CONFIGURATIONS = {
"gpt2": {
"model_id": "gpt2",
"export_kwargs": {"batch_size": 4, "sequence_length": 1024, "num_cores": 2, "auto_cast_type": "fp16"},
},
"llama": {
"model_id": "unsloth/Llama-3.2-1B-Instruct",
"export_kwargs": {"batch_size": 4, "sequence_length": 2048, "num_cores": 2, "auto_cast_type": "fp16"},
},
"mistral": {
"model_id": "optimum/mistral-1.1b-testing",
"export_kwargs": {"batch_size": 4, "sequence_length": 4096, "num_cores": 2, "auto_cast_type": "bf16"},
},
"qwen2": {
"model_id": "Qwen/Qwen2.5-0.5B",
"export_kwargs": {"batch_size": 4, "sequence_length": 4096, "num_cores": 2, "auto_cast_type": "fp16"},
},
"granite": {
"model_id": "ibm-granite/granite-3.1-2b-instruct",
"export_kwargs": {"batch_size": 4, "sequence_length": 4096, "num_cores": 2, "auto_cast_type": "bf16"},
},
}
def get_neuron_backend_hash():
import subprocess
res = subprocess.run(["git", "rev-parse", "--show-toplevel"],
capture_output=True,
text=True)
root_dir = res.stdout.split('\n')[0]
def get_sha(path):
res = subprocess.run(["git", "ls-tree", "HEAD", f"{root_dir}/{path}"],
capture_output=True,
text=True)
# Output of the command is in the form '040000 tree|blob <SHA>\t<path>\n'
sha = res.stdout.split('\t')[0].split(' ')[-1]
return sha.encode()
# We hash both the neuron backends directory and Dockerfile and create a smaller hash out of that
m = hashlib.sha256()
m.update(get_sha('backends/neuron'))
m.update(get_sha('Dockerfile.neuron'))
return m.hexdigest()[:10]
def get_neuron_model_name(config_name: str):
return f"neuron-tgi-testing-{config_name}-{get_neuron_backend_hash()}"
def get_tgi_docker_image():
docker_image = os.getenv("DOCKER_IMAGE", None)
if docker_image is None:
client = docker.from_env()
images = client.images.list(filters={"reference": "text-generation-inference"})
if not images:
raise ValueError("No text-generation-inference image found on this host to run tests.")
docker_image = images[0].tags[0]
return docker_image
def export_model(config_name, model_config, neuron_model_name):
"""Export a neuron model.
The model is exported by a custom image built on the fly from the base TGI image.
This makes sure the exported model and image are aligned and avoids introducing
neuron specific imports in the test suite.
Args:
config_name (`str`):
Used to identify test configurations
model_config (`dict`):
The model configuration for export (includes the original model id)
neuron_model_name (`str`):
The name of the exported model on the hub
"""
client = docker.from_env()
env = {"LOG_LEVEL": "info", "CUSTOM_CACHE_REPO": TEST_CACHE_REPO_ID}
if HF_TOKEN is not None:
env["HUGGING_FACE_HUB_TOKEN"] = HF_TOKEN
env["HF_TOKEN"] = HF_TOKEN
# Create a sub-image to export the model, to work around docker dind issues that prevent
# sharing a volume from the container running tests
model_id = model_config["model_id"]
export_kwargs = model_config["export_kwargs"]
base_image = get_tgi_docker_image()
export_image = f"neuron-tgi-tests-{config_name}-export-img"
logger.info(f"Building temporary image {export_image} from {base_image}")
with tempfile.TemporaryDirectory() as context_dir:
# Create entrypoint
model_path = "/data/neuron_model"
export_command = f"optimum-cli export neuron -m {model_id} --task text-generation"
for kwarg, value in export_kwargs.items():
export_command += f" --{kwarg} {str(value)}"
export_command += f" {model_path}"
entrypoint_content = f"""#!/bin/sh
{export_command}
huggingface-cli repo create --organization {TEST_ORGANIZATION} {neuron_model_name}
huggingface-cli upload {TEST_ORGANIZATION}/{neuron_model_name} {model_path} --exclude *.bin *.safetensors
optimum-cli neuron cache synchronize --repo_id {TEST_CACHE_REPO_ID}
"""
with open(os.path.join(context_dir, "entrypoint.sh"), "wb") as f:
f.write(entrypoint_content.encode("utf-8"))
f.flush()
# Create Dockerfile
docker_content = f"""
FROM {base_image}
COPY entrypoint.sh /export-entrypoint.sh
RUN chmod +x /export-entrypoint.sh
ENTRYPOINT ["/export-entrypoint.sh"]
"""
with open(os.path.join(context_dir, "Dockerfile"), "wb") as f:
f.write(docker_content.encode("utf-8"))
f.flush()
image, logs = client.images.build(path=context_dir, dockerfile=f.name, tag=export_image)
logger.info("Successfully built image %s", image.id)
logger.debug("Build logs %s", logs)
try:
container = client.containers.run(
export_image,
environment=env,
auto_remove=True,
detach=False,
devices=["/dev/neuron0"],
shm_size="1G",
)
logger.info(f"Successfully exported model for config {config_name}")
container.logs()
except Exception as e:
logger.exception(f"An exception occurred while running container: {e}.")
pass
finally:
# Cleanup the export image
logger.info("Cleaning image %s", image.id)
try:
image.remove(force=True)
except NotFound:
pass
except Exception as e:
logger.error("Error while removing image %s, skipping", image.id)
logger.exception(e)
@pytest.fixture(scope="session", params=MODEL_CONFIGURATIONS.keys())
def neuron_model_config(request):
"""Expose a pre-trained neuron model
The fixture first makes sure the following model artifacts are present on the hub:
- exported neuron model under optimum-internal-testing/neuron-tgi-testing-<name>-<backend_hash>,
- cached artifacts under optimum-internal-testing/neuron-testing-cache.
If not, it will export the model and push it to the hub.
It then fetches the model locally and returns a dictionary containing:
- a configuration name,
- the original model id,
- the export parameters,
- the neuron model id,
- the neuron model local path.
For each exposed model, the local directory is maintained for the duration of the
test session and cleaned up afterwards.
The hub model artifacts are never cleaned up and persist across sessions.
They must be cleaned up manually when the optimum-neuron version changes.
"""
config_name = request.param
model_config = copy.deepcopy(MODEL_CONFIGURATIONS[request.param])
neuron_model_name = get_neuron_model_name(config_name)
neuron_model_id = f"{TEST_ORGANIZATION}/{neuron_model_name}"
with TemporaryDirectory() as neuron_model_path:
hub = huggingface_hub.HfApi()
if not hub.repo_exists(neuron_model_id):
# Export the model first
export_model(config_name, model_config, neuron_model_name)
logger.info(f"Fetching {neuron_model_id} from the HuggingFace hub")
hub.snapshot_download(neuron_model_id, local_dir=neuron_model_path)
# Add dynamic parameters to the model configuration
model_config["neuron_model_path"] = neuron_model_path
model_config["neuron_model_id"] = neuron_model_id
# Also add model configuration name to allow tests to adapt their expectations
model_config["name"] = config_name
# Yield instead of returning to keep a reference to the temporary directory.
# It will go out of scope and be released only once all tests needing the fixture
# have been completed.
logger.info(f"{config_name} ready for testing ...")
yield model_config
logger.info(f"Done with {config_name}")
@pytest.fixture(scope="module")
def neuron_model_path(neuron_model_config):
yield neuron_model_config["neuron_model_path"]

View File

@ -0,0 +1,252 @@
import asyncio
import contextlib
import logging
import os
import random
import shutil
import sys
import tempfile
import time
from typing import List
import docker
import huggingface_hub
import pytest
from aiohttp import ClientConnectorError, ClientOSError, ServerDisconnectedError
from docker.errors import NotFound
from huggingface_hub import AsyncInferenceClient, TextGenerationOutput
OPTIMUM_CACHE_REPO_ID = "optimum-internal-testing/neuron-testing-cache"
HF_TOKEN = huggingface_hub.get_token()
def get_tgi_docker_image():
docker_image = os.getenv("DOCKER_IMAGE", None)
if docker_image is None:
client = docker.from_env()
images = client.images.list(filters={"reference": "text-generation-inference"})
if not images:
raise ValueError("No text-generation-inference image found on this host to run tests.")
docker_image = images[0].tags[0]
return docker_image
logging.basicConfig(
level=logging.INFO,
format="[%(asctime)s] %(levelname)s [%(filename)s.%(funcName)s:%(lineno)d] %(message)s",
stream=sys.stdout,
)
logger = logging.getLogger(__file__)
class TestClient(AsyncInferenceClient):
def __init__(self, service_name: str, base_url: str):
super().__init__(model=base_url)
self.service_name = service_name
class LauncherHandle:
def __init__(self, service_name: str, port: int):
self.client = TestClient(service_name, f"http://localhost:{port}")
def _inner_health(self):
raise NotImplementedError
async def health(self, timeout: int = 60):
assert timeout > 0
for i in range(timeout):
if not self._inner_health():
raise RuntimeError(f"Service crashed after {i} seconds.")
try:
await self.client.text_generation("test", max_new_tokens=1)
logger.info(f"Service started after {i} seconds")
return
except (ClientConnectorError, ClientOSError, ServerDisconnectedError):
time.sleep(1)
except Exception as e:
raise RuntimeError(f"Basic generation failed with: {e}")
raise RuntimeError(f"Service failed to start after {i} seconds.")
class ContainerLauncherHandle(LauncherHandle):
def __init__(self, service_name, docker_client, container_name, port: int):
super(ContainerLauncherHandle, self).__init__(service_name, port)
self.docker_client = docker_client
self.container_name = container_name
self._log_since = time.time()
def _inner_health(self) -> bool:
container = self.docker_client.containers.get(self.container_name)
container_output = container.logs(since=self._log_since).decode("utf-8")
self._log_since = time.time()
if container_output != "":
print(container_output, end="")
return container.status in ["running", "created"]
@pytest.fixture(scope="module")
def event_loop():
loop = asyncio.get_event_loop()
yield loop
loop.close()
@pytest.fixture(scope="module")
def neuron_launcher(event_loop):
"""Utility fixture to expose a TGI service.
The fixture uses a single event loop for each module, but it can create multiple
docker services with different parameters using the parametrized inner context.
Args:
service_name (`str`):
Used to identify test configurations and adjust test expectations,
model_name_or_path (`str`):
The model to use (can be a hub model or a path)
trust_remote_code (`bool`):
Must be set to True for gated models.
Returns:
A `ContainerLauncherHandle` containing both a TGI server and client.
"""
@contextlib.contextmanager
def docker_launcher(
service_name: str,
model_name_or_path: str,
trust_remote_code: bool = False,
):
port = random.randint(8000, 10_000)
client = docker.from_env()
container_name = f"tgi-tests-{service_name}-{port}"
try:
container = client.containers.get(container_name)
container.stop()
container.wait()
except NotFound:
pass
env = {"LOG_LEVEL": "info,text_generation_router=debug", "CUSTOM_CACHE_REPO": OPTIMUM_CACHE_REPO_ID}
if HF_TOKEN is not None:
env["HUGGING_FACE_HUB_TOKEN"] = HF_TOKEN
env["HF_TOKEN"] = HF_TOKEN
for var in ["MAX_BATCH_SIZE", "MAX_TOTAL_TOKENS", "HF_AUTO_CAST_TYPE", "HF_NUM_CORES"]:
if var in os.environ:
env[var] = os.environ[var]
base_image = get_tgi_docker_image()
if os.path.isdir(model_name_or_path):
# Create a sub-image containing the model, to work around docker dind issues that prevent
# sharing a volume from the container running tests
test_image = f"{container_name}-img"
logger.info(
"Building image on the flight derivated from %s, tagged with %s",
base_image,
test_image,
)
with tempfile.TemporaryDirectory() as context_dir:
# Copy model directory to build context
model_path = os.path.join(context_dir, "model")
shutil.copytree(model_name_or_path, model_path)
# Create Dockerfile
container_model_id = f"/data/{model_name_or_path}"
docker_content = f"""
FROM {base_image}
COPY model {container_model_id}
"""
with open(os.path.join(context_dir, "Dockerfile"), "wb") as f:
f.write(docker_content.encode("utf-8"))
f.flush()
image, logs = client.images.build(path=context_dir, dockerfile=f.name, tag=test_image)
logger.info("Successfully built image %s", image.id)
logger.debug("Build logs %s", logs)
else:
test_image = base_image
image = None
container_model_id = model_name_or_path
args = ["--model-id", container_model_id, "--env"]
if trust_remote_code:
args.append("--trust-remote-code")
container = client.containers.run(
test_image,
command=args,
name=container_name,
environment=env,
auto_remove=False,
detach=True,
devices=["/dev/neuron0"],
ports={"80/tcp": port},
shm_size="1G",
)
logger.info(f"Starting {container_name} container")
yield ContainerLauncherHandle(service_name, client, container.name, port)
try:
container.stop(timeout=60)
container.wait(timeout=60)
except Exception as e:
logger.exception(f"Ignoring exception while stopping container: {e}.")
pass
finally:
logger.info("Removing container %s", container_name)
try:
container.remove(force=True)
except Exception as e:
logger.error("Error while removing container %s, skipping", container_name)
logger.exception(e)
# Cleanup the build image
if image:
logger.info("Cleaning image %s", image.id)
try:
image.remove(force=True)
except NotFound:
pass
except Exception as e:
logger.error("Error while removing image %s, skipping", image.id)
logger.exception(e)
return docker_launcher
@pytest.fixture(scope="module")
def neuron_generate_load():
"""A utility fixture to launch multiple asynchronous TGI requests in parallel
Args:
client (`AsyncClient`):
An async client
prompt (`str`):
The prompt to use (identical for all requests)
max_new_tokens (`int`):
The number of tokens to generate for each request.
n (`int`):
The number of requests
Returns:
A list of `huggingface_hub.TextGenerationOutput`.
"""
async def generate_load_inner(
client: AsyncInferenceClient, prompt: str, max_new_tokens: int, n: int
) -> List[TextGenerationOutput]:
futures = [
client.text_generation(prompt, max_new_tokens=max_new_tokens, details=True, decoder_input_details=True)
for _ in range(n)
]
return await asyncio.gather(*futures)
return generate_load_inner

View File

@ -0,0 +1,93 @@
import pytest
@pytest.fixture
async def tgi_service(neuron_launcher, neuron_model_config):
model_name_or_path = neuron_model_config["neuron_model_path"]
service_name = neuron_model_config["name"]
with neuron_launcher(service_name, model_name_or_path) as tgi_service:
await tgi_service.health(600)
yield tgi_service
@pytest.mark.asyncio
async def test_model_single_request(tgi_service):
service_name = tgi_service.client.service_name
prompt = "What is Deep Learning?"
# Greedy bounded without input
response = await tgi_service.client.text_generation(
prompt, max_new_tokens=17, details=True, decoder_input_details=True
)
assert response.details.generated_tokens == 17
greedy_expectations = {
"gpt2": "\n\nDeep learning is a new field of research that has been around for a while",
"llama": " and How Does it Work?\nDeep learning is a subset of machine learning that uses artificial",
"mistral": "\nWhat is Deep Learning?\nDeep Learning is a type of machine learning that",
"qwen2": " - Part 1\n\nDeep Learning is a subset of Machine Learning that is based on",
"granite": "\n\nDeep Learning is a subset of Machine Learning, which is a branch of Art",
}
assert response.generated_text == greedy_expectations[service_name]
# Greedy bounded with input
response = await tgi_service.client.text_generation(
"What is Deep Learning?", max_new_tokens=17, return_full_text=True, details=True, decoder_input_details=True
)
assert response.details.generated_tokens == 17
assert response.generated_text == prompt + greedy_expectations[service_name]
# Sampling
response = await tgi_service.client.text_generation(
"What is Deep Learning?",
do_sample=True,
top_k=50,
top_p=0.9,
repetition_penalty=1.2,
max_new_tokens=128,
seed=42,
)
sample_expectations = {
"gpt2": "Deep Learning",
"llama": "Deep Learning",
"mistral": "Deep learning",
"qwen2": "Deep Learning",
"granite": "Deep learning",
}
assert sample_expectations[service_name] in response
# Sampling with stop sequence
stop_sequence = sample_expectations[service_name][-5:]
response = await tgi_service.client.text_generation(
"What is Deep Learning?",
do_sample=True,
top_k=50,
top_p=0.9,
repetition_penalty=1.2,
max_new_tokens=128,
seed=42,
stop_sequences=[stop_sequence],
)
assert response.endswith(stop_sequence)
@pytest.mark.asyncio
async def test_model_multiple_requests(tgi_service, neuron_generate_load):
num_requests = 4
responses = await neuron_generate_load(
tgi_service.client,
"What is Deep Learning?",
max_new_tokens=17,
n=num_requests,
)
assert len(responses) == 4
expectations = {
"gpt2": "Deep learning is a new field of research that has been around for a while",
"llama": "Deep learning is a subset of machine learning that uses artificial",
"mistral": "Deep Learning is a type of machine learning that",
"qwen2": "Deep Learning is a subset of Machine Learning that is based on",
"granite": "Deep Learning is a subset of Machine Learning, which is a branch of Art",
}
expected = expectations[tgi_service.client.service_name]
for r in responses:
assert r.details.generated_tokens == 17
assert expected in r.generated_text

View File

@ -0,0 +1,65 @@
import os
import pytest
from huggingface_hub.errors import ValidationError
@pytest.fixture(scope="module", params=["hub-neuron", "hub", "local-neuron"])
async def tgi_service(request, neuron_launcher, neuron_model_config):
"""Expose a TGI service corresponding to a model configuration
For each model configuration, the service will be started using the following
deployment options:
- from the hub original model (export parameters chosen after hub lookup),
- from the hub pre-exported neuron model,
- from a local path to the neuron model.
"""
# the tgi_env.py script will take care of setting these
for var in [
"MAX_BATCH_SIZE",
"MAX_INPUT_TOKENS",
"MAX_TOTAL_TOKENS",
"HF_NUM_CORES",
"HF_AUTO_CAST_TYPE",
]:
if var in os.environ:
del os.environ[var]
if request.param == "hub":
model_name_or_path = neuron_model_config["model_id"]
elif request.param == "hub-neuron":
model_name_or_path = neuron_model_config["neuron_model_id"]
else:
model_name_or_path = neuron_model_config["neuron_model_path"]
service_name = neuron_model_config["name"]
with neuron_launcher(service_name, model_name_or_path) as tgi_service:
await tgi_service.health(600)
yield tgi_service
@pytest.mark.asyncio
async def test_model_single_request(tgi_service):
# Just verify that the generation works, and nothing is raised, with several sets of params
# No params
await tgi_service.client.text_generation(
"What is Deep Learning?",
)
response = await tgi_service.client.text_generation(
"How to cook beans ?",
max_new_tokens=17,
details=True,
decoder_input_details=True,
)
assert response.details.generated_tokens == 17
# Sampling
await tgi_service.client.text_generation(
"What is Deep Learning?",
do_sample=True,
top_k=50,
top_p=0.9,
repetition_penalty=1.2,
max_new_tokens=128,
seed=42,
)