入门#

本教程将引导您完成编写和测试 Ray Serve 应用程序的过程。它将展示如何

  • 将机器学习模型转换为 Ray Serve 部署

  • 通过 HTTP 在本地测试 Ray Serve 应用程序

  • 将多模型机器学习模型组合成单个应用程序

在本教程中,我们将使用两个模型

您也可以使用任何 Python 框架中的自己的模型进行跟随。

部署这两个模型后,我们将通过 HTTP 请求对它们进行测试。

提示

如果您对如何改进本教程有任何建议,请告诉我们

要运行此示例,您需要安装以下库

pip install "ray[serve]" transformers requests torch

文本翻译模型(Ray Serve 之前)#

首先,让我们看看我们的文本翻译模型。以下是它的代码

# File name: model.py
from transformers import pipeline


class Translator:
    def __init__(self):
        # Load model
        self.model = pipeline("translation_en_to_fr", model="t5-small")

    def translate(self, text: str) -> str:
        # Run inference
        model_output = self.model(text)

        # Post-process output to return only the translation text
        translation = model_output[0]["translation_text"]

        return translation


translator = Translator()

translation = translator.translate("Hello world!")
print(translation)

名为 model.py 的 Python 文件使用 Translator 类将英语文本翻译成法语。

  • Translator__init__ 方法中的 self.model 变量存储了一个使用 t5-small 模型进行文本翻译的函数。

  • self.model 被用于英文文本时,它会返回一个字典格式为 [{"translation_text": "..."}] 的翻译后的法文文本。

  • Translatortranslate 方法通过索引字典来提取翻译后的文本。

您可以复制粘贴此脚本并在本地运行。它会将 "Hello world!" 翻译成 "Bonjour Monde!"

$ python model.py

Bonjour Monde!

请注意,TranslationPipeline 是本教程中的示例 ML 模型。您可以使用任何 Python 框架中的任意模型进行跟随。有关更多信息和示例,请参阅我们关于 scikit-learn、PyTorch 和 Tensorflow 的教程。

转换为 Ray Serve 应用程序#

在本节中,我们将使用 Ray Serve 部署文本翻译模型,使其可以扩展并通过 HTTP 查询。我们将首先将 Translator 转换为 Ray Serve 部署。

首先,我们打开一个新的 Python 文件并导入 rayray.serve

from starlette.requests import Request

import ray
from ray import serve

在这些导入之后,我们可以包含上面的模型代码

from transformers import pipeline


@serve.deployment(num_replicas=2, ray_actor_options={"num_cpus": 0.2, "num_gpus": 0})
class Translator:
    def __init__(self):
        # Load model
        self.model = pipeline("translation_en_to_fr", model="t5-small")

    def translate(self, text: str) -> str:
        # Run inference
        model_output = self.model(text)

        # Post-process output to return only the translation text
        translation = model_output[0]["translation_text"]

        return translation

    async def __call__(self, http_request: Request) -> str:
        english_text: str = await http_request.json()
        return self.translate(english_text)


Translator 类有两个修改

  1. 它有一个装饰器 @serve.deployment

  2. 它有一个新方法 __call__

该装饰器将 Translator 从 Python 类转换为 Ray Serve Deployment 对象。

每个部署都存储一个您编写的单个 Python 函数或类,并使用它来处理请求。您可以使用 @serve.deployment 装饰器中的参数独立地扩展和配置每个部署。示例配置了一些常用参数

  • num_replicas:一个整数,决定在 Ray 中运行多少个部署进程副本。请求会在这些副本之间进行负载均衡,允许您水平扩展部署。

  • ray_actor_options:一个包含每个副本配置选项的字典。

    • num_cpus:一个浮点数,表示每个副本应保留的逻辑 CPU 数量。您可以将其设为分数,以便将多个副本打包到 CPU 少于副本的机器上。

    • num_gpus:一个浮点数,表示每个副本应保留的逻辑 GPU 数量。您可以将其设为分数,以便将多个副本打包到 GPU 少于副本的机器上。

    • resources:一个包含副本其他资源需求的字典,例如非 GPU 加速器(如 HPU 或 TPU)。

所有这些参数都是可选的,因此您可以省略它们。

...
@serve.deployment
class Translator:
  ...

部署接收 Starlette HTTP request 对象 [1]。默认情况下,部署类的 __call__ 方法会在此 request 对象上被调用。返回值会作为 HTTP 响应体发送回去。

这就是为什么 Translator 需要一个新的 __call__ 方法。该方法通过读取其 JSON 数据并将其转发到 translate 方法来处理传入的 HTTP 请求。翻译后的文本会被返回并通过 HTTP 响应发送回去。您还可以使用 Ray Serve 的 FastAPI 集成来避免处理原始 HTTP 请求。有关 FastAPI 与 Serve 集成的更多信息,请参阅 FastAPI HTTP 部署

接下来,我们需要将 Translator 部署绑定到将传递给其构造函数的参数。这定义了一个 Ray Serve 应用程序,我们可以在本地运行或部署到生产环境(稍后您将看到应用程序可以包含多个部署)。由于 Translator 的构造函数不接受任何参数,因此我们可以不传递任何内容即可调用部署的 bind 方法。

translator_app = Translator.bind()

这样,我们就准备好在本地测试该应用程序了。

运行 Ray Serve 应用程序#

这是我们上面构建的完整的 Ray Serve 脚本。

# File name: serve_quickstart.py
from starlette.requests import Request

import ray
from ray import serve

from transformers import pipeline


@serve.deployment(num_replicas=2, ray_actor_options={"num_cpus": 0.2, "num_gpus": 0})
class Translator:
    def __init__(self):
        # Load model
        self.model = pipeline("translation_en_to_fr", model="t5-small")

    def translate(self, text: str) -> str:
        # Run inference
        model_output = self.model(text)

        # Post-process output to return only the translation text
        translation = model_output[0]["translation_text"]

        return translation

    async def __call__(self, http_request: Request) -> str:
        english_text: str = await http_request.json()
        return self.translate(english_text)


translator_app = Translator.bind()

要在本地测试,我们使用 serve run CLI 命令运行脚本。该命令接受部署的导入路径,格式为 module:application。请确保从包含此脚本本地副本的目录运行命令,并将脚本保存为 serve_quickstart.py,以便可以导入该应用程序。

$ serve run serve_quickstart:translator_app

此命令将运行 translator_app 应用程序,然后阻塞并向控制台流式传输日志。可以通过按 Ctrl-C 来终止它,这将拆除该应用程序。

现在我们可以通过 HTTP 测试我们的模型。默认情况下,它可以通过以下 URL 访问

http://127.0.0.1:8000/

我们将发送一个带有包含英文文本的 JSON 数据的 POST 请求。Translator__call__ 方法将解包此文本并将其转发给 translate 方法。这是一个请求“Hello world!”翻译的客户端脚本。

# File name: model_client.py
import requests

english_text = "Hello world!"

response = requests.post("http://127.0.0.1:8000/", json=english_text)
french_text = response.text

print(french_text)

要测试我们的部署,请首先确保 Translator 正在运行。

$ serve run serve_deployment:translator_app

Translator 运行时,我们可以打开另一个终端窗口并运行客户端脚本。这将通过 HTTP 获取响应。

$ python model_client.py

Bonjour monde!

组合多个模型#

Ray Serve 允许您将多个部署组合成一个 Ray Serve 应用程序。这使得轻松地将多个机器学习模型与业务逻辑结合起来以服务单个请求。我们可以使用 autoscaling_confignum_replicasnum_cpusnum_gpus 等参数来独立配置和扩展应用程序中的每个部署。

例如,让我们部署一个包含两个步骤的机器学习管道

  1. 摘要英文文本

  2. 将摘要翻译成法语

Translator 已经执行了第二步。我们可以使用 HuggingFace 的 SummarizationPipeline 来完成第一步。以下是一个本地运行的 SummarizationPipeline 示例。

# File name: summary_model.py
from transformers import pipeline


class Summarizer:
    def __init__(self):
        # Load model
        self.model = pipeline("summarization", model="t5-small")

    def summarize(self, text: str) -> str:
        # Run inference
        model_output = self.model(text, min_length=5, max_length=15)

        # Post-process output to return only the summary text
        summary = model_output[0]["summary_text"]

        return summary


summarizer = Summarizer()

summary = summarizer.summarize(
    "It was the best of times, it was the worst of times, it was the age "
    "of wisdom, it was the age of foolishness, it was the epoch of belief"
)
print(summary)

您可以复制粘贴此脚本并在本地运行。它将《双城记》的片段摘要为 it was the best of times, it was the worst of times .

$ python summary_model.py

it was the best of times, it was the worst of times .

这是一个将这两个模型链接在一起的应用程序。该图接收英文文本,对其进行摘要,然后进行翻译。

# File name: serve_quickstart_composed.py
from starlette.requests import Request

import ray
from ray import serve
from ray.serve.handle import DeploymentHandle

from transformers import pipeline


@serve.deployment
class Translator:
    def __init__(self):
        # Load model
        self.model = pipeline("translation_en_to_fr", model="t5-small")

    def translate(self, text: str) -> str:
        # Run inference
        model_output = self.model(text)

        # Post-process output to return only the translation text
        translation = model_output[0]["translation_text"]

        return translation


@serve.deployment
class Summarizer:
    def __init__(self, translator: DeploymentHandle):
        self.translator = translator

        # Load model.
        self.model = pipeline("summarization", model="t5-small")

    def summarize(self, text: str) -> str:
        # Run inference
        model_output = self.model(text, min_length=5, max_length=15)

        # Post-process output to return only the summary text
        summary = model_output[0]["summary_text"]

        return summary

    async def __call__(self, http_request: Request) -> str:
        english_text: str = await http_request.json()
        summary = self.summarize(english_text)

        translation = await self.translator.translate.remote(summary)
        return translation


app = Summarizer.bind(Translator.bind())

此脚本包含我们转换为部署的 Summarizer 类以及经过一些修改的 Translator 类。在此脚本中,Summarizer 类包含 __call__ 方法,因为请求首先发送到它。它还接受 Translator 的句柄作为其构造函数参数之一,以便它可以将摘要后的文本转发到 Translator 部署。__call__ 方法还包含一些新代码。

translation = await self.translator.translate.remote(summary)

self.translator.translate.remote(summary) 发出一个异步调用到 Translatortranslate 方法,并立即返回一个 DeploymentResponse 对象。在响应上调用 await 将等待远程方法调用执行并返回其返回值。响应也可以直接传递给另一个 DeploymentHandle 调用。

我们将完整的应用程序定义如下。

app = Summarizer.bind(Translator.bind())

在这里,我们将 Translator 绑定到其(空的)构造函数参数,然后我们将绑定的 Translator 作为 Summarizer 的构造函数参数传入。我们可以使用 serve run CLI 命令来运行此部署图。请确保从包含 serve_quickstart_composed.py 代码本地副本的目录运行此命令。

$ serve run serve_quickstart_composed:app

我们可以使用此客户端脚本向该图发出请求。

# File name: composed_client.py
import requests

english_text = (
    "It was the best of times, it was the worst of times, it was the age "
    "of wisdom, it was the age of foolishness, it was the epoch of belief"
)
response = requests.post("http://127.0.0.1:8000/", json=english_text)
french_text = response.text

print(french_text)

在应用程序运行时,我们可以打开另一个终端窗口并查询它。

$ python composed_client.py

c'était le meilleur des temps, c'était le pire des temps .

组合的 Ray Serve 应用程序允许您将机器学习管道的每个部分(例如推理和业务逻辑步骤)部署在单独的部署中。这些部署中的每一个都可以单独配置和扩展,确保您从资源中获得最大的性能。请参阅 模型组合 指南以了解更多信息。

后续步骤#

  • 深入了解 关键概念 以更深入地了解 Ray Serve。

  • 在 Ray Dashboard 中查看您的 Serve 应用程序的详细信息:Serve 视图

  • 了解有关如何将 Ray Serve 应用程序部署到生产环境的更多信息:生产指南

  • 查看更深入的流行机器学习框架教程:示例

脚注