数据加载和预处理#
Ray Train 集成了 Ray Data,提供了一个高性能、可扩展的流式解决方案,用于加载和预处理大型数据集。主要优势包括:
流式数据加载和预处理,可扩展至 PB 级数据。
将繁重的数据预处理扩展到 CPU 节点,以避免 GPU 训练成为瓶颈。
自动快速故障恢复。
在分布式训练工作节点之间自动进行即时数据拆分。
有关 Ray Data 的更多详细信息,请参阅 Ray Data 文档。
注意
除了 Ray Data,您还可以继续使用框架原生的数据实用程序与 Ray Train 结合使用,例如 PyTorch Dataset、Hugging Face Dataset 和 Lightning DataModule。
在本指南中,我们将介绍如何在 Ray Train 脚本中集成 Ray Data,以及自定义数据摄取管道的各种方法。
快速入门#
安装 Ray Data 和 Ray Train
pip install -U "ray[data,train]"
数据摄取可以通过四个基本步骤来设置:
从输入数据创建 Ray Dataset。
对 Ray Dataset 应用预处理操作。
将预处理后的 Dataset 输入 Ray Train Trainer,Trainer 会在内部以流式方式将 Dataset 平均拆分到分布式训练工作节点上。
在训练函数中消费 Ray Dataset。
import torch
import ray
from ray import train
from ray.train import Checkpoint, ScalingConfig
from ray.train.torch import TorchTrainer
# Set this to True to use GPU.
# If False, do CPU training instead of GPU training.
use_gpu = False
# Step 1: Create a Ray Dataset from in-memory Python lists.
# You can also create a Ray Dataset from many other sources and file
# formats.
train_dataset = ray.data.from_items([{"x": [x], "y": [2 * x]} for x in range(200)])
# Step 2: Preprocess your Ray Dataset.
def increment(batch):
batch["y"] = batch["y"] + 1
return batch
train_dataset = train_dataset.map_batches(increment)
def train_func():
batch_size = 16
# Step 4: Access the dataset shard for the training worker via
# ``get_dataset_shard``.
train_data_shard = train.get_dataset_shard("train")
# `iter_torch_batches` returns an iterable object that
# yield tensor batches. Ray Data automatically moves the Tensor batches
# to GPU if you enable GPU training.
train_dataloader = train_data_shard.iter_torch_batches(
batch_size=batch_size, dtypes=torch.float32
)
for epoch_idx in range(1):
for batch in train_dataloader:
inputs, labels = batch["x"], batch["y"]
assert type(inputs) == torch.Tensor
assert type(labels) == torch.Tensor
assert inputs.shape[0] == batch_size
assert labels.shape[0] == batch_size
# Only check one batch for demo purposes.
# Replace the above with your actual model training code.
break
# Step 3: Create a TorchTrainer. Specify the number of training workers and
# pass in your Ray Dataset.
# The Ray Dataset is automatically split across all training workers.
trainer = TorchTrainer(
train_func,
datasets={"train": train_dataset},
scaling_config=ScalingConfig(num_workers=2, use_gpu=use_gpu)
)
result = trainer.fit()
from ray import train
# Create the train and validation datasets.
train_data = ray.data.read_csv("./train.csv")
val_data = ray.data.read_csv("./validation.csv")
def train_func_per_worker():
# Access Ray datsets in your train_func via ``get_dataset_shard``.
# Ray Data shards all datasets across workers by default.
train_ds = train.get_dataset_shard("train")
val_ds = train.get_dataset_shard("validation")
# Create Ray dataset iterables via ``iter_torch_batches``.
train_dataloader = train_ds.iter_torch_batches(batch_size=16)
val_dataloader = val_ds.iter_torch_batches(batch_size=16)
...
trainer = pl.Trainer(
# ...
)
# Feed the Ray dataset iterables to ``pl.Trainer.fit``.
trainer.fit(
model,
train_dataloaders=train_dataloader,
val_dataloaders=val_dataloader
)
trainer = TorchTrainer(
train_func,
# You can pass in multiple datasets to the Trainer.
datasets={"train": train_data, "validation": val_data},
scaling_config=ScalingConfig(num_workers=4),
)
trainer.fit()
import ray
import ray.train
from huggingface_hub import HfFileSystem
...
# Create the train and evaluation datasets using HfFileSystem.
fs = HfFileSystem()
train_data = ray.data.read_parquet("hf://datasets/your-dataset/train/", filesystem=fs)
eval_data = ray.data.read_parquet("hf://datasets/your-dataset/validation/", filesystem=fs)
def train_func():
# Access Ray datsets in your train_func via ``get_dataset_shard``.
# Ray Data shards all datasets across workers by default.
train_ds = ray.train.get_dataset_shard("train")
eval_ds = ray.train.get_dataset_shard("evaluation")
# Create Ray dataset iterables via ``iter_torch_batches``.
train_iterable_ds = train_ds.iter_torch_batches(batch_size=16)
eval_iterable_ds = eval_ds.iter_torch_batches(batch_size=16)
...
args = transformers.TrainingArguments(
...,
max_steps=max_steps # Required for iterable datasets
)
trainer = transformers.Trainer(
...,
model=model,
train_dataset=train_iterable_ds,
eval_dataset=eval_iterable_ds,
)
# Prepare your Transformers Trainer
trainer = ray.train.huggingface.transformers.prepare_trainer(trainer)
trainer.train()
trainer = TorchTrainer(
train_func,
# You can pass in multiple datasets to the Trainer.
datasets={"train": train_data, "evaluation": val_data},
scaling_config=ScalingConfig(num_workers=4, use_gpu=True),
)
trainer.fit()
加载数据#
Ray Datasets 可以从多种不同的数据源和格式创建。有关更多详细信息,请参阅 加载数据。
预处理数据#
Ray Data 支持广泛的预处理操作,您可以在训练前使用它们来转换数据。
有关通用预处理,请参阅 转换数据。
有关表格数据,请参阅 预处理结构化数据。
有关 PyTorch 张量,请参阅 使用 torch 张量进行转换。
有关优化昂贵的预处理操作,请参阅 缓存预处理后的数据集。
输入和拆分数据#
可以通过 datasets 参数将预处理后的数据集传递给 Ray Train Trainer(例如 TorchTrainer)。
传递给 Trainer 的 datasets 的数据集可以通过调用 ray.train.get_dataset_shard() 在每个分布式训练工作节点上运行的 train_loop_per_worker 中访问。
Ray Data 默认会跨工作节点拆分所有数据集。get_dataset_shard() 返回数据集中 1/n 的部分,其中 n 是训练工作节点的数量。
Ray Data 以流式方式即时进行数据拆分。
注意
请注意,由于 Ray Data 会拆分评估数据集,因此您需要将评估结果汇总到所有工作节点。您可以考虑使用 TorchMetrics(示例)或其他框架提供的实用程序。
可以通过传递 dataset_config 参数来覆盖此行为。有关配置拆分逻辑的更多信息,请参阅 拆分数据集。
消费数据#
在 train_loop_per_worker 内部,每个工作节点可以通过 ray.train.get_dataset_shard() 访问其数据集的分片。
可以通过多种方式消费这些数据:
要创建通用的批次可迭代对象,您可以调用
iter_batches()。要创建 PyTorch DataLoader 的替代品,您可以调用
iter_torch_batches()。
有关如何迭代数据的更多详细信息,请参阅 迭代数据。
从 PyTorch 数据开始#
某些框架提供了自己的数据集和数据加载实用程序。例如:
PyTorch: Dataset & DataLoader
Hugging Face: Dataset
PyTorch Lightning: LightningDataModule
您仍然可以直接将这些框架数据实用程序与 Ray Train 一起使用。
总的来说,您可以将这些概念进行如下比较:
PyTorch API |
HuggingFace API |
Ray Data API |
|---|---|---|
n/a |
有关更多详细信息,请参阅每个框架的以下部分:
选项 1(使用 Ray Data)
将您的 PyTorch Dataset 转换为 Ray Dataset。
通过
datasets参数将 Ray Dataset 传递给 TorchTrainer。在您的
train_loop_per_worker中,您可以通过ray.train.get_dataset_shard()访问数据集。通过
ray.data.DataIterator.iter_torch_batches()创建数据集可迭代对象。
有关更多详细信息,请参阅 从 PyTorch Datasets 和 DataLoaders 迁移。
选项 2(不使用 Ray Data)
直接在
train_loop_per_worker中实例化 Torch Dataset 和 DataLoader。使用
ray.train.torch.prepare_data_loader()实用程序为分布式训练设置 DataLoader。
LightningDataModule 是使用 PyTorch Datasets 和 DataLoaders 创建的。您可以在此处应用相同的逻辑。
选项 1(使用 Ray Data)
将您的 Hugging Face Dataset 转换为 Ray Dataset。有关说明,请参阅 Ray Data for Hugging Face。
通过
datasets参数将 Ray Dataset 传递给 TorchTrainer。在您的
train_loop_per_worker中,通过ray.train.get_dataset_shard()访问分片数据集。通过
ray.data.DataIterator.iter_torch_batches()创建可迭代数据集。在初始化
transformers.Trainer时传递可迭代数据集。使用
ray.train.huggingface.transformers.prepare_trainer()实用程序包装您的 transformers trainer。
选项 2(不使用 Ray Data)
在
train_loop_per_worker中直接实例化 Hugging Face Dataset。在初始化期间将 Hugging Face Dataset 传递给
transformers.Trainer。
提示
当不使用 Ray Data 直接使用 Torch 或 Hugging Face Datasets 时,请确保在 train_loop_per_worker *内部* 实例化您的 Dataset。在 train_loop_per_worker 外部实例化 Dataset 并通过全局范围传递它可能会导致错误,因为 Dataset 无法序列化。
注意
使用具有多个工作节点的 PyTorch DataLoader 时,应将进程启动方法设置为 forkserver 或 spawn。 Forking Ray Actors and Tasks is an anti-pattern 可能会导致意外问题,例如死锁。
data_loader = DataLoader(
dataset,
num_workers=2,
multiprocessing_context=multiprocessing.get_context("forkserver"),
...
)
拆分数据集#
默认情况下,Ray Train 使用 Dataset.streaming_split 将所有数据集拆分到工作节点。每个工作节点看到的是数据的不同子集,而不是迭代整个数据集。
如果您想自定义要拆分的数据集,请将 DataConfig 传递给 Trainer 构造函数。
例如,要仅拆分训练数据集,请执行以下操作:
import ray
from ray import train
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer
ds = ray.data.read_text(
"s3://anonymous@ray-example-data/sms_spam_collection_subset.txt"
)
train_ds, val_ds = ds.train_test_split(0.3)
def train_loop_per_worker():
# Get the sharded training dataset
train_ds = train.get_dataset_shard("train")
for _ in range(2):
for batch in train_ds.iter_batches(batch_size=128):
print("Do some training on batch", batch)
# Get the unsharded full validation dataset
val_ds = train.get_dataset_shard("val")
for _ in range(2):
for batch in val_ds.iter_batches(batch_size=128):
print("Do some evaluation on batch", batch)
my_trainer = TorchTrainer(
train_loop_per_worker,
scaling_config=ScalingConfig(num_workers=2),
datasets={"train": train_ds, "val": val_ds},
dataset_config=ray.train.DataConfig(
datasets_to_split=["train"],
),
)
my_trainer.fit()
完全自定义(高级)#
对于默认配置类未涵盖的用例,您还可以完全自定义输入数据集的拆分方式。定义一个自定义的 DataConfig 类(DeveloperAPI)。 DataConfig 类负责在节点之间进行数据共享设置和拆分。
# Note that this example class is doing the same thing as the basic DataConfig
# implementation included with Ray Train.
from typing import Optional, Dict, List
import ray
from ray import train
from ray.train.torch import TorchTrainer
from ray.train import DataConfig, ScalingConfig
from ray.data import Dataset, DataIterator, NodeIdStr
from ray.actor import ActorHandle
ds = ray.data.read_text(
"s3://anonymous@ray-example-data/sms_spam_collection_subset.txt"
)
def train_loop_per_worker():
# Get an iterator to the dataset we passed in below.
it = train.get_dataset_shard("train")
for _ in range(2):
for batch in it.iter_batches(batch_size=128):
print("Do some training on batch", batch)
class MyCustomDataConfig(DataConfig):
def configure(
self,
datasets: Dict[str, Dataset],
world_size: int,
worker_handles: Optional[List[ActorHandle]],
worker_node_ids: Optional[List[NodeIdStr]],
**kwargs,
) -> List[Dict[str, DataIterator]]:
assert len(datasets) == 1, "This example only handles the simple case"
# Configure Ray Data for ingest.
ctx = ray.data.DataContext.get_current()
ctx.execution_options = DataConfig.default_ingest_options()
# Split the stream into shards.
iterator_shards = datasets["train"].streaming_split(
world_size, equal=True, locality_hints=worker_node_ids
)
# Return the assigned iterators for each worker.
return [{"train": it} for it in iterator_shards]
my_trainer = TorchTrainer(
train_loop_per_worker,
scaling_config=ScalingConfig(num_workers=2),
datasets={"train": ds},
dataset_config=MyCustomDataConfig(),
)
my_trainer.fit()
子类必须是可序列化的,因为 Ray Train 会将其从驱动程序脚本复制到 Trainer 的驱动程序 actor。Ray Train 会在其 Trainer 组的主 actor 上调用其 configure 方法来为每个工作节点创建数据迭代器。
一般来说,您可以使用 DataConfig 来在工作节点开始迭代数据之前进行任何必要的共享设置。设置在每个 Trainer 运行开始时进行。
随机打乱#
为每个 epoch 随机打乱数据对于模型质量可能很重要,具体取决于您训练的模型。
Ray Data 提供了多种随机打乱选项,有关更多详细信息,请参阅 打乱数据。
启用可重复性#
在开发或进行模型超参数调优时,数据摄取的可重复性很重要,这样数据摄取就不会影响模型质量。请遵循以下三个步骤来启用可重复性:
步骤 1: 通过在 DataContext 中设置 preserve_order 标志,在 Ray Datasets 中启用确定性执行。
import ray
# Preserve ordering in Ray Datasets for reproducibility.
ctx = ray.data.DataContext.get_current()
ctx.execution_options.preserve_order = True
ds = ray.data.read_text(
"s3://anonymous@ray-example-data/sms_spam_collection_subset.txt"
)
步骤 2: 为任何打乱操作设置种子
seed参数传递给random_shuffleseed参数传递给randomize_block_orderlocal_shuffle_seed参数传递给iter_batches
步骤 3: 遵循您选择的训练框架启用可重复性的最佳实践。例如,请参阅 Pytorch 可重复性指南。
预处理结构化数据#
注意
本节适用于表格/结构化数据。预处理非结构化数据的推荐方法是使用 Ray Data 操作,例如 map_batches。有关更多详细信息,请参阅 Ray Data Working with Pytorch 指南。
对于表格数据,请使用 Ray Data 预处理器,它们实现了常见的数据预处理操作。您可以通过在将 Dataset 传递给 Trainer 之前对其应用这些预处理器来将其与 Ray Train Trainer 一起使用。例如:
import base64
import numpy as np
from tempfile import TemporaryDirectory
import ray
from ray import train
from ray.train import Checkpoint, ScalingConfig
from ray.train.torch import TorchTrainer
from ray.data.preprocessors import Concatenator, StandardScaler
dataset = ray.data.read_csv("s3://anonymous@air-example-data/breast_cancer.csv")
# Create preprocessors to scale some columns and concatenate the results.
scaler = StandardScaler(columns=["mean radius", "mean texture"])
columns_to_concatenate = dataset.columns()
columns_to_concatenate.remove("target")
concatenator = Concatenator(columns=columns_to_concatenate, dtype=np.float32)
# Compute dataset statistics and get transformed datasets. Note that the
# fit call is executed immediately, but the transformation is lazy.
dataset = scaler.fit_transform(dataset)
dataset = concatenator.fit_transform(dataset)
def train_loop_per_worker():
context = train.get_context()
print(context.get_metadata()) # prints {"preprocessor_pkl": ...}
# Get an iterator to the dataset we passed in below.
it = train.get_dataset_shard("train")
for _ in range(2):
# Prefetch 10 batches at a time.
for batch in it.iter_batches(batch_size=128, prefetch_batches=10):
print("Do some training on batch", batch)
# Save a checkpoint.
with TemporaryDirectory() as temp_dir:
train.report(
{"score": 2.0},
checkpoint=Checkpoint.from_directory(temp_dir),
)
# Serialize the preprocessor. Since serialize() returns bytes,
# convert to base64 string for JSON compatibility.
serialized_preprocessor = base64.b64encode(scaler.serialize()).decode("ascii")
my_trainer = TorchTrainer(
train_loop_per_worker,
scaling_config=ScalingConfig(num_workers=2),
datasets={"train": dataset},
metadata={"preprocessor_pkl": serialized_preprocessor},
)
# Get the fitted preprocessor back from the result metadata.
metadata = my_trainer.fit().checkpoint.get_metadata()
# Decode from base64 before deserializing
serialized_data = base64.b64decode(metadata["preprocessor_pkl"])
print(StandardScaler.deserialize(serialized_data))
此示例使用 Trainer(metadata={...}) 构造函数参数来持久化已拟合的预处理器。此参数指定一个字典,该字典可从 TrainContext.get_metadata() 和 checkpoint.get_metadata() 获取,用于 Trainer 保存的检查点。此设计支持为推理重新创建已拟合的预处理器。
性能提示#
预取批次#
在迭代数据集进行训练时,您可以增加 iter_batches 或 iter_torch_batches 中的 prefetch_batches 以进一步提高性能。在训练当前批次的同时,此方法会启动后台线程来获取和处理接下来的 N 个批次。
当训练受限于跨节点数据传输或最后一英里预处理(如转换为张量或执行 collate_fn)时,此方法会有帮助。然而,增加 prefetch_batches 会导致需要保存在堆内存中的数据量增加。默认情况下,prefetch_batches 设置为 1。
例如,以下代码为每个训练工作节点一次预取 10 个批次:
import ray
from ray import train
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer
ds = ray.data.read_text(
"s3://anonymous@ray-example-data/sms_spam_collection_subset.txt"
)
def train_loop_per_worker():
# Get an iterator to the dataset we passed in below.
it = train.get_dataset_shard("train")
for _ in range(2):
# Prefetch 10 batches at a time.
for batch in it.iter_batches(batch_size=128, prefetch_batches=10):
print("Do some training on batch", batch)
my_trainer = TorchTrainer(
train_loop_per_worker,
scaling_config=ScalingConfig(num_workers=2),
datasets={"train": ds},
)
my_trainer.fit()
避免在 collate_fn 中进行繁重的转换#
iter_batches 或 iter_torch_batches 中的 collate_fn 参数允许您在将数据馈送给模型之前对其进行转换。此操作在训练工作节点本地执行。避免在此函数中添加繁重的转换,因为它可能会成为瓶颈。相反,请在将 Dataset 传递给 Trainer 之前,使用 map 或 map_batches 应用转换。当您昂贵的转换需要 batch_size 作为输入时,例如文本分词,您可以 将其扩展到 Ray Data 以获得更好的性能。
缓存预处理后的数据集#
如果预处理后的 Dataset 足够小,可以放入 Ray 对象存储内存(默认值为集群总 RAM 的 30%),则通过在预处理后的数据集上调用 materialize() 来物化预处理后的数据集到 Ray 内置对象存储中。此方法告诉 Ray Data 计算整个预处理过程并将其固定在 Ray 对象存储内存中。因此,当重复迭代数据集时,不需要重新运行预处理操作。但是,如果预处理后的数据过大,无法放入 Ray 对象存储内存,此方法将大大降低性能,因为数据需要从磁盘溢出和读回。
需要每个 epoch 运行的转换,例如随机化,应该放在 materialize 调用之后。
from typing import Dict
import numpy as np
import ray
# Load the data.
train_ds = ray.data.read_parquet("s3://anonymous@ray-example-data/iris.parquet")
# Define a preprocessing function.
def normalize_length(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
new_col = batch["sepal.length"] / np.max(batch["sepal.length"])
batch["normalized.sepal.length"] = new_col
del batch["sepal.length"]
return batch
# Preprocess the data. Transformations that are made before the materialize call
# below are only run once.
train_ds = train_ds.map_batches(normalize_length)
# Materialize the dataset in object store memory.
# Only do this if train_ds is small enough to fit in object store memory.
train_ds = train_ds.materialize()
# Dummy augmentation transform.
def augment_data(batch):
return batch
# Add per-epoch preprocessing. Transformations that you want to run per-epoch, such
# as data augmentation or randomization, should go after the materialize call.
train_ds = train_ds.map_batches(augment_data)
# Pass train_ds to the Trainer
向集群添加仅 CPU 节点#
如果 GPU 训练受限于昂贵的 CPU 预处理,并且预处理后的 Dataset 过大无法放入对象存储内存,那么物化数据集将不起作用。在这种情况下,Ray 对异构资源的原生支持使您只需向集群添加更多仅 CPU 节点,Ray Data 就会自动将仅 CPU 预处理任务扩展到仅 CPU 节点,从而使 GPU 更饱和。
总的来说,添加仅 CPU 节点可以通过两种方式提供帮助:* 添加更多 CPU 核心有助于进一步并行化预处理。当 CPU 计算时间是瓶颈时,此方法很有帮助。* 增加对象存储内存,这 1)允许 Ray Data 在预处理和训练阶段之间缓冲更多数据,并且 2)提供更多内存以便可以 缓存预处理后的数据集。当内存是瓶颈时,此方法很有帮助。