异步推理#

本指南展示了如何在 Ray Serve 中使用后台任务处理异步运行长时间运行的推理。通过异步任务,您的 HTTP API 在系统后台执行工作时将保持响应。

为什么选择异步推理?#

Ray Serve 的客户需要一种方法来异步处理长时间运行的 API 请求。一些推理工作负载(例如视频处理或大型文档索引)比典型的 HTTP 超时时间更长,因此当用户提交此类请求时,系统应将工作放入后台队列以供稍后处理,并立即返回快速响应。这在任务异步执行的同时,将请求生命周期与计算时间解耦,同时仍能利用 Serve 的可伸缩性。

用例#

常见用例包括视频推理(例如,转码、检测和对长视频进行转录)以及文档索引管道,这些管道会摄取、解析和向量化大型文件或批处理。更广泛地说,任何不需要即时结果的长时运行 AI/ML 工作负载都可以从异步运行中受益。

核心概念#

  • @task_consumer:一个消费和执行队列中任务的 Serve 部署。需要 TaskProcessorConfig 参数来配置任务处理器;默认情况下,它使用 Celery 任务处理器,但您可以提供自己的实现。

  • @task_handler:应用于 @task_consumer 类中方法的装饰器。每个处理程序通过 name=... 声明它处理的任务;如果省略 name,则使用方法的函数名作为任务名。消费者配置的队列(通过上面的 TaskProcessorConfig 设置)中具有该名称的所有任务都将被路由到此方法以执行。

组件和 API#

以下各节将介绍异步推理的核心 API,并提供最小的示例以帮助您入门。

TaskProcessorConfig#

配置任务处理器,包括队列名称、适配器(默认为 Celery)、适配器配置、重试次数限制和死信队列。以下示例展示了如何配置任务处理器

from ray.serve.schema import TaskProcessorConfig, CeleryAdapterConfig

processor_config = TaskProcessorConfig(
    queue_name="my_queue",
    # Optional: Override default adapter string (default is Celery)
    # adapter="ray.serve.task_processor.CeleryTaskProcessorAdapter",
    adapter_config=CeleryAdapterConfig(
        broker_url="redis://:6379/0",     # Or "filesystem://" for local testing
        backend_url="redis://:6379/1",    # Result backend (optional for fire-and-forget)
    ),
    max_retries=5,
    failed_task_queue_name="failed_tasks",              # Application errors after retries
)

注意

文件系统代理仅用于本地测试,功能有限。例如,它不支持 cancel_tasks。对于生产部署,请使用生产就绪的代理,例如 Redis 或 RabbitMQ。有关支持的代理的完整列表,请参阅 Celery 代理文档

@task_consumer#

使用提供的 TaskProcessorConfig 将 Serve 部署转变为任务消费者的装饰器。以下代码创建了一个任务消费者

from ray import serve
from ray.serve.task_consumer import task_consumer

@serve.deployment
@task_consumer(task_processor_config=processor_config)
class SimpleConsumer:
    pass

@task_handler#

将消费者上的方法注册为命名任务处理程序的装饰器。以下示例展示了如何定义任务处理程序

from ray.serve.task_consumer import task_handler, task_consumer

@serve.deployment
@task_consumer(task_processor_config=processor_config)
class SimpleConsumer:
    @task_handler(name="process_request")
    def process_request(self, data):
        return f"processed: {data}"

注意

Ray Serve 目前仅支持同步处理程序。声明 async def 处理程序将引发 NotImplementedError

instantiate_adapter_from_config#

从给定的 TaskProcessorConfig 返回任务处理器适配器实例的工厂函数。您可以使用返回的对象来入队任务、获取状态、检索指标等。以下示例演示了如何创建适配器并入队任务

from ray.serve.task_consumer import instantiate_adapter_from_config

adapter = instantiate_adapter_from_config(task_processor_config=processor_config)
# Enqueue synchronously (returns TaskResult)
result = adapter.enqueue_task_sync(task_name="process_request", args=["hello"])
# Later, fetch status synchronously
status = adapter.get_task_status_sync(result.id)

注意

@serve.deployment 装饰器中指定的 Ray actor 选项(如 num_gpusnum_cpusresources 等)将应用于任务消费者副本。这允许您为任务处理工作负载分配特定的硬件资源。

端到端示例:文档索引#

此示例展示了如何配置处理器、构建带有处理程序的消费者、从入口点部署入队任务以及检查任务状态。

import io
import logging
import requests
from fastapi import FastAPI
from pydantic import BaseModel, HttpUrl
from PyPDF2 import PdfReader
from ray import serve
from ray.serve.schema import CeleryAdapterConfig, TaskProcessorConfig
from ray.serve.task_consumer import (
    instantiate_adapter_from_config,
    task_consumer,
    task_handler,
)

logger = logging.getLogger("ray.serve")
fastapi_app = FastAPI(title="Async PDF Processing API")

TASK_PROCESSOR_CONFIG = TaskProcessorConfig(
    queue_name="pdf_processing_queue",
    adapter_config=CeleryAdapterConfig(
        broker_url="redis://127.0.0.1:6379/0",
        backend_url="redis://127.0.0.1:6379/0",
    ),
    max_retries=3,
    failed_task_queue_name="failed_pdfs",
)

class ProcessPDFRequest(BaseModel):
    pdf_url: HttpUrl
    max_summary_paragraphs: int = 3


@serve.deployment(num_replicas=2, max_ongoing_requests=5)
@task_consumer(task_processor_config=TASK_PROCESSOR_CONFIG)
class PDFProcessor:
    """Background worker that processes PDF documents asynchronously."""

    @task_handler(name="process_pdf")
    def process_pdf(self, pdf_url: str, max_summary_paragraphs: int = 3):
        """Download PDF, extract text, and generate summary."""
        try:
            response = requests.get(pdf_url, timeout=30)
            response.raise_for_status()

            pdf_reader = PdfReader(io.BytesIO(response.content))
            if not pdf_reader.pages:
                raise ValueError("PDF contains no pages")

            full_text = "\n".join(
                page.extract_text() for page in pdf_reader.pages if page.extract_text()
            )
            if not full_text.strip():
                raise ValueError("PDF contains no extractable text")

            paragraphs = [p.strip() for p in full_text.split("\n\n") if p.strip()]
            summary = "\n\n".join(paragraphs[:max_summary_paragraphs])

            return {
                "status": "success",
                "pdf_url": pdf_url,
                "page_count": len(pdf_reader.pages),
                "word_count": len(full_text.split()),
                "summary": summary,
            }
        except requests.exceptions.RequestException as e:
            raise ValueError(f"Failed to download PDF: {str(e)}")
        except Exception as e:
            raise ValueError(f"Failed to process PDF: {str(e)}")


@serve.deployment()
@serve.ingress(fastapi_app)
class AsyncPDFAPI:
    """HTTP API for submitting and checking PDF processing tasks."""

    def __init__(self, task_processor_config: TaskProcessorConfig, handler):
        self.adapter = instantiate_adapter_from_config(task_processor_config)

    @fastapi_app.post("/process")
    async def process_pdf(self, request: ProcessPDFRequest):
        """Submit a PDF processing task and return task_id immediately."""
        task_result = self.adapter.enqueue_task_sync(
            task_name="process_pdf",
            kwargs={
                "pdf_url": str(request.pdf_url),
                "max_summary_paragraphs": request.max_summary_paragraphs,
            },
        )
        return {
            "task_id": task_result.id,
            "status": task_result.status,
            "message": "PDF processing task submitted successfully",
        }

    @fastapi_app.get("/status/{task_id}")
    async def get_status(self, task_id: str):
        """Get task status and results."""
        status = self.adapter.get_task_status_sync(task_id)
        return {
            "task_id": task_id,
            "status": status.status,
            "result": status.result if status.status == "SUCCESS" else None,
            "error": str(status.result) if status.status == "FAILURE" else None,
        }

app = AsyncPDFAPI.bind(TASK_PROCESSOR_CONFIG, PDFProcessor.bind())

在此示例中

  • DocumentIndexingConsumerdocument_indexing_queue 队列读取任务并处理它们。

  • API 通过 enqueue_task_sync 入队任务,并通过 get_task_status_sync 获取状态。

  • consumer 传递到 API.__init__ 中,可确保两个部署都成为 Serve 应用程序图的一部分。

并发性和可靠性#

通过在消费者部署上设置 max_ongoing_requests 来管理并发,此设置会限制每个副本可以同时处理的任务数量。对于至少一次传递,适配器应仅在处理程序成功完成后才确认任务。失败的任务最多重试 max_retries 次;重试次数用尽后,如果已配置,它们将被路由到失败任务的 DLQ。默认的 Celery 适配器在成功时确认,提供至少一次处理。

死信队列(DLQs)#

死信队列处理两种类型的问题任务

  • 无法处理的任务:系统将没有匹配处理程序的任务路由到 unprocessable_task_queue_name(如果已设置)。

  • 失败的任务:系统将经过重试用尽后引发应用程序异常的任务、参数不匹配的任务以及其他错误的任务路由到 failed_task_queue_name(如果已设置)。

滚动更新和兼容性#

在部署升级期间,旧的消费者副本和新的消费者副本可能会同时运行并从同一队列中拉取任务。如果任务模式或名称发生更改,任一版本都可能遇到不兼容的任务。

建议

  • 对任务名称和有效负载进行版本化,以便跨版本共存。

  • 在耗尽旧任务之前,不要删除处理程序

  • 监控 DLQs 以处理反序列化或处理程序解析失败,并根据需要重新入队或转换。

限制#

  • Ray Serve 仅支持同步的 @task_handler 方法。

  • 外部(非 Serve)工作器超出范围;所有消费者都作为 Serve 部署运行。

  • 传递保证最终取决于配置的代理。当您未配置结果后端时,结果是可选的。

注意

本指南中的 API 反映了 ray.serve.schemaray.serve.task_consumer 中的 Alpha 接口。