学习玩 Pong#
提示
对于生产级的分布式强化学习实现,请使用 Ray RLlib。
在此示例中,我们将使用 Gymnasium 训练一个非常简单的神经网络来玩 Pong。
从宏观上看,我们将使用多个 Ray actor 同时获取模拟轨迹并计算梯度。然后我们将这些梯度集中起来更新神经网络。更新后的神经网络将传递回每个 Ray actor 进行更多梯度计算。
此应用是根据 Andrej Karpathy 的 源代码 改编而来,修改极少(参见附带的博客文章)。
要运行此应用,首先安装一些依赖项。
pip install gymnasium[atari]==0.28.1
目前,在拥有 64 个物理核心的大型机器上,计算一个大小为 1 的批次更新需要大约 1 秒,大小为 10 的批次需要大约 2.5 秒。大小为 60 的批次需要大约 3 秒。在拥有 11 个节点(每个节点有 18 个物理核心)的集群上,大小为 300 的批次需要大约 10 秒。如果您看到的数据与这些数据有很大差异,请查看本页底部的故障排除部分,并考虑提交问题。
请注意,这些时间取决于轨迹耗时,而轨迹耗时又取决于策略的表现好坏。例如,一个非常糟糕的策略会很快输掉。随着策略的学习,我们预计这些数字会增加。
import numpy as np
import os
import ray
import time
import gymnasium as gym
超参数#
这里我们将定义一些使用的超参数。
H = 200 # The number of hidden layer neurons.
gamma = 0.99 # The discount factor for reward.
decay_rate = 0.99 # The decay factor for RMSProp leaky sum of grad^2.
D = 80 * 80 # The input dimensionality: 80x80 grid.
learning_rate = 1e-4 # Magnitude of the update.
辅助函数#
我们首先定义几个辅助函数
预处理:
preprocess
函数将原始的 210x160x3 uint8 帧预处理成一个一维的 6400 浮点向量。奖励处理:
process_rewards
函数将计算折现奖励。这个公式表明,采样动作的“价值”是之后所有奖励的加权和,但越晚的奖励其重要性呈指数级下降。轨迹:
rollout
函数会玩一整局 Pong 游戏(直到电脑或 RL 智能体输掉)。
def preprocess(img):
# Crop the image.
img = img[35:195]
# Downsample by factor of 2.
img = img[::2, ::2, 0]
# Erase background (background type 1).
img[img == 144] = 0
# Erase background (background type 2).
img[img == 109] = 0
# Set everything else (paddles, ball) to 1.
img[img != 0] = 1
return img.astype(float).ravel()
def process_rewards(r):
"""Compute discounted reward from a vector of rewards."""
discounted_r = np.zeros_like(r)
running_add = 0
for t in reversed(range(0, r.size)):
# Reset the sum, since this was a game boundary (pong specific!).
if r[t] != 0:
running_add = 0
running_add = running_add * gamma + r[t]
discounted_r[t] = running_add
return discounted_r
def rollout(model, env):
"""Evaluates env and model until the env returns "Terminated" or "Truncated".
Returns:
xs: A list of observations
hs: A list of model hidden states per observation
dlogps: A list of gradients
drs: A list of rewards.
"""
# Reset the game.
observation, info = env.reset()
# Note that prev_x is used in computing the difference frame.
prev_x = None
xs, hs, dlogps, drs = [], [], [], []
terminated = truncated = False
while not terminated and not truncated:
cur_x = preprocess(observation)
x = cur_x - prev_x if prev_x is not None else np.zeros(D)
prev_x = cur_x
aprob, h = model.policy_forward(x)
# Sample an action.
action = 2 if np.random.uniform() < aprob else 3
# The observation.
xs.append(x)
# The hidden state.
hs.append(h)
y = 1 if action == 2 else 0 # A "fake label".
# The gradient that encourages the action that was taken to be
# taken (see http://cs231n.github.io/neural-networks-2/#losses if
# confused).
dlogps.append(y - aprob)
observation, reward, terminated, truncated, info = env.step(action)
# Record reward (has to be done after we call step() to get reward
# for previous action).
drs.append(reward)
return xs, hs, dlogps, drs
神经网络#
这里,使用一个神经网络来定义玩 Pong 的“策略”(即,给定一个状态选择一个动作的函数)。
要在 NumPy 中实现神经网络,我们需要提供辅助函数来计算更新,并计算给定输入(在本例中是观测值)时神经网络的输出。
class Model(object):
"""This class holds the neural network weights."""
def __init__(self):
self.weights = {}
self.weights["W1"] = np.random.randn(H, D) / np.sqrt(D)
self.weights["W2"] = np.random.randn(H) / np.sqrt(H)
def policy_forward(self, x):
h = np.dot(self.weights["W1"], x)
h[h < 0] = 0 # ReLU nonlinearity.
logp = np.dot(self.weights["W2"], h)
# Softmax
p = 1.0 / (1.0 + np.exp(-logp))
# Return probability of taking action 2, and hidden state.
return p, h
def policy_backward(self, eph, epx, epdlogp):
"""Backward pass to calculate gradients.
Arguments:
eph: Array of intermediate hidden states.
epx: Array of experiences (observations).
epdlogp: Array of logps (output of last layer before softmax).
"""
dW2 = np.dot(eph.T, epdlogp).ravel()
dh = np.outer(epdlogp, self.weights["W2"])
# Backprop relu.
dh[eph <= 0] = 0
dW1 = np.dot(dh.T, epx)
return {"W1": dW1, "W2": dW2}
def update(self, grad_buffer, rmsprop_cache, lr, decay):
"""Applies the gradients to the model parameters with RMSProp."""
for k, v in self.weights.items():
g = grad_buffer[k]
rmsprop_cache[k] = decay * rmsprop_cache[k] + (1 - decay) * g ** 2
self.weights[k] += lr * g / (np.sqrt(rmsprop_cache[k]) + 1e-5)
def zero_grads(grad_buffer):
"""Reset the batch gradient buffer."""
for k, v in grad_buffer.items():
grad_buffer[k] = np.zeros_like(v)
并行化梯度#
我们定义一个 actor,它负责接收模型和环境,执行一次轨迹并计算梯度更新。
# This forces OpenMP to use 1 single thread, which is needed to
# prevent contention between multiple actors.
# See https://docs.rayai.org.cn/en/latest/ray-core/configure.html for
# more details.
os.environ["OMP_NUM_THREADS"] = "1"
# Tell numpy to only use one core. If we don't do this, each actor may
# try to use all of the cores and the resulting contention may result
# in no speedup over the serial version. Note that if numpy is using
# OpenBLAS, then you need to set OPENBLAS_NUM_THREADS=1, and you
# probably need to do it from the command line (so it happens before
# numpy is imported).
os.environ["MKL_NUM_THREADS"] = "1"
ray.init()
@ray.remote
class RolloutWorker(object):
def __init__(self):
self.env = gym.make("ale_py:ALE/Pong-v5")
def compute_gradient(self, model):
# Compute a simulation episode.
xs, hs, dlogps, drs = rollout(model, self.env)
reward_sum = sum(drs)
# Vectorize the arrays.
epx = np.vstack(xs)
eph = np.vstack(hs)
epdlogp = np.vstack(dlogps)
epr = np.vstack(drs)
# Compute the discounted reward backward through time.
discounted_epr = process_rewards(epr)
# Standardize the rewards to be unit normal (helps control the gradient
# estimator variance).
discounted_epr -= np.mean(discounted_epr)
discounted_epr /= np.std(discounted_epr)
# Modulate the gradient with advantage (the policy gradient magic
# happens right here).
epdlogp *= discounted_epr
return model.policy_backward(eph, epx, epdlogp), reward_sum
运行#
此示例易于并行化,因为网络可以并行玩十局游戏,并且游戏之间无需共享信息。
在循环中,网络重复玩 Pong 游戏并记录每局游戏的梯度。每十局游戏后,梯度会合并在一起用于更新网络。
iterations = 20
batch_size = 4
model = Model()
actors = [RolloutWorker.remote() for _ in range(batch_size)]
running_reward = None
# "Xavier" initialization.
# Update buffers that add up gradients over a batch.
grad_buffer = {k: np.zeros_like(v) for k, v in model.weights.items()}
# Update the rmsprop memory.
rmsprop_cache = {k: np.zeros_like(v) for k, v in model.weights.items()}
for i in range(1, 1 + iterations):
model_id = ray.put(model)
gradient_ids = []
# Launch tasks to compute gradients from multiple rollouts in parallel.
start_time = time.time()
gradient_ids = [actor.compute_gradient.remote(model_id) for actor in actors]
for batch in range(batch_size):
[grad_id], gradient_ids = ray.wait(gradient_ids)
grad, reward_sum = ray.get(grad_id)
# Accumulate the gradient over batch.
for k in model.weights:
grad_buffer[k] += grad[k]
running_reward = (
reward_sum
if running_reward is None
else running_reward * 0.99 + reward_sum * 0.01
)
end_time = time.time()
print(
"Batch {} computed {} rollouts in {} seconds, "
"running mean is {}".format(
i, batch_size, end_time - start_time, running_reward
)
)
model.update(grad_buffer, rmsprop_cache, learning_rate, decay_rate)
zero_grads(grad_buffer)