Set Up FastAPI and HTTP#

This section helps you understand how to:

  • Send HTTP requests to Serve deployments

  • Use Ray Serve to integrate with FastAPI

  • Use customized HTTP adapters

  • Choose which feature to use for your use case

  • Set the keep-alive timeout

Choosing the Right HTTP Feature#

Serve provides a layered approach to expose your models with the right HTTP API.

Based on your use case, you can choose the right level of abstraction.

Calling Deployments via HTTP#

When you deploy a Serve application, the ingress deployment (the one passed to serve.run) is exposed over HTTP.

import starlette.requests
import requests
from ray import serve


@serve.deployment
class Counter:
    def __call__(self, request: starlette.requests.Request):
        return request.query_params


serve.run(Counter.bind())
resp = requests.get("http://localhost:8000/?a=b&c=d")
assert resp.json() == {"a": "b", "c": "d"}

Requests to the Serve HTTP server at / are routed to the deployment's __call__ method with a Starlette Request object as the sole argument. The __call__ method can return any JSON-serializable object or a Starlette Response object (e.g., to return a custom status code or custom headers). A Serve app's route prefix can be changed from / to another string by setting route_prefix in serve.run() or the Serve config file.

Request Cancellation#

When processing a request takes longer than the end-to-end timeout, or an HTTP client disconnects before receiving a response, Serve cancels the in-flight request:

  • If the proxy hasn't yet sent the request to a replica, Serve simply drops the request.

  • If the request has already been sent to a replica, Serve attempts to interrupt the replica and cancel the request. The asyncio.Task running the handler on the replica is cancelled, raising an asyncio.CancelledError the next time it enters an await statement. See the asyncio docs for more info. Handle this exception in a try-except block to customize your deployment's behavior when a request is cancelled:
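The end-to-end timeout in the first case is configured cluster-wide rather than per deployment. A minimal sketch of a Serve config file, assuming the request_timeout_s field of http_options (60 seconds is an arbitrary value):

```yaml
http_options:
  request_timeout_s: 60  # end-to-end deadline per HTTP request, in seconds
```

Requests that run past this deadline are cancelled as described above.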

import asyncio
from ray import serve


@serve.deployment
async def startled():
    try:
        print("Replica received request!")
        await asyncio.sleep(10000)
    except asyncio.CancelledError:
        # Add custom behavior that should run
        # upon cancellation here.
        print("Request got cancelled!")

If no await statements remain in the deployment's code before the request completes, the replica processes the request as usual, sends the response back to the proxy, and the proxy discards it. Use await statements for blocking operations in your deployment, so Serve can cancel in-flight requests without waiting for the blocking operation to complete.
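This behavior is plain asyncio and can be sketched without Serve. In the snippet below (names are illustrative), a handler that blocks in time.sleep runs to completion regardless of cancel(), while one that offloads the blocking call with asyncio.to_thread exposes an await point where the cancellation can be delivered:

```python
import asyncio
import time


async def blocking_handler() -> str:
    # time.sleep blocks the event loop: the cancellation is never
    # delivered, and the coroutine runs to completion anyway.
    time.sleep(0.5)
    return "finished despite cancel()"


async def cooperative_handler() -> str:
    # asyncio.to_thread runs the blocking call in a worker thread and
    # awaits it, so a cancellation can be raised at this await point.
    await asyncio.to_thread(time.sleep, 0.5)
    return "finished"


async def cancel_after(coro, delay: float) -> str:
    task = asyncio.create_task(coro)
    await asyncio.sleep(delay)
    task.cancel()
    try:
        return await task
    except asyncio.CancelledError:
        return "cancelled"


blocking_result = asyncio.run(cancel_after(blocking_handler(), 0.1))
cooperative_result = asyncio.run(cancel_after(cooperative_handler(), 0.1))
print(blocking_result)     # finished despite cancel()
print(cooperative_result)  # cancelled
```

The same principle applies inside a Serve replica: only handlers that reach an await statement can observe the cancellation.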

Cancellation cascades to any downstream deployment handle, task, or actor calls that were spawned in the deployment's request-handling method. These can handle the asyncio.CancelledError in the same way as the ingress deployment.

To prevent an async call from being interrupted by asyncio.CancelledError, use asyncio.shield():

import asyncio
from ray import serve


@serve.deployment
class SnoringSleeper:
    async def snore(self):
        await asyncio.sleep(1)
        print("ZZZ")

    async def __call__(self):
        try:
            print("SnoringSleeper received request!")

            # Prevent the snore() method from being cancelled
            await asyncio.shield(self.snore())

        except asyncio.CancelledError:
            print("SnoringSleeper's request was cancelled!")


app = SnoringSleeper.bind()

When the request is cancelled, a cancellation error is raised inside the SnoringSleeper deployment's __call__() method. However, the cancellation isn't raised inside the snore() call, so ZZZ is printed even if the request is cancelled. Note that asyncio.shield can't be used on a DeploymentHandle call to prevent the downstream handler from being cancelled. You need to explicitly handle the cancellation error in that handler as well.
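The shielding semantics are plain asyncio and can be checked outside Serve. In this sketch, the outer task is cancelled while awaiting asyncio.shield, so it sees the CancelledError immediately, yet the shielded coroutine still runs to completion:

```python
import asyncio

events = []


async def snore() -> None:
    await asyncio.sleep(0.2)
    events.append("ZZZ")


async def handler() -> None:
    try:
        # shield() wraps snore() in its own task, so cancelling
        # handler() doesn't cancel snore() itself.
        await asyncio.shield(snore())
    except asyncio.CancelledError:
        events.append("handler cancelled")


async def main() -> None:
    task = asyncio.create_task(handler())
    await asyncio.sleep(0.05)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    # Give the shielded snore() task time to finish.
    await asyncio.sleep(0.3)


asyncio.run(main())
print(events)  # ['handler cancelled', 'ZZZ']
```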

FastAPI HTTP Deployments#

If you want to define more complex HTTP handling logic, Serve integrates with FastAPI. This allows you to define a Serve deployment using the @serve.ingress decorator that wraps a FastAPI app with its full range of features. The most basic example of this is shown below, but for more details on all that FastAPI has to offer, such as variable routes, automatic type validation, dependency injection (e.g., for database connections), and more, check out its documentation.

Note

Serve apps that integrate with FastAPI still respect the route_prefix set through Serve. Routes registered via the FastAPI app object are layered on top of the route prefix. For example, if your Serve app has route_prefix = /my_app and you decorate a method with @app.get("/fetch_data"), you can call that method by sending a GET request to the path /my_app/fetch_data.

import ray
import requests
from fastapi import FastAPI
from ray import serve

app = FastAPI()


@serve.deployment
@serve.ingress(app)
class MyFastAPIDeployment:
    @app.get("/")
    def root(self):
        return "Hello, world!"


serve.run(MyFastAPIDeployment.bind(), route_prefix="/hello")
resp = requests.get("http://localhost:8000/hello")
assert resp.json() == "Hello, world!"

Now if you send a request to /hello, it's routed to the root method of our deployment. We can also easily leverage FastAPI to define multiple routes with different HTTP methods:

import ray
import requests
from fastapi import FastAPI
from ray import serve

app = FastAPI()


@serve.deployment
@serve.ingress(app)
class MyFastAPIDeployment:
    @app.get("/")
    def root(self):
        return "Hello, world!"

    @app.post("/{subpath}")
    def greet_subpath(self, subpath: str):
        return f"Hello from {subpath}!"


serve.run(MyFastAPIDeployment.bind(), route_prefix="/hello")
resp = requests.post("http://localhost:8000/hello/Serve")
assert resp.json() == "Hello from Serve!"

You can also pass an existing FastAPI app to a deployment to serve it as-is:

import ray
import requests
from fastapi import FastAPI
from ray import serve

app = FastAPI()


@app.get("/")
def f():
    return "Hello from the root!"


@serve.deployment
@serve.ingress(app)
class FastAPIWrapper:
    pass


serve.run(FastAPIWrapper.bind(), route_prefix="/")
resp = requests.get("http://localhost:8000/")
assert resp.json() == "Hello from the root!"

This is useful for scaling out an existing FastAPI app with no modifications necessary. Existing middlewares, automatic OpenAPI documentation generation, and other advanced FastAPI features should work as-is.

WebSockets#

Serve supports WebSockets via FastAPI:

from fastapi import FastAPI, WebSocket, WebSocketDisconnect

from ray import serve


app = FastAPI()


@serve.deployment
@serve.ingress(app)
class EchoServer:
    @app.websocket("/")
    async def echo(self, ws: WebSocket):
        await ws.accept()

        try:
            while True:
                text = await ws.receive_text()
                await ws.send_text(text)
        except WebSocketDisconnect:
            print("Client disconnected.")


serve_app = serve.run(EchoServer.bind())

Decorate the function that handles WebSocket requests with @app.websocket. Read more about FastAPI WebSockets in the FastAPI documentation.

Query the deployment using the websockets package (pip install websockets):

from websockets.sync.client import connect

with connect("ws://localhost:8000") as websocket:
    websocket.send("Eureka!")
    assert websocket.recv() == "Eureka!"

    websocket.send("I've found it!")
    assert websocket.recv() == "I've found it!"

FastAPI Factory Pattern#

Ray Serve's object pattern (shown above) requires the FastAPI object to be serializable with cloudpickle, which rules out some standard libraries, such as FastAPIInstrumentor, because they rely on non-serializable components like thread locks. The factory pattern instead creates the FastAPI object directly on each replica, avoiding the need to serialize it.

import requests
from fastapi import FastAPI
from ray import serve
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor


@serve.deployment
class ChildDeployment:
    def __call__(self):
        return "Hello from the child deployment!"


def fastapi_factory():
    """Factory-style FastAPI app used as Serve ingress.

    We build the FastAPI app inside a factory and pass the callable to
    @serve.ingress.
    """
    app = FastAPI()

    # In an object-based ingress (where the FastAPI app is stored on the
    # deployment instance), Ray would need to serialize the app and its
    # instrumentation. Some instrumentors (like FastAPIInstrumentor) are not
    # picklable, which can cause serialization failures. Creating and
    # instrumenting the app here sidesteps that issue.
    FastAPIInstrumentor.instrument_app(app)

    @app.get("/")
    async def root():
        # Handlers defined inside this factory don't have access to the
        # ParentDeployment instance (i.e., there's no `self` here), so we
        # can't call `self.child`. Instead, fetch a handle by deployment name.
        handle = serve.get_deployment_handle("ChildDeployment", app_name="default")
        return {"message": await handle.remote()}

    return app


@serve.deployment
@serve.ingress(fastapi_factory)
class ParentDeployment:
    def __init__(self, child):
        self.child = child


serve.run(ParentDeployment.bind(ChildDeployment.bind()))

resp = requests.get("http://localhost:8000/")
assert resp.json() == {"message": "Hello from the child deployment!"}

Streaming Responses#

Some applications must stream incremental results back to the caller. This is common for text generation using large language models (LLMs) or video-processing applications. The full forward pass may take multiple seconds, so providing incremental results as they're available offers a much better user experience.

To use HTTP response streaming, return a StreamingResponse that wraps a generator from your HTTP handler. This is supported for basic HTTP ingress deployments using a __call__ method and when using the FastAPI integration.

The code below defines a Serve application that incrementally streams numbers up to a provided max. The client-side code is also updated to handle the streaming outputs. This code uses the stream=True option to the requests library:

import time
from typing import Generator

import requests
from starlette.responses import StreamingResponse
from starlette.requests import Request

from ray import serve


@serve.deployment
class StreamingResponder:
    def generate_numbers(self, max: int) -> Generator[str, None, None]:
        for i in range(max):
            yield str(i)
            time.sleep(0.1)

    def __call__(self, request: Request) -> StreamingResponse:
        max = request.query_params.get("max", "25")
        gen = self.generate_numbers(int(max))
        return StreamingResponse(gen, status_code=200, media_type="text/plain")


serve.run(StreamingResponder.bind())

r = requests.get("http://localhost:8000/?max=10", stream=True)
start = time.time()
r.raise_for_status()
for chunk in r.iter_content(chunk_size=None, decode_unicode=True):
    print(f"Got result {round(time.time()-start, 1)}s after start: '{chunk}'")

Save this code in stream.py and run it:

$ python stream.py
[2023-05-25 10:44:23]  INFO ray._private.worker::Started a local Ray instance. View the dashboard at http://127.0.0.1:8265
(ServeController pid=40401) INFO 2023-05-25 10:44:25,296 controller 40401 deployment_state.py:1259 - Deploying new version of deployment default_StreamingResponder.
(ProxyActor pid=40403) INFO:     Started server process [40403]
(ServeController pid=40401) INFO 2023-05-25 10:44:25,333 controller 40401 deployment_state.py:1498 - Adding 1 replica to deployment default_StreamingResponder.
Got result 0.0s after start: '0'
Got result 0.1s after start: '1'
Got result 0.2s after start: '2'
Got result 0.3s after start: '3'
Got result 0.4s after start: '4'
Got result 0.5s after start: '5'
Got result 0.6s after start: '6'
Got result 0.7s after start: '7'
Got result 0.8s after start: '8'
Got result 0.9s after start: '9'
(ServeReplica:default_StreamingResponder pid=41052) INFO 2023-05-25 10:49:52,230 default_StreamingResponder default_StreamingResponder#qlZFCa yomKnJifNJ / default replica.py:634 - __CALL__ OK 1017.6ms

Terminating the Stream When a Client Disconnects#

In some cases, you may want to stop processing a request when the client disconnects before the full stream has been returned. If you pass an async generator to StreamingResponse, it's cancelled and raises an asyncio.CancelledError when the client disconnects. Note that the generator must await at some point for the cancellation to occur.
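The need for an await inside the generator can again be demonstrated with plain asyncio, independent of Serve. Cancelling the consuming task raises the CancelledError at the generator's await asyncio.sleep, where it can be caught:

```python
import asyncio

events = []


async def generate_forever():
    try:
        i = 0
        while True:
            yield i
            i += 1
            # This await is where a cancellation can be delivered.
            await asyncio.sleep(0.2)
    except asyncio.CancelledError:
        events.append("cancelled")


async def consume() -> None:
    async for num in generate_forever():
        events.append(num)


async def main() -> None:
    task = asyncio.create_task(consume())
    await asyncio.sleep(0.5)  # let a few items through
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass


asyncio.run(main())
print(events)  # [0, 1, 2, 'cancelled']
```

If the generator produced values in a tight loop with no await, the cancellation couldn't be delivered until the generator finished.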

In the example below, the generator streams responses forever until the client disconnects, then prints that it was cancelled and exits. Save this code in stream.py and run it:

import asyncio
import time
from typing import AsyncGenerator

import requests
from starlette.responses import StreamingResponse
from starlette.requests import Request

from ray import serve


@serve.deployment
class StreamingResponder:
    async def generate_forever(self) -> AsyncGenerator[str, None]:
        try:
            i = 0
            while True:
                yield str(i)
                i += 1
                await asyncio.sleep(0.1)
        except asyncio.CancelledError:
            print("Cancelled! Exiting.")

    def __call__(self, request: Request) -> StreamingResponse:
        gen = self.generate_forever()
        return StreamingResponse(gen, status_code=200, media_type="text/plain")


serve.run(StreamingResponder.bind())

r = requests.get("http://localhost:8000", stream=True)
start = time.time()
r.raise_for_status()
for i, chunk in enumerate(r.iter_content(chunk_size=None, decode_unicode=True)):
    print(f"Got result {round(time.time()-start, 1)}s after start: '{chunk}'")
    if i == 10:
        print("Client disconnecting")
        break
$ python stream.py
[2023-07-10 16:08:41]  INFO ray._private.worker::Started a local Ray instance. View the dashboard at http://127.0.0.1:8265
(ServeController pid=50801) INFO 2023-07-10 16:08:42,296 controller 40401 deployment_state.py:1259 - Deploying new version of deployment default_StreamingResponder.
(ProxyActor pid=50803) INFO:     Started server process [50803]
(ServeController pid=50805) INFO 2023-07-10 16:08:42,963 controller 50805 deployment_state.py:1586 - Adding 1 replica to deployment default_StreamingResponder.
Got result 0.0s after start: '0'
Got result 0.1s after start: '1'
Got result 0.2s after start: '2'
Got result 0.3s after start: '3'
Got result 0.4s after start: '4'
Got result 0.5s after start: '5'
Got result 0.6s after start: '6'
Got result 0.7s after start: '7'
Got result 0.8s after start: '8'
Got result 0.9s after start: '9'
Got result 1.0s after start: '10'
Client disconnecting
(ServeReplica:default_StreamingResponder pid=50842) Cancelled! Exiting.
(ServeReplica:default_StreamingResponder pid=50842) INFO 2023-07-10 16:08:45,756 default_StreamingResponder default_StreamingResponder#cmpnmF ahteNDQSWx / default replica.py:691 - __CALL__ OK 1019.1ms

Set Keep-Alive Timeout#

Serve uses an internal Uvicorn HTTP server to serve HTTP requests. By default, Uvicorn keeps HTTP connections alive for 5 seconds between requests. Modify the keep-alive timeout by setting keep_alive_timeout_s in the http_options field of the Serve config file. This config is global to your Ray cluster, and you can't update it at runtime. You can also set the RAY_SERVE_HTTP_KEEP_ALIVE_TIMEOUT_S environment variable. If both RAY_SERVE_HTTP_KEEP_ALIVE_TIMEOUT_S and keep_alive_timeout_s are set, RAY_SERVE_HTTP_KEEP_ALIVE_TIMEOUT_S takes precedence. For more info, see Uvicorn's keep-alive timeout guide.
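As a sketch, a Serve config file raising the keep-alive timeout might look like this (30 seconds is an arbitrary value):

```yaml
http_options:
  keep_alive_timeout_s: 30  # seconds; Uvicorn's default is 5
```

Alternatively, export RAY_SERVE_HTTP_KEEP_ALIVE_TIMEOUT_S=30 before starting Serve; as noted above, the environment variable takes precedence if both are set.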