模式:使用资源限制并行运行的任务数量#
在此模式中,我们使用资源来限制并行运行的任务数量。
默认情况下,每个 Ray 任务需要 1 个 CPU,每个 Ray actor 需要 0 个 CPU,因此调度器将任务并发性限制为可用 CPU 数量,而 actor 并发性则无限制。使用超过 1 个 CPU(例如,通过多线程)的任务可能会因并发任务的干扰而变慢,但除此之外可以安全运行。
然而,使用超过其相应比例内存的任务或 actor 可能会导致节点过载并引发 OOM 等问题。如果出现这种情况,我们可以通过增加任务或 actor 请求的资源量来减少每个节点上并行运行的任务或 actor 数量。这是因为 Ray 会确保给定节点上所有并行运行的任务或 actor 的资源需求总和不超过该节点的总资源。
注意
对于 actor 任务,运行中的 actor 数量限制了我们可以拥有的并行运行的 actor 任务数量。
用例示例#
您有一个数据处理工作负载,它使用 Ray 远程函数独立处理每个输入文件。由于每个任务都需要将输入数据加载到堆内存中并进行处理,运行太多任务可能导致 OOM。在这种情况下,您可以使用 memory
资源来限制并行运行的任务数量(使用 num_cpus
等其他资源也可以达到同样的目的)。请注意,与 num_cpus
类似,memory
资源需求是逻辑的,这意味着如果每个任务超过此数量,Ray 不会强制限制其物理内存使用。
代码示例#
无限制
import ray
# Assume this Ray node has 16 CPUs and 16G memory.
ray.init()
@ray.remote
def process(file):
# Actual work is reading the file and process the data.
# Assume it needs to use 2G memory.
pass
NUM_FILES = 1000
result_refs = []
for i in range(NUM_FILES):
# By default, process task will use 1 CPU resource and no other resources.
# This means 16 tasks can run concurrently
# and will OOM since 32G memory is needed while the node only has 16G.
result_refs.append(process.remote(f"{i}.csv"))
ray.get(result_refs)
有限制
result_refs = []
for i in range(NUM_FILES):
# Now each task will use 2G memory resource
# and the number of concurrently running tasks is limited to 8.
# In this case, setting num_cpus to 2 has the same effect.
result_refs.append(
process.options(memory=2 * 1024 * 1024 * 1024).remote(f"{i}.csv")
)
ray.get(result_refs)