Ray 生成器#
Python 生成器的行为类似于迭代器,每次迭代会产生一个值。Ray 也支持生成器 API。
任何使用 ray.remote
装饰的生成器函数都会成为一个 Ray 生成器任务。生成器任务在任务完成前会将输出流式传输回调用者。
+import ray
import time
# Takes 25 seconds to finish.
[email protected]
def f():
for i in range(5):
time.sleep(5)
yield i
-for obj in f():
+for obj_ref in f.remote():
# Prints every 5 seconds and stops after 25 seconds.
- print(obj)
+ print(ray.get(obj_ref))
上面的 Ray 生成器每 5 秒产生一次输出,共 5 次。对于普通 Ray 任务,您必须等待 25 秒才能访问输出。而对于 Ray 生成器,调用者可以在任务 f
完成之前访问对象引用。
Ray 生成器在以下情况下非常有用:
您希望通过在任务完成前产生 (yield) 并垃圾回收 (GC) 输出,来减少堆内存或对象存储内存的使用。
您熟悉 Python 生成器,并希望获得等效的编程模型。
Ray 库使用 Ray 生成器来支持流式使用案例
Ray 生成器与现有 Ray API 无缝协作
您可以在 Actor 任务和非 Actor 任务中使用 Ray 生成器。
Ray 生成器兼容内置的容错功能,例如重试或 lineage 重建。
Ray 生成器兼容 ray.wait、ray.cancel 等 Ray API。
入门#
定义一个 Python 生成器函数,并使用 ray.remote
对其进行装饰,以创建 Ray 生成器。
import ray
import time
@ray.remote
def task():
for i in range(5):
time.sleep(5)
yield i
Ray 生成器任务返回一个 ObjectRefGenerator
对象,它兼容生成器和异步生成器 API。您可以访问该类的 next
、__iter__
、__anext__
、__aiter__
API。
每当任务调用 yield
时,生成器中就会有一个相应的输出就绪,并作为 Ray 对象引用可用。您可以调用 next(gen)
获取对象引用。如果 next
没有更多项可生成,它会引发 StopIteration
。如果 __anext__
没有更多项可生成,它会引发 StopAsyncIteration
。
next
API 会阻塞线程,直到任务使用 yield
生成下一个对象引用。由于 ObjectRefGenerator
只是一个 Python 生成器,您也可以使用 for 循环来迭代对象引用。
如果您想避免阻塞线程,可以使用 asyncio 或ray.wait API。
gen = task.remote()
# Blocks for 5 seconds.
ref = next(gen)
# return 0
ray.get(ref)
# Blocks for 5 seconds.
ref = next(gen)
# Return 1
ray.get(ref)
# Returns 2~4 every 5 seconds.
for ref in gen:
print(ray.get(ref))
注意
对于普通的 Python 生成器,生成器函数在对其调用 next
函数时会暂停和恢复。Ray 会立即执行生成器任务直到完成,而不管调用者是否正在轮询部分结果。
错误处理#
如果生成器任务发生故障(由于应用程序异常或系统错误,例如意外的节点故障),next(gen)
会返回包含异常的对象引用。当您调用 ray.get
时,Ray 会引发异常。
@ray.remote
def task():
for i in range(5):
time.sleep(1)
if i == 1:
raise ValueError
yield i
gen = task.remote()
# it's okay.
ray.get(next(gen))
# Raises an exception
try:
ray.get(next(gen))
except ValueError as e:
print(f"Exception is raised when i == 1 as expected {e}")
在上面的例子中,如果应用程序导致任务失败,Ray 会按照正确的顺序返回带有异常的对象引用。例如,如果 Ray 在第二次 yield 后引发异常,则第三次 next(gen)
将始终返回带有异常的对象引用。如果系统错误导致任务失败(例如,节点故障或 worker 进程故障),next(gen)
会在任何时候返回包含系统级异常的对象引用,且不保证顺序。这意味着当您有 N 次 yield 时,在发生故障时,生成器可以创建从 1 到 N + 1 个对象引用(N 个输出 + 一个带有系统级异常的引用)。
Actor 任务中的生成器#
Ray 生成器兼容 所有 Actor 执行模型。它可以与常规 Actor、异步 Actor 和线程化 Actor 无缝协作。
@ray.remote
class Actor:
def f(self):
for i in range(5):
yield i
@ray.remote
class AsyncActor:
async def f(self):
for i in range(5):
yield i
@ray.remote(max_concurrency=5)
class ThreadedActor:
def f(self):
for i in range(5):
yield i
actor = Actor.remote()
for ref in actor.f.remote():
print(ray.get(ref))
actor = AsyncActor.remote()
for ref in actor.f.remote():
print(ray.get(ref))
actor = ThreadedActor.remote()
for ref in actor.f.remote():
print(ray.get(ref))
将 Ray 生成器与 asyncio 结合使用#
返回的 ObjectRefGenerator
也兼容 asyncio。您可以使用 __anext__
或 async for
循环。
import asyncio
@ray.remote
def task():
for i in range(5):
time.sleep(1)
yield i
async def main():
async for ref in task.remote():
print(await ref)
asyncio.run(main())
对象引用的垃圾回收#
从 next(generator)
返回的引用只是一个常规的 Ray 对象引用,并以同样的方式进行分布式引用计数。如果通过 next
API 未从生成器中消费引用,则在生成器被垃圾回收 (GC) 时,这些引用也会被回收。
@ray.remote
def task():
for i in range(5):
time.sleep(1)
yield i
gen = task.remote()
ref1 = next(gen)
del gen
在以下示例中,Ray 在返回 ref1
后将其视为一个正常的 Ray 对象引用。其他未使用 next(gen)
消费的引用会在生成器被 GC 时移除。在本例中,垃圾回收发生当您调用 del gen
时。
容错#
容错功能兼容 Ray 生成器任务和 Actor 任务。例如:
任务容错功能:
max_retries
,retry_exceptions
Actor 容错功能:
max_restarts
,max_task_retries
对象容错功能:对象重建
取消#
ray.cancel()
函数适用于 Ray 生成器任务和 Actor 任务。从语义上讲,取消生成器任务与取消普通任务没有区别。当您取消任务时,next(gen)
可能会返回包含 TaskCancelledError
的引用,且不提供任何特定的顺序保证。
如何在不阻塞线程的情况下等待生成器(与 ray.wait 和 ray.get 的兼容性)#
使用生成器时,next
API 会阻塞其线程,直到下一个对象引用可用。但是,您可能不总是需要这种行为。您可能希望在不阻塞线程的情况下等待生成器。使用 Ray 生成器可以通过以下方式实现非阻塞等待
等待生成器任务完成
ObjectRefGenerator
有一个 API completed
。它返回一个对象引用,该引用在生成器任务完成或出错时可用。例如,您可以执行 ray.get(<generator_instance>.completed())
来等待任务完成。请注意,不允许对 ObjectRefGenerator
使用 ray.get
。
使用 asyncio 和 await
ObjectRefGenerator
兼容 asyncio。您可以创建多个 asyncio 任务来创建生成器任务并等待其完成,从而避免阻塞线程。
import asyncio
@ray.remote
def task():
for i in range(5):
time.sleep(1)
yield i
async def async_task():
async for ref in task.remote():
print(await ref)
async def main():
t1 = async_task()
t2 = async_task()
await asyncio.gather(t1, t2)
asyncio.run(main())
使用 ray.wait
您可以将 ObjectRefGenerator
作为输入传递给 ray.wait
。如果存在 next item
可用,则生成器被认为是“就绪”的。一旦 Ray 在就绪列表中找到该生成器,next(gen)
会立即返回下一个对象引用,而不会阻塞。请参阅下面的示例了解更多详细信息。
@ray.remote
def task():
for i in range(5):
time.sleep(5)
yield i
gen = task.remote()
# Because it takes 5 seconds to make the first yield,
# with 0 timeout, the generator is unready.
ready, unready = ray.wait([gen], timeout=0)
print("timeout 0, nothing is ready.")
print(ready)
assert len(ready) == 0
assert len(unready) == 1
# Without a timeout argument, ray.wait waits until the given argument
# is ready. When a next item is ready, it returns.
ready, unready = ray.wait([gen])
print("Wait for 5 seconds. The next item is ready.")
assert len(ready) == 1
assert len(unready) == 0
next(gen)
# Because the second yield hasn't happened yet,
ready, unready = ray.wait([gen], timeout=0)
print("Wait for 0 seconds. The next item is not ready.")
print(ready, unready)
assert len(ready) == 0
assert len(unready) == 1
ray.wait
的所有输入参数(例如 timeout
、num_returns
和 fetch_local
)都适用于生成器。
ray.wait
可以将常规 Ray 对象引用与生成器混合作为输入。在这种情况下,ray.wait
的所有输入参数(例如 timeout
、num_returns
和 fetch_local
)的行为与生成器一起使用时保持一致。
from ray._raylet import ObjectRefGenerator
@ray.remote
def generator_task():
for i in range(5):
time.sleep(5)
yield i
@ray.remote
def regular_task():
for i in range(5):
time.sleep(5)
return
gen = [generator_task.remote()]
ref = [regular_task.remote()]
ready, unready = [], [*gen, *ref]
result = []
while unready:
ready, unready = ray.wait(unready)
for r in ready:
if isinstance(r, ObjectRefGenerator):
try:
ref = next(r)
result.append(ray.get(ref))
except StopIteration:
pass
else:
unready.append(r)
else:
result.append(ray.get(r))
线程安全#
ObjectRefGenerator
对象不是线程安全的。
限制#
Ray 生成器不支持以下功能
throw
、send
和close
API。生成器中的
return
语句。将
ObjectRefGenerator
传递给其他任务或 Actor。