在 Ray Tune 中导入和导出数据#

通常,您会发现需要将数据传递给 Tune 的 Trainables(数据集、模型、其他大型参数),并从中获取数据(指标、检查点、其他工件)。在本指南中,我们将探讨不同的方法,并了解它们适用的场景。

让我们从定义一个简单的 Trainable 函数开始。随着教程的进行,我们将为该函数添加不同的功能。

import random
import time
import pandas as pd


def training_function(config):
    # For now, we have nothing here.
    data = None
    model = {"hyperparameter_a": None, "hyperparameter_b": None}
    epochs = 0

    # Simulate training & evaluation - we obtain back a "metric" and a "trained_model".
    for epoch in range(epochs):
        # Simulate doing something expensive.
        time.sleep(1)
        metric = (0.1 + model["hyperparameter_a"] * epoch / 100) ** (
            -1
        ) + model["hyperparameter_b"] * 0.1 * data["A"].sum()
        trained_model = {"state": model, "epoch": epoch}

我们的 training_function 函数需要一个 pandas DataFrame、一个带有某些超参数的模型以及训练模型的 epoch 数作为输入。模型的超参数会影响返回的指标,并且在每个 epoch(训练迭代)中,trained_model 的状态都会发生变化。

我们将使用 Tuner API 来运行超参数优化。

from ray.tune import Tuner
from ray import tune

tuner = Tuner(training_function, tune_config=tune.TuneConfig(num_samples=4))

将数据导入 Tune#

首先要解决的问题是为 Trainable 提供输入。我们可以将它们大致分为两类:变量和常量。

变量是我们想要调整的参数。它们对于每个 Trial 都是不同的。例如,对于神经网络,它们可能是学习率和批量大小;对于随机森林,它们可能是树的数量和最大深度;如果您将 Tune 用作批处理训练的执行引擎,则可能是数据分区。

常量是对每个 Trial 都相同的参数。这些可以是 epoch 的数量、我们想设置但不调整的模型超参数、数据集等等。通常,常量会相当大(例如,数据集或模型)。

警告

training_function 的外部作用域中的对象也会被自动序列化并发送到 Trial Actors,这可能会导致意外行为。例如,全局锁可能不起作用(因为每个 Actor 都操作副本)或出现与序列化相关的通用错误。最佳实践是不要在 training_function 中引用任何外部作用域的对象。

通过搜索空间将数据导入 Tune 运行#

注意

简而言之,使用 param_space 参数来指定小型、可序列化的常量和变量。

导入 Trainables 的第一种方法是使用 搜索空间(也可能称为 参数空间配置)。在 Trainable 内部,它映射到作为函数参数传递的 config 字典。您使用 Tunerparam_space 参数来定义搜索空间。搜索空间是一个字典,可以由 分布(为每个 Trial 采样一个不同的值)或常量值组成。搜索空间可以包含嵌套字典,而这些嵌套字典又可以包含分布。

警告

搜索空间中的每个值都会直接保存在 Trial 元数据中。这意味着搜索空间中的每个值必须是可序列化的,并且占用少量内存。

例如,将大型 pandas DataFrame 或不可序列化的模型对象作为搜索空间中的值传递将导致不期望的行为。最多会导致巨大的性能下降和磁盘空间使用,因为保存到磁盘的 Trial 元数据也将包含这些数据。最坏的情况下,会引发异常,因为数据无法发送到 Trial worker。有关更多详细信息,请参阅 如何避免瓶颈?

相反,请使用字符串或其他标识符作为您的值,并在 Trainable 内部直接根据这些值初始化/加载对象。

注意

数据集 可以直接用作搜索空间中的值。

在我们的示例中,我们想调整两个模型超参数。我们还想设置 epoch 的数量,以便以后可以轻松进行调整。对于超参数,我们将使用 tune.uniform 分布。我们还将修改 training_function 以从 config 字典中获取这些值。

def training_function(config):
    # For now, we have nothing here.
    data = None

    model = {
        "hyperparameter_a": config["hyperparameter_a"],
        "hyperparameter_b": config["hyperparameter_b"],
    }
    epochs = config["epochs"]

    # Simulate training & evaluation - we obtain back a "metric" and a "trained_model".
    for epoch in range(epochs):
        # Simulate doing something expensive.
        time.sleep(1)
        metric = (0.1 + model["hyperparameter_a"] * epoch / 100) ** (
            -1
        ) + model["hyperparameter_b"] * 0.1 * data["A"].sum()
        trained_model = {"state": model, "epoch": epoch}


tuner = Tuner(
    training_function,
    param_space={
        "hyperparameter_a": tune.uniform(0, 20),
        "hyperparameter_b": tune.uniform(-100, 100),
        "epochs": 10,
    },
)

使用 tune.with_parameters 在 Tune 运行中访问数据#

注意

简而言之,使用 tune.with_parameters 工具函数来指定大型常量参数。

如果我们具有对所有 Trial 保持不变的大型对象,我们可以使用 tune.with_parameters 工具将它们直接传递给 Trainable。这些对象将存储在 Ray 对象存储 中,以便每个 Trial worker 都可以访问它们以获取本地副本在其进程中使用。

提示

放入 Ray 对象存储的对象必须是可序列化的。

请注意,大型对象的序列化(一次)和反序列化(每个 Trial)可能会产生性能开销。

在我们的示例中,我们将使用 tune.with_parameters 来传递 data DataFrame。为此,我们需要修改函数签名,将 data 作为参数包含在内。

def training_function(config, data):
    model = {
        "hyperparameter_a": config["hyperparameter_a"],
        "hyperparameter_b": config["hyperparameter_b"],
    }
    epochs = config["epochs"]

    # Simulate training & evaluation - we obtain back a "metric" and a "trained_model".
    for epoch in range(epochs):
        # Simulate doing something expensive.
        time.sleep(1)
        metric = (0.1 + model["hyperparameter_a"] * epoch / 100) ** (
            -1
        ) + model["hyperparameter_b"] * 0.1 * data["A"].sum()
        trained_model = {"state": model, "epoch": epoch}


tuner = Tuner(
    training_function,
    param_space={
        "hyperparameter_a": tune.uniform(0, 20),
        "hyperparameter_b": tune.uniform(-100, 100),
        "epochs": 10,
    },
)

下一步是使用 tune.with_parameters 来包装 training_function,然后再将其传递给 Tunertune.with_parameters 调用中的每个关键字参数都将映射到 Trainable 签名中的关键字参数。

data = pd.DataFrame({"A": [1, 2, 3], "B": [4, 5, 6]})

tuner = Tuner(
    tune.with_parameters(training_function, data=data),
    param_space={
        "hyperparameter_a": tune.uniform(0, 20),
        "hyperparameter_b": tune.uniform(-100, 100),
        "epochs": 10,
    },
    tune_config=tune.TuneConfig(num_samples=4),
)

在 Tune Trainable 中加载数据#

您也可以直接在 Trainable 中从例如云存储、共享文件存储(如 NFS)或 Trainable worker 的本地磁盘加载数据。

警告

从磁盘加载时,请确保集群中的所有节点都能访问您要加载的文件。

一个常见的用例是使用 pandas、arrow 或任何其他框架从 S3 或任何其他云存储加载数据集。

Trainable worker 的工作目录将自动更改为相应的 Trial 目录。有关更多详细信息,请参阅 如何在 Tune 训练函数中访问相对文件路径?

现在可以运行我们的调优运行了,尽管我们还不会获得任何有意义的输出来。

results = tuner.fit()

从 Ray Tune 中导出数据#

现在我们可以使用 training_function Trainable 来运行我们的调优运行。下一步是将*指标*报告给 Tune,这些指标可用于指导优化。我们还希望*检查点*我们的训练模型,以便在中断后可以恢复训练,并稍后用于预测。

ray.tune.report API 用于从 Trainable workers 导出数据。它可以在 Trainable 函数中调用多次。每次调用对应一次训练迭代(epoch、step、tree)。

使用 Tune 报告指标#

*指标*是使用 tune.report 调用中的 metrics 参数传递的值。Tune 的 搜索算法调度器 可以使用指标来指导搜索。调优运行完成后,您可以 分析结果,其中包括报告的指标。

注意

与搜索空间值类似,作为指标报告的每个值都将直接保存在 Trial 元数据中。这意味着作为指标报告的每个值必须是可序列化的,并且占用少量内存。

注意

Tune 会自动包含一些指标,例如训练迭代、时间戳等。有关完整列表,请参阅 此处

在我们的示例中,我们想最大化 metric。我们将在每个 epoch 将其报告给 Tune,并在 tune.TuneConfig 中设置 metricmode 参数,以便 Tune 知道它应该将其用作优化目标。

from ray import tune


def training_function(config, data):
    model = {
        "hyperparameter_a": config["hyperparameter_a"],
        "hyperparameter_b": config["hyperparameter_b"],
    }
    epochs = config["epochs"]

    # Simulate training & evaluation - we obtain back a "metric" and a "trained_model".
    for epoch in range(epochs):
        # Simulate doing something expensive.
        time.sleep(1)
        metric = (0.1 + model["hyperparameter_a"] * epoch / 100) ** (
            -1
        ) + model["hyperparameter_b"] * 0.1 * data["A"].sum()
        trained_model = {"state": model, "epoch": epoch}
        tune.report(metrics={"metric": metric})


tuner = Tuner(
    tune.with_parameters(training_function, data=data),
    param_space={
        "hyperparameter_a": tune.uniform(0, 20),
        "hyperparameter_b": tune.uniform(-100, 100),
        "epochs": 10,
    },
    tune_config=tune.TuneConfig(num_samples=4, metric="metric", mode="max"),
)

使用 Tune 回调函数记录指标#

使用 tune.report 记录的每个指标都可以在调优运行期间通过 Tune 的 回调函数 访问。Ray Tune 提供了与 MLFlow、Weights & Biases、CometML 等流行框架的 多种内置集成。您还可以使用 Callback API 创建自己的回调函数。

回调函数通过 TunerRunConfigcallback 参数传递。

在我们的示例中,我们将使用 MLFlow 回调函数来跟踪我们的调优运行进度以及 metric 的变化值(需要安装 mlflow)。

import ray.tune
from ray.tune.logger.mlflow import MLflowLoggerCallback


def training_function(config, data):
    model = {
        "hyperparameter_a": config["hyperparameter_a"],
        "hyperparameter_b": config["hyperparameter_b"],
    }
    epochs = config["epochs"]

    # Simulate training & evaluation - we obtain back a "metric" and a "trained_model".
    for epoch in range(epochs):
        # Simulate doing something expensive.
        time.sleep(1)
        metric = (0.1 + model["hyperparameter_a"] * epoch / 100) ** (
            -1
        ) + model["hyperparameter_b"] * 0.1 * data["A"].sum()
        trained_model = {"state": model, "epoch": epoch}
        tune.report(metrics={"metric": metric})


tuner = tune.Tuner(
    tune.with_parameters(training_function, data=data),
    param_space={
        "hyperparameter_a": tune.uniform(0, 20),
        "hyperparameter_b": tune.uniform(-100, 100),
        "epochs": 10,
    },
    tune_config=tune.TuneConfig(num_samples=4, metric="metric", mode="max"),
    run_config=tune.RunConfig(
        callbacks=[MLflowLoggerCallback(experiment_name="example")]
    ),
)

使用检查点和其他工件从 Tune 中导出数据#

除了指标之外,您可能还需要保存训练模型的状态和任何其他工件,以便在训练失败时能够恢复,并进行进一步的检查和使用。这些不能保存为指标,因为它们通常太大,并且可能不容易序列化。最后,它们应该持久化到磁盘或云存储,以便在 Tune 运行中断或终止后仍能访问。

Ray Train 为此目的提供了一个 Checkpoint API。 Checkpoint 对象可以从各种来源创建(字典、目录、云存储)。

在 Ray Tune 中,Checkpoints 由用户在其 Trainable 函数中创建,并通过 tune.report 的可选 checkpoint 参数报告。Checkpoints 可以包含任意数据,并且可以在 Ray 集群中自由传递。调优运行结束后,可以从结果中 获取``Checkpoints

Ray Tune 可以配置为 自动将检查点同步到云存储,仅保留一定数量的检查点以节省空间(通过 ray.tune.CheckpointConfig),等等。

注意

实验状态本身会单独进行检查点。有关更多详细信息,请参阅 附录:Tune 存储的数据类型

在我们的示例中,我们希望能够从最后一个检查点恢复训练,并在每次迭代时将 trained_model 保存到检查点。为了实现这一点,我们将使用 sessionCheckpoint API。

import os
import pickle
import tempfile

from ray import tune

def training_function(config, data):
    model = {
        "hyperparameter_a": config["hyperparameter_a"],
        "hyperparameter_b": config["hyperparameter_b"],
    }
    epochs = config["epochs"]

    # Load the checkpoint, if there is any.
    checkpoint = tune.get_checkpoint()
    start_epoch = 0
    if checkpoint:
        with checkpoint.as_directory() as checkpoint_dir:
            with open(os.path.join(checkpoint_dir, "model.pkl"), "rb") as f:
                checkpoint_dict = pickle.load(f)
        start_epoch = checkpoint_dict["epoch"] + 1
        model = checkpoint_dict["state"]

    # Simulate training & evaluation - we obtain back a "metric" and a "trained_model".
    for epoch in range(start_epoch, epochs):
        # Simulate doing something expensive.
        time.sleep(1)
        metric = (0.1 + model["hyperparameter_a"] * epoch / 100) ** (
            -1
        ) + model["hyperparameter_b"] * 0.1 * data["A"].sum()

        checkpoint_dict = {"state": model, "epoch": epoch}

        # Create the checkpoint.
        with tempfile.TemporaryDirectory() as temp_checkpoint_dir:
            with open(os.path.join(temp_checkpoint_dir, "model.pkl"), "wb") as f:
                pickle.dump(checkpoint_dict, f)
            tune.report(
                {"metric": metric},
                checkpoint=tune.Checkpoint.from_directory(temp_checkpoint_dir),
            )


tuner = tune.Tuner(
    tune.with_parameters(training_function, data=data),
    param_space={
        "hyperparameter_a": tune.uniform(0, 20),
        "hyperparameter_b": tune.uniform(-100, 100),
        "epochs": 10,
    },
    tune_config=tune.TuneConfig(num_samples=4, metric="metric", mode="max"),
    run_config=tune.RunConfig(
        callbacks=[MLflowLoggerCallback(experiment_name="example")]
    ),
)

实施所有这些更改后,我们现在可以运行我们的调优并获得有意义的指标和工件。

results = tuner.fit()
results.get_dataframe()
2022-11-30 17:40:28,839 INFO tune.py:762 -- Total run time: 15.79 seconds (15.65 seconds for the tuning loop).
metric time_this_iter_s should_checkpoint done timesteps_total episodes_total training_iteration trial_id experiment_id date ... hostname node_ip time_since_restore timesteps_since_restore iterations_since_restore warmup_time config/epochs config/hyperparameter_a config/hyperparameter_b logdir
0 -58.399962 1.015951 True False NaN NaN 10 0b239_00000 acf38c19d59c4cf2ad7955807657b6ea 2022-11-30_17-40-26 ... ip-172-31-43-110 172.31.43.110 10.282120 0 10 0.003541 10 18.065981 -98.298928 /home/ubuntu/ray_results/training_function_202...
1 -24.461518 1.030420 True False NaN NaN 10 0b239_00001 5ca9e03d7cca46a7852cd501bc3f7b38 2022-11-30_17-40-28 ... ip-172-31-43-110 172.31.43.110 10.362581 0 10 0.004031 10 1.544918 -47.741455 /home/ubuntu/ray_results/training_function_202...
2 18.510299 1.034228 True False NaN NaN 10 0b239_00002 aa38dd786c714486a8d69fa5b372df48 2022-11-30_17-40-28 ... ip-172-31-43-110 172.31.43.110 10.333781 0 10 0.005286 10 8.129285 28.846415 /home/ubuntu/ray_results/training_function_202...
3 -16.138780 1.020072 True False NaN NaN 10 0b239_00003 5b401e15ab614332b631d552603a8d77 2022-11-30_17-40-28 ... ip-172-31-43-110 172.31.43.110 10.242707 0 10 0.003809 10 17.982020 -27.867871 /home/ubuntu/ray_results/training_function_202...

4 rows × 23 columns

检查点、指标和每个 trial 的日志目录都可以通过 Tune 实验的 ResultGrid 输出进行访问。有关如何与返回的 ResultGrid 交互的更多信息,请参阅 分析 Tune 实验结果

运行结束后如何访问 Tune 结果?#

完成 Python 会话运行后,您仍然可以访问结果和检查点。默认情况下,Tune 会将实验结果保存到 ~/ray_results 本地目录。您可以将 Tune 配置为也将其持久化到云端。有关如何配置持久化实验结果的存储选项的更多信息,请参阅 如何配置 Ray Tune 中的持久化存储

您可以通过调用 Tuner.restore(path_or_cloud_uri, trainable) 来恢复 Tune 实验,其中 path_or_cloud_uri 指向保存实验的文件系统或云上的位置。在 Tuner 恢复后,您可以通过调用 Tuner.get_results() 来访问结果和检查点,以获取 ResultGrid 对象,然后按照上一节的说明进行操作。