使用请求批处理提供文本生成器服务#

本教程展示了如何部署一个使用批处理同时处理多个查询的文本生成器。你将学习如何

  • 实现一个处理批量请求的 Ray Serve 部署

  • 配置和优化批处理

  • 通过 HTTP 和 Python 查询模型

当你的模型支持并行处理(如 GPU 加速或向量化操作)时,批处理可以显著提高性能。通过同时处理多个请求,它可以提高吞吐量和硬件利用率。

注意

本教程重点介绍带批处理的在线服务。对于大型数据集的离线批处理,请参阅 使用 Ray Data 进行批量推理

先决条件#

pip install "ray[serve] transformers"

定义部署#

创建一个新的 Python 文件,命名为 tutorial_batch.py。首先,导入 Ray Serve 和其他一些辅助工具。

from typing import List

from starlette.requests import Request
from transformers import pipeline

from ray import serve

Ray Serve 提供了 @serve.batch 装饰器,用于自动将单个请求批量处理到一个函数或类方法。

被装饰的方法

  • 必须是 async def 来处理并发请求

  • 接收一个要一起处理的请求列表

  • 返回一个等长结果列表,每个请求对应一个结果

@serve.batch
async def my_batch_handler(self, requests: List):
    # Process multiple requests together
    results = []
    for request in requests:
        results.append(request)  # processing logic here
    return results

你可以从部署中的另一个 async def 方法调用批量处理器。Ray Serve 会将这些调用批量处理并一起执行,但返回的结果与普通函数调用一样是独立的。

class BatchingDeployment:
    @serve.batch
    async def my_batch_handler(self, requests: List):
        results = []
        for request in requests:
            results.append(request.json())  # processing logic here
        return results

    async def __call__(self, request):
        return await self.my_batch_handler(request)

注意

Ray Serve 默认使用机会性批处理 - 请求一到达就会执行,而不会等待完整的批次。你可以在 @serve.batch 装饰器中使用 batch_wait_timeout_s 来调整此行为,以牺牲延迟换取吞吐量的提升(默认为 0)。在低负载下增加此值可能会以延迟为代价提高吞吐量。

接下来,定义一个部署,该部署接收一个输入字符串列表,并在输入上运行向量化文本生成。

@serve.deployment
class BatchTextGenerator:
    def __init__(self, pipeline_key: str, model_key: str):
        self.model = pipeline(pipeline_key, model_key)

    @serve.batch(max_batch_size=4)
    async def handle_batch(self, inputs: List[str]) -> List[str]:
        print("Our input array has length:", len(inputs))

        results = self.model(inputs)
        return [result[0]["generated_text"] for result in results]

    async def __call__(self, request: Request) -> List[str]:
        return await self.handle_batch(request.query_params["text"])

接下来,准备部署。请注意,在 @serve.batch 装饰器中,你使用 max_batch_size=4 指定了最大批次大小。此选项限制了 Ray Serve 一次执行的最大可能批次大小。

generator = BatchTextGenerator.bind("text-generation", "gpt2")

部署选项#

你可以通过两种方式部署应用

选项 1:使用 Serve 命令行接口部署#

$ serve run tutorial_batch:generator --name "Text-Completion-App"

选项 2:使用 Python API 部署#

或者,你可以使用 Python API 中的 serve.run 函数部署应用。此命令返回一个句柄,你可以使用该句柄来查询部署。

from ray.serve.handle import DeploymentHandle

handle: DeploymentHandle = serve.run(generator, name="Text-Completion-App")

现在你可以使用此句柄来查询模型。请参阅下面的 查询模型 部分。

查询模型#

有多种方法可以与你部署的模型交互

1. 简单 HTTP 查询#

对于基本测试,使用 curl

$ curl "http://localhost:8000/?text=Once+upon+a+time"

2. 使用 Ray 并行发送 HTTP 请求#

为了获得更高的吞吐量,使用 Ray 远程任务 发送并行请求

import ray
import requests

@ray.remote
def send_query(text):
    resp = requests.post("http://localhost:8000/", params={"text": text})
    return resp.text

# Example batch of queries
texts = [
    'Once upon a time,',
    'Hi my name is Lewis and I like to',
    'In a galaxy far far away',
]

# Send all queries in parallel
results = ray.get([send_query.remote(text) for text in texts])

3. 使用 DeploymentHandle 发送请求#

对于更 Pythonic 的模型查询方式,你可以直接使用部署句柄

import ray
from ray import serve

input_batch = [
    'Once upon a time,',
    'Hi my name is Lewis and I like to',
    'In a galaxy far far away',
]

# initialize using the 'auto' option to connect to the already-running Ray cluster
ray.init(address="auto")

handle = serve.get_deployment_handle("BatchTextGenerator", app_name="Text-Completion-App")
responses = [handle.handle_batch.remote(text) for text in input_batch]
results = [r.result() for r in responses]

性能考量#

  • 如果你有足够的内存并希望获得更高的吞吐量,请增加 max_batch_size - 这可能会增加延迟

  • 如果吞吐量比延迟更重要,请增加 batch_wait_timeout_s