高级:性能提示和调优#

优化转换#

批量转换#

如果您的转换是向量化的,就像大多数 NumPy 或 pandas 操作一样,请使用 map_batches() 而不是 map()。它更快。

如果您的转换不是向量化的,则没有性能优势。

优化读取#

调整读取的输出块#

默认情况下,Ray Data 会根据以下过程自动选择读取的输出块数量

  • 传递给 Ray Data 的 读取 APIoverride_num_blocks 参数指定输出块的数量,这相当于要创建的读取任务的数量。

  • 通常,如果读取后跟 map()map_batches(),则 map 会与读取进行融合;因此 override_num_blocks 也决定了 map 任务的数量。

Ray Data 根据以下启发式方法决定输出块数量的默认值,并按顺序应用:

  1. 从默认值 200 开始。您可以通过设置 DataContext.read_op_min_num_blocks 来覆盖此值。

  2. 最小块大小(默认值:1 MiB)。如果块的数量会导致块小于此阈值,则减少块的数量以避免微小块的开销。您可以通过设置 DataContext.target_min_block_size(字节)来覆盖此值。

  3. 最大块大小(默认值:128 MiB)。如果块的数量会导致块大于此阈值,则增加块的数量以避免处理过程中出现内存不足错误。您可以通过设置 DataContext.target_max_block_size(字节)来覆盖此值。

  4. 可用 CPU。增加块的数量以利用群集中的所有可用 CPU。Ray Data 选择的读取任务数量至少是可用 CPU 数量的 2 倍。

有时,手动调整块的数量以优化应用程序是有益的。例如,以下代码将多个文件批量处理到同一个读取任务中,以避免创建过大的块。

import ray
# Pretend there are two CPUs.
ray.init(num_cpus=2)

# Repeat the iris.csv file 16 times.
ds = ray.data.read_csv(["s3://anonymous@ray-example-data/iris.csv"] * 16)
print(ds.materialize())
MaterializedDataset(
   num_blocks=4,
   num_rows=2400,
   ...
)

但是,假设您知道希望并行读取所有 16 个文件。这可能的原因是,您知道应该通过自动扩展程序向群集添加更多 CPU,或者您希望下游操作符并行处理每个文件的内容。通过设置 override_num_blocks 参数可以获得这种行为。请注意,在以下代码中,输出块的数量等于 override_num_blocks

import ray
# Pretend there are two CPUs.
ray.init(num_cpus=2)

# Repeat the iris.csv file 16 times.
ds = ray.data.read_csv(["s3://anonymous@ray-example-data/iris.csv"] * 16, override_num_blocks=16)
print(ds.materialize())
MaterializedDataset(
   num_blocks=16,
   num_rows=2400,
   ...
)

在使用默认自动检测的块数量时,Ray Data 会尝试将每个任务的输出限制在 DataContext.target_max_block_size 字节。请注意,Ray Data 无法完美预测每个任务的输出大小,因此每个任务都可能产生一个或多个输出块。因此,最终 Dataset 中的总块数可能与指定的 override_num_blocks 不同。以下是一个示例,其中我们手动指定了 override_num_blocks=1,但一个任务仍然在已物化的 Dataset 中生成多个块。

import ray
# Pretend there are two CPUs.
ray.init(num_cpus=2)

# Generate ~400MB of data.
ds = ray.data.range_tensor(5_000, shape=(10_000, ), override_num_blocks=1)
print(ds.materialize())
MaterializedDataset(
   num_blocks=3,
   num_rows=5000,
   schema={data: ArrowTensorTypeV2(shape=(10000,), dtype=int64)}
)

目前,Ray Data 每个输入文件最多可以分配一个读取任务。因此,如果输入文件的数量小于 override_num_blocks,则读取任务的数量将限制为输入文件的数量。为确保下游转换仍能以所需的块数执行,Ray Data 会将读取任务的输出分割成总共 override_num_blocks 个块,并防止与下游转换融合。换句话说,每个读取任务的输出块都会物化到 Ray 的对象存储中,然后再执行消耗它的 map 任务。例如,以下代码使用一个任务执行 read_csv(),但其输出在执行 map() 之前被分割成 4 个块。

import ray
# Pretend there are two CPUs.
ray.init(num_cpus=2)

ds = ray.data.read_csv("s3://anonymous@ray-example-data/iris.csv").map(lambda row: row)
print(ds.materialize().stats())
...
Operator 1 ReadCSV->SplitBlocks(4): 1 tasks executed, 4 blocks produced in 0.01s
...

Operator 2 Map(<lambda>): 4 tasks executed, 4 blocks produced in 0.3s
...

要关闭此行为并允许读取和 map 操作符融合,请手动设置 override_num_blocks。例如,此代码将文件数量设置为等于 override_num_blocks

import ray
# Pretend there are two CPUs.
ray.init(num_cpus=2)

ds = ray.data.read_csv("s3://anonymous@ray-example-data/iris.csv", override_num_blocks=1).map(lambda row: row)
print(ds.materialize().stats())
...
Operator 1 ReadCSV->Map(<lambda>): 1 tasks executed, 1 blocks produced in 0.01s
...

调整读取资源#

默认情况下,Ray 为每个读取任务请求 1 个 CPU,这意味着每个 CPU 可以同时执行一个读取任务。对于受益于更多 IO 并行化的数据源,您可以使用 ray_remote_args 参数为读取函数指定较低的 num_cpus 值。例如,使用 ray.data.read_parquet(path, ray_remote_args={"num_cpus": 0.25}) 可为每个 CPU 允许最多四个读取任务。

Parquet 列裁剪(投影下推)#

默认情况下,ray.data.read_parquet() 会将 Parquet 文件中的所有列读入内存。如果您只需要列的子集,请确保在调用 ray.data.read_parquet() 时显式指定列的列表,以避免加载不必要的数据(投影下推)。请注意,这比调用 select_columns() 更有效,因为列选择被推送到文件扫描。

import ray

# Read just two of the five columns of the Iris dataset.
ds = ray.data.read_parquet(
    "s3://anonymous@ray-example-data/iris.parquet",
    columns=["sepal.length", "variety"],
)

print(ds.schema())
Column        Type
------        ----
sepal.length  double
variety       string

减少内存使用#

排查内存不足错误#

在执行过程中,一个任务可以读取多个输入块,并写入多个输出块。输入和输出块通过 Ray 的对象存储消耗工作进程堆内存和共享内存。Ray 通过将数据溢出到磁盘来限制对象存储内存使用,但过多的工作进程堆内存使用会导致内存不足的情况。

Ray Data 尝试将其堆内存使用量限制在 num_execution_slots * max_block_size。执行槽的数量默认等于 CPU 的数量,除非指定了自定义资源。最大块大小由配置参数 DataContext.target_max_block_size 设置,默认值为 128MiB。如果 Dataset 包含 全对全 shuffle 操作(例如 random_shuffle()),则默认最大块大小由 DataContext.target_shuffle_max_block_size 控制,默认设置为 1GiB,以避免创建过多微小块。

注意

不建议修改 DataContext.target_max_block_size。默认值已选择,以平衡过多微小块带来的高开销与过大块带来的过度堆内存使用。

当任务的输出大于最大块大小时,工作进程会自动将输出分割成多个较小的块,以避免堆内存不足。但是,仍然可能出现过大的块,并可能导致内存不足的情况。为避免这些问题:

  1. 确保您的 Dataset 中没有单个项过大。目标是每行小于 10 MB。

  2. 始终使用 ds.map_batches(),并设置一个足够小的批次大小,以便输出批次能够轻松放入堆内存。或者,如果不需要向量化执行,请使用 ds.map()

  3. 如果以上方法都不足以解决问题,请手动增加 读取输出块 或修改您的应用程序代码,以确保每个任务读取的数据量更少。

作为调整批次大小的示例,以下代码使用一个任务加载一个 1 GB 的 Dataset,其中包含 1000 行 1 MB 的数据,并使用 map_batches() 应用一个恒等函数。由于 map_batches() 的默认 batch_size 为 1024 行,此代码仅产生一个非常大的批次,导致堆内存使用量增加到 4 GB。

import ray
# Pretend there are two CPUs.
ray.init(num_cpus=2)

# Force Ray Data to use one task to show the memory issue.
ds = ray.data.range_tensor(1000, shape=(125_000, ), override_num_blocks=1)
# The default batch size is 1024 rows.
ds = ds.map_batches(lambda batch: batch)
print(ds.materialize().stats())
Operator 1 ReadRange->MapBatches(<lambda>): 1 tasks executed, 7 blocks produced in 1.33s
  ...
* Peak heap memory usage (MiB): 3302.17 min, 4233.51 max, 4100 mean
* Output num rows: 125 min, 125 max, 125 mean, 1000 total
* Output size bytes: 134000536 min, 196000784 max, 142857714 mean, 1000004000 total
  ...

设置较低的批次大小会降低峰值堆内存使用量。

import ray
# Pretend there are two CPUs.
ray.init(num_cpus=2)

ds = ray.data.range_tensor(1000, shape=(125_000, ), override_num_blocks=1)
ds = ds.map_batches(lambda batch: batch, batch_size=32)
print(ds.materialize().stats())
Operator 1 ReadRange->MapBatches(<lambda>): 1 tasks executed, 7 blocks produced in 0.51s
...
* Peak heap memory usage (MiB): 587.09 min, 1569.57 max, 1207 mean
* Output num rows: 40 min, 160 max, 142 mean, 1000 total
* Output size bytes: 40000160 min, 160000640 max, 142857714 mean, 1000004000 total
...

在 Ray Data 中改进堆内存使用是一个活跃的开发领域。以下是当前已知可能导致堆内存使用量非常高的情况:

  1. 读取大型(1 GiB 或更大)二进制文件。

  2. 转换单个行很大的 Dataset(100 MiB 或更大)。

在这些情况下,最后的手段是减少并发执行槽的数量。这可以通过自定义资源来完成。例如,使用 ds.map_batches(fn, num_cpus=2) 来减半 map_batches 任务的执行槽数量。

如果这些策略仍然不足以解决问题,请 在 GitHub 上提交 Ray Data 问题

避免对象溢出#

Dataset 的中间块和输出块存储在 Ray 的对象存储中。尽管 Ray Data 尝试通过 流式执行 来最大限度地减少对象存储使用,但工作集仍有可能超过对象存储容量。在这种情况下,Ray 开始将块溢出到磁盘,这会显着减慢执行速度,甚至可能导致磁盘空间不足的错误。

在某些情况下,溢出是可预期的。特别地,如果 Dataset 的总大小大于对象存储容量,并且以下条件之一成立:

  1. 使用了 全对全 shuffle 操作。或,

  2. 调用了 ds.materialize()

否则,最好调整您的应用程序以避免溢出。推荐的策略是手动增加 读取输出块 或修改您的应用程序代码,以确保每个任务读取的数据量更少。

注意

这是一个活跃的开发领域。如果您的 Dataset 导致溢出并且您不知道原因,请 在 GitHub 上提交 Ray Data 问题

处理过小的块#

当您的 Dataset 的不同操作符产生不同大小的输出时,您可能会得到非常小的块,这会损害性能,甚至由于过多的元数据而导致崩溃。使用 ds.stats() 检查每个操作符的输出块是否至少为 1 MB,最好是 >100 MB。

如果您的块比这小,请考虑重新分区成更大的块。有两种方法可以做到这一点:

  1. 如果您需要控制确切的输出块数量,请使用 ds.repartition(num_partitions)。请注意,这是一个 全对全操作,它会在执行重新分区之前将所有块物化到内存中。

  2. 如果您不需要控制确切的输出块数量,只是想生成更大的块,请使用 ds.map_batches(lambda batch: batch, batch_size=batch_size) 并将 batch_size 设置为您想要的每块行数。这以流式方式执行,并避免物化。

当使用 ds.map_batches() 时,Ray Data 会合并块,以便每个 map 任务可以处理至少这么多行。请注意,选择的 batch_size 是任务输入块大小的下限,但它不一定决定任务最终的 *输出* 块大小;有关块大小如何确定的更多信息,请参阅有关块内存使用 的部分

为了说明这一点,以下代码使用这两种策略将 10 个每行 1 个小块合并为 1 个每行 10 个大块。

import ray
# Pretend there are two CPUs.
ray.init(num_cpus=2)

# 1. Use ds.repartition().
ds = ray.data.range(10, override_num_blocks=10).repartition(1)
print(ds.materialize().stats())

# 2. Use ds.map_batches().
ds = ray.data.range(10, override_num_blocks=10).map_batches(lambda batch: batch, batch_size=10)
print(ds.materialize().stats())
# 1. ds.repartition() output.
Operator 1 ReadRange: 10 tasks executed, 10 blocks produced in 0.33s
...
* Output num rows: 1 min, 1 max, 1 mean, 10 total
...
Operator 2 Repartition: executed in 0.36s

        Suboperator 0 RepartitionSplit: 10 tasks executed, 10 blocks produced
        ...

        Suboperator 1 RepartitionReduce: 1 tasks executed, 1 blocks produced
        ...
        * Output num rows: 10 min, 10 max, 10 mean, 10 total
        ...


# 2. ds.map_batches() output.
Operator 1 ReadRange->MapBatches(<lambda>): 1 tasks executed, 1 blocks produced in 0s
...
* Output num rows: 10 min, 10 max, 10 mean, 10 total

配置执行#

配置资源和本地性#

默认情况下,CPU 和 GPU 限制设置为群集大小,对象存储内存限制保守设置为总对象存储大小的 1/4,以避免磁盘溢出的可能性。

您可能希望在以下场景中自定义这些限制:

  • 如果在群集上运行多个并发作业,设置较低的限制可以避免作业之间的资源争用。

  • 如果您想微调内存限制以最大化性能。

  • 对于将数据加载到训练作业中,您可能希望将对象存储内存设置为较低的值(例如,2 GB)以限制资源使用。

您可以使用全局 DataContext 配置执行选项。这些选项将应用于进程中将启动的未来作业。

ctx = ray.data.DataContext.get_current()
ctx.execution_options.resource_limits = ctx.execution_options.resource_limits.copy(
    cpu=10,
    gpu=5,
    object_store_memory=10e9,
)

注意

请注意,默认情况下 Ray 只为其对象存储保留 30% 的内存。建议为所有 Ray Data 工作负载至少将其设置为 **\*50%\***。

输出本地性(ML ingest 用例)#

ctx.execution_options.locality_with_output = True

将此参数设置为 True 会告诉 Ray Data 优先将操作符任务放置在群集的消费者节点上,而不是均匀地分布在整个群集中。如果您知道您正在消费者节点上直接使用输出数据(例如,用于 ML 训练 ingest),则此设置可能很有用。但是,其他用例可能会因为此设置而导致性能下降。

可复现性#

确定性执行#

# By default, this is set to False.
ctx.execution_options.preserve_order = True

要启用确定性执行,请将前面的设置为 True。此设置可能会降低性能,但可确保在执行过程中保留块顺序。此标志默认为 False。