使用 Ray Tune 进行超参数调优#

重要提示

本用户指南展示了如何集成 Ray Train 和 Ray Tune,为 Ray 2.43 版本开始推出的改版 Ray Train V2 调整分布式超参数运行。这可以通过启用环境变量 RAY_TRAIN_V2_ENABLED=1 来实现。本用户指南假设该环境变量已启用。

有关弃用和迁移的信息,请参阅 此处

Ray Train 可以与 Ray Tune 结合使用,对分布式训练运行进行超参数搜索。当您希望在将最佳性能的超参数在所有可用集群资源上长时间运行之前,对关键超参数进行小范围搜索时,这通常很有用。

快速入门#

在下面的示例中

  • Tuner 启动调优作业,该作业以不同的超参数配置运行 train_driver_fn 的试验。

  • train_driver_fn,它(1)接收超参数配置,(2)实例化一个 TorchTrainer(或任何其他框架的 trainer),以及(3)启动分布式训练作业。

  • ScalingConfig 定义了单个 Ray Train 运行的训练工作节点数和每个工作节点的资源。

  • train_fn_per_worker 是在每个试验的分布式训练工作节点上执行的 Python 代码。

import random
import tempfile
import uuid

import ray.train
import ray.train.torch
import ray.tune
from ray.tune.integration.ray_train import TuneReportCallback


# [1] Define your Ray Train worker code.
def train_fn_per_worker(train_loop_config: dict):
    # Unpack train worker hyperparameters.
    # Train feeds in the `train_loop_config` defined below.
    lr = train_loop_config["lr"]

    # training code here...
    print(
        ray.train.get_context().get_world_size(),
        ray.train.get_context().get_world_rank(),
        train_loop_config,
    )
    # model = ray.train.torch.prepare_model(...)  # Wrap model in DDP.
    with tempfile.TemporaryDirectory() as temp_checkpoint_dir:
        ray.train.report(
            {"loss": random.random()},
            checkpoint=ray.train.Checkpoint.from_directory(temp_checkpoint_dir),
        )


# [2] Define a function that launches the Ray Train run.
def train_driver_fn(config: dict):
    # Unpack run-level hyperparameters.
    # Tune feeds in hyperparameters defined in the `param_space` below.
    num_workers = config["num_workers"]

    trainer = ray.train.torch.TorchTrainer(
        train_fn_per_worker,
        train_loop_config=config["train_loop_config"],
        scaling_config=ray.train.ScalingConfig(
            num_workers=num_workers,
            # Uncomment to use GPUs.
            # use_gpu=True,
        ),
        run_config=ray.train.RunConfig(
            # [3] Assign unique names to each run.
            # Recommendation: use the trial id as part of the run name.
            name=f"train-trial_id={ray.tune.get_context().get_trial_id()}",
            # [4] (Optional) Pass in a `TuneReportCallback` to propagate
            # reported results to the Tuner.
            callbacks=[TuneReportCallback()],
            # (If multi-node, configure S3 / NFS as the storage path.)
            # storage_path="s3://...",
        ),
    )
    trainer.fit()


# Launch a single Train run.
# Note that you can only create a TuneReportCallback in a Ray Tune session.
# train_driver_fn({"num_workers": 4, "train_loop_config": {"lr": 1e-3}})


# Launch a sweep of hyperparameters with Ray Tune.
tuner = ray.tune.Tuner(
    train_driver_fn,
    param_space={
        "num_workers": ray.tune.choice([2, 4]),
        "train_loop_config": {
            "lr": ray.tune.grid_search([1e-3, 3e-4]),
            "batch_size": ray.tune.grid_search([32, 64]),
        },
    },
    run_config=ray.tune.RunConfig(
        name=f"tune_train_example-{uuid.uuid4().hex[:6]}",
        # (If multi-node, configure S3 / NFS as the storage path.)
        # storage_path="s3://...",
    ),
    # [5] (Optional) Set the maximum number of concurrent trials
    # in order to prevent too many Train driver processes from
    # being launched at once.
    tune_config=ray.tune.TuneConfig(max_concurrent_trials=2),
)
results = tuner.fit()

print(results.get_best_result(metric="loss", mode="min"))

Ray Tune 提供什么?#

Ray Tune 提供以下实用工具:

本用户指南仅关注 Ray Train 和 Ray Tune 之间的集成层。有关如何使用 Ray Tune 的更多详细信息,请参阅 Ray Tune 文档

为多个试验配置资源#

Ray Tune 启动多个试验,这些试验 在远程 Ray actor 中运行用户定义的函数,每个试验获得不同的采样超参数配置。

当单独使用 Ray Tune 时,试验直接在 Ray actor 中进行计算。例如,每个试验可以请求 1 个 GPU,并在远程 actor 本身中进行单进程模型训练。当在 Ray Tune 函数中使用 Ray Train 时,Tune 试验实际上并没有在此 actor 中进行大量计算——相反,它仅充当一个驱动进程,用于在别处运行的 Ray Train 工作节点进行启动和监控。

Ray Train 通过 ScalingConfig 请求自己的资源。有关更多详细信息,请参阅 配置规模和 GPU

../../_images/train_without_tune.png

一个单独的 Ray Train 运行,用于展示下一张图中的 Ray Tune 如何仅仅为这个进程树增加了一层层次结构。#

../../_images/train_tune_interop.png

从 Ray Tune 试验中启动 Ray Train 运行的示例。#

限制并发 Ray Train 运行的数量#

Ray Train 运行只有在所有工作节点的资源能够一次性获取时才能启动。这意味着多个启动 Train 运行的 Tune 试验将争夺 Ray 集群中的可用逻辑资源。

如果存在像 GPU 这样的限制性集群资源,那么就不可能同时运行所有超参数配置的训练。由于集群只有足够的资源来同时运行少量试验,因此请在 Tuner 上设置 tune.TuneConfig(max_concurrent_trials) 来限制“进行中”的 Train 运行的数量,以避免任何试验因资源不足而受阻。

# For a fixed size cluster, calculate this based on the limiting resource (ex: GPUs).
total_cluster_gpus = 8
num_gpu_workers_per_trial = 4
max_concurrent_trials = total_cluster_gpus // num_gpu_workers_per_trial


def train_driver_fn(config: dict):
    trainer = ray.train.torch.TorchTrainer(
        train_fn_per_worker,
        scaling_config=ray.train.ScalingConfig(
            num_workers=num_gpu_workers_per_trial, use_gpu=True
        ),
    )
    trainer.fit()


tuner = ray.tune.Tuner(
    train_driver_fn,
    tune_config=ray.tune.TuneConfig(max_concurrent_trials=max_concurrent_trials),
)

作为一个具体的例子,考虑一个固定大小的集群,拥有 128 个 CPU 和 8 个 GPU。

  • Tuner(param_space) 通过网格搜索扫描 4 个超参数配置:param_space={“train_loop_config”: {“batch_size”: tune.grid_search([8, 16, 32, 64])}}

  • 每个 Ray Train 运行都配置为使用 4 个 GPU 工作节点进行训练:ScalingConfig(num_workers=4, use_gpu=True)。由于只有 8 个 GPU,一次最多只能获取全部资源的 2 个 Train 运行。

  • 然而,由于集群中有很多 CPU 可用,因此可以立即启动总共 4 个 Ray Tune 试验(默认请求 1 个 CPU)。这会导致启动 2 个额外的 Ray Tune 试验进程,尽管它们内部的 Ray Train 运行只是等待资源,直到其他试验完成。当 Train 等待资源时,这会产生一些冗余的日志消息。如果总的超参数配置数量很大,也可能会出现过多的 Ray Tune 试验进程。

  • 为了解决这个问题,请设置 Tuner(tune_config=tune.TuneConfig(max_concurrent_trials=2))。现在,一次最多只有两个 Ray Tune 试验进程在运行。这个数字可以根据限制性的集群资源和每个试验所需的资源量来计算。

高级:设置 Train 驱动程序资源#

默认的 Train 驱动程序作为具有 1 个 CPU 的 Ray Tune 函数运行。Ray Tune 将这些函数调度到集群中任何具有可用逻辑 CPU 资源的节点上运行。

建议:如果您正在启动长时间运行的训练作业或使用 Spot 实例,这些充当 Ray Train 驱动进程的 Tune 函数应运行在“安全节点”上,这些节点发生故障的风险较低。例如,它们不应被调度到可抢占的 Spot 实例上运行,并且不应与训练工作节点共置。这可以是集群的头节点或专用 CPU 节点。

这是因为 Ray Train 驱动进程负责处理工作进程的容错,而工作进程更容易出错。运行 Train 工作节点的节点可能由于 Spot 抢占或其他由用户定义的模型训练代码引起的错误而崩溃。

  • 如果一个 Train 工作节点崩溃,位于不同节点上仍然存活的 Ray Train 驱动进程可以优雅地处理错误。

  • 另一方面,如果驱动进程崩溃,那么所有 Ray Train 工作节点将非优雅地退出,并且一些运行状态可能无法完全提交。

实现此行为的一种方法是为特定类型的节点设置自定义资源,并配置 Tune 函数来请求这些资源。

# Cluster setup:
# head_node:
#     resources:
#         CPU: 16.0
# worker_node_cpu:
#     resources:
#         CPU: 32.0
#         TRAIN_DRIVER_RESOURCE: 1.0
# worker_node_gpu:
#     resources:
#         GPU: 4.0

import ray.tune


def train_driver_fn(config):
    # trainer = TorchTrainer(...)
    ...


tuner = ray.tune.Tuner(
    ray.tune.with_resources(
        train_driver_fn,
        # Note: 0.01 is an arbitrary value to schedule the actor
        # onto the `worker_node_cpu` node type.
        {"TRAIN_DRIVER_RESOURCE": 0.01},
    ),
)

报告指标和检查点#

Ray Train 和 Ray Tune 都提供了工具来帮助通过 ray.train.reportray.tune.report API 上传和跟踪检查点。有关更多详细信息,请参阅 保存和加载检查点 用户指南。

如果 Ray Train 工作节点报告了检查点,那么在 Train 驱动程序级别保存另一个 Ray Tune 检查点是不必要的,因为它不包含任何额外的训练状态。Ray Train 驱动进程将已经定期将其状态快照到配置的 storage_path,这在关于容错的下一节中有更详细的描述。

为了从 Tuner 输出中访问检查点,您可以将检查点路径作为指标追加。提供的 TuneReportCallback 通过将报告的 Ray Train 结果传播到 Ray Tune 来实现这一点,其中检查点路径被附加为一个单独的指标。

高级:容错#

如果运行 Ray Train 驱动进程的 Ray Tune 试验崩溃,您可以通过以下方式在 Ray Tune 端启用试验容错:ray.tune.Tuner(run_config=ray.tune.RunConfig(failure_config))

Ray Train 端的容错是单独配置和处理的。有关更多详细信息,请参阅 处理故障和节点抢占 用户指南。

import tempfile

import ray.tune
import ray.train
import ray.train.torch


def train_fn_per_worker(train_loop_config: dict):
    # [1] Train worker restoration logic.
    checkpoint = ray.train.get_checkpoint()
    if checkpoint:
        with checkpoint.as_directory() as temp_checkpoint_dir:
            # model.load_state_dict(torch.load(...))
            ...

    with tempfile.TemporaryDirectory() as temp_checkpoint_dir:
        # torch.save(...)
        ray.train.report(
            {"loss": 0.1},
            checkpoint=ray.train.Checkpoint.from_directory(temp_checkpoint_dir),
        )


def train_fn_driver(config: dict):
    trainer = ray.train.torch.TorchTrainer(
        train_fn_per_worker,
        run_config=ray.train.RunConfig(
            # [2] Train driver restoration is automatic, as long as
            # the (storage_path, name) remains the same across trial restarts.
            # The easiest way to do this is to attach the trial ID in the name.
            # **Do not include any timestamps or random values in the name.**
            name=f"train-trial_id={ray.tune.get_context().get_trial_id()}",
            # [3] Enable worker-level fault tolerance to gracefully handle
            # Train worker failures.
            failure_config=ray.train.FailureConfig(max_failures=3),
            # (If multi-node, configure S3 / NFS as the storage path.)
            # storage_path="s3://...",
        ),
    )
    trainer.fit()


tuner = ray.tune.Tuner(
    train_fn_driver,
    run_config=ray.tune.RunConfig(
        # [4] Enable trial-level fault tolerance to gracefully handle
        # Train driver process failures.
        failure_config=ray.tune.FailureConfig(max_failures=3)
    ),
)
tuner.fit()

高级:使用 Ray Tune 回调#

Ray Tune 回调应在 Tuner 级别通过 ray.tune.RunConfig(callbacks) 传递。

对于依赖内置或自定义 Ray Tune 回调功能的 Ray Train 用户,可以通过将 Ray Train 作为单个试验 Tune 运行并将回调传递给 Tuner 来使用它们。

如果任何回调功能依赖于报告的指标,请确保将 ray.tune.integration.ray_train.TuneReportCallback 传递给 trainer 的回调,该回调会将结果传播到 Tuner。

import ray.tune
from ray.tune.integration.ray_train import TuneReportCallback
from ray.tune.logger import TBXLoggerCallback


def train_driver_fn(config: dict):
    trainer = TorchTrainer(
        ...,
        run_config=ray.train.RunConfig(..., callbacks=[TuneReportCallback()])
    )
    trainer.fit()


tuner = ray.tune.Tuner(
    train_driver_fn,
    run_config=ray.tune.RunConfig(callbacks=[TBXLoggerCallback()])
)

弃用的 `Tuner(trainer)` API#

直接接受 Ray Train trainer 实例的 Tuner(trainer) API 已在 Ray 2.43 中弃用,并将移除。

动机#

此 API 更改提供了多项优势:

  1. 更好的职责分离:解耦 Ray Train 和 Ray Tune 的职责。

  2. 改进的配置体验:使超参数和运行配置更加明确和灵活。

迁移步骤#

要从旧的 Tuner(trainer) API 迁移到新模式:

  1. 启用环境变量 RAY_TRAIN_V2_ENABLED=1

  2. Tuner(trainer) 替换为基于函数的实现,其中 Ray Train 在 Tune 试验中启动。

  3. 将您的训练逻辑移到一个驱动函数中,Tune 将使用不同的超参数来调用该函数。

附加资源#