使用请求和响应流服务聊天机器人#

本示例部署了一个聊天机器人,它将输出流式传输回用户。它展示了如何:

  • 如何从 Serve 应用流式传输输出

  • 如何在 Serve 应用中使用 WebSockets

  • 如何将批量请求与流式输出结合

本教程应能帮助您解决以下用例:

  • 您想服务一个大型语言模型并按 token 流式传回结果。

  • 您想服务一个聊天机器人,该机器人接受来自用户的输入流。

本教程服务于 DialoGPT 语言模型。安装 Hugging Face 库以访问它:

pip install "ray[serve]" transformers torch

创建流式部署#

创建一个名为 textbot.py 的 Python 文件。首先,添加导入和 Serve 日志记录器

import asyncio
import logging
from queue import Empty

from fastapi import FastAPI
from starlette.responses import StreamingResponse
from transformers import AutoModelForCausalLM, AutoTokenizer, TextIteratorStreamer

from ray import serve

logger = logging.getLogger("ray.serve")

创建一个 FastAPI 部署,并在构造函数中初始化模型和 tokenizer。

fastapi_app = FastAPI()


@serve.deployment
@serve.ingress(fastapi_app)
class Textbot:
    def __init__(self, model_id: str):
        self.loop = asyncio.get_running_loop()

        self.model_id = model_id
        self.model = AutoModelForCausalLM.from_pretrained(self.model_id)
        self.tokenizer = AutoTokenizer.from_pretrained(self.model_id)

注意,构造函数还会缓存一个 asyncio 循环。当您需要运行模型并同时将其 token 流式传输回用户时,此行为非常有用。

添加以下逻辑以处理发送到 Textbot 的请求:

    @fastapi_app.post("/")
    def handle_request(self, prompt: str) -> StreamingResponse:
        logger.info(f'Got prompt: "{prompt}"')
        streamer = TextIteratorStreamer(
            self.tokenizer, timeout=0, skip_prompt=True, skip_special_tokens=True
        )
        self.loop.run_in_executor(None, self.generate_text, prompt, streamer)
        return StreamingResponse(
            self.consume_streamer(streamer), media_type="text/plain"
        )

    def generate_text(self, prompt: str, streamer: TextIteratorStreamer):
        input_ids = self.tokenizer([prompt], return_tensors="pt").input_ids
        self.model.generate(input_ids, streamer=streamer, max_length=10000)

    async def consume_streamer(self, streamer: TextIteratorStreamer):
        while True:
            try:
                for token in streamer:
                    logger.info(f'Yielding token: "{token}"')
                    yield token
                break
            except Empty:
                # The streamer raises an Empty exception if the next token
                # hasn't been generated yet. `await` here to yield control
                # back to the event loop so other coroutines can run.
                await asyncio.sleep(0.001)

Textbot 使用三种方法处理请求:

  • handle_request:HTTP 请求的入口点。FastAPI 自动解包 prompt 查询参数并将其传递给 handle_request。然后,此方法创建一个 TextIteratorStreamer。Hugging Face 提供了这个 streamer 作为访问语言模型生成的 token 的便捷接口。handle_request 随后使用 self.loop.run_in_executor 在后台线程中启动模型。此行为允许模型生成 token,而 handle_request 同时调用 self.consume_streamer 将 token 流式传输回用户。self.consume_streamer 是一个生成器,它从 streamer 中一个接一个地生成 token。最后,handle_requestself.consume_streamer 生成器传递给 Starlette StreamingResponse 并返回响应。Serve 解包 Starlette StreamingResponse 并将生成器的内容一个接一个地生成回用户。

  • generate_text:运行模型的方法。此方法在由 handle_request 启动的后台线程中运行。它将生成的 token 推入由 handle_request 构建的 streamer 中。

  • consume_streamer:一个生成器方法,用于消费由 handle_request 构建的 streamer。此方法持续从 streamer 生成 token,直到 generate_text 中的模型关闭 streamer。当 streamer 为空并等待新 token 时,此方法会调用 asyncio.sleep 并设置一个短暂超时,从而避免阻塞事件循环。

Textbot 绑定到语言模型。对于本教程,使用 "microsoft/DialoGPT-small" 模型:

app = Textbot.bind("microsoft/DialoGPT-small")

使用 serve run textbot:app 运行模型,然后从另一个终端窗口使用此脚本查询它:

import requests

prompt = "Tell me a story about dogs."

response = requests.post(f"http://localhost:8000/?prompt={prompt}", stream=True)
response.raise_for_status()
for chunk in response.iter_content(chunk_size=None, decode_unicode=True):
    print(chunk, end="")

    # Dogs are the best.

您应该看到输出一个 token 接一个 token 地打印出来。

使用 WebSockets 流式传输输入和输出#

WebSockets 允许您将输入流式传输到应用中,并将输出流式传输回客户端。使用 WebSockets 创建一个存储用户对话的聊天机器人。

创建一个名为 chatbot.py 的 Python 文件。首先添加导入:

import asyncio
import logging
from queue import Empty

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from transformers import AutoModelForCausalLM, AutoTokenizer, TextIteratorStreamer

from ray import serve

logger = logging.getLogger("ray.serve")

创建一个 FastAPI 部署,并在构造函数中初始化模型和 tokenizer。

fastapi_app = FastAPI()


@serve.deployment
@serve.ingress(fastapi_app)
class Chatbot:
    def __init__(self, model_id: str):
        self.loop = asyncio.get_running_loop()

        self.model_id = model_id
        self.model = AutoModelForCausalLM.from_pretrained(self.model_id)
        self.tokenizer = AutoTokenizer.from_pretrained(self.model_id)

添加以下逻辑以处理发送到 Chatbot 的请求:

    @fastapi_app.websocket("/")
    async def handle_request(self, ws: WebSocket) -> None:
        await ws.accept()

        conversation = ""
        try:
            while True:
                prompt = await ws.receive_text()
                logger.info(f'Got prompt: "{prompt}"')
                conversation += prompt
                streamer = TextIteratorStreamer(
                    self.tokenizer,
                    timeout=0,
                    skip_prompt=True,
                    skip_special_tokens=True,
                )
                self.loop.run_in_executor(
                    None, self.generate_text, conversation, streamer
                )
                response = ""
                async for text in self.consume_streamer(streamer):
                    await ws.send_text(text)
                    response += text
                await ws.send_text("<<Response Finished>>")
                conversation += response
        except WebSocketDisconnect:
            print("Client disconnected.")

    def generate_text(self, prompt: str, streamer: TextIteratorStreamer):
        input_ids = self.tokenizer([prompt], return_tensors="pt").input_ids
        self.model.generate(input_ids, streamer=streamer, max_length=10000)

    async def consume_streamer(self, streamer: TextIteratorStreamer):
        while True:
            try:
                for token in streamer:
                    logger.info(f'Yielding token: "{token}"')
                    yield token
                break
            except Empty:
                await asyncio.sleep(0.001)


generate_textconsume_streamer 方法与 Textbot 中的相同。handle_request 方法已更新以处理 WebSocket 请求。

handle_request 方法使用 fastapi_app.websocket 装饰器装饰,这使其能够接受 WebSocket 请求。首先,它 await 接受客户端的 WebSocket 请求。然后,在客户端断开连接之前,它执行以下操作:

  • 使用 ws.receive_text 从客户端获取 prompt

  • 启动一个新的 TextIteratorStreamer 以访问生成的 token

  • 在后台线程中运行模型处理到目前为止的对话

  • 使用 ws.send_text 将模型的输出流式传回

  • 将 prompt 和响应存储在 conversation 字符串中

每次 handle_request 从客户端获取新的 prompt 时,它会将整个对话(附加新的 prompt)通过模型运行。当模型完成生成 token 时,handle_request 发送字符串 "<<Response Finished>>" 通知客户端模型已生成所有 token。handle_request 将持续运行,直到客户端明确断开连接。此断开连接会引发 WebSocketDisconnect 异常,从而结束调用。

阅读 FastAPI 文档以了解更多关于 WebSockets 的信息。

Chatbot 绑定到语言模型。对于本教程,使用 "microsoft/DialoGPT-small" 模型:

app = Chatbot.bind("microsoft/DialoGPT-small")

使用 serve run chatbot:app 运行模型。使用 websockets 包查询它,使用 pip install websockets 进行安装:

from websockets.sync.client import connect

with connect("ws://localhost:8000") as websocket:
    websocket.send("Space the final")
    while True:
        received = websocket.recv()
        if received == "<<Response Finished>>":
            break
        print(received, end="")
    print("\n")

    websocket.send(" These are the voyages")
    while True:
        received = websocket.recv()
        if received == "<<Response Finished>>":
            break
        print(received, end="")
    print("\n")

您应该看到输出一个 token 接一个 token 地打印出来。

批量处理请求并流式传输每个请求的输出#

通过在运行模型时将请求批量处理,提高模型利用率并降低请求延迟。

创建一个名为 batchbot.py 的 Python 文件。首先添加导入:

import asyncio
import logging
from queue import Empty, Queue

from fastapi import FastAPI
from transformers import AutoModelForCausalLM, AutoTokenizer

from ray import serve

logger = logging.getLogger("ray.serve")

警告

Hugging Face 对 Streamers 的支持仍在开发中,将来可能会发生变化。RawQueue 与 Hugging Face 4.30.2 中的 Streamers 接口兼容。但是,将来 Streamers 接口可能会改变,导致 RawQueue 与未来的 Hugging Face 模型不兼容。

TextbotChatbot 类似,Batchbot 需要一个 streamer 来流式传输批量请求的输出,但 Hugging Face Streamers 不支持批量请求。添加此自定义 RawStreamer 以处理 token 批次:

class RawStreamer:
    def __init__(self, timeout: float = None):
        self.q = Queue()
        self.stop_signal = None
        self.timeout = timeout

    def put(self, values):
        self.q.put(values)

    def end(self):
        self.q.put(self.stop_signal)

    def __iter__(self):
        return self

    def __next__(self):
        result = self.q.get(timeout=self.timeout)
        if result == self.stop_signal:
            raise StopIteration()
        else:
            return result


创建一个 FastAPI 部署,并在构造函数中初始化模型和 tokenizer。

fastapi_app = FastAPI()


@serve.deployment
@serve.ingress(fastapi_app)
class Batchbot:
    def __init__(self, model_id: str):
        self.loop = asyncio.get_running_loop()

        self.model_id = model_id
        self.model = AutoModelForCausalLM.from_pretrained(self.model_id)
        self.tokenizer = AutoTokenizer.from_pretrained(self.model_id)
        self.tokenizer.pad_token = self.tokenizer.eos_token

TextbotChatbot 不同,Batchbot 构造函数还设置了 pad_token。您需要设置此 token 来批量处理不同长度的 prompt。

添加以下逻辑以处理发送到 Batchbot 的请求:

    @fastapi_app.post("/")
    async def handle_request(self, prompt: str) -> StreamingResponse:
        logger.info(f'Got prompt: "{prompt}"')
        return StreamingResponse(self.run_model(prompt), media_type="text/plain")

    @serve.batch(max_batch_size=2, batch_wait_timeout_s=15)
    async def run_model(self, prompts: List[str]):
        streamer = RawStreamer()
        self.loop.run_in_executor(None, self.generate_text, prompts, streamer)
        on_prompt_tokens = True
        async for decoded_token_batch in self.consume_streamer(streamer):
            # The first batch of tokens contains the prompts, so we skip it.
            if not on_prompt_tokens:
                logger.info(f"Yielding decoded_token_batch: {decoded_token_batch}")
                yield decoded_token_batch
            else:
                logger.info(f"Skipped prompts: {decoded_token_batch}")
                on_prompt_tokens = False

    def generate_text(self, prompts: str, streamer: RawStreamer):
        input_ids = self.tokenizer(prompts, return_tensors="pt", padding=True).input_ids
        self.model.generate(input_ids, streamer=streamer, max_length=10000)

    async def consume_streamer(self, streamer: RawStreamer):
        while True:
            try:
                for token_batch in streamer:
                    decoded_tokens = []
                    for token in token_batch:
                        decoded_tokens.append(
                            self.tokenizer.decode(token, skip_special_tokens=True)
                        )
                    logger.info(f"Yielding decoded tokens: {decoded_tokens}")
                    yield decoded_tokens
                break
            except Empty:
                await asyncio.sleep(0.001)


Batchbot 使用四种方法处理请求:

  • handle_request:入口点方法。此方法简单地接收请求的 prompt 并调用其上的 run_model 方法。run_model 是一个生成器方法,也负责处理请求批处理。handle_requestrun_model 传递给 Starlette StreamingResponse 并返回响应,以便机器人可以将生成的 token 流式传输回客户端。

  • run_model:执行批处理的生成器方法。由于 run_model 使用 @serve.batch 装饰,它自动接收一个 prompt 批次。有关更多信息,请参阅 批处理指南run_model 创建一个 RawStreamer 以访问生成的 token。它在后台线程中调用 generate_text,并传递 prompt 和 streamer,类似于 Textbot。然后它遍历 consume_streamer 生成器,重复生成由模型生成的一批 token。

  • generate_text:运行模型的方法。它与 Textbot 中的 generate_text 大体相同,但有两点不同。首先,它接收并处理一批 prompt,而不是单个 prompt。其次,它设置了 padding=True,这样不同长度的 prompt 可以被批量处理在一起。

  • consume_streamer:一个生成器方法,用于消费由 handle_request 构建的 streamer。它与 Textbot 中的 consume_streamer 大体相同,但有一点不同。它使用 tokenizer 来解码生成的 token。通常,Hugging Face 的 streamer 会处理解码。由于此实现使用了自定义 RawStreamer,因此 consume_streamer 必须处理解码。

提示

批次中的某些输入生成的输出可能少于其他输入。当某个特定输入没有更多可生成的输出时,将一个 StopIteration 对象传递到输出可迭代对象中以终止该输入的请求。有关更多详细信息,请参阅 流式批量请求

Batchbot 绑定到语言模型。对于本教程,使用 "microsoft/DialoGPT-small" 模型:

app = Batchbot.bind("microsoft/DialoGPT-small")

使用 serve run batchbot:app 运行模型。从另外两个终端窗口使用此脚本查询它:

import requests

prompt = "Tell me a story about dogs."

response = requests.post(f"http://localhost:8000/?prompt={prompt}", stream=True)
response.raise_for_status()
for chunk in response.iter_content(chunk_size=None, decode_unicode=True):
    print(chunk, end="")

    # Dogs are the best.

您应该在两个窗口中看到输出一个 token 接一个 token 地打印出来。