Pattern: 使用 ray.wait 限制待处理任务的数量#

在此模式中,我们使用 ray.wait() 来限制待处理任务的数量。

如果我们持续以比处理时间更快的速度提交任务,待处理任务队列将会不断累积任务,最终可能导致 OOM(内存溢出)。使用 ray.wait(),我们可以应用背压并限制待处理任务的数量,这样待处理任务队列就不会无限增长并导致 OOM。

注意

如果我们提交的任务数量是有限的,不太可能遇到上述问题,因为每个任务在队列中仅占用少量内存用于簿记。当有无限的任务流需要运行时,更容易出现这种情况。

注意

此方法主要用于限制同时在“飞行中”(即正在执行或等待执行)的任务数量。它也可以用来限制可以“并发”运行的任务数量,但不推荐这样做,因为它可能会影响调度性能。Ray 会根据资源可用性自动决定任务并行度,因此调整并发运行任务数量的推荐方法是 修改每个任务的资源需求

用例#

您有一个工作 actor,其处理任务的速度为每秒 X 个任务,并且您希望以低于 X 的速度提交任务以避免 OOM。

例如,Ray Serve 使用此模式来限制每个 worker 的待处理查询数量。

../../_images/limit-pending-tasks.svg

限制待处理任务的数量#

代码示例#

无背压

import ray

ray.init()


@ray.remote
class Actor:
    async def heavy_compute(self):
        # taking a long time...
        # await asyncio.sleep(5)
        return


actor = Actor.remote()

NUM_TASKS = 1000
result_refs = []
# When NUM_TASKS is large enough, this will eventually OOM.
for _ in range(NUM_TASKS):
    result_refs.append(actor.heavy_compute.remote())
ray.get(result_refs)

有背压

MAX_NUM_PENDING_TASKS = 100
result_refs = []
for _ in range(NUM_TASKS):
    if len(result_refs) > MAX_NUM_PENDING_TASKS:
        # update result_refs to only
        # track the remaining tasks.
        ready_refs, result_refs = ray.wait(result_refs, num_returns=1)
        ray.get(ready_refs)

    result_refs.append(actor.heavy_compute.remote())

ray.get(result_refs)