注意

Ray 2.40 默认使用 RLlib 的新 API 堆栈。Ray 团队已基本完成将算法、示例脚本和文档迁移到新代码库的工作。

如果您仍在使用旧的 API 堆栈,请参阅新 API 堆栈迁移指南了解如何迁移。

学习器 (Alpha)#

Learner 允许您抽象 RLModules 的训练逻辑。它支持基于梯度和非基于梯度的更新(例如 polyak averaging 等)。此 API 使您能够使用数据并行 (DDP) 来分布式 Learner。Learner 实现以下功能:

  1. 方便对 RLModule 进行基于梯度的更新。

  2. 为 polyak averaging 等非基于梯度的更新提供抽象。

  3. 报告训练统计信息。

  4. 检查点模块和优化器状态以实现持久化训练。

The Learner 类使用 LearnerGroup API 支持数据并行风格的训练。在此范例下,LearnerGroup 维护多个参数和超参数相同的 Learner 副本。这些 Learner 实例的每一个都在样本批次的某个分片上计算损失和梯度,然后将梯度累积到所有 Learner 实例。在本文中了解更多关于数据并行学习的信息。

LearnerGroup 还支持异步训练和(分布式)检查点,以增强训练期间的持久性。

在 RLlib 实验中启用 Learner API#

使用 AlgorithmConfig 中的 num_gpus_per_learnernum_cpus_per_learnernum_learners 参数调整训练资源数量。

from ray.rllib.algorithms.ppo.ppo import PPOConfig
config = (
    PPOConfig()
    .learners(
        num_learners=0,  # Set this to greater than 1 to allow for DDP style updates.
        num_gpus_per_learner=0,  # Set this to 1 to enable GPU training.
        num_cpus_per_learner=1,
    )
)

注意

此功能处于 alpha 阶段。如果您迁移到此算法,请通过 AlgorithmConfig.api_stack(enable_rl_module_and_learner=True, enable_env_runner_and_connector_v2=True) 启用该功能。

以下算法原生支持 Learner。为其他算法实现一个使用自定义 Learner 的算法以利用此 API。

算法

支持的框架

PPO

pytorch tensorflow

IMPALA

pytorch tensorflow

APPO

pytorch tensorflow

基本用法#

使用 LearnerGroup 实用程序与多个 learner 进行交互。

构造#

如果您通过 AlgorithmConfig 启用了 RLModuleLearner API,则调用 build_algo() 将为您构造一个 LearnerGroup,但如果您独立使用这些 API,可以按如下方式构造 LearnerGroup

env = gym.make("CartPole-v1")

# Create an AlgorithmConfig object from which we can build the
# LearnerGroup.
config = (
    PPOConfig()
    # Number of Learner workers (Ray actors).
    # Use 0 for no actors, only create a local Learner.
    # Use >=1 to create n DDP-style Learner workers (Ray actors).
    .learners(num_learners=1)
    # Specify the learner's hyperparameters.
    .training(
        use_kl_loss=True,
        kl_coeff=0.01,
        kl_target=0.05,
        clip_param=0.2,
        vf_clip_param=0.2,
        entropy_coeff=0.05,
        vf_loss_coeff=0.5
    )
)

# Construct a new LearnerGroup using our config object.
learner_group = config.build_learner_group(env=env)
env = gym.make("CartPole-v1")

# Create an AlgorithmConfig object from which we can build the
# Learner.
config = (
    PPOConfig()
    # Specify the Learner's hyperparameters.
    .training(
        use_kl_loss=True,
        kl_coeff=0.01,
        kl_target=0.05,
        clip_param=0.2,
        vf_clip_param=0.2,
        entropy_coeff=0.05,
        vf_loss_coeff=0.5
    )
)
# Construct a new Learner using our config object.
learner = config.build_learner(env=env)

# Needs to be called on the learner before calling any functions.
learner.build()

更新#

TIMESTEPS = {"num_env_steps_sampled_lifetime": 250}

# This is a blocking update.
results = learner_group.update(batch=DUMMY_BATCH, timesteps=TIMESTEPS)

# This is a non-blocking update. The results are returned in a future
# call to `update(..., async_update=True)`
_ = learner_group.update(batch=DUMMY_BATCH, async_update=True, timesteps=TIMESTEPS)

# Artificially wait for async request to be done to get the results
# in the next call to
# `LearnerGroup.update(..., async_update=True)`.
time.sleep(5)
results = learner_group.update(
    batch=DUMMY_BATCH, async_update=True, timesteps=TIMESTEPS
)
# `results` is a list of n result dicts from various Learner actors.
assert isinstance(results, list), results
assert isinstance(results[0], dict), results

更新 LearnerGroup 时,您可以对数据批次执行阻塞或异步更新。异步更新对于实现 APPO/IMPALA 等异步算法是必需的。

# This is a blocking update (given a training batch).
result = learner.update(batch=DUMMY_BATCH, timesteps=TIMESTEPS)

更新 Learner 时,您只能对数据批次执行阻塞更新。您可以通过重写 before_gradient_based_update()after_gradient_based_update() 来在基于梯度的更新之前或之后执行非基于梯度的更新。

获取和设置状态#

# Get the LearnerGroup's RLModule weights and optimizer states.
state = learner_group.get_state()
learner_group.set_state(state)

# Only get the RLModule weights.
weights = learner_group.get_weights()
learner_group.set_weights(weights)

通过 learner_group 使用 LearnerGroup.set_stateLearnerGroup.get_state 设置/获取所有 learner 的状态字典。这包括每个 learner 上的神经网络权重和优化器状态。例如,Adam 优化器的状态包含基于最近计算梯度的动量信息。如果您只想获取或设置所有 Learner 的 RLModules(神经网络)权重,您可以通过 LearnerGroup API LearnerGroup.get_weightsLearGroup.set_weights 来实现。

from ray.rllib.core import COMPONENT_RL_MODULE

# Get the Learner's RLModule weights and optimizer states.
state = learner.get_state()
# Note that `state` is now a dict:
# {
#    COMPONENT_RL_MODULE: [RLModule's state],
#    COMPONENT_OPTIMIZER: [Optimizer states],
# }
learner.set_state(state)

# Only get the RLModule weights (as numpy, not torch/tf).
rl_module_only_state = learner.get_state(components=COMPONENT_RL_MODULE)
# Note that `rl_module_only_state` is now a dict:
# {COMPONENT_RL_MODULE: [RLModule's state]}
learner.module.set_state(rl_module_only_state)

您可以使用 set_state()get_state() 设置和获取 Learner 的整个状态。如果只想获取 RLModule 的权重(不包含优化器状态),请在 get_state() 中使用 components=COMPONENT_RL_MODULE 参数(参见上面的代码)。如果只想设置 RLModule 的权重(不触及优化器状态),请使用 get_state() 并传入一个字典:{COMPONENT_RL_MODULE: [RLModule's state]}(参见上面的代码)。

检查点#

learner_group.save_to_path(LEARNER_GROUP_CKPT_DIR)
learner_group.restore_from_path(LEARNER_GROUP_CKPT_DIR)

通过 save_to_path() 检查点 LearnerGroup 中所有 learner 的状态,并通过 restore_from_path() 恢复已保存的 LearnerGroup 的状态。LearnerGroup 的状态包括神经网络权重和所有优化器状态。请注意,由于所有 Learner 实例的状态是相同的,因此仅保存第一个 Learner 的状态。

learner.save_to_path(LEARNER_CKPT_DIR)
learner.restore_from_path(LEARNER_CKPT_DIR)

通过 save_to_path() 检查点 Learner 的状态,并通过 restore_from_path() 恢复已保存的 Learner 的状态。Learner 的状态包括神经网络权重和所有优化器状态。

实现#

Learner 提供了许多 API 以实现灵活的实现,但是您需要实现的核心 API 是

方法

描述

configure_optimizers_for_module()

为 RLModule 设置任何优化器。

compute_loss_for_module()

计算模块基于梯度的更新的损失。

before_gradient_based_update()

在基于梯度的更新之前(!)对 RLModule 执行任何非基于梯度的更新,例如为您的网络添加噪声。

after_gradient_based_update()

在基于梯度的更新之后(!)对 RLModule 执行任何非基于梯度的更新,例如根据某个计划更新损失系数。

入门示例#

一个实现行为克隆的 Learner 可能如下所示

class BCTorchLearner(TorchLearner):

    @override(Learner)
    def compute_loss_for_module(
        self,
        *,
        module_id: ModuleID,
        config: AlgorithmConfig = None,
        batch: Dict[str, Any],
        fwd_out: Dict[str, TensorType],
    ) -> TensorType:

        # standard behavior cloning loss
        action_dist_inputs = fwd_out[SampleBatch.ACTION_DIST_INPUTS]
        action_dist_class = self._module[module_id].get_train_action_dist_cls()
        action_dist = action_dist_class.from_logits(action_dist_inputs)
        loss = -torch.mean(action_dist.logp(batch[SampleBatch.ACTIONS]))

        return loss