Ray 生成器#

Python 生成器的行为类似于迭代器,每次迭代会产生一个值。Ray 也支持生成器 API。

任何使用 ray.remote 装饰的生成器函数都会成为一个 Ray 生成器任务。生成器任务在任务完成前会将输出流式传输回调用者。

+import ray
 import time

 # Takes 25 seconds to finish.
[email protected]
 def f():
     for i in range(5):
         time.sleep(5)
         yield i

-for obj in f():
+for obj_ref in f.remote():
     # Prints every 5 seconds and stops after 25 seconds.
-    print(obj)
+    print(ray.get(obj_ref))

上面的 Ray 生成器每 5 秒产生一次输出,共 5 次。对于普通 Ray 任务,您必须等待 25 秒才能访问输出。而对于 Ray 生成器,调用者可以在任务 f 完成之前访问对象引用。

Ray 生成器在以下情况下非常有用:

  • 您希望通过在任务完成前产生 (yield) 并垃圾回收 (GC) 输出,来减少堆内存或对象存储内存的使用。

  • 您熟悉 Python 生成器,并希望获得等效的编程模型。

Ray 库使用 Ray 生成器来支持流式使用案例

  • Ray Serve 使用 Ray 生成器来支持流式响应

  • Ray Data 是一个流式数据处理库,它使用 Ray 生成器来控制和减少并发内存使用量。

Ray 生成器与现有 Ray API 无缝协作

  • 您可以在 Actor 任务和非 Actor 任务中使用 Ray 生成器。

  • Ray 生成器兼容所有 Actor 执行模型,包括 线程化 Actor异步 Actor

  • Ray 生成器兼容内置的容错功能,例如重试或 lineage 重建。

  • Ray 生成器兼容 ray.waitray.cancel 等 Ray API。

入门#

定义一个 Python 生成器函数,并使用 ray.remote 对其进行装饰,以创建 Ray 生成器。

import ray
import time

@ray.remote
def task():
    for i in range(5):
        time.sleep(5)
        yield i

Ray 生成器任务返回一个 ObjectRefGenerator 对象,它兼容生成器和异步生成器 API。您可以访问该类的 next__iter____anext____aiter__ API。

每当任务调用 yield 时,生成器中就会有一个相应的输出就绪,并作为 Ray 对象引用可用。您可以调用 next(gen) 获取对象引用。如果 next 没有更多项可生成,它会引发 StopIteration。如果 __anext__ 没有更多项可生成,它会引发 StopAsyncIteration

next API 会阻塞线程,直到任务使用 yield 生成下一个对象引用。由于 ObjectRefGenerator 只是一个 Python 生成器,您也可以使用 for 循环来迭代对象引用。

如果您想避免阻塞线程,可以使用 asyncio 或ray.wait API

gen = task.remote()
# Blocks for 5 seconds.
ref = next(gen)
# return 0
ray.get(ref)
# Blocks for 5 seconds.
ref = next(gen)
# Return 1
ray.get(ref)

# Returns 2~4 every 5 seconds.
for ref in gen:
    print(ray.get(ref))

注意

对于普通的 Python 生成器,生成器函数在对其调用 next 函数时会暂停和恢复。Ray 会立即执行生成器任务直到完成,而不管调用者是否正在轮询部分结果。

错误处理#

如果生成器任务发生故障(由于应用程序异常或系统错误,例如意外的节点故障),next(gen) 会返回包含异常的对象引用。当您调用 ray.get 时,Ray 会引发异常。

@ray.remote
def task():
    for i in range(5):
        time.sleep(1)
        if i == 1:
            raise ValueError
        yield i

gen = task.remote()
# it's okay.
ray.get(next(gen))

# Raises an exception
try:
    ray.get(next(gen))
except ValueError as e:
    print(f"Exception is raised when i == 1 as expected {e}")

在上面的例子中,如果应用程序导致任务失败,Ray 会按照正确的顺序返回带有异常的对象引用。例如,如果 Ray 在第二次 yield 后引发异常,则第三次 next(gen) 将始终返回带有异常的对象引用。如果系统错误导致任务失败(例如,节点故障或 worker 进程故障),next(gen) 会在任何时候返回包含系统级异常的对象引用,且不保证顺序。这意味着当您有 N 次 yield 时,在发生故障时,生成器可以创建从 1 到 N + 1 个对象引用(N 个输出 + 一个带有系统级异常的引用)。

Actor 任务中的生成器#

Ray 生成器兼容 所有 Actor 执行模型。它可以与常规 Actor、异步 Actor线程化 Actor 无缝协作。

@ray.remote
class Actor:
    def f(self):
        for i in range(5):
            yield i

@ray.remote
class AsyncActor:
    async def f(self):
        for i in range(5):
            yield i

@ray.remote(max_concurrency=5)
class ThreadedActor:
    def f(self):
        for i in range(5):
            yield i

actor = Actor.remote()
for ref in actor.f.remote():
    print(ray.get(ref))

actor = AsyncActor.remote()
for ref in actor.f.remote():
    print(ray.get(ref))

actor = ThreadedActor.remote()
for ref in actor.f.remote():
    print(ray.get(ref))

将 Ray 生成器与 asyncio 结合使用#

返回的 ObjectRefGenerator 也兼容 asyncio。您可以使用 __anext__async for 循环。

import asyncio

@ray.remote
def task():
    for i in range(5):
        time.sleep(1)
        yield i


async def main():
    async for ref in task.remote():
        print(await ref)

asyncio.run(main())

对象引用的垃圾回收#

next(generator) 返回的引用只是一个常规的 Ray 对象引用,并以同样的方式进行分布式引用计数。如果通过 next API 未从生成器中消费引用,则在生成器被垃圾回收 (GC) 时,这些引用也会被回收。

@ray.remote
def task():
    for i in range(5):
        time.sleep(1)
        yield i

gen = task.remote()
ref1 = next(gen)
del gen

在以下示例中,Ray 在返回 ref1 后将其视为一个正常的 Ray 对象引用。其他未使用 next(gen) 消费的引用会在生成器被 GC 时移除。在本例中,垃圾回收发生当您调用 del gen 时。

容错#

容错功能兼容 Ray 生成器任务和 Actor 任务。例如:

取消#

ray.cancel() 函数适用于 Ray 生成器任务和 Actor 任务。从语义上讲,取消生成器任务与取消普通任务没有区别。当您取消任务时,next(gen) 可能会返回包含 TaskCancelledError 的引用,且不提供任何特定的顺序保证。

如何在不阻塞线程的情况下等待生成器(与 ray.wait 和 ray.get 的兼容性)#

使用生成器时,next API 会阻塞其线程,直到下一个对象引用可用。但是,您可能不总是需要这种行为。您可能希望在不阻塞线程的情况下等待生成器。使用 Ray 生成器可以通过以下方式实现非阻塞等待

等待生成器任务完成

ObjectRefGenerator 有一个 API completed。它返回一个对象引用,该引用在生成器任务完成或出错时可用。例如,您可以执行 ray.get(<generator_instance>.completed()) 来等待任务完成。请注意,不允许对 ObjectRefGenerator 使用 ray.get

使用 asyncio 和 await

ObjectRefGenerator 兼容 asyncio。您可以创建多个 asyncio 任务来创建生成器任务并等待其完成,从而避免阻塞线程。

import asyncio

@ray.remote
def task():
    for i in range(5):
        time.sleep(1)
        yield i


async def async_task():
    async for ref in task.remote():
        print(await ref)

async def main():
    t1 = async_task()
    t2 = async_task()
    await asyncio.gather(t1, t2)

asyncio.run(main())

使用 ray.wait

您可以将 ObjectRefGenerator 作为输入传递给 ray.wait。如果存在 next item 可用,则生成器被认为是“就绪”的。一旦 Ray 在就绪列表中找到该生成器,next(gen) 会立即返回下一个对象引用,而不会阻塞。请参阅下面的示例了解更多详细信息。

@ray.remote
def task():
    for i in range(5):
        time.sleep(5)
        yield i

gen = task.remote()

# Because it takes 5 seconds to make the first yield,
# with 0 timeout, the generator is unready.
ready, unready = ray.wait([gen], timeout=0)
print("timeout 0, nothing is ready.")
print(ready)
assert len(ready) == 0
assert len(unready) == 1

# Without a timeout argument, ray.wait waits until the given argument
# is ready. When a next item is ready, it returns.
ready, unready = ray.wait([gen])
print("Wait for 5 seconds. The next item is ready.")
assert len(ready) == 1
assert len(unready) == 0
next(gen)

# Because the second yield hasn't happened yet,
ready, unready = ray.wait([gen], timeout=0)
print("Wait for 0 seconds. The next item is not ready.")
print(ready, unready)
assert len(ready) == 0
assert len(unready) == 1

ray.wait 的所有输入参数(例如 timeoutnum_returnsfetch_local)都适用于生成器。

ray.wait 可以将常规 Ray 对象引用与生成器混合作为输入。在这种情况下,ray.wait 的所有输入参数(例如 timeoutnum_returnsfetch_local)的行为与生成器一起使用时保持一致。

from ray._raylet import ObjectRefGenerator

@ray.remote
def generator_task():
    for i in range(5):
        time.sleep(5)
        yield i

@ray.remote
def regular_task():
    for i in range(5):
        time.sleep(5)
    return

gen = [generator_task.remote()]
ref = [regular_task.remote()]
ready, unready = [], [*gen, *ref]
result = []

while unready:
    ready, unready = ray.wait(unready)
    for r in ready:
        if isinstance(r, ObjectRefGenerator):
            try:
                ref = next(r)
                result.append(ray.get(ref))
            except StopIteration:
                pass
            else:
                unready.append(r)
        else:
            result.append(ray.get(r))

线程安全#

ObjectRefGenerator 对象不是线程安全的。

限制#

Ray 生成器不支持以下功能

  • throwsendclose API。

  • 生成器中的 return 语句。

  • ObjectRefGenerator 传递给其他任务或 Actor。

  • Ray Client