打乱类型#
Ray Data 提供了几种不同的打乱数据选项,可以在打乱控制的粒度与内存消耗和运行时长之间进行权衡。以下选项按资源消耗和运行时长的增加顺序排列;请根据您的用例选择最合适的方法。
打乱文件顺序#
要在读取前随机打乱输入文件的顺序,请调用支持打乱的读取函数,例如 read_images()
,并使用 shuffle="files"
参数。这会将输入文件随机分配给 worker 进行读取。
这是最快的打乱选项,纯粹是元数据操作。此选项不会打乱文件内的实际行,因此如果每个文件包含许多行,随机性可能会较差。
迭代批次时进行局部打乱#
要使用迭代方法(例如 iter_batches()
、iter_torch_batches()
和 iter_tf_batches()
)对一部分行进行局部打乱,请指定 local_shuffle_buffer_size
。这会在迭代期间将行打乱到指定的缓冲区大小。有关更多详细信息,请参阅迭代并打乱批次。
import ray
ds = ray.data.read_images(
"s3://anonymous@ray-example-data/image-datasets/simple",
shuffle="files",
)
这比打乱文件顺序慢,并且是在本地打乱行,无需网络传输。此局部打乱缓冲区可以与打乱文件顺序结合使用;请参阅打乱文件顺序。
提示
如果在使用 local_shuffle_buffer_size
时观察到吞吐量下降,请检查通过查看 ds.stats()
输出(在 Batch iteration time breakdown
下的 In batch formatting
)在批次创建中花费的总时间。如果此时间明显长于其他步骤花费的时间,请减小 local_shuffle_buffer_size
或完全关闭局部打乱缓冲区,仅打乱文件顺序。
import ray
ds = ray.data.read_images("s3://anonymous@ray-example-data/image-datasets/simple")
for batch in ds.iter_batches(
batch_size=2,
batch_format="numpy",
local_shuffle_buffer_size=250,
):
print(batch)
打乱块顺序#
此选项随机化数据集中块的顺序。块是 Ray Data 存储在对象存储中的基本数据块单元。单独应用此操作不涉及繁重的计算和通信。但是,它要求 Ray Data 在应用操作之前将所有块实例化到内存中。仅当您的数据集足够小,可以容纳到对象存储内存中时,才使用此选项。
要执行块顺序打乱,请使用 randomize_block_order
。
打乱所有行(全局打乱)#
要全局随机打乱所有行,请调用 random_shuffle()
。这是最慢的打乱选项,需要跨网络在 worker 之间传输数据。此选项在所有选项中实现了最佳随机性。
ds = ray.data.read_text(
"s3://anonymous@ray-example-data/sms_spam_collection_subset.txt"
)
# Randomize the block order of this dataset.
ds = ds.randomize_block_order()
高级:优化打乱#
注意
import ray
ds = (
ray.data.read_images("s3://anonymous@ray-example-data/image-datasets/simple")
.random_shuffle()
)
这是一个活跃的开发领域。如果您的 Dataset 使用了打乱操作并且在配置打乱时遇到问题,请在 GitHub 上提交 Ray Data 问题。
何时应该使用全局每 epoch 打乱?#
仅当您的模型对训练数据的随机性敏感时,才使用全局每 epoch 打乱。基于理论基础,所有基于梯度下降的模型训练器都受益于改进的(全局)打乱质量。实际上,对于表格数据/模型,这种益处尤其明显。但是,打乱越全局,打乱操作就越昂贵。由于数据传输成本,在多节点集群上进行分布式数据并行训练时,成本会增加。在使用超大型数据集时,此成本可能令人望而却步。
确定预处理时间、成本和每 epoch 打乱质量之间最佳权衡的最佳途径是,在不同的打乱策略(例如不打乱、局部打乱或全局打乱)下,衡量您的特定模型在每个训练步骤中的精度增益。
只要您的数据加载和打乱吞吐量高于您的训练吞吐量,您的 GPU 就应该饱和。如果您的模型对打乱敏感,请提高打乱质量,直到达到此阈值。
启用基于推送的打乱#
一些 Dataset 操作需要进行“打乱”操作,这意味着数据会从所有输入分区打乱到所有输出分区。这些操作包括 Dataset.random_shuffle
、Dataset.sort
和 Dataset.groupby
。例如,在排序操作期间,数据会在块之间重新排序,因此需要在分区之间进行打乱。将打乱扩展到大型数据和集群可能会很有挑战性,特别是当总数据集大小无法容纳到内存中时。
Ray Data 提供了一种称为基于推送的打乱的替代打乱实现,用于提高大规模性能。如果您的数据集包含超过 1000 个块或大于 1 TB,请尝试此选项。
要在本地或集群上试用此功能,您可以从 Ray 为 Dataset.random_shuffle
和 Dataset.sort
运行的每夜发布测试开始。为了了解您可以预期的性能,这里是在 20 台机器(AWS EC2 上的 m5.4xlarge 实例,每台 16 个 vCPU,64 GB RAM)上对 1-10 TB 数据进行 Dataset.random_shuffle
的一些运行时间结果。
要试用基于推送的打乱,请在运行应用程序时设置环境变量 RAY_DATA_PUSH_BASED_SHUFFLE=1
您还可以在程序执行期间通过设置 DataContext.use_push_based_shuffle
标志来指定打乱实现
大规模打乱可能需要一段时间才能完成。为了调试方便,打乱操作支持仅执行打乱的一部分,这样您可以更快地收集执行概要。下面是一个示例,展示如何将随机打乱操作限制为两个输出块
$ wget https://raw.githubusercontent.com/ray-project/ray/master/release/nightly_tests/dataset/sort.py
$ RAY_DATA_PUSH_BASED_SHUFFLE=1 python sort.py --num-partitions=10 --partition-size=1e7
# Dataset size: 10 partitions, 0.01GB partition size, 0.1GB total
# [dataset]: Run `pip install tqdm` to enable progress reporting.
# 2022-05-04 17:30:28,806 INFO push_based_shuffle.py:118 -- Using experimental push-based shuffle.
# Finished in 9.571171760559082
# ...
import ray
ctx = ray.data.DataContext.get_current()
ctx.use_push_based_shuffle = True
ds = (
ray.data.range(1000)
.random_shuffle()
)
本页内容
import ray
ctx = ray.data.DataContext.get_current()
ctx.set_config(
"debug_limit_shuffle_execution_to_num_blocks", 2
)
ds = (
ray.data.range(1000, override_num_blocks=10)
.random_shuffle()
.materialize()
)
print(ds.stats())
Operator 1 ReadRange->RandomShuffle: executed in 0.08s
Suboperator 0 ReadRange->RandomShuffleMap: 2/2 blocks executed
...