生产指南#

在生产环境中运行 Ray Serve 的推荐方式是在 Kubernetes 上使用 KubeRayRayService 自定义资源。RayService 自定义资源会自动处理重要的生产需求,例如健康检查、状态报告、故障恢复和升级。如果您不在 Kubernetes 上运行,也可以直接使用 Serve CLI 在 Ray 集群上运行 Ray Serve。

本节将引导您快速了解如何生成 Serve 配置文件并使用 Serve CLI 进行部署。有关更多详细信息,您可以查阅生产指南中的其他页面

有关在虚拟机上部署而不是在 Kubernetes 上部署的说明,请参阅 在虚拟机上部署

工作示例:文本摘要和翻译应用程序#

在整个生产指南中,我们将使用以下 Serve 应用程序作为工作示例。该应用程序接收一段英文文本,然后将其摘要并翻译成法语(默认)、德语或罗马尼亚语。

from starlette.requests import Request
from typing import Dict

import ray
from ray import serve
from ray.serve.handle import DeploymentHandle

from transformers import pipeline


@serve.deployment
class Translator:
    def __init__(self):
        self.language = "french"
        self.model = pipeline("translation_en_to_fr", model="t5-small")

    def translate(self, text: str) -> str:
        model_output = self.model(text)

        translation = model_output[0]["translation_text"]

        return translation

    def reconfigure(self, config: Dict):
        self.language = config.get("language", "french")

        if self.language.lower() == "french":
            self.model = pipeline("translation_en_to_fr", model="t5-small")
        elif self.language.lower() == "german":
            self.model = pipeline("translation_en_to_de", model="t5-small")
        elif self.language.lower() == "romanian":
            self.model = pipeline("translation_en_to_ro", model="t5-small")
        else:
            pass


@serve.deployment
class Summarizer:
    def __init__(self, translator: DeploymentHandle):
        # Load model
        self.model = pipeline("summarization", model="t5-small")
        self.translator = translator
        self.min_length = 5
        self.max_length = 15

    def summarize(self, text: str) -> str:
        # Run inference
        model_output = self.model(
            text, min_length=self.min_length, max_length=self.max_length
        )

        # Post-process output to return only the summary text
        summary = model_output[0]["summary_text"]

        return summary

    async def __call__(self, http_request: Request) -> str:
        english_text: str = await http_request.json()
        summary = self.summarize(english_text)

        return await self.translator.translate.remote(summary)

    def reconfigure(self, config: Dict):
        self.min_length = config.get("min_length", 5)
        self.max_length = config.get("max_length", 15)


app = Summarizer.bind(Translator.bind())

将此代码本地保存在 text_ml.py 中。在开发过程中,我们可能会使用 serve run 命令来迭代运行、开发和重复(有关更多信息,请参阅 开发工作流程)。当准备好进入生产环境时,我们将生成一个结构化的 配置文件,作为应用程序的唯一事实来源。

此配置文件可以使用 serve build 命令生成

$ serve build text_ml:app -o serve_config.yaml

此文件的生成版本包含一个 import_path、一个 runtime_env 以及应用程序中每个部署的配置选项。该应用程序需要 torchtransformers 包,因此请修改生成配置的 runtime_env 字段以包含这两个 pip 包。将此配置本地保存在 serve_config.yaml 中。

proxy_location: EveryNode

http_options:
  host: 0.0.0.0
  port: 8000

applications:
- name: default
  route_prefix: /
  import_path: text_ml:app
  runtime_env:
    pip:
      - torch
      - transformers
  deployments:
  - name: Translator
    num_replicas: 1
    user_config:
      language: french
  - name: Summarizer
    num_replicas: 1

您可以使用 serve deploy 将应用程序部署到本地 Ray 集群,并使用 serve status 在运行时获取状态

# Start a local Ray cluster.
ray start --head

# Deploy the Text ML application to the local Ray cluster.
serve deploy serve_config.yaml
2022-08-16 12:51:22,043 SUCC scripts.py:180 --
Sent deploy request successfully!
 * Use `serve status` to check deployments' statuses.
 * Use `serve config` to see the running app's config.

$ serve status
proxies:
  cef533a072b0f03bf92a6b98cb4eb9153b7b7c7b7f15954feb2f38ec: HEALTHY
applications:
  default:
    status: RUNNING
    message: ''
    last_deployed_time_s: 1694041157.2211847
    deployments:
      Translator:
        status: HEALTHY
        replica_states:
          RUNNING: 1
        message: ''
      Summarizer:
        status: HEALTHY
        replica_states:
          RUNNING: 1
        message: ''

使用 Python 的 requests 测试应用程序

import requests

english_text = (
    "It was the best of times, it was the worst of times, it was the age "
    "of wisdom, it was the age of foolishness, it was the epoch of belief"
)
response = requests.post("http://127.0.0.1:8000/", json=english_text)
french_text = response.text

print(french_text)
# 'c'était le meilleur des temps, c'était le pire des temps .'

要更新应用程序,请修改配置文件并再次使用 serve deploy

后续步骤#

有关部署、更新和监控 Serve 应用程序的更深入的讲解,请参阅以下页面