模型多路复用#

本节帮助您了解如何使用 serve.multiplexedserve.get_multiplexed_model_id API 编写多路复用部署。

为什么使用模型多路复用?#

模型多路复用是一种技术,用于从一组副本中高效地服务多个具有相似输入类型的模型。流量根据请求头路由到相应的模型。为了通过一组副本服务多个模型,模型多路复用优化了成本并负载均衡了流量。这在您可能有许多形状相同但权重不同且不经常调用的模型的情况下非常有用。如果部署的任何副本已加载该模型,则该模型(基于请求头)的传入流量将自动路由到该副本,从而避免不必要的加载时间。

编写多路复用部署#

要编写多路复用部署,请使用 serve.multiplexedserve.get_multiplexed_model_id API。

假设您在 AWS S3 存储桶中有多个 Torch 模型,结构如下

s3://my_bucket/1/model.pt
s3://my_bucket/2/model.pt
s3://my_bucket/3/model.pt
s3://my_bucket/4/model.pt
...

定义多路复用部署


from ray import serve
import aioboto3
import torch
import starlette


@serve.deployment
class ModelInferencer:
    def __init__(self):
        self.bucket_name = "my_bucket"

    @serve.multiplexed(max_num_models_per_replica=3)
    async def get_model(self, model_id: str):
        session = aioboto3.Session()
        async with session.resource("s3") as s3:
            obj = await s3.Bucket(self.bucket_name)
            await obj.download_file(f"{model_id}/model.pt", f"model_{model_id}.pt")
            return torch.load(f"model_{model_id}.pt")

    async def __call__(self, request: starlette.requests.Request):
        model_id = serve.get_multiplexed_model_id()
        model = await self.get_model(model_id)
        return model.forward(torch.rand(64, 3, 512, 512))


entry = ModelInferencer.bind()

注意

serve.multiplexed API 还有一个参数 max_num_models_per_replica。使用它来配置单个副本中加载的模型数量。如果模型数量大于 max_num_models_per_replica,Serve 将使用 LRU 策略驱逐最近最少使用的模型。

提示

此代码示例使用 PyTorch Model 对象。您也可以定义自己的模型类并在此处使用。要在模型被驱逐时释放资源,请实现 __del__ 方法。Ray Serve 会在内部调用 __del__ 方法来释放模型被驱逐时的资源。

serve.get_multiplexed_model_id 用于从请求头中检索模型 ID,然后将 model_id 传递给 get_model 函数。如果在副本中未找到模型 ID,Serve 将从 S3 存储桶加载模型并将其缓存在副本中。如果在副本中找到模型 ID,Serve 将返回缓存的模型。

注意

在内部,Serve 路由器将根据请求头中的模型 ID 将流量路由到相应的副本。如果所有持有该模型的副本都已超载,Ray Serve 会将请求发送到尚未加载模型的新副本。该副本将从 S3 存储桶加载模型并进行缓存。

要将请求发送到特定模型,请在请求头中包含 serve_multiplexed_model_id 字段,并将值设置为您要发送请求的模型 ID。

import requests  # noqa: E402

resp = requests.get(
    "http://localhost:8000", headers={"serve_multiplexed_model_id": str("1")}
)

注意

请求头中必须包含 serve_multiplexed_model_id 字段,其值应为您要发送请求的模型 ID。

如果请求头中未找到 serve_multiplexed_model_id,Serve 将将其视为普通请求并路由到随机副本。

运行上述代码后,您应该在部署日志中看到以下行

INFO 2023-05-24 01:19:03,853 default_Model default_Model#EjYmnQ CUpzhwUUNw / default replica.py:442 - Started executing request CUpzhwUUNw
INFO 2023-05-24 01:19:03,854 default_Model default_Model#EjYmnQ CUpzhwUUNw / default multiplex.py:131 - Loading model '1'.
INFO 2023-05-24 01:19:04,859 default_Model default_Model#EjYmnQ CUpzhwUUNw / default replica.py:542 - __CALL__ OK 1005.8ms

如果您继续加载更多模型并超出 max_num_models_per_replica,最近最少使用的模型将被驱逐,您将在部署日志中看到以下行:

INFO 2023-05-24 01:19:15,988 default_Model default_Model#rimNjA WzjTbJvbPN / default replica.py:442 - Started executing request WzjTbJvbPN
INFO 2023-05-24 01:19:15,988 default_Model default_Model#rimNjA WzjTbJvbPN / default multiplex.py:145 - Unloading model '3'.
INFO 2023-05-24 01:19:15,988 default_Model default_Model#rimNjA WzjTbJvbPN / default multiplex.py:131 - Loading model '4'.
INFO 2023-05-24 01:19:16,993 default_Model default_Model#rimNjA WzjTbJvbPN / default replica.py:542 - __CALL__ OK 1005.7ms

您还可以使用 handle 的 options API 将请求发送到特定模型。

obj_ref = handle.options(multiplexed_model_id="1").remote("<your param>")

使用模型组合时,您可以使用 Serve DeploymentHandle 从上游部署向多路复用部署发送请求。您需要在 options 中设置 multiplexed_model_id。例如

@serve.deployment
class Downstream:
    def __call__(self):
        return serve.get_multiplexed_model_id()


@serve.deployment
class Upstream:
    def __init__(self, downstream: DeploymentHandle):
        self._h = downstream

    async def __call__(self, request: starlette.requests.Request):
        return await self._h.options(multiplexed_model_id="bar").remote()


serve.run(Upstream.bind(Downstream.bind()))
resp = requests.get("http://localhost:8000")