使用自定义算法进行请求路由#

警告

This API is in alpha and may change before becoming stable.

不同的 Ray Serve 应用程序需要不同的负载均衡逻辑。例如,在服务 LLM 时,您可能希望采用与负载均衡请求数量到副本不同的策略:例如,负载均衡正在进行的输入 token、负载均衡 kv-cache 利用率等。RequestRouter 是 Ray Serve 中的一个抽象,允许为每个部署扩展和自定义负载均衡逻辑。

本指南将介绍如何使用 RequestRouter API 实现给定部署副本之间的自定义负载均衡。它将涵盖以下内容

  • 定义一个简单的统一请求路由器进行负载均衡

  • 部署具有统一请求路由器的应用程序

  • 请求路由的实用混合类

  • 定义一个复杂的吞吐量感知请求路由器

  • 部署具有吞吐量感知请求路由器的应用程序

定义简单的统一请求路由器#

创建一个名为 custom_request_router.py 的文件,其中包含以下代码

import random
from ray.serve.request_router import (
    PendingRequest,
    RequestRouter,
    ReplicaID,
    ReplicaResult,
    RunningReplica,
)
from typing import (
    List,
    Optional,
)


class UniformRequestRouter(RequestRouter):
    async def choose_replicas(
        self,
        candidate_replicas: List[RunningReplica],
        pending_request: Optional[PendingRequest] = None,
    ) -> List[List[RunningReplica]]:
        print("UniformRequestRouter routing request")
        index = random.randint(0, len(candidate_replicas) - 1)
        return [[candidate_replicas[index]]]

    def on_request_routed(
        self,
        pending_request: PendingRequest,
        replica_id: ReplicaID,
        result: ReplicaResult,
    ):
        print("on_request_routed callback is called!!")


此代码定义了一个简单的统一请求路由器,该路由器将请求路由到一个随机副本,以平均分配负载,而不考虑每个副本的队列长度或请求的正文。RequestRouter。它实现了 choose_replicas 方法,该方法为所有传入请求返回随机副本。RequestRouter。返回类型是副本列表的列表,其中每个内部列表代表一个副本的秩。第一秩最受青睐,最后一秩最不受青睐。请求将按顺序尝试路由到每个秩集中请求队列最短的副本,直到副本能够处理该请求。如果没有任何副本能够处理该请求,则会使用回退延迟再次调用 choose_replicas,直到副本能够处理该请求。

注意

此请求路由器还实现了 on_request_routed,这可以帮助您在请求路由后更新请求路由器的状态。

部署具有统一请求路由器的应用程序#

要使用自定义请求路由器,您需要将 request_router_class 参数传递给 deployment 装饰器。另请注意,request_router_class 可以作为已导入的类或作为类的导入路径字符串传递。让我们部署一个使用统一请求路由器的简单应用程序,如下所示

from ray import serve
from ray.serve.request_router import ReplicaID
import time
from collections import defaultdict
from ray.serve.context import _get_internal_replica_context
from typing import Any, Dict
from ray.serve.config import RequestRouterConfig


@serve.deployment(
    request_router_config=RequestRouterConfig(
        request_router_class="custom_request_router:UniformRequestRouter",
    ),
    num_replicas=10,
    ray_actor_options={"num_cpus": 0},
)
class UniformRequestRouterApp:
    def __init__(self):
        context = _get_internal_replica_context()
        self.replica_id: ReplicaID = context.replica_id

    async def __call__(self):
        return self.replica_id


handle = serve.run(UniformRequestRouterApp.bind())
response = handle.remote().result()
print(f"Response from UniformRequestRouterApp: {response}")
# Example output:
#   Response from UniformRequestRouterApp:
#   Replica(id='67vc4ts5', deployment='UniformRequestRouterApp', app='default')

在请求路由时,控制台将打印“UniformRequestRouter routing request”和“on_request_routed callback is called!!”消息。响应也将被随机路由到一个副本。您可以通过发送更多请求并查看副本的分布大致相等来测试这一点。

注意

目前,配置请求路由器的唯一方法是将其作为参数传递给部署装饰器。这意味着您无法更改具有正在运行的路由器的现有部署句柄的请求路由器。如果您有特定用例需要重新配置部署句柄上的请求路由器,请在 Ray GitHub 存储库 上打开一个功能请求。

请求路由的实用混合类#

Ray Serve 提供了可以用来扩展请求路由器功能的实用混合类。这些混合类可用于实现常见的路由策略,如局部性感知路由、多路复用模型支持和 FIFO 请求路由。

  • FIFOMixin:此混合类实现先进先出 (FIFO) 请求路由。请求路由器的默认行为是 OOO(乱序),它将请求路由到通过 choose_replicas 传递的请求分配到的确切副本。此混合类对于可以独立于请求内容工作的路由算法很有用,因此请求可以按接收顺序尽快路由。通过在自定义请求路由器中包含此混合类,请求匹配算法将更新为 FIFO 路由请求。此混合类不提供任何其他标志或附加帮助方法。

  • LocalityMixin:此混合类实现局部性感知请求路由。它在副本更新之间更新内部状态,以跟踪同一节点、同一区域以及其他所有副本之间的位置。它提供了 apply_locality_routingrank_replicas_via_locality 帮助函数,根据副本与请求的局部性来路由和排名副本,这有助于降低延迟并提高性能。

  • MultiplexMixin:当您使用模型多路复用时,需要根据已知哪个副本已加载模型的“热”版本来路由请求。它在副本更新之间更新内部状态,以跟踪每个副本上加载的模型以及每个副本的模型缓存大小。它提供了 apply_multiplex_routingrank_replicas_via_multiplex 帮助函数,根据请求的多路复用模型 ID 来路由和排名副本。

定义一个复杂的吞吐量感知请求路由器#

一个功能齐全的请求路由器可以更复杂,并且应该考虑到多路复用模型、局部性、每个副本的请求队列长度,并使用吞吐量等自定义统计数据来决定路由请求到哪个副本。下面的类定义了一个吞吐量感知请求路由器,该路由器会考虑到这些因素来路由请求。将以下代码添加到 custom_request_router.py 文件中

from ray.serve.request_router import (
    FIFOMixin,
    LocalityMixin,
    MultiplexMixin,
    PendingRequest,
    RequestRouter,
    ReplicaID,
    ReplicaResult,
    RunningReplica,
)
from typing import (
    Dict,
    List,
    Optional,
)


class ThroughputAwareRequestRouter(
    FIFOMixin, MultiplexMixin, LocalityMixin, RequestRouter
):
    async def choose_replicas(
        self,
        candidate_replicas: List[RunningReplica],
        pending_request: Optional[PendingRequest] = None,
    ) -> List[List[RunningReplica]]:
        """
        This method chooses the best replica for the request based on
        multiplexed, locality, and custom throughput stats. The algorithm
        works as follows:

        1. Populate top_ranked_replicas based on available replicas based on
          multiplex_id
        2. Populate and override top_ranked_replicas info based on locality
         information of replicas (we want to prefer replicas that are in the
          same vicinity to this deployment)
        3. Select the replica with minimum throughput.
        """

        # Dictionary to hold the top-ranked replicas
        top_ranked_replicas: Dict[ReplicaID, RunningReplica] = {}
        # Take the best set of replicas for the multiplexed model
        if (
            pending_request is not None
            and pending_request.metadata.multiplexed_model_id
        ):
            ranked_replicas_multiplex: List[RunningReplica] = (
                self.rank_replicas_via_multiplex(
                    replicas=candidate_replicas,
                    multiplexed_model_id=pending_request.metadata.multiplexed_model_id,
                )
            )[0]

            # Filter out replicas that are not available (queue length exceed max ongoing request)
            ranked_replicas_multiplex = self.select_available_replicas(
                candidates=ranked_replicas_multiplex
            )

            for replica in ranked_replicas_multiplex:
                top_ranked_replicas[replica.replica_id] = replica

        # Take the best set of replicas in terms of locality
        ranked_replicas_locality: List[
            RunningReplica
        ] = self.rank_replicas_via_locality(replicas=candidate_replicas)[0]

        # Filter out replicas that are not available (queue length exceed max ongoing request)
        ranked_replicas_locality = self.select_available_replicas(
            candidates=ranked_replicas_locality
        )

        for replica in ranked_replicas_locality:
            top_ranked_replicas[replica.replica_id] = replica

        print("ThroughputAwareRequestRouter routing request")

        # Take the replica with minimum throughput.
        min_throughput_replicas = min(
            [replica for replica in top_ranked_replicas.values()],
            key=lambda r: r.routing_stats.get("throughput", 0),
        )
        return [[min_throughput_replicas]]


此请求路由器继承自 RequestRouter,以及用于 FIFO 请求路由的 FIFOMixin,用于局部性感知请求路由的 LocalityMixin,以及用于多路复用模型支持的 MultiplexMixin。它实现了 choose_replicas,以从 rank_replicas_via_multiplexrank_replicas_via_locality 获取最高排名的副本,并使用 select_available_replicas 帮助函数来过滤掉已达到其最大请求队列长度的副本。最后,它选择吞吐量最低的副本并返回最佳副本。

部署具有吞吐量感知请求路由器的应用程序#

要使用吞吐量感知请求路由器,您可以如下部署应用程序

def _time_ms() -> int:
    return int(time.time() * 1000)


@serve.deployment(
    request_router_config=RequestRouterConfig(
        request_router_class="custom_request_router:ThroughputAwareRequestRouter",
        request_routing_stats_period_s=1,
        request_routing_stats_timeout_s=1,
    ),
    num_replicas=3,
    ray_actor_options={"num_cpus": 0},
)
class ThroughputAwareRequestRouterApp:
    def __init__(self):
        self.throughput_buckets: Dict[int, int] = defaultdict(int)
        self.last_throughput_buckets = _time_ms()
        context = _get_internal_replica_context()
        self.replica_id: ReplicaID = context.replica_id

    def __call__(self):
        self.update_throughput()
        return self.replica_id

    def update_throughput(self):
        current_timestamp_ms = _time_ms()

        # Under high concurrency, requests can come in at different times. This
        # early return helps to skip if the current_timestamp_ms is more than a
        # second older than the last throughput bucket.
        if current_timestamp_ms < self.last_throughput_buckets - 1000:
            return

        # Record the request to the bucket
        self.throughput_buckets[current_timestamp_ms] += 1
        self.last_throughput_buckets = current_timestamp_ms

    def record_routing_stats(self) -> Dict[str, Any]:
        current_timestamp_ms = _time_ms()
        throughput = 0

        for t, c in list(self.throughput_buckets.items()):
            if t < current_timestamp_ms - 1000:
                # Remove the bucket if it is older than 1 second
                self.throughput_buckets.pop(t)
            else:
                throughput += c

        return {
            "throughput": throughput,
        }


handle = serve.run(ThroughputAwareRequestRouterApp.bind())
response = handle.remote().result()
print(f"Response from ThroughputAwareRequestRouterApp: {response}")
# Example output:
#   Response from ThroughputAwareRequestRouterApp:
#   Replica(id='tkywafya', deployment='ThroughputAwareRequestRouterApp', app='default')

与统一请求路由器类似,自定义请求路由器可以在 deployment 装饰器的 request_router_class 参数中定义。Serve 控制器通过调用 record_routing_stats 从每个部署的副本中提取统计数据。request_routing_stats_period_srequest_routing_stats_timeout_s 参数控制 Serve 控制器在其后台线程中从每个副本提取信息的频率和超时时间。您可以通过覆盖部署类定义中的 record_routing_stats 来自定义这些统计信息的发出。自定义请求路由器随后可以通过查找运行副本的 routing_stats 属性来获取更新的路由统计信息,并在路由策略中使用它。

警告

注意事项和限制

当您提供自定义路由器时,只要它是简单、独立的 Python 代码,并且仅依赖于标准库,Ray Serve 就可以完全支持它。一旦路由器变得更加复杂,例如依赖于其他自定义模块或包,您就需要确保这些模块被打包到 Docker 镜像或环境中。这是因为 Ray Serve 使用 cloudpickle 来序列化自定义路由器,并且它不会供应商传递依赖项 — 如果您的路由器继承自另一个模块中的超类或导入自定义包,则这些包必须存在于目标环境中。此外,环境对等性也很重要:Python 版本、cloudpickle 版本或库版本之间的差异可能会影响反序列化。

复杂路由器的替代方案

当您的自定义请求路由器具有复杂的依赖项或您希望更好地控制版本控制和部署时,您有几种选择

  • 使用内置路由器:考虑使用 Ray Serve 附带的路由器 — 这些路由器经过充分测试,可用于生产环境,并保证跨不同环境正常工作。

  • 贡献给 Ray Serve:如果您的路由器是通用的,并且可能使他人受益,请考虑将其作为内置路由器贡献给 Ray Serve,方法是在 Ray GitHub 存储库 上打开一个功能请求或拉取请求。建议的实现位置是 python/ray/serve/_private/request_router/

  • 确保环境中的依赖项:确保外部依赖项已安装在您的 Docker 镜像或环境中。