Get Started with Distributed Training using Hugging Face Transformers#

This tutorial shows you how to convert an existing Hugging Face Transformers script to use Ray Train for distributed training.

In this guide, you learn how to:

  1. Configure a training function that correctly reports metrics and saves checkpoints.

  2. Configure the scale and CPU or GPU resource requirements for your distributed training job.

  3. Launch your distributed training job with a TorchTrainer.

Requirements#

Install the necessary packages before you begin:

pip install "ray[train]" torch "transformers[torch]" datasets evaluate numpy scikit-learn

Quickstart#

For a quick overview, the following shows the structure of the final code:

from ray.train.torch import TorchTrainer
from ray.train import ScalingConfig

def train_func():
    # Your Transformers training code here
    ...

scaling_config = ScalingConfig(num_workers=2, use_gpu=True)
trainer = TorchTrainer(train_func, scaling_config=scaling_config)
result = trainer.fit()

The key components are:

  1. train_func: the Python code that runs on each distributed training worker.

  2. ScalingConfig: defines the number of distributed training workers and whether to use GPUs.

  3. TorchTrainer: launches and manages the distributed training job.

Code comparison: Hugging Face Transformers and Ray Train integration#

Compare a standard Hugging Face Transformers script (first block) with its Ray Train equivalent (second block):

# Adapted from Hugging Face tutorial: https://huggingface.co/docs/transformers/training

import numpy as np
import evaluate
from datasets import load_dataset
from transformers import (
    Trainer,
    TrainingArguments,
    AutoTokenizer,
    AutoModelForSequenceClassification,
)

# Datasets
dataset = load_dataset("yelp_review_full")
tokenizer = AutoTokenizer.from_pretrained("bert-base-cased")

def tokenize_function(examples):
    return tokenizer(examples["text"], padding="max_length", truncation=True)

small_train_dataset = dataset["train"].select(range(100)).map(tokenize_function, batched=True)
small_eval_dataset = dataset["test"].select(range(100)).map(tokenize_function, batched=True)

# Model
model = AutoModelForSequenceClassification.from_pretrained(
    "bert-base-cased", num_labels=5
)

# Metrics
metric = evaluate.load("accuracy")

def compute_metrics(eval_pred):
    logits, labels = eval_pred
    predictions = np.argmax(logits, axis=-1)
    return metric.compute(predictions=predictions, references=labels)

# Hugging Face Trainer
training_args = TrainingArguments(
    output_dir="test_trainer", evaluation_strategy="epoch", report_to="none"
)

trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=small_train_dataset,
    eval_dataset=small_eval_dataset,
    compute_metrics=compute_metrics,
)

# Start Training
trainer.train()

The same workload converted to Ray Train:

import os

import numpy as np
import evaluate
from datasets import load_dataset
from transformers import (
    Trainer,
    TrainingArguments,
    AutoTokenizer,
    AutoModelForSequenceClassification,
)

import ray.train.huggingface.transformers
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer


# [1] Encapsulate data preprocessing, training, and evaluation
# logic in a training function
# ============================================================
def train_func():
    # Datasets
    dataset = load_dataset("yelp_review_full")
    tokenizer = AutoTokenizer.from_pretrained("bert-base-cased")

    def tokenize_function(examples):
        return tokenizer(examples["text"], padding="max_length", truncation=True)

    small_train_dataset = (
        dataset["train"].select(range(100)).map(tokenize_function, batched=True)
    )
    small_eval_dataset = (
        dataset["test"].select(range(100)).map(tokenize_function, batched=True)
    )

    # Model
    model = AutoModelForSequenceClassification.from_pretrained(
        "bert-base-cased", num_labels=5
    )

    # Evaluation Metrics
    metric = evaluate.load("accuracy")

    def compute_metrics(eval_pred):
        logits, labels = eval_pred
        predictions = np.argmax(logits, axis=-1)
        return metric.compute(predictions=predictions, references=labels)

    # Hugging Face Trainer
    training_args = TrainingArguments(
        output_dir="test_trainer",
        evaluation_strategy="epoch",
        save_strategy="epoch",
        report_to="none",
    )

    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=small_train_dataset,
        eval_dataset=small_eval_dataset,
        compute_metrics=compute_metrics,
    )

    # [2] Report Metrics and Checkpoints to Ray Train
    # ===============================================
    callback = ray.train.huggingface.transformers.RayTrainReportCallback()
    trainer.add_callback(callback)

    # [3] Prepare Transformers Trainer
    # ================================
    trainer = ray.train.huggingface.transformers.prepare_trainer(trainer)

    # Start Training
    trainer.train()


# [4] Define a Ray TorchTrainer to launch `train_func` on all workers
# ===================================================================
ray_trainer = TorchTrainer(
    train_func,
    scaling_config=ScalingConfig(num_workers=2, use_gpu=True),
    # [4a] For multi-node clusters, configure persistent storage that is
    # accessible across all worker nodes
    # run_config=ray.train.RunConfig(storage_path="s3://..."),
)
result: ray.train.Result = ray_trainer.fit()

# [5] Load the trained model
with result.checkpoint.as_directory() as checkpoint_dir:
    checkpoint_path = os.path.join(
        checkpoint_dir,
        ray.train.huggingface.transformers.RayTrainReportCallback.CHECKPOINT_NAME,
    )
    model = AutoModelForSequenceClassification.from_pretrained(checkpoint_path)

Set up a training function#

First, update your training code to support distributed training. Begin by wrapping your code in a training function:

def train_func():
    # Your model training code here.
    ...

Each distributed training worker executes this function.
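
For instance, a minimal sketch (assuming Ray Train's context API, ray.train.get_context()) that makes the per-worker execution visible by printing each worker's rank:

import ray.train

def train_func():
    # Each worker runs this function independently and reports its own rank.
    ctx = ray.train.get_context()
    print(f"Running on worker {ctx.get_world_rank()} of {ctx.get_world_size()}")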

You can also specify the input argument for train_func as a dictionary via the Trainer's train_loop_config. For example:

def train_func(config):
    lr = config["lr"]
    num_epochs = config["num_epochs"]

config = {"lr": 1e-4, "num_epochs": 10}
trainer = ray.train.torch.TorchTrainer(train_func, train_loop_config=config, ...)

Warning

Avoid passing large data objects through train_loop_config to reduce serialization and deserialization overhead. Instead, initialize large objects (such as datasets and models) directly inside train_func:

 def load_dataset():
     # Return a large in-memory dataset
     ...

 def load_model():
     # Return a large in-memory model instance
     ...

-config = {"data": load_dataset(), "model": load_model()}

 def train_func(config):
-    data = config["data"]
-    model = config["model"]

+    data = load_dataset()
+    model = load_model()
     ...

 trainer = ray.train.torch.TorchTrainer(train_func, train_loop_config=config, ...)

Ray Train sets up the distributed process group on each worker before entering the training function. Put all of your logic into this function, including:

  • Dataset construction and preprocessing

  • Model initialization

  • Transformers Trainer definition

Note

When using Hugging Face Datasets or Evaluate, make sure to call datasets.load_dataset and evaluate.load inside the training function. Don't pass loaded datasets or metrics in from outside the training function, because this can cause serialization errors when transferring the objects to the workers.
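
For example, a minimal sketch of this structure, reusing the dataset, metric, and model from the quickstart above, with everything constructed inside the training function:

import evaluate
from datasets import load_dataset
from transformers import AutoModelForSequenceClassification

def train_func():
    # Build the dataset, metric, and model inside the function so that each
    # worker constructs its own copies instead of receiving serialized ones.
    dataset = load_dataset("yelp_review_full")
    metric = evaluate.load("accuracy")
    model = AutoModelForSequenceClassification.from_pretrained(
        "bert-base-cased", num_labels=5
    )
    ...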

Report checkpoints and metrics#

To persist your checkpoints and monitor training progress, add a ray.train.huggingface.transformers.RayTrainReportCallback utility callback to your Trainer:

 import transformers
 from ray.train.huggingface.transformers import RayTrainReportCallback

 def train_func():
     ...
     trainer = transformers.Trainer(...)
+    trainer.add_callback(RayTrainReportCallback())
     ...

Reporting metrics and checkpoints to Ray Train enables integration with Ray Tune and fault-tolerant training. ray.train.huggingface.transformers.RayTrainReportCallback provides a simple implementation, which you can customize as needed.
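
As a rough illustration of such a customization, the following sketch (the class name and reporting schedule are assumptions, not the built-in implementation) buffers the metrics that the Trainer logs and reports them, together with the latest Hugging Face checkpoint, every time the Trainer saves. Attach it with trainer.add_callback(MyReportCallback()) just like the built-in callback.

import os

import ray.train
from transformers import TrainerCallback


class MyReportCallback(TrainerCallback):
    # Illustrative custom report callback: a sketch, not the built-in one.

    def __init__(self):
        self.latest_metrics = {}

    def on_log(self, args, state, control, logs=None, **kwargs):
        # Buffer the scalar metrics that the Trainer logs.
        self.latest_metrics.update(
            {k: v for k, v in (logs or {}).items() if isinstance(v, (int, float))}
        )

    def on_save(self, args, state, control, **kwargs):
        # Transformers writes checkpoints to `<output_dir>/checkpoint-<step>`.
        ckpt_dir = os.path.join(args.output_dir, f"checkpoint-{state.global_step}")
        # Only the rank 0 worker attaches the checkpoint; every worker still
        # calls ray.train.report so that all workers stay in sync.
        checkpoint = None
        if ray.train.get_context().get_world_rank() == 0:
            checkpoint = ray.train.Checkpoint.from_directory(ckpt_dir)
        ray.train.report(
            {**self.latest_metrics, "step": state.global_step},
            checkpoint=checkpoint,
        )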

Prepare a Transformers Trainer#

Pass your Transformers Trainer into prepare_trainer() to validate your configuration and enable Ray Data integration:

 import transformers
 import ray.train.huggingface.transformers

 def train_func():
     ...
     trainer = transformers.Trainer(...)
+    trainer = ray.train.huggingface.transformers.prepare_trainer(trainer)
     trainer.train()
     ...

Configure scale and GPUs#

Outside of your training function, create a ScalingConfig object to configure:

  1. num_workers - The number of distributed training worker processes.

  2. use_gpu - Whether each worker should use a GPU (or CPU).

from ray.train import ScalingConfig
scaling_config = ScalingConfig(num_workers=2, use_gpu=True)
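
If you need finer-grained control over per-worker resources, ScalingConfig also accepts a resources_per_worker dictionary; the values below are illustrative, not recommendations:

from ray.train import ScalingConfig

# Illustrative values: 2 workers, each reserving 4 CPUs and 1 GPU.
scaling_config = ScalingConfig(
    num_workers=2,
    use_gpu=True,
    resources_per_worker={"CPU": 4, "GPU": 1},
)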

For more details, see Configuring Scale and GPUs.

Configure persistent storage#

Create a RunConfig object to specify the path where results (including checkpoints and artifacts) are saved:

from ray.train import RunConfig

# Local path (/some/local/path/unique_run_name)
run_config = RunConfig(storage_path="/some/local/path", name="unique_run_name")

# Shared cloud storage URI (s3://bucket/unique_run_name)
run_config = RunConfig(storage_path="s3://bucket", name="unique_run_name")

# Shared NFS path (/mnt/nfs/unique_run_name)
run_config = RunConfig(storage_path="/mnt/nfs", name="unique_run_name")

Warning

Specifying a shared storage location (such as cloud storage or NFS) is optional for single-node clusters but required for multi-node clusters. For multi-node clusters, using a local path raises an error during checkpointing.

For more details, see Configuring Persistent Storage.

Launch a training job#

Tying all of this together, you can now launch a distributed training job with a TorchTrainer:

from ray.train.torch import TorchTrainer

trainer = TorchTrainer(
    train_func, scaling_config=scaling_config, run_config=run_config
)
result = trainer.fit()

Access training results#

After training completes, Ray Train returns a Result object that contains information about the training run, including the metrics and checkpoints reported during training:

result.metrics     # The metrics reported during training.
result.checkpoint  # The latest checkpoint reported during training.
result.path        # The path where logs are stored.
result.error       # The exception that was raised, if training failed.
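
As a quick sketch of inspecting a finished run (the "eval_loss" key is only an assumption and depends on which metrics your Trainer actually logged):

import os

# Metrics from the last report; keys depend on what the Trainer logged.
print(result.metrics)
print(result.metrics.get("eval_loss"))

# Materialize the latest checkpoint as a local directory.
with result.checkpoint.as_directory() as checkpoint_dir:
    print("Checkpoint contents:", os.listdir(checkpoint_dir))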

For more usage examples, see Inspecting Training Results.

Next steps#

Now that you have converted your Hugging Face Transformers script to use Ray Train:

  • Explore the User Guides to learn more about how to perform specific tasks.

  • Browse the Examples for end-to-end Ray Train applications.

  • Consult the API Reference for details on the classes and methods.

TransformersTrainer migration guide#

Ray 2.1 introduced TransformersTrainer, which exposes a trainer_init_per_worker interface for defining the transformers.Trainer and then runs a pre-defined training function.

Ray 2.7 introduced the unified TorchTrainer API, which offers better transparency, flexibility, and simplicity. This API aligns more closely with standard Hugging Face Transformers scripts, giving you more control over your training code. The two blocks below show the same workload written with each API:

import transformers
from transformers import AutoConfig, AutoModelForCausalLM
from datasets import load_dataset

import ray
from ray.train.huggingface import TransformersTrainer
from ray.train import ScalingConfig


hf_datasets = load_dataset("wikitext", "wikitext-2-raw-v1")
# optional: preprocess the dataset
# hf_datasets = hf_datasets.map(preprocess, ...)

ray_train_ds = ray.data.from_huggingface(hf_datasets["train"])
ray_eval_ds = ray.data.from_huggingface(hf_datasets["validation"])

# Define the Trainer generation function
def trainer_init_per_worker(train_dataset, eval_dataset, **config):
    MODEL_NAME = "gpt2"
    model_config = AutoConfig.from_pretrained(MODEL_NAME)
    model = AutoModelForCausalLM.from_config(model_config)
    args = transformers.TrainingArguments(
        output_dir=f"{MODEL_NAME}-wikitext2",
        evaluation_strategy="epoch",
        save_strategy="epoch",
        logging_strategy="epoch",
        learning_rate=2e-5,
        weight_decay=0.01,
        max_steps=100,
    )
    return transformers.Trainer(
        model=model,
        args=args,
        train_dataset=train_dataset,
        eval_dataset=eval_dataset,
    )

# Build a Ray TransformersTrainer
scaling_config = ScalingConfig(num_workers=4, use_gpu=True)
ray_trainer = TransformersTrainer(
    trainer_init_per_worker=trainer_init_per_worker,
    scaling_config=scaling_config,
    datasets={"train": ray_train_ds, "validation": ray_eval_ds},
)
result = ray_trainer.fit()

The same workload migrated to the unified TorchTrainer API:

import transformers
from transformers import AutoConfig, AutoModelForCausalLM
from datasets import load_dataset

import ray
from ray.train.torch import TorchTrainer
from ray.train.huggingface.transformers import (
    RayTrainReportCallback,
    prepare_trainer,
)
from ray.train import ScalingConfig


hf_datasets = load_dataset("wikitext", "wikitext-2-raw-v1")
# optional: preprocess the dataset
# hf_datasets = hf_datasets.map(preprocess, ...)

ray_train_ds = ray.data.from_huggingface(hf_datasets["train"])
ray_eval_ds = ray.data.from_huggingface(hf_datasets["validation"])

# [1] Define the full training function
# =====================================
def train_func():
    MODEL_NAME = "gpt2"
    model_config = AutoConfig.from_pretrained(MODEL_NAME)
    model = AutoModelForCausalLM.from_config(model_config)

    # [2] Build Ray Data iterables
    # ============================
    train_dataset = ray.train.get_dataset_shard("train")
    eval_dataset = ray.train.get_dataset_shard("validation")

    train_iterable_ds = train_dataset.iter_torch_batches(batch_size=8)
    eval_iterable_ds = eval_dataset.iter_torch_batches(batch_size=8)

    args = transformers.TrainingArguments(
        output_dir=f"{MODEL_NAME}-wikitext2",
        evaluation_strategy="epoch",
        save_strategy="epoch",
        logging_strategy="epoch",
        learning_rate=2e-5,
        weight_decay=0.01,
        max_steps=100,
    )

    trainer = transformers.Trainer(
        model=model,
        args=args,
        train_dataset=train_iterable_ds,
        eval_dataset=eval_iterable_ds,
    )

    # [3] Add Ray Train Report Callback
    # =================================
    trainer.add_callback(RayTrainReportCallback())

    # [4] Prepare your trainer
    # ========================
    trainer = prepare_trainer(trainer)
    trainer.train()

# Build a Ray TorchTrainer
scaling_config = ScalingConfig(num_workers=4, use_gpu=True)
ray_trainer = TorchTrainer(
    train_func,
    scaling_config=scaling_config,
    datasets={"train": ray_train_ds, "validation": ray_eval_ds},
)
result = ray_trainer.fit()