使用 Horovod 开始分布式训练#
Ray Train 为您配置 Horovod 环境和 Rendezvous 服务器,以便您运行 DistributedOptimizer
训练脚本。更多信息请参阅 Horovod 文档。
快速入门#
import os
import tempfile
import horovod.torch as hvd
import ray
from ray import train
from ray.train import Checkpoint, ScalingConfig
import ray.train.torch # Need this to use `train.torch.get_device()`
from ray.train.horovod import HorovodTrainer
import torch
import torch.nn as nn
# If using GPUs, set this to True.
use_gpu = False
input_size = 1
layer_size = 15
output_size = 1
num_epochs = 3
class NeuralNetwork(nn.Module):
def __init__(self):
super(NeuralNetwork, self).__init__()
self.layer1 = nn.Linear(input_size, layer_size)
self.relu = nn.ReLU()
self.layer2 = nn.Linear(layer_size, output_size)
def forward(self, input):
return self.layer2(self.relu(self.layer1(input)))
def train_loop_per_worker():
hvd.init()
dataset_shard = train.get_dataset_shard("train")
model = NeuralNetwork()
device = train.torch.get_device()
model.to(device)
loss_fn = nn.MSELoss()
lr_scaler = 1
optimizer = torch.optim.SGD(model.parameters(), lr=0.1 * lr_scaler)
# Horovod: wrap optimizer with DistributedOptimizer.
optimizer = hvd.DistributedOptimizer(
optimizer,
named_parameters=model.named_parameters(),
op=hvd.Average,
)
for epoch in range(num_epochs):
model.train()
for batch in dataset_shard.iter_torch_batches(
batch_size=32, dtypes=torch.float
):
inputs, labels = torch.unsqueeze(batch["x"], 1), batch["y"]
outputs = model(inputs)
loss = loss_fn(outputs, labels)
optimizer.zero_grad()
loss.backward()
optimizer.step()
print(f"epoch: {epoch}, loss: {loss.item()}")
with tempfile.TemporaryDirectory() as tmpdir:
torch.save(model.state_dict(), os.path.join(tmpdir, "model.pt"))
train.report(
{"loss": loss.item()}, checkpoint=Checkpoint.from_directory(tmpdir)
)
train_dataset = ray.data.from_items([{"x": x, "y": x + 1} for x in range(32)])
scaling_config = ScalingConfig(num_workers=3, use_gpu=use_gpu)
trainer = HorovodTrainer(
train_loop_per_worker=train_loop_per_worker,
scaling_config=scaling_config,
datasets={"train": train_dataset},
)
result = trainer.fit()
更新您的训练函数#
首先,更新您的训练函数以支持分布式训练。
如果您已有训练函数可以与 Horovod Ray Executor 一起运行,则无需进行任何额外更改。
要开始使用 Horovod,请访问 Horovod 指南。
创建 HorovodTrainer#
Trainer
是 Ray Train 中用于管理状态和执行训练的主要类。对于 Horovod,请使用HorovodTrainer
,您可以像这样进行设置
from ray.train import ScalingConfig
from ray.train.horovod import HorovodTrainer
# For GPU Training, set `use_gpu` to True.
use_gpu = False
trainer = HorovodTrainer(
train_func,
scaling_config=ScalingConfig(use_gpu=use_gpu, num_workers=2)
)
使用 Horovod 进行训练时,无论使用何种训练框架(例如 PyTorch 或 TensorFlow),都应始终使用 HorovodTrainer。
要自定义后端设置,可以传递一个 HorovodConfig
from ray.train import ScalingConfig
from ray.train.horovod import HorovodTrainer, HorovodConfig
trainer = HorovodTrainer(
train_func,
tensorflow_backend=HorovodConfig(...),
scaling_config=ScalingConfig(num_workers=2),
)
有关更多配置选项,请参阅 DataParallelTrainer
API。
运行训练函数#
拥有分布式训练函数和 Ray Train Trainer
后,您现在就可以开始训练了。
trainer.fit()
进一步阅读#
Ray Train 的 HorovodTrainer
用自己的实现替换了原生库的分布式通信后端。因此,其余的集成点保持不变。如果您将 Horovod 与 PyTorch 或 Tensorflow 一起使用,请参阅相应的指南以获取更多配置和信息。
如果您在不使用任何训练库的情况下实现自己的基于 Horovod 的训练例程,请通读用户指南,因为您可以将大部分内容应用于通用用例并轻松进行调整。