动态请求批量处理#

Serve 提供请求批量处理功能,可以在不牺牲延迟的情况下提高服务吞吐量。之所以能实现这一点,是因为 ML 模型可以利用高效的向量化计算一次处理批量请求。当模型使用成本较高且你想最大限度地利用硬件时,批量处理也是必要的。

TensorFlow、PyTorch 和 Scikit-Learn 等机器学习 (ML) 框架支持同时评估多个样本。Ray Serve 通过动态请求批量处理功能让你利用此特性。当请求到达时,Serve 将请求放入队列。此队列缓冲请求以形成批次。部署接收批次并对其进行评估。评估完成后,Ray Serve 会拆分结果批次,并单独返回每个响应。

为你的部署启用批量处理#

你可以使用 ray.serve.batch 装饰器启用批量处理。以下简单示例修改了 Model 类以接受批次

from ray import serve
from ray.serve.handle import DeploymentHandle


@serve.deployment
class Model:
    def __call__(self, single_sample: int) -> int:
        return single_sample * 2


handle: DeploymentHandle = serve.run(Model.bind())
assert handle.remote(1).result() == 2

批量处理装饰器要求你修改方法签名如下

  • 将方法声明为异步方法,因为装饰器在 asyncio 事件循环中执行批量处理。

  • 修改方法以接受其原始输入类型组成的列表作为输入。例如,arg1: int, arg2: str 应更改为 arg1: List[int], arg2: List[str]

  • 修改方法以返回一个列表。返回列表的长度和输入列表的长度必须相等,以便装饰器能够平均拆分输出,并将相应的响应返回给各自的请求。

from typing import List

import numpy as np

from ray import serve
from ray.serve.handle import DeploymentHandle


@serve.deployment
class Model:
    @serve.batch(max_batch_size=8, batch_wait_timeout_s=0.1)
    async def __call__(self, multiple_samples: List[int]) -> List[int]:
        # Use numpy's vectorized computation to efficiently process a batch.
        return np.array(multiple_samples) * 2


handle: DeploymentHandle = serve.run(Model.bind())
responses = [handle.remote(i) for i in range(8)]
assert list(r.result() for r in responses) == [i * 2 for i in range(8)]

你可以向装饰器提供两个可选参数。

  • batch_wait_timeout_s 控制 Serve 在第一个请求到达后应等待多久才能形成批次。

  • max_batch_size 控制批次的大小。第一个请求到达后,批量处理装饰器会等待直到达到 batch_wait_timeout_s 或形成一个完整批次(最多 max_batch_size)。如果达到超时,Serve 会将批次发送到模型,无论批次大小如何。

提示

你可以使用 set_batch_wait_timeout_sset_max_batch_size 方法重新配置你的 batch_wait_timeout_smax_batch_size 参数

from typing import Dict


@serve.deployment(
    # These values can be overridden in the Serve config.
    user_config={
        "max_batch_size": 10,
        "batch_wait_timeout_s": 0.5,
    }
)
class Model:
    @serve.batch(max_batch_size=8, batch_wait_timeout_s=0.1)
    async def __call__(self, multiple_samples: List[int]) -> List[int]:
        # Use numpy's vectorized computation to efficiently process a batch.
        return np.array(multiple_samples) * 2

    def reconfigure(self, user_config: Dict):
        self.__call__.set_max_batch_size(user_config["max_batch_size"])
        self.__call__.set_batch_wait_timeout_s(user_config["batch_wait_timeout_s"])


在构造函数或 reconfigure 方法中使用这些方法,通过你的 Serve 配置文件控制 @serve.batch 参数。

流式批量请求#

使用异步生成器流式传输批量请求的输出。以下示例将 StreamingResponder 类转换为接受批次。

import asyncio
from typing import AsyncGenerator
from starlette.requests import Request
from starlette.responses import StreamingResponse

from ray import serve


@serve.deployment
class StreamingResponder:
    async def generate_numbers(self, max: str) -> AsyncGenerator[str, None]:
        for i in range(max):
            yield str(i)
            await asyncio.sleep(0.1)

    def __call__(self, request: Request) -> StreamingResponse:
        max = int(request.query_params.get("max", "25"))
        gen = self.generate_numbers(max)
        return StreamingResponse(gen, status_code=200, media_type="text/plain")


使用 ray.serve.batch 装饰器装饰异步生成器函数。与非流式方法类似,该函数接受一个输入 List,并在每次迭代中 yield 一个与输入批次大小长度相同的输出迭代器。

import asyncio
from typing import List, AsyncGenerator, Union
from starlette.requests import Request
from starlette.responses import StreamingResponse

from ray import serve


@serve.deployment
class StreamingResponder:
    @serve.batch(max_batch_size=5, batch_wait_timeout_s=0.1)
    async def generate_numbers(
        self, max_list: List[str]
    ) -> AsyncGenerator[List[Union[int, StopIteration]], None]:
        for i in range(max(max_list)):
            next_numbers = []
            for requested_max in max_list:
                if requested_max > i:
                    next_numbers.append(str(i))
                else:
                    next_numbers.append(StopIteration)
            yield next_numbers
            await asyncio.sleep(0.1)

    async def __call__(self, request: Request) -> StreamingResponse:
        max = int(request.query_params.get("max", "25"))
        gen = self.generate_numbers(max)
        return StreamingResponse(gen, status_code=200, media_type="text/plain")


调用用 serve.batch 装饰的函数会返回一个异步生成器,你可以通过 await 来接收结果。

批次中的某些输入可能会比其他输入产生更少的输出。当特定输入没有更多内容可 yield 时,将一个 StopIteration 对象传递到输出迭代器中。此操作会终止 Serve 在使用该输入调用 serve.batch 函数时返回的生成器。当用 serve.batch 装饰的函数通过 HTTP 返回流式生成器时,此操作允许终端客户端在其调用完成后终止连接,而无需等待整个批次完成。

微调批量处理参数的技巧#

max_batch_size 理想情况下应为 2 的幂(2, 4, 8, 16, ...),因为 CPU 和 GPU 都针对这些形状的数据进行了优化。较大的批次大小会带来高内存成本以及对前几个请求的延迟惩罚。

设置 batch_wait_timeout_s 时应考虑端到端延迟 SLO(服务水平目标)。例如,如果你的延迟目标是 150ms,并且模型评估批次需要 100ms,则将 batch_wait_timeout_s 设置为远低于 150ms - 100ms = 50ms 的值。

在 Serve 部署图中使用批量处理时,上游节点和下游节点之间的关系也可能影响性能。考虑一个由两个模型组成的链,第一个模型设置 max_batch_size=8,第二个模型设置 max_batch_size=6。在这种情况下,当第一个模型完成一个包含 8 个请求的完整批次时,第二个模型会完成一个包含 6 个请求的批次,然后填充下一个批次,Serve 最初只会部分填充 8 - 6 = 2 个请求,这会导致额外的延迟成本。下游模型的批次大小理想情况下应是上游模型批次大小的倍数或因数,以确保批次协同工作达到最佳效果。