Ray 生成器#
Python 生成器是行为类似于迭代器的函数,每次迭代生成一个值。Ray 也支持生成器 API。
任何使用 ray.remote 装饰的生成器函数都会成为一个 Ray 生成器任务。生成器任务在任务完成之前将输出流式传输回调用者。
+import ray
import time
# Takes 25 seconds to finish.
+@ray.remote
def f():
for i in range(5):
time.sleep(5)
yield i
-for obj in f():
+for obj_ref in f.remote():
# Prints every 5 seconds and stops after 25 seconds.
- print(obj)
+ print(ray.get(obj_ref))
上面的 Ray 生成器每 5 秒生成一次输出,共 5 次。对于普通的 Ray 任务,您必须等待 25 秒才能访问输出。对于 Ray 生成器,调用者可以在任务 f 完成之前访问对象引用。
Ray 生成器在以下情况下非常有用:
当您希望通过在任务完成前生成并垃圾回收 (GC) 输出,来减少堆内存或对象存储内存使用时。
当您熟悉 Python 生成器并希望获得等效的编程模型时。
Ray 库使用 Ray 生成器来支持流式用例
Ray 生成器与现有 Ray API 无缝集成
您可以在 actor 和非 actor 任务中使用 Ray 生成器。
Ray 生成器与内置的容错功能(如重试或 lineage 重构)配合使用。
Ray 生成器与 ray.wait、ray.cancel 等 Ray API 配合使用。
入门#
定义一个 Python 生成器函数并使用 ray.remote 装饰它,以创建 Ray 生成器。
import ray
import time
@ray.remote
def task():
for i in range(5):
time.sleep(5)
yield i
Ray 生成器任务返回一个 ObjectRefGenerator 对象,该对象与生成器和异步生成器 API 兼容。您可以访问该类的 next、__iter__、__anext__、__aiter__ API。
每当任务调用 yield 时,相应的输出就准备好并可从生成器中作为 Ray 对象引用获取。您可以调用 next(gen) 来获取一个对象引用。如果 next 没有更多项可生成,它将引发 StopIteration。如果 __anext__ 没有更多项可生成,它将引发 StopAsyncIteration。
next API 会阻塞线程,直到任务使用 yield 生成下一个对象引用。由于 ObjectRefGenerator 只是一个 Python 生成器,您也可以使用 for 循环来迭代对象引用。
如果您想避免阻塞线程,可以使用 asyncio 或ray.wait API。
gen = task.remote()
# Blocks for 5 seconds.
ref = next(gen)
# return 0
ray.get(ref)
# Blocks for 5 seconds.
ref = next(gen)
# Return 1
ray.get(ref)
# Returns 2~4 every 5 seconds.
for ref in gen:
print(ray.get(ref))
注意
对于普通的 Python 生成器,在调用生成器上的 next 函数时,生成器函数会被暂停并恢复。Ray 会主动执行生成器任务直到完成,而不管调用者是否正在轮询部分结果。
错误处理#
如果生成器任务发生失败(由于应用程序异常或系统错误,如意外的节点故障),next(gen) 会返回一个包含异常的对象引用。当您调用 ray.get 时,Ray 会引发该异常。
@ray.remote
def task():
for i in range(5):
time.sleep(1)
if i == 1:
raise ValueError
yield i
gen = task.remote()
# it's okay.
ray.get(next(gen))
# Raises an exception
try:
ray.get(next(gen))
except ValueError as e:
print(f"Exception is raised when i == 1 as expected {e}")
在上面的示例中,如果应用程序导致任务失败,Ray 会按正确的顺序返回包含异常的对象引用。例如,如果 Ray 在第二次 yield 后引发异常,第三个 next(gen) 始终返回一个包含异常的对象引用。如果系统错误导致任务失败(例如,节点故障或工作进程故障),next(gen) 会在任何时候返回包含系统级异常的对象引用,但没有排序保证。这意味着当您有 N 次 yield 时,生成器在发生故障时最多可以创建 N + 1 个对象引用(N 个输出 + 包含系统级异常的引用)。
来自 Actor 任务的生成器#
Ray 生成器与**所有 actor 执行模型**兼容。它与常规 actor、异步 actor 和线程 actor 无缝协作。
@ray.remote
class Actor:
def f(self):
for i in range(5):
yield i
@ray.remote
class AsyncActor:
async def f(self):
for i in range(5):
yield i
@ray.remote(max_concurrency=5)
class ThreadedActor:
def f(self):
for i in range(5):
yield i
actor = Actor.remote()
for ref in actor.f.remote():
print(ray.get(ref))
actor = AsyncActor.remote()
for ref in actor.f.remote():
print(ray.get(ref))
actor = ThreadedActor.remote()
for ref in actor.f.remote():
print(ray.get(ref))
将 Ray 生成器与 asyncio 一起使用#
返回的 ObjectRefGenerator 也与 asyncio 兼容。您可以使用 __anext__ 或 async for 循环。
import asyncio
@ray.remote
def task():
for i in range(5):
time.sleep(1)
yield i
async def main():
async for ref in task.remote():
print(await ref)
asyncio.run(main())
对象引用的垃圾回收#
从 next(generator) 返回的 ref 只是一个普通的 Ray 对象引用,并且以相同的方式进行分布式引用计数。如果引用的生成器未被 next API 消耗,则当生成器被 GC 时,引用将被垃圾回收 (GC)。
@ray.remote
def task():
for i in range(5):
time.sleep(1)
yield i
gen = task.remote()
ref1 = next(gen)
del gen
在下面的示例中,Ray 在返回 ref1 后将其视为一个普通的 Ray 对象引用。未通过 next(gen) 消耗的其他引用将在生成器被 GC 时被删除。在此示例中,垃圾回收发生在调用 del gen 时。
容错#
容错功能 可与 Ray 生成器任务和 actor 任务配合使用。例如:
任务容错功能:
max_retries,retry_exceptionsActor 容错功能:
max_restarts,max_task_retries对象容错功能:对象重构
取消#
函数 ray.cancel() 可与 Ray 生成器任务和 actor 任务配合使用。在语义上,取消生成器任务与取消常规任务没有区别。当您取消任务时,next(gen) 可以返回包含 TaskCancelledError 的引用,但没有任何特殊的排序保证。
如何等待生成器而不阻塞线程(与 ray.wait 和 ray.get 的兼容性)#
在使用生成器时,next API 会阻塞其线程,直到下一个对象引用可用。但是,您可能并不总是希望这种行为。您可能希望在不阻塞线程的情况下等待生成器。通过 Ray 生成器,可以通过以下方式实现非阻塞等待:
等待生成器任务完成
ObjectRefGenerator 有一个 completed API。它返回一个在生成器任务完成或出错时可用的对象引用。例如,您可以执行 ray.get(<generator_instance>.completed()) 来等待任务完成。请注意,不允许使用 ray.get 来获取 ObjectRefGenerator。
使用 asyncio 和 await
ObjectRefGenerator 与 asyncio 兼容。您可以创建多个 asyncio 任务来创建生成器任务并等待它,以避免阻塞线程。
import asyncio
@ray.remote
def task():
for i in range(5):
time.sleep(1)
yield i
async def async_task():
async for ref in task.remote():
print(await ref)
async def main():
t1 = async_task()
t2 = async_task()
await asyncio.gather(t1, t2)
asyncio.run(main())
使用 ray.wait
您可以将 ObjectRefGenerator 作为输入传递给 ray.wait。当 next item 可用时,生成器就“准备好了”。一旦 Ray 从就绪列表中找到,next(gen) 会立即返回下一个对象引用,而不会阻塞。有关更多详细信息,请参阅下面的示例。
@ray.remote
def task():
for i in range(5):
time.sleep(5)
yield i
gen = task.remote()
# Because it takes 5 seconds to make the first yield,
# with 0 timeout, the generator is unready.
ready, unready = ray.wait([gen], timeout=0)
print("timeout 0, nothing is ready.")
print(ready)
assert len(ready) == 0
assert len(unready) == 1
# Without a timeout argument, ray.wait waits until the given argument
# is ready. When a next item is ready, it returns.
ready, unready = ray.wait([gen])
print("Wait for 5 seconds. The next item is ready.")
assert len(ready) == 1
assert len(unready) == 0
next(gen)
# Because the second yield hasn't happened yet,
ready, unready = ray.wait([gen], timeout=0)
print("Wait for 0 seconds. The next item is not ready.")
print(ready, unready)
assert len(ready) == 0
assert len(unready) == 1
ray.wait 的所有输入参数(例如 timeout、num_returns 和 fetch_local)都适用于生成器。
ray.wait 可以将常规 Ray 对象引用与生成器混合作为输入。在这种情况下,应用程序应处理所有输入参数(例如 timeout、num_returns 和 fetch_local)从 ray.wait 中,它们适用于生成器。
from ray._raylet import ObjectRefGenerator
@ray.remote
def generator_task():
for i in range(5):
time.sleep(5)
yield i
@ray.remote
def regular_task():
for i in range(5):
time.sleep(5)
return
gen = [generator_task.remote()]
ref = [regular_task.remote()]
ready, unready = [], [*gen, *ref]
result = []
while unready:
ready, unready = ray.wait(unready)
for r in ready:
if isinstance(r, ObjectRefGenerator):
try:
ref = next(r)
result.append(ray.get(ref))
except StopIteration:
pass
else:
unready.append(r)
else:
result.append(ray.get(r))
线程安全#
ObjectRefGenerator 对象不是线程安全的。
限制#
Ray 生成器不支持以下功能:
throw、send和closeAPI。生成器中的
return语句。将
ObjectRefGenerator传递给另一个任务或 actor。