使用 PyTorch 进行分布式训练入门#
本教程将引导您完成将现有 PyTorch 脚本转换为使用 Ray Train 的过程。
了解如何
配置模型以在正确的 CPU/GPU 设备上进行分布式运行。
配置数据加载器以在 工作节点 之间分片数据,并将数据放在正确的 CPU 或 GPU 设备上。
配置 训练函数 以报告指标和保存检查点。
为训练作业配置 缩放 以及 CPU 或 GPU 资源需求。
使用 TorchTrainer 类启动分布式训练作业。
快速入门#
作为参考,最终代码将如下所示
from ray.train.torch import TorchTrainer
from ray.train import ScalingConfig


def train_func():
    """Your PyTorch training code here, executed on every distributed worker."""
    ...


# Two workers, each with a GPU.
scaling_config = ScalingConfig(num_workers=2, use_gpu=True)
trainer = TorchTrainer(train_func, scaling_config=scaling_config)
result = trainer.fit()
train_func是在每个分布式训练工作节点上执行的 Python 代码。ScalingConfig定义了分布式训练工作节点的数量以及是否使用 GPU。TorchTrainer启动分布式训练作业。
比较使用 Ray Train 前后的 PyTorch 训练脚本。
import os
import tempfile

import torch
from torch.nn import CrossEntropyLoss
from torch.optim import Adam
from torch.utils.data import DataLoader
from torchvision.models import resnet18
from torchvision.datasets import FashionMNIST
from torchvision.transforms import ToTensor, Normalize, Compose

import ray.train.torch


def train_func():
    """Per-worker training loop: train ResNet-18 on FashionMNIST for 10 epochs."""
    # Model, Loss, Optimizer
    model = resnet18(num_classes=10)
    # FashionMNIST images are single-channel, so swap in a 1-channel stem conv.
    model.conv1 = torch.nn.Conv2d(
        1, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False
    )
    # [1] Prepare model.
    model = ray.train.torch.prepare_model(model)
    # model.to("cuda")  # This is done by `prepare_model`
    criterion = CrossEntropyLoss()
    optimizer = Adam(model.parameters(), lr=0.001)

    # Data
    transform = Compose([ToTensor(), Normalize((0.28604,), (0.32025,))])
    data_dir = os.path.join(tempfile.gettempdir(), "data")
    train_data = FashionMNIST(root=data_dir, train=True, download=True, transform=transform)
    train_loader = DataLoader(train_data, batch_size=128, shuffle=True)
    # [2] Prepare dataloader.
    train_loader = ray.train.torch.prepare_data_loader(train_loader)

    # Training
    for epoch in range(10):
        # Reshuffle each worker's shard every epoch when training distributed.
        if ray.train.get_context().get_world_size() > 1:
            train_loader.sampler.set_epoch(epoch)

        for images, labels in train_loader:
            # This is done by `prepare_data_loader`!
            # images, labels = images.to("cuda"), labels.to("cuda")
            outputs = model(images)
            loss = criterion(outputs, labels)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        # [3] Report metrics and checkpoint.
        metrics = {"loss": loss.item(), "epoch": epoch}
        with tempfile.TemporaryDirectory() as temp_checkpoint_dir:
            # `prepare_model` wraps the model in DDP, so unwrap via `.module`.
            torch.save(
                model.module.state_dict(),
                os.path.join(temp_checkpoint_dir, "model.pt")
            )
            ray.train.report(
                metrics,
                checkpoint=ray.train.Checkpoint.from_directory(temp_checkpoint_dir),
            )
        if ray.train.get_context().get_world_rank() == 0:
            print(metrics)


# [4] Configure scaling and resource requirements.
scaling_config = ray.train.ScalingConfig(num_workers=2, use_gpu=True)

# [5] Launch distributed training job.
trainer = ray.train.torch.TorchTrainer(
    train_func,
    scaling_config=scaling_config,
    # [5a] If running in a multi-node cluster, this is where you
    # should configure the run's persistent storage that is accessible
    # across all worker nodes.
    # run_config=ray.train.RunConfig(storage_path="s3://..."),
)
result = trainer.fit()

# [6] Load the trained model.
with result.checkpoint.as_directory() as checkpoint_dir:
    model_state_dict = torch.load(os.path.join(checkpoint_dir, "model.pt"))
    model = resnet18(num_classes=10)
    model.conv1 = torch.nn.Conv2d(
        1, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False
    )
    model.load_state_dict(model_state_dict)
import os
import tempfile

import torch
from torch.nn import CrossEntropyLoss
from torch.optim import Adam
from torch.utils.data import DataLoader
from torchvision.models import resnet18
from torchvision.datasets import FashionMNIST
from torchvision.transforms import ToTensor, Normalize, Compose

# Model, Loss, Optimizer
model = resnet18(num_classes=10)
# FashionMNIST images are single-channel, so swap in a 1-channel stem conv.
model.conv1 = torch.nn.Conv2d(
    1, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False
)
model.to("cuda")
criterion = CrossEntropyLoss()
optimizer = Adam(model.parameters(), lr=0.001)

# Data
transform = Compose([ToTensor(), Normalize((0.28604,), (0.32025,))])
train_data = FashionMNIST(root='./data', train=True, download=True, transform=transform)
train_loader = DataLoader(train_data, batch_size=128, shuffle=True)

# Training
for epoch in range(10):
    for images, labels in train_loader:
        images, labels = images.to("cuda"), labels.to("cuda")
        outputs = model(images)
        loss = criterion(outputs, labels)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    # Log the last batch's loss and snapshot the weights each epoch.
    metrics = {"loss": loss.item(), "epoch": epoch}
    checkpoint_dir = tempfile.mkdtemp()
    checkpoint_path = os.path.join(checkpoint_dir, "model.pt")
    torch.save(model.state_dict(), checkpoint_path)
    print(metrics)
设置训练函数#
首先,更新您的训练代码以支持分布式训练,将您的代码封装在一个 训练函数 中
def train_func():
    """Your model training code here."""
    ...
每个分布式训练工作节点都会执行此函数。
您也可以通过 Trainer 的 train_loop_config 将 train_func 的输入参数指定为字典。例如
def train_func(config):
lr = config["lr"]
num_epochs = config["num_epochs"]
config = {"lr": 1e-4, "num_epochs": 10}
trainer = ray.train.torch.TorchTrainer(train_func, train_loop_config=config, ...)
警告
避免通过 train_loop_config 传递大型数据对象,以减少序列化和反序列化开销。相反,更推荐在 train_func 中直接初始化大型对象(例如数据集、模型)。
def load_dataset():
    """Return a large in-memory dataset."""
    ...


def load_model():
    """Return a large in-memory model instance."""
    ...
-config = {"data": load_dataset(), "model": load_model()}
def train_func(config):
- data = config["data"]
- model = config["model"]
+ data = load_dataset()
+ model = load_model()
...
trainer = ray.train.torch.TorchTrainer(train_func, train_loop_config=config, ...)
设置模型#
使用 ray.train.torch.prepare_model() 实用函数来
将您的模型移动到正确的设备。
将其封装在 DistributedDataParallel 中。
-from torch.nn.parallel import DistributedDataParallel
+import ray.train.torch
def train_func():
...
# Create model.
model = ...
# Set up distributed training and device placement.
- device_id = ... # Your logic to get the right device.
- model = model.to(device_id or "cpu")
- model = DistributedDataParallel(model, device_ids=[device_id])
+ model = ray.train.torch.prepare_model(model)
...
设置数据集#
使用 ray.train.torch.prepare_data_loader() 实用函数,该函数
将 DistributedSampler 添加到您的 DataLoader 中。将批次移动到正确的设备。
请注意,如果您将 Ray Data 传递给 Trainer,则此步骤不是必需的。请参阅 数据加载和预处理。
from torch.utils.data import DataLoader
+import ray.train.torch
def train_func():
...
dataset = ...
data_loader = DataLoader(dataset, batch_size=worker_batch_size, shuffle=True)
+ data_loader = ray.train.torch.prepare_data_loader(data_loader)
for epoch in range(10):
+ if ray.train.get_context().get_world_size() > 1:
+ data_loader.sampler.set_epoch(epoch)
for X, y in data_loader:
- X = X.to_device(device)
- y = y.to_device(device)
...
提示
请记住,DataLoader 接受一个 batch_size,即每个工作节点的批次大小。可以使用以下公式计算全局批次大小(反之亦然):
global_batch_size = worker_batch_size * ray.train.get_context().get_world_size()
注意
如果您已经使用 DistributedSampler 手动设置了 DataLoader,prepare_data_loader() 将不会添加另一个,并且将尊重现有采样器的配置。
注意
DistributedSampler 不适用于包装 IterableDataset 的 DataLoader。如果您想使用数据集迭代器,请考虑使用 Ray Data 而不是 PyTorch DataLoader,因为它为大规模数据集提供了高性能的流式数据摄入。
有关更多详细信息,请参阅 数据加载和预处理。
报告检查点和指标#
为了监控进度,您可以使用 ray.train.report() 实用函数来报告中间指标和检查点。
+import os
+import tempfile
+import ray.train
def train_func():
...
with tempfile.TemporaryDirectory() as temp_checkpoint_dir:
torch.save(
model.state_dict(), os.path.join(temp_checkpoint_dir, "model.pt")
)
+ metrics = {"loss": loss.item()} # Training/validation metrics.
# Build a Ray Train checkpoint from a directory
+ checkpoint = ray.train.Checkpoint.from_directory(temp_checkpoint_dir)
# Ray Train will automatically save the checkpoint to persistent storage,
# so the local `temp_checkpoint_dir` can be safely cleaned up after.
+ ray.train.report(metrics=metrics, checkpoint=checkpoint)
...
配置缩放和 GPU#
在训练函数之外,创建一个 ScalingConfig 对象来配置
num_workers- 分布式训练工作节点的数量。use_gpu- 每个工作节点是否应使用 GPU(或 CPU)。
from ray.train import ScalingConfig

# Two distributed training workers, each assigned a GPU.
scaling_config = ScalingConfig(num_workers=2, use_gpu=True)
有关更多详细信息,请参阅 配置缩放和 GPU。
配置持久存储#
创建一个 RunConfig 对象来指定结果(包括检查点和工件)将要保存的路径。
from ray.train import RunConfig

# Results are saved under <storage_path>/<name>; pick ONE of the options below.
# Local path (/some/local/path/unique_run_name)
run_config = RunConfig(storage_path="/some/local/path", name="unique_run_name")
# Shared cloud storage URI (s3://bucket/unique_run_name)
run_config = RunConfig(storage_path="s3://bucket", name="unique_run_name")
# Shared NFS path (/mnt/nfs/unique_run_name)
run_config = RunConfig(storage_path="/mnt/nfs", name="unique_run_name")
警告
指定一个*共享存储位置*(如云存储或 NFS)对于单节点集群是*可选的*,但对于多节点集群是*必需的*。使用本地路径将在多节点集群的检查点过程中*引发错误*。
有关更多详细信息,请参阅 配置持久存储。
启动训练作业#
将所有这些结合起来,您现在可以使用 TorchTrainer 启动分布式训练作业。
from ray.train.torch import TorchTrainer

# Combine the training function with the scaling and run configuration,
# then block until the distributed training job finishes.
trainer = TorchTrainer(
    train_func,
    scaling_config=scaling_config,
    run_config=run_config,
)
result = trainer.fit()
访问训练结果#
训练完成后,将返回一个 Result 对象,其中包含有关训练运行的信息,包括训练期间报告的指标和检查点。
# Fields available on the `Result` object returned by `trainer.fit()`:
result.metrics # The metrics reported during training.
result.checkpoint # The latest checkpoint reported during training.
result.path # The path where logs are stored.
result.error # The exception that was raised, if training failed.
有关更多用法示例,请参阅 检查训练结果。
下一步#
在您将 PyTorch 训练脚本转换为使用 Ray Train 后