回调#
回调是向实验注入代码的最直接方法。您可以定义在特定事件下执行的代码,并将其传递给您的 AlgorithmConfig。
以下是一个定义简单 lambda 函数的示例,该函数在剧集结束时打印剧集的返回值
from ray.rllib.algorithms.ppo import PPOConfig
ppo = config = (
PPOConfig()
.environment("CartPole-v1")
.callbacks(
on_episode_end=(
lambda episode, **kw: print(f"Episode done. R={episode.get_return()}")
)
)
.build()
)
ppo.train()
Callback lambdas 与有状态的 RLlibCallback#
有两种方法可以为各种回调事件定义自定义代码以供执行。
Callback lambdas#
如果注入的代码相当简单,并且不需要存储临时信息以供后续事件调用重用,则可以使用 lambda 函数,并将其作为之前所示的方法传递给 callbacks()。
有关完整列表,请参阅 ref:Callback events <rllib-callback-event-overview>。事件的名称始终与 callbacks() 方法的参数名称匹配。
有状态的 RLlibCallback#
如果注入的代码是有状态的,并且会临时存储结果以供同一事件或不同事件触发的后续调用重用,则需要继承 RLlibCallback API,然后实现一个或多个方法,例如 on_algorithm_init()。
以下是打印已终止剧集返回值的相同示例,但使用了 RLlibCallback 的子类。
from ray.rllib.algorithms.ppo import PPOConfig
from ray.rllib.callbacks.callbacks import RLlibCallback
class EpisodeReturn(RLlibCallback):
def __init__(self):
super().__init__()
# Keep some global state in between individual callback events.
self.overall_sum_of_rewards = 0.0
def on_episode_end(self, *, episode, **kwargs):
self.overall_sum_of_rewards += episode.get_return()
print(f"Episode done. R={episode.get_return()} Global SUM={self.overall_sum_of_rewards}")
ppo = (
PPOConfig()
.environment("CartPole-v1")
.callbacks(EpisodeReturn)
.build()
)
ppo.train()
回调事件#
在训练迭代过程中,算法通常会遍历以下事件树,这是 RLlib 回调系统中所有支持事件的概览。
Algorithm
.__init__()
`on_algorithm_init` - After algorithm construction and setup.
.train()
`on_train_result` - After a training iteration.
.evaluate()
`on_evaluate_start` - Before evaluation starts using the eval ``EnvRunnerGroup``.
`on_evaluate_end` - After evaluation is finished.
.restore_from_path()
`on_checkpoint_loaded` - After a checkpoint's new state has been loaded.
EnvRunner
.__init__()
`on_environment_created` - After the RL environment has been created.
.sample()
`on_episode_created` - After a new episode object has been created.
`on_episode_start` - After an episode object has started (after ``env.reset()``).
`on_episode_step` - After an episode object has stepped (after ``env.step()``).
`on_episode_end` - After an episode object has terminated (or truncated).
`on_sample_end` - At the end of the ``EnvRunner.sample()`` call.
请注意,树中的某些事件会同时发生,通过 Ray actor 在不同进程上。例如,EnvRunner actor 可能会触发其 on_episode_start 事件,同时另一个 EnvRunner actor 可能会触发其 on_sample_end 事件,而主 Algorithm 进程会触发 on_train_result。
RLlibCallback 的 Algorithm 绑定方法
|
当新的 Algorithm 实例完成设置后运行的回调。 |
|
评估开始前运行的回调。 |
|
评估完成后运行。 |
一个或多个 EnvRunner actor 被重新创建后运行的回调。 |
|
Algorithm 从检查点加载新状态后运行的回调。 |
RLlibCallback 的 EnvRunner 绑定方法
创建新环境对象后运行的回调。 |
|
|
创建新剧集(但尚未开始!)后运行的回调。 |
|
剧集开始后立即运行的回调。 |
|
在每个剧集步长(在记录了动作后)调用。 |
|
剧集结束时调用(在记录了 terminated/truncated 后)。 |
|
在 |
链接回调#
您可以定义多个 RLlibCallback 类,并将它们作为列表传递给 callbacks() 方法。您也可以向该方法的不同参数传递可调用对象的列表,而不是单个可调用对象。
例如,如果您已经编写了一个 RLlibCallback 的子类,并且想在不同实验中重用它。因为您的某个实验需要一些调试回调代码,您只想在短时间内临时注入它,用于几次运行。
链接回调的解析顺序#
RLlib 按如下方式解析给定事件的所有可用回调方法和可调用对象:
RLlibCallback 的子类优先于您通过 callbacks() 方法的各个参数或列表传递的单个或列表可调用对象。
例如,假设回调事件是 on_train_result,它在训练迭代结束时在算法进程内触发。
RLlib 循环遍历列表中所有给定的
RLlibCallback子类,并调用它们的on_train_result方法。在此过程中,它保持用户在列表中提供的确切顺序。然后 RLlib 循环遍历所有定义的
on_train_result可调用对象列表。您通过调用callbacks()方法并在此调用中定义on_train_result参数来配置这些。
class MyCallbacks(RLlibCallback):
def on_train_result(self, *, algorithm, metrics_logger, result, **kwargs):
print("RLlibCallback subclass")
class MyDebugCallbacks(RLlibCallback):
def on_train_result(self, *, algorithm, metrics_logger, result, **kwargs):
print("debug subclass")
# Define the callbacks order through the config.
# Subclasses first, then individual `on_train_result` (or other events) callables:
config.callbacks(
callbacks_class=[MyDebugCallbacks, MyCallbacks], # <- note: debug class first
on_train_result=[
lambda algorithm, **kw: print('lambda 1'),
lambda algorithm, **kw: print('lambda 2'),
],
)
# When training the algorithm, after each training iteration, you should see
# something like:
# > debug subclass
# > RLlibCallback subclass
# > lambda 1
# > lambda 2
示例#
以下是两个示例,展示了如何在 Algorithm 进程以及 EnvRunner 进程上设置自定义回调。
示例 1: on_train_result#
以下示例演示了如何实现一个简单的自定义函数,该函数会不时地将回放缓冲区内容写入磁盘。
您通常不想将缓冲区内容与您的 Algorithm 检查点一起写入,因此通过自定义回调更少、更受控制地写入可能是一个不错的折衷。
import ormsgpack
from ray.rllib.algorithms.dqn import DQNConfig
def _write_buffer_if_necessary(algorithm, metrics_logger, result):
# Write the buffer contents only every ith iteration.
if algorithm.training_iteration % 2 == 0:
# python dict
buffer_contents = algorithm.local_replay_buffer.get_state()
# binary
msgpacked = ormsgpack.packb(
buffer_contents,
option=ormsgpack.OPT_SERIALIZE_NUMPY,
)
# Open some file and write the buffer contents into it using `ormsgpack`.
with open("replay_buffer_contents.msgpack", "wb") as f:
f.write(msgpacked)
config = (
DQNConfig()
.environment("CartPole-v1")
.callbacks(
on_train_result=_write_buffer_if_necessary,
)
)
dqn = config.build()
# Train n times. Expect RLlib to write buffer every ith iteration.
for _ in range(4):
print(dqn.train())
有关所有可用回调的确切调用签名以及它们期望的参数类型,请参阅 在 Algorithm 中调用的回调。
示例 2: on_episode_step 和 on_episode_end#
以下示例演示了如何实现一个自定义 RLlibCallback 类,该类计算 Acrobot-v1 RL 环境 的平均“首次关节角度”。
Acrobot-v1 环境:环境变量描述了您将要计算并通过自定义回调记录的角度,如下所示:#
`theta1` is the angle of the first joint, where an angle of 0.0 indicates that the first
link is pointing directly downwards.
此示例利用 RLlib 的 MetricsLogger API 来记录注入代码的自定义计算。有关 MetricsLogger API 的更多详细信息,请参阅 MetricsLogger API。
此外,请参阅这个更复杂的示例,该示例 生成 PacMan 热图(图像)并将其记录到 WandB。
import math
import numpy as np
from ray.rllib.algorithms.ppo import PPOConfig
from ray.rllib.callbacks.callbacks import RLlibCallback
class LogAcrobotAngle(RLlibCallback):
def on_episode_created(self, *, episode, **kwargs):
# Initialize an empty list in the `custom_data` property of `episode`.
episode.custom_data["theta1"] = []
def on_episode_step(self, *, episode, env, **kwargs):
# First get the angle from the env (note that `env` is a VectorEnv).
# See https://github.com/Farama-Foundation/Gymnasium/blob/main/gymnasium/envs/classic_control/acrobot.py
# for the env source code.
cos_theta1, sin_theta1 = env.envs[0].unwrapped.state[0], env.envs[0].unwrapped.state[1]
# Convert cos/sin/tan into degree.
deg_theta1 = math.degrees(math.atan2(sin_theta1, cos_theta1))
# Log the theta1 degree value in the episode object, temporarily.
episode.custom_data["theta1"].append(deg_theta1)
def on_episode_end(self, *, episode, metrics_logger, **kwargs):
# Get all the logged theta1 degree values and average them.
theta1s = episode.custom_data["theta1"]
avg_theta1 = np.mean(theta1s)
# Log the final result - per episode - to the MetricsLogger.
# Report with a sliding/smoothing window of 50.
metrics_logger.log_value("theta1_mean", avg_theta1, reduce="mean", window=50)
config = (
PPOConfig()
.environment("Acrobot-v1")
.callbacks(
callbacks_class=LogAcrobotAngle,
)
)
ppo = config.build()
# Train n times. Expect to find `theta1_mean` in the results under:
# `env_runners/theta1_mean`
for i in range(10):
results = ppo.train()
print(
f"iter={i} "
f"theta1_mean={results['env_runners']['theta1_mean']} "
f"R={results['env_runners']['episode_return_mean']}"
)
提示
您可以根据调用 EnvRunner 是用于收集训练样本的常规“训练”EnvRunner,还是仅用于评估的评估 EnvRunner 来确定自定义逻辑的基础。访问 env_runner.config.in_evaluation 布尔标志,该标志在评估 EnvRunner actor 上为 True,在用于收集训练数据的 EnvRunner actor 上为 False。
有关所有可用回调的确切调用签名以及它们期望的参数类型,请参阅 在 Algorithm 中调用的回调。