AsyncIO / Concurrency for Actors

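Within a single actor process, it is possible to execute concurrent threads.

Ray offers two types of concurrency within an actor: async execution and threading.

Keep in mind that Python's Global Interpreter Lock (GIL) allows only one thread of Python code to run at a time.

This means that if you are only parallelizing pure Python code, you will not get true parallelism. If you call Numpy, Cython, Tensorflow, or PyTorch code, these libraries release the GIL when calling into C/C++ functions.

Neither the Threaded Actors model nor the AsyncIO for Actors model lets you bypass the GIL.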

AsyncIO for Actors

Since Python 3.5, you can write concurrent code using the async/await syntax. Ray natively integrates with asyncio, so you can use Ray alongside popular async frameworks such as aiohttp and aioredis.

import ray
import asyncio

@ray.remote
class AsyncActor:
    # multiple invocation of this method can be running in
    # the event loop at the same time
    async def run_concurrent(self):
        print("started")
        await asyncio.sleep(2) # concurrent workload here
        print("finished")

actor = AsyncActor.remote()

# regular ray.get
ray.get([actor.run_concurrent.remote() for _ in range(4)])

# async ray.get
async def async_get():
    await actor.run_concurrent.remote()
asyncio.run(async_get())
(AsyncActor pid=40293) started
(AsyncActor pid=40293) started
(AsyncActor pid=40293) started
(AsyncActor pid=40293) started
(AsyncActor pid=40293) finished
(AsyncActor pid=40293) finished
(AsyncActor pid=40293) finished
(AsyncActor pid=40293) finished
...
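To see why the four "started" lines appear before any "finished" line, here is a plain-asyncio sketch with no Ray involved. It mimics run_concurrent under the assumption that the actor's methods share one event loop on one thread, as the text later describes. The coroutine name and the 0.2 s delay are illustrative choices.

```python
import asyncio
import threading
import time

async def run_concurrent(log, delay):
    # Record which thread runs the coroutine; in one event loop it is always the same thread.
    log.append(("started", threading.get_ident()))
    await asyncio.sleep(delay)  # yields control so the other coroutines can start
    log.append(("finished", threading.get_ident()))

async def main(n=4, delay=0.2):
    log = []
    start = time.perf_counter()
    # Schedule all n coroutines on the same loop and wait for all of them.
    await asyncio.gather(*(run_concurrent(log, delay) for _ in range(n)))
    return log, time.perf_counter() - start

log, elapsed = asyncio.run(main())
print([event for event, _ in log])
print(f"elapsed: {elapsed:.2f}s")
```

The total time stays close to one delay rather than four, because each `await asyncio.sleep` hands control back to the loop.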

ObjectRefs as asyncio.Futures

ObjectRefs can be translated to asyncio.Futures. This makes it possible to await Ray futures inside existing concurrent applications.

Instead of:

import ray

@ray.remote
def some_task():
    return 1

ray.get(some_task.remote())
ray.wait([some_task.remote()])

you can do:

import ray
import asyncio

@ray.remote
def some_task():
    return 1

async def await_obj_ref():
    await some_task.remote()
    await asyncio.wait([some_task.remote()])

asyncio.run(await_obj_ref())

See the asyncio documentation for more asyncio patterns, including timeouts and asyncio.gather.

If you need direct access to the future object, you can call:
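The two patterns named above look roughly like this in plain asyncio. This sketch assumes, as the text suggests, that awaitable ObjectRefs can be used wherever these functions expect awaitables. To keep it runnable without a cluster, the hypothetical coroutine `fake_remote_call` stands in for `some_task.remote()`.

```python
import asyncio

async def fake_remote_call(value, delay):
    # Hypothetical stand-in for an awaitable Ray call; it only sleeps and returns.
    await asyncio.sleep(delay)
    return value

async def main():
    # asyncio.gather: await several awaitables concurrently; results keep argument order.
    results = await asyncio.gather(*(fake_remote_call(i, 0.05) for i in range(3)))
    # asyncio.wait_for: stop waiting if the awaitable does not finish in time.
    try:
        await asyncio.wait_for(fake_remote_call("slow", 1.0), timeout=0.1)
        timed_out = False
    except asyncio.TimeoutError:
        timed_out = True
    return results, timed_out

results, timed_out = asyncio.run(main())
print(results, timed_out)  # [0, 1, 2] True
```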

import asyncio

async def convert_to_asyncio_future():
    ref = some_task.remote()
    fut: asyncio.Future = asyncio.wrap_future(ref.future())
    print(await fut)
asyncio.run(convert_to_asyncio_future())
1
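`asyncio.wrap_future` is a standard-library function that bridges any `concurrent.futures.Future` into the running event loop. The following sketch shows that bridge without Ray. It assumes `ref.future()` returns a `concurrent.futures.Future`, which the next section states. A `ThreadPoolExecutor` produces the future here instead.

```python
import asyncio
import concurrent.futures

def some_task():
    return 1

async def main():
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
        # A concurrent.futures.Future, standing in for what ref.future() returns.
        cf_future = pool.submit(some_task)
        # Wrap it into an asyncio.Future bound to the currently running loop.
        fut = asyncio.wrap_future(cf_future)
        return await fut

result = asyncio.run(main())
print(result)  # 1
```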

ObjectRefs as concurrent.futures.Futures

ObjectRefs can also be wrapped into concurrent.futures.Future objects. This is useful for interfacing with existing concurrent.futures APIs:

import concurrent.futures

refs = [some_task.remote() for _ in range(4)]
futs = [ref.future() for ref in refs]
for fut in concurrent.futures.as_completed(futs):
    assert fut.done()
    print(fut.result())
1
1
1
1
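`as_completed` is one of several `concurrent.futures` helpers. As a further sketch, `concurrent.futures.wait` with `FIRST_COMPLETED` returns as soon as any future finishes. Because the futures below come from a standard `ThreadPoolExecutor` rather than `ref.future()`, the example runs without Ray. The delays are illustrative.

```python
import concurrent.futures
import time

def some_task(delay):
    time.sleep(delay)  # simulated work of varying length
    return delay

with concurrent.futures.ThreadPoolExecutor(max_workers=4) as pool:
    futs = [pool.submit(some_task, d) for d in (0.3, 0.05, 0.3, 0.3)]
    # Block only until the first future completes.
    done, not_done = concurrent.futures.wait(
        futs, return_when=concurrent.futures.FIRST_COMPLETED
    )
    first = next(iter(done)).result()
    # Then collect every result (this waits for the remaining futures).
    all_results = sorted(f.result() for f in futs)

print(first, all_results)
```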

Defining an Async Actor

When you define methods with async, Ray automatically detects whether an actor supports async calls.

import asyncio

@ray.remote
class AsyncActor:
    async def run_task(self):
        print("started")
        await asyncio.sleep(2) # Network, I/O task here
        print("ended")

actor = AsyncActor.remote()
# All 5 tasks should start at once. After 2 seconds they should all
# finish at the same time.
ray.get([actor.run_task.remote() for _ in range(5)])
(AsyncActor pid=3456) started
(AsyncActor pid=3456) started
(AsyncActor pid=3456) started
(AsyncActor pid=3456) started
(AsyncActor pid=3456) started
(AsyncActor pid=3456) ended
(AsyncActor pid=3456) ended
(AsyncActor pid=3456) ended
(AsyncActor pid=3456) ended
(AsyncActor pid=3456) ended
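This sketch shows how a class can be checked for `async def` methods, which is the criterion the text gives for detecting async actors. The helper `looks_like_async_actor` is hypothetical. It approximates the rule using the standard `inspect` module and does not reproduce Ray's actual detection code.

```python
import inspect

def looks_like_async_actor(cls):
    # Hypothetical helper: True if any function defined on the class is an `async def`.
    return any(
        inspect.iscoroutinefunction(member)
        for _, member in inspect.getmembers(cls, predicate=inspect.isfunction)
    )

class AsyncActor:
    async def run_task(self):
        pass

class ThreadedActor:
    def task_1(self):
        pass

print(looks_like_async_actor(AsyncActor), looks_like_async_actor(ThreadedActor))  # True False
```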

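Under the hood, Ray runs all of the methods inside a single Python event loop. Note that running a blocking ray.get or ray.wait inside an async actor method is not allowed, because ray.get blocks the execution of the event loop.

In async actors, only one task can run at any point in time (though tasks can be multiplexed). There is only one thread in an AsyncActor! If you want a thread pool, see Threaded Actors.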
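The cost of blocking the loop can be shown in plain asyncio. In this sketch, `time.sleep` plays the role of a blocking call such as a synchronous ray.get, and `await asyncio.sleep` plays the role of awaiting an ObjectRef. The mapping to Ray is an analogy, not a measurement of Ray itself.

```python
import asyncio
import time

async def blocking_task(delay):
    time.sleep(delay)  # blocks the whole event loop; no other coroutine can run
    return "blocking"

async def cooperative_task(delay):
    await asyncio.sleep(delay)  # yields to the loop so other coroutines can run
    return "cooperative"

async def timed(make_coro, n=4, delay=0.1):
    start = time.perf_counter()
    await asyncio.gather(*(make_coro(delay) for _ in range(n)))
    return time.perf_counter() - start

blocking_elapsed = asyncio.run(timed(blocking_task))        # roughly n * delay
cooperative_elapsed = asyncio.run(timed(cooperative_task))  # roughly one delay
print(f"blocking: {blocking_elapsed:.2f}s, cooperative: {cooperative_elapsed:.2f}s")
```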

Setting concurrency in Async Actors

You can set the number of "concurrent" tasks running at once with the max_concurrency flag. By default, up to 1000 tasks can run concurrently.

import asyncio

@ray.remote
class AsyncActor:
    async def run_task(self):
        print("started")
        await asyncio.sleep(1) # Network, I/O task here
        print("ended")

actor = AsyncActor.options(max_concurrency=2).remote()

# Only 2 tasks will be running concurrently. Once 2 finish, the next 2 should run.
ray.get([actor.run_task.remote() for _ in range(8)])
(AsyncActor pid=5859) started
(AsyncActor pid=5859) started
(AsyncActor pid=5859) ended
(AsyncActor pid=5859) ended
(AsyncActor pid=5859) started
(AsyncActor pid=5859) started
(AsyncActor pid=5859) ended
(AsyncActor pid=5859) ended
(AsyncActor pid=5859) started
(AsyncActor pid=5859) started
(AsyncActor pid=5859) ended
(AsyncActor pid=5859) ended
(AsyncActor pid=5859) started
(AsyncActor pid=5859) started
(AsyncActor pid=5859) ended
(AsyncActor pid=5859) ended
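The output above behaves like a concurrency cap of 2. This plain-asyncio sketch reproduces that behavior with `asyncio.Semaphore`. It only models the observable limit and makes no claim about how Ray implements max_concurrency internally. The `state` dictionary exists solely to record the peak number of tasks running at once.

```python
import asyncio

async def run_task(sem, state):
    async with sem:  # at most `max_concurrency` coroutines get past this point at once
        state["running"] += 1
        state["peak"] = max(state["peak"], state["running"])
        await asyncio.sleep(0.05)  # stand-in for network / I/O work
        state["running"] -= 1

async def main(max_concurrency=2, n_tasks=8):
    sem = asyncio.Semaphore(max_concurrency)
    state = {"running": 0, "peak": 0}
    await asyncio.gather(*(run_task(sem, state) for _ in range(n_tasks)))
    return state["peak"]

peak = asyncio.run(main())
print(peak)  # 2
```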

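Threaded Actors

Sometimes asyncio is not an ideal solution for your actor. For example, one method might perform a computation-heavy task that blocks the event loop and never gives up control via await. This hurts the performance of an Async Actor, because an Async Actor can execute only one task at a time and relies on await to switch context.

Instead, you can use the max_concurrency actor option without any async methods, which gives you threaded concurrency (like a thread pool).

Warning

When the actor definition contains at least one async def method, Ray recognizes the actor as an AsyncActor instead of a ThreadedActor.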

@ray.remote
class ThreadedActor:
    def task_1(self): print("I'm running in a thread!")
    def task_2(self): print("I'm running in another thread!")

a = ThreadedActor.options(max_concurrency=2).remote()
ray.get([a.task_1.remote(), a.task_2.remote()])
(ThreadedActor pid=4822) I'm running in a thread!
(ThreadedActor pid=4822) I'm running in another thread!

Each invocation of a threaded actor runs in a thread pool. The size of the thread pool is limited by the max_concurrency value.
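A standard-library analogy for this thread pool is `ThreadPoolExecutor`, with `max_workers` standing in for max_concurrency. The sketch below records how many calls overlap and how many distinct threads ran them. It illustrates the thread-pool idea only and does not claim to reproduce Ray's scheduler.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

lock = threading.Lock()
state = {"running": 0, "peak": 0}
thread_ids = set()

def task(name):
    # Plain synchronous method body, as in a threaded actor.
    with lock:
        state["running"] += 1
        state["peak"] = max(state["peak"], state["running"])
        thread_ids.add(threading.get_ident())
    time.sleep(0.05)  # simulated work
    with lock:
        state["running"] -= 1
    return name

# max_workers plays the role of max_concurrency in this analogy.
with ThreadPoolExecutor(max_workers=2) as pool:
    results = list(pool.map(task, ["task_1", "task_2", "task_3", "task_4"]))

print(results, state["peak"], len(thread_ids))
```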

AsyncIO for Remote Tasks

We don't support asyncio for remote tasks. The following snippet will fail:

@ray.remote
async def f():
    pass

Instead, you can wrap the async function in a wrapper that runs the task synchronously:

async def f():
    pass

@ray.remote
def wrapper():
    import asyncio
    asyncio.run(f())
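The same wrapper pattern can also return a value. This sketch is plain Python: it calls `wrapper` directly instead of through `@ray.remote` and `.remote()`, and the parameter `x` on `f` is an illustrative addition not present in the snippet above.

```python
import asyncio

async def f(x):
    # Hypothetical async workload; a real one would await I/O here.
    await asyncio.sleep(0.01)
    return x * 2

def wrapper(x):
    # asyncio.run creates a fresh event loop, runs f to completion, then closes the loop.
    return asyncio.run(f(x))

print(wrapper(21))  # 42
```

Note that asyncio.run raises a RuntimeError if an event loop is already running in the calling thread, so this pattern suits a synchronous entry point such as a plain remote function.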