多智能体环境#
注意
Ray 2.40 默认使用 RLlib 的新 API 栈。Ray 团队已基本完成将算法、示例脚本和文档迁移到新的代码库。
如果您仍在使用旧的 API 栈,请参阅新 API 栈迁移指南了解如何迁移的详细信息。
在多智能体环境中,多个“智能体”可以同时行动、按回合顺序行动,或者通过两者的任意组合方式行动。
例如,在交通模拟中,可能有多个“汽车”和“交通灯”智能体同时交互,而在棋盘游戏中,两个或更多智能体可能按回合顺序行动。
可以使用几种不同的策略网络来控制各种智能体。因此,环境中的每个智能体都映射到恰好一个特定的策略。这种映射由用户提供的函数决定,称为“映射函数”。请注意,如果存在映射到 M
个策略的 N
个智能体,则 N
总是大于或等于 M
,允许任何策略控制多个智能体。
多智能体设置: N
个智能体位于环境中,并执行由 M
个策略网络计算出的动作。智能体到策略的映射是灵活的,由用户提供的映射函数决定。这里,agent_1
和 agent_3
都映射到 policy_1
,而 agent_2
映射到 policy_2
。#
RLlib 的 MultiAgentEnv API#
提示
本段描述了 RLlib 自己的 :py:class`~ray.rllib.env.multi_agent_env.MultiAgentEnv` API,这是定义您自己的多智能体环境逻辑的推荐方式。然而,如果您已经在使用第三方多智能体 API,RLlib 也提供了 Farama 的 PettingZoo API 和 DeepMind 的 OpenSpiel API 的包装器。
RLlib 的 :py:class`~ray.rllib.env.multi_agent_env.MultiAgentEnv` API 紧密遵循 Farama 的 gymnasium(单智能体)环境的约定和 API,甚至继承自 gymnasium.Env
。然而,自定义的 :py:class`~ray.rllib.env.multi_agent_env.MultiAgentEnv` 实现不是从 reset()
和 step()
发布单个观察、奖励以及终止/截断标志,而是输出字典,一个用于观察,一个用于奖励,等等。在每个这样的多智能体字典中,智能体 ID 映射到各自的单个智能体的观察/奖励/等。
这里是 :py:class`~ray.rllib.env.multi_agent_env.MultiAgentEnv` 实现示例的初稿
from ray.rllib.env.multi_agent_env import MultiAgentEnv
class MyMultiAgentEnv(MultiAgentEnv):
def __init__(self, config=None):
super().__init__()
...
def reset(self, *, seed=None, options=None):
...
# return observation dict and infos dict.
return {"agent_1": [obs of agent_1], "agent_2": [obs of agent_2]}, {}
def step(self, action_dict):
# return observation dict, rewards dict, termination/truncation dicts, and infos dict
return {"agent_1": [obs of agent_1]}, {...}, ...
智能体定义#
环境中的智能体数量及其 ID 完全由您的 :py:class`~ray.rllib.env.multi_agent_env.MultiAgentEnv` 代码控制。您的环境决定哪些智能体在 episode 重置后开始,哪些智能体稍后进入 episode,哪些智能体提前终止 episode,以及哪些智能体留在 episode 中直到整个 episode 结束。
要定义哪些智能体 ID 可能出现在您的 episode 中,请将 self.possible_agents
属性设置为所有可能智能体 ID 的列表。
def __init__(self, config=None):
super().__init__()
...
# Define all agent IDs that might even show up in your episodes.
self.possible_agents = ["agent_1", "agent_2"]
...
如果您的环境仅以部分智能体 ID 开始和/或在 episode 结束前终止部分智能体 ID,您还需要在整个 episode 过程中永久调整 self.agents
属性。另一方面,如果所有智能体 ID 在您的 episode 中是静态的,您可以将 self.agents
设置为与 self.possible_agents
相同,并且在您代码的其余部分不更改其值。
def __init__(self, config=None):
super().__init__()
...
# If your agents never change throughout the episode, set
# `self.agents` to the same list as `self.possible_agents`.
self.agents = self.possible_agents = ["agent_1", "agent_2"]
# Otherwise, you will have to adjust `self.agents` in `reset()` and `step()` to whatever the
# currently "alive" agents are.
...
观察空间和动作空间#
接下来,您应该在构造函数中设置每个(可能)智能体 ID 的观察空间和动作空间。使用 self.observation_spaces
和 self.action_spaces
属性来定义将智能体 ID 映射到各个智能体空间的字典。例如
import gymnasium as gym
import numpy as np
...
def __init__(self, config=None):
super().__init__()
...
self.observation_spaces = {
"agent_1": gym.spaces.Box(-1.0, 1.0, (4,), np.float32),
"agent_2": gym.spaces.Box(-1.0, 1.0, (3,), np.float32),
}
self.action_spaces = {
"agent_1": gym.spaces.Discrete(2),
"agent_2": gym.spaces.Box(0.0, 1.0, (1,), np.float32),
}
...
如果您的 episode 包含大量智能体,其中一些共享相同的观察空间或动作空间,并且您不想创建非常大的空间字典,您还可以覆盖 get_observation_space()
和 get_action_space()
方法,并自己实现从智能体 ID 到空间的映射逻辑。例如
def get_observation_space(self, agent_id):
if agent_id.startswith("robot_"):
return gym.spaces.Box(0, 255, (84, 84, 3), np.uint8)
elif agent_id.startswith("decision_maker"):
return gym.spaces.Discrete(2)
else:
raise ValueError(f"bad agent id: {agent_id}!")
观察、奖励和终止字典#
在自定义的 MultiAgentEnv
中,您还需要实现 reset()
和 step()
方法。与单智能体 gymnasium.Env 类似,您需要从 reset()
返回观察和信息,并从 step()
返回观察、奖励、终止/截断标志和信息。然而,这些返回值不再是单个值,而必须是字典,将智能体 ID 映射到各个智能体的对应值。
让我们先看看一个 reset()
实现示例
def reset(self, *, seed=None, options=None):
...
return {
"agent_1": np.array([0.0, 1.0, 0.0, 0.0]),
"agent_2": np.array([0.0, 0.0, 1.0]),
}, {} # <- empty info dict
在这里,您的 episode 从包含两个智能体开始,并且两个智能体都预期计算并发送动作,以用于后续的 step()
调用。
通常,返回的观察字典必须包含那些(且仅包含那些)应在下一步行动的智能体。不应在下一次 step()
调用中行动的智能体 ID,它们的观察不得出现在返回的观察字典中。
智能体同时行动的环境: 两个智能体在每个时间步都收到其观察结果,包括紧随 reset()
之后。请注意,每当返回的观察字典中存在该智能体的观察时,该智能体必须计算并发送一个动作到下一次 step()
调用中。#
请注意,观察字典决定智能体行动顺序的规则并不同样适用于奖励字典或终止/截断字典,这些字典在任何时间步都可能包含任何智能体 ID,无论该智能体 ID 是否预期在下一次 step()
调用中行动。这是因为智能体 A 执行的动作可能会触发智能体 B 的奖励,即使智能体 B 当前并未行动。终止标志也是如此:智能体 A 的行动方式可能会在智能体 B 本身未行动的情况下终止智能体 B 在 episode 中的参与。
注意
在终止字典和/或截断字典中使用特殊智能体 ID __all__
来指示 episode 应该结束所有智能体 ID 的参与,无论此时哪些智能体仍然活跃。在这种情况下,RLlib 会自动终止所有智能体并结束 episode。
总而言之,您的多智能体 episode 中智能体行动的精确顺序和同步由观察字典中包含的(或缺失的)智能体 ID 决定。只有那些预期计算并发送动作到下一次 step()
调用的智能体 ID 才必须是返回的观察字典的一部分。
智能体轮流行动的环境: 两个智能体通过轮流行动。 agent_1
在 reset()
后收到第一个观察结果,因此必须首先计算并发送一个动作。接收到此动作后,环境返回 agent_2
的观察结果,此时 agent_2
需要行动。接收到 agent_2
的动作后,环境返回 agent_1
的下一个观察结果,依此类推。#
这个简单的规则允许您设计任何类型的多智能体环境,从回合制游戏到所有智能体总是同时行动的环境,以及这两种模式的任意复杂组合。
回合顺序复杂的环境: 三个智能体以看似混乱的顺序行动。 agent_1
和 agent_3
在 reset()
后收到其初始观察结果,因此必须首先计算并发送动作。接收到这两个动作后,环境返回 agent_1
和 agent_2
的观察结果,此时它们必须同时行动。接收到 agent_1
和 agent_2
的动作后,环境返回 agent_2
和 agent_3
的观察结果,依此类推。#
让我们看看两个具体的完整 MultiAgentEnv
实现示例,一个示例中的智能体总是同时行动,另一个示例中的智能体按回合顺序行动。
示例:智能体同时行动的环境#
智能体总是同时行动的多智能体环境的一个很好的简单示例是石头剪刀布游戏,其中两个智能体总共需要进行 N 次移动,每次都在“石头”、“剪刀”或“布”动作中选择。每次移动后,比较动作选择。石头胜剪刀,布胜石头,剪刀胜布。赢得移动的玩家获得 +1 奖励,输家获得 -1 奖励。
这是您的石头剪刀布游戏的初始类框架
import gymnasium as gym
from ray.rllib.env.multi_agent_env import MultiAgentEnv
class RockPaperScissors(MultiAgentEnv):
"""Two-player environment for the famous rock paper scissors game.
Both players always move simultaneously over a course of 10 timesteps in total.
The winner of each timestep receives reward of +1, the losing player -1.0.
The observation of each player is the last opponent action.
"""
ROCK = 0
PAPER = 1
SCISSORS = 2
LIZARD = 3
SPOCK = 4
WIN_MATRIX = {
(ROCK, ROCK): (0, 0),
(ROCK, PAPER): (-1, 1),
(ROCK, SCISSORS): (1, -1),
(PAPER, ROCK): (1, -1),
(PAPER, PAPER): (0, 0),
(PAPER, SCISSORS): (-1, 1),
(SCISSORS, ROCK): (-1, 1),
(SCISSORS, PAPER): (1, -1),
(SCISSORS, SCISSORS): (0, 0),
}
接下来,您可以实现类的构造函数
def __init__(self, config=None):
super().__init__()
self.agents = self.possible_agents = ["player1", "player2"]
# The observations are always the last taken actions. Hence observation- and
# action spaces are identical.
self.observation_spaces = self.action_spaces = {
"player1": gym.spaces.Discrete(3),
"player2": gym.spaces.Discrete(3),
}
self.last_move = None
self.num_moves = 0
请注意,我们在构造函数中指定了 self.agents = self.possible_agents
,以表明智能体在 episode 过程中不会改变,并固定为 [player1, player2]
。
reset
逻辑是简单地在返回的观察字典中添加两个玩家(两个玩家都预期在下一次 step()
调用中同时行动),并重置一个 num_moves
计数器,该计数器跟踪已进行的移动次数,以便在恰好 10 个时间步(任一玩家行动 10 次)后终止 episode。
def reset(self, *, seed=None, options=None):
self.num_moves = 0
# The first observation should not matter (none of the agents has moved yet).
# Set them to 0.
return {
"player1": 0,
"player2": 0,
}, {} # <- empty infos dict
最后,您的 step
方法应该处理下一个观察(每个玩家观察对手刚刚选择的动作)、奖励(根据上面解释的赢家/输家规则,+1 或 -1),以及终止字典(仅当移动次数达到 10 时,您将特殊智能体 ID __all__
设置为 True
)。 truncateds 和 infos 字典总是保持为空。
def step(self, action_dict):
self.num_moves += 1
move1 = action_dict["player1"]
move2 = action_dict["player2"]
# Set the next observations (simply use the other player's action).
# Note that because we are publishing both players in the observations dict,
# we expect both players to act in the next `step()` (simultaneous stepping).
observations = {"player1": move2, "player2": move1}
# Compute rewards for each player based on the win-matrix.
r1, r2 = self.WIN_MATRIX[move1, move2]
rewards = {"player1": r1, "player2": r2}
# Terminate the entire episode (for all agents) once 10 moves have been made.
terminateds = {"__all__": self.num_moves >= 10}
# Leave truncateds and infos empty.
return observations, rewards, terminateds, {}, {}
请参见此处,获取一个完整的端到端示例脚本,演示如何针对您的 RockPaperScissors
环境运行多智能体 RLlib 设置。
示例:回合制环境#
现在,让我们来看另一个多智能体环境的实现示例,但这次您将实现一个回合制游戏,其中有两个玩家(A 和 B),由 A 开始游戏,然后 B 移动,然后又是 A,依此类推。
我们实现著名的井字棋游戏(有一点微小的变动),在 3x3 的棋盘上进行。每个玩家一次在棋盘上放置一个棋子。棋子一旦放置就不能移动。首先完成一行(水平、对角线或垂直)的玩家赢得游戏并获得 +1 奖励。输家获得 -1 奖励。为了简化实现,与原始游戏的变动是,尝试将棋子放在已占用的区域会导致棋盘完全不变,但移动的玩家会因此受到 -5 的惩罚奖励(在原始游戏中,这种移动是根本不允许发生的)。
这是您的井字棋游戏的初始类框架
import gymnasium as gym
import numpy as np
from ray.rllib.env.multi_agent_env import MultiAgentEnv
class TicTacToe(MultiAgentEnv):
"""A two-player game in which any player tries to complete one row in a 3x3 field.
The observation space is Box(0.0, 1.0, (9,)), where each index represents a distinct
field on a 3x3 board and values of 0.0 mean the field is empty, -1.0 means
the opponend owns the field, and 1.0 means we occupy the field:
----------
| 0| 1| 2|
----------
| 3| 4| 5|
----------
| 6| 7| 8|
----------
The action space is Discrete(9) and actions landing on an already occupied field
are simply ignored (and thus useless to the player taking these actions).
Once a player completes a row, they receive +1.0 reward, the losing player receives
-1.0 reward. In all other cases, both players receive 0.0 reward.
"""
在您的构造函数中,确保定义所有可能出现在游戏中的智能体 ID(“player1”和“player2”)、当前活跃的智能体 ID(与所有可能智能体相同),以及每个智能体的观察空间和动作空间。
def __init__(self, config=None):
super().__init__()
# Define the agents in the game.
self.agents = self.possible_agents = ["player1", "player2"]
# Each agent observes a 9D tensor, representing the 3x3 fields of the board.
# A 0 means an empty field, a 1 represents a piece of player 1, a -1 a piece of
# player 2.
self.observation_spaces = {
"player1": gym.spaces.Box(-1.0, 1.0, (9,), np.float32),
"player2": gym.spaces.Box(-1.0, 1.0, (9,), np.float32),
}
# Each player has 9 actions, encoding the 9 fields each player can place a piece
# on during their turn.
self.action_spaces = {
"player1": gym.spaces.Discrete(9),
"player2": gym.spaces.Discrete(9),
}
self.board = None
self.current_player = None
现在让我们实现您的 reset()
方法,在该方法中您清空棋盘(全部设置为 0),随机选择一个起始玩家,并返回该起始玩家的第一个观察结果。请注意,您不返回另一位玩家的观察结果,因为这位玩家不会在下一步行动。
def reset(self, *, seed=None, options=None):
self.board = [
0,
0,
0,
0,
0,
0,
0,
0,
0,
]
# Pick a random player to start the game.
self.current_player = np.random.choice(["player1", "player2"])
# Return observations dict (only with the starting player, which is the one
# we expect to act next).
return {
self.current_player: np.array(self.board, np.float32),
}, {}
从这里开始,在每次 step()
中,您总是在这两个智能体之间切换(您使用 self.current_player
属性进行跟踪),并且只返回当前智能体的观察结果,因为这是您希望接下来行动的玩家。
您还根据三个标准计算两个智能体的奖励:当前玩家是否获胜(对手输了)?当前玩家是否将棋子放在已占用的区域(受到惩罚)?游戏是否因为棋盘已满而结束(两个智能体都获得 0 奖励)?
def step(self, action_dict):
action = action_dict[self.current_player]
# Create a rewards-dict (containing the rewards of the agent that just acted).
rewards = {self.current_player: 0.0}
# Create a terminateds-dict with the special `__all__` agent ID, indicating that
# if True, the episode ends for all agents.
terminateds = {"__all__": False}
opponent = "player1" if self.current_player == "player2" else "player2"
# Penalize trying to place a piece on an already occupied field.
if self.board[action] != 0:
rewards[self.current_player] -= 5.0
# Change the board according to the (valid) action taken.
else:
self.board[action] = 1 if self.current_player == "player1" else -1
# After having placed a new piece, figure out whether the current player
# won or not.
if self.current_player == "player1":
win_val = [1, 1, 1]
else:
win_val = [-1, -1, -1]
if (
# Horizontal win.
self.board[:3] == win_val
or self.board[3:6] == win_val
or self.board[6:] == win_val
# Vertical win.
or self.board[0:7:3] == win_val
or self.board[1:8:3] == win_val
or self.board[2:9:3] == win_val
# Diagonal win.
or self.board[::3] == win_val
or self.board[2:7:2] == win_val
):
# Final reward is +5 for victory and -5 for a loss.
rewards[self.current_player] += 5.0
rewards[opponent] = -5.0
# Episode is done and needs to be reset for a new game.
terminateds["__all__"] = True
# The board might also be full w/o any player having won/lost.
# In this case, we simply end the episode and none of the players receives
# +1 or -1 reward.
elif 0 not in self.board:
terminateds["__all__"] = True
# Flip players and return an observations dict with only the next player to
# make a move in it.
self.current_player = opponent
return (
{self.current_player: np.array(self.board, np.float32)},
rewards,
terminateds,
{},
{},
)
智能体分组#
在多智能体强化学习中,常见的情况是存在智能体组,其中每个组都被视为一个具有元组动作和观察空间(元组中的每个项对应组中的每个单独智能体)的单智能体。
然后,可以将这样一组智能体分配给单个策略进行集中执行,或者分配给实现集中训练但分散执行的专门多智能体策略。
您可以使用 with_agent_groups()
方法来定义这些组
def with_agent_groups(
self,
groups: Dict[str, List[AgentID]],
obs_space: gym.Space = None,
act_space: gym.Space = None,
) -> "MultiAgentEnv":
"""Convenience method for grouping together agents in this env.
An agent group is a list of agent IDs that are mapped to a single
logical agent. All agents of the group must act at the same time in the
environment. The grouped agent exposes Tuple action and observation
spaces that are the concatenated action and obs spaces of the
individual agents.
The rewards of all the agents in a group are summed. The individual
agent rewards are available under the "individual_rewards" key of the
group info return.
Agent grouping is required to leverage algorithms such as Q-Mix.
Args:
groups: Mapping from group id to a list of the agent ids
of group members. If an agent id is not present in any group
value, it will be left ungrouped. The group id becomes a new agent ID
in the final environment.
obs_space: Optional observation space for the grouped
env. Must be a tuple space. If not provided, will infer this to be a
Tuple of n individual agents spaces (n=num agents in a group).
act_space: Optional action space for the grouped env.
Must be a tuple space. If not provided, will infer this to be a Tuple
of n individual agents spaces (n=num agents in a group).
.. testcode::
:skipif: True
from ray.rllib.env.multi_agent_env import MultiAgentEnv
class MyMultiAgentEnv(MultiAgentEnv):
# define your env here
...
env = MyMultiAgentEnv(...)
grouped_env = env.with_agent_groups(env, {
"group1": ["agent1", "agent2", "agent3"],
"group2": ["agent4", "agent5"],
})
"""
from ray.rllib.env.wrappers.group_agents_wrapper import \
GroupAgentsWrapper
return GroupAgentsWrapper(self, groups, obs_space, act_space)
对于具有多个组,或智能体组和单个智能体混合的环境,您可以结合前面章节中描述的策略映射 API 来使用分组功能。
第三方多智能体环境 API#
除了 RLlib 自己的 :py:class`~ray.rllib.env.multi_agent_env.MultiAgentEnv` API,您还可以使用各种第三方 API 和库来实现自定义的多智能体环境。
Farama PettingZoo#
PettingZoo 提供了一个包含 50 多个多样化多智能体环境的仓库,通过内置的 PettingZooEnv
包装器直接与 RLlib 兼容
from pettingzoo.butterfly import pistonball_v6
from ray.rllib.algorithms.ppo import PPOConfig
from ray.rllib.env.wrappers.pettingzoo_env import PettingZooEnv
from ray.tune.registry import register_env
register_env(
"pistonball",
lambda cfg: PettingZooEnv(pistonball_v6.env(num_floors=cfg.get("n_pistons", 20))),
)
config = (
PPOConfig()
.environment("pistonball", env_config={"n_pistons": 30})
)
请参见此处的示例脚本,获取使用 water world 环境的端到端示例。
DeepMind OpenSpiel#
DeepMind 的 OpenSpiel API 是一个综合框架,旨在用于多智能体强化学习、博弈论和决策研究与开发。该 API 通过内置的 PettingZooEnv
包装器直接与 RLlib 兼容
import pyspiel # pip install open_spiel
from ray.rllib.algorithms.ppo import PPOConfig
from ray.rllib.env.wrappers.open_spiel import OpenSpielEnv
from ray.tune.registry import register_env
register_env(
"open_spiel_env",
lambda cfg: OpenSpielEnv(pyspiel.load_game("connect_four")),
)
config = PPOConfig().environment("open_spiel_env")
请参见此处,获取使用自博弈策略,通过 RLlib 算法训练 OpenSpiel 的 Connect-4 环境的端到端示例。
使用 MultiAgentEnv 运行实际训练实验#
如果所有智能体使用相同的算法类来训练其策略,请按如下方式配置多智能体训练:
from ray.rllib.algorithm.ppo import PPOConfig
from ray.rllib.core.rl_module.multi_rl_module import MultiRLModuleSpec
from ray.rllib.core.rl_module.rl_module import RLModuleSpec
config = (
PPOConfig()
.environment(env="my_multiagent_env")
.multi_agent(
policy_mapping_fn=lambda agent_id, episode, **kwargs: (
"traffic_light" if agent_id.startswith("traffic_light_")
else random.choice(["car1", "car2"])
),
algorithm_config_overrides_per_module={
"car1": PPOConfig.overrides(gamma=0.85),
"car2": PPOConfig.overrides(lr=0.00001),
},
)
.rl_module(
rl_module_spec=MultiRLModuleSpec(rl_module_specs={
"car1": RLModuleSpec(),
"car2": RLModuleSpec(),
"traffic_light": RLModuleSpec(),
}),
)
)
algo = config.build()
print(algo.train())
要排除某些策略不进行更新,请使用 config.multi_agent(policies_to_train=[..])
配置设置。这允许在混合了非学习策略和学习策略的多智能体环境中运行。
def policy_mapping_fn(agent_id, episode, **kwargs):
agent_idx = int(agent_id[-1]) # 0 (player1) or 1 (player2)
return "learning_policy" if episode.id_ % 2 == agent_idx else "random_policy"
config = (
PPOConfig()
.environment(env="two_player_game")
.multi_agent(
policy_mapping_fn=policy_mapping_fn,
policies_to_train=["learning_policy"],
)
.rl_module(
rl_module_spec=MultiRLModuleSpec(rl_module_specs={
"learning_policy": RLModuleSpec(),
"random_policy": RLModuleSpec(rl_module_class=RandomRLModule),
}),
)
)
algo = config.build()
print(algo.train())
RLlib 将根据提供的 policy_mapping_fn
创建并将决策路由到每个策略。每个策略的训练统计数据会在 train()
返回的结果字典中单独报告。
示例脚本 rock_paper_scissors_heuristic_vs_learned.py 和 rock_paper_scissors_learned_vs_learned.py 演示了启发式策略和学习策略之间的竞争。
每个 EnvRunner 扩展到多个 MultiAgentEnv#
注意
与单智能体环境不同,多智能体设置目前尚不可向量化。Ray 团队正在通过利用 gymnasium >= 1.x
的自定义向量化功能来解决这一限制。
策略之间的变量共享#
RLlib 支持策略之间的变量共享。
请参阅PettingZoo 参数共享示例了解详细信息。