使用 Ray Train 和 DeepSpeed 微调 LLM#

完成时间: 20 分钟

本笔记本结合了 Ray TrainDeepSpeed,可在 GPU 和节点之间高效扩展 PyTorch 训练,同时最大程度地减少 GPU 内存使用。

这个实践示例包含以下内容

  • 微调 LLM

  • 使用 Ray Train 保存和恢复检查点

  • 配置 ZeRO 以实现内存和性能(阶段、混合精度、CPU 卸载)

  • 启动分布式训练作业

Anyscale 特定配置

注意: 此模板针对 Anyscale 平台进行了优化。在 Anyscale 上,大部分配置都是自动化的。在开源 Ray 上运行时,请手动完成以下步骤

  • 配置 Ray 集群:多节点设置和资源分配。
  • 管理依赖项:在每个节点上安装先决条件。
  • 设置存储:提供共享或分布式检查点存储。

安装依赖项(如果需要)#

仅当您的环境仍需要安装这些软件包时,才运行下面的单元格。

%%bash
pip install torch torchvision
pip install transformers datasets==3.6.0 trl==0.23.1
pip install deepspeed ray[train]

配置常量#

本笔记本使用简单的常量而不是 argparse 来简化执行。请根据需要进行调整。

# ---- Training constants (edit these) ----
MODEL_NAME = "gpt2"
DATASET_NAME = "ag_news"
BATCH_SIZE = 1
NUM_EPOCHS = 1
SEQ_LENGTH = 512
LEARNING_RATE = 1e-6
ZERO_STAGE = 3
TUTORIAL_STEPS = 30

# Ray scaling settings
NUM_WORKERS = 2
USE_GPU = True

# Storage
STORAGE_PATH = "/mnt/cluster_storage/"
EXPERIMENT_PREFIX = "deepspeed_sample"

1. 定义训练函数#

首先,定义每个 worker 要执行的训练循环函数。请注意,Ray Train 会为每个 worker 分配一个唯一的 GPU。Ray Train 在每个 worker 上运行此训练函数,以协调整个训练过程。训练函数概述了大多数深度学习工作流的通用高级结构,展示了设置、数据摄取、优化和报告阶段如何在每个 worker 上结合在一起。

训练函数执行以下操作

  1. 使用 DeepSpeed 初始化模型和优化器(setup_model_and_optimizer)。

  2. 如果存在检查点,则从检查点恢复训练(load_checkpoint)。

  3. 设置数据加载器(setup_dataloader)。

  4. 访问 Ray Train 分配给此 worker 的设备。

  5. 遍历指定的 epoch 数。

  6. 对于多 GPU 训练,请确保每个 worker 在每个 epoch 中看到唯一的数据分片。

  7. 对于每个批次

    • 将输入移至设备。

    • 运行前向传播以计算损失。

    • 记录损失。

  8. 使用 DeepSpeed 执行反向传播和优化器步骤。

  9. 聚合平均损失并报告指标,在每个 epoch 结束时保存检查点。(report_metrics_and_save_checkpoint

后续步骤定义了上述辅助函数(setup_model_and_optimizerload_checkpointsetup_dataloaderreport_metrics_and_save_checkpoint)。

from typing import Dict, Any

import os
os.environ["RAY_TRAIN_V2_ENABLED"] = "1"  # Ensure Ray Train v2 APIs
import ray

def train_loop(config: Dict[str, Any]) -> None:
    # (1) Initialize model and optimizer with DeepSpeed
    ds_engine = setup_model_and_optimizer(config["model_name"], config["learning_rate"], config["ds_config"])

    # (2) Load checkpoint if it exists
    ckpt = ray.train.get_checkpoint()
    start_epoch = 0
    if ckpt:
        start_epoch = load_checkpoint(ds_engine, ckpt)

    # (3) Set up dataloader
    train_loader = setup_dataloader(config["model_name"], config["dataset_name"], config["seq_length"], config["batch_size"])
    steps_per_epoch = len(train_loader)

    # (4) Access the device for this worker
    device = ray.train.torch.get_device()

    # Set model to training mode
    ds_engine.train()

    for epoch in range(start_epoch, config["epochs"]):
        # (6) Ensure unique shard per worker when using multiple GPUs
        if ray.train.get_context().get_world_size() > 1 and hasattr(train_loader, "sampler"):
            sampler = getattr(train_loader, "sampler", None)
            if sampler and hasattr(sampler, "set_epoch"):
                sampler.set_epoch(epoch)

        running_loss = 0.0
        num_batches = 0

        # (7) Iterate over batches
        for step, batch in enumerate(train_loader):
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)

            # Forward pass
            outputs = ds_engine(
                input_ids=input_ids,
                attention_mask=attention_mask,
                labels=input_ids,
                use_cache=False
            )
            loss = outputs.loss
            print(f"Epoch: {epoch} Step: {step + 1}/{steps_per_epoch} Loss: {loss.item()}")

            # Backward pass and optimizer step
            ds_engine.backward(loss)
            ds_engine.step()

            running_loss += loss.item()
            num_batches += 1

            # Stop early in the tutorial so runs finish quickly
            if step + 1 >= config["tutorial_steps"]:
                print(f"Stopping early at {config['tutorial_steps']} steps for the tutorial")
                break

        # (8) Report metrics and save checkpoint
        report_metrics_and_save_checkpoint(ds_engine, {"loss": running_loss / num_batches, "epoch": epoch})

Ray Train 在每个 worker 上运行 train_loop,这自然支持数据并行。在此设置中,每个 worker 处理唯一的数据分片,在本地计算梯度,并参与同步以保持模型参数一致。在此基础上,DeepSpeed 将模型和优化器状态分布在 GPU 之间,以减少内存使用和通信开销。

2. 设置数据加载器#

下面的代码演示了如何准备文本数据,以便每个 worker 在训练期间高效地提供批次。

  1. 从 Hugging Face Hub 下载分词器(AutoTokenizer)。

  2. 使用 Hugging Face 的 load_dataset 加载 ag_news 数据集。

  3. 通过调用 map 应用带填充和截断的分词。

  4. 将数据集转换为 PyTorch DataLoader,该加载器负责批处理和洗牌。

  5. 最后,调用 ray.train.torch.prepare_data_loader 使数据加载器准备好进行分布式处理。

当您使用数据并行时,每个 GPU worker 都将在数据集的唯一分片上进行训练,同时拥有自己的模型副本;梯度在每个步骤后进行同步。Ray Train 的 prepare_data_loader 包装了 PyTorch 的 DataLoader,并确保 worker 看到不重叠的数据,平衡分片,并正确处理 epoch 边界。

import ray.train
import ray.train.torch
from torch.utils.data import DataLoader
from transformers import AutoTokenizer
from datasets import load_dataset, DownloadConfig

def setup_dataloader(model_name: str, dataset_name: str, seq_length: int, batch_size: int) -> DataLoader:
    # (1) Get tokenizer
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    
    # Set pad token if not already set
    if tokenizer.pad_token is None:
        if tokenizer.eos_token is not None:
            tokenizer.pad_token = tokenizer.eos_token
        else:
            # Fallback for models without eos_token
            tokenizer.pad_token = tokenizer.unk_token

    # (2) Load dataset
    # This example uses only 1% of the dataset for quick testing. Adjust as needed.
    dataset = load_dataset(dataset_name, split="train[:1%]", download_config=DownloadConfig(disable_tqdm=True))

    # (3) Tokenize
    def tokenize_function(examples):
        return tokenizer(examples['text'], padding='max_length', max_length=seq_length, truncation=True)
    tokenized_dataset = dataset.map(tokenize_function, batched=True, num_proc=1, keep_in_memory=True)
    tokenized_dataset.set_format(type='torch', columns=['input_ids', 'attention_mask'])

    # (4) Create DataLoader
    data_loader = DataLoader(tokenized_dataset, batch_size=batch_size, shuffle=True)

    # (5) Use prepare_data_loader for distributed training
    return ray.train.torch.prepare_data_loader(data_loader)

以下代码演示了如何使用分词器对样本字符串进行编码。

  • AutoTokenizer.from_pretrained 为您的模型下载并配置分词器。

  • 您可以对任何文本字符串进行编码,并检查生成的 token ID 和 attention mask。

# Example usage of get_tokenizer
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
sample_text = "Ray Train and DeepSpeed make distributed training easy!"
encoded = tokenizer(sample_text)
print(encoded)

3. 初始化模型和优化器#

在准备和分发数据集后,下一步是设置模型和优化器进行训练。此函数执行以下操作

  1. 从 Hugging Face Hub 加载预训练模型(AutoModelForCausalLM)。

  2. 定义优化器(AdamW)。

  3. 使用 ZeRO 选项初始化 DeepSpeed 并返回 DeepSpeedEngine

DeepSpeed 的 initialize 始终会在参与训练的所有 worker 的 GPU 内存之间划分优化器状态(ZeRO Stage 1)。根据选择的阶段,它还可以划分梯度(Stage 2)和模型参数/权重(Stage 3)。这种分阶段的方法平衡了内存节省和通信开销,本教程将在后续步骤中更详细地介绍这些阶段

from typing import Dict, Any
import torch
from transformers import AutoModelForCausalLM
import deepspeed

def setup_model_and_optimizer(model_name: str, learning_rate: float, ds_config: Dict[str, Any]) -> deepspeed.runtime.engine.DeepSpeedEngine:
    # (1) Load pretrained model
    model = AutoModelForCausalLM.from_pretrained(model_name)

    # (2) Define optimizer
    optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)

    # (3) Initialize with DeepSpeed (distributed and memory optimizations)
    ds_engine, _, _, _ = deepspeed.initialize(model=model, optimizer=optimizer, config=ds_config)
    return ds_engine

4. 检查点保存和加载#

检查点对于容错和在中断后恢复训练至关重要。本节演示了如何在分布式 Ray Train 和 DeepSpeed 设置中保存和恢复模型和优化器状态。它展示了每个 worker 如何保存自己的检查点分片,Ray 如何将它们捆绑成统一的检查点,以及这如何实现从保存状态无缝恢复或进一步微调。

保存检查点#

首先定义 Ray Train 在训练期间应如何保存检查点。下面的代码演示了如何创建临时目录,存储模型状态,并将检查点信息和指标报告回 Ray Train 以进行跟踪和协调。请注意,DeepSpeed 以分片格式保存模型和优化器状态,其中每个 worker 只存储其分片。

  1. 创建一个临时目录来存储检查点。

  2. 使用 DeepSpeed 的 save_checkpoint 保存分片模型和优化器状态。

  3. 使用 ray.train.report 将指标和检查点位置报告给 Ray Train。

import tempfile
import ray.train
from ray.train import Checkpoint

def report_metrics_and_save_checkpoint(
    ds_engine: deepspeed.runtime.engine.DeepSpeedEngine,
    metrics: Dict[str, Any]
) -> None:
    """Save worker checkpoints and report metrics to Ray Train.
    Each rank writes its shard to a temp directory so Ray Train bundles all of them.
    """
    ctx = ray.train.get_context()
    epoch_value = metrics["epoch"]

    with tempfile.TemporaryDirectory() as tmp_dir:
        checkpoint_dir = os.path.join(tmp_dir, "checkpoint")
        os.makedirs(checkpoint_dir, exist_ok=True)

        ds_engine.save_checkpoint(checkpoint_dir)

        epoch_file = os.path.join(checkpoint_dir, "epoch.txt")
        with open(epoch_file, "w", encoding="utf-8") as f:
            f.write(str(epoch_value))

        checkpoint = Checkpoint.from_directory(tmp_dir)
        ray.train.report(metrics, checkpoint=checkpoint)

        if ctx.get_world_rank() == 0:
            experiment_name = ctx.get_experiment_name()
            print(
                f"Checkpoint saved successfully for experiment {experiment_name} at {checkpoint_dir}. Metrics: {metrics}"
            )

加载检查点#

在保存检查点之后,下一步是能够从保存的状态恢复训练或评估。这确保了进度不会因中断而丢失,并允许长时间运行的作业在会话之间无缝继续。重新启动时,Ray Train 会向每个 worker 提供最新的检查点,以便 DeepSpeed 可以从中断处重建模型、优化器和训练进度。

使用 load_checkpoint 将先前保存的检查点恢复到 DeepSpeed 引擎中。

def load_checkpoint(ds_engine: deepspeed.runtime.engine.DeepSpeedEngine, ckpt: ray.train.Checkpoint) -> int:
    """Restore DeepSpeed state and determine next epoch."""
    next_epoch = 0
    try:
        with ckpt.as_directory() as checkpoint_dir:
            print(f"Loading checkpoint from {checkpoint_dir}")
            epoch_dir = os.path.join(checkpoint_dir, "checkpoint")
            if not os.path.isdir(epoch_dir):
                epoch_dir = checkpoint_dir

            ds_engine.load_checkpoint(epoch_dir)

            epoch_file = os.path.join(epoch_dir, "epoch.txt")
            if os.path.isfile(epoch_file):
                with open(epoch_file, "r", encoding="utf-8") as f:
                    last_epoch = int(f.read().strip())
                next_epoch = last_epoch + 1

    except Exception as e:
        raise RuntimeError(f"Checkpoint loading failed: {e}") from e
    return next_epoch

5. 配置 DeepSpeed#

在启动分布式训练之前,您需要定义一个 DeepSpeed 配置字典(ds_config),该字典控制数据类型设置、批次大小、包括 ZeRO(模型状态分区策略)在内的优化等。此配置决定了 DeepSpeed 如何管理 GPU 之间的内存、通信和性能。

下面的示例显示了一个最小的设置,它启用了 bfloat16 精度、梯度裁剪和 ZeRO 优化。您可以根据您的模型大小、硬件和性能目标进一步自定义此配置。有关更多详细信息,请参阅 高级配置

# DeepSpeed configuration
ds_config = {
    "train_micro_batch_size_per_gpu": BATCH_SIZE,
    "bf16": {"enabled": True},
    "grad_accum_dtype": "bf16",
    "zero_optimization": {
        "stage": ZERO_STAGE,
        "overlap_comm": True,
        "contiguous_gradients": True,
    },
    "gradient_clipping": 1.0,
}

6. 启动分布式训练#

最后一步是配置参数并启动分布式训练作业。Ray Train 的 TorchTrainer 会自动启动多个 worker—每个 GPU 一个—并在每个实例上执行 train_loop缩放配置决定启动多少 worker 以及它们使用的资源,而运行配置则管理存储和实验跟踪。

此函数执行以下操作

  1. 解析用于训练和模型设置的命令行参数。

  2. 定义 Ray Train ScalingConfig—例如,worker 的数量和 GPU 使用情况。

  3. 使用超参数和模型详细信息准备训练循环配置。

  4. 设置 Ray Train RunConfig 来管理存储和实验元数据。此示例设置了一个随机的实验名称,但您可以指定先前实验的名称来加载检查点。

  5. 创建一个 TorchTrainer,它会在多个 GPU worker 上启动训练函数。

  6. 使用 trainer.fit() 开始训练并打印结果。

import uuid
from ray.train.torch import TorchTrainer
from ray.train import ScalingConfig, RunConfig

# Ray Train scaling configuration
scaling_config = ScalingConfig(num_workers=NUM_WORKERS, use_gpu=USE_GPU)

# Training loop configuration
train_loop_config = {
    "epochs": NUM_EPOCHS,
    "learning_rate": LEARNING_RATE,
    "batch_size": BATCH_SIZE,
    "ds_config": ds_config,
    "model_name": MODEL_NAME,
    "dataset_name": DATASET_NAME,
    "seq_length": SEQ_LENGTH,
    "tutorial_steps": TUTORIAL_STEPS,
}

# Ray Train run configuration
run_config = RunConfig(
    storage_path=STORAGE_PATH,
    # Set the name of the previous experiment when resuming from a checkpoint
    name=f"{EXPERIMENT_PREFIX}_{uuid.uuid4().hex[:8]}",
)

# Create and launch the trainer
trainer = TorchTrainer(
    train_loop_per_worker=train_loop,
    scaling_config=scaling_config,
    train_loop_config=train_loop_config,
    run_config=run_config,
)

# To actually run training, execute the following:
result = trainer.fit()
print(f"Training finished. Result: {result}")

作为独立脚本运行#

虽然本教程设计为在 Jupyter 笔记本中交互运行,但您也可以将相同的训练工作流作为独立的 Python 脚本启动。这对于运行更长的实验、自动化作业或在集群上部署训练很有用。

完整的 代码 也可用。要从命令行开始训练,请运行

python train.py

高级配置#

DeepSpeed 还有许多其他配置选项可用于调整性能和内存使用。本节介绍一些最常用的选项。有关更多详细信息,请参阅 DeepSpeed 文档

DeepSpeed ZeRO 阶段#

  • 阶段 1:划分优化器状态(使用 ZeRO 时始终开启)。

  • 阶段 2:另外划分梯度。

  • 阶段 3:另外划分模型参数或权重。

阶段越高,内存节省越多,但训练的通信开销和复杂性也可能越大。您可以通过 ds_config["zero_optimization"]["stage"] 选择阶段。有关更多详细信息,请参阅 DeepSpeed 文档。

ds_config = {
    "zero_optimization": {
        "stage": 2,  # or 1 or 3
    },
}

混合精度#

启用 BF16 或 FP16

ds_config = {
    "bf16": {"enabled": True},  # or "fp16": {"enabled": True}
}

CPU 卸载#

通过卸载到 CPU 来减少 GPU 内存压力,但会增加 PCIe 流量

ds_config = {
    "offload_param": {"device": "cpu", "pin_memory": True},
    # or
    "offload_optimizer": {"device": "cpu", "pin_memory": True},
}

总结#

在本教程中,您执行了以下操作

  • 使用 Ray Train 和 DeepSpeed ZeRO 微调了 LLM

  • 使用 Ray Train 的 prepare_data_loader 设置了分布式数据加载

  • 使用 Ray Train 的存储配置保存和管理了检查点

  • 使用 TorchTrainer 和缩放配置配置并启动了多 GPU 训练

  • 探索了高级 DeepSpeed 配置(ZeRO 阶段、混合精度和 CPU 卸载)