带外通信#
通常,Ray actor 间的通信通过 actor 方法调用完成,数据通过分布式对象存储共享。然而,在某些用例中,带外通信可能很有用。
封装库进程#
许多库已经拥有成熟、高性能的内部通信栈,它们将 Ray 作为语言集成的 actor 调度器来利用。actor 之间的实际通信大多是使用现有通信栈进行带外完成的。例如,Horovod-on-Ray 使用 NCCL 或基于 MPI 的集体通信,而 RayDP 使用 Spark 的内部 RPC 和对象管理器。更多详情请参阅 Ray 分布式库模式。
Ray Collective#
Ray 的集体通信库(ray.util.collective
)允许分布式 CPU 或 GPU 之间进行高效的带外集体和点对点通信。更多详情请参阅 Ray Collective。
HTTP 服务器#
您可以在 actor 内部启动一个 HTTP 服务器,并向客户端暴露 HTTP 端点,这样 Ray 集群外部的用户就可以与该 actor 通信。
import ray
import asyncio
import requests
from aiohttp import web
@ray.remote
class Counter:
async def __init__(self):
self.counter = 0
asyncio.get_running_loop().create_task(self.run_http_server())
async def run_http_server(self):
app = web.Application()
app.add_routes([web.get("/", self.get)])
runner = web.AppRunner(app)
await runner.setup()
site = web.TCPSite(runner, "127.0.0.1", 25001)
await site.start()
async def get(self, request):
return web.Response(text=str(self.counter))
async def increment(self):
self.counter = self.counter + 1
ray.init()
counter = Counter.remote()
[ray.get(counter.increment.remote()) for i in range(5)]
r = requests.get("http://127.0.0.1:25001/")
assert r.text == "5"
类似地,您也可以暴露其他类型的服务器(例如 gRPC 服务器)。
限制#
使用 Ray actor 进行带外通信时,请记住 Ray 不管理 actor 之间的调用。这意味着像分布式引用计数这样的功能将无法通过带外通信工作,因此您应该注意不要以这种方式传递对象引用。