Get Started with Distributed Training using Hugging Face Transformers#
This tutorial shows you how to convert an existing Hugging Face Transformers script to use Ray Train for distributed training.
In this guide, you learn how to:
- Configure a training function that correctly reports metrics and saves checkpoints.
- Configure the scale and CPU or GPU resource requirements for your distributed training job.
- Launch a distributed training job with a TorchTrainer.
Requirements#
Install the required packages before you begin:
pip install "ray[train]" torch "transformers[torch]" datasets evaluate numpy scikit-learn
Quickstart#
Here is a quick overview of the final code structure:
from ray.train.torch import TorchTrainer
from ray.train import ScalingConfig

def train_func():
    # Your Transformers training code here
    ...

scaling_config = ScalingConfig(num_workers=2, use_gpu=True)
trainer = TorchTrainer(train_func, scaling_config=scaling_config)
result = trainer.fit()
The key components are:
- train_func: the Python code that runs on each distributed training worker.
- ScalingConfig: defines the number of distributed training workers and GPU usage.
- TorchTrainer: launches and manages the distributed training job.
Code comparison: Hugging Face Transformers versus Ray Train integration#
Compare a standard Hugging Face Transformers script with its Ray Train equivalent:
# Adapted from Hugging Face tutorial: https://hugging-face.cn/docs/transformers/training

import numpy as np
import evaluate
from datasets import load_dataset
from transformers import (
    Trainer,
    TrainingArguments,
    AutoTokenizer,
    AutoModelForSequenceClassification,
)

# Datasets
dataset = load_dataset("yelp_review_full")
tokenizer = AutoTokenizer.from_pretrained("bert-base-cased")

def tokenize_function(examples):
    return tokenizer(examples["text"], padding="max_length", truncation=True)

small_train_dataset = dataset["train"].select(range(100)).map(tokenize_function, batched=True)
small_eval_dataset = dataset["test"].select(range(100)).map(tokenize_function, batched=True)

# Model
model = AutoModelForSequenceClassification.from_pretrained(
    "bert-base-cased", num_labels=5
)

# Metrics
metric = evaluate.load("accuracy")

def compute_metrics(eval_pred):
    logits, labels = eval_pred
    predictions = np.argmax(logits, axis=-1)
    return metric.compute(predictions=predictions, references=labels)

# Hugging Face Trainer
training_args = TrainingArguments(
    output_dir="test_trainer", evaluation_strategy="epoch", report_to="none"
)

trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=small_train_dataset,
    eval_dataset=small_eval_dataset,
    compute_metrics=compute_metrics,
)

# Start Training
trainer.train()
import os

import numpy as np
import evaluate
from datasets import load_dataset
from transformers import (
    Trainer,
    TrainingArguments,
    AutoTokenizer,
    AutoModelForSequenceClassification,
)

import ray.train.huggingface.transformers
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer


# [1] Encapsulate data preprocessing, training, and evaluation
# logic in a training function
# ============================================================
def train_func():
    # Datasets
    dataset = load_dataset("yelp_review_full")
    tokenizer = AutoTokenizer.from_pretrained("bert-base-cased")

    def tokenize_function(examples):
        return tokenizer(examples["text"], padding="max_length", truncation=True)

    small_train_dataset = (
        dataset["train"].select(range(100)).map(tokenize_function, batched=True)
    )
    small_eval_dataset = (
        dataset["test"].select(range(100)).map(tokenize_function, batched=True)
    )

    # Model
    model = AutoModelForSequenceClassification.from_pretrained(
        "bert-base-cased", num_labels=5
    )

    # Evaluation Metrics
    metric = evaluate.load("accuracy")

    def compute_metrics(eval_pred):
        logits, labels = eval_pred
        predictions = np.argmax(logits, axis=-1)
        return metric.compute(predictions=predictions, references=labels)

    # Hugging Face Trainer
    training_args = TrainingArguments(
        output_dir="test_trainer",
        evaluation_strategy="epoch",
        save_strategy="epoch",
        report_to="none",
    )

    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=small_train_dataset,
        eval_dataset=small_eval_dataset,
        compute_metrics=compute_metrics,
    )

    # [2] Report Metrics and Checkpoints to Ray Train
    # ===============================================
    callback = ray.train.huggingface.transformers.RayTrainReportCallback()
    trainer.add_callback(callback)

    # [3] Prepare Transformers Trainer
    # ================================
    trainer = ray.train.huggingface.transformers.prepare_trainer(trainer)

    # Start Training
    trainer.train()


# [4] Define a Ray TorchTrainer to launch `train_func` on all workers
# ===================================================================
ray_trainer = TorchTrainer(
    train_func,
    scaling_config=ScalingConfig(num_workers=2, use_gpu=True),
    # [4a] For multi-node clusters, configure persistent storage that is
    # accessible across all worker nodes
    # run_config=ray.train.RunConfig(storage_path="s3://..."),
)
result: ray.train.Result = ray_trainer.fit()

# [5] Load the trained model
with result.checkpoint.as_directory() as checkpoint_dir:
    checkpoint_path = os.path.join(
        checkpoint_dir,
        ray.train.huggingface.transformers.RayTrainReportCallback.CHECKPOINT_NAME,
    )
    model = AutoModelForSequenceClassification.from_pretrained(checkpoint_path)
Set up a training function#
First, update your training code to support distributed training. Begin by wrapping your code in a training function:
def train_func():
    # Your model training code here.
    ...
Each distributed training worker executes this function.
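For example, each worker can look up its own position in the distributed group inside the function. The following is a minimal sketch; the print statement is only illustrative, and ray.train.get_context() is a standard Ray Train utility rather than part of the original tutorial:
import ray.train

def train_func():
    # Query this worker's rank and the total number of workers.
    context = ray.train.get_context()
    print(f"Running on worker {context.get_world_rank()} of {context.get_world_size()}")
    ...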
You can also pass input arguments to train_func as a dictionary via the Trainer's train_loop_config. For example:
def train_func(config):
    lr = config["lr"]
    num_epochs = config["num_epochs"]

config = {"lr": 1e-4, "num_epochs": 10}
trainer = ray.train.torch.TorchTrainer(train_func, train_loop_config=config, ...)
Warning
Avoid passing large data objects through train_loop_config to reduce serialization and deserialization overhead. Instead, initialize large objects (such as datasets and models) directly in train_func.
def load_dataset():
    # Return a large in-memory dataset
    ...

def load_model():
    # Return a large in-memory model instance
    ...

-config = {"data": load_dataset(), "model": load_model()}

def train_func(config):
-    data = config["data"]
-    model = config["model"]
+    data = load_dataset()
+    model = load_model()
    ...

trainer = ray.train.torch.TorchTrainer(train_func, train_loop_config=config, ...)
Ray Train sets up the distributed process group on each worker before entering the training function. Put all of your logic into this function, including:
- Dataset construction and preprocessing
- Model initialization
- Transformers Trainer definition
Note
When using Hugging Face Datasets or Evaluate, make sure to call datasets.load_dataset and evaluate.load inside the training function. Don't pass loaded datasets or metrics in from outside the training function, because doing so can cause serialization errors when transferring the objects to the workers.
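A minimal sketch of the pattern this note describes, reusing the dataset and metric names from the quickstart above:
import evaluate
from datasets import load_dataset

def train_func():
    # Load the dataset and metric inside the function so each worker
    # constructs them locally instead of receiving serialized copies.
    dataset = load_dataset("yelp_review_full")
    metric = evaluate.load("accuracy")
    ...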
Report checkpoints and metrics#
To persist checkpoints and monitor training progress, add a ray.train.huggingface.transformers.RayTrainReportCallback utility callback to your Trainer.
import transformers
from ray.train.huggingface.transformers import RayTrainReportCallback

def train_func():
    ...
    trainer = transformers.Trainer(...)
+   trainer.add_callback(RayTrainReportCallback())
    ...
Reporting metrics and checkpoints to Ray Train enables integration with Ray Tune and fault-tolerant training. ray.train.huggingface.transformers.RayTrainReportCallback provides a basic implementation that you can customize as needed.
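If the default behavior doesn't fit your needs, you can also report to Ray Train from your own transformers callback. The sketch below is one possible variant rather than part of Ray Train's API; the class name and the metrics-only behavior are illustrative assumptions:
import ray.train
from transformers import TrainerCallback

class EvalMetricsReportCallback(TrainerCallback):
    """Report only evaluation metrics (no checkpoints) to Ray Train."""

    def on_evaluate(self, args, state, control, metrics=None, **kwargs):
        # `metrics` holds the latest evaluation results from the HF Trainer.
        ray.train.report(metrics=metrics or {})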
Prepare a Transformers Trainer#
Pass your Transformers Trainer into prepare_trainer() to validate your configuration and enable Ray Data integration.
import transformers
import ray.train.huggingface.transformers

def train_func():
    ...
    trainer = transformers.Trainer(...)
+   trainer = ray.train.huggingface.transformers.prepare_trainer(trainer)
    trainer.train()
    ...
Configure scale and GPUs#
Outside of your training function, create a ScalingConfig object to configure:
- num_workers: the number of distributed training worker processes.
- use_gpu: whether each worker should use a GPU (or CPU).
from ray.train import ScalingConfig
scaling_config = ScalingConfig(num_workers=2, use_gpu=True)
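If each worker needs to reserve a specific amount of resources, ScalingConfig also accepts a resources_per_worker mapping; the values below are only an example:
from ray.train import ScalingConfig

# Reserve 4 CPUs and 1 GPU for each of the 2 training workers.
scaling_config = ScalingConfig(
    num_workers=2,
    use_gpu=True,
    resources_per_worker={"CPU": 4, "GPU": 1},
)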
For more details, see Configuring Scale and GPUs.
Configure persistent storage#
Create a RunConfig object to specify the path where results (including checkpoints and artifacts) are saved.
from ray.train import RunConfig
# Local path (/some/local/path/unique_run_name)
run_config = RunConfig(storage_path="/some/local/path", name="unique_run_name")
# Shared cloud storage URI (s3://bucket/unique_run_name)
run_config = RunConfig(storage_path="s3://bucket", name="unique_run_name")
# Shared NFS path (/mnt/nfs/unique_run_name)
run_config = RunConfig(storage_path="/mnt/nfs", name="unique_run_name")
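RunConfig can also carry checkpointing options through ray.train.CheckpointConfig. The sketch below keeps only the two best checkpoints, ranked by a metric that your training function is assumed to report (eval_loss here):
from ray.train import CheckpointConfig, RunConfig

run_config = RunConfig(
    storage_path="s3://bucket",
    name="unique_run_name",
    checkpoint_config=CheckpointConfig(
        num_to_keep=2,
        # Rank checkpoints by a metric reported to Ray Train.
        checkpoint_score_attribute="eval_loss",
        checkpoint_score_order="min",
    ),
)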
Warning
Specifying a shared storage location (such as cloud storage or NFS) is optional for single-node clusters, but required for multi-node clusters. For multi-node clusters, using a local path raises an error when saving checkpoints.
For more details, see Configuring Persistent Storage.
Launch a training job#
Putting it all together, you can now launch a distributed training job with a TorchTrainer.
from ray.train.torch import TorchTrainer

trainer = TorchTrainer(
    train_func, scaling_config=scaling_config, run_config=run_config
)
result = trainer.fit()
Access training results#
After training completes, Ray Train returns a Result object, which contains information about the training run, including the metrics and checkpoints reported during training.
result.metrics # The metrics reported during training.
result.checkpoint # The latest checkpoint reported during training.
result.path # The path where logs are stored.
result.error # The exception that was raised, if training failed.
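For example, a short usage sketch; the comments assume the training function reported evaluation metrics through RayTrainReportCallback:
result = trainer.fit()

print(result.metrics)   # Last reported metrics, e.g. eval_loss and epoch
print(result.path)      # Path where this run's logs and checkpoints are stored

# The latest checkpoint reported during training, if any.
if result.checkpoint is not None:
    with result.checkpoint.as_directory() as checkpoint_dir:
        print("Checkpoint downloaded to:", checkpoint_dir)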
See Inspecting Training Results for more usage examples.
Next steps#
Now that you have converted your Hugging Face Transformers script to use Ray Train, you can explore the rest of the Ray Train documentation for further user guides and examples.
TransformersTrainer 迁移指南#
Ray 2.1 introduced TransformersTrainer, which exposes a trainer_init_per_worker interface for defining the transformers.Trainer and then executes a pre-defined training function.
Ray 2.7 introduced the unified TorchTrainer API, which offers better transparency, flexibility, and simplicity. This API aligns more closely with a standard Hugging Face Transformers script, giving you more control over your training code.
import transformers
from transformers import AutoConfig, AutoModelForCausalLM
from datasets import load_dataset

import ray
from ray.train.huggingface import TransformersTrainer
from ray.train import ScalingConfig

hf_datasets = load_dataset("wikitext", "wikitext-2-raw-v1")
# optional: preprocess the dataset
# hf_datasets = hf_datasets.map(preprocess, ...)

ray_train_ds = ray.data.from_huggingface(hf_datasets["train"])
ray_eval_ds = ray.data.from_huggingface(hf_datasets["validation"])

# Define the Trainer generation function
def trainer_init_per_worker(train_dataset, eval_dataset, **config):
    MODEL_NAME = "gpt2"
    model_config = AutoConfig.from_pretrained(MODEL_NAME)
    model = AutoModelForCausalLM.from_config(model_config)
    args = transformers.TrainingArguments(
        output_dir=f"{MODEL_NAME}-wikitext2",
        evaluation_strategy="epoch",
        save_strategy="epoch",
        logging_strategy="epoch",
        learning_rate=2e-5,
        weight_decay=0.01,
        max_steps=100,
    )
    return transformers.Trainer(
        model=model,
        args=args,
        train_dataset=train_dataset,
        eval_dataset=eval_dataset,
    )

# Build a Ray TransformersTrainer
scaling_config = ScalingConfig(num_workers=4, use_gpu=True)
ray_trainer = TransformersTrainer(
    trainer_init_per_worker=trainer_init_per_worker,
    scaling_config=scaling_config,
    datasets={"train": ray_train_ds, "validation": ray_eval_ds},
)
result = ray_trainer.fit()
import transformers
from transformers import AutoConfig, AutoModelForCausalLM
from datasets import load_dataset

import ray
from ray.train.torch import TorchTrainer
from ray.train.huggingface.transformers import (
    RayTrainReportCallback,
    prepare_trainer,
)
from ray.train import ScalingConfig

hf_datasets = load_dataset("wikitext", "wikitext-2-raw-v1")
# optional: preprocess the dataset
# hf_datasets = hf_datasets.map(preprocess, ...)

ray_train_ds = ray.data.from_huggingface(hf_datasets["train"])
ray_eval_ds = ray.data.from_huggingface(hf_datasets["validation"])

# [1] Define the full training function
# =====================================
def train_func():
    MODEL_NAME = "gpt2"
    model_config = AutoConfig.from_pretrained(MODEL_NAME)
    model = AutoModelForCausalLM.from_config(model_config)

    # [2] Build Ray Data iterables
    # ============================
    train_dataset = ray.train.get_dataset_shard("train")
    eval_dataset = ray.train.get_dataset_shard("validation")

    train_iterable_ds = train_dataset.iter_torch_batches(batch_size=8)
    eval_iterable_ds = eval_dataset.iter_torch_batches(batch_size=8)

    args = transformers.TrainingArguments(
        output_dir=f"{MODEL_NAME}-wikitext2",
        evaluation_strategy="epoch",
        save_strategy="epoch",
        logging_strategy="epoch",
        learning_rate=2e-5,
        weight_decay=0.01,
        max_steps=100,
    )

    trainer = transformers.Trainer(
        model=model,
        args=args,
        train_dataset=train_iterable_ds,
        eval_dataset=eval_iterable_ds,
    )

    # [3] Add Ray Train Report Callback
    # =================================
    trainer.add_callback(RayTrainReportCallback())

    # [4] Prepare your trainer
    # ========================
    trainer = prepare_trainer(trainer)
    trainer.train()

# Build a Ray TorchTrainer
scaling_config = ScalingConfig(num_workers=4, use_gpu=True)
ray_trainer = TorchTrainer(
    train_func,
    scaling_config=scaling_config,
    datasets={"train": ray_train_ds, "validation": ray_eval_ds},
)
result = ray_trainer.fit()