使用 TensorFlow/Keras 开始分布式训练#

Ray Train 的 TensorFlow 集成使你能够将 TensorFlow 和 Keras 的训练函数扩展到多台机器和多个 GPU 上。

从技术层面讲,Ray Train 会为你调度训练 Worker 并配置 TF_CONFIG,让你能够运行 MultiWorkerMirroredStrategy 训练脚本。更多信息请参阅 使用 TensorFlow 进行分布式训练

本指南中的大多数示例都使用带有 Keras 的 TensorFlow,但 Ray Train 也支持原始的 TensorFlow。

快速入门#

import ray
import tensorflow as tf

from ray import train
from ray.train import ScalingConfig
from ray.train.tensorflow import TensorflowTrainer
from ray.train.tensorflow.keras import ReportCheckpointCallback


# If using GPUs, set this to True.
use_gpu = False

a = 5
b = 10
size = 100


def build_model() -> tf.keras.Model:
    model = tf.keras.Sequential(
        [
            tf.keras.layers.InputLayer(input_shape=()),
            # Add feature dimension, expanding (batch_size,) to (batch_size, 1).
            tf.keras.layers.Flatten(),
            tf.keras.layers.Dense(10),
            tf.keras.layers.Dense(1),
        ]
    )
    return model


def train_func(config: dict):
    batch_size = config.get("batch_size", 64)
    epochs = config.get("epochs", 3)

    strategy = tf.distribute.MultiWorkerMirroredStrategy()
    with strategy.scope():
        # Model building/compiling need to be within `strategy.scope()`.
        multi_worker_model = build_model()
        multi_worker_model.compile(
            optimizer=tf.keras.optimizers.SGD(learning_rate=config.get("lr", 1e-3)),
            loss=tf.keras.losses.mean_squared_error,
            metrics=[tf.keras.metrics.mean_squared_error],
        )

    dataset = train.get_dataset_shard("train")

    results = []
    for _ in range(epochs):
        tf_dataset = dataset.to_tf(
            feature_columns="x", label_columns="y", batch_size=batch_size
        )
        history = multi_worker_model.fit(
            tf_dataset, callbacks=[ReportCheckpointCallback()]
        )
        results.append(history.history)
    return results


config = {"lr": 1e-3, "batch_size": 32, "epochs": 4}

train_dataset = ray.data.from_items(
    [{"x": x / 200, "y": 2 * x / 200} for x in range(200)]
)
scaling_config = ScalingConfig(num_workers=2, use_gpu=use_gpu)
trainer = TensorflowTrainer(
    train_loop_per_worker=train_func,
    train_loop_config=config,
    scaling_config=scaling_config,
    datasets={"train": train_dataset},
)
result = trainer.fit()
print(result.metrics)

更新你的训练函数#

首先,更新你的训练函数以支持分布式训练。

注意

当前的 TensorFlow 实现支持 MultiWorkerMirroredStrategy(以及 MirroredStrategy)。如果你希望 Ray Train 支持其他策略,请在 GitHub 上提交功能请求

这些说明与 TensorFlow 的 使用 Keras 进行多 Worker 训练 教程非常相似。一个主要区别是 Ray Train 会为你处理环境变量的设置。

步骤 1:将模型包装在 MultiWorkerMirroredStrategy 中。

MultiWorkerMirroredStrategy 支持同步分布式训练。你必须在策略的作用域内构建和编译 Model

with tf.distribute.MultiWorkerMirroredStrategy().scope():
    model = ... # build model
    model.compile()

步骤 2:Dataset 的批大小更新为全局批大小。

适当设置 batch_size,因为 batch 会平均分配到 Worker 进程。

-batch_size = worker_batch_size
+batch_size = worker_batch_size * train.get_context().get_world_size()

警告

除了 “OMP_NUM_THREADS” 之外,Ray 不会自动设置任何与本地并行性或线程相关的环境变量或配置。如果你想更精细地控制 TensorFlow 的线程,可以在你的 train_loop_per_worker 函数开头使用 tf.config.threading 模块(例如 tf.config.threading.set_inter_op_parallelism_threads(num_cpus))。

创建 TensorflowTrainer#

Trainer 是 Ray Train 用于管理状态和执行训练的主要类。对于分布式 TensorFlow,请使用 TensorflowTrainer,你可以这样设置它

from ray.train import ScalingConfig
from ray.train.tensorflow import TensorflowTrainer
# For GPU Training, set `use_gpu` to True.
use_gpu = False
trainer = TensorflowTrainer(
    train_func,
    scaling_config=ScalingConfig(use_gpu=use_gpu, num_workers=2)
)

要自定义后端设置,你可以传递一个 TensorflowConfig

from ray.train import ScalingConfig
from ray.train.tensorflow import TensorflowTrainer, TensorflowConfig

trainer = TensorflowTrainer(
    train_func,
    tensorflow_backend=TensorflowConfig(...),
    scaling_config=ScalingConfig(num_workers=2),
)

有关更多可配置性,请参阅 DataParallelTrainer API。

运行训练函数#

有了分布式训练函数和 Ray Train Trainer,你现在就可以开始训练了。

trainer.fit()

加载和预处理数据#

TensorFlow 默认使用其自己的内部数据集分片策略,如指南所述。如果你的 TensorFlow 数据集与分布式加载兼容,则无需更改任何内容。

如果你需要更高级的预处理,可以考虑使用 Ray Data 进行分布式数据摄取。请参阅Ray Data 与 Ray Train

主要区别在于,你可能需要在训练函数中将 Ray Data 数据集分片转换为 TensorFlow 数据集,以便可以使用 Keras API 进行模型训练。

请参阅此示例以了解分布式数据加载。相关部分是

import tensorflow as tf
from ray import train
from ray.train.tensorflow import prepare_dataset_shard

def train_func(config: dict):
    # ...

    # Get dataset shard from Ray Train
    dataset_shard = train.get_context().get_dataset_shard("train")

    # Define a helper function to build a TensorFlow dataset
    def to_tf_dataset(dataset, batch_size):
        def to_tensor_iterator():
            for batch in dataset.iter_tf_batches(
                batch_size=batch_size, dtypes=tf.float32
            ):
                yield batch["image"], batch["label"]

        output_signature = (
            tf.TensorSpec(shape=(None, 784), dtype=tf.float32),
            tf.TensorSpec(shape=(None, 784), dtype=tf.float32),
        )
        tf_dataset = tf.data.Dataset.from_generator(
            to_tensor_iterator, output_signature=output_signature
        )
        # Call prepare_dataset_shard to disable automatic sharding
        # (since the dataset is already sharded)
        return prepare_dataset_shard(tf_dataset)

    for epoch in range(epochs):
        # Call our helper function to build the dataset
        tf_dataset = to_tf_dataset(
            dataset=dataset_shard,
            batch_size=64,
        )
        history = multi_worker_model.fit(tf_dataset)

报告结果#

在训练期间,训练循环应向 Ray Train 报告中间结果和检查点。此报告会将结果记录到控制台输出并附加到本地日志文件。日志记录还会触发检查点簿记

使用 Keras 报告结果最简单的方法是使用 ReportCheckpointCallback

from ray.train.tensorflow.keras import ReportCheckpointCallback

def train_func(config: dict):
    # ...
    for epoch in range(epochs):
        model.fit(dataset, callbacks=[ReportCheckpointCallback()])

此回调会自动将 Keras 训练函数的所有结果和检查点转发到 Ray Train。

聚合结果#

TensorFlow Keras 会自动聚合所有 Worker 的指标。如果你希望对此有更多控制权,请考虑实现自定义训练循环

保存和加载检查点#

你可以通过在训练函数中调用 train.report(metrics, checkpoint=Checkpoint(...)) 来保存Checkpoints。此调用会将分布式 Worker 的检查点状态保存到执行 Python 脚本的 Trainer 上。

你可以通过 Resultcheckpoint 属性访问最新的已保存检查点,并使用 best_checkpoints 属性访问最佳已保存检查点。

这些具体示例展示了 Ray Train 如何在分布式训练中适当保存检查点(模型权重而非模型)。

import json
import os
import tempfile

from ray import train
from ray.train import Checkpoint, ScalingConfig
from ray.train.tensorflow import TensorflowTrainer

import numpy as np

def train_func(config):
    import tensorflow as tf
    n = 100
    # create a toy dataset
    # data   : X - dim = (n, 4)
    # target : Y - dim = (n, 1)
    X = np.random.normal(0, 1, size=(n, 4))
    Y = np.random.uniform(0, 1, size=(n, 1))

    strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
    with strategy.scope():
        # toy neural network : 1-layer
        model = tf.keras.Sequential([tf.keras.layers.Dense(1, activation="linear", input_shape=(4,))])
        model.compile(optimizer="Adam", loss="mean_squared_error", metrics=["mse"])

    for epoch in range(config["num_epochs"]):
        history = model.fit(X, Y, batch_size=20)

        with tempfile.TemporaryDirectory() as temp_checkpoint_dir:
            model.save(os.path.join(temp_checkpoint_dir, "model.keras"))
            checkpoint_dict = os.path.join(temp_checkpoint_dir, "checkpoint.json")
            with open(checkpoint_dict, "w") as f:
                json.dump({"epoch": epoch}, f)
            checkpoint = Checkpoint.from_directory(temp_checkpoint_dir)

            train.report({"loss": history.history["loss"][0]}, checkpoint=checkpoint)

trainer = TensorflowTrainer(
    train_func,
    train_loop_config={"num_epochs": 5},
    scaling_config=ScalingConfig(num_workers=2),
)
result = trainer.fit()
print(result.checkpoint)

默认情况下,检查点会持久化到每次运行的日志目录中的本地磁盘。

加载检查点#

import os
import tempfile

from ray import train
from ray.train import Checkpoint, ScalingConfig
from ray.train.tensorflow import TensorflowTrainer

import numpy as np

def train_func(config):
    import tensorflow as tf
    n = 100
    # create a toy dataset
    # data   : X - dim = (n, 4)
    # target : Y - dim = (n, 1)
    X = np.random.normal(0, 1, size=(n, 4))
    Y = np.random.uniform(0, 1, size=(n, 1))

    strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
    with strategy.scope():
        # toy neural network : 1-layer
        checkpoint = train.get_checkpoint()
        if checkpoint:
            with checkpoint.as_directory() as checkpoint_dir:
                model = tf.keras.models.load_model(
                    os.path.join(checkpoint_dir, "model.keras")
                )
        else:
            model = tf.keras.Sequential(
                [tf.keras.layers.Dense(1, activation="linear", input_shape=(4,))]
            )
        model.compile(optimizer="Adam", loss="mean_squared_error", metrics=["mse"])

    for epoch in range(config["num_epochs"]):
        history = model.fit(X, Y, batch_size=20)

        with tempfile.TemporaryDirectory() as temp_checkpoint_dir:
            model.save(os.path.join(temp_checkpoint_dir, "model.keras"))
            extra_json = os.path.join(temp_checkpoint_dir, "checkpoint.json")
            with open(extra_json, "w") as f:
                json.dump({"epoch": epoch}, f)
            checkpoint = Checkpoint.from_directory(temp_checkpoint_dir)

            train.report({"loss": history.history["loss"][0]}, checkpoint=checkpoint)

trainer = TensorflowTrainer(
    train_func,
    train_loop_config={"num_epochs": 5},
    scaling_config=ScalingConfig(num_workers=2),
)
result = trainer.fit()
print(result.checkpoint)

# Start a new run from a loaded checkpoint
trainer = TensorflowTrainer(
    train_func,
    train_loop_config={"num_epochs": 5},
    scaling_config=ScalingConfig(num_workers=2),
    resume_from_checkpoint=result.checkpoint,
)
result = trainer.fit()

延伸阅读#

请参阅用户指南以探索更多主题