Ray Core 批量预测#
注意
对于用于处理大型数据集的高级批量推理 API,请参阅 Ray Data 批量推理。本示例适用于希望对数据分片和执行有更多控制的用户。
批量预测是使用训练好的模型为一组观测值生成预测的过程。它包含以下要素:
输入数据集:这是一组要生成预测的观测值。数据通常存储在 S3、HDFS 或数据库等外部存储系统中,并且可能非常大。
机器学习模型:这是一个训练好的 ML 模型,通常也存储在外部存储系统中。
预测:这是将 ML 模型应用于观测值时产生的输出。预测通常写回存储系统。
使用 Ray,您可以构建可扩展的批量预测,以处理大型数据集并实现高预测吞吐量。Ray Data 提供了 用于离线批量推理的高级 API,并内置了优化。但是,如果您需要更多控制,可以使用更底层的 Ray Core API。本示例通过将数据集分割成不相交的分片,并使用 Ray Tasks 或 Ray Actors 在 Ray 集群中并行执行它们,来演示使用 Ray Core 进行批量推理。
基于任务的批量预测#
使用 Ray 任务,您可以按以下方式构建批量预测程序:
加载模型
启动 Ray 任务,每个任务接收模型和一个输入数据集分片。
每个工作程序在分配的分片上执行预测,并写出结果。
以 2009 年纽约出租车数据为例。假设我们有这个简单的模型:
import pandas as pd
import numpy as np
def load_model():
# A dummy model.
def model(batch: pd.DataFrame) -> pd.DataFrame:
# Dummy payload so copying the model will actually copy some data
# across nodes.
model.payload = np.zeros(100_000_000)
return pd.DataFrame({"score": batch["passenger_count"] % 2 == 0})
return model
该数据集包含 12 个文件(每个月一个),因此我们可以自然地让每个 Ray 任务处理一个文件。通过接收模型和一个输入数据集分片(即单个文件),我们可以定义一个用于预测的 Ray 远程任务:
import pyarrow.parquet as pq
import ray
@ray.remote
def make_prediction(model, shard_path):
df = pq.read_table(shard_path).to_pandas()
result = model(df)
# Write out the prediction result.
# NOTE: unless the driver will have to further process the
# result (other than simply writing out to storage system),
# writing out at remote task is recommended, as it can avoid
# congesting or overloading the driver.
# ...
# Here we just return the size about the result in this example.
return len(result)
驱动程序启动整个输入数据集的所有任务。
# 12 files, one for each remote task.
input_files = [
f"s3://anonymous@air-example-data/ursa-labs-taxi-data/downsampled_2009_full_year_data.parquet"
f"/fe41422b01c04169af2a65a83b753e0f_{i:06d}.parquet"
for i in range(12)
]
# ray.put() the model just once to local object store, and then pass the
# reference to the remote tasks.
model = load_model()
model_ref = ray.put(model)
result_refs = []
# Launch all prediction tasks.
for file in input_files:
# Launch a prediction task by passing model reference and shard file to it.
# NOTE: it would be highly inefficient if you are passing the model itself
# like make_prediction.remote(model, file), which in order to pass the model
# to remote node will ray.put(model) for each task, potentially overwhelming
# the local object store and causing out-of-disk error.
result_refs.append(make_prediction.remote(model_ref, file))
results = ray.get(result_refs)
# Let's check prediction output size.
for r in results:
print("Prediction output size:", r)
为了避免使集群过载并导致 OOM(内存溢出),我们可以通过为任务设置适当的资源需求来控制并行度。有关此设计模式的详细信息,请参阅 模式:使用资源限制并发运行的任务数量。例如,如果很容易估计从外部存储加载的数据的内存占用量,您可以通过指定每个任务所需的内存量来控制并行度,例如,使用 make_prediction.options(memory=100*1023*1025).remote(model_ref, file) 启动任务。Ray 将会正确处理,确保调度到节点的任务不会超过其总内存。
提示
为了避免重复将同一模型存储到对象存储(这可能导致驱动程序节点磁盘空间不足),请使用 ray.put() 将模型存储一次,然后传递其引用。
提示
为了避免驱动程序节点拥堵或过载,最好让每个任务写出预测结果(而不是将结果返回给驱动程序,因为驱动程序实际什么都不做,只是写出到存储系统)。
基于 Actor 的批量预测#
在上述解决方案中,每个 Ray 任务在开始进行预测之前都必须从驱动程序节点获取模型。这是一个开销,如果模型很大,可能会非常显著。我们可以通过使用 Ray actors 来优化这一点,actors 会只获取一次模型并将其用于分配给 actor 的所有任务。
首先,我们定义一个可调用类,该类具有一个用于加载/缓存模型的接口(即构造函数),以及一个用于接收文件并执行预测的接口。
import pandas as pd
import pyarrow.parquet as pq
import ray
@ray.remote
class BatchPredictor:
def __init__(self, model):
self.model = model
def predict(self, shard_path):
df = pq.read_table(shard_path).to_pandas()
result =self.model(df)
# Write out the prediction result.
# NOTE: unless the driver will have to further process the
# result (other than simply writing out to storage system),
# writing out at remote task is recommended, as it can avoid
# congesting or overloading the driver.
# ...
# Here we just return the size about the result in this example.
return len(result)
构造函数每个 actor 工作程序只调用一次。我们使用 ActorPool 来管理一组可以接收预测请求的 actors。
from ray.util.actor_pool import ActorPool
model = load_model()
model_ref = ray.put(model)
num_actors = 4
actors = [BatchPredictor.remote(model_ref) for _ in range(num_actors)]
pool = ActorPool(actors)
input_files = [
f"s3://anonymous@air-example-data/ursa-labs-taxi-data/downsampled_2009_full_year_data.parquet"
f"/fe41422b01c04169af2a65a83b753e0f_{i:06d}.parquet"
for i in range(12)
]
for file in input_files:
pool.submit(lambda a, v: a.predict.remote(v), file)
while pool.has_next():
print("Prediction output size:", pool.get_next())
请注意,ActorPool 的大小是固定的,不像基于任务的方法,后者的并行任务数量可以是动态的(只要不超过 max_in_flight_tasks)。要拥有自动伸缩的 actor pool,您需要使用 Ray Data 批量预测。
带 GPU 的批量预测#
如果您的集群有 GPU 节点并且您的预测器可以利用 GPU,您可以通过指定 num_gpus 来将任务或 actors 指向这些 GPU 节点。Ray 会相应地将它们调度到 GPU 节点上。在节点上,您需要将模型移动到 GPU。以下是一个 Torch 模型的示例。
import torch
@ray.remote(num_gpus=1)
def make_torch_prediction(model: torch.nn.Module, shard_path):
# Move model to GPU.
model.to(torch.device("cuda"))
inputs = pq.read_table(shard_path).to_pandas().to_numpy()
results = []
# for each tensor in inputs:
# results.append(model(tensor))
#
# Write out the results right in task instead of returning back
# to the driver node (unless you have to), to avoid congest/overload
# driver node.
# ...
# Here we just return simple/light meta information.
return len(results)
常见问题解答#
如果在 Ray 集群中,模型很大,如何高效地加载和传递模型?#
推荐的方法是(以基于任务的批量预测为例,基于 actor 的方法类似):
让驱动程序加载模型(例如,从存储系统)。
让驱动程序 ray.put(model) 将模型存储到对象存储中;并且
在启动每个远程任务时,将模型的相同引用传递给它们。远程任务将在开始执行预测之前,从驱动程序的对象存储中将模型提取到其本地对象存储。
请注意,如果您跳过第 2 步并将模型(而不是引用)传递给远程任务,这是非常低效的。如果模型很大且任务很多,很可能会导致驱动程序节点磁盘空间不足而崩溃。
# GOOD: the model will be stored to driver's object store only once
model = load_model()
model_ref = ray.put(model)
for file in input_files:
make_prediction.remote(model_ref, file)
# BAD: the same model will be stored to driver's object store repeatedly for each task
model = load_model()
for file in input_files:
make_prediction.remote(model, file)
有关更多详细信息,请参阅 反模式:反复按值传递相同的较大参数会损害性能。
如何提高 GPU 利用率?#
为了让 GPU 保持忙碌,可以关注以下几点:
在具有多个 GPU 的同一节点上调度多个任务:如果同一节点上有多个 GPU,而单个任务无法全部使用它们,您可以将多个任务指向该节点。Ray 会自动处理这种情况,例如,如果您指定 num_gpus=1 并且有 4 个 GPU,Ray 将会把 4 个任务调度到该节点,前提是有足够多的任务且没有其他资源限制。
使用基于 actor 的方法:如上所述,基于 actor 的方法更有效,因为它为许多任务重用了模型初始化,因此节点将花费更多时间在实际工作负载上。