Ray Tune 中的容错设置#

容错是分布式机器学习实验的一项重要功能,可以帮助减轻由于内存不足和磁盘空间不足而导致的节点故障的影响。

通过容错,用户可以

  • 节省时间和资源,保持训练进度,即使节点发生故障。

  • 在分布式环境中获得可抢占的竞价实例节点的成本节省

另请参阅

分布式 Tune 实验中,启用容错的前提是配置某种形式的持久化存储,用于汇总所有 trial 的结果和检查点。请参阅 Ray Tune 中的持久化存储配置方法

在本指南中,我们将介绍如何启用 Ray Tune 提供的不同类型的容错。

Tune 中的实验级容错#

在实验级别,Tuner.restore 可以从中断处恢复之前的实验。

您应该在以下情况使用 Tuner.restore

  1. 调用 Tuner.fit() 的驱动程序脚本出错(例如,由于主节点内存不足或磁盘空间不足)。

  2. 实验被手动中断,按下 Ctrl+C

  3. 整个集群及其中的实验因短暂错误而崩溃,例如网络中断或 Ray 对象存储内存已满。

注意

Tuner.restore 并非用于恢复已终止的实验并修改超参数搜索空间或停止条件。相反,实验恢复是为了恢复和完成通过 Tuner.fit 提交的完全相同的任务

例如,考虑一个配置为运行 10 次训练迭代的 Tune 实验,其中所有 trials 都已完成。此时不能使用 Tuner.restore 来恢复实验,将训练迭代次数更改为 20,然后继续训练。

相反,这应该通过启动一个的实验,并使用上一个实验的检查点来初始化模型权重来实现。请参阅 此 FAQ 帖子 获取示例。

注意

用户定义的训练循环中的错误无法通过恢复来修复。相反,导致实验首次崩溃的问题应该是短暂的,这意味着恢复后重试可以成功。

恢复 Tune 实验#

假设您的初始 Tune 实验配置如下。实际的训练循环仅用于演示目的:重要的细节是在 trainable 中实现了保存和加载检查点

import json
import os
import tempfile

from ray import tune


def trainable(config):
    # Checkpoint loading
    checkpoint = tune.get_checkpoint()
    start = 1
    if checkpoint:
        with checkpoint.as_directory() as checkpoint_dir:
            with open(os.path.join(checkpoint_dir, "checkpoint.json"), "r") as f:
                state = json.load(f)
        start = state["epoch"] + 1

    for epoch in range(start, config["num_epochs"]):
        # Do some training...

        # Checkpoint saving
        with tempfile.TemporaryDirectory() as temp_checkpoint_dir:
            with open(os.path.join(temp_checkpoint_dir, "checkpoint.json"), "w") as f:
                json.dump({"epoch": epoch}, f)
            tune.report(
                {"epoch": epoch},
                checkpoint=tune.Checkpoint.from_directory(temp_checkpoint_dir),
            )


tuner = tune.Tuner(
    trainable,
    param_space={"num_epochs": 10},
    run_config=tune.RunConfig(
        storage_path=os.path.expanduser("~/ray_results"),
        name="tune_fault_tolerance_guide",
    ),
)
result_grid = tuner.fit()

实验的结果和检查点将保存到 ~/ray_results/tune_fault_tolerance_guide,这是由 RunConfig 配置的。如果实验因上述原因之一而中断,请使用此路径恢复

tuner = tune.Tuner.restore(
    os.path.expanduser("~/ray_results/tune_fault_tolerance_guide"),
    trainable=trainable,
    resume_errored=True,
)
tuner.fit()

提示

您还可以从云存储桶路径恢复实验

tuner = tune.Tuner.restore(
    path="s3://cloud-bucket/tune_fault_tolerance_guide", trainable=trainable
)

请参阅 Ray Tune 中的持久化存储配置方法

恢复配置#

Tune 允许根据实验中断时的状态来配置要恢复的 trials。

  • 默认情况下,处于 RUNNING 状态的未完成 trials 将被恢复。

  • 处于 ERRORED 状态的 trials 可以被恢复或从头开始重试。

  • TERMINATED 状态的 trials无法恢复。

tuner = tune.Tuner.restore(
    os.path.expanduser("~/ray_results/tune_fault_tolerance_guide"),
    trainable=trainable,
    resume_errored=True,
    restart_errored=False,
    resume_unfinished=True,
)

自动恢复#

在生产环境中运行时,可能需要一个单一脚本,该脚本(1)在开始时启动初始训练运行,(2)如果(1)已发生则恢复实验。

使用 Tuner.can_restore 工具来完成此操作。

import os
from ray import tune

storage_path = os.path.expanduser("~/ray_results")
exp_name = "tune_fault_tolerance_guide"
path = os.path.join(storage_path, exp_name)

if tune.Tuner.can_restore(path):
    tuner = tune.Tuner.restore(path, trainable=trainable, resume_errored=True)
else:
    tuner = tune.Tuner(
        trainable,
        param_space={"num_epochs": 10},
        run_config=tune.RunConfig(storage_path=storage_path, name=exp_name),
    )
tuner.fit()

首次运行此脚本将启动初始训练运行。第二次运行此脚本将尝试从第一次运行的输出中恢复。

使用 Ray 对象引用进行 Tune 实验恢复(高级)#

实验恢复通常发生在与原始运行不同的 Ray 会话中,在这种情况下,Ray 对象引用会被自动垃圾回收。如果对象引用与实验状态一起保存(例如,保存在每个 trial 的配置中),那么在恢复后尝试检索这些对象将无法正常工作:这些引用指向的对象已不再存在。

为了解决这个问题,您必须重新创建这些对象,将它们放入 Ray 对象存储,然后将新的对象引用传递给 Tune。

示例#

假设我们有一些大型预训练模型,我们希望在训练循环中以某种方式使用它们。例如,这可能是一个用于计算 Inception Score 来评估生成模型质量的图像分类模型。我们可能有多个模型要进行调优,每个 trial 都采样一个模型来使用。

import ray
from ray import tune


class LargeModel:
    def __init__(self, model_id):
        self.model_id = model_id
        # Load weights based on the `model_id`...


def train_fn(config):
    # Retrieve the model from the object store.
    model = ray.get(config["model_ref"])
    print(model.model_id)


# These models may be large, so `ray.put` them in the Ray Object Store
# to share the models between trials.
model_refs = [ray.put(LargeModel(1)), ray.put(LargeModel(2))]

tuner = tune.Tuner(
    train_fn,
    # Tune over the object references!
    param_space={"model_ref": tune.grid_search(model_refs)},
    run_config=tune.RunConfig(
        storage_path=os.path.expanduser("~/ray_results"), name="restore_object_refs"
    ),
)
tuner.fit()

为了恢复,我们只需要通过 Tuner.restore 重新指定 param_space

# Re-create the objects and put them in the object store.
param_space = {
    "model_ref": tune.grid_search([ray.put(LargeModel(1)), ray.put(LargeModel(2))])
}

tuner = tune.Tuner.restore(
    os.path.expanduser("~/ray_results/restore_object_refs"),
    trainable=train_fn,
    # Re-specify the `param_space` to update the object references.
    param_space=param_space,
    resume_errored=True,
)
tuner.fit()

注意

如果您正在调优 Ray Data,您也需要在 param_space 中重新指定它们。Ray Data 可能包含对象引用,因此上述相同的问题也适用。

请参阅下面的示例。

ds_1 = ray.data.from_items([{"x": i, "y": 2 * i} for i in range(128)])
ds_2 = ray.data.from_items([{"x": i, "y": 3 * i} for i in range(128)])

param_space = {
    "datasets": {"train": tune.grid_search([ds_1, ds_2])},
}

tuner = tune.Tuner.restore(..., param_space=param_space)

Tune 中的 Trial 级容错#

Trial 级容错处理集群中单个 trial 的故障,这些故障可能由以下原因引起:

  • 使用可抢占的竞价实例进行运行。

  • 短暂的网络连接问题。

  • 节点内存不足或磁盘空间不足。

Ray Tune 提供了一种使用 FailureConfig 配置单个 trials 的故障处理的方法。

假设我们正在使用上一个示例中实现了 trial 检查点保存和加载的 trainable,以下是如何配置 FailureConfig

from ray import tune

tuner = tune.Tuner(
    trainable,
    param_space={"num_epochs": 10},
    run_config=tune.RunConfig(
        storage_path=os.path.expanduser("~/ray_results"),
        name="trial_fault_tolerance",
        failure_config=tune.FailureConfig(max_failures=3),
    ),
)
tuner.fit()

当一个 trial 遇到运行时错误时,上述配置将最多重试该 trial max_failures=3 次。

同样,如果发生节点 X 的节点故障(例如,被抢占或连接丢失),此配置将最多重试位于节点 X 上的所有 trials 3 次。

总结#

在本用户指南中,我们介绍了如何在 Ray Tune 中启用实验级和 trial 级容错。

有关更多信息,请参阅以下资源: