Ray Data 内部原理#
本指南描述了 Ray Data 的实现。目标受众是高级用户和 Ray Data 开发者。
有关 Ray Data 的更简明介绍,请参阅 快速入门。
核心概念#
数据集和块#
数据集#
Dataset 是主要的面向用户的 Python API。它代表一个分布式数据集合,并定义数据加载和处理操作。您通常以这种方式使用 API:
从外部存储或内存数据创建 Ray 数据集。
对数据应用转换。
将输出写入外部存储或将输出馈送给训练 worker。
块#
块是 Ray Data 存储在对象存储和通过网络传输的基本数据批量单位。每个块包含行的不相交子集,Ray Data 并行加载和转换这些块。
下图可视化了一个包含三个块的数据集,每个块包含 1000 行。Ray Data 将 Dataset 保存在触发执行的进程(通常是驱动程序)中,并将块存储为 Ray 的共享内存 对象存储 中的对象。
块格式#
块是 Arrow 表或 pandas DataFrame。通常,块是 Arrow 表,除非 Arrow 无法表示您的数据。
块格式不会影响 iter_batches() 等 API 返回的数据类型。
块大小限制#
Ray Data 会限制块大小,以避免过度的通信开销并防止内存不足错误。小块有利于低延迟和流式执行,而大块可以减少调度器和通信开销。默认范围会尝试为大多数作业做出良好的权衡。
Ray Data 尝试将块大小限制在 1 MiB 到 128 MiB 之间。要更改块大小范围,请配置 DataContext 的 target_min_block_size 和 target_max_block_size 属性。
import ray
ctx = ray.data.DataContext.get_current()
ctx.target_min_block_size = 1 * 1024 * 1024
ctx.target_max_block_size = 128 * 1024 * 1024
动态块拆分#
如果一个块大于 192 MiB(目标最大尺寸的 50%),Ray Data 会动态地将该块拆分成更小的块。
要更改 Ray Data 拆分块的大小,请配置 MAX_SAFE_BLOCK_SIZE_FACTOR。默认值为 1.5。
import ray
ray.data.context.MAX_SAFE_BLOCK_SIZE_FACTOR = 1.5
Ray Data 无法拆分行。因此,如果您的数据集包含大行(例如,大图像),则 Ray Data 无法限制块的大小。
Shuffle 算法#
在数据处理中,shuffle 指的是重新分发单个数据集分区(在 Ray Data 中称为 块)的过程。
Ray Data 实现两种主要的 shuffle 算法:
哈希 Shuffle#
注意
Hash-shuffling 在 Ray 2.46 中可用。
Hash-shuffling 是一种经典的基于哈希分区的 shuffle,其中:
分区阶段:每个块中的行根据键列中的值进行哈希分区,分成指定数量的分区,遵循简单的残差公式
hash(key-values) % N(用于哈希表和几乎所有地方)。推送阶段:然后将来自单个块的分区分片推送到处理相应分区的相应聚合 actor(称为
HashShuffleAggregator)。Reduce 阶段:聚合器将接收到的单个分区的分片重新组合成块,可选地在生成结果块之前应用其他转换。
Hash-shuffling 特别适用于需要基于键的确定性分区(如 joins、group-by 操作和基于键的重新分区)的操作,通过确保具有相同键值的行被放置到同一分区。
注意
要在您的聚合和重新分区操作中使用 hash-shuffling,您目前需要在创建 Dataset 之前指定 ray.data.DataContext.get_current().shuffle_strategy = ShuffleStrategy.HASH_SHUFFLE。
范围分区 Shuffle#
基于范围分区的 shuffle 也是一种经典算法,它基于将数据集分割成目标数量的范围,这些范围由近似全序(已排序)数据集的真实范围的边界确定。
采样阶段:对每个输入块随机采样(10)行。样本合并成一个数据集,然后对该数据集进行排序并分割成目标数量的分区,定义近似的范围边界。
分区阶段:每个块根据上一步派生的范围边界进行排序并分割成分区。
Reduce 阶段:同一范围内的各个分区然后重新组合以生成结果块。
注意
Range-partitioning shuffle 是默认的 shuffling 策略。要显式设置它,请在创建 Dataset 之前指定 ray.data.DataContext.get_current().shuffle_strategy = ShuffleStrategy.SORT_SHUFFLE_PULL_BASED。
算子、计划和规划#
算子#
有两种类型的算子:逻辑算子和物理算子。逻辑算子是描述“做什么”的无状态对象。物理算子是描述“如何做”的有状态对象。逻辑算子的一个例子是 ReadOp,物理算子的一個例子是 TaskPoolMapOperator。
计划#
逻辑计划是一系列逻辑算子,而物理计划是一系列物理算子。当您调用 ray.data.read_images() 和 ray.data.Dataset.map_batches() 等 API 时,Ray Data 会生成一个逻辑计划。执行开始时,规划器会生成相应的物理计划。
规划器#
Ray Data 规划器将逻辑算子转换为一个或多个物理算子。例如,规划器将 ReadOp 逻辑算子转换为两个物理算子:InputDataBuffer 和 TaskPoolMapOperator。而 ReadOp 逻辑算子仅描述输入数据,TaskPoolMapOperator 物理算子实际启动任务来读取数据。
计划优化#
Ray Data 对逻辑计划和物理计划都应用优化。例如,OperatorFusionRule 将一系列物理 map 算子合并成一个 map 算子。这可以防止 map 算子之间不必要的序列化。
要添加自定义优化规则,请实现一个扩展 Rule 的类,并配置 DEFAULT_LOGICAL_RULES 或 DEFAULT_PHYSICAL_RULES。
import ray
from ray.data._internal.logical.interfaces import Rule
from ray.data._internal.logical.optimizers import get_logical_ruleset
class CustomRule(Rule):
def apply(self, plan):
...
logical_ruleset = get_logical_ruleset()
logical_ruleset.add(CustomRule)
物理算子类型#
物理算子接收块引用的流并输出另一个块引用的流。一些物理算子会启动 Ray 任务和 Actor 来转换块,而另一些只操作引用。
MapOperator 是最常见的算子。所有读取、转换和写入操作都用它实现。为了处理数据,MapOperator 实现会使用 Ray 任务或 Ray Actor。
非 map 算子包括 OutputSplitter 和 LimitOperator。这两个算子操作数据引用,但不会启动任务或修改底层数据。
执行#
执行器#
执行器调度任务并将数据在物理算子之间移动。
执行器和算子位于数据集执行开始的进程中。对于批量推理作业,此进程通常是驱动程序。对于训练作业,执行器运行在一个名为 SplitCoordinator 的特殊 actor 上,该 actor 处理 streaming_split()。
由算子启动的任务和 Actor 会在集群中调度,输出存储在 Ray 的分布式对象存储中。执行器操作对象引用,本身不会将底层数据提取到执行器。
输出队列#
每个物理算子都有一个关联的输出队列。当一个物理算子产生输出时,执行器会将输出移至该算子的输出队列。
流式执行#
与批量同步执行相比,Ray Data 的流式执行不会等待一个算子完成才开始下一个。每个算子接收和输出一个块流。这种方法允许您处理过大而无法放入集群内存的数据集。
调度循环#
执行器运行一个循环。每一步如下:
等待运行的任务和 Actor 产生新的输出。
将新输出移至相应的算子输出队列。
选择一些算子并为它们分配新输入。这些算子通过启动新任务或操作元数据来处理新输入。
选择最佳算子来分配输入是 Ray Data 中最重要的决定之一。此决定对 Ray Data 作业的性能、稳定性和可伸缩性至关重要。如果算子满足以下条件,执行器就可以调度该算子:
算子有输入。
有足够的可用资源。
算子未处于反压状态。
如果存在多个可行的算子,执行器会选择具有最小输出队列的算子。
Scheduling#
Ray Data 使用 Ray Core 进行执行。下面是 Ray Data 的 调度策略 摘要:
SPREAD调度策略确保数据块和 map 任务在集群中均匀分布。默认情况下,数据集任务会忽略放置组,请参阅 Ray Data 和放置组。
如果总参数大小小于 50 MB,Map 操作会使用 `SPREAD` 调度策略;否则,它们会使用 `DEFAULT` 调度策略。
Read 操作会使用 `SPREAD` 调度策略。
所有其他操作,如 split、sort 和 shuffle,都使用 `DEFAULT` 调度策略。
Ray Data 和放置组#
默认情况下,Ray Data 会将其任务和 Actor 配置为使用集群默认调度策略(`"DEFAULT"`)。您可以在此处检查此配置变量:ray.data.DataContext.get_current().scheduling_strategy。此调度策略会在任何现有放置组之外调度这些任务和 Actor。要将当前放置组资源专门用于 Ray Data,请将 ray.data.DataContext.get_current().scheduling_strategy = None 设置为。
仅在高级用例中考虑此覆盖以提高性能可预测性。一般建议是让 Ray Data 在放置组之外运行。
Ray Data 和 Tune#
将 Ray Data 与 Ray Tune 结合使用时,确保 Ray Data 有足够的可用 CPU 至关重要。默认情况下,Tune 会尝试充分利用集群 CPU。这会阻止 Ray Data 调度任务,降低性能或导致工作负载挂起。
为确保始终有 CPU 资源可用于 Ray Data 执行,请使用 `max_concurrent_trials` Tune 选项限制并发 Tune 试验的数量。
import ray
from ray import tune
# This workload will use spare cluster resources for execution.
def objective(*args):
ray.data.range(10).show()
# Create a cluster with 4 CPU slots available.
ray.init(num_cpus=4)
# By setting `max_concurrent_trials=3`, this ensures the cluster will always
# have a sparse CPU for Dataset. Try setting `max_concurrent_trials=4` here,
# and notice that the experiment will appear to hang.
tuner = tune.Tuner(
tune.with_resources(objective, {"cpu": 1}),
tune_config=tune.TuneConfig(
num_samples=1,
max_concurrent_trials=3
)
)
tuner.fit()
内存管理#
本节介绍 Ray Data 如何管理执行和对象存储内存。
执行内存#
执行期间,一个任务可以读取多个输入块,并写入多个输出块。输入和输出块会消耗 worker 堆内存以及通过 Ray 对象存储的共享内存。Ray 通过溢出到磁盘来限制对象存储内存使用,但过度的 worker 堆内存使用会导致内存不足错误。
有关调整内存使用和防止内存不足错误的更多信息,请参阅 性能指南。
对象存储内存#
Ray Data 使用 Ray 对象存储来存储数据块,这意味着它继承了 Ray 对象存储的内存管理功能。本节讨论相关功能:
对象溢出:由于 Ray Data 使用 Ray 对象存储来存储数据块,因此无法放入对象存储内存的任何块都会自动溢出到磁盘。当下游计算任务需要时,对象会自动重新加载。
本地调度:Ray 优先在已经拥有对象本地副本的节点上调度计算任务,从而减少在集群节点之间传输对象的需要。
引用计数:只要有任何 Dataset 引用 Dataset 块,这些块就会被对象存储引用计数器保留。要释放内存,请删除对 Dataset 对象的任何 Python 引用。