使用 Ray Serve 部署 DLinear 模型进行在线服务#

   

本教程将启动一个在线服务,该服务

  • 部署已训练的 DLinear 模型工件以生成时间序列预测

  • 根据实时入站流量自动缩放

  • 涵盖服务周围的可观测性和调试

请注意,此笔记本要求您运行 DLinear 模型分布式训练 教程,以生成此教程所需的预训练模型工件。

Ray Serve 是一个高度可伸缩且灵活的模型服务库,用于构建在线推理 API。您可以

  • 将模型和业务逻辑封装为独立的 服务部署,并将它们 连接 起来(管道、集成等)。

  • 避免使用一个受网络和计算限制的大型服务,从而浪费资源

  • 利用异构资源的分数,而 SageMaker、Vertex、KServe 等无法做到这一点,并且可以通过 num_replicas 进行水平缩放

  • 根据流量自动扩展

  • 与 FastAPI 和 HTTP 集成

  • 设置 gRPC 服务以构建分布式系统和微服务

  • 启用基于批量大小、时间等的 动态批处理

  • 访问用于 服务 LLM 的实用工具集,这些工具集与推理引擎无关,并对 LLM 特定的功能(如多 LoRA 支持)提供了开箱即用的支持。

https://github.com/anyscale/e2e-timeseries/blob/main/images/ray_serve.png?raw=true

设置环境#

首先,导入必要的模块并设置 Ray Serve 部署的环境

import asyncio
import os

import aiohttp
import numpy as np
import pandas as pd
import requests
import torch
from fastapi import FastAPI
# Remove this setting when it becomes the default in a future release.
os.environ["RAY_TRAIN_V2_ENABLED"] = "1"

# Now it's safe to import from Ray.
import ray
from ray import serve
from starlette.requests import Request

使用 e2e_timeseries 模块初始化 Ray 集群,以便新生成的 worker 可以从中导入。

import e2e_timeseries
from e2e_timeseries.model import DLinear

ray.init(runtime_env={"py_modules": [e2e_timeseries]})

创建 Ray Serve 部署#

接下来,定义 DLinear 模型的 Ray Serve 端点。此实现使用可重用类来避免为每个请求重新加载模型。该部署支持 Pythonic 和 HTTP 请求,并具有动态批处理以实现高效推理。

DEPLOYMENT_NAME = "dlinear-ett-server"

# Create a FastAPI app that adds endpoints to the Serve deployment.
app = FastAPI(title="DLinear", description="predict future oil temperatures", version="0.1")


@serve.deployment(num_replicas=1, ray_actor_options={"num_cpus": 1, "num_gpus": 1})
@serve.ingress(app)
class DLinearModelServe:
    def __init__(self, model_checkpoint_path: str | None = None):
        checkpoint = torch.load(model_checkpoint_path, map_location=torch.device("cpu"))  # Load to CPU first
        self.args = checkpoint["train_args"]
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        print(f"Using device: {self.device}")

        # Load model from checkpoint.
        self.model = DLinear(self.args).float()
        self.model.load_state_dict(checkpoint["model_state_dict"])
        print(f"Model loaded successfully from {model_checkpoint_path}")

        self.model.to(self.device)
        self.model.eval()

    @serve.batch(max_batch_size=32, batch_wait_timeout_s=0.1)
    async def predict_batch(self, batch_x: list[list[float]]) -> list[list[float]]:
        """
        Expects a list of series, where each series is a 1D list of floats/integers.
        e.g., [[0.1, 0.2, ..., 0.N], [0.3, 0.4, ..., 0.M]]
        Each series is a 1D list of floats/integers.
        """

        # Convert list of 1D series to a 2D numpy array (batch_size, seq_len).
        batch_x = np.array(batch_x, dtype=np.float32)
        batch_x = torch.from_numpy(batch_x).float().to(self.device)

        # Ensure batch_x is 3D: (batch_size, seq_len, num_features)
        # For univariate 'S' models, num_features is 1.
        if batch_x.ndim == 2:
            batch_x = batch_x.unsqueeze(-1)

        with torch.no_grad():
            outputs = self.model(batch_x)
            # Output shape: (batch_size, pred_len, features_out)

        # Slice to get the prediction length part of the output.
        # The [:, :, :] part takes all output features.
        # For 'S' (single-feature) forecasting, DLinear typically outputs 1 feature.
        # For 'M' (multi-feature) forecasting, DLinear typically outputs multiple features.
        outputs = outputs[:, -self.args["pred_len"] :, :]

        # If 'S' (single feature forecasting) and the model's output for that single
        # feature has an explicit last dimension of 1, squeeze it.
        # This approach makes the output a list of 1D series (list of lists of floats).
        if outputs.shape[-1] == 1:
            outputs = outputs.squeeze(-1)  # Shape: (batch_size, pred_len)

        outputs_list = outputs.cpu().numpy().tolist()
        return outputs_list

    @app.post("/predict")
    async def predict_endpoint(self, request: Request):
        """
        Expects a JSON body, which is a list of floats/integers.
        e.g., [0.1, 0.2, ..., 0.N]
        where N must be equal to self.args.seq_len.
        """
        try:
            input_data = await request.json()
            if not isinstance(input_data, list):
                return {"error": "Invalid input. JSON list of numbers expected."}
            if len(input_data) != self.args["seq_len"]:
                return {"error": f"Invalid series length. Expected {self.args['seq_len']}, got {len(input_data)}."}

        except Exception as e:
            return {"error": f"Failed to parse JSON request: {str(e)}"}

        # Pass the single list input_data, wrapped in another list, to predict_batch.
        # Ray Serve's @serve.batch handles collecting these into a batch for predict_batch.
        # The await call returns the specific result for this input_data.
        single_prediction_output = await self.predict_batch(input_data)

        # single_prediction_output is expected to be a list[float] (the prediction for one series)
        return single_prediction_output

    # Expose get_seq_len as a GET endpoint.
    @app.get("/seq_len")
    async def get_sequence_length(self):
        return {"seq_len": self.args["seq_len"]}
模型组合

Ray Serve 可以轻松进行模型组合,您可以将包含 ML 模型或业务逻辑的多个部署组合成一个应用程序。您可以独立缩放分数资源并配置每个部署。

https://raw.githubusercontent.com/anyscale/foundational-ray-app/refs/heads/main/images/serve_composition.png

加载模型并启动服务#

加载已训练的 DLinear 模型并启动 Ray Serve 部署。模型检查点路径从训练期间创建的元数据文件中加载。

# Load the best checkpoint path from the metadata file created in the training notebook.
best_checkpoint_metadata_fpath = "/mnt/cluster_storage/checkpoints/best_checkpoint_path.txt"
with open(best_checkpoint_metadata_fpath, "r") as f:
    best_checkpoint_path = f.read().strip()


def serve_model(best_checkpoint_path):
    dlinear_app = DLinearModelServe.bind(model_checkpoint_path=best_checkpoint_path)

    # The route_prefix applies to all routes within the FastAPI app.
    serve.run(dlinear_app, name=DEPLOYMENT_NAME, route_prefix="/predict_dlinear")
    print(f"DLinear model deployment '{DEPLOYMENT_NAME}' is running with FastAPI app.")
    print("  Prediction endpoint: http://127.0.0.1:8000/predict_dlinear/predict")
    print("  Sequence length endpoint: http://127.0.0.1:8000/predict_dlinear/seq_len")

    print("\nTo stop the server, press Ctrl+C in the terminal where it's running.")


serve_model(best_checkpoint_path)

您应该会看到表明服务正在本地运行的日志。

INFO 2025-04-09 14:06:55,760 serve 31684 -- Started Serve in namespace "serve".
INFO 2025-04-09 14:06:57,875 serve 31684 -- Application 'dlinear-ett-server' is ready at http://127.0.0.1:8000/.

测试服务#

使用单个请求和并发批处理请求来测试已部署的 DLinear 模型,以演示动态批处理功能。

async def test_serve():
    # --- Example Client Code, which can be run in a separate script or after serve starts ---

    # Base URL for the service.
    base_url = "http://127.0.0.1:8000/predict_dlinear"
    seq_len_url = f"{base_url}/seq_len"
    predict_url = f"{base_url}/predict"

    # Get the proper seq_len for the deployed model.
    response = requests.get(seq_len_url)
    response.raise_for_status()
    seq_len_data = response.json()
    seq_len = seq_len_data.get("seq_len")

    # Load sample data for demonstration purposes.
    df = pd.read_csv("s3://air-example-data/electricity-transformer/ETTh2.csv")
    ot_series = df["OT"].tolist()

    # Create a single sample request from the loaded data.
    sample_input_series = ot_series[:seq_len]
    sample_request_body = sample_input_series

    print("\n--- Sending Single Synchronous Request to /predict endpoint ---")
    response = requests.post(predict_url, json=sample_request_body)
    response.raise_for_status()
    prediction = response.json()
    print(f"Prediction (first 5 values): {prediction[:5]}")

    print("\n--- Sending Batch Asynchronous Requests to /predict endpoint ---")
    sample_input_list = [sample_input_series] * 100  # Use identical requests

    async def fetch(session, url, data):
        async with session.post(url, json=data) as response:
            response.raise_for_status()
            return await response.json()

    async def fetch_all_concurrently(requests_to_send: list):
        async with aiohttp.ClientSession() as session:
            tasks = [fetch(session, predict_url, input_data) for input_data in requests_to_send]
            responses = await asyncio.gather(*tasks, return_exceptions=True)
            return responses

    predictions = await fetch_all_concurrently(sample_input_list)
    print(f"Finished predictions for {len(sample_input_list)} inputs")


# Running this code in a notebook creates an asyncio event loop in the global scope.
# So, use await directly.
await test_serve()
# Use `asyncio.run(test_serve())` instead if running the code in a script.
    

Ray Serve 的动态批处理会自动分块入站请求,以最大化吞吐量和硬件利用率,同时保持低延迟。

服务的可观测性

Ray Dashboard 会自动捕获 Ray Serve 应用程序的可观测性,位于 Serve 视图中。您可以查看服务部署及其副本,以及关于服务运行状况的时间序列指标。

https://raw.githubusercontent.com/anyscale/e2e-timeseries/refs/heads/main/images/serve_dashboard.png

生产部署注意事项#

Anyscale 服务

Anyscale Services 提供了一种容错、可伸缩且经过优化的方式来服务 Ray Serve 应用程序。有关更多详细信息,请参阅 API 参考。您可以

https://raw.githubusercontent.com/anyscale/e2e-timeseries/refs/heads/main/images/canary.png

Anyscale 上的 RayTurbo Serve 具有 Ray Serve 之上的更多功能

  • 快速自动缩放和模型加载,使服务更快地启动和运行,即使对于 LLM 也有 5 倍的改进

  • 对于高流量服务用例,QPS 提高 54%每秒流式传输的 token 高达 3 倍,并且没有代理瓶颈

  • 副本压缩,尽可能合并到更少的节点中,以减少资源碎片并提高硬件利用率。

  • 零停机增量部署,因此服务永远不会中断

  • 在多服务应用程序中为每个服务提供不同的环境 不同的环境

  • 多可用区感知 Ray Serve 副本调度,为可用区故障提供更高的冗余度。

部署到生产环境#

要在 Anyscale 上进行生产部署,您可以使用以下命令

# Production online service.
anyscale service deploy e2e_timeseries.serve:dlinear_model --name=dlinear-ett-forecaster \
  --containerfile="${WORKING_DIR}/containerfile" \
  --working-dir="${WORKING_DIR}" \
  --exclude=""

注意:

  • 此示例使用 containerfile 来定义依赖项,但您也可以轻松使用预构建的镜像。

  • 您可以将计算资源指定为 计算配置,或者在 服务配置 文件中内联指定。

  • 当您不指定计算并在工作区中启动时,默认情况下使用的是工作区的计算配置。

服务在远程运行后,您需要使用 bearer token 来查询它。您可以修改请求代码以使用此 token。

# Service specific config. Replace with your own values from the deployment logs.
base_url = "https://dlinear-ett-forecaster-jgz99.cld-kvedzwag2qa8i5bj.s.anyscaleuserdata.com"
token = "tXhmYYY7qMbrb1ToO9_J3n5_kD7ym7Nirs8djtip7P0"

# Requests config.
path = "/predict_dlinear/predict"
full_url = f"{base_url}{path}"
headers = {"Authorization": f"Bearer {token}"}

prediction = requests.post(full_url, json=sample_input_series, headers=headers).json()

不要忘记在不再需要服务时停止它。

anyscale service terminate --name dlinear-ett-forecaster
CI/CD

虽然 Anyscale JobsServices 是帮助您将工作负载投入生产的有用原子概念,但它们也非常适合作为大型 ML DAG 或 CI/CD 工作流中的节点。您可以将 Jobs 链接在一起,存储结果,然后使用这些工件为应用程序提供服务。然后,您可以根据事件、时间等触发服务更新并重新触发 Jobs。虽然您可以使用 Anyscale CLI 与任何编排平台集成,但 Anyscale 支持一些专用集成,如 AirflowPrefect

https://raw.githubusercontent.com/anyscale/e2e-timeseries/refs/heads/main/images/cicd.png