AsyncIO / Concurrency for Actors
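Within a single actor process, it is possible to execute concurrent threads.
Ray offers two types of concurrency within an actor: async execution (AsyncIO) and threading.
Keep in mind that the Python Global Interpreter Lock (GIL) allows only one thread of Python code to run at a time.
This means that if you are only parallelizing Python code, you won't get true parallelism. If you call Numpy, Cython, Tensorflow, or PyTorch code, these libraries release the GIL when calling into C/C++ functions.
Neither the Threaded Actors model nor the AsyncIO for Actors model lets you bypass the GIL.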
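As a rough illustration of the GIL's effect, the sketch below uses only the standard library (no Ray) to run the same pure-Python loop serially and on separate threads. The names `count_up`, `run_serial`, `run_threaded`, and `timed` are hypothetical helpers, not Ray APIs. On a standard CPython build the threaded run is usually not meaningfully faster, although exact timings depend on the machine and interpreter.

```python
import threading
import time


def count_up(n: int) -> int:
    """CPU-bound pure-Python loop; the running thread holds the GIL."""
    total = 0
    for i in range(n):
        total += i
    return total


def run_serial(n: int, repeats: int) -> list[int]:
    """Run the loop `repeats` times, one after another."""
    return [count_up(n) for _ in range(repeats)]


def run_threaded(n: int, repeats: int) -> list[int]:
    """Run the loop `repeats` times, each on its own thread."""
    results = [0] * repeats

    def worker(slot: int) -> None:
        results[slot] = count_up(n)

    threads = [threading.Thread(target=worker, args=(i,)) for i in range(repeats)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results


def timed(fn, *args):
    """Return (result, elapsed_seconds) for a single call."""
    start = time.perf_counter()
    out = fn(*args)
    return out, time.perf_counter() - start


if __name__ == "__main__":
    _, serial_s = timed(run_serial, 2_000_000, 2)
    _, threaded_s = timed(run_threaded, 2_000_000, 2)
    print(f"serial:   {serial_s:.3f}s")
    print(f"threaded: {threaded_s:.3f}s")
```

Both variants compute the same results; only the wall-clock time is of interest here.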
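AsyncIO for Actors
Since Python 3.5, it has been possible to write concurrent code using the async/await syntax. Ray natively integrates with asyncio. You can use Ray alongside popular async frameworks such as aiohttp and aioredis.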
import ray
import asyncio


@ray.remote
class AsyncActor:
    def __init__(self, expected_num_tasks: int):
        self._event = asyncio.Event()
        self._curr_num_tasks = 0
        self._expected_num_tasks = expected_num_tasks

    # Multiple invocations of this method can run concurrently on the same event loop.
    async def run_concurrent(self):
        self._curr_num_tasks += 1
        if self._curr_num_tasks == self._expected_num_tasks:
            print("All coroutines are executing concurrently, unblocking.")
            self._event.set()
        else:
            print("Waiting for other coroutines to start.")

        await self._event.wait()
        print("All coroutines ran concurrently.")


actor = AsyncActor.remote(4)
refs = [actor.run_concurrent.remote() for _ in range(4)]

# Fetch results using regular `ray.get`.
ray.get(refs)

# Fetch results using `asyncio` APIs.
async def get_async():
    return await asyncio.gather(*refs)

asyncio.run(get_async())
(AsyncActor pid=9064) Waiting for other coroutines to start.
(AsyncActor pid=9064) Waiting for other coroutines to start.
(AsyncActor pid=9064) Waiting for other coroutines to start.
(AsyncActor pid=9064) All coroutines are executing concurrently, unblocking.
(AsyncActor pid=9064) All coroutines ran concurrently.
(AsyncActor pid=9064) All coroutines ran concurrently.
(AsyncActor pid=9064) All coroutines ran concurrently.
(AsyncActor pid=9064) All coroutines ran concurrently.
...
ObjectRefs as asyncio.Futures
ObjectRefs can be translated to asyncio.Futures. This makes it possible to await Ray futures in existing concurrent applications.
Instead of:
import ray


@ray.remote
def some_task():
    return 1


ray.get(some_task.remote())
ray.wait([some_task.remote()])
you can await the ref with Python 3.9 and Python 3.10:
import ray
import asyncio


@ray.remote
def some_task():
    return 1


async def await_obj_ref():
    await some_task.remote()
    await asyncio.wait([some_task.remote()])


asyncio.run(await_obj_ref())
Or use the Future object directly with Python 3.11+:
import asyncio


async def convert_to_asyncio_future():
    ref = some_task.remote()
    fut: asyncio.Future = asyncio.wrap_future(ref.future())
    print(await fut)


asyncio.run(convert_to_asyncio_future())
1
See the asyncio documentation for more asyncio patterns, including timeouts and asyncio.gather.
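As a minimal sketch of the two patterns named above, the following standard-library example uses a hypothetical `fake_task` coroutine as a stand-in for awaiting `some_task.remote()`, so it runs without a Ray cluster. The delays and return values are arbitrary placeholders.

```python
import asyncio


async def fake_task(delay: float, value: int) -> int:
    """Hypothetical stand-in for awaiting a Ray ObjectRef."""
    await asyncio.sleep(delay)
    return value


async def gather_all() -> list[int]:
    # asyncio.gather returns results in the order the awaitables were passed,
    # regardless of which one finishes first.
    return await asyncio.gather(fake_task(0.02, 1), fake_task(0.01, 2))


async def with_timeout(delay: float, timeout: float) -> str:
    # asyncio.wait_for cancels the awaitable and raises on timeout.
    try:
        value = await asyncio.wait_for(fake_task(delay, 1), timeout=timeout)
        return f"got {value}"
    except asyncio.TimeoutError:
        return "timed out"


if __name__ == "__main__":
    print(asyncio.run(gather_all()))
    print(asyncio.run(with_timeout(1.0, 0.01)))
```

In a Ray application, the awaited object would presumably be the ObjectRef (or its wrapped future) rather than `fake_task`.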
ObjectRefs as concurrent.futures.Futures
ObjectRefs can also be wrapped into concurrent.futures.Future objects. This is useful for interfacing with existing concurrent.futures APIs.
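# Import the submodule explicitly; a bare `import concurrent` does not
# guarantee that `concurrent.futures` is loaded.
import concurrent.futures

refs = [some_task.remote() for _ in range(4)]
futs = [ref.future() for ref in refs]
for fut in concurrent.futures.as_completed(futs):
    assert fut.done()
    print(fut.result())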
1
1
1
1
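Other concurrent.futures helpers follow the same shape. The sketch below shows `concurrent.futures.wait` with a timeout, using futures from a `ThreadPoolExecutor` as a stand-in for the ones returned by `ref.future()`, so that it runs without Ray. `slow_square` and `collect_squares` are hypothetical names chosen for illustration.

```python
import concurrent.futures
import time


def slow_square(x: int) -> int:
    """Hypothetical workload standing in for a Ray task."""
    time.sleep(0.01 * x)
    return x * x


def collect_squares(values: list[int]) -> list[int]:
    with concurrent.futures.ThreadPoolExecutor(max_workers=len(values)) as pool:
        futs = [pool.submit(slow_square, v) for v in values]
        # wait() returns two sets: futures that finished and futures that did not
        # finish before the timeout elapsed.
        done, not_done = concurrent.futures.wait(futs, timeout=5)
        if not_done:
            raise TimeoutError(f"{len(not_done)} futures did not finish in time")
        return sorted(f.result() for f in done)


if __name__ == "__main__":
    print(collect_squares([1, 2, 3]))
```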
Defining an Async Actor
When an actor defines async methods, Ray automatically detects that the actor supports async calls.
import ray
import asyncio


@ray.remote
class AsyncActor:
    def __init__(self, expected_num_tasks: int):
        self._event = asyncio.Event()
        self._curr_num_tasks = 0
        self._expected_num_tasks = expected_num_tasks

    async def run_task(self):
        print("Started task")
        self._curr_num_tasks += 1
        if self._curr_num_tasks == self._expected_num_tasks:
            self._event.set()
        else:
            # Yield the event loop for multiple coroutines to run concurrently.
            await self._event.wait()
        print("Finished task")


actor = AsyncActor.remote(5)

# All 5 tasks will start at once and run concurrently.
ray.get([actor.run_task.remote() for _ in range(5)])
(AsyncActor pid=3456) Started task
(AsyncActor pid=3456) Started task
(AsyncActor pid=3456) Started task
(AsyncActor pid=3456) Started task
(AsyncActor pid=3456) Started task
(AsyncActor pid=3456) Finished task
(AsyncActor pid=3456) Finished task
(AsyncActor pid=3456) Finished task
(AsyncActor pid=3456) Finished task
(AsyncActor pid=3456) Finished task
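Under the hood, Ray runs all of the methods inside a single Python event loop. Note that running a blocking ray.get or ray.wait inside an async actor method is not allowed, because ray.get blocks the execution of the event loop.
In async actors, only one task can be running at any point in time (though tasks can be multiplexed). There is only one thread in an AsyncActor! See Threaded Actors if you want a thread pool.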
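To make the single-event-loop behavior concrete, here is a minimal standard-library sketch (no Ray involved). `blocking_worker` uses `time.sleep` as a stand-in for any blocking call such as `ray.get`, while `cooperative_worker` uses `await`, which is analogous to awaiting an ObjectRef. All names are hypothetical.

```python
import asyncio
import time


async def blocking_worker(name: str, log: list[str]) -> None:
    log.append(f"{name} start")
    time.sleep(0.01)  # Blocks the whole event loop; no other coroutine can run.
    log.append(f"{name} end")


async def cooperative_worker(name: str, log: list[str]) -> None:
    log.append(f"{name} start")
    await asyncio.sleep(0.01)  # Yields control so other coroutines can run.
    log.append(f"{name} end")


async def run_pair(worker) -> list[str]:
    """Run two copies of `worker` on one event loop and return the event log."""
    log: list[str] = []
    await asyncio.gather(worker("a", log), worker("b", log))
    return log


if __name__ == "__main__":
    print(asyncio.run(run_pair(blocking_worker)))
    print(asyncio.run(run_pair(cooperative_worker)))
```

With the blocking worker, the second coroutine cannot start until the first has finished; with the cooperative worker, both start before either ends.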
Setting concurrency in Async Actors
You can set the number of "concurrent" tasks running at once using the max_concurrency flag. By default, 1000 tasks can run concurrently.
import asyncio

import ray


@ray.remote
class AsyncActor:
    def __init__(self, batch_size: int):
        self._event = asyncio.Event()
        self._curr_tasks = 0
        self._batch_size = batch_size

    async def run_task(self):
        print("Started task")
        self._curr_tasks += 1
        if self._curr_tasks == self._batch_size:
            self._event.set()
        else:
            await self._event.wait()
            self._event.clear()
            self._curr_tasks = 0
        print("Finished task")


actor = AsyncActor.options(max_concurrency=2).remote(2)

# Only 2 tasks will run concurrently.
# Once 2 finish, the next 2 should run.
ray.get([actor.run_task.remote() for _ in range(8)])
(AsyncActor pid=5859) Started task
(AsyncActor pid=5859) Started task
(AsyncActor pid=5859) Finished task
(AsyncActor pid=5859) Finished task
(AsyncActor pid=5859) Started task
(AsyncActor pid=5859) Started task
(AsyncActor pid=5859) Finished task
(AsyncActor pid=5859) Finished task
(AsyncActor pid=5859) Started task
(AsyncActor pid=5859) Started task
(AsyncActor pid=5859) Finished task
(AsyncActor pid=5859) Finished task
(AsyncActor pid=5859) Started task
(AsyncActor pid=5859) Started task
(AsyncActor pid=5859) Finished task
(AsyncActor pid=5859) Finished task
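The effect of a concurrency cap can be approximated in plain asyncio with a semaphore. The sketch below is only an analogy for the observable behavior and makes no claim about how Ray implements max_concurrency internally; `run_limited` is a hypothetical helper that reports the peak number of coroutines active at once.

```python
import asyncio


async def run_limited(num_tasks: int, limit: int) -> int:
    """Run `num_tasks` coroutines with at most `limit` active; return the peak."""
    semaphore = asyncio.Semaphore(limit)
    active = 0
    peak = 0

    async def task() -> None:
        nonlocal active, peak
        async with semaphore:  # At most `limit` coroutines pass this point.
            active += 1
            peak = max(peak, active)
            await asyncio.sleep(0.01)  # Simulated work that yields the loop.
            active -= 1

    await asyncio.gather(*(task() for _ in range(num_tasks)))
    return peak


if __name__ == "__main__":
    print(asyncio.run(run_limited(8, 2)))
```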
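Threaded Actors
Sometimes, asyncio is not an ideal solution for your actor. For example, you may have a method that performs a computation-heavy task while blocking the event loop, without giving up control via await. This hurts the performance of an async actor, because async actors can only execute one task at a time and rely on await to context switch.
Instead, you can use the max_concurrency actor option without any async methods, which gives you threaded concurrency (like a thread pool).
Warning
When there is at least one async def method in the actor definition, Ray recognizes the actor as an AsyncActor instead of a ThreadedActor.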
import ray


@ray.remote
class ThreadedActor:
    def task_1(self):
        print("I'm running in a thread!")

    def task_2(self):
        print("I'm running in another thread!")


a = ThreadedActor.options(max_concurrency=2).remote()
ray.get([a.task_1.remote(), a.task_2.remote()])
(ThreadedActor pid=4822) I'm running in a thread!
(ThreadedActor pid=4822) I'm running in another thread!
Each invocation of the threaded actor runs in a thread pool. The size of the thread pool is limited by the max_concurrency value.
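The bounded-pool behavior described above can be sketched with the standard library's `ThreadPoolExecutor`, where `max_workers` plays a role comparable to max_concurrency. This is an illustration of the idea only, not Ray's implementation; `peak_threads` is a hypothetical helper that records how many calls were in flight at once.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def peak_threads(num_calls: int, pool_size: int) -> int:
    """Submit `num_calls` calls to a pool and return the peak number in flight."""
    lock = threading.Lock()
    active = 0
    peak = 0

    def call() -> None:
        nonlocal active, peak
        with lock:  # Shared counters need a lock across threads.
            active += 1
            peak = max(peak, active)
        time.sleep(0.01)  # Simulated blocking work.
        with lock:
            active -= 1

    # Leaving the `with` block waits for all submitted calls to finish.
    with ThreadPoolExecutor(max_workers=pool_size) as pool:
        for _ in range(num_calls):
            pool.submit(call)
    return peak


if __name__ == "__main__":
    print(peak_threads(6, 2))
```

Because state is shared between threads, methods of a threaded actor that mutate actor attributes generally need the same kind of locking shown here.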
AsyncIO for Remote Tasks
Ray does not support asyncio for remote tasks. The following snippet fails:
@ray.remote
async def f():
    pass
Instead, you can wrap the async function in a wrapper that runs the task synchronously:
async def f():
    pass


@ray.remote
def wrapper():
    import asyncio
    asyncio.run(f())
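The wrapper above discards whatever `f` produces. If the caller needs the result, one option is to return the value of `asyncio.run`. The sketch below leaves off the `@ray.remote` decorator so that it runs without a cluster, and the return value 42 is an arbitrary placeholder.

```python
import asyncio


async def f() -> int:
    await asyncio.sleep(0)  # Placeholder for real asynchronous work.
    return 42


def wrapper() -> int:
    # asyncio.run creates a fresh event loop, runs f() to completion,
    # closes the loop, and returns the coroutine's result.
    return asyncio.run(f())


if __name__ == "__main__":
    print(wrapper())
```

With the decorator applied, the returned value would presumably be retrieved through `ray.get(wrapper.remote())` like any other task result.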