构建一个使用工具的代理#

本教程将指导您使用 LangChain、LangGraph 和 Ray Serve 在 Anyscale 上构建和部署一个复杂的、使用工具的代理。

了解如何创建一个可扩展的微服务架构,其中每个组件——代理、LLM 和工具——都作为一个独立的、自动缩放的服务运行。

  • 代理(使用 LangGraph 构建)负责编排任务和管理对话状态。

  • LLM(Qwen 4 B)在自己的服务中运行,以实现专用、高速的推理。

  • 工具(天气 API)通过模型上下文协议(MCP)暴露自身,该协议是一个开放标准,允许代理动态地发现和使用它们。

这种解耦的设计提供了自动缩放、故障隔离以及在不更改代理代码的情况下更新或替换 LLM 或工具等组件的灵活性。

架构概述#

您可以从三个核心组件构建代理系统,每个组件都作为一个独立的 Ray Serve 应用程序运行。

组件#

  • 代理服务(LangChain 代理):操作的“大脑”。它负责编排多步推理和管理对话状态。它轻量级(仅 CPU)并与 Ray Serve 一起部署。注意: LangGraph v1 已弃用 createReactAgent 预构建组件。使用 LangChain 的 create_agent,它运行在 LangGraph 上并添加了一个灵活的中间件系统。

  • LLM 服务(Ray Serve LLM):“语言引擎”。它运行 Qwen/Qwen3-4B-Instruct-2507-FP8 模型,针对工具使用进行了优化。它与 vLLM 一起部署在 GPU(L4)上,以实现高速推理,并提供 OpenAI 兼容的 API。

  • 工具服务(MCP):“执行者”。它将天气 API 暴露为一组工具。代理在运行时使用模型上下文协议(MCP)发现这些工具。它也是一个无状态、仅 CPU 的服务。

Overall Architecture

此架构的优势#

这种架构允许每个组件独立扩展。您 GPU 密集型的 LLM 服务可以根据推理需求进行扩展和缩减,这与轻量级的、基于 CPU 的代理编排是分开的。

使用 Ray 和 Anyscale 的主要优势包括:#

  • 独立扩展:分别扩展 LLM 的 GPU 和代理/工具的 CPU。

  • 高可用性:零停机更新和故障自动恢复。

  • 灵活性:只需部署新服务即可轻松替换 LLM 或添加新工具。代理将在运行时发现它们——无需更改代码。

  • 增强的可观察性:Anyscale 为每个服务提供全面的日志、指标和跟踪。

附加资源#

有关 LLM 服务和 Ray Serve 的更多信息,请参阅以下内容

依赖项和计算资源要求#

Docker 镜像:为获得最佳兼容性和性能,请使用 Docker 镜像 anyscale/ray-llm:2.52.0-py311-cu128。此镜像包含 Ray 2.52.0、Python 3.11 和 CUDA 12.8 支持,为带 GPU 加速的 LLM 服务提供了所有必需的依赖项。

GPU 要求:部署需要两种计算资源:一个用于 LLM 服务的 L4 GPU(g6.2xlarge 实例,24 GB GPU 内存),以及一个用于 MCP 和代理服务的 m5d.xlarge(4 vCPU)。

Python 库:本项目使用 requirements.txt 进行依赖管理。运行以下命令安装依赖项

%%bash
# Install dependencies
pip install -r requirements.txt

实现:构建服务#

本项目由多个 Python 脚本组成,这些脚本协同工作以创建和提供代理。

步骤 1:创建 LLM 服务#

查看 llm_deploy_qwen.py 中的代码。此脚本使用 Ray Serve 的 build_openai_app 工具将 Qwen LLM(Qwen/Qwen3-4B-Instruct-2507-FP8)部署为 OpenAI 兼容的 API 端点。这允许您使用任何 OpenAI 兼容客户端(包括 LangChain)来使用 Qwen 模型。

以下是此脚本中的关键配置

  • accelerator_type="L4":指定 GPU 类型。L4 GPU(Ada Lovelace 架构)针对 FP8 精度进行了优化,使其对于此量化模型具有成本效益。为获得更高的吞吐量,请使用 H100 GPU。有关 GPU 选择指南,请参阅 GPU 指南文档

  • enable_auto_tool_choice=True:启用模型以根据输入自动决定何时使用工具。这对于 LLM 需要确定是调用工具还是直接响应的代理工作流至关重要。有关工具调用的更多信息,请参阅 工具和函数调用文档

  • tool_call_parser="hermes":指定工具调用的解析策略。hermes 解析器专为遵循 Hermes 函数调用格式的模型设计,Qwen 模型支持该格式。

  • trust_remote_code=True:从 Hugging Face 加载 Qwen 模型时需要,因为它们使用自定义聊天模板和标记化逻辑,这些逻辑不是标准 transformers 库的一部分。

其他 LLM 开发资源

步骤 2:创建 MCP 天气工具服务#

查看 weather_mcp_ray.py 中的代码,将天气工具部署为 MCP(模型上下文协议)服务。

天气工具服务的工作原理

weather_mcp_ray.py 脚本使用 FastMCP 来定义和公开与天气相关的工具。此服务是一个与 Ray Serve 一起部署的 FastAPI 应用程序,通过 HTTP 提供工具。

  • FastMCP 框架FastMCP 类提供了一种使用 Python 装饰器定义工具的方法。设置 stateless_http=True 使其适合部署为 HTTP 服务。

  • 工具注册:每个用 @mcp.tool() 装饰的函数都会成为一个可自动发现的工具

    • get_alerts(state: str):为给定的美国州代码获取活动天气警报。

    • get_forecast(latitude: float, longitude: float):检索指定坐标的 5 期天气预报。

  • 工具元数据:每个工具函数的文档字符串充当代理用来理解何时以及如何调用每个工具的描述。这对于 LLM 决定使用哪个工具至关重要。

  • Ray Serve 部署:与 Ray Serve 一起部署时,这会成为一个可扩展的微服务,可以处理来自代理实例的多个并发工具请求。

注意

Ray Serve 仅支持 MCP 中的无状态 HTTP 模式。设置 stateless_http=True 以防止在运行多个副本时出现“会话未找到”错误

mcp = FastMCP("weather", stateless_http=True)

附加资源

步骤 3:创建代理逻辑#

Agent Architecture

查看 agent_with_mcp.py 中的代码,定义用于编排 LLM 和工具的代理。

核心函数是 build_agent

# ========== BUILD AGENT ==========
async def build_agent():
    """Instantiate an agent with MCP tools when available."""
    mcp_tools = await get_mcp_tools()

    tools = list(mcp_tools)
    print(f"\n[Agent] Using {len(tools)} tool(s).")

    memory = MemorySaver()
    agent = create_agent(
        llm,
        tools,
        system_prompt=PROMPT,
        checkpointer=memory,
    )
    return agent

代理的工作原理

  • LLM 配置:使用 OpenAI 兼容的 API 连接到您已部署的 Qwen 模型。

  • 工具发现get_mcp_tools 函数使用 MultiServerMCPClient 从 MCP 服务自动发现可用工具。

  • 代理创建:使用 LangChain 的 create_agent 函数,使用 LLM、工具和系统提示创建一个代理。

  • 内存管理:使用 MemorySaver 在多个回合中维护对话状态。

步骤 4:创建代理部署脚本#

ray_serve_agent_deployment.py 脚本将代理部署为 Ray Serve 应用程序,并提供一个 /chat 端点。

import json
from contextlib import asynccontextmanager
from typing import AsyncGenerator
from uuid import uuid4

from fastapi import FastAPI, Request
from fastapi.encoders import jsonable_encoder
from starlette.responses import StreamingResponse
from ray import serve

from agent_with_mcp import build_agent  # Your factory that returns a LangChain / LangGraph agent.

# ----------------------------------------------------------------------
# FastAPI app with an async lifespan hook.
# ----------------------------------------------------------------------
@asynccontextmanager
async def lifespan(app: FastAPI):
    agent = await build_agent()  # Likely compiled with a checkpointer.
    app.state.agent = agent
    try:
        yield
    finally:
        if hasattr(agent, "aclose"):
            await agent.aclose()

fastapi_app = FastAPI(lifespan=lifespan)

@fastapi_app.post("/chat")
async def chat(request: Request):
    """
    POST /chat
    Body: {"user_request": "<text>", "thread_id": "<optional>", "checkpoint_ns": "<optional>"}

    Streams LangGraph 'update' dicts as SSE (one JSON object per event).
    """
    body = await request.json()
    user_request: str = body.get("user_request", "")

    # Threading and checkpoint identifiers.
    thread_id = (
        body.get("thread_id")
        or request.headers.get("X-Thread-Id")
        or str(uuid4())  # New thread per request if none provided.
    )
    checkpoint_ns = body.get("checkpoint_ns")  # Optional namespacing.

    # Build config for LangGraph.
    config = {"configurable": {"thread_id": thread_id}}
    if checkpoint_ns:
        config["configurable"]["checkpoint_ns"] = checkpoint_ns

    async def event_stream() -> AsyncGenerator[str, None]:
        agent = request.app.state.agent
        inputs = {"messages": [{"role": "user", "content": user_request}]}

        try:
            # Stream updates from the agent.
            async for update in agent.astream(inputs, config=config, stream_mode="updates"):
                safe_update = jsonable_encoder(update)
                # Proper SSE framing: "data: <json>\n\n".
                yield f"data: {json.dumps(safe_update)}\n\n"
        except Exception as e:
            # Don't crash the SSE; surface one terminal error event and end.
            err = {"error": type(e).__name__, "detail": str(e)}
            yield f"data: {json.dumps(err)}\n\n"

    # Expose thread id so the client can reuse it on the next call.
    headers = {"X-Thread-Id": thread_id}

    return StreamingResponse(
        event_stream(),
        media_type="text/event-stream",
        headers=headers,
    )

# ----------------------------------------------------------------------
# Ray Serve deployment wrapper.
# ----------------------------------------------------------------------
@serve.deployment(ray_actor_options={"num_cpus": 1})
@serve.ingress(fastapi_app)
class LangGraphServeDeployment:
    pass

app = LangGraphServeDeployment.bind()

# Deploy the agent app locally:
# serve run ray_serve_agent_deployment:app

# Deploy the agent using Anyscale service:
# anyscale service deploy ray_serve_agent_deployment:app

部署的工作原理

  • FastAPI 生命周期管理:使用 @asynccontextmanager 在启动时初始化代理,在关闭时清理。

  • 流式端点/chat 端点接受 POST 请求并返回服务器发送事件(SSE)。

    {
      "user_request": "What's the weather?",
      "thread_id": "optional-thread-id",
      "checkpoint_ns": "optional-namespace"
    }
    
  • 线程管理:每个对话都可以有一个 thread_id 来在请求之间维护上下文。如果您不提供 thread_id,系统将生成一个新的 UUID。

  • 事件流:使用 LangGraph 的 astream 将实时更新(工具调用、推理步骤、最终答案)作为 JSON 对象发出。

  • 资源分配:代理部署轻量级(每个副本 0.2 个 CPU,无 GPU),因为重计算发生在 LLM 服务中。

部署服务#

在查看了代码之后,将每个服务部署到 Anyscale。

步骤 5:部署 LLM 服务#

在 Anyscale 上部署 Qwen LLM 服务。此命令创建一个可扩展的 LLM 推理端点。

%%bash
anyscale service deploy llm_deploy_qwen:app --name llm_deploy_qwen_service

部署完成后,您将收到

  • 服务 URL(例如,https://llm-deploy-qwen-service-jgz99.cld-kvedzwag2qa8i5bj.s.anyscaleuserdata.com

  • 用于身份验证的 API 令牌

将这些值保存以供以后配置代理。 注意:您无需手动将 /v1 添加到 URL;代码使用 urljoin 自动添加它。

步骤 6:部署天气 MCP 服务#

部署天气工具服务。这会创建一个代理可以发现和调用天气工具的端点。

%%bash
anyscale service deploy weather_mcp_ray:app --name weather_mcp_service

部署完成后,您将收到

  • 服务 URL(例如,https://weather-mcp-service-jgz99.cld-kvedzwag2qa8i5bj.s.anyscaleuserdata.com

  • 用于身份验证的 API 令牌

注意:您无需手动将 /mcp 添加到 URL;代码使用 urljoin 自动添加它。

步骤 7:配置代理#

使用您从部署中收到的服务端点更新 agent_with_mcp.py。修改以下行

from urllib.parse import urljoin

API_KEY = "<your-llm-service-token>"
OPENAI_COMPAT_BASE_URL = "<your-llm-service-url>"  
MODEL = "Qwen/Qwen3-4B-Instruct-2507-FP8"
TEMPERATURE = 0.01
WEATHER_MCP_BASE_URL = "<your-mcp-service-url>"  
WEATHER_MCP_TOKEN = "<your-mcp-service-token>"

# The code uses urljoin to automatically append /v1 and /mcp:
llm = ChatOpenAI(
    model=MODEL,
    base_url=urljoin(OPENAI_COMPAT_BASE_URL, "v1"),
    api_key=API_KEY,
    ...
)

mcp_client = MultiServerMCPClient({
    "weather": {
        "url": urljoin(WEATHER_MCP_BASE_URL, "mcp"),
        ...
    }
})

步骤 8:在本地部署代理服务#

部署代理本身。为了进行本地测试,请在终端中使用 serve run。对于 Anyscale 上的生产部署,请参阅下一步。

本地部署

serve run ray_serve_agent_deployment:app  

测试代理#

步骤 9:发送测试请求#

在代理服务运行的情况下,向 /chat 端点发送请求。“helpers/agent_client_local.py”脚本发送请求并流式传输响应。

import json
import requests

SERVER_URL = "http://127.0.0.1:8000/chat"  # For local deployment.
HEADERS = {"Content-Type": "application/json"}

def chat(user_request: str, thread_id: str | None = None) -> None:
    """Send a chat request to the agent and stream the response."""
    payload = {"user_request": user_request}
    if thread_id:
        payload["thread_id"] = thread_id

    with requests.post(SERVER_URL, headers=HEADERS, json=payload, stream=True) as resp:
        resp.raise_for_status()
        # Capture thread_id for multi-turn conversations.
        server_thread = resp.headers.get("X-Thread-Id")
        if not thread_id and server_thread:
            print(f"[thread_id: {server_thread}]")
        # Stream SSE events.
        for line in resp.iter_lines():
            if not line:
                continue
            txt = line.decode("utf-8")
            if txt.startswith("data: "):
                txt = txt[len("data: "):]
            print(txt, flush=True)

# Test the agent.
chat("What's the weather in Palo Alto?")

终止本地服务器#

在本地测试代理后,您可以通过在同一终端中按 Ctrl + C 来关闭本地服务器,或者在另一个终端中运行以下命令来关闭它

serve shutdown -y

步骤 10:将代理部署到 Anyscale 生产环境#

在本地测试代理后,将其部署到 Anyscale 以供生产使用。这会创建一个可扩展的、托管的端点,并提供企业级功能。

为何部署到 Anyscale#

生产优势

  • 自动缩放:根据请求量自动缩放副本(0 到 N 个副本)

  • 高可用性:零停机部署,自动处理中断

  • 可观察性:内置指标、日志和分布式跟踪

  • 成本优化:空闲时缩减到零(配置正确时)

  • 负载均衡:将请求分发到多个代理副本

  • 故障隔离:代理、LLM 和工具作为独立服务运行

部署代理服务#

运行以下命令将您的代理部署到 Anyscale。此命令打包您的代码并创建一个生产就绪的服务。

%%bash
anyscale service deploy ray_serve_agent_deployment:app --name agent_service_langchain

理解部署输出#

运行部署命令后,您将收到

  • 服务 URL:您的代理的 HTTPS 端点(例如,https://agent-service-langchain-jgz99.cld-kvedzwag2qa8i5bj.s.anyscaleuserdata.com

  • 授权令牌:用于验证请求的 Bearer 令牌

  • 服务 UI 链接:在 Anyscale 控制台中监控您的服务的直接链接

测试生产代理#

部署后,使用经过身份验证的请求测试您的生产代理。使用您的部署详细信息更新以下代码。

注意:该存储库还包含“helpers/agent_client_anyscale.py”脚本供您参考。

import json
import requests

base_url = "https://agent-service-langchain-jgz99.cld-kvedzwag2qa8i5bj.s.anyscaleuserdata.com" ## replace with your service url
token = "nZp2BEjdloNlwGyxoWSpdalYGtkhfiHtfXhmV4BQuyk" ## replace with your service bearer token

SERVER_URL = f"{base_url}/chat"  # For Anyscale deployment.
HEADERS = {"Content-Type": "application/json",
"Authorization": f"Bearer {token}"
}

def chat(user_request: str, thread_id: str | None = None) -> None:
    """Send a chat request to the agent and stream the response."""
    payload = {"user_request": user_request}
    if thread_id:
        payload["thread_id"] = thread_id

    with requests.post(SERVER_URL, headers=HEADERS, json=payload, stream=True) as resp:
        resp.raise_for_status()
        # Capture thread_id for multi-turn conversations.
        server_thread = resp.headers.get("X-Thread-Id")
        if not thread_id and server_thread:
            print(f"[thread_id: {server_thread}]")
        # Stream SSE events.
        for line in resp.iter_lines():
            if not line:
                continue
            txt = line.decode("utf-8")
            if txt.startswith("data: "):
                txt = txt[len("data: "):]
            print(txt, flush=True)

# Test the agent.
chat("What's the weather in Palo Alto?")

终止生产服务器#

目前有三个生产服务正在运行:llm_deploy_qwen_serviceweather_mcp_serviceagent_service_langchain。您可以手动在服务页面上关闭它们,或者在另一个终端中运行以下命令来关闭它们。

anyscale service terminate -n llm_deploy_qwen_service
anyscale service terminate -n weather_mcp_service
anyscale service terminate -n agent_service_langchain

下一步#

您已成功使用 Anyscale 上的 Ray Serve 构建、部署和测试了一个多工具代理。此架构演示了如何构建具有独立扩展、故障隔离和动态工具发现的生产级 AI 应用程序。

扩展您的代理#

添加更多工具
使用数据库查询、API 集成或自定义业务逻辑等附加功能扩展 MCP 服务。MCP 协议允许您的代理动态发现新工具,而无需更改代码。有关实现示例,请参阅 Anyscale MCP 部署模板

替换或升级 LLM
将 Qwen 模型替换为其他工具调用模型,例如 GPT-4、Claude 或 Llama 系列。由于 LLM 作为独立服务运行,您可以进行 A/B 测试不同的模型或执行零停机升级。有关部署模式,请参阅 Anyscale LLM 服务模板

构建复杂的流程
使用 LangGraph 实现复杂的推理模式,例如多代理协作、迭代改进或基于工具输出的条件分支。

针对生产进行优化#

监控性能
使用 Anyscale 的内置可观察性来跟踪

  • 请求延迟和令牌吞吐量

  • GPU 利用率和内存使用量

  • 内存或磁盘使用量

  • 节点数量

要启用 LLM 指标,请参阅 使用 Ray Serve LLM 仪表板进行监控。注意:从 Ray 2.51 或更高版本开始,引擎指标日志记录默认启用。有关使用 Anyscale 服务进行监控的详细指标指南,请参阅 监控服务

高效扩展
独立配置每个服务的自动缩放策略

  • 根据 GPU 利用率扩展 LLM 服务

  • 根据请求量扩展代理服务

  • 根据特定的工作负载模式扩展工具服务

参阅 Ray Serve 自动缩放配置

生产最佳实践#

Anyscale 服务为在生产环境中运行代理提供了企业级功能。主要功能包括

有关生产部署的全面指南,请参阅 Anyscale 服务文档Anyscale Runtime 上的 Ray Serve