Ray Serve 中的 asyncio 和并发最佳实践#

在 Ray Serve 部署中,每个副本内运行的代码都运行在 asyncio 事件循环上。Asyncio 能够实现高效的 I/O 密集型并发,但需要遵循一些最佳实践才能获得最佳性能。

本指南将解释

  • Ray Serve 中何时使用 async def 而不是 def

  • Ray Serve 如何执行您的代码(循环、线程和路由器)。

  • max_ongoing_requests 如何与 asyncio 并发交互。

  • 如何看待 Python 的 GIL、原生代码和真正的并行性。

除非另有说明,否则示例假定以下导入

from ray import serve
import asyncio

如何选择 async defdef#

使用此决策表作为起点

工作负载类型

推荐的处理器

原因

I/O 密集型(数据库、HTTP 调用、队列)

async def

允许事件循环同时处理许多请求,而每个请求都在等待 I/O。

CPU 密集型(模型推理、重数值计算)

defasync def 配合 offload

Async 本身并不能让 CPU 工作更快。您需要更多的副本、线程或原生并行性。

流式响应

async def 生成器

与反压和非阻塞迭代集成。

FastAPI 入口 (@serve.ingress)

defasync def

FastAPI 在线程池中运行 def 端点,因此它们不会阻塞事件循环。

Ray Serve 如何执行您的代码#

从高层次来看,请求通过路由器到达运行您的代码的副本 actor。

Client
   ↓
Serve router (asyncio loop A)
   ↓
Replica actor
   ├─ System / control loop
   └─ User code loop (your handlers)
       └─ Optional threadpool for sync methods

以下是决定使用 async def 还是 def 时需要考虑的关键点。

  • Serve 使用 asyncio 事件循环进行路由和运行副本。

  • 默认情况下,用户代码运行在与副本的主/控制循环不同的事件循环上,因此阻塞的用户代码不会干扰健康检查和自动扩展。

  • 根据 RAY_SERVE_RUN_SYNC_IN_THREADPOOL 的值,def 处理器可能直接运行在用户事件循环上(阻塞),或在线程池中运行(对事件循环非阻塞)。

纯 Serve 部署(无 FastAPI 入口)#

对于简单部署

@serve.deployment
class Echo:
    async def __call__(self, request):
        await asyncio.sleep(0.1)
        return "ok"
  • async def __call__ 直接在副本的用户事件循环上运行。

  • 当此处理器等待 asyncio.sleep 时,事件循环可以继续处理其他请求。

对于同步部署

@serve.deployment
class BlockingEcho:
    def __call__(self, request):
        # Blocking.
        import time
        time.sleep(1)
        return "ok"

此方法的执行方式取决于配置。

  • 使用 RAY_SERVE_RUN_SYNC_IN_THREADPOOL=0(当前默认值)时,__call__ 直接在用户事件循环上运行并阻塞它 1 秒钟。

  • 使用 RAY_SERVE_RUN_SYNC_IN_THREADPOOL=1 时,Serve 将 __call__ 卸载到线程池,因此事件循环保持响应。

FastAPI 入口 (@serve.ingress)#

当您使用 FastAPI 入口时,FastAPI 会控制端点如何运行。

from fastapi import FastAPI

app = FastAPI()

@serve.deployment
@serve.ingress(app)
class FastAPIDeployment:
    @app.get("/sync")
    def sync_endpoint(self):
        # FastAPI runs this in a threadpool.
        import time
        time.sleep(1)
        return "ok"

    @app.get("/async")
    async def async_endpoint(self):
        # Runs directly on FastAPI's asyncio loop.
        await asyncio.sleep(1)
        return "ok"

重要区别

  • FastAPI 始终将 def 端点分派到线程池。

  • 在纯 Serve 中,def 方法在事件循环上运行,除非您选择线程池行为。

实际中的阻塞与非阻塞#

阻塞代码会阻止事件循环处理其他工作。非阻塞代码在等待某事物时将控制权交还给事件循环。

阻塞 I/O 与异步 I/O#

阻塞 I/O 示例

@serve.deployment
class BlockingHTTP:
    async def __call__(self, request):
        # ❌ This blocks the event loop until the HTTP call finishes.
        import requests
        resp = requests.get("https://example.com/")
        return resp.text

尽管该方法是 async def,但 requests.get 会阻塞事件循环。在此请求调用期间,该副本无法运行其他请求。在 async def 中的阻塞仍然是阻塞。

使用异步 HTTP 客户端的非阻塞等效项

@serve.deployment
class AsyncHTTP:
    async def __call__(self, request):
        import httpx

        async with httpx.AsyncClient() as client:
            resp = await client.get("https://example.com/")
        return resp.text

使用线程池的非阻塞等效项

@serve.deployment
class ThreadedHTTP:
    async def __call__(self, request):
        import requests

        def fetch():
            return requests.get("https://example.com/").text

        # ✅ Offload blocking I/O to a worker thread.
        return await asyncio.to_thread(fetch)

并发不等于 Python 中的并行性#

人们通常期望 async 代码能“利用所有核心”或使 CPU 密集型代码运行更快。asyncio 并不能做到这一点。

并发:处理许多等待操作#

Asyncio 为 I/O 密集型工作负载提供了**并发**。

  • 当一个请求等待数据库时,另一个请求可以等待 HTTP 调用。

  • 处理器在每个 await 处将控制权交还给事件循环。

这对于主要等待外部系统的高吞吐量 API 是理想的。

并行性:利用多个 CPU 核心#

真正的 CPU 并行性通常来自

  • 多个进程(例如,多个 Serve 副本)。

  • 释放 GIL 并跨核心运行的原生代码。

Python 的 GIL 意味着,即使您使用线程池,纯 Python 字节码在一个进程中一次也只能运行一个线程。

使用释放 GIL 的原生代码#

许多数值和 ML 库在执行重型原生代码工作时会释放 GIL。

  • NumPy、许多线性代数例程。

  • PyTorch 和一些其他深度学习框架。

  • 一些图像处理或压缩库。

在这些情况下,您仍然可以从单个副本进程内的线程中获得有用的并行性。

@serve.deployment
class NumpyDeployment:
    def _heavy_numpy(self, array):
        import numpy as np
        # Many NumPy ops release the GIL while executing C/Fortran code.
        return np.linalg.svd(array)[0]

    async def __call__(self, request):
        import numpy as np
        # Create a sample array from request data
        array = np.random.rand(100, 100)
        # ✅ Multiple threads can run _heavy_numpy in parallel if
        # the underlying implementation releases the GIL.
        return await asyncio.to_thread(self._heavy_numpy, array)

但是

  • 释放 GIL 的行为是库特定的,有时也是操作特定的。

  • 一些库使用自己的内部线程池;将它们与您自己的线程池结合使用可能会导致 CPU 过载。

  • 您应该验证您的模型堆栈是否是线程安全的,然后再依赖这种形式的并行性。

为了可预测的 CPU 扩展,增加副本数量通常更简单。

总结#

  • async def 提高了 I/O 密集型代码的**并发性**。

  • CPU 密集型代码仅仅因为它 async 就不会运行得更快。

  • 并行 CPU 扩展主要来自**更多的进程**(副本或任务),以及在某些情况下,释放 GIL 的原生代码。

max_ongoing_requests 和副本并发的工作原理#

每个部署都有一个 max_ongoing_requests 配置,用于控制副本一次处理多少个进行中的请求。

@serve.deployment(max_ongoing_requests=32)
class MyService:
    async def __call__(self, request):
        await asyncio.sleep(1)
        return "ok"

要点

  • Ray Serve 使用内部信号量将每个副本的并发进行中请求限制为 max_ongoing_requests

  • 超出此限制的请求会在路由器中排队,或者根据配置等待容量可用,或者因反压而失败。

max_ongoing_requests 的有用性取决于您的处理器行为。

async 处理器和 max_ongoing_requests#

对于花费大部分时间等待 I/O 的 async def 处理器,max_ongoing_requests 直接控制并发。

@serve.deployment(max_ongoing_requests=100)
class AsyncIOBound:
    async def __call__(self, request):
        # Mostly waiting on an external system.
        await asyncio.sleep(0.1)
        return "ok"
  • 每个副本最多可以有 100 个请求正在进行中。

  • 当一个请求等待时,事件循环可以处理其他请求。

阻塞的 def 处理器和 max_ongoing_requests#

对于在事件循环上运行的阻塞 def 处理器(禁用线程池),max_ongoing_requests 并不能提供您期望的并发。

@serve.deployment(max_ongoing_requests=100)
class BlockingCPU:
    def __call__(self, request):
        # ❌ Blocks the user event loop.
        import time
        time.sleep(1)
        return "ok"

在这种情况下

  • 事件循环一次只能运行一个处理器。

  • 尽管 max_ongoing_requests=100,但副本实际上是串行处理请求的。

如果您启用了 sync-in-threadpool 行为(参见下一节),则每个进行中的请求都可以在线程中运行。

@serve.deployment(max_ongoing_requests=100)
class CPUWithThreadpool:
    def __call__(self, request):
        # With RAY_SERVE_RUN_SYNC_IN_THREADPOOL=1, each call runs in a thread.
        import time
        time.sleep(1)
        return "ok"

现在

  • 最多可以同时运行 max_ongoing_requests 个调用。

  • 实际吞吐量取决于

    • 线程池使用多少线程。

    • 您的工作负载是 CPU 密集型还是释放 GIL 的。

    • 底层原生库和系统资源。

对于重度 CPU 密集型工作负载,通常最好

  • 保持 max_ongoing_requests 适度(避免排队太多重任务),并且

  • 扩展**副本**(num_replicas),而不是将单个副本的并发推得太高。

环境变量和 sync-in-threadpool 警告#

Ray Serve 公开了一些环境变量,用于控制用户代码如何与事件循环和线程进行交互。

RAY_SERVE_RUN_SYNC_IN_THREADPOOL#

默认情况下(RAY_SERVE_RUN_SYNC_IN_THREADPOOL=0),这意味着部署中的同步方法直接在用户事件循环上运行。为了帮助您迁移到更安全模型,Serve 会发出类似以下内容的警告:

RAY_SERVE_RUN_SYNC_IN_THREADPOOL_WARNING: Calling sync method ‘…’ directly on the asyncio loop. In a future version, sync methods will be run in a threadpool by default…

此警告意味着

  • 您有一个 def 方法目前正在事件循环上运行。

  • 在将来的版本中,该方法将在线程池中运行。

您可以通过设置来选择加入将来的行为:

export RAY_SERVE_RUN_SYNC_IN_THREADPOOL=1

当此标志为 1

  • Serve 在线程池中运行同步方法。

  • 事件循环可以自由地继续为其他请求提供服务,同时同步方法正在运行。

在将此标志投入生产之前,请确保

  • 您的处理器代码和任何共享状态都是线程安全的。

  • 您的模型对象可以安全地从多个线程使用,或者您使用锁来保护它们。

RAY_SERVE_RUN_USER_CODE_IN_SEPARATE_THREAD#

默认情况下,Serve 在与副本的主/控制循环不同的事件循环中运行用户代码。

export RAY_SERVE_RUN_USER_CODE_IN_SEPARATE_THREAD=1  # default

这种隔离

  • 保护系统任务(健康检查、控制器通信)免受用户代码阻塞。

  • 会增加跨循环通信的开销,导致请求延迟增加。有关吞吐量优化的配置,请参阅 高吞吐量优化

您可以禁用此行为

export RAY_SERVE_RUN_USER_CODE_IN_SEPARATE_THREAD=0

只有高级用户才应更改此设置。当用户代码和系统任务共享一个事件循环时,用户代码中的任何阻塞操作都可能干扰副本健康和控制平面操作。

RAY_SERVE_RUN_ROUTER_IN_SEPARATE_LOOP#

Serve 的请求路由器也默认在自己的事件循环中运行。

export RAY_SERVE_RUN_ROUTER_IN_SEPARATE_LOOP=1  # default

这确保了

  • 即使某些副本正在运行缓慢的用户代码,路由器也可以继续路由和负载均衡请求。

禁用此设置

export RAY_SERVE_RUN_ROUTER_IN_SEPARATE_LOOP=0

将使路由器与其他工作共享事件循环。这可以在高级、高度优化的场景中减少开销,但会使系统对阻塞操作更敏感。请参阅 高吞吐量优化

对于大多数生产部署,您应该保留两个单独事件循环标志的默认值(1)。

批处理和流式语义#

批处理和流式处理都依赖事件循环来保持响应。它们不会改变您的代码运行位置:批处理处理器和流式处理器仍然在与其他处理器相同的用户事件循环上运行。这意味着,如果您在阻塞代码之上添加批处理或流式处理,则可能会使事件循环阻塞的影响变得更糟。

批处理#

当您启用批处理时,Serve 会将多个传入请求分组,并将它们作为列表传递给您的处理器。处理器仍然在用户事件循环上运行,但每次调用现在同时处理多个请求,而不是只处理一个。如果该批处理工作是阻塞的,它会同时阻塞事件循环处理所有这些请求。

以下示例显示了一个批处理部署。


@serve.deployment(max_ongoing_requests=64)
class BatchedModel:
    @serve.batch(max_batch_size=32)
    async def __call__(self, requests):
        # requests is a list of request objects.
        inputs = [r for r in requests]
        outputs = await self._run_model(inputs)
        return outputs
    
    async def _run_model(self, inputs):
        # Placeholder model function
        return [f"result_{i}" for i in inputs]

批处理处理器在用户事件循环上运行。

  • 如果 _run_model 是 CPU 密集型的并且内联运行,它会在批处理期间阻塞事件循环。

  • 您可以卸载批处理计算。

@serve.deployment(max_ongoing_requests=64)
class BatchedModelOffload:
    @serve.batch(max_batch_size=32)
    async def __call__(self, requests):
        # requests is a list of request objects.
        inputs = [r for r in requests]
        outputs = await self._run_model(inputs)
        return outputs

    async def _run_model(self, inputs):
        def run_sync():
            # Heavy CPU or GIL-releasing native code here.
            # Placeholder model function
            return [f"result_{i}" for i in inputs]

        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, run_sync)

这可以保持事件循环响应,同时模型在线程中运行。

max_concurrent_batches 和事件循环的交出#

@serve.batch 装饰器接受一个 max_concurrent_batches 参数,该参数控制可以同时处理多少批。但是,只有当您的批处理处理器在处理过程中将控制权交还给事件循环时,此参数才能有效。

如果您的批处理处理器阻塞事件循环(例如,通过执行繁重的 CPU 工作而没有等待或卸载),max_concurrent_batches 将无法提供您期望的并发。只有当前批次交出控制权时,事件循环才能开始处理新批次。

要获得 max_concurrent_batches 的好处

  • 将您的批处理处理器使用 async def,并 await I/O 操作或卸载的 CPU 工作。

  • 使用 asyncio.to_thread()loop.run_in_executor() 将 CPU 密集型批处理卸载到线程池。

  • 避免阻塞操作,这些操作会阻止事件循环调度其他批次。

在上面卸载的批处理示例中,处理器在等待线程池执行程序时会交出对事件循环的控制,这使得多个批次可以同时进行(最多 max_concurrent_batches 的限制)。

流式处理#

流式处理与常规响应不同,因为客户端在您的处理器仍在运行时开始接收数据。Serve 调用您的处理器一次,返回一个生成器或异步生成器,然后反复要求它提供下一个块。该生成器代码仍在用户事件循环上运行(或在工作线程中运行,如果您卸载了它)。

流式处理对阻塞特别敏感。

  • 如果您在块之间阻塞,您将延迟将下一部分数据发送给客户端。

  • 当生成器在事件循环上阻塞时,该事件循环上的其他请求无法进行。

  • 系统也无法快速响应慢速客户端(反压)或取消。

错误的流式处理示例

@serve.deployment
class BlockingStream:
    def __call__(self, request):
        # ❌ Blocks the event loop between yields.
        import time
        for i in range(10):
            time.sleep(1)
            yield f"{i}\n"

更好的流式处理示例

@serve.deployment
class AsyncStream:
    async def __call__(self, request):
        # ✅ Yields items without blocking the loop.
        async def generator():
            for i in range(10):
                await asyncio.sleep(1)
                yield f"{i}\n"

        return generator()

在流式处理场景中

  • 优先使用 async def 生成器,并在 yield 之间使用 await

  • 避免在 yield 之间进行长时间的 CPU 密集型循环;如有必要,请卸载它们。

卸载模式:I/O、CPU#

本节总结了您可以在 async 处理器中使用的常见卸载模式。

async def 中的阻塞 I/O#

@serve.deployment
class OffloadIO:
    async def __call__(self, request):
        import requests

        def fetch():
            return requests.get("https://example.com/").text

        # Offload to a thread, free the event loop.
        body = await asyncio.to_thread(fetch)
        return body

async def 中的 CPU 密集型代码#

@serve.deployment
class OffloadCPU:
    def _compute(self, x):
        # CPU-intensive work.
        total = 0
        for i in range(10_000_000):
            total += (i * x) % 7
        return total

    async def __call__(self, request):
        x = 123
        loop = asyncio.get_running_loop()
        result = await loop.run_in_executor(None, self._compute, x)
        return str(result)

(高级) 使用 Ray 任务或远程 actor 实现真正的并行性#

注意

虽然您可以从 Ray Serve 部署中生成 Ray 任务,但不推荐这种方法,因为它缺乏可观测性和调试工具。

import ray

@ray.remote
def heavy_task(x):
    # Heavy compute runs in its own worker process.
    return x * x


@serve.deployment
class RayParallel:
    async def __call__(self, request):
        values = [1, 2, 3, 4]
        refs = [heavy_task.remote(v) for v in values]
        results = await asyncio.gather(*[r for r in refs])
        return {"results": results}

此模式

  • 使用多个 Ray 工作进程和进程。

  • 绕过了单个 Python 进程的 GIL 限制。

总结#

  • 对于 I/O 密集型和流式处理工作,请使用 async def,以便事件循环保持响应。

  • 使用 max_ongoing_requests 来限制每个副本的并发,但请记住,如果在事件循环上运行,阻塞的 def 处理器仍然可以串行化工作。

  • 一旦您的代码是线程安全的,请考虑启用 RAY_SERVE_RUN_SYNC_IN_THREADPOOL,并注意 sync-in-threadpool 警告。

  • 对于 CPU 密集型工作负载,请扩展副本或使用释放 GIL 的原生代码来实现真正的并行性。