高级 Python API#

自定义训练工作流#

基本训练示例中,Tune 会在每个训练迭代中调用算法的 train() 方法,并报告新的训练结果。有时,我们希望完全控制训练过程,但仍能在 Tune 中运行。Tune 支持使用自定义可训练函数来实现自定义训练工作流(示例)

课程学习#

在课程学习中,您可以在训练过程中将环境设置为不同的难度。这种设置允许算法通过与越来越难的阶段进行交互和探索,逐步学会如何解决实际的最终问题。通常,这种课程学习始于将环境设置为简单级别,然后随着训练的进行,逐渐转向更难解决的难度。有关课程学习的更多示例,请参阅“反向课程生成用于强化学习代理”的博文。

RLlib 的 Algorithm 和自定义回调 API 允许实现任何任意课程。本示例脚本介绍了您需要理解的基本概念。

首先,定义一些环境选项。本示例使用 FrozenLake-v1 环境,这是一个网格世界,您可以完全自定义其地图。RLlib 用代理必须导航的不同地图代表了三个不同难度的任务。

ENV_OPTIONS = {
    "is_slippery": False,
    # Limit the number of steps the agent is allowed to make in the env to
    # make it almost impossible to learn without the curriculum.
    "max_episode_steps": 16,
}

# Our 3 tasks: 0=easiest, 1=medium, 2=hard
ENV_MAPS = [
    # 0
    [
        "SFFHFFFH",
        "FFFHFFFF",
        "FFGFFFFF",
        "FFFFFFFF",
        "HFFFFFFF",
        "HHFFFFHF",
        "FFFFFHHF",
        "FHFFFFFF",
    ],
    # 1
    [
        "SFFHFFFH",
        "FFFHFFFF",
        "FFFFFFFF",
        "FFFFFFFF",
        "HFFFFFFF",
        "HHFFGFHF",
        "FFFFFHHF",
        "FHFFFFFF",
    ],
    # 2
    [
        "SFFHFFFH",
        "FFFHFFFF",
        "FFFFFFFF",
        "FFFFFFFF",
        "HFFFFFFF",
        "HHFFFFHF",
        "FFFFFHHF",
        "FHFFFFFG",
    ],
]

然后,定义控制课程的中心部分,这是一个自定义回调类,它覆盖了 on_train_result()

import ray
from ray import tune
from ray.rllib.callbacks.callbacks import RLlibCallback

class MyCallbacks(RLlibCallback):
    def on_train_result(self, algorithm, result, **kwargs):
        if result["env_runners"]["episode_return_mean"] > 200:
            task = 2
        elif result["env_runners"]["episode_return_mean"] > 100:
            task = 1
        else:
            task = 0
        algorithm.env_runner_group.foreach_worker(
            lambda ev: ev.foreach_env(
                lambda env: env.set_task(task)))

ray.init()
tune.Tuner(
    "PPO",
    param_space={
        "env": YourEnv,
        "callbacks": MyCallbacks,
    },
).fit()

全局协调#

有时,您需要协调 RLlib 管理的、存在于不同进程中的代码片段。例如,维护某个变量的全局平均值,或者集中控制策略使用的超参数可能很有用。Ray 通过命名 Actor 提供了一种实现这种协调的通用方法。要了解更多信息,请参阅Ray Actor。RLlib 会为这些 Actor 分配一个全局名称。您可以使用这些名称检索它们的句柄。例如,考虑维护一个由环境定期递增并在驱动程序中读取的共享全局计数器。

import ray


@ray.remote
class Counter:
    def __init__(self):
        self.count = 0

    def inc(self, n):
        self.count += n

    def get(self):
        return self.count


# on the driver
counter = Counter.options(name="global_counter").remote()
print(ray.get(counter.get.remote()))  # get the latest count

# in your envs
counter = ray.get_actor("global_counter")
counter.inc.remote(1)  # async call to increment the global count

Ray Actor 提供高级性能。在更复杂的情况下,您可以使用它们来实现参数服务器和 all-reduce 等通信模式。

可视化自定义指标#

像其他训练结果一样访问和可视化自定义指标

../_images/custom_metric.png

自定义探索行为#

RLlib 提供了一个统一的顶级 API 来配置和自定义代理的探索行为,包括决策,例如如何以及是否从分布中采样动作,是随机还是确定性的。使用内置的 Exploration 类设置行为。请参阅此包,您可以在 AlgorithmConfig().env_runners(..) 中指定和进一步配置它。除了使用其中一个可用类外,您还可以继承其中任何一个内置类,为其添加自定义行为,然后在配置中改用这个新类。

每个策略都有一个 Exploration 对象,RLlib 通过 AlgorithmConfig 的 .env_runners(exploration_config=...) 方法创建它。该方法通过特殊的“type”键指定要使用的类,并通过所有其他键指定构造函数参数。例如:

from ray.rllib.algorithms.algorithm_config import AlgorithmConfig

config = AlgorithmConfig().env_runners(
    exploration_config={
        # Special `type` key provides class information
        "type": "StochasticSampling",
        # Add any needed constructor args here.
        "constructor_arg": "value",
    }
)

下表列出了所有内置的 Exploration 子类以及默认使用它们的代理。

../_images/rllib-exploration-api-table.svg

Exploration 类实现 get_exploration_action 方法,您在该方法中定义确切的探索行为。它接受模型的输出、动作分布类、模型本身、一个时间步(例如,已经进行的全局环境采样步骤)和一个 explore 开关。它输出一个元组:a) 动作和 b) 对数似然度。


    def get_exploration_action(self,
                               *,
                               action_distribution: ActionDistribution,
                               timestep: Union[TensorType, int],
                               explore: bool = True):
        """Returns a (possibly) exploratory action and its log-likelihood.

        Given the Model's logits outputs and action distribution, returns an
        exploratory action.

        Args:
            action_distribution: The instantiated
                ActionDistribution object to work with when creating
                exploration actions.
            timestep: The current sampling time step. It can be a tensor
                for TF graph mode, otherwise an integer.
            explore: True: "Normal" exploration behavior.
                False: Suppress all exploratory behavior and return
                a deterministic action.

        Returns:
            A tuple consisting of 1) the chosen exploration action or a
            tf-op to fetch the exploration action from the graph and
            2) the log-likelihood of the exploration action.
        """
        pass

在最高级别,Algorithm.compute_actionsPolicy.compute_actions 方法有一个布尔值 explore 开关,RLlib 将其传递给 Exploration.get_exploration_action。如果 explore=None,RLlib 使用 Algorithm.config[“explore”] 的值,该值充当探索行为的主要开关,例如允许为了评估目的轻松关闭任何探索。请参阅训练过程中的自定义评估

以下是来自不同算法配置的示例摘录,用于从 rllib/algorithms/algorithm.py 设置不同的探索行为。

# All of the following configs go into Algorithm.config.

# 1) Switching *off* exploration by default.
# Behavior: Calling `compute_action(s)` without explicitly setting its `explore`
# param results in no exploration.
# However, explicitly calling `compute_action(s)` with `explore=True`
# still(!) results in exploration (per-call overrides default).
"explore": False,

# 2) Switching *on* exploration by default.
# Behavior: Calling `compute_action(s)` without explicitly setting its
# explore param results in exploration.
# However, explicitly calling `compute_action(s)` with `explore=False`
# results in no(!) exploration (per-call overrides default).
"explore": True,

# 3) Example exploration_config usages:
# a) DQN: see rllib/algorithms/dqn/dqn.py
"explore": True,
"exploration_config": {
   # Exploration sub-class by name or full path to module+class
   # (e.g., “ray.rllib.utils.exploration.epsilon_greedy.EpsilonGreedy”)
   "type": "EpsilonGreedy",
   # Parameters for the Exploration class' constructor:
   "initial_epsilon": 1.0,
   "final_epsilon": 0.02,
   "epsilon_timesteps": 10000,  # Timesteps over which to anneal epsilon.
},

# b) DQN Soft-Q: In order to switch to Soft-Q exploration, do instead:
"explore": True,
"exploration_config": {
   "type": "SoftQ",
   # Parameters for the Exploration class' constructor:
   "temperature": 1.0,
},

# c) All policy-gradient algos and SAC: see rllib/algorithms/algorithm.py
# Behavior: The algo samples stochastically from the
# model-parameterized distribution. This is the global Algorithm default
# setting defined in algorithm.py and used by all PG-type algos (plus SAC).
"explore": True,
"exploration_config": {
   "type": "StochasticSampling",
   "random_timesteps": 0,  # timesteps at beginning, over which to act uniformly randomly
},

训练过程中的自定义评估#

RLlib 报告在线训练奖励,但在某些情况下,您可能希望使用不同的设置来计算奖励。例如,关闭探索,或在特定的环境配置集上计算。通过将 evaluation_interval 设置为正整数,可以激活训练过程中的评估策略(Algorithm.train())。此值指定每次 RLlib 运行“评估步骤”时应发生的 Algorithm.train() 调用次数。

from ray.rllib.algorithms.algorithm_config import AlgorithmConfig

# Run one evaluation step on every 3rd `Algorithm.train()` call.
config = AlgorithmConfig().evaluation(
    evaluation_interval=3,
)

评估步骤运行,使用其自己的 EnvRunner 实例,持续 evaluation_duration 个回合或时间步,具体取决于 evaluation_duration_unit 设置,该设置可以取值 "episodes"(默认值)或 "timesteps"

# Every time we run an evaluation step, run it for exactly 10 episodes.
config = AlgorithmConfig().evaluation(
    evaluation_duration=10,
    evaluation_duration_unit="episodes",
)
# Every time we run an evaluation step, run it for (close to) 200 timesteps.
config = AlgorithmConfig().evaluation(
    evaluation_duration=200,
    evaluation_duration_unit="timesteps",
)

请注意,当使用 evaluation_duration_unit=timestepsevaluation_duration 设置不能被评估工作程序数量整除时,RLlib 会将指定的时间步数向上舍入到最接近的可被评估工作程序数量整除的整数时间步数。同样,当使用 evaluation_duration_unit=episodesevaluation_duration 设置不能被评估工作程序数量整除时,RLlib 会在前 n 个评估 EnvRunner 上运行剩余的回合,并在该时间内让剩余的工作程序空闲。您可以使用 evaluation_num_env_runners 配置评估工作程序。

例如

# Every time we run an evaluation step, run it for exactly 10 episodes, no matter,
# how many eval workers we have.
config = AlgorithmConfig().evaluation(
    evaluation_duration=10,
    evaluation_duration_unit="episodes",
    # What if number of eval workers is non-dividable by 10?
    # -> Run 7 episodes (1 per eval worker), then run 3 more episodes only using
    #    evaluation workers 1-3 (evaluation workers 4-7 remain idle during that time).
    evaluation_num_env_runners=7,
)

在每次评估步骤之前,RLlib 会将权重从主模型同步到所有评估工作程序。

默认情况下,RLlib 在相应的训练步骤 **之后** 立即运行评估步骤(如果当前迭代存在)。例如,对于 evaluation_interval=1,事件顺序是:train(0->1), eval(1), train(1->2), eval(2), train(2->3), ...。索引显示了 RLlib 使用的神经网络权重的版本。train(0->1) 是一个更新步骤,将权重从版本 0 更改为版本 1,然后 eval(1) 使用权重版本 1。权重索引 0 表示神经网络的随机初始化权重。

以下是另一个示例。对于 evaluation_interval=2,顺序是:train(0->1), train(1->2), eval(2), train(2->3), train(3->4), eval(4), ...

您可以选择不按顺序运行 traineval 步骤,而是使用 evaluation_parallel_to_training=True 配置设置并行运行它们。在这种情况下,RLlib 使用多线程同时运行训练和评估步骤。这种并行化可以显著加快评估过程,但会导致报告的训练和评估结果之间有 1 个迭代的延迟。在这种情况下,评估结果会滞后,因为它们使用的是稍旧的模型权重,RLlib 会在之前的训练步骤后同步这些权重。

例如,对于 evaluation_parallel_to_training=Trueevaluation_interval=1,顺序是:train(0->1) + eval(0), train(1->2) + eval(1), train(2->3) + eval(2),其中 + 连接同时发生的阶段。请注意,权重索引的变化与非并行示例相对应。评估权重索引现在比训练权重索引“落后一位”(train(1->**2**) + eval(**1**))。

当使用 evaluation_parallel_to_training=True 设置运行时,RLlib 支持 evaluation_duration 的特殊“auto”值。使用此自动设置可使评估步骤花费与当前进行的训练步骤大致相同的时间。

# Run evaluation and training at the same time via threading and make sure they roughly
# take the same time, such that the next `Algorithm.train()` call can execute
# immediately and not have to wait for a still ongoing (e.g. b/c of very long episodes)
# evaluation step:
config = AlgorithmConfig().evaluation(
    evaluation_interval=2,
    # run evaluation and training in parallel
    evaluation_parallel_to_training=True,
    # automatically end evaluation when train step has finished
    evaluation_duration="auto",
    evaluation_duration_unit="timesteps",  # <- this setting is ignored; RLlib
    # will always run by timesteps (not by complete
    # episodes) in this duration=auto mode
)

evaluation_config 键允许您覆盖评估工作程序的所有配置设置。例如,要在评估步骤中关闭探索,请执行以下操作:

# Switching off exploration behavior for evaluation workers
# (see rllib/algorithms/algorithm.py). Use any keys in this sub-dict that are
# also supported in the main Algorithm config.
config = AlgorithmConfig().evaluation(
    evaluation_config=AlgorithmConfig.overrides(explore=False),
)
# ... which is a more type-checked version of the old-style:
# config = AlgorithmConfig().evaluation(
#    evaluation_config={"explore": False},
# )

注意

策略梯度算法即使存在随机策略也能找到最优策略。将“explore=False”设置为会使评估工作程序不使用此随机策略。

RLlib 通过 evaluation_num_env_runners 设置确定评估步骤内的并行级别。如果您希望所需的评估回合或时间步尽可能并行运行,请将此参数设置为更大的值。例如,如果 evaluation_duration=10evaluation_duration_unit=episodes,和 evaluation_num_env_runners=10,每个评估 EnvRunner 在每个评估步骤中只需要运行一个回合。

如果观察到评估 EnvRunner 在评估过程中出现偶尔的失败(例如,一个有时会崩溃或停滞的环境),请使用以下设置组合来最小化该环境行为的负面影响:

请注意,无论是否进行并行评估,RLlib 都会遵守所有容错设置,例如 ignore_env_runner_failuresrestart_failed_env_runners,并将其应用于失败的评估工作程序。

以下是一个示例:

# Having an environment that occasionally blocks completely for e.g. 10min would
# also affect (and block) training. Here is how you can defend your evaluation setup
# against oft-crashing or -stalling envs (or other unstable components on your evaluation
# workers).
config = AlgorithmConfig().evaluation(
    evaluation_interval=1,
    evaluation_parallel_to_training=True,
    evaluation_duration="auto",
    evaluation_duration_unit="timesteps",  # <- default anyway
    evaluation_force_reset_envs_before_iteration=True,  # <- default anyway
)

此示例运行所有评估 EnvRunner 的并行采样,因此,如果其中一个工作程序花费太长时间来完成一个回合并返回数据或完全失败,其他评估 EnvRunner 仍将完成工作。

如果您想完全自定义评估步骤,请在配置中将 custom_eval_function 设置为一个可调用对象,该对象接受 Algorithm 对象和一个 EnvRunnerGroup 对象(Algorithm 的 self.evaluation_workers EnvRunnerGroup 实例),并返回一个指标字典。有关更多文档,请参阅algorithm.py

此端到端示例展示了如何在custom_evaluation.py中设置自定义在线评估。请注意,如果您只想在训练结束时评估您的策略,请将 evaluation_interval: [int] 设置为训练迭代次数,直到停止。

以下是一些 RLlib 如何在正常训练结果的 evaluation 键下嵌套报告自定义评估指标的示例。

------------------------------------------------------------------------
Sample output for `python custom_evaluation.py --no-custom-eval`
------------------------------------------------------------------------

INFO algorithm.py:623 -- Evaluating current policy for 10 episodes.
INFO algorithm.py:650 -- Running round 0 of parallel evaluation (2/10 episodes)
INFO algorithm.py:650 -- Running round 1 of parallel evaluation (4/10 episodes)
INFO algorithm.py:650 -- Running round 2 of parallel evaluation (6/10 episodes)
INFO algorithm.py:650 -- Running round 3 of parallel evaluation (8/10 episodes)
INFO algorithm.py:650 -- Running round 4 of parallel evaluation (10/10 episodes)

Result for PG_SimpleCorridor_2c6b27dc:
  ...
  evaluation:
    env_runners:
      custom_metrics: {}
      episode_len_mean: 15.864661654135338
      episode_return_max: 1.0
      episode_return_mean: 0.49624060150375937
      episode_return_min: 0.0
      episodes_this_iter: 133
------------------------------------------------------------------------
Sample output for `python custom_evaluation.py`
------------------------------------------------------------------------

INFO algorithm.py:631 -- Running custom eval function <function ...>
Update corridor length to 4
Update corridor length to 7
Custom evaluation round 1
Custom evaluation round 2
Custom evaluation round 3
Custom evaluation round 4

Result for PG_SimpleCorridor_0de4e686:
  ...
  evaluation:
    env_runners:
      custom_metrics: {}
      episode_len_mean: 9.15695067264574
      episode_return_max: 1.0
      episode_return_mean: 0.9596412556053812
      episode_return_min: 0.0
      episodes_this_iter: 223
      foo: 1

重写轨迹#

on_postprocess_traj 回调中,您可以完全访问轨迹批次(post_batch)和其他训练状态。您可以使用此信息重写轨迹,这有多种用途,包括:

  • 将奖励回溯到之前的时间步,例如,基于 info 中的值。

  • 向奖励添加基于模型的探索奖励。您可以使用自定义模型监督损失来训练模型。

要访问回调中的策略或模型(policy.model),请注意 info['pre_batch'] 返回一个元组,其中第一个元素是策略,第二个元素是批次本身。您还可以使用以下调用访问所有 rollout 工作程序的状

from ray.rllib.evaluation.rollout_worker import get_global_worker

# You can use this call from any callback to get a reference to the
# RolloutWorker running in the process. The RolloutWorker has references to
# all the policies, etc. See rollout_worker.py for more info.
rollout_worker = get_global_worker()

RLlib 在 post_batch 数据上定义策略损失,因此您可以对其进行修改以更改策略损失函数看到的数据。