快速入门#

Hello World#

这个“hello world”示例使用了 Ray 编译图。首先,安装 Ray。

pip install "ray[cgraph]"

# For a ray version before 2.41, use the following instead:
# pip install "ray[adag]"

首先,定义一个简单的 actor,它会回显其参数。

import ray


@ray.remote
class SimpleActor:
    def echo(self, msg):
        return msg


接下来实例化 actor,并使用经典的 Ray Core API remoteray.get 在 actor 上执行任务。

import time

a = SimpleActor.remote()

# warmup
for _ in range(5):
    msg_ref = a.echo.remote("hello")
    ray.get(msg_ref)

start = time.perf_counter()
msg_ref = a.echo.remote("hello")
ray.get(msg_ref)
end = time.perf_counter()
print(f"Execution takes {(end - start) * 1000 * 1000} us")
Execution takes 969.0364822745323 us

现在,使用 Ray 编译图创建一个等效的程序。首先,使用经典的 Ray Core 定义一个要执行的图,不进行任何编译。稍后,编译此图,以应用优化并防止对图进行进一步修改。

首先,创建一个 Ray DAG(有向无环图),这是一个惰性执行的 Ray 任务图。请注意与经典 Ray Core API 的 3 个主要区别:

  1. 使用 ray.dag.InputNode 上下文管理器来指示 DAG 的哪些输入将在运行时提供。

  2. 使用 bind() 而不是 remote() 来指示惰性执行的 Ray 任务。

  3. 使用 execute() 来执行 DAG。

在这里,定义一个图并执行它。请注意,这里**没有**进行编译。这使用的是与前面示例相同的执行后端。

import ray.dag

with ray.dag.InputNode() as inp:
    # Note that it uses `bind` instead of `remote`.
    # This returns a ray.dag.DAGNode, instead of the usual ray.ObjectRef.
    dag = a.echo.bind(inp)

# warmup
for _ in range(5):
    msg_ref = dag.execute("hello")
    ray.get(msg_ref)

start = time.perf_counter()
# `dag.execute` runs the DAG and returns an ObjectRef. You can use `ray.get` API.
msg_ref = dag.execute("hello")
ray.get(msg_ref)
end = time.perf_counter()
print(f"Execution takes {(end - start) * 1000 * 1000} us")

接下来,使用 experimental_compile API 编译 dag。图使用相同的 API 进行执行。

dag = dag.experimental_compile()

# warmup
for _ in range(5):
    msg_ref = dag.execute("hello")
    ray.get(msg_ref)

start = time.perf_counter()
# `dag.execute` runs the DAG and returns CompiledDAGRef. Similar to
# ObjectRefs, you can use the ray.get API.
msg_ref = dag.execute("hello")
ray.get(msg_ref)
end = time.perf_counter()
print(f"Execution takes {(end - start) * 1000 * 1000} us")
Execution takes 86.72196418046951 us

同一任务图的性能提高了 10 倍。这是因为 echo 函数开销很小,因此受系统开销的影响很大。由于各种簿记和分布式协议,经典的 Ray Core API 通常具有 1 毫秒以上的系统开销。

由于系统提前知道任务图,Ray 编译图可以提前预先分配所有必要的资源,并大大减少系统开销。例如,如果 actor a 与驱动程序位于同一节点上,Ray 编译图将使用共享内存而不是 RPC 来直接在驱动程序和 actor 之间传输数据。

当前,DAG 任务在相关 actor 的**后台线程**上运行。一个 actor 一次只能参与一个 DAG。在 actor 参与编译图时,普通任务仍然可以在 actor 上执行,但这些任务在主线程上执行。

完成后,可以通过删除编译图或显式调用 dag.teardown() 来拆解编译图。这允许 actor 在新的编译图中重用。

dag.teardown()

指定数据依赖项#

创建 DAG 时,可以将 ray.dag.DAGNode 作为参数传递给其他 .bind 调用,以指定数据依赖项。例如,以下示例使用前面的示例创建一个 DAG,该 DAG 将同一消息从一个 actor 传递到另一个 actor。

a = SimpleActor.remote()
b = SimpleActor.remote()

with ray.dag.InputNode() as inp:
    # Note that it uses `bind` instead of `remote`.
    # This returns a ray.dag.DAGNode, instead of the usual ray.ObjectRef.
    dag = a.echo.bind(inp)
    dag = b.echo.bind(dag)

dag = dag.experimental_compile()
print(ray.get(dag.execute("hello")))
hello

这是另一个将同一消息传递给两个 actor 的示例,这两个 actor 然后可以并行执行。它使用 ray.dag.MultiOutputNode 来指示此 DAG 返回多个输出。然后,dag.execute() 返回多个 CompiledDAGRef 对象,每个对象对应一个节点。

import ray.dag

a = SimpleActor.remote()
b = SimpleActor.remote()

with ray.dag.InputNode() as inp:
    # Note that it uses `bind` instead of `remote`.
    # This returns a ray.dag.DAGNode, instead of the usual ray.ObjectRef.
    dag = ray.dag.MultiOutputNode([a.echo.bind(inp), b.echo.bind(inp)])

dag = dag.experimental_compile()
print(ray.get(dag.execute("hello")))
Execution takes 86.72196418046951 us

请注意:* 在同一个 actor 上,编译图按顺序执行。如果一个 actor 在同一个编译图中具有多个任务,它将执行所有任务直到完成,然后再执行下一个 DAG 输入。* 在同一个编译图中的 actor 之间,执行可能会被流水线化。当下游 actor 执行当前任务输入时,一个 actor 可能会开始执行下一个 DAG 输入。* 编译图目前仅支持 actor 任务。不支持普通任务。

asyncio 支持#

如果您的编译图驱动程序正在 asyncio 事件循环中运行,请使用 async API,以确保执行编译图和获取结果不会阻塞事件循环。首先,将 enable_async=True 传递给 dag.experimental_compile()

import ray


@ray.remote
class EchoActor:
    def echo(self, msg):
        return msg


actor = EchoActor.remote()
with ray.dag.InputNode() as inp:
    dag = actor.echo.bind(inp)

cdag = dag.experimental_compile(enable_asyncio=True)

接下来,使用 execute_async 调用编译图。对 execute_async 调用 await 将在输入提交后返回,并返回一个可用于获取结果的 future。最后,使用 await 获取编译图的结果。

import asyncio


async def async_method(i):
    fut = await cdag.execute_async(i)
    result = await fut
    assert result == i


loop = asyncio.get_event_loop()
loop.run_until_complete(async_method(42))

执行和失败语义#

与经典的 Ray Core 一样,Ray 编译图将异常传播到最终输出。特别是:

  • 应用程序异常:如果应用程序任务抛出异常,编译图会将该异常包装在 RayTaskError 中,并在调用者对结果调用 ray.get() 时抛出。抛出的异常继承自 RayTaskError 和原始异常类。

  • 系统异常:系统异常包括 actor 死亡或意外错误,如网络错误。对于 actor 死亡,编译图会抛出 ActorDiedError,对于其他错误,它会抛出 RayChannelError

在发生应用程序异常后,图仍然可以执行。但是,在发生系统异常的情况下,图会自动关闭。如果 actor 的死亡导致图关闭,剩余的 actor 仍然存活。

例如,此示例在 actor 参与编译图时显式销毁了该 actor。剩余的 actor 可重用。

from ray.dag import InputNode, MultiOutputNode


@ray.remote
class EchoActor:
    def echo(self, msg):
        return msg


actors = [EchoActor.remote() for _ in range(4)]
with InputNode() as inp:
    outputs = [actor.echo.bind(inp) for actor in actors]
    dag = MultiOutputNode(outputs)

compiled_dag = dag.experimental_compile()
# Kill one of the actors to simulate unexpected actor death.
ray.kill(actors[0])
ref = compiled_dag.execute(1)

live_actors = []
try:
    ray.get(ref)
except ray.exceptions.ActorDiedError:
    # At this point, the Compiled Graph is shutting down.
    for actor in actors:
        try:
            # Check for live actors.
            ray.get(actor.echo.remote("ping"))
            live_actors.append(actor)
        except ray.exceptions.RayActorError:
            pass

# Optionally, use the live actors to create a new Compiled Graph.
assert live_actors == actors[1:]

执行超时#

某些错误,例如 NCCL 网络错误,需要额外的处理以避免挂起。将来,Ray 可能会尝试检测此类错误,但目前作为一种回退,它允许为 compiled_dag.execute()ray.get() 配置可配置的超时。

两者的默认超时时间均为 10 秒。设置以下环境变量以更改默认超时:

ray.get() 还具有一个超时参数,用于设置每次调用的超时。

CPU 到 GPU 通信#

使用经典的 Ray Core,在 actor 之间传递 torch.Tensors 可能会变得昂贵,尤其是在设备之间传输时。这是因为 Ray Core 不知道最终目标设备。因此,您可能会看到在源设备和目标设备以外的设备之间进行不必要的复制。

Ray 编译图内置了对在不同设备上运行的 actor 之间传递 torch.Tensors 的原生支持。开发人员现在可以在编译图声明中使用类型提示注解来指示 torch.Tensor 的最终目标设备。

import torch
import ray
import ray.dag


@ray.remote(num_gpus=1)
class GPUActor:
    def process(self, tensor: torch.Tensor):
        assert tensor.device.type == "cuda"
        return tensor.shape


actor = GPUActor.remote()

在 Ray Core 中,如果您尝试从驱动程序传递 CPU 张量,GPU actor 会收到一个 CPU 张量。

# This will fail because the driver passes a CPU copy of the tensor,
# and the GPU actor also receives a CPU copy.
ray.get(actor.process.remote(torch.zeros(10)))

使用 Ray 编译图,您可以使用类型提示注解 DAG 节点,以指示值中可能包含 torch.Tensor

with ray.dag.InputNode() as inp:
    inp = inp.with_tensor_transport(device="cuda")
    dag = actor.process.bind(inp)

cdag = dag.experimental_compile()
print(ray.get(cdag.execute(torch.zeros(10))))

在底层,Ray 编译图后端会将 torch.Tensor 复制到 Ray Core 分配给 GPUActor 的 GPU。

当然,您也可以自己完成,但使用编译图有其优势。

  • Ray 编译图可以最大限度地减少数据副本的数量。例如,从一个 CPU 传递到多个 GPU 需要复制到一个共享内存缓冲区,然后为每个目标 GPU 进行一次主机到设备的复制。

  • 将来,可以通过诸如 内存固定、在 CPU 是目标时使用零拷贝反序列化等技术进一步优化。

GPU 到 GPU 通信#

Ray 编译图支持基于 NCCL 的 CUDA torch.Tensor 对象传输,避免了通过 Ray 的基于 CPU 的共享内存对象存储进行任何复制。通过用户提供的类型提示,Ray 提前准备 NCCL 通信器和操作调度,避免死锁和 计算与通信重叠

Ray 编译图在底层使用 cupy 来支持 NCCL 操作。cupy 版本影响 NCCL 版本。Ray 团队还计划将来支持自定义通信器,例如支持跨 CPU 的集体操作或重用现有集体组。

首先,创建发送者和接收者 actor。请注意,此示例至少需要 2 个 GPU。

import torch
import ray
import ray.dag
from ray.experimental.channel.torch_tensor_type import TorchTensorType


# Note that the following example requires at least 2 GPUs.
assert (
    ray.available_resources().get("GPU") >= 2
), "At least 2 GPUs are required to run this example."


@ray.remote(num_gpus=1)
class GPUSender:
    def send(self, shape):
        return torch.zeros(shape, device="cuda")


@ray.remote(num_gpus=1)
class GPUReceiver:
    def recv(self, tensor: torch.Tensor):
        assert tensor.device.type == "cuda"
        return tensor.shape


sender = GPUSender.remote()
receiver = GPUReceiver.remote()

要支持使用 NCCL 进行 GPU 到 GPU 通信,请使用 with_tensor_transport API 提示包装包含您要传输的 torch.Tensor 的 DAG 节点。

with ray.dag.InputNode() as inp:
    dag = sender.send.bind(inp)
    # Add a type hint that the return value of `send` should use NCCL.
    dag = dag.with_tensor_transport("nccl")
    # NOTE: With ray<2.42, use `with_type_hint()` instead.
    # dag = dag.with_type_hint(TorchTensorType(transport="nccl"))
    dag = receiver.recv.bind(dag)

# Compile API prepares the NCCL communicator across all workers and schedule operations
# accordingly.
dag = dag.experimental_compile()
assert ray.get(dag.execute((10,))) == (10,)

当前限制包括:

  • torch.Tensor 和 NVIDIA NCCL 仅。

  • 支持点对点传输。集体通信操作即将推出。

  • 通信操作目前是同步进行的。计算与通信重叠 是一项实验性功能。