使用 Horovod 开始分布式训练#

Ray Train 为您配置 Horovod 环境和 Rendezvous 服务器,以便您运行 DistributedOptimizer 训练脚本。更多信息请参阅 Horovod 文档

快速入门#

import os
import tempfile

import horovod.torch as hvd
import ray
from ray import train
from ray.train import Checkpoint, ScalingConfig
import ray.train.torch  # Need this to use `train.torch.get_device()`
from ray.train.horovod import HorovodTrainer
import torch
import torch.nn as nn

# If using GPUs, set this to True.
use_gpu = False


input_size = 1
layer_size = 15
output_size = 1
num_epochs = 3


class NeuralNetwork(nn.Module):
    def __init__(self):
        super(NeuralNetwork, self).__init__()
        self.layer1 = nn.Linear(input_size, layer_size)
        self.relu = nn.ReLU()
        self.layer2 = nn.Linear(layer_size, output_size)

    def forward(self, input):
        return self.layer2(self.relu(self.layer1(input)))


def train_loop_per_worker():
    hvd.init()
    dataset_shard = train.get_dataset_shard("train")
    model = NeuralNetwork()
    device = train.torch.get_device()
    model.to(device)
    loss_fn = nn.MSELoss()
    lr_scaler = 1
    optimizer = torch.optim.SGD(model.parameters(), lr=0.1 * lr_scaler)
    # Horovod: wrap optimizer with DistributedOptimizer.
    optimizer = hvd.DistributedOptimizer(
        optimizer,
        named_parameters=model.named_parameters(),
        op=hvd.Average,
    )
    for epoch in range(num_epochs):
        model.train()
        for batch in dataset_shard.iter_torch_batches(
            batch_size=32, dtypes=torch.float
        ):
            inputs, labels = torch.unsqueeze(batch["x"], 1), batch["y"]
            outputs = model(inputs)
            loss = loss_fn(outputs, labels)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            print(f"epoch: {epoch}, loss: {loss.item()}")

        with tempfile.TemporaryDirectory() as tmpdir:
            torch.save(model.state_dict(), os.path.join(tmpdir, "model.pt"))
            train.report(
                {"loss": loss.item()}, checkpoint=Checkpoint.from_directory(tmpdir)
            )


train_dataset = ray.data.from_items([{"x": x, "y": x + 1} for x in range(32)])
scaling_config = ScalingConfig(num_workers=3, use_gpu=use_gpu)
trainer = HorovodTrainer(
    train_loop_per_worker=train_loop_per_worker,
    scaling_config=scaling_config,
    datasets={"train": train_dataset},
)
result = trainer.fit()

更新您的训练函数#

首先,更新您的训练函数以支持分布式训练。

如果您已有训练函数可以与 Horovod Ray Executor 一起运行,则无需进行任何额外更改。

要开始使用 Horovod,请访问 Horovod 指南

创建 HorovodTrainer#

Trainer 是 Ray Train 中用于管理状态和执行训练的主要类。对于 Horovod,请使用HorovodTrainer,您可以像这样进行设置

from ray.train import ScalingConfig
from ray.train.horovod import HorovodTrainer
# For GPU Training, set `use_gpu` to True.
use_gpu = False
trainer = HorovodTrainer(
    train_func,
    scaling_config=ScalingConfig(use_gpu=use_gpu, num_workers=2)
)

使用 Horovod 进行训练时,无论使用何种训练框架(例如 PyTorch 或 TensorFlow),都应始终使用 HorovodTrainer。

要自定义后端设置,可以传递一个 HorovodConfig

from ray.train import ScalingConfig
from ray.train.horovod import HorovodTrainer, HorovodConfig

trainer = HorovodTrainer(
    train_func,
    tensorflow_backend=HorovodConfig(...),
    scaling_config=ScalingConfig(num_workers=2),
)

有关更多配置选项,请参阅 DataParallelTrainer API。

运行训练函数#

拥有分布式训练函数和 Ray Train Trainer 后,您现在就可以开始训练了。

trainer.fit()

进一步阅读#

Ray Train 的 HorovodTrainer 用自己的实现替换了原生库的分布式通信后端。因此,其余的集成点保持不变。如果您将 Horovod 与 PyTorchTensorflow 一起使用,请参阅相应的指南以获取更多配置和信息。

如果您在不使用任何训练库的情况下实现自己的基于 Horovod 的训练例程,请通读用户指南,因为您可以将大部分内容应用于通用用例并轻松进行调整。