如何在 Ray Tune 中启用容错#

容错是分布式机器学习实验中的一个重要特性,有助于减轻因内存不足和磁盘空间不足导致的节点故障影响。

通过容错,用户可以

  • 通过保留训练进度来节省时间和资源,即使节点发生故障。

  • 在分布式设置中,利用可抢占 Spot 实例节点的成本优势

另请参阅

分布式 Tune 实验中,启用容错的一个先决条件是配置某种形式的持久化存储,用于整合所有试验结果和检查点。请参阅 如何在 Ray Tune 中配置持久化存储

在本指南中,我们将介绍如何在 Ray Tune 中启用不同类型的容错。

Tune 中的实验级别容错#

在实验级别,Tuner.restore 可以从中断的地方恢复之前中断的实验。

在以下情况下应使用 Tuner.restore

  1. 调用 Tuner.fit() 的驱动脚本出错(例如,由于头节点内存不足或磁盘空间不足)。

  2. 实验被手动中断,例如使用 Ctrl+C

  3. 整个集群及其上的实验因短暂错误而崩溃,例如网络中断或 Ray 对象存储内存占满。

注意

Tuner.restore 用于恢复已终止的实验并修改超参数搜索空间或停止标准。相反,实验恢复旨在恢复并完成先前通过 Tuner.fit 提交的确切作业

例如,考虑一个 Tune 实验,配置运行 10 次训练迭代,且所有试验均已完成。在这种情况下,Tuner.restore 不能用于恢复实验,将其训练迭代次数更改为 20,然后继续训练。

相反,这应该通过启动一个实验并使用之前实验中的检查点初始化模型权重来实现。请参阅 此 FAQ 帖子 了解示例。

注意

用户定义的训练循环中的错误无法通过恢复来修复。相反,导致实验崩溃的最初问题应该是短暂的,这意味着恢复后的重试尝试下次可以成功。

恢复 Tune 实验#

假设你的初始 Tune 实验配置如下。实际的训练循环仅用于演示目的:重要的细节是 在 trainable 中实现了保存和加载检查点

import json
import os
import tempfile

from ray import tune


def trainable(config):
    # Checkpoint loading
    checkpoint = tune.get_checkpoint()
    start = 1
    if checkpoint:
        with checkpoint.as_directory() as checkpoint_dir:
            with open(os.path.join(checkpoint_dir, "checkpoint.json"), "r") as f:
                state = json.load(f)
        start = state["epoch"] + 1

    for epoch in range(start, config["num_epochs"]):
        # Do some training...

        # Checkpoint saving
        with tempfile.TemporaryDirectory() as temp_checkpoint_dir:
            with open(os.path.join(temp_checkpoint_dir, "checkpoint.json"), "w") as f:
                json.dump({"epoch": epoch}, f)
            tune.report(
                {"epoch": epoch},
                checkpoint=tune.Checkpoint.from_directory(temp_checkpoint_dir),
            )


tuner = tune.Tuner(
    trainable,
    param_space={"num_epochs": 10},
    run_config=tune.RunConfig(
        storage_path=os.path.expanduser("~/ray_results"),
        name="tune_fault_tolerance_guide",
    ),
)
result_grid = tuner.fit()

实验的结果和检查点保存到 ~/ray_results/tune_fault_tolerance_guide,如 RunConfig 配置。如果实验因上述原因之一中断,使用此路径恢复

tuner = tune.Tuner.restore(
    os.path.expanduser("~/ray_results/tune_fault_tolerance_guide"),
    trainable=trainable,
    resume_errored=True,
)
tuner.fit()

提示

你也可以从云存储桶路径恢复实验

tuner = tune.Tuner.restore(
    path="s3://cloud-bucket/tune_fault_tolerance_guide", trainable=trainable
)

请参阅 如何在 Ray Tune 中配置持久化存储

恢复配置#

Tune 允许配置哪些试验应该被恢复,基于实验中断时的状态

  • 默认情况下,处于 RUNNING 状态的未完成试验将被恢复。

  • 处于 ERRORED 状态的试验可以被恢复或从头重试。

  • 处于 TERMINATED 状态的试验无法恢复。

tuner = tune.Tuner.restore(
    os.path.expanduser("~/ray_results/tune_fault_tolerance_guide"),
    trainable=trainable,
    resume_errored=True,
    restart_errored=False,
    resume_unfinished=True,
)

自动恢复#

在生产环境中运行时,可能需要一个单独的脚本,该脚本 (1) 在开始时启动初始训练运行,以及 (2) 如果 (1) 已经发生,则恢复实验。

使用 Tuner.can_restore 实用程序来实现此目的

import os
from ray import tune

storage_path = os.path.expanduser("~/ray_results")
exp_name = "tune_fault_tolerance_guide"
path = os.path.join(storage_path, exp_name)

if tune.Tuner.can_restore(path):
    tuner = tune.Tuner.restore(path, trainable=trainable, resume_errored=True)
else:
    tuner = tune.Tuner(
        trainable,
        param_space={"num_epochs": 10},
        run_config=tune.RunConfig(storage_path=storage_path, name=exp_name),
    )
tuner.fit()

首次运行此脚本将启动初始训练运行。第二次运行此脚本将尝试从首次运行的输出中恢复。

使用 Ray 对象引用恢复 Tune 实验 (高级)#

实验恢复通常发生在与原始运行不同的 Ray session 中,在这种情况下,Ray 对象引用会被自动垃圾回收。如果对象引用与实验状态一起保存(例如,在每个试验的 config 中),那么在恢复后尝试检索这些对象将无法正常工作:这些引用指向的对象已不再存在。

为了解决这个问题,你必须重新创建这些对象,将它们放入 Ray 对象存储中,然后将新的对象引用传递给 Tune。

示例#

假设我们有一个大型预训练模型,我们想在训练循环中以某种方式使用它。例如,这可能是一个图像分类模型,用于计算 Inception Score 来评估生成模型的质量。我们可能有多个模型需要调优,每个试验会采样其中一个模型来使用。

import ray
from ray import tune


class LargeModel:
    def __init__(self, model_id):
        self.model_id = model_id
        # Load weights based on the `model_id`...


def train_fn(config):
    # Retrieve the model from the object store.
    model = ray.get(config["model_ref"])
    print(model.model_id)


# These models may be large, so `ray.put` them in the Ray Object Store
# to share the models between trials.
model_refs = [ray.put(LargeModel(1)), ray.put(LargeModel(2))]

tuner = tune.Tuner(
    train_fn,
    # Tune over the object references!
    param_space={"model_ref": tune.grid_search(model_refs)},
    run_config=tune.RunConfig(
        storage_path=os.path.expanduser("~/ray_results"), name="restore_object_refs"
    ),
)
tuner.fit()

要恢复,我们只需要通过 Tuner.restore 重新指定 param_space

# Re-create the objects and put them in the object store.
param_space = {
    "model_ref": tune.grid_search([ray.put(LargeModel(1)), ray.put(LargeModel(2))])
}

tuner = tune.Tuner.restore(
    os.path.expanduser("~/ray_results/restore_object_refs"),
    trainable=train_fn,
    # Re-specify the `param_space` to update the object references.
    param_space=param_space,
    resume_errored=True,
)
tuner.fit()

注意

如果你正在对 Ray Data 进行调优,你也需要在 param_space 中重新指定它们。Ray Data 可以包含对象引用,因此上面描述的同样问题也适用。

请看下面的示例

ds_1 = ray.data.from_items([{"x": i, "y": 2 * i} for i in range(128)])
ds_2 = ray.data.from_items([{"x": i, "y": 3 * i} for i in range(128)])

param_space = {
    "datasets": {"train": tune.grid_search([ds_1, ds_2])},
}

tuner = tune.Tuner.restore(..., param_space=param_space)

Tune 中的试验级别容错#

试验级别容错处理集群中单个试验的失败,这些失败可能由以下原因引起

  • 使用可抢占 Spot 实例运行。

  • 短暂的网络连接问题。

  • 节点内存不足或磁盘空间不足。

Ray Tune 提供了一种使用 FailureConfig 配置单个试验故障处理的方式。

假设我们正在使用前一个示例中实现了试验检查点保存和加载的 trainable,下面是如何配置 FailureConfig

from ray import tune

tuner = tune.Tuner(
    trainable,
    param_space={"num_epochs": 10},
    run_config=tune.RunConfig(
        storage_path=os.path.expanduser("~/ray_results"),
        name="trial_fault_tolerance",
        failure_config=tune.FailureConfig(max_failures=3),
    ),
)
tuner.fit()

当一个试验遇到运行时错误时,上述配置将重新调度该试验,最多 max_failures=3 次。

类似地,如果节点 X 发生故障(例如,被抢占或丢失连接),此配置将重新调度位于节点 X 上的所有试验,最多 3 次。

总结#

在本用户指南中,我们介绍了如何在 Ray Tune 中启用实验级别和试验级别的容错。

请参阅以下资源获取更多信息