处理故障和节点抢占(已弃用 API)#

重要

本用户指南涵盖已弃用的容错 API。请参阅处理故障和节点抢占以获取新 API 用户指南。

请参阅此处获取有关弃用和迁移的信息。

从 Train Worker 故障中自动恢复#

Ray Train 内置了容错机制,可从 worker 故障(即 RayActorError)中恢复。检测到故障时,worker 将被关闭并添加新的 worker。

训练函数将重新启动,但可以通过检查点恢复先前执行的进度。

提示

为了在恢复时保留进度,您的训练函数必须同时实现保存加载检查点的逻辑。

从 worker 故障中恢复的每个实例都被视为一次重试。重试次数可通过传递给 TrainerRunConfig 参数集中 FailureConfigmax_failures 属性进行配置。

import ray.train

# Tries to recover a run up to this many times.
failure_config = ray.train.FailureConfig(max_failures=2)

# No limit on the number of retries.
failure_config = ray.train.FailureConfig(max_failures=-1)

将恢复哪个检查点?#

Ray Train 将自动从报告给 Ray Train 的最新可用检查点恢复训练。

这将是传递给 train.report() 的最后一个检查点。

恢复 Ray Train 实验#

在实验层面,Trainer 恢复允许您从先前中断的实验中断处恢复。

Train 实验可能由于以下原因之一中断

  • 实验被手动中断(例如,Ctrl+C,或被抢占的头节点实例)。

  • 头节点崩溃(例如,OOM 或其他运行时错误)。

  • 整个集群宕机(例如,影响所有节点的网络错误)。

Ray Train 的所有内置 trainer 都可能进行 Trainer 恢复,但在示例中我们使用 TorchTrainer 进行演示。我们也使用 <Framework>Trainer 来指代所有内置 trainer 共享的方法。

假设您的初始 Train 实验配置如下。实际训练循环仅用于演示:重要细节是已经实现了保存加载检查点的逻辑。

import os
import tempfile
from typing import Dict, Optional

import torch

import ray
from ray import train
from ray.train import Checkpoint
from ray.train.torch import TorchTrainer


def get_datasets() -> Dict[str, ray.data.Dataset]:
    return {"train": ray.data.from_items([{"x": i, "y": 2 * i} for i in range(10)])}


def train_loop_per_worker(config: dict):
    from torchvision.models import resnet18

    model = resnet18()

    # Checkpoint loading
    checkpoint: Optional[Checkpoint] = train.get_checkpoint()
    if checkpoint:
        with checkpoint.as_directory() as checkpoint_dir:
            model_state_dict = torch.load(os.path.join(checkpoint_dir, "model.pt"))
            model.load_state_dict(model_state_dict)

    model = train.torch.prepare_model(model)

    train_ds = train.get_dataset_shard("train")

    for epoch in range(5):
        # Do some training...

        # Checkpoint saving
        with tempfile.TemporaryDirectory() as tmpdir:
            torch.save(model.module.state_dict(), os.path.join(tmpdir, "model.pt"))
            train.report({"epoch": epoch}, checkpoint=Checkpoint.from_directory(tmpdir))


trainer = TorchTrainer(
    train_loop_per_worker=train_loop_per_worker,
    datasets=get_datasets(),
    scaling_config=train.ScalingConfig(num_workers=2),
    run_config=train.RunConfig(
        name="dl_trainer_restore", storage_path=os.path.expanduser("~/ray_results")
    ),
)
result = trainer.fit()

实验结果和检查点保存到由 RunConfig 配置的路径。如果实验因上述原因之一中断,请使用此路径恢复

from ray.train.torch import TorchTrainer

restored_trainer = TorchTrainer.restore(
    path=os.path.expanduser("~/ray_results/dl_trainer_restore"),
    datasets=get_datasets(),
)

提示

您还可以从远程路径恢复(例如,从存储在 s3 存储桶中的实验目录)。

original_trainer = TorchTrainer(
    # ...
    run_config=train.RunConfig(
        # Configure cloud storage
        storage_path="s3://results-bucket",
        name="dl_trainer_restore",
    ),
)
result = trainer.fit()
restored_trainer = TorchTrainer.restore(
    "s3://results-bucket/dl_trainer_restore",
    datasets=get_datasets(),
)

注意

不同的 trainer 可以在恢复时选择重新指定更多参数。如果最初提供了数据集,则只有数据集需要在恢复时重新指定。

TorchTrainer.restoreTensorflowTrainer.restoreHorovodTrainer.restore 可以接受与其父类 DataParallelTrainer.restore 相同的参数。

除非另有说明,否则其他 trainer 将接受与 BaseTrainer.restore 相同的参数。

自动恢复#

添加下面的分支逻辑将允许您在中断后运行相同的脚本,从上次运行中断的地方继续训练。请注意,我们使用 <Framework>Trainer.can_restore 工具方法来确定给定实验目录的存在和有效性。

experiment_path = os.path.expanduser("~/ray_results/dl_restore_autoresume")
if TorchTrainer.can_restore(experiment_path):
    trainer = TorchTrainer.restore(experiment_path, datasets=get_datasets())
    result = trainer.fit()
else:
    trainer = TorchTrainer(
        train_loop_per_worker=train_loop_per_worker,
        datasets=get_datasets(),
        scaling_config=train.ScalingConfig(num_workers=2),
        run_config=train.RunConfig(
            storage_path=os.path.expanduser("~/ray_results"),
            name="dl_restore_autoresume",
        ),
    )
result = trainer.fit()

另请参阅

请参阅 BaseTrainer.restore 文档字符串以获取完整示例。

注意

<Framework>Trainer.restore<Framework>Trainer(..., resume_from_checkpoint=...) 不同。resume_from_checkpoint 用于启动一个的 Train 实验,该实验将结果写入新目录并从迭代 0 开始。

<Framework>Trainer.restore 用于继续现有实验,新结果将继续追加到现有日志中。