Ray 集群通信库#

Ray 集群通信库(ray.util.collective)提供了一组原生集体原语,用于分布式 CPU 或 GPU 之间的通信。

Ray 集群通信库

  • 使 Ray Actor 和任务进程之间的带外集体通信效率提高 10 倍,

  • 可在分布式 CPU 和 GPU 上运行,

  • 使用 NCCL 和 GLOO 作为可选的高性能通信后端,

  • 适用于 Ray 上的分布式 ML 程序。

集体原语支持矩阵#

请参阅下方表格,了解当前所有集体调用在不同后端上的支持情况。

后端

gloo

nccl

设备

CPU

GPU

CPU

GPU

send

recv

broadcast

allreduce

reduce

allgather

gather

scatter

reduce_scatter

all-to-all

barrier

支持的张量类型#

  • torch.Tensor

  • numpy.ndarray

  • cupy.ndarray

用法#

安装和导入#

Ray 集群库与发布的 Ray wheel 包捆绑在一起。除了 Ray 之外,用户还需要安装 pygloocupy,以便分别使用 GLOO 和 NCCL 后端进行集体通信。

pip install pygloo
pip install cupy-cudaxxx # replace xxx with the right cuda version in your environment

要使用这些 API,请通过以下方式在您的 Actor/任务或驱动程序代码中导入 collective 包:

import ray.util.collective as col

初始化#

集体函数对集体组进行操作。集体组包含多个进程(在 Ray 中,它们通常是 Ray 管理的 Actor 或任务),这些进程将一起进入集体函数调用。在进行集体调用之前,用户需要将一组 Actor/任务静态地声明为一个集体组。

下面是一个示例代码片段,它使用 init_collective_group()create_collective_group() 这两个 API 在一些远程 Actor 之间初始化集体组。有关这两个 API 的详细说明,请参阅 API

import ray
import ray.util.collective as collective

import cupy as cp


@ray.remote(num_gpus=1)
class Worker:
   def __init__(self):
       self.send = cp.ones((4, ), dtype=cp.float32)
       self.recv = cp.zeros((4, ), dtype=cp.float32)

   def setup(self, world_size, rank):
       collective.init_collective_group(world_size, rank, "nccl", "default")
       return True

   def compute(self):
       collective.allreduce(self.send, "default")
       return self.send

   def destroy(self):
       collective.destroy_group()

# imperative
num_workers = 2
workers = []
init_rets = []
for i in range(num_workers):
   w = Worker.remote()
   workers.append(w)
   init_rets.append(w.setup.remote(num_workers, i))
_ = ray.get(init_rets)
results = ray.get([w.compute.remote() for w in workers])


# declarative
for i in range(num_workers):
   w = Worker.remote()
   workers.append(w)
_options = {
   "group_name": "177",
   "world_size": 2,
   "ranks": [0, 1],
   "backend": "nccl"
}
collective.create_collective_group(workers, **_options)
results = ray.get([w.compute.remote() for w in workers])

请注意,对于同一组 Actor/任务进程,可以使用 group_name 作为唯一标识符构建多个集体组。这使得可以指定不同(子)进程集之间的复杂通信模式。

集体通信#

查看 支持矩阵,了解当前支持的集体调用和后端的状态。

请注意,当前的一组集体通信 API 是命令式的,并表现出以下行为:

  • 所有集体 API 都是同步阻塞调用

  • 由于每个 API 只指定了集体通信的一部分,因此预计该 API 将由(预先声明的)集体组中的每个参与进程调用。所有进程都完成调用并相互会合后,集体通信才会发生并继续进行。

  • 这些 API 是命令式的,通信发生在带外——它们需要在集体进程(Actor/任务)代码内部使用。

下面是使用 ray.util.collective.allreduce 的示例:

import ray
import cupy
import ray.util.collective as col


@ray.remote(num_gpus=1)
class Worker:
    def __init__(self):
        self.buffer = cupy.ones((10,), dtype=cupy.float32)

    def compute(self):
        col.allreduce(self.buffer, "default")
        return self.buffer

# Create two actors A and B and create a collective group following the previous example...
A = Worker.remote()
B = Worker.remote()
# Invoke allreduce remotely
ray.get([A.compute.remote(), B.compute.remote()])

点对点通信#

ray.util.collective 也支持进程间的 P2P 发送/接收通信。

发送/接收与集体函数表现出相同的行为:它们是同步阻塞调用——一对发送和接收必须在配对的进程上同时调用,才能指定整个通信,并且必须成功相互会合才能继续进行。请参阅下面的代码示例:

import ray
import cupy
import ray.util.collective as col


@ray.remote(num_gpus=1)
class Worker:
    def __init__(self):
        self.buffer = cupy.ones((10,), dtype=cupy.float32)

    def get_buffer(self):
        return self.buffer

    def do_send(self, target_rank=0):
        # this call is blocking
        col.send(target_rank)

    def do_recv(self, src_rank=0):
        # this call is blocking
        col.recv(src_rank)

    def do_allreduce(self):
        # this call is blocking as well
        col.allreduce(self.buffer)
        return self.buffer

# Create two actors
A = Worker.remote()
B = Worker.remote()

# Put A and B in a collective group
col.create_collective_group([A, B], options={rank=[0, 1], ...})

# let A to send a message to B; a send/recv has to be specified once at each worker
ray.get([A.do_send.remote(target_rank=1), B.do_recv.remote(src_rank=0)])

# An anti-pattern: the following code will hang, because it doesn't instantiate the recv side call
ray.get([A.do_send.remote(target_rank=1)])

单 GPU 和多 GPU 集体原语#

在许多集群设置中,一台机器通常有多个 GPU;有效地利用 GPU-GPU 带宽(例如 NVLINK)可以显著提高通信性能。

ray.util.collective 支持多 GPU 集体调用,在这种情况下,一个进程(Actor/任务)管理多个 GPU(例如,通过 ray.remote(num_gpus=4))。使用这些多 GPU 集体函数通常比使用单 GPU 集体 API 并生成与 GPU 数量相等的进程更具性能优势。请参阅 API 参考文档,了解多 GPU 集体 API 的签名。

另外值得注意的是,所有多 GPU API 都有以下限制:

  • 仅支持 NCCL 后端。

  • 进行多 GPU 集体或 P2P 调用的集体进程需要拥有相同数量的 GPU 设备。

  • 多 GPU 集体函数的输入通常是张量列表,每个张量位于调用进程拥有的不同 GPU 设备上。

下面提供了利用多 GPU 集体 API 的示例代码:

import ray
import ray.util.collective as collective

import cupy as cp
from cupy.cuda import Device


@ray.remote(num_gpus=2)
class Worker:
   def __init__(self):
       with Device(0):
           self.send1 = cp.ones((4, ), dtype=cp.float32)
       with Device(1):
           self.send2 = cp.ones((4, ), dtype=cp.float32) * 2
       with Device(0):
           self.recv1 = cp.ones((4, ), dtype=cp.float32)
       with Device(1):
           self.recv2 = cp.ones((4, ), dtype=cp.float32) * 2

   def setup(self, world_size, rank):
       self.rank = rank
       collective.init_collective_group(world_size, rank, "nccl", "177")
       return True

   def allreduce_call(self):
       collective.allreduce_multigpu([self.send1, self.send2], "177")
       return [self.send1, self.send2]

   def p2p_call(self):
       if self.rank == 0:
          collective.send_multigpu(self.send1 * 2, 1, 1, "8")
       else:
          collective.recv_multigpu(self.recv2, 0, 0, "8")
       return self.recv2

# Note that the world size is 2 but there are 4 GPUs.
num_workers = 2
workers = []
init_rets = []
for i in range(num_workers):
   w = Worker.remote()
   workers.append(w)
   init_rets.append(w.setup.remote(num_workers, i))
a = ray.get(init_rets)
results = ray.get([w.allreduce_call.remote() for w in workers])
results = ray.get([w.p2p_call.remote() for w in workers])

更多资源#

以下链接提供了有关如何高效利用 ray.util.collective 库的有用资源。

API 参考#

在 ray.util.collective 命名空间下公开的 API。

class ray.util.collective.collective.GroupManager[source]#

使用此类来管理我们目前创建的集体组。

每个进程都将有一个 GroupManager 实例。每个进程可能属于多个集体组。成员信息和其他元数据存储在全局 _group_mgr 对象中。

create_collective_group(backend, world_size, rank, group_name, gloo_timeout)[source]#

在管理器中创建新集体组的入口。

也将注册和组信息放入管理器元数据中。

get_group_by_name(group_name)[source]#

按名称获取集体组句柄。

destroy_collective_group(group_name)[source]#

组析构函数。

ray.util.collective.collective.is_group_initialized(group_name)[source]#

检查此进程中是否已按组名初始化组。

ray.util.collective.collective.init_collective_group(world_size: int, rank: int, backend='nccl', group_name: str = 'default', gloo_timeout: int = 30000)[source]#

在 Actor 进程内部初始化一个集体组。

参数:
  • world_size – 组中的进程总数。

  • rank – 当前进程的排名。

  • backend – 要使用的 CCL 后端,NCCL 或 GLOO。

  • group_name – 集体组的名称。

返回:

ray.util.collective.collective.create_collective_group(actors, world_size: int, ranks: List[int], backend='nccl', group_name: str = 'default', gloo_timeout: int = 30000)[source]#

声明一个 Actor 列表作为集体组。

注意:此函数应在驱动程序进程中调用。

参数:
  • actors – 要设置到集体组中的 Actor 列表。

  • world_size – 组中的进程总数。

  • ranks (List[int]) – 每个 Actor 的排名。

  • backend – 要使用的 CCL 后端,NCCL 或 GLOO。

  • group_name – 集体组的名称。

返回:

ray.util.collective.collective.destroy_collective_group(group_name: str = 'default') None[source]#

销毁给定组名的集体组。

ray.util.collective.collective.get_rank(group_name: str = 'default') int[source]#

返回此进程在给定组中的排名。

参数:

group_name – 要查询的组名

返回:

此进程在指定组中的排名,如果组不存在或进程不属于该组,则为 -1。

ray.util.collective.collective.get_collective_group_size(group_name: str = 'default') int[source]#

返回给定组名的集体组的大小。

参数:

group_name – 要查询的组名

返回:

集体组的世界大小,如果组不

存在或进程不属于该组,则为 -1。

ray.util.collective.collective.allreduce(tensor, group_name: str = 'default', op=ReduceOp.SUM)[source]#

在组内集体 allreduce 张量。

参数:
  • tensor – 此进程上要进行 all-reduce 的张量。

  • group_name – 执行 allreduce 的集体组名。

  • op – reduce 操作。

返回:

ray.util.collective.collective.allreduce_multigpu(tensor_list: list, group_name: str = 'default', op=ReduceOp.SUM)[source]#

在组内集体 allreduce 张量列表。

参数:
  • tensor_list (List[tensor]) – 要进行 allreduced 的张量列表,每个张量位于一个 GPU 上。

  • group_name – 执行 allreduce 的集体组名。

返回:

ray.util.collective.collective.barrier(group_name: str = 'default')[source]#

屏障集体组中的所有进程。

参数:

group_name – 要进行屏障的组名。

返回:

ray.util.collective.collective.reduce(tensor, dst_rank: int = 0, group_name: str = 'default', op=ReduceOp.SUM)[source]#

在组内将张量 reduce 到目标排名。

参数:
  • tensor – 此进程上要进行 reduced 的张量。

  • dst_rank – 目标进程的排名。

  • group_name – 执行 reduce 的集体组名。

  • op – reduce 操作。

返回:

ray.util.collective.collective.reduce_multigpu(tensor_list: list, dst_rank: int = 0, dst_tensor: int = 0, group_name: str = 'default', op=ReduceOp.SUM)[source]#

在组内将张量 reduce 到目标排名和目标张量。

参数:
  • tensor_list – 此进程上要进行 reduced 的张量列表;每个张量位于一个 GPU 上。

  • dst_rank – 目标进程的排名。

  • dst_tensor – 目标的 GPU 索引。

  • group_name – 执行 reduce 的集体组名。

  • op – reduce 操作。

返回:

ray.util.collective.collective.broadcast(tensor, src_rank: int = 0, group_name: str = 'default')[source]#

将张量从源进程广播到所有其他进程。

参数:
  • tensor – 此进程上要广播(源)或接收(目标)的张量。

  • src_rank – 源进程的排名。

  • group_name – 执行广播的集体组名。

返回:

ray.util.collective.collective.broadcast_multigpu(tensor_list, src_rank: int = 0, src_tensor: int = 0, group_name: str = 'default')[source]#

将张量从源 GPU 广播到所有其他 GPU。

参数:
  • tensor_list – 要广播(源)或接收(目标)的张量。

  • src_rank – 源进程的排名。

  • src_tensor – 源进程上源 GPU 的索引。

  • group_name – 执行广播的集体组名。

返回:

ray.util.collective.collective.allgather(tensor_list: list, tensor, group_name: str = 'default')[source]#

将组中每个进程的张量 allgather 到一个列表中。

参数:
  • tensor_list – 结果,存储为张量列表。

  • tensor – 当前进程中的张量(要聚集的)

  • group_name – 集体组的名称。

返回:

ray.util.collective.collective.allgather_multigpu(output_tensor_lists: list, input_tensor_list: list, group_name: str = 'default')[source]#

将组中每个 GPU 的张量 allgather 到列表中。

参数:
  • output_tensor_lists (List[List[tensor]]) – 聚集的结果,形状必须是 num_gpus * world_size * shape(tensor)。

  • input_tensor_list – (List[tensor]): 张量列表,形状为 num_gpus * shape(tensor)。

  • group_name – 集体组的名称。

返回:

ray.util.collective.collective.reducescatter(tensor, tensor_list: list, group_name: str = 'default', op=ReduceOp.SUM)[source]#

在组内 reducescatter 张量列表。

在组内每个进程上 reduce 张量列表,然后 scatter reduce 后的张量列表——每个进程一个张量。

参数:
  • tensor – 此进程上的结果张量。

  • tensor_list – 要进行 reduce 和 scatter 的张量列表。

  • group_name – 集体组的名称。

  • op – reduce 操作。

返回:

ray.util.collective.collective.reducescatter_multigpu(output_tensor_list, input_tensor_lists, group_name: str = 'default', op=ReduceOp.SUM)[source]#

在所有 GPU 上 reducescatter 张量列表。

参数:
  • output_tensor_list – 结果张量列表,形状为:num_gpus * shape(tensor)。

  • input_tensor_lists – 原始张量,形状为:num_gpus * world_size * shape(tensor)。

  • group_name – 集体组的名称。

  • op – reduce 操作。

返回:

无。

ray.util.collective.collective.send(tensor, dst_rank: int, group_name: str = 'default')[source]#

同步发送张量到远程进程。

参数:
  • tensor – 要发送的张量。

  • dst_rank – 目标进程的排名。

  • group_name – 集体组的名称。

返回:

ray.util.collective.collective.send_multigpu(tensor, dst_rank: int, dst_gpu_index: int, group_name: str = 'default', n_elements: int = 0)[source]#

同步发送张量到远程 GPU。

函数假设每个进程拥有 >1 个 GPU,并且发送进程和接收进程拥有相等数量的 GPU。

参数:
  • tensor – 要发送的张量,位于一个 GPU 上。

  • dst_rank – 目标进程的排名。

  • dst_gpu_index – 目标 gpu 索引。

  • group_name – 集体组的名称。

  • n_elements – 如果指定,从张量起始地址发送接下来的 n 个元素。

返回:

ray.util.collective.collective.recv(tensor, src_rank: int, group_name: str = 'default')[source]#

同步接收远程进程发送的张量。

参数:
  • tensor – 接收到的张量。

  • src_rank – 源进程的排名。

  • group_name – 集体组的名称。

返回:

ray.util.collective.collective.recv_multigpu(tensor, src_rank: int, src_gpu_index: int, group_name: str = 'default', n_elements: int = 0)[source]#

同步接收远程 GPU 发送的张量。

函数假设每个进程拥有 >1 个 GPU,并且发送进程和接收进程拥有相等数量的 GPU。

参数:
  • tensor – 接收到的张量,位于一个 GPU 上。

  • src_rank – 源进程的排名。

  • src_gpu_index (int)

  • group_name – 集体组的名称。

返回:

ray.util.collective.collective.synchronize(gpu_id: int)[source]#

将当前进程同步到给定设备。

参数:

gpu_id – 要同步的 GPU 设备 id。

返回:

ray.util.collective.collective.get_group_handle(group_name: str = 'default')[source]#

检查组是否已初始化并返回组句柄。

参数:

group_name – 集体组的名称。

返回:

集体组句柄。