Set Up FastAPI and HTTP
This section helps you understand how to:
send HTTP requests to Serve deployments
use Ray Serve to integrate with FastAPI
use customized HTTP adapters
choose which feature to use for your use case
set keep alive timeout
Choosing the right HTTP feature
Serve offers a layered approach to expose your model with the right HTTP API.
Based on your use case, you can choose the right level of abstraction.
If you are comfortable working with the raw request object, use the
starlette.requests.Request API. If you want a fully featured API server with validation and doc generation, use the FastAPI integration.
Calling deployments via HTTP
When you deploy a Serve application, the ingress deployment (the one passed to serve.run) is exposed over HTTP.
import starlette.requests
import requests
from ray import serve

@serve.deployment
class Counter:
    def __call__(self, request: starlette.requests.Request):
        return request.query_params

serve.run(Counter.bind())
resp = requests.get("http://localhost:8000?a=b&c=d")
assert resp.json() == {"a": "b", "c": "d"}
Requests to the Serve HTTP server at / are routed to the deployment's __call__ method with a Starlette Request object as the sole argument. The __call__ method can return any JSON-serializable object or a Starlette Response object (e.g., to return a custom status code or custom headers). A Serve app's route prefix can be changed from / to another string by setting route_prefix in serve.run() or the Serve config file.
Request cancellation
Serve cancels an in-flight request when its end-to-end timeout is reached before it completes, or when the HTTP client disconnects before receiving a response:
If the proxy hasn't yet sent the request to a replica, Serve simply drops the request.
If the request has been sent to a replica, Serve attempts to interrupt the replica and cancel the request. The
asyncio.Task running the handler on the replica is cancelled, raising an asyncio.CancelledError the next time it enters an await statement. See the asyncio docs for more info. Handle this exception in a try-except block to customize your deployment's behavior when a request is cancelled:
import asyncio
from ray import serve

@serve.deployment
async def startled():
    try:
        print("Replica received request!")
        await asyncio.sleep(10000)
    except asyncio.CancelledError:
        # Add custom behavior that should run
        # upon cancellation here.
        print("Request got cancelled!")
If no await statements remain in the deployment's code before the request completes, the replica processes the request as usual, sends the response back to the proxy, and the proxy discards it. Use await statements for blocking operations in a deployment, so Serve can cancel in-flight requests without waiting for the blocking operation to complete.
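The need for await points can be illustrated with plain asyncio, independent of Serve: a handler that blocks without awaiting runs to completion even after being cancelled, while one that awaits is interrupted at the await. The helper names below are illustrative.

```python
import asyncio
import time

async def handler_without_awaits() -> str:
    # Blocking work with no await points: cancellation can't interrupt it.
    time.sleep(0.2)
    return "finished despite cancellation"

async def handler_with_await() -> str:
    # Cancellation is delivered here, at the await point.
    await asyncio.sleep(0.2)
    return "finished"

async def cancel_after(handler, delay: float) -> str:
    task = asyncio.create_task(handler())
    await asyncio.sleep(delay)  # let the handler start running
    task.cancel()
    try:
        return await task
    except asyncio.CancelledError:
        return "cancelled"

print(asyncio.run(cancel_after(handler_without_awaits, 0.05)))  # prints "finished despite cancellation"
print(asyncio.run(cancel_after(handler_with_await, 0.05)))      # prints "cancelled"
```

The same dynamic plays out inside a replica: `time.sleep`-style blocking calls delay cancellation until the next await, which is why Serve recommends awaiting blocking operations.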
Cancellation cascades to any downstream deployment handle, task, or actor calls that were spawned in the deployment's request-handling method. These can handle the asyncio.CancelledError in the same way as the ingress deployment.
To prevent an async call from being interrupted by asyncio.CancelledError, use asyncio.shield():
import asyncio
from ray import serve

@serve.deployment
class SnoringSleeper:
    async def snore(self):
        await asyncio.sleep(1)
        print("ZZZ")

    async def __call__(self):
        try:
            print("SnoringSleeper received request!")
            # Prevent the snore() method from being cancelled
            await asyncio.shield(self.snore())
        except asyncio.CancelledError:
            print("SnoringSleeper's request was cancelled!")

app = SnoringSleeper.bind()
When the request is cancelled, a cancellation error is raised inside the SnoringSleeper deployment's __call__() method. However, the cancellation is not raised inside the snore() call, so ZZZ is printed even if the request is cancelled. Note that asyncio.shield cannot be used on a DeploymentHandle call to prevent the downstream handler from being cancelled. You need to explicitly handle the cancellation error in that handler instead.
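The shield() behavior can likewise be checked with plain asyncio, independent of Serve: cancelling the outer task raises CancelledError at its await, but the shielded inner coroutine still runs to completion. The names below are illustrative.

```python
import asyncio

events = []

async def inner():
    # The shielded coroutine keeps running even after the outer task
    # is cancelled.
    await asyncio.sleep(0.2)
    events.append("inner finished")

async def outer():
    try:
        await asyncio.shield(inner())
    except asyncio.CancelledError:
        events.append("outer cancelled")

async def main():
    task = asyncio.create_task(outer())
    await asyncio.sleep(0.05)
    task.cancel()
    await task  # outer caught the cancellation, so this returns normally
    # Give the shielded inner coroutine time to run to completion.
    await asyncio.sleep(0.3)

asyncio.run(main())
print(events)  # prints ['outer cancelled', 'inner finished']
```

This mirrors the SnoringSleeper example above: the outer handler observes the cancellation while the shielded work finishes in the background.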
FastAPI HTTP deployments
If you want to define more complex HTTP handling logic, Serve integrates with FastAPI. This allows you to define a Serve deployment using the @serve.ingress decorator that wraps a FastAPI app with its full range of features. The most basic example of this is shown below, but for more details on all that FastAPI has to offer, such as variable routes, automatic type validation, dependency injection (e.g., for database connections), and more, please check out their documentation.
Note
A Serve app's route_prefix is still respected by applications that integrate with FastAPI. Routes registered through the FastAPI app object are layered on top of the route prefix. For example, if your Serve app has route_prefix = /my_app and you decorate a method with @app.get("/fetch_data"), then you can call that method by sending a GET request to the path /my_app/fetch_data.
import ray
import requests
from fastapi import FastAPI
from ray import serve

app = FastAPI()

@serve.deployment
@serve.ingress(app)
class MyFastAPIDeployment:
    @app.get("/")
    def root(self):
        return "Hello, world!"

serve.run(MyFastAPIDeployment.bind(), route_prefix="/hello")
resp = requests.get("http://localhost:8000/hello")
assert resp.json() == "Hello, world!"
Now if you send a request to /hello, it's routed to the root method of the deployment. You can also easily leverage FastAPI to define multiple routes with different HTTP methods:
import ray
import requests
from fastapi import FastAPI
from ray import serve

app = FastAPI()

@serve.deployment
@serve.ingress(app)
class MyFastAPIDeployment:
    @app.get("/")
    def root(self):
        return "Hello, world!"

    @app.post("/{subpath}")
    def root(self, subpath: str):
        return f"Hello from {subpath}!"

serve.run(MyFastAPIDeployment.bind(), route_prefix="/hello")
resp = requests.post("http://localhost:8000/hello/Serve")
assert resp.json() == "Hello from Serve!"
You can also pass in an existing FastAPI app to a deployment to serve it as-is:
import ray
import requests
from fastapi import FastAPI
from ray import serve

app = FastAPI()

@app.get("/")
def f():
    return "Hello from the root!"

@serve.deployment
@serve.ingress(app)
class FastAPIWrapper:
    pass

serve.run(FastAPIWrapper.bind(), route_prefix="/")
resp = requests.get("http://localhost:8000/")
assert resp.json() == "Hello from the root!"
This is useful for scaling out an existing FastAPI app with no modifications necessary. Existing middlewares, automatic OpenAPI documentation generation, and other advanced FastAPI features should work as-is.
WebSockets
Serve supports WebSockets via FastAPI:
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from ray import serve

app = FastAPI()

@serve.deployment
@serve.ingress(app)
class EchoServer:
    @app.websocket("/")
    async def echo(self, ws: WebSocket):
        await ws.accept()
        try:
            while True:
                text = await ws.receive_text()
                await ws.send_text(text)
        except WebSocketDisconnect:
            print("Client disconnected.")

serve_app = serve.run(EchoServer.bind())
Decorate the function that handles WebSocket requests with @app.websocket. Read more about FastAPI WebSockets in the FastAPI documentation.
Query the deployment using the websockets package (pip install websockets):
from websockets.sync.client import connect

with connect("ws://localhost:8000") as websocket:
    websocket.send("Eureka!")
    assert websocket.recv() == "Eureka!"

    websocket.send("I've found it!")
    assert websocket.recv() == "I've found it!"
FastAPI factory pattern
Ray Serve's object pattern (shown above) requires the FastAPI object to be serializable with cloudpickle, which prevents the use of some standard libraries, such as FastAPIInstrumentor, because they depend on non-serializable components like thread locks. The factory pattern creates the FastAPI object directly on each replica, avoiding the need to serialize the FastAPI object.
import requests
from fastapi import FastAPI
from ray import serve
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor

@serve.deployment
class ChildDeployment:
    def __call__(self):
        return "Hello from the child deployment!"

def fastapi_factory():
    """Factory-style FastAPI app used as Serve ingress.

    We build the FastAPI app inside a factory and pass the callable to
    @serve.ingress.
    """
    app = FastAPI()

    # In an object-based ingress (where the FastAPI app is stored on the
    # deployment instance), Ray would need to serialize the app and its
    # instrumentation. Some instrumentors (like FastAPIInstrumentor) are not
    # picklable, which can cause serialization failures. Creating and
    # instrumenting the app here sidesteps that issue.
    FastAPIInstrumentor.instrument_app(app)

    @app.get("/")
    async def root():
        # Handlers defined inside this factory don't have access to the
        # ParentDeployment instance (i.e., there's no `self` here), so we
        # can't call `self.child`. Instead, fetch a handle by deployment name.
        handle = serve.get_deployment_handle("ChildDeployment", app_name="default")
        return {"message": await handle.remote()}

    return app

@serve.deployment
@serve.ingress(fastapi_factory)
class ParentDeployment:
    def __init__(self, child):
        self.child = child

serve.run(ParentDeployment.bind(ChildDeployment.bind()))

resp = requests.get("http://localhost:8000/")
assert resp.json() == {"message": "Hello from the child deployment!"}
Streaming responses
Some applications must stream incremental results back to the caller. This is common for text generation using large language models (LLMs) or video processing applications. The full forward pass may take multiple seconds, so providing incremental results as they become available offers a much better user experience.
To use HTTP response streaming, return a StreamingResponse that wraps a generator from your HTTP handler. This is supported for basic HTTP ingress deployments using a __call__ method as well as when using the FastAPI integration.
The code below defines a Serve application that incrementally streams numbers up to a provided max. The client-side code is also updated to handle the streaming outputs. This code uses the stream=True option to the requests library.
import time
from typing import Generator

import requests
from starlette.responses import StreamingResponse
from starlette.requests import Request

from ray import serve

@serve.deployment
class StreamingResponder:
    def generate_numbers(self, max: int) -> Generator[str, None, None]:
        for i in range(max):
            yield str(i)
            time.sleep(0.1)

    def __call__(self, request: Request) -> StreamingResponse:
        max = request.query_params.get("max", "25")
        gen = self.generate_numbers(int(max))
        return StreamingResponse(gen, status_code=200, media_type="text/plain")

serve.run(StreamingResponder.bind())

r = requests.get("http://localhost:8000?max=10", stream=True)
start = time.time()
r.raise_for_status()
for chunk in r.iter_content(chunk_size=None, decode_unicode=True):
    print(f"Got result {round(time.time()-start, 1)}s after start: '{chunk}'")
Save this code in stream.py and run it:
$ python stream.py
[2023-05-25 10:44:23] INFO ray._private.worker::Started a local Ray instance. View the dashboard at http://127.0.0.1:8265
(ServeController pid=40401) INFO 2023-05-25 10:44:25,296 controller 40401 deployment_state.py:1259 - Deploying new version of deployment default_StreamingResponder.
(ProxyActor pid=40403) INFO: Started server process [40403]
(ServeController pid=40401) INFO 2023-05-25 10:44:25,333 controller 40401 deployment_state.py:1498 - Adding 1 replica to deployment default_StreamingResponder.
Got result 0.0s after start: '0'
Got result 0.1s after start: '1'
Got result 0.2s after start: '2'
Got result 0.3s after start: '3'
Got result 0.4s after start: '4'
Got result 0.5s after start: '5'
Got result 0.6s after start: '6'
Got result 0.7s after start: '7'
Got result 0.8s after start: '8'
Got result 0.9s after start: '9'
(ServeReplica:default_StreamingResponder pid=41052) INFO 2023-05-25 10:49:52,230 default_StreamingResponder default_StreamingResponder#qlZFCa yomKnJifNJ / default replica.py:634 - __CALL__ OK 1017.6ms
Terminate the stream when a client disconnects
In some cases, you may want to cease processing a request when the client disconnects before the full stream has been returned. If you pass an async generator to the StreamingResponse, it is cancelled and raises an asyncio.CancelledError when the client disconnects. Note that you must await at some point in the generator for the cancellation to occur.
In the example below, the generator streams responses forever until the client disconnects, then it prints that it was cancelled and exits. Save this code in stream.py and run it:
import asyncio
import time
from typing import AsyncGenerator

import requests
from starlette.responses import StreamingResponse
from starlette.requests import Request

from ray import serve

@serve.deployment
class StreamingResponder:
    async def generate_forever(self) -> AsyncGenerator[str, None]:
        try:
            i = 0
            while True:
                yield str(i)
                i += 1
                await asyncio.sleep(0.1)
        except asyncio.CancelledError:
            print("Cancelled! Exiting.")

    def __call__(self, request: Request) -> StreamingResponse:
        gen = self.generate_forever()
        return StreamingResponse(gen, status_code=200, media_type="text/plain")

serve.run(StreamingResponder.bind())

r = requests.get("http://localhost:8000", stream=True)
start = time.time()
r.raise_for_status()
for i, chunk in enumerate(r.iter_content(chunk_size=None, decode_unicode=True)):
    print(f"Got result {round(time.time()-start, 1)}s after start: '{chunk}'")
    if i == 10:
        print("Client disconnecting")
        break
$ python stream.py
[2023-07-10 16:08:41] INFO ray._private.worker::Started a local Ray instance. View the dashboard at http://127.0.0.1:8265
(ServeController pid=50801) INFO 2023-07-10 16:08:42,296 controller 40401 deployment_state.py:1259 - Deploying new version of deployment default_StreamingResponder.
(ProxyActor pid=50803) INFO: Started server process [50803]
(ServeController pid=50805) INFO 2023-07-10 16:08:42,963 controller 50805 deployment_state.py:1586 - Adding 1 replica to deployment default_StreamingResponder.
Got result 0.0s after start: '0'
Got result 0.1s after start: '1'
Got result 0.2s after start: '2'
Got result 0.3s after start: '3'
Got result 0.4s after start: '4'
Got result 0.5s after start: '5'
Got result 0.6s after start: '6'
Got result 0.7s after start: '7'
Got result 0.8s after start: '8'
Got result 0.9s after start: '9'
Got result 1.0s after start: '10'
Client disconnecting
(ServeReplica:default_StreamingResponder pid=50842) Cancelled! Exiting.
(ServeReplica:default_StreamingResponder pid=50842) INFO 2023-07-10 16:08:45,756 default_StreamingResponder default_StreamingResponder#cmpnmF ahteNDQSWx / default replica.py:691 - __CALL__ OK 1019.1ms
Set keep alive timeout
Serve uses a Uvicorn HTTP server internally to serve HTTP requests. By default, Uvicorn keeps HTTP connections alive for 5 seconds between requests. Modify the keep-alive timeout by setting keep_alive_timeout_s in the http_options field of the Serve config file. This config is global to your Ray cluster, and you can't update it during runtime. You can also set the RAY_SERVE_HTTP_KEEP_ALIVE_TIMEOUT_S environment variable to set the keep-alive timeout. RAY_SERVE_HTTP_KEEP_ALIVE_TIMEOUT_S takes precedence over keep_alive_timeout_s if both are set. See Uvicorn's keep-alive timeout guide for more information.
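For example, the relevant entry in a Serve config file might look like the following sketch (the 30-second value is illustrative):

```yaml
http_options:
  keep_alive_timeout_s: 30
```

Alternatively, export RAY_SERVE_HTTP_KEEP_ALIVE_TIMEOUT_S before starting the cluster; as noted above, the environment variable takes precedence if both are set.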