回调#

回调是向实验注入代码的最直接方法。您可以定义在特定事件下执行的代码,并将其传递给您的 AlgorithmConfig

以下是一个定义简单 lambda 函数的示例,该函数在剧集结束时打印剧集的返回值

from ray.rllib.algorithms.ppo import PPOConfig

ppo = config = (
    PPOConfig()
    .environment("CartPole-v1")
    .callbacks(
        on_episode_end=(
            lambda episode, **kw: print(f"Episode done. R={episode.get_return()}")
        )
    )
    .build()
)
ppo.train()

Callback lambdas 与有状态的 RLlibCallback#

有两种方法可以为各种回调事件定义自定义代码以供执行。

Callback lambdas#

如果注入的代码相当简单,并且不需要存储临时信息以供后续事件调用重用,则可以使用 lambda 函数,并将其作为之前所示的方法传递给 callbacks()

有关完整列表,请参阅 ref:Callback events <rllib-callback-event-overview>。事件的名称始终与 callbacks() 方法的参数名称匹配。

有状态的 RLlibCallback#

如果注入的代码是有状态的,并且会临时存储结果以供同一事件或不同事件触发的后续调用重用,则需要继承 RLlibCallback API,然后实现一个或多个方法,例如 on_algorithm_init()

以下是打印已终止剧集返回值的相同示例,但使用了 RLlibCallback 的子类。

from ray.rllib.algorithms.ppo import PPOConfig
from ray.rllib.callbacks.callbacks import RLlibCallback

class EpisodeReturn(RLlibCallback):
    def __init__(self):
        super().__init__()
        # Keep some global state in between individual callback events.
        self.overall_sum_of_rewards = 0.0

    def on_episode_end(self, *, episode, **kwargs):
        self.overall_sum_of_rewards += episode.get_return()
        print(f"Episode done. R={episode.get_return()} Global SUM={self.overall_sum_of_rewards}")

ppo = (
    PPOConfig()
    .environment("CartPole-v1")
    .callbacks(EpisodeReturn)
    .build()
)
ppo.train()

回调事件#

在训练迭代过程中,算法通常会遍历以下事件树,这是 RLlib 回调系统中所有支持事件的概览。

Algorithm
    .__init__()
        `on_algorithm_init` - After algorithm construction and setup.
    .train()
        `on_train_result` - After a training iteration.
    .evaluate()
        `on_evaluate_start` - Before evaluation starts using the eval ``EnvRunnerGroup``.
        `on_evaluate_end` - After evaluation is finished.
    .restore_from_path()
        `on_checkpoint_loaded` - After a checkpoint's new state has been loaded.

EnvRunner
    .__init__()
        `on_environment_created` - After the RL environment has been created.
    .sample()
        `on_episode_created` - After a new episode object has been created.
        `on_episode_start` - After an episode object has started (after ``env.reset()``).
        `on_episode_step` - After an episode object has stepped (after ``env.step()``).
        `on_episode_end` - After an episode object has terminated (or truncated).
        `on_sample_end` - At the end of the ``EnvRunner.sample()`` call.

请注意,树中的某些事件会同时发生,通过 Ray actor 在不同进程上。例如,EnvRunner actor 可能会触发其 on_episode_start 事件,同时另一个 EnvRunner actor 可能会触发其 on_sample_end 事件,而主 Algorithm 进程会触发 on_train_result

注意

RLlib 仅在 AlgorithmEnvRunner actor 上调用回调。Ray 团队正在考虑将回调扩展到 Learner actor,甚至可能扩展到 RLModule 实例。

RLlibCallback 的 Algorithm 绑定方法

RLlibCallback.on_algorithm_init(*, algorithm)

当新的 Algorithm 实例完成设置后运行的回调。

RLlibCallback.on_evaluate_start(*, algorithm)

评估开始前运行的回调。

RLlibCallback.on_evaluate_end(*, algorithm)

评估完成后运行。

RLlibCallback.on_env_runners_recreated(*, ...)

一个或多个 EnvRunner actor 被重新创建后运行的回调。

RLlibCallback.on_checkpoint_loaded(*, ...)

Algorithm 从检查点加载新状态后运行的回调。

RLlibCallback 的 EnvRunner 绑定方法

RLlibCallback.on_environment_created(*, ...)

创建新环境对象后运行的回调。

RLlibCallback.on_episode_created(*, episode)

创建新剧集(但尚未开始!)后运行的回调。

RLlibCallback.on_episode_start(*, episode[, ...])

剧集开始后立即运行的回调。

RLlibCallback.on_episode_step(*, episode[, ...])

在每个剧集步长(在记录了动作后)调用。

RLlibCallback.on_episode_end(*, episode[, ...])

剧集结束时调用(在记录了 terminated/truncated 后)。

RLlibCallback.on_sample_end(*[, env_runner, ...])

EnvRunner.sample() 结束时调用。

链接回调#

您可以定义多个 RLlibCallback 类,并将它们作为列表传递给 callbacks() 方法。您也可以向该方法的不同参数传递可调用对象的列表,而不是单个可调用对象。

例如,如果您已经编写了一个 RLlibCallback 的子类,并且想在不同实验中重用它。因为您的某个实验需要一些调试回调代码,您只想在短时间内临时注入它,用于几次运行。

链接回调的解析顺序#

RLlib 按如下方式解析给定事件的所有可用回调方法和可调用对象:

RLlibCallback 的子类优先于您通过 callbacks() 方法的各个参数或列表传递的单个或列表可调用对象。

例如,假设回调事件是 on_train_result,它在训练迭代结束时在算法进程内触发。

  • RLlib 循环遍历列表中所有给定的 RLlibCallback 子类,并调用它们的 on_train_result 方法。在此过程中,它保持用户在列表中提供的确切顺序。

  • 然后 RLlib 循环遍历所有定义的 on_train_result 可调用对象列表。您通过调用 callbacks() 方法并在此调用中定义 on_train_result 参数来配置这些。

class MyCallbacks(RLlibCallback):
    def on_train_result(self, *, algorithm, metrics_logger, result, **kwargs):
        print("RLlibCallback subclass")

class MyDebugCallbacks(RLlibCallback):
    def on_train_result(self, *, algorithm, metrics_logger, result, **kwargs):
        print("debug subclass")

# Define the callbacks order through the config.
# Subclasses first, then individual `on_train_result` (or other events) callables:
config.callbacks(
    callbacks_class=[MyDebugCallbacks, MyCallbacks],  # <- note: debug class first
    on_train_result=[
        lambda algorithm, **kw: print('lambda 1'),
        lambda algorithm, **kw: print('lambda 2'),
    ],
)

# When training the algorithm, after each training iteration, you should see
# something like:
# > debug subclass
# > RLlibCallback subclass
# > lambda 1
# > lambda 2

示例#

以下是两个示例,展示了如何在 Algorithm 进程以及 EnvRunner 进程上设置自定义回调。

示例 1: on_train_result#

以下示例演示了如何实现一个简单的自定义函数,该函数会不时地将回放缓冲区内容写入磁盘。

您通常不想将缓冲区内容与您的 Algorithm 检查点一起写入,因此通过自定义回调更少、更受控制地写入可能是一个不错的折衷。

import ormsgpack
from ray.rllib.algorithms.dqn import DQNConfig

def _write_buffer_if_necessary(algorithm, metrics_logger, result):
    # Write the buffer contents only every ith iteration.
    if algorithm.training_iteration % 2 == 0:
        # python dict
        buffer_contents = algorithm.local_replay_buffer.get_state()

        # binary
        msgpacked = ormsgpack.packb(
           buffer_contents,
           option=ormsgpack.OPT_SERIALIZE_NUMPY,
        )

        # Open some file and write the buffer contents into it using `ormsgpack`.
        with open("replay_buffer_contents.msgpack", "wb") as f:
           f.write(msgpacked)

config = (
    DQNConfig()
    .environment("CartPole-v1")
    .callbacks(
       on_train_result=_write_buffer_if_necessary,
    )
)
dqn = config.build()

# Train n times. Expect RLlib to write buffer every ith iteration.
for _ in range(4):
    print(dqn.train())

有关所有可用回调的确切调用签名以及它们期望的参数类型,请参阅 在 Algorithm 中调用的回调

示例 2: on_episode_stepon_episode_end#

以下示例演示了如何实现一个自定义 RLlibCallback 类,该类计算 Acrobot-v1 RL 环境 的平均“首次关节角度”。

../_images/acrobot-v1.png

Acrobot-v1 环境:环境变量描述了您将要计算并通过自定义回调记录的角度,如下所示:#

`theta1` is the angle of the first joint, where an angle of 0.0 indicates that the first
link is pointing directly downwards.

此示例利用 RLlib 的 MetricsLogger API 来记录注入代码的自定义计算。有关 MetricsLogger API 的更多详细信息,请参阅 MetricsLogger API

此外,请参阅这个更复杂的示例,该示例 生成 PacMan 热图(图像)并将其记录到 WandB

import math
import numpy as np
from ray.rllib.algorithms.ppo import PPOConfig
from ray.rllib.callbacks.callbacks import RLlibCallback

class LogAcrobotAngle(RLlibCallback):
    def on_episode_created(self, *, episode, **kwargs):
        # Initialize an empty list in the `custom_data` property of `episode`.
        episode.custom_data["theta1"] = []

    def on_episode_step(self, *, episode, env, **kwargs):
        # First get the angle from the env (note that `env` is a VectorEnv).
        # See https://github.com/Farama-Foundation/Gymnasium/blob/main/gymnasium/envs/classic_control/acrobot.py
        # for the env source code.
        cos_theta1, sin_theta1 = env.envs[0].unwrapped.state[0], env.envs[0].unwrapped.state[1]
        # Convert cos/sin/tan into degree.
        deg_theta1 = math.degrees(math.atan2(sin_theta1, cos_theta1))

        # Log the theta1 degree value in the episode object, temporarily.
        episode.custom_data["theta1"].append(deg_theta1)

    def on_episode_end(self, *, episode, metrics_logger, **kwargs):
        # Get all the logged theta1 degree values and average them.
        theta1s = episode.custom_data["theta1"]
        avg_theta1 = np.mean(theta1s)

        # Log the final result - per episode - to the MetricsLogger.
        # Report with a sliding/smoothing window of 50.
        metrics_logger.log_value("theta1_mean", avg_theta1, reduce="mean", window=50)

config = (
    PPOConfig()
    .environment("Acrobot-v1")
    .callbacks(
        callbacks_class=LogAcrobotAngle,
    )
)
ppo = config.build()

# Train n times. Expect to find `theta1_mean` in the results under:
# `env_runners/theta1_mean`
for i in range(10):
    results = ppo.train()
    print(
        f"iter={i} "
        f"theta1_mean={results['env_runners']['theta1_mean']} "
        f"R={results['env_runners']['episode_return_mean']}"
    )

提示

您可以根据调用 EnvRunner 是用于收集训练样本的常规“训练”EnvRunner,还是仅用于评估的评估 EnvRunner 来确定自定义逻辑的基础。访问 env_runner.config.in_evaluation 布尔标志,该标志在评估 EnvRunner actor 上为 True,在用于收集训练数据的 EnvRunner actor 上为 False。

有关所有可用回调的确切调用签名以及它们期望的参数类型,请参阅 在 Algorithm 中调用的回调