Ray Data 内部原理#
本指南描述了 Ray Data 的实现。目标读者是 Ray Data 的高级用户和开发者。
有关 Ray Data 的更温和介绍,请参阅快速入门。
关键概念#
数据集和块#
数据集#
Dataset
是主要面向用户的 Python API。它表示一个分布式数据集合,并定义了数据加载和处理操作。你通常按以下方式使用此 API
从外部存储或内存数据创建 Ray Dataset。
对数据应用转换。
将输出写入外部存储或将输出提供给训练工作节点。
块#
块是 Ray Data 存储在对象存储中并在网络上传输的基本数据单元。每个块包含不相交的行子集,Ray Data 并行加载和转换这些块。
下图可视化了一个包含三个块的数据集,每个块包含 1000 行。Ray Data 将 Dataset
保留在触发执行的进程上(通常是驱动程序),并将这些块作为对象存储在 Ray 的共享内存 对象存储 中。
块格式#
块是 Arrow 表或 pandas
DataFrame。通常,块是 Arrow 表,除非 Arrow 无法表示你的数据。
块格式不影响 iter_batches()
等 API 返回的数据类型。
块大小限制#
Ray Data 限制块大小,以避免过多的通信开销并防止内存不足错误。小块有利于延迟和更流式的执行,而大块则减少调度器和通信开销。默认范围试图在大多数作业中做出很好的权衡。
Ray Data 尝试将块大小限制在 1 MiB 到 128 MiB 之间。要更改块大小范围,请配置 DataContext
的 target_min_block_size
和 target_max_block_size
属性。
import ray
ctx = ray.data.DataContext.get_current()
ctx.target_min_block_size = 1 * 1024 * 1024
ctx.target_max_block_size = 128 * 1024 * 1024
动态块分割#
如果一个块大于 192 MiB(比目标最大大小多 50%),Ray Data 会动态地将该块分割成更小的块。
要更改 Ray Data 分割块的大小阈值,请配置 MAX_SAFE_BLOCK_SIZE_FACTOR
。默认值为 1.5。
import ray
ray.data.context.MAX_SAFE_BLOCK_SIZE_FACTOR = 1.5
Ray Data 无法分割行。因此,如果你的数据集包含大行(例如,大图像),那么 Ray Data 无法限制块大小。
算子、计划和规划#
算子#
有两种类型的算子:逻辑算子和物理算子。逻辑算子是无状态对象,描述“要做什么”。物理算子是有状态对象,描述“如何做”。逻辑算子的一个例子是 ReadOp
,物理算子的一个例子是 TaskPoolMapOperator
。
计划#
逻辑计划是一系列逻辑算子,物理计划是一系列物理算子。当你调用 ray.data.read_images()
和 ray.data.Dataset.map_batches()
等 API 时,Ray Data 生成一个逻辑计划。执行开始时,规划器生成相应的物理计划。
规划器#
Ray Data 规划器将逻辑算子转换为一个或多个物理算子。例如,规划器将 ReadOp
逻辑算子转换为两个物理算子:InputDataBuffer
和 TaskPoolMapOperator
。ReadOp 逻辑算子仅描述输入数据,而 TaskPoolMapOperator
物理算子实际上启动任务来读取数据。
计划优化#
Ray Data 对逻辑计划和物理计划都应用优化。例如,OperatorFusionRule
将一系列物理 map 算子合并为一个 map 算子。这可以防止 map 算子之间不必要的序列化。
要添加自定义优化规则,请实现一个扩展 Rule
的类,并配置 DEFAULT_LOGICAL_RULES
或 DEFAULT_PHYSICAL_RULES
。
import ray
from ray.data._internal.logical.interfaces import Rule
class CustomRule(Rule):
def apply(self, plan):
...
ray.data._internal.logical.optimizers.DEFAULT_LOGICAL_RULES.append(CustomRule)
物理算子的类型#
物理算子接收一个块引用流,并输出另一个块引用流。一些物理算子启动 Ray Tasks 和 Actors 来转换块,而另一些则只操作引用。
MapOperator
是最常见的算子。所有读、转换和写操作都通过它实现。为了处理数据,MapOperator
实现使用 Ray Tasks 或 Ray Actors。
非 map 算子包括 OutputSplitter
和 LimitOperator
。这两个算子操作数据的引用,但不启动任务或修改底层数据。
执行#
执行器#
执行器调度任务并在物理算子之间移动数据。
执行器和算子位于数据集执行开始的进程上。对于批量推理作业,此进程通常是驱动程序。对于训练作业,执行器在名为 SplitCoordinator
的特殊 Actor 上运行,该 Actor 处理 streaming_split()
。
算子启动的任务和 Actor 会在集群中调度,输出存储在 Ray 的分布式对象存储中。执行器操作对象的引用,但不会将底层数据本身获取到执行器。
输出队列#
每个物理算子都有一个关联的输出队列。当物理算子产生输出时,执行器会将输出移动到该算子的输出队列中。
流式执行#
与批量同步执行不同,Ray Data 的流式执行不会等待一个算子完成才开始下一个。每个算子接收并输出一个块流。这种方法允许你处理超出集群内存大小的数据集。
调度循环#
执行器运行一个循环。每一步是这样的
等待正在运行的任务和 Actor 产生新的输出。
将新输出移动到相应的算子输出队列中。
选择一些算子并为其分配新的输入。这些算子通过启动新任务或操作元数据来处理新输入。
选择最优的算子来分配输入是 Ray Data 中最重要的决策之一。这个决策对 Ray Data 作业的性能、稳定性和可伸缩性至关重要。如果算子满足以下条件,执行器可以调度该算子
算子有输入。
有足够的可用资源。
算子没有发生背压。
如果存在多个可行的算子,执行器会选择输出队列最小的算子。
调度#
Ray Data 使用 Ray Core 进行执行。以下是 Ray Data 调度策略 的摘要
SPREAD
调度策略 确保数据块和 map 任务在整个集群中均匀分布。Dataset 任务默认忽略放置组,参见 Ray Data 和放置组。
如果总参数大小小于 50 MB,Map 操作使用
SPREAD
调度策略;否则,它们使用DEFAULT
调度策略。读操作使用
SPREAD
调度策略。所有其他操作,例如 split、sort 和 shuffle,使用
DEFAULT
调度策略。
Ray Data 和放置组#
默认情况下,Ray Data 将其任务和 Actor 配置为使用集群默认调度策略("DEFAULT
")。你可以在此处检查此配置变量:ray.data.DataContext.get_current().scheduling_strategy
。此调度策略将这些 Task 和 Actor 调度到任何当前放置组之外。要专门为 Ray Data 使用当前放置组资源,请设置 ray.data.DataContext.get_current().scheduling_strategy = None
。
仅在高级用例中考虑此覆盖以提高性能可预测性。一般建议是让 Ray Data 在放置组之外运行。
Ray Data 和 Tune#
当将 Ray Data 与 Ray Tune 结合使用时,重要的是确保有足够的空闲 CPU 供 Ray Data 运行。默认情况下,Tune 会尝试充分利用集群 CPU。这可能会阻止 Ray Data 调度任务,从而降低性能或导致工作负载挂起。
为了确保 Ray Data 执行始终有可用的 CPU 资源,请使用 max_concurrent_trials
Tune 选项限制并发 Tune 试验的数量。
import ray
from ray import tune
# This workload will use spare cluster resources for execution.
def objective(*args):
ray.data.range(10).show()
# Create a cluster with 4 CPU slots available.
ray.init(num_cpus=4)
# By setting `max_concurrent_trials=3`, this ensures the cluster will always
# have a sparse CPU for Dataset. Try setting `max_concurrent_trials=4` here,
# and notice that the experiment will appear to hang.
tuner = tune.Tuner(
tune.with_resources(objective, {"cpu": 1}),
tune_config=tune.TuneConfig(
num_samples=1,
max_concurrent_trials=3
)
)
tuner.fit()
内存管理#
本节描述了 Ray Data 如何管理执行内存和对象存储内存。
执行内存#
在执行期间,任务可以读取多个输入块,并写入多个输出块。输入和输出块会消耗工作节点堆内存以及通过 Ray 对象存储消耗的共享内存。Ray 通过溢出到磁盘来限制对象存储内存使用,但过多的工作节点堆内存使用可能会导致内存不足错误。
有关调整内存使用和防止内存不足错误的更多信息,请参阅性能指南。
对象存储内存#
Ray Data 使用 Ray 对象存储来存储数据块,这意味着它继承了 Ray 对象存储的内存管理功能。本节讨论相关功能
对象溢出:由于 Ray Data 使用 Ray 对象存储来存储数据块,因此任何无法放入对象存储内存的块都会自动溢出到磁盘。当需要时,下游计算任务会自动重新加载这些对象
局部性调度:Ray 优先在已经拥有对象本地副本的节点上调度计算任务,从而减少了在集群节点之间传输对象的需要。
引用计数:只要有任何 Dataset 引用 Dataset 块,它们就会通过对象存储引用计数保持存活。要释放内存,请删除对 Dataset 对象的任何 Python 引用。