Scheduling#

本页面概述了 Ray 如何决定将任务和 Actor 调度到节点。

标签#

标签提供了一种简化的解决方案,用于使用默认和自定义标签来控制任务、Actor 和放置组束的调度。请参阅 使用标签控制调度

标签是一项 Beta 功能。随着此功能趋于稳定,Ray 团队建议使用标签替换以下模式:

  • NodeAffinitySchedulingStrategy 当 soft=false 时。请改用默认的 ray.io/node-id 标签。

  • 任务和 Actor 的 accelerator_type 选项。请改用默认的 ray.io/accelerator-type 标签。

注意

一种旧的模式,推荐使用自定义资源进行基于标签的调度。现在,我们建议仅在需要使用数值来管理调度时才使用自定义资源。

资源#

每个任务或 Actor 都有 指定的资源需求。给定这些需求,节点可以处于以下状态之一:

  • 可行:节点拥有运行任务或 Actor 所需的资源。根据这些资源的当前可用性,有两种子状态:

    • 可用:节点拥有所需的资源,并且它们现在是空闲的。

    • 不可用:节点拥有所需的资源,但它们当前正被其他任务或 Actor 使用。

  • 不可行:节点没有所需的资源。例如,仅 CPU 的节点对于 GPU 任务是不可行的。

资源需求是硬性需求,这意味着只有可行的节点才有资格运行任务或 Actor。如果存在可行的节点,Ray 将根据下面讨论的其他因素,选择一个可用节点,或者等待一个不可用节点变得可用。如果所有节点都不可行,则在向集群添加可行节点之前,任务或 Actor 无法调度。

调度策略#

任务或 Actor 支持 scheduling_strategy 选项,用于指定在可行节点中决定最佳节点的策略。目前支持的策略如下:

“DEFAULT”#

"DEFAULT" 是 Ray 使用的默认策略。Ray 将任务或 Actor 调度到一组顶部的 k 个节点。具体来说,节点会先排序,以优先考虑已经调度了任务或 Actor 的节点(以实现局部性),然后优先考虑资源利用率较低的节点(以实现负载均衡)。在顶部的 k 个节点组中,节点会随机选择,以进一步提高负载均衡并减少大型集群中的冷启动延迟。

在实现方面,Ray 根据其逻辑资源的利用率计算集群中每个节点的得分。如果利用率低于某个阈值(由操作系统环境变量 RAY_scheduler_spread_threshold 控制,默认为 0.5),则得分为 0,否则得分为资源利用率本身(得分 1 表示节点已满负荷运行)。Ray 通过从得分最低的顶 k 个节点中随机选择一个来选择最佳调度节点。k 的值是(集群节点数 * RAY_scheduler_top_k_fraction 环境变量)和 RAY_scheduler_top_k_absolute 环境变量中的最大值。默认情况下,它是总节点数的 20%。

目前,Ray 会特殊处理那些不需要任何资源的 Actor(即 num_cpus=0 且没有其他资源),通过随机选择集群中的一个节点,而不考虑资源利用率。由于节点是随机选择的,因此不需要任何资源的 Actor 会有效地分布在整个集群中。

@ray.remote
def func():
    return 1


@ray.remote(num_cpus=1)
class Actor:
    pass


# If unspecified, "DEFAULT" scheduling strategy is used.
func.remote()
actor = Actor.remote()
# Explicitly set scheduling strategy to "DEFAULT".
func.options(scheduling_strategy="DEFAULT").remote()
actor = Actor.options(scheduling_strategy="DEFAULT").remote()

# Zero-CPU (and no other resources) actors are randomly assigned to nodes.
actor = Actor.options(num_cpus=0).remote()

“SPREAD”#

"SPREAD" 策略将尝试将任务或 Actor 分散到可用节点之间。

@ray.remote(scheduling_strategy="SPREAD")
def spread_func():
    return 2


@ray.remote(num_cpus=1)
class SpreadActor:
    pass


# Spread tasks across the cluster.
[spread_func.remote() for _ in range(10)]
# Spread actors across the cluster.
actors = [SpreadActor.options(scheduling_strategy="SPREAD").remote() for _ in range(10)]

PlacementGroupSchedulingStrategy#

PlacementGroupSchedulingStrategy 将任务或 Actor 调度到放置组所在的位置。这对于 Actor 组调度很有用。有关更多详细信息,请参阅 放置组

NodeAffinitySchedulingStrategy#

NodeAffinitySchedulingStrategy 是一种底层策略,它允许任务或 Actor 调度到由其节点 ID 指定的特定节点上。soft 标志指定了如果指定节点不存在(例如,如果节点死亡)或由于没有运行任务或 Actor 所需的资源而不可行,是否允许任务或 Actor 在其他地方运行。在这些情况下,如果 soft 为 True,则任务或 Actor 将调度到另一个可行的节点上。否则,任务或 Actor 将以 TaskUnschedulableErrorActorUnschedulableError 失败。只要指定的节点处于活动状态且可行,任务或 Actor 就会在那里运行,而不管 soft 标志如何。这意味着如果节点当前没有可用资源,任务或 Actor 将等待直到资源可用。此策略应用于其他高级调度策略(例如 放置组)无法提供所需的任务或 Actor 放置时。它有以下已知限制:

  • 它是一种底层策略,会阻止智能调度器进行优化。

  • 它无法充分利用自动扩展集群,因为在创建任务或 Actor 时必须知道节点 ID。

  • 在多租户集群中,很难做出最佳的静态放置决策:例如,一个应用程序将不知道同一节点上还调度了什么。

@ray.remote
def node_affinity_func():
    return ray.get_runtime_context().get_node_id()


@ray.remote(num_cpus=1)
class NodeAffinityActor:
    pass


# Only run the task on the local node.
node_affinity_func.options(
    scheduling_strategy=ray.util.scheduling_strategies.NodeAffinitySchedulingStrategy(
        node_id=ray.get_runtime_context().get_node_id(),
        soft=False,
    )
).remote()

# Run the two node_affinity_func tasks on the same node if possible.
node_affinity_func.options(
    scheduling_strategy=ray.util.scheduling_strategies.NodeAffinitySchedulingStrategy(
        node_id=ray.get(node_affinity_func.remote()),
        soft=True,
    )
).remote()

# Only run the actor on the local node.
actor = NodeAffinityActor.options(
    scheduling_strategy=ray.util.scheduling_strategies.NodeAffinitySchedulingStrategy(
        node_id=ray.get_runtime_context().get_node_id(),
        soft=False,
    )
).remote()

局部性感知调度#

默认情况下,Ray 优先选择与大型任务参数具有局部性的可用节点,以避免通过网络传输数据。如果存在多个大型任务参数,则优先选择具有最多本地对象字节的节点。这优先于 "DEFAULT" 调度策略,这意味着 Ray 将尝试在局部性优先节点上运行任务,而不管节点资源利用率如何。但是,如果局部性优先节点不可用,Ray 可能会在其他地方运行任务。当指定了其他调度策略时,它们具有更高的优先级,并且不再考虑数据局部性。

注意

局部性感知调度仅适用于任务,不适用于 Actor。

@ray.remote
def large_object_func():
    # Large object is stored in the local object store
    # and available in the distributed memory,
    # instead of returning inline directly to the caller.
    return [1] * (1024 * 1024)


@ray.remote
def small_object_func():
    # Small object is returned inline directly to the caller,
    # instead of storing in the distributed memory.
    return [1]


@ray.remote
def consume_func(data):
    return len(data)


large_object = large_object_func.remote()
small_object = small_object_func.remote()

# Ray will try to run consume_func on the same node
# where large_object_func runs.
consume_func.remote(large_object)

# Ray will try to spread consume_func across the entire cluster
# instead of only running on the node where large_object_func runs.
[
    consume_func.options(scheduling_strategy="SPREAD").remote(large_object)
    for i in range(10)
]

# Ray won't consider locality for scheduling consume_func
# since the argument is small and will be sent to the worker node inline directly.
consume_func.remote(small_object)

更多关于 Ray 调度#