MetricsLogger API#

注意

Ray 2.40 默认使用 RLlib 的新 API 栈。Ray 团队已基本完成算法、示例脚本和文档到新代码库的迁移。

如果您仍在使用旧的 API 栈,请参阅 新 API 栈迁移指南了解如何迁移的详细信息。

RLlib 团队设计了 MetricsLogger API,以统一并使强化学习 (RL) 实验期间的统计数据和指标的日志记录和处理变得可访问。RLlib 的 Algorithm 类及其所有子组件都各有一个 MetricsLogger 实例来管理该组件的指标和统计数据。当子组件向其父组件报告时,它会“聚合”(reduce)记录的结果并向上游发送。

RLlib 团队推荐将此 API 用于所有您的自定义代码中,例如基于 EnvRunner回调,在自定义损失函数中,或在自定义 training_step() 实现中。

../_images/metrics_logger_overview.svg

RLlib 的 MetricsLogger 系统:RLlib Algorithm 的每个子组件都拥有一个 MetricsLogger 实例,并使用它来本地记录值。当一个组件完成一个独立任务时,例如 EnvRunner 完成一个采样请求时,子组件 (EnvRunner) 的本地指标会被“聚合”(reduced),并发送到包含它的父组件 (Algorithm) 上游。父组件将收到的结果合并到它自己的 MetricsLogger 中,并在其自身的任务周期结束时,同样进行“聚合”,最终报告给用户或 Ray Tune。#

注意

截至目前,拥有 MetricsLogger 实例的 RLlib 组件包括 AlgorithmEnvRunnerLearner、所有 ConnectorV2 类以及所有 ~ray.rllib.utils.replay_buffers.EpisodeReplayBuffer 类。Ray 团队正在考虑将此 API 的访问范围扩展到其他组件。

MetricsLogger 的特性#

MetricsLogger API 提供以下功能

  • 记录随时间变化的标量值,例如损失、个体奖励或 episode 返回。

  • 配置不同的聚合(reduction)类型,特别是 mean(均值)、min(最小值)、max(最大值)或 sum(总和)。此外,用户可以通过设置 reduce=None 来完全不进行聚合,使记录的值保持不变。一个单独的 clear_on_reduce=True 设置允许在每次 reduce 事件时自动清除所有记录的值。

  • 指定滑动窗口,在此窗口上进行聚合(reduction),例如 window=100 表示对最后 100 个记录的值取平均;或者指定指数移动平均 (EMA) 系数,通过该系数计算出的均值中较旧值的权重会随时间衰减。

  • 将来自 n 个并行子组件的 n 个结果字典合并到本地的 MetricsLogger 中。这 n 个字典中的每一个都是在其各自子组件的 MetricsLogger 实例上执行 reduce 操作的结果。

  • 通过方便的 with ... 块记录独立代码块的执行时间。

  • 累加生命周期计数,并在此过程中自动计算相应的每秒吞吐量指标。

MetricsLogger 的内置用法#

RLlib 在现有代码库中广泛使用了 MetricsLogger API。下面是由此产生的典型信息流概览

  1. Algorithm 向其 nEnvRunner actor 发送并行采样请求。

  2. 每个 EnvRunner 通过在 RL 环境中执行步骤来收集训练数据,并将其标准统计数据记录到其 MetricsLogger 中,例如 episode 返回或 episode 长度。

  3. 每个 EnvRunner 在其自身的 MetricsLogger 实例上调用 reduce(),并返回结果统计字典。

  4. Algorithm 将接收到的 n 个统计字典合并到其自身的 MetricsLogger 实例中,位于顶层键“env_runners”下,从而保留了 EnvRunner actor 选择的所有日志设置。

  5. Algorithm 向其 mLearner actor 发送并行更新请求。

  6. 每个 Learner 通过计算损失和梯度来执行模型更新,并将其标准统计数据记录到其 MetricsLogger 中,例如总损失或平均梯度。

  7. 每个 Learner 在其自身的 MetricsLogger 实例上调用 reduce(),并返回结果统计字典。

  8. Algorithm 将接收到的 m 个统计字典合并到其自身的 MetricsLogger 实例中,位于顶层键“learners”下,从而保留了 Learner actor 选择的所有日志设置。

  9. Algorithm 可能会将其标准统计数据添加到其自身的 MetricsLogger 实例中,例如并行采样请求的平均时间。

  10. Algorithm 在其自身的 MetricsLogger 实例上调用 reduce(),编译并将完整且最终的统计字典返回给用户或 Ray Tune。

MetricsLogger API 详解#

在您可以在自定义代码中使用 MetricsLogger 之前,请熟悉如何实际使用其 API。

记录标量值#

要在您的 MetricsLogger 中的某个字符串键下记录标量值,请使用 log_value() 方法

from ray.rllib.utils.metrics.metrics_logger import MetricsLogger

logger = MetricsLogger()

# Log a scalar float value under the `loss` key. By default, all logged
# values under that key are averaged, once `reduce()` is called.
logger.log_value("loss", 0.01, reduce="mean", window=2)

默认情况下,MetricsLogger 通过平均值来聚合(reduce)值 (reduce="mean")。其他可用的聚合类型包括 reduce="min"(最小值)、reduce="max"(最大值)和 reduce="sum"(总和)。

指定 window 会导致聚合在最后 window 个记录的值上进行。例如,您可以继续在 loss 键下记录新值

logger.log_value("loss", 0.02)  # don't have to repeat `reduce` or `window` args,
                                # because the key already exists.
logger.log_value("loss", 0.03)
logger.log_value("loss", 0.04)
logger.log_value("loss", 0.05)

因为您指定了窗口为 2,MetricsLogger 只使用最后 2 个值来计算聚合结果。您可以通过 peek() 方法“窥视”当前的聚合结果

# Peek at the current, reduced value.
# Note that in the underlying structure, the internal values list still
# contains all logged values: 0.01, 0.02, 0.03, 0.04, and 0.05.
print(logger.peek("loss"))  # Expect: 0.045, which is the average over the last 2 values

peek() 方法允许您检查某个键当前底层的聚合结果,而无需实际调用 reduce()

警告

请勿在您的自定义代码中自己调用任何 MetricsLoggerreduce() 方法。RLlib 仅在任务周期结束时调用此 API。RLlib 完全控制所有这些“移交”点,因此除非您编写自己的子组件向父组件(例如 Algorithm)报告,否则请避免调用 reduce() 方法。

要获取当前的聚合结果,请改用 peek() 方法,该方法不会更改任何底层值。

除了提供扁平键之外,您还可以通过传递元组的方式在某个嵌套键下记录值

# Log a value under a deeper nested key.
logger.log_value(("some", "nested", "key"), -1.0)
print(logger.peek(("some", "nested", "key")))  # expect: -1.0

要使用除“mean”之外的聚合方法,请在 log_value() 中指定 reduce 参数

# Log a maximum value.
logger.log_value(key="max_value", value=0.0, reduce="max")

因为您没有指定 window 并且使用了 reduce="max",RLlib 使用无限窗口,这意味着 MetricsLogger 会在每次聚合或您查看当前值时报告生命周期内的最大值

for i in range(1000, 0, -1):
    logger.log_value(key="max_value", value=float(i))

logger.peek("max_value")  # Expect: 1000.0, which is the lifetime max (infinite window)

您也可以选择完全不进行聚合,而只是收集单个值,例如您随时间从环境中接收到的图像集合,对于这些图像,以任何方式进行聚合都没有意义。

使用 reduce=None 参数来实现这一点。但是,强烈建议您同时设置 clear_on_reduce=True 标志,因为否则此设置可能会导致内存泄漏。此标志确保 MetricsLogger 在每次 reduce() 移交操作后清空底层的数值列表,例如从 EnvRunnerAlgorithm 的移交。

logger.log_value("some_items", value="a", reduce=None, clear_on_reduce=True)
logger.log_value("some_items", value="b")
logger.log_value("some_items", value="c")
logger.log_value("some_items", value="d")

logger.peek("some_items")  # expect a list: ["a", "b", "c", "d"]

logger.reduce()
logger.peek("some_items")  # expect an empty list: []

记录一组嵌套标量值#

如果您正在记录嵌套结构的值,例如 {"time_s": 0.1, "lives": 5, "rounds_played": {"player1": 10, "player2": 4}},并且所有值在 reduceclear_on_reducewindow 等参数方面具有完全相同的日志设置,您也可以调用快捷方法 log_dict() 来实现。

from ray.rllib.utils.metrics.metrics_logger import MetricsLogger

logger = MetricsLogger()

# Log a bunch of scalar values within a nested dict.
stats = {"player1": 100.0, "player2": 105.0}
logger.log_dict(stats, key="mean_scores", reduce="mean", window=10)

# Later, do the same again.
stats = {"player1": 150.0, "player2": 110.0}
logger.log_dict(stats, key="mean_scores")

print(logger.peek(("mean_scores", "player1")))  # <- expect 125.0

记录非标量数据#

MetricsLogger 不仅限于标量值。您也可以使用它来记录图像、视频或任何其他复杂数据。

通常,您会选择前面描述的 reduce=None 参数。例如,要记录来自 CartPole 环境的连续三个图像帧,请执行以下操作

import gymnasium as gym

env = gym.make("CartPole-v1")

# Log three consecutive render frames from the env.
# Make sure to set ``clear_on_reduce=True`` to avoid memory leaks.
env.reset()
logger.log_value("some_images", value=env.render(), reduce=None, clear_on_reduce=True)
env.step(0)
logger.log_value("some_images", value=env.render())
env.step(1)
logger.log_value("some_images", value=env.render())

计时器#

MetricsLogger 具有上下文能力,并提供以下简单的 API 来记录计时器结果。请注意,您现在可以通过一行 with- 语句来对自定义代码中所有感兴趣的代码块进行计时

import time
from ray.rllib.utils.metrics.metrics_logger import MetricsLogger

logger = MetricsLogger()

# First delta measurement:
with logger.log_time("my_block_to_be_timed", reduce="mean", ema_coeff=0.1):
    time.sleep(1.0)

# EMA should be ~1sec.
assert 1.1 > logger.peek("my_block_to_be_timed") > 0.9

# Second delta measurement:
with logger.log_time("my_block_to_be_timed"):
    time.sleep(2.0)

# EMA should be ~1.1sec.
assert 1.15 > logger.peek("my_block_to_be_timed") > 1.05

注意

默认的日志记录行为是通过指数均值平均 (EMA) 进行,默认系数为 0.01。此默认值通常是在实验过程中对计时器结果进行平均的良好选择。

计数器#

如果您想对事物进行计数,例如在采样阶段执行的环境步骤数,并将这些计数累加,无论是整个生命周期还是某个特定阶段,请在调用 log_value() 时使用 reduce="sum" 参数。

如果您希望计数只累加到下一个“reduce”事件发生时为止,请将其与 clear_on_reduce=True 结合使用。如果您希望计数在整个生命周期内累加,请设置 clear_on_reduce=False,这是默认值。

from ray.rllib.utils.metrics.metrics_logger import MetricsLogger

logger = MetricsLogger()

logger.log_value("my_counter", 50, reduce="sum", window=None)
logger.log_value("my_counter", 25)
logger.peek("my_counter")  # expect: 75

# Even if your logger gets "reduced" from time to time, the counter keeps increasing
# because we set clear_on_reduce=False (default behavior):
logger.reduce()
logger.peek("my_counter")  # still expect: 75

# To clear the sum after each "reduce" event, set `clear_on_reduce=True`:
logger.log_value("my_temp_counter", 50, reduce="sum", window=None, clear_on_reduce=True)
logger.log_value("my_temp_counter", 25)
logger.peek("my_counter")  # expect: 75
logger.reduce()
logger.peek("my_counter")  # expect: 0 (upon reduction, all values are cleared)

自动吞吐量测量#

使用 reduce="sum"clear_on_reduce=False 设置记录的指标被视为 lifetime 计数器,它在整个实验过程中累加计数,而不会将值重置为 0。如果您还添加了 with_throughput=True 标志,底层指标会在每次 reduce() 操作时自动计算每秒吞吐量。

Algorithm 会自动为每个此类指标编译一个额外的键,在原始键后添加后缀 _throughput,并为其分配每秒吞吐量的值。

您可以将 peek() 方法与调用参数 throughput=True 一起使用来访问吞吐量值。例如

import time
from ray.rllib.utils.metrics.metrics_logger import MetricsLogger

logger = MetricsLogger()

for _ in range(3):
    logger.log_value("lifetime_count", 5, reduce="sum", with_throughput=True)

    # RLlib triggers a new throughput computation at each `reduce()` call
    logger.reduce()
    time.sleep(1.0)

    # Expect the first call to return NaN because we don't have a proper start time for the time delta.
    # From the second call on, expect a value of roughly 5/sec.
    print(logger.peek("lifetime_count", throughput=True))

示例 1:如何在 EnvRunner 回调中使用 MetricsLogger#

为了演示如何在 EnvRunner 上使用 MetricsLogger,请查看此处的端到端示例,该示例利用 RLlibCallback API 将自定义代码注入到 RL 环境循环中。

该示例计算 Acrobot-v1 RL 环境的平均“第一关节角度”,并通过 MetricsLogger API 记录结果。

请注意,此示例与此处描述的示例完全相同,但重点已转移,仅解释代码中的 MetricsLogger 相关方面。

import math
import numpy as np
from ray.rllib.algorithms.ppo import PPOConfig
from ray.rllib.callbacks.callbacks import RLlibCallback

# Define a custom RLlibCallback.

class LogAcrobotAngle(RLlibCallback):

    def on_episode_created(self, *, episode, **kwargs):
        # Initialize an empty list in the `custom_data` property of `episode`.
        episode.custom_data["theta1"] = []

    def on_episode_step(self, *, episode, env, **kwargs):
        # Compute the angle at every episode step and store it temporarily in episode:
        state = env.envs[0].unwrapped.state
        deg_theta1 = math.degrees(math.atan2(state[1], state[0]))
        episode.custom_data["theta1"].append(deg_theta1)

    def on_episode_end(self, *, episode, metrics_logger, **kwargs):
        theta1s = episode.custom_data["theta1"]
        avg_theta1 = np.mean(theta1s)

        # Log the resulting average angle - per episode - to the MetricsLogger.
        # Report with a sliding window of 50.
        metrics_logger.log_value("theta1_mean", avg_theta1, reduce="mean", window=50)

config = (
    PPOConfig()
    .environment("Acrobot-v1")
    .callbacks(
        callbacks_class=LogAcrobotAngle,
    )
)
ppo = config.build()

# Train n times. Expect `theta1_mean` to be found in the results under:
# `env_runners/theta1_mean`
for i in range(10):
    results = ppo.train()
    print(
        f"iter={i} "
        f"theta1_mean={results['env_runners']['theta1_mean']} "
        f"R={results['env_runners']['episode_return_mean']}"
    )

另请参阅此处的更复杂示例,了解如何生成 PacMan 热力图(图像)并将其记录到 WandB

示例 2:如何在自定义损失函数中使用 MetricsLogger#

您可以在自定义损失函数中记录指标。为此,请使用 Learner 的 self.metrics 属性。

注意

在记录损失值时,RLlib 团队建议使用 window=1 来始终报告准确的当前损失值,而不是随时间平滑的结果。这样,您可以立即注意到损失计算中的奇怪尖峰或不稳定行为,并将问题定位到特定迭代。

@override(TorchLearner)
def compute_loss_for_module(self, *, module_id, config, batch, fwd_out):
    ...

    loss_xyz = ...

    # Log a specific loss term.
    self.metrics.log_value("special_loss_term", loss_xyz, window=1)

    total_loss = loss_abc + loss_xyz

    return total_loss

请查看此处运行中的在损失函数内记录自定义值的端到端示例

示例 3:如何在自定义 Algorithm 中使用 MetricsLogger#

您可以在自定义 Algorithm 的 training_step() 方法中记录指标。为此,请使用 Algorithm 自己的 self.metrics 属性。

@override(Algorithm)
def training_step(self) -> None:
    ...

    # Log some value.
    self.metrics.log_value("some_mean_result", 1.5, window=5)

    ...

    with self.metrics.log_time(("timers", "some_code")):
        ... # time some code

请查看此处的运行中在 training_step() 中进行日志记录的端到端示例