使用离线数据#

注意

Ray 2.40 默认使用 RLlib 的新 API 栈。Ray 团队已基本完成算法、示例脚本和文档到新代码库的过渡。

如果您仍在使用旧 API 栈,请参阅新 API 栈迁移指南了解如何迁移的详细信息。

RLlib 的离线强化学习 API 使您能够处理从离线存储(例如,磁盘、云存储、流系统、Hadoop 分布式文件系统 (HDFS))读取的经验。例如,您可能想读取从先前训练运行中保存的经验、从专家收集的经验或从部署在Web 应用中的策略收集的经验。您还可以记录在线训练期间产生的新智能体经验,以便将来使用。

RLlib 使用 SingleAgentEpisode 对象表示轨迹序列(例如,(s, a, r, s', ...) 元组)(目前不支持多智能体离线训练)。使用这种 episode 格式可以高效地编码和压缩经验,重写轨迹,并通过 getter 方法实现用户友好的数据访问。在在线训练期间,RLlib 使用 SingleAgentEnvRunner actor 使用当前策略并行生成经验 episode。但是,RLlib 使用相同的 episode 格式从离线存储读取经验和向离线存储写入经验(参见 OfflineSingleAgentEnvRunner)。

您可以将经验直接存储为 RLlib 的 episode 格式或表(列)格式。在以下情况下,您应该使用 episode 格式:

  1. 您需要按轨迹分组并按时间排序的经验(例如,用于训练有状态模块)。

  2. 您只想在 RLlib 内部使用记录的经验(例如,用于离线强化学习或行为克隆)。

相反,在以下情况下,您应该优先选择表(列)格式:

  1. 您需要使用其他数据工具或 ML 库轻松读取数据。

注意

RLlib 的新 API 栈整合了支持独立应用的原则。因此,SingleAgentEpisode 类可以在 RLlib 上下文之外使用。为了通过外部数据工具(例如,进行数据转换)实现更快访问,建议使用表记录格式。

最重要的是,RLlib 的离线强化学习 API 构建在 Ray Data 之上,因此通常支持 Ray Data 支持的所有读写方法(例如 read_parquetread_json 等),其中 read_parquetwrite_parquet 是默认的读写方法。该 API 的核心设计原则是在学习器开始工作之前尽可能地即时应用数据转换,使学习器能够专注于模型更新。

提示

在从旧 API 栈过渡到新 API 栈期间,您还可以将新的离线 RL API 与使用旧 API 栈记录的 SampleBatch 数据一起使用。要启用此功能,请设置 config.offline_data(input_read_sample_batches=True)

示例:训练专家策略#

在此示例中,您在 CartPole-v1 环境中训练一个 PPO 智能体,直到它达到平均 episode 回报 450.0。您对此智能体进行检查点保存,然后使用其策略将专家数据记录到本地磁盘。

from ray.rllib.algorithms.ppo import PPOConfig
from ray.rllib.core.rl_module.default_model_config import DefaultModelConfig
from ray.rllib.utils.metrics import (
    ENV_RUNNER_RESULTS,
    EVALUATION_RESULTS,
    EPISODE_RETURN_MEAN,
)
from ray import tune

# Configure the PPO algorithm.
config = (
    PPOConfig()
    .environment("CartPole-v1")
    .training(
        lr=0.0003,
        # Run 6 SGD minibatch iterations on a batch.
        num_epochs=6,
        # Weigh the value function loss smaller than
        # the policy loss.
        vf_loss_coeff=0.01,
    )
    .rl_module(
        model_config=DefaultModelConfig(
            fcnet_hiddens=[32],
            fcnet_activation="linear",
            # Share encoder layers between value network
            # and policy.
            vf_share_layers=True,
        ),
    )
)

# Define the metric to use for stopping.
metric = f"{EVALUATION_RESULTS}/{ENV_RUNNER_RESULTS}/{EPISODE_RETURN_MEAN}"

# Define the Tuner.
tuner = tune.Tuner(
    "PPO",
    param_space=config,
    run_config=tune.RunConfig(
        stop={
            metric: 450.0,
        },
        name="docs_rllib_offline_pretrain_ppo",
        verbose=2,
        checkpoint_config=tune.CheckpointConfig(
            checkpoint_frequency=1,
            checkpoint_at_end=True,
        ),
    ),
)
results = tuner.fit()

# Store the best checkpoint to use it later for recording
# an expert policy.
best_checkpoint = (
    results
    .get_best_result(
        metric=metric,
        mode="max"
    )
    .checkpoint.path
)

在此示例中,您保存了一个在玩 CartPole-v1 方面成为专家的智能体的检查点。您在下一个示例中使用此检查点将专家数据记录到磁盘,该数据稍后用于离线训练以克隆另一个智能体。

示例:将专家数据记录到本地磁盘#

在您训练好专家策略以玩 CartPole-v1 后,您在此处加载其策略以在评估期间记录专家数据。您使用 5OfflineSingleAgentEnvRunner 实例,每次调用 sample() 收集 50 个完整 episode。在此示例中,您将经验直接存储在 RLlib 的 SingleAgentEpisode 对象中,每个 Parquet 文件不超过 25 个 episode 对象。总共运行 10 次评估,这将产生专家策略记录的 500 个 episode。您在下一个示例中使用这些数据通过离线 RL 训练一个新策略,该策略在玩 CartPole-v1 时应该达到 450.0 的回报。

from ray.rllib.algorithms.ppo import PPOConfig
from ray.rllib.core import (
    COMPONENT_LEARNER_GROUP,
    COMPONENT_LEARNER,
    COMPONENT_RL_MODULE,
    DEFAULT_MODULE_ID,
)
from ray.rllib.core.rl_module import RLModuleSpec

# Store recording data under the following path.
data_path = "/tmp/docs_rllib_offline_recording"

# Configure the algorithm for recording.
config = (
    PPOConfig()
    # The environment needs to be specified.
    .environment(
        env="CartPole-v1",
    )
    # Make sure to sample complete episodes because
    # you want to record RLlib's episode objects.
    .env_runners(
        batch_mode="complete_episodes",
    )
    # Set up 5 evaluation `EnvRunners` for recording.
    # Sample 50 episodes in each evaluation rollout.
    .evaluation(
        evaluation_num_env_runners=5,
        evaluation_duration=50,
        evaluation_duration_unit="episodes",
    )
    # Use the checkpointed expert policy from the preceding PPO training.
    # Note, we have to use the same `model_config` as
    # the one with which the expert policy was trained, otherwise
    # the module state can't be loaded.
    .rl_module(
        model_config=DefaultModelConfig(
            fcnet_hiddens=[32],
            fcnet_activation="linear",
            # Share encoder layers between value network
            # and policy.
            vf_share_layers=True,
        ),
    )
    # Define the output path and format. In this example you
    # want to store data directly in RLlib's episode objects.
    # Each Parquet file should hold no more than 25 episodes.
    .offline_data(
        output=data_path,
        output_write_episodes=True,
        output_max_rows_per_file=25,
    )
)

# Build the algorithm.
algo = config.build()
# Load now the PPO-trained `RLModule` to use in recording.
algo.restore_from_path(
    best_checkpoint,
    # Load only the `RLModule` component here.
    component=COMPONENT_RL_MODULE,
)

# Run 10 evaluation iterations and record the data.
for i in range(10):
    print(f"Iteration {i + 1}")
    eval_results = algo.evaluate()
    print(eval_results)

# Stop the algorithm. Note, this is important for when
# defining `output_max_rows_per_file`. Otherwise,
# remaining episodes in the `EnvRunner`s buffer isn't written to disk.
algo.stop()

注意

RLlib 将存储的 episode 数据格式化为 binary。每个 episode 都转换为其字典表示并使用 msgpack-numpy 序列化,确保版本兼容性。

RLlib 的记录过程高效,因为它在评估期间利用了多个 OfflineSingleAgentEnvRunner 实例,实现了并行数据写入。您可以浏览文件夹查看存储的 Parquet 数据

$ ls -la /tmp/docs_rllib_offline_recording/cartpole-v1

drwxr-xr-x. 22 user user 440 21. Nov 17:23 .
drwxr-xr-x.  3 user user  60 21. Nov 17:23 ..
drwxr-xr-x.  2 user user 540 21. Nov 17:23 run-000001-00004
drwxr-xr-x.  2 user user 540 21. Nov 17:23 run-000001-00009
drwxr-xr-x.  2 user user 540 21. Nov 17:23 run-000001-00012
drwxr-xr-x.  2 user user 540 21. Nov 17:23 run-000001-00016
drwxr-xr-x.  2 user user 540 21. Nov 17:23 run-000002-00004
drwxr-xr-x.  2 user user 540 21. Nov 17:23 run-000002-00007

提示

RLlib 在以 RL 环境命名的文件夹下存储记录。您会看到每个 OfflineSingleAgentEnvRunner 和写操作对应一个 Parquet 文件文件夹。写操作计数在第二个编号中给出。例如:上面,env-runner 1 在其第 4 次 sample() 调用中采样了 25 个 episode,然后(因为 output_max_rows_per_file=25)将所有采样的 episode 写入磁盘到文件 run-000001-00004 中。

注意

每个 worker 的写操作数量可能因策略 rollout 分布不均而异。更快的 worker 收集更多 episode,导致写操作计数不同。因此,不同 env-runner 实例生成的文件中的第二个编号可能不同。

示例:在之前保存的经验上进行训练#

在此示例中,您使用之前从您玩 CartPole-v1 的专家策略记录的 Parquet 数据进行行为克隆。数据需要在算法配置中链接(通过 input_ 属性)。

from ray import tune
from ray.rllib.algorithms.bc import BCConfig

# Setup the config for behavior cloning.
config = (
    BCConfig()
    .environment(
        # Use the `CartPole-v1` environment from which the
        # data was recorded. This is merely for receiving
        # action and observation spaces and to use it during
        # evaluation.
        env="CartPole-v1",
    )
    .learners(
        # Use a single learner.
        num_learners=0,
    )
    .training(
        # This has to be defined in the new offline RL API.
        train_batch_size_per_learner=1024,
    )
    .offline_data(
        # Link the data.
        input_=[data_path],
        # You want to read in RLlib's episode format b/c this
        # is how you recorded data.
        input_read_episodes=True,
        # Read smaller batches from the data than the learner
        # trains on. Note, each batch element is an episode
        # with multiple timesteps.
        input_read_batch_size=512,
        # Create exactly 2 `DataWorkers` that transform
        # the data on-the-fly. Give each of them a single
        # CPU.
        map_batches_kwargs={
            "concurrency": 2,
            "num_cpus": 1,
        },
        # When iterating over the data, prefetch two batches
        # to improve the data pipeline. Don't shuffle the
        # buffer (the data is too small).
        iter_batches_kwargs={
            "prefetch_batches": 2,
            "local_shuffle_buffer_size": None,
        },
        # You must set this for single-learner setups.
        dataset_num_iters_per_learner=1,
    )
    .evaluation(
        # Run evaluation to see how well the learned policy
        # performs. Run every 3rd training iteration an evaluation.
        evaluation_interval=3,
        # Use a single `EnvRunner` for evaluation.
        evaluation_num_env_runners=1,
        # In each evaluation rollout, collect 5 episodes of data.
        evaluation_duration=5,
        # Evaluate the policy parallel to training.
        evaluation_parallel_to_training=True,
    )
)

# Set the stopping metric to be the evaluation episode return mean.
metric = f"{EVALUATION_RESULTS}/{ENV_RUNNER_RESULTS}/{EPISODE_RETURN_MEAN}"

# Configure Ray Tune.
tuner = tune.Tuner(
    "BC",
    param_space=config,
    run_config=tune.RunConfig(
        name="docs_rllib_offline_bc",
        # Stop behavior cloning when we reach 450 in return.
        stop={metric: 450.0},
        checkpoint_config=tune.CheckpointConfig(
            # Only checkpoint at the end to be faster.
            checkpoint_frequency=0,
            checkpoint_at_end=True,
        ),
        verbose=2,
    )
)
# Run the experiment.
analysis = tuner.fit()

RLlib 中的行为克隆性能很高,完成一次训练迭代大约需要 2 毫秒。实验结果应类似于以下内容

Episode mean return over the course of BC training.

您应该需要大约 98 秒(456 次迭代)才能达到与 PPO 智能体相同的 episode 平均回报。虽然与 PPO 训练时间相比这可能看起来不太令人印象深刻,但需要注意的是,CartPole-v1 是一个非常简单的学习环境。在更复杂的环境中,需要更复杂的智能体和更长的训练时间,通过行为克隆进行预训练会非常有益。将行为克隆与随后的强化学习算法微调相结合,可以显著减少训练时间、资源消耗和相关成本。

使用外部专家经验#

您的专家数据通常已经可用,要么是从操作系统中记录的,要么是由人类专家直接提供的。通常,您可能会将这些数据存储为表格(列式)格式。RLlib 的新离线 RL API 通过允许通过指定组织专家数据的 schema 直接摄取数据来简化此类数据的使用。用于读取数据的 API 默认 schema 在 SCHEMA 中提供。

让我们考虑一个简单示例,其中您的专家数据以 schema:(o_t, a_t, r_t, o_tp1, d_t, i_t, logprobs_t) 存储。在这种情况下,您提供此 schema 如下

from ray.rllib.algorithms.bc import BCConfig
from ray.rllib.core.columns import Columns

config = (
    BCConfig()
    ...
    .offline_data(
        input_=[<input_path>],
        # Provide the schema of your data (map to column names known to RLlib).
        input_read_schema={
            Columns.OBS: "o_t",
            Columns.ACTIONS: "a_t",
            Columns.REWARDS: "r_t",
            Columns.NEXT_OBS: "o_tp1",
            Columns.INFOS: "i_t",
            "done": "d_t",
        },
    )
)

注意

在内部,旧版 gymdone 信号被映射到 gymnasiumterminated 信号,truncated 值默认为 False。RLlib 的 SingleAgentEpisode 结构与 gymnasium 对齐,遵循强化学习中更新的环境 API 标准。

将表格数据转换为 RLlib 的 episode 格式#

虽然表格格式具有广泛的兼容性并与 RLlib 的新离线 RL API 无缝集成,但在某些情况下您可能更喜欢使用 RLlib 的原生 episode 格式。正如前面简要提及的,这种情况通常发生在需要完整专家轨迹时。

注意

RLlib 分批处理表格数据,将每一行转换为一个 单步 episode。这种方法主要是出于程序简单性,因为数据通常不能假设以按时间顺序排列的、按 episode 分组的行形式到达,尽管偶尔可能会出现这种情况(但对此结构的了解在于用户,因为 RLlib 无法轻松自动推断它)。虽然可以将连续的 SingleAgentEpisode 块连接起来,但对于以某些乱序到达的块则无法做到。

如果您需要完整轨迹,可以将表格数据转换为 SingleAgentEpisode 对象,并将其存储为 Parquet 格式。下一个示例展示了如何操作。首先,您将先前训练好的专家策略的经验以表格格式存储(注意下面的 output_write_episodes=False 设置以激活表格数据输出)

from ray.rllib.algorithms.ppo import PPOConfig
from ray.rllib.core import (
    COMPONENT_LEARNER_GROUP,
    COMPONENT_LEARNER,
    COMPONENT_RL_MODULE,
    DEFAULT_MODULE_ID,
)
from ray.rllib.core.rl_module import RLModuleSpec

# Set up a path for the tabular data records.
tabular_data_path = "tmp/docs_rllib_offline_recording_tabular"

# Configure the algorithm for recording.
config = (
    PPOConfig()
    # The environment needs to be specified.
    .environment(
        env="CartPole-v1",
    )
    # Make sure to sample complete episodes because
    # you want to record RLlib's episode objects.
    .env_runners(
        batch_mode="complete_episodes",
    )
    # Set up 5 evaluation `EnvRunners` for recording.
    # Sample 50 episodes in each evaluation rollout.
    .evaluation(
        evaluation_num_env_runners=5,
        evaluation_duration=50,
    )
    # Use the checkpointed expert policy from the preceding PPO training.
    # Note, we have to use the same `model_config` as
    # the one with which the expert policy was trained, otherwise
    # the module state can't be loaded.
    .rl_module(
        model_config=DefaultModelConfig(
            fcnet_hiddens=[32],
            fcnet_activation="linear",
            # Share encoder layers between value network
            # and policy.
            vf_share_layers=True,
        ),
    )
    # Define the output path and format. In this example you
    # want to store data directly in RLlib's episode objects.
    .offline_data(
        output=tabular_data_path,
        # You want to store for this example tabular data.
        output_write_episodes=False,
    )
)

# Build the algorithm.
algo = config.build()
# Load the PPO-trained `RLModule` to use in recording.
algo.restore_from_path(
    best_checkpoint,
    # Load only the `RLModule` component here.
    component=COMPONENT_RL_MODULE,
)

# Run 10 evaluation iterations and record the data.
for i in range(10):
    print(f"Iteration {i + 1}")
    res_eval = algo.evaluate()
    print(res_eval)

# Stop the algorithm. Note, this is important for when
# defining `output_max_rows_per_file`. Otherwise,
# remaining episodes in the `EnvRunner`s buffer isn't written to disk.
algo.stop()

您可能已经注意到,以表格格式记录数据比以 episode 格式记录要慢得多。这种性能下降是由于需要额外的后处理才能将 episode 数据转换为列式格式。要确认记录的数据现在是列式格式,您可以打印其 schema

from ray import data

# Read the tabular data into a Ray dataset.
ds = ray.data.read_parquet(tabular_data_path)
# Now, print its schema.
print("Tabular data schema of expert experiences:\n")
print(ds.schema())

# Column              Type
# ------              ----
# eps_id              string
# agent_id            null
# module_id           null
# obs                 numpy.ndarray(shape=(4,), dtype=float)
# actions             int32
# rewards             double
# new_obs             numpy.ndarray(shape=(4,), dtype=float)
# terminateds         bool
# truncateds          bool
# action_dist_inputs  numpy.ndarray(shape=(2,), dtype=float)
# action_logp         float
# weights_seq_no      int64

注意

infos 全部为空时,它们不会存储到磁盘。

如果您的专家数据是以列式格式提供的,并且您需要在完整的专家轨迹上进行训练,您可以按照以下示例中的代码将您自己的数据转换为 RLlib 的 SingleAgentEpisode 对象

import gymnasium as gym
import msgpack
import msgpack_numpy as mnp

from collections import defaultdict

from ray import data
from ray.rllib.env.single_agent_episode import SingleAgentEpisode

# Load the dataset with the tabular data.
ds = data.read_parquet(tabular_data_path)

# Build the environment from which the data was sampled to get the
# spaces.
env = gym.make("CartPole-v1")
# Define buffers for episode data.
eps_obs = []
eps_actions = []
eps_rewards = []
# Note, extra-model-outputs needs to be a dictionary with list
# values.
eps_extra_model_outputs = defaultdict(list)
# Define a buffer for unwritten episodes.
episodes = []

# Start iterating over the rows of your experience data.
for i, row in enumerate(ds.iter_rows(prefetch_batches=10)):
    # If the episode isn't terminated nor truncated, buffer the data.
    if not row["terminateds"] and not row["truncateds"]:
        eps_obs.append(row["obs"])
        eps_actions.append(row["actions"])
        eps_rewards.append(row["rewards"])
        eps_extra_model_outputs["action_dist_inputs"].append(row["action_dist_inputs"])
        eps_extra_model_outputs["action_logp"].append(row["action_logp"])
    # Otherwise, build the episode.
    else:
        eps_obs.append(row["new_obs"])
        episode = SingleAgentEpisode(
            id_=row["eps_id"],
            agent_id=row["agent_id"],
            module_id=row["module_id"],
            observations=eps_obs,
            # Use the spaces from the environment.
            observation_space=env.observation_space,
            action_space=env.action_space,
            actions=eps_actions,
            rewards=eps_rewards,
            # Set the starting timestep to zero.
            t_started=0,
            # You don't want to have a lookback buffer.
            len_lookback_buffer=0,
            terminated=row["terminateds"],
            truncated=row["truncateds"],
            extra_model_outputs=eps_extra_model_outputs,
        )
        # Store the ready-to-write episode to the episode buffer.
        episodes.append(msgpack.packb(episode.get_state(), default=mnp.encode))
        # Clear all episode data buffers.
        eps_obs.clear()
        eps_actions.clear()
        eps_rewards.clear()
        eps_extra_model_outputs = defaultdict(list)

    # Write episodes to disk when the episode buffer holds 50 episodes.
    if len(episodes) > 49:
        # Generate a Ray dataset from episodes.
        episodes_ds = data.from_items(episodes)
        # Write the Parquet data and compress it.
        episodes_ds.write_parquet(
            f"/tmp/test_converting/file-{i}".zfill(6),
            compression="gzip",
        )
        # Delete the dataset in memory and clear the episode buffer.
        del episodes_ds
        episodes.clear()

# If we are finished and have unwritten episodes, write them now.
if len(episodes) > 0:
    episodes_ds = data.from_items(episodes)
    episodes_ds.write_parquet(
        f"/tmp/test_converting/file-{i}".zfill(6),
        compression="gzip",
    )
    del episodes_ds
    episodes.clear()

使用旧 API 栈 SampleBatch 记录#

如果您有之前使用 RLlib 旧 API 栈记录的专家数据,可以通过设置 input_read_sample_batches=True 将其无缝地用于新栈的离线 RL API。或者,您可以使用 RLlib 的 OfflinePreLearnerSampleBatch 记录转换为 SingleAgentEpisode 格式,如下所示

import msgpack
import msgpack_numpy as mnp

from ray import data
from ray.rllib.offline.offline_prelearner import OfflinePreLearner

# Set up the data path to your `SampleBatch` expert data.
data_path = ...
# Set up the write path for the Parquet episode data.
output_data_path = "/tmp/sample_batch_data"

# Load the `SampleBatch` recordings.
ds = data.read_json(data_path)

# Iterate over batches (of `SampleBatch`es) and convert them to episodes.
for i, batch in enumerate(ds.iter_batches(batch_size=100, prefetch_batches=2)):
    # Use the RLlib's `OfflinePreLearner` to convert `SampleBatch`es to episodes.
    episodes = OfflinePreLearner._map_sample_batch_to_episode(False, batch)["episodes"]

    # Create a dataset from the episodes. Note, for storing episodes you need to
    # serialize them through `msgpack-numpy`.
    episode_ds = data.from_items([msgpack.packb(eps.get_state(), default=mnp.encode) for eps in episodes])
    # Write the batch of episodes to local disk.
    episode_ds.write_parquet(output_data_path + f"/file-{i}".zfill(6), compression="gzip")

print("Finished converting `SampleBatch` data to episode data.")

注意

RLlib 将您的 SampleBatch 视为代表一个已终止/截断的 episode,并根据此假设构建其 SingleAgentEpisode

预处理、过滤和后处理#

在记录过程中,您的专家策略可能会利用对观察进行预处理的技术,例如 帧堆叠,或过滤方法,例如 均值-标准差过滤。同样,动作也可能经过预处理,例如 动作采样缩放。在其 EnvRunner 实例中,RLlib 在将观察传递给 RLModule 之前 应用此类预处理和过滤(通过 环境到模块 连接器流水线)。然而,原始观察(直接从环境接收到的)存储在 episode 中。同样,动作以其原始形式(直接从 RLModule 输出的)记录,同时在发送到环境之前进行预处理(通过 RLlib 的 模块到环境 连接器)。

仔细考虑在经验记录期间应用的预处理和过滤至关重要,因为它们显著影响专家策略如何学习以及随后在环境中的表现。例如,如果专家策略对观察使用 均值-标准差过滤,它会学习基于过滤观察的策略,而过滤器本身高度依赖于训练期间收集的经验。部署此专家策略时,必须在评估期间使用完全相同的过滤器以避免性能下降。同样,通过行为克隆训练的策略也可能需要对观察进行 均值-标准差过滤 以准确复制专家策略的行为。

扩展 I/O 吞吐量#

就像可以扩展在线训练一样,离线记录的 I/O 吞吐量也可以通过配置 RLlib env-runner 的数量来增加。使用 num_env_runners 设置在训练期间扩展记录,或使用 evaluation_num_env_runners 在仅评估记录期间进行扩展。每个 worker 独立运行,并行写入经验,从而实现写操作 I/O 吞吐量的线性扩展。在每个 OfflineSingleAgentEnvRunner 中,episode 在写入磁盘之前进行采样和序列化。

RLlib 中的离线 RL 训练是高度并行的,包括数据读取、后处理以及适用的更新。在使用离线数据进行训练时,通过增加用于将离线经验转换为学习器兼容格式(MultiAgentBatch)的 DataWorker 实例数量来实现可扩展性。Ray Data 通过利用文件元数据、预定义的批处理并发设置以及可用的系统资源,在内部优化读取操作。强烈建议不要覆盖这些默认设置,因为这样做可能会破坏此优化过程。

RLlib 中的数据处理涉及三个关键层,所有这些层都具有高度可扩展性

  1. 读取操作: 该层处理从指定文件夹中的文件摄取数据。它由 Ray Data 自动优化,不应手动扩展或调整。

  2. 后处理 (PreLearner): 在此阶段,如果需要,批次会转换为 RLlib 的 SingleAgentEpisode 格式,并通过 学习器连接器流水线。然后将处理后的数据转换为 MultiAgentBatch 对象进行更新。此层可以扩展 DataWorker 实例。

  3. 更新 (Learner): 此阶段涉及更新策略和相关模块。通过增加学习器数量(num_learners)来实现可扩展性,从而在更新期间并行处理批次。

下图展示了这些层及其可扩展性

Key layers of RLlib's fully scalable Offline RL API.

读取操作完全在 CPU 上执行,主要通过分配额外资源进行扩展(详情请参阅如何调优性能),因为它们完全由 Ray Data 管理。后处理可以通过增加映射操作的关键字参数中指定的并发级别进行扩展

config = (
    AlgorithmConfig()
    .offline_data(
        map_batches_kwargs={
            "concurrency": 10,
            "num_cpus": 4,
        }
    )
)

这将启动一个包含 10 个 DataWorker 实例的 actor 池,每个实例运行一个 RLlib 可调用类 OfflinePreLearner 的实例,用于后处理批次以更新 RLModule

注意

num_cpus (以及类似的 num_gpus)属性定义了分配给每个 DataWorker 的资源,而不是整个 actor 池的资源。

您在 RLlib 的 learners() 配置块中扩展学习器数量

config = (
    AlgorithmConfig()
    .learners(
        num_learners=4,
        num_gpus_per_learner=1,
    )
)

通过此配置,您可以启动一个包含 4 个(远程)Learner 的应用(有关 RLlib 学习器的更多详细信息,请参阅 Learner (Alpha)),每个学习器使用一个 GPU。

使用云存储#

与 RLlib 之前的栈不同,新的离线 RL API 是云无关的,并且与 PyArrow 完全集成。您可以使用任何可用的云存储路径或 PyArrow 兼容的文件系统。如果使用 PyArrow 或兼容的文件系统,请确保您的 input_ 路径是此文件系统内的相对路径。与 Ray Data 类似,您还可以使用占位符、文件或文件夹列表,或者简单地指定一个文件夹来递归读取。

例如,要从 GCS 中的存储桶读取,您可以按如下方式指定文件夹位置

config=(
    AlgorithmConfig()
    .offline_data(
        input_="gs://<your-bucket>/dir1",
    )
)

此配置允许 RLlib 从指定路径下的任何文件夹递归读取数据。如果您正在使用 GCS 的文件系统(例如,由于身份验证要求),请使用以下语法

import pyarrow.fs

# Define the PyArrow filesystem
gcs = pyarrow.fs.GcsFilesystem(
    # This is needed to resolve the hostname for public buckets.
    anonymous=True,
    retry_time_limit=timedelta(seconds=15)
)

# Define the configuration.
config= (
    AlgorithmConfig()
    .offline_data(
        # NOTE: Use a relative file path now
        input_="<public-bucket>/dir1",
        input_filesystem=gcs,
    )
)

您可以在 PyArrow 文件系统接口中了解更多关于 PyArrow 文件系统的信息,特别是关于云文件系统和所需的身份验证。

使用云存储进行记录#

在记录专家策略的经验时,您可以以类似的方式使用云存储

config= (
    AlgorithmConfig()
    .offline_data(
        output="gs://<your-bucket>/dir1",
    )
)

RLlib 然后直接写入云存储中的文件夹,如果存储桶中尚不存在,则创建该文件夹。与读取的唯一区别是您不能使用多个路径进行写入。因此,像这样的情况

config= (
    AlgorithmConfig()
    .offline_data(
        output=["gs://<your-bucket>/dir1", "gs://<your-bucket>/dir2"],
    )
)

不起作用。如果存储需要创建文件夹和/或写入文件的特殊权限,请确保集群用户被授予了必要的权限。否则会导致写入权限被拒绝,从而导致记录过程停止。

注意

使用云存储时,Ray Data 通常会流式传输数据,这意味着数据会分块消耗。这使得后处理和训练可以在短暂的预热阶段后开始。更具体地说,即使您的云存储很大,运行 RLlib 的节点也不需要相同的存储空间。

如何调优性能#

在 RLlib 的离线 RL API 中,各个关键层由不同的模块和配置管理,这使得有效扩展这些层并非易事。理解特定参数及其对系统性能的影响非常重要。

如何调优读取操作#

如前所述,读取操作层由 Ray Data 自动处理和动态优化。强烈建议避免修改此过程。但是,某些参数可以在一定程度上提升该层的性能,包括

  1. 可用资源(专用于作业)。

  2. 数据本地性。

  3. 数据分片。

  4. 数据修剪。

可用资源#

Ray Data 采用的调度策略独立于任何现有放置组,独立调度任务和 actor。因此,为作业中的其他任务和 actor 预留足够的资源至关重要。为了优化 Ray Data 读取操作的可扩展性并提高读取性能,考虑增加集群中的可用资源,同时保留对现有任务和 actor 的资源分配。需要监控和预置的关键资源是 CPU 和对象存储内存。对象存储内存不足,尤其是在重负载压力下,可能导致对象溢出到磁盘,这会严重影响应用性能。

带宽是影响集群内吞吐量的关键因素。在某些情况下,扩展节点数量可以增加带宽,从而增强数据从存储到消费进程的流动。此方法有益的场景包括

  • 与网络主干的独立连接:节点使用专用带宽,避免共享上行链路和潜在瓶颈(例如,有关 AWS 的信息请参阅此处,有关 GCP 网络带宽文档请参阅此处)。

  • 优化云访问:采用诸如 S3 Transfer AccelerationGoogle Cloud Storage FUSE 或并行加速数据传输方法等功能以增强性能。

数据本地性#

数据本地性是实现快速数据处理的关键因素。例如,如果您的数据位于 GCP 上,而在 AWS S3 或本地机器上运行 Ray 集群,则不可避免地会导致传输速率低和数据处理缓慢。为了确保最佳性能,将数据存储在与 Ray 集群相同的区域、相同的区域和云提供商中通常足以启用 RLlib 离线 RL API 的高效流式传输。其他需要考虑的调整包括

  • 多区域存储桶:使用多区域存储来提高数据可用性,并可能增强分布式系统的访问速度。

  • 存储桶内的存储类优化:对于频繁访问和低延迟流,使用标准存储。避免将 AWS Glacier 或 GCP Archive 等归档存储类用于流式工作负载,因为检索时间很高。

数据分片#

数据分片通过平衡块大小来提高获取、传输和读取数据的效率。如果块太大,可能导致传输和处理延迟,造成瓶颈。相反,过小的块可能导致元数据获取开销高,降低整体性能。找到最佳块大小对于平衡这些权衡并最大化吞吐量至关重要。

  • 经验法则:将数据文件大小保持在 64MiB 到 256MiB 之间。

数据修剪#

如果您的数据采用 Parquet 格式(RLlib 推荐的离线数据格式),您可以利用数据修剪来优化性能。Ray Data 在其 read_parquet() 方法中通过投影下推(列过滤)和过滤下推(行过滤)支持修剪。这些过滤器直接在文件扫描期间应用,减少了加载到内存中的不必要数据量。

例如,如果您只需要离线数据中的特定列(例如,避免加载 infos 列)

from ray.rllib.algorithms.algorithm_config import AlgorithmConfig
from ray.rllib.core.columns import Columns

config = (
    AlgorithmConfig()
    .offline_Data(
        input_read_method_kwargs={
            "columns": [
                Columns.EPS_ID,
                Columns.AGENT_ID,
                Columns.OBS,
                Columns.NEXT_OBS,
                Columns.REWARDS,
                Columns.ACTIONS,
                Columns.TERMINATED,
                Columns.TRUNCATED,
            ],
        },
    )
)

同样,如果您只需要数据集中的特定行,可以应用下推过滤器,如下所示

import pyarrow.dataset

from ray.rllib.algorithms.algorithm_config import AlgorithmConfig
from ray.rllib.core.columns import Columns

config = (
    AlgorithmConfig()
    .offline_data(
        input_read_method_kwargs={
            "filter": pyarrow.dataset.field(Columns.AGENT_ID) == "agent_1",
        },
    )
)

如何调优后处理 (PreLearner)#

在启用读取操作的高吞吐量时,确保后处理 (Pre-Learner) 阶段有足够的处理能力至关重要。此阶段容量不足可能导致背压,增加内存使用,严重时甚至导致对象溢出到磁盘或发生内存不足(参见内存不足预防)错误。

调优后处理 (Pre-Learner) 层通常比优化读取操作层更直接。调优后处理 (Pre-Learner) 层通常比优化读取操作层更直接。可以通过调整以下参数来优化其性能

  • Actor 池大小

  • 分配的资源

  • 读取批次和缓冲区大小。

Actor 池大小#

在内部,后处理 (PreLearner) 层由 map_batches() 操作定义,该操作启动一个 _ActorPool。此池中的每个 actor 运行一个 OfflinePreLearner 实例,用于后处理从磁盘到 RLlib Learner 的批次。显然,此 _ActorPool 的大小定义了此层的吞吐量,需要根据前一层的吞吐量进行微调,以避免背压。您可以使用 RLlib map_batches_kwargs 参数中的 concurrency 来定义此池大小

from ray.rllib.algorithm_config import AlgorithmConfig

config = (
    AlgorithmConfig()
    .offline_data(
        map_batches_kwargs={
            "concurrency": 4,
        },
    )
)

使用上述代码,您将使 Ray Data 能够启动最多 4 个并行 OfflinePreLearner actor,它们可以后处理您的数据以进行训练。

注意

Ray Data 根据您后处理 (Pre-Learner) 层的并行度动态调整其读取操作。它根据后处理 (Pre-Learner) 阶段的背压向上或向下扩展读取操作。这意味着整个流式管道的吞吐量由下游任务的性能以及分配给读取操作层的资源决定(参见如何调优读取操作)。然而,由于与读取操作向上或向下扩展相关的开销,背压——在严重情况下还会导致对象溢出或内存不足 (OOM) 错误——并非总是能完全避免。

您还可以通过提供一个区间而不是一个固定数字来启用后处理 (PreLearner) 中的自动扩展

from ray.rllib.algorithm_config import AlgorithmConfig

config = (
    AlgorithmConfig()
    .offline_data(
        map_batches_kwargs={
            "concurrency": (4, 8),
        },
    )
)

这使得 Ray Data 能够启动最多 8 个后处理 actor,以便更快地处理下游数据,例如在发生背压的情况下。

注意

后处理 (Pre-Learner) 层实现自动扩展的 actor 池并不能保证消除背压。添加更多 OffLinePreLearner 实例会给系统引入额外的开销。RLlib 的离线 RL 流水线针对流式数据进行了优化,流式数据通常表现出稳定的吞吐量和资源使用,除非上游和下游任务之间存在不平衡。经验法则建议仅在以下情况下考虑使用自动扩展:(1)吞吐量预期高度可变,(2)集群资源波动较大(例如,在共享或动态环境中),和/或(3)工作负载特性高度不可预测。

分配的资源#

除了后处理 actor 的数量之外,您还可以通过定义分配给 actor 池中每个 OffLinePreLearner 的资源来调优后处理 (PreLearner) 层的性能。这些资源可以通过 num_cpusnum_gpus 或在 ray_remote_args 中定义。

注意

通常,增加 CPU 数量足以在流水线的后处理阶段进行性能调优。仅在特殊情况下才需要 GPU,例如在定制流水线中。例如,RLlib 的 MARWIL 实现使用其 ConnectorPipelineV2 中的 GeneralAdvantageEstimation 连接器对经验批次应用通用优势估计。在这些计算中,应用算法的 RLModule 的价值模型,您可以通过在 GPU 上运行来加速此过程。

例如,为了为后处理 (PreLearner) 中的 4OfflinePreLearner 各提供 2 个 CPU,您可以使用以下语法

from ray.rllib.algorithms.algorithm_config import AlgorithmConfig

config = (
    AlgorithmConfig()
    .offline_data(
        map_batches_kwargs={
            "concurrency": 4,
            "num_cpus": 2,
        },
    )
)

警告

不要覆盖 RLlib 的 map_batches_kwargs 中的 batch_size。这通常会导致严重的性能下降。请注意,此 batch_sizetrain_batch_size_per_learner 不同:前者指定流式管道转换中的批次大小,而后者定义了在每个 Learner 内部用于训练的批次大小(即实际模型前向和后向传递的训练批次大小)。

读取批次和缓冲区大小#

当处理来自 SingleAgentEpisode 或旧版 SampleBatch 格式的数据时,微调 input_read_batch_size 参数提供了额外的优化机会。此参数控制从数据文件中检索的批次大小。在处理 episodic 或旧版 SampleBatch 数据时,其效果尤为显著,因为流式管道对这些数据利用 EpisodeReplayBuffer 来处理每行数据中包含的多个时间步。所有输入数据都会转换为 SingleAgentEpisode 实例(如果尚未采用此格式),并存储在 episode 重放缓冲区中,该缓冲区精确管理 train_batch_size_per_learner 的采样用于训练。

The OfflinePreLearner converts and buffers episodes before sampling the batches used in learning.

在流式处理流水线中实现数据摄取效率和采样变异之间的最佳平衡至关重要。考虑以下示例:假设每个 SingleAgentEpisode 的长度为 100 个时间步,并且您的 train_batch_size_per_learner 配置为 1000。每个 EpisodeReplayBuffer 实例的容量设置为 1000

from ray.rllib.algorithms.algorithm_config import AlgorithmConfig

config = (
    AlgorithmConfig()
    .training(
        # Train on a batch of 1000 timesteps each iteration.
        train_batch_size_per_learner=1000,
    )
    .offline_data(
        # Read in RLlib's new stack `SingleAgentEpisode` data.
        input_read_episodes=True
        # Define an input read batch size of 10 episodes.
        input_read_batch_size=10,
        # Set the replay buffer in the `OfflinePrelearner`
        # to 1,000 timesteps.
        prelearner_buffer_kwargs={
            "capacity": 1000,
        },
    )
)

如代码所示,如果您将 input_read_batch_size 配置为 10,则 10SingleAgentEpisode 中的每一个都适合缓冲区,从而可以在多个 episodes 的各种时间步长上进行采样。这会导致高采样变异。现在,考虑缓冲区容量减少到 500 的情况。

from ray.rllib.algorithms.algorithm_config import AlgorithmConfig

config = (
    AlgorithmConfig()
    .training(
        # Train on a batch of 1000 timesteps each iteration.
        train_batch_size_per_learner=1000,
    )
    .offline_data(
        # Read in RLlib's new stack `SingleAgentEpisode` data.
        input_read_episodes=True
        # Define an input read batch size of 10 episodes.
        input_read_batch_size=10,
        # Set the replay buffer in the `OfflinePrelearner`
        # to 500 timesteps.
        prelearner_buffer_kwargs={
            "capacity": 500,
        },
    )
)

在相同的 input_read_batch_size 下,一次只能缓冲 5SingleAgentEpisode,这会导致效率低下,因为读取的数据量多于可以保留用于采样的数据量。

在另一种场景中,如果每个 SingleAgentEpisode 的长度仍然是 100 个时间步,并且 train_batch_size_per_learner 被设置为 4000 个时间步(如下面的代码所示),则缓冲区会保存 10SingleAgentEpisode 实例。这种配置会导致较低的采样变异,因为许多时间步会被重复采样,从而降低了训练批次的多样性。这些示例突出说明了调整这些参数以有效平衡离线流式处理流水线中的数据摄取和采样多样性的重要性。

from ray.rllib.algorithms.algorithm_config import AlgorithmConfig

config = (
    AlgorithmConfig()
    .training(
        # Train on a batch of 4000 timesteps each iteration.
        train_batch_size_per_learner=4000,
    )
    .offline_data(
        # Read in RLlib's new stack `SingleAgentEpisode` data.
        input_read_episodes=True
        # Define an input read batch size of 10 episodes.
        input_read_batch_size=10,
        # Set the replay buffer in the `OfflinePrelearner`
        # to 1,000 timesteps.
        prelearner_buffer_kwargs={
            "capacity": 500,
        },
    )
)

提示

要选择一个适当的 input_read_batch_size,请查看您记录的 episodes 的长度。在某些情况下,每个单独的 episode 都足够长,可以满足 train_batch_size_per_learner,您可以选择 input_read_batch_size1。大多数情况下并非如此,您需要考虑应缓冲多少个 episodes,以平衡从读取输入中摄取的数据量与从 OfflinePreLearner 中的 EpisodeReplayBuffer 实例中采样的数据变异性。

如何调优更新 (Learner)#

更新 (Learner) 是 RLlib 离线强化学习流水线中最后的下游任务,其消耗速度决定了数据流水线的整体吞吐量。如果学习过程缓慢,可能会在上游层造成反压,可能导致对象溢出或内存不足 (OOM) 错误。因此,与上游组件协调并微调此层至关重要。可以在离线算法中调整几个参数来优化学习速度:

  • Actor 池大小

  • 分配的资源

  • 调度策略

  • 批量大小

  • 批量预取

  • Learner 迭代次数。

Actor 池大小#

RLlib 支持通过参数 num_learners 来扩展 Learner 实例。当此值为 0 时,RLlib 在本地进程中使用一个 Learner 实例,而对于 >0 的值,RLlib 使用 backend_executor_BackendExecutor 进行横向扩展。此 executor 会生成您指定的 Learner 实例数量,管理分布式训练并聚合跨 Learner actor 的中间结果。Learner 扩展会增加训练吞吐量,您只有在离线数据流水线中的上游组件能够以足以匹配增加的训练容量的速率提供数据时才应该应用它。RLlib 的离线 API 通过利用 streaming_split 在其最终层提供了强大的可扩展性。此功能将数据流分成多个子流,然后由单独的 Learner 实例处理,从而实现高效的并行消耗并提高整体吞吐量。

例如,要将 learners 的数量设置为 4,可以使用以下语法:

from ray.rllib.algorithms.algorithm_config import AlgorithmConfig

config = (
    AlgorithmConfig()
    .learners(num_learners=4)
)

已分配资源#

与后处理 (Pre-Learner) 层一样,分配额外的资源可以帮助解决训练缓慢的问题。主要利用的资源是 GPU,因为训练涉及通过 RLModule 的前向和后向传递,这可以由 GPU 显著加速。如果您的训练已经使用了 GPU,但性能仍然有问题,可以考虑通过以下方式进行扩展:为每个 Learner 添加更多 GPU 以增加 GPU 内存和计算能力(设置 config.learners(num_gpus_per_learner=...)),或者添加额外的 Learner worker 以进一步分配工作负载(通过设置 config.learners(num_learners=...))。此外,确保数据吞吐量和上游组件已优化,以使 learners 充分利用,因为上游容量不足会成为训练过程的瓶颈。

要为 learners 提供更多计算资源,请使用 num_gpus_per_learnernum_cpus_per_learner,如下所示:

from ray.rllib.algorithms.algorithm_config import AlgorithmConfig

config = (
    AlgorithmConfig()
    .learners(num_learners=4, num_gpus_per_learner=2)
)

调度策略#

Ray 中的调度策略通过尝试将任务和 actor 分布到集群中的多个节点上,从而最大化资源利用率和容错性,在任务和 actor 的放置中起着关键作用。当在单节点集群(即:一个大的头节点)上运行时,调度策略几乎没有明显的影​​响。然而,在多节点集群中,由于数据局部性的重要性,调度可以显著影响离线数据流水线的性能。数据处理发生在所有节点上,并且在训练期间保持数据局部性可以提高性能。

在这种情况下,您可以通过将 RLlib 的默认调度策略从 "PACK" 更改为 "SPREAD" 来改善数据局部性。此策略将 Learner actor 分布到整个集群中,允许 Ray Data <data> 利用局部性感知包选择,这可以提高效率。

以下是更改调度策略的示例:

"""Just for show-casing, don't run."""
import os
from ray import data
from ray.rllib.algorithms.algorithm_config.AlgorithmConfig

# Configure a "SPREAD" scheduling strategy for learners.
os.environ["TRAIN_ENABLE_WORKER_SPREAD_ENV"] = "1"

# Get the current data context.
data_context = data.DataContext.get_current()
# Set the execution options such that the Ray Data tries to match
# the locality of an output stream with where learners are located.
data_context.execution_options = data.ExecutionOptions(
    locality_with_output=True,
)

# Build the config.
config = (
    AlgorithmConfig()
    .learners(
        # Scale the learners.
        num_learners=4,
        num_gpus_per_learner=2,
    )
    .offline_data(
        ...,
        # Run in each RLlib training iteration 10
        # iterations per learner (each of them with
        # `train_batch_size_per_learner`).
        dataset_num_iters_per_learner=20,
    )
)

# Build the algorithm from the config.
algo = config.build()

# Train for 10 iterations.
for _ in range(10)
    res = algo.train()

批量大小#

批量大小是 RLlib 新离线强化学习 API 中最简单的性能优化参数之一。小批量大小可能无法充分利用硬件,导致效率低下;而过大的批量大小可能会超出内存限制。在流式处理流水线中,选定的批量大小影响数据在并行工作者之间如何分区和处理。较大的批量大小减少了频繁任务协调的开销,但如果超出硬件限制,它们会减慢整个流水线。您可以使用 train_batch_size_per_learner 属性配置训练批量大小,如下所示。

from ray.rllib.algorithms.algorithm_config import AlgorithmConfig

config = (
    AlgorithmConfig()
    .training(
        train_batch_size_per_learner=1024,
    )
)

Ray Data <data> 中,通常使用二的幂作为批量大小。但是,您可以根据需要自由选择任何整数值作为批量大小。

批量预取#

批量预取允许您控制离线数据流水线下游的数据消耗。主要目标是确保 learners 保持活动状态,保持数据流的持续性。这是通过在 learner 处理当前批次时准备下一个批次来实现的。预取决定为 learners 准备多少批次,应根据生成下一个批次所需的时间和 learner 的更新速度进行调整。预取过多的批次可能导致内存效率低下,在某些情况下甚至导致上游任务的反压。

您可以在 iter_batches_kwargs 中配置批量预取:

from ray.rllib.algorithms.algorithm_config import AlgorithmConfig

config = (
    AlgorithmConfig()
    .offline_data(
        iter_batches_kwargs={
            "prefetch_batches": 2,
        }
    )
)

警告

不要覆盖 RLlib 的 map_batches_kwargs 中的 batch_size。这通常会导致严重的性能下降。请注意,此 batch_size 不同于 train_batch_size_per_learner:前者指定了迭代流式处理流水线输出数据时的批量大小,而后者定义了每个 Learner 中用于训练的批量大小。

Learner 迭代次数#

此调优参数仅在使用多个 :Learner 实例时可用。在分布式学习中,每个 Learner 实例处理离线流式处理流水线的一个子流,迭代该子流中的批次。您可以控制每个 Learner 实例在每个 RLlib 训练迭代中运行的迭代次数。结果报告在每次 RLlib 训练迭代后发生。将此参数设置得太低会导致效率低下,而过高的值会阻碍训练监控,在某些情况下(例如 RLlib 的 MARWIL 实现中)还会导致训练数据过时。这是因为一些数据转换依赖于 RLModule,而 Learner 实例正在此模块上进行训练。每个子流的迭代次数由属性 dataset_num_iters_per_learner 控制,其默认值为 None,表示它在子流上运行一个 epoch。

您可以如下修改此值:

from ray.rllib.algorithms.algorithm_config import AlgorithmConfig

config = (
    AlgorithmConfig()
    .offline_data(
        # Train on 20 batches from the substream in each learner.
        dataset_num_iters_per_learner=20,
    )
)

自定义#

RLlib 中离线强化学习组件(例如 AlgorithmLearnerRLModule)的自定义过程与其在线强化学习对应项类似。有关详细指导,请参阅关于 算法 (Algorithms)Learners 和 RLlib RLModule 的文档。RLlib 中的新栈离线强化学习流式处理流水线支持在数据流中的各种级别和位置进行自定义,从而可以针对离线强化学习算法的特定要求设计定制解决方案。

  • Connector 级别

  • PreLearner 级别

  • 流水线级别。

Connector 级别#

SingleAgentEpisode 实例进行小型数据转换可以通过修改 ConnectorPipelineV2 轻松实现,它是 OfflinePreLearner 的一部分,用于准备 episodes 以进行训练。您可以利用 RLlib 库中的任何 connector(参见 RLlib 的默认 connectors)或创建自定义 connector(参见 RLlib 的 ConnectorV2 示例)以集成到 LearnerConnectorPipelineV2 中。必须仔细考虑应用 ConnectorV2 实例的顺序,如 RLlib 的 MARWIL 算法 实现中所示(参见 MARWIL 论文)。

MARWIL 算法 计算的损失超出了行为克隆,它在训练期间使用优势改进专家策略。这些优势通过使用值模型的 广义优势估计 (GAE) 在运行时计算。GAE 通过 GeneralAdvantageEstimation connector 计算。此 connector 有特定要求:它处理 SingleAgentEpisode 实例列表,并且必须是 ConnectorPipelineV2 中的最终组件之一。这是因为它依赖于包含 OBSREWARDSNEXT_OBSTERMINATEDTRUNCATED 字段的完全准备好的批次。此外,输入的 SingleAgentEpisode 实例必须已经包含一个人工拉长的时间步。

为了满足这些要求,流水线必须包含以下顺序的 ConnectorV2 实例:

  1. ray.rllib.connectors.learner.add_one_ts_to_episodes_and_truncate.AddOneTsToEpisodesAndTruncate 确保 SingleAgentEpisode 对象被拉长一个时间步。

  2. ray.rllib.connectors.common.add_observations_from_episodes_to_batch.AddObservationsFromEpisodesToBatch 将观测值 (OBS) 合并到批次中。

  3. ray.rllib.connectors.learner.add_next_observations_from_episodes_to_train_batch.AddNextObservationsFromEpisodesToTrainBatch 添加下一个观测值 (NEXT_OBS)。

  4. 最后,应用 ray.rllib.connectors.learner.general_advantage_estimation.GeneralAdvantageEstimation connector 部分。

下面是来自 RLlib 的 MARWIL 算法 的示例代码片段,演示了此设置:

@override(AlgorithmConfig)
def build_learner_connector(
    self,
    input_observation_space,
    input_action_space,
    device=None,
):
    pipeline = super().build_learner_connector(
        input_observation_space=input_observation_space,
        input_action_space=input_action_space,
        device=device,
    )

    # Before anything, add one ts to each episode (and record this in the loss
    # mask, so that the computations at this extra ts aren't used to compute
    # the loss).
    pipeline.prepend(AddOneTsToEpisodesAndTruncate())

    # Prepend the "add-NEXT_OBS-from-episodes-to-train-batch" connector piece (right
    # after the corresponding "add-OBS-..." default piece).
    pipeline.insert_after(
        AddObservationsFromEpisodesToBatch,
        AddNextObservationsFromEpisodesToTrainBatch(),
    )

    # At the end of the pipeline (when the batch is already completed), add the
    # GAE connector, which performs a vf forward pass, then computes the GAE
    # computations, and puts the results of this (advantages, value targets)
    # directly back in the batch. This is then the batch used for
    # `forward_train` and `compute_losses`.
    pipeline.append(
        GeneralAdvantageEstimation(gamma=self.gamma, lambda_=self.lambda_)
    )

    return pipeline

定义一个基础 LearnerConnector 流水线#

有多种方法可以自定义 LearnerConnectorPipeline。一种方法是如上所示覆盖 Algorithm 中的 build_learner_connector 方法。或者,您可以使用 learner_connector 属性直接定义自定义 ConnectorV2 部分添加到 LearnerConnectorPipeline 中:

def _make_learner_connector(input_observation_space, input_action_space):
    # Create the learner connector.
    return CustomLearnerConnector(
        parameter_1=0.3,
        parameter_2=100,
    )

config = (
    AlgorithmConfig()
    .training(
        # Add the connector pipeline as the starting point for
        # the learner connector pipeline.
        learner_connector=_make_learner_connector,
    )
)

如注释所述,将 ConnectorV2 部分添加到 LearnerConnectorPipeline 的这种方法仅适用于您打算操作原始 episodes 的情况,因为您的 ConnectorV2 部分是构建流水线其余部分(包括批处理和其他处理步骤)的基础。如果您的目标是在 LearnerConnectorPipeline 后续阶段修改数据,则应覆盖 Algorithmbuild_learner_connector 方法,或者考虑第三种选择:完全覆盖 PreLearner

PreLearner 级别#

如果您需要在更深层进行数据转换——在数据到达 SingleAgentEpisode 阶段之前——请考虑覆盖 OfflinePreLearner。此类协调完整的数据转换流水线,将原始输入数据转换为准备好进行训练的 MultiAgentBatch 对象。例如,如果您的数据以需要预解析和重构的专门格式(例如,XML、HTML、Protobuf、图像或视频)存储,您可能需要直接处理这些自定义格式。您可以利用 Ray Data's custom datasources <custom_datasource> 等工具(例如,read_binary_files())来管理摄取过程。为了确保这些数据得到适当的结构化并排序到 SingleAgentEpisode 对象中,您可以覆盖静态方法 _map_to_episodes()

为了进行更广泛的自定义,您可以重写 __call__ 方法来定义自定义转换步骤,实现独特的 LearnerConnectorPipeline,并为 Learner 构建 MultiAgentBatch 实例。

以下示例演示了如何使用自定义 OfflinePreLearner 处理文本数据并构建训练批次:

import gymnasium as gym
import numpy as np
import uuid
from typing import Any, Dict, List, Optional, Union

from ray import data
from ray.rllib.env.single_agent_episode import SingleAgentEpisode
from ray.rllib.offline.offline_prelearner import OfflinePreLearner, SCHEMA
from ray.rllib.utils.annotations import override
from ray.rllib.utils.typing import EpisodeType

class TextOfflinePreLearner(OfflinePreLearner):

    @staticmethod
    @override(OfflinePreLearner)
    def _map_to_episodes(
        is_multi_agent: bool,
        batch: Dict[str, Union[list, np.ndarray]],
        schema: Dict[str, str] = SCHEMA,
        to_numpy: bool = False,
        input_compress_columns: Optional[List[str]] = None,
        observation_space: gym.Space = None,
        action_space: gym.Space = None,
        vocabulary: Dict[str, Any] = None,
        **kwargs: Dict[str, Any],
    ) -> Dict[str, List[EpisodeType]]:

        # If we have no vocabulary raise an error.
        if not vocabulary:
            raise ValueError(
                "No `vocabulary`. It needs a vocabulary in form of dictionary ",
                "mapping tokens to their IDs."
            )
        # Define container for episodes.
        episodes = []

        # Data comes in batches of string arrays under the `"text"` key.
        for text in batch["text"]:
            # Split the text and tokenize.
            tokens = text.split(" ")
            # Encode tokens.
            encoded = [vocabulary[token] for token in tokens]
            one_hot_vectors = np.zeros((len(tokens), len(vocabulary), 1, 1))
            for i, token in enumerate(tokens):
                if token in vocabulary:
                    one_hot_vectors[i][vocabulary[token] - 1] = 1.0

            # Build the `SingleAgentEpisode`.
            episode = SingleAgentEpisode(
                # Generate a unique ID.
                id_=uuid.uuid4().hex,
                # agent_id="default_policy",
                # module_id="default_policy",
                # We use the starting token with all added tokens as observations.
                observations=[ohv for ohv in one_hot_vectors],
                observation_space=observation_space,
                # Actions are defined to be the "chosen" follow-up token after
                # given the observation.
                actions=encoded[1:],
                action_space=action_space,
                # Rewards are zero until the end of a sequence.
                rewards=[0.0 for i in range(len(encoded) - 2)] + [1.0],
                # The episode is always terminated (as sentences in the dataset are).
                terminated=True,
                truncated=False,
                # No lookback. You want the episode to start at timestep zero.
                len_lookback_buffer=0,
                t_started=0,
            )

            # If episodes should be numpy'ized. Some connectors need this.
            if to_numpy:
                episode.to_numpy()

            # Append the episode to the list of episodes.
            episodes.append(episode)

        # Return a batch with key `"episodes"`.
        return {"episodes": episodes}

# Define the dataset.
ds = data.read_text("s3://anonymous@ray-example-data/this.txt")

# Create a vocabulary.
tokens = []
for b in ds.iter_rows():
    tokens.extend(b["text"].split(" "))
vocabulary = {token: idx for idx, token in enumerate(set(tokens), start=1)}

# Take a small batch of 10 from the dataset.
batch = ds.take_batch(10)

# Now use your `OfflinePreLearner`.
episodes = TextOfflinePreLearner._map_to_episodes(
    is_multi_agent=False,
    batch=batch,
    to_numpy=True,
    schema=None,
    input_compress_columns=False,
    action_space=None,
    observation_space=None,
    vocabulary=vocabulary,
)

# Show the constructed episodes.
print(f"Episodes: {episodes}")

前面的示例说明了 RLlib 的离线强化学习 API 在自定义数据转换方面的灵活性。在这种情况下,定制的 OfflinePreLearner 处理一批文本数据(组织成句子),并将每个句子转换为一个 SingleAgentEpisode。静态方法返回一个字典,其中包含这些 SingleAgentEpisode 实例的列表。类似地,您可以通过覆盖 __call__() 方法来扩展此功能。例如,您可以实现一个 ray.rllib.connectors.learner.learner_connector_pipeline.LearnerConnectorPipeline,它将多个观测值(例如,token)堆叠在一起。这可以通过使用 RLlib 的 FrameStackingLearner 来实现,如下面的示例所示。

import gymnasium as gym
import numpy as np
import uuid
from typing import Any, Dict, List, Optional, Tuple, Union

from ray import data
from ray.actor import ActorHandle
from ray.rllib.algorithms.algorithm_config import AlgorithmConfig
from ray.rllib.algorithms.bc.bc_catalog import BCCatalog
from ray.rllib.algorithms.bc.torch.default_bc_torch_rl_module import DefaultBCTorchRLModule
from ray.rllib.connectors.common import AddObservationsFromEpisodesToBatch, BatchIndividualItems, NumpyToTensor, AgentToModuleMapping
from ray.rllib.connectors.learner.add_columns_from_episodes_to_train_batch import AddColumnsFromEpisodesToTrainBatch
from ray.rllib.connectors.learner.frame_stacking import FrameStackingLearner
from ray.rllib.connectors.learner.learner_connector_pipeline import LearnerConnectorPipeline
from ray.rllib.core.learner.learner import Learner
from ray.rllib.core.rl_module.default_model_config import DefaultModelConfig
from ray.rllib.core.rl_module.multi_rl_module import MultiRLModuleSpec
from ray.rllib.core.rl_module.rl_module import RLModuleSpec
from ray.rllib.env.single_agent_episode import SingleAgentEpisode
from ray.rllib.policy.sample_batch import MultiAgentBatch, SampleBatch
from ray.rllib.offline.offline_prelearner import OfflinePreLearner, SCHEMA

from ray.rllib.utils.annotations import override
from ray.rllib.utils.typing import EpisodeType, ModuleID

class TextOfflinePreLearner(OfflinePreLearner):

    @override(OfflinePreLearner)
    def __init__(
        self,
        config: "AlgorithmConfig",
        learner: Union[Learner, List[ActorHandle]] = None,
        locality_hints: Optional[List[str]] = None,
        spaces: Optional[Tuple[gym.Space, gym.Space]] = None,
        module_spec: Optional[MultiRLModuleSpec] = None,
        module_state: Optional[Dict[ModuleID, Any]] = None,
        vocabulary: Dict[str, Any] = None,
        **kwargs: Dict[str, Any],
    ):
        self.config = config
        self.spaces = spaces
        self.vocabulary = vocabulary
        self.vocabulary_size = len(self.vocabulary)

        # Build the `RLModule`.
        self._module = module_spec.build()
        if module_state:
            self._module.set_state(module_state)

        # Build the learner connector pipeline.
        self._learner_connector = LearnerConnectorPipeline(
            connectors=[
                FrameStackingLearner(
                    num_frames=4,
                )
            ],
            input_action_space=module_spec.action_space,
            input_observation_space=module_spec.observation_space,
        )
        self._learner_connector.append(
            AddObservationsFromEpisodesToBatch(as_learner_connector=True),
        )
        self._learner_connector.append(
            AddColumnsFromEpisodesToTrainBatch(),
        )
        self._learner_connector.append(
            BatchIndividualItems(multi_agent=False),
        )
        # Let us run exclusively on CPU, then we can convert here to Tensor.
        self._learner_connector.append(
            NumpyToTensor(as_learner_connector=True),
        )

    @override(OfflinePreLearner)
    def __call__(self, batch: Dict[str, np.ndarray]) -> Dict[str, List[EpisodeType]]:

        # Convert raw data to episodes.
        episodes = TextOfflinePreLearner._map_to_episodes(
            is_multi_agent=False,
            batch=batch,
            to_numpy=True,
            schema=None,
            input_compress_columns=False,
            action_space=self.spaces[0],
            observation_space=self.spaces[1],
            vocabulary=self.vocabulary,
        )["episodes"]

        # Run the learner connector pipeline with the
        # `FrameStackLearner` piece.
        batch = self._learner_connector(
            rl_module=self._module,
            batch={},
            episodes=episodes,
            shared_data={},
        )

        # Convert to `MultiAgentBatch` for the learner.
        batch = MultiAgentBatch(
            {
                module_id: SampleBatch(module_data)
                for module_id, module_data in batch.items()
            },
            # TODO (simon): This can be run once for the batch and the
            # metrics, but we run it twice: here and later in the learner.
            env_steps=sum(e.env_steps() for e in episodes),
        )

        # Return the `MultiAgentBatch` under the `"batch"` key.
        return {"batch": batch}

    @staticmethod
    @override(OfflinePreLearner)
    def _map_to_episodes(
        is_multi_agent: bool,
        batch: Dict[str, Union[list, np.ndarray]],
        schema: Dict[str, str] = SCHEMA,
        to_numpy: bool = False,
        input_compress_columns: Optional[List[str]] = None,
        observation_space: gym.Space = None,
        action_space: gym.Space = None,
        vocabulary: Dict[str, Any] = None,
        **kwargs: Dict[str, Any],
    ) -> Dict[str, List[EpisodeType]]:

        # If we have no vocabulary raise an error.
        if not vocabulary:
            raise ValueError(
                "No `vocabulary`. It needs a vocabulary in form of dictionary ",
                "mapping tokens to their IDs."
            )
        # Define container for episodes.
        episodes = []

        # Data comes in batches of string arrays under the `"text"` key.
        for text in batch["text"]:
            # Split the text and tokenize.
            tokens = text.split(" ")
            # Encode tokens.
            encoded = [vocabulary[token] for token in tokens]
            one_hot_vectors = np.zeros((len(tokens), len(vocabulary), 1, 1))
            for i, token in enumerate(tokens):
                if token in vocabulary:
                    one_hot_vectors[i][vocabulary[token] - 1] = 1.0

            # Build the `SingleAgentEpisode`.
            episode = SingleAgentEpisode(
                # Generate a unique ID.
                id_=uuid.uuid4().hex,
                # agent_id="default_policy",
                # module_id="default_policy",
                # We use the starting token with all added tokens as observations.
                observations=[ohv for ohv in one_hot_vectors],
                observation_space=observation_space,
                # Actions are defined to be the "chosen" follow-up token after
                # given the observation.
                actions=encoded[1:],
                action_space=action_space,
                # Rewards are zero until the end of a sequence.
                rewards=[0.0 for i in range(len(encoded) - 2)] + [1.0],
                # The episode is always terminated (as sentences in the dataset are).
                terminated=True,
                truncated=False,
                # No lookback. You want the episode to start at timestep zero.
                len_lookback_buffer=0,
                t_started=0,
            )

            # If episodes should be numpy'ized. Some connectors need this.
            if to_numpy:
                episode.to_numpy()

            # Append the episode to the list of episodes.
            episodes.append(episode)

        # Return a batch with key `"episodes"`.
        return {"episodes": episodes}

# Define dataset on sample data.
ds = data.read_text("s3://anonymous@ray-example-data/this.txt")

# Create a vocabulary.
tokens = []
for b in ds.iter_rows():
    tokens.extend(b["text"].split(" "))
vocabulary = {token: idx for idx, token in enumerate(set(tokens), start=1)}

# Specify an `RLModule` and wrap it with a `MultiRLModuleSpec`. Note,
# on `Learner`` side any `RLModule` is an `MultiRLModule`.
module_spec = MultiRLModuleSpec(
    rl_module_specs={
        "default_policy": RLModuleSpec(
            model_config=DefaultModelConfig(
                conv_filters=[[16, 4, 2], [32, 4, 2], [64, 4, 2], [128, 4, 2]],
                conv_activation="relu",
            ),
            inference_only=False,
            module_class=DefaultBCTorchRLModule,
            catalog_class=BCCatalog,
            action_space = gym.spaces.Discrete(len(vocabulary)),
            observation_space=gym.spaces.Box(0.0, 1.0, (len(vocabulary), 1, 1), np.float32),
        ),
    },
)

# Take a small batch.
batch = ds.take_batch(10)

# Build and instance your `OfflinePreLearner`.
oplr = TextOfflinePreLearner(
    config=AlgorithmConfig(),
    spaces=(
        gym.spaces.Discrete(len(vocabulary)),
        gym.spaces.Box(0.0, 1.0, (len(vocabulary), 1, 1), np.float32)),
    module_spec=module_spec,
    vocabulary=vocabulary,
)

# Run your `OfflinePreLearner`.
transformed = oplr(batch)

# Show the generated batch.
print(f"Batch: {batch}")

完全自定义 OfflinePreLearner 的能力使您能够设计定制的数据转换工作流程。这包括定义特定的 learner connector 流水线和实现原始数据映射,从而能够对文本数据从原始格式到 MultiAgentBatch 进行多步处理。

要集成您的自定义 OfflinePreLearner,只需在您的 AlgorithmConfig 中指定它:

from ray.rllib.algorithms.algorithm_config import AlgorithmConfig

config = (
    AlgorithmConfig()
    .offline_data(
        # Provide your custom `OfflinePreLearner`.
        prelearner_class=TextOfflinePreLearner,
        # Provide special keyword arguments your `OfflinePreLearner` needs.
        prelearner_kwargs={
            "vocabulary": vocabulary,
        },
    )
)

如果这些自定义功能仍不能满足您的要求,请考虑转到流水线级别以获得更大的灵活性。

流水线级别#

在 RLlib 离线强化学习 API 的这个级别,您可以通过覆盖 OfflineData 类来重新定义从数据读取到批次迭代的完整流水线。然而,在大多数情况下,其他两个级别应该足以满足您的要求。操作完整的流水线需要谨慎处理,因为它可能会在很大程度上降低流水线的性能。在编写自己的流水线之前,仔细研究 OfflineData 类,以充分了解默认流水线的工作原理。主要有两种方法定义此流水线:

  • 定义数据读取过程的 __init__() 方法。

  • 定义数据映射和批次迭代的 sample() 方法。

例如,如果您有一些基础数据转换(例如将图像文件转换为 numpy 数组),请考虑覆盖 __init__() 方法。

import io
import logging
import numpy as np

from PIL import Image
from typing import Any, Dict

from ray import data
from ray.rllib.algorithms.algorithm_config import AlgorithmConfig
from ray.rllib.offline.offline_data import OfflineData
from ray.rllib.offline.offline_prelearner import OfflinePreLearner
from ray.rllib.utils.annotations import override

logger = logging.getLogger(__name__)


class ImageOfflineData(OfflineData):
    """This class overrides `OfflineData` to read in raw image data.

    The image data is from Ray Data`s S3 example bucket, namely
    `ray-example-data/batoidea/JPEGImages/`.
    To read in this data the raw bytes have to be decoded and then
    converted to `numpy` arrays. Each image array has a dimension
    (32, 32, 3).

    To just read in the raw image data and convert it to arrays it
    suffices to override the `OfflineData.__init__` method only.
    Note, that further transformations of the data - specifically
    into `SingleAgentEpisode` data - will be performed in a custom
    `OfflinePreLearner` defined in the `image_offline_prelearner`
    file. You could hard-code the usage of this prelearner here,
    but you will use the `prelearner_class` attribute in the
    `AlgorithmConfig` instead.
    """

    @override(OfflineData)
    def __init__(self, config: AlgorithmConfig):

        # Set class attributes.
        self.config = config
        self.is_multi_agent = self.config.is_multi_agent
        self.materialize_mapped_data = False
        self.path = self.config.input_

        self.data_read_batch_size = self.config.input_read_batch_size
        self.data_is_mapped = False

        # Define your function to map images to numpy arrays.
        def map_to_numpy(row: Dict[str, Any]) -> Dict[str, Any]:
            # Convert to byte stream.
            bytes_stream = io.BytesIO(row["bytes"])
            # Convert to image.
            image = Image.open(bytes_stream)
            # Return an array of the image.
            return {"array": np.array(image)}

        try:
            # Load the dataset and transform to arrays on-the-fly.
            self.data = data.read_binary_files(self.path).map(map_to_numpy)
        except Exception as e:
            logger.error(e)

        # Define further attributes needed in the `sample` method.
        self.batch_iterator = None
        self.map_batches_kwargs = self.config.map_batches_kwargs
        self.iter_batches_kwargs = self.config.iter_batches_kwargs
        # Use a custom OfflinePreLearner if needed.
        self.prelearner_class = self.config.prelearner_class or OfflinePreLearner

        # For remote learner setups.
        self.locality_hints = None
        self.learner_handles = None
        self.module_spec = None

在提供的代码示例中,您定义了一个自定义 OfflineData 类来处理图像数据的读取和预处理,将其从二进制编码格式转换为 numpy 数组。此外,您实现了一个自定义 OfflinePreLearner 来进一步处理这些数据,将其转换为 learner 可用的 MultiAgentBatch 格式。

import gymnasium as gym
import numpy as np
import random
import uuid

from typing import Any, Dict, List, Optional, Tuple, Union

from ray.actor import ActorHandle
from ray.rllib.algorithms.algorithm_config import AlgorithmConfig
from ray.rllib.core.learner.learner import Learner
from ray.rllib.core.rl_module.multi_rl_module import MultiRLModuleSpec
from ray.rllib.env.single_agent_episode import SingleAgentEpisode
from ray.rllib.offline.offline_prelearner import OfflinePreLearner, SCHEMA
from ray.rllib.utils.annotations import override
from ray.rllib.utils.typing import EpisodeType, ModuleID


class ImageOfflinePreLearner(OfflinePreLearner):
    """This class transforms image data to `MultiAgentBatch`es.

    While the `ImageOfflineData` class transforms raw image
    bytes to `numpy` arrays, this class maps these data in
    `SingleAgentEpisode` instances through the learner connector
    pipeline and finally outputs a >`MultiAgentBatch` ready for
    training in RLlib's `Learner`s.

    Note, the basic transformation from images to `SingleAgentEpisode`
    instances creates synthetic data that does not rely on any MDP
    and therefore no agent can learn from it. However, this example
    should show how to transform data into this form through
    overriding the `OfflinePreLearner`.
    """

    def __init__(
        self,
        config: "AlgorithmConfig",
        learner: Union[Learner, List[ActorHandle]],
        spaces: Optional[Tuple[gym.Space, gym.Space]] = None,
        module_spec: Optional[MultiRLModuleSpec] = None,
        module_state: Optional[Dict[ModuleID, Any]] = None,
        **kwargs: Dict[str, Any],
    ):
        # Set up necessary class attributes.
        self.config = config
        self.action_space = spaces[1]
        self.observation_space = spaces[0]
        self.input_read_episodes = self.config.input_read_episodes
        self.input_read_sample_batches = self.config.input_read_sample_batches
        self._policies_to_train = "default_policy"
        self._is_multi_agent = False

        # Build the `MultiRLModule` needed for the learner connector.
        self._module = module_spec.build()

        # Build the learner connector pipeline.
        self._learner_connector = self.config.build_learner_connector(
            input_observation_space=self.observation_space,
            input_action_space=self.action_space,
        )

    @override(OfflinePreLearner)
    @staticmethod
    def _map_to_episodes(
        is_multi_agent: bool,
        batch: Dict[str, Union[list, np.ndarray]],
        schema: Dict[str, str] = SCHEMA,
        to_numpy: bool = False,
        input_compress_columns: Optional[List[str]] = None,
        observation_space: gym.Space = None,
        action_space: gym.Space = None,
        **kwargs: Dict[str, Any],
    ) -> Dict[str, List[EpisodeType]]:

        # Define a container for the episodes.
        episodes = []

        # Batches come in as numpy arrays.
        for i, obs in enumerate(batch["array"]):

            # Construct your episode.
            episode = SingleAgentEpisode(
                id_=uuid.uuid4().hex,
                observations=[obs, obs],
                observation_space=observation_space,
                actions=[action_space.sample()],
                action_space=action_space,
                rewards=[random.random()],
                terminated=True,
                truncated=False,
                len_lookback_buffer=0,
                t_started=0,
            )

            # Numpy'ize, if necessary.
            if to_numpy:
                episode.to_numpy()

            # Store the episode in the container.
            episodes.append(episode)

        return {"episodes": episodes}

这演示了如何使用您自己的逻辑自定义整个离线数据流水线。您可以使用以下代码运行示例:

"""Example showing how to customize an offline data pipeline.

This example:
    - demonstrates how you can customized your offline data pipeline.
    - shows how you can override the `OfflineData` to read raw image
    data and transform it into `numpy ` arrays.
    - explains how you can override the `OfflinePreLearner` to
    transform data further into `SingleAgentEpisode` instances that
    can be processes by the learner connector pipeline.

How to run this script
----------------------
`python [script file name].py --checkpoint-at-end`

For debugging, use the following additional command line options
`--no-tune --num-env-runners=0`
which should allow you to set breakpoints anywhere in the RLlib code and
have the execution stop there for inspection and debugging.

For logging to your WandB account, use:
`--wandb-key=[your WandB API key] --wandb-project=[some project name]
--wandb-run-name=[optional: WandB run name (within the defined project)]`

Results to expect
-----------------
2024-12-03 19:59:23,043 INFO streaming_executor.py:109 -- Execution plan
of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadBinary] ->
TaskPoolMapOperator[Map(map_to_numpy)] -> LimitOperator[limit=128]
✔️  Dataset execution finished in 10.01 seconds: 100%|███████████████████
███████████████████████████████████████████████████████████████████████|
3.00/3.00 [00:10<00:00, 3.34s/ row]
- ReadBinary->SplitBlocks(11): Tasks: 0; Queued blocks: 0; Resources: 0.0
CPU, 0.0B object store: 100%|█████████████████████████████████████████|
3.00/3.00 [00:10<00:00, 3.34s/ row]
- Map(map_to_numpy): Tasks: 0; Queued blocks: 0; Resources: 0.0 CPU,
0.0B object store: 100%|███████████████████████████████████████████████████|
3.00/3.00 [00:10<00:00, 3.34s/ row]
- limit=128: Tasks: 0; Queued blocks: 0; Resources: 0.0 CPU, 3.0KB object
store: 100%|██████████████████████████████████████████████████████████|
3.00/3.00 [00:10<00:00, 3.34s/ row]
Batch: {'batch': [MultiAgentBatch({}, env_steps=3)]}
"""

import gymnasium as gym
import numpy as np

from ray.rllib.algorithms.bc import BCConfig
from ray.rllib.algorithms.bc.bc_catalog import BCCatalog
from ray.rllib.algorithms.bc.torch.bc_torch_rl_module import BCTorchRLModule
from ray.rllib.core.rl_module.rl_module import RLModuleSpec, DefaultModelConfig
from ray.rllib.core.rl_module.multi_rl_module import MultiRLModuleSpec
from ray.rllib.examples.offline_rl.classes.image_offline_data import ImageOfflineData
from ray.rllib.examples.offline_rl.classes.image_offline_prelearner import (
    ImageOfflinePreLearner,
)

# Create an Algorithm configuration.
# TODO: Make this an actually running/learning example with RLunplugged
# data from S3 and add this to the CI.
config = (
    BCConfig()
    .environment(
        action_space=gym.spaces.Discrete(2),
        observation_space=gym.spaces.Box(0, 255, (32, 32, 3), np.float32),
    )
    .offline_data(
        input_=["s3://anonymous@ray-example-data/batoidea/JPEGImages/"],
        prelearner_class=ImageOfflinePreLearner,
    )
)

# Specify an `RLModule` and wrap it with a `MultiRLModuleSpec`. Note,
# on `Learner`` side any `RLModule` is an `MultiRLModule`.
module_spec = MultiRLModuleSpec(
    rl_module_specs={
        "default_policy": RLModuleSpec(
            model_config=DefaultModelConfig(
                conv_filters=[[16, 4, 2], [32, 4, 2], [64, 4, 2], [128, 4, 2]],
                conv_activation="relu",
            ),
            inference_only=False,
            module_class=BCTorchRLModule,
            catalog_class=BCCatalog,
            action_space=gym.spaces.Discrete(2),
            observation_space=gym.spaces.Box(0, 255, (32, 32, 3), np.float32),
        ),
    },
)

# Construct your `OfflineData` class instance.
offline_data = ImageOfflineData(config)

# Check, how the data is transformed. Note, the
# example dataset has only 3 such images.
batch = offline_data.data.take_batch(3)

# Construct your `OfflinePreLearner`.
offline_prelearner = ImageOfflinePreLearner(
    config=config,
    learner=None,
    spaces=(
        config.observation_space,
        config.action_space,
    ),
    module_spec=module_spec,
)

# Transform the raw data to `MultiAgentBatch` data.
batch = offline_prelearner(batch)

# Show the transformed batch.
print(f"Batch: {batch}")

提示

请仔细考虑这种方法:在许多情况下,在调用 RLlib 的离线强化学习 API 之前将数据完全转换为合适的格式可能更高效。例如,在上面的示例中,您可以预先将整个图像数据集预处理为 numpy 数组,并利用 RLlib 的默认 OfflineData 类进行后续步骤。

监控#

要有效地监控离线数据流水线,请利用 Ray Data 的内置监控能力。重点确保离线数据流式处理流水线的所有阶段都在积极处理数据。此外,密切关注 Learner 实例,特别是 learner_update_timer,它应该保持较低的值——对于小型模型,应在 0.02 左右——以指示高效的数据处理和模型更新。

注意

RLlib 不会将 Ray Data 指标包含在其结果中,也不会通过 Ray TuneTBXLoggerCallbackTensorboard 中显示它们。强烈建议启用 Ray Dashboard,可通过 127.0.0.1:8265 访问,以进行全面的监控和洞察。

输入 API#

您可以使用以下选项配置 agent 的经验输入:

def offline_data(
    self,
    *,
    # Specify how to generate experiences:
    # - A local directory or file glob expression (for example "/tmp/*.json").
    # - A cloud storage path or file glob expression (for example "gs://rl/").
    # - A list of individual file paths/URIs (for example ["/tmp/1.json",
    #   "s3://bucket/2.json"]).
    # - A file or directory path in a given `input_filesystem`.
    input_: Optional[Union[str, Callable[[IOContext], InputReader]]],
    # Read method for the `ray.data.Dataset` to read in the
    # offline data from `input_`. The default is `read_parquet` for Parquet
    # files. See https://docs.rayai.org.cn/en/latest/data/api/input_output.html for
    # more info about available read methods in `ray.data`.
    input_read_method: Optional[Union[str, Callable]],
    # Keyword args for `input_read_method`. These
    # are passed into the read method without checking. Use these
    # keyword args together with `map_batches_kwargs` and
    # `iter_batches_kwargs` to tune the performance of the data pipeline. It
    # is strongly recommended to rely on Ray Data's automatic read performance
    # tuning
    input_read_method_kwargs: Optional[Dict],
    # Table schema for converting offline data to episodes.
    # This schema maps the offline data columns to
    # `ray.rllib.core.columns.Columns`:
    # `{Columns.OBS: 'o_t', Columns.ACTIONS: 'a_t', ...}`. Columns in
    # the data set that aren't mapped through this schema are sorted into
    # episodes' `extra_model_outputs`. If no schema is passed in the default
    # schema used is `ray.rllib.offline.offline_data.SCHEMA`. If your data set
    # contains already the names in this schema, no `input_read_schema` is
    # needed. The same applies, if the offline data is in RLlib's
    # `EpisodeType` or old `SampleBatch` format
    input_read_schema: Optional[Dict[str, str]],
    # Whether offline data is already stored in RLlib's
    # `EpisodeType` format, i.e. `ray.rllib.env.SingleAgentEpisode` (multi
    # -agent is planned but not supported, yet). Reading episodes directly
    # avoids additional transform steps and is usually faster and
    # therefore the recommended format when your application remains fully
    # inside of RLlib's schema. The other format is a columnar format and is
    # agnostic to the RL framework used. Use the latter format, if you are
    # unsure when to use the data or in which RL framework. The default is
    # to read column data, i.e. `False`. `input_read_episodes` and
    # `input_read_sample_batches` can't be `True` at the same time. See
    # also `output_write_episodes` to define the output data format when
    # recording.
    input_read_episodes: Optional[bool],
    # Whether offline data is stored in RLlib's old
    # stack `SampleBatch` type. This is usually the case for older data
    # recorded with RLlib in JSON line format. Reading in `SampleBatch`
    # data needs extra transforms and might not concatenate episode chunks
    # contained in different `SampleBatch`es in the data. If possible avoid
    # to read `SampleBatch`es and convert them in a controlled form into
    # RLlib's `EpisodeType` (i.e. `SingleAgentEpisode`). The default is
    # `False`. `input_read_episodes` and `input_read_sample_batches` can't
    # be True at the same time.
    input_read_sample_batches: Optional[bool],
    # Batch size to pull from the data set. This could
    # differ from the `train_batch_size_per_learner`, if a dataset holds
    # `EpisodeType` (i.e. `SingleAgentEpisode`) or `SampleBatch`, or any
    # other data type that contains multiple timesteps in a single row of the
    # dataset. In such cases a single batch of size
    # `train_batch_size_per_learner` potentially pulls a multiple of
    # `train_batch_size_per_learner` timesteps from the offline dataset. The
    # default is `None` in which the `train_batch_size_per_learner` is pulled.
    input_read_batch_size: Optional[int],
    # A cloud filesystem to handle access to cloud storage when
    # reading experiences. Can be "gcs" for Google Cloud Storage, "s3" for AWS
    # S3 buckets, "abs" for Azure Blob Storage, or any filesystem supported
    # by PyArrow. In general the file path is sufficient for accessing data
    # from public or local storage systems. See
    # https://arrow.apache.ac.cn/docs/python/filesystems.html for details.
    input_filesystem: Optional[str],
    # A dictionary holding the kwargs for the filesystem
    # given by `input_filesystem`. See `gcsfs.GCSFilesystem` for GCS,
    # `pyarrow.fs.S3FileSystem`, for S3, and `ablfs.AzureBlobFilesystem` for
    # ABS filesystem arguments.
    input_filesystem_kwargs: Optional[Dict],
    # What input columns are compressed with LZ4 in the
    # input data. If data is stored in RLlib's `SingleAgentEpisode` (
    # `MultiAgentEpisode` not supported, yet). Note the providing
    # `rllib.core.columns.Columns.OBS` also tries to decompress
    # `rllib.core.columns.Columns.NEXT_OBS`.
    input_compress_columns: Optional[List[str]],
    # Whether the raw data should be materialized in memory.
    # This boosts performance, but requires enough memory to avoid an OOM, so
    # make sure that your cluster has the resources available. For very large
    # data you might want to switch to streaming mode by setting this to
    # `False` (default). If your algorithm doesn't need the RLModule in the
    # Learner connector pipeline or all (learner) connectors are stateless
    # you should consider setting `materialize_mapped_data` to `True`
    # instead (and set `materialize_data` to `False`). If your data doesn't
    # fit into memory and your Learner connector pipeline requires an RLModule
    # or is stateful, set both `materialize_data` and
    # `materialize_mapped_data` to `False`.
    materialize_data: Optional[bool],
    # Whether the data should be materialized after
    # running it through the Learner connector pipeline (i.e. after running
    # the `OfflinePreLearner`). This improves performance, but should only be
    # used in case the (learner) connector pipeline doesn't require an
    # RLModule and the (learner) connector pipeline is stateless. For example,
    # MARWIL's Learner connector pipeline requires the RLModule for value
    # function predictions and training batches would become stale after some
    # iterations causing learning degradation or divergence. Also ensure that
    # your cluster has enough memory available to avoid an OOM. If set to
    # `True`, make sure that `materialize_data` is set to `False` to
    # avoid materialization of two datasets. If your data doesn't fit into
    # memory and your Learner connector pipeline requires an RLModule or is
    # stateful, set both `materialize_data` and `materialize_mapped_data` to
    # `False`.
    materialize_mapped_data: Optional[bool],
    # Keyword args for the `map_batches` method. These are
    # passed into the `ray.data.Dataset.map_batches` method when sampling
    # without checking. If no arguments passed in the default arguments
    # `{'concurrency': max(2, num_learners), 'zero_copy_batch': True}` is
    # used. Use these keyword args together with `input_read_method_kwargs`
    # and `iter_batches_kwargs` to tune the performance of the data pipeline.
    map_batches_kwargs: Optional[Dict],
    # Keyword args for the `iter_batches` method. These are
    # passed into the `ray.data.Dataset.iter_batches` method when sampling
    # without checking. If no arguments are passed in, the default argument
    # `{'prefetch_batches': 2}` is used. Use these keyword args
    # together with `input_read_method_kwargs` and `map_batches_kwargs` to
    # tune the performance of the data pipeline.
    iter_batches_kwargs: Optional[Dict],
    # An optional `OfflinePreLearner` class that's used to
    # transform data batches in `ray.data.map_batches` used in the
    # `OfflineData` class to transform data from columns to batches that can
    # be used in the `Learner.update...()` methods. Override the
    # `OfflinePreLearner` class and pass your derived class in here, if you
    # need to make some further transformations specific for your data or
    # loss. The default is `None`` which uses the base `OfflinePreLearner`
    # defined in `ray.rllib.offline.offline_prelearner`.
    prelearner_class: Optional[Type],
    # An optional `EpisodeReplayBuffer` class is
    # used to buffer experiences when data is in `EpisodeType` or
    # RLlib's previous `SampleBatch` type format. In this case, a single
    # data row may contain multiple timesteps and the buffer serves two
    # purposes: (a) to store intermediate data in memory, and (b) to ensure
    # that exactly `train_batch_size_per_learner` experiences are sampled
    # per batch. The default is RLlib's `EpisodeReplayBuffer`.
    prelearner_buffer_class: Optional[Type],
    # Optional keyword arguments for intializing the
    # `EpisodeReplayBuffer`. In most cases this is simply the `capacity`
    # for the default buffer used (`EpisodeReplayBuffer`), but it may
    # differ if the `prelearner_buffer_class` uses a custom buffer.
    prelearner_buffer_kwargs: Optional[Dict],
    # Number of updates to run in each learner
    # during a single training iteration. If None, each learner runs a
    # complete epoch over its data block (the dataset is partitioned into
    # at least as many blocks as there are learners). The default is `None`.
    # This must be set to `1`, if a single (local) learner is used.
    dataset_num_iters_per_learner: Optional[int],
)

输出 API#

您可以使用以下选项配置 agent 的经验输出:

def offline_data(
    # Specify where experiences should be saved:
    # - None: don't save any experiences
    # - a path/URI to save to a custom output directory (for example, "s3://bckt/")
    output: Optional[str],
    # What sample batch columns to LZ4 compress in the output data.
    # Note that providing `rllib.core.columns.Columns.OBS` also
    # compresses `rllib.core.columns.Columns.NEXT_OBS`.
    output_compress_columns: Optional[List[str]],
    # Max output file size (in bytes) before rolling over to a new
    # file.
    output_max_file_size: Optional[float],
    # Max output row numbers before rolling over to a new file.
    output_max_rows_per_file: Optional[int],
    # Write method for the `ray.data.Dataset` to write the
    # offline data to `output`. The default is `read_parquet` for Parquet
    # files. See https://docs.rayai.org.cn/en/latest/data/api/input_output.html for
    # more info about available read methods in `ray.data`.
    output_write_method: Optional[str],
    # Keyword arguments for the `output_write_method`. These are
    # passed into the write method without checking.
    output_write_method_kwargs: Optional[Dict],
    # A cloud filesystem to handle access to cloud storage when
    # writing experiences. Can be "gcs" for Google Cloud Storage, "s3" for AWS
    # S3 buckets, "abs" for Azure Blob Storage, or any filesystem supported
    # by PyArrow. In general the file path is sufficient for accessing data
    # from public or local storage systems. See
    # https://arrow.apache.ac.cn/docs/python/filesystems.html for details.
    output_filesystem: Optional[str],
    # A dictionary holding the keyword arguments for the filesystem
    # given by `output_filesystem`. See `gcsfs.GCSFilesystem` for GCS,
    # `pyarrow.fs.S3FileSystem`, for S3, and `ablfs.AzureBlobFilesystem` for
    # ABS filesystem arguments.
    output_filesystem_kwargs: Optional[Dict],
    # If data should be recorded in RLlib's `EpisodeType`
    # format (i.e. `SingleAgentEpisode` objects). Use this format, if you
    # need data to be ordered in time and directly grouped by episodes for
    # example to train stateful modules or if you plan to use recordings
    # exclusively in RLlib. Otherwise data is recorded in tabular (columnar)
    # format. Default is `True`.
    output_write_episodes: Optional[bool],