部署模型组合#

通过本指南,您可以

  • 将包含机器学习模型或业务逻辑的多个 部署 组合成一个 应用

  • 独立地扩缩和配置每个机器学习模型和业务逻辑步骤

注意

已弃用的 RayServeHandleRayServeSyncHandle API 已在 Ray 2.10 中完全移除。

使用 DeploymentHandles 组合部署#

构建应用时,您可以 .bind() 多个部署并将它们传递给彼此的构造函数。在运行时,在部署代码内部,Ray Serve 会将绑定的部署替换为 DeploymentHandle,您可以使用它来调用其他部署的方法。此功能允许您将应用的步骤(例如预处理、模型推理和后处理)划分为独立的部署,以便您可以独立地进行扩缩和配置。

使用 handle.remote 向部署发送请求。这些请求可以包含普通 Python 位置参数和关键字参数,DeploymentHandle 可以将它们直接传递给方法。方法调用返回一个 DeploymentResponse,表示对输出的 Future 对象。您可以 await 该响应来检索其结果,或者将其传递给下游的另一个 DeploymentHandle 调用。

基本 DeploymentHandle 示例#

此示例包含两个部署

 1# File name: hello.py
 2from ray import serve
 3from ray.serve.handle import DeploymentHandle
 4
 5
 6@serve.deployment
 7class LanguageClassifer:
 8    def __init__(
 9        self, spanish_responder: DeploymentHandle, french_responder: DeploymentHandle
10    ):
11        self.spanish_responder = spanish_responder
12        self.french_responder = french_responder
13
14    async def __call__(self, http_request):
15        request = await http_request.json()
16        language, name = request["language"], request["name"]
17
18        if language == "spanish":
19            response = self.spanish_responder.say_hello.remote(name)
20        elif language == "french":
21            response = self.french_responder.say_hello.remote(name)
22        else:
23            return "Please try again."
24
25        return await response
26
27
28@serve.deployment
29class SpanishResponder:
30    def say_hello(self, name: str):
31        return f"Hola {name}"
32
33
34@serve.deployment
35class FrenchResponder:
36    def say_hello(self, name: str):
37        return f"Bonjour {name}"
38
39
40spanish_responder = SpanishResponder.bind()
41french_responder = FrenchResponder.bind()
42language_classifier = LanguageClassifer.bind(spanish_responder, french_responder)

在第 42 行,LanguageClassifier 部署将 spanish_responderfrench_responder 作为构造函数参数接收。在运行时,Ray Serve 会将这些参数转换为 DeploymentHandle。然后,LanguageClassifier 可以使用此句柄调用 spanish_responderfrench_responder 的部署方法。

例如,LanguageClassifier__call__ 方法使用 HTTP 请求的值来决定是回应西班牙语还是法语。然后,它在第 19 行和第 21 行使用 DeploymentHandle 将请求中的 name 转发给 spanish_responderfrench_responder。调用格式如下

response: DeploymentResponse = self.spanish_responder.say_hello.remote(name)

此调用包含几个部分

  • self.spanish_responder 是通过构造函数传入的 SpanishResponder 句柄。

  • say_hello 是要调用的 SpanishResponder 方法。

  • remote 表示这是对另一个部署的 DeploymentHandle 调用。

  • namesay_hello 的参数。您可以在此处传递任意数量的位置参数或关键字参数。

此调用返回一个 DeploymentResponse 对象,它是对结果的引用,而不是结果本身。这种模式允许调用异步执行。要获取实际结果,请 await 响应。 await 会阻塞,直到异步调用执行完毕并返回结果。在此示例中,第 25 行调用 await response 并返回结果字符串。

警告

您可以使用 response.result() 方法获取远程 DeploymentHandle 调用的返回值。但是,应避免在部署内部调用 .result(),因为它会阻塞部署执行任何其他代码,直到远程方法调用完成。使用 await 允许部署在等待远程方法调用完成的同时处理其他请求。您应该在部署内部使用 await 而非 .result()

您可以复制上面的 hello.py 脚本并使用 serve run 运行它。确保从包含 hello.py 的目录运行命令,以便它可以找到脚本

$ serve run hello:language_classifier

您可以使用此客户端脚本与示例进行交互

# File name: hello_client.py
import requests

response = requests.post(
    "http://localhost:8000", json={"language": "spanish", "name": "Dora"}
)
greeting = response.text
print(greeting)

serve run 命令正在运行时,打开另一个终端窗口并运行脚本

$ python hello_client.py

Hola Dora

注意

组合允许您将应用分解并独立地扩缩每个部分。例如,假设 LanguageClassifier 应用的请求中有 75% 是西班牙语,25% 是法语。您可以将 SpanishResponder 扩缩到 3 个副本,将 FrenchResponder 扩缩到 1 个副本,以满足您的工作负载需求。这种灵活性也适用于预留 CPU 和 GPU 等资源,以及您可以为每个部署设置的任何其他配置。

通过组合,您可以避免在服务使用不同类型和数量资源的模型和业务逻辑步骤时出现应用级瓶颈。

链式调用 DeploymentHandle#

Ray Serve 可以直接将 DeploymentHandle 返回的 DeploymentResponse 对象传递给另一个 DeploymentHandle 调用,从而将流水线的多个阶段链接起来。您无需 await 第一个响应,Ray Serve 会在底层管理 await 行为。当第一个调用完成时,Ray Serve 会将第一个调用的输出(而不是 DeploymentResponse 对象)直接传递给第二个调用。

例如,下面的代码示例在一个应用中定义了三个部署

  • 一个 Adder 部署,它根据配置的增量增加一个值。

  • 一个 Multiplier 部署,它根据配置的乘数乘以一个值。

  • 一个 Ingress 部署,它将对 Adder 和 Multiplier 的调用链接起来并返回最终响应。

注意 Adder 句柄的响应如何直接传递给 Multiplier 句柄,但在 Multiplier 内部,输入参数会解析为 Adder 调用的输出。

# File name: chain.py
from ray import serve
from ray.serve.handle import DeploymentHandle, DeploymentResponse


@serve.deployment
class Adder:
    def __init__(self, increment: int):
        self._increment = increment

    def __call__(self, val: int) -> int:
        return val + self._increment


@serve.deployment
class Multiplier:
    def __init__(self, multiple: int):
        self._multiple = multiple

    def __call__(self, val: int) -> int:
        return val * self._multiple


@serve.deployment
class Ingress:
    def __init__(self, adder: DeploymentHandle, multiplier: DeploymentHandle):
        self._adder = adder
        self._multiplier = multiplier

    async def __call__(self, input: int) -> int:
        adder_response: DeploymentResponse = self._adder.remote(input)
        # Pass the adder response directly into the multiplier (no `await` needed).
        multiplier_response: DeploymentResponse = self._multiplier.remote(
            adder_response
        )
        # `await` the final chained response.
        return await multiplier_response


app = Ingress.bind(
    Adder.bind(increment=1),
    Multiplier.bind(multiple=2),
)

handle: DeploymentHandle = serve.run(app)
response = handle.remote(5)
assert response.result() == 12, "(5 + 1) * 2 = 12"

流式 DeploymentHandle 调用#

您还可以使用 DeploymentHandle 进行流式方法调用,该调用返回多个输出。要进行流式调用,方法必须是生成器,并且您必须设置 handle.options(stream=True)。然后,句柄调用将返回 DeploymentResponseGenerator,而不是一元的 DeploymentResponse。您可以将 DeploymentResponseGenerator 用作同步或异步生成器,就像在 async for 代码块中一样。与 DeploymentResponse.result() 类似,应避免在部署内部使用 DeploymentResponseGenerator 作为同步生成器,因为这会阻塞该副本上的其他请求并发执行。请注意,您不能将 DeploymentResponseGenerator 传递给其他句柄调用。

示例

# File name: stream.py
from typing import AsyncGenerator, Generator

from ray import serve
from ray.serve.handle import DeploymentHandle, DeploymentResponseGenerator


@serve.deployment
class Streamer:
    def __call__(self, limit: int) -> Generator[int, None, None]:
        for i in range(limit):
            yield i


@serve.deployment
class Caller:
    def __init__(self, streamer: DeploymentHandle):
        self._streamer = streamer.options(
            # Must set `stream=True` on the handle, then the output will be a
            # response generator.
            stream=True,
        )

    async def __call__(self, limit: int) -> AsyncGenerator[int, None]:
        # Response generator can be used in an `async for` block.
        r: DeploymentResponseGenerator = self._streamer.remote(limit)
        async for i in r:
            yield i


app = Caller.bind(Streamer.bind())

handle: DeploymentHandle = serve.run(app).options(
    stream=True,
)

# Response generator can also be used as a regular generator in a sync context.
r: DeploymentResponseGenerator = handle.remote(10)
assert list(r) == list(range(10))

高级:在嵌套对象中传递 DeploymentResponse [完全弃用]#

警告

在嵌套对象中将 DeploymentResponse 传递给下游句柄调用已完全弃用,不再支持。请手动使用 DeploymentResponse._to_object_ref() 来在嵌套对象中传递相应的对象引用。

仍支持将 DeploymentResponse 对象作为顶级位置参数或关键字参数传递。

高级:将 DeploymentResponse 转换为 Ray ObjectRef#

在底层,每个 DeploymentResponse 对应于一个 Ray ObjectRef,或者对于流式调用而言是 ObjectRefGenerator。为了将 DeploymentHandle 调用与 Ray Actors 或 Tasks 组合使用,您可能需要将响应解析为其 ObjectRef。为此,您可以使用开发者 API DeploymentResponse._to_object_refDeploymentResponse._to_object_ref_sync

示例

# File name: response_to_object_ref.py
import ray
from ray import serve
from ray.serve.handle import DeploymentHandle, DeploymentResponse


@ray.remote
def say_hi_task(inp: str):
    return f"Ray task got message: '{inp}'"


@serve.deployment
class SayHi:
    def __call__(self) -> str:
        return "Hi from Serve deployment"


@serve.deployment
class Ingress:
    def __init__(self, say_hi: DeploymentHandle):
        self._say_hi = say_hi

    async def __call__(self):
        # Make a call to the SayHi deployment and pass the result ref to
        # a downstream Ray task.
        response: DeploymentResponse = self._say_hi.remote()
        response_obj_ref: ray.ObjectRef = await response._to_object_ref()
        final_obj_ref: ray.ObjectRef = say_hi_task.remote(response_obj_ref)
        return await final_obj_ref


app = Ingress.bind(SayHi.bind())
handle: DeploymentHandle = serve.run(app)
assert handle.remote().result() == "Ray task got message: 'Hi from Serve deployment'"