Kueue 集群调度、优先级调度和自动伸缩 KubeRay CRD#

本指南演示了如何将 KubeRay 与 Kueue 集成,以在 Kubernetes 上为 Ray 应用程序启用高级调度功能,包括集群调度和优先级调度。

有关 RayJob 的实际用例,请参阅 RayJob 和 Kueue 的优先级调度RayJob 和 Kueue 的集群调度

什么是 Kueue?#

Kueue 是一个 Kubernetes 原生的作业排队系统,用于管理资源配额和作业生命周期。Kueue 决定何时

  • 让作业等待。

  • 允许作业启动,这会触发 Kubernetes 创建 Pod。

  • 抢占作业,这会触发 Kubernetes 删除活动的 Pod。

支持的 KubeRay CRD#

Kueue 原生支持以下 KubeRay API

  • RayJob:适用于批处理和模型训练工作负载(本指南涵盖)

  • RayCluster:适用于管理长期运行的 Ray 集群

  • RayService:用于服务模型和应用程序

注意:本指南重点介绍在 kind 集群上的详细 RayJob 示例。有关 RayCluster 和 RayService 的示例,请参阅 “使用 RayCluster 和 RayService” 部分。

先决条件#

开始之前,请确保您拥有一个 Kubernetes 集群。本指南使用本地 Kind 集群。

步骤 0:创建 Kind 集群#

kind create cluster

步骤 1:安装 KubeRay 操作员#

按照 部署 KubeRay Operator 的说明,从 Helm 存储库安装最新的稳定 KubeRay Operator。

步骤 2:安装 Kueue#

VERSION=v0.13.4
kubectl apply --server-side -f https://github.com/kubernetes-sigs/kueue/releases/download/$VERSION/manifests.yaml

有关安装 Kueue 的更多详细信息,请参阅 Kueue 安装注意:Kueue 和 RayJob 之间存在一些限制。有关更多详细信息,请参阅 Kueue 的限制

步骤 3:创建 Kueue 资源#

此清单创建了管理调度和资源分配所需的 Kueue 资源。

# kueue-resources.yaml
apiVersion: kueue.x-k8s.io/v1beta1
kind: ResourceFlavor
metadata:
  name: "default-flavor"
---
apiVersion: kueue.x-k8s.io/v1beta1
kind: ClusterQueue
metadata:
  name: "cluster-queue"
spec:
  preemption:
    withinClusterQueue: LowerPriority
  namespaceSelector: {} # Match all namespaces.
  resourceGroups:
  - coveredResources: ["cpu", "memory"]
    flavors:
    - name: "default-flavor"
      resources:
      - name: "cpu"
        nominalQuota: 3
      - name: "memory"
        nominalQuota: 6G
---
apiVersion: kueue.x-k8s.io/v1beta1
kind: LocalQueue
metadata:
  namespace: "default"
  name: "user-queue"
spec:
  clusterQueue: "cluster-queue"
---
apiVersion: kueue.x-k8s.io/v1beta1
kind: WorkloadPriorityClass
metadata:
  name: prod-priority
value: 1000
description: "Priority class for prod jobs"
---
apiVersion: kueue.x-k8s.io/v1beta1
kind: WorkloadPriorityClass
metadata:
  name: dev-priority
value: 100
description: "Priority class for development jobs"

YAML 清单配置了

  • ResourceFlavor

    • 由于 Kubernetes 集群中的计算资源是同质的,因此 default-flavor ResourceFlavor 是空的。换句话说,用户可以请求 1 个 CPU,而不必考虑它是 ARM 芯片还是 x86 芯片。

  • ClusterQueue

    • cluster-queue ClusterQueue 仅具有 1 个 default-flavor ResourceFlavor,其配额为 3 个 CPU 和 6G 内存。

    • cluster-queue ClusterQueue 具有抢占策略 withinClusterQueue: LowerPriority。此策略允许不符合其 ClusterQueue 名义配额的待定 RayJob 抢占 ClusterQueue 中优先级较低的活动 RayJob 自定义资源。

  • LocalQueue

    • user-queue LocalQueue 是 default 命名空间中属于 ClusterQueue 的命名空间范围对象。通常的做法是将一个命名空间分配给组织中的租户、团队或用户。用户将作业提交到 LocalQueue,而不是直接提交到 ClusterQueue。

  • WorkloadPriorityClass

    • prod-priority WorkloadPriorityClass 的值高于 dev-priority WorkloadPriorityClass。具有 prod-priority 优先级类的 RayJob 自定义资源优先于具有 dev-priority 优先级类的 RayJob 自定义资源。

创建 Kueue 资源

kubectl apply -f kueue-resources.yaml

步骤 4:使用 Kueue 进行集群调度#

Kueue 始终以“集群”模式分配工作负载。Kueue 以“全有或全无”的方式分配工作负载,确保 Kubernetes 从不部分配置 RayJob 或 RayCluster。使用集群调度策略来避免因工作负载调度效率低下而造成的计算资源浪费。

从 KubeRay 存储库下载 RayJob YAML 清单。

curl -LO https://raw.githubusercontent.com/ray-project/kuberay/master/ray-operator/config/samples/ray-job.kueue-toy-sample.yaml

在创建 RayJob 之前,修改 RayJob 元数据,如下所示

metadata:
  generateName: rayjob-sample-
  labels:
    kueue.x-k8s.io/queue-name: user-queue
    kueue.x-k8s.io/priority-class: dev-priority

创建两个具有相同优先级 dev-priority 的 RayJob 自定义资源。请注意 RayJob 自定义资源的这些重要要点

  • RayJob 自定义资源包括 1 个 head Pod 和 1 个 worker Pod,每个 Pod 请求 1 个 CPU 和 2G 内存。

  • RayJob 运行一个简单的 Python 脚本,该脚本演示了一个运行 600 次迭代的循环,打印迭代次数并每次迭代睡眠 1 秒。因此,在提交的 Kubernetes Job 启动后,RayJob 运行约 600 秒。

  • 将 RayJob 的 shutdownAfterJobFinishes 设置为 true,以启用自动清理。此设置会触发 KubeRay 在 RayJob 完成后删除 RayCluster。

    • Kueue 不处理 shutdownAfterJobFinishes 设置为 false 的 RayJob 自定义资源。有关更多详细信息,请参阅 Kueue 的限制

kubectl create -f ray-job.kueue-toy-sample.yaml

每个 RayJob 自定义资源总共请求 2 个 CPU 和 4G 内存。但是,ClusterQueue 总共只有 3 个 CPU 和 6G 内存。因此,第二个 RayJob 自定义资源保持待定状态,即使剩余资源足以运行一个 Pod,KubeRay 也不会从待定的 RayJob 创建 Pod。您还可以检查 ClusterQueue 以查看可用和已用配额。

$ kubectl get clusterqueues.kueue.x-k8s.io
NAME            COHORT   PENDING WORKLOADS
cluster-queue            1

$ kubectl get clusterqueues.kueue.x-k8s.io cluster-queue -o yaml
Status:
  Admitted Workloads:  1 # Workloads admitted by queue.
  Conditions:
    Last Transition Time:  2024-02-28T22:41:28Z
    Message:               Can admit new workloads
    Reason:                Ready
    Status:                True
    Type:                  Active
  Flavors Reservation:
    Name:  default-flavor
    Resources:
      Borrowed:  0
      Name:      cpu
      Total:     2
      Borrowed:  0
      Name:      memory
      Total:     4Gi
  Flavors Usage:
    Name:  default-flavor
    Resources:
      Borrowed:         0
      Name:             cpu
      Total:            2
      Borrowed:         0
      Name:             memory
      Total:            4Gi
  Pending Workloads:    1
  Reserving Workloads:  1

当第一个 RayJob 自定义资源完成后,Kueue 会分配待定的 RayJob 自定义资源。检查 RayJob 自定义资源的狀態并在它们完成后删除它们。

$ kubectl get rayjobs.ray.io
NAME                  JOB STATUS   DEPLOYMENT STATUS   START TIME             END TIME               AGE
rayjob-sample-ckvq4   SUCCEEDED    Complete            xxxxx                  xxxxx                  xxx
rayjob-sample-p5msp   SUCCEEDED    Complete            xxxxx                  xxxxx                  xxx

$ kubectl delete rayjob rayjob-sample-ckvq4
$ kubectl delete rayjob rayjob-sample-p5msp

步骤 5:使用 Kueue 进行优先级调度#

此步骤首先创建一个具有较低优先级类 dev-priority 的 RayJob,然后创建一个具有较高优先级类 prod-priority 的 RayJob。具有较高优先级类 prod-priority 的 RayJob 优先于具有较低优先级类 dev-priority 的 RayJob。Kueue 抢占具有较低优先级的 RayJob 以分配具有较高优先级的 RayJob。

如果您按照上一步进行操作,RayJob YAML 清单 ray-job.kueue-toy-sample.yaml 应该已经设置为 dev-priority 优先级类。创建一个具有较低优先级类 dev-priority 的 RayJob。

kubectl create -f ray-job.kueue-toy-sample.yaml

在创建具有较高优先级类 prod-priority 的 RayJob 之前,修改 RayJob 元数据,如下所示

metadata:
  generateName: rayjob-sample-
  labels:
    kueue.x-k8s.io/queue-name: user-queue
    kueue.x-k8s.io/priority-class: prod-priority

创建一个具有较高优先级类 prod-priority 的 RayJob。

kubectl create -f ray-job.kueue-toy-sample.yaml

您可以看到 KubeRay 操作员删除了属于具有较低优先级类 dev-priority 的 RayJob 的 Pod,并创建了属于具有较高优先级类 prod-priority 的 RayJob 的 Pod。

使用 RayCluster 和 RayService#

RayCluster 与 Kueue#

对于 RayCluster 资源的集群调度,Kueue 确保所有集群组件(head 和 worker 节点)一起配置。这可以防止部分集群创建和资源浪费。有关详细的 RayCluster 集成:请参阅 Kueue RayCluster 文档

RayService 与 Kueue#

RayService 与 Kueue 的集成实现了模型服务工作负载的集群调度,确保为服务基础架构提供一致的资源分配。有关详细的 RayService 集成:请参阅 Kueue RayService 文档

Ray 自动伸缩器与 Kueue#

Kueue 可以将 **RayCluster** 或 **RayService** 的底层集群视为 **弹性工作负载**。Kueue 管理整个集群的排队和配额,而内置的 Ray 自动伸缩器则根据资源需求缩放 worker Pod 的数量。本节展示了如何使用与现有 Kueue 集成指南类似的循序渐进方法,为 Kueue 管理的 Ray 工作负载启用自动伸缩。

支持的资源 – 在撰写本文时,Kueue 自动伸缩器集成支持 RayClusterRayService。对 RayJob 自动伸缩的支持正在开发中;有关更新,请参阅 Kueue 的问题跟踪器:issue

先决条件#

请确保您已经

  • 安装了 KubeRay 操作员

  • 安装了 **Kueue**(有关更多详细信息,请参阅 Kueue 安装,请注意,建议安装 Kueue 版本 >= v0.13)


步骤 1:创建 Kueue 资源#

定义 **ResourceFlavor**、**ClusterQueue** 和 **LocalQueue**,以便 Kueue 知道可以分配多少 CPU 和内存。下面的清单创建了一个名为 default-flavor 的 8 CPU/16 GiB 池,将其注册到名为 ray-cqClusterQueue 中,并定义了一个名为 ray-lqLocalQueue

apiVersion: kueue.x-k8s.io/v1beta1
kind: ResourceFlavor
metadata:
  name: default-flavor
---
apiVersion: kueue.x-k8s.io/v1beta1
kind: ClusterQueue
metadata:
  name: ray-cq
spec:
  cohort: ray-example
  namespaceSelector: {}
  resourceGroups:
  - coveredResources: ["cpu", "memory"]
    flavors:
    - name: default-flavor
      resources:
      - name: cpu
        nominalQuota: 8
      - name: memory
        nominalQuota: 16Gi
---
apiVersion: kueue.x-k8s.io/v1beta1
kind: LocalQueue
metadata:
  name: ray-lq
  namespace: default
spec:
  clusterQueue: ray-cq

应用资源

kubectl apply -f kueue-resources.yaml

步骤 2:在 Kueue 中启用弹性工作负载#

自动伸缩仅在 Kueue 的 ElasticJobsViaWorkloadSlices 功能门控启用时才有效。

运行以下命令将功能门控标志添加到 kueue-controller-manager Deployment

kubectl -n kueue-system patch deploy kueue-controller-manager \
  --type='json' \
  -p='[
    {
      "op": "add",
      "path": "/spec/template/spec/containers/0/args/-",
      "value": "--feature-gates=ElasticJobsViaWorkloadSlices=true"
    }
  ]'

RayCluster 自动伸缩#

步骤 1:配置弹性 RayCluster#

弹性 RayCluster 是可以在运行时更改其 worker 数量的集群。Kueue 需要进行三处更改才能将 RayCluster 识别为弹性

  1. 队列标签 – 设置 metadata.labels.kueue.x-k8s.io/queue-name: <localqueue>,以便 Kueue 对此集群进行排队。

  2. 弹性作业注解 – 添加 metadata.annotations.kueue.x-k8s.io/elastic-job: "true" 以将此集群标记为弹性。Kueue 创建 **WorkloadSlices** 进行缩放。

  3. 启用 Ray 自动伸缩器 – 在 RayCluster spec 中设置 spec.enableInTreeAutoscaling: true,并可选地配置 autoscalerOptions,例如 idleTimeoutSeconds

以下是一个最小的弹性 RayCluster 清单

apiVersion: ray.io/v1
kind: RayCluster
metadata:
  name: raycluster-kueue-autoscaler
  namespace: default
  labels:
    kueue.x-k8s.io/queue-name: ray-lq
  annotations:
    kueue.x-k8s.io/elastic-job: "true"  # Mark as elastic
spec:
  rayVersion: "2.46.0"
  enableInTreeAutoscaling: true          # Turn on the Ray autoscaler
  autoscalerOptions:
    idleTimeoutSeconds: 60              # Delete idle workers after 60 s
  headGroupSpec:
    serviceType: ClusterIP
    rayStartParams:
      dashboard-host: "0.0.0.0"
    template:
      spec:
        containers:
        - name: ray-head
          image: rayproject/ray:2.46.0
          resources:
            requests:
              cpu: "1"
              memory: "2Gi"
            limits:
              cpu: "2"
              memory: "5Gi"
  workerGroupSpecs:
  - groupName: workers
    replicas: 0       # start with no workers; autoscaler will add them
    minReplicas: 0    # lower bound
    maxReplicas: 4    # upper bound
    rayStartParams: {}
    template:
      spec:
        containers:
        - name: ray-worker
          image: rayproject/ray:2.46.0
          resources:
            requests:
              cpu: "1"
              memory: "1Gi"
            limits:
              cpu: "2"
              memory: "5Gi"

应用此清单并验证 Kueue 是否已分配相应的 Workload。

kubectl apply -f raycluster-kueue-autoscaler.yaml
kubectl get workloads.kueue.x-k8s.io -A

一旦 RayCluster 由 Kueue 调度,ADMITTED 列应显示 True

NAMESPACE   NAME                                           QUEUE    RESERVED IN   ADMITTED   FINISHED   AGE
default     raycluster-raycluster-kueue-autoscaler-21c46   ray-lq   ray-cq        True                  26s

步骤 2:验证 RayCluster 的自动伸缩#

要观察自动伸缩,请在集群上创建负载并监视 worker Pod 的出现。以下过程从 head Pod 内部运行一个 CPU 密集型工作负载,并监视缩放情况。

  1. 进入 head Pod

    HEAD_POD=$(kubectl get pod -l ray.io/node-type=head,ray.io/cluster=raycluster-kueue-autoscaler \
      -o jsonpath='{.items[0].metadata.name}')
    kubectl exec -it "$HEAD_POD" -- bash
    
  2. 运行工作负载:在 head 容器内执行以下 Python 脚本。它提交 20 个任务,每个任务消耗一个完整的 CPU 大约一分钟。

    python << 'EOF'
    import ray, time
    
    ray.init(address="auto")
    
    @ray.remote(num_cpus=1)
    def busy():
        end = time.time() + 60
        while time.time() < end:
            x = 0
            for i in range(100_000):
                x += i * i
        return 1
    
    tasks = [busy.remote() for _ in range(20)]
    print(sum(ray.get(tasks)))
    EOF
    

    由于 head Pod 只有一个 CPU,任务会排队,自动伸缩器会将 worker 副本数量提升到 maxReplicas

  3. 监视 worker Pod:在另一个终端中,监视 worker Pod 的缩放情况。

    kubectl get pods -w \
      -l ray.io/cluster=raycluster-kueue-autoscaler,ray.io/node-type=worker
    

    任务运行时应出现新的 worker Pod,在工作负载完成且空闲超时结束后消失。

RayService 自动伸缩#

步骤 1:配置弹性 RayService#

RayService 通过将 spec.rayClusterConfig 实现为托管的 RayCluster 来部署 Ray Serve 应用程序。Kueue 不直接与 RayService 对象交互。相反,KubeRay 操作员将相关的元数据从 RayService 传播到托管的 RayCluster,**Kueue 对该 RayCluster 进行排队和分配**。

要使 RayService 与 Kueue 和 Ray 自动伸缩器配合使用

  1. 队列标签 metadata.labels.kueue.x-k8s.io/queue-nameRayService 上。KubeRay 将服务标签传递给底层 RayCluster,从而允许 Kueue 对其进行排队。

  2. 弹性作业注解 metadata.annotations.kueue.x-k8s.io/elastic-job: "true"。此注解会传播到 RayCluster 并指示 Kueue 将其视为弹性工作负载。

  3. 启用 Ray 自动伸缩器 spec.rayClusterConfig,设置 enableInTreeAutoscaling: true 并指定 worker minReplicas/maxReplicas

以下清单部署了一个简单的 Ray Serve 应用程序,并启用了自动伸缩。Serve 应用程序(demo_app)和部署名称(ServiceAServiceB)是占位符,用于避免暗示特定的 KubeRay 示例。请根据您的应用程序调整部署和资源。

apiVersion: ray.io/v1
kind: RayService
metadata:
  name: rayservice-kueue-autoscaler
  namespace: default
  labels:
    kueue.x-k8s.io/queue-name: ray-lq       # copy to RayCluster
  annotations:
    kueue.x-k8s.io/elastic-job: "true"       # mark as elastic
spec:
  # A simple Serve config with two deployments
  serveConfigV2: |
    applications:
      - name: fruit_app
        import_path: fruit.deployment_graph
        route_prefix: /fruit
        runtime_env:
          working_dir: "https://github.com/ray-project/test_dag/archive/78b4a5da38796123d9f9ffff59bab2792a043e95.zip"
        deployments:
          - name: MangoStand
            num_replicas: 2
            max_replicas_per_node: 1
            user_config:
              price: 3
            ray_actor_options:
              num_cpus: 0.1
          - name: OrangeStand
            num_replicas: 1
            user_config:
              price: 2
            ray_actor_options:
              num_cpus: 0.1
          - name: PearStand
            num_replicas: 1
            user_config:
              price: 1
            ray_actor_options:
              num_cpus: 0.1
          - name: FruitMarket
            num_replicas: 1
            ray_actor_options:
              num_cpus: 0.1
      - name: math_app
        import_path: conditional_dag.serve_dag
        route_prefix: /calc
        runtime_env:
          working_dir: "https://github.com/ray-project/test_dag/archive/78b4a5da38796123d9f9ffff59bab2792a043e95.zip"
        deployments:
          - name: Adder
            num_replicas: 1
            user_config:
              increment: 3
            ray_actor_options:
              num_cpus: 0.1
          - name: Multiplier
            num_replicas: 1
            user_config:
              factor: 5
            ray_actor_options:
              num_cpus: 0.1
          - name: Router
            num_replicas: 1
  rayClusterConfig:
    rayVersion: "2.46.0"
    enableInTreeAutoscaling: true
    autoscalerOptions:
      idleTimeoutSeconds: 60
    headGroupSpec:
      serviceType: ClusterIP
      rayStartParams:
        dashboard-host: "0.0.0.0"
      template:
        spec:
          containers:
          - name: ray-head
            image: rayproject/ray:2.46.0
            resources:
              requests:
                cpu: "1"
                memory: "2Gi"
              limits:
                cpu: "2"
                memory: "5Gi"
    workerGroupSpecs:
    - groupName: workers
      replicas: 1            # initial workers
      minReplicas: 1         # lower bound
      maxReplicas: 5         # upper bound
      rayStartParams: {}
      template:
        spec:
          containers:
          - name: ray-worker
            image: rayproject/ray:2.46.0
            resources:
              requests:
                cpu: "1"
                memory: "1Gi"
              limits:
                cpu: "2"
                memory: "5Gi"

应用清单并验证服务的 RayCluster 是否已由 Kueue 分配。

kubectl apply -f rayservice-kueue-autoscaler.yaml
kubectl get workloads.kueue.x-k8s.io -A

一旦 RayService 已由 Kueue 调度,ADMITTED 列应显示 True

NAMESPACE   NAME                                                 QUEUE    RESERVED IN   ADMITTED   FINISHED   AGE
default     raycluster-rayservice-kueue-autoscaler-9xvcr-d7add   ray-lq   ray-cq        True                  21s

步骤 2:验证 RayService 的自动伸缩#

RayService 的自动伸缩最终由托管 RayCluster 上的负载驱动。验证过程与普通 RayCluster 的验证过程相同。

要验证自动伸缩

  1. 按照 步骤 2:验证 RayCluster 的自动伸缩 中的步骤进行操作,但使用 RayService 名称作为标签选择器。具体来说

    • 选择 head Pod 时,请使用(请记住替换您的集群名称)

      HEAD_POD=$(kubectl get pod \
      -l ray.io/node-type=head,ray.io/cluster=rayservice-kueue-autoscaler-9xvcr \
      -o jsonpath='{.items[0].metadata.name}')
      kubectl exec -it "$HEAD_POD" -- bash
      
    • 在 head 容器内,运行与 RayCluster 示例中相同的 CPU 密集型 Python 脚本。

      python << 'EOF'
      import ray, time
      
      ray.init(address="auto")
      
      @ray.remote(num_cpus=1)
      def busy():
          end = time.time() + 60
          while time.time() < end:
              x = 0
              for i in range(100_000):
                  x += i * i
          return 1
      
      tasks = [busy.remote() for _ in range(20)]
      print(sum(ray.get(tasks)))
      EOF
      
    • 在另一个终端中,使用以下命令监视 worker Pod:

      kubectl get pods -w \
        -l ray.io/cluster=rayservice-kueue-autoscaler,ray.io/node-type=worker
      

与 RayCluster 情况一样,worker Pod 在 CPU 密集型任务运行时会扩展到 maxReplicas,在任务完成且空闲超时结束后会缩减到 minReplicas。唯一的区别是,ray.io/cluster 标签现在匹配 RayService 名称(rayservice-kueue-autoscaler-9xvcr),而不是独立的 RayCluster 名称(raycluster-kueue-autoscaler)。

限制#

  • 功能状态ElasticJobsViaWorkloadSlices 功能门控当前为 **alpha** 状态。弹性自动伸缩仅适用于用 kueue.x-k8s.io/elastic-job: "true" 注解且配置了 enableInTreeAutoscaling: true(当 ray 镜像 < 2.47.0 时)的 RayCluster。

  • RayJob 支持RayJob 的自动伸缩尚不支持。Kueue 的维护者正在积极跟踪此项工作,并在其可用时更新其文档。

  • Kueue v0.13 之前的版本 – 如果您使用的 Kueue 版本早于 v0.13,请在安装后重启一次 Kueue 控制器,以确保 RayCluster 管理正常工作。