使用 Ray Train 和 DeepSpeed 微调 LLM#
完成时间: 20 分钟
本笔记本结合了 Ray Train 和 DeepSpeed,可在 GPU 和节点之间高效扩展 PyTorch 训练,同时最大程度地减少 GPU 内存使用。
这个实践示例包含以下内容
微调 LLM
使用 Ray Train 保存和恢复检查点
配置 ZeRO 以实现内存和性能(阶段、混合精度、CPU 卸载)
启动分布式训练作业
Anyscale 特定配置
注意: 此模板针对 Anyscale 平台进行了优化。在 Anyscale 上,大部分配置都是自动化的。在开源 Ray 上运行时,请手动完成以下步骤
- 配置 Ray 集群:多节点设置和资源分配。
- 管理依赖项:在每个节点上安装先决条件。
- 设置存储:提供共享或分布式检查点存储。
安装依赖项(如果需要)#
仅当您的环境仍需要安装这些软件包时,才运行下面的单元格。
%%bash
pip install torch torchvision
pip install transformers datasets==3.6.0 trl==0.23.1
pip install deepspeed ray[train]
配置常量#
本笔记本使用简单的常量而不是 argparse 来简化执行。请根据需要进行调整。
# ---- Training constants (edit these) ----
MODEL_NAME = "gpt2"
DATASET_NAME = "ag_news"
BATCH_SIZE = 1
NUM_EPOCHS = 1
SEQ_LENGTH = 512
LEARNING_RATE = 1e-6
ZERO_STAGE = 3
TUTORIAL_STEPS = 30
# Ray scaling settings
NUM_WORKERS = 2
USE_GPU = True
# Storage
STORAGE_PATH = "/mnt/cluster_storage/"
EXPERIMENT_PREFIX = "deepspeed_sample"
1. 定义训练函数#
首先,定义每个 worker 要执行的训练循环函数。请注意,Ray Train 会为每个 worker 分配一个唯一的 GPU。Ray Train 在每个 worker 上运行此训练函数,以协调整个训练过程。训练函数概述了大多数深度学习工作流的通用高级结构,展示了设置、数据摄取、优化和报告阶段如何在每个 worker 上结合在一起。
训练函数执行以下操作
使用 DeepSpeed 初始化模型和优化器(
setup_model_and_optimizer)。如果存在检查点,则从检查点恢复训练(
load_checkpoint)。设置数据加载器(
setup_dataloader)。访问 Ray Train 分配给此 worker 的设备。
遍历指定的 epoch 数。
对于多 GPU 训练,请确保每个 worker 在每个 epoch 中看到唯一的数据分片。
对于每个批次
将输入移至设备。
运行前向传播以计算损失。
记录损失。
使用 DeepSpeed 执行反向传播和优化器步骤。
聚合平均损失并报告指标,在每个 epoch 结束时保存检查点。(
report_metrics_and_save_checkpoint)
后续步骤定义了上述辅助函数(setup_model_and_optimizer、load_checkpoint、setup_dataloader、report_metrics_and_save_checkpoint)。
from typing import Dict, Any
import os
os.environ["RAY_TRAIN_V2_ENABLED"] = "1" # Ensure Ray Train v2 APIs
import ray
def train_loop(config: Dict[str, Any]) -> None:
# (1) Initialize model and optimizer with DeepSpeed
ds_engine = setup_model_and_optimizer(config["model_name"], config["learning_rate"], config["ds_config"])
# (2) Load checkpoint if it exists
ckpt = ray.train.get_checkpoint()
start_epoch = 0
if ckpt:
start_epoch = load_checkpoint(ds_engine, ckpt)
# (3) Set up dataloader
train_loader = setup_dataloader(config["model_name"], config["dataset_name"], config["seq_length"], config["batch_size"])
steps_per_epoch = len(train_loader)
# (4) Access the device for this worker
device = ray.train.torch.get_device()
# Set model to training mode
ds_engine.train()
for epoch in range(start_epoch, config["epochs"]):
# (6) Ensure unique shard per worker when using multiple GPUs
if ray.train.get_context().get_world_size() > 1 and hasattr(train_loader, "sampler"):
sampler = getattr(train_loader, "sampler", None)
if sampler and hasattr(sampler, "set_epoch"):
sampler.set_epoch(epoch)
running_loss = 0.0
num_batches = 0
# (7) Iterate over batches
for step, batch in enumerate(train_loader):
input_ids = batch['input_ids'].to(device)
attention_mask = batch['attention_mask'].to(device)
# Forward pass
outputs = ds_engine(
input_ids=input_ids,
attention_mask=attention_mask,
labels=input_ids,
use_cache=False
)
loss = outputs.loss
print(f"Epoch: {epoch} Step: {step + 1}/{steps_per_epoch} Loss: {loss.item()}")
# Backward pass and optimizer step
ds_engine.backward(loss)
ds_engine.step()
running_loss += loss.item()
num_batches += 1
# Stop early in the tutorial so runs finish quickly
if step + 1 >= config["tutorial_steps"]:
print(f"Stopping early at {config['tutorial_steps']} steps for the tutorial")
break
# (8) Report metrics and save checkpoint
report_metrics_and_save_checkpoint(ds_engine, {"loss": running_loss / num_batches, "epoch": epoch})
Ray Train 在每个 worker 上运行 train_loop,这自然支持数据并行。在此设置中,每个 worker 处理唯一的数据分片,在本地计算梯度,并参与同步以保持模型参数一致。在此基础上,DeepSpeed 将模型和优化器状态分布在 GPU 之间,以减少内存使用和通信开销。
2. 设置数据加载器#
下面的代码演示了如何准备文本数据,以便每个 worker 在训练期间高效地提供批次。
从 Hugging Face Hub 下载分词器(
AutoTokenizer)。使用 Hugging Face 的
load_dataset加载ag_news数据集。通过调用
map应用带填充和截断的分词。将数据集转换为 PyTorch
DataLoader,该加载器负责批处理和洗牌。最后,调用
ray.train.torch.prepare_data_loader使数据加载器准备好进行分布式处理。
当您使用数据并行时,每个 GPU worker 都将在数据集的唯一分片上进行训练,同时拥有自己的模型副本;梯度在每个步骤后进行同步。Ray Train 的 prepare_data_loader 包装了 PyTorch 的 DataLoader,并确保 worker 看到不重叠的数据,平衡分片,并正确处理 epoch 边界。
import ray.train
import ray.train.torch
from torch.utils.data import DataLoader
from transformers import AutoTokenizer
from datasets import load_dataset, DownloadConfig
def setup_dataloader(model_name: str, dataset_name: str, seq_length: int, batch_size: int) -> DataLoader:
# (1) Get tokenizer
tokenizer = AutoTokenizer.from_pretrained(model_name)
# Set pad token if not already set
if tokenizer.pad_token is None:
if tokenizer.eos_token is not None:
tokenizer.pad_token = tokenizer.eos_token
else:
# Fallback for models without eos_token
tokenizer.pad_token = tokenizer.unk_token
# (2) Load dataset
# This example uses only 1% of the dataset for quick testing. Adjust as needed.
dataset = load_dataset(dataset_name, split="train[:1%]", download_config=DownloadConfig(disable_tqdm=True))
# (3) Tokenize
def tokenize_function(examples):
return tokenizer(examples['text'], padding='max_length', max_length=seq_length, truncation=True)
tokenized_dataset = dataset.map(tokenize_function, batched=True, num_proc=1, keep_in_memory=True)
tokenized_dataset.set_format(type='torch', columns=['input_ids', 'attention_mask'])
# (4) Create DataLoader
data_loader = DataLoader(tokenized_dataset, batch_size=batch_size, shuffle=True)
# (5) Use prepare_data_loader for distributed training
return ray.train.torch.prepare_data_loader(data_loader)
以下代码演示了如何使用分词器对样本字符串进行编码。
AutoTokenizer.from_pretrained为您的模型下载并配置分词器。您可以对任何文本字符串进行编码,并检查生成的 token ID 和 attention mask。
# Example usage of get_tokenizer
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
sample_text = "Ray Train and DeepSpeed make distributed training easy!"
encoded = tokenizer(sample_text)
print(encoded)
3. 初始化模型和优化器#
在准备和分发数据集后,下一步是设置模型和优化器进行训练。此函数执行以下操作
从 Hugging Face Hub 加载预训练模型(
AutoModelForCausalLM)。定义优化器(
AdamW)。使用 ZeRO 选项初始化 DeepSpeed 并返回
DeepSpeedEngine。
DeepSpeed 的 initialize 始终会在参与训练的所有 worker 的 GPU 内存之间划分优化器状态(ZeRO Stage 1)。根据选择的阶段,它还可以划分梯度(Stage 2)和模型参数/权重(Stage 3)。这种分阶段的方法平衡了内存节省和通信开销,本教程将在后续步骤中更详细地介绍这些阶段 。
from typing import Dict, Any
import torch
from transformers import AutoModelForCausalLM
import deepspeed
def setup_model_and_optimizer(model_name: str, learning_rate: float, ds_config: Dict[str, Any]) -> deepspeed.runtime.engine.DeepSpeedEngine:
# (1) Load pretrained model
model = AutoModelForCausalLM.from_pretrained(model_name)
# (2) Define optimizer
optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)
# (3) Initialize with DeepSpeed (distributed and memory optimizations)
ds_engine, _, _, _ = deepspeed.initialize(model=model, optimizer=optimizer, config=ds_config)
return ds_engine
4. 检查点保存和加载#
检查点对于容错和在中断后恢复训练至关重要。本节演示了如何在分布式 Ray Train 和 DeepSpeed 设置中保存和恢复模型和优化器状态。它展示了每个 worker 如何保存自己的检查点分片,Ray 如何将它们捆绑成统一的检查点,以及这如何实现从保存状态无缝恢复或进一步微调。
保存检查点#
首先定义 Ray Train 在训练期间应如何保存检查点。下面的代码演示了如何创建临时目录,存储模型状态,并将检查点信息和指标报告回 Ray Train 以进行跟踪和协调。请注意,DeepSpeed 以分片格式保存模型和优化器状态,其中每个 worker 只存储其分片。
创建一个临时目录来存储检查点。
使用 DeepSpeed 的
save_checkpoint保存分片模型和优化器状态。使用
ray.train.report将指标和检查点位置报告给 Ray Train。
import tempfile
import ray.train
from ray.train import Checkpoint
def report_metrics_and_save_checkpoint(
ds_engine: deepspeed.runtime.engine.DeepSpeedEngine,
metrics: Dict[str, Any]
) -> None:
"""Save worker checkpoints and report metrics to Ray Train.
Each rank writes its shard to a temp directory so Ray Train bundles all of them.
"""
ctx = ray.train.get_context()
epoch_value = metrics["epoch"]
with tempfile.TemporaryDirectory() as tmp_dir:
checkpoint_dir = os.path.join(tmp_dir, "checkpoint")
os.makedirs(checkpoint_dir, exist_ok=True)
ds_engine.save_checkpoint(checkpoint_dir)
epoch_file = os.path.join(checkpoint_dir, "epoch.txt")
with open(epoch_file, "w", encoding="utf-8") as f:
f.write(str(epoch_value))
checkpoint = Checkpoint.from_directory(tmp_dir)
ray.train.report(metrics, checkpoint=checkpoint)
if ctx.get_world_rank() == 0:
experiment_name = ctx.get_experiment_name()
print(
f"Checkpoint saved successfully for experiment {experiment_name} at {checkpoint_dir}. Metrics: {metrics}"
)
加载检查点#
在保存检查点之后,下一步是能够从保存的状态恢复训练或评估。这确保了进度不会因中断而丢失,并允许长时间运行的作业在会话之间无缝继续。重新启动时,Ray Train 会向每个 worker 提供最新的检查点,以便 DeepSpeed 可以从中断处重建模型、优化器和训练进度。
使用 load_checkpoint 将先前保存的检查点恢复到 DeepSpeed 引擎中。
def load_checkpoint(ds_engine: deepspeed.runtime.engine.DeepSpeedEngine, ckpt: ray.train.Checkpoint) -> int:
"""Restore DeepSpeed state and determine next epoch."""
next_epoch = 0
try:
with ckpt.as_directory() as checkpoint_dir:
print(f"Loading checkpoint from {checkpoint_dir}")
epoch_dir = os.path.join(checkpoint_dir, "checkpoint")
if not os.path.isdir(epoch_dir):
epoch_dir = checkpoint_dir
ds_engine.load_checkpoint(epoch_dir)
epoch_file = os.path.join(epoch_dir, "epoch.txt")
if os.path.isfile(epoch_file):
with open(epoch_file, "r", encoding="utf-8") as f:
last_epoch = int(f.read().strip())
next_epoch = last_epoch + 1
except Exception as e:
raise RuntimeError(f"Checkpoint loading failed: {e}") from e
return next_epoch
5. 配置 DeepSpeed#
在启动分布式训练之前,您需要定义一个 DeepSpeed 配置字典(ds_config),该字典控制数据类型设置、批次大小、包括 ZeRO(模型状态分区策略)在内的优化等。此配置决定了 DeepSpeed 如何管理 GPU 之间的内存、通信和性能。
下面的示例显示了一个最小的设置,它启用了 bfloat16 精度、梯度裁剪和 ZeRO 优化。您可以根据您的模型大小、硬件和性能目标进一步自定义此配置。有关更多详细信息,请参阅 高级配置。
# DeepSpeed configuration
ds_config = {
"train_micro_batch_size_per_gpu": BATCH_SIZE,
"bf16": {"enabled": True},
"grad_accum_dtype": "bf16",
"zero_optimization": {
"stage": ZERO_STAGE,
"overlap_comm": True,
"contiguous_gradients": True,
},
"gradient_clipping": 1.0,
}
6. 启动分布式训练#
最后一步是配置参数并启动分布式训练作业。Ray Train 的 TorchTrainer 会自动启动多个 worker—每个 GPU 一个—并在每个实例上执行 train_loop。缩放配置决定启动多少 worker 以及它们使用的资源,而运行配置则管理存储和实验跟踪。
此函数执行以下操作
解析用于训练和模型设置的命令行参数。
定义 Ray Train
ScalingConfig—例如,worker 的数量和 GPU 使用情况。使用超参数和模型详细信息准备训练循环配置。
设置 Ray Train
RunConfig来管理存储和实验元数据。此示例设置了一个随机的实验名称,但您可以指定先前实验的名称来加载检查点。创建一个
TorchTrainer,它会在多个 GPU worker 上启动训练函数。使用
trainer.fit()开始训练并打印结果。
import uuid
from ray.train.torch import TorchTrainer
from ray.train import ScalingConfig, RunConfig
# Ray Train scaling configuration
scaling_config = ScalingConfig(num_workers=NUM_WORKERS, use_gpu=USE_GPU)
# Training loop configuration
train_loop_config = {
"epochs": NUM_EPOCHS,
"learning_rate": LEARNING_RATE,
"batch_size": BATCH_SIZE,
"ds_config": ds_config,
"model_name": MODEL_NAME,
"dataset_name": DATASET_NAME,
"seq_length": SEQ_LENGTH,
"tutorial_steps": TUTORIAL_STEPS,
}
# Ray Train run configuration
run_config = RunConfig(
storage_path=STORAGE_PATH,
# Set the name of the previous experiment when resuming from a checkpoint
name=f"{EXPERIMENT_PREFIX}_{uuid.uuid4().hex[:8]}",
)
# Create and launch the trainer
trainer = TorchTrainer(
train_loop_per_worker=train_loop,
scaling_config=scaling_config,
train_loop_config=train_loop_config,
run_config=run_config,
)
# To actually run training, execute the following:
result = trainer.fit()
print(f"Training finished. Result: {result}")
作为独立脚本运行#
虽然本教程设计为在 Jupyter 笔记本中交互运行,但您也可以将相同的训练工作流作为独立的 Python 脚本启动。这对于运行更长的实验、自动化作业或在集群上部署训练很有用。
完整的 代码 也可用。要从命令行开始训练,请运行
python train.py
高级配置#
DeepSpeed 还有许多其他配置选项可用于调整性能和内存使用。本节介绍一些最常用的选项。有关更多详细信息,请参阅 DeepSpeed 文档。
DeepSpeed ZeRO 阶段#
阶段 1:划分优化器状态(使用 ZeRO 时始终开启)。
阶段 2:另外划分梯度。
阶段 3:另外划分模型参数或权重。
阶段越高,内存节省越多,但训练的通信开销和复杂性也可能越大。您可以通过 ds_config["zero_optimization"]["stage"] 选择阶段。有关更多详细信息,请参阅 DeepSpeed 文档。
ds_config = {
"zero_optimization": {
"stage": 2, # or 1 or 3
},
}
混合精度#
启用 BF16 或 FP16
ds_config = {
"bf16": {"enabled": True}, # or "fp16": {"enabled": True}
}
CPU 卸载#
通过卸载到 CPU 来减少 GPU 内存压力,但会增加 PCIe 流量
ds_config = {
"offload_param": {"device": "cpu", "pin_memory": True},
# or
"offload_optimizer": {"device": "cpu", "pin_memory": True},
}
总结#
在本教程中,您执行了以下操作
使用 Ray Train 和 DeepSpeed ZeRO 微调了 LLM
使用 Ray Train 的
prepare_data_loader设置了分布式数据加载使用 Ray Train 的存储配置保存和管理了检查点
使用
TorchTrainer和缩放配置配置并启动了多 GPU 训练探索了高级 DeepSpeed 配置(ZeRO 阶段、混合精度和 CPU 卸载)