关键概念#
数据集和块#
Ray Data 中有两个主要概念
数据集
块
Dataset
是面向用户的主要 Python API。它表示分布式数据集合,并定义数据加载和处理操作。用户通常通过以下方式使用该 API:
从外部存储或内存数据创建
Dataset
。对数据应用转换。
将输出写入外部存储或将输出馈送给训练 worker。
Dataset API 是延迟执行的,这意味着操作不会在您具体化或消费数据集之前执行,例如 show()
。这使得 Ray Data 能够优化执行计划,并以管道式流式方式执行操作。
每个 Dataset 都由 blocks 组成。一个 block 是数据集中连续的行子集,它们分布在集群中并独立并行处理。
下图展示了一个包含三个块的数据集,每个块包含 1000 行。Ray Data 将 Dataset
保存在触发执行的进程上(通常是程序的入口点,称为Driver),并将块作为对象存储在 Ray 的共享内存对象存储中。在内部,Ray Data 使用 Pandas DataFrames 或 Arrow tables 表示块。
操作符和计划#
Ray Data 使用两阶段规划过程来高效执行操作。当您使用 Dataset API 编写程序时,Ray Data 首先构建一个逻辑计划——对要执行的操作的高级描述。当执行开始时,它会将其转换为一个物理计划,该计划精确指定了如何执行这些操作。
此图说明了完整的规划过程
这些计划的构建块是操作符
逻辑计划由描述什么操作的逻辑操作符组成。例如,
ReadOp
指定要读取什么数据。物理计划由描述如何执行操作的物理操作符组成。例如,
TaskPoolMapOperator
启动 Ray 任务来实际读取数据。
这是一个简单的 Ray Data 如何构建逻辑计划的示例。当您将操作链接在一起时,Ray Data 会在幕后构建逻辑计划。
dataset = ray.data.range(100)
dataset = dataset.add_column("test", lambda x: x["id"] + 1)
dataset = dataset.select_columns("test")
您可以通过打印数据集来检查生成的逻辑计划。
Project
+- MapBatches(add_column)
+- Dataset(schema={...})
当执行开始时,Ray Data 会优化逻辑计划,然后将其转换为物理计划——一系列实现实际数据转换的操作符。在此转换过程中
一个逻辑操作符可能变成多个物理操作符。例如,
ReadOp
同时变成InputDataBuffer
和TaskPoolMapOperator
。逻辑计划和物理计划都经过优化过程。例如,
OperatorFusionRule
合并 map 操作符以减少序列化开销。
物理操作符的工作原理如下
接收块引用的流
执行其操作(使用 Ray Tasks/Actors 转换数据或操纵引用)
输出另一个块引用的流
有关 Ray Tasks 和 Actors 的更多详细信息,请参阅Ray Core 概念。
注意
数据集的执行计划仅在您通过 show()
等操作具体化或消费数据集时运行。
流式执行模型#
Ray Data 使用流式执行模型高效处理大型数据集。
Ray Data 不会将整个数据集一次性具体化到内存中,而是可以通过操作管道以流式方式处理数据。
这对于推理和训练工作负载非常有用,因为数据集可能太大而无法完全放入内存,并且工作负载不需要将整个数据集一次性加载到内存中。
以下是流式执行模型如何工作的示例。下面的代码创建一个包含 1K 行的数据集,应用 map 和 filter 转换,然后调用 show
操作来触发管道。
import ray
# Create a dataset with 1K rows
ds = ray.data.read_csv("s3://anonymous@air-example-data/iris.csv")
# Define a pipeline of operations
ds = ds.map(lambda x: {"target1": x["target"] * 2})
ds = ds.map(lambda x: {"target2": x["target1"] * 2})
ds = ds.map(lambda x: {"target3": x["target2"] * 2})
ds = ds.filter(lambda x: x["target3"] % 4 == 0)
# Data starts flowing when you call a method like show()
ds.show(5)
这将创建如下所示的逻辑计划
Filter(<lambda>)
+- Map(<lambda>)
+- Map(<lambda>)
+- Map(<lambda>)
+- Dataset(schema={...})
流式拓扑结构如下所示
在流式执行模型中,操作符以管道方式连接,每个操作符的输出队列直接馈送到下一个下游操作符的输入队列。这创建了数据在执行计划中高效流动。
流式执行模型为数据处理提供了显著优势。
特别是,管道架构支持多个阶段并发执行,提高了整体性能和资源利用率。例如,如果 map 操作符需要 GPU 资源,流式执行模型可以与 filter 操作符(可能在 CPU 上运行)并发执行 map 操作符,从而在管道的整个持续时间内有效利用 GPU。
总而言之,Ray Data 的流式执行模型可以高效地处理比可用内存大得多的数据集,同时通过集群并行执行保持高性能。
注意
诸如 ds.sort()
和 ds.groupby()
等操作需要具体化数据,这可能会影响超大型数据集的内存使用。
您可以在这篇博客文章中详细了解流式执行模型。