动态生成器#

Python 生成器是行为类似于迭代器的函数,每次迭代生成一个值。Ray 支持远程生成器用于两种用例:

  1. 在远程函数返回多个值时减少最大堆内存使用。请参阅设计模式指南中的示例。

  2. 当返回值的数量由远程函数动态设置,而不是由调用者设置时。

远程生成器可在 actor 任务和非 actor 任务中使用。

num_returns 由任务调用者设置#

在可能的情况下,调用者应使用 @ray.remote(num_returns=x)foo.options(num_returns=x).remote() 设置远程函数的返回值数量。Ray 将向调用者返回相应数量的 ObjectRefs。远程任务应返回相同数量的值,通常以元组或列表的形式。与动态设置返回值数量相比,这种方式为用户代码增加了更少的复杂性,性能开销也更小,因为 Ray 会提前准确知道需要向调用者返回多少 ObjectRefs

在不改变调用者语法的情况下,我们也可以使用远程生成器函数以迭代方式生成值。生成器应生成与调用者指定的返回值数量相同的值,这些值将一次一个地存储在 Ray 的对象存储中。如果生成器生成的值数量与调用者指定的数量不同,将引发错误。

例如,我们可以替换以下返回返回值列表的代码:

import numpy as np


@ray.remote
def large_values(num_returns):
    return [
        np.random.randint(np.iinfo(np.int8).max, size=(100_000_000, 1), dtype=np.int8)
        for _ in range(num_returns)
    ]

为使用生成器函数的这段代码:

@ray.remote
def large_values_generator(num_returns):
    for i in range(num_returns):
        yield np.random.randint(
            np.iinfo(np.int8).max, size=(100_000_000, 1), dtype=np.int8
        )
        print(f"yielded return value {i}")

这样做的好处是生成器函数不需要一次将所有返回值都保存在内存中。它可以一次生成一个数组,以减少内存压力。

num_returns 由任务执行器设置#

在某些情况下,调用者可能不知道远程函数会返回多少个值。例如,假设我们要编写一个任务,将其参数分解成大小相等的块并返回这些块。我们可能要等到执行任务时才知道参数的大小,因此我们无法知道预期返回值的数量。

在这些情况下,我们可以使用远程生成器函数,它返回一个动态数量的值。要使用此功能,请在 @ray.remote 装饰器或远程函数的 .options() 中设置 num_returns="dynamic"。然后,在调用远程函数时,Ray 将返回一个单个 ObjectRef,该 ObjectRef 将在任务完成时填充 DynamicObjectRefGenerator。可以使用 DynamicObjectRefGenerator 迭代包含任务返回的实际值的 ObjectRefs 列表。

import numpy as np


@ray.remote(num_returns="dynamic")
def split(array, chunk_size):
    while len(array) > 0:
        yield array[:chunk_size]
        array = array[chunk_size:]


array_ref = ray.put(np.zeros(np.random.randint(1000_000)))
block_size = 1000

# Returns an ObjectRef[DynamicObjectRefGenerator].
dynamic_ref = split.remote(array_ref, block_size)
print(dynamic_ref)
# ObjectRef(c8ef45ccd0112571ffffffffffffffffffffffff0100000001000000)

i = -1
ref_generator = ray.get(dynamic_ref)
print(ref_generator)
# <ray._raylet.DynamicObjectRefGenerator object at 0x7f7e2116b290>
for i, ref in enumerate(ref_generator):
    # Each DynamicObjectRefGenerator iteration returns an ObjectRef.
    assert len(ray.get(ref)) <= block_size
num_blocks_generated = i + 1
array_size = len(ray.get(array_ref))
assert array_size <= num_blocks_generated * block_size
print(f"Split array of size {array_size} into {num_blocks_generated} blocks of "
      f"size {block_size} each.")
# Split array of size 63153 into 64 blocks of size 1000 each.

# NOTE: The dynamic_ref points to the generated ObjectRefs. Make sure that this
# ObjectRef goes out of scope so that Ray can garbage-collect the internal
# ObjectRefs.
del dynamic_ref

我们还可以将带有 num_returns="dynamic" 的任务返回的 ObjectRef 传递给另一个任务。该任务将接收 DynamicObjectRefGenerator,它可以使用它来迭代任务的返回值。同样,你也可以将 ObjectRefGenerator 作为任务参数传递。

@ray.remote
def get_size(ref_generator : DynamicObjectRefGenerator):
    print(ref_generator)
    num_elements = 0
    for ref in ref_generator:
        array = ray.get(ref)
        assert len(array) <= block_size
        num_elements += len(array)
    return num_elements


# Returns an ObjectRef[DynamicObjectRefGenerator].
dynamic_ref = split.remote(array_ref, block_size)
assert array_size == ray.get(get_size.remote(dynamic_ref))
# (get_size pid=1504184)
# <ray._raylet.DynamicObjectRefGenerator object at 0x7f81c4250ad0>

# This also works, but should be avoided because you have to call an additional
# `ray.get`, which blocks the driver.
ref_generator = ray.get(dynamic_ref)
assert array_size == ray.get(get_size.remote(ref_generator))
# (get_size pid=1504184)
# <ray._raylet.DynamicObjectRefGenerator object at 0x7f81c4251b50>

异常处理#

如果生成器函数在生成所有值之前引发异常,则已存储的值仍然可以通过其 ObjectRefs 访问。其余的 ObjectRefs 将包含引发的异常。对于静态和动态 num_returns 都是如此。如果任务是使用 num_returns="dynamic" 调用的,则异常将作为额外的最后一个 ObjectRef 存储在 DynamicObjectRefGenerator 中。

@ray.remote
def generator():
    for i in range(2):
        yield i
    raise Exception("error")


ref1, ref2, ref3, ref4 = generator.options(num_returns=4).remote()
assert ray.get([ref1, ref2]) == [0, 1]
# All remaining ObjectRefs will contain the error.
try:
    ray.get([ref3, ref4])
except Exception as error:
    print(error)

dynamic_ref = generator.options(num_returns="dynamic").remote()
ref_generator = ray.get(dynamic_ref)
ref1, ref2, ref3 = ref_generator
assert ray.get([ref1, ref2]) == [0, 1]
# Generators with num_returns="dynamic" will store the exception in the final
# ObjectRef.
try:
    ray.get(ref3)
except Exception as error:
    print(error)

请注意,目前存在一个已知错误,即生成器如果生成的值多于预期,则不会传播异常。这可能发生在两种情况下:

  1. num_returns 由调用者设置,但生成器任务返回的值多于此值。

  2. 当带有 num_returns="dynamic" 的生成器任务被重新执行时,重新执行的任务生成的值多于原始执行。请注意,一般来说,如果任务是非确定性的,Ray 不保证任务重新执行的正确性,建议对此类任务设置 @ray.remote(max_retries=0)

# Generators that yield more values than expected currently do not throw an
# exception (the error is only logged).
# See https://github.com/ray-project/ray/issues/28689.
ref1, ref2 = generator.options(num_returns=2).remote()
assert ray.get([ref1, ref2]) == [0, 1]
"""
(generator pid=2375938) 2022-09-28 11:08:51,386 ERROR worker.py:755 --
    Unhandled error: Task threw exception, but all return values already
    created.  This should only occur when using generator tasks.
...
"""

限制#

尽管生成器函数一次创建一个 ObjectRef,但目前 Ray 不会在整个任务完成且所有值都已创建之前调度依赖任务。这与返回多个值作为列表的任务所使用的语义类似。