mirror of
https://github.com/huggingface/text-generation-inference.git
synced 2025-04-19 22:02:06 +00:00
# What does this PR do? ``` text-generation-launcher --model-id XXX # Uses cuda graphs by default text-generation-launcher --model-id XXX --cuda-graphs "1,2" #Restrict the number of cuda graphs which saves VRAM text-generation-launcher --model-id XXX --cuda-graphs "0" # Disabling it entirely ``` <!-- Congratulations! You've made it this far! You're not quite done yet though. Once merged, your PR is going to appear in the release notes with the title you set, so make sure it's a great title that fully reflects the extent of your awesome contribution. Then, please replace this with a description of the change and which issue is fixed (if applicable). Please also include relevant motivation and context. List any dependencies (if any) that are required for this change. Once you're done, someone will review your PR shortly (see the section "Who can review?" below to tag some potential reviewers). They may suggest changes to make the code even better. If no one reviewed your PR after a week has passed, don't hesitate to post a new comment @-mentioning the same persons---sometimes notifications get lost. --> <!-- Remove if not applicable --> Fixes # (issue) ## Before submitting - [ ] This PR fixes a typo or improves the docs (you can dismiss the other checks if that's the case). - [ ] Did you read the [contributor guideline](https://github.com/huggingface/transformers/blob/main/CONTRIBUTING.md#start-contributing-pull-requests), Pull Request section? - [ ] Was this discussed/approved via a Github issue or the [forum](https://discuss.huggingface.co/)? Please add a link to it if that's the case. - [ ] Did you make sure to update the documentation with your changes? Here are the [documentation guidelines](https://github.com/huggingface/transformers/tree/main/docs), and [here are tips on formatting docstrings](https://github.com/huggingface/transformers/tree/main/docs#writing-source-documentation). - [ ] Did you write any new necessary tests? ## Who can review? Anyone in the community is free to review the PR once the tests have passed. Feel free to tag members/contributors who may be interested in your PR. <!-- Your PR will be replied to more quickly if you can figure out the right person to tag with @ @OlivierDehaene OR @Narsil -->
459 lines
14 KiB
Python
459 lines
14 KiB
Python
import sys
|
|
import subprocess
|
|
import contextlib
|
|
import pytest
|
|
import asyncio
|
|
import os
|
|
import docker
|
|
import json
|
|
import math
|
|
import time
|
|
import random
|
|
|
|
from docker.errors import NotFound
|
|
from typing import Optional, List, Dict
|
|
from syrupy.extensions.json import JSONSnapshotExtension
|
|
from aiohttp import ClientConnectorError, ClientOSError, ServerDisconnectedError
|
|
|
|
from text_generation import AsyncClient
|
|
from text_generation.types import (
|
|
Response,
|
|
Details,
|
|
InputToken,
|
|
Token,
|
|
BestOfSequence,
|
|
Grammar,
|
|
ChatComplete,
|
|
ChatCompletionChunk,
|
|
ChatCompletionComplete,
|
|
)
|
|
|
|
DOCKER_IMAGE = os.getenv("DOCKER_IMAGE", None)
|
|
HUGGING_FACE_HUB_TOKEN = os.getenv("HUGGING_FACE_HUB_TOKEN", None)
|
|
DOCKER_VOLUME = os.getenv("DOCKER_VOLUME", "/data")
|
|
|
|
|
|
class ResponseComparator(JSONSnapshotExtension):
|
|
rtol = 0.2
|
|
|
|
def serialize(
|
|
self,
|
|
data,
|
|
*,
|
|
exclude=None,
|
|
matcher=None,
|
|
):
|
|
if (
|
|
isinstance(data, Response)
|
|
or isinstance(data, ChatComplete)
|
|
or isinstance(data, ChatCompletionChunk)
|
|
or isinstance(data, ChatCompletionComplete)
|
|
):
|
|
data = data.model_dump()
|
|
|
|
if isinstance(data, List):
|
|
data = [d.model_dump() for d in data]
|
|
|
|
data = self._filter(
|
|
data=data, depth=0, path=(), exclude=exclude, matcher=matcher
|
|
)
|
|
return json.dumps(data, indent=2, ensure_ascii=False, sort_keys=False) + "\n"
|
|
|
|
def matches(
|
|
self,
|
|
*,
|
|
serialized_data,
|
|
snapshot_data,
|
|
) -> bool:
|
|
def convert_data(data):
|
|
data = json.loads(data)
|
|
if isinstance(data, Dict) and "choices" in data:
|
|
choices = data["choices"]
|
|
if (
|
|
isinstance(choices, List)
|
|
and len(choices) >= 1
|
|
and "delta" in choices[0]
|
|
):
|
|
return ChatCompletionChunk(**data)
|
|
return ChatComplete(**data)
|
|
|
|
if isinstance(data, Dict):
|
|
return Response(**data)
|
|
if isinstance(data, List):
|
|
return [Response(**d) for d in data]
|
|
raise NotImplementedError
|
|
|
|
def eq_token(token: Token, other: Token) -> bool:
|
|
return (
|
|
token.id == other.id
|
|
and token.text == other.text
|
|
and math.isclose(token.logprob, other.logprob, rel_tol=self.rtol)
|
|
and token.special == other.special
|
|
)
|
|
|
|
def eq_prefill_token(prefill_token: InputToken, other: InputToken) -> bool:
|
|
try:
|
|
return (
|
|
prefill_token.id == other.id
|
|
and prefill_token.text == other.text
|
|
and (
|
|
math.isclose(
|
|
prefill_token.logprob, other.logprob, rel_tol=self.rtol
|
|
)
|
|
if prefill_token.logprob is not None
|
|
else prefill_token.logprob == other.logprob
|
|
)
|
|
)
|
|
except TypeError:
|
|
return False
|
|
|
|
def eq_best_of(details: BestOfSequence, other: BestOfSequence) -> bool:
|
|
return (
|
|
details.finish_reason == other.finish_reason
|
|
and details.generated_tokens == other.generated_tokens
|
|
and details.seed == other.seed
|
|
and len(details.prefill) == len(other.prefill)
|
|
and all(
|
|
[
|
|
eq_prefill_token(d, o)
|
|
for d, o in zip(details.prefill, other.prefill)
|
|
]
|
|
)
|
|
and len(details.tokens) == len(other.tokens)
|
|
and all([eq_token(d, o) for d, o in zip(details.tokens, other.tokens)])
|
|
)
|
|
|
|
def eq_details(details: Details, other: Details) -> bool:
|
|
return (
|
|
details.finish_reason == other.finish_reason
|
|
and details.generated_tokens == other.generated_tokens
|
|
and details.seed == other.seed
|
|
and len(details.prefill) == len(other.prefill)
|
|
and all(
|
|
[
|
|
eq_prefill_token(d, o)
|
|
for d, o in zip(details.prefill, other.prefill)
|
|
]
|
|
)
|
|
and len(details.tokens) == len(other.tokens)
|
|
and all([eq_token(d, o) for d, o in zip(details.tokens, other.tokens)])
|
|
and (
|
|
len(details.best_of_sequences)
|
|
if details.best_of_sequences is not None
|
|
else 0
|
|
)
|
|
== (
|
|
len(other.best_of_sequences)
|
|
if other.best_of_sequences is not None
|
|
else 0
|
|
)
|
|
and (
|
|
all(
|
|
[
|
|
eq_best_of(d, o)
|
|
for d, o in zip(
|
|
details.best_of_sequences, other.best_of_sequences
|
|
)
|
|
]
|
|
)
|
|
if details.best_of_sequences is not None
|
|
else details.best_of_sequences == other.best_of_sequences
|
|
)
|
|
)
|
|
|
|
def eq_chat_complete(response: ChatComplete, other: ChatComplete) -> bool:
|
|
return (
|
|
response.choices[0].message.content == other.choices[0].message.content
|
|
)
|
|
|
|
def eq_chat_complete_chunk(
|
|
response: ChatCompletionChunk, other: ChatCompletionChunk
|
|
) -> bool:
|
|
return response.choices[0].delta.content == other.choices[0].delta.content
|
|
|
|
def eq_response(response: Response, other: Response) -> bool:
|
|
return response.generated_text == other.generated_text and eq_details(
|
|
response.details, other.details
|
|
)
|
|
|
|
serialized_data = convert_data(serialized_data)
|
|
snapshot_data = convert_data(snapshot_data)
|
|
|
|
if not isinstance(serialized_data, List):
|
|
serialized_data = [serialized_data]
|
|
if not isinstance(snapshot_data, List):
|
|
snapshot_data = [snapshot_data]
|
|
|
|
if isinstance(serialized_data[0], ChatComplete):
|
|
return len(snapshot_data) == len(serialized_data) and all(
|
|
[eq_chat_complete(r, o) for r, o in zip(serialized_data, snapshot_data)]
|
|
)
|
|
|
|
if isinstance(serialized_data[0], ChatCompletionChunk):
|
|
return len(snapshot_data) == len(serialized_data) and all(
|
|
[
|
|
eq_chat_complete_chunk(r, o)
|
|
for r, o in zip(serialized_data, snapshot_data)
|
|
]
|
|
)
|
|
|
|
return len(snapshot_data) == len(serialized_data) and all(
|
|
[eq_response(r, o) for r, o in zip(serialized_data, snapshot_data)]
|
|
)
|
|
|
|
|
|
class GenerousResponseComparator(ResponseComparator):
|
|
# Needed for GPTQ with exllama which has serious numerical fluctuations.
|
|
rtol = 0.75
|
|
|
|
|
|
class LauncherHandle:
|
|
def __init__(self, port: int):
|
|
self.client = AsyncClient(f"http://localhost:{port}")
|
|
|
|
def _inner_health(self):
|
|
raise NotImplementedError
|
|
|
|
async def health(self, timeout: int = 60):
|
|
assert timeout > 0
|
|
for _ in range(timeout):
|
|
if not self._inner_health():
|
|
raise RuntimeError("Launcher crashed")
|
|
|
|
try:
|
|
await self.client.generate("test")
|
|
return
|
|
except (ClientConnectorError, ClientOSError, ServerDisconnectedError) as e:
|
|
time.sleep(1)
|
|
raise RuntimeError("Health check failed")
|
|
|
|
|
|
class ContainerLauncherHandle(LauncherHandle):
|
|
def __init__(self, docker_client, container_name, port: int):
|
|
super(ContainerLauncherHandle, self).__init__(port)
|
|
self.docker_client = docker_client
|
|
self.container_name = container_name
|
|
|
|
def _inner_health(self) -> bool:
|
|
container = self.docker_client.containers.get(self.container_name)
|
|
return container.status in ["running", "created"]
|
|
|
|
|
|
class ProcessLauncherHandle(LauncherHandle):
|
|
def __init__(self, process, port: int):
|
|
super(ProcessLauncherHandle, self).__init__(port)
|
|
self.process = process
|
|
|
|
def _inner_health(self) -> bool:
|
|
return self.process.poll() is None
|
|
|
|
|
|
@pytest.fixture
|
|
def response_snapshot(snapshot):
|
|
return snapshot.use_extension(ResponseComparator)
|
|
|
|
|
|
@pytest.fixture
|
|
def generous_response_snapshot(snapshot):
|
|
return snapshot.use_extension(GenerousResponseComparator)
|
|
|
|
|
|
@pytest.fixture(scope="module")
|
|
def event_loop():
|
|
loop = asyncio.get_event_loop()
|
|
yield loop
|
|
loop.close()
|
|
|
|
|
|
@pytest.fixture(scope="module")
|
|
def launcher(event_loop):
|
|
@contextlib.contextmanager
|
|
def local_launcher(
|
|
model_id: str,
|
|
num_shard: Optional[int] = None,
|
|
quantize: Optional[str] = None,
|
|
trust_remote_code: bool = False,
|
|
use_flash_attention: bool = True,
|
|
disable_grammar_support: bool = False,
|
|
dtype: Optional[str] = None,
|
|
revision: Optional[str] = None,
|
|
):
|
|
port = random.randint(8000, 10_000)
|
|
master_port = random.randint(10_000, 20_000)
|
|
|
|
shard_uds_path = (
|
|
f"/tmp/tgi-tests-{model_id.split('/')[-1]}-{num_shard}-{quantize}-server"
|
|
)
|
|
|
|
args = [
|
|
"text-generation-launcher",
|
|
"--model-id",
|
|
model_id,
|
|
"--port",
|
|
str(port),
|
|
"--master-port",
|
|
str(master_port),
|
|
"--shard-uds-path",
|
|
shard_uds_path,
|
|
]
|
|
|
|
env = os.environ
|
|
|
|
if disable_grammar_support:
|
|
args.append("--disable-grammar-support")
|
|
if num_shard is not None:
|
|
args.extend(["--num-shard", str(num_shard)])
|
|
if quantize is not None:
|
|
args.append("--quantize")
|
|
args.append(quantize)
|
|
if dtype is not None:
|
|
args.append("--dtype")
|
|
args.append(dtype)
|
|
if revision is not None:
|
|
args.append("--revision")
|
|
args.append(revision)
|
|
if trust_remote_code:
|
|
args.append("--trust-remote-code")
|
|
|
|
env["LOG_LEVEL"] = "info,text_generation_router=debug"
|
|
|
|
if not use_flash_attention:
|
|
env["USE_FLASH_ATTENTION"] = "false"
|
|
|
|
with subprocess.Popen(
|
|
args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env
|
|
) as process:
|
|
yield ProcessLauncherHandle(process, port)
|
|
|
|
process.terminate()
|
|
process.wait(60)
|
|
|
|
launcher_output = process.stdout.read().decode("utf-8")
|
|
print(launcher_output, file=sys.stderr)
|
|
|
|
process.stdout.close()
|
|
process.stderr.close()
|
|
|
|
if not use_flash_attention:
|
|
del env["USE_FLASH_ATTENTION"]
|
|
|
|
@contextlib.contextmanager
|
|
def docker_launcher(
|
|
model_id: str,
|
|
num_shard: Optional[int] = None,
|
|
quantize: Optional[str] = None,
|
|
trust_remote_code: bool = False,
|
|
use_flash_attention: bool = True,
|
|
disable_grammar_support: bool = False,
|
|
dtype: Optional[str] = None,
|
|
revision: Optional[str] = None,
|
|
):
|
|
port = random.randint(8000, 10_000)
|
|
|
|
args = ["--model-id", model_id, "--env"]
|
|
|
|
if disable_grammar_support:
|
|
args.append("--disable-grammar-support")
|
|
if num_shard is not None:
|
|
args.extend(["--num-shard", str(num_shard)])
|
|
if quantize is not None:
|
|
args.append("--quantize")
|
|
args.append(quantize)
|
|
if dtype is not None:
|
|
args.append("--dtype")
|
|
args.append(dtype)
|
|
if revision is not None:
|
|
args.append("--revision")
|
|
args.append(revision)
|
|
if trust_remote_code:
|
|
args.append("--trust-remote-code")
|
|
|
|
client = docker.from_env()
|
|
|
|
container_name = f"tgi-tests-{model_id.split('/')[-1]}-{num_shard}-{quantize}"
|
|
|
|
try:
|
|
container = client.containers.get(container_name)
|
|
container.stop()
|
|
container.wait()
|
|
except NotFound:
|
|
pass
|
|
|
|
gpu_count = num_shard if num_shard is not None else 1
|
|
|
|
env = {
|
|
"LOG_LEVEL": "info,text_generation_router=debug",
|
|
}
|
|
if not use_flash_attention:
|
|
env["USE_FLASH_ATTENTION"] = "false"
|
|
|
|
if HUGGING_FACE_HUB_TOKEN is not None:
|
|
env["HUGGING_FACE_HUB_TOKEN"] = HUGGING_FACE_HUB_TOKEN
|
|
|
|
volumes = []
|
|
if DOCKER_VOLUME:
|
|
volumes = [f"{DOCKER_VOLUME}:/data"]
|
|
|
|
container = client.containers.run(
|
|
DOCKER_IMAGE,
|
|
command=args,
|
|
name=container_name,
|
|
environment=env,
|
|
auto_remove=False,
|
|
detach=True,
|
|
device_requests=[
|
|
docker.types.DeviceRequest(count=gpu_count, capabilities=[["gpu"]])
|
|
],
|
|
volumes=volumes,
|
|
ports={"80/tcp": port},
|
|
shm_size="1G",
|
|
)
|
|
|
|
yield ContainerLauncherHandle(client, container.name, port)
|
|
|
|
if not use_flash_attention:
|
|
del env["USE_FLASH_ATTENTION"]
|
|
|
|
try:
|
|
container.stop()
|
|
container.wait()
|
|
except NotFound:
|
|
pass
|
|
|
|
container_output = container.logs().decode("utf-8")
|
|
print(container_output, file=sys.stderr)
|
|
|
|
container.remove()
|
|
|
|
if DOCKER_IMAGE is not None:
|
|
return docker_launcher
|
|
return local_launcher
|
|
|
|
|
|
@pytest.fixture(scope="module")
|
|
def generate_load():
|
|
async def generate_load_inner(
|
|
client: AsyncClient,
|
|
prompt: str,
|
|
max_new_tokens: int,
|
|
n: int,
|
|
seed: Optional[int] = None,
|
|
grammar: Optional[Grammar] = None,
|
|
stop_sequences: Optional[List[str]] = None,
|
|
) -> List[Response]:
|
|
futures = [
|
|
client.generate(
|
|
prompt,
|
|
max_new_tokens=max_new_tokens,
|
|
decoder_input_details=True,
|
|
seed=seed,
|
|
grammar=grammar,
|
|
stop_sequences=stop_sequences,
|
|
)
|
|
for _ in range(n)
|
|
]
|
|
|
|
return await asyncio.gather(*futures)
|
|
|
|
return generate_load_inner
|