Learner (Alpha)#

Learner 允许您抽象 RLModules 的训练逻辑。它支持基于梯度和非基于梯度的更新(例如 Polyak 平均等)。该 API 使您能够使用数据并行分发 Learner。Learner 实现以下功能:

  1. 促进对 RLModule 的基于梯度的更新。

  2. 提供非基于梯度的更新的抽象,例如 Polyak 平均等。

  3. 报告训练统计信息。

  4. 为持久化训练对模块和优化器状态进行检查点。

Learner 类支持使用 LearnerGroup API 进行数据并行风格的训练。在此范式下,LearnerGroup 维护相同 Learner 的多个副本,这些副本具有相同的参数和超参数。这些 Learner 实例中的每一个都在样本批次的某个分片上计算损失和梯度,然后跨 Learner 实例累积梯度。更多关于数据并行学习的信息,请参阅 这篇文章。

LearnerGroup 还支持异步训练和(分布式)检查点,以在训练期间保持持久性。

在 RLlib 实验中启用 Learner API#

使用 AlgorithmConfig 中的 num_gpus_per_learnernum_cpus_per_learnernum_learners 参数调整训练的资源量。

from ray.rllib.algorithms.ppo.ppo import PPOConfig
config = (
    PPOConfig()
    .learners(
        num_learners=0,  # Set this to greater than 1 to allow for DDP style updates.
        num_gpus_per_learner=0,  # Set this to 1 to enable GPU training.
        num_cpus_per_learner=1,
    )
)

注意

此功能处于 alpha 阶段。如果您迁移到此算法,请通过 AlgorithmConfig.api_stack(enable_rl_module_and_learner=True, enable_env_runner_and_connector_v2=True) 启用该功能。

以下算法开箱即用地支持 Learner。通过实现自定义 Learner 来利用此 API 来支持其他算法。

算法

支持的框架

PPO

pytorch tensorflow

IMPALA

pytorch tensorflow

APPO

pytorch tensorflow

基本用法#

使用 LearnerGroup 工具与多个 Learner 进行交互。

构造#

如果您通过 AlgorithmConfig 启用了 RLModuleLearner API,则调用 build_algo() 会为您构建一个 LearnerGroup,但如果您单独使用这些 API,则可以按如下方式构造 LearnerGroup

env = gym.make("CartPole-v1")

# Create an AlgorithmConfig object from which we can build the
# LearnerGroup.
config = (
    PPOConfig()
    # Number of Learner workers (Ray actors).
    # Use 0 for no actors, only create a local Learner.
    # Use >=1 to create n DDP-style Learner workers (Ray actors).
    .learners(num_learners=1)
    # Specify the learner's hyperparameters.
    .training(
        use_kl_loss=True,
        kl_coeff=0.01,
        kl_target=0.05,
        clip_param=0.2,
        vf_clip_param=0.2,
        entropy_coeff=0.05,
        vf_loss_coeff=0.5
    )
)

# Construct a new LearnerGroup using our config object.
learner_group = config.build_learner_group(env=env)
env = gym.make("CartPole-v1")

# Create an AlgorithmConfig object from which we can build the
# Learner.
config = (
    PPOConfig()
    # Specify the Learner's hyperparameters.
    .training(
        use_kl_loss=True,
        kl_coeff=0.01,
        kl_target=0.05,
        clip_param=0.2,
        vf_clip_param=0.2,
        entropy_coeff=0.05,
        vf_loss_coeff=0.5
    )
)
# Construct a new Learner using our config object.
learner = config.build_learner(env=env)

# Needs to be called on the learner before calling any functions.
learner.build()

更新#

TIMESTEPS = {"num_env_steps_sampled_lifetime": 250}

# This is a blocking update.
results = learner_group.update(batch=DUMMY_BATCH, timesteps=TIMESTEPS)

# This is a non-blocking update. The results are returned in a future
# call to `update(..., async_update=True)`
_ = learner_group.update(batch=DUMMY_BATCH, async_update=True, timesteps=TIMESTEPS)

# Artificially wait for async request to be done to get the results
# in the next call to
# `LearnerGroup.update(..., async_update=True)`.
time.sleep(5)
results = learner_group.update(
    batch=DUMMY_BATCH, async_update=True, timesteps=TIMESTEPS
)
# `results` is a list of n result dicts from various Learner actors.
assert isinstance(results, list), results
assert isinstance(results[0], dict), results

更新 LearnerGroup 时,您可以对数据批次执行阻塞或异步更新。异步更新对于实现 APPO/IMPALA 等异步算法是必需的。

# This is a blocking update (given a training batch).
result = learner.update(batch=DUMMY_BATCH, timesteps=TIMESTEPS)

更新 Learner 时,您只能对数据批次执行阻塞更新。您可以通过覆盖 before_gradient_based_update()after_gradient_based_update() 在基于梯度的更新之前或之后执行非基于梯度的更新。

获取和设置状态#

# Get the LearnerGroup's RLModule weights and optimizer states.
state = learner_group.get_state()
learner_group.set_state(state)

# Only get the RLModule weights.
weights = learner_group.get_weights()
learner_group.set_weights(weights)

通过 LearnerGroup.set_stateLearnerGroup.get_state 设置/获取 LearnerGroup 中所有 Learner 的状态字典。这包括每个 Learner 上的神经网络权重和所有优化器状态。例如,Adam 优化器的状态包含基于最近计算的梯度的动量信息。如果您只想获取或设置所有 Learner 的 RLModule(神经网络)的权重,可以通过 LearnerGroup API LearnerGroup.get_weightsLearnerGroup.set_weights 来实现。

from ray.rllib.core import COMPONENT_RL_MODULE

# Get the Learner's RLModule weights and optimizer states.
state = learner.get_state()
# Note that `state` is now a dict:
# {
#    COMPONENT_RL_MODULE: [RLModule's state],
#    COMPONENT_OPTIMIZER: [Optimizer states],
# }
learner.set_state(state)

# Only get the RLModule weights (as numpy, not torch/tf).
rl_module_only_state = learner.get_state(components=COMPONENT_RL_MODULE)
# Note that `rl_module_only_state` is now a dict:
# {COMPONENT_RL_MODULE: [RLModule's state]}
learner.module.set_state(rl_module_only_state)

您可以使用 set_state()get_state() 来设置和获取 Learner 的整个状态。要仅获取 RLModule 的权重(不含优化器状态),请在 get_state() 中使用 components=COMPONENT_RL_MODULE 参数(参见上面的代码)。要仅设置 RLModule 的权重(不更改优化器状态),请使用 get_state() 并传入一个字典:{COMPONENT_RL_MODULE: [RLModule's state]} (参见上面的代码)。

检查点#

learner_group.save_to_path(LEARNER_GROUP_CKPT_DIR)
learner_group.restore_from_path(LEARNER_GROUP_CKPT_DIR)

通过 save_to_path() 检查点 LearnerGroup 中所有 Learner 的状态,并通过 restore_from_path() 恢复已保存 LearnerGroup 的状态。LearnerGroup 的状态包括神经网络权重和所有优化器状态。请注意,由于所有 Learner 实例的状态是相同的,因此只保存第一个 Learner 的状态。

learner.save_to_path(LEARNER_CKPT_DIR)
learner.restore_from_path(LEARNER_CKPT_DIR)

通过 save_to_path() 检查点 Learner 的状态,并通过 restore_from_path() 恢复已保存 Learner 的状态。Learner 的状态包括神经网络权重和所有优化器状态。

实现#

Learner 提供了许多 API 以实现灵活的实现,但您需要实现的核心 API 是:

方法

描述

configure_optimizers_for_module()

为 RLModule 设置任何优化器。

compute_loss_for_module()

计算模块的基于梯度的更新的损失。

before_gradient_based_update()

在基于梯度的更新之前(!) 对 RLModule 执行任何非基于梯度的更新,例如给您的网络添加噪声。

after_gradient_based_update()

在基于梯度的更新之后(!) 对 RLModule 执行任何非基于梯度的更新,例如根据某些计划更新损失系数。

入门示例#

一个实现行为克隆的 Learner 可能如下所示:

class BCTorchLearner(TorchLearner):

    @override(Learner)
    def compute_loss_for_module(
        self,
        *,
        module_id: ModuleID,
        config: AlgorithmConfig = None,
        batch: Dict[str, Any],
        fwd_out: Dict[str, TensorType],
    ) -> TensorType:

        # standard behavior cloning loss
        action_dist_inputs = fwd_out[SampleBatch.ACTION_DIST_INPUTS]
        action_dist_class = self._module[module_id].get_train_action_dist_cls()
        action_dist = action_dist_class.from_logits(action_dist_inputs)
        loss = -torch.mean(action_dist.logp(batch[SampleBatch.ACTIONS]))

        return loss