End-to-end: Offline Batch Inference#
Offline batch inference is the process of generating model predictions on a fixed set of input data. Ray Data offers an efficient and scalable solution for batch inference, providing faster execution and better cost effectiveness for deep learning applications.

Note
This guide focuses on batch inference with deep learning frameworks. For more information on batch inference with LLMs, see Working with LLMs.
Quickstart#
To start, install Ray Data:
pip install -U "ray[data]"
Using Ray Data for offline inference involves four basic steps:
Step 1: Load your data into a Ray Dataset. Ray Data supports many different datasources and formats. For more details, see Loading Data.
Step 2: Define a Python class to load the pre-trained model.
Step 3: Transform your dataset using the pre-trained model by calling ds.map_batches(). For more details, see Transforming Data.
Step 4: Get the final predictions by either iterating through the output or saving the results.
For more in-depth examples for your use case, see the batch inference examples. For how to configure batch inference, see the configuration guide.
from typing import Dict

import numpy as np

import ray

# Step 1: Create a Ray Dataset from in-memory Numpy arrays.
# You can also create a Ray Dataset from many other sources and file
# formats.
ds = ray.data.from_numpy(np.asarray(["Complete this", "for me"]))

# Step 2: Define a Predictor class for inference.
# Use a class to initialize the model just once in `__init__`
# and re-use it for inference across multiple batches.
class HuggingFacePredictor:
    def __init__(self):
        from transformers import pipeline
        # Initialize a pre-trained GPT2 Huggingface pipeline.
        self.model = pipeline("text-generation", model="gpt2")

    # Logic for inference on 1 batch of data.
    def __call__(self, batch: Dict[str, np.ndarray]) -> Dict[str, list]:
        # Get the predictions from the input batch.
        predictions = self.model(list(batch["data"]), max_length=20, num_return_sequences=1)
        # `predictions` is a list of length-one lists. For example:
        # [[{'generated_text': 'output_1'}], ..., [{'generated_text': 'output_2'}]]
        # Modify the output to get it into the following format instead:
        # ['output_1', 'output_2']
        batch["output"] = [sequences[0]["generated_text"] for sequences in predictions]
        return batch

# Step 3: Map the Predictor over the Dataset to get predictions.
# Use 2 parallel actors for inference. Each actor predicts on a
# different partition of data.
predictions = ds.map_batches(HuggingFacePredictor, concurrency=2)
# Step 4: Show one prediction output.
predictions.show(limit=1)
{'data': 'Complete this', 'output': 'Complete this information or purchase any item from this site.\n\nAll purchases are final and non-'}
from typing import Dict

import numpy as np
import torch
import torch.nn as nn

import ray

# Step 1: Create a Ray Dataset from in-memory Numpy arrays.
# You can also create a Ray Dataset from many other sources and file
# formats.
ds = ray.data.from_numpy(np.ones((1, 100)))

# Step 2: Define a Predictor class for inference.
# Use a class to initialize the model just once in `__init__`
# and re-use it for inference across multiple batches.
class TorchPredictor:
    def __init__(self):
        # Load a dummy neural network.
        # Set `self.model` to your pre-trained PyTorch model.
        self.model = nn.Sequential(
            nn.Linear(in_features=100, out_features=1),
            nn.Sigmoid(),
        )
        self.model.eval()

    # Logic for inference on 1 batch of data.
    def __call__(self, batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
        tensor = torch.as_tensor(batch["data"], dtype=torch.float32)
        with torch.inference_mode():
            # Get the predictions from the input batch.
            return {"output": self.model(tensor).numpy()}

# Step 3: Map the Predictor over the Dataset to get predictions.
# Use 2 parallel actors for inference. Each actor predicts on a
# different partition of data.
predictions = ds.map_batches(TorchPredictor, concurrency=2)
# Step 4: Show one prediction output.
predictions.show(limit=1)
{'output': array([0.5590901], dtype=float32)}
from typing import Dict

import numpy as np

import ray

# Step 1: Create a Ray Dataset from in-memory Numpy arrays.
# You can also create a Ray Dataset from many other sources and file
# formats.
ds = ray.data.from_numpy(np.ones((1, 100)))

# Step 2: Define a Predictor class for inference.
# Use a class to initialize the model just once in `__init__`
# and re-use it for inference across multiple batches.
class TFPredictor:
    def __init__(self):
        from tensorflow import keras

        # Load a dummy neural network.
        # Set `self.model` to your pre-trained Keras model.
        input_layer = keras.Input(shape=(100,))
        output_layer = keras.layers.Dense(1, activation="sigmoid")
        self.model = keras.Sequential([input_layer, output_layer])

    # Logic for inference on 1 batch of data.
    def __call__(self, batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
        # Get the predictions from the input batch.
        return {"output": self.model(batch["data"]).numpy()}

# Step 3: Map the Predictor over the Dataset to get predictions.
# Use 2 parallel actors for inference. Each actor predicts on a
# different partition of data.
predictions = ds.map_batches(TFPredictor, concurrency=2)
# Step 4: Show one prediction output.
predictions.show(limit=1)
{'output': array([0.625576], dtype=float32)}
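In Step 4, instead of only printing a sample, you usually persist or consume the predictions. The following is a minimal sketch; the output path is only an example, so point it at your own storage (for example S3) in a real job:

# Write the prediction Dataset out as Parquet files. Iterating with
# `iter_batches()` or any of the other `write_*` APIs also works for Step 4.
predictions.write_parquet("local:///tmp/predictions")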
Ray Data offers native integration with vLLM, a high-performance inference engine for large language models (LLMs).

import ray
from ray.data.llm import vLLMEngineProcessorConfig, build_llm_processor
import numpy as np

config = vLLMEngineProcessorConfig(
    model="unsloth/Llama-3.1-8B-Instruct",
    engine_kwargs={
        "enable_chunked_prefill": True,
        "max_num_batched_tokens": 4096,
        "max_model_len": 16384,
    },
    concurrency=1,
    batch_size=64,
)
processor = build_llm_processor(
    config,
    preprocess=lambda row: dict(
        messages=[
            {"role": "system", "content": "You are a bot that responds with haikus."},
            {"role": "user", "content": row["item"]}
        ],
        sampling_params=dict(
            temperature=0.3,
            max_tokens=250,
        )
    ),
    postprocess=lambda row: dict(
        answer=row["generated_text"]
    ),
)

ds = ray.data.from_items(["Start of the haiku is: Complete this for me..."])

ds = processor(ds)

ds.show(limit=1)
{'answer': 'Snowflakes gently fall\nBlanketing the winter scene\nFrozen peaceful hush'}
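If the model doesn't fit on a single GPU, vLLM engine arguments can be forwarded through engine_kwargs. The following config is only a sketch, not part of the example above; it assumes a node with 2 GPUs and relies on vLLM's tensor_parallel_size argument:

config = vLLMEngineProcessorConfig(
    model="unsloth/Llama-3.1-8B-Instruct",
    engine_kwargs={
        # Assumption: shard the model across 2 GPUs on one node using
        # vLLM tensor parallelism.
        "tensor_parallel_size": 2,
        "max_model_len": 16384,
    },
    concurrency=1,
    batch_size=64,
)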
Configuration and troubleshooting#
Using GPUs for inference#
To use GPUs for inference, make the following changes to your code:
Update the class implementation to move the model and data to and from the GPU.
Specify num_gpus=1 in the ds.map_batches() call to indicate that each actor should use 1 GPU.
Specify a batch_size for inference. For more details on how to configure the batch size, see Configuring Batch Size.
The rest is the same as the Quickstart.
from typing import Dict

import numpy as np

import ray

ds = ray.data.from_numpy(np.asarray(["Complete this", "for me"]))

class HuggingFacePredictor:
    def __init__(self):
        from transformers import pipeline
        # Set "cuda:0" as the device so the Huggingface pipeline uses GPU.
        self.model = pipeline("text-generation", model="gpt2", device="cuda:0")

    def __call__(self, batch: Dict[str, np.ndarray]) -> Dict[str, list]:
        predictions = self.model(list(batch["data"]), max_length=20, num_return_sequences=1)
        batch["output"] = [sequences[0]["generated_text"] for sequences in predictions]
        return batch

# Use 2 actors, each actor using 1 GPU. 2 GPUs total.
predictions = ds.map_batches(
    HuggingFacePredictor,
    num_gpus=1,
    # Specify the batch size for inference.
    # Increase this for larger datasets.
    batch_size=1,
    # Set the concurrency to the number of GPUs in your cluster.
    concurrency=2,
)
predictions.show(limit=1)
{'data': 'Complete this', 'output': 'Complete this poll. Which one do you think holds the most promise for you?\n\nThank you'}
from typing import Dict

import numpy as np
import torch
import torch.nn as nn

import ray

ds = ray.data.from_numpy(np.ones((1, 100)))

class TorchPredictor:
    def __init__(self):
        # Move the neural network to GPU device by specifying "cuda".
        self.model = nn.Sequential(
            nn.Linear(in_features=100, out_features=1),
            nn.Sigmoid(),
        ).cuda()
        self.model.eval()

    def __call__(self, batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
        # Move the input batch to GPU device by specifying "cuda".
        tensor = torch.as_tensor(batch["data"], dtype=torch.float32, device="cuda")
        with torch.inference_mode():
            # Move the prediction output back to CPU before returning.
            return {"output": self.model(tensor).cpu().numpy()}

# Use 2 actors, each actor using 1 GPU. 2 GPUs total.
predictions = ds.map_batches(
    TorchPredictor,
    num_gpus=1,
    # Specify the batch size for inference.
    # Increase this for larger datasets.
    batch_size=1,
    # Set the concurrency to the number of GPUs in your cluster.
    concurrency=2,
)
predictions.show(limit=1)
{'output': array([0.5590901], dtype=float32)}
from typing import Dict

import numpy as np

import ray

ds = ray.data.from_numpy(np.ones((1, 100)))

class TFPredictor:
    def __init__(self):
        import tensorflow as tf
        from tensorflow import keras

        # Move the neural network to GPU by specifying the GPU device.
        with tf.device("GPU:0"):
            input_layer = keras.Input(shape=(100,))
            output_layer = keras.layers.Dense(1, activation="sigmoid")
            self.model = keras.Sequential([input_layer, output_layer])

    def __call__(self, batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
        import tensorflow as tf

        # Move the input batch to GPU by specifying GPU device.
        with tf.device("GPU:0"):
            return {"output": self.model(batch["data"]).numpy()}

# Use 2 actors, each actor using 1 GPU. 2 GPUs total.
predictions = ds.map_batches(
    TFPredictor,
    num_gpus=1,
    # Specify the batch size for inference.
    # Increase this for larger datasets.
    batch_size=1,
    # Set the concurrency to the number of GPUs in your cluster.
    concurrency=2,
)
predictions.show(limit=1)
{'output': array([0.625576], dtype=float32)}
Configuring Batch Size#
Configure the size of the input batch that's passed to __call__ by setting the batch_size argument for ds.map_batches().
Increasing batch size results in faster execution because inference is a vectorized operation. For GPU inference, increasing batch size increases GPU utilization. Set the batch size to as large as possible without running out of memory. If you encounter out-of-memory errors, decreasing batch_size may help.
from typing import Dict

import numpy as np

import ray

ds = ray.data.from_numpy(np.ones((10, 100)))

def assert_batch(batch: Dict[str, np.ndarray]):
    # Each batch is a dict mapping column names to Numpy arrays;
    # check that each batch contains exactly 2 rows.
    assert len(batch["data"]) == 2
    return batch

# Specify that each input batch should be of size 2.
ds.map_batches(assert_batch, batch_size=2)
Note
The default batch_size of 4096 may be too large for datasets with large rows (for example, tables with many columns or a collection of large images).
Handling GPU out-of-memory failures#
If you run into CUDA out-of-memory issues, your batch size is likely too large. Decrease the batch size by following the steps above. If your batch size is already set to 1, then use either a smaller model or GPU devices with more memory.
For advanced users working with large models, you can use model parallelism to shard the model across multiple GPUs.
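As one hedged example of that approach, you can request more than one GPU per actor and let the model library place the shards. The sketch below assumes the transformers and accelerate packages are installed and reuses the text Dataset ds from the GPU example above; "gpt2" is only a placeholder for your own large model:

from typing import Dict

import numpy as np

class ShardedPredictor:
    def __init__(self):
        from transformers import pipeline
        # `device_map="auto"` spreads the model weights across all GPUs
        # visible to this actor instead of loading them onto one device.
        self.model = pipeline(
            "text-generation",
            model="gpt2",  # placeholder; substitute your own large model
            device_map="auto",
        )

    def __call__(self, batch: Dict[str, np.ndarray]) -> Dict[str, list]:
        predictions = self.model(list(batch["data"]), max_length=20, num_return_sequences=1)
        batch["output"] = [seq[0]["generated_text"] for seq in predictions]
        return batch

# Reserve 2 GPUs per actor so the model shards have somewhere to go.
predictions = ds.map_batches(ShardedPredictor, num_gpus=2, batch_size=1, concurrency=1)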
Optimizing expensive CPU preprocessing#
If your workload involves expensive CPU preprocessing in addition to model inference, you can optimize throughput by separating the preprocessing and inference logic into separate operations. This separation allows inference on batch \(N\) to execute in parallel with preprocessing on batch \(N+1\).
For an example where preprocessing is done in a separate map call, see Image Classification Batch Inference with PyTorch ResNet18; a rough sketch of the pattern follows below.
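The sketch below illustrates the pattern only; the preprocess function is made up for illustration, and TorchPredictor and ds refer to the GPU example above. The idea is to chain a CPU-only map_batches call in front of the GPU inference call:

from typing import Dict

import numpy as np

def preprocess(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
    # Stand-in for expensive CPU-only work, e.g. decoding or resizing images.
    batch["data"] = batch["data"].astype(np.float32) / 255.0
    return batch

predictions = (
    ds
    # Runs as CPU tasks, overlapped with the GPU stage below.
    .map_batches(preprocess)
    # Runs on GPU actors.
    .map_batches(TorchPredictor, num_gpus=1, batch_size=1, concurrency=2)
)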
Handling CPU out-of-memory failures#
If you run out of CPU RAM, you likely have too many model replicas running concurrently on the same node. For example, if a model uses 5 GB of RAM when created or run and a machine has 16 GB of RAM in total, no more than three of these models can run at the same time. In this situation, the default resource assignment of one CPU per task or actor can lead to an OutOfMemoryError from Ray.
Suppose your cluster has 4 nodes, each with 16 CPUs. To limit each node to at most 3 of these actors, you can override the CPU or memory settings:
from typing import Dict

import numpy as np

import ray

ds = ray.data.from_numpy(np.asarray(["Complete this", "for me"]))

class HuggingFacePredictor:
    def __init__(self):
        from transformers import pipeline
        self.model = pipeline("text-generation", model="gpt2")

    def __call__(self, batch: Dict[str, np.ndarray]) -> Dict[str, list]:
        predictions = self.model(list(batch["data"]), max_length=20, num_return_sequences=1)
        batch["output"] = [sequences[0]["generated_text"] for sequences in predictions]
        return batch

predictions = ds.map_batches(
    HuggingFacePredictor,
    # Require 5 CPUs per actor (so at most 3 can fit per 16 CPU node).
    num_cpus=5,
    # 3 actors per node, with 4 nodes in the cluster means concurrency of 12.
    concurrency=12,
)
predictions.show(limit=1)
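Alternatively, as a sketch of the memory-based override, you can reserve heap memory per actor instead of CPUs; map_batches forwards such remote arguments to Ray's scheduler, which then limits how many actors fit on a node based on its available memory:

predictions = ds.map_batches(
    HuggingFacePredictor,
    # Assumption: reserve 5 GiB of heap memory per actor so that only a few
    # copies of the model are scheduled onto each 16 GB node.
    memory=5 * 1024 ** 3,
    concurrency=12,
)
predictions.show(limit=1)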