Working with LLMs

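The ray.data.llm module integrates with key large language model (LLM) inference engines and deployed models to enable LLM batch inference.

This guide shows you how to use ray.data.llm.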

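Quickstart: vLLM batch inference

Get started with vLLM batch inference in just a few steps. This example shows the minimal setup needed to run batch inference on a dataset.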

Note

This quickstart requires a GPU, because vLLM is GPU-accelerated.

First, install Ray Data with LLM support:

pip install -U "ray[data, llm]>=2.49.1"

Here's a complete minimal example that runs batch inference:

import ray
from ray.data.llm import vLLMEngineProcessorConfig, build_processor

# Initialize Ray
ray.init()

# simple dataset
ds = ray.data.from_items([
    {"prompt": "What is machine learning?"},
    {"prompt": "Explain neural networks in one sentence."},
])

# Minimal vLLM configuration
config = vLLMEngineProcessorConfig(
    model_source="unsloth/Llama-3.1-8B-Instruct",
    concurrency=1,  # 1 vLLM engine replica
    batch_size=32,  # 32 samples per batch
    engine_kwargs={
        "max_model_len": 4096, # Fit into test GPU memory
    }
)

# Build processor
# preprocess: converts input row to format expected by vLLM (OpenAI chat format)
# postprocess: extracts generated text from vLLM output
processor = build_processor(
    config,
    preprocess=lambda row: {
        "messages": [{"role": "user", "content": row["prompt"]}],
        "sampling_params": {"temperature": 0.7, "max_tokens": 100},
    },
    postprocess=lambda row: {
        "prompt": row["prompt"],
        "response": row["generated_text"],
    },
)

# inference
ds = processor(ds)

# iterate through the results
for result in ds.iter_rows():
    print(f"Q: {result['prompt']}")
    print(f"A: {result['response']}\n")

# Alternative ways to get results:
# results = ds.take(10)  # Get first 10 results
# ds.show(limit=5)       # Print first 5 results
# ds.write_parquet("output.parquet")  # Save to file

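This example:

  1. Creates a simple dataset with prompts

  2. Configures a vLLM processor with minimal settings

  3. Builds a processor that handles preprocessing (converting prompts to OpenAI chat format) and postprocessing (extracting the generated text)

  4. Runs inference on the dataset

  5. Iterates through the results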

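The processor expects input rows with a prompt field and outputs rows with both prompt and response fields. You can consume the results with iter_rows(), take(), or show(), or save them to a file with write_parquet().

For more configuration options and advanced features, see the following sections.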
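Because preprocess and postprocess are plain Python callables, one option is to write them as named functions instead of lambdas, so you can unit-test the row transformation without a GPU. This sketch assumes only what the quickstart shows: input rows carry a prompt field, and the vLLM stage adds a generated_text field that postprocess reads.

```python
from typing import Any, Dict

# Sampling parameters copied from the quickstart's preprocess lambda.
SAMPLING_PARAMS = {"temperature": 0.7, "max_tokens": 100}


def preprocess(row: Dict[str, Any]) -> Dict[str, Any]:
    """Turn a {"prompt": ...} row into the OpenAI chat format vLLM expects."""
    return {
        "messages": [{"role": "user", "content": row["prompt"]}],
        # Copy so no two rows share the same dict object.
        "sampling_params": dict(SAMPLING_PARAMS),
    }


def postprocess(row: Dict[str, Any]) -> Dict[str, Any]:
    """Keep the original prompt and the model's generated text, drop the rest."""
    return {"prompt": row["prompt"], "response": row["generated_text"]}
```

You would then pass them as build_processor(config, preprocess=preprocess, postprocess=postprocess).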

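Perform batch inference with LLMs

At a high level, the ray.data.llm module provides a Processor object, which encapsulates the logic for performing batch inference with LLMs on a Ray Data dataset.

You can use the build_processor API to construct a processor. The following example uses vLLMEngineProcessorConfig to construct a processor for the unsloth/Llama-3.1-8B-Instruct model. Upon execution, the Processor object instantiates replicas of the vLLM engine (using map_batches under the hood).

Here's a simple configuration example: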

# Basic vLLM configuration
config = vLLMEngineProcessorConfig(
    model_source="unsloth/Llama-3.1-8B-Instruct",
    engine_kwargs={
        "enable_chunked_prefill": True,
        "max_num_batched_tokens": 4096,  # Reduce if CUDA OOM occurs
        "max_model_len": 4096,  # Constrain to fit test GPU memory
    },
    concurrency=1,
    batch_size=64,
)

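The key configuration parameters are:

  • `concurrency`: the number of vLLM engine replicas (typically 1 per node)

  • `batch_size`: the number of samples processed per batch (reduce it if GPU memory is limited)

  • `max_num_batched_tokens`: the maximum number of tokens processed at the same time (reduce it if CUDA OOM occurs)

  • `accelerator_type`: the GPU type to request, for better resource allocation (not set in this example)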

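The vLLM processor expects input in OpenAI chat format, with a 'messages' column, and outputs a 'generated_text' column containing the model's response.

Some models may require a Hugging Face token. You can specify the token in the runtime_env argument: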
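A cheap pre-flight check on a few preprocessed rows can catch formatting mistakes before you allocate GPUs. The helper validate_chat_row below is hypothetical (it is not part of ray.data.llm); it only checks the basic OpenAI chat shape, and the set of accepted roles is an assumption you may need to extend.

```python
from typing import Any, Dict

# Assumed set of roles; extend it if your prompts use other OpenAI roles.
ALLOWED_ROLES = {"system", "user", "assistant"}


def validate_chat_row(row: Dict[str, Any]) -> None:
    """Raise ValueError if row lacks a well-formed OpenAI-style 'messages' list."""
    messages = row.get("messages")
    if not isinstance(messages, list) or not messages:
        raise ValueError("row must contain a non-empty 'messages' list")
    for i, message in enumerate(messages):
        if not isinstance(message, dict):
            raise ValueError(f"message {i} must be a dict")
        role = message.get("role")
        if role not in ALLOWED_ROLES:
            raise ValueError(f"message {i} has unsupported role {role!r}")
        if "content" not in message:
            raise ValueError(f"message {i} is missing 'content'")
```

For example, you could run it on the output of your preprocess function for each row in ds.take(5) before building the processor.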

# Configuration with Hugging Face token
config_with_token = vLLMEngineProcessorConfig(
    model_source="unsloth/Llama-3.1-8B-Instruct",
    runtime_env={"env_vars": {"HF_TOKEN": "your_huggingface_token"}},
    concurrency=1,
    batch_size=64,
)
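Hard-coding a token in source code makes it easy to leak. A minimal sketch, assuming the token is already exported in the driver's environment, reads it at runtime and builds the runtime_env dictionary; the helper name hf_runtime_env is hypothetical. When no token is set it returns an empty dict, which is enough for public models.

```python
import os
from typing import Dict


def hf_runtime_env(source_var: str = "HF_TOKEN") -> Dict[str, Dict[str, str]]:
    """Build a runtime_env that forwards a Hugging Face token, if one is set."""
    token = os.environ.get(source_var)
    if not token:
        # Public models don't need a token, so forward nothing.
        return {}
    # Workers receive the token as HF_TOKEN, whatever variable it came from locally.
    return {"env_vars": {"HF_TOKEN": token}}
```

You would then pass runtime_env=hf_runtime_env() to vLLMEngineProcessorConfig.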

Configure vLLM for LLM inference

Use vLLMEngineProcessorConfig to configure the vLLM engine.

To handle larger models, specify model parallelism:

# Model parallelism configuration for larger models
# tensor_parallel_size=2: Split model across 2 GPUs for tensor parallelism
# pipeline_parallel_size=2: Use 2 pipeline stages (total 4 GPUs needed)
# Total GPUs required = tensor_parallel_size * pipeline_parallel_size = 4
config = vLLMEngineProcessorConfig(
    model_source="unsloth/Llama-3.1-8B-Instruct",
    engine_kwargs={
        "max_model_len": 16384,
        "tensor_parallel_size": 2,
        "pipeline_parallel_size": 2,
        "enable_chunked_prefill": True,
        "max_num_batched_tokens": 2048,
    },
    concurrency=1,
    batch_size=32,
    accelerator_type="L4",
)

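The underlying Processor object instantiates replicas of the vLLM engine and automatically configures parallel workers to handle model parallelism (tensor parallelism and pipeline parallelism, if specified).

To optimize model loading, you can set load_format to runai_streamer or tensorizer.

Note

In this case, install vLLM with the runai dependencies: pip install -U "vllm[runai]>=0.10.1"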
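The comments in the model parallelism configuration state that one engine replica needs tensor_parallel_size * pipeline_parallel_size GPUs. A small sketch of that arithmetic helps check whether a config fits your cluster before launching it. It assumes each of the concurrency replicas gets its own set of GPUs and that concurrency is a plain integer; the helper gpus_required is hypothetical.

```python
from typing import Any, Dict


def gpus_required(engine_kwargs: Dict[str, Any], concurrency: int = 1) -> int:
    """Estimate the GPUs needed for `concurrency` vLLM engine replicas."""
    tp = engine_kwargs.get("tensor_parallel_size", 1)
    pp = engine_kwargs.get("pipeline_parallel_size", 1)
    if min(tp, pp, concurrency) < 1:
        raise ValueError("parallel sizes and concurrency must be >= 1")
    # One replica spans tp * pp GPUs; replicas don't share GPUs (assumption).
    return concurrency * tp * pp
```

For the model parallelism example above, gpus_required({"tensor_parallel_size": 2, "pipeline_parallel_size": 2}) returns 4, matching its comment.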

# RunAI streamer configuration for optimized model loading
# Note: Install vLLM with runai dependencies: pip install -U "vllm[runai]>=0.10.1"
config = vLLMEngineProcessorConfig(
    model_source="unsloth/Llama-3.1-8B-Instruct",
    engine_kwargs={
        "load_format": "runai_streamer",
        "max_model_len": 16384,
    },
    concurrency=1,
    batch_size=64,
)

If your model is hosted on AWS S3, you can specify the S3 path in the model_source argument and set load_format="runai_streamer" in the engine_kwargs argument:

# S3 hosted model configuration
s3_config = vLLMEngineProcessorConfig(
    model_source="s3://your-bucket/your-model-path/",
    engine_kwargs={
        "load_format": "runai_streamer",
        "max_model_len": 16384,
    },
    concurrency=1,
    batch_size=64,
)
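If the same script sometimes loads a model from Hugging Face and sometimes from S3, one option is to derive engine_kwargs from model_source. The hypothetical helper engine_kwargs_for below only enables load_format="runai_streamer" for s3:// URIs, mirroring the two configurations above; treat the scheme check as an assumption to adapt for other storage.

```python
from typing import Any, Dict
from urllib.parse import urlparse


def engine_kwargs_for(model_source: str, max_model_len: int = 16384) -> Dict[str, Any]:
    """Return engine_kwargs, enabling the RunAI streamer for S3-hosted models."""
    kwargs: Dict[str, Any] = {"max_model_len": max_model_len}
    # Hugging Face model IDs such as "org/name" have no URL scheme.
    if urlparse(model_source).scheme == "s3":
        kwargs["load_format"] = "runai_streamer"
    return kwargs
```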

To run multi-LoRA batch inference, set the LoRA-related parameters in engine_kwargs. See the vLLM with LoRA example for details.

# Multi-LoRA configuration
config = vLLMEngineProcessorConfig(
    model_source="unsloth/Llama-3.1-8B-Instruct",
    engine_kwargs={
        "enable_lora": True,
        "max_lora_rank": 32,
        "max_loras": 1,
        "max_model_len": 16384,
    },
    concurrency=1,
    batch_size=32,
)
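With LoRA enabled, each row also needs to say which adapter to use. A minimal sketch, assuming (as in the vLLM with LoRA example) that the preprocessed row carries a model field naming the adapter; make_lora_preprocess and the adapter ID used below are hypothetical. In vLLM, max_loras caps the number of distinct adapters in a single batch.

```python
from typing import Any, Callable, Dict, Optional

Row = Dict[str, Any]


def make_lora_preprocess(
    adapter: str, sampling_params: Optional[Dict[str, Any]] = None
) -> Callable[[Row], Row]:
    """Build a preprocess function that routes every row to one LoRA adapter."""
    params = sampling_params or {"temperature": 0.3, "max_tokens": 250}

    def preprocess(row: Row) -> Row:
        return {
            # Assumed: the "model" field selects the LoRA adapter for this row.
            "model": adapter,
            "messages": [{"role": "user", "content": row["prompt"]}],
            "sampling_params": dict(params),
        }

    return preprocess
```

Usage might look like build_processor(config, preprocess=make_lora_preprocess("my-org/my-lora-adapter"), postprocess=...).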

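Batch inference with vision-language models (VLMs)

Ray Data LLM also supports batch inference with vision-language models. This example shows how to prepare a dataset with images and run batch inference with a vision-language model.

This example applies 2 adjustments on top of the previous example:

  • Set has_image=True in vLLMEngineProcessorConfig

  • Prepare the image input inside the preprocessor

First, install the required dependencies: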

# Install required dependencies for vision-language models
pip install "datasets>=4.0.0"

Then, load the vision dataset:

import ray


def load_vision_dataset():
    """
    Load vision dataset from Hugging Face.

    This function loads the LMMs-Eval-Lite dataset which contains:
    - Images with associated questions
    - Multiple choice answers
    - Various visual reasoning tasks
    """
    try:
        from huggingface_hub import HfFileSystem

        # Load "LMMs-Eval-Lite" dataset from Hugging Face using HfFileSystem
        path = "hf://datasets/lmms-lab/LMMs-Eval-Lite/coco2017_cap_val/"
        fs = HfFileSystem()
        vision_dataset = ray.data.read_parquet(path, filesystem=fs)

        return vision_dataset
    except ImportError:
        print(
            "huggingface_hub package not available. Install with: pip install huggingface_hub"
        )
        return None
    except Exception as e:
        print(f"Error loading dataset: {e}")
        return None


Next, configure the VLM processor with the essential settings:

vision_processor_config = vLLMEngineProcessorConfig(
    model_source="Qwen/Qwen2.5-VL-3B-Instruct",
    engine_kwargs=dict(
        tensor_parallel_size=1,
        pipeline_parallel_size=1,
        max_model_len=4096,
        enable_chunked_prefill=True,
        max_num_batched_tokens=2048,
    ),
    # Override Ray's runtime env to include the Hugging Face token. Ray Data uses Ray under the hood to orchestrate the inference pipeline.
    runtime_env=dict(
        env_vars=dict(
            # HF_TOKEN=HF_TOKEN, # Token not needed for public models
            VLLM_USE_V1="1",
        ),
    ),
    batch_size=16,
    accelerator_type="L4",
    concurrency=1,
    has_image=True,
)

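Define preprocessing and postprocessing functions that convert dataset rows into the format the VLM expects and extract the model responses:

from io import BytesIO

from PIL import Image


def vision_preprocess(row: dict) -> dict: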
    """
    Preprocessing function for vision-language model inputs.

    Converts dataset rows into the format expected by the VLM:
    - System prompt for analysis instructions
    - User message with text and image content
    - Multiple choice formatting
    - Sampling parameters
    """
    choice_indices = ["A", "B", "C", "D", "E", "F", "G", "H"]

    return {
        "messages": [
            {
                "role": "system",
                "content": (
                    "Analyze the image and question carefully, using step-by-step reasoning. "
                    "First, describe any image provided in detail. Then, present your reasoning. "
                    "And finally your final answer in this format: Final Answer: <answer> "
                    "where <answer> is: The single correct letter choice A, B, C, D, E, F, etc. when options are provided. "
                    "Only include the letter. Your direct answer if no options are given, as a single phrase or number. "
                    "IMPORTANT: Remember, to end your answer with Final Answer: <answer>."
                ),
            },
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": row["question"] + "\n\n"},
                    {
                        "type": "image",
                        # Ray Data accepts PIL Image or image URL
                        "image": Image.open(BytesIO(row["image"]["bytes"])),
                    },
                    {
                        "type": "text",
                        "text": "\n\nChoices:\n"
                        + "\n".join(
                            [
                                f"{choice_indices[i]}. {choice}"
                                for i, choice in enumerate(row["answer"])
                            ]
                        ),
                    },
                ],
            },
        ],
        "sampling_params": {
            "temperature": 0.3,
            "max_tokens": 150,
            "detokenize": False,
        },
        # Include original data for reference
        "original_data": {
            "question": row["question"],
            "answer_choices": row["answer"],
            "image_size": row["image"].get("width", 0) if row["image"] else 0,
        },
    }


def vision_postprocess(row: dict) -> dict:
    return {
        "resp": row["generated_text"],
    }
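The system prompt in vision_preprocess asks the model to end with "Final Answer: <answer>", so a postprocess that pulls out that answer makes the output easier to score. This is a sketch: the regular expression and the choice to keep the last match are assumptions about how the model formats its reply, and vision_postprocess_with_answer is a hypothetical drop-in replacement for vision_postprocess.

```python
import re
from typing import Optional

# "Final Answer:" followed by the rest of that line ("." does not match newlines).
_FINAL_ANSWER_RE = re.compile(r"Final Answer:\s*(.+)")


def extract_final_answer(text: str) -> Optional[str]:
    """Return the text after the last 'Final Answer:' marker, or None."""
    matches = _FINAL_ANSWER_RE.findall(text)
    if not matches:
        return None
    # Keep the last match in case the model repeats the marker while reasoning,
    # and drop trailing periods such as "B." -> "B".
    answer = matches[-1].strip().rstrip(".")
    return answer or None


def vision_postprocess_with_answer(row: dict) -> dict:
    text = row["generated_text"]
    return {"resp": text, "final_answer": extract_final_answer(text)}
```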


For a more comprehensive VLM configuration with advanced options:

def create_vlm_config():
    """Create VLM configuration."""
    return vLLMEngineProcessorConfig(
        model_source="Qwen/Qwen2.5-VL-3B-Instruct",
        engine_kwargs=dict(
            tensor_parallel_size=1,
            pipeline_parallel_size=1,
            max_model_len=4096,
            trust_remote_code=True,
            limit_mm_per_prompt={"image": 1},
        ),
        runtime_env={
            # "env_vars": {"HF_TOKEN": "your-hf-token-here"}  # Token not needed for public models
        },
        batch_size=1,
        accelerator_type="L4",
        concurrency=1,
        has_image=True,
    )


Finally, run the VLM inference:

def run_vlm_example():
    """Run the complete VLM example workflow."""
    config = create_vlm_config()
    vision_dataset = load_vision_dataset()

    if vision_dataset:
        # Build processor with preprocessing and postprocessing
        processor = build_processor(
            config, preprocess=vision_preprocess, postprocess=vision_postprocess
        )

        print("VLM processor configured successfully")
        print(f"Model: {config.model_source}")
        print(f"Has image support: {config.has_image}")
        result = processor(vision_dataset).take_all()
        return config, processor, result

Batch inference with embedding models

Ray Data LLM supports batch inference with embedding models using vLLM:

import ray
from ray.data.llm import vLLMEngineProcessorConfig, build_processor

embedding_config = vLLMEngineProcessorConfig(
    model_source="sentence-transformers/all-MiniLM-L6-v2",
    task_type="embed",
    engine_kwargs=dict(
        enable_prefix_caching=False,
        enable_chunked_prefill=False,
        max_model_len=256,
        enforce_eager=True,
    ),
    batch_size=32,
    concurrency=1,
    apply_chat_template=False,
    detokenize=False,
)

embedding_processor = build_processor(
    embedding_config,
    preprocess=lambda row: dict(prompt=row["text"]),
    postprocess=lambda row: {
        "text": row["prompt"],
        "embedding": row["embeddings"],
    },
)

texts = [
    "Hello world",
    "This is a test sentence",
    "Embedding models convert text to vectors",
]
ds = ray.data.from_items([{"text": text} for text in texts])

embedded_ds = embedding_processor(ds)
embedded_ds.show(limit=1)
# Example output (embedding values are illustrative and truncated):
# {'text': 'Hello world', 'embedding': [0.1, -0.2, 0.3, ...]}

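Key differences for embedding models:

  • Set task_type="embed"

  • Set apply_chat_template=False and detokenize=False

  • Use direct prompt input instead of messages

  • Access the embeddings through row["embeddings"]

For a complete embedding configuration example, see: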
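Each output row carries its vector in the embedding column set by the postprocess function. As a hedged illustration of one typical downstream use, the sketch below computes cosine similarity between two embeddings in pure Python; for large datasets you'd more likely use NumPy, and the rows would come from something like embedded_ds.take_all().

```python
import math
from typing import Sequence


def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine of the angle between two embedding vectors of equal length."""
    if len(a) != len(b):
        raise ValueError("embeddings must have the same dimension")
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        raise ValueError("cannot compare a zero vector")
    return dot / (norm_a * norm_b)
```

For example, cosine_similarity(rows[0]["embedding"], rows[1]["embedding"]) compares the first two texts.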

# Embedding model configuration
embedding_config = vLLMEngineProcessorConfig(
    model_source="sentence-transformers/all-MiniLM-L6-v2",
    task_type="embed",
    engine_kwargs=dict(
        enable_prefix_caching=False,
        enable_chunked_prefill=False,
        max_model_len=256,
        enforce_eager=True,
    ),
    batch_size=32,
    concurrency=1,
    apply_chat_template=False,
    detokenize=False,
)

# Example usage for embeddings
def create_embedding_processor():
    return build_processor(
        embedding_config,
        preprocess=lambda row: dict(prompt=row["text"]),
        postprocess=lambda row: {
            "text": row["prompt"],
            "embedding": row["embeddings"],
        },
    )


Batch inference with an OpenAI-compatible endpoint

You can also make calls to deployed models that have an OpenAI-compatible API endpoint.

import os

import ray
from ray.data.llm import HttpRequestProcessorConfig, build_processor

OPENAI_KEY = os.environ["OPENAI_API_KEY"]
ds = ray.data.from_items(["Hand me a haiku."])

config = HttpRequestProcessorConfig(
    url="https://api.openai.com/v1/chat/completions",
    headers={"Authorization": f"Bearer {OPENAI_KEY}"},
    qps=1,
)

processor = build_processor(
    config,
    preprocess=lambda row: dict(
        payload=dict(
            model="gpt-4o-mini",
            messages=[
                {
                    "role": "system",
                    "content": "You are a bot that responds with haikus.",
                },
                {"role": "user", "content": row["item"]},
            ],
            temperature=0.0,
            max_tokens=150,
        ),
    ),
    postprocess=lambda row: dict(
        response=row["http_response"]["choices"][0]["message"]["content"]
    ),
)

ds = processor(ds)
print(ds.take_all())
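The postprocess lambda above assumes every response contains choices. If the endpoint can return an error body instead, a more defensive variant keeps the pipeline running and records the problem. This is a sketch: it relies only on the http_response column used above, extract_chat_content is a hypothetical name, and the error key it checks is an assumption about the OpenAI error format.

```python
from typing import Any, Dict, Optional


def extract_chat_content(row: Dict[str, Any]) -> Dict[str, Optional[str]]:
    """Pull the first choice's message content out of an OpenAI-style response."""
    response = row.get("http_response") or {}
    choices = response.get("choices") or []
    if not choices:
        # Assumed OpenAI error shape: {"error": {"message": "..."}}.
        error = response.get("error")
        message = error.get("message") if isinstance(error, dict) else error
        return {"response": None, "error": message or "no choices returned"}
    return {"response": choices[0]["message"]["content"], "error": None}
```

You would pass it as postprocess=extract_chat_content in place of the lambda.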

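Batch inference with Serve deployments

You can configure any Serve deployment for batch inference. This is particularly useful for multi-turn conversations, where you can use a shared vLLM engine across conversations. To achieve this, create an LLM Serve deployment and use the ServeDeploymentProcessorConfig class to configure the processor.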

import ray
from ray import serve
from ray.data.llm import ServeDeploymentProcessorConfig, build_processor
from ray.serve.llm import (
    LLMConfig,
    ModelLoadingConfig,
    build_llm_deployment,
)
from ray.serve.llm.openai_api_models import CompletionRequest

llm_config = LLMConfig(
    model_loading_config=ModelLoadingConfig(
        model_id="facebook/opt-1.3b",
        model_source="facebook/opt-1.3b",
    ),
    deployment_config=dict(
        name="demo_deployment_config",
        autoscaling_config=dict(
            min_replicas=1,
            max_replicas=1,
        ),
    ),
    engine_kwargs=dict(
        enable_prefix_caching=True,
        enable_chunked_prefill=True,
        max_num_batched_tokens=4096,
    ),
)

APP_NAME = "demo_app"
DEPLOYMENT_NAME = "demo_deployment"
override_serve_options = dict(name=DEPLOYMENT_NAME)

llm_app = build_llm_deployment(
    llm_config, override_serve_options=override_serve_options
)
app = serve.run(llm_app, name=APP_NAME)
config = ServeDeploymentProcessorConfig(
    deployment_name=DEPLOYMENT_NAME,
    app_name=APP_NAME,
    dtype_mapping={
        "CompletionRequest": CompletionRequest,
    },
    concurrency=1,
    batch_size=64,
)

processor1 = build_processor(
    config,
    preprocess=lambda row: dict(
        method="completions",
        dtype="CompletionRequest",
        request_kwargs=dict(
            model="facebook/opt-1.3b",
            prompt=f"This is a prompt for {row['id']}",
            stream=False,
        ),
    ),
    postprocess=lambda row: dict(
        prompt=row["choices"][0]["text"],
    ),
)

processor2 = build_processor(
    config,
    preprocess=lambda row: dict(
        method="completions",
        dtype="CompletionRequest",
        request_kwargs=dict(
            model="facebook/opt-1.3b",
            prompt=row["prompt"],
            stream=False,
        ),
    ),
    postprocess=lambda row: row,
)

ds = ray.data.range(10)
ds = processor2(processor1(ds))
print(ds.take_all())

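Cross-node parallelism

Ray Data LLM supports cross-node parallelism, including tensor parallelism and pipeline parallelism. You can configure the parallelism level through the engine_kwargs argument in vLLMEngineProcessorConfig. Use ray as the distributed executor backend to enable cross-node parallelism.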

config = vLLMEngineProcessorConfig(
    model_source="unsloth/Llama-3.1-8B-Instruct",
    engine_kwargs={
        "enable_chunked_prefill": True,
        "max_num_batched_tokens": 4096,
        "max_model_len": 16384,
        "pipeline_parallel_size": 4,
        "tensor_parallel_size": 4,
        "distributed_executor_backend": "ray",
    },
    batch_size=32,
    concurrency=1,
)

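Additionally, you can customize the placement group strategy to control how Ray places vLLM engine workers across nodes. While you can specify the degrees of tensor and pipeline parallelism, the vLLM engine manages the specific assignment of model ranks to GPUs, and you can't configure it directly through the Ray Data LLM API.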

config = vLLMEngineProcessorConfig(
    model_source="unsloth/Llama-3.1-8B-Instruct",
    engine_kwargs={
        "enable_chunked_prefill": True,
        "max_num_batched_tokens": 4096,
        "max_model_len": 16384,
        "pipeline_parallel_size": 2,
        "tensor_parallel_size": 2,
        "distributed_executor_backend": "ray",
    },
    batch_size=32,
    concurrency=1,
    placement_group_config={
        "bundles": [{"GPU": 1}] * 4,
        "strategy": "STRICT_PACK",
    },
)
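In the example above there is one single-GPU bundle per engine worker, so the bundle count equals tensor_parallel_size * pipeline_parallel_size. A sketch that derives the bundles from engine_kwargs keeps the two in sync; make_placement_group_config is a hypothetical helper. Keep in mind Ray's strategy semantics: STRICT_PACK requires all bundles on a single node, while PACK lets Ray spill bundles to other nodes when one node can't hold them all.

```python
from typing import Any, Dict, List

# Ray placement group strategies.
VALID_STRATEGIES = {"PACK", "SPREAD", "STRICT_PACK", "STRICT_SPREAD"}


def make_placement_group_config(
    engine_kwargs: Dict[str, Any], strategy: str = "STRICT_PACK"
) -> Dict[str, Any]:
    """Build a placement_group_config with one 1-GPU bundle per engine worker."""
    if strategy not in VALID_STRATEGIES:
        raise ValueError(f"unknown placement strategy: {strategy}")
    tp = engine_kwargs.get("tensor_parallel_size", 1)
    pp = engine_kwargs.get("pipeline_parallel_size", 1)
    # Separate dicts per bundle, so editing one bundle later can't affect the others.
    bundles: List[Dict[str, int]] = [{"GPU": 1} for _ in range(tp * pp)]
    return {"bundles": bundles, "strategy": strategy}
```

The list comprehension is a small design choice over [{"GPU": 1}] * 4, which repeats a reference to one shared dict.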

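Besides cross-node parallelism, you can also scale the LLM stage horizontally across multiple nodes. Configure the number of replicas with the concurrency argument in vLLMEngineProcessorConfig.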

config = vLLMEngineProcessorConfig(
    model_source="unsloth/Llama-3.1-8B-Instruct",
    engine_kwargs={
        "enable_chunked_prefill": True,
        "max_num_batched_tokens": 4096,
        "max_model_len": 16384,
    },
    concurrency=10,
    batch_size=64,
)

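Usage data collection

Data for the following features and attributes is collected to improve Ray Data LLM:

  • Config name used for building the LLM processor

  • Concurrency used for data parallelism

  • Batch size of requests

  • Model architecture used for building vLLMEngineProcessor

  • Task type used for building vLLMEngineProcessor

  • Engine arguments used for building vLLMEngineProcessor

  • Tensor parallel size and pipeline parallel size used

  • GPU type used and number of GPUs used

If you would like to opt out of usage data collection, you can follow Ray usage stats to turn it off.

Frequently asked questions (FAQs)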

GPU memory management and CUDA OOM prevention

If you encounter CUDA out-of-memory errors, Ray Data LLM provides several configuration options to optimize GPU memory usage:

# GPU memory management configuration
# If you encounter CUDA out of memory errors, try these optimizations:
config_memory_optimized = vLLMEngineProcessorConfig(
    model_source="unsloth/Llama-3.1-8B-Instruct",
    engine_kwargs={
        "max_model_len": 8192,
        "max_num_batched_tokens": 2048,
        "enable_chunked_prefill": True,
        "gpu_memory_utilization": 0.85,
        "block_size": 16,
    },
    concurrency=1,
    batch_size=16,
)

# For very large models or limited GPU memory:
config_minimal_memory = vLLMEngineProcessorConfig(
    model_source="unsloth/Llama-3.1-8B-Instruct",
    engine_kwargs={
        "max_model_len": 4096,
        "max_num_batched_tokens": 1024,
        "enable_chunked_prefill": True,
        "gpu_memory_utilization": 0.75,
    },
    concurrency=1,
    batch_size=8,
)

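Key strategies for handling GPU memory issues:

  • Reduce batch size: start with small batches (8-16) and increase gradually

  • Lower `max_num_batched_tokens`: reduce it from 4096 to 2048 or 1024

  • Reduce `max_model_len`: use shorter context lengths when possible

  • Set `gpu_memory_utilization`: use 0.75-0.85 instead of the default 0.90

  • Use smaller models: consider smaller model variants in resource-constrained environments

If you encounter CUDA out-of-memory errors, your batch size is likely too large. Set an explicit small batch size, use a smaller model, or use a larger GPU.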
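The strategies above amount to stepping a few knobs down until the job fits. A sketch of that progression: the hypothetical generator memory_backoff halves batch_size and max_num_batched_tokens down to the floors suggested above (8 and 1024). Wiring it into a retry loop that rebuilds vLLMEngineProcessorConfig after each OOM is an assumption about your workflow and is left out here.

```python
from typing import Iterator, Tuple


def memory_backoff(
    batch_size: int = 64,
    max_num_batched_tokens: int = 4096,
    min_batch_size: int = 8,
    min_batched_tokens: int = 1024,
) -> Iterator[Tuple[int, int]]:
    """Yield progressively smaller (batch_size, max_num_batched_tokens) pairs."""
    b, t = batch_size, max_num_batched_tokens
    while True:
        yield b, t
        # Stop once both knobs have reached their floors.
        if b <= min_batch_size and t <= min_batched_tokens:
            return
        b = max(min_batch_size, b // 2)
        t = max(min_batched_tokens, t // 2)
```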

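How to cache model weights to remote object storage

When deploying Ray Data LLM to a large cluster, model loading may be rate-limited by Hugging Face. In this case, you can cache the model in remote object storage (AWS S3 or Google Cloud Storage) for more stable model loading.

Ray Data LLM provides the following utility to help upload models to remote object storage: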

# Download model from HuggingFace, and upload to GCS
python -m ray.llm.utils.upload_model \
    --model-source facebook/opt-350m \
    --bucket-uri gs://my-bucket/path/to/facebook-opt-350m
# Or upload a local custom model to S3
python -m ray.llm.utils.upload_model \
    --model-source local/path/to/model \
    --bucket-uri s3://my-bucket/path/to/model_name

Then you can use the remote object storage URI as the model_source in your config:

# S3 hosted model configuration
s3_config = vLLMEngineProcessorConfig(
    model_source="s3://your-bucket/your-model-path/",
    engine_kwargs={
        "load_format": "runai_streamer",
        "max_model_len": 16384,
    },
    concurrency=1,
    batch_size=64,
)