模型多路复用#
本节将帮助您了解如何使用 serve.multiplexed 和 serve.get_multiplexed_model_id API 来编写多路复用部署。
为什么需要模型多路复用?#
模型多路复用是一种有效服务具有相似输入类型的多个模型,并将它们从一个副本池中提供出来的技术。流量根据请求头被路由到相应的模型。为了用副本池服务多个模型,模型多路复用可以优化成本并均衡流量负载。这在您拥有许多形状相同但权重不同且不经常被调用的模型时非常有用。如果部署的任何副本已加载模型,则发往该模型的入站流量(基于请求头)将自动路由到该副本,从而避免不必要的加载时间。
编写多路复用部署#
要编写多路复用部署,请使用 serve.multiplexed 和 serve.get_multiplexed_model_id API。
假设您在 AWS S3 存储桶中拥有多个 PyTorch 模型,结构如下:
s3://my_bucket/1/model.pt
s3://my_bucket/2/model.pt
s3://my_bucket/3/model.pt
s3://my_bucket/4/model.pt
...
定义多路复用部署
from ray import serve
import aioboto3
import torch
import starlette
@serve.deployment
class ModelInferencer:
def __init__(self):
self.bucket_name = "my_bucket"
@serve.multiplexed(max_num_models_per_replica=3)
async def get_model(self, model_id: str):
session = aioboto3.Session()
async with session.resource("s3") as s3:
obj = await s3.Bucket(self.bucket_name)
await obj.download_file(f"{model_id}/model.pt", f"model_{model_id}.pt")
return torch.load(f"model_{model_id}.pt")
async def __call__(self, request: starlette.requests.Request):
model_id = serve.get_multiplexed_model_id()
model = await self.get_model(model_id)
return model.forward(torch.rand(64, 3, 512, 512))
entry = ModelInferencer.bind()
注意
serve.multiplexed API 还有一个 max_num_models_per_replica 参数。使用它来配置每个副本加载多少模型。如果模型数量大于 max_num_models_per_replica,Serve 将使用 LRU(最近最少使用)策略来逐出最近最少使用的模型。
提示
此代码示例使用了 PyTorch Model 对象。您也可以定义自己的模型类并在此处使用。要在模型被逐出时释放资源,请实现 __del__ 方法。Ray Serve 在模型被逐出时会在内部调用 __del__ 方法来释放资源。
serve.get_multiplexed_model_id 从请求头中检索模型 ID。然后将此 ID 传递给 get_model 函数。如果模型尚未缓存到副本中,Serve 将从 S3 存储桶加载它。否则,将返回缓存的模型。
注意
在内部,Serve 路由器使用请求头中的模型 ID 将流量路由到相应的副本。如果具有该模型的所有副本都已超额订阅,Ray Serve 会将请求路由到一个新副本,该副本随后将从 S3 存储桶加载并缓存模型。
要发送到特定模型的请求,请在请求头中包含 serve_multiplexed_model_id 字段,并将其值设置为您要发送请求的模型 ID。
import requests # noqa: E402
resp = requests.get(
"https://:8000", headers={"serve_multiplexed_model_id": str("1")}
)
注意
请求头中需要 serve_multiplexed_model_id,其值应该是您要发送请求的模型 ID。
如果在请求头中找不到 serve_multiplexed_model_id,Serve 将其视为普通请求并将其路由到一个随机副本。
运行上述代码后,您应该会在部署日志中看到以下行:
INFO 2023-05-24 01:19:03,853 default_Model default_Model#EjYmnQ CUpzhwUUNw / default replica.py:442 - Started executing request CUpzhwUUNw
INFO 2023-05-24 01:19:03,854 default_Model default_Model#EjYmnQ CUpzhwUUNw / default multiplex.py:131 - Loading model '1'.
INFO 2023-05-24 01:19:04,859 default_Model default_Model#EjYmnQ CUpzhwUUNw / default replica.py:542 - __CALL__ OK 1005.8ms
如果您继续加载更多模型并超过 max_num_models_per_replica,最近最少使用的模型将被逐出,您将在部署日志中看到以下行:
INFO 2023-05-24 01:19:15,988 default_Model default_Model#rimNjA WzjTbJvbPN / default replica.py:442 - Started executing request WzjTbJvbPN
INFO 2023-05-24 01:19:15,988 default_Model default_Model#rimNjA WzjTbJvbPN / default multiplex.py:145 - Unloading model '3'.
INFO 2023-05-24 01:19:15,988 default_Model default_Model#rimNjA WzjTbJvbPN / default multiplex.py:131 - Loading model '4'.
INFO 2023-05-24 01:19:16,993 default_Model default_Model#rimNjA WzjTbJvbPN / default replica.py:542 - __CALL__ OK 1005.7ms
您也可以使用 handle 的 options API 来发送到特定模型的请求。
obj_ref = handle.options(multiplexed_model_id="1").remote("<your param>")
在使用模型组合时,您可以使用 Serve DeploymentHandle 从上游部署发送请求到多路复用部署。您需要将 multiplexed_model_id 设置在 options 中。例如:
@serve.deployment
class Downstream:
def __call__(self):
return serve.get_multiplexed_model_id()
@serve.deployment
class Upstream:
def __init__(self, downstream: DeploymentHandle):
self._h = downstream
async def __call__(self, request: starlette.requests.Request):
return await self._h.options(multiplexed_model_id="bar").remote()
serve.run(Upstream.bind(Downstream.bind()))
resp = requests.get("https://:8000")