Ray Data 内部原理#

本指南描述了 Ray Data 的实现。目标读者是 Ray Data 的高级用户和开发者。

有关 Ray Data 的更温和介绍,请参阅快速入门

关键概念#

数据集和块#

数据集#

Dataset 是主要面向用户的 Python API。它表示一个分布式数据集合,并定义了数据加载和处理操作。你通常按以下方式使用此 API

  1. 从外部存储或内存数据创建 Ray Dataset。

  2. 对数据应用转换。

  3. 将输出写入外部存储或将输出提供给训练工作节点。

#

块是 Ray Data 存储在对象存储中并在网络上传输的基本数据单元。每个块包含不相交的行子集,Ray Data 并行加载和转换这些块。

下图可视化了一个包含三个块的数据集,每个块包含 1000 行。Ray Data 将 Dataset 保留在触发执行的进程上(通常是驱动程序),并将这些块作为对象存储在 Ray 的共享内存 对象存储 中。

../_images/dataset-arch.svg

块格式#

块是 Arrow 表或 pandas DataFrame。通常,块是 Arrow 表,除非 Arrow 无法表示你的数据。

块格式不影响 iter_batches() 等 API 返回的数据类型。

块大小限制#

Ray Data 限制块大小,以避免过多的通信开销并防止内存不足错误。小块有利于延迟和更流式的执行,而大块则减少调度器和通信开销。默认范围试图在大多数作业中做出很好的权衡。

Ray Data 尝试将块大小限制在 1 MiB 到 128 MiB 之间。要更改块大小范围,请配置 DataContexttarget_min_block_sizetarget_max_block_size 属性。

import ray

ctx = ray.data.DataContext.get_current()
ctx.target_min_block_size = 1 * 1024 * 1024
ctx.target_max_block_size = 128 * 1024 * 1024

动态块分割#

如果一个块大于 192 MiB(比目标最大大小多 50%),Ray Data 会动态地将该块分割成更小的块。

要更改 Ray Data 分割块的大小阈值,请配置 MAX_SAFE_BLOCK_SIZE_FACTOR。默认值为 1.5。

import ray

ray.data.context.MAX_SAFE_BLOCK_SIZE_FACTOR = 1.5

Ray Data 无法分割行。因此,如果你的数据集包含大行(例如,大图像),那么 Ray Data 无法限制块大小。

算子、计划和规划#

算子#

有两种类型的算子:逻辑算子和物理算子。逻辑算子是无状态对象,描述“要做什么”。物理算子是有状态对象,描述“如何做”。逻辑算子的一个例子是 ReadOp,物理算子的一个例子是 TaskPoolMapOperator

计划#

逻辑计划是一系列逻辑算子,物理计划是一系列物理算子。当你调用 ray.data.read_images()ray.data.Dataset.map_batches() 等 API 时,Ray Data 生成一个逻辑计划。执行开始时,规划器生成相应的物理计划。

规划器#

Ray Data 规划器将逻辑算子转换为一个或多个物理算子。例如,规划器将 ReadOp 逻辑算子转换为两个物理算子:InputDataBufferTaskPoolMapOperator。ReadOp 逻辑算子仅描述输入数据,而 TaskPoolMapOperator 物理算子实际上启动任务来读取数据。

计划优化#

Ray Data 对逻辑计划和物理计划都应用优化。例如,OperatorFusionRule 将一系列物理 map 算子合并为一个 map 算子。这可以防止 map 算子之间不必要的序列化。

要添加自定义优化规则,请实现一个扩展 Rule 的类,并配置 DEFAULT_LOGICAL_RULESDEFAULT_PHYSICAL_RULES

import ray
from ray.data._internal.logical.interfaces import Rule

class CustomRule(Rule):
    def apply(self, plan):
        ...

ray.data._internal.logical.optimizers.DEFAULT_LOGICAL_RULES.append(CustomRule)

物理算子的类型#

物理算子接收一个块引用流,并输出另一个块引用流。一些物理算子启动 Ray Tasks 和 Actors 来转换块,而另一些则只操作引用。

MapOperator 是最常见的算子。所有读、转换和写操作都通过它实现。为了处理数据,MapOperator 实现使用 Ray Tasks 或 Ray Actors。

非 map 算子包括 OutputSplitterLimitOperator。这两个算子操作数据的引用,但不启动任务或修改底层数据。

执行#

执行器#

执行器调度任务并在物理算子之间移动数据。

执行器和算子位于数据集执行开始的进程上。对于批量推理作业,此进程通常是驱动程序。对于训练作业,执行器在名为 SplitCoordinator 的特殊 Actor 上运行,该 Actor 处理 streaming_split()

算子启动的任务和 Actor 会在集群中调度,输出存储在 Ray 的分布式对象存储中。执行器操作对象的引用,但不会将底层数据本身获取到执行器。

输出队列#

每个物理算子都有一个关联的输出队列。当物理算子产生输出时,执行器会将输出移动到该算子的输出队列中。

流式执行#

与批量同步执行不同,Ray Data 的流式执行不会等待一个算子完成才开始下一个。每个算子接收并输出一个块流。这种方法允许你处理超出集群内存大小的数据集。

调度循环#

执行器运行一个循环。每一步是这样的

  1. 等待正在运行的任务和 Actor 产生新的输出。

  2. 将新输出移动到相应的算子输出队列中。

  3. 选择一些算子并为其分配新的输入。这些算子通过启动新任务或操作元数据来处理新输入。

选择最优的算子来分配输入是 Ray Data 中最重要的决策之一。这个决策对 Ray Data 作业的性能、稳定性和可伸缩性至关重要。如果算子满足以下条件,执行器可以调度该算子

  • 算子有输入。

  • 有足够的可用资源。

  • 算子没有发生背压。

如果存在多个可行的算子,执行器会选择输出队列最小的算子。

调度#

Ray Data 使用 Ray Core 进行执行。以下是 Ray Data 调度策略 的摘要

  • SPREAD 调度策略 确保数据块和 map 任务在整个集群中均匀分布。

  • Dataset 任务默认忽略放置组,参见 Ray Data 和放置组

  • 如果总参数大小小于 50 MB,Map 操作使用 SPREAD 调度策略;否则,它们使用 DEFAULT 调度策略。

  • 读操作使用 SPREAD 调度策略。

  • 所有其他操作,例如 split、sort 和 shuffle,使用 DEFAULT 调度策略。

Ray Data 和放置组#

默认情况下,Ray Data 将其任务和 Actor 配置为使用集群默认调度策略("DEFAULT")。你可以在此处检查此配置变量:ray.data.DataContext.get_current().scheduling_strategy。此调度策略将这些 Task 和 Actor 调度到任何当前放置组之外。要专门为 Ray Data 使用当前放置组资源,请设置 ray.data.DataContext.get_current().scheduling_strategy = None

仅在高级用例中考虑此覆盖以提高性能可预测性。一般建议是让 Ray Data 在放置组之外运行。

Ray Data 和 Tune#

当将 Ray Data 与 Ray Tune 结合使用时,重要的是确保有足够的空闲 CPU 供 Ray Data 运行。默认情况下,Tune 会尝试充分利用集群 CPU。这可能会阻止 Ray Data 调度任务,从而降低性能或导致工作负载挂起。

为了确保 Ray Data 执行始终有可用的 CPU 资源,请使用 max_concurrent_trials Tune 选项限制并发 Tune 试验的数量。

import ray
from ray import tune

# This workload will use spare cluster resources for execution.
def objective(*args):
    ray.data.range(10).show()

# Create a cluster with 4 CPU slots available.
ray.init(num_cpus=4)

# By setting `max_concurrent_trials=3`, this ensures the cluster will always
# have a sparse CPU for Dataset. Try setting `max_concurrent_trials=4` here,
# and notice that the experiment will appear to hang.
tuner = tune.Tuner(
    tune.with_resources(objective, {"cpu": 1}),
    tune_config=tune.TuneConfig(
        num_samples=1,
        max_concurrent_trials=3
    )
)
tuner.fit()

内存管理#

本节描述了 Ray Data 如何管理执行内存和对象存储内存。

执行内存#

在执行期间,任务可以读取多个输入块,并写入多个输出块。输入和输出块会消耗工作节点堆内存以及通过 Ray 对象存储消耗的共享内存。Ray 通过溢出到磁盘来限制对象存储内存使用,但过多的工作节点堆内存使用可能会导致内存不足错误。

有关调整内存使用和防止内存不足错误的更多信息,请参阅性能指南

对象存储内存#

Ray Data 使用 Ray 对象存储来存储数据块,这意味着它继承了 Ray 对象存储的内存管理功能。本节讨论相关功能

  • 对象溢出:由于 Ray Data 使用 Ray 对象存储来存储数据块,因此任何无法放入对象存储内存的块都会自动溢出到磁盘。当需要时,下游计算任务会自动重新加载这些对象

  • 局部性调度:Ray 优先在已经拥有对象本地副本的节点上调度计算任务,从而减少了在集群节点之间传输对象的需要。

  • 引用计数:只要有任何 Dataset 引用 Dataset 块,它们就会通过对象存储引用计数保持存活。要释放内存,请删除对 Dataset 对象的任何 Python 引用。