音频批处理推理#

   

本教程演示了一个批处理推理管道,该管道使用两个不同的 ML 模型将原始音频文件转换为精选的子集。该管道将四个阶段链接在一起,每个阶段都在同一 Ray 集群上运行,并使用异构资源。

  1. Common Voice 11.0 的英文验证集流式传输到 Ray 数据集中。

  2. 将每个剪辑重采样到 16 kHz,以兼容 Whisper。

  3. 使用 openai/whisper-large-v3-turbo 模型转录音频。

  4. 使用小型 Llama-3 模型判断每个转录稿的教育质量。

  5. 仅将得分 ≥ 3 的剪辑保留到 Parquet 数据集中。

Ray Data 在这种用例中尤其强大,因为它

  • 自动将工作并行化到集群中的机器

  • 无缝处理异构计算资源

  • 使用惰性执行来优化执行计划

  • 一旦第一个数据块可用,就通过每个阶段处理数据。这种流式执行模型最大限度地减少了第一次结果所需的时间,消除了大型中间数据存储,并最大限度地提高了资源利用率。

  • 相同的脚本只需进行很少的代码更改即可扩展到更大的 GPU 集群。

先决条件#

使用以下命令安装依赖项

pip install -r requirements.txt

本教程运行在具有五个 L4 GPU 工作节点(worker nodes)的集群上。

设置#

首先,导入必要的模块

import io
import os

import numpy as np
import ray
import torch
import torchaudio
import torchaudio.transforms as T
from ray.data.llm import build_processor, vLLMEngineProcessorConfig
from transformers import AutoModelForSpeechSeq2Seq, AutoProcessor

TRANSCRIPTION_MODEL = "openai/whisper-tiny"
JUDGE_MODEL = "unsloth/Meta-Llama-3.1-8B-Instruct"

流式数据摄入#

ray.data.read_parquet惰性方式读取记录并将其分发到集群中。这种方法利用了每个节点的网络带宽,并立即开始工作,而无需等待整个数据集下载。加载后,Ray 将数据分成块并将其分派给工作节点进行处理。

# Load the English subset of Common Voice 11.0.
raw_ds = ray.data.read_parquet("s3://anonymous@air-example-data/common_voice_11_0_audio_dataset.parquet")
# Subsample for demonstration purposes.
raw_ds = raw_ds.limit(1000)

音频预处理#

Whisper 检查点需要 16 kHz 的单声道音频。
使用 TorchAudio 在 CPU 上进行采样率调整,将张量流式传输回 Dataset。该操作并行运行,因此简单的 ds.map 会将工作分配到集群的 CPU 核心上。

ds.map() 会在集群中并行地将转换函数应用于每个记录。只要有可能,Ray 就会避免在网络连接上传输对象,以利用零拷贝读取,从而避免序列化和反序列化开销。

一旦数据块完成预处理,就可以移到下一阶段,而无需等待整个数据集。

def resample(item):
    # Resample at 16kHz, which is what openai/whisper-large-v3-turbo was trained on.
    audio_bytes = item.pop("audio_bytes")
    new_sampling_rate = 16000
    waveform, sampling_rate = torchaudio.load(io.BytesIO(audio_bytes), format="flac")
    waveform = T.Resample(sampling_rate, new_sampling_rate)(waveform).squeeze()
    item["arr"] = np.array(waveform)
    item["sampling_rate"] = new_sampling_rate
    return item


ds = raw_ds.map(resample)

接下来,使用 Whisper 的预处理器预处理数据。此步骤作为单独的阶段运行,以便可以独立于 Whisper 模型本身进行扩展。

map_batches() 一次转换整个记录批次,而不是单个项。通过将一个类传递给 map_batches(),Ray 会创建一个 Actor 进程,该进程在批次之间循环使用状态。 concurrency 参数控制并行处理批次的数量。 batch_format="pandas" 设置会在处理之前将批次转换为 pandas DataFrames。

class WhisperPreprocessor:
    def __init__(self):
        self.processor = AutoProcessor.from_pretrained(TRANSCRIPTION_MODEL)

    def __call__(self, batch):
        # The Whisper processor expects a list of 1D NumPy arrays (mono audio).

        # Extract log-mel spectogram of audio.
        extracted_features = self.processor(
            batch["arr"].tolist(),
            sampling_rate=batch["sampling_rate"][0],  # Expects int, not list.
            return_tensors="np",
            device="cpu",  # Eligible for GPU, but reserving GPUs for the models.
        ).input_features
        # extracted_featues is a pd.Series of np.array shape (3000,).
        return {"input_features": [arr for arr in extracted_features], "id": batch["id"]}


ds = ds.map_batches(WhisperPreprocessor, batch_size=2, batch_format="pandas", concurrency=1)
# ds.show(1)

运行 ds.show(1) 会显示类似以下的输出:

[{'id': '19ba96...', 'transcription': ' It is from Westport above the villages of Morrisk and La Canvae.'}]

请注意,由于 Ray Data 的惰性执行模型,ds.show(1) 会触发到目前为止的管道执行。这种方法意味着摄入和预处理实际上会运行以产生第一个结果。

使用 Whisper 进行 GPU 推理#

Ray Data 可以在适当的资源上调度用户定义的 Python 可调用对象。下面的 Transcriber第一次运行时惰性地将 openai/whisper-large-v3-turbo 加载到每个 GPU 工作节点上,后续的批次会重用预热的模型。

num_gpus=1 参数告诉 Ray 在具有 GPU 的节点上运行每个副本。Ray Data 会根据可用资源自动处理可变大小的批次。如果某个工作节点失败,Ray 会自动在另一个节点上重启该任务。

class Transcriber:
    def __init__(self):
        self.device = "cuda"
        self.dtype = torch.float16
        self.model_id = "openai/whisper-tiny"
        self.model = AutoModelForSpeechSeq2Seq.from_pretrained(self.model_id, torch_dtype=self.dtype, low_cpu_mem_usage=True, use_safetensors=True)
        self.model.to(self.device)

    def __call__(self, batch):
        # Fuse all list of np.array into a single np.array for faster tensor creation.
        spectograms = np.array(batch["input_features"])
        spectograms = torch.tensor(spectograms).to(self.device, dtype=self.dtype)

        with torch.no_grad():
            # Generate token IDs for the batched input features.
            token_ids = self.model.generate(spectograms)

        return {"id": batch["id"], "token_ids": token_ids.cpu().numpy()}


# Transcribe audio to text tokens using Whisper.
# Use 2 workers using 1 GPU each.
ds = ds.map_batches(Transcriber, batch_size=2, batch_format="numpy", concurrency=2, num_gpus=1)

现在将令牌解码为实际的转录。此步骤与上一步分开,以防止 GPU 阻塞 CPU 工作并避免空闲时间。此方法还允许独立扩展解码器的数量和 Whisper 副本的数量。

将 GPU 工作与 CPU 工作分开可消除 GPU 空闲。 concurrency=5batch_size=32 参数展示了如何使用比 GPU 工作更多的 CPU 工作和更大的批次大小。

class Decoder:
    def __init__(self):
        self.processor = AutoProcessor.from_pretrained(TRANSCRIPTION_MODEL)

    def __call__(self, batch):
        token_ids = batch.pop("token_ids")
        transcription = self.processor.batch_decode(token_ids, skip_special_tokens=True)
        batch["transcription"] = transcription
        return batch


ds = ds.map_batches(Decoder, batch_size=16, concurrency=5, batch_format="pandas")  # CPU only

# ds.take(1)

基于 LLM 的质量过滤器#

Llama-3 模型充当一个机器裁判,根据其教育价值为每个转录稿打分(从 1 👎 到 5 👍)。LLM Processor API 使用声明式 API 风格封装了批处理、提示格式化和 vLLM 引擎交互的繁重工作。

Ray Data 提供了一个高级 API,用于将 LLM 集成到数据管道中。预处理和后处理函数负责数据准备和结果解析。

# LLM as judge.
judge_config = vLLMEngineProcessorConfig(
    model_source=JUDGE_MODEL,
    engine_kwargs={
        "enable_chunked_prefill": True,
        "max_num_batched_tokens": 1028,
        "max_model_len": 4096,
        "structured_outputs_config": {"backend": "xgrammar"},
        "dtype": torch.float16,
    },
    concurrency=3,
    batch_size=2,
)

processor = build_processor(
    judge_config,
    preprocess=lambda row: dict(
        messages=[
            {
                "role": "system",
                "content": "You are an assistant that rates educational quality of a given text from a scale of 1 (not educational) to 5 (very educational). Only output your score, nothing else (no explanation, no comments, no other text). Acceptable outputs: 1, 2, 3, 4, 5",
            },
            {"role": "user", "content": row["transcription"]},
        ],
        sampling_params=dict(
            guided_decoding={"choice": ["1", "2", "3", "4", "5"]},
        ),
    ),
    postprocess=lambda row: dict(
        score=float(row.pop("generated_text")),  # Rename generated_text to score and convert to float.
        **row,  # Pass through the original fields.
    ),
)

# Rate educational quality of each transcription.
ds = processor(ds)

# Filter out uneducational content.
ds = ds.filter(expr="score >= 3")
# print(ds.take(1))

保留精选子集#

此时,Common Voice 的一小部分高质量数据已准备好用于下游任务,例如微调或评估。
write_parquet 调用最终触发整个管道的执行,并将块并行写入目标存储位置。此函数在可用时写入输出,并自动将结果分片到多个文件中,以便下游高效并行读取。

由于这是一项分布式工作负载,目标存储必须可供所有工作节点写入。该存储可以是 S3、NFS 或其他网络附加解决方案。Anyscale 通过在每个集群上自动创建和挂载共享存储选项来简化此过程。

# Save the filtered dataset to a Parquet file.
# This line triggers the lazy execution of the entire pipeline.
output_dir = "/mnt/cluster_storage/filtered_dataset/"
os.makedirs(output_dir, exist_ok=True)
ds.write_parquet(output_dir)

write_parquet 触发数据管道的完整执行,并将结果流式传输到一系列本地 Parquet 文件中。Ray 会自动将这些文件分片到多个输出中,以便高效的下游读取。

# List the files in the output directory.
print(os.listdir(output_dir))