多智能体环境#
在多智能体环境中,多个“智能体”同时动作,按回合顺序动作,或通过两者的任意组合动作。
例如,在交通模拟中,可能存在多个“汽车”和“交通信号灯”智能体同时交互,而在棋盘游戏中,两个或多个智能体可能按回合顺序动作。
可以使用几个不同的策略网络来控制各种智能体。因此,环境中的每个智能体都映射到一个特定的策略。此映射由用户提供的函数“映射函数”确定。请注意,如果存在 N 个智能体映射到 M 个策略,则 N 始终大于或等于 M,允许任何策略控制一个以上的智能体。
多智能体设置: N 个智能体生活在环境中,并采取由 M 个策略网络计算的动作。从智能体到策略的映射是灵活的,由用户提供的映射函数决定。此处,agent_1 和 agent_3 都映射到 policy_1,而 agent_2 映射到 policy_2。#
RLlib 的 MultiAgentEnv API#
提示
本段描述了 RLlib 自带的 MultiAgentEnv API,这是定义自定义多智能体环境逻辑的推荐方式。但是,如果您已在使用第三方多智能体 API,RLlib 也为 Farama 的 PettingZoo API 和 DeepMind 的 OpenSpiel API 提供了封装。
RLlib 的 MultiAgentEnv API 紧密遵循 Farama 的 gymnasium(单智能体) 环境的约定和 API,甚至继承自 gymnasium.Env。但它不是从 reset() 和 step() 返回单个的观察、奖励和终止/截断标志,而是自定义的 MultiAgentEnv 实现会为观察、奖励等输出单独的字典,其中每个字典将智能体 ID 映射到每个智能体的相应值。
这里是自定义 MultiAgentEnv 实现的初稿示例
from ray.rllib.env.multi_agent_env import MultiAgentEnv
class MyMultiAgentEnv(MultiAgentEnv):
def __init__(self, config=None):
super().__init__()
...
def reset(self, *, seed=None, options=None):
...
# return observation dict and infos dict.
return {"agent_1": [obs of agent_1], "agent_2": [obs of agent_2]}, {}
def step(self, action_dict):
# return observation dict, rewards dict, termination/truncation dicts, and infos dict
return {"agent_1": [obs of agent_1]}, {...}, ...
智能体定义#
您的环境中智能体的数量及其 ID 完全由您的 MultiAgentEnv 代码控制。您的环境决定了哪些智能体在回合重置后开始,哪些智能体在稍后进入回合,哪些智能体提前终止回合,哪些智能体留在回合直到整个回合结束。
要定义您的回合中可能出现的智能体 ID,请将 self.possible_agents 属性设置为所有可能智能体 ID 的列表。
def __init__(self, config=None):
super().__init__()
...
# Define all agent IDs that might even show up in your episodes.
self.possible_agents = ["agent_1", "agent_2"]
...
如果您的环境只以部分智能体 ID 开始,并且/或者在回合结束前终止某些智能体 ID,您还需要在回合过程中永久调整 self.agents 属性。如果另一方面,所有智能体 ID 在您的回合中都是静态的,您可以将 self.agents 设置为与 self.possible_agents 相同,并且在代码的其余部分不更改其值。
def __init__(self, config=None):
super().__init__()
...
# If your agents never change throughout the episode, set
# `self.agents` to the same list as `self.possible_agents`.
self.agents = self.possible_agents = ["agent_1", "agent_2"]
# Otherwise, you will have to adjust `self.agents` in `reset()` and `step()` to whatever the
# currently "alive" agents are.
...
观察空间和动作空间#
接下来,您应该在构造函数中设置每个(可能的)智能体 ID 的观察空间和动作空间。使用 self.observation_spaces 和 self.action_spaces 属性来定义将智能体 ID 映射到各个智能体空间的字典。例如:
import gymnasium as gym
import numpy as np
...
def __init__(self, config=None):
super().__init__()
...
self.observation_spaces = {
"agent_1": gym.spaces.Box(-1.0, 1.0, (4,), np.float32),
"agent_2": gym.spaces.Box(-1.0, 1.0, (3,), np.float32),
}
self.action_spaces = {
"agent_1": gym.spaces.Discrete(2),
"agent_2": gym.spaces.Box(0.0, 1.0, (1,), np.float32),
}
...
如果您的回合包含大量智能体,其中一些共享相同的观察空间或动作空间,而您不想创建非常大的空间字典,您还可以覆盖 get_observation_space() 和 get_action_space() 方法,并自行实现从智能体 ID 到空间的映射逻辑。例如:
def get_observation_space(self, agent_id):
if agent_id.startswith("robot_"):
return gym.spaces.Box(0, 255, (84, 84, 3), np.uint8)
elif agent_id.startswith("decision_maker"):
return gym.spaces.Discrete(2)
else:
raise ValueError(f"bad agent id: {agent_id}!")
观察、奖励和终止字典#
您需要在自定义 MultiAgentEnv 中实现的另外两件事是 reset() 和 step() 方法。与单智能体 gymnasium.Env 等效,您必须从 reset() 返回观察和信息,并从 step() 返回观察、奖励、终止/截断标志和信息,但这些都必须是映射智能体 ID 到各自智能体值的字典,而不是单个值。
让我们先看一个示例 reset() 实现
def reset(self, *, seed=None, options=None):
...
return {
"agent_1": np.array([0.0, 1.0, 0.0, 0.0]),
"agent_2": np.array([0.0, 0.0, 1.0]),
}, {} # <- empty info dict
这里,您的回合以两个智能体开始,并且都期望在下一个 step() 调用中计算并发送动作。
通常,返回的观察字典必须包含(并且仅包含)下一个应该动作的智能体。不应在下一个 step() 调用中动作的智能体 ID 必须不包含在返回的观察字典中。
同时动作的智能体环境: 两个智能体在每个时间步接收它们的观察,包括在 reset() 之后。请注意,只要返回的观察字典中存在该智能体的观察,智能体就必须计算并发送一个动作到下一个 step() 调用。#
请注意,观察字典确定智能体动作确切顺序的规则并不同等适用于奖励字典或终止/截断字典,它们都可能在任何时间步包含任何智能体 ID,而不管该智能体 ID 是否期望在下一个 step() 调用中动作。这是因为智能体 A 的一个动作可能会为智能体 B 触发奖励,即使智能体 B 当前没有自己动作。终止标志也是如此:智能体 A 的动作可能会以某种方式终止智能体 B 的回合,而智能体 B 甚至没有自己动作。
注意
使用特殊的智能体 ID __all__ 在终止字典和/或截断字典中,表示回合应为所有智能体 ID 结束,无论当时哪些智能体仍处于活动状态。RLlib 在这种情况下会自动终止所有智能体并结束回合。
总之,您的多智能体回合中智能体动作的确切顺序和同步是通过包含在(或缺少)您的观察字典中的智能体 ID 来确定的。只有那些期望在下一个 step() 调用中计算并发送动作的智能体 ID 必须包含在返回的观察字典中。
轮流动作的智能体环境: 两个智能体轮流动作。agent_1 在 reset() 后接收第一个观察,因此必须先计算并发送动作。收到此动作后,环境会返回 agent_2 的观察,它现在必须动作。收到 agent_2 的动作后,将返回 agent_1 的下一个观察,依此类推。#
这个简单的规则允许您设计任何类型的多智能体环境,从回合制游戏到所有智能体始终同时动作的环境,再到这两者任意复杂组合的模式。
复杂轮次顺序的环境: 三个智能体以看似混乱的顺序动作。agent_1 和 agent_3 在 reset() 后接收它们的初始观察,因此必须先计算并发送动作。收到这两个动作后,环境会返回 agent_1 和 agent_2 的观察,它们现在必须同时动作。收到 agent_1 和 agent_2 的动作后,将返回 agent_2 和 agent_3 的观察,依此类推。#
让我们看两个具体的、完整的 MultiAgentEnv 示例实现,一个是智能体始终同时动作,另一个是智能体按回合顺序动作。
示例:同时步进智能体的环境#
一个良好且简单的多智能体环境示例,其中所有智能体始终同时步进,是“石头剪刀布”游戏,其中两个智能体总共需要玩 N 局,每局都从“石头”、“剪刀”或“布”这三个动作中选择一个。每局之后,比较动作选择。石头赢剪刀,剪刀赢布,布赢石头。赢得该局的玩家获得 +1 奖励,输掉的玩家获得 -1 奖励。
这是您“石头剪刀布”游戏的初始类框架
import gymnasium as gym
from ray.rllib.env.multi_agent_env import MultiAgentEnv
class RockPaperScissors(MultiAgentEnv):
"""Two-player environment for the famous rock paper scissors game.
Both players always move simultaneously over a course of 10 timesteps in total.
The winner of each timestep receives reward of +1, the losing player -1.0.
The observation of each player is the last opponent action.
"""
ROCK = 0
PAPER = 1
SCISSORS = 2
LIZARD = 3
SPOCK = 4
WIN_MATRIX = {
(ROCK, ROCK): (0, 0),
(ROCK, PAPER): (-1, 1),
(ROCK, SCISSORS): (1, -1),
(PAPER, ROCK): (1, -1),
(PAPER, PAPER): (0, 0),
(PAPER, SCISSORS): (-1, 1),
(SCISSORS, ROCK): (-1, 1),
(SCISSORS, PAPER): (1, -1),
(SCISSORS, SCISSORS): (0, 0),
}
接下来,您可以实现类的构造函数
def __init__(self, config=None):
super().__init__()
self.agents = self.possible_agents = ["player1", "player2"]
# The observations are always the last taken actions. Hence observation- and
# action spaces are identical.
self.observation_spaces = self.action_spaces = {
"player1": gym.spaces.Discrete(3),
"player2": gym.spaces.Discrete(3),
}
self.last_move = None
self.num_moves = 0
请注意,我们在构造函数中指定了 self.agents = self.possible_agents,以表明智能体在回合过程中不会改变,并固定为 [player1, player2]。
reset 逻辑是简单地在返回的观察字典中添加两个玩家(两个玩家期望在下一个 step() 调用中同时动作),并重置一个 num_moves 计数器,该计数器跟踪正在进行的移动次数,以便在正好 10 个时间步(每个玩家 10 个动作)后终止回合。
def reset(self, *, seed=None, options=None):
self.num_moves = 0
# The first observation should not matter (none of the agents has moved yet).
# Set them to 0.
return {
"player1": 0,
"player2": 0,
}, {} # <- empty infos dict
最后,您的 step 方法应处理下一个观察(每个玩家观察对手刚刚选择的动作),奖励(根据上述输赢规则为 +1 或 -1),以及终止字典(如果移动次数达到 10,则将特殊智能体 ID __all__ 设置为 True)。截断字典和信息字典始终为空。
def step(self, action_dict):
self.num_moves += 1
move1 = action_dict["player1"]
move2 = action_dict["player2"]
# Set the next observations (simply use the other player's action).
# Note that because we are publishing both players in the observations dict,
# we expect both players to act in the next `step()` (simultaneous stepping).
observations = {"player1": move2, "player2": move1}
# Compute rewards for each player based on the win-matrix.
r1, r2 = self.WIN_MATRIX[move1, move2]
rewards = {"player1": r1, "player2": r2}
# Terminate the entire episode (for all agents) once 10 moves have been made.
terminateds = {"__all__": self.num_moves >= 10}
# Leave truncateds and infos empty.
return observations, rewards, terminateds, {}, {}
请点击此处查看完整的端到端示例脚本,展示如何运行多智能体 RLlib 设置,以您自己的 RockPaperScissors 环境。
示例:回合制环境#
现在让我们逐步介绍另一个多智能体环境示例实现,但这次您实现一个回合制游戏,其中有两个玩家(A 和 B),A 先开始游戏,然后 B 移动,然后 A 再移动,依此类推。
我们实现著名的井字棋游戏(有一个小变动),在 3x3 的棋盘上进行。每个玩家一次放置一个自己的棋子。一旦放置,棋子就不能移动。第一个完成一行(水平、对角线或垂直)的玩家赢得比赛,并获得 +1 奖励。输掉的玩家获得 -1 奖励。为了简化实现,原游戏的变动是尝试将棋子放在一个已被占用的格子上会导致棋盘完全不变,但移动的玩家会受到 -5 的惩罚(在原游戏中,此移动根本不允许,因此永远不会发生)。
这是您井字棋游戏的初始类框架
import gymnasium as gym
import numpy as np
from ray.rllib.env.multi_agent_env import MultiAgentEnv
class TicTacToe(MultiAgentEnv):
"""A two-player game in which any player tries to complete one row in a 3x3 field.
The observation space is Box(0.0, 1.0, (9,)), where each index represents a distinct
field on a 3x3 board and values of 0.0 mean the field is empty, -1.0 means
the opponend owns the field, and 1.0 means we occupy the field:
----------
| 0| 1| 2|
----------
| 3| 4| 5|
----------
| 6| 7| 8|
----------
The action space is Discrete(9) and actions landing on an already occupied field
are simply ignored (and thus useless to the player taking these actions).
Once a player completes a row, they receive +1.0 reward, the losing player receives
-1.0 reward. In all other cases, both players receive 0.0 reward.
"""
在您的构造函数中,确保您定义了游戏中可能出现的所有智能体 ID(“player1”和“player2”)、当前活动的智能体 ID(与所有可能的智能体相同)以及每个智能体的观察空间和动作空间。
def __init__(self, config=None):
super().__init__()
# Define the agents in the game.
self.agents = self.possible_agents = ["player1", "player2"]
# Each agent observes a 9D tensor, representing the 3x3 fields of the board.
# A 0 means an empty field, a 1 represents a piece of player 1, a -1 a piece of
# player 2.
self.observation_spaces = {
"player1": gym.spaces.Box(-1.0, 1.0, (9,), np.float32),
"player2": gym.spaces.Box(-1.0, 1.0, (9,), np.float32),
}
# Each player has 9 actions, encoding the 9 fields each player can place a piece
# on during their turn.
self.action_spaces = {
"player1": gym.spaces.Discrete(9),
"player2": gym.spaces.Discrete(9),
}
self.board = None
self.current_player = None
现在让我们实现您的 reset() 方法,在此方法中,您会清空棋盘(将其设置为所有 0),选择一个随机的起始玩家,并返回该起始玩家的第一个观察。请注意,您不返回另一个玩家的观察,因为该玩家不是下一个要动作的。
def reset(self, *, seed=None, options=None):
self.board = [
0,
0,
0,
0,
0,
0,
0,
0,
0,
]
# Pick a random player to start the game.
self.current_player = np.random.choice(["player1", "player2"])
# Return observations dict (only with the starting player, which is the one
# we expect to act next).
return {
self.current_player: np.array(self.board, np.float32),
}, {}
从这里开始,在每个 step() 中,您总是在两个智能体之间切换(您使用 self.current_player 属性来跟踪),并且只返回当前智能体的观察,因为那是您想要下一个动作的玩家。
您还根据三个标准计算两个智能体的奖励:当前玩家是否赢了(对手输了)?当前玩家是否将棋子放在了已被占用的格子上(受到惩罚)?游戏是否因为棋盘已满而结束(两个智能体都获得 0 奖励)?
def step(self, action_dict):
action = action_dict[self.current_player]
# Create a rewards-dict (containing the rewards of the agent that just acted).
rewards = {self.current_player: 0.0}
# Create a terminateds-dict with the special `__all__` agent ID, indicating that
# if True, the episode ends for all agents.
terminateds = {"__all__": False}
opponent = "player1" if self.current_player == "player2" else "player2"
# Penalize trying to place a piece on an already occupied field.
if self.board[action] != 0:
rewards[self.current_player] -= 5.0
# Change the board according to the (valid) action taken.
else:
self.board[action] = 1 if self.current_player == "player1" else -1
# After having placed a new piece, figure out whether the current player
# won or not.
if self.current_player == "player1":
win_val = [1, 1, 1]
else:
win_val = [-1, -1, -1]
if (
# Horizontal win.
self.board[:3] == win_val
or self.board[3:6] == win_val
or self.board[6:] == win_val
# Vertical win.
or self.board[0:7:3] == win_val
or self.board[1:8:3] == win_val
or self.board[2:9:3] == win_val
# Diagonal win.
or self.board[::3] == win_val
or self.board[2:7:2] == win_val
):
# Final reward is +5 for victory and -5 for a loss.
rewards[self.current_player] += 5.0
rewards[opponent] = -5.0
# Episode is done and needs to be reset for a new game.
terminateds["__all__"] = True
# The board might also be full w/o any player having won/lost.
# In this case, we simply end the episode and none of the players receives
# +1 or -1 reward.
elif 0 not in self.board:
terminateds["__all__"] = True
# Flip players and return an observations dict with only the next player to
# make a move in it.
self.current_player = opponent
return (
{self.current_player: np.array(self.board, np.float32)},
rewards,
terminateds,
{},
{},
)
分组智能体#
在多智能体 RL 中,通常会将智能体分组,每个组像单个智能体一样处理,具有元组(Tuple)的动作和观察空间(元组中每个项目对应一个单独的智能体)。
然后,这样的智能体组可以分配给单个策略进行集中式执行,或分配给实现集中式训练但分布式执行的专用多智能体策略。
您可以使用 with_agent_groups() 方法来定义这些组。
def with_agent_groups(
self,
groups: Dict[str, List[AgentID]],
obs_space: gym.Space = None,
act_space: gym.Space = None,
) -> "MultiAgentEnv":
"""Convenience method for grouping together agents in this env.
An agent group is a list of agent IDs that are mapped to a single
logical agent. All agents of the group must act at the same time in the
environment. The grouped agent exposes Tuple action and observation
spaces that are the concatenated action and obs spaces of the
individual agents.
The rewards of all the agents in a group are summed. The individual
agent rewards are available under the "individual_rewards" key of the
group info return.
Agent grouping is required to leverage algorithms such as Q-Mix.
Args:
groups: Mapping from group id to a list of the agent ids
of group members. If an agent id is not present in any group
value, it will be left ungrouped. The group id becomes a new agent ID
in the final environment.
obs_space: Optional observation space for the grouped
env. Must be a tuple space. If not provided, will infer this to be a
Tuple of n individual agents spaces (n=num agents in a group).
act_space: Optional action space for the grouped env.
Must be a tuple space. If not provided, will infer this to be a Tuple
of n individual agents spaces (n=num agents in a group).
.. testcode::
:skipif: True
from ray.rllib.env.multi_agent_env import MultiAgentEnv
class MyMultiAgentEnv(MultiAgentEnv):
# define your env here
...
env = MyMultiAgentEnv(...)
grouped_env = env.with_agent_groups(env, {
"group1": ["agent1", "agent2", "agent3"],
"group2": ["agent4", "agent5"],
})
"""
from ray.rllib.env.wrappers.group_agents_wrapper import GroupAgentsWrapper
return GroupAgentsWrapper(self, groups, obs_space, act_space)
对于具有多个组的环境,或智能体组与独立智能体的混合,您可以将分组与前面部分所述的策略映射 API 结合使用。
第三方多智能体环境 API#
除了 RLlib 自带的 MultiAgentEnv API 外,您还可以使用各种第三方 API 和库来实现自定义多智能体环境。
Farama PettingZoo#
PettingZoo 提供了一个包含 50 多个多样化多智能体环境的存储库,可通过内置的 PettingZooEnv 封装直接与 RLlib 兼容。
from pettingzoo.butterfly import pistonball_v6
from ray.rllib.algorithms.ppo import PPOConfig
from ray.rllib.env.wrappers.pettingzoo_env import PettingZooEnv
from ray.tune.registry import register_env
register_env(
"pistonball",
lambda cfg: PettingZooEnv(pistonball_v6.env(num_floors=cfg.get("n_pistons", 20))),
)
config = (
PPOConfig()
.environment("pistonball", env_config={"n_pistons": 30})
)
请参阅此处的示例脚本,了解有关 water world 环境 的完整端到端示例。
DeepMind OpenSpiel#
DeepMind 的 OpenSpiel API 是一个全面的框架,专为多智能体强化学习、博弈论和决策的研究和开发而设计。该 API 可通过内置的 PettingZooEnv 封装直接与 RLlib 兼容。
import pyspiel # pip install open_spiel
from ray.rllib.algorithms.ppo import PPOConfig
from ray.rllib.env.wrappers.open_spiel import OpenSpielEnv
from ray.tune.registry import register_env
register_env(
"open_spiel_env",
lambda cfg: OpenSpielEnv(pyspiel.load_game("connect_four")),
)
config = PPOConfig().environment("open_spiel_env")
有关使用 RLlib 算法通过自对弈策略训练的 Connect-4 环境的端到端示例,请点击此处。
使用 MultiAgentEnv 运行实际的训练实验#
如果所有智能体都使用相同的算法类来训练它们的策略,请按如下方式配置多智能体训练:
from ray.rllib.algorithm.ppo import PPOConfig
from ray.rllib.core.rl_module.multi_rl_module import MultiRLModuleSpec
from ray.rllib.core.rl_module.rl_module import RLModuleSpec
config = (
PPOConfig()
.environment(env="my_multiagent_env")
.multi_agent(
policy_mapping_fn=lambda agent_id, episode, **kwargs: (
"traffic_light" if agent_id.startswith("traffic_light_")
else random.choice(["car1", "car2"])
),
algorithm_config_overrides_per_module={
"car1": PPOConfig.overrides(gamma=0.85),
"car2": PPOConfig.overrides(lr=0.00001),
},
)
.rl_module(
rl_module_spec=MultiRLModuleSpec(rl_module_specs={
"car1": RLModuleSpec(),
"car2": RLModuleSpec(),
"traffic_light": RLModuleSpec(),
}),
)
)
algo = config.build()
print(algo.train())
要从更新中排除某些策略,请使用 config.multi_agent(policies_to_train=[..]) 配置设置。这允许在多智能体环境中运行,混合了非学习策略和学习策略。
def policy_mapping_fn(agent_id, episode, **kwargs):
agent_idx = int(agent_id[-1]) # 0 (player1) or 1 (player2)
return "learning_policy" if episode.id_ % 2 == agent_idx else "random_policy"
config = (
PPOConfig()
.environment(env="two_player_game")
.multi_agent(
policy_mapping_fn=policy_mapping_fn,
policies_to_train=["learning_policy"],
)
.rl_module(
rl_module_spec=MultiRLModuleSpec(rl_module_specs={
"learning_policy": RLModuleSpec(),
"random_policy": RLModuleSpec(rl_module_class=RandomRLModule),
}),
)
)
algo = config.build()
print(algo.train())
RLlib 将根据提供的 policy_mapping_fn 为每个策略创建决策并路由。每个策略的训练统计信息在 train() 返回的结果字典中单独报告。
示例脚本 rock_paper_scissors_heuristic_vs_learned.py 和 rock_paper_scissors_learned_vs_learned.py 展示了带有启发式和学习策略的竞争性策略。
为每个 EnvRunner 扩展到许多 MultiAgentEnv#
注意
与单智能体环境不同,多智能体设置尚不可向量化。Ray 团队正在通过利用 gymnasium >= 1.x 的自定义向量化功能来解决此限制。
策略之间的变量共享#
RLlib 支持跨策略的变量共享。
有关详细信息,请参阅PettingZoo 参数共享示例。