处理失败和节点抢占(已弃用的 API)#

重要提示

本用户指南涵盖了已弃用的容错 API。请参阅 处理失败和节点抢占 获取新的 API 用户指南。

有关弃用和迁移的信息,请参阅 此处

自动从训练工作节点故障中恢复#

Ray Train 内置了容错机制,可以从工作节点故障(即 RayActorError)中恢复。检测到故障时,工作节点将被关闭,并添加新的工作节点。

训练函数将被重新启动,但可以通过检查点恢复之前的执行进度。

提示

为了在恢复时保留进度,您的训练函数必须实现 保存加载检查点 的逻辑。

每次从工作节点故障中恢复都被视为一次重试。重试次数可以通过传递给 TrainerRunConfig 中设置的 FailureConfig 参数的 max_failures 属性进行配置。

import ray.train

# Tries to recover a run up to this many times.
failure_config = ray.train.FailureConfig(max_failures=2)

# No limit on the number of retries.
failure_config = ray.train.FailureConfig(max_failures=-1)

将恢复哪个检查点?#

Ray Train 将自动从报告给 Ray Train 的最新可用 检查点 恢复训练。

这将是传递给 train.report() 的最后一个检查点。

恢复 Ray Train 实验#

在实验级别,Trainer 恢复允许您从中断处继续之前中断的实验。

Train 实验可能由于以下原因之一而被中断

  • 实验被手动中断(例如,Ctrl+C,或被抢占的头部节点实例)。

  • 头部节点崩溃(例如,OOM 或其他运行时错误)。

  • 整个集群宕机(例如,影响所有节点的网络错误)。

Trainer 恢复适用于 Ray Train 的所有内置 Trainer,但我们在示例中使用 TorchTrainer 进行演示。我们还使用 <Framework>Trainer 来指代所有内置 Trainer 共享的方法。

假设您的初始 Train 实验配置如下。实际的训练循环仅用于演示:重要的是已实现了 保存加载检查点

import os
import tempfile
from typing import Dict, Optional

import torch

import ray
from ray import train
from ray.train import Checkpoint
from ray.train.torch import TorchTrainer


def get_datasets() -> Dict[str, ray.data.Dataset]:
    return {"train": ray.data.from_items([{"x": i, "y": 2 * i} for i in range(10)])}


def train_loop_per_worker(config: dict):
    from torchvision.models import resnet18

    model = resnet18()

    # Checkpoint loading
    checkpoint: Optional[Checkpoint] = train.get_checkpoint()
    if checkpoint:
        with checkpoint.as_directory() as checkpoint_dir:
            model_state_dict = torch.load(os.path.join(checkpoint_dir, "model.pt"))
            model.load_state_dict(model_state_dict)

    model = train.torch.prepare_model(model)

    train_ds = train.get_dataset_shard("train")

    for epoch in range(5):
        # Do some training...

        # Checkpoint saving
        with tempfile.TemporaryDirectory() as tmpdir:
            torch.save(model.module.state_dict(), os.path.join(tmpdir, "model.pt"))
            train.report({"epoch": epoch}, checkpoint=Checkpoint.from_directory(tmpdir))


trainer = TorchTrainer(
    train_loop_per_worker=train_loop_per_worker,
    datasets=get_datasets(),
    scaling_config=train.ScalingConfig(num_workers=2),
    run_config=train.RunConfig(
        name="dl_trainer_restore", storage_path=os.path.expanduser("~/ray_results")
    ),
)
result = trainer.fit()

实验的结果和检查点会保存到 RunConfig 配置的路径中。如果实验因上述原因之一中断,请使用此路径进行恢复。

from ray.train.torch import TorchTrainer

restored_trainer = TorchTrainer.restore(
    path=os.path.expanduser("~/ray_results/dl_trainer_restore"),
    datasets=get_datasets(),
)

提示

您也可以从远程路径恢复(例如,从存储在 s3 存储桶中的实验目录)。

original_trainer = TorchTrainer(
    # ...
    run_config=train.RunConfig(
        # Configure cloud storage
        storage_path="s3://results-bucket",
        name="dl_trainer_restore",
    ),
)
result = trainer.fit()
restored_trainer = TorchTrainer.restore(
    "s3://results-bucket/dl_trainer_restore",
    datasets=get_datasets(),
)

注意

不同的 Trainer 在恢复时可能允许重新指定更多可选参数。只有 **数据集** 是在恢复时必需重新指定的,前提是它们在最初时已提供。

TorchTrainer.restoreTensorflowTrainer.restoreHorovodTrainer.restore 可以接受与其父类 DataParallelTrainer.restore 相同的参数。

除非另有说明,否则其他 Trainer 将接受与 BaseTrainer.restore 相同的参数。

自动恢复#

添加下面的分支逻辑将允许您在中断后运行相同的脚本,并从上次运行中断的地方继续训练。请注意,我们使用 <Framework>Trainer.can_restore 实用方法来确定给定实验目录的存在性和有效性。

experiment_path = os.path.expanduser("~/ray_results/dl_restore_autoresume")
if TorchTrainer.can_restore(experiment_path):
    trainer = TorchTrainer.restore(experiment_path, datasets=get_datasets())
    result = trainer.fit()
else:
    trainer = TorchTrainer(
        train_loop_per_worker=train_loop_per_worker,
        datasets=get_datasets(),
        scaling_config=train.ScalingConfig(num_workers=2),
        run_config=train.RunConfig(
            storage_path=os.path.expanduser("~/ray_results"),
            name="dl_restore_autoresume",
        ),
    )
result = trainer.fit()

另请参阅

有关完整示例,请参阅 BaseTrainer.restore 的文档字符串。

注意

<Framework>Trainer.restore<Framework>Trainer(..., resume_from_checkpoint=...) 不同。 resume_from_checkpoint 用于启动一个**新**的 Train 实验,该实验会将结果写入新目录并从迭代 0 开始。

<Framework>Trainer.restore 用于继续现有实验,新的结果将继续追加到现有日志中。