设置 FastAPI 和 HTTP#
本节将帮助您了解如何
向 Serve 部署发送 HTTP 请求
使用 Ray Serve 集成 FastAPI
使用定制的 HTTP 适配器
为您的用例选择使用哪个特性
设置 keep alive 超时
选择正确的 HTTP 特性#
Serve 提供了一种分层方法,通过正确的 HTTP API 公开您的模型。
根据您的用例,您可以选择正确的抽象级别
如果您习惯于处理原始请求对象,请使用
starlette.request.Requests
API。如果您想要一个功能齐全、带有验证和文档生成的 API 服务器,请使用FastAPI 集成。
通过 HTTP 调用部署#
部署 Serve 应用时,入口部署(ingress deployment)(即传递给 serve.run
的部署)将通过 HTTP 公开。
import starlette.requests
import requests
from ray import serve
@serve.deployment
class Counter:
def __call__(self, request: starlette.requests.Request):
return request.query_params
serve.run(Counter.bind())
resp = requests.get("http://localhost:8000?a=b&c=d")
assert resp.json() == {"a": "b", "c": "d"}
发送到 Serve HTTP 服务器 /
的请求将路由到部署的 __call__
方法,并将 Starlette Request 对象作为唯一参数。 __call__
方法可以返回任何可 JSON 序列化的对象或 Starlette Response 对象(例如,返回自定义状态码或自定义头部)。 通过在 serve.run()
或 Serve 配置文件中设置 route_prefix
,可以将 Serve 应用的路由前缀从 /
更改为另一个字符串。
请求取消#
当处理请求的时间超过端到端超时或 HTTP 客户端在接收响应前断开连接时,Serve 会取消正在处理的请求。
如果代理尚未将请求发送到副本,Serve 会直接丢弃该请求。
如果请求已发送到副本,Serve 会尝试中断副本并取消请求。 正在副本上运行处理程序的
asyncio.Task
会被取消,并在下一次进入await
语句时引发asyncio.CancelledError
。 有关更多信息,请参阅 asyncio 文档。 在 try-except 块中处理此异常,以自定义部署在请求被取消时的行为。
import asyncio
from ray import serve
@serve.deployment
async def startled():
try:
print("Replica received request!")
await asyncio.sleep(10000)
except asyncio.CancelledError:
# Add custom behavior that should run
# upon cancellation here.
print("Request got cancelled!")
如果在请求完成之前,部署代码中没有剩余的 await
语句,则副本会照常处理请求,将响应发送回代理,然后代理丢弃响应。 在部署中使用 await
语句处理阻塞操作,以便 Serve 可以在不等待阻塞操作完成的情况下取消正在处理的请求。
取消会级联到在部署请求处理方法中生成的任何下游部署句柄、任务或 Actor 调用。 它们可以像入口部署一样处理 asyncio.CancelledError
。
为了防止异步调用被 asyncio.CancelledError
中断,请使用 asyncio.shield()
import asyncio
from ray import serve
@serve.deployment
class SnoringSleeper:
async def snore(self):
await asyncio.sleep(1)
print("ZZZ")
async def __call__(self):
try:
print("SnoringSleeper received request!")
# Prevent the snore() method from being cancelled
await asyncio.shield(self.snore())
except asyncio.CancelledError:
print("SnoringSleeper's request was cancelled!")
app = SnoringSleeper.bind()
当请求被取消时,SnoringSleeper
部署的 __call__()
方法内部会引发取消错误。 但是,取消不会在 snore()
调用内部引发,因此即使请求被取消,也会打印 ZZZ
。 请注意,不能在 DeploymentHandle
调用上使用 asyncio.shield
来阻止下游处理程序被取消。 您还需要在该处理程序中显式处理取消错误。
FastAPI HTTP 部署#
如果您想定义更复杂的 HTTP 处理逻辑,Serve 可以与 FastAPI 集成。 这允许您使用 @serve.ingress
装饰器来定义一个 Serve 部署,该装饰器封装了一个 FastAPI 应用并提供其全部功能。 下面显示了最基本的示例,但有关 FastAPI 提供的所有功能的更多详细信息,例如可变路由、自动类型验证、依赖注入(例如,用于数据库连接)等,请参阅其文档。
注意
集成 FastAPI 的 Serve 应用仍然遵守通过 Serve 设置的 route_prefix
。 通过 FastAPI app
对象注册的路由会叠加在路由前缀之上。 例如,如果您的 Serve 应用的 route_prefix = /my_app
,并且您使用 @app.get("/fetch_data")
装饰了一个方法,那么您可以通过向路径 /my_app/fetch_data
发送 GET 请求来调用该方法。
import ray
import requests
from fastapi import FastAPI
from ray import serve
app = FastAPI()
@serve.deployment
@serve.ingress(app)
class MyFastAPIDeployment:
@app.get("/")
def root(self):
return "Hello, world!"
serve.run(MyFastAPIDeployment.bind(), route_prefix="/hello")
resp = requests.get("http://localhost:8000/hello")
assert resp.json() == "Hello, world!"
现在,如果您向 /hello
发送请求,它将路由到我们部署的 root
方法。 我们还可以轻松利用 FastAPI 定义具有不同 HTTP 方法的多个路由。
import ray
import requests
from fastapi import FastAPI
from ray import serve
app = FastAPI()
@serve.deployment
@serve.ingress(app)
class MyFastAPIDeployment:
@app.get("/")
def root(self):
return "Hello, world!"
@app.post("/{subpath}")
def root(self, subpath: str):
return f"Hello from {subpath}!"
serve.run(MyFastAPIDeployment.bind(), route_prefix="/hello")
resp = requests.post("http://localhost:8000/hello/Serve")
assert resp.json() == "Hello from Serve!"
您还可以将现有的 FastAPI 应用传递给部署,以便按原样进行服务。
import ray
import requests
from fastapi import FastAPI
from ray import serve
app = FastAPI()
@app.get("/")
def f():
return "Hello from the root!"
@serve.deployment
@serve.ingress(app)
class FastAPIWrapper:
pass
serve.run(FastAPIWrapper.bind(), route_prefix="/")
resp = requests.get("http://localhost:8000/")
assert resp.json() == "Hello from the root!"
这对于无需修改即可扩展现有 FastAPI 应用非常有用。 现有的中间件、自动 OpenAPI 文档生成以及其他高级 FastAPI 特性应该可以按原样工作。
WebSockets#
Serve 通过 FastAPI 支持 WebSockets。
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from ray import serve
app = FastAPI()
@serve.deployment
@serve.ingress(app)
class EchoServer:
@app.websocket("/")
async def echo(self, ws: WebSocket):
await ws.accept()
try:
while True:
text = await ws.receive_text()
await ws.send_text(text)
except WebSocketDisconnect:
print("Client disconnected.")
serve_app = serve.run(EchoServer.bind())
使用 @app.websocket
装饰处理 WebSocket 请求的函数。 在 FastAPI 文档中阅读更多关于 FastAPI WebSockets 的信息。
使用 websockets
包 (pip install websockets
) 查询部署。
from websockets.sync.client import connect
with connect("ws://localhost:8000") as websocket:
websocket.send("Eureka!")
assert websocket.recv() == "Eureka!"
websocket.send("I've found it!")
assert websocket.recv() == "I've found it!"
流式响应#
某些应用必须将增量结果流式传输回调用方。 这对于使用大型语言模型 (LLM) 生成文本或视频处理应用来说很常见。 完整的正向传播可能需要几秒钟,因此在结果可用时提供增量结果可以显著改善用户体验。
要使用 HTTP 响应流,请返回一个包装了 HTTP 处理程序生成器的 StreamingResponse。 基本的 HTTP 入口部署使用 __call__
方法时支持此功能,使用 FastAPI 集成时也支持。
以下代码定义了一个 Serve 应用,它以增量方式流式传输直到指定 max
的数字。 客户端代码也已更新以处理流式输出。 此代码使用了 requests 库的 stream=True
选项。
import time
from typing import Generator
import requests
from starlette.responses import StreamingResponse
from starlette.requests import Request
from ray import serve
@serve.deployment
class StreamingResponder:
def generate_numbers(self, max: int) -> Generator[str, None, None]:
for i in range(max):
yield str(i)
time.sleep(0.1)
def __call__(self, request: Request) -> StreamingResponse:
max = request.query_params.get("max", "25")
gen = self.generate_numbers(int(max))
return StreamingResponse(gen, status_code=200, media_type="text/plain")
serve.run(StreamingResponder.bind())
r = requests.get("http://localhost:8000?max=10", stream=True)
start = time.time()
r.raise_for_status()
for chunk in r.iter_content(chunk_size=None, decode_unicode=True):
print(f"Got result {round(time.time()-start, 1)}s after start: '{chunk}'")
将此代码保存到 stream.py
中并运行它。
$ python stream.py
[2023-05-25 10:44:23] INFO ray._private.worker::Started a local Ray instance. View the dashboard at http://127.0.0.1:8265
(ServeController pid=40401) INFO 2023-05-25 10:44:25,296 controller 40401 deployment_state.py:1259 - Deploying new version of deployment default_StreamingResponder.
(ProxyActor pid=40403) INFO: Started server process [40403]
(ServeController pid=40401) INFO 2023-05-25 10:44:25,333 controller 40401 deployment_state.py:1498 - Adding 1 replica to deployment default_StreamingResponder.
Got result 0.0s after start: '0'
Got result 0.1s after start: '1'
Got result 0.2s after start: '2'
Got result 0.3s after start: '3'
Got result 0.4s after start: '4'
Got result 0.5s after start: '5'
Got result 0.6s after start: '6'
Got result 0.7s after start: '7'
Got result 0.8s after start: '8'
Got result 0.9s after start: '9'
(ServeReplica:default_StreamingResponder pid=41052) INFO 2023-05-25 10:49:52,230 default_StreamingResponder default_StreamingResponder#qlZFCa yomKnJifNJ / default replica.py:634 - __CALL__ OK 1017.6ms
客户端断开连接时终止流#
在某些情况下,您可能希望在客户端断开连接且完整流尚未返回时停止处理请求。 如果您将异步生成器传递给 StreamingResponse
,当客户端断开连接时,它将被取消并引发 asyncio.CancelledError
。 请注意,生成器中必须在某个点有 await
语句,取消才会发生。
在下面的示例中,生成器将永远流式传输响应,直到客户端断开连接,然后打印已取消并退出。 将此代码保存到 stream.py
中并运行它。
import asyncio
import time
from typing import AsyncGenerator
import requests
from starlette.responses import StreamingResponse
from starlette.requests import Request
from ray import serve
@serve.deployment
class StreamingResponder:
async def generate_forever(self) -> AsyncGenerator[str, None]:
try:
i = 0
while True:
yield str(i)
i += 1
await asyncio.sleep(0.1)
except asyncio.CancelledError:
print("Cancelled! Exiting.")
def __call__(self, request: Request) -> StreamingResponse:
gen = self.generate_forever()
return StreamingResponse(gen, status_code=200, media_type="text/plain")
serve.run(StreamingResponder.bind())
r = requests.get("http://localhost:8000?max=10", stream=True)
start = time.time()
r.raise_for_status()
for i, chunk in enumerate(r.iter_content(chunk_size=None, decode_unicode=True)):
print(f"Got result {round(time.time()-start, 1)}s after start: '{chunk}'")
if i == 10:
print("Client disconnecting")
break
$ python stream.py
[2023-07-10 16:08:41] INFO ray._private.worker::Started a local Ray instance. View the dashboard at http://127.0.0.1:8265
(ServeController pid=50801) INFO 2023-07-10 16:08:42,296 controller 40401 deployment_state.py:1259 - Deploying new version of deployment default_StreamingResponder.
(ProxyActor pid=50803) INFO: Started server process [50803]
(ServeController pid=50805) INFO 2023-07-10 16:08:42,963 controller 50805 deployment_state.py:1586 - Adding 1 replica to deployment default_StreamingResponder.
Got result 0.0s after start: '0'
Got result 0.1s after start: '1'
Got result 0.2s after start: '2'
Got result 0.3s after start: '3'
Got result 0.4s after start: '4'
Got result 0.5s after start: '5'
Got result 0.6s after start: '6'
Got result 0.7s after start: '7'
Got result 0.8s after start: '8'
Got result 0.9s after start: '9'
Got result 1.0s after start: '10'
Client disconnecting
(ServeReplica:default_StreamingResponder pid=50842) Cancelled! Exiting.
(ServeReplica:default_StreamingResponder pid=50842) INFO 2023-07-10 16:08:45,756 default_StreamingResponder default_StreamingResponder#cmpnmF ahteNDQSWx / default replica.py:691 - __CALL__ OK 1019.1ms
设置 keep alive 超时#
Serve 内部使用 Uvicorn HTTP 服务器来服务 HTTP 请求。 默认情况下,Uvicorn 在请求之间保持 HTTP 连接保持活动状态 5 秒。 通过在 Serve 配置文件的 http_options
字段中设置 keep_alive_timeout_s
来修改 keep-alive 超时。 此配置对您的 Ray 集群全局有效,运行时无法更新。 您还可以设置 RAY_SERVE_HTTP_KEEP_ALIVE_TIMEOUT_S
环境变量来设置 keep alive 超时。 如果同时设置了 RAY_SERVE_HTTP_KEEP_ALIVE_TIMEOUT_S
和 keep_alive_timeout_s
配置,则 RAY_SERVE_HTTP_KEEP_ALIVE_TIMEOUT_S
优先。 有关更多信息,请参阅 Uvicorn 的 keep alive 超时指南。