动态请求批处理#

Serve 提供请求批处理功能,可以在不牺牲延迟的情况下提高服务吞吐量。之所以能实现这种改进,是因为机器学习模型可以利用高效的矢量化计算一次处理一批请求。当模型使用成本较高且您希望最大化硬件利用率时,批处理也是必需的。

Tensorflow、PyTorch 和 Scikit-Learn 等机器学习(ML)框架支持同时评估多个样本。Ray Serve 允许您利用动态请求批处理这一功能。当请求到达时,Serve 会将请求放入队列。此队列缓冲请求以形成批次。部署会接收批次并进行评估。评估完成后,Ray Serve 会将结果批次拆分,然后单独返回每个响应。

为您的部署启用批处理#

您可以通过使用 ray.serve.batch 装饰器来启用批处理。以下简单示例修改了 Model 类以接受批次

from ray import serve
from ray.serve.handle import DeploymentHandle


@serve.deployment
class Model:
    def __call__(self, single_sample: int) -> int:
        return single_sample * 2


handle: DeploymentHandle = serve.run(Model.bind())
assert handle.remote(1).result() == 2

批处理装饰器要求您在方法签名中进行以下更改

  • 将方法声明为异步方法,因为装饰器在 asyncio 事件循环中进行批处理。

  • 修改方法以接受其原始输入类型的列表作为输入。例如,arg1: int, arg2: str 应更改为 arg1: List[int], arg2: List[str]

  • 修改方法以返回列表。返回列表的长度和输入列表的长度必须相同,这样装饰器才能均匀拆分输出并向各自的请求返回相应的响应。

from typing import List

import numpy as np

from ray import serve
from ray.serve.handle import DeploymentHandle


@serve.deployment
class Model:
    @serve.batch(max_batch_size=8, batch_wait_timeout_s=0.1)
    async def __call__(self, multiple_samples: List[int]) -> List[int]:
        # Use numpy's vectorized computation to efficiently process a batch.
        return np.array(multiple_samples) * 2


handle: DeploymentHandle = serve.run(Model.bind())
responses = [handle.remote(i) for i in range(8)]
assert list(r.result() for r in responses) == [i * 2 for i in range(8)]

您可以为装饰器提供 4 个可选参数

  • max_batch_size 控制批次的大小。默认值为 10。

  • batch_wait_timeout_s 控制 Serve 在第一个请求到达后需要等待多长时间才能形成批次。默认值为 0.01(10 毫秒)。

  • max_concurrent_batches 可以并发运行的最大批次数。默认值为 1。

  • batch_size_fn 用于计算有效批次大小的可选函数。如果提供,此函数将接收一个项目列表并返回一个表示批次大小的整数。这对于基于图的总节点数或序列的总 token 数等自定义指标进行批处理非常有用。如果为 None(默认值),则批次大小计算为 len(batch)

一旦第一个请求到达,批处理装饰器就会等待一个完整的批次(最多 max_batch_size)直到达到 batch_wait_timeout_s。如果达到超时,Serve 将把批次发送到模型,而不管批次大小如何。

提示

您可以使用 set_batch_wait_timeout_sset_max_batch_size 方法重新配置您的 batch_wait_timeout_smax_batch_size 参数

from typing import Dict


@serve.deployment(
    # These values can be overridden in the Serve config.
    user_config={
        "max_batch_size": 10,
        "batch_wait_timeout_s": 0.5,
    }
)
class Model:
    @serve.batch(max_batch_size=8, batch_wait_timeout_s=0.1)
    async def __call__(self, multiple_samples: List[int]) -> List[int]:
        # Use numpy's vectorized computation to efficiently process a batch.
        return np.array(multiple_samples) * 2

    def reconfigure(self, user_config: Dict):
        self.__call__.set_max_batch_size(user_config["max_batch_size"])
        self.__call__.set_batch_wait_timeout_s(user_config["batch_wait_timeout_s"])


在构造函数或 reconfigure 方法中使用这些方法,通过 Serve 配置文件控制 @serve.batch 参数。

自定义批次大小函数#

默认情况下,Ray Serve 将批次大小衡量为批次中的项目数(len(batch))。然而,在许多工作负载中,计算成本取决于项目本身的属性,而不仅仅是数量。例如:

  • 图神经网络(GNNs):成本取决于所有图的总节点数,而不是图的数量

  • 自然语言处理(NLP):Transformer 模型按总 token 数进行批处理,而不是按序列数

  • 可变分辨率图像:内存使用量取决于总像素数,而不是图像数量

使用 batch_size_fn 参数定义批次大小的自定义指标

图神经网络示例#

以下示例展示了如何按总节点数对图数据进行批处理

from typing import List

from ray import serve
from ray.serve.handle import DeploymentHandle


class Graph:
    """Simple graph data structure for GNN workloads."""

    def __init__(self, num_nodes: int, node_features: list):
        self.num_nodes = num_nodes
        self.node_features = node_features


@serve.deployment
class GraphNeuralNetwork:
    @serve.batch(
        max_batch_size=10000,  # Maximum total nodes per batch
        batch_wait_timeout_s=0.1,
        batch_size_fn=lambda graphs: sum(g.num_nodes for g in graphs),
    )
    async def predict(self, graphs: List[Graph]) -> List[float]:
        """Process a batch of graphs, batching by total node count."""
        # The batch_size_fn ensures that the total number of nodes
        # across all graphs in the batch doesn't exceed max_batch_size.
        # This prevents GPU memory overflow.
        results = []
        for graph in graphs:
            # Your GNN model inference logic here
            # For this example, just return a simple score
            score = float(graph.num_nodes * 0.1)
            results.append(score)
        return results

    async def __call__(self, graph: Graph) -> float:
        return await self.predict(graph)


handle: DeploymentHandle = serve.run(GraphNeuralNetwork.bind())

# Create test graphs with varying node counts
graphs = [
    Graph(num_nodes=100, node_features=[1.0] * 100),
    Graph(num_nodes=5000, node_features=[2.0] * 5000),
    Graph(num_nodes=3000, node_features=[3.0] * 3000),
]

# Send requests - they'll be batched by total node count
results = [handle.remote(g).result() for g in graphs]
print(f"Results: {results}")

在此示例中,batch_size_fn=lambda graphs: sum(g.num_nodes for g in graphs) 确保批次包含最多 10,000 个总节点,从而防止 GPU 内存溢出,而不管批次中包含多少单个图。

NLP token 批处理示例#

以下示例展示了如何按总 token 数对文本序列进行批处理

from typing import List

from ray import serve
from ray.serve.handle import DeploymentHandle


@serve.deployment
class TokenBatcher:
    @serve.batch(
        max_batch_size=512,  # Maximum total tokens per batch
        batch_wait_timeout_s=0.1,
        batch_size_fn=lambda sequences: sum(len(s.split()) for s in sequences),
    )
    async def process(self, sequences: List[str]) -> List[int]:
        """Process text sequences, batching by total token count."""
        # The batch_size_fn ensures total tokens don't exceed max_batch_size.
        # This is useful for transformer models with fixed context windows.
        return [len(seq.split()) for seq in sequences]

    async def __call__(self, sequence: str) -> int:
        return await self.process(sequence)


handle: DeploymentHandle = serve.run(TokenBatcher.bind())

# Create sequences with different lengths
sequences = [
    "This is a short sentence",
    "This is a much longer sentence with many more words to process",
    "Short",
]

# Send requests - they'll be batched by total token count
results = [handle.remote(seq).result() for seq in sequences]
print(f"Token counts: {results}")

此模式可确保总 token 数不超过模型的上下文窗口或内存限制。

流式批处理请求#

使用异步生成器从批处理请求中流式传输输出。以下示例将 StreamingResponder 类修改为接受批次。

import asyncio
from typing import AsyncGenerator
from starlette.requests import Request
from starlette.responses import StreamingResponse

from ray import serve


@serve.deployment
class StreamingResponder:
    async def generate_numbers(self, max: str) -> AsyncGenerator[str, None]:
        for i in range(max):
            yield str(i)
            await asyncio.sleep(0.1)

    def __call__(self, request: Request) -> StreamingResponse:
        max = int(request.query_params.get("max", "25"))
        gen = self.generate_numbers(max)
        return StreamingResponse(gen, status_code=200, media_type="text/plain")


ray.serve.batch 装饰器装饰异步生成器函数。与非流式方法类似,该函数接收一个输入列表,并在每次迭代中 yield 一个与输入批次大小长度相同的输出可迭代对象。

import asyncio
from typing import List, AsyncGenerator, Union
from starlette.requests import Request
from starlette.responses import StreamingResponse

from ray import serve


@serve.deployment
class StreamingResponder:
    @serve.batch(max_batch_size=5, batch_wait_timeout_s=0.1)
    async def generate_numbers(
        self, max_list: List[str]
    ) -> AsyncGenerator[List[Union[int, StopIteration]], None]:
        for i in range(max(max_list)):
            next_numbers = []
            for requested_max in max_list:
                if requested_max > i:
                    next_numbers.append(str(i))
                else:
                    next_numbers.append(StopIteration)
            yield next_numbers
            await asyncio.sleep(0.1)

    async def __call__(self, request: Request) -> StreamingResponse:
        max = int(request.query_params.get("max", "25"))
        gen = self.generate_numbers(max)
        return StreamingResponse(gen, status_code=200, media_type="text/plain")


调用 serve.batch 装饰的函数将返回一个异步生成器,您可以 await 它来接收结果。

批次中的某些输入可能产生的输出少于其他输入。当特定输入不再有要生成的内容时,请将 StopIteration 对象传递到输出可迭代对象中。此操作会终止 Serve 在调用具有该输入的 serve.batch 函数时返回的生成器。当 serve.batch 装饰的函数通过 HTTP 返回流式生成器时,此操作允许结束客户端的连接在其调用完成后终止,而不是等待整个批次完成。

微调批处理参数的技巧#

max_batch_size 最好是 2 的幂(2、4、8、16、……),因为 CPU 和 GPU 都针对这些形状的数据进行了优化。大批次大小会带来高内存成本以及前几个请求的延迟惩罚。

在使用 batch_size_fn 时,根据自定义指标设置 max_batch_size,而不是按项目计数。例如,如果按图中的总节点数进行批处理,则将 max_batch_size 设置为 GPU 的最大节点容量(例如 10,000 个节点),而不是图的数量。

设置 batch_wait_timeout_s 时,请考虑端到端延迟 SLO(服务级别目标)。例如,如果您的延迟目标是 150 毫秒,而模型评估批次需要 100 毫秒,则将 batch_wait_timeout_s 设置为远低于 150 毫秒 - 100 毫秒 = 50 毫秒的值。

在 Serve 部署图中进行批处理时,上游节点和下游节点之间的关系也可能影响性能。考虑一个由两个模型组成的链,第一个模型设置 max_batch_size=8,第二个模型设置 max_batch_size=6。在这种情况下,当第一个模型完成一个包含 8 个请求的完整批次时,第二个模型完成一个包含 6 个请求的批次,然后为了填充下一个批次(Serve 最初只部分填充了 8 - 6 = 2 个请求),会导致延迟成本。下游模型的批次大小最好是上游模型的倍数或因子,以确保批次能够协同工作。