打乱类型#

Ray Data 提供了几种不同的打乱数据选项,可以在打乱控制的粒度与内存消耗和运行时长之间进行权衡。以下选项按资源消耗和运行时长的增加顺序排列;请根据您的用例选择最合适的方法。

打乱文件顺序#

要在读取前随机打乱输入文件的顺序,请调用支持打乱的读取函数,例如 read_images(),并使用 shuffle="files" 参数。这会将输入文件随机分配给 worker 进行读取。

这是最快的打乱选项,纯粹是元数据操作。此选项不会打乱文件内的实际行,因此如果每个文件包含许多行,随机性可能会较差。

迭代批次时进行局部打乱#

要使用迭代方法(例如 iter_batches()iter_torch_batches()iter_tf_batches())对一部分行进行局部打乱,请指定 local_shuffle_buffer_size。这会在迭代期间将行打乱到指定的缓冲区大小。有关更多详细信息,请参阅迭代并打乱批次

import ray

ds = ray.data.read_images(
    "s3://anonymous@ray-example-data/image-datasets/simple",
    shuffle="files",
)

这比打乱文件顺序慢,并且是在本地打乱行,无需网络传输。此局部打乱缓冲区可以与打乱文件顺序结合使用;请参阅打乱文件顺序

提示

如果在使用 local_shuffle_buffer_size 时观察到吞吐量下降,请检查通过查看 ds.stats() 输出(在 Batch iteration time breakdown 下的 In batch formatting)在批次创建中花费的总时间。如果此时间明显长于其他步骤花费的时间,请减小 local_shuffle_buffer_size 或完全关闭局部打乱缓冲区,仅打乱文件顺序

import ray

ds = ray.data.read_images("s3://anonymous@ray-example-data/image-datasets/simple")

for batch in ds.iter_batches(
    batch_size=2,
    batch_format="numpy",
    local_shuffle_buffer_size=250,
):
    print(batch)

打乱块顺序#

此选项随机化数据集中块的顺序。块是 Ray Data 存储在对象存储中的基本数据块单元。单独应用此操作不涉及繁重的计算和通信。但是,它要求 Ray Data 在应用操作之前将所有块实例化到内存中。仅当您的数据集足够小,可以容纳到对象存储内存中时,才使用此选项。

要执行块顺序打乱,请使用 randomize_block_order

打乱所有行(全局打乱)#

要全局随机打乱所有行,请调用 random_shuffle()。这是最慢的打乱选项,需要跨网络在 worker 之间传输数据。此选项在所有选项中实现了最佳随机性。

ds = ray.data.read_text(
    "s3://anonymous@ray-example-data/sms_spam_collection_subset.txt"
)

# Randomize the block order of this dataset.
ds = ds.randomize_block_order()

高级:优化打乱#

注意

import ray

ds = (
    ray.data.read_images("s3://anonymous@ray-example-data/image-datasets/simple")
    .random_shuffle()
)

这是一个活跃的开发领域。如果您的 Dataset 使用了打乱操作并且在配置打乱时遇到问题,请在 GitHub 上提交 Ray Data 问题

何时应该使用全局每 epoch 打乱?#

仅当您的模型对训练数据的随机性敏感时,才使用全局每 epoch 打乱。基于理论基础,所有基于梯度下降的模型训练器都受益于改进的(全局)打乱质量。实际上,对于表格数据/模型,这种益处尤其明显。但是,打乱越全局,打乱操作就越昂贵。由于数据传输成本,在多节点集群上进行分布式数据并行训练时,成本会增加。在使用超大型数据集时,此成本可能令人望而却步。

确定预处理时间、成本和每 epoch 打乱质量之间最佳权衡的最佳途径是,在不同的打乱策略(例如不打乱、局部打乱或全局打乱)下,衡量您的特定模型在每个训练步骤中的精度增益。

只要您的数据加载和打乱吞吐量高于您的训练吞吐量,您的 GPU 就应该饱和。如果您的模型对打乱敏感,请提高打乱质量,直到达到此阈值。

启用基于推送的打乱#

一些 Dataset 操作需要进行“打乱”操作,这意味着数据会从所有输入分区打乱到所有输出分区。这些操作包括 Dataset.random_shuffleDataset.sortDataset.groupby。例如,在排序操作期间,数据会在块之间重新排序,因此需要在分区之间进行打乱。将打乱扩展到大型数据和集群可能会很有挑战性,特别是当总数据集大小无法容纳到内存中时。

Ray Data 提供了一种称为基于推送的打乱的替代打乱实现,用于提高大规模性能。如果您的数据集包含超过 1000 个块或大于 1 TB,请尝试此选项。

要在本地或集群上试用此功能,您可以从 Ray 为 Dataset.random_shuffleDataset.sort 运行的每夜发布测试开始。为了了解您可以预期的性能,这里是在 20 台机器(AWS EC2 上的 m5.4xlarge 实例,每台 16 个 vCPU,64 GB RAM)上对 1-10 TB 数据进行 Dataset.random_shuffle 的一些运行时间结果。

要试用基于推送的打乱,请在运行应用程序时设置环境变量 RAY_DATA_PUSH_BASED_SHUFFLE=1

您还可以在程序执行期间通过设置 DataContext.use_push_based_shuffle 标志来指定打乱实现

https://docs.google.com/spreadsheets/d/e/2PACX-1vQvBWpdxHsW0-loasJsBpdarAixb7rjoo-lTgikghfCeKPQtjQDDo2fY51Yc1B6k_S4bnYEoChmFrH2/pubchart?oid=598567373&format=image

大规模打乱可能需要一段时间才能完成。为了调试方便,打乱操作支持仅执行打乱的一部分,这样您可以更快地收集执行概要。下面是一个示例,展示如何将随机打乱操作限制为两个输出块

$ wget https://raw.githubusercontent.com/ray-project/ray/master/release/nightly_tests/dataset/sort.py
$ RAY_DATA_PUSH_BASED_SHUFFLE=1 python sort.py --num-partitions=10 --partition-size=1e7

# Dataset size: 10 partitions, 0.01GB partition size, 0.1GB total
# [dataset]: Run `pip install tqdm` to enable progress reporting.
# 2022-05-04 17:30:28,806   INFO push_based_shuffle.py:118 -- Using experimental push-based shuffle.
# Finished in 9.571171760559082
# ...

下一页

保存数据

import ray

ctx = ray.data.DataContext.get_current()
ctx.use_push_based_shuffle = True

ds = (
    ray.data.range(1000)
    .random_shuffle()
)

本页内容

import ray

ctx = ray.data.DataContext.get_current()
ctx.set_config(
    "debug_limit_shuffle_execution_to_num_blocks", 2
)

ds = (
    ray.data.range(1000, override_num_blocks=10)
    .random_shuffle()
    .materialize()
)
print(ds.stats())
Operator 1 ReadRange->RandomShuffle: executed in 0.08s

    Suboperator 0 ReadRange->RandomShuffleMap: 2/2 blocks executed
    ...