设置 FastAPI 和 HTTP#

本节将帮助您了解如何

  • 向 Serve 部署发送 HTTP 请求

  • 使用 Ray Serve 集成 FastAPI

  • 使用定制的 HTTP 适配器

  • 为您的用例选择使用哪个特性

  • 设置 keep alive 超时

选择正确的 HTTP 特性#

Serve 提供了一种分层方法,通过正确的 HTTP API 公开您的模型。

根据您的用例,您可以选择正确的抽象级别

通过 HTTP 调用部署#

部署 Serve 应用时,入口部署(ingress deployment)(即传递给 serve.run 的部署)将通过 HTTP 公开。

import starlette.requests
import requests
from ray import serve


@serve.deployment
class Counter:
    def __call__(self, request: starlette.requests.Request):
        return request.query_params


serve.run(Counter.bind())
resp = requests.get("http://localhost:8000?a=b&c=d")
assert resp.json() == {"a": "b", "c": "d"}

发送到 Serve HTTP 服务器 / 的请求将路由到部署的 __call__ 方法,并将 Starlette Request 对象作为唯一参数。 __call__ 方法可以返回任何可 JSON 序列化的对象或 Starlette Response 对象(例如,返回自定义状态码或自定义头部)。 通过在 serve.run() 或 Serve 配置文件中设置 route_prefix,可以将 Serve 应用的路由前缀从 / 更改为另一个字符串。

请求取消#

当处理请求的时间超过端到端超时或 HTTP 客户端在接收响应前断开连接时,Serve 会取消正在处理的请求。

  • 如果代理尚未将请求发送到副本,Serve 会直接丢弃该请求。

  • 如果请求已发送到副本,Serve 会尝试中断副本并取消请求。 正在副本上运行处理程序的 asyncio.Task 会被取消,并在下一次进入 await 语句时引发 asyncio.CancelledError。 有关更多信息,请参阅 asyncio 文档。 在 try-except 块中处理此异常,以自定义部署在请求被取消时的行为。

import asyncio
from ray import serve


@serve.deployment
async def startled():
    try:
        print("Replica received request!")
        await asyncio.sleep(10000)
    except asyncio.CancelledError:
        # Add custom behavior that should run
        # upon cancellation here.
        print("Request got cancelled!")

如果在请求完成之前,部署代码中没有剩余的 await 语句,则副本会照常处理请求,将响应发送回代理,然后代理丢弃响应。 在部署中使用 await 语句处理阻塞操作,以便 Serve 可以在不等待阻塞操作完成的情况下取消正在处理的请求。

取消会级联到在部署请求处理方法中生成的任何下游部署句柄、任务或 Actor 调用。 它们可以像入口部署一样处理 asyncio.CancelledError

为了防止异步调用被 asyncio.CancelledError 中断,请使用 asyncio.shield()

import asyncio
from ray import serve


@serve.deployment
class SnoringSleeper:
    async def snore(self):
        await asyncio.sleep(1)
        print("ZZZ")

    async def __call__(self):
        try:
            print("SnoringSleeper received request!")

            # Prevent the snore() method from being cancelled
            await asyncio.shield(self.snore())

        except asyncio.CancelledError:
            print("SnoringSleeper's request was cancelled!")


app = SnoringSleeper.bind()

当请求被取消时,SnoringSleeper 部署的 __call__() 方法内部会引发取消错误。 但是,取消不会在 snore() 调用内部引发,因此即使请求被取消,也会打印 ZZZ。 请注意,不能在 DeploymentHandle 调用上使用 asyncio.shield 来阻止下游处理程序被取消。 您还需要在该处理程序中显式处理取消错误。

FastAPI HTTP 部署#

如果您想定义更复杂的 HTTP 处理逻辑,Serve 可以与 FastAPI 集成。 这允许您使用 @serve.ingress 装饰器来定义一个 Serve 部署,该装饰器封装了一个 FastAPI 应用并提供其全部功能。 下面显示了最基本的示例,但有关 FastAPI 提供的所有功能的更多详细信息,例如可变路由、自动类型验证、依赖注入(例如,用于数据库连接)等,请参阅其文档

注意

集成 FastAPI 的 Serve 应用仍然遵守通过 Serve 设置的 route_prefix。 通过 FastAPI app 对象注册的路由会叠加在路由前缀之上。 例如,如果您的 Serve 应用的 route_prefix = /my_app,并且您使用 @app.get("/fetch_data") 装饰了一个方法,那么您可以通过向路径 /my_app/fetch_data 发送 GET 请求来调用该方法。

import ray
import requests
from fastapi import FastAPI
from ray import serve

app = FastAPI()


@serve.deployment
@serve.ingress(app)
class MyFastAPIDeployment:
    @app.get("/")
    def root(self):
        return "Hello, world!"


serve.run(MyFastAPIDeployment.bind(), route_prefix="/hello")
resp = requests.get("http://localhost:8000/hello")
assert resp.json() == "Hello, world!"

现在,如果您向 /hello 发送请求,它将路由到我们部署的 root 方法。 我们还可以轻松利用 FastAPI 定义具有不同 HTTP 方法的多个路由。

import ray
import requests
from fastapi import FastAPI
from ray import serve

app = FastAPI()


@serve.deployment
@serve.ingress(app)
class MyFastAPIDeployment:
    @app.get("/")
    def root(self):
        return "Hello, world!"

    @app.post("/{subpath}")
    def root(self, subpath: str):
        return f"Hello from {subpath}!"


serve.run(MyFastAPIDeployment.bind(), route_prefix="/hello")
resp = requests.post("http://localhost:8000/hello/Serve")
assert resp.json() == "Hello from Serve!"

您还可以将现有的 FastAPI 应用传递给部署,以便按原样进行服务。

import ray
import requests
from fastapi import FastAPI
from ray import serve

app = FastAPI()


@app.get("/")
def f():
    return "Hello from the root!"


@serve.deployment
@serve.ingress(app)
class FastAPIWrapper:
    pass


serve.run(FastAPIWrapper.bind(), route_prefix="/")
resp = requests.get("http://localhost:8000/")
assert resp.json() == "Hello from the root!"

这对于无需修改即可扩展现有 FastAPI 应用非常有用。 现有的中间件、自动 OpenAPI 文档生成以及其他高级 FastAPI 特性应该可以按原样工作。

WebSockets#

Serve 通过 FastAPI 支持 WebSockets。

from fastapi import FastAPI, WebSocket, WebSocketDisconnect

from ray import serve


app = FastAPI()


@serve.deployment
@serve.ingress(app)
class EchoServer:
    @app.websocket("/")
    async def echo(self, ws: WebSocket):
        await ws.accept()

        try:
            while True:
                text = await ws.receive_text()
                await ws.send_text(text)
        except WebSocketDisconnect:
            print("Client disconnected.")


serve_app = serve.run(EchoServer.bind())

使用 @app.websocket 装饰处理 WebSocket 请求的函数。 在 FastAPI 文档中阅读更多关于 FastAPI WebSockets 的信息。

使用 websockets 包 (pip install websockets) 查询部署。

from websockets.sync.client import connect

with connect("ws://localhost:8000") as websocket:
    websocket.send("Eureka!")
    assert websocket.recv() == "Eureka!"

    websocket.send("I've found it!")
    assert websocket.recv() == "I've found it!"

流式响应#

某些应用必须将增量结果流式传输回调用方。 这对于使用大型语言模型 (LLM) 生成文本或视频处理应用来说很常见。 完整的正向传播可能需要几秒钟,因此在结果可用时提供增量结果可以显著改善用户体验。

要使用 HTTP 响应流,请返回一个包装了 HTTP 处理程序生成器的 StreamingResponse。 基本的 HTTP 入口部署使用 __call__ 方法时支持此功能,使用 FastAPI 集成时也支持。

以下代码定义了一个 Serve 应用,它以增量方式流式传输直到指定 max 的数字。 客户端代码也已更新以处理流式输出。 此代码使用了 requests 库的 stream=True 选项。

import time
from typing import Generator

import requests
from starlette.responses import StreamingResponse
from starlette.requests import Request

from ray import serve


@serve.deployment
class StreamingResponder:
    def generate_numbers(self, max: int) -> Generator[str, None, None]:
        for i in range(max):
            yield str(i)
            time.sleep(0.1)

    def __call__(self, request: Request) -> StreamingResponse:
        max = request.query_params.get("max", "25")
        gen = self.generate_numbers(int(max))
        return StreamingResponse(gen, status_code=200, media_type="text/plain")


serve.run(StreamingResponder.bind())

r = requests.get("http://localhost:8000?max=10", stream=True)
start = time.time()
r.raise_for_status()
for chunk in r.iter_content(chunk_size=None, decode_unicode=True):
    print(f"Got result {round(time.time()-start, 1)}s after start: '{chunk}'")

将此代码保存到 stream.py 中并运行它。

$ python stream.py
[2023-05-25 10:44:23]  INFO ray._private.worker::Started a local Ray instance. View the dashboard at http://127.0.0.1:8265
(ServeController pid=40401) INFO 2023-05-25 10:44:25,296 controller 40401 deployment_state.py:1259 - Deploying new version of deployment default_StreamingResponder.
(ProxyActor pid=40403) INFO:     Started server process [40403]
(ServeController pid=40401) INFO 2023-05-25 10:44:25,333 controller 40401 deployment_state.py:1498 - Adding 1 replica to deployment default_StreamingResponder.
Got result 0.0s after start: '0'
Got result 0.1s after start: '1'
Got result 0.2s after start: '2'
Got result 0.3s after start: '3'
Got result 0.4s after start: '4'
Got result 0.5s after start: '5'
Got result 0.6s after start: '6'
Got result 0.7s after start: '7'
Got result 0.8s after start: '8'
Got result 0.9s after start: '9'
(ServeReplica:default_StreamingResponder pid=41052) INFO 2023-05-25 10:49:52,230 default_StreamingResponder default_StreamingResponder#qlZFCa yomKnJifNJ / default replica.py:634 - __CALL__ OK 1017.6ms

客户端断开连接时终止流#

在某些情况下,您可能希望在客户端断开连接且完整流尚未返回时停止处理请求。 如果您将异步生成器传递给 StreamingResponse,当客户端断开连接时,它将被取消并引发 asyncio.CancelledError。 请注意,生成器中必须在某个点有 await 语句,取消才会发生。

在下面的示例中,生成器将永远流式传输响应,直到客户端断开连接,然后打印已取消并退出。 将此代码保存到 stream.py 中并运行它。

import asyncio
import time
from typing import AsyncGenerator

import requests
from starlette.responses import StreamingResponse
from starlette.requests import Request

from ray import serve


@serve.deployment
class StreamingResponder:
    async def generate_forever(self) -> AsyncGenerator[str, None]:
        try:
            i = 0
            while True:
                yield str(i)
                i += 1
                await asyncio.sleep(0.1)
        except asyncio.CancelledError:
            print("Cancelled! Exiting.")

    def __call__(self, request: Request) -> StreamingResponse:
        gen = self.generate_forever()
        return StreamingResponse(gen, status_code=200, media_type="text/plain")


serve.run(StreamingResponder.bind())

r = requests.get("http://localhost:8000?max=10", stream=True)
start = time.time()
r.raise_for_status()
for i, chunk in enumerate(r.iter_content(chunk_size=None, decode_unicode=True)):
    print(f"Got result {round(time.time()-start, 1)}s after start: '{chunk}'")
    if i == 10:
        print("Client disconnecting")
        break
$ python stream.py
[2023-07-10 16:08:41]  INFO ray._private.worker::Started a local Ray instance. View the dashboard at http://127.0.0.1:8265
(ServeController pid=50801) INFO 2023-07-10 16:08:42,296 controller 40401 deployment_state.py:1259 - Deploying new version of deployment default_StreamingResponder.
(ProxyActor pid=50803) INFO:     Started server process [50803]
(ServeController pid=50805) INFO 2023-07-10 16:08:42,963 controller 50805 deployment_state.py:1586 - Adding 1 replica to deployment default_StreamingResponder.
Got result 0.0s after start: '0'
Got result 0.1s after start: '1'
Got result 0.2s after start: '2'
Got result 0.3s after start: '3'
Got result 0.4s after start: '4'
Got result 0.5s after start: '5'
Got result 0.6s after start: '6'
Got result 0.7s after start: '7'
Got result 0.8s after start: '8'
Got result 0.9s after start: '9'
Got result 1.0s after start: '10'
Client disconnecting
(ServeReplica:default_StreamingResponder pid=50842) Cancelled! Exiting.
(ServeReplica:default_StreamingResponder pid=50842) INFO 2023-07-10 16:08:45,756 default_StreamingResponder default_StreamingResponder#cmpnmF ahteNDQSWx / default replica.py:691 - __CALL__ OK 1019.1ms

设置 keep alive 超时#

Serve 内部使用 Uvicorn HTTP 服务器来服务 HTTP 请求。 默认情况下,Uvicorn 在请求之间保持 HTTP 连接保持活动状态 5 秒。 通过在 Serve 配置文件的 http_options 字段中设置 keep_alive_timeout_s 来修改 keep-alive 超时。 此配置对您的 Ray 集群全局有效,运行时无法更新。 您还可以设置 RAY_SERVE_HTTP_KEEP_ALIVE_TIMEOUT_S 环境变量来设置 keep alive 超时。 如果同时设置了 RAY_SERVE_HTTP_KEEP_ALIVE_TIMEOUT_Skeep_alive_timeout_s 配置,则 RAY_SERVE_HTTP_KEEP_ALIVE_TIMEOUT_S 优先。 有关更多信息,请参阅 Uvicorn 的 keep alive 超时指南