处理失败和节点抢占(已弃用的 API)#
自动从训练工作节点故障中恢复#
Ray Train 内置了容错机制,可以从工作节点故障(即 RayActorError)中恢复。检测到故障时,工作节点将被关闭,并添加新的工作节点。
训练函数将被重新启动,但可以通过检查点恢复之前的执行进度。
每次从工作节点故障中恢复都被视为一次重试。重试次数可以通过传递给 Trainer 的 RunConfig 中设置的 FailureConfig 参数的 max_failures 属性进行配置。
import ray.train
# Tries to recover a run up to this many times.
failure_config = ray.train.FailureConfig(max_failures=2)
# No limit on the number of retries.
failure_config = ray.train.FailureConfig(max_failures=-1)
将恢复哪个检查点?#
Ray Train 将自动从报告给 Ray Train 的最新可用 检查点 恢复训练。
这将是传递给 train.report() 的最后一个检查点。
恢复 Ray Train 实验#
在实验级别,Trainer 恢复允许您从中断处继续之前中断的实验。
Train 实验可能由于以下原因之一而被中断
实验被手动中断(例如,Ctrl+C,或被抢占的头部节点实例)。
头部节点崩溃(例如,OOM 或其他运行时错误)。
整个集群宕机(例如,影响所有节点的网络错误)。
Trainer 恢复适用于 Ray Train 的所有内置 Trainer,但我们在示例中使用 TorchTrainer 进行演示。我们还使用 <Framework>Trainer 来指代所有内置 Trainer 共享的方法。
假设您的初始 Train 实验配置如下。实际的训练循环仅用于演示:重要的是已实现了 保存 和 加载检查点。
import os
import tempfile
from typing import Dict, Optional
import torch
import ray
from ray import train
from ray.train import Checkpoint
from ray.train.torch import TorchTrainer
def get_datasets() -> Dict[str, ray.data.Dataset]:
return {"train": ray.data.from_items([{"x": i, "y": 2 * i} for i in range(10)])}
def train_loop_per_worker(config: dict):
from torchvision.models import resnet18
model = resnet18()
# Checkpoint loading
checkpoint: Optional[Checkpoint] = train.get_checkpoint()
if checkpoint:
with checkpoint.as_directory() as checkpoint_dir:
model_state_dict = torch.load(os.path.join(checkpoint_dir, "model.pt"))
model.load_state_dict(model_state_dict)
model = train.torch.prepare_model(model)
train_ds = train.get_dataset_shard("train")
for epoch in range(5):
# Do some training...
# Checkpoint saving
with tempfile.TemporaryDirectory() as tmpdir:
torch.save(model.module.state_dict(), os.path.join(tmpdir, "model.pt"))
train.report({"epoch": epoch}, checkpoint=Checkpoint.from_directory(tmpdir))
trainer = TorchTrainer(
train_loop_per_worker=train_loop_per_worker,
datasets=get_datasets(),
scaling_config=train.ScalingConfig(num_workers=2),
run_config=train.RunConfig(
name="dl_trainer_restore", storage_path=os.path.expanduser("~/ray_results")
),
)
result = trainer.fit()
实验的结果和检查点会保存到 RunConfig 配置的路径中。如果实验因上述原因之一中断,请使用此路径进行恢复。
from ray.train.torch import TorchTrainer
restored_trainer = TorchTrainer.restore(
path=os.path.expanduser("~/ray_results/dl_trainer_restore"),
datasets=get_datasets(),
)
提示
您也可以从远程路径恢复(例如,从存储在 s3 存储桶中的实验目录)。
original_trainer = TorchTrainer(
# ...
run_config=train.RunConfig(
# Configure cloud storage
storage_path="s3://results-bucket",
name="dl_trainer_restore",
),
)
result = trainer.fit()
restored_trainer = TorchTrainer.restore(
"s3://results-bucket/dl_trainer_restore",
datasets=get_datasets(),
)
注意
不同的 Trainer 在恢复时可能允许重新指定更多可选参数。只有 **数据集** 是在恢复时必需重新指定的,前提是它们在最初时已提供。
TorchTrainer.restore、TensorflowTrainer.restore 和 HorovodTrainer.restore 可以接受与其父类 DataParallelTrainer.restore 相同的参数。
除非另有说明,否则其他 Trainer 将接受与 BaseTrainer.restore 相同的参数。
自动恢复#
添加下面的分支逻辑将允许您在中断后运行相同的脚本,并从上次运行中断的地方继续训练。请注意,我们使用 <Framework>Trainer.can_restore 实用方法来确定给定实验目录的存在性和有效性。
experiment_path = os.path.expanduser("~/ray_results/dl_restore_autoresume")
if TorchTrainer.can_restore(experiment_path):
trainer = TorchTrainer.restore(experiment_path, datasets=get_datasets())
result = trainer.fit()
else:
trainer = TorchTrainer(
train_loop_per_worker=train_loop_per_worker,
datasets=get_datasets(),
scaling_config=train.ScalingConfig(num_workers=2),
run_config=train.RunConfig(
storage_path=os.path.expanduser("~/ray_results"),
name="dl_restore_autoresume",
),
)
result = trainer.fit()
另请参阅
有关完整示例,请参阅 BaseTrainer.restore 的文档字符串。
注意
<Framework>Trainer.restore 与 <Framework>Trainer(..., resume_from_checkpoint=...) 不同。 resume_from_checkpoint 用于启动一个**新**的 Train 实验,该实验会将结果写入新目录并从迭代 0 开始。
<Framework>Trainer.restore 用于继续现有实验,新的结果将继续追加到现有日志中。