入门指南#

本教程将引导您完成编写和测试 Ray Serve 应用的过程。它将向您展示如何

  • 将机器学习模型转换为 Ray Serve 部署

  • 通过 HTTP 在本地测试 Ray Serve 应用

  • 将多个机器学习模型组合成一个应用

在本教程中,我们将使用两个模型

您也可以使用任何 Python 框架中的自己的模型进行操作。

部署这两个模型后,我们将使用 HTTP 请求对其进行测试。

提示

如果您对如何改进本教程有任何建议,请告知我们

要运行此示例,您需要安装以下内容

pip install "ray[serve]" transformers requests torch

文本翻译模型(Ray Serve 之前)#

首先,让我们看一下文本翻译模型。代码如下

# File name: model.py
from transformers import pipeline


class Translator:
    def __init__(self):
        # Load model
        self.model = pipeline("translation_en_to_fr", model="t5-small")

    def translate(self, text: str) -> str:
        # Run inference
        model_output = self.model(text)

        # Post-process output to return only the translation text
        translation = model_output[0]["translation_text"]

        return translation


translator = Translator()

translation = translator.translate("Hello world!")
print(translation)

名为 model.py 的 Python 文件使用 Translator 类将英文文本翻译成法文。

  • Translator__init__ 方法中的 self.model 变量存储了一个使用 t5-small 模型翻译文本的函数。

  • self.model 被调用以翻译英文文本时,它会在一个格式为 [{"translation_text": "..."}] 的字典中返回翻译后的法文文本。

  • Translatortranslate 方法通过索引字典来提取翻译后的文本。

您可以复制此脚本并在本地运行。它将 "Hello world!" 翻译为 "Bonjour Monde!"

$ python model.py

Bonjour Monde!

请记住,TranslationPipeline 是本教程的示例 ML 模型。您可以使用任何 Python 框架中的任意模型进行操作。请查看我们的 scikit-learn、PyTorch 和 Tensorflow 教程以获取更多信息和示例

转换为 Ray Serve 应用#

在本节中,我们将使用 Ray Serve 部署文本翻译模型,以便对其进行扩展并通过 HTTP 查询。我们将从将 Translator 转换为 Ray Serve 部署开始。

首先,我们打开一个新的 Python 文件并导入 rayray.serve

from starlette.requests import Request

import ray
from ray import serve

导入这些模块后,我们可以包含上面的模型代码

from transformers import pipeline


@serve.deployment(num_replicas=2, ray_actor_options={"num_cpus": 0.2, "num_gpus": 0})
class Translator:
    def __init__(self):
        # Load model
        self.model = pipeline("translation_en_to_fr", model="t5-small")

    def translate(self, text: str) -> str:
        # Run inference
        model_output = self.model(text)

        # Post-process output to return only the translation text
        translation = model_output[0]["translation_text"]

        return translation

    async def __call__(self, http_request: Request) -> str:
        english_text: str = await http_request.json()
        return self.translate(english_text)


Translator 类有两个修改

  1. 它有一个装饰器 @serve.deployment

  2. 它有一个新方法 __call__

该装饰器将 Translator 从一个 Python 类转换为一个 Ray Serve Deployment 对象。

每个部署存储您编写的单个 Python 函数或类,并使用它来处理请求。您可以使用 @serve.deployment 装饰器中的参数独立地扩展和配置每个部署。本示例配置了一些常用参数

  • num_replicas: 一个整数,确定我们的部署进程在 Ray 中运行多少个副本。请求会在这些副本之间进行负载均衡,使您能够横向扩展部署。

  • ray_actor_options: 一个字典,包含每个副本的配置选项。

    • num_cpus: 一个浮点数,表示每个副本应保留的逻辑 CPU 数量。您可以将其设置为小数,以便在 CPU 数量少于副本数量的机器上打包多个副本。

    • num_gpus: 一个浮点数,表示每个副本应保留的逻辑 GPU 数量。您可以将其设置为小数,以便在 GPU 数量少于副本数量的机器上打包多个副本。

    • resources: 一个字典,包含副本的其他资源需求,例如非 GPU 加速器(如 HPU 或 TPU)。

所有这些参数都是可选的,您可以随意省略它们

...
@serve.deployment
class Translator:
  ...

部署会接收 Starlette HTTP request 对象 [1]。默认情况下,部署类的 __call__ 方法会在此 request 对象上被调用。返回值会作为 HTTP 响应体发送回去。

这就是为什么 Translator 需要一个新的 __call__ 方法。该方法通过读取传入 HTTP 请求的 JSON 数据并将其转发到 translate 方法来处理请求。翻译后的文本会被返回并通过 HTTP 响应发送回去。您还可以使用 Ray Serve 的 FastAPI 集成来避免处理原始 HTTP 请求。有关在 Serve 中使用 FastAPI 的更多信息,请查看FastAPI HTTP 部署

接下来,我们需要将 Translator 部署 bind(绑定)到将传递给其构造函数的参数。这定义了一个 Ray Serve 应用,我们可以本地运行或部署到生产环境(您稍后会看到应用可以包含多个部署)。由于 Translator 的构造函数不接受任何参数,我们可以直接调用部署的 bind 方法,无需传递任何参数

translator_app = Translator.bind()

至此,我们已准备好在本地测试应用。

运行 Ray Serve 应用#

这是我们上面构建的完整 Ray Serve 脚本

# File name: serve_quickstart.py
from starlette.requests import Request

import ray
from ray import serve

from transformers import pipeline


@serve.deployment(num_replicas=2, ray_actor_options={"num_cpus": 0.2, "num_gpus": 0})
class Translator:
    def __init__(self):
        # Load model
        self.model = pipeline("translation_en_to_fr", model="t5-small")

    def translate(self, text: str) -> str:
        # Run inference
        model_output = self.model(text)

        # Post-process output to return only the translation text
        translation = model_output[0]["translation_text"]

        return translation

    async def __call__(self, http_request: Request) -> str:
        english_text: str = await http_request.json()
        return self.translate(english_text)


translator_app = Translator.bind()

要在本地测试,我们使用 serve run CLI 命令运行脚本。该命令接受部署的导入路径,格式为 module:application。请确保在包含此脚本的本地副本(保存为 serve_quickstart.py)的目录中运行该命令,以便它可以导入应用

$ serve run serve_quickstart:translator_app

此命令将运行 translator_app 应用,然后阻塞并向控制台流式输出日志。可以使用 Ctrl-C 终止它,这将关闭该应用。

现在我们可以通过 HTTP 测试我们的模型。默认可以通过以下 URL 访问它

http://127.0.0.1:8000/

我们将发送一个包含英文文本的 JSON 数据 POST 请求。Translator__call__ 方法将解包此文本并将其转发到 translate 方法。这是一个请求翻译“Hello world!” 的客户端脚本

# File name: model_client.py
import requests

english_text = "Hello world!"

response = requests.post("http://127.0.0.1:8000/", json=english_text)
french_text = response.text

print(french_text)

要测试我们的部署,首先请确保 Translator 正在运行

$ serve run serve_deployment:translator_app

Translator 运行时,我们可以打开一个独立的终端窗口并运行客户端脚本。这将通过 HTTP 获取响应

$ python model_client.py

Bonjour monde!

组合多个模型#

Ray Serve 允许您将多个部署组合到一个 Ray Serve 应用中。这使得将多个机器学习模型与业务逻辑结合起来服务单个请求变得容易。我们可以使用 autoscaling_confignum_replicasnum_cpusnum_gpus 等参数来独立配置和扩展应用中的每个部署。

例如,让我们部署一个包含两个步骤的机器学习流水线

  1. 总结英文文本

  2. 将总结翻译成法文

Translator 已经执行步骤 2。我们可以使用 HuggingFace 的 SummarizationPipeline 来完成步骤 1。这是一个在本地运行的 SummarizationPipeline 示例

# File name: summary_model.py
from transformers import pipeline


class Summarizer:
    def __init__(self):
        # Load model
        self.model = pipeline("summarization", model="t5-small")

    def summarize(self, text: str) -> str:
        # Run inference
        model_output = self.model(text, min_length=5, max_length=15)

        # Post-process output to return only the summary text
        summary = model_output[0]["summary_text"]

        return summary


summarizer = Summarizer()

summary = summarizer.summarize(
    "It was the best of times, it was the worst of times, it was the age "
    "of wisdom, it was the age of foolishness, it was the epoch of belief"
)
print(summary)

您可以复制此脚本并在本地运行。它将《双城记》的片段总结为 it was the best of times, it was worst of times .

$ python summary_model.py

it was the best of times, it was worst of times .

这是一个将两个模型链接在一起的应用示例。该图接受英文文本,对其进行总结,然后进行翻译

# File name: serve_quickstart_composed.py
from starlette.requests import Request

import ray
from ray import serve
from ray.serve.handle import DeploymentHandle

from transformers import pipeline


@serve.deployment
class Translator:
    def __init__(self):
        # Load model
        self.model = pipeline("translation_en_to_fr", model="t5-small")

    def translate(self, text: str) -> str:
        # Run inference
        model_output = self.model(text)

        # Post-process output to return only the translation text
        translation = model_output[0]["translation_text"]

        return translation


@serve.deployment
class Summarizer:
    def __init__(self, translator: DeploymentHandle):
        self.translator = translator

        # Load model.
        self.model = pipeline("summarization", model="t5-small")

    def summarize(self, text: str) -> str:
        # Run inference
        model_output = self.model(text, min_length=5, max_length=15)

        # Post-process output to return only the summary text
        summary = model_output[0]["summary_text"]

        return summary

    async def __call__(self, http_request: Request) -> str:
        english_text: str = await http_request.json()
        summary = self.summarize(english_text)

        translation = await self.translator.translate.remote(summary)
        return translation


app = Summarizer.bind(Translator.bind())

此脚本包含已转换为部署的 Summarizer 类和经过一些修改的 Translator 类。在此脚本中,Summarizer 类包含 __call__ 方法,因为请求首先发送到它。它还接受一个 Translator 的句柄作为其构造函数参数之一,以便它可以将总结的文本转发到 Translator 部署。__call__ 方法还包含一些新代码

translation = await self.translator.translate.remote(summary)

self.translator.translate.remote(summary)Translatortranslate 方法发出一个异步调用,并立即返回一个 DeploymentResponse 对象。对该响应调用 await 会等待远程方法调用执行完毕并返回其返回值。该响应也可以直接传递给另一个 DeploymentHandle 调用。

我们定义完整的应用如下

app = Summarizer.bind(Translator.bind())

在这里,我们将 Translator 绑定到其(空)构造函数参数,然后将绑定的 Translator 作为 Summarizer 的构造函数参数传入。我们可以使用 serve run CLI 命令运行此部署图。请确保在包含 serve_quickstart_composed.py 代码的本地副本的目录中运行此命令

$ serve run serve_quickstart_composed:app

我们可以使用此客户端脚本向该图发出请求

# File name: composed_client.py
import requests

english_text = (
    "It was the best of times, it was the worst of times, it was the age "
    "of wisdom, it was the age of foolishness, it was the epoch of belief"
)
response = requests.post("http://127.0.0.1:8000/", json=english_text)
french_text = response.text

print(french_text)

应用运行时,我们可以打开一个独立的终端窗口并查询它

$ python composed_client.py

c'était le meilleur des temps, c'était le pire des temps .

组合式 Ray Serve 应用允许您将机器学习流水线的每个部分(例如推理和业务逻辑步骤)部署在独立的部署中。每个部署都可以单独配置和扩展,确保您能从资源中获得最大性能。有关更多信息,请参阅模型组合指南。

下一步#

  • 深入了解关键概念,以更深入地理解 Ray Serve。

  • 在 Ray Dashboard 中查看有关 Serve 应用的详细信息:Serve 视图

  • 了解更多关于如何将 Ray Serve 应用部署到生产环境的信息:生产指南

  • 查看流行机器学习框架的更深入教程:示例

脚注