反模式:一次性使用 ray.get 获取过多对象会导致失败#
简而言之: 避免对过多对象调用 ray.get(),因为这会导致堆内存不足或对象存储空间不足。相反,一次处理一个批次。
如果您有大量任务需要并行运行,一次性对所有任务调用 ray.get() 可能会因为 Ray 需要同时将所有对象都获取到调用者端而导致堆内存不足或对象存储空间不足的失败。相反,您应该一次获取和处理一批结果。一批处理完成后,Ray 会驱逐该批中的对象,为后续批次腾出空间。
一次性使用 ray.get() 获取过多对象#
代码示例#
反模式
import ray
import numpy as np
ray.init()
def process_results(results):
# custom process logic
pass
@ray.remote
def return_big_object():
return np.zeros(1024 * 10)
NUM_TASKS = 1000
object_refs = [return_big_object.remote() for _ in range(NUM_TASKS)]
# This will fail with heap out-of-memory
# or object store out-of-space if NUM_TASKS is large enough.
results = ray.get(object_refs)
process_results(results)
更好的方法
BATCH_SIZE = 100
while object_refs:
# Process results in the finish order instead of the submission order.
ready_object_refs, object_refs = ray.wait(object_refs, num_returns=BATCH_SIZE)
# The node only needs enough space to store
# a batch of objects instead of all objects.
results = ray.get(ready_object_refs)
process_results(results)
这里除了分批获取以避免失败之外,我们还使用 ray.wait() 来按照完成顺序而不是提交顺序处理结果,以减少运行时间。有关更多详细信息,请参阅 反模式:使用 ray.get 按提交顺序处理结果会增加运行时间。