关键概念#

数据集和块#

Ray Data 中有两个主要概念:

  • 数据集

Dataset 是用户主要使用的 Python API。它代表一个分布式数据集,并定义了数据加载和处理操作。用户通常通过以下方式使用 API:

  1. 从外部存储或内存数据创建 Dataset

  2. 对数据应用转换。

  3. 将输出写入外部存储或将输出馈送到训练工作节点。

Dataset API 是惰性的,这意味着操作直到您具体化或消费数据集时才执行,例如调用 show()。这允许 Ray Data 优化执行计划并以流水线化的流式方式执行操作。

Block(块)是一组行,代表数据集的单个分区。块,作为由列式格式(如 Arrow)表示的行集合,

是 Ray Data 中数据处理的基本单元。

  1. 每个数据集都被划分为若干个块,然后

  2. 整个数据集的处理在块级别进行分布式和并行化(块并行处理,并且在大多数情况下是独立的)。

块是每个 Ray Data 数据集进行分区并存储在对象存储中的基本数据单元。数据处理在块级别并行化。

下图可视化了一个包含三个块的数据集,每个块包含 1000 行。Ray Data 将 Dataset 保存在触发执行的进程(通常是程序的入口点,称为 driver)上,并将块作为对象存储在 Ray 的共享内存 对象存储 中。内部,Ray Data 可以原生处理 Pandas DataFrame 或 PyArrow Table 形式的块。

../_images/dataset-arch-with-blocks.svg

运算符和计划#

Ray Data 使用一个两阶段的规划过程来高效地执行操作。当您使用 Dataset API 编写程序时,Ray Data 首先构建一个逻辑计划——一个描述要执行的操作的高层表示。当执行开始时,它将此逻辑计划转换为一个物理计划,该计划精确地指定如何执行这些操作。

此图说明了完整的规划过程。

../_images/get_execution_plan.svg

这些计划的构建块是运算符。

  • 逻辑计划由逻辑运算符组成,它们描述要执行的操作。例如,ReadOp 指定要读取哪些数据。

  • 物理计划由物理运算符组成,它们描述如何执行操作。例如,TaskPoolMapOperator 启动 Ray 任务来实际读取数据。

以下是一个 Ray Data 如何构建逻辑计划的简单示例。当您将操作链接在一起时,Ray Data 会在后台构建逻辑计划。

dataset = ray.data.range(100)
dataset = dataset.add_column("test", lambda x: x["id"] + 1)
dataset = dataset.select_columns("test")

您可以通过打印数据集来检查生成的逻辑计划。

Project
+- MapBatches(add_column)
   +- Dataset(schema={...})

当执行开始时,Ray Data 会优化逻辑计划,然后将其转换为物理计划——一系列实现实际数据转换的运算符。在此转换过程中,

  1. 一个逻辑运算符可能会变成多个物理运算符。例如,ReadOp 会变成 InputDataBufferTaskPoolMapOperator

  2. 逻辑计划和物理计划都会经过优化过程。例如,OperatorFusionRule 会合并 map 运算符以减少序列化开销。

物理运算符的工作方式是:

  • 接收一系列块引用

  • 执行其操作(通过 Ray 任务/ Actor 转换数据或操作引用)

  • 输出另一系列块引用。

有关 Ray 任务和 Actor 的更多详细信息,请参阅 Ray 核心概念

注意

数据集的执行计划仅在您通过 show() 等操作具体化或消费数据集时运行。

流式执行模型#

Ray Data 使用流式执行模型来高效地处理大型数据集。

Ray Data 不是一次性将整个数据集具体化到内存中,而是可以通过一系列操作以流式方式处理数据。

这对于数据集可能过大而无法放入内存,并且工作负载不需要整个数据集同时都在内存中的推理和训练工作负载非常有用。

以下是一个流式执行模型如何工作的示例。下面的代码创建了一个包含 1K 行的数据集,应用了 map 和 filter 转换,然后调用 show 操作来触发流水线。

import ray

# Create a dataset with 1K rows
ds = ray.data.read_csv("s3://anonymous@air-example-data/iris.csv")

# Define a pipeline of operations
ds = ds.map(lambda x: {"target1": x["target"] * 2})
ds = ds.map(lambda x: {"target2": x["target1"] * 2})
ds = ds.map(lambda x: {"target3": x["target2"] * 2})
ds = ds.filter(lambda x: x["target3"] % 4 == 0)

# Data starts flowing when you call a method like show()
ds.show(5)

这会创建一个如下所示的逻辑计划。

Filter(<lambda>)
+- Map(<lambda>)
   +- Map(<lambda>)
      +- Map(<lambda>)
         +- Dataset(schema={...})

流式拓扑图如下所示。

../_images/streaming-topology.svg

在流式执行模型中,运算符被连接成一个流水线,每个运算符的输出队列直接馈送到下一个下游运算符的输入队列。这在执行计划中创建了一个高效的数据流。

流式执行模型为数据处理提供了显著的优势。

特别是,流水线架构允许多个阶段并发执行,从而提高整体性能和资源利用率。例如,如果 map 运算符需要 GPU 资源,流式执行模型可以与 filter 运算符(可能在 CPU 上运行)并发执行 map 运算符,从而在流水线的整个持续时间内有效地利用 GPU。

总而言之,Ray Data 的流式执行模型可以高效地处理远大于可用内存的数据集,同时通过集群的并行执行保持高性能。

注意

ds.sort()ds.groupby() 这样的操作需要具体化数据,这可能会影响非常大数据集的内存使用。

您可以在这篇 博客文章 中阅读有关流式执行模型的更多信息。