使用 PyTorch 进行图像分类批量推理#

try-anyscale-quickstart-image-classification-batch-inference-with-pytorch

在此示例中,我们将介绍如何使用 Ray Data 进行使用多个 GPU worker 的大规模批量推理。

具体来说,我们将

  • 从 S3 存储桶加载 Imagenette 数据集并创建 Ray Dataset。

  • 加载预训练的 ResNet 模型。

  • 使用 Ray Data 预处理数据集并在多个 GPU 上并行进行模型推理

  • 评估预测结果并将结果保存到 S3/本地磁盘。

即使你没有可用的 GPU,此示例仍然可以工作,但整体性能会较慢。

请参阅此批量推理指南,了解如何调整此示例以使用你自己的模型和数据集的技巧和故障排除!

要运行此示例,你需要以下包

!pip install -q "ray[data]" torch torchvision

步骤 1:从 S3 读取数据集#

Imagenette 是 Imagenet 的一个子集,包含 10 个类别。我们将此数据集公开托管在 S3 存储桶中。由于我们这里只进行推理,因此我们只加载验证集。

在这里,我们使用 ray.data.read_images 从 S3 加载验证集。Ray Data 还支持从各种其他数据源和格式读取数据。

import ray

s3_uri = "s3://anonymous@air-example-data-2/imagenette2/train/"

ds = ray.data.read_images(s3_uri, mode="RGB")
ds
2023-06-27 23:23:57,184	INFO worker.py:1452 -- Connecting to existing Ray cluster at address: 10.0.5.141:6379...
2023-06-27 23:23:57,228	INFO worker.py:1627 -- Connected to Ray cluster. View the dashboard at https://session-kncgqf3p7w2j7qcsnz2safl4tj.i.anyscaleuserdata-staging.com 
2023-06-27 23:23:57,243	INFO packaging.py:347 -- Pushing file package 'gcs://_ray_pkg_32ef287a3a39e82021e70d2413880a69.zip' (4.49MiB) to Ray cluster...
2023-06-27 23:23:57,257	INFO packaging.py:360 -- Successfully pushed file package 'gcs://_ray_pkg_32ef287a3a39e82021e70d2413880a69.zip'.
2023-06-27 23:23:59,629	WARNING dataset.py:253 -- Important: Ray Data requires schemas for all datasets in Ray 2.5. This means that standalone Python objects are no longer supported. In addition, the default batch format is fixed to NumPy. To revert to legacy behavior temporarily, set the environment variable RAY_DATA_STRICT_MODE=0 on all cluster processes.

Learn more here: https://docs.rayai.org.cn/en/master/data/faq.html#migrating-to-strict-mode

检查 schema,我们可以看到数据集中有 1 列包含以 Numpy 数组形式存储的图像。

ds.schema()
Column  Type
------  ----
image   numpy.ndarray(ndim=3, dtype=uint8)

步骤 2:在单个批次上进行推理#

接下来,我们可以使用预训练的 ResNet152 模型,并按照此 PyTorch 示例,在单个批次数据上进行推理。

让我们从数据集中获取 10 个图像的批次。批次中的每个图像都表示为一个 Numpy 数组。

single_batch = ds.take_batch(10)

我们可以可视化此批次中的 1 个图像。

from PIL import Image

img = Image.fromarray(single_batch["image"][0])
img
../../_images/9b7bb855d83f101a312765bb1d3b489ffea958fe5486e0fb9997c444f481099a.png

现在,让我们下载一个预训练的 PyTorch Resnet 模型,并获取必要的预处理转换,以便在预测前对图像进行预处理。

import torch
from torchvision.models import ResNet152_Weights
from torchvision import transforms
from torchvision import models

weights = ResNet152_Weights.IMAGENET1K_V1

# Load the pretrained resnet model and move to GPU if one is available.

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

model = models.resnet152(weights=weights).to(device)
model.eval()

imagenet_transforms = weights.transforms
transform = transforms.Compose([transforms.ToTensor(), imagenet_transforms()])

然后,我们将这些转换应用到我们的图像批次,并将批次传递给模型进行推理,确保使用 GPU 设备进行推理。

我们可以看到批次中的大多数图像都被正确分类为“tench”,这是一种鱼类。

transformed_batch = [transform(image) for image in single_batch["image"]]
with torch.inference_mode():
    prediction_results = model(torch.stack(transformed_batch).to(device))
    classes = prediction_results.argmax(dim=1).cpu()

del model  # Free up GPU memory

labels = [weights.meta["categories"][i] for i in classes]
labels
['tench',
 'tench',
 'tench',
 'tench',
 'tench',
 'tench',
 'tench',
 'tench',
 'bittern',
 'tench']

步骤 3:使用 Ray Data 扩展到整个数据集#

通过使用 Ray Data,我们可以应用上一节中的相同逻辑,将处理扩展到整个数据集,利用集群中的所有 GPU。

预处理#

首先,让我们将预处理代码转换为 Ray Data。我们将预处理代码打包在一个 preprocess_image 函数中。此函数应只接受一个参数,该参数是一个字典,包含数据集中的单个图像,表示为 numpy 数组。我们使用上面定义的相同 transform 函数,并将转换后的图像存储在一个新的 transformed_image 字段中。

请注意,通常为了效率,我们只会保留转换后的图像。但是,为了本教程的目的,我们也保留了原始图像,以便在最后与预测结果一起显示它们。

import numpy as np
from typing import Dict

def preprocess_image(row: Dict[str, np.ndarray]):
    return {
        "original_image": row["image"],
        "transformed_image": transform(row["image"]),
    }

然后我们使用 map() 方法将函数逐行应用于整个数据集。我们使用它而不是 map_batches(),因为 torchvision 转换必须一次应用于一张图像,这是由于数据集中包含不同大小的图像。

通过使用 Ray Data 的 map() 方法,我们可以扩展预处理,利用 Ray 集群中的所有资源。

“注意:map() 方法是惰性的。直到我们使用 iter_batches()take() 等方法消费结果后,它才会执行。”

transformed_ds = ds.map(preprocess_image)
2023-06-27 23:25:59,387	WARNING dataset.py:4384 -- The `map`, `flat_map`, and `filter` operations are unvectorized and can be very slow. If you're using a vectorized transformation, consider using `.map_batches()` instead.

模型推理#

接下来,让我们转换模型推理部分。与预处理相比,模型推理有 2 个区别

  1. 模型加载和初始化通常开销较大。

  2. 如果我们批量处理数据,模型推理可以通过硬件加速进行优化。使用更大的批次可以提高 GPU 利用率和推理作业的总体运行时。

因此,我们将模型推理代码转换为以下 ResnetModel 类。在此类中,我们将开销较大的模型加载和初始化代码放在 __init__ 构造函数中,它只会运行一次。我们将模型推理代码放在 __call__ 方法中,该方法将为每个批次调用。

__call__ 方法接受一批数据项,而不是单个数据项。在这种情况下,该批次是一个字典,其中包含由我们的预处理步骤填充的 "transformed_image" 键,对应的值是表示为 np.ndarray 格式的图像 Numpy 数组。我们重用步骤 2 中的相同推理逻辑。

from typing import Dict
import numpy as np
import torch


class ResnetModel:
    def __init__(self):
        self.weights = ResNet152_Weights.IMAGENET1K_V1
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.model = models.resnet152(weights=self.weights).to(self.device)
        self.model.eval()

    def __call__(self, batch: Dict[str, np.ndarray]):
        # Convert the numpy array of images into a PyTorch tensor.
        # Move the tensor batch to GPU if available.
        torch_batch = torch.from_numpy(batch["transformed_image"]).to(self.device)
        with torch.inference_mode():
            prediction = self.model(torch_batch)
            predicted_classes = prediction.argmax(dim=1).detach().cpu()
            predicted_labels = [
                self.weights.meta["categories"][i] for i in predicted_classes
            ]
            return {
                "predicted_label": predicted_labels,
                "original_image": batch["original_image"],
            }

然后我们使用 map_batches() API 将模型应用于整个数据集

  • map_batches 的第一个参数是用户定义函数 (UDF),它可以是函数或类。由于此示例使用类,UDF 将作为长期运行的 Ray actors 运行。对于基于类的 UDF,使用 concurrency 参数指定并行 Actor 的数量。

  • num_gpus 参数指定每个 ResnetModel 实例所需的 GPU 数量。在此示例中,我们为每个模型副本需要 1 个 GPU。如果你正在进行 CPU 推理,可以删除 num_gpus=1

  • batch_size 参数表示每个批次中图像的数量。使用你自己的模型和数据集时,请查看 Ray dashboard 中的 GPU 内存使用情况,以尝试调整 batch_size。你应该尝试最大化批次大小,同时不超过 GPU 内存。你可能需要减小此数字才能在本地运行。

predictions = transformed_ds.map_batches(
    ResnetModel,
    concurrency=4,  # Use 4 GPUs. Change this number based on the number of GPUs in your cluster.
    num_gpus=1,  # Specify 1 GPU per model replica.
    batch_size=720,  # Use the largest batch size that can fit on our GPUs.
)

验证并保存结果#

让我们取一小批预测结果并验证它们。

prediction_batch = predictions.take_batch(5)
2023-06-27 23:26:04,893	INFO streaming_executor.py:91 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[ReadImage->Map] -> ActorPoolMapOperator[MapBatches(ResnetModel)]
2023-06-27 23:26:04,894	INFO streaming_executor.py:92 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
2023-06-27 23:26:04,895	INFO streaming_executor.py:94 -- Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`
2023-06-27 23:26:04,950	INFO actor_pool_map_operator.py:114 -- MapBatches(ResnetModel): Waiting for 4 pool actors to start...
2023-06-27 23:26:29,120	INFO streaming_executor.py:149 -- Shutting down <StreamingExecutor(Thread-36, started daemon 140560158410496)>.
2023-06-27 23:26:29,335	WARNING actor_pool_map_operator.py:264 -- To ensure full parallelization across an actor pool of size 4, the specified batch size should be at most 360. Your configured batch size for this operator was 720.

我们看到所有图像都被正确分类为“tench”,这是一种鱼类。

from PIL import Image

for image, prediction in zip(
    prediction_batch["original_image"], prediction_batch["predicted_label"]
):
    img = Image.fromarray(image)
    display(img)
    print("Prediction: ", prediction)
../../_images/dcfb9e74d5e2c80b6053ae5d9f03a1dc5da886a73116d25f89769a14b77a395f.png
Label:  tench
../../_images/99cc48399ce119902a2f9aa3330108ac8973c60131f94325949177824d1145fe.png
Label:  tench
../../_images/2e364ab27edad78af1772e1d8d3ff397febefec20d343ab5b449bad09fdefeef.png
Label:  tench
../../_images/5b1a113310c2fc8bd5fae12ab515bbb46bde7a8fd5215ab888560e335034bf09.png
Label:  tench
../../_images/77cfc3f82d4b234e10ff5009448162cad5e338455a15617290cc08d8346efebd.png
Label:  tench

如果样本看起来不错,我们可以继续将结果保存到外部存储(例如,本地磁盘或 AWS S3 等云存储)。有关所有支持的存储和文件格式,请参阅保存数据指南

import tempfile

temp_dir = tempfile.mkdtemp()

# First, drop the original images to avoid them being saved as part of the predictions.
# Then, write the predictions in parquet format to a path with the `local://` prefix
# to make sure all results get written on the head node.
predictions.drop_columns(["original_image"]).write_parquet(f"local://{temp_dir}")
print(f"Predictions saved to `{temp_dir}`!")
2023-06-27 23:26:38,105	INFO streaming_executor.py:91 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[ReadImage->Map] -> ActorPoolMapOperator[MapBatches(ResnetModel)] -> TaskPoolMapOperator[MapBatches(<lambda>)] -> TaskPoolMapOperator[Write]
2023-06-27 23:26:38,106	INFO streaming_executor.py:92 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
2023-06-27 23:26:38,106	INFO streaming_executor.py:94 -- Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`
2023-06-27 23:26:38,141	INFO actor_pool_map_operator.py:114 -- MapBatches(ResnetModel): Waiting for 4 pool actors to start...
2023-06-27 23:27:27,855	INFO streaming_executor.py:149 -- Shutting down <StreamingExecutor(Thread-74, stopped daemon 140560149755648)>.
Predictions saved to `/tmp/tmp0y52g_f5`!