Production Guide

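The recommended way to run Ray Serve in production is on Kubernetes, using the KubeRay RayService custom resource. The RayService custom resource automatically handles important production requirements such as health checking, status reporting, failure recovery, and upgrades. If you are not running on Kubernetes, you can also run Ray Serve directly on a Ray cluster using the Serve CLI.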

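This section is a quickstart that walks you through generating a Serve config file and deploying it with the Serve CLI. For more details, see the other pages in the production guide.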

To deploy on VMs instead of Kubernetes, see Deploy on VM.

Working example: text summarization and translation application

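Throughout the production guide, we use the following Serve application as a working example. The application takes a string of English text, summarizes it, and then translates the summary into French (the default), German, or Romanian.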

from starlette.requests import Request
from typing import Dict

import ray
from ray import serve
from ray.serve.handle import DeploymentHandle

from transformers import pipeline


@serve.deployment
class Translator:
    def __init__(self):
        self.language = "french"
        self.model = pipeline("translation_en_to_fr", model="t5-small")

    def translate(self, text: str) -> str:
        model_output = self.model(text)

        translation = model_output[0]["translation_text"]

        return translation

    def reconfigure(self, config: Dict):
        self.language = config.get("language", "french")

        if self.language.lower() == "french":
            self.model = pipeline("translation_en_to_fr", model="t5-small")
        elif self.language.lower() == "german":
            self.model = pipeline("translation_en_to_de", model="t5-small")
        elif self.language.lower() == "romanian":
            self.model = pipeline("translation_en_to_ro", model="t5-small")
        else:
            # Unsupported language: keep the previously loaded model.
            pass


@serve.deployment
class Summarizer:
    def __init__(self, translator: DeploymentHandle):
        # Load model
        self.model = pipeline("summarization", model="t5-small")
        self.translator = translator
        self.min_length = 5
        self.max_length = 15

    def summarize(self, text: str) -> str:
        # Run inference
        model_output = self.model(
            text, min_length=self.min_length, max_length=self.max_length
        )

        # Post-process output to return only the summary text
        summary = model_output[0]["summary_text"]

        return summary

    async def __call__(self, http_request: Request) -> str:
        english_text: str = await http_request.json()
        summary = self.summarize(english_text)

        return await self.translator.translate.remote(summary)

    def reconfigure(self, config: Dict):
        self.min_length = config.get("min_length", 5)
        self.max_length = config.get("max_length", 15)


app = Summarizer.bind(Translator.bind())

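Save this code locally in a file named text_ml.py. During development, we would likely use the serve run command to iteratively run, develop, and repeat (see the Development Workflow for more information). When we are ready to go to production, we generate a structured config file that acts as the single source of truth for the application.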
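One detail of the example is worth noticing: when `user_config` names a language other than the three it handles, `Translator.reconfigure` still updates `self.language` but keeps the previously loaded model. The following is a minimal, Ray-free sketch of a stricter alternative that looks the pipeline task up in a table and rejects unknown languages. `TASK_BY_LANGUAGE` and `task_for_language` are hypothetical names introduced for illustration; they are not part of Ray Serve or of the original example.

```python
from typing import Dict

# Pipeline task names are taken from the example application; the table
# itself is an illustrative helper, not part of Ray Serve.
TASK_BY_LANGUAGE: Dict[str, str] = {
    "french": "translation_en_to_fr",
    "german": "translation_en_to_de",
    "romanian": "translation_en_to_ro",
}


def task_for_language(config: Dict) -> str:
    """Return the pipeline task for config["language"], defaulting to French."""
    language = str(config.get("language", "french")).lower()
    try:
        return TASK_BY_LANGUAGE[language]
    except KeyError:
        supported = ", ".join(sorted(TASK_BY_LANGUAGE))
        raise ValueError(
            f"Unsupported language {language!r}; expected one of: {supported}"
        ) from None
```

Inside `reconfigure`, the returned task name could be passed to `pipeline(task, model="t5-small")`. Whether raising an error is preferable to keeping the old model depends on how you want a bad config update to surface.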

Use the serve build command to generate the config file:

$ serve build text_ml:app -o serve_config.yaml

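The generated version of this file contains an import_path, a runtime_env, and configuration options for each deployment in the application. The application needs the torch and transformers packages, so modify the runtime_env field of the generated config file to include these two pip packages. Save this config locally in a file named serve_config.yaml.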

proxy_location: EveryNode

http_options:
  host: 0.0.0.0
  port: 8000

applications:
- name: default
  route_prefix: /
  import_path: text_ml:app
  runtime_env:
    pip:
      - torch
      - transformers
  deployments:
  - name: Translator
    num_replicas: 1
    user_config:
      language: french
  - name: Summarizer
    num_replicas: 1
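
As a rough illustration of the `runtime_env` edit described above, the sketch below applies it to a config that has already been loaded into a Python dictionary (for example with a YAML parser; loading and saving are left out here). `add_pip_packages` is a hypothetical helper, not a Serve API, and the starting dictionary is an assumed, abbreviated shape for a freshly generated config with an empty `runtime_env`.

```python
import copy
from typing import Dict, List


def add_pip_packages(config: Dict, app_name: str, packages: List[str]) -> Dict:
    """Return a copy of config with packages added to one app's runtime_env."""
    updated = copy.deepcopy(config)
    for app in updated.get("applications", []):
        if app.get("name") == app_name:
            runtime_env = app.setdefault("runtime_env", {})
            pip = runtime_env.setdefault("pip", [])
            # Preserve order and skip packages that are already listed.
            for package in packages:
                if package not in pip:
                    pip.append(package)
            return updated
    raise KeyError(f"No application named {app_name!r} in config")


# Assumed shape of a generated config (abbreviated for the example).
generated = {
    "applications": [
        {"name": "default", "import_path": "text_ml:app", "runtime_env": {}}
    ]
}
config = add_pip_packages(generated, "default", ["torch", "transformers"])
```

Working on a copy keeps the generated config intact, which makes it easy to compare the two versions before saving.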

You can use serve deploy to deploy the application to a local Ray cluster, and serve status to get its status at runtime:

# Start a local Ray cluster.
$ ray start --head

# Deploy the Text ML application to the local Ray cluster.
$ serve deploy serve_config.yaml
2022-08-16 12:51:22,043 SUCC scripts.py:180 --
Sent deploy request successfully!
 * Use `serve status` to check deployments' statuses.
 * Use `serve config` to see the running app's config.

$ serve status
proxies:
  cef533a072b0f03bf92a6b98cb4eb9153b7b7c7b7f15954feb2f38ec: HEALTHY
applications:
  default:
    status: RUNNING
    message: ''
    last_deployed_time_s: 1694041157.2211847
    deployments:
      Translator:
        status: HEALTHY
        replica_states:
          RUNNING: 1
        message: ''
      Summarizer:
        status: HEALTHY
        replica_states:
          RUNNING: 1
        message: ''
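
For scripted checks, for example in a deployment pipeline, the same information can be inspected programmatically. The sketch below assumes the `serve status` output has already been parsed into a dictionary with the structure shown above. `all_running_and_healthy` is a hypothetical helper and not part of the Serve CLI.

```python
from typing import Dict


def all_running_and_healthy(status: Dict) -> bool:
    """True if every application is RUNNING and every deployment is HEALTHY."""
    applications = status.get("applications", {})
    if not applications:
        return False  # Nothing deployed is treated as not ready.
    for app in applications.values():
        if app.get("status") != "RUNNING":
            return False
        for deployment in app.get("deployments", {}).values():
            if deployment.get("status") != "HEALTHY":
                return False
    return True


# Abbreviated copy of the status output shown above.
status = {
    "applications": {
        "default": {
            "status": "RUNNING",
            "deployments": {
                "Translator": {"status": "HEALTHY", "replica_states": {"RUNNING": 1}},
                "Summarizer": {"status": "HEALTHY", "replica_states": {"RUNNING": 1}},
            },
        }
    }
}
print(all_running_and_healthy(status))  # True
```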

Test the application using Python requests:

import requests

english_text = (
    "It was the best of times, it was the worst of times, it was the age "
    "of wisdom, it was the age of foolishness, it was the epoch of belief"
)
response = requests.post("http://127.0.0.1:8000/", json=english_text)
french_text = response.text

print(french_text)
# 'c'était le meilleur des temps, c'était le pire des temps .'
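
If the `requests` package is not available, the same call can be made with the standard library. This is a minimal sketch that assumes the application is listening on http://127.0.0.1:8000/ as configured above; `build_request` and `summarize_and_translate` are hypothetical helper names.

```python
import json
import urllib.request

# Matches the http_options port and the route_prefix in the config above.
SERVE_URL = "http://127.0.0.1:8000/"


def build_request(text: str, url: str = SERVE_URL) -> urllib.request.Request:
    """Build a POST request whose body is the text encoded as a JSON string."""
    body = json.dumps(text).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def summarize_and_translate(text: str, timeout_s: float = 60.0) -> str:
    """Send the text to the running application and return the response body."""
    # urlopen raises urllib.error.HTTPError for error status codes.
    with urllib.request.urlopen(build_request(text), timeout=timeout_s) as response:
        return response.read().decode("utf-8")
```

Sending the text as a JSON string mirrors `requests.post(..., json=english_text)`, which is what `await http_request.json()` in `Summarizer.__call__` expects.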

To update the application, modify the config file and run serve deploy again.
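
As a sketch of such an update, the snippet below switches the `Translator` deployment to German by changing its `user_config` in a config that has been loaded into a dictionary. `set_user_config` is a hypothetical helper, not a Serve API, and the dictionary is an abbreviated copy of the config shown earlier.

```python
import copy
from typing import Any, Dict


def set_user_config(
    config: Dict, app_name: str, deployment_name: str, **options: Any
) -> Dict:
    """Return a copy of config with user_config options set on one deployment."""
    updated = copy.deepcopy(config)
    for app in updated.get("applications", []):
        if app.get("name") != app_name:
            continue
        for deployment in app.get("deployments", []):
            if deployment.get("name") == deployment_name:
                deployment.setdefault("user_config", {}).update(options)
                return updated
    raise KeyError(f"{deployment_name!r} not found in application {app_name!r}")


# Abbreviated copy of serve_config.yaml as a dictionary.
current = {
    "applications": [
        {
            "name": "default",
            "deployments": [
                {"name": "Translator", "user_config": {"language": "french"}},
                {"name": "Summarizer"},
            ],
        }
    ]
}
updated = set_user_config(current, "default", "Translator", language="german")
```

After the updated config is written back to serve_config.yaml, running `serve deploy serve_config.yaml` again applies the change, and Serve passes the new `user_config` to the deployment's `reconfigure` method.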

Next steps

To learn more about deploying, updating, and monitoring Serve applications, see the following pages: