服务 ML 模型 (Tensorflow, PyTorch, Scikit-Learn, 其他)#

本指南介绍如何训练来自各种机器学习框架的模型并将其部署到 Ray Serve。

请参阅关键概念以了解 Ray Serve 的更多一般信息。

本示例训练并部署一个简单的 TensorFlow 神经网络。特别是,它展示了

  • 如何在 Ray Serve 部署中训练 TensorFlow 模型并从文件系统加载模型。

  • 如何解析 JSON 请求并进行预测。

Ray Serve 是框架无关的——你可以使用任何版本的 TensorFlow。本教程使用 TensorFlow 2 和 Keras。你还需要 requests 来向你的模型部署发送 HTTP 请求。如果尚未安装,请运行以下命令安装 TensorFlow 2 和 requests:

$ pip install "tensorflow>=2.0" requests "ray[serve]"

打开一个新的 Python 文件,命名为 tutorial_tensorflow.py。首先,导入 Ray Serve 和一些其他帮助程序。

from ray import serve

import os
import tempfile
import numpy as np
from starlette.requests import Request
from typing import Dict

import tensorflow as tf

接下来,使用 Keras 训练一个简单的 MNIST 模型。

TRAINED_MODEL_PATH = os.path.join(tempfile.gettempdir(), "mnist_model.h5")


def train_and_save_model():
    # Load mnist dataset
    mnist = tf.keras.datasets.mnist
    (x_train, y_train), (x_test, y_test) = mnist.load_data()
    x_train, x_test = x_train / 255.0, x_test / 255.0

    # Train a simple neural net model
    model = tf.keras.models.Sequential(
        [
            tf.keras.layers.Flatten(input_shape=(28, 28)),
            tf.keras.layers.Dense(128, activation="relu"),
            tf.keras.layers.Dropout(0.2),
            tf.keras.layers.Dense(10),
        ]
    )
    loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
    model.compile(optimizer="adam", loss=loss_fn, metrics=["accuracy"])
    model.fit(x_train, y_train, epochs=1)

    model.evaluate(x_test, y_test, verbose=2)
    model.summary()

    # Save the model in h5 format in local file system
    model.save(TRAINED_MODEL_PATH)


if not os.path.exists(TRAINED_MODEL_PATH):
    train_and_save_model()

接下来,定义一个 TFMnistModel 类,该类接受 HTTP 请求并运行你训练的 MNIST 模型。@serve.deployment 装饰器使其成为一个部署对象,你可以将其部署到 Ray Serve 上。请注意,Ray Serve 通过 HTTP 路由暴露此部署。默认情况下,当部署通过 HTTP 接收到请求时,Ray Serve 会调用 __call__ 方法。

@serve.deployment
class TFMnistModel:
    def __init__(self, model_path: str):
        import tensorflow as tf

        self.model_path = model_path
        self.model = tf.keras.models.load_model(model_path)

    async def __call__(self, starlette_request: Request) -> Dict:
        # Step 1: transform HTTP request -> tensorflow input
        # Here we define the request schema to be a json array.
        input_array = np.array((await starlette_request.json())["array"])
        reshaped_array = input_array.reshape((1, 28, 28))

        # Step 2: tensorflow input -> tensorflow output
        prediction = self.model(reshaped_array)

        # Step 3: tensorflow output -> web output
        return {"prediction": prediction.numpy().tolist(), "file": self.model_path}

注意

当你部署和实例化 TFMnistModel 类时,Ray Serve 会从你的文件系统加载 TensorFlow 模型,以便它稍后能够运行模型的推理并处理请求。

现在你已经定义了 Serve 部署,准备好它以便进行部署。

mnist_model = TFMnistModel.bind(TRAINED_MODEL_PATH)

注意

TFMnistModel.bind(TRAINED_MODEL_PATH) 将参数 TRAINED_MODEL_PATH 绑定到部署,并返回一个 DeploymentNode 对象,它是 TFMnistModel 部署对象的包装,你可以使用它与其他 DeploymentNodes 连接,形成更复杂的部署图

最后,通过终端将模型部署到 Ray Serve。

$ serve run tutorial_tensorflow:mnist_model

接下来,查询模型。在 Serve 运行时,打开另一个终端窗口,并在交互式 Python shell 或单独的 Python 脚本中运行以下代码:

import requests
import numpy as np

resp = requests.get(
    "http://localhost:8000/", json={"array": np.random.randn(28 * 28).tolist()}
)
print(resp.json())

你应该会得到类似如下的输出,尽管具体的预测结果可能有所不同

{
 "prediction": [[-1.504277229309082, ..., -6.793371200561523]],
 "file": "/tmp/mnist_model.h5"
}

本示例加载并部署一个 PyTorch ResNet 模型。特别是,它展示了

  • 如何从 PyTorch 的预训练模型库加载模型。

  • 如何解析 JSON 请求,转换负载并进行预测。

本教程需要 PyTorch 和 Torchvision。Ray Serve 是框架无关的,可与任何版本的 PyTorch 一起使用。你还需要 requests 来向你的模型部署发送 HTTP 请求。如果尚未安装,请运行以下命令安装它们:

$ pip install torch torchvision requests  "ray[serve]"

打开一个新的 Python 文件,命名为 tutorial_pytorch.py。首先,导入 Ray Serve 和一些其他帮助程序。

from ray import serve

from io import BytesIO
from PIL import Image
from starlette.requests import Request
from typing import Dict

import torch
from torchvision import transforms
from torchvision.models import resnet18

定义一个 ImageModel 类,该类解析输入数据,转换图像,并运行从 torchvision 加载的 ResNet18 模型。@serve.deployment 装饰器使其成为一个部署对象,你可以将其部署到 Ray Serve 上。请注意,Ray Serve 通过 HTTP 路由暴露此部署。默认情况下,当部署通过 HTTP 接收到请求时,Ray Serve 会调用 __call__ 方法。

@serve.deployment
class ImageModel:
    def __init__(self):
        self.model = resnet18(pretrained=True).eval()
        self.preprocessor = transforms.Compose(
            [
                transforms.Resize(224),
                transforms.CenterCrop(224),
                transforms.ToTensor(),
                transforms.Lambda(lambda t: t[:3, ...]),  # remove alpha channel
                transforms.Normalize(
                    mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]
                ),
            ]
        )

    async def __call__(self, starlette_request: Request) -> Dict:
        image_payload_bytes = await starlette_request.body()
        pil_image = Image.open(BytesIO(image_payload_bytes))
        print("[1/3] Parsed image data: {}".format(pil_image))

        pil_images = [pil_image]  # Our current batch size is one
        input_tensor = torch.cat(
            [self.preprocessor(i).unsqueeze(0) for i in pil_images]
        )
        print("[2/3] Images transformed, tensor shape {}".format(input_tensor.shape))

        with torch.no_grad():
            output_tensor = self.model(input_tensor)
        print("[3/3] Inference done!")
        return {"class_index": int(torch.argmax(output_tensor[0]))}

注意

当你部署和实例化 ImageModel 类时,Ray Serve 会从 torchvision 加载 ResNet18 模型,以便它稍后能够运行模型的推理并处理请求。

现在你已经定义了 Serve 部署,准备好它以便进行部署。

image_model = ImageModel.bind()

注意

ImageModel.bind() 返回一个 DeploymentNode 对象,它是 ImageModel 部署对象的包装,你可以使用它与其他 DeploymentNodes 连接,形成更复杂的部署图

最后,通过终端将模型部署到 Ray Serve。

$ serve run tutorial_pytorch:image_model

接下来,查询模型。在 Serve 运行时,打开另一个终端窗口,并在交互式 Python shell 或单独的 Python 脚本中运行以下代码:

import requests

ray_logo_bytes = requests.get(
    "https://raw.githubusercontent.com/ray-project/"
    "ray/master/doc/source/images/ray_header_logo.png"
).content

resp = requests.post("http://localhost:8000/", data=ray_logo_bytes)
print(resp.json())

你应该会得到类似如下的输出,尽管具体的数字可能有所不同

{'class_index': 919}

本示例训练并部署一个简单的 scikit-learn 分类器。特别是,它展示了

  • 如何在 Ray Serve 定义中从文件系统加载 scikit-learn 模型。

  • 如何解析 JSON 请求并进行预测。

Ray Serve 是框架无关的。你可以使用任何版本的 sklearn。你还需要 requests 来向你的模型部署发送 HTTP 请求。如果尚未安装,请运行以下命令安装 scikit-learn 和 requests:

$ pip install scikit-learn requests "ray[serve]"

打开一个新的 Python 文件,命名为 tutorial_sklearn.py。导入 Ray Serve 和一些其他帮助程序。

from ray import serve

import pickle
import json
import numpy as np
import os
import tempfile
from starlette.requests import Request
from typing import Dict

from sklearn.datasets import load_iris
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.metrics import mean_squared_error

训练分类器

接下来,使用Iris 数据集训练分类器。

首先,实例化一个从 scikit-learn 加载的 GradientBoostingClassifier

model = GradientBoostingClassifier()

接下来,加载 Iris 数据集,并将数据分割为训练集和验证集。

iris_dataset = load_iris()
data, target, target_names = (
    iris_dataset["data"],
    iris_dataset["target"],
    iris_dataset["target_names"],
)

np.random.shuffle(data)
np.random.shuffle(target)
train_x, train_y = data[:100], target[:100]
val_x, val_y = data[100:], target[100:]

然后,训练模型并将其保存到文件。

model.fit(train_x, train_y)
print("MSE:", mean_squared_error(model.predict(val_x), val_y))

# Save the model and label to file
MODEL_PATH = os.path.join(
    tempfile.gettempdir(), "iris_model_gradient_boosting_classifier.pkl"
)
LABEL_PATH = os.path.join(tempfile.gettempdir(), "iris_labels.json")

with open(MODEL_PATH, "wb") as f:
    pickle.dump(model, f)
with open(LABEL_PATH, "w") as f:
    json.dump(target_names.tolist(), f)

使用 Ray Serve 部署

最后,你就可以使用 Ray Serve 部署分类器了。

定义一个 BoostingModel 类,该类在你训练的 GradientBoosingClassifier 模型上运行推理并返回结果标签。它使用 @serve.deployment 装饰器标记,使其成为一个部署对象,以便你可以将其部署到 Ray Serve 上。请注意,Ray Serve 通过 HTTP 路由暴露此部署。默认情况下,当部署通过 HTTP 接收到请求时,Ray Serve 会调用 __call__ 方法。

@serve.deployment
class BoostingModel:
    def __init__(self, model_path: str, label_path: str):
        with open(model_path, "rb") as f:
            self.model = pickle.load(f)
        with open(label_path) as f:
            self.label_list = json.load(f)

    async def __call__(self, starlette_request: Request) -> Dict:
        payload = await starlette_request.json()
        print("Worker: received starlette request with data", payload)

        input_vector = [
            payload["sepal length"],
            payload["sepal width"],
            payload["petal length"],
            payload["petal width"],
        ]
        prediction = self.model.predict([input_vector])[0]
        human_name = self.label_list[prediction]
        return {"result": human_name}

注意

当你部署和实例化 BoostingModel 类时,Ray Serve 会从文件系统加载你训练的分类器模型,以便它稍后能够运行模型的推理并处理请求。

定义好 Serve 部署后,准备好它以便进行部署。

boosting_model = BoostingModel.bind(MODEL_PATH, LABEL_PATH)

注意

BoostingModel.bind(MODEL_PATH, LABEL_PATH) 将参数 MODEL_PATHLABEL_PATH 绑定到部署,并返回一个 DeploymentNode 对象,它是 BoostingModel 部署对象的包装,你可以使用它与其他 DeploymentNodes 连接,形成更复杂的部署图

最后,通过终端将模型部署到 Ray Serve。

$ serve run tutorial_sklearn:boosting_model

接下来,查询模型。在 Serve 运行时,打开另一个终端窗口,并在交互式 Python shell 或单独的 Python 脚本中运行以下代码:

import requests

sample_request_input = {
    "sepal length": 1.2,
    "sepal width": 1.0,
    "petal length": 1.1,
    "petal width": 0.9,
}
response = requests.get("http://localhost:8000/", json=sample_request_input)
print(response.text)

你应该会得到类似如下的输出,尽管具体的预测结果可能有所不同

{"result": "versicolor"}