AsyncIO / Concurrency for Actors#

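Within a single actor process, it is possible to execute concurrent threads.

Ray offers two types of concurrency within an actor: async execution and threading.

Keep in mind that Python's Global Interpreter Lock (GIL) allows only one thread of Python code to run at a time.

This means that if you are only parallelizing Python code, you will not get true parallelism. If you call Numpy, Cython, Tensorflow, or PyTorch code, these libraries release the GIL when calling into C/C++ functions.

Neither the Threaded Actors model nor the AsyncIO for Actors model lets you bypass the GIL.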
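
To make the GIL point concrete, here is a minimal sketch that uses only the standard library (no Ray). The function `count_down` and the workload size are hypothetical choices for illustration. On a standard CPython build with the GIL, the threaded run typically takes about as long as the sequential one, because only one thread executes Python bytecode at a time; exact timings vary by machine and interpreter build.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def count_down(n: int) -> int:
    """Pure-Python CPU-bound loop; it needs the GIL for every iteration."""
    steps = 0
    while n > 0:
        n -= 1
        steps += 1
    return steps


def run_sequential(work: list[int]) -> tuple[list[int], float]:
    """Run every item one after another and return (results, seconds)."""
    start = time.perf_counter()
    results = [count_down(n) for n in work]
    return results, time.perf_counter() - start


def run_threaded(work: list[int]) -> tuple[list[int], float]:
    """Run every item in its own thread and return (results, seconds)."""
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=len(work)) as pool:
        results = list(pool.map(count_down, work))
    return results, time.perf_counter() - start


if __name__ == "__main__":
    work = [1_000_000] * 4
    _, seq_time = run_sequential(work)
    _, thr_time = run_threaded(work)
    print(f"sequential: {seq_time:.2f}s, threaded: {thr_time:.2f}s")
```

Both helpers return the same results; only the elapsed time is of interest when comparing them.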

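AsyncIO for Actors#

Since Python 3.5, it is possible to write concurrent code using the async/await syntax. Ray natively integrates with asyncio. You can use Ray alongside popular async frameworks such as aiohttp and aioredis.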

import ray
import asyncio

@ray.remote
class AsyncActor:
    def __init__(self, expected_num_tasks: int):
        self._event = asyncio.Event()
        self._curr_num_tasks = 0
        self._expected_num_tasks = expected_num_tasks

    # Multiple invocations of this method can run concurrently on the same event loop.
    async def run_concurrent(self):
        self._curr_num_tasks += 1
        if self._curr_num_tasks == self._expected_num_tasks:
            print("All coroutines are executing concurrently, unblocking.")
            self._event.set()
        else:
            print("Waiting for other coroutines to start.")

        await self._event.wait()
        print("All coroutines ran concurrently.")

actor = AsyncActor.remote(4)
refs = [actor.run_concurrent.remote() for _ in range(4)]

# Fetch results using regular `ray.get`.
ray.get(refs)

# Fetch results using `asyncio` APIs.
async def get_async():
    return await asyncio.gather(*refs)
asyncio.run(get_async())
(AsyncActor pid=9064) Waiting for other coroutines to start.
(AsyncActor pid=9064) Waiting for other coroutines to start.
(AsyncActor pid=9064) Waiting for other coroutines to start.
(AsyncActor pid=9064) All coroutines are executing concurrently, unblocking.
(AsyncActor pid=9064) All coroutines ran concurrently.
(AsyncActor pid=9064) All coroutines ran concurrently.
(AsyncActor pid=9064) All coroutines ran concurrently.
(AsyncActor pid=9064) All coroutines ran concurrently.
...

ObjectRefs as asyncio.Futures#

ObjectRefs can be translated to asyncio.Futures. This feature makes it possible to await Ray futures in existing concurrent applications.

Instead of:

import ray

@ray.remote
def some_task():
    return 1

ray.get(some_task.remote())
ray.wait([some_task.remote()])

you can await the ref with Python 3.9 and Python 3.10:

import ray
import asyncio

@ray.remote
def some_task():
    return 1

async def await_obj_ref():
    await some_task.remote()
    await asyncio.wait([some_task.remote()])

asyncio.run(await_obj_ref())

or use the Future object directly with Python 3.11+:

import asyncio

async def convert_to_asyncio_future():
    ref = some_task.remote()
    fut: asyncio.Future = asyncio.wrap_future(ref.future())
    print(await fut)
asyncio.run(convert_to_asyncio_future())
1

See the asyncio documentation for more asyncio patterns, including timeouts and asyncio.gather.
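
As an illustration of those patterns, the sketch below applies `asyncio.gather` and `asyncio.wait_for` to `concurrent.futures.Future` objects wrapped with `asyncio.wrap_future`, the same wrapping applied to `ref.future()` above. To stay runnable without a cluster, it uses a `ThreadPoolExecutor` as a stand-in for Ray; `slow_square`, `result_or_none`, and the delay and timeout values are hypothetical.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def slow_square(x: int, delay: float) -> int:
    """Stand-in for remote work: sleep, then return a result."""
    time.sleep(delay)
    return x * x


async def gather_results(pool: ThreadPoolExecutor) -> list[int]:
    # Wrap each concurrent.futures.Future so the event loop can await it.
    futs = [asyncio.wrap_future(pool.submit(slow_square, i, 0.05)) for i in range(4)]
    # gather returns results in submission order, not completion order.
    return await asyncio.gather(*futs)


async def result_or_none(pool: ThreadPoolExecutor, timeout: float):
    fut = asyncio.wrap_future(pool.submit(slow_square, 3, 0.5))
    try:
        return await asyncio.wait_for(fut, timeout=timeout)
    except asyncio.TimeoutError:
        # The await gave up; the worker thread that already started is not interrupted.
        return None


async def main():
    # Leaving the with-block waits for any still-running worker to finish.
    with ThreadPoolExecutor(max_workers=4) as pool:
        squares = await gather_results(pool)
        timed_out = await result_or_none(pool, timeout=0.1)
    return squares, timed_out


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Here the timeout only stops the caller from waiting; whether the underlying work can be cancelled depends on the system that produced the future.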

ObjectRefs as concurrent.futures.Futures#

ObjectRefs can also be wrapped into concurrent.futures.Future objects. This is useful for interfacing with existing concurrent.futures APIs:

import concurrent.futures

refs = [some_task.remote() for _ in range(4)]
futs = [ref.future() for ref in refs]
for fut in concurrent.futures.as_completed(futs):
    assert fut.done()
    print(fut.result())
1
1
1
1

Defining an Async Actor#

When an actor class defines async methods, Ray automatically detects that the actor supports async calls.

import ray
import asyncio


@ray.remote
class AsyncActor:
    def __init__(self, expected_num_tasks: int):
        self._event = asyncio.Event()
        self._curr_num_tasks = 0
        self._expected_num_tasks = expected_num_tasks

    async def run_task(self):
        print("Started task")
        self._curr_num_tasks += 1
        if self._curr_num_tasks == self._expected_num_tasks:
            self._event.set()
        else:
            # Yield the event loop for multiple coroutines to run concurrently.
            await self._event.wait()

        print("Finished task")

actor = AsyncActor.remote(5)
# All 5 tasks will start at once and run concurrently.
ray.get([actor.run_task.remote() for _ in range(5)])
(AsyncActor pid=3456) Started task
(AsyncActor pid=3456) Started task
(AsyncActor pid=3456) Started task
(AsyncActor pid=3456) Started task
(AsyncActor pid=3456) Started task
(AsyncActor pid=3456) Finished task
(AsyncActor pid=3456) Finished task
(AsyncActor pid=3456) Finished task
(AsyncActor pid=3456) Finished task
(AsyncActor pid=3456) Finished task

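Under the hood, Ray runs all of the methods inside a single Python event loop. Note that running blocking ray.get or ray.wait inside an async actor method is not allowed, because ray.get blocks the execution of the event loop.

In async actors, only one task can be running at any point in time (though tasks can be multiplexed). There is only one thread in an AsyncActor! See Threaded Actors if you want a thread pool.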
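
The effect of a blocking call on a single-threaded event loop can be seen without Ray. In this minimal sketch, `time.sleep` plays the role of a blocking call such as `ray.get`, and `asyncio.sleep` plays the role of an awaited call; the worker and helper names are hypothetical.

```python
import asyncio
import time


async def blocking_worker(name: str, log: list[str]) -> None:
    log.append(f"{name} start")
    time.sleep(0.05)  # Blocks the only thread; no other coroutine can run.
    log.append(f"{name} end")


async def cooperative_worker(name: str, log: list[str]) -> None:
    log.append(f"{name} start")
    await asyncio.sleep(0.05)  # Yields to the event loop while waiting.
    log.append(f"{name} end")


async def run_pair(worker) -> list[str]:
    """Run two copies of `worker` on one event loop and return the event log."""
    log: list[str] = []
    await asyncio.gather(worker("a", log), worker("b", log))
    return log


if __name__ == "__main__":
    print(asyncio.run(run_pair(blocking_worker)))
    print(asyncio.run(run_pair(cooperative_worker)))
```

With the blocking variant, "b" cannot start until "a" has ended. With the cooperative variant, both workers start before either one ends.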

Setting concurrency in Async Actors#

You can set the number of "concurrent" tasks running at once using the max_concurrency flag. By default, up to 1000 tasks can run at once.

import asyncio
import ray

@ray.remote
class AsyncActor:
    def __init__(self, batch_size: int):
        self._event = asyncio.Event()
        self._curr_tasks = 0
        self._batch_size = batch_size

    async def run_task(self):
        print("Started task")
        self._curr_tasks += 1
        if self._curr_tasks == self._batch_size:
            self._event.set()
        else:
            await self._event.wait()
            self._event.clear()
            self._curr_tasks = 0

        print("Finished task")

actor = AsyncActor.options(max_concurrency=2).remote(2)

# Only 2 tasks will run concurrently.
# Once 2 finish, the next 2 should run.
ray.get([actor.run_task.remote() for _ in range(8)])
(AsyncActor pid=5859) Started task
(AsyncActor pid=5859) Started task
(AsyncActor pid=5859) Finished task
(AsyncActor pid=5859) Finished task
(AsyncActor pid=5859) Started task
(AsyncActor pid=5859) Started task
(AsyncActor pid=5859) Finished task
(AsyncActor pid=5859) Finished task
(AsyncActor pid=5859) Started task
(AsyncActor pid=5859) Started task
(AsyncActor pid=5859) Finished task
(AsyncActor pid=5859) Finished task
(AsyncActor pid=5859) Started task
(AsyncActor pid=5859) Started task
(AsyncActor pid=5859) Finished task
(AsyncActor pid=5859) Finished task
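
A similar cap can be approximated in plain asyncio with a semaphore. This is only an analogy for what a concurrency limit does, not a description of how Ray implements `max_concurrency`; `LimitedRunner` and its attributes are hypothetical names.

```python
import asyncio


class LimitedRunner:
    """Runs coroutines on one event loop with at most `limit` in flight."""

    def __init__(self, limit: int):
        self._sem = asyncio.Semaphore(limit)
        self._running = 0
        self.peak = 0  # Highest number of tasks observed running at once.

    async def run_task(self, i: int) -> int:
        # Tasks beyond the limit wait here until a slot frees up.
        async with self._sem:
            self._running += 1
            self.peak = max(self.peak, self._running)
            await asyncio.sleep(0.01)  # Simulated asynchronous work.
            self._running -= 1
            return i


async def main() -> tuple[list[int], int]:
    runner = LimitedRunner(limit=2)
    results = await asyncio.gather(*(runner.run_task(i) for i in range(8)))
    return results, runner.peak


if __name__ == "__main__":
    print(asyncio.run(main()))
```

All eight tasks complete, but the recorded peak never exceeds the limit of 2.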

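Threaded Actors#

Sometimes, asyncio is not an ideal solution for your actor. For example, you may have a method that performs a computation-heavy task while blocking the event loop, without giving up control via await. This hurts the performance of an async actor, because an async actor runs only one task at any instant and relies on await to context switch.

Instead, you can use the max_concurrency actor option without any async methods, which gives you threaded concurrency (like a thread pool).

Warning

When there is at least one async def method in the actor definition, Ray recognizes the actor as an AsyncActor instead of a ThreadedActor.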

import ray

@ray.remote
class ThreadedActor:
    def task_1(self):
        print("I'm running in a thread!")

    def task_2(self):
        print("I'm running in another thread!")

# No async methods are defined, so max_concurrency sets the thread pool size.
a = ThreadedActor.options(max_concurrency=2).remote()
ray.get([a.task_1.remote(), a.task_2.remote()])
(ThreadedActor pid=4822) I'm running in a thread!
(ThreadedActor pid=4822) I'm running in another thread!

Each invocation of a threaded actor method runs in a thread pool. The size of the thread pool is limited by the max_concurrency value.
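
As a rough standard-library analogy (not Ray's internals), a `ThreadPoolExecutor` with `max_workers=2` shows what a bounded thread pool implies: calls beyond the limit wait for a free thread. The `PeakTracker` helper and `run_bounded` function are hypothetical. Because threads can interleave at any point, the shared counters are guarded with a lock in this sketch.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


class PeakTracker:
    """Records how many calls are inside `work` at the same time."""

    def __init__(self):
        self._lock = threading.Lock()
        self._active = 0
        self.peak = 0

    def work(self, i: int) -> int:
        with self._lock:
            self._active += 1
            self.peak = max(self.peak, self._active)
        time.sleep(0.02)  # Blocking call; other pool threads may run meanwhile.
        with self._lock:
            self._active -= 1
        return i


def run_bounded(num_calls: int, max_workers: int) -> tuple[list[int], int]:
    """Submit `num_calls` calls to a pool of `max_workers` threads."""
    tracker = PeakTracker()
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        results = list(pool.map(tracker.work, range(num_calls)))
    return results, tracker.peak


if __name__ == "__main__":
    results, peak = run_bounded(num_calls=6, max_workers=2)
    print(results, peak)
```

The recorded peak stays at or below `max_workers`, however many calls are submitted.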

AsyncIO for Remote Tasks#

Ray does not support asyncio for remote tasks. The following snippet will fail:

@ray.remote
async def f():
    pass

Instead, you can wrap the async function in a wrapper that runs the task synchronously:

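async def f():
    pass

@ray.remote
def wrapper():
    import asyncio
    # Run the coroutine to completion on a new event loop inside the task
    # and hand its result back to the caller.
    return asyncio.run(f())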