部署模型组合#

借助本指南,您可以

  • 将包含 ML 模型或业务逻辑的多个 部署 组合成一个 应用程序

  • 独立扩展和配置您的每个 ML 模型和业务逻辑步骤

注意

Ray 2.10 版本已完全移除已弃用的 RayServeHandleRayServeSyncHandle API。

使用 DeploymentHandles 组合部署#

在构建应用程序时,您可以将多个部署 .bind() 并将它们相互传递给构造函数。在运行时,在部署代码内部,Ray Serve 会将绑定的部署替换为 DeploymentHandles,您可以使用这些 DeploymentHandles 调用其他部署的方法。此功能使您可以将应用程序的步骤(如预处理、模型推理和后处理)划分为独立的部署,您可以独立地扩展和配置这些部署。

使用 handle.remote 向部署发送请求。这些请求可以包含普通的 Python args 和 kwargs,DeploymentHandles 可以直接将它们传递给方法。方法调用返回一个 DeploymentResponse,它代表了输出的未来。您可以 await 该响应以检索其结果,或将其传递给另一个下游的 DeploymentHandle 调用。

基本的 DeploymentHandle 示例#

此示例包含两个部署

 1# File name: hello.py
 2from ray import serve
 3from ray.serve.handle import DeploymentHandle
 4
 5
 6@serve.deployment
 7class LanguageClassifer:
 8    def __init__(
 9        self, spanish_responder: DeploymentHandle, french_responder: DeploymentHandle
10    ):
11        self.spanish_responder = spanish_responder
12        self.french_responder = french_responder
13
14    async def __call__(self, http_request):
15        request = await http_request.json()
16        language, name = request["language"], request["name"]
17
18        if language == "spanish":
19            response = self.spanish_responder.say_hello.remote(name)
20        elif language == "french":
21            response = self.french_responder.say_hello.remote(name)
22        else:
23            return "Please try again."
24
25        return await response
26
27
28@serve.deployment
29class SpanishResponder:
30    def say_hello(self, name: str):
31        return f"Hola {name}"
32
33
34@serve.deployment
35class FrenchResponder:
36    def say_hello(self, name: str):
37        return f"Bonjour {name}"
38
39
40spanish_responder = SpanishResponder.bind()
41french_responder = FrenchResponder.bind()
42language_classifier = LanguageClassifer.bind(spanish_responder, french_responder)

在第 42 行,LanguageClassifier 部署接收 spanish_responderfrench_responder 作为构造函数参数。在运行时,Ray Serve 将这些参数转换为 DeploymentHandles。然后,LanguageClassifier 可以使用此句柄调用 spanish_responderfrench_responder 的部署方法。

例如,LanguageClassifier__call__ 方法使用 HTTP 请求的值来决定是用西班牙语还是法语响应。然后,它在第 19 行和第 21 行使用 DeploymentHandles 将请求的名称转发给 spanish_responderfrench_responder。调用的格式如下:

response: DeploymentResponse = self.spanish_responder.say_hello.remote(name)

此调用包含几个部分

  • self.spanish_responder 是通过构造函数传入的 SpanishResponder 句柄。

  • say_hello 是要调用的 SpanishResponder 方法。

  • remote 表示这是对另一个部署的 DeploymentHandle 调用。

  • namesay_hello 的参数。您可以在此处传递任意数量的位置参数或关键字参数。

此调用返回一个 DeploymentResponse 对象,该对象是对结果的引用,而不是结果本身。此模式允许调用异步执行。要获取实际结果,请 await 该响应。 await 会阻塞直到异步调用执行,然后返回结果。在此示例中,第 25 行调用 await response 并返回结果字符串。

警告

您可以使用 response.result() 方法来获取远程 DeploymentHandle 调用的返回值。但是,请避免在部署内部调用 .result(),因为它会阻止部署执行任何其他代码,直到远程方法调用完成。使用 await 允许部署在等待远程方法调用完成的同时处理其他请求。您应该在部署内部使用 await 而不是 .result()

您可以复制上面的 hello.py 脚本,并使用 serve run 命令运行它。请确保从包含 hello.py 的目录运行该命令,以便它能够找到脚本。

$ serve run hello:language_classifier

您可以使用此客户端脚本与示例进行交互

# File name: hello_client.py
import requests

response = requests.post(
    "https://:8000", json={"language": "spanish", "name": "Dora"}
)
greeting = response.text
print(greeting)

serve run 命令运行时,打开一个单独的终端窗口并运行脚本

$ python hello_client.py

Hola Dora

注意

组合允许您分解应用程序并独立扩展每个部分。例如,假设此 LanguageClassifier 应用程序的请求中有 75% 是西班牙语,25% 是法语。您可以将 SpanishResponder 扩展到 3 个副本,将 FrenchResponder 扩展到 1 个副本,以便满足您的工作负载需求。这种灵活性也适用于预留 CPU 和 GPU 等资源,以及您可以为每个部署设置的任何其他配置。

通过组合,您可以避免在服务使用不同类型和数量资源的模型和业务逻辑步骤时出现应用程序级别的瓶颈。

链接 DeploymentHandle 调用#

Ray Serve 可以直接将 DeploymentHandle 返回的 DeploymentResponse 对象传递给另一个 DeploymentHandle 调用,从而链接管道的多个阶段。您不需要 await 第一个响应,Ray Serve 会在后台管理 await 行为。当第一个调用完成时,Ray Serve 将第一个调用的输出(而不是 DeploymentResponse 对象)直接传递给第二个调用。

例如,下面的代码示例在应用程序中定义了三个部署

  • 一个 Adder 部署,它将一个值增加其配置的增量。

  • 一个 Multiplier 部署,它将一个值乘以其配置的倍数。

  • 一个 Ingress 部署,它链接了到 adder 和 multiplier 的调用,并返回最终响应。

请注意,来自 Adder 句柄的响应如何直接传递给 Multiplier 句柄,但在 multiplier 内部,输入参数解析为 Adder 调用的输出。

# File name: chain.py
from ray import serve
from ray.serve.handle import DeploymentHandle, DeploymentResponse


@serve.deployment
class Adder:
    def __init__(self, increment: int):
        self._increment = increment

    def __call__(self, val: int) -> int:
        return val + self._increment


@serve.deployment
class Multiplier:
    def __init__(self, multiple: int):
        self._multiple = multiple

    def __call__(self, val: int) -> int:
        return val * self._multiple


@serve.deployment
class Ingress:
    def __init__(self, adder: DeploymentHandle, multiplier: DeploymentHandle):
        self._adder = adder
        self._multiplier = multiplier

    async def __call__(self, input: int) -> int:
        adder_response: DeploymentResponse = self._adder.remote(input)
        # Pass the adder response directly into the multiplier (no `await` needed).
        multiplier_response: DeploymentResponse = self._multiplier.remote(
            adder_response
        )
        # `await` the final chained response.
        return await multiplier_response


app = Ingress.bind(
    Adder.bind(increment=1),
    Multiplier.bind(multiple=2),
)

handle: DeploymentHandle = serve.run(app)
response = handle.remote(5)
assert response.result() == 12, "(5 + 1) * 2 = 12"

流式传输 DeploymentHandle 调用#

您还可以使用 DeploymentHandles 来进行流式方法调用,该调用返回多个输出。要进行流式调用,方法必须是生成器,并且您必须设置 handle.options(stream=True)。然后,句柄调用返回一个 DeploymentResponseGenerator,而不是单个 DeploymentResponse。您可以像在 async for 代码块中一样,将 DeploymentResponseGenerators 用作同步或异步生成器。与 DeploymentResponse.result() 类似,请避免在部署内部将 DeploymentResponseGenerator 用作同步生成器,因为这会阻止同一副本上的其他请求并发执行。请注意,您不能将 DeploymentResponseGenerators 传递给其他句柄调用。

示例

# File name: stream.py
from typing import AsyncGenerator, Generator

from ray import serve
from ray.serve.handle import DeploymentHandle, DeploymentResponseGenerator


@serve.deployment
class Streamer:
    def __call__(self, limit: int) -> Generator[int, None, None]:
        for i in range(limit):
            yield i


@serve.deployment
class Caller:
    def __init__(self, streamer: DeploymentHandle):
        self._streamer = streamer.options(
            # Must set `stream=True` on the handle, then the output will be a
            # response generator.
            stream=True,
        )

    async def __call__(self, limit: int) -> AsyncGenerator[int, None]:
        # Response generator can be used in an `async for` block.
        r: DeploymentResponseGenerator = self._streamer.remote(limit)
        async for i in r:
            yield i


app = Caller.bind(Streamer.bind())

handle: DeploymentHandle = serve.run(app).options(
    stream=True,
)

# Response generator can also be used as a regular generator in a sync context.
r: DeploymentResponseGenerator = handle.remote(10)
assert list(r) == list(range(10))

高级:在嵌套对象中传递 DeploymentResponse [完全弃用]#

警告

在嵌套对象中将 DeploymentResponse 传递给下游句柄调用已完全弃用,不再受支持。请改用 DeploymentResponse._to_object_ref() 手动传递嵌套对象中的相应对象引用。

DeploymentResponse 对象作为顶级参数或关键字参数传递仍然受支持。

高级:将 DeploymentResponse 转换为 Ray ObjectRef#

在底层,每个 DeploymentResponse 都对应一个 Ray ObjectRef,对于流式调用,则对应一个 ObjectRefGenerator。要将 DeploymentHandle 调用与 Ray Actors 或 Tasks 组合,您可能需要将响应解析为其 ObjectRef。为此,您可以使用 DeploymentResponse._to_object_refDeploymentResponse._to_object_ref_sync 开发人员 API。

示例

# File name: response_to_object_ref.py
import ray
from ray import serve
from ray.serve.handle import DeploymentHandle, DeploymentResponse


@ray.remote
def say_hi_task(inp: str):
    return f"Ray task got message: '{inp}'"


@serve.deployment
class SayHi:
    def __call__(self) -> str:
        return "Hi from Serve deployment"


@serve.deployment
class Ingress:
    def __init__(self, say_hi: DeploymentHandle):
        self._say_hi = say_hi

    async def __call__(self):
        # Make a call to the SayHi deployment and pass the result ref to
        # a downstream Ray task.
        response: DeploymentResponse = self._say_hi.remote()
        response_obj_ref: ray.ObjectRef = await response._to_object_ref()
        final_obj_ref: ray.ObjectRef = say_hi_task.remote(response_obj_ref)
        return await final_obj_ref


app = Ingress.bind(SayHi.bind())
handle: DeploymentHandle = serve.run(app)
assert handle.remote().result() == "Ray task got message: 'Hi from Serve deployment'"