text-generation-inference/server/text_generation_server/server.py

215 lines
7.0 KiB
Python
Raw Normal View History

# Copyright (C) 2024 Habana Labs, Ltd. an Intel Company.
2022-10-08 10:30:12 +00:00
import asyncio
2022-10-17 12:59:00 +00:00
import os
import sys
import torch
import time
2022-10-17 12:59:00 +00:00
2022-10-08 10:30:12 +00:00
from grpc import aio
from loguru import logger
2022-10-08 10:30:12 +00:00
from grpc_reflection.v1alpha import reflection
from pathlib import Path
2023-01-31 17:53:56 +00:00
from typing import List, Optional
2022-10-08 10:30:12 +00:00
2023-03-07 17:52:22 +00:00
from text_generation_server.cache import Cache
from text_generation_server.interceptor import ExceptionInterceptor
from text_generation_server.models import Model, get_model
from text_generation_server.pb import generate_pb2_grpc, generate_pb2
from text_generation_server.tracing import UDSOpenTelemetryAioServerInterceptor
2022-10-08 10:30:12 +00:00
2023-09-27 10:22:09 +00:00
class TextGenerationService(generate_pb2_grpc.TextGenerationServiceServicer):
def __init__(
self,
model: Model,
cache: Cache,
server_urls: List[str],
):
self.cache = cache
self.model = model
self.server_urls = server_urls
# For some reason, inference_mode does not work well with GLOO which we use on CPU
# TODO: The inferecemode set messes up the autograd op dispatch. And results in aten::matmul
# op not optimized issue. Will investigate further.
# if model.device.type == "hpu":
# Force inference mode for the lifetime of TextGenerationService
# self._inference_mode_raii_guard = torch._C._InferenceMode(True)
2022-10-08 10:30:12 +00:00
async def Info(self, request, context):
return self.model.info
async def Health(self, request, context):
if self.model.device.type == "hpu":
torch.zeros((2, 2)).to("hpu")
return generate_pb2.HealthResponse()
2022-10-08 10:30:12 +00:00
async def ServiceDiscovery(self, request, context):
return generate_pb2.ServiceDiscoveryResponse(urls=self.server_urls)
async def ClearCache(self, request, context):
if request.HasField("id"):
self.cache.delete(request.id)
else:
self.cache.clear()
return generate_pb2.ClearCacheResponse()
2022-10-08 10:30:12 +00:00
async def FilterBatch(self, request, context):
batch = self.cache.pop(request.batch_id)
if batch is None:
raise ValueError(f"Batch ID {request.batch_id} not found in cache.")
filtered_batch = batch.filter(request.request_ids)
self.cache.set(filtered_batch)
return generate_pb2.FilterBatchResponse(batch=filtered_batch.to_pb())
async def Warmup(self, request, context):
def batch_from_pb(batch):
return self.model.batch_type.from_pb(
batch, self.model.tokenizer, self.model.dtype, self.model.device
)
batches = [batch_from_pb(batch) for batch in request.batches]
self.model.warmup(batches)
return generate_pb2.WarmupResponse()
async def Prefill(self, request, context):
start = time.time_ns()
batch = self.model.batch_type.from_pb(
request.batch, self.model.tokenizer, self.model.dtype, self.model.device
)
generations, next_batch, timings = self.model.generate_token([batch])
self.cache.set(next_batch)
return generate_pb2.PrefillResponse(
generations=[generation.to_pb() for generation in generations],
batch=next_batch.to_pb() if next_batch else None,
forward_ns=timings[0],
decode_ns=timings[1],
total_ns=time.time_ns() - start,
)
2022-10-08 10:30:12 +00:00
async def Decode(self, request, context):
start = time.time_ns()
if len(request.batches) == 0:
raise ValueError("Must provide at least one batch")
batches = []
for batch_pb in request.batches:
batch = self.cache.pop(batch_pb.id)
if batch is None:
raise ValueError(f"Batch ID {batch_pb.id} not found in cache.")
batches.append(batch)
if len(batches) == 0:
raise ValueError("All batches are empty")
generations, next_batch, timings = self.model.generate_token(batches)
self.cache.set(next_batch)
return generate_pb2.DecodeResponse(
generations=[generation.to_pb() for generation in generations],
batch=next_batch.to_pb() if next_batch else None,
concat_ns=None, # TODO: measure concat time
forward_ns=timings[0],
decode_ns=timings[1],
total_ns=time.time_ns() - start,
)
2022-10-08 10:30:12 +00:00
2022-10-18 13:19:03 +00:00
def serve(
model_id: str,
revision: Optional[str],
2023-12-11 11:46:30 +00:00
sharded: bool,
speculate: Optional[int],
dtype: Optional[str],
2023-12-11 11:46:30 +00:00
trust_remote_code: bool,
uds_path: Path,
):
# Remove default handler
logger.remove()
logger.add(
sys.stdout,
format="{message}",
filter="text_generation_server",
level="INFO",
serialize=False,
backtrace=True,
diagnose=False,
)
async def serve_inner(
model_id: str,
revision: Optional[str],
sharded: bool = False,
2023-12-11 11:46:30 +00:00
speculate: Optional[int] = None,
dtype: Optional[str] = None,
trust_remote_code: bool = False,
2022-10-08 10:30:12 +00:00
):
2022-10-18 13:19:03 +00:00
unix_socket_template = "unix://{}-{}"
logger.info("Server:server_inner: sharded ={}".format(sharded))
2022-10-08 10:30:12 +00:00
if sharded:
rank = int(os.environ["RANK"])
logger.info("Server:server_inner: rank ={}".format(rank))
2022-10-08 10:30:12 +00:00
server_urls = [
unix_socket_template.format(uds_path, rank) for rank in range(int(os.environ["WORLD_SIZE"]))
2022-10-08 10:30:12 +00:00
]
local_url = server_urls[int(os.environ["RANK"])]
2022-10-08 10:30:12 +00:00
else:
2022-10-18 13:19:03 +00:00
local_url = unix_socket_template.format(uds_path, 0)
2022-10-08 10:30:12 +00:00
server_urls = [local_url]
logger.info("Server:server_inner: data type = {}, local_url = {}".format(dtype, local_url))
if dtype == "bfloat16" or None:
data_type = torch.bfloat16
else:
data_type = torch.float
2023-12-11 08:24:09 +00:00
if revision == "None":
revision = None
try:
2023-12-11 11:46:30 +00:00
model = get_model(
2023-12-11 13:49:52 +00:00
model_id,
revision,
speculate,
data_type,
trust_remote_code
2023-12-11 11:46:30 +00:00
)
except Exception:
logger.exception("Error when initializing model")
raise
2023-02-13 12:02:45 +00:00
server = aio.server(
interceptors=[
ExceptionInterceptor(),
UDSOpenTelemetryAioServerInterceptor(),
]
)
generate_pb2_grpc.add_TextGenerationServiceServicer_to_server(
TextGenerationService(model, Cache(), server_urls), server
2022-10-08 10:30:12 +00:00
)
SERVICE_NAMES = (
generate_pb2.DESCRIPTOR.services_by_name["TextGenerationService"].full_name,
2022-10-08 10:30:12 +00:00
reflection.SERVICE_NAME,
)
reflection.enable_server_reflection(SERVICE_NAMES, server)
server.add_insecure_port(local_url)
2022-10-08 10:30:12 +00:00
await server.start()
logger.info("Server started at {}".format(local_url))
2022-10-18 13:19:03 +00:00
try:
await server.wait_for_termination()
except KeyboardInterrupt:
logger.info("Signal received. Shutting down")
2022-10-18 13:19:03 +00:00
await server.stop(0)
2022-10-08 10:30:12 +00:00
2023-12-11 11:46:30 +00:00
asyncio.run(
2023-12-11 13:49:52 +00:00
serve_inner(
model_id, revision, sharded, speculate, dtype, trust_remote_code
)
)