嵌套远程函数#

远程函数可以调用其他远程函数,从而产生嵌套任务。例如,考虑以下代码。

import ray


@ray.remote
def f():
    return 1


@ray.remote
def g():
    # Call f 4 times and return the resulting object refs.
    return [f.remote() for _ in range(4)]


@ray.remote
def h():
    # Call f 4 times, block until those 4 tasks finish,
    # retrieve the results, and return the values.
    return ray.get([f.remote() for _ in range(4)])


然后调用 gh 会产生以下行为。

>>> ray.get(g.remote())
[ObjectRef(b1457ba0911ae84989aae86f89409e953dd9a80e),
 ObjectRef(7c14a1d13a56d8dc01e800761a66f09201104275),
 ObjectRef(99763728ffc1a2c0766a2000ebabded52514e9a6),
 ObjectRef(9c2f372e1933b04b2936bb6f58161285829b9914)]

>>> ray.get(h.remote())
[1, 1, 1, 1]

一个限制是f 的定义必须出现在 gh 的定义之前,因为一旦 g 被定义,它就会被 pickle 并发送到工作节点,所以如果 f 尚未定义,定义将是不完整的。

阻塞时释放资源#

Ray 在阻塞时会释放 CPU 资源。这可以防止嵌套任务等待父任务持有的 CPU 资源而导致的死锁情况。考虑以下远程函数。

@ray.remote(num_cpus=1, num_gpus=1)
def g():
    return ray.get(f.remote())


g 任务执行时,在调用 ray.get 时发生阻塞,它会释放其 CPU 资源。当 ray.get 返回时,它会重新获取 CPU 资源。在任务的整个生命周期中,它将保留其 GPU 资源,因为任务很可能将继续使用 GPU 内存。