使用 Hugging Face Transformers 进行分布式训练入门#
本教程演示如何将现有的 Hugging Face Transformers 脚本转换为使用 Ray Train 进行分布式训练。
在本指南中,您将学习如何
配置一个 训练函数,该函数可以正确报告指标并保存检查点。
为分布式训练作业配置 扩展 和资源需求(CPU 或 GPU)。
使用 TorchTrainer 启动分布式训练作业。
要求#
在开始之前安装必要的软件包
pip install "ray[train]" torch "transformers[torch]" datasets evaluate numpy scikit-learn
快速入门#
这是最终代码结构的快速概述
from ray.train.torch import TorchTrainer
from ray.train import ScalingConfig


def train_func():
    # Your Transformers training code here
    ...


# Run on 2 distributed workers, each with 1 GPU.
scaling_config = ScalingConfig(num_workers=2, use_gpu=True)
trainer = TorchTrainer(train_func, scaling_config=scaling_config)
result = trainer.fit()
关键组件是
train_func:在每个分布式训练工作进程上运行的 Python 代码。
ScalingConfig:定义分布式训练工作进程的数量和 GPU 使用情况。
TorchTrainer:启动和管理分布式训练作业。
代码比较:Hugging Face Transformers 与 Ray Train 集成#
比较标准的 Hugging Face Transformers 脚本与其 Ray Train 等效脚本
import os

import numpy as np
import evaluate
from datasets import load_dataset
from transformers import (
    Trainer,
    TrainingArguments,
    AutoTokenizer,
    AutoModelForSequenceClassification,
)

import ray.train.huggingface.transformers
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer


# [1] Encapsulate data preprocessing, training, and evaluation
# logic in a training function
# ============================================================
def train_func():
    # Datasets: load inside the training function so each worker loads
    # its own copy (avoids serialization errors when shipping to workers).
    dataset = load_dataset("yelp_review_full")
    tokenizer = AutoTokenizer.from_pretrained("bert-base-cased")

    def tokenize_function(examples):
        return tokenizer(examples["text"], padding="max_length", truncation=True)

    # Use a small 100-row subset of each split to keep the example fast.
    small_train_dataset = (
        dataset["train"].select(range(100)).map(tokenize_function, batched=True)
    )
    small_eval_dataset = (
        dataset["test"].select(range(100)).map(tokenize_function, batched=True)
    )

    # Model: 5 labels matches the yelp_review_full star ratings.
    model = AutoModelForSequenceClassification.from_pretrained(
        "bert-base-cased", num_labels=5
    )

    # Evaluation Metrics
    metric = evaluate.load("accuracy")

    def compute_metrics(eval_pred):
        logits, labels = eval_pred
        predictions = np.argmax(logits, axis=-1)
        return metric.compute(predictions=predictions, references=labels)

    # Hugging Face Trainer
    training_args = TrainingArguments(
        output_dir="test_trainer",
        evaluation_strategy="epoch",
        save_strategy="epoch",
        report_to="none",
    )
    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=small_train_dataset,
        eval_dataset=small_eval_dataset,
        compute_metrics=compute_metrics,
    )

    # [2] Report Metrics and Checkpoints to Ray Train
    # ===============================================
    callback = ray.train.huggingface.transformers.RayTrainReportCallback()
    trainer.add_callback(callback)

    # [3] Prepare Transformers Trainer
    # ================================
    trainer = ray.train.huggingface.transformers.prepare_trainer(trainer)

    # Start Training
    trainer.train()


# [4] Define a Ray TorchTrainer to launch `train_func` on all workers
# ===================================================================
ray_trainer = TorchTrainer(
    train_func,
    scaling_config=ScalingConfig(num_workers=2, use_gpu=True),
    # [4a] For multi-node clusters, configure persistent storage that is
    # accessible across all worker nodes
    # run_config=ray.train.RunConfig(storage_path="s3://..."),
)
result: ray.train.Result = ray_trainer.fit()

# [5] Load the trained model from the latest checkpoint.
with result.checkpoint.as_directory() as checkpoint_dir:
    checkpoint_path = os.path.join(
        checkpoint_dir,
        ray.train.huggingface.transformers.RayTrainReportCallback.CHECKPOINT_NAME,
    )
    model = AutoModelForSequenceClassification.from_pretrained(checkpoint_path)
# Adapted from Hugging Face tutorial: https://huggingface.co/docs/transformers/training

import numpy as np
import evaluate
from datasets import load_dataset
from transformers import (
    Trainer,
    TrainingArguments,
    AutoTokenizer,
    AutoModelForSequenceClassification,
)

# Datasets
dataset = load_dataset("yelp_review_full")
tokenizer = AutoTokenizer.from_pretrained("bert-base-cased")


def tokenize_function(examples):
    return tokenizer(examples["text"], padding="max_length", truncation=True)


# Use a small 100-row subset of each split to keep the example fast.
small_train_dataset = dataset["train"].select(range(100)).map(tokenize_function, batched=True)
small_eval_dataset = dataset["test"].select(range(100)).map(tokenize_function, batched=True)

# Model: 5 labels matches the yelp_review_full star ratings.
model = AutoModelForSequenceClassification.from_pretrained(
    "bert-base-cased", num_labels=5
)

# Metrics
metric = evaluate.load("accuracy")


def compute_metrics(eval_pred):
    logits, labels = eval_pred
    predictions = np.argmax(logits, axis=-1)
    return metric.compute(predictions=predictions, references=labels)


# Hugging Face Trainer
training_args = TrainingArguments(
    output_dir="test_trainer", evaluation_strategy="epoch", report_to="none"
)
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=small_train_dataset,
    eval_dataset=small_eval_dataset,
    compute_metrics=compute_metrics,
)

# Start Training
trainer.train()
设置训练函数#
首先,更新您的训练代码以支持分布式训练。首先将您的代码封装在一个 训练函数 中
def train_func():
    # Your model training code here.
    ...
每个分布式训练工作节点都会执行此函数。
您还可以通过 Trainer 的 train_loop_config 参数,以字典形式为 train_func 指定输入参数。例如:
def train_func(config):
lr = config["lr"]
num_epochs = config["num_epochs"]
config = {"lr": 1e-4, "num_epochs": 10}
trainer = ray.train.torch.TorchTrainer(train_func, train_loop_config=config, ...)
警告
避免通过 train_loop_config 传递大型数据对象,以减少序列化和反序列化的开销。相反,推荐在 train_func 中直接初始化大型对象(例如数据集、模型)。
def load_dataset():
    # Return a large in-memory dataset
    ...


def load_model():
    # Return a large in-memory model instance
    ...


# Diff: initialize large objects inside `train_func` instead of passing
# them through `train_loop_config` (avoids serialization overhead).
-config = {"data": load_dataset(), "model": load_model()}


def train_func(config):
-    data = config["data"]
-    model = config["model"]
+    data = load_dataset()
+    model = load_model()
    ...


trainer = ray.train.torch.TorchTrainer(train_func, train_loop_config=config, ...)
Ray Train 在进入训练函数之前,会在每个工作进程上设置分布式进程组。将所有逻辑放入此函数中,包括
数据集构建和预处理
模型初始化
Transformers Trainer 定义
注意
在使用 Hugging Face Datasets 或 Evaluate 时,请务必在训练函数内部调用 datasets.load_dataset 和 evaluate.load。不要从训练函数外部传递已加载的数据集和指标,因为这可能导致将对象传输到工作进程时出现序列化错误。
报告检查点和指标#
要持久化检查点并监控训练进度,请将 ray.train.huggingface.transformers.RayTrainReportCallback 实用程序回调添加到您的 Trainer 中
import transformers
from ray.train.huggingface.transformers import RayTrainReportCallback


def train_func():
    ...
    trainer = transformers.Trainer(...)
    # Diff: report metrics and checkpoints to Ray Train.
+    trainer.add_callback(RayTrainReportCallback())
    ...
将指标和检查点报告给 Ray Train,可以实现与 Ray Tune 和 容错训练 的集成。 ray.train.huggingface.transformers.RayTrainReportCallback 提供了一个基本实现,您可以 自定义它 以满足您的需求。
准备一个 Transformers Trainer#
将您的 Transformers Trainer 传递给 prepare_trainer() 以验证配置并启用 Ray Data 集成
import transformers
import ray.train.huggingface.transformers


def train_func():
    ...
    trainer = transformers.Trainer(...)
    # Diff: validate the configuration and enable Ray Data integration.
+    trainer = ray.train.huggingface.transformers.prepare_trainer(trainer)
    trainer.train()
    ...
配置缩放和 GPU#
在训练函数之外,创建一个 ScalingConfig 对象来配置
num_workers - 分布式训练工作节点的数量。
use_gpu - 每个工作节点是否应使用 GPU(或 CPU)。
from ray.train import ScalingConfig

# 2 distributed workers, each assigned 1 GPU.
scaling_config = ScalingConfig(num_workers=2, use_gpu=True)
有关更多详细信息,请参阅 配置缩放和 GPU。
配置持久存储#
创建一个 RunConfig 对象来指定结果(包括检查点和工件)将要保存的路径。
from ray.train import RunConfig

# `storage_path` + `name` determine where checkpoints and artifacts are saved.

# Local path (/some/local/path/unique_run_name)
run_config = RunConfig(storage_path="/some/local/path", name="unique_run_name")

# Shared cloud storage URI (s3://bucket/unique_run_name)
run_config = RunConfig(storage_path="s3://bucket", name="unique_run_name")

# Shared NFS path (/mnt/nfs/unique_run_name)
run_config = RunConfig(storage_path="/mnt/nfs", name="unique_run_name")
警告
指定一个*共享存储位置*(如云存储或 NFS)对于单节点集群是*可选的*,但对于多节点集群是*必需的*。使用本地路径将在多节点集群的检查点过程中*引发错误*。
有关更多详细信息,请参阅 配置持久存储。
启动训练作业#
将所有这些结合起来,您现在可以使用 TorchTrainer 启动分布式训练作业。
from ray.train.torch import TorchTrainer

# Launch the distributed training job with the scaling/run configs above.
trainer = TorchTrainer(
    train_func, scaling_config=scaling_config, run_config=run_config
)
result = trainer.fit()
访问训练结果#
训练完成后,将返回一个 Result 对象,其中包含有关训练运行的信息,包括训练期间报告的指标和检查点。
# Inspect the `Result` object returned by `trainer.fit()`.
result.metrics # The metrics reported during training.
result.checkpoint # The latest checkpoint reported during training.
result.path # The path where logs are stored.
result.error # The exception that was raised, if training failed.
有关更多用法示例,请参阅 检查训练结果。
下一步#
现在您已将 Hugging Face Transformers 脚本转换为使用 Ray Train 进行分布式训练。
TransformersTrainer 迁移指南#
Ray 2.1 引入了 TransformersTrainer,并带有 trainer_init_per_worker 接口来定义 transformers.Trainer 并执行预定义的训练函数。
Ray 2.7 引入了统一的 TorchTrainer API,它提供了更好的透明度、灵活性和简单性。此 API 与标准的 Hugging Face Transformers 脚本更加吻合,让您可以更好地控制训练代码。
import transformers
from transformers import AutoConfig, AutoModelForCausalLM
from datasets import load_dataset

import ray
from ray.train.huggingface import TransformersTrainer
from ray.train import ScalingConfig
from huggingface_hub import HfFileSystem

# Load datasets using HfFileSystem
path = "hf://datasets/Salesforce/wikitext/wikitext-2-raw-v1/"
fs = HfFileSystem()

# List the parquet files for each split
all_files = [f["name"] for f in fs.ls(path)]
train_files = [f for f in all_files if "train" in f and f.endswith(".parquet")]
validation_files = [f for f in all_files if "validation" in f and f.endswith(".parquet")]

ray_train_ds = ray.data.read_parquet(train_files, filesystem=fs)
ray_eval_ds = ray.data.read_parquet(validation_files, filesystem=fs)


# Define the Trainer generation function (legacy `trainer_init_per_worker`
# interface: Ray passes in per-worker dataset shards).
def trainer_init_per_worker(train_dataset, eval_dataset, **config):
    MODEL_NAME = "gpt2"
    model_config = AutoConfig.from_pretrained(MODEL_NAME)
    model = AutoModelForCausalLM.from_config(model_config)

    args = transformers.TrainingArguments(
        output_dir=f"{MODEL_NAME}-wikitext2",
        evaluation_strategy="epoch",
        save_strategy="epoch",
        logging_strategy="epoch",
        learning_rate=2e-5,
        weight_decay=0.01,
        max_steps=100,
    )
    return transformers.Trainer(
        model=model,
        args=args,
        train_dataset=train_dataset,
        eval_dataset=eval_dataset,
    )


# Build a Ray TransformersTrainer
scaling_config = ScalingConfig(num_workers=4, use_gpu=True)
ray_trainer = TransformersTrainer(
    trainer_init_per_worker=trainer_init_per_worker,
    scaling_config=scaling_config,
    datasets={"train": ray_train_ds, "validation": ray_eval_ds},
)
result = ray_trainer.fit()
import transformers
from transformers import AutoConfig, AutoModelForCausalLM
from datasets import load_dataset

import ray
from ray.train.torch import TorchTrainer
from ray.train.huggingface.transformers import (
    RayTrainReportCallback,
    prepare_trainer,
)
from ray.train import ScalingConfig
from huggingface_hub import HfFileSystem

# Load datasets using HfFileSystem
path = "hf://datasets/Salesforce/wikitext/wikitext-2-raw-v1/"
fs = HfFileSystem()

# List the parquet files for each split
all_files = [f["name"] for f in fs.ls(path)]
train_files = [f for f in all_files if "train" in f and f.endswith(".parquet")]
validation_files = [f for f in all_files if "validation" in f and f.endswith(".parquet")]

ray_train_ds = ray.data.read_parquet(train_files, filesystem=fs)
ray_eval_ds = ray.data.read_parquet(validation_files, filesystem=fs)


# [1] Define the full training function
# =====================================
def train_func():
    MODEL_NAME = "gpt2"
    model_config = AutoConfig.from_pretrained(MODEL_NAME)
    model = AutoModelForCausalLM.from_config(model_config)

    # [2] Build Ray Data iterables
    # ============================
    train_dataset = ray.train.get_dataset_shard("train")
    eval_dataset = ray.train.get_dataset_shard("validation")

    train_iterable_ds = train_dataset.iter_torch_batches(batch_size=8)
    eval_iterable_ds = eval_dataset.iter_torch_batches(batch_size=8)

    args = transformers.TrainingArguments(
        output_dir=f"{MODEL_NAME}-wikitext2",
        evaluation_strategy="epoch",
        save_strategy="epoch",
        logging_strategy="epoch",
        learning_rate=2e-5,
        weight_decay=0.01,
        max_steps=100,
    )
    trainer = transformers.Trainer(
        model=model,
        args=args,
        train_dataset=train_iterable_ds,
        eval_dataset=eval_iterable_ds,
    )

    # [3] Add Ray Train Report Callback
    # =================================
    trainer.add_callback(RayTrainReportCallback())

    # [4] Prepare your trainer
    # ========================
    trainer = prepare_trainer(trainer)
    trainer.train()


# Build a Ray TorchTrainer
scaling_config = ScalingConfig(num_workers=4, use_gpu=True)
ray_trainer = TorchTrainer(
    train_func,
    scaling_config=scaling_config,
    datasets={"train": ray_train_ds, "validation": ray_eval_ds},
)
result = ray_trainer.fit()