快速入门#

Hello World#

这个“hello world”示例使用 Ray 编译图。首先,安装 Ray。

pip install "ray[cgraph]"

# For a ray version before 2.41, use the following instead:
# pip install "ray[adag]"

首先,定义一个简单的 Actor,它回显其参数。

import ray


@ray.remote
class SimpleActor:
    def echo(self, msg):
        return msg


接下来实例化 Actor,并使用经典的 Ray Core API remoteray.get 在 Actor 上执行任务。

import time

a = SimpleActor.remote()

# warmup
for _ in range(5):
    msg_ref = a.echo.remote("hello")
    ray.get(msg_ref)

start = time.perf_counter()
msg_ref = a.echo.remote("hello")
ray.get(msg_ref)
end = time.perf_counter()
print(f"Execution takes {(end - start) * 1000 * 1000} us")
Execution takes 969.0364822745323 us

现在,使用 Ray 编译图创建一个等效程序。首先,使用经典的 Ray Core 定义一个要执行的图,无需任何编译。稍后,编译此图,以应用优化并防止对图进行进一步修改。

首先,创建一个 Ray DAG(有向无环图),它是一个 Ray 任务的惰性执行图。请注意与经典 Ray Core API 的 3 个主要区别

  1. 使用 ray.dag.InputNode 上下文管理器指示应在运行时提供给 DAG 的哪些输入。

  2. 使用 bind() 代替 remote() 来指示惰性执行的 Ray 任务。

  3. 使用 execute() 执行 DAG。

在这里,定义并执行一个图。请注意,这里没有进行编译。它使用与前一个示例相同的执行后端

import ray.dag

with ray.dag.InputNode() as inp:
    # Note that it uses `bind` instead of `remote`.
    # This returns a ray.dag.DAGNode, instead of the usual ray.ObjectRef.
    dag = a.echo.bind(inp)

# warmup
for _ in range(5):
    msg_ref = dag.execute("hello")
    ray.get(msg_ref)

start = time.perf_counter()
# `dag.execute` runs the DAG and returns an ObjectRef. You can use `ray.get` API.
msg_ref = dag.execute("hello")
ray.get(msg_ref)
end = time.perf_counter()
print(f"Execution takes {(end - start) * 1000 * 1000} us")

接下来,使用 experimental_compile API 编译 dag。该图使用相同的 API 进行执行

dag = dag.experimental_compile()

# warmup
for _ in range(5):
    msg_ref = dag.execute("hello")
    ray.get(msg_ref)

start = time.perf_counter()
# `dag.execute` runs the DAG and returns CompiledDAGRef. Similar to
# ObjectRefs, you can use the ray.get API.
msg_ref = dag.execute("hello")
ray.get(msg_ref)
end = time.perf_counter()
print(f"Execution takes {(end - start) * 1000 * 1000} us")
Execution takes 86.72196418046951 us

同一个任务图的性能提高了 10 倍。这是因为函数 echo 开销很小,因此受系统开销影响很大。由于各种簿记和分布式协议,经典的 Ray Core API 通常有 1 毫秒以上的系统开销。

由于系统预先知道任务图,Ray 编译图可以提前预分配所有必要的资源,并大大降低系统开销。例如,如果 Actor a 与驱动程序在同一个节点上,Ray 编译图使用共享内存而不是 RPC 直接在驱动程序和 Actor 之间传输数据。

目前,DAG 任务在相关 Actor 的后台线程上运行。一个 Actor 一次只能参与一个 DAG。Actor 在参与编译图的同时,普通任务仍然可以在 Actor 上执行,但这些任务在主线程上执行。

完成后,您可以通过删除编译图或显式调用 dag.teardown() 来将其销毁。这允许在新的编译图中重复使用 Actor。

dag.teardown()

指定数据依赖项#

创建 DAG 时,可以将 ray.dag.DAGNode 作为参数传递给其他 .bind 调用来指定数据依赖项。例如,以下内容使用前面的示例创建一个 DAG,该 DAG 将同一消息从一个 Actor 传递到另一个 Actor

a = SimpleActor.remote()
b = SimpleActor.remote()

with ray.dag.InputNode() as inp:
    # Note that it uses `bind` instead of `remote`.
    # This returns a ray.dag.DAGNode, instead of the usual ray.ObjectRef.
    dag = a.echo.bind(inp)
    dag = b.echo.bind(dag)

dag = dag.experimental_compile()
print(ray.get(dag.execute("hello")))
hello

这是另一个示例,它将同一消息传递给两个 Actor,然后这两个 Actor 可以并行执行。它使用 ray.dag.MultiOutputNode 指示此 DAG 返回多个输出。然后,dag.execute() 返回多个 CompiledDAGRef 对象,每个节点一个

import ray.dag

a = SimpleActor.remote()
b = SimpleActor.remote()

with ray.dag.InputNode() as inp:
    # Note that it uses `bind` instead of `remote`.
    # This returns a ray.dag.DAGNode, instead of the usual ray.ObjectRef.
    dag = ray.dag.MultiOutputNode([a.echo.bind(inp), b.echo.bind(inp)])

dag = dag.experimental_compile()
print(ray.get(dag.execute("hello")))
Execution takes 86.72196418046951 us

请注意:* 在同一个 Actor 上,编译图按顺序执行。如果一个 Actor 在同一个编译图中有多个任务,它会先执行完所有任务,然后才开始执行下一个 DAG 输入。* 在同一个编译图中的 Actor 之间,执行可以是流水线的。一个 Actor 可能在下游 Actor 执行当前输入时开始执行下一个 DAG 输入。* 编译图目前仅支持 Actor 任务。不支持非 Actor 任务。

asyncio 支持#

如果您的编译图驱动程序在 asyncio 事件循环中运行,请使用 async API 确保执行编译图并获取结果不会阻塞事件循环。首先,将 enable_async=True 传递给 dag.experimental_compile()

import ray


@ray.remote
class EchoActor:
    def echo(self, msg):
        return msg


actor = EchoActor.remote()
with ray.dag.InputNode() as inp:
    dag = actor.echo.bind(inp)

cdag = dag.experimental_compile(enable_asyncio=True)

接下来,使用 execute_async 调用编译图。对 execute_async 调用 await 将在输入提交后返回,并返回一个可用于获取结果的 future。最后,使用 await 获取编译图的结果。

import asyncio


async def async_method(i):
    fut = await cdag.execute_async(i)
    result = await fut
    assert result == i


loop = asyncio.get_event_loop()
loop.run_until_complete(async_method(42))

执行和故障语义#

与经典 Ray Core 一样,Ray 编译图将异常传播到最终输出。特别是

  • 应用异常:如果应用任务抛出异常,编译图会将异常封装在 RayTaskError 中,并在调用方对结果调用 ray.get() 时抛出该异常。抛出的异常同时继承自 RayTaskError 和原始异常类。

  • 系统异常:系统异常包括 Actor 死亡或意外错误(例如网络错误)。对于 Actor 死亡,编译图会抛出 ActorDiedError,对于其他错误,它会抛出 RayChannelError

图在应用异常后仍然可以执行。但是,在发生系统异常时,图会自动关闭。如果 Actor 死亡导致图关闭,剩余的 Actor 会继续存活。

例如,此示例在 Actor 参与编译图时显式销毁了该 Actor。剩余的 Actor 是可重用的

from ray.dag import InputNode, MultiOutputNode


@ray.remote
class EchoActor:
    def echo(self, msg):
        return msg


actors = [EchoActor.remote() for _ in range(4)]
with InputNode() as inp:
    outputs = [actor.echo.bind(inp) for actor in actors]
    dag = MultiOutputNode(outputs)

compiled_dag = dag.experimental_compile()
# Kill one of the actors to simulate unexpected actor death.
ray.kill(actors[0])
ref = compiled_dag.execute(1)

live_actors = []
try:
    ray.get(ref)
except ray.exceptions.ActorDiedError:
    # At this point, the Compiled Graph is shutting down.
    for actor in actors:
        try:
            # Check for live actors.
            ray.get(actor.echo.remote("ping"))
            live_actors.append(actor)
        except ray.exceptions.RayActorError:
            pass

# Optionally, use the live actors to create a new Compiled Graph.
assert live_actors == actors[1:]

执行超时#

某些错误,例如 NCCL 网络错误,需要额外处理以避免挂起。将来,Ray 可能会尝试检测此类错误,但目前作为一种回退机制,它允许为 compiled_dag.execute()ray.get() 配置超时。

两者的默认超时时间均为 10 秒。设置以下环境变量可以更改默认超时时间

ray.get() 也有一个 timeout 参数,可以按每次调用设置超时时间。

CPU 到 GPU 通信#

使用经典的 Ray Core 时,在 Actor 之间传递 torch.Tensors 可能会变得昂贵,尤其是在设备之间传输时。这是因为 Ray Core 不知道最终目标设备。因此,您可能会看到源设备和目标设备以外的设备之间进行不必要的复制。

Ray 编译图自带对在不同设备上执行的 Actor 之间传递 torch.Tensors 的原生支持。开发人员现在可以在编译图声明中使用类型提示注解来指示 torch.Tensor 的最终目标设备。

import torch
import ray
import ray.dag


@ray.remote(num_gpus=1)
class GPUActor:
    def process(self, tensor: torch.Tensor):
        assert tensor.device.type == "cuda"
        return tensor.shape


actor = GPUActor.remote()

在 Ray Core 中,如果您尝试从驱动程序传递 CPU 张量,GPU Actor 会接收到 CPU 张量

# This will fail because the driver passes a CPU copy of the tensor,
# and the GPU actor also receives a CPU copy.
ray.get(actor.process.remote(torch.zeros(10)))

使用 Ray 编译图,您可以用类型提示注解 DAG 节点,以指示值中可能包含 torch.Tensor

with ray.dag.InputNode() as inp:
    inp = inp.with_tensor_transport(device="cuda")
    dag = actor.process.bind(inp)

cdag = dag.experimental_compile()
print(ray.get(cdag.execute(torch.zeros(10))))

在底层,Ray 编译图后端将 torch.Tensor 复制到 Ray Core 分配给 GPUActor 的 GPU 上。

当然,您也可以自己完成,但使用编译图有其优势

  • Ray 编译图可以最大限度地减少数据复制次数。例如,从一个 CPU 传递到多个 GPU 需要一次复制到共享内存缓冲区,然后每个目标 GPU 需要一次主机到设备的复制。

  • 将来,这可以通过诸如 内存锁定 等技术进一步优化,当 CPU 是目标时使用零拷贝反序列化等。

GPU 到 GPU 通信#

Ray 编译图支持基于 NCCL 的 CUDA torch.Tensor 对象传输,避免通过 Ray 基于 CPU 的共享内存对象存储进行任何复制。通过用户提供的类型提示,Ray 可以提前准备 NCCL 通信器和操作调度,从而避免死锁并实现计算与通信的重叠

Ray 编译图底层使用 cupy 来支持 NCCL 操作。cupy 版本会影响 NCCL 版本。Ray 团队未来还计划支持自定义通信器,例如支持跨 CPU 的集合操作或重用现有集合组。

首先,创建发送方和接收方 Actor。请注意,此示例需要至少 2 个 GPU。

import torch
import ray
import ray.dag
from ray.experimental.channel.torch_tensor_type import TorchTensorType


# Note that the following example requires at least 2 GPUs.
assert (
    ray.available_resources().get("GPU") >= 2
), "At least 2 GPUs are required to run this example."


@ray.remote(num_gpus=1)
class GPUSender:
    def send(self, shape):
        return torch.zeros(shape, device="cuda")


@ray.remote(num_gpus=1)
class GPUReceiver:
    def recv(self, tensor: torch.Tensor):
        assert tensor.device.type == "cuda"
        return tensor.shape


sender = GPUSender.remote()
receiver = GPUReceiver.remote()

为了支持使用 NCCL 进行 GPU 到 GPU 通信,请使用 with_tensor_transport API 提示包装包含要传输的 torch.Tensor 的 DAG 节点

with ray.dag.InputNode() as inp:
    dag = sender.send.bind(inp)
    # Add a type hint that the return value of `send` should use NCCL.
    dag = dag.with_tensor_transport("nccl")
    # NOTE: With ray<2.42, use `with_type_hint()` instead.
    # dag = dag.with_type_hint(TorchTensorType(transport="nccl"))
    dag = receiver.recv.bind(dag)

# Compile API prepares the NCCL communicator across all workers and schedule operations
# accordingly.
dag = dag.experimental_compile()
assert ray.get(dag.execute((10,))) == (10,)

当前的限制包括

  • 仅支持 torch.Tensor 和 NVIDIA NCCL

  • 支持点对点传输。集合通信操作即将推出。

  • 通信操作目前是同步进行的。计算与通信的重叠 是一个实验性功能。