使用 Ray Core 进行批量预测#
注意
对于大型数据集的批量推理的高级 API,请参阅使用 Ray Data 进行批量推理。本示例适用于希望对数据分片和执行有更多控制权的用户。
批量预测是使用训练好的模型为一组观测值生成预测的过程。它包含以下元素
输入数据集:这是一组用于生成预测的观测值集合。数据通常存储在 S3、HDFS 或数据库等外部存储系统中,并且可能很大。
ML 模型:这是经过训练的 ML 模型,通常也存储在外部存储系统中。
预测结果:这是将 ML 模型应用于观测值时的输出。预测结果通常会写回存储系统。
使用 Ray,您可以构建可扩展的批量预测,以实现大型数据集的高预测吞吐量。Ray Data 提供了一个用于离线批量推理的高级 API,具有内置优化。但是,为了获得更多控制权,您可以使用更底层的 Ray Core API。本示例演示了如何使用 Ray Core 进行批量推理,通过将数据集分割成不相交的分片,并在 Ray 集群中通过 Ray Tasks 或 Ray Actors 并行执行它们。
基于任务的批量预测#
使用 Ray 任务,您可以按以下方式构建批量预测程序
加载模型
启动 Ray 任务,每个任务接收模型和一个输入数据集分片
每个 worker 对分配的分片执行预测,并将结果写入
以 2009 年纽约市出租车数据为例。假设我们有这样一个简单模型
import pandas as pd
import numpy as np
def load_model():
# A dummy model.
def model(batch: pd.DataFrame) -> pd.DataFrame:
# Dummy payload so copying the model will actually copy some data
# across nodes.
model.payload = np.zeros(100_000_000)
return pd.DataFrame({"score": batch["passenger_count"] % 2 == 0})
return model
数据集包含 12 个文件(每个月一个),因此很自然地可以让每个 Ray 任务处理一个文件。通过接收模型和输入数据集的一个分片(即单个文件),我们可以定义一个 Ray 远程任务用于预测
import pyarrow.parquet as pq
import ray
@ray.remote
def make_prediction(model, shard_path):
df = pq.read_table(shard_path).to_pandas()
result = model(df)
# Write out the prediction result.
# NOTE: unless the driver will have to further process the
# result (other than simply writing out to storage system),
# writing out at remote task is recommended, as it can avoid
# congesting or overloading the driver.
# ...
# Here we just return the size about the result in this example.
return len(result)
驱动程序启动整个输入数据集的所有任务。
# 12 files, one for each remote task.
input_files = [
f"s3://anonymous@air-example-data/ursa-labs-taxi-data/downsampled_2009_full_year_data.parquet"
f"/fe41422b01c04169af2a65a83b753e0f_{i:06d}.parquet"
for i in range(12)
]
# ray.put() the model just once to local object store, and then pass the
# reference to the remote tasks.
model = load_model()
model_ref = ray.put(model)
result_refs = []
# Launch all prediction tasks.
for file in input_files:
# Launch a prediction task by passing model reference and shard file to it.
# NOTE: it would be highly inefficient if you are passing the model itself
# like make_prediction.remote(model, file), which in order to pass the model
# to remote node will ray.put(model) for each task, potentially overwhelming
# the local object store and causing out-of-disk error.
result_refs.append(make_prediction.remote(model_ref, file))
results = ray.get(result_refs)
# Let's check prediction output size.
for r in results:
print("Prediction output size:", r)
为了不使集群过载并导致 OOM,我们可以通过为任务设置适当的资源要求来控制并行度,有关此设计模式的详细信息,请参阅模式:使用资源限制并发运行的任务数量。例如,如果您很容易估算出从外部存储加载的数据的内存大小,您可以通过指定每个任务所需的内存量来控制并行度,例如使用 make_prediction.options(memory=100*1023*1025).remote(model_ref, file)
启动任务。Ray 将会正确处理,确保调度到节点的任务不会超过其总内存。
提示
为了避免重复将相同的模型存储到对象存储(这可能会导致驱动节点磁盘空间不足),请使用 ray.put() 将模型存储一次,然后传递引用。
提示
为了避免驱动节点拥塞或过载,最好让每个任务自行写入预测结果(而不是将结果返回给驱动程序,驱动程序实际上除了写入存储系统外什么也不做)。
基于 Actor 的批量预测#
在上述解决方案中,每个 Ray 任务在开始执行预测之前都必须从驱动节点获取模型。如果模型很大,这可能会产生显著的开销。我们可以通过使用 Ray actor 来优化它,actor 只会获取一次模型,并将其重用于分配给该 actor 的所有任务。
首先,我们定义一个可调用类,其结构包含一个用于加载/缓存模型的接口(即构造函数),另一个接口接收文件并执行预测。
import pandas as pd
import pyarrow.parquet as pq
import ray
@ray.remote
class BatchPredictor:
def __init__(self, model):
self.model = model
def predict(self, shard_path):
df = pq.read_table(shard_path).to_pandas()
result =self.model(df)
# Write out the prediction result.
# NOTE: unless the driver will have to further process the
# result (other than simply writing out to storage system),
# writing out at remote task is recommended, as it can avoid
# congesting or overloading the driver.
# ...
# Here we just return the size about the result in this example.
return len(result)
构造函数每个 actor worker 只调用一次。我们使用 ActorPool 来管理一组可以接收预测请求的 actor。
from ray.util.actor_pool import ActorPool
model = load_model()
model_ref = ray.put(model)
num_actors = 4
actors = [BatchPredictor.remote(model_ref) for _ in range(num_actors)]
pool = ActorPool(actors)
input_files = [
f"s3://anonymous@air-example-data/ursa-labs-taxi-data/downsampled_2009_full_year_data.parquet"
f"/fe41422b01c04169af2a65a83b753e0f_{i:06d}.parquet"
for i in range(12)
]
for file in input_files:
pool.submit(lambda a, v: a.predict.remote(v), file)
while pool.has_next():
print("Prediction output size:", pool.get_next())
请注意,ActorPool 的大小是固定的,这与基于任务的方法不同,后者中并行任务的数量可以是动态的(只要不超过 max_in_flight_tasks)。要实现自动扩缩容的 actor 池,您需要使用Ray Data 批量预测。
使用 GPU 进行批量预测#
如果您的集群有 GPU 节点并且您的预测器可以利用 GPU,您可以通过指定 num_gpus 将任务或 actor 指向这些 GPU 节点。Ray 将相应地将它们调度到 GPU 节点上。在节点上,您需要将模型移动到 GPU。以下是 Torch 模型的一个示例。
import torch
@ray.remote(num_gpus=1)
def make_torch_prediction(model: torch.nn.Module, shard_path):
# Move model to GPU.
model.to(torch.device("cuda"))
inputs = pq.read_table(shard_path).to_pandas().to_numpy()
results = []
# for each tensor in inputs:
# results.append(model(tensor))
#
# Write out the results right in task instead of returning back
# to the driver node (unless you have to), to avoid congest/overload
# driver node.
# ...
# Here we just return simple/light meta information.
return len(results)
常见问题#
如果模型很大,如何在 Ray 集群中高效地加载和传递模型?#
推荐的方法是(以基于任务的批量预测为例,基于 actor 的方法相同)
让驱动程序加载模型(例如从存储系统)
让驱动程序 ray.put(model) 将模型存储到对象存储中;并且
在启动远程任务时,将模型的相同引用传递给每个远程任务。远程任务在开始执行预测之前,会将模型(从驱动程序的对象存储中)获取到其本地对象存储中。
请注意,如果您跳过第 2 步并将模型(而不是引用)传递给远程任务,效率会非常低。如果模型很大并且任务很多,很可能会导致驱动节点磁盘空间不足而崩溃。
# GOOD: the model will be stored to driver's object store only once
model = load_model()
model_ref = ray.put(model)
for file in input_files:
make_prediction.remote(model_ref, file)
# BAD: the same model will be stored to driver's object store repeatedly for each task
model = load_model()
for file in input_files:
make_prediction.remote(model, file)
更多详细信息,请查看反模式:重复按值传递相同的较大参数会损害性能。
如何提高 GPU 利用率?#
要保持 GPU 繁忙,可以考虑以下几点
如果同一 GPU 节点有多个 GPU,则在同一节点上调度多个任务:如果同一节点上有多个 GPU 并且单个任务无法全部使用它们,您可以将多个任务指向该节点。这由 Ray 自动处理,例如,如果您指定 num_gpus=1 并且有 4 个 GPU,Ray 将会调度 4 个任务到该节点,前提是有足够的任务且没有其他资源限制。
使用基于 actor 的方法:如上所述,基于 actor 的方法更高效,因为它为许多任务重用了模型初始化,因此节点将把更多时间花在实际工作负载上。