使用 Hugging Face Accelerate 开始分布式训练#

雷的 TorchTrainer 可以帮助您轻松地在分布式 Ray 集群上启动您的 Accelerate 训练。

您只需要使用 TorchTrainer 运行您现有的训练代码。最终代码预计如下所示

from accelerate import Accelerator

def train_func():
    # Instantiate the accelerator
    accelerator = Accelerator(...)

    model = ...
    optimizer = ...
    train_dataloader = ...
    eval_dataloader = ...
    lr_scheduler = ...

    # Prepare everything for distributed training
    (
        model,
        optimizer,
        train_dataloader,
        eval_dataloader,
        lr_scheduler,
    ) = accelerator.prepare(
        model, optimizer, train_dataloader, eval_dataloader, lr_scheduler
    )

    # Start training
    ...

from ray.train.torch import TorchTrainer
from ray.train import ScalingConfig

trainer = TorchTrainer(
    train_func,
    scaling_config=ScalingConfig(...),
    # If running in a multi-node cluster, this is where you
    # should configure the run's persistent storage that is accessible
    # across all worker nodes.
    # run_config=ray.train.RunConfig(storage_path="s3://..."),
    ...
)
trainer.fit()

提示

分布式训练的模型和数据准备完全由 Accelerator 对象及其 Accelerator.prepare() 方法处理。

与原生 PyTorch 不同,不要在您的训练函数中调用任何额外的 Ray Train 实用程序,例如 prepare_model()prepare_data_loader()

配置 Accelerate#

在 Ray Train 中,您可以通过训练函数中的 accelerate.Accelerator 对象设置配置。以下是配置 Accelerate 的入门示例。

例如,要使用 Accelerate 运行 DeepSpeed,可以从字典创建一个 DeepSpeedPlugin

from accelerate import Accelerator, DeepSpeedPlugin

DEEPSPEED_CONFIG = {
    "fp16": {
        "enabled": True
    },
    "zero_optimization": {
        "stage": 3,
        "offload_optimizer": {
            "device": "cpu",
            "pin_memory": False
        },
        "overlap_comm": True,
        "contiguous_gradients": True,
        "reduce_bucket_size": "auto",
        "stage3_prefetch_bucket_size": "auto",
        "stage3_param_persistence_threshold": "auto",
        "gather_16bit_weights_on_model_save": True,
        "round_robin_gradients": True
    },
    "gradient_accumulation_steps": "auto",
    "gradient_clipping": "auto",
    "steps_per_print": 10,
    "train_batch_size": "auto",
    "train_micro_batch_size_per_gpu": "auto",
    "wall_clock_breakdown": False
}

def train_func():
    # Create a DeepSpeedPlugin from config dict
    ds_plugin = DeepSpeedPlugin(hf_ds_config=DEEPSPEED_CONFIG)

    # Initialize Accelerator
    accelerator = Accelerator(
        ...,
        deepspeed_plugin=ds_plugin,
    )

    # Start training
    ...

from ray.train.torch import TorchTrainer
from ray.train import ScalingConfig

trainer = TorchTrainer(
    train_func,
    scaling_config=ScalingConfig(...),
    run_config=ray.train.RunConfig(storage_path="s3://..."),
    ...
)
trainer.fit()

对于 PyTorch FSDP,创建一个 FullyShardedDataParallelPlugin 并将其传递给 Accelerator。

from torch.distributed.fsdp.fully_sharded_data_parallel import FullOptimStateDictConfig, FullStateDictConfig
from accelerate import Accelerator, FullyShardedDataParallelPlugin

def train_func():
    fsdp_plugin = FullyShardedDataParallelPlugin(
        state_dict_config=FullStateDictConfig(
            offload_to_cpu=False,
            rank0_only=False
        ),
        optim_state_dict_config=FullOptimStateDictConfig(
            offload_to_cpu=False,
            rank0_only=False
        )
    )

    # Initialize accelerator
    accelerator = Accelerator(
        ...,
        fsdp_plugin=fsdp_plugin,
    )

    # Start training
    ...

from ray.train.torch import TorchTrainer
from ray.train import ScalingConfig

trainer = TorchTrainer(
    train_func,
    scaling_config=ScalingConfig(...),
    run_config=ray.train.RunConfig(storage_path="s3://..."),
    ...
)
trainer.fit()

请注意,Accelerate 也提供了一个 CLI 工具 "accelerate config",用于生成配置并使用 "accelerate launch" 启动您的训练任务。但是,在这里这不是必需的,因为 Ray 的 TorchTrainer 已经设置了 Torch 分布式环境并在所有 worker 上启动了训练函数。

接下来,请参阅下面的端到端示例以获取更多详细信息

显示代码
"""
Minimal Ray Train and Accelerate example adapted from
https://github.com/huggingface/accelerate/blob/main/examples/nlp_example.py

Fine-tune a BERT model with Hugging Face Accelerate and Ray Train and Ray Data
"""

from tempfile import TemporaryDirectory

import evaluate
import torch
from accelerate import Accelerator
from datasets import load_dataset
from torch.optim import AdamW
from transformers import (
    AutoModelForSequenceClassification,
    AutoTokenizer,
    get_linear_schedule_with_warmup,
    set_seed,
)

import ray
import ray.train
from ray.train import Checkpoint, DataConfig, ScalingConfig
from ray.train.torch import TorchTrainer


def train_func(config):
    """Your training function that launches on each worker."""

    # Unpack training configs
    lr = config["lr"]
    seed = config["seed"]
    num_epochs = config["num_epochs"]
    train_batch_size = config["train_batch_size"]
    eval_batch_size = config["eval_batch_size"]
    train_ds_size = config["train_dataset_size"]

    set_seed(seed)

    # Initialize accelerator
    accelerator = Accelerator()

    # Load datasets and metrics
    metric = evaluate.load("glue", "mrpc")

    # Prepare Ray Data loaders
    # ====================================================
    train_ds = ray.train.get_dataset_shard("train")
    eval_ds = ray.train.get_dataset_shard("validation")

    tokenizer = AutoTokenizer.from_pretrained("bert-base-cased")

    def collate_fn(batch):
        outputs = tokenizer(
            list(batch["sentence1"]),
            list(batch["sentence2"]),
            truncation=True,
            padding="longest",
            return_tensors="pt",
        )
        outputs["labels"] = torch.LongTensor(batch["label"])
        outputs = {k: v.to(accelerator.device) for k, v in outputs.items()}
        return outputs

    train_dataloader = train_ds.iter_torch_batches(
        batch_size=train_batch_size, collate_fn=collate_fn
    )
    eval_dataloader = eval_ds.iter_torch_batches(
        batch_size=eval_batch_size, collate_fn=collate_fn
    )
    # ====================================================

    # Instantiate the model, optimizer, lr_scheduler
    model = AutoModelForSequenceClassification.from_pretrained(
        "bert-base-cased", return_dict=True
    )

    optimizer = AdamW(params=model.parameters(), lr=lr)

    steps_per_epoch = train_ds_size // (accelerator.num_processes * train_batch_size)
    lr_scheduler = get_linear_schedule_with_warmup(
        optimizer=optimizer,
        num_warmup_steps=100,
        num_training_steps=(steps_per_epoch * num_epochs),
    )

    # Prepare everything with accelerator
    model, optimizer, lr_scheduler = accelerator.prepare(model, optimizer, lr_scheduler)

    for epoch in range(num_epochs):
        # Training
        model.train()
        for batch in train_dataloader:
            outputs = model(**batch)
            loss = outputs.loss
            accelerator.backward(loss)
            optimizer.step()
            lr_scheduler.step()
            optimizer.zero_grad()

        # Evaluation
        model.eval()
        for batch in eval_dataloader:
            with torch.no_grad():
                outputs = model(**batch)
            predictions = outputs.logits.argmax(dim=-1)

            predictions, references = accelerator.gather_for_metrics(
                (predictions, batch["labels"])
            )
            metric.add_batch(
                predictions=predictions,
                references=references,
            )

        eval_metric = metric.compute()
        accelerator.print(f"epoch {epoch}:", eval_metric)

        # Report checkpoint and metrics to Ray Train
        # ==========================================
        with TemporaryDirectory() as tmpdir:
            if accelerator.is_main_process:
                unwrapped_model = accelerator.unwrap_model(model)
                accelerator.save(unwrapped_model, f"{tmpdir}/ckpt_{epoch}.bin")
                checkpoint = Checkpoint.from_directory(tmpdir)
            else:
                checkpoint = None
            ray.train.report(metrics=eval_metric, checkpoint=checkpoint)


if __name__ == "__main__":
    config = {
        "lr": 2e-5,
        "num_epochs": 3,
        "seed": 42,
        "train_batch_size": 16,
        "eval_batch_size": 32,
    }

    # Prepare Ray Datasets
    hf_datasets = load_dataset("glue", "mrpc")
    ray_datasets = {
        "train": ray.data.from_huggingface(hf_datasets["train"]),
        "validation": ray.data.from_huggingface(hf_datasets["validation"]),
    }
    config["train_dataset_size"] = ray_datasets["train"].count()

    trainer = TorchTrainer(
        train_func,
        train_loop_config=config,
        datasets=ray_datasets,
        dataset_config=DataConfig(datasets_to_split=["train", "validation"]),
        scaling_config=ScalingConfig(num_workers=4, use_gpu=True),
        # If running in a multi-node cluster, this is where you
        # should configure the run's persistent storage that is accessible
        # across all worker nodes.
        # run_config=ray.train.RunConfig(storage_path="s3://..."),
    )

    result = trainer.fit()

显示代码
"""
Minimal Ray Train + Accelerate example adapted from
https://github.com/huggingface/accelerate/blob/main/examples/nlp_example.py

Fine-tune a BERT model with Hugging Face Accelerate and Ray Train
"""

from tempfile import TemporaryDirectory

import evaluate
import torch
from accelerate import Accelerator
from datasets import load_dataset
from torch.optim import AdamW
from torch.utils.data import DataLoader
from transformers import (
    AutoModelForSequenceClassification,
    AutoTokenizer,
    get_linear_schedule_with_warmup,
    set_seed,
)

import ray.train
from ray.train import Checkpoint, ScalingConfig
from ray.train.torch import TorchTrainer


def train_func(config):
    """Your training function that will be launched on each worker."""

    # Unpack training configs
    lr = config["lr"]
    seed = config["seed"]
    num_epochs = config["num_epochs"]
    train_batch_size = config["train_batch_size"]
    eval_batch_size = config["eval_batch_size"]

    set_seed(seed)

    # Initialize accelerator
    accelerator = Accelerator()

    # Load datasets and metrics
    metric = evaluate.load("glue", "mrpc")

    # Prepare PyTorch DataLoaders
    # ====================================================
    hf_datasets = load_dataset("glue", "mrpc")

    tokenizer = AutoTokenizer.from_pretrained("bert-base-cased")

    def collate_fn(batch):
        outputs = tokenizer(
            [sample["sentence1"] for sample in batch],
            [sample["sentence2"] for sample in batch],
            truncation=True,
            padding="longest",
            return_tensors="pt",
        )
        outputs["labels"] = torch.LongTensor([sample["label"] for sample in batch])
        outputs = {k: v.to(accelerator.device) for k, v in outputs.items()}
        return outputs

    # Instantiate dataloaders.
    train_dataloader = DataLoader(
        hf_datasets["train"],
        shuffle=True,
        collate_fn=collate_fn,
        batch_size=train_batch_size,
        drop_last=True,
    )
    eval_dataloader = DataLoader(
        hf_datasets["validation"],
        shuffle=False,
        collate_fn=collate_fn,
        batch_size=eval_batch_size,
        drop_last=True,
    )
    # ====================================================

    # Instantiate the model, optimizer, lr_scheduler
    model = AutoModelForSequenceClassification.from_pretrained(
        "bert-base-cased", return_dict=True
    )

    optimizer = AdamW(params=model.parameters(), lr=lr)

    steps_per_epoch = len(train_dataloader)
    lr_scheduler = get_linear_schedule_with_warmup(
        optimizer=optimizer,
        num_warmup_steps=100,
        num_training_steps=(steps_per_epoch * num_epochs),
    )

    # Prepare everything with accelerator
    (
        model,
        optimizer,
        train_dataloader,
        eval_dataloader,
        lr_scheduler,
    ) = accelerator.prepare(
        model, optimizer, train_dataloader, eval_dataloader, lr_scheduler
    )

    for epoch in range(num_epochs):
        # Training
        model.train()
        for batch in train_dataloader:
            outputs = model(**batch)
            loss = outputs.loss
            accelerator.backward(loss)
            optimizer.step()
            lr_scheduler.step()
            optimizer.zero_grad()

        # Evaluation
        model.eval()
        for batch in eval_dataloader:
            with torch.no_grad():
                outputs = model(**batch)
            predictions = outputs.logits.argmax(dim=-1)

            predictions, references = accelerator.gather_for_metrics(
                (predictions, batch["labels"])
            )
            metric.add_batch(
                predictions=predictions,
                references=references,
            )

        eval_metric = metric.compute()
        accelerator.print(f"epoch {epoch}:", eval_metric)

        # Report Checkpoint and metrics to Ray Train
        # ==========================================
        with TemporaryDirectory() as tmpdir:
            if accelerator.is_main_process:
                unwrapped_model = accelerator.unwrap_model(model)
                accelerator.save(unwrapped_model, f"{tmpdir}/ckpt_{epoch}.bin")
                checkpoint = Checkpoint.from_directory(tmpdir)
            else:
                checkpoint = None
            ray.train.report(metrics=eval_metric, checkpoint=checkpoint)


if __name__ == "__main__":
    config = {
        "lr": 2e-5,
        "num_epochs": 3,
        "seed": 42,
        "train_batch_size": 16,
        "eval_batch_size": 32,
    }

    trainer = TorchTrainer(
        train_func,
        train_loop_config=config,
        scaling_config=ScalingConfig(num_workers=4, use_gpu=True),
        # If running in a multi-node cluster, this is where you
        # should configure the run's persistent storage that is accessible
        # across all worker nodes.
        # run_config=ray.train.RunConfig(storage_path="s3://..."),
    )

    result = trainer.fit()

另请参阅

如果您正在寻找更高级的用例,请查看此 Llama-2 微调示例

您可能还会发现这些用户指南有帮助

AccelerateTrainer 迁移指南#

在 Ray 2.7 之前,Ray Train 的 AccelerateTrainer API 是运行 Accelerate 代码的推荐方式。作为 TorchTrainer 的子类,AccelerateTrainer 接受由 accelerate config 生成的配置文件,并将其应用于所有 worker。除此之外,AccelerateTrainer 的功能与 TorchTrainer 完全相同。

然而,这导致了关于这是否是运行 Accelerate 代码的唯一方式的困惑。因为您可以使用 AcceleratorTorchTrainer 的组合表达全部的 Accelerate 功能,计划是在 Ray 2.8 中弃用 AccelerateTrainer,并建议您直接使用 TorchTrainer 运行您的 Accelerate 代码。