核心组件#
本指南将详细介绍 Ray Serve LLM 核心组件的技术实现。您将了解实现可扩展性和模块化的抽象、协议和模式。
核心抽象#
除了 LLMServer 和 OpenAiIngress 之外,Ray Serve LLM 还定义了几个核心抽象,以实现可扩展性和模块化。
LLMEngine 协议#
抽象基类 LLMEngine 定义了所有推理引擎的契约。此抽象允许 Ray Serve LLM 支持多种引擎实现(vLLM、SGLang、TensorRT-LLM 等),并提供一致的接口。
该引擎在 **OpenAI API 级别** 上运行,而不是在原始提示级别上运行。这意味着:
它接受 OpenAI 格式的请求(
ChatCompletionRequest、CompletionRequest等)。它返回 OpenAI 格式的响应。
引擎特定的细节(如分词、采样)都隐藏在该接口之后。
关键方法#
class LLMEngine(ABC):
"""Base protocol for all LLM engines."""
@abstractmethod
async def chat(
self,
request: ChatCompletionRequest
) -> AsyncGenerator[Union[str, ChatCompletionResponse, ErrorResponse], None]:
"""Run a chat completion.
Yields:
- Streaming: yield "data: <json>\\n\\n" for each chunk.
- Non-streaming: yield single ChatCompletionResponse.
- Error: yield ErrorResponse.
- In all cases, it's still a generator to unify the upper-level logic.
"""
@abstractmethod
async def completions(
self,
request: CompletionRequest
) -> AsyncGenerator[Union[str, CompletionResponse, ErrorResponse], None]:
"""Run a text completion."""
@abstractmethod
async def embeddings(
self,
request: EmbeddingRequest
) -> AsyncGenerator[Union[EmbeddingResponse, ErrorResponse], None]:
"""Generate embeddings."""
@abstractmethod
async def start(self):
"""Start the engine (async initialization)."""
@abstractmethod
async def check_health(self) -> bool:
"""Check if engine is healthy."""
@abstractmethod
async def shutdown(self):
"""Gracefully shutdown the engine."""
引擎实现#
Ray Serve LLM 提供:
VLLMEngine:使用 vLLM 的生产就绪实现。
支持连续批处理和分页注意力。
支持所有类型的并行。
KV 缓存传输,用于预填充-解码分离。
自动前缀缓存 (APC)。
LoRA 适配器支持。
未来的实现可能包括:
TensorRT-LLM:NVIDIA 的优化推理引擎。
SGLang:使用 RadixAttention 进行快速服务。
Ray Serve LLM 与 vLLM 深度集成,因为它在引擎中提供了端到端的 Ray 支持,这在 worker 的精细放置和其他优化方面具有优势。引擎抽象使得在不更改核心服务逻辑的情况下添加新实现变得非常简单。
LLMConfig#
LLMConfig 是指定部署 LLM 所需一切信息的中心配置对象。
@dataclass
class LLMConfig:
"""Configuration for LLM deployment."""
# Model loading
model_loading_config: Union[dict, ModelLoadingConfig]
# Hardware requirements
accelerator_type: Optional[str] = None # For example, "A10G", "L4", "H100"
# Placement group configuration
placement_group_config: Optional[dict] = None
# Engine-specific arguments
engine_kwargs: Optional[dict] = None
# Ray Serve deployment configuration
deployment_config: Optional[dict] = None
# LoRA adapter configuration
lora_config: Optional[Union[dict, LoraConfig]] = None
# Runtime environment (env vars, pip packages)
runtime_env: Optional[dict] = None
模型加载配置#
ModelLoadingConfig 指定了模型加载的位置和方式。以下代码展示了配置结构:
@dataclass
class ModelLoadingConfig:
"""Configuration for model loading."""
# Model identifier (used for API requests)
model_id: str
# Model source (HuggingFace or cloud storage)
model_source: Union[str, dict]
# Examples:
# - "Qwen/Qwen2.5-7B-Instruct" (HuggingFace)
# - {"bucket_uri": "s3://my-bucket/models/qwen-7b"} (S3)
LoRA 配置#
以下代码展示了为共享基础模型服务多个 LoRA 适配器的配置结构:
@dataclass
class LoraConfig:
"""Configuration for LoRA multiplexing."""
# Path to LoRA weights (local or S3/GCS)
dynamic_lora_loading_path: Optional[str] = None
# Maximum number of adapters per replica
max_num_adapters_per_replica: int = 1
Ray Serve 的多路复用功能会自动将请求路由到已加载请求的 LoRA 适配器的副本,并使用 LRU 缓存进行适配器管理。
部署协议#
Ray Serve LLM 定义了两个关键协议,组件必须实现它们:
DeploymentProtocol#
所有部署的基础协议。
class DeploymentProtocol(Protocol):
"""Base protocol for Ray Serve LLM deployments."""
@classmethod
def get_deployment_options(cls, *args, **kwargs) -> dict:
"""Return Ray Serve deployment options.
Returns:
dict: Options including:
- placement_strategy: PlacementGroup configuration
- num_replicas: Initial replica count
- autoscaling_config: Autoscaling parameters
- ray_actor_options: Ray actor options
"""
此协议确保所有部署都能提供自己的放置、扩展和资源配置。
LLMServerProtocol#
LLM 服务器部署的扩展协议。
class LLMServerProtocol(DeploymentProtocol):
"""Protocol for LLM server deployments."""
@abstractmethod
async def chat(
self,
request: ChatCompletionRequest,
raw_request: Optional[Request] = None
) -> AsyncGenerator[Union[str, ChatCompletionResponse, ErrorResponse], None]:
"""Handle chat completion request."""
@abstractmethod
async def completions(
self,
request: CompletionRequest,
raw_request: Optional[Request] = None
) -> AsyncGenerator[Union[str, CompletionResponse, ErrorResponse], None]:
"""Handle text completion request."""
@abstractmethod
async def embeddings(
self,
request: EmbeddingRequest,
raw_request: Optional[Request] = None
) -> AsyncGenerator[Union[EmbeddingResponse, ErrorResponse], None]:
"""Handle embedding request."""
此协议确保所有 LLM 服务器实现(LLMServer、DPServer、PDProxyServer)提供一致的请求处理方法。
Builder 模式#
Ray Serve LLM 使用 Builder 模式将类定义与部署装饰分离。这提供了灵活性和可测试性。
关键原则:类不使用 @serve.deployment 进行装饰。装饰在 builder 函数中进行。
为什么使用 Builder?#
Builder 提供了两个主要优势:
灵活性:同一个类可以有不同的部署配置。
生产就绪:您可以在 YAML 文件中使用 Builder,并使用目标 Builder 模块运行
serve run config.yaml。
Builder 示例#
def my_build_function(
llm_config: LLMConfig,
) -> Deployment:
# Get default options from the class
serve_options = LLMServer.get_deployment_options(llm_config)
# Merge with user-provided options
serve_options.update(kwargs)
# Decorate and bind
return serve.deployment(deployment_cls).options(
**serve_options
).bind(llm_config)
您可以通过两种方式使用 Builder 函数:
# serve.py
from ray import serve
from ray.serve.llm import LLMConfig
from my_module import my_build_function
llm_config = LLMConfig(
model_loading_config=dict(
model_id="qwen-0.5b",
model_source="Qwen/Qwen2.5-0.5B-Instruct",
),
accelerator_type="A10G",
deployment_config=dict(
autoscaling_config=dict(
min_replicas=1,
max_replicas=2,
)
),
)
app = my_build_function(llm_config)
serve.run(app)
运行部署
python serve.py
# config.yaml
applications:
- args:
llm_config:
model_loading_config:
model_id: qwen-0.5b
model_source: Qwen/Qwen2.5-0.5B-Instruct
accelerator_type: A10G
deployment_config:
autoscaling_config:
min_replicas: 1
max_replicas: 2
import_path: my_module:my_build_function
name: custom_llm_deployment
route_prefix: /
运行部署
serve run config.yaml
异步构造函数模式#
LLMServer 使用异步构造函数来处理引擎初始化。此模式可确保在部署开始处理请求之前,引擎已完全启动。
class LLMServer(LLMServerProtocol):
"""LLM server deployment."""
async def __init__(self, llm_config: LLMConfig, **kwargs):
"""Async constructor - returns fully started instance.
Ray Serve calls this constructor when creating replicas.
By the time this returns, the engine is ready to serve.
"""
super().__init__()
self._init_shared(llm_config, **kwargs)
await self.start() # Start engine immediately
def _init_shared(self, llm_config: LLMConfig, **kwargs):
"""Shared initialization logic."""
self._llm_config = llm_config
self._engine_cls = self._get_engine_class()
# ... other initialization
async def start(self):
"""Start the underlying engine."""
self.engine = self._engine_cls(self._llm_config)
await asyncio.wait_for(
self._start_engine(),
timeout=600
)
@classmethod
def sync_init(cls, llm_config: LLMConfig, **kwargs) -> "LLMServer":
"""Sync constructor for testing.
Returns unstarted instance. Caller must call await start().
"""
instance = cls.__new__(cls)
LLMServerProtocol.__init__(instance)
instance._init_shared(llm_config, **kwargs)
return instance # Not started yet!
为什么使用异步构造函数?#
异步构造函数提供了几个好处:
引擎初始化是异步的:加载模型和分配 GPU 内存需要时间。
故障检测:如果引擎启动失败,副本将立即失败。
显式控制:清晰区分服务器何时准备就绪与初始化。
测试灵活性:
sync_init允许在不启动引擎的情况下进行测试。
组件关系#
下图展示了核心组件之间的关系:
┌─────────────────────────────────────────────────────────┐
│ RAY SERVE (Foundation) │
│ @serve.deployment | DeploymentHandle | Routing │
└────────────────────────┬────────────────────────────────┘
│
┌──────────────────┼──────────────────┐
│ │ │
▼ ▼ ▼
┌──────────┐ ┌──────────┐ ┌──────────┐
│ Protocol │ │ Ingress │ │ Config │
│ │ │ │ │ │
│ • Deploy │ │ • OpenAI │ │ • LLM │
│ Proto │ │ API │ │ Config │
│ • Server │ │ • Model │ │ • Model │
│ Proto │ │ Routing│ │ Loading│
└─────┬────┘ └────┬─────┘ └────┬─────┘
│ │ │
└────────┬───────┴────────────────────┘
│
▼
┌─────────────┐
│ LLMServer │
│ │
│ Implements: │
│ • Protocol │
│ │
│ Uses: │
│ • Config │
│ • Engine │
└──────┬──────┘
│
▼
┌─────────────┐
│ LLMEngine │
│ (Protocol) │
│ │
│ Implemented │
│ by: │
│ • VLLMEngine│
│ • Future... │
└─────────────┘
扩展点#
核心架构提供了几个扩展点:
自定义引擎#
实现 LLMEngine 协议以支持新的推理后端。
class MyCustomEngine(LLMEngine):
"""Custom engine implementation."""
async def chat(self, request):
# Your implementation
pass
# ... implement other methods
自定义服务器实现#
扩展 LLMServer 或直接实现 LLMServerProtocol。
class CustomLLMServer(LLMServer):
"""Custom server with additional features."""
async def chat(self, request, raw_request=None):
# Add custom preprocessing
modified_request = self.preprocess(request)
# Call parent implementation
async for chunk in super().chat(modified_request, raw_request):
yield chunk
自定义入口#
实现您自己的入口以支持自定义 API 格式。
from typing import List
from ray import serve
from ray.serve import DeploymentHandle
# Define your FastAPI app or Ray Serve application.
# For example: app = Application()
@serve.ingress(app)
class CustomIngress:
"""Custom ingress with non-OpenAI API."""
def __init__(self, server_handles: List[DeploymentHandle]):
self.handles = server_handles
@app.post("/custom/endpoint")
async def custom_endpoint(self, request: "CustomRequest"):
# CustomRequest is a user-defined request model.
# Your custom logic
pass
自定义 Builder#
为常见模式创建特定领域的 Builder。
def build_multimodal_deployment(
model_config: dict,
**kwargs
) -> Deployment:
"""Builder for multimodal models."""
llm_config = LLMConfig(
model_loading_config={
"input_modality": InputModality.MULTIMODAL,
**model_config
},
engine_kwargs={
"task": "multimodal",
}
)
return build_llm_deployment(llm_config, **kwargs)
这些扩展点允许您为特定用例自定义 Ray Serve LLM,而无需修改核心代码。