动态生成器#

警告

由于其局限性num_returns="dynamic" 生成器 API 自 Ray 2.8 起已软弃用。请改用 流式生成器 API

Python 生成器是行为类似于迭代器的函数,每次迭代产生一个值。Ray 支持远程生成器,用于两种用例:

  1. 当远程函数返回多个值时,以减少最大堆内存使用量。有关示例,请参阅设计模式指南

  2. 当返回值的数量由远程函数动态设置,而不是由调用者设置时。

远程生成器可用于 actor 和非 actor 任务。

num_returns 由任务调用者设置#

尽可能,调用者应使用 @ray.remote(num_returns=x)foo.options(num_returns=x).remote() 来设置远程函数的返回值数量。Ray 将向调用者返回这么多 ObjectRefs。然后,远程任务应返回相同数量的值,通常作为元组或列表。与动态设置返回值数量相比,这种方式增加了用户代码的复杂性,并减少了性能开销,因为 Ray 能够提前确切知道要向调用者返回多少 ObjectRefs

在不更改调用者语法的情况下,我们还可以使用远程生成器函数来迭代地生成值。生成器应生成与调用者指定的返回值数量相同的值,这些值将一次性存储在 Ray 的对象存储中。对于生成的值数量与调用者指定的数量不同的生成器,将引发错误。

例如,我们可以将以下返回返回值列表的代码

import numpy as np


@ray.remote
def large_values(num_returns):
    return [
        np.random.randint(np.iinfo(np.int8).max, size=(100_000_000, 1), dtype=np.int8)
        for _ in range(num_returns)
    ]

替换为这段使用生成器函数的代码:

@ray.remote
def large_values_generator(num_returns):
    for i in range(num_returns):
        yield np.random.randint(
            np.iinfo(np.int8).max, size=(100_000_000, 1), dtype=np.int8
        )
        print(f"yielded return value {i}")

这样做的优点是生成器函数不需要一次性将所有返回值保存在内存中。它可以一次生成一个数组,以减少内存压力。

num_returns 由任务执行者设置#

在某些情况下,调用者可能不知道要从远程函数预期多少返回值。例如,假设我们要编写一个任务,该任务将参数分解为等大小的块并返回它们。在执行任务之前,我们可能不知道参数的大小,因此也不知道预期的返回值数量。

在这些情况下,我们可以使用一个返回动态数量值的远程生成器函数。要使用此功能,请在 @ray.remote 装饰器或远程函数的 .options() 中设置 num_returns="dynamic"。然后,在调用远程函数时,Ray 将返回一个单个 ObjectRef,该 ObjectRef 将在任务完成时被填充为 DynamicObjectRefGenerator。该 DynamicObjectRefGenerator 可用于迭代包含任务返回的实际值的 ObjectRefs 列表。

import numpy as np


@ray.remote(num_returns="dynamic")
def split(array, chunk_size):
    while len(array) > 0:
        yield array[:chunk_size]
        array = array[chunk_size:]


array_ref = ray.put(np.zeros(np.random.randint(1000_000)))
block_size = 1000

# Returns an ObjectRef[DynamicObjectRefGenerator].
dynamic_ref = split.remote(array_ref, block_size)
print(dynamic_ref)
# ObjectRef(c8ef45ccd0112571ffffffffffffffffffffffff0100000001000000)

i = -1
ref_generator = ray.get(dynamic_ref)
print(ref_generator)
# <ray._raylet.DynamicObjectRefGenerator object at 0x7f7e2116b290>
for i, ref in enumerate(ref_generator):
    # Each DynamicObjectRefGenerator iteration returns an ObjectRef.
    assert len(ray.get(ref)) <= block_size
num_blocks_generated = i + 1
array_size = len(ray.get(array_ref))
assert array_size <= num_blocks_generated * block_size
print(f"Split array of size {array_size} into {num_blocks_generated} blocks of "
      f"size {block_size} each.")
# Split array of size 63153 into 64 blocks of size 1000 each.

# NOTE: The dynamic_ref points to the generated ObjectRefs. Make sure that this
# ObjectRef goes out of scope so that Ray can garbage-collect the internal
# ObjectRefs.
del dynamic_ref

我们也可以将由带有 num_returns="dynamic" 的任务返回的 ObjectRef 传递给另一个任务。该任务将接收 DynamicObjectRefGenerator,并可以使用它来迭代任务的返回值。类似地,您也可以将 ObjectRefGenerator 作为任务参数传递。

@ray.remote
def get_size(ref_generator : DynamicObjectRefGenerator):
    print(ref_generator)
    num_elements = 0
    for ref in ref_generator:
        array = ray.get(ref)
        assert len(array) <= block_size
        num_elements += len(array)
    return num_elements


# Returns an ObjectRef[DynamicObjectRefGenerator].
dynamic_ref = split.remote(array_ref, block_size)
assert array_size == ray.get(get_size.remote(dynamic_ref))
# (get_size pid=1504184)
# <ray._raylet.DynamicObjectRefGenerator object at 0x7f81c4250ad0>

# This also works, but should be avoided because you have to call an additional
# `ray.get`, which blocks the driver.
ref_generator = ray.get(dynamic_ref)
assert array_size == ray.get(get_size.remote(ref_generator))
# (get_size pid=1504184)
# <ray._raylet.DynamicObjectRefGenerator object at 0x7f81c4251b50>

异常处理#

如果生成器函数在生成所有值之前引发了异常,那么它已经存储的值仍然可以通过它们的 ObjectRefs 访问。其余的 ObjectRefs 将包含引发的异常。这对于静态和动态 num_returns 都适用。如果任务是使用 num_returns="dynamic" 调用,异常将作为额外的最终 ObjectRef 存储在 DynamicObjectRefGenerator 中。

@ray.remote
def generator():
    for i in range(2):
        yield i
    raise Exception("error")


ref1, ref2, ref3, ref4 = generator.options(num_returns=4).remote()
assert ray.get([ref1, ref2]) == [0, 1]
# All remaining ObjectRefs will contain the error.
try:
    ray.get([ref3, ref4])
except Exception as error:
    print(error)

dynamic_ref = generator.options(num_returns="dynamic").remote()
ref_generator = ray.get(dynamic_ref)
ref1, ref2, ref3 = ref_generator
assert ray.get([ref1, ref2]) == [0, 1]
# Generators with num_returns="dynamic" will store the exception in the final
# ObjectRef.
try:
    ray.get(ref3)
except Exception as error:
    print(error)

请注意,目前有一个已知错误,对于生成值数量超出预期的生成器,异常不会被传播。这种情况可能发生在两种情况下:

  1. num_returns 由调用者设置,但生成器任务返回的值多于此值。

  2. 当一个带有 num_returns="dynamic" 的生成器任务被重试执行,并且重试执行的任务生成的数量多于原始执行。请注意,一般来说,如果任务是非确定性的,Ray 不保证任务重试执行的正确性,建议为此类任务设置 @ray.remote(max_retries=0)

# Generators that yield more values than expected currently do not throw an
# exception (the error is only logged).
# See https://github.com/ray-project/ray/issues/28689.
ref1, ref2 = generator.options(num_returns=2).remote()
assert ray.get([ref1, ref2]) == [0, 1]
"""
(generator pid=2375938) 2022-09-28 11:08:51,386 ERROR worker.py:755 --
    Unhandled error: Task threw exception, but all return values already
    created.  This should only occur when using generator tasks.
...
"""

限制#

尽管生成器函数一次创建一个 ObjectRefs,但目前 Ray 不会调度依赖任务,直到整个任务完成并且所有值都已创建。这与返回列表形式的多个值的任务所使用的语义类似。