模式:使用资源限制并发运行的任务数量#
在此模式中,我们使用资源来限制并发运行的任务数量。
默认情况下,Ray 任务每个需要 1 个 CPU,Ray actor 每个需要 0 个 CPU,因此调度器将任务的并发限制为可用 CPU,将 actor 的并发限制为无限。使用超过 1 个 CPU 的任务(例如,通过多线程)可能会由于并发任务的干扰而变慢,但否则可以安全运行。
然而,使用超出其比例内存份额的任务或 actor 可能会使节点过载并导致 OOM 等问题。在这种情况下,我们可以通过增加任务或 actor 所需的资源量来减少每个节点上并发运行的任务或 actor 的数量。这是因为 Ray 确保给定节点上所有并发运行的任务和 actor 的资源需求总和不超过节点的总资源。
注意
对于 actor 任务,运行的 actor 数量限制了我们可以拥有的并发 actor 任务数量。
用例#
您有一个数据处理工作负载,该工作负载使用 Ray 远程函数独立处理每个输入文件。由于每个任务都需要将输入数据加载到堆内存中并进行处理,运行过多的任务可能会导致 OOM。在这种情况下,您可以使用 memory 资源来限制并发运行的任务数量(其他资源如 num_cpus 的使用也可以达到相同的目的)。请注意,与 num_cpus 类似,memory 资源需求是逻辑的,这意味着 Ray 不会强制执行每个任务超出此数量的物理内存使用情况。
代码示例#
无限制
import ray
# Assume this Ray node has 16 CPUs and 16G memory.
ray.init()
@ray.remote
def process(file):
# Actual work is reading the file and process the data.
# Assume it needs to use 2G memory.
pass
NUM_FILES = 1000
result_refs = []
for i in range(NUM_FILES):
# By default, process task will use 1 CPU resource and no other resources.
# This means 16 tasks can run concurrently
# and will OOM since 32G memory is needed while the node only has 16G.
result_refs.append(process.remote(f"{i}.csv"))
ray.get(result_refs)
有 C 限制
result_refs = []
for i in range(NUM_FILES):
# Now each task will use 2G memory resource
# and the number of concurrently running tasks is limited to 8.
# In this case, setting num_cpus to 2 has the same effect.
result_refs.append(
process.options(memory=2 * 1024 * 1024 * 1024).remote(f"{i}.csv")
)
ray.get(result_refs)