高级:性能提示和调优#
优化转换#
批量转换#
如果您的转换是向量化的,就像大多数 NumPy 或 pandas 操作一样,请使用 map_batches() 而不是 map()。它更快。
如果您的转换不是向量化的,则没有性能优势。
优化读取#
调整读取的输出块#
默认情况下,Ray Data 会根据以下过程自动选择读取的输出块数量
传递给 Ray Data 的 读取 API 的
override_num_blocks参数指定输出块的数量,这相当于要创建的读取任务的数量。通常,如果读取后跟
map()或map_batches(),则 map 会与读取进行融合;因此override_num_blocks也决定了 map 任务的数量。
Ray Data 根据以下启发式方法决定输出块数量的默认值,并按顺序应用:
从默认值 200 开始。您可以通过设置
DataContext.read_op_min_num_blocks来覆盖此值。最小块大小(默认值:1 MiB)。如果块的数量会导致块小于此阈值,则减少块的数量以避免微小块的开销。您可以通过设置
DataContext.target_min_block_size(字节)来覆盖此值。最大块大小(默认值:128 MiB)。如果块的数量会导致块大于此阈值,则增加块的数量以避免处理过程中出现内存不足错误。您可以通过设置
DataContext.target_max_block_size(字节)来覆盖此值。可用 CPU。增加块的数量以利用群集中的所有可用 CPU。Ray Data 选择的读取任务数量至少是可用 CPU 数量的 2 倍。
有时,手动调整块的数量以优化应用程序是有益的。例如,以下代码将多个文件批量处理到同一个读取任务中,以避免创建过大的块。
import ray
# Pretend there are two CPUs.
ray.init(num_cpus=2)
# Repeat the iris.csv file 16 times.
ds = ray.data.read_csv(["s3://anonymous@ray-example-data/iris.csv"] * 16)
print(ds.materialize())
MaterializedDataset(
num_blocks=4,
num_rows=2400,
...
)
但是,假设您知道希望并行读取所有 16 个文件。这可能的原因是,您知道应该通过自动扩展程序向群集添加更多 CPU,或者您希望下游操作符并行处理每个文件的内容。通过设置 override_num_blocks 参数可以获得这种行为。请注意,在以下代码中,输出块的数量等于 override_num_blocks。
import ray
# Pretend there are two CPUs.
ray.init(num_cpus=2)
# Repeat the iris.csv file 16 times.
ds = ray.data.read_csv(["s3://anonymous@ray-example-data/iris.csv"] * 16, override_num_blocks=16)
print(ds.materialize())
MaterializedDataset(
num_blocks=16,
num_rows=2400,
...
)
在使用默认自动检测的块数量时,Ray Data 会尝试将每个任务的输出限制在 DataContext.target_max_block_size 字节。请注意,Ray Data 无法完美预测每个任务的输出大小,因此每个任务都可能产生一个或多个输出块。因此,最终 Dataset 中的总块数可能与指定的 override_num_blocks 不同。以下是一个示例,其中我们手动指定了 override_num_blocks=1,但一个任务仍然在已物化的 Dataset 中生成多个块。
import ray
# Pretend there are two CPUs.
ray.init(num_cpus=2)
# Generate ~400MB of data.
ds = ray.data.range_tensor(5_000, shape=(10_000, ), override_num_blocks=1)
print(ds.materialize())
MaterializedDataset(
num_blocks=3,
num_rows=5000,
schema={data: ArrowTensorTypeV2(shape=(10000,), dtype=int64)}
)
目前,Ray Data 每个输入文件最多可以分配一个读取任务。因此,如果输入文件的数量小于 override_num_blocks,则读取任务的数量将限制为输入文件的数量。为确保下游转换仍能以所需的块数执行,Ray Data 会将读取任务的输出分割成总共 override_num_blocks 个块,并防止与下游转换融合。换句话说,每个读取任务的输出块都会物化到 Ray 的对象存储中,然后再执行消耗它的 map 任务。例如,以下代码使用一个任务执行 read_csv(),但其输出在执行 map() 之前被分割成 4 个块。
import ray
# Pretend there are two CPUs.
ray.init(num_cpus=2)
ds = ray.data.read_csv("s3://anonymous@ray-example-data/iris.csv").map(lambda row: row)
print(ds.materialize().stats())
...
Operator 1 ReadCSV->SplitBlocks(4): 1 tasks executed, 4 blocks produced in 0.01s
...
Operator 2 Map(<lambda>): 4 tasks executed, 4 blocks produced in 0.3s
...
要关闭此行为并允许读取和 map 操作符融合,请手动设置 override_num_blocks。例如,此代码将文件数量设置为等于 override_num_blocks。
import ray
# Pretend there are two CPUs.
ray.init(num_cpus=2)
ds = ray.data.read_csv("s3://anonymous@ray-example-data/iris.csv", override_num_blocks=1).map(lambda row: row)
print(ds.materialize().stats())
...
Operator 1 ReadCSV->Map(<lambda>): 1 tasks executed, 1 blocks produced in 0.01s
...
调整读取资源#
默认情况下,Ray 为每个读取任务请求 1 个 CPU,这意味着每个 CPU 可以同时执行一个读取任务。对于受益于更多 IO 并行化的数据源,您可以使用 ray_remote_args 参数为读取函数指定较低的 num_cpus 值。例如,使用 ray.data.read_parquet(path, ray_remote_args={"num_cpus": 0.25}) 可为每个 CPU 允许最多四个读取任务。
Parquet 列裁剪(投影下推)#
默认情况下,ray.data.read_parquet() 会将 Parquet 文件中的所有列读入内存。如果您只需要列的子集,请确保在调用 ray.data.read_parquet() 时显式指定列的列表,以避免加载不必要的数据(投影下推)。请注意,这比调用 select_columns() 更有效,因为列选择被推送到文件扫描。
import ray
# Read just two of the five columns of the Iris dataset.
ds = ray.data.read_parquet(
"s3://anonymous@ray-example-data/iris.parquet",
columns=["sepal.length", "variety"],
)
print(ds.schema())
Column Type
------ ----
sepal.length double
variety string
减少内存使用#
排查内存不足错误#
在执行过程中,一个任务可以读取多个输入块,并写入多个输出块。输入和输出块通过 Ray 的对象存储消耗工作进程堆内存和共享内存。Ray 通过将数据溢出到磁盘来限制对象存储内存使用,但过多的工作进程堆内存使用会导致内存不足的情况。
Ray Data 尝试将其堆内存使用量限制在 num_execution_slots * max_block_size。执行槽的数量默认等于 CPU 的数量,除非指定了自定义资源。最大块大小由配置参数 DataContext.target_max_block_size 设置,默认值为 128MiB。如果 Dataset 包含 全对全 shuffle 操作(例如 random_shuffle()),则默认最大块大小由 DataContext.target_shuffle_max_block_size 控制,默认设置为 1GiB,以避免创建过多微小块。
注意
不建议修改 DataContext.target_max_block_size。默认值已选择,以平衡过多微小块带来的高开销与过大块带来的过度堆内存使用。
当任务的输出大于最大块大小时,工作进程会自动将输出分割成多个较小的块,以避免堆内存不足。但是,仍然可能出现过大的块,并可能导致内存不足的情况。为避免这些问题:
确保您的 Dataset 中没有单个项过大。目标是每行小于 10 MB。
始终使用
ds.map_batches(),并设置一个足够小的批次大小,以便输出批次能够轻松放入堆内存。或者,如果不需要向量化执行,请使用ds.map()。如果以上方法都不足以解决问题,请手动增加 读取输出块 或修改您的应用程序代码,以确保每个任务读取的数据量更少。
作为调整批次大小的示例,以下代码使用一个任务加载一个 1 GB 的 Dataset,其中包含 1000 行 1 MB 的数据,并使用 map_batches() 应用一个恒等函数。由于 map_batches() 的默认 batch_size 为 1024 行,此代码仅产生一个非常大的批次,导致堆内存使用量增加到 4 GB。
import ray
# Pretend there are two CPUs.
ray.init(num_cpus=2)
# Force Ray Data to use one task to show the memory issue.
ds = ray.data.range_tensor(1000, shape=(125_000, ), override_num_blocks=1)
# The default batch size is 1024 rows.
ds = ds.map_batches(lambda batch: batch)
print(ds.materialize().stats())
Operator 1 ReadRange->MapBatches(<lambda>): 1 tasks executed, 7 blocks produced in 1.33s
...
* Peak heap memory usage (MiB): 3302.17 min, 4233.51 max, 4100 mean
* Output num rows: 125 min, 125 max, 125 mean, 1000 total
* Output size bytes: 134000536 min, 196000784 max, 142857714 mean, 1000004000 total
...
设置较低的批次大小会降低峰值堆内存使用量。
import ray
# Pretend there are two CPUs.
ray.init(num_cpus=2)
ds = ray.data.range_tensor(1000, shape=(125_000, ), override_num_blocks=1)
ds = ds.map_batches(lambda batch: batch, batch_size=32)
print(ds.materialize().stats())
Operator 1 ReadRange->MapBatches(<lambda>): 1 tasks executed, 7 blocks produced in 0.51s
...
* Peak heap memory usage (MiB): 587.09 min, 1569.57 max, 1207 mean
* Output num rows: 40 min, 160 max, 142 mean, 1000 total
* Output size bytes: 40000160 min, 160000640 max, 142857714 mean, 1000004000 total
...
在 Ray Data 中改进堆内存使用是一个活跃的开发领域。以下是当前已知可能导致堆内存使用量非常高的情况:
读取大型(1 GiB 或更大)二进制文件。
转换单个行很大的 Dataset(100 MiB 或更大)。
在这些情况下,最后的手段是减少并发执行槽的数量。这可以通过自定义资源来完成。例如,使用 ds.map_batches(fn, num_cpus=2) 来减半 map_batches 任务的执行槽数量。
如果这些策略仍然不足以解决问题,请 在 GitHub 上提交 Ray Data 问题。
避免对象溢出#
Dataset 的中间块和输出块存储在 Ray 的对象存储中。尽管 Ray Data 尝试通过 流式执行 来最大限度地减少对象存储使用,但工作集仍有可能超过对象存储容量。在这种情况下,Ray 开始将块溢出到磁盘,这会显着减慢执行速度,甚至可能导致磁盘空间不足的错误。
在某些情况下,溢出是可预期的。特别地,如果 Dataset 的总大小大于对象存储容量,并且以下条件之一成立:
使用了 全对全 shuffle 操作。或,
调用了
ds.materialize()。
否则,最好调整您的应用程序以避免溢出。推荐的策略是手动增加 读取输出块 或修改您的应用程序代码,以确保每个任务读取的数据量更少。
注意
这是一个活跃的开发领域。如果您的 Dataset 导致溢出并且您不知道原因,请 在 GitHub 上提交 Ray Data 问题。
处理过小的块#
当您的 Dataset 的不同操作符产生不同大小的输出时,您可能会得到非常小的块,这会损害性能,甚至由于过多的元数据而导致崩溃。使用 ds.stats() 检查每个操作符的输出块是否至少为 1 MB,最好是 >100 MB。
如果您的块比这小,请考虑重新分区成更大的块。有两种方法可以做到这一点:
如果您需要控制确切的输出块数量,请使用
ds.repartition(num_partitions)。请注意,这是一个 全对全操作,它会在执行重新分区之前将所有块物化到内存中。如果您不需要控制确切的输出块数量,只是想生成更大的块,请使用
ds.map_batches(lambda batch: batch, batch_size=batch_size)并将batch_size设置为您想要的每块行数。这以流式方式执行,并避免物化。
当使用 ds.map_batches() 时,Ray Data 会合并块,以便每个 map 任务可以处理至少这么多行。请注意,选择的 batch_size 是任务输入块大小的下限,但它不一定决定任务最终的 *输出* 块大小;有关块大小如何确定的更多信息,请参阅有关块内存使用 的部分。
为了说明这一点,以下代码使用这两种策略将 10 个每行 1 个小块合并为 1 个每行 10 个大块。
import ray
# Pretend there are two CPUs.
ray.init(num_cpus=2)
# 1. Use ds.repartition().
ds = ray.data.range(10, override_num_blocks=10).repartition(1)
print(ds.materialize().stats())
# 2. Use ds.map_batches().
ds = ray.data.range(10, override_num_blocks=10).map_batches(lambda batch: batch, batch_size=10)
print(ds.materialize().stats())
# 1. ds.repartition() output.
Operator 1 ReadRange: 10 tasks executed, 10 blocks produced in 0.33s
...
* Output num rows: 1 min, 1 max, 1 mean, 10 total
...
Operator 2 Repartition: executed in 0.36s
Suboperator 0 RepartitionSplit: 10 tasks executed, 10 blocks produced
...
Suboperator 1 RepartitionReduce: 1 tasks executed, 1 blocks produced
...
* Output num rows: 10 min, 10 max, 10 mean, 10 total
...
# 2. ds.map_batches() output.
Operator 1 ReadRange->MapBatches(<lambda>): 1 tasks executed, 1 blocks produced in 0s
...
* Output num rows: 10 min, 10 max, 10 mean, 10 total
配置执行#
配置资源和本地性#
默认情况下,CPU 和 GPU 限制设置为群集大小,对象存储内存限制保守设置为总对象存储大小的 1/4,以避免磁盘溢出的可能性。
您可能希望在以下场景中自定义这些限制:
如果在群集上运行多个并发作业,设置较低的限制可以避免作业之间的资源争用。
如果您想微调内存限制以最大化性能。
对于将数据加载到训练作业中,您可能希望将对象存储内存设置为较低的值(例如,2 GB)以限制资源使用。
您可以使用全局 DataContext 配置执行选项。这些选项将应用于进程中将启动的未来作业。
ctx = ray.data.DataContext.get_current()
ctx.execution_options.resource_limits = ctx.execution_options.resource_limits.copy(
cpu=10,
gpu=5,
object_store_memory=10e9,
)
注意
请注意,默认情况下 Ray 只为其对象存储保留 30% 的内存。建议为所有 Ray Data 工作负载至少将其设置为 **\*50%\***。
输出本地性(ML ingest 用例)#
ctx.execution_options.locality_with_output = True
将此参数设置为 True 会告诉 Ray Data 优先将操作符任务放置在群集的消费者节点上,而不是均匀地分布在整个群集中。如果您知道您正在消费者节点上直接使用输出数据(例如,用于 ML 训练 ingest),则此设置可能很有用。但是,其他用例可能会因为此设置而导致性能下降。
可复现性#
确定性执行#
# By default, this is set to False.
ctx.execution_options.preserve_order = True
要启用确定性执行,请将前面的设置为 True。此设置可能会降低性能,但可确保在执行过程中保留块顺序。此标志默认为 False。