处理故障和节点抢占#

重要提示

本用户指南展示了如何为 Ray 2.43 起可用的全新 Ray Train V2 配置容错,方法是启用环境变量 RAY_TRAIN_V2_ENABLED=1本用户指南假定环境变量已启用。

有关弃用和迁移的信息,请参见 此处

Ray Train 在三个级别提供容错

  1. 工作进程容错处理在执行用户定义的训练函数时发生在一个或多个 Train 工作进程上的错误。

  2. 工作节点容错处理训练过程中可能发生的节点故障。

  3. 作业驱动程序容错处理 Ray Train 驱动程序进程崩溃的情况,需要重新启动训练,可能是在新集群上。

本用户指南涵盖了如何配置和使用这些容错机制。

工作进程和节点容错#

工作进程故障是指在训练工作进程的用户定义训练函数中发生的错误,例如 GPU 内存不足 (OOM) 错误、云存储访问错误或其他运行时错误。

节点故障是指导致整个节点宕机的错误,包括节点抢占、OOM、网络分区或其他硬件故障。本节涵盖工作节点故障。关于从主节点故障恢复的讨论,请参见 下一节

Ray Train 可以配置为自动从工作进程和工作节点故障中恢复。检测到故障时,所有工作进程将被关闭,如有必要将添加新节点,并启动一组新的工作进程。重新启动的训练工作进程可以通过加载最新检查点来恢复训练。

为了在恢复时保留进度,您的训练函数应实现 保存 加载检查点 的逻辑。否则,训练将从头开始。

每次从工作进程或节点故障恢复都被视为一次重试。重试次数可以通过传递给 TrainerRunConfig 中设置的 FailureConfig 参数的 max_failures 属性进行配置。默认情况下,工作进程容错被禁用,max_failures=0

import ray.train

# Tries to recover a run up to this many times.
failure_config = ray.train.FailureConfig(max_failures=2)

# No limit on the number of retries.
failure_config = ray.train.FailureConfig(max_failures=-1)

总而言之,一个具有工作进程容错功能的 Torch 训练脚本示例如下

import tempfile
import uuid

import ray.train
import ray.train.torch


def train_fn_per_worker(train_loop_config: dict):
    # [1] Train worker restoration logic.
    checkpoint = ray.train.get_checkpoint()
    if checkpoint:
        with checkpoint.as_directory() as temp_checkpoint_dir:
            # model.load_state_dict(torch.load(...))
            ...

    # [2] Checkpoint saving and reporting logic.
    with tempfile.TemporaryDirectory() as temp_checkpoint_dir:
        # torch.save(...)
        ray.train.report(
            {"loss": 0.1},
            checkpoint=ray.train.Checkpoint.from_directory(temp_checkpoint_dir),
        )


trainer = ray.train.torch.TorchTrainer(
    train_fn_per_worker,
    scaling_config=ray.train.ScalingConfig(num_workers=4),
    run_config=ray.train.RunConfig(
        # (If multi-node, configure S3 / NFS as the storage path.)
        # storage_path="s3://...",
        name=f"train_run-{uuid.uuid4().hex}",
        # [3] Enable worker-level fault tolerance to gracefully handle
        # Train worker failures.
        failure_config=ray.train.FailureConfig(max_failures=3),
    ),
)
trainer.fit()

将恢复哪个检查点?#

Ray Train 将使用最新可用的 报告给 Ray Train 的检查点 来填充 ray.train.get_checkpoint()。此方法返回的 Checkpoint 对象具有 as_directory()to_directory() 方法,用于从 RunConfig(storage_path) 下载检查点到本地磁盘。

注意

as_directory()to_directory() 即使节点上有多个工作进程,也只会在每个节点上下载一次检查点。工作进程在本地磁盘上共享相同的检查点目录。

图示示例#

考虑一个包含 CPU 主节点和 2 个 GPU 工作节点集群的示例。在 2 个工作节点上运行 4 个 GPU 训练工作进程。 存储路径已配置 为使用云存储,检查点就保存在那里。

../../_images/worker_failure_start.png

训练已运行一段时间,最新的检查点已保存到云存储。#

../../_images/worker_node_failure.png

其中一个工作 GPU 节点因硬件故障而失败。Ray Train 检测到此故障并关闭所有工作进程。由于当前检测到的故障次数少于配置的 max_failures,Ray Train 将尝试重新启动训练,而不是退出并引发错误。#

../../_images/worker_node_replacement.png

Ray Train 已请求一个新工作节点加入集群,并正在等待其启动。#

../../_images/worker_group_recovery.png

新工作节点已加入集群。Ray Train 重新启动所有工作进程,并为它们提供最新的检查点。工作进程从存储中下载检查点并使用它来恢复训练。#

作业驱动程序容错#

作业驱动程序容错用于处理 Ray Train 驱动程序进程被中断的情况。Ray Train 驱动程序进程是调用 trainer.fit() 的进程,通常位于集群的主节点上。

驱动程序进程可能因以下原因之一而被中断

  • 用户手动中断运行(例如,Ctrl+C)。

  • 运行驱动程序进程的节点(主节点)崩溃(例如,内存不足、磁盘空间不足)。

  • 整个集群宕机(例如,影响所有节点的网络错误)。

在这些情况下,需要重新启动 Ray Train 驱动程序(调用 trainer.fit())。重新启动的 Ray Train 驱动程序需要找到最少的运行状态才能接续上次运行中断的地方。此状态包括最新报告的检查点,它们位于 存储路径。Ray Train 从存储中获取最新的检查点信息,并将其传递给新启动的工作进程以恢复训练。

为了查找此运行状态,Ray Train 依赖于传入与上次运行 **相同** 的 RunConfig(storage_path, name) 对。如果 storage_pathname 不匹配,Ray Train 将无法找到上次运行的状态,并将从头开始一个新的运行。

警告

如果 name 被无意中重用,Ray Train 将获取上次运行的状态,即使用户尝试启动新的运行。因此,在启动新运行时,请务必传递唯一的运行名称。换句话说,name 应该是训练作业的唯一标识符。

注意

作业驱动程序崩溃和中断不计入 工作进程容错max_failures 限制。

以下是一个突出作业驱动程序容错最佳实践的示例训练脚本

# entrypoint.py

import argparse
import tempfile
import uuid

import ray.train
import ray.train.torch


def train_fn_per_worker(train_loop_config: dict):
    # [1] Train worker restoration logic.
    checkpoint = ray.train.get_checkpoint()
    if checkpoint:
        with checkpoint.as_directory() as temp_checkpoint_dir:
            # model.load_state_dict(torch.load(...))
            ...

    # [2] Checkpoint saving and reporting logic.
    with tempfile.TemporaryDirectory() as temp_checkpoint_dir:
        # torch.save(...)
        ray.train.report(
            {"loss": 0.1},
            checkpoint=ray.train.Checkpoint.from_directory(temp_checkpoint_dir),
        )


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--storage_path", type=str, required=True)
    parser.add_argument("--run_name", type=str, required=True)
    args = parser.parse_args()

    trainer = ray.train.torch.TorchTrainer(
        train_fn_per_worker,
        scaling_config=ray.train.ScalingConfig(num_workers=4),
        run_config=ray.train.RunConfig(
            # [3] Enable worker-level fault tolerance to gracefully handle
            # Train worker failures.
            failure_config=ray.train.FailureConfig(max_failures=3),
            # [4] (Recommendation) The (storage_path, name) pair should be
            # determined by the job submitter and passed in as arguments
            # to the entrypoint script.
            storage_path=args.storage_path,
            name=args.run_name,
        ),
    )
    trainer.fit()

然后,可以使用以下命令启动入口脚本

python entrypoint.py --storage_path s3://my_bucket/ --run_name unique_run_id=da823d5

如果作业被中断,可以使用相同的命令恢复训练。此示例显示了一个 da823d5 ID,该 ID 由启动作业的人确定。此 ID 通常可用于其他目的,例如设置 wandbmlflow 运行 ID。

图示示例#

考虑一个包含 CPU 主节点和 2 个 GPU 工作节点集群的示例。在 2 个工作节点上运行 4 个 GPU 训练工作进程。存储路径已配置为使用云存储,检查点就保存在那里。

../../_images/cluster_failure_start.png

训练已运行一段时间,最新的检查点和运行状态已保存到存储。#

../../_images/head_node_failure.png

主节点因某种原因(例如,内存不足错误)而崩溃,Ray Train 驱动程序进程被中断。#

../../_images/cluster_failure.png

由于主节点故障,整个集群宕机。#

../../_images/cluster_recovery.png

手动重启集群或某个作业提交系统启动了新的 Ray 集群。Ray Train 驱动程序进程在新主节点上运行。Ray Train 从 {storage_path}/{name}(例如,s3://my_bucket/my_run_name)的存储中获取运行状态信息,并将最新的检查点传递给新启动的工作进程以恢复训练。#

容错 API 弃用#

<Framework>Trainer.restore API 弃用#

从 Ray 2.43 开始,<Framework>Trainer.restore<Framework>Trainer.can_restore API 已弃用,并将被移除。

动机#

此 API 更改带来了多项好处

  1. 避免将用户代码保存到 pickle 文件:旧 API 将用户代码保存到 pickle 文件,这可能导致反序列化问题,从而导致运行无法恢复。

  2. 改进的配置体验:虽然某些配置从 pickle 文件加载,但某些参数需要重新指定,而另一部分参数甚至可以可选地重新指定。这使用户对正在恢复运行中实际使用的配置集感到困惑。

迁移步骤#

要从旧的 <Framework>Trainer.restore API 迁移到新模式

  1. 启用环境变量 RAY_TRAIN_V2_ENABLED=1

  2. <Framework>Trainer.restore 替换为常规的 <Framework>Trainer 构造函数,确保传入与上次运行 **相同** 的 storage_pathname

<Framework>Trainer(restore_from_checkpoint) API 弃用#

从 Ray 2.43 开始,<Framework>Trainer(restore_from_checkpoint) API 已弃用,并将被移除。

动机#

此 API 是造成混淆的常见原因,其价值很小。它仅用于设置 ray.train.get_checkpoint() 的初始值,但未加载任何其他运行状态。

迁移步骤#

只需通过 train_loop_config 参数传入初始检查点。有关代码示例,请参阅下方链接的迁移指南。

附加资源#