使用 PyTorch 开始分布式训练#

本教程将引导您完成将现有 PyTorch 脚本转换为使用 Ray Train 的过程。

了解如何

  1. 配置模型以分布式运行并运行在正确的 CPU/GPU 设备上。

  2. 配置数据加载器以将数据分片到 workers 并将数据放置在正确的 CPU 或 GPU 设备上。

  3. 配置一个 训练函数 来报告指标和保存检查点。

  4. 配置训练作业的 规模 以及 CPU 或 GPU 资源需求。

  5. 使用 TorchTrainer 类启动一个分布式训练作业。

快速入门#

作为参考,最终代码将看起来像下面这样

from ray.train.torch import TorchTrainer
from ray.train import ScalingConfig

def train_func():
    # Your PyTorch training code here.
    ...

scaling_config = ScalingConfig(num_workers=2, use_gpu=True)
trainer = TorchTrainer(train_func, scaling_config=scaling_config)
result = trainer.fit()
  1. train_func 是在每个分布式训练 worker 上执行的 Python 代码。

  2. ScalingConfig 定义了分布式训练 worker 的数量以及是否使用 GPU。

  3. TorchTrainer 启动分布式训练作业。

比较使用和不使用 Ray Train 的 PyTorch 训练脚本。

import os
import tempfile

import torch
from torch.nn import CrossEntropyLoss
from torch.optim import Adam
from torch.utils.data import DataLoader
from torchvision.models import resnet18
from torchvision.datasets import FashionMNIST
from torchvision.transforms import ToTensor, Normalize, Compose

# Model, Loss, Optimizer
model = resnet18(num_classes=10)
model.conv1 = torch.nn.Conv2d(
    1, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False
)
model.to("cuda")
criterion = CrossEntropyLoss()
optimizer = Adam(model.parameters(), lr=0.001)

# Data
transform = Compose([ToTensor(), Normalize((0.28604,), (0.32025,))])
train_data = FashionMNIST(root='./data', train=True, download=True, transform=transform)
train_loader = DataLoader(train_data, batch_size=128, shuffle=True)

# Training
for epoch in range(10):
    for images, labels in train_loader:
        images, labels = images.to("cuda"), labels.to("cuda")
        outputs = model(images)
        loss = criterion(outputs, labels)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    metrics = {"loss": loss.item(), "epoch": epoch}
    checkpoint_dir = tempfile.mkdtemp()
    checkpoint_path = os.path.join(checkpoint_dir, "model.pt")
    torch.save(model.state_dict(), checkpoint_path)
    print(metrics)
import os
import tempfile

import torch
from torch.nn import CrossEntropyLoss
from torch.optim import Adam
from torch.utils.data import DataLoader
from torchvision.models import resnet18
from torchvision.datasets import FashionMNIST
from torchvision.transforms import ToTensor, Normalize, Compose

import ray.train.torch

def train_func():
    # Model, Loss, Optimizer
    model = resnet18(num_classes=10)
    model.conv1 = torch.nn.Conv2d(
        1, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False
    )
    # [1] Prepare model.
    model = ray.train.torch.prepare_model(model)
    # model.to("cuda")  # This is done by `prepare_model`
    criterion = CrossEntropyLoss()
    optimizer = Adam(model.parameters(), lr=0.001)

    # Data
    transform = Compose([ToTensor(), Normalize((0.28604,), (0.32025,))])
    data_dir = os.path.join(tempfile.gettempdir(), "data")
    train_data = FashionMNIST(root=data_dir, train=True, download=True, transform=transform)
    train_loader = DataLoader(train_data, batch_size=128, shuffle=True)
    # [2] Prepare dataloader.
    train_loader = ray.train.torch.prepare_data_loader(train_loader)

    # Training
    for epoch in range(10):
        if ray.train.get_context().get_world_size() > 1:
            train_loader.sampler.set_epoch(epoch)

        for images, labels in train_loader:
            # This is done by `prepare_data_loader`!
            # images, labels = images.to("cuda"), labels.to("cuda")
            outputs = model(images)
            loss = criterion(outputs, labels)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        # [3] Report metrics and checkpoint.
        metrics = {"loss": loss.item(), "epoch": epoch}
        with tempfile.TemporaryDirectory() as temp_checkpoint_dir:
            torch.save(
                model.module.state_dict(),
                os.path.join(temp_checkpoint_dir, "model.pt")
            )
            ray.train.report(
                metrics,
                checkpoint=ray.train.Checkpoint.from_directory(temp_checkpoint_dir),
            )
        if ray.train.get_context().get_world_rank() == 0:
            print(metrics)

# [4] Configure scaling and resource requirements.
scaling_config = ray.train.ScalingConfig(num_workers=2, use_gpu=True)

# [5] Launch distributed training job.
trainer = ray.train.torch.TorchTrainer(
    train_func,
    scaling_config=scaling_config,
    # [5a] If running in a multi-node cluster, this is where you
    # should configure the run's persistent storage that is accessible
    # across all worker nodes.
    # run_config=ray.train.RunConfig(storage_path="s3://..."),
)
result = trainer.fit()

# [6] Load the trained model.
with result.checkpoint.as_directory() as checkpoint_dir:
    model_state_dict = torch.load(os.path.join(checkpoint_dir, "model.pt"))
    model = resnet18(num_classes=10)
    model.conv1 = torch.nn.Conv2d(
        1, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False
    )
    model.load_state_dict(model_state_dict)

设置训练函数#

首先,更新您的训练代码以支持分布式训练。首先将您的代码包装在一个 训练函数

def train_func():
    # Your model training code here.
    ...

每个分布式训练 worker 都执行此函数。

您还可以通过 Trainer 的 train_loop_configtrain_func 的输入参数指定为一个字典。例如

def train_func(config):
    lr = config["lr"]
    num_epochs = config["num_epochs"]

config = {"lr": 1e-4, "num_epochs": 10}
trainer = ray.train.torch.TorchTrainer(train_func, train_loop_config=config, ...)

警告

避免通过 train_loop_config 传递大型数据对象,以减少序列化和反序列化开销。相反,更推荐直接在 train_func 中初始化大型对象(例如数据集、模型)。

 def load_dataset():
     # Return a large in-memory dataset
     ...

 def load_model():
     # Return a large in-memory model instance
     ...

-config = {"data": load_dataset(), "model": load_model()}

 def train_func(config):
-    data = config["data"]
-    model = config["model"]

+    data = load_dataset()
+    model = load_model()
     ...

 trainer = ray.train.torch.TorchTrainer(train_func, train_loop_config=config, ...)

设置模型#

使用 ray.train.torch.prepare_model() 实用函数来

  1. 将模型移动到正确的设备。

  2. 将其包装在 DistributedDataParallel 中。

-from torch.nn.parallel import DistributedDataParallel
+import ray.train.torch

 def train_func():

     ...

     # Create model.
     model = ...

     # Set up distributed training and device placement.
-    device_id = ... # Your logic to get the right device.
-    model = model.to(device_id or "cpu")
-    model = DistributedDataParallel(model, device_ids=[device_id])
+    model = ray.train.torch.prepare_model(model)

     ...

设置数据集#

使用 ray.train.torch.prepare_data_loader() 实用函数,它会

  1. 为您的 DataLoader 添加一个 DistributedSampler

  2. 将批次移动到正确的设备。

请注意,如果您将 Ray Data 传递给 Trainer,则不需要执行此步骤。参见数据加载和预处理

 from torch.utils.data import DataLoader
+import ray.train.torch

 def train_func():

     ...

     dataset = ...

     data_loader = DataLoader(dataset, batch_size=worker_batch_size, shuffle=True)
+    data_loader = ray.train.torch.prepare_data_loader(data_loader)

     for epoch in range(10):
+        if ray.train.get_context().get_world_size() > 1:
+            data_loader.sampler.set_epoch(epoch)

         for X, y in data_loader:
-            X = X.to_device(device)
-            y = y.to_device(device)

     ...

提示

请记住,DataLoader 接收一个 batch_size,这是每个 worker 的批量大小。可以使用以下公式根据 worker 批量大小(反之亦然)计算全局批量大小

global_batch_size = worker_batch_size * ray.train.get_context().get_world_size()

注意

如果您已经手动使用 DistributedSampler 设置了您的 DataLoaderprepare_data_loader() 不会再添加一个,并且会遵循现有采样器的配置。

注意

DistributedSampler 不适用于包装了 IterableDatasetDataLoader。如果您想使用数据集迭代器,可以考虑使用 Ray Data 代替 PyTorch DataLoader,因为它为大型数据集提供了高性能的流式数据摄取。

有关更多详细信息,请参阅数据加载和预处理

报告检查点和指标#

为了监控进度,您可以使用 ray.train.report() 实用函数报告中间指标和检查点。

+import os
+import tempfile

+import ray.train

 def train_func():

     ...

     with tempfile.TemporaryDirectory() as temp_checkpoint_dir:
        torch.save(
            model.state_dict(), os.path.join(temp_checkpoint_dir, "model.pt")
        )

+       metrics = {"loss": loss.item()}  # Training/validation metrics.

        # Build a Ray Train checkpoint from a directory
+       checkpoint = ray.train.Checkpoint.from_directory(temp_checkpoint_dir)

        # Ray Train will automatically save the checkpoint to persistent storage,
        # so the local `temp_checkpoint_dir` can be safely cleaned up after.
+       ray.train.report(metrics=metrics, checkpoint=checkpoint)

     ...

有关更多详细信息,请参阅监控和记录指标以及保存和加载检查点

配置规模和 GPU#

在您的训练函数之外,创建一个 ScalingConfig 对象来配置

  1. num_workers - 分布式训练 worker 进程的数量。

  2. use_gpu - 每个 worker 是否应该使用 GPU(或 CPU)。

from ray.train import ScalingConfig
scaling_config = ScalingConfig(num_workers=2, use_gpu=True)

有关更多详细信息,请参阅配置规模和 GPU

配置持久化存储#

创建一个 RunConfig 对象,指定保存结果(包括检查点和 artifact)的路径。

from ray.train import RunConfig

# Local path (/some/local/path/unique_run_name)
run_config = RunConfig(storage_path="/some/local/path", name="unique_run_name")

# Shared cloud storage URI (s3://bucket/unique_run_name)
run_config = RunConfig(storage_path="s3://bucket", name="unique_run_name")

# Shared NFS path (/mnt/nfs/unique_run_name)
run_config = RunConfig(storage_path="/mnt/nfs", name="unique_run_name")

警告

为单节点集群指定共享存储位置(例如云存储或 NFS)是可选的,但对于多节点集群是必需的。 对于多节点集群,使用本地路径将在检查点时引发错误

有关更多详细信息,请参阅配置持久化存储

启动训练作业#

将这一切结合起来,您现在可以使用 TorchTrainer 启动一个分布式训练作业。

from ray.train.torch import TorchTrainer

trainer = TorchTrainer(
    train_func, scaling_config=scaling_config, run_config=run_config
)
result = trainer.fit()

访问训练结果#

训练完成后,将返回一个 Result 对象,其中包含有关训练运行的信息,包括训练期间报告的指标和检查点。

result.metrics     # The metrics reported during training.
result.checkpoint  # The latest checkpoint reported during training.
result.path        # The path where logs are stored.
result.error       # The exception that was raised, if training failed.

有关更多使用示例,请参阅检查训练结果

下一步#

将 PyTorch 训练脚本转换为使用 Ray Train 后

  • 请参阅用户指南,了解如何执行特定任务。

  • 浏览示例,获取 Ray Train 的端到端使用示例。

  • 深入了解API 参考,了解本教程中使用的类和方法的更多详细信息。