快速入门#
Hello World#
这个“hello world”示例使用 Ray 编译图。首先,安装 Ray。
pip install "ray[cgraph]"
# For a ray version before 2.41, use the following instead:
# pip install "ray[adag]"
首先,定义一个简单的 Actor,它回显其参数。
import ray
@ray.remote
class SimpleActor:
def echo(self, msg):
return msg
接下来实例化 Actor,并使用经典的 Ray Core API remote
和 ray.get
在 Actor 上执行任务。
import time
a = SimpleActor.remote()
# warmup
for _ in range(5):
msg_ref = a.echo.remote("hello")
ray.get(msg_ref)
start = time.perf_counter()
msg_ref = a.echo.remote("hello")
ray.get(msg_ref)
end = time.perf_counter()
print(f"Execution takes {(end - start) * 1000 * 1000} us")
Execution takes 969.0364822745323 us
现在,使用 Ray 编译图创建一个等效程序。首先,使用经典的 Ray Core 定义一个要执行的图,无需任何编译。稍后,编译此图,以应用优化并防止对图进行进一步修改。
首先,创建一个 Ray DAG(有向无环图),它是一个 Ray 任务的惰性执行图。请注意与经典 Ray Core API 的 3 个主要区别
使用
ray.dag.InputNode
上下文管理器指示应在运行时提供给 DAG 的哪些输入。使用
execute()
执行 DAG。
在这里,定义并执行一个图。请注意,这里没有进行编译。它使用与前一个示例相同的执行后端
import ray.dag
with ray.dag.InputNode() as inp:
# Note that it uses `bind` instead of `remote`.
# This returns a ray.dag.DAGNode, instead of the usual ray.ObjectRef.
dag = a.echo.bind(inp)
# warmup
for _ in range(5):
msg_ref = dag.execute("hello")
ray.get(msg_ref)
start = time.perf_counter()
# `dag.execute` runs the DAG and returns an ObjectRef. You can use `ray.get` API.
msg_ref = dag.execute("hello")
ray.get(msg_ref)
end = time.perf_counter()
print(f"Execution takes {(end - start) * 1000 * 1000} us")
接下来,使用 experimental_compile
API 编译 dag
。该图使用相同的 API 进行执行
dag = dag.experimental_compile()
# warmup
for _ in range(5):
msg_ref = dag.execute("hello")
ray.get(msg_ref)
start = time.perf_counter()
# `dag.execute` runs the DAG and returns CompiledDAGRef. Similar to
# ObjectRefs, you can use the ray.get API.
msg_ref = dag.execute("hello")
ray.get(msg_ref)
end = time.perf_counter()
print(f"Execution takes {(end - start) * 1000 * 1000} us")
Execution takes 86.72196418046951 us
同一个任务图的性能提高了 10 倍。这是因为函数 echo
开销很小,因此受系统开销影响很大。由于各种簿记和分布式协议,经典的 Ray Core API 通常有 1 毫秒以上的系统开销。
由于系统预先知道任务图,Ray 编译图可以提前预分配所有必要的资源,并大大降低系统开销。例如,如果 Actor a
与驱动程序在同一个节点上,Ray 编译图使用共享内存而不是 RPC 直接在驱动程序和 Actor 之间传输数据。
目前,DAG 任务在相关 Actor 的后台线程上运行。一个 Actor 一次只能参与一个 DAG。Actor 在参与编译图的同时,普通任务仍然可以在 Actor 上执行,但这些任务在主线程上执行。
完成后,您可以通过删除编译图或显式调用 dag.teardown()
来将其销毁。这允许在新的编译图中重复使用 Actor。
dag.teardown()
指定数据依赖项#
创建 DAG 时,可以将 ray.dag.DAGNode
作为参数传递给其他 .bind
调用来指定数据依赖项。例如,以下内容使用前面的示例创建一个 DAG,该 DAG 将同一消息从一个 Actor 传递到另一个 Actor
a = SimpleActor.remote()
b = SimpleActor.remote()
with ray.dag.InputNode() as inp:
# Note that it uses `bind` instead of `remote`.
# This returns a ray.dag.DAGNode, instead of the usual ray.ObjectRef.
dag = a.echo.bind(inp)
dag = b.echo.bind(dag)
dag = dag.experimental_compile()
print(ray.get(dag.execute("hello")))
hello
这是另一个示例,它将同一消息传递给两个 Actor,然后这两个 Actor 可以并行执行。它使用 ray.dag.MultiOutputNode
指示此 DAG 返回多个输出。然后,dag.execute()
返回多个 CompiledDAGRef
对象,每个节点一个
import ray.dag
a = SimpleActor.remote()
b = SimpleActor.remote()
with ray.dag.InputNode() as inp:
# Note that it uses `bind` instead of `remote`.
# This returns a ray.dag.DAGNode, instead of the usual ray.ObjectRef.
dag = ray.dag.MultiOutputNode([a.echo.bind(inp), b.echo.bind(inp)])
dag = dag.experimental_compile()
print(ray.get(dag.execute("hello")))
Execution takes 86.72196418046951 us
请注意:* 在同一个 Actor 上,编译图按顺序执行。如果一个 Actor 在同一个编译图中有多个任务,它会先执行完所有任务,然后才开始执行下一个 DAG 输入。* 在同一个编译图中的 Actor 之间,执行可以是流水线的。一个 Actor 可能在下游 Actor 执行当前输入时开始执行下一个 DAG 输入。* 编译图目前仅支持 Actor 任务。不支持非 Actor 任务。
asyncio
支持#
如果您的编译图驱动程序在 asyncio
事件循环中运行,请使用 async
API 确保执行编译图并获取结果不会阻塞事件循环。首先,将 enable_async=True
传递给 dag.experimental_compile()
import ray
@ray.remote
class EchoActor:
def echo(self, msg):
return msg
actor = EchoActor.remote()
with ray.dag.InputNode() as inp:
dag = actor.echo.bind(inp)
cdag = dag.experimental_compile(enable_asyncio=True)
接下来,使用 execute_async
调用编译图。对 execute_async
调用 await
将在输入提交后返回,并返回一个可用于获取结果的 future。最后,使用 await
获取编译图的结果。
import asyncio
async def async_method(i):
fut = await cdag.execute_async(i)
result = await fut
assert result == i
loop = asyncio.get_event_loop()
loop.run_until_complete(async_method(42))
执行和故障语义#
与经典 Ray Core 一样,Ray 编译图将异常传播到最终输出。特别是
应用异常:如果应用任务抛出异常,编译图会将异常封装在
RayTaskError
中,并在调用方对结果调用ray.get()
时抛出该异常。抛出的异常同时继承自RayTaskError
和原始异常类。系统异常:系统异常包括 Actor 死亡或意外错误(例如网络错误)。对于 Actor 死亡,编译图会抛出
ActorDiedError
,对于其他错误,它会抛出RayChannelError
。
图在应用异常后仍然可以执行。但是,在发生系统异常时,图会自动关闭。如果 Actor 死亡导致图关闭,剩余的 Actor 会继续存活。
例如,此示例在 Actor 参与编译图时显式销毁了该 Actor。剩余的 Actor 是可重用的
from ray.dag import InputNode, MultiOutputNode
@ray.remote
class EchoActor:
def echo(self, msg):
return msg
actors = [EchoActor.remote() for _ in range(4)]
with InputNode() as inp:
outputs = [actor.echo.bind(inp) for actor in actors]
dag = MultiOutputNode(outputs)
compiled_dag = dag.experimental_compile()
# Kill one of the actors to simulate unexpected actor death.
ray.kill(actors[0])
ref = compiled_dag.execute(1)
live_actors = []
try:
ray.get(ref)
except ray.exceptions.ActorDiedError:
# At this point, the Compiled Graph is shutting down.
for actor in actors:
try:
# Check for live actors.
ray.get(actor.echo.remote("ping"))
live_actors.append(actor)
except ray.exceptions.RayActorError:
pass
# Optionally, use the live actors to create a new Compiled Graph.
assert live_actors == actors[1:]
执行超时#
某些错误,例如 NCCL 网络错误,需要额外处理以避免挂起。将来,Ray 可能会尝试检测此类错误,但目前作为一种回退机制,它允许为 compiled_dag.execute()
和 ray.get()
配置超时。
两者的默认超时时间均为 10 秒。设置以下环境变量可以更改默认超时时间
RAY_CGRAPH_submit_timeout
:compiled_dag.execute()
的超时时间。RAY_CGRAPH_get_timeout
:ray.get()
的超时时间。
ray.get()
也有一个 timeout 参数,可以按每次调用设置超时时间。
CPU 到 GPU 通信#
使用经典的 Ray Core 时,在 Actor 之间传递 torch.Tensors
可能会变得昂贵,尤其是在设备之间传输时。这是因为 Ray Core 不知道最终目标设备。因此,您可能会看到源设备和目标设备以外的设备之间进行不必要的复制。
Ray 编译图自带对在不同设备上执行的 Actor 之间传递 torch.Tensors
的原生支持。开发人员现在可以在编译图声明中使用类型提示注解来指示 torch.Tensor
的最终目标设备。
import torch
import ray
import ray.dag
@ray.remote(num_gpus=1)
class GPUActor:
def process(self, tensor: torch.Tensor):
assert tensor.device.type == "cuda"
return tensor.shape
actor = GPUActor.remote()
在 Ray Core 中,如果您尝试从驱动程序传递 CPU 张量,GPU Actor 会接收到 CPU 张量
# This will fail because the driver passes a CPU copy of the tensor,
# and the GPU actor also receives a CPU copy.
ray.get(actor.process.remote(torch.zeros(10)))
使用 Ray 编译图,您可以用类型提示注解 DAG 节点,以指示值中可能包含 torch.Tensor
with ray.dag.InputNode() as inp:
inp = inp.with_tensor_transport(device="cuda")
dag = actor.process.bind(inp)
cdag = dag.experimental_compile()
print(ray.get(cdag.execute(torch.zeros(10))))
在底层,Ray 编译图后端将 torch.Tensor
复制到 Ray Core 分配给 GPUActor
的 GPU 上。
当然,您也可以自己完成,但使用编译图有其优势
Ray 编译图可以最大限度地减少数据复制次数。例如,从一个 CPU 传递到多个 GPU 需要一次复制到共享内存缓冲区,然后每个目标 GPU 需要一次主机到设备的复制。
将来,这可以通过诸如 内存锁定 等技术进一步优化,当 CPU 是目标时使用零拷贝反序列化等。
GPU 到 GPU 通信#
Ray 编译图支持基于 NCCL 的 CUDA torch.Tensor
对象传输,避免通过 Ray 基于 CPU 的共享内存对象存储进行任何复制。通过用户提供的类型提示,Ray 可以提前准备 NCCL 通信器和操作调度,从而避免死锁并实现计算与通信的重叠。
Ray 编译图底层使用 cupy 来支持 NCCL 操作。cupy 版本会影响 NCCL 版本。Ray 团队未来还计划支持自定义通信器,例如支持跨 CPU 的集合操作或重用现有集合组。
首先,创建发送方和接收方 Actor。请注意,此示例需要至少 2 个 GPU。
import torch
import ray
import ray.dag
from ray.experimental.channel.torch_tensor_type import TorchTensorType
# Note that the following example requires at least 2 GPUs.
assert (
ray.available_resources().get("GPU") >= 2
), "At least 2 GPUs are required to run this example."
@ray.remote(num_gpus=1)
class GPUSender:
def send(self, shape):
return torch.zeros(shape, device="cuda")
@ray.remote(num_gpus=1)
class GPUReceiver:
def recv(self, tensor: torch.Tensor):
assert tensor.device.type == "cuda"
return tensor.shape
sender = GPUSender.remote()
receiver = GPUReceiver.remote()
为了支持使用 NCCL 进行 GPU 到 GPU 通信,请使用 with_tensor_transport
API 提示包装包含要传输的 torch.Tensor
的 DAG 节点
with ray.dag.InputNode() as inp:
dag = sender.send.bind(inp)
# Add a type hint that the return value of `send` should use NCCL.
dag = dag.with_tensor_transport("nccl")
# NOTE: With ray<2.42, use `with_type_hint()` instead.
# dag = dag.with_type_hint(TorchTensorType(transport="nccl"))
dag = receiver.recv.bind(dag)
# Compile API prepares the NCCL communicator across all workers and schedule operations
# accordingly.
dag = dag.experimental_compile()
assert ray.get(dag.execute((10,))) == (10,)
当前的限制包括
仅支持
torch.Tensor
和 NVIDIA NCCL支持点对点传输。集合通信操作即将推出。
通信操作目前是同步进行的。计算与通信的重叠 是一个实验性功能。