动态请求批处理#
Serve 提供请求批处理功能,可以在不牺牲延迟的情况下提高服务吞吐量。之所以能实现这种改进,是因为机器学习模型可以利用高效的矢量化计算一次处理一批请求。当模型使用成本较高且您希望最大化硬件利用率时,批处理也是必需的。
Tensorflow、PyTorch 和 Scikit-Learn 等机器学习(ML)框架支持同时评估多个样本。Ray Serve 允许您利用动态请求批处理这一功能。当请求到达时,Serve 会将请求放入队列。此队列缓冲请求以形成批次。部署会接收批次并进行评估。评估完成后,Ray Serve 会将结果批次拆分,然后单独返回每个响应。
为您的部署启用批处理#
您可以通过使用 ray.serve.batch 装饰器来启用批处理。以下简单示例修改了 Model 类以接受批次
from ray import serve
from ray.serve.handle import DeploymentHandle
@serve.deployment
class Model:
def __call__(self, single_sample: int) -> int:
return single_sample * 2
handle: DeploymentHandle = serve.run(Model.bind())
assert handle.remote(1).result() == 2
批处理装饰器要求您在方法签名中进行以下更改
将方法声明为异步方法,因为装饰器在 asyncio 事件循环中进行批处理。
修改方法以接受其原始输入类型的列表作为输入。例如,
arg1: int, arg2: str应更改为arg1: List[int], arg2: List[str]。修改方法以返回列表。返回列表的长度和输入列表的长度必须相同,这样装饰器才能均匀拆分输出并向各自的请求返回相应的响应。
from typing import List
import numpy as np
from ray import serve
from ray.serve.handle import DeploymentHandle
@serve.deployment
class Model:
@serve.batch(max_batch_size=8, batch_wait_timeout_s=0.1)
async def __call__(self, multiple_samples: List[int]) -> List[int]:
# Use numpy's vectorized computation to efficiently process a batch.
return np.array(multiple_samples) * 2
handle: DeploymentHandle = serve.run(Model.bind())
responses = [handle.remote(i) for i in range(8)]
assert list(r.result() for r in responses) == [i * 2 for i in range(8)]
您可以为装饰器提供 4 个可选参数
max_batch_size控制批次的大小。默认值为 10。batch_wait_timeout_s控制 Serve 在第一个请求到达后需要等待多长时间才能形成批次。默认值为 0.01(10 毫秒)。max_concurrent_batches可以并发运行的最大批次数。默认值为 1。batch_size_fn用于计算有效批次大小的可选函数。如果提供,此函数将接收一个项目列表并返回一个表示批次大小的整数。这对于基于图的总节点数或序列的总 token 数等自定义指标进行批处理非常有用。如果为None(默认值),则批次大小计算为len(batch)。
一旦第一个请求到达,批处理装饰器就会等待一个完整的批次(最多 max_batch_size)直到达到 batch_wait_timeout_s。如果达到超时,Serve 将把批次发送到模型,而不管批次大小如何。
提示
您可以使用 set_batch_wait_timeout_s 和 set_max_batch_size 方法重新配置您的 batch_wait_timeout_s 和 max_batch_size 参数
from typing import Dict
@serve.deployment(
# These values can be overridden in the Serve config.
user_config={
"max_batch_size": 10,
"batch_wait_timeout_s": 0.5,
}
)
class Model:
@serve.batch(max_batch_size=8, batch_wait_timeout_s=0.1)
async def __call__(self, multiple_samples: List[int]) -> List[int]:
# Use numpy's vectorized computation to efficiently process a batch.
return np.array(multiple_samples) * 2
def reconfigure(self, user_config: Dict):
self.__call__.set_max_batch_size(user_config["max_batch_size"])
self.__call__.set_batch_wait_timeout_s(user_config["batch_wait_timeout_s"])
在构造函数或 reconfigure 方法中使用这些方法,通过 Serve 配置文件控制 @serve.batch 参数。
自定义批次大小函数#
默认情况下,Ray Serve 将批次大小衡量为批次中的项目数(len(batch))。然而,在许多工作负载中,计算成本取决于项目本身的属性,而不仅仅是数量。例如:
图神经网络(GNNs):成本取决于所有图的总节点数,而不是图的数量
自然语言处理(NLP):Transformer 模型按总 token 数进行批处理,而不是按序列数
可变分辨率图像:内存使用量取决于总像素数,而不是图像数量
使用 batch_size_fn 参数定义批次大小的自定义指标
图神经网络示例#
以下示例展示了如何按总节点数对图数据进行批处理
from typing import List
from ray import serve
from ray.serve.handle import DeploymentHandle
class Graph:
"""Simple graph data structure for GNN workloads."""
def __init__(self, num_nodes: int, node_features: list):
self.num_nodes = num_nodes
self.node_features = node_features
@serve.deployment
class GraphNeuralNetwork:
@serve.batch(
max_batch_size=10000, # Maximum total nodes per batch
batch_wait_timeout_s=0.1,
batch_size_fn=lambda graphs: sum(g.num_nodes for g in graphs),
)
async def predict(self, graphs: List[Graph]) -> List[float]:
"""Process a batch of graphs, batching by total node count."""
# The batch_size_fn ensures that the total number of nodes
# across all graphs in the batch doesn't exceed max_batch_size.
# This prevents GPU memory overflow.
results = []
for graph in graphs:
# Your GNN model inference logic here
# For this example, just return a simple score
score = float(graph.num_nodes * 0.1)
results.append(score)
return results
async def __call__(self, graph: Graph) -> float:
return await self.predict(graph)
handle: DeploymentHandle = serve.run(GraphNeuralNetwork.bind())
# Create test graphs with varying node counts
graphs = [
Graph(num_nodes=100, node_features=[1.0] * 100),
Graph(num_nodes=5000, node_features=[2.0] * 5000),
Graph(num_nodes=3000, node_features=[3.0] * 3000),
]
# Send requests - they'll be batched by total node count
results = [handle.remote(g).result() for g in graphs]
print(f"Results: {results}")
在此示例中,batch_size_fn=lambda graphs: sum(g.num_nodes for g in graphs) 确保批次包含最多 10,000 个总节点,从而防止 GPU 内存溢出,而不管批次中包含多少单个图。
NLP token 批处理示例#
以下示例展示了如何按总 token 数对文本序列进行批处理
from typing import List
from ray import serve
from ray.serve.handle import DeploymentHandle
@serve.deployment
class TokenBatcher:
@serve.batch(
max_batch_size=512, # Maximum total tokens per batch
batch_wait_timeout_s=0.1,
batch_size_fn=lambda sequences: sum(len(s.split()) for s in sequences),
)
async def process(self, sequences: List[str]) -> List[int]:
"""Process text sequences, batching by total token count."""
# The batch_size_fn ensures total tokens don't exceed max_batch_size.
# This is useful for transformer models with fixed context windows.
return [len(seq.split()) for seq in sequences]
async def __call__(self, sequence: str) -> int:
return await self.process(sequence)
handle: DeploymentHandle = serve.run(TokenBatcher.bind())
# Create sequences with different lengths
sequences = [
"This is a short sentence",
"This is a much longer sentence with many more words to process",
"Short",
]
# Send requests - they'll be batched by total token count
results = [handle.remote(seq).result() for seq in sequences]
print(f"Token counts: {results}")
此模式可确保总 token 数不超过模型的上下文窗口或内存限制。
流式批处理请求#
使用异步生成器从批处理请求中流式传输输出。以下示例将 StreamingResponder 类修改为接受批次。
import asyncio
from typing import AsyncGenerator
from starlette.requests import Request
from starlette.responses import StreamingResponse
from ray import serve
@serve.deployment
class StreamingResponder:
async def generate_numbers(self, max: str) -> AsyncGenerator[str, None]:
for i in range(max):
yield str(i)
await asyncio.sleep(0.1)
def __call__(self, request: Request) -> StreamingResponse:
max = int(request.query_params.get("max", "25"))
gen = self.generate_numbers(max)
return StreamingResponse(gen, status_code=200, media_type="text/plain")
用 ray.serve.batch 装饰器装饰异步生成器函数。与非流式方法类似,该函数接收一个输入列表,并在每次迭代中 yield 一个与输入批次大小长度相同的输出可迭代对象。
import asyncio
from typing import List, AsyncGenerator, Union
from starlette.requests import Request
from starlette.responses import StreamingResponse
from ray import serve
@serve.deployment
class StreamingResponder:
@serve.batch(max_batch_size=5, batch_wait_timeout_s=0.1)
async def generate_numbers(
self, max_list: List[str]
) -> AsyncGenerator[List[Union[int, StopIteration]], None]:
for i in range(max(max_list)):
next_numbers = []
for requested_max in max_list:
if requested_max > i:
next_numbers.append(str(i))
else:
next_numbers.append(StopIteration)
yield next_numbers
await asyncio.sleep(0.1)
async def __call__(self, request: Request) -> StreamingResponse:
max = int(request.query_params.get("max", "25"))
gen = self.generate_numbers(max)
return StreamingResponse(gen, status_code=200, media_type="text/plain")
调用 serve.batch 装饰的函数将返回一个异步生成器,您可以 await 它来接收结果。
批次中的某些输入可能产生的输出少于其他输入。当特定输入不再有要生成的内容时,请将 StopIteration 对象传递到输出可迭代对象中。此操作会终止 Serve 在调用具有该输入的 serve.batch 函数时返回的生成器。当 serve.batch 装饰的函数通过 HTTP 返回流式生成器时,此操作允许结束客户端的连接在其调用完成后终止,而不是等待整个批次完成。
微调批处理参数的技巧#
max_batch_size 最好是 2 的幂(2、4、8、16、……),因为 CPU 和 GPU 都针对这些形状的数据进行了优化。大批次大小会带来高内存成本以及前几个请求的延迟惩罚。
在使用 batch_size_fn 时,根据自定义指标设置 max_batch_size,而不是按项目计数。例如,如果按图中的总节点数进行批处理,则将 max_batch_size 设置为 GPU 的最大节点容量(例如 10,000 个节点),而不是图的数量。
设置 batch_wait_timeout_s 时,请考虑端到端延迟 SLO(服务级别目标)。例如,如果您的延迟目标是 150 毫秒,而模型评估批次需要 100 毫秒,则将 batch_wait_timeout_s 设置为远低于 150 毫秒 - 100 毫秒 = 50 毫秒的值。
在 Serve 部署图中进行批处理时,上游节点和下游节点之间的关系也可能影响性能。考虑一个由两个模型组成的链,第一个模型设置 max_batch_size=8,第二个模型设置 max_batch_size=6。在这种情况下,当第一个模型完成一个包含 8 个请求的完整批次时,第二个模型完成一个包含 6 个请求的批次,然后为了填充下一个批次(Serve 最初只部分填充了 8 - 6 = 2 个请求),会导致延迟成本。下游模型的批次大小最好是上游模型的倍数或因子,以确保批次能够协同工作。