注意

Ray 2.40 默认使用 RLlib 的新 API 栈。Ray 团队已基本完成将算法、示例脚本和文档迁移到新代码库的工作。

如果您仍在使用旧 API 栈,请参阅新 API 栈迁移指南以了解如何迁移的详细信息。

回放缓冲区#

RL 中回放缓冲区的快速介绍#

在强化学习中,当我们谈论回放缓冲区时,通常指一个存储和回放从智能体与环境交互中收集到的经验的缓冲区。在 Python 中,一个简单的缓冲区可以通过列表实现,元素被添加到列表中,然后从中采样。这类缓冲区主要用于离策略学习算法。这在直觉上是合理的,因为这些算法可以从存储在缓冲区中的经验中学习,这些经验可能由策略的先前版本(甚至是完全不同的“行为策略”)生成。

采样策略#

从回放缓冲区采样时,我们选择用于训练智能体的经验。一种对许多算法都有效的直接策略是均匀随机选取这些样本。一种更高级的策略(在许多情况下证明更好)是优先经验回放 (PER)。在 PER 中,缓冲区中的单个项目被赋予一个(标量)优先级值,表示它们的重要性,或者简单地说,我们期望从这些项目中学习到多少。优先级更高的经验更有可能被采样。

驱逐策略#

缓冲区的容量自然是有限的,无法容纳无限的经验。在算法运行过程中,缓冲区最终会达到其容量,为了给新的经验腾出空间,我们需要删除(驱逐)旧的经验。这通常按照先进先出的原则进行。对于您的算法来说,这意味着容量高的缓冲区提供了从旧样本中学习的机会,而容量小的缓冲区则使学习过程更接近于在策略上进行。实现水塘抽样(reservoir sampling)的缓冲区是此策略的一个例外。

RLlib 中的回放缓冲区#

RLlib 内置了一套可扩展的回放缓冲区。它们都支持两个基本方法 add()sample()。我们提供了一个基础 ReplayBuffer 类,您可以从中构建自己的缓冲区。在大多数算法中,我们需要 MultiAgentReplayBuffer。这是因为我们希望它们能够推广到多智能体情况。因此,这些缓冲区的 add()sample() 方法需要一个 policy_id 来处理每个策略的经验。查看 MultiAgentReplayBuffer,了解它如何扩展我们的基础类。您可以找到缓冲区类型和参数作为 RLlib 默认参数的一部分,用于修改它们的行为。它们是 replay_buffer_config 的一部分。

基本用法#

在运行实验时,您很少需要定义自己的回放缓冲区子类,而是配置现有的缓冲区。以下内容来自 RLlib 的示例部分:它运行 R2D2 算法并使用 PER(默认情况下不使用)。高亮行着重于 PER 配置。

可执行示例脚本
"""Simple example of how to modify replay buffer behaviour.

We modify DQN to utilize prioritized replay but supplying it with the
PrioritizedMultiAgentReplayBuffer instead of the standard MultiAgentReplayBuffer.
This is possible because DQN uses the DQN training iteration function,
which includes and a priority update, given that a fitting buffer is provided.
"""

import argparse

import ray
from ray import tune
from ray.tune.result import TRAINING_ITERATION
from ray.rllib.algorithms.dqn import DQNConfig
from ray.rllib.utils.framework import try_import_tf
from ray.rllib.utils.metrics import NUM_ENV_STEPS_SAMPLED_LIFETIME
from ray.rllib.utils.replay_buffers.replay_buffer import StorageUnit

tf1, tf, tfv = try_import_tf()

parser = argparse.ArgumentParser()

parser.add_argument("--num-cpus", type=int, default=0)
parser.add_argument(
    "--framework",
    choices=["tf", "tf2", "torch"],
    default="torch",
    help="The DL framework specifier.",
)
parser.add_argument(
    "--stop-iters", type=int, default=50, help="Number of iterations to train."
)
parser.add_argument(
    "--stop-timesteps", type=int, default=100000, help="Number of timesteps to train."
)

if __name__ == "__main__":
    args = parser.parse_args()

    ray.init(num_cpus=args.num_cpus or None)

    # This is where we add prioritized experiences replay
    # The training iteration function that is used by DQN already includes a priority
    # update step.
    replay_buffer_config = {
        "type": "MultiAgentPrioritizedReplayBuffer",
        # Although not necessary, we can modify the default constructor args of
        # the replay buffer here
        "prioritized_replay_alpha": 0.5,
        "storage_unit": StorageUnit.SEQUENCES,
        "replay_burn_in": 20,
        "zero_init_states": True,
    }

    config = (
        DQNConfig()
        .environment("CartPole-v1")
        .framework(framework=args.framework)
        .env_runners(num_env_runners=4)
        .training(
            model=dict(use_lstm=True, lstm_cell_size=64, max_seq_len=20),
            replay_buffer_config=replay_buffer_config,
        )
    )

    stop_config = {
        NUM_ENV_STEPS_SAMPLED_LIFETIME: args.stop_timesteps,
        TRAINING_ITERATION: args.stop_iters,
    }

    results = tune.Tuner(
        config.algo_class,
        param_space=config,
        run_config=tune.RunConfig(stop=stop_config),
    ).fit()

    ray.shutdown()

提示

由于其普遍性,大多数 Q-learning 算法都支持 PER。所需的优先级更新步骤嵌入在其训练迭代函数中。

警告

如果您的自定义缓冲区需要额外的交互,您也必须更改训练迭代函数!

指定缓冲区类型的方式与指定探索类型的方式相同。以下是指定类型的三种方式

更改回放缓冲区配置
config = (
    DQNConfig()
    .api_stack(
        enable_env_runner_and_connector_v2=False, enable_rl_module_and_learner=False
    )
    .training(replay_buffer_config={"type": ReplayBuffer})
)

another_config = (
    DQNConfig()
    .api_stack(
        enable_env_runner_and_connector_v2=False, enable_rl_module_and_learner=False
    )
    .training(replay_buffer_config={"type": "ReplayBuffer"})
)


yet_another_config = (
    DQNConfig()
    .api_stack(
        enable_env_runner_and_connector_v2=False, enable_rl_module_and_learner=False
    )
    .training(
        replay_buffer_config={"type": "ray.rllib.utils.replay_buffers.ReplayBuffer"}
    )
)

validate_buffer_config(config)
validate_buffer_config(another_config)
validate_buffer_config(yet_another_config)

# After validation, all three configs yield the same effective config
assert (
    config.replay_buffer_config
    == another_config.replay_buffer_config
    == yet_another_config.replay_buffer_config
)

除了 type 外,您还可以指定 capacity 和其他参数。这些参数主要是缓冲区的构造函数参数。存在以下类别

  1. 定义算法如何与回放缓冲区交互的参数。

    例如 worker_side_prioritization,用于决定在哪里计算优先级

  2. 实例化回放缓冲区的构造函数参数。

    例如 capacity,用于限制缓冲区大小

  3. 底层回放缓冲区方法的调用参数。

    例如 prioritized_replay_betaMultiAgentPrioritizedReplayBuffer 用于调用每个底层 PrioritizedReplayBuffersample() 方法

提示

大多数情况下,只需要关注 1. 和 2.。3. 是一项高级功能,支持 MultiAgentReplayBuffer 实例化需要构造函数或默认调用参数的底层缓冲区的用例。

ReplayBuffer 基类#

基础 ReplayBuffer 类仅支持在不同的 StorageUnit 中存储和回放经验。您可以使用 add() 方法将数据添加到缓冲区的存储中,并使用 sample() 方法回放数据。高级缓冲区类型在努力通过继承保持兼容性的同时增加了功能。以下是与 ReplayBuffer 交互的最基本方案的示例。

# We choose fragments because it does not impose restrictions on our batch to be added
buffer = ReplayBuffer(capacity=2, storage_unit=StorageUnit.FRAGMENTS)
dummy_batch = SampleBatch({"a": [1], "b": [2]})
buffer.add(dummy_batch)
buffer.sample(2)
# Because elements can be sampled multiple times, we receive a concatenated version
# of dummy_batch `{a: [1, 1], b: [2, 2,]}`.

构建您自己的 ReplayBuffer#

这里是如何实现您自己的 ReplayBuffer 类的玩具示例并让 SimpleQ 使用它的示例

class LessSampledReplayBuffer(ReplayBuffer):
    @override(ReplayBuffer)
    def sample(
        self, num_items: int, evict_sampled_more_then: int = 30, **kwargs
    ) -> Optional[SampleBatchType]:
        """Evicts experiences that have been sampled > evict_sampled_more_then times."""
        idxes = [random.randint(0, len(self) - 1) for _ in range(num_items)]
        often_sampled_idxes = list(
            filter(lambda x: self._hit_count[x] >= evict_sampled_more_then, set(idxes))
        )

        sample = self._encode_sample(idxes)
        self._num_timesteps_sampled += sample.count

        for idx in often_sampled_idxes:
            del self._storage[idx]
            self._hit_count = np.append(
                self._hit_count[:idx], self._hit_count[idx + 1 :]
            )

        return sample


config = (
    DQNConfig()
    .api_stack(
        enable_env_runner_and_connector_v2=False, enable_rl_module_and_learner=False
    )
    .environment(env="CartPole-v1")
    .training(replay_buffer_config={"type": LessSampledReplayBuffer})
)

tune.Tuner(
    "DQN",
    param_space=config,
    run_config=tune.RunConfig(
        stop={"training_iteration": 1},
    ),
).fit()

对于完整的实现,您应该考虑其他方法,例如 get_state()set_state()。一个更广泛的示例是 我们对水塘抽样的实现,即 ReservoirReplayBuffer

高级用法#

在 RLlib 中,所有回放缓冲区都实现了 ReplayBuffer 接口。因此,它们尽可能支持不同的 StorageUnit。回放缓冲区的 storage_unit 构造函数参数定义了经验如何存储,从而定义了经验采样的单位。稍后调用 sample() 方法时,num_items 将与该 storage_unit 相关。

这里是一个修改 storage_unit 并与自定义缓冲区交互的完整示例

# This line will make our buffer store only complete episodes found in a batch
config.training(replay_buffer_config={"storage_unit": StorageUnit.EPISODES})

less_sampled_buffer = LessSampledReplayBuffer(**config.replay_buffer_config)

# Gather some random experiences
env = RandomEnv()
terminated = truncated = False
batch = SampleBatch({})
t = 0
while not terminated and not truncated:
    obs, reward, terminated, truncated, info = env.step([0, 0])
    # Note that in order for RLlib to find out about start and end of an episode,
    # "t" and "terminateds" have to properly mark an episode's trajectory
    one_step_batch = SampleBatch(
        {
            "obs": [obs],
            "t": [t],
            "reward": [reward],
            "terminateds": [terminated],
            "truncateds": [truncated],
        }
    )
    batch = concat_samples([batch, one_step_batch])
    t += 1

less_sampled_buffer.add(batch)
for i in range(10):
    assert len(less_sampled_buffer._storage) == 1
    less_sampled_buffer.sample(num_items=1, evict_sampled_more_then=9)

assert len(less_sampled_buffer._storage) == 0

如上所述,RLlib 的 MultiAgentReplayBuffer 支持修改底层回放缓冲区。在内部,MultiAgentReplayBuffer 在单独的底层回放缓冲区中按策略存储经验。您可以通过指定底层 replay_buffer_config 来修改它们的行为,其工作方式与父级配置相同。

这里是如何创建一个使用替代底层 ReplayBufferMultiAgentReplayBuffer 的示例。MultiAgentReplayBuffer 可以保持不变。我们只需要指定我们自己的缓冲区以及一个默认的调用参数

config = (
    DQNConfig()
    .api_stack(
        enable_env_runner_and_connector_v2=False, enable_rl_module_and_learner=False
    )
    .training(
        replay_buffer_config={
            "type": "MultiAgentReplayBuffer",
            "underlying_replay_buffer_config": {
                "type": LessSampledReplayBuffer,
                # We can specify the default call argument
                # for the sample method of the underlying buffer method here.
                "evict_sampled_more_then": 20,
            },
        }
    )
    .environment(env="CartPole-v1")
)

tune.Tuner(
    "DQN",
    param_space=config.to_dict(),
    run_config=tune.RunConfig(
        stop={"env_runners/episode_return_mean": 40, "training_iteration": 7},
    ),
).fit()