feat(neuron): add server and integration tests

David Corvoysier 2025-02-12 09:10:47 +00:00
parent 337329fff3
commit 856d7682cf
14 changed files with 1010 additions and 1 deletions

View File

@@ -16,7 +16,7 @@ mkfile_path := $(abspath $(lastword $(MAKEFILE_LIST)))
 mkfile_dir := $(dir $(mkfile_path))
 root_dir := "${mkfile_dir}/../.."
-.PHONY: image install_server
+.PHONY: image install_server test_server test_integration
 VERSION := $(shell gawk 'match($$0, /^version = "(.*)"/, a) {print a[1]}' ${root_dir}/Cargo.toml)
@@ -28,3 +28,11 @@ image:
 install_server:
 	make -C ${mkfile_dir}/server install VERSION:=${VERSION}
+
+test_server: install_server
+	python -m pip install -r ${mkfile_dir}/tests/requirements.txt
+	python -m pytest -sv ${mkfile_dir}/tests/server
+
+test_integration: image
+	python -m pip install -r ${mkfile_dir}/tests/requirements.txt
+	python -m pytest -sv ${mkfile_dir}/tests/integration

View File

@@ -0,0 +1 @@
pytest_plugins = ["fixtures.service", "fixtures.model"]

backends/neuron/tests/fixtures/model.py
View File

@@ -0,0 +1,129 @@
import copy
import logging
import subprocess
import sys
from tempfile import TemporaryDirectory
import huggingface_hub
import pytest
from transformers import AutoTokenizer
from optimum.neuron import NeuronModelForCausalLM
from optimum.neuron.utils import synchronize_hub_cache
from optimum.neuron.version import __sdk_version__ as sdk_version
from optimum.neuron.version import __version__ as version
logging.basicConfig(
level=logging.INFO,
format="[%(asctime)s] %(levelname)s [%(filename)s.%(funcName)s:%(lineno)d] %(message)s",
stream=sys.stdout,
)
logger = logging.getLogger(__file__)
OPTIMUM_CACHE_REPO_ID = "optimum-internal-testing/neuron-testing-cache"
# All model configurations below will be added to the neuron_model_config fixture
MODEL_CONFIGURATIONS = {
"gpt2": {
"model_id": "gpt2",
"export_kwargs": {"batch_size": 4, "sequence_length": 1024, "num_cores": 2, "auto_cast_type": "fp16"},
},
"llama": {
"model_id": "NousResearch/Hermes-2-Theta-Llama-3-8B",
"export_kwargs": {"batch_size": 4, "sequence_length": 2048, "num_cores": 2, "auto_cast_type": "fp16"},
},
"mistral": {
"model_id": "optimum/mistral-1.1b-testing",
"export_kwargs": {"batch_size": 4, "sequence_length": 4096, "num_cores": 2, "auto_cast_type": "bf16"},
},
"qwen2": {
"model_id": "Qwen/Qwen2.5-0.5B",
"export_kwargs": {"batch_size": 4, "sequence_length": 4096, "num_cores": 2, "auto_cast_type": "fp16"},
},
"granite": {
"model_id": "ibm-granite/granite-3.1-2b-instruct",
"export_kwargs": {"batch_size": 4, "sequence_length": 4096, "num_cores": 2, "auto_cast_type": "bf16"},
},
}
def get_hub_neuron_model_id(config_name: str):
return f"optimum-internal-testing/neuron-testing-{version}-{sdk_version}-{config_name}"
def export_model(model_id, export_kwargs, neuron_model_path):
export_command = ["optimum-cli", "export", "neuron", "-m", model_id, "--task", "text-generation"]
for kwarg, value in export_kwargs.items():
export_command.append(f"--{kwarg}")
export_command.append(str(value))
export_command.append(neuron_model_path)
logger.info(f"Exporting {model_id} with {export_kwargs}")
try:
subprocess.run(export_command, check=True)
except subprocess.CalledProcessError as e:
raise ValueError(f"Failed to export model: {e}")
@pytest.fixture(scope="session", params=MODEL_CONFIGURATIONS.keys())
def neuron_model_config(request):
"""Expose a pre-trained neuron model
The fixture first makes sure the following model artifacts are present on the hub:
- exported neuron model under optimum-internal-testing/neuron-testing-<version>-<sdk_version>-<name>,
- cached artifacts under optimum-internal-testing/neuron-testing-cache.
If not, it will export the model and push it to the hub.
It then fetches the model locally and returns a dictionary containing:
- a configuration name,
- the original model id,
- the export parameters,
- the neuron model id,
- the neuron model local path.
For each exposed model, the local directory is maintained for the duration of the
test session and cleaned up afterwards.
The hub model artifacts are never cleaned up and persist across sessions.
They must be cleaned up manually when the optimum-neuron version changes.
"""
config_name = request.param
model_config = copy.deepcopy(MODEL_CONFIGURATIONS[request.param])
model_id = model_config["model_id"]
export_kwargs = model_config["export_kwargs"]
neuron_model_id = get_hub_neuron_model_id(config_name)
with TemporaryDirectory() as neuron_model_path:
hub = huggingface_hub.HfApi()
if hub.repo_exists(neuron_model_id):
logger.info(f"Fetching {neuron_model_id} from the HuggingFace hub")
hub.snapshot_download(neuron_model_id, local_dir=neuron_model_path)
else:
export_model(model_id, export_kwargs, neuron_model_path)
tokenizer = AutoTokenizer.from_pretrained(model_id)
tokenizer.save_pretrained(neuron_model_path)
del tokenizer
# Create the test model on the hub
hub.create_repo(neuron_model_id, private=True)
hub.upload_folder(
folder_path=neuron_model_path,
repo_id=neuron_model_id,
ignore_patterns=[NeuronModelForCausalLM.CHECKPOINT_DIR + "/*"],
)
# Make sure it is cached
synchronize_hub_cache(cache_repo_id=OPTIMUM_CACHE_REPO_ID)
# Add dynamic parameters to the model configuration
model_config["neuron_model_path"] = neuron_model_path
model_config["neuron_model_id"] = neuron_model_id
# Also add model configuration name to allow tests to adapt their expectations
model_config["name"] = config_name
# Yield instead of returning to keep a reference to the temporary directory.
# It will go out of scope and be released only once all tests needing the fixture
# have been completed.
logger.info(f"{config_name} ready for testing ...")
yield model_config
logger.info(f"Done with {config_name}")
@pytest.fixture(scope="module")
def neuron_model_path(neuron_model_config):
yield neuron_model_config["neuron_model_path"]
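
For orientation, here is a minimal sketch of a test consuming the fixtures defined above; the test name and assertions are illustrative only and not part of this commit:

# Hypothetical example, not part of the commit: checks the keys exposed by the fixture.
def test_neuron_model_config_contents(neuron_model_config):
    # Static keys come from MODEL_CONFIGURATIONS; the dynamic ones are added by the fixture.
    for key in ("name", "model_id", "export_kwargs", "neuron_model_id", "neuron_model_path"):
        assert key in neuron_model_config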

View File

@@ -0,0 +1,240 @@
import asyncio
import contextlib
import logging
import os
import random
import shutil
import sys
import tempfile
import time
from typing import List
import docker
import huggingface_hub
import pytest
from aiohttp import ClientConnectorError, ClientOSError, ServerDisconnectedError
from docker.errors import NotFound
from huggingface_hub import AsyncInferenceClient, TextGenerationOutput
OPTIMUM_CACHE_REPO_ID = "optimum-internal-testing/neuron-testing-cache"
DOCKER_IMAGE = os.getenv("DOCKER_IMAGE", "text-generation-inference:latest-neuron")
HF_TOKEN = huggingface_hub.get_token()
logging.basicConfig(
level=logging.INFO,
format="[%(asctime)s] %(levelname)s [%(filename)s.%(funcName)s:%(lineno)d] %(message)s",
stream=sys.stdout,
)
logger = logging.getLogger(__file__)
class TestClient(AsyncInferenceClient):
def __init__(self, service_name: str, base_url: str):
super().__init__(model=base_url)
self.service_name = service_name
class LauncherHandle:
def __init__(self, service_name: str, port: int):
self.client = TestClient(service_name, f"http://localhost:{port}")
def _inner_health(self):
raise NotImplementedError
async def health(self, timeout: int = 60):
assert timeout > 0
for i in range(timeout):
if not self._inner_health():
raise RuntimeError(f"Service crashed after {i} seconds.")
try:
await self.client.text_generation("test", max_new_tokens=1)
logger.info(f"Service started after {i} seconds")
return
except (ClientConnectorError, ClientOSError, ServerDisconnectedError):
time.sleep(1)
except Exception as e:
raise RuntimeError(f"Basic generation failed with: {e}")
raise RuntimeError(f"Service failed to start after {i} seconds.")
class ContainerLauncherHandle(LauncherHandle):
def __init__(self, service_name, docker_client, container_name, port: int):
super(ContainerLauncherHandle, self).__init__(service_name, port)
self.docker_client = docker_client
self.container_name = container_name
self._log_since = time.time()
def _inner_health(self) -> bool:
container = self.docker_client.containers.get(self.container_name)
container_output = container.logs(since=self._log_since).decode("utf-8")
self._log_since = time.time()
if container_output != "":
print(container_output, end="")
return container.status in ["running", "created"]
@pytest.fixture(scope="module")
def event_loop():
loop = asyncio.get_event_loop()
yield loop
loop.close()
@pytest.fixture(scope="module")
def launcher(event_loop):
"""Utility fixture to expose a TGI service.
The fixture uses a single event loop for each module, but it can create multiple
docker services with different parameters using the parametrized inner context.
Args:
service_name (`str`):
Used to identify test configurations and adjust test expectations,
model_name_or_path (`str`):
The model to use (can be a hub model or a path)
trust_remote_code (`bool`):
Must be set to True for gated models.
Returns:
A context manager that yields a `ContainerLauncherHandle` wrapping a client for the TGI service.
"""
@contextlib.contextmanager
def docker_launcher(
service_name: str,
model_name_or_path: str,
trust_remote_code: bool = False,
):
port = random.randint(8000, 10_000)
client = docker.from_env()
container_name = f"tgi-tests-{service_name}-{port}"
try:
container = client.containers.get(container_name)
container.stop()
container.wait()
except NotFound:
pass
env = {"LOG_LEVEL": "info,text_generation_router=debug", "CUSTOM_CACHE_REPO": OPTIMUM_CACHE_REPO_ID}
if HF_TOKEN is not None:
env["HUGGING_FACE_HUB_TOKEN"] = HF_TOKEN
env["HF_TOKEN"] = HF_TOKEN
for var in ["MAX_BATCH_SIZE", "MAX_TOTAL_TOKENS", "HF_AUTO_CAST_TYPE", "HF_NUM_CORES"]:
if var in os.environ:
env[var] = os.environ[var]
if os.path.isdir(model_name_or_path):
# Create a sub-image containing the model to work around docker-in-docker (dind) issues
# that prevent sharing a volume from the container running the tests
docker_tag = f"{container_name}-img"
logger.info(
"Building image on the flight derivated from %s, tagged with %s",
DOCKER_IMAGE,
docker_tag,
)
with tempfile.TemporaryDirectory() as context_dir:
# Copy model directory to build context
model_path = os.path.join(context_dir, "model")
shutil.copytree(model_name_or_path, model_path)
# Create Dockerfile
container_model_id = f"/data/{model_name_or_path}"
docker_content = f"""
FROM {DOCKER_IMAGE}
COPY model {container_model_id}
"""
with open(os.path.join(context_dir, "Dockerfile"), "wb") as f:
f.write(docker_content.encode("utf-8"))
f.flush()
image, logs = client.images.build(path=context_dir, dockerfile=f.name, tag=docker_tag)
logger.info("Successfully built image %s", image.id)
logger.debug("Build logs %s", logs)
else:
docker_tag = DOCKER_IMAGE
image = None
container_model_id = model_name_or_path
args = ["--model-id", container_model_id, "--env"]
if trust_remote_code:
args.append("--trust-remote-code")
container = client.containers.run(
docker_tag,
command=args,
name=container_name,
environment=env,
auto_remove=False,
detach=True,
devices=["/dev/neuron0"],
ports={"80/tcp": port},
shm_size="1G",
)
logger.info(f"Starting {container_name} container")
yield ContainerLauncherHandle(service_name, client, container.name, port)
try:
container.stop(timeout=60)
container.wait(timeout=60)
except Exception as e:
logger.exception(f"Ignoring exception while stopping container: {e}.")
pass
finally:
logger.info("Removing container %s", container_name)
try:
container.remove(force=True)
except Exception as e:
logger.error("Error while removing container %s, skipping", container_name)
logger.exception(e)
# Cleanup the build image
if image:
logger.info("Cleaning image %s", image.id)
try:
image.remove(force=True)
except NotFound:
pass
except Exception as e:
logger.error("Error while removing image %s, skipping", image.id)
logger.exception(e)
return docker_launcher
@pytest.fixture(scope="module")
def generate_load():
"""A utility fixture to launch multiple asynchronous TGI requests in parallel
Args:
client (`AsyncInferenceClient`):
An async client
prompt (`str`):
The prompt to use (identical for all requests)
max_new_tokens (`int`):
The number of tokens to generate for each request.
n (`int`):
The number of requests
Returns:
A list of `huggingface_hub.TextGenerationOutput`.
"""
async def generate_load_inner(
client: AsyncInferenceClient, prompt: str, max_new_tokens: int, n: int
) -> List[TextGenerationOutput]:
futures = [
client.text_generation(prompt, max_new_tokens=max_new_tokens, details=True, decoder_input_details=True)
for _ in range(n)
]
return await asyncio.gather(*futures)
return generate_load_inner
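
As a usage reference, a hedged sketch of how the `launcher` and `generate_load` fixtures combine; the integration test files below follow this same pattern, and the fixture and test names here are placeholders:

# Hypothetical usage sketch, not part of the commit.
import pytest


@pytest.fixture
async def example_service(launcher, neuron_model_config):
    with launcher(neuron_model_config["name"], neuron_model_config["neuron_model_path"]) as service:
        await service.health(600)
        yield service


@pytest.mark.asyncio
async def test_example_generate(example_service, generate_load):
    responses = await generate_load(
        example_service.client, "What is Deep Learning?", max_new_tokens=17, n=2
    )
    assert len(responses) == 2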

View File

@@ -0,0 +1,96 @@
import Levenshtein
import pytest
@pytest.fixture
async def tgi_service(launcher, neuron_model_config):
model_name_or_path = neuron_model_config["neuron_model_path"]
service_name = neuron_model_config["name"]
with launcher(service_name, model_name_or_path) as tgi_service:
await tgi_service.health(600)
yield tgi_service
@pytest.mark.asyncio
async def test_model_single_request(tgi_service):
service_name = tgi_service.client.service_name
prompt = "What is Deep Learning?"
# Greedy bounded without input
response = await tgi_service.client.text_generation(
prompt, max_new_tokens=17, details=True, decoder_input_details=True
)
assert response.details.generated_tokens == 17
greedy_expectations = {
"gpt2": "\n\nDeep learning is a new field of research that has been around for a while",
"llama": " A Beginners Guide\nDeep learning is a subset of machine learning that involves the use",
"mistral": "\nWhat is Deep Learning?\nDeep Learning is a type of machine learning that",
"qwen2": " - Part 1\n\nDeep Learning is a subset of Machine Learning that is based on",
"granite": "\n\nDeep Learning is a subset of Machine Learning, which is a branch of Art",
}
assert response.generated_text == greedy_expectations[service_name]
# Greedy bounded with input
response = await tgi_service.client.text_generation(
"What is Deep Learning?", max_new_tokens=17, return_full_text=True, details=True, decoder_input_details=True
)
assert response.details.generated_tokens == 17
assert response.generated_text == prompt + greedy_expectations[service_name]
# Sampling
response = await tgi_service.client.text_generation(
"What is Deep Learning?",
do_sample=True,
top_k=50,
top_p=0.9,
repetition_penalty=1.2,
max_new_tokens=128,
seed=42,
)
sample_expectations = {
"gpt2": "Deep Learning",
"llama": "Deep Learning",
"mistral": "Deep learning",
"qwen2": "Deep Learning",
"granite": "Deep learning",
}
assert sample_expectations[service_name] in response
# Sampling with stop sequence
stop_sequence = sample_expectations[service_name][-5:]
response = await tgi_service.client.text_generation(
"What is Deep Learning?",
do_sample=True,
top_k=50,
top_p=0.9,
repetition_penalty=1.2,
max_new_tokens=128,
seed=42,
stop_sequences=[stop_sequence],
)
assert response.endswith(stop_sequence)
@pytest.mark.asyncio
async def test_model_multiple_requests(tgi_service, generate_load):
num_requests = 4
responses = await generate_load(
tgi_service.client,
"What is Deep Learning?",
max_new_tokens=17,
n=num_requests,
)
assert len(responses) == 4
expectations = {
"gpt2": "\n\nDeep learning is a new field of research that has been around for a while",
"llama": " A Beginners Guide\nDeep learning is a subset of machine learning that involves the use",
"mistral": "\nWhat is Deep Learning?\nDeep Learning is a type of machine learning that",
"qwen2": " - Part 1\n\nDeep Learning is a subset of Machine Learning that is based on",
"granite": "\n\nDeep Learning is a subset of Machine Learning, which is a branch of Art",
}
expected = expectations[tgi_service.client.service_name]
for r in responses:
assert r.details.generated_tokens == 17
# Compute the similarity with the expectation using the levenshtein distance
# We should not have more than two substitutions or additions
assert Levenshtein.distance(r.generated_text, expected) < 3

View File

@@ -0,0 +1,76 @@
import os
import pytest
from huggingface_hub.errors import ValidationError
@pytest.fixture(scope="module", params=["hub-neuron", "hub", "local-neuron"])
async def tgi_service(request, launcher, neuron_model_config):
"""Expose a TGI service corresponding to a model configuration
For each model configuration, the service will be started using the following
deployment options:
- from the hub original model (export parameters chosen after hub lookup),
- from the hub pre-exported neuron model,
- from a local path to the neuron model.
"""
# the tgi_env.py script will take care of setting these
for var in [
"MAX_BATCH_SIZE",
"MAX_INPUT_TOKENS",
"MAX_TOTAL_TOKENS",
"HF_NUM_CORES",
"HF_AUTO_CAST_TYPE",
]:
if var in os.environ:
del os.environ[var]
if request.param == "hub":
model_name_or_path = neuron_model_config["model_id"]
elif request.param == "hub-neuron":
model_name_or_path = neuron_model_config["neuron_model_id"]
else:
model_name_or_path = neuron_model_config["neuron_model_path"]
service_name = neuron_model_config["name"]
with launcher(service_name, model_name_or_path) as tgi_service:
await tgi_service.health(600)
yield tgi_service
@pytest.mark.asyncio
async def test_model_single_request(tgi_service):
# Just verify that generation works and nothing is raised, with several sets of params
# No params
await tgi_service.client.text_generation(
"What is Deep Learning?",
)
response = await tgi_service.client.text_generation(
"How to cook beans ?",
max_new_tokens=17,
details=True,
decoder_input_details=True,
)
assert response.details.generated_tokens == 17
# check error
try:
await tgi_service.client.text_generation("What is Deep Learning?", max_new_tokens=170000)
except ValidationError:
pass
else:
raise AssertionError(
"The previous text generation request should have failed, "
"because too many tokens were requested, it succeeded"
)
# Sampling
await tgi_service.client.text_generation(
"What is Deep Learning?",
do_sample=True,
top_k=50,
top_p=0.9,
repetition_penalty=1.2,
max_new_tokens=128,
seed=42,
)

View File

@@ -0,0 +1,2 @@
[pytest]
asyncio_mode = auto
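
With `asyncio_mode = auto`, pytest-asyncio collects bare `async def` tests and async fixtures without requiring an explicit `@pytest.mark.asyncio` marker; a tiny illustrative sketch (the test name is hypothetical):

# Hypothetical sketch: no marker needed under asyncio_mode = auto.
import asyncio


async def test_event_loop_is_available():
    await asyncio.sleep(0)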

View File

@@ -0,0 +1,19 @@
# Copyright 2023 The HuggingFace Team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
text-generation >= 0.6.0
pytest >= 7.4.0
pytest-asyncio >= 0.21.1
requests < 2.32.0
docker >= 6.1.3
Levenshtein

View File

@@ -0,0 +1,149 @@
from text_generation_server.generator import NeuronGenerator
from text_generation_server.pb.generate_pb2 import (
Batch,
NextTokenChooserParameters,
Request,
StoppingCriteriaParameters,
)
def create_request(
id: int,
inputs: str,
truncate: int = 0,
max_new_tokens: int = 20,
do_sample: bool = False,
top_k: int = 50,
top_p: float = 0.9,
temperature: float = 1.0,
seed: int = 42,
repetition_penalty: float = 1.0,
):
parameters = NextTokenChooserParameters(
temperature=temperature,
top_k=top_k,
top_p=top_p,
do_sample=do_sample,
seed=seed,
repetition_penalty=repetition_penalty,
)
stopping_parameters = StoppingCriteriaParameters(max_new_tokens=max_new_tokens)
return Request(
id=id, inputs=inputs, truncate=truncate, parameters=parameters, stopping_parameters=stopping_parameters
)
def check_prefill(input_text, expected_token_id, expected_token_text, do_sample, batch_size, model_path):
"""Verify that a prefill for a single request generates the expected output."""
generator = NeuronGenerator.from_pretrained(model_path)
assert generator.model.batch_size >= batch_size
requests = []
max_new_tokens = 20
for i in range(batch_size):
requests.append(create_request(id=i, inputs=input_text, do_sample=do_sample, max_new_tokens=max_new_tokens))
# Let's be pessimistic when estimating max_tokens
max_length = generator.model.max_length
batch = Batch(id=0, requests=requests, size=batch_size, max_tokens=batch_size * max_length)
generations, next_batch = generator.prefill(batch)
assert next_batch.size == batch_size
# Whatever was passed as max_tokens, the server will correct it
# because of static batching
assert next_batch.max_tokens == batch_size * max_length
assert len(generations) == batch_size
for g in generations:
tokens = g.tokens
assert tokens.ids == [expected_token_id]
assert tokens.texts == [expected_token_text]
def check_decode_single(input_text, max_new_tokens, generated_text, do_sample, model_path):
"""Verify that a decoding for a single request generates the expected output."""
generator = NeuronGenerator.from_pretrained(model_path)
request = create_request(id=0, inputs=input_text, max_new_tokens=max_new_tokens, do_sample=do_sample)
max_length = generator.model.max_length
batch = Batch(id=0, requests=[request], size=1, max_tokens=max_length)
generations, next_batch = generator.prefill(batch)
# We already generated one token: call decode max_new_tokens - 1 times
for _ in range(max_new_tokens - 1):
assert next_batch.size == 1
assert next_batch.max_tokens == max_length
assert len(generations) == 1
assert len(generations[0].tokens.ids) == 1
generations, next_batch = generator.decode([next_batch])
assert next_batch is None
assert len(generations) == 1
output = generations[0].generated_text
assert output.generated_tokens == max_new_tokens
assert output.finish_reason == 0
assert output.text == generated_text
def check_decode_multiple(model_path):
"""Verify that two requests added to the batch at different generation steps
generate the same outputs (continuous batching).
"""
generator = NeuronGenerator.from_pretrained(model_path)
assert generator.model.batch_size > 1
input_text = "Once upon a time"
max_new_tokens = 20
# Prefill a single request, remembering the generated token
tokens = {0: [], 1: []}
request = create_request(id=0, inputs=input_text, max_new_tokens=max_new_tokens)
max_length = generator.model.max_length
batch = Batch(id=0, requests=[request], size=1, max_tokens=max_length)
generations, next_batch = generator.prefill(batch)
assert next_batch.size == 1
assert len(generations) == 1
g = generations[0]
tokens[g.request_id].append(g.tokens.ids[0])
assert len(tokens[0]) == 1
# Decode a few tokens
gen_tokens = 4
for _ in range(gen_tokens - 1):
generations, next_batch = generator.decode([next_batch])
assert len(generations) == 1
g = generations[0]
tokens[g.request_id].append(g.tokens.ids[0])
assert len(tokens[0]) == gen_tokens
assert next_batch.size == 1
# Add a second request
request = create_request(id=1, inputs=input_text, max_new_tokens=max_new_tokens)
batch = Batch(id=1, requests=[request], size=1, max_tokens=max_length)
generations, next_batch_1 = generator.prefill(batch)
assert next_batch_1.size == 1
# We should have generated only a single token
assert len(generations) == 1
g = generations[0]
tokens[g.request_id].append(g.tokens.ids[0])
assert len(tokens[0]) == gen_tokens
assert len(tokens[1]) == 1
# Decode more tokens until we reach the maximum for the first request
batches = [next_batch, next_batch_1]
for _ in range(max_new_tokens - gen_tokens):
generations, next_batch = generator.decode(batches)
for g in generations:
tokens[g.request_id].append(g.tokens.ids[0])
batches = [next_batch]
# Verify we now only have one pending request
assert next_batch.size == 1
assert len(tokens[0]) == max_new_tokens
assert len(tokens[1]) == max_new_tokens - gen_tokens + 1
# Verify we have the output for the first request
for g in generations:
if g.request_id == 0:
output = g.generated_text
assert output.text != ""
assert output.generated_tokens == max_new_tokens
generated_text = output.text
# Continue decoding until the end of the second request
for _ in range(gen_tokens - 1):
generations, next_batch = generator.decode([next_batch])
assert len(generations) == 1
g = generations[0]
tokens[g.request_id].append(g.tokens.ids[0])
assert next_batch is None
output = generations[0].generated_text
assert output.generated_tokens == max_new_tokens
assert tokens[0] == tokens[1]
assert output.text == generated_text
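
For illustration, a hedged sketch of how the `check_*` helpers above could be driven from a test; the expected text is a placeholder that would need to match the actual model output:

# Hypothetical usage sketch, not part of the commit.
from helpers import check_decode_single


def test_decode_via_helper(neuron_model_config):
    check_decode_single(
        input_text="It was a bright cold day in April, and the clocks were striking thirteen.",
        max_new_tokens=20,
        generated_text="<expected greedy continuation for this model>",  # placeholder expectation
        do_sample=False,
        model_path=neuron_model_config["neuron_model_path"],
    )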

View File

@@ -0,0 +1,74 @@
from helpers import create_request
from text_generation_server.generator import NeuronGenerator
from text_generation_server.pb.generate_pb2 import Batch
def test_continuous_batching_two_requests(neuron_model_config):
"""Verify that two requests added to the batch at different generation steps
generate the same outputs (continuous batching).
"""
neuron_model_path = neuron_model_config["neuron_model_path"]
generator = NeuronGenerator.from_pretrained(neuron_model_path)
assert generator.model.batch_size > 1
input_text = "Once upon a time"
max_new_tokens = 20
# Prefill a single request, remembering the generated token
tokens = {0: [], 1: []}
request = create_request(id=0, inputs=input_text, max_new_tokens=max_new_tokens)
max_length = generator.model.max_length
batch = Batch(id=0, requests=[request], size=1, max_tokens=max_length)
generations, next_batch = generator.prefill(batch)
assert next_batch.size == 1
assert len(generations) == 1
g = generations[0]
tokens[g.request_id].append(g.tokens.ids[0])
assert len(tokens[0]) == 1
# Decode a few tokens
gen_tokens = 4
for _ in range(gen_tokens - 1):
generations, next_batch = generator.decode([next_batch])
assert len(generations) == 1
g = generations[0]
tokens[g.request_id].append(g.tokens.ids[0])
assert len(tokens[0]) == gen_tokens
assert next_batch.size == 1
# Add a second request
request = create_request(id=1, inputs=input_text, max_new_tokens=max_new_tokens)
batch = Batch(id=1, requests=[request], size=1, max_tokens=max_length)
generations, next_batch_1 = generator.prefill(batch)
assert next_batch_1.size == 1
# We should have generated only a single token
assert len(generations) == 1
g = generations[0]
tokens[g.request_id].append(g.tokens.ids[0])
assert len(tokens[0]) == gen_tokens
assert len(tokens[1]) == 1
# Decode more tokens until we reach the maximum for the first request
batches = [next_batch, next_batch_1]
for _ in range(max_new_tokens - gen_tokens):
generations, next_batch = generator.decode(batches)
for g in generations:
tokens[g.request_id].append(g.tokens.ids[0])
batches = [next_batch]
# Verify we now only have one pending request
assert next_batch.size == 1
assert len(tokens[0]) == max_new_tokens
assert len(tokens[1]) == max_new_tokens - gen_tokens + 1
# Verify we have the output for the first request
for g in generations:
if g.request_id == 0:
output = g.generated_text
assert output.text != ""
assert output.generated_tokens == max_new_tokens
generated_text = output.text
# Continue decoding until the end of the second request
for _ in range(gen_tokens - 1):
generations, next_batch = generator.decode([next_batch])
assert len(generations) == 1
g = generations[0]
tokens[g.request_id].append(g.tokens.ids[0])
assert next_batch is None
output = generations[0].generated_text
assert output.generated_tokens == max_new_tokens
assert tokens[0] == tokens[1]
assert output.text == generated_text

View File

@@ -0,0 +1,55 @@
from helpers import create_request
from text_generation_server.generator import NeuronGenerator
from text_generation_server.pb.generate_pb2 import Batch
def test_decode(neuron_model_config):
"""Verify that a decoding for a single request generates the expected output."""
config_name = neuron_model_config["name"]
neuron_model_path = neuron_model_config["neuron_model_path"]
generator = NeuronGenerator.from_pretrained(neuron_model_path)
for do_sample in [True, False]:
mode = "sample" if do_sample else "greedy"
print(f"{config_name}[{mode}]")
_test_decode(config_name, generator, do_sample)
generator.clear()
def _test_decode(config_name, generator, do_sample):
input_text = "It was a bright cold day in April, and the clocks were striking thirteen."
max_new_tokens = 20
request = create_request(id=0, inputs=input_text, max_new_tokens=max_new_tokens, do_sample=do_sample)
max_length = generator.model.max_length
batch = Batch(id=0, requests=[request], size=1, max_tokens=max_length)
generations, next_batch = generator.prefill(batch)
# We already generated one token: call decode max_new_tokens - 1 times
for _ in range(max_new_tokens - 1):
assert next_batch.size == 1
assert next_batch.max_tokens == max_length
assert len(generations) == 1
assert len(generations[0].tokens.ids) == 1
generations, next_batch = generator.decode([next_batch])
assert next_batch is None
assert len(generations) == 1
output = generations[0].generated_text
assert output.generated_tokens == max_new_tokens
assert output.finish_reason == 0
if do_sample:
expected_text = {
"gpt2": " The sun was set",
"llama": "George Orwell, 1984",
"mistral": "The sky was",
"qwen2": " A young woman with",
"granite": "1984, George Orwell",
}[config_name]
assert expected_text in output.text
else:
print(output.text)
expected_text = {
"gpt2": '\n\n"I\'m going to go to bed," I said.\n\n"I\'m going',
"llama": " George Orwells classic dystopian novel, 1984, begins with this ominous sentence. The story",
"mistral": "\nThe clocks were striking thirteen.\nThe clocks were striking thirteen.",
"qwen2": " I was sitting in my room, staring at the ceiling, when the door opened and in came a",
"granite": "\n\nThis opening line from George Orwell's dystopian novel \"198",
}[config_name]
assert output.text == expected_text

View File

@@ -0,0 +1,61 @@
import pytest
import torch
from text_generation_server.generator import Slot
from text_generation_server.pb.generate_pb2 import Request
from transformers import AutoTokenizer, GenerationConfig
TOKENIZERS = ["NousResearch/Llama-2-7b-hf", "gpt2"]
@pytest.fixture(params=TOKENIZERS)
def tokenizer(request):
t = AutoTokenizer.from_pretrained(request.param)
t.padding_side = "left"
t.pad_token_id = t.eos_token_id
return t
@pytest.mark.parametrize(
"input_text, generated_text",
[
[
"It was a bright cold day in April, and the clocks were striking thirteen.",
" Winston Smith, his chin nuzzled into his breast in an effort to escape the vile wind,"
" slipped quickly through the glass doors of Victory Mansions, though not quickly enough"
" to prevent a swirl of gritty dust from entering along with him.",
],
["This sentence is written in chinese:", "我很感谢你的热情"],
["Some text might contain a lot of emojis like 😃", "😍💪 👉 👀"],
],
ids=["spaces", "chinese-utf8", "emojis"],
)
def test_decode_streaming(tokenizer, input_text, generated_text):
slot = Slot(0, tokenizer)
request = Request(id=0, inputs=input_text)
slot.assign(0, request, GenerationConfig())
assert slot.cached_text == input_text
inputs = tokenizer(input_text, padding="max_length", max_length=len(input_text) + 1, return_tensors="pt")
input_ids = inputs["input_ids"][0]
attention_mask = inputs["attention_mask"][0]
generated_tokens = tokenizer(generated_text, add_special_tokens=False)["input_ids"]
# We need to regenerate the full text as the tokenizer might change it (extra spaces might be added)
all_input_ids = torch.cat([input_ids, torch.tensor(generated_tokens)])
full_text = tokenizer.decode(all_input_ids, skip_special_tokens=True)
regenerated_text = full_text[len(input_text) :]
# Initialize the slot with the inputs
slot.reset(input_ids, attention_mask, selector=None)
assert slot.generated_tokens == 0
# Simulate an iterative generation (i.e. don't call select and use known tokens instead)
decoded_text = ""
for i in range(len(generated_tokens)):
text = slot.append(generated_tokens[i])
assert slot.generated_tokens == i + 1
decoded_text += text
assert decoded_text == regenerated_text

View File

@@ -0,0 +1,10 @@
from text_generation_server.generator import NeuronGenerator
def test_info(neuron_model_path):
generator = NeuronGenerator.from_pretrained(neuron_model_path)
info = generator.info
assert info.requires_padding is True
assert info.device_type == "xla"
assert info.window_size == 0
assert info.speculate == 0

View File

@@ -0,0 +1,89 @@
from helpers import create_request
from text_generation_server.generator import NeuronGenerator
from text_generation_server.pb.generate_pb2 import Batch
def test_prefill(neuron_model_config):
"""Verify that a prefill for a single request generates the expected output."""
config_name = neuron_model_config["name"]
neuron_model_path = neuron_model_config["neuron_model_path"]
generator = NeuronGenerator.from_pretrained(neuron_model_path)
max_batch_size = 4
assert generator.model.batch_size >= max_batch_size
for num_requests in [1, max_batch_size]:
for do_sample in [True, False]:
mode = "sample" if do_sample else "greedy"
print(f"[{mode}]: {num_requests} requests")
_test_prefill(config_name, generator, num_requests, do_sample)
generator.clear()
def _test_prefill(config_name, generator, batch_size, do_sample):
requests = []
max_new_tokens = 20
input_text = "It was a bright cold day in April, and the clocks were striking thirteen."
for i in range(batch_size):
requests.append(create_request(id=i, inputs=input_text, do_sample=do_sample, max_new_tokens=max_new_tokens))
# Let's be pessimistic when estimating max_tokens
max_length = generator.model.max_length
batch = Batch(id=0, requests=requests, size=batch_size, max_tokens=batch_size * max_length)
generations, next_batch = generator.prefill(batch)
assert next_batch.size == batch_size
# Whatever was passed as max_tokens, the server will correct it
# because of static batching
assert next_batch.max_tokens == batch_size * max_length
assert len(generations) == batch_size
if do_sample:
expectations = {
"gpt2": [383, " The"],
"llama": [10058, " George"],
"mistral": [450, " The"],
"qwen2": [362, " A"],
"granite": [308, " ("],
}[config_name]
else:
expectations = {
"gpt2": [198, "\n"],
"llama": [10058, " George"],
"mistral": [13, "\n"],
"qwen2": [358, " I"],
"granite": [203, "\n"],
}[config_name]
for g in generations:
tokens = g.tokens
assert tokens.ids[0] == expectations[0]
assert tokens.texts[0] == expectations[1]
def test_prefill_truncate(neuron_model_config):
config_name = neuron_model_config["name"]
neuron_model_path = neuron_model_config["neuron_model_path"]
generator = NeuronGenerator.from_pretrained(neuron_model_path)
batch_size = generator.model.batch_size
# We apply truncation to all requests but the first one
truncate = [
None,
] + [i * 3 for i in range(1, batch_size)]
input_text = (
"Two gin-scented tears trickled down the sides of his nose."
" But it was all right, everything was all right, the struggle was finished."
" He had won the victory over himself. He loved Big Brother."
)
requests = []
for i in range(batch_size):
requests.append(create_request(id=i, inputs=input_text, truncate=truncate[i]))
max_length = generator.model.max_length
batch = Batch(id=0, requests=requests, size=batch_size, max_tokens=batch_size * max_length)
generations, _ = generator.prefill(batch)
# Even if the input text is identical for all requests, the first generated token might
# be different because of the truncation
expectations = {
"gpt2": [" He", " He", "\n", " He"],
"llama": ["", " The", " He", " He"],
"mistral": [" He", "\n", " He", " He"],
"qwen2": [" He", " The", " He", " He"],
"granite": ["\n", "\n", " I", " He"],
}[config_name]
for i, g in enumerate(generations):
tokens = g.tokens
assert tokens.texts[0] == expectations[i]