将数据导入/导出 Tune#

通常,你会发现需要将数据(数据集、模型、其他大型参数)传递到 Tune Trainable 中,并从中获取数据(指标、检查点、其他artifacts)。本指南将探讨不同的方法,并说明在何种情况下应使用它们。

让我们先定义一个简单的 Trainable 函数。我们将逐步扩展此函数的功能。

import random
import time
import pandas as pd


def training_function(config):
    # For now, we have nothing here.
    data = None
    model = {"hyperparameter_a": None, "hyperparameter_b": None}
    epochs = 0

    # Simulate training & evaluation - we obtain back a "metric" and a "trained_model".
    for epoch in range(epochs):
        # Simulate doing something expensive.
        time.sleep(1)
        metric = (0.1 + model["hyperparameter_a"] * epoch / 100) ** (
            -1
        ) + model["hyperparameter_b"] * 0.1 * data["A"].sum()
        trained_model = {"state": model, "epoch": epoch}

我们的 training_function 函数需要一个 pandas DataFrame、一个带有超参数的模型以及模型训练的 epoch 数量作为输入。模型的超参数会影响返回的指标,并且在每个 epoch(训练迭代)中,trained_model 的状态都会改变。

我们将使用Tuner API运行超参数优化。

from ray.tune import Tuner
from ray import tune

tuner = Tuner(training_function, tune_config=tune.TuneConfig(num_samples=4))

将数据导入 Tune#

首要任务是为 Trainable 提供输入。我们可以将其大致分为两类:变量和常量。

变量是我们想要调优的参数。它们对于每个Trial都会不同。例如,它们可以是神经网络的学习率和批量大小、随机森林的树数量和最大深度,或者在使用 Tune 作为批量训练执行引擎时的数据分区。

常量对于每个 Trial 都是相同的参数。它们可以是 epoch 数量、我们想要设置但不调优的模型超参数、数据集等等。通常,常量会相当大(例如数据集或模型)。

警告

training_function 外部作用域的对象也会自动序列化并发送到 Trial Actors,这可能导致意外行为。例如全局锁不工作(因为每个 Actor 都操作一个副本)或与序列化相关的常见错误。最佳实践是在 training_function 中不要引用任何外部作用域的对象。

通过搜索空间将数据传递到 Tune 运行中#

注意

TL;DR - 使用 param_space 参数来指定小的、可序列化的常量和变量。

将输入传递到 Trainable 中的第一种方法是搜索空间 (search space)(也可称为参数空间 (parameter space)config)。在 Trainable 本身中,它映射到作为参数传递给函数的 config 字典。你使用 Tunerparam_space 参数定义搜索空间。搜索空间是一个字典,可以由分布 (distributions)组成,这些分布将为每个 Trial 采样不同的值,或者由常量值组成。搜索空间可以由嵌套字典组成,这些嵌套字典中也可以包含分布。

警告

搜索空间中的每个值都将直接保存在 Trial 元数据中。这意味着搜索空间中的每个值必须是可序列化的,并且占用少量内存。

例如,将大型 pandas DataFrame 或不可序列化的模型对象作为搜索空间中的值传递会导致意外行为。最好情况下会因 Trial 元数据保存到磁盘时包含这些数据而导致严重的减速和磁盘空间占用。最坏情况下,数据无法发送到 Trial worker,会引发异常。更多详情请参见如何避免瓶颈?

相反,请使用字符串或其他标识符作为值,并根据这些值直接在 Trainable 内部初始化/加载对象。

注意

数据集 (Datasets)可以直接用作搜索空间中的值。

在我们的示例中,我们想调优两个模型超参数。我们还想设置 epoch 的数量,以便之后可以轻松调整。对于超参数,我们将使用 tune.uniform 分布。我们还将修改 training_function 以便从 config 字典中获取这些值。

def training_function(config):
    # For now, we have nothing here.
    data = None

    model = {
        "hyperparameter_a": config["hyperparameter_a"],
        "hyperparameter_b": config["hyperparameter_b"],
    }
    epochs = config["epochs"]

    # Simulate training & evaluation - we obtain back a "metric" and a "trained_model".
    for epoch in range(epochs):
        # Simulate doing something expensive.
        time.sleep(1)
        metric = (0.1 + model["hyperparameter_a"] * epoch / 100) ** (
            -1
        ) + model["hyperparameter_b"] * 0.1 * data["A"].sum()
        trained_model = {"state": model, "epoch": epoch}


tuner = Tuner(
    training_function,
    param_space={
        "hyperparameter_a": tune.uniform(0, 20),
        "hyperparameter_b": tune.uniform(-100, 100),
        "epochs": 10,
    },
)

使用 tune.with_parameters 在 Tune 运行中访问数据#

注意

TL;DR - 使用 tune.with_parameters 工具函数来指定大型常量参数。

如果我们有在所有 Trial 中都相同的较大对象,我们可以使用tune.with_parameters工具函数将它们直接传递到 Trainable 中。这些对象将存储在Ray 对象存储 (object store)中,以便每个 Trial worker 可以访问它们,获取本地副本在其进程中使用。

提示

放入 Ray 对象存储中的对象必须是可序列化的。

注意,大型对象的序列化(一次)和反序列化(每个 Trial)可能会带来性能开销。

在我们的示例中,我们将使用 tune.with_parameters 传递 data DataFrame。为此,我们需要修改函数签名,将 data 作为参数包含进去。

def training_function(config, data):
    model = {
        "hyperparameter_a": config["hyperparameter_a"],
        "hyperparameter_b": config["hyperparameter_b"],
    }
    epochs = config["epochs"]

    # Simulate training & evaluation - we obtain back a "metric" and a "trained_model".
    for epoch in range(epochs):
        # Simulate doing something expensive.
        time.sleep(1)
        metric = (0.1 + model["hyperparameter_a"] * epoch / 100) ** (
            -1
        ) + model["hyperparameter_b"] * 0.1 * data["A"].sum()
        trained_model = {"state": model, "epoch": epoch}


tuner = Tuner(
    training_function,
    param_space={
        "hyperparameter_a": tune.uniform(0, 20),
        "hyperparameter_b": tune.uniform(-100, 100),
        "epochs": 10,
    },
)

下一步是在将 training_function 传递给 Tuner 之前,使用 tune.with_parameters 包装它。tune.with_parameters 调用的每个关键字参数都将映射到 Trainable 签名中的关键字参数。

data = pd.DataFrame({"A": [1, 2, 3], "B": [4, 5, 6]})

tuner = Tuner(
    tune.with_parameters(training_function, data=data),
    param_space={
        "hyperparameter_a": tune.uniform(0, 20),
        "hyperparameter_b": tune.uniform(-100, 100),
        "epochs": 10,
    },
    tune_config=tune.TuneConfig(num_samples=4),
)

在 Tune Trainable 中加载数据#

你也可以直接在 Trainable 中从云存储、共享文件存储(如 NFS)或 Trainable worker 的本地磁盘加载数据。

警告

从磁盘加载时,请确保集群中的所有节点都能访问你尝试加载的文件。

一个常见的用例是使用 pandas、arrow 或任何其他框架从 S3 或任何其他云存储加载数据集。

Trainable worker 的工作目录将自动更改为相应的 Trial 目录。更多详情请参见如何在 Tune 训练函数中访问相对文件路径?

我们的调优运行现在可以运行了,尽管我们目前还无法获得任何有意义的输出。

results = tuner.fit()

从 Ray Tune 获取数据#

我们现在可以使用 training_function Trainable 运行我们的调优。下一步是向 Tune 报告可用于指导优化的指标 (metrics)。我们还需要对训练好的模型进行检查点 (checkpoint),以便中断后可以恢复训练,并在之后用于预测。

雷伊.图内.报告 的 API 是用于从 Trainable worker 获取数据。它可以在 Trainable 函数中多次调用。每次调用对应一次训练迭代(epoch、步长、树)。

使用 Tune 报告指标#

指标 (Metrics) 是通过 tune.report 调用中的 metrics 参数传递的值。Tune 搜索算法 (Search Algorithms)调度器 (Schedulers)可以使用指标来指导搜索。调优运行完成后,你可以分析结果,其中包含报告的指标。

注意

与搜索空间值类似,作为指标报告的每个值都将直接保存在 Trial 元数据中。这意味着作为指标报告的每个值必须是可序列化的,并且占用少量内存。

注意

Tune 会自动包含一些指标,例如训练迭代次数、时间戳等。完整列表请参见此处

在我们的示例中,我们希望最大化 metric。我们将在每个 epoch 向 Tune 报告它,并在 tune.TuneConfig 中设置 metricmode 参数,以便让 Tune 知道应将其用作优化目标。

from ray import train


def training_function(config, data):
    model = {
        "hyperparameter_a": config["hyperparameter_a"],
        "hyperparameter_b": config["hyperparameter_b"],
    }
    epochs = config["epochs"]

    # Simulate training & evaluation - we obtain back a "metric" and a "trained_model".
    for epoch in range(epochs):
        # Simulate doing something expensive.
        time.sleep(1)
        metric = (0.1 + model["hyperparameter_a"] * epoch / 100) ** (
            -1
        ) + model["hyperparameter_b"] * 0.1 * data["A"].sum()
        trained_model = {"state": model, "epoch": epoch}
        tune.report(metrics={"metric": metric})


tuner = Tuner(
    tune.with_parameters(training_function, data=data),
    param_space={
        "hyperparameter_a": tune.uniform(0, 20),
        "hyperparameter_b": tune.uniform(-100, 100),
        "epochs": 10,
    },
    tune_config=tune.TuneConfig(num_samples=4, metric="metric", mode="max"),
)

使用 Tune 回调记录指标#

使用 tune.report 记录的每个指标都可以在调优运行期间通过 Tune 回调 (Callbacks)访问。Ray Tune 提供了与流行框架(如 MLFlow、Weights & Biases、CometML 等)的多个内置集成。你也可以使用Callback API创建自己的回调。

回调在 TunerRunConfigcallback 参数中传递。

在我们的示例中,我们将使用 MLFlow 回调来追踪调优运行的进度和 metric 的变化值(需要安装 mlflow)。

import ray.tune
from ray.tune.logger.mlflow import MLflowLoggerCallback


def training_function(config, data):
    model = {
        "hyperparameter_a": config["hyperparameter_a"],
        "hyperparameter_b": config["hyperparameter_b"],
    }
    epochs = config["epochs"]

    # Simulate training & evaluation - we obtain back a "metric" and a "trained_model".
    for epoch in range(epochs):
        # Simulate doing something expensive.
        time.sleep(1)
        metric = (0.1 + model["hyperparameter_a"] * epoch / 100) ** (
            -1
        ) + model["hyperparameter_b"] * 0.1 * data["A"].sum()
        trained_model = {"state": model, "epoch": epoch}
        tune.report(metrics={"metric": metric})


tuner = tune.Tuner(
    tune.with_parameters(training_function, data=data),
    param_space={
        "hyperparameter_a": tune.uniform(0, 20),
        "hyperparameter_b": tune.uniform(-100, 100),
        "epochs": 10,
    },
    tune_config=tune.TuneConfig(num_samples=4, metric="metric", mode="max"),
    run_config=tune.RunConfig(
        callbacks=[MLflowLoggerCallback(experiment_name="example")]
    ),
)

使用检查点和其他 artifacts 从 Tune 获取数据#

除了指标,你可能还需要保存训练好的模型状态和任何其他 artifacts,以便从训练失败中恢复,并进一步检查和使用。这些不能保存为指标,因为它们通常太大且可能不容易序列化。最后,它们应持久化到磁盘或云存储中,以便在 Tune 运行中断或终止后仍可访问。

Ray Train 为此目的提供了检查点 (Checkpoint) API。Checkpoint 对象可以从各种来源创建(字典、目录、云存储)。

在 Ray Tune 中,用户在 Trainable 函数中创建 Checkpoints,并使用 tune.report 的可选参数 checkpoint 进行报告。Checkpoints 可以包含任意数据,并且可以在 Ray 集群中自由传递。调优运行结束后,可以从结果中获取 Checkpoints

Ray Tune 可以配置为自动将检查点同步到云存储,仅保留一定数量的检查点以节省空间(使用ray.tune.CheckpointConfig)等。

注意

实验状态本身是单独检查点的。更多详情请参见附录:Tune 存储的数据类型

在我们的示例中,我们希望能够从最新的检查点恢复训练,并在每次迭代时将 trained_model 保存到检查点中。为了实现这一点,我们将使用 sessionCheckpoint API。

import os
import pickle
import tempfile

from ray import tune

def training_function(config, data):
    model = {
        "hyperparameter_a": config["hyperparameter_a"],
        "hyperparameter_b": config["hyperparameter_b"],
    }
    epochs = config["epochs"]

    # Load the checkpoint, if there is any.
    checkpoint = tune.get_checkpoint()
    start_epoch = 0
    if checkpoint:
        with checkpoint.as_directory() as checkpoint_dir:
            with open(os.path.join(checkpoint_dir, "model.pkl"), "w") as f:
                checkpoint_dict = pickle.load(f)
        start_epoch = checkpoint_dict["epoch"] + 1
        model = checkpoint_dict["state"]

    # Simulate training & evaluation - we obtain back a "metric" and a "trained_model".
    for epoch in range(start_epoch, epochs):
        # Simulate doing something expensive.
        time.sleep(1)
        metric = (0.1 + model["hyperparameter_a"] * epoch / 100) ** (
            -1
        ) + model["hyperparameter_b"] * 0.1 * data["A"].sum()

        checkpoint_dict = {"state": model, "epoch": epoch}

        # Create the checkpoint.
        with tempfile.TemporaryDirectory() as temp_checkpoint_dir:
            with open(os.path.join(temp_checkpoint_dir, "model.pkl"), "w") as f:
                pickle.dump(checkpoint_dict, f)
            tune.report(
                {"metric": metric},
                checkpoint=tune.Checkpoint.from_directory(temp_checkpoint_dir),
            )


tuner = tune.Tuner(
    tune.with_parameters(training_function, data=data),
    param_space={
        "hyperparameter_a": tune.uniform(0, 20),
        "hyperparameter_b": tune.uniform(-100, 100),
        "epochs": 10,
    },
    tune_config=tune.TuneConfig(num_samples=4, metric="metric", mode="max"),
    run_config=tune.RunConfig(
        callbacks=[MLflowLoggerCallback(experiment_name="example")]
    ),
)

实施所有这些更改后,我们现在可以运行调优并获得有意义的指标和 artifacts。

results = tuner.fit()
results.get_dataframe()
2022-11-30 17:40:28,839 INFO tune.py:762 -- Total run time: 15.79 seconds (15.65 seconds for the tuning loop).
metric time_this_iter_s should_checkpoint done timesteps_total episodes_total training_iteration trial_id experiment_id date ... hostname node_ip time_since_restore timesteps_since_restore iterations_since_restore warmup_time config/epochs config/hyperparameter_a config/hyperparameter_b logdir
0 -58.399962 1.015951 True False NaN NaN 10 0b239_00000 acf38c19d59c4cf2ad7955807657b6ea 2022-11-30_17-40-26 ... ip-172-31-43-110 172.31.43.110 10.282120 0 10 0.003541 10 18.065981 -98.298928 /home/ubuntu/ray_results/training_function_202...
1 -24.461518 1.030420 True False NaN NaN 10 0b239_00001 5ca9e03d7cca46a7852cd501bc3f7b38 2022-11-30_17-40-28 ... ip-172-31-43-110 172.31.43.110 10.362581 0 10 0.004031 10 1.544918 -47.741455 /home/ubuntu/ray_results/training_function_202...
2 18.510299 1.034228 True False NaN NaN 10 0b239_00002 aa38dd786c714486a8d69fa5b372df48 2022-11-30_17-40-28 ... ip-172-31-43-110 172.31.43.110 10.333781 0 10 0.005286 10 8.129285 28.846415 /home/ubuntu/ray_results/training_function_202...
3 -16.138780 1.020072 True False NaN NaN 10 0b239_00003 5b401e15ab614332b631d552603a8d77 2022-11-30_17-40-28 ... ip-172-31-43-110 172.31.43.110 10.242707 0 10 0.003809 10 17.982020 -27.867871 /home/ubuntu/ray_results/training_function_202...

4 行 × 23 列

检查点、指标以及每个 trial 的日志目录可以通过 Tune 实验的 ResultGrid 输出访问。有关如何与返回的 ResultGrid 交互的更多信息,请参见分析 Tune 实验结果

完成 Tune 运行后如何访问结果?#

完成 Python 会话运行后,你仍然可以访问结果和检查点。默认情况下,Tune 会将实验结果保存到本地目录 ~/ray_results。你也可以配置 Tune 将结果持久化到云端。有关如何配置存储选项以持久化实验结果的更多信息,请参见如何在 Ray Tune 中配置持久化存储

你可以通过调用Tuner.restore(path_or_cloud_uri, trainable)来恢复 Tune 实验,其中 path_or_cloud_uri 指向实验保存到的文件系统或云端位置。恢复 Tuner 后,你可以通过调用 Tuner.get_results() 获取 ResultGrid 对象,然后按照上一节的说明进行操作,访问结果和检查点。