关键概念#

数据集和块#

Ray Data 中有两个主要概念

  • 数据集

Dataset 是面向用户的主要 Python API。它表示分布式数据集合,并定义数据加载和处理操作。用户通常通过以下方式使用该 API:

  1. 从外部存储或内存数据创建 Dataset

  2. 对数据应用转换。

  3. 将输出写入外部存储或将输出馈送给训练 worker。

Dataset API 是延迟执行的,这意味着操作不会在您具体化或消费数据集之前执行,例如 show()。这使得 Ray Data 能够优化执行计划,并以管道式流式方式执行操作。

每个 Dataset 都由 blocks 组成。一个 block 是数据集中连续的行子集,它们分布在集群中并独立并行处理。

下图展示了一个包含三个块的数据集,每个块包含 1000 行。Ray Data 将 Dataset 保存在触发执行的进程上(通常是程序的入口点,称为Driver),并将块作为对象存储在 Ray 的共享内存对象存储中。在内部,Ray Data 使用 Pandas DataFrames 或 Arrow tables 表示块。

../_images/dataset-arch-with-blocks.svg

操作符和计划#

Ray Data 使用两阶段规划过程来高效执行操作。当您使用 Dataset API 编写程序时,Ray Data 首先构建一个逻辑计划——对要执行的操作的高级描述。当执行开始时,它会将其转换为一个物理计划,该计划精确指定了如何执行这些操作。

此图说明了完整的规划过程

../_images/get_execution_plan.svg

这些计划的构建块是操作符

  • 逻辑计划由描述什么操作的逻辑操作符组成。例如,ReadOp 指定要读取什么数据。

  • 物理计划由描述如何执行操作的物理操作符组成。例如,TaskPoolMapOperator 启动 Ray 任务来实际读取数据。

这是一个简单的 Ray Data 如何构建逻辑计划的示例。当您将操作链接在一起时,Ray Data 会在幕后构建逻辑计划。

dataset = ray.data.range(100)
dataset = dataset.add_column("test", lambda x: x["id"] + 1)
dataset = dataset.select_columns("test")

您可以通过打印数据集来检查生成的逻辑计划。

Project
+- MapBatches(add_column)
   +- Dataset(schema={...})

当执行开始时,Ray Data 会优化逻辑计划,然后将其转换为物理计划——一系列实现实际数据转换的操作符。在此转换过程中

  1. 一个逻辑操作符可能变成多个物理操作符。例如,ReadOp 同时变成 InputDataBufferTaskPoolMapOperator

  2. 逻辑计划和物理计划都经过优化过程。例如,OperatorFusionRule 合并 map 操作符以减少序列化开销。

物理操作符的工作原理如下

  • 接收块引用的流

  • 执行其操作(使用 Ray Tasks/Actors 转换数据或操纵引用)

  • 输出另一个块引用的流

有关 Ray Tasks 和 Actors 的更多详细信息,请参阅Ray Core 概念

注意

数据集的执行计划仅在您通过 show() 等操作具体化或消费数据集时运行。

流式执行模型#

Ray Data 使用流式执行模型高效处理大型数据集。

Ray Data 不会将整个数据集一次性具体化到内存中,而是可以通过操作管道以流式方式处理数据。

这对于推理和训练工作负载非常有用,因为数据集可能太大而无法完全放入内存,并且工作负载不需要将整个数据集一次性加载到内存中。

以下是流式执行模型如何工作的示例。下面的代码创建一个包含 1K 行的数据集,应用 map 和 filter 转换,然后调用 show 操作来触发管道。

import ray

# Create a dataset with 1K rows
ds = ray.data.read_csv("s3://anonymous@air-example-data/iris.csv")

# Define a pipeline of operations
ds = ds.map(lambda x: {"target1": x["target"] * 2})
ds = ds.map(lambda x: {"target2": x["target1"] * 2})
ds = ds.map(lambda x: {"target3": x["target2"] * 2})
ds = ds.filter(lambda x: x["target3"] % 4 == 0)

# Data starts flowing when you call a method like show()
ds.show(5)

这将创建如下所示的逻辑计划

Filter(<lambda>)
+- Map(<lambda>)
   +- Map(<lambda>)
      +- Map(<lambda>)
         +- Dataset(schema={...})

流式拓扑结构如下所示

../_images/streaming-topology.svg

在流式执行模型中,操作符以管道方式连接,每个操作符的输出队列直接馈送到下一个下游操作符的输入队列。这创建了数据在执行计划中高效流动。

流式执行模型为数据处理提供了显著优势。

特别是,管道架构支持多个阶段并发执行,提高了整体性能和资源利用率。例如,如果 map 操作符需要 GPU 资源,流式执行模型可以与 filter 操作符(可能在 CPU 上运行)并发执行 map 操作符,从而在管道的整个持续时间内有效利用 GPU。

总而言之,Ray Data 的流式执行模型可以高效地处理比可用内存大得多的数据集,同时通过集群并行执行保持高性能。

注意

诸如 ds.sort()ds.groupby() 等操作需要具体化数据,这可能会影响超大型数据集的内存使用。

您可以在这篇博客文章中详细了解流式执行模型。