使用请求和响应流服务聊天机器人#
本示例部署了一个聊天机器人,它将输出流式传输回用户。它展示了如何:
如何从 Serve 应用流式传输输出
如何在 Serve 应用中使用 WebSockets
如何将批量请求与流式输出结合
本教程应能帮助您解决以下用例:
您想服务一个大型语言模型并按 token 流式传回结果。
您想服务一个聊天机器人,该机器人接受来自用户的输入流。
本教程服务于 DialoGPT 语言模型。安装 Hugging Face 库以访问它:
pip install "ray[serve]" transformers torch
创建流式部署#
创建一个名为 textbot.py
的 Python 文件。首先,添加导入和 Serve 日志记录器。
import asyncio
import logging
from queue import Empty
from fastapi import FastAPI
from starlette.responses import StreamingResponse
from transformers import AutoModelForCausalLM, AutoTokenizer, TextIteratorStreamer
from ray import serve
logger = logging.getLogger("ray.serve")
创建一个 FastAPI 部署,并在构造函数中初始化模型和 tokenizer。
fastapi_app = FastAPI()
@serve.deployment
@serve.ingress(fastapi_app)
class Textbot:
def __init__(self, model_id: str):
self.loop = asyncio.get_running_loop()
self.model_id = model_id
self.model = AutoModelForCausalLM.from_pretrained(self.model_id)
self.tokenizer = AutoTokenizer.from_pretrained(self.model_id)
注意,构造函数还会缓存一个 asyncio
循环。当您需要运行模型并同时将其 token 流式传输回用户时,此行为非常有用。
添加以下逻辑以处理发送到 Textbot
的请求:
@fastapi_app.post("/")
def handle_request(self, prompt: str) -> StreamingResponse:
logger.info(f'Got prompt: "{prompt}"')
streamer = TextIteratorStreamer(
self.tokenizer, timeout=0, skip_prompt=True, skip_special_tokens=True
)
self.loop.run_in_executor(None, self.generate_text, prompt, streamer)
return StreamingResponse(
self.consume_streamer(streamer), media_type="text/plain"
)
def generate_text(self, prompt: str, streamer: TextIteratorStreamer):
input_ids = self.tokenizer([prompt], return_tensors="pt").input_ids
self.model.generate(input_ids, streamer=streamer, max_length=10000)
async def consume_streamer(self, streamer: TextIteratorStreamer):
while True:
try:
for token in streamer:
logger.info(f'Yielding token: "{token}"')
yield token
break
except Empty:
# The streamer raises an Empty exception if the next token
# hasn't been generated yet. `await` here to yield control
# back to the event loop so other coroutines can run.
await asyncio.sleep(0.001)
Textbot
使用三种方法处理请求:
handle_request
:HTTP 请求的入口点。FastAPI 自动解包prompt
查询参数并将其传递给handle_request
。然后,此方法创建一个TextIteratorStreamer
。Hugging Face 提供了这个 streamer 作为访问语言模型生成的 token 的便捷接口。handle_request
随后使用self.loop.run_in_executor
在后台线程中启动模型。此行为允许模型生成 token,而handle_request
同时调用self.consume_streamer
将 token 流式传输回用户。self.consume_streamer
是一个生成器,它从 streamer 中一个接一个地生成 token。最后,handle_request
将self.consume_streamer
生成器传递给 StarletteStreamingResponse
并返回响应。Serve 解包 StarletteStreamingResponse
并将生成器的内容一个接一个地生成回用户。generate_text
:运行模型的方法。此方法在由handle_request
启动的后台线程中运行。它将生成的 token 推入由handle_request
构建的 streamer 中。consume_streamer
:一个生成器方法,用于消费由handle_request
构建的 streamer。此方法持续从 streamer 生成 token,直到generate_text
中的模型关闭 streamer。当 streamer 为空并等待新 token 时,此方法会调用asyncio.sleep
并设置一个短暂超时,从而避免阻塞事件循环。
将 Textbot
绑定到语言模型。对于本教程,使用 "microsoft/DialoGPT-small"
模型:
app = Textbot.bind("microsoft/DialoGPT-small")
使用 serve run textbot:app
运行模型,然后从另一个终端窗口使用此脚本查询它:
import requests
prompt = "Tell me a story about dogs."
response = requests.post(f"http://localhost:8000/?prompt={prompt}", stream=True)
response.raise_for_status()
for chunk in response.iter_content(chunk_size=None, decode_unicode=True):
print(chunk, end="")
# Dogs are the best.
您应该看到输出一个 token 接一个 token 地打印出来。
使用 WebSockets 流式传输输入和输出#
WebSockets 允许您将输入流式传输到应用中,并将输出流式传输回客户端。使用 WebSockets 创建一个存储用户对话的聊天机器人。
创建一个名为 chatbot.py
的 Python 文件。首先添加导入:
import asyncio
import logging
from queue import Empty
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from transformers import AutoModelForCausalLM, AutoTokenizer, TextIteratorStreamer
from ray import serve
logger = logging.getLogger("ray.serve")
创建一个 FastAPI 部署,并在构造函数中初始化模型和 tokenizer。
fastapi_app = FastAPI()
@serve.deployment
@serve.ingress(fastapi_app)
class Chatbot:
def __init__(self, model_id: str):
self.loop = asyncio.get_running_loop()
self.model_id = model_id
self.model = AutoModelForCausalLM.from_pretrained(self.model_id)
self.tokenizer = AutoTokenizer.from_pretrained(self.model_id)
添加以下逻辑以处理发送到 Chatbot
的请求:
@fastapi_app.websocket("/")
async def handle_request(self, ws: WebSocket) -> None:
await ws.accept()
conversation = ""
try:
while True:
prompt = await ws.receive_text()
logger.info(f'Got prompt: "{prompt}"')
conversation += prompt
streamer = TextIteratorStreamer(
self.tokenizer,
timeout=0,
skip_prompt=True,
skip_special_tokens=True,
)
self.loop.run_in_executor(
None, self.generate_text, conversation, streamer
)
response = ""
async for text in self.consume_streamer(streamer):
await ws.send_text(text)
response += text
await ws.send_text("<<Response Finished>>")
conversation += response
except WebSocketDisconnect:
print("Client disconnected.")
def generate_text(self, prompt: str, streamer: TextIteratorStreamer):
input_ids = self.tokenizer([prompt], return_tensors="pt").input_ids
self.model.generate(input_ids, streamer=streamer, max_length=10000)
async def consume_streamer(self, streamer: TextIteratorStreamer):
while True:
try:
for token in streamer:
logger.info(f'Yielding token: "{token}"')
yield token
break
except Empty:
await asyncio.sleep(0.001)
generate_text
和 consume_streamer
方法与 Textbot
中的相同。handle_request
方法已更新以处理 WebSocket 请求。
handle_request
方法使用 fastapi_app.websocket
装饰器装饰,这使其能够接受 WebSocket 请求。首先,它 await
接受客户端的 WebSocket 请求。然后,在客户端断开连接之前,它执行以下操作:
使用
ws.receive_text
从客户端获取 prompt启动一个新的
TextIteratorStreamer
以访问生成的 token在后台线程中运行模型处理到目前为止的对话
使用
ws.send_text
将模型的输出流式传回将 prompt 和响应存储在
conversation
字符串中
每次 handle_request
从客户端获取新的 prompt 时,它会将整个对话(附加新的 prompt)通过模型运行。当模型完成生成 token 时,handle_request
发送字符串 "<<Response Finished>>"
通知客户端模型已生成所有 token。handle_request
将持续运行,直到客户端明确断开连接。此断开连接会引发 WebSocketDisconnect
异常,从而结束调用。
阅读 FastAPI 文档以了解更多关于 WebSockets 的信息。
将 Chatbot
绑定到语言模型。对于本教程,使用 "microsoft/DialoGPT-small"
模型:
app = Chatbot.bind("microsoft/DialoGPT-small")
使用 serve run chatbot:app
运行模型。使用 websockets
包查询它,使用 pip install websockets
进行安装:
from websockets.sync.client import connect
with connect("ws://localhost:8000") as websocket:
websocket.send("Space the final")
while True:
received = websocket.recv()
if received == "<<Response Finished>>":
break
print(received, end="")
print("\n")
websocket.send(" These are the voyages")
while True:
received = websocket.recv()
if received == "<<Response Finished>>":
break
print(received, end="")
print("\n")
您应该看到输出一个 token 接一个 token 地打印出来。
批量处理请求并流式传输每个请求的输出#
通过在运行模型时将请求批量处理,提高模型利用率并降低请求延迟。
创建一个名为 batchbot.py
的 Python 文件。首先添加导入:
import asyncio
import logging
from queue import Empty, Queue
from fastapi import FastAPI
from transformers import AutoModelForCausalLM, AutoTokenizer
from ray import serve
logger = logging.getLogger("ray.serve")
警告
Hugging Face 对 Streamers
的支持仍在开发中,将来可能会发生变化。RawQueue
与 Hugging Face 4.30.2 中的 Streamers
接口兼容。但是,将来 Streamers
接口可能会改变,导致 RawQueue
与未来的 Hugging Face 模型不兼容。
与 Textbot
和 Chatbot
类似,Batchbot
需要一个 streamer 来流式传输批量请求的输出,但 Hugging Face Streamers
不支持批量请求。添加此自定义 RawStreamer
以处理 token 批次:
class RawStreamer:
def __init__(self, timeout: float = None):
self.q = Queue()
self.stop_signal = None
self.timeout = timeout
def put(self, values):
self.q.put(values)
def end(self):
self.q.put(self.stop_signal)
def __iter__(self):
return self
def __next__(self):
result = self.q.get(timeout=self.timeout)
if result == self.stop_signal:
raise StopIteration()
else:
return result
创建一个 FastAPI 部署,并在构造函数中初始化模型和 tokenizer。
fastapi_app = FastAPI()
@serve.deployment
@serve.ingress(fastapi_app)
class Batchbot:
def __init__(self, model_id: str):
self.loop = asyncio.get_running_loop()
self.model_id = model_id
self.model = AutoModelForCausalLM.from_pretrained(self.model_id)
self.tokenizer = AutoTokenizer.from_pretrained(self.model_id)
self.tokenizer.pad_token = self.tokenizer.eos_token
与 Textbot
和 Chatbot
不同,Batchbot
构造函数还设置了 pad_token
。您需要设置此 token 来批量处理不同长度的 prompt。
添加以下逻辑以处理发送到 Batchbot
的请求:
@fastapi_app.post("/")
async def handle_request(self, prompt: str) -> StreamingResponse:
logger.info(f'Got prompt: "{prompt}"')
return StreamingResponse(self.run_model(prompt), media_type="text/plain")
@serve.batch(max_batch_size=2, batch_wait_timeout_s=15)
async def run_model(self, prompts: List[str]):
streamer = RawStreamer()
self.loop.run_in_executor(None, self.generate_text, prompts, streamer)
on_prompt_tokens = True
async for decoded_token_batch in self.consume_streamer(streamer):
# The first batch of tokens contains the prompts, so we skip it.
if not on_prompt_tokens:
logger.info(f"Yielding decoded_token_batch: {decoded_token_batch}")
yield decoded_token_batch
else:
logger.info(f"Skipped prompts: {decoded_token_batch}")
on_prompt_tokens = False
def generate_text(self, prompts: str, streamer: RawStreamer):
input_ids = self.tokenizer(prompts, return_tensors="pt", padding=True).input_ids
self.model.generate(input_ids, streamer=streamer, max_length=10000)
async def consume_streamer(self, streamer: RawStreamer):
while True:
try:
for token_batch in streamer:
decoded_tokens = []
for token in token_batch:
decoded_tokens.append(
self.tokenizer.decode(token, skip_special_tokens=True)
)
logger.info(f"Yielding decoded tokens: {decoded_tokens}")
yield decoded_tokens
break
except Empty:
await asyncio.sleep(0.001)
Batchbot
使用四种方法处理请求:
handle_request
:入口点方法。此方法简单地接收请求的 prompt 并调用其上的run_model
方法。run_model
是一个生成器方法,也负责处理请求批处理。handle_request
将run_model
传递给 StarletteStreamingResponse
并返回响应,以便机器人可以将生成的 token 流式传输回客户端。run_model
:执行批处理的生成器方法。由于run_model
使用@serve.batch
装饰,它自动接收一个 prompt 批次。有关更多信息,请参阅 批处理指南 。run_model
创建一个RawStreamer
以访问生成的 token。它在后台线程中调用generate_text
,并传递 prompt 和 streamer,类似于Textbot
。然后它遍历consume_streamer
生成器,重复生成由模型生成的一批 token。generate_text
:运行模型的方法。它与Textbot
中的generate_text
大体相同,但有两点不同。首先,它接收并处理一批 prompt,而不是单个 prompt。其次,它设置了padding=True
,这样不同长度的 prompt 可以被批量处理在一起。consume_streamer
:一个生成器方法,用于消费由handle_request
构建的 streamer。它与Textbot
中的consume_streamer
大体相同,但有一点不同。它使用tokenizer
来解码生成的 token。通常,Hugging Face 的 streamer 会处理解码。由于此实现使用了自定义RawStreamer
,因此consume_streamer
必须处理解码。
提示
批次中的某些输入生成的输出可能少于其他输入。当某个特定输入没有更多可生成的输出时,将一个 StopIteration
对象传递到输出可迭代对象中以终止该输入的请求。有关更多详细信息,请参阅 流式批量请求。
将 Batchbot
绑定到语言模型。对于本教程,使用 "microsoft/DialoGPT-small"
模型:
app = Batchbot.bind("microsoft/DialoGPT-small")
使用 serve run batchbot:app
运行模型。从另外两个终端窗口使用此脚本查询它:
import requests
prompt = "Tell me a story about dogs."
response = requests.post(f"http://localhost:8000/?prompt={prompt}", stream=True)
response.raise_for_status()
for chunk in response.iter_content(chunk_size=None, decode_unicode=True):
print(chunk, end="")
# Dogs are the best.
您应该在两个窗口中看到输出一个 token 接一个 token 地打印出来。