使用离线数据#
RLlib 的离线 RL API 使您能够处理从离线存储(例如,磁盘、云存储、流系统、Hadoop 分布式文件系统 (HDFS))读取的经验。例如,您可能希望读取从先前训练运行中保存的经验、从专家那里收集的经验,或从部署在 Web 应用程序 中的策略收集的经验。您还可以记录在线训练期间生成的新的代理经验以供将来使用。
RLlib 使用 SingleAgentEpisode 对象来表示轨迹序列(例如,(s, a, r, s', ...) 元组)(目前不支持多智能体离线训练)。使用此经验格式可以高效地编码和压缩经验,重写轨迹,并通过 getter 方法方便地访问数据。在在线训练期间,RLlib 使用 SingleAgentEnvRunner actor 并行生成当前策略的经验片段。但是,RLlib 使用相同的经验格式从离线存储读取经验和写入经验(请参阅 OfflineSingleAgentEnvRunner)。
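下面给出一个最小示意(并非官方示例),手动构建一个 SingleAgentEpisode 并通过 getter 方法访问其中的数据;其中的观测、动作和奖励数值仅为演示而随意设定:
import numpy as np
from ray.rllib.env.single_agent_episode import SingleAgentEpisode
# 手动构建一个包含 3 个时间步的回合(观测总是比动作/奖励多一个)。
episode = SingleAgentEpisode(
    observations=[
        np.array([0.1, 0.0]),
        np.array([0.2, 0.0]),
        np.array([0.3, 0.0]),
        np.array([0.4, 0.0]),
    ],
    actions=[0, 1, 0],
    rewards=[1.0, 1.0, 1.0],
    terminated=True,
    len_lookback_buffer=0,
    t_started=0,
)
# 通过 getter 方法方便地访问数据。
print(len(episode))                 # -> 3 个时间步。
print(episode.get_observations(0))  # 第一个观测。
print(episode.get_actions([0, 1]))  # 前两个动作。
print(episode.get_return())         # 所有奖励之和: 3.0。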
您可以将经验存储在 RLlib 的经验格式或表格(列)格式中。当以下情况时,应使用经验格式:
您需要按轨迹分组并按时间排序的经验(例如,训练有状态模块)。
您希望仅在 RLlib 中使用记录的经验(例如,用于离线 RL 或行为克隆)。
相反,如果出现以下情况,您应该优先选择表格(列)格式:
您需要使用其他数据工具或 ML 库轻松读取数据。
注意
RLlib 的新 API 栈在设计上遵循可独立使用的原则。因此,SingleAgentEpisode 类可以在 RLlib 上下文之外使用。为了让外部数据工具(例如,用于数据转换)更快地访问数据,建议使用表格记录格式。
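例如(仅作示意,假设表格格式的记录已经写到了下面的本地路径),您可以完全不依赖 RLlib,直接用 Ray Data 或 pandas 读取这些 Parquet 文件:
import pandas as pd
from ray import data
# 假设此路径下存放着以表格(列)格式记录的 Parquet 数据。
recording_path = "/tmp/docs_rllib_offline_recording_tabular"
# 用 Ray Data 读取,并查看列结构……
ds = data.read_parquet(recording_path)
print(ds.schema())
# ……或者完全脱离 Ray/RLlib,用 pandas(pyarrow 引擎)直接读取。
df = pd.read_parquet(recording_path)
print(df.head())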
最重要的是,RLlib 的离线 RL API 构建在 Ray Data 之上,因此支持其所有读写方法(例如 read_parquet、read_json 等),其中 read_parquet 和 write_parquet 是默认的读写方法。API 的一个核心设计原则是在学习器参与之前尽可能多地在运行时应用数据转换,从而使学习器能够专注于模型更新。
提示
在从旧 API 栈到新 API 栈的过渡阶段,您还可以将新的离线 RL API 与使用旧 API 栈记录的 SampleBatch 数据一起使用。要启用此功能,请设置 config.offline_data(input_read_sample_batches=True)。
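下面是一个最小的配置示意(数据路径仅为占位示例),展示如何读取旧栈记录的 SampleBatch JSON 数据:
from ray.rllib.algorithms.bc import BCConfig
config = (
    BCConfig()
    .environment("CartPole-v1")
    .offline_data(
        # 指向旧 API 栈记录的 `SampleBatch` 数据(路径仅为示例)。
        input_=["/tmp/legacy_sample_batch_data"],
        # 旧栈记录通常是 JSON line 格式,因此改用 `read_json` 读取。
        input_read_method="read_json",
        # 告诉 RLlib 输入数据是旧栈的 `SampleBatch` 格式。
        input_read_sample_batches=True,
    )
)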
示例:训练专家策略#
在此示例中,您将在 CartPole-v1 环境上训练一个 PPO 代理,直到其平均回合回报(episode return)达到 450.0。然后将此代理保存为检查点,并在下一步使用其策略将专家数据记录到本地磁盘。
from ray.rllib.algorithms.ppo import PPOConfig
from ray.rllib.core.rl_module.default_model_config import DefaultModelConfig
from ray.rllib.utils.metrics import (
ENV_RUNNER_RESULTS,
EVALUATION_RESULTS,
EPISODE_RETURN_MEAN,
)
from ray import tune
# Configure the PPO algorithm.
config = (
PPOConfig()
.environment("CartPole-v1")
.training(
lr=0.0003,
# Run 6 SGD minibatch iterations on a batch.
num_epochs=6,
# Weigh the value function loss smaller than
# the policy loss.
vf_loss_coeff=0.01,
)
.rl_module(
model_config=DefaultModelConfig(
fcnet_hiddens=[32],
fcnet_activation="linear",
# Share encoder layers between value network
# and policy.
vf_share_layers=True,
),
    )
    # Run one evaluation round per training iteration so that the
    # evaluation-based stopping metric defined below gets reported.
    .evaluation(
        evaluation_num_env_runners=1,
        evaluation_interval=1,
    )
)
# Define the metric to use for stopping.
metric = f"{EVALUATION_RESULTS}/{ENV_RUNNER_RESULTS}/{EPISODE_RETURN_MEAN}"
# Define the Tuner.
tuner = tune.Tuner(
"PPO",
param_space=config,
run_config=tune.RunConfig(
stop={
metric: 450.0,
},
name="docs_rllib_offline_pretrain_ppo",
verbose=2,
checkpoint_config=tune.CheckpointConfig(
checkpoint_frequency=1,
checkpoint_at_end=True,
),
),
)
results = tuner.fit()
# Store the best checkpoint to use it later for recording
# an expert policy.
best_checkpoint = (
results
.get_best_result(
metric=metric,
mode="max"
)
.checkpoint.path
)
在此示例中,您保存了一个已成为专家玩家 CartPole-v1 的代理的检查点。您将在下一个示例中使用此检查点将专家数据记录到磁盘,该数据稍后将用于离线训练以克隆另一个代理。
示例:将专家数据记录到本地磁盘#
在训练了一个擅长玩 CartPole-v1 的专家策略后,您在此处加载其策略,在评估期间记录专家数据。您使用 5 个 OfflineSingleAgentEnvRunner 实例,每次 sample() 调用收集 50 个完整回合(episodes)。在此示例中,您将经验直接存储为 RLlib 的 SingleAgentEpisode 对象,每个 Parquet 文件最多包含 25 个回合。总共运行 10 次评估,应从专家策略中获得 500 个记录的回合。您将在下一个示例中使用这些数据,通过离线 RL 训练一个新策略,该策略在玩 CartPole-v1 时应达到 450.0 的回报。
from ray.rllib.algorithms.ppo import PPOConfig
from ray.rllib.core import (
COMPONENT_LEARNER_GROUP,
COMPONENT_LEARNER,
COMPONENT_RL_MODULE,
DEFAULT_MODULE_ID,
)
from ray.rllib.core.rl_module import RLModuleSpec
# Store recording data under the following path.
data_path = "/tmp/docs_rllib_offline_recording"
# Configure the algorithm for recording.
config = (
PPOConfig()
# The environment needs to be specified.
.environment(
env="CartPole-v1",
)
# Make sure to sample complete episodes because
# you want to record RLlib's episode objects.
.env_runners(
batch_mode="complete_episodes",
)
# Set up 5 evaluation `EnvRunners` for recording.
# Sample 50 episodes in each evaluation rollout.
.evaluation(
evaluation_num_env_runners=5,
evaluation_duration=50,
evaluation_duration_unit="episodes",
)
# Use the checkpointed expert policy from the preceding PPO training.
# Note, we have to use the same `model_config` as
# the one with which the expert policy was trained, otherwise
# the module state can't be loaded.
.rl_module(
model_config=DefaultModelConfig(
fcnet_hiddens=[32],
fcnet_activation="linear",
# Share encoder layers between value network
# and policy.
vf_share_layers=True,
),
)
# Define the output path and format. In this example you
# want to store data directly in RLlib's episode objects.
# Each Parquet file should hold no more than 25 episodes.
.offline_data(
output=data_path,
output_write_episodes=True,
output_max_rows_per_file=25,
)
)
# Build the algorithm.
algo = config.build()
# Load now the PPO-trained `RLModule` to use in recording.
algo.restore_from_path(
best_checkpoint,
# Load only the `RLModule` component here.
component=COMPONENT_RL_MODULE,
)
# Run 10 evaluation iterations and record the data.
for i in range(10):
print(f"Iteration {i + 1}")
eval_results = algo.evaluate()
print(eval_results)
# Stop the algorithm. Note, this is important when defining
# `output_max_rows_per_file`. Otherwise, any remaining episodes in the
# `EnvRunner`s' buffers aren't written to disk.
algo.stop()
注意
RLlib 以二进制(binary)格式存储记录的经验数据。每个回合(episode)都会被转换为其字典表示形式,并使用 msgpack-numpy 进行序列化,以确保版本兼容性。
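下面是一个反序列化的最小示意(这里假设每行 Parquet 数据的 "item" 列保存着序列化后的回合状态;具体列名请以您的实际记录输出为准):
import msgpack
import msgpack_numpy as mnp
from ray import data
from ray.rllib.env.single_agent_episode import SingleAgentEpisode
# 读取前面示例中记录下来的回合数据。
ds = data.read_parquet("/tmp/docs_rllib_offline_recording")
# 取出一行;假设序列化后的回合状态存放在 "item" 列中。
row = ds.take(1)[0]
state = msgpack.unpackb(row["item"], object_hook=mnp.decode)
# 从状态字典重建 `SingleAgentEpisode` 并访问其内容。
episode = SingleAgentEpisode.from_state(state)
print(len(episode), episode.get_return())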
RLlib 的记录过程效率很高,因为它在评估期间使用了多个 OfflineSingleAgentEnvRunner 实例,实现了并行数据写入。您可以浏览文件夹以查看存储的 Parquet 数据。
$ ls -la /tmp/docs_rllib_offline_recording/cartpole-v1
drwxr-xr-x. 22 user user 440 21. Nov 17:23 .
drwxr-xr-x. 3 user user 60 21. Nov 17:23 ..
drwxr-xr-x. 2 user user 540 21. Nov 17:23 run-000001-00004
drwxr-xr-x. 2 user user 540 21. Nov 17:23 run-000001-00009
drwxr-xr-x. 2 user user 540 21. Nov 17:23 run-000001-00012
drwxr-xr-x. 2 user user 540 21. Nov 17:23 run-000001-00016
drwxr-xr-x. 2 user user 540 21. Nov 17:23 run-000002-00004
drwxr-xr-x. 2 user user 540 21. Nov 17:23 run-000002-00007
提示
RLlib 将记录存储在以 RL 环境命名的文件夹下。在其中,您会看到每个 OfflineSingleAgentEnvRunner 和写入操作的 Parquet 文件文件夹。写入操作计数包含在第二个编号中。例如:上面,env-runner 1 在其第 4 次 sample() 调用中采样了 25 个经验,然后(因为 output_max_rows_per_file=25)将所有采样的经验写入磁盘,文件名是 run-000001-00004。
注意
由于策略的 rollout 分布不均,每个 worker 的写入操作数可能会有所不同。较快的 worker 会收集更多经验,从而导致写入操作数的差异。因此,由不同 env-runner 实例生成的文件的第二个编号可能会有所不同。
示例:在先前保存的经验上进行训练#
在此示例中,您将使用先前从专家策略玩 CartPole-v1 记录的 Parquet 数据进行行为克隆。数据需要在算法的配置中链接(通过 input_ 属性)。
from ray import tune
from ray.rllib.algorithms.bc import BCConfig
# Setup the config for behavior cloning.
config = (
BCConfig()
.environment(
# Use the `CartPole-v1` environment from which the
# data was recorded. This is merely for receiving
# action and observation spaces and to use it during
# evaluation.
env="CartPole-v1",
)
.learners(
# Use a single learner.
num_learners=0,
)
.training(
# This has to be defined in the new offline RL API.
train_batch_size_per_learner=1024,
)
.offline_data(
# Link the data.
input_=[data_path],
# You want to read in RLlib's episode format b/c this
# is how you recorded data.
input_read_episodes=True,
# Read smaller batches from the data than the learner
# trains on. Note, each batch element is an episode
# with multiple timesteps.
input_read_batch_size=512,
# Create exactly 2 `DataWorkers` that transform
# the data on-the-fly. Give each of them a single
# CPU.
map_batches_kwargs={
"concurrency": 2,
"num_cpus": 1,
},
# When iterating over the data, prefetch two batches
# to improve the data pipeline. Don't shuffle the
# buffer (the data is too small).
iter_batches_kwargs={
"prefetch_batches": 2,
"local_shuffle_buffer_size": None,
},
# You must set this for single-learner setups.
dataset_num_iters_per_learner=1,
)
.evaluation(
# Run evaluation to see how well the learned policy
# performs. Run every 3rd training iteration an evaluation.
evaluation_interval=3,
# Use a single `EnvRunner` for evaluation.
evaluation_num_env_runners=1,
# In each evaluation rollout, collect 5 episodes of data.
evaluation_duration=5,
# Evaluate the policy parallel to training.
evaluation_parallel_to_training=True,
)
)
# Set the stopping metric to be the evaluation episode return mean.
metric = f"{EVALUATION_RESULTS}/{ENV_RUNNER_RESULTS}/{EPISODE_RETURN_MEAN}"
# Configure Ray Tune.
tuner = tune.Tuner(
"BC",
param_space=config,
run_config=tune.RunConfig(
name="docs_rllib_offline_bc",
# Stop behavior cloning when we reach 450 in return.
stop={metric: 450.0},
checkpoint_config=tune.CheckpointConfig(
# Only checkpoint at the end to be faster.
checkpoint_frequency=0,
checkpoint_at_end=True,
),
verbose=2,
)
)
# Run the experiment.
analysis = tuner.fit()
RLlib 中的行为克隆性能极高,单次训练迭代仅需约 2 毫秒。实验结果应类似于以下内容:
您大约需要 98 秒(456 次迭代)才能达到与 PPO 代理相同的平均回合回报。虽然与 PPO 的训练时间相比这可能不算惊艳,但值得注意的是,CartPole-v1 是一个非常简单的学习环境。在更复杂、需要更强大的代理和更长训练时间的环境中,通过行为克隆进行预训练可能非常有利。将行为克隆与后续的强化学习微调相结合,可以大大缩短训练时间、减少资源消耗和相关成本。
使用外部专家经验#
您的专家数据通常已经可用,无论是从生产系统记录的,还是直接由人类专家提供的。这类数据往往以表格(列式)格式存储。RLlib 的新离线 RL API 允许您通过指定一个描述专家数据组织方式的架构(schema)来直接摄取此类数据,从而简化其使用。API 读取数据时使用的默认架构在 SCHEMA 中给出。
让我们看一个简单的例子,其中您的专家数据以以下架构存储:(o_t, a_t, r_t, o_tp1, d_t, i_t, logprobs_t)。在这种情况下,您按如下方式提供此架构:
from ray.rllib.algorithms.bc import BCConfig
from ray.rllib.core.columns import Columns
config = (
BCConfig()
...
.offline_data(
input_=[<input_path>],
# Provide the schema of your data (map to column names known to RLlib).
input_read_schema={
Columns.OBS: "o_t",
Columns.ACTIONS: "a_t",
Columns.REWARDS: "r_t",
Columns.NEXT_OBS: "o_tp1",
Columns.INFOS: "i_t",
"done": "d_t",
},
)
)
注意
在内部,旧版 gym 的 done 信号被映射到 gymnasium 的 terminated 信号,truncated 值默认为 False。RLlib 的 SingleAgentEpisode 结构与 gymnasium 对齐,符合强化学习中较新的环境 API 标准。
将表格数据转换为 RLlib 的经验格式#
虽然表格格式具有广泛的兼容性并与 RLlib 的新离线 RL API 无缝集成,但在某些情况下,您可能更喜欢使用 RLlib 的原生经验格式。如前所述,这种情况通常发生在需要完整专家轨迹时。
注意
RLlib 以批处理方式处理表格数据,将每一行转换为一个 *单步回合*。之所以这样做,主要是出于实现上的简单性:通常不能假定数据是按时间顺序到达、且按回合分组的行,即使有时确实如此(RLlib 无法可靠地自动推断出这种结构,只有用户才了解它)。虽然可以拼接按顺序到达的连续 SingleAgentEpisode 块,但对乱序到达的块则无法这样做。
如果您需要完整的轨迹,可以将表格数据转换为 SingleAgentEpisode 对象,并将它们存储为 Parquet 格式。下一个示例将展示如何做到这一点。首先,您将先前训练的专家策略的经验以表格格式存储(注意下面的 output_write_episodes=False 设置以激活表格数据输出):
from ray.rllib.algorithms.ppo import PPOConfig
from ray.rllib.core import (
COMPONENT_LEARNER_GROUP,
COMPONENT_LEARNER,
COMPONENT_RL_MODULE,
DEFAULT_MODULE_ID,
)
from ray.rllib.core.rl_module import RLModuleSpec
# Set up a path for the tabular data records.
tabular_data_path = "/tmp/docs_rllib_offline_recording_tabular"
# Configure the algorithm for recording.
config = (
PPOConfig()
# The environment needs to be specified.
.environment(
env="CartPole-v1",
)
# Make sure to sample complete episodes because
# you want to record RLlib's episode objects.
.env_runners(
batch_mode="complete_episodes",
)
# Set up 5 evaluation `EnvRunners` for recording.
# Sample 50 episodes in each evaluation rollout.
.evaluation(
evaluation_num_env_runners=5,
evaluation_duration=50,
)
# Use the checkpointed expert policy from the preceding PPO training.
# Note, we have to use the same `model_config` as
# the one with which the expert policy was trained, otherwise
# the module state can't be loaded.
.rl_module(
model_config=DefaultModelConfig(
fcnet_hiddens=[32],
fcnet_activation="linear",
# Share encoder layers between value network
# and policy.
vf_share_layers=True,
),
)
# Define the output path and format. In this example you
# want to store data directly in RLlib's episode objects.
.offline_data(
output=tabular_data_path,
# You want to store for this example tabular data.
output_write_episodes=False,
)
)
# Build the algorithm.
algo = config.build()
# Load the PPO-trained `RLModule` to use in recording.
algo.restore_from_path(
best_checkpoint,
# Load only the `RLModule` component here.
component=COMPONENT_RL_MODULE,
)
# Run 10 evaluation iterations and record the data.
for i in range(10):
print(f"Iteration {i + 1}")
res_eval = algo.evaluate()
print(res_eval)
# Stop the algorithm. Note, this is important when defining
# `output_max_rows_per_file`. Otherwise, any remaining episodes in the
# `EnvRunner`s' buffers aren't written to disk.
algo.stop()
您可能已经注意到,以表格格式记录数据比以经验格式记录要慢得多。这种较慢的性能是由于将经验数据转换为列式格式所需的额外后期处理。要确认记录的数据现在是列式格式,您可以打印其架构:
from ray import data
# Read the tabular data into a Ray dataset.
ds = data.read_parquet(tabular_data_path)
# Now, print its schema.
print("Tabular data schema of expert experiences:\n")
print(ds.schema())
# Column Type
# ------ ----
# eps_id string
# agent_id null
# module_id null
# obs ArrowTensorTypeV2(shape=(4,), dtype=float)
# actions int32
# rewards double
# new_obs ArrowTensorTypeV2(shape=(4,), dtype=float)
# terminateds bool
# truncateds bool
# action_dist_inputs ArrowTensorTypeV2(shape=(2,), dtype=float)
# action_logp float
# weights_seq_no int64
注意
当 infos 全为空时,它们不会被写入磁盘。
如果您的专家数据是列式格式,并且您需要基于完整的专家轨迹进行训练,您可以按照以下示例中的代码将您自己的数据转换为 RLlib 的 SingleAgentEpisode 对象:
import gymnasium as gym
import msgpack
import msgpack_numpy as mnp
from collections import defaultdict
from ray import data
from ray.rllib.env.single_agent_episode import SingleAgentEpisode
# Load the dataset with the tabular data.
ds = data.read_parquet(tabular_data_path)
# Build the environment from which the data was sampled to get the
# spaces.
env = gym.make("CartPole-v1")
# Define buffers for episode data.
eps_obs = []
eps_actions = []
eps_rewards = []
# Note, extra-model-outputs needs to be a dictionary with list
# values.
eps_extra_model_outputs = defaultdict(list)
# Define a buffer for unwritten episodes.
episodes = []
# Start iterating over the rows of your experience data.
for i, row in enumerate(ds.iter_rows(prefetch_batches=10)):
# If the episode isn't terminated nor truncated, buffer the data.
if not row["terminateds"] and not row["truncateds"]:
eps_obs.append(row["obs"])
eps_actions.append(row["actions"])
eps_rewards.append(row["rewards"])
eps_extra_model_outputs["action_dist_inputs"].append(row["action_dist_inputs"])
eps_extra_model_outputs["action_logp"].append(row["action_logp"])
# Otherwise, build the episode.
else:
eps_obs.append(row["new_obs"])
episode = SingleAgentEpisode(
id_=row["eps_id"],
agent_id=row["agent_id"],
module_id=row["module_id"],
observations=eps_obs,
# Use the spaces from the environment.
observation_space=env.observation_space,
action_space=env.action_space,
actions=eps_actions,
rewards=eps_rewards,
# Set the starting timestep to zero.
t_started=0,
# You don't want to have a lookback buffer.
len_lookback_buffer=0,
terminated=row["terminateds"],
truncated=row["truncateds"],
extra_model_outputs=eps_extra_model_outputs,
)
# Store the ready-to-write episode to the episode buffer.
episodes.append(msgpack.packb(episode.get_state(), default=mnp.encode))
# Clear all episode data buffers.
eps_obs.clear()
eps_actions.clear()
eps_rewards.clear()
eps_extra_model_outputs = defaultdict(list)
# Write episodes to disk when the episode buffer holds 50 episodes.
if len(episodes) > 49:
# Generate a Ray dataset from episodes.
episodes_ds = data.from_items(episodes)
# Write the Parquet data and compress it.
episodes_ds.write_parquet(
f"/tmp/test_converting/file-{i}".zfill(6),
compression="gzip",
)
# Delete the dataset in memory and clear the episode buffer.
del episodes_ds
episodes.clear()
# If we are finished and have unwritten episodes, write them now.
if len(episodes) > 0:
episodes_ds = data.from_items(episodes)
episodes_ds.write_parquet(
f"/tmp/test_converting/file-{i}".zfill(6),
compression="gzip",
)
del episodes_ds
episodes.clear()
使用旧 API 栈 SampleBatch 录制#
如果您拥有先前使用 RLlib 旧 API 栈记录的专家数据,可以通过设置 input_read_sample_batches=True 来无缝地将其用于新栈的离线 RL API。或者,您可以使用 RLlib 的 OfflinePreLearner 将您的 SampleBatch 记录转换为 SingleAgentEpisode 格式,如下例所示:
import msgpack
import msgpack_numpy as mnp
from ray import data
from ray.rllib.offline.offline_prelearner import OfflinePreLearner
# Set up the data path to your `SampleBatch` expert data.
data_path = ...
# Set up the write path for the Parquet episode data.
output_data_path = "/tmp/sample_batch_data"
# Load the `SampleBatch` recordings.
ds = data.read_json(data_path)
# Iterate over batches (of `SampleBatch`es) and convert them to episodes.
for i, batch in enumerate(ds.iter_batches(batch_size=100, prefetch_batches=2)):
# Use the RLlib's `OfflinePreLearner` to convert `SampleBatch`es to episodes.
episodes = OfflinePreLearner._map_sample_batch_to_episode(False, batch)["episodes"]
# Create a dataset from the episodes. Note, for storing episodes you need to
# serialize them through `msgpack-numpy`.
episode_ds = data.from_items([msgpack.packb(eps.get_state(), default=mnp.encode) for eps in episodes])
# Write the batch of episodes to local disk.
    episode_ds.write_parquet(output_data_path + f"/file-{i:06d}", compression="gzip")
print("Finished converting `SampleBatch` data to episode data.")
注意
RLlib 将您的 SampleBatch 视为一个已终止/截断的经验,并在此假设下构建其 SingleAgentEpisode。
预处理、过滤和后处理#
在记录过程中,您的专家策略可能会对观察值使用预处理技术,例如 *帧堆叠*,或过滤方法,如 *均值-标准差过滤*。类似地,动作也可能经过预处理,例如 *动作采样* 或 *缩放*。在其 EnvRunner 实例中,RLlib 会在观察值传递给 RLModule **之前**,通过 *env-to-module* 连接器管道应用此类预处理和过滤。然而,存储在经验中的是直接从环境获得的原始观察值。同样,被记录下来的是 RLModule 直接输出的原始动作,而发送到环境的则是经过 RLlib 的 *module-to-env* 连接器预处理后的动作。
仔细考虑经验记录过程中应用的预处理和过滤至关重要,因为它们会显著影响专家策略的学习方式以及随后在环境中的表现。例如,如果专家策略对观察值使用 *均值-标准差过滤*,它将基于过滤后的观察值学习策略,而过滤器本身高度依赖于在训练期间收集的经验。在部署此专家策略时,必须在评估期间使用完全相同的过滤器,以避免性能下降。类似地,通过行为克隆训练的策略也可能需要观察值的 *均值-标准差过滤器* 来准确复制专家策略的行为。
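下面是一个示意性配置(并非上文记录示例的一部分,连接器可调用对象的签名请以您所用的 RLlib 版本为准),展示如何在 env-to-module 连接器管道中加入均值-标准差过滤器:
from ray.rllib.algorithms.ppo import PPOConfig
from ray.rllib.connectors.env_to_module.mean_std_filter import MeanStdFilter
config = (
    PPOConfig()
    .environment("CartPole-v1")
    .env_runners(
        # 在 env-to-module 连接器管道中加入均值-标准差过滤器。
        # 注意:记录与之后的评估/部署必须使用同一个(状态一致的)过滤器,
        # 否则策略性能会明显下降。
        env_to_module_connector=lambda env: MeanStdFilter(),
    )
)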
扩展 I/O 吞吐量#
就像在线训练可以扩展一样,可以通过配置 RLlib env-runner 的数量来增加离线记录 I/O 吞吐量。使用 num_env_runners 设置来扩展训练期间的记录,或者使用 evaluation_num_env_runners 来扩展仅评估期间的记录。每个 worker 独立运行,并行写入经验,从而实现写入操作 I/O 吞吐量的线性扩展。在每个 OfflineSingleAgentEnvRunner 中,会采样经验并进行序列化,然后写入磁盘。
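例如,下面的示意配置把仅评估期间的记录扩展到 8 个 env-runner(输出路径仅为示例):
from ray.rllib.algorithms.ppo import PPOConfig
config = (
    PPOConfig()
    .environment("CartPole-v1")
    # 采样完整回合,以便记录 episode 对象。
    .env_runners(batch_mode="complete_episodes")
    # 用 8 个评估 `EnvRunner` 并行写入,写吞吐量大致随 worker 数线性扩展。
    .evaluation(
        evaluation_num_env_runners=8,
        evaluation_duration=80,
        evaluation_duration_unit="episodes",
    )
    .offline_data(
        output="/tmp/docs_rllib_offline_recording_scaled",
        output_write_episodes=True,
    )
)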
RLlib 中的离线 RL 训练是高度并行化的,包括数据读取、后处理以及(如果适用)更新。在离线数据上训练时,通过增加用于将离线经验转换为学习器兼容格式(MultiAgentBatch)的 DataWorker 实例的数量来实现可扩展性。Ray Data 在底层优化读取操作,利用文件元数据、批处理后处理的预定义并发设置以及可用的系统资源。强烈建议不要覆盖这些默认设置,因为这样做可能会扰乱此优化过程。
RLlib 中的数据处理涉及三个关键层,所有这些层都是高度可扩展的:
读取操作:此层处理从指定文件夹中的文件摄取数据。它由 Ray Data 自动优化,不应手动调整或修改。
后处理(PreLearner):在此阶段,批次会(在需要时)被转换为 RLlib 的 SingleAgentEpisode 格式,并经过 *学习器连接器管道* 处理。随后,处理后的数据被转换为 MultiAgentBatch 对象以用于更新。此层可以通过增加 DataWorker 实例的数量来扩展。
更新(Learner):此阶段执行策略和相关模块的更新。通过增加学习器的数量(num_learners)来实现可扩展性,从而可以在更新期间并行处理批次。
下图说明了这些层及其可扩展性:
读取操作仅在 CPU 上执行,主要通过分配额外资源来扩展(有关详细信息,请参阅 如何调整性能),因为它们完全由 Ray Data 管理。后处理可以通过增加映射操作的关键字参数中指定的并发级别来扩展:
config = (
AlgorithmConfig()
.offline_data(
map_batches_kwargs={
"concurrency": 10,
"num_cpus": 4,
}
)
)
这将启动一个包含 10 个 DataWorker 实例的 actor 池,每个实例运行一个 RLlib 的可调用类 OfflinePreLearner,用于后处理批次以更新 RLModule。
注意
num_cpus(以及类似的 num_gpus)属性定义了 **分配给每个** DataWorker 的资源,而不是整个 actor 池。
您可以在 RLlib 的 learners() 配置块中扩展学习器的数量:
config = (
AlgorithmConfig()
.learners(
num_learners=4,
num_gpus_per_learner=1,
)
)
通过此配置,您将启动一个包含 4 个(远程)Learner 的应用程序(有关 RLlib 学习器的更多详细信息,请参阅 Learner (Alpha) 文档),每个学习器使用一个 GPU。
使用云存储#
与 RLlib 的先前栈不同,新的离线 RL API 是云无关的,并与 PyArrow 完全集成。您可以利用任何可用的云存储路径或 PyArrow 兼容的文件系统。如果使用 PyArrow 或兼容的文件系统,请确保您的 input_ 路径是此文件系统内的相对路径。与 Ray Data 类似,您也可以使用占位符、文件或文件夹列表,或者仅指定一个文件夹以递归读取。
例如,要从 GCS 中的存储桶读取,您可以按如下方式指定文件夹位置:
config=(
AlgorithmConfig()
.offline_data(
input_="gs://<your-bucket>/dir1",
)
)
此配置允许 RLlib 从指定路径下的任何文件夹递归读取数据。如果您正在使用 GCS 的文件系统(例如,由于身份验证要求),请使用以下语法:
from datetime import timedelta
import pyarrow.fs
# Define the PyArrow filesystem
gcs = pyarrow.fs.GcsFileSystem(
# This is needed to resolve the hostname for public buckets.
anonymous=True,
retry_time_limit=timedelta(seconds=15)
)
# Define the configuration.
config= (
AlgorithmConfig()
.offline_data(
# NOTE: Use a relative file path now
input_="<public-bucket>/dir1",
input_filesystem=gcs,
)
)
您可以在 PyArrow 文件系统接口 中了解更多关于 PyArrow 的文件系统,特别是关于云文件系统和所需身份验证的信息。
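类似地,下面是一个使用 S3 的示意(区域等参数仅为示例;凭证可通过环境变量或 IAM 角色提供):
import pyarrow.fs
from ray.rllib.algorithms.algorithm_config import AlgorithmConfig
# 构造一个指向特定区域的 S3 文件系统。
s3 = pyarrow.fs.S3FileSystem(region="us-west-2")
config = (
    AlgorithmConfig()
    .offline_data(
        # 注意:此时 `input_` 使用相对于该文件系统的路径。
        input_="<your-bucket>/dir1",
        input_filesystem=s3,
    )
)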
使用云存储进行录制#
在从专家策略记录经验时,您也可以以类似的方式使用云存储:
config= (
AlgorithmConfig()
.offline_data(
output="gs://<your-bucket>/dir1",
)
)
RLlib 然后直接写入云存储中的文件夹,如果该文件夹在存储桶中尚不存在,则会创建它。与读取的区别在于,您不能使用多个路径进行写入。所以类似
config= (
AlgorithmConfig()
.offline_data(
output=["gs://<your-bucket>/dir1", "gs://<your-bucket>/dir2"],
)
)
这样的写法将无法工作。如果存储需要特殊权限来创建文件夹和/或写入文件,请确保集群用户获得必要的权限。否则将导致写入访问被拒绝,从而导致录制过程停止。
注意
使用云存储时,Ray Data 通常会流式传输数据,这意味着数据是以块的形式被消耗的。这使得后处理和训练可以在短暂的预热阶段后开始。更具体地说,即使您的云存储很大,运行 RLlib 的节点上也不需要相同大小的空间。
如何调整性能#
在 RLlib 的离线 RL API 中,各个关键层由不同的模块和配置管理,这使得有效扩展这些层变得不那么简单。了解特定参数及其对系统性能的影响至关重要。
如何调整读取操作#
如前所述,**读取操作**层由 Ray Data 自动处理并动态优化。强烈建议避免修改此过程。但是,有几个参数可以在一定程度上提高此层的性能,包括:
可用资源(分配给作业)。
数据局部性。
数据分片。
数据修剪。
可用资源#
Ray Data 采用的调度策略独立于任何现有放置组,它分别调度任务和 actor。因此,为作业中的其他任务和 actor 保留足够的资源至关重要。为了优化 Ray Data 在读取操作方面的可扩展性并提高读取性能,请考虑增加集群中的可用资源,同时保留现有任务和 actor 的资源分配。需要监控和配置的关键资源是 CPU 和对象存储内存。对象存储内存不足,尤其是在高压情况下,可能会导致对象溢出到磁盘,这会严重影响应用程序性能。
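作为一个简单示意(数值仅为示例),在单节点实验中可以在启动 Ray 时显式预留 CPU 与对象存储内存:
import ray
ray.init(
    # 为读取、后处理与训练任务预留足够的 CPU。
    num_cpus=16,
    # 预留 8 GiB 对象存储内存,降低对象溢出到磁盘的风险。
    object_store_memory=8 * 1024**3,
)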
带宽是影响集群内吞吐量的关键因素。在某些情况下,扩展节点数量可以增加带宽,从而增强数据从存储到消费进程的流动。此方法有益的场景包括:
与网络骨干网的独立连接:节点使用专用带宽,避免共享上行链路和潜在瓶颈(例如,请参阅 AWS 和 GCP 网络带宽文档)。
优化的云访问:使用 S3 Transfer Acceleration、Google Cloud Storage FUSE 或并行和加速数据传输方法等功能来提高性能。
数据局部性#
数据局部性是实现快速数据处理的关键因素。例如,如果您的数据位于 GCP 上,而在 AWS S3 或本地机器上运行 Ray 集群,将不可避免地导致传输速率低下和数据处理缓慢。为确保最佳性能,将数据存储在与 Ray 集群相同的区域、同一可用区和云提供商中,通常足以启用 RLlib 离线 RL API 的高效流式传输。需要考虑的其他调整包括:
多区域存储桶:使用多区域存储来提高数据可用性,并可能提高分布式系统的访问速度。
存储桶内的存储类别优化:**标准存储**用于频繁访问和低延迟流式传输。避免使用 AWS Glacier 或 GCP Archive 等存档存储类别进行流式传输工作负载,因为检索时间较长。
数据分片#
数据分片通过平衡块大小来提高获取、传输和读取数据的效率。如果块太大,会导致传输和处理过程中的延迟,从而造成瓶颈。反之,过小的块会导致过高的元数据获取开销,降低整体性能。找到最佳块大小对于平衡这些权衡并最大化吞吐量至关重要。
经验法则:将数据文件大小保持在 64MiB 到 256MiB 之间。
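作为一个示意(假设您想把大量过小的 Parquet 文件合并为更接近上述经验法则的较大分块),可以用 Ray Data 重新分区后再写回:
from ray import data
# 读取现有的记录数据。
ds = data.read_parquet("/tmp/docs_rllib_offline_recording")
# 重新分区为较少、较大的块;块数需根据总数据量估算,
# 使单个输出文件大致落在 64MiB 到 256MiB 之间。
ds = ds.repartition(8)
# 写回到新的路径。
ds.write_parquet("/tmp/docs_rllib_offline_recording_repartitioned")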
数据修剪#
如果您的数据是 **Parquet** 格式(RLlib 推荐的离线数据格式),您可以利用数据修剪来优化性能。Ray Data 通过投影下推(列过滤)和过滤器下推(行过滤)在其 read_parquet() 方法中支持修剪。这些过滤器直接在文件扫描期间应用,减少了加载到内存中的不必要数据量。
例如,如果您只需要离线数据中的特定列(例如,避免加载 infos 列):
from ray.rllib.algorithms.algorithm_config import AlgorithmConfig
from ray.rllib.core.columns import Columns
config = (
AlgorithmConfig()
    .offline_data(
input_read_method_kwargs={
"columns": [
Columns.EPS_ID,
Columns.AGENT_ID,
Columns.OBS,
Columns.NEXT_OBS,
Columns.REWARDS,
Columns.ACTIONS,
                Columns.TERMINATEDS,
                Columns.TRUNCATEDS,
],
},
)
)
同样,如果您只需要数据集中的特定行,可以应用下推过滤器,如下所示
import pyarrow.dataset
from ray.rllib.algorithms.algorithm_config import AlgorithmConfig
from ray.rllib.core.columns import Columns
config = (
AlgorithmConfig()
.offline_data(
input_read_method_kwargs={
"filter": pyarrow.dataset.field(Columns.AGENT_ID) == "agent_1",
},
)
)
如何调整后处理(PreLearner)#
当读取操作层具有较高吞吐量时,确保后处理(Pre-Learner)阶段有足够的处理能力至关重要。该阶段容量不足会导致反压,进而增加内存使用量;在严重情况下,还会导致对象溢出到磁盘,甚至出现内存不足(OOM)错误(参见内存不足预防)。
调整**后处理(Pre-Learner)**层通常比优化**读取操作**层更直接。以下参数可以进行调整以优化其性能
Actor 池大小
分配的资源
读取批次和缓冲区大小。
Actor 池大小#
在内部,**后处理(PreLearner)**层由一个 map_batches() 操作定义,该操作启动一个 _ActorPool。池中的每个 actor 运行一个 OfflinePreLearner 实例,用于在数据从磁盘传输到 RLlib 的 Learner 的过程中对其进行转换。显然,这个 _ActorPool 的大小决定了该层的吞吐量,需要根据上一层的吞吐量进行微调,以避免反压。您可以使用 RLlib 的 map_batches_kwargs 参数中的 concurrency 来定义此池大小
from ray.rllib.algorithms.algorithm_config import AlgorithmConfig
config = (
AlgorithmConfig()
.offline_data(
map_batches_kwargs={
"concurrency": 4,
},
)
)
使用前面的代码,您将使 Ray Data 启动最多 4 个并行的 OfflinePreLearner actor,这些 actor 可以对您的数据进行后处理以供训练。
注意
Ray Data 会根据您**后处理(Pre-Learner)**层的并行度动态调整其读取操作。它会根据**后处理(Pre-Learner)**阶段的反压来扩展读取操作。这意味着您的整个流式处理管道的吞吐量取决于下游任务的性能以及分配给**读取操作**层的资源(参见如何调整读取操作)。然而,由于扩展读取操作的开销,反压——以及在严重情况下,对象溢出或内存不足(OOM)错误——并不总是能完全避免。
您还可以通过提供一个间隔而不是一个固定数字来为您的**后处理(PreLearner)**启用自动缩放
from ray.rllib.algorithms.algorithm_config import AlgorithmConfig
config = (
AlgorithmConfig()
.offline_data(
map_batches_kwargs={
"concurrency": (4, 8),
},
)
)
这允许 Ray Data 启动多达 8 个后处理 actor,以便更快地处理下游数据,例如在出现反压的情况下。
注意
在**后处理(Pre-Learner)**层实现自动缩放的 actor 池并不能保证消除反压。增加更多的 OfflinePreLearner 实例会给系统带来额外的开销。RLlib 的离线 RL 管道针对流式数据进行了优化,这类数据通常具有稳定的吞吐量和资源使用情况,上游和下游任务之间不平衡的情况除外。经验法则:仅在以下情况下考虑使用自动缩放:(1)预期吞吐量变化很大,(2)集群资源会发生波动(例如,在共享或动态环境中),和/或(3)工作负载特征高度不可预测。
分配的资源#
除了后处理 actor 的数量之外,您还可以通过定义分配给 actor 池中每个 OfflinePreLearner 的资源,来调整**后处理(PreLearner)**层的性能。这些资源可以通过 num_cpus 和 num_gpus 定义,或在 ray_remote_args 中定义。
注意
通常,增加 CPU 数量足以对您管道的后处理阶段进行性能调优。GPU 仅在特殊情况下需要,例如在自定义管道中。例如,RLlib 的 MARWIL 实现使用其 ConnectorPipelineV2 中的 GeneralAdvantageEstimation 连接器来对经验批次应用 通用优势估计。在这些计算中,应用了算法 RLModule 的值模型,您可以通过在 GPU 上运行来加速它。
例如,要为**后处理(PreLearner)**层中的 4 个 OfflinePreLearner 各分配 2 个 CPU,您可以使用以下语法
from ray.rllib.algorithms.algorithm_config import AlgorithmConfig
config = (
AlgorithmConfig()
.offline_data(
map_batches_kwargs={
"concurrency": 4,
"num_cpus": 2,
},
)
)
警告
不要覆盖 RLlib 的 map_batches_kwargs 中的 batch_size。这通常会导致性能大幅下降。请注意,此 batch_size 与 train_batch_size_per_learner 不同:前者指定流式管道转换中的批次大小,而后者定义了每个 Learner 中实际用于训练(即模型前向和后向传播)的批次大小。
读取批次和缓冲区大小#
当处理来自 SingleAgentEpisode 或旧版 SampleBatch 格式的数据时,微调 input_read_batch_size 参数提供了额外的优化机会。此参数控制从数据文件中检索的批次大小。当处理情景性或旧版 SampleBatch 数据时,其效果尤为显著,因为流式管道为此类数据使用 EpisodeReplayBuffer 来处理每行数据中包含的多个时间步。所有传入的数据都会转换为 SingleAgentEpisode 实例(如果尚未采用此格式),并存储在情景回放缓冲区中,该缓冲区精确管理为训练而采样的 train_batch_size_per_learner。
在您的流式处理管道中实现数据摄入效率和采样变化之间的最佳平衡至关重要。考虑以下示例:假设每个 SingleAgentEpisode 的长度为 100 个时间步,并且您的 train_batch_size_per_learner 配置为 1000。每个 EpisodeReplayBuffer 实例的容量设置为 1000
from ray.rllib.algorithms.algorithm_config import AlgorithmConfig
config = (
AlgorithmConfig()
.training(
# Train on a batch of 1000 timesteps each iteration.
train_batch_size_per_learner=1000,
)
.offline_data(
# Read in RLlib's new stack `SingleAgentEpisode` data.
        input_read_episodes=True,
# Define an input read batch size of 10 episodes.
input_read_batch_size=10,
# Set the replay buffer in the `OfflinePrelearner`
# to 1,000 timesteps.
prelearner_buffer_kwargs={
"capacity": 1000,
},
)
)
如果您将 input_read_batch_size 配置为 10,如代码所示,那么 10 个 SingleAgentEpisode 中的每个都能放入缓冲区,从而能够跨多个情景的广泛时间步进行采样。这会产生高采样变异性。现在,考虑将缓冲区容量减少到 500 的情况
from ray.rllib.algorithms.algorithm_config import AlgorithmConfig
config = (
AlgorithmConfig()
.training(
# Train on a batch of 1000 timesteps each iteration.
train_batch_size_per_learner=1000,
)
.offline_data(
# Read in RLlib's new stack `SingleAgentEpisode` data.
        input_read_episodes=True,
# Define an input read batch size of 10 episodes.
input_read_batch_size=10,
# Set the replay buffer in the `OfflinePrelearner`
# to 500 timesteps.
prelearner_buffer_kwargs={
"capacity": 500,
},
)
)
使用相同的 input_read_batch_size,一次只能缓冲 5 个 SingleAgentEpisode,这会导致效率低下,因为读取的数据多于可用于采样的数据。
在另一种情况下,如果每个 SingleAgentEpisode 的长度仍为 100 个时间步,并且 train_batch_size_per_learner 设置为 4000 个时间步(如下面的代码所示),缓冲区将包含 10 个 SingleAgentEpisode 实例。此配置会导致采样变异性降低,因为许多时间步被重复采样,降低了训练批次之间的多样性。这些示例突出了调整这些参数以有效平衡离线流式处理管道中的数据摄入和采样多样性的重要性。
from ray.rllib.algorithms.algorithm_config import AlgorithmConfig
config = (
AlgorithmConfig()
.training(
# Train on a batch of 4000 timesteps each iteration.
train_batch_size_per_learner=4000,
)
.offline_data(
# Read in RLlib's new stack `SingleAgentEpisode` data.
        input_read_episodes=True,
# Define an input read batch size of 10 episodes.
input_read_batch_size=10,
# Set the replay buffer in the `OfflinePrelearner`
# to 1,000 timesteps.
prelearner_buffer_kwargs={
"capacity": 500,
},
)
)
提示
要选择合适的 input_read_batch_size,请查看记录情景的长度。在某些情况下,每个单独的情景足够长以满足 train_batch_size_per_learner,您可以选择 input_read_batch_size 为 1。大多数情况下并非如此,您需要考虑缓冲多少情景才能平衡从读取输入中消化的数据量以及从 OfflinePreLearner 中的 EpisodeReplayBuffer 实例采样的数据的变异性。
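下面用一小段示意性计算说明这种权衡(各数值均为假设):
# 假设的记录数据统计值。
avg_episode_len = 100                 # 每条记录回合的平均长度(时间步)。
train_batch_size_per_learner = 1000   # 每次训练迭代需要的时间步数。
prelearner_buffer_capacity = 1000     # `OfflinePreLearner` 中回放缓冲区的容量。
# 缓冲区至少要覆盖一个训练批次所需的回合数:
min_episodes_needed = train_batch_size_per_learner // avg_episode_len
# 缓冲区实际最多能容纳的回合数:
episodes_fitting_in_buffer = prelearner_buffer_capacity // avg_episode_len
# 一个合理的起点:取两者中的较小值作为 `input_read_batch_size`。
input_read_batch_size = min(min_episodes_needed, episodes_fitting_in_buffer)
print(input_read_batch_size)  # -> 10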
如何调整更新(Learner)#
**更新(Learner)**是 RLlib 离线 RL 管道中的最后一个下游任务,其消耗速度决定了数据管道的整体吞吐量。如果学习过程缓慢,可能会导致上游层的反压,可能导致对象溢出或内存不足(OOM)错误。因此,与上游组件协同调整此层至关重要。可以通过调整几个参数来优化离线算法的学习速度
Actor 池大小
分配的资源
调度策略
批次大小
批次预取
Learner 迭代次数。
Actor 池大小#
RLlib 通过 num_learners 参数支持 Learner 实例的缩放。当此值为 0 时,RLlib 在本地进程中使用一个 Learner 实例;当值大于 0 时,RLlib 使用 BackendExecutor 进行扩展。此执行器会启动您指定数量的 Learner 实例,管理分布式训练,并聚合各 Learner actor 的中间结果。Learner 缩放提高了训练吞吐量,只有当您的离线数据管道中的上游组件能够以足够的速度提供数据以匹配增加的训练能力时,才应使用它。RLlib 的离线 API 通过利用 streaming_split 在其最终层提供了强大的可扩展性。此功能将数据流分割成多个子流,再由各个 Learner 实例处理,从而实现高效的并行消耗并提高整体吞吐量。
例如,要将 learner 数量设置为 4,您可以使用以下语法
from ray.rllib.algorithms.algorithm_config import AlgorithmConfig
config = (
AlgorithmConfig()
.learners(num_learners=4)
)
分配的资源#
与后处理(Pre-Learner)层一样,分配额外资源有助于解决训练缓慢的问题。主要可利用的资源是 GPU,因为训练涉及通过 RLModule 进行前向和后向传播,而 GPU 可以显著加速此过程。如果您的训练已在使用 GPU 并且性能仍然存在问题,请考虑通过为每个 Learner 添加更多 GPU 来增加 GPU 内存和计算能力(设置 config.learners(num_gpus_per_learner=...)),或者添加更多 Learner 工作器来进一步分散工作负载(通过设置 config.learners(num_learners=...))。此外,请确保数据吞吐量和上游组件已优化,以使 learner 得到充分利用,因为上游容量不足可能会成为训练过程的瓶颈。
要为您的 learner 提供更多计算能力,请使用 num_gpus_per_learner 或 num_cpus_per_learner,如下所示
from ray.rllib.algorithms.algorithm_config import AlgorithmConfig
config = (
AlgorithmConfig()
.learners(num_learners=4, num_gpus_per_learner=2)
)
调度策略#
Ray 中的调度策略通过尝试将任务和 actor 分布在集群的多个节点上,在任务和 actor 放置中起着关键作用,从而最大限度地提高资源利用率和容错能力。在单节点集群(即一个大型主节点)上运行时,调度策略的影响很小甚至可以忽略不计。然而,在多节点集群中,由于数据局部性的重要性,调度可能会显著影响离线数据管道的性能。数据处理发生在所有节点上,在训练期间保持数据局部性可以提高性能。
在这种情况下,您可以通过将 RLlib 的默认调度策略从 "PACK" 更改为 "SPREAD" 来提高数据局部性。此策略将 Learner actor 分布在集群中,使 Ray Data 能够利用局部性感知的包选择,从而提高效率。
这是一个关于如何更改调度策略的示例
"""Just for show-casing, don't run."""
import os
from ray import data
from ray.rllib.algorithms.algorithm_config import AlgorithmConfig
# Configure a "SPREAD" scheduling strategy for learners.
os.environ["TRAIN_ENABLE_WORKER_SPREAD_ENV"] = "1"
# Get the current data context.
data_context = data.DataContext.get_current()
# Set the execution options such that the Ray Data tries to match
# the locality of an output stream with where learners are located.
data_context.execution_options = data.ExecutionOptions(
locality_with_output=True,
)
# Build the config.
config = (
AlgorithmConfig()
.learners(
# Scale the learners.
num_learners=4,
num_gpus_per_learner=2,
)
.offline_data(
...,
        # Run in each RLlib training iteration 20
        # iterations per learner (each of them with
        # `train_batch_size_per_learner`).
        dataset_num_iters_per_learner=20,
)
)
# Build the algorithm from the config.
algo = config.build()
# Train for 10 iterations.
for _ in range(10):
res = algo.train()
批次大小#
批次大小是优化 RLlib 新离线 RL API 性能的最简单参数之一。小批次大小可能导致硬件利用不足,从而效率低下,而过大的批次大小可能会超出内存限制。在流式处理管道中,所选的批次大小会影响数据如何在并行工作器之间划分和处理。较大的批次大小可以减少频繁任务协调的开销,但如果它们超过硬件限制,则会减慢整个管道的速度。您可以使用 train_batch_size_per_learner 属性来配置训练批次大小,如下所示。
from ray.rllib.algorithms.algorithm_config import AlgorithmConfig
config = (
AlgorithmConfig()
.training(
train_batch_size_per_learner=1024,
)
)
在 Ray Data 中,通常的做法是使用 2 的幂作为批次大小。但是,您可以根据自己的需求自由选择任何整数值作为批次大小。
批次预取#
批次预取允许您控制离线数据管道下游的数据消耗。主要目标是确保 learner 保持活跃,维持连续的数据流。这是通过在 learner 处理当前批次时准备下一个批次来实现的。预取决定了为 learner 准备了多少批次,并且应根据生成下一个批次所需的时间和 learner 的更新速度进行调整。预取过多批次可能会导致内存效率低下,在某些情况下,还会导致上游任务出现反压。
您可以在 iter_batches_kwargs 中配置批次预取
from ray.rllib.algorithms.algorithm_config import AlgorithmConfig
config = (
AlgorithmConfig()
.offline_data(
iter_batches_kwargs={
"prefetch_batches": 2,
}
)
)
警告
不要覆盖 RLlib 的 map_batches_kwargs 中的 batch_size。这通常会导致性能大幅下降。请注意,此 batch_size 与 train_batch_size_per_learner 不同:前者指定在迭代流式管道的数据输出时的批次大小,而后者定义了每个 Learner 中用于训练的批次大小。
Learner 迭代次数#
此调优参数仅在使用多个 Learner 实例时可用。在分布式学习中,每个 Learner 实例处理离线流式管道的一个子流,并从该子流中迭代批次。您可以控制每个 Learner 实例在每次 RLlib 训练迭代中迭代的次数。结果在每次 RLlib 训练迭代结束后上报。将此参数设置得过低会导致效率低下,而设置得过高可能会妨碍训练监控,并且在某些情况下(例如 RLlib 的 MARWIL 实现)会导致训练数据过时,因为一些数据转换依赖于 Learner 实例正在训练的同一个 RLModule。每个子流的迭代次数由属性 dataset_num_iters_per_learner 控制,其默认值为 None,表示在每个子流上运行一个 epoch。
您可以如下修改此值
from ray.rllib.algorithms.algorithm_config import AlgorithmConfig
config = (
AlgorithmConfig()
.offline_data(
# Train on 20 batches from the substream in each learner.
dataset_num_iters_per_learner=20,
)
)
自定义#
RLlib 中离线 RL 组件的自定义,例如 Algorithm、Learner 或 RLModule,与在线 RL 对应组件的自定义过程类似。有关详细指南,请参阅关于算法、Learners 和 RLlib 的RLModule的文档。RLlib 中的新栈离线 RL 流式处理管道支持在数据流内的各种级别和位置进行自定义,从而为满足您的离线 RL 算法的特定要求提供定制化解决方案。
连接器级别
PreLearner 级别
管道级别。
连接器级别#
通过修改 ConnectorPipelineV2 可以轻松实现对 SingleAgentEpisode 实例的小型数据转换。该组件是 OfflinePreLearner 的一部分,用于准备用于训练的情景。您可以利用 RLlib 库中的任何连接器(参见RLlib 的默认连接器),或者创建自定义连接器(参见RLlib 的 ConnectorV2 示例)集成到 Learner 的 ConnectorPipelineV2 中。必须仔细考虑 ConnectorV2 实例应用的顺序,如RLlib 的 MARWIL 算法的实现所示(参见MARWIL 论文)。
这个 MARWIL 算法计算了一个超越行为克隆的损失,通过在训练中使用优势来改进专家的策略。这些优势是通过使用值模型的 通用优势估计(GAE)计算的。GAE 是通过 GeneralAdvantageEstimation 连接器即时计算的。此连接器有特定要求:它处理 SingleAgentEpisode 实例列表,并且必须是 ConnectorPipelineV2 的最终组件之一。这是因为它的处理依赖于包含 OBS、REWARDS、NEXT_OBS、TERMINATED 和 TRUNCATED 字段的完全准备好的批次。此外,传入的 SingleAgentEpisode 实例必须已包含一个人为延长的时间步。
为满足这些要求,管道必须包含以下 ConnectorV2 实例序列
ray.rllib.connectors.learner.add_one_ts_to_episodes_and_truncate.AddOneTsToEpisodesAndTruncate:确保 SingleAgentEpisode 对象被延长一个时间步。
ray.rllib.connectors.common.add_observations_from_episodes_to_batch.AddObservationsFromEpisodesToBatch:将观察值(OBS)合并到批次中。
ray.rllib.connectors.learner.add_next_observations_from_episodes_to_train_batch.AddNextObservationsFromEpisodesToTrainBatch:添加下一个观察值(NEXT_OBS)。
最后,应用 ray.rllib.connectors.learner.general_advantage_estimation.GeneralAdvantageEstimation 连接器。
以下是 RLlib 的 MARWIL 算法中的示例代码片段,演示了此设置
@override(AlgorithmConfig)
def build_learner_connector(
self,
input_observation_space,
input_action_space,
device=None,
):
pipeline = super().build_learner_connector(
input_observation_space=input_observation_space,
input_action_space=input_action_space,
device=device,
)
# Before anything, add one ts to each episode (and record this in the loss
# mask, so that the computations at this extra ts aren't used to compute
# the loss).
pipeline.prepend(AddOneTsToEpisodesAndTruncate())
# Prepend the "add-NEXT_OBS-from-episodes-to-train-batch" connector piece (right
# after the corresponding "add-OBS-..." default piece).
pipeline.insert_after(
AddObservationsFromEpisodesToBatch,
AddNextObservationsFromEpisodesToTrainBatch(),
)
# At the end of the pipeline (when the batch is already completed), add the
# GAE connector, which performs a vf forward pass, then computes the GAE
# computations, and puts the results of this (advantages, value targets)
# directly back in the batch. This is then the batch used for
# `forward_train` and `compute_losses`.
pipeline.append(
GeneralAdvantageEstimation(gamma=self.gamma, lambda_=self.lambda_)
)
return pipeline
定义一个基础 LearnerConnector 管道#
有多种方法可以自定义 LearnerConnectorPipeline。一种方法是如上所示,在 Algorithm 中覆盖 build_learner_connector 方法。或者,您可以通过使用 learner_connector 属性,直接为 LearnerConnectorPipeline 定义自定义 ConnectorV2 片段。
def _make_learner_connector(input_observation_space, input_action_space):
# Create the learner connector.
return CustomLearnerConnector(
parameter_1=0.3,
parameter_2=100,
)
config = (
AlgorithmConfig()
.training(
# Add the connector pipeline as the starting point for
# the learner connector pipeline.
learner_connector=_make_learner_connector,
)
)
如注释中所述,此方法将 ConnectorV2 片段添加到 LearnerConnectorPipeline 中,仅适用于您打算操作原始情景的情况,因为您的 ConnectorV2 片段是构建管道其余部分(包括批处理和其他处理步骤)的基础。如果您想在 LearnerConnectorPipeline 中进一步修改数据,您应该覆盖 Algorithm 的 build_learner_connector 方法,或者考虑第三种选择:覆盖整个 PreLearner。
PreLearner 级别#
如果您需要在数据到达 SingleAgentEpisode 阶段之前进行更深层次的数据转换,请考虑重写 OfflinePreLearner。此类负责整个数据转换管道,将原始输入数据转换为适合训练的 MultiAgentBatch 对象。例如,如果您的数据存储在需要预解析和重构的特殊格式中(例如,XML、HTML、Protobuf、图像或视频),您可能需要直接处理这些自定义格式。您可以利用 Ray Data 的自定义数据源(例如 read_binary_files())等工具来管理摄取过程。为了确保这些数据被正确地结构化并排序到 SingleAgentEpisode 对象中,您可以重写静态方法 _map_to_episodes()。
对于更广泛的自定义,您可以重写 __call__ 方法来定义自定义转换步骤,实现一个独特的 LearnerConnectorPipeline,并为 Learner 构建 MultiAgentBatch 实例。
以下示例演示了如何使用自定义 OfflinePreLearner 来处理文本数据并构建训练批次
import gymnasium as gym
import numpy as np
import uuid
from typing import Any, Dict, List, Optional, Union
from ray import data
from ray.rllib.env.single_agent_episode import SingleAgentEpisode
from ray.rllib.offline.offline_prelearner import OfflinePreLearner, SCHEMA
from ray.rllib.utils.annotations import override
from ray.rllib.utils.typing import EpisodeType
class TextOfflinePreLearner(OfflinePreLearner):
@staticmethod
@override(OfflinePreLearner)
def _map_to_episodes(
is_multi_agent: bool,
batch: Dict[str, Union[list, np.ndarray]],
schema: Dict[str, str] = SCHEMA,
to_numpy: bool = False,
input_compress_columns: Optional[List[str]] = None,
observation_space: gym.Space = None,
action_space: gym.Space = None,
vocabulary: Dict[str, Any] = None,
**kwargs: Dict[str, Any],
) -> Dict[str, List[EpisodeType]]:
# If we have no vocabulary raise an error.
if not vocabulary:
raise ValueError(
"No `vocabulary`. It needs a vocabulary in form of dictionary ",
"mapping tokens to their IDs."
)
# Define container for episodes.
episodes = []
# Data comes in batches of string arrays under the `"text"` key.
for text in batch["text"]:
# Split the text and tokenize.
tokens = text.split(" ")
# Encode tokens.
encoded = [vocabulary[token] for token in tokens]
one_hot_vectors = np.zeros((len(tokens), len(vocabulary), 1, 1))
for i, token in enumerate(tokens):
if token in vocabulary:
one_hot_vectors[i][vocabulary[token] - 1] = 1.0
# Build the `SingleAgentEpisode`.
episode = SingleAgentEpisode(
# Generate a unique ID.
id_=uuid.uuid4().hex,
# agent_id="default_policy",
# module_id="default_policy",
# We use the starting token with all added tokens as observations.
observations=[ohv for ohv in one_hot_vectors],
observation_space=observation_space,
# Actions are defined to be the "chosen" follow-up token after
# given the observation.
actions=encoded[1:],
action_space=action_space,
# Rewards are zero until the end of a sequence.
rewards=[0.0 for i in range(len(encoded) - 2)] + [1.0],
# The episode is always terminated (as sentences in the dataset are).
terminated=True,
truncated=False,
# No lookback. You want the episode to start at timestep zero.
len_lookback_buffer=0,
t_started=0,
)
# If episodes should be numpy'ized. Some connectors need this.
if to_numpy:
episode.to_numpy()
# Append the episode to the list of episodes.
episodes.append(episode)
# Return a batch with key `"episodes"`.
return {"episodes": episodes}
# Define the dataset.
ds = data.read_text("s3://anonymous@ray-example-data/this.txt")
# Create a vocabulary.
tokens = []
for b in ds.iter_rows():
tokens.extend(b["text"].split(" "))
vocabulary = {token: idx for idx, token in enumerate(set(tokens), start=1)}
# Take a small batch of 10 from the dataset.
batch = ds.take_batch(10)
# Now use your `OfflinePreLearner`.
episodes = TextOfflinePreLearner._map_to_episodes(
is_multi_agent=False,
batch=batch,
to_numpy=True,
schema=None,
input_compress_columns=False,
action_space=None,
observation_space=None,
vocabulary=vocabulary,
)
# Show the constructed episodes.
print(f"Episodes: {episodes}")
前面的示例说明了 RLlib 的 Offline RL API 在自定义数据转换方面的灵活性。在本例中,一个自定义的 OfflinePreLearner 处理一批文本数据——组织成句子——并将每个句子转换为一个 SingleAgentEpisode。该静态方法返回一个包含这些 SingleAgentEpisode 实例的列表的字典。同样,您可以通过重写 __call__() 方法来扩展此功能。例如,您可以实现一个 ray.rllib.connectors.learner.learner_connector_pipeline.LearnerConnectorPipeline,该管道将多个观测值(例如,token)堆叠在一起。这可以通过使用 RLlib 的 FrameStackingLearner 来实现,并在下面的示例中进行了说明。
import gymnasium as gym
import numpy as np
import uuid
from typing import Any, Dict, List, Optional, Tuple, Union
from ray import data
from ray.actor import ActorHandle
from ray.rllib.algorithms.algorithm_config import AlgorithmConfig
from ray.rllib.algorithms.bc.bc_catalog import BCCatalog
from ray.rllib.algorithms.bc.torch.default_bc_torch_rl_module import DefaultBCTorchRLModule
from ray.rllib.connectors.common import AddObservationsFromEpisodesToBatch, BatchIndividualItems, NumpyToTensor, AgentToModuleMapping
from ray.rllib.connectors.learner.add_columns_from_episodes_to_train_batch import AddColumnsFromEpisodesToTrainBatch
from ray.rllib.connectors.learner.frame_stacking import FrameStackingLearner
from ray.rllib.connectors.learner.learner_connector_pipeline import LearnerConnectorPipeline
from ray.rllib.core.learner.learner import Learner
from ray.rllib.core.rl_module.default_model_config import DefaultModelConfig
from ray.rllib.core.rl_module.multi_rl_module import MultiRLModuleSpec
from ray.rllib.core.rl_module.rl_module import RLModuleSpec
from ray.rllib.env.single_agent_episode import SingleAgentEpisode
from ray.rllib.policy.sample_batch import MultiAgentBatch, SampleBatch
from ray.rllib.offline.offline_prelearner import OfflinePreLearner, SCHEMA
from ray.rllib.utils.annotations import override
from ray.rllib.utils.typing import EpisodeType, ModuleID
class TextOfflinePreLearner(OfflinePreLearner):
@override(OfflinePreLearner)
def __init__(
self,
config: "AlgorithmConfig",
learner: Union[Learner, List[ActorHandle]] = None,
locality_hints: Optional[List[str]] = None,
spaces: Optional[Tuple[gym.Space, gym.Space]] = None,
module_spec: Optional[MultiRLModuleSpec] = None,
module_state: Optional[Dict[ModuleID, Any]] = None,
vocabulary: Dict[str, Any] = None,
**kwargs: Dict[str, Any],
):
self.config = config
self.spaces = spaces
self.vocabulary = vocabulary
self.vocabulary_size = len(self.vocabulary)
# Build the `RLModule`.
self._module = module_spec.build()
if module_state:
self._module.set_state(module_state)
# Build the learner connector pipeline.
self._learner_connector = LearnerConnectorPipeline(
connectors=[
FrameStackingLearner(
num_frames=4,
)
],
input_action_space=module_spec.action_space,
input_observation_space=module_spec.observation_space,
)
self._learner_connector.append(
AddObservationsFromEpisodesToBatch(as_learner_connector=True),
)
self._learner_connector.append(
AddColumnsFromEpisodesToTrainBatch(),
)
self._learner_connector.append(
BatchIndividualItems(multi_agent=False),
)
# Let us run exclusively on CPU, then we can convert here to Tensor.
self._learner_connector.append(
NumpyToTensor(as_learner_connector=True),
)
@override(OfflinePreLearner)
def __call__(self, batch: Dict[str, np.ndarray]) -> Dict[str, List[EpisodeType]]:
# Convert raw data to episodes.
episodes = TextOfflinePreLearner._map_to_episodes(
is_multi_agent=False,
batch=batch,
to_numpy=True,
schema=None,
input_compress_columns=False,
action_space=self.spaces[0],
observation_space=self.spaces[1],
vocabulary=self.vocabulary,
)["episodes"]
# Run the learner connector pipeline with the
# `FrameStackLearner` piece.
batch = self._learner_connector(
rl_module=self._module,
batch={},
episodes=episodes,
shared_data={},
)
# Convert to `MultiAgentBatch` for the learner.
batch = MultiAgentBatch(
{
module_id: SampleBatch(module_data)
for module_id, module_data in batch.items()
},
# TODO (simon): This can be run once for the batch and the
# metrics, but we run it twice: here and later in the learner.
env_steps=sum(e.env_steps() for e in episodes),
)
# Return the `MultiAgentBatch` under the `"batch"` key.
return {"batch": batch}
@staticmethod
@override(OfflinePreLearner)
def _map_to_episodes(
is_multi_agent: bool,
batch: Dict[str, Union[list, np.ndarray]],
schema: Dict[str, str] = SCHEMA,
to_numpy: bool = False,
input_compress_columns: Optional[List[str]] = None,
observation_space: gym.Space = None,
action_space: gym.Space = None,
vocabulary: Dict[str, Any] = None,
**kwargs: Dict[str, Any],
) -> Dict[str, List[EpisodeType]]:
# If we have no vocabulary raise an error.
if not vocabulary:
raise ValueError(
"No `vocabulary`. It needs a vocabulary in form of dictionary ",
"mapping tokens to their IDs."
)
# Define container for episodes.
episodes = []
# Data comes in batches of string arrays under the `"text"` key.
for text in batch["text"]:
# Split the text and tokenize.
tokens = text.split(" ")
# Encode tokens.
encoded = [vocabulary[token] for token in tokens]
one_hot_vectors = np.zeros((len(tokens), len(vocabulary), 1, 1))
for i, token in enumerate(tokens):
if token in vocabulary:
one_hot_vectors[i][vocabulary[token] - 1] = 1.0
# Build the `SingleAgentEpisode`.
episode = SingleAgentEpisode(
# Generate a unique ID.
id_=uuid.uuid4().hex,
# agent_id="default_policy",
# module_id="default_policy",
# We use the starting token with all added tokens as observations.
observations=[ohv for ohv in one_hot_vectors],
observation_space=observation_space,
# Actions are defined to be the "chosen" follow-up token after
# given the observation.
actions=encoded[1:],
action_space=action_space,
# Rewards are zero until the end of a sequence.
rewards=[0.0 for i in range(len(encoded) - 2)] + [1.0],
# The episode is always terminated (as sentences in the dataset are).
terminated=True,
truncated=False,
# No lookback. You want the episode to start at timestep zero.
len_lookback_buffer=0,
t_started=0,
)
# If episodes should be numpy'ized. Some connectors need this.
if to_numpy:
episode.to_numpy()
# Append the episode to the list of episodes.
episodes.append(episode)
# Return a batch with key `"episodes"`.
return {"episodes": episodes}
# Define dataset on sample data.
ds = data.read_text("s3://anonymous@ray-example-data/this.txt")
# Create a vocabulary.
tokens = []
for b in ds.iter_rows():
tokens.extend(b["text"].split(" "))
vocabulary = {token: idx for idx, token in enumerate(set(tokens), start=1)}
# Specify an `RLModule` and wrap it with a `MultiRLModuleSpec`. Note,
# on `Learner`` side any `RLModule` is an `MultiRLModule`.
module_spec = MultiRLModuleSpec(
rl_module_specs={
"default_policy": RLModuleSpec(
model_config=DefaultModelConfig(
conv_filters=[[16, 4, 2], [32, 4, 2], [64, 4, 2], [128, 4, 2]],
conv_activation="relu",
),
inference_only=False,
module_class=DefaultBCTorchRLModule,
catalog_class=BCCatalog,
action_space = gym.spaces.Discrete(len(vocabulary)),
observation_space=gym.spaces.Box(0.0, 1.0, (len(vocabulary), 1, 1), np.float32),
),
},
)
# Take a small batch.
batch = ds.take_batch(10)
# Build and instance your `OfflinePreLearner`.
oplr = TextOfflinePreLearner(
config=AlgorithmConfig(),
spaces=(
gym.spaces.Discrete(len(vocabulary)),
gym.spaces.Box(0.0, 1.0, (len(vocabulary), 1, 1), np.float32)),
module_spec=module_spec,
vocabulary=vocabulary,
)
# Run your `OfflinePreLearner`.
transformed = oplr(batch)
# Show the generated batch.
print(f"Batch: {batch}")
通过完全自定义 OfflinePreLearner 的能力,您可以设计定制的数据转换工作流。这包括定义特定的 learner connector pipeline 和实现原始数据映射,从而能够对文本数据进行从原始格式到 MultiAgentBatch 的多步处理。
要集成您的自定义 OfflinePreLearner,只需在您的 AlgorithmConfig 中指定它。
from ray.rllib.algorithms.algorithm_config import AlgorithmConfig
config = (
AlgorithmConfig()
.offline_data(
# Provide your custom `OfflinePreLearner`.
prelearner_class=TextOfflinePreLearner,
# Provide special keyword arguments your `OfflinePreLearner` needs.
prelearner_kwargs={
"vocabulary": vocabulary,
},
)
)
如果这些自定义功能仍然不能满足您的要求,请考虑转向 **Pipeline Level** 以获得更大的灵活性。
Pipeline level#
在 RLlib 的 Offline RL API 的这个级别,您可以通过重写 OfflineData 类来重新定义从数据读取到批次迭代的整个管道。但在大多数情况下,其他两个级别应该足以满足您的需求。操作整个管道需要谨慎处理,因为它可能会严重降低管道的性能。在开始编程自己的管道之前,请仔细研究 OfflineData 类,以充分了解默认管道的工作原理。主要有两个方法定义了这个管道:
定义数据读取过程的
__init__()方法。定义数据映射和批次迭代的
sample()方法。
例如,如果您有一些基础数据转换,例如将图像文件转换为 numpy 数组,您可以考虑重写 __init__() 方法。
import io
import logging
from typing import Any, Dict
import numpy as np
from PIL import Image
from ray import data
from ray.rllib.algorithms.algorithm_config import AlgorithmConfig
from ray.rllib.offline.offline_data import OfflineData
from ray.rllib.offline.offline_prelearner import OfflinePreLearner
from ray.rllib.utils.annotations import override
logger = logging.getLogger(__name__)
class ImageOfflineData(OfflineData):
"""This class overrides `OfflineData` to read in raw image data.
The image data is from Ray Data`s S3 example bucket, namely
`ray-example-data/batoidea/JPEGImages/`.
To read in this data the raw bytes have to be decoded and then
converted to `numpy` arrays. Each image array has a dimension
(32, 32, 3).
To just read in the raw image data and convert it to arrays it
suffices to override the `OfflineData.__init__` method only.
Note, that further transformations of the data - specifically
into `SingleAgentEpisode` data - will be performed in a custom
`OfflinePreLearner` defined in the `image_offline_prelearner`
file. You could hard-code the usage of this prelearner here,
but you will use the `prelearner_class` attribute in the
`AlgorithmConfig` instead.
"""
@override(OfflineData)
def __init__(self, config: AlgorithmConfig):
# Set class attributes.
self.config = config
self.is_multi_agent = self.config.is_multi_agent
self.materialize_mapped_data = False
self.path = self.config.input_
self.data_read_batch_size = self.config.input_read_batch_size
self.data_is_mapped = False
# Define your function to map images to numpy arrays.
def map_to_numpy(row: Dict[str, Any]) -> Dict[str, Any]:
# Convert to byte stream.
bytes_stream = io.BytesIO(row["bytes"])
# Convert to image.
image = Image.open(bytes_stream)
# Return an array of the image.
return {"array": np.array(image)}
try:
# Load the dataset and transform to arrays on-the-fly.
self.data = data.read_binary_files(self.path).map(map_to_numpy)
except Exception as e:
logger.error(e)
# Define further attributes needed in the `sample` method.
self.batch_iterator = None
self.map_batches_kwargs = self.config.map_batches_kwargs
self.iter_batches_kwargs = self.config.iter_batches_kwargs
# Use a custom OfflinePreLearner if needed.
self.prelearner_class = self.config.prelearner_class or OfflinePreLearner
# For remote learner setups.
self.locality_hints = None
self.learner_handles = None
self.module_spec = None
在提供的代码示例中,您定义了一个自定义的 OfflineData 类来处理图像数据的读取和预处理,将其从二进制编码格式转换为 numpy 数组。此外,您还实现了一个自定义的 OfflinePreLearner 来进一步处理这些数据,将其转换为适合学习器的 MultiAgentBatch 格式。
import random
import uuid
from typing import Any, Dict, List, Optional, Tuple, Union
import gymnasium as gym
import numpy as np
from ray.actor import ActorHandle
from ray.rllib.algorithms.algorithm_config import AlgorithmConfig
from ray.rllib.core.learner.learner import Learner
from ray.rllib.core.rl_module.multi_rl_module import MultiRLModuleSpec
from ray.rllib.env.single_agent_episode import SingleAgentEpisode
from ray.rllib.offline.offline_prelearner import SCHEMA, OfflinePreLearner
from ray.rllib.utils.annotations import override
from ray.rllib.utils.typing import EpisodeType, ModuleID
class ImageOfflinePreLearner(OfflinePreLearner):
"""This class transforms image data to `MultiAgentBatch`es.
While the `ImageOfflineData` class transforms raw image
bytes to `numpy` arrays, this class maps these data in
`SingleAgentEpisode` instances through the learner connector
    pipeline and finally outputs a `MultiAgentBatch` ready for
training in RLlib's `Learner`s.
Note, the basic transformation from images to `SingleAgentEpisode`
instances creates synthetic data that does not rely on any MDP
and therefore no agent can learn from it. However, this example
should show how to transform data into this form through
overriding the `OfflinePreLearner`.
"""
def __init__(
self,
config: "AlgorithmConfig",
learner: Union[Learner, List[ActorHandle]],
spaces: Optional[Tuple[gym.Space, gym.Space]] = None,
module_spec: Optional[MultiRLModuleSpec] = None,
module_state: Optional[Dict[ModuleID, Any]] = None,
**kwargs: Dict[str, Any],
):
# Set up necessary class attributes.
self.config = config
self.action_space = spaces[1]
self.observation_space = spaces[0]
self.input_read_episodes = self.config.input_read_episodes
self.input_read_sample_batches = self.config.input_read_sample_batches
self._policies_to_train = "default_policy"
self._is_multi_agent = False
# Build the `MultiRLModule` needed for the learner connector.
self._module = module_spec.build()
# Build the learner connector pipeline.
self._learner_connector = self.config.build_learner_connector(
input_observation_space=self.observation_space,
input_action_space=self.action_space,
)
@override(OfflinePreLearner)
@staticmethod
def _map_to_episodes(
is_multi_agent: bool,
batch: Dict[str, Union[list, np.ndarray]],
schema: Dict[str, str] = SCHEMA,
to_numpy: bool = False,
input_compress_columns: Optional[List[str]] = None,
observation_space: gym.Space = None,
action_space: gym.Space = None,
**kwargs: Dict[str, Any],
) -> Dict[str, List[EpisodeType]]:
# Define a container for the episodes.
episodes = []
# Batches come in as numpy arrays.
for i, obs in enumerate(batch["array"]):
# Construct your episode.
episode = SingleAgentEpisode(
id_=uuid.uuid4().hex,
observations=[obs, obs],
observation_space=observation_space,
actions=[action_space.sample()],
action_space=action_space,
rewards=[random.random()],
terminated=True,
truncated=False,
len_lookback_buffer=0,
t_started=0,
)
# Numpy'ize, if necessary.
if to_numpy:
episode.to_numpy()
# Store the episode in the container.
episodes.append(episode)
return {"episodes": episodes}
这演示了如何使用您自己的逻辑来定制整个离线数据管道。您可以使用以下代码运行该示例。
"""Example showing how to customize an offline data pipeline.
This example:
- demonstrates how you can customized your offline data pipeline.
- shows how you can override the `OfflineData` to read raw image
data and transform it into `numpy ` arrays.
- explains how you can override the `OfflinePreLearner` to
transform data further into `SingleAgentEpisode` instances that
can be processes by the learner connector pipeline.
How to run this script
----------------------
`python [script file name].py --checkpoint-at-end`
For debugging, use the following additional command line options
`--no-tune --num-env-runners=0`
which should allow you to set breakpoints anywhere in the RLlib code and
have the execution stop there for inspection and debugging.
For logging to your WandB account, use:
`--wandb-key=[your WandB API key] --wandb-project=[some project name]
--wandb-run-name=[optional: WandB run name (within the defined project)]`
Results to expect
-----------------
2024-12-03 19:59:23,043 INFO streaming_executor.py:109 -- Execution plan
of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadBinary] ->
TaskPoolMapOperator[Map(map_to_numpy)] -> LimitOperator[limit=128]
✔️ Dataset execution finished in 10.01 seconds: 100%|███████████████████
███████████████████████████████████████████████████████████████████████|
3.00/3.00 [00:10<00:00, 3.34s/ row]
- ReadBinary->SplitBlocks(11): Tasks: 0; Queued blocks: 0; Resources: 0.0
CPU, 0.0B object store: 100%|█████████████████████████████████████████|
3.00/3.00 [00:10<00:00, 3.34s/ row]
- Map(map_to_numpy): Tasks: 0; Queued blocks: 0; Resources: 0.0 CPU,
0.0B object store: 100%|███████████████████████████████████████████████████|
3.00/3.00 [00:10<00:00, 3.34s/ row]
- limit=128: Tasks: 0; Queued blocks: 0; Resources: 0.0 CPU, 3.0KB object
store: 100%|██████████████████████████████████████████████████████████|
3.00/3.00 [00:10<00:00, 3.34s/ row]
Batch: {'batch': [MultiAgentBatch({}, env_steps=3)]}
"""
import gymnasium as gym
import numpy as np
from ray.rllib.algorithms.bc import BCConfig
from ray.rllib.algorithms.bc.bc_catalog import BCCatalog
from ray.rllib.algorithms.bc.torch.bc_torch_rl_module import BCTorchRLModule
from ray.rllib.core.rl_module.multi_rl_module import MultiRLModuleSpec
from ray.rllib.core.rl_module.rl_module import DefaultModelConfig, RLModuleSpec
from ray.rllib.examples.offline_rl.classes.image_offline_data import ImageOfflineData
from ray.rllib.examples.offline_rl.classes.image_offline_prelearner import (
ImageOfflinePreLearner,
)
# Create an Algorithm configuration.
# TODO: Make this an actually running/learning example with RLunplugged
# data from S3 and add this to the CI.
config = (
BCConfig()
.environment(
action_space=gym.spaces.Discrete(2),
observation_space=gym.spaces.Box(0, 255, (32, 32, 3), np.float32),
)
.offline_data(
input_=["s3://anonymous@ray-example-data/batoidea/JPEGImages/"],
prelearner_class=ImageOfflinePreLearner,
)
)
# Specify an `RLModule` and wrap it with a `MultiRLModuleSpec`. Note,
# on `Learner`` side any `RLModule` is an `MultiRLModule`.
module_spec = MultiRLModuleSpec(
rl_module_specs={
"default_policy": RLModuleSpec(
model_config=DefaultModelConfig(
conv_filters=[[16, 4, 2], [32, 4, 2], [64, 4, 2], [128, 4, 2]],
conv_activation="relu",
),
inference_only=False,
module_class=BCTorchRLModule,
catalog_class=BCCatalog,
action_space=gym.spaces.Discrete(2),
observation_space=gym.spaces.Box(0, 255, (32, 32, 3), np.float32),
),
},
)
# Construct your `OfflineData` class instance.
offline_data = ImageOfflineData(config)
# Check, how the data is transformed. Note, the
# example dataset has only 3 such images.
batch = offline_data.data.take_batch(3)
# Construct your `OfflinePreLearner`.
offline_prelearner = ImageOfflinePreLearner(
config=config,
learner=None,
spaces=(
config.observation_space,
config.action_space,
),
module_spec=module_spec,
)
# Transform the raw data to `MultiAgentBatch` data.
batch = offline_prelearner(batch)
# Show the transformed batch.
print(f"Batch: {batch}")
提示
请仔细考虑这种方法:在许多情况下,在 RLlib 的离线 RL API 之前将数据完全转换为合适的格式可能更有效。例如,在上面的示例中,您可以预先将整个图像数据集预处理为 numpy 数组,并利用 RLlib 的默认 OfflineData 类进行后续步骤。
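例如,下面的示意代码先用 Ray Data 把原始图像一次性解码成 numpy 数组并写成 Parquet;之后的训练就可以让 RLlib 默认的 OfflineData 直接读取这些 Parquet 文件(路径与列名仅为示例):
import io
import numpy as np
from PIL import Image
from ray import data
def map_to_numpy(row):
    # 把原始字节解码为图像,再转换为 numpy 数组。
    image = Image.open(io.BytesIO(row["bytes"]))
    return {"array": np.array(image)}
# 一次性预处理并落盘为 Parquet,之后由 RLlib 默认的 `OfflineData` 读取。
(
    data.read_binary_files("s3://anonymous@ray-example-data/batoidea/JPEGImages/")
    .map(map_to_numpy)
    .write_parquet("/tmp/preprocessed_image_data")
)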
Monitoring#
要有效地监控您的离线数据管道,请利用 Ray Data 的内置监控功能。重点确保您的离线数据流管道的所有阶段都在积极处理数据。此外,还要关注 Learner 实例,特别是 learner_update_timer,它应该保持较低的值——对于小型模型大约为 0.02——以表明数据处理和模型更新的效率。
注意
RLlib 不会将 Ray Data 指标包含在其结果中,也不会通过 Ray Tune 的 TBXLoggerCallback 在 Tensorboard 中显示它们。强烈建议启用 **Ray Dashboard**,可在 127.0.0.1:8265 访问,以获得全面的监控和洞察。
Input API#
您可以使用以下选项为代理配置经验输入:
def offline_data(
self,
*,
# Specify how to generate experiences:
# - A local directory or file glob expression (for example "/tmp/*.json").
# - A cloud storage path or file glob expression (for example "gs://rl/").
# - A list of individual file paths/URIs (for example ["/tmp/1.json",
# "s3://bucket/2.json"]).
# - A file or directory path in a given `input_filesystem`.
input_: Optional[Union[str, Callable[[IOContext], InputReader]]],
# Read method for the `ray.data.Dataset` to read in the
# offline data from `input_`. The default is `read_parquet` for Parquet
# files. See https://docs.rayai.org.cn/en/latest/data/api/input_output.html for
# more info about available read methods in `ray.data`.
input_read_method: Optional[Union[str, Callable]],
# Keyword args for `input_read_method`. These
# are passed into the read method without checking. Use these
# keyword args together with `map_batches_kwargs` and
# `iter_batches_kwargs` to tune the performance of the data pipeline. It
# is strongly recommended to rely on Ray Data's automatic read performance
# tuning
input_read_method_kwargs: Optional[Dict],
# Table schema for converting offline data to episodes.
# This schema maps the offline data columns to
# `ray.rllib.core.columns.Columns`:
# `{Columns.OBS: 'o_t', Columns.ACTIONS: 'a_t', ...}`. Columns in
# the data set that aren't mapped through this schema are sorted into
# episodes' `extra_model_outputs`. If no schema is passed in, the default
# schema `ray.rllib.offline.offline_data.SCHEMA` is used. If your dataset
# already contains the names in this schema, no `input_read_schema` is
# needed. The same applies if the offline data is in RLlib's
# `EpisodeType` or old `SampleBatch` format.
input_read_schema: Optional[Dict[str, str]],
# Whether offline data is already stored in RLlib's
# `EpisodeType` format, i.e. `ray.rllib.env.SingleAgentEpisode` (multi
# -agent is planned but not supported, yet). Reading episodes directly
# avoids additional transform steps and is usually faster and
# therefore the recommended format when your application remains fully
# inside of RLlib's schema. The other format is a columnar format and is
# agnostic to the RL framework used. Use the latter format, if you are
# unsure when to use the data or in which RL framework. The default is
# to read column data, i.e. `False`. `input_read_episodes` and
# `input_read_sample_batches` can't be `True` at the same time. See
# also `output_write_episodes` to define the output data format when
# recording.
input_read_episodes: Optional[bool],
# Whether offline data is stored in RLlib's old
# stack `SampleBatch` type. This is usually the case for older data
# recorded with RLlib in JSON line format. Reading in `SampleBatch`
# data needs extra transforms and might not concatenate episode chunks
# contained in different `SampleBatch`es in the data. If possible, avoid
# reading `SampleBatch`es and instead convert them in a controlled way into
# RLlib's `EpisodeType` (i.e. `SingleAgentEpisode`). The default is
# `False`. `input_read_episodes` and `input_read_sample_batches` can't
# be True at the same time.
input_read_sample_batches: Optional[bool],
# Batch size to pull from the data set. This could
# differ from the `train_batch_size_per_learner`, if a dataset holds
# `EpisodeType` (i.e. `SingleAgentEpisode`) or `SampleBatch`, or any
# other data type that contains multiple timesteps in a single row of the
# dataset. In such cases a single batch of size
# `train_batch_size_per_learner` potentially pulls a multiple of
# `train_batch_size_per_learner` timesteps from the offline dataset. The
# default is `None`, in which case `train_batch_size_per_learner` rows are
# pulled.
input_read_batch_size: Optional[int],
# A cloud filesystem to handle access to cloud storage when
# reading experiences. Can be "gcs" for Google Cloud Storage, "s3" for AWS
# S3 buckets, "abs" for Azure Blob Storage, or any filesystem supported
# by PyArrow. In general the file path is sufficient for accessing data
# from public or local storage systems. See
# https://arrow.apache.ac.cn/docs/python/filesystems.html for details.
input_filesystem: Optional[str],
# A dictionary holding the kwargs for the filesystem
# given by `input_filesystem`. See `gcsfs.GCSFilesystem` for GCS,
# `pyarrow.fs.S3FileSystem` for S3, and `ablfs.AzureBlobFilesystem` for
# ABS filesystem arguments.
input_filesystem_kwargs: Optional[Dict],
# Which input columns are LZ4-compressed in the input data. This applies
# when data is stored in RLlib's `SingleAgentEpisode` format
# (`MultiAgentEpisode` isn't supported yet). Note that providing
# `rllib.core.columns.Columns.OBS` also tries to decompress
# `rllib.core.columns.Columns.NEXT_OBS`.
input_compress_columns: Optional[List[str]],
# Whether the raw data should be materialized in memory.
# This boosts performance, but requires enough memory to avoid an OOM, so
# make sure that your cluster has the resources available. For very large
# data you might want to switch to streaming mode by setting this to
# `False` (default). If your algorithm doesn't need the RLModule in the
# Learner connector pipeline or all (learner) connectors are stateless
# you should consider setting `materialize_mapped_data` to `True`
# instead (and set `materialize_data` to `False`). If your data doesn't
# fit into memory and your Learner connector pipeline requires an RLModule
# or is stateful, set both `materialize_data` and
# `materialize_mapped_data` to `False`.
materialize_data: Optional[bool],
# Whether the data should be materialized after
# running it through the Learner connector pipeline (i.e. after running
# the `OfflinePreLearner`). This improves performance, but should only be
# used in case the (learner) connector pipeline doesn't require an
# RLModule and the (learner) connector pipeline is stateless. For example,
# MARWIL's Learner connector pipeline requires the RLModule for value
# function predictions and training batches would become stale after some
# iterations causing learning degradation or divergence. Also ensure that
# your cluster has enough memory available to avoid an OOM. If set to
# `True`, make sure that `materialize_data` is set to `False` to
# avoid materialization of two datasets. If your data doesn't fit into
# memory and your Learner connector pipeline requires an RLModule or is
# stateful, set both `materialize_data` and `materialize_mapped_data` to
# `False`.
materialize_mapped_data: Optional[bool],
# Keyword args for the `map_batches` method. These are
# passed into the `ray.data.Dataset.map_batches` method when sampling
# without checking. If no arguments are passed in, the default arguments
# `{'concurrency': max(2, num_learners), 'zero_copy_batch': True}` are
# used. Use these keyword args together with `input_read_method_kwargs`
# and `iter_batches_kwargs` to tune the performance of the data pipeline.
map_batches_kwargs: Optional[Dict],
# Keyword args for the `iter_batches` method. These are
# passed into the `ray.data.Dataset.iter_batches` method when sampling
# without checking. If no arguments are passed in, the default argument
# `{'prefetch_batches': 2}` is used. Use these keyword args
# together with `input_read_method_kwargs` and `map_batches_kwargs` to
# tune the performance of the data pipeline.
iter_batches_kwargs: Optional[Dict],
# An optional `OfflinePreLearner` class that's used to
# transform data batches in `ray.data.map_batches` used in the
# `OfflineData` class to transform data from columns to batches that can
# be used in the `Learner.update...()` methods. Override the
# `OfflinePreLearner` class and pass your derived class in here, if you
# need to make some further transformations specific for your data or
# loss. The default is `None`, which uses the base `OfflinePreLearner`
# defined in `ray.rllib.offline.offline_prelearner`.
prelearner_class: Optional[Type],
# An optional `EpisodeReplayBuffer` class is
# used to buffer experiences when data is in `EpisodeType` or
# RLlib's previous `SampleBatch` type format. In this case, a single
# data row may contain multiple timesteps and the buffer serves two
# purposes: (a) to store intermediate data in memory, and (b) to ensure
# that exactly `train_batch_size_per_learner` experiences are sampled
# per batch. The default is RLlib's `EpisodeReplayBuffer`.
prelearner_buffer_class: Optional[Type],
# Optional keyword arguments for initializing the
# `EpisodeReplayBuffer`. In most cases this is simply the `capacity`
# for the default buffer used (`EpisodeReplayBuffer`), but it may
# differ if the `prelearner_buffer_class` uses a custom buffer.
prelearner_buffer_kwargs: Optional[Dict],
# Number of updates to run in each learner
# during a single training iteration. If None, each learner runs a
# complete epoch over its data block (the dataset is partitioned into
# at least as many blocks as there are learners). The default is `None`.
# This must be set to `1`, if a single (local) learner is used.
dataset_num_iters_per_learner: Optional[int],
)
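As a usage sketch, a behavior cloning setup could configure its input as follows, assuming the episode recordings from the earlier example under /tmp/docs_rllib_offline_recording; the commented-out lines show the schema and compression options you would only need for columnar or LZ4-compressed data:
from ray.rllib.algorithms.bc import BCConfig
from ray.rllib.core.columns import Columns

config = (
    BCConfig()
    .environment("CartPole-v1")
    .offline_data(
        # Read the Parquet recordings from the earlier example.
        input_=["/tmp/docs_rllib_offline_recording"],
        # The recordings are `SingleAgentEpisode` objects, so read them
        # directly instead of columnar data.
        input_read_episodes=True,
        # Only needed for columnar data whose column names differ from
        # RLlib's default schema:
        # input_read_schema={Columns.OBS: "o_t", Columns.ACTIONS: "a_t"},
        # Only needed if observations were recorded LZ4-compressed:
        # input_compress_columns=[Columns.OBS],
        # Stream the raw data instead of materializing it in memory.
        materialize_data=False,
    )
)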
Output API#
You can configure experience output for your agent with the following options:
def offline_data(
# Specify where experiences should be saved:
# - None: don't save any experiences
# - a path/URI to save to a custom output directory (for example, "s3://bckt/")
output: Optional[str],
# What sample batch columns to LZ4 compress in the output data.
# Note that providing `rllib.core.columns.Columns.OBS` also
# compresses `rllib.core.columns.Columns.NEXT_OBS`.
output_compress_columns: Optional[List[str]],
# Max output file size (in bytes) before rolling over to a new
# file.
output_max_file_size: Optional[float],
# Max output row numbers before rolling over to a new file.
output_max_rows_per_file: Optional[int],
# Write method for the `ray.data.Dataset` to write the
# offline data to `output`. The default is `write_parquet` for Parquet
# files. See https://docs.rayai.org.cn/en/latest/data/api/input_output.html for
# more info about available write methods in `ray.data`.
output_write_method: Optional[str],
# Keyword arguments for the `output_write_method`. These are
# passed into the write method without checking.
output_write_method_kwargs: Optional[Dict],
# A cloud filesystem to handle access to cloud storage when
# writing experiences. Can be "gcs" for Google Cloud Storage, "s3" for AWS
# S3 buckets, "abs" for Azure Blob Storage, or any filesystem supported
# by PyArrow. In general the file path is sufficient for accessing data
# from public or local storage systems. See
# https://arrow.apache.ac.cn/docs/python/filesystems.html for details.
output_filesystem: Optional[str],
# A dictionary holding the keyword arguments for the filesystem
# given by `output_filesystem`. See `gcsfs.GCSFilesystem` for GCS,
# `pyarrow.fs.S3FileSystem` for S3, and `ablfs.AzureBlobFilesystem` for
# ABS filesystem arguments.
output_filesystem_kwargs: Optional[Dict],
# Whether data should be recorded in RLlib's `EpisodeType`
# format (i.e. `SingleAgentEpisode` objects). Use this format, if you
# need data to be ordered in time and directly grouped by episodes for
# example to train stateful modules or if you plan to use recordings
# exclusively in RLlib. Otherwise data is recorded in tabular (columnar)
# format. Default is `True`.
output_write_episodes: Optional[bool],