Ray Collective Communication Lib#
Ray 的集体通信库 (ray.util.collective) 提供了一组用于分布式 CPU 或 GPU 之间通信的原生集体通信原语。
Ray 集体通信库
在 Ray Actor 和 Task 进程之间实现 10 倍更高效的带外 (out-of-band) 集体通信,
支持分布式 CPU 和 GPU,
使用 NCCL 和 GLOO 作为可选的高性能通信后端,
适用于 Ray 上的分布式 ML 程序。
集体通信原语支持矩阵#
下方是所有集体通信调用与不同后端当前支持的矩阵。
后端 |
||||
|---|---|---|---|---|
设备 |
CPU |
GPU |
CPU |
GPU |
send |
✔ |
✘ |
✘ |
✔ |
recv |
✔ |
✘ |
✘ |
✔ |
broadcast |
✔ |
✘ |
✘ |
✔ |
allreduce |
✔ |
✘ |
✘ |
✔ |
reduce |
✔ |
✘ |
✘ |
✔ |
allgather |
✔ |
✘ |
✘ |
✔ |
gather |
✘ |
✘ |
✘ |
✘ |
scatter |
✘ |
✘ |
✘ |
✘ |
reduce_scatter |
✔ |
✘ |
✘ |
✔ |
all-to-all |
✘ |
✘ |
✘ |
✘ |
barrier |
✔ |
✘ |
✘ |
✔ |
支持的张量类型#
torch.Tensornumpy.ndarraycupy.ndarray
用法#
安装和导入#
Ray collective 库已包含在发布的 Ray wheel 中。除了 Ray 之外,用户还需要安装 torch 或 cupy 才能使用 GLOO (torch.distributed.gloo) 和 NCCL 后端进行集体通信。
pip install torch
pip install cupy-cudaxxx # replace xxx with the right cuda version in your environment
要使用这些 API,请在您的 actor/task 或 driver 代码中通过以下方式导入 collective 包:
import ray.util.collective as col
初始化#
Collective 函数操作于 collective group。Collective group 包含一组将共同进入 collective 函数调用的进程(在 Ray 中,通常是 Ray 管理的 actors 或 tasks)。在进行 collective 调用之前,用户需要静态地将一组 actors/tasks 声明为一个 collective group。
下面是一个代码片段示例,它使用 init_collective_group() 和 create_collective_group() 这两个 API 来初始化一组远程 actor 之间的 collective groups。有关这两个 API 的详细说明,请参阅 API。
import ray
import ray.util.collective as collective
import cupy as cp
@ray.remote(num_gpus=1)
class Worker:
def __init__(self):
self.send = cp.ones((4, ), dtype=cp.float32)
self.recv = cp.zeros((4, ), dtype=cp.float32)
def setup(self, world_size, rank):
collective.init_collective_group(world_size, rank, "nccl", "default")
return True
def compute(self):
collective.allreduce(self.send, "default")
return self.send
def destroy(self):
collective.destroy_group()
# imperative
num_workers = 2
workers = []
init_rets = []
for i in range(num_workers):
w = Worker.remote()
workers.append(w)
init_rets.append(w.setup.remote(num_workers, i))
_ = ray.get(init_rets)
results = ray.get([w.compute.remote() for w in workers])
# declarative
for i in range(num_workers):
w = Worker.remote()
workers.append(w)
_options = {
"group_name": "177",
"world_size": 2,
"ranks": [0, 1],
"backend": "nccl"
}
collective.create_collective_group(workers, **_options)
results = ray.get([w.compute.remote() for w in workers])
请注意,对于同一组 actors/task 进程,可以构造多个 collective groups,并以 group_name 作为其唯一标识符。这使得指定不同 (子) 组进程之间的复杂通信模式成为可能。
集体通信#
请查看 支持矩阵 以了解当前支持的 collective 调用和后端的最新状态。
请注意,当前的 collective 通信 API 集是命令式的,并表现出以下行为:
所有 collective API 都是同步阻塞调用。
由于每个 API 只指定了集体通信的一部分,因此该 API 预期会被 (预先声明的) collective group 的每个参与进程调用。一旦所有进程都进行了调用并相互汇聚,集体通信就会发生并继续。
API 是命令式的,并且通信是带外进行的 — 它们需要在 collective 进程 (actor/task) 代码内部使用。
下面是一个使用 ray.util.collective.allreduce 的示例:
import ray
import cupy
import ray.util.collective as col
@ray.remote(num_gpus=1)
class Worker:
def __init__(self):
self.buffer = cupy.ones((10,), dtype=cupy.float32)
def compute(self):
col.allreduce(self.buffer, "default")
return self.buffer
# Create two actors A and B and create a collective group following the previous example...
A = Worker.remote()
B = Worker.remote()
# Invoke allreduce remotely
ray.get([A.compute.remote(), B.compute.remote()])
点对点通信#
ray.util.collective 还支持进程之间的 P2P send/recv 通信。
send/recv 与 collective 函数表现出相同的行为:它们是同步阻塞调用 — 成对的 send 和 recv 必须在配对的进程上一起调用才能指定整个通信,并且必须成功汇聚才能继续。请参阅下面的代码示例:
import ray
import cupy
import ray.util.collective as col
@ray.remote(num_gpus=1)
class Worker:
def __init__(self):
self.buffer = cupy.ones((10,), dtype=cupy.float32)
def get_buffer(self):
return self.buffer
def do_send(self, target_rank=0):
# this call is blocking
col.send(target_rank)
def do_recv(self, src_rank=0):
# this call is blocking
col.recv(src_rank)
def do_allreduce(self):
# this call is blocking as well
col.allreduce(self.buffer)
return self.buffer
# Create two actors
A = Worker.remote()
B = Worker.remote()
# Put A and B in a collective group
col.create_collective_group([A, B], options={rank=[0, 1], ...})
# let A to send a message to B; a send/recv has to be specified once at each worker
ray.get([A.do_send.remote(target_rank=1), B.do_recv.remote(src_rank=0)])
# An anti-pattern: the following code will hang, because it doesn't instantiate the recv side call
ray.get([A.do_send.remote(target_rank=1)])
单 GPU 和多 GPU 集体通信原语#
在许多集群设置中,一台机器通常拥有多个 GPU;有效利用 GPU-GPU 带宽,例如 NVLINK,可以显著提高通信性能。
ray.util.collective 支持多 GPU 集体通信调用,在这种情况下,一个进程 (actor/tasks) 管理多个 GPU (例如,通过 ray.remote(num_gpus=4))。使用这些多 GPU 集体通信函数通常比使用单 GPU 集体通信 API 并生成与 GPU 数量相等的进程数量更具性能优势。请参阅 API 参考以了解多 GPU 集体通信 API 的签名。
另外值得注意的是,所有多 GPU API 都具有以下限制:
仅支持 NCCL 后端。
执行多 GPU 集体通信或 P2P 调用的 Collective 进程需要拥有相同数量的 GPU 设备。
多 GPU 集体通信函数的输入通常是张量列表,每个张量位于调用者进程拥有的不同 GPU 设备上。
下面提供了利用多 GPU 集体通信 API 的代码示例。
import ray
import ray.util.collective as collective
import cupy as cp
from cupy.cuda import Device
@ray.remote(num_gpus=2)
class Worker:
def __init__(self):
with Device(0):
self.send1 = cp.ones((4, ), dtype=cp.float32)
with Device(1):
self.send2 = cp.ones((4, ), dtype=cp.float32) * 2
with Device(0):
self.recv1 = cp.ones((4, ), dtype=cp.float32)
with Device(1):
self.recv2 = cp.ones((4, ), dtype=cp.float32) * 2
def setup(self, world_size, rank):
self.rank = rank
collective.init_collective_group(world_size, rank, "nccl", "177")
return True
def allreduce_call(self):
collective.allreduce_multigpu([self.send1, self.send2], "177")
return [self.send1, self.send2]
def p2p_call(self):
if self.rank == 0:
collective.send_multigpu(self.send1 * 2, 1, 1, "8")
else:
collective.recv_multigpu(self.recv2, 0, 0, "8")
return self.recv2
# Note that the world size is 2 but there are 4 GPUs.
num_workers = 2
workers = []
init_rets = []
for i in range(num_workers):
w = Worker.remote()
workers.append(w)
init_rets.append(w.setup.remote(num_workers, i))
a = ray.get(init_rets)
results = ray.get([w.allreduce_call.remote() for w in workers])
results = ray.get([w.p2p_call.remote() for w in workers])
更多资源#
以下链接提供了关于如何有效利用 ray.util.collective 库的有用资源。
在
ray.util.collective.examples下的 更多运行示例。使用 Ray collective 库 扩展 spaCy 命名实体识别 (NER) 管道。
为数据并行分布式 ML 训练 实现 AllReduce 策略。
API 参考#
在 ray.util.collective 命名空间下公开的 API。
- class ray.util.collective.collective.GroupManager[source]#
使用此类来管理我们迄今为止创建的 collective groups。
每个进程都会有一个
GroupManager实例。每个进程可以属于多个 collective groups。成员信息和其他元数据存储在全局_group_mgr对象中。
- ray.util.collective.collective.is_group_initialized(group_name)[source]#
通过 group 名称检查 group 在当前进程中是否已初始化。
- ray.util.collective.collective.init_collective_group(world_size: int, rank: int, backend='NCCL', group_name: str = 'default', gloo_timeout: int = 30000)[source]#
在 actor 进程中初始化一个 collective group。
- 参数:
world_size – group 中的总进程数。
rank – 当前进程的 rank。
backend – 要使用的 CCL 后端,NCCL 或 GLOO。
group_name – collective group 的名称。
- 返回:
None
- ray.util.collective.collective.create_collective_group(actors, world_size: int, ranks: List[int], backend='NCCL', group_name: str = 'default', gloo_timeout: int = 30000)[source]#
将一组 actor 声明为一个 collective group。
注意:此函数应在 driver 进程中调用。
- 参数:
actors – 要设置为 collective group 的 actor 列表。
world_size – group 中的总进程数。
ranks (List[int]) – 每个 actor 的 rank。
backend – 要使用的 CCL 后端,NCCL 或 GLOO。
group_name – collective group 的名称。
- 返回:
None
- ray.util.collective.collective.destroy_collective_group(group_name: str = 'default') None[source]#
根据 group 名称销毁一个 collective group。
- ray.util.collective.collective.get_rank(group_name: str = 'default') int[source]#
返回此进程在给定 group 中的 rank。
- 参数:
group_name – 要查询的 group 名称。
- 返回:
当前进程在命名 group 中的 rank,如果 group 不存在或进程不属于该 group,则为 -1。
- ray.util.collective.collective.get_collective_group_size(group_name: str = 'default') int[source]#
返回具有给定名称的 collective group 的大小。
- 参数:
group_name – 要查询的 group 名称。
- 返回:
- collective group 的 world size,如果 group 不存在或进程不属于该 group,则为 -1。
不存在或进程不属于该 group。
- ray.util.collective.collective.allreduce(tensor, group_name: str = 'default', op=ReduceOp.SUM)[source]#
在 group 中对张量进行 collective allreduce。
- 参数:
tensor – 在当前进程上进行 all-reduced 的张量。
group_name – 用于执行 allreduce 的 collective group 名称。
op – 归约操作。
- 返回:
None
- ray.util.collective.collective.allreduce_multigpu(tensor_list: list, group_name: str = 'default', op=ReduceOp.SUM)[source]#
在 group 中对张量列表进行 collective allreduce。
- 参数:
tensor_list (List[tensor]) – 要 allreduced 的张量列表,每个张量都在一个 GPU 上。
group_name – 用于执行 allreduce 的 collective group 名称。
- 返回:
None
- ray.util.collective.collective.barrier(group_name: str = 'default')[source]#
在 collective group 中的所有进程上设置 barrier。
- 参数:
group_name – 要设置 barrier 的 group 名称。
- 返回:
None
- ray.util.collective.collective.reduce(tensor, dst_rank: int = 0, group_name: str = 'default', op=ReduceOp.SUM)[source]#
将张量从 group 归约到目标 rank。
- 参数:
tensor – 在当前进程上进行归约的张量。
dst_rank – 目标进程的 rank。
group_name – 用于执行 reduce 的 collective group 名称。
op – 归约操作。
- 返回:
None
- ray.util.collective.collective.reduce_multigpu(tensor_list: list, dst_rank: int = 0, dst_tensor: int = 0, group_name: str = 'default', op=ReduceOp.SUM)[source]#
将张量从 group 归约到目标 rank 和目标张量。
- 参数:
tensor_list – 在当前进程上进行归约的张量列表;每个张量位于一个 GPU 上。
dst_rank – 目标进程的 rank。
dst_tensor – 目标进程上的 GPU 索引。
group_name – 用于执行 reduce 的 collective group 名称。
op – 归约操作。
- 返回:
None
- ray.util.collective.collective.broadcast(tensor, src_rank: int = 0, group_name: str = 'default')[source]#
将张量从源进程广播到所有其他进程。
- 参数:
tensor – 要广播的张量 (源) 或要接收的张量 (目标)。
src_rank – 源进程的 rank。
group_name – 用于执行 broadcast 的 collective group 名称。
- 返回:
None
- ray.util.collective.collective.broadcast_multigpu(tensor_list, src_rank: int = 0, src_tensor: int = 0, group_name: str = 'default')[source]#
将张量从源 GPU 广播到所有其他 GPU。
- 参数:
tensor_list – 要广播 (源) 或接收 (目标) 的张量。
src_rank – 源进程的 rank。
src_tensor – 源进程上源 GPU 的索引。
group_name – 用于执行 broadcast 的 collective group 名称。
- 返回:
None
- ray.util.collective.collective.allgather(tensor_list: list, tensor, group_name: str = 'default')[source]#
将 group 中每个进程的张量 allgather 到一个列表中。
- 参数:
tensor_list – 结果,存储为张量列表。
tensor – 当前进程中 (要收集的) 张量。
group_name – collective group 的名称。
- 返回:
None
- ray.util.collective.collective.allgather_multigpu(output_tensor_lists: list, input_tensor_list: list, group_name: str = 'default')[source]#
将 group 中每个 GPU 的张量 allgather 到列表中。
- 参数:
output_tensor_lists (List[List[tensor]]) – 收集到的结果,形状必须为 num_gpus * world_size * shape(tensor)。
input_tensor_list – (List[tensor]): 张量列表,形状为 num_gpus * shape(tensor)。
group_name – collective group 的名称。
- 返回:
None
- ray.util.collective.collective.reducescatter(tensor, tensor_list: list, group_name: str = 'default', op=ReduceOp.SUM)[source]#
对 group 中的张量列表进行 reducescatter。
将 group 中每个进程的张量列表进行归约,然后将归约后的张量列表进行散列 — 每个进程一个张量。
- 参数:
tensor – 当前进程上的结果张量。
tensor_list – 要归约和散列的张量列表。
group_name – collective group 的名称。
op – 归约操作。
- 返回:
None
- ray.util.collective.collective.reducescatter_multigpu(output_tensor_list, input_tensor_lists, group_name: str = 'default', op=ReduceOp.SUM)[source]#
跨所有 GPU 约简散列(reducescatter)一个张量列表。
- 参数:
output_tensor_list – 结果张量列表,形状为:num_gpus * shape(tensor)。
input_tensor_lists – 原始张量,形状为:num_gpus * world_size * shape(tensor)。
group_name – collective group 的名称。
op – 归约操作。
- 返回:
None。
- ray.util.collective.collective.send(tensor, dst_rank: int, group_name: str = 'default')[source]#
同步发送一个张量到远程进程。
- 参数:
tensor – 要发送的张量。
dst_rank – 目标进程的 rank。
group_name – collective group 的名称。
- 返回:
None
- ray.util.collective.collective.send_multigpu(tensor, dst_rank: int, dst_gpu_index: int, group_name: str = 'default', n_elements: int = 0)[source]#
同步发送一个张量到远程 GPU。
该函数假定每个进程拥有多个 GPU,并且发送方和接收方的进程拥有相同数量的 GPU。
- 参数:
tensor – 要发送的张量,位于 GPU 上。
dst_rank – 目标进程的 rank。
dst_gpu_index – 目标 GPU 索引。
group_name – collective group 的名称。
n_elements – 如果指定,则发送张量起始地址之后的 n 个元素。
- 返回:
None
- ray.util.collective.collective.recv(tensor, src_rank: int, group_name: str = 'default')[source]#
同步从远程进程接收一个张量。
- 参数:
tensor – 接收到的张量。
src_rank – 源进程的 rank。
group_name – collective group 的名称。
- 返回:
None
- ray.util.collective.collective.recv_multigpu(tensor, src_rank: int, src_gpu_index: int, group_name: str = 'default', n_elements: int = 0)[source]#
同步从远程 GPU 接收一个张量。
该函数假定每个进程拥有多个 GPU,并且发送方和接收方的进程拥有相同数量的 GPU。
- 参数:
tensor – 接收到的张量,位于 GPU 上。
src_rank – 源进程的 rank。
src_gpu_index – 源进程上源 GPU 的索引。
group_name – 集体通信组的名称。
- 返回:
None