音频批处理推理#
本教程演示了一个批处理推理管道,该管道使用两个不同的 ML 模型将原始音频文件转换为精选的子集。该管道将四个阶段链接在一起,每个阶段都在同一 Ray 集群上运行,并使用异构资源。
将 Common Voice 11.0 的英文验证集流式传输到 Ray 数据集中。
将每个剪辑重采样到 16 kHz,以兼容 Whisper。
使用
openai/whisper-large-v3-turbo模型转录音频。使用小型 Llama-3 模型判断每个转录稿的教育质量。
仅将得分 ≥ 3 的剪辑保留到 Parquet 数据集中。
Ray Data 在这种用例中尤其强大,因为它
自动将工作并行化到集群中的机器
无缝处理异构计算资源
使用惰性执行来优化执行计划
一旦第一个数据块可用,就通过每个阶段处理数据。这种流式执行模型最大限度地减少了第一次结果所需的时间,消除了大型中间数据存储,并最大限度地提高了资源利用率。
相同的脚本只需进行很少的代码更改即可扩展到更大的 GPU 集群。
先决条件#
使用以下命令安装依赖项
pip install -r requirements.txt
本教程运行在具有五个 L4 GPU 工作节点(worker nodes)的集群上。
设置#
首先,导入必要的模块
import io
import os
import numpy as np
import ray
import torch
import torchaudio
import torchaudio.transforms as T
from ray.data.llm import build_processor, vLLMEngineProcessorConfig
from transformers import AutoModelForSpeechSeq2Seq, AutoProcessor
TRANSCRIPTION_MODEL = "openai/whisper-tiny"
JUDGE_MODEL = "unsloth/Meta-Llama-3.1-8B-Instruct"
流式数据摄入#
ray.data.read_parquet 以惰性方式读取记录并将其分发到集群中。这种方法利用了每个节点的网络带宽,并立即开始工作,而无需等待整个数据集下载。加载后,Ray 将数据分成块并将其分派给工作节点进行处理。
# Load the English subset of Common Voice 11.0.
raw_ds = ray.data.read_parquet("s3://anonymous@air-example-data/common_voice_11_0_audio_dataset.parquet")
# Subsample for demonstration purposes.
raw_ds = raw_ds.limit(1000)
音频预处理#
Whisper 检查点需要 16 kHz 的单声道音频。
使用 TorchAudio 在 CPU 上进行采样率调整,将张量流式传输回 Dataset。该操作并行运行,因此简单的 ds.map 会将工作分配到集群的 CPU 核心上。
ds.map() 会在集群中并行地将转换函数应用于每个记录。只要有可能,Ray 就会避免在网络连接上传输对象,以利用零拷贝读取,从而避免序列化和反序列化开销。
一旦数据块完成预处理,就可以移到下一阶段,而无需等待整个数据集。
def resample(item):
# Resample at 16kHz, which is what openai/whisper-large-v3-turbo was trained on.
audio_bytes = item.pop("audio_bytes")
new_sampling_rate = 16000
waveform, sampling_rate = torchaudio.load(io.BytesIO(audio_bytes), format="flac")
waveform = T.Resample(sampling_rate, new_sampling_rate)(waveform).squeeze()
item["arr"] = np.array(waveform)
item["sampling_rate"] = new_sampling_rate
return item
ds = raw_ds.map(resample)
接下来,使用 Whisper 的预处理器预处理数据。此步骤作为单独的阶段运行,以便可以独立于 Whisper 模型本身进行扩展。
map_batches() 一次转换整个记录批次,而不是单个项。通过将一个类传递给 map_batches(),Ray 会创建一个 Actor 进程,该进程在批次之间循环使用状态。 concurrency 参数控制并行处理批次的数量。 batch_format="pandas" 设置会在处理之前将批次转换为 pandas DataFrames。
class WhisperPreprocessor:
def __init__(self):
self.processor = AutoProcessor.from_pretrained(TRANSCRIPTION_MODEL)
def __call__(self, batch):
# The Whisper processor expects a list of 1D NumPy arrays (mono audio).
# Extract log-mel spectogram of audio.
extracted_features = self.processor(
batch["arr"].tolist(),
sampling_rate=batch["sampling_rate"][0], # Expects int, not list.
return_tensors="np",
device="cpu", # Eligible for GPU, but reserving GPUs for the models.
).input_features
# extracted_featues is a pd.Series of np.array shape (3000,).
return {"input_features": [arr for arr in extracted_features], "id": batch["id"]}
ds = ds.map_batches(WhisperPreprocessor, batch_size=2, batch_format="pandas", concurrency=1)
# ds.show(1)
运行 ds.show(1) 会显示类似以下的输出:
[{'id': '19ba96...', 'transcription': ' It is from Westport above the villages of Morrisk and La Canvae.'}]
请注意,由于 Ray Data 的惰性执行模型,ds.show(1) 会触发到目前为止的管道执行。这种方法意味着摄入和预处理实际上会运行以产生第一个结果。
使用 Whisper 进行 GPU 推理#
Ray Data 可以在适当的资源上调度用户定义的 Python 可调用对象。下面的 Transcriber 在第一次运行时惰性地将 openai/whisper-large-v3-turbo 加载到每个 GPU 工作节点上,后续的批次会重用预热的模型。
num_gpus=1 参数告诉 Ray 在具有 GPU 的节点上运行每个副本。Ray Data 会根据可用资源自动处理可变大小的批次。如果某个工作节点失败,Ray 会自动在另一个节点上重启该任务。
class Transcriber:
def __init__(self):
self.device = "cuda"
self.dtype = torch.float16
self.model_id = "openai/whisper-tiny"
self.model = AutoModelForSpeechSeq2Seq.from_pretrained(self.model_id, torch_dtype=self.dtype, low_cpu_mem_usage=True, use_safetensors=True)
self.model.to(self.device)
def __call__(self, batch):
# Fuse all list of np.array into a single np.array for faster tensor creation.
spectograms = np.array(batch["input_features"])
spectograms = torch.tensor(spectograms).to(self.device, dtype=self.dtype)
with torch.no_grad():
# Generate token IDs for the batched input features.
token_ids = self.model.generate(spectograms)
return {"id": batch["id"], "token_ids": token_ids.cpu().numpy()}
# Transcribe audio to text tokens using Whisper.
# Use 2 workers using 1 GPU each.
ds = ds.map_batches(Transcriber, batch_size=2, batch_format="numpy", concurrency=2, num_gpus=1)
现在将令牌解码为实际的转录。此步骤与上一步分开,以防止 GPU 阻塞 CPU 工作并避免空闲时间。此方法还允许独立扩展解码器的数量和 Whisper 副本的数量。
将 GPU 工作与 CPU 工作分开可消除 GPU 空闲。 concurrency=5 和 batch_size=32 参数展示了如何使用比 GPU 工作更多的 CPU 工作和更大的批次大小。
class Decoder:
def __init__(self):
self.processor = AutoProcessor.from_pretrained(TRANSCRIPTION_MODEL)
def __call__(self, batch):
token_ids = batch.pop("token_ids")
transcription = self.processor.batch_decode(token_ids, skip_special_tokens=True)
batch["transcription"] = transcription
return batch
ds = ds.map_batches(Decoder, batch_size=16, concurrency=5, batch_format="pandas") # CPU only
# ds.take(1)
基于 LLM 的质量过滤器#
Llama-3 模型充当一个机器裁判,根据其教育价值为每个转录稿打分(从 1 👎 到 5 👍)。LLM Processor API 使用声明式 API 风格封装了批处理、提示格式化和 vLLM 引擎交互的繁重工作。
Ray Data 提供了一个高级 API,用于将 LLM 集成到数据管道中。预处理和后处理函数负责数据准备和结果解析。
# LLM as judge.
judge_config = vLLMEngineProcessorConfig(
model_source=JUDGE_MODEL,
engine_kwargs={
"enable_chunked_prefill": True,
"max_num_batched_tokens": 1028,
"max_model_len": 4096,
"structured_outputs_config": {"backend": "xgrammar"},
"dtype": torch.float16,
},
concurrency=3,
batch_size=2,
)
processor = build_processor(
judge_config,
preprocess=lambda row: dict(
messages=[
{
"role": "system",
"content": "You are an assistant that rates educational quality of a given text from a scale of 1 (not educational) to 5 (very educational). Only output your score, nothing else (no explanation, no comments, no other text). Acceptable outputs: 1, 2, 3, 4, 5",
},
{"role": "user", "content": row["transcription"]},
],
sampling_params=dict(
guided_decoding={"choice": ["1", "2", "3", "4", "5"]},
),
),
postprocess=lambda row: dict(
score=float(row.pop("generated_text")), # Rename generated_text to score and convert to float.
**row, # Pass through the original fields.
),
)
# Rate educational quality of each transcription.
ds = processor(ds)
# Filter out uneducational content.
ds = ds.filter(expr="score >= 3")
# print(ds.take(1))
保留精选子集#
此时,Common Voice 的一小部分高质量数据已准备好用于下游任务,例如微调或评估。write_parquet 调用最终触发整个管道的执行,并将块并行写入目标存储位置。此函数在可用时写入输出,并自动将结果分片到多个文件中,以便下游高效并行读取。
由于这是一项分布式工作负载,目标存储必须可供所有工作节点写入。该存储可以是 S3、NFS 或其他网络附加解决方案。Anyscale 通过在每个集群上自动创建和挂载共享存储选项来简化此过程。
# Save the filtered dataset to a Parquet file.
# This line triggers the lazy execution of the entire pipeline.
output_dir = "/mnt/cluster_storage/filtered_dataset/"
os.makedirs(output_dir, exist_ok=True)
ds.write_parquet(output_dir)
write_parquet 触发数据管道的完整执行,并将结果流式传输到一系列本地 Parquet 文件中。Ray 会自动将这些文件分片到多个输出中,以便高效的下游读取。
# List the files in the output directory.
print(os.listdir(output_dir))