部署模型组合#
通过本指南,您可以
注意
已弃用的 RayServeHandle
和 RayServeSyncHandle
API 已在 Ray 2.10 中完全移除。
使用 DeploymentHandles 组合部署#
构建应用时,您可以 .bind()
多个部署并将它们传递给彼此的构造函数。在运行时,在部署代码内部,Ray Serve 会将绑定的部署替换为 DeploymentHandle,您可以使用它来调用其他部署的方法。此功能允许您将应用的步骤(例如预处理、模型推理和后处理)划分为独立的部署,以便您可以独立地进行扩缩和配置。
使用 handle.remote
向部署发送请求。这些请求可以包含普通 Python 位置参数和关键字参数,DeploymentHandle
可以将它们直接传递给方法。方法调用返回一个 DeploymentResponse
,表示对输出的 Future 对象。您可以 await
该响应来检索其结果,或者将其传递给下游的另一个 DeploymentHandle
调用。
基本 DeploymentHandle 示例#
此示例包含两个部署
1# File name: hello.py
2from ray import serve
3from ray.serve.handle import DeploymentHandle
4
5
6@serve.deployment
7class LanguageClassifer:
8 def __init__(
9 self, spanish_responder: DeploymentHandle, french_responder: DeploymentHandle
10 ):
11 self.spanish_responder = spanish_responder
12 self.french_responder = french_responder
13
14 async def __call__(self, http_request):
15 request = await http_request.json()
16 language, name = request["language"], request["name"]
17
18 if language == "spanish":
19 response = self.spanish_responder.say_hello.remote(name)
20 elif language == "french":
21 response = self.french_responder.say_hello.remote(name)
22 else:
23 return "Please try again."
24
25 return await response
26
27
28@serve.deployment
29class SpanishResponder:
30 def say_hello(self, name: str):
31 return f"Hola {name}"
32
33
34@serve.deployment
35class FrenchResponder:
36 def say_hello(self, name: str):
37 return f"Bonjour {name}"
38
39
40spanish_responder = SpanishResponder.bind()
41french_responder = FrenchResponder.bind()
42language_classifier = LanguageClassifer.bind(spanish_responder, french_responder)
在第 42 行,LanguageClassifier
部署将 spanish_responder
和 french_responder
作为构造函数参数接收。在运行时,Ray Serve 会将这些参数转换为 DeploymentHandle
。然后,LanguageClassifier
可以使用此句柄调用 spanish_responder
和 french_responder
的部署方法。
例如,LanguageClassifier
的 __call__
方法使用 HTTP 请求的值来决定是回应西班牙语还是法语。然后,它在第 19 行和第 21 行使用 DeploymentHandle
将请求中的 name 转发给 spanish_responder
或 french_responder
。调用格式如下
response: DeploymentResponse = self.spanish_responder.say_hello.remote(name)
此调用包含几个部分
self.spanish_responder
是通过构造函数传入的SpanishResponder
句柄。say_hello
是要调用的SpanishResponder
方法。remote
表示这是对另一个部署的DeploymentHandle
调用。name
是say_hello
的参数。您可以在此处传递任意数量的位置参数或关键字参数。
此调用返回一个 DeploymentResponse
对象,它是对结果的引用,而不是结果本身。这种模式允许调用异步执行。要获取实际结果,请 await
响应。 await
会阻塞,直到异步调用执行完毕并返回结果。在此示例中,第 25 行调用 await response
并返回结果字符串。
警告
您可以使用 response.result()
方法获取远程 DeploymentHandle
调用的返回值。但是,应避免在部署内部调用 .result()
,因为它会阻塞部署执行任何其他代码,直到远程方法调用完成。使用 await
允许部署在等待远程方法调用完成的同时处理其他请求。您应该在部署内部使用 await
而非 .result()
。
您可以复制上面的 hello.py
脚本并使用 serve run
运行它。确保从包含 hello.py
的目录运行命令,以便它可以找到脚本
$ serve run hello:language_classifier
您可以使用此客户端脚本与示例进行交互
# File name: hello_client.py
import requests
response = requests.post(
"http://localhost:8000", json={"language": "spanish", "name": "Dora"}
)
greeting = response.text
print(greeting)
当 serve run
命令正在运行时,打开另一个终端窗口并运行脚本
$ python hello_client.py
Hola Dora
注意
组合允许您将应用分解并独立地扩缩每个部分。例如,假设 LanguageClassifier
应用的请求中有 75% 是西班牙语,25% 是法语。您可以将 SpanishResponder
扩缩到 3 个副本,将 FrenchResponder
扩缩到 1 个副本,以满足您的工作负载需求。这种灵活性也适用于预留 CPU 和 GPU 等资源,以及您可以为每个部署设置的任何其他配置。
通过组合,您可以避免在服务使用不同类型和数量资源的模型和业务逻辑步骤时出现应用级瓶颈。
链式调用 DeploymentHandle#
Ray Serve 可以直接将 DeploymentHandle
返回的 DeploymentResponse
对象传递给另一个 DeploymentHandle
调用,从而将流水线的多个阶段链接起来。您无需 await
第一个响应,Ray Serve 会在底层管理 await
行为。当第一个调用完成时,Ray Serve 会将第一个调用的输出(而不是 DeploymentResponse
对象)直接传递给第二个调用。
例如,下面的代码示例在一个应用中定义了三个部署
一个
Adder
部署,它根据配置的增量增加一个值。一个
Multiplier
部署,它根据配置的乘数乘以一个值。一个
Ingress
部署,它将对 Adder 和 Multiplier 的调用链接起来并返回最终响应。
注意 Adder
句柄的响应如何直接传递给 Multiplier
句柄,但在 Multiplier 内部,输入参数会解析为 Adder
调用的输出。
# File name: chain.py
from ray import serve
from ray.serve.handle import DeploymentHandle, DeploymentResponse
@serve.deployment
class Adder:
def __init__(self, increment: int):
self._increment = increment
def __call__(self, val: int) -> int:
return val + self._increment
@serve.deployment
class Multiplier:
def __init__(self, multiple: int):
self._multiple = multiple
def __call__(self, val: int) -> int:
return val * self._multiple
@serve.deployment
class Ingress:
def __init__(self, adder: DeploymentHandle, multiplier: DeploymentHandle):
self._adder = adder
self._multiplier = multiplier
async def __call__(self, input: int) -> int:
adder_response: DeploymentResponse = self._adder.remote(input)
# Pass the adder response directly into the multiplier (no `await` needed).
multiplier_response: DeploymentResponse = self._multiplier.remote(
adder_response
)
# `await` the final chained response.
return await multiplier_response
app = Ingress.bind(
Adder.bind(increment=1),
Multiplier.bind(multiple=2),
)
handle: DeploymentHandle = serve.run(app)
response = handle.remote(5)
assert response.result() == 12, "(5 + 1) * 2 = 12"
流式 DeploymentHandle 调用#
您还可以使用 DeploymentHandle
进行流式方法调用,该调用返回多个输出。要进行流式调用,方法必须是生成器,并且您必须设置 handle.options(stream=True)
。然后,句柄调用将返回 DeploymentResponseGenerator
,而不是一元的 DeploymentResponse
。您可以将 DeploymentResponseGenerator
用作同步或异步生成器,就像在 async for
代码块中一样。与 DeploymentResponse.result()
类似,应避免在部署内部使用 DeploymentResponseGenerator
作为同步生成器,因为这会阻塞该副本上的其他请求并发执行。请注意,您不能将 DeploymentResponseGenerator
传递给其他句柄调用。
示例
# File name: stream.py
from typing import AsyncGenerator, Generator
from ray import serve
from ray.serve.handle import DeploymentHandle, DeploymentResponseGenerator
@serve.deployment
class Streamer:
def __call__(self, limit: int) -> Generator[int, None, None]:
for i in range(limit):
yield i
@serve.deployment
class Caller:
def __init__(self, streamer: DeploymentHandle):
self._streamer = streamer.options(
# Must set `stream=True` on the handle, then the output will be a
# response generator.
stream=True,
)
async def __call__(self, limit: int) -> AsyncGenerator[int, None]:
# Response generator can be used in an `async for` block.
r: DeploymentResponseGenerator = self._streamer.remote(limit)
async for i in r:
yield i
app = Caller.bind(Streamer.bind())
handle: DeploymentHandle = serve.run(app).options(
stream=True,
)
# Response generator can also be used as a regular generator in a sync context.
r: DeploymentResponseGenerator = handle.remote(10)
assert list(r) == list(range(10))
高级:在嵌套对象中传递 DeploymentResponse [完全弃用]#
警告
在嵌套对象中将 DeploymentResponse
传递给下游句柄调用已完全弃用,不再支持。请手动使用 DeploymentResponse._to_object_ref()
来在嵌套对象中传递相应的对象引用。
仍支持将 DeploymentResponse
对象作为顶级位置参数或关键字参数传递。
高级:将 DeploymentResponse 转换为 Ray ObjectRef#
在底层,每个 DeploymentResponse
对应于一个 Ray ObjectRef
,或者对于流式调用而言是 ObjectRefGenerator
。为了将 DeploymentHandle
调用与 Ray Actors 或 Tasks 组合使用,您可能需要将响应解析为其 ObjectRef
。为此,您可以使用开发者 API DeploymentResponse._to_object_ref
和 DeploymentResponse._to_object_ref_sync
。
示例
# File name: response_to_object_ref.py
import ray
from ray import serve
from ray.serve.handle import DeploymentHandle, DeploymentResponse
@ray.remote
def say_hi_task(inp: str):
return f"Ray task got message: '{inp}'"
@serve.deployment
class SayHi:
def __call__(self) -> str:
return "Hi from Serve deployment"
@serve.deployment
class Ingress:
def __init__(self, say_hi: DeploymentHandle):
self._say_hi = say_hi
async def __call__(self):
# Make a call to the SayHi deployment and pass the result ref to
# a downstream Ray task.
response: DeploymentResponse = self._say_hi.remote()
response_obj_ref: ray.ObjectRef = await response._to_object_ref()
final_obj_ref: ray.ObjectRef = say_hi_task.remote(response_obj_ref)
return await final_obj_ref
app = Ingress.bind(SayHi.bind())
handle: DeploymentHandle = serve.run(app)
assert handle.remote().result() == "Ray task got message: 'Hi from Serve deployment'"