使用 LLM#

ray.data.llm 模块与关键的大型语言模型 (LLM) 推理引擎和已部署模型集成,以实现 LLM 批量推理。

本指南将向您展示如何使用 ray.data.llm

使用 LLM 执行批量推理#

概括来说,ray.data.llm 模块提供了一个 Processor 对象,该对象封装了在 Ray Data 数据集上执行 LLM 批量推理的逻辑。

您可以使用 build_llm_processor API 来构建处理器。以下示例使用 vLLMEngineProcessorConfigunsloth/Llama-3.1-8B-Instruct 模型构建处理器。

要运行此示例,请安装 vLLM,它是一个流行且优化的 LLM 推理引擎。

# Later versions *should* work but are not tested yet.
pip install -U vllm==0.7.2

vLLMEngineProcessorConfig 是 vLLM 引擎的配置对象。它包含模型名称、使用的 GPU 数量和分片数量,以及其他 vLLM 引擎配置。执行时,Processor 对象会实例化 vLLM 引擎的副本(底层使用 map_batches)。

import ray
from ray.data.llm import vLLMEngineProcessorConfig, build_llm_processor
import numpy as np

config = vLLMEngineProcessorConfig(
    model_source="unsloth/Llama-3.1-8B-Instruct",
    engine_kwargs={
        "enable_chunked_prefill": True,
        "max_num_batched_tokens": 4096,
        "max_model_len": 16384,
    },
    concurrency=1,
    batch_size=64,
)
processor = build_llm_processor(
    config,
    preprocess=lambda row: dict(
        messages=[
            {"role": "system", "content": "You are a bot that responds with haikus."},
            {"role": "user", "content": row["item"]}
        ],
        sampling_params=dict(
            temperature=0.3,
            max_tokens=250,
        )
    ),
    postprocess=lambda row: dict(
        answer=row["generated_text"],
        **row  # This will return all the original columns in the dataset.
    ),
)

ds = ray.data.from_items(["Start of the haiku is: Complete this for me..."])

ds = processor(ds)
ds.show(limit=1)
{'answer': 'Snowflakes gently fall\nBlanketing the winter scene\nFrozen peaceful hush'}

每个处理器都需要特定的输入列。您可以使用以下 API 找到更多信息

processor.log_input_column_names()
The first stage of the processor is ChatTemplateStage.
Required input columns:
        messages: A list of messages in OpenAI chat format. See https://platform.openai.com/docs/api-reference/chat/create for details.

某些模型可能需要指定 Hugging Face token。您可以在 runtime_env 参数中指定 token。

config = vLLMEngineProcessorConfig(
    model_source="unsloth/Llama-3.1-8B-Instruct",
    runtime_env={"env_vars": {"HF_TOKEN": "your_huggingface_token"}},
    concurrency=1,
    batch_size=64,
)

配置 vLLM 进行 LLM 推理#

使用 vLLMEngineProcessorConfig 配置 vLLM 引擎。

from ray.data.llm import vLLMEngineProcessorConfig

config = vLLMEngineProcessorConfig(
    model_source="unsloth/Llama-3.1-8B-Instruct",
    engine_kwargs={"max_model_len": 20000},
    concurrency=1,
    batch_size=64,
)

对于处理更大的模型,请指定模型并行。

config = vLLMEngineProcessorConfig(
    model_source="unsloth/Llama-3.1-8B-Instruct",
    engine_kwargs={
        "max_model_len": 16384,
        "tensor_parallel_size": 2,
        "pipeline_parallel_size": 2,
        "enable_chunked_prefill": True,
        "max_num_batched_tokens": 2048,
    },
    concurrency=1,
    batch_size=64,
)

底层的 Processor 对象会实例化 vLLM 引擎的副本,并自动配置并行 worker 来处理模型并行(如果指定了张量并行和流水线并行)。

为了优化模型加载,您可以将 load_format 配置为 runai_streamertensorizer

注意

在这种情况下,请安装包含 runai 依赖项的 vLLM:pip install -U "vllm[runai]==0.7.2"

config = vLLMEngineProcessorConfig(
    model_source="unsloth/Llama-3.1-8B-Instruct",
    engine_kwargs={"load_format": "runai_streamer"},
    concurrency=1,
    batch_size=64,
)

如果您的模型托管在 AWS S3 上,您可以在 model_source 参数中指定 S3 路径,并在 engine_kwargs 参数中指定 load_format="runai_streamer"

config = vLLMEngineProcessorConfig(
    model_source="s3://your-bucket/your-model/",  # Make sure adding the trailing slash!
    engine_kwargs={"load_format": "runai_streamer"},
    runtime_env={"env_vars": {
        "AWS_ACCESS_KEY_ID": "your_access_key_id",
        "AWS_SECRET_ACCESS_KEY": "your_secret_access_key",
        "AWS_REGION": "your_region",
    }},
    concurrency=1,
    batch_size=64,
)

要进行 multi-LoRA 批量推理,您需要在 engine_kwargs 中设置 LoRA 相关参数。详细信息请参见使用 LoRA 的 vLLM 示例

config = vLLMEngineProcessorConfig(
    model_source="unsloth/Llama-3.1-8B-Instruct",
    engine_kwargs={
        enable_lora=True,
        max_lora_rank=32,
        max_loras=1,
    },
    concurrency=1,
    batch_size=64,
)

使用 OpenAI 兼容端点进行批量推理#

您还可以调用具有 OpenAI 兼容 API 端点的已部署模型。

import ray
import os
from ray.data.llm import HttpRequestProcessorConfig, build_llm_processor

OPENAI_KEY = os.environ["OPENAI_API_KEY"]
ds = ray.data.from_items(["Hand me a haiku."])


config = HttpRequestProcessorConfig(
    url="https://api.openai.com/v1/chat/completions",
    headers={"Authorization": f"Bearer {OPENAI_KEY}"},
    qps=1,
)

processor = build_llm_processor(
    config,
    preprocess=lambda row: dict(
        payload=dict(
            model="gpt-4o-mini",
            messages=[
                {"role": "system", "content": "You are a bot that responds with haikus."},
                {"role": "user", "content": row["item"]}
            ],
            temperature=0.0,
            max_tokens=150,
        ),
    ),
    postprocess=lambda row: dict(response=row["http_response"]["choices"][0]["message"]["content"]),
)

ds = processor(ds)
print(ds.take_all())

使用情况数据收集#

收集以下功能和属性的数据以改进 Ray Data LLM

  • 用于构建 LLM 处理器的配置名称

  • 数据并行的并发用户数量

  • 请求的批处理大小

  • 用于构建 vLLMEngineProcessor 的模型架构

  • 用于构建 vLLMEngineProcessor 的任务类型

  • 用于构建 vLLMEngineProcessor 的引擎参数

  • 使用的张量并行大小和流水线并行大小

  • 使用的 GPU 类型和使用的 GPU 数量

如果您想选择退出使用情况数据收集,可以按照Ray 使用情况统计将其关闭。

生产指南#

将模型权重缓存到远程对象存储#

在将 Ray Data LLM 部署到大型集群时,模型加载可能会受到 HuggingFace 的速率限制。在这种情况下,您可以将模型缓存到远程对象存储(AWS S3 或 Google Cloud Storage)以实现更稳定的模型加载。

Ray Data LLM 提供了以下工具来帮助将模型上传到远程对象存储。

# Download model from HuggingFace, and upload to GCS
python -m ray.llm.utils.upload_model \
    --model-source facebook/opt-350m \
    --bucket-uri gs://my-bucket/path/to/facebook-opt-350m
# Or upload a local custom model to S3
python -m ray.llm.utils.upload_model \
    --model-source local/path/to/model \
    --bucket-uri s3://my-bucket/path/to/model_name

稍后,您可以在配置中使用远程对象存储 URI 作为 model_source

config = vLLMEngineProcessorConfig(
    model_source="gs://my-bucket/path/to/facebook-opt-350m",  # or s3://my-bucket/path/to/model_name
    ...
)