模式:使用资源限制并行运行的任务数量#

在此模式中,我们使用资源来限制并行运行的任务数量。

默认情况下,每个 Ray 任务需要 1 个 CPU,每个 Ray actor 需要 0 个 CPU,因此调度器将任务并发性限制为可用 CPU 数量,而 actor 并发性则无限制。使用超过 1 个 CPU(例如,通过多线程)的任务可能会因并发任务的干扰而变慢,但除此之外可以安全运行。

然而,使用超过其相应比例内存的任务或 actor 可能会导致节点过载并引发 OOM 等问题。如果出现这种情况,我们可以通过增加任务或 actor 请求的资源量来减少每个节点上并行运行的任务或 actor 数量。这是因为 Ray 会确保给定节点上所有并行运行的任务或 actor 的资源需求总和不超过该节点的总资源。

注意

对于 actor 任务,运行中的 actor 数量限制了我们可以拥有的并行运行的 actor 任务数量。

用例示例#

您有一个数据处理工作负载,它使用 Ray 远程函数独立处理每个输入文件。由于每个任务都需要将输入数据加载到堆内存中并进行处理,运行太多任务可能导致 OOM。在这种情况下,您可以使用 memory 资源来限制并行运行的任务数量(使用 num_cpus 等其他资源也可以达到同样的目的)。请注意,与 num_cpus 类似,memory 资源需求是逻辑的,这意味着如果每个任务超过此数量,Ray 不会强制限制其物理内存使用。

代码示例#

无限制

import ray

# Assume this Ray node has 16 CPUs and 16G memory.
ray.init()


@ray.remote
def process(file):
    # Actual work is reading the file and process the data.
    # Assume it needs to use 2G memory.
    pass


NUM_FILES = 1000
result_refs = []
for i in range(NUM_FILES):
    # By default, process task will use 1 CPU resource and no other resources.
    # This means 16 tasks can run concurrently
    # and will OOM since 32G memory is needed while the node only has 16G.
    result_refs.append(process.remote(f"{i}.csv"))
ray.get(result_refs)

有限制

result_refs = []
for i in range(NUM_FILES):
    # Now each task will use 2G memory resource
    # and the number of concurrently running tasks is limited to 8.
    # In this case, setting num_cpus to 2 has the same effect.
    result_refs.append(
        process.options(memory=2 * 1024 * 1024 * 1024).remote(f"{i}.csv")
    )
ray.get(result_refs)