高级 Ray Serve 自动扩缩容#

本指南将介绍 autoscaling_config 中的更高级自动扩缩容参数以及一个高级模型组合示例。

自动扩缩容配置参数#

在本节中,我们将更详细地介绍 Serve 自动扩缩容的概念以及如何设置自动扩缩容配置。

[必需] 定义系统的稳定状态#

要定义部署的稳定状态,请设置 target_ongoing_requestsmax_ongoing_requests 的值。

target_ongoing_requests [默认值=2]#

注意

在 Ray 2.32.0 中,target_ongoing_requests 的默认值从 1.0 更改为 2.0。您可以继续手动设置以覆盖默认值。

Serve 会根据每个副本的平均正在处理的请求数来扩展或缩减部署的副本数量。具体来说,Serve 会将每个副本的*实际*正在处理的请求数与您在自动扩缩容配置中设置的目标值进行比较,并据此做出扩展或缩减的决定。使用 target_ongoing_requests 设置目标值,Serve 会尝试确保每个副本大约有指定数量的请求正在处理或排队等待。

始终对您的工作负载进行负载测试。例如,如果使用场景对延迟敏感,您可以降低 target_ongoing_requests 的值以保持高性能。对您的应用程序代码进行基准测试,并根据端到端延迟目标设置此值。

注意

例如,假设您有一个同步部署的两个副本,该部署具有 100ms 的延迟,并处理 30 QPS 的流量负载。那么 Serve 分配请求到副本的速度比副本处理完的速度快;随着时间的推移,越来越多的请求会排队到副本(这些请求是“正在处理的请求”),然后每个副本上正在处理的请求的平均数量会稳步增加。延迟也会增加,因为新请求必须等待旧请求处理完成。如果您设置 target_ongoing_requests = 1,Serve 会检测到每个副本的正在处理的请求数高于预期值,并添加更多副本。对于 3 个副本,您的系统平均每秒可以处理 30 个请求,每个副本有 1 个正在处理的请求。

max_ongoing_requests [默认值=5]#

注意

在 Ray 2.32.0 中,max_ongoing_requests 的默认值从 100 更改为 5。您可以继续手动设置以覆盖默认值。

还有一个最大队列限制,代理在将请求分配给副本时会遵守该限制。使用 max_ongoing_requests 定义此限制。将 max_ongoing_requests 设置为比 target_ongoing_requests 高约 20% 到 50%。

  • 将其设置得太低可能会限制吞吐量。请求将倾向于排队在代理处等待副本处理现有请求,而不是被转发给副本进行并发执行。

注意

max_ongoing_requests 应该调高,特别是对于轻量级请求,否则会影响整体吞吐量。

  • 将其设置得太高可能导致路由不平衡。具体来说,这可能导致在扩展期间出现非常高的尾部延迟,因为当自动扩缩容器由于流量高峰而扩展部署时,在启动新副本之前,大部分或所有请求可能都会被分配给现有副本。

[必需] 定义自动扩缩容的上限和下限#

要使用自动扩缩容,您需要定义系统允许的最小和最大资源数量。

  • min_replicas [默认值=1]:这是部署的最小副本数。如果您希望系统始终能够处理一定级别的流量,请将 min_replicas 设置为正数。另一方面,如果您预计会出现流量为空的时期并希望扩展到零以节省成本,请将 min_replicas = 0。请注意,将 min_replicas = 0 设置为 0 会导致更高的尾部延迟;当您开始发送流量时,部署会扩展,并且在 Serve 等待启动副本以服务请求时会出现冷启动时间。

  • max_replicas [默认值=1]:这是部署的最大副本数。此值应大于 min_replicas。当当前可用的集群资源(CPU、GPU 等)不足以支持更多副本时,Ray Serve 自动扩缩容依赖于 Ray 自动扩缩容器来扩展更多节点。

  • initial_replicas:这是为部署初始启动的副本数。此值默认为 min_replicas 的值。

[可选] 定义系统如何响应流量变化#

在稳定的流量流和适当配置的 min_replicasmax_replicas 的情况下,系统的稳定状态基本上是固定的,取决于 target_ongoing_requests 的选定配置值。然而,在达到稳定状态之前,您的系统正在响应流量变化。您希望系统如何响应流量变化决定了您希望如何设置其余的自动扩缩容配置。

  • upscale_delay_s [默认值=30s]:这定义了 Serve 在扩展部署副本数量之前等待的时间。换句话说,此参数控制扩展决策的频率。如果副本*持续*为 upscale_delay_s 秒提供比预期更多的请求,那么 Serve 将根据汇总的正在处理的请求指标来扩展副本数量。例如,如果您的服务可能会遇到流量爆发,您可以降低 upscale_delay_s 以便您的应用程序能够快速响应流量增加。

Ray Serve 允许您为不同的缩减场景使用不同的延迟,从而提供更精细地控制何时移除副本。当您希望在缩减到零与缩减到非零副本数量之间具有不同行为时,这特别有用。

  • downscale_delay_s [默认值=600s]:这定义了 Serve 在缩减部署副本数量之前等待的时间。如果副本*持续*在 downscale_delay_s 秒内提供比预期少的请求,Serve 将根据汇总的正在处理的请求指标来缩减副本数量。此延迟适用于所有缩减决策,除了可选的 1→0 转换(见下文)。例如,如果您的应用程序初始化速度较慢,您可以增加 downscale_delay_s 以使缩减不那么频繁,并避免在应用程序需要再次扩展时产生重新初始化的成本。

  • downscale_to_zero_delay_s [可选]:这定义了 Serve 在从一个副本缩减到零之前等待的时间(仅在 min_replicas = 0 时适用)。如果未指定,1→0 转换将使用 downscale_delay_s 的值。当您希望进行更保守的扩展到零行为时,这很有用。例如,您可以将 downscale_delay_s = 300 设置为常规缩减,但将 downscale_to_zero_delay_s = 1800 设置为等待 30 分钟后再扩展到零,从而避免在短暂不活动期间产生冷启动。

  • upscale_smoothing_factor [默认值=1.0] (已弃用):此参数已重命名为 upscaling_factorupscale_smoothing_factor 将在未来版本中删除。

  • downscale_smoothing_factor [默认值=1.0] (已弃用):此参数已重命名为 downscaling_factordownscale_smoothing_factor 将在未来版本中删除。

  • upscaling_factor [默认值=1.0]:用于放大或缓和每次扩展决策的乘数。例如,当应用程序在短时间内流量量很高时,您可以增加 upscaling_factor 以快速扩展资源。此参数就像一个“增益”因子,用于放大自动扩缩容算法的响应。

  • downscaling_factor [默认值=1.0]:用于放大或缓和每次缩减决策的乘数。例如,如果您希望您的应用程序对流量下降不那么敏感,并更保守地缩减,您可以减小 downscaling_factor 以减缓缩减的步伐。

  • metrics_interval_s [默认值=10]:未来此部署级别的配置将被删除,取而代之的是跨应用程序级别的全局配置。

这控制每个副本和句柄多久向自动扩缩容器发送一次有关当前正在处理的请求的报告。请注意,如果自动扩缩容器未收到更新的指标,则无法做出新的决策,因此您最有可能希望将这些值设置为小于或等于扩展和缩减延迟值。例如,如果您设置 upscale_delay_s = 3,但将推送间隔保持为 10 秒,则自动扩缩容器大约每 10 秒才扩展一次。

  • look_back_period_s [默认值=30]:这是计算每个副本的平均正在处理的请求数的窗口。

  • aggregation_function [默认值=“mean”]:这控制了如何在 look_back_period_s 时间窗口内聚合指标。聚合函数决定了 Ray Serve 如何将多个测量值合并为一个值以用于自动扩缩容决策。支持的值

    • "mean"(默认):使用指标的时间加权平均值。这提供了平滑的扩展行为,能够响应持续的流量模式。

    • "max":使用观察到的最大指标值。这使得自动扩缩容对峰值更敏感,在任何副本出现高负载时都能快速扩展。

    • "min":使用观察到的最小指标值。这导致更保守的扩展行为。

对于大多数工作负载,默认的 "mean" 聚合提供了最佳平衡。如果您需要快速响应流量峰值,请使用 "max",如果您更喜欢避免快速波动的保守扩展,请使用 "min"

自动扩缩容指标的工作原理#

理解指标如何在自动扩缩容系统中流动有助于您有效地配置参数。指标管道涉及多个阶段,每个阶段都有其自己的计时参数

┌──────────────────────────────────────────────────────────────────────────┐
│  Metrics Pipeline Overview                                               │
├──────────────────────────────────────────────────────────────────────────┤
│                                                                          │
│  Replicas/Handles         Controller             Autoscaling Policy      │
│  ┌──────────┐             ┌──────────┐           ┌──────────┐            │
│  │ Record   │   Push      │ Receive  │  Decide   │ Policy   │            │
│  │ Metrics  │────────────>│ Metrics  │──────────>│ Runs     │            │
│  │ (10s)    │   (10s)     │          │  (0.1s)   │          │            │
│  └──────────┘             │ Aggregate│           └──────────┘            │
│                           │ (30s)    │                                   │
│                           └──────────┘                                   │
│                                                                          │
└──────────────────────────────────────────────────────────────────────────┘

阶段 1:指标记录#

副本和部署句柄会持续记录自动扩缩容指标

  • 内容:正在处理的请求数量(排队中 + 运行中)

  • 频率:每 10 秒(可通过 metrics_interval_s 配置)

  • 存储:指标作为时间序列本地存储

阶段 2:指标推送#

副本和句柄会定期将它们的指标推送到控制器

  • 频率:每 10 秒(可通过 RAY_SERVE_REPLICA_AUTOSCALING_METRIC_PUSH_INTERVAL_SRAY_SERVE_HANDLE_AUTOSCALING_METRIC_PUSH_INTERVAL_S 配置)

  • 发送数据:原始时间序列数据和预聚合指标

    • 原始时间序列:在发送前,数据点会被裁剪到 look_back_period_s 窗口(只发送窗口内的最近测量值)

    • 预聚合指标:在副本/句柄处,在 look_back_period_s 窗口上计算的简单平均值

  • 控制器使用:控制器根据 RAY_SERVE_AGGREGATE_METRICS_AT_CONTROLLER 设置决定使用哪些数据(请参阅下面的阶段 3)

阶段 3:指标聚合#

控制器聚合指标以计算所有副本的总正在处理的请求。Ray Serve 支持两种聚合模式(由 RAY_SERVE_AGGREGATE_METRICS_AT_CONTROLLER 控制)

简单模式(默认 - RAY_SERVE_AGGREGATE_METRICS_AT_CONTROLLER=0

  • 输入:来自副本/句柄的预聚合简单平均值(已裁剪到 look_back_period_s

  • 方法:将所有来源的预聚合值相加。每个组件在发送前计算简单平均值(算术平均值)。

  • 输出:表示总正在处理的请求的单个值

  • 特点:轻量级,适用于大多数工作负载。但是,由于它使用简单平均值而不是时间加权平均值,因此当副本具有不同的指标报告间隔或指标在不同时间到达时,其精度可能较低。

聚合模式(实验性 - RAY_SERVE_AGGREGATE_METRICS_AT_CONTROLLER=1

  • 输入:来自副本/句柄的原始时间序列数据(已裁剪到 look_back_period_s

  • 方法:使用 aggregation_function(mean、max 或 min)进行时间加权聚合。它使用瞬时合并方法,将指标视为右连续阶梯函数。

  • 输出:表示总正在处理的请求的单个值

  • 特点:提供更数学上准确的聚合,特别是当副本以不同间隔报告指标或您需要精确的时间加权平均值时。代价是增加了控制器开销。

注意

聚合模式下仅适用 aggregation_function 参数。在简单模式下,聚合始终是预计算的简单平均值的总和。

注意

长期计划是弃用简单模式,转而使用聚合模式。聚合模式提供更准确的指标聚合,并将在未来版本中成为默认设置。考虑在您的部署中测试聚合模式(RAY_SERVE_AGGREGATE_METRICS_AT_CONTROLLER=1),以准备此过渡。

阶段 4:策略执行#

自动扩缩容策略会频繁运行以做出扩展决策,有关实现自定义扩展逻辑的详细信息,请参阅自定义部署策略

  • 频率:每 0.1 秒(可通过 RAY_SERVE_CONTROL_LOOP_INTERVAL_S 配置)

  • 输入AutoscalingContext

  • 输出(target_replicas, updated_policy_state) 元组

计时参数交互#

计时参数以重要的方式相互作用

记录间隔与推送间隔

  • 推送间隔 ≥ 记录间隔

  • 记录间隔(10 秒)决定了数据的粒度

  • 推送间隔(10 秒)决定了控制器数据的时效性

  • 默认值:每次推送包含 1 个数据点(10 秒 ÷ 10 秒)

推送间隔与回看期

  • look_back_period_s(30 秒)应 ≥ 推送间隔(10 秒)

  • 如果回看期太短,您将没有足够的数据来做出稳定的决策

  • 如果回看期太长,自动扩缩容将变得不那么响应

推送间隔与控制循环

  • 控制循环(0.1 秒)的运行速度远快于指标的到达速度(10 秒)

  • 大多数控制循环迭代都会重用现有指标

  • 只有当新鲜指标到达时,新的扩展决策才会发生

推送间隔与扩展/缩减延迟

  • 延迟(30 秒/600 秒)应 ≥ 推送间隔(10 秒)

  • 通常,延迟应设置为推送间隔的倍数,这样自动扩缩容器只会在连续多次违反指标后做出反应——这可以过滤掉短暂的峰值,并防止出现嘈杂、震荡的扩展。

  • 示例:upscale_delay_s = 5 但推送间隔 = 10 秒表示实际延迟 ≈ 10 秒

建议: 除非有特殊需求,否则请保持默认值。如果您需要更快的自动扩缩容,请先减小推送间隔,然后调整延迟。

环境变量#

几个环境变量会在较低级别控制自动扩缩容行为。这些变量会影响指标收集和控制循环计时

控制循环和超时设置#

  • RAY_SERVE_CONTROL_LOOP_INTERVAL_S(默认值:0.1 秒):Ray Serve 控制器运行自动扩缩容控制循环的频率。您的自动扩缩容策略函数以此频率执行。默认值 0.1 秒意味着策略大约每秒运行 10 次。

  • RAY_SERVE_RECORD_AUTOSCALING_STATS_TIMEOUT_S(默认值:10.0 秒):在自定义指标收集期间,record_autoscaling_stats() 方法完成允许的最大时间。如果超过此超时时间,则指标收集失败并记录警告。

  • RAY_SERVE_MIN_HANDLE_METRICS_TIMEOUT_S(默认值:10.0 秒):句柄指标收集的最小超时时间。系统使用此值和 2 * metrics_interval_s 中的较大者来确定何时丢弃过时的句柄指标。

高级功能标志#

  • RAY_SERVE_AGGREGATE_METRICS_AT_CONTROLLER(默认值:false):启用实验性指标聚合模式,在该模式下,控制器聚合原始时间序列数据,而不是使用预聚合指标。此模式提供更准确的时间加权平均值,但可能会增加控制器开销。有关详细信息,请参阅“自动扩缩容指标的工作原理”中的阶段 3。

模型组合示例#

确定多模型应用程序的自动扩缩容配置需要了解每个部署的扩展需求。每个部署都有不同的延迟和不同的并发级别。因此,为模型组合应用程序找到正确的自动扩缩容配置需要进行实验。

此示例是一个简单的应用程序,包含三个组合在一起的部署,以建立对多模型自动扩缩容的直观认识。假设这些部署

  • HeavyLoad:一个模拟的 200 毫秒工作负载,CPU 使用率高。

  • LightLoad:一个模拟的 100 毫秒工作负载,CPU 使用率高。

  • Driver:一个驱动部署,它分发到 HeavyLoadLightLoad 部署,并聚合两个输出。

尝试 1:一个 Driver 副本#

首先考虑以下部署配置。由于驱动部署的 CPU 使用率较低,并且它仅异步调用下游部署,因此分配一个固定的 Driver 副本是合理的。

- name: Driver
  num_replicas: 1
  max_ongoing_requests: 200
- name: HeavyLoad
  max_ongoing_requests: 3
  autoscaling_config:
    target_ongoing_requests: 1
    min_replicas: 0
    initial_replicas: 0
    max_replicas: 200
    upscale_delay_s: 3
    downscale_delay_s: 60
    upscaling_factor: 0.3
    downscaling_factor: 0.3
    metrics_interval_s: 2
    look_back_period_s: 10
- name: LightLoad
  max_ongoing_requests: 3
  autoscaling_config:
    target_ongoing_requests: 1
    min_replicas: 0
    initial_replicas: 0
    max_replicas: 200
    upscale_delay_s: 3
    downscale_delay_s: 60
    upscaling_factor: 0.3
    downscaling_factor: 0.3
    metrics_interval_s: 2
    look_back_period_s: 10
import time

from ray import serve
from ray.serve.handle import DeploymentHandle


@serve.deployment
class LightLoad:
    async def __call__(self) -> str:
        start = time.time()
        while time.time() - start < 0.1:
            pass

        return "light"


@serve.deployment
class HeavyLoad:
    async def __call__(self) -> str:
        start = time.time()
        while time.time() - start < 0.2:
            pass

        return "heavy"


@serve.deployment
class Driver:
    def __init__(self, a_handle, b_handle):
        self.a_handle: DeploymentHandle = a_handle
        self.b_handle: DeploymentHandle = b_handle

    async def __call__(self) -> str:
        a_future = self.a_handle.remote()
        b_future = self.b_handle.remote()

        return (await a_future), (await b_future)


app = Driver.bind(HeavyLoad.bind(), LightLoad.bind())

运行与Resnet 工作负载相同的 Locust 负载测试会产生以下结果

HeavyLoad 和 LightLoad 副本数

comp

正如您所料,自动扩展的 LightLoad 副本数量大约是自动扩展的 HeavyLoad 副本数量的一半。尽管两个部署每秒收到的请求数量相同,但 LightLoad 副本每秒可以处理的请求数是 HeavyLoad 副本的两倍,因此该部署处理相同流量负载所需的副本数量应该是前者的一半。

不幸的是,当 Locust 用户数增加到 100 时,服务延迟从 230 毫秒上升到 400 毫秒。

P50 延迟

QPS

comp_latency

comp_rps

请注意,为了充分处理 Locust 流量,HeavyLoad 副本的数量应大致等于 Locust 用户数。然而,当 Locust 用户数增加到 100 时,HeavyLoad 部署难以达到 100 个副本,而是只达到了 65 个副本。每个部署的延迟揭示了根本原因。虽然 HeavyLoadLightLoad 的延迟分别保持在 200 毫秒和 100 毫秒,但 Driver 的延迟从 230 毫秒上升到 400 毫秒。这表明高 Locust 工作负载可能正在压垮 Driver 副本并影响其异步事件循环的性能。

尝试 2:自动扩展 Driver#

在此尝试中,也为 Driver 设置了自动扩缩容配置,并将 target_ongoing_requests = 20。现在部署配置如下

- name: Driver
  max_ongoing_requests: 200
  autoscaling_config:
    target_ongoing_requests: 20
    min_replicas: 1
    initial_replicas: 1
    max_replicas: 10
    upscale_delay_s: 3
    downscale_delay_s: 60
    upscaling_factor: 0.3
    downscaling_factor: 0.3
    metrics_interval_s: 2
    look_back_period_s: 10
- name: HeavyLoad
  max_ongoing_requests: 3
  autoscaling_config:
    target_ongoing_requests: 1
    min_replicas: 0
    initial_replicas: 0
    max_replicas: 200
    upscale_delay_s: 3
    downscale_delay_s: 60
    upscaling_factor: 0.3
    downscaling_factor: 0.3
    metrics_interval_s: 2
    look_back_period_s: 10
- name: LightLoad
  max_ongoing_requests: 3
  autoscaling_config:
    target_ongoing_requests: 1
    min_replicas: 0
    initial_replicas: 0
    max_replicas: 200
    upscale_delay_s: 3
    downscale_delay_s: 60
    upscaling_factor: 0.3
    downscaling_factor: 0.3
    metrics_interval_s: 2
    look_back_period_s: 10

再次运行相同的 Locust 负载测试会产生以下结果

HeavyLoad 和 LightLoad 副本数

heavy

Driver 副本数

driver

有了多达 6 个 Driver 部署来接收和分发传入的请求,HeavyLoad 部署成功扩展到 90 多个副本,LightLoad 扩展到 47 个副本。此配置有助于在流量负载增加时保持应用程序延迟的一致性。

改进的 P50 延迟

改进的 RPS

comp_latency

comp_latency

故障排除指南#

自动扩展副本数量不稳定#

如果您的部署中的副本数量不断波动,即使流量相对稳定,请尝试以下方法

  • 设置更小的 upscaling_factordownscaling_factor。将这两个值都小于 1 有助于自动扩缩容器做出更保守的扩展和缩减决策。它有效地平滑了副本图表,减少了“尖锐的边缘”。

  • 设置一个与其余自动扩缩容配置匹配的 look_back_period_s 值。对于更长的扩展和缩减延迟值,更长的回看期可能有助于稳定副本图表,但对于更短的扩展和缩减延迟值,更短的回看期可能更合适。例如,以下副本图表显示了一个具有 upscale_delay_s = 3 的部署如何使用更长与更短的回看期。

look_back_period_s = 30

look_back_period_s = 3

look-back-before

look-back-after

流量高峰期间延迟出现高尖峰#

如果您预计您的应用程序会收到突发流量,并且同时希望部署在不活动期间缩减,那么您可能关心部署能够多快地扩展并响应突发流量。虽然突发流量开始时延迟的增加可能是不可避免的,但您可以尝试以下方法来改善突发流量期间的延迟。

  • 设置更低的 upscale_delay_s。自动扩缩容器始终等待 upscale_delay_s 秒后再做出扩展决策,因此降低此延迟允许自动扩缩容器更快地响应变化,特别是突发流量。

  • 设置更大的 upscaling_factor。如果 upscaling_factor > 1,则自动扩缩容器比正常情况更积极地扩展。此设置可以使您的部署对流量峰值更加敏感。

  • 降低 metrics_interval_s。始终将 metrics_interval_s 设置为小于或等于 upscale_delay_s,否则扩展会延迟,因为自动扩缩容器不够频繁地接收到最新的信息。

  • 设置更低的 max_ongoing_requests。如果 max_ongoing_requests 相对于 target_ongoing_requests 过高,那么当流量增加时,Serve 可能会在启动新副本之前将大部分或所有请求分配给现有副本。此设置可能导致扩展期间的延迟非常高。

部署缩减过快#

您可能会发现部署缩减过快。相反,您可能希望缩减更加保守,以最大化服务的可用性。

  • 设置更长的 downscale_delay_s。自动扩缩容器在做出缩减决策之前始终等待 downscale_delay_s 秒,因此通过增加此数字,您的系统在流量下降后有更长的“宽限期”,然后自动扩缩容器才开始移除副本。

  • 设置更小的 downscaling_factor。如果 downscaling_factor < 1,则自动扩缩容器移除的副本数量比它认为应移除以达到目标正在处理的请求数量要少。换句话说,自动扩缩容器做出更保守的缩减决策。

downscaling_factor = 1

downscaling_factor = 0.5

downscale-smooth-before

downscale-smooth-after

自定义自动扩缩容策略#

警告

自定义自动扩缩容策略是实验性的,并且可能在未来的版本中发生更改。

Ray Serve 的内置、由请求驱动的自动扩缩容适用于大多数应用程序。当您需要更多控制时,请使用*自定义自动扩缩容策略*——例如,基于外部指标(CloudWatch、Prometheus)进行扩展,预测可预测的流量(计划的批处理作业),或应用超出队列阈值的业务逻辑。

自定义策略允许您基于选择的任何指标或规则来实现扩展逻辑。

自定义部署策略#

自定义自动伸缩策略是用户提供的 Python 函数,它接收一个 AutoscalingContext 对象,并返回一个元组 (target_replicas, policy_state),用于单个部署。

  • 当前状态: 当前副本数量和部署元数据。

  • 内置指标: 总请求数、排队请求数、每个副本的计数。

  • 自定义指标: 您的部署通过 record_autoscaling_stats() 报告的值。(见下文。)

  • 容量边界: min / max 副本限制,已根据当前集群容量进行调整。

  • 策略状态: 一个 dict,您可以使用它来在控制循环迭代之间持久化任意状态。

  • 时间: 上一次缩放操作和“现在”的时间戳。

以下示例展示了一个在工作时间和晚上批量处理期间扩展,在非高峰时段缩小的策略

from datetime import datetime
from typing import Any, Dict
from ray.serve.config import AutoscalingContext


def scheduled_batch_processing_policy(
    ctx: AutoscalingContext,
) -> tuple[int, Dict[str, Any]]:
    current_time = datetime.now()
    current_hour = current_time.hour
    # Scale up during business hours (9 AM - 5 PM)
    if 9 <= current_hour < 17:
        return 2, {"reason": "Business hours"}
    # Scale up for evening batch processing (6 PM - 8 PM)
    elif 18 <= current_hour < 20:
        return 4, {"reason": "Evening batch processing"}
    # Minimal scaling during off-peak hours
    else:
        return 1, {"reason": "Off-peak hours"}


import asyncio

from ray import serve
from ray.serve.config import AutoscalingConfig, AutoscalingPolicy


@serve.deployment(
    autoscaling_config=AutoscalingConfig(
        min_replicas=1,
        max_replicas=12,
        policy=AutoscalingPolicy(
            policy_function="autoscaling_policy:scheduled_batch_processing_policy"
        ),
        metrics_interval_s=0.1,
    ),
    max_ongoing_requests=3,
)
class BatchProcessingDeployment:
    async def __call__(self) -> str:
        # Simulate batch processing work
        await asyncio.sleep(0.5)
        return "Hello, world!"


app = BatchProcessingDeployment.bind()

策略是 **按部署** 定义的。如果您不提供策略,Ray Serve 将回退到其内置的基于请求的策略。

Ray Serve 控制器每 RAY_SERVE_CONTROL_LOOP_INTERVAL_S 秒(默认 **0.1 秒**)调用一次策略函数,因此您的逻辑将针对近乎实时状态运行。

警告

保持策略函数 **快速且轻量**。缓慢的逻辑会阻塞 Serve 控制器并降低集群响应能力。

自定义指标#

您可以通过从部署中发出自己的指标来做出更丰富的决策。实现 record_autoscaling_stats() 以返回一个 dict[str, float]。Ray Serve 将在 AutoscalingContext 中显示这些值。

此示例演示了部署如何提供自己的指标(CPU 使用率、内存使用率),以及自动伸缩策略如何使用这些指标来做出伸缩决策

from typing import Any, Dict
from ray.serve.config import AutoscalingContext


def custom_metrics_autoscaling_policy(
    ctx: AutoscalingContext,
) -> tuple[int, Dict[str, Any]]:
    cpu_usage_metric = ctx.aggregated_metrics.get("cpu_usage", {})
    memory_usage_metric = ctx.aggregated_metrics.get("memory_usage", {})
    max_cpu_usage = list(cpu_usage_metric.values())[-1] if cpu_usage_metric else 0
    max_memory_usage = (
        list(memory_usage_metric.values())[-1] if memory_usage_metric else 0
    )

    if max_cpu_usage > 80 or max_memory_usage > 85:
        return min(ctx.capacity_adjusted_max_replicas, ctx.current_num_replicas + 1), {}
    elif max_cpu_usage < 30 and max_memory_usage < 40:
        return max(ctx.capacity_adjusted_min_replicas, ctx.current_num_replicas - 1), {}
    else:
        return ctx.current_num_replicas, {}


import time
from typing import Dict

from ray import serve


@serve.deployment(
    autoscaling_config={
        "min_replicas": 1,
        "max_replicas": 5,
        "metrics_interval_s": 0.1,
        "policy": {
            "policy_function": "autoscaling_policy:custom_metrics_autoscaling_policy"
        },
    },
    max_ongoing_requests=5,
)
class CustomMetricsDeployment:
    def __init__(self):
        self.cpu_usage = 50.0
        self.memory_usage = 60.0

    def __call__(self) -> str:
        time.sleep(0.5)
        self.cpu_usage = min(100, self.cpu_usage + 5)
        self.memory_usage = min(100, self.memory_usage + 3)
        return "Hello, world!"

    def record_autoscaling_stats(self) -> Dict[str, float]:
        self.cpu_usage = max(20, self.cpu_usage - 2)
        self.memory_usage = max(30, self.memory_usage - 1)
        return {
            "cpu_usage": self.cpu_usage,
            "memory_usage": self.memory_usage,
        }


# Create the app
app = CustomMetricsDeployment.bind()

注意

record_autoscaling_stats() 方法可以是同步的,也可以是异步的。它必须在 RAY_SERVE_RECORD_AUTOSCALING_STATS_TIMEOUT_S(默认为 10 秒)指定的超时时间内完成。

在您的策略中,通过以下方式访问自定义指标:

  • ctx.raw_metrics[metric_name] — 一个映射副本 ID 到原始指标值列表的字典。每个副本存储的数据点数量取决于 look_back_period_s(滑动窗口大小)和 metrics_interval_s(指标记录间隔)。

  • ctx.aggregated_metrics[metric_name] — 从每个副本的原始指标值计算出的时间加权平均值。

应用级自动伸缩#

默认情况下,Ray Serve 中的每个部署都会独立进行自动伸缩。当您有多个需要协调伸缩的部署时(例如,共享后端资源的部署、相互依赖的部署或需要负载感知路由的部署),您可以定义一个 **应用级自动伸缩策略**。此策略会同时为应用程序内的所有部署做出伸缩决策。

定义应用级策略#

应用级自动伸缩策略是一个函数,它接收一个 Dict[DeploymentID, AutoscalingContext] 对象(每个部署一个),并返回一个元组 (decisions, policy_state)。每个上下文包含一个部署的指标和边界,策略返回所有部署的目标副本数。

从应用级策略返回的 policy_state 必须是 Dict[DeploymentID, Dict] — 一个字典,将每个部署 ID 映射到其自身的字典状态。Serve 会存储此按部署的状态,并在下一个控制循环迭代中,将每个部署的状态注入到该部署的 AutoscalingContext.policy_state 中。

Serve 本身不解释 policy_state 的内容。每个部署状态字典中的所有键都由用户控制。以下示例显示了一个根据其相对负载来伸缩部署的策略,确保下游部署具有足够的容量来处理上游流量。

from typing import Dict, Tuple
from ray.serve.config import AutoscalingContext

from ray.serve._private.common import DeploymentID
from ray.serve.config import AutoscalingContext


def coordinated_scaling_policy(
    contexts: Dict[DeploymentID, AutoscalingContext]
) -> Tuple[Dict[DeploymentID, int], Dict]:
    """Scale deployments based on coordinated load balancing."""
    decisions = {}

    # Example: Scale a preprocessing deployment
    preprocessing_id = [d for d in contexts if d.name == "Preprocessor"][0]
    preprocessing_ctx = contexts[preprocessing_id]

    # Scale based on queue depth
    preprocessing_replicas = max(
        preprocessing_ctx.capacity_adjusted_min_replicas,
        min(
            preprocessing_ctx.capacity_adjusted_max_replicas,
            preprocessing_ctx.total_num_requests // 10,
        ),
    )
    decisions[preprocessing_id] = preprocessing_replicas

    # Example: Scale a model deployment proportionally
    model_id = [d for d in contexts if d.name == "Model"][0]
    model_ctx = contexts[model_id]

    # Scale model to handle preprocessing output
    # Assuming model takes 2x longer than preprocessing
    model_replicas = max(
        model_ctx.capacity_adjusted_min_replicas,
        min(model_ctx.capacity_adjusted_max_replicas, preprocessing_replicas * 2),
    )
    decisions[model_id] = model_replicas

    return decisions, {}

以下示例显示了一个在控制循环迭代之间持久化状态的状态化应用级策略。

from typing import Dict, Tuple, Any
from ray.serve.config import AutoscalingContext
from ray.serve._private.common import DeploymentID

def stateful_application_level_policy(
    contexts: Dict[DeploymentID, AutoscalingContext]
) -> Tuple[Dict[DeploymentID, int], Dict[DeploymentID, Dict[str, Any]]]:
    """Example policy demonstrating per-deployment state persistence."""
    decisions = {}
    policy_state = {}

    for deployment_id, ctx in contexts.items():
        # Read previous state for this deployment (persisted from last iteration)
        prev_state = ctx.policy_state or {}
        scale_count = prev_state.get("scale_count", 0)
        last_replicas = prev_state.get("last_replicas", ctx.current_num_replicas)

        # Simple scaling logic: scale based on queue depth
        desired_replicas = max(
            ctx.capacity_adjusted_min_replicas,
            min(
                ctx.capacity_adjusted_max_replicas,
                ctx.total_num_requests // 10,
            ),
        )
        decisions[deployment_id] = desired_replicas

        # Store per-deployment state that persists across iterations
        policy_state[deployment_id] = {
            "scale_count": scale_count + 1,
            "last_replicas": desired_replicas,
        }

    return decisions, policy_state


配置应用级自动伸缩#

要使用应用级策略,您需要定义您的部署

import time
from ray import serve


@serve.deployment
class Preprocessor:
    def __call__(self, input_data: str) -> str:
        # Simulate preprocessing work
        time.sleep(0.05)
        return f"preprocessed_{input_data}"


@serve.deployment
class Model:
    def __call__(self, preprocessed_data: str) -> str:
        # Simulate model inference (takes longer than preprocessing)
        time.sleep(0.1)
        return f"result_{preprocessed_data}"


@serve.deployment
class Driver:
    def __init__(self, preprocessor, model):
        self._preprocessor = preprocessor
        self._model = model

    async def __call__(self, input_data: str) -> str:
        # Coordinate preprocessing and model inference
        preprocessed = await self._preprocessor.remote(input_data)
        result = await self._model.remote(preprocessed)
        return result


app = Driver.bind(Preprocessor.bind(), Model.bind())

然后,在您的应用配置中指定应用级策略

applications:
  - name: MyApp
    import_path: application_level_autoscaling:app
    autoscaling_policy:
      policy_function: autoscaling_policy:coordinated_scaling_policy
    deployments:
      - name: Preprocessor
        autoscaling_config:
          min_replicas: 1
          max_replicas: 10
      - name: Model
        autoscaling_config:
          min_replicas: 2
          max_replicas: 20

注意

通过 serve.run() 以编程方式配置应用级自动伸缩策略将在未来版本中支持。

注意

当您同时指定部署级策略和应用级策略时,应用级策略具有优先权。如果您配置了两项,Ray Serve 会记录一个警告。

警告

注意事项和限制

当您提供自定义策略时,只要它是一个简单的、自包含的 Python 代码,并且仅依赖于标准库,Ray Serve 就可以完全支持它。一旦策略变得更复杂,例如依赖于其他自定义模块或包,您就需要将这些模块打包到 Docker 镜像或环境中。这是因为 Ray Serve 使用 cloudpickle 来序列化自定义策略,并且它不捆绑传递性依赖项 — 如果您的策略继承自另一个模块中的超类或导入了自定义包,这些必须存在于目标环境中。此外,环境一致性很重要:Python 版本、cloudpickle 版本或库版本之间的差异会影响反序列化。

复杂策略的替代方案

当您的自定义自动伸缩策略具有复杂的依赖项,或者您希望更好地控制版本和部署时,您有几种选择:

  • 贡献给 Ray Serve:如果您的策略是通用的,并且可能使其他人受益,请考虑将其作为内置策略贡献给 Ray Serve,方法是在 Ray GitHub 仓库 上打开一个功能请求或拉取请求。推荐的实现位置是 python/ray/serve/autoscaling_policy.py

  • 确保环境中的依赖项:确保外部依赖项已安装在您的 Docker 镜像或环境中。

外部伸缩 API#

警告

This API is in alpha and may change before becoming stable.

外部伸缩 API 提供对 Ray Serve 应用程序中任何部署副本数量的编程控制。与基于队列深度和进行中请求的 Ray Serve 内置自动伸缩不同,此 API 允许您根据定义的任何外部标准进行伸缩。

示例:预测性伸缩#

此示例展示了如何实现基于历史模式或预测的预测性伸缩。通过运行调整副本数量的外部脚本(基于一天中的时间),您可以在预期流量高峰之前提前扩展。

定义部署#

以下示例创建一个简单的文本处理部署,您可以对其进行外部伸缩。将此代码保存到名为 external_scaler_predictive.py 的文件中

import time
from ray import serve
from typing import Any

@serve.deployment(num_replicas=3)
class TextProcessor:
    """A simple text processing deployment that can be scaled externally."""
    def __init__(self):
        self.request_count = 0

    def __call__(self, text: Any) -> dict:
        # Simulate text processing work
        time.sleep(0.1)
        self.request_count += 1
        return {
            "request_count": self.request_count,
        }


app = TextProcessor.bind()
配置外部伸缩#

在使用外部伸缩 API 之前,请在您的应用程序配置中启用它,方法是将 external_scaler_enabled: true。将此配置保存到名为 external_scaler_config.yaml 的文件中

applications:
  - name: my-app
    import_path: external_scaler_predictive:app
    external_scaler_enabled: true
    deployments:
      - name: TextProcessor
        num_replicas: 1

警告

外部伸缩和内置自动伸缩是互斥的。您不能为同一个应用程序同时使用它们。如果您设置了 external_scaler_enabled: true,则 **不得** 在该应用程序的任何部署上配置 autoscaling_config。尝试同时使用两者会导致错误。

实现伸缩逻辑#

以下脚本实现了基于一天时间和历史流量模式的预测性伸缩。将此脚本保存到名为 external_scaler_predictive_client.py 的文件中

import logging
import time
from datetime import datetime
import requests

APPLICATION_NAME = "my-app"
DEPLOYMENT_NAME = "TextProcessor"
SERVE_ENDPOINT = "https://:8265"
SCALING_INTERVAL = 300  # Check every 5 minutes

logger = logging.getLogger(__name__)


def get_current_replicas(app_name: str, deployment_name: str) -> int:
    """Get current replica count. Returns -1 on error."""
    try:
        resp = requests.get(
            f"{SERVE_ENDPOINT}/api/serve/applications/",
            timeout=10
        )
        if resp.status_code != 200:
            logger.error(f"Failed to get applications: {resp.status_code}")
            return -1
            
        apps = resp.json().get("applications", {})
        if app_name not in apps:
            logger.error(f"Application {app_name} not found")
            return -1

        deployments = apps[app_name].get("deployments", {})
        if deployment_name in deployments:
            return deployments[deployment_name]["target_num_replicas"]
                
        logger.error(f"Deployment {deployment_name} not found")
        return -1
    except requests.exceptions.RequestException as e:
        logger.error(f"Request failed: {e}")
        return -1


def scale_deployment(app_name: str, deployment_name: str):
    """Scale deployment based on time of day."""
    hour = datetime.now().hour
    current = get_current_replicas(app_name, deployment_name)
    
    # Check if we successfully retrieved the current replica count
    if current == -1:
        logger.error("Failed to get current replicas, skipping scaling decision")
        return
    
    target = 10 if 9 <= hour < 17 else 3  # Peak hours: 9am-5pm
    
    delta = target - current
    if delta == 0:
        logger.info(f"Already at target ({current} replicas)")
        return
    
    action = "Adding" if delta > 0 else "Removing"
    logger.info(f"{action} {abs(delta)} replicas ({current} -> {target})")
    
    try:
        resp = requests.post(
            f"{SERVE_ENDPOINT}/api/v1/applications/{app_name}/deployments/{deployment_name}/scale",
            headers={"Content-Type": "application/json"},
            json={"target_num_replicas": target},
            timeout=10
        )
        if resp.status_code == 200:
            logger.info("Successfully scaled deployment")
        else:
            logger.error(f"Scale failed: {resp.status_code} - {resp.text}")
    except requests.exceptions.RequestException as e:
        logger.error(f"Request failed: {e}")


def main():
    logger.info(f"Starting predictive scaling for {APPLICATION_NAME}/{DEPLOYMENT_NAME}")
    while True:
        scale_deployment(APPLICATION_NAME, DEPLOYMENT_NAME)
        time.sleep(SCALING_INTERVAL)

该脚本使用外部伸缩 API 端点来伸缩部署

  • API 端点POST https://:8265/api/v1/applications/{application_name}/deployments/{deployment_name}/scale

  • 请求体{"target_num_replicas": <number>}(必须符合 ScaleDeploymentRequest 模式)

伸缩客户端根据一天中的时间持续调整副本数量

  • 工作时间(上午 9 点 - 下午 5 点):10 个副本

  • 非高峰时段:3 个副本

运行示例#

按照以下步骤运行完整示例

  1. 使用配置启动 Ray Serve 应用程序

serve run external_scaler_config.yaml
  1. 在单独的终端中运行预测性伸缩客户端

python external_scaler_predictive_client.py

客户端根据一天中的时间自动调整副本数量。您可以在 Ray Dashboard 中或通过查看应用程序日志来监控伸缩行为。

重要注意事项#

理解外部伸缩器如何与您的部署交互,有助于您构建可靠的伸缩逻辑

  • 幂等 API 调用:伸缩 API 是幂等的。您可以安全地多次使用相同的 target_num_replicas 值调用它,而不会产生副作用。这使得您的伸缩逻辑可以安全地按计划运行或响应重复的指标更新。

  • 与 serve deploy 的交互:当您使用 serve deploy 升级您的服务时,您通过外部伸缩器 API 设置的副本数量会保持不变。此行为与您期望的 Ray Serve 内置自动伸缩器的行为一致 — 部署更新不会重置副本数量。

  • 查询当前副本数量:您可以通过查询 GET /applications API 来获取任何部署的当前副本数量。

    curl -X GET https://:8265/api/serve/applications/ \
    

    响应遵循 ServeInstanceDetails 模式,该模式包含一个 applications 字段,其中包含以应用程序名称为键的字典。每个应用程序都包含有关其所有部署的详细信息,包括当前的副本数量。利用这些信息做出明智的伸缩决策。例如,您可以逐渐扩展,通过添加现有副本的百分比而不是跳转到固定数量。

  • 初始副本数量:当您首次部署应用程序时,Ray Serve 会创建在部署配置的 num_replicas 字段中指定的副本数量。然后,外部伸缩器可以根据您的伸缩逻辑动态调整此数量。