使用 TensorFlow/Keras 进行分布式训练入门#
Ray Train 的 TensorFlow 集成使您能够将 TensorFlow 和 Keras 训练函数扩展到多台机器和 GPU。
从技术上讲,Ray Train 会为您调度训练工作进程并配置 TF_CONFIG,使您能够运行 MultiWorkerMirroredStrategy 训练脚本。有关更多信息,请参阅 使用 TensorFlow 进行分布式训练。
本指南中的大多数示例都使用 TensorFlow 和 Keras,但 Ray Train 也支持纯 TensorFlow。
快速入门#
import ray
import tensorflow as tf
from ray import train
from ray.train import ScalingConfig
from ray.train.tensorflow import TensorflowTrainer
from ray.train.tensorflow.keras import ReportCheckpointCallback
# If using GPUs, set this to True.
use_gpu = False
a = 5
b = 10
size = 100
def build_model() -> tf.keras.Model:
model = tf.keras.Sequential(
[
tf.keras.layers.InputLayer(input_shape=()),
# Add feature dimension, expanding (batch_size,) to (batch_size, 1).
tf.keras.layers.Flatten(),
tf.keras.layers.Dense(10),
tf.keras.layers.Dense(1),
]
)
return model
def train_func(config: dict):
batch_size = config.get("batch_size", 64)
epochs = config.get("epochs", 3)
strategy = tf.distribute.MultiWorkerMirroredStrategy()
with strategy.scope():
# Model building/compiling need to be within `strategy.scope()`.
multi_worker_model = build_model()
multi_worker_model.compile(
optimizer=tf.keras.optimizers.SGD(learning_rate=config.get("lr", 1e-3)),
loss=tf.keras.losses.mean_squared_error,
metrics=[tf.keras.metrics.mean_squared_error],
)
dataset = train.get_dataset_shard("train")
results = []
for _ in range(epochs):
tf_dataset = dataset.to_tf(
feature_columns="x", label_columns="y", batch_size=batch_size
)
history = multi_worker_model.fit(
tf_dataset, callbacks=[ReportCheckpointCallback()]
)
results.append(history.history)
return results
config = {"lr": 1e-3, "batch_size": 32, "epochs": 4}
train_dataset = ray.data.from_items(
[{"x": x / 200, "y": 2 * x / 200} for x in range(200)]
)
scaling_config = ScalingConfig(num_workers=2, use_gpu=use_gpu)
trainer = TensorflowTrainer(
train_loop_per_worker=train_func,
train_loop_config=config,
scaling_config=scaling_config,
datasets={"train": train_dataset},
)
result = trainer.fit()
print(result.metrics)
更新您的训练函数#
首先,更新您的 训练函数 以支持分布式训练。
注意
当前的 TensorFlow 实现支持 MultiWorkerMirroredStrategy(以及 MirroredStrategy)。如果您希望 Ray Train 支持其他策略,请在 GitHub 上提交 功能请求。
这些说明紧密遵循 TensorFlow 的 使用 Keras 进行多工作进程训练 教程。一个关键区别是 Ray Train 会为您处理环境变量的设置。
步骤 1:将您的模型包装在 MultiWorkerMirroredStrategy 中。
MultiWorkerMirroredStrategy 支持同步分布式训练。您必须在策略的作用域内构建和编译 Model。
with tf.distribute.MultiWorkerMirroredStrategy().scope():
model = ... # build model
model.compile()
步骤 2:将您的 Dataset 批次大小更新为全局批次大小。
适当设置 batch_size,因为 batch 会在工作进程之间平均分割。
-batch_size = worker_batch_size
+batch_size = worker_batch_size * train.get_context().get_world_size()
警告
Ray 不会自动设置任何与本地并行性或线程相关的环境变量或配置(“OMP_NUM_THREADS”除外)。如果您希望更精细地控制 TensorFlow 线程,请在 train_loop_per_worker 函数的开头使用 tf.config.threading 模块(例如 tf.config.threading.set_inter_op_parallelism_threads(num_cpus))。
创建 TensorflowTrainer#
Trainer 是 Ray Train 用于管理状态和执行训练的主要类。对于分布式 TensorFlow,请使用 TensorflowTrainer,您可以这样设置它:
from ray.train import ScalingConfig
from ray.train.tensorflow import TensorflowTrainer
# For GPU Training, set `use_gpu` to True.
use_gpu = False
trainer = TensorflowTrainer(
train_func,
scaling_config=ScalingConfig(use_gpu=use_gpu, num_workers=2)
)
要自定义后端设置,您可以传递一个 TensorflowConfig。
from ray.train import ScalingConfig
from ray.train.tensorflow import TensorflowTrainer, TensorflowConfig
trainer = TensorflowTrainer(
train_func,
tensorflow_backend=TensorflowConfig(...),
scaling_config=ScalingConfig(num_workers=2),
)
有关更多配置选项,请参阅 DataParallelTrainer API。
运行训练函数#
有了分布式训练函数和 Ray Train Trainer,您现在就可以开始训练了。
trainer.fit()
加载和预处理数据#
TensorFlow 默认使用其内部数据集分片策略,如指南中所述。如果您的 TensorFlow 数据集与分布式加载兼容,则无需更改任何内容。
如果您需要更高级的预处理,可以考虑使用 Ray Data 进行分布式数据摄取。请参阅 Ray Data 与 Ray Train。
主要区别在于,您可能希望在训练函数中将 Ray Data 数据集分片转换为 TensorFlow 数据集,以便使用 Keras API 进行模型训练。
请参阅此示例以了解分布式数据加载。相关部分是:
import tensorflow as tf
from ray import train
from ray.train.tensorflow import prepare_dataset_shard
def train_func(config: dict):
# ...
# Get dataset shard from Ray Train
dataset_shard = train.get_context().get_dataset_shard("train")
# Define a helper function to build a TensorFlow dataset
def to_tf_dataset(dataset, batch_size):
def to_tensor_iterator():
for batch in dataset.iter_tf_batches(
batch_size=batch_size, dtypes=tf.float32
):
yield batch["image"], batch["label"]
output_signature = (
tf.TensorSpec(shape=(None, 784), dtype=tf.float32),
tf.TensorSpec(shape=(None, 784), dtype=tf.float32),
)
tf_dataset = tf.data.Dataset.from_generator(
to_tensor_iterator, output_signature=output_signature
)
# Call prepare_dataset_shard to disable automatic sharding
# (since the dataset is already sharded)
return prepare_dataset_shard(tf_dataset)
for epoch in range(epochs):
# Call our helper function to build the dataset
tf_dataset = to_tf_dataset(
dataset=dataset_shard,
batch_size=64,
)
history = multi_worker_model.fit(tf_dataset)
报告结果#
在训练期间,训练循环应向 Ray Train 报告中间结果和检查点。此报告会将结果记录到控制台输出,并将它们追加到本地日志文件。日志记录还会触发检查点簿记。
使用 Keras 报告结果的最简单方法是使用 ReportCheckpointCallback。
from ray.train.tensorflow.keras import ReportCheckpointCallback
def train_func(config: dict):
# ...
for epoch in range(epochs):
model.fit(dataset, callbacks=[ReportCheckpointCallback()])
此回调会自动将 Keras 训练函数中的所有结果和检查点转发给 Ray Train。
聚合结果#
TensorFlow Keras 会自动聚合所有工作进程的指标。如果您希望对此进行更多控制,请考虑实现自定义训练循环。
保存和加载检查点#
您可以通过在训练函数中调用 train.report(metrics, checkpoint=Checkpoint(...)) 来保存 Checkpoints。此调用会将分布式工作进程中的检查点状态保存在执行 Python 脚本的 Trainer 上。
您可以通过 Trainer 的 checkpoint 属性访问最新保存的检查点,并通过 best_checkpoints 属性访问最佳保存的检查点。
这些具体示例演示了 Ray Train 在分布式训练中如何适当地保存检查点和模型权重(但不保存模型)。
import json
import os
import tempfile
from ray import train
from ray.train import Checkpoint, ScalingConfig
from ray.train.tensorflow import TensorflowTrainer
import numpy as np
def train_func(config):
import tensorflow as tf
n = 100
# create a toy dataset
# data : X - dim = (n, 4)
# target : Y - dim = (n, 1)
X = np.random.normal(0, 1, size=(n, 4))
Y = np.random.uniform(0, 1, size=(n, 1))
strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
with strategy.scope():
# toy neural network : 1-layer
model = tf.keras.Sequential([tf.keras.layers.Dense(1, activation="linear", input_shape=(4,))])
model.compile(optimizer="Adam", loss="mean_squared_error", metrics=["mse"])
for epoch in range(config["num_epochs"]):
history = model.fit(X, Y, batch_size=20)
with tempfile.TemporaryDirectory() as temp_checkpoint_dir:
model.save(os.path.join(temp_checkpoint_dir, "model.keras"))
checkpoint_dict = os.path.join(temp_checkpoint_dir, "checkpoint.json")
with open(checkpoint_dict, "w") as f:
json.dump({"epoch": epoch}, f)
checkpoint = Checkpoint.from_directory(temp_checkpoint_dir)
train.report({"loss": history.history["loss"][0]}, checkpoint=checkpoint)
trainer = TensorflowTrainer(
train_func,
train_loop_config={"num_epochs": 5},
scaling_config=ScalingConfig(num_workers=2),
)
result = trainer.fit()
print(result.checkpoint)
默认情况下,检查点会持久化到每个运行的日志目录中的本地磁盘。
加载检查点#
import os
import tempfile
from ray import train
from ray.train import Checkpoint, ScalingConfig
from ray.train.tensorflow import TensorflowTrainer
import numpy as np
def train_func(config):
import tensorflow as tf
n = 100
# create a toy dataset
# data : X - dim = (n, 4)
# target : Y - dim = (n, 1)
X = np.random.normal(0, 1, size=(n, 4))
Y = np.random.uniform(0, 1, size=(n, 1))
strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
with strategy.scope():
# toy neural network : 1-layer
checkpoint = train.get_checkpoint()
if checkpoint:
with checkpoint.as_directory() as checkpoint_dir:
model = tf.keras.models.load_model(
os.path.join(checkpoint_dir, "model.keras")
)
else:
model = tf.keras.Sequential(
[tf.keras.layers.Dense(1, activation="linear", input_shape=(4,))]
)
model.compile(optimizer="Adam", loss="mean_squared_error", metrics=["mse"])
for epoch in range(config["num_epochs"]):
history = model.fit(X, Y, batch_size=20)
with tempfile.TemporaryDirectory() as temp_checkpoint_dir:
model.save(os.path.join(temp_checkpoint_dir, "model.keras"))
extra_json = os.path.join(temp_checkpoint_dir, "checkpoint.json")
with open(extra_json, "w") as f:
json.dump({"epoch": epoch}, f)
checkpoint = Checkpoint.from_directory(temp_checkpoint_dir)
train.report({"loss": history.history["loss"][0]}, checkpoint=checkpoint)
trainer = TensorflowTrainer(
train_func,
train_loop_config={"num_epochs": 5},
scaling_config=ScalingConfig(num_workers=2),
)
result = trainer.fit()
print(result.checkpoint)
进一步阅读#
请参阅 用户指南 以探索更多主题。