End-to-end: Offline Batch Inference#

Offline batch inference is the process of generating model predictions on a fixed set of input data. Ray Data offers an efficient and scalable solution for batch inference, providing faster execution and cost effectiveness for deep learning applications.

[Figure: ../_images/stream-example.png]

Note

This guide focuses mainly on batch inference with deep learning frameworks. For more information on batch inference with LLMs, see Working with LLMs.

Quickstart#

To start, install Ray Data:

pip install -U "ray[data]"

Using Ray Data for offline inference involves four basic steps:

  • Step 1: Load your data into a Ray Dataset. Ray Data supports many different datasources and formats. For more details, see Loading Data.

  • Step 2: Define a Python class to load the pre-trained model.

  • Step 3: Transform your dataset using the pre-trained model by calling ds.map_batches(). For more details, see Transforming Data.

  • Step 4: Get the final predictions by either iterating through the output or saving the results (a short sketch of saving the output follows the framework examples below). For more details, see the Iterating over Data and Saving Data user guides.

For more in-depth examples for your use case, see the batch inference examples. For how to configure batch inference, see the configuration guide.

from typing import Dict
import numpy as np

import ray

# Step 1: Create a Ray Dataset from in-memory Numpy arrays.
# You can also create a Ray Dataset from many other sources and file
# formats.
ds = ray.data.from_numpy(np.asarray(["Complete this", "for me"]))

# Step 2: Define a Predictor class for inference.
# Use a class to initialize the model just once in `__init__`
# and reuse it for inference across multiple batches.
class HuggingFacePredictor:
    def __init__(self):
        from transformers import pipeline
        # Initialize a pre-trained GPT2 Huggingface pipeline.
        self.model = pipeline("text-generation", model="gpt2")

    # Logic for inference on 1 batch of data.
    def __call__(self, batch: Dict[str, np.ndarray]) -> Dict[str, list]:
        # Get the predictions from the input batch.
        predictions = self.model(list(batch["data"]), max_length=20, num_return_sequences=1)
        # `predictions` is a list of length-one lists. For example:
        # [[{'generated_text': 'output_1'}], ..., [{'generated_text': 'output_2'}]]
        # Modify the output to get it into the following format instead:
        # ['output_1', 'output_2']
        batch["output"] = [sequences[0]["generated_text"] for sequences in predictions]
        return batch

# Step 3: Map the Predictor over the Dataset to get predictions.
# Use 2 parallel actors for inference. Each actor predicts on a
# different partition of data.
predictions = ds.map_batches(HuggingFacePredictor, compute=ray.data.ActorPoolStrategy(size=2))
# Step 4: Show one prediction output.
predictions.show(limit=1)
{'data': 'Complete this', 'output': 'Complete this information or purchase any item from this site.\n\nAll purchases are final and non-'}
from typing import Dict
import numpy as np
import torch
import torch.nn as nn

import ray

# Step 1: Create a Ray Dataset from in-memory Numpy arrays.
# You can also create a Ray Dataset from many other sources and file
# formats.
ds = ray.data.from_numpy(np.ones((1, 100)))

# Step 2: Define a Predictor class for inference.
# Use a class to initialize the model just once in `__init__`
# and reuse it for inference across multiple batches.
class TorchPredictor:
    def __init__(self):
        # Load a dummy neural network.
        # Set `self.model` to your pre-trained PyTorch model.
        self.model = nn.Sequential(
            nn.Linear(in_features=100, out_features=1),
            nn.Sigmoid(),
        )
        self.model.eval()

    # Logic for inference on 1 batch of data.
    def __call__(self, batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
        tensor = torch.as_tensor(batch["data"], dtype=torch.float32)
        with torch.inference_mode():
            # Get the predictions from the input batch.
            return {"output": self.model(tensor).numpy()}

# Step 3: Map the Predictor over the Dataset to get predictions.
# Use 2 parallel actors for inference. Each actor predicts on a
# different partition of data.
predictions = ds.map_batches(TorchPredictor, compute=ray.data.ActorPoolStrategy(size=2))
# Step 4: Show one prediction output.
predictions.show(limit=1)
{'output': array([0.5590901], dtype=float32)}
from typing import Dict
import numpy as np

import ray

# Step 1: Create a Ray Dataset from in-memory Numpy arrays.
# You can also create a Ray Dataset from many other sources and file
# formats.
ds = ray.data.from_numpy(np.ones((1, 100)))

# Step 2: Define a Predictor class for inference.
# Use a class to initialize the model just once in `__init__`
# and reuse it for inference across multiple batches.
class TFPredictor:
    def __init__(self):
        from tensorflow import keras

        # Load a dummy neural network.
        # Set `self.model` to your pre-trained Keras model.
        input_layer = keras.Input(shape=(100,))
        output_layer = keras.layers.Dense(1, activation="sigmoid")
        self.model = keras.Sequential([input_layer, output_layer])

    # Logic for inference on 1 batch of data.
    def __call__(self, batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
        # Get the predictions from the input batch.
        return {"output": self.model(batch["data"]).numpy()}

# Step 3: Map the Predictor over the Dataset to get predictions.
# Use 2 parallel actors for inference. Each actor predicts on a
# different partition of data.
predictions = ds.map_batches(TFPredictor, compute=ray.data.ActorPoolStrategy(size=2))
# Step 4: Show one prediction output.
predictions.show(limit=1)
{'output': array([0.625576], dtype=float32)}
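
To round out Step 4 from the list above, here is a minimal sketch of saving or iterating over the predictions instead of printing them. The stand-in predict function and the /tmp/predictions output path are placeholders, not part of the examples above:

from typing import Dict
import numpy as np

import ray

ds = ray.data.from_numpy(np.ones((1, 100)))

def predict(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
    # Stand-in for a real model: collapse each row to a single score.
    return {"output": batch["data"].sum(axis=1)}

predictions = ds.map_batches(predict)

# Step 4 (saving): write the predictions out as Parquet files.
predictions.write_parquet("/tmp/predictions")

# Step 4 (iterating): loop over the output batches directly.
for batch in predictions.iter_batches(batch_size=1, batch_format="numpy"):
    print(batch["output"])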

Ray Data provides native integration with vLLM, a high-performance inference engine for large language models (LLMs).

import ray
from ray.data.llm import vLLMEngineProcessorConfig, build_llm_processor
import numpy as np

config = vLLMEngineProcessorConfig(
    model="unsloth/Llama-3.1-8B-Instruct",
    engine_kwargs={
        "enable_chunked_prefill": True,
        "max_num_batched_tokens": 4096,
        "max_model_len": 16384,
    },
    concurrency=1,
    batch_size=64,
)
processor = build_llm_processor(
    config,
    preprocess=lambda row: dict(
        messages=[
            {"role": "system", "content": "You are a bot that responds with haikus."},
            {"role": "user", "content": row["item"]}
        ],
        sampling_params=dict(
            temperature=0.3,
            max_tokens=250,
        )
    ),
    postprocess=lambda row: dict(
        answer=row["generated_text"]
    ),
)

ds = ray.data.from_items(["Start of the haiku is: Complete this for me..."])

ds = processor(ds)
ds.show(limit=1)
{'answer': 'Snowflakes gently fall\nBlanketing the winter scene\nFrozen peaceful hush'}

Configuration and troubleshooting#

Using GPUs for inference#

To use GPUs for inference, make the following changes to your code:

  1. Update the class implementation to move the model and data to and from the GPU.

  2. Specify num_gpus=1 in the ds.map_batches() call to indicate that each actor should use 1 GPU.

  3. Specify a batch_size for inference. For more details on how to configure the batch size, see Configuring batch size.

The rest is the same as in the Quickstart.

from typing import Dict
import numpy as np

import ray

ds = ray.data.from_numpy(np.asarray(["Complete this", "for me"]))

class HuggingFacePredictor:
    def __init__(self):
        from transformers import pipeline
        # Set "cuda:0" as the device so the Huggingface pipeline uses GPU.
        self.model = pipeline("text-generation", model="gpt2", device="cuda:0")

    def __call__(self, batch: Dict[str, np.ndarray]) -> Dict[str, list]:
        predictions = self.model(list(batch["data"]), max_length=20, num_return_sequences=1)
        batch["output"] = [sequences[0]["generated_text"] for sequences in predictions]
        return batch

# Use 2 actors, each actor using 1 GPU. 2 GPUs total.
predictions = ds.map_batches(
    HuggingFacePredictor,
    num_gpus=1,
    # Specify the batch size for inference.
    # Increase this for larger datasets.
    batch_size=1,
    # Set the concurrency to the number of GPUs in your cluster.
    compute=ray.data.ActorPoolStrategy(size=2),
)
predictions.show(limit=1)
{'data': 'Complete this', 'output': 'Complete this poll. Which one do you think holds the most promise for you?\n\nThank you'}
from typing import Dict
import numpy as np
import torch
import torch.nn as nn

import ray

ds = ray.data.from_numpy(np.ones((1, 100)))

class TorchPredictor:
    def __init__(self):
        # Move the neural network to GPU device by specifying "cuda".
        self.model = nn.Sequential(
            nn.Linear(in_features=100, out_features=1),
            nn.Sigmoid(),
        ).cuda()
        self.model.eval()

    def __call__(self, batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
        # Move the input batch to GPU device by specifying "cuda".
        tensor = torch.as_tensor(batch["data"], dtype=torch.float32, device="cuda")
        with torch.inference_mode():
            # Move the prediction output back to CPU before returning.
            return {"output": self.model(tensor).cpu().numpy()}

# Use 2 actors, each actor using 1 GPU. 2 GPUs total.
predictions = ds.map_batches(
    TorchPredictor,
    num_gpus=1,
    # Specify the batch size for inference.
    # Increase this for larger datasets.
    batch_size=1,
    # Set the concurrency to the number of GPUs in your cluster.
    compute=ray.data.ActorPoolStrategy(size=2),
)
predictions.show(limit=1)
{'output': array([0.5590901], dtype=float32)}
from typing import Dict
import numpy as np

import ray

ds = ray.data.from_numpy(np.ones((1, 100)))

class TFPredictor:
    def __init__(self):
        import tensorflow as tf
        from tensorflow import keras

        # Move the neural network to GPU by specifying the GPU device.
        with tf.device("GPU:0"):
            input_layer = keras.Input(shape=(100,))
            output_layer = keras.layers.Dense(1, activation="sigmoid")
            self.model = keras.Sequential([input_layer, output_layer])

    def __call__(self, batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
        import tensorflow as tf

        # Move the input batch to GPU by specifying GPU device.
        with tf.device("GPU:0"):
            return {"output": self.model(batch["data"]).numpy()}

# Use 2 actors, each actor using 1 GPU. 2 GPUs total.
predictions = ds.map_batches(
    TFPredictor,
    num_gpus=1,
    # Specify the batch size for inference.
    # Increase this for larger datasets.
    batch_size=1,
    # Set the concurrency to the number of GPUs in your cluster.
    compute=ray.data.ActorPoolStrategy(size=2),
)
predictions.show(limit=1)
{'output': array([0.625576], dtype=float32)}

Configuring batch size#

Configure the size of the input batch that's passed to __call__ by setting the batch_size argument for ds.map_batches().

Increasing the batch size results in faster execution because inference is a vectorized operation. For GPU inference, increasing the batch size also increases GPU utilization. Set the batch size as large as possible without running out of memory. If you encounter out-of-memory errors, decreasing batch_size may help.

from typing import Dict
import numpy as np

import ray

ds = ray.data.from_numpy(np.ones((10, 100)))

def assert_batch(batch: Dict[str, np.ndarray]):
    # Each batch holds 2 rows of the "data" column.
    assert len(batch["data"]) == 2
    return batch

# Specify that each input batch should be of size 2.
ds.map_batches(assert_batch, batch_size=2)

Note

The default batch_size of 4096 may be too large for datasets with large rows (for example, tables with many columns or a collection of large images).
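
As a minimal sketch of shrinking the batch size for such data, the dummy image-like rows and the ImagePredictor class below are hypothetical stand-ins for your own dataset and model:

from typing import Dict
import numpy as np

import ray

# Hypothetical dataset of 64 large rows (512 x 512 x 3 floats each).
ds = ray.data.from_numpy(np.zeros((64, 512, 512, 3), dtype=np.float32))

class ImagePredictor:
    def __call__(self, batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
        # Placeholder "model": average each image down to a single score.
        return {"output": batch["data"].mean(axis=(1, 2, 3))}

# Override the 4096-row default with a much smaller batch to bound memory.
predictions = ds.map_batches(
    ImagePredictor,
    batch_size=32,
    compute=ray.data.ActorPoolStrategy(size=2),
)
predictions.show(limit=1)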

Handling GPU out-of-memory failures#

If you run into CUDA out-of-memory issues, your batch size is likely too large. Decrease the batch size by following these steps. If your batch size is already set to 1, then use either a smaller model or a GPU device with more memory.

For advanced users working with large models, you can use model parallelism to shard the model across multiple GPUs.
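
As a rough sketch of that approach, assuming the transformers and accelerate packages are installed, you can let Hugging Face shard a model across the GPUs assigned to each actor by passing device_map="auto"; the model name and the 2-GPU request below are placeholders rather than a recommendation:

from typing import Dict
import numpy as np

import ray

ds = ray.data.from_numpy(np.asarray(["Complete this", "for me"]))

class ShardedPredictor:
    def __init__(self):
        from transformers import pipeline
        # device_map="auto" (via accelerate) spreads the model's layers
        # across all GPUs visible to this actor instead of a single device.
        self.model = pipeline("text-generation", model="gpt2", device_map="auto")

    def __call__(self, batch: Dict[str, np.ndarray]) -> Dict[str, list]:
        predictions = self.model(list(batch["data"]), max_length=20, num_return_sequences=1)
        batch["output"] = [sequences[0]["generated_text"] for sequences in predictions]
        return batch

predictions = ds.map_batches(
    ShardedPredictor,
    # Give each actor 2 GPUs so the sharded model has room to fit.
    num_gpus=2,
    batch_size=1,
    compute=ray.data.ActorPoolStrategy(size=1),
)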

Optimizing expensive CPU preprocessing#

If your workload involves expensive CPU preprocessing in addition to model inference, you can optimize throughput by separating the preprocessing and inference logic into separate operations. This separation allows inference on batch N to execute concurrently with preprocessing on batch N+1.

For an example where preprocessing is done in a separate map call, see Image Classification Batch Inference with PyTorch ResNet18.
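
A minimal sketch of this pattern follows; the normalization step stands in for your own expensive CPU preprocessing, and DummyPredictor stands in for the model:

from typing import Dict
import numpy as np

import ray

ds = ray.data.from_numpy(np.ones((32, 100)))

# CPU-heavy preprocessing runs as stateless tasks.
def preprocess_fn(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
    batch["data"] = (batch["data"] - batch["data"].mean()) / (batch["data"].std() + 1e-8)
    return batch

# Inference runs in a separate actor pool, so Ray Data can overlap
# preprocessing of one batch with inference on another.
class DummyPredictor:
    def __call__(self, batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
        return {"output": batch["data"].sum(axis=1)}

predictions = (
    ds.map_batches(preprocess_fn)
    .map_batches(DummyPredictor, compute=ray.data.ActorPoolStrategy(size=2))
)
predictions.show(limit=1)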

Handling CPU out-of-memory failures#

If you run out of CPU RAM, you likely have too many model replicas running concurrently on the same node. For example, if a model uses 5 GB of RAM when created or run, and a machine has 16 GB of RAM in total, then at most three of these models can run at the same time. In that case, the default resource assignment of one CPU per task/actor may lead to an OutOfMemoryError from Ray.

Suppose your cluster has 4 nodes, each with 16 CPUs. To limit to at most 3 of these actors per node, you can override the CPU or memory:

from typing import Dict
import numpy as np

import ray

ds = ray.data.from_numpy(np.asarray(["Complete this", "for me"]))

class HuggingFacePredictor:
    def __init__(self):
        from transformers import pipeline
        self.model = pipeline("text-generation", model="gpt2")

    def __call__(self, batch: Dict[str, np.ndarray]) -> Dict[str, list]:
        predictions = self.model(list(batch["data"]), max_length=20, num_return_sequences=1)
        batch["output"] = [sequences[0]["generated_text"] for sequences in predictions]
        return batch

predictions = ds.map_batches(
    HuggingFacePredictor,
    # Require 5 CPUs per actor (so at most 3 can fit per 16 CPU node).
    num_cpus=5,
    # 3 actors per node, with 4 nodes in the cluster means concurrency of 12.
    compute=ray.data.ActorPoolStrategy(size=12),
)
predictions.show(limit=1)