本地模式#

重要提示

本用户指南仅展示如何使用 Ray Train V2 的本地模式。有关从 Ray Train V1 迁移到 V2 的信息,请参阅 Train V2 迁移指南: ray-project/ray#49454

什么是本地模式?#

Ray Train 中的本地模式会在不启动 Ray Train worker actor 的情况下运行您的训练函数。本地模式不会将您的训练代码分发到多个 Ray actor,而是直接在当前进程中执行您的训练函数。这提供了一个简化的调试环境,让您可以快速迭代您的训练逻辑。

本地模式支持两种执行模式

  • 单进程模式:在单个进程中运行您的训练函数,非常适合快速迭代和调试。

  • torchrun 多进程模式:启动多个进程以实现多 GPU 训练,有助于使用熟悉工具调试分布式训练逻辑。

如何启用本地模式#

您可以通过在您的 ScalingConfig 中设置 num_workers=0 来启用本地模式。

from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer

def train_func(config):
    # Your training logic
    pass

trainer = TorchTrainer(
    train_loop_per_worker=train_func,
    scaling_config=ScalingConfig(num_workers=0),
)
result = trainer.fit()

本地模式提供与您在分布式训练中使用的 ray.train API 相同的接口,因此您的训练代码无需任何其他修改即可运行。这使得在扩展到分布式训练之前,可以在本地轻松验证您的训练逻辑。

何时使用本地模式#

使用单进程本地模式来

  • 快速开发和迭代:在本地测试对训练函数的更改。

  • 编写单元测试:在简化环境中验证您的训练逻辑是否正常工作。

  • 调试训练逻辑:使用标准的 Python 调试工具逐行检查您的训练代码并识别问题。

使用 torchrun 的多进程本地模式来

  • 测试多 GPU 逻辑:使用熟悉的 torchrun 命令,验证您的分布式训练代码是否能在多个 GPU 上正确运行。

  • 迁移现有代码:将现有的基于 torchrun 的训练脚本引入 Ray Train,同时保留您的开发工作流程。

  • 调试分布式行为:使用 torchrun 的进程管理来隔离分布式训练逻辑中的问题。

注意

在本地模式下,Ray Train 不会启动 worker actor,但您的训练代码仍可使用其他 Ray 功能,例如 Ray Data(在单进程模式下)或在需要时启动 Ray actor。

单进程本地模式#

以下示例展示了如何使用 PyTorch 的单进程本地模式。

import torch
from torch import nn
import ray
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer

def train_func(config):
    model = nn.Linear(10, 1)
    optimizer = torch.optim.SGD(model.parameters(), lr=config["lr"])

    for epoch in range(config["epochs"]):
        # Training loop
        loss = model(torch.randn(32, 10)).sum()
        loss.backward()
        optimizer.step()

        # Report metrics
        ray.train.report({"loss": loss.item()})

trainer = TorchTrainer(
    train_loop_per_worker=train_func,
    train_loop_config={"lr": 0.01, "epochs": 3},
    scaling_config=ScalingConfig(num_workers=0),
)
result = trainer.fit()
print(f"Final loss: {result.metrics['loss']}")

注意

本地模式与所有 Ray Train 框架集成兼容,包括 PyTorch Lightning、Hugging Face Transformers、LightGBM、XGBoost、TensorFlow 等。

使用本地模式进行测试#

以下示例展示了如何使用本地模式编写单元测试。

import pytest
import ray
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer

def test_training_runs():
    def train_func(config):
        # Report minimal training result
        ray.train.report({"loss": 0.5})

    trainer = TorchTrainer(
        train_loop_per_worker=train_func,
        scaling_config=ScalingConfig(num_workers=0),
    )
    result = trainer.fit()

    assert result.error is None
    assert result.metrics["loss"] == 0.5

将本地模式与 Ray Data 结合使用#

单进程本地模式可与 Ray Data 无缝配合,用于数据加载和预处理。当您在本地模式下使用 Ray Data 时,Ray Data 会处理您的数据,并将其返回到本地进程中的训练函数。

以下示例展示了如何将 Ray Data 与单进程本地模式结合使用。

import ray
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer

def train_func(config):
    # Get the dataset shard
    train_dataset = ray.train.get_dataset_shard("train")

    # Iterate over batches
    for batch in train_dataset.iter_batches(batch_size=32):
        # Training logic
        pass

# Create a Ray Dataset
dataset = ray.data.read_csv("s3://bucket/data.csv")

trainer = TorchTrainer(
    train_loop_per_worker=train_func,
    scaling_config=ScalingConfig(num_workers=0),
    datasets={"train": dataset},
)
result = trainer.fit()

警告

在使用 torchrun 进行本地模式下的多进程训练时,Ray Data 不受支持。对于多进程训练,请使用标准的 PyTorch 数据加载机制,例如带有 DistributedSampler 的 DataLoader。

使用 torchrun 的多进程本地模式#

本地模式通过 torchrun 支持多 GPU 训练,让您可以使用 torchrun 的进程管理进行开发和调试。

单节点多 GPU 训练#

以下示例展示了如何在单节点上为多 GPU 训练使用 torchrun 和本地模式。当迁移现有的 PyTorch 训练代码或希望使用 torchrun 熟悉的进程管理来调试分布式训练逻辑时,此方法很有用。该示例使用标准的 PyTorch DataLoader 进行数据加载,便于您适应现有的 PyTorch 训练代码。

首先,创建您的训练脚本(train_script.py

import os
import tempfile
import torch
import torch.distributed as dist
from torch import nn
from torch.utils.data import DataLoader
from torchvision.datasets import FashionMNIST
from torchvision.transforms import ToTensor, Normalize, Compose
from filelock import FileLock
import ray
from ray.train import Checkpoint, ScalingConfig, get_context
from ray.train.torch import TorchTrainer

def train_func(config):
    # Load dataset with file locking to avoid multiple downloads
    transform = Compose([ToTensor(), Normalize((0.5,), (0.5,))])
    data_dir = "./data"
    # Only local rank 0 downloads the dataset
    local_rank = get_context().get_local_rank()
    if local_rank == 0:
        with FileLock(os.path.join(data_dir, "fashionmnist.lock")):
            train_dataset = FashionMNIST(
                root=data_dir, train=True, download=True, transform=transform
            )

    # Wait for rank 0 to finish downloading
    dist.barrier()

    # Now all ranks can safely load the dataset
    train_dataset = FashionMNIST(
        root=data_dir, train=True, download=False, transform=transform
    )
    train_loader = DataLoader(
        train_dataset, batch_size=config["batch_size"], shuffle=True
    )

    # Prepare dataloader for distributed training
    train_loader = ray.train.torch.prepare_data_loader(train_loader)

    # Prepare model for distributed training
    model = nn.Sequential(
        nn.Flatten(),
        nn.Linear(28 * 28, 128),
        nn.ReLU(),
        nn.Linear(128, 10)
    )
    model = ray.train.torch.prepare_model(model)

    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=config["lr"])

    # Training loop
    for epoch in range(config["epochs"]):
        # Set epoch for distributed sampler
        if ray.train.get_context().get_world_size() > 1:
            train_loader.sampler.set_epoch(epoch)

        epoch_loss = 0.0
        for batch_idx, (images, labels) in enumerate(train_loader):
            outputs = model(images)
            loss = criterion(outputs, labels)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            epoch_loss += loss.item()

        avg_loss = epoch_loss / len(train_loader)

        # Report metrics and checkpoint
        with tempfile.TemporaryDirectory() as temp_dir:
            torch.save(model.state_dict(), os.path.join(temp_dir, "model.pt"))
            ray.train.report(
                {"loss": avg_loss, "epoch": epoch},
                checkpoint=Checkpoint.from_directory(temp_dir)
            )

# Configure trainer for local mode
trainer = TorchTrainer(
    train_loop_per_worker=train_func,
    train_loop_config={"lr": 0.001, "epochs": 10, "batch_size": 32},
    scaling_config=ScalingConfig(num_workers=0, use_gpu=True),
)
result = trainer.fit()

然后,使用 torchrun 启动训练

# Train on 4 GPUs on a single node
torchrun --nproc-per-node=4 train_script.py

Ray Train 会自动检测 torchrun 环境变量并相应地配置分布式训练。您可以通过 ray.train.get_context() 访问分布式训练信息。

from ray.train import get_context

context = get_context()
print(f"World size: {context.get_world_size()}")
print(f"World rank: {context.get_world_rank()}")
print(f"Local rank: {context.get_local_rank()}")

警告

在使用 torchrun 进行本地模式下的多进程训练时,Ray Data 不受支持。对于多进程训练,请使用标准的 PyTorch 数据加载机制,例如带有 DistributedSampler 的 DataLoader。

多节点多 GPU 训练#

您还可以使用 torchrun 通过本地模式启动多节点训练。以下示例展示了如何跨 2 个节点(每个节点有 4 个 GPU)启动训练。

在主节点(192.168.1.1)上

RAY_TRAIN_V2_ENABLED=1 torchrun \
    --nnodes=2 \
    --nproc-per-node=4 \
    --node_rank=0 \
    --rdzv_backend=c10d \
    --rdzv_endpoint=192.168.1.1:29500 \
    --rdzv_id=job_id \
    train_script.py

在工作节点上

RAY_TRAIN_V2_ENABLED=1 torchrun \
    --nnodes=2 \
    --nproc-per-node=4 \
    --node_rank=1 \
    --rdzv_backend=c10d \
    --rdzv_endpoint=192.168.1.1:29500 \
    --rdzv_id=job_id \
    train_script.py

从本地模式过渡到分布式训练#

当您准备好从本地模式扩展到分布式训练时,只需将 num_workers 更改为大于 0 的值即可。

 trainer = TorchTrainer(
     train_loop_per_worker=train_func,
     train_loop_config=config,
-    scaling_config=ScalingConfig(num_workers=0),
+    scaling_config=ScalingConfig(num_workers=4, use_gpu=True),
 )

您的训练函数代码保持不变,Ray Train 会自动处理分布式协调。

限制和 API 差异#

本地模式提供了 Ray Train API 的简化实现,以便在没有分布式协调的情况下进行快速调试。但是,这意味着某些功能的行为方式不同或不可用。

本地模式下不可用的功能#

以下 Ray Train 功能在本地模式下不可用:

  • Worker 级容错:Ray Train 的自动容错功能(如 worker 故障时重启)不可用。如果您配置了 FailureConfig,这些设置在本地模式下不适用。

  • Callbacks:在 RunConfig 中指定的自定义回调函数在本地模式下不会被调用。

  • 带多进程训练的 Ray Data:在使用 torchrun 进行本地模式下的多进程训练时,Ray Data 不受支持。请改用标准的 PyTorch 数据加载机制。

API 行为差异#

下表总结了 ray.train API 在本地模式下的行为差异。

API

本地模式下的行为

ray.train.report()

仅在内存中存储 checkpoint(不持久化到存储)。忽略 checkpoint_upload_modecheckpoint_upload_fnvalidate_fndelete_local_checkpoint_after_upload 参数。本地记录指标,而不是通过报告管道。不调用 worker 之间的同步屏障。

ray.train.get_checkpoint()

返回内存中的最后一个 checkpoint。不从持久化存储加载 checkpoint。

ray.train.get_all_reported_checkpoints()

始终返回空列表。不跟踪 checkpoint 历史记录。

ray.train.collective.barrier()

无操作。

ray.train.collective.broadcast_from_rank_zero()

按原样返回数据。

ray.train.get_context().get_storage()

引发 NotImplementedError