高级:性能优化技巧与调优#
优化转换#
批量处理转换#
如果您的转换是向量化的,例如大多数 NumPy 或 pandas 操作,请使用 map_batches()
而不是 map()
。它速度更快。
如果您的转换不是向量化的,则没有性能提升。
优化读取#
调优读取的输出块#
默认情况下,Ray Data 根据以下步骤自动选择读取的输出块数量
传递给 Ray Data 读取 API 的
override_num_blocks
参数指定了输出块的数量,这等同于创建的读取任务数量。通常,如果读取操作后跟随
map()
或map_batches()
,则 map 操作会与读取操作融合;因此override_num_blocks
也决定了 map 任务的数量。
Ray Data 根据以下启发式方法决定输出块数量的默认值,按顺序应用
从默认值 200 开始。您可以通过设置
DataContext.read_op_min_num_blocks
来覆盖此值。最小块大小(默认值=1 MiB)。如果块数量会使块小于此阈值,则减少块数量以避免小块的开销。您可以通过设置
DataContext.target_min_block_size
(字节)来覆盖此值。最大块大小(默认值=128 MiB)。如果块数量会使块大于此阈值,则增加块数量以避免处理过程中出现内存不足错误。您可以通过设置
DataContext.target_max_block_size
(字节)来覆盖此值。可用 CPU。增加块数量以利用集群中所有可用的 CPU。Ray Data 选择的读取任务数量至少为可用 CPU 数量的 2 倍。
有时,手动调优块数量以优化应用程序会很有利。例如,以下代码将多个文件批量处理到同一个读取任务中,以避免创建过大的块。
import ray
# Pretend there are two CPUs.
ray.init(num_cpus=2)
# Repeat the iris.csv file 16 times.
ds = ray.data.read_csv(["example://iris.csv"] * 16)
print(ds.materialize())
MaterializedDataset(
num_blocks=4,
num_rows=2400,
...
)
但是假设您知道您想并行读取所有 16 个文件。例如,这可能是因为您知道自动扩缩器应该向集群添加额外的 CPU,或者因为您希望下游操作符并行转换每个文件的内容。您可以通过设置 override_num_blocks
参数来实现此行为。请注意以下代码中输出块的数量如何等于 override_num_blocks
import ray
# Pretend there are two CPUs.
ray.init(num_cpus=2)
# Repeat the iris.csv file 16 times.
ds = ray.data.read_csv(["example://iris.csv"] * 16, override_num_blocks=16)
print(ds.materialize())
MaterializedDataset(
num_blocks=16,
num_rows=2400,
...
)
当使用默认自动检测的块数量时,Ray Data 尝试将每个任务的输出限制在 DataContext.target_max_block_size
字节。但请注意,Ray Data 无法完美预测每个任务的输出大小,因此每个任务可能会产生一个或多个输出块。因此,最终 Dataset
中的总块数可能与指定的 override_num_blocks
不同。这是一个示例,我们手动指定 override_num_blocks=1
,但一个任务仍然在具体化的 Dataset 中产生了多个块
import ray
# Pretend there are two CPUs.
ray.init(num_cpus=2)
# Generate ~400MB of data.
ds = ray.data.range_tensor(5_000, shape=(10_000, ), override_num_blocks=1)
print(ds.materialize())
MaterializedDataset(
num_blocks=3,
num_rows=5000,
schema={data: numpy.ndarray(shape=(10000,), dtype=int64)}
)
目前,Ray Data 最多为每个输入文件分配一个读取任务。因此,如果输入文件的数量少于 override_num_blocks
,读取任务的数量上限为输入文件的数量。为了确保下游转换仍然可以按照期望的块数量执行,Ray Data 将读取任务的输出分割成总共 override_num_blocks
个块,并阻止与下游转换的融合。换句话说,每个读取任务的输出块在消费 map 任务执行之前会被具体化到 Ray 的对象存储中。例如,以下代码执行 read_csv()
时只有一个任务,但在执行 map()
之前,其输出被分割成 4 个块
import ray
# Pretend there are two CPUs.
ray.init(num_cpus=2)
ds = ray.data.read_csv("example://iris.csv").map(lambda row: row)
print(ds.materialize().stats())
...
Operator 1 ReadCSV->SplitBlocks(4): 1 tasks executed, 4 blocks produced in 0.01s
...
Operator 2 Map(<lambda>): 4 tasks executed, 4 blocks produced in 0.3s
...
要关闭此行为并允许读取和 map 操作符融合,请手动设置 override_num_blocks
。例如,此代码将文件数量设置为等于 override_num_blocks
import ray
# Pretend there are two CPUs.
ray.init(num_cpus=2)
ds = ray.data.read_csv("example://iris.csv", override_num_blocks=1).map(lambda row: row)
print(ds.materialize().stats())
...
Operator 1 ReadCSV->Map(<lambda>): 1 tasks executed, 1 blocks produced in 0.01s
...
调优读取资源#
默认情况下,Ray 为每个读取任务请求 1 个 CPU,这意味着每个 CPU 可以并发执行一个读取任务。对于从更多 IO 并行性中受益的数据源,您可以使用 ray_remote_args
参数为读取函数指定较低的 num_cpus
值。例如,使用 ray.data.read_parquet(path, ray_remote_args={"num_cpus": 0.25})
可以允许每个 CPU 最多有四个读取任务。
Parquet 列剪裁(谓词下推)#
默认情况下,ray.data.read_parquet()
会将 Parquet 文件中的所有列读入内存。如果您只需要部分列,请务必在调用 ray.data.read_parquet()
时明确指定列列表,以避免加载不必要的数据(谓词下推)。请注意,这比调用 select_columns()
更高效,因为列选择被下推到了文件扫描层。
import ray
# Read just two of the five columns of the Iris dataset.
ray.data.read_parquet(
"s3://anonymous@ray-example-data/iris.parquet",
columns=["sepal.length", "variety"],
)
Dataset(num_rows=150, schema={sepal.length: double, variety: string})
减少内存使用#
排除内存不足错误#
执行期间,一个任务可以读取多个输入块,并写入多个输出块。输入和输出块会消耗 worker 的堆内存以及通过 Ray 对象存储共享的内存。Ray 通过溢出到磁盘来限制对象存储的内存使用,但过多的 worker 堆内存使用可能会导致内存不足的情况。
Ray Data 尝试将其堆内存使用限制在 num_execution_slots * max_block_size
。除非指定自定义资源,否则执行槽的数量默认等于 CPU 的数量。最大块大小由配置参数 DataContext.target_max_block_size
设置,默认为 128MiB。如果 Dataset 包含 全对全混洗操作(例如 random_shuffle()
),则默认最大块大小由 DataContext.target_shuffle_max_block_size
控制,默认为 1GiB,以避免创建过多的小块。
注意
不建议修改 DataContext.target_max_block_size
。默认值已经在过多小块带来的高开销与过大块导致的堆内存过度使用之间取得了平衡。
当任务的输出大于最大块大小时,worker 会自动将输出分割成多个更小的块,以避免堆内存不足。然而,仍然可能出现过大的块,这会导致内存不足的情况。为避免这些问题
确保数据集中的单个项不会过大。目标是每行小于 10 MB。
始终调用
ds.map_batches()
时使用足够小的批处理大小,以便输出批处理能够轻松地放入堆内存。或者,如果不需要向量化执行,请使用ds.map()
。如果这些方法都不够,手动增加读取输出块或修改您的应用程序代码,以确保每个任务读取较小的数据量。
以调优批处理大小为例,以下代码使用一个任务加载一个 1 GB 的 Dataset
,其中包含 1000 行 1 MB 的数据,并使用 map_batches()
应用一个恒等函数。由于 map_batches()
的默认 batch_size
为 1024 行,此代码仅产生一个非常大的批处理,导致堆内存使用增加到 4 GB。
import ray
# Pretend there are two CPUs.
ray.init(num_cpus=2)
# Force Ray Data to use one task to show the memory issue.
ds = ray.data.range_tensor(1000, shape=(125_000, ), override_num_blocks=1)
# The default batch size is 1024 rows.
ds = ds.map_batches(lambda batch: batch)
print(ds.materialize().stats())
Operator 1 ReadRange->MapBatches(<lambda>): 1 tasks executed, 7 blocks produced in 1.33s
...
* Peak heap memory usage (MiB): 3302.17 min, 4233.51 max, 4100 mean
* Output num rows: 125 min, 125 max, 125 mean, 1000 total
* Output size bytes: 134000536 min, 196000784 max, 142857714 mean, 1000004000 total
...
设置较低的批处理大小可以降低峰值堆内存使用量
import ray
# Pretend there are two CPUs.
ray.init(num_cpus=2)
ds = ray.data.range_tensor(1000, shape=(125_000, ), override_num_blocks=1)
ds = ds.map_batches(lambda batch: batch, batch_size=32)
print(ds.materialize().stats())
Operator 1 ReadRange->MapBatches(<lambda>): 1 tasks executed, 7 blocks produced in 0.51s
...
* Peak heap memory usage (MiB): 587.09 min, 1569.57 max, 1207 mean
* Output num rows: 40 min, 160 max, 142 mean, 1000 total
* Output size bytes: 40000160 min, 160000640 max, 142857714 mean, 1000004000 total
...
改进 Ray Data 中的堆内存使用是一个活跃的开发领域。以下是当前已知的一些堆内存使用可能非常高的情况
读取大型(1 GiB 或更大)二进制文件。
转换单个行很大的 Dataset(100 MiB 或更大)。
在这些情况下,最后的办法是减少并发执行槽的数量。这可以通过自定义资源来实现。例如,使用 ds.map_batches(fn, num_cpus=2)
将 map_batches
任务的执行槽数量减半。
如果这些策略仍然不足,请在 GitHub 上提交一个 Ray Data Issue。
避免对象溢出#
Dataset 的中间和输出块存储在 Ray 的对象存储中。尽管 Ray Data 试图通过流式执行来最小化对象存储使用,工作集仍然可能超出对象存储容量。在这种情况下,Ray 会开始将块溢出到磁盘,这会显著减慢执行速度,甚至导致磁盘空间不足错误。
在某些情况下,溢出是预期的。特别是当整个 Dataset 的大小大于对象存储容量,并且满足以下任一条件时
使用了全对全混洗操作。或者,
调用了
ds.materialize()
。
否则,最好调优您的应用程序以避免溢出。推荐的策略是手动增加读取输出块或修改您的应用程序代码,以确保每个任务读取较小的数据量。
注意
这是一个活跃的开发领域。如果您的 Dataset 导致了溢出且您不知道原因,请在 GitHub 上提交一个 Ray Data Issue。
处理过小的块#
当您的 Dataset 的不同操作符产生不同大小的输出时,您可能会得到非常小的块,这会影响性能,甚至可能因元数据过多而导致崩溃。使用 ds.stats()
检查每个操作符的输出块是否至少为 1 MB,理想情况下为 100 MB。
如果您的块小于此大小,请考虑重新分区成更大的块。有两种方法可以做到这一点
如果您需要控制输出块的确切数量,请使用
ds.repartition(num_partitions)
。请注意,这是一种全对全操作,在执行重新分区之前会将所有块具体化到内存中。如果您不需要控制输出块的确切数量,而只是想生成更大的块,请使用
ds.map_batches(lambda batch: batch, batch_size=batch_size)
并将batch_size
设置为每个块所需的行数。这以流式方式执行,避免了具体化。
使用 ds.map_batches()
时,Ray Data 会合并块,以便每个 map 任务可以处理至少指定数量的行。请注意,选择的 batch_size
是任务输入块大小的下限,但不一定决定任务的最终输出块大小;有关如何确定块大小的更多信息,请参阅有关块内存使用量的部分。
为了说明这些,以下代码使用了两种策略将 10 个各含 1 行的微小块合并成 1 个含 10 行的较大块
import ray
# Pretend there are two CPUs.
ray.init(num_cpus=2)
# 1. Use ds.repartition().
ds = ray.data.range(10, override_num_blocks=10).repartition(1)
print(ds.materialize().stats())
# 2. Use ds.map_batches().
ds = ray.data.range(10, override_num_blocks=10).map_batches(lambda batch: batch, batch_size=10)
print(ds.materialize().stats())
# 1. ds.repartition() output.
Operator 1 ReadRange: 10 tasks executed, 10 blocks produced in 0.33s
...
* Output num rows: 1 min, 1 max, 1 mean, 10 total
...
Operator 2 Repartition: executed in 0.36s
Suboperator 0 RepartitionSplit: 10 tasks executed, 10 blocks produced
...
Suboperator 1 RepartitionReduce: 1 tasks executed, 1 blocks produced
...
* Output num rows: 10 min, 10 max, 10 mean, 10 total
...
# 2. ds.map_batches() output.
Operator 1 ReadRange->MapBatches(<lambda>): 1 tasks executed, 1 blocks produced in 0s
...
* Output num rows: 10 min, 10 max, 10 mean, 10 total
配置执行#
配置资源和局部性#
默认情况下,CPU 和 GPU 限制设置为集群大小,对象存储内存限制保守地设置为总对象存储大小的 1/4,以避免磁盘溢出的可能性。
您可能希望在以下场景中自定义这些限制:- 如果在集群上运行多个并发作业,设置较低的限制可以避免作业之间的资源争用。- 如果您想微调内存限制以最大化性能。- 对于加载到训练作业中的数据,您可能希望将对象存储内存设置为较低的值(例如 2 GB)以限制资源使用。
您可以使用全局 DataContext 配置执行选项。这些选项将应用于在该进程中启动的未来作业
ctx = ray.data.DataContext.get_current()
ctx.execution_options.resource_limits.cpu = 10
ctx.execution_options.resource_limits.gpu = 5
ctx.execution_options.resource_limits.object_store_memory = 10e9
注意
不建议修改 Ray Core 对象存储内存限制,因为这会减少任务执行的可用内存。唯一的例外是如果您使用的机器具有非常大的 RAM(每台 1 TB 或更大);在这种情况下,建议将对象存储设置为总 RAM 的 ~30-40%。
输出局部性(ML 摄取用例)#
ctx.execution_options.locality_with_output = True
将此参数设置为 True 会告诉 Ray Data 倾向于将操作符任务放置在集群中的消费节点上,而不是将它们均匀地分布在整个集群中。如果您知道您直接在消费节点上消费输出数据(例如,用于 ML 训练摄取),此设置会很有用。然而,其他用例可能会因使用此设置而招致性能损失。
可重现性#
确定性执行#
# By default, this is set to False.
ctx.execution_options.preserve_order = True
要启用确定性执行,请将上述设置设为 True。此设置可能会降低性能,但可确保在执行过程中保留块的顺序。此标志默认为 False。